const fs = require('graceful-fs'); const { join, resolve } = require('path'); const _mkdirp = require('mkdirp'); const parseJson = require('parse-json'); const _rimraf = require('rimraf'); const promisify = require('./util/promisify'); const close = promisify(fs.close); const mkdirp = promisify(_mkdirp); const open = promisify(fs.open); const read = promisify(fs.readFile); const readdir = promisify(fs.readdir); const readfd = promisify(fs.read); const rename = promisify(fs.rename); const rimraf = promisify(_rimraf); const write = promisify(fs.writeFile); const nextPow2 = n => { const exponent = Math.log(n) / Math.log(2); const nextExponent = Math.floor(exponent) + 1; return Math.pow(2, nextExponent); }; const resizePow2 = (buffer, n) => { const tmpBuffer = Buffer.allocUnsafe(nextPow2(n)); buffer.copy(tmpBuffer.slice(0, buffer.length)); return tmpBuffer; }; const MAX_CHUNK = 2 * 1024 * 1024; const TMP_CHUNK = 0.5 * 1024 * 1024; const MAX_CHUNK_PLUS = 2.5 * 1024 * 1024; const LARGE_CONTENT = 64 * 1024; let tmpBuffer = Buffer.allocUnsafe(TMP_CHUNK); let outBuffer = Buffer.allocUnsafe(MAX_CHUNK_PLUS); const _buffers = []; const alloc = size => { const buffer = _buffers.pop(); if (buffer && buffer.length >= size) { return buffer; } return Buffer.allocUnsafe(size); }; const drop = buffer => _buffers.push(buffer); class WriteOutput { constructor(length = 0, table = [], buffer = alloc(MAX_CHUNK_PLUS)) { this.length = length; this.table = table; this.buffer = buffer; } static clone(other) { return new WriteOutput(other.length, other.table, other.buffer); } take() { const output = WriteOutput.clone(this); this.length = 0; this.table = []; this.buffer = alloc(MAX_CHUNK_PLUS); return output; } add(key, content) { if (content !== null) { // Write content to a temporary buffer let length = tmpBuffer.utf8Write(content); while (length === tmpBuffer.length) { tmpBuffer = Buffer.allocUnsafe(tmpBuffer.length * 2); length = tmpBuffer.utf8Write(content); } const start = this.length; const end = start + length; // Ensure output buffer is long enough to add the new content if (end > this.buffer.length) { this.buffer = resizePow2(this.buffer, end); } // Copy temporary buffer to the end of the current output buffer tmpBuffer.copy(this.buffer.slice(start, end)); this.table.push({ name: key, start, end, }); this.length = end; } else { this.table.push({ name: key, start: -1, end: -1, }); } } } class Semaphore { constructor(max) { this.max = max; this.count = 0; this.next = []; } async guard() { if (this.count < this.max) { this.count++; return new SemaphoreGuard(this); } else { return new Promise(resolve => { this.next.push(resolve); }).then(() => new SemaphoreGuard(this)); } } } class SemaphoreGuard { constructor(parent) { this.parent = parent; } done() { const next = this.parent.next.shift(); if (next) { next(); } else { this.parent.count--; } } } class Append2 { constructor({ cacheDirPath: path, autoParse }) { this.path = path; this.autoParse = autoParse; this.inBuffer = Buffer.alloc(0); this._buffers = []; this.outBuffer = Buffer.alloc(0); } async _readFile(file) { const fd = await open(file, 'r+'); let body = alloc(MAX_CHUNK_PLUS); await readfd(fd, body, 0, 4, null); const fullLength = body.readUInt32LE(0); if (fullLength > body.length) { drop(body); body = alloc(nextPow2(fullLength)); } await readfd(fd, body, 0, fullLength, null); close(fd); const tableLength = body.readUInt32LE(0); const tableBody = body.utf8Slice(4, 4 + tableLength); const table = parseJson(tableBody); const content = body.slice(4 + tableLength); return [table, content, body]; } async read() { const out = {}; const size = { used: 0, total: 0 }; const table = {}; const order = {}; await mkdirp(this.path); const items = await readdir(this.path); const logs = items.filter(item => /^log\d+$/.test(item)); logs.sort(); const reverseLogs = logs.reverse(); const sema = new Semaphore(8); return Promise.all( reverseLogs.map(async (_file, index) => { const file = join(this.path, _file); const guard = await sema.guard(); const [table, content, body] = await this._readFile(file); const keys = Object.keys(table); if (keys.length > 0) { size.total += table[keys.length - 1].end; } for (const entry of table) { if ( typeof order[entry.name] === 'undefined' || order[entry.name] > index ) { if (typeof order[entry.name] !== 'undefined') { size.used -= table[entry.name]; } table[entry.name] = entry.end - entry.start; size.used += entry.end - entry.start; order[entry.name] = index; // Negative start positions are not set on the output. They are // treated as if they were deleted in a prior write. A future // compact will remove all instances of any old entries. if (entry.start >= 0) { await new Promise(process.nextTick); const data = content.utf8Slice(entry.start, entry.end); if (this.autoParse) { out[entry.name] = parseJson(data); } else { out[entry.name] = data; } } else { delete out[entry.name]; } } } drop(body); guard.done(); }), ) .then(async () => { if (size.used / size.total < 0.6) { await this.compact(out); } }) .then(() => out); } async _markLog() { const count = (await readdir(this.path)).filter(item => /log\d+$/.test(item), ).length; const marker = Math.random() .toString(16) .substring(2) .padStart(13, '0'); const logName = `log${count.toString().padStart(4, '0')}`; const file = resolve(this.path, logName); await write(file, marker); const writtenMarker = await read(file, 'utf8'); if (marker === writtenMarker) { return file; } return null; } async _write(file, output) { // 4 bytes - full length // 4 bytes - length of table // x bytes - table // y bytes - content // Write table into a temporary buffer at position 8 const content = JSON.stringify(output.table); let length = tmpBuffer.utf8Write(content, 8); // Make the temporary buffer longer if the space used is the same as the // length while (8 + length === tmpBuffer.length) { tmpBuffer = Buffer.allocUnsafe(nextPow2(8 + length)); // Write again to see if the length is more due to the last buffer being // too short. length = tmpBuffer.utf8Write(content, 8); } // Ensure the buffer is long enough to fit the table and content. const end = 8 + length + output.length; if (end > tmpBuffer.length) { tmpBuffer = resizePow2(tmpBuffer, end); } // Copy the output after the table. output.buffer.copy(tmpBuffer.slice(8 + length, end)); // Full length after this uint. tmpBuffer.writeUInt32LE(end - 4, 0); // Length of table after this uint. tmpBuffer.writeUInt32LE(length, 4); if (end > output.buffer.length) { output.buffer = alloc(nextPow2(end)); } tmpBuffer.copy(output.buffer.slice(0, end)); await write(file, output.buffer.slice(0, end)); drop(output.buffer); } async _markAndWrite(output) { const file = await this._markLog(); if (file !== null) { await this._write(file, output.take()); } } // Write out a log chunk once the file reaches the maximum chunk size. async _writeAtMax(output) { while (output.length >= MAX_CHUNK) { await this._markAndWrite(output); } } // Write out a log chunk if their is any entries in the table. async _writeAtAny(output) { while (output.table.length > 0) { await this._markAndWrite(output); } } async write(ops) { let smallOutput = new WriteOutput(); let largeOutput = new WriteOutput(); const outputPromises = []; await mkdirp(this.path); for (const op of ops) { if (op.value !== null) { let content = op.value; if (typeof content !== 'string') { content = JSON.stringify(content); } if (content.length < LARGE_CONTENT) { smallOutput.add(op.key, content); await this._writeAtMax(smallOutput); } else { largeOutput.add(op.key, content); await this._writeAtMax(largeOutput); } } else { smallOutput.add(op.key, null); await this._writeAtMax(smallOutput); } } await this._writeAtAny(smallOutput); await this._writeAtAny(largeOutput); await Promise.all(outputPromises); } async sizes() { const size = { used: 0, total: 0, }; const table = {}; const order = {}; await mkdirp(this.path); const items = await readdir(this.path); const logs = items.filter(item => /^log\d+$/.test(item)); logs.sort(); const reverseLogs = logs.reverse(); const sema = new Semaphore(8); return Promise.all( reverseLogs.map(async (_file, index) => { const file = join(this.path, _file); const guard = await sema.guard(); const [table, content, body] = await this._readFile(file); size.total += content.length; for (const entry of table) { if ( typeof order[entry.name] === 'undefined' || order[entry.name] > index ) { if (typeof order[entry.name] !== 'undefined') { size.used -= table[entry.name]; } table[entry.name] = entry.end - entry.start; size.used += entry.end - entry.start; order[entry.name] = index; } } drop(body); guard.done(); }), ).then(() => size); } async compact(_obj = this.read()) { const obj = await _obj; const ops = []; for (const key in obj) { ops.push({ key, value: obj[key], }); } const truePath = this.path; this.path += '~'; await this.write(ops); this.path = truePath; await rimraf(this.path); await rename(`${this.path}~`, this.path); } } module.exports = Append2;