From 1d8719f6b1f61ce3b870a97aec6c4de2c39aa280 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Thu, 30 May 2019 23:06:15 +0100 Subject: [PATCH] Preserve turn boundaries in distribution protocol; loopback client; much improved debug output --- packages/server/package.json | 3 +- packages/server/src/buffer.js | 20 ++++ packages/server/src/client.js | 119 ++++++++++++++++------- packages/server/src/disco.js | 64 +++++++----- packages/server/src/federation.js | 64 +++++++----- packages/server/src/index.js | 19 ++-- packages/server/src/internal_protocol.js | 6 +- packages/server/src/protocol.js | 3 + packages/server/src/server.js | 62 +++++++++--- packages/server/src/turn.js | 34 +++++++ 10 files changed, 282 insertions(+), 112 deletions(-) create mode 100644 packages/server/src/buffer.js create mode 100644 packages/server/src/turn.js diff --git a/packages/server/package.json b/packages/server/package.json index 327c1c3..e151779 100644 --- a/packages/server/package.json +++ b/packages/server/package.json @@ -27,7 +27,8 @@ "@syndicate-lang/driver-http-node": "^0.1.2", "@syndicate-lang/driver-mdns": "^0.1.1", "@syndicate-lang/driver-streams-node": "^0.1.1", - "@syndicate-lang/driver-websocket": "^0.1.1" + "@syndicate-lang/driver-websocket": "^0.1.1", + "debug": "^4.1.1" }, "main": "lib/index.js", "bin": { diff --git a/packages/server/src/buffer.js b/packages/server/src/buffer.js new file mode 100644 index 0000000..67337fa --- /dev/null +++ b/packages/server/src/buffer.js @@ -0,0 +1,20 @@ +"use strict"; + +import { List } from "@syndicate-lang/core"; + +export function buffer(fields, fieldName) { + field fields[fieldName] = List(); + return { + push: function (item) { + fields[fieldName] = fields[fieldName].push(item); + }, + drain: function (handler) { + dataflow { + if (!fields[fieldName].isEmpty()) { + fields[fieldName].forEach(handler); + fields[fieldName] = List(); + } + } + } + }; +} diff --git a/packages/server/src/client.js b/packages/server/src/client.js index 39195be..ca66376 100644 --- a/packages/server/src/client.js +++ b/packages/server/src/client.js @@ -1,22 +1,28 @@ "use strict"; +const debugFactory = require('debug'); + import { Decoder, Encoder, Bytes, Observe, Skeleton, - genUuid, + genUuid, currentFacet, } from "@syndicate-lang/core"; const WS = activate require("@syndicate-lang/driver-websocket"); const { Connect, Peer, + Commit, Assert, Clear, Message, Add, Del, Msg, Err, Ping, Pong, makeDecoder, } = activate require("./protocol"); +const P = activate require("./internal_protocol"); +const Turn = activate require("./turn"); assertion type WSServer(url, scope) = Symbol.for('server-websocket-connection'); +assertion type Loopback(scope) = Symbol.for('server-loopback-connection'); assertion type ToServer(addr, assertion); assertion type FromServer(addr, assertion); @@ -27,12 +33,68 @@ message type ForceServerDisconnect(addr); message type _ServerPacket(addr, packet); Object.assign(module.exports, { - WSServer, + WSServer, Loopback, ToServer, FromServer, ServerConnection, ServerConnected, ForceServerDisconnect, }); +export function _genericClientSessionFacet(addr, scope, w0, debug) { + if (debug === void 0) { + debug = debugFactory('syndicate/server:client:' + genUuid('?')); + } + + assert ServerConnected(addr); + + on start debug('+', addr.toString(), scope); + on stop debug('-', addr.toString(), scope); + on message _ServerPacket(addr, $m) debug('<', m.toString()); + + const w = (x) => { + debug('>', x.toString()); + w0(x); + }; + + const outboundTurn = Turn.recorder(this, 'commitNeeded', + { + extend: w, + commit: () => { w(Commit()); }, + debug: debug + }); + const inboundTurn = Turn.replayer({ debug: debug }); + + on start w(Connect(scope)); + + during ToServer(addr, $a) { + const ep = genUuid('pub'); + on start outboundTurn.extend(Assert(ep, a)); + on stop outboundTurn.extend(Clear(ep)); + } + + on message ToServer(addr, $a) outboundTurn.extend(Message(a)); + + on message _ServerPacket(addr, Ping()) w(Pong()); + + during Observe(FromServer(addr, $spec)) { + const ep = genUuid('sub'); + on start outboundTurn.extend(Assert(ep, Observe(spec))); + on stop outboundTurn.extend(Clear(ep)); + on message _ServerPacket(addr, Add(ep, $vs)) inboundTurn.extend(() => { + react { + const epFacet = currentFacet(); + assert Skeleton.instantiateAssertion(FromServer(addr, spec), vs); + on message _ServerPacket(addr, Del(ep, vs)) worklist.push(() => { + epFacet.stop(); + }); + } + }) + on message _ServerPacket(addr, Msg(ep, $vs)) inboundTurn.extend(() => { + send Skeleton.instantiateAssertion(FromServer(addr, spec), vs); + }) + on message _ServerPacket(addr, Commit()) inboundTurn.commit(); + } +} + spawn named "ServerClientFactory" { during ToServer($addr, _) assert ServerConnection(addr); during Observe(FromServer($addr, _)) assert ServerConnection(addr); @@ -40,45 +102,32 @@ spawn named "ServerClientFactory" { during ServerConnection($addr(WSServer($url, $scope))) spawn named ['Server', addr] { const wsId = genUuid('server'); + const debug = debugFactory('syndicate/server:client:' + wsId); during WS.WebSocket(wsId, url, {}) { - assert ServerConnected(addr); - - function w(x) { - send WS.DataOut(wsId, new Encoder().push(x).contents()); - } - - on start w(Connect(scope)); - on message WS.DataIn(wsId, $data) { - if (data instanceof Bytes) { - send _ServerPacket(addr, makeDecoder(data).next()); - } + if (data instanceof Bytes) send _ServerPacket(addr, makeDecoder(data).next()); } - during ToServer(addr, $a) { - const ep = genUuid('pub'); - on start w(Assert(ep, a)); - on stop w(Clear(ep)); - } + _genericClientSessionFacet.call( + this, + addr, scope, + (x) => { send WS.DataOut(wsId, new Encoder().push(x).contents()); }, + debug); + } + } - on message ToServer(addr, $a) w(Message(a)); - - on message _ServerPacket(addr, Ping()) w(Pong()); - - during Observe(FromServer(addr, $spec)) { - const ep = genUuid('sub'); - on start w(Assert(ep, Observe(spec))); - on stop w(Clear(ep)); - on message _ServerPacket(addr, Add(ep, $vs)) { - react { - assert Skeleton.instantiateAssertion(FromServer(addr, spec), vs); - stop on message _ServerPacket(addr, Del(ep, vs)); - } - } - on message _ServerPacket(addr, Msg(ep, $vs)) { - send Skeleton.instantiateAssertion(FromServer(addr, spec), vs); - } + during ServerConnection($addr(Loopback($scope))) spawn named ['Server', addr] { + const debug = debugFactory('syndicate/server:client:loopback:' + scope); + assert P.POA(addr); + on message P.ToPOA(addr, $p) send _ServerPacket(addr, p); + on start react { + stop on asserted Observe(P.FromPOA(addr, _)) { + react _genericClientSessionFacet.call( + this, + addr, scope, + (x) => { send P.FromPOA(addr, x); }, + debug); } } } diff --git a/packages/server/src/disco.js b/packages/server/src/disco.js index 3959c37..b8a2d4d 100644 --- a/packages/server/src/disco.js +++ b/packages/server/src/disco.js @@ -27,29 +27,31 @@ const Federation = activate require("./federation"); import { Set, Map, - RandomID, } from "@syndicate-lang/core"; const fs = require('fs'); +const debugFactory = require('debug'); +const debug = debugFactory('syndicate/server:disco'); + spawn named 'peerAdvertisement' { - const localId = RandomID.randomId(8, false); - assert OverlayNode(localId); - console.log('Local node ID is', localId); + during OverlayNode($localId) { + on start debug('Local node ID is', localId); - during Federation.ManagementScope($managementScope) { - during P.Envelope(managementScope, Overlay($overlayId, _)) { - const gatewayId = overlayId + ':' + localId; + during Federation.ManagementScope($managementScope) { + during P.Envelope(managementScope, Overlay($overlayId, _)) { + const gatewayId = overlayId + ':' + localId; - during P.Envelope(managementScope, AvailableTransport(WebSocketTransport($port, $path))) { - assert M.Publish(M.Service(gatewayId, '_syndicate+ws._tcp'), null, port, [ - "path="+path, - "scope="+managementScope - ]); + during P.Envelope(managementScope, AvailableTransport(WebSocketTransport($port, $path))) { + assert M.Publish(M.Service(gatewayId, '_syndicate+ws._tcp'), null, port, [ + "path="+path, + "scope="+managementScope + ]); + } + + // Other variants for later: + // assert M.Publish(M.Service(gatewayId, '_syndicate._tcp'), null, port, []); } - - // Other variants for later: - // assert M.Publish(M.Service(gatewayId, '_syndicate._tcp'), null, port, []); } } } @@ -67,7 +69,7 @@ function txtsToMap(txts) { spawn named 'peerDiscovery' { during M.DefaultGateway($gatewayInterface, $gatewayIp) { - on start console.log('Gateway IP is', gatewayIp, 'on interface', gatewayInterface); + on start debug('Gateway IP is', gatewayIp, 'on interface', gatewayInterface); during M.Discovered(M.Service($name, '_syndicate+ws._tcp'), _, // hostname @@ -88,11 +90,24 @@ spawn named 'peerDiscovery' { } } -spawn named 'helpful info output' { - console.info('Peer discovery running'); +spawn named 'syndicate/server:disco:transport' { + const debug = debugFactory('syndicate/server:disco:transport'); + on asserted AvailableTransport($spec) console.info(spec.toString()); +} + +spawn named 'syndicate/server:disco:mdns' { + const debug = debugFactory('syndicate/server:disco:mdns'); + debug('Peer discovery running'); during Peer($overlayId, $nodeId, $ip, $addr) { - on start console.info("+PEER", ip, overlayId, nodeId, addr.toString()); - on stop console.info("-PEER", ip, overlayId, nodeId, addr.toString()); + on start debug("+", ip, overlayId, nodeId, addr.toString()); + on stop debug("-", ip, overlayId, nodeId, addr.toString()); + } +} + +spawn named 'federationRoutingInfo' { + during Federation.ManagementScope($managementScope) { + // assert P.Proposal(managementScope, Federation.ManagementScope(managementScope)); + during $t(AvailableTransport(_)) assert P.Proposal(managementScope, t); } } @@ -144,14 +159,17 @@ spawn named 'uplinkSelection' { } dataflow if (this.bestAddr) { - console.log('Selected uplink for overlay', overlayId, 'is', this.bestAddr.toString()); + debug('Selected uplink peer for overlay', overlayId, 'is', this.bestPeer.toString(), 'at', this.bestAddr.toString()); } assert P.Proposal(managementScope, Federation.Uplink(overlayId, this.bestAddr, overlayId)) when (this.bestAddr); - assert P.Proposal(overlayId, OverlayLink(OverlayNode(localId), this.bestPeer)) - when (this.bestAddr); + const loopbackAddr = C.Loopback(overlayId); + during C.ServerConnected(loopbackAddr) { + assert C.ToServer(loopbackAddr, OverlayLink(OverlayNode(localId), this.bestPeer)) + when (this.bestAddr); + } } } } diff --git a/packages/server/src/federation.js b/packages/server/src/federation.js index 6f6a197..b16cbae 100644 --- a/packages/server/src/federation.js +++ b/packages/server/src/federation.js @@ -3,6 +3,8 @@ const P = activate require("./internal_protocol"); const W = activate require("./protocol"); const C = activate require("./client"); +const B = activate require("./buffer"); +const debugFactory = require('debug'); assertion type ManagementScope(scope) = Symbol.for('federation-management-scope'); @@ -28,39 +30,36 @@ spawn named '@syndicate-lang/server/federation/UplinkFactory' { { during C.ServerConnected(peerAddr) { const sessionId = genUuid('peer'); + + const debug = debugFactory('syndicate/server:federation:uplink:' + sessionId); + on start debug('+', peerAddr.toString()); + on stop debug('-', peerAddr.toString()); + assert P.Proposal(managementScope, UplinkConnected(link)); assert P.Proposal(managementScope, P.FederatedLink(sessionId, localScope)); assert C.ToServer(peerAddr, P.FederatedLink(sessionId, remoteScope)); - field this.pendingIn = List(); - field this.pendingOut = List(); + const pendingIn = B.buffer(this, 'pendingIn'); + const pendingOut = B.buffer(this, 'pendingOut'); on message C.FromServer(peerAddr, P.ToPOA(sessionId, $p)) { - this.pendingIn = this.pendingIn.push(p); + pendingIn.push(p); } on message P.Envelope(managementScope, P.ToPOA(sessionId, $p)) { - this.pendingOut = this.pendingOut.push(p); + pendingOut.push(p); } - during P.Envelope(managementScope, Observe(P.FromPOA(sessionId, _))) { - during C.FromServer(peerAddr, Observe(P.FromPOA(sessionId, _))) { - dataflow { - if (!this.pendingIn.isEmpty()) { - this.pendingIn.forEach((p) => { - send P.Proposal(managementScope, P.FromPOA(sessionId, p)); - }); - this.pendingIn = List(); - } - } - dataflow { - if (!this.pendingOut.isEmpty()) { - this.pendingOut.forEach((p) => { - send C.ToServer(peerAddr, P.FromPOA(sessionId, p)); - }); - this.pendingOut = List(); - } - } + during P.Envelope(managementScope, P.FederatedLinkReady(sessionId)) { + during C.FromServer(peerAddr, P.FederatedLinkReady(sessionId)) { + pendingIn.drain((p) => { + debug('<', p.toString()); + send P.Proposal(managementScope, P.FromPOA(sessionId, p)); + }); + pendingOut.drain((p) => { + debug('>', p.toString()); + send C.ToServer(peerAddr, P.FromPOA(sessionId, p)); + }); } } } @@ -75,6 +74,11 @@ spawn named '@syndicate-lang/server/federation/LocalLinkFactory' { spawn named ['@syndicate-lang/server/federation/LocalLink', managementScope, scope] { const sessionId = genUuid('localLink'); + + const debug = debugFactory('syndicate/server:federation:local:' + scope); + on start debug('+', sessionId); + on stop debug('-', sessionId); + assert P.Proposal(managementScope, P.FederatedLink(sessionId, scope)); const sendFromPOA = (m) => { @@ -83,6 +87,8 @@ spawn named '@syndicate-lang/server/federation/LocalLinkFactory' { on message P.Envelope(managementScope, P.ToPOA(sessionId, W.Assert($ep, Observe($spec)))) { react { + on start debug('remoteObs+', spec.toString()); + on stop debug('remoteObs-', spec.toString()); currentFacet().addEndpoint(() => { const outerSpec = P.Proposal(scope, spec); const analysis = Skeleton.analyzeAssertion(outerSpec); @@ -104,6 +110,8 @@ spawn named '@syndicate-lang/server/federation/LocalLinkFactory' { during Observe($pat(P.Envelope(scope, $spec))) { const ep = genUuid('ep'); + on start debug('localObs+', spec.toString(), ep); + on stop debug('localObs-', spec.toString(), ep); on start sendFromPOA(W.Assert(ep, Observe(spec))); on stop sendFromPOA(W.Clear(ep)); on message P.Envelope(managementScope, P.ToPOA(sessionId, W.Add(ep, $captures))) { @@ -242,6 +250,14 @@ spawn named '@syndicate-lang/server/federation/ScopeFactory' { }; during P.Envelope(managementScope, P.FederatedLink($linkid, scope)) { + const debug = debugFactory('syndicate/server:federation:link:' + linkid); + on start debug('+', scope.toString()); + on stop debug('-', scope.toString()); + on message P.Envelope(managementScope, P.FromPOA(linkid, $m)) debug('<', m.toString()); + on message P.Envelope(managementScope, P.ToPOA(linkid, $m)) debug('>', m.toString()); + + assert P.Proposal(managementScope, P.FederatedLinkReady(linkid)); + field this.linkSubs = Map(); field this.linkMatches = Map(); @@ -266,8 +282,8 @@ spawn named '@syndicate-lang/server/federation/ScopeFactory' { }; on start { - // console.log('+PEER', linkid, scope, this.peers); this.peers = this.peers.add(linkid); + // console.log('+PEER', linkid, scope, this.peers); this.specs.forEach((localid, spec) => { sendToLink(linkid, W.Assert(localid, Observe(spec))); }); @@ -275,8 +291,8 @@ spawn named '@syndicate-lang/server/federation/ScopeFactory' { } on stop { - // console.log('-PEER', linkid, scope); this.peers = this.peers.remove(linkid); + // console.log('-PEER', linkid, scope); this.linkMatches.forEach((matches, localid) => { matches.forEach((captures) => removeMatch(localid, captures, linkid)); }); diff --git a/packages/server/src/index.js b/packages/server/src/index.js index 637db89..a8f8683 100644 --- a/packages/server/src/index.js +++ b/packages/server/src/index.js @@ -13,6 +13,10 @@ const Server = activate require("./server"); const Federation = activate require("./federation"); const fs = require('fs'); +import { + RandomID, +} from "@syndicate-lang/core"; + let currentManagementScope = 'local'; function usage() { @@ -91,22 +95,15 @@ spawn named 'server' { uplinks.forEach((link) => { assert P.Proposal(currentManagementScope, link); }); + if (overlays.length > 0) { + const localId = RandomID.randomId(8, false); + assert D.OverlayNode(localId); + } overlays.forEach((o) => { assert P.Proposal(currentManagementScope, o); }); } -spawn named 'helpful info output' { - on asserted D.AvailableTransport($spec) console.info('Transport:', spec.toString()); -} - -spawn named 'federationRoutingInfo' { - during Federation.ManagementScope($managementScope) { - // assert P.Proposal(managementScope, Federation.ManagementScope(managementScope)); - during $t(D.AvailableTransport(_)) assert P.Proposal(managementScope, t); - } -} - function _spawnStreamServer(spec) { spawn named spec { assert D.AvailableTransport(spec); diff --git a/packages/server/src/internal_protocol.js b/packages/server/src/internal_protocol.js index 25dfd44..acb0383 100644 --- a/packages/server/src/internal_protocol.js +++ b/packages/server/src/internal_protocol.js @@ -3,6 +3,7 @@ assertion type ServerActive(scope) = Symbol.for('server-active'); assertion type POA(connId) = Symbol.for('server-poa'); +assertion type POAReady(connId) = Symbol.for('server-poa-ready'); message type FromPOA(connId, body) = Symbol.for('message-poa->server'); message type ToPOA(connId, body) = Symbol.for('message-server->poa'); @@ -17,12 +18,13 @@ assertion type POAScope(connId, scope) = Symbol.for('server-poa-scope'); // Federation assertion type FederatedLink(id, scope) = Symbol.for('federated-link'); +assertion type FederatedLinkReady(id) = Symbol.for('federated-link-ready'); Object.assign(module.exports, { ServerActive, - POA, FromPOA, ToPOA, + POA, POAReady, FromPOA, ToPOA, Disconnect, Proposal, Envelope, POAScope, - FederatedLink, + FederatedLink, FederatedLinkReady, }); diff --git a/packages/server/src/protocol.js b/packages/server/src/protocol.js index ea9fcf7..54e8cc7 100644 --- a/packages/server/src/protocol.js +++ b/packages/server/src/protocol.js @@ -5,6 +5,8 @@ import { Decoder, Discard, Capture, Observe } from "@syndicate-lang/core"; message type Connect(scope); message type Peer(scope); +message type Commit(); + message type Assert(endpointName, assertion); message type Clear(endpointName); message type Message(body); @@ -29,6 +31,7 @@ function makeDecoder(initialBuffer) { Object.assign(module.exports, { Connect, Peer, + Commit, Assert, Clear, Message, Add, Del, Msg, Err, Ping, Pong, diff --git a/packages/server/src/server.js b/packages/server/src/server.js index e79974a..1a6207a 100644 --- a/packages/server/src/server.js +++ b/packages/server/src/server.js @@ -2,37 +2,44 @@ const Http = activate require("@syndicate-lang/driver-http-node"); const S = activate require("@syndicate-lang/driver-streams-node"); +const debugFactory = require('debug'); import { Set, Bytes, Encoder, Observe, - Dataspace, Skeleton, currentFacet, genUuid, RandomID + Dataspace, Skeleton, currentFacet, genUuid, } from "@syndicate-lang/core"; const P = activate require("./internal_protocol"); const W = activate require("./protocol"); +const B = activate require("./buffer"); +const Turn = activate require("./turn"); export function websocketServerFacet(reqId) { assert P.POA(reqId); - on message Http.DataIn(reqId, $data) { + const buf = B.buffer(this, 'chunks'); + on message Http.DataIn(reqId, $data) buf.push(data); + during P.POAReady(reqId) buf.drain((data) => { if (data instanceof Bytes) send P.FromPOA(reqId, W.makeDecoder(data).next()); - } + }); on message P.ToPOA(reqId, $resp) send Http.DataOut(reqId, new Encoder().push(resp).contents()); stop on message P.Disconnect(reqId); - stop on retracted P.POAScope(reqId, _); + stop on retracted P.POAReady(reqId); } export function streamServerFacet(id) { assert P.POA(id); const decoder = W.makeDecoder(null); - on message S.Data(id, $data) { + const buf = B.buffer(this, 'chunks'); + on message S.Data(id, $data) buf.push(data); + during P.POAReady(reqId) buf.drain((data) => { decoder.write(data); let v; while ((v = decoder.try_next())) send P.FromPOA(id, v); - } + }); on message P.ToPOA(id, $resp) send S.Push(id, new Encoder().push(resp).contents(), null); stop on message P.Disconnect(id); - stop on retracted P.POAScope(id, _); + stop on retracted P.POAReady(id); } export function streamServerActor(id, debugLabel) { @@ -48,7 +55,14 @@ spawn named '@syndicate-lang/server/server/POAHandler' { during Observe(P.Envelope($scope, $spec)) assert P.Proposal(scope, Observe(spec)); during P.POA($connId) spawn named P.POA(connId) { + const debug = debugFactory('syndicate/server:server:' + connId.toString()); + on start debug('+'); + on stop debug('-'); + on message P.FromPOA(connId, $m) debug('<', m.toString()); + on message P.ToPOA(connId, $m) debug('>', m.toString()); + field this.scope = null; + assert P.POAReady(connId); assert P.POAScope(connId, this.scope) when (this.scope !== null); assert P.ServerActive(this.scope) when (this.scope !== null); @@ -59,10 +73,20 @@ spawn named '@syndicate-lang/server/server/POAHandler' { this.scope = scope; } - on message P.FromPOA(connId, W.Assert($ep, $a)) { + const outboundTurn = Turn.recorder(this, 'commitNeeded', + { + extend: (m) => { send P.ToPOA(connId, m); }, + commit: () => { send P.ToPOA(connId, W.Commit()); }, + debug: debug + }); + const inboundTurn = Turn.replayer({ debug: debug }); + + on message P.FromPOA(connId, W.Assert($ep, $a)) inboundTurn.extend(() => { if (!endpoints.includes(ep)) { endpoints = endpoints.add(ep); react { + const epFacet = currentFacet(); + on stop { endpoints = endpoints.remove(ep); } field this.assertion = a; @@ -75,9 +99,9 @@ spawn named '@syndicate-lang/server/server/POAHandler' { analysis.callback = Dataspace.wrap((evt, vs) => { currentFacet().actor.scheduleScript(() => { switch (evt) { - case Skeleton.EVENT_ADDED: send P.ToPOA(connId, W.Add(ep, vs)); break; - case Skeleton.EVENT_REMOVED: send P.ToPOA(connId, W.Del(ep, vs)); break; - case Skeleton.EVENT_MESSAGE: send P.ToPOA(connId, W.Msg(ep, vs)); break; + case Skeleton.EVENT_ADDED: outboundTurn.extend(W.Add(ep, vs)); break; + case Skeleton.EVENT_REMOVED: outboundTurn.extend(W.Del(ep, vs)); break; + case Skeleton.EVENT_MESSAGE: outboundTurn.extend(W.Msg(ep, vs)); break; } }); }); @@ -87,14 +111,20 @@ spawn named '@syndicate-lang/server/server/POAHandler' { } }, true); - on message P.FromPOA(connId, W.Assert(ep, $newAssertion)) this.assertion = newAssertion; - stop on message P.FromPOA(connId, W.Clear(ep)); + on message P.FromPOA(connId, W.Assert(ep, $newAssertion)) inboundTurn.extend(() => { + this.assertion = newAssertion; + }); + on message P.FromPOA(connId, W.Clear(ep)) inboundTurn.extend(() => { + epFacet.stop(); + }); } } - } + }); - on message P.FromPOA(connId, W.Message($body)) { + on message P.FromPOA(connId, W.Message($body)) inboundTurn.extend(() => { send P.Proposal(this.scope, body); - } + }); + + on message P.FromPOA(connId, W.Commit()) inboundTurn.commit(); } } diff --git a/packages/server/src/turn.js b/packages/server/src/turn.js new file mode 100644 index 0000000..74afcf1 --- /dev/null +++ b/packages/server/src/turn.js @@ -0,0 +1,34 @@ +"use strict"; + +import { _Dataspace, currentFacet } from "@syndicate-lang/core"; +const PRIORITY = _Dataspace.PRIORITY; + +export function recorder(fields, fieldName, callbacks) { + field fields[fieldName] = false; + currentFacet().addDataflow(() => { + if (fields[fieldName]) { + callbacks.commit(); + fields[fieldName] = false; + } + }, PRIORITY.IDLE); + return { + extend: function (item) { + callbacks.extend(item); + fields[fieldName] = true; + } + }; +} + +export function replayer(callbacks0) { + const callbacks = callbacks0 || {}; + return { + worklist: [], + extend: function (thunk) { + this.worklist.push(thunk); + }, + commit: function () { + this.worklist.forEach((thunk) => thunk()); + this.worklist.length = 0; // clear out the list + } + }; +}