// WorkerPool.js
"use strict";

// Flag this CommonJS module with `__esModule`; interop helpers such as
// `_interopRequireDefault` (below) check that flag to decide whether a
// required module must be wrapped in `{ default: ... }`.
Object.defineProperty(exports, "__esModule", {
value: true
});
// Initialise the default export slot; it is assigned the WorkerPool class at
// the end of this file.
exports.default = void 0;
  6. var _child_process = _interopRequireDefault(require("child_process"));
  7. var _queue = _interopRequireDefault(require("neo-async/queue"));
  8. var _mapSeries = _interopRequireDefault(require("neo-async/mapSeries"));
  9. var _readBuffer = _interopRequireDefault(require("./readBuffer"));
  10. var _WorkerError = _interopRequireDefault(require("./WorkerError"));
  11. var _serializer = require("./serializer");
  12. function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; }
/* eslint-disable no-console */
// Resolved path of the worker entry script; PoolWorker passes it as the script
// argument when spawning each child process.
const workerPath = require.resolve('./worker');
// Module-wide counter: each PoolWorker takes the current value as its id and
// then increments it.
let workerId = 0;
  16. class PoolWorker {
  17. constructor(options, onJobDone) {
  18. this.disposed = false;
  19. this.nextJobId = 0;
  20. this.jobs = Object.create(null);
  21. this.activeJobs = 0;
  22. this.onJobDone = onJobDone;
  23. this.id = workerId;
  24. workerId += 1; // Empty or invalid node args would break the child process
  25. const sanitizedNodeArgs = (options.nodeArgs || []).filter(opt => !!opt);
  26. this.worker = _child_process.default.spawn(process.execPath, [].concat(sanitizedNodeArgs).concat(workerPath, options.parallelJobs), {
  27. detached: true,
  28. stdio: ['ignore', 'pipe', 'pipe', 'pipe', 'pipe']
  29. });
  30. this.worker.unref(); // This prevents a problem where the worker stdio can be undefined
  31. // when the kernel hits the limit of open files.
  32. // More info can be found on: https://github.com/webpack-contrib/thread-loader/issues/2
  33. if (!this.worker.stdio) {
  34. throw new Error(`Failed to create the worker pool with workerId: ${workerId} and ${''}configuration: ${JSON.stringify(options)}. Please verify if you hit the OS open files limit.`);
  35. }
  36. const [,,, readPipe, writePipe] = this.worker.stdio;
  37. this.readPipe = readPipe;
  38. this.writePipe = writePipe;
  39. this.listenStdOutAndErrFromWorker(this.worker.stdout, this.worker.stderr);
  40. this.readNextMessage();
  41. }
  42. listenStdOutAndErrFromWorker(workerStdout, workerStderr) {
  43. if (workerStdout) {
  44. workerStdout.on('data', this.writeToStdout);
  45. }
  46. if (workerStderr) {
  47. workerStderr.on('data', this.writeToStderr);
  48. }
  49. }
  50. ignoreStdOutAndErrFromWorker(workerStdout, workerStderr) {
  51. if (workerStdout) {
  52. workerStdout.removeListener('data', this.writeToStdout);
  53. }
  54. if (workerStderr) {
  55. workerStderr.removeListener('data', this.writeToStderr);
  56. }
  57. }
  58. writeToStdout(data) {
  59. if (!this.disposed) {
  60. process.stdout.write(data);
  61. }
  62. }
  63. writeToStderr(data) {
  64. if (!this.disposed) {
  65. process.stderr.write(data);
  66. }
  67. }
  68. run(data, callback) {
  69. const jobId = this.nextJobId;
  70. this.nextJobId += 1;
  71. this.jobs[jobId] = {
  72. data,
  73. callback
  74. };
  75. this.activeJobs += 1;
  76. this.writeJson({
  77. type: 'job',
  78. id: jobId,
  79. data
  80. });
  81. }
  82. warmup(requires) {
  83. this.writeJson({
  84. type: 'warmup',
  85. requires
  86. });
  87. }
  88. writeJson(data) {
  89. const lengthBuffer = Buffer.alloc(4);
  90. const messageBuffer = Buffer.from(JSON.stringify(data, _serializer.replacer), 'utf-8');
  91. lengthBuffer.writeInt32BE(messageBuffer.length, 0);
  92. this.writePipe.write(lengthBuffer);
  93. this.writePipe.write(messageBuffer);
  94. }
  95. writeEnd() {
  96. const lengthBuffer = Buffer.alloc(4);
  97. lengthBuffer.writeInt32BE(0, 0);
  98. this.writePipe.write(lengthBuffer);
  99. }
  100. readNextMessage() {
  101. this.state = 'read length';
  102. this.readBuffer(4, (lengthReadError, lengthBuffer) => {
  103. if (lengthReadError) {
  104. console.error(`Failed to communicate with worker (read length) ${lengthReadError}`);
  105. return;
  106. }
  107. this.state = 'length read';
  108. const length = lengthBuffer.readInt32BE(0);
  109. this.state = 'read message';
  110. this.readBuffer(length, (messageError, messageBuffer) => {
  111. if (messageError) {
  112. console.error(`Failed to communicate with worker (read message) ${messageError}`);
  113. return;
  114. }
  115. this.state = 'message read';
  116. const messageString = messageBuffer.toString('utf-8');
  117. const message = JSON.parse(messageString, _serializer.reviver);
  118. this.state = 'process message';
  119. this.onWorkerMessage(message, err => {
  120. if (err) {
  121. console.error(`Failed to communicate with worker (process message) ${err}`);
  122. return;
  123. }
  124. this.state = 'soon next';
  125. setImmediate(() => this.readNextMessage());
  126. });
  127. });
  128. });
  129. }
  130. onWorkerMessage(message, finalCallback) {
  131. const {
  132. type,
  133. id
  134. } = message;
  135. switch (type) {
  136. case 'job':
  137. {
  138. const {
  139. data,
  140. error,
  141. result
  142. } = message;
  143. (0, _mapSeries.default)(data, (length, callback) => this.readBuffer(length, callback), (eachErr, buffers) => {
  144. const {
  145. callback: jobCallback
  146. } = this.jobs[id];
  147. const callback = (err, arg) => {
  148. if (jobCallback) {
  149. delete this.jobs[id];
  150. this.activeJobs -= 1;
  151. this.onJobDone();
  152. if (err) {
  153. jobCallback(err instanceof Error ? err : new Error(err), arg);
  154. } else {
  155. jobCallback(null, arg);
  156. }
  157. }
  158. finalCallback();
  159. };
  160. if (eachErr) {
  161. callback(eachErr);
  162. return;
  163. }
  164. let bufferPosition = 0;
  165. if (result.result) {
  166. result.result = result.result.map(r => {
  167. if (r.buffer) {
  168. const buffer = buffers[bufferPosition];
  169. bufferPosition += 1;
  170. if (r.string) {
  171. return buffer.toString('utf-8');
  172. }
  173. return buffer;
  174. }
  175. return r.data;
  176. });
  177. }
  178. if (error) {
  179. callback(this.fromErrorObj(error), result);
  180. return;
  181. }
  182. callback(null, result);
  183. });
  184. break;
  185. }
  186. case 'loadModule':
  187. {
  188. const {
  189. request,
  190. questionId
  191. } = message;
  192. const {
  193. data
  194. } = this.jobs[id]; // eslint-disable-next-line no-unused-vars
  195. data.loadModule(request, (error, source, sourceMap, module) => {
  196. this.writeJson({
  197. type: 'result',
  198. id: questionId,
  199. error: error ? {
  200. message: error.message,
  201. details: error.details,
  202. missing: error.missing
  203. } : null,
  204. result: [source, sourceMap // TODO: Serialize module?
  205. // module,
  206. ]
  207. });
  208. });
  209. finalCallback();
  210. break;
  211. }
  212. case 'resolve':
  213. {
  214. const {
  215. context,
  216. request,
  217. options,
  218. questionId
  219. } = message;
  220. const {
  221. data
  222. } = this.jobs[id];
  223. if (options) {
  224. data.getResolve(options)(context, request, (error, result) => {
  225. this.writeJson({
  226. type: 'result',
  227. id: questionId,
  228. error: error ? {
  229. message: error.message,
  230. details: error.details,
  231. missing: error.missing
  232. } : null,
  233. result
  234. });
  235. });
  236. } else {
  237. data.resolve(context, request, (error, result) => {
  238. this.writeJson({
  239. type: 'result',
  240. id: questionId,
  241. error: error ? {
  242. message: error.message,
  243. details: error.details,
  244. missing: error.missing
  245. } : null,
  246. result
  247. });
  248. });
  249. }
  250. finalCallback();
  251. break;
  252. }
  253. case 'emitWarning':
  254. {
  255. const {
  256. data
  257. } = message;
  258. const {
  259. data: jobData
  260. } = this.jobs[id];
  261. jobData.emitWarning(this.fromErrorObj(data));
  262. finalCallback();
  263. break;
  264. }
  265. case 'emitError':
  266. {
  267. const {
  268. data
  269. } = message;
  270. const {
  271. data: jobData
  272. } = this.jobs[id];
  273. jobData.emitError(this.fromErrorObj(data));
  274. finalCallback();
  275. break;
  276. }
  277. default:
  278. {
  279. console.error(`Unexpected worker message ${type} in WorkerPool.`);
  280. finalCallback();
  281. break;
  282. }
  283. }
  284. }
  285. fromErrorObj(arg) {
  286. let obj;
  287. if (typeof arg === 'string') {
  288. obj = {
  289. message: arg
  290. };
  291. } else {
  292. obj = arg;
  293. }
  294. return new _WorkerError.default(obj, this.id);
  295. }
  296. readBuffer(length, callback) {
  297. (0, _readBuffer.default)(this.readPipe, length, callback);
  298. }
  299. dispose() {
  300. if (!this.disposed) {
  301. this.disposed = true;
  302. this.ignoreStdOutAndErrFromWorker(this.worker.stdout, this.worker.stderr);
  303. this.writeEnd();
  304. }
  305. }
  306. }
  307. class WorkerPool {
  308. constructor(options) {
  309. this.options = options || {};
  310. this.numberOfWorkers = options.numberOfWorkers;
  311. this.poolTimeout = options.poolTimeout;
  312. this.workerNodeArgs = options.workerNodeArgs;
  313. this.workerParallelJobs = options.workerParallelJobs;
  314. this.workers = new Set();
  315. this.activeJobs = 0;
  316. this.timeout = null;
  317. this.poolQueue = (0, _queue.default)(this.distributeJob.bind(this), options.poolParallelJobs);
  318. this.terminated = false;
  319. this.setupLifeCycle();
  320. }
  321. isAbleToRun() {
  322. return !this.terminated;
  323. }
  324. terminate() {
  325. if (this.terminated) {
  326. return;
  327. }
  328. this.terminated = true;
  329. this.poolQueue.kill();
  330. this.disposeWorkers(true);
  331. }
  332. setupLifeCycle() {
  333. process.on('exit', () => {
  334. this.terminate();
  335. });
  336. }
  337. run(data, callback) {
  338. if (this.timeout) {
  339. clearTimeout(this.timeout);
  340. this.timeout = null;
  341. }
  342. this.activeJobs += 1;
  343. this.poolQueue.push(data, callback);
  344. }
  345. distributeJob(data, callback) {
  346. // use worker with the fewest jobs
  347. let bestWorker;
  348. for (const worker of this.workers) {
  349. if (!bestWorker || worker.activeJobs < bestWorker.activeJobs) {
  350. bestWorker = worker;
  351. }
  352. }
  353. if (bestWorker && (bestWorker.activeJobs === 0 || this.workers.size >= this.numberOfWorkers)) {
  354. bestWorker.run(data, callback);
  355. return;
  356. }
  357. const newWorker = this.createWorker();
  358. newWorker.run(data, callback);
  359. }
  360. createWorker() {
  361. // spin up a new worker
  362. const newWorker = new PoolWorker({
  363. nodeArgs: this.workerNodeArgs,
  364. parallelJobs: this.workerParallelJobs
  365. }, () => this.onJobDone());
  366. this.workers.add(newWorker);
  367. return newWorker;
  368. }
  369. warmup(requires) {
  370. while (this.workers.size < this.numberOfWorkers) {
  371. this.createWorker().warmup(requires);
  372. }
  373. }
  374. onJobDone() {
  375. this.activeJobs -= 1;
  376. if (this.activeJobs === 0 && isFinite(this.poolTimeout)) {
  377. this.timeout = setTimeout(() => this.disposeWorkers(), this.poolTimeout);
  378. }
  379. }
  380. disposeWorkers(fromTerminate) {
  381. if (!this.options.poolRespawn && !fromTerminate) {
  382. this.terminate();
  383. return;
  384. }
  385. if (this.activeJobs === 0 || fromTerminate) {
  386. for (const worker of this.workers) {
  387. worker.dispose();
  388. }
  389. this.workers.clear();
  390. }
  391. }
  392. }
// Publish WorkerPool as this module's default export.
exports.default = WorkerPool;