From 8199917335025684e083c13639812101472c751f Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Thu, 20 Jun 2019 13:35:04 +0100 Subject: [PATCH] Heartbeats --- packages/server/package.json | 1 + packages/server/src/client.js | 50 +++++++++++++++++++++++--------- packages/server/src/heartbeat.js | 43 +++++++++++++++++++++++++++ packages/server/src/protocol.js | 6 ++++ packages/server/src/server.js | 10 +++++-- 5 files changed, 94 insertions(+), 16 deletions(-) create mode 100644 packages/server/src/heartbeat.js diff --git a/packages/server/package.json b/packages/server/package.json index a684ecf..89d6af4 100644 --- a/packages/server/package.json +++ b/packages/server/package.json @@ -27,6 +27,7 @@ "@syndicate-lang/driver-http-node": "^0.2.7", "@syndicate-lang/driver-mdns": "^0.2.7", "@syndicate-lang/driver-streams-node": "^0.2.7", + "@syndicate-lang/driver-timer": "^0.2.7", "@syndicate-lang/driver-websocket": "^0.2.7", "debug": "^4.1.1" }, diff --git a/packages/server/src/client.js b/packages/server/src/client.js index b9f822e..9286f58 100644 --- a/packages/server/src/client.js +++ b/packages/server/src/client.js @@ -17,9 +17,11 @@ const { Add, Del, Msg, Err, End, Ping, Pong, makeDecoder, + shouldDebugPrint, } = activate require("./protocol"); const P = activate require("./internal_protocol"); const { recorder } = activate require("./turn"); +const { heartbeat } = activate require("./heartbeat"); assertion type WSServer(url, scope) = Symbol.for('server-websocket-connection'); assertion type Loopback(scope) = Symbol.for('server-loopback-connection'); @@ -39,7 +41,7 @@ Object.assign(module.exports, { ForceServerDisconnect, }); -export function _genericClientSessionFacet(addr, scope, w0, debug) { +export function _genericClientSessionFacet(addr, scope, w0, teardown, debug) { if (debug === void 0) { debug = debugFactory('syndicate/server:client:' + genUuid('?')); } @@ -48,10 +50,10 @@ export function _genericClientSessionFacet(addr, scope, w0, debug) { on start debug('+', addr.toString(), scope); on stop debug('-', addr.toString(), scope); - on message _ServerPacket(addr, $m) debug('<', m.toString()); + on message _ServerPacket(addr, $m) if (shouldDebugPrint(m)) debug('<', m.toString()); const w = (x) => { - debug('>', x.toString()); + if (shouldDebugPrint(x)) debug('>', x.toString()); w0(x); }; @@ -96,6 +98,9 @@ export function _genericClientSessionFacet(addr, scope, w0, debug) { this.subs = this.subs.remove(spec); } + const resetHeartbeat = heartbeat(this, ['client', addr, scope], w, teardown); + on message _ServerPacket(addr, _) resetHeartbeat(); + const _instantiate = (m, vs) => Skeleton.instantiateAssertion(FromServer(addr, m.spec), vs); const _lookup = (CTOR, item) => { @@ -140,20 +145,36 @@ spawn named "ServerClientFactory" { during Observe(ServerConnected($addr)) assert ServerConnection(addr); during ServerConnection($addr(WSServer($url, $scope))) spawn named ['ServerClient', addr] { - const wsId = genUuid('ws'); - const debug = debugFactory('syndicate/server:client:' + wsId); + const reestablish = () => { + react { + const establishingFacet = currentFacet(); + const wsId = genUuid('ws'); - during WS.WebSocket(wsId, url, {}) { - on message WS.DataIn(wsId, $data) { - if (data instanceof Bytes) send _ServerPacket(addr, makeDecoder(data).next()); + during WS.WebSocket(wsId, url, {}) { + on message WS.DataIn(wsId, $data) { + if (data instanceof Bytes) send _ServerPacket(addr, makeDecoder(data).next()); + } + + _genericClientSessionFacet.call( + this, + addr, scope, + (x) => { send WS.DataOut(wsId, new Encoder().push(x).contents()); }, + () => { + establishingFacet.stop(() => { + // TODO: abstract this into a flush() function somewhere + const ACK = Symbol('ACK'); + react { + stop on message ACK reestablish(); + on start send ACK; + } + }); + }, + debugFactory('syndicate/server:client:' + wsId)); + } } + }; - _genericClientSessionFacet.call( - this, - addr, scope, - (x) => { send WS.DataOut(wsId, new Encoder().push(x).contents()); }, - debug); - } + on start reestablish(); } during ServerConnection($addr(Loopback($scope))) spawn named ['ServerClient', addr] { @@ -166,6 +187,7 @@ spawn named "ServerClientFactory" { this, addr, scope, (x) => { send P.FromPOA(addr, x); }, + () => { throw new Error("Cannot teardown and reset Loopback connection"); }, debug); } } diff --git a/packages/server/src/heartbeat.js b/packages/server/src/heartbeat.js new file mode 100644 index 0000000..e2d003a --- /dev/null +++ b/packages/server/src/heartbeat.js @@ -0,0 +1,43 @@ +"use strict"; + +const { TimeLaterThan } = activate require("@syndicate-lang/driver-timer"); +const debug = require('debug')('syndicate/server:heartbeat'); +const W = require('./protocol'); + +const PERIOD = 60 * 1000; // milliseconds +const GRACE = 3 * PERIOD; + +function heartbeat(fields, who, sendMessage, teardown) { + debug('Configuring heartbeat', who, PERIOD, GRACE); + + const NEXT_PING_TIME = Symbol('NEXT_PING_TIME'); + const LAST_RECEIVED_TRAFFIC = Symbol('LAST_RECEIVED_TRAFFIC'); + + function now() { return (+(new Date())); } // returns milliseconds + + field fields[NEXT_PING_TIME] = 0; + field fields[LAST_RECEIVED_TRAFFIC] = now(); + + const scheduleNextPing = () => { fields[NEXT_PING_TIME] = now() + PERIOD; }; + + on asserted TimeLaterThan(fields[NEXT_PING_TIME]) { + scheduleNextPing(); + sendMessage(W.Ping()); + } + + on asserted TimeLaterThan(fields[LAST_RECEIVED_TRAFFIC] + GRACE) { + debug('Heartbeat timeout', who, GRACE); + teardown(); + } + + return () => { + scheduleNextPing(); + fields[LAST_RECEIVED_TRAFFIC] = now(); + }; +} + +Object.assign(module.exports, { + PERIOD, + GRACE, + heartbeat, +}); diff --git a/packages/server/src/protocol.js b/packages/server/src/protocol.js index 1d94fbb..73c5321 100644 --- a/packages/server/src/protocol.js +++ b/packages/server/src/protocol.js @@ -29,6 +29,11 @@ function makeDecoder(initialBuffer) { }); } +function shouldDebugPrint(m) { + // return !(Ping.isClassOf(m) || Pong.isClassOf(m)); + return true; +} + Object.assign(module.exports, { Connect, Turn, @@ -36,4 +41,5 @@ Object.assign(module.exports, { Add, Del, Msg, Err, End, Ping, Pong, makeDecoder, + shouldDebugPrint, }); diff --git a/packages/server/src/server.js b/packages/server/src/server.js index 0efb105..7567f65 100644 --- a/packages/server/src/server.js +++ b/packages/server/src/server.js @@ -14,6 +14,7 @@ const P = activate require("./internal_protocol"); const W = activate require("./protocol"); const B = activate require("./buffer"); const { recorder } = activate require("./turn"); +const { heartbeat } = activate require("./heartbeat"); export function websocketServerFacet(reqId) { assert P.POA(reqId); @@ -58,8 +59,8 @@ spawn named '@syndicate-lang/server/server/POAHandler' { const debug = debugFactory('syndicate/server:server:' + connId.toString()); on start debug('+'); on stop debug('-'); - on message P.FromPOA(connId, $m) debug('<', m.toString()); - on message P.ToPOA(connId, $m) debug('>', m.toString()); + on message P.FromPOA(connId, $m) if (W.shouldDebugPrint(m)) debug('<', m.toString()); + on message P.ToPOA(connId, $m) if (W.shouldDebugPrint(m)) debug('>', m.toString()); field this.scope = null; assert P.POAReady(connId); @@ -76,6 +77,11 @@ spawn named '@syndicate-lang/server/server/POAHandler' { const sendToPOA = (m) => { send P.ToPOA(connId, m); }; const outboundTurn = recorder(this, 'commitNeeded', (items) => sendToPOA(W.Turn(items))); + const poaFacet = currentFacet(); + const resetHeartbeat = heartbeat(this, ['server', connId], sendToPOA, () => {poaFacet.stop();}); + on message P.FromPOA(connId, _) resetHeartbeat(); + on message P.FromPOA(connId, W.Ping()) sendToPOA(W.Pong()); + on message P.FromPOA(connId, W.Turn($items)) { items.forEach((item) => { if (W.Assert.isClassOf(item)) {