在Node里耍多线程和多进程,会不会闪到腰?!
一、Node和JS的关系
- Node是JS的运行环境。最初JS只在浏览器中运行,它依赖于浏览器的JS引擎(如Chrome的V8、Firefox的SpiderMonkey)。Node从Chrome中获得灵感,并直接套用了V8引擎,让JS可以在服务器端运行,意味着可以使用JS来编写控制台应用、创建API和构建网络服务器。
- Node的核心部分使用C++编写,比如用于解析和执行JS的V8引擎(就是Chrome中的V8引擎),用于操作异步任务的libuv库,以及许多核心库(如fs、net、stream、worker_thread、child_process等)。Node将这些使用C++编写的底层功能,包装成JS接口,使我们可以使用JS来编写应用程序。
- 我们使用Vue或者React开发前端应用时,都要安装Node。调试运行时,实际上就是使用Node作为开发时服务器,调试时网站运行在本地Node服务器上,编译、打包和发布工作,则由Webpack或Vite来完成。
- JS的确是单线程,它处理并发的机制是非阻塞的事件循环机制。但Node的核心功能是C++编写的,自然支持创建多线程或多进程来完成异步操作。
二、Node多线程
Node提供worker_threads模块,用于手动创建多线程。之前有介绍用于异步操作的libuv库,它主要作用于Node单线程事件循环机制中的异步操作,主线程碰到异步任务时,会把它扔给libuv,libuv完成后扔到任务队列里。libuv的高效,源于它基于线程池,这有些类似于C#的TPL了,后面章节再细说。
worker_threads并不依赖于libuv,它直接依赖于底层操作系统的线程实现,在每个Worker中,有独立的V8引擎和上下文。这至少意味着:(1)创建线程的代价是比较高的;(2)线程之间相互独立,需要依靠消息机制通讯,通讯开销会比较大。所以,没事别乱开线程。
2.1 worker_thread的常用API
- Worker类:用来创建和管理Worker线程。一个Worker线程可以执行与主线程并行的任务。
- isMainThread:一个布尔值,指示代码是否在主线程中运行。
- parentPort:这是主线程与工作线程之间的通信通道。
- workerData:这是传递给Worker线程的初始数据。可以在创建Worker时传递,并在Worker中访问。
- MessageChannel:用于创建两个相互连接的通信端口,允许在不同线程之间进行通信。
- SharedArrayBuffer/Atomics:允许在多个线程之间共享数据时,并进行安全的原子操作。
2.2 主线程和Worker线程通讯
//main.js //以下文件包含了主线程和Worker线程的代码,通过isMainThread来判断 //实际开发中,一般将Worker线程的代码放到单独的JS文件中 const { Worker, isMainThread, parentPort, workerData } = require('worker_threads'); if (isMainThread) { // 以下代码在主线程中======================= // 创建一个新的Worker线程 // __filename指向当前文件路径,表示Worker也在当前文件中 //workerData,用于向Worker线程传递数据 const worker = new Worker(__filename, { workerData: { start: 1, end: 100 } }); //主线程监听Worker线程,message、error和exit是事件名称 //监听从Worker线程发来的消息 //result为Worker线程中postMessage出来的值 worker.on('message', result => { console.log(`Result from worker: ${result}`); }); //监听Worker线程中的错误 worker.on('error', error => { console.error(`Worker error: ${error}`); }); //监听Worker线程的结束状态 worker.on('exit', code => { if (code !== 0) { console.error(`Worker stopped with exit code ${code}`); } else { console.log('Worker finished successfully'); } }); } else { // 以下代码在Worker线程中============================ // 访问workerData const { start, end } = workerData; // 简单的计算任务:计算[start, end]范围内的和 let sum = 0; for (let i = start; i <= end; i++) { sum += i; } // 发送结果给主线程 parentPort.postMessage(sum); }
2.3 Worker线程之间通过MessageChannel通讯
//main.js,主线程================================================ const { Worker, MessageChannel } = require('worker_threads'); const path = require('path'); // 创建一个新的MessageChannel,解构出两个通讯端口 const { port1, port2 } = new MessageChannel(); // 创建第一个Worker const worker1 = new Worker(path.resolve(__dirname, 'worker1.js')); // 创建第二个Worker const worker2 = new Worker(path.resolve(__dirname, 'worker2.js')); // 将port1发送给worker1,port2发送给worker2 worker1.postMessage({ port: port1 }, [port1]); worker2.postMessage({ port: port2 }, [port2]); //worker1.js,Worker1线程========================================= const { parentPort } = require('worker_threads'); //第一个message来自主线程,可获取通讯端口 parentPort.on('message', (message) => { const port = message.port; //第二个message来自其它Worker线程 port.on('message', (msg) => { console.log(`Worker1 received: ${msg}`); // 发送响应消息 port.postMessage('Hello from Worker1'); }); }); //worker2.js,Worker2线程========================================= const { parentPort } = require('worker_threads'); parentPort.on('message', (message) => { const port = message.port; port.postMessage('Hello from Worker2'); port.on('message', (msg) => { console.log(`Worker2 received: ${msg}`); }); });
2.4 不同线程间共享信息SharedArrayBuffer 和Atomics
//使用SharedArrayBuffer 和Atomics,可以绕过消息通讯,极大提升多线程通讯性能 /main.js,主线程=============================================== const { Worker } = require('worker_threads'); const path = require('path'); //创建一个共享的ArrayBuffer,并初始化 const sharedBuffer = new SharedArrayBuffer(4); // 4字节的共享内存 const sharedArray = new Int32Array(sharedBuffer); sharedArray[0] = 0; //创建两个Worker线程 const worker1 = new Worker(path.resolve(__dirname, 'worker.js'), { workerData: { sharedBuffer } }); const worker2 = new Worker(path.resolve(__dirname, 'worker.js'), { workerData: { sharedBuffer } }); //监听Worker的消息 worker1.on('message', (msg) => console.log('From Worker1:', msg)); worker2.on('message', (msg) => console.log('From Worker2:', msg)); //等待Workers完成任务 worker1.on('exit', () => console.log('Worker1 exited')); worker2.on('exit', () => console.log('Worker2 exited')); //worker.js,Worker线程========================================= const { parentPort, workerData } = require('worker_threads'); const { SharedArrayBuffer, Atomics } = require('atomics'); // 获取共享的ArrayBuffer const sharedArray = new Int32Array(workerData.sharedBuffer); // 使用Atomics,安全的进行原子操作 Atomics.add(sharedArray, 0, 1); // 增加共享数组的第一个元素 // 获取当前值 const currentValue = Atomics.load(sharedArray, 0); // 向主线程发送消息 parentPort.postMessage(`Current value: ${currentValue}`);
三、Node多进程
Node不仅可以创建议多线程,还允许创建子进程。每个子进程,都是一个Node实例,有独立的V8引擎、Node运行时和内存资源。相比多线程,自然是更加消耗资源的(10-30M)。
Node提供了child_process和cluster两个模块用于创建和管理子进程。cluster建立在child_process之上,内置了负载均衡和自动重启机制,可以更加高效的利用CPU的多核性能,是专为Node服务器应用设计的。
3.1 Child Process模块
child_process模块提供了spawn、exec、execFile、fork等方法用于创建子进程:
**spawn**
:用于启动一个新的进程,可以与其进行数据流的交互。**exec**
:用于运行一个命令,并将输出(stdout 和 stderr)作为回调函数的参数返回。**execFile**
:与exec
类似,但更适合直接执行文件而不是命令字符串。**fork**
:专门用于创建新的Node.js进程,并且它有专门的通信通道,适用于父进程和子进程之间传递消息。
(1)spawn方法
它是最基本的创建子进程的方法,适用于需要与子进程进行长时间交互的场景。
//在子进程中,执行系统操作命令=================================== const { spawn } = require('child_process'); // 示例:执行 ls 命令,[]中为参数 //spawn方法返回ChildProcess对象,具有 stdout、stderr、stdin属性 //分别对应子进程的标准输出、标准错误和标准输入流。 const ls = spawn('ls', ['-lh', '/usr']); // 监听子进程的标准输出 ls.stdout.on('data', (data) => { console.log(`stdout: ${data}`); }); // 监听子进程的标准错误输出 ls.stderr.on('data', (data) => { console.error(`stderr: ${data}`); }); // 监听子进程的退出事件 ls.on('close', (code) => { console.log(`子进程退出,退出码 ${code}`); }); //在子进程中,执行JS脚本========================================= const { spawn } = require('child_process'); // 示例:执行 JavaScript 文件 const child = spawn('node', ['script.js']); child.stdout.on('data', (data) => { console.log(`子进程 stdout:\n${data}`); }); child.stderr.on('data', (data) => { console.error(`子进程 stderr:\n${data}`); }); child.on('close', (code) => { console.log(`子进程退出,退出码 ${code}`); });
(2)exec方法
相比 spawn() 方法,它将整个命令(包括参数)作为一个字符串传递给底层的 shell执行,适合于简单的命令和短期执行的任务。
//在子进程中,执行系统操作命令=================================== const { exec } = require('child_process'); // 示例:执行 ls 命令 //直接在回调中监听 exec('ls -lh /usr', (error, stdout, stderr) => { if (error) { console.error(`执行错误:${error.message}`); return; } if (stderr) { console.error(`stderr: ${stderr}`); return; } console.log(`stdout: ${stdout}`); }); //在子进程中,执行JS脚本======================================== const { exec } = require('child_process'); // 示例:执行 JavaScript 文件 exec('node script.js', (error, stdout, stderr) => { if (error) { console.error(`执行错误:${error.message}`); return; } if (stderr) { console.error(`子进程 stderr:\n${stderr}`); return; } console.log(`子进程 stdout:\n${stdout}`); });
(3)execFile方法
execFile() 方法与 exec() 类似,但需要显式指定可执行文件的路径和参数列表,不会调用系统的 shell
//在子进程中,执行系统操作命令=================================== const { execFile } = require('child_process'); // 示例:执行 node 命令 execFile('node', ['--version'], (error, stdout, stderr) => { if (error) { console.error(`执行错误:${error.message}`); return; } console.log(`stdout: ${stdout}`); }); //在子进程中,执行JS脚本======================================== const { execFile } = require('child_process'); // 示例:执行 JavaScript 文件 execFile('node', ['script.js'], (error, stdout, stderr) => { if (error) { console.error(`执行错误:${error.message}`); return; } console.log(`子进程 stdout:\n${stdout}`); });
(4)fork方法
fork()是Node.js特有的,用于创建一个新的Node.js进程,并且会在父进程和子进程之间创建一个通信通道。它非常适合在多进程架构中进行进程间通信(IPC)
//main.js======================================================= const { fork } = require('child_process'); const child = fork('./child.js'); child.on('message', (msg) => { console.log(`Message from child: ${msg}`); }); child.send('Hello from parent'); // child.js===================================================== process.on('message', (msg) => { console.log(`Message from parent: ${msg}`); process.send('Hello from child'); });
3.2 Cluster模块
cluster模块,基于child_process的fork方法,在此基础上,增加了负载均衡和自动重启等高级功能:
- Cluster模块允许创建多个工作进程,每个工作进程都是一个独立的 Node.js 实例,可以在不同的 CPU 核心上运行。
- 每个工作进程都可以处理客户端请求,共享同一个 TCP 连接,从而提高服务器的并发处理能力和吞吐量。
- Cluster 模块默认使用轮询(Round-Robin)策略将客户端连接分发到各个工作进程,实现负载均衡。可以通过配置自定义的负载均衡策略,如基于请求次数、CPU 负载等来动态分配客户端请求。
- 主进程(Master)负责管理所有工作进程的生命周期,包括启动、停止和重启等。当一个工作进程崩溃或退出时,主进程可以自动重新启动该工作进程,保持应用程序的稳定性。
const cluster = require('cluster'); const http = require('http'); const numCPUs = require('os').cpus().length; //CPU核数 if (cluster.isMaster) { //以下代码在主进程中执行 console.log(`主进程 ${process.pid} 正在运行`); // 衍生工作进程 for (let i = 0; i < numCPUs; i++) { cluster.fork(); //创建多个子进程 } // 监听工作进程的退出事件 cluster.on('exit', (worker, code, signal) => { console.log(`工作进程 ${worker.process.pid} 已退出`); }); } else { //以下代码在子进程中执行 // 每个工作进程都可以共享一个 TCP 连接 // 这里是一个 HTTP 服务器示例 http.createServer((req, res) => { res.writeHead(200); res.end('Hello World\n'); }).listen(8000); console.log(`工作进程 ${process.pid} 已启动`); }
*这是一个系列文章,将全面介绍多线程、协程和单线程事件循环机制,建议收藏、点赞哦!
*你在并发编程过程中碰到了哪些难题?欢迎评论区交流~~~
我是functionMC > function MyClass(){…}
C#/TS/鸿蒙/AI等技术问题,以及如何写Bug、防脱发、送外卖等高深问题,都可以私信提问哦!