2019-05-23 09:33:59 +00:00
|
|
|
"use strict";
|
|
|
|
|
|
|
|
const Http = activate require("@syndicate-lang/driver-http-node");
|
|
|
|
const S = activate require("@syndicate-lang/driver-streams-node");
|
2019-05-30 22:06:15 +00:00
|
|
|
const debugFactory = require('debug');
|
2019-05-23 09:33:59 +00:00
|
|
|
|
|
|
|
import {
|
2019-06-08 20:10:31 +00:00
|
|
|
Map, Bytes,
|
2019-09-11 14:11:15 +00:00
|
|
|
Observe,
|
2019-05-30 22:06:15 +00:00
|
|
|
Dataspace, Skeleton, currentFacet, genUuid,
|
2019-05-23 09:33:59 +00:00
|
|
|
} from "@syndicate-lang/core";
|
|
|
|
|
|
|
|
const P = activate require("./internal_protocol");
|
|
|
|
const W = activate require("./protocol");
|
2019-05-30 22:06:15 +00:00
|
|
|
const B = activate require("./buffer");
|
2019-06-08 20:10:31 +00:00
|
|
|
const { recorder } = activate require("./turn");
|
2019-06-20 12:35:04 +00:00
|
|
|
const { heartbeat } = activate require("./heartbeat");
|
2019-05-23 09:33:59 +00:00
|
|
|
|
|
|
|
export function websocketServerFacet(reqId) {
|
|
|
|
assert P.POA(reqId);
|
2019-05-30 22:06:15 +00:00
|
|
|
const buf = B.buffer(this, 'chunks');
|
|
|
|
on message Http.DataIn(reqId, $data) buf.push(data);
|
|
|
|
during P.POAReady(reqId) buf.drain((data) => {
|
2019-05-23 09:33:59 +00:00
|
|
|
if (data instanceof Bytes) send P.FromPOA(reqId, W.makeDecoder(data).next());
|
2019-05-30 22:06:15 +00:00
|
|
|
});
|
2019-09-11 14:11:15 +00:00
|
|
|
on message P.ToPOA(reqId, $resp) send Http.DataOut(reqId, W.makeEncoder().push(resp).contents());
|
2019-05-23 09:33:59 +00:00
|
|
|
stop on message P.Disconnect(reqId);
|
2019-05-30 22:06:15 +00:00
|
|
|
stop on retracted P.POAReady(reqId);
|
2019-05-23 09:33:59 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
export function streamServerFacet(id) {
|
|
|
|
assert P.POA(id);
|
|
|
|
const decoder = W.makeDecoder(null);
|
2019-05-30 22:06:15 +00:00
|
|
|
const buf = B.buffer(this, 'chunks');
|
2019-05-31 12:58:04 +00:00
|
|
|
on message S.Stream(id, S.Data($data)) buf.push(data);
|
2019-06-11 17:48:29 +00:00
|
|
|
during P.POAReady(id) buf.drain((data) => {
|
2019-05-23 09:33:59 +00:00
|
|
|
decoder.write(data);
|
|
|
|
let v;
|
|
|
|
while ((v = decoder.try_next())) send P.FromPOA(id, v);
|
2019-05-30 22:06:15 +00:00
|
|
|
});
|
2019-09-11 14:11:15 +00:00
|
|
|
on message P.ToPOA(id, $resp) send S.Stream(id, S.Push(W.makeEncoder().push(resp).contents(), false));
|
2019-05-23 09:33:59 +00:00
|
|
|
stop on message P.Disconnect(id);
|
2019-05-30 22:06:15 +00:00
|
|
|
stop on retracted P.POAReady(id);
|
2019-05-23 09:33:59 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
export function streamServerActor(id, debugLabel) {
|
|
|
|
spawn named [debugLabel || 'stream-poa', id] {
|
2019-05-31 12:58:04 +00:00
|
|
|
stop on retracted S.Stream(id, S.Duplex());
|
2019-06-11 17:48:29 +00:00
|
|
|
streamServerFacet.call(this, id);
|
2019-05-23 09:33:59 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
spawn named '@syndicate-lang/server/server/POAHandler' {
|
|
|
|
during P.Proposal($scope, $assertion) assert P.Envelope(scope, assertion);
|
|
|
|
on message P.Proposal($scope, $assertion) send P.Envelope(scope, assertion);
|
|
|
|
during Observe(P.Envelope($scope, $spec)) assert P.Proposal(scope, Observe(spec));
|
|
|
|
|
|
|
|
during P.POA($connId) spawn named P.POA(connId) {
|
2019-05-30 22:06:15 +00:00
|
|
|
const debug = debugFactory('syndicate/server:server:' + connId.toString());
|
|
|
|
on start debug('+');
|
|
|
|
on stop debug('-');
|
2019-06-20 12:35:04 +00:00
|
|
|
on message P.FromPOA(connId, $m) if (W.shouldDebugPrint(m)) debug('<', m.toString());
|
|
|
|
on message P.ToPOA(connId, $m) if (W.shouldDebugPrint(m)) debug('>', m.toString());
|
2019-05-30 22:06:15 +00:00
|
|
|
|
2019-05-23 09:33:59 +00:00
|
|
|
field this.scope = null;
|
2019-05-30 22:06:15 +00:00
|
|
|
assert P.POAReady(connId);
|
2019-05-23 09:33:59 +00:00
|
|
|
assert P.POAScope(connId, this.scope) when (this.scope !== null);
|
|
|
|
assert P.ServerActive(this.scope) when (this.scope !== null);
|
|
|
|
|
2019-06-08 20:10:31 +00:00
|
|
|
let endpoints = Map();
|
2019-05-23 09:33:59 +00:00
|
|
|
|
|
|
|
on message P.FromPOA(connId, W.Connect($scope)) {
|
|
|
|
// TODO: Enforce requirement that Connect appear exactly once, before anything else
|
|
|
|
this.scope = scope;
|
|
|
|
}
|
|
|
|
|
2019-06-07 14:51:13 +00:00
|
|
|
const sendToPOA = (m) => { send P.ToPOA(connId, m); };
|
2019-06-08 20:10:31 +00:00
|
|
|
const outboundTurn = recorder(this, 'commitNeeded', (items) => sendToPOA(W.Turn(items)));
|
|
|
|
|
2019-06-20 12:35:04 +00:00
|
|
|
const poaFacet = currentFacet();
|
|
|
|
const resetHeartbeat = heartbeat(this, ['server', connId], sendToPOA, () => {poaFacet.stop();});
|
|
|
|
on message P.FromPOA(connId, _) resetHeartbeat();
|
|
|
|
on message P.FromPOA(connId, W.Ping()) sendToPOA(W.Pong());
|
|
|
|
|
2019-06-08 20:10:31 +00:00
|
|
|
on message P.FromPOA(connId, W.Turn($items)) {
|
|
|
|
items.forEach((item) => {
|
|
|
|
if (W.Assert.isClassOf(item)) {
|
|
|
|
const ep = W.Assert._endpointName(item);
|
|
|
|
const a = W.Assert._assertion(item);
|
|
|
|
if (endpoints.has(ep)) {
|
|
|
|
throw new Error("Attempt to update existing endpoint " + ep + " with " + a.toString());
|
|
|
|
}
|
|
|
|
react {
|
|
|
|
const epFacet = currentFacet();
|
|
|
|
endpoints = endpoints.set(ep, epFacet);
|
|
|
|
on stop { endpoints = endpoints.remove(ep); }
|
|
|
|
|
|
|
|
assert P.Proposal(this.scope, a);
|
|
|
|
|
|
|
|
if (Observe.isClassOf(a)) {
|
|
|
|
currentFacet().addEndpoint(() => {
|
|
|
|
const spec = P.Envelope(this.scope, Observe._specification(a));
|
|
|
|
const analysis = Skeleton.analyzeAssertion(spec);
|
|
|
|
analysis.callback = Dataspace.wrap((evt, vs) => {
|
|
|
|
currentFacet().actor.scheduleScript(() => {
|
|
|
|
switch (evt) {
|
|
|
|
case Skeleton.EVENT_ADDED: outboundTurn.extend(W.Add(ep, vs)); break;
|
|
|
|
case Skeleton.EVENT_REMOVED: outboundTurn.extend(W.Del(ep, vs)); break;
|
|
|
|
case Skeleton.EVENT_MESSAGE: outboundTurn.extend(W.Msg(ep, vs)); break;
|
2019-06-07 14:51:13 +00:00
|
|
|
}
|
2019-06-08 20:10:31 +00:00
|
|
|
});
|
2019-05-23 09:33:59 +00:00
|
|
|
});
|
2019-06-08 20:10:31 +00:00
|
|
|
return [Observe(spec), analysis];
|
|
|
|
}, false);
|
2019-05-23 09:33:59 +00:00
|
|
|
}
|
2019-06-08 20:10:31 +00:00
|
|
|
}
|
|
|
|
} else if (W.Clear.isClassOf(item)) {
|
|
|
|
const ep = W.Clear._endpointName(item);
|
|
|
|
if (!endpoints.has(ep)) {
|
|
|
|
throw new Error("Attempt to clear nonexistent endpoint " + ep);
|
|
|
|
}
|
|
|
|
endpoints.get(ep).stop(() => { outboundTurn.extend(W.End(ep)); });
|
|
|
|
} else if (W.Message.isClassOf(item)) {
|
|
|
|
send P.Proposal(this.scope, W.Message._body(item));
|
|
|
|
} else {
|
|
|
|
debug("Unhandled client/server message", item.toString());
|
2019-05-23 09:33:59 +00:00
|
|
|
}
|
2019-06-08 20:10:31 +00:00
|
|
|
});
|
2019-06-07 14:51:13 +00:00
|
|
|
}
|
2019-05-23 09:33:59 +00:00
|
|
|
}
|
|
|
|
}
|