这两天因为一点个人原因写了点好久没碰的 Python ,其中涉及到「协程」编程,上次搞的时候,它还是 Web 框架 tornado
特有的 feature
,现在已经有 async
、await
关键字支持了。思考了一下其实现,回顾了下这些年的演变,觉得还有点意思。
都是单线程,为什么原来低效率的代码用了
async
、await
加一些异步库就变得效率高了?
如果在做基于 Python 的网络或者 Web 开发时,对于这个问题曾感到疑惑,这篇文章尝试给一个答案。
首先,本文不是带你浏览源代码,然后对照原始代码给你讲 Python 标准的实现。相反,我们会从实际问题出发,思考解决问题的方案,一步步体会解决方案的演进路径,最重要的,希望能在过程中获得知识系统性提升。 ⚠️本文仅是提供了一个独立的思考方向,并未遵循历史和现有实际具体的实现细节。
其次,阅读这篇文章需要你对 Python 比较熟悉,至少了解 Python 中的生成器 generator
的概念。
这是性能的关键。但我们这里只解释概念,其实现细节不是重点,这对我们理解 Python 的协程已经足够了,如已足够了解,前进到 0x02
。首先,你要知道所有的网络服务程序都是一个巨大的死循环,你的业务逻辑都在这个循环的某个时刻被调用:
def handler(request):
# 处理请求
pass
# 你的 handler 运行在 while 循环中
while True:
# 获取一个新请求
request = accept()
# 根据路由映射获取到用户写的业务逻辑函数
handler = get_handler(request)
# 运行用户的handler,处理请求
handler(request)
设想你的 Web 服务的某个 handler
,在接收到请求后需要一个 API 调用才能响应结果。
对于最传统的网络应用,你的 API 请求发出去后在等待响应,此时程序停止运行,甚至新的请求也得在响应结束后才进得来。如果你依赖的 API 请求网络丢包严重,响应特别慢呢?那应用的吞吐量将非常低。
很多传统 Web 服务器使用多线程技术解决这个问题:把 handler
的运行放到其他线程上,每个线程处理一个请求,本线程阻塞不影响新请求进入。这能一定程度上解决问题,但对于并发比较大的系统,过多线程调度会带来很大的性能开销。
IO 多路复用可以做到不使用线程解决问题,它是由操作系统内核提供的功能,可以说专门为这类场景而生。简单来讲,你的程序遇到网络IO时,告诉操作系统帮你盯着,同时操作系统提供给你一个方法,让你可以随时获取到有哪些 IO 操作已经完成。就像这样:
# 操作系统的IO复用示例伪代码
# 向操作系统IO注册自己关注的IO操作的id和类型
io_register(io_id, io_type)
io_register(io_id, io_type)
# 获取完成的IO操作
events = io_get_finished()
for (io_id, io_type) in events:
if io_type == READ:
data = read_data(io_id)
elif io_type == WRITE:
write_data(io_id,data)
把 IO 复用逻辑融合到我们的服务器中,大概会像这样:
call_backs = {}
def handler(req):
# do jobs here
io_register(io_id, io_type)
def call_back(result):
# 使用返回的result完成剩余工作...
call_backs[io_id] = call_back
# 新的循环
while True:
# 获取已经完成的io事件
events = io_get_finished()
for (io_id, io_type) in events:
if io_type == READ: # 读取
data = read(io_id)
call_back = call_backs[io_id]
call_back(data)
else:
# 其他类型io事件的处理
pass
# 获取一个新请求
request = accept()
# 根据路由映射获取到用户写的业务逻辑函数
handler = get_handler(request)
# 运行用户的handler,处理请求
handler(request)
我们的 handler
对于 IO 操作,注册了回调就立刻返回,同时每次迭代都会对已完成的 IO 执行回调,网络请求不再阻塞整个服务器。
上面的伪代码仅便于理解,具体实现细节更复杂。而且就连接受新请求也是在从操作系统得到监听端口的 IO 事件后进行的。
我们如果把循环部分还有 call_backs
字典拆分到单独模块,就能得到一个 EventLoop
,也就是 Python 标准库 asyncio
包中提供的 ioloop
。
着重看下我们业务中经常写的 handler
函数,在有独立的 ioloop
后,它现在变成类似这样:
def handler(request):
# 业务逻辑代码...
# 需要执行一次 API 请求
def call_back(result):
# 使用 API 返回的result完成剩余工作
print(result)
# 没有io_call这个方法,这里只是示意,表示注册一个IO操作
asyncio.get_event_loop().io_call(api, call_back)
到这里,性能问题已经解决了:我们不再需要多线程就能源源不断接受新请求,而且不用care依赖的 API 响应有多慢。
但是我们也引入了一个新问题,原来流畅的业务逻辑代码现在被拆成了两部分,请求 API 之前的代码还正常,请求 API 之后的代码只能写在回调函数里面了。
这里我们业务逻辑只有一个 API 调用,如果有多个 API ,再加上对 Redis 或者 MySQL 的调用(它们本质也是网络请求),整个逻辑会被拆分的更散,这对业务开发是一笔负担。
对于有匿名函数的一些语言(没错就是JavaScript),还可能会引发所谓的「回调地狱」。
接下来我们想办法解决这个问题。
我们很容易会想到:如果函数在运行到网络 IO 操作处后能够暂停,完成后又能在断点处唤醒就好了。
如果你对 Python 的「生成器」熟悉,你应该会发现,它恰好具有这个功能:
def example():
value = yield 2
print("get", value)
return value
g = example()
# 启动生成器,我们会得到 2
got = g.send(None)
print(got) # 2
try:
# 再次启动 会显示 "get 4", 就是我们传入的值
got = g.send(got*2)
except StopIteration as e:
# 生成器运行完成,将会print(4),e.value 是生成器return的值
print(e.value)
函数中有 yield
关键字,调用函数将会得到一个生成器,生成器一个关键的方法 send()
可以跟生成器交互。
g.send(None)
会运行生成器内代码直到遇到 yield
,并返回其后的对象,也就是 2
,生成器代码就停在这里了,直到我们再次执行 g.send(got*2)
,会把 2*2
也就是 4
赋值给yield
前面的变量 value
,然后继续运行生成器代码。
yield
在这里就像一扇门,可以把一件东西从这里送出去,也可以把另一件东西拿进来。
如果 send
让生成器运行到下一个 yield
前就结束了,send
调用会引发一个特殊的异常StopIteration
,这个异常自带一个属性 value
,为生成器 return
的值。
如果我们把我们的 handler
用 yield
关键字转换成一个生成器,运行它来把 IO 操作的具体内容返回,IO 完成后的回调函数中把 IO 结果放回并恢复生成器运行,那就解决了业务代码不流畅的问题了:
def handler(request):
# 业务逻辑代码...
# 需要执行一次 API 请求,直接把 IO 请求信息yield出去
result = yield io_info
# 使用 API 返回的result完成剩余工作
print(result)
# 这个函数注册到ioloop中,用来当有新请求的时候回调
def on_request(request):
# 根据路由映射获取到用户写的业务逻辑函数
handler = get_handler(request)
g = handler(request)
# 首次启动获得io_info
io_info = g.send(None)
# io完成回调函数
def call_back(result):
# 重新启动生成器
g.send(result)
asyncio.get_event_loop().io_call(io_info, call_back)
上面的例子,用户写的 handler
代码已经不会被打散到 callback
中,on_request
函数使用 callback
和 ioloop
交互,但它会被实现在 Web 框架中,对用户不可见。
上面代码足以给我们提供用生成器消灭的 callback
的启发,但局限性有两点:
我们来看一个更复杂的例子:其中 request
执行真正的 IO,func1
、func2
仅调用。显然我们的代码只能写成这样:
def func1():
ret = yield request("http://test.com/foo")
ret = yield func2(ret)
return ret
def func2(data):
result = yield request("http://test.com/"+data)
return result
def request(url):
# 这里模拟返回一个io操作,包含io操作的所有信息,这里用字符串简化代替
result = yield "iojob of %s" % url
return result
对于 request
,我们把 IO 操作通过 yield
暴露给框架。
对于 func1
和 func2
,调用 request
显然也要加 yield
关键字,否则 request
调用返回一个生成器后不会暂停,继续执行后续逻辑显然会出错。
这基本就是我们在没有 yield from
、aysnc
、await
时代,在 tornado
框架中写异步代码的样子。
要运行整个调用栈,大概流程如下:
func1()
得到生成器send(None)
启动它得到会得到 request("http://test.com/foo")
的结果,还是生成器对象send(None)
启动由 request()
产生的生成器,会得到 IO 操作,由框架注册到 ioloop
并指定回调request
生成器,生成器会走到 return
语句结束request
生成器的返回值,将上一层 func1
唤醒,同时又得到 func2()
生成器对算法和数据结构熟悉的朋友遇到这种前进后退的遍历逻辑,可以递归也可以用栈,因为递归使用生成器还做不到,我们可以使用栈,其实这就是「调用栈」一词的由来。借助栈,我们可以把整个调用链上串联的所有生成器对表现为一个生成器,对其不断 send
就能不断得到所有 IO 操作信息并推动调用链前进,实现方法如下:
send
,如果得到生成器就入栈并进入下一轮迭代yield
出来,让框架注册到 ioloop
如果实现出来,代码不长但信息量比较大。
它把整个调用链对外变成一个生成器,对其调用 send
,就能整个调用链中的 IO,完成这些 IO,继续推动调用链内的逻辑执行,直到整体逻辑结束:
def wrapper(gen):
# 第一层调用 入栈
stack = Stack()
stack.push(gen)
# 开始逐层调用
while True:
# 获取栈顶元素
item = stack.peak()
result = None
# 生成器
if isgenerator(item):
try:
# 尝试获取下层调用并入栈
child = item.send(result)
stack.push(child)
# result 使用过后就还原为None
result = None
# 入栈后直接进入下次循环,继续向下探索
continue
except StopIteration as e:
# 如果自己运行结束了,就暂存result,下一步让自己出栈
result = e.value
else: # IO 操作
# 遇到了 IO 操作,yield 出去,IO 完成后会被用 IO 结果唤醒并暂存到 result
result = yield item
# 走到这里则本层已经执行完毕,出栈,下次迭代将是调用链上一层
stack.pop()
# 没有上一层的话,那整个调用链都执行完成了,return
if stack.empty():
print("finished")
return result
这可能是最复杂的部分,如果看起来吃力的话,其实只要明白,对于上面示例中的调用链,它可以实现的效果如下就好了:
w = wrapper(func1())
# 将会得到 "iojob of http://test.com/foo"
w.send(None)
# 上个iojob foo 完成后的结果"bar"传入,继续运行,得到 "iojob of http://test.com/bar"
w.send("bar")
# 上个iojob bar 完成后的结构"barz"传入,继续运行,结束。
w.send("barz")
有了这部分以后框架再加上配套的代码:
# 维护一个就绪列表,存放所有完成的IO事件,格式为(wrapper,result)
ready = []
def on_request(request):
handler = get_handler(request)
# 使用 wrapper 包装后,可以只通过 send 处理 IO 了
g = wrapper(func1())
# 把开始状态直接视为结果为None的就绪状态
ready.append((g, None))
# 让ioloop每轮循环都执行此函数,用来处理的就绪的IO
def process_ready(self):
def call_back(g, result):
ready.append((g, result))
# 遍历所有已经就绪生成器,将其向下推进
for g, result in self.ready:
# 用result唤醒生成器,并得到下一个io操作
io_job = g.send(result)
# 注册io操作 完成后把生成器加入就绪列表,等待下一轮处理
asyncio.get_event_loop().io_call(
io_job, lambda result: ready.append((g, result)
这里核心思想是维护一个就绪列表,ioloop
每轮迭代都来扫一遍,推动就绪的状态的生成器向下运行,并把新的 IO 操作注册,IO 完成后再次加入就绪,经过几轮 ioloop
的迭代一个 handler
最终会被执行完成。
至此,我们使用生成器写法写业务逻辑已经可以正常运行。
如果到这里能读懂, Python 的协程原理基本就明白了。
我们已经实现了一个微型的协程框架,标准库的实现细节跟这里看起来大不一样,但具体的思想是一致的。
我们的协程框架有一个限制,我们只能把 IO 操作异步化,虽然在网络编程和 Web 编程的世界里,阻塞的基本只有 IO 操作,但也有一些例外,比如我想让当前操作 sleep
几秒,用 time.sleep()
又会让整个线程阻塞住,就需要特殊实现。再比如,可以把一些 CPU 密集的操作通过多线程异步化,让另一个线程通知事件已经完成后再执行后续。
所以,协程最好能与网络解耦开,让等待网络IO只是其中一种场景,提高扩展性。
Python 官方的解决方案是让用户自己处理阻塞代码,至于是向 ioloop
来注册 IO 事件还是开一个线程完全由你自己,并提供了一个标准「占位符」Future
,表示他的结果等到未来才会有,其部分原型如下:
class Future:
# 设置结果
def set_result(result): pass
# 获取结果
def result(): pass
# 表示这个future对象是不是已经被设置过结果了
def done(): pass
# 设置在他被设置结果时应该执行的回调函数,可以设置多个
def add_done_callback(callback): pass
我们的稍加改动就能支持 Future
,让扩展性变得更强。对于用户代码的中的网络请求函数 request
:
# 现在 request 函数,不是生成器,它返回future
def request(url):
# future 理解为占位符
fut = Future()
def callback(result):
# 当网络IO完成回调的时候给占位符赋值
fut.set_result(result)
asyncio.get_event_loop().io_call(url, callback)
# 返回占位符
return future
现在,request
不再是一个生成器,而是直接返回 future
。而对于位于框架中处理就绪列表的函数:
def process_ready(self):
def callback(fut):
# future被设置结果会被放入就绪列表
ready.append((g, fut.result()))
# 遍历所有已经就绪生成器,将其向下推进
for g, result in self.ready:
# 用result唤醒生成器,得到的不再是io操作,而是future
fut = g.send(result)
# future被设置结果的时候会调用callback
fut.add_done_callback(callback)
许多年前用 tornado
的时候,大概只有一个 yield
关键字可用,协程要想实现,就是这么个思路,甚至 yield
关键字和 return
关键字不能一个函数里面出现,你要想在生成器运行完后返回一个值,需要手动 raise
一个异常,虽然效果跟现在 return
一样,但写起来还是很别扭,不优雅。
后来有了 yield from
表达式。
它可以做什么?通俗地说,它就是做了上面那个生成器 wrapper
所做的事:通过栈实现调用链遍历的 ,它是 wrapper
逻辑的语法糖。
有了它,同一个例子你可以这么写:
def func1():
# 注意 yield from
ret = yield from request("http://test.com/foo")
# 注意 yield from
ret = yield from func2(ret)
return ret
def func2(data):
# 注意 yield from
result = yield from request("http://test.com/"+data)
return result
# 现在 request 函数,不是生成器,它返回future
def request(url):
# 同上基于future实现的request
然后你就不再需要那个烧脑的 wrapper
函数了:
g = func1()
# 返回第一个请求的 future
g.send(None)
# 继续运行,自动进入func2 并得到第它里面的那个future
g.send("bar")
# 继续运行,完成调用链剩余逻辑,抛出StopIteration异常
g.send("barz")
yield from
直接打通了整个调用链,已经是很大的进步了,但是用来异步编程看着还是别扭,其他语言都有专门的协程 async
、await
关键字了,直到再后来的版本把这些内容用专用的 async
、await
关键字包装,才成为今天比较优雅的样子。
总的来说, Python 的原生的协程从两方面实现:
callback
代码变成同步代码,减少业务编写困难有生成器这种对象的语言,其 IO 协程实现大抵如此,JavaScript 协程的演进基本一模一样,关键字相同,Future
类比 Promise
本质相同。
但是对于以协程出名的 Go 的协程实现跟这个就不同了,并不显式地基于生成器。
如果类比的话,可以 Python 的 gevent
算作一类,都是自己实现 runtime
,并 patch
掉系统调用接入自己的 runtime
,自己来调度协程,gevent
专注于网络相关,基于网络 IO 调度,比较简单,而 Go 实现了完善的多核支持,调度更加复杂和完善,而且创造了基于 channel
新编程范式。
本文由哈喽比特于4年以前收录,如有侵权请联系我们。
文章来源:https://mp.weixin.qq.com/s/CKjFHV-xFfK_n_9DKFq6Eg
京东创始人刘强东和其妻子章泽天最近成为了互联网舆论关注的焦点。有关他们“移民美国”和在美国购买豪宅的传言在互联网上广泛传播。然而,京东官方通过微博发言人发布的消息澄清了这些传言,称这些言论纯属虚假信息和蓄意捏造。
日前,据博主“@超能数码君老周”爆料,国内三大运营商中国移动、中国电信和中国联通预计将集体采购百万台规模的华为Mate60系列手机。
据报道,荷兰半导体设备公司ASML正看到美国对华遏制政策的负面影响。阿斯麦(ASML)CEO彼得·温宁克在一档电视节目中分享了他对中国大陆问题以及该公司面临的出口管制和保护主义的看法。彼得曾在多个场合表达了他对出口管制以及中荷经济关系的担忧。
今年早些时候,抖音悄然上线了一款名为“青桃”的 App,Slogan 为“看见你的热爱”,根据应用介绍可知,“青桃”是一个属于年轻人的兴趣知识视频平台,由抖音官方出品的中长视频关联版本,整体风格有些类似B站。
日前,威马汽车首席数据官梅松林转发了一份“世界各国地区拥车率排行榜”,同时,他发文表示:中国汽车普及率低于非洲国家尼日利亚,每百户家庭仅17户有车。意大利世界排名第一,每十户中九户有车。
近日,一项新的研究发现,维生素 C 和 E 等抗氧化剂会激活一种机制,刺激癌症肿瘤中新血管的生长,帮助它们生长和扩散。
据媒体援引消息人士报道,苹果公司正在测试使用3D打印技术来生产其智能手表的钢质底盘。消息传出后,3D系统一度大涨超10%,不过截至周三收盘,该股涨幅回落至2%以内。
9月2日,坐拥千万粉丝的网红主播“秀才”账号被封禁,在社交媒体平台上引发热议。平台相关负责人表示,“秀才”账号违反平台相关规定,已封禁。据知情人士透露,秀才近期被举报存在违法行为,这可能是他被封禁的部分原因。据悉,“秀才”年龄39岁,是安徽省亳州市蒙城县人,抖音网红,粉丝数量超1200万。他曾被称为“中老年...
9月3日消息,亚马逊的一些股东,包括持有该公司股票的一家养老基金,日前对亚马逊、其创始人贝索斯和其董事会提起诉讼,指控他们在为 Project Kuiper 卫星星座项目购买发射服务时“违反了信义义务”。
据消息,为推广自家应用,苹果现推出了一个名为“Apps by Apple”的网站,展示了苹果为旗下产品(如 iPhone、iPad、Apple Watch、Mac 和 Apple TV)开发的各种应用程序。
特斯拉本周在美国大幅下调Model S和X售价,引发了该公司一些最坚定支持者的不满。知名特斯拉多头、未来基金(Future Fund)管理合伙人加里·布莱克发帖称,降价是一种“短期麻醉剂”,会让潜在客户等待进一步降价。
据外媒9月2日报道,荷兰半导体设备制造商阿斯麦称,尽管荷兰政府颁布的半导体设备出口管制新规9月正式生效,但该公司已获得在2023年底以前向中国运送受限制芯片制造机器的许可。
近日,根据美国证券交易委员会的文件显示,苹果卫星服务提供商 Globalstar 近期向马斯克旗下的 SpaceX 支付 6400 万美元(约 4.65 亿元人民币)。用于在 2023-2025 年期间,发射卫星,进一步扩展苹果 iPhone 系列的 SOS 卫星服务。
据报道,马斯克旗下社交平台𝕏(推特)日前调整了隐私政策,允许 𝕏 使用用户发布的信息来训练其人工智能(AI)模型。新的隐私政策将于 9 月 29 日生效。新政策规定,𝕏可能会使用所收集到的平台信息和公开可用的信息,来帮助训练 𝕏 的机器学习或人工智能模型。
9月2日,荣耀CEO赵明在采访中谈及华为手机回归时表示,替老同事们高兴,觉得手机行业,由于华为的回归,让竞争充满了更多的可能性和更多的魅力,对行业来说也是件好事。
《自然》30日发表的一篇论文报道了一个名为Swift的人工智能(AI)系统,该系统驾驶无人机的能力可在真实世界中一对一冠军赛里战胜人类对手。
近日,非营利组织纽约真菌学会(NYMS)发出警告,表示亚马逊为代表的电商平台上,充斥着各种AI生成的蘑菇觅食科普书籍,其中存在诸多错误。
社交媒体平台𝕏(原推特)新隐私政策提到:“在您同意的情况下,我们可能出于安全、安保和身份识别目的收集和使用您的生物识别信息。”
2023年德国柏林消费电子展上,各大企业都带来了最新的理念和产品,而高端化、本土化的中国产品正在不断吸引欧洲等国际市场的目光。
罗永浩日前在直播中吐槽苹果即将推出的 iPhone 新品,具体内容为:“以我对我‘子公司’的了解,我认为 iPhone 15 跟 iPhone 14 不会有什么区别的,除了序(列)号变了,这个‘不要脸’的东西,这个‘臭厨子’。