From 47da04267130eda300cb94bb1de6c8e7a57b5480 Mon Sep 17 00:00:00 2001 From: Emery Hemingway Date: Tue, 13 Dec 2022 22:57:17 -0600 Subject: [PATCH] Add connectNet Use Taps to connect to Syndicate peers over TCP. Taps is now a depedency. --- shell.nix | 7 ++ src/syndicate.nim | 11 +- src/syndicate/peers.nim | 91 +++++++++++++++++ src/syndicate/relays.nim | 215 ++++++++++++++++++++------------------- 4 files changed, 217 insertions(+), 107 deletions(-) create mode 100644 shell.nix create mode 100644 src/syndicate/peers.nim diff --git a/shell.nix b/shell.nix new file mode 100644 index 0000000..b786e41 --- /dev/null +++ b/shell.nix @@ -0,0 +1,7 @@ +{ pkgs ? (builtins.getFlake "github:nixos/nixpkgs/release-22.11").legacyPackages.x86_64-linux }: +with pkgs; + +mkShell { + packages = [ pkg-config getdns ]; + inputsFrom = [ nim-unwrapped ]; +} diff --git a/src/syndicate.nim b/src/syndicate.nim index d19f858..0760a22 100644 --- a/src/syndicate.nim +++ b/src/syndicate.nim @@ -29,11 +29,16 @@ import preserves import ./syndicate/[actors, dataspaces, durings, patterns] import ./syndicate/protocols/dataspace -from ./syndicate/relays import connectStdio, connectUnix +when defined(posix): + from ./syndicate/relays import connectStdio, connectUnix, SturdyRef + export connectStdio, connectUnix +else: + from ./syndicate/relays import SturdyRef +export SturdyRef export Actor, Assertion, Facet, Handle, Ref, Symbol, Turn, TurnAction, - `$`, `?`, addCallback, analyse, asyncCheck, bootDataspace, connectStdio, - connectUnix, drop, facet, future, grab, message, newDataspace, publish, + `$`, `?`, addCallback, analyse, asyncCheck, bootDataspace, + drop, facet, future, grab, message, newDataspace, publish, retract, replace, run, stop, unembed type diff --git a/src/syndicate/peers.nim b/src/syndicate/peers.nim new file mode 100644 index 0000000..1ff479c --- /dev/null +++ b/src/syndicate/peers.nim @@ -0,0 +1,91 @@ +# SPDX-FileCopyrightText: ☭ 2022 Emery Hemingway +# SPDX-License-Identifier: Unlicense + +## Module for peering with remote dataspaces over network. + +import std/[asyncdispatch, net, options, streams, tables] +import preserves +import ./actors, ./durings, ./relays, ./protocols/protocol + +import taps + +export `$` + +type + Turn = actors.Turn + Assertion = Preserve[Ref] + Value = Preserve[void] + +proc connectTcp(remote: RemoteSpecifier): Connection = + var transport = newTransportProperties() + transport.require("reliability") + transport.require("preserve-order") + var preConn = newPreConnection( + transport = some transport, + remote = some remote) + preconn.initiate() + +proc connectNet*(turn: var Turn; remote: RemoteSpecifier; cap: SturdyRef; bootProc: ConnectProc) = + let + facet = turn.facet + reenable = facet.preventInertCheck() + connectionClosedRef = newRef(turn, ShutdownEntity()) + conn = connectTcp(remote) + conn.onReady do: + discard bootActor("net") do (turn: var Turn): + var shutdownRef: Ref + proc tapsWriter(pkt: sink Packet): Future[void] = + let fut = newFuture[void]("tapsWriter") + send(conn, encode(pkt)) + onSent(conn) do (ctx: MessageContext): + complete(fut) + onSendError(conn) do (ctx: MessageContext; reason: ref Exception): + fail(fut, reason) + fut + var ops = RelayActorOptions( + packetWriter: tapsWriter, + initialOid: 0.Oid.some) + let relayFut = spawnRelay("net", turn, ops) do (turn: var Turn; relay: Relay): + let facet = turn.facet + facet.actor.atExit do (turn: var Turn): + close(conn) + conn.onConnectionError do (reason: ref Exception): + terminate(facet, reason) + conn.onReceiveError do (ctx: MessageContext; reason: ref Exception): + terminate(facet, reason) + conn.onClosed do: + run(facet) do (turn: var Turn): + stopActor(turn) + var wireBuf = newBufferedDecoder() + conn.onReceived do (buf: seq[byte]; ctx: MessageContext): + feed(wireBuf, buf) + var (success, pr) = decode(wireBuf) + if success: dispatch(relay, pr) + receive(conn) + receive(conn) + discard publish(turn, connectionClosedRef, true) + shutdownRef = newRef(turn, ShutdownEntity()) + relayFut.addCallback do (refFut: Future[Ref]): + let gatekeeper = read refFut + run(gatekeeper.relay) do (turn: var Turn): + reenable() + discard publish(turn, shutdownRef, true) + proc duringCallback(turn: var Turn; a: Assertion; h: Handle): TurnAction = + let facet = facet(turn) do (turn: var Turn): + bootProc(turn, unembed a) + proc action(turn: var Turn) = + stop(turn, facet) + result = action + var res = Resolve( + sturdyref: cap, + observer: newRef(turn, during(duringCallback))) + discard publish(turn, gatekeeper, res) + +proc connectNet*(turn: var Turn; host: string; port: Port; cap: SturdyRef; bootProc: ConnectProc) = + var remote = newRemoteEndpoint() + remote.with(port) + if isIpAddress host: + remote.with(parseIpAddress(host)) + else: + remote.withHostname(host) + connectNet(turn, remote, cap, bootProc) diff --git a/src/syndicate/relays.nim b/src/syndicate/relays.nim index 03f5430..af6afee 100644 --- a/src/syndicate/relays.nim +++ b/src/syndicate/relays.nim @@ -6,10 +6,15 @@ import preserves import ./actors, ./durings, ./membranes, ./protocols/[protocol, sturdy] when defined(traceSyndicate): - template trace(args: varargs[untyped]): untyped = stderr.writeLine(args) + when defined(posix): + template trace(args: varargs[untyped]): untyped = stderr.writeLine(args) + else: + template trace(args: varargs[untyped]): untyped = echo(args) else: template trace(args: varargs[untyped]): untyped = discard +export `$` + type Oid = sturdy.Oid type @@ -23,7 +28,7 @@ type PacketWriter = proc (pkt: sink Packet): Future[void] {.gcsafe.} RelaySetup = proc (turn: var Turn; relay: Relay) {.gcsafe.} - Relay = ref object of RootObj + Relay* = ref object of RootObj facet: Facet inboundAssertions: Table[Handle, tuple[localHandle: Handle, imported: seq[WireSymbol]]] @@ -187,7 +192,7 @@ proc rewriteIn(relay; facet; v: Value): proc close(r: Relay) = discard -proc dispatch(relay: Relay; turn: var Turn; `ref`: Ref; event: Event) = +proc dispatch*(relay: Relay; turn: var Turn; `ref`: Ref; event: Event) = case event.orKind of EventKind.Assert: let (a, imported) = rewriteIn(relay, turn.facet, event.assert.assertion) @@ -215,7 +220,7 @@ proc dispatch(relay: Relay; turn: var Turn; `ref`: Ref; event: Event) = for e in imported: relay.imported.del e ]# -proc dispatch(relay: Relay; v: Value) = +proc dispatch*(relay: Relay; v: Value) = trace "S: ", v run(relay.facet) do (t: var Turn): var pkt: Packet @@ -235,13 +240,13 @@ proc dispatch(relay: Relay; v: Value) = stderr.writeLine("discarding undecoded packet ", v) type - RelayOptions = object of RootObj - packetWriter: PacketWriter - untrusted: bool - RelayActorOptions = object of RelayOptions - initialOid: Option[Oid] - initialRef: Ref - nextLocalOid: Option[Oid] + RelayOptions* = object of RootObj + packetWriter*: PacketWriter + untrusted*: bool + RelayActorOptions* = object of RelayOptions + initialOid*: Option[Oid] + initialRef*: Ref + nextLocalOid*: Option[Oid] proc newRelay(turn: var Turn; opts: RelayOptions; setup: RelaySetup): Relay = result = Relay( @@ -251,7 +256,7 @@ proc newRelay(turn: var Turn; opts: RelayOptions; setup: RelaySetup): Relay = discard result.facet.preventInertCheck() setup(turn, result) -proc spawnRelay(name: string; turn: var Turn; opts: RelayActorOptions; setup: RelaySetup): Future[Ref] = +proc spawnRelay*(name: string; turn: var Turn; opts: RelayActorOptions; setup: RelaySetup): Future[Ref] = var fut = newFuture[Ref]"spawnRelay" spawn(name, turn) do (turn: var Turn): let relay = newRelay(turn, opts, setup) @@ -272,110 +277,112 @@ proc spawnRelay(name: string; turn: var Turn; opts: RelayActorOptions; setup: Re else: oid fut -import std/asyncnet -from std/nativesockets import AF_UNIX, SOCK_STREAM, Protocol +when defined(posix): + import std/asyncnet + from std/nativesockets import AF_UNIX, SOCK_STREAM, Protocol import protocols/gatekeeper -type ShutdownEntity = ref object of Entity +type ShutdownEntity* = ref object of Entity method retract(e: ShutdownEntity; turn: var Turn; h: Handle) = stopActor(turn) type - SturdyRef = sturdy.SturdyRef[Ref] - Resolve = gatekeeper.Resolve[Ref] + SturdyRef* = sturdy.SturdyRef[Ref] + Resolve* = gatekeeper.Resolve[Ref] ConnectProc* = proc (turn: var Turn; ds: Ref) {.gcsafe.} -proc connectUnix*(turn: var Turn; path: string; cap: SturdyRef; bootProc: ConnectProc) = - var socket = newAsyncSocket( - domain = AF_UNIX, - sockType = SOCK_STREAM, - protocol = cast[Protocol](0), - buffered = false) - proc socketWriter(packet: sink Packet): Future[void] = - socket.send(cast[string](encode(packet))) - const recvSize = 0x2000 - var shutdownRef: Ref - let reenable = turn.facet.preventInertCheck() - let connectionClosedRef = newRef(turn, ShutdownEntity()) - var fut = newFuture[void]"connectUnix" - connectUnix(socket, path).addCallback do (f: Future[void]): - read f - discard bootActor("unix") do (turn: var Turn): - var ops = RelayActorOptions( - packetWriter: socketWriter, - initialOid: 0.Oid.some) - let relayFut = spawnRelay("unix", turn, ops) do (turn: var Turn; relay: Relay): - let facet = turn.facet - var wireBuf = newBufferedDecoder() - proc recvCb(pktFut: Future[string]) {.gcsafe.} = - if pktFut.failed: - run(facet) do (turn: var Turn): stopActor(turn) - else: - var buf = pktFut.read - if buf.len == 0: +when defined(posix): + proc connectUnix*(turn: var Turn; path: string; cap: SturdyRef; bootProc: ConnectProc) = + var socket = newAsyncSocket( + domain = AF_UNIX, + sockType = SOCK_STREAM, + protocol = cast[Protocol](0), + buffered = false) + proc socketWriter(packet: sink Packet): Future[void] = + socket.send(cast[string](encode(packet))) + const recvSize = 0x2000 + var shutdownRef: Ref + let reenable = turn.facet.preventInertCheck() + let connectionClosedRef = newRef(turn, ShutdownEntity()) + var fut = newFuture[void]"connectUnix" + connectUnix(socket, path).addCallback do (f: Future[void]): + read f + discard bootActor("unix") do (turn: var Turn): + var ops = RelayActorOptions( + packetWriter: socketWriter, + initialOid: 0.Oid.some) + let relayFut = spawnRelay("unix", turn, ops) do (turn: var Turn; relay: Relay): + let facet = turn.facet + var wireBuf = newBufferedDecoder() + proc recvCb(pktFut: Future[string]) {.gcsafe.} = + if pktFut.failed: run(facet) do (turn: var Turn): stopActor(turn) else: - feed(wireBuf, buf) - var (success, pr) = decode(wireBuf) - if success: - dispatch(relay, pr) - callSoon: - socket.recv(recvSize).addCallback(recvCb) - socket.recv(recvSize).addCallback(recvCb) - turn.facet.actor.atExit do (turn: var Turn): close(socket) - discard publish(turn, connectionClosedRef, true) - shutdownRef = newRef(turn, ShutdownEntity()) - relayFut.addCallback do (refFut: Future[Ref]): - let gatekeeper = read refFut - run(gatekeeper.relay) do (turn: var Turn): - reenable() - discard publish(turn, shutdownRef, true) - proc duringCallback(turn: var Turn; a: Assertion; h: Handle): TurnAction = - let facet = facet(turn) do (turn: var Turn): - bootProc(turn, unembed a) - proc action(turn: var Turn) = - stop(turn, facet) - result = action - var res = Resolve( - sturdyref: cap, - observer: newRef(turn, during(duringCallback))) - discard publish(turn, gatekeeper, res) - fut.complete() - asyncCheck(turn, fut) + var buf = pktFut.read + if buf.len == 0: + run(facet) do (turn: var Turn): stopActor(turn) + else: + feed(wireBuf, buf) + var (success, pr) = decode(wireBuf) + if success: + dispatch(relay, pr) + callSoon: + socket.recv(recvSize).addCallback(recvCb) + socket.recv(recvSize).addCallback(recvCb) + turn.facet.actor.atExit do (turn: var Turn): close(socket) + discard publish(turn, connectionClosedRef, true) + shutdownRef = newRef(turn, ShutdownEntity()) + relayFut.addCallback do (refFut: Future[Ref]): + let gatekeeper = read refFut + run(gatekeeper.relay) do (turn: var Turn): + reenable() + discard publish(turn, shutdownRef, true) + proc duringCallback(turn: var Turn; a: Assertion; h: Handle): TurnAction = + let facet = facet(turn) do (turn: var Turn): + bootProc(turn, unembed a) + proc action(turn: var Turn) = + stop(turn, facet) + result = action + var res = Resolve( + sturdyref: cap, + observer: newRef(turn, during(duringCallback))) + discard publish(turn, gatekeeper, res) + fut.complete() + asyncCheck(turn, fut) -import std/asyncfile + import std/asyncfile -const stdinReadSize = 128 + const stdinReadSize = 128 -proc connectStdio*(ds: Ref; turn: var Turn) = - ## Connect to an external dataspace over stdin and stdout. - proc stdoutWriter(packet: sink Packet): Future[void] {.async.} = - var buf = encode(packet) - doAssert writeBytes(stdout, buf, 0, buf.len) == buf.len - flushFile(stdout) - var opts = RelayActorOptions( - packetWriter: stdoutWriter, - initialRef: ds, - initialOid: 0.Oid.some) - asyncCheck spawnRelay("stdio", turn, opts) do (turn: var Turn; relay: Relay): - let - facet = turn.facet - asyncStdin = openAsync("/dev/stdin") - facet.actor.atExit do (turn: var Turn): - close(asyncStdin) - var wireBuf = newBufferedDecoder() - proc recvCb(pktFut: Future[string]) {.gcsafe.} = - if not pktFut.failed: - var buf = pktFut.read - if buf.len == 0: - run(facet) do (turn: var Turn): stopActor(turn) - else: - feed(wireBuf, buf) - var (success, pr) = decode(wireBuf) - if success: - dispatch(relay, pr) - callSoon: asyncStdin.read(stdinReadSize).addCallback(recvCb) - # do not process the next line immediately - asyncStdin.read(stdinReadSize).addCallback(recvCb) + proc connectStdio*(ds: Ref; turn: var Turn) = + ## Connect to an external dataspace over stdin and stdout. + proc stdoutWriter(packet: sink Packet): Future[void] {.async.} = + var buf = encode(packet) + doAssert writeBytes(stdout, buf, 0, buf.len) == buf.len + flushFile(stdout) + var opts = RelayActorOptions( + packetWriter: stdoutWriter, + initialRef: ds, + initialOid: 0.Oid.some) + asyncCheck spawnRelay("stdio", turn, opts) do (turn: var Turn; relay: Relay): + let + facet = turn.facet + asyncStdin = openAsync("/dev/stdin") + facet.actor.atExit do (turn: var Turn): + close(asyncStdin) + var wireBuf = newBufferedDecoder() + proc recvCb(pktFut: Future[string]) {.gcsafe.} = + if not pktFut.failed: + var buf = pktFut.read + if buf.len == 0: + run(facet) do (turn: var Turn): stopActor(turn) + else: + feed(wireBuf, buf) + var (success, pr) = decode(wireBuf) + if success: + dispatch(relay, pr) + callSoon: asyncStdin.read(stdinReadSize).addCallback(recvCb) + # do not process the next line immediately + asyncStdin.read(stdinReadSize).addCallback(recvCb)