作者:weapon,闲来笑浮生悬笔一卷入毫端,朱绂临身可与言者不过二三。
博客: zhihu.com/people/hong-wei-peng
起步
Python 提供的多线程模型中并没有提供读写锁,读写锁相对于单纯的互斥锁,适用性更高,可以多个线程同时占用读模式的读写锁,但是只能一个线程占用写模式的读写锁。
通俗点说就是当没有写锁时,就可以加读锁且任意线程可以同时加;而写锁只能有一个线程,且必须在没有读锁时才能加上。
简单的实现
这是读写锁的一个简单的实现,
self.read_num
用来保存获得读锁的线程数,这个属性属于临界区,对其操作也要加锁,所以这里需要一个保护内部数据的额外的锁
self._extra
。
但是这个锁是不公平的。理想情况下,线程获得所的机会应该是一样的,不管线程是读操作还是写操作。而从上述代码可以看到,读请求都会立即设置
self.read_num += 1
,不管有没有获得锁,而写请求想要获得锁还得等待
read_num
为 0 。
所以这个就造成了只有锁没有被占用或者没有读请求时,可以获得写权限。我们应该想办法避免读模式锁长期占用。
读写锁的优先级
读写锁也有分 读优先 和 写优先。上面的代码就属于读优先。
如果要改成写优先,那就换成去记录写线程的引用计数,读和写在同时竞争时,可以让写线程增加写的计数,这样可使读线程的读锁一直获取不到, 因为读线程要先判断写的引用计数,若不为0,则等待其为 0,然后进行读。这部分代码不罗列了。
但这样显然不够灵活。我们不需要两个相似的读写锁类。我们希望重构我们代码,使它更强大。
改进
为了能够满足自定义优先级的读写锁,要记录等待的读写线程数,并且需要两个条件
threading.Condition
用来处理哪方优先的通知。计数引用可以扩大语义:正数:表示正在读操作的线程数,负数:表示正在写操作的线程数(最多-1)
在获取读操作时,先然后判断时候有等待的写线程,没有,进行读操作,有,则等待读的计数加 1 后等待
Condition
通知;等待读的计数减 1,计数引用加 1,继续读操作,若条件不成立,循环等待;
在获取写操作时,若锁没有被占用,引用计数减 1,若被占用,等待写线程数加 1,等待写条件
Condition
的通知。
读模式和写模式的释放都是一样,需要根据判断去通知对应的
Condition
:
class
RWLock
(
object
):
def
__init__(
self
)
:
self
.lock = threading.Lock()
self
.rcond = threading.Condition(
self
.lock)
self
.wcond = threading.Condition(
self
.lock)
self
.read_waiter =
0
# 等待获取读锁的线程数
self
.write_waiter =
0
# 等待获取写锁的线程数
self
.state =
0
# 正数:表示正在读操作的线程数 负数:表示正在写操作的线程数(最多-1)
self
.owners = []
# 正在操作的线程id集合
self
.write_first = True
# 默认写优先,False表示读优先
def
write_acquire(
self
, blocking=True)
:
# 获取写锁只有当
me = threading.get_ident()
with
self
.
lock:
while
not
self
._write_acquire(me):
if
not
blocking:
return
False
self
.write_waiter +=
1
self
.wcond.wait()
self
.write_waiter -=
1
return
True
def
_write_acquire(
self
, me)
:
# 获取写锁只有当锁没人占用,或者当前线程已经占用
if
self
.state ==
0
or
(
self
.state <
0
and
me
in
self
.owners):
self
.state -=
1
self
.owners.append(me)
return
True
if
self
.state >
0
and
me
in
self
.
owners:
raise RuntimeError(
'cannot recursively wrlock a rdlocked lock'
)
return
False
def
read_acquire(
self
, blocking=True)
:
me = threading.get_ident()
with
self
.
lock:
while
not
self
._read_acquire(me):
if
not
blocking:
return
False
self
.read_waiter +=
1
self
.rcond.wait()
self
.read_waiter -=
1
return
True
def
_read_acquire(
self
, me)
:
if
self
.state <
0
:
# 如果锁被写锁占用
return
False
if
not
self
.
write_waiter:
ok = True
else:
ok = me
in
self
.owners
if
ok
or
not
self
.
write_first:
self
.state +=
1
self
.owners.append(me)
return
True
return
False
def
unlock(
self
)
:
me = threading.get_ident()
with
self
.
lock:
try:
self
.owners.remove(me)
except
ValueError:
raise RuntimeError(
'cannot release un-acquired lock'
)
if
self
.state >
0
:
self
.state -=
1
else:
self
.state +=
1
if
not
self
.
state:
if
self
.write_waiter
and
self
.
write_first:
# 如果有写操作在等待(默认写优先)
self
.wcond.notify()
elif
self
.
read_waiter:
self
.rcond.notify_all()
elif
self
.
write_waiter:
self
.wcond.notify()
read_release = unlock
write_release = unlock
用Python创建微信机器人
用Python机器人监听微信群聊
用Python获取摄像头并实时控制人脸
开源项目 | 用Python美化LeetCode仓库
推荐Python中文社区旗下的几个服务类公众号
征稿启事 | Python中文社区有奖征文
▼ 点击成为 社区注册会员 「在看」 一下,一起PY!