SerializerAppend.js 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602
  1. const fs = require('graceful-fs');
  2. const join = require('path').join;
  3. const Readable = require('stream').Readable;
  4. const _mkdirp = require('mkdirp');
  5. const _rimraf = require('rimraf');
  6. const writeJsonFile = require('write-json-file');
  7. const entries = require('./util/Object.entries');
  8. const values = require('./util/Object.values');
  9. const promisify = require('./util/promisify');
  10. const rimraf = promisify(_rimraf);
  11. const open = promisify(fs.open);
  12. const close = promisify(fs.close);
  13. const read = promisify(fs.read);
  14. const readFile = promisify(fs.readFile);
  15. const write = promisify(fs.write);
  16. const rename = promisify(fs.rename);
  17. const unlink = promisify(fs.unlink);
  18. const stat = promisify(fs.stat);
  19. const mkdirp = promisify(_mkdirp);
  // Format version stamped into every table; a table on disk with any other
  // version is replaced by a fresh default table when read.
  const APPEND_VERSION = 1;
  // Default block size recorded in new tables: 4 KiB.
  const _blockSize = 4096;
  // Default size of one log file: 2 MiB.
  const _logSize = 2097152;
  // Default lower bound on the log size before a compaction is considered: 512 KiB.
  const _minCompactSize = 524288;
  // Default factor: compact once used bytes exceed live bytes times this value.
  const _compactMultiplierThreshold = 1.5;
  /**
   * Build a table entry locating one key's serialized bytes in the log.
   * A missing (or falsy) size or start is recorded as 0.
   *
   * @param {string} key
   * @param {number} [size]
   * @param {number} [start]
   * @returns {{key: string, size: number, start: number}}
   */
  const value = (key, size, start) => {
    const entry = { key };
    entry.size = size || 0;
    entry.start = start || 0;
    return entry;
  };
  /**
   * Convert a Map into a plain object whose properties are the Map's entries.
   * Any value that is not a Map is returned unchanged (same reference).
   */
  const objFrom = map => {
    if (!(map instanceof Map)) {
      return map;
    }
    const obj = {};
    for (const [key, entry] of map) {
      obj[key] = entry;
    }
    return obj;
  };
  /**
   * Build the serializable (JSON) form of a table. Stamps the current format
   * version and turns a Map-backed `map` into a plain object. Any other
   * properties of the input are dropped.
   */
  const table = ({ nextByte, blockSize, logSize, map }) => {
    const serializedMap = objFrom(map);
    return {
      version: APPEND_VERSION,
      nextByte,
      blockSize,
      logSize,
      map: serializedMap,
    };
  };
  /**
   * Build the mutable in-memory form of a table. It has the same fields as
   * `table`, but `map` is a Map built from the plain object's entries, so that
   * putKey/delKey can update it.
   */
  const modTable = ({ nextByte, blockSize, logSize, map }) => {
    const result = {
      version: APPEND_VERSION,
      nextByte,
      blockSize,
      logSize,
    };
    result.map = new Map(entries(map));
    return result;
  };
  /**
   * Record that `key` occupies the next `size` bytes of the log, then move
   * the table's write cursor past them. Mutates and returns `_table`.
   */
  function putKey(_table, key, size) {
    const start = _table.nextByte;
    _table.map.set(key, value(key, size, start));
    _table.nextByte = start + size;
    return _table;
  }
  /**
   * Remove `key` from the table's map when it maps to a truthy entry.
   * The log bytes it occupied are not reclaimed. Mutates and returns `_table`.
   */
  function delKey(_table, key) {
    const { map } = _table;
    const existing = map.get(key);
    if (existing) {
      map.delete(key);
    }
    return _table;
  }
  // Location of the JSON table file inside the serializer's cache directory.
  const _tablepath = serializer => join(serializer.path, 'table.json');
  /**
   * Fresh, empty table used when no usable table exists on disk.
   * Falsy blockSize/logSize fall back to the module defaults.
   */
  const _defaultTable = serializer => {
    const blockSize = serializer.blockSize || _blockSize;
    const logSize = serializer.logSize || _logSize;
    return table({ nextByte: 0, blockSize, logSize, map: {} });
  };
  // Resolve with undefined after about 100 ms. It is chained after unlink/rimraf
  // in this file, presumably to let the filesystem settle (not verified here).
  const timeout100 = () =>
    new Promise(resolve => {
      setTimeout(() => resolve(), 100);
    });
  /**
   * Wrap a promise-returning `fn` so that a rejection triggers another attempt.
   *
   * The wrapper makes up to `n` extra attempts before the final rejection
   * propagates. The default is 5, and a falsy `n` also means 5. Every attempt
   * receives the original argument, not the previous attempt's rejection
   * reason. The attempt budget is shared by all calls to the same wrapper, so
   * create a fresh wrapper per operation (as the callers in this file do).
   *
   * @param {function(*): Promise} fn
   * @param {number} [n] - number of retries after the first attempt
   * @returns {function(*): Promise}
   */
  const _retry = (fn, n) => {
    let remaining = n || 5;
    const _retryFn = value => {
      if (remaining) {
        remaining--;
        // Retry with the same argument. Passing _retryFn straight to .catch
        // would hand it the error instead.
        return fn(value).catch(() => _retryFn(value));
      }
      return fn(value);
    };
    return _retryFn;
  };
  /**
   * Load the serializer's table from disk.
   *
   * If the file cannot be read, the default table is returned. A table stamped
   * with a version other than APPEND_VERSION is also replaced by the default.
   * Invalid JSON is not handled here and rejects the returned promise.
   */
  const _readTable = serializer => {
    const fallback = () => JSON.stringify(_defaultTable(serializer));
    return readFile(_tablepath(serializer), 'utf8')
      .then(null, fallback)
      .then(text => {
        const parsed = JSON.parse(text);
        return parsed.version === APPEND_VERSION ? parsed : _defaultTable(serializer);
      });
  };
  // Persist a table as JSON at the serializer's table path; returns writeJsonFile's promise.
  const _writeTable = (serializer, tableToWrite) => {
    const destination = _tablepath(serializer);
    return writeJsonFile(destination, tableToWrite);
  };
  /**
   * Path of the log file that holds logical byte `index`.
   * Each file covers `logSize` bytes. Its number is zero-padded to at least
   * four digits (log0000, log0001, ...) and longer numbers are kept whole.
   */
  const _logFilepath = (serializer, { logSize }, index) => {
    const logNumber = String((index / logSize) | 0);
    const padding = logNumber.length < 4 ? '0'.repeat(4 - logNumber.length) : '';
    return join(serializer.path, `log${padding}${logNumber}`);
  };
  /**
   * Open the log file that contains logical byte `index`, unless a log is
   * already open (then this is a no-op).
   *
   * In append mode ('a'), if `index` falls on a log-file boundary the target
   * file should be empty. A non-empty file found there is unlinked, followed
   * by a short pause, before opening. Stat/unlink failures are deliberately
   * ignored, since a missing file is the normal case. Append mode also
   * allocates a `logSize`-byte write buffer, which `_closeLog` flushes.
   *
   * @param {object} _this - serializer state (`_fd`, `_writeBuffer`, `_writeOffset`)
   * @param {string} mode - fs open flags ('a' to append, 'r' to read)
   * @param {object} _table - table providing `logSize`
   * @param {number} index - logical byte offset to open the log for
   * @returns {Promise<void>} resolves once `_this._fd` is set; rejects if open fails
   */
  const _openLog = (_this, mode, _table, index) => {
    if (_this._fd !== null) {
      return Promise.resolve();
    }
    let filepath;
    return Promise.resolve()
      .then(() => {
        filepath = _logFilepath(_this, _table, index);
        if (mode !== 'a' || index % _table.logSize !== 0) {
          return undefined;
        }
        return stat(filepath)
          .then(({ size }) => {
            if (size > 0) {
              return unlink(filepath).then(timeout100);
            }
            return undefined;
          })
          .catch(() => {});
      })
      .then(() => open(filepath, mode))
      .then(fd => {
        _this._fd = fd;
        if (mode === 'a') {
          // Buffer.alloc replaces the deprecated `new Buffer(size)` (DEP0005).
          _this._writeBuffer = Buffer.alloc(_table.logSize);
          _this._writeOffset = 0;
        }
      });
  };
  /**
   * Close the currently open log, if any.
   *
   * If a write buffer exists, its first `_writeOffset` bytes are written to
   * the file before closing. Afterwards the fd and buffer state are reset.
   */
  const _closeLog = _this => {
    if (_this._fd === null) {
      return Promise.resolve();
    }
    const flush = () =>
      _this._writeBuffer ? write(_this._fd, _this._writeBuffer, 0, _this._writeOffset) : undefined;
    const reset = () => {
      _this._fd = null;
      _this._writeBuffer = null;
      _this._writeOffset = 0;
    };
    return Promise.resolve()
      .then(flush)
      .then(() => close(_this._fd))
      .then(reset);
  };
  // Read-buffer size: 32 blocks, capped at one log file. Not referenced elsewhere in this view.
  const _readBufferSize = (serializer, settings) =>
    Math.min(settings.logSize, 32 * settings.blockSize);
  /**
   * Stream the log files that back `_table`, pushing one chunk per log file.
   *
   * Each chunk is a fresh `logSize`-byte Buffer filled from the start of one
   * log file. Bytes past the end of a shorter file stay zero. Chunk k
   * therefore covers logical offsets [k * logSize, (k + 1) * logSize). The
   * stream ends once `nextByte` is covered, and the last log is then closed.
   *
   * Any failure while opening, reading or closing a log is emitted as an
   * 'error' event on the returned stream. Without this the rejection would
   * escape unhandled and the stream would never end, leaving the consumer
   * waiting forever.
   *
   * @param {object} _this - serializer state (`_fd`)
   * @param {object} _table - table providing `logSize` and `nextByte`
   * @returns {Readable}
   */
  const _readLog = (_this, _table) => {
    let index = 0;
    const out = new Readable({
      read() {},
    });
    const rbSize = _table.logSize;
    function _log() {
      if (index >= _table.nextByte) {
        out.push(null);
        return _closeLog(_this);
      }
      function step() {
        if (!_this._fd) {
          index = _table.nextByte;
          return _log();
        }
        // Allocate per log so a pushed chunk is never overwritten by the next
        // read while a consumer may still hold a reference to it.
        const chunk = Buffer.alloc(rbSize);
        return read(_this._fd, chunk, 0, rbSize, 0).then(() => {
          index += _table.logSize;
          out.push(chunk);
          return _log();
        });
      }
      return _closeLog(_this)
        .then(() => _openLog(_this, 'r', _table, index))
        .then(step);
    }
    Promise.resolve()
      .then(_log)
      .catch(err => {
        out.emit('error', err);
      });
    return out;
  };
  /**
   * Copy `blockContent` into the in-memory write buffer of the log that holds
   * logical byte `index`, then report completion through the node-style
   * callback `next(err?)`.
   *
   * A log is (re)opened first when none is open, or when `index` falls on a
   * log boundary (the current log is closed before that). Bytes reach disk
   * only when `_closeLog` flushes the buffer.
   *
   * `next` is called once, with no argument on success or with an Error when
   * the log cannot be opened or the block would not fit. Open failures are
   * forwarded to `next`. Otherwise the rejection would be unhandled and the
   * caller waiting on `next` would hang.
   *
   * @param {object} _this - serializer state (`_fd`, `_writeBuffer`, `_writeOffset`)
   * @param {object} _table - table providing `logSize`
   * @param {Buffer} blockContent - bytes to append (must fit in the current log)
   * @param {number} index - logical byte offset of the first byte of the block
   * @param {function(Error=)} next - completion callback
   */
  const _appendBlock = (_this, _table, blockContent, index, next) => {
    let prep;
    if (_this._fd !== null && index % _table.logSize === 0) {
      prep = _closeLog(_this).then(() => _openLog(_this, 'a', _table, index));
    } else if (_this._fd === null) {
      prep = _openLog(_this, 'a', _table, index);
    }
    function work() {
      if (!_this._fd) {
        return next(new Error('log file is not open'));
      }
      if (blockContent.length > _table.logSize) {
        return next(new Error('block longer than max size'));
      }
      const end = _this._writeOffset + blockContent.length;
      // Validate before copying. Buffer#copy silently truncates at the end of
      // the target, which would drop bytes before the error is reported.
      if (end > _this._writeBuffer.length) {
        return next(new Error(`writeOffset ${end} past writeBuffer length ${_this._writeBuffer.length}`));
      }
      if (end > _table.logSize) {
        return next(new Error(`writeOffset ${end} past logSize ${_table.logSize}`));
      }
      blockContent.copy(_this._writeBuffer, _this._writeOffset);
      _this._writeOffset = end;
      return next();
    }
    if (prep) {
      prep.then(work, next);
    } else {
      work();
    }
  };
  // Total bytes occupied by live entries: the sum of every entry's size.
  const _sizeNeeded = (serializer, { map }) => {
    let total = 0;
    values(map).forEach(entry => {
      total += entry.size;
    });
    return total;
  };
  // Bytes consumed in the log so far, including bytes held by overwritten or deleted keys.
  const _sizeUsed = (serializer, tableState) => tableState.nextByte;
  /**
   * Log size above which a compaction is triggered. It is the larger of the
   * fixed threshold and the live data size scaled by the multiplier.
   */
  const _compactSize = (_this, _table) => {
    const scaled = _sizeNeeded(_this, _table) * _this.compactMultiplierThreshold;
    return Math.max(_this.compactSizeThreshold, scaled);
  };
  /**
   * Run `promiseFn` serialized behind the serializer's lock chain.
   *
   * `promiseFn` receives a promise that settles once the previous locked
   * operation has finished, and must return the promise for its own work.
   * With `mustLock === false` the work starts immediately, outside the chain
   * (used when the caller already holds the lock).
   *
   * The previous operation's outcome is discarded before chaining. A failed
   * operation therefore does not leave `_this.lock` rejected, which would make
   * every later locked read/write reject with the same stale error. The caller
   * of the failed operation still receives its rejection via the return value.
   *
   * @returns {Promise} the promise returned by `promiseFn`
   */
  const _lock = (_this, mustLock, promiseFn) => {
    if (mustLock !== false) {
      const previous = _this.lock.then(
        () => undefined,
        () => undefined
      );
      return (_this.lock = promiseFn(previous));
    }
    return promiseFn(Promise.resolve());
  };
  /**
   * Apply the callback-style task `each(item, next)` to the items of `array`
   * one at a time. An item starts only after the previous one called `next()`.
   *
   * Synchronous completions are handled by the loop below rather than by
   * recursion: a re-entrant call to `next()` only records the completion. The
   * promise resolves after the last item completes. It rejects with the first
   * error passed to `next`, or the first error thrown synchronously by `each`.
   *
   * @param {Array} array
   * @param {function(*, function(Error=)): void} each
   * @returns {Promise<void>}
   */
  const serialFsTask = (array, each) =>
    new Promise((resolve, reject) => {
      let completed = 0; // next() calls seen so far (the initial kick counts as one)
      let cursor = 0; // index of the next item to start
      let draining = false; // true while the start loop below is running
      const advance = err => {
        if (err) {
          return reject(err);
        }
        if (cursor === array.length) {
          return resolve();
        }
        completed += 1;
        if (draining) {
          return undefined;
        }
        draining = true;
        while (cursor < array.length && completed > cursor) {
          const item = array[cursor];
          cursor += 1;
          try {
            each(item, advance);
          } catch (e) {
            return advance(e);
          }
        }
        draining = false;
        return undefined;
      };
      advance();
    });
  /**
   * Key/value cache persisted as an append-only log plus a JSON table.
   *
   * Values are appended to log files of `logSize` bytes (log0000, log0001,
   * ...) under `cacheDirPath`. `table.json` maps each key to the byte range of
   * its latest value. Overwriting or deleting a key leaves dead bytes behind.
   * After a write, the log is compacted (rewritten with only live values) once
   * the bytes used exceed `_compactSize`.
   *
   * Public operations are serialized through `this.lock` unless called with
   * `mustLock === false`.
   */
  class AppendSerializer {
    /**
     * @param {object} options
     * @param {string} options.cacheDirPath - directory holding the table and log files
     * @param {boolean} [options.autoParse] - JSON.parse each value on read
     * @param {number} [options.blockSize]
     * @param {number} [options.logSize] - bytes per log file
     * @param {number} [options.compactSizeThreshold] - minimum used size before compacting
     * @param {number} [options.compactMultiplierThreshold] - compact when used > live * this
     */
    constructor(options) {
      this.path = options.cacheDirPath;
      this.autoParse = options.autoParse;
      this.blockSize = options.blockSize || _blockSize;
      this.logSize = options.logSize || _logSize;
      this.compactSizeThreshold = options.compactSizeThreshold || _minCompactSize;
      this.compactMultiplierThreshold =
        options.compactMultiplierThreshold || _compactMultiplierThreshold;
      this.lock = Promise.resolve();
      this._fd = null;
    }

    /**
     * Read every live key/value pair.
     *
     * @param {boolean} [mustLock] - pass `false` when the caller already holds the lock
     * @returns {Promise<object>} plain object mapping key -> value (the stored
     *   string, or the JSON-parsed value when `autoParse` is set)
     */
    read(mustLock) {
      const _this = this;
      function _read() {
        return Promise.resolve()
          .then(_retry(() => _readTable(_this)))
          .then(activeTable => {
            const map = new Map();
            const valueStarts = [];
            values(activeTable.map).forEach(value => {
              if (value.size === 0) {
                // A zero-length value occupies no log bytes, so the scan below
                // never reaches it. Worse, if its empty range sorts ahead of a
                // real value with the same start, readAmount goes negative and
                // corrupts later copies. Record it directly instead. With
                // autoParse, '' cannot be JSON-parsed, so it is left out.
                if (!_this.autoParse) {
                  map.set(value.key, '');
                }
                return;
              }
              valueStarts.push({
                start: value.start,
                end: value.start + value.size,
                value,
              });
            });
            valueStarts.sort((a, b) => a.start - b.start);
            return new Promise((resolve, reject) => {
              let valueIndex = 0;
              let destBuffer = Buffer.alloc(2 * 1024 * 1024);
              let offset = 0; // bytes of the current value copied so far
              let logOffset = 0; // logical offset of the current chunk's first byte
              const log = _readLog(_this, activeTable);
              log.on('data', data => {
                if (valueIndex >= valueStarts.length) {
                  return;
                }
                for (let bufferIndex = 0; bufferIndex < data.length; ) {
                  if (bufferIndex + logOffset >= valueStarts[valueIndex].end) {
                    valueIndex++;
                  }
                  if (valueIndex >= valueStarts.length) {
                    return;
                  }
                  const value = valueStarts[valueIndex].value;
                  const position = bufferIndex + logOffset;
                  if (position < value.start) {
                    // Skip dead bytes up to the next live value.
                    bufferIndex += value.start - position;
                    continue;
                  }
                  if (value.size > destBuffer.length) {
                    destBuffer = Buffer.alloc(
                      Math.pow(2, Math.ceil(Math.log(value.size) / Math.log(2))),
                    );
                  }
                  // A value may span log files; copy at most to this chunk's end.
                  const readAmount = Math.min(
                    value.start + value.size - position,
                    activeTable.logSize - bufferIndex,
                  );
                  data.copy(destBuffer, offset, bufferIndex, bufferIndex + readAmount);
                  bufferIndex += readAmount;
                  offset += readAmount;
                  if (offset >= value.size) {
                    offset = 0;
                    const text = destBuffer.toString('utf8', 0, value.size);
                    map.set(value.key, _this.autoParse ? JSON.parse(text) : text);
                  }
                }
                logOffset += activeTable.logSize;
              });
              log.on('end', resolve);
              log.on('error', reject);
            }).then(() => objFrom(map));
          });
      }
      return _lock(_this, mustLock, promise =>
        promise
          .then(() => _read())
          .catch(e =>
            _closeLog(_this).then(() => {
              throw e;
            }),
          ),
      );
    }

    /**
     * Apply `ops` in order. `{ key, value }` appends the value: strings are
     * stored as-is and anything else is JSON.stringified. `{ key, value: null }`
     * deletes the key. The table is written afterwards, and the log is
     * compacted if its used size exceeds `_compactSize`.
     *
     * @param {Array<{key: string, value: *}>} ops
     * @param {boolean} [mustLock] - pass `false` when the caller already holds the lock
     * @returns {Promise<void>}
     */
    write(ops, mustLock) {
      if (ops.length === 0) {
        return Promise.resolve();
      }
      const _this = this;
      let activeTable;
      // Scratch buffer reused across ops whenever the next encoded value fits.
      let contentBuffer;
      let contentLength;

      // Append or delete one op, reporting completion through `next(err?)`.
      function step(op, next) {
        let content = op.value;
        if (content === null) {
          activeTable = delKey(activeTable, op.key);
          return next();
        }
        if (typeof content !== 'string') {
          content = JSON.stringify(content);
        }
        if (contentBuffer && Buffer.byteLength(content) <= contentBuffer.length) {
          contentLength = contentBuffer.write(content);
        } else {
          contentBuffer = Buffer.from(content);
          contentLength = contentBuffer.length;
        }
        // Number of log files this value touches, starting at the current offset.
        const blockCount = Math.ceil(
          ((activeTable.nextByte % activeTable.logSize) + contentLength) / activeTable.logSize,
        );
        let nextByte = activeTable.nextByte;
        activeTable = putKey(activeTable, op.key, contentLength);
        let bufferIndex = 0;
        const blocks = Array.from({ length: blockCount }, (_, i) => i);
        return serialFsTask(blocks, (_, blockDone) => {
          // Take as much of the value as fits in the remainder of the current log.
          const blockSlice = contentBuffer.slice(
            bufferIndex,
            Math.min(
              bufferIndex + (activeTable.logSize - (nextByte % activeTable.logSize)),
              contentLength,
            ),
          );
          _appendBlock(_this, activeTable, blockSlice, nextByte, blockDone);
          bufferIndex += blockSlice.length;
          nextByte += blockSlice.length;
        }).then(
          () => next(),
          // Propagate append failures. Otherwise the outer serialFsTask
          // would wait forever for this op to finish.
          next,
        );
      }

      function _write() {
        return Promise.resolve()
          .then(_retry(() => mkdirp(_this.path)))
          .then(_retry(() => _readTable(_this)))
          .then(_table => {
            activeTable = modTable(_table);
            return serialFsTask(ops.slice(), step);
          })
          .then(() => _closeLog(_this))
          .then(
            _retry(() => {
              activeTable = table(activeTable);
              return _writeTable(_this, activeTable);
            }),
          );
      }

      return _lock(_this, mustLock, promise =>
        promise
          .then(() => _write())
          .catch(e =>
            _closeLog(_this).then(() => {
              throw e;
            }),
          )
          .then(() => {
            if (_sizeUsed(_this, activeTable) > _compactSize(_this, activeTable)) {
              return _this.compact(false);
            }
            return undefined;
          }),
      );
    }

    /**
     * Rewrite the cache with only its live values. Reads everything, writes it
     * into a sibling `<path>~` directory, removes the original directory and
     * renames the copy into its place.
     *
     * NOTE(review): the read and the rewrite acquire the lock separately. When
     * compact() is called with locking enabled, a write that lands between
     * them would be discarded by the directory swap. Confirm whether any caller
     * relies on that path (write() calls compact(false) while already locked).
     *
     * @param {boolean} [mustLock] - pass `false` when the caller already holds the lock
     * @returns {Promise}
     */
    compact(mustLock) {
      const _this = this;
      const copyPath = `${_this.path}~`;
      return _this
        .read(mustLock)
        .then(map => Object.keys(map).map(key => ({ key, value: map[key] })))
        .then(ops =>
          rimraf(copyPath)
            .then(timeout100)
            .then(() => ops),
        )
        .then(ops => {
          const copy = new AppendSerializer({
            cacheDirPath: copyPath,
            blockSize: _this.blockSize,
            logSize: _this.logSize,
            compactSizeThreshold: _this.compactSizeThreshold,
            compactMultiplierThreshold: _this.compactMultiplierThreshold,
          });
          return _lock(_this, mustLock, promise =>
            promise
              .then(() => copy.write(ops))
              .then(() => rimraf(_this.path))
              .then(timeout100)
              .then(_retry(() => rename(copy.path, _this.path), 10)),
          );
        });
    }
  }

  module.exports = AppendSerializer;