Compare commits

...

2 Commits

3 changed files with 238 additions and 151 deletions

View File

@ -27,11 +27,10 @@ type
Turn = syndicate.Turn Turn = syndicate.Turn
Handle = actors.Handle Handle = actors.Handle
type PacketWriter = proc (turn: var Turn; buf: seq[byte]) {.closure, gcsafe.}
PacketHandler = proc (buf: seq[byte]) {.gcsafe.} RelaySetup = proc (turn: var Turn; relay: Relay) {.closure, gcsafe.}
RelaySetup = proc (turn: var Turn; relay: Relay) {.gcsafe.}
Relay* = ref object of RootObj Relay* = ref object
facet: Facet facet: Facet
inboundAssertions: Table[Handle, inboundAssertions: Table[Handle,
tuple[localHandle: Handle, imported: seq[WireSymbol]]] tuple[localHandle: Handle, imported: seq[WireSymbol]]]
@ -40,9 +39,9 @@ type
imported: Membrane imported: Membrane
nextLocalOid: Oid nextLocalOid: Oid
pendingTurn: protocol.Turn pendingTurn: protocol.Turn
packetSender: PacketHandler
wireBuf: BufferedDecoder wireBuf: BufferedDecoder
untrusted: bool packetWriter: PacketWriter
peer: Cap
SyncPeerEntity = ref object of Entity SyncPeerEntity = ref object of Entity
relay: Relay relay: Relay
@ -108,19 +107,18 @@ proc deregister(relay: Relay; h: Handle) =
if relay.outboundAssertions.pop(h, outbound): if relay.outboundAssertions.pop(h, outbound):
for e in outbound: releaseCapOut(relay, e) for e in outbound: releaseCapOut(relay, e)
proc send(r: Relay; turn: var Turn; rOid: protocol.Oid; m: Event) = proc send(relay: Relay; turn: var Turn; rOid: protocol.Oid; m: Event) =
if r.pendingTurn.len == 0: if relay.pendingTurn.len == 0:
# If the pending queue is empty then schedule a packet # If the pending queue is empty then schedule a packet
# to be sent after pending I/O is processed. # to be sent after pending I/O is processed.
callSoon do (): callSoon do ():
r.facet.run do (turn: var Turn): relay.facet.run do (turn: var Turn):
var pkt = Packet( var pkt = Packet(
orKind: PacketKind.Turn, orKind: PacketKind.Turn,
turn: move r.pendingTurn) turn: move relay.pendingTurn)
trace "C: ", pkt trace "C: ", pkt
assert(not r.packetSender.isNil, "missing packetSender proc") relay.packetWriter(turn, encode pkt)
r.packetSender(encode pkt) relay.pendingTurn.add TurnEvent(oid: rOid, event: m)
r.pendingTurn.add TurnEvent(oid: rOid, event: m)
proc send(re: RelayEntity; turn: var Turn; ev: Event) = proc send(re: RelayEntity; turn: var Turn; ev: Event) =
send(re.relay, turn, protocol.Oid re.oid, ev) send(re.relay, turn, protocol.Oid re.oid, ev)
@ -252,35 +250,28 @@ proc dispatch(relay: Relay; v: Value) {.gcsafe.} =
when defined(posix): when defined(posix):
stderr.writeLine("discarding undecoded packet ", v) stderr.writeLine("discarding undecoded packet ", v)
proc dispatch(relay: Relay; buf: seq[byte]) = proc recv(relay: Relay; buf: seq[byte]) =
feed(relay.wireBuf, buf) feed(relay.wireBuf, buf)
var pr = decode(relay.wireBuf) var pr = decode(relay.wireBuf)
if pr.isSome: dispatch(relay, get pr) if pr.isSome: dispatch(relay, pr.get)
type type
RelayOptions* = object of RootObj RelayOptions* = object of RootObj
untrusted*: bool packetWriter*: PacketWriter
RelayActorOptions* = object of RelayOptions RelayActorOptions* = object of RelayOptions
initialOid*: Option[Oid] initialOid*: Option[Oid]
initialCap*: Cap initialCap*: Cap
nextLocalOid*: Option[Oid] nextLocalOid*: Option[Oid]
proc newRelay(turn: var Turn; opts: RelayOptions; setup: RelaySetup): Relay = proc spawnRelay(name: string; turn: var Turn; opts: RelayActorOptions; setup: RelaySetup) =
result = Relay( spawn(name, turn) do (turn: var Turn):
facet: turn.facet, let relay = Relay(
wireBuf: newBufferedDecoder(0), facet: turn.facet,
untrusted: opts.untrusted) packetWriter: opts.packetWriter,
discard result.facet.preventInertCheck() wireBuf: newBufferedDecoder(0),
setup(turn, result) )
discard relay.facet.preventInertCheck()
proc transportConnectionResolve(addrAss: Assertion; ds: Cap): gatekeeper.TransportConnection =
result.`addr` = addrAss
result.resolved = Resolved(orKind: ResolvedKind.accepted)
result.resolved.accepted.responderSession = ds
proc spawnRelay*(name: string; turn: var Turn; ds: Cap; addrAss: Assertion; opts: RelayActorOptions; setup: RelaySetup) =
discard spawn(name, turn) do (turn: var Turn):
let relay = newRelay(turn, opts, setup)
if not opts.initialCap.isNil: if not opts.initialCap.isNil:
var exported: seq[WireSymbol] var exported: seq[WireSymbol]
discard rewriteCapOut(relay, opts.initialCap, exported) discard rewriteCapOut(relay, opts.initialCap, exported)
@ -288,132 +279,241 @@ proc spawnRelay*(name: string; turn: var Turn; ds: Cap; addrAss: Assertion; opts
relay.nextLocalOid = relay.nextLocalOid =
if oid == 0.Oid: 1.Oid if oid == 0.Oid: 1.Oid
else: oid else: oid
assert opts.initialOid.isSome
if opts.initialOid.isSome: if opts.initialOid.isSome:
var var
imported: seq[WireSymbol] imported: seq[WireSymbol]
wr = WireRef( wr = WireRef(
orKind: WireRefKind.mine, orKind: WireRefKind.mine,
mine: WireRefMine(oid: opts.initialOid.get)) mine: WireRefMine(oid: opts.initialOid.get))
res = rewriteCapIn(relay, turn.facet, wr, imported) relay.peer = rewriteCapIn(relay, turn.facet, wr, imported)
discard publish(turn, ds, transportConnectionResolve(addrAss, res)) assert not relay.peer.isNil
else: setup(turn, relay)
discard publish(turn, ds, transportConnectionResolve(addrAss, ds))
proc rejected(detail: Value): Resolved =
result = Resolved(orKind: ResolvedKind.Rejected)
result.rejected.detail = detail
proc accepted(cap: Cap): Resolved =
result = Resolved(orKind: ResolvedKind.accepted)
result.accepted.responderSession = cap
when defined(posix): when defined(posix):
import std/asyncnet
from std/nativesockets import AF_INET, AF_UNIX, IPPROTO_TCP, SOCK_STREAM, Protocol
type ShutdownEntity* = ref object of Entity
method retract(e: ShutdownEntity; turn: var Turn; h: Handle) =
stopActor(turn)
type ConnectProc* = proc (turn: var Turn; ds: Cap) {.gcsafe.}
export Tcp
when defined(posix):
import std/asyncfile import std/asyncfile
export Unix export Unix
proc newStdioTunnel(facet: Facet; receiver: PacketHandler): PacketHandler = type StdioControlEntity = ref object of Entity
let asyncStdin = openAsync("/dev/stdin") # this is universal now? stdin: AsyncFile
close(stdin)
const readSize = 0x2000 method message(entity: StdioControlEntity; turn: var Turn; ass: AssertionRef) =
proc readCb(fut: Future[string]) {.gcsafe.} = if ass.value.preservesTo(ForceDisconnect).isSome:
if fut.failed: terminate(facet, fut.error) close(entity.stdin)
else: close(stdout)
receiver(cast[seq[byte]](fut.read))
asyncStdin.read(readSize).addCallback(readCb) proc connectTransport(turn: var Turn; ds: Cap; ta: transportAddress.Stdio) =
asyncStdin.read(readSize).addCallback(readCb) ## Connect to an external dataspace over stdio.
proc sender(buf: seq[byte]) = proc stdoutWriter(turn: var Turn; buf: seq[byte]) =
try: ## Blocking write to stdout.
if writeBytes(stdout, buf, 0, buf.len) != buf.len: let n = writeBytes(stdout, buf, 0, buf.len)
raise newException(IOError, "failed to write Preserves to stdout") flushFile(stdout)
flushFile(stdout) if n != buf.len:
except CatchableError as err: stopActor(turn)
terminate(facet, err) var opts = RelayActorOptions(
sender packetWriter: stdoutWriter,
initialCap: ds,
initialOid: 0.Oid.some,
)
spawnRelay("stdio", turn, opts) do (turn: var Turn; relay: Relay):
let
facet = turn.facet
asyncStdin = openAsync("/dev/stdin") # this is universal now?
publish(turn, ds, TransportConnection(
`addr`: ta.toPreserves,
control: StdioControlEntity(stdin: asyncStdin).newCap(turn),
resolved: relay.peer.accepted,
))
const stdinReadSize = 0x2000
proc readCb(pktFut: Future[string]) {.gcsafe.} =
if not pktFut.failed:
var buf = pktFut.read
if buf.len == 0:
run(facet) do (turn: var Turn): stopActor(turn)
else:
relay.recv(cast[seq[byte]](buf))
asyncStdin.read(stdinReadSize).addCallback(readCb)
asyncStdin.read(stdinReadSize).addCallback(readCb)
proc connectStdio*(turn: var Turn; ds: Cap) = proc connectStdio*(turn: var Turn; ds: Cap) =
## Connect to an external dataspace over stdin and stdout. ## Connect to an external dataspace over stdin and stdout.
var opts = RelayActorOptions( connectTransport(turn, ds, transportAddress.Stdio())
initialCap: ds,
initialOid: 0.Oid.some)
spawnRelay("stdio", turn, ds, Stdio().toPreserves, opts) do (turn: var Turn; relay: Relay):
proc receiver(buf: seq[byte]) = dispatch(relay, buf)
relay.packetSender = newStdioTunnel(turn.facet, receiver)
proc newTunnel(facet: Facet; receiver: PacketHandler; socket: AsyncSocket): PacketHandler = import std/asyncnet
const recvSize = 0x2000 from std/nativesockets import AF_INET, AF_UNIX, IPPROTO_TCP, SOCK_STREAM, Protocol
proc recvCb(fut: Future[string]) {.gcsafe.} =
if fut.failed: terminate(facet, fut.error)
else:
receiver(cast[seq[byte]](fut.read))
if not socket.isClosed:
socket.recv(recvSize).addCallback(recvCb)
socket.recv(recvSize).addCallback(recvCb)
proc sender(buf: seq[byte]) =
asyncCheck(facet, socket.send(cast[string](buf)))
sender
proc connect*(turn: var Turn; ds: Cap; route: Route; addrAss: Assertion; socket: AsyncSocket; steps: seq[Value]) = type SocketControlEntity = ref object of Entity
## Relay a dataspace over an open `AsyncSocket`. socket: AsyncSocket
var shutdownCap: Cap
let method message(entity: SocketControlEntity; turn: var Turn; ass: AssertionRef) =
reenable = turn.facet.preventInertCheck() if ass.value.preservesTo(ForceDisconnect).isSome:
connectionClosedCap = newCap(turn, ShutdownEntity()) close(entity.socket)
discard bootActor("socket") do (turn: var Turn):
var ops = RelayActorOptions( type ShutdownEntity* = ref object of Entity
initialOid: 0.Oid.some) method retract(e: ShutdownEntity; turn: var Turn; h: Handle) =
spawnRelay("socket", turn, ds, addrAss, ops) do (turn: var Turn; relay: Relay): stopActor(turn)
let facet = turn.facet
proc receiver(buf: seq[byte]) = dispatch(relay, buf) proc connect(turn: var Turn; ds: Cap; transAddr: Value; socket: AsyncSocket) =
relay.packetSender = newTunnel(turn.facet, receiver, socket) proc socketWriter(turn: var Turn; buf: seq[byte]) =
turn.facet.actor.atExit do (turn: var Turn): close(socket) asyncCheck(turn, socket.send(cast[string](buf)))
discard publish(turn, connectionClosedCap, true) var ops = RelayActorOptions(
shutdownCap = newCap(turn, ShutdownEntity()) packetWriter: socketWriter,
onPublish(turn, ds, TransportConnection ?: {0: ?addrAss, 2: ?:Rejected}) do (detail: Value): initialOid: 0.Oid.some,
raise newException(IOError, $detail) )
onPublish(turn, ds, TransportConnection ?: {0: ?addrAss, 2: ?:ResolvedAccepted}) do (gatekeeper: Cap): spawnRelay("socket", turn, ops) do (turn: var Turn; relay: Relay):
run(gatekeeper.relay) do (turn: var Turn): let facet = turn.facet
reenable() facet.actor.atExit do (turn: var Turn): close(socket)
discard publish(turn, shutdownCap, true) publish(turn, ds, TransportConnection(
proc duringCallback(turn: var Turn; ass: Assertion; h: Handle): TurnAction = `addr`: transAddr,
let facet = inFacet(turn) do (turn: var Turn): control: SocketControlEntity(socket: socket).newCap(turn),
let o = ass.preservesTo Resolved; if o.isSome: resolved: relay.peer.accepted,
discard publish(turn, ds, ResolvePath( ))
route: route, `addr`: addrAss, resolved: o.get)) const recvSize = 0x4000
proc action(turn: var Turn) = proc recvCb(pktFut: Future[string]) {.gcsafe.} =
stop(turn, facet) if pktFut.failed or pktFut.read.len == 0:
result = action run(facet) do (turn: var Turn): stopActor(turn)
var resolve = Resolve( else:
step: steps[0], relay.recv(cast[seq[byte]](pktFut.read))
observer: newCap(turn, during(duringCallback)), if not socket.isClosed:
socket.recv(recvSize).addCallback(recvCb)
socket.recv(recvSize).addCallback(recvCb)
proc connect(turn: var Turn; ds: Cap; ta: Value; socket: AsyncSocket; fut: Future[void]) =
let facet = turn.facet
fut.addCallback do ():
run(facet) do (turn: var Turn):
if fut.failed:
var ass = TransportConnection(
`addr`: ta,
resolved: Resolved(orKind: ResolvedKind.Rejected),
) )
discard publish(turn, gatekeeper, resolve) ass.resolved.rejected.detail = embed fut.error
publish(turn, ds, ass)
else:
connect(turn, ds, ta, socket)
proc connect*(turn: var Turn; ds: Cap; route: Route; transport: Tcp; steps: seq[Value]) = proc connectTransport(turn: var Turn; ds: Cap; ta: transportAddress.Tcp) =
## Relay a dataspace over TCP. let
let socket = newAsyncSocket( facet = turn.facet
domain = AF_INET, socket = newAsyncSocket(
sockType = SOCK_STREAM, domain = AF_INET,
protocol = IPPROTO_TCP, sockType = SOCK_STREAM,
buffered = false) protocol = IPPROTO_TCP,
let fut = connect(socket, transport.host, Port transport.port) buffered = false,
addCallback(fut, turn) do (turn: var Turn): )
connect(turn, ds, route, transport.toPreserves, socket, steps) connect(turn, ds, ta.toPreserves, socket, connect(socket, ta.host, Port ta.port))
proc connect*(turn: var Turn; ds: Cap; route: Route; transport: Unix; steps: seq[Value]) = proc connectTransport(turn: var Turn; ds: Cap; ta: transportAddress.Unix) =
## Relay a dataspace over a UNIX socket. ## Relay a dataspace over a UNIX socket.
let socket = newAsyncSocket( let socket = newAsyncSocket(
domain = AF_UNIX, domain = AF_UNIX,
sockType = SOCK_STREAM, sockType = SOCK_STREAM,
protocol = cast[Protocol](0), protocol = cast[Protocol](0),
buffered = false) buffered = false)
let fut = connectUnix(socket, transport.path) connect(turn, ds, ta.toPreserves, socket, connectUnix(socket, ta.path))
addCallback(fut, turn) do (turn: var Turn):
connect(turn, ds, route, transport.toPreserves, socket, steps) proc walk(turn: var Turn; ds, origin: Cap; route: Route; transOff, stepOff: int) {.gcsafe.} =
if stepOff < route.pathSteps.len:
let
step = route.pathSteps[stepOff]
rejectPat = ResolvedPathStep?:{
0: ?(origin.embed), 1: ?step, 2: ?:Rejected}
acceptPat = ResolvedPathStep?:{
0: ?(origin.embed), 1: ?step, 2: ?:ResolvedAccepted}
onPublish(turn, ds, rejectPat) do (detail: Value):
publish(turn, ds, ResolvePath(
route: route,
`addr`: route.transports[transOff],
resolved: detail.rejected,
))
during(turn, ds, acceptPat) do (next: Cap):
walk(turn, ds, next, route, transOff, stepOff.succ)
else:
publish(turn, ds, ResolvePath(
route: route,
`addr`: route.transports[transOff],
resolved: origin.accepted,
))
proc connectRoute(turn: var Turn; ds: Cap; route: Route; transOff: int) =
let rejectPat = TransportConnection ?: {
0: ?route.transports[transOff],
2: ?:Rejected,
}
during(turn, ds, rejectPat) do (detail: Value):
publish(turn, ds, ResolvePath(
route: route,
`addr`: route.transports[transOff],
resolved: detail.rejected,
))
let acceptPat = TransportConnection?:{
0: ?route.transports[transOff],
2: ?:ResolvedAccepted,
}
onPublish(turn, ds, acceptPat) do (origin: Cap):
walk(turn, ds, origin, route, transOff, 0)
type StepCallback = proc (turn: var Turn; step: Value; origin, next: Cap) {.gcsafe.}
proc spawnStepResolver(turn: var Turn; ds: Cap; stepType: Value; cb: StepCallback) =
spawn($stepType & "-step", turn) do (turn: var Turn):
let stepPat = grabRecord(stepType, grab())
let pat = ?Observe(pattern: ResolvedPathStep?:{1: stepPat}) ?? {0: grabLit(), 1: grab()}
during(turn, ds, pat) do (origin: Cap; stepDetail: Literal[Value]):
let step = toRecord(stepType, stepDetail.value)
proc duringCallback(turn: var Turn; ass: Value; h: Handle): TurnAction =
var res = ass.preservesTo Resolved
if res.isSome:
if res.get.orKind == ResolvedKind.accepted and
res.get.accepted.responderSession of Cap:
cb(turn, step, origin, res.get.accepted.responderSession.Cap)
else:
publish(turn, ds, ResolvedPathStep(
origin: origin, pathStep: step, resolved: res.get))
proc action(turn: var Turn) =
stop(turn)
result = action
publish(turn, origin, Resolve(
step: step, observer: newCap(turn, during(duringCallback))))
proc spawnRelays*(turn: var Turn; ds: Cap) =
## Spawn actors that manage routes and appeasing gatekeepers.
spawn("transport-connector", turn) do (turn: var Turn):
let pat = ?Observe(pattern: !TransportConnection) ?? { 0: grab() }
# Use a generic pattern and type matching
# in the during handler because it is easy.
let stdioPat = ?Observe(pattern: TransportConnection?:{0: ?:Stdio})
during(turn, ds, stdioPat) do:
connectTransport(turn, ds, Stdio())
# TODO: tcp pattern
during(turn, ds, pat) do (ta: Literal[transportAddress.Tcp]):
connectTransport(turn, ds, ta.value)
# TODO: unix pattern
during(turn, ds, pat) do (ta: Literal[transportAddress.Unix]):
connectTransport(turn, ds, ta.value)
spawn("path-resolver", turn) do (turn: var Turn):
let pat = ?Observe(pattern: !ResolvePath) ?? {0: grab()}
during(turn, ds, pat) do (route: Literal[Route]):
for i, transAddr in route.value.transports:
connectRoute(turn, ds, route.value, i)
spawnStepResolver(turn, ds, "ref".toSymbol) do (
turn: var Turn, step: Value, origin: Cap, next: Cap):
publish(turn, ds, ResolvedPathStep(
origin: origin, pathStep: step, resolved: next.accepted))
type BootProc* = proc (turn: var Turn; ds: Cap) {.gcsafe.} type BootProc* = proc (turn: var Turn; ds: Cap) {.gcsafe.}
@ -429,21 +529,7 @@ proc envRoute*: Route =
raise newException(ValueError, "failed to parse $SYNDICATE_ROUTE " & $pr) raise newException(ValueError, "failed to parse $SYNDICATE_ROUTE " & $pr)
proc resolve*(turn: var Turn; ds: Cap; route: Route; bootProc: BootProc) = proc resolve*(turn: var Turn; ds: Cap; route: Route; bootProc: BootProc) =
var during(turn, ds, ResolvePath ?: {0: ?route, 3: ?:ResolvedAccepted}) do (dst: Cap):
unix: Unix bootProc(turn, dst)
tcp: Tcp
stdio: Stdio
doAssert(route.transports.len == 1, "only a single transport supported for routes")
if unix.fromPreserves route.transports[0]:
connect(turn, ds, route, unix, route.pathSteps)
elif tcp.fromPreserves route.transports[0]:
connect(turn, ds, route, tcp, route.pathSteps)
elif stdio.fromPreserves route.transports[0]:
doAssert(route.pathSteps.len == 0, "route steps not available over stdio")
connectStdio(turn, ds)
bootProc(turn, ds)
else:
raise newException(ValueError, "unsupported route")
during(turn, ds, ResolvePath ?: { 0: ?route, 3: ?:ResolvedAccepted}) do (dest: Cap): # TODO: define a runActor that comes preloaded with relaying
bootProc(turn, dest)

View File

@ -1,6 +1,6 @@
# Package # Package
version = "20240119" version = "20240120"
author = "Emery Hemingway" author = "Emery Hemingway"
description = "Syndicated actors for conversational concurrency" description = "Syndicated actors for conversational concurrency"
license = "Unlicense" license = "Unlicense"

View File

@ -1,7 +1,7 @@
# SPDX-FileCopyrightText: ☭ Emery Hemingway # SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense # SPDX-License-Identifier: Unlicense
import std/[asyncdispatch, asyncfile, os, parseopt] import std/[asyncdispatch, asyncfile, parseopt]
import preserves, syndicate, syndicate/relays import preserves, syndicate, syndicate/relays
type type
@ -49,6 +49,7 @@ proc main =
stderr.writeLine "--user: unspecified" stderr.writeLine "--user: unspecified"
else: else:
runActor("chat") do (turn: var Turn; root: Cap): runActor("chat") do (turn: var Turn; root: Cap):
spawnRelays(turn, root)
resolve(turn, root, route) do (turn: var Turn; ds: Cap): resolve(turn, root, route) do (turn: var Turn; ds: Cap):
chat(turn, ds, username) chat(turn, ds, username)