diff --git a/src/syndicate.nim b/src/syndicate.nim index 81d7d99..dd607e4 100644 --- a/src/syndicate.nim +++ b/src/syndicate.nim @@ -4,9 +4,25 @@ import std/macros import preserves import syndicate/[actors, dataspaces, durings, patterns] -export dataspaces, patterns +export dataspaces, patterns, Handle -from syndicate/protocols/protocol import Handle +type + PublishProc = proc (turn: var Turn; v: Assertion; h: Handle) {.closure.} + RetractProc = proc (turn: var Turn; h: Handle) {.closure.} + MessageProc = proc (turn: var Turn; v: Assertion) {.closure.} + ClosureEntity = ref object of Entity + publishImpl: PublishProc + retractImpl: RetractProc + messageImpl: MessageProc + +method publish(e: ClosureEntity; turn: var Turn; v: Assertion; h: Handle) = + if not e.publishImpl.isNil: e.publishImpl(turn, v, h) + +method retract(e: ClosureEntity; turn: var Turn; h: Handle) = + if not e.retractImpl.isNil: e.retractImpl(turn, h) + +method message(e: ClosureEntity; turn: var Turn; v: Assertion) = + if not e.messageImpl.isNil: e.messageImpl(turn, v) proc wrapPublishHandler(handler: NimNode): NimNode = handler.expectKind nnkDo @@ -37,7 +53,7 @@ proc wrapPublishHandler(handler: NimNode): NimNode = handleSym = ident"handle" handlerSym = genSym(nskProc, "publish") quote do: - proc `handlerSym`(entity: Entity; `turnSym`: var Turn; bindings: Assertion; `handleSym`: Handle) = + proc `handlerSym`(`turnSym`: var Turn; bindings: Assertion; `handleSym`: Handle) = `varSectionOuter` if fromPreserve(`valuesSym`, bindings): `publishBody` @@ -70,7 +86,7 @@ proc wrapMessageHandler(handler: NimNode): NimNode = turnSym = ident"turn" handlerSym = genSym(nskProc, "message") quote do: - proc `handlerSym`(_: Entity; `turnSym`: var Turn; bindings: Assertion) = + proc `handlerSym`(`turnSym`: var Turn; bindings: Assertion) = `varSectionOuter` if fromPreserve(`valuesSym`, bindings): `body` @@ -81,7 +97,7 @@ macro onPublish*(turn: Turn; ds: Ref; pattern: Pattern; doHandler: untyped) = handlerSym = handlerProc[0] result = quote do: `handlerProc` - discard observe(`turn`, `ds`, `pattern`, newEntity(publish = `handlerSym`)) + discard observe(`turn`, `ds`, `pattern`, ClosureEntity(publishImpl: `handlerSym`)) macro onMessage*(turn: Turn; ds: Ref; pattern: Pattern; doHandler: untyped) = let @@ -89,5 +105,4 @@ macro onMessage*(turn: Turn; ds: Ref; pattern: Pattern; doHandler: untyped) = handlerSym = handlerProc[0] result = quote do: `handlerProc` - discard observe(`turn`, `ds`, `pattern`, newEntity(message = `handlerSym`)) - + discard observe(`turn`, `ds`, `pattern`, ClosureEntity(messageImpl: `handlerSym`)) diff --git a/src/syndicate/actors.nim b/src/syndicate/actors.nim index c1e3b16..4817788 100644 --- a/src/syndicate/actors.nim +++ b/src/syndicate/actors.nim @@ -32,17 +32,8 @@ type Caveat = sturdy.Caveat[Ref] Rewrite = sturdy.Rewrite[Ref] - PublishProc* = proc (e: Entity; turn: var Turn; v: Assertion; h: Handle) {.gcsafe.} - RetractProc* = proc (e: Entity; turn: var Turn; h: Handle) {.gcsafe.} - MessageProc* = proc (e: Entity; turn: var Turn; v: Assertion) {.gcsafe.} - SyncProc* = proc (e: Entity; turn: var Turn; peer: Ref) {.gcsafe.} - Entity* = ref object of RootObj oid*: Oid # oid is how Entities are identified over the wire - publishImpl*: PublishProc - retractImpl*: RetractProc - messageImpl*: MessageProc - syncImpl*: SyncProc Ref* {.unpreservable.} = ref object # TODO: rename relay*: Facet @@ -55,7 +46,7 @@ type established: bool OutboundTable = Table[Handle, OutboundAssertion] - Actor = ref object + Actor* = ref object future: Future[void] name: string id: ActorId @@ -71,7 +62,7 @@ type Turn* = object # an object that should remain on the stack id: TurnId - activeFacet*: Facet + facet*: Facet queues: Queues # a ref object that can outlive Turn ParentFacet = Option[Facet] @@ -93,36 +84,10 @@ using turn: var Turn action: TurnAction -proc setProcs*( - result: Entity, - publish: PublishProc = nil, - retract: RetractProc = nil, - message: MessageProc = nil, - sync: SyncProc = nil) {.inline.} = - result.publishImpl = publish - result.retractImpl = retract - result.messageImpl = message - result.syncImpl = sync - -proc newEntity*( - publish: PublishProc = nil, - retract: RetractProc = nil, - message: MessageProc = nil, - sync: SyncProc = nil): Entity = - new result - result.setProcs(publish, retract, message, sync) - -proc publish*(e: Entity; turn: var Turn; v: Assertion; h: Handle) = - if not e.publishImpl.isNil: e.publishImpl(e, turn, v, h) - -proc retract*(e: Entity; turn: var Turn; h: Handle) = - if not e.retractImpl.isNil: e.retractImpl(e, turn, h) - -proc message*(e: Entity; turn: var Turn; v: Assertion) = - if not e.messageImpl.isNil: e.messageImpl(e, turn, v) - -proc sync*(e: Entity; turn: var Turn; peer: Ref) = - if not e.syncImpl.isNil: e.syncImpl(e, turn, peer) +method publish*(e: Entity; turn: var Turn; v: Assertion; h: Handle) {.base.} = discard +method retract*(e: Entity; turn: var Turn; h: Handle) {.base.} = discard +method message*(e: Entity; turn: var Turn; v: Assertion) {.base.} = discard +method sync*(e: Entity; turn: var Turn; peer: Ref) {.base.} = discard proc labels(f: Facet): string = proc catLabels(f: Facet; labels: var string) = @@ -153,11 +118,7 @@ proc attenuate(r: Ref; a: Attenuation): Ref = proc hash*(facet): Hash = facet.id.hash -proc hash(r: Ref): Hash = - !$(r.relay.hash !& - r.target.unsafeAddr.hash) # !& - # r.attenuation.toPreserve.hash) - # TODO: really convert for each hash? +proc hash*(r: Ref): Hash = !$(r.relay.hash !& r.target.unsafeAddr.hash) proc nextHandle(facet: Facet): Handle = inc facet.actor.handleAllocator @@ -284,13 +245,13 @@ proc publish(turn: var Turn; r: Ref; v: Assertion; h: Handle) = if not a.isFalse: let e = OutboundAssertion( handle: h, peer: r, established: false) - turn.activeFacet.outbound[h] = e + turn.facet.outbound[h] = e enqueue(turn, r.relay) do (turn: var Turn): e.established = true publish(r.target, turn, a, e.handle) proc publish*(turn: var Turn; r: Ref; a: Assertion): Handle = - result = turn.activeFacet.nextHandle() + result = turn.facet.nextHandle() publish(turn, r, a, result) proc publish*[T](turn: var Turn; r: Ref; a: T): Handle = @@ -304,7 +265,7 @@ proc retract(turn: var Turn; e: OutboundAssertion) = proc retract*(turn: var Turn; h: Handle) = var e: OutboundAssertion - if turn.activeFacet.outbound.pop(h, e): + if turn.facet.outbound.pop(h, e): turn.retract(e) proc message*(turn: var Turn; r: Ref; v: Assertion) = @@ -345,13 +306,12 @@ proc newFacet(actor; parent: ParentFacet): Facet = var initialAssertions: OutboundTable newFacet(actor, parent, initialAssertions) -proc onStop(facet; action) = - facet.shutdownActions.add action - proc isInert(facet): bool = - facet.inertCheckPreventers == 0 and facet.children.len == 0 and facet.outbound.len == 0 + result = facet.children.len == 0 and + (facet.outbound.len == 0 or facet.parent.isNone) and + facet.inertCheckPreventers == 0 -proc preventInertCheck*(facet): (proc() {.gcsafe.}) = +proc preventInertCheck*(facet): (proc() {.gcsafe.}) {.discardable.} = var armed = true inc facet.inertCheckPreventers proc disarm() = @@ -363,7 +323,7 @@ proc preventInertCheck*(facet): (proc() {.gcsafe.}) = proc inFacet(turn: var Turn; facet; act: TurnAction) = ## Call an action with a facet using a temporary `Turn` ## that shares the `Queues` of the calling `Turn`. - var t = Turn(activeFacet: facet, queues: turn.queues) + var t = Turn(facet: facet, queues: turn.queues) act(t) proc terminate(actor; turn; reason: ref Exception) {.gcsafe.} @@ -372,8 +332,10 @@ proc terminate(facet; turn: var Turn; orderly: bool) {.gcsafe.} = if facet.isAlive: facet.isAlive = false let parent = facet.parent + if parent.isSome: + parent.get.children.excl facet block: - var turn = Turn(activeFacet: facet, queues: turn.queues) + var turn = Turn(facet: facet, queues: turn.queues) for child in facet.children: child.terminate(turn, orderly) if orderly: @@ -383,24 +345,22 @@ proc terminate(facet; turn: var Turn; orderly: bool) {.gcsafe.} = if orderly: if parent.isSome: if parent.get.isInert: - run(parent.get) do (turn: var Turn): - parent.get.terminate(turn, true) # TODO: is this the right turn? + parent.get.terminate(turn, true) else: - run(facet.actor.root) do (turn: var Turn): - terminate(facet.actor, turn, nil) # TODO: is this the right turn? + terminate(facet.actor, turn, nil) proc stopIfInertAfter(action: TurnAction): TurnAction = proc wrapper(turn: var Turn) = action(turn) - enqueue(turn, turn.activeFacet) do (turn: var Turn): - if (turn.activeFacet.parent.isSome and - (not turn.activeFacet.parent.get.isAlive)) or - turn.activeFacet.isInert: + enqueue(turn, turn.facet) do (turn: var Turn): + if (turn.facet.parent.isSome and + (not turn.facet.parent.get.isAlive)) or + turn.facet.isInert: stop(turn) wrapper proc facet*(turn: var Turn; bootProc: TurnAction): Facet = - result =newFacet(turn.activeFacet.actor, some turn.activeFacet) + result =newFacet(turn.facet.actor, some turn.facet) inFacet(turn, result, stopIfInertAfter(bootProc)) proc newActor(name: string; bootProc: TurnAction; initialAssertions: OutboundTable): Actor = @@ -414,24 +374,22 @@ proc newActor(name: string; bootProc: TurnAction; initialAssertions: OutboundTab result.future = newFuture[void]($result) run( newFacet(result, some result.root, initialAssertions), - # stopIfInertAfter(bootProc) TODO - bootProc) + stopIfInertAfter(bootProc)) -proc newActor*(name: string; bootProc: TurnAction): Actor = +proc bootActor*(name: string; bootProc: TurnAction): Actor = var initialAssertions: OutboundTable newActor(name, bootProc, initialAssertions) proc spawn*(name: string; turn: var Turn; bootProc: TurnAction; initialAssertions = initHashSet[Handle]()) = - enqueue(turn, turn.activeFacet) do (turn: var Turn): + enqueue(turn, turn.facet) do (turn: var Turn): var newOutBound: Table[Handle, OutboundAssertion] for key in initialAssertions: - discard turn.activeFacet.outbound.pop(key, newOutbound[key]) + discard turn.facet.outbound.pop(key, newOutbound[key]) callSoon: discard newActor(name, bootProc, newOutBound) proc newInertRef*(): Ref = - # TODO: really create a new actor? Novy does this only once. - let a = newActor("") do (turn: var Turn): turn.stop() + let a = bootActor("inert") do (turn: var Turn): turn.stop() Ref(relay: a.root) proc atExit*(actor; action) = actor.exitHooks.add action @@ -455,7 +413,7 @@ proc terminate(facet; e: ref Exception) = facet.actor.terminate(turn, e) proc asyncCheck*(turn; fut: FutureBase) = - let facet = turn.activeFacet + let facet = turn.facet fut.addCallback do (): if fut.failed: terminate(facet, fut.error) @@ -477,7 +435,7 @@ proc run*(facet; action: TurnAction; zombieTurn = false) = # TODO: not Nim idiom tryFacet(facet): var turn = Turn( - activeFacet: facet, + facet: facet, queues: newTable[Facet, seq[TurnAction]]()) action(turn) run(turn.queues) @@ -487,29 +445,25 @@ proc stop*(turn: var Turn, facet: Facet) = facet.terminate(turn, true) proc stop*(turn: var Turn) = - stop(turn, turn.activeFacet) + stop(turn, turn.facet) proc stopActor*(turn: var Turn) = - let actor = turn.activeFacet.actor - enqueue(turn, turn.activeFacet.actor.root) do (turn: var Turn): + let actor = turn.facet.actor + enqueue(turn, turn.facet.actor.root) do (turn: var Turn): terminate(actor, turn, nil) proc freshen*(turn: var Turn, act: TurnAction) = assert(turn.queues.len == 0, "Attempt to freshen a non-stale Turn") - run(turn.activeFacet, act) + run(turn.facet, act) proc newRef*(relay: Facet; e: Entity): Ref = Ref(relay: relay, target: e) proc newRef*(turn; e: Entity): Ref = - Ref(relay: turn.activeFacet, target: e) + Ref(relay: turn.facet, target: e) proc sync*(turn, refer: Ref, cb: proc(t: Turn) {.gcsafe.}) = discard # TODO -proc log*(f: Facet, args: varargs[string, `$`]) = - echo f, args - proc runActor*(name: string; bootProc: TurnAction): Future[void] = - let actor = newActor(name, bootProc) - result = actor.future + bootActor(name, bootProc).future diff --git a/src/syndicate/durings.nim b/src/syndicate/durings.nim index fef26bc..d8032b6 100644 --- a/src/syndicate/durings.nim +++ b/src/syndicate/durings.nim @@ -1,7 +1,7 @@ # SPDX-FileCopyrightText: ☭ 2021 Emery Hemingway # SPDX-License-Identifier: Unlicense -import std/[hashes, macros, tables] +import std/[hashes, tables] import preserves import ./actors, ./patterns, ./protocols/dataspace @@ -23,8 +23,7 @@ type cb: DuringProc assertionMap: Table[Handle, DuringAction] -proc duringPublish(e: Entity; turn: var Turn; a: Assertion; h: Handle) = - var de = DuringEntity(e) +method publish(de: DuringEntity; turn: var Turn; a: Assertion; h: Handle) = let action = de.cb(turn, a) # assert(not action.isNil "should have put in a no-op action") let g = de.assertionMap.getOrDefault h @@ -37,8 +36,7 @@ proc duringPublish(e: Entity; turn: var Turn; a: Assertion; h: Handle) = of act: raiseAssert("during: duplicate handle in publish: " & $h) -proc duringRetract(e: Entity; turn: var Turn; h: Handle) = - var de = DuringEntity(e) +method retract(de: DuringEntity; turn: var Turn; h: Handle) = let g = de.assertionMap.getOrDefault h case g.kind of null: @@ -49,9 +47,7 @@ proc duringRetract(e: Entity; turn: var Turn; h: Handle) = de.assertionMap.del h g.action(turn) -proc during*(cb: DuringProc): DuringEntity = - result = DuringEntity(cb: cb) - result.setProcs(publish = duringPublish, retract = duringRetract) +proc during*(cb: DuringProc): DuringEntity = DuringEntity(cb: cb) proc observe*(turn: var Turn; ds: Ref; pat: Pattern; e: Entity): Handle = publish(turn, ds, Observe(pattern: pat, observer: embed newRef(turn, e))) diff --git a/src/syndicate/relays.nim b/src/syndicate/relays.nim index 2b2ef8e..a5eb92a 100644 --- a/src/syndicate/relays.nim +++ b/src/syndicate/relays.nim @@ -51,33 +51,24 @@ type proc releaseRefOut(r: Relay; e: WireSymbol) = r.exported.drop e -proc syncPeerPublish(e: Entity; t: var Turn; v: Assertion; h: Handle) = - var se = SyncPeerEntity(e) - se.handleMap[h] = publish(t, se.peer, v) +method publish(spe: SyncPeerEntity; t: var Turn; v: Assertion; h: Handle) = + spe.handleMap[h] = publish(t, spe.peer, v) -proc syncPeerRetract(e: Entity; t: var Turn; h: Handle) = - var se = SyncPeerEntity(e) +method retract(se: SyncPeerEntity; t: var Turn; h: Handle) = var other: Handle if se.handleMap.pop(h, other): retract(t, other) -proc syncPeerMessage(e: Entity; t: var Turn; v: Assertion) = - var se = SyncPeerEntity(e) +method message(se: SyncPeerEntity; t: var Turn; v: Assertion) = if not se.e.isNil: se.relay.releaseRefOut(se.e) message(t, se.peer, v) -proc syncPeerSync(e: Entity; t: var Turn; peer: Ref) = - var se = SyncPeerEntity(e) +method sync(se: SyncPeerEntity; t: var Turn; peer: Ref) = sync(t, se.peer, peer) proc newSyncPeerEntity(r: Relay; p: Ref): SyncPeerEntity = - result = SyncPeerEntity(relay: r, peer: p) - result.setProcs( - syncPeerPublish, - syncPeerRetract, - syncPeerMessage, - syncPeerSync) + SyncPeerEntity(relay: r, peer: p) proc rewriteRefOut(relay: Relay; `ref`: Ref; transient: bool; exported: var seq[WireSymbol]): WireRef = if `ref`.target of RelayEntity and `ref`.target.RelayEntity.relay == relay: @@ -132,33 +123,28 @@ proc send(r: Relay; rOid: protocol.Oid; m: Event) = proc send(re: RelayEntity; ev: Event) = send(re.relay, protocol.Oid re.oid, ev) -proc relayPublish(e: Entity; t: var Turn; v: Assertion; h: Handle) = - var - re = RelayEntity(e) +method publish(re: RelayEntity; t: var Turn; v: Assertion; h: Handle) = re.send Event( orKind: EventKind.Assert, `assert`: protocol.Assert[WireRef]( assertion: re.relay.register(v, h), handle: h)) -proc relayRetract(e: Entity; t: var Turn; h: Handle) = - var re = RelayEntity(e) +method retract(re: RelayEntity; t: var Turn; h: Handle) = re.relay.deregister h re.send Event( orKind: EventKind.Retract, retract: Retract(handle: h)) -proc relayMessage(e: Entity; turn: var Turn; msg: Assertion) = +method message(re: RelayEntity; turn: var Turn; msg: Assertion) = var - re = RelayEntity(e) ev = Event(orKind: EventKind.Message) (body, _) = rewriteOut(re.relay, msg, true) ev.message = Message[WireRef](body: body) re.send ev -proc relaySync(e: Entity; turn: var Turn; peer: Ref) = +method sync(re: RelayEntity; turn: var Turn; peer: Ref) = var - re = RelayEntity(e) peerEntity = newSyncPeerEntity(re.relay, peer) exported: seq[WireSymbol] discard rewriteRefOut(re.relay, turn.newRef(peerEntity), false, exported) @@ -169,8 +155,7 @@ proc relaySync(e: Entity; turn: var Turn; peer: Ref) = sync: Sync[WireRef](peer: embed toPreserve(false, WireRef))) # TODO: send the WireRef? proc newRelayEntity(label: string; r: Relay; o: Oid): RelayEntity = - result = RelayEntity(label: label, relay: r, oid: o) - result.setProcs(relayPublish, relayRetract, relayMessage, relaySync) + RelayEntity(label: label, relay: r, oid: o) using relay: Relay @@ -214,9 +199,8 @@ proc close(r: Relay) = discard proc dispatch(relay: Relay; turn: var Turn; `ref`: Ref; event: Event) = case event.orKind of EventKind.Assert: - let (a, imported) = rewriteIn(relay, turn.activeFacet, event.assert.assertion) - relay.inboundAssertions[event.assert.handle] = - (turn.publish(`ref`, a), imported,) + let (a, imported) = rewriteIn(relay, turn.facet, event.assert.assertion) + relay.inboundAssertions[event.assert.handle] = (publish(turn, `ref`, a), imported,) of EventKind.Retract: let remoteHandle = event.retract.handle @@ -226,7 +210,7 @@ proc dispatch(relay: Relay; turn: var Turn; `ref`: Ref; event: Event) = turn.retract(outbound.localHandle) of EventKind.Message: - let (a, imported) = rewriteIn(relay, turn.activeFacet, event.message.body) + let (a, imported) = rewriteIn(relay, turn.facet, event.message.body) assert imported.len == 0, "Cannot receive transient reference" turn.message(`ref`, a) @@ -274,7 +258,7 @@ type proc newRelay(turn: var Turn; opts: RelayOptions): Relay = result = Relay( - facet: turn.activeFacet, + facet: turn.facet, packetWriter: opts.packetWriter, untrusted: opts.untrusted) discard result.facet.preventInertCheck() @@ -292,7 +276,7 @@ proc spawnRelay(name: string; turn: var Turn; opts: RelayActorOptions): Future[R var wr = WireRef( orKind: WireRefKind.mine, mine: WireRefMine(oid: opts.initialOid.get)) - fut.complete rewriteRefIn(relay, turn.activeFacet, wr, imported) + fut.complete rewriteRefIn(relay, turn.facet, wr, imported) else: fut.complete(nil) opts.nextLocalOid.map do (oid: Oid):