added flow support for Connector.js

This commit is contained in:
Kevin Jahns 2015-11-30 12:26:02 +01:00
parent bd9c3813fd
commit da2762edf5
5 changed files with 82 additions and 27 deletions

2
.vscode/launch.json vendored
View File

@ -2,7 +2,7 @@
"version": "0.2.0", "version": "0.2.0",
"configurations": [ "configurations": [
{ {
"name": "Launch", "name": "Test",
"type": "node", "type": "node",
"request": "launch", "request": "launch",
"program": "node_modules/gulp/bin/gulp.js", "program": "node_modules/gulp/bin/gulp.js",

View File

@ -12,6 +12,7 @@ type Struct = {
struct: 'Insert' | 'Delete' struct: 'Insert' | 'Delete'
}*/ }*/
type Struct = Insertion | Deletion type Struct = Insertion | Deletion
type Operation = Struct
type Insertion = { type Insertion = {
id: Id, id: Id,
@ -23,4 +24,30 @@ type Insertion = {
type Deletion = { type Deletion = {
target: Id, target: Id,
struct: 'Delete' struct: 'Delete'
} }
type MessageSyncStep1 = {
type: 'sync step 1',
deleteSet: any,
stateSet: any
}
type MessageSyncStep2 = {
type: 'sync step 2',
os: Array<Operation>,
deleteSet: any,
stateSet: any
}
type MessageUpdate = {
type: 'update',
ops: Array<Operation>
}
type MessageSyncDone = {
type: 'sync done'
}
type Message = MessageSyncStep1 | MessageSyncStep2 | MessageUpdate | MessageSyncDone

View File

@ -4,9 +4,10 @@ type YGlobal = {
utils: Object; utils: Object;
Struct: Object; Struct: Object;
AbstractDatabase: any; AbstractDatabase: any;
AbstractConnector: any;
} }
type YInstance = { type YConfig = {
db: Object, db: Object,
connector: Object, connector: Object,
root: Object root: Object
@ -14,4 +15,6 @@ type YInstance = {
declare var YConcurrency_TestingMode : boolean declare var YConcurrency_TestingMode : boolean
type Transaction<A> = Generator<any, A, any> type Transaction<A> = Generator<any, A, any>
type SyncRole = 'master' | 'slave'

View File

@ -1,7 +1,25 @@
/* @flow */
'use strict' 'use strict'
module.exports = function (Y) { module.exports = function (Y/* :YGlobal */) {
class AbstractConnector { class AbstractConnector {
/* ::
y: YConfig;
role: SyncRole;
connections: Object;
isSynced: boolean;
userEventListeners: Array<Function>;
whenSyncedListeners: Array<Function>;
currentSyncTarget: ?UserId;
syncingClients: Array<any>;
forwardToSyncingClients: boolean;
debug: boolean;
broadcastedHB: boolean;
syncStep2: Promise;
userId: UserId;
send: Function;
broadcast: Function;
*/
/* /*
opts contains the following information: opts contains the following information:
role : String Role of this client ("master" or "slave") role : String Role of this client ("master" or "slave")
@ -119,10 +137,12 @@ module.exports = function (Y) {
var conn = this var conn = this
this.currentSyncTarget = syncUser this.currentSyncTarget = syncUser
this.y.db.requestTransaction(function *() { this.y.db.requestTransaction(function *() {
var stateSet = yield* this.getStateSet()
var deleteSet = yield* this.getDeleteSet()
conn.send(syncUser, { conn.send(syncUser, {
type: 'sync step 1', type: 'sync step 1',
stateSet: yield* this.getStateSet(), stateSet: stateSet,
deleteSet: yield* this.getDeleteSet() deleteSet: deleteSet
}) })
}) })
} else { } else {
@ -139,22 +159,23 @@ module.exports = function (Y) {
} }
send (uid, message) { send (uid, message) {
if (this.debug) { if (this.debug) {
console.log(`send ${this.userId} -> ${uid}: ${message.type}`, m) // eslint-disable-line console.log(`send ${this.userId} -> ${uid}: ${message.type}`, message) // eslint-disable-line
} }
} }
/* /*
You received a raw message, and you know that it is intended for Yjs. Then call this function. You received a raw message, and you know that it is intended for Yjs. Then call this function.
*/ */
receiveMessage (sender, m) { receiveMessage (sender/* :UserId */, message/* :Message */) {
if (sender === this.userId) { if (sender === this.userId) {
return return
} }
if (this.debug) { if (this.debug) {
console.log(`receive ${sender} -> ${this.userId}: ${m.type}`, JSON.parse(JSON.stringify(m))) // eslint-disable-line console.log(`receive ${sender} -> ${this.userId}: ${message.type}`, JSON.parse(JSON.stringify(message))) // eslint-disable-line
} }
if (m.type === 'sync step 1') { if (message.type === 'sync step 1') {
// TODO: make transaction, stream the ops // TODO: make transaction, stream the ops
let conn = this let conn = this
let m = message
this.y.db.requestTransaction(function *() { this.y.db.requestTransaction(function *() {
var currentStateSet = yield* this.getStateSet() var currentStateSet = yield* this.getStateSet()
yield* this.applyDeleteSet(m.deleteSet) yield* this.applyDeleteSet(m.deleteSet)
@ -176,7 +197,7 @@ module.exports = function (Y) {
conn.send(sender, { conn.send(sender, {
type: 'sync done' type: 'sync done'
}) })
}, conn.syncingClientDuration) }, 5000) // TODO: conn.syncingClientDuration)
} else { } else {
conn.send(sender, { conn.send(sender, {
type: 'sync done' type: 'sync done'
@ -184,46 +205,50 @@ module.exports = function (Y) {
} }
conn._setSyncedWith(sender) conn._setSyncedWith(sender)
}) })
} else if (m.type === 'sync step 2') { } else if (message.type === 'sync step 2') {
let conn = this let conn = this
var broadcastHB = !this.broadcastedHB var broadcastHB = !this.broadcastedHB
this.broadcastedHB = true this.broadcastedHB = true
var db = this.y.db var db = this.y.db
var defer = Promise.defer() var defer = {}
defer.promise = new Promise(function (resolve) {
defer.resolve = resolve
})
this.syncStep2 = defer.promise this.syncStep2 = defer.promise
let m /* :MessageSyncStep2 */ = message
db.requestTransaction(function * () { db.requestTransaction(function * () {
yield* this.applyDeleteSet(m.deleteSet) yield* this.applyDeleteSet(m.deleteSet)
this.store.apply(m.os) this.store.apply(m.os)
db.requestTransaction(function * () { db.requestTransaction(function * () {
var ops = yield* this.getOperations(m.stateSet) var ops = yield* this.getOperations(m.stateSet)
if (ops.length > 0) { if (ops.length > 0) {
m = { var update /* :MessageUpdate */ = {
type: 'update', type: 'update',
ops: ops ops: ops
} }
if (!broadcastHB) { // TODO: consider to broadcast here.. if (!broadcastHB) { // TODO: consider to broadcast here..
conn.send(sender, m) conn.send(sender, update)
} else { } else {
// broadcast only once! // broadcast only once!
conn.broadcast(m) conn.broadcast(update)
} }
} }
defer.resolve() defer.resolve()
}) })
}) })
} else if (m.type === 'sync done') { } else if (message.type === 'sync done') {
var self = this var self = this
this.syncStep2.then(function () { this.syncStep2.then(function () {
self._setSyncedWith(sender) self._setSyncedWith(sender)
}) })
} else if (m.type === 'update') { } else if (message.type === 'update') {
if (this.forwardToSyncingClients) { if (this.forwardToSyncingClients) {
for (var client of this.syncingClients) { for (var client of this.syncingClients) {
this.send(client, m) this.send(client, message)
} }
} }
if (this.y.db.forwardAppliedOperations) { if (this.y.db.forwardAppliedOperations) {
var delops = m.ops.filter(function (o) { var delops = message.ops.filter(function (o) {
return o.struct === 'Delete' return o.struct === 'Delete'
}) })
if (delops.length > 0) { if (delops.length > 0) {
@ -233,7 +258,7 @@ module.exports = function (Y) {
}) })
} }
} }
this.y.db.apply(m.ops) this.y.db.apply(message.ops)
} }
} }
_setSyncedWith (user) { _setSyncedWith (user) {
@ -259,7 +284,7 @@ module.exports = function (Y) {
does not support primitive values as array elements does not support primitive values as array elements
expects an ltx (less than xml) object expects an ltx (less than xml) object
*/ */
parseMessageFromXml (m) { parseMessageFromXml (m/* :any */) {
function parseArray (node) { function parseArray (node) {
for (var n of node.children) { for (var n of node.children) {
if (n.getAttribute('isArray') === 'true') { if (n.getAttribute('isArray') === 'true') {
@ -269,7 +294,7 @@ module.exports = function (Y) {
} }
} }
} }
function parseObject (node) { function parseObject (node/* :any */) {
var json = {} var json = {}
for (var attrName in node.attrs) { for (var attrName in node.attrs) {
var value = node.attrs[attrName] var value = node.attrs[attrName]
@ -280,7 +305,7 @@ module.exports = function (Y) {
json[attrName] = int json[attrName] = int
} }
} }
for (var n in node.children) { for (var n/* :any */ in node.children) {
var name = n.name var name = n.name
if (n.getAttribute('isArray') === 'true') { if (n.getAttribute('isArray') === 'true') {
json[name] = parseArray(n) json[name] = parseArray(n)

View File

@ -1,7 +1,7 @@
/* @flow */ /* @flow */
'use strict' 'use strict'
module.exports = function (Y /* : YGlobal */) { module.exports = function (Y /* :YGlobal */) {
/* /*
Partial definition of an OperationStore. Partial definition of an OperationStore.
TODO: name it Database, operation store only holds operations. TODO: name it Database, operation store only holds operations.
@ -16,7 +16,7 @@ module.exports = function (Y /* : YGlobal */) {
*/ */
class AbstractDatabase { class AbstractDatabase {
/* :: /* ::
y: YInstance; y: YConfig;
forwardAppliedOperations: boolean; forwardAppliedOperations: boolean;
listenersById: Object; listenersById: Object;
listenersByIdExecuteNow: Array<Object>; listenersByIdExecuteNow: Array<Object>;