Compare commits
4 Commits
75d1e33bff
...
1c023a6ef8
Author | SHA1 | Date |
---|---|---|
Emery Hemingway | 1c023a6ef8 | |
Emery Hemingway | b6b02bf71a | |
Emery Hemingway | 4ed90b0e02 | |
Emery Hemingway | a4ba81a481 |
|
@ -1,5 +1,6 @@
|
||||||
{ pkgs ? import <nixpkgs> { } }:
|
let pkgs = import <nixpkgs> { };
|
||||||
pkgs.buildNimPackage {
|
in pkgs.buildNimPackage {
|
||||||
name = "dummy";
|
name = "noiseprotocol";
|
||||||
|
buildInputs = [ pkgs.noise-c ];
|
||||||
lockFile = ./lock.json;
|
lockFile = ./lock.json;
|
||||||
}
|
}
|
||||||
|
|
|
@ -296,11 +296,11 @@ proc publish(turn: var Turn; r: Cap; v: Value; h: Handle) =
|
||||||
act.enqueue.event.detail.assert.handle = h
|
act.enqueue.event.detail.assert.handle = h
|
||||||
turn.desc.actions.add act
|
turn.desc.actions.add act
|
||||||
|
|
||||||
proc publish*(turn: var Turn; r: Cap; a: Value): Handle =
|
proc publish*(turn: var Turn; r: Cap; a: Value): Handle {.discardable.} =
|
||||||
result = turn.facet.nextHandle()
|
result = turn.facet.nextHandle()
|
||||||
publish(turn, r, a, result)
|
publish(turn, r, a, result)
|
||||||
|
|
||||||
proc publish*[T](turn: var Turn; r: Cap; a: T): Handle =
|
proc publish*[T](turn: var Turn; r: Cap; a: T): Handle {.discardable.} =
|
||||||
publish(turn, r, a.toPreserves)
|
publish(turn, r, a.toPreserves)
|
||||||
|
|
||||||
proc retract(turn: var Turn; e: OutboundAssertion) =
|
proc retract(turn: var Turn; e: OutboundAssertion) =
|
||||||
|
@ -459,7 +459,7 @@ proc bootActor*(name: string; bootProc: TurnAction): Actor =
|
||||||
else: result.traceStream = openFileStream(path, fmWrite)
|
else: result.traceStream = openFileStream(path, fmWrite)
|
||||||
run(result, bootProc, initialAssertions)
|
run(result, bootProc, initialAssertions)
|
||||||
|
|
||||||
proc spawn*(name: string; turn: var Turn; bootProc: TurnAction; initialAssertions = initHashSet[Handle]()): Actor =
|
proc spawn*(name: string; turn: var Turn; bootProc: TurnAction; initialAssertions = initHashSet[Handle]()): Actor {.discardable.} =
|
||||||
let actor = newActor(name, turn.facet.actor.handleAllocator)
|
let actor = newActor(name, turn.facet.actor.handleAllocator)
|
||||||
enqueue(turn, turn.facet) do (turn: var Turn):
|
enqueue(turn, turn.facet) do (turn: var Turn):
|
||||||
var newOutBound: Table[Handle, OutboundAssertion]
|
var newOutBound: Table[Handle, OutboundAssertion]
|
||||||
|
@ -594,6 +594,9 @@ proc newCap*(relay: Facet; e: Entity): Cap =
|
||||||
proc newCap*(turn; e: Entity): Cap =
|
proc newCap*(turn; e: Entity): Cap =
|
||||||
Cap(relay: turn.facet, target: e)
|
Cap(relay: turn.facet, target: e)
|
||||||
|
|
||||||
|
proc newCap*(e: Entity; turn): Cap =
|
||||||
|
Cap(relay: turn.facet, target: e)
|
||||||
|
|
||||||
type SyncContinuation {.final.} = ref object of Entity
|
type SyncContinuation {.final.} = ref object of Entity
|
||||||
action: TurnAction
|
action: TurnAction
|
||||||
|
|
||||||
|
@ -601,7 +604,6 @@ method message(entity: SyncContinuation; turn: var Turn; v: AssertionRef) =
|
||||||
entity.action(turn)
|
entity.action(turn)
|
||||||
|
|
||||||
proc sync*(turn: var Turn; refer: Cap; act: TurnAction) =
|
proc sync*(turn: var Turn; refer: Cap; act: TurnAction) =
|
||||||
let e = SyncContinuation(action: act)
|
|
||||||
sync(turn, refer, newCap(turn, SyncContinuation(action: act)))
|
sync(turn, refer, newCap(turn, SyncContinuation(action: act)))
|
||||||
|
|
||||||
proc running*(actor): bool =
|
proc running*(actor): bool =
|
||||||
|
|
|
@ -66,43 +66,42 @@ proc grab*(pr: Value): Pattern =
|
||||||
$(grab parsePreserves"""<foo "bar" #"00" [0 1 2.0] {maybe: #t} <_>>""") ==
|
$(grab parsePreserves"""<foo "bar" #"00" [0 1 2.0] {maybe: #t} <_>>""") ==
|
||||||
"""<rec foo [<lit "bar"> <lit #"00"> <arr [<lit 0> <lit 1> <lit 2.0>]> <dict {maybe: <lit #t>}> <_>]>"""
|
"""<rec foo [<lit "bar"> <lit #"00"> <arr [<lit 0> <lit 1> <lit 2.0>]> <dict {maybe: <lit #t>}> <_>]>"""
|
||||||
|
|
||||||
if pr.embedded: drop()
|
case pr.kind
|
||||||
else:
|
of pkBoolean:
|
||||||
case pr.kind
|
AnyAtom(orKind: AnyAtomKind.`bool`, bool: pr.bool).toPattern
|
||||||
of pkBoolean:
|
of pkFloat:
|
||||||
AnyAtom(orKind: AnyAtomKind.`bool`, bool: pr.bool).toPattern
|
AnyAtom(orKind: AnyAtomKind.`float`, float: pr.float).toPattern
|
||||||
of pkFloat:
|
of pkDouble:
|
||||||
AnyAtom(orKind: AnyAtomKind.`float`, float: pr.float).toPattern
|
AnyAtom(orKind: AnyAtomKind.`double`, double: pr.double).toPattern
|
||||||
of pkDouble:
|
of pkRegister:
|
||||||
AnyAtom(orKind: AnyAtomKind.`double`, double: pr.double).toPattern
|
AnyAtom(orKind: AnyAtomKind.`int`, int: pr.register).toPattern
|
||||||
of pkRegister:
|
of pkString:
|
||||||
AnyAtom(orKind: AnyAtomKind.`int`, int: pr.register).toPattern
|
AnyAtom(orKind: AnyAtomKind.`string`, string: pr.string).toPattern
|
||||||
of pkString:
|
of pkByteString:
|
||||||
AnyAtom(orKind: AnyAtomKind.`string`, string: pr.string).toPattern
|
AnyAtom(orKind: AnyAtomKind.`bytes`, bytes: pr.bytes).toPattern
|
||||||
of pkByteString:
|
of pkSymbol:
|
||||||
AnyAtom(orKind: AnyAtomKind.`bytes`, bytes: pr.bytes).toPattern
|
AnyAtom(orKind: AnyAtomKind.`symbol`, symbol: pr.symbol).toPattern
|
||||||
of pkSymbol:
|
of pkRecord:
|
||||||
AnyAtom(orKind: AnyAtomKind.`symbol`, symbol: pr.symbol).toPattern
|
if (pr.isRecord("_") and pr.arity == 0) or (pr.isRecord("bind") and pr.arity == 1):
|
||||||
of pkRecord:
|
|
||||||
if (pr.isRecord("_") and pr.arity == 0) or (pr.isRecord("bind") and pr.arity == 1):
|
|
||||||
drop()
|
|
||||||
else:
|
|
||||||
DCompoundRec(
|
|
||||||
label: pr.label,
|
|
||||||
fields: map[Value, Pattern](pr.fields, grab)).toPattern
|
|
||||||
of pkSequence:
|
|
||||||
DCompoundArr(items: map(pr.sequence, grab)).toPattern
|
|
||||||
of pkSet:
|
|
||||||
raiseAssert "cannot construct a pattern over a set literal"
|
|
||||||
of pkDictionary:
|
|
||||||
var dict = DCompoundDict()
|
|
||||||
for key, val in pr.pairs: dict.entries[key] = grab val
|
|
||||||
dict.toPattern
|
|
||||||
of pkEmbedded:
|
|
||||||
# TODO: can patterns be constructed over embedded literals?
|
|
||||||
drop()
|
drop()
|
||||||
else:
|
else:
|
||||||
raise newException(ValueError, "cannot generate a pattern for unhandled Value type")
|
DCompoundRec(
|
||||||
|
label: pr.label,
|
||||||
|
fields: map[Value, Pattern](pr.fields, grab)).toPattern
|
||||||
|
of pkSequence:
|
||||||
|
DCompoundArr(items: map(pr.sequence, grab)).toPattern
|
||||||
|
of pkSet:
|
||||||
|
raiseAssert "cannot construct a pattern over a set literal"
|
||||||
|
of pkDictionary:
|
||||||
|
var dict = DCompoundDict()
|
||||||
|
for key, val in pr.pairs: dict.entries[key] = grab val
|
||||||
|
dict.toPattern
|
||||||
|
of pkEmbedded:
|
||||||
|
if pr.embeddedRef.isNil: drop()
|
||||||
|
else:
|
||||||
|
AnyAtom(orKind: AnyAtomKind.`embedded`, embedded: pr.embeddedRef).toPattern
|
||||||
|
else:
|
||||||
|
raise newException(ValueError, "cannot generate a pattern for unhandled Value type")
|
||||||
|
|
||||||
proc grab*[T](x: T): Pattern =
|
proc grab*[T](x: T): Pattern =
|
||||||
## Construct a `Pattern` from value of type `T`.
|
## Construct a `Pattern` from value of type `T`.
|
||||||
|
@ -114,33 +113,6 @@ proc grab*[T](x: T): Pattern =
|
||||||
$grab([0, 1, 2, 3]) == "<arr [<lit 0> <lit 1> <lit 2> <lit 3>]>"
|
$grab([0, 1, 2, 3]) == "<arr [<lit 0> <lit 1> <lit 2> <lit 3>]>"
|
||||||
grab(x.toPreserves)
|
grab(x.toPreserves)
|
||||||
|
|
||||||
proc patternOfType(typ: static typedesc; `bind`: static bool): Pattern =
|
|
||||||
when typ is ref:
|
|
||||||
patternOfType(pointerBase(typ), `bind`)
|
|
||||||
elif typ.hasPreservesRecordPragma:
|
|
||||||
var rec = DCompoundRec(label: typ.recordLabel.toSymbol)
|
|
||||||
for _, f in fieldPairs(default typ):
|
|
||||||
add(rec.fields, patternOfType(typeof f, `bind`))
|
|
||||||
result = rec.toPattern
|
|
||||||
elif typ.hasPreservesDictionaryPragma:
|
|
||||||
var dict = DCompoundDict()
|
|
||||||
for key, val in fieldPairs(default typ):
|
|
||||||
dict.entries[key.toSymbol] = patternOfType(typeof val, `bind`)
|
|
||||||
dict.toPattern
|
|
||||||
elif typ is tuple:
|
|
||||||
var arr = DCompoundArr()
|
|
||||||
for _, f in fieldPairs(default typ):
|
|
||||||
add(arr.items, patternOfType(typeof f, `bind`))
|
|
||||||
arr.toPattern
|
|
||||||
elif typ is array:
|
|
||||||
var arr = DCompoundArr()
|
|
||||||
arr.items.setLen(len(typ))
|
|
||||||
for e in arr.items.mitems: e = grab()
|
|
||||||
arr.toPattern
|
|
||||||
else:
|
|
||||||
if `bind`: grab()
|
|
||||||
else: drop()
|
|
||||||
|
|
||||||
proc grabType*(typ: static typedesc): Pattern =
|
proc grabType*(typ: static typedesc): Pattern =
|
||||||
## Derive a `Pattern` from type `typ`.
|
## Derive a `Pattern` from type `typ`.
|
||||||
## This works for `tuple` and `object` types but in the
|
## This works for `tuple` and `object` types but in the
|
||||||
|
@ -162,16 +134,61 @@ proc grabType*(typ: static typedesc): Pattern =
|
||||||
"<rec rect [<arr [<bind <_>> <bind <_>>]> <arr [<bind <_>> <bind <_>>]>]>"
|
"<rec rect [<arr [<bind <_>> <bind <_>>]> <arr [<bind <_>> <bind <_>>]>]>"
|
||||||
$(grabType ColoredRect) ==
|
$(grabType ColoredRect) ==
|
||||||
"<dict {color: <bind <_>> rect: <rec rect [<arr [<bind <_>> <bind <_>>]> <arr [<bind <_>> <bind <_>>]>]>}>"
|
"<dict {color: <bind <_>> rect: <rec rect [<arr [<bind <_>> <bind <_>>]> <arr [<bind <_>> <bind <_>>]>]>}>"
|
||||||
patternOfType(typ, true)
|
when typ is ref:
|
||||||
|
grabType(pointerBase(typ))
|
||||||
proc dropType*(typ: static typedesc): Pattern =
|
elif typ.hasPreservesRecordPragma:
|
||||||
## Derive a `Pattern` from type `typ` without any bindings.
|
var rec = DCompoundRec(label: typ.recordLabel.toSymbol)
|
||||||
patternOfType(typ, false)
|
for _, f in fieldPairs(default typ):
|
||||||
|
add(rec.fields, grabType(typeof f))
|
||||||
|
result = rec.toPattern
|
||||||
|
elif typ.hasPreservesDictionaryPragma:
|
||||||
|
var dict = DCompoundDict()
|
||||||
|
for key, val in fieldPairs(default typ):
|
||||||
|
dict.entries[key.toSymbol] = grabType(typeof val)
|
||||||
|
dict.toPattern
|
||||||
|
elif typ is tuple:
|
||||||
|
var arr = DCompoundArr()
|
||||||
|
for _, f in fieldPairs(default typ):
|
||||||
|
add(arr.items, grabType(typeof f))
|
||||||
|
arr.toPattern
|
||||||
|
elif typ is array:
|
||||||
|
var arr = DCompoundArr()
|
||||||
|
arr.items.setLen(len(typ))
|
||||||
|
for e in arr.items.mitems: e = grab()
|
||||||
|
arr.toPattern
|
||||||
|
else:
|
||||||
|
grab()
|
||||||
|
|
||||||
proc fieldCount(T: typedesc): int =
|
proc fieldCount(T: typedesc): int =
|
||||||
for _, _ in fieldPairs(default T):
|
for _, _ in fieldPairs(default T):
|
||||||
inc result
|
inc result
|
||||||
|
|
||||||
|
proc dropType*(typ: static typedesc): Pattern =
|
||||||
|
## Derive a `Pattern` from type `typ` without any bindings.
|
||||||
|
when typ is ref:
|
||||||
|
dropType(pointerBase(typ))
|
||||||
|
elif typ.hasPreservesRecordPragma:
|
||||||
|
var rec = DCompoundRec(label: typ.recordLabel.toSymbol)
|
||||||
|
rec.fields.setLen(fieldCount typ)
|
||||||
|
for i, _ in rec.fields:
|
||||||
|
rec.fields[i] = drop()
|
||||||
|
result = rec.toPattern
|
||||||
|
elif typ.hasPreservesDictionaryPragma:
|
||||||
|
DCompoundDict().toPattern
|
||||||
|
elif typ is tuple:
|
||||||
|
var arr = DCompoundArr()
|
||||||
|
arr.items.setLen(len typ)
|
||||||
|
for i, _ in arr.items:
|
||||||
|
arr.items[i] = drop()
|
||||||
|
arr.toPattern
|
||||||
|
elif typ is array:
|
||||||
|
var arr = DCompoundArr()
|
||||||
|
arr.items.setLen(len(typ))
|
||||||
|
for e in arr.items.mitems: e = drop()
|
||||||
|
arr.toPattern
|
||||||
|
else:
|
||||||
|
drop()
|
||||||
|
|
||||||
proc lookup(bindings: openArray[(int, Pattern)]; i: int): Pattern =
|
proc lookup(bindings: openArray[(int, Pattern)]; i: int): Pattern =
|
||||||
for (j, b) in bindings:
|
for (j, b) in bindings:
|
||||||
if i == j: return b
|
if i == j: return b
|
||||||
|
|
|
@ -28,7 +28,7 @@ type
|
||||||
Handle = actors.Handle
|
Handle = actors.Handle
|
||||||
|
|
||||||
type
|
type
|
||||||
PacketWriter = proc (pkt: sink Packet): Future[void] {.gcsafe.}
|
PacketHandler = proc (buf: seq[byte]) {.gcsafe.}
|
||||||
RelaySetup = proc (turn: var Turn; relay: Relay) {.gcsafe.}
|
RelaySetup = proc (turn: var Turn; relay: Relay) {.gcsafe.}
|
||||||
|
|
||||||
Relay* = ref object of RootObj
|
Relay* = ref object of RootObj
|
||||||
|
@ -40,7 +40,8 @@ type
|
||||||
imported: Membrane
|
imported: Membrane
|
||||||
nextLocalOid: Oid
|
nextLocalOid: Oid
|
||||||
pendingTurn: protocol.Turn
|
pendingTurn: protocol.Turn
|
||||||
packetWriter: PacketWriter
|
packetSender: PacketHandler
|
||||||
|
wireBuf: BufferedDecoder
|
||||||
untrusted: bool
|
untrusted: bool
|
||||||
|
|
||||||
SyncPeerEntity = ref object of Entity
|
SyncPeerEntity = ref object of Entity
|
||||||
|
@ -117,8 +118,8 @@ proc send(r: Relay; turn: var Turn; rOid: protocol.Oid; m: Event) =
|
||||||
orKind: PacketKind.Turn,
|
orKind: PacketKind.Turn,
|
||||||
turn: move r.pendingTurn)
|
turn: move r.pendingTurn)
|
||||||
trace "C: ", pkt
|
trace "C: ", pkt
|
||||||
assert(not r.packetWriter.isNil, "missing packetWriter proc")
|
assert(not r.packetSender.isNil, "missing packetSender proc")
|
||||||
asyncCheck(turn, r.packetWriter(pkt))
|
r.packetSender(encode pkt)
|
||||||
r.pendingTurn.add TurnEvent(oid: rOid, event: m)
|
r.pendingTurn.add TurnEvent(oid: rOid, event: m)
|
||||||
|
|
||||||
proc send(re: RelayEntity; turn: var Turn; ev: Event) =
|
proc send(re: RelayEntity; turn: var Turn; ev: Event) =
|
||||||
|
@ -197,7 +198,7 @@ proc rewriteIn(relay; facet; v: Value):
|
||||||
|
|
||||||
proc close(r: Relay) = discard
|
proc close(r: Relay) = discard
|
||||||
|
|
||||||
proc dispatch*(relay: Relay; turn: var Turn; cap: Cap; event: Event) {.gcsafe.} =
|
proc dispatch(relay: Relay; turn: var Turn; cap: Cap; event: Event) {.gcsafe.} =
|
||||||
case event.orKind
|
case event.orKind
|
||||||
of EventKind.Assert:
|
of EventKind.Assert:
|
||||||
let (a, imported) = rewriteIn(relay, turn.facet, event.assert.assertion)
|
let (a, imported) = rewriteIn(relay, turn.facet, event.assert.assertion)
|
||||||
|
@ -225,7 +226,7 @@ proc dispatch*(relay: Relay; turn: var Turn; cap: Cap; event: Event) {.gcsafe.}
|
||||||
for e in imported: relay.imported.del e
|
for e in imported: relay.imported.del e
|
||||||
]#
|
]#
|
||||||
|
|
||||||
proc dispatch*(relay: Relay; v: Value) {.gcsafe.} =
|
proc dispatch(relay: Relay; v: Value) {.gcsafe.} =
|
||||||
trace "S: ", v
|
trace "S: ", v
|
||||||
run(relay.facet) do (t: var Turn):
|
run(relay.facet) do (t: var Turn):
|
||||||
var pkt: Packet
|
var pkt: Packet
|
||||||
|
@ -251,9 +252,13 @@ proc dispatch*(relay: Relay; v: Value) {.gcsafe.} =
|
||||||
when defined(posix):
|
when defined(posix):
|
||||||
stderr.writeLine("discarding undecoded packet ", v)
|
stderr.writeLine("discarding undecoded packet ", v)
|
||||||
|
|
||||||
|
proc dispatch(relay: Relay; buf: seq[byte]) =
|
||||||
|
feed(relay.wireBuf, buf)
|
||||||
|
var pr = decode(relay.wireBuf)
|
||||||
|
if pr.isSome: dispatch(relay, get pr)
|
||||||
|
|
||||||
type
|
type
|
||||||
RelayOptions* = object of RootObj
|
RelayOptions* = object of RootObj
|
||||||
packetWriter*: PacketWriter
|
|
||||||
untrusted*: bool
|
untrusted*: bool
|
||||||
RelayActorOptions* = object of RelayOptions
|
RelayActorOptions* = object of RelayOptions
|
||||||
initialOid*: Option[Oid]
|
initialOid*: Option[Oid]
|
||||||
|
@ -263,7 +268,7 @@ type
|
||||||
proc newRelay(turn: var Turn; opts: RelayOptions; setup: RelaySetup): Relay =
|
proc newRelay(turn: var Turn; opts: RelayOptions; setup: RelaySetup): Relay =
|
||||||
result = Relay(
|
result = Relay(
|
||||||
facet: turn.facet,
|
facet: turn.facet,
|
||||||
packetWriter: opts.packetWriter,
|
wireBuf: newBufferedDecoder(0),
|
||||||
untrusted: opts.untrusted)
|
untrusted: opts.untrusted)
|
||||||
discard result.facet.preventInertCheck()
|
discard result.facet.preventInertCheck()
|
||||||
setup(turn, result)
|
setup(turn, result)
|
||||||
|
@ -308,39 +313,63 @@ type ConnectProc* = proc (turn: var Turn; ds: Cap) {.gcsafe.}
|
||||||
export Tcp
|
export Tcp
|
||||||
|
|
||||||
when defined(posix):
|
when defined(posix):
|
||||||
|
import std/asyncfile
|
||||||
export Unix
|
export Unix
|
||||||
|
|
||||||
proc connect*(turn: var Turn; ds: Cap; route: Route; addrAss: Assertion; socket: AsyncSocket; step: Value) =
|
proc newStdioTunnel(facet: Facet; receiver: PacketHandler): PacketHandler =
|
||||||
## Relay a dataspace over an open `AsyncSocket`.
|
let asyncStdin = openAsync("/dev/stdin") # this is universal now?
|
||||||
proc socketWriter(packet: sink Packet): Future[void] =
|
close(stdin)
|
||||||
socket.send(cast[string](encode(packet)))
|
const readSize = 0x2000
|
||||||
|
proc readCb(fut: Future[string]) {.gcsafe.} =
|
||||||
|
if fut.failed: terminate(facet, fut.error)
|
||||||
|
else:
|
||||||
|
receiver(cast[seq[byte]](fut.read))
|
||||||
|
asyncStdin.read(readSize).addCallback(readCb)
|
||||||
|
asyncStdin.read(readSize).addCallback(readCb)
|
||||||
|
proc sender(buf: seq[byte]) =
|
||||||
|
try:
|
||||||
|
if writeBytes(stdout, buf, 0, buf.len) != buf.len:
|
||||||
|
raise newException(IOError, "failed to write Preserves to stdout")
|
||||||
|
flushFile(stdout)
|
||||||
|
except CatchableError as err:
|
||||||
|
terminate(facet, err)
|
||||||
|
sender
|
||||||
|
|
||||||
|
proc connectStdio*(turn: var Turn; ds: Cap) =
|
||||||
|
## Connect to an external dataspace over stdin and stdout.
|
||||||
|
var opts = RelayActorOptions(
|
||||||
|
initialCap: ds,
|
||||||
|
initialOid: 0.Oid.some)
|
||||||
|
spawnRelay("stdio", turn, ds, Stdio().toPreserves, opts) do (turn: var Turn; relay: Relay):
|
||||||
|
proc receiver(buf: seq[byte]) = dispatch(relay, buf)
|
||||||
|
relay.packetSender = newStdioTunnel(turn.facet, receiver)
|
||||||
|
|
||||||
|
proc newTunnel(facet: Facet; receiver: PacketHandler; socket: AsyncSocket): PacketHandler =
|
||||||
const recvSize = 0x2000
|
const recvSize = 0x2000
|
||||||
|
proc recvCb(fut: Future[string]) {.gcsafe.} =
|
||||||
|
if fut.failed: terminate(facet, fut.error)
|
||||||
|
else:
|
||||||
|
receiver(cast[seq[byte]](fut.read))
|
||||||
|
if not socket.isClosed:
|
||||||
|
socket.recv(recvSize).addCallback(recvCb)
|
||||||
|
socket.recv(recvSize).addCallback(recvCb)
|
||||||
|
proc sender(buf: seq[byte]) =
|
||||||
|
asyncCheck(facet, socket.send(cast[string](buf)))
|
||||||
|
sender
|
||||||
|
|
||||||
|
proc connect*(turn: var Turn; ds: Cap; route: Route; addrAss: Assertion; socket: AsyncSocket; steps: seq[Value]) =
|
||||||
|
## Relay a dataspace over an open `AsyncSocket`.
|
||||||
var shutdownCap: Cap
|
var shutdownCap: Cap
|
||||||
let
|
let
|
||||||
reenable = turn.facet.preventInertCheck()
|
reenable = turn.facet.preventInertCheck()
|
||||||
connectionClosedCap = newCap(turn, ShutdownEntity())
|
connectionClosedCap = newCap(turn, ShutdownEntity())
|
||||||
discard bootActor("socket") do (turn: var Turn):
|
discard bootActor("socket") do (turn: var Turn):
|
||||||
var ops = RelayActorOptions(
|
var ops = RelayActorOptions(
|
||||||
packetWriter: socketWriter,
|
|
||||||
initialOid: 0.Oid.some)
|
initialOid: 0.Oid.some)
|
||||||
spawnRelay("socket", turn, ds, addrAss, ops) do (turn: var Turn; relay: Relay):
|
spawnRelay("socket", turn, ds, addrAss, ops) do (turn: var Turn; relay: Relay):
|
||||||
let facet = turn.facet
|
let facet = turn.facet
|
||||||
var wireBuf = newBufferedDecoder(0)
|
proc receiver(buf: seq[byte]) = dispatch(relay, buf)
|
||||||
proc recvCb(pktFut: Future[string]) {.gcsafe.} =
|
relay.packetSender = newTunnel(turn.facet, receiver, socket)
|
||||||
if pktFut.failed:
|
|
||||||
run(facet) do (turn: var Turn): stopActor(turn)
|
|
||||||
else:
|
|
||||||
var buf = pktFut.read
|
|
||||||
if buf.len == 0:
|
|
||||||
run(facet) do (turn: var Turn): stopActor(turn)
|
|
||||||
else:
|
|
||||||
feed(wireBuf, buf)
|
|
||||||
var (success, pr) = decode(wireBuf)
|
|
||||||
if success:
|
|
||||||
dispatch(relay, pr)
|
|
||||||
if not socket.isClosed:
|
|
||||||
socket.recv(recvSize).addCallback(recvCb)
|
|
||||||
socket.recv(recvSize).addCallback(recvCb)
|
|
||||||
turn.facet.actor.atExit do (turn: var Turn): close(socket)
|
turn.facet.actor.atExit do (turn: var Turn): close(socket)
|
||||||
discard publish(turn, connectionClosedCap, true)
|
discard publish(turn, connectionClosedCap, true)
|
||||||
shutdownCap = newCap(turn, ShutdownEntity())
|
shutdownCap = newCap(turn, ShutdownEntity())
|
||||||
|
@ -359,12 +388,12 @@ when defined(posix):
|
||||||
stop(turn, facet)
|
stop(turn, facet)
|
||||||
result = action
|
result = action
|
||||||
var resolve = Resolve(
|
var resolve = Resolve(
|
||||||
step: step,
|
step: steps[0],
|
||||||
observer: newCap(turn, during(duringCallback)),
|
observer: newCap(turn, during(duringCallback)),
|
||||||
)
|
)
|
||||||
discard publish(turn, gatekeeper, resolve)
|
discard publish(turn, gatekeeper, resolve)
|
||||||
|
|
||||||
proc connect*(turn: var Turn; ds: Cap; route: Route; transport: Tcp; step: Value) =
|
proc connect*(turn: var Turn; ds: Cap; route: Route; transport: Tcp; steps: seq[Value]) =
|
||||||
## Relay a dataspace over TCP.
|
## Relay a dataspace over TCP.
|
||||||
let socket = newAsyncSocket(
|
let socket = newAsyncSocket(
|
||||||
domain = AF_INET,
|
domain = AF_INET,
|
||||||
|
@ -373,9 +402,9 @@ when defined(posix):
|
||||||
buffered = false)
|
buffered = false)
|
||||||
let fut = connect(socket, transport.host, Port transport.port)
|
let fut = connect(socket, transport.host, Port transport.port)
|
||||||
addCallback(fut, turn) do (turn: var Turn):
|
addCallback(fut, turn) do (turn: var Turn):
|
||||||
connect(turn, ds, route, transport.toPreserves, socket, step)
|
connect(turn, ds, route, transport.toPreserves, socket, steps)
|
||||||
|
|
||||||
proc connect*(turn: var Turn; ds: Cap; route: Route; transport: Unix; step: Value) =
|
proc connect*(turn: var Turn; ds: Cap; route: Route; transport: Unix; steps: seq[Value]) =
|
||||||
## Relay a dataspace over a UNIX socket.
|
## Relay a dataspace over a UNIX socket.
|
||||||
let socket = newAsyncSocket(
|
let socket = newAsyncSocket(
|
||||||
domain = AF_UNIX,
|
domain = AF_UNIX,
|
||||||
|
@ -384,44 +413,7 @@ when defined(posix):
|
||||||
buffered = false)
|
buffered = false)
|
||||||
let fut = connectUnix(socket, transport.path)
|
let fut = connectUnix(socket, transport.path)
|
||||||
addCallback(fut, turn) do (turn: var Turn):
|
addCallback(fut, turn) do (turn: var Turn):
|
||||||
connect(turn, ds, route, transport.toPreserves, socket, step)
|
connect(turn, ds, route, transport.toPreserves, socket, steps)
|
||||||
|
|
||||||
import std/asyncfile
|
|
||||||
|
|
||||||
const stdinReadSize = 128
|
|
||||||
|
|
||||||
proc connectStdio*(turn: var Turn; ds: Cap) =
|
|
||||||
## Connect to an external dataspace over stdin and stdout.
|
|
||||||
proc stdoutWriter(packet: sink Packet): Future[void] =
|
|
||||||
result = newFuture[void]()
|
|
||||||
var buf = encode(packet)
|
|
||||||
doAssert writeBytes(stdout, buf, 0, buf.len) == buf.len
|
|
||||||
flushFile(stdout)
|
|
||||||
complete result
|
|
||||||
var opts = RelayActorOptions(
|
|
||||||
packetWriter: stdoutWriter,
|
|
||||||
initialCap: ds,
|
|
||||||
initialOid: 0.Oid.some)
|
|
||||||
spawnRelay("stdio", turn, ds, Stdio().toPreserves, opts) do (turn: var Turn; relay: Relay):
|
|
||||||
let
|
|
||||||
facet = turn.facet
|
|
||||||
asyncStdin = openAsync("/dev/stdin") # this is universal now?
|
|
||||||
close(stdin)
|
|
||||||
facet.actor.atExit do (turn: var Turn):
|
|
||||||
close(asyncStdin)
|
|
||||||
var wireBuf = newBufferedDecoder(0)
|
|
||||||
proc readCb(pktFut: Future[string]) {.gcsafe.} =
|
|
||||||
if not pktFut.failed:
|
|
||||||
var buf = pktFut.read
|
|
||||||
if buf.len == 0:
|
|
||||||
run(facet) do (turn: var Turn): stopActor(turn)
|
|
||||||
else:
|
|
||||||
feed(wireBuf, buf)
|
|
||||||
var (success, pr) = decode(wireBuf)
|
|
||||||
if success:
|
|
||||||
dispatch(relay, pr)
|
|
||||||
asyncStdin.read(stdinReadSize).addCallback(readCb)
|
|
||||||
asyncStdin.read(stdinReadSize).addCallback(readCb)
|
|
||||||
|
|
||||||
type BootProc* = proc (turn: var Turn; ds: Cap) {.gcsafe.}
|
type BootProc* = proc (turn: var Turn; ds: Cap) {.gcsafe.}
|
||||||
|
|
||||||
|
@ -442,12 +434,12 @@ proc resolve*(turn: var Turn; ds: Cap; route: Route; bootProc: BootProc) =
|
||||||
tcp: Tcp
|
tcp: Tcp
|
||||||
stdio: Stdio
|
stdio: Stdio
|
||||||
doAssert(route.transports.len == 1, "only a single transport supported for routes")
|
doAssert(route.transports.len == 1, "only a single transport supported for routes")
|
||||||
doAssert(route.pathSteps.len < 2, "multiple path steps not supported for routes")
|
|
||||||
if unix.fromPreserves route.transports[0]:
|
if unix.fromPreserves route.transports[0]:
|
||||||
connect(turn, ds, route, unix, route.pathSteps[0])
|
connect(turn, ds, route, unix, route.pathSteps)
|
||||||
elif tcp.fromPreserves route.transports[0]:
|
elif tcp.fromPreserves route.transports[0]:
|
||||||
connect(turn, ds, route, tcp, route.pathSteps[0])
|
connect(turn, ds, route, tcp, route.pathSteps)
|
||||||
elif stdio.fromPreserves route.transports[0]:
|
elif stdio.fromPreserves route.transports[0]:
|
||||||
|
doAssert(route.pathSteps.len == 0, "route steps not available over stdio")
|
||||||
connectStdio(turn, ds)
|
connectStdio(turn, ds)
|
||||||
bootProc(turn, ds)
|
bootProc(turn, ds)
|
||||||
else:
|
else:
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
# Package
|
# Package
|
||||||
|
|
||||||
version = "20240114"
|
version = "20240119"
|
||||||
author = "Emery Hemingway"
|
author = "Emery Hemingway"
|
||||||
description = "Syndicated actors for conversational concurrency"
|
description = "Syndicated actors for conversational concurrency"
|
||||||
license = "Unlicense"
|
license = "Unlicense"
|
||||||
|
@ -9,4 +9,4 @@ srcDir = "src"
|
||||||
|
|
||||||
# Dependencies
|
# Dependencies
|
||||||
|
|
||||||
requires "https://github.com/ehmry/hashlib.git#f9455d4be988e14e3dc7933eb7cc7d7c4820b7ac", "nim >= 2.0.0", "https://git.syndicate-lang.org/ehmry/preserves-nim.git >= 20240108"
|
requires "https://github.com/ehmry/hashlib.git#f9455d4be988e14e3dc7933eb7cc7d7c4820b7ac", "nim >= 2.0.0", "https://git.syndicate-lang.org/ehmry/preserves-nim.git >= 20240116"
|
||||||
|
|
|
@ -82,8 +82,8 @@ suite "protocol":
|
||||||
|
|
||||||
test "later-than":
|
test "later-than":
|
||||||
let
|
let
|
||||||
obsA = parsePreserves"""<Observe <rec later-than [<lit 1704113731.419243>]> #!#f>"""
|
obsA = parsePreserves"""<Observe <rec later-than [<lit 1704113731.419243>]> #f>"""
|
||||||
obsB = parsePreserves"""<Observe <rec Observe [<rec rec [<lit later-than> <arr [<rec lit [<bind <_>>]>]>]> <_>]> #!#f>"""
|
obsB = parsePreserves"""<Observe <rec Observe [<rec rec [<lit later-than> <arr [<rec lit [<bind <_>>]>]>]> <_>]> #f>"""
|
||||||
patA = """<rec later-than [<lit 1704113731.419243>]>""".parsePreserves.preservesTo(Pattern).get
|
patA = """<rec later-than [<lit 1704113731.419243>]>""".parsePreserves.preservesTo(Pattern).get
|
||||||
patB = """<rec Observe [<rec rec [<lit later-than> <arr [<rec lit [<bind <_>>]>]>]> <_>]>""".parsePreserves.preservesTo(Pattern).get
|
patB = """<rec Observe [<rec rec [<lit later-than> <arr [<rec lit [<bind <_>>]>]>]> <_>]>""".parsePreserves.preservesTo(Pattern).get
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue