"use strict"; const P = activate require("./internal_protocol"); const W = activate require("./protocol"); assertion type ManagementScope(scope) = Symbol.for('federation-management-scope'); module.exports.ManagementScope = ManagementScope; import { Set, Map, Observe, Skeleton, Dataspace, currentFacet, genUuid, } from "@syndicate-lang/core"; spawn named '@syndicate-lang/server/federation/LocalLinkFactory' { during ManagementScope($managementScope) { during P.Envelope(managementScope, P.FederatedLink(_, $scope)) { during P.ServerActive(scope) spawn named ['@syndicate-lang/server/federation/LocalLink', managementScope, scope] { const sessionId = genUuid('localLink'); assert P.Proposal(managementScope, P.FederatedLink(sessionId, scope)); const sendFromPOA = (m) => { send P.Proposal(managementScope, P.FromPOA(sessionId, m)); }; on message P.Envelope(managementScope, P.ToPOA(sessionId, W.Assert($ep, Observe($spec)))) { react { currentFacet().addEndpoint(() => { const outerSpec = P.Proposal(scope, spec); const analysis = Skeleton.analyzeAssertion(outerSpec); analysis.callback = Dataspace.wrap((evt, vs) => { currentFacet().actor.scheduleScript(() => { switch (evt) { case Skeleton.EVENT_ADDED: sendFromPOA(W.Add(ep, vs)); break; case Skeleton.EVENT_REMOVED: sendFromPOA(W.Del(ep, vs)); break; case Skeleton.EVENT_MESSAGE: sendFromPOA(W.Msg(ep, vs)); break; } }); }); return [Observe(outerSpec), analysis]; }, true); assert P.Envelope(scope, Observe(spec)); stop on message P.Envelope(managementScope, P.ToPOA(sessionId, W.Clear(ep))); } } during Observe($pat(P.Envelope(scope, $spec))) { const ep = genUuid('ep'); on start sendFromPOA(W.Assert(ep, Observe(spec))); on stop sendFromPOA(W.Clear(ep)); on message P.Envelope(managementScope, P.ToPOA(sessionId, W.Add(ep, $captures))) { react { assert Skeleton.instantiateAssertion(pat, captures); stop on message P.Envelope(managementScope, P.ToPOA(sessionId, W.Del(ep, captures))); } } on message P.Envelope(managementScope, P.ToPOA(sessionId, W.Msg(ep, $captures))) { send Skeleton.instantiateAssertion(pat, captures); } } } } } } class Subscription { constructor(id, spec, owner) { this.id = id; this.spec = spec; this.holders = Map(); this.matches = Map(); this.owner = owner; this.owner.specs = this.owner.specs.set(spec, id); this.owner.subs = this.owner.subs.set(id, this); } isEmpty() { return this.holders.isEmpty() && this.matches.isEmpty(); } maybeRemove() { if (this.isEmpty()) { this.owner.specs = this.owner.specs.remove(this.spec); this.owner.subs = this.owner.subs.remove(this.id); } } addHolder(linkid, ep) { this.holders = this.holders.set(linkid, ep); } removeHolder(linkid) { this.holders = this.holders.remove(linkid); this.maybeRemove(); } addMatch(captures, linkid) { const oldMatchHolders = this.matches.get(captures) || Set(); const newMatchHolders = oldMatchHolders.add(linkid); this.matches = this.matches.set(captures, newMatchHolders); return oldMatchHolders; } removeMatch(captures, linkid) { const oldMatchHolders = this.matches.get(captures) || Set(); const newMatchHolders = oldMatchHolders.remove(linkid); this.matches = (newMatchHolders.isEmpty()) ? this.matches.remove(captures) : this.matches.set(captures, newMatchHolders); this.maybeRemove(); return newMatchHolders; } } spawn named '@syndicate-lang/server/federation/ScopeFactory' { during ManagementScope($managementScope) { function sendToLink(linkid, m) { send P.Proposal(managementScope, P.ToPOA(linkid, m)); } during P.Envelope(managementScope, P.FederatedLink(_, $scope)) spawn named ['@syndicate-lang/server/federation/Scope', scope] { let nextId = 0; const makeLocalId = () => { nextId++; return nextId; }; field this.peers = Set(); field this.specs = Map(); field this.subs = Map(); const scopeThis = this; const callWithSub = (localid, linkid, f) => { const sub = this.subs.get(localid, false); if (!sub) { console.error("Ignoring mention of nonexistent local ID", localid, linkid); } else { return f(sub); } }; const unsubscribe = (localid, linkid) => { callWithSub(localid, linkid, (sub) => { sub.removeHolder(linkid); switch (sub.holders.size) { case 0: this.peers.forEach((peer) => { if (peer !== linkid) sendToLink(peer, W.Clear(localid)); }); break; case 1: sub.holders.forEach((peerEndpoint, peer) => { // only one, guaranteed ≠ linkid sendToLink(peer, W.Clear(localid)); }); break; default: break; } }); }; const removeMatch = (localid, captures, linkid) => { callWithSub(localid, linkid, (sub) => { const newMatchHolders = sub.removeMatch(captures, linkid); switch (newMatchHolders.size) { case 0: sub.holders.forEach((peerEndpoint, peer) => { if (peer !== linkid) sendToLink(peer, W.Del(peerEndpoint, captures)); }); break; case 1: { const peer = newMatchHolders.first(); // only one, guaranteed ≠ linkid const peerEndpoint = sub.holders.get(peer, false); if (peerEndpoint) sendToLink(peer, W.Del(peerEndpoint, captures)); break; } default: break; } }); }; during P.Envelope(managementScope, P.FederatedLink($linkid, scope)) { field this.linkSubs = Map(); field this.linkMatches = Map(); // const summarise = () => { // console.log('----------------------------------------', scope); // this.peers.forEach((peer) => console.log(' peer', peer)); // this.specs.forEach((localid, spec) => { // console.log(' spec', spec.toString(), localid); // const sub = this.subs.get(localid); // sub.holders.forEach((peerEndpoint, peer) => { // console.log(' sub', peer, peerEndpoint); // }); // sub.matches.forEach((matchHolders, captures) => { // console.log(' match', captures.toString(), matchHolders.toJSON()); // }); // }); // }; const err = (detail) => { sendToLink(linkid, W.Err(detail)); currentFacet().stop(); }; on start { // console.log('+PEER', linkid, scope, this.peers); this.peers = this.peers.add(linkid); this.specs.forEach((localid, spec) => { sendToLink(linkid, W.Assert(localid, Observe(spec))); }); // summarise(); } on stop { // console.log('-PEER', linkid, scope); this.peers = this.peers.remove(linkid); this.linkMatches.forEach((matches, localid) => { matches.forEach((captures) => removeMatch(localid, captures, linkid)); }); this.linkSubs.forEach((localid, _endpointId) => { unsubscribe(localid, linkid); }); // summarise(); } on message P.Envelope(managementScope, P.FromPOA(linkid, W.Assert($ep, Observe($spec)))) { let localid = this.specs.get(spec, null); let sub; if (localid === null) { localid = makeLocalId(); sub = new Subscription(localid, spec, scopeThis); } else { sub = this.subs.get(localid); } const oldHolderCount = sub.holders.size; sub.addHolder(linkid, ep); this.linkSubs = this.linkSubs.set(ep, sub.id); switch (oldHolderCount) { case 0: this.peers.forEach((peer) => { if (peer !== linkid) sendToLink(peer, W.Assert(localid, Observe(spec))); }); break; case 1: sub.holders.forEach((peerEndpoint, peer) => { // now contains 2, one of which is us if (peer !== linkid) sendToLink(peer, W.Assert(localid, Observe(spec))); }); break; default: break; } sub.matches.forEach((matchHolders, captures) => { if (!matchHolders.remove(linkid).isEmpty()) { sendToLink(linkid, W.Add(ep, captures)); } }); // summarise(); } on message P.Envelope(managementScope, P.FromPOA(linkid, W.Clear($ep))) { const localid = this.linkSubs.get(ep, null); if (localid === null) { console.error("Ignoring mention of nonexistent endpoint", ep, linkid); } else { this.linkSubs = this.linkSubs.remove(ep); unsubscribe(localid, linkid); } // summarise(); } on message P.Envelope(managementScope, P.ToPOA(linkid, W.Clear($localid))) { // NB ToPOA, not FromPOA! (this.linkMatches.get(localid) || Set()).forEach((captures) => { removeMatch(localid, captures, linkid); }); this.linkMatches = this.linkMatches.remove(localid); // summarise(); } on message P.Envelope(managementScope, P.FromPOA(linkid, W.Add($localid, $captures))) { const matches = this.linkMatches.get(localid) || Set(); if (matches.includes(captures)) { err(Symbol.for('duplicate-capture')); } else { this.linkMatches = this.linkMatches.set(localid, matches.add(captures)); callWithSub(localid, linkid, (sub) => { const oldMatchHolders = sub.addMatch(captures, linkid); switch (oldMatchHolders.size) { case 0: sub.holders.forEach((peerEndpoint, peer) => { if (peer !== linkid) sendToLink(peer, W.Add(peerEndpoint, captures)); }); break; case 1: { const peer = oldMatchHolders.first(); // only one, guaranteed ≠ linkid const peerEndpoint = sub.holders.get(peer, false); if (peerEndpoint) sendToLink(peer, W.Add(peerEndpoint, captures)); break; } default: break; } }); } // summarise(); } on message P.Envelope(managementScope, P.FromPOA(linkid, W.Del($localid, $captures))) { const matches = this.linkMatches.get(localid) || Set(); if (!matches.includes(captures)) { err(Symbol.for('nonexistent-capture')); } else { const newMatches = matches.remove(captures); this.linkMatches = (newMatches.isEmpty()) ? this.linkMatches.remove(localid) : this.linkMatches.set(localid, newMatches); removeMatch(localid, captures, linkid); } // summarise(); } on message P.Envelope(managementScope, P.FromPOA(linkid, W.Msg($localid, $captures))) { callWithSub(localid, linkid, (sub) => { sub.holders.forEach((peerEndpoint, peer) => { if (peer !== linkid) sendToLink(peer, W.Msg(peerEndpoint, captures)); }); }); } } } } }