ThinkChat2.0新版上线,更智能更精彩,支持会话、画图、阅读、搜索等,送10W Token,即刻开启你的AI之旅 广告
[TOC] ## 队列介绍(Queue) 进程彼此之间互相隔离,要实现进程间通信IPC(Inter-Process Communication),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的 **管道存在数据不安全性,而队列是基于管道+锁实现的,是安全的,所以主要掌握队列即可** ### **队列** Queue([maxsize]): 创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。 ~~~ maxsize是队列中允许最大项数,省略则无大小限制。 但需要明确: 1、队列内存放的是消息而非大数据 2、队列占用的是内存空间,因而maxsize即便是无大小限制也受限于内存大小 ~~~ ### **主要方法介绍:** --- * q.put(item [, block [,timeout ] ] ) 将item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。 block控制阻塞行为,默认为True。如果设置为False,将引发Queue.Empty异常(定义在Queue库模块中)。 timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引发Queue.Full异常。 * q.get( [ block [ ,timeout ] ] ) 返回队列q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止。 block用于控制阻塞行为,默认为True. 如果设置为False,将引发Queue.Empty异常。 timeout是可选超时时间,用在阻塞模式中。如果在制定的时间间隔内没有项目变为可用,将引发Queue.Empty异常。 --- * q.get_nowait( ) 同q.get(False)方法。如果队列中有项目则获取,如果没有也不阻塞而是直接抛出异常 * q.put_nowait( ) 同q.put(False)方法。往队列中放入数据,如果队列已满,则不能等待而直接抛出异常(容易丢失数据) --- * q.qsize() 返回队列中目前项目的正确数量,多进程模式下并不可靠。 * q.empty() 判断队列q是否为空,空返回True,多进程模式下并不可靠。 * q.full() 判断队列q是否已满,已满返回为True,多进程模式下并不可靠。 --- * q.close() 关闭队列,防止队列中加入更多数据。已入队列但尚未写入的数据不收影响, 如果q被垃圾收集,将自动调用此方法。 不会对正在被阻塞的get或put方法使用者返回错误 ### **队列的简单使用** 一个进程不停的生成对象并加入到队列中,另外一个进程不停的从队列中获取数据,如果队列中没有数据则阻塞,直到产生新数据 ~~~ import os,time,random from multiprocessing import Process,Queue def inputQ(queue,i):# 向queue中输入数据的函数 info = str('输入进程:%s 进程ID:%s'%(i,os.getpid())) queue.put(info) time.sleep(random.randint(1,3)) def outputQ(queue,i):# 向queue中输出数据的函数 info = queue.get() print('输出进程:%s 进程ID:%s \t[%s]'%(i,os.getpid(),info)) # print(info) if __name__ == '__main__': record1 = [] record2 = [] queue = Queue(3) for i in range(5):# 输入进程 process = Process(target=inputQ,args=(queue,i)) process.start() record1.append(process) for i in range(5):# 输出进程 process = Process(target=outputQ,args=(queue,i)) process.start() record2.append(process) for p in record1: p.join() for p in record2: p.join() ~~~ 运行结果 ``` 输出进程:0 进程ID:12320 [输入进程:0 进程ID:11584] 输出进程:1 进程ID:8852 [输入进程:2 进程ID:3716] 输出进程:2 进程ID:11744 [输入进程:1 进程ID:12372] 输出进程:3 进程ID:1252 [输入进程:3 进程ID:8904] 输出进程:4 进程ID:12432 [输入进程:4 进程ID:12968] ``` ## 管道(Pipe) 管道在数据管理上是不安全的,而队列是基于管道+锁实现了安全的数据管理的,所以管道知识了解即可 管道实例化时会产生两个端口(左端和右端),分别都可以收发数据,\ ### 简单语法案例如下: ~~~ from multiprocessing import Pipe l,r=Pipe() #实例化 l.send('来自:l') #从左端口发数据 print(r.recv()) #从右端口收数据 r.send('来自:r') #从右端口发数据 print(l.recv()) #从左端口收数据 l.close() #关闭左端口,不能再写入数据 #运行结果: 来自:l 来自:r ~~~ 如果端口不关闭,且管道没有数据了,再用recv获取就会导致程序阻塞 如果端口关闭了,且管道没有数据了,再用recv获取就会抛出`EOFError`异常,可以捕获 ### 多进程下语法案例: 由于管道两端都可以结束数据,所以在多进程下,需要在不不适用关闭不使用的端口,才能在管道中没有数据,捕获异常以便结束 ~~~ from multiprocessing import Pipe,Process def cons(L,R): L.close() #3.子进程不用L输入,所以一开始就关闭 while True: try: #子进程用R端接收 print(R.recv()) except EOFError:break if __name__ == '__main__': L,R=Pipe() p1=Process(target=cons,args=(L,R)).start() R.close() #1.主进程不使R端收发数据,所以一开始就关闭R端 L.send('来自L') L.close() #2.主进程用L端发完后,也要关闭Ll端 ~~~ 必须以上端口都关闭后,才能产生我们需要的报错,然后才能捕获后停止程序 ## 生产者消费者模型 ### **为什么要使用生产者消费者模型** 生产者指的是生产数据的任务,消费者指的是处理数据的任务,在并发编程中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。 同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。 ### **什么是生产者和消费者模式** 生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。 生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。 **阻塞队列就是用来给生产者和消费者解耦的** ### 生产者消费者模型总结 1. 程序中有两类角色 一类负责生产数据(生产者) 一类负责处理数据(消费者) 2. 引入生产者消费者模型为了解决的问题是 平衡生产者与消费者之间的速度差 程序解开耦合 3. 如何实现生产者消费者模型 生产者<--->队列<--->消费者 ### 生产者消费者模型实现 基于队列来实现一个生产者消费者模型 ~~~ from multiprocessing import Process,Queue import time,random,os def consumer(q,name):# 消费者们:即吃货们 while True: res=q.get() time.sleep(random.randint(1,3)) print('%s 吃 %s' %(name,res)) def producer(q,name,food): #生产者们:即厨师们 for i in range(3): time.sleep(random.randint(1,3)) res='%s%s' %(food,i) q.put(res) print('%s 生产了 %s' %(name,res)) if __name__ == '__main__': q=Queue() p1=Process(target=producer,args=(q,'noah','包子')) c1=Process(target=consumer,args=(q,'bobo')) p1.start() c1.start() print('主'.center(15,'-')) ~~~ 执行结果 ~~~ -------主------- noah 生产了 包子0 noah 生产了 包子1 bobo 吃 包子0 noah 生产了 包子2 bobo 吃 包子1 bobo 吃 包子2 ~~~ 此时的问题是主进程永远不会结束,原因是:生产者p在生产完后就结束了,但是消费者c在取空了q之后,则一直处于死循环中且卡在q.get()这一步。 解决方式是让生产者在生产完毕后,往队列中再发一个结束信号(None),这样消费者在接收到结束信号后就可以break出死循环 ~~~ from multiprocessing import Process, Queue import time, random, os def consumer(q, name):# 消费者们:即吃货们 while True: res = q.get() if res is None:break time.sleep(random.randint(1, 3)) print('%s 吃 %s' % (name, res)) def producer(q, name, food):# 生产者们:即厨师们 for i in range(3): time.sleep(random.randint(1, 3)) res = '%s%s' % (food, i) q.put(res) print('%s 生产了 %s' % (name, res)) if __name__ == '__main__': q = Queue() p1 = Process(target=producer, args=(q, 'noah', '包子')) c1 = Process(target=consumer, args=(q, 'bobo')) p1.start() c1.start() print('主'.center(15, '-')) p1.join() q.put(None) ~~~ 但是当有多个生产者和多个消费者时,几个消费者就需要发送几次结束信号,所以可以使用更高级的JoinableQueue队列 ## JoinableQueue队列 像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。 但从实际案例中,可以看出JoinableQueue队列也并不会节省好多代码,加上真实环境中消息中间件的大量应用,队列实际使用并不多 ### 语法介绍 **JoinableQueue([maxsize])** * 参数介绍 ~~~ maxsize是队列中允许最大项数,省略则无大小限制。 ~~~ ### 方法介绍 JoinableQueue的实例p除了与Queue对象相同的方法之外还具有: * q.task_done(): 使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。 如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常 * q.join(): 生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。 阻塞将持续到队列中的每个项目均调用q.task_done()方法为止 ### 基于JoinableQueue实现生产者消费者模型 ~~~ from multiprocessing import Process, Queue,JoinableQueue import time, random, os def consumer(q, name):# 消费者们:即吃货们 while True: res = q.get() time.sleep(random.randint(1, 3)) print('%s 吃 %s' % (name, res)) q.task_done() #发送信号给队列,已经从队列中取走一个数据并处理完毕了 def producer(q, name, food):# 生产者们:即厨师们 for i in range(2): time.sleep(random.randint(1, 3)) res = '%s%s' % (food, i) q.put(res) print('%s 生产了 %s' % (name, res)) q.join() #等到消费者把自己放入队列中的所有的数据都取走之后,生产者才结束 if __name__ == '__main__': q = JoinableQueue() p1 = Process(target=producer, args=(q, 'noah', '包子')) p2 = Process(target=producer, args=(q, 'claire', '骨头')) c1 = Process(target=consumer, args=(q, 'bobo')) c1.daemon=True #注意这个守护进程 p1.start() p2.start() c1.start() print('主'.center(15, '-')) p1.join() p2.join() #1、主进程等生产者p1、p2结束 #2、而p1、p2、是在消费者把所有数据都取干净之后才会结束 #3、主进程结束后,被设置成守护进程的生产者也会结束 ~~~ 执行结果: ``` -------主------- noah 生产了 包子0 noah 生产了 包子1 claire 生产了 骨头0 bobo 吃 包子0 claire 生产了 骨头1 bobo 吃 包子1 bobo 吃 骨头0 bobo 吃 骨头1 Process finished with exit code 0 ``` ## 进程之间的数据共享(Manager) 进程间数据是独立的,可以借助于队列或管道实现通信,二者都是基于消息传递的 虽然进程间数据独立,但可以通过Manager实现数据共享,事实上Manager的功能远不止于此 **但进程间应该尽量避免通信,即便需要通信,也应该选择进程安全的工具来避免加锁带来的问题。** **正式环境会使用数据库来解决现在进程之间的数据共享问题。** Manager支持很多类型的进程方法,但常用的是list, dict两种 ~~~ from multiprocessing import Process,Manager,Lock def wahaha(lock,i,dic): with lock: #操作进程间共享数据一定要要加锁 dic['count']-=1 if __name__ == '__main__': lock=Lock() m=Manager() dic=m.dict({'count':5}) p_l=[] for i in range(5): p1=Process(target=wahaha,args=(lock,i,dic)) p_l.append(p1) p1.start() for p in p_l: p.join() print('end.....%s'%dic) ~~~ 执行结果: ~~~ end.....{'count': 0} ~~~