diff --git a/packages/server/src/federation.js b/packages/server/src/federation.js new file mode 100644 index 0000000..b790a5a --- /dev/null +++ b/packages/server/src/federation.js @@ -0,0 +1,341 @@ +"use strict"; + +const P = activate require("./internal_protocol"); +const W = activate require("./protocol"); + +assertion type ManagementScope(scope) = Symbol.for('federation-management-scope'); + +module.exports.ManagementScope = ManagementScope; + +import { + Set, Map, + Observe, + Skeleton, Dataspace, currentFacet, + genUuid, +} from "@syndicate-lang/core"; + +spawn named '@syndicate-lang/server/federation/LocalLinkFactory' { + during ManagementScope($managementScope) { + during P.Envelope(managementScope, P.FederatedLink(_, $scope)) { + during P.ServerActive(scope) + spawn named ['@syndicate-lang/server/federation/LocalLink', managementScope, scope] + { + const sessionId = genUuid('localLink'); + assert P.Proposal(managementScope, P.FederatedLink(sessionId, scope)); + + const sendFromPOA = (m) => { + send P.Proposal(managementScope, P.FromPOA(sessionId, m)); + }; + + on message P.Envelope(managementScope, P.ToPOA(sessionId, W.Assert($ep, Observe($spec)))) { + react { + currentFacet().addEndpoint(() => { + const outerSpec = P.Proposal(scope, spec); + const analysis = Skeleton.analyzeAssertion(outerSpec); + analysis.callback = Dataspace.wrap((evt, vs) => { + currentFacet().actor.scheduleScript(() => { + switch (evt) { + case Skeleton.EVENT_ADDED: sendFromPOA(W.Add(ep, vs)); break; + case Skeleton.EVENT_REMOVED: sendFromPOA(W.Del(ep, vs)); break; + case Skeleton.EVENT_MESSAGE: sendFromPOA(W.Msg(ep, vs)); break; + } + }); + }); + return [Observe(outerSpec), analysis]; + }, true); + assert P.Envelope(scope, Observe(spec)); + stop on message P.Envelope(managementScope, P.ToPOA(sessionId, W.Clear(ep))); + } + } + + during Observe($pat(P.Envelope(scope, $spec))) { + const ep = genUuid('ep'); + on start sendFromPOA(W.Assert(ep, Observe(spec))); + on stop sendFromPOA(W.Clear(ep)); + on message P.Envelope(managementScope, P.ToPOA(sessionId, W.Add(ep, $captures))) { + react { + assert Skeleton.instantiateAssertion(pat, captures); + stop on message P.Envelope(managementScope, P.ToPOA(sessionId, W.Del(ep, captures))); + } + } + on message P.Envelope(managementScope, P.ToPOA(sessionId, W.Msg(ep, $captures))) { + send Skeleton.instantiateAssertion(pat, captures); + } + } + } + } + } +} + +class Subscription { + constructor(id, spec, owner) { + this.id = id; + this.spec = spec; + this.holders = Map(); + this.matches = Map(); + this.owner = owner; + + this.owner.specs = this.owner.specs.set(spec, id); + this.owner.subs = this.owner.subs.set(id, this); + } + + isEmpty() { + return this.holders.isEmpty() && this.matches.isEmpty(); + } + + maybeRemove() { + if (this.isEmpty()) { + this.owner.specs = this.owner.specs.remove(this.spec); + this.owner.subs = this.owner.subs.remove(this.id); + } + } + + addHolder(linkid, ep) { + this.holders = this.holders.set(linkid, ep); + } + + removeHolder(linkid) { + this.holders = this.holders.remove(linkid); + this.maybeRemove(); + } + + addMatch(captures, linkid) { + const oldMatchHolders = this.matches.get(captures) || Set(); + const newMatchHolders = oldMatchHolders.add(linkid); + this.matches = this.matches.set(captures, newMatchHolders); + return oldMatchHolders; + } + + removeMatch(captures, linkid) { + const oldMatchHolders = this.matches.get(captures) || Set(); + const newMatchHolders = oldMatchHolders.remove(linkid); + this.matches = (newMatchHolders.isEmpty()) + ? this.matches.remove(captures) + : this.matches.set(captures, newMatchHolders); + this.maybeRemove(); + return newMatchHolders; + } +} + +spawn named '@syndicate-lang/server/federation/ScopeFactory' { + during ManagementScope($managementScope) { + function sendToLink(linkid, m) { + send P.Proposal(managementScope, P.ToPOA(linkid, m)); + } + + during P.Envelope(managementScope, P.FederatedLink(_, $scope)) + spawn named ['@syndicate-lang/server/federation/Scope', scope] + { + let nextId = 0; + const makeLocalId = () => { + nextId++; + return nextId; + }; + + field this.peers = Set(); + field this.specs = Map(); + field this.subs = Map(); + const scopeThis = this; + + const callWithSub = (localid, linkid, f) => { + const sub = this.subs.get(localid, false); + if (!sub) { + console.error("Ignoring mention of nonexistent local ID", localid, linkid); + } else { + return f(sub); + } + }; + + const unsubscribe = (localid, linkid) => { + callWithSub(localid, linkid, (sub) => { + sub.removeHolder(linkid); + switch (sub.holders.size) { + case 0: + this.peers.forEach((peer) => { + if (peer !== linkid) sendToLink(peer, W.Clear(localid)); + }); + break; + case 1: + sub.holders.forEach((peerEndpoint, peer) => { // only one, guaranteed ≠ linkid + sendToLink(peer, W.Clear(localid)); + }); + break; + default: + break; + } + }); + }; + + const removeMatch = (localid, captures, linkid) => { + callWithSub(localid, linkid, (sub) => { + const newMatchHolders = sub.removeMatch(captures, linkid); + switch (newMatchHolders.size) { + case 0: + sub.holders.forEach((peerEndpoint, peer) => { + if (peer !== linkid) sendToLink(peer, W.Del(peerEndpoint, captures)); + }); + break; + case 1: { + const peer = newMatchHolders.first(); // only one, guaranteed ≠ linkid + const peerEndpoint = sub.holders.get(peer, false); + if (peerEndpoint) sendToLink(peer, W.Del(peerEndpoint, captures)); + break; + } + default: + break; + } + }); + }; + + during P.Envelope(managementScope, P.FederatedLink($linkid, scope)) { + field this.linkSubs = Map(); + field this.linkMatches = Map(); + + // const summarise = () => { + // console.log('----------------------------------------', scope); + // this.peers.forEach((peer) => console.log(' peer', peer)); + // this.specs.forEach((localid, spec) => { + // console.log(' spec', spec.toString(), localid); + // const sub = this.subs.get(localid); + // sub.holders.forEach((peerEndpoint, peer) => { + // console.log(' sub', peer, peerEndpoint); + // }); + // sub.matches.forEach((matchHolders, captures) => { + // console.log(' match', captures.toString(), matchHolders.toJSON()); + // }); + // }); + // }; + + const err = (detail) => { + sendToLink(linkid, W.Err(detail)); + currentFacet().stop(); + }; + + on start { + // console.log('+PEER', linkid, scope, this.peers); + this.peers = this.peers.add(linkid); + this.specs.forEach((localid, spec) => { + sendToLink(linkid, W.Assert(localid, Observe(spec))); + }); + // summarise(); + } + + on stop { + // console.log('-PEER', linkid, scope); + this.peers = this.peers.remove(linkid); + this.linkMatches.forEach((matches, localid) => { + matches.forEach((captures) => removeMatch(localid, captures, linkid)); + }); + this.linkSubs.forEach((localid, _endpointId) => { + unsubscribe(localid, linkid); + }); + // summarise(); + } + + on message P.Envelope(managementScope, P.FromPOA(linkid, W.Assert($ep, Observe($spec)))) { + let localid = this.specs.get(spec, null); + let sub; + if (localid === null) { + localid = makeLocalId(); + sub = new Subscription(localid, spec, scopeThis); + } else { + sub = this.subs.get(localid); + } + const oldHolderCount = sub.holders.size; + sub.addHolder(linkid, ep); + this.linkSubs = this.linkSubs.set(ep, sub.id); + switch (oldHolderCount) { + case 0: + this.peers.forEach((peer) => { + if (peer !== linkid) sendToLink(peer, W.Assert(localid, Observe(spec))); + }); + break; + case 1: + sub.holders.forEach((peerEndpoint, peer) => { // now contains 2, one of which is us + if (peer !== linkid) sendToLink(peer, W.Assert(localid, Observe(spec))); + }); + break; + default: + break; + } + sub.matches.forEach((matchHolders, captures) => { + if (!matchHolders.remove(linkid).isEmpty()) { + sendToLink(linkid, W.Add(ep, captures)); + } + }); + // summarise(); + } + + on message P.Envelope(managementScope, P.FromPOA(linkid, W.Clear($ep))) { + const localid = this.linkSubs.get(ep, null); + if (localid === null) { + console.error("Ignoring mention of nonexistent endpoint", ep, linkid); + } else { + this.linkSubs = this.linkSubs.remove(ep); + unsubscribe(localid, linkid); + } + // summarise(); + } + + on message P.Envelope(managementScope, P.ToPOA(linkid, W.Clear($localid))) { + // NB ToPOA, not FromPOA! + (this.linkMatches.get(localid) || Set()).forEach((captures) => { + removeMatch(localid, captures, linkid); + }); + this.linkMatches = this.linkMatches.remove(localid); + // summarise(); + } + + on message P.Envelope(managementScope, P.FromPOA(linkid, W.Add($localid, $captures))) { + const matches = this.linkMatches.get(localid) || Set(); + if (matches.includes(captures)) { + err(Symbol.for('duplicate-capture')); + } else { + this.linkMatches = this.linkMatches.set(localid, matches.add(captures)); + callWithSub(localid, linkid, (sub) => { + const oldMatchHolders = sub.addMatch(captures, linkid); + switch (oldMatchHolders.size) { + case 0: + sub.holders.forEach((peerEndpoint, peer) => { + if (peer !== linkid) sendToLink(peer, W.Add(peerEndpoint, captures)); + }); + break; + case 1: { + const peer = oldMatchHolders.first(); // only one, guaranteed ≠ linkid + const peerEndpoint = sub.holders.get(peer, false); + if (peerEndpoint) sendToLink(peer, W.Add(peerEndpoint, captures)); + break; + } + default: + break; + } + }); + } + // summarise(); + } + + on message P.Envelope(managementScope, P.FromPOA(linkid, W.Del($localid, $captures))) { + const matches = this.linkMatches.get(localid) || Set(); + if (!matches.includes(captures)) { + err(Symbol.for('nonexistent-capture')); + } else { + const newMatches = matches.remove(captures); + this.linkMatches = (newMatches.isEmpty()) + ? this.linkMatches.remove(localid) + : this.linkMatches.set(localid, newMatches); + removeMatch(localid, captures, linkid); + } + // summarise(); + } + + on message P.Envelope(managementScope, P.FromPOA(linkid, W.Msg($localid, $captures))) { + callWithSub(localid, linkid, (sub) => { + sub.holders.forEach((peerEndpoint, peer) => { + if (peer !== linkid) sendToLink(peer, W.Msg(peerEndpoint, captures)); + }); + }); + } + } + } + } +} diff --git a/packages/server/src/index.js b/packages/server/src/index.js index e4e95f0..af044b5 100644 --- a/packages/server/src/index.js +++ b/packages/server/src/index.js @@ -9,6 +9,7 @@ const S = activate require("@syndicate-lang/driver-streams-node"); const M = activate require("@syndicate-lang/driver-mdns"); const P = activate require("./internal_protocol"); const Server = activate require("./server"); +const Federation = activate require("./federation"); import { Set, Bytes, @@ -37,6 +38,8 @@ spawn named 'serverLogger' { } spawn named 'rootServer' { + assert Federation.ManagementScope('local'); + during Http.Request($reqId, server, 'get', [], _, _) { assert :snapshot Http.Response( reqId, 200, "OK", {"Content-type": "text/html"}, diff --git a/packages/server/src/internal_protocol.js b/packages/server/src/internal_protocol.js index e3cc744..25dfd44 100644 --- a/packages/server/src/internal_protocol.js +++ b/packages/server/src/internal_protocol.js @@ -15,10 +15,14 @@ assertion type Envelope(scope, body) = Symbol.for('server-envelope'); // Monitoring assertion type POAScope(connId, scope) = Symbol.for('server-poa-scope'); +// Federation +assertion type FederatedLink(id, scope) = Symbol.for('federated-link'); + Object.assign(module.exports, { ServerActive, POA, FromPOA, ToPOA, Disconnect, Proposal, Envelope, POAScope, + FederatedLink, });