From 7fd2805209772f589c425d6172e9bfcd1d4dac1b Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Sat, 8 Jun 2019 21:10:31 +0100 Subject: [PATCH] Switch to explicit grouping of turn actions/events --- packages/server/src/client.js | 94 +++++--- packages/server/src/federation.js | 376 ++++++++++++++++++------------ packages/server/src/protocol.js | 4 +- packages/server/src/server.js | 94 ++++---- packages/server/src/turn.js | 23 +- 5 files changed, 336 insertions(+), 255 deletions(-) diff --git a/packages/server/src/client.js b/packages/server/src/client.js index f03f92e..3039e59 100644 --- a/packages/server/src/client.js +++ b/packages/server/src/client.js @@ -3,7 +3,7 @@ const debugFactory = require('debug'); import { - Decoder, Encoder, Bytes, + Decoder, Encoder, Bytes, Map, Observe, Skeleton, genUuid, currentFacet, } from "@syndicate-lang/core"; @@ -12,14 +12,14 @@ const WS = activate require("@syndicate-lang/driver-websocket"); const { Connect, Peer, - Commit, + Turn, Assert, Clear, Message, - Add, Del, Msg, Err, + Add, Del, Msg, Err, End, Ping, Pong, makeDecoder, } = activate require("./protocol"); const P = activate require("./internal_protocol"); -const Turn = activate require("./turn"); +const { recorder } = activate require("./turn"); assertion type WSServer(url, scope) = Symbol.for('server-websocket-connection'); assertion type Loopback(scope) = Symbol.for('server-loopback-connection'); @@ -55,45 +55,79 @@ export function _genericClientSessionFacet(addr, scope, w0, debug) { w0(x); }; - const outboundTurn = Turn.recorder(this, 'commitNeeded', { - extend: w, - commit: () => { w(Commit()); }, - debug: debug - }); - const inboundTurn = Turn.replayer({ debug: debug }); + const outboundTurn = recorder(this, 'commitNeeded', (items) => w(Turn(items))); on start w(Connect(scope)); - during ToServer(addr, $a) { + let pubs = Map(); + let subs = Map(); + let matches = Map(); + + on asserted ToServer(addr, $a) { const ep = genUuid('pub'); - on start outboundTurn.extend(Assert(ep, a)); - on stop outboundTurn.extend(Clear(ep)); + outboundTurn.extend(Assert(ep, a)); + pubs = pubs.set(a, ep); + } + + on retracted ToServer(addr, $a) { + const ep = pubs.get(a); + outboundTurn.extend(Clear(ep)); + pubs = pubs.remove(a); } on message ToServer(addr, $a) { - outboundTurn.commit(); - w(Message(a)); + outboundTurn.extend(Message(a)); } on message _ServerPacket(addr, Ping()) w(Pong()); - during Observe(FromServer(addr, $spec)) { + on asserted Observe(FromServer(addr, $spec)) { const ep = genUuid('sub'); - on start outboundTurn.extend(Assert(ep, Observe(spec))); - on stop outboundTurn.extend(Clear(ep)); - on message _ServerPacket(addr, Add(ep, $vs)) inboundTurn.extend(() => { - react { - const assertionFacet = currentFacet(); - assert Skeleton.instantiateAssertion(FromServer(addr, spec), vs); - on message _ServerPacket(addr, Del(ep, vs)) inboundTurn.extend(() => { - assertionFacet.stop(); - }); + outboundTurn.extend(Assert(ep, Observe(spec))); + subs = subs.set(spec, ep); + matches = matches.set(ep, { spec, captures: Map() }); + } + + on retracted Observe(FromServer(addr, $spec)) { + outboundTurn.extend(Clear(subs.get(spec))); + subs = subs.remove(spec); + } + + const _instantiate = (m, vs) => Skeleton.instantiateAssertion(FromServer(addr, m.spec), vs); + + const _lookup = (CTOR, item) => { + const m = matches.get(CTOR._endpointName(item)); + const vs = CTOR._captures(item); + return { m, vs }; + } + + on message _ServerPacket(addr, Turn($items)) { + items.forEach((item) => { + if (Add.isClassOf(item)) { + const { m, vs } = _lookup(Add, item); + const a = _instantiate(m, vs); + m.captures = m.captures.set(vs, a); + currentFacet().actor.adhocAssert(a); + } else if (Del.isClassOf(item)) { + const { m, vs } = _lookup(Del, item); + currentFacet().actor.adhocRetract(m.captures.get(vs)); + m.captures = m.captures.remove(vs); + } else if (Msg.isClassOf(item)) { + const { m, vs } = _lookup(Msg, item); + send _instantiate(m, vs); + } else if (End.isClassOf(item)) { + const ep = End._endpointName(item); + const m = matches.get(ep); + if (m) { + m.captures.forEach((a) => currentFacet().actor.adhocRetract(a)); + matches = matches.remove(ep); + } + } else if (Err.isClassOf(item)) { + throw new Error(item.toString()); + } else { + debug("Unhandled client/server message", item.toString()); } - }) - on message _ServerPacket(addr, Msg(ep, $vs)) { - send Skeleton.instantiateAssertion(FromServer(addr, spec), vs); - } - on message _ServerPacket(addr, Commit()) inboundTurn.commit(); + }); } } diff --git a/packages/server/src/federation.js b/packages/server/src/federation.js index 32e7f7c..9c5cd95 100644 --- a/packages/server/src/federation.js +++ b/packages/server/src/federation.js @@ -4,6 +4,7 @@ const P = activate require("./internal_protocol"); const W = activate require("./protocol"); const C = activate require("./client"); const B = activate require("./buffer"); +const { recorder } = activate require("./turn"); const debugFactory = require('debug'); assertion type ManagementScope(scope) = Symbol.for('federation-management-scope'); @@ -42,24 +43,15 @@ spawn named '@syndicate-lang/server/federation/UplinkFactory' { const pendingIn = B.buffer(this, 'pendingIn'); const pendingOut = B.buffer(this, 'pendingOut'); - on message C.FromServer(peerAddr, P.ToPOA(sessionId, $p)) { - pendingIn.push(p); - } - - on message P.Envelope(managementScope, P.ToPOA(sessionId, $p)) { - pendingOut.push(p); - } + on message C.FromServer(peerAddr, P.ToPOA(sessionId, $p)) pendingIn.push(p); + on message P.Envelope(managementScope, P.ToPOA(sessionId, $p)) pendingOut.push(p); during P.Envelope(managementScope, P.FederatedLinkReady(sessionId)) { + debug('Local end is ready'); during C.FromServer(peerAddr, P.FederatedLinkReady(sessionId)) { - pendingIn.drain((p) => { - debug('<', p.toString()); - send P.Proposal(managementScope, P.FromPOA(sessionId, p)); - }); - pendingOut.drain((p) => { - debug('>', p.toString()); - send C.ToServer(peerAddr, P.FromPOA(sessionId, p)); - }); + debug('Remote end is ready'); + pendingIn.drain((p) => { send P.Proposal(managementScope, P.FromPOA(sessionId, p)); }); + pendingOut.drain((p) => { send C.ToServer(peerAddr, P.FromPOA(sessionId, p)); }); } } } @@ -81,41 +73,92 @@ spawn named '@syndicate-lang/server/federation/LocalLinkFactory' { assert P.Proposal(managementScope, P.FederatedLink(sessionId, scope)); - const sendFromPOA = (m) => { - send P.Proposal(managementScope, P.FromPOA(sessionId, m)); - }; + const sendFromPOA = (m) => { send P.Proposal(managementScope, P.FromPOA(sessionId, m)); }; + const outboundTurn = recorder(this, 'commitNeeded', (items) => sendFromPOA(W.Turn(items))); - on message P.Envelope(managementScope, P.ToPOA(sessionId, W.Assert($ep, Observe($spec)))) { - react { - on start debug('remoteObs+', spec.toString()); - on stop debug('remoteObs-', spec.toString()); - currentFacet().addObserverEndpoint(() => P.Proposal(scope, spec), { - add: (vs) => sendFromPOA(W.Add(ep, vs)), - del: (vs) => sendFromPOA(W.Del(ep, vs)), - msg: (vs) => sendFromPOA(W.Msg(ep, vs)), - }); - assert P.Envelope(scope, Observe(spec)); - stop on message P.Envelope(managementScope, P.ToPOA(sessionId, W.Clear(ep))) { - sendFromPOA(W.End(ep)); - } - } + let remoteEndpoints = Map(); + let localEndpoints = Map(); + let localMatches = Map(); + + const _inst = (m, vs) => Skeleton.instantiateAssertion(P.Envelope(scope, m.spec), vs); + + const _lookup = (CTOR, item) => { + const m = localMatches.get(CTOR._endpointName(item)); + const vs = CTOR._captures(item); + return { m, vs }; } - during Observe($pat(P.Envelope(scope, $spec))) { + on asserted Observe(P.Envelope(scope, $spec)) { const ep = genUuid('ep'); - on start debug('localObs+', spec.toString(), ep); - on stop debug('localObs-', spec.toString(), ep); - on start sendFromPOA(W.Assert(ep, Observe(spec))); - on stop sendFromPOA(W.Clear(ep)); - on message P.Envelope(managementScope, P.ToPOA(sessionId, W.Add(ep, $captures))) { - react { - assert Skeleton.instantiateAssertion(pat, captures); - stop on message P.Envelope(managementScope, P.ToPOA(sessionId, W.Del(ep, captures))); + debug('localObs+', spec.toString(), ep); + outboundTurn.extend(W.Assert(ep, Observe(spec))); + localEndpoints = localEndpoints.set(spec, ep); + localMatches = localMatches.set(ep, { spec, captures: Map() }); + } + + on retracted Observe(P.Envelope(scope, $spec)) { + const ep = localEndpoints.get(spec); + debug('localObs-', spec.toString(), ep); + outboundTurn.extend(W.Clear(ep)); + localEndpoints = localEndpoints.remove(spec); + } + + on message P.Envelope(managementScope, P.ToPOA(sessionId, W.Turn($items))) { + items.forEach((item) => { + if (W.Assert.isClassOf(item)) { + const a = W.Assert._assertion(item); + if (Observe.isClassOf(a)) { + const ep = W.Assert._endpointName(item); + const spec = Observe._specification(a); + if (remoteEndpoints.has(ep)) { + throw new Error("Attempt to replace existing endpoint " + ep + " with " + a); + } + react { + const epFacet = currentFacet(); + remoteEndpoints = remoteEndpoints.set(ep, epFacet); + on stop { remoteEndpoints = remoteEndpoints.remove(ep); } + + on start debug('remoteObs+', spec.toString()); + on stop debug('remoteObs-', spec.toString()); + currentFacet().addObserverEndpoint(() => P.Proposal(scope, spec), { + add: (vs) => outboundTurn.extend(W.Add(ep, vs)), + del: (vs) => outboundTurn.extend(W.Del(ep, vs)), + msg: (vs) => outboundTurn.extend(W.Msg(ep, vs)), + }); + assert P.Envelope(scope, Observe(spec)); + } + } + } else if (W.Clear.isClassOf(item)) { + const ep = W.Clear._endpointName(item); + if (!remoteEndpoints.has(ep)) { + throw new Error("Attempt to clear nonexistent endpoint " + ep); + } + remoteEndpoints.get(ep).stop(() => { outboundTurn.extend(W.End(ep)); }); + } else if (W.Add.isClassOf(item)) { + const { m, vs } = _lookup(W.Add, item); + const a = _inst(m, vs); + m.captures = m.captures.set(vs, a); + currentFacet().actor.adhocAssert(a); + } else if (W.Del.isClassOf(item)) { + const { m, vs } = _lookup(W.Del, item); + currentFacet().actor.adhocRetract(m.captures.get(vs)); + m.captures = m.captures.remove(vs); + } else if (W.Msg.isClassOf(item)) { + const { m, vs } = _lookup(W.Msg, item); + send _inst(m, vs); + } else if (W.End.isClassOf(item)) { + const ep = W.End._endpointName(item); + const m = localMatches.get(ep); + if (m) { + m.captures.forEach((a) => currentFacet().actor.adhocRetract(a)); + localMatches = localMatches.remove(ep); + } + } else if (W.Err.isClassOf(item)) { + throw new Error(item.toString()); + } else { + debug("Unhandled federation message", item.toString()); } - } - on message P.Envelope(managementScope, P.ToPOA(sessionId, W.Msg(ep, $captures))) { - send Skeleton.instantiateAssertion(pat, captures); - } + }); } } } @@ -174,20 +217,20 @@ class Subscription { spawn named '@syndicate-lang/server/federation/ScopeFactory' { during ManagementScope($managementScope) { - function sendToLink(linkid, m) { - send P.Proposal(managementScope, P.ToPOA(linkid, m)); - } - during P.Envelope(managementScope, P.FederatedLink(_, $scope)) spawn named ['@syndicate-lang/server/federation/Scope', scope] { + // function sendToLink(linkid, m) { + // send P.Proposal(managementScope, P.ToPOA(linkid, m)); + // } + let nextId = 0; const makeLocalId = () => { nextId++; return nextId; }; - field this.peers = Set(); + field this.turns = Map(); field this.specs = Map(); field this.subs = Map(); const scopeThis = this; @@ -217,13 +260,13 @@ spawn named '@syndicate-lang/server/federation/ScopeFactory' { sub.removeHolder(linkid); switch (sub.holders.size) { case 0: - this.peers.forEach((peer) => { - if (peer !== linkid) sendToLink(peer, W.Clear(localid)); + this.turns.forEach((turn, peer) => { + if (peer !== linkid) turn.extend(W.Clear(localid)); }); break; case 1: sub.holders.forEach((peerEndpoint, peer) => { // only one, guaranteed ≠ linkid - sendToLink(peer, W.Clear(localid)); + this.turns.get(peer).extend(W.Clear(localid)); }); break; default: @@ -238,13 +281,13 @@ spawn named '@syndicate-lang/server/federation/ScopeFactory' { switch (newMatchHolders.size) { case 0: sub.holders.forEach((peerEndpoint, peer) => { - if (peer !== linkid) sendToLink(peer, W.Del(peerEndpoint, captures)); + if (peer !== linkid) this.turns.get(peer).extend(W.Del(peerEndpoint, captures)); }); break; case 1: { const peer = newMatchHolders.first(); // only one, guaranteed ≠ linkid const peerEndpoint = sub.holders.get(peer, false); - if (peerEndpoint) sendToLink(peer, W.Del(peerEndpoint, captures)); + if (peerEndpoint) this.turns.get(peer).extend(W.Del(peerEndpoint, captures)); break; } default: @@ -262,127 +305,152 @@ spawn named '@syndicate-lang/server/federation/ScopeFactory' { assert P.Proposal(managementScope, P.FederatedLinkReady(linkid)); + const turn = recorder(this, 'commitNeeded', (items) => { + send P.Proposal(managementScope, P.ToPOA(linkid, W.Turn(items))); + }); + field this.linkSubs = Map(); field this.linkMatches = Map(); const err = (detail) => { - sendToLink(linkid, W.Err(detail)); + turn.extend(W.Err(detail)); currentFacet().stop(); }; on start { - this.peers = this.peers.add(linkid); - this.specs.forEach((localid, spec) => { - sendToLink(linkid, W.Assert(localid, Observe(spec))); - }); + this.turns = this.turns.set(linkid, turn); + this.specs.forEach((localid, spec) => turn.extend(W.Assert(localid, Observe(spec)))); + turn.commit(); } on stop { - this.peers = this.peers.remove(linkid); + this.turns = this.turns.remove(linkid); this.linkMatches.forEach((matches, localid) => { matches.forEach((captures) => removeMatch(localid, captures, linkid)); }); this.linkSubs.forEach((localid, _endpointId) => { unsubscribe(localid, linkid); }); + turn.commit(); } - on message P.Envelope(managementScope, P.FromPOA(linkid, W.Assert($ep, Observe($spec)))) { - let localid = this.specs.get(spec, null); - let sub; - if (localid === null) { - localid = makeLocalId(); - sub = new Subscription(localid, spec, scopeThis); - } else { - sub = this.subs.get(localid); - } - const oldHolderCount = sub.holders.size; - sub.addHolder(linkid, ep); - this.linkSubs = this.linkSubs.set(ep, sub.id); - switch (oldHolderCount) { - case 0: - this.peers.forEach((peer) => { - if (peer !== linkid) sendToLink(peer, W.Assert(localid, Observe(spec))); - }); - break; - case 1: - sub.holders.forEach((peerEndpoint, peer) => { // now contains 2, one of which is us - if (peer !== linkid) sendToLink(peer, W.Assert(localid, Observe(spec))); - }); - break; - default: - break; - } - sub.matches.forEach((matchHolders, captures) => { - if (!matchHolders.remove(linkid).isEmpty()) { - sendToLink(linkid, W.Add(ep, captures)); - } - }); - } + on message P.Envelope(managementScope, P.FromPOA(linkid, W.Turn($items))) { + items.forEach((item) => { + if (W.Assert.isClassOf(item)) { + const ep = W.Assert._endpointName(item); + const a = W.Assert._assertion(item); + if (Observe.isClassOf(a)) { + const spec = Observe._specification(a); - on message P.Envelope(managementScope, P.FromPOA(linkid, W.Clear($ep))) { - const localid = this.linkSubs.get(ep, null); - if (localid === null) { - console.error("Ignoring mention of nonexistent endpoint", ep, linkid); - } else { - this.linkSubs = this.linkSubs.remove(ep); - unsubscribe(localid, linkid); - } - sendToLink(linkid, W.End(ep)); - } - - on message P.Envelope(managementScope, P.FromPOA(linkid, W.End($localid))) { - (this.linkMatches.get(localid) || Set()).forEach((captures) => { - removeMatch(localid, captures, linkid); - }); - this.linkMatches = this.linkMatches.remove(localid); - } - - on message P.Envelope(managementScope, P.FromPOA(linkid, W.Add($localid, $captures))) { - const matches = this.linkMatches.get(localid) || Set(); - if (matches.includes(captures)) { - err(Symbol.for('duplicate-capture')); - } else { - this.linkMatches = this.linkMatches.set(localid, matches.add(captures)); - callWithSub(localid, linkid, (sub) => { - const oldMatchHolders = sub.addMatch(captures, linkid); - switch (oldMatchHolders.size) { - case 0: - sub.holders.forEach((peerEndpoint, peer) => { - if (peer !== linkid) sendToLink(peer, W.Add(peerEndpoint, captures)); - }); - break; - case 1: { - const peer = oldMatchHolders.first(); // only one, guaranteed ≠ linkid - const peerEndpoint = sub.holders.get(peer, false); - if (peerEndpoint) sendToLink(peer, W.Add(peerEndpoint, captures)); - break; + let localid = this.specs.get(spec, null); + let sub; + if (localid === null) { + localid = makeLocalId(); + sub = new Subscription(localid, spec, scopeThis); + } else { + sub = this.subs.get(localid); } - default: - break; + + const oldHolderCount = sub.holders.size; + sub.addHolder(linkid, ep); + this.linkSubs = this.linkSubs.set(ep, sub.id); + switch (oldHolderCount) { + case 0: + this.turns.forEach((turn, peer) => { + if (peer !== linkid) turn.extend(W.Assert(localid, Observe(spec))); + }); + break; + case 1: + sub.holders.forEach((peerEndpoint, peer) => { + // ^ now contains 2, one of which is us + if (peer !== linkid) { + this.turns.get(peer).extend(W.Assert(localid, Observe(spec))); + } + }); + break; + default: + break; + } + + sub.matches.forEach((matchHolders, captures) => { + if (!matchHolders.remove(linkid).isEmpty()) { + turn.extend(W.Add(ep, captures)); + } + }); } - }); - } - } - - on message P.Envelope(managementScope, P.FromPOA(linkid, W.Del($localid, $captures))) { - const matches = this.linkMatches.get(localid) || Set(); - if (!matches.includes(captures)) { - err(Symbol.for('nonexistent-capture')); - } else { - const newMatches = matches.remove(captures); - this.linkMatches = (newMatches.isEmpty()) - ? this.linkMatches.remove(localid) - : this.linkMatches.set(localid, newMatches); - removeMatch(localid, captures, linkid); - } - } - - on message P.Envelope(managementScope, P.FromPOA(linkid, W.Msg($localid, $captures))) { - callWithSub(localid, linkid, (sub) => { - sub.holders.forEach((peerEndpoint, peer) => { - if (peer !== linkid) sendToLink(peer, W.Msg(peerEndpoint, captures)); - }); + } else if (W.Clear.isClassOf(item)) { + const ep = W.Clear._endpointName(item); + const localid = this.linkSubs.get(ep, null); + if (localid === null) { + console.error("Ignoring mention of nonexistent endpoint", ep, linkid); + } else { + this.linkSubs = this.linkSubs.remove(ep); + unsubscribe(localid, linkid); + } + turn.extend(W.End(ep)); + } else if (W.End.isClassOf(item)) { + const localid = W.End._endpointName(item); + (this.linkMatches.get(localid) || Set()).forEach((captures) => { + removeMatch(localid, captures, linkid); + }); + this.linkMatches = this.linkMatches.remove(localid); + } else if (W.Add.isClassOf(item)) { + const localid = W.Add._endpointName(item); + const captures = W.Add._captures(item); + const matches = this.linkMatches.get(localid) || Set(); + if (matches.includes(captures)) { + err(Symbol.for('duplicate-capture')); + } else { + this.linkMatches = this.linkMatches.set(localid, matches.add(captures)); + callWithSub(localid, linkid, (sub) => { + const oldMatchHolders = sub.addMatch(captures, linkid); + switch (oldMatchHolders.size) { + case 0: + sub.holders.forEach((peerEndpoint, peer) => { + if (peer !== linkid) { + this.turns.get(peer).extend(W.Add(peerEndpoint, captures)); + } + }); + break; + case 1: { + const peer = oldMatchHolders.first(); // only one, guaranteed ≠ linkid + const peerEndpoint = sub.holders.get(peer, false); + if (peerEndpoint) { + this.turns.get(peer).extend(W.Add(peerEndpoint, captures)); + } + break; + } + default: + break; + } + }); + } + } else if (W.Del.isClassOf(item)) { + const localid = W.Del._endpointName(item); + const captures = W.Del._captures(item); + const matches = this.linkMatches.get(localid) || Set(); + if (!matches.includes(captures)) { + err(Symbol.for('nonexistent-capture')); + } else { + const newMatches = matches.remove(captures); + this.linkMatches = (newMatches.isEmpty()) + ? this.linkMatches.remove(localid) + : this.linkMatches.set(localid, newMatches); + removeMatch(localid, captures, linkid); + } + } else if (W.Msg.isClassOf(item)) { + const localid = W.Msg._endpointName(item); + const captures = W.Msg._captures(item); + callWithSub(localid, linkid, (sub) => { + sub.holders.forEach((peerEndpoint, peer) => { + if (peer !== linkid) { + this.turns.get(peer).extend(W.Msg(peerEndpoint, captures)); + } + }); + }); + } else { + debug("Unhandled federation message", item.toString()); + } }); } } diff --git a/packages/server/src/protocol.js b/packages/server/src/protocol.js index 3e84212..ac2eb82 100644 --- a/packages/server/src/protocol.js +++ b/packages/server/src/protocol.js @@ -5,7 +5,7 @@ import { Decoder, Discard, Capture, Observe } from "@syndicate-lang/core"; message type Connect(scope); message type Peer(scope); -message type Commit(); +message type Turn(items); message type Assert(endpointName, assertion); message type Clear(endpointName); @@ -32,7 +32,7 @@ function makeDecoder(initialBuffer) { Object.assign(module.exports, { Connect, Peer, - Commit, + Turn, Assert, Clear, Message, Add, Del, Msg, Err, End, Ping, Pong, diff --git a/packages/server/src/server.js b/packages/server/src/server.js index fe8a23d..bff17c3 100644 --- a/packages/server/src/server.js +++ b/packages/server/src/server.js @@ -5,7 +5,7 @@ const S = activate require("@syndicate-lang/driver-streams-node"); const debugFactory = require('debug'); import { - Set, Bytes, + Map, Bytes, Encoder, Observe, Dataspace, Skeleton, currentFacet, genUuid, } from "@syndicate-lang/core"; @@ -13,7 +13,7 @@ import { const P = activate require("./internal_protocol"); const W = activate require("./protocol"); const B = activate require("./buffer"); -const Turn = activate require("./turn"); +const { recorder } = activate require("./turn"); export function websocketServerFacet(reqId) { assert P.POA(reqId); @@ -66,7 +66,7 @@ spawn named '@syndicate-lang/server/server/POAHandler' { assert P.POAScope(connId, this.scope) when (this.scope !== null); assert P.ServerActive(this.scope) when (this.scope !== null); - let endpoints = Set(); + let endpoints = Map(); on message P.FromPOA(connId, W.Connect($scope)) { // TODO: Enforce requirement that Connect appear exactly once, before anything else @@ -74,62 +74,52 @@ spawn named '@syndicate-lang/server/server/POAHandler' { } const sendToPOA = (m) => { send P.ToPOA(connId, m); }; - const outboundTurn = Turn.recorder(this, 'commitNeeded', - { - extend: sendToPOA, - commit: () => { sendToPOA(W.Commit()); }, - debug: debug - }); - const inboundTurn = Turn.replayer({ debug: debug }); + const outboundTurn = recorder(this, 'commitNeeded', (items) => sendToPOA(W.Turn(items))); - on message P.FromPOA(connId, W.Assert($ep, $a)) inboundTurn.extend(() => { - if (!endpoints.includes(ep)) { - endpoints = endpoints.add(ep); - react { - const epFacet = currentFacet(); + on message P.FromPOA(connId, W.Turn($items)) { + items.forEach((item) => { + if (W.Assert.isClassOf(item)) { + const ep = W.Assert._endpointName(item); + const a = W.Assert._assertion(item); + if (endpoints.has(ep)) { + throw new Error("Attempt to update existing endpoint " + ep + " with " + a.toString()); + } + react { + const epFacet = currentFacet(); + endpoints = endpoints.set(ep, epFacet); + on stop { endpoints = endpoints.remove(ep); } - on stop { endpoints = endpoints.remove(ep); } + assert P.Proposal(this.scope, a); - field this.assertion = a; - assert P.Proposal(this.scope, this.assertion); - - currentFacet().addEndpoint(() => { - if (Observe.isClassOf(this.assertion)) { - const spec = P.Envelope(this.scope, Observe._specification(this.assertion)); - const analysis = Skeleton.analyzeAssertion(spec); - analysis.callback = Dataspace.wrap((evt, vs) => { - currentFacet().actor.scheduleScript(() => { - switch (evt) { - case Skeleton.EVENT_ADDED: outboundTurn.extend(W.Add(ep, vs)); break; - case Skeleton.EVENT_REMOVED: outboundTurn.extend(W.Del(ep, vs)); break; - case Skeleton.EVENT_MESSAGE: { - outboundTurn.commit(); - sendToPOA(W.Msg(ep, vs)); - break; + if (Observe.isClassOf(a)) { + currentFacet().addEndpoint(() => { + const spec = P.Envelope(this.scope, Observe._specification(a)); + const analysis = Skeleton.analyzeAssertion(spec); + analysis.callback = Dataspace.wrap((evt, vs) => { + currentFacet().actor.scheduleScript(() => { + switch (evt) { + case Skeleton.EVENT_ADDED: outboundTurn.extend(W.Add(ep, vs)); break; + case Skeleton.EVENT_REMOVED: outboundTurn.extend(W.Del(ep, vs)); break; + case Skeleton.EVENT_MESSAGE: outboundTurn.extend(W.Msg(ep, vs)); break; } - } + }); }); - }); - return [Observe(spec), analysis]; - } else { - return [void 0, null]; + return [Observe(spec), analysis]; + }, false); } - }, true); - - on message P.FromPOA(connId, W.Assert(ep, $newAssertion)) inboundTurn.extend(() => { - this.assertion = newAssertion; - }); - on message P.FromPOA(connId, W.Clear(ep)) inboundTurn.extend(() => { - epFacet.stop(() => { outboundTurn.extend(W.End(ep)); }); - }); + } + } else if (W.Clear.isClassOf(item)) { + const ep = W.Clear._endpointName(item); + if (!endpoints.has(ep)) { + throw new Error("Attempt to clear nonexistent endpoint " + ep); + } + endpoints.get(ep).stop(() => { outboundTurn.extend(W.End(ep)); }); + } else if (W.Message.isClassOf(item)) { + send P.Proposal(this.scope, W.Message._body(item)); + } else { + debug("Unhandled client/server message", item.toString()); } - } - }); - - on message P.FromPOA(connId, W.Message($body)) { - send P.Proposal(this.scope, body); + }); } - - on message P.FromPOA(connId, W.Commit()) inboundTurn.commit(); } } diff --git a/packages/server/src/turn.js b/packages/server/src/turn.js index 42ab088..db38637 100644 --- a/packages/server/src/turn.js +++ b/packages/server/src/turn.js @@ -3,14 +3,17 @@ import { Dataspace, _Dataspace, currentFacet } from "@syndicate-lang/core"; const PRIORITY = _Dataspace.PRIORITY; -export function recorder(fields, fieldName, callbacks) { +export function recorder(fields, fieldName, onCommit) { + let items = []; + function extend(item) { - callbacks.extend(item); + items.push(item); fields[fieldName] = true; } function commit() { if (fields[fieldName]) { - callbacks.commit(); + onCommit(items); + items = []; fields[fieldName] = false; } } @@ -19,17 +22,3 @@ export function recorder(fields, fieldName, callbacks) { currentFacet().addDataflow(commit, PRIORITY.IDLE); return { extend, commit }; } - -export function replayer(callbacks0) { - const callbacks = callbacks0 || {}; - return { - worklist: [], - extend: function (thunk) { - this.worklist.push(Dataspace.wrap(thunk)); - }, - commit: function () { - this.worklist.forEach((thunk) => thunk()); - this.worklist.length = 0; // clear out the list - } - }; -}