Compare commits

...

10 Commits

15 changed files with 388 additions and 339 deletions

View File

@ -52,12 +52,12 @@
"packages": [
"preserves"
],
"path": "/nix/store/fpkhfxnfbdcri6k7mac21r3byg738bs4-source",
"ref": "20240108",
"rev": "a01ba8c96d65f670862ba074bf82b50cbda6ed99",
"sha256": "0n8pghy2qfywx0psr54yzjvhdhi5av204150jyyzfxhigczd8sr4",
"path": "/nix/store/6nnn5di5vip1vladlb7z56rbw18d1y7j-source",
"ref": "20240208",
"rev": "2825bceecf33a15b9b7942db5331a32cbc39b281",
"sha256": "145vf46fy3wc52j6vs509fm9bi5lx7c53gskbkpcfbkv82l86dgk",
"srcDir": "src",
"url": "https://git.syndicate-lang.org/ehmry/preserves-nim/archive/a01ba8c96d65f670862ba074bf82b50cbda6ed99.tar.gz"
"url": "https://git.syndicate-lang.org/ehmry/preserves-nim/archive/2825bceecf33a15b9b7942db5331a32cbc39b281.tar.gz"
}
]
}

View File

@ -171,7 +171,6 @@ proc match(bindings: var Bindings; p: Pattern; v: Value): bool =
of PatternKind.Patom:
result = case p.patom
of PAtom.Boolean: v.isBoolean
of PAtom.Float: v.isFloat
of PAtom.Double: v.isDouble
of PAtom.Signedinteger: v.isInteger
of PAtom.String: v.isString
@ -296,11 +295,11 @@ proc publish(turn: var Turn; r: Cap; v: Value; h: Handle) =
act.enqueue.event.detail.assert.handle = h
turn.desc.actions.add act
proc publish*(turn: var Turn; r: Cap; a: Value): Handle =
proc publish*(turn: var Turn; r: Cap; a: Value): Handle {.discardable.} =
result = turn.facet.nextHandle()
publish(turn, r, a, result)
proc publish*[T](turn: var Turn; r: Cap; a: T): Handle =
proc publish*[T](turn: var Turn; r: Cap; a: T): Handle {.discardable.} =
publish(turn, r, a.toPreserves)
proc retract(turn: var Turn; e: OutboundAssertion) =
@ -459,7 +458,7 @@ proc bootActor*(name: string; bootProc: TurnAction): Actor =
else: result.traceStream = openFileStream(path, fmWrite)
run(result, bootProc, initialAssertions)
proc spawn*(name: string; turn: var Turn; bootProc: TurnAction; initialAssertions = initHashSet[Handle]()): Actor =
proc spawn*(name: string; turn: var Turn; bootProc: TurnAction; initialAssertions = initHashSet[Handle]()): Actor {.discardable.} =
let actor = newActor(name, turn.facet.actor.handleAllocator)
enqueue(turn, turn.facet) do (turn: var Turn):
var newOutBound: Table[Handle, OutboundAssertion]
@ -594,6 +593,9 @@ proc newCap*(relay: Facet; e: Entity): Cap =
proc newCap*(turn; e: Entity): Cap =
Cap(relay: turn.facet, target: e)
proc newCap*(e: Entity; turn): Cap =
Cap(relay: turn.facet, target: e)
type SyncContinuation {.final.} = ref object of Entity
action: TurnAction

View File

@ -66,43 +66,40 @@ proc grab*(pr: Value): Pattern =
$(grab parsePreserves"""<foo "bar" #"00" [0 1 2.0] {maybe: #t} <_>>""") ==
"""<rec foo [<lit "bar"> <lit #"00"> <arr [<lit 0> <lit 1> <lit 2.0>]> <dict {maybe: <lit #t>}> <_>]>"""
if pr.embedded: drop()
else:
case pr.kind
of pkBoolean:
AnyAtom(orKind: AnyAtomKind.`bool`, bool: pr.bool).toPattern
of pkFloat:
AnyAtom(orKind: AnyAtomKind.`float`, float: pr.float).toPattern
of pkDouble:
AnyAtom(orKind: AnyAtomKind.`double`, double: pr.double).toPattern
of pkRegister:
AnyAtom(orKind: AnyAtomKind.`int`, int: pr.register).toPattern
of pkString:
AnyAtom(orKind: AnyAtomKind.`string`, string: pr.string).toPattern
of pkByteString:
AnyAtom(orKind: AnyAtomKind.`bytes`, bytes: pr.bytes).toPattern
of pkSymbol:
AnyAtom(orKind: AnyAtomKind.`symbol`, symbol: pr.symbol).toPattern
of pkRecord:
if (pr.isRecord("_") and pr.arity == 0) or (pr.isRecord("bind") and pr.arity == 1):
drop()
else:
DCompoundRec(
label: pr.label,
fields: map[Value, Pattern](pr.fields, grab)).toPattern
of pkSequence:
DCompoundArr(items: map(pr.sequence, grab)).toPattern
of pkSet:
raiseAssert "cannot construct a pattern over a set literal"
of pkDictionary:
var dict = DCompoundDict()
for key, val in pr.pairs: dict.entries[key] = grab val
dict.toPattern
of pkEmbedded:
# TODO: can patterns be constructed over embedded literals?
case pr.kind
of pkBoolean:
AnyAtom(orKind: AnyAtomKind.`bool`, bool: pr.bool).toPattern
of pkFloat:
AnyAtom(orKind: AnyAtomKind.`double`, double: pr.float).toPattern
of pkRegister:
AnyAtom(orKind: AnyAtomKind.`int`, int: pr.register).toPattern
of pkString:
AnyAtom(orKind: AnyAtomKind.`string`, string: pr.string).toPattern
of pkByteString:
AnyAtom(orKind: AnyAtomKind.`bytes`, bytes: pr.bytes).toPattern
of pkSymbol:
AnyAtom(orKind: AnyAtomKind.`symbol`, symbol: pr.symbol).toPattern
of pkRecord:
if (pr.isRecord("_") and pr.arity == 0) or (pr.isRecord("bind") and pr.arity == 1):
drop()
else:
raise newException(ValueError, "cannot generate a pattern for unhandled Value type")
DCompoundRec(
label: pr.label,
fields: map[Value, Pattern](pr.fields, grab)).toPattern
of pkSequence:
DCompoundArr(items: map(pr.sequence, grab)).toPattern
of pkSet:
raiseAssert "cannot construct a pattern over a set literal"
of pkDictionary:
var dict = DCompoundDict()
for key, val in pr.pairs: dict.entries[key] = grab val
dict.toPattern
of pkEmbedded:
if pr.embeddedRef.isNil: drop()
else:
AnyAtom(orKind: AnyAtomKind.`embedded`, embedded: pr.embeddedRef).toPattern
else:
raise newException(ValueError, "cannot generate a pattern for unhandled Value type")
proc grab*[T](x: T): Pattern =
## Construct a `Pattern` from value of type `T`.
@ -114,33 +111,6 @@ proc grab*[T](x: T): Pattern =
$grab([0, 1, 2, 3]) == "<arr [<lit 0> <lit 1> <lit 2> <lit 3>]>"
grab(x.toPreserves)
proc patternOfType(typ: static typedesc; `bind`: static bool): Pattern =
when typ is ref:
patternOfType(pointerBase(typ), `bind`)
elif typ.hasPreservesRecordPragma:
var rec = DCompoundRec(label: typ.recordLabel.toSymbol)
for _, f in fieldPairs(default typ):
add(rec.fields, patternOfType(typeof f, `bind`))
result = rec.toPattern
elif typ.hasPreservesDictionaryPragma:
var dict = DCompoundDict()
for key, val in fieldPairs(default typ):
dict.entries[key.toSymbol] = patternOfType(typeof val, `bind`)
dict.toPattern
elif typ is tuple:
var arr = DCompoundArr()
for _, f in fieldPairs(default typ):
add(arr.items, patternOfType(typeof f, `bind`))
arr.toPattern
elif typ is array:
var arr = DCompoundArr()
arr.items.setLen(len(typ))
for e in arr.items.mitems: e = grab()
arr.toPattern
else:
if `bind`: grab()
else: drop()
proc grabType*(typ: static typedesc): Pattern =
## Derive a `Pattern` from type `typ`.
## This works for `tuple` and `object` types but in the
@ -162,16 +132,61 @@ proc grabType*(typ: static typedesc): Pattern =
"<rec rect [<arr [<bind <_>> <bind <_>>]> <arr [<bind <_>> <bind <_>>]>]>"
$(grabType ColoredRect) ==
"<dict {color: <bind <_>> rect: <rec rect [<arr [<bind <_>> <bind <_>>]> <arr [<bind <_>> <bind <_>>]>]>}>"
patternOfType(typ, true)
proc dropType*(typ: static typedesc): Pattern =
## Derive a `Pattern` from type `typ` without any bindings.
patternOfType(typ, false)
when typ is ref:
grabType(pointerBase(typ))
elif typ.hasPreservesRecordPragma:
var rec = DCompoundRec(label: typ.recordLabel.toSymbol)
for _, f in fieldPairs(default typ):
add(rec.fields, grabType(typeof f))
result = rec.toPattern
elif typ.hasPreservesDictionaryPragma:
var dict = DCompoundDict()
for key, val in fieldPairs(default typ):
dict.entries[key.toSymbol] = grabType(typeof val)
dict.toPattern
elif typ is tuple:
var arr = DCompoundArr()
for _, f in fieldPairs(default typ):
add(arr.items, grabType(typeof f))
arr.toPattern
elif typ is array:
var arr = DCompoundArr()
arr.items.setLen(len(typ))
for e in arr.items.mitems: e = grab()
arr.toPattern
else:
grab()
proc fieldCount(T: typedesc): int =
for _, _ in fieldPairs(default T):
inc result
proc dropType*(typ: static typedesc): Pattern =
## Derive a `Pattern` from type `typ` without any bindings.
when typ is ref:
dropType(pointerBase(typ))
elif typ.hasPreservesRecordPragma:
var rec = DCompoundRec(label: typ.recordLabel.toSymbol)
rec.fields.setLen(fieldCount typ)
for i, _ in rec.fields:
rec.fields[i] = drop()
result = rec.toPattern
elif typ.hasPreservesDictionaryPragma:
DCompoundDict().toPattern
elif typ is tuple:
var arr = DCompoundArr()
arr.items.setLen(len typ)
for i, _ in arr.items:
arr.items[i] = drop()
arr.toPattern
elif typ is array:
var arr = DCompoundArr()
arr.items.setLen(len(typ))
for e in arr.items.mitems: e = drop()
arr.toPattern
else:
drop()
proc lookup(bindings: openArray[(int, Pattern)]; i: int): Pattern =
for (j, b) in bindings:
if i == j: return b

View File

@ -6,6 +6,7 @@ modules += http.nim
modules += noise.nim
modules += protocol.nim
modules += service.nim
modules += stdenv.nim
modules += stream.nim
modules += sturdy.nim
modules += tcp.nim

View File

@ -4,17 +4,14 @@ import
type
AnyAtomKind* {.pure.} = enum
`bool`, `float`, `double`, `int`, `string`, `bytes`, `symbol`, `embedded`
`bool`, `double`, `int`, `string`, `bytes`, `symbol`, `embedded`
`AnyAtom`* {.preservesOr.} = object
case orKind*: AnyAtomKind
of AnyAtomKind.`bool`:
`bool`*: bool
of AnyAtomKind.`float`:
`float`*: float32
of AnyAtomKind.`double`:
`double`*: float64
`double`*: float
of AnyAtomKind.`int`:
`int`*: BiggestInt

View File

@ -8,18 +8,18 @@ type
`detail`*: Value
Turn* = seq[TurnEvent]
Message* {.preservesRecord: "message".} = object
Message* {.preservesRecord: "M".} = object
`body`*: Assertion
Retract* {.preservesRecord: "retract".} = object
Retract* {.preservesRecord: "R".} = object
`handle`*: Handle
Assert* {.preservesRecord: "assert".} = object
Assert* {.preservesRecord: "A".} = object
`assertion`*: Assertion
`handle`*: Handle
Extension* = Value
Sync* {.preservesRecord: "sync".} = object
Sync* {.preservesRecord: "S".} = object
`peer`* {.preservesEmbedded.}: Value
TurnEvent* {.preservesTuple.} = object

View File

@ -0,0 +1,40 @@
import
preserves, sturdy, gatekeeper
type
StandardTransportKind* {.pure.} = enum
`wsUrl`, `other`
`StandardTransport`* {.preservesOr.} = object
case orKind*: StandardTransportKind
of StandardTransportKind.`wsUrl`:
`wsurl`*: string
of StandardTransportKind.`other`:
`other`*: Value
StandardRouteKind* {.pure.} = enum
`standard`, `general`
StandardRouteStandard* {.preservesTuple.} = object
`transports`*: seq[StandardTransport]
`key`*: seq[byte]
`service`*: Value
`sig`*: seq[byte]
`oid`*: Value
`caveats`* {.preservesTupleTail.}: seq[sturdy.Caveat]
`StandardRoute`* {.preservesOr.} = object
case orKind*: StandardRouteKind
of StandardRouteKind.`standard`:
`standard`*: StandardRouteStandard
of StandardRouteKind.`general`:
`general`*: gatekeeper.Route
proc `$`*(x: StandardTransport | StandardRoute): string =
`$`(toPreserves(x))
proc encode*(x: StandardTransport | StandardRoute): seq[byte] =
encode(toPreserves(x))

View File

@ -107,8 +107,7 @@ type
SturdyPathStepDetail* = Parameters
`PAtom`* {.preservesOr, pure.} = enum
`Boolean`, `Float`, `Double`, `SignedInteger`, `String`, `ByteString`,
`Symbol`
`Boolean`, `Double`, `SignedInteger`, `String`, `ByteString`, `Symbol`
PDiscard* {.preservesRecord: "_".} = object
TemplateKind* {.pure.} = enum

View File

@ -5,17 +5,17 @@ import
type
TimerExpired* {.preservesRecord: "timer-expired".} = object
`label`*: Value
`seconds`*: float64
`seconds`*: float
SetTimer* {.preservesRecord: "set-timer".} = object
`label`*: Value
`seconds`*: float64
`seconds`*: float
`kind`*: TimerKind
`TimerKind`* {.preservesOr, pure.} = enum
`relative`, `absolute`, `clear`
LaterThan* {.preservesRecord: "later-than".} = object
`seconds`*: float64
`seconds`*: float
proc `$`*(x: TimerExpired | SetTimer | LaterThan): string =
`$`(toPreserves(x))

View File

@ -86,11 +86,11 @@ type
`reason`*: LinkedTaskReleaseReason
TurnCausePeriodicActivation* {.preservesRecord: "periodic-activation".} = object
`period`*: float64
`period`*: float
TurnCauseDelay* {.preservesRecord: "delay".} = object
`causingTurn`*: TurnId
`amount`*: float64
`amount`*: float
TurnCauseExternal* {.preservesRecord: "external".} = object
`description`*: Value
@ -170,7 +170,7 @@ type
TraceEntry* {.preservesRecord: "trace".} = object
`timestamp`*: float64
`timestamp`*: float
`actor`*: ActorId
`item`*: ActorActivation

View File

@ -27,11 +27,10 @@ type
Turn = syndicate.Turn
Handle = actors.Handle
type
PacketHandler = proc (buf: seq[byte]) {.gcsafe.}
RelaySetup = proc (turn: var Turn; relay: Relay) {.gcsafe.}
PacketWriter = proc (turn: var Turn; buf: seq[byte]) {.closure, gcsafe.}
RelaySetup = proc (turn: var Turn; relay: Relay) {.closure, gcsafe.}
Relay* = ref object of RootObj
Relay* = ref object
facet: Facet
inboundAssertions: Table[Handle,
tuple[localHandle: Handle, imported: seq[WireSymbol]]]
@ -40,9 +39,9 @@ type
imported: Membrane
nextLocalOid: Oid
pendingTurn: protocol.Turn
packetSender: PacketHandler
wireBuf: BufferedDecoder
untrusted: bool
packetWriter: PacketWriter
peer: Cap
SyncPeerEntity = ref object of Entity
relay: Relay
@ -108,19 +107,18 @@ proc deregister(relay: Relay; h: Handle) =
if relay.outboundAssertions.pop(h, outbound):
for e in outbound: releaseCapOut(relay, e)
proc send(r: Relay; turn: var Turn; rOid: protocol.Oid; m: Event) =
if r.pendingTurn.len == 0:
proc send(relay: Relay; turn: var Turn; rOid: protocol.Oid; m: Event) =
if relay.pendingTurn.len == 0:
# If the pending queue is empty then schedule a packet
# to be sent after pending I/O is processed.
callSoon do ():
r.facet.run do (turn: var Turn):
relay.facet.run do (turn: var Turn):
var pkt = Packet(
orKind: PacketKind.Turn,
turn: move r.pendingTurn)
turn: move relay.pendingTurn)
trace "C: ", pkt
assert(not r.packetSender.isNil, "missing packetSender proc")
r.packetSender(encode pkt)
r.pendingTurn.add TurnEvent(oid: rOid, event: m)
relay.packetWriter(turn, encode pkt)
relay.pendingTurn.add TurnEvent(oid: rOid, event: m)
proc send(re: RelayEntity; turn: var Turn; ev: Event) =
send(re.relay, turn, protocol.Oid re.oid, ev)
@ -252,35 +250,28 @@ proc dispatch(relay: Relay; v: Value) {.gcsafe.} =
when defined(posix):
stderr.writeLine("discarding undecoded packet ", v)
proc dispatch(relay: Relay; buf: seq[byte]) =
proc recv(relay: Relay; buf: seq[byte]) =
feed(relay.wireBuf, buf)
var pr = decode(relay.wireBuf)
if pr.isSome: dispatch(relay, get pr)
if pr.isSome: dispatch(relay, pr.get)
type
RelayOptions* = object of RootObj
untrusted*: bool
packetWriter*: PacketWriter
RelayActorOptions* = object of RelayOptions
initialOid*: Option[Oid]
initialCap*: Cap
nextLocalOid*: Option[Oid]
proc newRelay(turn: var Turn; opts: RelayOptions; setup: RelaySetup): Relay =
result = Relay(
facet: turn.facet,
wireBuf: newBufferedDecoder(0),
untrusted: opts.untrusted)
discard result.facet.preventInertCheck()
setup(turn, result)
proc transportConnectionResolve(addrAss: Assertion; ds: Cap): gatekeeper.TransportConnection =
result.`addr` = addrAss
result.resolved = Resolved(orKind: ResolvedKind.accepted)
result.resolved.accepted.responderSession = ds
proc spawnRelay*(name: string; turn: var Turn; ds: Cap; addrAss: Assertion; opts: RelayActorOptions; setup: RelaySetup) =
discard spawn(name, turn) do (turn: var Turn):
let relay = newRelay(turn, opts, setup)
proc spawnRelay(name: string; turn: var Turn; opts: RelayActorOptions; setup: RelaySetup) =
spawn(name, turn) do (turn: var Turn):
let relay = Relay(
facet: turn.facet,
packetWriter: opts.packetWriter,
wireBuf: newBufferedDecoder(0),
)
discard relay.facet.preventInertCheck()
if not opts.initialCap.isNil:
var exported: seq[WireSymbol]
discard rewriteCapOut(relay, opts.initialCap, exported)
@ -288,132 +279,241 @@ proc spawnRelay*(name: string; turn: var Turn; ds: Cap; addrAss: Assertion; opts
relay.nextLocalOid =
if oid == 0.Oid: 1.Oid
else: oid
assert opts.initialOid.isSome
if opts.initialOid.isSome:
var
imported: seq[WireSymbol]
wr = WireRef(
orKind: WireRefKind.mine,
mine: WireRefMine(oid: opts.initialOid.get))
res = rewriteCapIn(relay, turn.facet, wr, imported)
discard publish(turn, ds, transportConnectionResolve(addrAss, res))
else:
discard publish(turn, ds, transportConnectionResolve(addrAss, ds))
relay.peer = rewriteCapIn(relay, turn.facet, wr, imported)
assert not relay.peer.isNil
setup(turn, relay)
proc rejected(detail: Value): Resolved =
result = Resolved(orKind: ResolvedKind.Rejected)
result.rejected.detail = detail
proc accepted(cap: Cap): Resolved =
result = Resolved(orKind: ResolvedKind.accepted)
result.accepted.responderSession = cap
when defined(posix):
import std/asyncnet
from std/nativesockets import AF_INET, AF_UNIX, IPPROTO_TCP, SOCK_STREAM, Protocol
type ShutdownEntity* = ref object of Entity
method retract(e: ShutdownEntity; turn: var Turn; h: Handle) =
stopActor(turn)
type ConnectProc* = proc (turn: var Turn; ds: Cap) {.gcsafe.}
export Tcp
when defined(posix):
import std/asyncfile
export Unix
proc newStdioTunnel(facet: Facet; receiver: PacketHandler): PacketHandler =
let asyncStdin = openAsync("/dev/stdin") # this is universal now?
close(stdin)
const readSize = 0x2000
proc readCb(fut: Future[string]) {.gcsafe.} =
if fut.failed: terminate(facet, fut.error)
else:
receiver(cast[seq[byte]](fut.read))
asyncStdin.read(readSize).addCallback(readCb)
asyncStdin.read(readSize).addCallback(readCb)
proc sender(buf: seq[byte]) =
try:
if writeBytes(stdout, buf, 0, buf.len) != buf.len:
raise newException(IOError, "failed to write Preserves to stdout")
flushFile(stdout)
except CatchableError as err:
terminate(facet, err)
sender
type StdioControlEntity = ref object of Entity
stdin: AsyncFile
method message(entity: StdioControlEntity; turn: var Turn; ass: AssertionRef) =
if ass.value.preservesTo(ForceDisconnect).isSome:
close(entity.stdin)
close(stdout)
proc connectTransport(turn: var Turn; ds: Cap; ta: transportAddress.Stdio) =
## Connect to an external dataspace over stdio.
proc stdoutWriter(turn: var Turn; buf: seq[byte]) =
## Blocking write to stdout.
let n = writeBytes(stdout, buf, 0, buf.len)
flushFile(stdout)
if n != buf.len:
stopActor(turn)
var opts = RelayActorOptions(
packetWriter: stdoutWriter,
initialCap: ds,
initialOid: 0.Oid.some,
)
spawnRelay("stdio", turn, opts) do (turn: var Turn; relay: Relay):
let
facet = turn.facet
asyncStdin = openAsync("/dev/stdin") # this is universal now?
publish(turn, ds, TransportConnection(
`addr`: ta.toPreserves,
control: StdioControlEntity(stdin: asyncStdin).newCap(turn),
resolved: relay.peer.accepted,
))
const stdinReadSize = 0x2000
proc readCb(pktFut: Future[string]) {.gcsafe.} =
if not pktFut.failed:
var buf = pktFut.read
if buf.len == 0:
run(facet) do (turn: var Turn): stopActor(turn)
else:
relay.recv(cast[seq[byte]](buf))
asyncStdin.read(stdinReadSize).addCallback(readCb)
asyncStdin.read(stdinReadSize).addCallback(readCb)
proc connectStdio*(turn: var Turn; ds: Cap) =
## Connect to an external dataspace over stdin and stdout.
var opts = RelayActorOptions(
initialCap: ds,
initialOid: 0.Oid.some)
spawnRelay("stdio", turn, ds, Stdio().toPreserves, opts) do (turn: var Turn; relay: Relay):
proc receiver(buf: seq[byte]) = dispatch(relay, buf)
relay.packetSender = newStdioTunnel(turn.facet, receiver)
connectTransport(turn, ds, transportAddress.Stdio())
proc newTunnel(facet: Facet; receiver: PacketHandler; socket: AsyncSocket): PacketHandler =
const recvSize = 0x2000
proc recvCb(fut: Future[string]) {.gcsafe.} =
if fut.failed: terminate(facet, fut.error)
else:
receiver(cast[seq[byte]](fut.read))
if not socket.isClosed:
socket.recv(recvSize).addCallback(recvCb)
socket.recv(recvSize).addCallback(recvCb)
proc sender(buf: seq[byte]) =
asyncCheck(facet, socket.send(cast[string](buf)))
sender
import std/asyncnet
from std/nativesockets import AF_INET, AF_UNIX, IPPROTO_TCP, SOCK_STREAM, Protocol
proc connect*(turn: var Turn; ds: Cap; route: Route; addrAss: Assertion; socket: AsyncSocket; steps: seq[Value]) =
## Relay a dataspace over an open `AsyncSocket`.
var shutdownCap: Cap
let
reenable = turn.facet.preventInertCheck()
connectionClosedCap = newCap(turn, ShutdownEntity())
discard bootActor("socket") do (turn: var Turn):
var ops = RelayActorOptions(
initialOid: 0.Oid.some)
spawnRelay("socket", turn, ds, addrAss, ops) do (turn: var Turn; relay: Relay):
let facet = turn.facet
proc receiver(buf: seq[byte]) = dispatch(relay, buf)
relay.packetSender = newTunnel(turn.facet, receiver, socket)
turn.facet.actor.atExit do (turn: var Turn): close(socket)
discard publish(turn, connectionClosedCap, true)
shutdownCap = newCap(turn, ShutdownEntity())
onPublish(turn, ds, TransportConnection ?: {0: ?addrAss, 2: ?:Rejected}) do (detail: Value):
raise newException(IOError, $detail)
onPublish(turn, ds, TransportConnection ?: {0: ?addrAss, 2: ?:ResolvedAccepted}) do (gatekeeper: Cap):
run(gatekeeper.relay) do (turn: var Turn):
reenable()
discard publish(turn, shutdownCap, true)
proc duringCallback(turn: var Turn; ass: Assertion; h: Handle): TurnAction =
let facet = inFacet(turn) do (turn: var Turn):
let o = ass.preservesTo Resolved; if o.isSome:
discard publish(turn, ds, ResolvePath(
route: route, `addr`: addrAss, resolved: o.get))
proc action(turn: var Turn) =
stop(turn, facet)
result = action
var resolve = Resolve(
step: steps[0],
observer: newCap(turn, during(duringCallback)),
type SocketControlEntity = ref object of Entity
socket: AsyncSocket
method message(entity: SocketControlEntity; turn: var Turn; ass: AssertionRef) =
if ass.value.preservesTo(ForceDisconnect).isSome:
close(entity.socket)
type ShutdownEntity* = ref object of Entity
method retract(e: ShutdownEntity; turn: var Turn; h: Handle) =
stopActor(turn)
proc connect(turn: var Turn; ds: Cap; transAddr: Value; socket: AsyncSocket) =
proc socketWriter(turn: var Turn; buf: seq[byte]) =
asyncCheck(turn, socket.send(cast[string](buf)))
var ops = RelayActorOptions(
packetWriter: socketWriter,
initialOid: 0.Oid.some,
)
spawnRelay("socket", turn, ops) do (turn: var Turn; relay: Relay):
let facet = turn.facet
facet.actor.atExit do (turn: var Turn): close(socket)
publish(turn, ds, TransportConnection(
`addr`: transAddr,
control: SocketControlEntity(socket: socket).newCap(turn),
resolved: relay.peer.accepted,
))
const recvSize = 0x4000
proc recvCb(pktFut: Future[string]) {.gcsafe.} =
if pktFut.failed or pktFut.read.len == 0:
run(facet) do (turn: var Turn): stopActor(turn)
else:
relay.recv(cast[seq[byte]](pktFut.read))
if not socket.isClosed:
socket.recv(recvSize).addCallback(recvCb)
socket.recv(recvSize).addCallback(recvCb)
proc connect(turn: var Turn; ds: Cap; ta: Value; socket: AsyncSocket; fut: Future[void]) =
let facet = turn.facet
fut.addCallback do ():
run(facet) do (turn: var Turn):
if fut.failed:
var ass = TransportConnection(
`addr`: ta,
resolved: Resolved(orKind: ResolvedKind.Rejected),
)
discard publish(turn, gatekeeper, resolve)
ass.resolved.rejected.detail = embed fut.error
publish(turn, ds, ass)
else:
connect(turn, ds, ta, socket)
proc connect*(turn: var Turn; ds: Cap; route: Route; transport: Tcp; steps: seq[Value]) =
## Relay a dataspace over TCP.
let socket = newAsyncSocket(
domain = AF_INET,
sockType = SOCK_STREAM,
protocol = IPPROTO_TCP,
buffered = false)
let fut = connect(socket, transport.host, Port transport.port)
addCallback(fut, turn) do (turn: var Turn):
connect(turn, ds, route, transport.toPreserves, socket, steps)
proc connectTransport(turn: var Turn; ds: Cap; ta: transportAddress.Tcp) =
let
facet = turn.facet
socket = newAsyncSocket(
domain = AF_INET,
sockType = SOCK_STREAM,
protocol = IPPROTO_TCP,
buffered = false,
)
connect(turn, ds, ta.toPreserves, socket, connect(socket, ta.host, Port ta.port))
proc connect*(turn: var Turn; ds: Cap; route: Route; transport: Unix; steps: seq[Value]) =
proc connectTransport(turn: var Turn; ds: Cap; ta: transportAddress.Unix) =
## Relay a dataspace over a UNIX socket.
let socket = newAsyncSocket(
domain = AF_UNIX,
sockType = SOCK_STREAM,
protocol = cast[Protocol](0),
buffered = false)
let fut = connectUnix(socket, transport.path)
addCallback(fut, turn) do (turn: var Turn):
connect(turn, ds, route, transport.toPreserves, socket, steps)
connect(turn, ds, ta.toPreserves, socket, connectUnix(socket, ta.path))
proc walk(turn: var Turn; ds, origin: Cap; route: Route; transOff, stepOff: int) {.gcsafe.} =
if stepOff < route.pathSteps.len:
let
step = route.pathSteps[stepOff]
rejectPat = ResolvedPathStep?:{
0: ?(origin.embed), 1: ?step, 2: ?:Rejected}
acceptPat = ResolvedPathStep?:{
0: ?(origin.embed), 1: ?step, 2: ?:ResolvedAccepted}
onPublish(turn, ds, rejectPat) do (detail: Value):
publish(turn, ds, ResolvePath(
route: route,
`addr`: route.transports[transOff],
resolved: detail.rejected,
))
during(turn, ds, acceptPat) do (next: Cap):
walk(turn, ds, next, route, transOff, stepOff.succ)
else:
publish(turn, ds, ResolvePath(
route: route,
`addr`: route.transports[transOff],
resolved: origin.accepted,
))
proc connectRoute(turn: var Turn; ds: Cap; route: Route; transOff: int) =
let rejectPat = TransportConnection ?: {
0: ?route.transports[transOff],
2: ?:Rejected,
}
during(turn, ds, rejectPat) do (detail: Value):
publish(turn, ds, ResolvePath(
route: route,
`addr`: route.transports[transOff],
resolved: detail.rejected,
))
let acceptPat = TransportConnection?:{
0: ?route.transports[transOff],
2: ?:ResolvedAccepted,
}
onPublish(turn, ds, acceptPat) do (origin: Cap):
walk(turn, ds, origin, route, transOff, 0)
type StepCallback = proc (turn: var Turn; step: Value; origin, next: Cap) {.gcsafe.}
proc spawnStepResolver(turn: var Turn; ds: Cap; stepType: Value; cb: StepCallback) =
spawn($stepType & "-step", turn) do (turn: var Turn):
let stepPat = grabRecord(stepType, grab())
let pat = ?Observe(pattern: ResolvedPathStep?:{1: stepPat}) ?? {0: grabLit(), 1: grab()}
during(turn, ds, pat) do (origin: Cap; stepDetail: Literal[Value]):
let step = toRecord(stepType, stepDetail.value)
proc duringCallback(turn: var Turn; ass: Value; h: Handle): TurnAction =
var res = ass.preservesTo Resolved
if res.isSome:
if res.get.orKind == ResolvedKind.accepted and
res.get.accepted.responderSession of Cap:
cb(turn, step, origin, res.get.accepted.responderSession.Cap)
else:
publish(turn, ds, ResolvedPathStep(
origin: origin, pathStep: step, resolved: res.get))
proc action(turn: var Turn) =
stop(turn)
result = action
publish(turn, origin, Resolve(
step: step, observer: newCap(turn, during(duringCallback))))
proc spawnRelays*(turn: var Turn; ds: Cap) =
## Spawn actors that manage routes and appeasing gatekeepers.
spawn("transport-connector", turn) do (turn: var Turn):
let pat = ?Observe(pattern: !TransportConnection) ?? { 0: grab() }
# Use a generic pattern and type matching
# in the during handler because it is easy.
let stdioPat = ?Observe(pattern: TransportConnection?:{0: ?:Stdio})
during(turn, ds, stdioPat) do:
connectTransport(turn, ds, Stdio())
# TODO: tcp pattern
during(turn, ds, pat) do (ta: Literal[transportAddress.Tcp]):
connectTransport(turn, ds, ta.value)
# TODO: unix pattern
during(turn, ds, pat) do (ta: Literal[transportAddress.Unix]):
connectTransport(turn, ds, ta.value)
spawn("path-resolver", turn) do (turn: var Turn):
let pat = ?Observe(pattern: !ResolvePath) ?? {0: grab()}
during(turn, ds, pat) do (route: Literal[Route]):
for i, transAddr in route.value.transports:
connectRoute(turn, ds, route.value, i)
spawnStepResolver(turn, ds, "ref".toSymbol) do (
turn: var Turn, step: Value, origin: Cap, next: Cap):
publish(turn, ds, ResolvedPathStep(
origin: origin, pathStep: step, resolved: next.accepted))
type BootProc* = proc (turn: var Turn; ds: Cap) {.gcsafe.}
@ -429,21 +529,7 @@ proc envRoute*: Route =
raise newException(ValueError, "failed to parse $SYNDICATE_ROUTE " & $pr)
proc resolve*(turn: var Turn; ds: Cap; route: Route; bootProc: BootProc) =
var
unix: Unix
tcp: Tcp
stdio: Stdio
doAssert(route.transports.len == 1, "only a single transport supported for routes")
if unix.fromPreserves route.transports[0]:
connect(turn, ds, route, unix, route.pathSteps)
elif tcp.fromPreserves route.transports[0]:
connect(turn, ds, route, tcp, route.pathSteps)
elif stdio.fromPreserves route.transports[0]:
doAssert(route.pathSteps.len == 0, "route steps not available over stdio")
connectStdio(turn, ds)
bootProc(turn, ds)
else:
raise newException(ValueError, "unsupported route")
during(turn, ds, ResolvePath ?: {0: ?route, 3: ?:ResolvedAccepted}) do (dst: Cap):
bootProc(turn, dst)
during(turn, ds, ResolvePath ?: { 0: ?route, 3: ?:ResolvedAccepted}) do (dest: Cap):
bootProc(turn, dest)
# TODO: define a runActor that comes preloaded with relaying

View File

@ -1,6 +1,6 @@
# Package
version = "20240114"
version = "20240208"
author = "Emery Hemingway"
description = "Syndicated actors for conversational concurrency"
license = "Unlicense"
@ -9,4 +9,4 @@ srcDir = "src"
# Dependencies
requires "https://github.com/ehmry/hashlib.git#f9455d4be988e14e3dc7933eb7cc7d7c4820b7ac", "nim >= 2.0.0", "https://git.syndicate-lang.org/ehmry/preserves-nim.git >= 20240116"
requires "https://github.com/ehmry/hashlib.git#f9455d4be988e14e3dc7933eb7cc7d7c4820b7ac", "nim >= 2.0.0", "https://git.syndicate-lang.org/ehmry/preserves-nim.git >= 20240208"

View File

@ -1,92 +0,0 @@
#!/usr/bin/env spry
title = "Simple Chat Demo"
# Initialize libui
uiInit
username = "user"
menu = newMenu "Menu"
# menu addItem: "Username" onClicked: [
# dialog = newWindow: "Username" width: 200 height: 40 hasBar: false
# entry = newEntryText: username onChanged: []
# quitit = newButton: "Quit" onClicked: [
# destroy dialog
# true
# ]
# layout = newHorizontalBox
# #layout add: entry stretch: true
# layout.add: quitit stretch: false
# group = newGroup "Username"
# group setChild: layout
# dialog setChild: quitit
# dialog show
# ]
menu menuAppendAboutItem
menu addQuitItemShouldClose: [
win destroy
uiQuit
true
]
# Create a new Window
win = newWindow: title width: 640 height: 400 hasBar: true
win margined: true
# create text boxes
scrollback = newMultilineEntryText
scrollback readonly: true
sendEntry = newMultilineEntryText
# create layouts
layout = newVerticalBox
sendBox = newHorizontalBox
# Some buttons and their handlers
sendButton = newButton: "Send" onClicked: [
msg = (sendEntry text)
msg != "" then: [
scrollback addText: (msg, "\x0a")
sendEntry text: ""
]
]
# Group
group = newGroup "Workspace"
group margined: false
group setChild: layout
sendBox add: sendEntry stretch: true
sendBox add: sendButton stretch: false
# Put things in the boxes
layout padded: true
layout add: scrollback stretch: true
layout add: sendBox stretch: false
# Add box to window
win setChild: group
# Set initial text
sendEntry text: "compose a message here"
# Close handler
closeHandler = [
win destroy
uiQuit
true
]
# Set a handler on closing window
win onClosingShouldClose: [ true ]
# Show the window
win show
# Enter libui's event loop
uiMain

View File

@ -1,7 +1,7 @@
# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense
import std/[asyncdispatch, asyncfile, os, parseopt]
import std/[asyncdispatch, asyncfile, parseopt]
import preserves, syndicate, syndicate/relays
type
@ -49,6 +49,7 @@ proc main =
stderr.writeLine "--user: unspecified"
else:
runActor("chat") do (turn: var Turn; root: Cap):
spawnRelays(turn, root)
resolve(turn, root, route) do (turn: var Turn; ds: Cap):
chat(turn, ds, username)

View File

@ -82,8 +82,8 @@ suite "protocol":
test "later-than":
let
obsA = parsePreserves"""<Observe <rec later-than [<lit 1704113731.419243>]> #!#f>"""
obsB = parsePreserves"""<Observe <rec Observe [<rec rec [<lit later-than> <arr [<rec lit [<bind <_>>]>]>]> <_>]> #!#f>"""
obsA = parsePreserves"""<Observe <rec later-than [<lit 1704113731.419243>]> #f>"""
obsB = parsePreserves"""<Observe <rec Observe [<rec rec [<lit later-than> <arr [<rec lit [<bind <_>>]>]>]> <_>]> #f>"""
patA = """<rec later-than [<lit 1704113731.419243>]>""".parsePreserves.preservesTo(Pattern).get
patB = """<rec Observe [<rec rec [<lit later-than> <arr [<rec lit [<bind <_>>]>]>]> <_>]>""".parsePreserves.preservesTo(Pattern).get