Compare commits
5 Commits
50a77995bc
...
a052fbb0cb
Author | SHA1 | Date |
---|---|---|
Emery Hemingway | a052fbb0cb | |
Emery Hemingway | 704dd66415 | |
Emery Hemingway | 1c023a6ef8 | |
Emery Hemingway | b6b02bf71a | |
Emery Hemingway | 4ed90b0e02 |
|
@ -1,5 +1,6 @@
|
||||||
{ pkgs ? import <nixpkgs> { } }:
|
let pkgs = import <nixpkgs> { };
|
||||||
pkgs.buildNimPackage {
|
in pkgs.buildNimPackage {
|
||||||
name = "dummy";
|
name = "noiseprotocol";
|
||||||
|
buildInputs = [ pkgs.noise-c ];
|
||||||
lockFile = ./lock.json;
|
lockFile = ./lock.json;
|
||||||
}
|
}
|
||||||
|
|
|
@ -296,11 +296,11 @@ proc publish(turn: var Turn; r: Cap; v: Value; h: Handle) =
|
||||||
act.enqueue.event.detail.assert.handle = h
|
act.enqueue.event.detail.assert.handle = h
|
||||||
turn.desc.actions.add act
|
turn.desc.actions.add act
|
||||||
|
|
||||||
proc publish*(turn: var Turn; r: Cap; a: Value): Handle =
|
proc publish*(turn: var Turn; r: Cap; a: Value): Handle {.discardable.} =
|
||||||
result = turn.facet.nextHandle()
|
result = turn.facet.nextHandle()
|
||||||
publish(turn, r, a, result)
|
publish(turn, r, a, result)
|
||||||
|
|
||||||
proc publish*[T](turn: var Turn; r: Cap; a: T): Handle =
|
proc publish*[T](turn: var Turn; r: Cap; a: T): Handle {.discardable.} =
|
||||||
publish(turn, r, a.toPreserves)
|
publish(turn, r, a.toPreserves)
|
||||||
|
|
||||||
proc retract(turn: var Turn; e: OutboundAssertion) =
|
proc retract(turn: var Turn; e: OutboundAssertion) =
|
||||||
|
@ -459,7 +459,7 @@ proc bootActor*(name: string; bootProc: TurnAction): Actor =
|
||||||
else: result.traceStream = openFileStream(path, fmWrite)
|
else: result.traceStream = openFileStream(path, fmWrite)
|
||||||
run(result, bootProc, initialAssertions)
|
run(result, bootProc, initialAssertions)
|
||||||
|
|
||||||
proc spawn*(name: string; turn: var Turn; bootProc: TurnAction; initialAssertions = initHashSet[Handle]()): Actor =
|
proc spawn*(name: string; turn: var Turn; bootProc: TurnAction; initialAssertions = initHashSet[Handle]()): Actor {.discardable.} =
|
||||||
let actor = newActor(name, turn.facet.actor.handleAllocator)
|
let actor = newActor(name, turn.facet.actor.handleAllocator)
|
||||||
enqueue(turn, turn.facet) do (turn: var Turn):
|
enqueue(turn, turn.facet) do (turn: var Turn):
|
||||||
var newOutBound: Table[Handle, OutboundAssertion]
|
var newOutBound: Table[Handle, OutboundAssertion]
|
||||||
|
@ -594,6 +594,9 @@ proc newCap*(relay: Facet; e: Entity): Cap =
|
||||||
proc newCap*(turn; e: Entity): Cap =
|
proc newCap*(turn; e: Entity): Cap =
|
||||||
Cap(relay: turn.facet, target: e)
|
Cap(relay: turn.facet, target: e)
|
||||||
|
|
||||||
|
proc newCap*(e: Entity; turn): Cap =
|
||||||
|
Cap(relay: turn.facet, target: e)
|
||||||
|
|
||||||
type SyncContinuation {.final.} = ref object of Entity
|
type SyncContinuation {.final.} = ref object of Entity
|
||||||
action: TurnAction
|
action: TurnAction
|
||||||
|
|
||||||
|
|
|
@ -66,43 +66,42 @@ proc grab*(pr: Value): Pattern =
|
||||||
$(grab parsePreserves"""<foo "bar" #"00" [0 1 2.0] {maybe: #t} <_>>""") ==
|
$(grab parsePreserves"""<foo "bar" #"00" [0 1 2.0] {maybe: #t} <_>>""") ==
|
||||||
"""<rec foo [<lit "bar"> <lit #"00"> <arr [<lit 0> <lit 1> <lit 2.0>]> <dict {maybe: <lit #t>}> <_>]>"""
|
"""<rec foo [<lit "bar"> <lit #"00"> <arr [<lit 0> <lit 1> <lit 2.0>]> <dict {maybe: <lit #t>}> <_>]>"""
|
||||||
|
|
||||||
if pr.embedded: drop()
|
case pr.kind
|
||||||
else:
|
of pkBoolean:
|
||||||
case pr.kind
|
AnyAtom(orKind: AnyAtomKind.`bool`, bool: pr.bool).toPattern
|
||||||
of pkBoolean:
|
of pkFloat:
|
||||||
AnyAtom(orKind: AnyAtomKind.`bool`, bool: pr.bool).toPattern
|
AnyAtom(orKind: AnyAtomKind.`float`, float: pr.float).toPattern
|
||||||
of pkFloat:
|
of pkDouble:
|
||||||
AnyAtom(orKind: AnyAtomKind.`float`, float: pr.float).toPattern
|
AnyAtom(orKind: AnyAtomKind.`double`, double: pr.double).toPattern
|
||||||
of pkDouble:
|
of pkRegister:
|
||||||
AnyAtom(orKind: AnyAtomKind.`double`, double: pr.double).toPattern
|
AnyAtom(orKind: AnyAtomKind.`int`, int: pr.register).toPattern
|
||||||
of pkRegister:
|
of pkString:
|
||||||
AnyAtom(orKind: AnyAtomKind.`int`, int: pr.register).toPattern
|
AnyAtom(orKind: AnyAtomKind.`string`, string: pr.string).toPattern
|
||||||
of pkString:
|
of pkByteString:
|
||||||
AnyAtom(orKind: AnyAtomKind.`string`, string: pr.string).toPattern
|
AnyAtom(orKind: AnyAtomKind.`bytes`, bytes: pr.bytes).toPattern
|
||||||
of pkByteString:
|
of pkSymbol:
|
||||||
AnyAtom(orKind: AnyAtomKind.`bytes`, bytes: pr.bytes).toPattern
|
AnyAtom(orKind: AnyAtomKind.`symbol`, symbol: pr.symbol).toPattern
|
||||||
of pkSymbol:
|
of pkRecord:
|
||||||
AnyAtom(orKind: AnyAtomKind.`symbol`, symbol: pr.symbol).toPattern
|
if (pr.isRecord("_") and pr.arity == 0) or (pr.isRecord("bind") and pr.arity == 1):
|
||||||
of pkRecord:
|
|
||||||
if (pr.isRecord("_") and pr.arity == 0) or (pr.isRecord("bind") and pr.arity == 1):
|
|
||||||
drop()
|
|
||||||
else:
|
|
||||||
DCompoundRec(
|
|
||||||
label: pr.label,
|
|
||||||
fields: map[Value, Pattern](pr.fields, grab)).toPattern
|
|
||||||
of pkSequence:
|
|
||||||
DCompoundArr(items: map(pr.sequence, grab)).toPattern
|
|
||||||
of pkSet:
|
|
||||||
raiseAssert "cannot construct a pattern over a set literal"
|
|
||||||
of pkDictionary:
|
|
||||||
var dict = DCompoundDict()
|
|
||||||
for key, val in pr.pairs: dict.entries[key] = grab val
|
|
||||||
dict.toPattern
|
|
||||||
of pkEmbedded:
|
|
||||||
# TODO: can patterns be constructed over embedded literals?
|
|
||||||
drop()
|
drop()
|
||||||
else:
|
else:
|
||||||
raise newException(ValueError, "cannot generate a pattern for unhandled Value type")
|
DCompoundRec(
|
||||||
|
label: pr.label,
|
||||||
|
fields: map[Value, Pattern](pr.fields, grab)).toPattern
|
||||||
|
of pkSequence:
|
||||||
|
DCompoundArr(items: map(pr.sequence, grab)).toPattern
|
||||||
|
of pkSet:
|
||||||
|
raiseAssert "cannot construct a pattern over a set literal"
|
||||||
|
of pkDictionary:
|
||||||
|
var dict = DCompoundDict()
|
||||||
|
for key, val in pr.pairs: dict.entries[key] = grab val
|
||||||
|
dict.toPattern
|
||||||
|
of pkEmbedded:
|
||||||
|
if pr.embeddedRef.isNil: drop()
|
||||||
|
else:
|
||||||
|
AnyAtom(orKind: AnyAtomKind.`embedded`, embedded: pr.embeddedRef).toPattern
|
||||||
|
else:
|
||||||
|
raise newException(ValueError, "cannot generate a pattern for unhandled Value type")
|
||||||
|
|
||||||
proc grab*[T](x: T): Pattern =
|
proc grab*[T](x: T): Pattern =
|
||||||
## Construct a `Pattern` from value of type `T`.
|
## Construct a `Pattern` from value of type `T`.
|
||||||
|
@ -114,33 +113,6 @@ proc grab*[T](x: T): Pattern =
|
||||||
$grab([0, 1, 2, 3]) == "<arr [<lit 0> <lit 1> <lit 2> <lit 3>]>"
|
$grab([0, 1, 2, 3]) == "<arr [<lit 0> <lit 1> <lit 2> <lit 3>]>"
|
||||||
grab(x.toPreserves)
|
grab(x.toPreserves)
|
||||||
|
|
||||||
proc patternOfType(typ: static typedesc; `bind`: static bool): Pattern =
|
|
||||||
when typ is ref:
|
|
||||||
patternOfType(pointerBase(typ), `bind`)
|
|
||||||
elif typ.hasPreservesRecordPragma:
|
|
||||||
var rec = DCompoundRec(label: typ.recordLabel.toSymbol)
|
|
||||||
for _, f in fieldPairs(default typ):
|
|
||||||
add(rec.fields, patternOfType(typeof f, `bind`))
|
|
||||||
result = rec.toPattern
|
|
||||||
elif typ.hasPreservesDictionaryPragma:
|
|
||||||
var dict = DCompoundDict()
|
|
||||||
for key, val in fieldPairs(default typ):
|
|
||||||
dict.entries[key.toSymbol] = patternOfType(typeof val, `bind`)
|
|
||||||
dict.toPattern
|
|
||||||
elif typ is tuple:
|
|
||||||
var arr = DCompoundArr()
|
|
||||||
for _, f in fieldPairs(default typ):
|
|
||||||
add(arr.items, patternOfType(typeof f, `bind`))
|
|
||||||
arr.toPattern
|
|
||||||
elif typ is array:
|
|
||||||
var arr = DCompoundArr()
|
|
||||||
arr.items.setLen(len(typ))
|
|
||||||
for e in arr.items.mitems: e = grab()
|
|
||||||
arr.toPattern
|
|
||||||
else:
|
|
||||||
if `bind`: grab()
|
|
||||||
else: drop()
|
|
||||||
|
|
||||||
proc grabType*(typ: static typedesc): Pattern =
|
proc grabType*(typ: static typedesc): Pattern =
|
||||||
## Derive a `Pattern` from type `typ`.
|
## Derive a `Pattern` from type `typ`.
|
||||||
## This works for `tuple` and `object` types but in the
|
## This works for `tuple` and `object` types but in the
|
||||||
|
@ -162,16 +134,61 @@ proc grabType*(typ: static typedesc): Pattern =
|
||||||
"<rec rect [<arr [<bind <_>> <bind <_>>]> <arr [<bind <_>> <bind <_>>]>]>"
|
"<rec rect [<arr [<bind <_>> <bind <_>>]> <arr [<bind <_>> <bind <_>>]>]>"
|
||||||
$(grabType ColoredRect) ==
|
$(grabType ColoredRect) ==
|
||||||
"<dict {color: <bind <_>> rect: <rec rect [<arr [<bind <_>> <bind <_>>]> <arr [<bind <_>> <bind <_>>]>]>}>"
|
"<dict {color: <bind <_>> rect: <rec rect [<arr [<bind <_>> <bind <_>>]> <arr [<bind <_>> <bind <_>>]>]>}>"
|
||||||
patternOfType(typ, true)
|
when typ is ref:
|
||||||
|
grabType(pointerBase(typ))
|
||||||
proc dropType*(typ: static typedesc): Pattern =
|
elif typ.hasPreservesRecordPragma:
|
||||||
## Derive a `Pattern` from type `typ` without any bindings.
|
var rec = DCompoundRec(label: typ.recordLabel.toSymbol)
|
||||||
patternOfType(typ, false)
|
for _, f in fieldPairs(default typ):
|
||||||
|
add(rec.fields, grabType(typeof f))
|
||||||
|
result = rec.toPattern
|
||||||
|
elif typ.hasPreservesDictionaryPragma:
|
||||||
|
var dict = DCompoundDict()
|
||||||
|
for key, val in fieldPairs(default typ):
|
||||||
|
dict.entries[key.toSymbol] = grabType(typeof val)
|
||||||
|
dict.toPattern
|
||||||
|
elif typ is tuple:
|
||||||
|
var arr = DCompoundArr()
|
||||||
|
for _, f in fieldPairs(default typ):
|
||||||
|
add(arr.items, grabType(typeof f))
|
||||||
|
arr.toPattern
|
||||||
|
elif typ is array:
|
||||||
|
var arr = DCompoundArr()
|
||||||
|
arr.items.setLen(len(typ))
|
||||||
|
for e in arr.items.mitems: e = grab()
|
||||||
|
arr.toPattern
|
||||||
|
else:
|
||||||
|
grab()
|
||||||
|
|
||||||
proc fieldCount(T: typedesc): int =
|
proc fieldCount(T: typedesc): int =
|
||||||
for _, _ in fieldPairs(default T):
|
for _, _ in fieldPairs(default T):
|
||||||
inc result
|
inc result
|
||||||
|
|
||||||
|
proc dropType*(typ: static typedesc): Pattern =
|
||||||
|
## Derive a `Pattern` from type `typ` without any bindings.
|
||||||
|
when typ is ref:
|
||||||
|
dropType(pointerBase(typ))
|
||||||
|
elif typ.hasPreservesRecordPragma:
|
||||||
|
var rec = DCompoundRec(label: typ.recordLabel.toSymbol)
|
||||||
|
rec.fields.setLen(fieldCount typ)
|
||||||
|
for i, _ in rec.fields:
|
||||||
|
rec.fields[i] = drop()
|
||||||
|
result = rec.toPattern
|
||||||
|
elif typ.hasPreservesDictionaryPragma:
|
||||||
|
DCompoundDict().toPattern
|
||||||
|
elif typ is tuple:
|
||||||
|
var arr = DCompoundArr()
|
||||||
|
arr.items.setLen(len typ)
|
||||||
|
for i, _ in arr.items:
|
||||||
|
arr.items[i] = drop()
|
||||||
|
arr.toPattern
|
||||||
|
elif typ is array:
|
||||||
|
var arr = DCompoundArr()
|
||||||
|
arr.items.setLen(len(typ))
|
||||||
|
for e in arr.items.mitems: e = drop()
|
||||||
|
arr.toPattern
|
||||||
|
else:
|
||||||
|
drop()
|
||||||
|
|
||||||
proc lookup(bindings: openArray[(int, Pattern)]; i: int): Pattern =
|
proc lookup(bindings: openArray[(int, Pattern)]; i: int): Pattern =
|
||||||
for (j, b) in bindings:
|
for (j, b) in bindings:
|
||||||
if i == j: return b
|
if i == j: return b
|
||||||
|
|
|
@ -27,11 +27,10 @@ type
|
||||||
Turn = syndicate.Turn
|
Turn = syndicate.Turn
|
||||||
Handle = actors.Handle
|
Handle = actors.Handle
|
||||||
|
|
||||||
type
|
PacketWriter = proc (turn: var Turn; buf: seq[byte]) {.closure, gcsafe.}
|
||||||
PacketHandler = proc (buf: seq[byte]) {.gcsafe.}
|
RelaySetup = proc (turn: var Turn; relay: Relay) {.closure, gcsafe.}
|
||||||
RelaySetup = proc (turn: var Turn; relay: Relay) {.gcsafe.}
|
|
||||||
|
|
||||||
Relay* = ref object of RootObj
|
Relay* = ref object
|
||||||
facet: Facet
|
facet: Facet
|
||||||
inboundAssertions: Table[Handle,
|
inboundAssertions: Table[Handle,
|
||||||
tuple[localHandle: Handle, imported: seq[WireSymbol]]]
|
tuple[localHandle: Handle, imported: seq[WireSymbol]]]
|
||||||
|
@ -40,9 +39,9 @@ type
|
||||||
imported: Membrane
|
imported: Membrane
|
||||||
nextLocalOid: Oid
|
nextLocalOid: Oid
|
||||||
pendingTurn: protocol.Turn
|
pendingTurn: protocol.Turn
|
||||||
packetSender: PacketHandler
|
|
||||||
wireBuf: BufferedDecoder
|
wireBuf: BufferedDecoder
|
||||||
untrusted: bool
|
packetWriter: PacketWriter
|
||||||
|
peer: Cap
|
||||||
|
|
||||||
SyncPeerEntity = ref object of Entity
|
SyncPeerEntity = ref object of Entity
|
||||||
relay: Relay
|
relay: Relay
|
||||||
|
@ -108,19 +107,18 @@ proc deregister(relay: Relay; h: Handle) =
|
||||||
if relay.outboundAssertions.pop(h, outbound):
|
if relay.outboundAssertions.pop(h, outbound):
|
||||||
for e in outbound: releaseCapOut(relay, e)
|
for e in outbound: releaseCapOut(relay, e)
|
||||||
|
|
||||||
proc send(r: Relay; turn: var Turn; rOid: protocol.Oid; m: Event) =
|
proc send(relay: Relay; turn: var Turn; rOid: protocol.Oid; m: Event) =
|
||||||
if r.pendingTurn.len == 0:
|
if relay.pendingTurn.len == 0:
|
||||||
# If the pending queue is empty then schedule a packet
|
# If the pending queue is empty then schedule a packet
|
||||||
# to be sent after pending I/O is processed.
|
# to be sent after pending I/O is processed.
|
||||||
callSoon do ():
|
callSoon do ():
|
||||||
r.facet.run do (turn: var Turn):
|
relay.facet.run do (turn: var Turn):
|
||||||
var pkt = Packet(
|
var pkt = Packet(
|
||||||
orKind: PacketKind.Turn,
|
orKind: PacketKind.Turn,
|
||||||
turn: move r.pendingTurn)
|
turn: move relay.pendingTurn)
|
||||||
trace "C: ", pkt
|
trace "C: ", pkt
|
||||||
assert(not r.packetSender.isNil, "missing packetSender proc")
|
relay.packetWriter(turn, encode pkt)
|
||||||
r.packetSender(encode pkt)
|
relay.pendingTurn.add TurnEvent(oid: rOid, event: m)
|
||||||
r.pendingTurn.add TurnEvent(oid: rOid, event: m)
|
|
||||||
|
|
||||||
proc send(re: RelayEntity; turn: var Turn; ev: Event) =
|
proc send(re: RelayEntity; turn: var Turn; ev: Event) =
|
||||||
send(re.relay, turn, protocol.Oid re.oid, ev)
|
send(re.relay, turn, protocol.Oid re.oid, ev)
|
||||||
|
@ -252,35 +250,28 @@ proc dispatch(relay: Relay; v: Value) {.gcsafe.} =
|
||||||
when defined(posix):
|
when defined(posix):
|
||||||
stderr.writeLine("discarding undecoded packet ", v)
|
stderr.writeLine("discarding undecoded packet ", v)
|
||||||
|
|
||||||
proc dispatch(relay: Relay; buf: seq[byte]) =
|
proc recv(relay: Relay; buf: seq[byte]) =
|
||||||
feed(relay.wireBuf, buf)
|
feed(relay.wireBuf, buf)
|
||||||
var pr = decode(relay.wireBuf)
|
var pr = decode(relay.wireBuf)
|
||||||
if pr.isSome: dispatch(relay, get pr)
|
if pr.isSome: dispatch(relay, pr.get)
|
||||||
|
|
||||||
type
|
type
|
||||||
RelayOptions* = object of RootObj
|
RelayOptions* = object of RootObj
|
||||||
untrusted*: bool
|
packetWriter*: PacketWriter
|
||||||
|
|
||||||
RelayActorOptions* = object of RelayOptions
|
RelayActorOptions* = object of RelayOptions
|
||||||
initialOid*: Option[Oid]
|
initialOid*: Option[Oid]
|
||||||
initialCap*: Cap
|
initialCap*: Cap
|
||||||
nextLocalOid*: Option[Oid]
|
nextLocalOid*: Option[Oid]
|
||||||
|
|
||||||
proc newRelay(turn: var Turn; opts: RelayOptions; setup: RelaySetup): Relay =
|
proc spawnRelay(name: string; turn: var Turn; opts: RelayActorOptions; setup: RelaySetup) =
|
||||||
result = Relay(
|
spawn(name, turn) do (turn: var Turn):
|
||||||
facet: turn.facet,
|
let relay = Relay(
|
||||||
wireBuf: newBufferedDecoder(0),
|
facet: turn.facet,
|
||||||
untrusted: opts.untrusted)
|
packetWriter: opts.packetWriter,
|
||||||
discard result.facet.preventInertCheck()
|
wireBuf: newBufferedDecoder(0),
|
||||||
setup(turn, result)
|
)
|
||||||
|
discard relay.facet.preventInertCheck()
|
||||||
proc transportConnectionResolve(addrAss: Assertion; ds: Cap): gatekeeper.TransportConnection =
|
|
||||||
result.`addr` = addrAss
|
|
||||||
result.resolved = Resolved(orKind: ResolvedKind.accepted)
|
|
||||||
result.resolved.accepted.responderSession = ds
|
|
||||||
|
|
||||||
proc spawnRelay*(name: string; turn: var Turn; ds: Cap; addrAss: Assertion; opts: RelayActorOptions; setup: RelaySetup) =
|
|
||||||
discard spawn(name, turn) do (turn: var Turn):
|
|
||||||
let relay = newRelay(turn, opts, setup)
|
|
||||||
if not opts.initialCap.isNil:
|
if not opts.initialCap.isNil:
|
||||||
var exported: seq[WireSymbol]
|
var exported: seq[WireSymbol]
|
||||||
discard rewriteCapOut(relay, opts.initialCap, exported)
|
discard rewriteCapOut(relay, opts.initialCap, exported)
|
||||||
|
@ -288,132 +279,241 @@ proc spawnRelay*(name: string; turn: var Turn; ds: Cap; addrAss: Assertion; opts
|
||||||
relay.nextLocalOid =
|
relay.nextLocalOid =
|
||||||
if oid == 0.Oid: 1.Oid
|
if oid == 0.Oid: 1.Oid
|
||||||
else: oid
|
else: oid
|
||||||
|
assert opts.initialOid.isSome
|
||||||
if opts.initialOid.isSome:
|
if opts.initialOid.isSome:
|
||||||
var
|
var
|
||||||
imported: seq[WireSymbol]
|
imported: seq[WireSymbol]
|
||||||
wr = WireRef(
|
wr = WireRef(
|
||||||
orKind: WireRefKind.mine,
|
orKind: WireRefKind.mine,
|
||||||
mine: WireRefMine(oid: opts.initialOid.get))
|
mine: WireRefMine(oid: opts.initialOid.get))
|
||||||
res = rewriteCapIn(relay, turn.facet, wr, imported)
|
relay.peer = rewriteCapIn(relay, turn.facet, wr, imported)
|
||||||
discard publish(turn, ds, transportConnectionResolve(addrAss, res))
|
assert not relay.peer.isNil
|
||||||
else:
|
setup(turn, relay)
|
||||||
discard publish(turn, ds, transportConnectionResolve(addrAss, ds))
|
|
||||||
|
proc rejected(detail: Value): Resolved =
|
||||||
|
result = Resolved(orKind: ResolvedKind.Rejected)
|
||||||
|
result.rejected.detail = detail
|
||||||
|
|
||||||
|
proc accepted(cap: Cap): Resolved =
|
||||||
|
result = Resolved(orKind: ResolvedKind.accepted)
|
||||||
|
result.accepted.responderSession = cap
|
||||||
|
|
||||||
when defined(posix):
|
when defined(posix):
|
||||||
import std/asyncnet
|
|
||||||
from std/nativesockets import AF_INET, AF_UNIX, IPPROTO_TCP, SOCK_STREAM, Protocol
|
|
||||||
|
|
||||||
type ShutdownEntity* = ref object of Entity
|
|
||||||
|
|
||||||
method retract(e: ShutdownEntity; turn: var Turn; h: Handle) =
|
|
||||||
stopActor(turn)
|
|
||||||
|
|
||||||
type ConnectProc* = proc (turn: var Turn; ds: Cap) {.gcsafe.}
|
|
||||||
|
|
||||||
export Tcp
|
|
||||||
|
|
||||||
when defined(posix):
|
|
||||||
import std/asyncfile
|
import std/asyncfile
|
||||||
export Unix
|
export Unix
|
||||||
|
|
||||||
proc newStdioTunnel(facet: Facet; receiver: PacketHandler): PacketHandler =
|
type StdioControlEntity = ref object of Entity
|
||||||
let asyncStdin = openAsync("/dev/stdin") # this is universal now?
|
stdin: AsyncFile
|
||||||
close(stdin)
|
|
||||||
const readSize = 0x2000
|
method message(entity: StdioControlEntity; turn: var Turn; ass: AssertionRef) =
|
||||||
proc readCb(fut: Future[string]) {.gcsafe.} =
|
if ass.value.preservesTo(ForceDisconnect).isSome:
|
||||||
if fut.failed: terminate(facet, fut.error)
|
close(entity.stdin)
|
||||||
else:
|
close(stdout)
|
||||||
receiver(cast[seq[byte]](fut.read))
|
|
||||||
asyncStdin.read(readSize).addCallback(readCb)
|
proc connectTransport(turn: var Turn; ds: Cap; ta: transportAddress.Stdio) =
|
||||||
asyncStdin.read(readSize).addCallback(readCb)
|
## Connect to an external dataspace over stdio.
|
||||||
proc sender(buf: seq[byte]) =
|
proc stdoutWriter(turn: var Turn; buf: seq[byte]) =
|
||||||
try:
|
## Blocking write to stdout.
|
||||||
if writeBytes(stdout, buf, 0, buf.len) != buf.len:
|
let n = writeBytes(stdout, buf, 0, buf.len)
|
||||||
raise newException(IOError, "failed to write Preserves to stdout")
|
flushFile(stdout)
|
||||||
flushFile(stdout)
|
if n != buf.len:
|
||||||
except CatchableError as err:
|
stopActor(turn)
|
||||||
terminate(facet, err)
|
var opts = RelayActorOptions(
|
||||||
sender
|
packetWriter: stdoutWriter,
|
||||||
|
initialCap: ds,
|
||||||
|
initialOid: 0.Oid.some,
|
||||||
|
)
|
||||||
|
spawnRelay("stdio", turn, opts) do (turn: var Turn; relay: Relay):
|
||||||
|
let
|
||||||
|
facet = turn.facet
|
||||||
|
asyncStdin = openAsync("/dev/stdin") # this is universal now?
|
||||||
|
publish(turn, ds, TransportConnection(
|
||||||
|
`addr`: ta.toPreserves,
|
||||||
|
control: StdioControlEntity(stdin: asyncStdin).newCap(turn),
|
||||||
|
resolved: relay.peer.accepted,
|
||||||
|
))
|
||||||
|
const stdinReadSize = 0x2000
|
||||||
|
proc readCb(pktFut: Future[string]) {.gcsafe.} =
|
||||||
|
if not pktFut.failed:
|
||||||
|
var buf = pktFut.read
|
||||||
|
if buf.len == 0:
|
||||||
|
run(facet) do (turn: var Turn): stopActor(turn)
|
||||||
|
else:
|
||||||
|
relay.recv(cast[seq[byte]](buf))
|
||||||
|
asyncStdin.read(stdinReadSize).addCallback(readCb)
|
||||||
|
asyncStdin.read(stdinReadSize).addCallback(readCb)
|
||||||
|
|
||||||
proc connectStdio*(turn: var Turn; ds: Cap) =
|
proc connectStdio*(turn: var Turn; ds: Cap) =
|
||||||
## Connect to an external dataspace over stdin and stdout.
|
## Connect to an external dataspace over stdin and stdout.
|
||||||
var opts = RelayActorOptions(
|
connectTransport(turn, ds, transportAddress.Stdio())
|
||||||
initialCap: ds,
|
|
||||||
initialOid: 0.Oid.some)
|
|
||||||
spawnRelay("stdio", turn, ds, Stdio().toPreserves, opts) do (turn: var Turn; relay: Relay):
|
|
||||||
proc receiver(buf: seq[byte]) = dispatch(relay, buf)
|
|
||||||
relay.packetSender = newStdioTunnel(turn.facet, receiver)
|
|
||||||
|
|
||||||
proc newTunnel(facet: Facet; receiver: PacketHandler; socket: AsyncSocket): PacketHandler =
|
import std/asyncnet
|
||||||
const recvSize = 0x2000
|
from std/nativesockets import AF_INET, AF_UNIX, IPPROTO_TCP, SOCK_STREAM, Protocol
|
||||||
proc recvCb(fut: Future[string]) {.gcsafe.} =
|
|
||||||
if fut.failed: terminate(facet, fut.error)
|
|
||||||
else:
|
|
||||||
receiver(cast[seq[byte]](fut.read))
|
|
||||||
if not socket.isClosed:
|
|
||||||
socket.recv(recvSize).addCallback(recvCb)
|
|
||||||
socket.recv(recvSize).addCallback(recvCb)
|
|
||||||
proc sender(buf: seq[byte]) =
|
|
||||||
asyncCheck(facet, socket.send(cast[string](buf)))
|
|
||||||
sender
|
|
||||||
|
|
||||||
proc connect*(turn: var Turn; ds: Cap; route: Route; addrAss: Assertion; socket: AsyncSocket; steps: seq[Value]) =
|
type SocketControlEntity = ref object of Entity
|
||||||
## Relay a dataspace over an open `AsyncSocket`.
|
socket: AsyncSocket
|
||||||
var shutdownCap: Cap
|
|
||||||
let
|
method message(entity: SocketControlEntity; turn: var Turn; ass: AssertionRef) =
|
||||||
reenable = turn.facet.preventInertCheck()
|
if ass.value.preservesTo(ForceDisconnect).isSome:
|
||||||
connectionClosedCap = newCap(turn, ShutdownEntity())
|
close(entity.socket)
|
||||||
discard bootActor("socket") do (turn: var Turn):
|
|
||||||
var ops = RelayActorOptions(
|
type ShutdownEntity* = ref object of Entity
|
||||||
initialOid: 0.Oid.some)
|
method retract(e: ShutdownEntity; turn: var Turn; h: Handle) =
|
||||||
spawnRelay("socket", turn, ds, addrAss, ops) do (turn: var Turn; relay: Relay):
|
stopActor(turn)
|
||||||
let facet = turn.facet
|
|
||||||
proc receiver(buf: seq[byte]) = dispatch(relay, buf)
|
proc connect(turn: var Turn; ds: Cap; transAddr: Value; socket: AsyncSocket) =
|
||||||
relay.packetSender = newTunnel(turn.facet, receiver, socket)
|
proc socketWriter(turn: var Turn; buf: seq[byte]) =
|
||||||
turn.facet.actor.atExit do (turn: var Turn): close(socket)
|
asyncCheck(turn, socket.send(cast[string](buf)))
|
||||||
discard publish(turn, connectionClosedCap, true)
|
var ops = RelayActorOptions(
|
||||||
shutdownCap = newCap(turn, ShutdownEntity())
|
packetWriter: socketWriter,
|
||||||
onPublish(turn, ds, TransportConnection ?: {0: ?addrAss, 2: ?:Rejected}) do (detail: Value):
|
initialOid: 0.Oid.some,
|
||||||
raise newException(IOError, $detail)
|
)
|
||||||
onPublish(turn, ds, TransportConnection ?: {0: ?addrAss, 2: ?:ResolvedAccepted}) do (gatekeeper: Cap):
|
spawnRelay("socket", turn, ops) do (turn: var Turn; relay: Relay):
|
||||||
run(gatekeeper.relay) do (turn: var Turn):
|
let facet = turn.facet
|
||||||
reenable()
|
facet.actor.atExit do (turn: var Turn): close(socket)
|
||||||
discard publish(turn, shutdownCap, true)
|
publish(turn, ds, TransportConnection(
|
||||||
proc duringCallback(turn: var Turn; ass: Assertion; h: Handle): TurnAction =
|
`addr`: transAddr,
|
||||||
let facet = inFacet(turn) do (turn: var Turn):
|
control: SocketControlEntity(socket: socket).newCap(turn),
|
||||||
let o = ass.preservesTo Resolved; if o.isSome:
|
resolved: relay.peer.accepted,
|
||||||
discard publish(turn, ds, ResolvePath(
|
))
|
||||||
route: route, `addr`: addrAss, resolved: o.get))
|
const recvSize = 0x4000
|
||||||
proc action(turn: var Turn) =
|
proc recvCb(pktFut: Future[string]) {.gcsafe.} =
|
||||||
stop(turn, facet)
|
if pktFut.failed or pktFut.read.len == 0:
|
||||||
result = action
|
run(facet) do (turn: var Turn): stopActor(turn)
|
||||||
var resolve = Resolve(
|
else:
|
||||||
step: steps[0],
|
relay.recv(cast[seq[byte]](pktFut.read))
|
||||||
observer: newCap(turn, during(duringCallback)),
|
if not socket.isClosed:
|
||||||
|
socket.recv(recvSize).addCallback(recvCb)
|
||||||
|
socket.recv(recvSize).addCallback(recvCb)
|
||||||
|
|
||||||
|
proc connect(turn: var Turn; ds: Cap; ta: Value; socket: AsyncSocket; fut: Future[void]) =
|
||||||
|
let facet = turn.facet
|
||||||
|
fut.addCallback do ():
|
||||||
|
run(facet) do (turn: var Turn):
|
||||||
|
if fut.failed:
|
||||||
|
var ass = TransportConnection(
|
||||||
|
`addr`: ta,
|
||||||
|
resolved: Resolved(orKind: ResolvedKind.Rejected),
|
||||||
)
|
)
|
||||||
discard publish(turn, gatekeeper, resolve)
|
ass.resolved.rejected.detail = embed fut.error
|
||||||
|
publish(turn, ds, ass)
|
||||||
|
else:
|
||||||
|
connect(turn, ds, ta, socket)
|
||||||
|
|
||||||
proc connect*(turn: var Turn; ds: Cap; route: Route; transport: Tcp; steps: seq[Value]) =
|
proc connectTransport(turn: var Turn; ds: Cap; ta: transportAddress.Tcp) =
|
||||||
## Relay a dataspace over TCP.
|
let
|
||||||
let socket = newAsyncSocket(
|
facet = turn.facet
|
||||||
domain = AF_INET,
|
socket = newAsyncSocket(
|
||||||
sockType = SOCK_STREAM,
|
domain = AF_INET,
|
||||||
protocol = IPPROTO_TCP,
|
sockType = SOCK_STREAM,
|
||||||
buffered = false)
|
protocol = IPPROTO_TCP,
|
||||||
let fut = connect(socket, transport.host, Port transport.port)
|
buffered = false,
|
||||||
addCallback(fut, turn) do (turn: var Turn):
|
)
|
||||||
connect(turn, ds, route, transport.toPreserves, socket, steps)
|
connect(turn, ds, ta.toPreserves, socket, connect(socket, ta.host, Port ta.port))
|
||||||
|
|
||||||
proc connect*(turn: var Turn; ds: Cap; route: Route; transport: Unix; steps: seq[Value]) =
|
proc connectTransport(turn: var Turn; ds: Cap; ta: transportAddress.Unix) =
|
||||||
## Relay a dataspace over a UNIX socket.
|
## Relay a dataspace over a UNIX socket.
|
||||||
let socket = newAsyncSocket(
|
let socket = newAsyncSocket(
|
||||||
domain = AF_UNIX,
|
domain = AF_UNIX,
|
||||||
sockType = SOCK_STREAM,
|
sockType = SOCK_STREAM,
|
||||||
protocol = cast[Protocol](0),
|
protocol = cast[Protocol](0),
|
||||||
buffered = false)
|
buffered = false)
|
||||||
let fut = connectUnix(socket, transport.path)
|
connect(turn, ds, ta.toPreserves, socket, connectUnix(socket, ta.path))
|
||||||
addCallback(fut, turn) do (turn: var Turn):
|
|
||||||
connect(turn, ds, route, transport.toPreserves, socket, steps)
|
proc walk(turn: var Turn; ds, origin: Cap; route: Route; transOff, stepOff: int) {.gcsafe.} =
|
||||||
|
if stepOff < route.pathSteps.len:
|
||||||
|
let
|
||||||
|
step = route.pathSteps[stepOff]
|
||||||
|
rejectPat = ResolvedPathStep?:{
|
||||||
|
0: ?(origin.embed), 1: ?step, 2: ?:Rejected}
|
||||||
|
acceptPat = ResolvedPathStep?:{
|
||||||
|
0: ?(origin.embed), 1: ?step, 2: ?:ResolvedAccepted}
|
||||||
|
onPublish(turn, ds, rejectPat) do (detail: Value):
|
||||||
|
publish(turn, ds, ResolvePath(
|
||||||
|
route: route,
|
||||||
|
`addr`: route.transports[transOff],
|
||||||
|
resolved: detail.rejected,
|
||||||
|
))
|
||||||
|
during(turn, ds, acceptPat) do (next: Cap):
|
||||||
|
walk(turn, ds, next, route, transOff, stepOff.succ)
|
||||||
|
else:
|
||||||
|
publish(turn, ds, ResolvePath(
|
||||||
|
route: route,
|
||||||
|
`addr`: route.transports[transOff],
|
||||||
|
resolved: origin.accepted,
|
||||||
|
))
|
||||||
|
|
||||||
|
proc connectRoute(turn: var Turn; ds: Cap; route: Route; transOff: int) =
|
||||||
|
let rejectPat = TransportConnection ?: {
|
||||||
|
0: ?route.transports[transOff],
|
||||||
|
2: ?:Rejected,
|
||||||
|
}
|
||||||
|
during(turn, ds, rejectPat) do (detail: Value):
|
||||||
|
publish(turn, ds, ResolvePath(
|
||||||
|
route: route,
|
||||||
|
`addr`: route.transports[transOff],
|
||||||
|
resolved: detail.rejected,
|
||||||
|
))
|
||||||
|
let acceptPat = TransportConnection?:{
|
||||||
|
0: ?route.transports[transOff],
|
||||||
|
2: ?:ResolvedAccepted,
|
||||||
|
}
|
||||||
|
onPublish(turn, ds, acceptPat) do (origin: Cap):
|
||||||
|
walk(turn, ds, origin, route, transOff, 0)
|
||||||
|
|
||||||
|
type StepCallback = proc (turn: var Turn; step: Value; origin, next: Cap) {.gcsafe.}
|
||||||
|
|
||||||
|
proc spawnStepResolver(turn: var Turn; ds: Cap; stepType: Value; cb: StepCallback) =
|
||||||
|
spawn($stepType & "-step", turn) do (turn: var Turn):
|
||||||
|
let stepPat = grabRecord(stepType, grab())
|
||||||
|
let pat = ?Observe(pattern: ResolvedPathStep?:{1: stepPat}) ?? {0: grabLit(), 1: grab()}
|
||||||
|
during(turn, ds, pat) do (origin: Cap; stepDetail: Literal[Value]):
|
||||||
|
let step = toRecord(stepType, stepDetail.value)
|
||||||
|
proc duringCallback(turn: var Turn; ass: Value; h: Handle): TurnAction =
|
||||||
|
var res = ass.preservesTo Resolved
|
||||||
|
if res.isSome:
|
||||||
|
if res.get.orKind == ResolvedKind.accepted and
|
||||||
|
res.get.accepted.responderSession of Cap:
|
||||||
|
cb(turn, step, origin, res.get.accepted.responderSession.Cap)
|
||||||
|
else:
|
||||||
|
publish(turn, ds, ResolvedPathStep(
|
||||||
|
origin: origin, pathStep: step, resolved: res.get))
|
||||||
|
proc action(turn: var Turn) =
|
||||||
|
stop(turn)
|
||||||
|
result = action
|
||||||
|
publish(turn, origin, Resolve(
|
||||||
|
step: step, observer: newCap(turn, during(duringCallback))))
|
||||||
|
|
||||||
|
proc spawnRelays*(turn: var Turn; ds: Cap) =
|
||||||
|
## Spawn actors that manage routes and appeasing gatekeepers.
|
||||||
|
spawn("transport-connector", turn) do (turn: var Turn):
|
||||||
|
let pat = ?Observe(pattern: !TransportConnection) ?? { 0: grab() }
|
||||||
|
# Use a generic pattern and type matching
|
||||||
|
# in the during handler because it is easy.
|
||||||
|
|
||||||
|
let stdioPat = ?Observe(pattern: TransportConnection?:{0: ?:Stdio})
|
||||||
|
during(turn, ds, stdioPat) do:
|
||||||
|
connectTransport(turn, ds, Stdio())
|
||||||
|
|
||||||
|
# TODO: tcp pattern
|
||||||
|
during(turn, ds, pat) do (ta: Literal[transportAddress.Tcp]):
|
||||||
|
connectTransport(turn, ds, ta.value)
|
||||||
|
|
||||||
|
# TODO: unix pattern
|
||||||
|
during(turn, ds, pat) do (ta: Literal[transportAddress.Unix]):
|
||||||
|
connectTransport(turn, ds, ta.value)
|
||||||
|
|
||||||
|
spawn("path-resolver", turn) do (turn: var Turn):
|
||||||
|
let pat = ?Observe(pattern: !ResolvePath) ?? {0: grab()}
|
||||||
|
during(turn, ds, pat) do (route: Literal[Route]):
|
||||||
|
for i, transAddr in route.value.transports:
|
||||||
|
connectRoute(turn, ds, route.value, i)
|
||||||
|
|
||||||
|
spawnStepResolver(turn, ds, "ref".toSymbol) do (
|
||||||
|
turn: var Turn, step: Value, origin: Cap, next: Cap):
|
||||||
|
publish(turn, ds, ResolvedPathStep(
|
||||||
|
origin: origin, pathStep: step, resolved: next.accepted))
|
||||||
|
|
||||||
type BootProc* = proc (turn: var Turn; ds: Cap) {.gcsafe.}
|
type BootProc* = proc (turn: var Turn; ds: Cap) {.gcsafe.}
|
||||||
|
|
||||||
|
@ -429,21 +529,7 @@ proc envRoute*: Route =
|
||||||
raise newException(ValueError, "failed to parse $SYNDICATE_ROUTE " & $pr)
|
raise newException(ValueError, "failed to parse $SYNDICATE_ROUTE " & $pr)
|
||||||
|
|
||||||
proc resolve*(turn: var Turn; ds: Cap; route: Route; bootProc: BootProc) =
|
proc resolve*(turn: var Turn; ds: Cap; route: Route; bootProc: BootProc) =
|
||||||
var
|
during(turn, ds, ResolvePath ?: {0: ?route, 3: ?:ResolvedAccepted}) do (dst: Cap):
|
||||||
unix: Unix
|
bootProc(turn, dst)
|
||||||
tcp: Tcp
|
|
||||||
stdio: Stdio
|
|
||||||
doAssert(route.transports.len == 1, "only a single transport supported for routes")
|
|
||||||
if unix.fromPreserves route.transports[0]:
|
|
||||||
connect(turn, ds, route, unix, route.pathSteps)
|
|
||||||
elif tcp.fromPreserves route.transports[0]:
|
|
||||||
connect(turn, ds, route, tcp, route.pathSteps)
|
|
||||||
elif stdio.fromPreserves route.transports[0]:
|
|
||||||
doAssert(route.pathSteps.len == 0, "route steps not available over stdio")
|
|
||||||
connectStdio(turn, ds)
|
|
||||||
bootProc(turn, ds)
|
|
||||||
else:
|
|
||||||
raise newException(ValueError, "unsupported route")
|
|
||||||
|
|
||||||
during(turn, ds, ResolvePath ?: { 0: ?route, 3: ?:ResolvedAccepted}) do (dest: Cap):
|
# TODO: define a runActor that comes preloaded with relaying
|
||||||
bootProc(turn, dest)
|
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
# Package
|
# Package
|
||||||
|
|
||||||
version = "20240114"
|
version = "20240120"
|
||||||
author = "Emery Hemingway"
|
author = "Emery Hemingway"
|
||||||
description = "Syndicated actors for conversational concurrency"
|
description = "Syndicated actors for conversational concurrency"
|
||||||
license = "Unlicense"
|
license = "Unlicense"
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
# SPDX-FileCopyrightText: ☭ Emery Hemingway
|
# SPDX-FileCopyrightText: ☭ Emery Hemingway
|
||||||
# SPDX-License-Identifier: Unlicense
|
# SPDX-License-Identifier: Unlicense
|
||||||
|
|
||||||
import std/[asyncdispatch, asyncfile, os, parseopt]
|
import std/[asyncdispatch, asyncfile, parseopt]
|
||||||
import preserves, syndicate, syndicate/relays
|
import preserves, syndicate, syndicate/relays
|
||||||
|
|
||||||
type
|
type
|
||||||
|
@ -49,6 +49,7 @@ proc main =
|
||||||
stderr.writeLine "--user: unspecified"
|
stderr.writeLine "--user: unspecified"
|
||||||
else:
|
else:
|
||||||
runActor("chat") do (turn: var Turn; root: Cap):
|
runActor("chat") do (turn: var Turn; root: Cap):
|
||||||
|
spawnRelays(turn, root)
|
||||||
resolve(turn, root, route) do (turn: var Turn; ds: Cap):
|
resolve(turn, root, route) do (turn: var Turn; ds: Cap):
|
||||||
chat(turn, ds, username)
|
chat(turn, ds, username)
|
||||||
|
|
||||||
|
|
|
@ -82,8 +82,8 @@ suite "protocol":
|
||||||
|
|
||||||
test "later-than":
|
test "later-than":
|
||||||
let
|
let
|
||||||
obsA = parsePreserves"""<Observe <rec later-than [<lit 1704113731.419243>]> #!#f>"""
|
obsA = parsePreserves"""<Observe <rec later-than [<lit 1704113731.419243>]> #f>"""
|
||||||
obsB = parsePreserves"""<Observe <rec Observe [<rec rec [<lit later-than> <arr [<rec lit [<bind <_>>]>]>]> <_>]> #!#f>"""
|
obsB = parsePreserves"""<Observe <rec Observe [<rec rec [<lit later-than> <arr [<rec lit [<bind <_>>]>]>]> <_>]> #f>"""
|
||||||
patA = """<rec later-than [<lit 1704113731.419243>]>""".parsePreserves.preservesTo(Pattern).get
|
patA = """<rec later-than [<lit 1704113731.419243>]>""".parsePreserves.preservesTo(Pattern).get
|
||||||
patB = """<rec Observe [<rec rec [<lit later-than> <arr [<rec lit [<bind <_>>]>]>]> <_>]>""".parsePreserves.preservesTo(Pattern).get
|
patB = """<rec Observe [<rec rec [<lit later-than> <arr [<rec lit [<bind <_>>]>]>]> <_>]>""".parsePreserves.preservesTo(Pattern).get
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue