企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
# IO IO指的就是输入输出。一般涉及到数据交换的地方都需要IO接口,比如说磁盘或者网络。 IO 编程中,Stream(流)是一个很重要的概念,可以把流想象成一个水管,数据就是水管里的水,但是只能单向流动。 Input Stream 就是数据从外面(磁盘、网络)流进内存,Output Stream 就是数据从内存流到外面去。 我们知道CPU和内存的速度远远高于外设的速度,也就是说在IO编程里面,存在速度严重不匹配的情况: 怎么办呢? - 让CPU等着,也就是程序暂停执行等待,这种模式称为同步IO - 另一种方法就是CPU不等待,然后去干别的去了,于是后续代码可以立刻执行,这种模式称为异步IO 使用异步 IO 来编写程序性能会远远高于同步 IO,但是异步 IO 的缺点是编程模型复杂。 比如你得知道什么时候通知你 “汉堡做好了”,而通知你的方法也各不相同。 如果是服务员跑过来找到你,这是回调模式 如果服务员发短信通知你,你就得不停地检查手机,这是轮询模式。 总之,异步 IO 的复杂度远远高于同步 IO。 ## 文件读写 ### 文件读 现代操作系统其实不允许普通的程序直接操作磁盘,实际上磁盘上的读写文件的功能都是由操作系统提供,而读写文件就是请求操作系统打开一个文件对象(也就是文件描述符),然后通过这个接口进行文件的读写 ```python try: f = open('/path/to/file', 'r') print(f.read()) finally: if f: f.close() ``` 文件读写可能产生IOError,不管是否出错,都会执行 `f.close()` 不过这么写还是比较繁琐,可以处用 `with` 语句 ```python with open('path/to/file', 'r') as f: print (f.read()) ``` 不过调用 `f.read()`有个巨大的缺点,它需要一次性将文件全部读到内存,如果文件比较大的话,内存就爆了。 可以使用 `read(size)` 来限定每次读取size个字节的内容 也可以使用 `readline()` 来读取一行内容,并且按行返回 list 总结一下,如果文件很小,read() 一次性读取最方便;如果不能确定文件大小,反复调用 read(size) 比较保险;如果是配置文件,调用 readlines() 最方便: ```python for line in f.readlines(): print(line.strip()) # 把末尾的'\n'删掉 ``` 如果要读取非UTF-8编码的文本文件,可以将 `encoding` 参数传入进去。 ```python >>> f = open('/Users/michael/gbk.txt', 'r', encoding='gbk') >>> f.read() '测试' ``` 如果有编码不规范的,可以会遇到 `UnicodeDecodeError` 可以直接忽略错误 ```python >>> f = open('/Users/michael/gbk.txt', 'r', encoding='gbk', errors='ignore') ``` ### 文件写 如果要写文件 ```python with open('/Users/michael/test.txt', 'w') as f: f.write('Hello, world!') ``` 如果我们希望追加到文件末尾怎么办?可以传入'a' 以追加(append)模式写入。 要写入特定编码的文本文件,请给 open() 函数传入 encoding 参数,将字符串自动转换成指定编码。 ## 文件和目录 如果要使用Python对文件进行删除、新建等,可以使用Python内置的 `os`模块 ```python import os # 详细系统信息 os.uname() # 操作系统类型 os.name # 环境变量 os.environ # 获取环境变量的值 os.environ.get('key') ``` ### 目录操作 > 操作文件和目录的函数一部分放在 os 模块中,一部分放在 os.path 模块中 查看路径: ```python >>> os.path.abspath('.') ``` 新建一个目录和删除一个目录 ```python >>> os.path.join('/Users/michael', 'testdir') '/Users/michael/testdir' # 然后创建一个目录: >>> os.mkdir('/Users/michael/testdir') # 删掉一个目录: >>> os.rmdir('/Users/michael/testdir') ``` 拆分路径 ```python >>> os.path.split('/Users/michael/testdir/file.txt') ``` os.path.splitext() 可以直接让你得到文件扩展名 ```python >>> os.path.splitext('/path/to/file.txt') ('/path/to/file', '.txt') ``` 注意这些合并、拆分路径的函数并**不**要求目录和文件要真实存在,它们只对字符串进行操作 如果要列出当前目录下的所有目录 ```python >>> [x for x in os.listdir('.') if os.path.isdir(x)] ``` 要列出所有的.py 文件,也只需一行代码: ```python >>> [x for x in os.listdir('.') if os.path.isfile(x) and os.path.splitext(x)[1]=='.py' ``` ### 文件操作 重命名 ```python >>> os.rename('test.txt', 'test.py') ``` 删掉文件 ```python >>> os.remove('test.py') ``` 复制文件需要使用 `shutil` 模块 ## StringIO 和 Bytes IO ### StringIO 除了file之外,内存的字节流、网络流等都可以使用open()返回一个有read()方法的对象。 在Python中统程为 file-like Object StringIO 就是在内存中创建的 file-like Object,常用作临时缓冲。 也就是在内存中读写 str。 要把 str 写入 StringIO,我们需要先创建一个 StringIO,然后,像文件一样写入即可 ```python >>> from io import StringIO >>> f = StringIO() >>> f.write('hello') 5 >>> print(f.getvalue()) hello ``` getvalue() 方法用于获得写入后的 str 要读取 StringIO ,可以用一个 str初始化 StringIO,然后像读文件一样读 ```python >>> from io import StringIO >>> f = StringIO('Hello!\nHi!\nGoodbye!') >>> while True: ... s = f.readline() ... if s == '': ... break ... print(s.strip()) ... Hello! Hi! Goodbye! ``` ### BytesIO 如果要操作二进制数据,就需要使用 BytesIO。 BytesIO 实现了在内存中读写 bytes,我们创建一个 BytesIO,然后写入一些 bytes: ```python >>> from io import BytesIO >>> f = BytesIO() >>> f.write('中文'.encode('utf-8')) 6 >>> print(f.getvalue()) b'\xe4\xb8\xad\xe6\x96\x87' ``` 请注意,写入的不是 str,而是经过 UTF-8 编码的 bytes。 ```python >>> from io import BytesIO >>> f = BytesIO(b'\xe4\xb8\xad\xe6\x96\x87') >>> f.read() b'\xe4\xb8\xad\xe6\x96\x87' ``` 和 StringIO 类似,可以用一个 bytes 初始化 BytesIO,然后,像读文件一样读取: ## 序列化 在程序运行过程中,所有的变量都在内存中,一旦掉点,就内存就会被回收。 比如说定义 `d = dict(name='Bob', age=20, score=88)`,如果把name 修改为了 'Bill',但是没有保存到磁盘上,下次重新运行,则变量又被初始化为 'Bob' 我们把变量从内存中变成可存储或者传输的过程称为序列化,Python中称为 `pickling` 其他语言中也称之为 serialization,marshalling,flattening 等等, 反过来,把变量内容从序列化的对象中重新读到内存称为反序列化。 ### pickle 如何把对象序列化并写入文件呢? ```python >>> import pickle >>> d = dict(name='Bob', age=20, score=88) >>> pickle.dumps(d) ``` 这样把任意对象序列化成一个 bytes,然后,就可以把这个 bytes 写入文件 或者可以直接写入一个 file-like Object中 ```python >>> f = open('dump.txt', 'wb') >>> pickle.dump(d, f) >>> f.close() ``` 如何反序列化呢? ```python >>> f = open('dump.txt', 'rb') >>> d = pickle.load(f) >>> f.close() >>> d {'age': 20, 'score': 88, 'name': 'Bob'} ``` 但是这样以二进制的方法进行存储,与其他的语言甚至其他版本的Python都不兼容。 所以我们可以考虑使用JSON这样比较标准的格式。 ### JSON JSON与Python内置的数据类型对应如下: | JSON 类型 | Python 类型 | | :--------- | :----------- | | {} | dict | | [] | list | | "string" | str | | 1234.56 | int 或 float | | true/false | True/False | | null | None | 如何把一个Python对象序列化为JSON ```python >>> import json >>> d = dict(name='Bob', age=20, score=88) >>> json.dumps(d) ``` 如果要把JSON反序列化为Python对象 可以使用 `loads()` ```python >>> json_str = '{"age": 20, "score": 88, "name": "Bob"}' >>> json.loads(json_str) ``` 刚刚我们只是试验了将dict对象直接序列化为JSON的 {} 对于普通的类,我们直接使用 `json.dumps(s)` 会得到一个 `TypeError` 因为 Student不是一个可以序列化的json对象 我们还需要写一个转换函数再把函数传入到 `dumps()`里面。 ```python def student2dict(std): return { 'name': std.name, 'age': std.age, 'score': std.score } ``` 也就是说,实例首先把转换成 `dict` ,然后序列化为JSON ```python >>> print(json.dumps(s, default=student2dict)) ``` 其实我们可以偷一个懒,把任意的class实例都变为 dict 因为通常 class 的实例都有一个__dict__属性,它就是一个 dict,用来存储实例变量 ```python print (json.dumps(s, default=lambda obj: obj.__dict__)) ``` 如果我们要把 JSON 反序列化为一个 Student 对象实例,loads() 方法首先转换出一个 dict 对象,然后,我们传入的 object_hook 函数负责把 dict 转换为 Student 实例: ```python def dict2student(d): return Student(d['name'], d['age'], d['score']) ``` 所以重载代码为: ```python >>> json_str = '{"age": 20, "score": 88, "name": "Bob"}' >>> print(json.loads(json_str, object_hook=dict2student)) <__main__.Student object at 0x10cd3c190> ``` # 异步IO 在一个线程中,CPU执行速度非常快,但是IO操作又特别的慢,如果遇到了IO操作,CPU就必须停下来等待。 也就是说在 IO 操作的过程中,当前线程被挂起,而其他需要 CPU 执行的代码就无法被当前线程执行了。 这样我们就必须多开几个线程或者进程来解决这个问题了,每个用户都会分配到一个线程,如果遇到IO导致线程被挂起,其他用户的线程不受影响。 这样虽然解决了并发问题,但是首先系统不能无上限的增加线程,而且系统切换线程的开销也非常大,所以一旦线程数量过多,CPU的时间就花在了线程切换上了, 另外一种解决思路是异步IO。 当代码需要执行一个耗时的 IO 操作时,它只发出 IO 指令,并不等待 IO 结果,然后就去执行其他代码了。一段时间后,当 IO 返回结果时,再通知 CPU 进行处理。 异步 IO 模型需要一个消息循环,在消息循环中,主线程不断地重复 “读取消息 - 处理消息” 这一过程: ```python loop = get_event_loop() while True: event = loop.get_event() process_event(event) ``` 消息模型其实早在应用在桌面应用程序中了。一个 GUI 程序的主线程就负责不停地读取消息并处理消息。所有的键盘、鼠标等消息都被发送到 GUI 程序的消息队列中,然后由 GUI 程序的主线程处理。由于 GUI 线程处理键盘、鼠标等消息的速度非常快,所以用户感觉不到延迟。 但是在某些时候,GUI 线程在一个消息处理的过程中遇到了问题,导致一次消息处理时间过长,此时,用户会感觉到整个 GUI 程序停止响应了,敲键盘、点鼠标都没有反应。所以在消息模型中,处理一个消息必须非常迅速,否则,主线程将无法及时处理消息队列中的其他消息,导致程序看上去停止响应。 那么消息模型是如何解决同步 IO 必须等待 IO 操作这一问题的呢? 当遇到 IO 操作时,代码只负责发出 IO 请求,不等待 IO 结果,然后直接结束本轮消息处理,进入下一轮消息处理过程。 当 IO 操作完成后,将收到一条 “IO 完成” 的消息,处理该消息时就可以直接获取 IO 操作结果。 在 “发出 IO 请求” 到收到 “IO 完成” 的这段时间里,同步 IO 模型下,主线程只能挂起,但异步 IO 模型下,主线程并没有休息,而是在消息循环中继续处理其他消息。 这样,在异步 IO 模型下,一个线程就可以同时处理多个 IO 请求,并且没有切换线程的操作。对于大多数 IO 密集型的应用程序,使用异步 IO 将大大提升系统的多任务处理能力。 ## 协程 在学习异步 IO 模型前,我们先来了解协程。 协程,又称微线程,纤程。英文名 Coroutine 我们知道函数(也叫子程序)是通过栈来实现的,一个线程就执行一个子程序,所有语言都是层级调用的。比如 A 调用 B,B 在执行过程中又调用了 C,C 执行完毕返回,B 执行完毕返回,最后是 A 执行完毕 子程序调用总是一个入口,一次返回,调用顺序是明确的 而协程在执行过程中,可中断,然后执行别的子程序,适当的时候再返回 注意,在一个子程序中中断,去执行其他子程序,不是函数调用,有点类似 CPU 的中断 ```python def A(): print('1') print('2') print('3') def B(): print('x') print('y') print('z') ``` 假设由协程执行,在执行 A 的过程中,可以随时中断,去执行 B,B 也可能在执行过程中中断再去执行 A 看起来 A、B 的执行有点像多线程,但协程的特点在于是一个线程执行 与多线程相比,子程序切换不是线程切换,而是由程序自身控制,所以没有线程切换的开销,和多线程相比,线程数量越多,协程性能越高。 而且协程还不需要多线程的 **锁机制**。因为只有一个线程,不存在同时写冲突。在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多。 因为协程是一个线程执行,那怎么利用多核 CPU 呢? 最简单的方法是多进程 + 协程,既充分利用多核,又充分发挥协程的高效率,可获得极高的性能 ## 使用 yield实现协程 Python 对协程的支持是通过 generator 实现的。 在 generator 中,我们不但可以通过 for 循环来迭代,还可以不断调用 next() 函数获取由 yield 语句返回的下一个值。 但是 Python 的 yield 不但可以返回一个值,它还可以接收调用者发出的参数 比如说: 传统的生产者 - 消费者模型是一个线程写消息,一个线程取消息,通过锁机制控制队列和等待,但一不小心就可能死锁。 如果改用协程,生产者生产消息后,直接通过 yield 跳转到消费者开始执行,待消费者执行完毕后,切换回生产者继续生产 ```python def produce(c): # 启动生成器 c.send(None) n = 0 while n < 5: # 生产的东西 n = n + 1 print('[PRODUCER] Producing %s...' % n) # 切换到consumer执行 r = c.send(n) print('[PRODUCER] Consumer return: %s' % r) # 关闭consumer c.close() def consumer(): r = '' while True: # 通过 yield 拿到消息,处理完毕又返回结果。 n = yield r if not n: return print('[CONSUMER] Consuming %s...' % n) # 返回结果 r = '200 OK' c = consumer() produce(c) ``` 执行结果: ```python [PRODUCER] Producing 1... [CONSUMER] Consuming 1... [PRODUCER] Consumer return: 200 OK [PRODUCER] Producing 2... [CONSUMER] Consuming 2... [PRODUCER] Consumer return: 200 OK [PRODUCER] Producing 3... [CONSUMER] Consuming 3... [PRODUCER] Consumer return: 200 OK [PRODUCER] Producing 4... [CONSUMER] Consuming 4... [PRODUCER] Consumer return: 200 OK [PRODUCER] Producing 5... [CONSUMER] Consuming 5... [PRODUCER] Consumer return: 200 OK ``` 需要先理解如下几点: - 例子中的 c.send(None),其功能类似于 next(c) - - n = yield r,这里是一条语句,赋值语句先计算 = 右边,由于右边是 yield 语句,所以 yield 语句执行完以后,进入暂停,而赋值语句在下一次启动生成器的时候首先被执行 - send 在接受 None 参数的情况下,等同于 next(generator) 的功能,但 send 同时也可接收其他参数,比如例子中的 c.send(n),要理解这种用法 比如说定义 ```python >>> def num(): a = yield 1 while True: a = yield a >>> c = num() >>> c.send(None) 1 >>> c.send(5) 5 >>> c.send(100) 100 ``` 首先使用 c.send(None),返回生成器的第一个值,a = yield 1 ,也就是 1(但此时,并未执行赋值语句) 接着我们使用了 c.send(5),再次启动生成器,并同时传入了一个参数 5,再次启动生成的时候,从上次 yield 语句断掉的地方开始执行,即 a 的赋值语句,由于我们传入了一个参数 5,所以 a 被赋值为 5 接着程序进入 whlie 循环,当程序执行到 a = yield a,同理,先返回生成器的值 5,下次启动生成器的时候,再执行赋值语句,以此类推... > 但注意,在一个生成器函数未启动之前,是不能传递值进去。也就是说在使用 c.send(n) 之前,必须先使用 c.send(None) 或者 next(c) 来返回生成器的第一个值。 下面我们来看之前的例子 consumer是一个 generator , 把一个 consumer 传入 producer 里面之后, - 首先调用 `c.send(None)` 启动生成器,n = yield r,此时 r 为空,n 还未赋值,然后生成器暂停,等待下一次启动 - 生成器返回空值后进入暂停,produce(c) 接着往下运行,进入 While 循环,此时 n 为 1,所以打印:[PRODUCER] Producing 1... - 运行到 r = c.send(1),再次启动生成器,并传入了参数 1,而生成器从上次 n 的赋值语句开始执行,n 被赋值为 1,n 存在,if 语句不执行,然后打印: ```python [CONSUMER] Consuming 1... ``` 接着 r 被赋值为'200 OK',然后又进入循环,执行 n = yield r,返回生成器的第二个值,'200 OK',然后生成器进入暂停,等待下一次启动。 也就是一旦生产了东西,通过 `c.send(n)`切换到 consumer执行 - consumer通过 yield 拿到消息,处理之后,又通过 yield 把结果传回 - producer 拿到 consumer 处理的结果,继续生产下一条消息,生成器返回'200 OK' 进入暂停后,produce(c) 往下运行,进入 r 的赋值语句,r 被赋值为'200 OK',接着往下运行,打印: ```python [PRODUCER] Consumer return: 200 OK ``` - producer决定不生产了,通过 `c.close()` 关闭 consumer 整个过程无锁,由一个线程执行,produce 和 consumer 协作完成任务,所以称为 “协程”,而非线程的抢占式多任务。 ## asyncio 在 Python3.4 中,协程都是通过使用 yield from 和 asyncio模块中的 @asyncio.coroutine 来实现的。asyncio 专门被用来实现异步 IO 操作 ### yield from yield 在生成器中有中断的功能,可以传出值,也可以从函数外部接收值,而 yield from 的实现就是简化了 yield 操作。 ```python def generator_1(titles): yield titles def generator_2(titles): yield from titles titles = ['Python','Java','C++'] for title in generator_1(titles): print('生成器1:',title) for title in generator_2(titles): print('生成器2:',title) ``` 执行结果: ```python 生成器1: ['Python', 'Java', 'C++'] 生成器2: Python 生成器2: Java 生成器2: C++ ``` yield titles 返回了 titles 完整列表,而 yield from titles 实际等价于: ```python for title in titles: # 等价于yield from titles yield title  ``` 而且yield from 还省去了很多异常处理。 ```python def generator_1(): total = 0 while True: x = yield print('加',x) if not x: break total += x return total def generator_2(): # 委托生成器 while True: total = yield from generator_1() # 子生成器 print('加和总数是:',total) def main(): # 调用方 g1 = generator_1() g1.send(None) g1.send(2) g1.send(3) g1.send(None) # g2 = generator_2() # g2.send(None) # g2.send(2) # g2.send(3) # g2.send(None) main() ``` - 【子生成器】:yield from 后的 generator_1 () 生成器函数是子生成 器 - 【委托生成器】:generator_2 () 是程序中的委托生成器,它负责委托子 生成器完成具体任务。 - 【调用方】:main () 是程序中的调用方,负责调用委托生成器。 **yield from 在其中还有一个关键的作用是:建立调用方和子生成器的通道,** - 在上述代码中 main() 每一次在调用 send(value) 时,value 不是传递给了委托生成器 generator_2 (),而是借助 yield from 传递给了子生成器 generator_1 () 中的 yield - 同理,子生成器中的数据也是通过 yield 直接发送到调用方 main () 中。 - 之后我们的代码都依据调用方-子生成器-委托生成器的规范形式书写。 ## 结合 @asyncio.coroutine 实现协程 在协程中,只要是和 IO 任务类似的、耗费时间的任务都需要使用 yield from 来进行中断,达到异步功能! ```python # 使用同步方式编写异步功能 import time import asyncio @asyncio.coroutine # 标志协程的装饰器 def taskIO_1(): print('开始运行IO任务1...') yield from asyncio.sleep(2) # 假设该任务耗时2s print('IO任务1已完成,耗时2s') return taskIO_1.__name__ @asyncio.coroutine # 标志协程的装饰器 def taskIO_2(): print('开始运行IO任务2...') yield from asyncio.sleep(3) # 假设该任务耗时3s print('IO任务2已完成,耗时3s') return taskIO_2.__name__ @asyncio.coroutine # 标志协程的装饰器 def main(): # 调用方 tasks = [taskIO_1(), taskIO_2()] # 把所有任务添加到task中 done,pending = yield from asyncio.wait(tasks) # 子生成器 for r in done: # done和pending都是一个任务,所以返回结果需要逐个调用result() print('协程无序返回值:'+r.result()) if __name__ == '__main__': start = time.time() loop = asyncio.get_event_loop() # 创建一个事件循环对象loop try: loop.run_until_complete(main()) # 完成事件循环,直到最后一个任务结束 finally: loop.close() # 结束事件循环 print('所有IO任务总耗时%.5f秒' % float(time.time()-start)) ``` 执行结果如下 ```python 开始运行IO任务1... 开始运行IO任务2... IO任务1已完成,耗时2s IO任务2已完成,耗时3s 协程无序返回值:taskIO_2 协程无序返回值:taskIO_1 所有IO任务总耗时3.00209秒 ``` asyncio 的编程模型就是一个消息循环。我们从 asyncio 模块中直接获取一个 EventLoop 的引用,然后把需要执行的协程扔到 EventLoop 中执行,就实现了异步 IO。 【使用方法】: @asyncio.coroutine装饰器是协程函数的标志,我们需要在每一个任务函数前加这个装饰器,并在函数中使用 yield from。 在同步 IO 任务的代码中使用的 time.sleep(2) 来假设任务执行了 2 秒。但在协程中 yield from 后面必须是子生成器函数,而 time.sleep() 并不是生成器,所以这里需要使用内置模块提供的生成器函数 asyncio.sleep()。 【功能】:通过使用协程,极大增加了多任务执行效率,最后消耗的时间是任务队列中耗时最多的时间。上述例子中的总耗时 3 秒就是 taskIO_2() 的耗时时间。 【执行过程】: 上面代码先通过 get_event_loop()获取了一个标准事件循环 loop (因为是一个,所以协程是单线程) 然后,我们通过 run_until_complete(main()) 来运行协程 此处把调用方协程 main () 作为参数,调用方负责调用其他委托生成器,run_until_complete 的特点就像该函数的名字,直到循环事件的所有事件都处理完才能完整结束。 进入调用方协程,我们把多个任务 [taskIO_1() 和 taskIO_2()] 放到一个 task 列表中,可理解为打包任务。 现在,我们使用 asyncio.wait(tasks) 来获取一个 awaitable objects 即可等待对象的集合 (此处的 aws 是协程的列表),并发运行传入的 aws,同时通过 yield from 返回一个包含 (done, pending) 的元组,done 表示已完成的任务列表,pending 表示未完成的任务列表; 如果使用 asyncio.as_completed(tasks) 则会按完成顺序生成协程的迭代器 (常用于 for 循环中),因此当你用它迭代时,会尽快得到每个可用的结果。【此外,当轮询到某个事件时 (如 taskIO_1 ()),直到遇到该任务中的 yield from 中断,开始处理下一个事件 (如 taskIO_2 ())),当 yield from 后面的子生成器完成任务时,该事件才再次被唤醒】 因为 done 里面有我们需要的返回结果,但它目前还是个任务列表,所以要取出返回的结果值,我们遍历它并逐个调用 result() 取出结果即可。 > 注:对于 asyncio.wait() 和 asyncio.as_completed() 返回的结果均是先完成的任务结果排在前面,所以此时打印出的结果不一定和原始顺序相同,但使用 gather() 的话可以得到原始顺序的结果集, 最后我们通过 loop.close() 关闭事件循环。 综上所述:协程的完整实现是靠①事件循环+②协程。 ```python import asyncio @asyncio.coroutine def hello(): print("Hello world!") # 异步调用asyncio.sleep(1): r = yield from asyncio.sleep(1) print("Hello again!") # 获取EventLoop: loop = asyncio.get_event_loop() # 执行coroutine loop.run_until_complete(hello()) loop.close() ``` @asyncio.coroutine 把一个 generator 标记为 coroutine 类型,然后,我们就把这个 coroutine 扔到 EventLoop 中执行。 hello() 会首先打印出 Hello world!,然后,yield from 语法可以让我们方便地调用另一个 generator。 由于 asyncio.sleep() 也是一个 coroutine,所以线程不会等待 asyncio.sleep(),而是直接中断并执行下一个消息循环。当 asyncio.sleep() 返回时,线程就可以从 yield from 拿到返回值(此处是 None),然后接着执行下一行语句。 把 asyncio.sleep(1) 看成是一个耗时 1 秒的 IO 操作,在此期间,主线程并未等待,而是去执行 EventLoop 中其他可以执行的 coroutine 了,因此可以实现并发执行。 我们用 Task 封装两个 coroutine ```python import threading import asyncio @asyncio.coroutine def hello(): print('Hello world! (%s)' % threading.currentThread()) yield from asyncio.sleep(1) print('Hello again! (%s)' % threading.currentThread()) loop = asyncio.get_event_loop() tasks = [hello(), hello()] loop.run_until_complete(asyncio.wait(tasks)) loop.close() ``` 执行过程 ```python Hello world! (<_MainThread(MainThread, started 140735195337472)>) Hello world! (<_MainThread(MainThread, started 140735195337472)>) (暂停约1秒) Hello again! (<_MainThread(MainThread, started 140735195337472)>) Hello again! (<_MainThread(MainThread, started 140735195337472)>) ``` 由打印的当前线程名称可以看出,两个 coroutine 是由同一个线程并发执行的。 如果把 asyncio.sleep() 换成真正的 IO 操作,则多个 coroutine 就可以由一个线程并发执行。 比如获取 sina、 sohu和163 ```python import asyncio @asyncio.coroutine def wget(host): print('wget %s...' % host) connect = asyncio.open_connection(host, 80) reader, writer = yield from connect header = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % host writer.write(header.encode('utf-8')) yield from writer.drain() while True: line = yield from reader.readline() if line == b'\r\n': break print('%s header > %s' % (host, line.decode('utf-8').rstrip())) # Ignore the body, close the socket writer.close() loop = asyncio.get_event_loop() tasks = [wget(host) for host in ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']] loop.run_until_complete(asyncio.wait(tasks)) loop.close() ``` 执行结果如下: ```python wget www.sohu.com... wget www.sina.com.cn... wget www.163.com... (等待一段时间) (打印出sohu的header) www.sohu.com header > HTTP/1.1 200 OK www.sohu.com header > Content-Type: text/html ... (打印出sina的header) www.sina.com.cn header > HTTP/1.1 200 OK www.sina.com.cn header > Date: Wed, 20 May 2015 04:56:33 GMT ... (打印出163的header) www.163.com header > HTTP/1.0 302 Moved Temporarily www.163.com header > Server: Cdn Cache Server V2.0 ... ``` ## async + await实现协程 在 Python 3.4 中,我们发现很容易将协程和生成器混淆 在 Python 3.5 开始引入了新的语法 async 和 await,以简化并更好地标识异步 IO 要使用新的语法,只需要做两步简单的替换: - 把 @asyncio.coroutine 替换为 async; - 把 yield from 替换为 await。 ```python import time import asyncio async def taskIO_1(): print('开始运行IO任务1...') await asyncio.sleep(2) # 假设该任务耗时2s print('IO任务1已完成,耗时2s') return taskIO_1.__name__ async def taskIO_2(): print('开始运行IO任务2...') await asyncio.sleep(3) # 假设该任务耗时3s print('IO任务2已完成,耗时3s') return taskIO_2.__name__ async def main(): # 调用方 tasks = [taskIO_1(), taskIO_2()] # 把所有任务添加到task中 done,pending = await asyncio.wait(tasks) # 子生成器 for r in done: # done和pending都是一个任务,所以返回结果需要逐个调用result() print('协程无序返回值:'+r.result()) if __name__ == '__main__': start = time.time() loop = asyncio.get_event_loop() # 创建一个事件循环对象loop try: loop.run_until_complete(main()) # 完成事件循环,直到最后一个任务结束 finally: loop.close() # 结束事件循环 print('所有IO任务总耗时%.5f秒' % float(time.time()-start)) ``` ## aiohttp asyncio 可以实现单线程并发 IO 操作。如果仅用在客户端,发挥的威力不大。 如果把 asyncio 用在服务器端,例如 Web 服务器,由于 HTTP 连接就是 IO 操作,因此可以用单线程 +coroutine 实现多用户的高并发支持。 asyncio 实现了 TCP、UDP、SSL 等协议,aiohttp 则是基于 asyncio 实现的 HTTP 框架。 我们先安装 aiohttp: pip install aiohttp 然后编写一个 HTTP 服务器,分别处理以下 URL: / - 首页返回 b'<h1>Index</h1>'; /hello/{name} - 根据 URL 参数返回文本 hello, %s!。 代码如下: ```python # coding: utf-8 import asyncio from aiohttp import web async def index(request): return web.Response(body='<h1>Index</h1>'.encode(), content_type='text/html') async def init(loop): app = web.Application() app.router.add_route('GET','/', index) srv = await loop.create_server(app._make_handler(),'127.0.0.1', 9000) logging.info('server started at http://127.0.0.1:9000...') return srv init() ``` ## 总结 【引出问题】: 同步编程的并发性不高 多进程编程效率受 CPU 核数限制,当任务数量远大于 CPU 核数时,执行效率会降低。 多线程编程需要线程之间的通信,而且需要锁机制来防止共享变量被不同线程乱改,而且由于 Python 中的 GIL (全局解释器锁),所以实际上也无法做到真正的并行。 【产生需求】: 可不可以采用同步的方式来编写异步功能代码? 能不能只用一个单线程就能做到不同任务间的切换?这样就没有了线程切换的时间消耗,也不用使用锁机制来削弱多任务并发效率! 对于 IO 密集型任务,可否有更高的处理方式来节省 CPU 等待时间? 此外,多进程和多线程是内核级别的程序,而协程是函数级别的程序,是可以通过程序员进行调用的。以下是协程特性的总结: |协程|属性| |:--|:--| |所需线程|单线程| |编程方式|同步| |实现效果|异步| |是否需要锁机制|否| |程序级别|函数级| |实现机制|事件循环+协程| |总耗时|最耗时事件的时间| |应用场景|IO 密集型任务等| 另外tqdm 是一个用来生成进度条的优秀的库。这个协程就像 asyncio.wait 一样工作,不过会显示一个代表完成度的进度条 ```python async def wait_with_progress(coros): for f in tqdm.tqdm(asyncio.as_completed(coros), total=len(coros)): await f ``` # 进程与线程 现代操作系统都是支持多任务的。 什么是多任务?比如说操作系统可以一边开着浏览器进行上网,也可以一般打开Word 即使过去的单核 CPU,也可以执行多任务。由于 CPU 执行代码都是顺序执行的,那么,单核 CPU 是怎么执行多任务的呢 答案就是操作系统轮流让各个任务交替执行,由于 CPU 的执行速度实在是太快了,我们感觉就像所有任务都在同时执行一样。 真正的并行执行多任务只能在多核 CPU 上实现,但是,由于任务数量远远多于 CPU 的核心数量,所以,操作系统也会自动把很多任务轮流调度到每个核心上执行 对操作系统而言,一个任务就是一个进程。 有些进程还不止同时干一件事,比如 Word,它可以同时进行打字、拼写检查、打印等事情。在一个进程内部,要同时干多件事,就需要同时运行多个 “子任务”,我们把进程内的这些 “子任务” 称为线程(Thread)。 由于每个进程至少要干一件事,所以,一个进程至少有一个线程, 如果我们要同时执行多个任务怎么办? - 一种是启动多个进程,每个进程虽然只有一个线程,但多个进程可以一块执行多个任务。 - 还有一种方法是启动一个进程,在一个进程内启动多个线程,这样,多个线程也可以一块执行多个任务。 - 当然还有第三种方法,就是启动多个进程,每个进程再启动多个线程,这样同时执行的任务就更多了,当然这种模型更复杂,实际很少采用。 总结一下,多任务有3种方式 - 多进程模式; - 多线程模式; - 多进程 + 多线程模式。 Python 既支持多进程,又支持多线程,我们会讨论如何编写这两种多任务程序。 线程是最小的执行单元,而进程由至少一个线程组成。如何调度进程和线程,完全由操作系统决定,程序自己不能决定什么时候执行,执行多长时间。 多进程和多线程的程序涉及到同步、数据共享的问题,编写起来更复杂。 ## 多进程与多线程对比 要实现多任务,通常我们会设计 Master-Worker 模式,Master 负责分配任务,Worker 负责执行任务,因此,多任务环境下,通常是一个 Master,多个 Worker。 - 如果用多进程实现 Master-Worker,主进程就是 Master,其他进程就是 Worker。 - 如果用多线程实现 Master-Worker,主线程就是 Master,其他线程就是 Worker。 多进程最大的优点在于稳定性高,一个子进程崩溃了,不会影响其他的主进程或者子进程。 当然主进程挂了所有进程就全挂了,但是 Master 进程只负责分配任务,挂掉的概率低。著名的 Apache 最早就是采用多进程模式。 但是缺点在于创建进程的代价大,在Linux上可以使用fork调用,在 Windows 下创建进程开销巨大。另外,操作系统能同时运行的进程数也是有限的,在内存和 CPU 的限制下,如果有几千个进程同时运行,操作系统连调度都会成问题。 多线程模式通常比多进程快一点,但是也快不到哪去,而且,多线程模式致命的缺点就是任何一个线程挂掉都可能直接造成整个进程崩溃,因为所有线程共享进程的内存。在 Windows 上,如果一个线程执行的代码出了问题,你经常可以看到这样的提示:“该程序执行了非法操作,即将关闭”,其实往往是某个线程出了问题,但是操作系统会强制结束整个进程。 在 Windows 下,多线程的效率比多进程要高,所以微软的 IIS 服务器默认采用多线程模式。由于多线程存在稳定性的问题,IIS 的稳定性就不如 Apache。为了缓解这个问题,IIS 和 Apache 现在又有多进程 + 多线程的混合模式,真是把问题越搞越复杂。 而且不管是多线程还是多进程,数量一多,效率肯定上不去。 鱼尾纹切换作业是有代价的,比如从语文切到数学,要先收拾桌子上的语文书本、钢笔(这叫保存现场),然后,打开数学课本、找出圆规直尺(这叫准备新环境),才能开始做数学作业。操作系统在切换进程或者线程时也是一样的,它需要先保存当前执行的现场环境(CPU 寄存器状态、内存页等),然后,把新任务的执行环境准备好(恢复上次的寄存器状态,切换内存页等),才能开始执行。这个切换过程虽然很快,但是也需要耗费时间。如果有几千个任务同时进行,操作系统可能就主要忙着切换任务,根本没有多少时间去执行任务了,这种情况最常见的就是硬盘狂响,点窗口无反应,系统处于假死状态。 所以,多任务一旦多到一个限度,就会消耗掉系统所有的资源,结果效率急剧下降,所有任务都做不好。 考虑到 CPU 和 IO 之间巨大的速度差异,一个任务在执行的过程中大部分时间都在等待 IO 操作,单进程单线程模型会导致别的任务无法并行执行,因此,我们才需要多进程模型或者多线程模型来支持多任务并发执行。 现代操作系统对 IO 操作已经做了巨大的改进,最大的特点就是支持异步 IO。如果充分利用操作系统提供的异步 IO 支持,就可以用单进程单线程模型来执行多任务,这种全新的模型称为事件驱动模型,Nginx 就是支持异步 IO 的 Web 服务器,它在单核 CPU 上采用单进程模型就可以高效地支持多任务。 在多核 CPU 上,可以运行多个进程(数量与 CPU 核心数相同),充分利用多核 CPU。由于系统总的进程数量十分有限,因此操作系统调度非常高效。用异步 IO 编程模型来实现多任务是一个主要的趋势。 对应到 Python 语言,单线程的异步编程模型称为协程 ## 多进程 Linux操作系统使用 `fork()`这个系统调用来创建子进程。 `fork()`非常的特殊,普通的函数调用,调用一次,返回一次,但是 fork() 调用一次,返回两次,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后,分别在父进程和子进程内返回。 子进程永远返回 0,而父进程返回子进程的 ID。 这样做的理由是,一个父进程可以 fork 出很多子进程,所以,父进程要记下每个子进程的 ID,而子进程只需要调用 getppid() 就可以拿到父进程的 ID。 如何使用Python来创建子进程呢? ```python import os print('Process (%s) start...' % os.getpid()) # Only works on Unix/Linux/Mac: pid = os.fork() if pid == 0: print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid())) else: print('I (%s) just created a child process (%s).' % (os.getpid(), pid)) ``` 运行结果如下: ```python Process (876) start... I (876) just created a child process (877). I am child process (877) and my parent is 876. ``` 由于 Windows 没有 fork 调用,上面的代码在 Windows 上无法运行。 有了 fork 调用,一个进程在接到新任务时就可以复制出一个子进程来处理新任务,常见的 Apache 服务器就是由父进程监听端口,每当有新的 http 请求时,就 fork 出子进程来处理新的 http 请求。 难道在 Windows 上无法用 Python 编写多进程的程序? 由于 Python 是跨平台的,自然也应该提供一个跨平台的多进程支持。multiprocessing 模块就是跨平台版本的多进程模块 multiprocessing 模块提供了一个 Process 类来代表一个进程对象 ```python from multiprocessing import Process import os # 子进程要执行的代码 def run_proc(name): print('Run child process %s (%s)...' % (name, os.getpid())) if __name__=='__main__': print('Parent process %s.' % os.getpid()) p = Process(target=run_proc, args=('test',)) print('Child process will start.') p.start() p.join() print('Child process end.') ``` 创建子进程时,只需要传入一个执行函数和函数的参数,创建一个Process实例,用start()方法启动,这样创建进程比fork()还要简单。 join()方法可以等待子进程结束后再继续运行。 如果要启动大量的子进程,可以用进程池的方式批量创建子进程: ```python from multiprocessing import Pool import os, time, random def long_time_task(name): print('Run task %s (%s)...' % (name, os.getpid())) start = time.time() time.sleep(random.random() * 3) end = time.time() print('Task %s runs %0.2f seconds.' % (name, (end - start))) if __name__=='__main__': print('Parent process %s.' % os.getpid()) p = Pool(4) for i in range(5): p.apply_async(long_time_task, args=(i,)) print('Waiting for all subprocesses done...') p.close() p.join() print('All subprocesses done.') ``` task 0,1,2,3 是立刻执行的,而 task 4 要等待前面某个 task 完成后才执行,这是因为 Pool 的默认大小为CPU的核数,也就是 4,因此,最多同时执行 4 个进程。这是 Pool 有意设计的限制,并不是操作系统的限制 如果改成: ```python p = Pool(5) ``` 就可以同时跑 5 个进程。 执行结果如下: ```python Parent process 669. Waiting for all subprocesses done... Run task 0 (671)... Run task 1 (672)... Run task 2 (673)... Run task 3 (674)... Task 2 runs 0.14 seconds. Run task 4 (673)... Task 1 runs 0.27 seconds. Task 3 runs 0.86 seconds. Task 0 runs 1.41 seconds. Task 4 runs 1.91 seconds. All subprocesses done. ``` ### 子进程 很多时候,子进程并不是自身,而是一个外部进程。我们创建了子进程后,还需要控制子进程的输入和输出。 subprocess 模块可以让我们非常方便地启动一个子进程,然后控制其输入和输出。 ```python import subprocess print('$ nslookup www.python.org') r = subprocess.call(['nslookup', 'www.python.org']) print('Exit code:', r) ``` 这个演示了如何在 Python 代码中运行命令 nslookup www.python.org,这和命令行直接运行的效果是一样。 如果子进程还需要输入,则可以通过 communicate() 方法输入: ```python import subprocess print('$ nslookup') p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) output, err = p.communicate(b'set q=mx\npython.org\nexit\n') print(output.decode('utf-8')) print('Exit code:', p.returncode) ``` 相当于在命令行执行命令 nslookup,然后手动输入: ```python set q=mx python.org exit ``` ### 进程间通信 Process 之间肯定是需要通信的,操作系统提供了很多机制来实现进程间的通信。Python 的 multiprocessing 模块包装了底层的机制,提供了 Queue、Pipes 等多种方式来交换数据。 我们以 Queue 为例,在父进程中创建两个子进程,一个往 Queue 里写数据,一个从 Queue 里读数据: ```python from multiprocessing import Process, Queue import os, time, random # 写数据进程执行的代码: def write(q): print('Process to write: %s' % os.getpid()) for value in ['A', 'B', 'C']: print('Put %s to queue...' % value) q.put(value) time.sleep(random.random()) # 读数据进程执行的代码: def read(q): print('Process to read: %s' % os.getpid()) while True: value = q.get(True) print('Get %s from queue.' % value) if __name__=='__main__': # 父进程创建Queue,并传给各个子进程: q = Queue() pw = Process(target=write, args=(q,)) pr = Process(target=read, args=(q,)) # 启动子进程pw,写入: pw.start() # 启动子进程pr,读取: pr.start() # 等待pw结束: pw.join() # pr进程里是死循环,无法等待其结束,只能强行终止: pr.terminate() ``` 运行结果如下: ```python Process to write: 50563 Put A to queue... Process to read: 50564 Get A from queue. Put B to queue... Get B from queue. Put C to queue... Get C from queue. ``` 在 Unix/Linux 下,multiprocessing 模块封装了 fork() 调用,使我们不需要关注 fork() 的细节。由于 Windows 没有 fork 调用,因此,multiprocessing 需要 “模拟” 出 fork 的效果,父进程所有 Python 对象都必须通过 pickle 序列化再传到子进程去,所以,如果 multiprocessing 在 Windows 下调用失败了,要先考虑是不是 pickle 失败了。 ## 多线程 多任务可以由多进程完成,也可以由一个进程内的多线程完成。 线程是操作系统直接支持的执行单元,因此,高级语言通常都内置多线程的支持,Python 也不例外,并且,Python 的线程是真正的 Posix Thread,而不是模拟出来的线程。 Python 的标准库提供了两个模块:_thread 和 threading,_thread 是低级模块,threading 是高级模块,对_thread 进行了封装。绝大多数情况下,我们只需要使用 threading 这个高级模块。 启动一个线程就是把一个函数传入并创建 Thread 实例,然后调用 start() 开始执行: ```python import time, threading # 新线程执行的代码: def loop(): print('thread %s is running...' % threading.current_thread().name) n = 0 while n < 5: n = n + 1 print('thread %s >>> %s' % (threading.current_thread().name, n)) time.sleep(1) print('thread %s ended.' % threading.current_thread().name) print('thread %s is running...' % threading.current_thread().name) t = threading.Thread(target=loop, name='LoopThread') t.start() t.join() print('thread %s ended.' % threading.current_thread().name) ``` 结果为: ```python thread MainThread is running... thread LoopThread is running... thread LoopThread >>> 1 thread LoopThread >>> 2 thread LoopThread >>> 3 thread LoopThread >>> 4 thread LoopThread >>> 5 thread LoopThread ended. thread MainThread ended. ``` 由于任何进程默认就会启动一个线程,我们把该线程称为主线程,主线程又可以启动新的线程 Python 的 threading 模块有个 current_thread() 函数,它永远返回当前线程的实例。 主线程实例的名字叫 MainThread,子线程的名字在创建时指定,我们用 LoopThread 命名子线程。名字仅仅在打印时用来显示,完全没有其他意义,如果不起名字 Python 就自动给线程命名为 Thread-1,Thread-2…… ### Lock 多线程和多进程最大的区别在于,多进程,同样一个变量各自有一份拷贝在每个进程中。 而多线程,所有变量都由所有线程共享。所以,任何一个变量都可以被任何一个线程修改,因此,线程之间共享数据最大的危险在于多个线程同时改一个变量,把内容给改乱了。 比如 ```python import time, threading # 假定这是你的银行存款: balance = 0 def change_it(n): # 先存后取,结果应该为0: global balance balance = balance + n balance = balance - n def run_thread(n): for i in range(100000): change_it(n) t1 = threading.Thread(target=run_thread, args=(5,)) t2 = threading.Thread(target=run_thread, args=(8,)) t1.start() t2.start() t1.join() t2.join() print(balance) ``` 我们定义一个共享变量balance,初始值为0. 然后启动两个线程,先存后取,理论上应该一直为0. 但是由于线程的调度由操作系统决定,当 t1、t2 交替执行时,只要循环次数足够多,balance 的结果就不一定是 0 了。 原因是即使是简单的一条语句在 CPU 执行时是若干条语句 ```python balance = balance + n ``` 也分两步: - 计算 balance + n,存入临时变量中; - 将临时变量的值赋给 balance。 也就是可以看成: ```python x = balance + n balance = x ``` 而x就是局部变量,每个线程各有自己的x 如果操作系统以下面的顺序执行 t1、t2: ```python 初始值 balance = 0 t1: x1 = balance + 5 # x1 = 0 + 5 = 5 t2: x2 = balance + 8 # x2 = 0 + 8 = 8 t2: balance = x2 # balance = 8 t1: balance = x1 # balance = 5 t1: x1 = balance - 5 # x1 = 5 - 5 = 0 t1: balance = x1 # balance = 0 t2: x2 = balance - 8 # x2 = 0 - 8 = -8 t2: balance = x2 # balance = -8 结果 balance = -8 ``` 这是因为修改balance需要多条语句,执行这几条语句,线程可能中断,从而导致多个线程把同一个对象改乱了。 如果要确保 balance 计算正确,需要给 change_it() 上一把锁。 当某个线程开始执行 change_it() 时,我们说,该线程因为获得了锁,因此其他线程不能同时执行 change_it(),只能等待,直到锁被释放后,获得该锁以后才能改。由于锁只有一个,无论多少线程,同一时刻最多只有一个线程持有该锁,所以,不会造成修改的冲突。创建一个锁就是通过 threading.Lock() 来实现: ```python balance = 0 lock = threading.Lock() def run_thread(n): for i in range(100000): # 先要获取锁: lock.acquire() try: # 放心地改吧: change_it(n) finally: # 改完了一定要释放锁: lock.release() ``` 当多个线程同时执行 lock.acquire() 时,只有一个线程能成功地获取锁,然后继续执行代码,其他线程就继续等待直到获得锁为止。 获得锁的线程用完后一定要释放锁,否则那些苦苦等待锁的线程将永远等待下去,成为死线程。所以我们用 try...finally 来确保锁一定会被释放。 锁的好处就是确保了某段关键代码只能由一个线程从头到尾完整地执行,坏处当然也很多,首先是阻止了多线程并发执行,包含锁的某段代码实际上只能以单线程模式执行,效率就大大地下降了。 其次,由于可以存在多个锁,不同的线程持有不同的锁,并试图获取对方持有的锁时,可能会造成死锁,导致多个线程全部挂起,既不能执行,也无法结束,只能靠操作系统强制终止。 ### 多核CPU 要想把 N 核 CPU 的核心全部跑满,就必须启动 N 个死循环线程。 现在启动一个与CPU核心数量相同的N个线程 ```python import threading, multiprocessing def loop(): x = 0 while True: x = x ^ 1 for i in range(multiprocessing.cpu_count()): t = threading.Thread(target=loop) t.start() ``` 但是4 核 CPU 上可以监控到 CPU 占用率仅有 102%,也就是仅使用了一核。 但是用 C、C++ 或 Java 来改写相同的死循环,直接可以把全部核心跑满,4 核就跑到 400%,8 核就跑到 800%,为什么 Python 不行呢? 因为 Python 的线程虽然是真正的线程,但解释器执行代码时,有一个 GIL 锁:Global Interpreter Lock,任何 Python 线程执行前,必须先获得 GIL 锁,然后,每执行 100 条字节码,解释器就自动释放 GIL 锁,让别的线程有机会执行。这个 GIL 全局锁实际上把所有线程的执行代码都给上了锁,所以,多线程在 Python 中只能交替执行,即使 100 个线程跑在 100 核 CPU 上,也只能用到 1 个核。 GIL 是 Python 解释器设计的历史遗留问题,通常我们用的解释器是官方实现的 CPython,要真正利用多核,除非重写一个不带 GIL 的解释器。 所以,在 Python 中,可以使用多线程,但不要指望能有效利用多核。如果一定要通过多线程利用多核,那只能通过 C 扩展来实现,不过这样就失去了 Python 简单易用的特点。 不过,也不用过于担心,Python 虽然不能利用多线程实现多核任务,但可以通过多进程实现多核任务。多个 Python 进程有各自独立的 GIL 锁,互不影响。 ## ThreadLocal 在多线程环境下,每个线程都有自己的数据。一个线程使用自己的局部变量比使用全局变量好,因为局部变量只有线程自己能看见,不会影响其他线程,而全局变量的修改必须加锁。 但是局部变量也有问题,就是在函数调用的时候,传递起来很麻烦: ```python def process_student(name): std = Student(name) # std是局部变量,但是每个函数都要用它,因此必须传进去: do_task_1(std) do_task_2(std) def do_task_1(std): do_subtask_1(std) do_subtask_2(std) def do_task_2(std): do_subtask_2(std) do_subtask_2(std) ``` 每个函数如果要使用局部变量,则需要一层一层的传进去。 我们也可以用一个全局 dict 存放所有的 Student 对象,然后以 thread 自身作为 key 获得线程对应的 Student 对象 ```python global_dict = {} def std_thread(name): std = Student(name) # 把std放到全局变量global_dict中: global_dict[threading.current_thread()] = std do_task_1() do_task_2() def do_task_1(): # 不传入std,而是根据当前线程查找: std = global_dict[threading.current_thread()] ... def do_task_2(): # 任何函数都可以查找出当前线程的std变量: std = global_dict[threading.current_thread()] ... ``` 这样写理论上是可行的,但是代码可读性比较差。 所以ThreadLocal 应运而生,ThreadLocal 帮你查找 dict ```python import threading # 创建全局ThreadLocal对象: local_school = threading.local() def process_student(): # 获取当前线程关联的student: std = local_school.student print('Hello, %s (in %s)' % (std, threading.current_thread().name)) def process_thread(name): # 绑定ThreadLocal的student: local_school.student = name process_student() t1 = threading.Thread(target= process_thread, args=('Alice',), name='Thread-A') t2 = threading.Thread(target= process_thread, args=('Bob',), name='Thread-B') t1.start() t2.start() t1.join() t2.join() ``` 执行结果: ```python Hello, Alice (in Thread-A) Hello, Bob (in Thread-B) ``` 全局变量 local_school 就是一个 ThreadLocal 对象,每个 Thread 对它都可以读写 student 属性,但互不影响。 可以把 local_school 看成全局变量,但每个属性如 local_school.student 都是线程的局部变量,可以任意读写而互不干扰,也不用管理锁的问题,ThreadLocal 内部会处理。 可以理解为全局变量 local_school 是一个 dict,不但可以用 local_school.student,还可以绑定其他变量,如 local_school.teacher 等等。 ThreadLocal 最常用的地方就是为每个线程绑定一个数据库连接,HTTP 请求,用户身份信息等,这样一个线程的所有调用到的处理函数都可以非常方便地访问这些资源。 > ThreadLocal 解决了参数在一个线程中各个函数之间互相传递的问题。 ## 分布式进程 在 Thread 和 Process 中,应当优选 Process,因为 Process 更稳定,而且,Process 可以分布到多台机器上,而 Thread 最多只能分布到同一台机器的多个 CPU 上。 Python 的 multiprocessing 模块不但支持多进程,其中 managers 子模块还支持把多进程分布到多台机器上。一个服务进程可以作为调度者,将任务分布到其他多个进程中,依靠网络通信。 举个例子:如果我们已经有一个通过 Queue 通信的多进程程序在同一台机器上运行,现在,由于处理任务的进程任务繁重,希望把发送任务的进程和处理任务的进程分布到两台机器上。怎么用分布式进程实现? 原有的 Queue 可以继续使用,但是,通过 managers 模块把 Queue 通过网络暴露出去,就可以让其他机器的进程访问 Queue 了。 我们先看服务进程,服务进程负责启动 Queue,把 Queue 注册到网络上,然后往 Queue 里面写入任务: ```python import random, time, queue from multiprocessing.managers import BaseManager # 发送任务的队列: task_queue = queue.Queue() # 接收结果的队列: result_queue = queue.Queue() # 从BaseManager继承的QueueManager: class QueueManager(BaseManager): pass # 把两个Queue都注册到网络上, callable参数关联了Queue对象: QueueManager.register('get_task_queue', callable=lambda: task_queue) QueueManager.register('get_result_queue', callable=lambda: result_queue) # 绑定端口5000, 设置验证码'abc': manager = QueueManager(address=('', 5000), authkey=b'abc') # 启动Queue: manager.start() # 获得通过网络访问的Queue对象: task = manager.get_task_queue() result = manager.get_result_queue() # 放几个任务进去: for i in range(10): n = random.randint(0, 10000) print('Put task %d...' % n) task.put(n) # 从result队列读取结果: print('Try get results...') for i in range(10): r = result.get(timeout=10) print('Result: %s' % r) # 关闭: manager.shutdown() print('master exit.') ``` 请注意,当我们在一台机器上写多进程程序时,创建的 Queue 可以直接拿来用,但是,在分布式多进程环境下,添加任务到 Queue 不可以直接对原始的 task_queue 进行操作,必须通过 manager.get_task_queue() 获得的 Queue 接口添加,否则那样就绕过了 QueueManager 的封装, 然后,在另一台机器上启动任务进程(本机上启动也可以): ```python import time, sys, queue from multiprocessing.managers import BaseManager # 创建类似的QueueManager: class QueueManager(BaseManager): pass # 由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字: QueueManager.register('get_task_queue') QueueManager.register('get_result_queue') # 连接到服务器,也就是运行task_master.py的机器: server_addr = '127.0.0.1' print('Connect to server %s...' % server_addr) # 端口和验证码注意保持与task_master.py设置的完全一致: m = QueueManager(address=(server_addr, 5000), authkey=b'abc') # 从网络连接: m.connect() # 获取Queue的对象: task = m.get_task_queue() result = m.get_result_queue() # 从task队列取任务,并把结果写入result队列: for i in range(10): try: n = task.get(timeout=1) print('run task %d * %d...' % (n, n)) r = '%d * %d = %d' % (n, n, n*n) time.sleep(1) result.put(r) except Queue.Empty: print('task queue is empty.') # 处理结束: print('worker exit.') ``` 任务进程要通过网络连接到服务进程,所以要指定服务进程的 IP。 现在,可以试试分布式进程的工作效果了。先启动 task_master.py 服务进程: ```python $ python3 task_master.py Put task 3411... Put task 1605... Put task 1398... Put task 4729... Put task 5300... Put task 7471... Put task 68... Put task 4219... Put task 339... Put task 7866... Try get results... ``` task_master.py 进程发送完任务后,开始等待 result 队列的结果。现在启动 task_worker.py 进程: ```python $ python3 task_worker.py Connect to server 127.0.0.1... run task 3411 * 3411... run task 1605 * 1605... run task 1398 * 1398... run task 4729 * 4729... run task 5300 * 5300... run task 7471 * 7471... run task 68 * 68... run task 4219 * 4219... run task 339 * 339... run task 7866 * 7866... worker exit. ``` task_worker.py 进程结束,在 task_master.py 进程中会继续打印出结果: ```python Result: 3411 * 3411 = 11634921 Result: 1605 * 1605 = 2576025 Result: 1398 * 1398 = 1954404 Result: 4729 * 4729 = 22363441 Result: 5300 * 5300 = 28090000 Result: 7471 * 7471 = 55815841 Result: 68 * 68 = 4624 Result: 4219 * 4219 = 17799961 Result: 339 * 339 = 114921 Result: 7866 * 7866 = 61873956 ``` Master/Worker 模型有什么用? 其实这就是一个简单但真正的分布式计算,把代码稍加改造,启动多个 worker,就可以把任务分布到几台甚至几十台机器上,比如把计算 n*n 的代码换成发送邮件,就实现了邮件队列的异步发送。 Queue 对象存储在哪?注意到 task_worker.py 中根本没有创建 Queue 的代码,所以,Queue 对象存储在 task_master.py 进程中: ```python ┌─────────────────────────────────────────┐ ┌──────────────────────────────────────┐ │task_master.py │ │ │task_worker.py │ │ │ │ │ │ task = manager.get_task_queue() │ │ │ task = manager.get_task_queue() │ │ result = manager.get_result_queue() │ │ result = manager.get_result_queue() │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ ▼ │ │ │ │ │ │ ┌─────────────────────────────────┐ │ │ │ │ │ │QueueManager │ │ │ │ │ │ │ │ ┌────────────┐ ┌──────────────┐ │ │ │ │ │ │ │ │ task_queue │ │ result_queue │ │<───┼──┼──┼──────────────┘ │ │ │ └────────────┘ └──────────────┘ │ │ │ │ │ └─────────────────────────────────┘ │ │ │ │ └─────────────────────────────────────────┘ └──────────────────────────────────────┘ │ Network ``` 而 Queue 之所以能通过网络访问,就是通过 QueueManager 实现的。由于 QueueManager 管理的不止一个 Queue,所以,要给每个 Queue 的网络调用接口起个名字,比如 get_task_queue。 authkey 有什么用?这是为了保证两台机器正常通信,不被其他机器恶意干扰。如果 task_worker.py 的 authkey 和 task_master.py 的 authkey 不一致,肯定连接不上。 注意 Queue 的作用是用来传递任务和接收结果,每个任务的描述数据量要尽量小。比如发送一个处理日志文件的任务,就不要发送几百兆的日志文件本身,而是发送日志文件存放的完整路径,由 Worker 进程再去共享的磁盘上读取文件。