diff --git a/packages/driver-mdns/package.json b/packages/driver-mdns/package.json index 0e5f450..990400f 100644 --- a/packages/driver-mdns/package.json +++ b/packages/driver-mdns/package.json @@ -20,6 +20,8 @@ }, "dependencies": { "@syndicate-lang/core": "^0.0.19", - "@syndicate-lang/driver-streams-node": "^0.0.1" + "@syndicate-lang/driver-streams-node": "^0.0.1", + "@syndicate-lang/driver-timer": "^0.0.20", + "bonjour": "^3.5.0" } } diff --git a/packages/driver-mdns/src/index.js b/packages/driver-mdns/src/index.js index 92f8cfd..ffdfef6 100644 --- a/packages/driver-mdns/src/index.js +++ b/packages/driver-mdns/src/index.js @@ -18,16 +18,17 @@ // TODO: support other than the default (usually ".local") domain. -const { Observe, currentFacet, genUuid } = require("@syndicate-lang/core"); +const { Observe, currentFacet, genUuid, Dataspace } = require("@syndicate-lang/core"); const S = activate require("@syndicate-lang/driver-streams-node"); +const { TimeLaterThan } = activate require("@syndicate-lang/driver-timer"); +const bonjour = require('bonjour')(); assertion type Service(name, serviceType) = Symbol.for("mdns-service"); -assertion type Publish(svc, hostName, port, txtDataRecords) = Symbol.for("mdns-publish"); -assertion type Published(svc, hostName, port, txtDataRecords) = Symbol.for("mdns-published"); -assertion type Discovered(svc, hostName, port, txtDataRecords, address, family, interfaceName) = Symbol.for("mdns-discovered"); +assertion type Publish(svc, hostName, port, txtData) = Symbol.for("mdns-publish"); +assertion type Published(svc, hostName, port, txtData) = Symbol.for("mdns-published"); +assertion type Discovered(svc, hostName, port, txtData, address) = Symbol.for("mdns-discovered"); +// ^ Avahi gave us "family" and "interfaceName", too, but `bonjour` doesn't. -// TODO: nested dataspace to scope these?? -message type BrowserInput(subprocessId, fields) = Symbol.for("-mdns-browser-input"); assertion type WildcardBrowserActive() = Symbol.for("-mdns-wildcard-browser-active"); export { @@ -37,137 +38,84 @@ export { Discovered, }; -function unescapeLabel(str) { - // Per avahi's avahi-common/domain.c's avahi_escape_label: - return str.replace( - /\\(\d\d\d|\.|\\)/g, // that is, \NNN in decimal or \. or \\ - function (x) { - if (x.length === 4) return String.fromCharCode(Number(x.slice(1))); - // else x.length === 2 - return x[1]; - }); +function parseServiceType(serviceType) { + const splitType = /_([^_]+)\._([^_]+)/.exec(serviceType); + if (!splitType) { + throw new Error('Cannot parse serviceType + ' + JSON.stringify(serviceType)) + } + return [splitType[1], splitType[2]]; } -spawn named 'driver/avahi-publish' { - during Observe(Published($svc, $hostName, $port, $txtDataRecords)) { - assert Publish(svc, hostName, port, txtDataRecords); +spawn named 'driver/mdns-publish' { + during Observe(Published($svc, $hostName, $port, $txtData)) { + assert Publish(svc, hostName, port, txtData); } - during Publish($svc(Service($name, $serviceType)), $hostName, $port, $txtDataRecords) { - const topFacet = currentFacet(); + // TODO: The `bonjour` module is limited to publishing a single TXT + // record with key-value strings. It'd be nice to support the full + // range of TXT content allowed by the mDNS and DNS RFCs, including + // non-key-value-formatted data and multiple TXT records. + during Publish($svc(Service($name, $serviceType)), $hostName, $port, $txtData) + spawn named ['bonjour', svc] { + const options = {}; + options.name = name; + if (hostName !== null) options.host = hostName; + options.port = port; + [options.type, options.protocol] = parseServiceType(serviceType); + options.txt = txtData.toJS(); + console.log('publish', options); + const service = bonjour.publish(options); + on stop service.stop(); - const args = ['-f', '-s']; - if (hostName !== null) args.push('-H', hostName); - args.push(name, serviceType, port.toString()); - txtDataRecords.forEach((txt) => args.push(txt)); + field this.established = false; + assert Published(svc, hostName, port, txtData) when (this.established); - const id = genUuid('avahi-publish'); - assert S.Subprocess(id, 'avahi-publish', args, {stdio: ['ignore', 'ignore', 'pipe']}); - stop on message S.SubprocessError(id, $err) { - console.error("Couldn't start avahi-publish", err); - } - stop on asserted S.SubprocessExit(id, $code, _) { - console.error("Subprocess avahi-publish terminated with code", code); - } - - on asserted S.SubprocessRunning(id, _, [_, _, $stderr]) { - react { - field this.established = false; - assert Published(svc, hostName, port, txtDataRecords) when (this.established); - - on retracted S.Readable(stderr) topFacet.stop(); - - on message S.Line(stderr, $line) { - line = line.toString('utf-8'); - if (line.startsWith('Established')) { - this.established = true; - } else if (line.startsWith('Disconnected')) { - this.established = false; - } else if (line.startsWith('Got SIG')) { - // e.g. "Got SIGTERM, quitting."; ignore. - } else { - console.log('avahi-publish', name+':', line); - } - } - } - } + service.on('up', Dataspace.wrapExternal(() => { this.established = true; })); + service.on('error', Dataspace.wrapExternal((err) => { throw err; })); } } -spawn named 'driver/avahi-browse' { - during Observe(Discovered(Service(_, $serviceType), _, _, _, _, _, _)) { - const topFacet = currentFacet(); - - const args = ['-f', '-r', '-k', '-p']; +spawn named 'driver/mdns-browse' { + during Observe(Discovered(Service(_, $serviceType), _, _, _, _)) { + const options = {}; if (typeof serviceType === 'string') { - args.push(serviceType); - } else { - args.push('-a'); + [options.type, options.protocol] = parseServiceType(serviceType); } + const browser = bonjour.find(options); + on stop browser.stop(); + + // field this.nextUpdate = +(new Date()); + // on asserted TimeLaterThan(this.nextUpdate) { + // console.log('updating'); + // browser.update(); + // } + + const eachRecord = (service, cb) => { + const svc = Service(service.name, '_' + service.type + '._' + service.protocol); + for (const addr of service.addresses) { + cb(Discovered(svc, service.host, service.port, service.txt, addr)); + } + // const now = +(new Date()); + // if (this.nextUpdate < now) { + // this.nextUpdate = now + 200; + // } + }; + + browser.on('up', Dataspace.wrapExternal((service) => { + eachRecord(service, (a) => { + currentFacet().actor.adhocAssert(a) + }); + })); + browser.on('down', Dataspace.wrapExternal((service) => { + eachRecord(service, (a) => { + currentFacet().actor.adhocRetract(a) + }); + })); if (typeof serviceType !== 'string') { assert WildcardBrowserActive(); } else { stop on asserted WildcardBrowserActive(); } - - const id = genUuid('avahi-browse'); - assert S.Subprocess(id, 'avahi-browse', args, {stdio: ['ignore', 'pipe', 'ignore']}); - stop on message S.SubprocessError(id, $err) { - console.error("Couldn't start avahi-browse", err); - } - stop on asserted S.SubprocessExit(id, $code, _) { - if (code !== 0) { - console.error("Subprocess avahi-browse terminated with code", code); - } - } - - on asserted S.SubprocessRunning(id, _, [_, $stdout, _]) { - react { - on retracted S.Readable(stdout) topFacet.stop(); - on message S.Line(stdout, $line) { - // Parsing of TXT record data (appearing after the port - // number in an '=' record) is unreliable given the way - // avahi-browse formats it. - // - // See https://github.com/lathiat/avahi/pull/206. - // - // However, it's still useful to have, so we do our best! - // - const pieces = line.toString('utf-8').split(/;/); - if (pieces[0] === '=') { - // A resolved address record, which has TXT data. - const normalFields = pieces.slice(0, 9); - const txtFields = pieces.slice(9).join(';'); // it's these that are dodgy - if (txtFields === '') { - normalFields.push([]); - } else { - normalFields.push(txtFields.slice(1,-1).split(/" "/)); // OMG this is vile - } - send BrowserInput(id, normalFields); - } else { - // Something else. - send BrowserInput(id, pieces); - } - } - - on message BrowserInput(id, ["+", $interfaceName, $family, $name, $serviceType, $domain]) { - react { - const svc = Service(unescapeLabel(name), serviceType); - stop on message BrowserInput( - id, ["-", interfaceName, family, name, serviceType, domain]); - on message BrowserInput( - id, ['=', interfaceName, family, name, serviceType, domain, - $hostName, $address, $portStr, $txtDataRecords]) - { - const port0 = Number(portStr); - const port = Number.isNaN(port0) ? null : port0; - react assert Discovered( - svc, hostName, port, txtDataRecords, address, family, interfaceName); - } - } - } - } - } } } diff --git a/packages/syntax-playground/src/avahipublish.js b/packages/syntax-playground/src/avahipublish.js index 667e298..f8c3acc 100644 --- a/packages/syntax-playground/src/avahipublish.js +++ b/packages/syntax-playground/src/avahipublish.js @@ -3,25 +3,30 @@ const S = activate require("@syndicate-lang/driver-streams-node"); const M = activate require("@syndicate-lang/driver-mdns"); spawn named 'test' { - const svc = M.Service((new Date()).toJSON(), '_syndicate._tcp'); - assert M.Publish(svc, null, 8001, []); + const label = (new Date()).toJSON().replace(/\./g, '-'); + // ^ I declare defeat. None of the underlying DNS and mDNS support + // libraries support fully-general DNS labels. In particular, labels + // with dots in them cause trouble. I tried to fix the underlying + // problem, but the changes required are just too much. So instead, + // I'm avoiding a perfectly reasonable use of DNS. This is why we + // can't have nice things. + const svc = M.Service(label, '_syndicate._tcp'); + assert M.Publish(svc, null, 8001, {}); during M.Discovered(M.Service($name, '_syndicate._tcp'), $hostName, $port, $txtDataRecords, - $address, - "IPv4", - $interfaceName) + $address) { - on start console.log('+', name, hostName, port, txtDataRecords, address, interfaceName); - on stop console.log('-', name, hostName, port, txtDataRecords, address, interfaceName); + on start console.log('+', name, hostName, port, txtDataRecords, address); + on stop console.log('-', name, hostName, port, txtDataRecords, address); } - during M.Discovered(M.Service($n, $t), $h, $p, $d, $a, "IPv4", $i) { + during M.Discovered(M.Service($n, $t), $h, $p, $d, $a) { if (t !== '_syndicate._tcp') { - on start console.log('**', t, n, h, p, d, a, i); - on stop console.log('==', t, n, h, p, d, a, i); + on start console.log('**', t, n, h, p, d, a); + on stop console.log('==', t, n, h, p, d, a); } } }