From 8e8a1d9989b6b396d7cd3dc2d0c852291153bcf3 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Wed, 12 Dec 2018 17:16:10 +0000 Subject: [PATCH] Steps toward discovery --- packages/broker/package.json | 1 + packages/broker/src/index.js | 56 ++++++++++++++++++- .../syntax-playground/src/avahipublish.js | 47 ++++++++++------ 3 files changed, 85 insertions(+), 19 deletions(-) diff --git a/packages/broker/package.json b/packages/broker/package.json index cb97ec0..5e3cff9 100644 --- a/packages/broker/package.json +++ b/packages/broker/package.json @@ -14,6 +14,7 @@ "@syndicate-lang/core": "^0.0.19", "@syndicate-lang/driver-browser-ui": "^0.0.17", "@syndicate-lang/driver-http-node": "^0.0.16", + "@syndicate-lang/driver-mdns": "^0.0.1", "@syndicate-lang/driver-streams-node": "^0.0.1", "@syndicate-lang/driver-timer": "^0.0.20", "@syndicate-lang/driver-websocket": "^0.0.11", diff --git a/packages/broker/src/index.js b/packages/broker/src/index.js index 48658de..10ad75e 100644 --- a/packages/broker/src/index.js +++ b/packages/broker/src/index.js @@ -6,14 +6,22 @@ const UI = require("@syndicate-lang/driver-browser-ui"); const Http = activate require("@syndicate-lang/driver-http-node"); const S = activate require("@syndicate-lang/driver-streams-node"); +const M = activate require("@syndicate-lang/driver-mdns"); import { Set, Bytes, Encoder, Observe, - Dataspace, Skeleton, currentFacet, + Dataspace, Skeleton, currentFacet, genUuid, RandomID } from "@syndicate-lang/core"; -const server = Http.HttpServer(null, 8000); +const HTTP_PORT = 8000; +const TCP_PORT = 8001; + +const server = Http.HttpServer(null, HTTP_PORT); + +const dataspaceId = 'EToUNUJI0ykSfudmN9Z99wu62qGQB1nd8SHvjNtL5tM'; // public key of root broker +const localId = RandomID.randomId(8, false); +const gatewayId = dataspaceId + ':' + localId; const fs = require('fs'); @@ -68,6 +76,11 @@ spawn named 'rootServer' { } spawn named 'websocketListener' { + assert M.Publish(M.Service(gatewayId, '_syndicate+ws._tcp'), + null, HTTP_PORT, ["tier=0", "path=/broker"]); + assert M.Publish(M.Service(localId, '_syndicate+ws._tcp'), + null, HTTP_PORT, ["tier=0", "path=/monitor"]); + during Http.WebSocket($reqId, server, [$scope], _) spawn named ['wsConnection', scope, reqId] { const name = ConnectionName(scope, reqId); assert Connection(name); @@ -82,7 +95,8 @@ spawn named 'websocketListener' { } spawn named 'tcpListener' { - on asserted S.IncomingConnection($id, S.TcpListener(8001)) { + assert M.Publish(M.Service(gatewayId, '_syndicate._tcp'), null, TCP_PORT, ["tier=0"]); + on asserted S.IncomingConnection($id, S.TcpListener(TCP_PORT)) { spawnStreamConnection('tcpBroker', id); } } @@ -166,3 +180,39 @@ spawn named 'connectionHandler' { on message Response(connId, $resp) console.log('OUT:', connId.toString(), resp.toString()); } } + +spawn named 'peerDiscovery' { + // during M.DefaultGateway($gwif, _) { + // on start console.log('GW+', gwif); + // on stop console.log('GW-', gwif); + during M.Discovered( + M.Service($name, '_syndicate+ws._tcp'), $host, $port, $txt, $addr, "IPv4", $gwif) + { + const [dsId, peerId] = name.split(':'); + + let tier = null; + txt.forEach((t) => { + t.split(' ').forEach((kv) => { + const [k, v] = kv.split('='); + if (k === 'tier') { + tier = Number.parseInt(v); + } + }); + }); + + on start console.log('+ws', gwif, tier, name, host, port, addr); + on stop console.log('-ws', gwif, tier, name, host, port, addr); + } + // } + + /* + +If there's a broker on our gateway interface, see if it's better than us. + - if it is, use it. + - if it's not, pretend it isn't there. + +If there's no broker on our gateway interface (or we're pretending +none exists), try to connect to the top. + + */ +} diff --git a/packages/syntax-playground/src/avahipublish.js b/packages/syntax-playground/src/avahipublish.js index 667e298..b192453 100644 --- a/packages/syntax-playground/src/avahipublish.js +++ b/packages/syntax-playground/src/avahipublish.js @@ -3,25 +3,40 @@ const S = activate require("@syndicate-lang/driver-streams-node"); const M = activate require("@syndicate-lang/driver-mdns"); spawn named 'test' { - const svc = M.Service((new Date()).toJSON(), '_syndicate._tcp'); - assert M.Publish(svc, null, 8001, []); + // const svc = M.Service((new Date()).toJSON(), '_syndicate._tcp'); + // assert M.Publish(svc, null, 8001, []); - during M.Discovered(M.Service($name, '_syndicate._tcp'), - $hostName, - $port, - $txtDataRecords, - $address, - "IPv4", - $interfaceName) + // during M.Discovered(M.Service($name, '_syndicate._tcp'), + // $hostName, + // $port, + // $txtData, + // $address, + // "IPv4", + // $interfaceName) + // { + // on start console.log('+', name, hostName, port, txtData, address, interfaceName); + // on stop console.log('-', name, hostName, port, txtData, address, interfaceName); + // } + + field this.count = 0; + dataflow console.log('Broker count:', this.count); + + during M.Discovered(M.Service($name, '_syndicate+ws._tcp'), $host, $port, $txt, $addr, "IPv4", _) { - on start console.log('+', name, hostName, port, txtDataRecords, address, interfaceName); - on stop console.log('-', name, hostName, port, txtDataRecords, address, interfaceName); + on start { this.count++; console.log('+ws', name, host, port, txt.get(0, 'N/A'), addr); } + on stop { this.count--; console.log('-ws', name, host, port, txt.get(0, 'N/A'), addr); } } - during M.Discovered(M.Service($n, $t), $h, $p, $d, $a, "IPv4", $i) { - if (t !== '_syndicate._tcp') { - on start console.log('**', t, n, h, p, d, a, i); - on stop console.log('==', t, n, h, p, d, a, i); - } + during M.Discovered(M.Service($name, '_syndicate._tcp'), $host, $port, _, $addr, "IPv4", _) + { + on start { this.count++; console.log('+tcp', name, host, port, addr); } + on stop { this.count--; console.log('-tcp', name, host, port, addr); } } + + // during M.Discovered(M.Service($n, $t), $h, $p, $d, $a, "IPv4", $i) { + // if (t !== '_syndicate._tcp') { + // on start console.log('**', t, n, h, p, d, a, i); + // on stop console.log('==', t, n, h, p, d, a, i); + // } + // } }