Celery是一款非常简单、灵活、可靠的分布式系统,可用于处理大量消息,并且提供了一整套操作此系统的工具。Celery 也是一款消息队列工具,可用于处理实时数据以及任务调度。
本文是是celery源码解析的第七篇,在前六篇里分别介绍了:
本章我们从celery的蓝图学习celery的实现细节。
celery的蓝图,官方的解释是 A directed acyclic graph of reusable components ,翻译过来就是 可重用组件的有向无环图 。有WorkController(也叫worker)和Consumer两个蓝图,每个蓝图又由一些step组成,这些step根据依赖关系(requires)组成下面的树结构:
WorkController(Blueprint)
|- StateDB
|- Timer
|- Hub
|- Pool
|- WorkerComponent(Autoscaler)
|- Beat
|- Consumer(Blueprint)
|- Connection
|- Agent
|- Events
|- Mingle
|- Gossip
|- Tasks
|- Control
|- Heart
|- Evloop
其中Consumer是WorkController的一个step,这个step又启动了一个Consumer的蓝图,形成一个蓝图嵌蓝图的结构。蓝图这个词,可以理解为celery启动的时候需要一些步骤,这些步骤是有依赖顺序的,同级的步骤构成一个蓝图。
Worker蓝图包括{StateDB, Timer, Hub, Pool, Autoscaler, Beat, Consumer}
七个步骤,一般情况下仅仅启动了其中的三个Hub, Pool, Consumer
:
[2021-11-24 15:53:12,984: DEBUG/MainProcess] | Worker: Preparing bootsteps.
[2021-11-24 15:53:12,988: DEBUG/MainProcess] | Worker: Building graph...
[2021-11-24 15:53:12,988: DEBUG/MainProcess] | Worker: New boot order: {StateDB, Timer, Hub, Pool, Autoscaler, Beat, Consumer}
...
[2021-11-24 15:53:13,062: DEBUG/MainProcess] | Worker: Starting Hub
[2021-11-24 15:53:13,062: DEBUG/MainProcess] ^-- substep ok
[2021-11-24 15:53:13,062: DEBUG/MainProcess] | Worker: Starting Pool
[2021-11-24 15:53:13,410: DEBUG/MainProcess] ^-- substep ok
[2021-11-24 15:53:13,411: DEBUG/MainProcess] | Worker: Starting Consumer
这七个蓝图的顺序和配置的顺序是有差异的:
default_steps = {
'celery.worker.components:Hub',
'celery.worker.components:Pool',
'celery.worker.components:Beat',
'celery.worker.components:Timer',
'celery.worker.components:StateDB',
'celery.worker.components:Consumer',
'celery.worker.autoscale:WorkerComponent',
}
Consumer蓝图包括{Connection, Events, Mingle, Tasks, Control, Gossip, Agent, Heart, event loop}
十个步骤,一般情况下除了Agent
, 其它都会启动。
[2021-11-24 15:53:13,005: DEBUG/MainProcess] | Consumer: Preparing bootsteps.
[2021-11-24 15:53:13,005: DEBUG/MainProcess] | Consumer: Building graph...
[2021-11-24 15:53:13,038: DEBUG/MainProcess] | Consumer: New boot order: {Connection, Events, Mingle, Tasks, Control, Gossip, Agent, Heart, event loop}
...
Blueprint
主要有2个实现函数:apply
创建各个step,start
启动各个step:
def apply(self, parent, **kwargs):
# 创建step
self._debug('Preparing bootsteps.')
order = self.order = []
steps = self.steps = self.claim_steps()
self._debug('Building graph...')
for S in self._finalize_steps(steps):
step = S(parent, **kwargs)
steps[step.name] = step
order.append(step)
self._debug('New boot order: {%s}',
', '.join(s.alias for s in self.order))
for step in order:
# 隐式的创建step
step.include(parent)
return self
def start(self, parent):
# 启动蓝图
...
for i, step in enumerate(s for s in parent.steps if s is not None):
self._debug('Starting %s', step.alias)
self.started = i + 1
step.start(parent)
logger.debug('^-- substep ok')
Step
和子类StartStopStep
使用enbled属性,决定step步骤的是否启用:
enabled = True
def include_if(self, parent):
return self.enabled
def _should_include(self, parent):
if self.include_if(parent):
return True, self.create(parent)
return False, None
def include(self, parent):
inc, ret = self._should_include(parent)
if inc:
self.obj = ret
parent.steps.append(self)
return inc
比如默认情况下StateDB会根据参数关闭:
@click.option('-S',
'--statedb',
cls=CeleryOption,
type=click.Path(),
callback=lambda ctx, _, value: value or ctx.obj.app.conf.worker_state_db,
help_group="Worker Options",
help="Path to the state database. The extension '.db' may be "
"appended to the filename.")
...
class StateDB(bootsteps.Step):
"""Bootstep that sets up between-restart state database file."""
def __init__(self, w, **kwargs):
self.enabled = w.statedb
...
Step
类还有requires和last两个属性,blueprint可以根据这两个属性建立所有步骤的先后顺序:
def _find_last(self):
# 查找steps的尾
return next((C for C in self.steps.values() if C.last), None)
def _firstpass(self, steps):
# 查找依赖关系
for step in steps.values():
step.requires = [symbol_by_name(dep) for dep in step.requires]
stream = deque(step.requires for step in steps.values())
# 广度优先的遍历
while stream:
for node in stream.popleft():
node = symbol_by_name(node)
if node.name not in self.steps:
steps[node.name] = node
stream.append(node.requires)
Consumer这个特殊的Step是这样嵌套启动Consumer蓝图的:
class Consumer(bootsteps.StartStopStep):
"""Bootstep starting the Consumer blueprint."""
def create(self, w):
# consumer_cls就是Consumer蓝图
c = w.consumer = self.instantiate(
w.consumer_cls, w.process_task,
hostname=w.hostname,
task_events=w.task_events,
init_callback=w.ready_callback,
initial_prefetch_count=prefetch_count,
pool=w.pool,
timer=w.timer,
app=w.app,
controller=w,
hub=w.hub,
worker_options=w.options,
disable_rate_limits=w.disable_rate_limits,
prefetch_multiplier=w.prefetch_multiplier,
)
return c
celery将启动过程分成多个step,每个step承担不同的功能,不同的step又组合成多个蓝图,这种方式可以灵活的定义启动流程,并且让业务功能解耦,更易维护。下面我们继续学习其中的一些step。
Connection-Step主要功能是创建connection连接:
class Connection(bootsteps.StartStopStep):
"""Service managing the consumer broker connection."""
def __init__(self, c, **kwargs):
c.connection = None
super().__init__(c, **kwargs)
def start(self, c):
# 创建连接
c.connection = c.connect()
info('Connected to %s', c.connection.as_uri())
Pool-Step主要功能是启动一个调度池:
# Initialize bootsteps
self.pool_cls = _concurrency.get_implementation(self.pool_cls)
def create(self, w):
...
# 启动concurrency模型
pool = w.pool = self.instantiate(
w.pool_cls, w.min_concurrency,
initargs=(w.app, w.hostname),
maxtasksperchild=w.max_tasks_per_child,
max_memory_per_child=w.max_memory_per_child,
timeout=w.time_limit,
soft_timeout=w.soft_time_limit,
putlocks=w.pool_putlocks and threaded,
lost_worker_timeout=w.worker_lost_wait,
threads=threaded,
max_restarts=max_restarts,
allow_restart=allow_restart,
forking_enable=True,
semaphore=semaphore,
sched_strategy=self.optimization,
app=w.app,
)
...
return pool
并发模型主要包括下面一些实现,比如基于fork的多进程,基于eventlet和gevent的协程和多线程等:
ALIASES = {
'prefork': 'celery.concurrency.prefork:TaskPool',
'eventlet': 'celery.concurrency.eventlet:TaskPool',
'gevent': 'celery.concurrency.gevent:TaskPool',
'solo': 'celery.concurrency.solo:TaskPool',
'processes': 'celery.concurrency.prefork:TaskPool', # XXX compat alias
'threads': 'celery.concurrency.thread:TaskPool'
}
def get_implementation(cls):
"""Return pool implementation by name."""
return symbol_by_name(cls, ALIASES)
在前一篇的日志中,我们知道默认使用的是prefork也就是多线程模式:
class TaskPool(BasePool):
"""Multiprocessing Pool implementation."""
# billiard提供的池模式
BlockingPool = BlockingPool
...
TaskPool的实现主要依赖billiard库,我们以后再行介绍,这里简单了解一下celery的并发模型都在concurrency模块之下即可。
Evloop-Step是由Consumer blueprint启动:
class Evloop(bootsteps.StartStopStep):
"""Event loop service.
Note:
This is always started last.
"""
# [2021-11-24 20:08:31,037: DEBUG/MainProcess] | Consumer: Starting event loop
label = 'event loop'
last = True
def start(self, c):
self.patch_all(c)
c.loop(*c.loop_args())
这里的loop在consumer中定义, 默认使用异步循环(asynloop)和同步循环(synloop)中的同步循环:
def synloop(obj, connection, consumer, blueprint, hub, qos,
heartbeat, clock, hbrate=2.0, **kwargs):
"""Fallback blocking event loop for transports that doesn't support AIO."""
RUN = bootsteps.RUN
on_task_received = obj.create_task_handler()
perform_pending_operations = obj.perform_pending_operations
...
consumer.on_message = on_task_received
consumer.consume()
obj.on_ready()
while blueprint.state == RUN and obj.connection:
...
try:
perform_pending_operations()
connection.drain_events(timeout=2.0)
except socket.timeout:
...
循环中主要功能是:
因为synloop会阻塞,所以需要设置step为last,确保在蓝图的最后启动。
我们再查看celery的任务处理日志:
[2021-11-24 21:33:50,535: INFO/MainProcess] Received task: myapp.add[e9bb4aa0-8280-443f-a5ed-3deb0a0b99c2]
[2021-11-24 21:33:50,535: DEBUG/MainProcess] TaskPool: Apply <function _trace_task_ret at 0x7fe6086ac280> (args:('myapp.add', 'e9bb4aa0-8280-443f-a5ed-3deb0a0b99c2', {'lang': 'py', 'task': 'myapp.add', 'id': 'e9bb4aa0-8280-443f-a5ed-3deb0a0b99c2', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': 'e9bb4aa0-8280-443f-a5ed-3deb0a0b99c2', 'parent_id': None, 'argsrepr': '(16, 16)', 'kwargsrepr': '{}', 'origin': 'gen83110@192.168.5.28', 'reply_to': '63862dbb-9d82-3bdd-b7fb-03580941362a', 'correlation_id': 'e9bb4aa0-8280-443f-a5ed-3deb0a0b99c2', 'hostname': 'celery@192.168.5.28', 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}, 'args': [16, 16], 'kwargs': {}}, b'[[16, 16], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]', 'application/json', 'utf-8') kwargs:{})
[2021-11-24 21:33:50,536: DEBUG/MainProcess] Task accepted: myapp.add[e9bb4aa0-8280-443f-a5ed-3deb0a0b99c2] pid:83086
[2021-11-24 21:33:50,537: INFO/ForkPoolWorker-8] Task myapp.add[e9bb4aa0-8280-443f-a5ed-3deb0a0b99c2] succeeded in 0.000271957000000711s: 32
可以发现celery的worker在主进程(MainProcess)中接收到task后,会派发给子进程(ForkPoolWorker-8)执行。
前面synloop的消费函数on_message实际上是Consumer(Blueprint)的create_task_handler:
def create_task_handler(self, promise=promise):
strategies = self.strategies
on_unknown_message = self.on_unknown_message
on_unknown_task = self.on_unknown_task
on_invalid_task = self.on_invalid_task
callbacks = self.on_task_message
call_soon = self.call_soon
def on_task_received(message):
type_ = message.headers['task']
...
strategy = strategies[type_]
strategy(
message, payload,
promise(call_soon, (message.ack_log_error,)),
promise(call_soon, (message.reject_log_error,)),
callbacks,
)
...
return on_task_received
对于消息和任务的处理,celery提供了默认的执行策略:
# celery/worker/strategy.py:22
def default(task, app, consumer,
info=logger.info, error=logger.error, task_reserved=task_reserved,
to_system_tz=timezone.to_system, bytes=bytes,
proto1_to_proto2=proto1_to_proto2):
"""Default task execution strategy."""
...
# task event related
# (optimized to avoid calling request.send_event)
handle = consumer.on_task_request
...
Request = symbol_by_name(task.Request)
Req = create_request_cls(Request, task, consumer.pool, hostname, eventer)
revoked_tasks = consumer.controller.state.revoked
def task_message_handler(message, body, ack, reject, callbacks,
to_timestamp=to_timestamp):
....
req = Req(
message,
on_ack=ack, on_reject=reject, app=app, hostname=hostname,
eventer=eventer, task=task, connection_errors=connection_errors,
body=body, headers=headers, decoded=decoded, utc=utc,
)
...
info('Received task: %s', req)
...
handle(req)
return task_message_handler
default策略主要做了下面2件事:
Request对象的执行是调用pool的执行方法:
def execute_using_pool(self, pool, **kwargs):
"""Used by the worker to send this task to the pool."""
result = pool.apply_async(
trace_task_ret,
args=(self._type, task_id, self._request_dict, self._body,
self._content_type, self._content_encoding),
accept_callback=self.on_accepted,
timeout_callback=self.on_timeout,
callback=self.on_success,
error_callback=self.on_failure,
soft_timeout=soft_time_limit or task.soft_time_limit,
timeout=time_limit or task.time_limit,
correlation_id=task_id,
)
# cannot create weakref to None
self._apply_result = maybe(ref, result)
return result
这样远程的任务请求就派发给Pool进行执行, pool如何执行task同样以后再介绍。
celery作为一款分布式任务调度框架,多个worker的协作由Mingle和Gossip两个step提供。我们先看Mingle-Step的日志:
[2021-12-12 13:37:56,632: DEBUG/MainProcess] | Consumer: Starting Mingle
[2021-12-12 13:37:56,632: INFO/MainProcess] mingle: searching for neighbors
[2021-12-12 13:37:57,674: INFO/MainProcess] mingle: all alone
...
Mingle-Step实现多个worker节点的同步通讯:
def start(self, c):
self.sync(c)
def sync(self, c):
info('mingle: searching for neighbors')
replies = self.send_hello(c)
if replies:
info('mingle: sync with %s nodes',
len([reply for reply, value in replies.items() if value]))
[self.on_node_reply(c, nodename, reply)
for nodename, reply in replies.items() if reply]
info('mingle: sync complete')
else:
info('mingle: all alone')
可以看到Mingle启动后,发送hello消息,然后对其它节点的回应进行处理。hello的发送是这样的:
def send_hello(self, c):
inspect = c.app.control.inspect(timeout=1.0, connection=c.connection)
our_revoked = c.controller.state.revoked
replies = inspect.hello(c.hostname, our_revoked._data) or {}
replies.pop(c.hostname, None) # delete my own response
return replies
...
# celery/app/control.py
def hello(self, from_node, revoked=None):
return self._request('hello', from_node=from_node, revoked=revoked)
对于回应的主要处理就是对当前worker的LamportClock进行校正。:
def on_node_reply(self, c, nodename, reply):
...
c.app.clock.adjust(clock) if clock else c.app.clock.forward()
...
Gossip-Step的功能会复杂一些,不像Mingle是一次性的,它是一个持续的过程。下面是它的日志,清晰展示会持续的监听:
[2021-12-05 15:59:19,088: DEBUG/MainProcess] w2@bogon joined the party[2021-12-12 13:37:58,096: DEBUG/MainProcess] w2@bogon joined the party
[2021-12-12 14:52:49,259: INFO/MainProcess] missed heartbeat from w2@bogon
[2021-12-12 14:52:49,262: DEBUG/MainProcess] w2@bogon joined the party
...
[2021-12-12 16:10:54,112: DEBUG/MainProcess] w2@bogon left
Gossip是一种算法,又称流行病算法,其图示如下:
gossip
简单的说在Gossip算法中网络节点每次向自己关联的节点广播消息,直到网络中所有节点都收到消息。
celery的gossip处理消息的过程是创建自己的Consumer和定时器:
def get_consumers(self, channel):
# 定时处理worker激活事件
self.register_timer()
# 消息消费者
ev = self.Receiver(channel, routing_key='worker.#',
queue_ttl=self.heartbeat_interval)
return [Consumer(
channel,
queues=[ev.queue],
on_message=partial(self.on_message, ev.event_from_message),
no_ack=True
)]
定时器负责处理其它节点的活跃状态, 如果节点不活跃,将它标记为脏节点,进行节点丢失处理,然后移除节点:
def periodic(self):
workers = self.state.workers
dirty = set()
for worker in workers.values():
if not worker.alive:
dirty.add(worker)
self.on_node_lost(worker)
for worker in dirty:
workers.pop(worker.hostname, None)
消费的消息,又分成2种类型: 选举消息和其它消息。
def on_message(self, prepare, message):
_type = message.delivery_info['routing_key']
try:
# 选举事件
handler = self.event_handlers[_type]
except KeyError:
pass
else:
return handler(message.payload)
# proto2: hostname in header; proto1: in body
hostname = (message.headers.get('hostname') or
message.payload['hostname'])
if hostname != self.hostname:
...
# 其它事件
_, event = prepare(message.payload)
self.update_state(event)
...
else:
self.clock.forward()
选举类的消息是处理选举消息和选举ack消息:
self.event_handlers = {
'worker.elect': self.on_elect,
'worker.elect.ack': self.on_elect_ack,
}
def on_elect(self, event):
...
def on_elect_ack(self, event):
...
其它事件主要是节点的上下线之类:
self.state = c.app.events.State(
on_node_join=self.on_node_join,
on_node_leave=self.on_node_leave,
max_tasks_in_memory=1,
)
def on_node_join(self, worker):
debug('%s joined the party', worker.hostname)
self._call_handlers(self.on.node_join, worker)
def on_node_leave(self, worker):
debug('%s left', worker.hostname)
self._call_handlers(self.on.node_leave, worker)
我们通过解析celery的两个Blueprint,了解到celery worker的启动流程包括建立和broker之间的AMQP协议连接,使用进程池/线程池/协程池方式处理任务,使用hello消息进行worker节点之间的LamportClock时钟校时,使用Gossip协议进行worker节点之间的通讯协作。在多进程情况下,每次的任务都先被主进程获取,然后分配给进程池中的子进程进行执行。
本文由哈喽比特于2年以前收录,如有侵权请联系我们。
文章来源:https://mp.weixin.qq.com/s/hgBBOH9yUmH370f2O9FAVg
京东创始人刘强东和其妻子章泽天最近成为了互联网舆论关注的焦点。有关他们“移民美国”和在美国购买豪宅的传言在互联网上广泛传播。然而,京东官方通过微博发言人发布的消息澄清了这些传言,称这些言论纯属虚假信息和蓄意捏造。
日前,据博主“@超能数码君老周”爆料,国内三大运营商中国移动、中国电信和中国联通预计将集体采购百万台规模的华为Mate60系列手机。
据报道,荷兰半导体设备公司ASML正看到美国对华遏制政策的负面影响。阿斯麦(ASML)CEO彼得·温宁克在一档电视节目中分享了他对中国大陆问题以及该公司面临的出口管制和保护主义的看法。彼得曾在多个场合表达了他对出口管制以及中荷经济关系的担忧。
今年早些时候,抖音悄然上线了一款名为“青桃”的 App,Slogan 为“看见你的热爱”,根据应用介绍可知,“青桃”是一个属于年轻人的兴趣知识视频平台,由抖音官方出品的中长视频关联版本,整体风格有些类似B站。
日前,威马汽车首席数据官梅松林转发了一份“世界各国地区拥车率排行榜”,同时,他发文表示:中国汽车普及率低于非洲国家尼日利亚,每百户家庭仅17户有车。意大利世界排名第一,每十户中九户有车。
近日,一项新的研究发现,维生素 C 和 E 等抗氧化剂会激活一种机制,刺激癌症肿瘤中新血管的生长,帮助它们生长和扩散。
据媒体援引消息人士报道,苹果公司正在测试使用3D打印技术来生产其智能手表的钢质底盘。消息传出后,3D系统一度大涨超10%,不过截至周三收盘,该股涨幅回落至2%以内。
9月2日,坐拥千万粉丝的网红主播“秀才”账号被封禁,在社交媒体平台上引发热议。平台相关负责人表示,“秀才”账号违反平台相关规定,已封禁。据知情人士透露,秀才近期被举报存在违法行为,这可能是他被封禁的部分原因。据悉,“秀才”年龄39岁,是安徽省亳州市蒙城县人,抖音网红,粉丝数量超1200万。他曾被称为“中老年...
9月3日消息,亚马逊的一些股东,包括持有该公司股票的一家养老基金,日前对亚马逊、其创始人贝索斯和其董事会提起诉讼,指控他们在为 Project Kuiper 卫星星座项目购买发射服务时“违反了信义义务”。
据消息,为推广自家应用,苹果现推出了一个名为“Apps by Apple”的网站,展示了苹果为旗下产品(如 iPhone、iPad、Apple Watch、Mac 和 Apple TV)开发的各种应用程序。
特斯拉本周在美国大幅下调Model S和X售价,引发了该公司一些最坚定支持者的不满。知名特斯拉多头、未来基金(Future Fund)管理合伙人加里·布莱克发帖称,降价是一种“短期麻醉剂”,会让潜在客户等待进一步降价。
据外媒9月2日报道,荷兰半导体设备制造商阿斯麦称,尽管荷兰政府颁布的半导体设备出口管制新规9月正式生效,但该公司已获得在2023年底以前向中国运送受限制芯片制造机器的许可。
近日,根据美国证券交易委员会的文件显示,苹果卫星服务提供商 Globalstar 近期向马斯克旗下的 SpaceX 支付 6400 万美元(约 4.65 亿元人民币)。用于在 2023-2025 年期间,发射卫星,进一步扩展苹果 iPhone 系列的 SOS 卫星服务。
据报道,马斯克旗下社交平台𝕏(推特)日前调整了隐私政策,允许 𝕏 使用用户发布的信息来训练其人工智能(AI)模型。新的隐私政策将于 9 月 29 日生效。新政策规定,𝕏可能会使用所收集到的平台信息和公开可用的信息,来帮助训练 𝕏 的机器学习或人工智能模型。
9月2日,荣耀CEO赵明在采访中谈及华为手机回归时表示,替老同事们高兴,觉得手机行业,由于华为的回归,让竞争充满了更多的可能性和更多的魅力,对行业来说也是件好事。
《自然》30日发表的一篇论文报道了一个名为Swift的人工智能(AI)系统,该系统驾驶无人机的能力可在真实世界中一对一冠军赛里战胜人类对手。
近日,非营利组织纽约真菌学会(NYMS)发出警告,表示亚马逊为代表的电商平台上,充斥着各种AI生成的蘑菇觅食科普书籍,其中存在诸多错误。
社交媒体平台𝕏(原推特)新隐私政策提到:“在您同意的情况下,我们可能出于安全、安保和身份识别目的收集和使用您的生物识别信息。”
2023年德国柏林消费电子展上,各大企业都带来了最新的理念和产品,而高端化、本土化的中国产品正在不断吸引欧洲等国际市场的目光。
罗永浩日前在直播中吐槽苹果即将推出的 iPhone 新品,具体内容为:“以我对我‘子公司’的了解,我认为 iPhone 15 跟 iPhone 14 不会有什么区别的,除了序(列)号变了,这个‘不要脸’的东西,这个‘臭厨子’。