#!/usr/bin/env node
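
// Copy every blob referenced by each project's history, except the global
// blobs listed in the --global-blobs file, from global storage into
// per-project storage: the project GCS bucket and the project_blobs table.
//
// Example invocation (the script name and values are illustrative, not
// prescriptive):
//
//   node copy_project_blobs.js \
//     --global-blobs global-blob-hashes.txt \
//     --min-project-id 1 --max-project-id 1000000 \
//     --batch-size 1000 --concurrency 10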

const { promisify } = require('util')
const BPromise = require('bluebird')
const commandLineArgs = require('command-line-args')
const config = require('config')
const fs = require('fs')
const readline = require('readline')
const { History } = require('overleaf-editor-core')
const { knex, historyStore, persistor } = require('..')
const projectKey = require('../lib/project_key')

const MAX_POSTGRES_INTEGER = 2147483647
const DEFAULT_BATCH_SIZE = 1000
const MAX_RETRIES = 10
const RETRY_DELAY_MS = 5000

// Obtain a preconfigured GCS client through a non-documented property of
// object-persistor. Sorry about that. We need the GCS client because we use
// operations that are not implemented in object-persistor.
const gcsClient = persistor.storage
const globalBucket = gcsClient.bucket(config.get('blobStore.globalBucket'))
const projectBucket = gcsClient.bucket(config.get('blobStore.projectBucket'))
const delay = promisify(setTimeout)

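// Parse command-line options, load the list of global blob hashes to keep,
// and copy all other blobs for the selected range of projects.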
async function main() {
  const options = commandLineArgs([
    { name: 'global-blobs', type: String },
    { name: 'min-project-id', type: Number, defaultValue: 1 },
    {
      name: 'max-project-id',
      type: Number,
      defaultValue: MAX_POSTGRES_INTEGER,
    },
    { name: 'batch-size', type: Number, defaultValue: DEFAULT_BATCH_SIZE },
    { name: 'concurrency', type: Number, defaultValue: 1 },
  ])
  if (!options['global-blobs']) {
    console.error(
      'You must specify a global blobs file with the --global-blobs option'
    )
    process.exit(1)
  }
  const globalBlobs = await readGlobalBlobs(options['global-blobs'])
  const minProjectId = options['min-project-id']
  const maxProjectId = options['max-project-id']
  const batchSize = options['batch-size']
  const concurrency = options.concurrency
  console.log(`Keeping ${globalBlobs.size} global blobs`)
  await run({ globalBlobs, minProjectId, maxProjectId, batchSize, concurrency })
  console.log('Done.')
}

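// Read the global blobs file, which lists one hex blob hash per line, into a
// Set for fast lookup.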
async function readGlobalBlobs(filename) {
  const stream = fs.createReadStream(filename)
  const reader = readline.createInterface({
    input: stream,
    crlfDelay: Infinity,
  })
  const blobs = new Set()
  for await (const line of reader) {
    blobs.add(line.trim())
  }
  return blobs
}

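// Walk the projects in batches of ascending project id. Each batch is
// processed with the requested concurrency; projects that fail are retried
// after a delay, up to MAX_RETRIES times, before the script gives up.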
async function run(options) {
  const { globalBlobs, minProjectId, maxProjectId, batchSize, concurrency } =
    options
  let batchStart = minProjectId
  while (batchStart <= maxProjectId) {
    let projectIds = await getProjectIds(batchStart, maxProjectId, batchSize)
    if (projectIds.length === 0) {
      break
    }
    const batchEnd = projectIds[projectIds.length - 1]
    console.log(`Processing projects ${batchStart} to ${batchEnd}`)
    const chunkIdsByProject = await getChunkIdsByProject(projectIds)

    let retries = 0
    while (true) {
      const results = await BPromise.map(
        projectIds,
        async projectId =>
          await processProject(
            projectId,
            chunkIdsByProject.get(projectId),
            globalBlobs
          ),
        { concurrency }
      )
      const failedProjectIds = results
        .filter(result => !result.success)
        .map(result => result.projectId)
      if (failedProjectIds.length === 0) {
        // All projects were copied successfully. Carry on.
        break
      }

      // Some projects failed. Retry.
      retries += 1
      if (retries > MAX_RETRIES) {
        console.log(
          `Too many retries processing projects ${batchStart} to ${batchEnd}. Giving up.`
        )
        process.exit(1)
      }
      console.log(`Retrying projects: ${failedProjectIds.join(', ')}`)
      await delay(RETRY_DELAY_MS)
      projectIds = failedProjectIds
    }

    // Set up next batch
    batchStart = batchEnd + 1
  }
}

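// Fetch the next batch of project ids, in ascending order and at most
// batchSize long. The chunks table stores the project id in its doc_id
// column.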
async function getProjectIds(minProjectId, maxProjectId, batchSize) {
  const projectIds = await knex('chunks')
    .distinct('doc_id')
    .where('doc_id', '>=', minProjectId)
    .andWhere('doc_id', '<=', maxProjectId)
    .orderBy('doc_id')
    .limit(batchSize)
    .pluck('doc_id')
  return projectIds
}

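// Build a map from project id to the list of chunk ids that belong to it.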
async function getChunkIdsByProject(projectIds) {
  const chunks = await knex('chunks')
    .select('id', { projectId: 'doc_id' })
    .where('doc_id', 'in', projectIds)
  const chunkIdsByProject = new Map()
  for (const projectId of projectIds) {
    chunkIdsByProject.set(projectId, [])
  }
  for (const chunk of chunks) {
    chunkIdsByProject.get(chunk.projectId).push(chunk.id)
  }
  return chunkIdsByProject
}

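// Copy all non-global blobs of a single project, first in GCS and then in the
// database, and check that both sides agree on what was copied. Returns
// { projectId, success } instead of throwing so that the caller can retry
// failed projects.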
async function processProject(projectId, chunkIds, globalBlobs) {
  try {
    const blobHashes = await getBlobHashes(projectId, chunkIds)
    const projectBlobHashes = blobHashes.filter(hash => !globalBlobs.has(hash))
    const gcsSizesByHash = new Map()
    for (const blobHash of projectBlobHashes) {
      const blobSize = await copyBlobInGcs(projectId, blobHash)
      if (blobSize != null) {
        gcsSizesByHash.set(blobHash, blobSize)
      }
    }
    const dbSizesByHash = await copyBlobsInDatabase(
      projectId,
      projectBlobHashes
    )
    compareBlobSizes(gcsSizesByHash, dbSizesByHash)
    return { projectId, success: true }
  } catch (err) {
    console.error(`Failed to process project ${projectId}:`, err.stack)
    return { projectId, success: false }
  }
}

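// Sanity check: the blobs reported copied by the database must match the
// blobs copied in GCS, both in number and in byte size.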
function compareBlobSizes(gcsSizesByHash, dbSizesByHash) {
  // Throw an error if the database doesn't report as many blobs as GCS
  if (dbSizesByHash.size !== gcsSizesByHash.size) {
    throw new Error(
      `the database reported ${dbSizesByHash.size} blobs copied, but GCS reported ${gcsSizesByHash.size} blobs copied`
    )
  }

  const mismatches = []
  for (const [hash, dbSize] of dbSizesByHash.entries()) {
    if (gcsSizesByHash.get(hash) !== dbSize) {
      mismatches.push(hash)
    }
  }
  if (mismatches.length > 0) {
    throw new Error(`blob size mismatch for hashes: ${mismatches.join(', ')}`)
  }
}

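// Load a chunk's raw history from the history store and parse it.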
async function getHistory(projectId, chunkId) {
  const rawHistory = await historyStore.loadRaw(projectId, chunkId)
  const history = History.fromRaw(rawHistory)
  return history
}

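// Collect the hashes of every blob referenced by the given chunks of a
// project.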
async function getBlobHashes(projectId, chunkIds) {
  const blobHashes = new Set()
  for (const chunkId of chunkIds) {
    const history = await getHistory(projectId, chunkId)
    history.findBlobHashes(blobHashes)
  }
  return Array.from(blobHashes)
}

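// Copy a single blob from the global bucket to the project bucket. Global
// blobs are keyed by hash (xx/yy/rest); project blobs are keyed by project
// and hash (<project key>/xx/rest). Returns the copied blob's size in bytes,
// or null if the project blob already exists.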
async function copyBlobInGcs(projectId, blobHash) {
  const globalBlobKey = [
    blobHash.slice(0, 2),
    blobHash.slice(2, 4),
    blobHash.slice(4),
  ].join('/')
  const projectBlobKey = [
    projectKey.format(projectId),
    blobHash.slice(0, 2),
    blobHash.slice(2),
  ].join('/')
  const globalBlobObject = globalBucket.file(globalBlobKey)
  const projectBlobObject = projectBucket.file(projectBlobKey)

  // Check if the project blob exists
  let projectBlobMetadata = null
  try {
    ;[projectBlobMetadata] = await projectBlobObject.getMetadata()
  } catch (err) {
    if (err.code !== 404) {
      throw err
    }
  }

  // Check that the global blob exists
  let globalBlobMetadata = null
  try {
    ;[globalBlobMetadata] = await globalBlobObject.getMetadata()
  } catch (err) {
    if (err.code !== 404) {
      throw err
    }
  }

  if (projectBlobMetadata) {
    // Project blob already exists. Compare the metadata if the global blob
    // also exists and return early.
    if (
      globalBlobMetadata != null &&
      (globalBlobMetadata.size !== projectBlobMetadata.size ||
        globalBlobMetadata.md5Hash !== projectBlobMetadata.md5Hash)
    ) {
      throw new Error(
        `Project blob ${blobHash} in project ${projectId} doesn't match global blob`
      )
    }
    return null
  }

  await globalBlobObject.copy(projectBlobObject)

  // Paranoid check that the copy went well. The getMetadata() method returns
  // an array, with the metadata in first position.
  ;[projectBlobMetadata] = await projectBlobObject.getMetadata()
  if (
    globalBlobMetadata.size !== projectBlobMetadata.size ||
    globalBlobMetadata.md5Hash !== projectBlobMetadata.md5Hash
  ) {
    throw new Error(`Failed to copy blob ${blobHash} to project ${projectId}`)
  }

  return parseInt(projectBlobMetadata.size, 10)
}

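// Copy blob records for the given hashes from the global blobs table into
// project_blobs. Returns a map from hex hash to byte length for the rows that
// were actually inserted; rows that already exist are skipped by the
// ON CONFLICT clause and therefore not returned.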
async function copyBlobsInDatabase(projectId, blobHashes) {
  const blobSizesByHash = new Map()
  if (blobHashes.length === 0) {
    return blobSizesByHash
  }
  const binaryBlobHashes = blobHashes.map(hash => Buffer.from(hash, 'hex'))
  const result = await knex.raw(
    `INSERT INTO project_blobs (
       project_id, hash_bytes, byte_length, string_length
     )
     SELECT ?, hash_bytes, byte_length, string_length
     FROM blobs
     WHERE hash_bytes IN (${binaryBlobHashes.map(_ => '?').join(',')})
     ON CONFLICT (project_id, hash_bytes) DO NOTHING
     RETURNING hash_bytes, byte_length`,
    [projectId, ...binaryBlobHashes]
  )
  for (const row of result.rows) {
    blobSizesByHash.set(row.hash_bytes.toString('hex'), row.byte_length)
  }
  return blobSizesByHash
}

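// Run the script and exit, with a non-zero status if anything failed.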
main()
  .then(() => {
    process.exit()
  })
  .catch(err => {
    console.error(err)
    process.exit(1)
  })