asyncio学习笔记

内容目录

越来越多的框架往异步的方向发展,包括django后续的3.x版本也出了asgi版本,包括tornado,fastapi,aiohttp都在异步。

精通顺序:

  • 协程 异步编程都是基于协程实现的
  • asyncio python中专门进行异步编程的模块
  • 实战案例 了解前面的基础,实战也很简单
  1. 协程

协程,和进程/线程不一样,协程不是计算机提供的,而是由程序员人为创造出来的,它是由一个线程,在代码中游走切换运行的东西。

实现协程主流有这么几种方法:

  • greenlet,早期模块
  • yeild关键字
  • asyncio模块 python内置模块,python3.4引入,使用装饰器
  • asyncio模块 python内置模块,python3.5之后引入,使用async和await关键字 #目前主流
# 通过greenlet实现协程,需要pip提前安装,pip install greenlet,不推荐学习了,仅了解即可

from greenlet import greenlet

def func1():
    print(1)         # 第2步,输出1
    gr2.switch()     # 第3步,切换到func2函数
    print(2)         # 第6步,输出2
    gr2.switch()     # 第7步, 切换到func2函数,从上一次执行的位置继续执行

def func2():
    print(3)          # 第4步,输出3
    gr1.switch()      # 第5步,切换到func1函数,从上一次执行的位置继续向后执行
    print(4)          # 第8步,输出4

gr1 = greenlet(func1)
gr2 = greenlet(func2)

gr1.switch()          # 第1步,去执行func1函数
1
3
2
4
# 通过yield关键字实现,不推荐学习了,仅了解即可

def func1():
    yield 1
    yield from func2()
    yield 2

def func2():
    yield 3
    yield 4

f1 = func1()  # f1变成了生成器对象

for item in f1:
    print(item)
1
3
4
2
# 通过asyncio模块实现,python3.4+支持,不推荐学习了,仅了解即可

import asyncio

@asyncio.coroutine
def func1():
    print(1)
    yield from asyncio.sleep(2) # 遇到IO耗时的操作时,自动切换到tasks中的其他任务
    print(2)

@asyncio.coroutine
def func2():
    print(3)
    yield from asyncio.sleep(2) # 遇到IO耗时的操作时,自动切换到tasks中的其他任务
    print(4)

tasks = [
    asyncio.ensure_future(func1()),
    asyncio.ensure_future(func2())
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
---------------------------------------------------------------------------

RuntimeError                              Traceback (most recent call last)

~\AppData\Local\Temp\ipykernel_26780\3601692985.py in 
     22 
     23 loop = asyncio.get_event_loop()
---> 24 loop.run_until_complete(asyncio.wait(tasks))

c:\users\xhs20\appdata\local\programs\python\python37\lib\asyncio\base_events.py in run_until_complete(self, future)
    553         future.add_done_callback(_run_until_complete_cb)
    554         try:
--> 555             self.run_forever()
    556         except:
    557             if new_task and future.done() and not future.cancelled():

c:\users\xhs20\appdata\local\programs\python\python37\lib\asyncio\base_events.py in run_forever(self)
    508         self._check_closed()
    509         if self.is_running():
--> 510             raise RuntimeError('This event loop is already running')
    511         if events._get_running_loop() is not None:
    512             raise RuntimeError(

RuntimeError: This event loop is already running

1
3
2
4

在jupyter nootebook中执行上述代码会报错,在jupyter notebook中,事件循环是自动启动的,因此再次启动会抛出此错误。
我在vscode测试执行,是不会报错的,如果非要在jupyter notebook中执行,可以使用 nest_asyncio 允许嵌套事件循环

pip install nest_asyncio 然后在前面加入这两行

import nest_asyncio
nest_asyncio.apply()
...

file

  • asyncio遇到阻塞会自动切换的,而greenlet和yield则不会,就这一点而言,asyncio已经高级了很多
  • asyncio 无法直接判断某段代码是否会阻塞,需要开发者保证协程中的代码使用非阻塞的异步方法(如 await asyncio.sleep() 而不是 time.sleep())。
避免阻塞的方法包括:

  • 使用 await 或异步操作显式让出控制权。

  • 替换同步 I/O 或 CPU 密集型任务为异步或并行版本。

  • 使用线程池或进程池处理阻塞任务。
# 通过asyncio模块和async、await关键字实现,python3.6+支持,需要熟练掌握

import asyncio

async def func1():
    print(1)
    await asyncio.sleep(2) # 遇到IO耗时的操作时,自动切换到tasks中的其他任务
    print(2)

async def func2():
    print(3)
    await asyncio.sleep(2) # 遇到IO耗时的操作时,自动切换到tasks中的其他任务
    print(4)

tasks = [
    asyncio.ensure_future(func1()),
    asyncio.ensure_future(func2())
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
1
3
2
4

({ result=None>,
   result=None>},
 set())
协程的意义

在一个线程中,如果遇到IO等待时间,线程不会傻傻等待,利用空闲时间去干点其他事儿

案例:下载三张图片

# 同步的方式,vscode中运行的

import requests
from datetime import datetime

def download_img(url):
    print("开始下载:"+url)
    response = requests.get(url)
    print("下载完成!")

    filename = url.rsplit('_')[-1]
    with open(filename, mode='wb') as file_object:
        file_object.write(response.content)
start = datetime.now()
if __name__ == "__main__":
    url_list = [
        'https://car3.autoimg.cn/cardfs/product/g31/M07/97/EA/1400x0_1_q95_autohomecar__ChtlyGXweMqARy51ABjnnkZN7yg825.jpg',
        'https://car3.autoimg.cn/cardfs/product/g31/M09/97/E6/1400x0_1_q95_autohomecar__ChtlyGXweMaADgFRABimVylPS_4278.jpg',
        'https://car2.autoimg.cn/cardfs/product/g32/M05/7A/92/1400x0_1_q95_autohomecar__Chtk2WdETmSAGDybAA89-n_2I0c069.jpg'
    ]
    for item in url_list:
        download_img(item)
    timeDelta = datetime.now() - start
    print("耗时:", timeDelta)
开始下载:https://car3.autoimg.cn/cardfs/product/g31/M07/97/EA/1400x0_1_q95_autohomecar__ChtlyGXweMqARy51ABjnnkZN7yg825.jpg
下载完成!
开始下载:https://car3.autoimg.cn/cardfs/product/g31/M09/97/E6/1400x0_1_q95_autohomecar__ChtlyGXweMaADgFRABimVylPS_4278.jpg
下载完成!
开始下载:https://car2.autoimg.cn/cardfs/product/g32/M05/7A/92/1400x0_1_q95_autohomecar__Chtk2WdETmSAGDybAA89-n_2I0c069.jpg
下载完成!
耗时: 0:00:03.583362
# 异步使用asyncio和aiohttp的方式,vscode中运行的

import aiohttp
import asyncio
from datetime import datetime

async def fetch(session, url):
    print("发送请求:", url)
    async with session.get(url, verify_ssl=False) as response:
        content = await response.content.read()
        file_name = url.rsplit('_')[-1]
        with open(file_name, mode='wb') as file_object:
            file_object.write(content)

async def main():
    async with aiohttp.ClientSession() as session:
        url_list = [
        'https://car3.autoimg.cn/cardfs/product/g31/M07/97/EA/1400x0_1_q95_autohomecar__ChtlyGXweMqARy51ABjnnkZN7yg825.jpg',
        'https://car3.autoimg.cn/cardfs/product/g31/M09/97/E6/1400x0_1_q95_autohomecar__ChtlyGXweMaADgFRABimVylPS_4278.jpg',
        'https://car2.autoimg.cn/cardfs/product/g32/M05/7A/92/1400x0_1_q95_autohomecar__Chtk2WdETmSAGDybAA89-n_2I0c069.jpg'
    ]
        tasks = [asyncio.create_task(fetch(session, url)) for url in url_list]
        await asyncio.wait(tasks)

start = datetime.now()
if __name__ == '__main__':
    asyncio.run(main())
    timeDelta = datetime.now() - start
    print("耗时:", timeDelta)

运行结果:
file

asyncio事件循环

可以理解为一个死循环,他会去检测并执行某些代码。

为了方便理解,做以下伪代码:

任务列表 = [ 任务1, 任务2,任务3, ... ]
while True:
  可执行的任务列表,已完成的任务列表 = 去任务列表中检查所有的任务,将“可执行”和“已完成”的任务返回
  for 就绪任务 in 可执行的任务列表:
    执行该已就绪的任务
  for 已完成的任务 in 已完成的任务列表:
    在任务列表中移除该已完成的任务

  如果 任务列表中的任务都已完成,则终止循环
loop = asyncio.get_event_loop()            # 生成一个事件循环列表
loop.run_until_complete(任务)              # 将“任务”放到任务列表
快速上手

简单粗暴的核心概念解释:

  • 协程函数:定义函数的时候,async def 函数名
  • 协程对象:协程函数加括号
async def func():
    pass

result = func()

上面几行简单的代码中,func就是协程函数,result就是协程对象。

注意,这里的虽然有result = func(),函数后面加括号?但这里func内部的代码不会被执行的

如果想要运行协程函数中的代码,必须要将协程对象交给事件循环来处理。

import asyncio

async def func():
    print("运行协程函数了!")

result = func()

loop = asyncio.get_event_loop() 
loop.run_until_complete(result) 
运行协程函数!

上面的写法,是asyncio在早期python版本中的写法,在python3.7后还有更加简便的方式,不用再定义事件循环对象,一条命令搞定。

import asyncio

async def func():
    print("运行协程函数了!")

result = func()

asyncio.run(result)  # 要习惯这种
运行协程函数了!
await关键字

await -- async wait

用法:await + 可等待对象(协程对象,Future,Task对象)或者说IO等待

# 无实际意义的实例😈
import asyncio

async def func():
    print("来")
    response = await asyncio.sleep(2)
    print("结束", response)

asyncio.run(func())
来
结束 None
# 协程对象调用协程对象,协程对象的嵌套

import asyncio

async def others():
    print("start")
    await asyncio.sleep(2)
    print('end')
    return "返回值"

async def func():
    print("执行协程函数内部代码")
    # 遇到IO操作挂起当前协程(任务),等IO操作完成后再往下继续执行,当前协程挂起时,事件循环可以去执行其他协程(任务)
    response = await others()
    print("IO请求结束,结果为:", response)

asyncio.run(func())
执行协程函数内部代码
start
end
IO请求结束,结果为: 返回值

以上示例中协程对象func()调用了协程对象others(),说明协程对象之间是可以嵌套的。

# 反向案例,帮助理解await,下面的代码因为有await,会串行执行(看输出很明显),看似是两个任务,但这两个任务没有放到事件循环中,故会串行执行
# 执行协程对象过程,遇到await就等,等到await后面的式子有结果了,才会继续往下执行
import asyncio

async def others():
    print("start")
    await asyncio.sleep(2)
    print('end')
    return "返回值"

async def func():
    print("执行协程函数内部代码")

    response1 = await others()
    print("IO请求结束,结果为:", response1)
    response2 = await others()               # 只是增加了这两行
    print("IO请求结束,结果为:", response2)

asyncio.run(func())
执行协程函数内部代码
start
end
IO请求结束,结果为: 返回值
start
end
IO请求结束,结果为: 返回值
task对象

将多个任务添加到事件循环中

Tasks用于并发协程调度,通过asyncio.create_task("协程对象")的方式创建Task对象,这样可以让协程加入事件循环中等待被调度执行。除了使用asyncio.create_task()函数外,还可以使用低层级的loop.create_task()或ensure_future()函数。不建议手动实例化Task对象。

  • python3.7之前推荐使用ensure_future()
  • python3.7以及之后推荐使用asyncio.create_task()
import asyncio

async def func():
    print(1)
    await asyncio.sleep(2)
    print(2)
    return "返回值"

async def main():
    print("main开始")

    # 创建task对象,并将func()添加到事件循环中
    task1 = asyncio.create_task(func())
    task2 = asyncio.create_task(func())

    print("main结束")

    # 遇到await,进行线程切换
    ret1 = await task1
    ret2 = await task2

    print(ret1, ret2)

asyncio.run(main())
main开始
main结束
1
1
2
2
返回值 返回值
# 上面的示例只是为了更好地理解,实际情况下,可能像下面这样写:
import asyncio

async def func():
    print(1)
    await asyncio.sleep(2)
    print(2)
    return "返回值"

async def main():
    print("main开始")

    # 创建task对象,并将func()添加到事件循环中
    task_list = [
        asyncio.create_task(func()),
        asyncio.create_task(func())
    ]

    print("main结束")

    # 遇到await,进行线程切换
    done, pending = await asyncio.wait(task_list, timeout=None) # done就是两个任务的返回值, pending一般情况下没什么用,设置了timeout才用

    print(done,'类型:',type(done))
    print(pending,'类型:', type(pending))

asyncio.run(main())
main开始
main结束
1
1
2
2
{ result='返回值'>,  result='返回值'>} 类型: 
set() 类型: 

上面拿到的done集合,显示效果不好看,复制到下面并对齐:

{<Task finished coro=<func() done, defined at C:\Users\xhs20\AppData\Local\Temp\ipykernel_26780\2712902846.py:5> result='返回值'>,
 <Task finished coro=<func() done, defined at C:\Users\xhs20\AppData\Local\Temp\ipykernel_26780\2712902846.py:5> result='返回值'>}
 ```

```python
# 对上面的代码做了简单修改,给task加上名字:
import asyncio

async def func():
    print(1)
    await asyncio.sleep(2)
    print(2)
    return "返回值"

async def main():
    print("main开始")

    # 创建task对象,并将func()添加到事件循环中
    task_list = [
        asyncio.create_task(func(), name='t1'),
        asyncio.create_task(func(), name='t2')
    ]

    print("main结束")

    # 遇到await,进行线程切换
    done, pending = await asyncio.wait(task_list, timeout=None) # done就是两个任务的返回值, pending一般情况下没什么用,设置了timeout才用

    print(done,'类型:',type(done))
    print(pending,'类型:', type(pending))

asyncio.run(main())

我在jupyter notebook运行上述代码会报错,可能是python3.7还不支持给task加name参数,我在vscode使用3.8的环境运行结果如下:
file

  • 抢答题:上述代码中,最多存在多少个task?

    答:3个,别忘了main也算一个

# 思考以下代码输出
import asyncio

async def func():
    print(1)
    await asyncio.sleep(2)
    print(2)
    return "返回值"

task_list = [
    asyncio.create_task(func()),
    asyncio.create_task(func())
]

done, pending = asyncio.run(asyncio.wait(task_list))

以上代码必然会报错,因为到asyncio.create_task(func()这一步的时候,它会将func()加入到task中,但目前没有task存在:
file

# 怎么改呢?这样:
import asyncio

async def func():
    print(1)
    await asyncio.sleep(2)
    print(2)
    return "返回值"

task_list = [
    func(),
    func()
]

done, pending = asyncio.run(asyncio.wait(task_list)) # asyncio.wait用于等待一组异步任务完成。
print(done)

file

asyncio.run()会创建一个 Task 对象来运行提供的协程。让我们详细分析 asyncio.run() 的行为和它与 Task 的关系。

asyncio.run(coro) 是一个便捷方法,它可以用来运行一个顶层的异步协程(coro),并在运行完成后关闭事件循环。它在 Python 3.7 中引入,简化了运行单个异步任务的常见模式。

coro 是一个协程对象(需要用 async def 定义)。
asyncio.run() 的核心逻辑:

  1. 创建一个新的事件循环
  2. 在事件循环中运行传入的协程
  3. 将传入的协程包装为 Task 对象,并调度运行
  4. 等待 Task 完成
  5. 关闭事件循环
  6. 返回协程运行的结果

asyncio.run() 适合运行一个顶层协程并简化事件循环的管理。

如果需要并发运行多个任务,可以在 asyncio.run() 中通过显式的 asyncio.create_task() 创建更多任务。

  • 未完待续

发表回复