# 协程(微线程,纤程)
[TOC]
---
## 基础概念
多线程和多进程的模型虽然解决了并发问题,但是系统不能无上限地增加线程。由于系统切换线程的开销也很大,所以,一旦线程数量过多,CPU的时间就花在线程切换上了,真正运行代码的时间就少了,结果导致性能严重下降。由于我们要解决的问题是CPU高速执行能力和IO设备的龟速严重不匹配,多线程和多进程只是解决这一问题的一种方法。
另一种解决IO问题的方法是异步IO。当代码需要执行一个耗时的IO操作时,它只发出IO指令,并不等待IO结果,然后就去执行其他代码了。一段时间后,当IO返回结果时,再通知CPU进行处理。
### 同步IO模型和异步IO模型
```python
# 同步IO模型:
do_some_code()
f = open('/path/to/file', 'r')
r = f.read() # <== 线程停在此处等待IO操作结果
# IO操作完成后线程才能继续执行:
do_some_code(r)
# 异步io模型
loop = get_event_loop()
# 需要一个消息循环
while True:
# 主线程不断地重复“读取消息-处理消息”这一过程
event = loop.get_event()
process_event(event)
```
在“发出IO请求”到收到“IO完成”的这段时间里,同步IO模型下,主线程只能挂起,但异步IO模型下,主线程并没有休息,而是在消息循环中继续处理其他消息。这样,**在异步IO模型下,一个线程就可以同时处理多个IO请求,并且没有切换线程的操作。对于大多数IO密集型的应用程序,使用异步IO将大大提升系统的多任务处理能力。**
协程是实现异步IO的高级形式,又称微线程,纤程。英文名Coroutine。
子程序,或者称为函数,在所有语言中都是层级调用,比如A调用B,B在执行过程中又调用了C,C执行完毕返回,B执行完毕返回,最后是A执行完毕。
子程序调用总是一个入口,一次返回,调用顺序是明确的。而协程的调用和子程序不同。协程看上去也是子程序,但执行过程中,在子程序内部可中断,然后转而执行别的子程序,在适当的时候再返回来接着执行。
```
def A():
print('1')
print('2')
print('3')
def B():
print('x')
print('y')
print('z')
# 协程执行结果,可能如下
1
2
x
y
3
z
```
看起来A、B的执行有点像多线程,但协程的特点在于是一个线程执行,那和多线程比,协程有何优势?
主要行效率比多线程高很多,主要表现一下两点:
1. 不用切换线程,而是由程序自身控制,因此,没有线程切换的开销,和多线程比,线程数量越多,协程的性能优势就越明显。
2. 不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了。
因为协程是一个线程执行,那怎么利用多核CPU呢?最简单的方法是多进程+协程,既充分利用多核,又充分发挥协程的高效率,可获得极高的性能。
Python对协程的支持是通过generator实现的。
在generator中,我们不但可以通过for循环来迭代,还可以不断调用next()函数获取由yield语句返回的下一个值。Python的yield不但可以返回一个值,它还可以接收调用者发出的参数。
```python
import inspect
def consumer():
r = ''
while True:
# 3. 通过yield拿到消息n(最开始send进来的n为none,不返回只做启动用)
# yield关键字右边可以不需要加表达式(yield默认返回None)
n = yield r
if not n:
return
# 4. 拿到n之后进行处理
print('[CONSUMER] Consuming %s...' % n)
# 5. 处理完成,再下个循环又通过yield返回
r = '200 OK'
def produce(c):
# 1. 调用c.send(None)启动生成器;
# GEN_CREATED: 等待开始执行
print(inspect.getgeneratorstate(c))
c.send(None)
n = 0
while n < 3:
n = n + 1
print('[PRODUCER] Producing %s...' % n)
# 2. 一旦生产了东西,通过c.send(n)切换到consumer执行
r = c.send(n)
print(inspect.getgeneratorstate(c))
# 6. 得到consumer处理的结果,再通过下一个循环,继续生产下一条消息
print('[PRODUCER] Consumer return: %s' % r)
# 在close前,状态 都是 GEN_SUSPENDED # 在yield表达式处暂停
print(inspect.getgeneratorstate(c))
# 7. produce决定不生产了,通过c.close()关闭consumer,整个过程结束。
c.close()
# close后 状态为GEN_CLOSED # 执行结束
print(inspect.getgeneratorstate(c))
c = consumer()
produce(c)
```
> 整个流程无锁,由一个线程执行,`produce`和`consumer`协作完成任务,所以称为“协程”,而非线程的抢占式多任务。
### 协程生成器的基本行为
协程有四个状态,可以使用`inspect.getgeneratorstate(...)`函数确定:
GEN_CREATED # 等待开始执行
GEN_RUNNING # 解释器正在执行(只有在多线程应用中才能看到这个状态)
GEN_SUSPENDED # 在yield表达式处暂停
GEN_CLOSED # 执行结束
### 生成器api
1. `.send(value)`方法,生成器可以使用`.send(...)`方法发送数据,发送的数据会成为生成器函数中yield表达式的值。如上列中的n和r
2. `.throw(...)`方法,让调用方抛出异常,在生成器中处理
3. `.close()`方法,终止生成器
## asyncio
asyncio是Python 3.4版本引入的标准库,直接内置了对异步IO的支持,asyncio的编程模型就是一个消息循环。我们从asyncio模块中直接获取一个EventLoop的引用,然后把需要执行的协程扔到EventLoop中执行,就实现了异步IO。
### @asyncio.coroutine
用asyncio提供的@asyncio.coroutine可以把一个generator标记为coroutine类型,然后在coroutine内部用yield from调用另一个coroutine实现异步操作。
```python
import asyncio
import threading
# @asyncio.coroutine把一个generator标记为coroutine类型
@asyncio.coroutine
def baby(num):
print('baby %s sleep! (%s)' % (num,threading.currentThread()))
# 异步调用asyncio.sleep(2)生成器: 假设是一个耗时2秒的IO操作,在此期间,主线程并未等待,而是去执行EventLoop中其他可以执行的coroutine了,因此可以实现并发执行。
yield from asyncio.sleep(2)
print('baby %s week up! (%s)' % (num,threading.currentThread()))
# 获取EventLoop: 事件循环对象
loop = asyncio.get_event_loop()
tasks = [baby(1), baby(2), baby(3)]
# 把上面coroutine扔到EventLoop中执行
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
'''
baby 1 sleep! (<_MainThread(MainThread, started 29028)>)
baby 2 sleep! (<_MainThread(MainThread, started 29028)>)
baby 3 sleep! (<_MainThread(MainThread, started 29028)>)
# (暂停约2秒,并且是在同一线程里面,实现了并发)
baby 1 week up! (<_MainThread(MainThread, started 29028)>)
baby 2 week up! (<_MainThread(MainThread, started 29028)>)
baby 3 week up! (<_MainThread(MainThread, started 29028)>)
1. baby(1)执行到yield,线程不会等待asyncio.sleep(),而是直接中断并执行下一个消息循环baby(2)
2. baby(2)执行到yield,线程不会等待asyncio.sleep(),而是直接中断并执行下一个消息循环baby(3)
3. baby(3)执行到yield,线程不会等待asyncio.sleep(),而是直接中断并执行消息循环baby(1),至此所有操作都是以极快的时间完成的,花费了极少时间,此时三个baby同时都在睡眠,(asyncio.sleep)
4. 等待baby(1)睡眠完成,此时几乎同时其他baby也的睡眠也结束了,所以接着执行各个baby的打印wake up操作.结束整个消息循环,run_until_complete结束.
'''
```
**用asyncio的异步网络连接来获取sina、sohu和163的网站首页**
```python
import asyncio
@asyncio.coroutine
def wget(host):
print('wget %s...' % host)
connect = asyncio.open_connection(host, 80)
reader, writer = yield from connect
header = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % host
writer.write(header.encode('utf-8'))
# 刷新底层传输的写缓冲区。也就是把需要发送出去的数据,从缓冲区发送出去。没有手工刷新,asyncio为你自动刷新了。当执行到reader.readline()时,asyncio知道应该把发送缓冲区的数据发送出去了。
yield from writer.drain()
while True:
line = yield from reader.readline()
if line == b'\r\n':
break
print('%s header > %s' % (host, line.decode('utf-8').rstrip()))
# Ignore the body, close the socket
writer.close()
loop = asyncio.get_event_loop()
tasks = [wget(host) for host in ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
```
### async/await
async和await是针对coroutine的新语法,要使用新的语法,只需要做两步简单的替换:
1. 把@asyncio.coroutine替换为async;
2. 把yield from替换为await。
使用async可以定义协程对象,使用await可以针对耗时的操作进行挂起,就像生成器里的yield一样,函数让出控制权。**协程遇到await,事件循环将会挂起该协程,执行别的协程,直到其他的协程也挂起或者执行完毕,再进行下一个协程的执行**。
```python
import asyncio
import threading
async def baby(num):
print('baby %s sleep! (%s)' % (num,threading.currentThread()))
await asyncio.sleep(1)
print('baby %s week up! (%s)' % (num,threading.currentThread()))
loop = asyncio.get_event_loop()
# ???? 执行完的顺序让人疑惑
tasks = [baby(2), baby(1), baby(3),baby(4),baby(5)]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
```
> tips: await 和 yield from 可以理解为 “不等了” (主线程是一个事件循环,执行到await,就“我不等了,您慢慢执行,我先走一步,好了再给我说”)
### 绑定回调
```python
import time
import asyncio
now = lambda : time.time()
async def do_some_work(x):
print('Waiting: ', x)
return 'Done after {}s'.format(x)
def callback(future):
print('Callback: ', future.result())
start = now()
coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
task.add_done_callback(callback)
loop.run_until_complete(task)
print('TIME: ', now() - start)
```
**利用future对象回调别的函数**
?? future对象的特性,以下代码不太懂 ??
```python
import asyncio
import functools
def callback(future, n):
print('{}: future done: {}'.format(n, future.result()))
async def register_callbacks(all_done):
print('registering callbacks on future')
# 偏函数配合回调,all_done是future对象
all_done.add_done_callback(functools.partial(callback, n=1))
all_done.add_done_callback(functools.partial(callback, n=2))
async def main(all_done):
# 到此同步中断,异步执行回调函数注册
await register_callbacks(all_done)
print('setting result of future')
all_done.set_result('the result')
event_loop = asyncio.get_event_loop()
try:
all_done = asyncio.Future()
event_loop.run_until_complete(main(all_done))
finally:
event_loop.close()
'''
registering callbacks on future
setting result of future
1: future done: the result
2: future done: the result
'''
```
### 多线程与asyncio对比
**多线程**
```python
# sinner_thread.py
import threading
import itertools
import time
import sys
# 这个类定义一个可变对象,用于从外部控制线程
class Signal:
go = True
# 这个函数会在单独的线程中运行,signal 参数是前边定义的Signal类的实例
def spin(msg, signal):
write, flush = sys.stdout.write, sys.stdout.flush
# itertools.cycle 函数从指定的序列中反复不断地生成元素
for char in itertools.cycle('|/-\\'):
status = char + ' ' + msg
write(status)
flush()
write('\x08' * len(status)) # 使用退格符把光标移回行首
time.sleep(.1) # 每 0.1 秒刷新一次
if not signal.go: # 如果 go属性不是 True,退出循环
break
write(' ' * len(status) + '\x08' * len(status)) # 使用空格清除状态消息,把光标移回开头
def slow_function(): # 模拟耗时操作
# 假装等待I/O一段时间
time.sleep(20) # 调用sleep 会阻塞主线程,这么做事为了释放GIL,创建从属线程
return 42
# 这个函数设置从属线程,显示线程对象,运行耗时计算,最后杀死进程
def supervisor():
signal = Signal()
spinner = threading.Thread(target=spin,
args=('thinking!', signal))
print('spinner object:', spinner) # 显示线程对象 输出 spinner object: <Thread(Thread-1, initial)>
spinner.start() # 启动从属进程
result = slow_function() # 运行slow_function 行数,阻塞主线程。同时从属线程以动画形式旋转指针
# python 并没有提供终止线程的API,所以若想关闭线程,必须给线程发送消息。这里我们使用signal.go 属性:在主线程中把它设置为False后,spinner 线程会接收到,然后退出
signal.go = False
spinner.join() # 等待spinner 线程结束
return result
def main():
result = supervisor()
print('Answer', result)
if __name__ == '__main__':
main()
```
**协程**
```python
# spinner_asyncio.py
# 通过协程以动画的形式显示文本式旋转指针
import asyncio
import itertools
import sys
async def spin(msg):
write, flush = sys.stdout.write, sys.stdout.flush
for char in itertools.cycle('|/-\\'): # itertools.cycle 函数从指定的序列中反复不断地生成元素
status = char + ' ' + msg
write(status)
flush()
write('\x08' * len(status)) # 使用退格符把光标移回行首
try:
# 使用 yield from asyncio.sleep(0.1) 代替 time.sleep(.1), 这样的休眠不会阻塞事件循环
# 除非想阻塞主线程,从而冻结事件循环或整个应用,否则不要再 asyncio 协程中使用 time.sleep().如果协程需要在一段时间内什么都不做,应该使用 yield from asyncio.sleep(DELAY)
# 此处相当于另一协程
await asyncio.sleep(0.1)
# 如果 spin 函数苏醒后抛出 asyncio.CancelledError 异常,其原因是发出了取消请求
except asyncio.CancelledError as e:
print(str(e))
break
write(' ' * len(status) + '\x08' * len(status)) # 使用空格清除状态消息,把光标移回开头
async def slow_function(): # 5 现在此函数是协程,使用休眠假装进行I/O 操作时,使用 yield from 继续执行事件循环
# 假装等待I/O一段时间
await asyncio.sleep(3) # 此表达式把控制权交给主循环,在休眠结束后回复这个协程
return 42
# ?? 不能改为asynic supervisor 否则asyncio.async会报错 ,已找到原因已被asyncio.ensure_future替代??
async def supervisor():
spinner = asyncio.ensure_future(spin('thinking!')) # asyncio.async() 函数排定协程的运行时间,使用一个 Task 对象包装spin 协程,并立即返回
print('spinner object:', spinner) # Task 对象,输出类似 spinner object: <Task pending coro=<spin() running at spinner_asyncio.py:6>>
# 驱动slow_function() 函数,结束后,获取返回值。同事事件循环继续运行,
# 因为slow_function 函数最后使用yield from asyncio.sleep(3) 表达式把控制权交给主循环
result = await slow_function()
# Task 对象可以取消;取消后会在协程当前暂停的yield处抛出 asyncio.CancelledError 异常
# 协程可以捕获这个异常,也可以延迟取消,甚至拒绝取消
spinner.cancel()
return result
def main():
loop = asyncio.get_event_loop() # 获取事件循环引用
# 驱动supervisor 协程,让它运行完毕;这个协程的返回值是这次调用的返回值
result = loop.run_until_complete(supervisor())
loop.close()
print('Answer', result)
if __name__ == '__main__':
main()
```
**分析两段代码**
1. Task对象不由自己动手实例化,而是通过把协程传给 asyncio.async(...) 函数或 loop.create_task(...) 方法获取Task 对象已经排定了运行时间;而Thread 实例必须调用start方法,明确告知它运行
2. 在线程版supervisor函数中,slow_function 是普通的函数,由线程直接调用,而异步版的slow_function 函数是协程,由yield from 驱动。
3. 没有API能从外部终止线程,因为线程随时可能被中断。而协程如果想终止任务,可以使用Task.cancel() 实例方法,在协程内部抛出CancelledError 异常。协程可以在暂停的yield 处捕获这个异常,处理终止请求
4. supervisor 协程必须在main 函数中由loop.run_until_complete 方法执行。
5. **协程和线程相比关键的一个优点是**线程必须记住保留锁,去保护程序中的重要部分,防止多步操作在执行的过程中中断,而协程默认会做好保护,我们必须显式产出(使用yield 或 yield from 交出控制权)才能让程序的余下部分运行。
#### asyncio.Future:故意不阻塞
asyncio.Future 类与 concurrent.futures.Future 类的接口基本一致,不过实现方式不同,不可互换。在 concurrent.futures.Future 中,future只是调度执行某物的结果。在 asyncio 包中,BaseEventLoop.create_task(...) 方法接收一个协程,排定它的运行时间,然后返回一个asyncio.Task 实例(也是asyncio.Future 类的实例,因为 Task 是 Future 的子类,用于包装协程。(在 concurrent.futures.Future 中,类似的操作是Executor.submit(...))。
与concurrent.futures.Future 类似,asyncio.Future 类也提供了:
* `.done()` 返回布尔值,表示Future 是否已经执行
* `.add_done_callback()` 这个方法只有一个参数,类型是可调用对象,Future运行结束后会回调这个对象。
* `.result()` 这个方法没有参数,因此不能指定超时时间。 如果调用 .result() 方法时期还没有运行完毕,会抛出 asyncio.InvalidStateError 异常。
### 协程嵌套 (协程的常用方式)
```python
import asyncio
import time
now = lambda: time.time()
async def do_some_work(x):
print('Waiting: ', x)
await asyncio.sleep(x)
return 'Done after {}s'.format(x)
async def main():
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)
tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]
'''
# wait 有timeout参数
dones, pendings = await asyncio.wait(tasks)
for task in dones:
print('Task ret: ', task.result())
'''
# 如果使用的是 asyncio.gather创建协程对象,那么await的返回值就是协程运行的结果。
# 使用asyncio.wait(tasks)返回的顺序有点难以理解,但使用asyncio.gather(*tasks)返回值的顺序就好理解得多
results = await asyncio.gather(*tasks)
for result in results:
print('Task ret: ', result)
start = now()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
print('TIME: ', now() - start)
```
**抛出返回值到run_until_complete:**
```python
async def main():
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)
tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]
return await asyncio.gather(*tasks)
start = now()
loop = asyncio.get_event_loop()
# 也可以直接返回到run_until_complete处理协程结果
results = loop.run_until_complete(main())
for result in results:
print('Task ret: ', result)
print('TIME: ', now() - start)
```
***使用as_completed**
```python
async def main(num):
tasks = []
i = 1
while i <= num :
tasks.append(asyncio.ensure_future(do_some_work(i)))
i += 1
for task in asyncio.as_completed(tasks):
result = await task
print('Task ret: {}'.format(result))
start = now()
loop = asyncio.get_event_loop()
done = loop.run_until_complete(main(10))
print('TIME: ', now() - start)
```
### 协程停止
future对象有几个状态:Pending,Running,Done,Cancelled. 创建future的时候,task为pending,事件循环调用执行的时候当然就是running,调用完毕自然就是done,如果需要停止事件循环,就需要先把task取消。可以使用asyncio.Task获取事件循环的task
```python
try:
loop = asyncio.get_event_loop()
loop.run_until_complete(main(5))
except KeyboardInterrupt as e:
for task in asyncio.Task.all_tasks():
# 启动事件循环之后,马上ctrl+c,会触发run_until_complete的执行异常 KeyBorardInterrupt。然后通过循环asyncio.Task取消future。
print(task)
# 返回true或false,已执行的返回false
print(task.cancel())
#loop stop之后还需要再次开启事件循环,最后再close,不然还会抛出异常:
# 抛出异常后要重新启动循环
loop.stop()
loop.run_forever()
finally:
loop.close()
```
**批量停止**
```python
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(main(5))
try:
loop.run_until_complete(task)
except KeyboardInterrupt as e:
print(asyncio.Task.all_tasks())
print('-------------------')
# 批量停止,如果全部停止成功就直接返回true,与上列不同
print(asyncio.gather(*asyncio.Task.all_tasks()).cancel())
loop.stop()
loop.run_forever()
finally:
loop.close()
```
### 不同线程的事件循环
很多时候,我们的事件循环用于注册协程,而有的协程需要动态的添加到事件循环中。一个简单的方式就是使用多线程。当前线程创建一个事件循环,然后在新建一个线程,在新线程中启动事件循环。当前线程不会被block。
```python
import asyncio
from threading import Thread
import time
now = lambda: time.time()
def start_loop(loop):
asyncio.set_event_loop(loop)
loop.run_forever()
def more_work(x):
print('More work {}'.format(x))
time.sleep(x)
print('Finished more work {}'.format(x))
start = now()
new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()
new_loop.call_soon_threadsafe(more_work, 6)
new_loop.call_soon_threadsafe(more_work, 3)
# 这里的计时没有意义,因为more_work具体的执行是在新的两个线程里面
print('TIME: {}'.format(now() - start))
```
启动上述代码之后,当前线程不会被block,新线程中会按照顺序执行call_soon_threadsafe方法注册的more_work方法,后者因为time.sleep操作是同步阻塞的,因此运行完毕more_work需要大致6 + 3
### 新线程协程
```python
def start_loop(loop):
asyncio.set_event_loop(loop)
loop.run_forever()
async def do_some_work(x):
print('Waiting {}'.format(x))
await asyncio.sleep(x)
print('Done after {}s'.format(x))
start = now()
new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()
asyncio.run_coroutine_threadsafe(do_some_work(6), new_loop)
asyncio.run_coroutine_threadsafe(do_some_work(4), new_loop)
print('TIME: {}'.format(time.time() - start))
```
上述的例子,主线程中创建一个new_loop,然后在另外的两个子线程中开启一个无限事件循环。主线程通过run_coroutine_threadsafe新注册协程对象。这样就能在子线程中进行事件循环的并发操作,同时主线程又不会被block。一共执行的时间大概在6s左右。
### master-worker主从模式
对于并发任务,通常是用生成消费模型,对队列的处理可以使用类似master-worker的方式,master主要用户获取队列的msg,worker用户处理消息。
为了简单起见,并且协程更适合单线程的方式,我们的主线程用来监听队列,子线程用于处理队列。这里使用redis的队列。主线程中有一个是无限循环,用户消费队列。
```python
while True:
task = rcon.rpop("queue")
if not task:
time.sleep(1)
continue
asyncio.run_coroutine_threadsafe(do_some_work(int(task)), new_loop)
```
### 停止子线程
如果一切正常,那么上面的例子很完美。可是,需要停止程序,直接ctrl+c,会抛出KeyboardInterrupt错误,我们修改一下主循环:
```python
try:
while True:
task = rcon.rpop("queue")
if not task:
time.sleep(1)
continue
asyncio.run_coroutine_threadsafe(do_some_work(int(task)), new_loop)
except KeyboardInterrupt as e:
print(e)
new_loop.stop()
```
可是实际上并不好使,虽然主线程try了KeyboardInterrupt异常,但是子线程并没有退出,为了解决这个问题,可以设置子线程为守护线程,这样当主线程结束的时候,子线程也随机退出。
```python
new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.setDaemon(True) # 设置子线程为守护线程
t.start()
try:
while True:
# print('start rpop')
task = rcon.rpop("queue")
if not task:
time.sleep(1)
continue
asyncio.run_coroutine_threadsafe(do_some_work(int(task)), new_loop)
except KeyboardInterrupt as e:
print(e)
new_loop.stop()
```
线程停止程序的时候,主线程退出后,子线程也随机退出才了,并且停止了子线程的协程任务。
```python
try:
while True:
# 用brpop方法,会block住task,如果主线程有消息,才会消费。
# 这种方式更适合队列消费,不用上面的要停顿一秒
_, task = rcon.brpop("queue")
asyncio.run_coroutine_threadsafe(do_some_work(int(task)), new_loop)
except Exception as e:
print('error', e)
new_loop.stop()
finally:
pass
```
### 协程消费
主线程用于监听队列,然后子线程的做事件循环的worker是一种方式。还有一种方式实现这种类似master-worker的方案。即把监听队列的无限循环挪进协程中。程序初始化就创建若干个协程,实现类似并行的效果。一般这个方案就可以了
```python
import time
import asyncio
import redis
now = lambda : time.time()
# 最多开多少个协程
MAX_COROUTINES = 10
def get_redis():
connection_pool = redis.ConnectionPool(host='127.0.0.1', db=1,port=6379)
# connection_pool = redis.ConnectionPool(host='172.28.3.24', db=1,port=6379)
return redis.Redis(connection_pool=connection_pool)
rcon = get_redis()
async def worker():
print('Start worker')
while True:
start = now()
task = rcon.rpop("queue")
if not task:
await asyncio.sleep(1)
continue
print('Wait ', int(task))
await asyncio.sleep(int(task))
print('Done ', task, now() - start)
def main():
i = 0
while i < MAX_COROUTINES:
asyncio.ensure_future(worker())
i += 1
loop = asyncio.get_event_loop()
try:
loop.run_forever()
except KeyboardInterrupt as e:
print(asyncio.gather(*asyncio.Task.all_tasks()).cancel())
loop.stop()
loop.run_forever()
finally:
loop.close()
if __name__ == '__main__':
main()
```
## aiohttp
asyncio可以实现单线程并发IO操作。如果仅用在客户端,发挥的威力不大。如果把asyncio用在服务器端,例如Web服务器,由于HTTP连接就是IO操作,因此可以用单线程+coroutine实现多用户的高并发支持。
asyncio实现了TCP、UDP、SSL等协议,aiohttp则是基于asyncio实现的HTTP框架。
### 实现web服务器
```python
import asyncio
from aiohttp import web
async def index(request):
await asyncio.sleep(0.5)
return web.Response(body=b'<h1>Index</h1>')
async def hello(request):
await asyncio.sleep(0.5)
text = '<h1>hello, %s!</h1>' % request.match_info['name']
return web.Response(body=text.encode('utf-8'))
async def init(loop):
app = web.Application(loop=loop)
app.router.add_route('GET', '/', index)
app.router.add_route('GET', '/hello/{name}', hello)
srv = await loop.create_server(app.make_handler(), '127.0.0.1', 8000)
print('Server started at http://127.0.0.1:8000...')
return srv
loop = asyncio.get_event_loop()
loop.run_until_complete(init(loop))
loop.run_forever()
```
## 例1,[使用asyncio 和 aiohttp 包下载国旗][1]
## 例2,[使用python-aiohttp爬取网易云音乐,今日头条,搭建微信公众平台][2]
[1]: https://gitee.com/nixi8/Python/tree/master/script
[2]: https://github.com/SigalHu/WeiXin