2016-06-17 11:17:22 +00:00
|
|
|
Settings = require('settings-sharelatex')
|
2019-07-10 08:42:05 +00:00
|
|
|
rclient = require("redis-sharelatex").createClient(Settings.redis.documentupdater)
|
|
|
|
pubsubClient = require("redis-sharelatex").createClient(Settings.redis.pubsub)
|
2019-07-03 09:21:25 +00:00
|
|
|
Keys = Settings.redis.documentupdater.key_schema
|
2017-03-30 10:20:41 +00:00
|
|
|
logger = require('logger-sharelatex')
|
2019-03-21 12:10:15 +00:00
|
|
|
os = require "os"
|
|
|
|
crypto = require "crypto"
|
2020-03-25 12:15:35 +00:00
|
|
|
metrics = require('./Metrics')
|
2019-03-21 12:10:15 +00:00
|
|
|
|
|
|
|
HOST = os.hostname()
|
|
|
|
RND = crypto.randomBytes(4).toString('hex') # generate a random key for this process
|
|
|
|
COUNT = 0
|
2016-06-17 11:17:22 +00:00
|
|
|
|
2017-05-12 12:11:04 +00:00
|
|
|
MAX_OPS_PER_ITERATION = 8 # process a limited number of ops for safety
|
|
|
|
|
2017-05-02 14:38:33 +00:00
|
|
|
module.exports = RealTimeRedisManager =
|
2016-06-17 11:17:22 +00:00
|
|
|
getPendingUpdatesForDoc : (doc_id, callback)->
|
|
|
|
multi = rclient.multi()
|
2017-05-12 12:11:04 +00:00
|
|
|
multi.lrange Keys.pendingUpdates({doc_id}), 0, (MAX_OPS_PER_ITERATION-1)
|
|
|
|
multi.ltrim Keys.pendingUpdates({doc_id}), MAX_OPS_PER_ITERATION, -1
|
2016-06-17 11:17:22 +00:00
|
|
|
multi.exec (error, replys) ->
|
|
|
|
return callback(error) if error?
|
|
|
|
jsonUpdates = replys[0]
|
|
|
|
updates = []
|
|
|
|
for jsonUpdate in jsonUpdates
|
|
|
|
try
|
|
|
|
update = JSON.parse jsonUpdate
|
|
|
|
catch e
|
|
|
|
return callback e
|
|
|
|
updates.push update
|
2020-03-25 12:15:35 +00:00
|
|
|
# record metric for updates removed from queue
|
|
|
|
metrics.summary "redis.pendingUpdates", jsonUpdate.length, {status: "pop"}
|
2016-06-17 11:17:22 +00:00
|
|
|
callback error, updates
|
|
|
|
|
|
|
|
getUpdatesLength: (doc_id, callback)->
|
2017-04-13 16:00:42 +00:00
|
|
|
rclient.llen Keys.pendingUpdates({doc_id}), callback
|
2016-06-17 11:17:22 +00:00
|
|
|
|
2016-11-28 10:14:42 +00:00
|
|
|
sendData: (data) ->
|
2019-03-21 12:10:15 +00:00
|
|
|
# create a unique message id using a counter
|
|
|
|
message_id = "doc:#{HOST}:#{RND}-#{COUNT++}"
|
|
|
|
data?._id = message_id
|
2020-03-30 09:31:43 +00:00
|
|
|
|
|
|
|
blob = JSON.stringify(data)
|
|
|
|
metrics.summary "redis.publish.applied-ops", blob.length
|
|
|
|
|
2019-07-22 11:20:06 +00:00
|
|
|
# publish on separate channels for individual projects and docs when
|
|
|
|
# configured (needs realtime to be configured for this too).
|
2019-07-24 15:57:43 +00:00
|
|
|
if Settings.publishOnIndividualChannels
|
2020-03-30 09:31:43 +00:00
|
|
|
pubsubClient.publish "applied-ops:#{data.doc_id}", blob
|
2019-07-22 11:20:06 +00:00
|
|
|
else
|
2020-03-30 09:31:43 +00:00
|
|
|
pubsubClient.publish "applied-ops", blob
|