diff --git a/packages/service/src.ts/example.ts b/packages/service/src.ts/example.ts deleted file mode 100644 index ba1f65a..0000000 --- a/packages/service/src.ts/example.ts +++ /dev/null @@ -1,11 +0,0 @@ -import * as __SYNDICATE__ from "@syndicate-lang/core"; -/// SPDX-License-Identifier: GPL-3.0-or-later -/// SPDX-FileCopyrightText: Copyright © 2023 Tony Garnock-Jones - -import { stringify } from '@syndicate-lang/core'; -import { service } from './index.js'; - -service(args => { - console.log('+', stringify(args)); - __SYNDICATE__.Turn.activeFacet.onStop(() => { console.log('-', stringify(args)); }); -}); diff --git a/packages/service/src.ts/index.ts b/packages/service/src.ts/index.ts deleted file mode 100644 index 5309aa5..0000000 --- a/packages/service/src.ts/index.ts +++ /dev/null @@ -1,32 +0,0 @@ -import * as __SYNDICATE__ from "@syndicate-lang/core"; -/// SPDX-License-Identifier: GPL-3.0-or-later -/// SPDX-FileCopyrightText: Copyright © 2023 Tony Garnock-Jones - -import { Actor, AnyValue, Relay, Turn, assertionFacetObserver } from "@syndicate-lang/core"; -import * as process from 'process'; - -export interface ServiceOptions { - redirectConsole?: boolean; - debug?: boolean; - trustPeer?: boolean; -} - -export function service(handler: (args: AnyValue) => void, options?: ServiceOptions): Actor { - if (options?.redirectConsole ?? true) { - console.info = console.log = console.warn; - } - return Actor.boot(() => { - const facet = Turn.activeFacet; - facet.preventInertCheck(); - facet.actor.atExit(() => process.exit((facet.actor.exitReason?.ok ?? false) ? 0 : 1)); - new Relay.Relay(Object.assign({ - packetWriter: (bs: Uint8Array) => process.stdout.write(bs), - setup(r: Relay.Relay) { - process.stdin.on('data', bs => facet.turn(() => r.accept(new Uint8Array(bs)))); - process.stdin.on('close', () => facet.turn(() => { __SYNDICATE__.Turn.active._stop(__SYNDICATE__.Turn.activeFacet, () => {}); })); - process.stdin.on('end', () => facet.turn(() => { __SYNDICATE__.Turn.active._stop(__SYNDICATE__.Turn.activeFacet, () => {}); })); - }, - initialRef: Turn.ref(assertionFacetObserver(handler)), - }, options ?? {})); - }); -} diff --git a/packages/ws-relay/src.ts/index.ts b/packages/ws-relay/src.ts/index.ts deleted file mode 100644 index 8a19d86..0000000 --- a/packages/ws-relay/src.ts/index.ts +++ /dev/null @@ -1,435 +0,0 @@ -import * as __SYNDICATE__ from "@syndicate-lang/core"; -/// SPDX-License-Identifier: GPL-3.0-or-later -/// SPDX-FileCopyrightText: Copyright © 2016-2023 Tony Garnock-Jones - -import { - Assertion, - Bytes, - Dataflow, - Embedded, - IdentitySet, - Observe, - QuasiValue as Q, - Ref, - Relay, - Schemas, - Supervisor, - Turn, - assertionFacetObserver, - canonicalEncode, - fromJS, - isEmbedded, - stringify, - underlying, -} from "@syndicate-lang/core"; -import G = Schemas.gatekeeper; -import S = Schemas.sturdy; -import N = Schemas.noise; -import T = Schemas.transportAddress; -import * as SaltyCrypto from 'salty-crypto'; - -type TransportState = { - addr: T.WebSocket, - control: Ref, - peer: Ref, -}; - -export function boot(ds: Ref, debug: boolean = false) { - __SYNDICATE__.Turn.active._spawn(() => { __SYNDICATE__.Turn.activeFacet.actor.name = 'transportConnector'; - (((currentSyndicateTarget: __SYNDICATE__.Ref) => { - __SYNDICATE__.Turn.active.assertDataflow(() => ({ target: currentSyndicateTarget, assertion: __SYNDICATE__.Observe({ - pattern: __SYNDICATE__.QuasiValue.finish((__SYNDICATE__.QuasiValue.ctor(Observe, (__SYNDICATE__.QuasiValue.dict(["pattern", (__SYNDICATE__.QuasiValue.quote((__SYNDICATE__.QuasiValue.ctor(G.TransportConnection, (__SYNDICATE__.QuasiValue.dict(["addr", (__SYNDICATE__.QuasiValue.unquote((__SYNDICATE__.QuasiValue.bind((__SYNDICATE__.QuasiValue._)))))], ["control", (__SYNDICATE__.QuasiValue.unquote((__SYNDICATE__.QuasiValue._)))], ["resolved", (__SYNDICATE__.QuasiValue.unquote((__SYNDICATE__.QuasiValue._)))]))))))]))))), - observer: __SYNDICATE__.Turn.ref(__SYNDICATE__.assertionFacetObserver( - (__vs: __SYNDICATE__.AnyValue) => { - if (Array.isArray(__vs)) { -const addrPatValue = __vs[0]; - - const addr = Q.drop_lit(addrPatValue, T.toWebSocket); - if (!addr) return; - let counter =0; - Supervisor.always(() => ['transportConnector', fromJS(addr), counter++], () => { - console.log('connecting', addr.url, counter); - connectTo(addr); - }); - - } - } - )) - }) })); - })(ds)); - }); - - function connectTo(addr: T.WebSocket) { - const facet = Turn.activeFacet; - facet.preventInertCheck(); - const controlEntity = { - message(a0: Assertion): void { - const a = G.toTransportControl(a0); - if (!a) return; - __SYNDICATE__.Turn.active._stop(__SYNDICATE__.Turn.activeFacet, () => {}); // ForceDisconnect - }, - }; - let final = false; - function succeed(ws: WebSocket) { - if (final) return; - final = true; - console.log('opened', ws); - __SYNDICATE__.Turn.activeFacet.onStop(() => { - console.log('closing', ws); - ws.close(); - }); - ws.onclose = () => facet.turn(() => { __SYNDICATE__.Turn.active._stop(__SYNDICATE__.Turn.activeFacet, () => {}); }); - ws.onerror = () => facet.turn(() => - Turn.active.crash(new Error("WebSocket error"))); - const relay = new Relay.Relay({ - debug, - trustPeer: true, - packetWriter: bs => ws.send(bs), - setup(r: Relay.Relay) { - ws.onmessage = e => facet.turn(() => - r.accept(new Uint8Array(e.data))); - }, - initialOid: 0, - }); - console.log('succeed', addr.url); - (((currentSyndicateTarget: __SYNDICATE__.Ref) => { - __SYNDICATE__.Turn.active.assertDataflow(() => ({ target: currentSyndicateTarget, assertion: G.TransportConnection({ - "addr": fromJS(addr), - "control": __SYNDICATE__.Turn.ref(controlEntity), - "resolved": G.Resolved.accepted(relay.peer!), - }) })); - })(ds)); - } - function fail(detail: Assertion) { - if (final) return; - final = true; - console.log('fail', addr.url, detail); - (((currentSyndicateTarget: __SYNDICATE__.Ref) => { - __SYNDICATE__.Turn.active.assertDataflow(() => ({ target: currentSyndicateTarget, assertion: G.TransportConnection({ - "addr": fromJS(addr), - "control": __SYNDICATE__.Turn.ref(controlEntity), - "resolved": G.Resolved.Rejected(G.Rejected(detail)), - }) })); - })(ds)); - setTimeout(() => facet.turn(() => { __SYNDICATE__.Turn.active._stop(__SYNDICATE__.Turn.activeFacet, () => {}); }), 10000); - } - try { - const ws = new WebSocket(addr.url); - ws.binaryType = 'arraybuffer'; - ws.onopen = () => facet.turn(() => succeed(ws)); - ws.onclose = () => facet.turn(() => fail(Symbol.for('closed'))); - ws.onerror = () => facet.turn(() => fail(Symbol.for('websocket-error-event'))); - } catch (e) { - console.error('Failed opening websocket', addr.url, e); - fail(Symbol.for('websocket-exception')); - } - } - - __SYNDICATE__.Turn.active._spawn(() => { __SYNDICATE__.Turn.activeFacet.actor.name = 'pathResolver'; - (((currentSyndicateTarget: __SYNDICATE__.Ref) => { - __SYNDICATE__.Turn.active.assertDataflow(() => ({ target: currentSyndicateTarget, assertion: __SYNDICATE__.Observe({ - pattern: __SYNDICATE__.QuasiValue.finish((__SYNDICATE__.QuasiValue.ctor(Observe, (__SYNDICATE__.QuasiValue.dict(["pattern", (__SYNDICATE__.QuasiValue.quote((__SYNDICATE__.QuasiValue.ctor(G.ResolvePath, (__SYNDICATE__.QuasiValue.dict(["route", (__SYNDICATE__.QuasiValue.unquote((__SYNDICATE__.QuasiValue.bind((__SYNDICATE__.QuasiValue._)))))], ["addr", (__SYNDICATE__.QuasiValue.unquote((__SYNDICATE__.QuasiValue._)))], ["control", (__SYNDICATE__.QuasiValue.unquote((__SYNDICATE__.QuasiValue._)))], ["resolved", (__SYNDICATE__.QuasiValue.unquote((__SYNDICATE__.QuasiValue._)))]))))))]))))), - observer: __SYNDICATE__.Turn.ref(__SYNDICATE__.assertionFacetObserver( - (__vs: __SYNDICATE__.AnyValue) => { - if (Array.isArray(__vs)) { -const routePatValue = __vs[0]; - - const route = Q.drop_lit(routePatValue, G.toRoute); - if (!route) return; - const candidates = __SYNDICATE__.Turn.active.field>(new IdentitySet(), "candidates"); - route.transports.forEach(t => { - const addr = T.toWebSocket(t); - if (!addr) return; - console.log('tracking', addr.url); - __SYNDICATE__.Turn.active.assertDataflow(() => ({ target: currentSyndicateTarget, assertion: __SYNDICATE__.Observe({ - pattern: __SYNDICATE__.QuasiValue.finish((__SYNDICATE__.QuasiValue.ctor(G.TransportConnection, (__SYNDICATE__.QuasiValue.dict(["addr", (__SYNDICATE__.QuasiValue.lit(__SYNDICATE__.fromJS(addr)))], ["control", (__SYNDICATE__.QuasiValue.bind((__SYNDICATE__.QuasiValue._)))], ["resolved", (__SYNDICATE__.QuasiValue.ctor(G.Resolved.accepted, (__SYNDICATE__.QuasiValue.bind((__SYNDICATE__.QuasiValue._)))))]))))), - observer: __SYNDICATE__.Turn.ref(__SYNDICATE__.assertionFacetObserver( - (__vs: __SYNDICATE__.AnyValue) => { - if (Array.isArray(__vs)) { -const __v_0 = Embedded.__from_preserve__(__vs[0]); - if (__v_0 === void 0) return; - const control_e = __v_0; -const __v_1 = Embedded.__from_preserve__(__vs[1]); - if (__v_1 === void 0) return; - const peer_e = __v_1; - - const entry = { - addr, - control: control_e.embeddedValue, - peer: peer_e.embeddedValue, - }; - candidates.value.add(entry); - candidates.changed(); - __SYNDICATE__.Turn.activeFacet.onStop(() => { - candidates.value.delete(entry); - candidates.changed(); - }); - - } - } - )) - }) })); - }); - const best = __SYNDICATE__.Turn.active.field(null, "best"); - const rootPeer = __SYNDICATE__.Turn.active.field(null, "rootPeer"); - __SYNDICATE__.Turn.active._dataflow(() => { - best.value = null; - for (const c of candidates.value) { - best.value = c; - break; - } - rootPeer.value = best.value?.peer ?? null; - }); - resolve(() => rootPeer.value, route.pathSteps, (r) => { - console.log('leaf', best.value?.addr?.url); - __SYNDICATE__.Turn.active.assertDataflow(() => (r()) - ? ({ target: currentSyndicateTarget, assertion: G.ResolvePath({ - "route": route, - "addr": fromJS(best.value!.addr), - "control": best.value!.control, - "resolved": r()! - }) }) - : ({ target: void 0, assertion: void 0 })); - }); - - } - } - )) - }) })); - })(ds)); - }); - - function resolve( - e: () => Ref | null, - steps: G.PathStep[], - k: (r: () => G.Resolved | null) => void, - ) { - if (steps.length === 0) { - k(() => { - const peer = e(); - return peer === null ? null : G.Resolved.accepted(peer); - }); - } else { - const [step, ...more] = steps; - (((currentSyndicateTarget: __SYNDICATE__.Ref) => { - __SYNDICATE__.Turn.active.assertDataflow(() => (e()) - ? ({ target: currentSyndicateTarget, assertion: __SYNDICATE__.Observe({ - pattern: __SYNDICATE__.QuasiValue.finish((__SYNDICATE__.QuasiValue.ctor(G.ResolvedPathStep, (__SYNDICATE__.QuasiValue.dict(["origin", (__SYNDICATE__.QuasiValue.lit(__SYNDICATE__.fromJS((e()!))))], ["pathStep", (__SYNDICATE__.QuasiValue.lit(__SYNDICATE__.fromJS(step)))], ["resolved", (__SYNDICATE__.QuasiValue.bind((__SYNDICATE__.QuasiValue._)))]))))), - observer: __SYNDICATE__.Turn.ref(__SYNDICATE__.assertionFacetObserver( - (__vs: __SYNDICATE__.AnyValue) => { - if (Array.isArray(__vs)) { -const __v_0 = G.Resolved.__from_preserve__(__vs[0]); - if (__v_0 === void 0) return; - const resolved = __v_0; - - switch (resolved._variant) { - case "accepted": - resolve(() => resolved.responderSession, more, k); - break; - case "Rejected": - k(() => resolved); - break; - } - - } - } - )) - }) }) - : ({ target: void 0, assertion: void 0 })); - })(ds)); - } - } - - __SYNDICATE__.Turn.active._spawn(() => { __SYNDICATE__.Turn.activeFacet.actor.name = 'noiseStep'; - (((currentSyndicateTarget: __SYNDICATE__.Ref) => { - __SYNDICATE__.Turn.active.assertDataflow(() => ({ target: currentSyndicateTarget, assertion: __SYNDICATE__.Observe({ - pattern: __SYNDICATE__.QuasiValue.finish((__SYNDICATE__.QuasiValue.ctor(Observe, (__SYNDICATE__.QuasiValue.dict(["pattern", (__SYNDICATE__.QuasiValue.quote((__SYNDICATE__.QuasiValue.ctor(G.ResolvedPathStep, (__SYNDICATE__.QuasiValue.dict(["origin", (__SYNDICATE__.QuasiValue.unquote((__SYNDICATE__.QuasiValue.bind((__SYNDICATE__.QuasiValue._)))))], ["pathStep", (__SYNDICATE__.QuasiValue.ctor(G.PathStep, (__SYNDICATE__.QuasiValue.dict(["stepType", (__SYNDICATE__.QuasiValue.lit(__SYNDICATE__.fromJS(N.$noise)))], ["detail", (__SYNDICATE__.QuasiValue.unquote((__SYNDICATE__.QuasiValue.bind((__SYNDICATE__.QuasiValue._)))))]))))], ["resolved", (__SYNDICATE__.QuasiValue.unquote((__SYNDICATE__.QuasiValue._)))]))))))]))))), - observer: __SYNDICATE__.Turn.ref(__SYNDICATE__.assertionFacetObserver( - (__vs: __SYNDICATE__.AnyValue) => { - if (Array.isArray(__vs)) { -const originPatValue = __vs[0]; -const detailPatValue = __vs[1]; - - const origin0 = Q.drop_lit(originPatValue); - if (!origin0 || !isEmbedded(origin0)) return; - const origin = origin0.embeddedValue; - - const detail0 = Q.drop_lit(detailPatValue, N.toNoisePathStepDetail); - if (!detail0) return; - const spec: N.NoiseSpec = detail0; - - const algorithms = SaltyCrypto.Noise_25519_ChaChaPoly_BLAKE2s; - const protocol = - spec.protocol._variant === "present" ? spec.protocol.protocol : - spec.protocol._variant === "absent" ? N.fromDefaultProtocol(N.DefaultProtocol()) as string : - (() => { throw new Error("Invalid noise protocol name"); })(); - const patternName = SaltyCrypto.matchPattern(algorithms, protocol); - if (patternName === null) throw new Error("Unsupported protocol " + protocol); - const preSharedKeys = - spec.preSharedKeys._variant === "present" ? spec.preSharedKeys.preSharedKeys : - spec.preSharedKeys._variant === "absent" ? [] : - (() => { throw new Error("Invalid pre-shared keys"); })(); - const prologue = underlying(canonicalEncode(spec.service)); - - const H = new SaltyCrypto.Handshake( - algorithms, - patternName, - 'initiator', - { - prologue, - remoteStaticPublicKey: underlying(spec.key), - preSharedKeys: preSharedKeys.map(underlying), - }); - - let transportState: SaltyCrypto.TransportState | null = null; - let responderSession: Ref | null = null; - let relay: Relay.Relay | null = null; - - function maybeTransition(s: SaltyCrypto.TransportState | null) { - if (transportState !== null) { - throw new Error("Unexpected double-transition to transport state"); - } - transportState = s; - if (transportState !== null) { - const noiseSessionFacet = Turn.activeFacet; - const peer = new Relay.Relay({ - debug, - trustPeer: true, - packetWriter: bs => noiseSessionFacet.turn(() => { - const fragments = transportState!.send.encrypt_large(bs).map(Bytes.from); - (((currentSyndicateTarget: __SYNDICATE__.Ref) => { - __SYNDICATE__.Turn.active.message(currentSyndicateTarget, ((fragments.length === 1) - ? N.Packet.complete(fragments[0]) - : N.Packet.fragmented(fragments))); - })(responderSession!)); - }), - setup(r: Relay.Relay) { - relay = r; - }, - initialOid: 0, - }).peer!; - __SYNDICATE__.Turn.active.assertDataflow(() => ({ target: currentSyndicateTarget, assertion: G.ResolvedPathStep({ - "origin": origin, - "pathStep": G.PathStep({ - "stepType": N.$noise, - "detail": fromJS(spec), - }), - "resolved": G.Resolved.accepted(peer), - }) })); - } - } - - __SYNDICATE__.Turn.active.facet(() => { - (((currentSyndicateTarget: __SYNDICATE__.Ref) => { - __SYNDICATE__.Turn.active.assertDataflow(() => ({ target: currentSyndicateTarget, assertion: G.Resolve({ - "step": G.Step({ - "stepType": N.$noise, - "detail": fromJS(N.ServiceSelector(spec.service)), - }), - "observer": __SYNDICATE__.Turn.ref(({ - ... assertionFacetObserver(e => { - const response = G.toResolved(e); - if (!response) return; - switch (response._variant) { - case "accepted": - responderSession = response.responderSession; - const { packet, finished } = H.writeMessage(new Uint8Array()); - (((currentSyndicateTarget: __SYNDICATE__.Ref) => { - __SYNDICATE__.Turn.active.message(currentSyndicateTarget, Bytes.from(packet)); - })(responderSession)); - maybeTransition(finished); - break; - case "Rejected": - __SYNDICATE__.Turn.active._stop(__SYNDICATE__.Turn.activeFacet, () => { - (((currentSyndicateTarget: __SYNDICATE__.Ref) => { - __SYNDICATE__.Turn.active.assertDataflow(() => ({ target: currentSyndicateTarget, assertion: G.ResolvedPathStep({ - "origin": origin, - "pathStep": G.PathStep({ - "stepType": N.$noise, - "detail": fromJS(N.NoisePathStepDetail(spec)), - }), - "resolved": response, - }) })); - })(ds)); - }); - } - }), - message(body: Assertion) { - const p = N.asPacket(body); - if (transportState) { - const packet = transportState.recv.decrypt_large( - p._variant === 'complete' - ? [underlying(p.value)] - : p.value.map(underlying)); - relay!.accept(packet); - } else { - if (p._variant !== 'complete') { - throw new Error("Unexpected fragmentation in handshake"); - } - const { message, finished } = H.readMessage(underlying(p.value)); - if (message.byteLength !== 0) { - throw new Error("Unexpected payload during handshake"); - } - maybeTransition(finished); - } - }, - })), - }) })); - })(origin)); - }); - - } - } - )) - }) })); - })(ds)); - }); - - __SYNDICATE__.Turn.active._spawn(() => { __SYNDICATE__.Turn.activeFacet.actor.name = 'sturdyRefStep'; - (((currentSyndicateTarget: __SYNDICATE__.Ref) => { - __SYNDICATE__.Turn.active.assertDataflow(() => ({ target: currentSyndicateTarget, assertion: __SYNDICATE__.Observe({ - pattern: __SYNDICATE__.QuasiValue.finish((__SYNDICATE__.QuasiValue.ctor(Observe, (__SYNDICATE__.QuasiValue.dict(["pattern", (__SYNDICATE__.QuasiValue.quote((__SYNDICATE__.QuasiValue.ctor(G.ResolvedPathStep, (__SYNDICATE__.QuasiValue.dict(["origin", (__SYNDICATE__.QuasiValue.unquote((__SYNDICATE__.QuasiValue.bind((__SYNDICATE__.QuasiValue._)))))], ["pathStep", (__SYNDICATE__.QuasiValue.ctor(G.PathStep, (__SYNDICATE__.QuasiValue.dict(["stepType", (__SYNDICATE__.QuasiValue.lit(__SYNDICATE__.fromJS(S.$ref)))], ["detail", (__SYNDICATE__.QuasiValue.unquote((__SYNDICATE__.QuasiValue.bind((__SYNDICATE__.QuasiValue._)))))]))))], ["resolved", (__SYNDICATE__.QuasiValue.unquote((__SYNDICATE__.QuasiValue._)))]))))))]))))), - observer: __SYNDICATE__.Turn.ref(__SYNDICATE__.assertionFacetObserver( - (__vs: __SYNDICATE__.AnyValue) => { - if (Array.isArray(__vs)) { -const originPatValue = __vs[0]; -const detailPatValue = __vs[1]; - - const origin0 = Q.drop_lit(originPatValue); - if (!origin0 || !isEmbedded(origin0)) return; - const origin = origin0.embeddedValue; - - const detail0 = Q.drop_lit(detailPatValue, S.toSturdyPathStepDetail); - if (!detail0) return; - const parameters: S.Parameters = detail0; - - (((currentSyndicateTarget: __SYNDICATE__.Ref) => { - __SYNDICATE__.Turn.active.assertDataflow(() => ({ target: currentSyndicateTarget, assertion: G.Resolve({ - "step": G.Step({ - "stepType": S.$ref, - "detail": fromJS(parameters), - }), - "observer": __SYNDICATE__.Turn.ref(assertionFacetObserver(e => { - const response = G.toResolved(e); - if (!response) return; - (((currentSyndicateTarget: __SYNDICATE__.Ref) => { - __SYNDICATE__.Turn.active.assertDataflow(() => ({ target: currentSyndicateTarget, assertion: G.ResolvedPathStep({ - "origin": origin, - "pathStep": G.PathStep({ - "stepType": S.$ref, - "detail": fromJS(parameters), - }), - "resolved": response, - }) })); - })(ds)); - })), - }) })); - })(origin)); - - } - } - )) - }) })); - })(ds)); - }); -}