const fs = require('graceful-fs');
|
const join = require('path').join;
|
const Readable = require('stream').Readable;
|
|
const _mkdirp = require('mkdirp');
|
const _rimraf = require('rimraf');
|
const writeJsonFile = require('write-json-file');
|
|
const entries = require('./util/Object.entries');
|
const values = require('./util/Object.values');
|
const promisify = require('./util/promisify');
|
|
// Promise-returning variants of the callback-style helpers used below.
// Each function is passed through `promisify` on its own, in the same order
// as the names on the left-hand side.
const [
  rimraf,
  open,
  close,
  read,
  readFile,
  write,
  rename,
  unlink,
  stat,
  mkdirp,
] = [
  _rimraf,
  fs.open,
  fs.close,
  fs.read,
  fs.readFile,
  fs.write,
  fs.rename,
  fs.unlink,
  fs.stat,
  _mkdirp,
].map(callbackFn => promisify(callbackFn));
|
|
// Version stamped into table.json; a table carrying any other version is
// replaced by a fresh default table when it is read.
const APPEND_VERSION = 1;

// Default block size: 4 KiB.
const _blockSize = 4 << 10;
// Default size of one log file: 2 MiB.
const _logSize = 2 << 20;
// Default lower bound used when deciding whether to compact: 512 KiB.
const _minCompactSize = 512 << 10;
// Default multiplier applied to the live data size when deciding whether to
// compact.
const _compactMultiplierThreshold = 1.5;
|
|
/**
 * Build a table entry recording where the content of `key` sits in the log.
 *
 * @param {string} key - Key the entry belongs to.
 * @param {number} [size] - Content length in bytes; any falsy value becomes 0.
 * @param {number} [start] - Byte offset of the content; any falsy value becomes 0.
 * @returns {{key: string, size: number, start: number}} The new entry.
 */
const value = (key, size, start) => {
  const entry = { key, size: 0, start: 0 };
  if (size) {
    entry.size = size;
  }
  if (start) {
    entry.start = start;
  }
  return entry;
};
|
|
/**
 * Convert a Map into a plain object with the same keys and values.
 *
 * @param {Map|*} map - A Map to convert; anything else is returned untouched.
 * @returns {Object|*} A new plain object for a Map input, otherwise the input.
 */
const objFrom = map => {
  if (!(map instanceof Map)) {
    return map;
  }
  const plain = {};
  for (const [name, entry] of map) {
    plain[name] = entry;
  }
  return plain;
};
|
|
/**
 * Produce the serializable form of a table: the current version plus the
 * given offsets and sizes, with the key map converted to a plain object.
 *
 * @param {{nextByte: number, blockSize: number, logSize: number, map: (Map|Object)}} source
 * @returns {{version: number, nextByte: number, blockSize: number, logSize: number, map: Object}}
 */
const table = source => {
  const { nextByte, blockSize, logSize, map } = source;
  return {
    version: APPEND_VERSION,
    nextByte,
    blockSize,
    logSize,
    map: objFrom(map),
  };
};
|
|
/**
 * Produce the in-memory, modifiable form of a table: the current version plus
 * the given offsets and sizes, with the key map rebuilt as a Map.
 *
 * @param {{nextByte: number, blockSize: number, logSize: number, map: Object}} source
 * @returns {{version: number, nextByte: number, blockSize: number, logSize: number, map: Map}}
 */
const modTable = source => {
  const { nextByte, blockSize, logSize, map } = source;
  return {
    version: APPEND_VERSION,
    nextByte,
    blockSize,
    logSize,
    map: new Map(entries(map)),
  };
};
|
|
/**
 * Record that `key` occupies the next `size` bytes of the log and advance the
 * table's write position past them. The table is modified in place.
 *
 * @param {{map: Map, nextByte: number}} _table - Modifiable table.
 * @param {string} key - Key being written.
 * @param {number} size - Content length in bytes.
 * @returns {Object} The same `_table` object.
 */
function putKey(_table, key, size) {
  const start = _table.nextByte;
  _table.map.set(key, value(key, size, start));
  _table.nextByte = start + size;
  return _table;
}
|
|
/**
 * Drop `key` from the table's map when it currently has a truthy entry.
 * The table is modified in place.
 *
 * @param {{map: Map}} _table - Modifiable table.
 * @param {string} key - Key to remove.
 * @returns {Object} The same `_table` object.
 */
function delKey(_table, key) {
  const existing = _table.map.get(key);
  if (!existing) {
    return _table;
  }
  _table.map.delete(key);
  return _table;
}
|
|
// Location of the table file inside the serializer's directory.
const _tablepath = _this => {
  const directory = _this.path;
  return join(directory, 'table.json');
};
|
|
/**
 * Build an empty table positioned at byte 0. Block and log sizes come from the
 * argument, falling back to the module defaults when they are falsy.
 *
 * @param {{blockSize: (number|undefined), logSize: (number|undefined)}} _this
 * @returns {Object} A fresh table as produced by `table`.
 */
const _defaultTable = _this => {
  const { blockSize, logSize } = _this;
  const emptyTable = {
    nextByte: 0,
    blockSize: blockSize ? blockSize : _blockSize,
    logSize: logSize ? logSize : _logSize,
    map: {},
  };
  return table(emptyTable);
};
|
|
// Resolve (with no value) after a 100ms pause.
const timeout100 = () =>
  new Promise(done => {
    setTimeout(done, 100);
  });
|
|
/**
 * Wrap a promise-returning function so that a failed call is attempted again.
 *
 * The wrapper makes one initial attempt plus up to `n` retries. Every attempt
 * receives the argument the wrapper was originally called with (never the
 * error of the previous attempt), and each call of the wrapper gets its own
 * retry budget, so one wrapper can be reused safely.
 *
 * @param {function(*): (Promise|*)} fn - Operation to attempt. A synchronous
 *   throw is treated like a rejection and retried too.
 * @param {number} [n] - Number of retries after the first attempt; any falsy
 *   value (including 0) means 5.
 * @returns {function(*): Promise} Function resolving with the first successful
 *   result, or rejecting with the error of the last attempt.
 */
const _retry = (fn, n) => {
  const retries = n || 5;

  return value => {
    let remaining = retries;

    const attempt = () => {
      // Run `fn` inside a promise executor so a synchronous throw becomes a
      // rejection and takes the same retry path.
      const result = new Promise(resolve => resolve(fn(value)));
      if (remaining > 0) {
        remaining--;
        return result.catch(attempt);
      }
      return result;
    };

    return attempt();
  };
};
|
|
/**
 * Load the table stored in the serializer's directory.
 *
 * When the file cannot be read, a default table is used instead. A table whose
 * version differs from APPEND_VERSION is also replaced by a default table.
 * A file that reads fine but is not valid JSON makes the promise reject.
 *
 * @param {Object} _this - Serializer state (provides the path and default sizes).
 * @returns {Promise<Object>} The parsed table or a default one.
 */
const _readTable = _this => {
  // Serialised default, so the read-failure path goes through the same
  // JSON.parse step as a real file.
  const serializedDefault = () => JSON.stringify(_defaultTable(_this));

  const ensureCurrentVersion = _table =>
    _table.version === APPEND_VERSION ? _table : _defaultTable(_this);

  return readFile(_tablepath(_this), 'utf8')
    .catch(serializedDefault)
    .then(text => JSON.parse(text))
    .then(ensureCurrentVersion);
};
|
|
// Persist `_table` as JSON at the serializer's table path.
const _writeTable = (_this, _table) => {
  const destination = _tablepath(_this);
  return writeJsonFile(destination, _table);
};
|
|
/**
 * Path of the log file that holds byte offset `index`.
 *
 * Logs are numbered by `index / logSize` (truncated to an integer) and the
 * number is left-padded with zeros to at least four characters, e.g.
 * `log0002`.
 *
 * @param {{path: string}} _this - Serializer state providing the directory.
 * @param {{logSize: number}} _table - Table providing the size of one log.
 * @param {number} index - Byte offset into the whole log sequence.
 * @returns {string} Path of the log file.
 */
const _logFilepath = (_this, _table, index) => {
  const logNumber = (index / _table.logSize) | 0;
  const logId = String(logNumber).padStart(4, '0');
  return join(_this.path, `log${logId}`);
};
|
|
/**
 * Open the log file that holds byte offset `index` and store its descriptor
 * on `_this._fd`. Does nothing when a descriptor is already stored.
 *
 * In append mode ('a') a zero-filled write buffer of `_table.logSize` bytes is
 * also set up on `_this._writeBuffer`, with `_this._writeOffset` reset to 0.
 * When appending at the very start of a log, a non-empty file left at that
 * path is unlinked first so the new log starts empty.
 *
 * @param {Object} _this - Serializer state (`_fd`, `_writeBuffer`, `_writeOffset`).
 * @param {string} mode - Flag handed to `open`; 'a' enables the write buffer.
 * @param {{logSize: number}} _table - Active table.
 * @param {number} index - Byte offset into the log sequence.
 * @returns {Promise<void>} Resolves once the log is open; rejects when `open` fails.
 */
const _openLog = (_this, mode, _table, index) => {
  if (_this._fd !== null) {
    return Promise.resolve();
  }

  // Best effort: a failing stat (typically "file does not exist") or a
  // failing unlink must not stop the log from being opened.
  const removeStaleLog = logPath =>
    stat(logPath)
      .then(({ size }) => {
        if (size > 0) {
          return unlink(logPath).then(timeout100);
        }
      })
      .catch(() => {});

  return Promise.resolve()
    .then(() => {
      if (mode === 'a' && index % _table.logSize === 0) {
        return removeStaleLog(_logFilepath(_this, _table, index));
      }
    })
    .then(() => open(_logFilepath(_this, _table, index), mode))
    .then(fd => {
      _this._fd = fd;
      if (mode === 'a') {
        // Buffer.alloc replaces the deprecated `new Buffer(size)` and
        // guarantees the buffer never exposes stale memory.
        _this._writeBuffer = Buffer.alloc(_table.logSize);
        _this._writeOffset = 0;
      }
    });
};
|
|
/**
 * Flush the pending write buffer (if any) to the open log, close the
 * descriptor and reset the serializer's log state. Does nothing when no log
 * is open.
 *
 * The descriptor is closed and the state is reset even when the flush or the
 * close fails, so a failed write cannot leak the descriptor or leave the
 * serializer pointing at a dead one. A flush error takes precedence over a
 * close error.
 *
 * @param {Object} _this - Serializer state (`_fd`, `_writeBuffer`, `_writeOffset`).
 * @returns {Promise<void>} Resolves when the log is closed; rejects with the
 *   flush error, or with the close error when only closing failed.
 */
const _closeLog = _this => {
  if (_this._fd === null) {
    return Promise.resolve();
  }

  const resetState = () => {
    _this._fd = null;
    _this._writeBuffer = null;
    _this._writeOffset = 0;
  };

  return Promise.resolve()
    .then(() => {
      if (_this._writeBuffer) {
        return write(_this._fd, _this._writeBuffer, 0, _this._writeOffset);
      }
    })
    .then(
      () => close(_this._fd),
      writeError =>
        // Still release the descriptor, then report the original failure.
        Promise.resolve()
          .then(() => close(_this._fd))
          .catch(() => {})
          .then(() => {
            throw writeError;
          }),
    )
    .then(resetState, error => {
      resetState();
      throw error;
    });
};
|
|
// Size for a read buffer: 32 blocks, capped at the size of one log.
const _readBufferSize = (_this, _table) => {
  const thirtyTwoBlocks = 32 * _table.blockSize;
  return Math.min(thirtyTwoBlocks, _table.logSize);
};
|
|
/**
 * Stream the contents of every log file, in order, up to `_table.nextByte`.
 *
 * For each log the file is opened, `_table.logSize` bytes are read from
 * position 0 into a single reusable buffer, and that buffer is pushed onto
 * the returned stream. After the last log the stream is ended and the log is
 * closed. Any failure while opening or reading is emitted on the stream as an
 * 'error' event, so a consumer waiting for 'end' or 'error' always gets one.
 *
 * NOTE: the same Buffer object is pushed for every log; consumers are expected
 * to process each chunk as it is emitted rather than keep a reference to it.
 *
 * @param {Object} _this - Serializer state (uses `_fd` through the log helpers).
 * @param {{logSize: number, nextByte: number}} _table - Active table.
 * @returns {Readable} Stream emitting one `logSize`-byte chunk per log file.
 */
const _readLog = (_this, _table) => {
  let index = 0;
  const out = new Readable({
    // Data is pushed from the loop below; there is nothing to do on demand.
    read() {},
  });

  const rbSize = _table.logSize;
  // Buffer.alloc replaces the deprecated `new Buffer(size)`.
  const _readBuffer = Buffer.alloc(rbSize);

  function _log() {
    if (index >= _table.nextByte) {
      out.push(null);
      return _closeLog(_this);
    }

    function step() {
      if (!_this._fd) {
        // No descriptor after opening: jump to the end so the stream finishes.
        index = _table.nextByte;
        return _log();
      }

      return read(_this._fd, _readBuffer, 0, rbSize, 0).then(() => {
        index += _table.logSize;
        out.push(_readBuffer);
        return _log();
      });
    }

    return _closeLog(_this)
      .then(() => _openLog(_this, 'r', _table, index))
      .then(step);
  }

  Promise.resolve()
    .then(_log)
    .catch(error => {
      // Surface the failure to the consumer instead of leaving the stream
      // (and whoever waits on it) hanging forever.
      out.emit('error', error);
    });

  return out;
};
|
|
/**
 * Copy `blockContent` into the in-memory write buffer of the current log.
 *
 * When `index` falls on the start of a log, the open log (if any) is closed
 * (which flushes it) and the next one is opened; when no log is open, the
 * one holding `index` is opened first. The result is reported through `next`.
 *
 * @param {Object} _this - Serializer state (`_fd`, `_writeBuffer`, `_writeOffset`).
 * @param {{logSize: number}} _table - Active table.
 * @param {Buffer} blockContent - Bytes to append; must fit within one log.
 * @param {number} index - Byte offset in the log sequence where the block starts.
 * @param {function(Error=)} next - Called with no argument on success, or with
 *   the error when opening the log failed, the block does not fit, or no log
 *   is open.
 * @returns {void}
 */
const _appendBlock = (_this, _table, blockContent, index, next) => {
  let prep;
  if (_this._fd !== null && index % _table.logSize === 0) {
    prep = _closeLog(_this).then(() => _openLog(_this, 'a', _table, index));
  } else if (_this._fd === null) {
    prep = _openLog(_this, 'a', _table, index);
  }

  function work() {
    if (!_this._fd) {
      return next(new Error('log file is not open'));
    }
    if (blockContent.length > _table.logSize) {
      return next(new Error('block longer than max size'));
    }

    // Validate before touching any state so a rejected block leaves the
    // write buffer and offset exactly as they were.
    const endOffset = _this._writeOffset + blockContent.length;
    if (endOffset > _this._writeBuffer.length) {
      return next(
        new Error(
          `writeOffset ${endOffset} past writeBuffer length ${
            _this._writeBuffer.length
          }`,
        ),
      );
    }
    if (endOffset > _table.logSize) {
      return next(
        new Error(`writeOffset ${endOffset} past logSize ${_table.logSize}`),
      );
    }

    blockContent.copy(_this._writeBuffer, _this._writeOffset);
    _this._writeOffset = endOffset;
    next();
  }

  if (prep) {
    // Report a failure to open or close the log through `next`; without
    // this the caller would wait forever for a callback that never comes.
    prep.then(work).catch(next);
  } else {
    work();
  }
};
|
|
// Total number of bytes taken up by the entries currently in the table's map.
const _sizeNeeded = (_this, _table) => {
  let total = 0;
  for (const entry of values(_table.map)) {
    total += entry.size;
  }
  return total;
};
|
|
// Number of bytes appended to the logs so far: the table's next write offset.
const _sizeUsed = (_this, _table) => {
  const { nextByte } = _table;
  return nextByte;
};
|
|
// Log size above which compaction is worthwhile: the larger of the fixed
// threshold and the live data size scaled by the multiplier.
const _compactSize = (_this, _table) => {
  const scaledLiveSize =
    _sizeNeeded(_this, _table) * _this.compactMultiplierThreshold;
  return Math.max(_this.compactSizeThreshold, scaledLiveSize);
};
|
|
/**
 * Run an operation either queued behind the serializer's lock or immediately.
 *
 * With locking (any `mustLock` other than `false`) the operation is chained
 * after the previously queued one and becomes the new tail of the queue. A
 * failure of the previous operation is not passed on: the next operation
 * still runs, and only the caller of the failed operation sees its error.
 *
 * @param {{lock: Promise}} _this - Serializer state holding the queue tail.
 * @param {boolean} [mustLock] - Pass `false` to skip the queue, e.g. when the
 *   caller already runs inside a locked operation.
 * @param {function(Promise): Promise} promiseFn - Receives a promise to chain
 *   the work on and returns the promise of the finished work.
 * @returns {Promise} The promise returned by `promiseFn`.
 */
const _lock = (_this, mustLock, promiseFn) => {
  if (mustLock !== false) {
    // Wait for the previous operation to settle, but do not inherit its
    // rejection; otherwise one failure would fail every later operation.
    const previousSettled = _this.lock.catch(() => {});
    _this.lock = promiseFn(previousSettled);
    return _this.lock;
  }
  return promiseFn(Promise.resolve());
};
|
|
/**
 * Run `each(item, next)` for every item of `array`, one after another.
 *
 * An item is started only after the callback of the previous item has been
 * called. Callbacks invoked synchronously are processed by a loop rather than
 * by recursion, so long arrays do not grow the call stack.
 *
 * @param {Array} array - Items to process in order.
 * @param {function(*, function(*=))} each - Worker; must call its second
 *   argument once, with an error to abort or with nothing to continue.
 * @returns {Promise<void>} Resolves after the callback of the last item (or at
 *   once for an empty array); rejects with the first error passed to the
 *   callback or thrown by `each`.
 */
const serialFsTask = (array, each) =>
  new Promise((resolve, reject) => {
    // Number of items that are allowed to have been started.
    let permitted = 0;
    // Position of the next item to start.
    let cursor = 0;
    // True while the loop below is running, so re-entrant calls only record
    // their permission and let the running loop pick it up.
    let draining = false;

    const next = err => {
      if (err) {
        reject(err);
        return;
      }
      if (cursor === array.length) {
        resolve();
        return;
      }
      permitted += 1;
      if (draining) {
        return;
      }
      draining = true;
      while (permitted > cursor && cursor < array.length) {
        const item = array[cursor];
        cursor += 1;
        try {
          each(item, next);
        } catch (e) {
          next(e);
          return;
        }
      }
      draining = false;
    };

    next();
  });
|
|
class AppendSerializer {
|
constructor(options) {
|
this.path = options.cacheDirPath;
|
this.autoParse = options.autoParse;
|
this.blockSize = options.blockSize || _blockSize;
|
this.logSize = options.logSize || _logSize;
|
this.compactSizeThreshold = options.compactSizeThreshold || _minCompactSize;
|
this.compactMultiplierThreshold =
|
options.compactMultiplierThreshold || _compactMultiplierThreshold;
|
|
this.lock = Promise.resolve();
|
this._fd = null;
|
}
|
|
read(mustLock) {
|
const start = Date.now();
|
const _this = this;
|
|
function _read() {
|
let activeTable;
|
return Promise.resolve()
|
.then(_retry(() => _readTable(_this)))
|
.then(_table => {
|
activeTable = _table;
|
})
|
.then(() => {
|
const map = new Map();
|
|
const valueStarts = [];
|
values(activeTable.map).forEach(value => {
|
valueStarts.push({
|
start: value.start,
|
end: value.start + value.size,
|
value,
|
});
|
});
|
valueStarts.sort((a, b) => a.start - b.start);
|
|
return new Promise((resolve, reject) => {
|
let valueIndex = 0;
|
let destBuffer = new Buffer(2 * 1024 * 1024);
|
let offset = 0;
|
let logOffset = 0;
|
const log = _readLog(_this, activeTable);
|
log.on('data', data => {
|
if (valueIndex >= valueStarts.length) {
|
return;
|
}
|
for (let bufferIndex = 0; bufferIndex < data.length; ) {
|
if (bufferIndex + logOffset >= valueStarts[valueIndex].end) {
|
valueIndex++;
|
}
|
if (valueIndex >= valueStarts.length) {
|
return;
|
}
|
const value = valueStarts[valueIndex].value;
|
if (bufferIndex + logOffset >= value.start) {
|
if (value.size > destBuffer.length) {
|
const newLength = Math.pow(
|
2,
|
Math.ceil(Math.log(value.size) / Math.log(2)),
|
);
|
destBuffer = new Buffer(newLength);
|
}
|
|
const readAmount = Math.min(
|
value.start + value.size - logOffset - bufferIndex,
|
activeTable.logSize - bufferIndex,
|
);
|
data
|
.slice(bufferIndex, bufferIndex + readAmount)
|
.copy(destBuffer.slice(offset, offset + readAmount));
|
bufferIndex += readAmount;
|
offset += readAmount;
|
|
if (offset >= value.size) {
|
offset = 0;
|
if (_this.autoParse) {
|
// console.log(value.size, destBuffer.utf8Slice(0, value.size))
|
map.set(
|
value.key,
|
JSON.parse(destBuffer.utf8Slice(0, value.size)),
|
);
|
} else {
|
map.set(value.key, destBuffer.utf8Slice(0, value.size));
|
}
|
}
|
} else if (bufferIndex + logOffset < value.start) {
|
bufferIndex += value.start - (bufferIndex + logOffset);
|
}
|
}
|
logOffset += activeTable.logSize;
|
});
|
log.on('end', resolve);
|
log.on('error', reject);
|
}).then(() => objFrom(map));
|
});
|
}
|
|
return _lock(_this, mustLock, promise =>
|
promise
|
.then(() => _read())
|
.catch(e =>
|
_closeLog(_this).then(() => {
|
throw e;
|
}),
|
),
|
);
|
}
|
|
write(ops, mustLock) {
|
if (ops.length === 0) {
|
return Promise.resolve();
|
}
|
|
const steps = 0;
|
const _this = this;
|
|
let activeTable;
|
let contentBuffer;
|
let contentLength;
|
function _write() {
|
return Promise.resolve()
|
.then(_retry(() => mkdirp(_this.path)))
|
.then(_retry(() => _readTable(_this)))
|
.then(_table => {
|
activeTable = modTable(_table);
|
const _ops = ops.slice();
|
function step(op, next) {
|
// steps++;
|
// var op = _ops.shift();
|
// if (!op) {
|
// return;
|
// }
|
|
let content = op.value;
|
if (content !== null) {
|
if (typeof content !== 'string') {
|
content = JSON.stringify(content);
|
}
|
|
if (
|
Buffer.byteLength &&
|
contentBuffer &&
|
Buffer.byteLength(content) <= contentBuffer.length
|
) {
|
contentLength = contentBuffer.utf8Write(content);
|
} else {
|
contentBuffer = new Buffer(content);
|
contentLength = contentBuffer.length;
|
}
|
|
const blockCount = Math.ceil(
|
((activeTable.nextByte % activeTable.logSize) + contentLength) /
|
activeTable.logSize,
|
);
|
let nextByte = activeTable.nextByte;
|
activeTable = putKey(activeTable, op.key, contentLength);
|
let bufferIndex = 0;
|
|
const bulk = Array.from(new Array(blockCount)).map((_, i) => i);
|
return serialFsTask(bulk, (_, next) => {
|
const blockSlice = contentBuffer.slice(
|
bufferIndex,
|
Math.min(
|
bufferIndex +
|
(activeTable.logSize - (nextByte % activeTable.logSize)),
|
contentLength,
|
),
|
);
|
_appendBlock(_this, activeTable, blockSlice, nextByte, next);
|
bufferIndex += blockSlice.length;
|
nextByte += blockSlice.length;
|
}).then(next);
|
|
// function append() {
|
// if (bufferIndex < contentBuffer.length) {
|
// var blockSlice = contentBuffer.slice(bufferIndex, bufferIndex + activeTable.blockSize);
|
// bufferIndex += activeTable.blockSize;
|
// return _appendBlock(_this, activeTable, blockSlice, nextByte++)
|
// .then(append);
|
// }
|
// }
|
// return append()
|
// .then(step);
|
} else {
|
activeTable = delKey(activeTable, op.key);
|
next();
|
}
|
}
|
|
return serialFsTask(_ops, step);
|
|
// return step();
|
})
|
.then(() => _closeLog(_this))
|
.then(
|
_retry(() => {
|
activeTable = table(activeTable);
|
return _writeTable(_this, activeTable);
|
}),
|
);
|
}
|
|
return _lock(_this, mustLock, promise =>
|
promise
|
.then(() => _write())
|
.catch(e =>
|
_closeLog(_this).then(() => {
|
throw e;
|
}),
|
)
|
.then(() => {
|
if (
|
_sizeUsed(_this, activeTable) > _compactSize(_this, activeTable)
|
) {
|
return _this.compact(false);
|
}
|
}),
|
);
|
}
|
|
compact(mustLock) {
|
const _this = this;
|
|
return _this
|
.read(mustLock)
|
.then(map => {
|
const ops = [];
|
Object.keys(map).forEach(key => {
|
ops.push({
|
key,
|
value: map[key],
|
});
|
});
|
return ops;
|
})
|
.then(ops =>
|
rimraf(`${_this.path}~`)
|
.then(timeout100)
|
.then(() => ops),
|
)
|
.then(ops => {
|
const copy = new AppendSerializer({
|
cacheDirPath: `${_this.path}~`,
|
|
blockSize: _this.blockSize,
|
logSize: _this.logSize,
|
compactSizeThreshold: _this.compactSizeThreshold,
|
compactMultiplierThreshold: _this.compactMultiplierThreshold,
|
});
|
|
return _lock(_this, mustLock, promise =>
|
promise
|
.then(() => copy.write(ops))
|
.then(() => rimraf(_this.path))
|
.then(timeout100)
|
.then(_retry(() => rename(copy.path, _this.path), 10)),
|
);
|
});
|
}
|
}
|
|
module.exports = AppendSerializer;
|