This commit is contained in:
Emery Hemingway 2023-07-13 14:15:13 +01:00
parent 7fec2d61ac
commit 9614955320
2 changed files with 132 additions and 43 deletions

View File

@ -5,6 +5,13 @@ import std/[asyncfutures, deques, hashes, monotimes, options, sets, tables, time
import preserves
import ../syndicate/protocols/[protocol, sturdy]
const tracing = defined(traceSyndicate)
when tracing:
import std/streams
from std/os import getEnv
import ./protocols/trace
export Handle
template generateIdType(typ: untyped) =
@ -25,6 +32,11 @@ type
Attenuation = seq[Caveat]
Rewrite = sturdy.Rewrite[Ref]
AssertionRef* = ref object
value*: Preserve[Ref]
# if the Enity methods take a Preserve[Ref] object then the generated
# C code has "redefinition of struct" problems when orc is enabled
Entity* = ref object of RootObj
oid*: Oid # oid is how Entities are identified over the wire
@ -42,41 +54,59 @@ type
Actor* = ref object
future: Future[void]
name: string
id: ActorId
handleAllocator: ref Handle
# a fresh actor gets a new ref Handle and
# all actors spawned from it get the same ref.
root: Facet
exitReason: ref Exception
exitHooks: seq[TurnAction]
id: ActorId
exiting: bool
when tracing:
turnIdAllocator: ref TurnId
traceStream: FileStream
TurnAction* = proc (t: var Turn) {.gcsafe.}
Queues = TableRef[Facet, seq[TurnAction]]
Turn* = object # an object that should remain on the stack
id: TurnId
facet: Facet
queues: Queues # a ref object that can outlive Turn
ParentFacet = Option[Facet]
when defined(traceSyndicate):
desc: TurnDescription[void]
Facet* = ref FacetObj
FacetObj = object
id: FacetId
actor*: Actor
parent: ParentFacet
parent: Facet
children: HashSet[Facet]
outbound: OutboundTable
shutdownActions: seq[TurnAction]
inertCheckPreventers: int
id: FacetId
isAlive: bool
type AssertionRef* = ref object
value*: Preserve[Ref]
# if the Enity methods take a Preserve[Ref] object then the generated
# C code has "redefinition of struct" problems when orc is enabled
when tracing:
proc nextTurnId(facet: Facet): TurnId =
result = succ(facet.actor.turnIdAllocator[])
facet.actor.turnIdAllocator[] = result
proc trace(actor: Actor; act: ActorActivation) =
if not actor.traceStream.isNil:
var entry = TraceEntry[void](
timestamp: getTime().toUnixFloat(),
actor: initRecord("named", actor.name.toPreserve(void)),
item: act)
actor.traceStream.writeText entry.toPreserve(void)
actor.traceStream.writeLine()
proc path(facet: Facet): seq[trace.FacetId[void]] =
var f = facet
while not f.isNil:
result.add f.id.toPreserve
f = f.parent
method publish*(e: Entity; turn: var Turn; v: AssertionRef; h: Handle) {.base, gcsafe.} = discard
method retract*(e: Entity; turn: var Turn; h: Handle) {.base, gcsafe.} = discard
@ -92,10 +122,11 @@ using
proc labels(f: Facet): string =
proc catLabels(f: Facet; labels: var string) =
labels.add ':'
if f.parent.isSome:
catLabels(f.parent.get, labels)
if not f.parent.isNil:
catLabels(f.parent, labels)
labels.add ':'
labels.add $f.id
when tracing:
labels.add $f.id
result.add f.actor.name
catLabels(f, result)
@ -253,6 +284,18 @@ proc publish(turn: var Turn; r: Ref; v: Assertion; h: Handle) =
enqueue(turn, r.relay) do (turn: var Turn):
e.established = true
publish(r.target, turn, AssertionRef(value: a), e.handle)
when tracing:
var act = ActionDescription[void](orKind: ActionDescriptionKind.enqueue)
act.enqueue.event.target.actor = turn.facet.actor.id.toPreserve
act.enqueue.event.target.facet = turn.facet.id.toPreserve
act.enqueue.event.target.oid = r.target.oid.toPreserve
act.enqueue.event.detail = trace.TurnEvent[void](orKind: TurnEventKind.assert)
act.enqueue.event.detail.assert.assertion.value.value =
contract(v) do (r: Ref) -> Preserve[void]:
discard
act.enqueue.event.detail.assert.handle = h
turn.desc.actions.add act
proc publish*(turn: var Turn; r: Ref; a: Assertion): Handle =
result = turn.facet.nextHandle()
@ -305,22 +348,22 @@ proc stop*(turn: var Turn) {.gcsafe.}
proc run*(facet; action: TurnAction; zombieTurn = false) {.gcsafe.}
proc newFacet(actor; parent: ParentFacet; initialAssertions: OutboundTable): Facet =
proc newFacet(actor; parent: Facet; initialAssertions: OutboundTable): Facet =
result = Facet(
id: getMonoTime().ticks.FacetId,
actor: actor,
parent: parent,
outbound: initialAssertions,
isAlive: true)
if parent.isSome: parent.get.children.incl result
if not parent.isNil: parent.children.incl result
proc newFacet(actor; parent: ParentFacet): Facet =
proc newFacet(actor; parent: Facet): Facet =
var initialAssertions: OutboundTable
newFacet(actor, parent, initialAssertions)
proc isInert(facet): bool =
result = facet.children.len == 0 and
(facet.outbound.len == 0 or facet.parent.isNone) and
(facet.outbound.len == 0 or facet.parent.isNil) and
facet.inertCheckPreventers == 0
proc preventInertCheck*(facet): (proc() {.gcsafe.}) {.discardable.} =
@ -344,8 +387,8 @@ proc terminate(facet; turn: var Turn; orderly: bool) {.gcsafe.} =
if facet.isAlive:
facet.isAlive = false
let parent = facet.parent
if parent.isSome:
parent.get.children.excl facet
if not parent.isNil:
parent.children.excl facet
block:
var turn = Turn(facet: facet, queues: turn.queues)
while facet.children.len > 0:
@ -355,24 +398,32 @@ proc terminate(facet; turn: var Turn; orderly: bool) {.gcsafe.} =
act(turn)
for a in facet.outbound.values: turn.retract(a)
if orderly:
if parent.isSome:
if parent.get.isInert:
parent.get.terminate(turn, true)
if not parent.isNil:
if parent.isInert:
parent.terminate(turn, true)
else:
terminate(facet.actor, turn, nil)
when tracing:
var act = ActionDescription[void](orKind: ActionDescriptionKind.facetStop)
act.facetstop.path = facet.path
turn.desc.actions.add act
proc stopIfInertAfter(action: TurnAction): TurnAction =
proc wrapper(turn: var Turn) =
action(turn)
enqueue(turn, turn.facet) do (turn: var Turn):
if (turn.facet.parent.isSome and
(not turn.facet.parent.get.isAlive)) or
if (not turn.facet.parent.isNil and
(not turn.facet.parent.isAlive)) or
turn.facet.isInert:
stop(turn)
wrapper
proc inFacet*(turn: var Turn; bootProc: TurnAction): Facet =
result = newFacet(turn.facet.actor, some turn.facet)
result = newFacet(turn.facet.actor, turn.facet)
when tracing:
var act = ActionDescription[void](orKind: ActionDescriptionKind.facetstart)
act.facetstart.path.add result.path
turn.desc.actions.add act
inFacet(turn, result, stopIfInertAfter(bootProc))
proc facet*(turn: var Turn; bootProc: TurnAction): Facet {.deprecated.} = inFacet(turn, bootProc)
@ -385,16 +436,28 @@ proc newActor(name: string): Actor =
name: name,
id: ActorId(seed))
new result.handleAllocator
result.root = newFacet(result, none Facet)
result.root = newFacet(result, nil)
result.future = newFuture[void]($result)
when tracing:
var act = ActorActivation[void](orKind: ActorActivationKind.start)
act.start.actorName = Name[void](orKind: NameKind.named)
act.start.actorName.named.name = name.toPreserve(void)
trace(result, act)
proc run(actor; bootProc: TurnAction; initialAssertions: OutboundTable) =
run(newFacet(actor, some actor.root, initialAssertions), stopIfInertAfter(bootProc))
run(newFacet(actor, actor.root, initialAssertions), stopIfInertAfter(bootProc))
proc bootActor*(name: string; bootProc: TurnAction): Actor =
var initialAssertions: OutboundTable
result = newActor(name)
new result.handleAllocator
when tracing:
new result.turnIdAllocator
let path = getEnv("SYNDICATE_TRACE_FILE", "/tmp/" & name & ".trace.pr")
case path
of "": stderr.writeLine "$SYNDICATE_TRACE_FILE unset, not tracing actor ", name
of "-": result.traceStream = newFileStream(stderr)
else: result.traceStream = openFileStream(path, fmWrite)
run(result, bootProc, initialAssertions)
proc spawn*(name: string; turn: var Turn; bootProc: TurnAction; initialAssertions = initHashSet[Handle]()) =
@ -404,6 +467,12 @@ proc spawn*(name: string; turn: var Turn; bootProc: TurnAction; initialAssertion
discard turn.facet.outbound.pop(key, newOutbound[key])
let actor = newActor(name)
actor.handleAllocator = turn.facet.actor.handleAllocator
when tracing:
actor.turnIdAllocator = turn.facet.actor.turnIdAllocator
actor.traceStream = turn.facet.actor.traceStream
var act = ActionDescription[void](orKind: ActionDescriptionKind.spawn)
act.spawn.id = actor.id.toPreserve
turn.desc.actions.add act
run(actor, bootProc, newOutBound)
proc newInertRef*(): Ref =
@ -414,6 +483,12 @@ proc atExit*(actor; action) = actor.exitHooks.add action
proc terminate(actor; turn; reason: ref Exception) =
if not actor.exiting:
when tracing:
var act = ActorActivation[void](orKind: ActorActivationKind.stop)
if not reason.isNil:
act.stop.status = ExitStatus(orKind: ExitStatusKind.Error)
act.stop.status.error.message = reason.msg
trace(actor, act)
actor.exiting = true
actor.exitReason = reason
for hook in actor.exitHooks: hook(turn)
@ -444,17 +519,18 @@ template tryFacet(facet; body: untyped) =
except CatchableError as err: terminate(facet, err)
proc run*(facet; action: TurnAction; zombieTurn = false) =
if not zombieTurn:
if not facet.actor.exitReason.isNil: return
if not facet.isAlive: return
# TODO: not Nim idiom
tryFacet(facet):
var queues = newTable[Facet, seq[TurnAction]]()
block:
var turn = Turn(facet: facet, queues: queues)
action(turn)
for facet, queue in queues:
for action in queue: run(facet, action)
if zombieTurn or (facet.actor.exitReason.isNil and facet.isAlive):
tryFacet(facet):
var queues = newTable[Facet, seq[TurnAction]]()
block:
var turn = Turn(facet: facet, queues: queues)
action(turn)
when tracing:
turn.desc.id = facet.nextTurnId.toPreserve
facet.actor.trace ActorActivation[void](
orKind: ActorActivationKind.turn, turn: turn.desc)
for facet, queue in queues:
for action in queue: run(facet, action)
proc run*(`ref`: Ref; action: TurnAction) =
## Convenience proc to run a `TurnAction` in the scope of a `Ref`.
@ -465,7 +541,14 @@ proc addCallback*(fut: FutureBase; facet: Facet; act: TurnAction) =
## within the context of `facet`.
addCallback(fut) do ():
if fut.failed: terminate(facet, fut.error)
else: run(facet, act)
else:
when tracing:
run(facet) do (turn: var Turn):
turn.desc.cause = TurnCause[void](orKind: TurnCauseKind.external)
turn.desc.cause.external.description = "Future".toPreserve
act(turn)
else:
run(facet, act)
proc addCallback*(fut: FutureBase; turn: var Turn; act: TurnAction) =
## Add a callback to a `Future` that will be called at a later `Turn`
@ -482,11 +565,17 @@ proc addCallback*[T](fut: Future[T]; turn: var Turn; act: proc (t: var Turn, x:
if fut.failed: terminate(facet.facet, fut.error)
else:
run(facet) do (turn: var Turn):
when tracing:
turn.desc.cause = TurnCause[void](orKind: TurnCauseKind.external)
turn.desc.cause.external.description = "Future".toPreserve
act(turn, read fut)
proc stop*(turn: var Turn, facet: Facet) =
enqueue(turn, facet.parent.get) do (turn: var Turn):
facet.terminate(turn, true)
if facet.parent.isNil:
facet.terminate(turn, true)
else:
enqueue(turn, facet.parent) do (turn: var Turn):
facet.terminate(turn, true)
proc stop*(turn: var Turn) =
stop(turn, turn.facet)
@ -511,6 +600,6 @@ proc newRef*(turn; e: Entity): Ref =
Ref(relay: turn.facet, target: e)
proc sync*(turn, refer: Ref, cb: proc(t: Turn) {.gcsafe.}) =
discard # TODO
raiseAssert "not implemented"
proc future*(actor): Future[void] = actor.future

View File

@ -1,6 +1,6 @@
# Package
version = "20230712"
version = "20230713"
author = "Emery Hemingway"
description = "Syndicated actors for conversational concurrency"
license = "Unlicense"