Async IO in Python

发表于 2年以前  | 总阅读数:739 次

Async IO 是一种并发编程模型。本文我们主要学习 Python 中 Async IO 的使用。类比 Golang 中的协程,Python 中协程的调度不再依赖操作系统而是由程序自身完成。Async IO 适用于 IO 密集型的场景,它通过 event loop 在 IO 等待的同时执行其它协程,以充分利用 CPU。CPython 已经内置了 asyncio 包和 async、await 关键字来支持 Async IO。

"event loop" 是协程的调度器,我们可以把它假想为一个无限循环的程序,一直在监听协程,找到空闲的时机,触发协程运行。

Async IO 不同于多进程和多线程

多进程是真正意义上的同时执行。它依赖多核 CPU ,比较适合 CPU 密集型的场景,比如执行各种数学运算。

多进程可以和 Async IO 同时使用。更多介绍:https://www.youtube.com/watch?v=0kXaLh8Fz3k&t=630s。

多线程是一种并发执行的模型,一个进程可以执行多个线程。在 Python 中由于 GIL 的存在,多线程比较复杂。线程的切换需要依赖操作系统来完成。多线程和 Async IO 同样适用于 IO 密集型的场景,但是线程的切换、创建、销毁的成本要比协程高。多线程的程序还要处理线程安全问题。一台机器上通常创建的线程数最多是几万个。

对于多线程和 Async IO ,在下面的场景下更适合 Async IO :

  1. 大量的任务。
  2. 耗时较长的 IO 操作。

CPU 密集型场景的特点是计算机的内核不断努力工作,而 IO 密集型场景的特点是大量等待输入/输出的时间。

使用 asyncio 包 和 async/await

我们先使用同步的方法写一个示例代码:

#!/usr/bin/env python3
import time


def count():
    print("One")
    time.sleep(1)
    print("Two")


def main():
    # 循环三次
    for _ in range(3):
        count()


if __name__ == "__main__":
    s = time.perf_counter()
    main()
    elapsed = time.perf_counter() - s
    # 输出总的执行时间
    print(f"{__file__} executed in {elapsed:0.2f} seconds.")

输出结果如下:

/demo2.py 
One
Two
One
Two
One
Two
/demo2.py executed in 3.02 seconds.

上面的代码逻辑很简单,循环调用 count() 方法三次,count() 方法每次睡眠一秒钟,三次调用完成,最后计算总的执行时间。一共耗时3.02秒。

如果我们使用 asyncioasync/await来实现,代码如下:

#!/usr/bin/env python3

import asyncio


async def count():
    print("One")
    # 使用 await 等待1秒执行完,使用 asyncio 的 sleep() 方法,而不是 time 包的 sleep().
    await asyncio.sleep(1)
    print("Two")


async def main():
    # 异步执行 count() 任务。
    await asyncio.gather(count(), count(), count())


if __name__ == "__main__":
    import time

    s = time.perf_counter()
    # 启动异步执行
    asyncio.run(main())
    elapsed = time.perf_counter() - s
    print(f"{__file__} executed in {elapsed:0.2f} seconds.")

输入结果如下:

/demo1.py 
One
One
One
Two
Two
Two
/demo1.py executed in 1.00 seconds.

和上一个例子不同的地方我们都在代码中添加了注释来说明。通过和上一个例子输出结果比较我们可以看到两点不同:

  1. One 和 Two 的输出顺序不同。
  2. 执行时间只有1.00秒。

输出顺序不同体现了 Async IO 在执行过程中的异步特性,不会一直等待返回结果,而是在某个任务等待时调度其他的任务去执行。因为任务之间没有顺序等待,所以总的执行时间也就约等于耗时最久任务的执行时间。

不得不说,这个 async await 的语法和 JavaScript 中的异步语法一摸一样。

async await 的一些规则说明

async def 引入了原生协程和异步生成器。当然也可以使用async with and async for.

关键字 await 将函数控制权传递回 event loop。

下面是几个语法正确和错误的例子:

async def f(x):
    y = await z(x)  # OK - `await` and `return` allowed in coroutines
    return y

async def g(x):
    yield x  # OK - this is an async generator

async def m(x):
    yield from gen(x)  # No - SyntaxError

# 不使用 async 直接使用 await 会报错
def m(x):
    y = await z(x)  # Still no - SyntaxError (no `async def` here)
    return y

其次,在使用 await f() 时,需要确保 f() 是“可等待的“。可等待对象通常需要实现 __await__() 方法。async f() 就是可等待的。

async 中使用 yield

在 async 方法中使用 yield,会生成一个异步的生成器。可以使用async for迭代返回值。比如:

async def mygen(u: int = 10):
    """Yield powers of 2."""
    i = 0
    while i < u:
        yield 2 ** i
        i += 1
        await asyncio.sleep(0.1)

async def main():
    # This does *not* introduce concurrent execution
    # It is meant to show syntax only
    g = [i async for i in mygen()]
    f = [j async for j in mygen() if not (j // 3 % 5)]
    return g, f

g, f = asyncio.run(main())

# g = [1, 2, 4, 8, 16, 32, 64, 128, 256, 512]
# f = [1, 2, 16, 32, 256, 512]

asyncio的一些其它API

Queue

asyncio.Queue(maxsize=0) 实现了一个先进先出的队列(FIFO)。如果 maxsize 小于或等于零,则队列大小是无限的。如果它是一个大于 0 的整数,那么当队列达到 maxsize 时 await put() 会阻塞,直到一个任务被 get() 移除。

使用 asyncio.Queue() 实现发布订阅模式:

#!/usr/bin/env python3
# asyncq.py

import asyncio
import itertools as it
import os
import random
import time


async def makeitem(size: int = 5) -> str:
    return os.urandom(size).hex()


async def randsleep(caller=None) -> None:
    i = random.randint(0, 10)
    if caller:
        print(f"{caller} sleeping for {i} seconds.")
    await asyncio.sleep(i)


async def produce(name: int, q: asyncio.Queue) -> None:
    n = random.randint(0, 10)
    for _ in it.repeat(None, n):  # Synchronous loop for each single producer
        await randsleep(caller=f"Producer {name}")
        i = await makeitem()
        t = time.perf_counter()
        await q.put((i, t))
        print(f"Producer {name} added <{i}> to queue.")


async def consume(name: int, q: asyncio.Queue) -> None:
    while True:
        await randsleep(caller=f"Consumer {name}")
        i, t = await q.get()
        now = time.perf_counter()
        print(f"Consumer {name} got element <{i}>"
              f" in {now - t:0.5f} seconds.")
        q.task_done()


async def main(nprod: int, ncon: int):
    q = asyncio.Queue()
    producers = [asyncio.create_task(produce(n, q)) for n in range(nprod)]
    consumers = [asyncio.create_task(consume(n, q)) for n in range(ncon)]
    await asyncio.gather(*producers)
    await q.join()  # Implicitly awaits consumers, too
    for c in consumers:
        c.cancel()


if __name__ == "__main__":
    import argparse

    random.seed(444)
    parser = argparse.ArgumentParser()
    parser.add_argument("-p", "--nprod", type=int, default=5)
    parser.add_argument("-c", "--ncon", type=int, default=10)
    ns = parser.parse_args()
    start = time.perf_counter()
    asyncio.run(main(**ns.__dict__))
    elapsed = time.perf_counter() - start
    print(f"Program completed in {elapsed:0.5f} seconds.")

通过异步的队列,解耦生产者和消费者,加快程序的执行。

create_task

使用 create_task() 创建多个

import asyncio

async def coro(seq) -> list:
    """'IO' wait time is proportional to the max element."""
    await asyncio.sleep(max(seq))
    return list(reversed(seq))

async def main():
    # This is a bit redundant in the case of one task
    # We could use `await coro([3, 2, 1])` on its own
    t = asyncio.create_task(coro([3, 2, 1]))  # Python 3.7+
    await t
    print(f't: type {type(t)}')
    print(f't done: {t.done()}')

t = asyncio.run(main())

补充

  1. asyncio 包并不是 Async IO 的唯一的实现,还有一些其它比较知名的包,比如:curio、trio。不过使用 asyncio 仍然是不错的选择。
  2. 在 Python 中 ,Async IO 默认通过单个 CPU 的单个进程完成,也可以主动使用多进程。
  3. asyncio.run() 的另一种写法:
# asyncio.run(main()) 等价于下面的写法
loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()

4 . Python 线程在绑定给 event loop 之前,自身不会执行。

5 . Event loop 是插拔式的。可以使用任何 event loop 的实现,与协程本身的结构没有关系。也可以参照 uvloop(https://github.com/MagicStack/uvloop)例子,实现自己的event loop。asyncio 包自身就有两种协程实现可以选择。

参考

https://realpython.com/async-io-python/(例子都摘自此文)

https://zhuanlan.zhihu.com/p/64991670

https://docs.python.org/3/library/asyncio-queue.html

本文由哈喽比特于2年以前收录,如有侵权请联系我们。
文章来源:https://mp.weixin.qq.com/s/sjLwHc7XY7YxnAPpCHxp4g

 相关推荐

刘强东夫妇:“移民美国”传言被驳斥

京东创始人刘强东和其妻子章泽天最近成为了互联网舆论关注的焦点。有关他们“移民美国”和在美国购买豪宅的传言在互联网上广泛传播。然而,京东官方通过微博发言人发布的消息澄清了这些传言,称这些言论纯属虚假信息和蓄意捏造。

发布于:1年以前  |  808次阅读  |  详细内容 »

博主曝三大运营商,将集体采购百万台华为Mate60系列

日前,据博主“@超能数码君老周”爆料,国内三大运营商中国移动、中国电信和中国联通预计将集体采购百万台规模的华为Mate60系列手机。

发布于:1年以前  |  770次阅读  |  详细内容 »

ASML CEO警告:出口管制不是可行做法,不要“逼迫中国大陆创新”

据报道,荷兰半导体设备公司ASML正看到美国对华遏制政策的负面影响。阿斯麦(ASML)CEO彼得·温宁克在一档电视节目中分享了他对中国大陆问题以及该公司面临的出口管制和保护主义的看法。彼得曾在多个场合表达了他对出口管制以及中荷经济关系的担忧。

发布于:1年以前  |  756次阅读  |  详细内容 »

抖音中长视频App青桃更名抖音精选,字节再发力对抗B站

今年早些时候,抖音悄然上线了一款名为“青桃”的 App,Slogan 为“看见你的热爱”,根据应用介绍可知,“青桃”是一个属于年轻人的兴趣知识视频平台,由抖音官方出品的中长视频关联版本,整体风格有些类似B站。

发布于:1年以前  |  648次阅读  |  详细内容 »

威马CDO:中国每百户家庭仅17户有车

日前,威马汽车首席数据官梅松林转发了一份“世界各国地区拥车率排行榜”,同时,他发文表示:中国汽车普及率低于非洲国家尼日利亚,每百户家庭仅17户有车。意大利世界排名第一,每十户中九户有车。

发布于:1年以前  |  589次阅读  |  详细内容 »

研究发现维生素 C 等抗氧化剂会刺激癌症生长和转移

近日,一项新的研究发现,维生素 C 和 E 等抗氧化剂会激活一种机制,刺激癌症肿瘤中新血管的生长,帮助它们生长和扩散。

发布于:1年以前  |  449次阅读  |  详细内容 »

苹果据称正引入3D打印技术,用以生产智能手表的钢质底盘

据媒体援引消息人士报道,苹果公司正在测试使用3D打印技术来生产其智能手表的钢质底盘。消息传出后,3D系统一度大涨超10%,不过截至周三收盘,该股涨幅回落至2%以内。

发布于:1年以前  |  446次阅读  |  详细内容 »

千万级抖音网红秀才账号被封禁

9月2日,坐拥千万粉丝的网红主播“秀才”账号被封禁,在社交媒体平台上引发热议。平台相关负责人表示,“秀才”账号违反平台相关规定,已封禁。据知情人士透露,秀才近期被举报存在违法行为,这可能是他被封禁的部分原因。据悉,“秀才”年龄39岁,是安徽省亳州市蒙城县人,抖音网红,粉丝数量超1200万。他曾被称为“中老年...

发布于:1年以前  |  445次阅读  |  详细内容 »

亚马逊股东起诉公司和贝索斯,称其在购买卫星发射服务时忽视了 SpaceX

9月3日消息,亚马逊的一些股东,包括持有该公司股票的一家养老基金,日前对亚马逊、其创始人贝索斯和其董事会提起诉讼,指控他们在为 Project Kuiper 卫星星座项目购买发射服务时“违反了信义义务”。

发布于:1年以前  |  444次阅读  |  详细内容 »

苹果上线AppsbyApple网站,以推广自家应用程序

据消息,为推广自家应用,苹果现推出了一个名为“Apps by Apple”的网站,展示了苹果为旗下产品(如 iPhone、iPad、Apple Watch、Mac 和 Apple TV)开发的各种应用程序。

发布于:1年以前  |  442次阅读  |  详细内容 »

特斯拉美国降价引发投资者不满:“这是短期麻醉剂”

特斯拉本周在美国大幅下调Model S和X售价,引发了该公司一些最坚定支持者的不满。知名特斯拉多头、未来基金(Future Fund)管理合伙人加里·布莱克发帖称,降价是一种“短期麻醉剂”,会让潜在客户等待进一步降价。

发布于:1年以前  |  441次阅读  |  详细内容 »

光刻机巨头阿斯麦:拿到许可,继续对华出口

据外媒9月2日报道,荷兰半导体设备制造商阿斯麦称,尽管荷兰政府颁布的半导体设备出口管制新规9月正式生效,但该公司已获得在2023年底以前向中国运送受限制芯片制造机器的许可。

发布于:1年以前  |  437次阅读  |  详细内容 »

马斯克与库克首次隔空合作:为苹果提供卫星服务

近日,根据美国证券交易委员会的文件显示,苹果卫星服务提供商 Globalstar 近期向马斯克旗下的 SpaceX 支付 6400 万美元(约 4.65 亿元人民币)。用于在 2023-2025 年期间,发射卫星,进一步扩展苹果 iPhone 系列的 SOS 卫星服务。

发布于:1年以前  |  430次阅读  |  详细内容 »

𝕏(推特)调整隐私政策,可拿用户发布的信息训练 AI 模型

据报道,马斯克旗下社交平台𝕏(推特)日前调整了隐私政策,允许 𝕏 使用用户发布的信息来训练其人工智能(AI)模型。新的隐私政策将于 9 月 29 日生效。新政策规定,𝕏可能会使用所收集到的平台信息和公开可用的信息,来帮助训练 𝕏 的机器学习或人工智能模型。

发布于:1年以前  |  428次阅读  |  详细内容 »

荣耀CEO谈华为手机回归:替老同事们高兴,对行业也是好事

9月2日,荣耀CEO赵明在采访中谈及华为手机回归时表示,替老同事们高兴,觉得手机行业,由于华为的回归,让竞争充满了更多的可能性和更多的魅力,对行业来说也是件好事。

发布于:1年以前  |  423次阅读  |  详细内容 »

AI操控无人机能力超越人类冠军

《自然》30日发表的一篇论文报道了一个名为Swift的人工智能(AI)系统,该系统驾驶无人机的能力可在真实世界中一对一冠军赛里战胜人类对手。

发布于:1年以前  |  423次阅读  |  详细内容 »

AI生成的蘑菇科普书存在可致命错误

近日,非营利组织纽约真菌学会(NYMS)发出警告,表示亚马逊为代表的电商平台上,充斥着各种AI生成的蘑菇觅食科普书籍,其中存在诸多错误。

发布于:1年以前  |  420次阅读  |  详细内容 »

社交媒体平台𝕏计划收集用户生物识别数据与工作教育经历

社交媒体平台𝕏(原推特)新隐私政策提到:“在您同意的情况下,我们可能出于安全、安保和身份识别目的收集和使用您的生物识别信息。”

发布于:1年以前  |  411次阅读  |  详细内容 »

国产扫地机器人热销欧洲,国产割草机器人抢占欧洲草坪

2023年德国柏林消费电子展上,各大企业都带来了最新的理念和产品,而高端化、本土化的中国产品正在不断吸引欧洲等国际市场的目光。

发布于:1年以前  |  406次阅读  |  详细内容 »

罗永浩吐槽iPhone15和14不会有区别,除了序列号变了

罗永浩日前在直播中吐槽苹果即将推出的 iPhone 新品,具体内容为:“以我对我‘子公司’的了解,我认为 iPhone 15 跟 iPhone 14 不会有什么区别的,除了序(列)号变了,这个‘不要脸’的东西,这个‘臭厨子’。

发布于:1年以前  |  398次阅读  |  详细内容 »
 相关文章
Android插件化方案 5年以前  |  237267次阅读
vscode超好用的代码书签插件Bookmarks 2年以前  |  8105次阅读
 目录