Compare commits

...

2 Commits

Author SHA1 Message Date
Emery Hemingway 71af7b0d72 CPS refactor 2024-02-19 22:40:48 +00:00
Emery Hemingway b2506d81bb Hash brownie 2024-02-18 00:56:59 +00:00
43 changed files with 2946 additions and 551 deletions

View File

@ -38,11 +38,11 @@
"packages": [ "packages": [
"nimcrypto" "nimcrypto"
], ],
"path": "/nix/store/7b491gv9zlayilsh8k2gnyzw6znrh7xq-source", "path": "/nix/store/jwz8pqbv6rsm8w4fjzdb37r0wzjn5hv0-source",
"rev": "70151aa132f3a771996117c23c1fcaa8446a6f35", "rev": "d58da671799c69c0b3208b96c154e13c8b1a9e90",
"sha256": "1ldjz02p70wagqvk6vgcg16kjh7pkm1394qd1pcdmg8z39bm5ag3", "sha256": "12dm0gsy10ppga7zf7hpf4adaqjrd9b740n2w926xyazq1njf6k9",
"srcDir": "", "srcDir": "",
"url": "https://github.com/cheatfate/nimcrypto/archive/70151aa132f3a771996117c23c1fcaa8446a6f35.tar.gz" "url": "https://github.com/cheatfate/nimcrypto/archive/d58da671799c69c0b3208b96c154e13c8b1a9e90.tar.gz"
}, },
{ {
"method": "fetchzip", "method": "fetchzip",

View File

@ -1,2 +0,0 @@
include_rules
: foreach *.nim |> !nim_check |>

699
src/sam/actors.bak Normal file
View File

@ -0,0 +1,699 @@
# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense
import std/[deques, hashes, monotimes, options, sets, sequtils, tables, times]
import pkg/cps
import preserves
import ../syndicate/protocols/[protocol, sturdy]
export cps
const traceSyndicate {.booldefine.}: bool = true
when traceSyndicate:
import std/streams
from std/os import getEnv
import ./protocols/trace
type TraceSink = ref object
stream: FileStream
proc newTraceSink: TraceSink =
new result
let path = getEnv("SYNDICATE_TRACE_FILE", "")
case path
of "": quit"$SYNDICATE_TRACE_FILE unset"
of "-": actor.stream = newFileStream(stderr)
else: result.stream = openFileStream(path, fmWrite)
proc write(s: TraceSink; e: TraceEntry) = s.write(e.toPreserves)
export Handle
template generateIdType(typ: untyped) =
type typ* = distinct Natural
proc `==`*(x, y: typ): bool {.borrow.}
proc `$`*(id: typ): string {.borrow.}
generateIdType(ActorId)
generateIdType(FacetId)
generateIdType(EndpointId)
generateIdType(FieldId)
generateIdType(TurnId)
type
Oid = sturdy.Oid
Caveat = sturdy.Caveat
Attenuation = seq[Caveat]
Rewrite = sturdy.Rewrite
AssertionRef* = ref object
value*: Value
# if the Enity methods take a Value object then the generated
# C code has "redefinition of struct" problems when orc is enabled
Entity* = ref object of RootObj
oid*: Oid # oid is how Entities are identified over the wire
Cap* {.preservesEmbedded.} = ref object of EmbeddedObj
relay*: Facet
target*: Entity
attenuation*: Attenuation
Ref* {.deprecated: "Ref was renamed to Cap".} = Cap
OutboundAssertion = ref object
handle: Handle
peer: Cap
established: bool
OutboundTable = Table[Handle, OutboundAssertion]
Actor* = ref object
name: string
handleAllocator: ref Handle
# a fresh actor gets a new ref Handle and
# all actors spawned from it get the same ref.
root: Facet
exitReason: ref Exception
exitHooks: seq[TurnAction]
id: ActorId
exiting, exited: bool
when traceSyndicate:
turnIdAllocator: ref TurnId
traceStream: FileStream
TurnAction* = proc (t: Turn)
Queues = TableRef[Facet, Deque[Cont]]
Turn* = ref object
facet: Facet
queues: Queues
when traceSyndicate:
desc: TurnDescription
Cont* = ref object of Continuation
turn*: Turn
Facet* = ref FacetObj
FacetObj = object
actor*: Actor
parent: Facet
children: HashSet[Facet]
outbound: OutboundTable
shutdownActions: seq[TurnAction]
inertCheckPreventers: int
id: FacetId
isAlive: bool
proc pass*(a, b: Cont): Cont =
assert not a.turn.isNil
b.turn = move a.turn
return b
template turnAction*(prc: typed): untyped =
cps(Cont, prc)
proc activeTurn*(c: Cont): Turn {.cpsVoodoo.} =
assert not c.turn.isNil
c.turn
when traceSyndicate:
proc nextTurnId(facet: Facet): TurnId =
result = succ(facet.actor.turnIdAllocator[])
facet.actor.turnIdAllocator[] = result
proc trace(actor: Actor; act: ActorActivation) =
assert not actor.traceStream.isNil
var entry = TraceEntry(
timestamp: getTime().toUnixFloat(),
actor: initRecord("named", actor.name.toPreserves),
item: act)
actor.traceStream.writeLine($entry.toPreserves)
proc path(facet: Facet): seq[trace.FacetId] =
var f = facet
while not f.isNil:
result.add f.id.toPreserves
f = f.parent
method publish*(e: Entity; turn: Turn; v: AssertionRef; h: Handle) {.base.} = discard
method retract*(e: Entity; turn: Turn; h: Handle) {.base.} = discard
method message*(e: Entity; turn: Turn; v: AssertionRef) {.base.} = discard
method sync*(e: Entity; turn: Turn; peer: Cap) {.base.} = discard
using
actor: Actor
facet: Facet
turn: Turn
action: TurnAction
proc labels(f: Facet): string =
proc catLabels(f: Facet; labels: var string) =
labels.add ':'
if not f.parent.isNil:
catLabels(f.parent, labels)
labels.add ':'
when traceSyndicate:
labels.add $f.id
result.add f.actor.name
catLabels(f, result)
proc `$`*(f: Facet): string =
"<Facet:" & f.labels & ">"
proc `$`*(r: Cap): string =
"<Ref:" & r.relay.labels & ">"
proc `$`*(actor: Actor): string =
"<Actor:" & actor.name & ">" # TODO: ambigous
proc attenuate(r: Cap; a: Attenuation): Cap =
if a.len == 0: result = r
else: result = Cap(
relay: r.relay,
target: r.target,
attenuation: a & r.attenuation)
proc hash*(facet): Hash =
facet.id.hash
proc hash*(r: Cap): Hash = !$(r.relay.hash !& r.target.unsafeAddr.hash)
proc nextHandle(facet: Facet): Handle =
result = succ(facet.actor.handleAllocator[])
facet.actor.handleAllocator[] = result
proc facet*(turn: Turn): Facet = turn.facet
proc enqueue(turn: Turn; target: Facet; cont: Cont) =
cont.turn = turn
if target in turn.queues:
turn.queues[target].addLast cont
else:
turn.queues[target] = toDeque([cont])
type Bindings = Table[Value, Value]
proc match(bindings: var Bindings; p: Pattern; v: Value): bool =
case p.orKind
of PatternKind.Pdiscard: result = true
of PatternKind.Patom:
result = case p.patom
of PAtom.Boolean: v.isBoolean
of PAtom.Double: v.isDouble
of PAtom.Signedinteger: v.isInteger
of PAtom.String: v.isString
of PAtom.Bytestring: v.isByteString
of PAtom.Symbol: v.isSymbol
of PatternKind.Pembedded:
result = v.isEmbedded
of PatternKind.Pbind:
if match(bindings, p.pbind.pattern, v):
bindings[p.pbind.pattern.toPreserves] = v
result = true
of PatternKind.Pand:
for pp in p.pand.patterns:
result = match(bindings, pp, v)
if not result: break
of PatternKind.Pnot:
var b: Bindings
result = not match(b, p.pnot.pattern, v)
of PatternKind.Lit:
result = p.lit.value == v
of PatternKind.PCompound:
case p.pcompound.orKind
of PCompoundKind.rec:
if v.isRecord and
p.pcompound.rec.label == v.label and
p.pcompound.rec.fields.len == v.arity:
result = true
for i, pp in p.pcompound.rec.fields:
if not match(bindings, pp, v[i]):
result = false
break
of PCompoundKind.arr:
if v.isSequence and p.pcompound.arr.items.len == v.sequence.len:
result = true
for i, pp in p.pcompound.arr.items:
if not match(bindings, pp, v[i]):
result = false
break
of PCompoundKind.dict:
if v.isDictionary:
result = true
for key, pp in p.pcompound.dict.entries:
let vv = step(v, key)
if vv.isNone or not match(bindings, pp, get vv):
result = true
break
proc match(p: Pattern; v: Value): Option[Bindings] =
var b: Bindings
if match(b, p, v):
result = some b
proc instantiate(t: Template; bindings: Bindings): Value =
case t.orKind
of TemplateKind.Tattenuate:
let v = instantiate(t.tattenuate.template, bindings)
let cap = v.unembed(Cap)
if cap.isNone:
raise newException(ValueError, "Attempt to attenuate non-capability")
result = attenuate(get cap, t.tattenuate.attenuation).embed
of TemplateKind.TRef:
let n = $t.tref.binding.int
try: result = bindings[n.toPreserves]
except KeyError:
raise newException(ValueError, "unbound reference: " & n)
of TemplateKind.Lit:
result = t.lit.value
of TemplateKind.Tcompound:
case t.tcompound.orKind
of TCompoundKind.rec:
result = initRecord(t.tcompound.rec.label, t.tcompound.rec.fields.len)
for i, tt in t.tcompound.rec.fields:
result[i] = instantiate(tt, bindings)
of TCompoundKind.arr:
result = initSequence(t.tcompound.arr.items.len)
for i, tt in t.tcompound.arr.items:
result[i] = instantiate(tt, bindings)
of TCompoundKind.dict:
result = initDictionary()
for key, tt in t.tcompound.dict.entries:
result[key] = instantiate(tt, bindings)
proc rewrite(r: Rewrite; v: Value): Value =
let bindings = match(r.pattern, v)
if bindings.isSome:
result = instantiate(r.template, get bindings)
proc examineAlternatives(cav: Caveat; v: Value): Value =
case cav.orKind
of CaveatKind.Rewrite:
result = rewrite(cav.rewrite, v)
of CaveatKind.Alts:
for r in cav.alts.alternatives:
result = rewrite(r, v)
if not result.isFalse: break
of CaveatKind.Reject: discard
of CaveatKind.unknown: discard
proc runRewrites*(a: Attenuation; v: Value): Value =
result = v
for stage in a:
result = examineAlternatives(stage, result)
if result.isFalse: break
proc publish(target: Entity; e: OutboundAssertion; a: AssertionRef) {.turnAction.} =
e.established = true
publish(target, activeTurn(), a, e.handle)
proc publish(turn: Turn; r: Cap; v: Value; h: Handle) =
var a = runRewrites(r.attenuation, v)
if not a.isFalse:
let e = OutboundAssertion(
handle: h, peer: r, established: false)
turn.facet.outbound[h] = e
enqueue(turn, r.relay, whelp publish(r.target, e, AssertionRef(value: a)))
when traceSyndicate:
var act = ActionDescription(orKind: ActionDescriptionKind.enqueue)
act.enqueue.event.target.actor = turn.facet.actor.id.toPreserves
act.enqueue.event.target.facet = turn.facet.id.toPreserves
act.enqueue.event.target.oid = r.target.oid.toPreserves
act.enqueue.event.detail = trace.TurnEvent(orKind: TurnEventKind.assert)
act.enqueue.event.detail.assert.assertion.value.value =
mapEmbeds(v) do (cap: Value) -> Value: discard
act.enqueue.event.detail.assert.handle = h
turn.desc.actions.add act
proc publish*(turn: Turn; r: Cap; a: Value): Handle =
result = turn.facet.nextHandle()
publish(turn, r, a, result)
proc publish*(r: Cap; a: Value): Handle {.turnAction.} =
publish(activeTurn(), r, a, result)
proc retract(e: OutboundAssertion) {.turnAction.} =
if e.established:
e.established = false
e.peer.target.retract(activeTurn(), e.handle)
proc retract(turn: Turn; e: OutboundAssertion) =
enqueue(turn, e.peer.relay, whelp retract(e))
proc retract*(turn: Turn; h: Handle) =
var e: OutboundAssertion
if turn.facet.outbound.pop(h, e):
turn.retract(e)
proc message(target: Entity; a: AssertionRef) {.turnAction.} =
target.message(activeTurn(), a)
proc message*(turn: Turn; r: Cap; v: Value) =
var a = runRewrites(r.attenuation, v)
if not a.isFalse:
enqueue(turn, r.relay, whelp message(r.target, AssertionRef(value: a)))
proc message*(target: Cap; value: Value) {.turnAction.} =
message(activeTurn(), target, value)
proc sync(e: Entity; peer: Cap) {.turnAction.} =
e.sync(activeTurn(), peer)
proc sync(turn: Turn; e: Entity; peer: Cap) =
e.sync(turn, peer)
proc sync*(turn: Turn; r, peer: Cap) =
enqueue(turn, r.relay, whelp sync(r.target, peer))
proc replace*[T](turn: Turn; cap: Cap; h: Handle; v: T): Handle =
result = publish(turn, cap, v)
if h != default(Handle):
retract(turn, h)
proc replace*[T](turn: Turn; cap: Cap; h: var Handle; v: T): Handle {.discardable.} =
var old = h
h = publish(turn, cap, v)
if old != default(Handle):
retract(turn, old)
h
proc stop*(turn: Turn)
proc run*(facet; action: TurnAction; zombieTurn = false)
proc newFacet(actor; parent: Facet; initialAssertions: OutboundTable): Facet =
result = Facet(
id: getMonoTime().ticks.FacetId,
actor: actor,
parent: parent,
outbound: initialAssertions,
isAlive: true)
if not parent.isNil: parent.children.incl result
proc newFacet(actor; parent: Facet): Facet =
var initialAssertions: OutboundTable
newFacet(actor, parent, initialAssertions)
proc isInert(facet): bool =
result = facet.children.len == 0 and
(facet.outbound.len == 0 or facet.parent.isNil) and
facet.inertCheckPreventers == 0
proc preventInertCheck*(facet): (proc() {.gcsafe.}) {.discardable.} =
var armed = true
inc facet.inertCheckPreventers
proc disarm() =
if armed:
armed = false
dec facet.inertCheckPreventers
result = disarm
proc inFacet(turn: Turn; facet; act: TurnAction) =
## Call an action with a facet using a temporary `Turn`
## that shares the `Queues` of the calling `Turn`.
var t = Turn(facet: facet, queues: turn.queues)
act(t)
proc terminate(actor; turn; reason: ref Exception)
proc terminate(facet; turn: Turn; orderly: bool) =
if facet.isAlive:
facet.isAlive = false
let parent = facet.parent
if not parent.isNil:
parent.children.excl facet
block:
var turn = Turn(facet: facet, queues: turn.queues)
while facet.children.len > 0:
facet.children.pop.terminate(turn, orderly)
if orderly:
for act in facet.shutdownActions:
act(turn)
for a in facet.outbound.values: turn.retract(a)
if orderly:
if not parent.isNil:
if parent.isInert:
parent.terminate(turn, true)
else:
terminate(facet.actor, turn, nil)
when traceSyndicate:
var act = ActionDescription(orKind: ActionDescriptionKind.facetStop)
act.facetstop.path = facet.path
turn.desc.actions.add act
proc stopIfInert() {.turnAction.} =
let turn = activeTurn()
if (not turn.facet.parent.isNil and (not turn.facet.parent.isAlive)) or turn.facet.isInert:
stop(turn)
proc stopIfInertAfter(action: TurnAction): TurnAction =
proc wrapper(turn: Turn) =
action(turn)
enqueue(turn, turn.facet, whelp stopIfInert())
wrapper
proc newFacet*(turn: Turn): Facet = newFacet(turn.facet.actor, turn.facet)
proc inFacet*(turn: Turn; bootProc: TurnAction): Facet =
result = newFacet(turn)
when traceSyndicate:
var act = ActionDescription(orKind: ActionDescriptionKind.facetstart)
act.facetstart.path.add result.path
turn.desc.actions.add act
inFacet(turn, result, stopIfInertAfter(bootProc))
proc facet*(turn: Turn; bootProc: TurnAction): Facet {.deprecated.} = inFacet(turn, bootProc)
proc run(actor; bootProc: TurnAction; initialAssertions: OutboundTable) =
run(newFacet(actor, actor.root, initialAssertions), stopIfInertAfter(bootProc))
proc run(actor; bootProc: TurnAction) =
var initialAssertions: OutboundTable
run(newFacet(actor, actor.root, initialAssertions), stopIfInertAfter(bootProc))
proc newActor(name: string; handleAlloc: ref Handle): Actor =
let
now = getTime()
seed = now.toUnix * 1_000_000_000 + now.nanosecond
result = Actor(
name: name,
id: ActorId(seed),
handleAllocator: handleAlloc,
)
result.root = newFacet(result, nil)
when traceSyndicate:
new result.turnIdAllocator
proc newActor*(name: string): Actor =
newActor(name, new(ref Handle))
proc bootActor*(name: string; bootProc: TurnAction) =
var
initialAssertions: OutboundTable
actor = newActor(name)
when traceSyndicate:
let path = getEnv("SYNDICATE_TRACE_FILE", "/tmp/" & name & ".trace.pr")
case path
of "": stderr.writeLine "$SYNDICATE_TRACE_FILE unset, not tracing actor ", name
of "-": actor.traceStream = newFileStream(stderr)
else: actor.traceStream = openFileStream(path, fmWrite)
when traceSyndicate:
var act = ActorActivation(orKind: ActorActivationKind.start)
act.start.actorName = Name(orKind: NameKind.named)
act.start.actorName.named.name = name.toPreserves
var entry = TraceEntry(
timestamp: getTime().toUnixFloat(),
item: act)
actor.traceStream.writeLine($entry.toPreserves)
let turn = newTurn(actor, TurnCauseExternal(description: "top-level actor"))
run(actor, bootProc, initialAssertions)
proc bootActor*(name: string; cont: Cont) =
bootActor(name) do (turn: Turn):
enqueue(turn, turn.facet, cont)
proc spawnActor(actor: Actor; bootProc: TurnAction; initialAssertions: HashSet[Handle]) {.turnAction.} =
let turn = activeTurn()
var newOutBound: Table[Handle, OutboundAssertion]
for key in initialAssertions:
discard turn.facet.outbound.pop(key, newOutbound[key])
when traceSyndicate:
actor.turnIdAllocator = turn.facet.actor.turnIdAllocator
actor.traceStream = turn.facet.actor.traceStream
var act = ActionDescription(orKind: ActionDescriptionKind.spawn)
act.spawn.id = actor.id.toPreserves
turn.desc.actions.add act
run(actor, bootProc, newOutBound)
proc spawn*(name: string; turn: Turn; bootProc: TurnAction; initialAssertions = initHashSet[Handle]()): Actor {.discardable.} =
let actor = newActor(name, turn.facet.actor.handleAllocator)
enqueue(turn, turn.facet, whelp spawnActor(actor, bootProc, initialAssertions))
actor
proc newInertCap*(): Cap =
let a = newActor("inert")
run(a) do (turn: Turn): turn.stop()
Cap(relay: a.root)
proc atExit*(actor; action) = actor.exitHooks.add action
proc terminate(actor: Actor; orderly: bool) {.turnAction.} =
actor.root.terminate(activeTurn(), orderly)
actor.exited = true
proc terminate(actor; turn; reason: ref Exception) =
if not actor.exiting:
actor.exiting = true
actor.exitReason = reason
when traceSyndicate:
var act = ActorActivation(orKind: ActorActivationKind.stop)
if not reason.isNil:
act.stop.status = ExitStatus(orKind: ExitStatusKind.Error)
act.stop.status.error.message = reason.msg
trace(actor, act)
for hook in actor.exitHooks: hook(turn)
enqueue(turn, actor.root, whelp terminate(actor, reason.isNil))
proc terminate*(facet; e: ref Exception) =
run(facet.actor.root) do (turn: Turn):
facet.actor.terminate(turn, e)
#[
proc asyncCheck*(facet: Facet; fut: FutureBase) =
## Sets a callback on `fut` which propagates exceptions to `facet`.
addCallback(fut) do ():
if fut.failed: terminate(facet, fut.error)
proc asyncCheck*(turn; fut: FutureBase) =
## Sets a callback on `fut` which propagates exceptions to the facet of `turn`.
asyncCheck(turn.facet, fut)
]#
template tryFacet(facet; body: untyped) =
try: body
except CatchableError as err: terminate(facet, err)
proc run(facet: Facet; turn: Turn; deq: var Deque[Cont]): int =
## Return the number of continuations processed.
while deq.len > 0:
var c = deq.popFirst()
try:
while not c.isNil and not c.fn.isNil:
c.turn = turn
var y = c.fn
var x = y(c)
inc(result)
c = Cont(x)
except CatchableError as err:
if not c.dismissed:
writeStackFrames c
terminate(facet, err)
stderr.writeLine("ran ", result, " continuations for ", facet)
proc run*(facet; action: TurnAction; zombieTurn = false) =
if zombieTurn or (facet.actor.exitReason.isNil and facet.isAlive):
tryFacet(facet):
var queues = newTable[Facet, Deque[Cont]]()
var turn = Turn(facet: facet, queues: queues)
action(turn)
when traceSyndicate:
turn.desc.id = facet.nextTurnId.toPreserves
facet.actor.trace ActorActivation(
orKind: ActorActivationKind.turn, turn: turn.desc)
assert not turn.isNil
var n = 1
while n > 0:
n = 0
var facets = queues.keys.toSeq
for facet in facets:
n.inc run(facet, turn, queues[facet])
proc run*(cap: Cap; action: TurnAction) =
## Convenience proc to run a `TurnAction` in the scope of a `Cap`.
run(cap.relay, action)
#[
proc addCallback*(fut: FutureBase; facet: Facet; act: TurnAction) =
## Add a callback to a `Future` that will be called at a later `Turn`
## within the context of `facet`.
addCallback(fut) do ():
if fut.failed: terminate(facet, fut.error)
else:
when traceSyndicate:
run(facet) do (turn: Turn):
turn.desc.cause = TurnCause(orKind: TurnCauseKind.external)
turn.desc.cause.external.description = "Future".toPreserves
act(turn)
else:
run(facet, act)
proc addCallback*(fut: FutureBase; turn: Turn; act: TurnAction) =
## Add a callback to a `Future` that will be called at a later `Turn`
## with the same context as the current.
if fut.failed:
terminate(turn.facet, fut.error)
elif fut.finished:
enqueue(turn, turn.facet, act)
else:
addCallback(fut, turn.facet, act)
proc addCallback*[T](fut: Future[T]; turn: Turn; act: proc (t: Turn, x: T) {.gcsafe.}) =
addCallback(fut, turn) do (turn: Turn):
if fut.failed: terminate(turn.facet, fut.error)
else:
when traceSyndicate:
turn.desc.cause = TurnCause(orKind: TurnCauseKind.external)
turn.desc.cause.external.description = "Future".toPreserves
act(turn, read fut)
]#
proc stop*(turn: Turn, facet: Facet) =
if facet.parent.isNil:
facet.terminate(turn, true)
else:
enqueue(turn, facet.parent, whelp terminate(facet.actor, true))
# TODO: terminate the actor?
proc stop*(turn: Turn) =
stop(turn, turn.facet)
proc onStop*(facet: Facet; act: TurnAction) =
## Add a `proc (turn: Turn)` action to `facet` to be called as it stops.
add(facet.shutdownActions, act)
proc stopActor*(turn: Turn) =
let actor = turn.facet.actor
enqueue(turn, actor.root, whelp terminate(actor, true))
proc freshen*(turn: Turn, act: TurnAction) =
assert(turn.queues.len == 0, "Attempt to freshen a non-stale Turn")
run(turn.facet, act)
proc newCap*(relay: Facet; e: Entity): Cap =
Cap(relay: relay, target: e)
proc newCap*(turn; e: Entity): Cap =
Cap(relay: turn.facet, target: e)
proc newCap*(e: Entity; turn): Cap =
Cap(relay: turn.facet, target: e)
type SyncContinuation {.final.} = ref object of Entity
action: TurnAction
method message(entity: SyncContinuation; turn: Turn; v: AssertionRef) =
entity.action(turn)
proc sync*(turn: Turn; refer: Cap; act: TurnAction) =
sync(turn, refer, newCap(turn, SyncContinuation(action: act)))
proc running*(actor): bool =
result = not actor.exited
if not (result or actor.exitReason.isNil):
raise actor.exitReason
proc newCap*(e: Entity): Cap {.turnAction.} =
Cap(relay: activeTurn().facet, target: e)

552
src/sam/actors.nim Normal file
View File

@ -0,0 +1,552 @@
# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense
import std/[deques, hashes, options, tables, times]
import pkg/cps
import preserves
import ./protocols/[protocol, sturdy]
# const traceSyndicate {.booldefine.}: bool = true
const traceSyndicate* = true
when traceSyndicate:
import std/streams
from std/os import getEnv
import ./protocols/trace
export protocol.Handle
type
Cont* = ref object of Continuation
facet: Facet
PublishProc* = proc (e: Entity; v: Value; h: Handle) {.cps: Cont.}
RetractProc* = proc (e: Entity; h: Handle) {.cps: Cont.}
MessageProc* = proc (e: Entity; v: Value) {.cps: Cont.}
SyncProc* = proc (e: Entity; peer: Cap) {.cps: Cont.}
Handler* = proc() {.closure.}
Work = Deque[Cont]
HandlerDeque = seq[ContinuationProc[Continuation]]
FacetState = enum fFresh, fRunning, fEnded
Callback* = proc () {.closure.}
OutboundTable = Table[Handle, OutboundAssertion]
OutboundAssertion = ref object
handle: Handle
peer: Cap
established: bool
Facet* = ref object
## https://synit.org/book/glossary.html#facet
actor: Actor
parent: Facet
children: seq[Facet]
outbound: OutboundTable
stopHandlers: HandlerDeque
stopCallbacks: seq[Callback]
state: FacetState
id: FacetId
FacetProc* = proc (f: Facet) {.closure.}
## Type for callbacks to be called within a turn.
## The `Facet` parameter is the owning facet.
Turn {.byref.} = object
## https://synit.org/book/glossary.html#turn
facet: Facet
entity: Entity
work: Work
actions: seq[Cont]
event: Option[protocol.Event]
when traceSyndicate:
desc: TurnDescription
Entity* = ref object of RootObj
## https://synit.org/book/glossary.html#entity
oid*: sturdy.Oid # oid is how Entities are identified over the wire
publishImpl*: PublishProc
retractImpl*: RetractProc
messageImpl*: MessageProc
syncImpl*: SyncProc
Cap* {.final, preservesEmbedded.} = ref object of EmbeddedObj
relay*: Facet
target*: Entity
attenuation*: seq[sturdy.Caveat]
Actor* = ref object
## https://synit.org/book/glossary.html#actor
# TODO: run on a seperate thread.
# crashHandlers: HandlerDeque
root: Facet
handleAllocator: Handle
facetIdAllocator: int
id: ActorId
turn: Turn
when traceSyndicate:
traceStream: FileStream
stopped: bool
template turnWork*(prc: typed): untyped =
cps(Cont, prc)
proc activeFacet*(c: Cont): Facet {.cpsVoodoo.} =
## Return the active `Facet` within a `{.syndicate.}` context.
assert not c.facet.isNil
c.facet
using
actor: Actor
facet: Facet
entity: Entity
cap: Cap
turn: Turn
proc `$`*(facet): string = $facet.id
proc `$`*(cap): string = "#:…"
proc hash*(facet): Hash = facet.unsafeAddr.hash
proc hash*(cap): Hash = cap.unsafeAddr.hash
proc newFacet(actor: Actor; parent: Facet): Facet =
inc(actor.facetIdAllocator)
result = Facet(
actor: actor,
parent: parent,
id: actor.facetIdAllocator.toPreserves,
)
proc stopped*(facet): bool = facet.state != fRunning
proc newActor(name: string): Actor =
result = Actor(id: name.toPreserves)
result.root = newFacet(result, nil)
when traceSyndicate:
let path = getEnv("SYNDICATE_TRACE_FILE", "")
case path
of "": discard
of "-": result.traceStream = newFileStream(stderr)
else: result.traceStream = openFileStream(path, fmWrite)
when traceSyndicate:
proc trace(actor; act: ActorActivation) =
if not actor.traceStream.isNil:
var entry = TraceEntry(
timestamp: getTime().toUnixFloat(),
actor: actor.id,
item: act,
)
actor.traceStream.writeLine($entry.toPreserves)
proc traceTurn(actor) =
if not actor.traceStream.isNil:
actor.trace(ActorActivation(
orKind: ActorActivationKind.turn,
turn: actor.turn.desc,
))
proc traceTarget(facet): trace.Target =
Target(
actor: facet.actor.id,
facet: facet.id,
)
proc traceTarget(cap): trace.Target =
let facet = cap.relay
Target(
actor: facet.actor.id,
facet: facet.id,
oid: cap.target.oid.toPreserves,
)
proc traceEnqueue(actor; e: TargetedTurnEvent) =
actor.turn.desc.actions.add ActionDescription(
orKind: ActionDescriptionKind.enqueue,
enqueue: ActionDescriptionEnqueue(event: e),
)
proc traceDequeue(actor; e: TargetedTurnEvent) =
actor.turn.desc.actions.add ActionDescription(
orKind: ActionDescriptionKind.dequeue,
dequeue: ActionDescriptionDequeue(event: e),
)
proc pass*(a, b: Cont): Cont =
assert not a.facet.isNil
b.facet = a.facet
b
proc queueWork(facet; c: Cont) =
c.facet = facet
facet.actor.turn.work.addLast(c)
proc yieldWork(c: Cont): Cont {.cpsMagic.} =
## Suspend and enqueue the caller until later in the turn.
assert not c.facet.isNil
c.facet.queueWork(c)
nil
proc yieldToActions(c: Cont): Cont {.cpsMagic.} =
assert not c.facet.isNil
c.facet.actor.turn.actions.add(c)
nil
proc startExternalTurn(facet) =
let actor = facet.actor
assert actor.turn.work.len == 0
assert actor.turn.actions.len == 0
actor.turn.facet = facet
when traceSyndicate:
actor.turn.desc = TurnDescription(cause: TurnCause(orKind: TurnCauseKind.external))
proc terminate(actor; err: ref Exception) =
raise err
proc terminate(facet; err: ref Exception) =
terminate(facet.actor, err)
proc complete(c: Cont) =
var c = c
try:
while not c.isNil and not c.fn.isNil:
var y = c.fn
var x = y(c)
c = Cont(x)
except CatchableError as err:
if not c.dismissed:
writeStackFrames c
terminate(c.facet, err)
proc run(actor) =
assert not actor.stopped
var n = 0
while actor.turn.work.len > 0:
actor.turn.work.popFirst().complete()
inc n
echo n, " items completed from work queue"
when traceSyndicate:
actor.traceTurn()
var i: int
while i < actor.turn.actions.len:
complete(move actor.turn.actions[i])
inc i
echo i, " items completed from action queue"
turn.actions.setLen(0)
when traceSyndicate:
if actor.stopped:
trace(actor, ActorActivation(orkind: ActorActivationKind.stop))
proc start(actor; cont: Cont) =
when traceSyndicate:
var act = ActorActivation(orkind: ActorActivationKind.start)
trace(actor, act)
actor.root.state = fRunning
actor.root.startExternalTurn()
actor.root.queueWork(cont)
run(actor)
proc stop(actor)
proc collectPath(result: var seq[FacetId]; facet) =
if not facet.parent.isNil:
collectPath(result, facet.parent)
result.add(facet.id)
proc runNextStop(c: Cont; facet: Facet): Cont {.cpsMagic.} =
c.fn = facet.stopHandlers.pop()
result = c
proc runNextFacetStop() {.cps: Cont.} =
activeFacet().runNextStop()
proc stop(facet; reason: FacetStopReason) =
let actor = facet.actor
while facet.stopHandlers.len > 0:
var c = whelp runNextFacetStop()
c.facet = facet
complete(c)
while facet.stopCallbacks.len > 0:
var cb = facet.stopCallbacks.pop()
cb()
while facet.children.len > 0:
stop(facet.children.pop(), FacetStopReason.parentStopping)
when traceSyndicate:
var act = ActionDescription(orKind: ActionDescriptionKind.facetstop)
collectPath(act.facetstop.path, facet)
act.facetStop.reason = reason
actor.turn.desc.actions.add act
if facet.parent.isNil:
actor.root = nil
stop(actor)
proc stop(actor) =
if not actor.root.isNil:
stop(actor.root, FacetStopReason.actorStopping)
actor.stopped = true
proc bootActor(name: string, c: Cont) =
start(newActor(name), c)
# proc stopFacetAction(reason: FacetStopReason) {.syndicate.} =
# stop(c.facet, reason)
proc stopActorAction() {.cps: Cont.} =
activeFacet().actor.stop()
proc stopActor*(facet) =
let c = whelp stopActorAction()
c.facet = facet
facet.actor.turn.actions.add(c)
type
AssertionRef* = ref object
value*: Value
# if the Enity methods take a Value object then the generated
# C code has "redefinition of struct" problems when orc is enabled
proc newCap*(f: Facet; e: Entity): Cap =
Cap(relay: f, target: e)
proc nextHandle(facet: Facet): Handle =
inc(facet.actor.handleAllocator)
facet.actor.handleAllocator
proc actor(cap): Actor = cap.relay.actor
type Bindings = Table[Value, Value]
proc attenuate(r: Cap; a: seq[Caveat]): Cap =
if a.len == 0: result = r
else: result = Cap(
relay: r.relay,
target: r.target,
attenuation: a & r.attenuation)
proc match(bindings: var Bindings; p: Pattern; v: Value): bool =
case p.orKind
of PatternKind.Pdiscard: result = true
of PatternKind.Patom:
result = case p.patom
of PAtom.Boolean: v.isBoolean
of PAtom.Double: v.isDouble
of PAtom.Signedinteger: v.isInteger
of PAtom.String: v.isString
of PAtom.Bytestring: v.isByteString
of PAtom.Symbol: v.isSymbol
of PatternKind.Pembedded:
result = v.isEmbedded
of PatternKind.Pbind:
if match(bindings, p.pbind.pattern, v):
bindings[p.pbind.pattern.toPreserves] = v
result = true
of PatternKind.Pand:
for pp in p.pand.patterns:
result = match(bindings, pp, v)
if not result: break
of PatternKind.Pnot:
var b: Bindings
result = not match(b, p.pnot.pattern, v)
of PatternKind.Lit:
result = p.lit.value == v
of PatternKind.PCompound:
case p.pcompound.orKind
of PCompoundKind.rec:
if v.isRecord and
p.pcompound.rec.label == v.label and
p.pcompound.rec.fields.len == v.arity:
result = true
for i, pp in p.pcompound.rec.fields:
if not match(bindings, pp, v[i]):
result = false
break
of PCompoundKind.arr:
if v.isSequence and p.pcompound.arr.items.len == v.sequence.len:
result = true
for i, pp in p.pcompound.arr.items:
if not match(bindings, pp, v[i]):
result = false
break
of PCompoundKind.dict:
if v.isDictionary:
result = true
for key, pp in p.pcompound.dict.entries:
let vv = step(v, key)
if vv.isNone or not match(bindings, pp, get vv):
result = true
break
proc match(p: Pattern; v: Value): Option[Bindings] =
var b: Bindings
if match(b, p, v):
result = some b
proc instantiate(t: Template; bindings: Bindings): Value =
case t.orKind
of TemplateKind.Tattenuate:
let v = instantiate(t.tattenuate.template, bindings)
let cap = v.unembed(Cap)
if cap.isNone:
raise newException(ValueError, "Attempt to attenuate non-capability")
result = attenuate(get cap, t.tattenuate.attenuation).embed
of TemplateKind.TRef:
let n = $t.tref.binding.int
try: result = bindings[n.toPreserves]
except KeyError:
raise newException(ValueError, "unbound reference: " & n)
of TemplateKind.Lit:
result = t.lit.value
of TemplateKind.Tcompound:
case t.tcompound.orKind
of TCompoundKind.rec:
result = initRecord(t.tcompound.rec.label, t.tcompound.rec.fields.len)
for i, tt in t.tcompound.rec.fields:
result[i] = instantiate(tt, bindings)
of TCompoundKind.arr:
result = initSequence(t.tcompound.arr.items.len)
for i, tt in t.tcompound.arr.items:
result[i] = instantiate(tt, bindings)
of TCompoundKind.dict:
result = initDictionary()
for key, tt in t.tcompound.dict.entries:
result[key] = instantiate(tt, bindings)
proc rewrite(r: Rewrite; v: Value): Value =
let bindings = match(r.pattern, v)
if bindings.isSome:
result = instantiate(r.template, get bindings)
proc examineAlternatives(cav: Caveat; v: Value): Value =
case cav.orKind
of CaveatKind.Rewrite:
result = rewrite(cav.rewrite, v)
of CaveatKind.Alts:
for r in cav.alts.alternatives:
result = rewrite(r, v)
if not result.isFalse: break
of CaveatKind.Reject: discard
of CaveatKind.unknown: discard
proc runRewrites(v: Value; a: openarray[Caveat]): Value =
result = v
for stage in a:
result = examineAlternatives(stage, result)
if result.isFalse: break
proc publish(c: Cont; e: Entity; v: Value; h: Handle): Cont {.cpsMagic.} =
pass c, e.publishImpl.call(e, v, h)
proc retract(c: Cont; e: Entity; h: Handle): Cont {.cpsMagic.} =
pass c, e.retractImpl.call(e, h)
proc message(c: Cont; e: Entity; v: Value): Cont {.cpsMagic.} =
pass c, e.messageImpl.call(e, v)
proc sync(c: Cont; e: Entity; p: Cap): Cont {.cpsMagic.} =
pass c, e.syncImpl.call(e, p)
proc turnPublish(cap: Cap; val: Value; h: Handle) {.turnWork.} =
when traceSyndicate:
var traceEvent = TargetedTurnEvent(
target: cap.traceTarget,
detail: trace.TurnEvent(orKind: trace.TurnEventKind.assert)
)
traceEvent.detail = trace.TurnEvent(orKind: TurnEventKind.assert)
traceEvent.detail.assert = TurnEventAssert(
assertion: AssertionDescription(orKind: AssertionDescriptionKind.value),
handle: h,
)
traceEvent.detail.assert.assertion.value.value = val
cap.actor.traceEnqueue(traceEvent)
cap.relay.outbound[h] = OutboundAssertion(handle: h, peer: cap)
yieldToActions()
when traceSyndicate:
cap.actor.traceDequeue(traceEvent)
cap.target.publish(val, h)
cap.relay.outbound[h].established = true
proc turnRetract(cap: Cap; h: Handle) {.turnWork.} =
when traceSyndicate:
var traceEvent = TargetedTurnEvent(
target: cap.traceTarget,
detail: trace.TurnEvent(orKind: trace.TurnEventKind.retract)
)
traceEvent.detail.retract.handle = h
cap.actor.traceEnqueue(traceEvent)
yieldToActions()
when traceSyndicate:
cap.actor.traceDequeue(traceEvent)
cap.target.retract(h)
proc turnMessage(cap: Cap; val: Value) {.turnWork.} =
var val = runRewrites(val, cap.attenuation)
when traceSyndicate:
var traceEvent = TargetedTurnEvent(
target: cap.traceTarget,
detail: trace.TurnEvent(orKind: trace.TurnEventKind.message)
)
traceEvent.detail.message.body.value.value = val
cap.actor.traceEnqueue(traceEvent)
yieldToActions()
when traceSyndicate:
cap.actor.traceDequeue(traceEvent)
cap.target.message(val)
proc turnSync(cap: Cap; peer: Cap) {.turnWork.} =
when traceSyndicate:
var traceEvent = TargetedTurnEvent(
target: cap.traceTarget,
detail: trace.TurnEvent(orKind: trace.TurnEventKind.sync)
)
traceEvent.detail.sync.peer = peer.traceTarget
cap.actor.traceEnqueue(traceEvent)
yieldToActions()
when traceSyndicate:
cap.actor.traceDequeue(traceEvent)
cap.target.sync(peer)
proc publish*(cap; val: Value): Handle =
var val = runRewrites(val, cap.attenuation)
# TODO: attenuation to nothing?
result = cap.relay.nextHandle()
cap.relay.queueWork(whelp turnPublish(cap, val, result))
proc publish*[T](cap; x: T): Handle =
publish(cap, x.toPreserves)
proc retract*(cap; h: Handle) =
cap.relay.queueWork(whelp turnRetract(cap, h))
proc message*(cap; val: Value) =
var val = runRewrites(val, cap.attenuation)
cap.relay.queueWork(whelp turnMessage(cap, val))
proc message*[T](cap; x: T) =
message(cap, x.toPreserves)
proc sync*(cap, peer: Cap) =
cap.relay.queueWork(whelp turnSync(cap, peer))
proc installStopHook(c: Cont, facet: Facet): Cont {.cpsMagic.} =
facet.stopHandlers.add(c.fn)
return c
proc addOnStopHandler(c: Cont; cb: Callback): Cont {.cpsMagic.} =
c.facet.stopCallbacks.add(cb)
result = c
proc onStop*(facet; cb: proc () {.closure.}) =
facet.stopCallbacks.add(cb)
proc bootActor*(name: string; bootProc: FacetProc): Actor =
result = newActor(name)
result.root.startExternalTurn()
bootProc(result.root)
proc runActor*(name: string; bootProc: FacetProc) =
let actor = bootActor(name, bootProc)
while not actor.stopped:
run(actor)

View File

@ -3,10 +3,8 @@
import std/[asyncdispatch, monotimes, times, posix, times, epoll] import std/[asyncdispatch, monotimes, times, posix, times, epoll]
import preserves import preserves
import syndicate import ../syndicate, ../protocols/timer
from ../protocols/dataspace import Observe
import ../protocols/timer
from syndicate/protocols/dataspace import Observe
export timer export timer
@ -24,12 +22,16 @@ proc eventfd(count: cuint, flags: cint): cint
proc now: float64 = getTime().toUnixFloat() proc now: float64 = getTime().toUnixFloat()
proc processTimers(ds: Cap) {.turnAction.} = proc spawnTimers*(ds: Cap): Actor {.discardable.} =
let pat = inject(grab Observe(pattern: dropType LaterThan), {0: grabLit()}) ## Spawn a timer actor.
during(ds, pat) do (seconds: float): bootActor("timers") do (root: Facet):
let period = seconds - now() let pat = inject(grab Observe(pattern: dropType LaterThan), {0: grabLit()})
if period < 0.001 or true: #[
let h = publish(ds, LaterThan(seconds: seconds).toPreserves) during(ds, pat) do (seconds: float):
let period = seconds - now()
if period < 0.001 or true:
let h = publish(ds, LaterThan(seconds: seconds).toPreserves)
]#
#[ #[
else: else:
@ -39,14 +41,12 @@ proc processTimers(ds: Cap) {.turnAction.} =
discard publish(turn, ds, LaterThan(seconds: seconds)) discard publish(turn, ds, LaterThan(seconds: seconds))
]# ]#
proc spawnTimers*(ds: Cap) = #[
## Spawn a timer actor. proc after*(ds: Cap; dur: Duration; cb: proc () {.closure.}) =
boot(newActor("timers"), whelp processTimers(ds))
proc after*(turn: Turn; ds: Cap; dur: Duration; act: TurnAction) =
## Execute `act` after some duration of time. ## Execute `act` after some duration of time.
let later = now() + dur.inMilliseconds.float64 * 1_000.0 let later = now() + dur.inMilliseconds.float64 * 1_000.0
onPublish(ds, grab LaterThan(seconds: later)): onPublish(ds, grab LaterThan(seconds: later)):
act(turn) cb()
]#
# TODO: periodic timer # TODO: periodic timer

48
src/sam/dataspaces.nim Normal file
View File

@ -0,0 +1,48 @@
# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense
import std/[hashes, options, tables]
import pkg/cps
import preserves
import ./[actors, patterns, skeletons]
from ./protocols/protocol import Handle
from ./protocols/dataspace import Observe
type
Dataspace {.final.} = ref object of Entity
index: Index
handleMap: Table[Handle, Value]
proc dsPublish(e: Entity; v: Value; h: Handle) {.cps: Cont.} =
var ds = Dataspace(e)
if ds.index.add(v):
var obs = v.preservesTo(Observe)
if obs.isSome and obs.get.observer of Cap:
ds.index.add(obs.get.pattern, Cap(obs.get.observer))
ds.handleMap[h] = v
proc dsRetract(e: Entity; h: Handle) {.cps: Cont.} =
var ds = Dataspace(e)
var v = ds.handleMap[h]
if ds.index.remove(v):
ds.handleMap.del h
var obs = v.preservesTo(Observe)
if obs.isSome and obs.get.observer of Cap:
ds.index.remove(obs.get.pattern, Cap(obs.get.observer))
proc dsMessage(e: Entity; v: Value) {.cps: Cont.} =
var ds = Dataspace(e)
ds.index.deliverMessage(v)
proc newDataspace*(f: Facet): Cap =
var ds = Dataspace(
publishImpl: whelp dsPublish,
retractImpl: whelp dsRetract,
messageImpl: whelp dsMessage,
index: initIndex(),
)
newCap(f, ds)
proc observe*(cap: Cap; pat: Pattern; e: Entity): Handle =
publish(cap, Observe(pattern: pat, observer: newCap(cap.relay, e)))

43
src/sam/durings.nim Normal file
View File

@ -0,0 +1,43 @@
# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense
import std/[hashes, tables]
import preserves
import ./[actors, patterns]
type
DuringProc* = proc (a: Value; h: Handle): FacetProc {.gcsafe.}
DuringActionKind = enum null, dead, act
DuringAction = object
case kind: DuringActionKind
of null, dead: discard
of act:
retractProc: FacetProc
DuringEntity {.final.}= ref object of Entity
publishProc: DuringProc
assertionMap: Table[Handle, DuringAction]
proc duringPublish(e: Entity; v: Value; h: Handle) {.cps: Cont.} =
var de = DuringEntity(e)
let handler = de.handler(de.facet, a.value, h)
let g = de.assertionMap.getOrDefault h
case g.kind
of null, dead:
de.assertionMap[h] = DuringAction(kind: act, action: handler)
of act:
raiseAssert("during: duplicate handle in publish: " & $h)
proc duringRetract(e: Entity; h: Handle) {.cps: Cont.} =
var de = DuringEntity(e)
let g = de.assertionMap.getOrDefault h
case g.kind
of null:
de.assertionMap[h] = DuringAction(kind: dead)
of dead:
raiseAssert("during: duplicate handle in retract: " & $h)
of act:
de.assertionMap.del h
if not g.action.isNil:
g.action(de.facet)
proc during*(cb: DuringProc): DuringEntity = DuringEntity(cb: cb)

File diff suppressed because one or more lines are too long

9
src/sam/observers.nim Normal file
View File

@ -0,0 +1,9 @@
# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense
import preserves
import ./actors, ./patterns, ./protocols/dataspace
proc observe*(turn: var Turn; ds: Cap; pat: Pattern; e: Entity): Cap {.discardable.} =
result = newCap(turn, e)
publish(turn, ds, Observe(pattern: pat, observer: result))

View File

@ -6,7 +6,7 @@ from std/os import getEnv, `/`
import pkg/sys/[ioqueue, sockets] import pkg/sys/[ioqueue, sockets]
import preserves import preserves
import ../syndicate, /capabilities, ./durings, ./membranes, ./protocols/[gatekeeper, protocol, sturdy, transportAddress] import ./syndicate, /capabilities, ./durings, ./membranes, ./protocols/[gatekeeper, protocol, sturdy, transportAddress]
when defined(traceSyndicate): when defined(traceSyndicate):
when defined(posix): when defined(posix):

443
src/sam/relays.nim.old Normal file
View File

@ -0,0 +1,443 @@
# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense
import std/[asyncdispatch, options, tables]
from std/os import getEnv, `/`
import preserves
import ../syndicate, /capabilities, ./durings, ./membranes, ./protocols/[gatekeeper, protocol, sturdy, transportAddress]
when defined(traceSyndicate):
when defined(posix):
template trace(args: varargs[untyped]): untyped = stderr.writeLine(args)
else:
template trace(args: varargs[untyped]): untyped = echo(args)
else:
template trace(args: varargs[untyped]): untyped = discard
export `$`
type
Oid = sturdy.Oid
export Stdio, Tcp, WebSocket, Unix
type
Assertion = Value
WireRef = sturdy.WireRef
Turn = syndicate.Turn
Handle = actors.Handle
type
PacketWriter = proc (pkt: sink Packet): Future[void] {.gcsafe.}
RelaySetup = proc (turn: var Turn; relay: Relay) {.gcsafe.}
Relay* = ref object of RootObj
facet: Facet
inboundAssertions: Table[Handle,
tuple[localHandle: Handle, imported: seq[WireSymbol]]]
outboundAssertions: Table[Handle, seq[WireSymbol]]
exported: Membrane
imported: Membrane
nextLocalOid: Oid
pendingTurn: protocol.Turn
packetWriter: PacketWriter
untrusted: bool
SyncPeerEntity = ref object of Entity
relay: Relay
peer: Cap
handleMap: Table[Handle, Handle]
e: WireSymbol
RelayEntity = ref object of Entity
## https://synit.org/book/protocol.html#relay-entities
label: string
relay: Relay
proc releaseCapOut(r: Relay; e: WireSymbol) =
r.exported.drop e
method publish(spe: SyncPeerEntity; t: var Turn; a: AssertionRef; h: Handle) =
spe.handleMap[h] = publish(t, spe.peer, a.value)
method retract(se: SyncPeerEntity; t: var Turn; h: Handle) =
var other: Handle
if se.handleMap.pop(h, other):
retract(t, other)
method message(se: SyncPeerEntity; t: var Turn; a: AssertionRef) =
if not se.e.isNil:
se.relay.releaseCapOut(se.e)
message(t, se.peer, a.value)
method sync(se: SyncPeerEntity; t: var Turn; peer: Cap) =
sync(t, se.peer, peer)
proc newSyncPeerEntity(r: Relay; p: Cap): SyncPeerEntity =
SyncPeerEntity(relay: r, peer: p)
proc rewriteCapOut(relay: Relay; cap: Cap; exported: var seq[WireSymbol]): WireRef =
if cap.target of RelayEntity and cap.target.RelayEntity.relay == relay and cap.attenuation.len == 0:
result = WireRef(orKind: WireRefKind.yours, yours: WireRefYours(oid: cap.target.oid))
else:
var ws = grab(relay.exported, cap)
if ws.isNil:
ws = newWireSymbol(relay.exported, relay.nextLocalOid, cap)
inc relay.nextLocalOid
exported.add ws
result = WireRef(
orKind: WireRefKind.mine,
mine: WireRefMine(oid: ws.oid))
proc rewriteOut(relay: Relay; v: Assertion):
tuple[rewritten: Value, exported: seq[WireSymbol]] {.gcsafe.} =
var exported: seq[WireSymbol]
result.rewritten = mapEmbeds(v) do (pr: Value) -> Value:
let o = pr.unembed(Cap); if o.isSome:
rewriteCapOut(relay, o.get, exported).toPreserves
else: pr
result.exported = exported
proc register(relay: Relay; v: Assertion; h: Handle): tuple[rewritten: Value, exported: seq[WireSymbol]] =
result = rewriteOut(relay, v)
relay.outboundAssertions[h] = result.exported
proc deregister(relay: Relay; h: Handle) =
var outbound: seq[WireSymbol]
if relay.outboundAssertions.pop(h, outbound):
for e in outbound: releaseCapOut(relay, e)
proc send(r: Relay; turn: var Turn; rOid: protocol.Oid; m: Event) =
if r.pendingTurn.len == 0:
# If the pending queue is empty then schedule a packet
# to be sent after pending I/O is processed.
callSoon do ():
r.facet.run do (turn: var Turn):
var pkt = Packet(
orKind: PacketKind.Turn,
turn: move r.pendingTurn)
trace "C: ", pkt
assert(not r.packetWriter.isNil, "missing packetWriter proc")
r.packetWriter(turn, encode pkt)
r.pendingTurn.add TurnEvent(oid: rOid, event: m)
proc send(re: RelayEntity; turn: var Turn; ev: Event) =
send(re.relay, turn, protocol.Oid re.oid, ev)
method publish(re: RelayEntity; t: var Turn; a: AssertionRef; h: Handle) {.gcsafe.} =
re.send(t, Event(
orKind: EventKind.Assert,
`assert`: protocol.Assert(
assertion: re.relay.register(a.value, h).rewritten,
handle: h)))
method retract(re: RelayEntity; t: var Turn; h: Handle) {.gcsafe.} =
re.relay.deregister h
re.send(t, Event(
orKind: EventKind.Retract,
retract: Retract(handle: h)))
method message(re: RelayEntity; turn: var Turn; msg: AssertionRef) {.gcsafe.} =
var (value, exported) = rewriteOut(re.relay, msg.value)
assert(len(exported) == 0, "cannot send a reference in a message")
if len(exported) == 0:
re.send(turn, Event(orKind: EventKind.Message, message: Message(body: value)))
method sync(re: RelayEntity; turn: var Turn; peer: Cap) {.gcsafe.} =
var
peerEntity = newSyncPeerEntity(re.relay, peer)
exported: seq[WireSymbol]
wr = rewriteCapOut(re.relay, turn.newCap(peerEntity), exported)
peerEntity.e = exported[0]
var ev = Event(orKind: EventKind.Sync)
ev.sync.peer = wr.toPreserves.embed
re.send(turn, ev)
proc newRelayEntity(label: string; r: Relay; o: Oid): RelayEntity =
RelayEntity(label: label, relay: r, oid: o)
using
relay: Relay
facet: Facet
proc lookupLocal(relay; oid: Oid): Cap =
let sym = relay.exported.grab oid
if sym.isNil: newInertCap()
else: sym.cap
proc isInert(r: Cap): bool =
r.target.isNil
proc rewriteCapIn(relay; facet; n: WireRef, imported: var seq[WireSymbol]): Cap =
case n.orKind
of WireRefKind.mine:
var e = relay.imported.grab(n.mine.oid)
if e.isNil:
e = newWireSymbol(
relay.imported,
n.mine.oid,
newCap(facet, newRelayEntity("rewriteCapIn", relay, n.mine.oid)),
)
imported.add e
result = e.cap
of WireRefKind.yours:
let r = relay.lookupLocal(n.yours.oid)
if n.yours.attenuation.len == 0 or r.isInert: result = r
else: raiseAssert "attenuation not implemented"
proc rewriteIn(relay; facet; v: Value):
tuple[rewritten: Assertion; imported: seq[WireSymbol]] {.gcsafe.} =
var imported: seq[WireSymbol]
result.rewritten = mapEmbeds(v) do (pr: Value) -> Value:
let wr = pr.preservesTo WireRef; if wr.isSome:
result = rewriteCapIn(relay, facet, wr.get, imported).embed
else:
result = pr
result.imported = imported
proc close(r: Relay) = discard
proc dispatch*(relay: Relay; turn: var Turn; cap: Cap; event: Event) {.gcsafe.} =
case event.orKind
of EventKind.Assert:
let (a, imported) = rewriteIn(relay, turn.facet, event.assert.assertion)
relay.inboundAssertions[event.assert.handle] = (publish(turn, cap, a), imported,)
of EventKind.Retract:
let remoteHandle = event.retract.handle
var outbound: tuple[localHandle: Handle, imported: seq[WireSymbol]]
if relay.inboundAssertions.pop(remoteHandle, outbound):
for e in outbound.imported: relay.imported.drop e
turn.retract(outbound.localHandle)
of EventKind.Message:
let (a, imported) = rewriteIn(relay, turn.facet, event.message.body)
assert imported.len == 0, "Cannot receive transient reference"
turn.message(cap, a)
of EventKind.Sync:
discard # TODO
#[
var imported: seq[WireSymbol]
let k = relay.rewriteCapIn(turn, evenr.sync.peer, imported)
turn.sync(cap) do (turn: var Turn):
turn.message(k, true)
for e in imported: relay.imported.del e
]#
proc dispatch*(relay: Relay; v: Value) {.gcsafe.} =
trace "S: ", v
run(relay.facet) do (t: var Turn):
var pkt: Packet
if pkt.fromPreserves(v):
case pkt.orKind
of PacketKind.Turn:
# https://synit.org/book/protocol.html#turn-packets
for te in pkt.turn:
let r = lookupLocal(relay, te.oid.Oid)
if not r.isInert:
dispatch(relay, t, r, te.event)
else:
stderr.writeLine("discarding event for unknown Cap; ", te.event)
of PacketKind.Error:
# https://synit.org/book/protocol.html#error-packets
when defined(posix):
stderr.writeLine("Error from server: ", pkt.error.message, " (detail: ", pkt.error.detail, ")")
close relay
of PacketKind.Extension:
# https://synit.org/book/protocol.html#extension-packets
discard
else:
when defined(posix):
stderr.writeLine("discarding undecoded packet ", v)
type
RelayOptions* = object of RootObj
packetWriter*: PacketWriter
untrusted*: bool
RelayActorOptions* = object of RelayOptions
initialOid*: Option[Oid]
initialCap*: Cap
nextLocalOid*: Option[Oid]
proc newRelay(turn: var Turn; opts: RelayOptions; setup: RelaySetup): Relay =
result = Relay(
facet: turn.facet,
packetWriter: opts.packetWriter,
untrusted: opts.untrusted)
discard result.facet.preventInertCheck()
setup(turn, result)
proc transportConnectionResolve(addrAss: Assertion; ds: Cap): gatekeeper.TransportConnection =
result.`addr` = addrAss
result.resolved = Resolved(orKind: ResolvedKind.accepted)
result.resolved.accepted.responderSession = ds
proc spawnRelay*(name: string; turn: var Turn; ds: Cap; addrAss: Assertion; opts: RelayActorOptions; setup: RelaySetup) =
discard spawn(name, turn) do (turn: var Turn):
let relay = newRelay(turn, opts, setup)
if not opts.initialCap.isNil:
var exported: seq[WireSymbol]
discard rewriteCapOut(relay, opts.initialCap, exported)
opts.nextLocalOid.map do (oid: Oid):
relay.nextLocalOid =
if oid == 0.Oid: 1.Oid
else: oid
if opts.initialOid.isSome:
var
imported: seq[WireSymbol]
wr = WireRef(
orKind: WireRefKind.mine,
mine: WireRefMine(oid: opts.initialOid.get))
res = rewriteCapIn(relay, turn.facet, wr, imported)
discard publish(turn, ds, transportConnectionResolve(addrAss, res))
else:
discard publish(turn, ds, transportConnectionResolve(addrAss, ds))
when defined(posix):
import std/asyncnet
from std/nativesockets import AF_INET, AF_UNIX, IPPROTO_TCP, SOCK_STREAM, Protocol
type ShutdownEntity* = ref object of Entity
method retract(e: ShutdownEntity; turn: var Turn; h: Handle) =
stopActor(turn)
type ConnectProc* = proc (turn: var Turn; ds: Cap) {.gcsafe.}
export Tcp
when defined(posix):
export Unix
proc connect*(turn: var Turn; ds: Cap; route: Route; addrAss: Assertion; socket: AsyncSocket; step: Value) =
## Relay a dataspace over an open `AsyncSocket`.
proc socketWriter(packet: sink Packet): Future[void] =
socket.send(cast[string](encode(packet)))
const recvSize = 0x2000
var shutdownCap: Cap
let
reenable = turn.facet.preventInertCheck()
connectionClosedCap = newCap(turn, ShutdownEntity())
discard bootActor("socket") do (turn: var Turn):
var ops = RelayActorOptions(
packetWriter: socketWriter,
initialOid: 0.Oid.some)
spawnRelay("socket", turn, ds, addrAss, ops) do (turn: var Turn; relay: Relay):
let facet = turn.facet
var wireBuf = newBufferedDecoder(0)
turn.facet.actor.atExit do (turn: var Turn): close(socket)
discard publish(turn, connectionClosedCap, true)
shutdownCap = newCap(turn, ShutdownEntity())
onPublish(turn, ds, TransportConnection ?: {0: ?addrAss, 2: ?:Rejected}) do (detail: Value):
raise newException(IOError, $detail)
onPublish(turn, ds, TransportConnection ?: {0: ?addrAss, 2: ?:ResolvedAccepted}) do (gatekeeper: Cap):
run(gatekeeper.relay) do (turn: var Turn):
reenable()
discard publish(turn, shutdownCap, true)
proc duringCallback(turn: var Turn; ass: Assertion; h: Handle): TurnAction =
let facet = inFacet(turn) do (turn: var Turn):
let o = ass.preservesTo Resolved; if o.isSome:
discard publish(turn, ds, ResolvePath(
route: route, `addr`: addrAss, resolved: o.get))
proc action(turn: var Turn) =
stop(turn, facet)
result = action
var resolve = Resolve(
step: step,
observer: newCap(turn, during(duringCallback)),
)
discard publish(turn, gatekeeper, resolve)
proc connect*(turn: var Turn; ds: Cap; route: Route; transport: Tcp; step: Value) =
## Relay a dataspace over TCP.
let socket = newAsyncSocket(
domain = AF_INET,
sockType = SOCK_STREAM,
protocol = IPPROTO_TCP,
buffered = false)
let fut = connect(socket, transport.host, Port transport.port)
addCallback(fut, turn) do (turn: var Turn):
connect(turn, ds, route, transport.toPreserves, socket, step)
proc connect*(turn: var Turn; ds: Cap; route: Route; transport: Unix; step: Value) =
## Relay a dataspace over a UNIX socket.
let socket = newAsyncSocket(
domain = AF_UNIX,
sockType = SOCK_STREAM,
protocol = cast[Protocol](0),
buffered = false)
let fut = connectUnix(socket, transport.path)
addCallback(fut, turn) do (turn: var Turn):
connect(turn, ds, route, transport.toPreserves, socket, step)
import std/asyncfile
const stdinReadSize = 128
proc connectStdio*(turn: var Turn; ds: Cap) =
## Connect to an external dataspace over stdin and stdout.
proc stdoutWriter(packet: sink Packet): Future[void] =
result = newFuture[void]()
var buf = encode(packet)
doAssert writeBytes(stdout, buf, 0, buf.len) == buf.len
flushFile(stdout)
complete result
var opts = RelayActorOptions(
packetWriter: stdoutWriter,
initialCap: ds,
initialOid: 0.Oid.some)
spawnRelay("stdio", turn, ds, Stdio().toPreserves, opts) do (turn: var Turn; relay: Relay):
let
facet = turn.facet
asyncStdin = openAsync("/dev/stdin") # this is universal now?
close(stdin)
facet.actor.atExit do (turn: var Turn):
close(asyncStdin)
var wireBuf = newBufferedDecoder(0)
proc readCb(pktFut: Future[string]) {.gcsafe.} =
if not pktFut.failed:
var buf = pktFut.read
if buf.len == 0:
run(facet) do (turn: var Turn): stopActor(turn)
else:
feed(wireBuf, buf)
var (success, pr) = decode(wireBuf)
if success:
dispatch(relay, pr)
asyncStdin.read(stdinReadSize).addCallback(readCb)
asyncStdin.read(stdinReadSize).addCallback(readCb)
type BootProc* = proc (turn: var Turn; ds: Cap) {.gcsafe.}
proc envRoute*: Route =
var text = getEnv("SYNDICATE_ROUTE")
if text == "":
var tx = (getEnv("XDG_RUNTIME_DIR", "/run/user/1000") / "dataspace").toPreserves
result.transports = @[initRecord("unix", tx)]
result.pathSteps = @[capabilities.mint().toPreserves]
else:
var pr = parsePreserves(text)
if not result.fromPreserves(pr):
raise newException(ValueError, "failed to parse $SYNDICATE_ROUTE " & $pr)
proc resolve*(turn: var Turn; ds: Cap; route: Route; bootProc: BootProc) =
var
unix: Unix
tcp: Tcp
stdio: Stdio
doAssert(route.transports.len == 1, "only a single transport supported for routes")
doAssert(route.pathSteps.len < 2, "multiple path steps not supported for routes")
if unix.fromPreserves route.transports[0]:
connect(turn, ds, route, unix, route.pathSteps[0])
elif tcp.fromPreserves route.transports[0]:
connect(turn, ds, route, tcp, route.pathSteps[0])
elif stdio.fromPreserves route.transports[0]:
connectStdio(turn, ds)
bootProc(turn, ds)
else:
raise newException(ValueError, "unsupported route")
during(turn, ds, ResolvePath ?: { 0: ?route, 3: ?:ResolvedAccepted}) do (dest: Cap):
bootProc(turn, dest)

View File

@ -65,9 +65,9 @@ func isEmpty(cont: Continuation): bool =
cont.cache.len == 0 and cont.leafMap.len == 0 cont.cache.len == 0 and cont.leafMap.len == 0
type type
ContinuationProc = proc (c: Continuation; v: Value) {.gcsafe.} ContinuationProc = proc (c: Continuation; v: Value) {.closure.}
LeafProc = proc (l: Leaf; v: Value) {.gcsafe.} LeafProc = proc (l: Leaf; v: Value) {.closure.}
ObserverProc = proc (turn: Turn; group: ObserverGroup; vs: seq[Value]) {.gcsafe.} ObserverProc = proc (group: ObserverGroup; vs: seq[Value]) {.closure.}
proc getLeaves(cont: Continuation; constPaths: Paths): LeafMap = proc getLeaves(cont: Continuation; constPaths: Paths): LeafMap =
result = cont.leafMap.getOrDefault(constPaths) result = cont.leafMap.getOrDefault(constPaths)
@ -114,10 +114,10 @@ proc top(stack: TermStack): Value =
assert stack.len > 0 assert stack.len > 0
stack[stack.high] stack[stack.high]
proc modify(node: Node; turn: Turn; outerValue: Value; event: EventKind; proc modify(node: Node; outerValue: Value; event: EventKind;
modCont: ContinuationProc; modLeaf: LeafProc; modObs: ObserverProc) = modCont: ContinuationProc; modLeaf: LeafProc; modObs: ObserverProc) =
proc walk(cont: Continuation; turn: Turn) = proc walk(cont: Continuation) =
modCont(cont, outerValue) modCont(cont, outerValue)
for constPaths, constValMap in cont.leafMap.pairs: for constPaths, constValMap in cont.leafMap.pairs:
let constVals = projectPaths(outerValue, constPaths) let constVals = projectPaths(outerValue, constPaths)
@ -129,7 +129,7 @@ proc modify(node: Node; turn: Turn; outerValue: Value; event: EventKind;
for capturePaths, observerGroup in leaf.observerGroups.pairs: for capturePaths, observerGroup in leaf.observerGroups.pairs:
let captures = projectPaths(outerValue, capturePaths) let captures = projectPaths(outerValue, capturePaths)
if captures.isSome: if captures.isSome:
modObs(turn, observerGroup, get captures) modObs(observerGroup, get captures)
of removedEvent: of removedEvent:
let leaf = constValMap.getOrDefault(get constVals) let leaf = constValMap.getOrDefault(get constVals)
if not leaf.isNil: if not leaf.isNil:
@ -137,13 +137,13 @@ proc modify(node: Node; turn: Turn; outerValue: Value; event: EventKind;
for capturePaths, observerGroup in leaf.observerGroups.pairs: for capturePaths, observerGroup in leaf.observerGroups.pairs:
let captures = projectPaths(outerValue, capturePaths) let captures = projectPaths(outerValue, capturePaths)
if captures.isSome: if captures.isSome:
modObs(turn, observerGroup, get captures) modObs(observerGroup, get captures)
if leaf.isEmpty: if leaf.isEmpty:
constValMap.del(get constVals) constValMap.del(get constVals)
proc walk(node: Node; turn: Turn; termStack: TermStack) = proc walk(node: Node; termStack: TermStack) =
walk(node.continuation, turn) walk(node.continuation)
for selector, table in node.edges: for selector, table in node.edges:
let let
nextStack = pop(termStack, selector.popCount) nextStack = pop(termStack, selector.popCount)
@ -153,11 +153,11 @@ proc modify(node: Node; turn: Turn; outerValue: Value; event: EventKind;
if nextClass.kind != classNone: if nextClass.kind != classNone:
let nextNode = table.getOrDefault(nextClass) let nextNode = table.getOrDefault(nextClass)
if not nextNode.isNil: if not nextNode.isNil:
walk(nextNode, turn, push(nextStack, get nextValue)) walk(nextNode, push(nextStack, get nextValue))
if event == removedEvent and nextNode.isEmpty: if event == removedEvent and nextNode.isEmpty:
table.del(nextClass) table.del(nextClass)
walk(node, turn, @[@[outerValue].toPreserves]) walk(node, @[@[outerValue].toPreserves])
proc getOrNew[A, B, C](t: var Table[A, TableRef[B, C]], k: A): TableRef[B, C] = proc getOrNew[A, B, C](t: var Table[A, TableRef[B, C]], k: A): TableRef[B, C] =
result = t.getOrDefault(k) result = t.getOrDefault(k)
@ -227,7 +227,7 @@ proc getEndpoints(leaf: Leaf; capturePaths: Paths): ObserverGroup =
if captures.isSome: if captures.isSome:
discard result.cachedCaptures.change(get captures, +1) discard result.cachedCaptures.change(get captures, +1)
proc add*(index: var Index; turn: Turn; pattern: Pattern; observer: Cap) = proc add*(index: var Index; pattern: Pattern; observer: Cap) =
let let
cont = index.root.extend(pattern) cont = index.root.extend(pattern)
analysis = analyse pattern analysis = analyse pattern
@ -237,10 +237,10 @@ proc add*(index: var Index; turn: Turn; pattern: Pattern; observer: Cap) =
# TODO if endpoints.cachedCaptures.len > 0: # TODO if endpoints.cachedCaptures.len > 0:
var captureMap = newTable[seq[Value], Handle]() var captureMap = newTable[seq[Value], Handle]()
for capture in endpoints.cachedCaptures.items: for capture in endpoints.cachedCaptures.items:
captureMap[capture] = publish(turn, observer, capture.toPreserves) captureMap[capture] = publish(observer, capture.toPreserves)
endpoints.observers[observer] = captureMap endpoints.observers[observer] = captureMap
proc remove*(index: var Index; turn: Turn; pattern: Pattern; observer: Cap) = proc remove*(index: var Index; pattern: Pattern; observer: Cap) =
let let
cont = index.root.extend(pattern) cont = index.root.extend(pattern)
analysis = analyse pattern analysis = analyse pattern
@ -252,7 +252,7 @@ proc remove*(index: var Index; turn: Turn; pattern: Pattern; observer: Cap) =
if not endpoints.isNil: if not endpoints.isNil:
var captureMap: TableRef[seq[Value], Handle] var captureMap: TableRef[seq[Value], Handle]
if endpoints.observers.pop(observer, captureMap): if endpoints.observers.pop(observer, captureMap):
for handle in captureMap.values: retract(turn, handle) for handle in captureMap.values: retract(observer, handle)
if endpoints.observers.len == 0: if endpoints.observers.len == 0:
leaf.observerGroups.del(analysis.capturePaths) leaf.observerGroups.del(analysis.capturePaths)
if leaf.observerGroups.len == 0: if leaf.observerGroups.len == 0:
@ -260,7 +260,7 @@ proc remove*(index: var Index; turn: Turn; pattern: Pattern; observer: Cap) =
if constValMap.len == 0: if constValMap.len == 0:
cont.leafMap.del(analysis.constPaths) cont.leafMap.del(analysis.constPaths)
proc adjustAssertion(index: var Index; turn: Turn; outerValue: Value; delta: int): bool = proc adjustAssertion(index: var Index; outerValue: Value; delta: int): bool =
case index.allAssertions.change(outerValue, delta) case index.allAssertions.change(outerValue, delta)
of cdAbsentToPresent: of cdAbsentToPresent:
result = true result = true
@ -268,37 +268,37 @@ proc adjustAssertion(index: var Index; turn: Turn; outerValue: Value; delta: int
c.cache.incl(v) c.cache.incl(v)
proc modLeaf(l: Leaf; v: Value) = proc modLeaf(l: Leaf; v: Value) =
l.cache.incl(v) l.cache.incl(v)
proc modObserver(turn: Turn; group: ObserverGroup; vs: seq[Value]) = proc modObserver(group: ObserverGroup; vs: seq[Value]) =
let change = group.cachedCaptures.change(vs, +1) let change = group.cachedCaptures.change(vs, +1)
if change == cdAbsentToPresent: if change == cdAbsentToPresent:
for (observer, captureMap) in group.observers.pairs: for (observer, captureMap) in group.observers.pairs:
captureMap[vs] = publish(turn, observer, vs.toPreserves) captureMap[vs] = publish(observer, vs.toPreserves)
# TODO: this handle is coming from the facet? # TODO: this handle is coming from the facet?
modify(index.root, turn, outerValue, addedEvent, modContinuation, modLeaf, modObserver) modify(index.root, outerValue, addedEvent, modContinuation, modLeaf, modObserver)
of cdPresentToAbsent: of cdPresentToAbsent:
result = true result = true
proc modContinuation(c: Continuation; v: Value) = proc modContinuation(c: Continuation; v: Value) =
c.cache.excl(v) c.cache.excl(v)
proc modLeaf(l: Leaf; v: Value) = proc modLeaf(l: Leaf; v: Value) =
l.cache.excl(v) l.cache.excl(v)
proc modObserver(turn: Turn; group: ObserverGroup; vs: seq[Value]) = proc modObserver(group: ObserverGroup; vs: seq[Value]) =
if group.cachedCaptures.change(vs, -1) == cdPresentToAbsent: if group.cachedCaptures.change(vs, -1) == cdPresentToAbsent:
for (observer, captureMap) in group.observers.pairs: for (observer, captureMap) in group.observers.pairs:
var h: Handle var h: Handle
if captureMap.take(vs, h): if captureMap.take(vs, h):
retract(observer.target, turn, h) retract(observer, h)
modify(index.root, turn, outerValue, removedEvent, modContinuation, modLeaf, modObserver) modify(index.root, outerValue, removedEvent, modContinuation, modLeaf, modObserver)
else: discard else: discard
proc continuationNoop(c: Continuation; v: Value) = discard proc continuationNoop(c: Continuation; v: Value) = discard
proc leafNoop(l: Leaf; v: Value) = discard proc leafNoop(l: Leaf; v: Value) = discard
proc add*(index: var Index; turn: Turn; v: Value): bool = proc add*(index: var Index; v: Value): bool =
adjustAssertion(index, turn, v, +1) adjustAssertion(index, v, +1)
proc remove*(index: var Index; turn: Turn; v: Value): bool = proc remove*(index: var Index; v: Value): bool =
adjustAssertion(index, turn, v, -1) adjustAssertion(index, v, -1)
proc deliverMessage*(index: var Index; turn: Turn; v: Value) = proc deliverMessage*(index: var Index; v: Value) =
proc observersCb(turn: Turn; group: ObserverGroup; vs: seq[Value]) = proc observersCb(group: ObserverGroup; vs: seq[Value]) =
for observer in group.observers.keys: message(turn, observer, vs.toPreserves) for observer in group.observers.keys: message(observer, vs.toPreserves)
index.root.modify(turn, v, messageEvent, continuationNoop, leafNoop, observersCb) index.root.modify(v, messageEvent, continuationNoop, leafNoop, observersCb)

View File

@ -10,9 +10,9 @@ import pkg/cps
import preserves import preserves
export preserves export preserves
import ./syndicate/[actors, dataspaces, patterns] import ./[actors, dataspaces, patterns]
# durings # durings
import ./syndicate/protocols/dataspace import ./protocols/dataspace
export actors, dataspace, dataspaces, patterns export actors, dataspace, dataspaces, patterns
@ -37,22 +37,26 @@ proc `??`*(pat: Pattern; bindings: openArray[(int, Pattern)]): Pattern {.inline.
patterns.inject(pat, bindings) patterns.inject(pat, bindings)
type type
PublishProc = proc (turn: Turn; v: Value; h: Handle) {.closure.} PublishProc = proc (v: Value; h: Handle) {.closure.}
RetractProc = proc (turn: Turn; h: Handle) {.closure.} RetractProc = proc (h: Handle) {.closure.}
MessageProc = proc (turn: Turn; v: Value) {.closure.} MessageProc = proc (v: Value) {.closure.}
ClosureEntity = ref object of Entity ClosureEntity = ref object of Entity
publishImpl*: PublishProc publishCb*: PublishProc
retractImpl*: RetractProc retractCb*: RetractProc
messageImpl*: MessageProc messageCb*: MessageProc
method publish(e: ClosureEntity; turn: Turn; a: AssertionRef; h: Handle) = proc publishCont(e: Entity; v: Value; h: Handle) {.cps: Cont.} =
if not e.publishImpl.isNil: e.publishImpl(turn, a.value, h) var ce = ClosureEntity(e)
if not ce.publishCb.isNil: ce.publishCb(v, h)
method retract(e: ClosureEntity; turn: Turn; h: Handle) = proc retractCont(e: Entity; h: Handle) {.cps: Cont.} =
if not e.retractImpl.isNil: e.retractImpl(turn, h) var ce = ClosureEntity(e)
if not ce.retractCb.isNil: ce.retractCb(h)
method message(e: ClosureEntity; turn: Turn; a: AssertionRef) = proc messageCont(e: Entity; v: Value) {.cps: Cont.} =
if not e.messageImpl.isNil: e.messageImpl(turn, a.value) var ce = ClosureEntity(e)
if not ce.messageCb.isNil: ce.messageCb(v)
proc argumentCount(handler: NimNode): int = proc argumentCount(handler: NimNode): int =
handler.expectKind {nnkDo, nnkStmtList} handler.expectKind {nnkDo, nnkStmtList}
@ -108,7 +112,7 @@ proc wrapMessageHandler(handler: NimNode): NimNode =
handlerSym = genSym(nskProc, "message") handlerSym = genSym(nskProc, "message")
bindingsSym = ident"bindings" bindingsSym = ident"bindings"
quote do: quote do:
proc `handlerSym`(turn: Turn; `bindingsSym`: Value) = proc `handlerSym`(`bindingsSym`: Value) =
`varSection` `varSection`
if fromPreserves(`valuesSym`, bindings): if fromPreserves(`valuesSym`, bindings):
`body` `body`
@ -150,9 +154,8 @@ macro onPublish*(ds: Cap; pattern: Pattern; handler: untyped) =
discard observe(activeTurn(), `ds`, `pattern`, ClosureEntity(publishImpl: `handlerSym`)) discard observe(activeTurn(), `ds`, `pattern`, ClosureEntity(publishImpl: `handlerSym`))
]# ]#
#[ macro onMessage*(cap: Cap; pattern: Pattern; handler: untyped) =
macro onMessage*(ds: Cap; pattern: Pattern; handler: untyped) = ## Call `handler` when an message matching `pattern` is broadcasted at `cap`.
## Call `handler` when an message matching `pattern` is broadcasted at `ds`.
let let
argCount = argumentCount(handler) argCount = argumentCount(handler)
handlerProc = wrapMessageHandler(handler) handlerProc = wrapMessageHandler(handler)
@ -161,8 +164,10 @@ macro onMessage*(ds: Cap; pattern: Pattern; handler: untyped) =
if `argCount` != 0 and `pattern`.analyse.capturePaths.len != `argCount`: if `argCount` != 0 and `pattern`.analyse.capturePaths.len != `argCount`:
raiseAssert($`pattern`.analyse.capturePaths.len & " values captured but handler has " & $`argCount` & " arguments - " & $`pattern`) raiseAssert($`pattern`.analyse.capturePaths.len & " values captured but handler has " & $`argCount` & " arguments - " & $`pattern`)
`handlerProc` `handlerProc`
discard observe(activeTurn(), `ds`, `pattern`, ClosureEntity(messageImpl: `handlerSym`)) discard observe(`cap`, `pattern`, ClosureEntity(
]# messageImpl: whelp messageCont,
messageCb: `handlerSym`,
))
#[ #[
macro during*(ds: Cap; pattern: Pattern; publishBody, retractBody: untyped) = macro during*(ds: Cap; pattern: Pattern; publishBody, retractBody: untyped) =
@ -202,15 +207,15 @@ proc wrapHandler(body: NimNode; ident: string): NimNode =
proc `sym`() = proc `sym`() =
`body` `body`
macro onStop*(body: untyped) = #[
macro onStop*(facet: Facet; body: untyped) =
let let
handlerDef = wrapHandler(body, "onStop") handlerDef = wrapHandler(body, "onStop")
handlerSym = handlerDef[0] handlerSym = handlerDef[0]
result = quote do: result = quote do:
`handlerDef` `handlerDef`
addOnStopHandler(`handlerSym`) addOnStopHandler(facet, `handlerSym`)
#[
macro onStop*(body: untyped) = macro onStop*(body: untyped) =
quote do: quote do:
block: block:

19
src/sam/tracing.nim Normal file
View File

@ -0,0 +1,19 @@
# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense
import ./protocols/[protocol, trace]
export trace
proc traceAction*(e: protocol.Event): trace.TurnEvent =
var act = ActionDescription(orKind: ActionDescriptionKind.enqueue)
act.enqueue.event = TargetedTurnEvent(
target: cap.traceTarget,
detail: trace.TurnEvent(orKind: trace.TurnEventKind.assert)
)
act.enqueue.event.detail = trace.TurnEvent(orKind: TurnEventKind.assert)
act.enqueue.event.detail.assert = TurnEventAssert(
assertion: AssertionDescription(orKind: AssertionDescriptionKind.value),
handle: result,
)
act.enqueue.event.detail.assert.assertion.value.value = val
turn.desc.actions.add act

View File

@ -1,347 +0,0 @@
# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense
import std/[deques, hashes, options, times]
import pkg/cps
import preserves
import ../syndicate/protocols/[protocol, sturdy]
# const traceSyndicate {.booldefine.}: bool = true
const traceSyndicate* = true
when traceSyndicate:
import std/streams
from std/os import getEnv
import ./protocols/trace
export protocol.Handle
type
Cont* = ref object of Continuation
turn: Turn
facet: Facet
Handler* = proc() {.closure.}
Work = Deque[Cont]
HandlerDeque = seq[ContinuationProc[Continuation]]
FacetState = enum fFresh, fRunning, fEnded
Callback* = proc () {.nimcall.}
Facet* = ref object
## https://synit.org/book/glossary.html#facet
actor: Actor
parent: Facet
children: seq[Facet]
stopHandlers: HandlerDeque
stopCallbacks: seq[Callback]
state: FacetState
when traceSyndicate:
id: FacetId
Turn* = ref object
## https://synit.org/book/glossary.html#turn
facet: Facet
entity: Entity
event: Option[protocol.Event]
work: Work
when traceSyndicate:
desc: TurnDescription
Entity* = ref object of RootObj
## https://synit.org/book/glossary.html#entity
facet*: Facet
oid*: sturdy.Oid # oid is how Entities are identified over the wire
when traceSyndicate:
id: FacetId
Cap* {.final, preservesEmbedded.} = ref object of EmbeddedObj
relay*: Facet
target*: Entity
attenuation*: seq[sturdy.Caveat]
Actor* = ref object
## https://synit.org/book/glossary.html#actor
# crashHandlers: HandlerDeque
root: Facet
handleAllocator: Handle
facetIdAllocator: int
id: ActorId
when traceSyndicate:
traceStream: FileStream
stopped: bool
template syndicate*(prc: typed): untyped =
cps(Cont, prc)
proc activeTurn*(c: Cont): Turn {.cpsVoodoo.} =
## Return the active `Turn` within a `{.syndicate.}` context.
assert not c.turn.isNil
c.turn
proc activeFacet*(c: Cont): Facet {.cpsVoodoo.} =
## Return the active `Facet` within a `{.syndicate.}` context.
assert not c.facet.isNil
c.facet
proc activeActor*(c: Cont): Actor {.cpsVoodoo.} =
## Return the active `Actor` within a `{.syndicate.}` context.
assert not c.turn.isNil
c.facet.actor
using
actor: Actor
facet: Facet
entity: Entity
cap: Cap
turn: Turn
proc hash*(facet): Hash = facet.unsafeAddr.hash
proc hash*(cap): Hash = cap.unsafeAddr.hash
proc newFacet(actor: Actor; parent: Facet): Facet =
inc(actor.facetIdAllocator)
result = Facet(
actor: actor,
parent: parent,
id: actor.facetIdAllocator.toPreserves,
)
proc stopped*(facet): bool = facet.state != fRunning
proc newActor(name: string): Actor =
result = Actor(id: name.toPreserves)
result.root = newFacet(result, nil)
when traceSyndicate:
let path = getEnv("SYNDICATE_TRACE_FILE", "")
case path
of "": discard
of "-": result.traceStream = newFileStream(stderr)
else: result.traceStream = openFileStream(path, fmWrite)
when traceSyndicate:
proc trace(actor; act: ActorActivation) =
if not actor.traceStream.isNil:
var entry = TraceEntry(
timestamp: getTime().toUnixFloat(),
actor: actor.id,
item: act,
)
actor.traceStream.writeLine($entry.toPreserves)
proc trace(actor; turn) =
if not actor.traceStream.isNil:
actor.trace(ActorActivation(
orKind: ActorActivationKind.turn,
turn: turn.desc,
))
proc traceTarget(cap): trace.Target =
let facet = cap.relay
Target(
actor: facet.actor.id,
facet: facet.id,
oid: cap.target.oid.toPreserves,
)
proc traceTarget(turn): trace.Target =
let facet = turn.facet
Target(
actor: facet.actor.id,
facet: facet.id,
)
proc newExternalTurn(facet): Turn =
result = Turn(facet: facet)
when traceSyndicate:
result.desc = TurnDescription(cause: TurnCause(orKind: TurnCauseKind.external))
proc pass*(a, b: Cont): Cont =
b.turn = a.turn
if b.facet.isNil:
b.facet = a.facet
# TODO: whelp a new continuation at facet boundaries?
b
proc queue(t: Turn; c: Cont) =
c.facet = t.facet
t.work.addLast(c)
proc queue(c: Cont): Cont {.cpsMagic.} =
queue(c.turn, c)
nil
proc complete(turn; c: Cont) =
var c = c
try:
while not c.isNil and not c.fn.isNil:
c.turn = turn
var y = c.fn
var x = y(c)
c = Cont(x)
except CatchableError as err:
if not c.dismissed:
writeStackFrames c
# terminate(c.facet, err)
proc run(turn) =
let actor = turn.facet.actor
assert not actor.stopped
while turn.work.len > 0:
complete(turn, turn.work.popFirst())
when traceSyndicate:
actor.trace(turn)
if actor.stopped:
trace(actor, ActorActivation(orkind: ActorActivationKind.stop))
proc start(actor; cont: Cont) =
when traceSyndicate:
var act = ActorActivation(orkind: ActorActivationKind.start)
trace(actor, act)
actor.root.state = fRunning
let turn = actor.root.newExternalTurn()
turn.queue(cont)
run(turn)
proc stop(turn; actor)
proc collectPath(result: var seq[FacetId]; facet) =
if not facet.parent.isNil:
collectPath(result, facet.parent)
result.add(facet.id)
proc runNextStop(c: Cont; facet: Facet): Cont {.cpsMagic.} =
c.fn = facet.stopHandlers.pop()
result = c
proc runNextFacetStop() {.syndicate.} =
activeFacet().runNextStop()
proc stop(turn; facet; reason: FacetStopReason) =
while facet.stopHandlers.len > 0:
var c = whelp runNextFacetStop()
c.facet = facet
complete(turn, c)
while facet.stopCallbacks.len > 0:
var cb = facet.stopCallbacks.pop()
cb()
while facet.children.len > 0:
stop(turn, facet.children.pop(), FacetStopReason.parentStopping)
when traceSyndicate:
var act = ActionDescription(orKind: ActionDescriptionKind.facetstop)
collectPath(act.facetstop.path, facet)
act.facetStop.reason = reason
turn.desc.actions.add act
if facet.parent.isNil:
facet.actor.root = nil
stop(turn, facet.actor)
proc stop(turn; actor) =
if not actor.root.isNil:
stop(turn, actor.root, FacetStopReason.actorStopping)
actor.stopped = true
proc bootActor*(name: string, c: Cont) =
start(newActor(name), c)
proc stopActor(c: Cont; a: Actor): Cont {.cpsMagic.} =
stop(c.turn, a)
nil
proc stopFacet(c: Cont; f: Facet): Cont {.cpsMagic.} =
stop(c.turn, f, FacetStopReason.explicitAction)
nil
proc stopFacet*() {.syndicate.} =
queue()
stop(activeTurn(), activeFacet(), FacetStopReason.explicitAction)
proc stopActor*() {.syndicate.} =
queue()
stop(activeTurn(), activeActor())
type
AssertionRef* = ref object
value*: Value
# if the Enity methods take a Value object then the generated
# C code has "redefinition of struct" problems when orc is enabled
method publish*(e: Entity; turn: Turn; v: AssertionRef; h: Handle) {.base.} = discard
method retract*(e: Entity; turn: Turn; h: Handle) {.base.} = discard
method message*(e: Entity; turn: Turn; v: AssertionRef) {.base.} = discard
method sync*(e: Entity; turn: Turn; peer: Cap) {.base.} = discard
proc newCap*(f: Facet; e: Entity): Cap =
Cap(relay: f, target: e)
proc nextHandle(facet: Facet): Handle =
inc(facet.actor.handleAllocator)
facet.actor.handleAllocator
proc publish*(turn: Turn; cap: Cap; val: Value): Handle =
result = turn.facet.nextHandle()
when traceSyndicate:
var act = ActionDescription(orKind: ActionDescriptionKind.enqueue)
act.enqueue.event = TargetedTurnEvent(
target: cap.traceTarget,
detail: trace.TurnEvent(orKind: trace.TurnEventKind.assert)
)
act.enqueue.event.detail = trace.TurnEvent(orKind: TurnEventKind.assert)
act.enqueue.event.detail.assert = TurnEventAssert(
assertion: AssertionDescription(orKind: AssertionDescriptionKind.value),
handle: result,
)
act.enqueue.event.detail.assert.assertion.value.value = val
turn.desc.actions.add act
proc retract*(turn; h: Handle) =
when traceSyndicate:
var act = ActionDescription(orKind: ActionDescriptionKind.enqueue)
act.enqueue.event = TargetedTurnEvent(
target: turn.traceTarget,
detail: trace.TurnEvent(orKind: trace.TurnEventKind.retract)
)
act.enqueue.event.detail.retract = TurnEventRetract(handle: h)
turn.desc.actions.add act
proc message*(turn; cap; val: Value) =
when traceSyndicate:
var act = ActionDescription(orKind: ActionDescriptionKind.enqueue)
act.enqueue.event = TargetedTurnEvent(
target: turn.traceTarget,
detail: trace.TurnEvent(orKind: trace.TurnEventKind.message)
)
act.enqueue.event.detail.message.body.value.value = val
turn.desc.actions.add act
proc sync*(turn; peer: Cap) =
when traceSyndicate:
var act = ActionDescription(orKind: ActionDescriptionKind.enqueue)
act.enqueue.event = TargetedTurnEvent(
target: turn.traceTarget,
detail: trace.TurnEvent(orKind: trace.TurnEventKind.sync)
)
act.enqueue.event.detail.sync.peer = peer.traceTarget
turn.desc.actions.add act
proc publish*(cap: Cap; val: Value): Handle {.syndicate.} =
publish(activeTurn(), cap, val)
proc retract*(h: Handle) {.syndicate.} =
activeTurn().retract(h)
proc message*(cap: Cap; val: Value) {.syndicate.} =
activeTurn().message(cap, val)
proc sync*(cap: Cap) {.syndicate.} =
activeTurn().sync(cap)
proc installStopHook*(c: Cont, facet: Facet): Cont {.cpsMagic.} =
facet.stopHandlers.add(c.fn)
return c
proc addOnStopHandler*(c: Cont; cb: Callback): Cont {.cpsMagic.} =
c.facet.stopCallbacks.add(cb)
result = c

View File

@ -1,41 +0,0 @@
# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense
import std/[hashes, options, tables]
import pkg/cps
import preserves
import ./actors, ./protocols/dataspace, ./skeletons
from ./protocols/protocol import Handle
from ./protocols/dataspace import Observe
type
Dataspace {.final.} = ref object of Entity
index: Index
handleMap: Table[Handle, Value]
method publish(ds: Dataspace; turn: Turn; a: AssertionRef; h: Handle) {.gcsafe.} =
if add(ds.index, turn, a.value):
var obs = a.value.preservesTo(Observe)
if obs.isSome and obs.get.observer of Cap:
ds.index.add(turn, obs.get.pattern, Cap(obs.get.observer))
ds.handleMap[h] = a.value
method retract(ds: Dataspace; turn: Turn; h: Handle) {.gcsafe.} =
let v = ds.handleMap[h]
if remove(ds.index, turn, v):
ds.handleMap.del h
var obs = v.preservesTo(Observe)
if obs.isSome and obs.get.observer of Cap:
ds.index.remove(turn, obs.get.pattern, Cap(obs.get.observer))
method message(ds: Dataspace; turn: Turn; a: AssertionRef) {.gcsafe.} =
ds.index.deliverMessage(turn, a.value)
proc newDataspace*(f: Facet): Cap =
newCap(f, Dataspace(index: initIndex()))
type BootProc = proc (ds: Cap)
proc newDataspace*(): Cap {.syndicate.} =
activeFacet().newDataspace()

View File

@ -1,64 +0,0 @@
# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense
import std/[hashes, tables]
import preserves
import ./actors, ./patterns, ./protocols/dataspace
type
DuringProc* = proc (a: Value; h: Handle)
DuringActionKind = enum null, dead, act
DuringAction = object
case kind: DuringActionKind
of null, dead: discard
of act:
action: Cont
DuringEntity {.final.}= ref object of Entity
cb: DuringProc
assertionMap: Table[Handle, DuringAction]
method publish(de: DuringEntity; turn: Turn; a: AssertionRef; h: Handle) =
let action = de.cb(turn, a.value, h)
# assert(not action.isNil "should have put in a no-op action")
let g = de.assertionMap.getOrDefault h
case g.kind
of null:
de.assertionMap[h] = DuringAction(kind: act, action: action)
of dead:
de.assertionMap.del h
freshen(turn, action)
of act:
raiseAssert("during: duplicate handle in publish: " & $h)
method retract(de: DuringEntity; turn: Turn; h: Handle) =
let g = de.assertionMap.getOrDefault h
case g.kind
of null:
de.assertionMap[h] = DuringAction(kind: dead)
of dead:
raiseAssert("during: duplicate handle in retract: " & $h)
of act:
de.assertionMap.del h
if not g.action.isNil:
turn.queue(g.action)
proc during*(cb: DuringProc): DuringEntity = DuringEntity(cb: cb)
proc observe*(turn: Turn; ds: Cap; pat: Pattern; e: Entity): Handle =
publish(turn, ds, Observe(pattern: pat, observer: newCap(turn, e)).toPreserves)
proc assertHandle(turn: Turn): Handle =
doAssert(
turn.event.isSome and turn.event.get.orKind == EventKind.Assert,
"operation not valid during this turn")
turn.event.get.assert.handle
proc awaitRetraction(cont: Cont): Cont {.cpsMagic.} =
{.error: "this cannot work".}
let h = cont.turn.assertHandle
while true:
let turn = cont.turn
if turn.retracts(h):
return c
else:
turn.facet.actor.qeueue(c)

View File

@ -1,6 +1,6 @@
# Package # Package
version = "20240216" version = "20240219"
author = "Emery Hemingway" author = "Emery Hemingway"
description = "Syndicated actors for conversational concurrency" description = "Syndicated actors for conversational concurrency"
license = "Unlicense" license = "Unlicense"

View File

@ -23,17 +23,17 @@ proc readStdin(facet: Facet; ds: Cap; username: string) =
readLine() readLine()
readLine() readLine()
proc chat(turn: var Turn; ds: Cap; username: string) = proc chat(facet: Facet; ds: Cap; username: string) =
during(turn, ds, ?:Present) do (who: string): during(facet, ds, ?:Present) do (who: string):
echo who, " joined" echo who, " joined"
do: do:
echo who, " left" echo who, " left"
onMessage(turn, ds, ?:Says) do (who: string, what: string): onMessage(facet, ds, ?:Says) do (who: string, what: string):
echo who, ": ", what echo who, ": ", what
discard publish(turn, ds, Present(username: username)) discard publish(facet, ds, Present(username: username))
readStdin(turn.facet, ds, username) readStdin(facet, ds, username)
proc main = proc main =
let route = envRoute() let route = envRoute()
@ -48,9 +48,10 @@ proc main =
if username == "": if username == "":
stderr.writeLine "--user: unspecified" stderr.writeLine "--user: unspecified"
else: else:
runActor("chat") do (turn: var Turn; root: Cap): runActor("chat") do (root: Facet):
spawnRelays(turn, root) let ds = facet.newDataspace()
resolve(turn, root, route) do (turn: var Turn; ds: Cap): facet.spawnRelays(ds)
chat(turn, ds, username) resolve(facet, ds, route) do (remote: Cap):
chat(facet, remote, username)
main() main()

View File

@ -3,28 +3,34 @@
import std/times import std/times
import pkg/cps import pkg/cps
import syndicate import sam/syndicate
# import syndicate/actors/timers import sam/actors/timers
proc now: float64 = getTime().toUnixFloat() proc now: float64 = getTime().toUnixFloat()
proc main() {.syndicate.} = runActor("timer-test") do (root: Facet):
let ds = newDataspace() echo "actor calls boot proc with root facte ", root
let h = publish(ds, "hello world!".toPreserves)
#onMessage(ds, grab()) do (v: Value): let ds = root.newDataspace()
# stderr.writeLine "observed message ", v echo "new dataspace ", ds
let h = publish(ds, "hello world!".toPreserves)
echo "published to handle ", h
onMessage(ds, grab()) do (v: Value):
stderr.writeLine "observed message ", v
message(ds, "hello world!".toPreserves) message(ds, "hello world!".toPreserves)
retract(h) echo "sent a message"
sync(ds) retract(ds, h)
echo "retracted handle ", h
# facet.sync(ds)
onStop: root.onStop:
echo "anonymous stop handler was invoked" echo "anonymous stop handler was invoked"
echo "stopping actor" echo "stopping actor"
stopActor() root.stopActor()
echo "actor stopped but still executing?" echo "actor stopped but still executing?"
#[ #[
@ -38,5 +44,3 @@ proc main() {.syndicate.} =
stderr.writeLine "slept one second thrice" stderr.writeLine "slept one second thrice"
stopActor() stopActor()
]# ]#
bootActor("main", whelp main())