[misc] bail out from pdf caching processing after 10s or earlier

...for fast compiles.
Jakob Ackermann 2021-06-23 14:14:28 +01:00
parent c3dba7f74d
commit b09e52510f
10 changed files with 103 additions and 15 deletions


@@ -292,6 +292,8 @@ module.exports = CompileManager = {
timings['cpu-time'] / stats['latex-runs']
)
}
// Emit compile time.
timings.compile = ts
return OutputFileFinder.findOutputFiles(
resourceList,
@@ -317,8 +319,7 @@ module.exports = CompileManager = {
)
}
// Emit compile time.
timings.compile = ts
// Emit e2e compile time.
timings.compileE2E = timerE2E.done()
if (stats['pdf-size']) {
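
Note: the two hunks above move the `timings.compile = ts` assignment ahead of the `OutputFileFinder.findOutputFiles` call, so the compile time is already recorded when the output files reach the caching stage, which derives its deadline budget from it (see ContentCacheManager below). A minimal sketch of that ordering dependency, with simplified, assumed names:

// Sketch only; the point is the ordering, not the exact API.
timings.compile = ts // must be recorded first ...
const outputFiles = await findOutputFiles(resourceList) // simplified signature
await cacheOutputFiles(outputFiles, timings.compile) // ... because caching budgets off it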


@@ -10,21 +10,26 @@ const Settings = require('settings-sharelatex')
const OError = require('@overleaf/o-error')
const pLimit = require('p-limit')
const { parseXrefTable } = require('../lib/pdfjs/parseXrefTable')
const { TimedOutError } = require('./Errors')
/**
*
* @param {String} contentDir path to directory where content hash files are cached
* @param {String} filePath the pdf file to scan for streams
* @param {number} size the pdf size
* @param {number} compileTime the compile time in milliseconds, used to bound the extra caching overhead
*/
async function update(contentDir, filePath, size) {
async function update(contentDir, filePath, size, compileTime) {
const checkDeadline = getDeadlineChecker(compileTime)
const ranges = []
const newRanges = []
// keep track of hashes; expire old ones when they reach a generation > N.
const tracker = await HashFileTracker.from(contentDir)
tracker.updateAge()
const rawTable = await parseXrefTable(filePath, size)
checkDeadline('after init HashFileTracker')
const rawTable = await parseXrefTable(filePath, size, checkDeadline)
rawTable.sort((a, b) => {
return a.offset - b.offset
})
@@ -32,6 +37,8 @@ async function update(contentDir, filePath, size) {
obj.idx = idx
})
checkDeadline('after parsing')
const uncompressedObjects = []
for (const object of rawTable) {
if (!object.uncompressed) {
@@ -50,12 +57,14 @@ async function update(contentDir, filePath, size) {
if (size < Settings.pdfCachingMinChunkSize) {
continue
}
uncompressedObjects.push(object)
uncompressedObjects.push({ object, idx: uncompressedObjects.length })
}
checkDeadline('after finding uncompressed')
const handle = await fs.promises.open(filePath)
try {
for (const object of uncompressedObjects) {
for (const { object, idx } of uncompressedObjects) {
let buffer = Buffer.alloc(object.size, 0)
const { bytesRead } = await handle.read(
buffer,
@@ -63,6 +72,7 @@ async function update(contentDir, filePath, size) {
object.size,
object.offset
)
checkDeadline('after read ' + idx)
if (bytesRead !== object.size) {
throw new OError('could not read full chunk', {
object,
@@ -80,6 +90,7 @@ async function update(contentDir, filePath, size) {
buffer = buffer.subarray(objectIdRaw.byteLength)
const hash = pdfStreamHash(buffer)
checkDeadline('after hash ' + idx)
const range = {
objectId: objectIdRaw.toString(),
start: object.offset + objectIdRaw.byteLength,
@@ -92,12 +103,15 @@ async function update(contentDir, filePath, size) {
if (tracker.track(range)) continue
await writePdfStream(contentDir, hash, buffer)
checkDeadline('after write ' + idx)
newRanges.push(range)
}
} finally {
await handle.close()
}
// NOTE: Bailing out from this point on would not make sense.
// The ranges are already written, so let the next compile reuse them.
const reclaimedSpace = await tracker.deleteStaleHashes(5)
await tracker.flush()
return [ranges, newRanges, reclaimedSpace]
@@ -219,6 +233,28 @@ async function writePdfStream(dir, hash, buffer) {
}
}
function getDeadlineChecker(compileTime) {
const maxOverhead = Math.min(
// Adding 10s to a 40s compile time is OK.
compileTime / 4,
// Adding 30s to a 120s compile time is not OK, limit to 10s.
Settings.pdfCachingMaxProcessingTime
)
const deadline = Date.now() + maxOverhead
let lastStage = { stage: 'start', now: Date.now() }
return function (stage) {
const now = Date.now()
if (now > deadline) {
throw new TimedOutError(stage, {
lastStage: lastStage.stage,
diffToLastStage: now - lastStage.now
})
}
lastStage = { stage, now }
}
}
function promiseMapWithLimit(concurrency, array, fn) {
const limit = pLimit(concurrency)
return Promise.all(array.map((x) => limit(() => fn(x))))
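
To make the budget rule in `getDeadlineChecker` concrete, here is a worked sketch (the compile times are illustrative; the 10s cap is the default `pdfCachingMaxProcessingTime`):

// maxOverhead = min(compileTime / 4, 10s):
//   compileTime =   4,000ms -> min( 1,000, 10,000) =  1s budget
//   compileTime =  40,000ms -> min(10,000, 10,000) = 10s budget
//   compileTime = 120,000ms -> min(30,000, 10,000) = 10s budget (capped)
const checkDeadline = getDeadlineChecker(40 * 1000)
checkDeadline('after parsing') // throws TimedOutError once the deadline has passed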


@@ -19,6 +19,9 @@ function getSystemLoad() {
const ONE_MB = 1024 * 1024
function emitPdfStats(stats, timings) {
if (stats['pdf-caching-timed-out']) {
Metrics.inc('pdf-caching-timed-out')
}
if (timings['compute-pdf-caching']) {
emitPdfCachingStats(stats, timings)
} else {


@@ -4,6 +4,8 @@
*/
// TODO: This file was created by bulk-decaffeinate.
// Fix any style issues and re-enable lint.
const OError = require('@overleaf/o-error')
let Errors
var NotFoundError = function (message) {
const error = new Error(message)
@@ -29,7 +31,10 @@ var AlreadyCompilingError = function (message) {
}
AlreadyCompilingError.prototype.__proto__ = Error.prototype
class TimedOutError extends OError {}
module.exports = Errors = {
TimedOutError,
NotFoundError,
FilesOutOfSyncError,
AlreadyCompilingError
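
Since `TimedOutError` extends `OError`, the stage details attached at throw time travel with the error. A sketch of reading them back, assuming @overleaf/o-error's `getFullInfo` helper:

// Hypothetical consumer of the info attached by the deadline checker.
try {
  checkDeadline('after parsing')
} catch (err) {
  if (err instanceof Errors.TimedOutError) {
    console.warn(err.message, OError.getFullInfo(err)) // { lastStage, diffToLastStage }
  }
}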


@@ -26,6 +26,7 @@ const Metrics = require('./Metrics')
const OutputFileOptimiser = require('./OutputFileOptimiser')
const ContentCacheManager = require('./ContentCacheManager')
const { TimedOutError } = require('./Errors')
module.exports = OutputCacheManager = {
CONTENT_SUBDIR: 'content',
@@ -281,7 +282,16 @@ module.exports = OutputCacheManager = {
contentDir,
outputFilePath,
pdfSize,
timings.compile,
function (err, result) {
if (err && err instanceof TimedOutError) {
logger.warn(
{ err, outputDir, stats, timings },
'pdf caching timed out'
)
stats['pdf-caching-timed-out'] = 1
return callback(null, outputFiles)
}
if (err) return callback(err, outputFiles)
const [contentRanges, newContentRanges, reclaimedSpace] = result
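
The callback above downgrades a caching timeout to a warning plus a stats flag, so the compile itself still succeeds. An equivalent async/await rendering of the same handling (a sketch only; the real call site is callback-based, and `updateAsync` is a hypothetical promisified wrapper):

try {
  const [contentRanges, newContentRanges, reclaimedSpace] = await updateAsync(
    contentDir,
    outputFilePath,
    pdfSize,
    timings.compile
  )
  // ... continue with the fresh ranges
} catch (err) {
  if (err instanceof TimedOutError) {
    stats['pdf-caching-timed-out'] = 1
    return outputFiles // serve the compile result without new cached ranges
  }
  throw err
}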


@@ -4,10 +4,10 @@ const { MissingDataException } = require('pdfjs-dist/lib/core/core_utils')
const { FSStream } = require('./FSStream')
class FSPdfManager extends LocalPdfManager {
constructor(docId, options) {
constructor(docId, { fh, size, checkDeadline }) {
const nonEmptyDummyBuffer = Buffer.alloc(1, 0)
super(docId, nonEmptyDummyBuffer)
this.stream = new FSStream(options.fh, 0, options.size)
this.stream = new FSStream(fh, 0, size, null, null, checkDeadline)
this.pdfDocument = new PDFDocument(this, this.stream)
}


@@ -4,11 +4,12 @@ const { MissingDataException } = require('pdfjs-dist/lib/core/core_utils')
const BUF_SIZE = 1024 // read from the file in 1024 byte pages
class FSStream extends Stream {
constructor(fh, start, length, dict, cachedBytes) {
constructor(fh, start, length, dict, cachedBytes, checkDeadline) {
const nonEmptyDummyBuffer = Buffer.alloc(1, 0)
super(nonEmptyDummyBuffer, start, length, dict)
delete this.bytes
this.fh = fh
this.checkDeadline = checkDeadline
this.cachedBytes = cachedBytes || []
}
@@ -23,6 +24,7 @@ class FSStream extends Stream {
// Manage cached reads from the file
requestRange(begin, end) {
this.checkDeadline(`request range ${begin} - ${end}`)
// expand small ranges to read a larger amount
if (end - begin < BUF_SIZE) {
end = begin + BUF_SIZE
@@ -123,6 +125,7 @@ class FSStream extends Stream {
}
makeSubStream(start, length, dict = null) {
this.checkDeadline(`make sub stream start=${start}/length=${length}`)
// BG: had to add this check for null length, it is being called with only
// the start value at one point in the xref decoding. The intent is clear
// enough
@@ -131,7 +134,14 @@ class FSStream extends Stream {
if (!length) {
length = this.end - start
}
return new FSStream(this.fh, start, length, dict, this.cachedBytes)
return new FSStream(
this.fh,
start,
length,
dict,
this.cachedBytes,
this.checkDeadline
)
}
}
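
Passing `checkDeadline` through the `FSStream` constructor means every substream shares its parent's checker, so the deadline follows the pdfjs parser into nested reads. A small sketch:

// Substreams inherit the same checker instance:
const root = new FSStream(fh, 0, size, null, null, checkDeadline)
const sub = root.makeSubStream(1024, 4096)
// sub.checkDeadline === root.checkDeadline -> true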


@@ -1,18 +1,21 @@
const fs = require('fs')
const { FSPdfManager } = require('./FSPdfManager')
async function parseXrefTable(path, size) {
async function parseXrefTable(path, size, checkDeadline) {
if (size === 0) {
return []
}
const file = await fs.promises.open(path)
try {
const manager = new FSPdfManager(0, { fh: file, size })
const manager = new FSPdfManager(0, { fh: file, size, checkDeadline })
await manager.ensureDoc('checkHeader')
checkDeadline('pdfjs: after checkHeader')
await manager.ensureDoc('parseStartXRef')
checkDeadline('pdfjs: after parseStartXRef')
await manager.ensureDoc('parse')
checkDeadline('pdfjs: after parse')
return manager.pdfDocument.catalog.xref.entries
} finally {
file.close()
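
Callers without a deadline, such as the tests further below, can pass a no-op checker that never throws, and parsing runs to completion as before (the path here is illustrative):

const entries = await parseXrefTable('/path/to/output.pdf', size, () => {})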


@@ -68,7 +68,9 @@ module.exports = {
enablePdfCaching: process.env.ENABLE_PDF_CACHING === 'true',
enablePdfCachingDark: process.env.ENABLE_PDF_CACHING_DARK === 'true',
pdfCachingMinChunkSize:
parseInt(process.env.PDF_CACHING_MIN_CHUNK_SIZE, 10) || 1024
parseInt(process.env.PDF_CACHING_MIN_CHUNK_SIZE, 10) || 1024,
pdfCachingMaxProcessingTime:
parseInt(process.env.PDF_CACHING_MAX_PROCESSING_TIME, 10) || 10 * 1000
}
if (process.env.ALLOWED_COMPILE_GROUPS) {
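
The `parseInt(...) ||` pattern means an unset or malformed environment variable falls back to the 10s default. Worked examples of the fallback:

parseInt(undefined, 10) || 10 * 1000 // 10000, NaN is falsy so the default applies
parseInt('5000', 10) || 10 * 1000 // 5000, a 5s override
parseInt('0', 10) || 10 * 1000 // 10000, 0 is falsy too, so the cap cannot be disabled this way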


@@ -33,7 +33,7 @@ async function loadContext(example) {
}
async function backFillSnapshot(example, size) {
const table = await parseXrefTable(pdfPath(example), size)
const table = await parseXrefTable(pdfPath(example), size, () => {})
await fs.promises.mkdir(Path.dirname(snapshotPath(example)), {
recursive: true
})
@@ -53,6 +53,24 @@ describe('pdfjs', function () {
})
})
describe('when the operation times out', function () {
it('should bail out', async function () {
const path = pdfPath(EXAMPLES[0])
const { size } = await loadContext(EXAMPLES[0])
const err = new Error()
let table
try {
table = await parseXrefTable(path, size, () => {
throw err
})
} catch (e) {
expect(e).to.equal(err)
return
}
expect(table).to.not.exist
})
})
for (const example of EXAMPLES) {
describe(example, function () {
let size, snapshot
@@ -70,7 +88,7 @@ describe('pdfjs', function () {
})
it('should produce the expected xRef table', async function () {
const table = await parseXrefTable(pdfPath(example), size)
const table = await parseXrefTable(pdfPath(example), size, () => {})
expect(table).to.deep.equal(snapshot)
})
})