From 867b32b5bec30a1ab7b804f5aad0bb6ba38a5676 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Mon, 3 Jun 2019 12:06:04 +0100 Subject: [PATCH] addObserverEndpoint --- packages/core/src/dataspace.js | 33 +++++++++++++++++++++++++++++++ packages/server/src/federation.js | 19 +++++------------- 2 files changed, 38 insertions(+), 14 deletions(-) diff --git a/packages/core/src/dataspace.js b/packages/core/src/dataspace.js index 54e991d..b94e140 100644 --- a/packages/core/src/dataspace.js +++ b/packages/core/src/dataspace.js @@ -738,6 +738,39 @@ Facet.prototype.addEndpoint = function (updateFun, isDynamic) { return ep; }; +Facet.prototype._addRawObserverEndpoint = function (specThunk, callbacks) { + this.addEndpoint(() => { + const spec = specThunk(); + if (spec === void 0) { + return [void 0, null]; + } else { + const analysis = Skeleton.analyzeAssertion(spec); + analysis.callback = Dataspace.wrap((evt, vs) => { + let cb = null; + switch (evt) { + case Skeleton.EVENT_ADDED: cb = callbacks.add; break; + case Skeleton.EVENT_REMOVED: cb = callbacks.del; break; + case Skeleton.EVENT_MESSAGE: cb = callbacks.msg; break; + } + if (cb) cb(vs); + }); + return [Assertions.Observe(spec), analysis]; + } + }); +}; + +Facet.prototype.addObserverEndpoint = function (specThunk, callbacks) { + const self = this; + function scriptify(f) { + return f && ((vs) => self.actor.scheduleScript(() => f(vs))); + } + this._addRawObserverEndpoint(specThunk, { + add: scriptify(callbacks.add), + del: scriptify(callbacks.del), + msg: scriptify(callbacks.msg), + }); +}; + Facet.prototype.addDataflow = function (subjectFun, priority) { return this.addEndpoint(() => { let subjectId = this.actor.dataspace.dataflow.currentSubjectId; diff --git a/packages/server/src/federation.js b/packages/server/src/federation.js index b16cbae..c3971e2 100644 --- a/packages/server/src/federation.js +++ b/packages/server/src/federation.js @@ -89,20 +89,11 @@ spawn named '@syndicate-lang/server/federation/LocalLinkFactory' { react { on start debug('remoteObs+', spec.toString()); on stop debug('remoteObs-', spec.toString()); - currentFacet().addEndpoint(() => { - const outerSpec = P.Proposal(scope, spec); - const analysis = Skeleton.analyzeAssertion(outerSpec); - analysis.callback = Dataspace.wrap((evt, vs) => { - currentFacet().actor.scheduleScript(() => { - switch (evt) { - case Skeleton.EVENT_ADDED: sendFromPOA(W.Add(ep, vs)); break; - case Skeleton.EVENT_REMOVED: sendFromPOA(W.Del(ep, vs)); break; - case Skeleton.EVENT_MESSAGE: sendFromPOA(W.Msg(ep, vs)); break; - } - }); - }); - return [Observe(outerSpec), analysis]; - }, true); + currentFacet().addObserverEndpoint(() => P.Proposal(scope, spec), { + add: (vs) => sendFromPOA(W.Add(ep, vs)), + del: (vs) => sendFromPOA(W.Del(ep, vs)), + msg: (vs) => sendFromPOA(W.Msg(ep, vs)), + }); assert P.Envelope(scope, Observe(spec)); stop on message P.Envelope(managementScope, P.ToPOA(sessionId, W.Clear(ep))); }