# SPDX-FileCopyrightText: ☭ 2021 Emery Hemingway # SPDX-License-Identifier: Unlicense import std/[asyncdispatch, deques, hashes, monotimes, options, sets, tables, times] import preserves, preserves/parse import ../syndicate/protocols/[protocol, sturdy] # proc `==`*(x, y: Handle): bool {.borrow.} template generateIdType(T: untyped) = type T* = distinct Natural proc `==`*(x, y: T): bool {.borrow.} proc `$`*(id: T): string {.borrow.} generateIdType(ActorId) generateIdType(FacetId) generateIdType(EndpointId) generateIdType(FieldId) generateIdType(TurnId) #[ proc genId(T: type): T = getMonotime().ticks.T ]# type Attenuation = sturdy.Attenuation[Ref] Oid = sturdy.Oid Assertion* = protocol.Assertion[Ref] Caveat = sturdy.Caveat[Ref] Rewrite = sturdy.Rewrite[Ref] PublishProc* = proc (e: Entity; turn: var Turn; v: Assertion; h: Handle) {.gcsafe.} RetractProc* = proc (e: Entity; turn: var Turn; h: Handle) {.gcsafe.} MessageProc* = proc (e: Entity; turn: var Turn; v: Assertion) {.gcsafe.} SyncProc* = proc (e: Entity; turn: var Turn; peer: Ref) {.gcsafe.} Entity* = ref object of RootObj oid*: Oid # oid is how Entities are identified over the wire publishImpl*: PublishProc retractImpl*: RetractProc messageImpl*: MessageProc syncImpl*: SyncProc Ref* {.unpreservable.} = ref object # TODO: rename relay*: Facet target*: Entity attenuation*: Attenuation OutboundAssertion = ref object handle: Handle peer: Ref established: bool OutboundTable = Table[Handle, OutboundAssertion] Actor = ref object future: Future[void] name: string id: ActorId handleAllocator: Handle root: Facet exitReason: ref Exception exitHooks: seq[TurnAction] exiting: bool TurnAction* = proc (t: var Turn) {.gcsafe.} Queues = TableRef[Facet, seq[TurnAction]] Turn* = object # an object that should remain on the stack id: TurnId activeFacet*: Facet queues: Queues # a ref object that can outlive Turn ParentFacet = Option[Facet] Facet* = ref FacetObj FacetObj = object id: FacetId actor*: Actor parent: ParentFacet children: HashSet[Facet] outbound: OutboundTable shutdownActions: seq[TurnAction] inertCheckPreventers: int isAlive: bool using actor: Actor facet: Facet turn: var Turn action: TurnAction proc setProcs*( result: Entity, publish: PublishProc = nil, retract: RetractProc = nil, message: MessageProc = nil, sync: SyncProc = nil) {.inline.} = result.publishImpl = publish result.retractImpl = retract result.messageImpl = message result.syncImpl = sync proc newEntity*( publish: PublishProc = nil, retract: RetractProc = nil, message: MessageProc = nil, sync: SyncProc = nil): Entity = new result result.setProcs(publish, retract, message, sync) proc publish*(e: Entity; turn: var Turn; v: Assertion; h: Handle) = if not e.publishImpl.isNil: e.publishImpl(e, turn, v, h) proc retract*(e: Entity; turn: var Turn; h: Handle) = if not e.retractImpl.isNil: e.retractImpl(e, turn, h) proc message*(e: Entity; turn: var Turn; v: Assertion) = if not e.messageImpl.isNil: e.messageImpl(e, turn, v) proc sync*(e: Entity; turn: var Turn; peer: Ref) = if not e.syncImpl.isNil: e.syncImpl(e, turn, peer) proc labels(f: Facet): string = proc catLabels(f: Facet; labels: var string) = labels.add ':' if f.parent.isSome: catLabels(f.parent.get, labels) labels.add ':' labels.add $f.id result.add f.actor.name catLabels(f, result) proc `$`*(f: Facet): string = "" proc `$`*(r: Ref): string = "" proc `$`*(actor: Actor): string = "" # TODO: ambigous proc attenuate(r: Ref; a: Attenuation): Ref = if a.len == 0: result = r else: result = Ref( relay: r.relay, target: r.target, attenuation: a & r.attenuation) proc hash*(facet): Hash = facet.id.hash proc hash*(r: Ref): Hash = !$(r.relay.hash !& r.target.unsafeAddr.hash) # !& # r.attenuation.toPreserve.hash) # TODO: really convert for each hash? proc nextHandle(facet: Facet): Handle = inc facet.actor.handleAllocator facet.actor.handleAllocator proc enqueue(turn: var Turn; target: Facet; action: TurnAction) = if target in turn.queues: turn.queues[target].add action else: turn.queues[target] = @[action] type Bindings = Table[Preserve[Ref], Preserve[Ref]] proc match(p: Pattern; v: Assertion): Option[Bindings] = proc walk(bindings: var Bindings; p: Pattern; v: Assertion): bool = case p.orKind of PatternKind.Pdiscard: result = true of PatternKind.Patom: result = case p.patom of PAtom.Boolean: v.isBoolean of PAtom.Float: v.isFloat of PAtom.Double: v.isDouble of PAtom.Signedinteger: v.isInteger of PAtom.String: v.isString of PAtom.Bytestring: v.isByteString of PAtom.Symbol: v.isSymbol of PatternKind.Pembedded: result = v.isEmbedded of PatternKind.Pbind: if walk(bindings, p.pbind.pattern, v): bindings[toPreserve(p.pbind.pattern, Ref)] = v result = true of PatternKind.Pand: for pp in p.pand.patterns: result = walk(bindings, pp, v) if not result: break of PatternKind.Pnot: var b: Bindings result = not walk(b, p.pnot.pattern, v) of PatternKind.Lit: result = p.lit.value == v of PatternKind.Pcompound: let ctor = p.pcompound.ctor case ctor.orKind of ConstructorspecKind.Crec: if v.isRecord and ctor.crec.label == v.label and ctor.crec.arity == v.arity: for key, pp in p.pcompound.members: if not key.isInteger: result = false else: result = walk(bindings, pp, v.record[key.int]) if not result: break of ConstructorspecKind.Carr: if v.isSequence and ctor.carr.arity == v.sequence.len: for key, pp in p.pcompound.members: result = if not key.isInteger: false else: walk(bindings, pp, v.sequence[key.int]) if not result: break of ConstructorspecKind.Cdict: if v.isDictionary: for key, pp in p.pcompound.members: let vv = v[key] result = if vv.isFalse: false else: walk(bindings, pp, vv) if not result: break var b: Bindings if walk(b, p, v): result = some b proc instantiate(t: Template; bindings: Bindings): Assertion = proc walk(t: Template): Assertion = case t.orKind of TemplateKind.Tattenuate: let v = walk(t.tattenuate.template) if not v.isEmbedded: raise newException(ValueError, "Attempt to attenuate non-capability: " & $v) result = embed(attenuate(v.embed, t.tattenuate.attenuation)) of TemplateKind.Tref: let n = $t.tref.binding try: result = bindings[toPreserve(n, Ref)] except KeyError: raise newException(ValueError, "unbound reference: " & n) of TemplateKind.Lit: result = t.lit.value of TemplateKind.Tcompound: let ctor = t.tcompound.ctor case ctor.orKind of ConstructorspecKind.Crec: result = initRecord(ctor.crec.label, ctor.crec.arity) for key, tt in t.tcompound.members: result.record[key.int] = walk(tt) of ConstructorspecKind.Carr: result = initSequence[Ref](ctor.carr.arity) for key, tt in t.tcompound.members: result.sequence[key.int] = walk(tt) of ConstructorspecKind.Cdict: result = initDictionary[Ref]() for key, tt in t.tcompound.members: result[key] = walk(tt) walk(t) proc rewrite(r: Rewrite; v: Assertion): Assertion = let bindings = match(r.pattern, v) if bindings.isSome: result = instantiate(r.template, get bindings) proc examineAlternatives(cav: Caveat; v: Assertion): Assertion = case cav.orKind of CaveatKind.`Rewrite`: result = rewrite(cav.rewrite, v) of CaveatKind.`Alts`: for r in cav.alts.alternatives: result = rewrite(r, v) if not result.isFalse: break proc runRewrites*(a: Attenuation; v: Assertion): Assertion = result = v for stage in a: result = examineAlternatives(stage, result) if result.isFalse: break proc publish(turn: var Turn; r: Ref; v: Assertion; h: Handle) = let a = runRewrites(r.attenuation, v) if not a.isFalse: let e = OutboundAssertion( handle: h, peer: r, established: false) turn.activeFacet.outbound[h] = e enqueue(turn, r.relay) do (turn: var Turn): e.established = true publish(r.target, turn, a, e.handle) proc publish*(turn: var Turn; r: Ref; a: Assertion): Handle = result = turn.activeFacet.nextHandle() publish(turn, r, a, result) proc publish*[T](turn: var Turn; r: Ref; a: T): Handle = publish(turn, r, toPreserve(a, Ref)) proc retract(turn: var Turn; e: OutboundAssertion) = enqueue(turn, e.peer.relay) do (turn: var Turn): if e.established: e.established = false e.peer.target.retract(turn, e.handle) proc retract*(turn: var Turn; h: Handle) = var e: OutboundAssertion if turn.activeFacet.outbound.pop(h, e): turn.retract(e) proc message*(turn: var Turn; r: Ref; v: Assertion) = let a = runRewrites(r.attenuation, v) if not a.isFalse: enqueue(turn, r.relay) do (turn: var Turn): r.target.message(turn, a) proc message*[T](turn: var Turn; r: Ref; v: T) = message(turn, r, toPreserve(v, Ref)) proc sync(turn: var Turn; e: Entity; peer: Ref) = e.sync(turn, peer) # or turn.message(peer, true) ? proc sync*(turn: var Turn; r, peer: Ref) = enqueue(turn, r.relay) do (turn: var Turn): sync(turn, r.target, peer) proc replace*[T](turn: var Turn; `ref`: Ref; h: Handle; v: T): Handle = result = publish(turn, `ref`, v) retract(turn, h) proc stop*(turn: var Turn) {.gcsafe.} proc run*(facet; action: TurnAction; zombieTurn = false) {.gcsafe.} proc newFacet(actor; parent: ParentFacet; initialAssertions: OutboundTable): Facet = result = Facet( id: getMonoTime().ticks.FacetId, actor: actor, parent: parent, outbound: initialAssertions, isAlive: true) if parent.isSome: parent.get.children.incl result proc newFacet(actor; parent: ParentFacet): Facet = var initialAssertions: OutboundTable newFacet(actor, parent, initialAssertions) proc onStop(facet; action) = facet.shutdownActions.add action proc isInert(facet): bool = facet.inertCheckPreventers == 0 and facet.children.len == 0 and facet.outbound.len == 0 proc preventInertCheck*(facet): (proc() {.gcsafe.}) = var armed = true inc facet.inertCheckPreventers proc disarm() = if armed: armed = false dec facet.inertCheckPreventers result = disarm proc inFacet(turn: var Turn; facet; act: TurnAction) = ## Call an action with a facet using a temporary `Turn` ## that shares the `Queues` of the calling `Turn`. var t = Turn(activeFacet: facet, queues: turn.queues) act(t) proc terminate(actor; turn; reason: ref Exception) {.gcsafe.} proc terminate(facet; turn: var Turn; orderly: bool) {.gcsafe.} = if facet.isAlive: facet.isAlive = false let parent = facet.parent block: var turn = Turn(activeFacet: facet, queues: turn.queues) for child in facet.children: child.terminate(turn, orderly) if orderly: for act in facet.shutdownActions: act(turn) for a in facet.outbound.values: turn.retract(a) if orderly: if parent.isSome: if parent.get.isInert: run(parent.get) do (turn: var Turn): parent.get.terminate(turn, true) # TODO: is this the right turn? else: run(facet.actor.root) do (turn: var Turn): terminate(facet.actor, turn, nil) # TODO: is this the right turn? proc stopIfInertAfter(action: TurnAction): TurnAction = proc wrapper(turn: var Turn) = action(turn) enqueue(turn, turn.activeFacet) do (turn: var Turn): if (turn.activeFacet.parent.isSome and (not turn.activeFacet.parent.get.isAlive)) or turn.activeFacet.isInert: stop(turn) wrapper proc facet*(turn: var Turn; bootProc: TurnAction): Facet = result =newFacet(turn.activeFacet.actor, some turn.activeFacet) inFacet(turn, result, stopIfInertAfter(bootProc)) proc newActor(name: string; bootProc: TurnAction; initialAssertions: OutboundTable): Actor = let now = getTime() seed = now.toUnix * 1_000_000_000 + now.nanosecond result = Actor( name: name, id: ActorId(seed)) result.root = newFacet(result, none Facet) result.future = newFuture[void]($result) run( newFacet(result, some result.root, initialAssertions), # stopIfInertAfter(bootProc) TODO bootProc) proc newActor*(name: string; bootProc: TurnAction): Actor = var initialAssertions: OutboundTable newActor(name, bootProc, initialAssertions) proc spawn*(name: string; turn: var Turn; bootProc: TurnAction; initialAssertions = initHashSet[Handle]()) = enqueue(turn, turn.activeFacet) do (turn: var Turn): var newOutBound: Table[Handle, OutboundAssertion] for key in initialAssertions: discard turn.activeFacet.outbound.pop(key, newOutbound[key]) callSoon: discard newActor(name, bootProc, newOutBound) proc newInertRef*(): Ref = # TODO: really create a new actor? Novy does this only once. let a = newActor("") do (turn: var Turn): turn.stop() Ref(relay: a.root) proc atExit*(actor; action) = actor.exitHooks.add action proc terminate(actor; turn; reason: ref Exception) = if not actor.exiting: actor.exiting = true actor.exitReason = reason for hook in actor.exitHooks: hook(turn) proc finish(turn: var Turn) = actor.root.terminate(turn, not reason.isNil) if reason.isNil: actor.future.complete() else: actor.future.fail reason callSoon: run(actor.root, finish, true) proc terminate(facet; e: ref Exception) = run(facet.actor.root) do (turn: var Turn): facet.actor.terminate(turn, e) proc asyncCheck*(turn; fut: FutureBase) = let facet = turn.activeFacet fut.addCallback do (): if fut.failed: terminate(facet, fut.error) template tryFacet(facet; body: untyped) = body # TODO #try: body #except: terminate(facet, getCurrentException()) proc run(queues: Queues) = callSoon: for facet, queue in queues: for action in queue: run(facet, action) proc run*(facet; action: TurnAction; zombieTurn = false) = if not zombieTurn: if not facet.actor.exitReason.isNil: return if not facet.isAlive: return # TODO: not Nim idiom tryFacet(facet): var turn = Turn( activeFacet: facet, queues: newTable[Facet, seq[TurnAction]]()) action(turn) run(turn.queues) proc stop*(turn: var Turn, facet: Facet) = enqueue(turn, facet.parent.get) do (turn: var Turn): facet.terminate(turn, true) proc stop*(turn: var Turn) = stop(turn, turn.activeFacet) proc stopActor*(turn: var Turn) = let actor = turn.activeFacet.actor enqueue(turn, turn.activeFacet.actor.root) do (turn: var Turn): terminate(actor, turn, nil) proc freshen*(turn: var Turn, act: TurnAction) = assert(turn.queues.len == 0, "Attempt to freshen a non-stale Turn") run(turn.activeFacet, act) proc newRef*(relay: Facet; e: Entity): Ref = Ref(relay: relay, target: e) proc newRef*(turn; e: Entity): Ref = Ref(relay: turn.activeFacet, target: e) proc sync*(turn, refer: Ref, cb: proc(t: Turn) {.gcsafe.}) = discard # TODO proc log*(f: Facet, args: varargs[string, `$`]) = echo f, args proc runActor*(name: string; bootProc: TurnAction): Future[void] = let actor = newActor(name, bootProc) result = actor.future