diff --git a/lock.json b/lock.json index 7c82bba..ea4ef0f 100644 --- a/lock.json +++ b/lock.json @@ -38,11 +38,11 @@ "packages": [ "nimcrypto" ], - "path": "/nix/store/7b491gv9zlayilsh8k2gnyzw6znrh7xq-source", - "rev": "70151aa132f3a771996117c23c1fcaa8446a6f35", - "sha256": "1ldjz02p70wagqvk6vgcg16kjh7pkm1394qd1pcdmg8z39bm5ag3", + "path": "/nix/store/jwz8pqbv6rsm8w4fjzdb37r0wzjn5hv0-source", + "rev": "d58da671799c69c0b3208b96c154e13c8b1a9e90", + "sha256": "12dm0gsy10ppga7zf7hpf4adaqjrd9b740n2w926xyazq1njf6k9", "srcDir": "", - "url": "https://github.com/cheatfate/nimcrypto/archive/70151aa132f3a771996117c23c1fcaa8446a6f35.tar.gz" + "url": "https://github.com/cheatfate/nimcrypto/archive/d58da671799c69c0b3208b96c154e13c8b1a9e90.tar.gz" }, { "method": "fetchzip", diff --git a/src/sam/actors.nim b/src/sam/actors.nim index 26cf115..39e5d8a 100644 --- a/src/sam/actors.nim +++ b/src/sam/actors.nim @@ -1,10 +1,10 @@ # SPDX-FileCopyrightText: ☭ Emery Hemingway # SPDX-License-Identifier: Unlicense -import std/[deques, hashes, options, times] +import std/[deques, hashes, options, tables, times] import pkg/cps import preserves -import ../syndicate/protocols/[protocol, sturdy] +import ./protocols/[protocol, sturdy] # const traceSyndicate {.booldefine.}: bool = true const traceSyndicate* = true @@ -18,9 +18,13 @@ export protocol.Handle type Cont* = ref object of Continuation - turn: Turn facet: Facet + PublishProc* = proc (e: Entity; v: Value; h: Handle) {.cps: Cont.} + RetractProc* = proc (e: Entity; h: Handle) {.cps: Cont.} + MessageProc* = proc (e: Entity; v: Value) {.cps: Cont.} + SyncProc* = proc (e: Entity; peer: Cap) {.cps: Cont.} + Handler* = proc() {.closure.} Work = Deque[Cont] @@ -28,45 +32,53 @@ type FacetState = enum fFresh, fRunning, fEnded - Callback* = proc () {.nimcall.} + Callback* = proc () {.closure.} + + OutboundTable = Table[Handle, OutboundAssertion] + OutboundAssertion = ref object + handle: Handle + peer: Cap + established: bool Facet* = ref object ## https://synit.org/book/glossary.html#facet actor: Actor parent: Facet children: seq[Facet] + outbound: OutboundTable stopHandlers: HandlerDeque stopCallbacks: seq[Callback] state: FacetState - when traceSyndicate: - id: FacetId + id: FacetId - Turn = ref object + FacetProc* = proc (f: Facet) {.closure.} + ## Type for callbacks to be called within a turn. + ## The `Facet` parameter is the owning facet. + + Turn {.byref.} = object ## https://synit.org/book/glossary.html#turn facet: Facet entity: Entity - event: Option[protocol.Event] work: Work + actions: seq[Cont] + event: Option[protocol.Event] when traceSyndicate: desc: TurnDescription Entity* = ref object of RootObj ## https://synit.org/book/glossary.html#entity - facet*: Facet oid*: sturdy.Oid # oid is how Entities are identified over the wire publishImpl*: PublishProc retractImpl*: RetractProc messageImpl*: MessageProc syncImpl*: SyncProc - when traceSyndicate: - id: FacetId Cap* {.final, preservesEmbedded.} = ref object of EmbeddedObj relay*: Facet target*: Entity attenuation*: seq[sturdy.Caveat] - Actor = ref object + Actor* = ref object ## https://synit.org/book/glossary.html#actor # TODO: run on a seperate thread. # crashHandlers: HandlerDeque @@ -74,11 +86,12 @@ type handleAllocator: Handle facetIdAllocator: int id: ActorId + turn: Turn when traceSyndicate: traceStream: FileStream stopped: bool -template syndicate*(prc: typed): untyped = +template turnWork*(prc: typed): untyped = cps(Cont, prc) proc activeFacet*(c: Cont): Facet {.cpsVoodoo.} = @@ -93,6 +106,9 @@ using cap: Cap turn: Turn +proc `$`*(facet): string = $facet.id +proc `$`*(cap): string = "#:…" + proc hash*(facet): Hash = facet.unsafeAddr.hash proc hash*(cap): Hash = cap.unsafeAddr.hash @@ -126,13 +142,19 @@ when traceSyndicate: ) actor.traceStream.writeLine($entry.toPreserves) - proc trace(actor; turn) = + proc traceTurn(actor) = if not actor.traceStream.isNil: actor.trace(ActorActivation( orKind: ActorActivationKind.turn, - turn: turn.desc, + turn: actor.turn.desc, )) + proc traceTarget(facet): trace.Target = + Target( + actor: facet.actor.id, + facet: facet.id, + ) + proc traceTarget(cap): trace.Target = let facet = cap.relay Target( @@ -141,53 +163,80 @@ when traceSyndicate: oid: cap.target.oid.toPreserves, ) - proc traceTarget(turn): trace.Target = - let facet = turn.facet - Target( - actor: facet.actor.id, - facet: facet.id, + proc traceEnqueue(actor; e: TargetedTurnEvent) = + actor.turn.desc.actions.add ActionDescription( + orKind: ActionDescriptionKind.enqueue, + enqueue: ActionDescriptionEnqueue(event: e), ) -proc newExternalTurn(facet): Turn = - result = Turn(facet: facet) - when traceSyndicate: - result.desc = TurnDescription(cause: TurnCause(orKind: TurnCauseKind.external)) + proc traceDequeue(actor; e: TargetedTurnEvent) = + actor.turn.desc.actions.add ActionDescription( + orKind: ActionDescriptionKind.dequeue, + dequeue: ActionDescriptionDequeue(event: e), + ) proc pass*(a, b: Cont): Cont = - b.turn = a.turn - if b.facet.isNil: - b.facet = a.facet - # TODO: whelp a new continuation at facet boundaries? + assert not a.facet.isNil + b.facet = a.facet b -proc queue(t: Turn; c: Cont) = - c.facet = t.facet - t.work.addLast(c) +proc queueWork(facet; c: Cont) = + c.facet = facet + facet.actor.turn.work.addLast(c) -proc queue(c: Cont): Cont {.cpsMagic.} = - queue(c.turn, c) +proc yieldWork(c: Cont): Cont {.cpsMagic.} = + ## Suspend and enqueue the caller until later in the turn. + assert not c.facet.isNil + c.facet.queueWork(c) nil -proc complete(turn; c: Cont) = +proc yieldToActions(c: Cont): Cont {.cpsMagic.} = + assert not c.facet.isNil + c.facet.actor.turn.actions.add(c) + nil + +proc startExternalTurn(facet) = + let actor = facet.actor + assert actor.turn.work.len == 0 + assert actor.turn.actions.len == 0 + actor.turn.facet = facet + when traceSyndicate: + actor.turn.desc = TurnDescription(cause: TurnCause(orKind: TurnCauseKind.external)) + +proc terminate(actor; err: ref Exception) = + raise err + +proc terminate(facet; err: ref Exception) = + terminate(facet.actor, err) + +proc complete(c: Cont) = var c = c try: while not c.isNil and not c.fn.isNil: - c.turn = turn var y = c.fn var x = y(c) c = Cont(x) except CatchableError as err: if not c.dismissed: writeStackFrames c - # terminate(c.facet, err) + terminate(c.facet, err) -proc run(turn) = - let actor = turn.facet.actor +proc run(actor) = assert not actor.stopped - while turn.work.len > 0: - complete(turn, turn.work.popFirst()) + var n = 0 + while actor.turn.work.len > 0: + actor.turn.work.popFirst().complete() + inc n + echo n, " items completed from work queue" + when traceSyndicate: + actor.traceTurn() + var i: int + while i < actor.turn.actions.len: + complete(move actor.turn.actions[i]) + inc i + echo i, " items completed from action queue" + turn.actions.setLen(0) when traceSyndicate: - actor.trace(turn) if actor.stopped: trace(actor, ActorActivation(orkind: ActorActivationKind.stop)) @@ -196,11 +245,11 @@ proc start(actor; cont: Cont) = var act = ActorActivation(orkind: ActorActivationKind.start) trace(actor, act) actor.root.state = fRunning - let turn = actor.root.newExternalTurn() - turn.queue(cont) - run(turn) + actor.root.startExternalTurn() + actor.root.queueWork(cont) + run(actor) -proc stop(turn; actor) +proc stop(actor) proc collectPath(result: var seq[FacetId]; facet) = if not facet.parent.isNil: @@ -211,51 +260,47 @@ proc runNextStop(c: Cont; facet: Facet): Cont {.cpsMagic.} = c.fn = facet.stopHandlers.pop() result = c -proc runNextFacetStop() {.syndicate.} = +proc runNextFacetStop() {.cps: Cont.} = activeFacet().runNextStop() -proc stop(turn; facet; reason: FacetStopReason) = +proc stop(facet; reason: FacetStopReason) = + let actor = facet.actor while facet.stopHandlers.len > 0: var c = whelp runNextFacetStop() c.facet = facet - complete(turn, c) + complete(c) while facet.stopCallbacks.len > 0: var cb = facet.stopCallbacks.pop() cb() while facet.children.len > 0: - stop(turn, facet.children.pop(), FacetStopReason.parentStopping) + stop(facet.children.pop(), FacetStopReason.parentStopping) when traceSyndicate: var act = ActionDescription(orKind: ActionDescriptionKind.facetstop) collectPath(act.facetstop.path, facet) act.facetStop.reason = reason - turn.desc.actions.add act + actor.turn.desc.actions.add act if facet.parent.isNil: - facet.actor.root = nil - stop(turn, facet.actor) + actor.root = nil + stop(actor) -proc stop(turn; actor) = +proc stop(actor) = if not actor.root.isNil: - stop(turn, actor.root, FacetStopReason.actorStopping) + stop(actor.root, FacetStopReason.actorStopping) actor.stopped = true -proc bootActor*(name: string, c: Cont) = +proc bootActor(name: string, c: Cont) = start(newActor(name), c) -proc stopActor(c: Cont; a: Actor): Cont {.cpsMagic.} = - stop(c.turn, a) - nil +# proc stopFacetAction(reason: FacetStopReason) {.syndicate.} = +# stop(c.facet, reason) -proc stopFacet(c: Cont; f: Facet): Cont {.cpsMagic.} = - stop(c.turn, f, FacetStopReason.explicitAction) - nil +proc stopActorAction() {.cps: Cont.} = + activeFacet().actor.stop() -proc stopFacet*() {.syndicate.} = - queue() - stop(activeTurn(), activeFacet(), FacetStopReason.explicitAction) - -proc stopActor*() {.syndicate.} = - queue() - stop(activeTurn(), activeActor()) +proc stopActor*(facet) = + let c = whelp stopActorAction() + c.facet = facet + facet.actor.turn.actions.add(c) type AssertionRef* = ref object @@ -270,67 +315,238 @@ proc nextHandle(facet: Facet): Handle = inc(facet.actor.handleAllocator) facet.actor.handleAllocator -proc publish*(cap: Cap; val: Value): Handle = - result = cap.relay.nextHandle() +proc actor(cap): Actor = cap.relay.actor + +type Bindings = Table[Value, Value] + +proc attenuate(r: Cap; a: seq[Caveat]): Cap = + if a.len == 0: result = r + else: result = Cap( + relay: r.relay, + target: r.target, + attenuation: a & r.attenuation) + +proc match(bindings: var Bindings; p: Pattern; v: Value): bool = + case p.orKind + of PatternKind.Pdiscard: result = true + of PatternKind.Patom: + result = case p.patom + of PAtom.Boolean: v.isBoolean + of PAtom.Double: v.isDouble + of PAtom.Signedinteger: v.isInteger + of PAtom.String: v.isString + of PAtom.Bytestring: v.isByteString + of PAtom.Symbol: v.isSymbol + of PatternKind.Pembedded: + result = v.isEmbedded + of PatternKind.Pbind: + if match(bindings, p.pbind.pattern, v): + bindings[p.pbind.pattern.toPreserves] = v + result = true + of PatternKind.Pand: + for pp in p.pand.patterns: + result = match(bindings, pp, v) + if not result: break + of PatternKind.Pnot: + var b: Bindings + result = not match(b, p.pnot.pattern, v) + of PatternKind.Lit: + result = p.lit.value == v + of PatternKind.PCompound: + case p.pcompound.orKind + of PCompoundKind.rec: + if v.isRecord and + p.pcompound.rec.label == v.label and + p.pcompound.rec.fields.len == v.arity: + result = true + for i, pp in p.pcompound.rec.fields: + if not match(bindings, pp, v[i]): + result = false + break + of PCompoundKind.arr: + if v.isSequence and p.pcompound.arr.items.len == v.sequence.len: + result = true + for i, pp in p.pcompound.arr.items: + if not match(bindings, pp, v[i]): + result = false + break + of PCompoundKind.dict: + if v.isDictionary: + result = true + for key, pp in p.pcompound.dict.entries: + let vv = step(v, key) + if vv.isNone or not match(bindings, pp, get vv): + result = true + break + +proc match(p: Pattern; v: Value): Option[Bindings] = + var b: Bindings + if match(b, p, v): + result = some b + +proc instantiate(t: Template; bindings: Bindings): Value = + case t.orKind + of TemplateKind.Tattenuate: + let v = instantiate(t.tattenuate.template, bindings) + let cap = v.unembed(Cap) + if cap.isNone: + raise newException(ValueError, "Attempt to attenuate non-capability") + result = attenuate(get cap, t.tattenuate.attenuation).embed + of TemplateKind.TRef: + let n = $t.tref.binding.int + try: result = bindings[n.toPreserves] + except KeyError: + raise newException(ValueError, "unbound reference: " & n) + of TemplateKind.Lit: + result = t.lit.value + of TemplateKind.Tcompound: + case t.tcompound.orKind + of TCompoundKind.rec: + result = initRecord(t.tcompound.rec.label, t.tcompound.rec.fields.len) + for i, tt in t.tcompound.rec.fields: + result[i] = instantiate(tt, bindings) + of TCompoundKind.arr: + result = initSequence(t.tcompound.arr.items.len) + for i, tt in t.tcompound.arr.items: + result[i] = instantiate(tt, bindings) + of TCompoundKind.dict: + result = initDictionary() + for key, tt in t.tcompound.dict.entries: + result[key] = instantiate(tt, bindings) + +proc rewrite(r: Rewrite; v: Value): Value = + let bindings = match(r.pattern, v) + if bindings.isSome: + result = instantiate(r.template, get bindings) + +proc examineAlternatives(cav: Caveat; v: Value): Value = + case cav.orKind + of CaveatKind.Rewrite: + result = rewrite(cav.rewrite, v) + of CaveatKind.Alts: + for r in cav.alts.alternatives: + result = rewrite(r, v) + if not result.isFalse: break + of CaveatKind.Reject: discard + of CaveatKind.unknown: discard + +proc runRewrites(v: Value; a: openarray[Caveat]): Value = + result = v + for stage in a: + result = examineAlternatives(stage, result) + if result.isFalse: break + +proc publish(c: Cont; e: Entity; v: Value; h: Handle): Cont {.cpsMagic.} = + pass c, e.publishImpl.call(e, v, h) + +proc retract(c: Cont; e: Entity; h: Handle): Cont {.cpsMagic.} = + pass c, e.retractImpl.call(e, h) + +proc message(c: Cont; e: Entity; v: Value): Cont {.cpsMagic.} = + pass c, e.messageImpl.call(e, v) + +proc sync(c: Cont; e: Entity; p: Cap): Cont {.cpsMagic.} = + pass c, e.syncImpl.call(e, p) + +proc turnPublish(cap: Cap; val: Value; h: Handle) {.turnWork.} = when traceSyndicate: - var act = ActionDescription(orKind: ActionDescriptionKind.enqueue) - act.enqueue.event = TargetedTurnEvent( + var traceEvent = TargetedTurnEvent( target: cap.traceTarget, detail: trace.TurnEvent(orKind: trace.TurnEventKind.assert) ) - act.enqueue.event.detail = trace.TurnEvent(orKind: TurnEventKind.assert) - act.enqueue.event.detail.assert = TurnEventAssert( + traceEvent.detail = trace.TurnEvent(orKind: TurnEventKind.assert) + traceEvent.detail.assert = TurnEventAssert( assertion: AssertionDescription(orKind: AssertionDescriptionKind.value), - handle: result, + handle: h, ) - act.enqueue.event.detail.assert.assertion.value.value = val - turn.desc.actions.add act - cap.target.publish(val, result) - -proc retract*(cap: Cap; h: Handle) = + traceEvent.detail.assert.assertion.value.value = val + cap.actor.traceEnqueue(traceEvent) + cap.relay.outbound[h] = OutboundAssertion(handle: h, peer: cap) + yieldToActions() when traceSyndicate: - var act = ActionDescription(orKind: ActionDescriptionKind.enqueue) - act.enqueue.event = TargetedTurnEvent( - target: turn.traceTarget, + cap.actor.traceDequeue(traceEvent) + cap.target.publish(val, h) + cap.relay.outbound[h].established = true + +proc turnRetract(cap: Cap; h: Handle) {.turnWork.} = + when traceSyndicate: + var traceEvent = TargetedTurnEvent( + target: cap.traceTarget, detail: trace.TurnEvent(orKind: trace.TurnEventKind.retract) ) - act.enqueue.event.detail.retract = TurnEventRetract(handle: h) - turn.desc.actions.add act + traceEvent.detail.retract.handle = h + cap.actor.traceEnqueue(traceEvent) + yieldToActions() + when traceSyndicate: + cap.actor.traceDequeue(traceEvent) cap.target.retract(h) -proc message*(cap; val: Value) = +proc turnMessage(cap: Cap; val: Value) {.turnWork.} = + var val = runRewrites(val, cap.attenuation) when traceSyndicate: - var act = ActionDescription(orKind: ActionDescriptionKind.enqueue) - act.enqueue.event = TargetedTurnEvent( - target: turn.traceTarget, + var traceEvent = TargetedTurnEvent( + target: cap.traceTarget, detail: trace.TurnEvent(orKind: trace.TurnEventKind.message) ) - act.enqueue.event.detail.message.body.value.value = val - turn.desc.actions.add act - cap.entity.message(val) - -proc sync*(peer: Cap) = + traceEvent.detail.message.body.value.value = val + cap.actor.traceEnqueue(traceEvent) + yieldToActions() when traceSyndicate: - var act = ActionDescription(orKind: ActionDescriptionKind.enqueue) - act.enqueue.event = TargetedTurnEvent( - target: turn.traceTarget, + cap.actor.traceDequeue(traceEvent) + cap.target.message(val) + +proc turnSync(cap: Cap; peer: Cap) {.turnWork.} = + when traceSyndicate: + var traceEvent = TargetedTurnEvent( + target: cap.traceTarget, detail: trace.TurnEvent(orKind: trace.TurnEventKind.sync) ) - act.enqueue.event.detail.sync.peer = peer.traceTarget - turn.desc.actions.add act - peer.entity.sync() + traceEvent.detail.sync.peer = peer.traceTarget + cap.actor.traceEnqueue(traceEvent) + yieldToActions() + when traceSyndicate: + cap.actor.traceDequeue(traceEvent) + cap.target.sync(peer) -proc installStopHook*(c: Cont, facet: Facet): Cont {.cpsMagic.} = +proc publish*(cap; val: Value): Handle = + var val = runRewrites(val, cap.attenuation) + # TODO: attenuation to nothing? + result = cap.relay.nextHandle() + cap.relay.queueWork(whelp turnPublish(cap, val, result)) + +proc publish*[T](cap; x: T): Handle = + publish(cap, x.toPreserves) + +proc retract*(cap; h: Handle) = + cap.relay.queueWork(whelp turnRetract(cap, h)) + +proc message*(cap; val: Value) = + var val = runRewrites(val, cap.attenuation) + cap.relay.queueWork(whelp turnMessage(cap, val)) + +proc message*[T](cap; x: T) = + message(cap, x.toPreserves) + +proc sync*(cap, peer: Cap) = + cap.relay.queueWork(whelp turnSync(cap, peer)) + +proc installStopHook(c: Cont, facet: Facet): Cont {.cpsMagic.} = facet.stopHandlers.add(c.fn) return c -proc addOnStopHandler*(c: Cont; cb: Callback): Cont {.cpsMagic.} = +proc addOnStopHandler(c: Cont; cb: Callback): Cont {.cpsMagic.} = c.facet.stopCallbacks.add(cb) result = c -type FacetProc* = proc (f: Facet) {.nimcall.} +proc onStop*(facet; cb: proc () {.closure.}) = + facet.stopCallbacks.add(cb) proc bootActor*(name: string; bootProc: FacetProc): Actor = result = newActor(name) - # TODO: start a turn, trace reason is Eternal, + result.root.startExternalTurn() bootProc(result.root) + +proc runActor*(name: string; bootProc: FacetProc) = + let actor = bootActor(name, bootProc) + while not actor.stopped: + run(actor) diff --git a/src/sam/actors/timers.nim b/src/sam/actors/timers.nim index a1915b6..7bbf28e 100644 --- a/src/sam/actors/timers.nim +++ b/src/sam/actors/timers.nim @@ -3,10 +3,8 @@ import std/[asyncdispatch, monotimes, times, posix, times, epoll] import preserves -import syndicate - -import ../protocols/timer -from ../syndicate/protocols/dataspace import Observe +import ../syndicate, ../protocols/timer +from ../protocols/dataspace import Observe export timer @@ -24,12 +22,16 @@ proc eventfd(count: cuint, flags: cint): cint proc now: float64 = getTime().toUnixFloat() -proc processTimers(ds: Cap) {.turnAction.} = - let pat = inject(grab Observe(pattern: dropType LaterThan), {0: grabLit()}) - during(ds, pat) do (seconds: float): - let period = seconds - now() - if period < 0.001 or true: - let h = publish(ds, LaterThan(seconds: seconds).toPreserves) +proc spawnTimers*(ds: Cap): Actor {.discardable.} = + ## Spawn a timer actor. + bootActor("timers") do (root: Facet): + let pat = inject(grab Observe(pattern: dropType LaterThan), {0: grabLit()}) + #[ + during(ds, pat) do (seconds: float): + let period = seconds - now() + if period < 0.001 or true: + let h = publish(ds, LaterThan(seconds: seconds).toPreserves) + ]# #[ else: @@ -39,14 +41,12 @@ proc processTimers(ds: Cap) {.turnAction.} = discard publish(turn, ds, LaterThan(seconds: seconds)) ]# -proc spawnTimers*(ds: Cap) = - ## Spawn a timer actor. - boot(newActor("timers"), whelp processTimers(ds)) - -proc after*(turn: Turn; ds: Cap; dur: Duration; act: TurnAction) = +#[ +proc after*(ds: Cap; dur: Duration; cb: proc () {.closure.}) = ## Execute `act` after some duration of time. let later = now() + dur.inMilliseconds.float64 * 1_000.0 onPublish(ds, grab LaterThan(seconds: later)): - act(turn) + cb() +]# # TODO: periodic timer diff --git a/src/sam/dataspaces.nim b/src/sam/dataspaces.nim index 1f19a4f..5dbb932 100644 --- a/src/sam/dataspaces.nim +++ b/src/sam/dataspaces.nim @@ -4,7 +4,7 @@ import std/[hashes, options, tables] import pkg/cps import preserves -import ./actors, ./protocols/dataspace, ./skeletons +import ./[actors, patterns, skeletons] from ./protocols/protocol import Handle from ./protocols/dataspace import Observe @@ -14,28 +14,35 @@ type index: Index handleMap: Table[Handle, Value] -method publish(ds: Dataspace; turn: Turn; a: AssertionRef; h: Handle) {.gcsafe.} = - if add(ds.index, turn, a.value): - var obs = a.value.preservesTo(Observe) +proc dsPublish(e: Entity; v: Value; h: Handle) {.cps: Cont.} = + var ds = Dataspace(e) + if ds.index.add(v): + var obs = v.preservesTo(Observe) if obs.isSome and obs.get.observer of Cap: - ds.index.add(turn, obs.get.pattern, Cap(obs.get.observer)) - ds.handleMap[h] = a.value + ds.index.add(obs.get.pattern, Cap(obs.get.observer)) + ds.handleMap[h] = v -method retract(ds: Dataspace; turn: Turn; h: Handle) {.gcsafe.} = - let v = ds.handleMap[h] - if remove(ds.index, turn, v): +proc dsRetract(e: Entity; h: Handle) {.cps: Cont.} = + var ds = Dataspace(e) + var v = ds.handleMap[h] + if ds.index.remove(v): ds.handleMap.del h var obs = v.preservesTo(Observe) if obs.isSome and obs.get.observer of Cap: - ds.index.remove(turn, obs.get.pattern, Cap(obs.get.observer)) + ds.index.remove(obs.get.pattern, Cap(obs.get.observer)) -method message(ds: Dataspace; turn: Turn; a: AssertionRef) {.gcsafe.} = - ds.index.deliverMessage(turn, a.value) +proc dsMessage(e: Entity; v: Value) {.cps: Cont.} = + var ds = Dataspace(e) + ds.index.deliverMessage(v) proc newDataspace*(f: Facet): Cap = - newCap(f, Dataspace(index: initIndex())) + var ds = Dataspace( + publishImpl: whelp dsPublish, + retractImpl: whelp dsRetract, + messageImpl: whelp dsMessage, + index: initIndex(), + ) + newCap(f, ds) -type BootProc = proc (ds: Cap) - -proc newDataspace*(): Cap {.syndicate.} = - activeFacet().newDataspace() +proc observe*(cap: Cap; pat: Pattern; e: Entity): Handle = + publish(cap, Observe(pattern: pat, observer: newCap(cap.relay, e))) diff --git a/src/sam/durings.nim b/src/sam/durings.nim index 0e302f9..6da8046 100644 --- a/src/sam/durings.nim +++ b/src/sam/durings.nim @@ -3,34 +3,32 @@ import std/[hashes, tables] import preserves -import ./actors, ./patterns, ./protocols/dataspace +import ./[actors, patterns] type - DuringProc* = proc (a: Value; h: Handle) + DuringProc* = proc (a: Value; h: Handle): FacetProc {.gcsafe.} DuringActionKind = enum null, dead, act DuringAction = object case kind: DuringActionKind of null, dead: discard of act: - action: Cont + retractProc: FacetProc DuringEntity {.final.}= ref object of Entity - cb: DuringProc + publishProc: DuringProc assertionMap: Table[Handle, DuringAction] -method publish(de: DuringEntity; turn: Turn; a: AssertionRef; h: Handle) = - let action = de.cb(turn, a.value, h) - # assert(not action.isNil "should have put in a no-op action") +proc duringPublish(e: Entity; v: Value; h: Handle) {.cps: Cont.} = + var de = DuringEntity(e) + let handler = de.handler(de.facet, a.value, h) let g = de.assertionMap.getOrDefault h case g.kind - of null: - de.assertionMap[h] = DuringAction(kind: act, action: action) - of dead: - de.assertionMap.del h - freshen(turn, action) + of null, dead: + de.assertionMap[h] = DuringAction(kind: act, action: handler) of act: raiseAssert("during: duplicate handle in publish: " & $h) -method retract(de: DuringEntity; turn: Turn; h: Handle) = +proc duringRetract(e: Entity; h: Handle) {.cps: Cont.} = + var de = DuringEntity(e) let g = de.assertionMap.getOrDefault h case g.kind of null: @@ -40,25 +38,6 @@ method retract(de: DuringEntity; turn: Turn; h: Handle) = of act: de.assertionMap.del h if not g.action.isNil: - turn.queue(g.action) + g.action(de.facet) proc during*(cb: DuringProc): DuringEntity = DuringEntity(cb: cb) - -proc observe*(turn: Turn; ds: Cap; pat: Pattern; e: Entity): Handle = - publish(turn, ds, Observe(pattern: pat, observer: newCap(turn, e)).toPreserves) - -proc assertHandle(turn: Turn): Handle = - doAssert( - turn.event.isSome and turn.event.get.orKind == EventKind.Assert, - "operation not valid during this turn") - turn.event.get.assert.handle - -proc awaitRetraction(cont: Cont): Cont {.cpsMagic.} = - {.error: "this cannot work".} - let h = cont.turn.assertHandle - while true: - let turn = cont.turn - if turn.retracts(h): - return c - else: - turn.facet.actor.qeueue(c) diff --git a/src/sam/skeletons.nim b/src/sam/skeletons.nim index f77101e..cb4c3df 100644 --- a/src/sam/skeletons.nim +++ b/src/sam/skeletons.nim @@ -65,9 +65,9 @@ func isEmpty(cont: Continuation): bool = cont.cache.len == 0 and cont.leafMap.len == 0 type - ContinuationProc = proc (c: Continuation; v: Value) {.gcsafe.} - LeafProc = proc (l: Leaf; v: Value) {.gcsafe.} - ObserverProc = proc (turn: Turn; group: ObserverGroup; vs: seq[Value]) {.gcsafe.} + ContinuationProc = proc (c: Continuation; v: Value) {.closure.} + LeafProc = proc (l: Leaf; v: Value) {.closure.} + ObserverProc = proc (group: ObserverGroup; vs: seq[Value]) {.closure.} proc getLeaves(cont: Continuation; constPaths: Paths): LeafMap = result = cont.leafMap.getOrDefault(constPaths) @@ -114,10 +114,10 @@ proc top(stack: TermStack): Value = assert stack.len > 0 stack[stack.high] -proc modify(node: Node; turn: Turn; outerValue: Value; event: EventKind; +proc modify(node: Node; outerValue: Value; event: EventKind; modCont: ContinuationProc; modLeaf: LeafProc; modObs: ObserverProc) = - proc walk(cont: Continuation; turn: Turn) = + proc walk(cont: Continuation) = modCont(cont, outerValue) for constPaths, constValMap in cont.leafMap.pairs: let constVals = projectPaths(outerValue, constPaths) @@ -129,7 +129,7 @@ proc modify(node: Node; turn: Turn; outerValue: Value; event: EventKind; for capturePaths, observerGroup in leaf.observerGroups.pairs: let captures = projectPaths(outerValue, capturePaths) if captures.isSome: - modObs(turn, observerGroup, get captures) + modObs(observerGroup, get captures) of removedEvent: let leaf = constValMap.getOrDefault(get constVals) if not leaf.isNil: @@ -137,13 +137,13 @@ proc modify(node: Node; turn: Turn; outerValue: Value; event: EventKind; for capturePaths, observerGroup in leaf.observerGroups.pairs: let captures = projectPaths(outerValue, capturePaths) if captures.isSome: - modObs(turn, observerGroup, get captures) + modObs(observerGroup, get captures) if leaf.isEmpty: constValMap.del(get constVals) - proc walk(node: Node; turn: Turn; termStack: TermStack) = - walk(node.continuation, turn) + proc walk(node: Node; termStack: TermStack) = + walk(node.continuation) for selector, table in node.edges: let nextStack = pop(termStack, selector.popCount) @@ -153,11 +153,11 @@ proc modify(node: Node; turn: Turn; outerValue: Value; event: EventKind; if nextClass.kind != classNone: let nextNode = table.getOrDefault(nextClass) if not nextNode.isNil: - walk(nextNode, turn, push(nextStack, get nextValue)) + walk(nextNode, push(nextStack, get nextValue)) if event == removedEvent and nextNode.isEmpty: table.del(nextClass) - walk(node, turn, @[@[outerValue].toPreserves]) + walk(node, @[@[outerValue].toPreserves]) proc getOrNew[A, B, C](t: var Table[A, TableRef[B, C]], k: A): TableRef[B, C] = result = t.getOrDefault(k) @@ -227,7 +227,7 @@ proc getEndpoints(leaf: Leaf; capturePaths: Paths): ObserverGroup = if captures.isSome: discard result.cachedCaptures.change(get captures, +1) -proc add*(index: var Index; turn: Turn; pattern: Pattern; observer: Cap) = +proc add*(index: var Index; pattern: Pattern; observer: Cap) = let cont = index.root.extend(pattern) analysis = analyse pattern @@ -237,10 +237,10 @@ proc add*(index: var Index; turn: Turn; pattern: Pattern; observer: Cap) = # TODO if endpoints.cachedCaptures.len > 0: var captureMap = newTable[seq[Value], Handle]() for capture in endpoints.cachedCaptures.items: - captureMap[capture] = publish(turn, observer, capture.toPreserves) + captureMap[capture] = publish(observer, capture.toPreserves) endpoints.observers[observer] = captureMap -proc remove*(index: var Index; turn: Turn; pattern: Pattern; observer: Cap) = +proc remove*(index: var Index; pattern: Pattern; observer: Cap) = let cont = index.root.extend(pattern) analysis = analyse pattern @@ -252,7 +252,7 @@ proc remove*(index: var Index; turn: Turn; pattern: Pattern; observer: Cap) = if not endpoints.isNil: var captureMap: TableRef[seq[Value], Handle] if endpoints.observers.pop(observer, captureMap): - for handle in captureMap.values: retract(turn, handle) + for handle in captureMap.values: retract(observer, handle) if endpoints.observers.len == 0: leaf.observerGroups.del(analysis.capturePaths) if leaf.observerGroups.len == 0: @@ -260,7 +260,7 @@ proc remove*(index: var Index; turn: Turn; pattern: Pattern; observer: Cap) = if constValMap.len == 0: cont.leafMap.del(analysis.constPaths) -proc adjustAssertion(index: var Index; turn: Turn; outerValue: Value; delta: int): bool = +proc adjustAssertion(index: var Index; outerValue: Value; delta: int): bool = case index.allAssertions.change(outerValue, delta) of cdAbsentToPresent: result = true @@ -268,37 +268,37 @@ proc adjustAssertion(index: var Index; turn: Turn; outerValue: Value; delta: int c.cache.incl(v) proc modLeaf(l: Leaf; v: Value) = l.cache.incl(v) - proc modObserver(turn: Turn; group: ObserverGroup; vs: seq[Value]) = + proc modObserver(group: ObserverGroup; vs: seq[Value]) = let change = group.cachedCaptures.change(vs, +1) if change == cdAbsentToPresent: for (observer, captureMap) in group.observers.pairs: - captureMap[vs] = publish(turn, observer, vs.toPreserves) + captureMap[vs] = publish(observer, vs.toPreserves) # TODO: this handle is coming from the facet? - modify(index.root, turn, outerValue, addedEvent, modContinuation, modLeaf, modObserver) + modify(index.root, outerValue, addedEvent, modContinuation, modLeaf, modObserver) of cdPresentToAbsent: result = true proc modContinuation(c: Continuation; v: Value) = c.cache.excl(v) proc modLeaf(l: Leaf; v: Value) = l.cache.excl(v) - proc modObserver(turn: Turn; group: ObserverGroup; vs: seq[Value]) = + proc modObserver(group: ObserverGroup; vs: seq[Value]) = if group.cachedCaptures.change(vs, -1) == cdPresentToAbsent: for (observer, captureMap) in group.observers.pairs: var h: Handle if captureMap.take(vs, h): - retract(observer.target, turn, h) - modify(index.root, turn, outerValue, removedEvent, modContinuation, modLeaf, modObserver) + retract(observer, h) + modify(index.root, outerValue, removedEvent, modContinuation, modLeaf, modObserver) else: discard proc continuationNoop(c: Continuation; v: Value) = discard proc leafNoop(l: Leaf; v: Value) = discard -proc add*(index: var Index; turn: Turn; v: Value): bool = - adjustAssertion(index, turn, v, +1) -proc remove*(index: var Index; turn: Turn; v: Value): bool = - adjustAssertion(index, turn, v, -1) +proc add*(index: var Index; v: Value): bool = + adjustAssertion(index, v, +1) +proc remove*(index: var Index; v: Value): bool = + adjustAssertion(index, v, -1) -proc deliverMessage*(index: var Index; turn: Turn; v: Value) = - proc observersCb(turn: Turn; group: ObserverGroup; vs: seq[Value]) = - for observer in group.observers.keys: message(turn, observer, vs.toPreserves) - index.root.modify(turn, v, messageEvent, continuationNoop, leafNoop, observersCb) +proc deliverMessage*(index: var Index; v: Value) = + proc observersCb(group: ObserverGroup; vs: seq[Value]) = + for observer in group.observers.keys: message(observer, vs.toPreserves) + index.root.modify(v, messageEvent, continuationNoop, leafNoop, observersCb) diff --git a/src/sam/syndicate.nim b/src/sam/syndicate.nim index f42ea24..ba5f4ff 100644 --- a/src/sam/syndicate.nim +++ b/src/sam/syndicate.nim @@ -37,22 +37,26 @@ proc `??`*(pat: Pattern; bindings: openArray[(int, Pattern)]): Pattern {.inline. patterns.inject(pat, bindings) type - PublishProc = proc (turn: Turn; v: Value; h: Handle) {.closure.} - RetractProc = proc (turn: Turn; h: Handle) {.closure.} - MessageProc = proc (turn: Turn; v: Value) {.closure.} + PublishProc = proc (v: Value; h: Handle) {.closure.} + RetractProc = proc (h: Handle) {.closure.} + MessageProc = proc (v: Value) {.closure.} + ClosureEntity = ref object of Entity - publishImpl*: PublishProc - retractImpl*: RetractProc - messageImpl*: MessageProc + publishCb*: PublishProc + retractCb*: RetractProc + messageCb*: MessageProc -method publish(e: ClosureEntity; turn: Turn; a: AssertionRef; h: Handle) = - if not e.publishImpl.isNil: e.publishImpl(turn, a.value, h) +proc publishCont(e: Entity; v: Value; h: Handle) {.cps: Cont.} = + var ce = ClosureEntity(e) + if not ce.publishCb.isNil: ce.publishCb(v, h) -method retract(e: ClosureEntity; turn: Turn; h: Handle) = - if not e.retractImpl.isNil: e.retractImpl(turn, h) +proc retractCont(e: Entity; h: Handle) {.cps: Cont.} = + var ce = ClosureEntity(e) + if not ce.retractCb.isNil: ce.retractCb(h) -method message(e: ClosureEntity; turn: Turn; a: AssertionRef) = - if not e.messageImpl.isNil: e.messageImpl(turn, a.value) +proc messageCont(e: Entity; v: Value) {.cps: Cont.} = + var ce = ClosureEntity(e) + if not ce.messageCb.isNil: ce.messageCb(v) proc argumentCount(handler: NimNode): int = handler.expectKind {nnkDo, nnkStmtList} @@ -108,7 +112,7 @@ proc wrapMessageHandler(handler: NimNode): NimNode = handlerSym = genSym(nskProc, "message") bindingsSym = ident"bindings" quote do: - proc `handlerSym`(turn: Turn; `bindingsSym`: Value) = + proc `handlerSym`(`bindingsSym`: Value) = `varSection` if fromPreserves(`valuesSym`, bindings): `body` @@ -150,9 +154,8 @@ macro onPublish*(ds: Cap; pattern: Pattern; handler: untyped) = discard observe(activeTurn(), `ds`, `pattern`, ClosureEntity(publishImpl: `handlerSym`)) ]# -#[ -macro onMessage*(ds: Cap; pattern: Pattern; handler: untyped) = - ## Call `handler` when an message matching `pattern` is broadcasted at `ds`. +macro onMessage*(cap: Cap; pattern: Pattern; handler: untyped) = + ## Call `handler` when an message matching `pattern` is broadcasted at `cap`. let argCount = argumentCount(handler) handlerProc = wrapMessageHandler(handler) @@ -161,8 +164,10 @@ macro onMessage*(ds: Cap; pattern: Pattern; handler: untyped) = if `argCount` != 0 and `pattern`.analyse.capturePaths.len != `argCount`: raiseAssert($`pattern`.analyse.capturePaths.len & " values captured but handler has " & $`argCount` & " arguments - " & $`pattern`) `handlerProc` - discard observe(activeTurn(), `ds`, `pattern`, ClosureEntity(messageImpl: `handlerSym`)) -]# + discard observe(`cap`, `pattern`, ClosureEntity( + messageImpl: whelp messageCont, + messageCb: `handlerSym`, + )) #[ macro during*(ds: Cap; pattern: Pattern; publishBody, retractBody: untyped) = @@ -202,15 +207,15 @@ proc wrapHandler(body: NimNode; ident: string): NimNode = proc `sym`() = `body` -macro onStop*(body: untyped) = +#[ +macro onStop*(facet: Facet; body: untyped) = let handlerDef = wrapHandler(body, "onStop") handlerSym = handlerDef[0] result = quote do: `handlerDef` - addOnStopHandler(`handlerSym`) + addOnStopHandler(facet, `handlerSym`) -#[ macro onStop*(body: untyped) = quote do: block: @@ -220,6 +225,3 @@ macro onStop*(body: untyped) = `body` return ]# - -proc runActor*(name: string; bootProc: FacetProc) = - let actor = spawnActor(name, bootProc) diff --git a/syndicate.nimble b/syndicate.nimble index cbe0822..055f954 100644 --- a/syndicate.nimble +++ b/syndicate.nimble @@ -1,6 +1,6 @@ # Package -version = "20240216" +version = "20240219" author = "Emery Hemingway" description = "Syndicated actors for conversational concurrency" license = "Unlicense" diff --git a/tests/test_timers.nim b/tests/test_timers.nim index f27ceb9..a2cbe5a 100644 --- a/tests/test_timers.nim +++ b/tests/test_timers.nim @@ -3,29 +3,34 @@ import std/times import pkg/cps -import syndicate +import sam/syndicate -# import syndicate/actors/timers +import sam/actors/timers proc now: float64 = getTime().toUnixFloat() runActor("timer-test") do (root: Facet): + echo "actor calls boot proc with root facte ", root - let ds = facet.newDataspace() - let h = facet.publish(ds, "hello world!".toPreserves) + let ds = root.newDataspace() + echo "new dataspace ", ds + let h = publish(ds, "hello world!".toPreserves) + echo "published to handle ", h - facet.onMessage(ds, grab()) do (v: Value): + onMessage(ds, grab()) do (v: Value): stderr.writeLine "observed message ", v - facet.message(ds, "hello world!".toPreserves) - facet.retract(h) - facet.sync(ds) + message(ds, "hello world!".toPreserves) + echo "sent a message" + retract(ds, h) + echo "retracted handle ", h + # facet.sync(ds) - facet.onStop: + root.onStop: echo "anonymous stop handler was invoked" echo "stopping actor" - facet.stopActor() + root.stopActor() echo "actor stopped but still executing?" #[