From da2762edf50664db2d69f22f66b6ad6aa8719638 Mon Sep 17 00:00:00 2001 From: Kevin Jahns Date: Mon, 30 Nov 2015 12:26:02 +0100 Subject: [PATCH] added flow support for Connector.js --- .vscode/launch.json | 2 +- declarations/Structs.js | 29 +++++++++++++++++- declarations/Y.js | 7 +++-- src/Connector.js | 67 ++++++++++++++++++++++++++++------------- src/Database.js | 4 +-- 5 files changed, 82 insertions(+), 27 deletions(-) diff --git a/.vscode/launch.json b/.vscode/launch.json index 833c43e9..f5f0d3e1 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -2,7 +2,7 @@ "version": "0.2.0", "configurations": [ { - "name": "Launch", + "name": "Test", "type": "node", "request": "launch", "program": "node_modules/gulp/bin/gulp.js", diff --git a/declarations/Structs.js b/declarations/Structs.js index 58001bbc..9855efeb 100644 --- a/declarations/Structs.js +++ b/declarations/Structs.js @@ -12,6 +12,7 @@ type Struct = { struct: 'Insert' | 'Delete' }*/ type Struct = Insertion | Deletion +type Operation = Struct type Insertion = { id: Id, @@ -23,4 +24,30 @@ type Insertion = { type Deletion = { target: Id, struct: 'Delete' -} \ No newline at end of file +} + + +type MessageSyncStep1 = { + type: 'sync step 1', + deleteSet: any, + stateSet: any +} + +type MessageSyncStep2 = { + type: 'sync step 2', + os: Array, + deleteSet: any, + stateSet: any +} + +type MessageUpdate = { + type: 'update', + ops: Array +} + +type MessageSyncDone = { + type: 'sync done' +} + +type Message = MessageSyncStep1 | MessageSyncStep2 | MessageUpdate | MessageSyncDone + diff --git a/declarations/Y.js b/declarations/Y.js index e287be99..7a4bd94f 100644 --- a/declarations/Y.js +++ b/declarations/Y.js @@ -4,9 +4,10 @@ type YGlobal = { utils: Object; Struct: Object; AbstractDatabase: any; + AbstractConnector: any; } -type YInstance = { +type YConfig = { db: Object, connector: Object, root: Object @@ -14,4 +15,6 @@ type YInstance = { declare var YConcurrency_TestingMode : boolean -type Transaction = Generator \ No newline at end of file +type Transaction = Generator + +type SyncRole = 'master' | 'slave' \ No newline at end of file diff --git a/src/Connector.js b/src/Connector.js index 66b6a9d8..a1bef56f 100644 --- a/src/Connector.js +++ b/src/Connector.js @@ -1,7 +1,25 @@ +/* @flow */ 'use strict' -module.exports = function (Y) { +module.exports = function (Y/* :YGlobal */) { class AbstractConnector { + /* :: + y: YConfig; + role: SyncRole; + connections: Object; + isSynced: boolean; + userEventListeners: Array; + whenSyncedListeners: Array; + currentSyncTarget: ?UserId; + syncingClients: Array; + forwardToSyncingClients: boolean; + debug: boolean; + broadcastedHB: boolean; + syncStep2: Promise; + userId: UserId; + send: Function; + broadcast: Function; + */ /* opts contains the following information: role : String Role of this client ("master" or "slave") @@ -119,10 +137,12 @@ module.exports = function (Y) { var conn = this this.currentSyncTarget = syncUser this.y.db.requestTransaction(function *() { + var stateSet = yield* this.getStateSet() + var deleteSet = yield* this.getDeleteSet() conn.send(syncUser, { type: 'sync step 1', - stateSet: yield* this.getStateSet(), - deleteSet: yield* this.getDeleteSet() + stateSet: stateSet, + deleteSet: deleteSet }) }) } else { @@ -139,22 +159,23 @@ module.exports = function (Y) { } send (uid, message) { if (this.debug) { - console.log(`send ${this.userId} -> ${uid}: ${message.type}`, m) // eslint-disable-line + console.log(`send ${this.userId} -> ${uid}: ${message.type}`, message) // eslint-disable-line } } /* You received a raw message, and you know that it is intended for Yjs. Then call this function. */ - receiveMessage (sender, m) { + receiveMessage (sender/* :UserId */, message/* :Message */) { if (sender === this.userId) { return } if (this.debug) { - console.log(`receive ${sender} -> ${this.userId}: ${m.type}`, JSON.parse(JSON.stringify(m))) // eslint-disable-line + console.log(`receive ${sender} -> ${this.userId}: ${message.type}`, JSON.parse(JSON.stringify(message))) // eslint-disable-line } - if (m.type === 'sync step 1') { + if (message.type === 'sync step 1') { // TODO: make transaction, stream the ops let conn = this + let m = message this.y.db.requestTransaction(function *() { var currentStateSet = yield* this.getStateSet() yield* this.applyDeleteSet(m.deleteSet) @@ -176,7 +197,7 @@ module.exports = function (Y) { conn.send(sender, { type: 'sync done' }) - }, conn.syncingClientDuration) + }, 5000) // TODO: conn.syncingClientDuration) } else { conn.send(sender, { type: 'sync done' @@ -184,46 +205,50 @@ module.exports = function (Y) { } conn._setSyncedWith(sender) }) - } else if (m.type === 'sync step 2') { + } else if (message.type === 'sync step 2') { let conn = this var broadcastHB = !this.broadcastedHB this.broadcastedHB = true var db = this.y.db - var defer = Promise.defer() + var defer = {} + defer.promise = new Promise(function (resolve) { + defer.resolve = resolve + }) this.syncStep2 = defer.promise + let m /* :MessageSyncStep2 */ = message db.requestTransaction(function * () { yield* this.applyDeleteSet(m.deleteSet) this.store.apply(m.os) db.requestTransaction(function * () { var ops = yield* this.getOperations(m.stateSet) if (ops.length > 0) { - m = { + var update /* :MessageUpdate */ = { type: 'update', ops: ops } if (!broadcastHB) { // TODO: consider to broadcast here.. - conn.send(sender, m) + conn.send(sender, update) } else { // broadcast only once! - conn.broadcast(m) + conn.broadcast(update) } } defer.resolve() }) }) - } else if (m.type === 'sync done') { + } else if (message.type === 'sync done') { var self = this this.syncStep2.then(function () { self._setSyncedWith(sender) }) - } else if (m.type === 'update') { + } else if (message.type === 'update') { if (this.forwardToSyncingClients) { for (var client of this.syncingClients) { - this.send(client, m) + this.send(client, message) } } if (this.y.db.forwardAppliedOperations) { - var delops = m.ops.filter(function (o) { + var delops = message.ops.filter(function (o) { return o.struct === 'Delete' }) if (delops.length > 0) { @@ -233,7 +258,7 @@ module.exports = function (Y) { }) } } - this.y.db.apply(m.ops) + this.y.db.apply(message.ops) } } _setSyncedWith (user) { @@ -259,7 +284,7 @@ module.exports = function (Y) { does not support primitive values as array elements expects an ltx (less than xml) object */ - parseMessageFromXml (m) { + parseMessageFromXml (m/* :any */) { function parseArray (node) { for (var n of node.children) { if (n.getAttribute('isArray') === 'true') { @@ -269,7 +294,7 @@ module.exports = function (Y) { } } } - function parseObject (node) { + function parseObject (node/* :any */) { var json = {} for (var attrName in node.attrs) { var value = node.attrs[attrName] @@ -280,7 +305,7 @@ module.exports = function (Y) { json[attrName] = int } } - for (var n in node.children) { + for (var n/* :any */ in node.children) { var name = n.name if (n.getAttribute('isArray') === 'true') { json[name] = parseArray(n) diff --git a/src/Database.js b/src/Database.js index 0dbd427c..8c7a1c39 100644 --- a/src/Database.js +++ b/src/Database.js @@ -1,7 +1,7 @@ /* @flow */ 'use strict' -module.exports = function (Y /* : YGlobal */) { +module.exports = function (Y /* :YGlobal */) { /* Partial definition of an OperationStore. TODO: name it Database, operation store only holds operations. @@ -16,7 +16,7 @@ module.exports = function (Y /* : YGlobal */) { */ class AbstractDatabase { /* :: - y: YInstance; + y: YConfig; forwardAppliedOperations: boolean; listenersById: Object; listenersByIdExecuteNow: Array;