通知短信+运营短信,5秒速达,支持群发助手一键发送🚀高效触达和通知客户 广告
# 并发编程 [TOC] ## 进程 vs. 线程 对于操作系统来说,一个任务就是一个进程(Process),比如打开一个浏览器就是启动一个浏览器进程,打开一个记事本就启动了一个记事本进程,打开两个记事本就启动了两个记事本进程,打开一个Word就启动了一个Word进程。有些进程还不止同时干一件事,比如Word,它可以同时进行打字、拼写检查、打印等事情。在一个进程内部,要同时干多件事,就需要同时运行多个“子任务”,我们把进程内的这些“子任务”称为线程(Thread)。 真正的并行执行多任务只能在多核CPU上实现,但是,由于任务数量远远多于CPU的核心数量,所以,操作系统也会自动把很多任务轮流调度到每个核心上执行。 由于每个进程至少要干一件事,所以,一个进程至少有一个线程。当然,像Word这种复杂的进程可以有多个线程,多个线程可以同时执行,多线程的执行方式和多进程是一样的,也是由操作系统在多个线程之间快速切换,让每个线程都短暂地交替运行,看起来就像同时执行一样。当然,真正地同时执行多线程也是需要多核CPU才可能实现。 进程只是用来把资源集中到一起(进程只是一个资源单位,或者说资源集合),而线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一个程序的执行实例就是一个进程。 Python支持的并发分为多线程并发与多进程并发。概念上来说,多进程并发即运行多个独立的程序,优势在于并发处理的任务都由操作系统管理,不足之处在于程序与各进程之间的通信和数据共享不方便;多线程并发则由程序员管理并发处理的任务,这种并发方式可以方便地在线程间共享数据(前提是不能互斥)。Python对多线程和多进程的支持都比一般编程语言更高级,最小化了需要我们完成的工作。 多进程模式最大的优点就是稳定性高,因为一个子进程崩溃了,不会影响主进程和其他子进程。(当然主进程挂了所有进程就全挂了,但是Master进程只负责分配任务,挂掉的概率低)著名的Apache最早就是采用多进程模式。 多进程模式的缺点是创建进程的代价大,在Unix/Linux系统下,用fork调用还行,在Windows下创建进程开销巨大。另外,操作系统能同时运行的进程数也是有限的,在内存和CPU的限制下,如果有几千个进程同时运行,操作系统连调度都会成问题。 多线程模式致命的缺点就是任何一个线程挂掉都可能直接造成整个进程崩溃,因为所有线程共享进程的内存。在Windows上,如果一个线程执行的代码出了问题,你经常可以看到这样的提示:“该程序执行了非法操作,即将关闭”,其实往往是某个线程出了问题,但是操作系统会强制结束整个进程。 ### 如何选择 1. 在CPU密集型任务下,多进程更快,或者说效果更好; 2. 在IO密集型下,多线程能有效提高效率。 ### 计算密集型 vs. IO密集型 计算密集型任务的特点是要进行大量的计算,消耗CPU资源,比如各种循环处理,计算圆周率、对视频进行高清解码等等,全靠CPU的运算能力。这种计算密集型任务虽然也可以用多任务完成,但是任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以,要最高效地利用CPU,计算密集型任务同时进行的数量应当等于CPU的核心数。 涉及到网络、磁盘IO的任务都是IO密集型任务,如文件处理、网络爬虫等,这类任务的特点是CPU消耗很少,任务的大部分时间都在等待IO操作完成(因为IO的速度远远低于CPU和内存的速度)。对于IO密集型任务,任务越多,CPU效率越高,但也有一个限度。常见的大部分任务都是IO密集型任务,比如Web应用。IO密集型任务执行期间,99%的时间都花在IO上,花在CPU上的时间很少,因此,用运行速度极快的C语言替换用Python这样运行速度极低的脚本语言,完全无法提升运行效率。对于IO密集型任务,最合适的语言就是开发效率最高(代码量最少)的语言,脚本语言是首选,C语言最差。 ## python的GUI? ## 线程池/进程池 标准库为我们提供了concurrent.futures模块,它提供了ThreadPoolExecutor和ProcessPoolExecutor两个类,实现了对threading和multiprocessing的进一步抽象,对编写线程池/进程池提供了直接的支持。 ### 使用submit来操作线程池/进程池 #### 线程池 ```python from concurrent.futures import ThreadPoolExecutor import time def return_future_result(message): time.sleep(2) return message # 创建一个最大可容纳2个task的线程池 pool = ThreadPoolExecutor(max_workers=2) future1 = pool.submit(return_future_result, ("hello")) # 往线程池里面加入一个task,返回future对象,对于Future对象可以简单地理解为一个在未来完成的操作。 future2 = pool.submit(return_future_result, ("world")) # 往线程池里面加入一个task print(future1.done()) # 判断task1是否结束,此时因为time.sleep(2)的原因还没结束; time.sleep(3) print(future2.done()) # 判断task2是否结束,此时过了三秒,task1和task2都已结束 print(future1.result()) # 查看task1返回的结果,此时是直接返回结果hello,不用再等待2秒 print(future2.result()) # 查看task2返回的结果,此时是直接返回结果world,不用再等待2秒 ``` ``` [root@izbp11qoru4kuokur8uyr9z ~]# ps -eLf | grep python root 10849 10736 10849 2 3 14:53 pts/0 00:00:00 python _code2.py root 10849 10736 10850 0 3 14:53 pts/0 00:00:00 python _code2.py root 10849 10736 10851 0 3 14:53 pts/0 00:00:00 python _code2.py ``` #### 进程池 ```python # 虽然内部千差万别,但外部的api接口都一样 from concurrent.futures import ProcessPoolExecutor import time def return_future_result(message): time.sleep(10) return message pool = ProcessPoolExecutor(max_workers=2) future1 = pool.submit(return_future_result, ("hello")) future2 = pool.submit(return_future_result, ("world")) print(future1.done()) time.sleep(11) print(future2.done()) print(future1.result()) print(future2.result()) ``` ``` [root@izbp11qoru4kuokur8uyr9z ~]# ps -eLf | grep python root 10989 10736 10989 1 3 15:16 pts/0 00:00:00 python _code2.py root 10989 10736 10992 0 3 15:16 pts/0 00:00:00 python _code2.py root 10989 10736 10993 0 3 15:16 pts/0 00:00:00 python _code2.py root 10990 10989 10990 0 1 15:16 pts/0 00:00:00 python _code2.py root 10991 10989 10991 0 1 15:16 pts/0 00:00:00 python _code2.py ``` ### 使用map/wait来操作线程池/进程池 #### map ```python import concurrent.futures import urllib.request URLS = ['http://httpbin.org', 'http://example.com/', 'https://api.github.com/'] def load_url(url, timeout): with urllib.request.urlopen(url, timeout=timeout) as conn: return conn.read() ### submit版本 with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: future_to_url = {executor.submit(load_url, url, 60): url for url in URLS} # future_to_url : {<Future at 0x229c7018c88 state=running>: 'http://httpbin.org', <Future at 0x229c719ff98 state=running>: 'http://example.com/', <Future at 0x229c71bfb70 state=running>: 'https://api.github.com/'} # as_completed:扔进一组future实例的迭代,谁先完成返回谁 for future in concurrent.futures.as_completed(future_to_url): url = future_to_url[future] try: data = future.result() # b'<!doctype html>\n<html>\n<head>\n ```` ' except Exception as exc: print('%r generated an exception: %s' % (url, exc)) else: print('%r page is %d bytes' % (url, len(data))) ''' # 不按顺序返回,谁先结束返回谁 'http://example.com/' page is 1270 bytes 'http://httpbin.org' page is 8344 bytes 'https://api.github.com/' page is 2039 bytes ''' # map版本,代码更简洁 # 使用with语句会调用executor.__exit__方法,__exit__方法接着会调用executor.shutdown(wait=True)方法, # 然后就会在所有线程都执行完毕前阻塞线程 with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: # zip函数把两个可迭代的对象实例打包成一一对应的元祖 # map 与内置map方法类似,不过load_url函数会在多个线程中并发调用; for url, data in zip(URLS, executor.map(load_url, URLS)): print('%r page is %d bytes' % (url, len(data))) ''' map是按照URLS列表元素的顺序返回的: 'http://httpbin.org' page is 8344 bytes 'http://example.com/' page is 1270 bytes 'https://api.github.com/' page is 2039 bytes ''' ``` #### wait wait方法接会返回一个tuple(元组),tuple中包含两个set(集合),一个是completed(已完成的)另外一个是uncompleted(未完成的)。使用wait方法的一个优势就是获得更大的自由度,它接收三个参数FIRST_COMPLETED, FIRST_EXCEPTION 和ALL_COMPLETE,默认设置为ALL_COMPLETED。 ```python from concurrent.futures import ThreadPoolExecutor, wait, as_completed from time import sleep from random import randint def return_after_random_secs(num): sleep(randint(1, 5)) return "Return of {}".format(num) pool = ThreadPoolExecutor(5) futures = [] for x in range(5): futures.append(pool.submit(return_after_random_secs, x)) # 如果采用默认的ALL_COMPLETED,程序会阻塞直到线程池里面的所有任务都完成。 # print(wait(futures)) # 如果采用FIRST_COMPLETED参数,程序并不会等到线程池里面所有的任务都完成。 print(wait(futures, timeout=None, return_when='FIRST_COMPLETED')) ``` ### futures的其他方法 `Future.cancel()` : 可以终止某个线程和进程的任务 `Future.cancelled()` : 判断是否真的结束了任务 `Future.running()` : 判断是否还在运行 `Future.done()` : 判断是正常执行完毕的。 `Future.result(timeout=None)` : 查看线程或进程结果,其中timeout针对result结果做超时的控制。 `Future.add_done_callback(fn)` : 线程取消或者结束时会回调,fn有个参数代表当前的future对象 submit函数返回future对象,future提供了跟踪任务执行状态的方法。比如判断任务是否执行中future.running(),判断任务是否执行完成future.done()等等。 as_completed方法传入futures迭代器和timeout两个参数 默认timeout=None,阻塞等待任务执行完成,并返回执行完成的future对象迭代器,迭代器是通过yield实现的。 timeout>0,等待timeout时间,如果timeout时间到仍有任务未能完成,不再执行并抛出异常TimeoutError 其他请参考: https://docs.python.org/3/library/concurrent.futures.html#future-objects ### 实例分析进程池与线程池之间的差异 ```python # 计算网页大小,进程池方式 import requests from concurrent.futures import ProcessPoolExecutor import time,os def get(url): print('%s GET %s' %(os.getpid(),url)) response=requests.get(url) time.sleep(3) if response.status_code == 200: return {'url':url,'text':response.text} def parse(obj): res=obj.result() print('[%s] <%s> (%s)' % (os.getpid(), res['url'], len(res['text']))) if __name__ == '__main__': urls = [ 'https://www.python.org', 'https://www.baidu.com', 'https://www.jd.com', 'https://www.tmall.com', ] # 不用with的写法 # t = ProcessPoolExecutor(2) # for url in urls: # t.submit(get,url).add_done_callback(parse) # t.shutdown(wait=True) # t.__exit__方法会调用t.shutdown(wait=True)方法, with ProcessPoolExecutor(max_workers=4) as t: # 开一个进程池t,然后去并发下载网络数据, # 下载完毕后,由于主进程、子进程不是同一个进程空间,所以在解析数据时候,在主进程中add_done_callback去解析 [t.submit(get,url).add_done_callback(parse) for url in urls] print('主',os.getpid()) ''' output: 25896:是当前进程 29384,28224,22544,2404是开的进程池 29384 GET https://www.python.org 28224 GET https://www.baidu.com 22544 GET https://www.jd.com 2404 GET https://www.tmall.com [25896] <https://www.jd.com> (122591) [25896] <https://www.tmall.com> (230427) [25896] <https://www.baidu.com> (2443) [25896] <https://www.python.org> (48837) 主 25896 ''' # 计算网页大小,线程池方式 import requests from concurrent.futures import ThreadPoolExecutor from threading import current_thread import time,os def get(url): print('pid:%s %s GET %s' % (os.getpid(),current_thread().getName(),url)) response=requests.get(url) time.sleep(3) if response.status_code == 200: return {'url':url,'text':response.text} def parse(obj): res=obj.result() print('pid:%s [%s] <%s> (%s)' % (os.getpid(), current_thread().getName(), res['url'], len(res['text']))) if __name__ == '__main__': urls = [ 'https://www.python.org', 'https://www.baidu.com', 'https://www.jd.com', 'https://www.tmall.com', ] with ThreadPoolExecutor(max_workers=4) as t: [t.submit(get,url).add_done_callback(parse) for url in urls] print('主',current_thread().getName(),os.getpid()) ''' # 主线程、子线程是同一个进程空间,所以在解析数据时候,可能主线程、子线程都会解析 pid:26788 ThreadPoolExecutor-0_0 GET https://www.python.org pid:26788 ThreadPoolExecutor-0_1 GET https://www.baidu.com pid:26788 ThreadPoolExecutor-0_2 GET https://www.jd.com pid:26788 ThreadPoolExecutor-0_3 GET https://www.tmall.com pid:26788 [ThreadPoolExecutor-0_2] <https://www.jd.com> (122591) pid:26788 [ThreadPoolExecutor-0_3] <https://www.tmall.com> (230425) pid:26788 [ThreadPoolExecutor-0_1] <https://www.baidu.com> (2443) pid:26788 [ThreadPoolExecutor-0_0] <https://www.python.org> (48837) 主 MainThread 26788 ''' ``` ### 注意事项: 1. 线程池或者进程池不能嵌套 ```python def wait_on_future(): # 会造成死锁的状态 f = executor.submit(pow, 5, 2) print(f.result()) executor = ThreadPoolExecutor(max_workers=1) executor.submit(wait_on_future) ``` ## 异步IO 考虑到CPU和IO之间巨大的速度差异,一个任务在执行的过程中大部分时间都在等待IO操作,单进程单线程模型会导致别的任务无法并行执行,因此,我们才需要多进程模型或者多线程模型来支持多任务并发执行。 现代操作系统对IO操作已经做了巨大的改进,最大的特点就是支持异步IO。如果充分利用操作系统提供的异步IO支持,就可以用单进程单线程模型来执行多任务,这种全新的模型称为事件驱动模型,Nginx就是支持异步IO的Web服务器,它在单核CPU上采用单进程模型就可以高效地支持多任务。在多核CPU上,可以运行多个进程(数量与CPU核心数相同),充分利用多核CPU。由于系统总的进程数量十分有限,因此操作系统调度非常高效。用异步IO编程模型来实现多任务是一个主要的趋势。 对应到Python语言,单线程的异步编程模型称为协程,有了协程的支持,就可以基于事件驱动编写高效的多任务程序。