123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602 |
- const fs = require('graceful-fs');
- const join = require('path').join;
- const Readable = require('stream').Readable;
- const _mkdirp = require('mkdirp');
- const _rimraf = require('rimraf');
- const writeJsonFile = require('write-json-file');
- const entries = require('./util/Object.entries');
- const values = require('./util/Object.values');
- const promisify = require('./util/promisify');
- const rimraf = promisify(_rimraf);
- const open = promisify(fs.open);
- const close = promisify(fs.close);
- const read = promisify(fs.read);
- const readFile = promisify(fs.readFile);
- const write = promisify(fs.write);
- const rename = promisify(fs.rename);
- const unlink = promisify(fs.unlink);
- const stat = promisify(fs.stat);
- const mkdirp = promisify(_mkdirp);
// Format version stamped into every table; a table read back with any other
// version is replaced by a fresh default table.
const APPEND_VERSION = 1;
// Default block size in bytes (4 KiB).
const _blockSize = 1024 * 4;
// Default capacity of one log file in bytes (2 MiB).
const _logSize = 1024 * 1024 * 2;
// Default lower bound, in bytes (512 KiB), for the compaction threshold.
const _minCompactSize = 1024 * 512;
// Default factor applied to the live data size when computing the compaction
// threshold.
const _compactMultiplierThreshold = 1.5;
/**
 * Builds the table entry describing where a key's content lives in the log.
 *
 * @param {string} key - Entry key.
 * @param {number} [size] - Content length in bytes; falsy values become 0.
 * @param {number} [start] - Byte offset of the content; falsy values become 0.
 * @returns {{key: string, size: number, start: number}} The table entry.
 */
const value = (key, size, start) => {
  const entry = { key };
  entry.size = size ? size : 0;
  entry.start = start ? start : 0;
  return entry;
};
/**
 * Converts a Map into a plain object with the same keys and values.
 * Anything that is not a Map is handed back untouched.
 *
 * @param {Map|*} map - Map to convert, or any other value.
 * @returns {Object|*} A new plain object, or the input itself.
 */
const objFrom = map => {
  if (!(map instanceof Map)) {
    return map;
  }
  const plain = {};
  for (const [name, item] of map) {
    plain[name] = item;
  }
  return plain;
};
/**
 * Produces the serializable form of a table: the current format version plus
 * the bookkeeping fields, with `map` converted to a plain object.
 *
 * @param {{nextByte: number, blockSize: number, logSize: number, map: (Map|Object)}} source
 * @returns {{version: number, nextByte: number, blockSize: number, logSize: number, map: Object}}
 */
const table = source => {
  const { nextByte, blockSize, logSize, map } = source;
  return {
    version: APPEND_VERSION,
    nextByte,
    blockSize,
    logSize,
    map: objFrom(map),
  };
};
/**
 * Produces the mutable, in-memory form of a table: same bookkeeping fields,
 * but with `map` rebuilt as a Map from the plain object's entries.
 *
 * @param {{nextByte: number, blockSize: number, logSize: number, map: Object}} source
 * @returns {{version: number, nextByte: number, blockSize: number, logSize: number, map: Map}}
 */
const modTable = source => {
  const { nextByte, blockSize, logSize, map } = source;
  const keyed = new Map(entries(map));
  return {
    version: APPEND_VERSION,
    nextByte,
    blockSize,
    logSize,
    map: keyed,
  };
};
/**
 * Records `key` as occupying the next `size` bytes of the log and advances
 * the table's write position past them. Mutates and returns `_table`.
 *
 * @param {{map: Map, nextByte: number}} _table - Mutable table.
 * @param {string} key - Key being written.
 * @param {number} size - Content length in bytes.
 * @returns {Object} The same `_table`.
 */
function putKey(_table, key, size) {
  const start = _table.nextByte;
  _table.map.set(key, value(key, size, start));
  _table.nextByte = start + size;
  return _table;
}
/**
 * Removes `key` from the table's map when it currently holds a truthy entry.
 * Mutates and returns `_table`; the log bytes themselves are not touched.
 *
 * @param {{map: Map}} _table - Mutable table.
 * @param {string} key - Key to forget.
 * @returns {Object} The same `_table`.
 */
function delKey(_table, key) {
  const { map } = _table;
  const existing = map.get(key);
  if (existing) {
    map.delete(key);
  }
  return _table;
}
/**
 * Resolves the location of the table file inside a serializer's directory.
 *
 * @param {{path: string}} _this - Object exposing the cache directory.
 * @returns {string} Path of `table.json` under that directory.
 */
const _tablepath = _this => {
  const directory = _this.path;
  return join(directory, 'table.json');
};
/**
 * Creates an empty table starting at byte 0, taking block and log sizes from
 * the given options and falling back to the module defaults when falsy.
 *
 * @param {{blockSize: number, logSize: number}} options
 * @returns {Object} A serializable empty table.
 */
const _defaultTable = options => {
  const blockSize = options.blockSize || _blockSize;
  const logSize = options.logSize || _logSize;
  return table({
    nextByte: 0,
    blockSize,
    logSize,
    map: {},
  });
};
/**
 * Returns a promise that resolves (with no value) after 100 milliseconds.
 *
 * @returns {Promise<void>}
 */
const timeout100 = () =>
  new Promise(done => {
    setTimeout(done, 100);
  });
/**
 * Wraps a promise-returning function so a rejection is retried.
 *
 * The returned function calls `fn(value)`; whenever that rejects it calls
 * `fn(value)` again, up to `n` extra times. The final attempt's outcome is
 * what the caller sees. Every attempt receives the original `value` (the
 * rejection reason is never passed to `fn`), and each invocation of the
 * returned function gets its own retry budget.
 *
 * @param {function(*): Promise} fn - Operation to attempt.
 * @param {number} [n=5] - Number of retries after the first attempt; a falsy
 *   value selects 5.
 * @returns {function(*): Promise} Retrying wrapper, usable as a `.then` handler.
 */
const _retry = (fn, n) => {
  const maxRetries = n || 5;
  return value => {
    let remaining = maxRetries;
    const attempt = () => {
      if (remaining > 0) {
        remaining--;
        // Discard the rejection reason and try again with the original input.
        return fn(value).catch(attempt);
      }
      return fn(value);
    };
    return attempt();
  };
};
/**
 * Loads the table file for a serializer.
 *
 * If the file cannot be read, a default table (round-tripped through JSON)
 * is used instead. If the file is read but is not valid JSON the returned
 * promise rejects. A parsed table whose version differs from
 * APPEND_VERSION is replaced by a fresh default table.
 *
 * @param {Object} _this - Serializer whose table should be read.
 * @returns {Promise<Object>} The stored table or a default one.
 */
const _readTable = _this => {
  const freshTable = () => _defaultTable(_this);
  return readFile(_tablepath(_this), 'utf8')
    .then(
      text => JSON.parse(text),
      () => JSON.parse(JSON.stringify(freshTable())),
    )
    .then(parsed => {
      if (parsed.version === APPEND_VERSION) {
        return parsed;
      }
      return freshTable();
    });
};
/**
 * Persists a table as JSON at the serializer's table path.
 *
 * @param {Object} _this - Serializer owning the table file.
 * @param {Object} _table - Serializable table to store.
 * @returns {*} Whatever `writeJsonFile` returns.
 */
const _writeTable = (_this, _table) => {
  const target = _tablepath(_this);
  return writeJsonFile(target, _table);
};
/**
 * Maps a byte index to the log file that contains it.
 *
 * The log number is the index divided by the log size, truncated to a 32-bit
 * integer, and left-padded with zeros to at least four characters.
 *
 * @param {{path: string}} _this - Object exposing the cache directory.
 * @param {{logSize: number}} _table - Table supplying the log size.
 * @param {number} index - Absolute byte index into the log sequence.
 * @returns {string} Path such as `<dir>/log0002`.
 */
const _logFilepath = (_this, _table, index) => {
  const logNumber = (index / _table.logSize) | 0;
  const logId = String(logNumber).padStart(4, '0');
  return join(_this.path, `log${logId}`);
};
/**
 * Opens the log file containing byte `index` and stores the descriptor on
 * `_this._fd`. Does nothing if a descriptor is already open.
 *
 * In append mode (`'a'`), when `index` falls on the first byte of a log, any
 * existing non-empty file at that path is unlinked first so the new log
 * starts empty; failures of that stat/unlink step are deliberately ignored.
 * Append mode also allocates a zero-filled write buffer of `logSize` bytes
 * and resets the write offset.
 *
 * @param {Object} _this - Serializer state (`_fd`, `_writeBuffer`, `_writeOffset`).
 * @param {string} mode - Flags passed to `open` (`'a'` or `'r'` in this file).
 * @param {{logSize: number}} _table - Table supplying the log size.
 * @param {number} index - Absolute byte index selecting the log file.
 * @returns {Promise<void>} Resolves once the log is open; rejects if `open` fails.
 */
const _openLog = (_this, mode, _table, index) => {
  if (_this._fd !== null) {
    return Promise.resolve();
  }
  return Promise.resolve()
    .then(() => {
      if (mode !== 'a' || index % _table.logSize !== 0) {
        return undefined;
      }
      const logPath = _logFilepath(_this, _table, index);
      return stat(logPath)
        .then(({ size }) => {
          if (size > 0) {
            return unlink(logPath).then(timeout100);
          }
          return undefined;
        })
        // Best effort: a missing file (or a failed unlink) must not block
        // opening the log.
        .catch(() => {});
    })
    .then(() => open(_logFilepath(_this, _table, index), mode))
    .then(fd => {
      _this._fd = fd;
      if (mode === 'a') {
        _this._writeBuffer = Buffer.alloc(_table.logSize);
        _this._writeOffset = 0;
      }
    });
};
/**
 * Flushes any pending write buffer to the open log, closes the descriptor
 * and clears the serializer's log state. Does nothing if no log is open.
 *
 * The descriptor is closed and the state reset even when the flush or the
 * close itself fails, so a failed write cannot leak the descriptor or leave
 * a stale `_fd` behind. The failure is still reported: the returned promise
 * rejects with the flush error (preferred) or the close error.
 *
 * @param {Object} _this - Serializer state (`_fd`, `_writeBuffer`, `_writeOffset`).
 * @returns {Promise<void>}
 */
const _closeLog = _this => {
  if (_this._fd === null) {
    return Promise.resolve();
  }
  const fd = _this._fd;
  const reset = () => {
    _this._fd = null;
    _this._writeBuffer = null;
    _this._writeOffset = 0;
  };
  return Promise.resolve()
    .then(() => {
      if (_this._writeBuffer) {
        return write(fd, _this._writeBuffer, 0, _this._writeOffset);
      }
      return undefined;
    })
    .then(
      () => close(fd),
      flushError =>
        // Still release the descriptor, then surface the original failure.
        close(fd)
          .catch(() => {})
          .then(() => {
            throw flushError;
          }),
    )
    .then(
      () => {
        reset();
      },
      error => {
        reset();
        throw error;
      },
    );
};
/**
 * Computes a read buffer size: 32 blocks, capped at the size of one log.
 *
 * @param {Object} _this - Unused.
 * @param {{blockSize: number, logSize: number}} _table - Table supplying sizes.
 * @returns {number} Buffer size in bytes.
 */
const _readBufferSize = (_this, _table) => {
  const thirtyTwoBlocks = 32 * _table.blockSize;
  return Math.min(thirtyTwoBlocks, _table.logSize);
};
/**
 * Streams the serializer's log files in order, one chunk per log file.
 *
 * Starting at byte 0 and advancing by `logSize`, each log file is opened,
 * read from position 0 into a single `logSize`-byte buffer, and that buffer
 * is pushed to the returned stream. The stream ends once the position
 * reaches `_table.nextByte`.
 *
 * NOTE: the SAME Buffer instance is pushed for every log and is overwritten
 * by the next read, so consumers must finish with a chunk inside their
 * `'data'` handler. Chunks are always `logSize` long regardless of how many
 * bytes were actually read.
 *
 * Any failure while opening, reading or closing a log destroys the stream
 * with that error, so consumers receive an `'error'` event instead of
 * waiting forever.
 *
 * @param {Object} _this - Serializer state (uses `_fd`).
 * @param {{logSize: number, nextByte: number}} _table - Table describing the log.
 * @returns {Readable} Stream of log-sized Buffer chunks.
 */
const _readLog = (_this, _table) => {
  let index = 0;
  const out = new Readable({
    read() {},
  });
  const rbSize = _table.logSize;
  const _readBuffer = Buffer.alloc(rbSize);

  function readNextLog() {
    if (index >= _table.nextByte) {
      out.push(null);
      return _closeLog(_this);
    }
    return _closeLog(_this)
      .then(() => _openLog(_this, 'r', _table, index))
      .then(() => {
        if (!_this._fd) {
          // Nothing could be opened: jump to the end so the stream finishes.
          index = _table.nextByte;
          return readNextLog();
        }
        return read(_this._fd, _readBuffer, 0, rbSize, 0).then(() => {
          index += _table.logSize;
          out.push(_readBuffer);
          return readNextLog();
        });
      });
  }

  Promise.resolve()
    .then(readNextLog)
    .catch(error => {
      out.destroy(error);
    });
  return out;
};
/**
 * Appends one slice of content to the in-memory write buffer of the current
 * log, opening (or rolling over to) the right log file first.
 *
 * - If a log is open and `index` is the first byte of a log, the current log
 *   is closed (flushing it) and the next one opened in append mode.
 * - If no log is open, the log containing `index` is opened in append mode.
 *
 * The slice is then copied into `_this._writeBuffer` at `_this._writeOffset`
 * and the offset advanced. Nothing is written to disk here; the buffer is
 * flushed when the log is closed.
 *
 * Completion is reported through `next`: called with no argument on success,
 * or with an Error when the log could not be opened, the slice is longer
 * than a log, or it would overflow the write buffer / log size. On an
 * overflow error the buffer and offset are left untouched.
 *
 * @param {Object} _this - Serializer state (`_fd`, `_writeBuffer`, `_writeOffset`).
 * @param {{logSize: number}} _table - Table supplying the log size.
 * @param {Buffer} blockContent - Bytes to append.
 * @param {number} index - Absolute byte index where the slice begins.
 * @param {function(Error=): void} next - Completion callback.
 * @returns {void}
 */
const _appendBlock = (_this, _table, blockContent, index, next) => {
  let prep;
  if (_this._fd !== null && index % _table.logSize === 0) {
    prep = _closeLog(_this).then(() => _openLog(_this, 'a', _table, index));
  } else if (_this._fd === null) {
    prep = _openLog(_this, 'a', _table, index);
  }

  function work() {
    if (!_this._fd) {
      return next(new Error('log file is not open'));
    }
    if (blockContent.length > _table.logSize) {
      return next(new Error('block longer than max size'));
    }
    // Validate before copying so a rejected block never leaves the offset
    // pointing past the buffer for the later flush.
    const nextOffset = _this._writeOffset + blockContent.length;
    if (nextOffset > _this._writeBuffer.length) {
      return next(
        new Error(
          `writeOffset ${nextOffset} past writeBuffer length ${
            _this._writeBuffer.length
          }`,
        ),
      );
    }
    if (nextOffset > _table.logSize) {
      return next(
        new Error(`writeOffset ${nextOffset} past logSize ${_table.logSize}`),
      );
    }
    blockContent.copy(_this._writeBuffer, _this._writeOffset);
    _this._writeOffset = nextOffset;
    return next();
  }

  if (prep) {
    // Forward open/close failures to the callback instead of dropping them.
    prep.then(work).catch(next);
  } else {
    work();
  }
};
/**
 * Sums the sizes of every entry currently in the table's map, i.e. the bytes
 * still referenced by live keys.
 *
 * @param {Object} _this - Unused.
 * @param {{map: Object}} _table - Table whose entries are summed.
 * @returns {number} Total size in bytes.
 */
const _sizeNeeded = (_this, _table) => {
  let total = 0;
  for (const entry of values(_table.map)) {
    total += entry.size;
  }
  return total;
};
/**
 * Reports how many bytes of log have been written so far, which is the
 * table's next write position.
 *
 * @param {Object} _this - Unused.
 * @param {{nextByte: number}} _table - Table to inspect.
 * @returns {number} The table's `nextByte`.
 */
const _sizeUsed = (_this, _table) => _table.nextByte;
/**
 * Computes the log size above which compaction should run: the larger of the
 * serializer's fixed threshold and the live data size scaled by its
 * multiplier.
 *
 * @param {{compactSizeThreshold: number, compactMultiplierThreshold: number}} _this
 * @param {Object} _table - Table whose live size is measured.
 * @returns {number} Threshold in bytes.
 */
const _compactSize = (_this, _table) => {
  const floor = _this.compactSizeThreshold;
  const scaledNeed = _sizeNeeded(_this, _table) * _this.compactMultiplierThreshold;
  return Math.max(floor, scaledNeed);
};
/**
 * Runs an operation, optionally serialized behind the serializer's lock.
 *
 * Unless `mustLock` is exactly `false`, `promiseFn` receives the current
 * lock promise (so it can chain after the previous operation) and the lock
 * is advanced to this operation. The stored lock never rejects: a failed
 * operation rejects only the promise returned to its own caller, so later
 * operations are not poisoned by an earlier failure.
 *
 * With `mustLock === false` the operation runs against an already-resolved
 * promise and the lock is left untouched.
 *
 * @param {{lock: Promise}} _this - Serializer holding the lock promise.
 * @param {boolean} [mustLock] - Pass `false` to bypass the lock.
 * @param {function(Promise): Promise} promiseFn - Builds the operation on top
 *   of the promise it is given.
 * @returns {Promise} The promise produced by `promiseFn`.
 */
const _lock = (_this, mustLock, promiseFn) => {
  if (mustLock === false) {
    return promiseFn(Promise.resolve());
  }
  const operation = promiseFn(_this.lock);
  // Queue successors behind completion, not behind success.
  _this.lock = Promise.resolve(operation).catch(() => {});
  return operation;
};
/**
 * Runs a callback-style task over each array item, one after another.
 *
 * `each(item, next)` is started for an item only after the previous item has
 * called `next`. A task may call `next` synchronously; those calls are
 * counted and drained by the loop that is already running rather than
 * recursing. The returned promise resolves when `next` is called after every
 * item has been started, and rejects as soon as `next` receives an error or
 * `each` throws.
 *
 * @param {Array} array - Items to process in order.
 * @param {function(*, function(Error=): void): void} each - Task runner.
 * @returns {Promise<void>}
 */
const serialFsTask = (array, each) =>
  new Promise((resolve, reject) => {
    // Number of items allowed to have started so far.
    let permits = 0;
    // Position of the next item to start.
    let cursor = 0;
    // True while the dispatch loop below is on the stack.
    let dispatching = false;

    const next = err => {
      if (err) {
        reject(err);
        return;
      }
      if (cursor === array.length) {
        resolve();
        return;
      }
      permits += 1;
      if (dispatching) {
        return;
      }
      dispatching = true;
      while (permits > cursor && cursor < array.length) {
        const item = array[cursor];
        cursor += 1;
        try {
          each(item, next);
        } catch (e) {
          next(e);
          return;
        }
      }
      dispatching = false;
    };

    next();
  });
/**
 * Key/value store persisted as an append-only sequence of fixed-size log
 * files plus a `table.json` index recording, for every live key, the byte
 * offset and length of its content.
 *
 * Operations issued with locking enabled are chained on `this.lock`, so they
 * run one at a time.
 */
class AppendSerializer {
  /**
   * @param {Object} options
   * @param {string} options.cacheDirPath - Directory holding the table and logs.
   * @param {boolean} [options.autoParse] - When truthy, `read` JSON-parses
   *   every stored value.
   * @param {number} [options.blockSize] - Falls back to the module default when falsy.
   * @param {number} [options.logSize] - Bytes per log file; falls back to the
   *   module default when falsy.
   * @param {number} [options.compactSizeThreshold] - Minimum log usage before
   *   compaction is considered.
   * @param {number} [options.compactMultiplierThreshold] - Multiplier applied
   *   to the live data size when deciding to compact.
   */
  constructor(options) {
    this.path = options.cacheDirPath;
    this.autoParse = options.autoParse;
    this.blockSize = options.blockSize || _blockSize;
    this.logSize = options.logSize || _logSize;
    this.compactSizeThreshold = options.compactSizeThreshold || _minCompactSize;
    this.compactMultiplierThreshold =
      options.compactMultiplierThreshold || _compactMultiplierThreshold;
    this.lock = Promise.resolve();
    this._fd = null;
  }

  /**
   * Reads every live key from the logs.
   *
   * @param {boolean} [mustLock] - Pass `false` to skip the serializer lock.
   * @returns {Promise<Object>} Plain object mapping each key to its stored
   *   string, or to the parsed JSON value when `autoParse` is set. Rejects if
   *   the logs cannot be read or a value fails to parse.
   */
  read(mustLock) {
    const _this = this;

    function _read() {
      return Promise.resolve()
        .then(_retry(() => _readTable(_this)))
        .then(activeTable => {
          const map = new Map();
          // Entries ordered by position so the logs can be scanned once.
          const valueStarts = values(activeTable.map)
            .map(entry => ({
              start: entry.start,
              end: entry.start + entry.size,
              value: entry,
            }))
            .sort((a, b) => a.start - b.start);

          return new Promise((resolve, reject) => {
            let valueIndex = 0;
            let destBuffer = Buffer.alloc(2 * 1024 * 1024);
            // Bytes of the current value already copied into destBuffer.
            let offset = 0;
            // Absolute byte position of the first byte of the current chunk.
            let logOffset = 0;
            let failed = false;
            const log = _readLog(_this, activeTable);

            // Extracts every value (or value fragment) found in one
            // log-sized chunk.
            function consume(data) {
              if (valueIndex >= valueStarts.length) {
                return;
              }
              for (let bufferIndex = 0; bufferIndex < data.length; ) {
                if (bufferIndex + logOffset >= valueStarts[valueIndex].end) {
                  valueIndex++;
                }
                if (valueIndex >= valueStarts.length) {
                  return;
                }
                const value = valueStarts[valueIndex].value;
                if (bufferIndex + logOffset >= value.start) {
                  if (value.size > destBuffer.length) {
                    // Grow to the next power of two that fits the value.
                    const newLength = Math.pow(
                      2,
                      Math.ceil(Math.log(value.size) / Math.log(2)),
                    );
                    destBuffer = Buffer.alloc(newLength);
                  }
                  // Take the rest of the value, but never past this chunk.
                  const readAmount = Math.min(
                    value.start + value.size - logOffset - bufferIndex,
                    activeTable.logSize - bufferIndex,
                  );
                  data
                    .subarray(bufferIndex, bufferIndex + readAmount)
                    .copy(destBuffer.subarray(offset, offset + readAmount));
                  bufferIndex += readAmount;
                  offset += readAmount;
                  if (offset >= value.size) {
                    offset = 0;
                    const text = destBuffer.toString('utf8', 0, value.size);
                    map.set(value.key, _this.autoParse ? JSON.parse(text) : text);
                  }
                } else if (bufferIndex + logOffset < value.start) {
                  // Skip the gap left by deleted or overwritten content.
                  bufferIndex += value.start - (bufferIndex + logOffset);
                }
              }
              logOffset += activeTable.logSize;
            }

            log.on('data', data => {
              if (failed) {
                return;
              }
              try {
                consume(data);
              } catch (e) {
                // Report decode failures instead of throwing into the stream.
                failed = true;
                reject(e);
              }
            });
            log.on('end', resolve);
            log.on('error', reject);
          }).then(() => objFrom(map));
        });
    }

    return _lock(_this, mustLock, promise =>
      promise
        .then(() => _read())
        .catch(e => {
          // Always surface the original failure, even if closing fails too.
          const rethrow = () => {
            throw e;
          };
          return _closeLog(_this).then(rethrow, rethrow);
        }),
    );
  }

  /**
   * Applies a batch of operations: a non-null `value` is appended to the log
   * under `key` (non-strings are JSON-stringified first); a `null` value
   * removes `key` from the table. The table file is rewritten afterwards and
   * the store is compacted when log usage exceeds the compaction threshold.
   *
   * @param {Array<{key: string, value: *}>} ops - Operations, applied in order.
   * @param {boolean} [mustLock] - Pass `false` to skip the serializer lock.
   * @returns {Promise<void>} Resolves immediately when `ops` is empty.
   */
  write(ops, mustLock) {
    if (ops.length === 0) {
      return Promise.resolve();
    }
    const _this = this;
    let activeTable;
    // Scratch buffer reused across ops whenever the next content fits in it.
    let contentBuffer;
    let contentLength;

    function step(op, next) {
      let content = op.value;
      if (content === null) {
        activeTable = delKey(activeTable, op.key);
        next();
        return undefined;
      }
      if (typeof content !== 'string') {
        content = JSON.stringify(content);
      }
      if (contentBuffer && Buffer.byteLength(content) <= contentBuffer.length) {
        contentLength = contentBuffer.write(content, 0, 'utf8');
      } else {
        contentBuffer = Buffer.from(content);
        contentLength = contentBuffer.length;
      }
      // Number of log files this content touches, given where it starts.
      const blockCount = Math.ceil(
        ((activeTable.nextByte % activeTable.logSize) + contentLength) /
          activeTable.logSize,
      );
      let nextByte = activeTable.nextByte;
      activeTable = putKey(activeTable, op.key, contentLength);
      let bufferIndex = 0;
      const bulk = Array.from({ length: blockCount }, (_, i) => i);
      return serialFsTask(bulk, (_, nextBlock) => {
        // Slice up to the end of the current log file or of the content.
        const blockSlice = contentBuffer.subarray(
          bufferIndex,
          Math.min(
            bufferIndex +
              (activeTable.logSize - (nextByte % activeTable.logSize)),
            contentLength,
          ),
        );
        _appendBlock(_this, activeTable, blockSlice, nextByte, nextBlock);
        bufferIndex += blockSlice.length;
        nextByte += blockSlice.length;
      }).then(() => next(), next);
    }

    function _write() {
      return Promise.resolve()
        .then(_retry(() => mkdirp(_this.path)))
        .then(_retry(() => _readTable(_this)))
        .then(_table => {
          activeTable = modTable(_table);
          return serialFsTask(ops.slice(), step);
        })
        .then(() => _closeLog(_this))
        .then(
          _retry(() => {
            activeTable = table(activeTable);
            return _writeTable(_this, activeTable);
          }),
        );
    }

    return _lock(_this, mustLock, promise =>
      promise
        .then(() => _write())
        .catch(e => {
          // Always surface the original failure, even if closing fails too.
          const rethrow = () => {
            throw e;
          };
          return _closeLog(_this).then(rethrow, rethrow);
        })
        .then(() => {
          if (
            _sizeUsed(_this, activeTable) > _compactSize(_this, activeTable)
          ) {
            return _this.compact(false);
          }
          return undefined;
        }),
    );
  }

  /**
   * Rewrites the store so it holds only live values: reads everything,
   * writes it into a sibling `<path>~` directory with a fresh serializer,
   * then replaces the original directory with the copy.
   *
   * @param {boolean} [mustLock] - Pass `false` to skip the serializer lock.
   * @returns {Promise<void>}
   */
  compact(mustLock) {
    const _this = this;
    return _this
      .read(mustLock)
      .then(map => Object.keys(map).map(key => ({ key, value: map[key] })))
      .then(ops =>
        rimraf(`${_this.path}~`)
          .then(timeout100)
          .then(() => ops),
      )
      .then(ops => {
        const copy = new AppendSerializer({
          cacheDirPath: `${_this.path}~`,
          blockSize: _this.blockSize,
          logSize: _this.logSize,
          compactSizeThreshold: _this.compactSizeThreshold,
          compactMultiplierThreshold: _this.compactMultiplierThreshold,
        });
        return _lock(_this, mustLock, promise =>
          promise
            .then(() => copy.write(ops))
            .then(() => rimraf(_this.path))
            .then(timeout100)
            .then(_retry(() => rename(copy.path, _this.path), 10)),
        );
      });
  }
}
- module.exports = AppendSerializer;
|