diff --git a/lib/realtime.js b/lib/realtime.js index 5d769e7d9..b50e05b29 100644 --- a/lib/realtime.js +++ b/lib/realtime.js @@ -9,7 +9,6 @@ var randomcolor = require("randomcolor"); var Chance = require('chance'), chance = new Chance(); var moment = require('moment'); -var childProcess = require('child_process'); //core var config = require("./config.js"); @@ -20,9 +19,6 @@ var models = require("./models"); //ot var ot = require("./ot/index.js"); -// workers -var noteUpdater = require("./workers/noteUpdater"); - //public var realtime = { io: null, @@ -83,62 +79,97 @@ function emitCheck(note) { var users = {}; var notes = {}; //update when the note is dirty -var updaterIsBusy = false; var updater = setInterval(function () { - if (updaterIsBusy) return; - var _notes = {}; - Object.keys(notes).forEach(function (key) { + async.each(Object.keys(notes), function (key, callback) { var note = notes[key]; - if (!note.server || !note.server.isDirty) return; - _notes[key] = { - id: note.id, - lastchangeuser: note.lastchangeuser, - authorship: note.authorship, - document: note.server.document - }; - note.server.isDirty = false; - }); - if (Object.keys(_notes).length <= 0) return; - updaterIsBusy = true; - var worker = childProcess.fork("./lib/workers/noteUpdater.js"); - if (config.debug) logger.info('note updater worker process started'); - worker.send({ - msg: 'update note', - notes: _notes - }); - worker.on('message', function (data) { - if (!data || !data.msg || !data.note) return; - var note = notes[data.note.id]; - if (!note) return; - switch(data.msg) { - case 'error': - for (var i = 0, l = note.socks.length; i < l; i++) { - var sock = note.socks[i]; - if (typeof sock !== 'undefined' && sock) { - setTimeout(function () { - sock.disconnect(true); - }, 0); - } + if (note.server.isDirty) { + if (config.debug) logger.info("updater found dirty note: " + key); + updateNote(note, function(err, _note) { + // handle when note already been clean up + if (!notes[key] || !notes[key].server) return callback(null, null); + if (!_note) { + realtime.io.to(note.id).emit('info', { + code: 404 + }); + logger.error('note not found: ', note.id); } - break; - case 'note not found': - realtime.io.to(note.id).emit('info', { - code: 404 - }); - break; - case 'check': - note.lastchangeuserprofile = data.note.lastchangeuserprofile; - note.updatetime = data.note.updatetime; - saverSleep = false; + if (err || !_note) { + for (var i = 0, l = note.socks.length; i < l; i++) { + var sock = note.socks[i]; + if (typeof sock !== 'undefined' && sock) { + setTimeout(function () { + sock.disconnect(true); + }, 0); + } + } + return callback(err, null); + } + note.server.isDirty = false; + note.updatetime = moment(_note.lastchangeAt).valueOf(); emitCheck(note); - break; + return callback(null, null); + }); + } else { + return callback(null, null); } - }); - worker.on('close', function (code) { - updaterIsBusy = false; - if (config.debug) logger.info('note updater worker process exited with code ' + code); + }, function (err) { + if (err) return logger.error('updater error', err); }); }, 1000); +function updateNote(note, callback) { + models.Note.findOne({ + where: { + id: note.id + } + }).then(function (_note) { + if (!_note) return callback(null, null); + if (note.lastchangeuser) { + if (_note.lastchangeuserId != note.lastchangeuser) { + models.User.findOne({ + where: { + id: note.lastchangeuser + } + }).then(function (user) { + if (!user) return callback(null, null); + note.lastchangeuserprofile = models.User.parseProfile(user.profile); + return finishUpdateNote(note, _note, callback); + }).catch(function (err) { + logger.error(err); + return callback(err, null); + }); + } else { + return finishUpdateNote(note, _note, callback); + } + } else { + note.lastchangeuserprofile = null; + return finishUpdateNote(note, _note, callback); + } + }).catch(function (err) { + logger.error(err); + return callback(err, null); + }); +} +function finishUpdateNote(note, _note, callback) { + if (!note || !note.server) return callback(null, null); + var body = note.server.document; + var title = note.title = models.Note.parseNoteTitle(body); + title = LZString.compressToBase64(title); + body = LZString.compressToBase64(body); + var values = { + title: title, + content: body, + authorship: LZString.compressToBase64(JSON.stringify(note.authorship)), + lastchangeuserId: note.lastchangeuser, + lastchangeAt: Date.now() + }; + _note.update(values).then(function (_note) { + saverSleep = false; + return callback(null, _note); + }).catch(function (err) { + logger.error(err); + return callback(err, null); + }); +} //clean when user not in any rooms or user not in connected list var cleaner = setInterval(function () { async.each(Object.keys(users), function (key, callback) { @@ -161,28 +192,16 @@ var cleaner = setInterval(function () { }); }, 60000); var saverSleep = false; -var saverIsBusy = false; // save note revision in interval var saver = setInterval(function () { - if (saverSleep || saverIsBusy) return; - saverIsBusy = true; - var worker = childProcess.fork("./lib/workers/noteRevisionSaver.js"); - if (config.debug) logger.info('note revision saver worker process started'); - worker.send({ - msg: 'save note revision' - }); - worker.on('message', function (data) { - if (!data || !data.msg) return; - switch(data.msg) { - case 'empty': - saverSleep = true; - break; + if (saverSleep) return; + models.Revision.saveAllNotesRevision(function (err, notes) { + if (err) return logger.error('revision saver failed: ' + err); + if (notes && notes.length <= 0) { + saverSleep = true; + return; } }); - worker.on('close', function (code) { - saverIsBusy = false; - if (config.debug) logger.info('note revision saver worker process exited with code ' + code); - }); }, 60000 * 5); function getStatus(callback) { @@ -524,7 +543,7 @@ function disconnect(socket) { // remove note in notes if no user inside if (Object.keys(note.users).length <= 0) { if (note.server.isDirty) { - noteUpdater.updateNote(note, function (err, _note) { + updateNote(note, function (err, _note) { if (err) return logger.error('disconnect note failed: ' + err); // clear server before delete to avoid memory leaks note.server.document = ""; diff --git a/lib/workers/noteRevisionSaver.js b/lib/workers/noteRevisionSaver.js deleted file mode 100644 index b6b117a31..000000000 --- a/lib/workers/noteRevisionSaver.js +++ /dev/null @@ -1,19 +0,0 @@ -// core -var logger = require("../logger.js"); -var models = require("../models"); - -process.on('message', function (data) { - if (!data || !data.msg || data.msg !== 'save note revision') return process.exit(); - models.Revision.saveAllNotesRevision(function (err, notes) { - if (err) { - logger.error('note revision saver failed: ' + err); - return process.exit(); - } - if (notes && notes.length <= 0) { - process.send({ - msg: 'empty' - }); - } - process.exit(); - }); -}); \ No newline at end of file diff --git a/lib/workers/noteUpdater.js b/lib/workers/noteUpdater.js deleted file mode 100644 index 3fc4b1ebf..000000000 --- a/lib/workers/noteUpdater.js +++ /dev/null @@ -1,101 +0,0 @@ -// external modules -var async = require('async'); -var moment = require('moment'); -var LZString = require('lz-string'); - -// core -var config = require("../config.js"); -var logger = require("../logger.js"); -var models = require("../models"); - -process.on('message', function (data) { - if (!data || !data.msg || data.msg !== 'update note' || !data.notes) return process.exit(); - var notes = data.notes; - async.each(Object.keys(notes), function (key, callback) { - var note = notes[key]; - if (config.debug) logger.info("note updater found dirty note: " + key); - updateNote(note, function(err, _note) { - if (!_note) { - process.send({ - msg: 'note not found', - note: note - }); - logger.error('note not found: ', note.id); - } - if (err || !_note) { - process.send({ - msg: 'error', - note: note - }); - return callback(err, null); - } - note.updatetime = moment(_note.lastchangeAt).valueOf(); - process.send({ - msg: 'check', - note: note - }); - return callback(null, null); - }); - }, function (err) { - if (err) logger.error('note updater error', err); - process.exit(); - }); -}); - -function updateNote(note, callback) { - models.Note.findOne({ - where: { - id: note.id - } - }).then(function (_note) { - if (!_note) return callback(null, null); - if (note.lastchangeuser) { - if (_note.lastchangeuserId != note.lastchangeuser) { - models.User.findOne({ - where: { - id: note.lastchangeuser - } - }).then(function (user) { - if (!user) return callback(null, null); - note.lastchangeuserprofile = models.User.parseProfile(user.profile); - return finishUpdateNote(note, _note, callback); - }).catch(function (err) { - logger.error(err); - return callback(err, null); - }); - } else { - return finishUpdateNote(note, _note, callback); - } - } else { - note.lastchangeuserprofile = null; - return finishUpdateNote(note, _note, callback); - } - }).catch(function (err) { - logger.error(err); - return callback(err, null); - }); -} - -function finishUpdateNote(note, _note, callback) { - var body = note.document; - var title = note.title = models.Note.parseNoteTitle(body); - title = LZString.compressToBase64(title); - body = LZString.compressToBase64(body); - var values = { - title: title, - content: body, - authorship: LZString.compressToBase64(JSON.stringify(note.authorship)), - lastchangeuserId: note.lastchangeuser, - lastchangeAt: Date.now() - }; - _note.update(values).then(function (_note) { - return callback(null, _note); - }).catch(function (err) { - logger.error(err); - return callback(err, null); - }); -} - -module.exports = { - updateNote: updateNote -}; \ No newline at end of file