prettier: convert app/js decaffeinated files to Prettier format

This commit is contained in:
mserranom 2020-02-17 18:34:28 +01:00
parent e79e044644
commit 3587b42311
18 changed files with 4057 additions and 2734 deletions

View file

@ -12,289 +12,334 @@
* DS207: Consider shorter variations of null checks
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
*/
let DiffGenerator;
let DiffGenerator
var ConsistencyError = function(message) {
const error = new Error(message);
error.name = "ConsistencyError";
error.__proto__ = ConsistencyError.prototype;
return error;
};
ConsistencyError.prototype.__proto__ = Error.prototype;
const error = new Error(message)
error.name = 'ConsistencyError'
error.__proto__ = ConsistencyError.prototype
return error
}
ConsistencyError.prototype.__proto__ = Error.prototype
const logger = require("logger-sharelatex");
const logger = require('logger-sharelatex')
module.exports = (DiffGenerator = {
ConsistencyError,
module.exports = DiffGenerator = {
ConsistencyError,
rewindUpdate(content, update) {
for (let j = update.op.length - 1, i = j; j >= 0; j--, i = j) {
const op = update.op[i];
if (op.broken !== true) {
try {
content = DiffGenerator.rewindOp(content, op);
} catch (e) {
if (e instanceof ConsistencyError && (i = update.op.length - 1)) {
// catch known case where the last op in an array has been
// merged into a later op
logger.error({err: e, update, op: JSON.stringify(op)}, "marking op as broken");
op.broken = true;
} else {
throw e; // rethrow the execption
}
}
}
}
return content;
},
rewindUpdate(content, update) {
for (let j = update.op.length - 1, i = j; j >= 0; j--, i = j) {
const op = update.op[i]
if (op.broken !== true) {
try {
content = DiffGenerator.rewindOp(content, op)
} catch (e) {
if (e instanceof ConsistencyError && (i = update.op.length - 1)) {
// catch known case where the last op in an array has been
// merged into a later op
logger.error(
{ err: e, update, op: JSON.stringify(op) },
'marking op as broken'
)
op.broken = true
} else {
throw e // rethrow the execption
}
}
}
}
return content
},
rewindOp(content, op) {
let p;
if (op.i != null) {
// ShareJS will accept an op where p > content.length when applied,
// and it applies as though p == content.length. However, the op is
// passed to us with the original p > content.length. Detect if that
// is the case with this op, and shift p back appropriately to match
// ShareJS if so.
({ p } = op);
const max_p = content.length - op.i.length;
if (p > max_p) {
logger.warn({max_p, p}, "truncating position to content length");
p = max_p;
}
rewindOp(content, op) {
let p
if (op.i != null) {
// ShareJS will accept an op where p > content.length when applied,
// and it applies as though p == content.length. However, the op is
// passed to us with the original p > content.length. Detect if that
// is the case with this op, and shift p back appropriately to match
// ShareJS if so.
;({ p } = op)
const max_p = content.length - op.i.length
if (p > max_p) {
logger.warn({ max_p, p }, 'truncating position to content length')
p = max_p
}
const textToBeRemoved = content.slice(p, p + op.i.length);
if (op.i !== textToBeRemoved) {
throw new ConsistencyError(
`Inserted content, '${op.i}', does not match text to be removed, '${textToBeRemoved}'`
);
}
const textToBeRemoved = content.slice(p, p + op.i.length)
if (op.i !== textToBeRemoved) {
throw new ConsistencyError(
`Inserted content, '${op.i}', does not match text to be removed, '${textToBeRemoved}'`
)
}
return content.slice(0, p) + content.slice(p + op.i.length);
return content.slice(0, p) + content.slice(p + op.i.length)
} else if (op.d != null) {
return content.slice(0, op.p) + op.d + content.slice(op.p)
} else {
return content
}
},
} else if (op.d != null) {
return content.slice(0, op.p) + op.d + content.slice(op.p);
} else {
return content;
}
},
rewindUpdates(content, updates) {
for (const update of Array.from(updates.reverse())) {
try {
content = DiffGenerator.rewindUpdate(content, update)
} catch (e) {
e.attempted_update = update // keep a record of the attempted update
throw e // rethrow the exception
}
}
return content
},
rewindUpdates(content, updates) {
for (const update of Array.from(updates.reverse())) {
try {
content = DiffGenerator.rewindUpdate(content, update);
} catch (e) {
e.attempted_update = update; // keep a record of the attempted update
throw e; // rethrow the exception
}
}
return content;
},
buildDiff(initialContent, updates) {
let diff = [{ u: initialContent }]
for (const update of Array.from(updates)) {
diff = DiffGenerator.applyUpdateToDiff(diff, update)
}
diff = DiffGenerator.compressDiff(diff)
return diff
},
buildDiff(initialContent, updates) {
let diff = [ {u: initialContent} ];
for (const update of Array.from(updates)) {
diff = DiffGenerator.applyUpdateToDiff(diff, update);
}
diff = DiffGenerator.compressDiff(diff);
return diff;
},
compressDiff(diff) {
const newDiff = []
for (const part of Array.from(diff)) {
const lastPart = newDiff[newDiff.length - 1]
if (
lastPart != null &&
(lastPart.meta != null ? lastPart.meta.user : undefined) != null &&
(part.meta != null ? part.meta.user : undefined) != null
) {
if (
lastPart.i != null &&
part.i != null &&
lastPart.meta.user.id === part.meta.user.id
) {
lastPart.i += part.i
lastPart.meta.start_ts = Math.min(
lastPart.meta.start_ts,
part.meta.start_ts
)
lastPart.meta.end_ts = Math.max(
lastPart.meta.end_ts,
part.meta.end_ts
)
} else if (
lastPart.d != null &&
part.d != null &&
lastPart.meta.user.id === part.meta.user.id
) {
lastPart.d += part.d
lastPart.meta.start_ts = Math.min(
lastPart.meta.start_ts,
part.meta.start_ts
)
lastPart.meta.end_ts = Math.max(
lastPart.meta.end_ts,
part.meta.end_ts
)
} else {
newDiff.push(part)
}
} else {
newDiff.push(part)
}
}
return newDiff
},
compressDiff(diff) {
const newDiff = [];
for (const part of Array.from(diff)) {
const lastPart = newDiff[newDiff.length - 1];
if ((lastPart != null) && ((lastPart.meta != null ? lastPart.meta.user : undefined) != null) && ((part.meta != null ? part.meta.user : undefined) != null)) {
if ((lastPart.i != null) && (part.i != null) && (lastPart.meta.user.id === part.meta.user.id)) {
lastPart.i += part.i;
lastPart.meta.start_ts = Math.min(lastPart.meta.start_ts, part.meta.start_ts);
lastPart.meta.end_ts = Math.max(lastPart.meta.end_ts, part.meta.end_ts);
} else if ((lastPart.d != null) && (part.d != null) && (lastPart.meta.user.id === part.meta.user.id)) {
lastPart.d += part.d;
lastPart.meta.start_ts = Math.min(lastPart.meta.start_ts, part.meta.start_ts);
lastPart.meta.end_ts = Math.max(lastPart.meta.end_ts, part.meta.end_ts);
} else {
newDiff.push(part);
}
} else {
newDiff.push(part);
}
}
return newDiff;
},
applyOpToDiff(diff, op, meta) {
let consumedDiff
const position = 0
applyOpToDiff(diff, op, meta) {
let consumedDiff;
const position = 0;
let remainingDiff = diff.slice()
;({ consumedDiff, remainingDiff } = DiffGenerator._consumeToOffset(
remainingDiff,
op.p
))
const newDiff = consumedDiff
let remainingDiff = diff.slice();
({consumedDiff, remainingDiff} = DiffGenerator._consumeToOffset(remainingDiff, op.p));
const newDiff = consumedDiff;
if (op.i != null) {
newDiff.push({
i: op.i,
meta
})
} else if (op.d != null) {
;({
consumedDiff,
remainingDiff
} = DiffGenerator._consumeDiffAffectedByDeleteOp(remainingDiff, op, meta))
newDiff.push(...Array.from(consumedDiff || []))
}
if (op.i != null) {
newDiff.push({
i: op.i,
meta
});
} else if (op.d != null) {
({consumedDiff, remainingDiff} = DiffGenerator._consumeDiffAffectedByDeleteOp(remainingDiff, op, meta));
newDiff.push(...Array.from(consumedDiff || []));
}
newDiff.push(...Array.from(remainingDiff || []))
newDiff.push(...Array.from(remainingDiff || []));
return newDiff
},
return newDiff;
},
applyUpdateToDiff(diff, update) {
for (const op of Array.from(update.op)) {
if (op.broken !== true) {
diff = DiffGenerator.applyOpToDiff(diff, op, update.meta)
}
}
return diff
},
applyUpdateToDiff(diff, update) {
for (const op of Array.from(update.op)) {
if (op.broken !== true) {
diff = DiffGenerator.applyOpToDiff(diff, op, update.meta);
}
}
return diff;
},
_consumeToOffset(remainingDiff, totalOffset) {
let part
const consumedDiff = []
let position = 0
while ((part = remainingDiff.shift())) {
const length = DiffGenerator._getLengthOfDiffPart(part)
if (part.d != null) {
consumedDiff.push(part)
} else if (position + length >= totalOffset) {
const partOffset = totalOffset - position
if (partOffset > 0) {
consumedDiff.push(DiffGenerator._slicePart(part, 0, partOffset))
}
if (partOffset < length) {
remainingDiff.unshift(DiffGenerator._slicePart(part, partOffset))
}
break
} else {
position += length
consumedDiff.push(part)
}
}
_consumeToOffset(remainingDiff, totalOffset) {
let part;
const consumedDiff = [];
let position = 0;
while ((part = remainingDiff.shift())) {
const length = DiffGenerator._getLengthOfDiffPart(part);
if (part.d != null) {
consumedDiff.push(part);
} else if ((position + length) >= totalOffset) {
const partOffset = totalOffset - position;
if (partOffset > 0) {
consumedDiff.push(DiffGenerator._slicePart(part, 0, partOffset));
}
if (partOffset < length) {
remainingDiff.unshift(DiffGenerator._slicePart(part, partOffset));
}
break;
} else {
position += length;
consumedDiff.push(part);
}
}
return {
consumedDiff,
remainingDiff
}
},
return {
consumedDiff,
remainingDiff
};
},
_consumeDiffAffectedByDeleteOp(remainingDiff, deleteOp, meta) {
const consumedDiff = []
let remainingOp = deleteOp
while (remainingOp && remainingDiff.length > 0) {
let newPart
;({
newPart,
remainingDiff,
remainingOp
} = DiffGenerator._consumeDeletedPart(remainingDiff, remainingOp, meta))
if (newPart != null) {
consumedDiff.push(newPart)
}
}
return {
consumedDiff,
remainingDiff
}
},
_consumeDiffAffectedByDeleteOp(remainingDiff, deleteOp, meta) {
const consumedDiff = [];
let remainingOp = deleteOp;
while (remainingOp && (remainingDiff.length > 0)) {
let newPart;
({newPart, remainingDiff, remainingOp} = DiffGenerator._consumeDeletedPart(remainingDiff, remainingOp, meta));
if (newPart != null) { consumedDiff.push(newPart); }
}
return {
consumedDiff,
remainingDiff
};
},
_consumeDeletedPart(remainingDiff, op, meta) {
let deletedContent, newPart, remainingOp
const part = remainingDiff.shift()
const partLength = DiffGenerator._getLengthOfDiffPart(part)
_consumeDeletedPart(remainingDiff, op, meta) {
let deletedContent, newPart, remainingOp;
const part = remainingDiff.shift();
const partLength = DiffGenerator._getLengthOfDiffPart(part);
if (part.d != null) {
// Skip existing deletes
remainingOp = op
newPart = part
} else if (partLength > op.d.length) {
// Only the first bit of the part has been deleted
const remainingPart = DiffGenerator._slicePart(part, op.d.length)
remainingDiff.unshift(remainingPart)
if (part.d != null) {
// Skip existing deletes
remainingOp = op;
newPart = part;
deletedContent = DiffGenerator._getContentOfPart(part).slice(
0,
op.d.length
)
if (deletedContent !== op.d) {
throw new ConsistencyError(
`deleted content, '${deletedContent}', does not match delete op, '${op.d}'`
)
}
} else if (partLength > op.d.length) {
// Only the first bit of the part has been deleted
const remainingPart = DiffGenerator._slicePart(part, op.d.length);
remainingDiff.unshift(remainingPart);
if (part.u != null) {
newPart = {
d: op.d,
meta
}
} else if (part.i != null) {
newPart = null
}
deletedContent = DiffGenerator._getContentOfPart(part).slice(0, op.d.length);
if (deletedContent !== op.d) {
throw new ConsistencyError(`deleted content, '${deletedContent}', does not match delete op, '${op.d}'`);
}
remainingOp = null
} else if (partLength === op.d.length) {
// The entire part has been deleted, but it is the last part
if (part.u != null) {
newPart = {
d: op.d,
meta
};
} else if (part.i != null) {
newPart = null;
}
deletedContent = DiffGenerator._getContentOfPart(part)
if (deletedContent !== op.d) {
throw new ConsistencyError(
`deleted content, '${deletedContent}', does not match delete op, '${op.d}'`
)
}
remainingOp = null;
if (part.u != null) {
newPart = {
d: op.d,
meta
}
} else if (part.i != null) {
newPart = null
}
} else if (partLength === op.d.length) {
// The entire part has been deleted, but it is the last part
remainingOp = null
} else if (partLength < op.d.length) {
// The entire part has been deleted and there is more
deletedContent = DiffGenerator._getContentOfPart(part);
if (deletedContent !== op.d) {
throw new ConsistencyError(`deleted content, '${deletedContent}', does not match delete op, '${op.d}'`);
}
deletedContent = DiffGenerator._getContentOfPart(part)
const opContent = op.d.slice(0, deletedContent.length)
if (deletedContent !== opContent) {
throw new ConsistencyError(
`deleted content, '${deletedContent}', does not match delete op, '${opContent}'`
)
}
if (part.u != null) {
newPart = {
d: op.d,
meta
};
} else if (part.i != null) {
newPart = null;
}
if (part.u) {
newPart = {
d: part.u,
meta
}
} else if (part.i != null) {
newPart = null
}
remainingOp = null;
remainingOp = {
p: op.p,
d: op.d.slice(DiffGenerator._getLengthOfDiffPart(part))
}
}
} else if (partLength < op.d.length) {
// The entire part has been deleted and there is more
return {
newPart,
remainingDiff,
remainingOp
}
},
deletedContent = DiffGenerator._getContentOfPart(part);
const opContent = op.d.slice(0, deletedContent.length);
if (deletedContent !== opContent) {
throw new ConsistencyError(`deleted content, '${deletedContent}', does not match delete op, '${opContent}'`);
}
_slicePart(basePart, from, to) {
let part
if (basePart.u != null) {
part = { u: basePart.u.slice(from, to) }
} else if (basePart.i != null) {
part = { i: basePart.i.slice(from, to) }
}
if (basePart.meta != null) {
part.meta = basePart.meta
}
return part
},
if (part.u) {
newPart = {
d: part.u,
meta
};
} else if (part.i != null) {
newPart = null;
}
_getLengthOfDiffPart(part) {
return (part.u || part.d || part.i || '').length
},
remainingOp =
{p: op.p, d: op.d.slice(DiffGenerator._getLengthOfDiffPart(part))};
}
return {
newPart,
remainingDiff,
remainingOp
};
},
_slicePart(basePart, from, to) {
let part;
if (basePart.u != null) {
part = { u: basePart.u.slice(from, to) };
} else if (basePart.i != null) {
part = { i: basePart.i.slice(from, to) };
}
if (basePart.meta != null) {
part.meta = basePart.meta;
}
return part;
},
_getLengthOfDiffPart(part) {
return (part.u || part.d || part.i || '').length;
},
_getContentOfPart(part) {
return part.u || part.d || part.i || '';
}
});
_getContentOfPart(part) {
return part.u || part.d || part.i || ''
}
}

View file

@ -11,124 +11,178 @@
* DS207: Consider shorter variations of null checks
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
*/
let DiffManager;
const UpdatesManager = require("./UpdatesManager");
const DocumentUpdaterManager = require("./DocumentUpdaterManager");
const DiffGenerator = require("./DiffGenerator");
const logger = require("logger-sharelatex");
let DiffManager
const UpdatesManager = require('./UpdatesManager')
const DocumentUpdaterManager = require('./DocumentUpdaterManager')
const DiffGenerator = require('./DiffGenerator')
const logger = require('logger-sharelatex')
module.exports = (DiffManager = {
getLatestDocAndUpdates(project_id, doc_id, fromVersion, callback) {
// Get updates last, since then they must be ahead and it
// might be possible to rewind to the same version as the doc.
if (callback == null) { callback = function(error, content, version, updates) {}; }
return DocumentUpdaterManager.getDocument(project_id, doc_id, function(error, content, version) {
if (error != null) { return callback(error); }
if ((fromVersion == null)) { // If we haven't been given a version, just return lastest doc and no updates
return callback(null, content, version, []);
}
return UpdatesManager.getDocUpdatesWithUserInfo(project_id, doc_id, {from: fromVersion}, function(error, updates) {
if (error != null) { return callback(error); }
return callback(null, content, version, updates);
});
});
},
getDiff(project_id, doc_id, fromVersion, toVersion, callback) {
if (callback == null) { callback = function(error, diff) {}; }
return DiffManager.getDocumentBeforeVersion(project_id, doc_id, fromVersion, function(error, startingContent, updates) {
let diff;
if (error != null) {
if (error.message === "broken-history") {
return callback(null, "history unavailable");
} else {
return callback(error);
}
}
module.exports = DiffManager = {
getLatestDocAndUpdates(project_id, doc_id, fromVersion, callback) {
// Get updates last, since then they must be ahead and it
// might be possible to rewind to the same version as the doc.
if (callback == null) {
callback = function(error, content, version, updates) {}
}
return DocumentUpdaterManager.getDocument(project_id, doc_id, function(
error,
content,
version
) {
if (error != null) {
return callback(error)
}
if (fromVersion == null) {
// If we haven't been given a version, just return lastest doc and no updates
return callback(null, content, version, [])
}
return UpdatesManager.getDocUpdatesWithUserInfo(
project_id,
doc_id,
{ from: fromVersion },
function(error, updates) {
if (error != null) {
return callback(error)
}
return callback(null, content, version, updates)
}
)
})
},
const updatesToApply = [];
for (const update of Array.from(updates.slice().reverse())) {
if (update.v <= toVersion) {
updatesToApply.push(update);
}
}
getDiff(project_id, doc_id, fromVersion, toVersion, callback) {
if (callback == null) {
callback = function(error, diff) {}
}
return DiffManager.getDocumentBeforeVersion(
project_id,
doc_id,
fromVersion,
function(error, startingContent, updates) {
let diff
if (error != null) {
if (error.message === 'broken-history') {
return callback(null, 'history unavailable')
} else {
return callback(error)
}
}
try {
diff = DiffGenerator.buildDiff(startingContent, updatesToApply);
} catch (e) {
return callback(e);
}
return callback(null, diff);
});
},
const updatesToApply = []
for (const update of Array.from(updates.slice().reverse())) {
if (update.v <= toVersion) {
updatesToApply.push(update)
}
}
getDocumentBeforeVersion(project_id, doc_id, version, _callback) {
// Whichever order we get the latest document and the latest updates,
// there is potential for updates to be applied between them so that
// they do not return the same 'latest' versions.
// If this happens, we just retry and hopefully get them at the compatible
// versions.
let retry;
if (_callback == null) { _callback = function(error, document, rewoundUpdates) {}; }
let retries = 3;
const callback = function(error, ...args) {
if (error != null) {
if (error.retry && (retries > 0)) {
logger.warn({error, project_id, doc_id, version, retries}, "retrying getDocumentBeforeVersion");
return retry();
} else {
return _callback(error);
}
} else {
return _callback(null, ...Array.from(args));
}
};
try {
diff = DiffGenerator.buildDiff(startingContent, updatesToApply)
} catch (e) {
return callback(e)
}
return (retry = function() {
retries--;
return DiffManager._tryGetDocumentBeforeVersion(project_id, doc_id, version, callback);
})();
},
return callback(null, diff)
}
)
},
_tryGetDocumentBeforeVersion(project_id, doc_id, version, callback) {
if (callback == null) { callback = function(error, document, rewoundUpdates) {}; }
logger.log({project_id, doc_id, version}, "getting document before version");
return DiffManager.getLatestDocAndUpdates(project_id, doc_id, version, function(error, content, version, updates) {
let startingContent;
if (error != null) { return callback(error); }
getDocumentBeforeVersion(project_id, doc_id, version, _callback) {
// Whichever order we get the latest document and the latest updates,
// there is potential for updates to be applied between them so that
// they do not return the same 'latest' versions.
// If this happens, we just retry and hopefully get them at the compatible
// versions.
let retry
if (_callback == null) {
_callback = function(error, document, rewoundUpdates) {}
}
let retries = 3
const callback = function(error, ...args) {
if (error != null) {
if (error.retry && retries > 0) {
logger.warn(
{ error, project_id, doc_id, version, retries },
'retrying getDocumentBeforeVersion'
)
return retry()
} else {
return _callback(error)
}
} else {
return _callback(null, ...Array.from(args))
}
}
// bail out if we hit a broken update
for (const u of Array.from(updates)) {
if (u.broken) {
return callback(new Error("broken-history"));
}
}
return (retry = function() {
retries--
return DiffManager._tryGetDocumentBeforeVersion(
project_id,
doc_id,
version,
callback
)
})()
},
// discard any updates which are ahead of this document version
while ((updates[0] != null ? updates[0].v : undefined) >= version) {
updates.shift();
}
_tryGetDocumentBeforeVersion(project_id, doc_id, version, callback) {
if (callback == null) {
callback = function(error, document, rewoundUpdates) {}
}
logger.log(
{ project_id, doc_id, version },
'getting document before version'
)
return DiffManager.getLatestDocAndUpdates(
project_id,
doc_id,
version,
function(error, content, version, updates) {
let startingContent
if (error != null) {
return callback(error)
}
const lastUpdate = updates[0];
if ((lastUpdate != null) && (lastUpdate.v !== (version - 1))) {
error = new Error(`latest update version, ${lastUpdate.v}, does not match doc version, ${version}`);
error.retry = true;
return callback(error);
}
logger.log({docVersion: version, lastUpdateVersion: (lastUpdate != null ? lastUpdate.v : undefined), updateCount: updates.length}, "rewinding updates");
// bail out if we hit a broken update
for (const u of Array.from(updates)) {
if (u.broken) {
return callback(new Error('broken-history'))
}
}
const tryUpdates = updates.slice().reverse();
// discard any updates which are ahead of this document version
while ((updates[0] != null ? updates[0].v : undefined) >= version) {
updates.shift()
}
try {
startingContent = DiffGenerator.rewindUpdates(content, tryUpdates);
// tryUpdates is reversed, and any unapplied ops are marked as broken
} catch (e) {
return callback(e);
}
const lastUpdate = updates[0]
if (lastUpdate != null && lastUpdate.v !== version - 1) {
error = new Error(
`latest update version, ${lastUpdate.v}, does not match doc version, ${version}`
)
error.retry = true
return callback(error)
}
return callback(null, startingContent, tryUpdates);
});
}
});
logger.log(
{
docVersion: version,
lastUpdateVersion: lastUpdate != null ? lastUpdate.v : undefined,
updateCount: updates.length
},
'rewinding updates'
)
const tryUpdates = updates.slice().reverse()
try {
startingContent = DiffGenerator.rewindUpdates(content, tryUpdates)
// tryUpdates is reversed, and any unapplied ops are marked as broken
} catch (e) {
return callback(e)
}
return callback(null, startingContent, tryUpdates)
}
)
}
}

View file

@ -11,60 +11,80 @@
* DS207: Consider shorter variations of null checks
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
*/
let DocumentUpdaterManager;
const request = require("request");
const logger = require("logger-sharelatex");
const Settings = require("settings-sharelatex");
let DocumentUpdaterManager
const request = require('request')
const logger = require('logger-sharelatex')
const Settings = require('settings-sharelatex')
module.exports = (DocumentUpdaterManager = {
getDocument(project_id, doc_id, callback) {
if (callback == null) { callback = function(error, content, version) {}; }
const url = `${Settings.apis.documentupdater.url}/project/${project_id}/doc/${doc_id}`;
logger.log({project_id, doc_id}, "getting doc from document updater");
return request.get(url, function(error, res, body){
if (error != null) {
return callback(error);
}
if ((res.statusCode >= 200) && (res.statusCode < 300)) {
try {
body = JSON.parse(body);
} catch (error1) {
error = error1;
return callback(error);
}
logger.log({project_id, doc_id, version: body.version}, "got doc from document updater");
return callback(null, body.lines.join("\n"), body.version);
} else {
error = new Error(`doc updater returned a non-success status code: ${res.statusCode}`);
logger.error({err: error, project_id, doc_id, url}, "error accessing doc updater");
return callback(error);
}
});
},
module.exports = DocumentUpdaterManager = {
getDocument(project_id, doc_id, callback) {
if (callback == null) {
callback = function(error, content, version) {}
}
const url = `${Settings.apis.documentupdater.url}/project/${project_id}/doc/${doc_id}`
logger.log({ project_id, doc_id }, 'getting doc from document updater')
return request.get(url, function(error, res, body) {
if (error != null) {
return callback(error)
}
if (res.statusCode >= 200 && res.statusCode < 300) {
try {
body = JSON.parse(body)
} catch (error1) {
error = error1
return callback(error)
}
logger.log(
{ project_id, doc_id, version: body.version },
'got doc from document updater'
)
return callback(null, body.lines.join('\n'), body.version)
} else {
error = new Error(
`doc updater returned a non-success status code: ${res.statusCode}`
)
logger.error(
{ err: error, project_id, doc_id, url },
'error accessing doc updater'
)
return callback(error)
}
})
},
setDocument(project_id, doc_id, content, user_id, callback) {
if (callback == null) { callback = function(error) {}; }
const url = `${Settings.apis.documentupdater.url}/project/${project_id}/doc/${doc_id}`;
logger.log({project_id, doc_id}, "setting doc in document updater");
return request.post({
url,
json: {
lines: content.split("\n"),
source: "restore",
user_id,
undoing: true
}
}, function(error, res, body){
if (error != null) {
return callback(error);
}
if ((res.statusCode >= 200) && (res.statusCode < 300)) {
return callback(null);
} else {
error = new Error(`doc updater returned a non-success status code: ${res.statusCode}`);
logger.error({err: error, project_id, doc_id, url}, "error accessing doc updater");
return callback(error);
}
});
}
});
setDocument(project_id, doc_id, content, user_id, callback) {
if (callback == null) {
callback = function(error) {}
}
const url = `${Settings.apis.documentupdater.url}/project/${project_id}/doc/${doc_id}`
logger.log({ project_id, doc_id }, 'setting doc in document updater')
return request.post(
{
url,
json: {
lines: content.split('\n'),
source: 'restore',
user_id,
undoing: true
}
},
function(error, res, body) {
if (error != null) {
return callback(error)
}
if (res.statusCode >= 200 && res.statusCode < 300) {
return callback(null)
} else {
error = new Error(
`doc updater returned a non-success status code: ${res.statusCode}`
)
logger.error(
{ err: error, project_id, doc_id, url },
'error accessing doc updater'
)
return callback(error)
}
}
)
}
}

View file

@ -10,61 +10,75 @@
* DS207: Consider shorter variations of null checks
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
*/
const { ObjectId } = require("mongojs");
const request = require("request");
const async = require("async");
const settings = require("settings-sharelatex");
const { port } = settings.internal.trackchanges;
const logger = require("logger-sharelatex");
const LockManager = require("./LockManager");
const { ObjectId } = require('mongojs')
const request = require('request')
const async = require('async')
const settings = require('settings-sharelatex')
const { port } = settings.internal.trackchanges
const logger = require('logger-sharelatex')
const LockManager = require('./LockManager')
module.exports = {
check(callback){
const project_id = ObjectId(settings.trackchanges.healthCheck.project_id);
const url = `http://localhost:${port}/project/${project_id}`;
logger.log({project_id}, "running health check");
const jobs = [
cb=>
request.get({url:`http://localhost:${port}/check_lock`, timeout:3000}, function(err, res, body) {
if (err != null) {
logger.err({err, project_id}, "error checking lock for health check");
return cb(err);
} else if ((res != null ? res.statusCode : undefined) !== 200) {
return cb(`status code not 200, it's ${res.statusCode}`);
} else {
return cb();
}
})
,
cb=>
request.post({url:`${url}/flush`, timeout:10000}, function(err, res, body) {
if (err != null) {
logger.err({err, project_id}, "error flushing for health check");
return cb(err);
} else if ((res != null ? res.statusCode : undefined) !== 204) {
return cb(`status code not 204, it's ${res.statusCode}`);
} else {
return cb();
}
})
,
cb=>
request.get({url:`${url}/updates`, timeout:10000}, function(err, res, body){
if (err != null) {
logger.err({err, project_id}, "error getting updates for health check");
return cb(err);
} else if ((res != null ? res.statusCode : undefined) !== 200) {
return cb(`status code not 200, it's ${res.statusCode}`);
} else {
return cb();
}
})
];
return async.series(jobs, callback);
},
check(callback) {
const project_id = ObjectId(settings.trackchanges.healthCheck.project_id)
const url = `http://localhost:${port}/project/${project_id}`
logger.log({ project_id }, 'running health check')
const jobs = [
cb =>
request.get(
{ url: `http://localhost:${port}/check_lock`, timeout: 3000 },
function(err, res, body) {
if (err != null) {
logger.err(
{ err, project_id },
'error checking lock for health check'
)
return cb(err)
} else if ((res != null ? res.statusCode : undefined) !== 200) {
return cb(`status code not 200, it's ${res.statusCode}`)
} else {
return cb()
}
}
),
cb =>
request.post({ url: `${url}/flush`, timeout: 10000 }, function(
err,
res,
body
) {
if (err != null) {
logger.err({ err, project_id }, 'error flushing for health check')
return cb(err)
} else if ((res != null ? res.statusCode : undefined) !== 204) {
return cb(`status code not 204, it's ${res.statusCode}`)
} else {
return cb()
}
}),
cb =>
request.get({ url: `${url}/updates`, timeout: 10000 }, function(
err,
res,
body
) {
if (err != null) {
logger.err(
{ err, project_id },
'error getting updates for health check'
)
return cb(err)
} else if ((res != null ? res.statusCode : undefined) !== 200) {
return cb(`status code not 200, it's ${res.statusCode}`)
} else {
return cb()
}
})
]
return async.series(jobs, callback)
},
checkLock(callback) {
return LockManager.healthCheck(callback);
}
};
checkLock(callback) {
return LockManager.healthCheck(callback)
}
}

View file

@ -12,191 +12,263 @@
* DS207: Consider shorter variations of null checks
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
*/
let HttpController;
const UpdatesManager = require("./UpdatesManager");
const DiffManager = require("./DiffManager");
const PackManager = require("./PackManager");
const RestoreManager = require("./RestoreManager");
const logger = require("logger-sharelatex");
const HealthChecker = require("./HealthChecker");
const _ = require("underscore");
let HttpController
const UpdatesManager = require('./UpdatesManager')
const DiffManager = require('./DiffManager')
const PackManager = require('./PackManager')
const RestoreManager = require('./RestoreManager')
const logger = require('logger-sharelatex')
const HealthChecker = require('./HealthChecker')
const _ = require('underscore')
module.exports = (HttpController = {
flushDoc(req, res, next) {
if (next == null) { next = function(error) {}; }
const { doc_id } = req.params;
const { project_id } = req.params;
logger.log({project_id, doc_id}, "compressing doc history");
return UpdatesManager.processUncompressedUpdatesWithLock(project_id, doc_id, function(error) {
if (error != null) { return next(error); }
return res.send(204);
});
},
module.exports = HttpController = {
flushDoc(req, res, next) {
if (next == null) {
next = function(error) {}
}
const { doc_id } = req.params
const { project_id } = req.params
logger.log({ project_id, doc_id }, 'compressing doc history')
return UpdatesManager.processUncompressedUpdatesWithLock(
project_id,
doc_id,
function(error) {
if (error != null) {
return next(error)
}
return res.send(204)
}
)
},
flushProject(req, res, next) {
if (next == null) { next = function(error) {}; }
const { project_id } = req.params;
logger.log({project_id}, "compressing project history");
return UpdatesManager.processUncompressedUpdatesForProject(project_id, function(error) {
if (error != null) { return next(error); }
return res.send(204);
});
},
flushProject(req, res, next) {
if (next == null) {
next = function(error) {}
}
const { project_id } = req.params
logger.log({ project_id }, 'compressing project history')
return UpdatesManager.processUncompressedUpdatesForProject(
project_id,
function(error) {
if (error != null) {
return next(error)
}
return res.send(204)
}
)
},
flushAll(req, res, next) {
// limit on projects to flush or -1 for all (default)
if (next == null) { next = function(error) {}; }
const limit = (req.query.limit != null) ? parseInt(req.query.limit, 10) : -1;
logger.log({limit}, "flushing all projects");
return UpdatesManager.flushAll(limit, function(error, result) {
if (error != null) { return next(error); }
const {failed, succeeded, all} = result;
const status = `${succeeded.length} succeeded, ${failed.length} failed`;
if (limit === 0) {
return res.status(200).send(`${status}\nwould flush:\n${all.join('\n')}\n`);
} else if (failed.length > 0) {
logger.log({failed, succeeded}, "error flushing projects");
return res.status(500).send(`${status}\nfailed to flush:\n${failed.join('\n')}\n`);
} else {
return res.status(200).send(`${status}\nflushed ${succeeded.length} projects of ${all.length}\n`);
}
});
},
flushAll(req, res, next) {
// limit on projects to flush or -1 for all (default)
if (next == null) {
next = function(error) {}
}
const limit = req.query.limit != null ? parseInt(req.query.limit, 10) : -1
logger.log({ limit }, 'flushing all projects')
return UpdatesManager.flushAll(limit, function(error, result) {
if (error != null) {
return next(error)
}
const { failed, succeeded, all } = result
const status = `${succeeded.length} succeeded, ${failed.length} failed`
if (limit === 0) {
return res
.status(200)
.send(`${status}\nwould flush:\n${all.join('\n')}\n`)
} else if (failed.length > 0) {
logger.log({ failed, succeeded }, 'error flushing projects')
return res
.status(500)
.send(`${status}\nfailed to flush:\n${failed.join('\n')}\n`)
} else {
return res
.status(200)
.send(
`${status}\nflushed ${succeeded.length} projects of ${all.length}\n`
)
}
})
},
checkDanglingUpdates(req, res, next) {
if (next == null) { next = function(error) {}; }
logger.log("checking dangling updates");
return UpdatesManager.getDanglingUpdates(function(error, result) {
if (error != null) { return next(error); }
if (result.length > 0) {
logger.log({dangling: result}, "found dangling updates");
return res.status(500).send(`dangling updates:\n${result.join('\n')}\n`);
} else {
return res.status(200).send("no dangling updates found\n");
}
});
},
checkDanglingUpdates(req, res, next) {
if (next == null) {
next = function(error) {}
}
logger.log('checking dangling updates')
return UpdatesManager.getDanglingUpdates(function(error, result) {
if (error != null) {
return next(error)
}
if (result.length > 0) {
logger.log({ dangling: result }, 'found dangling updates')
return res.status(500).send(`dangling updates:\n${result.join('\n')}\n`)
} else {
return res.status(200).send('no dangling updates found\n')
}
})
},
checkDoc(req, res, next) {
if (next == null) { next = function(error) {}; }
const { doc_id } = req.params;
const { project_id } = req.params;
logger.log({project_id, doc_id}, "checking doc history");
return DiffManager.getDocumentBeforeVersion(project_id, doc_id, 1, function(error, document, rewoundUpdates) {
if (error != null) { return next(error); }
const broken = [];
for (const update of Array.from(rewoundUpdates)) {
for (const op of Array.from(update.op)) {
if (op.broken === true) {
broken.push(op);
}
}
}
if (broken.length > 0) {
return res.send(broken);
} else {
return res.send(204);
}
});
},
checkDoc(req, res, next) {
if (next == null) {
next = function(error) {}
}
const { doc_id } = req.params
const { project_id } = req.params
logger.log({ project_id, doc_id }, 'checking doc history')
return DiffManager.getDocumentBeforeVersion(project_id, doc_id, 1, function(
error,
document,
rewoundUpdates
) {
if (error != null) {
return next(error)
}
const broken = []
for (const update of Array.from(rewoundUpdates)) {
for (const op of Array.from(update.op)) {
if (op.broken === true) {
broken.push(op)
}
}
}
if (broken.length > 0) {
return res.send(broken)
} else {
return res.send(204)
}
})
},
getDiff(req, res, next) {
let from, to;
if (next == null) { next = function(error) {}; }
const { doc_id } = req.params;
const { project_id } = req.params;
getDiff(req, res, next) {
let from, to
if (next == null) {
next = function(error) {}
}
const { doc_id } = req.params
const { project_id } = req.params
if (req.query.from != null) {
from = parseInt(req.query.from, 10);
} else {
from = null;
}
if (req.query.to != null) {
to = parseInt(req.query.to, 10);
} else {
to = null;
}
if (req.query.from != null) {
from = parseInt(req.query.from, 10)
} else {
from = null
}
if (req.query.to != null) {
to = parseInt(req.query.to, 10)
} else {
to = null
}
logger.log({project_id, doc_id, from, to}, "getting diff");
return DiffManager.getDiff(project_id, doc_id, from, to, function(error, diff) {
if (error != null) { return next(error); }
return res.json({diff});
});
},
logger.log({ project_id, doc_id, from, to }, 'getting diff')
return DiffManager.getDiff(project_id, doc_id, from, to, function(
error,
diff
) {
if (error != null) {
return next(error)
}
return res.json({ diff })
})
},
getUpdates(req, res, next) {
let before, min_count;
if (next == null) { next = function(error) {}; }
const { project_id } = req.params;
getUpdates(req, res, next) {
let before, min_count
if (next == null) {
next = function(error) {}
}
const { project_id } = req.params
if (req.query.before != null) {
before = parseInt(req.query.before, 10);
}
if (req.query.min_count != null) {
min_count = parseInt(req.query.min_count, 10);
}
if (req.query.before != null) {
before = parseInt(req.query.before, 10)
}
if (req.query.min_count != null) {
min_count = parseInt(req.query.min_count, 10)
}
return UpdatesManager.getSummarizedProjectUpdates(project_id, {before, min_count}, function(error, updates, nextBeforeTimestamp) {
if (error != null) { return next(error); }
return res.json({
updates,
nextBeforeTimestamp
});
});
},
return UpdatesManager.getSummarizedProjectUpdates(
project_id,
{ before, min_count },
function(error, updates, nextBeforeTimestamp) {
if (error != null) {
return next(error)
}
return res.json({
updates,
nextBeforeTimestamp
})
}
)
},
restore(req, res, next) {
if (next == null) { next = function(error) {}; }
let {doc_id, project_id, version} = req.params;
const user_id = req.headers["x-user-id"];
version = parseInt(version, 10);
return RestoreManager.restoreToBeforeVersion(project_id, doc_id, version, user_id, function(error) {
if (error != null) { return next(error); }
return res.send(204);
});
},
restore(req, res, next) {
if (next == null) {
next = function(error) {}
}
let { doc_id, project_id, version } = req.params
const user_id = req.headers['x-user-id']
version = parseInt(version, 10)
return RestoreManager.restoreToBeforeVersion(
project_id,
doc_id,
version,
user_id,
function(error) {
if (error != null) {
return next(error)
}
return res.send(204)
}
)
},
pushDocHistory(req, res, next) {
if (next == null) { next = function(error) {}; }
const { project_id } = req.params;
const { doc_id } = req.params;
logger.log({project_id, doc_id}, "pushing all finalised changes to s3");
return PackManager.pushOldPacks(project_id, doc_id, function(error) {
if (error != null) { return next(error); }
return res.send(204);
});
},
pushDocHistory(req, res, next) {
if (next == null) {
next = function(error) {}
}
const { project_id } = req.params
const { doc_id } = req.params
logger.log({ project_id, doc_id }, 'pushing all finalised changes to s3')
return PackManager.pushOldPacks(project_id, doc_id, function(error) {
if (error != null) {
return next(error)
}
return res.send(204)
})
},
pullDocHistory(req, res, next) {
if (next == null) { next = function(error) {}; }
const { project_id } = req.params;
const { doc_id } = req.params;
logger.log({project_id, doc_id}, "pulling all packs from s3");
return PackManager.pullOldPacks(project_id, doc_id, function(error) {
if (error != null) { return next(error); }
return res.send(204);
});
},
pullDocHistory(req, res, next) {
if (next == null) {
next = function(error) {}
}
const { project_id } = req.params
const { doc_id } = req.params
logger.log({ project_id, doc_id }, 'pulling all packs from s3')
return PackManager.pullOldPacks(project_id, doc_id, function(error) {
if (error != null) {
return next(error)
}
return res.send(204)
})
},
healthCheck(req, res){
return HealthChecker.check(function(err){
if (err != null) {
logger.err({err}, "error performing health check");
return res.send(500);
} else {
return res.send(200);
}
});
},
healthCheck(req, res) {
return HealthChecker.check(function(err) {
if (err != null) {
logger.err({ err }, 'error performing health check')
return res.send(500)
} else {
return res.send(200)
}
})
},
checkLock(req, res){
return HealthChecker.checkLock(function(err) {
if (err != null) {
logger.err({err}, "error performing lock check");
return res.send(500);
} else {
return res.send(200);
}
});
}
});
checkLock(req, res) {
return HealthChecker.checkLock(function(err) {
if (err != null) {
logger.err({ err }, 'error performing lock check')
return res.send(500)
} else {
return res.send(200)
}
})
}
}

View file

@ -9,116 +9,149 @@
* DS207: Consider shorter variations of null checks
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
*/
let LockManager;
const Settings = require("settings-sharelatex");
const redis = require("redis-sharelatex");
const rclient = redis.createClient(Settings.redis.lock);
const os = require("os");
const crypto = require("crypto");
const logger = require("logger-sharelatex");
let LockManager
const Settings = require('settings-sharelatex')
const redis = require('redis-sharelatex')
const rclient = redis.createClient(Settings.redis.lock)
const os = require('os')
const crypto = require('crypto')
const logger = require('logger-sharelatex')
const HOST = os.hostname();
const PID = process.pid;
const RND = crypto.randomBytes(4).toString('hex');
let COUNT = 0;
const HOST = os.hostname()
const PID = process.pid
const RND = crypto.randomBytes(4).toString('hex')
let COUNT = 0
module.exports = (LockManager = {
LOCK_TEST_INTERVAL: 50, // 50ms between each test of the lock
MAX_LOCK_WAIT_TIME: 10000, // 10s maximum time to spend trying to get the lock
LOCK_TTL: 300, // seconds (allow 5 minutes for any operation to complete)
module.exports = LockManager = {
LOCK_TEST_INTERVAL: 50, // 50ms between each test of the lock
MAX_LOCK_WAIT_TIME: 10000, // 10s maximum time to spend trying to get the lock
LOCK_TTL: 300, // seconds (allow 5 minutes for any operation to complete)
// Use a signed lock value as described in
// http://redis.io/topics/distlock#correct-implementation-with-a-single-instance
// to prevent accidental unlocking by multiple processes
randomLock() {
const time = Date.now();
return `locked:host=${HOST}:pid=${PID}:random=${RND}:time=${time}:count=${COUNT++}`;
},
// Use a signed lock value as described in
// http://redis.io/topics/distlock#correct-implementation-with-a-single-instance
// to prevent accidental unlocking by multiple processes
randomLock() {
const time = Date.now()
return `locked:host=${HOST}:pid=${PID}:random=${RND}:time=${time}:count=${COUNT++}`
},
unlockScript: 'if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("del", KEYS[1]) else return 0 end',
unlockScript:
'if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("del", KEYS[1]) else return 0 end',
tryLock(key, callback) {
if (callback == null) { callback = function(err, gotLock) {}; }
const lockValue = LockManager.randomLock();
return rclient.set(key, lockValue, "EX", this.LOCK_TTL, "NX", function(err, gotLock){
if (err != null) { return callback(err); }
if (gotLock === "OK") {
return callback(err, true, lockValue);
} else {
return callback(err, false);
}
});
},
tryLock(key, callback) {
if (callback == null) {
callback = function(err, gotLock) {}
}
const lockValue = LockManager.randomLock()
return rclient.set(key, lockValue, 'EX', this.LOCK_TTL, 'NX', function(
err,
gotLock
) {
if (err != null) {
return callback(err)
}
if (gotLock === 'OK') {
return callback(err, true, lockValue)
} else {
return callback(err, false)
}
})
},
getLock(key, callback) {
let attempt;
if (callback == null) { callback = function(error) {}; }
const startTime = Date.now();
return (attempt = function() {
if ((Date.now() - startTime) > LockManager.MAX_LOCK_WAIT_TIME) {
const e = new Error("Timeout");
e.key = key;
return callback(e);
}
getLock(key, callback) {
let attempt
if (callback == null) {
callback = function(error) {}
}
const startTime = Date.now()
return (attempt = function() {
if (Date.now() - startTime > LockManager.MAX_LOCK_WAIT_TIME) {
const e = new Error('Timeout')
e.key = key
return callback(e)
}
return LockManager.tryLock(key, function(error, gotLock, lockValue) {
if (error != null) { return callback(error); }
if (gotLock) {
return callback(null, lockValue);
} else {
return setTimeout(attempt, LockManager.LOCK_TEST_INTERVAL);
}
});
})();
},
return LockManager.tryLock(key, function(error, gotLock, lockValue) {
if (error != null) {
return callback(error)
}
if (gotLock) {
return callback(null, lockValue)
} else {
return setTimeout(attempt, LockManager.LOCK_TEST_INTERVAL)
}
})
})()
},
checkLock(key, callback) {
if (callback == null) { callback = function(err, isFree) {}; }
return rclient.exists(key, function(err, exists) {
if (err != null) { return callback(err); }
exists = parseInt(exists);
if (exists === 1) {
return callback(err, false);
} else {
return callback(err, true);
}
});
},
checkLock(key, callback) {
if (callback == null) {
callback = function(err, isFree) {}
}
return rclient.exists(key, function(err, exists) {
if (err != null) {
return callback(err)
}
exists = parseInt(exists)
if (exists === 1) {
return callback(err, false)
} else {
return callback(err, true)
}
})
},
releaseLock(key, lockValue, callback) {
return rclient.eval(LockManager.unlockScript, 1, key, lockValue, function(err, result) {
if (err != null) {
return callback(err);
}
if ((result != null) && (result !== 1)) { // successful unlock should release exactly one key
logger.error({key, lockValue, redis_err:err, redis_result:result}, "unlocking error");
return callback(new Error("tried to release timed out lock"));
}
return callback(err,result);
});
},
releaseLock(key, lockValue, callback) {
return rclient.eval(LockManager.unlockScript, 1, key, lockValue, function(
err,
result
) {
if (err != null) {
return callback(err)
}
if (result != null && result !== 1) {
// successful unlock should release exactly one key
logger.error(
{ key, lockValue, redis_err: err, redis_result: result },
'unlocking error'
)
return callback(new Error('tried to release timed out lock'))
}
return callback(err, result)
})
},
runWithLock(key, runner, callback) {
if (callback == null) { callback = function(error) {}; }
return LockManager.getLock(key, function(error, lockValue) {
if (error != null) { return callback(error); }
return runner(error1 =>
LockManager.releaseLock(key, lockValue, function(error2) {
error = error1 || error2;
if (error != null) { return callback(error); }
return callback();
})
);
});
},
runWithLock(key, runner, callback) {
if (callback == null) {
callback = function(error) {}
}
return LockManager.getLock(key, function(error, lockValue) {
if (error != null) {
return callback(error)
}
return runner(error1 =>
LockManager.releaseLock(key, lockValue, function(error2) {
error = error1 || error2
if (error != null) {
return callback(error)
}
return callback()
})
)
})
},
healthCheck(callback) {
const action = releaseLock => releaseLock();
return LockManager.runWithLock(`HistoryLock:HealthCheck:host=${HOST}:pid=${PID}:random=${RND}`, action, callback);
},
healthCheck(callback) {
const action = releaseLock => releaseLock()
return LockManager.runWithLock(
`HistoryLock:HealthCheck:host=${HOST}:pid=${PID}:random=${RND}`,
action,
callback
)
},
close(callback) {
rclient.quit();
return rclient.once('end', callback);
}
});
close(callback) {
rclient.quit()
return rclient.once('end', callback)
}
}

View file

@ -13,137 +13,181 @@
* DS207: Consider shorter variations of null checks
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
*/
let MongoAWS;
const settings = require("settings-sharelatex");
const logger = require("logger-sharelatex");
const AWS = require('aws-sdk');
const S3S = require('s3-streams');
const {db, ObjectId} = require("./mongojs");
const JSONStream = require("JSONStream");
const ReadlineStream = require("byline");
const zlib = require("zlib");
const Metrics = require("metrics-sharelatex");
let MongoAWS
const settings = require('settings-sharelatex')
const logger = require('logger-sharelatex')
const AWS = require('aws-sdk')
const S3S = require('s3-streams')
const { db, ObjectId } = require('./mongojs')
const JSONStream = require('JSONStream')
const ReadlineStream = require('byline')
const zlib = require('zlib')
const Metrics = require('metrics-sharelatex')
const DAYS = 24 * 3600 * 1000; // one day in milliseconds
const DAYS = 24 * 3600 * 1000 // one day in milliseconds
const createStream = function(streamConstructor, project_id, doc_id, pack_id) {
const AWS_CONFIG = {
accessKeyId: settings.trackchanges.s3.key,
secretAccessKey: settings.trackchanges.s3.secret,
endpoint: settings.trackchanges.s3.endpoint,
s3ForcePathStyle: settings.trackchanges.s3.pathStyle
};
const AWS_CONFIG = {
accessKeyId: settings.trackchanges.s3.key,
secretAccessKey: settings.trackchanges.s3.secret,
endpoint: settings.trackchanges.s3.endpoint,
s3ForcePathStyle: settings.trackchanges.s3.pathStyle
}
return streamConstructor(new AWS.S3(AWS_CONFIG), {
"Bucket": settings.trackchanges.stores.doc_history,
"Key": project_id+"/changes-"+doc_id+"/pack-"+pack_id
});
};
return streamConstructor(new AWS.S3(AWS_CONFIG), {
Bucket: settings.trackchanges.stores.doc_history,
Key: project_id + '/changes-' + doc_id + '/pack-' + pack_id
})
}
module.exports = (MongoAWS = {
module.exports = MongoAWS = {
archivePack(project_id, doc_id, pack_id, _callback) {
if (_callback == null) {
_callback = function(error) {}
}
const callback = function(...args) {
_callback(...Array.from(args || []))
return (_callback = function() {})
}
archivePack(project_id, doc_id, pack_id, _callback) {
const query = {
_id: ObjectId(pack_id),
doc_id: ObjectId(doc_id)
}
if (_callback == null) { _callback = function(error) {}; }
const callback = function(...args) {
_callback(...Array.from(args || []));
return _callback = function() {};
};
if (project_id == null) {
return callback(new Error('invalid project id'))
}
if (doc_id == null) {
return callback(new Error('invalid doc id'))
}
if (pack_id == null) {
return callback(new Error('invalid pack id'))
}
const query = {
_id: ObjectId(pack_id),
doc_id: ObjectId(doc_id)
};
logger.log({ project_id, doc_id, pack_id }, 'uploading data to s3')
if ((project_id == null)) { return callback(new Error("invalid project id")); }
if ((doc_id == null)) { return callback(new Error("invalid doc id")); }
if ((pack_id == null)) { return callback(new Error("invalid pack id")); }
const upload = createStream(S3S.WriteStream, project_id, doc_id, pack_id)
logger.log({project_id, doc_id, pack_id}, "uploading data to s3");
return db.docHistory.findOne(query, function(err, result) {
if (err != null) {
return callback(err)
}
if (result == null) {
return callback(new Error('cannot find pack to send to s3'))
}
if (result.expiresAt != null) {
return callback(new Error('refusing to send pack with TTL to s3'))
}
const uncompressedData = JSON.stringify(result)
if (uncompressedData.indexOf('\u0000') !== -1) {
const error = new Error('null bytes found in upload')
logger.error({ err: error, project_id, doc_id, pack_id }, error.message)
return callback(error)
}
return zlib.gzip(uncompressedData, function(err, buf) {
logger.log(
{
project_id,
doc_id,
pack_id,
origSize: uncompressedData.length,
newSize: buf.length
},
'compressed pack'
)
if (err != null) {
return callback(err)
}
upload.on('error', err => callback(err))
upload.on('finish', function() {
Metrics.inc('archive-pack')
logger.log({ project_id, doc_id, pack_id }, 'upload to s3 completed')
return callback(null)
})
upload.write(buf)
return upload.end()
})
})
},
const upload = createStream(S3S.WriteStream, project_id, doc_id, pack_id);
readArchivedPack(project_id, doc_id, pack_id, _callback) {
if (_callback == null) {
_callback = function(error, result) {}
}
const callback = function(...args) {
_callback(...Array.from(args || []))
return (_callback = function() {})
}
return db.docHistory.findOne(query, function(err, result) {
if (err != null) { return callback(err); }
if ((result == null)) { return callback(new Error("cannot find pack to send to s3")); }
if (result.expiresAt != null) { return callback(new Error("refusing to send pack with TTL to s3")); }
const uncompressedData = JSON.stringify(result);
if (uncompressedData.indexOf("\u0000") !== -1) {
const error = new Error("null bytes found in upload");
logger.error({err: error, project_id, doc_id, pack_id}, error.message);
return callback(error);
}
return zlib.gzip(uncompressedData, function(err, buf) {
logger.log({project_id, doc_id, pack_id, origSize: uncompressedData.length, newSize: buf.length}, "compressed pack");
if (err != null) { return callback(err); }
upload.on('error', err => callback(err));
upload.on('finish', function() {
Metrics.inc("archive-pack");
logger.log({project_id, doc_id, pack_id}, "upload to s3 completed");
return callback(null);
});
upload.write(buf);
return upload.end();
});
});
},
if (project_id == null) {
return callback(new Error('invalid project id'))
}
if (doc_id == null) {
return callback(new Error('invalid doc id'))
}
if (pack_id == null) {
return callback(new Error('invalid pack id'))
}
readArchivedPack(project_id, doc_id, pack_id, _callback) {
if (_callback == null) { _callback = function(error, result) {}; }
const callback = function(...args) {
_callback(...Array.from(args || []));
return _callback = function() {};
};
logger.log({ project_id, doc_id, pack_id }, 'downloading data from s3')
if ((project_id == null)) { return callback(new Error("invalid project id")); }
if ((doc_id == null)) { return callback(new Error("invalid doc id")); }
if ((pack_id == null)) { return callback(new Error("invalid pack id")); }
const download = createStream(S3S.ReadStream, project_id, doc_id, pack_id)
logger.log({project_id, doc_id, pack_id}, "downloading data from s3");
const inputStream = download
.on('open', obj => 1)
.on('error', err => callback(err))
const download = createStream(S3S.ReadStream, project_id, doc_id, pack_id);
const gunzip = zlib.createGunzip()
gunzip.setEncoding('utf8')
gunzip.on('error', function(err) {
logger.log(
{ project_id, doc_id, pack_id, err },
'error uncompressing gzip stream'
)
return callback(err)
})
const inputStream = download
.on('open', obj => 1).on('error', err => callback(err));
const outputStream = inputStream.pipe(gunzip)
const parts = []
outputStream.on('error', err => callback(err))
outputStream.on('end', function() {
let object
logger.log({ project_id, doc_id, pack_id }, 'download from s3 completed')
try {
object = JSON.parse(parts.join(''))
} catch (e) {
return callback(e)
}
object._id = ObjectId(object._id)
object.doc_id = ObjectId(object.doc_id)
object.project_id = ObjectId(object.project_id)
for (const op of Array.from(object.pack)) {
if (op._id != null) {
op._id = ObjectId(op._id)
}
}
return callback(null, object)
})
return outputStream.on('data', data => parts.push(data))
},
const gunzip = zlib.createGunzip();
gunzip.setEncoding('utf8');
gunzip.on('error', function(err) {
logger.log({project_id, doc_id, pack_id, err}, "error uncompressing gzip stream");
return callback(err);
});
const outputStream = inputStream.pipe(gunzip);
const parts = [];
outputStream.on('error', err => callback(err));
outputStream.on('end', function() {
let object;
logger.log({project_id, doc_id, pack_id}, "download from s3 completed");
try {
object = JSON.parse(parts.join(''));
} catch (e) {
return callback(e);
}
object._id = ObjectId(object._id);
object.doc_id = ObjectId(object.doc_id);
object.project_id = ObjectId(object.project_id);
for (const op of Array.from(object.pack)) {
if (op._id != null) { op._id = ObjectId(op._id); }
}
return callback(null, object);
});
return outputStream.on('data', data => parts.push(data));
},
unArchivePack(project_id, doc_id, pack_id, callback) {
if (callback == null) { callback = function(error) {}; }
return MongoAWS.readArchivedPack(project_id, doc_id, pack_id, function(err, object) {
if (err != null) { return callback(err); }
Metrics.inc("unarchive-pack");
// allow the object to expire, we can always retrieve it again
object.expiresAt = new Date(Date.now() + (7 * DAYS));
logger.log({project_id, doc_id, pack_id}, "inserting object from s3");
return db.docHistory.insert(object, callback);
});
}
});
unArchivePack(project_id, doc_id, pack_id, callback) {
if (callback == null) {
callback = function(error) {}
}
return MongoAWS.readArchivedPack(project_id, doc_id, pack_id, function(
err,
object
) {
if (err != null) {
return callback(err)
}
Metrics.inc('unarchive-pack')
// allow the object to expire, we can always retrieve it again
object.expiresAt = new Date(Date.now() + 7 * DAYS)
logger.log({ project_id, doc_id, pack_id }, 'inserting object from s3')
return db.docHistory.insert(object, callback)
})
}
}

View file

@ -11,128 +11,200 @@
* DS207: Consider shorter variations of null checks
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
*/
let MongoManager;
const {db, ObjectId} = require("./mongojs");
const PackManager = require("./PackManager");
const async = require("async");
const _ = require("underscore");
const metrics = require('metrics-sharelatex');
const logger = require('logger-sharelatex');
let MongoManager
const { db, ObjectId } = require('./mongojs')
const PackManager = require('./PackManager')
const async = require('async')
const _ = require('underscore')
const metrics = require('metrics-sharelatex')
const logger = require('logger-sharelatex')
module.exports = (MongoManager = {
getLastCompressedUpdate(doc_id, callback) {
if (callback == null) { callback = function(error, update) {}; }
return db.docHistory
.find({doc_id: ObjectId(doc_id.toString())}, {pack: {$slice:-1}}) // only return the last entry in a pack
.sort({ v: -1 })
.limit(1)
.toArray(function(error, compressedUpdates) {
if (error != null) { return callback(error); }
return callback(null, compressedUpdates[0] || null);
});
},
module.exports = MongoManager = {
getLastCompressedUpdate(doc_id, callback) {
if (callback == null) {
callback = function(error, update) {}
}
return db.docHistory
.find({ doc_id: ObjectId(doc_id.toString()) }, { pack: { $slice: -1 } }) // only return the last entry in a pack
.sort({ v: -1 })
.limit(1)
.toArray(function(error, compressedUpdates) {
if (error != null) {
return callback(error)
}
return callback(null, compressedUpdates[0] || null)
})
},
peekLastCompressedUpdate(doc_id, callback) {
// under normal use we pass back the last update as
// callback(null,update,version).
//
// when we have an existing last update but want to force a new one
// to start, we pass it back as callback(null,null,version), just
// giving the version so we can check consistency.
if (callback == null) { callback = function(error, update, version) {}; }
return MongoManager.getLastCompressedUpdate(doc_id, function(error, update) {
if (error != null) { return callback(error); }
if (update != null) {
if (update.broken) { // marked as broken so we will force a new op
return callback(null, null);
} else if (update.pack != null) {
if (update.finalised) { // no more ops can be appended
return callback(null, null, update.pack[0] != null ? update.pack[0].v : undefined);
} else {
return callback(null, update, update.pack[0] != null ? update.pack[0].v : undefined);
}
} else {
return callback(null, update, update.v);
}
} else {
return PackManager.getLastPackFromIndex(doc_id, function(error, pack) {
if (error != null) { return callback(error); }
if (((pack != null ? pack.inS3 : undefined) != null) && ((pack != null ? pack.v_end : undefined) != null)) { return callback(null, null, pack.v_end); }
return callback(null, null);
});
}
});
},
peekLastCompressedUpdate(doc_id, callback) {
// under normal use we pass back the last update as
// callback(null,update,version).
//
// when we have an existing last update but want to force a new one
// to start, we pass it back as callback(null,null,version), just
// giving the version so we can check consistency.
if (callback == null) {
callback = function(error, update, version) {}
}
return MongoManager.getLastCompressedUpdate(doc_id, function(
error,
update
) {
if (error != null) {
return callback(error)
}
if (update != null) {
if (update.broken) {
// marked as broken so we will force a new op
return callback(null, null)
} else if (update.pack != null) {
if (update.finalised) {
// no more ops can be appended
return callback(
null,
null,
update.pack[0] != null ? update.pack[0].v : undefined
)
} else {
return callback(
null,
update,
update.pack[0] != null ? update.pack[0].v : undefined
)
}
} else {
return callback(null, update, update.v)
}
} else {
return PackManager.getLastPackFromIndex(doc_id, function(error, pack) {
if (error != null) {
return callback(error)
}
if (
(pack != null ? pack.inS3 : undefined) != null &&
(pack != null ? pack.v_end : undefined) != null
) {
return callback(null, null, pack.v_end)
}
return callback(null, null)
})
}
})
},
backportProjectId(project_id, doc_id, callback) {
if (callback == null) { callback = function(error) {}; }
return db.docHistory.update({
doc_id: ObjectId(doc_id.toString()),
project_id: { $exists: false }
}, {
$set: { project_id: ObjectId(project_id.toString()) }
}, {
multi: true
}, callback);
},
backportProjectId(project_id, doc_id, callback) {
if (callback == null) {
callback = function(error) {}
}
return db.docHistory.update(
{
doc_id: ObjectId(doc_id.toString()),
project_id: { $exists: false }
},
{
$set: { project_id: ObjectId(project_id.toString()) }
},
{
multi: true
},
callback
)
},
getProjectMetaData(project_id, callback) {
if (callback == null) { callback = function(error, metadata) {}; }
return db.projectHistoryMetaData.find({
project_id: ObjectId(project_id.toString())
}, function(error, results) {
if (error != null) { return callback(error); }
return callback(null, results[0]);
});
},
getProjectMetaData(project_id, callback) {
if (callback == null) {
callback = function(error, metadata) {}
}
return db.projectHistoryMetaData.find(
{
project_id: ObjectId(project_id.toString())
},
function(error, results) {
if (error != null) {
return callback(error)
}
return callback(null, results[0])
}
)
},
setProjectMetaData(project_id, metadata, callback) {
if (callback == null) { callback = function(error) {}; }
return db.projectHistoryMetaData.update({
project_id: ObjectId(project_id)
}, {
$set: metadata
}, {
upsert: true
}, callback);
},
setProjectMetaData(project_id, metadata, callback) {
if (callback == null) {
callback = function(error) {}
}
return db.projectHistoryMetaData.update(
{
project_id: ObjectId(project_id)
},
{
$set: metadata
},
{
upsert: true
},
callback
)
},
upgradeHistory(project_id, callback) {
// preserve the project's existing history
if (callback == null) { callback = function(error) {}; }
return db.docHistory.update({
project_id: ObjectId(project_id),
temporary: true,
expiresAt: {$exists: true}
}, {
$set: {temporary: false},
$unset: {expiresAt: ""}
}, {
multi: true
}, callback);
},
upgradeHistory(project_id, callback) {
// preserve the project's existing history
if (callback == null) {
callback = function(error) {}
}
return db.docHistory.update(
{
project_id: ObjectId(project_id),
temporary: true,
expiresAt: { $exists: true }
},
{
$set: { temporary: false },
$unset: { expiresAt: '' }
},
{
multi: true
},
callback
)
},
ensureIndices() {
// For finding all updates that go into a diff for a doc
db.docHistory.ensureIndex({ doc_id: 1, v: 1 }, { background: true });
// For finding all updates that affect a project
db.docHistory.ensureIndex({ project_id: 1, "meta.end_ts": 1 }, { background: true });
// For finding updates that don't yet have a project_id and need it inserting
db.docHistory.ensureIndex({ doc_id: 1, project_id: 1 }, { background: true });
// For finding project meta-data
db.projectHistoryMetaData.ensureIndex({ project_id: 1 }, { background: true });
// TTL index for auto deleting week old temporary ops
db.docHistory.ensureIndex({ expiresAt: 1 }, { expireAfterSeconds: 0, background: true });
// For finding packs to be checked for archiving
db.docHistory.ensureIndex({ last_checked: 1 }, { background: true });
// For finding archived packs
return db.docHistoryIndex.ensureIndex({ project_id: 1 }, { background: true });
}
});
ensureIndices() {
// For finding all updates that go into a diff for a doc
db.docHistory.ensureIndex({ doc_id: 1, v: 1 }, { background: true })
// For finding all updates that affect a project
db.docHistory.ensureIndex(
{ project_id: 1, 'meta.end_ts': 1 },
{ background: true }
)
// For finding updates that don't yet have a project_id and need it inserting
db.docHistory.ensureIndex(
{ doc_id: 1, project_id: 1 },
{ background: true }
)
// For finding project meta-data
db.projectHistoryMetaData.ensureIndex(
{ project_id: 1 },
{ background: true }
)
// TTL index for auto deleting week old temporary ops
db.docHistory.ensureIndex(
{ expiresAt: 1 },
{ expireAfterSeconds: 0, background: true }
)
// For finding packs to be checked for archiving
db.docHistory.ensureIndex({ last_checked: 1 }, { background: true })
// For finding archived packs
return db.docHistoryIndex.ensureIndex(
{ project_id: 1 },
{ background: true }
)
}
}
[
'getLastCompressedUpdate',
'getProjectMetaData',
'setProjectMetaData'
].map(method => metrics.timeAsyncMethod(MongoManager, method, 'mongo.MongoManager', logger));
;[
'getLastCompressedUpdate',
'getProjectMetaData',
'setProjectMetaData'
].map(method =>
metrics.timeAsyncMethod(MongoManager, method, 'mongo.MongoManager', logger)
)

File diff suppressed because it is too large Load diff

View file

@ -13,177 +13,199 @@
* DS207: Consider shorter variations of null checks
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
*/
let LIMIT, pending;
let project_id, doc_id;
const Settings = require("settings-sharelatex");
const async = require("async");
const _ = require("underscore");
const {db, ObjectId, BSON} = require("./mongojs");
const fs = require("fs");
const Metrics = require("metrics-sharelatex");
Metrics.initialize("track-changes");
const logger = require("logger-sharelatex");
logger.initialize("track-changes-packworker");
let LIMIT, pending
let project_id, doc_id
const Settings = require('settings-sharelatex')
const async = require('async')
const _ = require('underscore')
const { db, ObjectId, BSON } = require('./mongojs')
const fs = require('fs')
const Metrics = require('metrics-sharelatex')
Metrics.initialize('track-changes')
const logger = require('logger-sharelatex')
logger.initialize('track-changes-packworker')
if ((Settings.sentry != null ? Settings.sentry.dsn : undefined) != null) {
logger.initializeErrorReporting(Settings.sentry.dsn);
logger.initializeErrorReporting(Settings.sentry.dsn)
}
const DAYS = 24 * 3600 * 1000;
const DAYS = 24 * 3600 * 1000
const LockManager = require("./LockManager");
const PackManager = require("./PackManager");
const LockManager = require('./LockManager')
const PackManager = require('./PackManager')
// this worker script is forked by the main process to look for
// document histories which can be archived
const source = process.argv[2];
const DOCUMENT_PACK_DELAY = Number(process.argv[3]) || 1000;
const TIMEOUT = Number(process.argv[4]) || (30*60*1000);
let COUNT = 0; // number processed
let TOTAL = 0; // total number to process
const source = process.argv[2]
const DOCUMENT_PACK_DELAY = Number(process.argv[3]) || 1000
const TIMEOUT = Number(process.argv[4]) || 30 * 60 * 1000
let COUNT = 0 // number processed
let TOTAL = 0 // total number to process
if (!source.match(/^[0-9]+$/)) {
const file = fs.readFileSync(source);
const result = (() => {
const result1 = [];
for (const line of Array.from(file.toString().split('\n'))) {
[project_id, doc_id] = Array.from(line.split(' '));
result1.push({doc_id, project_id});
}
return result1;
})();
pending = _.filter(result, row => __guard__(row != null ? row.doc_id : undefined, x => x.match(/^[a-f0-9]{24}$/)));
const file = fs.readFileSync(source)
const result = (() => {
const result1 = []
for (const line of Array.from(file.toString().split('\n'))) {
;[project_id, doc_id] = Array.from(line.split(' '))
result1.push({ doc_id, project_id })
}
return result1
})()
pending = _.filter(result, row =>
__guard__(row != null ? row.doc_id : undefined, x =>
x.match(/^[a-f0-9]{24}$/)
)
)
} else {
LIMIT = Number(process.argv[2]) || 1000;
LIMIT = Number(process.argv[2]) || 1000
}
let shutDownRequested = false;
let shutDownRequested = false
const shutDownTimer = setTimeout(function() {
logger.log("pack timed out, requesting shutdown");
// start the shutdown on the next pack
shutDownRequested = true;
// do a hard shutdown after a further 5 minutes
const hardTimeout = setTimeout(function() {
logger.error("HARD TIMEOUT in pack archive worker");
return process.exit();
}
, 5*60*1000);
return hardTimeout.unref();
}
, TIMEOUT);
logger.log('pack timed out, requesting shutdown')
// start the shutdown on the next pack
shutDownRequested = true
// do a hard shutdown after a further 5 minutes
const hardTimeout = setTimeout(function() {
logger.error('HARD TIMEOUT in pack archive worker')
return process.exit()
}, 5 * 60 * 1000)
return hardTimeout.unref()
}, TIMEOUT)
logger.log(`checking for updates, limit=${LIMIT}, delay=${DOCUMENT_PACK_DELAY}, timeout=${TIMEOUT}`);
logger.log(
`checking for updates, limit=${LIMIT}, delay=${DOCUMENT_PACK_DELAY}, timeout=${TIMEOUT}`
)
// work around for https://github.com/mafintosh/mongojs/issues/224
db.close = function(callback) {
return this._getServer(function(err, server) {
if (err != null) { return callback(err); }
server = (server.destroy != null) ? server : server.topology;
server.destroy(true, true);
return callback();
});
};
db.close = function(callback) {
return this._getServer(function(err, server) {
if (err != null) {
return callback(err)
}
server = server.destroy != null ? server : server.topology
server.destroy(true, true)
return callback()
})
}
const finish = function() {
if (shutDownTimer != null) {
logger.log('cancelling timeout');
clearTimeout(shutDownTimer);
}
logger.log('closing db');
return db.close(function() {
logger.log('closing LockManager Redis Connection');
return LockManager.close(function() {
logger.log({processedCount: COUNT, allCount: TOTAL}, 'ready to exit from pack archive worker');
const hardTimeout = setTimeout(function() {
logger.error('hard exit from pack archive worker');
return process.exit(1);
}
, 5*1000);
return hardTimeout.unref();
});
});
};
if (shutDownTimer != null) {
logger.log('cancelling timeout')
clearTimeout(shutDownTimer)
}
logger.log('closing db')
return db.close(function() {
logger.log('closing LockManager Redis Connection')
return LockManager.close(function() {
logger.log(
{ processedCount: COUNT, allCount: TOTAL },
'ready to exit from pack archive worker'
)
const hardTimeout = setTimeout(function() {
logger.error('hard exit from pack archive worker')
return process.exit(1)
}, 5 * 1000)
return hardTimeout.unref()
})
})
}
process.on('exit', code => logger.log({code}, 'pack archive worker exited'));
process.on('exit', code => logger.log({ code }, 'pack archive worker exited'))
const processUpdates = pending =>
async.eachSeries(pending, function(result, callback) {
let _id;
({_id, project_id, doc_id} = result);
COUNT++;
logger.log({project_id, doc_id}, `processing ${COUNT}/${TOTAL}`);
if ((project_id == null) || (doc_id == null)) {
logger.log({project_id, doc_id}, "skipping pack, missing project/doc id");
return callback();
}
const handler = function(err, result) {
if ((err != null) && (err.code === "InternalError") && err.retryable) {
logger.warn({err, result}, "ignoring S3 error in pack archive worker");
// Ignore any s3 errors due to random problems
err = null;
}
if (err != null) {
logger.error({err, result}, "error in pack archive worker");
return callback(err);
}
if (shutDownRequested) {
logger.warn("shutting down pack archive worker");
return callback(new Error("shutdown"));
}
return setTimeout(() => callback(err, result)
, DOCUMENT_PACK_DELAY);
};
if ((_id == null)) {
return PackManager.pushOldPacks(project_id, doc_id, handler);
} else {
return PackManager.processOldPack(project_id, doc_id, _id, handler);
}
}
, function(err, results) {
if ((err != null) && (err.message !== "shutdown")) {
logger.error({err}, 'error in pack archive worker processUpdates');
}
return finish();
})
;
async.eachSeries(
pending,
function(result, callback) {
let _id
;({ _id, project_id, doc_id } = result)
COUNT++
logger.log({ project_id, doc_id }, `processing ${COUNT}/${TOTAL}`)
if (project_id == null || doc_id == null) {
logger.log(
{ project_id, doc_id },
'skipping pack, missing project/doc id'
)
return callback()
}
const handler = function(err, result) {
if (err != null && err.code === 'InternalError' && err.retryable) {
logger.warn(
{ err, result },
'ignoring S3 error in pack archive worker'
)
// Ignore any s3 errors due to random problems
err = null
}
if (err != null) {
logger.error({ err, result }, 'error in pack archive worker')
return callback(err)
}
if (shutDownRequested) {
logger.warn('shutting down pack archive worker')
return callback(new Error('shutdown'))
}
return setTimeout(() => callback(err, result), DOCUMENT_PACK_DELAY)
}
if (_id == null) {
return PackManager.pushOldPacks(project_id, doc_id, handler)
} else {
return PackManager.processOldPack(project_id, doc_id, _id, handler)
}
},
function(err, results) {
if (err != null && err.message !== 'shutdown') {
logger.error({ err }, 'error in pack archive worker processUpdates')
}
return finish()
}
)
// find the packs which can be archived
const ObjectIdFromDate = function(date) {
const id = Math.floor(date.getTime() / 1000).toString(16) + "0000000000000000";
return ObjectId(id);
};
const ObjectIdFromDate = function(date) {
const id = Math.floor(date.getTime() / 1000).toString(16) + '0000000000000000'
return ObjectId(id)
}
// new approach, two passes
// find packs to be marked as finalised:true, those which have a newer pack present
// then only consider finalised:true packs for archiving
if (pending != null) {
logger.log(`got ${pending.length} entries from ${source}`);
processUpdates(pending);
logger.log(`got ${pending.length} entries from ${source}`)
processUpdates(pending)
} else {
const oneWeekAgo = new Date(Date.now() - (7 * DAYS));
db.docHistory.find({
expiresAt: {$exists: false},
project_id: {$exists: true},
v_end: {$exists: true},
_id: {$lt: ObjectIdFromDate(oneWeekAgo)},
last_checked: {$lt: oneWeekAgo}
}, {_id:1, doc_id:1, project_id:1}).sort({
last_checked:1
}).limit(LIMIT, function(err, results) {
if (err != null) {
logger.log({err}, 'error checking for updates');
finish();
return;
}
pending = _.uniq(results, false, result => result.doc_id.toString());
TOTAL = pending.length;
logger.log(`found ${TOTAL} documents to archive`);
return processUpdates(pending);
});
const oneWeekAgo = new Date(Date.now() - 7 * DAYS)
db.docHistory
.find(
{
expiresAt: { $exists: false },
project_id: { $exists: true },
v_end: { $exists: true },
_id: { $lt: ObjectIdFromDate(oneWeekAgo) },
last_checked: { $lt: oneWeekAgo }
},
{ _id: 1, doc_id: 1, project_id: 1 }
)
.sort({
last_checked: 1
})
.limit(LIMIT, function(err, results) {
if (err != null) {
logger.log({ err }, 'error checking for updates')
finish()
return
}
pending = _.uniq(results, false, result => result.doc_id.toString())
TOTAL = pending.length
logger.log(`found ${TOTAL} documents to archive`)
return processUpdates(pending)
})
}
function __guard__(value, transform) {
return (typeof value !== 'undefined' && value !== null) ? transform(value) : undefined;
}
return typeof value !== 'undefined' && value !== null
? transform(value)
: undefined
}

View file

@ -11,80 +11,99 @@
* DS207: Consider shorter variations of null checks
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
*/
let ProjectIterator;
const Heap = require("heap");
let ProjectIterator
const Heap = require('heap')
module.exports = (ProjectIterator =
module.exports = ProjectIterator = ProjectIterator = class ProjectIterator {
constructor(packs, before, getPackByIdFn) {
this.before = before
this.getPackByIdFn = getPackByIdFn
const byEndTs = (a, b) =>
b.meta.end_ts - a.meta.end_ts || a.fromIndex - b.fromIndex
this.packs = packs.slice().sort(byEndTs)
this.queue = new Heap(byEndTs)
}
(ProjectIterator = class ProjectIterator {
constructor(packs, before, getPackByIdFn) {
this.before = before;
this.getPackByIdFn = getPackByIdFn;
const byEndTs = (a,b) => (b.meta.end_ts - a.meta.end_ts) || (a.fromIndex - b.fromIndex);
this.packs = packs.slice().sort(byEndTs);
this.queue = new Heap(byEndTs);
}
next(callback) {
// what's up next
// console.log ">>> top item", iterator.packs[0]
const iterator = this
const { before } = this
const { queue } = iterator
const opsToReturn = []
let nextPack = iterator.packs[0]
let lowWaterMark =
(nextPack != null ? nextPack.meta.end_ts : undefined) || 0
let nextItem = queue.peek()
next(callback) {
// what's up next
// console.log ">>> top item", iterator.packs[0]
const iterator = this;
const { before } = this;
const { queue } = iterator;
const opsToReturn = [];
let nextPack = iterator.packs[0];
let lowWaterMark = (nextPack != null ? nextPack.meta.end_ts : undefined) || 0;
let nextItem = queue.peek();
// console.log "queue empty?", queue.empty()
// console.log "nextItem", nextItem
// console.log "nextItem.meta.end_ts", nextItem?.meta.end_ts
// console.log "lowWaterMark", lowWaterMark
// console.log "queue empty?", queue.empty()
// console.log "nextItem", nextItem
// console.log "nextItem.meta.end_ts", nextItem?.meta.end_ts
// console.log "lowWaterMark", lowWaterMark
while (
before != null &&
(nextPack != null ? nextPack.meta.start_ts : undefined) > before
) {
// discard pack that is outside range
iterator.packs.shift()
nextPack = iterator.packs[0]
lowWaterMark = (nextPack != null ? nextPack.meta.end_ts : undefined) || 0
}
while ((before != null) && ((nextPack != null ? nextPack.meta.start_ts : undefined) > before)) {
// discard pack that is outside range
iterator.packs.shift();
nextPack = iterator.packs[0];
lowWaterMark = (nextPack != null ? nextPack.meta.end_ts : undefined) || 0;
}
if (
(queue.empty() ||
(nextItem != null ? nextItem.meta.end_ts : undefined) <=
lowWaterMark) &&
nextPack != null
) {
// retrieve the next pack and populate the queue
return this.getPackByIdFn(
nextPack.project_id,
nextPack.doc_id,
nextPack._id,
function(err, pack) {
if (err != null) {
return callback(err)
}
iterator.packs.shift() // have now retrieved this pack, remove it
// console.log "got pack", pack
for (const op of Array.from(pack.pack)) {
// console.log "adding op", op
if (before == null || op.meta.end_ts < before) {
op.doc_id = nextPack.doc_id
op.project_id = nextPack.project_id
queue.push(op)
}
}
// now try again
return iterator.next(callback)
}
)
}
if ((queue.empty() || ((nextItem != null ? nextItem.meta.end_ts : undefined) <= lowWaterMark)) && (nextPack != null)) {
// retrieve the next pack and populate the queue
return this.getPackByIdFn(nextPack.project_id, nextPack.doc_id, nextPack._id, function(err, pack) {
if (err != null) { return callback(err); }
iterator.packs.shift(); // have now retrieved this pack, remove it
// console.log "got pack", pack
for (const op of Array.from(pack.pack)) {
// console.log "adding op", op
if ((before == null) || (op.meta.end_ts < before)) {
op.doc_id = nextPack.doc_id;
op.project_id = nextPack.project_id;
queue.push(op);
}
}
// now try again
return iterator.next(callback);
});
}
// console.log "nextItem", nextItem, "lowWaterMark", lowWaterMark
while (
nextItem != null &&
(nextItem != null ? nextItem.meta.end_ts : undefined) > lowWaterMark
) {
opsToReturn.push(nextItem)
queue.pop()
nextItem = queue.peek()
}
// console.log "nextItem", nextItem, "lowWaterMark", lowWaterMark
while ((nextItem != null) && ((nextItem != null ? nextItem.meta.end_ts : undefined) > lowWaterMark)) {
opsToReturn.push(nextItem);
queue.pop();
nextItem = queue.peek();
}
// console.log "queue empty?", queue.empty()
// console.log "nextPack", nextPack?
// console.log "queue empty?", queue.empty()
// console.log "nextPack", nextPack?
if (queue.empty() && nextPack == null) {
// got everything
iterator._done = true
}
if (queue.empty() && (nextPack == null)) { // got everything
iterator._done = true;
}
return callback(null, opsToReturn)
}
return callback(null, opsToReturn);
}
done() {
return this._done;
}
}));
done() {
return this._done
}
}

View file

@ -12,116 +12,155 @@
* DS207: Consider shorter variations of null checks
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
*/
let RedisManager;
const Settings = require("settings-sharelatex");
const redis = require("redis-sharelatex");
const rclient = redis.createClient(Settings.redis.history);
const Keys = Settings.redis.history.key_schema;
const async = require("async");
let RedisManager
const Settings = require('settings-sharelatex')
const redis = require('redis-sharelatex')
const rclient = redis.createClient(Settings.redis.history)
const Keys = Settings.redis.history.key_schema
const async = require('async')
module.exports = (RedisManager = {
module.exports = RedisManager = {
getOldestDocUpdates(doc_id, batchSize, callback) {
if (callback == null) {
callback = function(error, jsonUpdates) {}
}
const key = Keys.uncompressedHistoryOps({ doc_id })
return rclient.lrange(key, 0, batchSize - 1, callback)
},
getOldestDocUpdates(doc_id, batchSize, callback) {
if (callback == null) { callback = function(error, jsonUpdates) {}; }
const key = Keys.uncompressedHistoryOps({doc_id});
return rclient.lrange(key, 0, batchSize - 1, callback);
},
expandDocUpdates(jsonUpdates, callback) {
let rawUpdates
if (callback == null) {
callback = function(error, rawUpdates) {}
}
try {
rawUpdates = Array.from(jsonUpdates || []).map(update =>
JSON.parse(update)
)
} catch (e) {
return callback(e)
}
return callback(null, rawUpdates)
},
expandDocUpdates(jsonUpdates, callback) {
let rawUpdates;
if (callback == null) { callback = function(error, rawUpdates) {}; }
try {
rawUpdates = ( Array.from(jsonUpdates || []).map((update) => JSON.parse(update)) );
} catch (e) {
return callback(e);
}
return callback(null, rawUpdates);
},
deleteAppliedDocUpdates(project_id, doc_id, docUpdates, callback) {
if (callback == null) {
callback = function(error) {}
}
const multi = rclient.multi()
// Delete all the updates which have been applied (exact match)
for (const update of Array.from(docUpdates || [])) {
multi.lrem(Keys.uncompressedHistoryOps({ doc_id }), 1, update)
}
return multi.exec(function(error, results) {
if (error != null) {
return callback(error)
}
// It's ok to delete the doc_id from the set here. Even though the list
// of updates may not be empty, we will continue to process it until it is.
return rclient.srem(
Keys.docsWithHistoryOps({ project_id }),
doc_id,
function(error) {
if (error != null) {
return callback(error)
}
return callback(null)
}
)
})
},
deleteAppliedDocUpdates(project_id, doc_id, docUpdates, callback) {
if (callback == null) { callback = function(error) {}; }
const multi = rclient.multi();
// Delete all the updates which have been applied (exact match)
for (const update of Array.from(docUpdates || [])) {
multi.lrem(Keys.uncompressedHistoryOps({doc_id}), 1, update);
}
return multi.exec(function(error, results) {
if (error != null) { return callback(error); }
// It's ok to delete the doc_id from the set here. Even though the list
// of updates may not be empty, we will continue to process it until it is.
return rclient.srem(Keys.docsWithHistoryOps({project_id}), doc_id, function(error) {
if (error != null) { return callback(error); }
return callback(null);
});
});
},
getDocIdsWithHistoryOps(project_id, callback) {
if (callback == null) {
callback = function(error, doc_ids) {}
}
return rclient.smembers(Keys.docsWithHistoryOps({ project_id }), callback)
},
getDocIdsWithHistoryOps(project_id, callback) {
if (callback == null) { callback = function(error, doc_ids) {}; }
return rclient.smembers(Keys.docsWithHistoryOps({project_id}), callback);
},
// iterate over keys asynchronously using redis scan (non-blocking)
// handle all the cluster nodes or single redis server
_getKeys(pattern, callback) {
const nodes = (typeof rclient.nodes === 'function'
? rclient.nodes('master')
: undefined) || [rclient]
const doKeyLookupForNode = (node, cb) =>
RedisManager._getKeysFromNode(node, pattern, cb)
return async.concatSeries(nodes, doKeyLookupForNode, callback)
},
// iterate over keys asynchronously using redis scan (non-blocking)
// handle all the cluster nodes or single redis server
_getKeys(pattern, callback) {
const nodes = (typeof rclient.nodes === 'function' ? rclient.nodes('master') : undefined) || [ rclient ];
const doKeyLookupForNode = (node, cb) => RedisManager._getKeysFromNode(node, pattern, cb);
return async.concatSeries(nodes, doKeyLookupForNode, callback);
},
_getKeysFromNode(node, pattern, callback) {
let cursor = 0 // redis iterator
const keySet = {} // use hash to avoid duplicate results
// scan over all keys looking for pattern
var doIteration = cb =>
node.scan(cursor, 'MATCH', pattern, 'COUNT', 1000, function(
error,
reply
) {
let keys
if (error != null) {
return callback(error)
}
;[cursor, keys] = Array.from(reply)
for (const key of Array.from(keys)) {
keySet[key] = true
}
if (cursor === '0') {
// note redis returns string result not numeric
return callback(null, Object.keys(keySet))
} else {
return doIteration()
}
})
return doIteration()
},
_getKeysFromNode(node, pattern, callback) {
let cursor = 0; // redis iterator
const keySet = {}; // use hash to avoid duplicate results
// scan over all keys looking for pattern
var doIteration = cb =>
node.scan(cursor, "MATCH", pattern, "COUNT", 1000, function(error, reply) {
let keys;
if (error != null) { return callback(error); }
[cursor, keys] = Array.from(reply);
for (const key of Array.from(keys)) {
keySet[key] = true;
}
if (cursor === '0') { // note redis returns string result not numeric
return callback(null, Object.keys(keySet));
} else {
return doIteration();
}
})
;
return doIteration();
},
// extract ids from keys like DocsWithHistoryOps:57fd0b1f53a8396d22b2c24b
// or DocsWithHistoryOps:{57fd0b1f53a8396d22b2c24b} (for redis cluster)
_extractIds(keyList) {
const ids = (() => {
const result = []
for (const key of Array.from(keyList)) {
const m = key.match(/:\{?([0-9a-f]{24})\}?/) // extract object id
result.push(m[1])
}
return result
})()
return ids
},
// extract ids from keys like DocsWithHistoryOps:57fd0b1f53a8396d22b2c24b
// or DocsWithHistoryOps:{57fd0b1f53a8396d22b2c24b} (for redis cluster)
_extractIds(keyList) {
const ids = (() => {
const result = [];
for (const key of Array.from(keyList)) {
const m = key.match(/:\{?([0-9a-f]{24})\}?/); // extract object id
result.push(m[1]);
}
return result;
})();
return ids;
},
getProjectIdsWithHistoryOps(callback) {
if (callback == null) {
callback = function(error, project_ids) {}
}
return RedisManager._getKeys(
Keys.docsWithHistoryOps({ project_id: '*' }),
function(error, project_keys) {
if (error != null) {
return callback(error)
}
const project_ids = RedisManager._extractIds(project_keys)
return callback(error, project_ids)
}
)
},
getProjectIdsWithHistoryOps(callback) {
if (callback == null) { callback = function(error, project_ids) {}; }
return RedisManager._getKeys(Keys.docsWithHistoryOps({project_id:"*"}), function(error, project_keys) {
if (error != null) { return callback(error); }
const project_ids = RedisManager._extractIds(project_keys);
return callback(error, project_ids);
});
},
getAllDocIdsWithHistoryOps(callback) {
// return all the docids, to find dangling history entries after
// everything is flushed.
if (callback == null) { callback = function(error, doc_ids) {}; }
return RedisManager._getKeys(Keys.uncompressedHistoryOps({doc_id:"*"}), function(error, doc_keys) {
if (error != null) { return callback(error); }
const doc_ids = RedisManager._extractIds(doc_keys);
return callback(error, doc_ids);
});
}
});
getAllDocIdsWithHistoryOps(callback) {
// return all the docids, to find dangling history entries after
// everything is flushed.
if (callback == null) {
callback = function(error, doc_ids) {}
}
return RedisManager._getKeys(
Keys.uncompressedHistoryOps({ doc_id: '*' }),
function(error, doc_keys) {
if (error != null) {
return callback(error)
}
const doc_ids = RedisManager._extractIds(doc_keys)
return callback(error, doc_ids)
}
)
}
}

View file

@ -11,21 +11,38 @@
* DS207: Consider shorter variations of null checks
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
*/
let RestoreManager;
const DocumentUpdaterManager = require("./DocumentUpdaterManager");
const DiffManager = require("./DiffManager");
const logger = require("logger-sharelatex");
let RestoreManager
const DocumentUpdaterManager = require('./DocumentUpdaterManager')
const DiffManager = require('./DiffManager')
const logger = require('logger-sharelatex')
module.exports = (RestoreManager = {
restoreToBeforeVersion(project_id, doc_id, version, user_id, callback) {
if (callback == null) { callback = function(error) {}; }
logger.log({project_id, doc_id, version, user_id}, "restoring document");
return DiffManager.getDocumentBeforeVersion(project_id, doc_id, version, function(error, content) {
if (error != null) { return callback(error); }
return DocumentUpdaterManager.setDocument(project_id, doc_id, content, user_id, function(error) {
if (error != null) { return callback(error); }
return callback();
});
});
}
});
module.exports = RestoreManager = {
restoreToBeforeVersion(project_id, doc_id, version, user_id, callback) {
if (callback == null) {
callback = function(error) {}
}
logger.log({ project_id, doc_id, version, user_id }, 'restoring document')
return DiffManager.getDocumentBeforeVersion(
project_id,
doc_id,
version,
function(error, content) {
if (error != null) {
return callback(error)
}
return DocumentUpdaterManager.setDocument(
project_id,
doc_id,
content,
user_id,
function(error) {
if (error != null) {
return callback(error)
}
return callback()
}
)
}
)
}
}

View file

@ -14,274 +14,324 @@
* DS207: Consider shorter variations of null checks
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
*/
let oneMinute, twoMegabytes, UpdateCompressor;
const strInject = (s1, pos, s2) => s1.slice(0, pos) + s2 + s1.slice(pos);
const strRemove = (s1, pos, length) => s1.slice(0, pos) + s1.slice((pos + length));
let oneMinute, twoMegabytes, UpdateCompressor
const strInject = (s1, pos, s2) => s1.slice(0, pos) + s2 + s1.slice(pos)
const strRemove = (s1, pos, length) => s1.slice(0, pos) + s1.slice(pos + length)
const { diff_match_patch } = require("../lib/diff_match_patch");
const dmp = new diff_match_patch();
const { diff_match_patch } = require('../lib/diff_match_patch')
const dmp = new diff_match_patch()
module.exports = (UpdateCompressor = {
NOOP: "noop",
module.exports = UpdateCompressor = {
NOOP: 'noop',
// Updates come from the doc updater in format
// {
// op: [ { ... op1 ... }, { ... op2 ... } ]
// meta: { ts: ..., user_id: ... }
// }
// but it's easier to work with on op per update, so convert these updates to
// our compressed format
// [{
// op: op1
// meta: { start_ts: ... , end_ts: ..., user_id: ... }
// }, {
// op: op2
// meta: { start_ts: ... , end_ts: ..., user_id: ... }
// }]
convertToSingleOpUpdates(updates) {
const splitUpdates = [];
for (const update of Array.from(updates)) {
// Reject any non-insert or delete ops, i.e. comments
const ops = update.op.filter(o => (o.i != null) || (o.d != null));
if (ops.length === 0) {
splitUpdates.push({
op: UpdateCompressor.NOOP,
meta: {
start_ts: update.meta.start_ts || update.meta.ts,
end_ts: update.meta.end_ts || update.meta.ts,
user_id: update.meta.user_id
},
v: update.v
});
} else {
for (const op of Array.from(ops)) {
splitUpdates.push({
op,
meta: {
start_ts: update.meta.start_ts || update.meta.ts,
end_ts: update.meta.end_ts || update.meta.ts,
user_id: update.meta.user_id
},
v: update.v
});
}
}
}
return splitUpdates;
},
// Updates come from the doc updater in format
// {
// op: [ { ... op1 ... }, { ... op2 ... } ]
// meta: { ts: ..., user_id: ... }
// }
// but it's easier to work with on op per update, so convert these updates to
// our compressed format
// [{
// op: op1
// meta: { start_ts: ... , end_ts: ..., user_id: ... }
// }, {
// op: op2
// meta: { start_ts: ... , end_ts: ..., user_id: ... }
// }]
convertToSingleOpUpdates(updates) {
const splitUpdates = []
for (const update of Array.from(updates)) {
// Reject any non-insert or delete ops, i.e. comments
const ops = update.op.filter(o => o.i != null || o.d != null)
if (ops.length === 0) {
splitUpdates.push({
op: UpdateCompressor.NOOP,
meta: {
start_ts: update.meta.start_ts || update.meta.ts,
end_ts: update.meta.end_ts || update.meta.ts,
user_id: update.meta.user_id
},
v: update.v
})
} else {
for (const op of Array.from(ops)) {
splitUpdates.push({
op,
meta: {
start_ts: update.meta.start_ts || update.meta.ts,
end_ts: update.meta.end_ts || update.meta.ts,
user_id: update.meta.user_id
},
v: update.v
})
}
}
}
return splitUpdates
},
concatUpdatesWithSameVersion(updates) {
const concattedUpdates = [];
for (const update of Array.from(updates)) {
const lastUpdate = concattedUpdates[concattedUpdates.length - 1];
if ((lastUpdate != null) && (lastUpdate.v === update.v)) {
if (update.op !== UpdateCompressor.NOOP) { lastUpdate.op.push(update.op); }
} else {
const nextUpdate = {
op: [],
meta: update.meta,
v: update.v
};
if (update.op !== UpdateCompressor.NOOP) { nextUpdate.op.push(update.op); }
concattedUpdates.push(nextUpdate);
}
}
return concattedUpdates;
},
concatUpdatesWithSameVersion(updates) {
const concattedUpdates = []
for (const update of Array.from(updates)) {
const lastUpdate = concattedUpdates[concattedUpdates.length - 1]
if (lastUpdate != null && lastUpdate.v === update.v) {
if (update.op !== UpdateCompressor.NOOP) {
lastUpdate.op.push(update.op)
}
} else {
const nextUpdate = {
op: [],
meta: update.meta,
v: update.v
}
if (update.op !== UpdateCompressor.NOOP) {
nextUpdate.op.push(update.op)
}
concattedUpdates.push(nextUpdate)
}
}
return concattedUpdates
},
compressRawUpdates(lastPreviousUpdate, rawUpdates) {
if (__guard__(lastPreviousUpdate != null ? lastPreviousUpdate.op : undefined, x => x.length) > 1) {
// if the last previous update was an array op, don't compress onto it.
// The avoids cases where array length changes but version number doesn't
return [lastPreviousUpdate].concat(UpdateCompressor.compressRawUpdates(null,rawUpdates));
}
if (lastPreviousUpdate != null) {
rawUpdates = [lastPreviousUpdate].concat(rawUpdates);
}
let updates = UpdateCompressor.convertToSingleOpUpdates(rawUpdates);
updates = UpdateCompressor.compressUpdates(updates);
return UpdateCompressor.concatUpdatesWithSameVersion(updates);
},
compressRawUpdates(lastPreviousUpdate, rawUpdates) {
if (
__guard__(
lastPreviousUpdate != null ? lastPreviousUpdate.op : undefined,
x => x.length
) > 1
) {
// if the last previous update was an array op, don't compress onto it.
// The avoids cases where array length changes but version number doesn't
return [lastPreviousUpdate].concat(
UpdateCompressor.compressRawUpdates(null, rawUpdates)
)
}
if (lastPreviousUpdate != null) {
rawUpdates = [lastPreviousUpdate].concat(rawUpdates)
}
let updates = UpdateCompressor.convertToSingleOpUpdates(rawUpdates)
updates = UpdateCompressor.compressUpdates(updates)
return UpdateCompressor.concatUpdatesWithSameVersion(updates)
},
compressUpdates(updates) {
if (updates.length === 0) { return []; }
compressUpdates(updates) {
if (updates.length === 0) {
return []
}
let compressedUpdates = [updates.shift()];
for (const update of Array.from(updates)) {
const lastCompressedUpdate = compressedUpdates.pop();
if (lastCompressedUpdate != null) {
compressedUpdates = compressedUpdates.concat(UpdateCompressor._concatTwoUpdates(lastCompressedUpdate, update));
} else {
compressedUpdates.push(update);
}
}
let compressedUpdates = [updates.shift()]
for (const update of Array.from(updates)) {
const lastCompressedUpdate = compressedUpdates.pop()
if (lastCompressedUpdate != null) {
compressedUpdates = compressedUpdates.concat(
UpdateCompressor._concatTwoUpdates(lastCompressedUpdate, update)
)
} else {
compressedUpdates.push(update)
}
}
return compressedUpdates;
},
return compressedUpdates
},
MAX_TIME_BETWEEN_UPDATES: (oneMinute = 60 * 1000),
MAX_UPDATE_SIZE: (twoMegabytes = 2* 1024 * 1024),
MAX_TIME_BETWEEN_UPDATES: (oneMinute = 60 * 1000),
MAX_UPDATE_SIZE: (twoMegabytes = 2 * 1024 * 1024),
_concatTwoUpdates(firstUpdate, secondUpdate) {
let offset;
firstUpdate = {
op: firstUpdate.op,
meta: {
user_id: firstUpdate.meta.user_id || null,
start_ts: firstUpdate.meta.start_ts || firstUpdate.meta.ts,
end_ts: firstUpdate.meta.end_ts || firstUpdate.meta.ts
},
v: firstUpdate.v
};
secondUpdate = {
op: secondUpdate.op,
meta: {
user_id: secondUpdate.meta.user_id || null,
start_ts: secondUpdate.meta.start_ts || secondUpdate.meta.ts,
end_ts: secondUpdate.meta.end_ts || secondUpdate.meta.ts
},
v: secondUpdate.v
};
_concatTwoUpdates(firstUpdate, secondUpdate) {
let offset
firstUpdate = {
op: firstUpdate.op,
meta: {
user_id: firstUpdate.meta.user_id || null,
start_ts: firstUpdate.meta.start_ts || firstUpdate.meta.ts,
end_ts: firstUpdate.meta.end_ts || firstUpdate.meta.ts
},
v: firstUpdate.v
}
secondUpdate = {
op: secondUpdate.op,
meta: {
user_id: secondUpdate.meta.user_id || null,
start_ts: secondUpdate.meta.start_ts || secondUpdate.meta.ts,
end_ts: secondUpdate.meta.end_ts || secondUpdate.meta.ts
},
v: secondUpdate.v
}
if (firstUpdate.meta.user_id !== secondUpdate.meta.user_id) {
return [firstUpdate, secondUpdate];
}
if (firstUpdate.meta.user_id !== secondUpdate.meta.user_id) {
return [firstUpdate, secondUpdate]
}
if ((secondUpdate.meta.start_ts - firstUpdate.meta.end_ts) > UpdateCompressor.MAX_TIME_BETWEEN_UPDATES) {
return [firstUpdate, secondUpdate];
}
if (
secondUpdate.meta.start_ts - firstUpdate.meta.end_ts >
UpdateCompressor.MAX_TIME_BETWEEN_UPDATES
) {
return [firstUpdate, secondUpdate]
}
const firstOp = firstUpdate.op;
const secondOp = secondUpdate.op;
const firstOp = firstUpdate.op
const secondOp = secondUpdate.op
const firstSize = (firstOp.i != null ? firstOp.i.length : undefined) || (firstOp.d != null ? firstOp.d.length : undefined);
const secondSize = (secondOp.i != null ? secondOp.i.length : undefined) || (secondOp.d != null ? secondOp.d.length : undefined);
const firstSize =
(firstOp.i != null ? firstOp.i.length : undefined) ||
(firstOp.d != null ? firstOp.d.length : undefined)
const secondSize =
(secondOp.i != null ? secondOp.i.length : undefined) ||
(secondOp.d != null ? secondOp.d.length : undefined)
// Two inserts
if ((firstOp.i != null) && (secondOp.i != null) && (firstOp.p <= secondOp.p && secondOp.p <= (firstOp.p + firstOp.i.length)) && ((firstSize + secondSize) < UpdateCompressor.MAX_UPDATE_SIZE)) {
return [{
meta: {
start_ts: firstUpdate.meta.start_ts,
end_ts: secondUpdate.meta.end_ts,
user_id: firstUpdate.meta.user_id
},
op: {
p: firstOp.p,
i: strInject(firstOp.i, secondOp.p - firstOp.p, secondOp.i)
},
v: secondUpdate.v
}
];
// Two deletes
} else if ((firstOp.d != null) && (secondOp.d != null) && (secondOp.p <= firstOp.p && firstOp.p <= (secondOp.p + secondOp.d.length)) && ((firstSize + secondSize) < UpdateCompressor.MAX_UPDATE_SIZE)) {
return [{
meta: {
start_ts: firstUpdate.meta.start_ts,
end_ts: secondUpdate.meta.end_ts,
user_id: firstUpdate.meta.user_id
},
op: {
p: secondOp.p,
d: strInject(secondOp.d, firstOp.p - secondOp.p, firstOp.d)
},
v: secondUpdate.v
}
];
// An insert and then a delete
} else if ((firstOp.i != null) && (secondOp.d != null) && (firstOp.p <= secondOp.p && secondOp.p <= (firstOp.p + firstOp.i.length))) {
offset = secondOp.p - firstOp.p;
const insertedText = firstOp.i.slice(offset, offset + secondOp.d.length);
// Only trim the insert when the delete is fully contained within in it
if (insertedText === secondOp.d) {
const insert = strRemove(firstOp.i, offset, secondOp.d.length);
return [{
meta: {
start_ts: firstUpdate.meta.start_ts,
end_ts: secondUpdate.meta.end_ts,
user_id: firstUpdate.meta.user_id
},
op: {
p: firstOp.p,
i: insert
},
v: secondUpdate.v
}
];
} else {
// This will only happen if the delete extends outside the insert
return [firstUpdate, secondUpdate];
}
// Two inserts
if (
firstOp.i != null &&
secondOp.i != null &&
firstOp.p <= secondOp.p && secondOp.p <= firstOp.p + firstOp.i.length &&
firstSize + secondSize < UpdateCompressor.MAX_UPDATE_SIZE
) {
return [
{
meta: {
start_ts: firstUpdate.meta.start_ts,
end_ts: secondUpdate.meta.end_ts,
user_id: firstUpdate.meta.user_id
},
op: {
p: firstOp.p,
i: strInject(firstOp.i, secondOp.p - firstOp.p, secondOp.i)
},
v: secondUpdate.v
}
]
// Two deletes
} else if (
firstOp.d != null &&
secondOp.d != null &&
secondOp.p <= firstOp.p && firstOp.p <= secondOp.p + secondOp.d.length &&
firstSize + secondSize < UpdateCompressor.MAX_UPDATE_SIZE
) {
return [
{
meta: {
start_ts: firstUpdate.meta.start_ts,
end_ts: secondUpdate.meta.end_ts,
user_id: firstUpdate.meta.user_id
},
op: {
p: secondOp.p,
d: strInject(secondOp.d, firstOp.p - secondOp.p, firstOp.d)
},
v: secondUpdate.v
}
]
// An insert and then a delete
} else if (
firstOp.i != null &&
secondOp.d != null &&
firstOp.p <= secondOp.p && secondOp.p <= firstOp.p + firstOp.i.length
) {
offset = secondOp.p - firstOp.p
const insertedText = firstOp.i.slice(offset, offset + secondOp.d.length)
// Only trim the insert when the delete is fully contained within in it
if (insertedText === secondOp.d) {
const insert = strRemove(firstOp.i, offset, secondOp.d.length)
return [
{
meta: {
start_ts: firstUpdate.meta.start_ts,
end_ts: secondUpdate.meta.end_ts,
user_id: firstUpdate.meta.user_id
},
op: {
p: firstOp.p,
i: insert
},
v: secondUpdate.v
}
]
} else {
// This will only happen if the delete extends outside the insert
return [firstUpdate, secondUpdate]
}
// A delete then an insert at the same place, likely a copy-paste of a chunk of content
} else if ((firstOp.d != null) && (secondOp.i != null) && (firstOp.p === secondOp.p)) {
offset = firstOp.p;
const diff_ops = this.diffAsShareJsOps(firstOp.d, secondOp.i);
if (diff_ops.length === 0) {
return [{ // Noop
meta: {
start_ts: firstUpdate.meta.start_ts,
end_ts: secondUpdate.meta.end_ts,
user_id: firstUpdate.meta.user_id
},
op: {
p: firstOp.p,
i: ""
},
v: secondUpdate.v
}];
} else {
return diff_ops.map(function(op) {
op.p += offset;
return {
meta: {
start_ts: firstUpdate.meta.start_ts,
end_ts: secondUpdate.meta.end_ts,
user_id: firstUpdate.meta.user_id
},
op,
v: secondUpdate.v
};});
}
// A delete then an insert at the same place, likely a copy-paste of a chunk of content
} else if (
firstOp.d != null &&
secondOp.i != null &&
firstOp.p === secondOp.p
) {
offset = firstOp.p
const diff_ops = this.diffAsShareJsOps(firstOp.d, secondOp.i)
if (diff_ops.length === 0) {
return [
{
// Noop
meta: {
start_ts: firstUpdate.meta.start_ts,
end_ts: secondUpdate.meta.end_ts,
user_id: firstUpdate.meta.user_id
},
op: {
p: firstOp.p,
i: ''
},
v: secondUpdate.v
}
]
} else {
return diff_ops.map(function(op) {
op.p += offset
return {
meta: {
start_ts: firstUpdate.meta.start_ts,
end_ts: secondUpdate.meta.end_ts,
user_id: firstUpdate.meta.user_id
},
op,
v: secondUpdate.v
}
})
}
} else {
return [firstUpdate, secondUpdate]
}
},
} else {
return [firstUpdate, secondUpdate];
}
},
ADDED: 1,
REMOVED: -1,
UNCHANGED: 0,
diffAsShareJsOps(before, after, callback) {
if (callback == null) {
callback = function(error, ops) {}
}
const diffs = dmp.diff_main(before, after)
dmp.diff_cleanupSemantic(diffs)
ADDED: 1,
REMOVED: -1,
UNCHANGED: 0,
diffAsShareJsOps(before, after, callback) {
if (callback == null) { callback = function(error, ops) {}; }
const diffs = dmp.diff_main(before, after);
dmp.diff_cleanupSemantic(diffs);
const ops = [];
let position = 0;
for (const diff of Array.from(diffs)) {
const type = diff[0];
const content = diff[1];
if (type === this.ADDED) {
ops.push({
i: content,
p: position
});
position += content.length;
} else if (type === this.REMOVED) {
ops.push({
d: content,
p: position
});
} else if (type === this.UNCHANGED) {
position += content.length;
} else {
throw "Unknown type";
}
}
return ops;
}
});
const ops = []
let position = 0
for (const diff of Array.from(diffs)) {
const type = diff[0]
const content = diff[1]
if (type === this.ADDED) {
ops.push({
i: content,
p: position
})
position += content.length
} else if (type === this.REMOVED) {
ops.push({
d: content,
p: position
})
} else if (type === this.UNCHANGED) {
position += content.length
} else {
throw 'Unknown type'
}
}
return ops
}
}
function __guard__(value, transform) {
return (typeof value !== 'undefined' && value !== null) ? transform(value) : undefined;
}
return typeof value !== 'undefined' && value !== null
? transform(value)
: undefined
}

View file

@ -12,40 +12,66 @@
* DS207: Consider shorter variations of null checks
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
*/
let UpdateTrimmer;
const MongoManager = require("./MongoManager");
const WebApiManager = require("./WebApiManager");
const logger = require("logger-sharelatex");
module.exports = (UpdateTrimmer = {
shouldTrimUpdates(project_id, callback) {
if (callback == null) { callback = function(error, shouldTrim) {}; }
return MongoManager.getProjectMetaData(project_id, function(error, metadata) {
if (error != null) { return callback(error); }
if (metadata != null ? metadata.preserveHistory : undefined) {
return callback(null, false);
} else {
return WebApiManager.getProjectDetails(project_id, function(error, details) {
if (error != null) { return callback(error); }
logger.log({project_id, details}, "got details");
if (__guard__(details != null ? details.features : undefined, x => x.versioning)) {
return MongoManager.setProjectMetaData(project_id, {preserveHistory: true}, function(error) {
if (error != null) { return callback(error); }
return MongoManager.upgradeHistory(project_id, function(error) {
if (error != null) { return callback(error); }
return callback(null, false);
});
});
} else {
return callback(null, true);
}
});
}
});
}
});
let UpdateTrimmer
const MongoManager = require('./MongoManager')
const WebApiManager = require('./WebApiManager')
const logger = require('logger-sharelatex')
module.exports = UpdateTrimmer = {
shouldTrimUpdates(project_id, callback) {
if (callback == null) {
callback = function(error, shouldTrim) {}
}
return MongoManager.getProjectMetaData(project_id, function(
error,
metadata
) {
if (error != null) {
return callback(error)
}
if (metadata != null ? metadata.preserveHistory : undefined) {
return callback(null, false)
} else {
return WebApiManager.getProjectDetails(project_id, function(
error,
details
) {
if (error != null) {
return callback(error)
}
logger.log({ project_id, details }, 'got details')
if (
__guard__(
details != null ? details.features : undefined,
x => x.versioning
)
) {
return MongoManager.setProjectMetaData(
project_id,
{ preserveHistory: true },
function(error) {
if (error != null) {
return callback(error)
}
return MongoManager.upgradeHistory(project_id, function(error) {
if (error != null) {
return callback(error)
}
return callback(null, false)
})
}
)
} else {
return callback(null, true)
}
})
}
})
}
}
function __guard__(value, transform) {
return (typeof value !== 'undefined' && value !== null) ? transform(value) : undefined;
}
return typeof value !== 'undefined' && value !== null
? transform(value)
: undefined
}

File diff suppressed because it is too large Load diff

View file

@ -10,96 +10,107 @@
* DS207: Consider shorter variations of null checks
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
*/
let WebApiManager;
const request = require("requestretry"); // allow retry on error https://github.com/FGRibreau/node-request-retry
const logger = require("logger-sharelatex");
const Settings = require("settings-sharelatex");
let WebApiManager
const request = require('requestretry') // allow retry on error https://github.com/FGRibreau/node-request-retry
const logger = require('logger-sharelatex')
const Settings = require('settings-sharelatex')
// Don't let HTTP calls hang for a long time
const MAX_HTTP_REQUEST_LENGTH = 15000; // 15 seconds
const MAX_HTTP_REQUEST_LENGTH = 15000 // 15 seconds
// DEPRECATED! This method of getting user details via track-changes is deprecated
// in the way we lay out our services.
// Instead, web should be responsible for collecting the raw data (user_ids) and
// filling it out with calls to other services. All API calls should create a
// tree-like structure as much as possible, with web as the root.
module.exports = (WebApiManager = {
sendRequest(url, callback) {
if (callback == null) { callback = function(error, body) {}; }
return request.get({
url: `${Settings.apis.web.url}${url}`,
timeout: MAX_HTTP_REQUEST_LENGTH,
maxAttempts: 2, // for node-request-retry
auth: {
user: Settings.apis.web.user,
pass: Settings.apis.web.pass,
sendImmediately: true
}
}, function(error, res, body){
if (error != null) {
return callback(error);
}
if (res.statusCode === 404) {
logger.log({url}, "got 404 from web api");
return callback(null, null);
}
if ((res.statusCode >= 200) && (res.statusCode < 300)) {
return callback(null, body);
} else {
error = new Error(`web returned a non-success status code: ${res.statusCode} (attempts: ${res.attempts})`);
return callback(error);
}
});
},
module.exports = WebApiManager = {
sendRequest(url, callback) {
if (callback == null) {
callback = function(error, body) {}
}
return request.get(
{
url: `${Settings.apis.web.url}${url}`,
timeout: MAX_HTTP_REQUEST_LENGTH,
maxAttempts: 2, // for node-request-retry
auth: {
user: Settings.apis.web.user,
pass: Settings.apis.web.pass,
sendImmediately: true
}
},
function(error, res, body) {
if (error != null) {
return callback(error)
}
if (res.statusCode === 404) {
logger.log({ url }, 'got 404 from web api')
return callback(null, null)
}
if (res.statusCode >= 200 && res.statusCode < 300) {
return callback(null, body)
} else {
error = new Error(
`web returned a non-success status code: ${res.statusCode} (attempts: ${res.attempts})`
)
return callback(error)
}
}
)
},
getUserInfo(user_id, callback) {
if (callback == null) { callback = function(error, userInfo) {}; }
const url = `/user/${user_id}/personal_info`;
logger.log({user_id}, "getting user info from web");
return WebApiManager.sendRequest(url, function(error, body) {
let user;
if (error != null) {
logger.error({err: error, user_id, url}, "error accessing web");
return callback(error);
}
getUserInfo(user_id, callback) {
if (callback == null) {
callback = function(error, userInfo) {}
}
const url = `/user/${user_id}/personal_info`
logger.log({ user_id }, 'getting user info from web')
return WebApiManager.sendRequest(url, function(error, body) {
let user
if (error != null) {
logger.error({ err: error, user_id, url }, 'error accessing web')
return callback(error)
}
if (body === null) {
logger.error({user_id, url}, "no user found");
return callback(null, null);
}
try {
user = JSON.parse(body);
} catch (error1) {
error = error1;
return callback(error);
}
return callback(null, {
id: user.id,
email: user.email,
first_name: user.first_name,
last_name: user.last_name
});
});
},
if (body === null) {
logger.error({ user_id, url }, 'no user found')
return callback(null, null)
}
try {
user = JSON.parse(body)
} catch (error1) {
error = error1
return callback(error)
}
return callback(null, {
id: user.id,
email: user.email,
first_name: user.first_name,
last_name: user.last_name
})
})
},
getProjectDetails(project_id, callback) {
if (callback == null) { callback = function(error, details) {}; }
const url = `/project/${project_id}/details`;
logger.log({project_id}, "getting project details from web");
return WebApiManager.sendRequest(url, function(error, body) {
let project;
if (error != null) {
logger.error({err: error, project_id, url}, "error accessing web");
return callback(error);
}
getProjectDetails(project_id, callback) {
if (callback == null) {
callback = function(error, details) {}
}
const url = `/project/${project_id}/details`
logger.log({ project_id }, 'getting project details from web')
return WebApiManager.sendRequest(url, function(error, body) {
let project
if (error != null) {
logger.error({ err: error, project_id, url }, 'error accessing web')
return callback(error)
}
try {
project = JSON.parse(body);
} catch (error1) {
error = error1;
return callback(error);
}
return callback(null, project);
});
}
});
try {
project = JSON.parse(body)
} catch (error1) {
error = error1
return callback(error)
}
return callback(null, project)
})
}
}

View file

@ -1,12 +1,15 @@
// TODO: This file was created by bulk-decaffeinate.
// Sanity-check the conversion and remove this comment.
const Settings = require("settings-sharelatex");
const mongojs = require("mongojs");
const bson = require("bson");
const db = mongojs(Settings.mongo.url, ["docHistory", "projectHistoryMetaData", "docHistoryIndex"]);
const Settings = require('settings-sharelatex')
const mongojs = require('mongojs')
const bson = require('bson')
const db = mongojs(Settings.mongo.url, [
'docHistory',
'projectHistoryMetaData',
'docHistoryIndex'
])
module.exports = {
db,
ObjectId: mongojs.ObjectId,
BSON: new bson.BSONPure()
};
db,
ObjectId: mongojs.ObjectId,
BSON: new bson.BSONPure()
}