Node.js 官方提供了[Cluster](https://nodejs.org/dist/latest-v16.x/docs/api/cluster.html)和[Child process](https://nodejs.org/dist/latest-v16.x/docs/api/child_process.html)创建子进程,通过[Worker threads](https://nodejs.org/dist/latest-v16.x/docs/api/worker_threads.html)模块创建子线程。但前者无法共享内存,通信必须使用 JSON 格式,有一定的局限性和性能问题。后者更轻量,并且可以共享内存,通过传输 ArrayBuffer 实例或共享 SharedArrayBuffer 实例来做到这一点,即数据格式没有太多要求。但是要注意,数据中不能包含函数。
  Worker threads 从 Node V12 开始成为正式标准,其对于执行 CPU 密集型的操作很有用,而对 I/O 密集型工作没有多大帮助。 Node.js 内置的异步 I/O 操作要比它效率更高。注意,Worker threads 是基于 Node.js 架构的多工作线程,如下图所示。在每个工作线程中,都会包含 V8 和 libuv,即都包含Event Loop。
:-: ![](https://img.kancloud.cn/e8/36/e8366cc7d9932bcd4fd41edfb891726b_2400x1280.jpeg =800x)
## 一、线程池
  创建、执行、销毁一个 Worker 的开销是很大的,所以需要实现一个线程池(Worker Pool),在初始化时创建有限数量的 Worker 并加载单一的 worker.js,主线程和 Worker 可进行进程间通信,当所有任务完成后,这些 Worker 将会被统一销毁。
  在 Worker 中通过 parentPort.postMessage() 向主线程发送消息,而在主线程中可以通过 worker.on('message') 接收发送过来的消息,worker 是一个 Worker 实例,例如 new Worker(filePath)。
  下面是一个官方示例,isMainThread 可判断当前是否是主线程,workerData 是传递给 Worker 的数据。
~~~
const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');
if (isMainThread) {
module.exports = function parseJSAsync(script) {
return new Promise((resolve, reject) => {
const worker = new Worker(__filename, {
workerData: script
});
worker.on('message', resolve);
worker.on('error', reject);
worker.on('exit', (code) => {
if (code !== 0)
reject(new Error(`Worker stopped with exit code ${code}`));
});
});
};
} else {
const script = workerData;
parentPort.postMessage(script);
}
~~~
  下面是一个线程池示例,参考自《[worker\_threads 初体验](https://blog.skk.moe/post/say-hello-to-nodejs-worker-thread/)》一文,做了微调,具体在此不在赘述,可阅读原文或注释。
~~~
// 获取当前设备的 CPU 线程数目,作为 numberOfThreads 的默认值。
const { length: cpusLength } = require('os').cpus();
const { Worker } = require('worker_threads');
class WorkerPool {
constructor(workerPath, options = {}, numberOfThreads = cpusLength) {
if (numberOfThreads < 1) {
throw new Error('Number of threads should be greater or equal than 1!');
}
this.workerPath = workerPath;
this.numberOfThreads = numberOfThreads;
// 任务队列
this._queue = [];
// Worker 索引
this._workersById = {};
// Worker 激活状态索引
this._activeWorkersById = {};
// 创建 Workers
for (let i = 0; i < this.numberOfThreads; i++) {
const worker = new Worker(workerPath, options);
this._workersById[i] = worker;
// 将这些 Worker 设置为未激活状态
this._activeWorkersById[i] = false;
}
}
/**
* 检查空闲的 Worker
*/
getInactiveWorkerId() {
for (let i = 0; i < this.numberOfThreads; i++) {
if (!this._activeWorkersById[i]) return i;
}
return -1;
}
/**
* 调用 Worker 执行,目的是在指定的 Worker 里执行指定的任务
*/
runWorker(workerId, taskObj) {
const worker = this._workersById[workerId];
// 当任务执行完毕后执行
const doAfterTaskIsFinished = () => {
// 去除所有的 Listener,不然一次次添加不同的 Listener 会内存溢出(OOM)
worker.removeAllListeners('message');
worker.removeAllListeners('error');
// 将这个 Worker 设为未激活状态
this._activeWorkersById[workerId] = false;
if (this._queue.length) {
// 任务队列非空,使用该 Worker 执行任务队列中第一个任务
this.runWorker(workerId, this._queue.shift());
}
};
// 将这个 Worker 设置为激活状态
this._activeWorkersById[workerId] = true;
// 设置两个回调,用于 Worker 的监听器
const messageCallback = result => {
taskObj.cb(null, result);
doAfterTaskIsFinished();
};
const errorCallback = error => {
taskObj.cb(error);
doAfterTaskIsFinished();
};
// 为 Worker 添加 'message' 和 'error' 两个 Listener
worker.once('message', messageCallback);
worker.once('error', errorCallback);
// 将数据传给 Worker 供其获取和执行
worker.postMessage(taskObj.data);
}
/**
* 运行线程
*/
run(data) {
// Promise 是个好东西
return new Promise((resolve, reject) => {
// 调用 getInactiveWorkerId() 获取一个空闲的 Worker
const availableWorkerId = this.getInactiveWorkerId();
const taskObj = {
data,
cb: (error, result) => {
// 虽然 Workers 需要使用 Listener 和 Callback,但这不能阻止我们使用 Promise,对吧?
// 不,你不能 util.promisify(taskObj) 。人不能,至少不应该。
if (error) reject(error);
return resolve(result);
}
};
if (availableWorkerId === -1) {
// 当前没有空闲的 Workers 了,把任务丢进队列里,这样一旦有 Workers 空闲时就会开始执行。
this._queue.push(taskObj);
return null;
}
// 有一个空闲的 Worker,用它执行任务
this.runWorker(availableWorkerId, taskObj);
})
}
/**
* 销毁
*/
destroy(force = false) {
for (let i = 0; i < this.numberOfThreads; i++) {
if (this._activeWorkersById[i] && !force) {
// 通常情况下,不应该在还有 Worker 在执行的时候就销毁它,这一定是什么地方出了问题,所以还是抛个 Error 比较好
// 不过保留一个 force 参数,总有人用得到的
throw new Error(`The worker ${i} is still runing!`);
}
// 销毁这个 Worker
this._workersById[i].terminate();
}
}
}
module.exports = WorkerPool;
~~~
## 二、实践
  之所以需要多线程,是为了解决一个优化需求。就是有一个接口,里面有很多查询数据库(MySQL和MongoDB)的操作,单条语句并不会慢,但累加后整体的响应速度就会变慢,那么就想通过多线程,同时处理一些查询语句,然后整合结果。
  先对线程池做最简单的处理,创建 worker.js,接收 userId。
~~~
const { isMainThread, parentPort } = require('worker_threads');
// 不是主线程时执行
if (!isMainThread) {
parentPort.on('message', async ({userId }) => {
console.log('postMessage', userId);
parentPort.postMessage(userId);
});
}
~~~
  然后初始化线程池,将数组中的 userId 传递给 Worker,pool.run({ userId: item })。
~~~
const WorkerPool = require('./workerPool');
const { join } = require('path');
async function workerMain(services) {
const workerPath = join(__dirname + '/worker.js');
// 初始化一个 Worker Pool
const pool = new WorkerPool(workerPath);
Promise.all([4,12,13,15].map(async item => {
await pool.run({ userId: item });
})).then(json => {
// 销毁线程池
pool.destroy();
});
}
~~~
  输出顺序没有按照数组的顺序,并且每次的输出顺序还都是不同的,由此可知,代码是并发运行的。
~~~
postMessage 12
postMessage 4
postMessage 15
postMessage 13
~~~
  那么接下来就引入数据库查询的代码,公司项目基于[sequelize.js](https://sequelize.org/)封装了增删改查的逻辑,通过 services 变量可以调用相关的操作。在主线程中,计划将 services 传递到 Worker 中。
~~~
async function workerMain(services) {
// Worker Threads 不能共享实例以及带函数的对象
const workerPath = join(__dirname + '/worker.js', { workerData: services });
// 初始化一个 Worker Pool
const pool = new WorkerPool(workerPath);
// 省略代码......
}
~~~
  然而报错了,大致是下面这个意思,无法克隆,因为对象中包含函数,就会引发错误。
~~~
node:internal/worker:349
ReflectApply(this[kPublicPort].postMessage, this[kPublicPort], args);
could not be cloned.
~~~
  想以通信的方式实现数据库的并发查询,目前看来不能完成。
  其实可以在 worker.js 中单独引入 services, 不过由于我们在脚本文件中采用了 import 语法,因此在执行时会报错,SyntaxError: Cannot use import statement outside a module。
~~~
const { isMainThread, parentPort, workerData } = require('worker_threads');
const services = require('../services');
// 不是主线程时执行
if (!isMainThread) {
// 省略代码......
}
~~~
  还有一种解决方案,其成本就比较高,就是单独再实现一套服务层,也就是说再封装一层符合Node.js 模块化语法的数据库操作集合。
*****
> 原文出处:
[博客园-Node.js躬行记](https://www.cnblogs.com/strick/category/1688575.html)
[知乎专栏-Node.js躬行记](https://zhuanlan.zhihu.com/pwnode)
已建立一个微信前端交流群,如要进群,请先加微信号freedom20180706或扫描下面的二维码,请求中需注明“看云加群”,在通过请求后就会把你拉进来。还搜集整理了一套[面试资料](https://github.com/pwstrick/daily),欢迎阅读。
![](https://box.kancloud.cn/2e1f8ecf9512ecdd2fcaae8250e7d48a_430x430.jpg =200x200)
推荐一款前端监控脚本:[shin-monitor](https://github.com/pwstrick/shin-monitor),不仅能监控前端的错误、通信、打印等行为,还能计算各类性能参数,包括 FMP、LCP、FP 等。
- ES6
- 1、let和const
- 2、扩展运算符和剩余参数
- 3、解构
- 4、模板字面量
- 5、对象字面量的扩展
- 6、Symbol
- 7、代码模块化
- 8、数字
- 9、字符串
- 10、正则表达式
- 11、对象
- 12、数组
- 13、类型化数组
- 14、函数
- 15、箭头函数和尾调用优化
- 16、Set
- 17、Map
- 18、迭代器
- 19、生成器
- 20、类
- 21、类的继承
- 22、Promise
- 23、Promise的静态方法和应用
- 24、代理和反射
- HTML
- 1、SVG
- 2、WebRTC基础实践
- 3、WebRTC视频通话
- 4、Web音视频基础
- CSS进阶
- 1、CSS基础拾遗
- 2、伪类和伪元素
- 3、CSS属性拾遗
- 4、浮动形状
- 5、渐变
- 6、滤镜
- 7、合成
- 8、裁剪和遮罩
- 9、网格布局
- 10、CSS方法论
- 11、管理后台响应式改造
- React
- 1、函数式编程
- 2、JSX
- 3、组件
- 4、生命周期
- 5、React和DOM
- 6、事件
- 7、表单
- 8、样式
- 9、组件通信
- 10、高阶组件
- 11、Redux基础
- 12、Redux中间件
- 13、React Router
- 14、测试框架
- 15、React Hooks
- 16、React源码分析
- 利器
- 1、npm
- 2、Babel
- 3、webpack基础
- 4、webpack进阶
- 5、Git
- 6、Fiddler
- 7、自制脚手架
- 8、VSCode插件研发
- 9、WebView中的页面调试方法
- Vue.js
- 1、数据绑定
- 2、指令
- 3、样式和表单
- 4、组件
- 5、组件通信
- 6、内容分发
- 7、渲染函数和JSX
- 8、Vue Router
- 9、Vuex
- TypeScript
- 1、数据类型
- 2、接口
- 3、类
- 4、泛型
- 5、类型兼容性
- 6、高级类型
- 7、命名空间
- 8、装饰器
- Node.js
- 1、Buffer、流和EventEmitter
- 2、文件系统和网络
- 3、命令行工具
- 4、自建前端监控系统
- 5、定时任务的调试
- 6、自制短链系统
- 7、定时任务的进化史
- 8、通用接口
- 9、微前端实践
- 10、接口日志查询
- 11、E2E测试
- 12、BFF
- 13、MySQL归档
- 14、压力测试
- 15、活动规则引擎
- 16、活动配置化
- 17、UmiJS版本升级
- 18、半吊子的可视化搭建系统
- 19、KOA源码分析(上)
- 20、KOA源码分析(下)
- 21、花10分钟入门Node.js
- 22、Node环境升级日志
- 23、Worker threads
- 24、低代码
- 25、Web自动化测试
- 26、接口拦截和页面回放实验
- 27、接口管理
- 28、Cypress自动化测试实践
- 29、基于Electron的开播助手
- Node.js精进
- 1、模块化
- 2、异步编程
- 3、流
- 4、事件触发器
- 5、HTTP
- 6、文件
- 7、日志
- 8、错误处理
- 9、性能监控(上)
- 10、性能监控(下)
- 11、Socket.IO
- 12、ElasticSearch
- 监控系统
- 1、SDK
- 2、存储和分析
- 3、性能监控
- 4、内存泄漏
- 5、小程序
- 6、较长的白屏时间
- 7、页面奔溃
- 8、shin-monitor源码分析
- 前端性能精进
- 1、优化方法论之测量
- 2、优化方法论之分析
- 3、浏览器之图像
- 4、浏览器之呈现
- 5、浏览器之JavaScript
- 6、网络
- 7、构建
- 前端体验优化
- 1、概述
- 2、基建
- 3、后端
- 4、数据
- 5、后台
- Web优化
- 1、CSS优化
- 2、JavaScript优化
- 3、图像和网络
- 4、用户体验和工具
- 5、网站优化
- 6、优化闭环实践
- 数据结构与算法
- 1、链表
- 2、栈、队列、散列表和位运算
- 3、二叉树
- 4、二分查找
- 5、回溯算法
- 6、贪心算法
- 7、分治算法
- 8、动态规划
- 程序员之路
- 大学
- 2011年
- 2012年
- 2013年
- 2014年
- 项目反思
- 前端基础学习分享
- 2015年
- 再一次项目反思
- 然并卵
- PC网站CSS分享
- 2016年
- 制造自己的榫卯
- PrimusUI
- 2017年
- 工匠精神
- 2018年
- 2019年
- 前端学习之路分享
- 2020年
- 2021年
- 2022年
- 2023年
- 日志
- 2020