Map type works with simple update & sync. now going to implement support for syncing existing operation buffers

This commit is contained in:
Kevin Jahns 2015-06-30 15:44:14 +02:00
parent bffbb6ca27
commit b25977be06
9 changed files with 200 additions and 80 deletions

View File

@ -1,3 +1,4 @@
build/ build/
y.js y.js
y.js.map
interfaces/ interfaces/

View File

@ -22,6 +22,9 @@ class AbstractConnector { //eslint-disable-line no-unused-vars
this.userEventListeners = []; this.userEventListeners = [];
this.whenSyncedListeners = []; this.whenSyncedListeners = [];
this.currentSyncTarget = null; this.currentSyncTarget = null;
this.syncingClients = [];
this.forwardToSyncingClients = (opts.forwardToSyncingClients === false) ? false : true;
this.debug = opts.debug ? true : false;
} }
setUserId (userId) { setUserId (userId) {
this.userId = userId; this.userId = userId;
@ -61,6 +64,9 @@ class AbstractConnector { //eslint-disable-line no-unused-vars
role: role role: role
}); });
} }
if (this.currentSyncTarget == null) {
this.findNextSyncTarget();
}
} }
// Execute a function _when_ we are connected. // Execute a function _when_ we are connected.
// If not connected, wait until connected // If not connected, wait until connected
@ -78,7 +84,6 @@ class AbstractConnector { //eslint-disable-line no-unused-vars
throw new Error("The current sync has not finished!"); throw new Error("The current sync has not finished!");
} }
var syncUser = null; var syncUser = null;
for (var uid in this.connections) { for (var uid in this.connections) {
syncUser = this.connections[uid]; syncUser = this.connections[uid];
@ -88,7 +93,7 @@ class AbstractConnector { //eslint-disable-line no-unused-vars
} }
if (syncUser != null){ if (syncUser != null){
var conn = this; var conn = this;
this.y.os.requestTransaction(function*(){ this.y.db.requestTransaction(function*(){
conn.currentSyncTarget = uid; conn.currentSyncTarget = uid;
conn.send(uid, { conn.send(uid, {
type: "sync step 1", type: "sync step 1",
@ -107,11 +112,14 @@ class AbstractConnector { //eslint-disable-line no-unused-vars
return false; return false;
} }
// You received a raw message, and you know that it is intended for to Yjs. Then call this function. // You received a raw message, and you know that it is intended for to Yjs. Then call this function.
receiveMessage (sender, m) { receiveMessage (sender, m){
if (this.debug) {
console.log(`${sender} -> ${this.userId}: ${JSON.stringify(m)}`); //eslint-disable-line
}
if (m.type === "sync step 1") { if (m.type === "sync step 1") {
// TODO: make transaction, stream the ops // TODO: make transaction, stream the ops
let conn = this; let conn = this;
this.os.requestTransaction(function*(){ this.y.db.requestTransaction(function*(){
var ops = yield* this.getOperations(m.stateVector); var ops = yield* this.getOperations(m.stateVector);
var sv = yield* this.getStateVector(); var sv = yield* this.getStateVector();
conn.send(sender, { conn.send(sender, {
@ -119,33 +127,40 @@ class AbstractConnector { //eslint-disable-line no-unused-vars
os: ops, os: ops,
stateVector: sv stateVector: sv
}); });
conn.syncingClients.push(sender); if (this.forwardToSyncingClients) {
setTimeout(function(){ conn.syncingClients.push(sender);
conn.syncingClients = conn.syncingClients.filter(function(cli){ setTimeout(function(){
return cli !== sender; conn.syncingClients = conn.syncingClients.filter(function(cli){
}); return cli !== sender;
conn.send(sender, { });
type: "sync done" conn.send(sender, {
}); type: "sync done"
}, conn.syncingClientDuration); });
}, conn.syncingClientDuration);
}
}); });
} else if (m.type === "sync step 2") { } else if (m.type === "sync step 2") {
this.y.db.apply(m.os);
let conn = this; let conn = this;
this.os.requestTransaction(function*(){ this.y.db.requestTransaction(function*(){
var ops = yield* this.getOperations(m.stateVector); var ops = yield* this.getOperations(m.stateVector);
conn.broadcast({ if (ops.length > 0) {
type: "update", conn.broadcast({
ops: ops type: "update",
}); ops: ops
});
}
}); });
} else if (m.type === "sync done") { } else if (m.type === "sync done") {
this.connections[sender].isSynced = true; this.connections[sender].isSynced = true;
this.findNextSyncTarget(); this.findNextSyncTarget();
} else if (m.type === "update") { } else if (m.type === "update") {
for (var client of this.syncingClients) { if (this.forwardToSyncingClients) {
this.send(client, m); for (var client of this.syncingClients) {
this.send(client, m);
}
} }
this.os.apply(m.ops); this.y.db.apply(m.ops);
} }
} }
// Currently, the HB encodes operations as JSON. For the moment I want to keep it // Currently, the HB encodes operations as JSON. For the moment I want to keep it

View File

@ -15,7 +15,7 @@ function getRandom (o) {
var globalRoom = { var globalRoom = {
users: {}, users: {},
buffers: {}, buffers: {},
removeUser: function(user){ removeUser: function(user : AbstractConnector){
for (var i in this.users) { for (var i in this.users) {
this.users[i].userLeft(user); this.users[i].userLeft(user);
@ -24,15 +24,18 @@ var globalRoom = {
delete this.buffers[user]; delete this.buffers[user];
}, },
addUser: function(connector){ addUser: function(connector){
for (var u of this.users) {
u.userJoined(connector.userId);
}
this.users[connector.userId] = connector; this.users[connector.userId] = connector;
this.buffers[connector.userId] = []; this.buffers[connector.userId] = [];
for (var uname in this.users) {
if (uname !== connector.userId) {
var u = this.users[uname];
u.userJoined(connector.userId, "master");
connector.userJoined(u.userId, "master");
}
}
} }
}; };
function flushOne(){
setInterval(function(){
var bufs = []; var bufs = [];
for (var i in globalRoom.buffers) { for (var i in globalRoom.buffers) {
if (globalRoom.buffers[i].length > 0) { if (globalRoom.buffers[i].length > 0) {
@ -41,11 +44,15 @@ setInterval(function(){
} }
if (bufs.length > 0) { if (bufs.length > 0) {
var userId = getRandom(bufs); var userId = getRandom(bufs);
var m = globalRoom.buffers[userId]; var m = globalRoom.buffers[userId].shift();
var user = globalRoom.users[userId]; var user = globalRoom.users[userId];
user.receiveMessage(m); user.receiveMessage(m[0], m[1]);
return true;
} else {
return false;
} }
}, 10); }
setInterval(flushOne, 10);
var userIdCounter = 0; var userIdCounter = 0;
@ -54,23 +61,30 @@ class Test extends AbstractConnector {
if(options === undefined){ if(options === undefined){
throw new Error("Options must not be undefined!"); throw new Error("Options must not be undefined!");
} }
super(y, { options.role = "master";
role: "master" options.forwardToSyncingClients = false;
}); super(y, options);
this.setUserId((userIdCounter++) + ""); this.setUserId((userIdCounter++) + "");
globalRoom.addUser(this);
this.globalRoom = globalRoom;
} }
send (uid, message) { send (userId, message) {
globalRoom.buffers[uid].push(message); globalRoom.buffers[userId].push([this.userId, message]);
} }
broadcast (message) { broadcast (message) {
for (var buf of globalRoom.buffers) { for (var key in globalRoom.buffers) {
buf.push(message); globalRoom.buffers[key].push([this.userId, message]);
} }
} }
disconnect () { disconnect () {
globalRoom.removeUser(this.userId); globalRoom.removeUser(this.userId);
} }
flushAll () {
var c = true;
while (c) {
c = flushOne();
}
}
} }
Y.Test = Test; Y.Test = Test;

View File

@ -53,9 +53,10 @@ class AbstractOperationStore { //eslint-disable-line no-unused-vars
this.userId = userId; this.userId = userId;
} }
apply (ops) { apply (ops) {
for (var o of ops) { for (var key in ops) {
var required = Y.Struct[o.type].requiredOps(o); var o = ops[key];
this.whenOperationsExist(required, Y.Struct[o.type].execute, o); var required = Y.Struct[o.struct].requiredOps(o);
this.whenOperationsExist(required, Y.Struct[o.struct].execute, o);
} }
} }
// f is called as soon as every operation requested is available. // f is called as soon as every operation requested is available.
@ -68,7 +69,8 @@ class AbstractOperationStore { //eslint-disable-line no-unused-vars
missing: ids.length missing: ids.length
}; };
for (let id of ids) { for (let key in ids) {
let id = ids[key];
let sid = JSON.stringify(id); let sid = JSON.stringify(id);
let l = this.listenersById[sid]; let l = this.listenersById[sid];
if (l == null){ if (l == null){
@ -100,8 +102,9 @@ class AbstractOperationStore { //eslint-disable-line no-unused-vars
store.listenersByIdRequestPending = false; store.listenersByIdRequestPending = false;
for (let listener of exeNow) { for (let key in exeNow) {
yield* listener.f.apply(this, listener.args); let listener = exeNow[key];
yield* listener.f.call(this, listener.args);
} }
for (var sid in ls){ for (var sid in ls){
@ -110,9 +113,10 @@ class AbstractOperationStore { //eslint-disable-line no-unused-vars
if ((yield* this.getOperation(id)) == null){ if ((yield* this.getOperation(id)) == null){
store.listenersById[sid] = l; store.listenersById[sid] = l;
} else { } else {
for (let listener of l) { for (let key in l) {
let listener = l[key];
if (--listener.missing === 0){ if (--listener.missing === 0){
yield* listener.f.apply(this, listener.args); yield* listener.f.call(this, listener.args);
} }
} }
} }
@ -122,9 +126,10 @@ class AbstractOperationStore { //eslint-disable-line no-unused-vars
// called by a transaction when an operation is added // called by a transaction when an operation is added
operationAdded (op) { operationAdded (op) {
// notify whenOperation listeners (by id) // notify whenOperation listeners (by id)
var l = this.listenersById[op.id]; var l = this.listenersById[JSON.stringify(op.id)];
if (l != null) { if (l != null) {
for (var listener of l){ for (var key in l){
var listener = l[key];
if (--listener.missing === 0){ if (--listener.missing === 0){
this.whenOperationsExist([], listener.f, listener.args); this.whenOperationsExist([], listener.f, listener.args);
} }
@ -152,7 +157,7 @@ class AbstractOperationStore { //eslint-disable-line no-unused-vars
store.parentListenersActivated = {}; store.parentListenersActivated = {};
for (var parentId in activatedOperations){ for (var parentId in activatedOperations){
var parent = yield* this.getOperation(parentId); var parent = yield* this.getOperation(parentId);
Struct[parent.type].notifyObservers(activatedOperations[parentId]); Struct[parent.struct].notifyObservers(activatedOperations[parentId]);
} }
}); });

View File

@ -66,6 +66,9 @@ Y.Memory = (function(){ //eslint-disable-line no-unused-vars
var endSV : StateVector = yield* this.getStateVector(); var endSV : StateVector = yield* this.getStateVector();
for (var endState of endSV) { for (var endState of endSV) {
var user = endState.user; var user = endState.user;
if (user === "_") {
continue;
}
var startPos = startSS[user] || 0; var startPos = startSS[user] || 0;
var endPos = endState.clock; var endPos = endState.clock;

View File

@ -40,7 +40,11 @@ var Struct = {
var user = this.store.y.connector.userId; var user = this.store.y.connector.userId;
var state = yield* this.getState(user); var state = yield* this.getState(user);
op.id = [user, state.clock]; op.id = [user, state.clock];
return yield* this.addOperation(op); yield* this.addOperation(op);
this.store.y.connector.broadcast({
type: "update",
ops: [op]
});
} }
}, },
Insert: { Insert: {
@ -91,21 +95,25 @@ var Struct = {
} }
return op; return op;
}, },
requiredOps: function(op, ids){ requiredOps: function(op){
var ids = [];
if(op.left != null){ if(op.left != null){
ids.push(op.left); ids.push(op.left);
} }
if(op.right != null){ if(op.right != null){
ids.push(op.right); ids.push(op.right);
} }
if(op.right == null && op.left == null) {
ids.push(op.parent);
}
return ids; return ids;
}, },
getDistanceToOrigin: function *(op){ getDistanceToOrigin: function *(op){
var d = 0; var d = 0;
var o = yield this.getOperation(op.left); var o = yield* this.getOperation(op.left);
while (op.origin !== (o ? o.id : null)) { while (op.origin !== (o ? o.id : null)) {
d++; d++;
o = yield this.getOperation(o.left); o = yield* this.getOperation(o.left);
} }
return d; return d;
}, },
@ -125,13 +133,50 @@ var Struct = {
# $this insert_position is to the left of $o (forever!) # $this insert_position is to the left of $o (forever!)
*/ */
execute: function*(op){ execute: function*(op){
var distanceToOrigin = yield* Struct.Insert.getDistanceToOrigin(op); // most cases: 0 (starts from 0) var distanceToOrigin = yield* Struct.Insert.getDistanceToOrigin.call(this, op); // most cases: 0 (starts from 0)
var i = distanceToOrigin; // loop counter var i = distanceToOrigin; // loop counter
var o = yield* this.getOperation(this.left); var o, tmp;
o = yield* this.getOperation(o.right); if (op.right == null && op.left == null) {
var tmp; var p = yield* this.getOperation(op.parent);
if (op.parentSub != null) {
tmp = p.map[op.parentSub];
if (!compareIds(tmp, op.id)) {
op.right = tmp;
}
if (op.right == null) {
// this is the first ins in parent
p.map[op.parentSub] = op.id;
yield* this.setOperation(p);
yield* this.setOperation(op);
return;
}
} else {
tmp = p.start;
if (!compareIds(tmp, op.id)) {
op.left = tmp;
}
if (op.left == null) {
// this is the first ins in parent
p.start = op.id;
p.end = op.id;
yield* this.setOperation(p);
yield* this.setOperation(op);
return;
}
}
}
if (op.left != null) {
o = yield* this.getOperation(op.left);
o = yield* this.getOperation(o.right);
} else if (op.right != null) {
o = yield* this.getOperation(op.right);
while (o.left != null){
o = yield* this.getOperation(o.left);
}
}
while (true) { while (true) {
if (o.id !== this.right){ if (o.id !== op.right){
if (Struct.Insert.getDistanceToOrigin(o) === i) { if (Struct.Insert.getDistanceToOrigin(o) === i) {
// case 1 // case 1
if (o.id[0] < op.id[0]) { if (o.id[0] < op.id[0]) {
@ -158,21 +203,19 @@ var Struct = {
var left = null; var left = null;
var right = null; var right = null;
if (op.left != null) { if (op.left != null) {
left = this.getOperation(op.left); left = yield* this.getOperation(op.left);
left.right = op.id; left.right = op.id;
yield* this.setOperation(left); yield* this.setOperation(left);
} }
if (op.right != null) { if (op.right != null) {
right = this.getOperation(op.right); right = yield* this.getOperation(op.right);
right.left = op.id; right.left = op.id;
yield* this.setOperation(right); yield* this.setOperation(right);
} }
op.left = left;
op.right = right;
yield* this.setOperation(op); yield* this.setOperation(op);
// notify parent // notify parent
var parent = this.getOperation(op.parent); var parent = yield* this.getOperation(op.parent);
if (op.parentSub != null) { if (op.parentSub != null) {
if (right == null) { if (right == null) {
parent.map[op.parentSub] = op.id; parent.map[op.parentSub] = op.id;
@ -189,6 +232,7 @@ var Struct = {
yield* this.setOperation(parent); yield* this.setOperation(parent);
} }
} }
yield* this.setOperation(op);
} }
}, },
List: { List: {
@ -198,7 +242,8 @@ var Struct = {
op.struct = "List"; op.struct = "List";
return yield* Struct.Operation.create.call(this, op); return yield* Struct.Operation.create.call(this, op);
}, },
requiredOps: function(op, ids){ requiredOps: function(op){
var ids = [];
if (op.start != null) { if (op.start != null) {
ids.push(op.start); ids.push(op.start);
} }
@ -207,8 +252,8 @@ var Struct = {
} }
return ids; return ids;
}, },
execute: function* () { execute: function* (op) {
// nop yield* this.setOperation(op);
}, },
ref: function* (op : Op, pos : number) : Insert { ref: function* (op : Op, pos : number) : Insert {
var o = op.start; var o = op.start;
@ -231,11 +276,11 @@ var Struct = {
insert: function* (op, pos : number, contents : Array<any>) { insert: function* (op, pos : number, contents : Array<any>) {
var o = yield* Struct.List.ref.call(this, op, pos); var o = yield* Struct.List.ref.call(this, op, pos);
var or = yield* this.getOperation(o.right); var or = yield* this.getOperation(o.right);
for (var content of contents) { for (var key in contents) {
var insert = { var insert = {
left: o, left: o,
right: or, right: or,
content: content, content: contents[key],
parent: op parent: op
}; };
o = yield* Struct.Insert.create.call(this, insert); o = yield* Struct.Insert.create.call(this, insert);
@ -253,17 +298,19 @@ var Struct = {
op.struct = "Map"; op.struct = "Map";
return yield* Struct.Operation.create.call(this, op); return yield* Struct.Operation.create.call(this, op);
}, },
requiredOps: function(op, ids){ requiredOps: function(op){
for (var end of op.map) { var ids = [];
ids.push(end); for (var end in op.map) {
ids.push(op.map[end]);
} }
return ids; return ids;
}, },
execute: function* () { execute: function* (op) {
// nop yield* this.setOperation(op);
}, },
get: function* (op, name) { get: function* (op, name) {
return (yield* this.getOperation(op.map[name])).content; var res = yield* this.getOperation(op.map[name]);
return (res != null) ? res.content : void 0;
}, },
set: function* (op, name, value) { set: function* (op, name, value) {
var end = op.map[name]; var end = op.map[name];

View File

@ -11,7 +11,8 @@ describe("Yjs (basic)", function(){
name: "Memory" name: "Memory"
}, },
connector: { connector: {
name: "Test" name: "Test",
debug: true
} }
})); }));
} }
@ -29,12 +30,45 @@ describe("Yjs (basic)", function(){
done(); done();
}); });
}); });
it("Basic get&set of Map property", function(done){ it("Basic get&set of Map property (converge via sync)", function(done){
var y = this.users[0]; var y = this.users[0];
y.transact(function*(){ y.transact(function*(){
yield* y.root.val("stuff", "stuffy"); yield* y.root.val("stuff", "stuffy");
expect(yield* y.root.val("stuff")).toEqual("stuffy"); expect(yield* y.root.val("stuff")).toEqual("stuffy");
done();
}); });
y.connector.flushAll();
function getTransaction(yy){
return function*(){
expect(yield* yy.root.val("stuff")).toEqual("stuffy");
};
}
for (var key in this.users) {
var u = this.users[key];
u.transact(getTransaction(u));
}
done();
});
it("Basic get&set of Map property (converge via update)", function(done){
var y = this.users[0];
y.connector.flushAll();
y.transact(function*(){
yield* y.root.val("stuff", "stuffy");
expect(yield* y.root.val("stuff")).toEqual("stuffy");
});
function getTransaction(yy){
return function*(){
expect(yield* yy.root.val("stuff")).toEqual("stuffy");
};
}
y.connector.flushAll();
for (var key in this.users) {
var u = this.users[key];
u.transact(getTransaction(u));
}
done();
}); });
}); });

5
y.js

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long