# SPDX-FileCopyrightText: ☭ Emery Hemingway # SPDX-License-Identifier: Unlicense import std/[deques, hashes, monotimes, options, sets, tables, times] import pkg/cps import pkg/sys/ioqueue import preserves import ../syndicate/protocols/[protocol, sturdy, trace] const tracing = defined(traceSyndicate) when tracing: import std/streams from std/os import getEnv export Handle type Oid = sturdy.Oid Caveat = sturdy.Caveat Attenuation = seq[Caveat] Rewrite = sturdy.Rewrite AssertionRef* = ref object value*: Value # if the Enity methods take a Value object then the generated # C code has "redefinition of struct" problems when orc is enabled Entity* = ref object of RootObj facet*: Facet oid*: Oid # oid is how Entities are identified over the wire Cap* {.preservesEmbedded.} = ref object of EmbeddedObj target*: Entity relay*: Facet # Entity has facet but a Cap is also scoped to a relay Facet attenuation*: Attenuation Ref* {.deprecated: "Ref was renamed to Cap".} = Cap OutboundAssertion = ref object handle: Handle peer: Cap established: bool OutboundTable = Table[Handle, OutboundAssertion] Actor* = ref object next: Actor name: string handleAllocator: ref Handle # a fresh actor gets a new ref Handle and # all actors spawned from it get the same ref. root: Facet exitReason: ref Exception exitHooks: seq[TurnAction] id: ActorId facetIdAllocator: uint exiting, exited: bool TurnAction* = proc (t: var Turn) {.closure.} Turn* = ref object facet: Facet # active facet that may change during a turn work: Deque[tuple[facet: Facet, act: TurnAction]] effects: Table[Actor, Turn] when tracing: desc: TurnDescription Facet* = ref FacetObj FacetObj = object actor*: Actor parent: Facet children: HashSet[Facet] outbound: OutboundTable shutdownActions: seq[TurnAction] inertCheckPreventers: int id: FacetId isAlive: bool var turnQueue {.threadvar.}: Deque[Turn] when tracing: proc openTraceStream: FileStream = let path = getEnv("SYNDICATE_TRACE_FILE") case path of "": stderr.writeLine "$SYNDICATE_TRACE_FILE unset" of "-": result = newFileStream(stderr) else: result = openFileStream(path, fmWrite) let traceStream = openTraceStream() var turnIdAllocator: uint proc nextTurnId(): TurnId = inc(turnIdAllocator) turnIdAllocator.toPreserves proc trace(actor: Actor; act: ActorActivation) = if not traceStream.isNil: var entry = TraceEntry( timestamp: getTime().toUnixFloat(), actor: initRecord("named", actor.name.toPreserves), item: act) traceStream.write(entry.toPreserves) proc path(facet: Facet): seq[trace.FacetId] = var f = facet while not f.isNil: result.add f.id.toPreserves f = f.parent proc initEnqueue(turn: Turn; cap: Cap): ActionDescription = result = ActionDescription(orKind: ActionDescriptionKind.enqueue) result.enqueue.event.target.actor = turn.facet.actor.id.toPreserves result.enqueue.event.target.facet = turn.facet.id.toPreserves result.enqueue.event.target.oid = cap.target.oid.toPreserves proc toDequeue(act: sink ActionDescription): ActionDescription = result = ActionDescription(orKind: ActionDescriptionKind.dequeue) result.dequeue.event = move act.enqueue.event proc toTraceTarget(cap: Cap): trace.Target = assert not cap.target.isNil assert not cap.target.facet.isNil result.actor = cap.target.facet.actor.id result.facet = cap.target.facet.id result.oid = cap.target.oid.toPreserves method publish*(e: Entity; turn: var Turn; v: AssertionRef; h: Handle) {.base.} = discard method retract*(e: Entity; turn: var Turn; h: Handle) {.base.} = discard method message*(e: Entity; turn: var Turn; v: AssertionRef) {.base.} = discard method sync*(e: Entity; turn: var Turn; peer: Cap) {.base.} = discard converter toActor(f: Facet): Actor = f.actor converter toActor(t: Turn): Actor = t.facet.actor converter toFacet(a: Actor): Facet = a.root converter toFacet(t: Turn): Facet = t.facet using actor: Actor facet: Facet turn: var Turn action: TurnAction proc labels(f: Facet): string = assert not f.isNil assert not f.actor.isNil result.add f.actor.name proc catLabels(f: Facet; labels: var string) = if not f.parent.isNil: catLabels(f.parent, labels) labels.add ':' labels.add $f.id catLabels(f, result) proc `$`*(f: Facet): string = "" proc `$`*(actor: Actor): string = "" # TODO: ambigous when tracing: proc `$`*(r: Cap): string = "" proc `$`*(t: Turn): string = "" proc attenuate*(r: Cap; a: Attenuation): Cap = if a.len == 0: result = r else: result = Cap( target: r.target, relay: r.relay, attenuation: a & r.attenuation) proc hash*(actor): Hash = actor.unsafeAddr.hash proc hash*(facet): Hash = facet.unsafeAddr.hash proc hash*(r: Cap): Hash = !$(r.relay.hash !& r.target.unsafeAddr.hash) proc actor*(turn): Actor = turn.facet.actor proc nextHandle(facet: Facet): Handle = result = succ(facet.actor.handleAllocator[]) facet.actor.handleAllocator[] = result template recallFacet(turn: var Turn; body: untyped): untyped = let facet = turn.facet block: body assert facet.actor == turn.facet.actor turn.facet = facet proc queueWork*(turn: var Turn; facet: Facet; act: TurnAction) = assert not facet.isNil turn.work.addLast((facet, act,)) proc queueTurn*(facet: Facet; act: TurnAction) = var turn = Turn(facet: facet) assert not facet.isNil turn.work.addLast((facet, act,)) when tracing: turn.desc.id = nextTurnId() turnQueue.addLast(turn) proc queueTurn*(prev: var Turn; facet: Facet; act: TurnAction) = var next = Turn(facet: facet) assert not facet.isNil next.work.addLast((facet, act,)) when tracing: next.desc.id = nextTurnId() next.desc.cause = TurnCause(orKind: TurnCauseKind.turn) next.desc.cause.turn.id = prev.desc.id turnQueue.addLast(next) proc run*(facet: Facet; action: TurnAction) = queueTurn(facet, action) ## Alias to queueTurn_. proc facet*(turn: Turn): Facet = turn.facet proc queueEffect*(turn: var Turn; target: Facet; act: TurnAction) = let fremd = target.actor if fremd == turn.facet.actor: turn.work.addLast((target, act,)) else: var fremdTurn = turn.effects.getOrDefault(fremd) if fremdTurn.isNil: fremdTurn = Turn(facet: target) turn.effects[fremd] = fremdTurn when tracing: fremdTurn.desc.id = nextTurnId() fremdTurn.desc.cause = TurnCause(orKind: TurnCauseKind.turn) fremdTurn.desc.cause.turn.id = turn.desc.id fremdTurn.work.addLast((target, act,)) type Bindings = Table[Value, Value] proc match(bindings: var Bindings; p: Pattern; v: Value): bool = case p.orKind of PatternKind.Pdiscard: result = true of PatternKind.Patom: result = case p.patom of PAtom.Boolean: v.isBoolean of PAtom.Double: v.isFloat of PAtom.Signedinteger: v.isInteger of PAtom.String: v.isString of PAtom.Bytestring: v.isByteString of PAtom.Symbol: v.isSymbol of PatternKind.Pembedded: result = v.isEmbedded of PatternKind.Pbind: if match(bindings, p.pbind.pattern, v): bindings[p.pbind.pattern.toPreserves] = v result = true of PatternKind.Pand: for pp in p.pand.patterns: result = match(bindings, pp, v) if not result: break of PatternKind.Pnot: var b: Bindings result = not match(b, p.pnot.pattern, v) of PatternKind.Lit: result = p.lit.value == v of PatternKind.PCompound: case p.pcompound.orKind of PCompoundKind.rec: if v.isRecord and p.pcompound.rec.label == v.label and p.pcompound.rec.fields.len == v.arity: result = true for i, pp in p.pcompound.rec.fields: if not match(bindings, pp, v[i]): result = false break of PCompoundKind.arr: if v.isSequence and p.pcompound.arr.items.len == v.sequence.len: result = true for i, pp in p.pcompound.arr.items: if not match(bindings, pp, v[i]): result = false break of PCompoundKind.dict: if v.isDictionary: result = true for key, pp in p.pcompound.dict.entries: let vv = step(v, key) if vv.isNone or not match(bindings, pp, get vv): result = true break proc match(p: Pattern; v: Value): Option[Bindings] = var b: Bindings if match(b, p, v): result = some b proc instantiate(t: Template; bindings: Bindings): Value = case t.orKind of TemplateKind.Tattenuate: let v = instantiate(t.tattenuate.template, bindings) let cap = v.unembed(Cap) if cap.isNone: raise newException(ValueError, "Attempt to attenuate non-capability") result = attenuate(get cap, t.tattenuate.attenuation).embed of TemplateKind.TRef: let n = $t.tref.binding.int try: result = bindings[n.toPreserves] except KeyError: raise newException(ValueError, "unbound reference: " & n) of TemplateKind.Lit: result = t.lit.value of TemplateKind.Tcompound: case t.tcompound.orKind of TCompoundKind.rec: result = initRecord(t.tcompound.rec.label, t.tcompound.rec.fields.len) for i, tt in t.tcompound.rec.fields: result[i] = instantiate(tt, bindings) of TCompoundKind.arr: result = initSequence(t.tcompound.arr.items.len) for i, tt in t.tcompound.arr.items: result[i] = instantiate(tt, bindings) of TCompoundKind.dict: result = initDictionary() for key, tt in t.tcompound.dict.entries: result[key] = instantiate(tt, bindings) proc rewrite(r: Rewrite; v: Value): Value = let bindings = match(r.pattern, v) if bindings.isSome: result = instantiate(r.template, get bindings) proc examineAlternatives(cav: Caveat; v: Value): Value = case cav.orKind of CaveatKind.Rewrite: result = rewrite(cav.rewrite, v) of CaveatKind.Alts: for r in cav.alts.alternatives: result = rewrite(r, v) if not result.isFalse: break of CaveatKind.Reject: discard of CaveatKind.unknown: discard proc runRewrites*(a: Attenuation; v: Value): Value = result = v for stage in a: result = examineAlternatives(stage, result) if result.isFalse: break proc publish(turn: var Turn; cap: Cap; v: Value; h: Handle) = var a = runRewrites(cap.attenuation, v) if not a.isFalse: let e = OutboundAssertion(handle: h, peer: cap) turn.facet.outbound[h] = e when tracing: var act = ActionDescription(orKind: ActionDescriptionKind.enqueue) act.enqueue.event.target.actor = turn.facet.actor.id.toPreserves act.enqueue.event.target.facet = turn.facet.id.toPreserves act.enqueue.event.target.oid = cap.target.oid.toPreserves act.enqueue.event.detail = trace.TurnEvent(orKind: TurnEventKind.assert) act.enqueue.event.detail.assert.assertion.value.value = mapEmbeds(v) do (cap: Value) -> Value: discard act.enqueue.event.detail.assert.handle = h turn.desc.actions.add act queueEffect(turn, cap.relay) do (turn: var Turn): e.established = true when tracing: turn.desc.actions.add act.toDequeue publish(cap.target, turn, AssertionRef(value: a), e.handle) proc publish*(turn: var Turn; r: Cap; a: Value): Handle {.discardable.} = result = turn.facet.nextHandle() publish(turn, r, a, result) proc publish*[T](turn: var Turn; r: Cap; a: T): Handle {.discardable.} = publish(turn, r, a.toPreserves) proc retract(turn: var Turn; e: OutboundAssertion) = when tracing: var act = initEnqueue(turn, e.peer) act.enqueue.event.detail = trace.TurnEvent(orKind: TurnEventKind.retract) act.enqueue.event.detail.retract.handle = e.handle turn.desc.actions.add act queueEffect(turn, e.peer.relay) do (turn: var Turn): when tracing: turn.desc.actions.add act.toDequeue if e.established: e.established = false e.peer.target.retract(turn, e.handle) proc retract*(turn: var Turn; h: Handle) = var e: OutboundAssertion if turn.facet.outbound.pop(h, e): turn.retract(e) proc message*(turn: var Turn; r: Cap; v: Value) = var a = runRewrites(r.attenuation, v) if not a.isFalse: when tracing: var act = initEnqueue(turn, r) act.enqueue.event.detail = trace.TurnEvent(orKind: TurnEventKind.message) act.enqueue.event.detail.message.body.value.value = mapEmbeds(a) do (cap: Value) -> Value: discard turn.desc.actions.add act queueEffect(turn, r.relay) do (turn: var Turn): when tracing: turn.desc.actions.add act.toDequeue r.target.message(turn, AssertionRef(value: a)) proc message*[T](turn: var Turn; r: Cap; v: T) = message(turn, r, v.toPreserves) proc sync*(turn: var Turn; r, peer: Cap) = when tracing: var act = initEnqueue(turn, peer) act.enqueue.event.detail = trace.TurnEvent(orKind: TurnEventKind.sync) act.enqueue.event.detail.sync.peer = peer.toTraceTarget turn.desc.actions.add act queueEffect(turn, r.relay) do (turn: var Turn): when tracing: turn.desc.actions.add act.toDequeue r.target.sync(turn, peer) proc replace*[T](turn: var Turn; cap: Cap; h: Handle; v: T): Handle = result = publish(turn, cap, v) if h != default(Handle): retract(turn, h) proc replace*[T](turn: var Turn; cap: Cap; h: var Handle; v: T): Handle {.discardable.} = var old = h h = publish(turn, cap, v) if old != default(Handle): retract(turn, old) h proc stop*(turn: var Turn) proc newFacet(actor; parent: Facet; initialAssertions: OutboundTable): Facet = inc actor.facetIdAllocator result = Facet( id: actor.facetIdAllocator.toPreserves, actor: actor, parent: parent, outbound: initialAssertions, isAlive: true) if not parent.isNil: parent.children.incl result proc newFacet(actor; parent: Facet): Facet = var initialAssertions: OutboundTable newFacet(actor, parent, initialAssertions) proc isInert(facet): bool = let noKids = facet.children.len == 0 noOutboundHandles = facet.outbound.len == 0 isRootFacet = facet.parent.isNil noInertCheckPreventers = facet.inertCheckPreventers == 0 result = noKids and (noOutboundHandles or isRootFacet) and noInertCheckPreventers proc preventInertCheck*(turn: Turn) = inc turn.facet.inertCheckPreventers proc terminateActor(turn; reason: ref Exception) proc terminateFacetOrderly(turn: var Turn) = let facet = turn.facet if facet.isAlive: facet.isAlive = false var i = 0 while i < facet.shutdownActions.len: facet.shutdownActions[i](turn) inc i setLen facet.shutdownActions, 0 for e in facet.outbound.values: retract(turn, e) clear facet.outbound proc inertCheck(turn: var Turn) = if (not turn.facet.parent.isNil and (not turn.facet.parent.isAlive)) or turn.facet.isInert: when tracing: var act = ActionDescription(orKind: ActionDescriptionKind.facetStop) act.facetstop.path = turn.facet.path act.facetstop.reason = FacetStopReason.inert turn.desc.actions.add act stop(turn) proc terminateFacet(turn: var Turn) = let facet = turn.facet for child in facet.children: queueWork(turn, child, terminateFacetOrderly) # terminate all children facet.children.clear() # detach all children queueWork(turn, facet, terminateFacetOrderly) # self-termination proc stopIfInertAfter(action: TurnAction): TurnAction = proc work(turn: var Turn) = queueWork(turn, turn.facet, action) queueEffect(turn, turn.facet, inertCheck) work proc newFacet(turn: var Turn): Facet = newFacet(turn.facet.actor, turn.facet) proc inFacet*(turn: var Turn; bootProc: TurnAction): Facet {.discardable.} = result = newFacet(turn) recallFacet turn: turn.facet = result when tracing: var act = ActionDescription(orKind: ActionDescriptionKind.facetstart) act.facetstart.path.add result.path turn.desc.actions.add act stopIfInertAfter(bootProc)(turn) proc newActor(name: string; parent: Facet): Actor = result = Actor( name: name, id: name.toPreserves, ) if parent.isNil: new result.handleAllocator else: result.handleAllocator = parent.actor.handleAllocator result.root = newFacet(result, parent) when tracing: var act = ActorActivation(orKind: ActorActivationKind.start) act.start.actorName = Name(orKind: NameKind.named) act.start.actorName.named.name = name.toPreserves trace(result, act) proc run(actor: Actor; bootProc: TurnAction; initialAssertions: OutboundTable) = queueTurn(newFacet(actor, actor.root, initialAssertions), stopIfInertAfter(bootProc)) proc bootActor*(name: string; bootProc: TurnAction): Actor {.discardable.} = ## Boot a top-level actor. result = newActor(name, nil) new result.handleAllocator var turn = Turn(facet: result.root) assert not result.root.isNil turn.work.addLast((result.root, bootProc,)) when tracing: turn.desc.id = nextTurnId() turn.desc.cause = TurnCause(orKind: TurnCauseKind.external) turn.desc.cause.external.description = "bootActor".toPreserves turnQueue.addLast turn proc spawnActor*(name: string; turn: var Turn; bootProc: TurnAction; initialAssertions = initHashSet[Handle]()): Actor {.discardable.} = let actor = newActor(name, turn.facet) queueEffect(turn, actor.root) do (turn: var Turn): var newOutBound: Table[Handle, OutboundAssertion] for key in initialAssertions: discard turn.facet.outbound.pop(key, newOutbound[key]) when tracing: var act = ActionDescription(orKind: ActionDescriptionKind.spawn) act.spawn.id = actor.id.toPreserves turn.desc.actions.add act run(actor, bootProc, newOutBound) actor proc spawn*(name: string; turn: var Turn; bootProc: TurnAction; initialAssertions = initHashSet[Handle]()): Actor {.discardable.} = spawnActor(name, turn, bootProc, initialAssertions) type StopOnRetract = ref object of Entity method retract*(e: StopOnRetract; turn: var Turn; h: Handle) = stop(turn) proc halfLink(facet, other: Facet) = let h = facet.nextHandle() facet.outbound[h] = OutboundAssertion( handle: h, peer: Cap(relay: other, target: StopOnRetract(facet: facet)), established: true, ) proc linkActor*(turn: var Turn; name: string; bootProc: TurnAction; initialAssertions = initHashSet[Handle]()): Actor {.discardable.} = result = spawnActor(name, turn, bootProc, initialAssertions) halfLink(turn.facet, result.root) halfLink(result.root, turn.facet) var inertActor {.threadvar.}: Actor proc newInertCap*(): Cap = if inertActor.isNil: inertActor = bootActor("inert") do (turn: var Turn): turn.stop() Cap(relay: inertActor.root) proc atExit*(actor; action) = actor.exitHooks.add action proc terminateActor(turn; reason: ref Exception) = let actor = turn.actor if not actor.exiting: actor.exiting = true actor.exitReason = reason when tracing: var act = ActorActivation(orKind: ActorActivationKind.stop) if not reason.isNil: act.stop.status = ExitStatus(orKind: ExitStatusKind.Error) act.stop.status.error.message = reason.msg trace(actor, act) while actor.exitHooks.len > 0: var hook = actor.exitHooks.pop() try: hook(turn) except CatchableError as err: if reason.isNil: terminateActor(turn, err) return proc finish(turn: var Turn) = assert not actor.root.isNil, actor.name terminateFacet(turn) actor.root = nil actor.exited = true queueTurn(actor.root, finish) proc terminateFacet*(facet; e: ref Exception) = run(facet.actor.root) do (turn: var Turn): terminateActor(turn, e) proc stop*(turn: var Turn, facet: Facet) = queueEffect(turn, facet) do (turn: var Turn): when tracing: var act = ActionDescription(orKind: ActionDescriptionKind.facetStop) act.facetstop.path = facet.path act.facetstop.reason = FacetStopReason.explicitAction turn.desc.actions.add act terminateFacet(turn) proc stop*(turn: var Turn) = stop(turn, turn.facet) proc stop*(facet: Facet) = run(facet, stop) proc onStop*(facet: Facet; act: TurnAction) = ## Add a `proc (turn: var Turn)` action to `facet` to be called as it stops. add(facet.shutdownActions, act) proc onStop*(turn: var Turn; act: TurnAction) = onStop(turn.facet, act) proc isAlive(actor): bool = not(actor.exited or actor.exiting) proc stop*(actor: Actor) = if actor.isAlive: queueTurn(actor.root) do (turn: var Turn): assert(not turn.facet.isNil) when tracing: var act = ActionDescription(orKind: ActionDescriptionKind.facetStop) act.facetstop.path = turn.facet.path act.facetstop.reason = FacetStopReason.actorStopping turn.desc.actions.add act stop(turn, turn.facet) proc stopActor*(facet: Facet) = stop(facet.actor) proc stopActor*(turn: var Turn) = stop(turn, turn.facet.actor.root) proc freshen*(turn: var Turn, act: TurnAction) {.deprecated.} = run(turn.facet, act) proc newCap*(relay: Facet; entity: Entity): Cap = ## Create a new capability for `entity` via `relay`. # An Entity has an owning facet and a Cap does as well? if entity.facet.isNil: entity.facet = relay Cap(relay: relay, target: entity) proc newCap*(turn; e: Entity): Cap = newCap(turn.facet, e) proc newCap*(e: Entity; turn): Cap = newCap(turn.facet, e) type SyncContinuation {.final.} = ref object of Entity action: TurnAction method message(entity: SyncContinuation; turn: var Turn; v: AssertionRef) = entity.action(turn) proc sync*(turn: var Turn; refer: Cap; act: TurnAction) = sync(turn, refer, newCap(turn, SyncContinuation(action: act))) proc running*(actor): bool = result = not actor.exited if not (result or actor.exitReason.isNil): raise actor.exitReason proc run(turn: var Turn) = while turn.work.len > 0: var (facet, act) = turn.work.popFirst() assert not act.isNil turn.facet = facet act(turn) when tracing: var act = ActorActivation(orKind: ActorActivationKind.turn) act.turn = move turn.desc trace(turn.facet.actor, act) # TODO: catch exceptions here for eff in turn.effects.mvalues: assert not eff.facet.isNil turnQueue.addLast(move eff) turn.facet = nil # invalidate the turn proc runPendingTurns* = while turnQueue.len > 0: var turn = turnQueue.popFirst() # TODO: check if actor is still valid try: run(turn) except CatchableError as err: stderr.writeLine("actor ", turn.actor.name, " threw an error during a turn") terminateActor(turn, err) proc run* = ## Run actors to completion var ready: seq[Continuation] while true: runPendingTurns() ioqueue.poll(ready) if ready.len == 0: break while ready.len > 0: try: discard trampoline: ready.pop() except CatchableError as err: stderr.writeLine "ioqueue continuation threw an error" raise err proc runActor*(name: string; bootProc: TurnAction) = ## Boot an actor `Actor` and churn ioqueue. let actor = bootActor(name, bootProc) if not actor.exitReason.isNil: raise actor.exitReason actors.run() if not actor.exitReason.isNil: raise actor.exitReason type FacetGuard* = object facet: Facet proc initGuard*(f: Facet): FacetGuard = result.facet = f inc result.facet.inertCheckPreventers proc disarm*(g: var FacetGuard) = if not g.facet.isNil: assert g.facet.inertCheckPreventers > 0 dec g.facet.inertCheckPreventers g.facet = nil proc `=destroy`*(g: FacetGuard) = if not g.facet.isNil: dec g.facet.inertCheckPreventers proc `=copy`*(dst: var FacetGuard, src: FacetGuard) = dst.facet = src.facet inc dst.facet.inertCheckPreventers