mirror of
https://github.com/overleaf/overleaf.git
synced 2024-12-12 10:03:33 -05:00
448 lines
12 KiB
JavaScript
448 lines
12 KiB
JavaScript
|
import fs from 'fs'
|
||
|
import request from 'request'
|
||
|
import stream from 'stream'
|
||
|
import logger from '@overleaf/logger'
|
||
|
import _ from 'lodash'
|
||
|
import BPromise from 'bluebird'
|
||
|
import { URL } from 'url'
|
||
|
import OError from '@overleaf/o-error'
|
||
|
import Settings from '@overleaf/settings'
|
||
|
import * as Versions from './Versions.js'
|
||
|
import * as Errors from './Errors.js'
|
||
|
import * as LocalFileWriter from './LocalFileWriter.js'
|
||
|
import * as HashManager from './HashManager.js'
|
||
|
|
||
|
// Timeout, in milliseconds, passed to the HTTP requests built in this module
const HTTP_REQUEST_TIMEOUT = 5 * 60 * 1000 // 5 minutes
|
||
|
|
||
|
/**
 * Holder for the functions that tests need to replace with mocks.
 *
 * Exported wrappers look the implementation up on this object at call time,
 * so a test can swap an entry out.
 *
 * TODO: Rewrite tests in terms of exported functions only
 */
export const _mocks = {}
|
||
|
|
||
|
/**
 * Readable stream whose content is supplied by calling `push()` from the
 * outside; it never produces data of its own.
 */
class StringStream extends stream.Readable {
  _read() {
    // intentionally empty: data is pushed in by the code that owns the stream
  }
}
|
||
|
|
||
|
/**
 * Mockable implementation behind getMostRecentChunk(): requests the latest
 * history chunk for `historyId` and hands the result to `callback`.
 *
 * `projectId` is only used for logging.
 */
_mocks.getMostRecentChunk = (projectId, historyId, callback) => {
  const requestOptions = {
    path: `projects/${historyId}/latest/history`,
    json: true,
  }
  logger.debug({ projectId, historyId }, 'getting chunk from history service')
  _requestChunk(requestOptions, callback)
}
|
||
|
|
||
|
/**
 * Fetch the most recent history chunk.
 *
 * Thin wrapper that forwards every argument to the (mockable)
 * `_mocks.getMostRecentChunk`, looked up at call time.
 */
export function getMostRecentChunk(...args) {
  _mocks.getMostRecentChunk.apply(_mocks, args)
}
|
||
|
|
||
|
/**
 * Fetch the history chunk for a given version from the history service.
 *
 * @param {string} projectId - only used for logging
 * @param {string} historyId - id used to build the history service path
 * @param {number} version
 * @param {Function} callback - receives (error, chunk)
 */
export function getChunkAtVersion(projectId, historyId, version, callback) {
  const requestOptions = {
    path: `projects/${historyId}/versions/${version}/history`,
    json: true,
  }
  logger.debug(
    { projectId, historyId, version },
    'getting chunk from history service for version'
  )
  _requestChunk(requestOptions, callback)
}
|
||
|
|
||
|
/**
 * Work out the most recent version of a project from its latest chunk.
 *
 * @param {string} projectId
 * @param {string} historyId
 * @param {Function} callback - receives
 *   (error, mostRecentVersion, { project, docs }, lastChange) where
 *   `mostRecentVersion` is the chunk start version plus its number of changes
 *   and `lastChange` is the change with the greatest timestamp (undefined
 *   when the chunk has no changes)
 */
export function getMostRecentVersion(projectId, historyId, callback) {
  getMostRecentChunk(projectId, historyId, (error, chunk) => {
    if (error) {
      return callback(OError.tag(error))
    }

    const changes = chunk.chunk.history.changes || []
    const mostRecentVersion = chunk.chunk.startVersion + changes.length
    const changesByTimestamp = _.sortBy(changes, change => change.timestamp)
    const lastChange = _.last(changesByTimestamp)

    // Look up the latest project version first, then the latest doc versions.
    // Both lookups always run; the first error (if any) is reported.
    _getLatestProjectVersion(
      projectId,
      chunk,
      (projectVersionError, projectVersion) => {
        _getLatestV2DocVersions(
          projectId,
          chunk,
          (docVersionsError, v2DocVersions) => {
            callback(
              projectVersionError || docVersionsError,
              mostRecentVersion,
              { project: projectVersion, docs: v2DocVersions },
              lastChange
            )
          }
        )
      }
    )
  })
}
|
||
|
|
||
|
/**
 * Request a chunk from the history service and check its basic shape.
 *
 * @param {Object} options - passed through to _requestHistoryService
 * @param {Function} callback - receives (error, chunk); fails with
 *   'unexpected response' unless `chunk.chunk.startVersion` is present
 */
function _requestChunk(options, callback) {
  _requestHistoryService(options, (err, chunk) => {
    if (err) {
      return callback(OError.tag(err))
    }
    const hasStartVersion =
      chunk != null && chunk.chunk != null && chunk.chunk.startVersion != null
    if (!hasStartVersion) {
      return callback(new OError('unexpected response'))
    }
    callback(null, chunk)
  })
}
|
||
|
|
||
|
/**
 * Find the latest project (structure) version recorded in a chunk.
 *
 * Starts from the snapshot's projectVersion (if any) and walks the changes in
 * order. A change whose version is lower than the one seen so far is logged,
 * skipped, and reported as an OpsOutOfOrderError (first occurrence only).
 *
 * @param {string} projectId - only used for logging
 * @param {Object} chunk
 * @param {Function} callback - receives (error, projectVersion)
 */
function _getLatestProjectVersion(projectId, chunk, callback) {
  const { history } = chunk.chunk
  // initial project version comes from the snapshot
  let projectVersion = history.snapshot && history.snapshot.projectVersion
  // only the first ordering error is kept
  let error = null

  for (const change of history.changes || []) {
    if (change.projectVersion == null) {
      continue
    }
    const isOutOfOrder =
      projectVersion != null &&
      Versions.lt(change.projectVersion, projectVersion)
    if (!isOutOfOrder) {
      projectVersion = change.projectVersion
      continue
    }
    logger.warn(
      { projectId, chunk, projectVersion, change },
      'project structure version out of order in chunk'
    )
    if (!error) {
      error = new Errors.OpsOutOfOrderError(
        'project structure version out of order'
      )
    }
  }

  callback(error, projectVersion)
}
|
||
|
|
||
|
/**
 * Find the latest doc versions recorded in a chunk, indexed by doc id.
 *
 * Starts from a copy of the snapshot's v2DocVersions (if any) and applies the
 * v2DocVersions of each change in order. An entry whose `v` is lower than the
 * one already recorded for that doc is logged, skipped, and reported as an
 * OpsOutOfOrderError (first occurrence only).
 *
 * The chunk itself is left untouched: the result is accumulated in a fresh
 * object rather than in the snapshot's own v2DocVersions map.
 *
 * @param {string} projectId - only used for logging
 * @param {Object} chunk
 * @param {Function} callback - receives (error, v2DocVersions)
 */
function _getLatestV2DocVersions(projectId, chunk, callback) {
  const { history } = chunk.chunk
  // find the initial doc versions (indexed by docId as this is immutable);
  // copy them so that the snapshot inside the chunk is not mutated below
  const v2DocVersions = Object.assign(
    {},
    history.snapshot && history.snapshot.v2DocVersions
  )
  // keep track of any errors
  let error = null

  // iterate over the changes in the chunk to find the most recent doc versions
  for (const change of history.changes || []) {
    if (change.v2DocVersions == null) {
      continue
    }
    for (const [docId, docInfo] of Object.entries(change.v2DocVersions)) {
      const previous = v2DocVersions[docId]
      const isOutOfOrder =
        previous && previous.v != null && Versions.lt(docInfo.v, previous.v)
      if (!isOutOfOrder) {
        v2DocVersions[docId] = docInfo
        continue
      }
      logger.warn(
        {
          projectId,
          docId,
          changeVersion: docInfo,
          previousVersion: previous,
        },
        'doc version out of order in chunk'
      )
      if (!error) {
        error = new Errors.OpsOutOfOrderError('doc version out of order')
      }
    }
  }

  callback(error, v2DocVersions)
}
|
||
|
|
||
|
/**
 * Fetch a blob from the history service.
 *
 * @param {string} historyId
 * @param {string} blobHash
 * @param {Function} callback - receives (error, body)
 */
export function getProjectBlob(historyId, blobHash, callback) {
  logger.debug({ historyId, blobHash }, 'getting blob from history service')
  const requestOptions = { path: `projects/${historyId}/blobs/${blobHash}` }
  _requestHistoryService(requestOptions, callback)
}
|
||
|
|
||
|
/**
 * Fetch a blob from the history service as a stream.
 *
 * @param {string} historyId
 * @param {string} blobHash
 * @param {Function} callback - receives (error, stream)
 */
export function getProjectBlobStream(historyId, blobHash, callback) {
  logger.debug(
    { historyId, blobHash },
    'getting blob stream from history service'
  )
  const requestOptions = { path: `projects/${historyId}/blobs/${blobHash}` }
  _requestHistoryServiceStream(requestOptions, callback)
}
|
||
|
|
||
|
/**
 * POST a batch of changes to the history service.
 *
 * @param {string} projectId - only used for logging and error context
 * @param {string} historyId
 * @param {Array} changes - sent as the JSON request body
 * @param {number} endVersion - sent as the `end_version` query parameter
 * @param {Function} callback - called with no arguments on success, or with
 *   the tagged error on failure
 */
export function sendChanges(
  projectId,
  historyId,
  changes,
  endVersion,
  callback
) {
  logger.debug(
    { projectId, historyId, endVersion },
    'sending changes to history service'
  )

  const requestOptions = {
    path: `projects/${historyId}/legacy_changes`,
    qs: { end_version: endVersion },
    method: 'POST',
    json: changes,
  }

  _requestHistoryService(requestOptions, error => {
    if (!error) {
      return callback()
    }
    // attach request context and the response details to the error
    OError.tag(error, 'failed to send changes to v1', {
      projectId,
      historyId,
      endVersion,
      errorCode: error.code,
      statusCode: error.statusCode,
      body: error.body,
    })
    logger.warn(error)
    callback(error)
  })
}
|
||
|
|
||
|
/**
 * Create a blob in the history service holding the content of an update.
 *
 * - Doc updates (`update.doc` and `update.docLines` set) are uploaded from the
 *   supplied doc lines.
 * - File updates (`update.file` and `update.url` set) are downloaded from the
 *   locally configured filestore first. A file the filestore reports as
 *   missing (404) is stored as an empty blob; any other non-2xx status is an
 *   error.
 * - Anything else is rejected with 'invalid update for blob creation'.
 *
 * All failures, including a malformed `update.url`, are reported through the
 * callback rather than thrown.
 *
 * @param {string} projectId
 * @param {string} historyId
 * @param {Object} update
 * @param {Function} callback - called at most once; on failure receives the
 *   error, otherwise is handed to LocalFileWriter.bufferOnDisk as its
 *   completion callback
 */
export function createBlobForUpdate(projectId, historyId, update, callback) {
  callback = _.once(callback)

  // Buffer a stream on disk under `label`, then upload that file as a blob
  const storeStreamAsBlob = (readable, label) => {
    LocalFileWriter.bufferOnDisk(
      readable,
      label,
      (fsPath, cb) => {
        _createBlob(historyId, fsPath, cb)
      },
      callback
    )
  }

  if (update.doc != null && update.docLines != null) {
    const stringStream = new StringStream()
    stringStream.push(update.docLines)
    stringStream.push(null)

    storeStreamAsBlob(stringStream, `project-${projectId}-doc-${update.doc}`)
  } else if (update.file != null && update.url != null) {
    // Rewrite the filestore url to point to the location in the local
    // settings for this service (this avoids problems with cross-
    // datacentre requests when running filestore in multiple locations).
    let fileStorePath
    try {
      fileStorePath = new URL(update.url).pathname
    } catch (err) {
      // new URL() throws synchronously on malformed input; report it via the
      // callback like every other failure in this function
      return callback(
        OError.tag(err, 'invalid file url for blob creation', {
          url: update.url,
        })
      )
    }
    const urlMatch = /^\/project\/([0-9a-f]{24})\/file\/([0-9a-f]{24})$/.exec(
      fileStorePath
    )
    if (urlMatch == null) {
      return callback(new OError('invalid file for blob creation'))
    }
    if (urlMatch[1] !== projectId) {
      return callback(new OError('invalid project for blob creation'))
    }
    const fileId = urlMatch[2]
    const blobLabel = `project-${projectId}-file-${fileId}`
    const fileStoreStream = request.get({
      url: `${Settings.apis.filestore.url}/project/${projectId}/file/${fileId}`,
      timeout: HTTP_REQUEST_TIMEOUT,
    })
    // hold the data back until the status code tells us what to do with it
    fileStoreStream.pause()
    fileStoreStream.on('error', err => {
      callback(OError.tag(err, 'error from filestore', { url: update.url }))
    })
    fileStoreStream.on('response', response => {
      if (response.statusCode >= 200 && response.statusCode < 300) {
        storeStreamAsBlob(fileStoreStream, blobLabel)
        fileStoreStream.resume() // start data flowing when ready
      } else if (response.statusCode === 404) {
        logger.warn(
          { projectId, historyId, fileStoreUrl: update.url },
          'File contents not found in filestore. Storing in history as an empty file'
        )
        const emptyStream = new StringStream()
        storeStreamAsBlob(emptyStream, blobLabel)
        fileStoreStream.resume() // Drain the filestore stream
        emptyStream.push(null) // send an EOF signal
      } else {
        const error = new OError(
          `bad response from filestore: ${response.statusCode}`,
          { url: update.url, statusCode: response.statusCode }
        )
        fileStoreStream.resume() // See https://github.com/overleaf/write_latex/wiki/Streams-and-pipes-in-Node.js#discard-data-if-necessary-in-the-response-handler
        callback(error)
      }
    })
  } else {
    const error = new OError('invalid update for blob creation')
    callback(error)
  }
}
|
||
|
|
||
|
/**
 * Upload a file on disk to the history service as a blob.
 *
 * The blob hash is computed first and used in the target path; the file is
 * then streamed as the body of a PUT request.
 *
 * @param {string} historyId
 * @param {string} fsPath - path of the file to upload
 * @param {Function} _callback - called at most once with (error, hash)
 */
function _createBlob(historyId, fsPath, _callback) {
  const done = _.once(_callback)

  HashManager._getBlobHash(fsPath, (hashError, hash, byteLength) => {
    if (hashError) {
      return done(OError.tag(hashError))
    }

    const fileStream = fs.createReadStream(fsPath)
    fileStream.on('error', streamError => {
      const tagged = OError.tag(streamError, 'error streaming file from disk', {
        fsPath,
        hash,
        byteLength,
      })
      done(tagged)
    })

    logger.debug(
      { fsPath, hash, byteLength },
      'sending blob to history service'
    )

    const requestOptions = {
      method: 'PUT',
      path: `projects/${historyId}/blobs/${hash}`,
      body: fileStream,
    }
    _requestHistoryService(requestOptions, uploadError => {
      if (uploadError) {
        return done(OError.tag(uploadError))
      }
      done(null, hash)
    })
  })
}
|
||
|
|
||
|
/**
 * Create a project in the history service.
 *
 * @param {string} [historyId] - id to request for the new project; when it is
 *   null/undefined the request body carries no project id (`json: true`)
 * @param {Function} callback - receives (error, id) where `id` is the
 *   `projectId` found in the response; fails when the response is empty or
 *   has no `projectId`
 */
export function initializeProject(historyId, callback) {
  const requestOptions = {
    method: 'POST',
    path: 'projects',
    json: historyId == null ? true : { projectId: historyId },
  }

  _requestHistoryService(requestOptions, (error, project) => {
    if (error) {
      return callback(OError.tag(error))
    }

    // the body may be missing altogether, so guard before reading from it
    const id = project == null ? null : project.projectId
    if (id == null) {
      return callback(new OError('history store did not return a project id'))
    }

    callback(null, id)
  })
}
|
||
|
|
||
|
/**
 * Send a DELETE request for a project to the history service.
 *
 * @param {string} projectId - id used in the history service path
 * @param {Function} callback - receives (error, body)
 */
export function deleteProject(projectId, callback) {
  const requestOptions = { method: 'DELETE', path: `projects/${projectId}` }
  _requestHistoryService(requestOptions, callback)
}
|
||
|
|
||
|
// Promise-returning variant of getProjectBlob (via Bluebird's promisify)
const getProjectBlobAsync = BPromise.promisify(getProjectBlob)
|
||
|
|
||
|
/**
 * Read-only view of the blobs of a single project, backed by the history
 * service.
 */
class BlobStore {
  /**
   * @param {string} projectId - id passed to the history service when
   *   fetching blobs
   */
  constructor(projectId) {
    this.projectId = projectId
  }

  /**
   * Fetch the blob with the given hash.
   *
   * @param {string} hash
   * @returns {Promise} result of getProjectBlobAsync for this project
   */
  getString(hash) {
    const { projectId } = this
    return getProjectBlobAsync(projectId, hash)
  }
}
|
||
|
|
||
|
/**
 * Build a BlobStore for the given project.
 *
 * @param {string} projectId
 * @returns {BlobStore}
 */
export function getBlobStore(projectId) {
  const blobStore = new BlobStore(projectId)
  return blobStore
}
|
||
|
|
||
|
/**
 * Build the options object for a request to the history service.
 *
 * Host and credentials come from `Settings.overleaf.history`. `json`, `body`
 * and `qs` are copied from `options` only when they are not null/undefined.
 *
 * @param {Object} options - { path, method?, json?, body?, qs? }
 * @returns {Object} options for the `request` library
 */
function _requestOptions(options) {
  const { host, user, pass } = Settings.overleaf.history

  const requestOptions = {
    method: options.method || 'GET',
    url: `${host}/${options.path}`,
    timeout: HTTP_REQUEST_TIMEOUT,
    auth: { user, pass, sendImmediately: true },
  }

  // optional pass-through settings
  for (const key of ['json', 'body', 'qs']) {
    if (options[key] != null) {
      requestOptions[key] = options[key]
    }
  }

  return requestOptions
}
|
||
|
|
||
|
/**
 * Perform a request against the history service.
 *
 * @param {Object} options - see _requestOptions
 * @param {Function} callback - receives (error, body). A response outside the
 *   2xx range yields an error carrying `statusCode` and `body`.
 */
function _requestHistoryService(options, callback) {
  request(_requestOptions(options), (error, res, body) => {
    if (error) {
      return callback(OError.tag(error))
    }

    const { statusCode } = res
    const isSuccess = statusCode >= 200 && statusCode < 300
    if (isSuccess) {
      return callback(null, body)
    }

    const statusError = new OError(
      `history store a non-success status code: ${statusCode}`
    )
    statusError.statusCode = statusCode
    statusError.body = body
    logger.warn({ err: statusError }, statusError.message)
    callback(statusError)
  })
}
|
||
|
|
||
|
/**
 * Perform a request against the history service and expose the response as a
 * stream.
 *
 * Error reporting mirrors _requestHistoryService: transport errors are tagged
 * and a response outside the 2xx range yields an error carrying `statusCode`.
 *
 * @param {Object} options - see _requestOptions
 * @param {Function} callback - called at most once with (error, stream),
 *   where `stream` is the object returned by `request`
 */
function _requestHistoryServiceStream(options, callback) {
  callback = _.once(callback)
  const requestOptions = _requestOptions(options)
  // not called `stream`, which would shadow the module-level import
  const responseStream = request(requestOptions)
  responseStream.on('error', err => {
    callback(OError.tag(err))
  })
  responseStream.on('response', res => {
    if (res.statusCode >= 200 && res.statusCode < 300) {
      callback(null, responseStream)
    } else {
      const error = new OError(
        `history store a non-success status code: ${res.statusCode}`
      )
      // let callers tell e.g. a 404 apart from other failures
      error.statusCode = res.statusCode
      logger.warn({ err: error, options }, error.message)
      callback(error)
    }
  })
}
|