Compare commits

...

11 Commits

8 changed files with 295 additions and 179 deletions

View File

@ -60,11 +60,11 @@
"packages": [
"preserves"
],
"path": "/nix/store/6nnn5di5vip1vladlb7z56rbw18d1y7j-source",
"rev": "2825bceecf33a15b9b7942db5331a32cbc39b281",
"sha256": "145vf46fy3wc52j6vs509fm9bi5lx7c53gskbkpcfbkv82l86dgk",
"path": "/nix/store/2hy124xgabz134dxj3wji7mp47fdwy3w-source",
"rev": "9ae435a83c6d5028405538af5d24a023af625b6e",
"sha256": "1k7ywcp1a53x2fpc6wc2b0qzb264dkifash0s1wcp66rw3lx15k2",
"srcDir": "src",
"url": "https://git.syndicate-lang.org/ehmry/preserves-nim/archive/2825bceecf33a15b9b7942db5331a32cbc39b281.tar.gz"
"url": "https://git.syndicate-lang.org/ehmry/preserves-nim/archive/9ae435a83c6d5028405538af5d24a023af625b6e.tar.gz"
},
{
"method": "fetchzip",

View File

@ -4,7 +4,6 @@
## This module implements the `Syndicate DSL <https://syndicate-lang.org/doc/syndicate/>`_.
import std/[macros, tables, typetraits]
import pkg/sys/ioqueue
import preserves
export fromPreserves, toPreserves
@ -174,7 +173,8 @@ macro during*(turn: untyped; ds: Cap; pattern: Pattern; publishBody, retractBody
if `argCount` != 0 and `pattern`.analyse.capturePaths.len != `argCount`:
raiseAssert($`pattern`.analyse.capturePaths.len & " values captured but handler has " & $`argCount` & " arguments - " & $`pattern`)
`callbackProc`
discard observe(`turn`, `ds`, `pattern`, during(`callbackSym`))
discard inFacet(`turn`) do (`turn`: var Turn):
discard observe(`turn`, `ds`, `pattern`, during(`callbackSym`))
macro during*(turn: untyped; ds: Cap; pattern: Pattern; publishBody: untyped) =
## Variant of `during` without a retract body.
@ -186,9 +186,5 @@ macro during*(turn: untyped; ds: Cap; pattern: Pattern; publishBody: untyped) =
if `argCount` != 0 and `pattern`.analyse.capturePaths.len != `argCount`:
raiseAssert($`pattern`.analyse.capturePaths.len & " values captured but handler has " & $`argCount` & " arguments - " & $`pattern`)
`callbackProc`
discard observe(`turn`, `ds`, `pattern`, during(`callbackSym`))
proc runActor*(name: string; bootProc: TurnAction) =
## Boot an actor `Actor` and churn ioqueue once.
discard bootActor(name, bootProc)
actors.run()
discard inFacet(`turn`) do (`turn`: var Turn):
discard observe(`turn`, `ds`, `pattern`, during(`callbackSym`))

View File

@ -6,14 +6,13 @@ import pkg/cps
import pkg/sys/ioqueue
import preserves
import ../syndicate/protocols/[protocol, sturdy]
import ../syndicate/protocols/[protocol, sturdy, trace]
const tracing = defined(traceSyndicate)
when tracing:
import std/streams
from std/os import getEnv
import ./protocols/trace
export Handle
@ -64,7 +63,7 @@ type
Turn* = ref object
facet: Facet # active facet that may change during a turn
work: Deque[tuple[facet: Facet, act: TurnAction]]
effects: seq[Turn]
effects: Table[Actor, Turn]
when tracing:
desc: TurnDescription
@ -110,39 +109,64 @@ when tracing:
result.add f.id.toPreserves
f = f.parent
proc initEnqueue(turn: Turn; cap: Cap): ActionDescription =
result = ActionDescription(orKind: ActionDescriptionKind.enqueue)
result.enqueue.event.target.actor = turn.facet.actor.id.toPreserves
result.enqueue.event.target.facet = turn.facet.id.toPreserves
result.enqueue.event.target.oid = cap.target.oid.toPreserves
proc toDequeue(act: sink ActionDescription): ActionDescription =
result = ActionDescription(orKind: ActionDescriptionKind.dequeue)
result.dequeue.event = move act.enqueue.event
proc toTraceTarget(cap: Cap): trace.Target =
assert not cap.target.isNil
assert not cap.target.facet.isNil
result.actor = cap.target.facet.actor.id
result.facet = cap.target.facet.id
result.oid = cap.target.oid.toPreserves
method publish*(e: Entity; turn: var Turn; v: AssertionRef; h: Handle) {.base.} = discard
method retract*(e: Entity; turn: var Turn; h: Handle) {.base.} = discard
method message*(e: Entity; turn: var Turn; v: AssertionRef) {.base.} = discard
method sync*(e: Entity; turn: var Turn; peer: Cap) {.base.} = discard
converter toActor(f: Facet): Actor = f.actor
converter toActor(t: Turn): Actor = t.facet.actor
converter toFacet(a: Actor): Facet = a.root
converter toFacet(t: Turn): Facet = t.facet
using
actor: Actor
facet: Facet
turn: Turn
turn: var Turn
action: TurnAction
proc labels(f: Facet): string =
assert not f.isNil
assert not f.actor.isNil
result.add f.actor.name
proc catLabels(f: Facet; labels: var string) =
labels.add ':'
if not f.parent.isNil:
catLabels(f.parent, labels)
labels.add ':'
when tracing:
labels.add $f.id
result.add f.actor.name
catLabels(f, result)
proc `$`*(f: Facet): string =
"<Facet:" & f.labels & ">"
proc `$`*(r: Cap): string =
"<Ref:" & r.relay.labels & ">"
proc `$`*(actor: Actor): string =
"<Actor:" & actor.name & ">" # TODO: ambigous
proc `$`*(t: Turn): string =
"<Turn:" & $t.desc.id & ">"
when tracing:
proc `$`*(r: Cap): string =
"<Ref:" & r.relay.labels & ">"
proc `$`*(t: Turn): string =
"<Turn:" & $t.desc.id & ">"
proc attenuate*(r: Cap; a: Attenuation): Cap =
if a.len == 0: result = r
@ -151,8 +175,11 @@ proc attenuate*(r: Cap; a: Attenuation): Cap =
relay: r.relay,
attenuation: a & r.attenuation)
proc hash*(actor): Hash =
actor.unsafeAddr.hash
proc hash*(facet): Hash =
facet.id.hash
facet.unsafeAddr.hash
proc hash*(r: Cap): Hash = !$(r.relay.hash !& r.target.unsafeAddr.hash)
@ -162,6 +189,13 @@ proc nextHandle(facet: Facet): Handle =
result = succ(facet.actor.handleAllocator[])
facet.actor.handleAllocator[] = result
template recallFacet(turn: var Turn; body: untyped): untyped =
let facet = turn.facet
block:
body
assert facet.actor == turn.facet.actor
turn.facet = facet
proc queueWork*(turn: var Turn; facet: Facet; act: TurnAction) =
assert not facet.isNil
turn.work.addLast((facet, act,))
@ -190,17 +224,19 @@ proc run*(facet: Facet; action: TurnAction) = queueTurn(facet, action)
proc facet*(turn: Turn): Facet = turn.facet
proc queueEffect*(turn: var Turn; target: Facet; act: TurnAction) =
if target.actor == turn.facet.actor:
let fremd = target.actor
if fremd == turn.facet.actor:
turn.work.addLast((target, act,))
else:
var next = Turn(facet: target)
assert not target.isNil
next.work.addLast((target, act,))
when tracing:
next.desc.id = nextTurnId()
next.desc.cause = TurnCause(orKind: TurnCauseKind.turn)
next.desc.cause.turn.id = turn.desc.id
turn.effects.add next
var fremdTurn = turn.effects.getOrDefault(fremd)
if fremdTurn.isNil:
fremdTurn = Turn(facet: target)
turn.effects[fremd] = fremdTurn
when tracing:
fremdTurn.desc.id = nextTurnId()
fremdTurn.desc.cause = TurnCause(orKind: TurnCauseKind.turn)
fremdTurn.desc.cause.turn.id = turn.desc.id
fremdTurn.work.addLast((target, act,))
type Bindings = Table[Value, Value]
@ -319,19 +355,21 @@ proc publish(turn: var Turn; cap: Cap; v: Value; h: Handle) =
if not a.isFalse:
let e = OutboundAssertion(handle: h, peer: cap)
turn.facet.outbound[h] = e
when tracing:
var act = ActionDescription(orKind: ActionDescriptionKind.enqueue)
act.enqueue.event.target.actor = turn.facet.actor.id.toPreserves
act.enqueue.event.target.facet = turn.facet.id.toPreserves
act.enqueue.event.target.oid = cap.target.oid.toPreserves
act.enqueue.event.detail = trace.TurnEvent(orKind: TurnEventKind.assert)
act.enqueue.event.detail.assert.assertion.value.value =
mapEmbeds(v) do (cap: Value) -> Value: discard
act.enqueue.event.detail.assert.handle = h
turn.desc.actions.add act
queueEffect(turn, cap.relay) do (turn: var Turn):
e.established = true
when tracing:
turn.desc.actions.add act.toDequeue
publish(cap.target, turn, AssertionRef(value: a), e.handle)
when tracing:
var act = ActionDescription(orKind: ActionDescriptionKind.enqueue)
act.enqueue.event.target.actor = turn.facet.actor.id.toPreserves
act.enqueue.event.target.facet = turn.facet.id.toPreserves
act.enqueue.event.target.oid = cap.target.oid.toPreserves
act.enqueue.event.detail = trace.TurnEvent(orKind: TurnEventKind.assert)
act.enqueue.event.detail.assert.assertion.value.value =
mapEmbeds(v) do (cap: Value) -> Value: discard
act.enqueue.event.detail.assert.handle = h
turn.desc.actions.add act
proc publish*(turn: var Turn; r: Cap; a: Value): Handle {.discardable.} =
result = turn.facet.nextHandle()
@ -341,7 +379,14 @@ proc publish*[T](turn: var Turn; r: Cap; a: T): Handle {.discardable.} =
publish(turn, r, a.toPreserves)
proc retract(turn: var Turn; e: OutboundAssertion) =
when tracing:
var act = initEnqueue(turn, e.peer)
act.enqueue.event.detail = trace.TurnEvent(orKind: TurnEventKind.retract)
act.enqueue.event.detail.retract.handle = e.handle
turn.desc.actions.add act
queueEffect(turn, e.peer.relay) do (turn: var Turn):
when tracing:
turn.desc.actions.add act.toDequeue
if e.established:
e.established = false
e.peer.target.retract(turn, e.handle)
@ -354,18 +399,30 @@ proc retract*(turn: var Turn; h: Handle) =
proc message*(turn: var Turn; r: Cap; v: Value) =
var a = runRewrites(r.attenuation, v)
if not a.isFalse:
when tracing:
var act = initEnqueue(turn, r)
act.enqueue.event.detail = trace.TurnEvent(orKind: TurnEventKind.message)
act.enqueue.event.detail.message.body.value.value =
mapEmbeds(a) do (cap: Value) -> Value: discard
turn.desc.actions.add act
queueEffect(turn, r.relay) do (turn: var Turn):
when tracing:
turn.desc.actions.add act.toDequeue
r.target.message(turn, AssertionRef(value: a))
proc message*[T](turn: var Turn; r: Cap; v: T) =
message(turn, r, v.toPreserves)
proc sync(turn: var Turn; e: Entity; peer: Cap) =
e.sync(turn, peer)
proc sync*(turn: var Turn; r, peer: Cap) =
when tracing:
var act = initEnqueue(turn, peer)
act.enqueue.event.detail = trace.TurnEvent(orKind: TurnEventKind.sync)
act.enqueue.event.detail.sync.peer = peer.toTraceTarget
turn.desc.actions.add act
queueEffect(turn, r.relay) do (turn: var Turn):
sync(turn, r.target, peer)
when tracing:
turn.desc.actions.add act.toDequeue
r.target.sync(turn, peer)
proc replace*[T](turn: var Turn; cap: Cap; h: Handle; v: T): Handle =
result = publish(turn, cap, v)
@ -401,68 +458,64 @@ proc isInert(facet): bool =
noOutboundHandles = facet.outbound.len == 0
isRootFacet = facet.parent.isNil
noInertCheckPreventers = facet.inertCheckPreventers == 0
noKids and (noOutboundHandles or isRootFacet) and noInertCheckPreventers
result = noKids and (noOutboundHandles or isRootFacet) and noInertCheckPreventers
proc preventInertCheck*(facet): (proc()) {.discardable.} =
var armed = true
inc facet.inertCheckPreventers
proc disarm() =
if armed:
armed = false
dec facet.inertCheckPreventers
result = disarm
proc preventInertCheck*(turn: Turn) =
inc turn.facet.inertCheckPreventers
proc terminate(actor; turn; reason: ref Exception)
proc terminateActor(turn; reason: ref Exception)
proc terminate(facet; turn: var Turn; orderly: bool) =
proc terminateFacetOrderly(turn: var Turn) =
let facet = turn.facet
if facet.isAlive:
facet.isAlive = false
let parent = facet.parent
if not parent.isNil:
parent.children.excl facet
queueWork(turn, facet) do (turn: var Turn):
while facet.children.len > 0:
facet.children.pop.terminate(turn, orderly)
if orderly:
for act in facet.shutdownActions:
act(turn)
for a in facet.outbound.values: turn.retract(a)
if orderly:
if not parent.isNil:
if parent.isInert:
parent.terminate(turn, true)
else:
terminate(facet.actor, turn, nil)
var i = 0
while i < facet.shutdownActions.len:
facet.shutdownActions[i](turn)
inc i
setLen facet.shutdownActions, 0
for e in facet.outbound.values:
retract(turn, e)
clear facet.outbound
proc inertCheck(turn: var Turn) =
if (not turn.facet.parent.isNil and
(not turn.facet.parent.isAlive)) or
turn.facet.isInert:
when tracing:
var act = ActionDescription(orKind: ActionDescriptionKind.facetStop)
act.facetstop.path = facet.path
act.facetstop.path = turn.facet.path
act.facetstop.reason = FacetStopReason.inert
turn.desc.actions.add act
stop(turn)
proc terminateFacet(turn: var Turn) =
let facet = turn.facet
for child in facet.children:
queueWork(turn, child, terminateFacetOrderly)
# terminate all children
facet.children.clear()
# detach all children
queueWork(turn, facet, terminateFacetOrderly)
# self-termination
proc stopIfInertAfter(action: TurnAction): TurnAction =
# TODO: verify this
proc wrapper(turn: var Turn) =
action(turn)
queueEffect(turn, turn.facet) do (turn: var Turn):
if (not turn.facet.parent.isNil and
(not turn.facet.parent.isAlive)) or
turn.facet.isInert:
stop(turn)
wrapper
proc work(turn: var Turn) =
queueWork(turn, turn.facet, action)
queueEffect(turn, turn.facet, inertCheck)
work
proc newFacet(turn: var Turn): Facet = newFacet(turn.facet.actor, turn.facet)
proc inFacet*(turn: var Turn; bootProc: TurnAction): Facet =
result = newFacet(turn)
let facet = turn.facet
turn.facet = result
when tracing:
var act = ActionDescription(orKind: ActionDescriptionKind.facetstart)
act.facetstart.path.add result.path
turn.desc.actions.add act
stopIfInertAfter(bootProc)(turn)
turn.facet = facet
proc facet(turn: var Turn; bootProc: TurnAction): Facet {.deprecated.} = inFacet(turn, bootProc)
recallFacet turn:
turn.facet = result
when tracing:
var act = ActionDescription(orKind: ActionDescriptionKind.facetstart)
act.facetstart.path.add result.path
turn.desc.actions.add act
stopIfInertAfter(bootProc)(turn)
proc newActor(name: string; parent: Facet): Actor =
result = Actor(
@ -539,7 +592,8 @@ proc newInertCap*(): Cap =
proc atExit*(actor; action) = actor.exitHooks.add action
proc terminate(actor; turn; reason: ref Exception) =
proc terminateActor(turn; reason: ref Exception) =
let actor = turn.actor
if not actor.exiting:
actor.exiting = true
actor.exitReason = reason
@ -549,75 +603,76 @@ proc terminate(actor; turn; reason: ref Exception) =
act.stop.status = ExitStatus(orKind: ExitStatusKind.Error)
act.stop.status.error.message = reason.msg
trace(actor, act)
for hook in actor.exitHooks: hook(turn)
while actor.exitHooks.len > 0:
var hook = actor.exitHooks.pop()
try: hook(turn)
except CatchableError as err:
if reason.isNil:
terminateActor(turn, err)
return
proc finish(turn: var Turn) =
assert not actor.root.isNil, actor.name
actor.root.terminate(turn, reason.isNil)
terminateFacet(turn)
actor.root = nil
actor.exited = true
queueTurn(actor.root, finish)
proc terminate*(facet; e: ref Exception) =
proc terminateFacet*(facet; e: ref Exception) =
run(facet.actor.root) do (turn: var Turn):
facet.actor.terminate(turn, e)
template recallFacet(turn: var Turn; body: untyped): untyped =
let facet = turn.facet
block:
body
assert facet.actor == turn.facet.actor, "turn of " & $facet.actor & " ended at " & $turn.facet.actor
turn.facet = facet
proc stopNow(turn: var Turn) =
let caller = turn.facet
recallFacet turn:
while caller.children.len > 0:
var child = caller.children.pop()
if child.actor == caller.actor:
turn.facet = child
stopNow(turn)
else:
queueEffect(turn, child, stopNow)
caller.terminate(turn, true)
terminateActor(turn, e)
proc stop*(turn: var Turn, facet: Facet) =
queueEffect(turn, facet, stopNow)
queueEffect(turn, facet) do (turn: var Turn):
when tracing:
var act = ActionDescription(orKind: ActionDescriptionKind.facetStop)
act.facetstop.path = facet.path
act.facetstop.reason = FacetStopReason.explicitAction
turn.desc.actions.add act
terminateFacet(turn)
proc stop*(turn: var Turn) =
stop(turn, turn.facet)
proc onStop*(facet: Facet; act: TurnAction) =
## Add a `proc (turn: var Turn)` action to `facet` to be called as it stops.
assert not facet.isNil
add(facet.shutdownActions, act)
proc onStop*(turn: var Turn; act: TurnAction) =
onStop(turn.facet, act)
proc isAlive(actor): bool =
not(actor.exited or actor.exiting)
proc stop*(actor: Actor) =
queueTurn(actor.root) do (turn: var Turn):
assert(not turn.facet.isNil)
stop(turn)
if actor.isAlive:
queueTurn(actor.root) do (turn: var Turn):
assert(not turn.facet.isNil)
when tracing:
var act = ActionDescription(orKind: ActionDescriptionKind.facetStop)
act.facetstop.path = turn.facet.path
act.facetstop.reason = FacetStopReason.actorStopping
turn.desc.actions.add act
stop(turn, turn.facet)
proc stopActor*(facet: Facet) =
stop(facet.actor)
proc stopActor*(turn: var Turn) =
assert(not turn.facet.isNil)
assert(not turn.facet.actor.isNil)
assert(not turn.facet.actor.root.isNil)
stop(turn, turn.facet.actor.root)
proc freshen*(turn: var Turn, act: TurnAction) {.deprecated.} =
run(turn.facet, act)
proc newCap*(relay: Facet; e: Entity): Cap {.deprecated.} =
Cap(relay: relay, target: e)
proc newCap*(relay: Facet; entity: Entity): Cap =
## Create a new capability for `entity` via `relay`.
# An Entity has an owning facet and a Cap does as well?
if entity.facet.isNil: entity.facet = relay
Cap(relay: relay, target: entity)
proc newCap*(turn; e: Entity): Cap =
Cap(relay: turn.facet, target: e)
newCap(turn.facet, e)
proc newCap*(e: Entity; turn): Cap =
Cap(relay: turn.facet, target: e)
newCap(turn.facet, e)
type SyncContinuation {.final.} = ref object of Entity
action: TurnAction
@ -643,21 +698,62 @@ proc run(turn: var Turn) =
var act = ActorActivation(orKind: ActorActivationKind.turn)
act.turn = move turn.desc
trace(turn.facet.actor, act)
turn.facet = nil # invalidate the turn
# TODO: catch exceptions here
for eff in turn.effects.mitems:
for eff in turn.effects.mvalues:
assert not eff.facet.isNil
turnQueue.addLast(move eff)
turn.facet = nil # invalidate the turn
proc runPendingTurns* =
while turnQueue.len > 0:
var turn = turnQueue.popFirst()
# TODO: check if actor is still valid
try: run(turn)
except CatchableError as err:
stderr.writeLine("actor ", turn.actor.name, " threw an error during a turn")
terminateActor(turn, err)
proc run* =
## Run actors to completion
var ready: seq[Continuation]
while true:
while turnQueue.len > 0:
var turn = turnQueue.popFirst()
# TODO: check if actor is still valid
run turn
runPendingTurns()
ioqueue.poll(ready)
if ready.len == 0: break
while ready.len > 0:
discard trampoline:
ready.pop()
try:
discard trampoline:
ready.pop()
except CatchableError as err:
stderr.writeLine "ioqueue continuation threw an error"
raise err
proc runActor*(name: string; bootProc: TurnAction) =
## Boot an actor `Actor` and churn ioqueue.
let actor = bootActor(name, bootProc)
if not actor.exitReason.isNil:
raise actor.exitReason
actors.run()
if not actor.exitReason.isNil:
raise actor.exitReason
type FacetGuard* = object
facet: Facet
proc initGuard*(f: Facet): FacetGuard =
result.facet = f
inc result.facet.inertCheckPreventers
proc disarm*(g: var FacetGuard) =
if not g.facet.isNil:
assert g.facet.inertCheckPreventers > 0
dec g.facet.inertCheckPreventers
g.facet = nil
proc `=destroy`*(g: FacetGuard) =
if not g.facet.isNil:
dec g.facet.inertCheckPreventers
proc `=copy`*(dst: var FacetGuard, src: FacetGuard) =
dst.facet = src.facet
inc dst.facet.inertCheckPreventers

View File

@ -42,7 +42,7 @@ type DeprecatedBootProc = proc (ds: Cap; turn: var Turn) {.closure.}
proc bootDataspace*(name: string; bootProc: BootProc): Actor =
bootActor(name) do (turn: var Turn):
discard turn.facet.preventInertCheck()
turn.preventInertCheck()
bootProc(turn, newDataspace(turn))
proc bootDataspace*(name: string; bootProc: DeprecatedBootProc): Actor {.deprecated.} =

View File

@ -161,9 +161,6 @@ proc lookupLocal(relay; oid: Oid): Cap =
if not sym.isNil:
result = sym.cap
proc isInert(r: Cap): bool =
r.target.isNil
proc rewriteCapIn(relay; facet; n: WireRef, imported: var seq[WireSymbol]): Cap =
case n.orKind
of WireRefKind.mine:
@ -262,12 +259,12 @@ type
proc spawnRelay(name: string; turn: var Turn; opts: RelayActorOptions; setup: RelaySetup) =
spawnLink(name, turn) do (turn: var Turn):
turn.preventInertCheck()
let relay = Relay(
facet: turn.facet,
packetWriter: opts.packetWriter,
wireBuf: newBufferedDecoder(0),
)
discard relay.facet.preventInertCheck()
if not opts.initialCap.isNil:
var exported: seq[WireSymbol]
discard rewriteCapOut(relay, opts.initialCap, exported)
@ -319,10 +316,10 @@ when defined(posix):
stopActor(entity.facet)
else:
entity.relay.recv(buf[], 0..<n)
close(entity.stdin)
proc connectTransport(turn: var Turn; ds: Cap; ta: transportAddress.Stdio) =
## Connect to an external dataspace over stdio.
let localDataspace = newDataspace(turn)
proc stdoutWriter(turn: var Turn; buf: seq[byte]) =
## Blocking write to stdout.
let n = writeBytes(stdout, buf, 0, buf.len)
@ -331,7 +328,7 @@ when defined(posix):
stopActor(turn)
var opts = RelayActorOptions(
packetWriter: stdoutWriter,
initialCap: ds,
initialCap: localDataspace,
initialOid: 0.Oid.some,
)
spawnRelay("stdio", turn, opts) do (turn: var Turn; relay: Relay):
@ -346,12 +343,14 @@ when defined(posix):
facet: turn.facet, relay: relay, stdin: newAsyncFile(FD fd))
onStop(entity.facet) do (turn: var Turn):
entity.alive = false
close(entity.stdin)
# Close stdin to remove it from the ioqueue
discard trampoline:
whelp loop(entity)
publish(turn, ds, TransportConnection(
`addr`: ta.toPreserves,
control: newCap(entity, turn),
resolved: relay.peer.accepted,
resolved: localDataspace.accepted,
))
proc connectStdio*(turn: var Turn; ds: Cap) =
@ -382,7 +381,9 @@ when defined(posix):
template bootSocketEntity() {.dirty.} =
proc setup(turn: var Turn) {.closure.} =
proc kill(turn: var Turn) =
entity.alive = false
if entity.alive:
entity.alive = false
close(entity.sock)
onStop(turn, kill)
publish(turn, ds, TransportConnection(
`addr`: ta.toPreserves,
@ -400,7 +401,7 @@ when defined(posix):
stopActor(entity.facet)
else:
entity.relay.recv(buf[], 0..<n)
close(entity.sock)
# the socket closes when the actor is stopped
proc boot(entity: TcpEntity; ta: transportAddress.Tcp; ds: Cap) {.asyncio.} =
entity.sock = connectTcpAsync(ta.host, Port ta.port)
@ -560,11 +561,16 @@ proc resolve*(turn: var Turn; ds: Cap; route: Route; bootProc: BootProc) =
proc resolveEnvironment*(turn: var Turn; bootProc: BootProc) =
## Resolve a capability from the calling environment
## and call `bootProc`. See envRoute_.
var resolved = false
let
ds = newDataspace(turn)
pat = ResolvePath ?: {0: ?envRoute(), 3: ?:ResolvedAccepted}
during(turn, ds, pat) do (dst: Cap):
bootProc(turn, dst)
if not resolved:
resolved = true
bootProc(turn, dst)
do:
resolved = false
spawnRelays(turn, ds)
# TODO: define a runActor that comes preloaded with relaying

View File

@ -1,6 +1,6 @@
# Package
version = "20240308"
version = "20240315"
author = "Emery Hemingway"
description = "Syndicated actors for conversational concurrency"
license = "Unlicense"
@ -9,4 +9,4 @@ srcDir = "src"
# Dependencies
requires "https://github.com/ehmry/hashlib.git >= 20231130", "nim >= 2.0.0", "https://git.syndicate-lang.org/ehmry/preserves-nim.git >= 20240208", "https://github.com/ehmry/nim-sys.git#4ef3b624db86e331ba334e705c1aa235d55b05e1", "https://github.com/nim-works/cps"
requires "https://github.com/ehmry/hashlib.git >= 20231130", "nim >= 2.0.0", "https://git.syndicate-lang.org/ehmry/preserves-nim.git >= 20240312", "https://github.com/ehmry/nim-sys.git#4ef3b624db86e331ba334e705c1aa235d55b05e1", "https://github.com/nim-works/cps"

View File

@ -1,7 +1,8 @@
# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense
import std/[asyncdispatch, asyncfile, parseopt]
import std/[oserrors, parseopt, posix, strutils]
import pkg/sys/[files, handles, ioqueue]
import preserves, syndicate, syndicate/relays
type
@ -10,33 +11,51 @@ type
Says {.preservesRecord: "Says".} = object
who, what: string
proc readStdin(facet: Facet; ds: Cap; username: string) =
let file = openAsync("/dev/stdin")
onStop(facet) do (turn: var Turn): close(file)
close(stdin)
proc readLine() {.gcsafe.} =
let future = readLine(file)
addCallback(future, facet) do (turn: var Turn):
var msg = read(future)
if msg == "": quit()
message(turn, ds, Says(who: username, what: msg))
readLine()
readLine()
proc syncAndStop(facet: Facet; cap: Cap) =
## Stop the actor responsible for `facet` after
## synchronizing with `cap`.
run(facet) do (turn: var Turn):
sync(turn, cap, stopActor)
proc readStdin(facet: Facet; ds: Cap; username: string) {.asyncio.} =
let
fd = stdin.getOsFileHandle()
flags = fcntl(fd.cint, F_GETFL, 0)
if flags < 0:
raiseOSError(osLastError())
if fcntl(fd.cint, F_SETFL, flags or O_NONBLOCK) < 0:
raiseOSError(osLastError())
let
file = newAsyncFile(FD fd)
buf = new string
buf[].setLen(0x1000)
while true:
let n = read(file, buf)
if n < 1:
stderr.writeLine "test_chat calls stopsActor ", facet.actor
syncAndStop(facet, ds)
return
else:
var msg = buf[][0..<n].strip
proc send(turn: var Turn) =
message(turn, ds, Says(who: username, what: msg))
run(facet, send)
proc chat(turn: var Turn; ds: Cap; username: string) =
during(turn, ds, ?:Present) do (who: string):
echo who, " joined"
do:
echo who, " left"
during(turn, ds, ?:Present) do (who: string):
echo who, " joined"
do:
echo who, " left"
onMessage(turn, ds, ?:Says) do (who: string, what: string):
echo who, ": ", what
onMessage(turn, ds, ?:Says) do (who: string, what: string):
echo who, ": ", what
discard publish(turn, ds, Present(username: username))
readStdin(turn.facet, ds, username)
discard publish(turn, ds, Present(username: username))
discard trampoline:
whelp readStdin(turn.facet, ds, username)
proc main =
let route = envRoute()
var username = ""
for kind, key, val in getopt():
@ -48,9 +67,8 @@ proc main =
if username == "":
stderr.writeLine "--user: unspecified"
else:
runActor("chat") do (turn: var Turn; root: Cap):
spawnRelays(turn, root)
resolve(turn, root, route) do (turn: var Turn; ds: Cap):
runActor("chat") do (turn: var Turn):
resolveEnvironment(turn) do (turn: var Turn; ds: Cap):
chat(turn, ds, username)
main()

View File

@ -7,7 +7,7 @@ import syndicate, syndicate/actors/timers
let actor = bootActor("timer-test") do (turn: var Turn):
let timers = newDataspace(turn)
spawnTimerActor(timers, turn)
spawnTimerActor(turn, timers)
onPublish(turn, timers, ?LaterThan(seconds: 1356100000)):
echo "now in 13th bʼakʼtun"