Celery是一款非常简单、灵活、可靠的分布式系统,可用于处理大量消息,并且提供了一整套操作此系统的工具。Celery 也是一款消息队列工具,可用于处理实时数据以及任务调度。
本文是是celery源码解析的第三篇,在前两篇里分别介绍了vine和py-amqp:
本篇我们继续celery的基础库: kombu,一个python实现的消息库,在celery中承担核心的消息处理流程。本文包括下面几个部分:
接上篇,我们继续学习AMQP的相关概念。理解这些基础概念对kombu为什么这样实现很有帮助。这次我们用小故事来模拟kombu的消息处理流程。
小学三年级的小明同学喜欢同桌的小红同学,喜欢她的马尾和笑容,经常写小纸条给她。这里小纸条就是Message,小明同学是Producer, 小红同学是Consumer,这种直接投递的方式是direct。有时候,小红同学不在座位上,小明就把纸条放在她的抽屉里。抽屉就当做Queue使用,临时存放投递的消息。老师发现小明和小红上课经常有小动作后,棒打鸳鸯把他们分开了,他们不再是同桌。小明同学没法忘记小红的笑容,距离产生了更多的美,就拜托前面的小马帮他递小纸条,纸条封面上写着“请给小红”。小马就是Exchange,小马的前座也是Exchange,“请给小红”就是消息的route-key。常在河边走,哪有不湿脚。有次纸条被老师抓住,老师让小明同学在讲台上把纸条的内容讲给大家听。当众念小纸条这叫广播, 也就是fanout。
幼稚的小故事也是一种真实的生活,谁又没有写过小纸条呢,请暂停回忆一分钟:) 。业务是生活场景的一种抽象,代码又是更高层一点的抽象。理解业务,就对代码上的概念不发楞。
以上这些概念Exchange,Queue都是broker要实现的内容。可是客户端Producer/Consumer也包含,这是为什么呢?消息传输过程可不可以简化成一个客户端只使用producer发送消息,另外一个客户端只使用consumer消费消息呢?这样也不是不行,前提是AMQP协议中exchange和queue的创建及绑定,需要使用管理工具在broker先创建好,这无疑约束了AMPQ使用的灵活性。kombu中包含了Exchange,Queue模型,主要是用来对broker的管理。
kombu是植物家族的重要一员, 芹菜(celery)、葡萄藤(vine)、海带(kombu)是快乐的一家人。我们解析kombu,采用的版本是 5.0.0
, 主要模块如下:
模块 | 功能 |
---|---|
abstract.py | 抽象的绑定实现,对象是否可以绑定到channel |
compression.py | 压缩算法的汇总 |
connection.py | broker的连接 |
entity.py | 实体类,包括Exchange,binding和Queue对象的实现 |
matcher.py | 匹配策略 |
message.py | 消息对象,并且附带消息的操作接口ack,reject等 |
messaging.py | 消息处理,包括Producer和Consumer |
mixins.py,pools.py,simple.py | 增强功能或者提升便捷使用的封装 |
serialization.py | 序列化算法的汇总 |
transport | 对接各种存储引擎的数据传输实现,主要有内存,redis,pyamqp(RabbitMQ) 等 |
asynchronous | 异步实现 |
kombu底层使用pyamqp提供的AMQP协议支持,并完成Producer,Consumer,Exchange,Queue等模型实现。
老规矩,先从kombu的使用开始。下面是一个生产者发送消息的示例:
# kombu-5.0.0/examples/complete_send.py
from kombu import Connection, Producer, Exchange, Queue
exchange = Exchange('kombu_demo', type='direct')
with Connection('amqp://guest:guest@localhost:5672//') as connection:
producer = Producer(connection)
# 消息需要使用exchange
producer.publish({'hello': 'world'},
exchange=exchange,
routing_key='kombu_demo',
serializer='json', compression='zlib')
生产者示例包括下面几步:
消费者的示例会略微复杂一点:
kombu-5.0.0/examples/complete_receive.py
from pprint import pformat
from kombu import Connection, Exchange, Queue, Consumer, eventloop
exchange = Exchange('kombu_demo', type='direct')
queue = Queue('kombu_demo', exchange, routing_key='kombu_demo')
# 格式化函数
def pretty(obj):
return pformat(obj, indent=4)
#: This is the callback applied when a message is received.
def handle_message(body, message):
print(f'Received message: {body!r}')
print(' properties:\n{}'.format(pretty(message.properties)))
print(' delivery_info:\n{}'.format(pretty(message.delivery_info)))
message.ack()
with Connection('amqp://guest:guest@localhost:5672//') as connection:
with Consumer(connection, queue, callbacks=[handle_message]):
for _ in eventloop(connection):
pass
消费者示例主要包括下面几步:
我们再回头看看下图,对比一下示例,加强理解:
hello-world-example-routing
示例中的生产者位于图的左半区,消费者位于图的右半区。中间部分的broker,在文章的第一篇里,我们使用redis服务作为broker。示例还有重要的一点就是,全程没有创建channel,都是自动创建的。一般情况下,我们有3个进程,Producer进程和Consumer进程通过Broker进程进行消息的处理,这是一个典型的分布式系统。
Proudcer的构造函数:
class Producer:
def __init__(self, channel, exchange=None, routing_key=None,
serializer=None, auto_declare=None, compression=None,
on_return=None):
self._channel = channel
self.exchange = exchange
self.routing_key = routing_key or self.routing_key
self.serializer = serializer or self.serializer
self.compression = compression or self.compression
self.on_return = on_return or self.on_return
self._channel_promise = None
if self.exchange is None:
# 默认的exchange
self.exchange = Exchange('')
...
if self._channel:
self.revive(self._channel)
def revive(self, channel):
"""Revive the producer after connection loss."""
if is_connection(channel):
connection = channel
self.__connection__ = connection
channel = ChannelPromise(lambda: connection.default_channel)
if isinstance(channel, ChannelPromise):
self._channel = channel
self.exchange = self.exchange(channel)
else:
# Channel already concrete
self._channel = channel
if self.on_return:
self._channel.events['basic_return'].add(self.on_return)
self.exchange = self.exchange(channel)
Producer除了设置自身的属性外,还包括对channel的处理。前文介绍过connection也是channel的一种,这里要先处理好connection,然后再从connection获得默认的channel。同时对于已经成功的channel,则进行将producer绑定到channel。self.exchange(channel)
等同于 self.exchange.__call__(channel)
。producer创建完成后,可以通过publish方法发送消息:
def publish(self, body, routing_key=None, delivery_mode=None,
mandatory=False, immediate=False, priority=0,
content_type=None, content_encoding=None, serializer=None,
headers=None, compression=None, exchange=None, retry=False,
retry_policy=None, declare=None, expiration=None, timeout=None,
**properties):
# 初始化routing-key, exchange
routing_key = self.routing_key if routing_key is None else routing_key
exchange_name, properties['delivery_mode'] = self._delivery_details(
exchange or self.exchange, delivery_mode,
)
# 准备body和body类型,编码
body, content_type, content_encoding = self._prepare(
body, serializer, content_type, content_encoding,
compression, headers)
# 使用message封装body
message = self.channel.prepare_message(
body, priority, content_type,
content_encoding, headers, properties,
)
...
# 利用channel发送消息
return channel.basic_publish(
message,
exchange=exchange, routing_key=routing_key,
mandatory=mandatory, immediate=immediate,
timeout=timeout
)
Producer是对channel的业务封装,创建时候有channel则使用channel,没有channel则使用connection的default_channel。Producer发送消息的过程,完成exchange和message包装后,使用channel进行发送。
Consumer的构造函数和上下文:
class Consumer:
def __init__(self, channel, queues=None, no_ack=None, auto_declare=None,
callbacks=None, on_decode_error=None, on_message=None,
accept=None, prefetch_count=None, tag_prefix=None):
self.channel = channel
# Queue的列表
self.queues = maybe_list(queues or [])
self.no_ack = self.no_ack if no_ack is None else no_ack
# 消息的回调函数
self.callbacks = (self.callbacks or [] if callbacks is None
else callbacks)
# 自定义的消息处理方法
self.on_message = on_message
self.tag_prefix = tag_prefix
self._active_tags = {}
...
if self.channel:
self.revive(self.channel)
def revive(self, channel):
"""Revive consumer after connection loss."""
self._active_tags.clear()
channel = self.channel = maybe_channel(channel)
# modify dict size while iterating over it is not allowed
for qname, queue in list(self._queues.items()):
# name may have changed after declare
self._queues.pop(qname, None)
queue = self._queues[queue.name] = queue(self.channel)
# queue和channel绑定
queue.revive(channel)
...
def __enter__(self):
self.consume()
return self
Consumer和Producer类似,设置完属性后也要处理好channel,不同的是其中的queue(在producer中是exchange)和channel绑定并提供一个上下文环境。在上下文环境中进行消息消费:
def consume(self, no_ack=None):
tag = self._add_tag(queue, consumer_tag)
# 每个queue消息消息
for queue in self._queues:
queue.consume(tag, self._receive_callback,
no_ack=no_ack, nowait=nowait)
def _receive_callback(self, message):
accept = self.accept
on_m, channel, decoded = self.on_message, self.channel, None
try:
...
# 消息反序列化
decoded = None if on_m else message.decode()
except Exception as exc:
if not self.on_decode_error:
raise
self.on_decode_error(message, exc)
else:
return on_m(message) if on_m else self.receive(decoded, message)
def receive(self, body, message):
"""Method called when a message is received.
This dispatches to the registered :attr:`callbacks`.
Arguments:
body (Any): The decoded message body.
message (~kombu.Message): The message instance.
Raises:
NotImplementedError: If no consumer callbacks have been
registered.
"""
# 执行callback
callbacks = self.callbacks
...
# 默认就是body和message回传给业务函数
[callback(body, message) for callback in callbacks]
consumer可以使用多个queue,每个queue消费消息的时候可以使用覆盖处理函数或者使用系统的处理函数。一般情况下callback会获得到解码后的body和消息原文。如何持续的消费消息,在connection部分再介绍。
producer需要使用exchange,consumer需要使用queue,消息是通过exchange和queue搭桥传递的。Exchange和Queue有共同的父类MaybeChannelBound:
+-------------------+
| MaybeChannelBound |
+-------^-----------+
|
+----------------+----------------+
| |
+----+-----+ +---+---+
| Exchange | | Queue |
+----------+ +-------+
MaybeChannelBound约定了类对channel的绑定行为:
class MaybeChannelBound(Object):
_channel = None
_is_bound = False
def __call__(self, channel):
"""`self(channel) -> self.bind(channel)`."""
return self.bind(channel)
下面绑定channel的动作和是否绑定的判断也可以验证这一点。
def maybe_bind(self, channel):
"""Bind instance to channel if not already bound."""
if not self.is_bound and channel:
self._channel = maybe_channel(channel)
self.when_bound()
self._is_bound = True
return self
@property
def is_bound(self):
"""Flag set if the channel is bound."""
return self._is_bound and self._channel is not None
exchange对象的创建和绑定到channel:
class Exchange(MaybeChannelBound):
def __init__(self, name='', type='', channel=None, **kwargs):
super().__init__(**kwargs)
self.name = name or self.name
self.type = type or self.type
self.maybe_bind(channel)
...
创建完成的exchange对象需要进行申明,申明的过程就是让broker创建exchange的过程:
def declare(self, nowait=False, passive=None, channel=None):
"""Declare the exchange.
Creates the exchange on the broker, unless passive is set
in which case it will only assert that the exchange exists.
Argument:
nowait (bool): If set the server will not respond, and a
response will not be waited for. Default is :const:`False`.
"""
if self._can_declare():
passive = self.passive if passive is None else passive
# 依托于channel
return (channel or self.channel).exchange_declare(
exchange=self.name, type=self.type, durable=self.durable,
auto_delete=self.auto_delete, arguments=self.arguments,
nowait=nowait, passive=passive,
)
queue对象创建完成后也需要绑定到channel:
class Queue(MaybeChannelBound):
def __init__(self, name='', exchange=None, routing_key='',
channel=None, bindings=None, on_declared=None,
**kwargs):
super().__init__(**kwargs)
self.name = name or self.name
self.maybe_bind(channel)
...
然后申明queue,这个过程包括下面3个步骤:
def declare(self, nowait=False, channel=None):
"""Declare queue and exchange then binds queue to exchange."""
if not self.no_declare:
# - declare main binding.
self._create_exchange(nowait=nowait, channel=channel)
self._create_queue(nowait=nowait, channel=channel)
self._create_bindings(nowait=nowait, channel=channel)
return self.name
def _create_exchange(self, nowait=False, channel=None):
if self.exchange:
# 隐式申明exchange
self.exchange.declare(nowait=nowait, channel=channel)
def _create_queue(self, nowait=False, channel=None):
# 申明queue
self.queue_declare(nowait=nowait, passive=False, channel=channel)
if self.exchange and self.exchange.name:
# 绑定queue和exchange
self.queue_bind(nowait=nowait, channel=channel)
def _create_bindings(self, nowait=False, channel=None):
for B in self.bindings:
channel = channel or self.channel
B.declare(channel)
B.bind(self, nowait=nowait, channel=channel)
queue的申明也是让broker创建queue:
def queue_declare(self, nowait=False, passive=False, channel=None):
...
ret = channel.queue_declare(
queue=self.name,
passive=passive,
durable=self.durable,
exclusive=self.exclusive,
auto_delete=self.auto_delete,
arguments=queue_arguments,
nowait=nowait,
)
...
queue比exchange多一个步骤就是bind到exchange。queue_bind的工作是让broker创建queue和exchange的关联关系。
def queue_bind(self, nowait=False, channel=None):
"""Create the queue binding on the server."""
return (channel or self.channel).queue_bind(
queue=self.name,
exchange=exchange,
routing_key=routing_key,
arguments=arguments,
nowait=nowait,
)
从Exchange和Queue的实现,我们可以知道生产者不用关心消费者的实现,只需要创建和申明exchange即可。消费者则是需要知道生产者,除了创建和申明queue后,还需要绑定queue和exchange的关系。又因为消费者和生产者在不同的进程,即使生成者创建了exchange,消费者也需要在本地隐式创建exchange对象。
消息对象,除了纯粹的数据结构外,也包含channel的引用,毕竟消息可以直接执行ack动作:
class Message:
def __init__(self, body=None, delivery_tag=None,
content_type=None, content_encoding=None, delivery_info=None,
properties=None, headers=None, postencode=None,
accept=None, channel=None, **kwargs):
# 通道,主要的API来源
self.channel = channel
# 投递标签,可以用来响应
self.delivery_tag = delivery_tag
...
self.headers = headers or {}
self.body = body
...
self._state = 'RECEIVED'
消息本身还带有四个状态:
RECEIVED
默认状态ACK
完成ack响应REJECTED
拒绝消息REQUEUED
重新投递消息其中 {'ACK', 'REJECTED', 'REQUEUED'}
三个状态的转换都需要使用channel进行操作broker,成功后再切换:
def ack(self, multiple=False):
# 回应ACK
self.channel.basic_ack(self.delivery_tag, multiple=multiple)
self._state = 'ACK'
def reject(self, requeue=False):
# 拒绝(抛弃消息)
self.channel.basic_reject(self.delivery_tag, requeue=requeue)
self._state = 'REJECTED'
def requeue(self):
# 拒绝(退回消息)(和reject区别在requeue=True)
self.channel.basic_reject(self.delivery_tag, requeue=True)
self._state = 'REQUEUED'
消息上附带的信息,通过不同的load方法进行序列化:
from .serialization import loads
@property
def payload(self):
return loads(self.body, self.content_type,
self.content_encoding, accept=self.accept)
Connection负责管理producer/consumer到broker的网络连接:
class Connection:
def __init__(self, hostname='localhost', userid=None,
password=None, virtual_host=None, port=None, insist=False,
ssl=False, transport=None, connect_timeout=5,
transport_options=None, login_method=None, uri_prefix=None,
heartbeat=0, failover_strategy='round-robin',
alternates=None, **kwargs):
...
params = self._initial_params = {
'hostname': hostname, 'userid': userid,
'password': password, 'virtual_host': virtual_host,
'port': port, 'insist': insist, 'ssl': ssl,
'transport': transport, 'connect_timeout': connect_timeout,
'login_method': login_method, 'heartbeat': heartbeat
}
...
self._init_params(**params)
...
重点在_init_params中对各种支持AQMP协议的broker的管理, 比如redis,RobbitMQ:
def _init_params(self, hostname, userid, password, virtual_host, port,
insist, ssl, transport, connect_timeout,
login_method, heartbeat):
transport = transport or 'amqp'
if transport == 'amqp' and supports_librabbitmq():
transport = 'librabbitmq'
if transport == 'rediss' and ssl_available and not ssl:
logger.warning(
'Secure redis scheme specified (rediss) with no ssl '
'options, defaulting to insecure SSL behaviour.'
)
ssl = {'ssl_cert_reqs': CERT_NONE}
self.hostname = hostname
self.userid = userid
self.password = password
self.login_method = login_method
# 虚拟主机隔离
self.virtual_host = virtual_host or self.virtual_host
self.port = port or self.port
self.insist = insist
self.connect_timeout = connect_timeout
self.ssl = ssl
# 传输类
self.transport_cls = transport
self.heartbeat = heartbeat and float(heartbeat)
配置完connection信息后,就需要创建网络连接。这个过程通过调用connection属性或者default_channel属性时候自动创建:
@property
def connection(self):
"""The underlying connection object.
Warning:
This instance is transport specific, so do not
depend on the interface of this object.
"""
if not self._closed:
if not self.connected:
# 创建连接
return self._ensure_connection(
max_retries=1, reraise_as_library_errors=False
)
return self._connection
@property
def default_channel(self):
"""Default channel.
Created upon access and closed when the connection is closed.
Note:
Can be used for automatic channel handling when you only need one
channel, and also it is the channel implicitly used if
a connection is passed instead of a channel, to functions that
require a channel.
"""
# make sure we're still connected, and if not refresh.
conn_opts = self._extract_failover_opts()
# 创建连接
self._ensure_connection(**conn_opts)
if self._default_channel is None:
self._default_channel = self.channel()
return self._default_channel
连接创建完成后,继续创建channel:
def channel(self):
"""Create and return a new channel."""
self._debug('create channel')
chan = self.transport.create_channel(self.connection)
return chan
def create_transport(self):
# 创建传输连接
return self.get_transport_cls()(client=self)
def get_transport_cls(self):
"""Get the currently used transport class."""
transport_cls = self.transport_cls
if not transport_cls or isinstance(transport_cls, str):
transport_cls = get_transport_cls(transport_cls)
return transport_cls
创建broker的连接过程,是通过transport的创建,其中细节涉及对不同类型的broker服务的适配,内容挺多,我们下一章再进行解析。
Matcher负责处理消息的匹配机制,serialization复杂消息的序列化。两者的实现方式类似,都使用注册中心模式+策略模式实现。
Matcher的注册中心:
class MatcherRegistry:
"""Pattern matching function registry."""
"""匹配器的注册中心"""
MatcherNotInstalled = MatcherNotInstalled
matcher_pattern_first = ["pcre", ]
def __init__(self):
self._matchers = {}
self._default_matcher = None
#: Global registry of matchers.
registry = MatcherRegistry()
注册glob(模糊)模式和pcre(正则)模式两种策略:
def register_glob():
"""Register glob into default registry."""
"""使用glob(通配符)匹配"""
registry.register('glob', fnmatch)
def register_pcre():
"""Register pcre into default registry."""
"""使用正则匹配"""
registry.register('pcre', rematch)
# Register the base matching methods.
register_glob()
register_pcre()
匹配消息的方法,就是使用模式进行识别:
def match(self, data, pattern, matcher=None, matcher_kwargs=None):
"""Call the matcher."""
if matcher and not self._matchers.get(matcher):
raise self.MatcherNotInstalled(
f'No matcher installed for {matcher}'
)
# 默认使用通配符匹配
match_func = self._matchers[matcher or 'glob']
# 通配符和正则匹配的传参先后顺序有差异
if matcher in self.matcher_pattern_first:
first_arg = bytes_to_str(pattern)
second_arg = bytes_to_str(data)
else:
first_arg = bytes_to_str(data)
second_arg = bytes_to_str(pattern)
return match_func(first_arg, second_arg, **matcher_kwargs or {})
Serializer的注册中心:
class SerializerRegistry:
"""The registry keeps track of serialization methods."""
"""序列化方法的注册中心"""
def __init__(self):
self._encoders = {}
self._decoders = {}
self._default_encode = None
self._default_content_type = None
self._default_content_encoding = None
# 记录禁用的编解码类型
self._disabled_content_types = set()
# 双向字典,可以进行互查
self.type_to_name = {}
self.name_to_type = {}
# 全局单例,并且导出函数绑定,使用API更简介
registry = SerializerRegistry()
dumps = registry.dumps
loads = registry.loads
register = registry.register
unregister = registry.unregister
json, yaml, pickle和msgpack四种序列化策略的注册:
def register_json():
"""Register a encoder/decoder for JSON serialization."""
from kombu.utils import json as _json
registry.register('json', _json.dumps, _json.loads,
content_type='application/json',
content_encoding='utf-8')
def register_yaml():
"""Register a encoder/decoder for YAML serialization.
It is slower than JSON, but allows for more data types
to be serialized. Useful if you need to send data such as dates
"""
import yaml
registry.register('yaml', yaml.safe_dump, yaml.safe_load,
content_type='application/x-yaml',
content_encoding='utf-8')
def register_pickle():
"""Register pickle serializer.
The fastest serialization method, but restricts
you to python clients.
"""
def pickle_dumps(obj, dumper=pickle.dumps):
return dumper(obj, protocol=pickle_protocol)
registry.register('pickle', pickle_dumps, unpickle,
content_type='application/x-python-serialize',
content_encoding='binary')
def register_msgpack():
"""Register msgpack serializer.
See Also:
https://msgpack.org/.
"""
pack = unpack = None
import msgpack
from msgpack import packb, unpackb
def pack(s):
return packb(s, use_bin_type=True)
def unpack(s):
return unpackb(s, raw=False)
registry.register(
'msgpack', pack, unpack,
content_type='application/x-msgpack',
content_encoding='binary',
)
register_json()
register_pickle()
register_yaml()
register_msgpack()
反序列化的使用:
# kombu-5.0.0/kombu/serialization.py:285
# 导出策略
loads = registry.loads
# kombu-5.0.0/kombu/message.py:10
from .serialization import loads
class Message:
def _decode(self):
# 使用策略反序列化message-body
return loads(self.body, self.content_type,
self.content_encoding, accept=self.accept)
通过kombu的Producer可以发送消息到broker,使用Comsumer则可以消费消息。发送消息的时候需要使用Exchange,用来将消费分发到不同的目标Queue;消费消息的时候,需要使用Queue,Queue还需要通过绑定的方式和Exchange关联起来。Exchange和Queue都是使用底层的channel进行数据传输,所以需要进绑定(binding);还需要在远程的broker中创建,所以创建后的的Exchange和Queue需要进行申明(declare)。消息会附带上投递信息,进行序列化后从生产者到broker转发给消费者,消费者再使用投递信息上的序列化约定,将消息反序列成业务信息。
pickle不仅支持数据接口的序列化,还支持函数的序列化:
python3
Python 3.8.5 (v3.8.5:580fbb018f, Jul 20 2020, 12:11:27)
[Clang 6.0 (clang-600.0.57)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import pickle
>>>
>>> def hello(msg):
... print("hello", msg)
...
>>> p = pickle.dumps(hello)
>>> p
b'\x80\x04\x95\x16\x00\x00\x00\x00\x00\x00\x00\x8c\x08__main__\x94\x8c\x05hello\x94\x93\x94.'
>>>
>>> q = pickle.loads(p)
>>>
>>> q("python")
hello python
>>>
上面的hello函数可以通过pickle打包,再重新解包执行。利用这个机制使用kombu,可以将producer进程的函数发送到consumer进程远程执行。pickle支持的数据类型还挺丰富,官方文档中介绍包括下面多种类型:
The following types can be pickled:
* None, True, and False
* integers, floating point numbers, complex numbers
* strings, bytes, bytearrays
* tuples, lists, sets, and dictionaries containing only picklable objects
* functions defined at the top level of a module (using def, not lambda)
* built-in functions defined at the top level of a module
* classes that are defined at the top level of a module
* instances of such classes whose __dict__ or the result of calling __getstate__() is picklable (see section Pickling Class Instances for details).
Object
提供了一种快速构建对象的方法:
class Object:
"""Common base class.
Supports automatic kwargs->attributes handling, and cloning.
"""
attrs = ()
def __init__(self, *args, **kwargs):
# attrs 在子类中定义
for name, type_ in self.attrs:
value = kwargs.get(name)
# 从字典参数给属性动态赋值
if value is not None:
setattr(self, name, (type_ or _any)(value))
else:
try:
getattr(self, name)
except AttributeError:
setattr(self, name, None)
Queue展示了这种方式的示例,比如max_length属性:
class Queue(MaybeChannelBound):
attrs = (
..
('max_length', int),
...
)
def __init__(self, name='', exchange=None, routing_key='',
channel=None, bindings=None, on_declared=None,
**kwargs):
self.name = name or self.name
...
def queue_declare(self, nowait=False, passive=False, channel=None):
...
queue_arguments = channel.prepare_queue_arguments(
self.queue_arguments or {},
expires=self.expires,
message_ttl=self.message_ttl,
max_length=self.max_length,
max_length_bytes=self.max_length_bytes,
max_priority=self.max_priority,
)
...
在Queue的构造函数中并没有定义max_length属性,但是queue_declare中却可以直接使用这个属性,可以对比name属性感受一下差异。这对我们简化定义属性很多的对象有帮助,比如一些配置类。
itertools.count
提供了一种通过迭代器生成递增ID的方法:
>>> from itertools import count
>>>
>>> for i in count():
... if i % 10 == 0:
... print(i)
... if i>50:
... break
...
0
10
20
30
40
50
本文由哈喽比特于3年以前收录,如有侵权请联系我们。
文章来源:https://mp.weixin.qq.com/s/dNIDpQwvbAKXjmgegnzWnA
京东创始人刘强东和其妻子章泽天最近成为了互联网舆论关注的焦点。有关他们“移民美国”和在美国购买豪宅的传言在互联网上广泛传播。然而,京东官方通过微博发言人发布的消息澄清了这些传言,称这些言论纯属虚假信息和蓄意捏造。
日前,据博主“@超能数码君老周”爆料,国内三大运营商中国移动、中国电信和中国联通预计将集体采购百万台规模的华为Mate60系列手机。
据报道,荷兰半导体设备公司ASML正看到美国对华遏制政策的负面影响。阿斯麦(ASML)CEO彼得·温宁克在一档电视节目中分享了他对中国大陆问题以及该公司面临的出口管制和保护主义的看法。彼得曾在多个场合表达了他对出口管制以及中荷经济关系的担忧。
今年早些时候,抖音悄然上线了一款名为“青桃”的 App,Slogan 为“看见你的热爱”,根据应用介绍可知,“青桃”是一个属于年轻人的兴趣知识视频平台,由抖音官方出品的中长视频关联版本,整体风格有些类似B站。
日前,威马汽车首席数据官梅松林转发了一份“世界各国地区拥车率排行榜”,同时,他发文表示:中国汽车普及率低于非洲国家尼日利亚,每百户家庭仅17户有车。意大利世界排名第一,每十户中九户有车。
近日,一项新的研究发现,维生素 C 和 E 等抗氧化剂会激活一种机制,刺激癌症肿瘤中新血管的生长,帮助它们生长和扩散。
据媒体援引消息人士报道,苹果公司正在测试使用3D打印技术来生产其智能手表的钢质底盘。消息传出后,3D系统一度大涨超10%,不过截至周三收盘,该股涨幅回落至2%以内。
9月2日,坐拥千万粉丝的网红主播“秀才”账号被封禁,在社交媒体平台上引发热议。平台相关负责人表示,“秀才”账号违反平台相关规定,已封禁。据知情人士透露,秀才近期被举报存在违法行为,这可能是他被封禁的部分原因。据悉,“秀才”年龄39岁,是安徽省亳州市蒙城县人,抖音网红,粉丝数量超1200万。他曾被称为“中老年...
9月3日消息,亚马逊的一些股东,包括持有该公司股票的一家养老基金,日前对亚马逊、其创始人贝索斯和其董事会提起诉讼,指控他们在为 Project Kuiper 卫星星座项目购买发射服务时“违反了信义义务”。
据消息,为推广自家应用,苹果现推出了一个名为“Apps by Apple”的网站,展示了苹果为旗下产品(如 iPhone、iPad、Apple Watch、Mac 和 Apple TV)开发的各种应用程序。
特斯拉本周在美国大幅下调Model S和X售价,引发了该公司一些最坚定支持者的不满。知名特斯拉多头、未来基金(Future Fund)管理合伙人加里·布莱克发帖称,降价是一种“短期麻醉剂”,会让潜在客户等待进一步降价。
据外媒9月2日报道,荷兰半导体设备制造商阿斯麦称,尽管荷兰政府颁布的半导体设备出口管制新规9月正式生效,但该公司已获得在2023年底以前向中国运送受限制芯片制造机器的许可。
近日,根据美国证券交易委员会的文件显示,苹果卫星服务提供商 Globalstar 近期向马斯克旗下的 SpaceX 支付 6400 万美元(约 4.65 亿元人民币)。用于在 2023-2025 年期间,发射卫星,进一步扩展苹果 iPhone 系列的 SOS 卫星服务。
据报道,马斯克旗下社交平台𝕏(推特)日前调整了隐私政策,允许 𝕏 使用用户发布的信息来训练其人工智能(AI)模型。新的隐私政策将于 9 月 29 日生效。新政策规定,𝕏可能会使用所收集到的平台信息和公开可用的信息,来帮助训练 𝕏 的机器学习或人工智能模型。
9月2日,荣耀CEO赵明在采访中谈及华为手机回归时表示,替老同事们高兴,觉得手机行业,由于华为的回归,让竞争充满了更多的可能性和更多的魅力,对行业来说也是件好事。
《自然》30日发表的一篇论文报道了一个名为Swift的人工智能(AI)系统,该系统驾驶无人机的能力可在真实世界中一对一冠军赛里战胜人类对手。
近日,非营利组织纽约真菌学会(NYMS)发出警告,表示亚马逊为代表的电商平台上,充斥着各种AI生成的蘑菇觅食科普书籍,其中存在诸多错误。
社交媒体平台𝕏(原推特)新隐私政策提到:“在您同意的情况下,我们可能出于安全、安保和身份识别目的收集和使用您的生物识别信息。”
2023年德国柏林消费电子展上,各大企业都带来了最新的理念和产品,而高端化、本土化的中国产品正在不断吸引欧洲等国际市场的目光。
罗永浩日前在直播中吐槽苹果即将推出的 iPhone 新品,具体内容为:“以我对我‘子公司’的了解,我认为 iPhone 15 跟 iPhone 14 不会有什么区别的,除了序(列)号变了,这个‘不要脸’的东西,这个‘臭厨子’。