save state in FilePersistence
This commit is contained in:
@@ -1,7 +1,7 @@
|
||||
import BinaryEncoder from '../../Util/Binary/Encoder.mjs'
|
||||
/* global WebSocket */
|
||||
import NamedEventHandler from '../../Util/NamedEventHandler.mjs'
|
||||
import decodeMessage, { messageSS, messageSubscribe, messageStructs, messageGetSS } from './decodeMessage.mjs'
|
||||
import decodeMessage, { messageSS, messageSubscribe, messageStructs } from './decodeMessage.mjs'
|
||||
import { createMutualExclude } from '../../Util/mutualExclude.mjs'
|
||||
|
||||
export const STATE_CONNECTING = 0
|
||||
@@ -61,7 +61,6 @@ export default class WebsocketsConnector extends NamedEventHandler {
|
||||
const encoder = new BinaryEncoder()
|
||||
for (const [roomName, room] of this._rooms) {
|
||||
const y = room.y
|
||||
messageGetSS(roomName, y, encoder)
|
||||
messageSS(roomName, y, encoder)
|
||||
messageSubscribe(roomName, y, encoder)
|
||||
}
|
||||
@@ -69,7 +68,7 @@ export default class WebsocketsConnector extends NamedEventHandler {
|
||||
}
|
||||
|
||||
send (encoder) {
|
||||
if (encoder.length > 0) {
|
||||
if (encoder.length > 0 && this._socket.readyState === WebSocket.OPEN) {
|
||||
this._socket.send(encoder.createBuffer())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -57,7 +57,7 @@ export function messageStructs (roomName, y, encoder, structsBinaryEncoder) {
|
||||
* @param {*} message The binary encoded message
|
||||
* @param {*} ws The connection object
|
||||
*/
|
||||
export default function decodeMessage (connector, message, ws) {
|
||||
export default function decodeMessage (connector, message, ws, isServer = false) {
|
||||
const decoder = new BinaryDecoder(message)
|
||||
const encoder = new BinaryEncoder()
|
||||
while (decoder.hasContent()) {
|
||||
@@ -77,13 +77,25 @@ export default function decodeMessage (connector, message, ws) {
|
||||
messageSS(roomName, y, encoder)
|
||||
break
|
||||
case CONTENT_SUBSCRIBE:
|
||||
room.connections.add(ws)
|
||||
connector.subscribe(roomName, ws)
|
||||
break
|
||||
case CONTENT_SS:
|
||||
// received state set
|
||||
// reply with missing content
|
||||
const ss = readStateSet(decoder)
|
||||
messageStructsDSS(roomName, y, encoder, ss)
|
||||
const sendStructsDSS = () => {
|
||||
const encoder = new BinaryEncoder()
|
||||
messageStructsDSS(roomName, y, encoder, ss)
|
||||
if (isServer) {
|
||||
messageSS(roomName, y, encoder)
|
||||
}
|
||||
connector.send(encoder, ws)
|
||||
}
|
||||
if (room.persistenceLoaded !== undefined) {
|
||||
room.persistenceLoaded.then(sendStructsDSS)
|
||||
} else {
|
||||
sendStructsDSS()
|
||||
}
|
||||
break
|
||||
case CONTENT_STRUCTS_DSS:
|
||||
connector._mutualExclude(() => {
|
||||
|
||||
@@ -2,9 +2,10 @@ import Y from '../../Y.mjs'
|
||||
import uws from 'uws'
|
||||
import BinaryEncoder from '../../Util/Binary/Encoder.mjs'
|
||||
import decodeMessage, { messageStructs } from './decodeMessage.mjs'
|
||||
import FilePersistence from '../../Persistences/FilePersistence.mjs'
|
||||
|
||||
const WebsocketsServer = uws.Server
|
||||
|
||||
const persistence = new FilePersistence('.yjsPersisted')
|
||||
/**
|
||||
* Maps from room-name to ..
|
||||
* {
|
||||
@@ -17,7 +18,11 @@ const rooms = new Map()
|
||||
* Maps from ws-connection to Set<roomName> - the set of connected roomNames
|
||||
*/
|
||||
const connections = new Map()
|
||||
const wss = new WebsocketsServer({ port: 1234 })
|
||||
const port = process.env.PORT || 1234
|
||||
const wss = new WebsocketsServer({
|
||||
port,
|
||||
perMessageDeflate: {}
|
||||
})
|
||||
|
||||
/**
|
||||
* Set of room names that are scheduled to be sweeped (destroyed because they don't have a connection anymore)
|
||||
@@ -28,14 +33,23 @@ setInterval(function sweepRoomes () {
|
||||
const room = rooms.get(roomName)
|
||||
if (room !== undefined) {
|
||||
if (room.connections.size === 0) {
|
||||
room.y.destroy()
|
||||
persistence.saveState(roomName, room.y).then(() => {
|
||||
if (room.connections.size === 0) {
|
||||
room.y.destroy()
|
||||
rooms.delete(roomName)
|
||||
}
|
||||
})
|
||||
}
|
||||
rooms.delete(roomName)
|
||||
}
|
||||
})
|
||||
scheduledSweeps.clear()
|
||||
}, 5000)
|
||||
|
||||
const wsConnector = {
|
||||
send: (encoder, ws) => {
|
||||
const message = encoder.createBuffer()
|
||||
ws.send(message, null, null, true)
|
||||
},
|
||||
_mutualExclude: f => { f() },
|
||||
subscribe: function subscribe (roomName, ws) {
|
||||
let roomNames = connections.get(ws)
|
||||
@@ -44,26 +58,35 @@ const wsConnector = {
|
||||
connections.set(ws, roomNames)
|
||||
}
|
||||
roomNames.add(roomName)
|
||||
const room = this.getRoom(roomName)
|
||||
room.connections.add(ws)
|
||||
},
|
||||
getRoom: function getRoom (roomName) {
|
||||
let room = rooms.get(roomName)
|
||||
if (room === undefined) {
|
||||
const y = new Y(roomName, null, null, { gc: true })
|
||||
const persistenceLoaded = persistence.readState(roomName, y)
|
||||
y.on('afterTransaction', (y, transaction) => {
|
||||
if (transaction.encodedStructsLen > 0) {
|
||||
const encoder = new BinaryEncoder()
|
||||
messageStructs(roomName, y, encoder, transaction.encodedStructs)
|
||||
const message = encoder.createBuffer()
|
||||
// when changed, broakcast update to all connections
|
||||
room.connections.forEach(conn => {
|
||||
conn.send(message)
|
||||
// save to persistence
|
||||
persistence.saveUpdate(roomName, y, transaction.encodedStructs)
|
||||
// forward update to clients
|
||||
persistence._mutex(() => { // do not broadcast if persistence.readState is called
|
||||
const encoder = new BinaryEncoder()
|
||||
messageStructs(roomName, y, encoder, transaction.encodedStructs)
|
||||
const message = encoder.createBuffer()
|
||||
// when changed, broakcast update to all connections
|
||||
room.connections.forEach(conn => {
|
||||
conn.send(message, null, null, true)
|
||||
})
|
||||
})
|
||||
}
|
||||
})
|
||||
room = {
|
||||
name: roomName,
|
||||
connections: new Set(),
|
||||
y
|
||||
y,
|
||||
persistenceLoaded
|
||||
}
|
||||
rooms.set(roomName, room)
|
||||
}
|
||||
@@ -74,9 +97,9 @@ const wsConnector = {
|
||||
wss.on('connection', (ws) => {
|
||||
ws.on('message', function onWSMessage (message) {
|
||||
if (message.byteLength > 0) {
|
||||
const reply = decodeMessage(wsConnector, message, ws)
|
||||
const reply = decodeMessage(wsConnector, message, ws, true)
|
||||
if (reply.length > 0) {
|
||||
ws.send(reply.createBuffer())
|
||||
ws.send(reply.createBuffer(), null, null, true)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
@@ -1 +1,67 @@
|
||||
import fs from 'fs'
|
||||
import path from 'path'
|
||||
import BinaryDecoder from '../Util/Binary/Decoder.mjs'
|
||||
import BinaryEncoder from '../Util/Binary/Encoder.mjs'
|
||||
import { createMutualExclude } from '../Util/mutualExclude.mjs'
|
||||
import { encodeUpdate, encodeStructsDS, decodePersisted } from './decodePersisted.mjs'
|
||||
|
||||
function createFilePath (persistence, roomName) {
|
||||
return path.join(persistence.dir, roomName)
|
||||
}
|
||||
|
||||
export default class FilePersistence {
|
||||
constructor (dir) {
|
||||
this.dir = dir
|
||||
this._mutex = createMutualExclude()
|
||||
}
|
||||
saveUpdate (room, y, encodedStructs) {
|
||||
return new Promise((resolve, reject) => {
|
||||
this._mutex(() => {
|
||||
const filePath = createFilePath(this, room)
|
||||
const updateMessage = new BinaryEncoder()
|
||||
encodeUpdate(y, encodedStructs, updateMessage)
|
||||
fs.appendFile(filePath, Buffer.from(updateMessage.createBuffer()), (err) => {
|
||||
if (err !== null) {
|
||||
reject(err)
|
||||
} else {
|
||||
resolve()
|
||||
}
|
||||
})
|
||||
}, resolve)
|
||||
})
|
||||
}
|
||||
saveState (roomName, y) {
|
||||
return new Promise((resolve, reject) => {
|
||||
const encoder = new BinaryEncoder()
|
||||
encodeStructsDS(y, encoder)
|
||||
const filePath = createFilePath(this, roomName)
|
||||
fs.writeFile(filePath, Buffer.from(encoder.createBuffer()), (err) => {
|
||||
if (err !== null) {
|
||||
reject(err)
|
||||
} else {
|
||||
resolve()
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
readState (roomName, y) {
|
||||
// Check if the file exists in the current directory.
|
||||
return new Promise((resolve, reject) => {
|
||||
const filePath = path.join(this.dir, roomName)
|
||||
fs.readFile(filePath, (err, data) => {
|
||||
if (err !== null) {
|
||||
resolve()
|
||||
// reject(err)
|
||||
} else {
|
||||
this._mutex(() => {
|
||||
console.info(`unpacking data (${data.length})`)
|
||||
console.time('unpacking')
|
||||
decodePersisted(y, new BinaryDecoder(data))
|
||||
console.timeEnd('unpacking')
|
||||
})
|
||||
resolve()
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -117,12 +117,16 @@ export default class BinaryDecoder {
|
||||
*/
|
||||
readVarString () {
|
||||
let len = this.readVarUint()
|
||||
let bytes = new Array(len)
|
||||
let encodedString = ''
|
||||
//let bytes = new Array(len)
|
||||
for (let i = 0; i < len; i++) {
|
||||
bytes[i] = this.uint8arr[this.pos++]
|
||||
//bytes[i] = this.uint8arr[this.pos++]
|
||||
// encodedString += String.fromCodePoint(this.uint8arr[this.pos++])
|
||||
encodedString += String(this.uint8arr[this.pos++])
|
||||
}
|
||||
let encodedString = bytes.map(b => String.fromCodePoint(b)).join('')
|
||||
return decodeURIComponent(escape(encodedString))
|
||||
//let encodedString = String.fromCodePoint.apply(null, bytes)
|
||||
//return decodeURIComponent(escape(encodedString))
|
||||
return encodedString
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
|
||||
// TODO: rename mutex
|
||||
|
||||
/**
|
||||
@@ -19,7 +18,7 @@
|
||||
*/
|
||||
export function createMutualExclude () {
|
||||
var token = true
|
||||
return function mutualExclude (f) {
|
||||
return function mutualExclude (f, g) {
|
||||
if (token) {
|
||||
token = false
|
||||
try {
|
||||
@@ -28,6 +27,8 @@ export function createMutualExclude () {
|
||||
console.error(e)
|
||||
}
|
||||
token = true
|
||||
} else if (g !== undefined) {
|
||||
g()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user