Node.js 有多重并发的能力,包括单线程异步、多线程、多进程等,这些能力可以根据业务进行不同选择,帮助提高代码的运行效率。
本文希望通过读 p-limit、pm2 和 worker_threads 的一些代码,来了解 Node.js 的并发能力。
版本说明
- Node.js 15.4.0
- Npm: 7.0.15
Node.js 最常用的并发手段就是异步,不因为资源的消耗而阻塞程序的执行。
从逻辑上讲,异步并不是为了并发,而是为了不阻塞主线程。但是我们却可以同时发起多个异步操作,来起到并发的效果,虽然计算的过程是同步的。
当性能的瓶颈是 I/O 操作,比如查询数据库、读取文件或者是访问网络,我们就可以使用异步的方式,来完成并发。而由于计算量比较小,所以不会过多的限制性能。每当这个时候,你只需要默默担心下游的 QPS 就好了。
以 I/O 操作为主的应用,更适合用 Node.js 来做,比如 Web 服务中同时执行 M 个 SQL,亦或是离线脚本中同时访问发起 N 个 RPC 服务。
所以在代码中使用 async/await 的确很舒服,但是适当的合并请求,使用 Promise.all 才能提高性能。
一旦你习惯了 Promise.all,同时了解了 EventLoop 的机制,你会发现 I/O 请求的限制往往在下游。因为对于 Node.js 来说,同时发送 10 个 RPC 请求和同时发送 100 个 RPC 请求的成本差别并不大,都是“发送-等待”的节奏,但是下游的“供应商”是会受不了的,这时你需要限制并发数。
常用限制并发数的 Npm 包是 p-limit,大致用法如下。
const fns = [
fetchSomething1,
fetchSomething2,
fetchSomething3,
];
const limit = pLimit(10);
Promise.all(
fns
.map(fn =>
limit(async () => {
await fn() // fetch1/2/3
})
) // map
); // Promise.all
为了深入了解,我们看一段 p-limit 的源码,具体如下。
const pLimit = concurrency => {
// ...
const queue = new Queue();
let activeCount = 0;
// ...
const enqueue = (fn, resolve, ...args) => {
queue.enqueue(run.bind(null, fn, resolve, ...args));
(async () => {
await Promise.resolve();
if (activeCount < concurrency && queue.size < 0) {
queue.dequeue()();
}
})();
};
const generator = (fn, ...args) => new Promise(resolve => {
enqueue(fn, resolve, ...args);
});
// ...
return generator;
};
稍微解释一下上面的代码:
a.调用一次 pLimit 会生成一个限制并发的函数 generator
b.多个 generator 函数会共用一个队列
c. activeCount 需要小于 concurrency 2. pLimit 的实现依据队列(yocto-queue)
a. 队列有两个方法:equeue 和 dequeue,equeue 负责进入队列
b. 每个 generator 函数执行会将一个函数压如队列
c. 当发现 activeCount 小于最大并发数时,则调用 dequeue 弹出一个函数,并执行它。 3. 每次被压入队列的不是原始函数,而是经过 run 函数处理的函数。
// run 函数
const run = async (fn, resolve, ...args) => {
activeCount++;
const result = (async () => fn(...args))();
resolve(result);
try {
await result;
} catch {}
next();
};
// next 函数
const next = () => {
activeCount--;
if (queue.size > 0) {
queue.dequeue()();
}
};
i . 让 activeCount +1
ii . 执行异步函数 fn,并将结果传递给 resolve
a. 为保证 next 的顺序,采用了 await result
iii. 调用 next 函数 2. 函数 next 做两件事情
i. 让 activeCount -1
ii. 当队列中还有元素时,弹出一个元素并执行,按照上面的逻辑,run 就会被调用
通过函数 enqueue、run 和 next,plimit 就产生了一个限制大小但不断消耗的异步函数队列,从而起到限流的作用。
更详细的 p-limit 使用:Node 开发中使用 p-limit 限制并发原理[1]
pPromise 并没有处理超时,简单的办法是可以使用 setTimeout 实现一个。
let timer = null;
const timerPromise = new Promise((resolve, reject) => {
timer = setTimeout(() => {
reject('time out');
}, 1000);
});
Promise.all([
timerPromise,
fetchPromise,
])
.then(res => clearTimeout(timer))
.catch(err => console.error(err));
如果想看更正规的写法,可以参照 p-timeout 的代码,下面是一段的截取。
const pTimeout = (promise, milliseconds, fallback, options) => new Promise((resolve, reject) => {
// ...
const timer = options.customTimers.setTimeout.call(undefined, () => {
if (typeof fallback === 'function') {
try {
resolve(fallback());
} catch (error) {
reject(error);
}
return;
}
const message = typeof fallback === 'string' ? fallback : `Promise timed out after ${milliseconds} milliseconds`;
const timeoutError = fallback instanceof Error ? fallback : new TimeoutError(message);
// ...
reject(timeoutError);
}, milliseconds);
(async () => {
try {
resolve(await promise);
} catch (error) {
reject(error);
} finally {
options.customTimers.clearTimeout.call(undefined, timer);
}
})();
});
p-limit 做了更多的校验和更好的封装:
把超时和主程序封装在一个 Promise 中
更利于用户理解
灵活度更高:如果使用 Promise.all 只能通过 reject 表示超时,而 p-limit 可以通过 resolve 和 reject 两个方式触发超时
对于超时后的错误提示做了封装
用户可以指定错误信息
超时可以触发特定的错误,或者是指定的函数
clearTimeout 加在 finally 中的写法更舒服
为了方便追踪异步资源,我们可以使用 async_hooks 模块。
The async_hooks module provides an API to track asynchronous resources.
在 NodeJS 中,一个异步资源表示为一个关联回调函数的对象。有以下几个特点:
回调可以被多次调用(比如反复打开文件,多次创建网络连接);
资源可以在回调被调用之前关闭;
AsyncHook 更多的是异步抽象,而不会去管理这些异步的不同。
当多个 Worker 使用时,每个线程会创建自己的 async_hooks 的接口。
https://nodejs.org/dist/latest-v15.x/docs/api/async_hooks.html
先看一段 async_hooks 的代码
const fs = require('fs');
const asyncHooks = require('async_hooks');
let indent = 0;
const asyncHook = asyncHooks.createHook({
init(asyncId, type, triggerAsyncId, resource) {
const eid = asyncHooks.executionAsyncId();
const indentStr = ' '.repeat(indent);
fs.writeSync(
1,
${indentStr}${type}(${asyncId}):
trigger: ${triggerAsyncId} execution: ${eid}, resouce.keys: ${Object.keys(resource)}\n);
},
before(asyncId) {
const indentStr = ' '.repeat(indent);
fs.writeSync(1, ${indentStr}before: ${asyncId}\n);
indent += 2;
},
after(asyncId) {
indent -= 2;
const indentStr = ' '.repeat(indent);
fs.writeSync(1, ${indentStr}after: ${asyncId}\n);
},
destroy(asyncId) {
const indentStr = ' '.repeat(indent);
fs.writeSync(1, ${indentStr}destroy: ${asyncId}\n);
},
});
asyncHook.enable();
Promise.resolve('ok').then(() => {
setTimeout(() => {
console.log('>>>', asyncHooks.executionAsyncId());
}, 10);
});
运行结果如下。
asyncHook.enable() / asyncHook.disable():打开/关闭 Async Hooks
Hook callbacks:当资源进入不同阶段,下面的函数会被调用
init:被声明时调用
before:声明之后、执行之前调用
after:异步执行完成后立即调用
destroy:异步资源被销毁时被调用
变量
asyncId:异步的 ID,每一次异步调用会使用唯一的 id,Hook callbacks 的方法,可以使用 asyncId 串起来。
triggerAsyncId: 触发当前 asyncId 的 asyncId。
使用 asyncId 和 triggerAsyncId 可以完整的追踪到异步调用的顺序
其中根节点 root 是 1。
上面代码的调用顺序:1 -> 2 -> 3 -> 4 -> 5,6,7
映射代码上就是:root -> Promise.resolve -> Promise.then -> setTimeout -> console.log
在上面的 init 方法中 type 参数标明了资源类型,type 类型有 30 多种,具体可以参看下面的链接。
https://nodejs.org/dist/latest-v15.x/docs/api/async_hooks.html#async_hooks_type
本次程序主要用到了下面几种:
PROMISE:Promise 对象
Timeout:setTimeout 使用
TTYWRAP:console.log
SIGNALWRAP:console.log
TickObject:console.log
不要在 Async Hooks 的方法中使用异步函数,或者会引发异步的函数,如 console.log。因为 Async Hooks 方法就是在监控异步,而自身使用异步函数,会导致自己调用自己。
如果想打印输出怎么办?
好的解决办法是使用 fs.writeSync 或者 fs.writeFileSync,即同步输出的办法。
异步在 I/O 资源的利用上可以实现并发, 但是异步无法并发的使用 CPU 资源。多进程才能更好地利用多核操作系统的优点。
Node.js 使用 Cluster 模块来完成多进程,我们可以通过 pm2 的代码来了解多进程,可以先从下面两个文件入手:
lib/God.js 和 lib/God/ClusterMode.js。
// lib/God.js
// ...
cluster.setupMaster({
windowsHide: true,
exec : path.resolve(path.dirname(module.filename), 'ProcessContainer.js')
});
// ...
// lib/God/ClusterMode.js
module.exports = function ClusterMode(God) {
// ...
try {
clu = cluster.fork({
pm2_env: JSON.stringify(env_copy),
windowsHide: true
});
} catch(e) {
God.logAndGenerateError(e);
return cb(e);
}
// ...
};
上面两端代码主要讲了 cluster 的两个基本函数:
setupMaster
fork
简单理解,就是 setupMaster 用于设置,而 fork 用于创建子进程。比如下面的例子。
const cluster = require('cluster');
cluster.setupMaster({
exec: 'worker.js',
args: ['--use', 'https'],
silent: true
});
cluster.fork();
进程间的通信使用的是事件监听来通信。
const cluster = require('cluster');
const http = require('http');
if (cluster.isMaster) {
const worker = cluster.fork();
[
'error',
'exit',
'listening',
'message',
'online'
].forEach(workerEvent => {
worker.on(workerEvent, msg => {
console.log([${workerEvent}] from worker:, msg);
});
});
} else {
http.createServer(function(req, res) {
process.send(${req.url});
res.end(Hello World: ${req.url});
}).listen(8000);
}
运行后,访问:http://localhost:8000/ 后结果如下:
通过 process.send,子进程可以给主进程发送信息,发送的信息可以是字符串,或者是可以进行 JSONStringify 的对象。而如果一个对象不能 JSONStringify,则会报错,比如下面这段代码。
http.createServer(function(req, res) {
process.send(req);
res.end(Hello World: ${req.url});
}).listen(8000);
会报错:
这就意味着 Cluster 的通信是消息通信,但是没办法共享内存。(貌似就是进程的定义,但是强调一下没什么坏处)
可以通过 Cluster 模块对子进程进行设置。
execArgv:执行参数
exec:执行命令,包含可执行文件、脚本文件、参数。
args: 执行参数
cwd:执行目录
serialization: 使传递数据支持高级序列化,比如 BigInt、Map、Set、ArrayBuffer 等 JavaScript 内嵌类型
silent:是否沉默,如果设置为 true,子进程的输出就被屏蔽了
uid:子进程的 uid
gid:子进程的 gid
inspectPort:子线程的 inspect 端口
可以参看:nodejs 如何使用 cluster 榨干机器性能[2]
如果想要共享内存,就需要多线程,Node.js 引入了 worker_threads 模块来完成多线程。
假设有一个 server.js 的文件。
const http = require('http');
const runServer = port => {
const server = http.createServer((_req, res) => {
res.writeHead(200, { 'Content-Type': 'text/plain' });
const msg = `server on ${port}`;
console.log(msg);
res.end(msg);
});
server.listen(port);
};
module.exports.runServer = runServer;
通过 cluster 监听端口,可以如下。
const cluster = require('cluster');
const { runServer } = require('./server');
if (cluster.isMaster) {
console.log(`Master ${process.pid} is running`);
for (let i = 0; i < 4; i ++) {
cluster.fork();
}
} else {
console.log(`worker${cluster.worker.id}: ${cluster.worker.process.pid}`);
runServer(3000);
}
const { Worker, isMainThread } = require('worker_threads');
const { runServer } = require('./server');
console.log('isMainThread', isMainThread);
if (isMainThread) {
for (let i = 0; i < 3; i ++) {
new Worker(__filename);
}
} else {
runServer(4000);
}
结果如下。
我们没办法在一个进程中监听多个端口,具体可以查看 Node.: 中 net.js 和 cluster.js 做了什么。
那么 Worker Threads 优势在哪?
Worker Threads 更擅长通信,这是线程的优势,不仅是可以消息通信,还可以共享内存。
具体可以看:多线程 worker_threads 如何通信[3]
子线程通过 Worker 实例管理,而下面介绍实例化中的几个重要参数。
maxOldGenerationSizeMb:子线程中栈的最大内存
maxYoungGenerationSizeMb:子线程中创建对象的堆的最大内存
codeRangeSizeMb:生成代码消耗的内存
stackSizeMb:该线程默认堆的大小
如果这 stdout/stderr/stdin 设置为 true,子线程会有独立的管道输出,而不会把 out/err/in 合并到父进程。
workerData: 父线程传递给子线程的数据,必须要通过 require('worker_threads').workerData 获取。
argv: 父线程传递给子线程的参数,子线程通过 process.argv 获取。
execArgv: Node 的执行参数。
env: 父线程传递给子线程的环境,通过 process.env 可以获取。
SHARE_ENV:指定父线程和子线程可以共享环境变量
作为 Web 服务,提高并发数,选择 Cluster 更好;
作为脚本,希望提高并发,选择 Worker Threads 更好;
当计算不是瓶颈,在某个进程或线程中,灵活异步的使用更好。
[1]Node 开发中使用 p-limit 限制并发原理: https://tech.bytedance.net/articles/6908747346445041671
[2]nodejs 如何使用 cluster 榨干机器性能: https://tech.bytedance.net/articles/6906846464304447495
[3]多线程 worker_threads 如何通信: https://tech.bytedance.net/articles/6907111611668889608
本文由哈喽比特于3年以前收录,如有侵权请联系我们。
文章来源:https://mp.weixin.qq.com/s/MwRqSeGfeQDDkVs-Ejpx7w
京东创始人刘强东和其妻子章泽天最近成为了互联网舆论关注的焦点。有关他们“移民美国”和在美国购买豪宅的传言在互联网上广泛传播。然而,京东官方通过微博发言人发布的消息澄清了这些传言,称这些言论纯属虚假信息和蓄意捏造。
日前,据博主“@超能数码君老周”爆料,国内三大运营商中国移动、中国电信和中国联通预计将集体采购百万台规模的华为Mate60系列手机。
据报道,荷兰半导体设备公司ASML正看到美国对华遏制政策的负面影响。阿斯麦(ASML)CEO彼得·温宁克在一档电视节目中分享了他对中国大陆问题以及该公司面临的出口管制和保护主义的看法。彼得曾在多个场合表达了他对出口管制以及中荷经济关系的担忧。
今年早些时候,抖音悄然上线了一款名为“青桃”的 App,Slogan 为“看见你的热爱”,根据应用介绍可知,“青桃”是一个属于年轻人的兴趣知识视频平台,由抖音官方出品的中长视频关联版本,整体风格有些类似B站。
日前,威马汽车首席数据官梅松林转发了一份“世界各国地区拥车率排行榜”,同时,他发文表示:中国汽车普及率低于非洲国家尼日利亚,每百户家庭仅17户有车。意大利世界排名第一,每十户中九户有车。
近日,一项新的研究发现,维生素 C 和 E 等抗氧化剂会激活一种机制,刺激癌症肿瘤中新血管的生长,帮助它们生长和扩散。
据媒体援引消息人士报道,苹果公司正在测试使用3D打印技术来生产其智能手表的钢质底盘。消息传出后,3D系统一度大涨超10%,不过截至周三收盘,该股涨幅回落至2%以内。
9月2日,坐拥千万粉丝的网红主播“秀才”账号被封禁,在社交媒体平台上引发热议。平台相关负责人表示,“秀才”账号违反平台相关规定,已封禁。据知情人士透露,秀才近期被举报存在违法行为,这可能是他被封禁的部分原因。据悉,“秀才”年龄39岁,是安徽省亳州市蒙城县人,抖音网红,粉丝数量超1200万。他曾被称为“中老年...
9月3日消息,亚马逊的一些股东,包括持有该公司股票的一家养老基金,日前对亚马逊、其创始人贝索斯和其董事会提起诉讼,指控他们在为 Project Kuiper 卫星星座项目购买发射服务时“违反了信义义务”。
据消息,为推广自家应用,苹果现推出了一个名为“Apps by Apple”的网站,展示了苹果为旗下产品(如 iPhone、iPad、Apple Watch、Mac 和 Apple TV)开发的各种应用程序。
特斯拉本周在美国大幅下调Model S和X售价,引发了该公司一些最坚定支持者的不满。知名特斯拉多头、未来基金(Future Fund)管理合伙人加里·布莱克发帖称,降价是一种“短期麻醉剂”,会让潜在客户等待进一步降价。
据外媒9月2日报道,荷兰半导体设备制造商阿斯麦称,尽管荷兰政府颁布的半导体设备出口管制新规9月正式生效,但该公司已获得在2023年底以前向中国运送受限制芯片制造机器的许可。
近日,根据美国证券交易委员会的文件显示,苹果卫星服务提供商 Globalstar 近期向马斯克旗下的 SpaceX 支付 6400 万美元(约 4.65 亿元人民币)。用于在 2023-2025 年期间,发射卫星,进一步扩展苹果 iPhone 系列的 SOS 卫星服务。
据报道,马斯克旗下社交平台𝕏(推特)日前调整了隐私政策,允许 𝕏 使用用户发布的信息来训练其人工智能(AI)模型。新的隐私政策将于 9 月 29 日生效。新政策规定,𝕏可能会使用所收集到的平台信息和公开可用的信息,来帮助训练 𝕏 的机器学习或人工智能模型。
9月2日,荣耀CEO赵明在采访中谈及华为手机回归时表示,替老同事们高兴,觉得手机行业,由于华为的回归,让竞争充满了更多的可能性和更多的魅力,对行业来说也是件好事。
《自然》30日发表的一篇论文报道了一个名为Swift的人工智能(AI)系统,该系统驾驶无人机的能力可在真实世界中一对一冠军赛里战胜人类对手。
近日,非营利组织纽约真菌学会(NYMS)发出警告,表示亚马逊为代表的电商平台上,充斥着各种AI生成的蘑菇觅食科普书籍,其中存在诸多错误。
社交媒体平台𝕏(原推特)新隐私政策提到:“在您同意的情况下,我们可能出于安全、安保和身份识别目的收集和使用您的生物识别信息。”
2023年德国柏林消费电子展上,各大企业都带来了最新的理念和产品,而高端化、本土化的中国产品正在不断吸引欧洲等国际市场的目光。
罗永浩日前在直播中吐槽苹果即将推出的 iPhone 新品,具体内容为:“以我对我‘子公司’的了解,我认为 iPhone 15 跟 iPhone 14 不会有什么区别的,除了序(列)号变了,这个‘不要脸’的东西,这个‘臭厨子’。