Merge pull request #17729 from overleaf/em-promisify-sync-manager

Promisify SyncManager

GitOrigin-RevId: 134770d812a493e39410debb370ed4a58ffff4bf

Parent: c9373c25f4
Commit: f03e3fd51e

10 changed files with 780 additions and 805 deletions

package-lock.json (generated)

@@ -43037,6 +43037,7 @@
         "@overleaf/logger": "*",
         "@overleaf/metrics": "*",
         "@overleaf/o-error": "*",
+        "@overleaf/promise-utils": "*",
         "@overleaf/redis-wrapper": "*",
         "@overleaf/settings": "*",
         "async": "^3.2.2",

@@ -51636,6 +51637,7 @@
         "@overleaf/logger": "*",
         "@overleaf/metrics": "*",
         "@overleaf/o-error": "*",
+        "@overleaf/promise-utils": "*",
         "@overleaf/redis-wrapper": "*",
         "@overleaf/settings": "*",
         "async": "^3.2.2",

ErrorRecorder.js

@@ -305,6 +305,11 @@ export function getFailures(callback) {
 
 export const promises = {
+  getFailedProjects: promisify(getFailedProjects),
   record: promisify(record),
+  getFailureRecord: promisify(getFailureRecord),
+  getLastFailure: promisify(getLastFailure),
+  getFailuresByType: promisify(getFailuresByType),
+  getFailures: promisify(getFailures),
   recordSyncStart: promisify(recordSyncStart),
   setForceDebug: promisify(setForceDebug),
 }
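
The hunk above shows the pattern this PR applies to every module: a callback-style API grows a `promises` namespace built with Node's `util.promisify`. A minimal self-contained sketch of the pattern, with illustrative names rather than code from the diff:

import { promisify } from 'util'

// A callback-style function of the kind ErrorRecorder exports: the last
// argument is a Node-style callback(error, result).
function getFailureRecord(projectId, callback) {
  process.nextTick(() => callback(null, { projectId, failures: 0 }))
}

// promisify() wraps it so the result arrives as a resolved Promise instead.
export const promises = {
  getFailureRecord: promisify(getFailureRecord),
}

// Usage: const record = await promises.getFailureRecord('abc123')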

HashManager.js

@@ -10,6 +10,7 @@
  * DS207: Consider shorter variations of null checks
  * Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
  */
+import { promisify } from 'util'
 import fs from 'fs'
 import crypto from 'crypto'
 import OError from '@overleaf/o-error'

@@ -51,3 +52,7 @@ export function _getBlobHash(fsPath, callback) {
     })
   })
 }
+
+export const promises = {
+  _getBlobHash: promisify(_getBlobHash),
+}

HistoryStoreManager.js

@@ -1,9 +1,9 @@
+import { promisify } from 'util'
 import fs from 'fs'
 import request from 'request'
 import stream from 'stream'
 import logger from '@overleaf/logger'
 import _ from 'lodash'
-import BPromise from 'bluebird'
 import { URL } from 'url'
 import OError from '@overleaf/o-error'
 import Settings from '@overleaf/settings'

@@ -354,7 +354,7 @@ export function deleteProject(projectId, callback) {
   )
 }
 
-const getProjectBlobAsync = BPromise.promisify(getProjectBlob)
+const getProjectBlobAsync = promisify(getProjectBlob)
 
 class BlobStore {
   constructor(projectId) {

@@ -435,3 +435,15 @@ function _requestHistoryService(options, callback) {
     }
   })
 }
+
+export const promises = {
+  getMostRecentChunk: promisify(getMostRecentChunk),
+  getChunkAtVersion: promisify(getChunkAtVersion),
+  getMostRecentVersion: promisify(getMostRecentVersion),
+  getProjectBlob: promisify(getProjectBlob),
+  getProjectBlobStream: promisify(getProjectBlobStream),
+  sendChanges: promisify(sendChanges),
+  createBlobForUpdate: promisify(createBlobForUpdate),
+  initializeProject: promisify(initializeProject),
+  deleteProject: promisify(deleteProject),
+}
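
The substantive change here is dropping Bluebird: for wrapping a single Node-style callback function, `BPromise.promisify` and the built-in `util.promisify` behave the same, assuming the call site only awaits the result and uses none of Bluebird's extra promise methods. A sketch of the swap, with a stand-in function rather than the real getProjectBlob:

import { promisify } from 'util'

// Stand-in for a callback-style fetch like getProjectBlob(historyId, hash, cb)
function getProjectBlob(historyId, hash, callback) {
  process.nextTick(() => callback(null, 'blob-content'))
}

// Before: const getProjectBlobAsync = BPromise.promisify(getProjectBlob)
// After, with identical behaviour for plain await/then call sites:
const getProjectBlobAsync = promisify(getProjectBlob)

getProjectBlobAsync('history-id', 'abc123').then(console.log)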

LockManager.js

@@ -7,6 +7,7 @@
  * DS207: Consider shorter variations of null checks
  * Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
  */
+import { promisify } from 'util'
 import async from 'async'
 import metrics from '@overleaf/metrics'
 import Settings from '@overleaf/settings'

@@ -273,3 +274,41 @@ class Lock {
     })
   }
 }
+
+/**
+ * Promisified version of runWithLock.
+ *
+ * @param {string} key
+ * @param {(extendLock: Function) => Promise<any>} runner
+ */
+async function runWithLockPromises(key, runner) {
+  const runnerCb = (extendLock, callback) => {
+    const extendLockPromises = promisify(extendLock)
+    runner(extendLockPromises)
+      .then(result => {
+        callback(null, result)
+      })
+      .catch(err => {
+        callback(err)
+      })
+  }
+
+  return new Promise((resolve, reject) => {
+    runWithLock(key, runnerCb, (err, result) => {
+      if (err) {
+        reject(err)
+      } else {
+        resolve(result)
+      }
+    })
+  })
+}
+
+export const promises = {
+  tryLock: promisify(tryLock),
+  extendLock: promisify(extendLock),
+  getLock: promisify(getLock),
+  checkLock: promisify(checkLock),
+  releaseLock: promisify(releaseLock),
+  runWithLock: runWithLockPromises,
+}
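
`runWithLockPromises` keeps the callback-based `runWithLock` underneath but lets callers pass an async runner; the runner receives a promisified `extendLock`. A usage sketch (the key string and runner body are assumptions, not code from the diff):

// Hypothetical caller of the new promise API:
const result = await LockManager.promises.runWithLock(
  'ProjectHistoryLock:{some-project-id}',
  async extendLock => {
    // ...do work while holding the lock...
    await extendLock() // refresh the lock before a long-running step
    return 'done'
  }
)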

SnapshotManager.js

@@ -1,4 +1,6 @@
 // @ts-check
+
+import { promisify } from 'util'
 import Core from 'overleaf-editor-core'
 import { Readable as StringStream } from 'stream'
 import BPromise from 'bluebird'

@@ -171,3 +173,9 @@ function _loadFilesLimit(snapshot, kind, blobStore) {
     { concurrency: MAX_REQUESTS }
   )
 }
+
+export const promises = {
+  getFileSnapshotStream: promisify(getFileSnapshotStream),
+  getProjectSnapshot: promisify(getProjectSnapshot),
+  getLatestSnapshot: promisify(getLatestSnapshot),
+}

SyncManager.js

@@ -1,6 +1,6 @@
 import _ from 'lodash'
-import async from 'async'
-import { promisify } from 'util'
+import { callbackify, promisify } from 'util'
+import { callbackifyMultiResult } from '@overleaf/promise-utils'
 import Settings from '@overleaf/settings'
 import logger from '@overleaf/logger'
 import Metrics from '@overleaf/metrics'

@@ -27,7 +27,7 @@ const keys = Settings.redis.lock.key_schema
 // To add expiresAt field to existing entries in collection (choose a suitable future expiry date):
 // db.projectHistorySyncState.updateMany({resyncProjectStructure: false, resyncDocContents: [], expiresAt: {$exists:false}}, {$set: {expiresAt: new Date("2019-07-01")}})
 
-export function startResync(projectId, options, callback) {
+async function startResync(projectId, options = {}) {
   // We have three options here
   //
   // 1. If we update mongo before making the call to web then there's a

@@ -39,114 +39,71 @@ export function startResync(projectId, options, callback) {
   // after, causing all updates to be ignored from then on
   //
   // 3. We can wrap everything in a project lock
-  if (typeof options === 'function') {
-    callback = options
-    options = {}
-  }
-
   Metrics.inc('project_history_resync')
-  LockManager.runWithLock(
-    keys.projectHistoryLock({ project_id: projectId }),
-    (extendLock, releaseLock) =>
-      _startResyncWithoutLock(projectId, options, releaseLock),
-    function (error) {
-      if (error) {
-        OError.tag(error)
-        // record error in starting sync ("sync ongoing")
-        ErrorRecorder.record(projectId, -1, error, () => callback(error))
-      } else {
-        callback()
-      }
-    }
-  )
+  try {
+    await LockManager.promises.runWithLock(
+      keys.projectHistoryLock({ project_id: projectId }),
+      async extendLock => {
+        await _startResyncWithoutLock(projectId, options)
+      }
+    )
+  } catch (error) {
+    // record error in starting sync ("sync ongoing")
+    try {
+      await ErrorRecorder.promises.record(projectId, -1, error)
+    } catch (err) {
+      // swallow any error thrown by ErrorRecorder.record()
+    }
+    throw error
+  }
 }
 
-export function startHardResync(projectId, options, callback) {
-  if (typeof options === 'function') {
-    callback = options
-    options = {}
-  }
-
+async function startHardResync(projectId, options = {}) {
   Metrics.inc('project_history_hard_resync')
-  LockManager.runWithLock(
-    keys.projectHistoryLock({ project_id: projectId }),
-    (extendLock, releaseLock) =>
-      clearResyncState(projectId, function (err) {
-        if (err) {
-          return releaseLock(OError.tag(err))
-        }
-        RedisManager.clearFirstOpTimestamp(projectId, function (err) {
-          if (err) {
-            return releaseLock(OError.tag(err))
-          }
-          RedisManager.destroyDocUpdatesQueue(projectId, function (err) {
-            if (err) {
-              return releaseLock(OError.tag(err))
-            }
-            _startResyncWithoutLock(projectId, options, releaseLock)
-          })
-        })
-      }),
-    function (error) {
-      if (error) {
-        OError.tag(error)
-        // record error in starting sync ("sync ongoing")
-        ErrorRecorder.record(projectId, -1, error, () => callback(error))
-      } else {
-        callback()
-      }
-    }
-  )
+  try {
+    await LockManager.promises.runWithLock(
+      keys.projectHistoryLock({ project_id: projectId }),
+      async extendLock => {
+        await clearResyncState(projectId)
+        await RedisManager.promises.clearFirstOpTimestamp(projectId)
+        await RedisManager.promises.destroyDocUpdatesQueue(projectId)
+        await _startResyncWithoutLock(projectId, options)
+      }
+    )
+  } catch (error) {
+    // record error in starting sync ("sync ongoing")
+    await ErrorRecorder.promises.record(projectId, -1, error)
+    throw error
+  }
 }
 
-function _startResyncWithoutLock(projectId, options, callback) {
-  ErrorRecorder.recordSyncStart(projectId, function (error) {
-    if (error) {
-      return callback(OError.tag(error))
-    }
-
-    _getResyncState(projectId, function (error, syncState) {
-      if (error) {
-        return callback(OError.tag(error))
-      }
-
-      if (syncState.isSyncOngoing()) {
-        return callback(new OError('sync ongoing'))
-      }
-
-      syncState.setOrigin(options.origin || { kind: 'history-resync' })
-      syncState.startProjectStructureSync()
-
-      WebApiManager.requestResync(projectId, function (error) {
-        if (error) {
-          return callback(OError.tag(error))
-        }
-
-        setResyncState(projectId, syncState, callback)
-      })
-    })
-  })
+async function _startResyncWithoutLock(projectId, options) {
+  await ErrorRecorder.promises.recordSyncStart(projectId)
+
+  const syncState = await _getResyncState(projectId)
+  if (syncState.isSyncOngoing()) {
+    throw new OError('sync ongoing')
+  }
+
+  syncState.setOrigin(options.origin || { kind: 'history-resync' })
+  syncState.startProjectStructureSync()
+
+  await WebApiManager.promises.requestResync(projectId)
+  await setResyncState(projectId, syncState)
 }
 
-function _getResyncState(projectId, callback) {
-  db.projectHistorySyncState.findOne(
-    {
-      project_id: new ObjectId(projectId.toString()),
-    },
-    function (error, rawSyncState) {
-      if (error) {
-        return callback(OError.tag(error))
-      }
-      const syncState = SyncState.fromRaw(projectId, rawSyncState)
-      callback(null, syncState)
-    }
-  )
+async function _getResyncState(projectId) {
+  const rawSyncState = await db.projectHistorySyncState.findOne({
+    project_id: new ObjectId(projectId.toString()),
+  })
+  const syncState = SyncState.fromRaw(projectId, rawSyncState)
+  return syncState
 }
 
-export function setResyncState(projectId, syncState, callback) {
+async function setResyncState(projectId, syncState) {
+  // skip if syncState is null (i.e. unchanged)
   if (syncState == null) {
-    return callback()
-  } // skip if syncState is null (i.e. unchanged)
-
+    return
+  }
   const update = {
     $set: syncState.toRaw(),
     $push: {
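
For callers, errors that previously arrived through a callback now propagate as promise rejections, so conditions like "sync ongoing" can be handled with try/catch. A hedged sketch of a caller (import path and error handling assumed, not from the diff):

import * as SyncManager from './SyncManager.js'

async function resyncProject(projectId) {
  try {
    await SyncManager.promises.startResync(projectId)
  } catch (err) {
    // startResync records the failure via ErrorRecorder before rethrowing
    console.error('resync failed', err)
  }
}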

@@ -158,142 +115,103 @@ export function setResyncState(projectId, syncState, callback) {
     },
     $currentDate: { lastUpdated: true },
   }
 
   // handle different cases
   if (syncState.isSyncOngoing()) {
-    // starting a new sync
+    // starting a new sync; prevent the entry expiring while sync is in ongoing
     update.$inc = { resyncCount: 1 }
-    update.$unset = { expiresAt: true } // prevent the entry expiring while sync is in ongoing
+    update.$unset = { expiresAt: true }
   } else {
-    // successful completion of existing sync
+    // successful completion of existing sync; set the entry to expire in the
+    // future
     update.$set.expiresAt = new Date(
       Date.now() + EXPIRE_RESYNC_HISTORY_INTERVAL_MS
-    ) // set the entry to expire in the future
+    )
   }
 
   // apply the update
-  db.projectHistorySyncState.updateOne(
-    {
-      project_id: new ObjectId(projectId),
-    },
+  await db.projectHistorySyncState.updateOne(
+    { project_id: new ObjectId(projectId) },
     update,
-    {
-      upsert: true,
-    },
-    callback
+    { upsert: true }
   )
 }
 
-export function clearResyncState(projectId, callback) {
-  db.projectHistorySyncState.deleteOne(
-    {
-      project_id: new ObjectId(projectId.toString()),
-    },
-    callback
-  )
-}
-
-export function skipUpdatesDuringSync(projectId, updates, callback) {
-  _getResyncState(projectId, function (error, syncState) {
-    if (error) {
-      return callback(OError.tag(error))
-    }
-
-    if (!syncState.isSyncOngoing()) {
-      logger.debug({ projectId }, 'not skipping updates: no resync in progress')
-      return callback(null, updates) // don't return synsState when unchanged
-    }
-
-    const filteredUpdates = []
-
-    for (const update of updates) {
-      try {
-        syncState.updateState(update)
-      } catch (error1) {
-        error = OError.tag(error1)
-        if (error instanceof SyncError) {
-          return callback(error)
-        } else {
-          throw error
-        }
-      }
-
-      const shouldSkipUpdate = syncState.shouldSkipUpdate(update)
-      if (!shouldSkipUpdate) {
-        filteredUpdates.push(update)
-      } else {
-        logger.debug({ projectId, update }, 'skipping update due to resync')
-      }
-    }
-
-    callback(null, filteredUpdates, syncState)
+async function clearResyncState(projectId) {
+  await db.projectHistorySyncState.deleteOne({
+    project_id: new ObjectId(projectId.toString()),
   })
 }
 
-export function expandSyncUpdates(
+async function skipUpdatesDuringSync(projectId, updates) {
+  const syncState = await _getResyncState(projectId)
+  if (!syncState.isSyncOngoing()) {
+    logger.debug({ projectId }, 'not skipping updates: no resync in progress')
+    // don't return syncState when unchanged
+    return { updates, syncState: null }
+  }
+
+  const filteredUpdates = []
+
+  for (const update of updates) {
+    syncState.updateState(update)
+    const shouldSkipUpdate = syncState.shouldSkipUpdate(update)
+    if (!shouldSkipUpdate) {
+      filteredUpdates.push(update)
+    } else {
+      logger.debug({ projectId, update }, 'skipping update due to resync')
+    }
+  }
+  return { updates: filteredUpdates, syncState }
+}
+
+async function expandSyncUpdates(
   projectId,
   projectHistoryId,
   updates,
-  extendLock,
-  callback
+  extendLock
 ) {
   const areSyncUpdatesQueued =
     _.some(updates, 'resyncProjectStructure') ||
     _.some(updates, 'resyncDocContent')
   if (!areSyncUpdatesQueued) {
     logger.debug({ projectId }, 'no resync updates to expand')
-    return callback(null, updates)
+    return updates
   }
 
-  _getResyncState(projectId, (error, syncState) => {
-    if (error) {
-      return callback(OError.tag(error))
-    }
+  const syncState = await _getResyncState(projectId)
 
-    // compute the current snapshot from the most recent chunk
-    SnapshotManager.getLatestSnapshot(
-      projectId,
-      projectHistoryId,
-      (error, snapshotFiles) => {
-        if (error) {
-          return callback(OError.tag(error))
-        }
-        // check if snapshot files are valid
-        const invalidFiles = _.pickBy(
-          snapshotFiles,
-          (v, k) => v == null || typeof v.isEditable !== 'function'
-        )
-        if (_.size(invalidFiles) > 0) {
-          return callback(
-            new SyncError('file is missing isEditable method', {
-              projectId,
-              invalidFiles,
-            })
-          )
-        }
-        const expander = new SyncUpdateExpander(
-          projectId,
-          snapshotFiles,
-          syncState.origin
-        )
-        // expand updates asynchronously to avoid blocking
-        const handleUpdate = (
-          update,
-          cb // n.b. lock manager calls cb asynchronously
-        ) =>
-          expander.expandUpdate(update, error => {
-            if (error) {
-              return cb(OError.tag(error))
-            }
-            extendLock(cb)
-          })
-        async.eachSeries(updates, handleUpdate, error => {
-          if (error) {
-            return callback(OError.tag(error))
-          }
-          callback(null, expander.getExpandedUpdates())
-        })
-      }
-    )
-  })
+  // compute the current snapshot from the most recent chunk
+  const snapshotFiles = await SnapshotManager.promises.getLatestSnapshot(
+    projectId,
+    projectHistoryId
+  )
+
+  // check if snapshot files are valid
+  const invalidFiles = _.pickBy(
+    snapshotFiles,
+    (v, k) => v == null || typeof v.isEditable !== 'function'
+  )
+  if (_.size(invalidFiles) > 0) {
+    throw new SyncError('file is missing isEditable method', {
+      projectId,
+      invalidFiles,
+    })
+  }
+
+  const expander = new SyncUpdateExpander(
+    projectId,
+    snapshotFiles,
+    syncState.origin
+  )
+
+  // expand updates asynchronously to avoid blocking
+  for (const update of updates) {
+    await expander.expandUpdate(update)
+    await extendLock()
+  }
+
+  return expander.getExpandedUpdates()
 }
 
 class SyncState {

@@ -456,7 +374,7 @@ class SyncUpdateExpander {
     return !matchedExpectedFile
   }
 
-  expandUpdate(update, cb) {
+  async expandUpdate(update) {
     if (update.resyncProjectStructure != null) {
       logger.debug(
         { projectId: this.projectId, update },

@@ -523,16 +441,14 @@ class SyncUpdateExpander {
         expectedBinaryFiles,
         persistedBinaryFiles
       )
-      cb()
     } else if (update.resyncDocContent != null) {
       logger.debug(
         { projectId: this.projectId, update },
         'expanding resyncDocContent update'
       )
-      this.queueTextOpForOutOfSyncContents(update, cb)
+      await this.queueTextOpForOutOfSyncContents(update)
     } else {
       this.expandedUpdates.push(update)
-      cb()
     }
   }
 

@@ -637,13 +553,13 @@ class SyncUpdateExpander {
     }
   }
 
-  queueTextOpForOutOfSyncContents(update, cb) {
+  async queueTextOpForOutOfSyncContents(update) {
     const pathname = UpdateTranslator._convertPathname(update.path)
     const snapshotFile = this.files[pathname]
     const expectedFile = update.resyncDocContent
 
     if (!snapshotFile) {
-      return cb(new OError('unrecognised file: not in snapshot'))
+      throw new OError('unrecognised file: not in snapshot')
     }
 
     // Compare hashes to see if the persisted file matches the expected content.

@@ -664,7 +580,7 @@ class SyncUpdateExpander {
          { projectId: this.projectId, persistedHash, expectedHash },
          'skipping diff because hashes match and persisted file has no ops'
        )
-        return cb()
+        return
      }
    } else {
      logger.debug('cannot compare hashes, will retrieve content')

@@ -672,79 +588,104 @@ class SyncUpdateExpander {
 
     const expectedContent = update.resyncDocContent.content
 
-    const computeDiff = (persistedContent, cb) => {
-      let op
-      logger.debug(
-        { projectId: this.projectId, persistedContent, expectedContent },
-        'diffing doc contents'
-      )
-      try {
-        op = UpdateCompressor.diffAsShareJsOps(
-          persistedContent,
-          expectedContent
-        )
-      } catch (error) {
-        return cb(
-          OError.tag(error, 'error from diffAsShareJsOps', {
-            projectId: this.projectId,
-            persistedContent,
-            expectedContent,
-          })
-        )
-      }
-      if (op.length === 0) {
-        return cb()
-      }
-      update = {
-        doc: update.doc,
-        op,
-        meta: {
-          resync: true,
-          origin: this.origin,
-          ts: update.meta.ts,
-          pathname,
-          doc_length: persistedContent.length,
-        },
-      }
-      logger.debug(
-        { projectId: this.projectId, diffCount: op.length },
-        'doc contents differ'
-      )
-      this.expandedUpdates.push(update)
-      Metrics.inc('project_history_resync_operation', 1, {
-        status: 'update text file contents',
-      })
-      cb()
-    }
-
+    let persistedContent
     // compute the difference between the expected and persisted content
     if (snapshotFile.load != null) {
-      WebApiManager.getHistoryId(this.projectId, (err, historyId) => {
-        if (err) {
-          return cb(OError.tag(err))
-        }
-        const loadFile = snapshotFile.load(
-          'eager',
-          HistoryStoreManager.getBlobStore(historyId)
-        )
-        loadFile
-          .then(file => computeDiff(file.getContent(), cb))
-          .catch(err => cb(err)) // error loading file or computing diff
-      })
+      const historyId = await WebApiManager.promises.getHistoryId(
+        this.projectId
+      )
+      const file = await snapshotFile.load(
+        'eager',
+        HistoryStoreManager.getBlobStore(historyId)
+      )
+      persistedContent = file.getContent()
     } else if (snapshotFile.content != null) {
       // use dummy content from queueAddOpsForMissingFiles for added missing files
-      computeDiff(snapshotFile.content, cb)
+      persistedContent = snapshotFile.content
     } else {
-      cb(new OError('unrecognised file'))
+      throw new OError('unrecognised file')
     }
+
+    let op
+    logger.debug(
+      { projectId: this.projectId, persistedContent, expectedContent },
+      'diffing doc contents'
+    )
+    try {
+      op = UpdateCompressor.diffAsShareJsOps(persistedContent, expectedContent)
+    } catch (error) {
+      throw OError.tag(error, 'error from diffAsShareJsOps', {
+        projectId: this.projectId,
+        persistedContent,
+        expectedContent,
+      })
+    }
+    if (op.length === 0) {
+      return
+    }
+    update = {
+      doc: update.doc,
+      op,
+      meta: {
+        resync: true,
+        origin: this.origin,
+        ts: update.meta.ts,
+        pathname,
+        doc_length: persistedContent.length,
+      },
+    }
+    logger.debug(
+      { projectId: this.projectId, diffCount: op.length },
+      'doc contents differ'
+    )
+    this.expandedUpdates.push(update)
+    Metrics.inc('project_history_resync_operation', 1, {
+      status: 'update text file contents',
+    })
   }
 }
 
-export const promises = {
-  startResync: promisify(startResync),
-  startHardResync: promisify(startHardResync),
-  setResyncState: promisify(setResyncState),
-  clearResyncState: promisify(clearResyncState),
-  skipUpdatesDuringSync: promisify(skipUpdatesDuringSync),
-  expandSyncUpdates: promisify(expandSyncUpdates),
-}
+// EXPORTS
+
+const startResyncCb = callbackify(startResync)
+const startHardResyncCb = callbackify(startHardResync)
+const setResyncStateCb = callbackify(setResyncState)
+const clearResyncStateCb = callbackify(clearResyncState)
+const skipUpdatesDuringSyncCb = callbackifyMultiResult(skipUpdatesDuringSync, [
+  'updates',
+  'syncState',
+])
+const expandSyncUpdatesCb = (
+  projectId,
+  projectHistoryId,
+  updates,
+  extendLock,
+  callback
+) => {
+  const extendLockPromises = promisify(extendLock)
+  expandSyncUpdates(projectId, projectHistoryId, updates, extendLockPromises)
+    .then(result => {
+      callback(null, result)
+    })
+    .catch(err => {
+      callback(err)
+    })
+}
+
+export {
+  startResyncCb as startResync,
+  startHardResyncCb as startHardResync,
+  setResyncStateCb as setResyncState,
+  clearResyncStateCb as clearResyncState,
+  skipUpdatesDuringSyncCb as skipUpdatesDuringSync,
+  expandSyncUpdatesCb as expandSyncUpdates,
+}
+
+export const promises = {
+  startResync,
+  startHardResync,
+  setResyncState,
+  clearResyncState,
+  skipUpdatesDuringSync,
+  expandSyncUpdates,
+}
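
`callbackifyMultiResult` is needed because the old `skipUpdatesDuringSync` called back with two values, `callback(null, filteredUpdates, syncState)`, while an async function can only return one. The promise version therefore resolves with `{ updates, syncState }`, and the wrapper spreads those named fields back into positional callback arguments. A simplified sketch of what the helper does (illustrative only; the real implementation lives in @overleaf/promise-utils):

// Simplified re-implementation, for illustration:
function callbackifyMultiResult(fn, resultNames) {
  return (...args) => {
    const callback = args.pop()
    fn(...args)
      .then(result => {
        // spread the named result fields into positional callback arguments
        callback(null, ...resultNames.map(name => result[name]))
      })
      .catch(err => callback(err))
  }
}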

package.json

@@ -22,6 +22,7 @@
     "@overleaf/logger": "*",
     "@overleaf/metrics": "*",
     "@overleaf/o-error": "*",
+    "@overleaf/promise-utils": "*",
     "@overleaf/redis-wrapper": "*",
     "@overleaf/settings": "*",
     "async": "^3.2.2",

Acceptance test: 'Syncing with web and doc-updater'

@@ -162,7 +162,7 @@ describe('Syncing with web and doc-updater', function () {
       ],
       error => {
         if (error) {
-          throw error
+          return done(error)
         }
         assert(
           createBlob.isDone(),
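
The one-line test fix matters because throwing from inside an asynchronous callback escapes Mocha's test context: the error surfaces as an unhandled exception instead of failing the test. Passing it to `done` reports it properly. A minimal sketch of the corrected shape (`doSomethingAsync` is a stand-in, not from the diff):

it('syncs with web and doc-updater', function (done) {
  doSomethingAsync(error => {
    if (error) {
      return done(error) // report the failure to Mocha instead of throwing
    }
    // ...assertions...
    done()
  })
})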

File diff suppressed because it is too large