# SPDX-FileCopyrightText: ☭ Emery Hemingway # SPDX-License-Identifier: Unlicense import std/[deques, hashes, monotimes, options, sets, sequtils, tables, times] import pkg/cps import preserves import ../syndicate/protocols/[protocol, sturdy] export cps const traceSyndicate {.booldefine.}: bool = true when traceSyndicate: import std/streams from std/os import getEnv import ./protocols/trace type TraceSink = ref object stream: FileStream proc newTraceSink: TraceSink = new result let path = getEnv("SYNDICATE_TRACE_FILE", "") case path of "": quit"$SYNDICATE_TRACE_FILE unset" of "-": actor.stream = newFileStream(stderr) else: result.stream = openFileStream(path, fmWrite) proc write(s: TraceSink; e: TraceEntry) = s.write(e.toPreserves) export Handle template generateIdType(typ: untyped) = type typ* = distinct Natural proc `==`*(x, y: typ): bool {.borrow.} proc `$`*(id: typ): string {.borrow.} generateIdType(ActorId) generateIdType(FacetId) generateIdType(EndpointId) generateIdType(FieldId) generateIdType(TurnId) type Oid = sturdy.Oid Caveat = sturdy.Caveat Attenuation = seq[Caveat] Rewrite = sturdy.Rewrite AssertionRef* = ref object value*: Value # if the Enity methods take a Value object then the generated # C code has "redefinition of struct" problems when orc is enabled Entity* = ref object of RootObj oid*: Oid # oid is how Entities are identified over the wire Cap* {.preservesEmbedded.} = ref object of EmbeddedObj relay*: Facet target*: Entity attenuation*: Attenuation Ref* {.deprecated: "Ref was renamed to Cap".} = Cap OutboundAssertion = ref object handle: Handle peer: Cap established: bool OutboundTable = Table[Handle, OutboundAssertion] Actor* = ref object name: string handleAllocator: ref Handle # a fresh actor gets a new ref Handle and # all actors spawned from it get the same ref. root: Facet exitReason: ref Exception exitHooks: seq[TurnAction] id: ActorId exiting, exited: bool when traceSyndicate: turnIdAllocator: ref TurnId traceStream: FileStream TurnAction* = proc (t: Turn) Queues = TableRef[Facet, Deque[Cont]] Turn* = ref object facet: Facet queues: Queues when traceSyndicate: desc: TurnDescription Cont* = ref object of Continuation turn*: Turn Facet* = ref FacetObj FacetObj = object actor*: Actor parent: Facet children: HashSet[Facet] outbound: OutboundTable shutdownActions: seq[TurnAction] inertCheckPreventers: int id: FacetId isAlive: bool proc pass*(a, b: Cont): Cont = assert not a.turn.isNil b.turn = move a.turn return b template turnAction*(prc: typed): untyped = cps(Cont, prc) proc activeTurn*(c: Cont): Turn {.cpsVoodoo.} = assert not c.turn.isNil c.turn when traceSyndicate: proc nextTurnId(facet: Facet): TurnId = result = succ(facet.actor.turnIdAllocator[]) facet.actor.turnIdAllocator[] = result proc trace(actor: Actor; act: ActorActivation) = assert not actor.traceStream.isNil var entry = TraceEntry( timestamp: getTime().toUnixFloat(), actor: initRecord("named", actor.name.toPreserves), item: act) actor.traceStream.writeLine($entry.toPreserves) proc path(facet: Facet): seq[trace.FacetId] = var f = facet while not f.isNil: result.add f.id.toPreserves f = f.parent method publish*(e: Entity; turn: Turn; v: AssertionRef; h: Handle) {.base.} = discard method retract*(e: Entity; turn: Turn; h: Handle) {.base.} = discard method message*(e: Entity; turn: Turn; v: AssertionRef) {.base.} = discard method sync*(e: Entity; turn: Turn; peer: Cap) {.base.} = discard using actor: Actor facet: Facet turn: Turn action: TurnAction proc labels(f: Facet): string = proc catLabels(f: Facet; labels: var string) = labels.add ':' if not f.parent.isNil: catLabels(f.parent, labels) labels.add ':' when traceSyndicate: labels.add $f.id result.add f.actor.name catLabels(f, result) proc `$`*(f: Facet): string = "" proc `$`*(r: Cap): string = "" proc `$`*(actor: Actor): string = "" # TODO: ambigous proc attenuate(r: Cap; a: Attenuation): Cap = if a.len == 0: result = r else: result = Cap( relay: r.relay, target: r.target, attenuation: a & r.attenuation) proc hash*(facet): Hash = facet.id.hash proc hash*(r: Cap): Hash = !$(r.relay.hash !& r.target.unsafeAddr.hash) proc nextHandle(facet: Facet): Handle = result = succ(facet.actor.handleAllocator[]) facet.actor.handleAllocator[] = result proc facet*(turn: Turn): Facet = turn.facet proc enqueue(turn: Turn; target: Facet; cont: Cont) = cont.turn = turn if target in turn.queues: turn.queues[target].addLast cont else: turn.queues[target] = toDeque([cont]) type Bindings = Table[Value, Value] proc match(bindings: var Bindings; p: Pattern; v: Value): bool = case p.orKind of PatternKind.Pdiscard: result = true of PatternKind.Patom: result = case p.patom of PAtom.Boolean: v.isBoolean of PAtom.Double: v.isDouble of PAtom.Signedinteger: v.isInteger of PAtom.String: v.isString of PAtom.Bytestring: v.isByteString of PAtom.Symbol: v.isSymbol of PatternKind.Pembedded: result = v.isEmbedded of PatternKind.Pbind: if match(bindings, p.pbind.pattern, v): bindings[p.pbind.pattern.toPreserves] = v result = true of PatternKind.Pand: for pp in p.pand.patterns: result = match(bindings, pp, v) if not result: break of PatternKind.Pnot: var b: Bindings result = not match(b, p.pnot.pattern, v) of PatternKind.Lit: result = p.lit.value == v of PatternKind.PCompound: case p.pcompound.orKind of PCompoundKind.rec: if v.isRecord and p.pcompound.rec.label == v.label and p.pcompound.rec.fields.len == v.arity: result = true for i, pp in p.pcompound.rec.fields: if not match(bindings, pp, v[i]): result = false break of PCompoundKind.arr: if v.isSequence and p.pcompound.arr.items.len == v.sequence.len: result = true for i, pp in p.pcompound.arr.items: if not match(bindings, pp, v[i]): result = false break of PCompoundKind.dict: if v.isDictionary: result = true for key, pp in p.pcompound.dict.entries: let vv = step(v, key) if vv.isNone or not match(bindings, pp, get vv): result = true break proc match(p: Pattern; v: Value): Option[Bindings] = var b: Bindings if match(b, p, v): result = some b proc instantiate(t: Template; bindings: Bindings): Value = case t.orKind of TemplateKind.Tattenuate: let v = instantiate(t.tattenuate.template, bindings) let cap = v.unembed(Cap) if cap.isNone: raise newException(ValueError, "Attempt to attenuate non-capability") result = attenuate(get cap, t.tattenuate.attenuation).embed of TemplateKind.TRef: let n = $t.tref.binding.int try: result = bindings[n.toPreserves] except KeyError: raise newException(ValueError, "unbound reference: " & n) of TemplateKind.Lit: result = t.lit.value of TemplateKind.Tcompound: case t.tcompound.orKind of TCompoundKind.rec: result = initRecord(t.tcompound.rec.label, t.tcompound.rec.fields.len) for i, tt in t.tcompound.rec.fields: result[i] = instantiate(tt, bindings) of TCompoundKind.arr: result = initSequence(t.tcompound.arr.items.len) for i, tt in t.tcompound.arr.items: result[i] = instantiate(tt, bindings) of TCompoundKind.dict: result = initDictionary() for key, tt in t.tcompound.dict.entries: result[key] = instantiate(tt, bindings) proc rewrite(r: Rewrite; v: Value): Value = let bindings = match(r.pattern, v) if bindings.isSome: result = instantiate(r.template, get bindings) proc examineAlternatives(cav: Caveat; v: Value): Value = case cav.orKind of CaveatKind.Rewrite: result = rewrite(cav.rewrite, v) of CaveatKind.Alts: for r in cav.alts.alternatives: result = rewrite(r, v) if not result.isFalse: break of CaveatKind.Reject: discard of CaveatKind.unknown: discard proc runRewrites*(a: Attenuation; v: Value): Value = result = v for stage in a: result = examineAlternatives(stage, result) if result.isFalse: break proc publish(target: Entity; e: OutboundAssertion; a: AssertionRef) {.turnAction.} = e.established = true publish(target, activeTurn(), a, e.handle) proc publish(turn: Turn; r: Cap; v: Value; h: Handle) = var a = runRewrites(r.attenuation, v) if not a.isFalse: let e = OutboundAssertion( handle: h, peer: r, established: false) turn.facet.outbound[h] = e enqueue(turn, r.relay, whelp publish(r.target, e, AssertionRef(value: a))) when traceSyndicate: var act = ActionDescription(orKind: ActionDescriptionKind.enqueue) act.enqueue.event.target.actor = turn.facet.actor.id.toPreserves act.enqueue.event.target.facet = turn.facet.id.toPreserves act.enqueue.event.target.oid = r.target.oid.toPreserves act.enqueue.event.detail = trace.TurnEvent(orKind: TurnEventKind.assert) act.enqueue.event.detail.assert.assertion.value.value = mapEmbeds(v) do (cap: Value) -> Value: discard act.enqueue.event.detail.assert.handle = h turn.desc.actions.add act proc publish*(turn: Turn; r: Cap; a: Value): Handle = result = turn.facet.nextHandle() publish(turn, r, a, result) proc publish*(r: Cap; a: Value): Handle {.turnAction.} = publish(activeTurn(), r, a, result) proc retract(e: OutboundAssertion) {.turnAction.} = if e.established: e.established = false e.peer.target.retract(activeTurn(), e.handle) proc retract(turn: Turn; e: OutboundAssertion) = enqueue(turn, e.peer.relay, whelp retract(e)) proc retract*(turn: Turn; h: Handle) = var e: OutboundAssertion if turn.facet.outbound.pop(h, e): turn.retract(e) proc message(target: Entity; a: AssertionRef) {.turnAction.} = target.message(activeTurn(), a) proc message*(turn: Turn; r: Cap; v: Value) = var a = runRewrites(r.attenuation, v) if not a.isFalse: enqueue(turn, r.relay, whelp message(r.target, AssertionRef(value: a))) proc message*(target: Cap; value: Value) {.turnAction.} = message(activeTurn(), target, value) proc sync(e: Entity; peer: Cap) {.turnAction.} = e.sync(activeTurn(), peer) proc sync(turn: Turn; e: Entity; peer: Cap) = e.sync(turn, peer) proc sync*(turn: Turn; r, peer: Cap) = enqueue(turn, r.relay, whelp sync(r.target, peer)) proc replace*[T](turn: Turn; cap: Cap; h: Handle; v: T): Handle = result = publish(turn, cap, v) if h != default(Handle): retract(turn, h) proc replace*[T](turn: Turn; cap: Cap; h: var Handle; v: T): Handle {.discardable.} = var old = h h = publish(turn, cap, v) if old != default(Handle): retract(turn, old) h proc stop*(turn: Turn) proc run*(facet; action: TurnAction; zombieTurn = false) proc newFacet(actor; parent: Facet; initialAssertions: OutboundTable): Facet = result = Facet( id: getMonoTime().ticks.FacetId, actor: actor, parent: parent, outbound: initialAssertions, isAlive: true) if not parent.isNil: parent.children.incl result proc newFacet(actor; parent: Facet): Facet = var initialAssertions: OutboundTable newFacet(actor, parent, initialAssertions) proc isInert(facet): bool = result = facet.children.len == 0 and (facet.outbound.len == 0 or facet.parent.isNil) and facet.inertCheckPreventers == 0 proc preventInertCheck*(facet): (proc() {.gcsafe.}) {.discardable.} = var armed = true inc facet.inertCheckPreventers proc disarm() = if armed: armed = false dec facet.inertCheckPreventers result = disarm proc inFacet(turn: Turn; facet; act: TurnAction) = ## Call an action with a facet using a temporary `Turn` ## that shares the `Queues` of the calling `Turn`. var t = Turn(facet: facet, queues: turn.queues) act(t) proc terminate(actor; turn; reason: ref Exception) proc terminate(facet; turn: Turn; orderly: bool) = if facet.isAlive: facet.isAlive = false let parent = facet.parent if not parent.isNil: parent.children.excl facet block: var turn = Turn(facet: facet, queues: turn.queues) while facet.children.len > 0: facet.children.pop.terminate(turn, orderly) if orderly: for act in facet.shutdownActions: act(turn) for a in facet.outbound.values: turn.retract(a) if orderly: if not parent.isNil: if parent.isInert: parent.terminate(turn, true) else: terminate(facet.actor, turn, nil) when traceSyndicate: var act = ActionDescription(orKind: ActionDescriptionKind.facetStop) act.facetstop.path = facet.path turn.desc.actions.add act proc stopIfInert() {.turnAction.} = let turn = activeTurn() if (not turn.facet.parent.isNil and (not turn.facet.parent.isAlive)) or turn.facet.isInert: stop(turn) proc stopIfInertAfter(action: TurnAction): TurnAction = proc wrapper(turn: Turn) = action(turn) enqueue(turn, turn.facet, whelp stopIfInert()) wrapper proc newFacet*(turn: Turn): Facet = newFacet(turn.facet.actor, turn.facet) proc inFacet*(turn: Turn; bootProc: TurnAction): Facet = result = newFacet(turn) when traceSyndicate: var act = ActionDescription(orKind: ActionDescriptionKind.facetstart) act.facetstart.path.add result.path turn.desc.actions.add act inFacet(turn, result, stopIfInertAfter(bootProc)) proc facet*(turn: Turn; bootProc: TurnAction): Facet {.deprecated.} = inFacet(turn, bootProc) proc run(actor; bootProc: TurnAction; initialAssertions: OutboundTable) = run(newFacet(actor, actor.root, initialAssertions), stopIfInertAfter(bootProc)) proc run(actor; bootProc: TurnAction) = var initialAssertions: OutboundTable run(newFacet(actor, actor.root, initialAssertions), stopIfInertAfter(bootProc)) proc newActor(name: string; handleAlloc: ref Handle): Actor = let now = getTime() seed = now.toUnix * 1_000_000_000 + now.nanosecond result = Actor( name: name, id: ActorId(seed), handleAllocator: handleAlloc, ) result.root = newFacet(result, nil) when traceSyndicate: new result.turnIdAllocator proc newActor*(name: string): Actor = newActor(name, new(ref Handle)) proc bootActor*(name: string; bootProc: TurnAction) = var initialAssertions: OutboundTable actor = newActor(name) when traceSyndicate: let path = getEnv("SYNDICATE_TRACE_FILE", "/tmp/" & name & ".trace.pr") case path of "": stderr.writeLine "$SYNDICATE_TRACE_FILE unset, not tracing actor ", name of "-": actor.traceStream = newFileStream(stderr) else: actor.traceStream = openFileStream(path, fmWrite) when traceSyndicate: var act = ActorActivation(orKind: ActorActivationKind.start) act.start.actorName = Name(orKind: NameKind.named) act.start.actorName.named.name = name.toPreserves var entry = TraceEntry( timestamp: getTime().toUnixFloat(), item: act) actor.traceStream.writeLine($entry.toPreserves) let turn = newTurn(actor, TurnCauseExternal(description: "top-level actor")) run(actor, bootProc, initialAssertions) proc bootActor*(name: string; cont: Cont) = bootActor(name) do (turn: Turn): enqueue(turn, turn.facet, cont) proc spawnActor(actor: Actor; bootProc: TurnAction; initialAssertions: HashSet[Handle]) {.turnAction.} = let turn = activeTurn() var newOutBound: Table[Handle, OutboundAssertion] for key in initialAssertions: discard turn.facet.outbound.pop(key, newOutbound[key]) when traceSyndicate: actor.turnIdAllocator = turn.facet.actor.turnIdAllocator actor.traceStream = turn.facet.actor.traceStream var act = ActionDescription(orKind: ActionDescriptionKind.spawn) act.spawn.id = actor.id.toPreserves turn.desc.actions.add act run(actor, bootProc, newOutBound) proc spawn*(name: string; turn: Turn; bootProc: TurnAction; initialAssertions = initHashSet[Handle]()): Actor {.discardable.} = let actor = newActor(name, turn.facet.actor.handleAllocator) enqueue(turn, turn.facet, whelp spawnActor(actor, bootProc, initialAssertions)) actor proc newInertCap*(): Cap = let a = newActor("inert") run(a) do (turn: Turn): turn.stop() Cap(relay: a.root) proc atExit*(actor; action) = actor.exitHooks.add action proc terminate(actor: Actor; orderly: bool) {.turnAction.} = actor.root.terminate(activeTurn(), orderly) actor.exited = true proc terminate(actor; turn; reason: ref Exception) = if not actor.exiting: actor.exiting = true actor.exitReason = reason when traceSyndicate: var act = ActorActivation(orKind: ActorActivationKind.stop) if not reason.isNil: act.stop.status = ExitStatus(orKind: ExitStatusKind.Error) act.stop.status.error.message = reason.msg trace(actor, act) for hook in actor.exitHooks: hook(turn) enqueue(turn, actor.root, whelp terminate(actor, reason.isNil)) proc terminate*(facet; e: ref Exception) = run(facet.actor.root) do (turn: Turn): facet.actor.terminate(turn, e) #[ proc asyncCheck*(facet: Facet; fut: FutureBase) = ## Sets a callback on `fut` which propagates exceptions to `facet`. addCallback(fut) do (): if fut.failed: terminate(facet, fut.error) proc asyncCheck*(turn; fut: FutureBase) = ## Sets a callback on `fut` which propagates exceptions to the facet of `turn`. asyncCheck(turn.facet, fut) ]# template tryFacet(facet; body: untyped) = try: body except CatchableError as err: terminate(facet, err) proc run(facet: Facet; turn: Turn; deq: var Deque[Cont]): int = ## Return the number of continuations processed. while deq.len > 0: var c = deq.popFirst() try: while not c.isNil and not c.fn.isNil: c.turn = turn var y = c.fn var x = y(c) inc(result) c = Cont(x) except CatchableError as err: if not c.dismissed: writeStackFrames c terminate(facet, err) stderr.writeLine("ran ", result, " continuations for ", facet) proc run*(facet; action: TurnAction; zombieTurn = false) = if zombieTurn or (facet.actor.exitReason.isNil and facet.isAlive): tryFacet(facet): var queues = newTable[Facet, Deque[Cont]]() var turn = Turn(facet: facet, queues: queues) action(turn) when traceSyndicate: turn.desc.id = facet.nextTurnId.toPreserves facet.actor.trace ActorActivation( orKind: ActorActivationKind.turn, turn: turn.desc) assert not turn.isNil var n = 1 while n > 0: n = 0 var facets = queues.keys.toSeq for facet in facets: n.inc run(facet, turn, queues[facet]) proc run*(cap: Cap; action: TurnAction) = ## Convenience proc to run a `TurnAction` in the scope of a `Cap`. run(cap.relay, action) #[ proc addCallback*(fut: FutureBase; facet: Facet; act: TurnAction) = ## Add a callback to a `Future` that will be called at a later `Turn` ## within the context of `facet`. addCallback(fut) do (): if fut.failed: terminate(facet, fut.error) else: when traceSyndicate: run(facet) do (turn: Turn): turn.desc.cause = TurnCause(orKind: TurnCauseKind.external) turn.desc.cause.external.description = "Future".toPreserves act(turn) else: run(facet, act) proc addCallback*(fut: FutureBase; turn: Turn; act: TurnAction) = ## Add a callback to a `Future` that will be called at a later `Turn` ## with the same context as the current. if fut.failed: terminate(turn.facet, fut.error) elif fut.finished: enqueue(turn, turn.facet, act) else: addCallback(fut, turn.facet, act) proc addCallback*[T](fut: Future[T]; turn: Turn; act: proc (t: Turn, x: T) {.gcsafe.}) = addCallback(fut, turn) do (turn: Turn): if fut.failed: terminate(turn.facet, fut.error) else: when traceSyndicate: turn.desc.cause = TurnCause(orKind: TurnCauseKind.external) turn.desc.cause.external.description = "Future".toPreserves act(turn, read fut) ]# proc stop*(turn: Turn, facet: Facet) = if facet.parent.isNil: facet.terminate(turn, true) else: enqueue(turn, facet.parent, whelp terminate(facet.actor, true)) # TODO: terminate the actor? proc stop*(turn: Turn) = stop(turn, turn.facet) proc onStop*(facet: Facet; act: TurnAction) = ## Add a `proc (turn: Turn)` action to `facet` to be called as it stops. add(facet.shutdownActions, act) proc stopActor*(turn: Turn) = let actor = turn.facet.actor enqueue(turn, actor.root, whelp terminate(actor, true)) proc freshen*(turn: Turn, act: TurnAction) = assert(turn.queues.len == 0, "Attempt to freshen a non-stale Turn") run(turn.facet, act) proc newCap*(relay: Facet; e: Entity): Cap = Cap(relay: relay, target: e) proc newCap*(turn; e: Entity): Cap = Cap(relay: turn.facet, target: e) proc newCap*(e: Entity; turn): Cap = Cap(relay: turn.facet, target: e) type SyncContinuation {.final.} = ref object of Entity action: TurnAction method message(entity: SyncContinuation; turn: Turn; v: AssertionRef) = entity.action(turn) proc sync*(turn: Turn; refer: Cap; act: TurnAction) = sync(turn, refer, newCap(turn, SyncContinuation(action: act))) proc running*(actor): bool = result = not actor.exited if not (result or actor.exitReason.isNil): raise actor.exitReason proc newCap*(e: Entity): Cap {.turnAction.} = Cap(relay: activeTurn().facet, target: e)