send tpds updates for all dropbox users

GitOrigin-RevId: b690693b98b0b6288a4c3a734d4ff2d28901a496
This commit is contained in:
Ersun Warncke 2020-06-02 10:02:06 -04:00 committed by Copybot
parent 9c9993684c
commit 387695c2b1
4 changed files with 261 additions and 126 deletions

View file

@ -8,6 +8,7 @@ const settings = require('settings-sharelatex')
const CollaboratorsGetter = require('../Collaborators/CollaboratorsGetter')
.promises
const UserGetter = require('../User/UserGetter.js').promises
const tpdsUrl = _.get(settings, ['apis', 'thirdPartyDataStore', 'url'])
@ -27,23 +28,22 @@ async function addDoc(options) {
async function addEntity(options) {
const projectUserIds = await getProjectUsersIds(options.project_id)
if (!projectUserIds.length) {
return
}
const job = {
method: 'post',
headers: {
sl_entity_rev: options.rev,
sl_project_id: options.project_id,
sl_all_user_ids: JSON.stringify(projectUserIds)
},
uri: buildTpdsUrl(projectUserIds[0], options.project_name, options.path),
title: 'addFile',
streamOrigin: options.streamOrigin
}
for (const userId of projectUserIds) {
const job = {
method: 'post',
headers: {
sl_entity_rev: options.rev,
sl_project_id: options.project_id,
sl_all_user_ids: JSON.stringify([userId])
},
uri: buildTpdsUrl(userId, options.project_name, options.path),
title: 'addFile',
streamOrigin: options.streamOrigin
}
return enqueue(options.project_id, 'pipeStreamFrom', job)
await enqueue(userId, 'pipeStreamFrom', job)
}
}
async function addFile(options) {
@ -79,22 +79,21 @@ async function deleteEntity(options) {
metrics.inc('tpds.delete-entity')
const projectUserIds = await getProjectUsersIds(options.project_id)
if (!projectUserIds.length) {
return
}
const job = {
method: 'delete',
headers: {
sl_project_id: options.project_id,
sl_all_user_ids: JSON.stringify(projectUserIds)
},
uri: buildTpdsUrl(projectUserIds[0], options.project_name, options.path),
title: 'deleteEntity',
sl_all_user_ids: JSON.stringify(projectUserIds)
}
for (const userId of projectUserIds) {
const job = {
method: 'delete',
headers: {
sl_project_id: options.project_id,
sl_all_user_ids: JSON.stringify([userId])
},
uri: buildTpdsUrl(userId, options.project_name, options.path),
title: 'deleteEntity',
sl_all_user_ids: JSON.stringify([userId])
}
return enqueue(options.project_id, 'standardHttpRequest', job)
await enqueue(userId, 'standardHttpRequest', job)
}
}
async function enqueue(group, method, job) {
@ -104,12 +103,13 @@ async function enqueue(group, method, job) {
return
}
try {
await request({
const response = request({
uri: `${tpdsWorkerUrl}/enqueue/web_to_tpds_http_requests`,
json: { group, job, method },
method: 'post',
timeout: 5 * 1000
})
return response
} catch (err) {
// log error and continue
logger.error({ err, group, job, method }, 'error enqueueing tpdsworker job')
@ -119,36 +119,46 @@ async function enqueue(group, method, job) {
async function getProjectUsersIds(projectId) {
// get list of all user ids with access to project. project owner
// will always be the first entry in the list.
// TODO: filter this list to only return users with dropbox linked
return CollaboratorsGetter.getInvitedMemberIds(projectId)
const projectUserIds = await CollaboratorsGetter.getInvitedMemberIds(
projectId
)
// filter list to only return users with dropbox linked
const users = await UserGetter.getUsers(
{
_id: { $in: projectUserIds }
},
{
_id: 1
}
)
return users.map(user => user._id)
}
async function moveEntity(options) {
metrics.inc('tpds.move-entity')
const projectUserIds = await getProjectUsersIds(options.project_id)
if (!projectUserIds.length) {
return
}
const { endPath, startPath } = buildMovePaths(options)
const job = {
method: 'put',
title: 'moveEntity',
uri: `${tpdsUrl}/user/${projectUserIds[0]}/entity`,
headers: {
sl_project_id: options.project_id,
sl_entity_rev: options.rev,
sl_all_user_ids: JSON.stringify(projectUserIds)
},
json: {
user_id: projectUserIds[0],
endPath,
startPath
for (const userId of projectUserIds) {
const job = {
method: 'put',
title: 'moveEntity',
uri: `${tpdsUrl}/user/${userId}/entity`,
headers: {
sl_project_id: options.project_id,
sl_entity_rev: options.rev,
sl_all_user_ids: JSON.stringify([userId])
},
json: {
user_id: userId,
endPath,
startPath
}
}
}
return enqueue(options.project_id, 'standardHttpRequest', job)
await enqueue(userId, 'standardHttpRequest', job)
}
}
async function pollDropboxForUser(userId) {

View file

@ -11,24 +11,14 @@ const Features = require('../../infrastructure/Features')
const UserGetter = {
getUser(query, projection, callback) {
if (!query) {
return callback(new Error('no query provided'))
}
if (arguments.length === 2) {
callback = projection
projection = {}
}
if (typeof query === 'string') {
try {
query = { _id: ObjectId(query) }
} catch (e) {
return callback(null, null)
}
} else if (query instanceof ObjectId) {
query = { _id: query }
}
db.users.findOne(query, projection, callback)
normalizeQuery(query, (err, query) => {
if (err) return callback(err)
db.users.findOne(query, projection, callback)
})
},
getUserEmail(userId, callback) {
@ -138,14 +128,14 @@ const UserGetter = {
db.users.find(query, projection, callback)
},
getUsers(userIds, projection, callback) {
try {
userIds = userIds.map(u => ObjectId(u.toString()))
} catch (error) {
return callback(error)
getUsers(query, projection, callback) {
if (!query) {
return callback(new Error('no query provided'))
}
db.users.find({ _id: { $in: userIds } }, projection, callback)
normalizeQuery(query, (err, query) => {
if (err) return callback(err)
db.users.find(query, projection, callback)
})
},
// check for duplicate email address. This is also enforced at the DB level
@ -159,6 +149,26 @@ const UserGetter = {
}
}
function normalizeQuery(query, callback) {
if (!query) {
return callback(new Error('no query provided'))
}
try {
if (typeof query === 'string') {
callback(null, { _id: ObjectId(query) })
} else if (query instanceof ObjectId) {
callback(null, { _id: query })
} else if (Array.isArray(query)) {
const userIds = query.map(u => ObjectId(u.toString()))
callback(null, { _id: { $in: userIds } })
} else {
callback(null, query)
}
} catch (err) {
callback(err, null)
}
}
var decorateFullEmails = (
defaultEmail,
emailsData,

View file

@ -25,6 +25,9 @@ const filestoreUrl = 'filestore.sharelatex.com'
describe('TpdsUpdateSender', function() {
beforeEach(function() {
this.fakeUser = {
_id: '12390i'
}
this.requestQueuer = function(queue, meth, opts, callback) {}
const memberIds = [userId, collaberatorRef, readOnlyRef]
this.CollaboratorsGetter = {
@ -47,6 +50,14 @@ describe('TpdsUpdateSender', function() {
}
}
}
const getUsers = sinon.stub().resolves(
memberIds.map(userId => {
return { _id: userId }
})
)
this.UserGetter = {
promises: { getUsers }
}
this.updateSender = SandboxedModule.require(modulePath, {
globals: {
console: console
@ -56,6 +67,7 @@ describe('TpdsUpdateSender', function() {
'logger-sharelatex': { log() {} },
'request-promise-native': this.request,
'../Collaborators/CollaboratorsGetter': this.CollaboratorsGetter,
'../User/UserGetter.js': this.UserGetter,
'metrics-sharelatex': {
inc() {}
}
@ -71,14 +83,14 @@ describe('TpdsUpdateSender', function() {
it('should post the message to the tpdsworker', async function() {
this.settings.apis.tpdsworker = { url: 'www.tpdsworker.env' }
const group = 'myproject'
const method = 'somemethod'
const job = 'do something'
await this.updateSender.promises.enqueue(group, method, job)
const group0 = 'myproject'
const method0 = 'somemethod0'
const job0 = 'do something'
await this.updateSender.promises.enqueue(group0, method0, job0)
const args = this.request.firstCall.args[0]
args.json.group.should.equal(group)
args.json.job.should.equal(job)
args.json.method.should.equal(method)
args.json.group.should.equal(group0)
args.json.job.should.equal(job0)
args.json.method.should.equal(method0)
args.uri.should.equal(
'www.tpdsworker.env/enqueue/web_to_tpds_http_requests'
)
@ -101,19 +113,33 @@ describe('TpdsUpdateSender', function() {
project_name: projectName
})
const { group, job, method } = this.request.firstCall.args[0].json
group.should.equal(projectId)
method.should.equal('pipeStreamFrom')
job.method.should.equal('post')
job.streamOrigin.should.equal(
const {
group: group0,
job: job0,
method: method0
} = this.request.firstCall.args[0].json
group0.should.equal(userId)
method0.should.equal('pipeStreamFrom')
job0.method.should.equal('post')
job0.streamOrigin.should.equal(
`${filestoreUrl}/project/${projectId}/file/${fileId}`
)
const expectedUrl = `${thirdPartyDataStoreApiUrl}/user/${userId}/entity/${encodeURIComponent(
projectName
)}${encodeURIComponent(path)}`
job.uri.should.equal(expectedUrl)
job.headers.sl_all_user_ids.should.equal(
JSON.stringify([userId, collaberatorRef, readOnlyRef])
job0.uri.should.equal(expectedUrl)
job0.headers.sl_all_user_ids.should.equal(JSON.stringify([userId]))
const { group: group1, job: job1 } = this.request.secondCall.args[0].json
group1.should.equal('collaberator_ref_1_here')
job1.headers.sl_all_user_ids.should.equal(
JSON.stringify(['collaberator_ref_1_here'])
)
const { group: group2, job: job2 } = this.request.thirdCall.args[0].json
group2.should.equal('read_only_ref_1_id_here')
job2.headers.sl_all_user_ids.should.equal(
JSON.stringify(['read_only_ref_1_id_here'])
)
})
@ -130,20 +156,34 @@ describe('TpdsUpdateSender', function() {
project_name: projectName
})
const { group, job, method } = this.request.firstCall.args[0].json
const {
group: group0,
job: job0,
method: method0
} = this.request.firstCall.args[0].json
group.should.equal(projectId)
method.should.equal('pipeStreamFrom')
job.method.should.equal('post')
group0.should.equal(userId)
method0.should.equal('pipeStreamFrom')
job0.method.should.equal('post')
const expectedUrl = `${thirdPartyDataStoreApiUrl}/user/${userId}/entity/${encodeURIComponent(
projectName
)}${encodeURIComponent(path)}`
job.uri.should.equal(expectedUrl)
job.streamOrigin.should.equal(
job0.uri.should.equal(expectedUrl)
job0.streamOrigin.should.equal(
`${this.docstoreUrl}/project/${projectId}/doc/${docId}/raw`
)
job.headers.sl_all_user_ids.should.eql(
JSON.stringify([userId, collaberatorRef, readOnlyRef])
job0.headers.sl_all_user_ids.should.eql(JSON.stringify([userId]))
const { group: group1, job: job1 } = this.request.secondCall.args[0].json
group1.should.equal('collaberator_ref_1_here')
job1.headers.sl_all_user_ids.should.equal(
JSON.stringify(['collaberator_ref_1_here'])
)
const { group: group2, job: job2 } = this.request.thirdCall.args[0].json
group2.should.equal('read_only_ref_1_id_here')
job2.headers.sl_all_user_ids.should.equal(
JSON.stringify(['read_only_ref_1_id_here'])
)
})
@ -156,18 +196,32 @@ describe('TpdsUpdateSender', function() {
project_name: projectName
})
const { group, job, method } = this.request.firstCall.args[0].json
const {
group: group0,
job: job0,
method: method0
} = this.request.firstCall.args[0].json
group.should.equal(projectId)
method.should.equal('standardHttpRequest')
job.method.should.equal('delete')
group0.should.equal(userId)
method0.should.equal('standardHttpRequest')
job0.method.should.equal('delete')
const expectedUrl = `${thirdPartyDataStoreApiUrl}/user/${userId}/entity/${encodeURIComponent(
projectName
)}${encodeURIComponent(path)}`
job.headers.sl_all_user_ids.should.eql(
JSON.stringify([userId, collaberatorRef, readOnlyRef])
job0.headers.sl_all_user_ids.should.eql(JSON.stringify([userId]))
job0.uri.should.equal(expectedUrl)
const { group: group1, job: job1 } = this.request.secondCall.args[0].json
group1.should.equal('collaberator_ref_1_here')
job1.headers.sl_all_user_ids.should.equal(
JSON.stringify(['collaberator_ref_1_here'])
)
const { group: group2, job: job2 } = this.request.thirdCall.args[0].json
group2.should.equal('read_only_ref_1_id_here')
job2.headers.sl_all_user_ids.should.equal(
JSON.stringify(['read_only_ref_1_id_here'])
)
job.uri.should.equal(expectedUrl)
})
it('moving entity', async function() {
@ -181,16 +235,32 @@ describe('TpdsUpdateSender', function() {
project_name: projectName
})
const { group, job, method } = this.request.firstCall.args[0].json
const {
group: group0,
job: job0,
method: method0
} = this.request.firstCall.args[0].json
group.should.equal(projectId)
method.should.equal('standardHttpRequest')
job.method.should.equal('put')
job.uri.should.equal(`${thirdPartyDataStoreApiUrl}/user/${userId}/entity`)
job.json.startPath.should.equal(`/${projectName}/${startPath}`)
job.json.endPath.should.equal(`/${projectName}/${endPath}`)
job.headers.sl_all_user_ids.should.eql(
JSON.stringify([userId, collaberatorRef, readOnlyRef])
group0.should.equal(userId)
method0.should.equal('standardHttpRequest')
job0.method.should.equal('put')
job0.uri.should.equal(
`${thirdPartyDataStoreApiUrl}/user/${userId}/entity`
)
job0.json.startPath.should.equal(`/${projectName}/${startPath}`)
job0.json.endPath.should.equal(`/${projectName}/${endPath}`)
job0.headers.sl_all_user_ids.should.eql(JSON.stringify([userId]))
const { group: group1, job: job1 } = this.request.secondCall.args[0].json
group1.should.equal('collaberator_ref_1_here')
job1.headers.sl_all_user_ids.should.equal(
JSON.stringify(['collaberator_ref_1_here'])
)
const { group: group2, job: job2 } = this.request.thirdCall.args[0].json
group2.should.equal('read_only_ref_1_id_here')
job2.headers.sl_all_user_ids.should.equal(
JSON.stringify(['read_only_ref_1_id_here'])
)
})
@ -204,30 +274,50 @@ describe('TpdsUpdateSender', function() {
newProjectName
})
const { group, job, method } = this.request.firstCall.args[0].json
const {
group: group0,
job: job0,
method: method0
} = this.request.firstCall.args[0].json
group.should.equal(projectId)
method.should.equal('standardHttpRequest')
job.method.should.equal('put')
job.uri.should.equal(`${thirdPartyDataStoreApiUrl}/user/${userId}/entity`)
job.json.startPath.should.equal(oldProjectName)
job.json.endPath.should.equal(newProjectName)
job.headers.sl_all_user_ids.should.eql(
JSON.stringify([userId, collaberatorRef, readOnlyRef])
group0.should.equal(userId)
method0.should.equal('standardHttpRequest')
job0.method.should.equal('put')
job0.uri.should.equal(
`${thirdPartyDataStoreApiUrl}/user/${userId}/entity`
)
job0.json.startPath.should.equal(oldProjectName)
job0.json.endPath.should.equal(newProjectName)
job0.headers.sl_all_user_ids.should.eql(JSON.stringify([userId]))
const { group: group1, job: job1 } = this.request.secondCall.args[0].json
group1.should.equal('collaberator_ref_1_here')
job1.headers.sl_all_user_ids.should.equal(
JSON.stringify(['collaberator_ref_1_here'])
)
const { group: group2, job: job2 } = this.request.thirdCall.args[0].json
group2.should.equal('read_only_ref_1_id_here')
job2.headers.sl_all_user_ids.should.equal(
JSON.stringify(['read_only_ref_1_id_here'])
)
})
it('pollDropboxForUser', async function() {
await this.updateSender.promises.pollDropboxForUser(userId)
const { group, job, method } = this.request.firstCall.args[0].json
const {
group: group0,
job: job0,
method: method0
} = this.request.firstCall.args[0].json
group.should.equal(`poll-dropbox:${userId}`)
method.should.equal('standardHttpRequest')
group0.should.equal(`poll-dropbox:${userId}`)
method0.should.equal('standardHttpRequest')
job.method.should.equal('post')
job.uri.should.equal(`${thirdPartyDataStoreApiUrl}/user/poll`)
job.json.user_ids[0].should.equal(userId)
job0.method.should.equal('post')
job0.uri.should.equal(`${thirdPartyDataStoreApiUrl}/user/poll`)
job0.json.user_ids[0].should.equal(userId)
})
})
})

View file

@ -77,8 +77,33 @@ describe('UserGetter', function() {
})
it('should not allow null query', function(done) {
this.UserGetter.getUser(null, {}, (error, user) => {
this.UserGetter.getUser(null, {}, error => {
error.should.exist
error.message.should.equal('no query provided')
done()
})
})
})
describe('getUsers', function() {
it('should get users with array of userIds', function(done) {
const query = [new ObjectId()]
const projection = { email: 1 }
this.UserGetter.getUsers(query, projection, (error, users) => {
expect(error).to.not.exist
this.find.should.have.been.calledWithMatch(
{ _id: { $in: query } },
projection
)
users.should.deep.equal([this.fakeUser])
done()
})
})
it('should not allow null query', function(done) {
this.UserGetter.getUser(null, {}, error => {
error.should.exist
error.message.should.equal('no query provided')
done()
})
})