Compare commits
2 Commits
fe268dea8d
...
71af7b0d72
Author | SHA1 | Date |
---|---|---|
Emery Hemingway | 71af7b0d72 | |
Emery Hemingway | b2506d81bb |
|
@ -38,11 +38,11 @@
|
|||
"packages": [
|
||||
"nimcrypto"
|
||||
],
|
||||
"path": "/nix/store/7b491gv9zlayilsh8k2gnyzw6znrh7xq-source",
|
||||
"rev": "70151aa132f3a771996117c23c1fcaa8446a6f35",
|
||||
"sha256": "1ldjz02p70wagqvk6vgcg16kjh7pkm1394qd1pcdmg8z39bm5ag3",
|
||||
"path": "/nix/store/jwz8pqbv6rsm8w4fjzdb37r0wzjn5hv0-source",
|
||||
"rev": "d58da671799c69c0b3208b96c154e13c8b1a9e90",
|
||||
"sha256": "12dm0gsy10ppga7zf7hpf4adaqjrd9b740n2w926xyazq1njf6k9",
|
||||
"srcDir": "",
|
||||
"url": "https://github.com/cheatfate/nimcrypto/archive/70151aa132f3a771996117c23c1fcaa8446a6f35.tar.gz"
|
||||
"url": "https://github.com/cheatfate/nimcrypto/archive/d58da671799c69c0b3208b96c154e13c8b1a9e90.tar.gz"
|
||||
},
|
||||
{
|
||||
"method": "fetchzip",
|
||||
|
|
|
@ -1,2 +0,0 @@
|
|||
include_rules
|
||||
: foreach *.nim |> !nim_check |>
|
|
@ -0,0 +1,699 @@
|
|||
# SPDX-FileCopyrightText: ☭ Emery Hemingway
|
||||
# SPDX-License-Identifier: Unlicense
|
||||
|
||||
import std/[deques, hashes, monotimes, options, sets, sequtils, tables, times]
|
||||
import pkg/cps
|
||||
import preserves
|
||||
import ../syndicate/protocols/[protocol, sturdy]
|
||||
|
||||
export cps
|
||||
|
||||
const traceSyndicate {.booldefine.}: bool = true
|
||||
|
||||
when traceSyndicate:
|
||||
import std/streams
|
||||
from std/os import getEnv
|
||||
import ./protocols/trace
|
||||
|
||||
type TraceSink = ref object
|
||||
stream: FileStream
|
||||
|
||||
proc newTraceSink: TraceSink =
|
||||
new result
|
||||
let path = getEnv("SYNDICATE_TRACE_FILE", "")
|
||||
case path
|
||||
of "": quit"$SYNDICATE_TRACE_FILE unset"
|
||||
of "-": actor.stream = newFileStream(stderr)
|
||||
else: result.stream = openFileStream(path, fmWrite)
|
||||
|
||||
proc write(s: TraceSink; e: TraceEntry) = s.write(e.toPreserves)
|
||||
|
||||
export Handle
|
||||
|
||||
template generateIdType(typ: untyped) =
|
||||
type typ* = distinct Natural
|
||||
proc `==`*(x, y: typ): bool {.borrow.}
|
||||
proc `$`*(id: typ): string {.borrow.}
|
||||
|
||||
generateIdType(ActorId)
|
||||
generateIdType(FacetId)
|
||||
generateIdType(EndpointId)
|
||||
generateIdType(FieldId)
|
||||
generateIdType(TurnId)
|
||||
|
||||
type
|
||||
Oid = sturdy.Oid
|
||||
Caveat = sturdy.Caveat
|
||||
Attenuation = seq[Caveat]
|
||||
Rewrite = sturdy.Rewrite
|
||||
|
||||
AssertionRef* = ref object
|
||||
value*: Value
|
||||
# if the Enity methods take a Value object then the generated
|
||||
# C code has "redefinition of struct" problems when orc is enabled
|
||||
|
||||
Entity* = ref object of RootObj
|
||||
oid*: Oid # oid is how Entities are identified over the wire
|
||||
|
||||
Cap* {.preservesEmbedded.} = ref object of EmbeddedObj
|
||||
relay*: Facet
|
||||
target*: Entity
|
||||
attenuation*: Attenuation
|
||||
|
||||
Ref* {.deprecated: "Ref was renamed to Cap".} = Cap
|
||||
|
||||
OutboundAssertion = ref object
|
||||
handle: Handle
|
||||
peer: Cap
|
||||
established: bool
|
||||
OutboundTable = Table[Handle, OutboundAssertion]
|
||||
|
||||
Actor* = ref object
|
||||
name: string
|
||||
handleAllocator: ref Handle
|
||||
# a fresh actor gets a new ref Handle and
|
||||
# all actors spawned from it get the same ref.
|
||||
root: Facet
|
||||
exitReason: ref Exception
|
||||
exitHooks: seq[TurnAction]
|
||||
id: ActorId
|
||||
exiting, exited: bool
|
||||
when traceSyndicate:
|
||||
turnIdAllocator: ref TurnId
|
||||
traceStream: FileStream
|
||||
|
||||
TurnAction* = proc (t: Turn)
|
||||
|
||||
Queues = TableRef[Facet, Deque[Cont]]
|
||||
|
||||
Turn* = ref object
|
||||
facet: Facet
|
||||
queues: Queues
|
||||
when traceSyndicate:
|
||||
desc: TurnDescription
|
||||
|
||||
Cont* = ref object of Continuation
|
||||
turn*: Turn
|
||||
|
||||
Facet* = ref FacetObj
|
||||
FacetObj = object
|
||||
actor*: Actor
|
||||
parent: Facet
|
||||
children: HashSet[Facet]
|
||||
outbound: OutboundTable
|
||||
shutdownActions: seq[TurnAction]
|
||||
inertCheckPreventers: int
|
||||
id: FacetId
|
||||
isAlive: bool
|
||||
|
||||
proc pass*(a, b: Cont): Cont =
|
||||
assert not a.turn.isNil
|
||||
b.turn = move a.turn
|
||||
return b
|
||||
|
||||
template turnAction*(prc: typed): untyped =
|
||||
cps(Cont, prc)
|
||||
|
||||
proc activeTurn*(c: Cont): Turn {.cpsVoodoo.} =
|
||||
assert not c.turn.isNil
|
||||
c.turn
|
||||
|
||||
when traceSyndicate:
|
||||
|
||||
proc nextTurnId(facet: Facet): TurnId =
|
||||
result = succ(facet.actor.turnIdAllocator[])
|
||||
facet.actor.turnIdAllocator[] = result
|
||||
|
||||
proc trace(actor: Actor; act: ActorActivation) =
|
||||
assert not actor.traceStream.isNil
|
||||
var entry = TraceEntry(
|
||||
timestamp: getTime().toUnixFloat(),
|
||||
actor: initRecord("named", actor.name.toPreserves),
|
||||
item: act)
|
||||
actor.traceStream.writeLine($entry.toPreserves)
|
||||
|
||||
proc path(facet: Facet): seq[trace.FacetId] =
|
||||
var f = facet
|
||||
while not f.isNil:
|
||||
result.add f.id.toPreserves
|
||||
f = f.parent
|
||||
|
||||
method publish*(e: Entity; turn: Turn; v: AssertionRef; h: Handle) {.base.} = discard
|
||||
method retract*(e: Entity; turn: Turn; h: Handle) {.base.} = discard
|
||||
method message*(e: Entity; turn: Turn; v: AssertionRef) {.base.} = discard
|
||||
method sync*(e: Entity; turn: Turn; peer: Cap) {.base.} = discard
|
||||
|
||||
using
|
||||
actor: Actor
|
||||
facet: Facet
|
||||
turn: Turn
|
||||
action: TurnAction
|
||||
|
||||
proc labels(f: Facet): string =
|
||||
proc catLabels(f: Facet; labels: var string) =
|
||||
labels.add ':'
|
||||
if not f.parent.isNil:
|
||||
catLabels(f.parent, labels)
|
||||
labels.add ':'
|
||||
when traceSyndicate:
|
||||
labels.add $f.id
|
||||
result.add f.actor.name
|
||||
catLabels(f, result)
|
||||
|
||||
proc `$`*(f: Facet): string =
|
||||
"<Facet:" & f.labels & ">"
|
||||
|
||||
proc `$`*(r: Cap): string =
|
||||
"<Ref:" & r.relay.labels & ">"
|
||||
|
||||
proc `$`*(actor: Actor): string =
|
||||
"<Actor:" & actor.name & ">" # TODO: ambigous
|
||||
|
||||
proc attenuate(r: Cap; a: Attenuation): Cap =
|
||||
if a.len == 0: result = r
|
||||
else: result = Cap(
|
||||
relay: r.relay,
|
||||
target: r.target,
|
||||
attenuation: a & r.attenuation)
|
||||
|
||||
proc hash*(facet): Hash =
|
||||
facet.id.hash
|
||||
|
||||
proc hash*(r: Cap): Hash = !$(r.relay.hash !& r.target.unsafeAddr.hash)
|
||||
|
||||
proc nextHandle(facet: Facet): Handle =
|
||||
result = succ(facet.actor.handleAllocator[])
|
||||
facet.actor.handleAllocator[] = result
|
||||
|
||||
proc facet*(turn: Turn): Facet = turn.facet
|
||||
|
||||
proc enqueue(turn: Turn; target: Facet; cont: Cont) =
|
||||
cont.turn = turn
|
||||
if target in turn.queues:
|
||||
turn.queues[target].addLast cont
|
||||
else:
|
||||
turn.queues[target] = toDeque([cont])
|
||||
|
||||
type Bindings = Table[Value, Value]
|
||||
|
||||
proc match(bindings: var Bindings; p: Pattern; v: Value): bool =
|
||||
case p.orKind
|
||||
of PatternKind.Pdiscard: result = true
|
||||
of PatternKind.Patom:
|
||||
result = case p.patom
|
||||
of PAtom.Boolean: v.isBoolean
|
||||
of PAtom.Double: v.isDouble
|
||||
of PAtom.Signedinteger: v.isInteger
|
||||
of PAtom.String: v.isString
|
||||
of PAtom.Bytestring: v.isByteString
|
||||
of PAtom.Symbol: v.isSymbol
|
||||
of PatternKind.Pembedded:
|
||||
result = v.isEmbedded
|
||||
of PatternKind.Pbind:
|
||||
if match(bindings, p.pbind.pattern, v):
|
||||
bindings[p.pbind.pattern.toPreserves] = v
|
||||
result = true
|
||||
of PatternKind.Pand:
|
||||
for pp in p.pand.patterns:
|
||||
result = match(bindings, pp, v)
|
||||
if not result: break
|
||||
of PatternKind.Pnot:
|
||||
var b: Bindings
|
||||
result = not match(b, p.pnot.pattern, v)
|
||||
of PatternKind.Lit:
|
||||
result = p.lit.value == v
|
||||
of PatternKind.PCompound:
|
||||
case p.pcompound.orKind
|
||||
of PCompoundKind.rec:
|
||||
if v.isRecord and
|
||||
p.pcompound.rec.label == v.label and
|
||||
p.pcompound.rec.fields.len == v.arity:
|
||||
result = true
|
||||
for i, pp in p.pcompound.rec.fields:
|
||||
if not match(bindings, pp, v[i]):
|
||||
result = false
|
||||
break
|
||||
of PCompoundKind.arr:
|
||||
if v.isSequence and p.pcompound.arr.items.len == v.sequence.len:
|
||||
result = true
|
||||
for i, pp in p.pcompound.arr.items:
|
||||
if not match(bindings, pp, v[i]):
|
||||
result = false
|
||||
break
|
||||
of PCompoundKind.dict:
|
||||
if v.isDictionary:
|
||||
result = true
|
||||
for key, pp in p.pcompound.dict.entries:
|
||||
let vv = step(v, key)
|
||||
if vv.isNone or not match(bindings, pp, get vv):
|
||||
result = true
|
||||
break
|
||||
|
||||
proc match(p: Pattern; v: Value): Option[Bindings] =
|
||||
var b: Bindings
|
||||
if match(b, p, v):
|
||||
result = some b
|
||||
|
||||
proc instantiate(t: Template; bindings: Bindings): Value =
|
||||
case t.orKind
|
||||
of TemplateKind.Tattenuate:
|
||||
let v = instantiate(t.tattenuate.template, bindings)
|
||||
let cap = v.unembed(Cap)
|
||||
if cap.isNone:
|
||||
raise newException(ValueError, "Attempt to attenuate non-capability")
|
||||
result = attenuate(get cap, t.tattenuate.attenuation).embed
|
||||
of TemplateKind.TRef:
|
||||
let n = $t.tref.binding.int
|
||||
try: result = bindings[n.toPreserves]
|
||||
except KeyError:
|
||||
raise newException(ValueError, "unbound reference: " & n)
|
||||
of TemplateKind.Lit:
|
||||
result = t.lit.value
|
||||
of TemplateKind.Tcompound:
|
||||
case t.tcompound.orKind
|
||||
of TCompoundKind.rec:
|
||||
result = initRecord(t.tcompound.rec.label, t.tcompound.rec.fields.len)
|
||||
for i, tt in t.tcompound.rec.fields:
|
||||
result[i] = instantiate(tt, bindings)
|
||||
of TCompoundKind.arr:
|
||||
result = initSequence(t.tcompound.arr.items.len)
|
||||
for i, tt in t.tcompound.arr.items:
|
||||
result[i] = instantiate(tt, bindings)
|
||||
of TCompoundKind.dict:
|
||||
result = initDictionary()
|
||||
for key, tt in t.tcompound.dict.entries:
|
||||
result[key] = instantiate(tt, bindings)
|
||||
|
||||
proc rewrite(r: Rewrite; v: Value): Value =
|
||||
let bindings = match(r.pattern, v)
|
||||
if bindings.isSome:
|
||||
result = instantiate(r.template, get bindings)
|
||||
|
||||
proc examineAlternatives(cav: Caveat; v: Value): Value =
|
||||
case cav.orKind
|
||||
of CaveatKind.Rewrite:
|
||||
result = rewrite(cav.rewrite, v)
|
||||
of CaveatKind.Alts:
|
||||
for r in cav.alts.alternatives:
|
||||
result = rewrite(r, v)
|
||||
if not result.isFalse: break
|
||||
of CaveatKind.Reject: discard
|
||||
of CaveatKind.unknown: discard
|
||||
|
||||
proc runRewrites*(a: Attenuation; v: Value): Value =
|
||||
result = v
|
||||
for stage in a:
|
||||
result = examineAlternatives(stage, result)
|
||||
if result.isFalse: break
|
||||
|
||||
proc publish(target: Entity; e: OutboundAssertion; a: AssertionRef) {.turnAction.} =
|
||||
e.established = true
|
||||
publish(target, activeTurn(), a, e.handle)
|
||||
|
||||
proc publish(turn: Turn; r: Cap; v: Value; h: Handle) =
|
||||
var a = runRewrites(r.attenuation, v)
|
||||
if not a.isFalse:
|
||||
let e = OutboundAssertion(
|
||||
handle: h, peer: r, established: false)
|
||||
turn.facet.outbound[h] = e
|
||||
enqueue(turn, r.relay, whelp publish(r.target, e, AssertionRef(value: a)))
|
||||
when traceSyndicate:
|
||||
var act = ActionDescription(orKind: ActionDescriptionKind.enqueue)
|
||||
act.enqueue.event.target.actor = turn.facet.actor.id.toPreserves
|
||||
act.enqueue.event.target.facet = turn.facet.id.toPreserves
|
||||
act.enqueue.event.target.oid = r.target.oid.toPreserves
|
||||
act.enqueue.event.detail = trace.TurnEvent(orKind: TurnEventKind.assert)
|
||||
act.enqueue.event.detail.assert.assertion.value.value =
|
||||
mapEmbeds(v) do (cap: Value) -> Value: discard
|
||||
act.enqueue.event.detail.assert.handle = h
|
||||
turn.desc.actions.add act
|
||||
|
||||
proc publish*(turn: Turn; r: Cap; a: Value): Handle =
|
||||
result = turn.facet.nextHandle()
|
||||
publish(turn, r, a, result)
|
||||
|
||||
proc publish*(r: Cap; a: Value): Handle {.turnAction.} =
|
||||
publish(activeTurn(), r, a, result)
|
||||
|
||||
proc retract(e: OutboundAssertion) {.turnAction.} =
|
||||
if e.established:
|
||||
e.established = false
|
||||
e.peer.target.retract(activeTurn(), e.handle)
|
||||
|
||||
proc retract(turn: Turn; e: OutboundAssertion) =
|
||||
enqueue(turn, e.peer.relay, whelp retract(e))
|
||||
|
||||
proc retract*(turn: Turn; h: Handle) =
|
||||
var e: OutboundAssertion
|
||||
if turn.facet.outbound.pop(h, e):
|
||||
turn.retract(e)
|
||||
|
||||
proc message(target: Entity; a: AssertionRef) {.turnAction.} =
|
||||
target.message(activeTurn(), a)
|
||||
|
||||
proc message*(turn: Turn; r: Cap; v: Value) =
|
||||
var a = runRewrites(r.attenuation, v)
|
||||
if not a.isFalse:
|
||||
enqueue(turn, r.relay, whelp message(r.target, AssertionRef(value: a)))
|
||||
|
||||
proc message*(target: Cap; value: Value) {.turnAction.} =
|
||||
message(activeTurn(), target, value)
|
||||
|
||||
proc sync(e: Entity; peer: Cap) {.turnAction.} =
|
||||
e.sync(activeTurn(), peer)
|
||||
|
||||
proc sync(turn: Turn; e: Entity; peer: Cap) =
|
||||
e.sync(turn, peer)
|
||||
|
||||
proc sync*(turn: Turn; r, peer: Cap) =
|
||||
enqueue(turn, r.relay, whelp sync(r.target, peer))
|
||||
|
||||
proc replace*[T](turn: Turn; cap: Cap; h: Handle; v: T): Handle =
|
||||
result = publish(turn, cap, v)
|
||||
if h != default(Handle):
|
||||
retract(turn, h)
|
||||
|
||||
proc replace*[T](turn: Turn; cap: Cap; h: var Handle; v: T): Handle {.discardable.} =
|
||||
var old = h
|
||||
h = publish(turn, cap, v)
|
||||
if old != default(Handle):
|
||||
retract(turn, old)
|
||||
h
|
||||
|
||||
proc stop*(turn: Turn)
|
||||
|
||||
proc run*(facet; action: TurnAction; zombieTurn = false)
|
||||
|
||||
proc newFacet(actor; parent: Facet; initialAssertions: OutboundTable): Facet =
|
||||
result = Facet(
|
||||
id: getMonoTime().ticks.FacetId,
|
||||
actor: actor,
|
||||
parent: parent,
|
||||
outbound: initialAssertions,
|
||||
isAlive: true)
|
||||
if not parent.isNil: parent.children.incl result
|
||||
|
||||
proc newFacet(actor; parent: Facet): Facet =
|
||||
var initialAssertions: OutboundTable
|
||||
newFacet(actor, parent, initialAssertions)
|
||||
|
||||
proc isInert(facet): bool =
|
||||
result = facet.children.len == 0 and
|
||||
(facet.outbound.len == 0 or facet.parent.isNil) and
|
||||
facet.inertCheckPreventers == 0
|
||||
|
||||
proc preventInertCheck*(facet): (proc() {.gcsafe.}) {.discardable.} =
|
||||
var armed = true
|
||||
inc facet.inertCheckPreventers
|
||||
proc disarm() =
|
||||
if armed:
|
||||
armed = false
|
||||
dec facet.inertCheckPreventers
|
||||
result = disarm
|
||||
|
||||
proc inFacet(turn: Turn; facet; act: TurnAction) =
|
||||
## Call an action with a facet using a temporary `Turn`
|
||||
## that shares the `Queues` of the calling `Turn`.
|
||||
var t = Turn(facet: facet, queues: turn.queues)
|
||||
act(t)
|
||||
|
||||
proc terminate(actor; turn; reason: ref Exception)
|
||||
|
||||
proc terminate(facet; turn: Turn; orderly: bool) =
|
||||
if facet.isAlive:
|
||||
facet.isAlive = false
|
||||
let parent = facet.parent
|
||||
if not parent.isNil:
|
||||
parent.children.excl facet
|
||||
block:
|
||||
var turn = Turn(facet: facet, queues: turn.queues)
|
||||
while facet.children.len > 0:
|
||||
facet.children.pop.terminate(turn, orderly)
|
||||
if orderly:
|
||||
for act in facet.shutdownActions:
|
||||
act(turn)
|
||||
for a in facet.outbound.values: turn.retract(a)
|
||||
if orderly:
|
||||
if not parent.isNil:
|
||||
if parent.isInert:
|
||||
parent.terminate(turn, true)
|
||||
else:
|
||||
terminate(facet.actor, turn, nil)
|
||||
when traceSyndicate:
|
||||
var act = ActionDescription(orKind: ActionDescriptionKind.facetStop)
|
||||
act.facetstop.path = facet.path
|
||||
turn.desc.actions.add act
|
||||
|
||||
proc stopIfInert() {.turnAction.} =
|
||||
let turn = activeTurn()
|
||||
if (not turn.facet.parent.isNil and (not turn.facet.parent.isAlive)) or turn.facet.isInert:
|
||||
stop(turn)
|
||||
|
||||
proc stopIfInertAfter(action: TurnAction): TurnAction =
|
||||
proc wrapper(turn: Turn) =
|
||||
action(turn)
|
||||
enqueue(turn, turn.facet, whelp stopIfInert())
|
||||
wrapper
|
||||
|
||||
proc newFacet*(turn: Turn): Facet = newFacet(turn.facet.actor, turn.facet)
|
||||
|
||||
proc inFacet*(turn: Turn; bootProc: TurnAction): Facet =
|
||||
result = newFacet(turn)
|
||||
when traceSyndicate:
|
||||
var act = ActionDescription(orKind: ActionDescriptionKind.facetstart)
|
||||
act.facetstart.path.add result.path
|
||||
turn.desc.actions.add act
|
||||
inFacet(turn, result, stopIfInertAfter(bootProc))
|
||||
|
||||
proc facet*(turn: Turn; bootProc: TurnAction): Facet {.deprecated.} = inFacet(turn, bootProc)
|
||||
|
||||
proc run(actor; bootProc: TurnAction; initialAssertions: OutboundTable) =
|
||||
run(newFacet(actor, actor.root, initialAssertions), stopIfInertAfter(bootProc))
|
||||
|
||||
proc run(actor; bootProc: TurnAction) =
|
||||
var initialAssertions: OutboundTable
|
||||
run(newFacet(actor, actor.root, initialAssertions), stopIfInertAfter(bootProc))
|
||||
|
||||
proc newActor(name: string; handleAlloc: ref Handle): Actor =
|
||||
let
|
||||
now = getTime()
|
||||
seed = now.toUnix * 1_000_000_000 + now.nanosecond
|
||||
result = Actor(
|
||||
name: name,
|
||||
id: ActorId(seed),
|
||||
handleAllocator: handleAlloc,
|
||||
)
|
||||
result.root = newFacet(result, nil)
|
||||
when traceSyndicate:
|
||||
new result.turnIdAllocator
|
||||
|
||||
proc newActor*(name: string): Actor =
|
||||
newActor(name, new(ref Handle))
|
||||
|
||||
proc bootActor*(name: string; bootProc: TurnAction) =
|
||||
var
|
||||
initialAssertions: OutboundTable
|
||||
actor = newActor(name)
|
||||
when traceSyndicate:
|
||||
let path = getEnv("SYNDICATE_TRACE_FILE", "/tmp/" & name & ".trace.pr")
|
||||
case path
|
||||
of "": stderr.writeLine "$SYNDICATE_TRACE_FILE unset, not tracing actor ", name
|
||||
of "-": actor.traceStream = newFileStream(stderr)
|
||||
else: actor.traceStream = openFileStream(path, fmWrite)
|
||||
when traceSyndicate:
|
||||
var act = ActorActivation(orKind: ActorActivationKind.start)
|
||||
act.start.actorName = Name(orKind: NameKind.named)
|
||||
act.start.actorName.named.name = name.toPreserves
|
||||
var entry = TraceEntry(
|
||||
timestamp: getTime().toUnixFloat(),
|
||||
item: act)
|
||||
actor.traceStream.writeLine($entry.toPreserves)
|
||||
let turn = newTurn(actor, TurnCauseExternal(description: "top-level actor"))
|
||||
run(actor, bootProc, initialAssertions)
|
||||
|
||||
proc bootActor*(name: string; cont: Cont) =
|
||||
bootActor(name) do (turn: Turn):
|
||||
enqueue(turn, turn.facet, cont)
|
||||
|
||||
proc spawnActor(actor: Actor; bootProc: TurnAction; initialAssertions: HashSet[Handle]) {.turnAction.} =
|
||||
let turn = activeTurn()
|
||||
var newOutBound: Table[Handle, OutboundAssertion]
|
||||
for key in initialAssertions:
|
||||
discard turn.facet.outbound.pop(key, newOutbound[key])
|
||||
when traceSyndicate:
|
||||
actor.turnIdAllocator = turn.facet.actor.turnIdAllocator
|
||||
actor.traceStream = turn.facet.actor.traceStream
|
||||
var act = ActionDescription(orKind: ActionDescriptionKind.spawn)
|
||||
act.spawn.id = actor.id.toPreserves
|
||||
turn.desc.actions.add act
|
||||
run(actor, bootProc, newOutBound)
|
||||
|
||||
proc spawn*(name: string; turn: Turn; bootProc: TurnAction; initialAssertions = initHashSet[Handle]()): Actor {.discardable.} =
|
||||
let actor = newActor(name, turn.facet.actor.handleAllocator)
|
||||
enqueue(turn, turn.facet, whelp spawnActor(actor, bootProc, initialAssertions))
|
||||
actor
|
||||
|
||||
proc newInertCap*(): Cap =
|
||||
let a = newActor("inert")
|
||||
run(a) do (turn: Turn): turn.stop()
|
||||
Cap(relay: a.root)
|
||||
|
||||
proc atExit*(actor; action) = actor.exitHooks.add action
|
||||
|
||||
proc terminate(actor: Actor; orderly: bool) {.turnAction.} =
|
||||
actor.root.terminate(activeTurn(), orderly)
|
||||
actor.exited = true
|
||||
|
||||
proc terminate(actor; turn; reason: ref Exception) =
|
||||
if not actor.exiting:
|
||||
actor.exiting = true
|
||||
actor.exitReason = reason
|
||||
when traceSyndicate:
|
||||
var act = ActorActivation(orKind: ActorActivationKind.stop)
|
||||
if not reason.isNil:
|
||||
act.stop.status = ExitStatus(orKind: ExitStatusKind.Error)
|
||||
act.stop.status.error.message = reason.msg
|
||||
trace(actor, act)
|
||||
for hook in actor.exitHooks: hook(turn)
|
||||
enqueue(turn, actor.root, whelp terminate(actor, reason.isNil))
|
||||
|
||||
proc terminate*(facet; e: ref Exception) =
|
||||
run(facet.actor.root) do (turn: Turn):
|
||||
facet.actor.terminate(turn, e)
|
||||
|
||||
#[
|
||||
proc asyncCheck*(facet: Facet; fut: FutureBase) =
|
||||
## Sets a callback on `fut` which propagates exceptions to `facet`.
|
||||
addCallback(fut) do ():
|
||||
if fut.failed: terminate(facet, fut.error)
|
||||
|
||||
proc asyncCheck*(turn; fut: FutureBase) =
|
||||
## Sets a callback on `fut` which propagates exceptions to the facet of `turn`.
|
||||
asyncCheck(turn.facet, fut)
|
||||
]#
|
||||
|
||||
template tryFacet(facet; body: untyped) =
|
||||
try: body
|
||||
except CatchableError as err: terminate(facet, err)
|
||||
|
||||
proc run(facet: Facet; turn: Turn; deq: var Deque[Cont]): int =
|
||||
## Return the number of continuations processed.
|
||||
while deq.len > 0:
|
||||
var c = deq.popFirst()
|
||||
try:
|
||||
while not c.isNil and not c.fn.isNil:
|
||||
c.turn = turn
|
||||
var y = c.fn
|
||||
var x = y(c)
|
||||
inc(result)
|
||||
c = Cont(x)
|
||||
except CatchableError as err:
|
||||
if not c.dismissed:
|
||||
writeStackFrames c
|
||||
terminate(facet, err)
|
||||
stderr.writeLine("ran ", result, " continuations for ", facet)
|
||||
|
||||
proc run*(facet; action: TurnAction; zombieTurn = false) =
|
||||
if zombieTurn or (facet.actor.exitReason.isNil and facet.isAlive):
|
||||
tryFacet(facet):
|
||||
var queues = newTable[Facet, Deque[Cont]]()
|
||||
var turn = Turn(facet: facet, queues: queues)
|
||||
action(turn)
|
||||
when traceSyndicate:
|
||||
turn.desc.id = facet.nextTurnId.toPreserves
|
||||
facet.actor.trace ActorActivation(
|
||||
orKind: ActorActivationKind.turn, turn: turn.desc)
|
||||
assert not turn.isNil
|
||||
var n = 1
|
||||
while n > 0:
|
||||
n = 0
|
||||
var facets = queues.keys.toSeq
|
||||
for facet in facets:
|
||||
n.inc run(facet, turn, queues[facet])
|
||||
|
||||
proc run*(cap: Cap; action: TurnAction) =
|
||||
## Convenience proc to run a `TurnAction` in the scope of a `Cap`.
|
||||
run(cap.relay, action)
|
||||
|
||||
#[
|
||||
proc addCallback*(fut: FutureBase; facet: Facet; act: TurnAction) =
|
||||
## Add a callback to a `Future` that will be called at a later `Turn`
|
||||
## within the context of `facet`.
|
||||
addCallback(fut) do ():
|
||||
if fut.failed: terminate(facet, fut.error)
|
||||
else:
|
||||
when traceSyndicate:
|
||||
run(facet) do (turn: Turn):
|
||||
turn.desc.cause = TurnCause(orKind: TurnCauseKind.external)
|
||||
turn.desc.cause.external.description = "Future".toPreserves
|
||||
act(turn)
|
||||
else:
|
||||
run(facet, act)
|
||||
|
||||
proc addCallback*(fut: FutureBase; turn: Turn; act: TurnAction) =
|
||||
## Add a callback to a `Future` that will be called at a later `Turn`
|
||||
## with the same context as the current.
|
||||
if fut.failed:
|
||||
terminate(turn.facet, fut.error)
|
||||
elif fut.finished:
|
||||
enqueue(turn, turn.facet, act)
|
||||
else:
|
||||
addCallback(fut, turn.facet, act)
|
||||
|
||||
proc addCallback*[T](fut: Future[T]; turn: Turn; act: proc (t: Turn, x: T) {.gcsafe.}) =
|
||||
addCallback(fut, turn) do (turn: Turn):
|
||||
if fut.failed: terminate(turn.facet, fut.error)
|
||||
else:
|
||||
when traceSyndicate:
|
||||
turn.desc.cause = TurnCause(orKind: TurnCauseKind.external)
|
||||
turn.desc.cause.external.description = "Future".toPreserves
|
||||
act(turn, read fut)
|
||||
]#
|
||||
|
||||
proc stop*(turn: Turn, facet: Facet) =
|
||||
if facet.parent.isNil:
|
||||
facet.terminate(turn, true)
|
||||
else:
|
||||
enqueue(turn, facet.parent, whelp terminate(facet.actor, true))
|
||||
# TODO: terminate the actor?
|
||||
|
||||
proc stop*(turn: Turn) =
|
||||
stop(turn, turn.facet)
|
||||
|
||||
proc onStop*(facet: Facet; act: TurnAction) =
|
||||
## Add a `proc (turn: Turn)` action to `facet` to be called as it stops.
|
||||
add(facet.shutdownActions, act)
|
||||
|
||||
proc stopActor*(turn: Turn) =
|
||||
let actor = turn.facet.actor
|
||||
enqueue(turn, actor.root, whelp terminate(actor, true))
|
||||
|
||||
proc freshen*(turn: Turn, act: TurnAction) =
|
||||
assert(turn.queues.len == 0, "Attempt to freshen a non-stale Turn")
|
||||
run(turn.facet, act)
|
||||
|
||||
proc newCap*(relay: Facet; e: Entity): Cap =
|
||||
Cap(relay: relay, target: e)
|
||||
|
||||
proc newCap*(turn; e: Entity): Cap =
|
||||
Cap(relay: turn.facet, target: e)
|
||||
|
||||
proc newCap*(e: Entity; turn): Cap =
|
||||
Cap(relay: turn.facet, target: e)
|
||||
|
||||
type SyncContinuation {.final.} = ref object of Entity
|
||||
action: TurnAction
|
||||
|
||||
method message(entity: SyncContinuation; turn: Turn; v: AssertionRef) =
|
||||
entity.action(turn)
|
||||
|
||||
proc sync*(turn: Turn; refer: Cap; act: TurnAction) =
|
||||
sync(turn, refer, newCap(turn, SyncContinuation(action: act)))
|
||||
|
||||
proc running*(actor): bool =
|
||||
result = not actor.exited
|
||||
if not (result or actor.exitReason.isNil):
|
||||
raise actor.exitReason
|
||||
|
||||
proc newCap*(e: Entity): Cap {.turnAction.} =
|
||||
Cap(relay: activeTurn().facet, target: e)
|
|
@ -0,0 +1,552 @@
|
|||
# SPDX-FileCopyrightText: ☭ Emery Hemingway
|
||||
# SPDX-License-Identifier: Unlicense
|
||||
|
||||
import std/[deques, hashes, options, tables, times]
|
||||
import pkg/cps
|
||||
import preserves
|
||||
import ./protocols/[protocol, sturdy]
|
||||
|
||||
# const traceSyndicate {.booldefine.}: bool = true
|
||||
const traceSyndicate* = true
|
||||
|
||||
when traceSyndicate:
|
||||
import std/streams
|
||||
from std/os import getEnv
|
||||
import ./protocols/trace
|
||||
|
||||
export protocol.Handle
|
||||
|
||||
type
|
||||
Cont* = ref object of Continuation
|
||||
facet: Facet
|
||||
|
||||
PublishProc* = proc (e: Entity; v: Value; h: Handle) {.cps: Cont.}
|
||||
RetractProc* = proc (e: Entity; h: Handle) {.cps: Cont.}
|
||||
MessageProc* = proc (e: Entity; v: Value) {.cps: Cont.}
|
||||
SyncProc* = proc (e: Entity; peer: Cap) {.cps: Cont.}
|
||||
|
||||
Handler* = proc() {.closure.}
|
||||
|
||||
Work = Deque[Cont]
|
||||
HandlerDeque = seq[ContinuationProc[Continuation]]
|
||||
|
||||
FacetState = enum fFresh, fRunning, fEnded
|
||||
|
||||
Callback* = proc () {.closure.}
|
||||
|
||||
OutboundTable = Table[Handle, OutboundAssertion]
|
||||
OutboundAssertion = ref object
|
||||
handle: Handle
|
||||
peer: Cap
|
||||
established: bool
|
||||
|
||||
Facet* = ref object
|
||||
## https://synit.org/book/glossary.html#facet
|
||||
actor: Actor
|
||||
parent: Facet
|
||||
children: seq[Facet]
|
||||
outbound: OutboundTable
|
||||
stopHandlers: HandlerDeque
|
||||
stopCallbacks: seq[Callback]
|
||||
state: FacetState
|
||||
id: FacetId
|
||||
|
||||
FacetProc* = proc (f: Facet) {.closure.}
|
||||
## Type for callbacks to be called within a turn.
|
||||
## The `Facet` parameter is the owning facet.
|
||||
|
||||
Turn {.byref.} = object
|
||||
## https://synit.org/book/glossary.html#turn
|
||||
facet: Facet
|
||||
entity: Entity
|
||||
work: Work
|
||||
actions: seq[Cont]
|
||||
event: Option[protocol.Event]
|
||||
when traceSyndicate:
|
||||
desc: TurnDescription
|
||||
|
||||
Entity* = ref object of RootObj
|
||||
## https://synit.org/book/glossary.html#entity
|
||||
oid*: sturdy.Oid # oid is how Entities are identified over the wire
|
||||
publishImpl*: PublishProc
|
||||
retractImpl*: RetractProc
|
||||
messageImpl*: MessageProc
|
||||
syncImpl*: SyncProc
|
||||
|
||||
Cap* {.final, preservesEmbedded.} = ref object of EmbeddedObj
|
||||
relay*: Facet
|
||||
target*: Entity
|
||||
attenuation*: seq[sturdy.Caveat]
|
||||
|
||||
Actor* = ref object
|
||||
## https://synit.org/book/glossary.html#actor
|
||||
# TODO: run on a seperate thread.
|
||||
# crashHandlers: HandlerDeque
|
||||
root: Facet
|
||||
handleAllocator: Handle
|
||||
facetIdAllocator: int
|
||||
id: ActorId
|
||||
turn: Turn
|
||||
when traceSyndicate:
|
||||
traceStream: FileStream
|
||||
stopped: bool
|
||||
|
||||
template turnWork*(prc: typed): untyped =
|
||||
cps(Cont, prc)
|
||||
|
||||
proc activeFacet*(c: Cont): Facet {.cpsVoodoo.} =
|
||||
## Return the active `Facet` within a `{.syndicate.}` context.
|
||||
assert not c.facet.isNil
|
||||
c.facet
|
||||
|
||||
using
|
||||
actor: Actor
|
||||
facet: Facet
|
||||
entity: Entity
|
||||
cap: Cap
|
||||
turn: Turn
|
||||
|
||||
proc `$`*(facet): string = $facet.id
|
||||
proc `$`*(cap): string = "#:…"
|
||||
|
||||
proc hash*(facet): Hash = facet.unsafeAddr.hash
|
||||
proc hash*(cap): Hash = cap.unsafeAddr.hash
|
||||
|
||||
proc newFacet(actor: Actor; parent: Facet): Facet =
|
||||
inc(actor.facetIdAllocator)
|
||||
result = Facet(
|
||||
actor: actor,
|
||||
parent: parent,
|
||||
id: actor.facetIdAllocator.toPreserves,
|
||||
)
|
||||
|
||||
proc stopped*(facet): bool = facet.state != fRunning
|
||||
|
||||
proc newActor(name: string): Actor =
|
||||
result = Actor(id: name.toPreserves)
|
||||
result.root = newFacet(result, nil)
|
||||
when traceSyndicate:
|
||||
let path = getEnv("SYNDICATE_TRACE_FILE", "")
|
||||
case path
|
||||
of "": discard
|
||||
of "-": result.traceStream = newFileStream(stderr)
|
||||
else: result.traceStream = openFileStream(path, fmWrite)
|
||||
|
||||
when traceSyndicate:
|
||||
proc trace(actor; act: ActorActivation) =
|
||||
if not actor.traceStream.isNil:
|
||||
var entry = TraceEntry(
|
||||
timestamp: getTime().toUnixFloat(),
|
||||
actor: actor.id,
|
||||
item: act,
|
||||
)
|
||||
actor.traceStream.writeLine($entry.toPreserves)
|
||||
|
||||
proc traceTurn(actor) =
|
||||
if not actor.traceStream.isNil:
|
||||
actor.trace(ActorActivation(
|
||||
orKind: ActorActivationKind.turn,
|
||||
turn: actor.turn.desc,
|
||||
))
|
||||
|
||||
proc traceTarget(facet): trace.Target =
|
||||
Target(
|
||||
actor: facet.actor.id,
|
||||
facet: facet.id,
|
||||
)
|
||||
|
||||
proc traceTarget(cap): trace.Target =
|
||||
let facet = cap.relay
|
||||
Target(
|
||||
actor: facet.actor.id,
|
||||
facet: facet.id,
|
||||
oid: cap.target.oid.toPreserves,
|
||||
)
|
||||
|
||||
proc traceEnqueue(actor; e: TargetedTurnEvent) =
|
||||
actor.turn.desc.actions.add ActionDescription(
|
||||
orKind: ActionDescriptionKind.enqueue,
|
||||
enqueue: ActionDescriptionEnqueue(event: e),
|
||||
)
|
||||
|
||||
proc traceDequeue(actor; e: TargetedTurnEvent) =
|
||||
actor.turn.desc.actions.add ActionDescription(
|
||||
orKind: ActionDescriptionKind.dequeue,
|
||||
dequeue: ActionDescriptionDequeue(event: e),
|
||||
)
|
||||
|
||||
proc pass*(a, b: Cont): Cont =
|
||||
assert not a.facet.isNil
|
||||
b.facet = a.facet
|
||||
b
|
||||
|
||||
proc queueWork(facet; c: Cont) =
|
||||
c.facet = facet
|
||||
facet.actor.turn.work.addLast(c)
|
||||
|
||||
proc yieldWork(c: Cont): Cont {.cpsMagic.} =
|
||||
## Suspend and enqueue the caller until later in the turn.
|
||||
assert not c.facet.isNil
|
||||
c.facet.queueWork(c)
|
||||
nil
|
||||
|
||||
proc yieldToActions(c: Cont): Cont {.cpsMagic.} =
|
||||
assert not c.facet.isNil
|
||||
c.facet.actor.turn.actions.add(c)
|
||||
nil
|
||||
|
||||
proc startExternalTurn(facet) =
|
||||
let actor = facet.actor
|
||||
assert actor.turn.work.len == 0
|
||||
assert actor.turn.actions.len == 0
|
||||
actor.turn.facet = facet
|
||||
when traceSyndicate:
|
||||
actor.turn.desc = TurnDescription(cause: TurnCause(orKind: TurnCauseKind.external))
|
||||
|
||||
proc terminate(actor; err: ref Exception) =
|
||||
raise err
|
||||
|
||||
proc terminate(facet; err: ref Exception) =
|
||||
terminate(facet.actor, err)
|
||||
|
||||
proc complete(c: Cont) =
|
||||
var c = c
|
||||
try:
|
||||
while not c.isNil and not c.fn.isNil:
|
||||
var y = c.fn
|
||||
var x = y(c)
|
||||
c = Cont(x)
|
||||
except CatchableError as err:
|
||||
if not c.dismissed:
|
||||
writeStackFrames c
|
||||
terminate(c.facet, err)
|
||||
|
||||
proc run(actor) =
|
||||
assert not actor.stopped
|
||||
var n = 0
|
||||
while actor.turn.work.len > 0:
|
||||
actor.turn.work.popFirst().complete()
|
||||
inc n
|
||||
echo n, " items completed from work queue"
|
||||
when traceSyndicate:
|
||||
actor.traceTurn()
|
||||
var i: int
|
||||
while i < actor.turn.actions.len:
|
||||
complete(move actor.turn.actions[i])
|
||||
inc i
|
||||
echo i, " items completed from action queue"
|
||||
turn.actions.setLen(0)
|
||||
when traceSyndicate:
|
||||
if actor.stopped:
|
||||
trace(actor, ActorActivation(orkind: ActorActivationKind.stop))
|
||||
|
||||
proc start(actor; cont: Cont) =
|
||||
when traceSyndicate:
|
||||
var act = ActorActivation(orkind: ActorActivationKind.start)
|
||||
trace(actor, act)
|
||||
actor.root.state = fRunning
|
||||
actor.root.startExternalTurn()
|
||||
actor.root.queueWork(cont)
|
||||
run(actor)
|
||||
|
||||
proc stop(actor)
|
||||
|
||||
proc collectPath(result: var seq[FacetId]; facet) =
|
||||
if not facet.parent.isNil:
|
||||
collectPath(result, facet.parent)
|
||||
result.add(facet.id)
|
||||
|
||||
proc runNextStop(c: Cont; facet: Facet): Cont {.cpsMagic.} =
|
||||
c.fn = facet.stopHandlers.pop()
|
||||
result = c
|
||||
|
||||
proc runNextFacetStop() {.cps: Cont.} =
|
||||
activeFacet().runNextStop()
|
||||
|
||||
proc stop(facet; reason: FacetStopReason) =
|
||||
let actor = facet.actor
|
||||
while facet.stopHandlers.len > 0:
|
||||
var c = whelp runNextFacetStop()
|
||||
c.facet = facet
|
||||
complete(c)
|
||||
while facet.stopCallbacks.len > 0:
|
||||
var cb = facet.stopCallbacks.pop()
|
||||
cb()
|
||||
while facet.children.len > 0:
|
||||
stop(facet.children.pop(), FacetStopReason.parentStopping)
|
||||
when traceSyndicate:
|
||||
var act = ActionDescription(orKind: ActionDescriptionKind.facetstop)
|
||||
collectPath(act.facetstop.path, facet)
|
||||
act.facetStop.reason = reason
|
||||
actor.turn.desc.actions.add act
|
||||
if facet.parent.isNil:
|
||||
actor.root = nil
|
||||
stop(actor)
|
||||
|
||||
proc stop(actor) =
|
||||
if not actor.root.isNil:
|
||||
stop(actor.root, FacetStopReason.actorStopping)
|
||||
actor.stopped = true
|
||||
|
||||
proc bootActor(name: string, c: Cont) =
|
||||
start(newActor(name), c)
|
||||
|
||||
# proc stopFacetAction(reason: FacetStopReason) {.syndicate.} =
|
||||
# stop(c.facet, reason)
|
||||
|
||||
proc stopActorAction() {.cps: Cont.} =
|
||||
activeFacet().actor.stop()
|
||||
|
||||
proc stopActor*(facet) =
|
||||
let c = whelp stopActorAction()
|
||||
c.facet = facet
|
||||
facet.actor.turn.actions.add(c)
|
||||
|
||||
type
|
||||
AssertionRef* = ref object
|
||||
value*: Value
|
||||
# if the Enity methods take a Value object then the generated
|
||||
# C code has "redefinition of struct" problems when orc is enabled
|
||||
|
||||
proc newCap*(f: Facet; e: Entity): Cap =
|
||||
Cap(relay: f, target: e)
|
||||
|
||||
proc nextHandle(facet: Facet): Handle =
|
||||
inc(facet.actor.handleAllocator)
|
||||
facet.actor.handleAllocator
|
||||
|
||||
proc actor(cap): Actor = cap.relay.actor
|
||||
|
||||
type Bindings = Table[Value, Value]
|
||||
|
||||
proc attenuate(r: Cap; a: seq[Caveat]): Cap =
|
||||
if a.len == 0: result = r
|
||||
else: result = Cap(
|
||||
relay: r.relay,
|
||||
target: r.target,
|
||||
attenuation: a & r.attenuation)
|
||||
|
||||
proc match(bindings: var Bindings; p: Pattern; v: Value): bool =
|
||||
case p.orKind
|
||||
of PatternKind.Pdiscard: result = true
|
||||
of PatternKind.Patom:
|
||||
result = case p.patom
|
||||
of PAtom.Boolean: v.isBoolean
|
||||
of PAtom.Double: v.isDouble
|
||||
of PAtom.Signedinteger: v.isInteger
|
||||
of PAtom.String: v.isString
|
||||
of PAtom.Bytestring: v.isByteString
|
||||
of PAtom.Symbol: v.isSymbol
|
||||
of PatternKind.Pembedded:
|
||||
result = v.isEmbedded
|
||||
of PatternKind.Pbind:
|
||||
if match(bindings, p.pbind.pattern, v):
|
||||
bindings[p.pbind.pattern.toPreserves] = v
|
||||
result = true
|
||||
of PatternKind.Pand:
|
||||
for pp in p.pand.patterns:
|
||||
result = match(bindings, pp, v)
|
||||
if not result: break
|
||||
of PatternKind.Pnot:
|
||||
var b: Bindings
|
||||
result = not match(b, p.pnot.pattern, v)
|
||||
of PatternKind.Lit:
|
||||
result = p.lit.value == v
|
||||
of PatternKind.PCompound:
|
||||
case p.pcompound.orKind
|
||||
of PCompoundKind.rec:
|
||||
if v.isRecord and
|
||||
p.pcompound.rec.label == v.label and
|
||||
p.pcompound.rec.fields.len == v.arity:
|
||||
result = true
|
||||
for i, pp in p.pcompound.rec.fields:
|
||||
if not match(bindings, pp, v[i]):
|
||||
result = false
|
||||
break
|
||||
of PCompoundKind.arr:
|
||||
if v.isSequence and p.pcompound.arr.items.len == v.sequence.len:
|
||||
result = true
|
||||
for i, pp in p.pcompound.arr.items:
|
||||
if not match(bindings, pp, v[i]):
|
||||
result = false
|
||||
break
|
||||
of PCompoundKind.dict:
|
||||
if v.isDictionary:
|
||||
result = true
|
||||
for key, pp in p.pcompound.dict.entries:
|
||||
let vv = step(v, key)
|
||||
if vv.isNone or not match(bindings, pp, get vv):
|
||||
result = true
|
||||
break
|
||||
|
||||
proc match(p: Pattern; v: Value): Option[Bindings] =
|
||||
var b: Bindings
|
||||
if match(b, p, v):
|
||||
result = some b
|
||||
|
||||
proc instantiate(t: Template; bindings: Bindings): Value =
|
||||
case t.orKind
|
||||
of TemplateKind.Tattenuate:
|
||||
let v = instantiate(t.tattenuate.template, bindings)
|
||||
let cap = v.unembed(Cap)
|
||||
if cap.isNone:
|
||||
raise newException(ValueError, "Attempt to attenuate non-capability")
|
||||
result = attenuate(get cap, t.tattenuate.attenuation).embed
|
||||
of TemplateKind.TRef:
|
||||
let n = $t.tref.binding.int
|
||||
try: result = bindings[n.toPreserves]
|
||||
except KeyError:
|
||||
raise newException(ValueError, "unbound reference: " & n)
|
||||
of TemplateKind.Lit:
|
||||
result = t.lit.value
|
||||
of TemplateKind.Tcompound:
|
||||
case t.tcompound.orKind
|
||||
of TCompoundKind.rec:
|
||||
result = initRecord(t.tcompound.rec.label, t.tcompound.rec.fields.len)
|
||||
for i, tt in t.tcompound.rec.fields:
|
||||
result[i] = instantiate(tt, bindings)
|
||||
of TCompoundKind.arr:
|
||||
result = initSequence(t.tcompound.arr.items.len)
|
||||
for i, tt in t.tcompound.arr.items:
|
||||
result[i] = instantiate(tt, bindings)
|
||||
of TCompoundKind.dict:
|
||||
result = initDictionary()
|
||||
for key, tt in t.tcompound.dict.entries:
|
||||
result[key] = instantiate(tt, bindings)
|
||||
|
||||
proc rewrite(r: Rewrite; v: Value): Value =
|
||||
let bindings = match(r.pattern, v)
|
||||
if bindings.isSome:
|
||||
result = instantiate(r.template, get bindings)
|
||||
|
||||
proc examineAlternatives(cav: Caveat; v: Value): Value =
|
||||
case cav.orKind
|
||||
of CaveatKind.Rewrite:
|
||||
result = rewrite(cav.rewrite, v)
|
||||
of CaveatKind.Alts:
|
||||
for r in cav.alts.alternatives:
|
||||
result = rewrite(r, v)
|
||||
if not result.isFalse: break
|
||||
of CaveatKind.Reject: discard
|
||||
of CaveatKind.unknown: discard
|
||||
|
||||
proc runRewrites(v: Value; a: openarray[Caveat]): Value =
|
||||
result = v
|
||||
for stage in a:
|
||||
result = examineAlternatives(stage, result)
|
||||
if result.isFalse: break
|
||||
|
||||
proc publish(c: Cont; e: Entity; v: Value; h: Handle): Cont {.cpsMagic.} =
|
||||
pass c, e.publishImpl.call(e, v, h)
|
||||
|
||||
proc retract(c: Cont; e: Entity; h: Handle): Cont {.cpsMagic.} =
|
||||
pass c, e.retractImpl.call(e, h)
|
||||
|
||||
proc message(c: Cont; e: Entity; v: Value): Cont {.cpsMagic.} =
|
||||
pass c, e.messageImpl.call(e, v)
|
||||
|
||||
proc sync(c: Cont; e: Entity; p: Cap): Cont {.cpsMagic.} =
|
||||
pass c, e.syncImpl.call(e, p)
|
||||
|
||||
proc turnPublish(cap: Cap; val: Value; h: Handle) {.turnWork.} =
|
||||
when traceSyndicate:
|
||||
var traceEvent = TargetedTurnEvent(
|
||||
target: cap.traceTarget,
|
||||
detail: trace.TurnEvent(orKind: trace.TurnEventKind.assert)
|
||||
)
|
||||
traceEvent.detail = trace.TurnEvent(orKind: TurnEventKind.assert)
|
||||
traceEvent.detail.assert = TurnEventAssert(
|
||||
assertion: AssertionDescription(orKind: AssertionDescriptionKind.value),
|
||||
handle: h,
|
||||
)
|
||||
traceEvent.detail.assert.assertion.value.value = val
|
||||
cap.actor.traceEnqueue(traceEvent)
|
||||
cap.relay.outbound[h] = OutboundAssertion(handle: h, peer: cap)
|
||||
yieldToActions()
|
||||
when traceSyndicate:
|
||||
cap.actor.traceDequeue(traceEvent)
|
||||
cap.target.publish(val, h)
|
||||
cap.relay.outbound[h].established = true
|
||||
|
||||
proc turnRetract(cap: Cap; h: Handle) {.turnWork.} =
|
||||
when traceSyndicate:
|
||||
var traceEvent = TargetedTurnEvent(
|
||||
target: cap.traceTarget,
|
||||
detail: trace.TurnEvent(orKind: trace.TurnEventKind.retract)
|
||||
)
|
||||
traceEvent.detail.retract.handle = h
|
||||
cap.actor.traceEnqueue(traceEvent)
|
||||
yieldToActions()
|
||||
when traceSyndicate:
|
||||
cap.actor.traceDequeue(traceEvent)
|
||||
cap.target.retract(h)
|
||||
|
||||
proc turnMessage(cap: Cap; val: Value) {.turnWork.} =
|
||||
var val = runRewrites(val, cap.attenuation)
|
||||
when traceSyndicate:
|
||||
var traceEvent = TargetedTurnEvent(
|
||||
target: cap.traceTarget,
|
||||
detail: trace.TurnEvent(orKind: trace.TurnEventKind.message)
|
||||
)
|
||||
traceEvent.detail.message.body.value.value = val
|
||||
cap.actor.traceEnqueue(traceEvent)
|
||||
yieldToActions()
|
||||
when traceSyndicate:
|
||||
cap.actor.traceDequeue(traceEvent)
|
||||
cap.target.message(val)
|
||||
|
||||
proc turnSync(cap: Cap; peer: Cap) {.turnWork.} =
|
||||
when traceSyndicate:
|
||||
var traceEvent = TargetedTurnEvent(
|
||||
target: cap.traceTarget,
|
||||
detail: trace.TurnEvent(orKind: trace.TurnEventKind.sync)
|
||||
)
|
||||
traceEvent.detail.sync.peer = peer.traceTarget
|
||||
cap.actor.traceEnqueue(traceEvent)
|
||||
yieldToActions()
|
||||
when traceSyndicate:
|
||||
cap.actor.traceDequeue(traceEvent)
|
||||
cap.target.sync(peer)
|
||||
|
||||
proc publish*(cap; val: Value): Handle =
|
||||
var val = runRewrites(val, cap.attenuation)
|
||||
# TODO: attenuation to nothing?
|
||||
result = cap.relay.nextHandle()
|
||||
cap.relay.queueWork(whelp turnPublish(cap, val, result))
|
||||
|
||||
proc publish*[T](cap; x: T): Handle =
|
||||
publish(cap, x.toPreserves)
|
||||
|
||||
proc retract*(cap; h: Handle) =
|
||||
cap.relay.queueWork(whelp turnRetract(cap, h))
|
||||
|
||||
proc message*(cap; val: Value) =
|
||||
var val = runRewrites(val, cap.attenuation)
|
||||
cap.relay.queueWork(whelp turnMessage(cap, val))
|
||||
|
||||
proc message*[T](cap; x: T) =
|
||||
message(cap, x.toPreserves)
|
||||
|
||||
proc sync*(cap, peer: Cap) =
|
||||
cap.relay.queueWork(whelp turnSync(cap, peer))
|
||||
|
||||
proc installStopHook(c: Cont, facet: Facet): Cont {.cpsMagic.} =
|
||||
facet.stopHandlers.add(c.fn)
|
||||
return c
|
||||
|
||||
proc addOnStopHandler(c: Cont; cb: Callback): Cont {.cpsMagic.} =
|
||||
c.facet.stopCallbacks.add(cb)
|
||||
result = c
|
||||
|
||||
proc onStop*(facet; cb: proc () {.closure.}) =
|
||||
facet.stopCallbacks.add(cb)
|
||||
|
||||
proc bootActor*(name: string; bootProc: FacetProc): Actor =
|
||||
result = newActor(name)
|
||||
result.root.startExternalTurn()
|
||||
bootProc(result.root)
|
||||
|
||||
proc runActor*(name: string; bootProc: FacetProc) =
|
||||
let actor = bootActor(name, bootProc)
|
||||
while not actor.stopped:
|
||||
run(actor)
|
|
@ -3,10 +3,8 @@
|
|||
|
||||
import std/[asyncdispatch, monotimes, times, posix, times, epoll]
|
||||
import preserves
|
||||
import syndicate
|
||||
|
||||
import ../protocols/timer
|
||||
from syndicate/protocols/dataspace import Observe
|
||||
import ../syndicate, ../protocols/timer
|
||||
from ../protocols/dataspace import Observe
|
||||
|
||||
export timer
|
||||
|
||||
|
@ -24,12 +22,16 @@ proc eventfd(count: cuint, flags: cint): cint
|
|||
|
||||
proc now: float64 = getTime().toUnixFloat()
|
||||
|
||||
proc processTimers(ds: Cap) {.turnAction.} =
|
||||
let pat = inject(grab Observe(pattern: dropType LaterThan), {0: grabLit()})
|
||||
during(ds, pat) do (seconds: float):
|
||||
let period = seconds - now()
|
||||
if period < 0.001 or true:
|
||||
let h = publish(ds, LaterThan(seconds: seconds).toPreserves)
|
||||
proc spawnTimers*(ds: Cap): Actor {.discardable.} =
|
||||
## Spawn a timer actor.
|
||||
bootActor("timers") do (root: Facet):
|
||||
let pat = inject(grab Observe(pattern: dropType LaterThan), {0: grabLit()})
|
||||
#[
|
||||
during(ds, pat) do (seconds: float):
|
||||
let period = seconds - now()
|
||||
if period < 0.001 or true:
|
||||
let h = publish(ds, LaterThan(seconds: seconds).toPreserves)
|
||||
]#
|
||||
|
||||
#[
|
||||
else:
|
||||
|
@ -39,14 +41,12 @@ proc processTimers(ds: Cap) {.turnAction.} =
|
|||
discard publish(turn, ds, LaterThan(seconds: seconds))
|
||||
]#
|
||||
|
||||
proc spawnTimers*(ds: Cap) =
|
||||
## Spawn a timer actor.
|
||||
boot(newActor("timers"), whelp processTimers(ds))
|
||||
|
||||
proc after*(turn: Turn; ds: Cap; dur: Duration; act: TurnAction) =
|
||||
#[
|
||||
proc after*(ds: Cap; dur: Duration; cb: proc () {.closure.}) =
|
||||
## Execute `act` after some duration of time.
|
||||
let later = now() + dur.inMilliseconds.float64 * 1_000.0
|
||||
onPublish(ds, grab LaterThan(seconds: later)):
|
||||
act(turn)
|
||||
cb()
|
||||
]#
|
||||
|
||||
# TODO: periodic timer
|
|
@ -0,0 +1,48 @@
|
|||
# SPDX-FileCopyrightText: ☭ Emery Hemingway
|
||||
# SPDX-License-Identifier: Unlicense
|
||||
|
||||
import std/[hashes, options, tables]
|
||||
import pkg/cps
|
||||
import preserves
|
||||
import ./[actors, patterns, skeletons]
|
||||
|
||||
from ./protocols/protocol import Handle
|
||||
from ./protocols/dataspace import Observe
|
||||
|
||||
type
|
||||
Dataspace {.final.} = ref object of Entity
|
||||
index: Index
|
||||
handleMap: Table[Handle, Value]
|
||||
|
||||
proc dsPublish(e: Entity; v: Value; h: Handle) {.cps: Cont.} =
|
||||
var ds = Dataspace(e)
|
||||
if ds.index.add(v):
|
||||
var obs = v.preservesTo(Observe)
|
||||
if obs.isSome and obs.get.observer of Cap:
|
||||
ds.index.add(obs.get.pattern, Cap(obs.get.observer))
|
||||
ds.handleMap[h] = v
|
||||
|
||||
proc dsRetract(e: Entity; h: Handle) {.cps: Cont.} =
|
||||
var ds = Dataspace(e)
|
||||
var v = ds.handleMap[h]
|
||||
if ds.index.remove(v):
|
||||
ds.handleMap.del h
|
||||
var obs = v.preservesTo(Observe)
|
||||
if obs.isSome and obs.get.observer of Cap:
|
||||
ds.index.remove(obs.get.pattern, Cap(obs.get.observer))
|
||||
|
||||
proc dsMessage(e: Entity; v: Value) {.cps: Cont.} =
|
||||
var ds = Dataspace(e)
|
||||
ds.index.deliverMessage(v)
|
||||
|
||||
proc newDataspace*(f: Facet): Cap =
|
||||
var ds = Dataspace(
|
||||
publishImpl: whelp dsPublish,
|
||||
retractImpl: whelp dsRetract,
|
||||
messageImpl: whelp dsMessage,
|
||||
index: initIndex(),
|
||||
)
|
||||
newCap(f, ds)
|
||||
|
||||
proc observe*(cap: Cap; pat: Pattern; e: Entity): Handle =
|
||||
publish(cap, Observe(pattern: pat, observer: newCap(cap.relay, e)))
|
|
@ -0,0 +1,43 @@
|
|||
# SPDX-FileCopyrightText: ☭ Emery Hemingway
|
||||
# SPDX-License-Identifier: Unlicense
|
||||
|
||||
import std/[hashes, tables]
|
||||
import preserves
|
||||
import ./[actors, patterns]
|
||||
|
||||
type
|
||||
DuringProc* = proc (a: Value; h: Handle): FacetProc {.gcsafe.}
|
||||
DuringActionKind = enum null, dead, act
|
||||
DuringAction = object
|
||||
case kind: DuringActionKind
|
||||
of null, dead: discard
|
||||
of act:
|
||||
retractProc: FacetProc
|
||||
DuringEntity {.final.}= ref object of Entity
|
||||
publishProc: DuringProc
|
||||
assertionMap: Table[Handle, DuringAction]
|
||||
|
||||
proc duringPublish(e: Entity; v: Value; h: Handle) {.cps: Cont.} =
|
||||
var de = DuringEntity(e)
|
||||
let handler = de.handler(de.facet, a.value, h)
|
||||
let g = de.assertionMap.getOrDefault h
|
||||
case g.kind
|
||||
of null, dead:
|
||||
de.assertionMap[h] = DuringAction(kind: act, action: handler)
|
||||
of act:
|
||||
raiseAssert("during: duplicate handle in publish: " & $h)
|
||||
|
||||
proc duringRetract(e: Entity; h: Handle) {.cps: Cont.} =
|
||||
var de = DuringEntity(e)
|
||||
let g = de.assertionMap.getOrDefault h
|
||||
case g.kind
|
||||
of null:
|
||||
de.assertionMap[h] = DuringAction(kind: dead)
|
||||
of dead:
|
||||
raiseAssert("during: duplicate handle in retract: " & $h)
|
||||
of act:
|
||||
de.assertionMap.del h
|
||||
if not g.action.isNil:
|
||||
g.action(de.facet)
|
||||
|
||||
proc during*(cb: DuringProc): DuringEntity = DuringEntity(cb: cb)
|
File diff suppressed because one or more lines are too long
|
@ -0,0 +1,9 @@
|
|||
# SPDX-FileCopyrightText: ☭ Emery Hemingway
|
||||
# SPDX-License-Identifier: Unlicense
|
||||
|
||||
import preserves
|
||||
import ./actors, ./patterns, ./protocols/dataspace
|
||||
|
||||
proc observe*(turn: var Turn; ds: Cap; pat: Pattern; e: Entity): Cap {.discardable.} =
|
||||
result = newCap(turn, e)
|
||||
publish(turn, ds, Observe(pattern: pat, observer: result))
|
|
@ -6,7 +6,7 @@ from std/os import getEnv, `/`
|
|||
import pkg/sys/[ioqueue, sockets]
|
||||
|
||||
import preserves
|
||||
import ../syndicate, /capabilities, ./durings, ./membranes, ./protocols/[gatekeeper, protocol, sturdy, transportAddress]
|
||||
import ./syndicate, /capabilities, ./durings, ./membranes, ./protocols/[gatekeeper, protocol, sturdy, transportAddress]
|
||||
|
||||
when defined(traceSyndicate):
|
||||
when defined(posix):
|
|
@ -0,0 +1,443 @@
|
|||
# SPDX-FileCopyrightText: ☭ Emery Hemingway
|
||||
# SPDX-License-Identifier: Unlicense
|
||||
|
||||
import std/[asyncdispatch, options, tables]
|
||||
from std/os import getEnv, `/`
|
||||
import preserves
|
||||
import ../syndicate, /capabilities, ./durings, ./membranes, ./protocols/[gatekeeper, protocol, sturdy, transportAddress]
|
||||
|
||||
when defined(traceSyndicate):
|
||||
when defined(posix):
|
||||
template trace(args: varargs[untyped]): untyped = stderr.writeLine(args)
|
||||
else:
|
||||
template trace(args: varargs[untyped]): untyped = echo(args)
|
||||
else:
|
||||
template trace(args: varargs[untyped]): untyped = discard
|
||||
|
||||
export `$`
|
||||
|
||||
type
|
||||
Oid = sturdy.Oid
|
||||
|
||||
export Stdio, Tcp, WebSocket, Unix
|
||||
|
||||
type
|
||||
Assertion = Value
|
||||
WireRef = sturdy.WireRef
|
||||
Turn = syndicate.Turn
|
||||
Handle = actors.Handle
|
||||
|
||||
type
|
||||
PacketWriter = proc (pkt: sink Packet): Future[void] {.gcsafe.}
|
||||
RelaySetup = proc (turn: var Turn; relay: Relay) {.gcsafe.}
|
||||
|
||||
Relay* = ref object of RootObj
|
||||
facet: Facet
|
||||
inboundAssertions: Table[Handle,
|
||||
tuple[localHandle: Handle, imported: seq[WireSymbol]]]
|
||||
outboundAssertions: Table[Handle, seq[WireSymbol]]
|
||||
exported: Membrane
|
||||
imported: Membrane
|
||||
nextLocalOid: Oid
|
||||
pendingTurn: protocol.Turn
|
||||
packetWriter: PacketWriter
|
||||
untrusted: bool
|
||||
|
||||
SyncPeerEntity = ref object of Entity
|
||||
relay: Relay
|
||||
peer: Cap
|
||||
handleMap: Table[Handle, Handle]
|
||||
e: WireSymbol
|
||||
|
||||
RelayEntity = ref object of Entity
|
||||
## https://synit.org/book/protocol.html#relay-entities
|
||||
label: string
|
||||
relay: Relay
|
||||
|
||||
proc releaseCapOut(r: Relay; e: WireSymbol) =
|
||||
r.exported.drop e
|
||||
|
||||
method publish(spe: SyncPeerEntity; t: var Turn; a: AssertionRef; h: Handle) =
|
||||
spe.handleMap[h] = publish(t, spe.peer, a.value)
|
||||
|
||||
method retract(se: SyncPeerEntity; t: var Turn; h: Handle) =
|
||||
var other: Handle
|
||||
if se.handleMap.pop(h, other):
|
||||
retract(t, other)
|
||||
|
||||
method message(se: SyncPeerEntity; t: var Turn; a: AssertionRef) =
|
||||
if not se.e.isNil:
|
||||
se.relay.releaseCapOut(se.e)
|
||||
message(t, se.peer, a.value)
|
||||
|
||||
method sync(se: SyncPeerEntity; t: var Turn; peer: Cap) =
|
||||
sync(t, se.peer, peer)
|
||||
|
||||
proc newSyncPeerEntity(r: Relay; p: Cap): SyncPeerEntity =
|
||||
SyncPeerEntity(relay: r, peer: p)
|
||||
|
||||
proc rewriteCapOut(relay: Relay; cap: Cap; exported: var seq[WireSymbol]): WireRef =
|
||||
if cap.target of RelayEntity and cap.target.RelayEntity.relay == relay and cap.attenuation.len == 0:
|
||||
result = WireRef(orKind: WireRefKind.yours, yours: WireRefYours(oid: cap.target.oid))
|
||||
else:
|
||||
var ws = grab(relay.exported, cap)
|
||||
if ws.isNil:
|
||||
ws = newWireSymbol(relay.exported, relay.nextLocalOid, cap)
|
||||
inc relay.nextLocalOid
|
||||
exported.add ws
|
||||
result = WireRef(
|
||||
orKind: WireRefKind.mine,
|
||||
mine: WireRefMine(oid: ws.oid))
|
||||
|
||||
proc rewriteOut(relay: Relay; v: Assertion):
|
||||
tuple[rewritten: Value, exported: seq[WireSymbol]] {.gcsafe.} =
|
||||
var exported: seq[WireSymbol]
|
||||
result.rewritten = mapEmbeds(v) do (pr: Value) -> Value:
|
||||
let o = pr.unembed(Cap); if o.isSome:
|
||||
rewriteCapOut(relay, o.get, exported).toPreserves
|
||||
else: pr
|
||||
result.exported = exported
|
||||
|
||||
proc register(relay: Relay; v: Assertion; h: Handle): tuple[rewritten: Value, exported: seq[WireSymbol]] =
|
||||
result = rewriteOut(relay, v)
|
||||
relay.outboundAssertions[h] = result.exported
|
||||
|
||||
proc deregister(relay: Relay; h: Handle) =
|
||||
var outbound: seq[WireSymbol]
|
||||
if relay.outboundAssertions.pop(h, outbound):
|
||||
for e in outbound: releaseCapOut(relay, e)
|
||||
|
||||
proc send(r: Relay; turn: var Turn; rOid: protocol.Oid; m: Event) =
|
||||
if r.pendingTurn.len == 0:
|
||||
# If the pending queue is empty then schedule a packet
|
||||
# to be sent after pending I/O is processed.
|
||||
callSoon do ():
|
||||
r.facet.run do (turn: var Turn):
|
||||
var pkt = Packet(
|
||||
orKind: PacketKind.Turn,
|
||||
turn: move r.pendingTurn)
|
||||
trace "C: ", pkt
|
||||
assert(not r.packetWriter.isNil, "missing packetWriter proc")
|
||||
r.packetWriter(turn, encode pkt)
|
||||
r.pendingTurn.add TurnEvent(oid: rOid, event: m)
|
||||
|
||||
proc send(re: RelayEntity; turn: var Turn; ev: Event) =
|
||||
send(re.relay, turn, protocol.Oid re.oid, ev)
|
||||
|
||||
method publish(re: RelayEntity; t: var Turn; a: AssertionRef; h: Handle) {.gcsafe.} =
|
||||
re.send(t, Event(
|
||||
orKind: EventKind.Assert,
|
||||
`assert`: protocol.Assert(
|
||||
assertion: re.relay.register(a.value, h).rewritten,
|
||||
handle: h)))
|
||||
|
||||
method retract(re: RelayEntity; t: var Turn; h: Handle) {.gcsafe.} =
|
||||
re.relay.deregister h
|
||||
re.send(t, Event(
|
||||
orKind: EventKind.Retract,
|
||||
retract: Retract(handle: h)))
|
||||
|
||||
method message(re: RelayEntity; turn: var Turn; msg: AssertionRef) {.gcsafe.} =
|
||||
var (value, exported) = rewriteOut(re.relay, msg.value)
|
||||
assert(len(exported) == 0, "cannot send a reference in a message")
|
||||
if len(exported) == 0:
|
||||
re.send(turn, Event(orKind: EventKind.Message, message: Message(body: value)))
|
||||
|
||||
method sync(re: RelayEntity; turn: var Turn; peer: Cap) {.gcsafe.} =
|
||||
var
|
||||
peerEntity = newSyncPeerEntity(re.relay, peer)
|
||||
exported: seq[WireSymbol]
|
||||
wr = rewriteCapOut(re.relay, turn.newCap(peerEntity), exported)
|
||||
peerEntity.e = exported[0]
|
||||
var ev = Event(orKind: EventKind.Sync)
|
||||
ev.sync.peer = wr.toPreserves.embed
|
||||
re.send(turn, ev)
|
||||
|
||||
proc newRelayEntity(label: string; r: Relay; o: Oid): RelayEntity =
|
||||
RelayEntity(label: label, relay: r, oid: o)
|
||||
|
||||
using
|
||||
relay: Relay
|
||||
facet: Facet
|
||||
|
||||
proc lookupLocal(relay; oid: Oid): Cap =
|
||||
let sym = relay.exported.grab oid
|
||||
if sym.isNil: newInertCap()
|
||||
else: sym.cap
|
||||
|
||||
proc isInert(r: Cap): bool =
|
||||
r.target.isNil
|
||||
|
||||
proc rewriteCapIn(relay; facet; n: WireRef, imported: var seq[WireSymbol]): Cap =
|
||||
case n.orKind
|
||||
of WireRefKind.mine:
|
||||
var e = relay.imported.grab(n.mine.oid)
|
||||
if e.isNil:
|
||||
e = newWireSymbol(
|
||||
relay.imported,
|
||||
n.mine.oid,
|
||||
newCap(facet, newRelayEntity("rewriteCapIn", relay, n.mine.oid)),
|
||||
)
|
||||
imported.add e
|
||||
result = e.cap
|
||||
of WireRefKind.yours:
|
||||
let r = relay.lookupLocal(n.yours.oid)
|
||||
if n.yours.attenuation.len == 0 or r.isInert: result = r
|
||||
else: raiseAssert "attenuation not implemented"
|
||||
|
||||
proc rewriteIn(relay; facet; v: Value):
|
||||
tuple[rewritten: Assertion; imported: seq[WireSymbol]] {.gcsafe.} =
|
||||
var imported: seq[WireSymbol]
|
||||
result.rewritten = mapEmbeds(v) do (pr: Value) -> Value:
|
||||
let wr = pr.preservesTo WireRef; if wr.isSome:
|
||||
result = rewriteCapIn(relay, facet, wr.get, imported).embed
|
||||
else:
|
||||
result = pr
|
||||
result.imported = imported
|
||||
|
||||
proc close(r: Relay) = discard
|
||||
|
||||
proc dispatch*(relay: Relay; turn: var Turn; cap: Cap; event: Event) {.gcsafe.} =
|
||||
case event.orKind
|
||||
of EventKind.Assert:
|
||||
let (a, imported) = rewriteIn(relay, turn.facet, event.assert.assertion)
|
||||
relay.inboundAssertions[event.assert.handle] = (publish(turn, cap, a), imported,)
|
||||
|
||||
of EventKind.Retract:
|
||||
let remoteHandle = event.retract.handle
|
||||
var outbound: tuple[localHandle: Handle, imported: seq[WireSymbol]]
|
||||
if relay.inboundAssertions.pop(remoteHandle, outbound):
|
||||
for e in outbound.imported: relay.imported.drop e
|
||||
turn.retract(outbound.localHandle)
|
||||
|
||||
of EventKind.Message:
|
||||
let (a, imported) = rewriteIn(relay, turn.facet, event.message.body)
|
||||
assert imported.len == 0, "Cannot receive transient reference"
|
||||
turn.message(cap, a)
|
||||
|
||||
of EventKind.Sync:
|
||||
discard # TODO
|
||||
#[
|
||||
var imported: seq[WireSymbol]
|
||||
let k = relay.rewriteCapIn(turn, evenr.sync.peer, imported)
|
||||
turn.sync(cap) do (turn: var Turn):
|
||||
turn.message(k, true)
|
||||
for e in imported: relay.imported.del e
|
||||
]#
|
||||
|
||||
proc dispatch*(relay: Relay; v: Value) {.gcsafe.} =
|
||||
trace "S: ", v
|
||||
run(relay.facet) do (t: var Turn):
|
||||
var pkt: Packet
|
||||
if pkt.fromPreserves(v):
|
||||
case pkt.orKind
|
||||
of PacketKind.Turn:
|
||||
# https://synit.org/book/protocol.html#turn-packets
|
||||
for te in pkt.turn:
|
||||
let r = lookupLocal(relay, te.oid.Oid)
|
||||
if not r.isInert:
|
||||
dispatch(relay, t, r, te.event)
|
||||
else:
|
||||
stderr.writeLine("discarding event for unknown Cap; ", te.event)
|
||||
of PacketKind.Error:
|
||||
# https://synit.org/book/protocol.html#error-packets
|
||||
when defined(posix):
|
||||
stderr.writeLine("Error from server: ", pkt.error.message, " (detail: ", pkt.error.detail, ")")
|
||||
close relay
|
||||
of PacketKind.Extension:
|
||||
# https://synit.org/book/protocol.html#extension-packets
|
||||
discard
|
||||
else:
|
||||
when defined(posix):
|
||||
stderr.writeLine("discarding undecoded packet ", v)
|
||||
|
||||
type
|
||||
RelayOptions* = object of RootObj
|
||||
packetWriter*: PacketWriter
|
||||
untrusted*: bool
|
||||
RelayActorOptions* = object of RelayOptions
|
||||
initialOid*: Option[Oid]
|
||||
initialCap*: Cap
|
||||
nextLocalOid*: Option[Oid]
|
||||
|
||||
proc newRelay(turn: var Turn; opts: RelayOptions; setup: RelaySetup): Relay =
|
||||
result = Relay(
|
||||
facet: turn.facet,
|
||||
packetWriter: opts.packetWriter,
|
||||
untrusted: opts.untrusted)
|
||||
discard result.facet.preventInertCheck()
|
||||
setup(turn, result)
|
||||
|
||||
proc transportConnectionResolve(addrAss: Assertion; ds: Cap): gatekeeper.TransportConnection =
|
||||
result.`addr` = addrAss
|
||||
result.resolved = Resolved(orKind: ResolvedKind.accepted)
|
||||
result.resolved.accepted.responderSession = ds
|
||||
|
||||
proc spawnRelay*(name: string; turn: var Turn; ds: Cap; addrAss: Assertion; opts: RelayActorOptions; setup: RelaySetup) =
|
||||
discard spawn(name, turn) do (turn: var Turn):
|
||||
let relay = newRelay(turn, opts, setup)
|
||||
if not opts.initialCap.isNil:
|
||||
var exported: seq[WireSymbol]
|
||||
discard rewriteCapOut(relay, opts.initialCap, exported)
|
||||
opts.nextLocalOid.map do (oid: Oid):
|
||||
relay.nextLocalOid =
|
||||
if oid == 0.Oid: 1.Oid
|
||||
else: oid
|
||||
if opts.initialOid.isSome:
|
||||
var
|
||||
imported: seq[WireSymbol]
|
||||
wr = WireRef(
|
||||
orKind: WireRefKind.mine,
|
||||
mine: WireRefMine(oid: opts.initialOid.get))
|
||||
res = rewriteCapIn(relay, turn.facet, wr, imported)
|
||||
discard publish(turn, ds, transportConnectionResolve(addrAss, res))
|
||||
else:
|
||||
discard publish(turn, ds, transportConnectionResolve(addrAss, ds))
|
||||
|
||||
when defined(posix):
|
||||
import std/asyncnet
|
||||
from std/nativesockets import AF_INET, AF_UNIX, IPPROTO_TCP, SOCK_STREAM, Protocol
|
||||
|
||||
type ShutdownEntity* = ref object of Entity
|
||||
|
||||
method retract(e: ShutdownEntity; turn: var Turn; h: Handle) =
|
||||
stopActor(turn)
|
||||
|
||||
type ConnectProc* = proc (turn: var Turn; ds: Cap) {.gcsafe.}
|
||||
|
||||
export Tcp
|
||||
|
||||
when defined(posix):
|
||||
export Unix
|
||||
|
||||
proc connect*(turn: var Turn; ds: Cap; route: Route; addrAss: Assertion; socket: AsyncSocket; step: Value) =
|
||||
## Relay a dataspace over an open `AsyncSocket`.
|
||||
proc socketWriter(packet: sink Packet): Future[void] =
|
||||
socket.send(cast[string](encode(packet)))
|
||||
const recvSize = 0x2000
|
||||
var shutdownCap: Cap
|
||||
let
|
||||
reenable = turn.facet.preventInertCheck()
|
||||
connectionClosedCap = newCap(turn, ShutdownEntity())
|
||||
discard bootActor("socket") do (turn: var Turn):
|
||||
var ops = RelayActorOptions(
|
||||
packetWriter: socketWriter,
|
||||
initialOid: 0.Oid.some)
|
||||
spawnRelay("socket", turn, ds, addrAss, ops) do (turn: var Turn; relay: Relay):
|
||||
let facet = turn.facet
|
||||
var wireBuf = newBufferedDecoder(0)
|
||||
|
||||
turn.facet.actor.atExit do (turn: var Turn): close(socket)
|
||||
discard publish(turn, connectionClosedCap, true)
|
||||
shutdownCap = newCap(turn, ShutdownEntity())
|
||||
onPublish(turn, ds, TransportConnection ?: {0: ?addrAss, 2: ?:Rejected}) do (detail: Value):
|
||||
raise newException(IOError, $detail)
|
||||
onPublish(turn, ds, TransportConnection ?: {0: ?addrAss, 2: ?:ResolvedAccepted}) do (gatekeeper: Cap):
|
||||
run(gatekeeper.relay) do (turn: var Turn):
|
||||
reenable()
|
||||
discard publish(turn, shutdownCap, true)
|
||||
proc duringCallback(turn: var Turn; ass: Assertion; h: Handle): TurnAction =
|
||||
let facet = inFacet(turn) do (turn: var Turn):
|
||||
let o = ass.preservesTo Resolved; if o.isSome:
|
||||
discard publish(turn, ds, ResolvePath(
|
||||
route: route, `addr`: addrAss, resolved: o.get))
|
||||
proc action(turn: var Turn) =
|
||||
stop(turn, facet)
|
||||
result = action
|
||||
var resolve = Resolve(
|
||||
step: step,
|
||||
observer: newCap(turn, during(duringCallback)),
|
||||
)
|
||||
discard publish(turn, gatekeeper, resolve)
|
||||
|
||||
proc connect*(turn: var Turn; ds: Cap; route: Route; transport: Tcp; step: Value) =
|
||||
## Relay a dataspace over TCP.
|
||||
let socket = newAsyncSocket(
|
||||
domain = AF_INET,
|
||||
sockType = SOCK_STREAM,
|
||||
protocol = IPPROTO_TCP,
|
||||
buffered = false)
|
||||
let fut = connect(socket, transport.host, Port transport.port)
|
||||
addCallback(fut, turn) do (turn: var Turn):
|
||||
connect(turn, ds, route, transport.toPreserves, socket, step)
|
||||
|
||||
proc connect*(turn: var Turn; ds: Cap; route: Route; transport: Unix; step: Value) =
|
||||
## Relay a dataspace over a UNIX socket.
|
||||
let socket = newAsyncSocket(
|
||||
domain = AF_UNIX,
|
||||
sockType = SOCK_STREAM,
|
||||
protocol = cast[Protocol](0),
|
||||
buffered = false)
|
||||
let fut = connectUnix(socket, transport.path)
|
||||
addCallback(fut, turn) do (turn: var Turn):
|
||||
connect(turn, ds, route, transport.toPreserves, socket, step)
|
||||
|
||||
import std/asyncfile
|
||||
|
||||
const stdinReadSize = 128
|
||||
|
||||
proc connectStdio*(turn: var Turn; ds: Cap) =
|
||||
## Connect to an external dataspace over stdin and stdout.
|
||||
proc stdoutWriter(packet: sink Packet): Future[void] =
|
||||
result = newFuture[void]()
|
||||
var buf = encode(packet)
|
||||
doAssert writeBytes(stdout, buf, 0, buf.len) == buf.len
|
||||
flushFile(stdout)
|
||||
complete result
|
||||
var opts = RelayActorOptions(
|
||||
packetWriter: stdoutWriter,
|
||||
initialCap: ds,
|
||||
initialOid: 0.Oid.some)
|
||||
spawnRelay("stdio", turn, ds, Stdio().toPreserves, opts) do (turn: var Turn; relay: Relay):
|
||||
let
|
||||
facet = turn.facet
|
||||
asyncStdin = openAsync("/dev/stdin") # this is universal now?
|
||||
close(stdin)
|
||||
facet.actor.atExit do (turn: var Turn):
|
||||
close(asyncStdin)
|
||||
var wireBuf = newBufferedDecoder(0)
|
||||
proc readCb(pktFut: Future[string]) {.gcsafe.} =
|
||||
if not pktFut.failed:
|
||||
var buf = pktFut.read
|
||||
if buf.len == 0:
|
||||
run(facet) do (turn: var Turn): stopActor(turn)
|
||||
else:
|
||||
feed(wireBuf, buf)
|
||||
var (success, pr) = decode(wireBuf)
|
||||
if success:
|
||||
dispatch(relay, pr)
|
||||
asyncStdin.read(stdinReadSize).addCallback(readCb)
|
||||
asyncStdin.read(stdinReadSize).addCallback(readCb)
|
||||
|
||||
type BootProc* = proc (turn: var Turn; ds: Cap) {.gcsafe.}
|
||||
|
||||
proc envRoute*: Route =
|
||||
var text = getEnv("SYNDICATE_ROUTE")
|
||||
if text == "":
|
||||
var tx = (getEnv("XDG_RUNTIME_DIR", "/run/user/1000") / "dataspace").toPreserves
|
||||
result.transports = @[initRecord("unix", tx)]
|
||||
result.pathSteps = @[capabilities.mint().toPreserves]
|
||||
else:
|
||||
var pr = parsePreserves(text)
|
||||
if not result.fromPreserves(pr):
|
||||
raise newException(ValueError, "failed to parse $SYNDICATE_ROUTE " & $pr)
|
||||
|
||||
proc resolve*(turn: var Turn; ds: Cap; route: Route; bootProc: BootProc) =
|
||||
var
|
||||
unix: Unix
|
||||
tcp: Tcp
|
||||
stdio: Stdio
|
||||
doAssert(route.transports.len == 1, "only a single transport supported for routes")
|
||||
doAssert(route.pathSteps.len < 2, "multiple path steps not supported for routes")
|
||||
if unix.fromPreserves route.transports[0]:
|
||||
connect(turn, ds, route, unix, route.pathSteps[0])
|
||||
elif tcp.fromPreserves route.transports[0]:
|
||||
connect(turn, ds, route, tcp, route.pathSteps[0])
|
||||
elif stdio.fromPreserves route.transports[0]:
|
||||
connectStdio(turn, ds)
|
||||
bootProc(turn, ds)
|
||||
else:
|
||||
raise newException(ValueError, "unsupported route")
|
||||
|
||||
during(turn, ds, ResolvePath ?: { 0: ?route, 3: ?:ResolvedAccepted}) do (dest: Cap):
|
||||
bootProc(turn, dest)
|
|
@ -65,9 +65,9 @@ func isEmpty(cont: Continuation): bool =
|
|||
cont.cache.len == 0 and cont.leafMap.len == 0
|
||||
|
||||
type
|
||||
ContinuationProc = proc (c: Continuation; v: Value) {.gcsafe.}
|
||||
LeafProc = proc (l: Leaf; v: Value) {.gcsafe.}
|
||||
ObserverProc = proc (turn: Turn; group: ObserverGroup; vs: seq[Value]) {.gcsafe.}
|
||||
ContinuationProc = proc (c: Continuation; v: Value) {.closure.}
|
||||
LeafProc = proc (l: Leaf; v: Value) {.closure.}
|
||||
ObserverProc = proc (group: ObserverGroup; vs: seq[Value]) {.closure.}
|
||||
|
||||
proc getLeaves(cont: Continuation; constPaths: Paths): LeafMap =
|
||||
result = cont.leafMap.getOrDefault(constPaths)
|
||||
|
@ -114,10 +114,10 @@ proc top(stack: TermStack): Value =
|
|||
assert stack.len > 0
|
||||
stack[stack.high]
|
||||
|
||||
proc modify(node: Node; turn: Turn; outerValue: Value; event: EventKind;
|
||||
proc modify(node: Node; outerValue: Value; event: EventKind;
|
||||
modCont: ContinuationProc; modLeaf: LeafProc; modObs: ObserverProc) =
|
||||
|
||||
proc walk(cont: Continuation; turn: Turn) =
|
||||
proc walk(cont: Continuation) =
|
||||
modCont(cont, outerValue)
|
||||
for constPaths, constValMap in cont.leafMap.pairs:
|
||||
let constVals = projectPaths(outerValue, constPaths)
|
||||
|
@ -129,7 +129,7 @@ proc modify(node: Node; turn: Turn; outerValue: Value; event: EventKind;
|
|||
for capturePaths, observerGroup in leaf.observerGroups.pairs:
|
||||
let captures = projectPaths(outerValue, capturePaths)
|
||||
if captures.isSome:
|
||||
modObs(turn, observerGroup, get captures)
|
||||
modObs(observerGroup, get captures)
|
||||
of removedEvent:
|
||||
let leaf = constValMap.getOrDefault(get constVals)
|
||||
if not leaf.isNil:
|
||||
|
@ -137,13 +137,13 @@ proc modify(node: Node; turn: Turn; outerValue: Value; event: EventKind;
|
|||
for capturePaths, observerGroup in leaf.observerGroups.pairs:
|
||||
let captures = projectPaths(outerValue, capturePaths)
|
||||
if captures.isSome:
|
||||
modObs(turn, observerGroup, get captures)
|
||||
modObs(observerGroup, get captures)
|
||||
if leaf.isEmpty:
|
||||
constValMap.del(get constVals)
|
||||
|
||||
|
||||
proc walk(node: Node; turn: Turn; termStack: TermStack) =
|
||||
walk(node.continuation, turn)
|
||||
proc walk(node: Node; termStack: TermStack) =
|
||||
walk(node.continuation)
|
||||
for selector, table in node.edges:
|
||||
let
|
||||
nextStack = pop(termStack, selector.popCount)
|
||||
|
@ -153,11 +153,11 @@ proc modify(node: Node; turn: Turn; outerValue: Value; event: EventKind;
|
|||
if nextClass.kind != classNone:
|
||||
let nextNode = table.getOrDefault(nextClass)
|
||||
if not nextNode.isNil:
|
||||
walk(nextNode, turn, push(nextStack, get nextValue))
|
||||
walk(nextNode, push(nextStack, get nextValue))
|
||||
if event == removedEvent and nextNode.isEmpty:
|
||||
table.del(nextClass)
|
||||
|
||||
walk(node, turn, @[@[outerValue].toPreserves])
|
||||
walk(node, @[@[outerValue].toPreserves])
|
||||
|
||||
proc getOrNew[A, B, C](t: var Table[A, TableRef[B, C]], k: A): TableRef[B, C] =
|
||||
result = t.getOrDefault(k)
|
||||
|
@ -227,7 +227,7 @@ proc getEndpoints(leaf: Leaf; capturePaths: Paths): ObserverGroup =
|
|||
if captures.isSome:
|
||||
discard result.cachedCaptures.change(get captures, +1)
|
||||
|
||||
proc add*(index: var Index; turn: Turn; pattern: Pattern; observer: Cap) =
|
||||
proc add*(index: var Index; pattern: Pattern; observer: Cap) =
|
||||
let
|
||||
cont = index.root.extend(pattern)
|
||||
analysis = analyse pattern
|
||||
|
@ -237,10 +237,10 @@ proc add*(index: var Index; turn: Turn; pattern: Pattern; observer: Cap) =
|
|||
# TODO if endpoints.cachedCaptures.len > 0:
|
||||
var captureMap = newTable[seq[Value], Handle]()
|
||||
for capture in endpoints.cachedCaptures.items:
|
||||
captureMap[capture] = publish(turn, observer, capture.toPreserves)
|
||||
captureMap[capture] = publish(observer, capture.toPreserves)
|
||||
endpoints.observers[observer] = captureMap
|
||||
|
||||
proc remove*(index: var Index; turn: Turn; pattern: Pattern; observer: Cap) =
|
||||
proc remove*(index: var Index; pattern: Pattern; observer: Cap) =
|
||||
let
|
||||
cont = index.root.extend(pattern)
|
||||
analysis = analyse pattern
|
||||
|
@ -252,7 +252,7 @@ proc remove*(index: var Index; turn: Turn; pattern: Pattern; observer: Cap) =
|
|||
if not endpoints.isNil:
|
||||
var captureMap: TableRef[seq[Value], Handle]
|
||||
if endpoints.observers.pop(observer, captureMap):
|
||||
for handle in captureMap.values: retract(turn, handle)
|
||||
for handle in captureMap.values: retract(observer, handle)
|
||||
if endpoints.observers.len == 0:
|
||||
leaf.observerGroups.del(analysis.capturePaths)
|
||||
if leaf.observerGroups.len == 0:
|
||||
|
@ -260,7 +260,7 @@ proc remove*(index: var Index; turn: Turn; pattern: Pattern; observer: Cap) =
|
|||
if constValMap.len == 0:
|
||||
cont.leafMap.del(analysis.constPaths)
|
||||
|
||||
proc adjustAssertion(index: var Index; turn: Turn; outerValue: Value; delta: int): bool =
|
||||
proc adjustAssertion(index: var Index; outerValue: Value; delta: int): bool =
|
||||
case index.allAssertions.change(outerValue, delta)
|
||||
of cdAbsentToPresent:
|
||||
result = true
|
||||
|
@ -268,37 +268,37 @@ proc adjustAssertion(index: var Index; turn: Turn; outerValue: Value; delta: int
|
|||
c.cache.incl(v)
|
||||
proc modLeaf(l: Leaf; v: Value) =
|
||||
l.cache.incl(v)
|
||||
proc modObserver(turn: Turn; group: ObserverGroup; vs: seq[Value]) =
|
||||
proc modObserver(group: ObserverGroup; vs: seq[Value]) =
|
||||
let change = group.cachedCaptures.change(vs, +1)
|
||||
if change == cdAbsentToPresent:
|
||||
for (observer, captureMap) in group.observers.pairs:
|
||||
captureMap[vs] = publish(turn, observer, vs.toPreserves)
|
||||
captureMap[vs] = publish(observer, vs.toPreserves)
|
||||
# TODO: this handle is coming from the facet?
|
||||
modify(index.root, turn, outerValue, addedEvent, modContinuation, modLeaf, modObserver)
|
||||
modify(index.root, outerValue, addedEvent, modContinuation, modLeaf, modObserver)
|
||||
of cdPresentToAbsent:
|
||||
result = true
|
||||
proc modContinuation(c: Continuation; v: Value) =
|
||||
c.cache.excl(v)
|
||||
proc modLeaf(l: Leaf; v: Value) =
|
||||
l.cache.excl(v)
|
||||
proc modObserver(turn: Turn; group: ObserverGroup; vs: seq[Value]) =
|
||||
proc modObserver(group: ObserverGroup; vs: seq[Value]) =
|
||||
if group.cachedCaptures.change(vs, -1) == cdPresentToAbsent:
|
||||
for (observer, captureMap) in group.observers.pairs:
|
||||
var h: Handle
|
||||
if captureMap.take(vs, h):
|
||||
retract(observer.target, turn, h)
|
||||
modify(index.root, turn, outerValue, removedEvent, modContinuation, modLeaf, modObserver)
|
||||
retract(observer, h)
|
||||
modify(index.root, outerValue, removedEvent, modContinuation, modLeaf, modObserver)
|
||||
else: discard
|
||||
|
||||
proc continuationNoop(c: Continuation; v: Value) = discard
|
||||
proc leafNoop(l: Leaf; v: Value) = discard
|
||||
|
||||
proc add*(index: var Index; turn: Turn; v: Value): bool =
|
||||
adjustAssertion(index, turn, v, +1)
|
||||
proc remove*(index: var Index; turn: Turn; v: Value): bool =
|
||||
adjustAssertion(index, turn, v, -1)
|
||||
proc add*(index: var Index; v: Value): bool =
|
||||
adjustAssertion(index, v, +1)
|
||||
proc remove*(index: var Index; v: Value): bool =
|
||||
adjustAssertion(index, v, -1)
|
||||
|
||||
proc deliverMessage*(index: var Index; turn: Turn; v: Value) =
|
||||
proc observersCb(turn: Turn; group: ObserverGroup; vs: seq[Value]) =
|
||||
for observer in group.observers.keys: message(turn, observer, vs.toPreserves)
|
||||
index.root.modify(turn, v, messageEvent, continuationNoop, leafNoop, observersCb)
|
||||
proc deliverMessage*(index: var Index; v: Value) =
|
||||
proc observersCb(group: ObserverGroup; vs: seq[Value]) =
|
||||
for observer in group.observers.keys: message(observer, vs.toPreserves)
|
||||
index.root.modify(v, messageEvent, continuationNoop, leafNoop, observersCb)
|
|
@ -10,9 +10,9 @@ import pkg/cps
|
|||
import preserves
|
||||
export preserves
|
||||
|
||||
import ./syndicate/[actors, dataspaces, patterns]
|
||||
import ./[actors, dataspaces, patterns]
|
||||
# durings
|
||||
import ./syndicate/protocols/dataspace
|
||||
import ./protocols/dataspace
|
||||
|
||||
export actors, dataspace, dataspaces, patterns
|
||||
|
||||
|
@ -37,22 +37,26 @@ proc `??`*(pat: Pattern; bindings: openArray[(int, Pattern)]): Pattern {.inline.
|
|||
patterns.inject(pat, bindings)
|
||||
|
||||
type
|
||||
PublishProc = proc (turn: Turn; v: Value; h: Handle) {.closure.}
|
||||
RetractProc = proc (turn: Turn; h: Handle) {.closure.}
|
||||
MessageProc = proc (turn: Turn; v: Value) {.closure.}
|
||||
PublishProc = proc (v: Value; h: Handle) {.closure.}
|
||||
RetractProc = proc (h: Handle) {.closure.}
|
||||
MessageProc = proc (v: Value) {.closure.}
|
||||
|
||||
ClosureEntity = ref object of Entity
|
||||
publishImpl*: PublishProc
|
||||
retractImpl*: RetractProc
|
||||
messageImpl*: MessageProc
|
||||
publishCb*: PublishProc
|
||||
retractCb*: RetractProc
|
||||
messageCb*: MessageProc
|
||||
|
||||
method publish(e: ClosureEntity; turn: Turn; a: AssertionRef; h: Handle) =
|
||||
if not e.publishImpl.isNil: e.publishImpl(turn, a.value, h)
|
||||
proc publishCont(e: Entity; v: Value; h: Handle) {.cps: Cont.} =
|
||||
var ce = ClosureEntity(e)
|
||||
if not ce.publishCb.isNil: ce.publishCb(v, h)
|
||||
|
||||
method retract(e: ClosureEntity; turn: Turn; h: Handle) =
|
||||
if not e.retractImpl.isNil: e.retractImpl(turn, h)
|
||||
proc retractCont(e: Entity; h: Handle) {.cps: Cont.} =
|
||||
var ce = ClosureEntity(e)
|
||||
if not ce.retractCb.isNil: ce.retractCb(h)
|
||||
|
||||
method message(e: ClosureEntity; turn: Turn; a: AssertionRef) =
|
||||
if not e.messageImpl.isNil: e.messageImpl(turn, a.value)
|
||||
proc messageCont(e: Entity; v: Value) {.cps: Cont.} =
|
||||
var ce = ClosureEntity(e)
|
||||
if not ce.messageCb.isNil: ce.messageCb(v)
|
||||
|
||||
proc argumentCount(handler: NimNode): int =
|
||||
handler.expectKind {nnkDo, nnkStmtList}
|
||||
|
@ -108,7 +112,7 @@ proc wrapMessageHandler(handler: NimNode): NimNode =
|
|||
handlerSym = genSym(nskProc, "message")
|
||||
bindingsSym = ident"bindings"
|
||||
quote do:
|
||||
proc `handlerSym`(turn: Turn; `bindingsSym`: Value) =
|
||||
proc `handlerSym`(`bindingsSym`: Value) =
|
||||
`varSection`
|
||||
if fromPreserves(`valuesSym`, bindings):
|
||||
`body`
|
||||
|
@ -150,9 +154,8 @@ macro onPublish*(ds: Cap; pattern: Pattern; handler: untyped) =
|
|||
discard observe(activeTurn(), `ds`, `pattern`, ClosureEntity(publishImpl: `handlerSym`))
|
||||
]#
|
||||
|
||||
#[
|
||||
macro onMessage*(ds: Cap; pattern: Pattern; handler: untyped) =
|
||||
## Call `handler` when an message matching `pattern` is broadcasted at `ds`.
|
||||
macro onMessage*(cap: Cap; pattern: Pattern; handler: untyped) =
|
||||
## Call `handler` when an message matching `pattern` is broadcasted at `cap`.
|
||||
let
|
||||
argCount = argumentCount(handler)
|
||||
handlerProc = wrapMessageHandler(handler)
|
||||
|
@ -161,8 +164,10 @@ macro onMessage*(ds: Cap; pattern: Pattern; handler: untyped) =
|
|||
if `argCount` != 0 and `pattern`.analyse.capturePaths.len != `argCount`:
|
||||
raiseAssert($`pattern`.analyse.capturePaths.len & " values captured but handler has " & $`argCount` & " arguments - " & $`pattern`)
|
||||
`handlerProc`
|
||||
discard observe(activeTurn(), `ds`, `pattern`, ClosureEntity(messageImpl: `handlerSym`))
|
||||
]#
|
||||
discard observe(`cap`, `pattern`, ClosureEntity(
|
||||
messageImpl: whelp messageCont,
|
||||
messageCb: `handlerSym`,
|
||||
))
|
||||
|
||||
#[
|
||||
macro during*(ds: Cap; pattern: Pattern; publishBody, retractBody: untyped) =
|
||||
|
@ -202,15 +207,15 @@ proc wrapHandler(body: NimNode; ident: string): NimNode =
|
|||
proc `sym`() =
|
||||
`body`
|
||||
|
||||
macro onStop*(body: untyped) =
|
||||
#[
|
||||
macro onStop*(facet: Facet; body: untyped) =
|
||||
let
|
||||
handlerDef = wrapHandler(body, "onStop")
|
||||
handlerSym = handlerDef[0]
|
||||
result = quote do:
|
||||
`handlerDef`
|
||||
addOnStopHandler(`handlerSym`)
|
||||
addOnStopHandler(facet, `handlerSym`)
|
||||
|
||||
#[
|
||||
macro onStop*(body: untyped) =
|
||||
quote do:
|
||||
block:
|
|
@ -0,0 +1,19 @@
|
|||
# SPDX-FileCopyrightText: ☭ Emery Hemingway
|
||||
# SPDX-License-Identifier: Unlicense
|
||||
|
||||
import ./protocols/[protocol, trace]
|
||||
export trace
|
||||
|
||||
proc traceAction*(e: protocol.Event): trace.TurnEvent =
|
||||
var act = ActionDescription(orKind: ActionDescriptionKind.enqueue)
|
||||
act.enqueue.event = TargetedTurnEvent(
|
||||
target: cap.traceTarget,
|
||||
detail: trace.TurnEvent(orKind: trace.TurnEventKind.assert)
|
||||
)
|
||||
act.enqueue.event.detail = trace.TurnEvent(orKind: TurnEventKind.assert)
|
||||
act.enqueue.event.detail.assert = TurnEventAssert(
|
||||
assertion: AssertionDescription(orKind: AssertionDescriptionKind.value),
|
||||
handle: result,
|
||||
)
|
||||
act.enqueue.event.detail.assert.assertion.value.value = val
|
||||
turn.desc.actions.add act
|
|
@ -1,347 +0,0 @@
|
|||
# SPDX-FileCopyrightText: ☭ Emery Hemingway
|
||||
# SPDX-License-Identifier: Unlicense
|
||||
|
||||
import std/[deques, hashes, options, times]
|
||||
import pkg/cps
|
||||
import preserves
|
||||
import ../syndicate/protocols/[protocol, sturdy]
|
||||
|
||||
# const traceSyndicate {.booldefine.}: bool = true
|
||||
const traceSyndicate* = true
|
||||
|
||||
when traceSyndicate:
|
||||
import std/streams
|
||||
from std/os import getEnv
|
||||
import ./protocols/trace
|
||||
|
||||
export protocol.Handle
|
||||
|
||||
type
|
||||
Cont* = ref object of Continuation
|
||||
turn: Turn
|
||||
facet: Facet
|
||||
|
||||
Handler* = proc() {.closure.}
|
||||
|
||||
Work = Deque[Cont]
|
||||
HandlerDeque = seq[ContinuationProc[Continuation]]
|
||||
|
||||
FacetState = enum fFresh, fRunning, fEnded
|
||||
|
||||
Callback* = proc () {.nimcall.}
|
||||
|
||||
Facet* = ref object
|
||||
## https://synit.org/book/glossary.html#facet
|
||||
actor: Actor
|
||||
parent: Facet
|
||||
children: seq[Facet]
|
||||
stopHandlers: HandlerDeque
|
||||
stopCallbacks: seq[Callback]
|
||||
state: FacetState
|
||||
when traceSyndicate:
|
||||
id: FacetId
|
||||
|
||||
Turn* = ref object
|
||||
## https://synit.org/book/glossary.html#turn
|
||||
facet: Facet
|
||||
entity: Entity
|
||||
event: Option[protocol.Event]
|
||||
work: Work
|
||||
when traceSyndicate:
|
||||
desc: TurnDescription
|
||||
|
||||
Entity* = ref object of RootObj
|
||||
## https://synit.org/book/glossary.html#entity
|
||||
facet*: Facet
|
||||
oid*: sturdy.Oid # oid is how Entities are identified over the wire
|
||||
when traceSyndicate:
|
||||
id: FacetId
|
||||
|
||||
Cap* {.final, preservesEmbedded.} = ref object of EmbeddedObj
|
||||
relay*: Facet
|
||||
target*: Entity
|
||||
attenuation*: seq[sturdy.Caveat]
|
||||
|
||||
Actor* = ref object
|
||||
## https://synit.org/book/glossary.html#actor
|
||||
# crashHandlers: HandlerDeque
|
||||
root: Facet
|
||||
handleAllocator: Handle
|
||||
facetIdAllocator: int
|
||||
id: ActorId
|
||||
when traceSyndicate:
|
||||
traceStream: FileStream
|
||||
stopped: bool
|
||||
|
||||
template syndicate*(prc: typed): untyped =
|
||||
cps(Cont, prc)
|
||||
|
||||
proc activeTurn*(c: Cont): Turn {.cpsVoodoo.} =
|
||||
## Return the active `Turn` within a `{.syndicate.}` context.
|
||||
assert not c.turn.isNil
|
||||
c.turn
|
||||
|
||||
proc activeFacet*(c: Cont): Facet {.cpsVoodoo.} =
|
||||
## Return the active `Facet` within a `{.syndicate.}` context.
|
||||
assert not c.facet.isNil
|
||||
c.facet
|
||||
|
||||
proc activeActor*(c: Cont): Actor {.cpsVoodoo.} =
|
||||
## Return the active `Actor` within a `{.syndicate.}` context.
|
||||
assert not c.turn.isNil
|
||||
c.facet.actor
|
||||
|
||||
using
|
||||
actor: Actor
|
||||
facet: Facet
|
||||
entity: Entity
|
||||
cap: Cap
|
||||
turn: Turn
|
||||
|
||||
proc hash*(facet): Hash = facet.unsafeAddr.hash
|
||||
proc hash*(cap): Hash = cap.unsafeAddr.hash
|
||||
|
||||
proc newFacet(actor: Actor; parent: Facet): Facet =
|
||||
inc(actor.facetIdAllocator)
|
||||
result = Facet(
|
||||
actor: actor,
|
||||
parent: parent,
|
||||
id: actor.facetIdAllocator.toPreserves,
|
||||
)
|
||||
|
||||
proc stopped*(facet): bool = facet.state != fRunning
|
||||
|
||||
proc newActor(name: string): Actor =
|
||||
result = Actor(id: name.toPreserves)
|
||||
result.root = newFacet(result, nil)
|
||||
when traceSyndicate:
|
||||
let path = getEnv("SYNDICATE_TRACE_FILE", "")
|
||||
case path
|
||||
of "": discard
|
||||
of "-": result.traceStream = newFileStream(stderr)
|
||||
else: result.traceStream = openFileStream(path, fmWrite)
|
||||
|
||||
when traceSyndicate:
|
||||
proc trace(actor; act: ActorActivation) =
|
||||
if not actor.traceStream.isNil:
|
||||
var entry = TraceEntry(
|
||||
timestamp: getTime().toUnixFloat(),
|
||||
actor: actor.id,
|
||||
item: act,
|
||||
)
|
||||
actor.traceStream.writeLine($entry.toPreserves)
|
||||
|
||||
proc trace(actor; turn) =
|
||||
if not actor.traceStream.isNil:
|
||||
actor.trace(ActorActivation(
|
||||
orKind: ActorActivationKind.turn,
|
||||
turn: turn.desc,
|
||||
))
|
||||
|
||||
proc traceTarget(cap): trace.Target =
|
||||
let facet = cap.relay
|
||||
Target(
|
||||
actor: facet.actor.id,
|
||||
facet: facet.id,
|
||||
oid: cap.target.oid.toPreserves,
|
||||
)
|
||||
|
||||
proc traceTarget(turn): trace.Target =
|
||||
let facet = turn.facet
|
||||
Target(
|
||||
actor: facet.actor.id,
|
||||
facet: facet.id,
|
||||
)
|
||||
|
||||
proc newExternalTurn(facet): Turn =
|
||||
result = Turn(facet: facet)
|
||||
when traceSyndicate:
|
||||
result.desc = TurnDescription(cause: TurnCause(orKind: TurnCauseKind.external))
|
||||
|
||||
proc pass*(a, b: Cont): Cont =
|
||||
b.turn = a.turn
|
||||
if b.facet.isNil:
|
||||
b.facet = a.facet
|
||||
# TODO: whelp a new continuation at facet boundaries?
|
||||
b
|
||||
|
||||
proc queue(t: Turn; c: Cont) =
|
||||
c.facet = t.facet
|
||||
t.work.addLast(c)
|
||||
|
||||
proc queue(c: Cont): Cont {.cpsMagic.} =
|
||||
queue(c.turn, c)
|
||||
nil
|
||||
|
||||
proc complete(turn; c: Cont) =
|
||||
var c = c
|
||||
try:
|
||||
while not c.isNil and not c.fn.isNil:
|
||||
c.turn = turn
|
||||
var y = c.fn
|
||||
var x = y(c)
|
||||
c = Cont(x)
|
||||
except CatchableError as err:
|
||||
if not c.dismissed:
|
||||
writeStackFrames c
|
||||
# terminate(c.facet, err)
|
||||
|
||||
proc run(turn) =
|
||||
let actor = turn.facet.actor
|
||||
assert not actor.stopped
|
||||
while turn.work.len > 0:
|
||||
complete(turn, turn.work.popFirst())
|
||||
when traceSyndicate:
|
||||
actor.trace(turn)
|
||||
if actor.stopped:
|
||||
trace(actor, ActorActivation(orkind: ActorActivationKind.stop))
|
||||
|
||||
proc start(actor; cont: Cont) =
|
||||
when traceSyndicate:
|
||||
var act = ActorActivation(orkind: ActorActivationKind.start)
|
||||
trace(actor, act)
|
||||
actor.root.state = fRunning
|
||||
let turn = actor.root.newExternalTurn()
|
||||
turn.queue(cont)
|
||||
run(turn)
|
||||
|
||||
proc stop(turn; actor)
|
||||
|
||||
proc collectPath(result: var seq[FacetId]; facet) =
|
||||
if not facet.parent.isNil:
|
||||
collectPath(result, facet.parent)
|
||||
result.add(facet.id)
|
||||
|
||||
proc runNextStop(c: Cont; facet: Facet): Cont {.cpsMagic.} =
|
||||
c.fn = facet.stopHandlers.pop()
|
||||
result = c
|
||||
|
||||
proc runNextFacetStop() {.syndicate.} =
|
||||
activeFacet().runNextStop()
|
||||
|
||||
proc stop(turn; facet; reason: FacetStopReason) =
|
||||
while facet.stopHandlers.len > 0:
|
||||
var c = whelp runNextFacetStop()
|
||||
c.facet = facet
|
||||
complete(turn, c)
|
||||
while facet.stopCallbacks.len > 0:
|
||||
var cb = facet.stopCallbacks.pop()
|
||||
cb()
|
||||
while facet.children.len > 0:
|
||||
stop(turn, facet.children.pop(), FacetStopReason.parentStopping)
|
||||
when traceSyndicate:
|
||||
var act = ActionDescription(orKind: ActionDescriptionKind.facetstop)
|
||||
collectPath(act.facetstop.path, facet)
|
||||
act.facetStop.reason = reason
|
||||
turn.desc.actions.add act
|
||||
if facet.parent.isNil:
|
||||
facet.actor.root = nil
|
||||
stop(turn, facet.actor)
|
||||
|
||||
proc stop(turn; actor) =
|
||||
if not actor.root.isNil:
|
||||
stop(turn, actor.root, FacetStopReason.actorStopping)
|
||||
actor.stopped = true
|
||||
|
||||
proc bootActor*(name: string, c: Cont) =
|
||||
start(newActor(name), c)
|
||||
|
||||
proc stopActor(c: Cont; a: Actor): Cont {.cpsMagic.} =
|
||||
stop(c.turn, a)
|
||||
nil
|
||||
|
||||
proc stopFacet(c: Cont; f: Facet): Cont {.cpsMagic.} =
|
||||
stop(c.turn, f, FacetStopReason.explicitAction)
|
||||
nil
|
||||
|
||||
proc stopFacet*() {.syndicate.} =
|
||||
queue()
|
||||
stop(activeTurn(), activeFacet(), FacetStopReason.explicitAction)
|
||||
|
||||
proc stopActor*() {.syndicate.} =
|
||||
queue()
|
||||
stop(activeTurn(), activeActor())
|
||||
|
||||
type
|
||||
AssertionRef* = ref object
|
||||
value*: Value
|
||||
# if the Enity methods take a Value object then the generated
|
||||
# C code has "redefinition of struct" problems when orc is enabled
|
||||
|
||||
method publish*(e: Entity; turn: Turn; v: AssertionRef; h: Handle) {.base.} = discard
|
||||
method retract*(e: Entity; turn: Turn; h: Handle) {.base.} = discard
|
||||
method message*(e: Entity; turn: Turn; v: AssertionRef) {.base.} = discard
|
||||
method sync*(e: Entity; turn: Turn; peer: Cap) {.base.} = discard
|
||||
|
||||
proc newCap*(f: Facet; e: Entity): Cap =
|
||||
Cap(relay: f, target: e)
|
||||
|
||||
proc nextHandle(facet: Facet): Handle =
|
||||
inc(facet.actor.handleAllocator)
|
||||
facet.actor.handleAllocator
|
||||
|
||||
proc publish*(turn: Turn; cap: Cap; val: Value): Handle =
|
||||
result = turn.facet.nextHandle()
|
||||
when traceSyndicate:
|
||||
var act = ActionDescription(orKind: ActionDescriptionKind.enqueue)
|
||||
act.enqueue.event = TargetedTurnEvent(
|
||||
target: cap.traceTarget,
|
||||
detail: trace.TurnEvent(orKind: trace.TurnEventKind.assert)
|
||||
)
|
||||
act.enqueue.event.detail = trace.TurnEvent(orKind: TurnEventKind.assert)
|
||||
act.enqueue.event.detail.assert = TurnEventAssert(
|
||||
assertion: AssertionDescription(orKind: AssertionDescriptionKind.value),
|
||||
handle: result,
|
||||
)
|
||||
act.enqueue.event.detail.assert.assertion.value.value = val
|
||||
turn.desc.actions.add act
|
||||
|
||||
proc retract*(turn; h: Handle) =
|
||||
when traceSyndicate:
|
||||
var act = ActionDescription(orKind: ActionDescriptionKind.enqueue)
|
||||
act.enqueue.event = TargetedTurnEvent(
|
||||
target: turn.traceTarget,
|
||||
detail: trace.TurnEvent(orKind: trace.TurnEventKind.retract)
|
||||
)
|
||||
act.enqueue.event.detail.retract = TurnEventRetract(handle: h)
|
||||
turn.desc.actions.add act
|
||||
|
||||
proc message*(turn; cap; val: Value) =
|
||||
when traceSyndicate:
|
||||
var act = ActionDescription(orKind: ActionDescriptionKind.enqueue)
|
||||
act.enqueue.event = TargetedTurnEvent(
|
||||
target: turn.traceTarget,
|
||||
detail: trace.TurnEvent(orKind: trace.TurnEventKind.message)
|
||||
)
|
||||
act.enqueue.event.detail.message.body.value.value = val
|
||||
turn.desc.actions.add act
|
||||
|
||||
proc sync*(turn; peer: Cap) =
|
||||
when traceSyndicate:
|
||||
var act = ActionDescription(orKind: ActionDescriptionKind.enqueue)
|
||||
act.enqueue.event = TargetedTurnEvent(
|
||||
target: turn.traceTarget,
|
||||
detail: trace.TurnEvent(orKind: trace.TurnEventKind.sync)
|
||||
)
|
||||
act.enqueue.event.detail.sync.peer = peer.traceTarget
|
||||
turn.desc.actions.add act
|
||||
|
||||
proc publish*(cap: Cap; val: Value): Handle {.syndicate.} =
|
||||
publish(activeTurn(), cap, val)
|
||||
|
||||
proc retract*(h: Handle) {.syndicate.} =
|
||||
activeTurn().retract(h)
|
||||
|
||||
proc message*(cap: Cap; val: Value) {.syndicate.} =
|
||||
activeTurn().message(cap, val)
|
||||
|
||||
proc sync*(cap: Cap) {.syndicate.} =
|
||||
activeTurn().sync(cap)
|
||||
|
||||
proc installStopHook*(c: Cont, facet: Facet): Cont {.cpsMagic.} =
|
||||
facet.stopHandlers.add(c.fn)
|
||||
return c
|
||||
|
||||
proc addOnStopHandler*(c: Cont; cb: Callback): Cont {.cpsMagic.} =
|
||||
c.facet.stopCallbacks.add(cb)
|
||||
result = c
|
|
@ -1,41 +0,0 @@
|
|||
# SPDX-FileCopyrightText: ☭ Emery Hemingway
|
||||
# SPDX-License-Identifier: Unlicense
|
||||
|
||||
import std/[hashes, options, tables]
|
||||
import pkg/cps
|
||||
import preserves
|
||||
import ./actors, ./protocols/dataspace, ./skeletons
|
||||
|
||||
from ./protocols/protocol import Handle
|
||||
from ./protocols/dataspace import Observe
|
||||
|
||||
type
|
||||
Dataspace {.final.} = ref object of Entity
|
||||
index: Index
|
||||
handleMap: Table[Handle, Value]
|
||||
|
||||
method publish(ds: Dataspace; turn: Turn; a: AssertionRef; h: Handle) {.gcsafe.} =
|
||||
if add(ds.index, turn, a.value):
|
||||
var obs = a.value.preservesTo(Observe)
|
||||
if obs.isSome and obs.get.observer of Cap:
|
||||
ds.index.add(turn, obs.get.pattern, Cap(obs.get.observer))
|
||||
ds.handleMap[h] = a.value
|
||||
|
||||
method retract(ds: Dataspace; turn: Turn; h: Handle) {.gcsafe.} =
|
||||
let v = ds.handleMap[h]
|
||||
if remove(ds.index, turn, v):
|
||||
ds.handleMap.del h
|
||||
var obs = v.preservesTo(Observe)
|
||||
if obs.isSome and obs.get.observer of Cap:
|
||||
ds.index.remove(turn, obs.get.pattern, Cap(obs.get.observer))
|
||||
|
||||
method message(ds: Dataspace; turn: Turn; a: AssertionRef) {.gcsafe.} =
|
||||
ds.index.deliverMessage(turn, a.value)
|
||||
|
||||
proc newDataspace*(f: Facet): Cap =
|
||||
newCap(f, Dataspace(index: initIndex()))
|
||||
|
||||
type BootProc = proc (ds: Cap)
|
||||
|
||||
proc newDataspace*(): Cap {.syndicate.} =
|
||||
activeFacet().newDataspace()
|
|
@ -1,64 +0,0 @@
|
|||
# SPDX-FileCopyrightText: ☭ Emery Hemingway
|
||||
# SPDX-License-Identifier: Unlicense
|
||||
|
||||
import std/[hashes, tables]
|
||||
import preserves
|
||||
import ./actors, ./patterns, ./protocols/dataspace
|
||||
|
||||
type
|
||||
DuringProc* = proc (a: Value; h: Handle)
|
||||
DuringActionKind = enum null, dead, act
|
||||
DuringAction = object
|
||||
case kind: DuringActionKind
|
||||
of null, dead: discard
|
||||
of act:
|
||||
action: Cont
|
||||
DuringEntity {.final.}= ref object of Entity
|
||||
cb: DuringProc
|
||||
assertionMap: Table[Handle, DuringAction]
|
||||
|
||||
method publish(de: DuringEntity; turn: Turn; a: AssertionRef; h: Handle) =
|
||||
let action = de.cb(turn, a.value, h)
|
||||
# assert(not action.isNil "should have put in a no-op action")
|
||||
let g = de.assertionMap.getOrDefault h
|
||||
case g.kind
|
||||
of null:
|
||||
de.assertionMap[h] = DuringAction(kind: act, action: action)
|
||||
of dead:
|
||||
de.assertionMap.del h
|
||||
freshen(turn, action)
|
||||
of act:
|
||||
raiseAssert("during: duplicate handle in publish: " & $h)
|
||||
|
||||
method retract(de: DuringEntity; turn: Turn; h: Handle) =
|
||||
let g = de.assertionMap.getOrDefault h
|
||||
case g.kind
|
||||
of null:
|
||||
de.assertionMap[h] = DuringAction(kind: dead)
|
||||
of dead:
|
||||
raiseAssert("during: duplicate handle in retract: " & $h)
|
||||
of act:
|
||||
de.assertionMap.del h
|
||||
if not g.action.isNil:
|
||||
turn.queue(g.action)
|
||||
|
||||
proc during*(cb: DuringProc): DuringEntity = DuringEntity(cb: cb)
|
||||
|
||||
proc observe*(turn: Turn; ds: Cap; pat: Pattern; e: Entity): Handle =
|
||||
publish(turn, ds, Observe(pattern: pat, observer: newCap(turn, e)).toPreserves)
|
||||
|
||||
proc assertHandle(turn: Turn): Handle =
|
||||
doAssert(
|
||||
turn.event.isSome and turn.event.get.orKind == EventKind.Assert,
|
||||
"operation not valid during this turn")
|
||||
turn.event.get.assert.handle
|
||||
|
||||
proc awaitRetraction(cont: Cont): Cont {.cpsMagic.} =
|
||||
{.error: "this cannot work".}
|
||||
let h = cont.turn.assertHandle
|
||||
while true:
|
||||
let turn = cont.turn
|
||||
if turn.retracts(h):
|
||||
return c
|
||||
else:
|
||||
turn.facet.actor.qeueue(c)
|
|
@ -1,6 +1,6 @@
|
|||
# Package
|
||||
|
||||
version = "20240216"
|
||||
version = "20240219"
|
||||
author = "Emery Hemingway"
|
||||
description = "Syndicated actors for conversational concurrency"
|
||||
license = "Unlicense"
|
||||
|
|
|
@ -23,17 +23,17 @@ proc readStdin(facet: Facet; ds: Cap; username: string) =
|
|||
readLine()
|
||||
readLine()
|
||||
|
||||
proc chat(turn: var Turn; ds: Cap; username: string) =
|
||||
during(turn, ds, ?:Present) do (who: string):
|
||||
proc chat(facet: Facet; ds: Cap; username: string) =
|
||||
during(facet, ds, ?:Present) do (who: string):
|
||||
echo who, " joined"
|
||||
do:
|
||||
echo who, " left"
|
||||
|
||||
onMessage(turn, ds, ?:Says) do (who: string, what: string):
|
||||
onMessage(facet, ds, ?:Says) do (who: string, what: string):
|
||||
echo who, ": ", what
|
||||
|
||||
discard publish(turn, ds, Present(username: username))
|
||||
readStdin(turn.facet, ds, username)
|
||||
discard publish(facet, ds, Present(username: username))
|
||||
readStdin(facet, ds, username)
|
||||
|
||||
proc main =
|
||||
let route = envRoute()
|
||||
|
@ -48,9 +48,10 @@ proc main =
|
|||
if username == "":
|
||||
stderr.writeLine "--user: unspecified"
|
||||
else:
|
||||
runActor("chat") do (turn: var Turn; root: Cap):
|
||||
spawnRelays(turn, root)
|
||||
resolve(turn, root, route) do (turn: var Turn; ds: Cap):
|
||||
chat(turn, ds, username)
|
||||
runActor("chat") do (root: Facet):
|
||||
let ds = facet.newDataspace()
|
||||
facet.spawnRelays(ds)
|
||||
resolve(facet, ds, route) do (remote: Cap):
|
||||
chat(facet, remote, username)
|
||||
|
||||
main()
|
||||
|
|
|
@ -3,28 +3,34 @@
|
|||
|
||||
import std/times
|
||||
import pkg/cps
|
||||
import syndicate
|
||||
import sam/syndicate
|
||||
|
||||
# import syndicate/actors/timers
|
||||
import sam/actors/timers
|
||||
|
||||
proc now: float64 = getTime().toUnixFloat()
|
||||
|
||||
proc main() {.syndicate.} =
|
||||
let ds = newDataspace()
|
||||
let h = publish(ds, "hello world!".toPreserves)
|
||||
runActor("timer-test") do (root: Facet):
|
||||
echo "actor calls boot proc with root facte ", root
|
||||
|
||||
#onMessage(ds, grab()) do (v: Value):
|
||||
# stderr.writeLine "observed message ", v
|
||||
let ds = root.newDataspace()
|
||||
echo "new dataspace ", ds
|
||||
let h = publish(ds, "hello world!".toPreserves)
|
||||
echo "published to handle ", h
|
||||
|
||||
onMessage(ds, grab()) do (v: Value):
|
||||
stderr.writeLine "observed message ", v
|
||||
|
||||
message(ds, "hello world!".toPreserves)
|
||||
retract(h)
|
||||
sync(ds)
|
||||
echo "sent a message"
|
||||
retract(ds, h)
|
||||
echo "retracted handle ", h
|
||||
# facet.sync(ds)
|
||||
|
||||
onStop:
|
||||
root.onStop:
|
||||
echo "anonymous stop handler was invoked"
|
||||
|
||||
echo "stopping actor"
|
||||
stopActor()
|
||||
root.stopActor()
|
||||
echo "actor stopped but still executing?"
|
||||
|
||||
#[
|
||||
|
@ -38,5 +44,3 @@ proc main() {.syndicate.} =
|
|||
stderr.writeLine "slept one second thrice"
|
||||
stopActor()
|
||||
]#
|
||||
|
||||
bootActor("main", whelp main())
|
||||
|
|
Loading…
Reference in New Issue