Merge pull request #16644 from overleaf/em-promisify-update-manager

Promisify UpdateManager

GitOrigin-RevId: 2c3e21ee6ef2454f79695ca8623c3d38720ff6bf
This commit is contained in:
Eric Mc Sween 2024-01-30 10:35:54 -05:00 committed by Copybot
parent 14bb3d7114
commit 8136036c33
17 changed files with 648 additions and 645 deletions

View file

@ -97,7 +97,7 @@ function histogram(key, value, buckets, labels = {}) {
} }
class Timer { class Timer {
constructor(key, sampleRate = 1, labels = {}, buckets) { constructor(key, sampleRate = 1, labels = {}, buckets = undefined) {
if (typeof sampleRate === 'object') { if (typeof sampleRate === 'object') {
// called with (key, labels, buckets) // called with (key, labels, buckets)
if (arguments.length === 3) { if (arguments.length === 3) {

View file

@ -1,3 +1,4 @@
const { promisify } = require('util')
const metrics = require('@overleaf/metrics') const metrics = require('@overleaf/metrics')
const logger = require('@overleaf/logger') const logger = require('@overleaf/logger')
const os = require('os') const os = require('os')
@ -64,6 +65,27 @@ module.exports = class RedisLocker {
// read-only copy for unit tests // read-only copy for unit tests
this.unlockScript = UNLOCK_SCRIPT this.unlockScript = UNLOCK_SCRIPT
this.promises = {
checkLock: promisify(this.checkLock.bind(this)),
getLock: promisify(this.getLock.bind(this)),
releaseLock: promisify(this.releaseLock.bind(this)),
// tryLock returns two values: gotLock and lockValue. We need to merge
// these two values into one for the promises version.
tryLock: id =>
new Promise((resolve, reject) => {
this.tryLock(id, (err, gotLock, lockValue) => {
if (err) {
reject(err)
} else if (!gotLock) {
resolve(null)
} else {
resolve(lockValue)
}
})
}),
}
} }
// Use a signed lock value as described in // Use a signed lock value as described in

24
package-lock.json generated
View file

@ -15503,8 +15503,15 @@
"node_modules/@types/chai": { "node_modules/@types/chai": {
"version": "4.3.0", "version": "4.3.0",
"resolved": "https://registry.npmjs.org/@types/chai/-/chai-4.3.0.tgz", "resolved": "https://registry.npmjs.org/@types/chai/-/chai-4.3.0.tgz",
"integrity": "sha512-/ceqdqeRraGolFTcfoXNiqjyQhZzbINDngeoAq9GoHa8PPK1yNzTaxWjA6BFWp5Ua9JpXEMSS4s5i9tS0hOJtw==", "integrity": "sha512-/ceqdqeRraGolFTcfoXNiqjyQhZzbINDngeoAq9GoHa8PPK1yNzTaxWjA6BFWp5Ua9JpXEMSS4s5i9tS0hOJtw=="
"dev": true },
"node_modules/@types/chai-as-promised": {
"version": "7.1.8",
"resolved": "https://registry.npmjs.org/@types/chai-as-promised/-/chai-as-promised-7.1.8.tgz",
"integrity": "sha512-ThlRVIJhr69FLlh6IctTXFkmhtP3NpMZ2QGq69StYLyKZFp/HOp1VdKZj7RvfNWYYcJ1xlbLGLLWj1UvP5u/Gw==",
"dependencies": {
"@types/chai": "*"
}
}, },
"node_modules/@types/connect": { "node_modules/@types/connect": {
"version": "3.4.35", "version": "3.4.35",
@ -43700,6 +43707,7 @@
"@overleaf/ranges-tracker": "*", "@overleaf/ranges-tracker": "*",
"@overleaf/redis-wrapper": "*", "@overleaf/redis-wrapper": "*",
"@overleaf/settings": "*", "@overleaf/settings": "*",
"@types/chai-as-promised": "^7.1.8",
"async": "^3.2.2", "async": "^3.2.2",
"body-parser": "^1.19.0", "body-parser": "^1.19.0",
"bunyan": "^1.8.15", "bunyan": "^1.8.15",
@ -54201,6 +54209,7 @@
"@overleaf/ranges-tracker": "*", "@overleaf/ranges-tracker": "*",
"@overleaf/redis-wrapper": "*", "@overleaf/redis-wrapper": "*",
"@overleaf/settings": "*", "@overleaf/settings": "*",
"@types/chai-as-promised": "^7.1.8",
"async": "^3.2.2", "async": "^3.2.2",
"body-parser": "^1.19.0", "body-parser": "^1.19.0",
"bunyan": "^1.8.15", "bunyan": "^1.8.15",
@ -61468,8 +61477,15 @@
"@types/chai": { "@types/chai": {
"version": "4.3.0", "version": "4.3.0",
"resolved": "https://registry.npmjs.org/@types/chai/-/chai-4.3.0.tgz", "resolved": "https://registry.npmjs.org/@types/chai/-/chai-4.3.0.tgz",
"integrity": "sha512-/ceqdqeRraGolFTcfoXNiqjyQhZzbINDngeoAq9GoHa8PPK1yNzTaxWjA6BFWp5Ua9JpXEMSS4s5i9tS0hOJtw==", "integrity": "sha512-/ceqdqeRraGolFTcfoXNiqjyQhZzbINDngeoAq9GoHa8PPK1yNzTaxWjA6BFWp5Ua9JpXEMSS4s5i9tS0hOJtw=="
"dev": true },
"@types/chai-as-promised": {
"version": "7.1.8",
"resolved": "https://registry.npmjs.org/@types/chai-as-promised/-/chai-as-promised-7.1.8.tgz",
"integrity": "sha512-ThlRVIJhr69FLlh6IctTXFkmhtP3NpMZ2QGq69StYLyKZFp/HOp1VdKZj7RvfNWYYcJ1xlbLGLLWj1UvP5u/Gw==",
"requires": {
"@types/chai": "*"
}
}, },
"@types/connect": { "@types/connect": {
"version": "3.4.35", "version": "3.4.35",

View file

@ -1,4 +1,4 @@
let DocumentManager const { promisifyAll } = require('@overleaf/promise-utils')
const RedisManager = require('./RedisManager') const RedisManager = require('./RedisManager')
const ProjectHistoryRedisManager = require('./ProjectHistoryRedisManager') const ProjectHistoryRedisManager = require('./ProjectHistoryRedisManager')
const PersistenceManager = require('./PersistenceManager') const PersistenceManager = require('./PersistenceManager')
@ -11,7 +11,7 @@ const RangesManager = require('./RangesManager')
const MAX_UNFLUSHED_AGE = 300 * 1000 // 5 mins, document should be flushed to mongo this time after a change const MAX_UNFLUSHED_AGE = 300 * 1000 // 5 mins, document should be flushed to mongo this time after a change
module.exports = DocumentManager = { const DocumentManager = {
getDoc(projectId, docId, _callback) { getDoc(projectId, docId, _callback) {
const timer = new Metrics.Timer('docManager.getDoc') const timer = new Metrics.Timer('docManager.getDoc')
const callback = (...args) => { const callback = (...args) => {
@ -680,3 +680,45 @@ module.exports = DocumentManager = {
) )
}, },
} }
module.exports = DocumentManager
module.exports.promises = promisifyAll(DocumentManager, {
multiResult: {
getDoc: [
'lines',
'version',
'ranges',
'pathname',
'projectHistoryId',
'unflushedTime',
'alreadyLoaded',
],
getDocWithLock: [
'lines',
'version',
'ranges',
'pathname',
'projectHistoryId',
'unflushedTime',
'alreadyLoaded',
],
getDocAndFlushIfOld: ['lines', 'version'],
getDocAndFlushIfOldWithLock: ['lines', 'version'],
getDocAndRecentOps: [
'lines',
'version',
'ops',
'ranges',
'pathname',
'projectHistoryId',
],
getDocAndRecentOpsWithLock: [
'lines',
'version',
'ops',
'ranges',
'pathname',
'projectHistoryId',
],
},
})

View file

@ -1,12 +1,12 @@
let HistoryManager
const async = require('async') const async = require('async')
const logger = require('@overleaf/logger') const logger = require('@overleaf/logger')
const { promisifyAll } = require('@overleaf/promise-utils')
const request = require('request') const request = require('request')
const Settings = require('@overleaf/settings') const Settings = require('@overleaf/settings')
const ProjectHistoryRedisManager = require('./ProjectHistoryRedisManager') const ProjectHistoryRedisManager = require('./ProjectHistoryRedisManager')
const metrics = require('./Metrics') const metrics = require('./Metrics')
module.exports = HistoryManager = { const HistoryManager = {
// flush changes in the background // flush changes in the background
flushProjectChangesAsync(projectId) { flushProjectChangesAsync(projectId) {
HistoryManager.flushProjectChanges( HistoryManager.flushProjectChanges(
@ -122,3 +122,12 @@ module.exports = HistoryManager = {
) )
}, },
} }
module.exports = HistoryManager
module.exports.promises = promisifyAll(HistoryManager, {
without: [
'flushProjectChangesAsync',
'recordAndFlushHistoryOps',
'shouldFlushHistoryOps',
],
})

View file

@ -3,7 +3,6 @@ const redis = require('@overleaf/redis-wrapper')
const rclient = redis.createClient(Settings.redis.lock) const rclient = redis.createClient(Settings.redis.lock)
const keys = Settings.redis.lock.key_schema const keys = Settings.redis.lock.key_schema
const RedisLocker = require('@overleaf/redis-wrapper/RedisLocker') const RedisLocker = require('@overleaf/redis-wrapper/RedisLocker')
const { promisify } = require('@overleaf/promise-utils')
module.exports = new RedisLocker({ module.exports = new RedisLocker({
rclient, rclient,
@ -17,10 +16,3 @@ module.exports = new RedisLocker({
metricsPrefix: 'doc', metricsPrefix: 'doc',
lockTTLSeconds: Settings.redisLockTTLSeconds, lockTTLSeconds: Settings.redisLockTTLSeconds,
}) })
module.exports.promises = {
checkLock: promisify(module.exports.checkLock.bind(module.exports)),
getLock: promisify(module.exports.getLock.bind(module.exports)),
releaseLock: promisify(module.exports.releaseLock.bind(module.exports)),
tryLock: promisify(module.exports.tryLock.bind(module.exports)),
}

View file

@ -1,8 +1,9 @@
const { promisify } = require('util')
const { promisifyMultiResult } = require('@overleaf/promise-utils')
const Settings = require('@overleaf/settings') const Settings = require('@overleaf/settings')
const Errors = require('./Errors') const Errors = require('./Errors')
const Metrics = require('./Metrics') const Metrics = require('./Metrics')
const logger = require('@overleaf/logger') const logger = require('@overleaf/logger')
const { promisifyAll } = require('@overleaf/promise-utils')
const request = require('requestretry').defaults({ const request = require('requestretry').defaults({
maxAttempts: 2, maxAttempts: 2,
retryDelay: 10, retryDelay: 10,
@ -175,10 +176,17 @@ function setDoc(
) )
} }
module.exports = { getDoc, setDoc } module.exports = {
getDoc,
module.exports.promises = promisifyAll(module.exports, { setDoc,
multiResult: { promises: {
getDoc: ['lines', 'version', 'ranges', 'pathname', 'projectHistoryId'], getDoc: promisifyMultiResult(getDoc, [
'lines',
'version',
'ranges',
'pathname',
'projectHistoryId',
]),
setDoc: promisify(setDoc),
}, },
}) }

View file

@ -1,5 +1,5 @@
let ProjectHistoryRedisManager
const Settings = require('@overleaf/settings') const Settings = require('@overleaf/settings')
const { promisifyAll } = require('@overleaf/promise-utils')
const projectHistoryKeys = Settings.redis?.project_history?.key_schema const projectHistoryKeys = Settings.redis?.project_history?.key_schema
const rclient = require('@overleaf/redis-wrapper').createClient( const rclient = require('@overleaf/redis-wrapper').createClient(
Settings.redis.project_history Settings.redis.project_history
@ -8,7 +8,7 @@ const logger = require('@overleaf/logger')
const metrics = require('./Metrics') const metrics = require('./Metrics')
const { docIsTooLarge } = require('./Limits') const { docIsTooLarge } = require('./Limits')
module.exports = ProjectHistoryRedisManager = { const ProjectHistoryRedisManager = {
queueOps(projectId, ...rest) { queueOps(projectId, ...rest) {
// Record metric for ops pushed onto queue // Record metric for ops pushed onto queue
const callback = rest.pop() const callback = rest.pop()
@ -172,3 +172,6 @@ module.exports = ProjectHistoryRedisManager = {
ProjectHistoryRedisManager.queueOps(projectId, jsonUpdate, callback) ProjectHistoryRedisManager.queueOps(projectId, jsonUpdate, callback)
}, },
} }
module.exports = ProjectHistoryRedisManager
module.exports.promises = promisifyAll(ProjectHistoryRedisManager)

View file

@ -7,7 +7,7 @@
* DS207: Consider shorter variations of null checks * DS207: Consider shorter variations of null checks
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md * Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
*/ */
let RangesManager const { promisifyAll } = require('@overleaf/promise-utils')
const RangesTracker = require('@overleaf/ranges-tracker') const RangesTracker = require('@overleaf/ranges-tracker')
const logger = require('@overleaf/logger') const logger = require('@overleaf/logger')
const Metrics = require('./Metrics') const Metrics = require('./Metrics')
@ -15,7 +15,7 @@ const _ = require('lodash')
const RANGE_DELTA_BUCKETS = [0, 1, 2, 3, 4, 5, 10, 20, 50] const RANGE_DELTA_BUCKETS = [0, 1, 2, 3, 4, 5, 10, 20, 50]
module.exports = RangesManager = { const RangesManager = {
MAX_COMMENTS: 500, MAX_COMMENTS: 500,
MAX_CHANGES: 2000, MAX_CHANGES: 2000,
@ -176,3 +176,11 @@ module.exports = RangesManager = {
return [emptyCount, totalCount] return [emptyCount, totalCount]
}, },
} }
module.exports = RangesManager
module.exports.promises = promisifyAll(RangesManager, {
without: ['_getRanges', '_emptyRangesCount'],
multiResult: {
applyUpdate: ['newRanges', 'rangesWereCollapsed'],
},
})

View file

@ -10,8 +10,8 @@
* DS207: Consider shorter variations of null checks * DS207: Consider shorter variations of null checks
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md * Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
*/ */
let RealTimeRedisManager
const Settings = require('@overleaf/settings') const Settings = require('@overleaf/settings')
const { promisifyAll } = require('@overleaf/promise-utils')
const rclient = require('@overleaf/redis-wrapper').createClient( const rclient = require('@overleaf/redis-wrapper').createClient(
Settings.redis.documentupdater Settings.redis.documentupdater
) )
@ -30,7 +30,7 @@ let COUNT = 0
const MAX_OPS_PER_ITERATION = 8 // process a limited number of ops for safety const MAX_OPS_PER_ITERATION = 8 // process a limited number of ops for safety
module.exports = RealTimeRedisManager = { const RealTimeRedisManager = {
getPendingUpdatesForDoc(docId, callback) { getPendingUpdatesForDoc(docId, callback) {
const multi = rclient.multi() const multi = rclient.multi()
multi.lrange( multi.lrange(
@ -92,3 +92,8 @@ module.exports = RealTimeRedisManager = {
} }
}, },
} }
module.exports = RealTimeRedisManager
module.exports.promises = promisifyAll(RealTimeRedisManager, {
without: ['sendData'],
})

View file

@ -1,16 +1,15 @@
let RedisManager
const Settings = require('@overleaf/settings') const Settings = require('@overleaf/settings')
const rclient = require('@overleaf/redis-wrapper').createClient( const rclient = require('@overleaf/redis-wrapper').createClient(
Settings.redis.documentupdater Settings.redis.documentupdater
) )
const logger = require('@overleaf/logger') const logger = require('@overleaf/logger')
const { promisifyAll } = require('@overleaf/promise-utils')
const metrics = require('./Metrics') const metrics = require('./Metrics')
const Errors = require('./Errors') const Errors = require('./Errors')
const crypto = require('crypto') const crypto = require('crypto')
const async = require('async') const async = require('async')
const ProjectHistoryRedisManager = require('./ProjectHistoryRedisManager') const ProjectHistoryRedisManager = require('./ProjectHistoryRedisManager')
const { docIsTooLarge } = require('./Limits') const { docIsTooLarge } = require('./Limits')
const { promisifyAll } = require('@overleaf/promise-utils')
// Sometimes Redis calls take an unexpectedly long time. We have to be // Sometimes Redis calls take an unexpectedly long time. We have to be
// quick with Redis calls because we're holding a lock that expires // quick with Redis calls because we're holding a lock that expires
@ -28,7 +27,7 @@ const MAX_RANGES_SIZE = 3 * MEGABYTES
const keys = Settings.redis.documentupdater.key_schema const keys = Settings.redis.documentupdater.key_schema
module.exports = RedisManager = { const RedisManager = {
rclient, rclient,
putDocInMemory( putDocInMemory(
@ -619,7 +618,9 @@ module.exports = RedisManager = {
}, },
} }
module.exports.promises = promisifyAll(module.exports, { module.exports = RedisManager
module.exports.promises = promisifyAll(RedisManager, {
without: ['_deserializeRanges', '_computeHash'],
multiResult: { multiResult: {
getDoc: [ getDoc: [
'lines', 'lines',

View file

@ -10,11 +10,11 @@
* DS207: Consider shorter variations of null checks * DS207: Consider shorter variations of null checks
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md * Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
*/ */
let ShareJsUpdateManager
const ShareJsModel = require('./sharejs/server/model') const ShareJsModel = require('./sharejs/server/model')
const ShareJsDB = require('./ShareJsDB') const ShareJsDB = require('./ShareJsDB')
const logger = require('@overleaf/logger') const logger = require('@overleaf/logger')
const Settings = require('@overleaf/settings') const Settings = require('@overleaf/settings')
const { promisifyAll } = require('@overleaf/promise-utils')
const Keys = require('./UpdateKeys') const Keys = require('./UpdateKeys')
const { EventEmitter } = require('events') const { EventEmitter } = require('events')
const util = require('util') const util = require('util')
@ -28,7 +28,7 @@ util.inherits(ShareJsModel, EventEmitter)
const MAX_AGE_OF_OP = 80 const MAX_AGE_OF_OP = 80
module.exports = ShareJsUpdateManager = { const ShareJsUpdateManager = {
getNewShareJsModel(projectId, docId, lines, version) { getNewShareJsModel(projectId, docId, lines, version) {
const db = new ShareJsDB(projectId, docId, lines, version) const db = new ShareJsDB(projectId, docId, lines, version)
const model = new ShareJsModel(db, { const model = new ShareJsModel(db, {
@ -143,3 +143,11 @@ module.exports = ShareJsUpdateManager = {
.digest('hex') .digest('hex')
}, },
} }
module.exports = ShareJsUpdateManager
module.exports.promises = promisifyAll(ShareJsUpdateManager, {
without: ['getNewShareJsModel', '_listenForOps', '_sendOp', '_computeHash'],
multiResult: {
applyUpdate: ['updatedDocLines', 'version', 'appliedOps'],
},
})

View file

@ -10,10 +10,10 @@
* DS207: Consider shorter variations of null checks * DS207: Consider shorter variations of null checks
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md * Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
*/ */
let SnapshotManager const { promisifyAll } = require('@overleaf/promise-utils')
const { db, ObjectId } = require('./mongodb') const { db, ObjectId } = require('./mongodb')
module.exports = SnapshotManager = { const SnapshotManager = {
recordSnapshot(projectId, docId, version, pathname, lines, ranges, callback) { recordSnapshot(projectId, docId, version, pathname, lines, ranges, callback) {
try { try {
projectId = new ObjectId(projectId) projectId = new ObjectId(projectId)
@ -76,3 +76,8 @@ module.exports = SnapshotManager = {
} }
}, },
} }
module.exports = SnapshotManager
module.exports.promises = promisifyAll(SnapshotManager, {
without: ['jsonRangesToMongo', '_safeObjectId'],
})

View file

@ -1,24 +1,12 @@
/* eslint-disable // @ts-check
no-unused-vars,
*/ const { callbackifyAll } = require('@overleaf/promise-utils')
// TODO: This file was created by bulk-decaffeinate.
// Fix any style issues and re-enable lint.
/*
* decaffeinate suggestions:
* DS101: Remove unnecessary use of Array.from
* DS201: Simplify complex destructure assignments
* DS205: Consider reworking code to avoid use of IIFEs
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
*/
let UpdateManager
const LockManager = require('./LockManager') const LockManager = require('./LockManager')
const RedisManager = require('./RedisManager') const RedisManager = require('./RedisManager')
const RealTimeRedisManager = require('./RealTimeRedisManager') const RealTimeRedisManager = require('./RealTimeRedisManager')
const ShareJsUpdateManager = require('./ShareJsUpdateManager') const ShareJsUpdateManager = require('./ShareJsUpdateManager')
const HistoryManager = require('./HistoryManager') const HistoryManager = require('./HistoryManager')
const Settings = require('@overleaf/settings')
const _ = require('lodash') const _ = require('lodash')
const async = require('async')
const logger = require('@overleaf/logger') const logger = require('@overleaf/logger')
const Metrics = require('./Metrics') const Metrics = require('./Metrics')
const Errors = require('./Errors') const Errors = require('./Errors')
@ -27,337 +15,228 @@ const RangesManager = require('./RangesManager')
const SnapshotManager = require('./SnapshotManager') const SnapshotManager = require('./SnapshotManager')
const Profiler = require('./Profiler') const Profiler = require('./Profiler')
module.exports = UpdateManager = { const UpdateManager = {
processOutstandingUpdates(projectId, docId, callback) { async processOutstandingUpdates(projectId, docId) {
if (!callback) {
callback = function () {}
}
const timer = new Metrics.Timer('updateManager.processOutstandingUpdates') const timer = new Metrics.Timer('updateManager.processOutstandingUpdates')
UpdateManager.fetchAndApplyUpdates(projectId, docId, function (error) { try {
timer.done() await UpdateManager.fetchAndApplyUpdates(projectId, docId)
callback(error) timer.done({ status: 'success' })
}) } catch (err) {
timer.done({ status: 'error' })
throw err
}
}, },
processOutstandingUpdatesWithLock(projectId, docId, callback) { async processOutstandingUpdatesWithLock(projectId, docId) {
if (!callback) {
callback = function () {}
}
const profile = new Profiler('processOutstandingUpdatesWithLock', { const profile = new Profiler('processOutstandingUpdatesWithLock', {
project_id: projectId, project_id: projectId,
doc_id: docId, doc_id: docId,
}) })
LockManager.tryLock(docId, (error, gotLock, lockValue) => {
if (error) { const lockValue = await LockManager.promises.tryLock(docId)
return callback(error) if (lockValue == null) {
} return
if (!gotLock) { }
return callback() profile.log('tryLock')
}
profile.log('tryLock') try {
UpdateManager.processOutstandingUpdates( await UpdateManager.processOutstandingUpdates(projectId, docId)
projectId, profile.log('processOutstandingUpdates')
docId, } finally {
function (error) { await LockManager.promises.releaseLock(docId, lockValue)
if (error) { profile.log('releaseLock').end()
return UpdateManager._handleErrorInsideLock( }
docId,
lockValue, await UpdateManager.continueProcessingUpdatesWithLock(projectId, docId)
error,
callback
)
}
profile.log('processOutstandingUpdates')
LockManager.releaseLock(docId, lockValue, error => {
if (error) {
return callback(error)
}
profile.log('releaseLock').end()
UpdateManager.continueProcessingUpdatesWithLock(
projectId,
docId,
callback
)
})
}
)
})
}, },
continueProcessingUpdatesWithLock(projectId, docId, callback) { async continueProcessingUpdatesWithLock(projectId, docId) {
if (!callback) { const length = await RealTimeRedisManager.promises.getUpdatesLength(docId)
callback = function () {} if (length > 0) {
await UpdateManager.processOutstandingUpdatesWithLock(projectId, docId)
} }
RealTimeRedisManager.getUpdatesLength(docId, (error, length) => {
if (error) {
return callback(error)
}
if (length > 0) {
UpdateManager.processOutstandingUpdatesWithLock(
projectId,
docId,
callback
)
} else {
callback()
}
})
}, },
fetchAndApplyUpdates(projectId, docId, callback) { async fetchAndApplyUpdates(projectId, docId) {
if (!callback) {
callback = function () {}
}
const profile = new Profiler('fetchAndApplyUpdates', { const profile = new Profiler('fetchAndApplyUpdates', {
project_id: projectId, project_id: projectId,
doc_id: docId, doc_id: docId,
}) })
RealTimeRedisManager.getPendingUpdatesForDoc(docId, (error, updates) => {
if (error) { const updates = await RealTimeRedisManager.promises.getPendingUpdatesForDoc(
return callback(error) docId
} )
logger.debug( logger.debug(
{ projectId, docId, count: updates.length }, { projectId, docId, count: updates.length },
'processing updates' 'processing updates'
) )
if (updates.length === 0) { if (updates.length === 0) {
return callback() return
} }
profile.log('getPendingUpdatesForDoc') profile.log('getPendingUpdatesForDoc')
const doUpdate = (update, cb) =>
UpdateManager.applyUpdate(projectId, docId, update, function (err) { for (const update of updates) {
profile.log('applyUpdate') await UpdateManager.applyUpdate(projectId, docId, update)
cb(err) profile.log('applyUpdate')
}) }
const finalCallback = function (err) { profile.log('async done').end()
profile.log('async done').end()
callback(err)
}
async.eachSeries(updates, doUpdate, finalCallback)
})
}, },
applyUpdate(projectId, docId, update, _callback) { async applyUpdate(projectId, docId, update) {
if (_callback == null) {
_callback = function () {}
}
const callback = function (error) {
if (error) {
RealTimeRedisManager.sendData({
project_id: projectId,
doc_id: docId,
error: error.message || error,
})
profile.log('sendData')
}
profile.end()
_callback(error)
}
const profile = new Profiler('applyUpdate', { const profile = new Profiler('applyUpdate', {
project_id: projectId, project_id: projectId,
doc_id: docId, doc_id: docId,
}) })
UpdateManager._sanitizeUpdate(update) UpdateManager._sanitizeUpdate(update)
profile.log('sanitizeUpdate', { sync: true }) profile.log('sanitizeUpdate', { sync: true })
DocumentManager.getDoc(
projectId, try {
docId, let { lines, version, ranges, pathname, projectHistoryId } =
function (error, lines, version, ranges, pathname, projectHistoryId) { await DocumentManager.promises.getDoc(projectId, docId)
profile.log('getDoc') profile.log('getDoc')
if (error) {
return callback(error) if (lines == null || version == null) {
} throw new Errors.NotFoundError(`document not found: ${docId}`)
if (lines == null || version == null) { }
return callback(
new Errors.NotFoundError(`document not found: ${docId}`) const previousVersion = version
) const incomingUpdateVersion = update.v
} let updatedDocLines, appliedOps
const previousVersion = version ;({ updatedDocLines, version, appliedOps } =
const incomingUpdateVersion = update.v await ShareJsUpdateManager.promises.applyUpdate(
ShareJsUpdateManager.applyUpdate(
projectId, projectId,
docId, docId,
update, update,
lines, lines,
version, version
function (error, updatedDocLines, version, appliedOps) { ))
profile.log('sharejs.applyUpdate', { profile.log('sharejs.applyUpdate', {
// only synchronous when the update applies directly to the // only synchronous when the update applies directly to the
// doc version, otherwise getPreviousDocOps is called. // doc version, otherwise getPreviousDocOps is called.
sync: incomingUpdateVersion === previousVersion, sync: incomingUpdateVersion === previousVersion,
}) })
if (error) {
return callback(error) const { newRanges, rangesWereCollapsed } =
} await RangesManager.promises.applyUpdate(
RangesManager.applyUpdate( projectId,
projectId, docId,
docId, ranges,
ranges, appliedOps,
appliedOps, updatedDocLines
updatedDocLines, )
function (error, newRanges, rangesWereCollapsed) { profile.log('RangesManager.applyUpdate', { sync: true })
UpdateManager._addProjectHistoryMetadataToOps(
appliedOps, UpdateManager._addProjectHistoryMetadataToOps(
pathname, appliedOps,
projectHistoryId, pathname,
lines projectHistoryId,
) lines
profile.log('RangesManager.applyUpdate', { sync: true }) )
if (error) {
return callback(error) const projectOpsLength = await RedisManager.promises.updateDocument(
} projectId,
RedisManager.updateDocument( docId,
projectId, updatedDocLines,
docId, version,
updatedDocLines, appliedOps,
version, newRanges,
appliedOps, update.meta
newRanges, )
update.meta, profile.log('RedisManager.updateDocument')
function (error, projectOpsLength) {
profile.log('RedisManager.updateDocument') HistoryManager.recordAndFlushHistoryOps(
if (error) { projectId,
return callback(error) appliedOps,
} projectOpsLength
HistoryManager.recordAndFlushHistoryOps( )
projectId, profile.log('recordAndFlushHistoryOps')
appliedOps,
projectOpsLength if (rangesWereCollapsed) {
) Metrics.inc('doc-snapshot')
profile.log('recordAndFlushHistoryOps') logger.debug(
if (rangesWereCollapsed) { {
Metrics.inc('doc-snapshot') projectId,
logger.debug( docId,
{ previousVersion,
projectId, lines,
docId, ranges,
previousVersion, update,
lines, },
ranges, 'update collapsed some ranges, snapshotting previous content'
update, )
},
'update collapsed some ranges, snapshotting previous content' // Do this last, since it's a mongo call, and so potentially longest running
) // If it overruns the lock, it's ok, since all of our redis work is done
// Do this last, since it's a mongo call, and so potentially longest running await SnapshotManager.promises.recordSnapshot(
// If it overruns the lock, it's ok, since all of our redis work is done projectId,
SnapshotManager.recordSnapshot( docId,
projectId, previousVersion,
docId, pathname,
previousVersion, lines,
pathname, ranges
lines,
ranges,
function (error) {
if (error) {
logger.error(
{
err: error,
projectId,
docId,
version,
lines,
ranges,
},
'error recording snapshot'
)
callback(error)
} else {
callback()
}
}
)
} else {
callback()
}
}
)
}
)
}
) )
} }
) } catch (error) {
RealTimeRedisManager.sendData({
project_id: projectId,
doc_id: docId,
error: error instanceof Error ? error.message : error,
})
profile.log('sendData')
throw error
} finally {
profile.end()
}
}, },
lockUpdatesAndDo(method, projectId, docId, ...rest) { // lockUpdatesAndDo can't be promisified yet because it expects a
const adjustedLength = Math.max(rest.length, 1) // callback-style function
const args = rest.slice(0, adjustedLength - 1) async lockUpdatesAndDo(method, projectId, docId, ...args) {
const callback = rest[adjustedLength - 1]
const profile = new Profiler('lockUpdatesAndDo', { const profile = new Profiler('lockUpdatesAndDo', {
project_id: projectId, project_id: projectId,
doc_id: docId, doc_id: docId,
}) })
return LockManager.getLock(docId, function (error, lockValue) {
profile.log('getLock')
if (error) {
return callback(error)
}
UpdateManager.processOutstandingUpdates(
projectId,
docId,
function (error) {
if (error) {
return UpdateManager._handleErrorInsideLock(
docId,
lockValue,
error,
callback
)
}
profile.log('processOutstandingUpdates')
method(
projectId,
docId,
...Array.from(args),
function (error, ...responseArgs) {
if (error) {
return UpdateManager._handleErrorInsideLock(
docId,
lockValue,
error,
callback
)
}
profile.log('method')
LockManager.releaseLock(docId, lockValue, function (error) {
if (error) {
return callback(error)
}
profile.log('releaseLock').end()
callback(null, ...Array.from(responseArgs))
// We held the lock for a while so updates might have queued up
UpdateManager.continueProcessingUpdatesWithLock(
projectId,
docId,
err => {
if (err) {
// The processing may fail for invalid user updates.
// This can be very noisy, put them on level DEBUG
// and record a metric.
Metrics.inc('background-processing-updates-error')
logger.debug(
{ err, projectId, docId },
'error processing updates in background'
)
}
}
)
})
}
)
}
)
})
},
_handleErrorInsideLock(docId, lockValue, originalError, callback) { const lockValue = await LockManager.promises.getLock(docId)
if (!callback) { profile.log('getLock')
callback = function () {}
let responseArgs
try {
await UpdateManager.processOutstandingUpdates(projectId, docId)
profile.log('processOutstandingUpdates')
// TODO: method is still a callback-style function. Change this when promisifying DocumentManager
responseArgs = await new Promise((resolve, reject) => {
method(projectId, docId, ...args, (error, ...responseArgs) => {
if (error) {
reject(error)
} else {
resolve(responseArgs)
}
})
})
profile.log('method')
} finally {
await LockManager.promises.releaseLock(docId, lockValue)
profile.log('releaseLock').end()
} }
LockManager.releaseLock(docId, lockValue, lockError =>
callback(originalError) // We held the lock for a while so updates might have queued up
UpdateManager.continueProcessingUpdatesWithLock(projectId, docId).catch(
err => {
// The processing may fail for invalid user updates.
// This can be very noisy, put them on level DEBUG
// and record a metric.
Metrics.inc('background-processing-updates-error')
logger.debug(
{ err, projectId, docId },
'error processing updates in background'
)
}
) )
return responseArgs
}, },
_sanitizeUpdate(update) { _sanitizeUpdate(update) {
@ -372,7 +251,7 @@ module.exports = UpdateManager = {
// 16-bit character of a blackboard bold character (http://www.fileformat.info/info/unicode/char/1d400/index.htm). // 16-bit character of a blackboard bold character (http://www.fileformat.info/info/unicode/char/1d400/index.htm).
// Something must be going on client side that is screwing up the encoding and splitting the // Something must be going on client side that is screwing up the encoding and splitting the
// two 16-bit characters so that \uD835 is standalone. // two 16-bit characters so that \uD835 is standalone.
for (const op of Array.from(update.op || [])) { for (const op of update.op || []) {
if (op.i != null) { if (op.i != null) {
// Replace high and low surrogate characters with 'replacement character' (\uFFFD) // Replace high and low surrogate characters with 'replacement character' (\uFFFD)
op.i = op.i.replace(/[\uD800-\uDFFF]/g, '\uFFFD') op.i = op.i.replace(/[\uD800-\uDFFF]/g, '\uFFFD')
@ -384,7 +263,7 @@ module.exports = UpdateManager = {
_addProjectHistoryMetadataToOps(updates, pathname, projectHistoryId, lines) { _addProjectHistoryMetadataToOps(updates, pathname, projectHistoryId, lines) {
let docLength = _.reduce(lines, (chars, line) => chars + line.length, 0) let docLength = _.reduce(lines, (chars, line) => chars + line.length, 0)
docLength += lines.length - 1 // count newline characters docLength += lines.length - 1 // count newline characters
return updates.forEach(function (update) { updates.forEach(function (update) {
update.projectHistoryId = projectHistoryId update.projectHistoryId = projectHistoryId
if (!update.meta) { if (!update.meta) {
update.meta = {} update.meta = {}
@ -400,20 +279,41 @@ module.exports = UpdateManager = {
// We want to include the doc_length at the start of each update, // We want to include the doc_length at the start of each update,
// before it's ops are applied. However, we need to track any // before it's ops are applied. However, we need to track any
// changes to it for the next update. // changes to it for the next update.
return (() => { for (const op of update.op) {
const result = [] if (op.i != null) {
for (const op of Array.from(update.op)) { docLength += op.i.length
if (op.i != null) {
docLength += op.i.length
}
if (op.d != null) {
result.push((docLength -= op.d.length))
} else {
result.push(undefined)
}
} }
return result if (op.d != null) {
})() docLength -= op.d.length
}
}
}) })
}, },
} }
const CallbackifiedUpdateManager = callbackifyAll(UpdateManager)
module.exports = CallbackifiedUpdateManager
module.exports.promises = UpdateManager
module.exports.lockUpdatesAndDo = function lockUpdatesAndDo(
method,
projectId,
docId,
...rest
) {
const adjustedLength = Math.max(rest.length, 1)
const args = rest.slice(0, adjustedLength - 1)
const callback = rest[adjustedLength - 1]
// TODO: During the transition to promises, UpdateManager.lockUpdatesAndDo
// returns the potentially multiple arguments that must be provided to the
// callback in an array.
UpdateManager.lockUpdatesAndDo(method, projectId, docId, ...args)
.then(responseArgs => {
callback(null, ...responseArgs)
})
.catch(err => {
callback(err)
})
}

View file

@ -25,6 +25,7 @@
"@overleaf/ranges-tracker": "*", "@overleaf/ranges-tracker": "*",
"@overleaf/redis-wrapper": "*", "@overleaf/redis-wrapper": "*",
"@overleaf/settings": "*", "@overleaf/settings": "*",
"@types/chai-as-promised": "^7.1.8",
"async": "^3.2.2", "async": "^3.2.2",
"body-parser": "^1.19.0", "body-parser": "^1.19.0",
"bunyan": "^1.8.15", "bunyan": "^1.8.15",

View file

@ -1,12 +1,15 @@
const chai = require('chai') const chai = require('chai')
const chaiAsPromised = require('chai-as-promised')
const sinonChai = require('sinon-chai')
const SandboxedModule = require('sandboxed-module') const SandboxedModule = require('sandboxed-module')
const sinon = require('sinon') const sinon = require('sinon')
// Chai configuration // Chai configuration
chai.should() chai.should()
chai.use(chaiAsPromised)
// Load sinon-chai assertions so expect(stubFn).to.have.been.calledWith('abc') // Load sinon-chai assertions so expect(stubFn).to.have.been.calledWith('abc')
// has a nicer failure messages // has a nicer failure messages
chai.use(require('sinon-chai')) chai.use(sinonChai)
// Global stubs // Global stubs
const sandbox = sinon.createSandbox() const sandbox = sinon.createSandbox()

View file

@ -1,82 +1,113 @@
/* eslint-disable // @ts-check
no-unused-vars,
*/
// TODO: This file was created by bulk-decaffeinate.
// Fix any style issues and re-enable lint.
/*
* decaffeinate suggestions:
* DS101: Remove unnecessary use of Array.from
* DS206: Consider reworking classes to avoid initClass
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
*/
const sinon = require('sinon') const sinon = require('sinon')
const modulePath = '../../../../app/js/UpdateManager.js' const { expect } = require('chai')
const SandboxedModule = require('sandboxed-module') const SandboxedModule = require('sandboxed-module')
const MODULE_PATH = '../../../../app/js/UpdateManager.js'
describe('UpdateManager', function () { describe('UpdateManager', function () {
beforeEach(function () { beforeEach(function () {
let Profiler, Timer
this.project_id = 'project-id-123' this.project_id = 'project-id-123'
this.projectHistoryId = 'history-id-123' this.projectHistoryId = 'history-id-123'
this.doc_id = 'document-id-123' this.doc_id = 'document-id-123'
this.callback = sinon.stub() this.lockValue = 'mock-lock-value'
this.UpdateManager = SandboxedModule.require(modulePath, {
this.Metrics = {
inc: sinon.stub(),
Timer: class Timer {},
}
this.Metrics.Timer.prototype.done = sinon.stub()
this.Profiler = class Profiler {}
this.Profiler.prototype.log = sinon.stub().returns({ end: sinon.stub() })
this.Profiler.prototype.end = sinon.stub()
this.LockManager = {
promises: {
tryLock: sinon.stub().resolves(this.lockValue),
getLock: sinon.stub().resolves(this.lockValue),
releaseLock: sinon.stub().resolves(),
},
}
this.RedisManager = {
promises: {
setDocument: sinon.stub().resolves(),
updateDocument: sinon.stub(),
},
}
this.RealTimeRedisManager = {
sendData: sinon.stub(),
promises: {
getUpdatesLength: sinon.stub(),
getPendingUpdatesForDoc: sinon.stub(),
},
}
this.ShareJsUpdateManager = {
promises: {
applyUpdate: sinon.stub(),
},
}
this.HistoryManager = {
recordAndFlushHistoryOps: sinon.stub(),
}
this.Settings = {}
this.DocumentManager = {
promises: {
getDoc: sinon.stub(),
},
}
this.RangesManager = {
promises: {
applyUpdate: sinon.stub(),
},
}
this.SnapshotManager = {
promises: {
recordSnapshot: sinon.stub().resolves(),
},
}
this.UpdateManager = SandboxedModule.require(MODULE_PATH, {
requires: { requires: {
'./LockManager': (this.LockManager = {}), './LockManager': this.LockManager,
'./RedisManager': (this.RedisManager = {}), './RedisManager': this.RedisManager,
'./RealTimeRedisManager': (this.RealTimeRedisManager = {}), './RealTimeRedisManager': this.RealTimeRedisManager,
'./ShareJsUpdateManager': (this.ShareJsUpdateManager = {}), './ShareJsUpdateManager': this.ShareJsUpdateManager,
'./HistoryManager': (this.HistoryManager = {}), './HistoryManager': this.HistoryManager,
'./Metrics': (this.Metrics = { './Metrics': this.Metrics,
inc: sinon.stub(), '@overleaf/settings': this.Settings,
Timer: (Timer = (function () { './DocumentManager': this.DocumentManager,
Timer = class Timer { './RangesManager': this.RangesManager,
static initClass() { './SnapshotManager': this.SnapshotManager,
this.prototype.done = sinon.stub() './Profiler': this.Profiler,
}
}
Timer.initClass()
return Timer
})()),
}),
'@overleaf/settings': (this.Settings = {}),
'./DocumentManager': (this.DocumentManager = {}),
'./RangesManager': (this.RangesManager = {}),
'./SnapshotManager': (this.SnapshotManager = {}),
'./Profiler': (Profiler = (function () {
Profiler = class Profiler {
static initClass() {
this.prototype.log = sinon.stub().returns({ end: sinon.stub() })
this.prototype.end = sinon.stub()
}
}
Profiler.initClass()
return Profiler
})()),
}, },
}) })
}) })
describe('processOutstandingUpdates', function () { describe('processOutstandingUpdates', function () {
beforeEach(function () { beforeEach(async function () {
this.UpdateManager.fetchAndApplyUpdates = sinon.stub().callsArg(2) this.UpdateManager.promises.fetchAndApplyUpdates = sinon.stub().resolves()
this.UpdateManager.processOutstandingUpdates( await this.UpdateManager.promises.processOutstandingUpdates(
this.project_id, this.project_id,
this.doc_id, this.doc_id
this.callback
) )
}) })
it('should apply the updates', function () { it('should apply the updates', function () {
this.UpdateManager.fetchAndApplyUpdates this.UpdateManager.promises.fetchAndApplyUpdates
.calledWith(this.project_id, this.doc_id) .calledWith(this.project_id, this.doc_id)
.should.equal(true) .should.equal(true)
}) })
it('should call the callback', function () {
this.callback.called.should.equal(true)
})
it('should time the execution', function () { it('should time the execution', function () {
this.Metrics.Timer.prototype.done.called.should.equal(true) this.Metrics.Timer.prototype.done.called.should.equal(true)
}) })
@ -85,219 +116,184 @@ describe('UpdateManager', function () {
describe('processOutstandingUpdatesWithLock', function () { describe('processOutstandingUpdatesWithLock', function () {
describe('when the lock is free', function () { describe('when the lock is free', function () {
beforeEach(function () { beforeEach(function () {
this.LockManager.tryLock = sinon this.UpdateManager.promises.continueProcessingUpdatesWithLock = sinon
.stub() .stub()
.callsArgWith(1, null, true, (this.lockValue = 'mock-lock-value')) .resolves()
this.LockManager.releaseLock = sinon.stub().callsArg(2) this.UpdateManager.promises.processOutstandingUpdates = sinon
this.UpdateManager.continueProcessingUpdatesWithLock = sinon
.stub() .stub()
.callsArg(2) .resolves()
this.UpdateManager.processOutstandingUpdates = sinon.stub().callsArg(2)
}) })
describe('successfully', function () { describe('successfully', function () {
beforeEach(function () { beforeEach(async function () {
this.UpdateManager.processOutstandingUpdatesWithLock( await this.UpdateManager.promises.processOutstandingUpdatesWithLock(
this.project_id, this.project_id,
this.doc_id, this.doc_id
this.callback
) )
}) })
it('should acquire the lock', function () { it('should acquire the lock', function () {
this.LockManager.tryLock.calledWith(this.doc_id).should.equal(true) this.LockManager.promises.tryLock
.calledWith(this.doc_id)
.should.equal(true)
}) })
it('should free the lock', function () { it('should free the lock', function () {
this.LockManager.releaseLock this.LockManager.promises.releaseLock
.calledWith(this.doc_id, this.lockValue) .calledWith(this.doc_id, this.lockValue)
.should.equal(true) .should.equal(true)
}) })
it('should process the outstanding updates', function () { it('should process the outstanding updates', function () {
this.UpdateManager.processOutstandingUpdates this.UpdateManager.promises.processOutstandingUpdates
.calledWith(this.project_id, this.doc_id) .calledWith(this.project_id, this.doc_id)
.should.equal(true) .should.equal(true)
}) })
it('should do everything with the lock acquired', function () { it('should do everything with the lock acquired', function () {
this.UpdateManager.processOutstandingUpdates this.UpdateManager.promises.processOutstandingUpdates
.calledAfter(this.LockManager.tryLock) .calledAfter(this.LockManager.promises.tryLock)
.should.equal(true) .should.equal(true)
this.UpdateManager.processOutstandingUpdates this.UpdateManager.promises.processOutstandingUpdates
.calledBefore(this.LockManager.releaseLock) .calledBefore(this.LockManager.promises.releaseLock)
.should.equal(true) .should.equal(true)
}) })
it('should continue processing new updates that may have come in', function () { it('should continue processing new updates that may have come in', function () {
this.UpdateManager.continueProcessingUpdatesWithLock this.UpdateManager.promises.continueProcessingUpdatesWithLock
.calledWith(this.project_id, this.doc_id) .calledWith(this.project_id, this.doc_id)
.should.equal(true) .should.equal(true)
}) })
it('should return the callback', function () {
this.callback.called.should.equal(true)
})
}) })
describe('when processOutstandingUpdates returns an error', function () { describe('when processOutstandingUpdates returns an error', function () {
beforeEach(function () { beforeEach(async function () {
this.UpdateManager.processOutstandingUpdates = sinon this.error = new Error('Something went wrong')
this.UpdateManager.promises.processOutstandingUpdates = sinon
.stub() .stub()
.callsArgWith(2, (this.error = new Error('Something went wrong'))) .rejects(this.error)
this.UpdateManager.processOutstandingUpdatesWithLock( await expect(
this.project_id, this.UpdateManager.promises.processOutstandingUpdatesWithLock(
this.doc_id, this.project_id,
this.callback this.doc_id
) )
).to.be.rejectedWith(this.error)
}) })
it('should free the lock', function () { it('should free the lock', function () {
this.LockManager.releaseLock this.LockManager.promises.releaseLock
.calledWith(this.doc_id, this.lockValue) .calledWith(this.doc_id, this.lockValue)
.should.equal(true) .should.equal(true)
}) })
it('should return the error in the callback', function () {
this.callback.calledWith(this.error).should.equal(true)
})
}) })
}) })
describe('when the lock is taken', function () { describe('when the lock is taken', function () {
beforeEach(function () { beforeEach(async function () {
this.LockManager.tryLock = sinon.stub().callsArgWith(1, null, false) this.LockManager.promises.tryLock.resolves(null)
this.UpdateManager.processOutstandingUpdates = sinon.stub().callsArg(2) this.UpdateManager.promises.processOutstandingUpdates = sinon
this.UpdateManager.processOutstandingUpdatesWithLock( .stub()
.resolves()
await this.UpdateManager.promises.processOutstandingUpdatesWithLock(
this.project_id, this.project_id,
this.doc_id, this.doc_id
this.callback
) )
}) })
it('should return the callback', function () {
this.callback.called.should.equal(true)
})
it('should not process the updates', function () { it('should not process the updates', function () {
this.UpdateManager.processOutstandingUpdates.called.should.equal(false) this.UpdateManager.promises.processOutstandingUpdates.called.should.equal(
false
)
}) })
}) })
}) })
describe('continueProcessingUpdatesWithLock', function () { describe('continueProcessingUpdatesWithLock', function () {
describe('when there are outstanding updates', function () { describe('when there are outstanding updates', function () {
beforeEach(function () { beforeEach(async function () {
this.RealTimeRedisManager.getUpdatesLength = sinon this.RealTimeRedisManager.promises.getUpdatesLength.resolves(3)
this.UpdateManager.promises.processOutstandingUpdatesWithLock = sinon
.stub() .stub()
.callsArgWith(1, null, 3) .resolves()
this.UpdateManager.processOutstandingUpdatesWithLock = sinon await this.UpdateManager.promises.continueProcessingUpdatesWithLock(
.stub()
.callsArg(2)
this.UpdateManager.continueProcessingUpdatesWithLock(
this.project_id, this.project_id,
this.doc_id, this.doc_id
this.callback
) )
}) })
it('should process the outstanding updates', function () { it('should process the outstanding updates', function () {
this.UpdateManager.processOutstandingUpdatesWithLock this.UpdateManager.promises.processOutstandingUpdatesWithLock
.calledWith(this.project_id, this.doc_id) .calledWith(this.project_id, this.doc_id)
.should.equal(true) .should.equal(true)
}) })
it('should return the callback', function () {
this.callback.called.should.equal(true)
})
}) })
describe('when there are no outstanding updates', function () { describe('when there are no outstanding updates', function () {
beforeEach(function () { beforeEach(async function () {
this.RealTimeRedisManager.getUpdatesLength = sinon this.RealTimeRedisManager.promises.getUpdatesLength.resolves(0)
this.UpdateManager.promises.processOutstandingUpdatesWithLock = sinon
.stub() .stub()
.callsArgWith(1, null, 0) .resolves()
this.UpdateManager.processOutstandingUpdatesWithLock = sinon await this.UpdateManager.promises.continueProcessingUpdatesWithLock(
.stub()
.callsArg(2)
this.UpdateManager.continueProcessingUpdatesWithLock(
this.project_id, this.project_id,
this.doc_id, this.doc_id
this.callback
) )
}) })
it('should not try to process the outstanding updates', function () { it('should not try to process the outstanding updates', function () {
this.UpdateManager.processOutstandingUpdatesWithLock.called.should.equal( this.UpdateManager.promises.processOutstandingUpdatesWithLock.called.should.equal(
false false
) )
}) })
it('should return the callback', function () {
this.callback.called.should.equal(true)
})
}) })
}) })
describe('fetchAndApplyUpdates', function () { describe('fetchAndApplyUpdates', function () {
describe('with updates', function () { describe('with updates', function () {
beforeEach(function () { beforeEach(async function () {
this.updates = [{ p: 1, t: 'foo' }] this.updates = [{ p: 1, t: 'foo' }]
this.updatedDocLines = ['updated', 'lines'] this.updatedDocLines = ['updated', 'lines']
this.version = 34 this.version = 34
this.RealTimeRedisManager.getPendingUpdatesForDoc = sinon this.RealTimeRedisManager.promises.getPendingUpdatesForDoc.resolves(
.stub() this.updates
.callsArgWith(1, null, this.updates) )
this.UpdateManager.applyUpdate = sinon this.UpdateManager.promises.applyUpdate = sinon.stub().resolves()
.stub() await this.UpdateManager.promises.fetchAndApplyUpdates(
.callsArgWith(3, null, this.updatedDocLines, this.version)
this.UpdateManager.fetchAndApplyUpdates(
this.project_id, this.project_id,
this.doc_id, this.doc_id
this.callback
) )
}) })
it('should get the pending updates', function () { it('should get the pending updates', function () {
this.RealTimeRedisManager.getPendingUpdatesForDoc this.RealTimeRedisManager.promises.getPendingUpdatesForDoc
.calledWith(this.doc_id) .calledWith(this.doc_id)
.should.equal(true) .should.equal(true)
}) })
it('should apply the updates', function () { it('should apply the updates', function () {
Array.from(this.updates).map(update => this.updates.map(update =>
this.UpdateManager.applyUpdate this.UpdateManager.promises.applyUpdate
.calledWith(this.project_id, this.doc_id, update) .calledWith(this.project_id, this.doc_id, update)
.should.equal(true) .should.equal(true)
) )
}) })
it('should call the callback', function () {
this.callback.called.should.equal(true)
})
}) })
describe('when there are no updates', function () { describe('when there are no updates', function () {
beforeEach(function () { beforeEach(async function () {
this.updates = [] this.updates = []
this.RealTimeRedisManager.getPendingUpdatesForDoc = sinon this.RealTimeRedisManager.promises.getPendingUpdatesForDoc.resolves(
.stub() this.updates
.callsArgWith(1, null, this.updates) )
this.UpdateManager.applyUpdate = sinon.stub() this.UpdateManager.promises.applyUpdate = sinon.stub().resolves()
this.RedisManager.setDocument = sinon.stub() await this.UpdateManager.promises.fetchAndApplyUpdates(
this.UpdateManager.fetchAndApplyUpdates(
this.project_id, this.project_id,
this.doc_id, this.doc_id
this.callback
) )
}) })
it('should not call applyUpdate', function () { it('should not call applyUpdate', function () {
this.UpdateManager.applyUpdate.called.should.equal(false) this.UpdateManager.promises.applyUpdate.called.should.equal(false)
})
it('should call the callback', function () {
this.callback.called.should.equal(true)
}) })
}) })
}) })
@ -315,44 +311,41 @@ describe('UpdateManager', function () {
{ v: 42, op: 'mock-op-42' }, { v: 42, op: 'mock-op-42' },
{ v: 45, op: 'mock-op-45' }, { v: 45, op: 'mock-op-45' },
] ]
this.project_ops_length = sinon.stub() this.project_ops_length = 123
this.pathname = '/a/b/c.tex' this.pathname = '/a/b/c.tex'
this.DocumentManager.getDoc = sinon this.DocumentManager.promises.getDoc.resolves({
.stub() lines: this.lines,
.yields( version: this.version,
null, ranges: this.ranges,
this.lines, pathname: this.pathname,
this.version, projectHistoryId: this.projectHistoryId,
this.ranges, })
this.pathname, this.RangesManager.promises.applyUpdate.resolves({
this.projectHistoryId newRanges: this.updated_ranges,
) rangesWereCollapsed: false,
this.RangesManager.applyUpdate = sinon })
.stub() this.ShareJsUpdateManager.promises.applyUpdate = sinon.stub().resolves({
.yields(null, this.updated_ranges, false) updatedDocLines: this.updatedDocLines,
this.ShareJsUpdateManager.applyUpdate = sinon version: this.version,
.stub() appliedOps: this.appliedOps,
.yields(null, this.updatedDocLines, this.version, this.appliedOps) })
this.RedisManager.updateDocument = sinon this.RedisManager.promises.updateDocument.resolves(
.stub() this.project_ops_length
.yields(null, this.project_ops_length) )
this.RealTimeRedisManager.sendData = sinon.stub() this.UpdateManager.promises._addProjectHistoryMetadataToOps = sinon.stub()
this.UpdateManager._addProjectHistoryMetadataToOps = sinon.stub()
this.HistoryManager.recordAndFlushHistoryOps = sinon.stub()
}) })
describe('normally', function () { describe('normally', function () {
beforeEach(function () { beforeEach(async function () {
this.UpdateManager.applyUpdate( await this.UpdateManager.promises.applyUpdate(
this.project_id, this.project_id,
this.doc_id, this.doc_id,
this.update, this.update
this.callback
) )
}) })
it('should apply the updates via ShareJS', function () { it('should apply the updates via ShareJS', function () {
this.ShareJsUpdateManager.applyUpdate this.ShareJsUpdateManager.promises.applyUpdate
.calledWith( .calledWith(
this.project_id, this.project_id,
this.doc_id, this.doc_id,
@ -364,7 +357,7 @@ describe('UpdateManager', function () {
}) })
it('should update the ranges', function () { it('should update the ranges', function () {
this.RangesManager.applyUpdate this.RangesManager.promises.applyUpdate
.calledWith( .calledWith(
this.project_id, this.project_id,
this.doc_id, this.doc_id,
@ -376,7 +369,7 @@ describe('UpdateManager', function () {
}) })
it('should save the document', function () { it('should save the document', function () {
this.RedisManager.updateDocument this.RedisManager.promises.updateDocument
.calledWith( .calledWith(
this.project_id, this.project_id,
this.doc_id, this.doc_id,
@ -390,14 +383,12 @@ describe('UpdateManager', function () {
}) })
it('should add metadata to the ops', function () { it('should add metadata to the ops', function () {
this.UpdateManager._addProjectHistoryMetadataToOps this.UpdateManager.promises._addProjectHistoryMetadataToOps.should.have.been.calledWith(
.calledWith( this.appliedOps,
this.appliedOps, this.pathname,
this.pathname, this.projectHistoryId,
this.projectHistoryId, this.lines
this.lines )
)
.should.equal(true)
}) })
it('should push the applied ops into the history queue', function () { it('should push the applied ops into the history queue', function () {
@ -405,25 +396,20 @@ describe('UpdateManager', function () {
.calledWith(this.project_id, this.appliedOps, this.project_ops_length) .calledWith(this.project_id, this.appliedOps, this.project_ops_length)
.should.equal(true) .should.equal(true)
}) })
it('should call the callback', function () {
this.callback.called.should.equal(true)
})
}) })
describe('with UTF-16 surrogate pairs in the update', function () { describe('with UTF-16 surrogate pairs in the update', function () {
beforeEach(function () { beforeEach(async function () {
this.update = { op: [{ p: 42, i: '\uD835\uDC00' }] } this.update = { op: [{ p: 42, i: '\uD835\uDC00' }] }
this.UpdateManager.applyUpdate( await this.UpdateManager.promises.applyUpdate(
this.project_id, this.project_id,
this.doc_id, this.doc_id,
this.update, this.update
this.callback
) )
}) })
it('should apply the update but with surrogate pairs removed', function () { it('should apply the update but with surrogate pairs removed', function () {
this.ShareJsUpdateManager.applyUpdate this.ShareJsUpdateManager.promises.applyUpdate
.calledWith(this.project_id, this.doc_id, this.update) .calledWith(this.project_id, this.doc_id, this.update)
.should.equal(true) .should.equal(true)
@ -433,15 +419,16 @@ describe('UpdateManager', function () {
}) })
describe('with an error', function () { describe('with an error', function () {
beforeEach(function () { beforeEach(async function () {
this.error = new Error('something went wrong') this.error = new Error('something went wrong')
this.ShareJsUpdateManager.applyUpdate = sinon.stub().yields(this.error) this.ShareJsUpdateManager.promises.applyUpdate.rejects(this.error)
this.UpdateManager.applyUpdate( await expect(
this.project_id, this.UpdateManager.promises.applyUpdate(
this.doc_id, this.project_id,
this.update, this.doc_id,
this.callback this.update
) )
).to.be.rejectedWith(this.error)
}) })
it('should call RealTimeRedisManager.sendData with the error', function () { it('should call RealTimeRedisManager.sendData with the error', function () {
@ -453,23 +440,18 @@ describe('UpdateManager', function () {
}) })
.should.equal(true) .should.equal(true)
}) })
it('should call the callback with the error', function () {
this.callback.calledWith(this.error).should.equal(true)
})
}) })
describe('when ranges get collapsed', function () { describe('when ranges get collapsed', function () {
beforeEach(function () { beforeEach(async function () {
this.RangesManager.applyUpdate = sinon this.RangesManager.promises.applyUpdate.resolves({
.stub() newRanges: this.updated_ranges,
.yields(null, this.updated_ranges, true) rangesWereCollapsed: true,
this.SnapshotManager.recordSnapshot = sinon.stub().yields() })
this.UpdateManager.applyUpdate( await this.UpdateManager.promises.applyUpdate(
this.project_id, this.project_id,
this.doc_id, this.doc_id,
this.update, this.update
this.callback
) )
}) })
@ -478,7 +460,7 @@ describe('UpdateManager', function () {
}) })
it('should call SnapshotManager.recordSnapshot', function () { it('should call SnapshotManager.recordSnapshot', function () {
this.SnapshotManager.recordSnapshot this.SnapshotManager.promises.recordSnapshot
.calledWith( .calledWith(
this.project_id, this.project_id,
this.doc_id, this.doc_id,
@ -558,38 +540,41 @@ describe('UpdateManager', function () {
describe('lockUpdatesAndDo', function () { describe('lockUpdatesAndDo', function () {
beforeEach(function () { beforeEach(function () {
this.method = sinon.stub().callsArgWith(3, null, this.response_arg1) this.method = sinon
this.callback = sinon.stub() .stub()
.yields(null, this.response_arg1, this.response_arg2)
this.arg1 = 'argument 1' this.arg1 = 'argument 1'
this.response_arg1 = 'response argument 1' this.response_arg1 = 'response argument 1'
this.lockValue = 'mock-lock-value' this.response_arg2 = 'response argument 2'
this.LockManager.getLock = sinon
.stub()
.callsArgWith(1, null, this.lockValue)
this.LockManager.releaseLock = sinon.stub().callsArg(2)
}) })
describe('successfully', function () { describe('successfully', function () {
beforeEach(function () { beforeEach(async function () {
this.UpdateManager.continueProcessingUpdatesWithLock = sinon.stub() this.UpdateManager.promises.continueProcessingUpdatesWithLock = sinon
this.UpdateManager.processOutstandingUpdates = sinon.stub().callsArg(2) .stub()
this.UpdateManager.lockUpdatesAndDo( .resolves()
this.UpdateManager.promises.processOutstandingUpdates = sinon
.stub()
.resolves()
this.response = await this.UpdateManager.promises.lockUpdatesAndDo(
this.method, this.method,
this.project_id, this.project_id,
this.doc_id, this.doc_id,
this.arg1, this.arg1
this.callback
) )
}) })
it('should lock the doc', function () { it('should lock the doc', function () {
this.LockManager.getLock.calledWith(this.doc_id).should.equal(true) this.LockManager.promises.getLock
.calledWith(this.doc_id)
.should.equal(true)
}) })
it('should process any outstanding updates', function () { it('should process any outstanding updates', function () {
this.UpdateManager.processOutstandingUpdates this.UpdateManager.promises.processOutstandingUpdates.should.have.been.calledWith(
.calledWith(this.project_id, this.doc_id) this.project_id,
.should.equal(true) this.doc_id
)
}) })
it('should call the method', function () { it('should call the method', function () {
@ -598,76 +583,71 @@ describe('UpdateManager', function () {
.should.equal(true) .should.equal(true)
}) })
it('should return the method response to the callback', function () { it('should return the method response arguments', function () {
this.callback.calledWith(null, this.response_arg1).should.equal(true) expect(this.response).to.deep.equal([
this.response_arg1,
this.response_arg2,
])
}) })
it('should release the lock', function () { it('should release the lock', function () {
this.LockManager.releaseLock this.LockManager.promises.releaseLock
.calledWith(this.doc_id, this.lockValue) .calledWith(this.doc_id, this.lockValue)
.should.equal(true) .should.equal(true)
}) })
it('should continue processing updates', function () { it('should continue processing updates', function () {
this.UpdateManager.continueProcessingUpdatesWithLock this.UpdateManager.promises.continueProcessingUpdatesWithLock
.calledWith(this.project_id, this.doc_id) .calledWith(this.project_id, this.doc_id)
.should.equal(true) .should.equal(true)
}) })
}) })
describe('when processOutstandingUpdates returns an error', function () { describe('when processOutstandingUpdates returns an error', function () {
beforeEach(function () { beforeEach(async function () {
this.UpdateManager.processOutstandingUpdates = sinon this.error = new Error('Something went wrong')
this.UpdateManager.promises.processOutstandingUpdates = sinon
.stub() .stub()
.callsArgWith(2, (this.error = new Error('Something went wrong'))) .rejects(this.error)
this.UpdateManager.lockUpdatesAndDo( await expect(
this.method, this.UpdateManager.promises.lockUpdatesAndDo(
this.project_id, this.method,
this.doc_id, this.project_id,
this.arg1, this.doc_id,
this.callback this.arg1
) )
).to.be.rejectedWith(this.error)
}) })
it('should free the lock', function () { it('should free the lock', function () {
this.LockManager.releaseLock this.LockManager.promises.releaseLock
.calledWith(this.doc_id, this.lockValue) .calledWith(this.doc_id, this.lockValue)
.should.equal(true) .should.equal(true)
}) })
it('should return the error in the callback', function () {
this.callback.calledWith(this.error).should.equal(true)
})
}) })
describe('when the method returns an error', function () { describe('when the method returns an error', function () {
beforeEach(function () { beforeEach(async function () {
this.UpdateManager.processOutstandingUpdates = sinon.stub().callsArg(2) this.error = new Error('something went wrong')
this.method = sinon this.UpdateManager.promises.processOutstandingUpdates = sinon
.stub() .stub()
.callsArgWith( .resolves()
3, this.method = sinon.stub().yields(this.error)
(this.error = new Error('something went wrong')), await expect(
this.response_arg1 this.UpdateManager.promises.lockUpdatesAndDo(
this.method,
this.project_id,
this.doc_id,
this.arg1
) )
this.UpdateManager.lockUpdatesAndDo( ).to.be.rejectedWith(this.error)
this.method,
this.project_id,
this.doc_id,
this.arg1,
this.callback
)
}) })
it('should free the lock', function () { it('should free the lock', function () {
this.LockManager.releaseLock this.LockManager.promises.releaseLock
.calledWith(this.doc_id, this.lockValue) .calledWith(this.doc_id, this.lockValue)
.should.equal(true) .should.equal(true)
}) })
it('should return the error in the callback', function () {
this.callback.calledWith(this.error).should.equal(true)
})
}) })
}) })
}) })