Port relays to nim-sys
This commit is contained in:
parent
01f26caf7b
commit
eb5d4d9a57
16
lock.json
16
lock.json
|
@ -16,11 +16,11 @@
|
||||||
"packages": [
|
"packages": [
|
||||||
"cps"
|
"cps"
|
||||||
],
|
],
|
||||||
"path": "/nix/store/m9vpcf3dq6z2h1xpi1vlw0ycxp91s5p7-source",
|
"path": "/nix/store/452hfhasrn3gl6vijfmzs69djl099j0j-source",
|
||||||
"rev": "2a4d771a715ba45cfba3a82fa625ae7ad6591c8b",
|
"rev": "b7c179f172e3a256a482a9daee3c0815ea423206",
|
||||||
"sha256": "0c62k5wpq9z9mn8cd4rm8jjc4z0xmnak4piyj5dsfbyj6sbdw2bf",
|
"sha256": "1sn9s7iv83sw1jl5jgi2h7b0xpgsn13f9icp5124jvbp0qkxskx2",
|
||||||
"srcDir": "",
|
"srcDir": "",
|
||||||
"url": "https://github.com/nim-works/cps/archive/2a4d771a715ba45cfba3a82fa625ae7ad6591c8b.tar.gz"
|
"url": "https://github.com/nim-works/cps/archive/b7c179f172e3a256a482a9daee3c0815ea423206.tar.gz"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"method": "fetchzip",
|
"method": "fetchzip",
|
||||||
|
@ -82,11 +82,11 @@
|
||||||
"packages": [
|
"packages": [
|
||||||
"sys"
|
"sys"
|
||||||
],
|
],
|
||||||
"path": "/nix/store/ayplzmq7xdzrp3n6ly6dnskf5c5aiihp-source",
|
"path": "/nix/store/syhxsjlsdqfap0hk4qp3s6kayk8cqknd-source",
|
||||||
"rev": "3b86a5083a4aa178994fe4ffdc046d340aa13b32",
|
"rev": "4ef3b624db86e331ba334e705c1aa235d55b05e1",
|
||||||
"sha256": "0qz9hag7synp8sx2b6caazm2kidvd0lv2p0h98sslkyzaf4icnal",
|
"sha256": "1q4qgw4an4mmmcbx48l6xk1jig1vc8p9cq9dbx39kpnb0890j32q",
|
||||||
"srcDir": "src",
|
"srcDir": "src",
|
||||||
"url": "https://github.com/alaviss/nim-sys/archive/3b86a5083a4aa178994fe4ffdc046d340aa13b32.tar.gz"
|
"url": "https://github.com/ehmry/nim-sys/archive/4ef3b624db86e331ba334e705c1aa235d55b05e1.tar.gz"
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,6 +3,7 @@
|
||||||
|
|
||||||
import std/[options, tables]
|
import std/[options, tables]
|
||||||
from std/os import getEnv, `/`
|
from std/os import getEnv, `/`
|
||||||
|
import pkg/sys/ioqueue
|
||||||
import preserves
|
import preserves
|
||||||
import ../syndicate, /capabilities, ./durings, ./membranes, ./protocols/[gatekeeper, protocol, sturdy, transportAddress]
|
import ../syndicate, /capabilities, ./durings, ./membranes, ./protocols/[gatekeeper, protocol, sturdy, transportAddress]
|
||||||
|
|
||||||
|
@ -16,16 +17,15 @@ else:
|
||||||
|
|
||||||
export `$`
|
export `$`
|
||||||
|
|
||||||
type
|
|
||||||
Oid = sturdy.Oid
|
|
||||||
|
|
||||||
export Stdio, Tcp, WebSocket, Unix
|
export Stdio, Tcp, WebSocket, Unix
|
||||||
|
|
||||||
type
|
type
|
||||||
Assertion = Value
|
Assertion = Value
|
||||||
WireRef = sturdy.WireRef
|
Event = protocol.Event
|
||||||
Turn = syndicate.Turn
|
|
||||||
Handle = actors.Handle
|
Handle = actors.Handle
|
||||||
|
Oid = sturdy.Oid
|
||||||
|
Turn = syndicate.Turn
|
||||||
|
WireRef = sturdy.WireRef
|
||||||
|
|
||||||
PacketWriter = proc (turn: var Turn; buf: seq[byte]) {.closure.}
|
PacketWriter = proc (turn: var Turn; buf: seq[byte]) {.closure.}
|
||||||
RelaySetup = proc (turn: var Turn; relay: Relay) {.closure.}
|
RelaySetup = proc (turn: var Turn; relay: Relay) {.closure.}
|
||||||
|
@ -38,7 +38,6 @@ type
|
||||||
exported: Membrane
|
exported: Membrane
|
||||||
imported: Membrane
|
imported: Membrane
|
||||||
nextLocalOid: Oid
|
nextLocalOid: Oid
|
||||||
pendingTurn: protocol.Turn
|
|
||||||
wireBuf: BufferedDecoder
|
wireBuf: BufferedDecoder
|
||||||
packetWriter: PacketWriter
|
packetWriter: PacketWriter
|
||||||
peer: Cap
|
peer: Cap
|
||||||
|
@ -90,7 +89,7 @@ proc rewriteCapOut(relay: Relay; cap: Cap; exported: var seq[WireSymbol]): WireR
|
||||||
mine: WireRefMine(oid: ws.oid))
|
mine: WireRefMine(oid: ws.oid))
|
||||||
|
|
||||||
proc rewriteOut(relay: Relay; v: Assertion):
|
proc rewriteOut(relay: Relay; v: Assertion):
|
||||||
tuple[rewritten: Value, exported: seq[WireSymbol]] {.closure.} =
|
tuple[rewritten: Value, exported: seq[WireSymbol]] =
|
||||||
var exported: seq[WireSymbol]
|
var exported: seq[WireSymbol]
|
||||||
result.rewritten = mapEmbeds(v) do (pr: Value) -> Value:
|
result.rewritten = mapEmbeds(v) do (pr: Value) -> Value:
|
||||||
let o = pr.unembed(Cap); if o.isSome:
|
let o = pr.unembed(Cap); if o.isSome:
|
||||||
|
@ -108,17 +107,15 @@ proc deregister(relay: Relay; h: Handle) =
|
||||||
for e in outbound: releaseCapOut(relay, e)
|
for e in outbound: releaseCapOut(relay, e)
|
||||||
|
|
||||||
proc send(relay: Relay; turn: var Turn; rOid: protocol.Oid; m: Event) =
|
proc send(relay: Relay; turn: var Turn; rOid: protocol.Oid; m: Event) =
|
||||||
if relay.pendingTurn.len == 0:
|
# TODO: don't send right away.
|
||||||
# If the pending queue is empty then schedule a packet
|
var pendingTurn: protocol.Turn
|
||||||
# to be sent after pending I/O is processed.
|
pendingTurn.add TurnEvent(oid: rOid, event: m)
|
||||||
callSoon do ():
|
relay.facet.run do (turn: var Turn):
|
||||||
relay.facet.run do (turn: var Turn):
|
var pkt = Packet(
|
||||||
var pkt = Packet(
|
orKind: PacketKind.Turn,
|
||||||
orKind: PacketKind.Turn,
|
turn: pendingTurn)
|
||||||
turn: move relay.pendingTurn)
|
trace "C: ", pkt
|
||||||
trace "C: ", pkt
|
relay.packetWriter(turn, encode pkt)
|
||||||
relay.packetWriter(turn, encode pkt)
|
|
||||||
relay.pendingTurn.add TurnEvent(oid: rOid, event: m)
|
|
||||||
|
|
||||||
proc send(re: RelayEntity; turn: var Turn; ev: Event) =
|
proc send(re: RelayEntity; turn: var Turn; ev: Event) =
|
||||||
send(re.relay, turn, protocol.Oid re.oid, ev)
|
send(re.relay, turn, protocol.Oid re.oid, ev)
|
||||||
|
@ -250,8 +247,8 @@ proc dispatch(relay: Relay; v: Value) =
|
||||||
when defined(posix):
|
when defined(posix):
|
||||||
stderr.writeLine("discarding undecoded packet ", v)
|
stderr.writeLine("discarding undecoded packet ", v)
|
||||||
|
|
||||||
proc recv(relay: Relay; buf: seq[byte]) =
|
proc recv(relay: Relay; buf: openarray[byte]; slice: Slice[int]) =
|
||||||
feed(relay.wireBuf, buf)
|
feed(relay.wireBuf, buf, slice)
|
||||||
var pr = decode(relay.wireBuf)
|
var pr = decode(relay.wireBuf)
|
||||||
if pr.isSome: dispatch(relay, pr.get)
|
if pr.isSome: dispatch(relay, pr.get)
|
||||||
|
|
||||||
|
@ -265,7 +262,7 @@ type
|
||||||
nextLocalOid*: Option[Oid]
|
nextLocalOid*: Option[Oid]
|
||||||
|
|
||||||
proc spawnRelay(name: string; turn: var Turn; opts: RelayActorOptions; setup: RelaySetup) =
|
proc spawnRelay(name: string; turn: var Turn; opts: RelayActorOptions; setup: RelaySetup) =
|
||||||
spawn(name, turn) do (turn: var Turn):
|
spawnActor(name, turn) do (turn: var Turn):
|
||||||
let relay = Relay(
|
let relay = Relay(
|
||||||
facet: turn.facet,
|
facet: turn.facet,
|
||||||
packetWriter: opts.packetWriter,
|
packetWriter: opts.packetWriter,
|
||||||
|
@ -300,16 +297,29 @@ proc accepted(cap: Cap): Resolved =
|
||||||
|
|
||||||
when defined(posix):
|
when defined(posix):
|
||||||
|
|
||||||
import std/asyncfile
|
import std/[oserrors, posix]
|
||||||
export Unix
|
import pkg/sys/[files, handles, sockets]
|
||||||
|
export transportAddress.Unix
|
||||||
|
|
||||||
type StdioControlEntity = ref object of Entity
|
type StdioControlEntity = ref object of Entity
|
||||||
|
buf: ref seq[byte]
|
||||||
|
relay: Relay
|
||||||
stdin: AsyncFile
|
stdin: AsyncFile
|
||||||
|
|
||||||
method message(entity: StdioControlEntity; turn: var Turn; ass: AssertionRef) =
|
method message(entity: StdioControlEntity; turn: var Turn; ass: AssertionRef) =
|
||||||
if ass.value.preservesTo(ForceDisconnect).isSome:
|
if ass.value.preservesTo(ForceDisconnect).isSome:
|
||||||
close(entity.stdin)
|
close(entity.stdin)
|
||||||
close(stdout)
|
|
||||||
|
proc loop(entity: StdioControlEntity) {.asyncio.} =
|
||||||
|
new entity.buf
|
||||||
|
entity.buf[].setLen(0x1000)
|
||||||
|
while true:
|
||||||
|
let n = read(entity.stdin, entity.buf)
|
||||||
|
if n == 0:
|
||||||
|
stderr.writeLine "empty read on stdin, stopping actor"
|
||||||
|
stopActor(entity.relay.facet)
|
||||||
|
else:
|
||||||
|
entity.relay.recv(entity.buf[], 0..<n)
|
||||||
|
|
||||||
proc connectTransport(turn: var Turn; ds: Cap; ta: transportAddress.Stdio) =
|
proc connectTransport(turn: var Turn; ds: Cap; ta: transportAddress.Stdio) =
|
||||||
## Connect to an external dataspace over stdio.
|
## Connect to an external dataspace over stdio.
|
||||||
|
@ -327,99 +337,109 @@ when defined(posix):
|
||||||
spawnRelay("stdio", turn, opts) do (turn: var Turn; relay: Relay):
|
spawnRelay("stdio", turn, opts) do (turn: var Turn; relay: Relay):
|
||||||
let
|
let
|
||||||
facet = turn.facet
|
facet = turn.facet
|
||||||
asyncStdin = openAsync("/dev/stdin") # this is universal now?
|
fd = stdin.getOsFileHandle()
|
||||||
|
flags = fcntl(fd.cint, F_GETFL, 0)
|
||||||
|
if flags < 0: raiseOSError(osLastError())
|
||||||
|
if fcntl(fd.cint, F_SETFL, flags or O_NONBLOCK) < 0:
|
||||||
|
raiseOSError(osLastError())
|
||||||
|
let entity = StdioControlEntity(
|
||||||
|
relay: relay, stdin: newAsyncFile(FD fd))
|
||||||
publish(turn, ds, TransportConnection(
|
publish(turn, ds, TransportConnection(
|
||||||
`addr`: ta.toPreserves,
|
`addr`: ta.toPreserves,
|
||||||
control: StdioControlEntity(stdin: asyncStdin).newCap(turn),
|
control: newCap(entity, turn),
|
||||||
resolved: relay.peer.accepted,
|
resolved: relay.peer.accepted,
|
||||||
))
|
))
|
||||||
const stdinReadSize = 0x2000
|
discard trampoline:
|
||||||
proc readCb(pktFut: Future[string]) =
|
whelp loop(entity)
|
||||||
if not pktFut.failed:
|
|
||||||
var buf = pktFut.read
|
|
||||||
if buf.len == 0:
|
|
||||||
run(facet) do (turn: var Turn): stopActor(turn)
|
|
||||||
else:
|
|
||||||
relay.recv(cast[seq[byte]](buf))
|
|
||||||
asyncStdin.read(stdinReadSize).addCallback(readCb)
|
|
||||||
asyncStdin.read(stdinReadSize).addCallback(readCb)
|
|
||||||
|
|
||||||
proc connectStdio*(turn: var Turn; ds: Cap) =
|
proc connectStdio*(turn: var Turn; ds: Cap) =
|
||||||
## Connect to an external dataspace over stdin and stdout.
|
## Connect to an external dataspace over stdin and stdout.
|
||||||
connectTransport(turn, ds, transportAddress.Stdio())
|
connectTransport(turn, ds, transportAddress.Stdio())
|
||||||
|
|
||||||
import std/asyncnet
|
type
|
||||||
from std/nativesockets import AF_INET, AF_UNIX, IPPROTO_TCP, SOCK_STREAM, Protocol
|
TcpEntity = ref object of Entity
|
||||||
|
relay: Relay
|
||||||
|
sock: AsyncConn[sockets.Protocol.TCP]
|
||||||
|
buf: ref seq[byte]
|
||||||
|
alive: bool
|
||||||
|
|
||||||
type SocketControlEntity = ref object of Entity
|
UnixEntity = ref object of Entity
|
||||||
socket: AsyncSocket
|
relay: Relay
|
||||||
|
sock: AsyncConn[sockets.Protocol.Unix]
|
||||||
|
buf: ref seq[byte]
|
||||||
|
alive: bool
|
||||||
|
|
||||||
method message(entity: SocketControlEntity; turn: var Turn; ass: AssertionRef) =
|
SocketEntity = TcpEntity | UnixEntity
|
||||||
|
|
||||||
|
method message(entity: SocketEntity; turn: var Turn; ass: AssertionRef) =
|
||||||
if ass.value.preservesTo(ForceDisconnect).isSome:
|
if ass.value.preservesTo(ForceDisconnect).isSome:
|
||||||
close(entity.socket)
|
reset entity.alive
|
||||||
|
close(entity.sock)
|
||||||
|
|
||||||
type ShutdownEntity* = ref object of Entity
|
template socketLoop() {.dirty.}=
|
||||||
|
new entity.buf
|
||||||
|
entity.buf[].setLen(0x1000)
|
||||||
|
entity.alive = not entity.alive
|
||||||
|
while entity.alive:
|
||||||
|
let n = read(entity.sock, entity.buf)
|
||||||
|
if n < 0: raiseOSError(osLastError())
|
||||||
|
elif n == 0:
|
||||||
|
stderr.writeLine "empty read on socket, stopping actor"
|
||||||
|
stopActor(entity.relay.facet)
|
||||||
|
else:
|
||||||
|
entity.relay.recv(entity.buf[], 0..<n)
|
||||||
|
stderr.writeLine "breaking socketLoop"
|
||||||
|
|
||||||
|
proc loop(entity: TcpEntity) {.asyncio.} =
|
||||||
|
socketLoop()
|
||||||
|
proc loop(entity: UnixEntity) {.asyncio.} =
|
||||||
|
socketLoop()
|
||||||
|
|
||||||
|
type ShutdownEntity = ref object of Entity
|
||||||
method retract(e: ShutdownEntity; turn: var Turn; h: Handle) =
|
method retract(e: ShutdownEntity; turn: var Turn; h: Handle) =
|
||||||
stopActor(turn)
|
stopActor(turn)
|
||||||
|
|
||||||
proc connect(turn: var Turn; ds: Cap; transAddr: Value; socket: AsyncSocket) =
|
template bootSocketEntity() {.dirty.} =
|
||||||
proc socketWriter(turn: var Turn; buf: seq[byte]) =
|
proc publish(turn: var Turn) =
|
||||||
asyncCheck(turn, socket.send(cast[string](buf)))
|
publish(turn, ds, TransportConnection(
|
||||||
|
`addr`: ta.toPreserves,
|
||||||
|
control: newCap(entity, turn),
|
||||||
|
resolved: entity.relay.peer.accepted,
|
||||||
|
))
|
||||||
|
run(entity.relay.facet, publish)
|
||||||
|
loop(entity)
|
||||||
|
|
||||||
|
proc boot(entity: TcpEntity; ta: transportAddress.Tcp; ds: Cap) {.asyncio.} =
|
||||||
|
entity.sock = connectTcpAsync(ta.host, Port ta.port)
|
||||||
|
bootSocketEntity()
|
||||||
|
|
||||||
|
proc boot(entity: UnixEntity; ta: transportAddress.Unix; ds: Cap) {.asyncio.} =
|
||||||
|
entity.sock = connectUnixAsync(ta.path)
|
||||||
|
bootSocketEntity()
|
||||||
|
|
||||||
|
template spawnSocketRelay() {.dirty.} =
|
||||||
|
proc writeConn(turn: var Turn; buf: seq[byte]) =
|
||||||
|
discard trampoline:
|
||||||
|
whelp write(entity.sock, buf)
|
||||||
var ops = RelayActorOptions(
|
var ops = RelayActorOptions(
|
||||||
packetWriter: socketWriter,
|
packetWriter: writeConn,
|
||||||
initialOid: 0.Oid.some,
|
initialOid: 0.Oid.some,
|
||||||
)
|
)
|
||||||
spawnRelay("socket", turn, ops) do (turn: var Turn; relay: Relay):
|
spawnRelay("socket", turn, ops) do (turn: var Turn; relay: Relay):
|
||||||
let facet = turn.facet
|
entity.relay = relay
|
||||||
facet.actor.atExit do (turn: var Turn): close(socket)
|
atExit(turn.facet.actor) do (turn: var Turn):
|
||||||
publish(turn, ds, TransportConnection(
|
entity.alive = false
|
||||||
`addr`: transAddr,
|
close(entity.sock)
|
||||||
control: SocketControlEntity(socket: socket).newCap(turn),
|
discard trampoline:
|
||||||
resolved: relay.peer.accepted,
|
whelp boot(entity, ta, ds)
|
||||||
))
|
|
||||||
const recvSize = 0x4000
|
|
||||||
proc recvCb(pktFut: Future[string]) =
|
|
||||||
if pktFut.failed or pktFut.read.len == 0:
|
|
||||||
run(facet) do (turn: var Turn): stopActor(turn)
|
|
||||||
else:
|
|
||||||
relay.recv(cast[seq[byte]](pktFut.read))
|
|
||||||
if not socket.isClosed:
|
|
||||||
socket.recv(recvSize).addCallback(recvCb)
|
|
||||||
socket.recv(recvSize).addCallback(recvCb)
|
|
||||||
|
|
||||||
proc connect(turn: var Turn; ds: Cap; ta: Value; socket: AsyncSocket; fut: Future[void]) =
|
|
||||||
let facet = turn.facet
|
|
||||||
fut.addCallback do ():
|
|
||||||
run(facet) do (turn: var Turn):
|
|
||||||
if fut.failed:
|
|
||||||
var ass = TransportConnection(
|
|
||||||
`addr`: ta,
|
|
||||||
resolved: Resolved(orKind: ResolvedKind.Rejected),
|
|
||||||
)
|
|
||||||
ass.resolved.rejected.detail = embed fut.error
|
|
||||||
publish(turn, ds, ass)
|
|
||||||
else:
|
|
||||||
connect(turn, ds, ta, socket)
|
|
||||||
|
|
||||||
proc connectTransport(turn: var Turn; ds: Cap; ta: transportAddress.Tcp) =
|
proc connectTransport(turn: var Turn; ds: Cap; ta: transportAddress.Tcp) =
|
||||||
let
|
let entity = TcpEntity()
|
||||||
facet = turn.facet
|
spawnSocketRelay()
|
||||||
socket = newAsyncSocket(
|
|
||||||
domain = AF_INET,
|
|
||||||
sockType = SOCK_STREAM,
|
|
||||||
protocol = IPPROTO_TCP,
|
|
||||||
buffered = false,
|
|
||||||
)
|
|
||||||
connect(turn, ds, ta.toPreserves, socket, connect(socket, ta.host, Port ta.port))
|
|
||||||
|
|
||||||
proc connectTransport(turn: var Turn; ds: Cap; ta: transportAddress.Unix) =
|
proc connectTransport(turn: var Turn; ds: Cap; ta: transportAddress.Unix) =
|
||||||
## Relay a dataspace over a UNIX socket.
|
let entity = UnixEntity()
|
||||||
let socket = newAsyncSocket(
|
spawnSocketRelay()
|
||||||
domain = AF_UNIX,
|
|
||||||
sockType = SOCK_STREAM,
|
|
||||||
protocol = cast[Protocol](0),
|
|
||||||
buffered = false)
|
|
||||||
connect(turn, ds, ta.toPreserves, socket, connectUnix(socket, ta.path))
|
|
||||||
|
|
||||||
proc walk(turn: var Turn; ds, origin: Cap; route: Route; transOff, stepOff: int) =
|
proc walk(turn: var Turn; ds, origin: Cap; route: Route; transOff, stepOff: int) =
|
||||||
if stepOff < route.pathSteps.len:
|
if stepOff < route.pathSteps.len:
|
||||||
|
@ -465,7 +485,7 @@ proc connectRoute(turn: var Turn; ds: Cap; route: Route; transOff: int) =
|
||||||
type StepCallback = proc (turn: var Turn; step: Value; origin, next: Cap) {.closure.}
|
type StepCallback = proc (turn: var Turn; step: Value; origin, next: Cap) {.closure.}
|
||||||
|
|
||||||
proc spawnStepResolver(turn: var Turn; ds: Cap; stepType: Value; cb: StepCallback) =
|
proc spawnStepResolver(turn: var Turn; ds: Cap; stepType: Value; cb: StepCallback) =
|
||||||
spawn($stepType & "-step", turn) do (turn: var Turn):
|
spawnActor($stepType & "-step", turn) do (turn: var Turn):
|
||||||
let stepPat = grabRecord(stepType, grab())
|
let stepPat = grabRecord(stepType, grab())
|
||||||
let pat = ?Observe(pattern: ResolvedPathStep?:{1: stepPat}) ?? {0: grabLit(), 1: grab()}
|
let pat = ?Observe(pattern: ResolvedPathStep?:{1: stepPat}) ?? {0: grabLit(), 1: grab()}
|
||||||
during(turn, ds, pat) do (origin: Cap; stepDetail: Literal[Value]):
|
during(turn, ds, pat) do (origin: Cap; stepDetail: Literal[Value]):
|
||||||
|
@ -487,7 +507,7 @@ proc spawnStepResolver(turn: var Turn; ds: Cap; stepType: Value; cb: StepCallbac
|
||||||
|
|
||||||
proc spawnRelays*(turn: var Turn; ds: Cap) =
|
proc spawnRelays*(turn: var Turn; ds: Cap) =
|
||||||
## Spawn actors that manage routes and appeasing gatekeepers.
|
## Spawn actors that manage routes and appeasing gatekeepers.
|
||||||
spawn("transport-connector", turn) do (turn: var Turn):
|
spawnActor("transport-connector", turn) do (turn: var Turn):
|
||||||
let pat = ?Observe(pattern: !TransportConnection) ?? { 0: grab() }
|
let pat = ?Observe(pattern: !TransportConnection) ?? { 0: grab() }
|
||||||
# Use a generic pattern and type matching
|
# Use a generic pattern and type matching
|
||||||
# in the during handler because it is easy.
|
# in the during handler because it is easy.
|
||||||
|
@ -496,15 +516,26 @@ proc spawnRelays*(turn: var Turn; ds: Cap) =
|
||||||
during(turn, ds, stdioPat) do:
|
during(turn, ds, stdioPat) do:
|
||||||
connectTransport(turn, ds, Stdio())
|
connectTransport(turn, ds, Stdio())
|
||||||
|
|
||||||
|
|
||||||
# TODO: tcp pattern
|
# TODO: tcp pattern
|
||||||
during(turn, ds, pat) do (ta: Literal[transportAddress.Tcp]):
|
during(turn, ds, pat) do (ta: Literal[transportAddress.Tcp]):
|
||||||
connectTransport(turn, ds, ta.value)
|
try: connectTransport(turn, ds, ta.value)
|
||||||
|
except CatchableError as e:
|
||||||
|
publish(turn, ds, TransportConnection(
|
||||||
|
`addr`: ta.toPreserve,
|
||||||
|
resolved: rejected(embed e),
|
||||||
|
))
|
||||||
|
|
||||||
# TODO: unix pattern
|
# TODO: unix pattern
|
||||||
during(turn, ds, pat) do (ta: Literal[transportAddress.Unix]):
|
during(turn, ds, pat) do (ta: Literal[transportAddress.Unix]):
|
||||||
connectTransport(turn, ds, ta.value)
|
try: connectTransport(turn, ds, ta.value)
|
||||||
|
except CatchableError as e:
|
||||||
|
publish(turn, ds, TransportConnection(
|
||||||
|
`addr`: ta.toPreserve,
|
||||||
|
resolved: rejected(embed e),
|
||||||
|
))
|
||||||
|
|
||||||
spawn("path-resolver", turn) do (turn: var Turn):
|
spawnActor("path-resolver", turn) do (turn: var Turn):
|
||||||
let pat = ?Observe(pattern: !ResolvePath) ?? {0: grab()}
|
let pat = ?Observe(pattern: !ResolvePath) ?? {0: grab()}
|
||||||
during(turn, ds, pat) do (route: Literal[Route]):
|
during(turn, ds, pat) do (route: Literal[Route]):
|
||||||
for i, transAddr in route.value.transports:
|
for i, transAddr in route.value.transports:
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
# Package
|
# Package
|
||||||
|
|
||||||
version = "20240301"
|
version = "20240304"
|
||||||
author = "Emery Hemingway"
|
author = "Emery Hemingway"
|
||||||
description = "Syndicated actors for conversational concurrency"
|
description = "Syndicated actors for conversational concurrency"
|
||||||
license = "Unlicense"
|
license = "Unlicense"
|
||||||
|
@ -9,4 +9,4 @@ srcDir = "src"
|
||||||
|
|
||||||
# Dependencies
|
# Dependencies
|
||||||
|
|
||||||
requires "https://github.com/ehmry/hashlib.git >= 20231130", "nim >= 2.0.0", "https://git.syndicate-lang.org/ehmry/preserves-nim.git >= 20240208", "https://github.com/alaviss/nim-sys.git", "https://github.com/nim-works/cps"
|
requires "https://github.com/ehmry/hashlib.git >= 20231130", "nim >= 2.0.0", "https://git.syndicate-lang.org/ehmry/preserves-nim.git >= 20240208", "https://github.com/ehmry/nim-sys.git#4ef3b624db86e331ba334e705c1aa235d55b05e1", "https://github.com/nim-works/cps"
|
||||||
|
|
Loading…
Reference in New Issue