Celery是一款非常简单、灵活、可靠的分布式系统,可用于处理大量消息,并且提供了一整套操作此系统的工具。Celery 也是一款消息队列工具,可用于处理实时数据以及任务调度。
本文是是celery源码解析的第四篇,在前3篇里分别介绍了vine, py-amqp和kombu:
本篇我们继续聊聊kombu这个python实现的消息库中的一些常用算法实现,和各种排序算法不一样,都是解决一些具体的业务问题,非常有用。本文包括下面几个部分:
缓存,顾名思义,就是将计算结果暂时存起来,以供后期使用,这样可以省去重复计算的工作。比如我们计算斐波那契数列的递归算法:
# 根据定义递归求解
def fib(n):
if n <= 1:
return n
return fib(n - 1) + fib(n - 2)
我们求n为5的数,展开数学公式大概如下(这里简化python函数fib名称为数学函数f):
f(5)=f(4) +f(3)
=f(3) +f(2) +f(2) +f(1)
=f(2) +f(1)+f(1)+f(0)+f(1)+f(0)+f(1)
=f(1)+f(0)+f(1)+f(1)+f(0)+f(1)+f(0)+f(1)
=5
根据数学公式,我们可以知道,在执行f(5)过程中,重复执行了5次f(1), 3次f(0)。要提高执行效率,就可以用到缓存。最简单的实现版本:
# 根据定义递归求解
cache = {}
def fib_v1(n):
if n in cache:
return cache[n]
if n <= 1:
result = n
else:
result = fib(n - 1) + fib(n - 2)
cache[n] = result
return result
这种实现方式有2个弊端,一个是依赖一个外部的cache变量,另一个是cache功能和fib函数绑定,还需要修改fib函数。我们可以通过一个装饰器实现这个cache,而不用改动fib函数:
def cache_decorator(fun):
_cache = {}
def wrapper(*args, **kwargs):
if args in _cache:
return _cache[args]
else:
ret = fun(*args, **kwargs)
_cache[args] = ret
return ret
return wrapper
使用的时候可以直接给fib函数添加上装饰器:
@cache_decorator
def fib(n):
...
这种缓存实现实现方式,还是会有问题:无法进行清理,内存会持续增长。编程中有一句话是: 命名和缓存失效是计算机科学里面最难应对的两件事。关于缓存淘汰有各种算法,请见参考链接,我这里重点介绍一下LRU和LFU。
关于LRU,在我之前介绍tinydb时候有过介绍。其中的实现如下:
class LRUCache(abc.MutableMapping, Generic[K, V]):
def __init__(self, capacity=None):
self.capacity = capacity # 缓存容量
self.cache = OrderedDict() # 有序字典
def get(self, key: K, default: D = None) -> Optional[Union[V, D]]:
value = self.cache.get(key) # 从换成获取
if value is not None:
del self.cache[key]
self.cache[key] = value # 更新缓存顺序
return value
return default
def set(self, key: K, value: V):
if self.cache.get(key):
del self.cache[key]
self.cache[key] = value # 更新缓存顺序及值
else:
self.cache[key] = value
if self.capacity is not None and self.length > self.capacity:
self.cache.popitem(last=False) # 淘汰最古老的数据
LRU的特点只要保持缓存数据是有序的, 我们甚至不需要自己实现,使用系统functools中的实现:
from functools import lru_cache
@lru_cache()
def fib(n):
...
kombu中给我们提供了一个线程安全的版本, 主要实现:
# kombu-5.0.0/kombu/utils/functional.py
class LRUCache(UserDict):
"""LRU Cache implementation using a doubly linked list to track access.
"""
def __init__(self, limit=None):
self.limit = limit
self.mutex = threading.RLock()
self.data = OrderedDict()
def __getitem__(self, key):
with self.mutex:
value = self[key] = self.data.pop(key)
return value
def __setitem__(self, key, value):
# remove least recently used key.
with self.mutex:
if self.limit and len(self.data) >= self.limit:
self.data.pop(next(iter(self.data)))
self.data[key] = value
...
上面代码在设置和获取数据时候都先获取锁,然后再进行数据操作。
关于缓存使用,除了通过业务场景判断适用那种淘汰算法外,还可以使用具体的缓存命中率指标进行分析:
def memoize(maxsize=None, keyfun=None, Cache=LRUCache):
"""Decorator to cache function return value."""
def _memoize(fun):
mutex = threading.Lock()
cache = Cache(limit=maxsize)
@wraps(fun)
def _M(*args, **kwargs):
if keyfun:
key = keyfun(args, kwargs)
else:
key = args + (KEYWORD_MARK,) + tuple(sorted(kwargs.items()))
try:
with mutex:
value = cache[key]
except KeyError:
value = fun(*args, **kwargs)
# 未命中需要执行函数
_M.misses += 1
with mutex:
cache[key] = value
else:
# 命中率增加
_M.hits += 1
return value
def clear():
"""Clear the cache and reset cache statistics."""
# 清理缓存及统计
cache.clear()
_M.hits = _M.misses = 0
# 统计信息
_M.hits = _M.misses = 0
_M.clear = clear
_M.original_func = fun
return _M
return _memoize
memoize的实现并不复杂,增加了hits/misses数据,可以统计分析缓存的命中率,帮助正确使用LRU缓存。还添加了clear接口,可以在需要的时候对缓存直接进行清理。
注意memoize使用了一个锁,在LRUCache还是使用了一个锁,这个锁的使用,我们以后再讲。
限流是指在系统面临高并发、大流量请求的情况下,限制新的流量对系统的访问,从而保证系统服务的安全性。常用的限流算法有计数器、漏斗算法和令牌桶算法。其中计数器算法又分固定窗口算法、滑动窗口算法,后者我们在TCP协议中经常会碰到。
算法中存在一个令牌桶,以恒定的速率向令牌桶中放入令牌。当请求来时,会首先到令牌桶中去拿令牌,如果拿到了令牌,则该请求会被处理,并消耗掉令牌;如果拿不到令牌,则该请求会被丢弃。当然令牌桶也有一定的容量,如果满了令牌就无法放进去了,这样算法就有限流作用。又因为令牌产生的速率是很定的,如果消费速率较低,桶里会额外缓存一部分令牌,用于应对流量突发时候的消耗。下面是算法的示意图:
Token bucket Diagram
我们具体看看kombu中提供的实现。TokenBucket类:
class TokenBucket:
#: The rate in tokens/second that the bucket will be refilled.
fill_rate = None
#: Maximum number of tokens in the bucket.
capacity = 1
#: Timestamp of the last time a token was taken out of the bucket.
timestamp = None
def __init__(self, fill_rate, capacity=1):
# 容量上限
self.capacity = float(capacity)
# 剩余令牌数,初始等于容量上限
self._tokens = capacity
# 填充率
self.fill_rate = float(fill_rate)
self.timestamp = monotonic()
# 数据容器
self.contents = deque()
def add(self, item):
self.contents.append(item)
def pop(self):
# 先进先出
return self.contents.popleft()
代码包括:
令牌桶是否可用的判断:
def can_consume(self, tokens=1):
"""Check if one or more tokens can be consumed.
Returns:
bool: true if the number of tokens can be consumed
from the bucket. If they can be consumed, a call will also
consume the requested number of tokens from the bucket.
Calls will only consume `tokens` (the number requested)
or zero tokens -- it will never consume a partial number
of tokens.
"""
if tokens <= self._get_tokens():
# 消费n个令牌
self._tokens -= tokens
return True
return False
def _get_tokens(self):
if self._tokens < self.capacity:
# 记录当前时间
now = monotonic()
# 计算已经流失的令牌数量
delta = self.fill_rate * (now - self.timestamp)
# 更新容量上限或者剩余令牌和流失数量之和
self._tokens = min(self.capacity, self._tokens + delta)
self.timestamp = now
return self._tokens
我们可以看到,算法在进行令牌消费判断的同时,还会对桶的剩余流量进行自校正,很巧妙。
TokenBucket的使用在ConsumerMixin的run方法中。创建了一个速率为1的令牌桶,然后持续的进行消费。如果有令牌则消费消费者上的消息;如果没有令牌则进行休眠
#ch23-celery/kombu-5.0.0/kombu/mixins.py:240
class ConsumerMixin:
def run(self, _tokens=1, **kwargs):
restart_limit = TokenBucket(1)
...
# 无限循环
while not self.should_stop:
try:
# 有令牌消费
if restart_limit.can_consume(_tokens): # pragma: no cover
for _ in self.consume(limit=None, **kwargs):
pass
else:
# 没浪费休眠
sleep(restart_limit.expected_time(_tokens))
except errors:
...
其中的休眠时间,是由令牌桶根据期望值计算得来:
def expected_time(self, tokens=1):
"""Return estimated time of token availability.
Returns:
float: the time in seconds.
"""
_tokens = self._get_tokens()
tokens = max(tokens, _tokens)
return (tokens - _tokens) / self.fill_rate
Round-Robin调度算法,最常见的大概是在nginx。Round-Robin方式可让nginx将请求按顺序轮流地分配到后端服务器上,它均衡地对待后端的每一台服务器,而不关心服务器实际的连接数和当前的系统负载,循环往复。在kombu中也提供了几种类似的调度算法:
我们先看Round-Robin方式:
class round_robin_cycle:
"""Iterator that cycles between items in round-robin."""
"""轮询调度算法"""
def __init__(self, it=None):
self.items = it if it is not None else []
def update(self, it):
"""Update items from iterable."""
"""更新列表"""
self.items[:] = it
def consume(self, n):
"""Consume n items."""
"""消费n个元素"""
return self.items[:n]
def rotate(self, last_used):
"""Move most recently used item to end of list."""
"""旋转:把最后一个元素放到列表某尾"""
items = self.items
try:
items.append(items.pop(items.index(last_used)))
except ValueError:
pass
return last_used
算法实现很简单,就是一个有序队列,可以每次消费前n个有序元素,并且可以将最近使用的元素旋转到队尾。下面是旋转的单元测试:
def test_round_robin_cycle():
it = cycle_by_name('round_robin')(['A', 'B', 'C'])
assert it.consume(3) == ['A', 'B', 'C']
it.rotate('B')
assert it.consume(3) == ['A', 'C', 'B']
it.rotate('A')
assert it.consume(3) == ['C', 'B', 'A']
it.rotate('A')
assert it.consume(3) == ['C', 'B', 'A']
it.rotate('C')
assert it.consume(3) == ['B', 'A', 'C']
还有一种公平循环的调度算法:
class FairCycle:
"""Cycle between resources.
Consume from a set of resources, where each resource gets
an equal chance to be consumed from.
Arguments:
fun (Callable): Callback to call.
resources (Sequence[Any]): List of resources.
predicate (type): Exception predicate.
"""
def __init__(self, fun, resources, predicate=Exception):
self.fun = fun
self.resources = resources
self.predicate = predicate
# 初始位置
self.pos = 0
FairCycle是一种资源之间公平循环的调度算法, 构造函数中:
使用的方式是使用get方法传入回调:
def _next(self):
while 1:
try:
resource = self.resources[self.pos]
# 位置加1
self.pos += 1
return resource
except IndexError:
# 到尾部后,重置位置
self.pos = 0
if not self.resources:
raise self.predicate()
def get(self, callback, **kwargs):
"""Get from next resource."""
# 无限重试
for tried in count(0): # for infinity
# 获取资源
resource = self._next()
try:
# 利用资源
return self.fun(resource, callback, **kwargs)
except self.predicate:
# reraise when retries exchausted.
# 容错上限
if tried >= len(self.resources) - 1:
raise
调度主要体现再获取资源的next函数上,没次获取资源后位置标志进行后移,到尾部后在重置到0,继续下一轮循环。算法还可以对资源进行容错,也就是如果获取到的资源无法正常使用,还可以尝试使用下一个资源进行重试。
兰波特时间戳算法(LamportClock),使用逻辑时间戳作为值的版本以允许跨服务器对值进行排序,是解决分布式系统时间一致的重要算法。
服务器上的系统时间,使用物理的晶体振荡测量,会有不准的情况。我们会经常遇到服务器或者快或者慢的情况,一般使用NTP服务,来和互联网上的某个时间源进行同步。如果本地时间提前了,进行联网校时后,会出现本地时间倒退的问题。而对于两台不同的服务器上,要进行时间统一,就更不能使用系统时间。
兰波特时间戳算法,原理如下:
这个过程,可以看下面的图示:
从图中可以看到下面两点:
了解算法的场景和原理后,我们再来看算法的实现。
class LamportClock:
#: The clocks current value.
value = 0
def __init__(self, initial_value=0, Lock=Lock):
self.value = initial_value
self.mutex = Lock()
def adjust(self, other):
with self.mutex:
value = self.value = max(self.value, other) + 1
return value
def forward(self):
with self.mutex:
self.value += 1
return self.value
算法的实现其实非常简单,就是转发的时候时间戳+1;收到消息后进行校正,这个过程中使用线程锁,保证本地的有序。
前面讲的几种算法,都是基于线程锁实现。使用锁会降低效率,如果在协程中,可以使用无锁的方案,会更高效。kombu的LaxBoundedSemaphore实现,可以作为一种参考。
我们先看使用示例:
>>> from future import print_statement as printf
# ^ ignore: just fooling stupid pyflakes
>>> x = LaxBoundedSemaphore(2)
>>> x.acquire(printf, 'HELLO 1')
HELLO 1
>>> x.acquire(printf, 'HELLO 2')
HELLO 2
>>> x.acquire(printf, 'HELLO 3')
>>> x._waiters # private, do not access directly
[print, ('HELLO 3',)]
>>> x.release()
HELLO 3
示例展示了几步:
下面是具体的实现,LaxBoundedSemaphore的构造函数:
class LaxBoundedSemaphore:
def __init__(self, value):
# 信号容量
self.initial_value = self.value = value
# 使用双端队列,FIFO
self._waiting = deque()
self._add_waiter = self._waiting.append
self._pop_waiter = self._waiting.popleft
申请执行回调函数,会进行信号判断,信号充足会执行行回调并消减一次信号量;信号量不足则将函数及参数放入代办的队列:
def acquire(self, callback, *partial_args, **partial_kwargs):
"""Acquire semaphore.
This will immediately apply ``callback`` if
the resource is available, otherwise the callback is suspended
until the semaphore is released.
Arguments:
callback (Callable): The callback to apply.
*partial_args (Any): partial arguments to callback.
"""
value = self.value
if value <= 0:
# 容量不够的时候先暂存执行函数,并不更改可用数量
self._add_waiter((callback, partial_args, partial_kwargs))
return False
else:
# 可用数量-1
self.value = max(value - 1, 0)
# 直接执行函数
callback(*partial_args, **partial_kwargs)
return True
使用release时候会取出头部的代办函数,并进行执行,此时信号量不增不减。如果代办全部执行完成后,则逐步恢复信号量到默认值:
def release(self):
"""Release semaphore.
Note:
If there are any waiters this will apply the first waiter
that is waiting for the resource (FIFO order).
"""
try:
waiter, args, kwargs = self._pop_waiter()
except IndexError:
# 无缓存则只增加可用数量
self.value = min(self.value + 1, self.initial_value)
else:
# 有缓存则执行第一个缓存,可用数量不变还是小于0
waiter(*args, **kwargs)
本篇文章,我们学习了5种实用的业务算法。LRU缓存淘汰算法,可以对缓存中最早的数据进行淘汰。令牌桶限流算法,可以协助进行服务流量限流,较好的保护后端服务,避免突发流量的到时的崩溃。Round-Robin调度算法,可以进行负载的均衡,保障资源的平衡使用。LamportClock时间戳算法,可以在分布式系统中,进行不同服务之间的有序时间戳同步。LaxBoundedSemaphore有限信号量算法,是一种无锁算法,可高效的提供资源使用控制。
kombu中提供了一个自动重试算法,可以作为重试算法的模版:
# kombu-5.0.0/kombu/utils/functional.py
def retry_over_time(fun, catch, args=None, kwargs=None, errback=None,
max_retries=None, interval_start=2, interval_step=2,
interval_max=30, callback=None, timeout=None):
kwargs = {} if not kwargs else kwargs
args = [] if not args else args
interval_range = fxrange(interval_start,
interval_max + interval_start,
interval_step, repeatlast=True)
# 超时时间
end = time() + timeout if timeout else None
for retries in count():
try:
return fun(*args, **kwargs)
except catch as exc:
# 超过次数
if max_retries is not None and retries >= max_retries:
raise
# 超过时间
if end and time() > end:
raise
...
# 休眠
sleep(1.0)
从模版可以看到重试时候使用次数和超时时间两个维度进行跳出(不可能无限重试):
实际上关于休眠时间,也有一些更复杂的算法,比如线性递增之类,这里使用了固定间隔的休眠
本文由哈喽比特于3年以前收录,如有侵权请联系我们。
文章来源:https://mp.weixin.qq.com/s/1dZ_sXfLMHFIm1T4Qe2c7Q
京东创始人刘强东和其妻子章泽天最近成为了互联网舆论关注的焦点。有关他们“移民美国”和在美国购买豪宅的传言在互联网上广泛传播。然而,京东官方通过微博发言人发布的消息澄清了这些传言,称这些言论纯属虚假信息和蓄意捏造。
日前,据博主“@超能数码君老周”爆料,国内三大运营商中国移动、中国电信和中国联通预计将集体采购百万台规模的华为Mate60系列手机。
据报道,荷兰半导体设备公司ASML正看到美国对华遏制政策的负面影响。阿斯麦(ASML)CEO彼得·温宁克在一档电视节目中分享了他对中国大陆问题以及该公司面临的出口管制和保护主义的看法。彼得曾在多个场合表达了他对出口管制以及中荷经济关系的担忧。
今年早些时候,抖音悄然上线了一款名为“青桃”的 App,Slogan 为“看见你的热爱”,根据应用介绍可知,“青桃”是一个属于年轻人的兴趣知识视频平台,由抖音官方出品的中长视频关联版本,整体风格有些类似B站。
日前,威马汽车首席数据官梅松林转发了一份“世界各国地区拥车率排行榜”,同时,他发文表示:中国汽车普及率低于非洲国家尼日利亚,每百户家庭仅17户有车。意大利世界排名第一,每十户中九户有车。
近日,一项新的研究发现,维生素 C 和 E 等抗氧化剂会激活一种机制,刺激癌症肿瘤中新血管的生长,帮助它们生长和扩散。
据媒体援引消息人士报道,苹果公司正在测试使用3D打印技术来生产其智能手表的钢质底盘。消息传出后,3D系统一度大涨超10%,不过截至周三收盘,该股涨幅回落至2%以内。
9月2日,坐拥千万粉丝的网红主播“秀才”账号被封禁,在社交媒体平台上引发热议。平台相关负责人表示,“秀才”账号违反平台相关规定,已封禁。据知情人士透露,秀才近期被举报存在违法行为,这可能是他被封禁的部分原因。据悉,“秀才”年龄39岁,是安徽省亳州市蒙城县人,抖音网红,粉丝数量超1200万。他曾被称为“中老年...
9月3日消息,亚马逊的一些股东,包括持有该公司股票的一家养老基金,日前对亚马逊、其创始人贝索斯和其董事会提起诉讼,指控他们在为 Project Kuiper 卫星星座项目购买发射服务时“违反了信义义务”。
据消息,为推广自家应用,苹果现推出了一个名为“Apps by Apple”的网站,展示了苹果为旗下产品(如 iPhone、iPad、Apple Watch、Mac 和 Apple TV)开发的各种应用程序。
特斯拉本周在美国大幅下调Model S和X售价,引发了该公司一些最坚定支持者的不满。知名特斯拉多头、未来基金(Future Fund)管理合伙人加里·布莱克发帖称,降价是一种“短期麻醉剂”,会让潜在客户等待进一步降价。
据外媒9月2日报道,荷兰半导体设备制造商阿斯麦称,尽管荷兰政府颁布的半导体设备出口管制新规9月正式生效,但该公司已获得在2023年底以前向中国运送受限制芯片制造机器的许可。
近日,根据美国证券交易委员会的文件显示,苹果卫星服务提供商 Globalstar 近期向马斯克旗下的 SpaceX 支付 6400 万美元(约 4.65 亿元人民币)。用于在 2023-2025 年期间,发射卫星,进一步扩展苹果 iPhone 系列的 SOS 卫星服务。
据报道,马斯克旗下社交平台𝕏(推特)日前调整了隐私政策,允许 𝕏 使用用户发布的信息来训练其人工智能(AI)模型。新的隐私政策将于 9 月 29 日生效。新政策规定,𝕏可能会使用所收集到的平台信息和公开可用的信息,来帮助训练 𝕏 的机器学习或人工智能模型。
9月2日,荣耀CEO赵明在采访中谈及华为手机回归时表示,替老同事们高兴,觉得手机行业,由于华为的回归,让竞争充满了更多的可能性和更多的魅力,对行业来说也是件好事。
《自然》30日发表的一篇论文报道了一个名为Swift的人工智能(AI)系统,该系统驾驶无人机的能力可在真实世界中一对一冠军赛里战胜人类对手。
近日,非营利组织纽约真菌学会(NYMS)发出警告,表示亚马逊为代表的电商平台上,充斥着各种AI生成的蘑菇觅食科普书籍,其中存在诸多错误。
社交媒体平台𝕏(原推特)新隐私政策提到:“在您同意的情况下,我们可能出于安全、安保和身份识别目的收集和使用您的生物识别信息。”
2023年德国柏林消费电子展上,各大企业都带来了最新的理念和产品,而高端化、本土化的中国产品正在不断吸引欧洲等国际市场的目光。
罗永浩日前在直播中吐槽苹果即将推出的 iPhone 新品,具体内容为:“以我对我‘子公司’的了解,我认为 iPhone 15 跟 iPhone 14 不会有什么区别的,除了序(列)号变了,这个‘不要脸’的东西,这个‘臭厨子’。