Hash brownie

This commit is contained in:
Emery Hemingway 2024-02-18 00:56:59 +00:00
parent fe268dea8d
commit b2506d81bb
38 changed files with 2247 additions and 61 deletions

View File

@ -1,2 +0,0 @@
include_rules
: foreach *.nim |> !nim_check |>

699
src/sam/actors.bak Normal file
View File

@ -0,0 +1,699 @@
# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense
import std/[deques, hashes, monotimes, options, sets, sequtils, tables, times]
import pkg/cps
import preserves
import ../syndicate/protocols/[protocol, sturdy]
export cps
const traceSyndicate {.booldefine.}: bool = true
when traceSyndicate:
import std/streams
from std/os import getEnv
import ./protocols/trace
type TraceSink = ref object
stream: FileStream
proc newTraceSink: TraceSink =
new result
let path = getEnv("SYNDICATE_TRACE_FILE", "")
case path
of "": quit"$SYNDICATE_TRACE_FILE unset"
of "-": actor.stream = newFileStream(stderr)
else: result.stream = openFileStream(path, fmWrite)
proc write(s: TraceSink; e: TraceEntry) = s.write(e.toPreserves)
export Handle
template generateIdType(typ: untyped) =
type typ* = distinct Natural
proc `==`*(x, y: typ): bool {.borrow.}
proc `$`*(id: typ): string {.borrow.}
generateIdType(ActorId)
generateIdType(FacetId)
generateIdType(EndpointId)
generateIdType(FieldId)
generateIdType(TurnId)
type
Oid = sturdy.Oid
Caveat = sturdy.Caveat
Attenuation = seq[Caveat]
Rewrite = sturdy.Rewrite
AssertionRef* = ref object
value*: Value
# if the Enity methods take a Value object then the generated
# C code has "redefinition of struct" problems when orc is enabled
Entity* = ref object of RootObj
oid*: Oid # oid is how Entities are identified over the wire
Cap* {.preservesEmbedded.} = ref object of EmbeddedObj
relay*: Facet
target*: Entity
attenuation*: Attenuation
Ref* {.deprecated: "Ref was renamed to Cap".} = Cap
OutboundAssertion = ref object
handle: Handle
peer: Cap
established: bool
OutboundTable = Table[Handle, OutboundAssertion]
Actor* = ref object
name: string
handleAllocator: ref Handle
# a fresh actor gets a new ref Handle and
# all actors spawned from it get the same ref.
root: Facet
exitReason: ref Exception
exitHooks: seq[TurnAction]
id: ActorId
exiting, exited: bool
when traceSyndicate:
turnIdAllocator: ref TurnId
traceStream: FileStream
TurnAction* = proc (t: Turn)
Queues = TableRef[Facet, Deque[Cont]]
Turn* = ref object
facet: Facet
queues: Queues
when traceSyndicate:
desc: TurnDescription
Cont* = ref object of Continuation
turn*: Turn
Facet* = ref FacetObj
FacetObj = object
actor*: Actor
parent: Facet
children: HashSet[Facet]
outbound: OutboundTable
shutdownActions: seq[TurnAction]
inertCheckPreventers: int
id: FacetId
isAlive: bool
proc pass*(a, b: Cont): Cont =
assert not a.turn.isNil
b.turn = move a.turn
return b
template turnAction*(prc: typed): untyped =
cps(Cont, prc)
proc activeTurn*(c: Cont): Turn {.cpsVoodoo.} =
assert not c.turn.isNil
c.turn
when traceSyndicate:
proc nextTurnId(facet: Facet): TurnId =
result = succ(facet.actor.turnIdAllocator[])
facet.actor.turnIdAllocator[] = result
proc trace(actor: Actor; act: ActorActivation) =
assert not actor.traceStream.isNil
var entry = TraceEntry(
timestamp: getTime().toUnixFloat(),
actor: initRecord("named", actor.name.toPreserves),
item: act)
actor.traceStream.writeLine($entry.toPreserves)
proc path(facet: Facet): seq[trace.FacetId] =
var f = facet
while not f.isNil:
result.add f.id.toPreserves
f = f.parent
method publish*(e: Entity; turn: Turn; v: AssertionRef; h: Handle) {.base.} = discard
method retract*(e: Entity; turn: Turn; h: Handle) {.base.} = discard
method message*(e: Entity; turn: Turn; v: AssertionRef) {.base.} = discard
method sync*(e: Entity; turn: Turn; peer: Cap) {.base.} = discard
using
actor: Actor
facet: Facet
turn: Turn
action: TurnAction
proc labels(f: Facet): string =
proc catLabels(f: Facet; labels: var string) =
labels.add ':'
if not f.parent.isNil:
catLabels(f.parent, labels)
labels.add ':'
when traceSyndicate:
labels.add $f.id
result.add f.actor.name
catLabels(f, result)
proc `$`*(f: Facet): string =
"<Facet:" & f.labels & ">"
proc `$`*(r: Cap): string =
"<Ref:" & r.relay.labels & ">"
proc `$`*(actor: Actor): string =
"<Actor:" & actor.name & ">" # TODO: ambigous
proc attenuate(r: Cap; a: Attenuation): Cap =
if a.len == 0: result = r
else: result = Cap(
relay: r.relay,
target: r.target,
attenuation: a & r.attenuation)
proc hash*(facet): Hash =
facet.id.hash
proc hash*(r: Cap): Hash = !$(r.relay.hash !& r.target.unsafeAddr.hash)
proc nextHandle(facet: Facet): Handle =
result = succ(facet.actor.handleAllocator[])
facet.actor.handleAllocator[] = result
proc facet*(turn: Turn): Facet = turn.facet
proc enqueue(turn: Turn; target: Facet; cont: Cont) =
cont.turn = turn
if target in turn.queues:
turn.queues[target].addLast cont
else:
turn.queues[target] = toDeque([cont])
type Bindings = Table[Value, Value]
proc match(bindings: var Bindings; p: Pattern; v: Value): bool =
case p.orKind
of PatternKind.Pdiscard: result = true
of PatternKind.Patom:
result = case p.patom
of PAtom.Boolean: v.isBoolean
of PAtom.Double: v.isDouble
of PAtom.Signedinteger: v.isInteger
of PAtom.String: v.isString
of PAtom.Bytestring: v.isByteString
of PAtom.Symbol: v.isSymbol
of PatternKind.Pembedded:
result = v.isEmbedded
of PatternKind.Pbind:
if match(bindings, p.pbind.pattern, v):
bindings[p.pbind.pattern.toPreserves] = v
result = true
of PatternKind.Pand:
for pp in p.pand.patterns:
result = match(bindings, pp, v)
if not result: break
of PatternKind.Pnot:
var b: Bindings
result = not match(b, p.pnot.pattern, v)
of PatternKind.Lit:
result = p.lit.value == v
of PatternKind.PCompound:
case p.pcompound.orKind
of PCompoundKind.rec:
if v.isRecord and
p.pcompound.rec.label == v.label and
p.pcompound.rec.fields.len == v.arity:
result = true
for i, pp in p.pcompound.rec.fields:
if not match(bindings, pp, v[i]):
result = false
break
of PCompoundKind.arr:
if v.isSequence and p.pcompound.arr.items.len == v.sequence.len:
result = true
for i, pp in p.pcompound.arr.items:
if not match(bindings, pp, v[i]):
result = false
break
of PCompoundKind.dict:
if v.isDictionary:
result = true
for key, pp in p.pcompound.dict.entries:
let vv = step(v, key)
if vv.isNone or not match(bindings, pp, get vv):
result = true
break
proc match(p: Pattern; v: Value): Option[Bindings] =
var b: Bindings
if match(b, p, v):
result = some b
proc instantiate(t: Template; bindings: Bindings): Value =
case t.orKind
of TemplateKind.Tattenuate:
let v = instantiate(t.tattenuate.template, bindings)
let cap = v.unembed(Cap)
if cap.isNone:
raise newException(ValueError, "Attempt to attenuate non-capability")
result = attenuate(get cap, t.tattenuate.attenuation).embed
of TemplateKind.TRef:
let n = $t.tref.binding.int
try: result = bindings[n.toPreserves]
except KeyError:
raise newException(ValueError, "unbound reference: " & n)
of TemplateKind.Lit:
result = t.lit.value
of TemplateKind.Tcompound:
case t.tcompound.orKind
of TCompoundKind.rec:
result = initRecord(t.tcompound.rec.label, t.tcompound.rec.fields.len)
for i, tt in t.tcompound.rec.fields:
result[i] = instantiate(tt, bindings)
of TCompoundKind.arr:
result = initSequence(t.tcompound.arr.items.len)
for i, tt in t.tcompound.arr.items:
result[i] = instantiate(tt, bindings)
of TCompoundKind.dict:
result = initDictionary()
for key, tt in t.tcompound.dict.entries:
result[key] = instantiate(tt, bindings)
proc rewrite(r: Rewrite; v: Value): Value =
let bindings = match(r.pattern, v)
if bindings.isSome:
result = instantiate(r.template, get bindings)
proc examineAlternatives(cav: Caveat; v: Value): Value =
case cav.orKind
of CaveatKind.Rewrite:
result = rewrite(cav.rewrite, v)
of CaveatKind.Alts:
for r in cav.alts.alternatives:
result = rewrite(r, v)
if not result.isFalse: break
of CaveatKind.Reject: discard
of CaveatKind.unknown: discard
proc runRewrites*(a: Attenuation; v: Value): Value =
result = v
for stage in a:
result = examineAlternatives(stage, result)
if result.isFalse: break
proc publish(target: Entity; e: OutboundAssertion; a: AssertionRef) {.turnAction.} =
e.established = true
publish(target, activeTurn(), a, e.handle)
proc publish(turn: Turn; r: Cap; v: Value; h: Handle) =
var a = runRewrites(r.attenuation, v)
if not a.isFalse:
let e = OutboundAssertion(
handle: h, peer: r, established: false)
turn.facet.outbound[h] = e
enqueue(turn, r.relay, whelp publish(r.target, e, AssertionRef(value: a)))
when traceSyndicate:
var act = ActionDescription(orKind: ActionDescriptionKind.enqueue)
act.enqueue.event.target.actor = turn.facet.actor.id.toPreserves
act.enqueue.event.target.facet = turn.facet.id.toPreserves
act.enqueue.event.target.oid = r.target.oid.toPreserves
act.enqueue.event.detail = trace.TurnEvent(orKind: TurnEventKind.assert)
act.enqueue.event.detail.assert.assertion.value.value =
mapEmbeds(v) do (cap: Value) -> Value: discard
act.enqueue.event.detail.assert.handle = h
turn.desc.actions.add act
proc publish*(turn: Turn; r: Cap; a: Value): Handle =
result = turn.facet.nextHandle()
publish(turn, r, a, result)
proc publish*(r: Cap; a: Value): Handle {.turnAction.} =
publish(activeTurn(), r, a, result)
proc retract(e: OutboundAssertion) {.turnAction.} =
if e.established:
e.established = false
e.peer.target.retract(activeTurn(), e.handle)
proc retract(turn: Turn; e: OutboundAssertion) =
enqueue(turn, e.peer.relay, whelp retract(e))
proc retract*(turn: Turn; h: Handle) =
var e: OutboundAssertion
if turn.facet.outbound.pop(h, e):
turn.retract(e)
proc message(target: Entity; a: AssertionRef) {.turnAction.} =
target.message(activeTurn(), a)
proc message*(turn: Turn; r: Cap; v: Value) =
var a = runRewrites(r.attenuation, v)
if not a.isFalse:
enqueue(turn, r.relay, whelp message(r.target, AssertionRef(value: a)))
proc message*(target: Cap; value: Value) {.turnAction.} =
message(activeTurn(), target, value)
proc sync(e: Entity; peer: Cap) {.turnAction.} =
e.sync(activeTurn(), peer)
proc sync(turn: Turn; e: Entity; peer: Cap) =
e.sync(turn, peer)
proc sync*(turn: Turn; r, peer: Cap) =
enqueue(turn, r.relay, whelp sync(r.target, peer))
proc replace*[T](turn: Turn; cap: Cap; h: Handle; v: T): Handle =
result = publish(turn, cap, v)
if h != default(Handle):
retract(turn, h)
proc replace*[T](turn: Turn; cap: Cap; h: var Handle; v: T): Handle {.discardable.} =
var old = h
h = publish(turn, cap, v)
if old != default(Handle):
retract(turn, old)
h
proc stop*(turn: Turn)
proc run*(facet; action: TurnAction; zombieTurn = false)
proc newFacet(actor; parent: Facet; initialAssertions: OutboundTable): Facet =
result = Facet(
id: getMonoTime().ticks.FacetId,
actor: actor,
parent: parent,
outbound: initialAssertions,
isAlive: true)
if not parent.isNil: parent.children.incl result
proc newFacet(actor; parent: Facet): Facet =
var initialAssertions: OutboundTable
newFacet(actor, parent, initialAssertions)
proc isInert(facet): bool =
result = facet.children.len == 0 and
(facet.outbound.len == 0 or facet.parent.isNil) and
facet.inertCheckPreventers == 0
proc preventInertCheck*(facet): (proc() {.gcsafe.}) {.discardable.} =
var armed = true
inc facet.inertCheckPreventers
proc disarm() =
if armed:
armed = false
dec facet.inertCheckPreventers
result = disarm
proc inFacet(turn: Turn; facet; act: TurnAction) =
## Call an action with a facet using a temporary `Turn`
## that shares the `Queues` of the calling `Turn`.
var t = Turn(facet: facet, queues: turn.queues)
act(t)
proc terminate(actor; turn; reason: ref Exception)
proc terminate(facet; turn: Turn; orderly: bool) =
if facet.isAlive:
facet.isAlive = false
let parent = facet.parent
if not parent.isNil:
parent.children.excl facet
block:
var turn = Turn(facet: facet, queues: turn.queues)
while facet.children.len > 0:
facet.children.pop.terminate(turn, orderly)
if orderly:
for act in facet.shutdownActions:
act(turn)
for a in facet.outbound.values: turn.retract(a)
if orderly:
if not parent.isNil:
if parent.isInert:
parent.terminate(turn, true)
else:
terminate(facet.actor, turn, nil)
when traceSyndicate:
var act = ActionDescription(orKind: ActionDescriptionKind.facetStop)
act.facetstop.path = facet.path
turn.desc.actions.add act
proc stopIfInert() {.turnAction.} =
let turn = activeTurn()
if (not turn.facet.parent.isNil and (not turn.facet.parent.isAlive)) or turn.facet.isInert:
stop(turn)
proc stopIfInertAfter(action: TurnAction): TurnAction =
proc wrapper(turn: Turn) =
action(turn)
enqueue(turn, turn.facet, whelp stopIfInert())
wrapper
proc newFacet*(turn: Turn): Facet = newFacet(turn.facet.actor, turn.facet)
proc inFacet*(turn: Turn; bootProc: TurnAction): Facet =
result = newFacet(turn)
when traceSyndicate:
var act = ActionDescription(orKind: ActionDescriptionKind.facetstart)
act.facetstart.path.add result.path
turn.desc.actions.add act
inFacet(turn, result, stopIfInertAfter(bootProc))
proc facet*(turn: Turn; bootProc: TurnAction): Facet {.deprecated.} = inFacet(turn, bootProc)
proc run(actor; bootProc: TurnAction; initialAssertions: OutboundTable) =
run(newFacet(actor, actor.root, initialAssertions), stopIfInertAfter(bootProc))
proc run(actor; bootProc: TurnAction) =
var initialAssertions: OutboundTable
run(newFacet(actor, actor.root, initialAssertions), stopIfInertAfter(bootProc))
proc newActor(name: string; handleAlloc: ref Handle): Actor =
let
now = getTime()
seed = now.toUnix * 1_000_000_000 + now.nanosecond
result = Actor(
name: name,
id: ActorId(seed),
handleAllocator: handleAlloc,
)
result.root = newFacet(result, nil)
when traceSyndicate:
new result.turnIdAllocator
proc newActor*(name: string): Actor =
newActor(name, new(ref Handle))
proc bootActor*(name: string; bootProc: TurnAction) =
var
initialAssertions: OutboundTable
actor = newActor(name)
when traceSyndicate:
let path = getEnv("SYNDICATE_TRACE_FILE", "/tmp/" & name & ".trace.pr")
case path
of "": stderr.writeLine "$SYNDICATE_TRACE_FILE unset, not tracing actor ", name
of "-": actor.traceStream = newFileStream(stderr)
else: actor.traceStream = openFileStream(path, fmWrite)
when traceSyndicate:
var act = ActorActivation(orKind: ActorActivationKind.start)
act.start.actorName = Name(orKind: NameKind.named)
act.start.actorName.named.name = name.toPreserves
var entry = TraceEntry(
timestamp: getTime().toUnixFloat(),
item: act)
actor.traceStream.writeLine($entry.toPreserves)
let turn = newTurn(actor, TurnCauseExternal(description: "top-level actor"))
run(actor, bootProc, initialAssertions)
proc bootActor*(name: string; cont: Cont) =
bootActor(name) do (turn: Turn):
enqueue(turn, turn.facet, cont)
proc spawnActor(actor: Actor; bootProc: TurnAction; initialAssertions: HashSet[Handle]) {.turnAction.} =
let turn = activeTurn()
var newOutBound: Table[Handle, OutboundAssertion]
for key in initialAssertions:
discard turn.facet.outbound.pop(key, newOutbound[key])
when traceSyndicate:
actor.turnIdAllocator = turn.facet.actor.turnIdAllocator
actor.traceStream = turn.facet.actor.traceStream
var act = ActionDescription(orKind: ActionDescriptionKind.spawn)
act.spawn.id = actor.id.toPreserves
turn.desc.actions.add act
run(actor, bootProc, newOutBound)
proc spawn*(name: string; turn: Turn; bootProc: TurnAction; initialAssertions = initHashSet[Handle]()): Actor {.discardable.} =
let actor = newActor(name, turn.facet.actor.handleAllocator)
enqueue(turn, turn.facet, whelp spawnActor(actor, bootProc, initialAssertions))
actor
proc newInertCap*(): Cap =
let a = newActor("inert")
run(a) do (turn: Turn): turn.stop()
Cap(relay: a.root)
proc atExit*(actor; action) = actor.exitHooks.add action
proc terminate(actor: Actor; orderly: bool) {.turnAction.} =
actor.root.terminate(activeTurn(), orderly)
actor.exited = true
proc terminate(actor; turn; reason: ref Exception) =
if not actor.exiting:
actor.exiting = true
actor.exitReason = reason
when traceSyndicate:
var act = ActorActivation(orKind: ActorActivationKind.stop)
if not reason.isNil:
act.stop.status = ExitStatus(orKind: ExitStatusKind.Error)
act.stop.status.error.message = reason.msg
trace(actor, act)
for hook in actor.exitHooks: hook(turn)
enqueue(turn, actor.root, whelp terminate(actor, reason.isNil))
proc terminate*(facet; e: ref Exception) =
run(facet.actor.root) do (turn: Turn):
facet.actor.terminate(turn, e)
#[
proc asyncCheck*(facet: Facet; fut: FutureBase) =
## Sets a callback on `fut` which propagates exceptions to `facet`.
addCallback(fut) do ():
if fut.failed: terminate(facet, fut.error)
proc asyncCheck*(turn; fut: FutureBase) =
## Sets a callback on `fut` which propagates exceptions to the facet of `turn`.
asyncCheck(turn.facet, fut)
]#
template tryFacet(facet; body: untyped) =
try: body
except CatchableError as err: terminate(facet, err)
proc run(facet: Facet; turn: Turn; deq: var Deque[Cont]): int =
## Return the number of continuations processed.
while deq.len > 0:
var c = deq.popFirst()
try:
while not c.isNil and not c.fn.isNil:
c.turn = turn
var y = c.fn
var x = y(c)
inc(result)
c = Cont(x)
except CatchableError as err:
if not c.dismissed:
writeStackFrames c
terminate(facet, err)
stderr.writeLine("ran ", result, " continuations for ", facet)
proc run*(facet; action: TurnAction; zombieTurn = false) =
if zombieTurn or (facet.actor.exitReason.isNil and facet.isAlive):
tryFacet(facet):
var queues = newTable[Facet, Deque[Cont]]()
var turn = Turn(facet: facet, queues: queues)
action(turn)
when traceSyndicate:
turn.desc.id = facet.nextTurnId.toPreserves
facet.actor.trace ActorActivation(
orKind: ActorActivationKind.turn, turn: turn.desc)
assert not turn.isNil
var n = 1
while n > 0:
n = 0
var facets = queues.keys.toSeq
for facet in facets:
n.inc run(facet, turn, queues[facet])
proc run*(cap: Cap; action: TurnAction) =
## Convenience proc to run a `TurnAction` in the scope of a `Cap`.
run(cap.relay, action)
#[
proc addCallback*(fut: FutureBase; facet: Facet; act: TurnAction) =
## Add a callback to a `Future` that will be called at a later `Turn`
## within the context of `facet`.
addCallback(fut) do ():
if fut.failed: terminate(facet, fut.error)
else:
when traceSyndicate:
run(facet) do (turn: Turn):
turn.desc.cause = TurnCause(orKind: TurnCauseKind.external)
turn.desc.cause.external.description = "Future".toPreserves
act(turn)
else:
run(facet, act)
proc addCallback*(fut: FutureBase; turn: Turn; act: TurnAction) =
## Add a callback to a `Future` that will be called at a later `Turn`
## with the same context as the current.
if fut.failed:
terminate(turn.facet, fut.error)
elif fut.finished:
enqueue(turn, turn.facet, act)
else:
addCallback(fut, turn.facet, act)
proc addCallback*[T](fut: Future[T]; turn: Turn; act: proc (t: Turn, x: T) {.gcsafe.}) =
addCallback(fut, turn) do (turn: Turn):
if fut.failed: terminate(turn.facet, fut.error)
else:
when traceSyndicate:
turn.desc.cause = TurnCause(orKind: TurnCauseKind.external)
turn.desc.cause.external.description = "Future".toPreserves
act(turn, read fut)
]#
proc stop*(turn: Turn, facet: Facet) =
if facet.parent.isNil:
facet.terminate(turn, true)
else:
enqueue(turn, facet.parent, whelp terminate(facet.actor, true))
# TODO: terminate the actor?
proc stop*(turn: Turn) =
stop(turn, turn.facet)
proc onStop*(facet: Facet; act: TurnAction) =
## Add a `proc (turn: Turn)` action to `facet` to be called as it stops.
add(facet.shutdownActions, act)
proc stopActor*(turn: Turn) =
let actor = turn.facet.actor
enqueue(turn, actor.root, whelp terminate(actor, true))
proc freshen*(turn: Turn, act: TurnAction) =
assert(turn.queues.len == 0, "Attempt to freshen a non-stale Turn")
run(turn.facet, act)
proc newCap*(relay: Facet; e: Entity): Cap =
Cap(relay: relay, target: e)
proc newCap*(turn; e: Entity): Cap =
Cap(relay: turn.facet, target: e)
proc newCap*(e: Entity; turn): Cap =
Cap(relay: turn.facet, target: e)
type SyncContinuation {.final.} = ref object of Entity
action: TurnAction
method message(entity: SyncContinuation; turn: Turn; v: AssertionRef) =
entity.action(turn)
proc sync*(turn: Turn; refer: Cap; act: TurnAction) =
sync(turn, refer, newCap(turn, SyncContinuation(action: act)))
proc running*(actor): bool =
result = not actor.exited
if not (result or actor.exitReason.isNil):
raise actor.exitReason
proc newCap*(e: Entity): Cap {.turnAction.} =
Cap(relay: activeTurn().facet, target: e)

View File

@ -41,7 +41,7 @@ type
when traceSyndicate:
id: FacetId
Turn* = ref object
Turn = ref object
## https://synit.org/book/glossary.html#turn
facet: Facet
entity: Entity
@ -54,6 +54,10 @@ type
## https://synit.org/book/glossary.html#entity
facet*: Facet
oid*: sturdy.Oid # oid is how Entities are identified over the wire
publishImpl*: PublishProc
retractImpl*: RetractProc
messageImpl*: MessageProc
syncImpl*: SyncProc
when traceSyndicate:
id: FacetId
@ -62,8 +66,9 @@ type
target*: Entity
attenuation*: seq[sturdy.Caveat]
Actor* = ref object
Actor = ref object
## https://synit.org/book/glossary.html#actor
# TODO: run on a seperate thread.
# crashHandlers: HandlerDeque
root: Facet
handleAllocator: Handle
@ -76,21 +81,11 @@ type
template syndicate*(prc: typed): untyped =
cps(Cont, prc)
proc activeTurn*(c: Cont): Turn {.cpsVoodoo.} =
## Return the active `Turn` within a `{.syndicate.}` context.
assert not c.turn.isNil
c.turn
proc activeFacet*(c: Cont): Facet {.cpsVoodoo.} =
## Return the active `Facet` within a `{.syndicate.}` context.
assert not c.facet.isNil
c.facet
proc activeActor*(c: Cont): Actor {.cpsVoodoo.} =
## Return the active `Actor` within a `{.syndicate.}` context.
assert not c.turn.isNil
c.facet.actor
using
actor: Actor
facet: Facet
@ -268,11 +263,6 @@ type
# if the Enity methods take a Value object then the generated
# C code has "redefinition of struct" problems when orc is enabled
method publish*(e: Entity; turn: Turn; v: AssertionRef; h: Handle) {.base.} = discard
method retract*(e: Entity; turn: Turn; h: Handle) {.base.} = discard
method message*(e: Entity; turn: Turn; v: AssertionRef) {.base.} = discard
method sync*(e: Entity; turn: Turn; peer: Cap) {.base.} = discard
proc newCap*(f: Facet; e: Entity): Cap =
Cap(relay: f, target: e)
@ -280,8 +270,8 @@ proc nextHandle(facet: Facet): Handle =
inc(facet.actor.handleAllocator)
facet.actor.handleAllocator
proc publish*(turn: Turn; cap: Cap; val: Value): Handle =
result = turn.facet.nextHandle()
proc publish*(cap: Cap; val: Value): Handle =
result = cap.relay.nextHandle()
when traceSyndicate:
var act = ActionDescription(orKind: ActionDescriptionKind.enqueue)
act.enqueue.event = TargetedTurnEvent(
@ -295,8 +285,9 @@ proc publish*(turn: Turn; cap: Cap; val: Value): Handle =
)
act.enqueue.event.detail.assert.assertion.value.value = val
turn.desc.actions.add act
cap.target.publish(val, result)
proc retract*(turn; h: Handle) =
proc retract*(cap: Cap; h: Handle) =
when traceSyndicate:
var act = ActionDescription(orKind: ActionDescriptionKind.enqueue)
act.enqueue.event = TargetedTurnEvent(
@ -305,8 +296,9 @@ proc retract*(turn; h: Handle) =
)
act.enqueue.event.detail.retract = TurnEventRetract(handle: h)
turn.desc.actions.add act
cap.target.retract(h)
proc message*(turn; cap; val: Value) =
proc message*(cap; val: Value) =
when traceSyndicate:
var act = ActionDescription(orKind: ActionDescriptionKind.enqueue)
act.enqueue.event = TargetedTurnEvent(
@ -315,8 +307,9 @@ proc message*(turn; cap; val: Value) =
)
act.enqueue.event.detail.message.body.value.value = val
turn.desc.actions.add act
cap.entity.message(val)
proc sync*(turn; peer: Cap) =
proc sync*(peer: Cap) =
when traceSyndicate:
var act = ActionDescription(orKind: ActionDescriptionKind.enqueue)
act.enqueue.event = TargetedTurnEvent(
@ -325,18 +318,7 @@ proc sync*(turn; peer: Cap) =
)
act.enqueue.event.detail.sync.peer = peer.traceTarget
turn.desc.actions.add act
proc publish*(cap: Cap; val: Value): Handle {.syndicate.} =
publish(activeTurn(), cap, val)
proc retract*(h: Handle) {.syndicate.} =
activeTurn().retract(h)
proc message*(cap: Cap; val: Value) {.syndicate.} =
activeTurn().message(cap, val)
proc sync*(cap: Cap) {.syndicate.} =
activeTurn().sync(cap)
peer.entity.sync()
proc installStopHook*(c: Cont, facet: Facet): Cont {.cpsMagic.} =
facet.stopHandlers.add(c.fn)
@ -345,3 +327,10 @@ proc installStopHook*(c: Cont, facet: Facet): Cont {.cpsMagic.} =
proc addOnStopHandler*(c: Cont; cb: Callback): Cont {.cpsMagic.} =
c.facet.stopCallbacks.add(cb)
result = c
type FacetProc* = proc (f: Facet) {.nimcall.}
proc bootActor*(name: string; bootProc: FacetProc): Actor =
result = newActor(name)
# TODO: start a turn, trace reason is Eternal,
bootProc(result.root)

View File

@ -6,7 +6,7 @@ import preserves
import syndicate
import ../protocols/timer
from syndicate/protocols/dataspace import Observe
from ../syndicate/protocols/dataspace import Observe
export timer

File diff suppressed because one or more lines are too long

9
src/sam/observers.nim Normal file
View File

@ -0,0 +1,9 @@
# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense
import preserves
import ./actors, ./patterns, ./protocols/dataspace
proc observe*(turn: var Turn; ds: Cap; pat: Pattern; e: Entity): Cap {.discardable.} =
result = newCap(turn, e)
publish(turn, ds, Observe(pattern: pat, observer: result))

View File

@ -6,7 +6,7 @@ from std/os import getEnv, `/`
import pkg/sys/[ioqueue, sockets]
import preserves
import ../syndicate, /capabilities, ./durings, ./membranes, ./protocols/[gatekeeper, protocol, sturdy, transportAddress]
import ./syndicate, /capabilities, ./durings, ./membranes, ./protocols/[gatekeeper, protocol, sturdy, transportAddress]
when defined(traceSyndicate):
when defined(posix):

443
src/sam/relays.nim.old Normal file
View File

@ -0,0 +1,443 @@
# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense
import std/[asyncdispatch, options, tables]
from std/os import getEnv, `/`
import preserves
import ../syndicate, /capabilities, ./durings, ./membranes, ./protocols/[gatekeeper, protocol, sturdy, transportAddress]
when defined(traceSyndicate):
when defined(posix):
template trace(args: varargs[untyped]): untyped = stderr.writeLine(args)
else:
template trace(args: varargs[untyped]): untyped = echo(args)
else:
template trace(args: varargs[untyped]): untyped = discard
export `$`
type
Oid = sturdy.Oid
export Stdio, Tcp, WebSocket, Unix
type
Assertion = Value
WireRef = sturdy.WireRef
Turn = syndicate.Turn
Handle = actors.Handle
type
PacketWriter = proc (pkt: sink Packet): Future[void] {.gcsafe.}
RelaySetup = proc (turn: var Turn; relay: Relay) {.gcsafe.}
Relay* = ref object of RootObj
facet: Facet
inboundAssertions: Table[Handle,
tuple[localHandle: Handle, imported: seq[WireSymbol]]]
outboundAssertions: Table[Handle, seq[WireSymbol]]
exported: Membrane
imported: Membrane
nextLocalOid: Oid
pendingTurn: protocol.Turn
packetWriter: PacketWriter
untrusted: bool
SyncPeerEntity = ref object of Entity
relay: Relay
peer: Cap
handleMap: Table[Handle, Handle]
e: WireSymbol
RelayEntity = ref object of Entity
## https://synit.org/book/protocol.html#relay-entities
label: string
relay: Relay
proc releaseCapOut(r: Relay; e: WireSymbol) =
r.exported.drop e
method publish(spe: SyncPeerEntity; t: var Turn; a: AssertionRef; h: Handle) =
spe.handleMap[h] = publish(t, spe.peer, a.value)
method retract(se: SyncPeerEntity; t: var Turn; h: Handle) =
var other: Handle
if se.handleMap.pop(h, other):
retract(t, other)
method message(se: SyncPeerEntity; t: var Turn; a: AssertionRef) =
if not se.e.isNil:
se.relay.releaseCapOut(se.e)
message(t, se.peer, a.value)
method sync(se: SyncPeerEntity; t: var Turn; peer: Cap) =
sync(t, se.peer, peer)
proc newSyncPeerEntity(r: Relay; p: Cap): SyncPeerEntity =
SyncPeerEntity(relay: r, peer: p)
proc rewriteCapOut(relay: Relay; cap: Cap; exported: var seq[WireSymbol]): WireRef =
if cap.target of RelayEntity and cap.target.RelayEntity.relay == relay and cap.attenuation.len == 0:
result = WireRef(orKind: WireRefKind.yours, yours: WireRefYours(oid: cap.target.oid))
else:
var ws = grab(relay.exported, cap)
if ws.isNil:
ws = newWireSymbol(relay.exported, relay.nextLocalOid, cap)
inc relay.nextLocalOid
exported.add ws
result = WireRef(
orKind: WireRefKind.mine,
mine: WireRefMine(oid: ws.oid))
proc rewriteOut(relay: Relay; v: Assertion):
tuple[rewritten: Value, exported: seq[WireSymbol]] {.gcsafe.} =
var exported: seq[WireSymbol]
result.rewritten = mapEmbeds(v) do (pr: Value) -> Value:
let o = pr.unembed(Cap); if o.isSome:
rewriteCapOut(relay, o.get, exported).toPreserves
else: pr
result.exported = exported
proc register(relay: Relay; v: Assertion; h: Handle): tuple[rewritten: Value, exported: seq[WireSymbol]] =
result = rewriteOut(relay, v)
relay.outboundAssertions[h] = result.exported
proc deregister(relay: Relay; h: Handle) =
var outbound: seq[WireSymbol]
if relay.outboundAssertions.pop(h, outbound):
for e in outbound: releaseCapOut(relay, e)
proc send(r: Relay; turn: var Turn; rOid: protocol.Oid; m: Event) =
if r.pendingTurn.len == 0:
# If the pending queue is empty then schedule a packet
# to be sent after pending I/O is processed.
callSoon do ():
r.facet.run do (turn: var Turn):
var pkt = Packet(
orKind: PacketKind.Turn,
turn: move r.pendingTurn)
trace "C: ", pkt
assert(not r.packetWriter.isNil, "missing packetWriter proc")
r.packetWriter(turn, encode pkt)
r.pendingTurn.add TurnEvent(oid: rOid, event: m)
proc send(re: RelayEntity; turn: var Turn; ev: Event) =
send(re.relay, turn, protocol.Oid re.oid, ev)
method publish(re: RelayEntity; t: var Turn; a: AssertionRef; h: Handle) {.gcsafe.} =
re.send(t, Event(
orKind: EventKind.Assert,
`assert`: protocol.Assert(
assertion: re.relay.register(a.value, h).rewritten,
handle: h)))
method retract(re: RelayEntity; t: var Turn; h: Handle) {.gcsafe.} =
re.relay.deregister h
re.send(t, Event(
orKind: EventKind.Retract,
retract: Retract(handle: h)))
method message(re: RelayEntity; turn: var Turn; msg: AssertionRef) {.gcsafe.} =
var (value, exported) = rewriteOut(re.relay, msg.value)
assert(len(exported) == 0, "cannot send a reference in a message")
if len(exported) == 0:
re.send(turn, Event(orKind: EventKind.Message, message: Message(body: value)))
method sync(re: RelayEntity; turn: var Turn; peer: Cap) {.gcsafe.} =
var
peerEntity = newSyncPeerEntity(re.relay, peer)
exported: seq[WireSymbol]
wr = rewriteCapOut(re.relay, turn.newCap(peerEntity), exported)
peerEntity.e = exported[0]
var ev = Event(orKind: EventKind.Sync)
ev.sync.peer = wr.toPreserves.embed
re.send(turn, ev)
proc newRelayEntity(label: string; r: Relay; o: Oid): RelayEntity =
RelayEntity(label: label, relay: r, oid: o)
using
relay: Relay
facet: Facet
proc lookupLocal(relay; oid: Oid): Cap =
let sym = relay.exported.grab oid
if sym.isNil: newInertCap()
else: sym.cap
proc isInert(r: Cap): bool =
r.target.isNil
proc rewriteCapIn(relay; facet; n: WireRef, imported: var seq[WireSymbol]): Cap =
case n.orKind
of WireRefKind.mine:
var e = relay.imported.grab(n.mine.oid)
if e.isNil:
e = newWireSymbol(
relay.imported,
n.mine.oid,
newCap(facet, newRelayEntity("rewriteCapIn", relay, n.mine.oid)),
)
imported.add e
result = e.cap
of WireRefKind.yours:
let r = relay.lookupLocal(n.yours.oid)
if n.yours.attenuation.len == 0 or r.isInert: result = r
else: raiseAssert "attenuation not implemented"
proc rewriteIn(relay; facet; v: Value):
tuple[rewritten: Assertion; imported: seq[WireSymbol]] {.gcsafe.} =
var imported: seq[WireSymbol]
result.rewritten = mapEmbeds(v) do (pr: Value) -> Value:
let wr = pr.preservesTo WireRef; if wr.isSome:
result = rewriteCapIn(relay, facet, wr.get, imported).embed
else:
result = pr
result.imported = imported
proc close(r: Relay) = discard
proc dispatch*(relay: Relay; turn: var Turn; cap: Cap; event: Event) {.gcsafe.} =
case event.orKind
of EventKind.Assert:
let (a, imported) = rewriteIn(relay, turn.facet, event.assert.assertion)
relay.inboundAssertions[event.assert.handle] = (publish(turn, cap, a), imported,)
of EventKind.Retract:
let remoteHandle = event.retract.handle
var outbound: tuple[localHandle: Handle, imported: seq[WireSymbol]]
if relay.inboundAssertions.pop(remoteHandle, outbound):
for e in outbound.imported: relay.imported.drop e
turn.retract(outbound.localHandle)
of EventKind.Message:
let (a, imported) = rewriteIn(relay, turn.facet, event.message.body)
assert imported.len == 0, "Cannot receive transient reference"
turn.message(cap, a)
of EventKind.Sync:
discard # TODO
#[
var imported: seq[WireSymbol]
let k = relay.rewriteCapIn(turn, evenr.sync.peer, imported)
turn.sync(cap) do (turn: var Turn):
turn.message(k, true)
for e in imported: relay.imported.del e
]#
proc dispatch*(relay: Relay; v: Value) {.gcsafe.} =
trace "S: ", v
run(relay.facet) do (t: var Turn):
var pkt: Packet
if pkt.fromPreserves(v):
case pkt.orKind
of PacketKind.Turn:
# https://synit.org/book/protocol.html#turn-packets
for te in pkt.turn:
let r = lookupLocal(relay, te.oid.Oid)
if not r.isInert:
dispatch(relay, t, r, te.event)
else:
stderr.writeLine("discarding event for unknown Cap; ", te.event)
of PacketKind.Error:
# https://synit.org/book/protocol.html#error-packets
when defined(posix):
stderr.writeLine("Error from server: ", pkt.error.message, " (detail: ", pkt.error.detail, ")")
close relay
of PacketKind.Extension:
# https://synit.org/book/protocol.html#extension-packets
discard
else:
when defined(posix):
stderr.writeLine("discarding undecoded packet ", v)
type
RelayOptions* = object of RootObj
packetWriter*: PacketWriter
untrusted*: bool
RelayActorOptions* = object of RelayOptions
initialOid*: Option[Oid]
initialCap*: Cap
nextLocalOid*: Option[Oid]
proc newRelay(turn: var Turn; opts: RelayOptions; setup: RelaySetup): Relay =
result = Relay(
facet: turn.facet,
packetWriter: opts.packetWriter,
untrusted: opts.untrusted)
discard result.facet.preventInertCheck()
setup(turn, result)
proc transportConnectionResolve(addrAss: Assertion; ds: Cap): gatekeeper.TransportConnection =
result.`addr` = addrAss
result.resolved = Resolved(orKind: ResolvedKind.accepted)
result.resolved.accepted.responderSession = ds
proc spawnRelay*(name: string; turn: var Turn; ds: Cap; addrAss: Assertion; opts: RelayActorOptions; setup: RelaySetup) =
discard spawn(name, turn) do (turn: var Turn):
let relay = newRelay(turn, opts, setup)
if not opts.initialCap.isNil:
var exported: seq[WireSymbol]
discard rewriteCapOut(relay, opts.initialCap, exported)
opts.nextLocalOid.map do (oid: Oid):
relay.nextLocalOid =
if oid == 0.Oid: 1.Oid
else: oid
if opts.initialOid.isSome:
var
imported: seq[WireSymbol]
wr = WireRef(
orKind: WireRefKind.mine,
mine: WireRefMine(oid: opts.initialOid.get))
res = rewriteCapIn(relay, turn.facet, wr, imported)
discard publish(turn, ds, transportConnectionResolve(addrAss, res))
else:
discard publish(turn, ds, transportConnectionResolve(addrAss, ds))
when defined(posix):
import std/asyncnet
from std/nativesockets import AF_INET, AF_UNIX, IPPROTO_TCP, SOCK_STREAM, Protocol
type ShutdownEntity* = ref object of Entity
method retract(e: ShutdownEntity; turn: var Turn; h: Handle) =
stopActor(turn)
type ConnectProc* = proc (turn: var Turn; ds: Cap) {.gcsafe.}
export Tcp
when defined(posix):
export Unix
proc connect*(turn: var Turn; ds: Cap; route: Route; addrAss: Assertion; socket: AsyncSocket; step: Value) =
## Relay a dataspace over an open `AsyncSocket`.
proc socketWriter(packet: sink Packet): Future[void] =
socket.send(cast[string](encode(packet)))
const recvSize = 0x2000
var shutdownCap: Cap
let
reenable = turn.facet.preventInertCheck()
connectionClosedCap = newCap(turn, ShutdownEntity())
discard bootActor("socket") do (turn: var Turn):
var ops = RelayActorOptions(
packetWriter: socketWriter,
initialOid: 0.Oid.some)
spawnRelay("socket", turn, ds, addrAss, ops) do (turn: var Turn; relay: Relay):
let facet = turn.facet
var wireBuf = newBufferedDecoder(0)
turn.facet.actor.atExit do (turn: var Turn): close(socket)
discard publish(turn, connectionClosedCap, true)
shutdownCap = newCap(turn, ShutdownEntity())
onPublish(turn, ds, TransportConnection ?: {0: ?addrAss, 2: ?:Rejected}) do (detail: Value):
raise newException(IOError, $detail)
onPublish(turn, ds, TransportConnection ?: {0: ?addrAss, 2: ?:ResolvedAccepted}) do (gatekeeper: Cap):
run(gatekeeper.relay) do (turn: var Turn):
reenable()
discard publish(turn, shutdownCap, true)
proc duringCallback(turn: var Turn; ass: Assertion; h: Handle): TurnAction =
let facet = inFacet(turn) do (turn: var Turn):
let o = ass.preservesTo Resolved; if o.isSome:
discard publish(turn, ds, ResolvePath(
route: route, `addr`: addrAss, resolved: o.get))
proc action(turn: var Turn) =
stop(turn, facet)
result = action
var resolve = Resolve(
step: step,
observer: newCap(turn, during(duringCallback)),
)
discard publish(turn, gatekeeper, resolve)
proc connect*(turn: var Turn; ds: Cap; route: Route; transport: Tcp; step: Value) =
## Relay a dataspace over TCP.
let socket = newAsyncSocket(
domain = AF_INET,
sockType = SOCK_STREAM,
protocol = IPPROTO_TCP,
buffered = false)
let fut = connect(socket, transport.host, Port transport.port)
addCallback(fut, turn) do (turn: var Turn):
connect(turn, ds, route, transport.toPreserves, socket, step)
proc connect*(turn: var Turn; ds: Cap; route: Route; transport: Unix; step: Value) =
## Relay a dataspace over a UNIX socket.
let socket = newAsyncSocket(
domain = AF_UNIX,
sockType = SOCK_STREAM,
protocol = cast[Protocol](0),
buffered = false)
let fut = connectUnix(socket, transport.path)
addCallback(fut, turn) do (turn: var Turn):
connect(turn, ds, route, transport.toPreserves, socket, step)
import std/asyncfile
const stdinReadSize = 128
proc connectStdio*(turn: var Turn; ds: Cap) =
## Connect to an external dataspace over stdin and stdout.
proc stdoutWriter(packet: sink Packet): Future[void] =
result = newFuture[void]()
var buf = encode(packet)
doAssert writeBytes(stdout, buf, 0, buf.len) == buf.len
flushFile(stdout)
complete result
var opts = RelayActorOptions(
packetWriter: stdoutWriter,
initialCap: ds,
initialOid: 0.Oid.some)
spawnRelay("stdio", turn, ds, Stdio().toPreserves, opts) do (turn: var Turn; relay: Relay):
let
facet = turn.facet
asyncStdin = openAsync("/dev/stdin") # this is universal now?
close(stdin)
facet.actor.atExit do (turn: var Turn):
close(asyncStdin)
var wireBuf = newBufferedDecoder(0)
proc readCb(pktFut: Future[string]) {.gcsafe.} =
if not pktFut.failed:
var buf = pktFut.read
if buf.len == 0:
run(facet) do (turn: var Turn): stopActor(turn)
else:
feed(wireBuf, buf)
var (success, pr) = decode(wireBuf)
if success:
dispatch(relay, pr)
asyncStdin.read(stdinReadSize).addCallback(readCb)
asyncStdin.read(stdinReadSize).addCallback(readCb)
type BootProc* = proc (turn: var Turn; ds: Cap) {.gcsafe.}
proc envRoute*: Route =
var text = getEnv("SYNDICATE_ROUTE")
if text == "":
var tx = (getEnv("XDG_RUNTIME_DIR", "/run/user/1000") / "dataspace").toPreserves
result.transports = @[initRecord("unix", tx)]
result.pathSteps = @[capabilities.mint().toPreserves]
else:
var pr = parsePreserves(text)
if not result.fromPreserves(pr):
raise newException(ValueError, "failed to parse $SYNDICATE_ROUTE " & $pr)
proc resolve*(turn: var Turn; ds: Cap; route: Route; bootProc: BootProc) =
var
unix: Unix
tcp: Tcp
stdio: Stdio
doAssert(route.transports.len == 1, "only a single transport supported for routes")
doAssert(route.pathSteps.len < 2, "multiple path steps not supported for routes")
if unix.fromPreserves route.transports[0]:
connect(turn, ds, route, unix, route.pathSteps[0])
elif tcp.fromPreserves route.transports[0]:
connect(turn, ds, route, tcp, route.pathSteps[0])
elif stdio.fromPreserves route.transports[0]:
connectStdio(turn, ds)
bootProc(turn, ds)
else:
raise newException(ValueError, "unsupported route")
during(turn, ds, ResolvePath ?: { 0: ?route, 3: ?:ResolvedAccepted}) do (dest: Cap):
bootProc(turn, dest)

View File

@ -10,9 +10,9 @@ import pkg/cps
import preserves
export preserves
import ./syndicate/[actors, dataspaces, patterns]
import ./[actors, dataspaces, patterns]
# durings
import ./syndicate/protocols/dataspace
import ./protocols/dataspace
export actors, dataspace, dataspaces, patterns
@ -220,3 +220,6 @@ macro onStop*(body: untyped) =
`body`
return
]#
proc runActor*(name: string; bootProc: FacetProc) =
let actor = spawnActor(name, bootProc)

19
src/sam/tracing.nim Normal file
View File

@ -0,0 +1,19 @@
# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense
import ./protocols/[protocol, trace]
export trace
proc traceAction*(e: protocol.Event): trace.TurnEvent =
var act = ActionDescription(orKind: ActionDescriptionKind.enqueue)
act.enqueue.event = TargetedTurnEvent(
target: cap.traceTarget,
detail: trace.TurnEvent(orKind: trace.TurnEventKind.assert)
)
act.enqueue.event.detail = trace.TurnEvent(orKind: TurnEventKind.assert)
act.enqueue.event.detail.assert = TurnEventAssert(
assertion: AssertionDescription(orKind: AssertionDescriptionKind.value),
handle: result,
)
act.enqueue.event.detail.assert.assertion.value.value = val
turn.desc.actions.add act

View File

@ -23,17 +23,17 @@ proc readStdin(facet: Facet; ds: Cap; username: string) =
readLine()
readLine()
proc chat(turn: var Turn; ds: Cap; username: string) =
during(turn, ds, ?:Present) do (who: string):
proc chat(facet: Facet; ds: Cap; username: string) =
during(facet, ds, ?:Present) do (who: string):
echo who, " joined"
do:
echo who, " left"
onMessage(turn, ds, ?:Says) do (who: string, what: string):
onMessage(facet, ds, ?:Says) do (who: string, what: string):
echo who, ": ", what
discard publish(turn, ds, Present(username: username))
readStdin(turn.facet, ds, username)
discard publish(facet, ds, Present(username: username))
readStdin(facet, ds, username)
proc main =
let route = envRoute()
@ -48,9 +48,10 @@ proc main =
if username == "":
stderr.writeLine "--user: unspecified"
else:
runActor("chat") do (turn: var Turn; root: Cap):
spawnRelays(turn, root)
resolve(turn, root, route) do (turn: var Turn; ds: Cap):
chat(turn, ds, username)
runActor("chat") do (root: Facet):
let ds = facet.newDataspace()
facet.spawnRelays(ds)
resolve(facet, ds, route) do (remote: Cap):
chat(facet, remote, username)
main()

View File

@ -9,22 +9,23 @@ import syndicate
proc now: float64 = getTime().toUnixFloat()
proc main() {.syndicate.} =
let ds = newDataspace()
let h = publish(ds, "hello world!".toPreserves)
runActor("timer-test") do (root: Facet):
#onMessage(ds, grab()) do (v: Value):
# stderr.writeLine "observed message ", v
let ds = facet.newDataspace()
let h = facet.publish(ds, "hello world!".toPreserves)
message(ds, "hello world!".toPreserves)
retract(h)
sync(ds)
facet.onMessage(ds, grab()) do (v: Value):
stderr.writeLine "observed message ", v
onStop:
facet.message(ds, "hello world!".toPreserves)
facet.retract(h)
facet.sync(ds)
facet.onStop:
echo "anonymous stop handler was invoked"
echo "stopping actor"
stopActor()
facet.stopActor()
echo "actor stopped but still executing?"
#[
@ -38,5 +39,3 @@ proc main() {.syndicate.} =
stderr.writeLine "slept one second thrice"
stopActor()
]#
bootActor("main", whelp main())