relays: split connecting and resolving
This commit is contained in:
parent
686719273c
commit
2e027afc42
|
@ -27,11 +27,10 @@ type
|
|||
Turn = syndicate.Turn
|
||||
Handle = actors.Handle
|
||||
|
||||
type
|
||||
PacketHandler = proc (buf: seq[byte]) {.gcsafe.}
|
||||
RelaySetup = proc (turn: var Turn; relay: Relay) {.gcsafe.}
|
||||
PacketWriter = proc (turn: var Turn; buf: seq[byte]) {.closure, gcsafe.}
|
||||
RelaySetup = proc (turn: var Turn; relay: Relay) {.closure, gcsafe.}
|
||||
|
||||
Relay* = ref object of RootObj
|
||||
Relay* = ref object
|
||||
facet: Facet
|
||||
inboundAssertions: Table[Handle,
|
||||
tuple[localHandle: Handle, imported: seq[WireSymbol]]]
|
||||
|
@ -40,9 +39,9 @@ type
|
|||
imported: Membrane
|
||||
nextLocalOid: Oid
|
||||
pendingTurn: protocol.Turn
|
||||
packetSender: PacketHandler
|
||||
wireBuf: BufferedDecoder
|
||||
untrusted: bool
|
||||
packetWriter: PacketWriter
|
||||
peer: Cap
|
||||
|
||||
SyncPeerEntity = ref object of Entity
|
||||
relay: Relay
|
||||
|
@ -108,19 +107,18 @@ proc deregister(relay: Relay; h: Handle) =
|
|||
if relay.outboundAssertions.pop(h, outbound):
|
||||
for e in outbound: releaseCapOut(relay, e)
|
||||
|
||||
proc send(r: Relay; turn: var Turn; rOid: protocol.Oid; m: Event) =
|
||||
if r.pendingTurn.len == 0:
|
||||
proc send(relay: Relay; turn: var Turn; rOid: protocol.Oid; m: Event) =
|
||||
if relay.pendingTurn.len == 0:
|
||||
# If the pending queue is empty then schedule a packet
|
||||
# to be sent after pending I/O is processed.
|
||||
callSoon do ():
|
||||
r.facet.run do (turn: var Turn):
|
||||
relay.facet.run do (turn: var Turn):
|
||||
var pkt = Packet(
|
||||
orKind: PacketKind.Turn,
|
||||
turn: move r.pendingTurn)
|
||||
turn: move relay.pendingTurn)
|
||||
trace "C: ", pkt
|
||||
assert(not r.packetSender.isNil, "missing packetSender proc")
|
||||
r.packetSender(encode pkt)
|
||||
r.pendingTurn.add TurnEvent(oid: rOid, event: m)
|
||||
relay.packetWriter(turn, encode pkt)
|
||||
relay.pendingTurn.add TurnEvent(oid: rOid, event: m)
|
||||
|
||||
proc send(re: RelayEntity; turn: var Turn; ev: Event) =
|
||||
send(re.relay, turn, protocol.Oid re.oid, ev)
|
||||
|
@ -252,35 +250,28 @@ proc dispatch(relay: Relay; v: Value) {.gcsafe.} =
|
|||
when defined(posix):
|
||||
stderr.writeLine("discarding undecoded packet ", v)
|
||||
|
||||
proc dispatch(relay: Relay; buf: seq[byte]) =
|
||||
proc recv(relay: Relay; buf: seq[byte]) =
|
||||
feed(relay.wireBuf, buf)
|
||||
var pr = decode(relay.wireBuf)
|
||||
if pr.isSome: dispatch(relay, get pr)
|
||||
if pr.isSome: dispatch(relay, pr.get)
|
||||
|
||||
type
|
||||
RelayOptions* = object of RootObj
|
||||
untrusted*: bool
|
||||
packetWriter*: PacketWriter
|
||||
|
||||
RelayActorOptions* = object of RelayOptions
|
||||
initialOid*: Option[Oid]
|
||||
initialCap*: Cap
|
||||
nextLocalOid*: Option[Oid]
|
||||
|
||||
proc newRelay(turn: var Turn; opts: RelayOptions; setup: RelaySetup): Relay =
|
||||
result = Relay(
|
||||
facet: turn.facet,
|
||||
wireBuf: newBufferedDecoder(0),
|
||||
untrusted: opts.untrusted)
|
||||
discard result.facet.preventInertCheck()
|
||||
setup(turn, result)
|
||||
|
||||
proc transportConnectionResolve(addrAss: Assertion; ds: Cap): gatekeeper.TransportConnection =
|
||||
result.`addr` = addrAss
|
||||
result.resolved = Resolved(orKind: ResolvedKind.accepted)
|
||||
result.resolved.accepted.responderSession = ds
|
||||
|
||||
proc spawnRelay*(name: string; turn: var Turn; ds: Cap; addrAss: Assertion; opts: RelayActorOptions; setup: RelaySetup) =
|
||||
discard spawn(name, turn) do (turn: var Turn):
|
||||
let relay = newRelay(turn, opts, setup)
|
||||
proc spawnRelay(name: string; turn: var Turn; opts: RelayActorOptions; setup: RelaySetup) =
|
||||
spawn(name, turn) do (turn: var Turn):
|
||||
let relay = Relay(
|
||||
facet: turn.facet,
|
||||
packetWriter: opts.packetWriter,
|
||||
wireBuf: newBufferedDecoder(0),
|
||||
)
|
||||
discard relay.facet.preventInertCheck()
|
||||
if not opts.initialCap.isNil:
|
||||
var exported: seq[WireSymbol]
|
||||
discard rewriteCapOut(relay, opts.initialCap, exported)
|
||||
|
@ -288,132 +279,236 @@ proc spawnRelay*(name: string; turn: var Turn; ds: Cap; addrAss: Assertion; opts
|
|||
relay.nextLocalOid =
|
||||
if oid == 0.Oid: 1.Oid
|
||||
else: oid
|
||||
assert opts.initialOid.isSome
|
||||
if opts.initialOid.isSome:
|
||||
var
|
||||
imported: seq[WireSymbol]
|
||||
wr = WireRef(
|
||||
orKind: WireRefKind.mine,
|
||||
mine: WireRefMine(oid: opts.initialOid.get))
|
||||
res = rewriteCapIn(relay, turn.facet, wr, imported)
|
||||
discard publish(turn, ds, transportConnectionResolve(addrAss, res))
|
||||
relay.peer = rewriteCapIn(relay, turn.facet, wr, imported)
|
||||
assert not relay.peer.isNil
|
||||
setup(turn, relay)
|
||||
|
||||
proc rejected(detail: Value): Resolved =
|
||||
result = Resolved(orKind: ResolvedKind.Rejected)
|
||||
result.rejected.detail = detail
|
||||
|
||||
proc accepted(cap: Cap): Resolved =
|
||||
result = Resolved(orKind: ResolvedKind.accepted)
|
||||
result.accepted.responderSession = cap
|
||||
|
||||
else:
|
||||
discard publish(turn, ds, transportConnectionResolve(addrAss, ds))
|
||||
|
||||
when defined(posix):
|
||||
import std/asyncnet
|
||||
from std/nativesockets import AF_INET, AF_UNIX, IPPROTO_TCP, SOCK_STREAM, Protocol
|
||||
|
||||
type ShutdownEntity* = ref object of Entity
|
||||
|
||||
method retract(e: ShutdownEntity; turn: var Turn; h: Handle) =
|
||||
stopActor(turn)
|
||||
|
||||
type ConnectProc* = proc (turn: var Turn; ds: Cap) {.gcsafe.}
|
||||
|
||||
export Tcp
|
||||
|
||||
when defined(posix):
|
||||
import std/asyncfile
|
||||
export Unix
|
||||
|
||||
proc newStdioTunnel(facet: Facet; receiver: PacketHandler): PacketHandler =
|
||||
let asyncStdin = openAsync("/dev/stdin") # this is universal now?
|
||||
close(stdin)
|
||||
const readSize = 0x2000
|
||||
proc readCb(fut: Future[string]) {.gcsafe.} =
|
||||
if fut.failed: terminate(facet, fut.error)
|
||||
else:
|
||||
receiver(cast[seq[byte]](fut.read))
|
||||
asyncStdin.read(readSize).addCallback(readCb)
|
||||
asyncStdin.read(readSize).addCallback(readCb)
|
||||
proc sender(buf: seq[byte]) =
|
||||
try:
|
||||
if writeBytes(stdout, buf, 0, buf.len) != buf.len:
|
||||
raise newException(IOError, "failed to write Preserves to stdout")
|
||||
flushFile(stdout)
|
||||
except CatchableError as err:
|
||||
terminate(facet, err)
|
||||
sender
|
||||
type StdioControlEntity = ref object of Entity
|
||||
stdin: AsyncFile
|
||||
|
||||
method message(entity: StdioControlEntity; turn: var Turn; ass: AssertionRef) =
|
||||
if ass.value.preservesTo(ForceDisconnect).isSome:
|
||||
close(entity.stdin)
|
||||
close(stdout)
|
||||
|
||||
proc connectTransport(turn: var Turn; ds: Cap; ta: transportAddress.Stdio) =
|
||||
## Connect to an external dataspace over stdio.
|
||||
proc stdoutWriter(turn: var Turn; buf: seq[byte]) =
|
||||
## Blocking write to stdout.
|
||||
let n = writeBytes(stdout, buf, 0, buf.len)
|
||||
flushFile(stdout)
|
||||
if n != buf.len:
|
||||
stopActor(turn)
|
||||
var opts = RelayActorOptions(
|
||||
packetWriter: stdoutWriter,
|
||||
initialCap: ds,
|
||||
initialOid: 0.Oid.some,
|
||||
)
|
||||
spawnRelay("stdio", turn, opts) do (turn: var Turn; relay: Relay):
|
||||
let
|
||||
facet = turn.facet
|
||||
asyncStdin = openAsync("/dev/stdin") # this is universal now?
|
||||
publish(turn, ds, TransportConnection(
|
||||
`addr`: ta.toPreserves,
|
||||
control: StdioControlEntity(stdin: asyncStdin).newCap(turn),
|
||||
resolved: relay.peer.accepted,
|
||||
))
|
||||
const stdinReadSize = 0x2000
|
||||
proc readCb(pktFut: Future[string]) {.gcsafe.} =
|
||||
if not pktFut.failed:
|
||||
var buf = pktFut.read
|
||||
if buf.len == 0:
|
||||
run(facet) do (turn: var Turn): stopActor(turn)
|
||||
else:
|
||||
relay.recv(cast[seq[byte]](buf))
|
||||
asyncStdin.read(stdinReadSize).addCallback(readCb)
|
||||
asyncStdin.read(stdinReadSize).addCallback(readCb)
|
||||
|
||||
proc connectStdio*(turn: var Turn; ds: Cap) =
|
||||
## Connect to an external dataspace over stdin and stdout.
|
||||
var opts = RelayActorOptions(
|
||||
initialCap: ds,
|
||||
initialOid: 0.Oid.some)
|
||||
spawnRelay("stdio", turn, ds, Stdio().toPreserves, opts) do (turn: var Turn; relay: Relay):
|
||||
proc receiver(buf: seq[byte]) = dispatch(relay, buf)
|
||||
relay.packetSender = newStdioTunnel(turn.facet, receiver)
|
||||
connectTransport(turn, ds, transportAddress.Stdio())
|
||||
|
||||
proc newTunnel(facet: Facet; receiver: PacketHandler; socket: AsyncSocket): PacketHandler =
|
||||
const recvSize = 0x2000
|
||||
proc recvCb(fut: Future[string]) {.gcsafe.} =
|
||||
if fut.failed: terminate(facet, fut.error)
|
||||
else:
|
||||
receiver(cast[seq[byte]](fut.read))
|
||||
if not socket.isClosed:
|
||||
socket.recv(recvSize).addCallback(recvCb)
|
||||
socket.recv(recvSize).addCallback(recvCb)
|
||||
proc sender(buf: seq[byte]) =
|
||||
asyncCheck(facet, socket.send(cast[string](buf)))
|
||||
sender
|
||||
import std/asyncnet
|
||||
from std/nativesockets import AF_INET, AF_UNIX, IPPROTO_TCP, SOCK_STREAM, Protocol
|
||||
|
||||
proc connect*(turn: var Turn; ds: Cap; route: Route; addrAss: Assertion; socket: AsyncSocket; steps: seq[Value]) =
|
||||
## Relay a dataspace over an open `AsyncSocket`.
|
||||
var shutdownCap: Cap
|
||||
let
|
||||
reenable = turn.facet.preventInertCheck()
|
||||
connectionClosedCap = newCap(turn, ShutdownEntity())
|
||||
discard bootActor("socket") do (turn: var Turn):
|
||||
var ops = RelayActorOptions(
|
||||
initialOid: 0.Oid.some)
|
||||
spawnRelay("socket", turn, ds, addrAss, ops) do (turn: var Turn; relay: Relay):
|
||||
let facet = turn.facet
|
||||
proc receiver(buf: seq[byte]) = dispatch(relay, buf)
|
||||
relay.packetSender = newTunnel(turn.facet, receiver, socket)
|
||||
turn.facet.actor.atExit do (turn: var Turn): close(socket)
|
||||
discard publish(turn, connectionClosedCap, true)
|
||||
shutdownCap = newCap(turn, ShutdownEntity())
|
||||
onPublish(turn, ds, TransportConnection ?: {0: ?addrAss, 2: ?:Rejected}) do (detail: Value):
|
||||
raise newException(IOError, $detail)
|
||||
onPublish(turn, ds, TransportConnection ?: {0: ?addrAss, 2: ?:ResolvedAccepted}) do (gatekeeper: Cap):
|
||||
run(gatekeeper.relay) do (turn: var Turn):
|
||||
reenable()
|
||||
discard publish(turn, shutdownCap, true)
|
||||
proc duringCallback(turn: var Turn; ass: Assertion; h: Handle): TurnAction =
|
||||
let facet = inFacet(turn) do (turn: var Turn):
|
||||
let o = ass.preservesTo Resolved; if o.isSome:
|
||||
discard publish(turn, ds, ResolvePath(
|
||||
route: route, `addr`: addrAss, resolved: o.get))
|
||||
proc action(turn: var Turn) =
|
||||
stop(turn, facet)
|
||||
result = action
|
||||
var resolve = Resolve(
|
||||
step: steps[0],
|
||||
observer: newCap(turn, during(duringCallback)),
|
||||
type SocketControlEntity = ref object of Entity
|
||||
socket: AsyncSocket
|
||||
|
||||
method message(entity: SocketControlEntity; turn: var Turn; ass: AssertionRef) =
|
||||
if ass.value.preservesTo(ForceDisconnect).isSome:
|
||||
close(entity.socket)
|
||||
|
||||
type ShutdownEntity* = ref object of Entity
|
||||
method retract(e: ShutdownEntity; turn: var Turn; h: Handle) =
|
||||
stopActor(turn)
|
||||
|
||||
proc connect(turn: var Turn; ds: Cap; transAddr: Value; socket: AsyncSocket) =
|
||||
proc socketWriter(turn: var Turn; buf: seq[byte]) =
|
||||
asyncCheck(turn, socket.send(cast[string](buf)))
|
||||
var ops = RelayActorOptions(
|
||||
packetWriter: socketWriter,
|
||||
initialOid: 0.Oid.some,
|
||||
)
|
||||
spawnRelay("socket", turn, ops) do (turn: var Turn; relay: Relay):
|
||||
let facet = turn.facet
|
||||
facet.actor.atExit do (turn: var Turn): close(socket)
|
||||
publish(turn, ds, TransportConnection(
|
||||
`addr`: transAddr,
|
||||
control: SocketControlEntity(socket: socket).newCap(turn),
|
||||
resolved: relay.peer.accepted,
|
||||
))
|
||||
const recvSize = 0x4000
|
||||
proc recvCb(pktFut: Future[string]) {.gcsafe.} =
|
||||
if pktFut.failed or pktFut.read.len == 0:
|
||||
run(facet) do (turn: var Turn): stopActor(turn)
|
||||
else:
|
||||
relay.recv(cast[seq[byte]](pktFut.read))
|
||||
if not socket.isClosed:
|
||||
socket.recv(recvSize).addCallback(recvCb)
|
||||
socket.recv(recvSize).addCallback(recvCb)
|
||||
|
||||
proc connect(turn: var Turn; ds: Cap; ta: Value; socket: AsyncSocket; fut: Future[void]) =
|
||||
let facet = turn.facet
|
||||
fut.addCallback do ():
|
||||
run(facet) do (turn: var Turn):
|
||||
if fut.failed:
|
||||
var ass = TransportConnection(
|
||||
`addr`: ta,
|
||||
resolved: Resolved(orKind: ResolvedKind.Rejected),
|
||||
)
|
||||
discard publish(turn, gatekeeper, resolve)
|
||||
ass.resolved.rejected.detail = embed fut.error
|
||||
publish(turn, ds, ass)
|
||||
else:
|
||||
connect(turn, ds, ta, socket)
|
||||
|
||||
proc connect*(turn: var Turn; ds: Cap; route: Route; transport: Tcp; steps: seq[Value]) =
|
||||
## Relay a dataspace over TCP.
|
||||
let socket = newAsyncSocket(
|
||||
domain = AF_INET,
|
||||
sockType = SOCK_STREAM,
|
||||
protocol = IPPROTO_TCP,
|
||||
buffered = false)
|
||||
let fut = connect(socket, transport.host, Port transport.port)
|
||||
addCallback(fut, turn) do (turn: var Turn):
|
||||
connect(turn, ds, route, transport.toPreserves, socket, steps)
|
||||
proc connectTransport(turn: var Turn; ds: Cap; ta: transportAddress.Tcp) =
|
||||
let
|
||||
facet = turn.facet
|
||||
socket = newAsyncSocket(
|
||||
domain = AF_INET,
|
||||
sockType = SOCK_STREAM,
|
||||
protocol = IPPROTO_TCP,
|
||||
buffered = false,
|
||||
)
|
||||
connect(turn, ds, ta.toPreserves, socket, connect(socket, ta.host, Port ta.port))
|
||||
|
||||
proc connect*(turn: var Turn; ds: Cap; route: Route; transport: Unix; steps: seq[Value]) =
|
||||
proc connectTransport(turn: var Turn; ds: Cap; ta: transportAddress.Unix) =
|
||||
## Relay a dataspace over a UNIX socket.
|
||||
let socket = newAsyncSocket(
|
||||
domain = AF_UNIX,
|
||||
sockType = SOCK_STREAM,
|
||||
protocol = cast[Protocol](0),
|
||||
buffered = false)
|
||||
let fut = connectUnix(socket, transport.path)
|
||||
addCallback(fut, turn) do (turn: var Turn):
|
||||
connect(turn, ds, route, transport.toPreserves, socket, steps)
|
||||
connect(turn, ds, ta.toPreserves, socket, connectUnix(socket, ta.path))
|
||||
|
||||
proc walk(turn: var Turn; ds, origin: Cap; route: Route; transOff, stepOff: int) {.gcsafe.} =
|
||||
if stepOff < route.pathSteps.len:
|
||||
let
|
||||
step = route.pathSteps[stepOff]
|
||||
rejectPat = ResolvedPathStep?:{
|
||||
0: ?(origin.embed), 1: ?step, 2: ?:Rejected}
|
||||
acceptPat = ResolvedPathStep?:{
|
||||
0: ?(origin.embed), 1: ?step, 2: ?:ResolvedAccepted}
|
||||
onPublish(turn, ds, rejectPat) do (detail: Value):
|
||||
publish(turn, ds, ResolvePath(
|
||||
route: route,
|
||||
`addr`: route.transports[transOff],
|
||||
resolved: detail.rejected,
|
||||
))
|
||||
during(turn, ds, acceptPat) do (next: Cap):
|
||||
walk(turn, ds, next, route, transOff, stepOff.succ)
|
||||
else:
|
||||
publish(turn, ds, ResolvePath(
|
||||
route: route,
|
||||
`addr`: route.transports[transOff],
|
||||
resolved: origin.accepted,
|
||||
))
|
||||
|
||||
proc connectRoute(turn: var Turn; ds: Cap; route: Route; transOff: int) =
|
||||
let rejectPat = TransportConnection ?: {
|
||||
0: ?route.transports[transOff],
|
||||
2: ?:Rejected,
|
||||
}
|
||||
during(turn, ds, rejectPat) do (detail: Value):
|
||||
publish(turn, ds, ResolvePath(
|
||||
route: route,
|
||||
`addr`: route.transports[transOff],
|
||||
resolved: detail.rejected,
|
||||
))
|
||||
let acceptPat = TransportConnection?:{
|
||||
0: ?route.transports[transOff],
|
||||
2: ?:ResolvedAccepted,
|
||||
}
|
||||
onPublish(turn, ds, acceptPat) do (origin: Cap):
|
||||
walk(turn, ds, origin, route, transOff, 0)
|
||||
|
||||
proc spawnRelays*(turn: var Turn; ds: Cap) =
|
||||
## Spawn actors that manage routes and appeasing gatekeepers.
|
||||
spawn("transport-connector", turn) do (turn: var Turn):
|
||||
let pat = ?Observe(pattern: !TransportConnection) ?? { 0: grab() }
|
||||
# Use a generic pattern and type matching
|
||||
# in the during handler because it is easy.
|
||||
|
||||
let stdioPat = ?Observe(pattern: TransportConnection?:{0: ?:Stdio})
|
||||
during(turn, ds, stdioPat) do:
|
||||
connectTransport(turn, ds, Stdio())
|
||||
|
||||
# TODO: tcp pattern
|
||||
during(turn, ds, pat) do (ta: Literal[transportAddress.Tcp]):
|
||||
connectTransport(turn, ds, ta.value)
|
||||
|
||||
# TODO: unix pattern
|
||||
during(turn, ds, pat) do (ta: Literal[transportAddress.Unix]):
|
||||
connectTransport(turn, ds, ta.value)
|
||||
|
||||
spawn("path-resolver", turn) do (turn: var Turn):
|
||||
let pat = ?Observe(pattern: !ResolvePath) ?? {0: grab()}
|
||||
during(turn, ds, pat) do (route: Literal[Route]):
|
||||
for i, transAddr in route.value.transports:
|
||||
connectRoute(turn, ds, route.value, i)
|
||||
|
||||
spawn("sturdyref-step", turn) do (turn: var Turn):
|
||||
let pat = ?Observe(pattern: ResolvedPathStep?:{1: !SturdyRef}) ?? {0: grab(), 1: grab()}
|
||||
during(turn, ds, pat) do (origin: Literal[Cap]; detail: Literal[sturdy.Parameters]):
|
||||
let step = SturdyRef(parameters: detail.value).toPreserves
|
||||
proc duringCallback(turn: var Turn; ass: Assertion; h: Handle): TurnAction =
|
||||
let facet = inFacet(turn) do (turn: var Turn):
|
||||
var res = ass.preservesTo Resolved
|
||||
if res.isSome:
|
||||
publish(turn, ds, ResolvedPathStep(
|
||||
origin: origin.value,
|
||||
pathStep: step,
|
||||
resolved: res.get,
|
||||
))
|
||||
proc action(turn: var Turn) =
|
||||
stop(turn, facet)
|
||||
result = action
|
||||
publish(turn, origin.value, Resolve(
|
||||
step: step,
|
||||
observer: newCap(turn, during(duringCallback)),
|
||||
))
|
||||
|
||||
type BootProc* = proc (turn: var Turn; ds: Cap) {.gcsafe.}
|
||||
|
||||
|
@ -429,21 +524,7 @@ proc envRoute*: Route =
|
|||
raise newException(ValueError, "failed to parse $SYNDICATE_ROUTE " & $pr)
|
||||
|
||||
proc resolve*(turn: var Turn; ds: Cap; route: Route; bootProc: BootProc) =
|
||||
var
|
||||
unix: Unix
|
||||
tcp: Tcp
|
||||
stdio: Stdio
|
||||
doAssert(route.transports.len == 1, "only a single transport supported for routes")
|
||||
if unix.fromPreserves route.transports[0]:
|
||||
connect(turn, ds, route, unix, route.pathSteps)
|
||||
elif tcp.fromPreserves route.transports[0]:
|
||||
connect(turn, ds, route, tcp, route.pathSteps)
|
||||
elif stdio.fromPreserves route.transports[0]:
|
||||
doAssert(route.pathSteps.len == 0, "route steps not available over stdio")
|
||||
connectStdio(turn, ds)
|
||||
bootProc(turn, ds)
|
||||
else:
|
||||
raise newException(ValueError, "unsupported route")
|
||||
during(turn, ds, ResolvePath ?: {0: ?route, 3: ?:ResolvedAccepted}) do (dst: Cap):
|
||||
bootProc(turn, dst)
|
||||
|
||||
during(turn, ds, ResolvePath ?: { 0: ?route, 3: ?:ResolvedAccepted}) do (dest: Cap):
|
||||
bootProc(turn, dest)
|
||||
# TODO: define a runActor that comes preloaded with relaying
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
# Package
|
||||
|
||||
version = "20240119"
|
||||
version = "20240120"
|
||||
author = "Emery Hemingway"
|
||||
description = "Syndicated actors for conversational concurrency"
|
||||
license = "Unlicense"
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
# SPDX-FileCopyrightText: ☭ Emery Hemingway
|
||||
# SPDX-License-Identifier: Unlicense
|
||||
|
||||
import std/[asyncdispatch, asyncfile, os, parseopt]
|
||||
import std/[asyncdispatch, asyncfile, parseopt]
|
||||
import preserves, syndicate, syndicate/relays
|
||||
|
||||
type
|
||||
|
@ -49,6 +49,7 @@ proc main =
|
|||
stderr.writeLine "--user: unspecified"
|
||||
else:
|
||||
runActor("chat") do (turn: var Turn; root: Cap):
|
||||
spawnRelays(turn, root)
|
||||
resolve(turn, root, route) do (turn: var Turn; ds: Cap):
|
||||
chat(turn, ds, username)
|
||||
|
||||
|
|
Loading…
Reference in New Issue