addObserverEndpoint

This commit is contained in:
Tony Garnock-Jones 2019-06-03 12:06:04 +01:00
parent d061d21748
commit 867b32b5be
2 changed files with 38 additions and 14 deletions

View File

@ -738,6 +738,39 @@ Facet.prototype.addEndpoint = function (updateFun, isDynamic) {
return ep;
};
Facet.prototype._addRawObserverEndpoint = function (specThunk, callbacks) {
this.addEndpoint(() => {
const spec = specThunk();
if (spec === void 0) {
return [void 0, null];
} else {
const analysis = Skeleton.analyzeAssertion(spec);
analysis.callback = Dataspace.wrap((evt, vs) => {
let cb = null;
switch (evt) {
case Skeleton.EVENT_ADDED: cb = callbacks.add; break;
case Skeleton.EVENT_REMOVED: cb = callbacks.del; break;
case Skeleton.EVENT_MESSAGE: cb = callbacks.msg; break;
}
if (cb) cb(vs);
});
return [Assertions.Observe(spec), analysis];
}
});
};
Facet.prototype.addObserverEndpoint = function (specThunk, callbacks) {
const self = this;
function scriptify(f) {
return f && ((vs) => self.actor.scheduleScript(() => f(vs)));
}
this._addRawObserverEndpoint(specThunk, {
add: scriptify(callbacks.add),
del: scriptify(callbacks.del),
msg: scriptify(callbacks.msg),
});
};
Facet.prototype.addDataflow = function (subjectFun, priority) {
return this.addEndpoint(() => {
let subjectId = this.actor.dataspace.dataflow.currentSubjectId;

View File

@ -89,20 +89,11 @@ spawn named '@syndicate-lang/server/federation/LocalLinkFactory' {
react {
on start debug('remoteObs+', spec.toString());
on stop debug('remoteObs-', spec.toString());
currentFacet().addEndpoint(() => {
const outerSpec = P.Proposal(scope, spec);
const analysis = Skeleton.analyzeAssertion(outerSpec);
analysis.callback = Dataspace.wrap((evt, vs) => {
currentFacet().actor.scheduleScript(() => {
switch (evt) {
case Skeleton.EVENT_ADDED: sendFromPOA(W.Add(ep, vs)); break;
case Skeleton.EVENT_REMOVED: sendFromPOA(W.Del(ep, vs)); break;
case Skeleton.EVENT_MESSAGE: sendFromPOA(W.Msg(ep, vs)); break;
}
});
});
return [Observe(outerSpec), analysis];
}, true);
currentFacet().addObserverEndpoint(() => P.Proposal(scope, spec), {
add: (vs) => sendFromPOA(W.Add(ep, vs)),
del: (vs) => sendFromPOA(W.Del(ep, vs)),
msg: (vs) => sendFromPOA(W.Msg(ep, vs)),
});
assert P.Envelope(scope, Observe(spec));
stop on message P.Envelope(managementScope, P.ToPOA(sessionId, W.Clear(ep)));
}