receiver.js 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607
  1. 'use strict';
  2. const { Writable } = require('stream');
  3. const PerMessageDeflate = require('./permessage-deflate');
  4. const {
  5. BINARY_TYPES,
  6. EMPTY_BUFFER,
  7. kStatusCode,
  8. kWebSocket
  9. } = require('./constants');
  10. const { concat, toArrayBuffer, unmask } = require('./buffer-util');
  11. const { isValidStatusCode, isValidUTF8 } = require('./validation');
  12. const GET_INFO = 0;
  13. const GET_PAYLOAD_LENGTH_16 = 1;
  14. const GET_PAYLOAD_LENGTH_64 = 2;
  15. const GET_MASK = 3;
  16. const GET_DATA = 4;
  17. const INFLATING = 5;
  18. /**
  19. * HyBi Receiver implementation.
  20. *
  21. * @extends Writable
  22. */
  23. class Receiver extends Writable {
  24. /**
  25. * Creates a Receiver instance.
  26. *
  27. * @param {String} [binaryType=nodebuffer] The type for binary data
  28. * @param {Object} [extensions] An object containing the negotiated extensions
  29. * @param {Boolean} [isServer=false] Specifies whether to operate in client or
  30. * server mode
  31. * @param {Number} [maxPayload=0] The maximum allowed message length
  32. */
  33. constructor(binaryType, extensions, isServer, maxPayload) {
  34. super();
  35. this._binaryType = binaryType || BINARY_TYPES[0];
  36. this[kWebSocket] = undefined;
  37. this._extensions = extensions || {};
  38. this._isServer = !!isServer;
  39. this._maxPayload = maxPayload | 0;
  40. this._bufferedBytes = 0;
  41. this._buffers = [];
  42. this._compressed = false;
  43. this._payloadLength = 0;
  44. this._mask = undefined;
  45. this._fragmented = 0;
  46. this._masked = false;
  47. this._fin = false;
  48. this._opcode = 0;
  49. this._totalPayloadLength = 0;
  50. this._messageLength = 0;
  51. this._fragments = [];
  52. this._state = GET_INFO;
  53. this._loop = false;
  54. }
  55. /**
  56. * Implements `Writable.prototype._write()`.
  57. *
  58. * @param {Buffer} chunk The chunk of data to write
  59. * @param {String} encoding The character encoding of `chunk`
  60. * @param {Function} cb Callback
  61. * @private
  62. */
  63. _write(chunk, encoding, cb) {
  64. if (this._opcode === 0x08 && this._state == GET_INFO) return cb();
  65. this._bufferedBytes += chunk.length;
  66. this._buffers.push(chunk);
  67. this.startLoop(cb);
  68. }
  69. /**
  70. * Consumes `n` bytes from the buffered data.
  71. *
  72. * @param {Number} n The number of bytes to consume
  73. * @return {Buffer} The consumed bytes
  74. * @private
  75. */
  76. consume(n) {
  77. this._bufferedBytes -= n;
  78. if (n === this._buffers[0].length) return this._buffers.shift();
  79. if (n < this._buffers[0].length) {
  80. const buf = this._buffers[0];
  81. this._buffers[0] = buf.slice(n);
  82. return buf.slice(0, n);
  83. }
  84. const dst = Buffer.allocUnsafe(n);
  85. do {
  86. const buf = this._buffers[0];
  87. const offset = dst.length - n;
  88. if (n >= buf.length) {
  89. dst.set(this._buffers.shift(), offset);
  90. } else {
  91. dst.set(new Uint8Array(buf.buffer, buf.byteOffset, n), offset);
  92. this._buffers[0] = buf.slice(n);
  93. }
  94. n -= buf.length;
  95. } while (n > 0);
  96. return dst;
  97. }
  98. /**
  99. * Starts the parsing loop.
  100. *
  101. * @param {Function} cb Callback
  102. * @private
  103. */
  104. startLoop(cb) {
  105. let err;
  106. this._loop = true;
  107. do {
  108. switch (this._state) {
  109. case GET_INFO:
  110. err = this.getInfo();
  111. break;
  112. case GET_PAYLOAD_LENGTH_16:
  113. err = this.getPayloadLength16();
  114. break;
  115. case GET_PAYLOAD_LENGTH_64:
  116. err = this.getPayloadLength64();
  117. break;
  118. case GET_MASK:
  119. this.getMask();
  120. break;
  121. case GET_DATA:
  122. err = this.getData(cb);
  123. break;
  124. default:
  125. // `INFLATING`
  126. this._loop = false;
  127. return;
  128. }
  129. } while (this._loop);
  130. cb(err);
  131. }
  132. /**
  133. * Reads the first two bytes of a frame.
  134. *
  135. * @return {(RangeError|undefined)} A possible error
  136. * @private
  137. */
  138. getInfo() {
  139. if (this._bufferedBytes < 2) {
  140. this._loop = false;
  141. return;
  142. }
  143. const buf = this.consume(2);
  144. if ((buf[0] & 0x30) !== 0x00) {
  145. this._loop = false;
  146. return error(
  147. RangeError,
  148. 'RSV2 and RSV3 must be clear',
  149. true,
  150. 1002,
  151. 'WS_ERR_UNEXPECTED_RSV_2_3'
  152. );
  153. }
  154. const compressed = (buf[0] & 0x40) === 0x40;
  155. if (compressed && !this._extensions[PerMessageDeflate.extensionName]) {
  156. this._loop = false;
  157. return error(
  158. RangeError,
  159. 'RSV1 must be clear',
  160. true,
  161. 1002,
  162. 'WS_ERR_UNEXPECTED_RSV_1'
  163. );
  164. }
  165. this._fin = (buf[0] & 0x80) === 0x80;
  166. this._opcode = buf[0] & 0x0f;
  167. this._payloadLength = buf[1] & 0x7f;
  168. if (this._opcode === 0x00) {
  169. if (compressed) {
  170. this._loop = false;
  171. return error(
  172. RangeError,
  173. 'RSV1 must be clear',
  174. true,
  175. 1002,
  176. 'WS_ERR_UNEXPECTED_RSV_1'
  177. );
  178. }
  179. if (!this._fragmented) {
  180. this._loop = false;
  181. return error(
  182. RangeError,
  183. 'invalid opcode 0',
  184. true,
  185. 1002,
  186. 'WS_ERR_INVALID_OPCODE'
  187. );
  188. }
  189. this._opcode = this._fragmented;
  190. } else if (this._opcode === 0x01 || this._opcode === 0x02) {
  191. if (this._fragmented) {
  192. this._loop = false;
  193. return error(
  194. RangeError,
  195. `invalid opcode ${this._opcode}`,
  196. true,
  197. 1002,
  198. 'WS_ERR_INVALID_OPCODE'
  199. );
  200. }
  201. this._compressed = compressed;
  202. } else if (this._opcode > 0x07 && this._opcode < 0x0b) {
  203. if (!this._fin) {
  204. this._loop = false;
  205. return error(
  206. RangeError,
  207. 'FIN must be set',
  208. true,
  209. 1002,
  210. 'WS_ERR_EXPECTED_FIN'
  211. );
  212. }
  213. if (compressed) {
  214. this._loop = false;
  215. return error(
  216. RangeError,
  217. 'RSV1 must be clear',
  218. true,
  219. 1002,
  220. 'WS_ERR_UNEXPECTED_RSV_1'
  221. );
  222. }
  223. if (this._payloadLength > 0x7d) {
  224. this._loop = false;
  225. return error(
  226. RangeError,
  227. `invalid payload length ${this._payloadLength}`,
  228. true,
  229. 1002,
  230. 'WS_ERR_INVALID_CONTROL_PAYLOAD_LENGTH'
  231. );
  232. }
  233. } else {
  234. this._loop = false;
  235. return error(
  236. RangeError,
  237. `invalid opcode ${this._opcode}`,
  238. true,
  239. 1002,
  240. 'WS_ERR_INVALID_OPCODE'
  241. );
  242. }
  243. if (!this._fin && !this._fragmented) this._fragmented = this._opcode;
  244. this._masked = (buf[1] & 0x80) === 0x80;
  245. if (this._isServer) {
  246. if (!this._masked) {
  247. this._loop = false;
  248. return error(
  249. RangeError,
  250. 'MASK must be set',
  251. true,
  252. 1002,
  253. 'WS_ERR_EXPECTED_MASK'
  254. );
  255. }
  256. } else if (this._masked) {
  257. this._loop = false;
  258. return error(
  259. RangeError,
  260. 'MASK must be clear',
  261. true,
  262. 1002,
  263. 'WS_ERR_UNEXPECTED_MASK'
  264. );
  265. }
  266. if (this._payloadLength === 126) this._state = GET_PAYLOAD_LENGTH_16;
  267. else if (this._payloadLength === 127) this._state = GET_PAYLOAD_LENGTH_64;
  268. else return this.haveLength();
  269. }
  270. /**
  271. * Gets extended payload length (7+16).
  272. *
  273. * @return {(RangeError|undefined)} A possible error
  274. * @private
  275. */
  276. getPayloadLength16() {
  277. if (this._bufferedBytes < 2) {
  278. this._loop = false;
  279. return;
  280. }
  281. this._payloadLength = this.consume(2).readUInt16BE(0);
  282. return this.haveLength();
  283. }
  284. /**
  285. * Gets extended payload length (7+64).
  286. *
  287. * @return {(RangeError|undefined)} A possible error
  288. * @private
  289. */
  290. getPayloadLength64() {
  291. if (this._bufferedBytes < 8) {
  292. this._loop = false;
  293. return;
  294. }
  295. const buf = this.consume(8);
  296. const num = buf.readUInt32BE(0);
  297. //
  298. // The maximum safe integer in JavaScript is 2^53 - 1. An error is returned
  299. // if payload length is greater than this number.
  300. //
  301. if (num > Math.pow(2, 53 - 32) - 1) {
  302. this._loop = false;
  303. return error(
  304. RangeError,
  305. 'Unsupported WebSocket frame: payload length > 2^53 - 1',
  306. false,
  307. 1009,
  308. 'WS_ERR_UNSUPPORTED_DATA_PAYLOAD_LENGTH'
  309. );
  310. }
  311. this._payloadLength = num * Math.pow(2, 32) + buf.readUInt32BE(4);
  312. return this.haveLength();
  313. }
  314. /**
  315. * Payload length has been read.
  316. *
  317. * @return {(RangeError|undefined)} A possible error
  318. * @private
  319. */
  320. haveLength() {
  321. if (this._payloadLength && this._opcode < 0x08) {
  322. this._totalPayloadLength += this._payloadLength;
  323. if (this._totalPayloadLength > this._maxPayload && this._maxPayload > 0) {
  324. this._loop = false;
  325. return error(
  326. RangeError,
  327. 'Max payload size exceeded',
  328. false,
  329. 1009,
  330. 'WS_ERR_UNSUPPORTED_MESSAGE_LENGTH'
  331. );
  332. }
  333. }
  334. if (this._masked) this._state = GET_MASK;
  335. else this._state = GET_DATA;
  336. }
  337. /**
  338. * Reads mask bytes.
  339. *
  340. * @private
  341. */
  342. getMask() {
  343. if (this._bufferedBytes < 4) {
  344. this._loop = false;
  345. return;
  346. }
  347. this._mask = this.consume(4);
  348. this._state = GET_DATA;
  349. }
  350. /**
  351. * Reads data bytes.
  352. *
  353. * @param {Function} cb Callback
  354. * @return {(Error|RangeError|undefined)} A possible error
  355. * @private
  356. */
  357. getData(cb) {
  358. let data = EMPTY_BUFFER;
  359. if (this._payloadLength) {
  360. if (this._bufferedBytes < this._payloadLength) {
  361. this._loop = false;
  362. return;
  363. }
  364. data = this.consume(this._payloadLength);
  365. if (this._masked) unmask(data, this._mask);
  366. }
  367. if (this._opcode > 0x07) return this.controlMessage(data);
  368. if (this._compressed) {
  369. this._state = INFLATING;
  370. this.decompress(data, cb);
  371. return;
  372. }
  373. if (data.length) {
  374. //
  375. // This message is not compressed so its lenght is the sum of the payload
  376. // length of all fragments.
  377. //
  378. this._messageLength = this._totalPayloadLength;
  379. this._fragments.push(data);
  380. }
  381. return this.dataMessage();
  382. }
  383. /**
  384. * Decompresses data.
  385. *
  386. * @param {Buffer} data Compressed data
  387. * @param {Function} cb Callback
  388. * @private
  389. */
  390. decompress(data, cb) {
  391. const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName];
  392. perMessageDeflate.decompress(data, this._fin, (err, buf) => {
  393. if (err) return cb(err);
  394. if (buf.length) {
  395. this._messageLength += buf.length;
  396. if (this._messageLength > this._maxPayload && this._maxPayload > 0) {
  397. return cb(
  398. error(
  399. RangeError,
  400. 'Max payload size exceeded',
  401. false,
  402. 1009,
  403. 'WS_ERR_UNSUPPORTED_MESSAGE_LENGTH'
  404. )
  405. );
  406. }
  407. this._fragments.push(buf);
  408. }
  409. const er = this.dataMessage();
  410. if (er) return cb(er);
  411. this.startLoop(cb);
  412. });
  413. }
  414. /**
  415. * Handles a data message.
  416. *
  417. * @return {(Error|undefined)} A possible error
  418. * @private
  419. */
  420. dataMessage() {
  421. if (this._fin) {
  422. const messageLength = this._messageLength;
  423. const fragments = this._fragments;
  424. this._totalPayloadLength = 0;
  425. this._messageLength = 0;
  426. this._fragmented = 0;
  427. this._fragments = [];
  428. if (this._opcode === 2) {
  429. let data;
  430. if (this._binaryType === 'nodebuffer') {
  431. data = concat(fragments, messageLength);
  432. } else if (this._binaryType === 'arraybuffer') {
  433. data = toArrayBuffer(concat(fragments, messageLength));
  434. } else {
  435. data = fragments;
  436. }
  437. this.emit('message', data);
  438. } else {
  439. const buf = concat(fragments, messageLength);
  440. if (!isValidUTF8(buf)) {
  441. this._loop = false;
  442. return error(
  443. Error,
  444. 'invalid UTF-8 sequence',
  445. true,
  446. 1007,
  447. 'WS_ERR_INVALID_UTF8'
  448. );
  449. }
  450. this.emit('message', buf.toString());
  451. }
  452. }
  453. this._state = GET_INFO;
  454. }
  455. /**
  456. * Handles a control message.
  457. *
  458. * @param {Buffer} data Data to handle
  459. * @return {(Error|RangeError|undefined)} A possible error
  460. * @private
  461. */
  462. controlMessage(data) {
  463. if (this._opcode === 0x08) {
  464. this._loop = false;
  465. if (data.length === 0) {
  466. this.emit('conclude', 1005, '');
  467. this.end();
  468. } else if (data.length === 1) {
  469. return error(
  470. RangeError,
  471. 'invalid payload length 1',
  472. true,
  473. 1002,
  474. 'WS_ERR_INVALID_CONTROL_PAYLOAD_LENGTH'
  475. );
  476. } else {
  477. const code = data.readUInt16BE(0);
  478. if (!isValidStatusCode(code)) {
  479. return error(
  480. RangeError,
  481. `invalid status code ${code}`,
  482. true,
  483. 1002,
  484. 'WS_ERR_INVALID_CLOSE_CODE'
  485. );
  486. }
  487. const buf = data.slice(2);
  488. if (!isValidUTF8(buf)) {
  489. return error(
  490. Error,
  491. 'invalid UTF-8 sequence',
  492. true,
  493. 1007,
  494. 'WS_ERR_INVALID_UTF8'
  495. );
  496. }
  497. this.emit('conclude', code, buf.toString());
  498. this.end();
  499. }
  500. } else if (this._opcode === 0x09) {
  501. this.emit('ping', data);
  502. } else {
  503. this.emit('pong', data);
  504. }
  505. this._state = GET_INFO;
  506. }
  507. }
  508. module.exports = Receiver;
  509. /**
  510. * Builds an error object.
  511. *
  512. * @param {function(new:Error|RangeError)} ErrorCtor The error constructor
  513. * @param {String} message The error message
  514. * @param {Boolean} prefix Specifies whether or not to add a default prefix to
  515. * `message`
  516. * @param {Number} statusCode The status code
  517. * @param {String} errorCode The exposed error code
  518. * @return {(Error|RangeError)} The error
  519. * @private
  520. */
  521. function error(ErrorCtor, message, prefix, statusCode, errorCode) {
  522. const err = new ErrorCtor(
  523. prefix ? `Invalid WebSocket frame: ${message}` : message
  524. );
  525. Error.captureStackTrace(err, error);
  526. err.code = errorCode;
  527. err[kStatusCode] = statusCode;
  528. return err;
  529. }