gunicorn “Green Unicorn”,脱胎于ruby社区的Unicorn,是一个 WSGI HTTP Server。学习gunicorn后,我们可以把之前的 Bottle 程序正式部署起来。老规矩,本文分下面几个部分:
gunicorn 源码选择的版本是 20.0.0
,主要的文件及包如下:
文件 | 描述 |
---|---|
app包 | guincorn 的 Application (不是wsgi定义的applicaton) |
http包 | gunicorn 对 http协议的一些处理 |
workers包 | gunicorn 的工作类实现 ,包括同步sync实现,线程池版本实现gthread,以及异步版本实现 geventlet,gevent等 |
arbiter.py | guicorn 的master实现 |
根据gunicorn的设计特点:
Gunicorn is based on the pre-fork worker model. This means that there is a central master process that manages a set of worker processes. The master never knows anything about individual clients. All requests and responses are handled completely by worker processes.
gunicorn使用pre-fork 工作模型,也就是master提前fork出预定数量的work,管理worker集合。所有的request和response都由worker进程处理。
我们重点放在:gunicorn的服务实现,master-worker如何实现和协作上。
编写测试app,可以看到这是一个符合wsgi规范的application:
# myapp.py
def app(environ, start_response): # env 和 http 状态及头设定回调
data = b"Hello, World!\n"
start_response("200 OK", [
("Content-Type", "text/plain"),
("Content-Length", str(len(data)))
])
return iter([data]) # 返回数据
使用4个work节点,日志级别debug的方式启动服务,加载 myapp:app
# gunicorn -w 4 --log-level debug myapp:app
[2021-02-23 17:58:57 +0800] [50258] [DEBUG] Current configuration: # 准备配置
...
[2021-02-23 18:01:12 +0800] [50462] [INFO] Starting gunicorn 20.0.0 # 启动gunicorn
[2021-02-23 18:01:12 +0800] [50462] [DEBUG] Arbiter booted # 启动master
[2021-02-23 18:01:12 +0800] [50462] [INFO] Listening at: http://127.0.0.1:8000 (50462) # 监听端口
[2021-02-23 18:01:12 +0800] [50462] [INFO] Using worker: sync
[2021-02-23 18:01:12 +0800] [50464] [INFO] Booting worker with pid: 50464 # 启动worker
[2021-02-23 18:01:12 +0800] [50465] [INFO] Booting worker with pid: 50465
[2021-02-23 18:01:12 +0800] [50466] [INFO] Booting worker with pid: 50466
[2021-02-23 18:01:12 +0800] [50467] [INFO] Booting worker with pid: 50467
[2021-02-23 18:01:12 +0800] [50462] [DEBUG] 4 workers
使用 curl
测试服务
# curl http://127.0.0.1:8000
Hello, World!
同时gunicorn中可以看到 worker=50465 处理了这个http请求
[2021-02-24 16:09:39 +0800] [50465] [DEBUG] GET /
运行时候,还可以通过发送信号,手动扩充work节点数
# kill -TTIN 50462
观察服务日志,会发现 master=50462 进程处理了 ttin
信号,并且扩展worker节点数到5
...
[2021-02-24 18:02:56 +0800] [50462] [INFO] Handling signal: ttin
[2021-02-24 18:02:56 +0800] [75918] [INFO] Booting worker with pid: 75918
[2021-02-24 18:02:56 +0800] [50462] [DEBUG] 5 workers
使用 Ctrl+C
关闭服务,可以看到也是 master=50462 进程处理了 int
信号,并且在关闭worker节点后关闭自己
^C[2021-02-25 15:06:54 +0800] [50462] [INFO] Handling signal: int
[2021-02-25 15:06:54 +0800] [50464] [INFO] Worker exiting (pid: 50464)
[2021-02-25 15:06:54 +0800] [50465] [INFO] Worker exiting (pid: 50465)
[2021-02-25 15:06:54 +0800] [50466] [INFO] Worker exiting (pid: 50466)
[2021-02-25 15:06:54 +0800] [50467] [INFO] Worker exiting (pid: 50467)
[2021-02-25 15:06:54 +0800] [75918] [INFO] Worker exiting (pid: 75918)
[2021-02-25 15:06:54 +0800] [50462] [INFO] Shutting down: Master
如果对gunicon的参数不了解,可以使用下面命令查看帮助
# gunicorn -h
usage: gunicorn [OPTIONS] [APP_MODULE]
optional arguments:
-h, --help show this help message and exit
...
-w INT, --workers INT
The number of worker processes for handling requests. [1]
帮助使用我们熟悉的 argparse 实现。
class Setting(object):
def add_option(self, parser):
args = tuple(self.cli)
help_txt = "%s [%s]" % (self.short, self.default)
help_txt = help_txt.replace("%", "%%")
kwargs = {
"dest": self.name,
"action": self.action or "store",
"type": self.type or str,
"default": None,
"help": help_txt
}
...
parser.add_argument(*args, **kwargs) # 添加选项
class Workers(Setting): # --workers 的选项类
name = "workers"
section = "Worker Processes"
cli = ["-w", "--workers"]
meta = "INT"
validator = validate_pos_int
type = int
default = int(os.environ.get("WEB_CONCURRENCY", 1))
desc = """\
The number of worker processes for handling requests.
A positive integer generally in the ``2-4 x $(NUM_CORES)`` range.
You'll want to vary this a bit to find the best for your particular
application's work load.
By default, the value of the ``WEB_CONCURRENCY`` environment variable.
If it is not defined, the default is ``1``.
"""
def parser(self):
kwargs = {
"usage": self.usage,
"prog": self.prog
}
parser = argparse.ArgumentParser(**kwargs)
parser.add_argument("-v", "--version",
action="version", default=argparse.SUPPRESS,
version="%(prog)s (version " + __version__ + ")\n",
help="show program's version number and exit")
parser.add_argument("args", nargs="*", help=argparse.SUPPRESS)
keys = sorted(self.settings, key=self.settings.__getitem__) # 动态添加参数选项
for k in keys:
self.settings[k].add_option(parser)
return parser
gunicorn的application主要是下面三个类实现。需要注意的是这里的application可以理解为web-server的application;bottle/flask/django等实现的是web-framework的applicaiton。前者动态加载后者,前者处理http服务,后者处理单次的http请求。
3个Application梳理后,大概的代码模版如下:
class WSGIApplication(Application)
def __init__(self, usage=None, prog=None):
self.do_load_config() # 加载配置
def do_load_config():
...
cfg = self.init(parser, args, args.args) # 初始化配置
...
def init(...):
...
self.app_uri = args[0] # 获取wsgi-application参数
def load(...):
util.import_app(self.app_uri) # 动态加载wsgi-application
...
def run(...):
self.load()
Arbiter(self).run() # 启动master,也就是Arbiter
def run(): # 运行服务
"""\
The ``gunicorn`` command line runner for launching Gunicorn with
generic WSGI applications.
"""
from gunicorn.app.wsgiapp import WSGIApplication
WSGIApplication("%(prog)s [OPTIONS] [APP_MODULE]").run()
if __name__ == '__main__':
run()
application部分的实现,相对比较简单,就不再赘述。
Arbiter 仲裁者,事实上的master进程核心,整理后代码模版如下:
class Arbiter(object):
def __init__(self, app):
self.worker_class = self.cfg.worker_class # worker类
self.num_workers = self.cfg.worker # worker数量
...
def start():
self.init_signals() # 初始化信号监听
...
sock.create_socket(...) # 创建socket服务
def run(self):
self.start()
try:
self.manage_workers() # 启动节点
while True: # 无限循环
...
sig = self.SIG_QUEUE.pop(0) if self.SIG_QUEUE else None
if sig is None:
self.sleep() # 持续休眠
self.murder_workers()
self.manage_workers()
continue
if sig not in self.SIG_NAMES:
self.log.info("Ignoring unknown signal: %s", sig)
continue
# 处理信号
signame = self.SIG_NAMES.get(sig)
handler = getattr(self, "handle_%s" % signame, None)
...
handler()
self.wakeup() # 唤醒
except (StopIteration, KeyboardInterrupt):
...
在了解Arbiter工作前先了解一下信号, linux 系统可以使用下面命令查看信号清单
# kill -l
1) SIGHUP 2) SIGINT 3) SIGQUIT 4) SIGILL 5) SIGTRAP
6) SIGABRT 7) SIGBUS 8) SIGFPE 9) SIGKILL 10) SIGUSR1
11) SIGSEGV 12) SIGUSR2 13) SIGPIPE 14) SIGALRM 15) SIGTERM
...
信号是操作系统提供的事件,可以用来进行跨进程的通信。Arbiter.init_signals 做的工作如下:
SIGNALS = [getattr(signal, "SIG%s" % x)
for x in "HUP QUIT INT TERM TTIN TTOU USR1 USR2 WINCH".split()]
def init_signals(self):
...
# initialize all signals
for s in self.SIGNALS:
signal.signal(s, self.signal)
signal.signal(signal.SIGCHLD, self.handle_chld) # 添加信号监听器
def signal(self, sig, frame):
if len(self.SIG_QUEUE) < 5:
self.SIG_QUEUE.append(sig)
self.wakeup()
之前演示的扩容信号 TTIN
是这样处理的 :
def handle_ttin(self):
"""\
SIGTTIN handling.
Increases the number of workers by one.
"""
self.num_workers += 1 # 扩容
self.manage_workers() # 管理worker
Arbiter的sleep和warkeup是这样实现的:
self.PIPE = pair = os.pipe() # 创建管道
def sleep(self):
"""\
Sleep until PIPE is readable or we timeout.
A readable PIPE means a signal occurred.
"""
try:
ready = select.select([self.PIPE[0]], [], [], 1.0) # 使用select监听管道的数据变化
if not ready[0]:
return
while os.read(self.PIPE[0], 1): # 读取管道数据
pass
except (select.error, OSError) as e:
...
def wakeup(self):
"""\
Wake up the arbiter by writing to the PIPE
"""
try:
os.write(self.PIPE[1], b'.') # 管道写入
except IOError as e:
...
需要说明的是Arbiter通过 sock.create_sockets
创建了socket,并绑定端口和监听,然后在fork-worker的时候,将socket传递给了子进程。
worker = self.worker_class(self.worker_age, self.pid, self.LISTENERS,
self.app, self.timeout / 2.0,
self.cfg, self.log)
self.cfg.pre_fork(self, worker)
pid = os.fork()
if pid != 0:
worker.pid = pid # 记录worker的pid
self.WORKERS[pid] = worker # 添加到worker集合
return pid
销毁worker是使用信号:
def kill_workers(self, sig):
"""\
Kill all workers with the signal `sig`
:attr sig: `signal.SIG*` value
"""
worker_pids = list(self.WORKERS.keys())
for pid in worker_pids:
os.kill(pid, sig)
接下来,我们看看worker,主要是sync-worker的实现。worker的关系主要如下:
接之前Arbiter中fork-worker的代码,创建完成的work进入 init_process
# Process Child
worker.pid = os.getpid()
try:
util._setproctitle("worker [%s]" % self.proc_name)
self.log.info("Booting worker with pid: %s", worker.pid)
self.cfg.post_fork(self, worker)
worker.init_process()
sys.exit(0)
work的init_process模版如下:
def init_process(self):
"""\
If you override this method in a subclass, the last statement
in the function should be to call this method with
super().init_process() so that the ``run()`` loop is initiated.
"""
# For waking ourselves up
self.PIPE = os.pipe() # 创建管道
...
self.wait_fds = self.sockets + [self.PIPE[0]] # 监听管道和socket
...
self.init_signals() # 初始化信号监听
...
self.load_wsgi() # 加载wsgi的应用
...
# Enter main run loop
self.booted = True
self.run() # 工作循环
work一样的进行信号监听:
SIGNALS = [getattr(signal, "SIG%s" % x)
for x in "ABRT HUP QUIT INT TERM USR1 USR2 WINCH CHLD".split()]
def init_signals(self):
# reset signaling
for s in self.SIGNALS:
signal.signal(s, signal.SIG_DFL)
# init new signaling
signal.signal(signal.SIGQUIT, self.handle_quit)
signal.signal(signal.SIGTERM, self.handle_exit)
signal.signal(signal.SIGINT, self.handle_quit)
...
if hasattr(signal, 'set_wakeup_fd'):
signal.set_wakeup_fd(self.PIPE[1]) # 等待select唤醒
work最重要的run循环:
def run(self, timeout):
listener = self.sockets[0]
while self.alive:
...
# Accept a connection. If we get an error telling us
# that no connection is waiting we fall down to the
# select which is where we'll wait for a bit for new
# workers to come give us some love.
try:
self.accept(listener) # 接受客户端链接
# Keep processing clients until no one is waiting. This
# prevents the need to select() for every client that we
# process.
continue
except EnvironmentError as e:
...
try:
self.wait(timeout) # 休眠等待
except StopWaiting:
return
处理客户端连接,这一部分和之前介绍http比较类似,也不再赘述。
def accept(self, listener):
client, addr = listener.accept()
client.setblocking(1)
util.close_on_exec(client)
self.handle(listener, client, addr)
work处理完成请求后进入等待
def wait(self, timeout):
try:
ret = select.select(self.wait_fds, [], [], timeout)
if ret[0]:
if self.PIPE[0] in ret[0]:
os.read(self.PIPE[0], 1)
return ret[0]
except select.error as e:
if e.args[0] == errno.EINTR:
return self.sockets
if e.args[0] == errno.EBADF:
if self.nr < 0:
return self.sockets
else:
raise StopWaiting
raise
可以用下面一张图展示gunicorn的工作流程,作为我们的小结论
可以使用thread,实现一个定时器
# reloader.py
class Reloader(threading.Thread):
def __init__(self, extra_files=None, interval=1, callback=None):
super().__init__()
self.setDaemon(True)
self._interval = interval
self._callback = callback
def run(self):
mtimes = {}
while True:
for filename in self.get_files():
try:
mtime = os.stat(filename).st_mtime
except OSError:
continue
old_time = mtimes.get(filename)
if old_time is None:
mtimes[filename] = mtime
continue
elif mtime > old_time:
if self._callback:
self._callback(filename)
time.sleep(self._interval)
在使用 gunicorn myapp:app
命令的时候, myapp:app 没有静态的 import ,而是这样动态加载的:
# util.py
klass = components.pop(-1)
mod = importlib.import_module('.'.join(components))
return getattr(mod, klass)
本文由哈喽比特于3年以前收录,如有侵权请联系我们。
文章来源:https://mp.weixin.qq.com/s/HAdihtS1BRNQNr_4WtyJlA
京东创始人刘强东和其妻子章泽天最近成为了互联网舆论关注的焦点。有关他们“移民美国”和在美国购买豪宅的传言在互联网上广泛传播。然而,京东官方通过微博发言人发布的消息澄清了这些传言,称这些言论纯属虚假信息和蓄意捏造。
日前,据博主“@超能数码君老周”爆料,国内三大运营商中国移动、中国电信和中国联通预计将集体采购百万台规模的华为Mate60系列手机。
据报道,荷兰半导体设备公司ASML正看到美国对华遏制政策的负面影响。阿斯麦(ASML)CEO彼得·温宁克在一档电视节目中分享了他对中国大陆问题以及该公司面临的出口管制和保护主义的看法。彼得曾在多个场合表达了他对出口管制以及中荷经济关系的担忧。
今年早些时候,抖音悄然上线了一款名为“青桃”的 App,Slogan 为“看见你的热爱”,根据应用介绍可知,“青桃”是一个属于年轻人的兴趣知识视频平台,由抖音官方出品的中长视频关联版本,整体风格有些类似B站。
日前,威马汽车首席数据官梅松林转发了一份“世界各国地区拥车率排行榜”,同时,他发文表示:中国汽车普及率低于非洲国家尼日利亚,每百户家庭仅17户有车。意大利世界排名第一,每十户中九户有车。
近日,一项新的研究发现,维生素 C 和 E 等抗氧化剂会激活一种机制,刺激癌症肿瘤中新血管的生长,帮助它们生长和扩散。
据媒体援引消息人士报道,苹果公司正在测试使用3D打印技术来生产其智能手表的钢质底盘。消息传出后,3D系统一度大涨超10%,不过截至周三收盘,该股涨幅回落至2%以内。
9月2日,坐拥千万粉丝的网红主播“秀才”账号被封禁,在社交媒体平台上引发热议。平台相关负责人表示,“秀才”账号违反平台相关规定,已封禁。据知情人士透露,秀才近期被举报存在违法行为,这可能是他被封禁的部分原因。据悉,“秀才”年龄39岁,是安徽省亳州市蒙城县人,抖音网红,粉丝数量超1200万。他曾被称为“中老年...
9月3日消息,亚马逊的一些股东,包括持有该公司股票的一家养老基金,日前对亚马逊、其创始人贝索斯和其董事会提起诉讼,指控他们在为 Project Kuiper 卫星星座项目购买发射服务时“违反了信义义务”。
据消息,为推广自家应用,苹果现推出了一个名为“Apps by Apple”的网站,展示了苹果为旗下产品(如 iPhone、iPad、Apple Watch、Mac 和 Apple TV)开发的各种应用程序。
特斯拉本周在美国大幅下调Model S和X售价,引发了该公司一些最坚定支持者的不满。知名特斯拉多头、未来基金(Future Fund)管理合伙人加里·布莱克发帖称,降价是一种“短期麻醉剂”,会让潜在客户等待进一步降价。
据外媒9月2日报道,荷兰半导体设备制造商阿斯麦称,尽管荷兰政府颁布的半导体设备出口管制新规9月正式生效,但该公司已获得在2023年底以前向中国运送受限制芯片制造机器的许可。
近日,根据美国证券交易委员会的文件显示,苹果卫星服务提供商 Globalstar 近期向马斯克旗下的 SpaceX 支付 6400 万美元(约 4.65 亿元人民币)。用于在 2023-2025 年期间,发射卫星,进一步扩展苹果 iPhone 系列的 SOS 卫星服务。
据报道,马斯克旗下社交平台𝕏(推特)日前调整了隐私政策,允许 𝕏 使用用户发布的信息来训练其人工智能(AI)模型。新的隐私政策将于 9 月 29 日生效。新政策规定,𝕏可能会使用所收集到的平台信息和公开可用的信息,来帮助训练 𝕏 的机器学习或人工智能模型。
9月2日,荣耀CEO赵明在采访中谈及华为手机回归时表示,替老同事们高兴,觉得手机行业,由于华为的回归,让竞争充满了更多的可能性和更多的魅力,对行业来说也是件好事。
《自然》30日发表的一篇论文报道了一个名为Swift的人工智能(AI)系统,该系统驾驶无人机的能力可在真实世界中一对一冠军赛里战胜人类对手。
近日,非营利组织纽约真菌学会(NYMS)发出警告,表示亚马逊为代表的电商平台上,充斥着各种AI生成的蘑菇觅食科普书籍,其中存在诸多错误。
社交媒体平台𝕏(原推特)新隐私政策提到:“在您同意的情况下,我们可能出于安全、安保和身份识别目的收集和使用您的生物识别信息。”
2023年德国柏林消费电子展上,各大企业都带来了最新的理念和产品,而高端化、本土化的中国产品正在不断吸引欧洲等国际市场的目光。
罗永浩日前在直播中吐槽苹果即将推出的 iPhone 新品,具体内容为:“以我对我‘子公司’的了解,我认为 iPhone 15 跟 iPhone 14 不会有什么区别的,除了序(列)号变了,这个‘不要脸’的东西,这个‘臭厨子’。