Merge pull request #11952 from overleaf/em-batch-redis-reads

Read updates from Redis in smaller batches

GitOrigin-RevId: 06901e4a9e43976e446c014d5d46c2488691c205
This commit is contained in:
Eric Mc Sween 2023-02-27 09:36:14 -05:00 committed by Copybot
parent 1c3b05d4a8
commit 3ead64344f
3 changed files with 442 additions and 420 deletions

View file

@ -6,20 +6,33 @@ import redis from '@overleaf/redis-wrapper'
import metrics from '@overleaf/metrics' import metrics from '@overleaf/metrics'
import OError from '@overleaf/o-error' import OError from '@overleaf/o-error'
/**
 * Maximum size taken from the redis queue, to prevent project history
 * consuming unbounded amounts of memory
 */
export const RAW_UPDATE_SIZE_THRESHOLD = 4 * 1024 * 1024

/**
 * Batch size when reading updates from Redis
 */
export const RAW_UPDATES_BATCH_SIZE = 50

/**
 * Maximum length of ops (insertion and deletions) to process in a single
 * iteration
 */
export const MAX_UPDATE_OP_LENGTH = 1024

/**
 * Warn if we exceed this raw update size, the final compressed updates we
 * send could be smaller than this
 */
const WARN_RAW_UPDATE_SIZE = 1024 * 1024

/**
 * Maximum number of new docs to process in a single iteration
 */
export const MAX_NEW_DOC_CONTENT_COUNT = 32

const CACHE_TTL_IN_SECONDS = 3600
@ -32,10 +45,54 @@ async function countUnprocessedUpdates(projectId) {
return updates return updates
} }
/**
 * Async generator yielding raw (JSON-encoded, unparsed) updates from the
 * project's Redis queue, oldest first.
 *
 * Reads the list in slices of RAW_UPDATES_BATCH_SIZE via LRANGE so that
 * the whole queue is never loaded into memory at once. Iteration ends
 * when a short (or empty) slice signals the end of the list.
 *
 * @param {string} projectId
 * @yields {string} one raw update from the queue
 */
async function* getRawUpdates(projectId) {
  const key = Keys.projectHistoryOps({ project_id: projectId })
  let start = 0
  while (true) {
    // LRANGE bounds are inclusive, hence the -1 on stop.
    const stop = start + RAW_UPDATES_BATCH_SIZE - 1
    const updates = await rclient.lrange(key, start, stop)
    for (const update of updates) {
      yield update
    }
    if (updates.length < RAW_UPDATES_BATCH_SIZE) {
      // Short read: we've reached the end of the queue.
      return
    }
    start += RAW_UPDATES_BATCH_SIZE
  }
}
/**
 * Read a batch of raw (unparsed) updates from the project's Redis queue.
 *
 * Consumes updates lazily via getRawUpdates(), stopping when either
 * `batchSize` updates have been collected or the accumulated size exceeds
 * RAW_UPDATE_SIZE_THRESHOLD. At least one update is always accepted, so a
 * single oversized update cannot wedge the queue.
 *
 * @param {string} projectId
 * @param {number} batchSize - maximum number of raw updates to return
 * @returns {Promise<{rawUpdates: string[], hasMore: boolean}>} hasMore is
 *   true when the queue may still contain unprocessed updates
 */
async function getRawUpdatesBatch(projectId, batchSize) {
  const rawUpdates = []
  let totalRawUpdatesSize = 0
  let hasMore = false
  for await (const rawUpdate of getRawUpdates(projectId)) {
    totalRawUpdatesSize += rawUpdate.length
    // Stop before exceeding the size threshold, but only once we already
    // have at least one update in hand.
    if (
      rawUpdates.length > 0 &&
      totalRawUpdatesSize > RAW_UPDATE_SIZE_THRESHOLD
    ) {
      hasMore = true
      break
    }
    rawUpdates.push(rawUpdate)
    if (rawUpdates.length >= batchSize) {
      hasMore = true
      break
    }
  }
  metrics.timing('redis.incoming.bytes', totalRawUpdatesSize, 1)
  if (totalRawUpdatesSize > WARN_RAW_UPDATE_SIZE) {
    const rawUpdateSizes = rawUpdates.map(rawUpdate => rawUpdate.length)
    logger.warn(
      {
        projectId,
        totalRawUpdatesSize,
        rawUpdateSizes,
      },
      'large raw update size'
    )
  }
  return { rawUpdates, hasMore }
}
export function parseDocUpdates(jsonUpdates) { export function parseDocUpdates(jsonUpdates) {
@ -44,112 +101,53 @@ export function parseDocUpdates(jsonUpdates) {
/**
 * Process all queued updates for a project in batches.
 *
 * Each iteration fetches up to `batchSize` raw updates from Redis, parses
 * them, trims the batch to respect MAX_UPDATE_OP_LENGTH and
 * MAX_NEW_DOC_CONTENT_COUNT, hands the parsed updates to `runner`, and
 * only then deletes the corresponding raw updates from the queue — so a
 * failing runner leaves the queue intact.
 *
 * @param {string} projectId
 * @param {number} batchSize - maximum updates handed to the runner at once
 * @param {Function} runner - async callback receiving the parsed updates
 */
async function getUpdatesInBatches(projectId, batchSize, runner) {
  let moreBatches = true
  while (moreBatches) {
    const redisBatch = await getRawUpdatesBatch(projectId, batchSize)
    if (redisBatch.rawUpdates.length === 0) {
      break
    }
    moreBatches = redisBatch.hasMore

    // There is a 1:1 mapping between rawUpdates and updates; both are kept
    // so that only the raw updates actually processed get deleted below.
    const rawUpdates = []
    const updates = []
    let totalOpLength = 0
    let totalDocContentCount = 0
    for (const rawUpdate of redisBatch.rawUpdates) {
      let update
      try {
        update = JSON.parse(rawUpdate)
      } catch (error) {
        throw OError.tag(error, 'failed to parse update', {
          projectId,
          update,
        })
      }

      totalOpLength += update?.op?.length || 1
      if (update.resyncDocContent) {
        totalDocContentCount += 1
      }

      // Stop before exceeding either limit, but always process at least
      // one update so an oversized update cannot block the queue.
      if (
        updates.length > 0 &&
        (totalOpLength > MAX_UPDATE_OP_LENGTH ||
          totalDocContentCount > MAX_NEW_DOC_CONTENT_COUNT)
      ) {
        moreBatches = true
        break
      }
      rawUpdates.push(rawUpdate)
      updates.push(update)
    }

    logger.debug({ projectId }, 'retrieved raw updates from redis')
    await runner(updates)
    await deleteAppliedDocUpdates(projectId, rawUpdates)

    if (batchSize === 1) {
      // Special case for single stepping, don't process more batches
      break
    }
  }
}
@ -324,23 +322,10 @@ async function clearCachedHistoryId(projectId) {
await rclient.del(key) await rclient.del(key)
} }
// for tests
export function setMaxUpdateOpLength(value) {
MAX_UPDATE_OP_LENGTH = value
}
export function setRawUpdateSizeThreshold(value) {
RAW_UPDATE_SIZE_THRESHOLD = value
}
export function setMaxNewDocContentCount(value) {
MAX_NEW_DOC_CONTENT_COUNT = value
}
// EXPORTS // EXPORTS
const countUnprocessedUpdatesCb = callbackify(countUnprocessedUpdates) const countUnprocessedUpdatesCb = callbackify(countUnprocessedUpdates)
const getOldestDocUpdatesCb = callbackify(getOldestDocUpdates) const getRawUpdatesBatchCb = callbackify(getRawUpdatesBatch)
const deleteAppliedDocUpdatesCb = callbackify(deleteAppliedDocUpdates) const deleteAppliedDocUpdatesCb = callbackify(deleteAppliedDocUpdates)
const destroyDocUpdatesQueueCb = callbackify(destroyDocUpdatesQueue) const destroyDocUpdatesQueueCb = callbackify(destroyDocUpdatesQueue)
const getProjectIdsWithHistoryOpsCb = callbackify(getProjectIdsWithHistoryOps) const getProjectIdsWithHistoryOpsCb = callbackify(getProjectIdsWithHistoryOps)
@ -378,7 +363,7 @@ const getUpdatesInBatchesCb = function (
export { export {
countUnprocessedUpdatesCb as countUnprocessedUpdates, countUnprocessedUpdatesCb as countUnprocessedUpdates,
getOldestDocUpdatesCb as getOldestDocUpdates, getRawUpdatesBatchCb as getRawUpdatesBatch,
deleteAppliedDocUpdatesCb as deleteAppliedDocUpdates, deleteAppliedDocUpdatesCb as deleteAppliedDocUpdates,
destroyDocUpdatesQueueCb as destroyDocUpdatesQueue, destroyDocUpdatesQueueCb as destroyDocUpdatesQueue,
getUpdatesInBatchesCb as getUpdatesInBatches, getUpdatesInBatchesCb as getUpdatesInBatches,
@ -396,7 +381,7 @@ export {
export const promises = { export const promises = {
countUnprocessedUpdates, countUnprocessedUpdates,
getOldestDocUpdates, getRawUpdatesBatch,
deleteAppliedDocUpdates, deleteAppliedDocUpdates,
destroyDocUpdatesQueue, destroyDocUpdatesQueue,
getUpdatesInBatches, getUpdatesInBatches,

View file

@ -29,38 +29,34 @@ export const REDIS_READ_BATCH_SIZE = 500
export const _mocks = {} export const _mocks = {}
/**
 * Fetch a batch of raw updates for a project together with the most recent
 * history chunk, for diagnostic/administrative use.
 *
 * Reads a batch from Redis, parses it, resolves the project's history id,
 * then retrieves the latest chunk from the history store. Any error is
 * tagged with OError and passed to the callback.
 *
 * @param {string} projectId
 * @param {number} batchSize - maximum number of updates to fetch
 * @param {Function} callback - called with (error) or
 *   (null, { project_id, chunk, updates })
 */
export function getRawUpdates(projectId, batchSize, callback) {
  RedisManager.getRawUpdatesBatch(projectId, batchSize, (error, batch) => {
    if (error != null) {
      return callback(OError.tag(error))
    }

    let updates
    try {
      updates = RedisManager.parseDocUpdates(batch.rawUpdates)
    } catch (error) {
      return callback(OError.tag(error))
    }

    _getHistoryId(projectId, updates, (error, historyId) => {
      if (error != null) {
        return callback(OError.tag(error))
      }
      HistoryStoreManager.getMostRecentChunk(
        projectId,
        historyId,
        (error, chunk) => {
          if (error != null) {
            return callback(OError.tag(error))
          }
          callback(null, { project_id: projectId, chunk, updates })
        }
      )
    })
  })
}
// Process all updates for a project, only check project-level information once // Process all updates for a project, only check project-level information once

View file

@ -6,15 +6,7 @@ const MODULE_PATH = '../../../../app/js/RedisManager.js'
describe('RedisManager', function () { describe('RedisManager', function () {
beforeEach(async function () { beforeEach(async function () {
this.rclient = { this.rclient = new FakeRedis()
auth: sinon.stub(),
exec: sinon.stub().resolves(),
lrange: sinon.stub(),
lrem: sinon.stub(),
srem: sinon.stub(),
del: sinon.stub(),
}
this.rclient.multi = sinon.stub().returns(this.rclient)
this.RedisWrapper = { this.RedisWrapper = {
createClient: sinon.stub().returns(this.rclient), createClient: sinon.stub().returns(this.rclient),
} }
@ -44,10 +36,10 @@ describe('RedisManager', function () {
'@overleaf/metrics': this.Metrics, '@overleaf/metrics': this.Metrics,
}) })
this.project_id = 'project-id-123' this.projectId = 'project-id-123'
this.batchSize = 100 this.batchSize = 100
this.historyOpsKey = `Project:HistoryOps:{${this.project_id}}` this.historyOpsKey = `Project:HistoryOps:{${this.projectId}}`
this.firstOpTimestampKey = `ProjectHistory:FirstOpTimestamp:{${this.project_id}}` this.firstOpTimestampKey = `ProjectHistory:FirstOpTimestamp:{${this.projectId}}`
this.updates = [ this.updates = [
{ v: 42, op: ['a', 'b', 'c', 'd'] }, { v: 42, op: ['a', 'b', 'c', 'd'] },
@ -60,32 +52,51 @@ describe('RedisManager', function () {
) )
}) })
describe('getOldestDocUpdates', function () { describe('getRawUpdatesBatch', function () {
beforeEach(async function () { it('gets a small number of updates in one batch', async function () {
this.rclient.lrange.resolves(this.rawUpdates) const updates = makeUpdates(2)
this.batchSize = 3 const rawUpdates = makeRawUpdates(updates)
this.result = await this.RedisManager.promises.getOldestDocUpdates( this.rclient.setList(this.historyOpsKey, rawUpdates)
this.project_id, const result = await this.RedisManager.promises.getRawUpdatesBatch(
this.batchSize this.projectId,
100
) )
expect(result).to.deep.equal({ rawUpdates, hasMore: false })
}) })
it('should read the updates from redis', function () { it('gets a larger number of updates in several batches', async function () {
this.rclient.lrange const updates = makeUpdates(
.calledWith(this.historyOpsKey, 0, this.batchSize - 1) this.RedisManager.RAW_UPDATES_BATCH_SIZE * 2 + 12
.should.equal(true) )
const rawUpdates = makeRawUpdates(updates)
this.rclient.setList(this.historyOpsKey, rawUpdates)
const result = await this.RedisManager.promises.getRawUpdatesBatch(
this.projectId,
5000
)
expect(result).to.deep.equal({ rawUpdates, hasMore: false })
}) })
it('should call the callback with the unparsed ops', function () { it("doesn't return more than the number of updates requested", async function () {
this.result.should.equal(this.rawUpdates) const updates = makeUpdates(100)
const rawUpdates = makeRawUpdates(updates)
this.rclient.setList(this.historyOpsKey, rawUpdates)
const result = await this.RedisManager.promises.getRawUpdatesBatch(
this.projectId,
75
)
expect(result).to.deep.equal({
rawUpdates: rawUpdates.slice(0, 75),
hasMore: true,
})
}) })
}) })
describe('parseDocUpdates', function () { describe('parseDocUpdates', function () {
it('should return the parsed ops', function () { it('should return the parsed ops', function () {
this.RedisManager.parseDocUpdates(this.rawUpdates).should.deep.equal( const updates = makeUpdates(12)
this.updates const rawUpdates = makeRawUpdates(updates)
) this.RedisManager.parseDocUpdates(rawUpdates).should.deep.equal(updates)
}) })
}) })
@ -96,40 +107,26 @@ describe('RedisManager', function () {
describe('single batch smaller than batch size', function () { describe('single batch smaller than batch size', function () {
beforeEach(async function () { beforeEach(async function () {
this.rclient.lrange.resolves(this.rawUpdates) this.updates = makeUpdates(2)
this.batchSize = 3 this.rawUpdates = makeRawUpdates(this.updates)
this.rclient.setList(this.historyOpsKey, this.rawUpdates)
await this.RedisManager.promises.getUpdatesInBatches( await this.RedisManager.promises.getUpdatesInBatches(
this.project_id, this.projectId,
this.batchSize, 3,
this.runner this.runner
) )
}) })
it('requests a single batch of updates', function () {
this.rclient.lrange.should.have.been.calledOnce
this.rclient.lrange.should.have.been.calledWith(
this.historyOpsKey,
0,
this.batchSize - 1
)
})
it('calls the runner once', function () { it('calls the runner once', function () {
this.runner.callCount.should.equal(1) this.runner.callCount.should.equal(1)
}) })
it('calls the runner with the updates', function () { it('calls the runner with the updates', function () {
this.runner.calledWith(this.updates).should.equal(true) this.runner.should.have.been.calledWith(this.updates)
}) })
it('deletes the applied updates', function () { it('deletes the applied updates', function () {
for (const update of this.rawUpdates) { expect(this.rclient.getList(this.historyOpsKey)).to.deep.equal([])
expect(this.rclient.lrem).to.have.been.calledWith(
this.historyOpsKey,
1,
update
)
}
}) })
it('deletes the first op timestamp', function () { it('deletes the first op timestamp', function () {
@ -141,35 +138,26 @@ describe('RedisManager', function () {
describe('single batch at batch size', function () { describe('single batch at batch size', function () {
beforeEach(async function () { beforeEach(async function () {
this.rclient.lrange.onCall(0).resolves(this.rawUpdates) this.updates = makeUpdates(123)
this.rclient.lrange.onCall(1).resolves([]) this.rawUpdates = makeRawUpdates(this.updates)
this.rclient.setList(this.historyOpsKey, this.rawUpdates)
await this.RedisManager.promises.getUpdatesInBatches( await this.RedisManager.promises.getUpdatesInBatches(
this.project_id, this.projectId,
2, 123,
this.runner this.runner
) )
}) })
it('requests a second batch of updates', function () {
this.rclient.lrange.should.have.been.calledTwice
})
it('calls the runner once', function () { it('calls the runner once', function () {
this.runner.callCount.should.equal(1) this.runner.callCount.should.equal(1)
}) })
it('calls the runner with the updates', function () { it('calls the runner with the updates', function () {
this.runner.calledWith(this.updates).should.equal(true) this.runner.should.have.been.calledWith(this.updates)
}) })
it('deletes the applied updates', function () { it('deletes the applied updates', function () {
for (const update of this.rawUpdates) { expect(this.rclient.getList(this.historyOpsKey)).to.deep.equal([])
expect(this.rclient.lrem).to.have.been.calledWith(
this.historyOpsKey,
1,
update
)
}
}) })
it('deletes the first op timestamp', function () { it('deletes the first op timestamp', function () {
@ -181,215 +169,160 @@ describe('RedisManager', function () {
describe('single batch exceeding size limit on updates', function () { describe('single batch exceeding size limit on updates', function () {
beforeEach(async function () { beforeEach(async function () {
// set the threshold below the size of the first update this.updates = makeUpdates(2, [
this.RedisManager.setRawUpdateSizeThreshold( 'x'.repeat(this.RedisManager.RAW_UPDATE_SIZE_THRESHOLD),
this.rawUpdates[0].length - 1 ])
) this.rawUpdates = makeRawUpdates(this.updates)
this.rclient.lrange.onCall(0).resolves(this.rawUpdates) this.rclient.setList(this.historyOpsKey, this.rawUpdates)
this.rclient.lrange.onCall(1).resolves(this.rawUpdates.slice(1))
await this.RedisManager.promises.getUpdatesInBatches( await this.RedisManager.promises.getUpdatesInBatches(
this.project_id, this.projectId,
2, 123,
this.runner this.runner
) )
}) })
it('requests a second batch of updates', function () {
this.rclient.lrange.should.have.been.calledTwice
})
it('calls the runner twice', function () { it('calls the runner twice', function () {
this.runner.callCount.should.equal(2) this.runner.callCount.should.equal(2)
}) })
it('calls the runner with the first update', function () { it('calls the runner with the first update', function () {
this.runner.should.have.been.calledWith(this.updates.slice(0, 1)) this.runner
}) .getCall(0)
.should.have.been.calledWith(this.updates.slice(0, 1))
it('deletes the first update', function () {
expect(this.rclient.lrem).to.have.been.calledWith(
this.historyOpsKey,
1,
this.rawUpdates[0]
)
}) })
it('calls the runner with the second update', function () { it('calls the runner with the second update', function () {
this.runner.should.have.been.calledWith(this.updates.slice(1)) this.runner
.getCall(1)
.should.have.been.calledWith(this.updates.slice(1))
}) })
it('deletes the second set of applied updates', function () { it('deletes the applied updates', function () {
expect(this.rclient.lrem).to.have.been.calledWith( expect(this.rclient.getList(this.historyOpsKey)).to.deep.equal([])
this.historyOpsKey,
1,
this.rawUpdates[1]
)
}) })
}) })
describe('two batches with first update below and second update above the size limit on updates', function () { describe('two batches with first update below and second update above the size limit on updates', function () {
beforeEach(async function () { beforeEach(async function () {
// set the threshold above the size of the first update, but below the total size this.updates = makeUpdates(2, [
this.RedisManager.setRawUpdateSizeThreshold( 'x'.repeat(this.RedisManager.RAW_UPDATE_SIZE_THRESHOLD / 2),
this.rawUpdates[0].length + 1 ])
) this.rawUpdates = makeRawUpdates(this.updates)
this.rclient.lrange.onCall(0).resolves(this.rawUpdates) this.rclient.setList(this.historyOpsKey, this.rawUpdates)
this.rclient.lrange.onCall(1).resolves(this.rawUpdates.slice(1))
await this.RedisManager.promises.getUpdatesInBatches( await this.RedisManager.promises.getUpdatesInBatches(
this.project_id, this.projectId,
2, 123,
this.runner this.runner
) )
}) })
it('requests a second batch of updates', function () {
this.rclient.lrange.should.have.been.calledTwice
})
it('calls the runner twice', function () { it('calls the runner twice', function () {
this.runner.callCount.should.equal(2) this.runner.callCount.should.equal(2)
}) })
it('calls the runner with the first update', function () { it('calls the runner with the first update', function () {
this.runner.calledWith(this.updates.slice(0, 1)).should.equal(true) this.runner
}) .getCall(0)
.should.have.been.calledWith(this.updates.slice(0, 1))
it('deletes the first set applied update', function () {
expect(this.rclient.lrem).to.have.been.calledWith(
this.historyOpsKey,
1,
this.rawUpdates[0]
)
}) })
it('calls the runner with the second update', function () { it('calls the runner with the second update', function () {
this.runner.calledWith(this.updates.slice(1)).should.equal(true) this.runner
.getCall(1)
.should.have.been.calledWith(this.updates.slice(1))
}) })
it('deletes the second applied update', function () { it('deletes the applied updates', function () {
expect(this.rclient.lrem).to.have.been.calledWith( expect(this.rclient.getList(this.historyOpsKey)).to.deep.equal([])
this.historyOpsKey,
1,
this.rawUpdates[1]
)
}) })
}) })
describe('single batch exceeding op count limit on updates', function () { describe('single batch exceeding op count limit on updates', function () {
beforeEach(async function () { beforeEach(async function () {
// set the threshold below the size of the first update const ops = Array(this.RedisManager.MAX_UPDATE_OP_LENGTH + 1).fill('op')
this.RedisManager.setMaxUpdateOpLength(this.updates[0].op.length - 1) this.updates = makeUpdates(2, { op: ops })
this.rclient.lrange.onCall(0).resolves(this.rawUpdates) this.rawUpdates = makeRawUpdates(this.updates)
this.rclient.lrange.onCall(1).resolves(this.rawUpdates.slice(1)) this.rclient.setList(this.historyOpsKey, this.rawUpdates)
await this.RedisManager.promises.getUpdatesInBatches( await this.RedisManager.promises.getUpdatesInBatches(
this.project_id, this.projectId,
2, 123,
this.runner this.runner
) )
}) })
it('requests a second batch of updates', function () {
this.rclient.lrange.should.have.been.calledTwice
})
it('calls the runner twice', function () {
this.runner.callCount.should.equal(2)
})
it('calls the runner with the first updates', function () {
this.runner.calledWith(this.updates.slice(0, 1)).should.equal(true)
})
it('deletes the first applied update', function () {
expect(this.rclient.lrem).to.have.been.calledWith(
this.historyOpsKey,
1,
this.rawUpdates[0]
)
})
it('calls the runner with the second updates', function () {
this.runner.calledWith(this.updates.slice(1)).should.equal(true)
})
it('deletes the second applied update', function () {
expect(this.rclient.lrem).to.have.been.calledWith(
this.historyOpsKey,
1,
this.rawUpdates[1]
)
})
})
describe('single batch exceeding doc content count', function () {
beforeEach(async function () {
this.updates = [{ resyncDocContent: 123 }, { resyncDocContent: 456 }]
this.rawUpdates = this.updates.map(update => JSON.stringify(update))
// set the threshold below the size of the first update
this.RedisManager.setMaxNewDocContentCount(this.updates.length - 1)
this.rclient.lrange.onCall(0).resolves(this.rawUpdates)
this.rclient.lrange.onCall(1).resolves(this.rawUpdates.slice(1))
await this.RedisManager.promises.getUpdatesInBatches(
this.project_id,
2,
this.runner
)
})
it('requests a second batch of updates', function () {
this.rclient.lrange.should.have.been.calledTwice
})
it('calls the runner twice', function () { it('calls the runner twice', function () {
this.runner.callCount.should.equal(2) this.runner.callCount.should.equal(2)
}) })
it('calls the runner with the first update', function () { it('calls the runner with the first update', function () {
this.runner.should.have.been.calledWith(this.updates.slice(0, 1)) this.runner
}) .getCall(0)
.should.have.been.calledWith(this.updates.slice(0, 1))
it('deletes the first applied update', function () {
expect(this.rclient.lrem).to.have.been.calledWith(
this.historyOpsKey,
1,
this.rawUpdates[0]
)
}) })
it('calls the runner with the second update', function () { it('calls the runner with the second update', function () {
this.runner.should.have.been.calledWith(this.updates.slice(1)) this.runner
.getCall(1)
.should.have.been.calledWith(this.updates.slice(1))
}) })
it('deletes the second set of applied updates', function () { it('deletes the applied updates', function () {
expect(this.rclient.lrem).to.have.been.calledWith( expect(this.rclient.getList(this.historyOpsKey)).to.deep.equal([])
this.historyOpsKey, })
1, })
this.rawUpdates[1]
describe('single batch exceeding doc content count', function () {
beforeEach(async function () {
this.updates = makeUpdates(
this.RedisManager.MAX_NEW_DOC_CONTENT_COUNT + 3,
{ resyncDocContent: 123 }
) )
this.rawUpdates = makeRawUpdates(this.updates)
this.rclient.setList(this.historyOpsKey, this.rawUpdates)
await this.RedisManager.promises.getUpdatesInBatches(
this.projectId,
123,
this.runner
)
})
it('calls the runner twice', function () {
this.runner.callCount.should.equal(2)
})
it('calls the runner with the first batch of updates', function () {
this.runner.should.have.been.calledWith(
this.updates.slice(0, this.RedisManager.MAX_NEW_DOC_CONTENT_COUNT)
)
})
it('calls the runner with the second batch of updates', function () {
this.runner.should.have.been.calledWith(
this.updates.slice(this.RedisManager.MAX_NEW_DOC_CONTENT_COUNT)
)
})
it('deletes the applied updates', function () {
expect(this.rclient.getList(this.historyOpsKey)).to.deep.equal([])
}) })
}) })
describe('two batches with first update below and second update above the ops length limit on updates', function () { describe('two batches with first update below and second update above the ops length limit on updates', function () {
beforeEach(async function () { beforeEach(async function () {
// set the threshold below the size of the first update // set the threshold below the size of the first update
this.RedisManager.setMaxUpdateOpLength(this.updates[0].op.length + 1) this.updates = makeUpdates(2, { op: ['op1', 'op2'] })
this.rclient.lrange.onCall(0).resolves(this.rawUpdates) this.updates[1].op = Array(
this.rclient.lrange.onCall(1).resolves(this.rawUpdates.slice(1)) this.RedisManager.MAX_UPDATE_OP_LENGTH + 2
).fill('op')
this.rawUpdates = makeRawUpdates(this.updates)
this.rclient.setList(this.historyOpsKey, this.rawUpdates)
await this.RedisManager.promises.getUpdatesInBatches( await this.RedisManager.promises.getUpdatesInBatches(
this.project_id, this.projectId,
2, 123,
this.runner this.runner
) )
}) })
it('requests a second batch of updates', function () {
this.rclient.lrange.should.have.been.calledTwice
})
it('calls the runner twice', function () { it('calls the runner twice', function () {
this.runner.callCount.should.equal(2) this.runner.callCount.should.equal(2)
}) })
@ -398,79 +331,120 @@ describe('RedisManager', function () {
this.runner.should.have.been.calledWith(this.updates.slice(0, 1)) this.runner.should.have.been.calledWith(this.updates.slice(0, 1))
}) })
it('deletes the first applied update', function () {
expect(this.rclient.lrem).to.have.been.calledWith(
this.historyOpsKey,
1,
this.rawUpdates[0]
)
})
it('calls the runner with the second update', function () { it('calls the runner with the second update', function () {
this.runner.should.have.been.calledWith(this.updates.slice(1)) this.runner.should.have.been.calledWith(this.updates.slice(1))
}) })
it('deletes the second applied update', function () { it('deletes the applied updates', function () {
expect(this.rclient.lrem).to.have.been.calledWith( expect(this.rclient.getList(this.historyOpsKey)).to.deep.equal([])
this.historyOpsKey,
1,
this.rawUpdates[1]
)
}) })
}) })
describe('two batches', function () { describe('two batches, one partial', function () {
beforeEach(async function () { beforeEach(async function () {
this.rclient.lrange.onCall(0).resolves(this.rawUpdates) this.updates = makeUpdates(15)
this.rclient.lrange.onCall(1).resolves(this.extraRawUpdates) this.rawUpdates = makeRawUpdates(this.updates)
this.rclient.setList(this.historyOpsKey, this.rawUpdates)
await this.RedisManager.promises.getUpdatesInBatches( await this.RedisManager.promises.getUpdatesInBatches(
this.project_id, this.projectId,
2, 10,
this.runner this.runner
) )
}) })
it('requests a second batch of updates', function () {
this.rclient.lrange.should.have.been.calledTwice
})
it('calls the runner twice', function () { it('calls the runner twice', function () {
this.runner.callCount.should.equal(2) this.runner.callCount.should.equal(2)
}) })
it('calls the runner with the updates', function () { it('calls the runner with the updates', function () {
this.runner.should.have.been.calledWith(this.updates) this.runner
this.runner.should.have.been.calledWith(this.extraUpdates) .getCall(0)
.should.have.been.calledWith(this.updates.slice(0, 10))
this.runner
.getCall(1)
.should.have.been.calledWith(this.updates.slice(10))
}) })
it('deletes the first set of applied updates', function () { it('deletes the applied updates', function () {
for (const update of this.rawUpdates) { expect(this.rclient.getList(this.historyOpsKey)).to.deep.equal([])
expect(this.rclient.lrem).to.have.been.calledWith( })
this.historyOpsKey, })
1,
update describe('two full batches', function () {
) beforeEach(async function () {
} this.updates = makeUpdates(20)
this.rawUpdates = makeRawUpdates(this.updates)
this.rclient.setList(this.historyOpsKey, this.rawUpdates)
await this.RedisManager.promises.getUpdatesInBatches(
this.projectId,
10,
this.runner
)
}) })
it('deletes the second set of applied updates', function () { it('calls the runner twice', function () {
for (const update of this.extraRawUpdates) { this.runner.callCount.should.equal(2)
expect(this.rclient.lrem).to.have.been.calledWith( })
this.historyOpsKey,
1, it('calls the runner with the updates', function () {
update this.runner
.getCall(0)
.should.have.been.calledWith(this.updates.slice(0, 10))
this.runner
.getCall(1)
.should.have.been.calledWith(this.updates.slice(10))
})
it('deletes the applied updates', function () {
expect(this.rclient.getList(this.historyOpsKey)).to.deep.equal([])
})
})
describe('three full bathches, bigger than the Redis read batch size', function () {
beforeEach(async function () {
this.batchSize = this.RedisManager.RAW_UPDATES_BATCH_SIZE * 2
this.updates = makeUpdates(this.batchSize * 3)
this.rawUpdates = makeRawUpdates(this.updates)
this.rclient.setList(this.historyOpsKey, this.rawUpdates)
await this.RedisManager.promises.getUpdatesInBatches(
this.projectId,
this.batchSize,
this.runner
)
})
it('calls the runner twice', function () {
this.runner.callCount.should.equal(3)
})
it('calls the runner with the updates', function () {
this.runner
.getCall(0)
.should.have.been.calledWith(this.updates.slice(0, this.batchSize))
this.runner
.getCall(1)
.should.have.been.calledWith(
this.updates.slice(this.batchSize, this.batchSize * 2)
) )
} this.runner
.getCall(2)
.should.have.been.calledWith(this.updates.slice(this.batchSize * 2))
})
it('deletes the applied updates', function () {
expect(this.rclient.getList(this.historyOpsKey)).to.deep.equal([])
}) })
}) })
describe('error when first reading updates', function () { describe('error when first reading updates', function () {
beforeEach(async function () { beforeEach(async function () {
this.error = new Error('error') this.updates = makeUpdates(10)
this.rclient.lrange.rejects(this.error) this.rawUpdates = makeRawUpdates(this.updates)
this.rclient.setList(this.historyOpsKey, this.rawUpdates)
this.rclient.throwErrorOnLrangeCall(0)
await expect( await expect(
this.RedisManager.promises.getUpdatesInBatches( this.RedisManager.promises.getUpdatesInBatches(
this.project_id, this.projectId,
2, 2,
this.runner this.runner
) )
@ -478,38 +452,105 @@ describe('RedisManager', function () {
}) })
it('does not delete any updates', function () { it('does not delete any updates', function () {
expect(this.rclient.lrem).not.to.have.been.called expect(this.rclient.getList(this.historyOpsKey)).to.deep.equal(
this.rawUpdates
)
}) })
}) })
describe('error when reading updates for a second batch', function () { describe('error when reading updates for a second batch', function () {
beforeEach(async function () { beforeEach(async function () {
this.error = new Error('error') this.batchSize = this.RedisManager.RAW_UPDATES_BATCH_SIZE - 1
this.rclient.lrange.onCall(0).resolves(this.rawUpdates) this.updates = makeUpdates(this.RedisManager.RAW_UPDATES_BATCH_SIZE * 2)
this.rclient.lrange.onCall(1).rejects(this.error) this.rawUpdates = makeRawUpdates(this.updates)
this.rclient.setList(this.historyOpsKey, this.rawUpdates)
this.rclient.throwErrorOnLrangeCall(1)
await expect( await expect(
this.RedisManager.promises.getUpdatesInBatches( this.RedisManager.promises.getUpdatesInBatches(
this.project_id, this.projectId,
2, this.batchSize,
this.runner this.runner
) )
).to.be.rejected ).to.be.rejected
}) })
it('deletes the first set of applied updates', function () { it('calls the runner with the first batch of updates', function () {
for (const update of this.rawUpdates) { this.runner.should.have.been.calledOnce
expect(this.rclient.lrem).to.have.been.calledWith( this.runner
this.historyOpsKey, .getCall(0)
1, .should.have.been.calledWith(this.updates.slice(0, this.batchSize))
update
)
}
}) })
it('deletes applied updates only once', function () { it('deletes only the first batch of applied updates', function () {
expect(this.rclient.lrem.callCount).to.equal(this.rawUpdates.length) expect(this.rclient.getList(this.historyOpsKey)).to.deep.equal(
this.rawUpdates.slice(this.batchSize)
)
}) })
}) })
}) })
}) })
/**
 * Minimal in-memory stand-in for the Redis client used by RedisManager.
 * Supports the list operations the tests exercise (lrange/lrem), a no-op
 * multi/exec pipeline, and a hook for injecting LRANGE failures.
 */
class FakeRedis {
  constructor() {
    this.data = new Map()
    this.del = sinon.stub()
    // -1 so that the first lrange call observes call number 0.
    this.lrangeCallCount = -1
  }

  // Seed the fake with a list value under the given key.
  setList(key, list) {
    this.data.set(key, list)
  }

  // Read back the list currently stored under the given key.
  getList(key) {
    return this.data.get(key)
  }

  // Arrange for the nth (zero-indexed) lrange call to throw.
  throwErrorOnLrangeCall(callNum) {
    this.lrangeCallThrowingError = callNum
  }

  async lrange(key, start, stop) {
    this.lrangeCallCount += 1
    if (this.lrangeCallCount === this.lrangeCallThrowingError) {
      throw new Error('LRANGE failed!')
    }
    const stored = this.data.get(key) ?? []
    // Redis LRANGE bounds are inclusive, hence stop + 1.
    return stored.slice(start, stop + 1)
  }

  async lrem(key, count, elementToRemove) {
    expect(count).to.be.greaterThan(0)
    let remaining = count
    const kept = []
    for (const entry of this.data.get(key) ?? []) {
      if (remaining > 0 && entry === elementToRemove) {
        remaining -= 1
      } else {
        kept.push(entry)
      }
    }
    this.data.set(key, kept)
  }

  async exec() {
    // Nothing to do
  }

  multi() {
    return this
  }
}
/**
 * Build an array of simple update objects { v: 0 }, { v: 1 }, ...
 * Any properties in extraFields are spread into every update.
 */
function makeUpdates(updateCount, extraFields = {}) {
  return Array.from({ length: updateCount }, (_, index) => ({
    v: index,
    ...extraFields,
  }))
}
/**
 * Serialize each update object to the JSON string form stored in Redis.
 *
 * Uses an explicit arrow instead of `updates.map(JSON.stringify)`:
 * Array#map passes (element, index, array), which JSON.stringify would
 * receive as (value, replacer, space). Those extra arguments happen to be
 * ignored (a number is not a valid replacer and an array is not a valid
 * space), but passing the callback unwrapped is a classic footgun.
 */
function makeRawUpdates(updates) {
  return updates.map(update => JSON.stringify(update))
}