Cleanup actors module

Use methods at Entity rather than proc pointers, but use proc
pointers within the DSL in the syndicate module.
This commit is contained in:
Emery Hemingway 2022-03-12 10:08:22 -06:00
parent 231928f243
commit aca382e178
4 changed files with 80 additions and 131 deletions

View File

@ -4,9 +4,25 @@
import std/macros
import preserves
import syndicate/[actors, dataspaces, durings, patterns]
export dataspaces, patterns
export dataspaces, patterns, Handle
from syndicate/protocols/protocol import Handle
type
PublishProc = proc (turn: var Turn; v: Assertion; h: Handle) {.closure.}
RetractProc = proc (turn: var Turn; h: Handle) {.closure.}
MessageProc = proc (turn: var Turn; v: Assertion) {.closure.}
ClosureEntity = ref object of Entity
publishImpl: PublishProc
retractImpl: RetractProc
messageImpl: MessageProc
method publish(e: ClosureEntity; turn: var Turn; v: Assertion; h: Handle) =
if not e.publishImpl.isNil: e.publishImpl(turn, v, h)
method retract(e: ClosureEntity; turn: var Turn; h: Handle) =
if not e.retractImpl.isNil: e.retractImpl(turn, h)
method message(e: ClosureEntity; turn: var Turn; v: Assertion) =
if not e.messageImpl.isNil: e.messageImpl(turn, v)
proc wrapPublishHandler(handler: NimNode): NimNode =
handler.expectKind nnkDo
@ -37,7 +53,7 @@ proc wrapPublishHandler(handler: NimNode): NimNode =
handleSym = ident"handle"
handlerSym = genSym(nskProc, "publish")
quote do:
proc `handlerSym`(entity: Entity; `turnSym`: var Turn; bindings: Assertion; `handleSym`: Handle) =
proc `handlerSym`(`turnSym`: var Turn; bindings: Assertion; `handleSym`: Handle) =
`varSectionOuter`
if fromPreserve(`valuesSym`, bindings):
`publishBody`
@ -70,7 +86,7 @@ proc wrapMessageHandler(handler: NimNode): NimNode =
turnSym = ident"turn"
handlerSym = genSym(nskProc, "message")
quote do:
proc `handlerSym`(_: Entity; `turnSym`: var Turn; bindings: Assertion) =
proc `handlerSym`(`turnSym`: var Turn; bindings: Assertion) =
`varSectionOuter`
if fromPreserve(`valuesSym`, bindings):
`body`
@ -81,7 +97,7 @@ macro onPublish*(turn: Turn; ds: Ref; pattern: Pattern; doHandler: untyped) =
handlerSym = handlerProc[0]
result = quote do:
`handlerProc`
discard observe(`turn`, `ds`, `pattern`, newEntity(publish = `handlerSym`))
discard observe(`turn`, `ds`, `pattern`, ClosureEntity(publishImpl: `handlerSym`))
macro onMessage*(turn: Turn; ds: Ref; pattern: Pattern; doHandler: untyped) =
let
@ -89,5 +105,4 @@ macro onMessage*(turn: Turn; ds: Ref; pattern: Pattern; doHandler: untyped) =
handlerSym = handlerProc[0]
result = quote do:
`handlerProc`
discard observe(`turn`, `ds`, `pattern`, newEntity(message = `handlerSym`))
discard observe(`turn`, `ds`, `pattern`, ClosureEntity(messageImpl: `handlerSym`))

View File

@ -32,17 +32,8 @@ type
Caveat = sturdy.Caveat[Ref]
Rewrite = sturdy.Rewrite[Ref]
PublishProc* = proc (e: Entity; turn: var Turn; v: Assertion; h: Handle) {.gcsafe.}
RetractProc* = proc (e: Entity; turn: var Turn; h: Handle) {.gcsafe.}
MessageProc* = proc (e: Entity; turn: var Turn; v: Assertion) {.gcsafe.}
SyncProc* = proc (e: Entity; turn: var Turn; peer: Ref) {.gcsafe.}
Entity* = ref object of RootObj
oid*: Oid # oid is how Entities are identified over the wire
publishImpl*: PublishProc
retractImpl*: RetractProc
messageImpl*: MessageProc
syncImpl*: SyncProc
Ref* {.unpreservable.} = ref object # TODO: rename
relay*: Facet
@ -55,7 +46,7 @@ type
established: bool
OutboundTable = Table[Handle, OutboundAssertion]
Actor = ref object
Actor* = ref object
future: Future[void]
name: string
id: ActorId
@ -71,7 +62,7 @@ type
Turn* = object # an object that should remain on the stack
id: TurnId
activeFacet*: Facet
facet*: Facet
queues: Queues # a ref object that can outlive Turn
ParentFacet = Option[Facet]
@ -93,36 +84,10 @@ using
turn: var Turn
action: TurnAction
proc setProcs*(
result: Entity,
publish: PublishProc = nil,
retract: RetractProc = nil,
message: MessageProc = nil,
sync: SyncProc = nil) {.inline.} =
result.publishImpl = publish
result.retractImpl = retract
result.messageImpl = message
result.syncImpl = sync
proc newEntity*(
publish: PublishProc = nil,
retract: RetractProc = nil,
message: MessageProc = nil,
sync: SyncProc = nil): Entity =
new result
result.setProcs(publish, retract, message, sync)
proc publish*(e: Entity; turn: var Turn; v: Assertion; h: Handle) =
if not e.publishImpl.isNil: e.publishImpl(e, turn, v, h)
proc retract*(e: Entity; turn: var Turn; h: Handle) =
if not e.retractImpl.isNil: e.retractImpl(e, turn, h)
proc message*(e: Entity; turn: var Turn; v: Assertion) =
if not e.messageImpl.isNil: e.messageImpl(e, turn, v)
proc sync*(e: Entity; turn: var Turn; peer: Ref) =
if not e.syncImpl.isNil: e.syncImpl(e, turn, peer)
method publish*(e: Entity; turn: var Turn; v: Assertion; h: Handle) {.base.} = discard
method retract*(e: Entity; turn: var Turn; h: Handle) {.base.} = discard
method message*(e: Entity; turn: var Turn; v: Assertion) {.base.} = discard
method sync*(e: Entity; turn: var Turn; peer: Ref) {.base.} = discard
proc labels(f: Facet): string =
proc catLabels(f: Facet; labels: var string) =
@ -153,11 +118,7 @@ proc attenuate(r: Ref; a: Attenuation): Ref =
proc hash*(facet): Hash =
facet.id.hash
proc hash(r: Ref): Hash =
!$(r.relay.hash !&
r.target.unsafeAddr.hash) # !&
# r.attenuation.toPreserve.hash)
# TODO: really convert for each hash?
proc hash*(r: Ref): Hash = !$(r.relay.hash !& r.target.unsafeAddr.hash)
proc nextHandle(facet: Facet): Handle =
inc facet.actor.handleAllocator
@ -284,13 +245,13 @@ proc publish(turn: var Turn; r: Ref; v: Assertion; h: Handle) =
if not a.isFalse:
let e = OutboundAssertion(
handle: h, peer: r, established: false)
turn.activeFacet.outbound[h] = e
turn.facet.outbound[h] = e
enqueue(turn, r.relay) do (turn: var Turn):
e.established = true
publish(r.target, turn, a, e.handle)
proc publish*(turn: var Turn; r: Ref; a: Assertion): Handle =
result = turn.activeFacet.nextHandle()
result = turn.facet.nextHandle()
publish(turn, r, a, result)
proc publish*[T](turn: var Turn; r: Ref; a: T): Handle =
@ -304,7 +265,7 @@ proc retract(turn: var Turn; e: OutboundAssertion) =
proc retract*(turn: var Turn; h: Handle) =
var e: OutboundAssertion
if turn.activeFacet.outbound.pop(h, e):
if turn.facet.outbound.pop(h, e):
turn.retract(e)
proc message*(turn: var Turn; r: Ref; v: Assertion) =
@ -345,13 +306,12 @@ proc newFacet(actor; parent: ParentFacet): Facet =
var initialAssertions: OutboundTable
newFacet(actor, parent, initialAssertions)
proc onStop(facet; action) =
facet.shutdownActions.add action
proc isInert(facet): bool =
facet.inertCheckPreventers == 0 and facet.children.len == 0 and facet.outbound.len == 0
result = facet.children.len == 0 and
(facet.outbound.len == 0 or facet.parent.isNone) and
facet.inertCheckPreventers == 0
proc preventInertCheck*(facet): (proc() {.gcsafe.}) =
proc preventInertCheck*(facet): (proc() {.gcsafe.}) {.discardable.} =
var armed = true
inc facet.inertCheckPreventers
proc disarm() =
@ -363,7 +323,7 @@ proc preventInertCheck*(facet): (proc() {.gcsafe.}) =
proc inFacet(turn: var Turn; facet; act: TurnAction) =
## Call an action with a facet using a temporary `Turn`
## that shares the `Queues` of the calling `Turn`.
var t = Turn(activeFacet: facet, queues: turn.queues)
var t = Turn(facet: facet, queues: turn.queues)
act(t)
proc terminate(actor; turn; reason: ref Exception) {.gcsafe.}
@ -372,8 +332,10 @@ proc terminate(facet; turn: var Turn; orderly: bool) {.gcsafe.} =
if facet.isAlive:
facet.isAlive = false
let parent = facet.parent
if parent.isSome:
parent.get.children.excl facet
block:
var turn = Turn(activeFacet: facet, queues: turn.queues)
var turn = Turn(facet: facet, queues: turn.queues)
for child in facet.children:
child.terminate(turn, orderly)
if orderly:
@ -383,24 +345,22 @@ proc terminate(facet; turn: var Turn; orderly: bool) {.gcsafe.} =
if orderly:
if parent.isSome:
if parent.get.isInert:
run(parent.get) do (turn: var Turn):
parent.get.terminate(turn, true) # TODO: is this the right turn?
parent.get.terminate(turn, true)
else:
run(facet.actor.root) do (turn: var Turn):
terminate(facet.actor, turn, nil) # TODO: is this the right turn?
terminate(facet.actor, turn, nil)
proc stopIfInertAfter(action: TurnAction): TurnAction =
proc wrapper(turn: var Turn) =
action(turn)
enqueue(turn, turn.activeFacet) do (turn: var Turn):
if (turn.activeFacet.parent.isSome and
(not turn.activeFacet.parent.get.isAlive)) or
turn.activeFacet.isInert:
enqueue(turn, turn.facet) do (turn: var Turn):
if (turn.facet.parent.isSome and
(not turn.facet.parent.get.isAlive)) or
turn.facet.isInert:
stop(turn)
wrapper
proc facet*(turn: var Turn; bootProc: TurnAction): Facet =
result =newFacet(turn.activeFacet.actor, some turn.activeFacet)
result =newFacet(turn.facet.actor, some turn.facet)
inFacet(turn, result, stopIfInertAfter(bootProc))
proc newActor(name: string; bootProc: TurnAction; initialAssertions: OutboundTable): Actor =
@ -414,24 +374,22 @@ proc newActor(name: string; bootProc: TurnAction; initialAssertions: OutboundTab
result.future = newFuture[void]($result)
run(
newFacet(result, some result.root, initialAssertions),
# stopIfInertAfter(bootProc) TODO
bootProc)
stopIfInertAfter(bootProc))
proc newActor*(name: string; bootProc: TurnAction): Actor =
proc bootActor*(name: string; bootProc: TurnAction): Actor =
var initialAssertions: OutboundTable
newActor(name, bootProc, initialAssertions)
proc spawn*(name: string; turn: var Turn; bootProc: TurnAction; initialAssertions = initHashSet[Handle]()) =
enqueue(turn, turn.activeFacet) do (turn: var Turn):
enqueue(turn, turn.facet) do (turn: var Turn):
var newOutBound: Table[Handle, OutboundAssertion]
for key in initialAssertions:
discard turn.activeFacet.outbound.pop(key, newOutbound[key])
discard turn.facet.outbound.pop(key, newOutbound[key])
callSoon:
discard newActor(name, bootProc, newOutBound)
proc newInertRef*(): Ref =
# TODO: really create a new actor? Novy does this only once.
let a = newActor("") do (turn: var Turn): turn.stop()
let a = bootActor("inert") do (turn: var Turn): turn.stop()
Ref(relay: a.root)
proc atExit*(actor; action) = actor.exitHooks.add action
@ -455,7 +413,7 @@ proc terminate(facet; e: ref Exception) =
facet.actor.terminate(turn, e)
proc asyncCheck*(turn; fut: FutureBase) =
let facet = turn.activeFacet
let facet = turn.facet
fut.addCallback do ():
if fut.failed: terminate(facet, fut.error)
@ -477,7 +435,7 @@ proc run*(facet; action: TurnAction; zombieTurn = false) =
# TODO: not Nim idiom
tryFacet(facet):
var turn = Turn(
activeFacet: facet,
facet: facet,
queues: newTable[Facet, seq[TurnAction]]())
action(turn)
run(turn.queues)
@ -487,29 +445,25 @@ proc stop*(turn: var Turn, facet: Facet) =
facet.terminate(turn, true)
proc stop*(turn: var Turn) =
stop(turn, turn.activeFacet)
stop(turn, turn.facet)
proc stopActor*(turn: var Turn) =
let actor = turn.activeFacet.actor
enqueue(turn, turn.activeFacet.actor.root) do (turn: var Turn):
let actor = turn.facet.actor
enqueue(turn, turn.facet.actor.root) do (turn: var Turn):
terminate(actor, turn, nil)
proc freshen*(turn: var Turn, act: TurnAction) =
assert(turn.queues.len == 0, "Attempt to freshen a non-stale Turn")
run(turn.activeFacet, act)
run(turn.facet, act)
proc newRef*(relay: Facet; e: Entity): Ref =
Ref(relay: relay, target: e)
proc newRef*(turn; e: Entity): Ref =
Ref(relay: turn.activeFacet, target: e)
Ref(relay: turn.facet, target: e)
proc sync*(turn, refer: Ref, cb: proc(t: Turn) {.gcsafe.}) =
discard # TODO
proc log*(f: Facet, args: varargs[string, `$`]) =
echo f, args
proc runActor*(name: string; bootProc: TurnAction): Future[void] =
let actor = newActor(name, bootProc)
result = actor.future
bootActor(name, bootProc).future

View File

@ -1,7 +1,7 @@
# SPDX-FileCopyrightText: ☭ 2021 Emery Hemingway
# SPDX-License-Identifier: Unlicense
import std/[hashes, macros, tables]
import std/[hashes, tables]
import preserves
import ./actors, ./patterns, ./protocols/dataspace
@ -23,8 +23,7 @@ type
cb: DuringProc
assertionMap: Table[Handle, DuringAction]
proc duringPublish(e: Entity; turn: var Turn; a: Assertion; h: Handle) =
var de = DuringEntity(e)
method publish(de: DuringEntity; turn: var Turn; a: Assertion; h: Handle) =
let action = de.cb(turn, a)
# assert(not action.isNil "should have put in a no-op action")
let g = de.assertionMap.getOrDefault h
@ -37,8 +36,7 @@ proc duringPublish(e: Entity; turn: var Turn; a: Assertion; h: Handle) =
of act:
raiseAssert("during: duplicate handle in publish: " & $h)
proc duringRetract(e: Entity; turn: var Turn; h: Handle) =
var de = DuringEntity(e)
method retract(de: DuringEntity; turn: var Turn; h: Handle) =
let g = de.assertionMap.getOrDefault h
case g.kind
of null:
@ -49,9 +47,7 @@ proc duringRetract(e: Entity; turn: var Turn; h: Handle) =
de.assertionMap.del h
g.action(turn)
proc during*(cb: DuringProc): DuringEntity =
result = DuringEntity(cb: cb)
result.setProcs(publish = duringPublish, retract = duringRetract)
proc during*(cb: DuringProc): DuringEntity = DuringEntity(cb: cb)
proc observe*(turn: var Turn; ds: Ref; pat: Pattern; e: Entity): Handle =
publish(turn, ds, Observe(pattern: pat, observer: embed newRef(turn, e)))

View File

@ -51,33 +51,24 @@ type
proc releaseRefOut(r: Relay; e: WireSymbol) =
r.exported.drop e
proc syncPeerPublish(e: Entity; t: var Turn; v: Assertion; h: Handle) =
var se = SyncPeerEntity(e)
se.handleMap[h] = publish(t, se.peer, v)
method publish(spe: SyncPeerEntity; t: var Turn; v: Assertion; h: Handle) =
spe.handleMap[h] = publish(t, spe.peer, v)
proc syncPeerRetract(e: Entity; t: var Turn; h: Handle) =
var se = SyncPeerEntity(e)
method retract(se: SyncPeerEntity; t: var Turn; h: Handle) =
var other: Handle
if se.handleMap.pop(h, other):
retract(t, other)
proc syncPeerMessage(e: Entity; t: var Turn; v: Assertion) =
var se = SyncPeerEntity(e)
method message(se: SyncPeerEntity; t: var Turn; v: Assertion) =
if not se.e.isNil:
se.relay.releaseRefOut(se.e)
message(t, se.peer, v)
proc syncPeerSync(e: Entity; t: var Turn; peer: Ref) =
var se = SyncPeerEntity(e)
method sync(se: SyncPeerEntity; t: var Turn; peer: Ref) =
sync(t, se.peer, peer)
proc newSyncPeerEntity(r: Relay; p: Ref): SyncPeerEntity =
result = SyncPeerEntity(relay: r, peer: p)
result.setProcs(
syncPeerPublish,
syncPeerRetract,
syncPeerMessage,
syncPeerSync)
SyncPeerEntity(relay: r, peer: p)
proc rewriteRefOut(relay: Relay; `ref`: Ref; transient: bool; exported: var seq[WireSymbol]): WireRef =
if `ref`.target of RelayEntity and `ref`.target.RelayEntity.relay == relay:
@ -132,33 +123,28 @@ proc send(r: Relay; rOid: protocol.Oid; m: Event) =
proc send(re: RelayEntity; ev: Event) =
send(re.relay, protocol.Oid re.oid, ev)
proc relayPublish(e: Entity; t: var Turn; v: Assertion; h: Handle) =
var
re = RelayEntity(e)
method publish(re: RelayEntity; t: var Turn; v: Assertion; h: Handle) =
re.send Event(
orKind: EventKind.Assert,
`assert`: protocol.Assert[WireRef](
assertion: re.relay.register(v, h),
handle: h))
proc relayRetract(e: Entity; t: var Turn; h: Handle) =
var re = RelayEntity(e)
method retract(re: RelayEntity; t: var Turn; h: Handle) =
re.relay.deregister h
re.send Event(
orKind: EventKind.Retract,
retract: Retract(handle: h))
proc relayMessage(e: Entity; turn: var Turn; msg: Assertion) =
method message(re: RelayEntity; turn: var Turn; msg: Assertion) =
var
re = RelayEntity(e)
ev = Event(orKind: EventKind.Message)
(body, _) = rewriteOut(re.relay, msg, true)
ev.message = Message[WireRef](body: body)
re.send ev
proc relaySync(e: Entity; turn: var Turn; peer: Ref) =
method sync(re: RelayEntity; turn: var Turn; peer: Ref) =
var
re = RelayEntity(e)
peerEntity = newSyncPeerEntity(re.relay, peer)
exported: seq[WireSymbol]
discard rewriteRefOut(re.relay, turn.newRef(peerEntity), false, exported)
@ -169,8 +155,7 @@ proc relaySync(e: Entity; turn: var Turn; peer: Ref) =
sync: Sync[WireRef](peer: embed toPreserve(false, WireRef))) # TODO: send the WireRef?
proc newRelayEntity(label: string; r: Relay; o: Oid): RelayEntity =
result = RelayEntity(label: label, relay: r, oid: o)
result.setProcs(relayPublish, relayRetract, relayMessage, relaySync)
RelayEntity(label: label, relay: r, oid: o)
using
relay: Relay
@ -214,9 +199,8 @@ proc close(r: Relay) = discard
proc dispatch(relay: Relay; turn: var Turn; `ref`: Ref; event: Event) =
case event.orKind
of EventKind.Assert:
let (a, imported) = rewriteIn(relay, turn.activeFacet, event.assert.assertion)
relay.inboundAssertions[event.assert.handle] =
(turn.publish(`ref`, a), imported,)
let (a, imported) = rewriteIn(relay, turn.facet, event.assert.assertion)
relay.inboundAssertions[event.assert.handle] = (publish(turn, `ref`, a), imported,)
of EventKind.Retract:
let remoteHandle = event.retract.handle
@ -226,7 +210,7 @@ proc dispatch(relay: Relay; turn: var Turn; `ref`: Ref; event: Event) =
turn.retract(outbound.localHandle)
of EventKind.Message:
let (a, imported) = rewriteIn(relay, turn.activeFacet, event.message.body)
let (a, imported) = rewriteIn(relay, turn.facet, event.message.body)
assert imported.len == 0, "Cannot receive transient reference"
turn.message(`ref`, a)
@ -274,7 +258,7 @@ type
proc newRelay(turn: var Turn; opts: RelayOptions): Relay =
result = Relay(
facet: turn.activeFacet,
facet: turn.facet,
packetWriter: opts.packetWriter,
untrusted: opts.untrusted)
discard result.facet.preventInertCheck()
@ -292,7 +276,7 @@ proc spawnRelay(name: string; turn: var Turn; opts: RelayActorOptions): Future[R
var wr = WireRef(
orKind: WireRefKind.mine,
mine: WireRefMine(oid: opts.initialOid.get))
fut.complete rewriteRefIn(relay, turn.activeFacet, wr, imported)
fut.complete rewriteRefIn(relay, turn.facet, wr, imported)
else:
fut.complete(nil)
opts.nextLocalOid.map do (oid: Oid):