diff --git a/src/syndicate/relays.nim b/src/syndicate/relays.nim index e1a2311..ad9b852 100644 --- a/src/syndicate/relays.nim +++ b/src/syndicate/relays.nim @@ -27,11 +27,10 @@ type Turn = syndicate.Turn Handle = actors.Handle -type - PacketHandler = proc (buf: seq[byte]) {.gcsafe.} - RelaySetup = proc (turn: var Turn; relay: Relay) {.gcsafe.} + PacketWriter = proc (turn: var Turn; buf: seq[byte]) {.closure, gcsafe.} + RelaySetup = proc (turn: var Turn; relay: Relay) {.closure, gcsafe.} - Relay* = ref object of RootObj + Relay* = ref object facet: Facet inboundAssertions: Table[Handle, tuple[localHandle: Handle, imported: seq[WireSymbol]]] @@ -40,9 +39,9 @@ type imported: Membrane nextLocalOid: Oid pendingTurn: protocol.Turn - packetSender: PacketHandler wireBuf: BufferedDecoder - untrusted: bool + packetWriter: PacketWriter + peer: Cap SyncPeerEntity = ref object of Entity relay: Relay @@ -108,19 +107,18 @@ proc deregister(relay: Relay; h: Handle) = if relay.outboundAssertions.pop(h, outbound): for e in outbound: releaseCapOut(relay, e) -proc send(r: Relay; turn: var Turn; rOid: protocol.Oid; m: Event) = - if r.pendingTurn.len == 0: +proc send(relay: Relay; turn: var Turn; rOid: protocol.Oid; m: Event) = + if relay.pendingTurn.len == 0: # If the pending queue is empty then schedule a packet # to be sent after pending I/O is processed. callSoon do (): - r.facet.run do (turn: var Turn): + relay.facet.run do (turn: var Turn): var pkt = Packet( orKind: PacketKind.Turn, - turn: move r.pendingTurn) + turn: move relay.pendingTurn) trace "C: ", pkt - assert(not r.packetSender.isNil, "missing packetSender proc") - r.packetSender(encode pkt) - r.pendingTurn.add TurnEvent(oid: rOid, event: m) + relay.packetWriter(turn, encode pkt) + relay.pendingTurn.add TurnEvent(oid: rOid, event: m) proc send(re: RelayEntity; turn: var Turn; ev: Event) = send(re.relay, turn, protocol.Oid re.oid, ev) @@ -252,35 +250,28 @@ proc dispatch(relay: Relay; v: Value) {.gcsafe.} = when defined(posix): stderr.writeLine("discarding undecoded packet ", v) -proc dispatch(relay: Relay; buf: seq[byte]) = +proc recv(relay: Relay; buf: seq[byte]) = feed(relay.wireBuf, buf) var pr = decode(relay.wireBuf) - if pr.isSome: dispatch(relay, get pr) + if pr.isSome: dispatch(relay, pr.get) type RelayOptions* = object of RootObj - untrusted*: bool + packetWriter*: PacketWriter + RelayActorOptions* = object of RelayOptions initialOid*: Option[Oid] initialCap*: Cap nextLocalOid*: Option[Oid] -proc newRelay(turn: var Turn; opts: RelayOptions; setup: RelaySetup): Relay = - result = Relay( - facet: turn.facet, - wireBuf: newBufferedDecoder(0), - untrusted: opts.untrusted) - discard result.facet.preventInertCheck() - setup(turn, result) - -proc transportConnectionResolve(addrAss: Assertion; ds: Cap): gatekeeper.TransportConnection = - result.`addr` = addrAss - result.resolved = Resolved(orKind: ResolvedKind.accepted) - result.resolved.accepted.responderSession = ds - -proc spawnRelay*(name: string; turn: var Turn; ds: Cap; addrAss: Assertion; opts: RelayActorOptions; setup: RelaySetup) = - discard spawn(name, turn) do (turn: var Turn): - let relay = newRelay(turn, opts, setup) +proc spawnRelay(name: string; turn: var Turn; opts: RelayActorOptions; setup: RelaySetup) = + spawn(name, turn) do (turn: var Turn): + let relay = Relay( + facet: turn.facet, + packetWriter: opts.packetWriter, + wireBuf: newBufferedDecoder(0), + ) + discard relay.facet.preventInertCheck() if not opts.initialCap.isNil: var exported: seq[WireSymbol] discard rewriteCapOut(relay, opts.initialCap, exported) @@ -288,132 +279,234 @@ proc spawnRelay*(name: string; turn: var Turn; ds: Cap; addrAss: Assertion; opts relay.nextLocalOid = if oid == 0.Oid: 1.Oid else: oid + assert opts.initialOid.isSome if opts.initialOid.isSome: var imported: seq[WireSymbol] wr = WireRef( orKind: WireRefKind.mine, mine: WireRefMine(oid: opts.initialOid.get)) - res = rewriteCapIn(relay, turn.facet, wr, imported) - discard publish(turn, ds, transportConnectionResolve(addrAss, res)) - else: - discard publish(turn, ds, transportConnectionResolve(addrAss, ds)) + relay.peer = rewriteCapIn(relay, turn.facet, wr, imported) + assert not relay.peer.isNil + setup(turn, relay) + +proc rejected(detail: Value): Resolved = + result = Resolved(orKind: ResolvedKind.Rejected) + result.rejected.detail = detail + +proc accepted(cap: Cap): Resolved = + result = Resolved(orKind: ResolvedKind.accepted) + result.accepted.responderSession = cap when defined(posix): - import std/asyncnet - from std/nativesockets import AF_INET, AF_UNIX, IPPROTO_TCP, SOCK_STREAM, Protocol -type ShutdownEntity* = ref object of Entity - -method retract(e: ShutdownEntity; turn: var Turn; h: Handle) = - stopActor(turn) - -type ConnectProc* = proc (turn: var Turn; ds: Cap) {.gcsafe.} - -export Tcp - -when defined(posix): import std/asyncfile export Unix - proc newStdioTunnel(facet: Facet; receiver: PacketHandler): PacketHandler = - let asyncStdin = openAsync("/dev/stdin") # this is universal now? - close(stdin) - const readSize = 0x2000 - proc readCb(fut: Future[string]) {.gcsafe.} = - if fut.failed: terminate(facet, fut.error) - else: - receiver(cast[seq[byte]](fut.read)) - asyncStdin.read(readSize).addCallback(readCb) - asyncStdin.read(readSize).addCallback(readCb) - proc sender(buf: seq[byte]) = - try: - if writeBytes(stdout, buf, 0, buf.len) != buf.len: - raise newException(IOError, "failed to write Preserves to stdout") - flushFile(stdout) - except CatchableError as err: - terminate(facet, err) - sender + type StdioControlEntity = ref object of Entity + stdin: AsyncFile + + method message(entity: StdioControlEntity; turn: var Turn; ass: AssertionRef) = + if ass.value.preservesTo(ForceDisconnect).isSome: + close(entity.stdin) + close(stdout) + + proc connectTransport(turn: var Turn; ds: Cap; ta: transportAddress.Stdio) = + ## Connect to an external dataspace over stdio. + proc stdoutWriter(turn: var Turn; buf: seq[byte]) = + ## Blocking write to stdout. + let n = writeBytes(stdout, buf, 0, buf.len) + flushFile(stdout) + if n != buf.len: + stopActor(turn) + var opts = RelayActorOptions( + packetWriter: stdoutWriter, + initialCap: ds, + initialOid: 0.Oid.some, + ) + spawnRelay("stdio", turn, opts) do (turn: var Turn; relay: Relay): + let + facet = turn.facet + asyncStdin = openAsync("/dev/stdin") # this is universal now? + publish(turn, ds, TransportConnection( + `addr`: ta.toPreserves, + control: StdioControlEntity(stdin: asyncStdin).newCap(turn), + resolved: relay.peer.accepted, + )) + const stdinReadSize = 0x2000 + proc readCb(pktFut: Future[string]) {.gcsafe.} = + if not pktFut.failed: + var buf = pktFut.read + if buf.len == 0: + run(facet) do (turn: var Turn): stopActor(turn) + else: + relay.recv(cast[seq[byte]](buf)) + asyncStdin.read(stdinReadSize).addCallback(readCb) + asyncStdin.read(stdinReadSize).addCallback(readCb) proc connectStdio*(turn: var Turn; ds: Cap) = ## Connect to an external dataspace over stdin and stdout. - var opts = RelayActorOptions( - initialCap: ds, - initialOid: 0.Oid.some) - spawnRelay("stdio", turn, ds, Stdio().toPreserves, opts) do (turn: var Turn; relay: Relay): - proc receiver(buf: seq[byte]) = dispatch(relay, buf) - relay.packetSender = newStdioTunnel(turn.facet, receiver) + connectTransport(turn, ds, transportAddress.Stdio()) - proc newTunnel(facet: Facet; receiver: PacketHandler; socket: AsyncSocket): PacketHandler = - const recvSize = 0x2000 - proc recvCb(fut: Future[string]) {.gcsafe.} = - if fut.failed: terminate(facet, fut.error) - else: - receiver(cast[seq[byte]](fut.read)) - if not socket.isClosed: - socket.recv(recvSize).addCallback(recvCb) - socket.recv(recvSize).addCallback(recvCb) - proc sender(buf: seq[byte]) = - asyncCheck(facet, socket.send(cast[string](buf))) - sender + import std/asyncnet + from std/nativesockets import AF_INET, AF_UNIX, IPPROTO_TCP, SOCK_STREAM, Protocol - proc connect*(turn: var Turn; ds: Cap; route: Route; addrAss: Assertion; socket: AsyncSocket; steps: seq[Value]) = - ## Relay a dataspace over an open `AsyncSocket`. - var shutdownCap: Cap - let - reenable = turn.facet.preventInertCheck() - connectionClosedCap = newCap(turn, ShutdownEntity()) - discard bootActor("socket") do (turn: var Turn): - var ops = RelayActorOptions( - initialOid: 0.Oid.some) - spawnRelay("socket", turn, ds, addrAss, ops) do (turn: var Turn; relay: Relay): - let facet = turn.facet - proc receiver(buf: seq[byte]) = dispatch(relay, buf) - relay.packetSender = newTunnel(turn.facet, receiver, socket) - turn.facet.actor.atExit do (turn: var Turn): close(socket) - discard publish(turn, connectionClosedCap, true) - shutdownCap = newCap(turn, ShutdownEntity()) - onPublish(turn, ds, TransportConnection ?: {0: ?addrAss, 2: ?:Rejected}) do (detail: Value): - raise newException(IOError, $detail) - onPublish(turn, ds, TransportConnection ?: {0: ?addrAss, 2: ?:ResolvedAccepted}) do (gatekeeper: Cap): - run(gatekeeper.relay) do (turn: var Turn): - reenable() - discard publish(turn, shutdownCap, true) - proc duringCallback(turn: var Turn; ass: Assertion; h: Handle): TurnAction = - let facet = inFacet(turn) do (turn: var Turn): - let o = ass.preservesTo Resolved; if o.isSome: - discard publish(turn, ds, ResolvePath( - route: route, `addr`: addrAss, resolved: o.get)) - proc action(turn: var Turn) = - stop(turn, facet) - result = action - var resolve = Resolve( - step: steps[0], - observer: newCap(turn, during(duringCallback)), + type SocketControlEntity = ref object of Entity + socket: AsyncSocket + + method message(entity: SocketControlEntity; turn: var Turn; ass: AssertionRef) = + if ass.value.preservesTo(ForceDisconnect).isSome: + close(entity.socket) + + type ShutdownEntity* = ref object of Entity + method retract(e: ShutdownEntity; turn: var Turn; h: Handle) = + stopActor(turn) + + proc connect(turn: var Turn; ds: Cap; transAddr: Value; socket: AsyncSocket) = + proc socketWriter(turn: var Turn; buf: seq[byte]) = + asyncCheck(turn, socket.send(cast[string](buf))) + var ops = RelayActorOptions( + packetWriter: socketWriter, + initialOid: 0.Oid.some, + ) + spawnRelay("socket", turn, ops) do (turn: var Turn; relay: Relay): + let facet = turn.facet + facet.actor.atExit do (turn: var Turn): close(socket) + publish(turn, ds, TransportConnection( + `addr`: transAddr, + control: SocketControlEntity(socket: socket).newCap(turn), + resolved: relay.peer.accepted, + )) + const recvSize = 0x4000 + proc recvCb(pktFut: Future[string]) {.gcsafe.} = + if pktFut.failed or pktFut.read.len == 0: + run(facet) do (turn: var Turn): stopActor(turn) + else: + relay.recv(cast[seq[byte]](pktFut.read)) + if not socket.isClosed: + socket.recv(recvSize).addCallback(recvCb) + socket.recv(recvSize).addCallback(recvCb) + + proc connect(turn: var Turn; ds: Cap; ta: Value; socket: AsyncSocket; fut: Future[void]) = + let facet = turn.facet + fut.addCallback do (): + run(facet) do (turn: var Turn): + if fut.failed: + var ass = TransportConnection( + `addr`: ta, + resolved: Resolved(orKind: ResolvedKind.Rejected), ) - discard publish(turn, gatekeeper, resolve) + ass.resolved.rejected.detail = embed fut.error + publish(turn, ds, ass) + else: + connect(turn, ds, ta, socket) - proc connect*(turn: var Turn; ds: Cap; route: Route; transport: Tcp; steps: seq[Value]) = - ## Relay a dataspace over TCP. - let socket = newAsyncSocket( - domain = AF_INET, - sockType = SOCK_STREAM, - protocol = IPPROTO_TCP, - buffered = false) - let fut = connect(socket, transport.host, Port transport.port) - addCallback(fut, turn) do (turn: var Turn): - connect(turn, ds, route, transport.toPreserves, socket, steps) + proc connectTransport(turn: var Turn; ds: Cap; ta: transportAddress.Tcp) = + let + facet = turn.facet + socket = newAsyncSocket( + domain = AF_INET, + sockType = SOCK_STREAM, + protocol = IPPROTO_TCP, + buffered = false, + ) + connect(turn, ds, ta.toPreserves, socket, connect(socket, ta.host, Port ta.port)) - proc connect*(turn: var Turn; ds: Cap; route: Route; transport: Unix; steps: seq[Value]) = + proc connectTransport(turn: var Turn; ds: Cap; ta: transportAddress.Unix) = ## Relay a dataspace over a UNIX socket. let socket = newAsyncSocket( domain = AF_UNIX, sockType = SOCK_STREAM, protocol = cast[Protocol](0), buffered = false) - let fut = connectUnix(socket, transport.path) - addCallback(fut, turn) do (turn: var Turn): - connect(turn, ds, route, transport.toPreserves, socket, steps) + connect(turn, ds, ta.toPreserves, socket, connectUnix(socket, ta.path)) + +proc walk(turn: var Turn; ds, origin: Cap; route: Route; transOff, stepOff: int) {.gcsafe.} = + if stepOff < route.pathSteps.len: + let + step = route.pathSteps[stepOff] + rejectPat = ResolvedPathStep?:{ + 0: ?(origin.embed), 1: ?step, 2: ?:Rejected} + acceptPat = ResolvedPathStep?:{ + 0: ?(origin.embed), 1: ?step, 2: ?:ResolvedAccepted} + onPublish(turn, ds, rejectPat) do (detail: Value): + publish(turn, ds, ResolvePath( + route: route, + `addr`: route.transports[transOff], + resolved: detail.rejected, + )) + during(turn, ds, acceptPat) do (next: Cap): + walk(turn, ds, next, route, transOff, stepOff.succ) + else: + publish(turn, ds, ResolvePath( + route: route, + `addr`: route.transports[transOff], + resolved: origin.accepted, + )) + +proc connectRoute(turn: var Turn; ds: Cap; route: Route; transOff: int) = + let rejectPat = TransportConnection ?: { + 0: ?route.transports[transOff], + 2: ?:Rejected, + } + during(turn, ds, rejectPat) do (detail: Value): + publish(turn, ds, ResolvePath( + route: route, + `addr`: route.transports[transOff], + resolved: detail.rejected, + )) + let acceptPat = TransportConnection?:{ + 0: ?route.transports[transOff], + 2: ?:ResolvedAccepted, + } + onPublish(turn, ds, acceptPat) do (origin: Cap): + walk(turn, ds, origin, route, transOff, 0) + +proc spawnRelays*(turn: var Turn; ds: Cap) = + ## Spawn actors that manage routes and appeasing gatekeepers. + spawn("transport-connector", turn) do (turn: var Turn): + let pat = ?Observe(pattern: !TransportConnection) ?? { 0: grab() } + # Use a generic pattern and type matching + # in the during handler because it is easy. + + let stdioPat = ?Observe(pattern: TransportConnection?:{0: ?:Stdio}) + during(turn, ds, stdioPat) do: + connectTransport(turn, ds, Stdio()) + + # TODO: tcp pattern + during(turn, ds, pat) do (ta: Literal[transportAddress.Tcp]): + connectTransport(turn, ds, ta.value) + + # TODO: unix pattern + during(turn, ds, pat) do (ta: Literal[transportAddress.Unix]): + connectTransport(turn, ds, ta.value) + + spawn("path-resolver", turn) do (turn: var Turn): + let pat = ?Observe(pattern: !ResolvePath) ?? {0: grab()} + during(turn, ds, pat) do (route: Literal[Route]): + for i, transAddr in route.value.transports: + connectRoute(turn, ds, route.value, i) + + spawn("sturdyref-step", turn) do (turn: var Turn): + let pat = ?Observe(pattern: ResolvedPathStep?:{1: !SturdyRef}) ?? {0: grab(), 1: grab()} + during(turn, ds, pat) do (origin: Literal[Cap]; detail: Literal[sturdy.Parameters]): + let step = SturdyRef(parameters: detail.value).toPreserves + proc duringCallback(turn: var Turn; ass: Assertion; h: Handle): TurnAction = + let facet = inFacet(turn) do (turn: var Turn): + var res = ass.preservesTo Resolved + if res.isSome: + publish(turn, ds, ResolvedPathStep( + origin: origin.value, + pathStep: step, + resolved: res.get, + )) + proc action(turn: var Turn) = + stop(turn, facet) + result = action + publish(turn, origin.value, Resolve( + step: step, + observer: newCap(turn, during(duringCallback)), + )) type BootProc* = proc (turn: var Turn; ds: Cap) {.gcsafe.} @@ -429,21 +522,7 @@ proc envRoute*: Route = raise newException(ValueError, "failed to parse $SYNDICATE_ROUTE " & $pr) proc resolve*(turn: var Turn; ds: Cap; route: Route; bootProc: BootProc) = - var - unix: Unix - tcp: Tcp - stdio: Stdio - doAssert(route.transports.len == 1, "only a single transport supported for routes") - if unix.fromPreserves route.transports[0]: - connect(turn, ds, route, unix, route.pathSteps) - elif tcp.fromPreserves route.transports[0]: - connect(turn, ds, route, tcp, route.pathSteps) - elif stdio.fromPreserves route.transports[0]: - doAssert(route.pathSteps.len == 0, "route steps not available over stdio") - connectStdio(turn, ds) - bootProc(turn, ds) - else: - raise newException(ValueError, "unsupported route") + during(turn, ds, ResolvePath ?: {0: ?route, 3: ?:ResolvedAccepted}) do (dst: Cap): + bootProc(turn, dst) - during(turn, ds, ResolvePath ?: { 0: ?route, 3: ?:ResolvedAccepted}) do (dest: Cap): - bootProc(turn, dest) +# TODO: define a runActor that comes preloaded with relaying diff --git a/syndicate.nimble b/syndicate.nimble index 7be17e8..6b48412 100644 --- a/syndicate.nimble +++ b/syndicate.nimble @@ -1,6 +1,6 @@ # Package -version = "20240119" +version = "20240120" author = "Emery Hemingway" description = "Syndicated actors for conversational concurrency" license = "Unlicense" diff --git a/tests/test_chat.nim b/tests/test_chat.nim index cf2d94b..24c1b4a 100644 --- a/tests/test_chat.nim +++ b/tests/test_chat.nim @@ -1,7 +1,7 @@ # SPDX-FileCopyrightText: ☭ Emery Hemingway # SPDX-License-Identifier: Unlicense -import std/[asyncdispatch, asyncfile, os, parseopt] +import std/[asyncdispatch, asyncfile, parseopt] import preserves, syndicate, syndicate/relays type @@ -49,6 +49,7 @@ proc main = stderr.writeLine "--user: unspecified" else: runActor("chat") do (turn: var Turn; root: Cap): + spawnRelays(turn, root) resolve(turn, root, route) do (turn: var Turn; ds: Cap): chat(turn, ds, username)