Get basic ChangeTracker hooked up. WIP

This commit is contained in:
James Allen 2016-11-28 10:14:42 +00:00
parent 5ce15c4d60
commit e739e86c48
17 changed files with 751 additions and 112 deletions

View file

@@ -46,6 +46,7 @@ app.post '/project/:project_id/doc/:doc_id/flush', HttpController.flushDocIfLo
app.delete '/project/:project_id/doc/:doc_id', HttpController.flushAndDeleteDoc
app.delete '/project/:project_id', HttpController.deleteProject
app.post '/project/:project_id/flush', HttpController.flushProject
app.post '/project/:project_id/track_changes', HttpController.setTrackChanges
app.get '/total', (req, res)->
timer = new Metrics.Timer("http.allDocList")

View file

@@ -0,0 +1,457 @@
load = (EventEmitter) ->
class ChangesTracker extends EventEmitter
# The purpose of this class is to track a set of inserts and deletes to a document, like
# track changes in Word. We store these as a set of ShareJs style ranges:
# {i: "foo", p: 42} # Insert 'foo' at offset 42
# {d: "bar", p: 37} # Delete 'bar' at offset 37
# We only track the inserts and deletes, not the whole document, but by being given all
# updates that are applied to a document, we can update these appropriately.
#
# Note that the set of inserts and deletes we store applies to the document as-is at the moment.
# So inserts correspond to text which is in the document, while deletes correspond to text which
# is no longer there, so their lengths do not affect the position of later offsets.
# E.g.
#     this is the current text of the document
#                 |------|           |
#                 |                  ^- {d: "old ", p: 31}
#                 ^- {i: "current ", p: 12}
#
# Track changes rules (should be consistent with Word):
# * When text is inserted at a delete, the text goes to the left of the delete
# I.e. "foo|bar" -> "foobaz|bar", where | is the delete, and 'baz' is inserted
# * Deleting content flagged as 'inserted' does not create a new delete marker, it only
# removes the insert marker. E.g.
# * "abdefghijkl" -> "abfghijkl" when 'de' is deleted. No delete marker added
# |---| <- inserted |-| <- inserted
# * Deletes overlapping regular text and inserted text will insert a delete marker for the
# regular text:
# "abcdefghijkl" -> "abcdejkl" when 'fghi' is deleted
# |----| |--||
# ^- inserted 'bcdefg' \ ^- deleted 'hi'
# \--inserted 'bcde'
# * Deletes overlapping other deletes are merged. E.g.
# "abcghijkl" -> "ahijkl" when 'bcg is deleted'
# | <- delete 'def' | <- delete 'bcdefg'
# * Deletes by another user will consume deletes by the first user
# * Inserts by another user will not combine with inserts by the first user. If they are in the
# middle of a previous insert by the first user, the original insert will be split into two.
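#
# A minimal usage sketch (illustrative only, not part of this commit's tests;
# the document text itself is never stored here, only the ranges). Assuming a
# document that currently reads "the fox jumps":
#   tracker = new ChangesTracker()
#   tracker.track_changes = true # set externally, e.g. by TrackChangesManager below
#   tracker.applyOp { i: "quick ", p: 4 }, { user_id: "user_1" }
#   # tracker.changes is now [{ id: "0", op: { i: "quick ", p: 4 }, metadata: { user_id: "user_1", ts: ... } }]
#   tracker.applyOp { d: "jumps", p: 14 }, { user_id: "user_1" }
#   # the delete is tracked at p: 14 even though "jumps" is no longer in the document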
constructor: (@changes = [], @comments = []) ->
# Change objects have the following structure:
# {
# id: ... # Uniquely generated by us
# op: { # ShareJs style op tracking the offset (p) and content inserted (i) or deleted (d)
# i: "..."
# p: 42
# }
# }
#
# Ids are used to uniquely identify a change, e.g. for updating it in the database, or keeping in
# sync with Ace ranges.
@id = 0
addComment: (offset, length, metadata) ->
# TODO: Don't allow overlapping comments?
@comments.push comment = {
id: @_newId()
offset, length, metadata
}
@emit "comment:added", comment
return comment
getComment: (comment_id) ->
comment = null
for c in @comments
if c.id == comment_id
comment = c
break
return comment
resolveCommentId: (comment_id, resolved_data) ->
comment = @getComment(comment_id)
return if !comment?
comment.metadata.resolved = true
comment.metadata.resolved_data = resolved_data
@emit "comment:resolved", comment
unresolveCommentId: (comment_id) ->
comment = @getComment(comment_id)
return if !comment?
comment.metadata.resolved = false
@emit "comment:unresolved", comment
removeCommentId: (comment_id) ->
comment = @getComment(comment_id)
return if !comment?
@comments = @comments.filter (c) -> c.id != comment_id
@emit "comment:removed", comment
getChange: (change_id) ->
change = null
for c in @changes
if c.id == change_id
change = c
break
return change
removeChangeId: (change_id) ->
change = @getChange(change_id)
return if !change?
@_removeChange(change)
applyOp: (op, metadata = {}) ->
metadata.ts ?= new Date()
# Apply an op that has been applied to the document to our changes to keep them up to date
if op.i?
@applyInsertToChanges(op, metadata)
@applyInsertToComments(op)
else if op.d?
@applyDeleteToChanges(op, metadata)
@applyDeleteToComments(op)
applyInsertToComments: (op) ->
for comment in @comments
if op.p <= comment.offset
comment.offset += op.i.length
@emit "comment:moved", comment
else if op.p < comment.offset + comment.length
comment.length += op.i.length
@emit "comment:moved", comment
applyDeleteToComments: (op) ->
op_start = op.p
op_length = op.d.length
op_end = op.p + op_length
for comment in @comments
comment_end = comment.offset + comment.length
if op_end <= comment.offset
# delete is fully before comment
comment.offset -= op_length
@emit "comment:moved", comment
else if op_start >= comment_end
# delete is fully after comment, nothing to do
else
# delete and comment overlap
delete_length_before = Math.max(0, comment.offset - op_start)
delete_length_after = Math.max(0, op_end - comment_end)
delete_length_overlapping = op_length - delete_length_before - delete_length_after
comment.offset = Math.min(comment.offset, op_start)
comment.length -= delete_length_overlapping
@emit "comment:moved", comment
applyInsertToChanges: (op, metadata) ->
op_start = op.p
op_length = op.i.length
op_end = op.p + op_length
already_merged = false
previous_change = null
moved_changes = []
remove_changes = []
new_changes = []
for change in @changes
change_start = change.op.p
if change.op.d?
# Shift any deletes after this along by the length of this insert
if op_start < change_start
change.op.p += op_length
moved_changes.push change
else if op_start == change_start
# If the insert matches the start of the delete, just remove it from the delete instead
if change.op.d.length >= op.i.length and change.op.d.slice(0, op.i.length) == op.i
change.op.d = change.op.d.slice(op.i.length)
change.op.p += op.i.length
if change.op.d == ""
remove_changes.push change
else
moved_changes.push change
already_merged = true
else
change.op.p += op_length
moved_changes.push change
else if change.op.i?
change_end = change_start + change.op.i.length
is_change_overlapping = (op_start >= change_start and op_start <= change_end)
# Only merge inserts if they are from the same user
is_same_user = metadata.user_id == change.metadata.user_id
# If there is a delete at the start of the insert, and we're inserting
# at the start, we SHOULDN'T merge since the delete acts as a partition.
# The previous op will be the delete, but it's already been shifted by this insert
#
# I.e.
# Originally: |-- existing insert --|
#             | <- existing delete at same offset
#
# Now:                     |-- existing insert --| <- not shifted yet
#      |-- this insert --|| <- existing delete shifted along to end of this op
#
# After:                   |-- existing insert --|
#      |-- this insert --|| <- existing delete
#
# Without the delete, the inserts would be merged.
is_insert_blocked_by_delete = (previous_change? and previous_change.op.d? and previous_change.op.p == op_end)
# If the insert is overlapping another insert, either at the beginning, in the middle, or touching the end,
# then we merge them into one.
if @track_changes and
is_change_overlapping and
!is_insert_blocked_by_delete and
!already_merged and
is_same_user
offset = op_start - change_start
change.op.i = change.op.i.slice(0, offset) + op.i + change.op.i.slice(offset)
change.metadata.ts = metadata.ts
already_merged = true
moved_changes.push change
else if op_start <= change_start
# If we're fully before the other insert we can just shift the other insert by our length.
# If they are touching and should have been merged, they will have been merged above.
# If not merged above, it must be blocked by a delete and will come after this insert, so we shift it along as well.
change.op.p += op_length
moved_changes.push change
else if (!is_same_user or !@track_changes) and change_start < op_start < change_end
# This user is inserting inside a change by another user, so we need to split the
# other user's change into one before and after this one.
offset = op_start - change_start
before_content = change.op.i.slice(0, offset)
after_content = change.op.i.slice(offset)
# The existing change can become the 'before' change
change.op.i = before_content
moved_changes.push change
# Create a new op afterwards
after_change = {
op: {
i: after_content
p: change_start + offset + op_length
}
metadata: {}
}
after_change.metadata[key] = value for key, value of change.metadata
new_changes.push after_change
previous_change = change
if @track_changes and !already_merged
@_addOp op, metadata
for {op, metadata} in new_changes
@_addOp op, metadata
for change in remove_changes
@_removeChange change
if moved_changes.length > 0
@emit "changes:moved", moved_changes
applyDeleteToChanges: (op, metadata) ->
op_start = op.p
op_length = op.d.length
op_end = op.p + op_length
remove_changes = []
moved_changes = []
# We might end up modifying our delete op if it merges with existing deletes, or cancels out
# with an existing insert. Since we might do multiple modifications, we record them and do
# all the modifications after looping through the existing changes, so as not to mess up the
# offset indexes as we go.
op_modifications = []
for change in @changes
if change.op.i?
change_start = change.op.p
change_end = change_start + change.op.i.length
if op_end <= change_start
# Shift ops after us back by our length
change.op.p -= op_length
moved_changes.push change
else if op_start >= change_end
# Delete is after insert, nothing to do
else
# When the new delete overlaps an insert, we should remove the part of the insert that
# is now deleted, and also remove the part of the new delete that overlapped. I.e.
# the two cancel out where they overlap.
if op_start >= change_start
#                             |-- existing insert --|
# insert_remaining_before -> |.....||-- new delete --|
delete_remaining_before = ""
insert_remaining_before = change.op.i.slice(0, op_start - change_start)
else
# delete_remaining_before -> |.....||-- existing insert --|
#                            |-- new delete --|
delete_remaining_before = op.d.slice(0, change_start - op_start)
insert_remaining_before = ""
if op_end <= change_end
#     |-- existing insert --|
#   |-- new delete --||.....| <- insert_remaining_after
delete_remaining_after = ""
insert_remaining_after = change.op.i.slice(op_end - change_start)
else
#   |-- existing insert --||.....| <- delete_remaining_after
#               |-- new delete --|
delete_remaining_after = op.d.slice(change_end - op_start)
insert_remaining_after = ""
insert_remaining = insert_remaining_before + insert_remaining_after
if insert_remaining.length > 0
change.op.i = insert_remaining
change.op.p = Math.min(change_start, op_start)
change.metadata.ts = metadata.ts
moved_changes.push change
else
remove_changes.push change
# We know what we want to preserve of our delete op before (delete_remaining_before) and what we want to preserve
# afterwards (delete_remaining_after). Now we need to turn that into a modification which deletes the
# chunk in the middle not covered by these.
delete_removed_length = op.d.length - delete_remaining_before.length - delete_remaining_after.length
delete_removed_start = delete_remaining_before.length
modification = {
d: op.d.slice(delete_removed_start, delete_removed_start + delete_removed_length)
p: delete_removed_start
}
if modification.d.length > 0
op_modifications.push modification
else if change.op.d?
change_start = change.op.p
if op_end < change_start or (!@track_changes and op_end == change_start)
# Shift ops after us back by our length.
# If we're tracking changes, it must be strictly before, since we'll merge
# below if they are touching. Otherwise, touching is fine.
change.op.p -= op_length
moved_changes.push change
else if op_start <= change_start <= op_end
if @track_changes
# If we overlap a delete, add it in our content, and delete the existing change.
# It's easier to do it this way, rather than modifying the existing delete in case
# we overlap many deletes and we'd need to track that. We have a workaround to
# update the delete in place if possible below.
offset = change_start - op_start
op_modifications.push { i: change.op.d, p: offset }
remove_changes.push change
else
change.op.p = op_start
moved_changes.push change
# Copy rather than modify because we still need to apply it to comments
op = {
p: op.p
d: @_applyOpModifications(op.d, op_modifications)
}
for change in remove_changes
# This is a bit of a hack to avoid removing one delete and replacing it with another.
# If we don't do this, it causes the UI to flicker
if op.d.length > 0 and change.op.d? and op.p <= change.op.p <= op.p + op.d.length
change.op.p = op.p
change.op.d = op.d
change.metadata = metadata
moved_changes.push change
op.d = "" # stop it being added
else
@_removeChange change
if @track_changes and op.d.length > 0
@_addOp op, metadata
else
# It's possible that we deleted an insert between two other inserts. I.e.
# If we delete 'user_2 insert' in:
# |-- user_1 insert --||-- user_2 insert --||-- user_1 insert --|
# it becomes:
# |-- user_1 insert --||-- user_1 insert --|
# We need to merge these together again
results = @_scanAndMergeAdjacentUpdates()
moved_changes = moved_changes.concat(results.moved_changes)
for change in results.remove_changes
@_removeChange change
moved_changes = moved_changes.filter (c) -> c != change
if moved_changes.length > 0
@emit "changes:moved", moved_changes
_newId: () ->
(@id++).toString()
_addOp: (op, metadata) ->
change = {
id: @_newId()
op: op
metadata: metadata
}
@changes.push change
# Keep ops in order of offset, with deletes before inserts
@changes.sort (c1, c2) ->
result = c1.op.p - c2.op.p
if result != 0
return result
else if c1.op.i? and c2.op.d?
return 1
else
return -1
if op.d?
@emit "delete:added", change
else if op.i?
@emit "insert:added", change
_removeChange: (change) ->
@changes = @changes.filter (c) -> c.id != change.id
if change.op.d?
@emit "delete:removed", change
else if change.op.i?
@emit "insert:removed", change
_applyOpModifications: (content, op_modifications) ->
# Put in descending position order, with deleting first if at the same offset
# (Inserting first would modify the content that the delete will delete)
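# E.g. (illustrative): content "abcdef" with modifications
# [{ i: "X", p: 2 }, { d: "cd", p: 2 }] applies the delete first ("abef"),
# then the insert, giving "abXef".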
op_modifications.sort (a, b) ->
result = b.p - a.p
if result != 0
return result
else if a.i? and b.d?
return 1
else
return -1
for modification in op_modifications
if modification.i?
content = content.slice(0, modification.p) + modification.i + content.slice(modification.p)
else if modification.d?
if content.slice(modification.p, modification.p + modification.d.length) != modification.d
throw new Error("deleted content does not match. content: #{JSON.stringify(content)}; modification: #{JSON.stringify(modification)}")
content = content.slice(0, modification.p) + content.slice(modification.p + modification.d.length)
return content
_scanAndMergeAdjacentUpdates: () ->
# This should only need calling when deleting an update between two
# other updates. There's no other way to get two adjacent updates from the
# same user, since they would be merged on insert.
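# E.g. (illustrative): [{ op: { i: "foo", p: 0 } }, { op: { i: "bar", p: 3 } }]
# from the same user merge into a single { op: { i: "foobar", p: 0 } }.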
previous_change = null
remove_changes = []
moved_changes = []
for change in @changes
if previous_change?.op.i? and change.op.i?
previous_change_end = previous_change.op.p + previous_change.op.i.length
previous_change_user_id = previous_change.metadata.user_id
change_start = change.op.p
change_user_id = change.metadata.user_id
if previous_change_end == change_start and previous_change_user_id == change_user_id
remove_changes.push change
previous_change.op.i += change.op.i
moved_changes.push previous_change
else if previous_change?.op.d? and change.op.d? and previous_change.op.p == change.op.p
# Merge adjacent deletes
previous_change.op.d += change.op.d
remove_changes.push change
moved_changes.push previous_change
else # Only update previous_change to the current change if we haven't removed it.
previous_change = change
return { moved_changes, remove_changes }
if define?
define ["utils/EventEmitter"], load
else
EventEmitter = require("events").EventEmitter
module.exports = load(EventEmitter)

View file

@@ -3,7 +3,8 @@ PersistenceManager = require "./PersistenceManager"
DiffCodec = require "./DiffCodec"
logger = require "logger-sharelatex"
Metrics = require "./Metrics"
TrackChangesManager = require "./TrackChangesManager"
HistoryManager = require "./HistoryManager"
WebRedisManager = require "./WebRedisManager"
module.exports = DocumentManager =
getDoc: (project_id, doc_id, _callback = (error, lines, version, alreadyLoaded) ->) ->
@@ -12,18 +13,18 @@ module.exports = DocumentManager =
timer.done()
_callback(args...)
RedisManager.getDoc project_id, doc_id, (error, lines, version) ->
RedisManager.getDoc project_id, doc_id, (error, lines, version, track_changes, track_changes_entries) ->
return callback(error) if error?
if !lines? or !version?
logger.log project_id: project_id, doc_id: doc_id, "doc not in redis so getting from persistence API"
PersistenceManager.getDoc project_id, doc_id, (error, lines, version) ->
logger.log {project_id, doc_id, track_changes}, "doc not in redis so getting from persistence API"
PersistenceManager.getDoc project_id, doc_id, (error, lines, version, track_changes, track_changes_entries) ->
return callback(error) if error?
logger.log project_id: project_id, doc_id: doc_id, lines: lines, version: version, "got doc from persistence API"
RedisManager.putDocInMemory project_id, doc_id, lines, version, (error) ->
logger.log {project_id, doc_id, lines, version, track_changes}, "got doc from persistence API"
RedisManager.putDocInMemory project_id, doc_id, lines, version, track_changes, track_changes_entries, (error) ->
return callback(error) if error?
callback null, lines, version, false
callback null, lines, version, track_changes, track_changes_entries, false
else
callback null, lines, version, true
callback null, lines, version, track_changes, track_changes_entries, true
getDocAndRecentOps: (project_id, doc_id, fromVersion, _callback = (error, lines, version, recentOps) ->) ->
timer = new Metrics.Timer("docManager.getDocAndRecentOps")
@@ -50,7 +51,7 @@ module.exports = DocumentManager =
return callback(new Error("No lines were provided to setDoc"))
UpdateManager = require "./UpdateManager"
DocumentManager.getDoc project_id, doc_id, (error, oldLines, version, alreadyLoaded) ->
DocumentManager.getDoc project_id, doc_id, (error, oldLines, version, track_changes, alreadyLoaded) ->
return callback(error) if error?
if oldLines? and oldLines.length > 0 and oldLines[0].text?
@@ -89,14 +90,14 @@ module.exports = DocumentManager =
callback = (args...) ->
timer.done()
_callback(args...)
RedisManager.getDoc project_id, doc_id, (error, lines, version) ->
RedisManager.getDoc project_id, doc_id, (error, lines, version, track_changes, track_changes_entries) ->
return callback(error) if error?
if !lines? or !version?
logger.log project_id: project_id, doc_id: doc_id, "doc is not loaded so not flushing"
callback null # TODO: return a flag to bail out, as we go on to remove doc from memory?
else
logger.log project_id: project_id, doc_id: doc_id, version: version, "flushing doc"
PersistenceManager.setDoc project_id, doc_id, lines, version, (error) ->
PersistenceManager.setDoc project_id, doc_id, lines, version, track_changes, track_changes_entries, (error) ->
return callback(error) if error?
callback null
@@ -111,13 +112,19 @@ module.exports = DocumentManager =
# Flush in the background since it requires an HTTP request
# to track changes
TrackChangesManager.flushDocChanges project_id, doc_id, (err) ->
HistoryManager.flushDocChanges project_id, doc_id, (err) ->
if err?
logger.err {err, project_id, doc_id}, "error flushing to track changes"
RedisManager.removeDocFromMemory project_id, doc_id, (error) ->
return callback(error) if error?
callback null
setTrackChanges: (project_id, doc_id, track_changes_on, callback = (error) ->) ->
RedisManager.setTrackChanges project_id, doc_id, track_changes_on, (error) ->
return callback(error) if error?
WebRedisManager.sendData {project_id, doc_id, track_changes_on}
callback()
getDocWithLock: (project_id, doc_id, callback = (error, lines, version) ->) ->
UpdateManager = require "./UpdateManager"
@@ -138,3 +145,7 @@ module.exports = DocumentManager =
flushAndDeleteDocWithLock: (project_id, doc_id, callback = (error) ->) ->
UpdateManager = require "./UpdateManager"
UpdateManager.lockUpdatesAndDo DocumentManager.flushAndDeleteDoc, project_id, doc_id, callback
setTrackChangesWithLock: (project_id, doc_id, track_changes_on, callback = (error) ->) ->
UpdateManager = require "./UpdateManager"
UpdateManager.lockUpdatesAndDo DocumentManager.setTrackChanges, project_id, doc_id, track_changes_on, callback

View file

@@ -0,0 +1,44 @@
settings = require "settings-sharelatex"
request = require "request"
logger = require "logger-sharelatex"
async = require "async"
WebRedisManager = require "./WebRedisManager"
module.exports = HistoryManager =
flushDocChanges: (project_id, doc_id, callback = (error) ->) ->
if !settings.apis?.trackchanges?
logger.warn doc_id: doc_id, "track changes API is not configured, so not flushing"
return callback()
url = "#{settings.apis.trackchanges.url}/project/#{project_id}/doc/#{doc_id}/flush"
logger.log project_id: project_id, doc_id: doc_id, url: url, "flushing doc in track changes api"
request.post url, (error, res, body)->
if error?
return callback(error)
else if res.statusCode >= 200 and res.statusCode < 300
return callback(null)
else
error = new Error("track changes api returned a failure status code: #{res.statusCode}")
return callback(error)
FLUSH_EVERY_N_OPS: 50
pushUncompressedHistoryOps: (project_id, doc_id, ops = [], callback = (error) ->) ->
if ops.length == 0
return callback()
WebRedisManager.pushUncompressedHistoryOps project_id, doc_id, ops, (error, length) ->
return callback(error) if error?
# We want to flush every 50 ops, i.e. 50, 100, 150, etc
# Find out which 'block' (i.e. 0-49, 50-99) we were in before and after pushing these
# ops. If we've changed, then we've gone over a multiple of 50 and should flush.
# (Most of the time, we will only hit 50 and then flushing will put us back to 0)
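# E.g. (illustrative): length = 102 and ops.length = 3 give
# previousLength = 99, prevBlock = 1, newBlock = 2, so we flush.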
previousLength = length - ops.length
prevBlock = Math.floor(previousLength / HistoryManager.FLUSH_EVERY_N_OPS)
newBlock = Math.floor(length / HistoryManager.FLUSH_EVERY_N_OPS)
if newBlock != prevBlock
# Do this in the background since it uses HTTP and so may be too
# slow to wait for when processing a doc update.
logger.log length: length, doc_id: doc_id, project_id: project_id, "flushing track changes api"
HistoryManager.flushDocChanges project_id, doc_id, (error) ->
if error?
logger.error err: error, doc_id: doc_id, project_id: project_id, "error flushing doc to track changes api"
callback()

View file

@@ -96,3 +96,15 @@ module.exports = HttpController =
return next(error) if error?
logger.log project_id: project_id, "deleted project via http"
res.send 204 # No Content
setTrackChanges: (req, res, next = (error) ->) ->
project_id = req.params.project_id
track_changes_on = req.body.on
if !track_changes_on?
return res.send 400
track_changes_on = !!track_changes_on # Make boolean
logger.log {project_id, track_changes_on}, "setting track changes via http"
ProjectManager.setTrackChangesWithLocks project_id, track_changes_on, (error) ->
return next(error) if error?
res.send 204

View file

@@ -12,14 +12,14 @@ MAX_HTTP_REQUEST_LENGTH = 5000 # 5 seconds
module.exports = PersistenceManager =
getDoc: (project_id, doc_id, callback = (error, lines, version) ->) ->
PersistenceManager.getDocFromWeb project_id, doc_id, (error, lines) ->
PersistenceManager.getDocFromWeb project_id, doc_id, (error, lines, track_changes, track_changes_entries) ->
return callback(error) if error?
PersistenceManager.getDocVersionInMongo doc_id, (error, version) ->
return callback(error) if error?
callback null, lines, version
callback null, lines, version, track_changes, track_changes_entries
setDoc: (project_id, doc_id, lines, version, callback = (error) ->) ->
PersistenceManager.setDocInWeb project_id, doc_id, lines, (error) ->
setDoc: (project_id, doc_id, lines, version, track_changes, track_changes_entries, callback = (error) ->) ->
PersistenceManager.setDocInWeb project_id, doc_id, lines, track_changes, track_changes_entries, (error) ->
return callback(error) if error?
PersistenceManager.setDocVersionInMongo doc_id, version, (error) ->
return callback(error) if error?
@@ -50,13 +50,13 @@ module.exports = PersistenceManager =
body = JSON.parse body
catch e
return callback(e)
return callback null, body.lines
return callback null, body.lines, body.track_changes, body.track_changes_entries
else if res.statusCode == 404
return callback(new Errors.NotFoundError("doc not found: #{url}"))
else
return callback(new Error("error accessing web API: #{url} #{res.statusCode}"))
setDocInWeb: (project_id, doc_id, lines, _callback = (error) ->) ->
setDocInWeb: (project_id, doc_id, lines, track_changes, track_changes_entries, _callback = (error) ->) ->
timer = new Metrics.Timer("persistenceManager.setDoc")
callback = (args...) ->
timer.done()
@@ -68,6 +68,8 @@ module.exports = PersistenceManager =
method: "POST"
body: JSON.stringify
lines: lines
track_changes: track_changes
track_changes_entries: track_changes_entries
headers:
"content-type": "application/json"
auth:

View file

@@ -57,4 +57,30 @@ module.exports = ProjectManager =
else
callback(null)
setTrackChangesWithLocks: (project_id, track_changes_on, _callback = (error) ->) ->
timer = new Metrics.Timer("projectManager.toggleTrackChangesWithLocks")
callback = (args...) ->
timer.done()
_callback(args...)
RedisManager.getDocIdsInProject project_id, (error, doc_ids) ->
return callback(error) if error?
jobs = []
errors = []
for doc_id in (doc_ids or [])
do (doc_id) ->
jobs.push (callback) ->
DocumentManager.setTrackChangesWithLock project_id, doc_id, track_changes_on, (error) ->
if error?
logger.error {err: error, project_id, doc_ids, track_changes_on}, "error toggling track changes for doc"
errors.push(error)
callback()
# TODO: If no docs, turn on track changes in Mongo manually
logger.log {project_id, doc_ids, track_changes_on}, "toggling track changes for docs"
async.series jobs, () ->
if errors.length > 0
callback new Error("Errors toggling track changes for docs. See log for details")
else
callback(null)

View file

@@ -34,6 +34,10 @@ module.exports = RedisKeyBuilder =
return (key_schema) -> key_schema.uncompressedHistoryOp({doc_id})
pendingUpdates: ({doc_id}) ->
return (key_schema) -> key_schema.pendingUpdates({doc_id})
trackChangesEnabled: ({doc_id}) ->
return (key_schema) -> key_schema.trackChangesEnabled({doc_id})
trackChangesEntries: ({doc_id}) ->
return (key_schema) -> key_schema.trackChangesEntries({doc_id})
docsInProject: ({project_id}) ->
return (key_schema) -> key_schema.docsInProject({project_id})
docsWithHistoryOps: ({project_id}) ->

View file

@@ -13,7 +13,7 @@ minutes = 60 # seconds for Redis expire
module.exports = RedisManager =
rclient: rclient
putDocInMemory : (project_id, doc_id, docLines, version, _callback)->
putDocInMemory : (project_id, doc_id, docLines, version, track_changes, track_changes_entries, _callback)->
timer = new metrics.Timer("redis.put-doc")
callback = (error) ->
timer.done()
@@ -23,6 +23,8 @@ module.exports = RedisManager =
multi.set keys.docLines(doc_id:doc_id), JSON.stringify(docLines)
multi.set keys.projectKey({doc_id:doc_id}), project_id
multi.set keys.docVersion(doc_id:doc_id), version
multi.set keys.trackChangesEnabled(doc_id:doc_id), if track_changes then "1" else "0"
multi.set keys.trackChangesEntries(doc_id:doc_id), JSON.stringify(track_changes_entries)
multi.exec (error) ->
return callback(error) if error?
rclient.sadd keys.docsInProject(project_id:project_id), doc_id, callback
@@ -41,30 +43,36 @@ module.exports = RedisManager =
multi.del keys.docLines(doc_id:doc_id)
multi.del keys.projectKey(doc_id:doc_id)
multi.del keys.docVersion(doc_id:doc_id)
multi.del keys.trackChangesEnabled(doc_id:doc_id)
multi.del keys.trackChangesEntries(doc_id:doc_id)
multi.exec (error) ->
return callback(error) if error?
rclient.srem keys.docsInProject(project_id:project_id), doc_id, callback
getDoc : (project_id, doc_id, callback = (error, lines, version, project_id) ->)->
getDoc : (project_id, doc_id, callback = (error, lines, version, track_changes, track_changes_entries) ->)->
timer = new metrics.Timer("redis.get-doc")
multi = rclient.multi()
multi.get keys.docLines(doc_id:doc_id)
multi.get keys.docVersion(doc_id:doc_id)
multi.get keys.projectKey(doc_id:doc_id)
multi.get keys.trackChangesEnabled(doc_id:doc_id)
multi.get keys.trackChangesEntries(doc_id:doc_id)
multi.exec (error, result)->
timer.done()
return callback(error) if error?
try
docLines = JSON.parse result[0]
track_changes_entries = JSON.parse result[4]
catch e
return callback(e)
version = parseInt(result[1] or 0, 10)
doc_project_id = result[2]
track_changes = (result[3] == "1")
# check doc is in requested project
if doc_project_id? and doc_project_id isnt project_id
logger.error project_id: project_id, doc_id: doc_id, doc_project_id: doc_project_id, "doc not in project"
return callback(new Errors.NotFoundError("document not found"))
callback null, docLines, version, project_id
callback null, docLines, version, track_changes, track_changes_entries
getDocVersion: (doc_id, callback = (error, version) ->) ->
rclient.get keys.docVersion(doc_id: doc_id), (error, version) ->
@@ -104,7 +112,7 @@ module.exports = RedisManager =
DOC_OPS_TTL: 60 * minutes
DOC_OPS_MAX_LENGTH: 100
updateDocument : (doc_id, docLines, newVersion, appliedOps = [], callback = (error) ->)->
updateDocument : (doc_id, docLines, newVersion, appliedOps = [], track_changes_entries, callback = (error) ->)->
RedisManager.getDocVersion doc_id, (error, currentVersion) ->
return callback(error) if error?
if currentVersion + appliedOps.length != newVersion
@@ -119,6 +127,7 @@ module.exports = RedisManager =
multi.rpush keys.docOps(doc_id: doc_id), jsonOps...
multi.expire keys.docOps(doc_id: doc_id), RedisManager.DOC_OPS_TTL
multi.ltrim keys.docOps(doc_id: doc_id), -RedisManager.DOC_OPS_MAX_LENGTH, -1
multi.set keys.trackChangesEntries(doc_id:doc_id), JSON.stringify(track_changes_entries)
multi.exec (error, replys) ->
return callback(error) if error?
return callback()
@@ -126,3 +135,6 @@ module.exports = RedisManager =
getDocIdsInProject: (project_id, callback = (error, doc_ids) ->) ->
rclient.smembers keys.docsInProject(project_id: project_id), callback
setTrackChanges: (project_id, doc_id, track_changes_on, callback = (error) ->) ->
value = (if track_changes_on then "1" else "0")
rclient.set keys.trackChangesEnabled({doc_id}), value, callback

View file

@@ -1,12 +1,11 @@
Keys = require('./UpdateKeys')
Settings = require('settings-sharelatex')
DocumentManager = require "./DocumentManager"
RedisManager = require "./RedisManager"
Errors = require "./Errors"
logger = require "logger-sharelatex"
module.exports = class ShareJsDB
constructor: () ->
constructor: (@project_id, @doc_id, @lines, @version) ->
@appliedOps = {}
# ShareJS calls this detached from the instance, so we need to
# bind it to keep a context that can access @appliedOps
@@ -31,22 +30,14 @@ module.exports = class ShareJsDB
callback()
getSnapshot: (doc_key, callback) ->
[project_id, doc_id] = Keys.splitProjectIdAndDocId(doc_key)
DocumentManager.getDoc project_id, doc_id, (error, lines, version) ->
return callback(error) if error?
if !lines? or !version?
return callback(new Errors.NotFoundError("document not found: #{doc_id}"))
if lines.length > 0 and lines[0].text?
type = "json"
snapshot = lines: lines
else
type = "text"
snapshot = lines.join("\n")
callback null,
snapshot: snapshot
v: parseInt(version, 10)
type: type
if doc_key != Keys.combineProjectIdAndDocId(@project_id, @doc_id)
return callback(new Errors.NotFoundError("unexpected doc_key #{doc_key}, expected #{Keys.combineProjectIdAndDocId(@project_id, @doc_id)}"))
else
return callback null, {
snapshot: @lines.join("\n")
v: parseInt(@version, 10)
type: "text"
}
# To be able to remove a doc from the ShareJS memory
# we need to called Model::delete, which calls this

View file

@@ -6,22 +6,19 @@ Settings = require('settings-sharelatex')
Keys = require "./UpdateKeys"
{EventEmitter} = require "events"
util = require "util"
redis = require("redis-sharelatex")
rclient = redis.createClient(Settings.redis.web)
WebRedisManager = require "./WebRedisManager"
ShareJsModel:: = {}
util.inherits ShareJsModel, EventEmitter
module.exports = ShareJsUpdateManager =
getNewShareJsModel: () ->
db = new ShareJsDB()
getNewShareJsModel: (project_id, doc_id, lines, version) ->
db = new ShareJsDB(project_id, doc_id, lines, version)
model = new ShareJsModel(db, maxDocLength: Settings.max_doc_length)
model.db = db
return model
applyUpdate: (project_id, doc_id, update, callback = (error, updatedDocLines) ->) ->
applyUpdate: (project_id, doc_id, update, lines, version, callback = (error, updatedDocLines) ->) ->
logger.log project_id: project_id, doc_id: doc_id, update: update, "applying sharejs updates"
jobs = []
@@ -30,7 +27,7 @@ module.exports = ShareJsUpdateManager =
# getting stuck due to queued callbacks (line 260 of sharejs/server/model.coffee)
# This adds a small but hopefully acceptable overhead (~12ms per 1000 updates on
# my 2009 MBP).
model = @getNewShareJsModel()
model = @getNewShareJsModel(project_id, doc_id, lines, version)
@_listenForOps(model)
doc_key = Keys.combineProjectIdAndDocId(project_id, doc_id)
model.applyOp doc_key, update, (error) ->
@@ -55,18 +52,9 @@ module.exports = ShareJsUpdateManager =
[project_id, doc_id] = Keys.splitProjectIdAndDocId(doc_key)
ShareJsUpdateManager._sendOp(project_id, doc_id, opData)
_sendOp: (project_id, doc_id, opData) ->
data =
project_id: project_id
doc_id: doc_id
op: opData
data = JSON.stringify data
rclient.publish "applied-ops", data
_sendOp: (project_id, doc_id, op) ->
WebRedisManager.sendData {project_id, doc_id, op}
_sendError: (project_id, doc_id, error) ->
data = JSON.stringify
project_id: project_id
doc_id: doc_id
error: error.message || error
rclient.publish "applied-ops", data
WebRedisManager.sendData {project_id, doc_id, error: error.message || error}

View file

@@ -1,44 +1,12 @@
settings = require "settings-sharelatex"
request = require "request"
logger = require "logger-sharelatex"
async = require "async"
WebRedisManager = require "./WebRedisManager"
ChangesTracker = require "./ChangesTracker"
module.exports = TrackChangesManager =
flushDocChanges: (project_id, doc_id, callback = (error) ->) ->
if !settings.apis?.trackchanges?
logger.warn doc_id: doc_id, "track changes API is not configured, so not flushing"
return callback()
url = "#{settings.apis.trackchanges.url}/project/#{project_id}/doc/#{doc_id}/flush"
logger.log project_id: project_id, doc_id: doc_id, url: url, "flushing doc in track changes api"
request.post url, (error, res, body)->
if error?
return callback(error)
else if res.statusCode >= 200 and res.statusCode < 300
return callback(null)
else
error = new Error("track changes api returned a failure status code: #{res.statusCode}")
return callback(error)
FLUSH_EVERY_N_OPS: 50
pushUncompressedHistoryOps: (project_id, doc_id, ops = [], callback = (error) ->) ->
if ops.length == 0
return callback()
WebRedisManager.pushUncompressedHistoryOps project_id, doc_id, ops, (error, length) ->
return callback(error) if error?
# We want to flush every 50 ops, i.e. 50, 100, 150, etc
# Find out which 'block' (i.e. 0-49, 50-99) we were in before and after pushing these
# ops. If we've changed, then we've gone over a multiple of 50 and should flush.
# (Most of the time, we will only hit 50 and then flushing will put us back to 0)
previousLength = length - ops.length
prevBlock = Math.floor(previousLength / TrackChangesManager.FLUSH_EVERY_N_OPS)
newBlock = Math.floor(length / TrackChangesManager.FLUSH_EVERY_N_OPS)
if newBlock != prevBlock
# Do this in the background since it uses HTTP and so may be too
# slow to wait for when processing a doc update.
logger.log length: length, doc_id: doc_id, project_id: project_id, "flushing track changes api"
TrackChangesManager.flushDocChanges project_id, doc_id, (error) ->
if error?
logger.error err: error, doc_id: doc_id, project_id: project_id, "error flushing doc to track changes api"
callback()
applyUpdate: (project_id, doc_id, entries = {}, updates = [], track_changes, callback = (error, new_entries) ->) ->
{changes, comments} = entries
changesTracker = new ChangesTracker(changes, comments)
changesTracker.track_changes = track_changes
for update in updates
for op in update.op
changesTracker.applyOp(op, { user_id: update.meta?.user_id })
{changes, comments} = changesTracker
callback null, {changes, comments}
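# Illustrative call (argument shapes assumed from UpdateManager's usage
# elsewhere in this commit, not a documented API):
#   TrackChangesManager.applyUpdate project_id, doc_id,
#     { changes: [], comments: [] },
#     [{ op: [{ i: "foo", p: 4 }], meta: { user_id: "u1" } }],
#     true,
#     (error, entries) ->
#       # entries.changes now holds the tracked insert { i: "foo", p: 4 }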

View file

@@ -2,11 +2,14 @@ LockManager = require "./LockManager"
RedisManager = require "./RedisManager"
WebRedisManager = require "./WebRedisManager"
ShareJsUpdateManager = require "./ShareJsUpdateManager"
TrackChangesManager = require "./TrackChangesManager"
HistoryManager = require "./HistoryManager"
Settings = require('settings-sharelatex')
async = require("async")
logger = require('logger-sharelatex')
Metrics = require "./Metrics"
Errors = require "./Errors"
DocumentManager = require "./DocumentManager"
TrackChangesManager = require "./TrackChangesManager"
module.exports = UpdateManager =
processOutstandingUpdates: (project_id, doc_id, callback = (error) ->) ->
@@ -45,12 +48,18 @@ module.exports = UpdateManager =
applyUpdate: (project_id, doc_id, update, callback = (error) ->) ->
UpdateManager._sanitizeUpdate update
ShareJsUpdateManager.applyUpdate project_id, doc_id, update, (error, updatedDocLines, version, appliedOps) ->
DocumentManager.getDoc project_id, doc_id, (error, lines, version, track_changes, track_changes_entries) ->
return callback(error) if error?
logger.log doc_id: doc_id, version: version, "updating doc in redis"
RedisManager.updateDocument doc_id, updatedDocLines, version, appliedOps, (error) ->
if !lines? or !version?
return callback(new Errors.NotFoundError("document not found: #{doc_id}"))
ShareJsUpdateManager.applyUpdate project_id, doc_id, update, lines, version, (error, updatedDocLines, version, appliedOps) ->
return callback(error) if error?
TrackChangesManager.pushUncompressedHistoryOps project_id, doc_id, appliedOps, callback
TrackChangesManager.applyUpdate project_id, doc_id, track_changes_entries, appliedOps, track_changes, (error, new_track_changes_entries) ->
return callback(error) if error?
logger.log doc_id: doc_id, version: version, "updating doc in redis"
RedisManager.updateDocument doc_id, updatedDocLines, version, appliedOps, new_track_changes_entries, (error) ->
return callback(error) if error?
HistoryManager.pushUncompressedHistoryOps project_id, doc_id, appliedOps, callback
lockUpdatesAndDo: (method, project_id, doc_id, args..., callback) ->
LockManager.getLock doc_id, (error, lockValue) ->

View file

@@ -32,4 +32,7 @@ module.exports = WebRedisManager =
], (error, results) ->
return callback(error) if error?
[length, _] = results
callback(error, length)
callback(error, length)
sendData: (data) ->
rclient.publish "applied-ops", JSON.stringify(data)

View file

@@ -32,6 +32,8 @@ module.exports =
docVersion: ({doc_id}) -> "DocVersion:#{doc_id}"
projectKey: ({doc_id}) -> "ProjectId:#{doc_id}"
docsInProject: ({project_id}) -> "DocsIn:#{project_id}"
trackChangesEnabled: ({doc_id}) -> "TrackChangesEnabled:#{doc_id}"
trackChangesEntries: ({doc_id}) -> "TrackChangesEntries:#{doc_id}"
# }, {
# cluster: [{
# port: "7000"
@@ -44,6 +46,8 @@ module.exports =
# docVersion: ({doc_id}) -> "DocVersion:{#{doc_id}}"
# projectKey: ({doc_id}) -> "ProjectId:{#{doc_id}}"
# docsInProject: ({project_id}) -> "DocsIn:{#{project_id}}"
# trackChangesEnabled: ({doc_id}) -> "TrackChangesEnabled:{#{doc_id}}"
# trackChangesEntries: ({doc_id}) -> "TrackChangesEntries:{#{doc_id}}"
}]
max_doc_length: 2 * 1024 * 1024 # 2mb

View file

@@ -0,0 +1,96 @@
sinon = require "sinon"
chai = require("chai")
chai.should()
async = require "async"
rclient = require("redis").createClient()
MockWebApi = require "./helpers/MockWebApi"
DocUpdaterClient = require "./helpers/DocUpdaterClient"
describe "Track changes", ->
describe "turning on track changes", ->
before (done) ->
DocUpdaterClient.subscribeToAppliedOps @appliedOpsListener = sinon.stub()
@project_id = DocUpdaterClient.randomId()
@docs = [{
id: doc_id0 = DocUpdaterClient.randomId()
lines: ["one", "two", "three"]
updatedLines: ["one", "one and a half", "two", "three"]
}, {
id: doc_id1 = DocUpdaterClient.randomId()
lines: ["four", "five", "six"]
updatedLines: ["four", "four and a half", "five", "six"]
}]
for doc in @docs
MockWebApi.insertDoc @project_id, doc.id, {
lines: doc.lines
version: 0
}
async.series @docs.map((doc) =>
(callback) =>
DocUpdaterClient.preloadDoc @project_id, doc.id, callback
), (error) =>
throw error if error?
setTimeout () =>
DocUpdaterClient.setTrackChangesOn @project_id, (error, res, body) =>
@statusCode = res.statusCode
done()
, 200
it "should return a 204 status code", ->
@statusCode.should.equal 204
it "should send a track changes message to real-time for each doc", ->
@appliedOpsListener.calledWith("applied-ops", JSON.stringify({
project_id: @project_id, doc_id: @docs[0].id, track_changes_on: true
})).should.equal true
@appliedOpsListener.calledWith("applied-ops", JSON.stringify({
project_id: @project_id, doc_id: @docs[1].id, track_changes_on: true
})).should.equal true
it "should set the track changes key in redis", (done) ->
rclient.get "TrackChangesEnabled:#{@docs[0].id}", (error, value) =>
throw error if error?
value.should.equal "1"
rclient.get "TrackChangesEnabled:#{@docs[1].id}", (error, value) ->
throw error if error?
value.should.equal "1"
done()
describe "tracking changes", ->
before (done) ->
@project_id = DocUpdaterClient.randomId()
@doc = {
id: doc_id0 = DocUpdaterClient.randomId()
lines: ["one", "two", "three"]
}
@update =
doc: @doc.id
op: [{
i: "one and a half\n"
p: 4
}]
v: 0
meta:
user_id: @user_id = DocUpdaterClient.randomId()
MockWebApi.insertDoc @project_id, @doc.id, {
lines: @doc.lines
version: 0
}
DocUpdaterClient.preloadDoc @project_id, @doc.id, (error) =>
throw error if error?
DocUpdaterClient.setTrackChangesOn @project_id, (error, res, body) =>
throw error if error?
DocUpdaterClient.sendUpdate @project_id, @doc.id, @update, (error) ->
throw error if error?
setTimeout done, 200
it "should set the updated track changes entries in redis", (done) ->
rclient.get "TrackChangesEntries:#{@doc.id}", (error, value) =>
throw error if error?
entries = JSON.parse(value)
change = entries.changes[0]
change.op.should.deep.equal @update.op[0]
change.metadata.user_id.should.equal @user_id
done()

View file

@@ -72,7 +72,18 @@ module.exports = DocUpdaterClient =
deleteProject: (project_id, callback = () ->) ->
request.del "http://localhost:3003/project/#{project_id}", callback
setTrackChangesOn: (project_id, callback = () ->) ->
request.post {
url: "http://localhost:3003/project/#{project_id}/track_changes"
json:
on: true
}, callback
setTrackChangesOff: (project_id, callback = () ->) ->
request.post {
url: "http://localhost:3003/project/#{project_id}/track_changes"
json:
on: false
}, callback