/* eslint-disable no-console, no-return-assign, */ // TODO: This file was created by bulk-decaffeinate. // Fix any style issues and re-enable lint. /* * decaffeinate suggestions: * DS101: Remove unnecessary use of Array.from * DS102: Remove unnecessary code created because of implicit returns * DS103: Rewrite code to no longer use __guard__ * DS104: Avoid inline assignments * DS204: Change includes calls to have a more natural evaluation order * DS205: Consider reworking code to avoid use of IIFEs * DS207: Consider shorter variations of null checks * Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md */ // The model of all the ops. Responsible for applying & transforming remote deltas // and managing the storage layer. // // Actual storage is handled by the database wrappers in db/*, wrapped by DocCache let Model const { EventEmitter } = require('events') const queue = require('./syncqueue') const types = require('../types') const Profiler = require('../../Profiler') const isArray = o => Object.prototype.toString.call(o) === '[object Array]' // This constructor creates a new Model object. There will be one model object // per server context. // // The model object is responsible for a lot of things: // // - It manages the interactions with the database // - It maintains (in memory) a set of all active documents // - It calls out to the OT functions when necessary // // The model is an event emitter. It emits the following events: // // create(docName, data): A document has been created with the specified name & data module.exports = Model = function (db, options) { // db can be null if the user doesn't want persistance. let getOps if (!(this instanceof Model)) { return new Model(db, options) } const model = this if (options == null) { options = {} } // This is a cache of 'live' documents. // // The cache is a map from docName -> { // ops:[{op, meta}] // snapshot // type // v // meta // eventEmitter // reapTimer // committedVersion: v // snapshotWriteLock: bool to make sure writeSnapshot isn't re-entrant // dbMeta: database specific data // opQueue: syncQueue for processing ops // } // // The ops list contains the document's last options.numCachedOps ops. (Or all // of them if we're using a memory store). // // Documents are stored in this set so long as the document has been accessed in // the last few seconds (options.reapTime) OR at least one client has the document // open. I don't know if I should keep open (but not being edited) documents live - // maybe if a client has a document open but the document isn't being edited, I should // flush it from the cache. // // In any case, the API to model is designed such that if we want to change that later // it should be pretty easy to do so without any external-to-the-model code changes. const docs = {} // This is a map from docName -> [callback]. It is used when a document hasn't been // cached and multiple getSnapshot() / getVersion() requests come in. All requests // are added to the callback list and called when db.getSnapshot() returns. // // callback(error, snapshot data) const awaitingGetSnapshot = {} // The time that documents which no clients have open will stay in the cache. // Should be > 0. if (options.reapTime == null) { options.reapTime = 3000 } // The number of operations the cache holds before reusing the space if (options.numCachedOps == null) { options.numCachedOps = 10 } // This option forces documents to be reaped, even when there's no database backend. // This is useful when you don't care about persistance and don't want to gradually // fill memory. // // You might want to set reapTime to a day or something. if (options.forceReaping == null) { options.forceReaping = false } // Until I come up with a better strategy, we'll save a copy of the document snapshot // to the database every ~20 submitted ops. if (options.opsBeforeCommit == null) { options.opsBeforeCommit = 20 } // It takes some processing time to transform client ops. The server will punt ops back to the // client to transform if they're too old. if (options.maximumAge == null) { options.maximumAge = 40 } // **** Cache API methods // Its important that all ops are applied in order. This helper method creates the op submission queue // for a single document. This contains the logic for transforming & applying ops. const makeOpQueue = (docName, doc) => queue(function (opData, callback) { if (!(opData.v >= 0)) { return callback('Version missing') } if (opData.v > doc.v) { return callback('Op at future version') } // Punt the transforming work back to the client if the op is too old. if (opData.v + options.maximumAge < doc.v) { return callback('Op too old') } if (!opData.meta) { opData.meta = {} } opData.meta.ts = Date.now() // We'll need to transform the op to the current version of the document. This // calls the callback immediately if opVersion == doc.v. return getOps(docName, opData.v, doc.v, function (error, ops) { let snapshot if (error) { return callback(error) } if (doc.v - opData.v !== ops.length) { // This should never happen. It indicates that we didn't get all the ops we // asked for. Its important that the submitted op is correctly transformed. console.error( `Could not get old ops in model for document ${docName}` ) console.error( `Expected ops ${opData.v} to ${doc.v} and got ${ops.length} ops` ) return callback('Internal error') } if (ops.length > 0) { try { const profile = new Profiler('model.transform') // If there's enough ops, it might be worth spinning this out into a webworker thread. for (const oldOp of Array.from(ops)) { // Dup detection works by sending the id(s) the op has been submitted with previously. // If the id matches, we reject it. The client can also detect the op has been submitted // already if it sees its own previous id in the ops it sees when it does catchup. if ( oldOp.meta.source && opData.dupIfSource && Array.from(opData.dupIfSource).includes(oldOp.meta.source) ) { return callback('Op already submitted') } opData.op = doc.type.transform(opData.op, oldOp.op, 'left') opData.v++ } profile.log('transform', { sync: true }).end() } catch (error1) { error = error1 return callback(error.message) } } try { const profile = new Profiler('model.apply') snapshot = doc.type.apply(doc.snapshot, opData.op) profile.log('model.apply', { sync: true }).end() } catch (error2) { error = error2 return callback(error.message) } if ( options.maxDocLength != null && doc.snapshot.length > options.maxDocLength ) { return callback('Update takes doc over max doc size') } // The op data should be at the current version, and the new document data should be at // the next version. // // This should never happen in practice, but its a nice little check to make sure everything // is hunky-dory. if (opData.v !== doc.v) { // This should never happen. console.error( 'Version mismatch detected in model. File a ticket - this is a bug.' ) console.error(`Expecting ${opData.v} == ${doc.v}`) return callback('Internal error') } // newDocData = {snapshot, type:type.name, v:opVersion + 1, meta:docData.meta} const writeOp = (db != null ? db.writeOp : undefined) || ((docName, newOpData, callback) => callback()) return writeOp(docName, opData, function (error) { if (error) { // The user should probably know about this. console.warn(`Error writing ops to database: ${error}`) return callback(error) } __guardMethod__(options.stats, 'writeOp', o => o.writeOp()) // This is needed when we emit the 'change' event, below. const oldSnapshot = doc.snapshot // All the heavy lifting is now done. Finally, we'll update the cache with the new data // and (maybe!) save a new document snapshot to the database. doc.v = opData.v + 1 doc.snapshot = snapshot doc.ops.push(opData) if (db && doc.ops.length > options.numCachedOps) { doc.ops.shift() } model.emit('applyOp', docName, opData, snapshot, oldSnapshot) doc.eventEmitter.emit('op', opData, snapshot, oldSnapshot) // The callback is called with the version of the document at which the op was applied. // This is the op.v after transformation, and its doc.v - 1. callback(null, opData.v) // I need a decent strategy here for deciding whether or not to save the snapshot. // // The 'right' strategy looks something like "Store the snapshot whenever the snapshot // is smaller than the accumulated op data". For now, I'll just store it every 20 // ops or something. (Configurable with doc.committedVersion) if ( !doc.snapshotWriteLock && doc.committedVersion + options.opsBeforeCommit <= doc.v ) { return tryWriteSnapshot(docName, function (error) { if (error) { return console.warn( `Error writing snapshot ${error}. This is nonfatal` ) } }) } }) }) }) // Add the data for the given docName to the cache. The named document shouldn't already // exist in the doc set. // // Returns the new doc. const add = function (docName, error, data, committedVersion, ops, dbMeta) { let callback, doc const callbacks = awaitingGetSnapshot[docName] delete awaitingGetSnapshot[docName] if (error) { if (callbacks) { for (callback of Array.from(callbacks)) { callback(error) } } } else { doc = docs[docName] = { snapshot: data.snapshot, v: data.v, type: data.type, meta: data.meta, // Cache of ops ops: ops || [], eventEmitter: new EventEmitter(), // Timer before the document will be invalidated from the cache (if the document has no // listeners) reapTimer: null, // Version of the snapshot thats in the database committedVersion: committedVersion != null ? committedVersion : data.v, snapshotWriteLock: false, dbMeta, } doc.opQueue = makeOpQueue(docName, doc) refreshReapingTimeout(docName) model.emit('add', docName, data) if (callbacks) { for (callback of Array.from(callbacks)) { callback(null, doc) } } } return doc } // This is a little helper wrapper around db.getOps. It does two things: // // - If there's no database set, it returns an error to the callback // - It adds version numbers to each op returned from the database // (These can be inferred from context so the DB doesn't store them, but its useful to have them). const getOpsInternal = function (docName, start, end, callback) { if (!db) { return typeof callback === 'function' ? callback('Document does not exist') : undefined } return db.getOps(docName, start, end, function (error, ops) { if (error) { return typeof callback === 'function' ? callback(error) : undefined } let v = start for (const op of Array.from(ops)) { op.v = v++ } return typeof callback === 'function' ? callback(null, ops) : undefined }) } // Load the named document into the cache. This function is re-entrant. // // The callback is called with (error, doc) const load = function (docName, callback) { if (docs[docName]) { // The document is already loaded. Return immediately. __guardMethod__(options.stats, 'cacheHit', o => o.cacheHit('getSnapshot')) return callback(null, docs[docName]) } // We're a memory store. If we don't have it, nobody does. if (!db) { return callback('Document does not exist') } const callbacks = awaitingGetSnapshot[docName] // The document is being loaded already. Add ourselves as a callback. if (callbacks) { return callbacks.push(callback) } __guardMethod__(options.stats, 'cacheMiss', o1 => o1.cacheMiss('getSnapshot') ) // The document isn't loaded and isn't being loaded. Load it. awaitingGetSnapshot[docName] = [callback] return db.getSnapshot(docName, function (error, data, dbMeta) { if (error) { return add(docName, error) } const type = types[data.type] if (!type) { console.warn(`Type '${data.type}' missing`) return callback('Type not found') } data.type = type const committedVersion = data.v // The server can close without saving the most recent document snapshot. // In this case, there are extra ops which need to be applied before // returning the snapshot. return getOpsInternal(docName, data.v, null, function (error, ops) { if (error) { return callback(error) } if (ops.length > 0) { console.log(`Catchup ${docName} ${data.v} -> ${data.v + ops.length}`) try { for (const op of Array.from(ops)) { data.snapshot = type.apply(data.snapshot, op.op) data.v++ } } catch (e) { // This should never happen - it indicates that whats in the // database is invalid. console.error(`Op data invalid for ${docName}: ${e.stack}`) return callback('Op data invalid') } } model.emit('load', docName, data) return add(docName, error, data, committedVersion, ops, dbMeta) }) }) } // This makes sure the cache contains a document. If the doc cache doesn't contain // a document, it is loaded from the database and stored. // // Documents are stored so long as either: // - They have been accessed within the past #{PERIOD} // - At least one client has the document open function refreshReapingTimeout(docName) { const doc = docs[docName] if (!doc) { return } // I want to let the clients list be updated before this is called. return process.nextTick(function () { // This is an awkward way to find out the number of clients on a document. If this // causes performance issues, add a numClients field to the document. // // The first check is because its possible that between refreshReapingTimeout being called and this // event being fired, someone called delete() on the document and hence the doc is something else now. if ( doc === docs[docName] && doc.eventEmitter.listeners('op').length === 0 && (db || options.forceReaping) && doc.opQueue.busy === false ) { let reapTimer clearTimeout(doc.reapTimer) return (doc.reapTimer = reapTimer = setTimeout( () => tryWriteSnapshot(docName, function () { // If the reaping timeout has been refreshed while we're writing the snapshot, or if we're // in the middle of applying an operation, don't reap. if ( docs[docName].reapTimer === reapTimer && doc.opQueue.busy === false ) { return delete docs[docName] } }), options.reapTime )) } }) } function tryWriteSnapshot(docName, callback) { if (!db) { return typeof callback === 'function' ? callback() : undefined } const doc = docs[docName] // The doc is closed if (!doc) { return typeof callback === 'function' ? callback() : undefined } // The document is already saved. if (doc.committedVersion === doc.v) { return typeof callback === 'function' ? callback() : undefined } if (doc.snapshotWriteLock) { return typeof callback === 'function' ? callback('Another snapshot write is in progress') : undefined } doc.snapshotWriteLock = true __guardMethod__(options.stats, 'writeSnapshot', o => o.writeSnapshot()) const writeSnapshot = (db != null ? db.writeSnapshot : undefined) || ((docName, docData, dbMeta, callback) => callback()) const data = { v: doc.v, meta: doc.meta, snapshot: doc.snapshot, // The database doesn't know about object types. type: doc.type.name, } // Commit snapshot. return writeSnapshot(docName, data, doc.dbMeta, function (error, dbMeta) { doc.snapshotWriteLock = false // We have to use data.v here because the version in the doc could // have been updated between the call to writeSnapshot() and now. doc.committedVersion = data.v doc.dbMeta = dbMeta return typeof callback === 'function' ? callback(error) : undefined }) } // *** Model interface methods // Create a new document. // // data should be {snapshot, type, [meta]}. The version of a new document is 0. this.create = function (docName, type, meta, callback) { if (typeof meta === 'function') { ;[meta, callback] = Array.from([{}, meta]) } if (docName.match(/\//)) { return typeof callback === 'function' ? callback('Invalid document name') : undefined } if (docs[docName]) { return typeof callback === 'function' ? callback('Document already exists') : undefined } if (typeof type === 'string') { type = types[type] } if (!type) { return typeof callback === 'function' ? callback('Type not found') : undefined } const data = { snapshot: type.create(), type: type.name, meta: meta || {}, v: 0, } const done = function (error, dbMeta) { // dbMeta can be used to cache extra state needed by the database to access the document, like an ID or something. if (error) { return typeof callback === 'function' ? callback(error) : undefined } // From here on we'll store the object version of the type name. data.type = type add(docName, null, data, 0, [], dbMeta) model.emit('create', docName, data) return typeof callback === 'function' ? callback() : undefined } if (db) { return db.create(docName, data, done) } else { return done() } } // Perminantly deletes the specified document. // If listeners are attached, they are removed. // // The callback is called with (error) if there was an error. If error is null / undefined, the // document was deleted. // // WARNING: This isn't well supported throughout the code. (Eg, streaming clients aren't told about the // deletion. Subsequent op submissions will fail). this.delete = function (docName, callback) { const doc = docs[docName] if (doc) { clearTimeout(doc.reapTimer) delete docs[docName] } const done = function (error) { if (!error) { model.emit('delete', docName) } return typeof callback === 'function' ? callback(error) : undefined } if (db) { return db.delete(docName, doc != null ? doc.dbMeta : undefined, done) } else { return done(!doc ? 'Document does not exist' : undefined) } } // This gets all operations from [start...end]. (That is, its not inclusive.) // // end can be null. This means 'get me all ops from start'. // // Each op returned is in the form {op:o, meta:m, v:version}. // // Callback is called with (error, [ops]) // // If the document does not exist, getOps doesn't necessarily return an error. This is because // its awkward to figure out whether or not the document exists for things // like the redis database backend. I guess its a bit gross having this inconsistant // with the other DB calls, but its certainly convenient. // // Use getVersion() to determine if a document actually exists, if thats what you're // after. this.getOps = getOps = function (docName, start, end, callback) { // getOps will only use the op cache if its there. It won't fill the op cache in. if (!(start >= 0)) { throw new Error('start must be 0+') } if (typeof end === 'function') { ;[end, callback] = Array.from([null, end]) } const ops = docs[docName] != null ? docs[docName].ops : undefined if (ops) { const version = docs[docName].v // Ops contains an array of ops. The last op in the list is the last op applied if (end == null) { end = version } start = Math.min(start, end) if (start === end) { return callback(null, []) } // Base is the version number of the oldest op we have cached const base = version - ops.length // If the database is null, we'll trim to the ops we do have and hope thats enough. if (start >= base || db === null) { refreshReapingTimeout(docName) if (options.stats != null) { options.stats.cacheHit('getOps') } return callback(null, ops.slice(start - base, end - base)) } } if (options.stats != null) { options.stats.cacheMiss('getOps') } return getOpsInternal(docName, start, end, callback) } // Gets the snapshot data for the specified document. // getSnapshot(docName, callback) // Callback is called with (error, {v: , type: , snapshot: , meta: }) this.getSnapshot = (docName, callback) => load(docName, (error, doc) => callback( error, doc ? { v: doc.v, type: doc.type, snapshot: doc.snapshot, meta: doc.meta } : undefined ) ) // Gets the latest version # of the document. // getVersion(docName, callback) // callback is called with (error, version). this.getVersion = (docName, callback) => load(docName, (error, doc) => callback(error, doc != null ? doc.v : undefined) ) // Apply an op to the specified document. // The callback is passed (error, applied version #) // opData = {op:op, v:v, meta:metadata} // // Ops are queued before being applied so that the following code applies op C before op B: // model.applyOp 'doc', OPA, -> model.applyOp 'doc', OPB // model.applyOp 'doc', OPC this.applyOp = ( docName, opData, callback // All the logic for this is in makeOpQueue, above. ) => load(docName, function (error, doc) { if (error) { return callback(error) } return process.nextTick(() => doc.opQueue(opData, function (error, newVersion) { refreshReapingTimeout(docName) return typeof callback === 'function' ? callback(error, newVersion) : undefined }) ) }) // TODO: store (some) metadata in DB // TODO: op and meta should be combineable in the op that gets sent this.applyMetaOp = function (docName, metaOpData, callback) { const { path, value } = metaOpData.meta if (!isArray(path)) { return typeof callback === 'function' ? callback('path should be an array') : undefined } return load(docName, function (error, doc) { if (error != null) { return typeof callback === 'function' ? callback(error) : undefined } else { let applied = false switch (path[0]) { case 'shout': doc.eventEmitter.emit('op', metaOpData) applied = true break } if (applied) { model.emit('applyMetaOp', docName, path, value) } return typeof callback === 'function' ? callback(null, doc.v) : undefined } }) } // Listen to all ops from the specified version. If version is in the past, all // ops since that version are sent immediately to the listener. // // The callback is called once the listener is attached, but before any ops have been passed // to the listener. // // This will _not_ edit the document metadata. // // If there are any listeners, we don't purge the document from the cache. But be aware, this behaviour // might change in a future version. // // version is the document version at which the document is opened. It can be left out if you want to open // the document at the most recent version. // // listener is called with (opData) each time an op is applied. // // callback(error, openedVersion) this.listen = function (docName, version, listener, callback) { if (typeof version === 'function') { ;[version, listener, callback] = Array.from([null, version, listener]) } return load(docName, function (error, doc) { if (error) { return typeof callback === 'function' ? callback(error) : undefined } clearTimeout(doc.reapTimer) if (version != null) { return getOps(docName, version, null, function (error, data) { if (error) { return typeof callback === 'function' ? callback(error) : undefined } doc.eventEmitter.on('op', listener) if (typeof callback === 'function') { callback(null, version) } return (() => { const result = [] for (const op of Array.from(data)) { let needle listener(op) // The listener may well remove itself during the catchup phase. If this happens, break early. // This is done in a quite inefficient way. (O(n) where n = #listeners on doc) if ( ((needle = listener), !Array.from(doc.eventEmitter.listeners('op')).includes(needle)) ) { break } else { result.push(undefined) } } return result })() }) } else { // Version is null / undefined. Just add the listener. doc.eventEmitter.on('op', listener) return typeof callback === 'function' ? callback(null, doc.v) : undefined } }) } // Remove a listener for a particular document. // // removeListener(docName, listener) // // This is synchronous. this.removeListener = function (docName, listener) { // The document should already be loaded. const doc = docs[docName] if (!doc) { throw new Error('removeListener called but document not loaded') } doc.eventEmitter.removeListener('op', listener) return refreshReapingTimeout(docName) } // Flush saves all snapshot data to the database. I'm not sure whether or not this is actually needed - // sharejs will happily replay uncommitted ops when documents are re-opened anyway. this.flush = function (callback) { if (!db) { return typeof callback === 'function' ? callback() : undefined } let pendingWrites = 0 for (const docName in docs) { const doc = docs[docName] if (doc.committedVersion < doc.v) { pendingWrites++ // I'm hoping writeSnapshot will always happen in another thread. tryWriteSnapshot(docName, () => process.nextTick(function () { pendingWrites-- if (pendingWrites === 0) { return typeof callback === 'function' ? callback() : undefined } }) ) } } // If nothing was queued, terminate immediately. if (pendingWrites === 0) { return typeof callback === 'function' ? callback() : undefined } } // Close the database connection. This is needed so nodejs can shut down cleanly. this.closeDb = function () { __guardMethod__(db, 'close', o => o.close()) return (db = null) } } // Model inherits from EventEmitter. Model.prototype = new EventEmitter() function __guardMethod__(obj, methodName, transform) { if ( typeof obj !== 'undefined' && obj !== null && typeof obj[methodName] === 'function' ) { return transform(obj, methodName) } else { return undefined } }