资讯

精准传达 • 有效沟通

从品牌网站建设到网络营销策划,从策略到执行的一站式服务

41线程3_RLock_Condition_Barrier-创新互联

目录

网站的建设创新互联建站专注网站定制,经验丰富,不做模板,主营网站定制开发.小程序定制开发,H5页面制作!给你焕然一新的设计体验!已为成都围栏护栏等企业提供专业服务。

threading.RLock类:...1

threading.Condition类:...2

threading.Barrier类:...4

threading.RLock类:

可重入锁,是线程相关的锁;

线程A获得可重复锁,并可多次成功获取,不会阻塞,最后要在线程A中做和acquire次数相同的release(拿多少次锁,还多少回来);

注,线程相关:

threading.local类;

例:

lock = threading.RLock()

ret = lock.acquire()

print(ret)

ret = lock.acquire(timeout=5)

print(ret)

ret = lock.acquire(False)

print(ret)

ret = lock.acquire(False)   #全能拿到锁

print(ret)

lock.release()

lock.release()

lock.release()

lock.release()

# lock.release()   #前面没有对应的acquire,抛RuntimeError: cannot release un-acquired lock

def sub(lock:threading.RLock):

lock.release()   #主线程中加的,不能在子线程中释放,理解线程级别

threading.Thread(target=sub, args=(lock,)).start()

输出:

True

True

True

True

Exception in thread Thread-1:

Traceback (most recent call last):

File "D:\Python\Python35\lib\threading.py", line 914, in _bootstrap_inner

self.run()

File "D:\Python\Python35\lib\threading.py", line 862, in run

self._target(*self._args, **self._kwargs)

File "E:/git_practice/cmdb/example_threading2.py", line 249, in sub

lock.release()

RuntimeError: cannot release un-acquired lock

threading.Condition类:

Condition(lock=None),构造方法,可传入一个lock或RLock对象,默认是RLock;

cond = threading.Condition()

cond.acquire(*args),获取锁;

cond.release()

cond.wait(timeout=None),等待或超时;

cond.notify(n=1),唤醒至多指定数目个数的等待线程,默认1个,没有等待的线程就没有任何操作,源码中waiter;

cond.notify_all(),唤醒所有等待的线程,源码中waiters;

总结:

Condition用于生产者-消费者模型,解决生产者-消费者速度匹配问题;

采用了通知机制,非常有效率;

使用Condition,必须先acquire,用完要release,因为内部使用了锁,默认使用RLock,最好的方式是使用with上下文;

消费者wait等待通知,生产者生产好消息,对消费者发通知,可使用notify或notify_all;

可把Condition理解为一把高级的锁,它提供了Lock、RLock更高级的功能,允许我们能够控制复杂的线程同步问题;

threading.Condition内部维护了一个锁对象(默认是RLock),可在创建Condition对象时把锁对象作为参数传入;

threading.Condition也提供了acquire和release方法,含义与锁的一致,其实它只是简单调用内部锁对象的对应的方法而已;

threading.Condition还提供了wait、notify、notify_all方法:

wait([timeout]),释放内部所占用的锁,同时线程被挂起,直至接收到通知被唤醒或超时(如果提供timeout),当线程被唤醒并重新占用锁时,程序才会继续执行下去;

notify(),唤醒一个挂起的线程(如果存在挂起的线程),notify()不会释放所占用的锁;

notify_all(),唤醒所有挂起的线程(如果存在挂起的线程),不会释放所占用的锁;

Lock与RLock:

RLock允许在同一线程中被多次acquire,而Lock不允许这种情况;

如果使用RLock,那么 acquire和release必须成对出现,即调用了n次acquire,必须调用n次release才能真正释放所占用的锁;

例:

class Dispatcher:

def __init__(self):

self.data = 0

self.event = threading.Event()

def produce(self):

for i in range(100):

data = random.randint(1,100)

self.data = data

self.event.wait(1)

def custom(self):

while True:   #消费者浪费了大量cpu时间,主动来查看有没有数据

logging.info(self.data)   #有重复消费问题

self.event.wait(1)   #隔1秒生成1个

d = Dispatcher()

p = threading.Thread(target=d.produce)

c = threading.Thread(target=d.custom)

c.start()   #消费者先启动

p.start()

输出:

……

2018-08-06-15:54:25       Thread info: 13052 Thread-1 13

2018-08-06-15:54:25       Thread info: 12052 Thread-2 13

2018-08-06-15:54:26       Thread info: 12052 Thread-2 13

……

例:

class Dispatcher:

def __init__(self):

self.data = 0

self.event = threading.Event()

self.cond = threading.Condition()

def produce(self):

for i in range(100):

data = random.randint(1,100)

# logging.info(data)

with self.cond:

self.data = data

self.cond.notify(2)   #通知机制,有数据,通知消费者来消费;交给2个人做,一般是1(生产者)对多(消费者)

     self.cond.notify_all()   #通知所有消费者,1对多

self.event.wait(1)

def custom(self):

# while True:

while not self.event.is_set():

# logging.info(self.data)

with self.cond:   #消费者被迫匹配生产者

self.cond.wait()

logging.info(self.data)

# self.event.wait(1)

d = Dispatcher()

p = threading.Thread(target=d.produce)

# c = threading.Thread(target=d.custom)

# c1 = threading.Thread(target=d.custom)   #开启2个消费线程

# c.start()

# c1.start()

for i in range(5):   #开启5个消费线程;如果produce中self.conf.notify(2),生产者通知2个线程处理,5个消费者中谁抢在前谁处理

threading.Thread(target=d.custom, name='c-{}'.format(i)).start()

p.start()   #如果生产者先启动,已经生成的数据不会被消费者消费,除非在队列中

注:

以上有线程安全问题,解决:中间加MQ;

上例不是线程安全的,程序逻辑有很多瑕疵,但可很好的理解Condition的使用和生产者消费者模型;

一对多,其实就是广播模式;

threading.Barrier类:

屏障、栅栏,可以想象成路障、道闸,3.2引入;

Barrier(paties,action=None,timeout=None),构建Barrier对象,指定参与方数目,timeout是wait方法未指定超时的默认值;

n_waiting,当前在屏障中等待的线程数;

paties,参与方数目,需要多少个等待;

wait(timeout=None),等待通过屏障,返回0到线程数count-1的整数,count为等待的线程总数,每个线程返回不同;如果wait方法设置了超时,并超时发送,屏障将处于broken状态;wait方法超时发生,屏障处于broken状态,直至reset;

broken,如果屏障处于打破的状态,返回True;

abort(),将屏障置于broken状态,等待中的线程或调用等待方法的线程中都会抛BrokenBarrierError异常,直至reset方法来恢复屏障;

reset(),恢复屏障,重新开始拦截;

应用场景:

1、并发初始化;如,centos7中systemd,能并行启动就并行;

所有线程都必须初始化完成后,才能继续工作,如运行前加载数据、检查,如果这些工作没完成就开始运行,将不能正常工作;

10个线程做10种工作准备,只有这10个线程都完成后,才能继续工作,先完成的要等待后完成的线程;

如,启动一个程序,先加载磁盘文件、缓存预热、初始化连接池等,这些工作齐头并进,不过只有等满足了,程序才能继续后向执行,假设数据库连接失败,则初始化工作失效,就要abort,屏障broken,所有线程收到异常退出;

2、工作量,有10个计算任务,完成6个就算工作完成,如求样本数、求平均数;

例:

def worker(barrier:threading.Barrier):

logging.info('n_waiting={}'.format(barrier.n_waiting))

try:

bid = barrier.wait()

logging.info('after barrier {}'.format(bid))

except threading.BrokenBarrierError:

logging.info('broken barrier is {}'.format(threading.current_thread()))

barrier = threading.Barrier(3)   #3个一拨3个一拨

for _ in range(3):   #依次3,4,5,6

threading.Thread(target=worker,args=(barrier,)).start()

输出:

2018-08-07-08:27:53       Thread info: 11496 Thread-1 n_waiting=0

2018-08-07-08:27:53       Thread info: 12540 Thread-2 n_waiting=1

2018-08-07-08:27:53       Thread info: 4612 Thread-3 n_waiting=2

2018-08-07-08:27:53       Thread info: 4612 Thread-3 after barrier 2

2018-08-07-08:27:53       Thread info: 11496 Thread-1 after barrier 0

2018-08-07-08:27:53       Thread info: 12540 Thread-2 after barrier 1

例:

for i in range(6):

if i == 2:   #屏障中等待2个,屏障被broken,wait的线程抛异常,新wait的线程也抛异常,直至屏障恢复,才继续按达到参与方的数目继续拦截

barrier.abort()

elif i == 3:

barrier.reset()

threading.Event().wait(1)

threading.Thread(target=worker,args=(barrier,)).start()

输出:

2018-08-07-09:21:49       Thread info: 12668 Thread-1 n_waiting=0

2018-08-07-09:21:50       Thread info: 12424 Thread-2 n_waiting=1

2018-08-07-09:21:50       Thread info: 12424 Thread-2 broken barrier is

2018-08-07-09:21:50       Thread info: 12668 Thread-1 broken barrier is

2018-08-07-09:21:51       Thread info: 11468 Thread-3 n_waiting=0

2018-08-07-09:21:51       Thread info: 11468 Thread-3 broken barrier is

2018-08-07-09:21:52       Thread info: 9788 Thread-4 n_waiting=0

2018-08-07-09:21:53       Thread info: 12680 Thread-5 n_waiting=1

2018-08-07-09:21:54       Thread info: 10948 Thread-6 n_waiting=2

2018-08-07-09:21:54       Thread info: 10948 Thread-6 after barrier 2

2018-08-07-09:21:54       Thread info: 9788 Thread-4 after barrier 0

2018-08-07-09:21:54       Thread info: 12680 Thread-5 after barrier 1

例:

wait方法超时发生,屏障处于broken状态,直至reset;

def worker(barrier:threading.Barrier, i:int):

logging.info('waiting for {} threads'.format(barrier.n_waiting))

try:

logging.info(barrier.broken)

if i < 3:

barrier_id = barrier.wait(1)

else:

if i == 6:

barrier.reset()

barrier_id = barrier.wait()

logging.info('after barrier {}'.format(barrier_id))

except threading.BrokenBarrierError:

logging.info('broken barrier. run.')

barrier = threading.Barrier(3)

for x in range(9):

threading.Event().wait(2)

threading.Thread(target=worker, args=(barrier,x), name='worker-{}'.format(x)).start()

输出:

2018-08-07-09:33:24       Thread info: 10556 worker-0 waiting for 0 threads

2018-08-07-09:33:24       Thread info: 10556 worker-0 False

2018-08-07-09:33:25       Thread info: 10556 worker-0 broken barrier. run.

2018-08-07-09:33:26       Thread info: 12752 worker-1 waiting for 0 threads

2018-08-07-09:33:26       Thread info: 12752 worker-1 True

2018-08-07-09:33:26       Thread info: 12752 worker-1 broken barrier. run.

2018-08-07-09:33:28       Thread info: 5324 worker-2 waiting for 0 threads

2018-08-07-09:33:28       Thread info: 5324 worker-2 True

2018-08-07-09:33:28       Thread info: 5324 worker-2 broken barrier. run.

2018-08-07-09:33:30       Thread info: 6716 worker-3 waiting for 0 threads

2018-08-07-09:33:30       Thread info: 6716 worker-3 True

2018-08-07-09:33:30       Thread info: 6716 worker-3 broken barrier. run.

2018-08-07-09:33:32       Thread info: 9180 worker-4 waiting for 0 threads

2018-08-07-09:33:32       Thread info: 9180 worker-4 True

2018-08-07-09:33:32       Thread info: 9180 worker-4 broken barrier. run.

2018-08-07-09:33:34       Thread info: 6788 worker-5 waiting for 0 threads

2018-08-07-09:33:34       Thread info: 6788 worker-5 True

2018-08-07-09:33:34       Thread info: 6788 worker-5 broken barrier. run.

2018-08-07-09:33:36       Thread info: 12044 worker-6 waiting for 0 threads

2018-08-07-09:33:36       Thread info: 12044 worker-6 True

2018-08-07-09:33:38       Thread info: 5020 worker-7 waiting for 1 threads

2018-08-07-09:33:38       Thread info: 5020 worker-7 False

2018-08-07-09:33:40       Thread info: 13052 worker-8 waiting for 2 threads

2018-08-07-09:33:40       Thread info: 13052 worker-8 False

2018-08-07-09:33:40       Thread info: 13052 worker-8 after barrier 2

2018-08-07-09:33:40       Thread info: 5020 worker-7 after barrier 1

2018-08-07-09:33:40       Thread info: 12044 worker-6 after barrier 0

另外有需要云服务器可以了解下创新互联cdcxhl.cn,海内外云服务器15元起步,三天无理由+7*72小时售后在线,公司持有idc许可证,提供“云服务器、裸金属服务器、高防服务器、香港服务器、美国服务器、虚拟主机、免备案服务器”等云主机租用服务以及企业上云的综合解决方案,具有“安全稳定、简单易用、服务可用性高、性价比高”等特点与优势,专为企业上云打造定制,能够满足用户丰富、多元化的应用场景需求。


网页名称:41线程3_RLock_Condition_Barrier-创新互联
URL标题:http://www.cdkjz.cn/article/dgiedj.html
多年建站经验

多一份参考,总有益处

联系快上网,免费获得专属《策划方案》及报价

咨询相关问题或预约面谈,可以通过以下方式与我们联系

大客户专线   成都:13518219792   座机:028-86922220