Preserve turn boundaries in distribution protocol; loopback client; much improved debug output

This commit is contained in:
Tony Garnock-Jones 2019-05-30 23:06:15 +01:00
parent 9d12ef311c
commit 1d8719f6b1
10 changed files with 282 additions and 112 deletions

View File

@ -27,7 +27,8 @@
"@syndicate-lang/driver-http-node": "^0.1.2", "@syndicate-lang/driver-http-node": "^0.1.2",
"@syndicate-lang/driver-mdns": "^0.1.1", "@syndicate-lang/driver-mdns": "^0.1.1",
"@syndicate-lang/driver-streams-node": "^0.1.1", "@syndicate-lang/driver-streams-node": "^0.1.1",
"@syndicate-lang/driver-websocket": "^0.1.1" "@syndicate-lang/driver-websocket": "^0.1.1",
"debug": "^4.1.1"
}, },
"main": "lib/index.js", "main": "lib/index.js",
"bin": { "bin": {

View File

@ -0,0 +1,20 @@
"use strict";
import { List } from "@syndicate-lang/core";
export function buffer(fields, fieldName) {
field fields[fieldName] = List();
return {
push: function (item) {
fields[fieldName] = fields[fieldName].push(item);
},
drain: function (handler) {
dataflow {
if (!fields[fieldName].isEmpty()) {
fields[fieldName].forEach(handler);
fields[fieldName] = List();
}
}
}
};
}

View File

@ -1,22 +1,28 @@
"use strict"; "use strict";
const debugFactory = require('debug');
import { import {
Decoder, Encoder, Bytes, Decoder, Encoder, Bytes,
Observe, Skeleton, Observe, Skeleton,
genUuid, genUuid, currentFacet,
} from "@syndicate-lang/core"; } from "@syndicate-lang/core";
const WS = activate require("@syndicate-lang/driver-websocket"); const WS = activate require("@syndicate-lang/driver-websocket");
const { const {
Connect, Peer, Connect, Peer,
Commit,
Assert, Clear, Message, Assert, Clear, Message,
Add, Del, Msg, Err, Add, Del, Msg, Err,
Ping, Pong, Ping, Pong,
makeDecoder, makeDecoder,
} = activate require("./protocol"); } = activate require("./protocol");
const P = activate require("./internal_protocol");
const Turn = activate require("./turn");
assertion type WSServer(url, scope) = Symbol.for('server-websocket-connection'); assertion type WSServer(url, scope) = Symbol.for('server-websocket-connection');
assertion type Loopback(scope) = Symbol.for('server-loopback-connection');
assertion type ToServer(addr, assertion); assertion type ToServer(addr, assertion);
assertion type FromServer(addr, assertion); assertion type FromServer(addr, assertion);
@ -27,12 +33,68 @@ message type ForceServerDisconnect(addr);
message type _ServerPacket(addr, packet); message type _ServerPacket(addr, packet);
Object.assign(module.exports, { Object.assign(module.exports, {
WSServer, WSServer, Loopback,
ToServer, FromServer, ToServer, FromServer,
ServerConnection, ServerConnected, ServerConnection, ServerConnected,
ForceServerDisconnect, ForceServerDisconnect,
}); });
export function _genericClientSessionFacet(addr, scope, w0, debug) {
if (debug === void 0) {
debug = debugFactory('syndicate/server:client:' + genUuid('?'));
}
assert ServerConnected(addr);
on start debug('+', addr.toString(), scope);
on stop debug('-', addr.toString(), scope);
on message _ServerPacket(addr, $m) debug('<', m.toString());
const w = (x) => {
debug('>', x.toString());
w0(x);
};
const outboundTurn = Turn.recorder(this, 'commitNeeded',
{
extend: w,
commit: () => { w(Commit()); },
debug: debug
});
const inboundTurn = Turn.replayer({ debug: debug });
on start w(Connect(scope));
during ToServer(addr, $a) {
const ep = genUuid('pub');
on start outboundTurn.extend(Assert(ep, a));
on stop outboundTurn.extend(Clear(ep));
}
on message ToServer(addr, $a) outboundTurn.extend(Message(a));
on message _ServerPacket(addr, Ping()) w(Pong());
during Observe(FromServer(addr, $spec)) {
const ep = genUuid('sub');
on start outboundTurn.extend(Assert(ep, Observe(spec)));
on stop outboundTurn.extend(Clear(ep));
on message _ServerPacket(addr, Add(ep, $vs)) inboundTurn.extend(() => {
react {
const epFacet = currentFacet();
assert Skeleton.instantiateAssertion(FromServer(addr, spec), vs);
on message _ServerPacket(addr, Del(ep, vs)) worklist.push(() => {
epFacet.stop();
});
}
})
on message _ServerPacket(addr, Msg(ep, $vs)) inboundTurn.extend(() => {
send Skeleton.instantiateAssertion(FromServer(addr, spec), vs);
})
on message _ServerPacket(addr, Commit()) inboundTurn.commit();
}
}
spawn named "ServerClientFactory" { spawn named "ServerClientFactory" {
during ToServer($addr, _) assert ServerConnection(addr); during ToServer($addr, _) assert ServerConnection(addr);
during Observe(FromServer($addr, _)) assert ServerConnection(addr); during Observe(FromServer($addr, _)) assert ServerConnection(addr);
@ -40,45 +102,32 @@ spawn named "ServerClientFactory" {
during ServerConnection($addr(WSServer($url, $scope))) spawn named ['Server', addr] { during ServerConnection($addr(WSServer($url, $scope))) spawn named ['Server', addr] {
const wsId = genUuid('server'); const wsId = genUuid('server');
const debug = debugFactory('syndicate/server:client:' + wsId);
during WS.WebSocket(wsId, url, {}) { during WS.WebSocket(wsId, url, {}) {
assert ServerConnected(addr);
function w(x) {
send WS.DataOut(wsId, new Encoder().push(x).contents());
}
on start w(Connect(scope));
on message WS.DataIn(wsId, $data) { on message WS.DataIn(wsId, $data) {
if (data instanceof Bytes) { if (data instanceof Bytes) send _ServerPacket(addr, makeDecoder(data).next());
send _ServerPacket(addr, makeDecoder(data).next());
}
} }
during ToServer(addr, $a) { _genericClientSessionFacet.call(
const ep = genUuid('pub'); this,
on start w(Assert(ep, a)); addr, scope,
on stop w(Clear(ep)); (x) => { send WS.DataOut(wsId, new Encoder().push(x).contents()); },
} debug);
}
}
on message ToServer(addr, $a) w(Message(a)); during ServerConnection($addr(Loopback($scope))) spawn named ['Server', addr] {
const debug = debugFactory('syndicate/server:client:loopback:' + scope);
on message _ServerPacket(addr, Ping()) w(Pong()); assert P.POA(addr);
on message P.ToPOA(addr, $p) send _ServerPacket(addr, p);
during Observe(FromServer(addr, $spec)) { on start react {
const ep = genUuid('sub'); stop on asserted Observe(P.FromPOA(addr, _)) {
on start w(Assert(ep, Observe(spec))); react _genericClientSessionFacet.call(
on stop w(Clear(ep)); this,
on message _ServerPacket(addr, Add(ep, $vs)) { addr, scope,
react { (x) => { send P.FromPOA(addr, x); },
assert Skeleton.instantiateAssertion(FromServer(addr, spec), vs); debug);
stop on message _ServerPacket(addr, Del(ep, vs));
}
}
on message _ServerPacket(addr, Msg(ep, $vs)) {
send Skeleton.instantiateAssertion(FromServer(addr, spec), vs);
}
} }
} }
} }

View File

@ -27,29 +27,31 @@ const Federation = activate require("./federation");
import { import {
Set, Map, Set, Map,
RandomID,
} from "@syndicate-lang/core"; } from "@syndicate-lang/core";
const fs = require('fs'); const fs = require('fs');
const debugFactory = require('debug');
const debug = debugFactory('syndicate/server:disco');
spawn named 'peerAdvertisement' { spawn named 'peerAdvertisement' {
const localId = RandomID.randomId(8, false); during OverlayNode($localId) {
assert OverlayNode(localId); on start debug('Local node ID is', localId);
console.log('Local node ID is', localId);
during Federation.ManagementScope($managementScope) { during Federation.ManagementScope($managementScope) {
during P.Envelope(managementScope, Overlay($overlayId, _)) { during P.Envelope(managementScope, Overlay($overlayId, _)) {
const gatewayId = overlayId + ':' + localId; const gatewayId = overlayId + ':' + localId;
during P.Envelope(managementScope, AvailableTransport(WebSocketTransport($port, $path))) { during P.Envelope(managementScope, AvailableTransport(WebSocketTransport($port, $path))) {
assert M.Publish(M.Service(gatewayId, '_syndicate+ws._tcp'), null, port, [ assert M.Publish(M.Service(gatewayId, '_syndicate+ws._tcp'), null, port, [
"path="+path, "path="+path,
"scope="+managementScope "scope="+managementScope
]); ]);
}
// Other variants for later:
// assert M.Publish(M.Service(gatewayId, '_syndicate._tcp'), null, port, []);
} }
// Other variants for later:
// assert M.Publish(M.Service(gatewayId, '_syndicate._tcp'), null, port, []);
} }
} }
} }
@ -67,7 +69,7 @@ function txtsToMap(txts) {
spawn named 'peerDiscovery' { spawn named 'peerDiscovery' {
during M.DefaultGateway($gatewayInterface, $gatewayIp) { during M.DefaultGateway($gatewayInterface, $gatewayIp) {
on start console.log('Gateway IP is', gatewayIp, 'on interface', gatewayInterface); on start debug('Gateway IP is', gatewayIp, 'on interface', gatewayInterface);
during M.Discovered(M.Service($name, '_syndicate+ws._tcp'), during M.Discovered(M.Service($name, '_syndicate+ws._tcp'),
_, // hostname _, // hostname
@ -88,11 +90,24 @@ spawn named 'peerDiscovery' {
} }
} }
spawn named 'helpful info output' { spawn named 'syndicate/server:disco:transport' {
console.info('Peer discovery running'); const debug = debugFactory('syndicate/server:disco:transport');
on asserted AvailableTransport($spec) console.info(spec.toString());
}
spawn named 'syndicate/server:disco:mdns' {
const debug = debugFactory('syndicate/server:disco:mdns');
debug('Peer discovery running');
during Peer($overlayId, $nodeId, $ip, $addr) { during Peer($overlayId, $nodeId, $ip, $addr) {
on start console.info("+PEER", ip, overlayId, nodeId, addr.toString()); on start debug("+", ip, overlayId, nodeId, addr.toString());
on stop console.info("-PEER", ip, overlayId, nodeId, addr.toString()); on stop debug("-", ip, overlayId, nodeId, addr.toString());
}
}
spawn named 'federationRoutingInfo' {
during Federation.ManagementScope($managementScope) {
// assert P.Proposal(managementScope, Federation.ManagementScope(managementScope));
during $t(AvailableTransport(_)) assert P.Proposal(managementScope, t);
} }
} }
@ -144,14 +159,17 @@ spawn named 'uplinkSelection' {
} }
dataflow if (this.bestAddr) { dataflow if (this.bestAddr) {
console.log('Selected uplink for overlay', overlayId, 'is', this.bestAddr.toString()); debug('Selected uplink peer for overlay', overlayId, 'is', this.bestPeer.toString(), 'at', this.bestAddr.toString());
} }
assert P.Proposal(managementScope, Federation.Uplink(overlayId, this.bestAddr, overlayId)) assert P.Proposal(managementScope, Federation.Uplink(overlayId, this.bestAddr, overlayId))
when (this.bestAddr); when (this.bestAddr);
assert P.Proposal(overlayId, OverlayLink(OverlayNode(localId), this.bestPeer)) const loopbackAddr = C.Loopback(overlayId);
when (this.bestAddr); during C.ServerConnected(loopbackAddr) {
assert C.ToServer(loopbackAddr, OverlayLink(OverlayNode(localId), this.bestPeer))
when (this.bestAddr);
}
} }
} }
} }

View File

@ -3,6 +3,8 @@
const P = activate require("./internal_protocol"); const P = activate require("./internal_protocol");
const W = activate require("./protocol"); const W = activate require("./protocol");
const C = activate require("./client"); const C = activate require("./client");
const B = activate require("./buffer");
const debugFactory = require('debug');
assertion type ManagementScope(scope) = Symbol.for('federation-management-scope'); assertion type ManagementScope(scope) = Symbol.for('federation-management-scope');
@ -28,39 +30,36 @@ spawn named '@syndicate-lang/server/federation/UplinkFactory' {
{ {
during C.ServerConnected(peerAddr) { during C.ServerConnected(peerAddr) {
const sessionId = genUuid('peer'); const sessionId = genUuid('peer');
const debug = debugFactory('syndicate/server:federation:uplink:' + sessionId);
on start debug('+', peerAddr.toString());
on stop debug('-', peerAddr.toString());
assert P.Proposal(managementScope, UplinkConnected(link)); assert P.Proposal(managementScope, UplinkConnected(link));
assert P.Proposal(managementScope, P.FederatedLink(sessionId, localScope)); assert P.Proposal(managementScope, P.FederatedLink(sessionId, localScope));
assert C.ToServer(peerAddr, P.FederatedLink(sessionId, remoteScope)); assert C.ToServer(peerAddr, P.FederatedLink(sessionId, remoteScope));
field this.pendingIn = List(); const pendingIn = B.buffer(this, 'pendingIn');
field this.pendingOut = List(); const pendingOut = B.buffer(this, 'pendingOut');
on message C.FromServer(peerAddr, P.ToPOA(sessionId, $p)) { on message C.FromServer(peerAddr, P.ToPOA(sessionId, $p)) {
this.pendingIn = this.pendingIn.push(p); pendingIn.push(p);
} }
on message P.Envelope(managementScope, P.ToPOA(sessionId, $p)) { on message P.Envelope(managementScope, P.ToPOA(sessionId, $p)) {
this.pendingOut = this.pendingOut.push(p); pendingOut.push(p);
} }
during P.Envelope(managementScope, Observe(P.FromPOA(sessionId, _))) { during P.Envelope(managementScope, P.FederatedLinkReady(sessionId)) {
during C.FromServer(peerAddr, Observe(P.FromPOA(sessionId, _))) { during C.FromServer(peerAddr, P.FederatedLinkReady(sessionId)) {
dataflow { pendingIn.drain((p) => {
if (!this.pendingIn.isEmpty()) { debug('<', p.toString());
this.pendingIn.forEach((p) => { send P.Proposal(managementScope, P.FromPOA(sessionId, p));
send P.Proposal(managementScope, P.FromPOA(sessionId, p)); });
}); pendingOut.drain((p) => {
this.pendingIn = List(); debug('>', p.toString());
} send C.ToServer(peerAddr, P.FromPOA(sessionId, p));
} });
dataflow {
if (!this.pendingOut.isEmpty()) {
this.pendingOut.forEach((p) => {
send C.ToServer(peerAddr, P.FromPOA(sessionId, p));
});
this.pendingOut = List();
}
}
} }
} }
} }
@ -75,6 +74,11 @@ spawn named '@syndicate-lang/server/federation/LocalLinkFactory' {
spawn named ['@syndicate-lang/server/federation/LocalLink', managementScope, scope] spawn named ['@syndicate-lang/server/federation/LocalLink', managementScope, scope]
{ {
const sessionId = genUuid('localLink'); const sessionId = genUuid('localLink');
const debug = debugFactory('syndicate/server:federation:local:' + scope);
on start debug('+', sessionId);
on stop debug('-', sessionId);
assert P.Proposal(managementScope, P.FederatedLink(sessionId, scope)); assert P.Proposal(managementScope, P.FederatedLink(sessionId, scope));
const sendFromPOA = (m) => { const sendFromPOA = (m) => {
@ -83,6 +87,8 @@ spawn named '@syndicate-lang/server/federation/LocalLinkFactory' {
on message P.Envelope(managementScope, P.ToPOA(sessionId, W.Assert($ep, Observe($spec)))) { on message P.Envelope(managementScope, P.ToPOA(sessionId, W.Assert($ep, Observe($spec)))) {
react { react {
on start debug('remoteObs+', spec.toString());
on stop debug('remoteObs-', spec.toString());
currentFacet().addEndpoint(() => { currentFacet().addEndpoint(() => {
const outerSpec = P.Proposal(scope, spec); const outerSpec = P.Proposal(scope, spec);
const analysis = Skeleton.analyzeAssertion(outerSpec); const analysis = Skeleton.analyzeAssertion(outerSpec);
@ -104,6 +110,8 @@ spawn named '@syndicate-lang/server/federation/LocalLinkFactory' {
during Observe($pat(P.Envelope(scope, $spec))) { during Observe($pat(P.Envelope(scope, $spec))) {
const ep = genUuid('ep'); const ep = genUuid('ep');
on start debug('localObs+', spec.toString(), ep);
on stop debug('localObs-', spec.toString(), ep);
on start sendFromPOA(W.Assert(ep, Observe(spec))); on start sendFromPOA(W.Assert(ep, Observe(spec)));
on stop sendFromPOA(W.Clear(ep)); on stop sendFromPOA(W.Clear(ep));
on message P.Envelope(managementScope, P.ToPOA(sessionId, W.Add(ep, $captures))) { on message P.Envelope(managementScope, P.ToPOA(sessionId, W.Add(ep, $captures))) {
@ -242,6 +250,14 @@ spawn named '@syndicate-lang/server/federation/ScopeFactory' {
}; };
during P.Envelope(managementScope, P.FederatedLink($linkid, scope)) { during P.Envelope(managementScope, P.FederatedLink($linkid, scope)) {
const debug = debugFactory('syndicate/server:federation:link:' + linkid);
on start debug('+', scope.toString());
on stop debug('-', scope.toString());
on message P.Envelope(managementScope, P.FromPOA(linkid, $m)) debug('<', m.toString());
on message P.Envelope(managementScope, P.ToPOA(linkid, $m)) debug('>', m.toString());
assert P.Proposal(managementScope, P.FederatedLinkReady(linkid));
field this.linkSubs = Map(); field this.linkSubs = Map();
field this.linkMatches = Map(); field this.linkMatches = Map();
@ -266,8 +282,8 @@ spawn named '@syndicate-lang/server/federation/ScopeFactory' {
}; };
on start { on start {
// console.log('+PEER', linkid, scope, this.peers);
this.peers = this.peers.add(linkid); this.peers = this.peers.add(linkid);
// console.log('+PEER', linkid, scope, this.peers);
this.specs.forEach((localid, spec) => { this.specs.forEach((localid, spec) => {
sendToLink(linkid, W.Assert(localid, Observe(spec))); sendToLink(linkid, W.Assert(localid, Observe(spec)));
}); });
@ -275,8 +291,8 @@ spawn named '@syndicate-lang/server/federation/ScopeFactory' {
} }
on stop { on stop {
// console.log('-PEER', linkid, scope);
this.peers = this.peers.remove(linkid); this.peers = this.peers.remove(linkid);
// console.log('-PEER', linkid, scope);
this.linkMatches.forEach((matches, localid) => { this.linkMatches.forEach((matches, localid) => {
matches.forEach((captures) => removeMatch(localid, captures, linkid)); matches.forEach((captures) => removeMatch(localid, captures, linkid));
}); });

View File

@ -13,6 +13,10 @@ const Server = activate require("./server");
const Federation = activate require("./federation"); const Federation = activate require("./federation");
const fs = require('fs'); const fs = require('fs');
import {
RandomID,
} from "@syndicate-lang/core";
let currentManagementScope = 'local'; let currentManagementScope = 'local';
function usage() { function usage() {
@ -91,22 +95,15 @@ spawn named 'server' {
uplinks.forEach((link) => { uplinks.forEach((link) => {
assert P.Proposal(currentManagementScope, link); assert P.Proposal(currentManagementScope, link);
}); });
if (overlays.length > 0) {
const localId = RandomID.randomId(8, false);
assert D.OverlayNode(localId);
}
overlays.forEach((o) => { overlays.forEach((o) => {
assert P.Proposal(currentManagementScope, o); assert P.Proposal(currentManagementScope, o);
}); });
} }
spawn named 'helpful info output' {
on asserted D.AvailableTransport($spec) console.info('Transport:', spec.toString());
}
spawn named 'federationRoutingInfo' {
during Federation.ManagementScope($managementScope) {
// assert P.Proposal(managementScope, Federation.ManagementScope(managementScope));
during $t(D.AvailableTransport(_)) assert P.Proposal(managementScope, t);
}
}
function _spawnStreamServer(spec) { function _spawnStreamServer(spec) {
spawn named spec { spawn named spec {
assert D.AvailableTransport(spec); assert D.AvailableTransport(spec);

View File

@ -3,6 +3,7 @@
assertion type ServerActive(scope) = Symbol.for('server-active'); assertion type ServerActive(scope) = Symbol.for('server-active');
assertion type POA(connId) = Symbol.for('server-poa'); assertion type POA(connId) = Symbol.for('server-poa');
assertion type POAReady(connId) = Symbol.for('server-poa-ready');
message type FromPOA(connId, body) = Symbol.for('message-poa->server'); message type FromPOA(connId, body) = Symbol.for('message-poa->server');
message type ToPOA(connId, body) = Symbol.for('message-server->poa'); message type ToPOA(connId, body) = Symbol.for('message-server->poa');
@ -17,12 +18,13 @@ assertion type POAScope(connId, scope) = Symbol.for('server-poa-scope');
// Federation // Federation
assertion type FederatedLink(id, scope) = Symbol.for('federated-link'); assertion type FederatedLink(id, scope) = Symbol.for('federated-link');
assertion type FederatedLinkReady(id) = Symbol.for('federated-link-ready');
Object.assign(module.exports, { Object.assign(module.exports, {
ServerActive, ServerActive,
POA, FromPOA, ToPOA, POA, POAReady, FromPOA, ToPOA,
Disconnect, Disconnect,
Proposal, Envelope, Proposal, Envelope,
POAScope, POAScope,
FederatedLink, FederatedLink, FederatedLinkReady,
}); });

View File

@ -5,6 +5,8 @@ import { Decoder, Discard, Capture, Observe } from "@syndicate-lang/core";
message type Connect(scope); message type Connect(scope);
message type Peer(scope); message type Peer(scope);
message type Commit();
message type Assert(endpointName, assertion); message type Assert(endpointName, assertion);
message type Clear(endpointName); message type Clear(endpointName);
message type Message(body); message type Message(body);
@ -29,6 +31,7 @@ function makeDecoder(initialBuffer) {
Object.assign(module.exports, { Object.assign(module.exports, {
Connect, Peer, Connect, Peer,
Commit,
Assert, Clear, Message, Assert, Clear, Message,
Add, Del, Msg, Err, Add, Del, Msg, Err,
Ping, Pong, Ping, Pong,

View File

@ -2,37 +2,44 @@
const Http = activate require("@syndicate-lang/driver-http-node"); const Http = activate require("@syndicate-lang/driver-http-node");
const S = activate require("@syndicate-lang/driver-streams-node"); const S = activate require("@syndicate-lang/driver-streams-node");
const debugFactory = require('debug');
import { import {
Set, Bytes, Set, Bytes,
Encoder, Observe, Encoder, Observe,
Dataspace, Skeleton, currentFacet, genUuid, RandomID Dataspace, Skeleton, currentFacet, genUuid,
} from "@syndicate-lang/core"; } from "@syndicate-lang/core";
const P = activate require("./internal_protocol"); const P = activate require("./internal_protocol");
const W = activate require("./protocol"); const W = activate require("./protocol");
const B = activate require("./buffer");
const Turn = activate require("./turn");
export function websocketServerFacet(reqId) { export function websocketServerFacet(reqId) {
assert P.POA(reqId); assert P.POA(reqId);
on message Http.DataIn(reqId, $data) { const buf = B.buffer(this, 'chunks');
on message Http.DataIn(reqId, $data) buf.push(data);
during P.POAReady(reqId) buf.drain((data) => {
if (data instanceof Bytes) send P.FromPOA(reqId, W.makeDecoder(data).next()); if (data instanceof Bytes) send P.FromPOA(reqId, W.makeDecoder(data).next());
} });
on message P.ToPOA(reqId, $resp) send Http.DataOut(reqId, new Encoder().push(resp).contents()); on message P.ToPOA(reqId, $resp) send Http.DataOut(reqId, new Encoder().push(resp).contents());
stop on message P.Disconnect(reqId); stop on message P.Disconnect(reqId);
stop on retracted P.POAScope(reqId, _); stop on retracted P.POAReady(reqId);
} }
export function streamServerFacet(id) { export function streamServerFacet(id) {
assert P.POA(id); assert P.POA(id);
const decoder = W.makeDecoder(null); const decoder = W.makeDecoder(null);
on message S.Data(id, $data) { const buf = B.buffer(this, 'chunks');
on message S.Data(id, $data) buf.push(data);
during P.POAReady(reqId) buf.drain((data) => {
decoder.write(data); decoder.write(data);
let v; let v;
while ((v = decoder.try_next())) send P.FromPOA(id, v); while ((v = decoder.try_next())) send P.FromPOA(id, v);
} });
on message P.ToPOA(id, $resp) send S.Push(id, new Encoder().push(resp).contents(), null); on message P.ToPOA(id, $resp) send S.Push(id, new Encoder().push(resp).contents(), null);
stop on message P.Disconnect(id); stop on message P.Disconnect(id);
stop on retracted P.POAScope(id, _); stop on retracted P.POAReady(id);
} }
export function streamServerActor(id, debugLabel) { export function streamServerActor(id, debugLabel) {
@ -48,7 +55,14 @@ spawn named '@syndicate-lang/server/server/POAHandler' {
during Observe(P.Envelope($scope, $spec)) assert P.Proposal(scope, Observe(spec)); during Observe(P.Envelope($scope, $spec)) assert P.Proposal(scope, Observe(spec));
during P.POA($connId) spawn named P.POA(connId) { during P.POA($connId) spawn named P.POA(connId) {
const debug = debugFactory('syndicate/server:server:' + connId.toString());
on start debug('+');
on stop debug('-');
on message P.FromPOA(connId, $m) debug('<', m.toString());
on message P.ToPOA(connId, $m) debug('>', m.toString());
field this.scope = null; field this.scope = null;
assert P.POAReady(connId);
assert P.POAScope(connId, this.scope) when (this.scope !== null); assert P.POAScope(connId, this.scope) when (this.scope !== null);
assert P.ServerActive(this.scope) when (this.scope !== null); assert P.ServerActive(this.scope) when (this.scope !== null);
@ -59,10 +73,20 @@ spawn named '@syndicate-lang/server/server/POAHandler' {
this.scope = scope; this.scope = scope;
} }
on message P.FromPOA(connId, W.Assert($ep, $a)) { const outboundTurn = Turn.recorder(this, 'commitNeeded',
{
extend: (m) => { send P.ToPOA(connId, m); },
commit: () => { send P.ToPOA(connId, W.Commit()); },
debug: debug
});
const inboundTurn = Turn.replayer({ debug: debug });
on message P.FromPOA(connId, W.Assert($ep, $a)) inboundTurn.extend(() => {
if (!endpoints.includes(ep)) { if (!endpoints.includes(ep)) {
endpoints = endpoints.add(ep); endpoints = endpoints.add(ep);
react { react {
const epFacet = currentFacet();
on stop { endpoints = endpoints.remove(ep); } on stop { endpoints = endpoints.remove(ep); }
field this.assertion = a; field this.assertion = a;
@ -75,9 +99,9 @@ spawn named '@syndicate-lang/server/server/POAHandler' {
analysis.callback = Dataspace.wrap((evt, vs) => { analysis.callback = Dataspace.wrap((evt, vs) => {
currentFacet().actor.scheduleScript(() => { currentFacet().actor.scheduleScript(() => {
switch (evt) { switch (evt) {
case Skeleton.EVENT_ADDED: send P.ToPOA(connId, W.Add(ep, vs)); break; case Skeleton.EVENT_ADDED: outboundTurn.extend(W.Add(ep, vs)); break;
case Skeleton.EVENT_REMOVED: send P.ToPOA(connId, W.Del(ep, vs)); break; case Skeleton.EVENT_REMOVED: outboundTurn.extend(W.Del(ep, vs)); break;
case Skeleton.EVENT_MESSAGE: send P.ToPOA(connId, W.Msg(ep, vs)); break; case Skeleton.EVENT_MESSAGE: outboundTurn.extend(W.Msg(ep, vs)); break;
} }
}); });
}); });
@ -87,14 +111,20 @@ spawn named '@syndicate-lang/server/server/POAHandler' {
} }
}, true); }, true);
on message P.FromPOA(connId, W.Assert(ep, $newAssertion)) this.assertion = newAssertion; on message P.FromPOA(connId, W.Assert(ep, $newAssertion)) inboundTurn.extend(() => {
stop on message P.FromPOA(connId, W.Clear(ep)); this.assertion = newAssertion;
});
on message P.FromPOA(connId, W.Clear(ep)) inboundTurn.extend(() => {
epFacet.stop();
});
} }
} }
} });
on message P.FromPOA(connId, W.Message($body)) { on message P.FromPOA(connId, W.Message($body)) inboundTurn.extend(() => {
send P.Proposal(this.scope, body); send P.Proposal(this.scope, body);
} });
on message P.FromPOA(connId, W.Commit()) inboundTurn.commit();
} }
} }

View File

@ -0,0 +1,34 @@
"use strict";
import { _Dataspace, currentFacet } from "@syndicate-lang/core";
const PRIORITY = _Dataspace.PRIORITY;
export function recorder(fields, fieldName, callbacks) {
field fields[fieldName] = false;
currentFacet().addDataflow(() => {
if (fields[fieldName]) {
callbacks.commit();
fields[fieldName] = false;
}
}, PRIORITY.IDLE);
return {
extend: function (item) {
callbacks.extend(item);
fields[fieldName] = true;
}
};
}
export function replayer(callbacks0) {
const callbacks = callbacks0 || {};
return {
worklist: [],
extend: function (thunk) {
this.worklist.push(thunk);
},
commit: function () {
this.worklist.forEach((thunk) => thunk());
this.worklist.length = 0; // clear out the list
}
};
}