SerializerAppend2.js 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425
  1. const fs = require('graceful-fs');
  2. const { join, resolve } = require('path');
  3. const _mkdirp = require('mkdirp');
  4. const parseJson = require('parse-json');
  5. const _rimraf = require('rimraf');
  6. const promisify = require('./util/promisify');
  7. const close = promisify(fs.close);
  8. const mkdirp = promisify(_mkdirp);
  9. const open = promisify(fs.open);
  10. const read = promisify(fs.readFile);
  11. const readdir = promisify(fs.readdir);
  12. const readfd = promisify(fs.read);
  13. const rename = promisify(fs.rename);
  14. const rimraf = promisify(_rimraf);
  15. const write = promisify(fs.writeFile);
  /**
   * Power of two used to size growing buffers: 2 ** (floor(log2(n)) + 1).
   * For n that is not a power of two this is the next power above n. For an
   * exact power of two it is normally 2 * n, though floating-point rounding
   * of the log ratio can make it n itself. Either way the result is >= n
   * for n >= 1.
   * @param {number} n - Required size.
   * @returns {number}
   */
  const nextPow2 = n => Math.pow(2, 1 + Math.floor(Math.log(n) / Math.log(2)));
  /**
   * Return a new, larger buffer (sized by nextPow2(n)) whose leading bytes
   * are a copy of `buffer`. The original buffer is left untouched.
   * @param {Buffer} buffer - Buffer whose contents must be preserved.
   * @param {number} n - Minimum required size.
   * @returns {Buffer}
   */
  const resizePow2 = (buffer, n) => {
    const grown = Buffer.allocUnsafe(nextPow2(n));
    buffer.copy(grown, 0, 0, buffer.length);
    return grown;
  };
  // A pending chunk is flushed to its own log file once its content reaches
  // this many bytes.
  const MAX_CHUNK = 2 * 1024 * 1024;
  // Initial size of the shared scratch buffer below.
  const TMP_CHUNK = 0.5 * 1024 * 1024;
  // Default allocation for a chunk buffer: MAX_CHUNK plus headroom, presumably
  // so a chunk that crosses MAX_CHUNK usually fits without a resize.
  const MAX_CHUNK_PLUS = 2.5 * 1024 * 1024;
  // Values whose serialized string length is at least this are written to
  // separate "large" chunks.
  const LARGE_CONTENT = 64 * 1024;
  // Shared scratch buffer; `let` because callers replace it with a larger
  // buffer when needed.
  // (A former module-level `outBuffer` was never read and only cost a 2.5 MB
  // allocation at load time, so it has been removed.)
  let tmpBuffer = Buffer.allocUnsafe(TMP_CHUNK);
  // Pool of buffers returned via drop(), reused (LIFO) by alloc().
  const _buffers = [];

  /**
   * Get a buffer of at least `size` bytes. The most recently dropped buffer is
   * reused when it is large enough. Otherwise it is discarded and a new
   * uninitialized buffer of exactly `size` bytes is allocated.
   * @param {number} size
   * @returns {Buffer}
   */
  const alloc = size => {
    const pooled = _buffers.pop();
    const reusable = pooled && pooled.length >= size;
    return reusable ? pooled : Buffer.allocUnsafe(size);
  };

  /**
   * Return a buffer to the pool. The caller must not use it afterwards.
   * @param {Buffer} buffer
   * @returns {number} The new pool size.
   */
  const drop = buffer => {
    return _buffers.push(buffer);
  };
  /**
   * Accumulates the table and UTF-8 content for one log chunk.
   *
   * `table` holds `{ name, start, end }` records whose `start`/`end` are byte
   * offsets into `buffer`; `start === end === -1` records a deletion.
   * `length` is the number of content bytes used at the front of `buffer`.
   */
  class WriteOutput {
    constructor(length = 0, table = [], buffer = alloc(MAX_CHUNK_PLUS)) {
      this.length = length;
      this.table = table;
      this.buffer = buffer;
    }

    /** Shallow copy: the clone shares `table` and `buffer` with `other`. */
    static clone(other) {
      return new WriteOutput(other.length, other.table, other.buffer);
    }

    /**
     * Hand the accumulated chunk to the caller. This instance is reset to an
     * empty table and a fresh buffer from the pool.
     * @returns {WriteOutput} The accumulated chunk.
     */
    take() {
      const output = WriteOutput.clone(this);
      this.length = 0;
      this.table = [];
      this.buffer = alloc(MAX_CHUNK_PLUS);
      return output;
    }

    /**
     * Append an entry.
     * @param {string} key - Entry name recorded in the table.
     * @param {string|null} content - Serialized value, or null to record a
     *   deletion (no content bytes are stored).
     */
    add(key, content) {
      if (content === null) {
        this.table.push({ name: key, start: -1, end: -1 });
        return;
      }
      const start = this.length;
      // Measure the exact encoded size first. Encoding into a fixed-size
      // scratch buffer and comparing the written count to its size cannot
      // detect truncation when a multi-byte character does not fit at the end.
      const end = start + Buffer.byteLength(content, 'utf8');
      if (end > this.buffer.length) {
        this.buffer = resizePow2(this.buffer, end);
      }
      this.buffer.write(content, start, 'utf8');
      this.table.push({ name: key, start, end });
      this.length = end;
    }
  }
  /**
   * Counting semaphore limiting how many async tasks hold a slot at once.
   * Waiting tasks are served in FIFO order.
   */
  class Semaphore {
    /** @param {number} max - Maximum number of slots held at the same time. */
    constructor(max) {
      this.max = max;
      this.count = 0;
      // Resolve callbacks of tasks waiting for a slot.
      this.next = [];
    }

    /**
     * Acquire a slot.
     * @returns {Promise<SemaphoreGuard>} Resolves once a slot is held; call
     *   `done()` on the guard to release it.
     */
    async guard() {
      const hasFreeSlot = this.count < this.max;
      if (hasFreeSlot) {
        this.count++;
      } else {
        // A releasing guard hands its slot straight to us without
        // decrementing count, so count is not incremented here.
        await new Promise(resolve => {
          this.next.push(resolve);
        });
      }
      return new SemaphoreGuard(this);
    }
  }
  /**
   * Handle for one acquired Semaphore slot.
   * `done()` passes the slot to the oldest waiter, or frees it when nobody is
   * waiting.
   */
  class SemaphoreGuard {
    /** @param {Semaphore} parent - Semaphore the slot belongs to. */
    constructor(parent) {
      this.parent = parent;
    }

    /** Release the slot held by this guard. */
    done() {
      const { parent } = this;
      const waiter = parent.next.shift();
      if (!waiter) {
        parent.count--;
        return;
      }
      // The slot moves directly to the waiter, so count is unchanged.
      waiter();
    }
  }
  /**
   * Append-only key/value store kept in a directory of `logNNNN` chunk files.
   *
   * Chunk file layout (integers are little-endian uint32):
   *   4 bytes - byte length of everything after this field
   *   4 bytes - byte length of the JSON table
   *   x bytes - JSON table: [{ name, start, end }, ...]; start/end are byte
   *             offsets into the content section, -1/-1 marks a deletion
   *   y bytes - concatenated UTF-8 content
   *
   * Files with higher log numbers are newer and take precedence.
   */
  class Append2 {
    /**
     * @param {object} options
     * @param {string} options.cacheDirPath - Directory holding the log files.
     * @param {boolean} options.autoParse - When true, read() returns values
     *   parsed as JSON; otherwise it returns the stored strings.
     */
    constructor({ cacheDirPath: path, autoParse }) {
      this.path = path;
      this.autoParse = autoParse;
      // NOTE(review): the three fields below are not read anywhere in this
      // file; kept in case external code inspects them.
      this.inBuffer = Buffer.alloc(0);
      this._buffers = [];
      this.outBuffer = Buffer.alloc(0);
    }

    /**
     * List the log files in `this.path` (creating the directory if needed),
     * newest first. Sorting is numeric so `log10000` follows `log9999`.
     * @returns {Promise<string[]>} File paths joined onto `this.path`.
     */
    async _logFiles() {
      await mkdirp(this.path);
      const items = await readdir(this.path);
      const logNumber = name => Number(name.slice(3));
      return items
        .filter(item => /^log\d+$/.test(item))
        .sort((a, b) => logNumber(b) - logNumber(a))
        .map(item => join(this.path, item));
    }

    /**
     * Read and decode one chunk file.
     * @param {string} file
     * @returns {Promise<[object[], Buffer, Buffer]>} [table entries, content
     *   bytes, backing pooled buffer]. `content` is a view into the backing
     *   buffer; the caller should drop() the backing buffer when finished.
     */
    async _readFile(file) {
      // Read-only access is sufficient ('r+' would also require write access).
      const fd = await open(file, 'r');
      let body = alloc(MAX_CHUNK_PLUS);
      let fullLength;
      try {
        await readfd(fd, body, 0, 4, null);
        fullLength = body.readUInt32LE(0);
        if (fullLength > body.length) {
          drop(body);
          body = alloc(nextPow2(fullLength));
        }
        // Position null continues after the first uint32, so body receives
        // [table length][table][content].
        await readfd(fd, body, 0, fullLength, null);
      } finally {
        await close(fd);
      }
      const tableLength = body.readUInt32LE(0);
      const table = parseJson(body.toString('utf8', 4, 4 + tableLength));
      // Bound the content by fullLength: a pooled buffer is usually larger than
      // the file and any bytes past fullLength are stale.
      const content = body.slice(4 + tableLength, fullLength);
      return [table, content, body];
    }

    /**
     * Scan all log files (at most 8 in flight) and track, for every key, the
     * newest table entry.
     * @param {(entry: object, content: Buffer) => (void|Promise<void>)} [visit]
     *   Invoked whenever an entry becomes the newest known entry for its key.
     *   A later call for the same key supersedes the earlier ones.
     * @returns {Promise<{used: number, total: number}>} `total` is the sum of
     *   all content-section sizes. `used` is the byte size of the newest entry
     *   of every key (0 for deletions).
     */
    async _scan(visit) {
      const size = { used: 0, total: 0 };
      // Maps, not plain objects, so keys like "toString" or "constructor"
      // do not collide with Object.prototype members.
      const entrySizes = new Map();
      const order = new Map();
      const files = await this._logFiles();
      const sema = new Semaphore(8);
      await Promise.all(
        files.map(async (file, index) => {
          const guard = await sema.guard();
          try {
            const [table, content, body] = await this._readFile(file);
            size.total += content.length;
            for (const entry of table) {
              const newest = order.get(entry.name);
              // A lower index is a newer file. Within one file a later entry
              // for the same key is newer, hence equality also replaces.
              if (newest !== undefined && newest < index) {
                continue;
              }
              if (newest !== undefined) {
                size.used -= entrySizes.get(entry.name);
              }
              const entrySize = entry.end - entry.start;
              entrySizes.set(entry.name, entrySize);
              size.used += entrySize;
              order.set(entry.name, index);
              if (visit) {
                await visit(entry, content);
              }
            }
            drop(body);
          } finally {
            // Release the slot even when reading or decoding fails, so queued
            // scans are not left waiting forever.
            guard.done();
          }
        }),
      );
      return size;
    }

    /**
     * Load every live key from the logs. Runs compact() when less than 60% of
     * the stored content bytes belong to live entries.
     * @returns {Promise<object>} key -> value (parsed JSON when autoParse, the
     *   stored string otherwise). Keys whose newest entry is a deletion are
     *   absent.
     */
    async read() {
      const out = {};
      const size = await this._scan(async (entry, content) => {
        // Negative start positions are not set on the output. They are
        // treated as if they were deleted in a prior write. A future compact
        // will remove all instances of any old entries.
        if (entry.start < 0) {
          delete out[entry.name];
          return;
        }
        const data = content.toString('utf8', entry.start, entry.end);
        out[entry.name] = this.autoParse ? parseJson(data) : data;
        // Yield to other queued ticks (e.g. the other concurrent file scans)
        // after each decoded entry. The assignment above happens first, so the
        // newest-entry check and the write to `out` cannot be interleaved.
        await new Promise(process.nextTick);
      });
      if (size.used / size.total < 0.6) {
        await this.compact(out);
      }
      return out;
    }

    /**
     * Claim the next log file name: write a random marker to it and read it
     * back.
     * @returns {Promise<string|null>} The file path if the marker read back
     *   matches, else null (presumably another writer claimed the same name).
     */
    async _markLog() {
      const items = await readdir(this.path);
      // Anchored like the pattern used when listing logs for reading.
      const count = items.filter(item => /^log\d+$/.test(item)).length;
      const marker = Math.random()
        .toString(16)
        .substring(2)
        .padStart(13, '0');
      const logName = `log${count.toString().padStart(4, '0')}`;
      const file = resolve(this.path, logName);
      await write(file, marker);
      const writtenMarker = await read(file, 'utf8');
      if (marker === writtenMarker) {
        return file;
      }
      return null;
    }

    /**
     * Serialize a taken WriteOutput into `file` and return its buffer to the
     * pool.
     * Layout: 4 bytes full length | 4 bytes table length | table | content.
     * @param {string} file
     * @param {WriteOutput} output
     */
    async _write(file, output) {
      const table = JSON.stringify(output.table);
      // Exact encoded size up front, so the table can never be silently
      // truncated at a multi-byte character.
      const tableLength = Buffer.byteLength(table, 'utf8');
      const contentStart = 8 + tableLength;
      const end = contentStart + output.length;
      if (end > tmpBuffer.length) {
        tmpBuffer = Buffer.allocUnsafe(nextPow2(end));
      }
      // Full length after this uint.
      tmpBuffer.writeUInt32LE(end - 4, 0);
      // Length of table after this uint.
      tmpBuffer.writeUInt32LE(tableLength, 4);
      tmpBuffer.write(table, 8, 'utf8');
      output.buffer.copy(tmpBuffer, contentStart, 0, output.length);
      // tmpBuffer is shared module state, so copy the assembled bytes into the
      // output's own buffer before the asynchronous file write.
      if (end > output.buffer.length) {
        const previous = output.buffer;
        output.buffer = alloc(nextPow2(end));
        drop(previous);
      }
      tmpBuffer.copy(output.buffer, 0, 0, end);
      await write(file, output.buffer.slice(0, end));
      drop(output.buffer);
    }

    /**
     * Write the pending chunk to a newly claimed log file. If the claim fails
     * the output is left untouched so the caller can retry.
     */
    async _markAndWrite(output) {
      const file = await this._markLog();
      if (file !== null) {
        await this._write(file, output.take());
      }
    }

    // Write out a log chunk once the file reaches the maximum chunk size.
    async _writeAtMax(output) {
      while (output.length >= MAX_CHUNK) {
        await this._markAndWrite(output);
      }
    }

    // Write out a log chunk if there are any entries in the table.
    async _writeAtAny(output) {
      while (output.table.length > 0) {
        await this._markAndWrite(output);
      }
    }

    /**
     * Append operations to the log.
     * @param {{key: string, value: *}[]} ops - Non-string values are
     *   JSON-stringified; strings are stored as-is; `null` records a deletion.
     *   Values whose serialized string length is at least LARGE_CONTENT go to
     *   separate chunks from smaller ones.
     */
    async write(ops) {
      const smallOutput = new WriteOutput();
      const largeOutput = new WriteOutput();
      await mkdirp(this.path);
      for (const op of ops) {
        if (op.value === null) {
          smallOutput.add(op.key, null);
          await this._writeAtMax(smallOutput);
          continue;
        }
        const content =
          typeof op.value === 'string' ? op.value : JSON.stringify(op.value);
        // NOTE(review): small chunks are flushed before large ones below, so
        // when one call writes the same key as both large and small, the
        // large value ends up in the newer file regardless of op order.
        const output = content.length < LARGE_CONTENT ? smallOutput : largeOutput;
        output.add(op.key, content);
        await this._writeAtMax(output);
      }
      await this._writeAtAny(smallOutput);
      await this._writeAtAny(largeOutput);
      // Both outputs now hold only fresh, unused buffers; return them to the
      // pool.
      drop(smallOutput.buffer);
      drop(largeOutput.buffer);
    }

    /**
     * Measure log usage without decoding values.
     * @returns {Promise<{used: number, total: number}>}
     */
    async sizes() {
      return this._scan();
    }

    /**
     * Rewrite the store so it contains only the given entries: they are
     * written to `<path>~`, then that directory replaces `<path>`.
     * @param {object|Promise<object>} [_obj] - key -> value map shaped like
     *   the result of read() (the default).
     */
    async compact(_obj = this.read()) {
      const obj = await _obj;
      const ops = [];
      for (const key in obj) {
        // With autoParse the values are parsed JSON (as returned by read()) and
        // must be re-serialized. Otherwise string values would be stored
        // without quotes, and JSON `null` would be taken for a deletion by
        // write().
        const value = this.autoParse ? JSON.stringify(obj[key]) : obj[key];
        ops.push({ key, value });
      }
      const truePath = this.path;
      const tempPath = `${truePath}~`;
      // Remove leftovers of an interrupted compaction so stale logs are not
      // merged into the new directory.
      await rimraf(tempPath);
      this.path = tempPath;
      try {
        await this.write(ops);
      } finally {
        // Always restore the real path, even if the write fails.
        this.path = truePath;
      }
      await rimraf(truePath);
      await rename(tempPath, truePath);
    }
  }
  module.exports = Append2;