越来越多的框架往异步的方向发展,包括django后续的3.x版本也出了asgi版本,包括tornado,fastapi,aiohttp都在异步。
精通顺序:
- 协程 异步编程都是基于协程实现的
- asyncio python中专门进行异步编程的模块
- 实战案例 了解前面的基础,实战也很简单
- 协程
协程,和进程/线程不一样,协程不是计算机提供的,而是由程序员人为创造出来的,它是由一个线程,在代码中游走切换运行的东西。
实现协程主流有这么几种方法:
- greenlet,早期模块
- yeild关键字
- asyncio模块 python内置模块,python3.4引入,使用装饰器
- asyncio模块 python内置模块,python3.5之后引入,使用async和await关键字 #目前主流
# 通过greenlet实现协程,需要pip提前安装,pip install greenlet,不推荐学习了,仅了解即可
from greenlet import greenlet
def func1():
print(1) # 第2步,输出1
gr2.switch() # 第3步,切换到func2函数
print(2) # 第6步,输出2
gr2.switch() # 第7步, 切换到func2函数,从上一次执行的位置继续执行
def func2():
print(3) # 第4步,输出3
gr1.switch() # 第5步,切换到func1函数,从上一次执行的位置继续向后执行
print(4) # 第8步,输出4
gr1 = greenlet(func1)
gr2 = greenlet(func2)
gr1.switch() # 第1步,去执行func1函数
1
3
2
4
# 通过yield关键字实现,不推荐学习了,仅了解即可
def func1():
yield 1
yield from func2()
yield 2
def func2():
yield 3
yield 4
f1 = func1() # f1变成了生成器对象
for item in f1:
print(item)
1
3
4
2
# 通过asyncio模块实现,python3.4+支持,不推荐学习了,仅了解即可
import asyncio
@asyncio.coroutine
def func1():
print(1)
yield from asyncio.sleep(2) # 遇到IO耗时的操作时,自动切换到tasks中的其他任务
print(2)
@asyncio.coroutine
def func2():
print(3)
yield from asyncio.sleep(2) # 遇到IO耗时的操作时,自动切换到tasks中的其他任务
print(4)
tasks = [
asyncio.ensure_future(func1()),
asyncio.ensure_future(func2())
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
---------------------------------------------------------------------------
RuntimeError Traceback (most recent call last)
~\AppData\Local\Temp\ipykernel_26780\3601692985.py in
22
23 loop = asyncio.get_event_loop()
---> 24 loop.run_until_complete(asyncio.wait(tasks))
c:\users\xhs20\appdata\local\programs\python\python37\lib\asyncio\base_events.py in run_until_complete(self, future)
553 future.add_done_callback(_run_until_complete_cb)
554 try:
--> 555 self.run_forever()
556 except:
557 if new_task and future.done() and not future.cancelled():
c:\users\xhs20\appdata\local\programs\python\python37\lib\asyncio\base_events.py in run_forever(self)
508 self._check_closed()
509 if self.is_running():
--> 510 raise RuntimeError('This event loop is already running')
511 if events._get_running_loop() is not None:
512 raise RuntimeError(
RuntimeError: This event loop is already running
1
3
2
4
在jupyter nootebook中执行上述代码会报错,在jupyter notebook中,事件循环是自动启动的,因此再次启动会抛出此错误。
我在vscode测试执行,是不会报错的,如果非要在jupyter notebook中执行,可以使用 nest_asyncio 允许嵌套事件循环
pip install nest_asyncio 然后在前面加入这两行
import nest_asyncio nest_asyncio.apply() ...
- asyncio遇到阻塞会自动切换的,而greenlet和yield则不会,就这一点而言,asyncio已经高级了很多
- asyncio 无法直接判断某段代码是否会阻塞,需要开发者保证协程中的代码使用非阻塞的异步方法(如 await asyncio.sleep() 而不是 time.sleep())。
避免阻塞的方法包括:
- 使用 await 或异步操作显式让出控制权。
- 替换同步 I/O 或 CPU 密集型任务为异步或并行版本。
- 使用线程池或进程池处理阻塞任务。
# 通过asyncio模块和async、await关键字实现,python3.6+支持,需要熟练掌握
import asyncio
async def func1():
print(1)
await asyncio.sleep(2) # 遇到IO耗时的操作时,自动切换到tasks中的其他任务
print(2)
async def func2():
print(3)
await asyncio.sleep(2) # 遇到IO耗时的操作时,自动切换到tasks中的其他任务
print(4)
tasks = [
asyncio.ensure_future(func1()),
asyncio.ensure_future(func2())
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
1
3
2
4
({ result=None>,
result=None>},
set())
协程的意义
在一个线程中,如果遇到IO等待时间,线程不会傻傻等待,利用空闲时间去干点其他事儿
案例:下载三张图片
# 同步的方式,vscode中运行的
import requests
from datetime import datetime
def download_img(url):
print("开始下载:"+url)
response = requests.get(url)
print("下载完成!")
filename = url.rsplit('_')[-1]
with open(filename, mode='wb') as file_object:
file_object.write(response.content)
start = datetime.now()
if __name__ == "__main__":
url_list = [
'https://car3.autoimg.cn/cardfs/product/g31/M07/97/EA/1400x0_1_q95_autohomecar__ChtlyGXweMqARy51ABjnnkZN7yg825.jpg',
'https://car3.autoimg.cn/cardfs/product/g31/M09/97/E6/1400x0_1_q95_autohomecar__ChtlyGXweMaADgFRABimVylPS_4278.jpg',
'https://car2.autoimg.cn/cardfs/product/g32/M05/7A/92/1400x0_1_q95_autohomecar__Chtk2WdETmSAGDybAA89-n_2I0c069.jpg'
]
for item in url_list:
download_img(item)
timeDelta = datetime.now() - start
print("耗时:", timeDelta)
开始下载:https://car3.autoimg.cn/cardfs/product/g31/M07/97/EA/1400x0_1_q95_autohomecar__ChtlyGXweMqARy51ABjnnkZN7yg825.jpg
下载完成!
开始下载:https://car3.autoimg.cn/cardfs/product/g31/M09/97/E6/1400x0_1_q95_autohomecar__ChtlyGXweMaADgFRABimVylPS_4278.jpg
下载完成!
开始下载:https://car2.autoimg.cn/cardfs/product/g32/M05/7A/92/1400x0_1_q95_autohomecar__Chtk2WdETmSAGDybAA89-n_2I0c069.jpg
下载完成!
耗时: 0:00:03.583362
# 异步使用asyncio和aiohttp的方式,vscode中运行的
import aiohttp
import asyncio
from datetime import datetime
async def fetch(session, url):
print("发送请求:", url)
async with session.get(url, verify_ssl=False) as response:
content = await response.content.read()
file_name = url.rsplit('_')[-1]
with open(file_name, mode='wb') as file_object:
file_object.write(content)
async def main():
async with aiohttp.ClientSession() as session:
url_list = [
'https://car3.autoimg.cn/cardfs/product/g31/M07/97/EA/1400x0_1_q95_autohomecar__ChtlyGXweMqARy51ABjnnkZN7yg825.jpg',
'https://car3.autoimg.cn/cardfs/product/g31/M09/97/E6/1400x0_1_q95_autohomecar__ChtlyGXweMaADgFRABimVylPS_4278.jpg',
'https://car2.autoimg.cn/cardfs/product/g32/M05/7A/92/1400x0_1_q95_autohomecar__Chtk2WdETmSAGDybAA89-n_2I0c069.jpg'
]
tasks = [asyncio.create_task(fetch(session, url)) for url in url_list]
await asyncio.wait(tasks)
start = datetime.now()
if __name__ == '__main__':
asyncio.run(main())
timeDelta = datetime.now() - start
print("耗时:", timeDelta)
运行结果:
asyncio事件循环
可以理解为一个死循环,他会去检测并执行某些代码。
为了方便理解,做以下伪代码:
任务列表 = [ 任务1, 任务2,任务3, ... ]
while True:
可执行的任务列表,已完成的任务列表 = 去任务列表中检查所有的任务,将“可执行”和“已完成”的任务返回
for 就绪任务 in 可执行的任务列表:
执行该已就绪的任务
for 已完成的任务 in 已完成的任务列表:
在任务列表中移除该已完成的任务
如果 任务列表中的任务都已完成,则终止循环
loop = asyncio.get_event_loop() # 生成一个事件循环列表
loop.run_until_complete(任务) # 将“任务”放到任务列表
快速上手
简单粗暴的核心概念解释:
- 协程函数:定义函数的时候,async def 函数名
- 协程对象:协程函数加括号
async def func():
pass
result = func()
上面几行简单的代码中,func就是协程函数,result就是协程对象。
注意,这里的虽然有result = func(),函数后面加括号?但这里func内部的代码不会被执行的
如果想要运行协程函数中的代码,必须要将协程对象交给事件循环来处理。
import asyncio
async def func():
print("运行协程函数了!")
result = func()
loop = asyncio.get_event_loop()
loop.run_until_complete(result)
运行协程函数!
上面的写法,是asyncio在早期python版本中的写法,在python3.7后还有更加简便的方式,不用再定义事件循环对象,一条命令搞定。
import asyncio
async def func():
print("运行协程函数了!")
result = func()
asyncio.run(result) # 要习惯这种
运行协程函数了!
await关键字
await -- async wait
用法:await + 可等待对象(协程对象,Future,Task对象)或者说IO等待
# 无实际意义的实例😈
import asyncio
async def func():
print("来")
response = await asyncio.sleep(2)
print("结束", response)
asyncio.run(func())
来
结束 None
# 协程对象调用协程对象,协程对象的嵌套
import asyncio
async def others():
print("start")
await asyncio.sleep(2)
print('end')
return "返回值"
async def func():
print("执行协程函数内部代码")
# 遇到IO操作挂起当前协程(任务),等IO操作完成后再往下继续执行,当前协程挂起时,事件循环可以去执行其他协程(任务)
response = await others()
print("IO请求结束,结果为:", response)
asyncio.run(func())
执行协程函数内部代码
start
end
IO请求结束,结果为: 返回值
以上示例中协程对象func()调用了协程对象others(),说明协程对象之间是可以嵌套的。
# 反向案例,帮助理解await,下面的代码因为有await,会串行执行(看输出很明显),看似是两个任务,但这两个任务没有放到事件循环中,故会串行执行
# 执行协程对象过程,遇到await就等,等到await后面的式子有结果了,才会继续往下执行
import asyncio
async def others():
print("start")
await asyncio.sleep(2)
print('end')
return "返回值"
async def func():
print("执行协程函数内部代码")
response1 = await others()
print("IO请求结束,结果为:", response1)
response2 = await others() # 只是增加了这两行
print("IO请求结束,结果为:", response2)
asyncio.run(func())
执行协程函数内部代码
start
end
IO请求结束,结果为: 返回值
start
end
IO请求结束,结果为: 返回值
task对象
将多个任务添加到事件循环中
Tasks用于并发协程调度,通过asyncio.create_task("协程对象")的方式创建Task对象,这样可以让协程加入事件循环中等待被调度执行。除了使用asyncio.create_task()函数外,还可以使用低层级的loop.create_task()或ensure_future()函数。不建议手动实例化Task对象。
- python3.7之前推荐使用ensure_future()
- python3.7以及之后推荐使用asyncio.create_task()
import asyncio
async def func():
print(1)
await asyncio.sleep(2)
print(2)
return "返回值"
async def main():
print("main开始")
# 创建task对象,并将func()添加到事件循环中
task1 = asyncio.create_task(func())
task2 = asyncio.create_task(func())
print("main结束")
# 遇到await,进行线程切换
ret1 = await task1
ret2 = await task2
print(ret1, ret2)
asyncio.run(main())
main开始
main结束
1
1
2
2
返回值 返回值
# 上面的示例只是为了更好地理解,实际情况下,可能像下面这样写:
import asyncio
async def func():
print(1)
await asyncio.sleep(2)
print(2)
return "返回值"
async def main():
print("main开始")
# 创建task对象,并将func()添加到事件循环中
task_list = [
asyncio.create_task(func()),
asyncio.create_task(func())
]
print("main结束")
# 遇到await,进行线程切换
done, pending = await asyncio.wait(task_list, timeout=None) # done就是两个任务的返回值, pending一般情况下没什么用,设置了timeout才用
print(done,'类型:',type(done))
print(pending,'类型:', type(pending))
asyncio.run(main())
main开始
main结束
1
1
2
2
{ result='返回值'>, result='返回值'>} 类型:
set() 类型:
上面拿到的done集合,显示效果不好看,复制到下面并对齐:
{<Task finished coro=<func() done, defined at C:\Users\xhs20\AppData\Local\Temp\ipykernel_26780\2712902846.py:5> result='返回值'>,
<Task finished coro=<func() done, defined at C:\Users\xhs20\AppData\Local\Temp\ipykernel_26780\2712902846.py:5> result='返回值'>}
```
```python
# 对上面的代码做了简单修改,给task加上名字:
import asyncio
async def func():
print(1)
await asyncio.sleep(2)
print(2)
return "返回值"
async def main():
print("main开始")
# 创建task对象,并将func()添加到事件循环中
task_list = [
asyncio.create_task(func(), name='t1'),
asyncio.create_task(func(), name='t2')
]
print("main结束")
# 遇到await,进行线程切换
done, pending = await asyncio.wait(task_list, timeout=None) # done就是两个任务的返回值, pending一般情况下没什么用,设置了timeout才用
print(done,'类型:',type(done))
print(pending,'类型:', type(pending))
asyncio.run(main())
我在jupyter notebook运行上述代码会报错,可能是python3.7还不支持给task加name参数,我在vscode使用3.8的环境运行结果如下:
-
抢答题:上述代码中,最多存在多少个task?
答:3个,别忘了main也算一个
# 思考以下代码输出
import asyncio
async def func():
print(1)
await asyncio.sleep(2)
print(2)
return "返回值"
task_list = [
asyncio.create_task(func()),
asyncio.create_task(func())
]
done, pending = asyncio.run(asyncio.wait(task_list))
以上代码必然会报错,因为到asyncio.create_task(func()这一步的时候,它会将func()加入到task中,但目前没有task存在:
# 怎么改呢?这样:
import asyncio
async def func():
print(1)
await asyncio.sleep(2)
print(2)
return "返回值"
task_list = [
func(),
func()
]
done, pending = asyncio.run(asyncio.wait(task_list)) # asyncio.wait用于等待一组异步任务完成。
print(done)
asyncio.run()会创建一个 Task 对象来运行提供的协程。让我们详细分析 asyncio.run() 的行为和它与 Task 的关系。
asyncio.run(coro) 是一个便捷方法,它可以用来运行一个顶层的异步协程(coro),并在运行完成后关闭事件循环。它在 Python 3.7 中引入,简化了运行单个异步任务的常见模式。
coro 是一个协程对象(需要用 async def 定义)。
asyncio.run() 的核心逻辑:
- 创建一个新的事件循环
- 在事件循环中运行传入的协程
- 将传入的协程包装为 Task 对象,并调度运行
- 等待 Task 完成
- 关闭事件循环
- 返回协程运行的结果
asyncio.run() 适合运行一个顶层协程并简化事件循环的管理。
如果需要并发运行多个任务,可以在 asyncio.run() 中通过显式的 asyncio.create_task() 创建更多任务。
- 未完待续