diff --git a/src/syndicate.nim b/src/syndicate.nim index f34bca1..92390d9 100644 --- a/src/syndicate.nim +++ b/src/syndicate.nim @@ -1,14 +1,38 @@ # SPDX-FileCopyrightText: ☭ 2021 Emery Hemingway # SPDX-License-Identifier: Unlicense +## This module implements the `Syndicate DSL `_. + +runnableExamples: + from syndicate/protocols/simpleChatProtocol import Present, Says + import std/asyncdispatch + + bootDataspace("example") do (ds: Ref; turn: var Turn): + let + me = "user" + presenceHandle = publish(turn, ds, Present(username: me)) + + onMessage(turn, ds, ?Says) do (who: string; what: string): + echo who, ": ", what + retract(turn, presenceHandle) + + during(turn, ds, ?Present) do (username: string): + echo "[", username, " arrived]" + message(turn, ds, Says(who: me, what: "users are losers")) + do: + echo "[", username, "departed]" + + poll() + import std/macros import preserves, preserves/jsonhooks import ./syndicate/[actors, dataspaces, durings, patterns] from ./syndicate/relays import connectStdio, connectUnix -export Assertion, Facet, Handle, Ref, Turn, TurnAction, bootDataspace, `?`, `$`, - connectStdio, connectUnix, drop, grab, message, publish, replace, run, stop +export Assertion, Facet, Handle, Ref, Symbol, Turn, TurnAction, bootDataspace, + `?`, `$`, connectStdio, connectUnix, facet, drop, grab, message, publish, + retract, replace, run, stop, unembed type PublishProc = proc (turn: var Turn; v: Assertion; h: Handle) {.closure.} @@ -110,3 +134,74 @@ macro onMessage*(turn: Turn; ds: Ref; pattern: Pattern; doHandler: untyped) = result = quote do: `handlerProc` discard observe(`turn`, `ds`, `pattern`, ClosureEntity(messageImpl: `handlerSym`)) + +proc wrapDuringHandler(entryBody, exitBody: NimNode): NimNode = + entryBody.expectKind nnkDo + var innerProc = newNimNode(nnkProcDef) + entryBody.copyChildrenTo innerProc + innerProc[0] = genSym(nskProc, "during") + var + formalArgs = entryBody[3] + valuesSym = ident("rawValues") + valuesTuple = newNimNode(nnkTupleTy, entryBody) + innerTuple = newNimNode(nnkVarTuple, entryBody) + varSectionInner = newNimNode(nnkVarSection, entryBody).add(innerTuple) + for i, arg in formalArgs: + if i > 0: + arg.expectKind nnkIdentDefs + if arg[1].kind == nnkEmpty: + error("type required for capture", arg) + var def = newNimNode(nnkIdentDefs, arg) + arg.copyChildrenTo def + valuesTuple.add(def) + innerTuple.add(arg[0]) + innerTuple.add(newEmptyNode(), valuesSym) + var + varSectionOuter = newNimNode(nnkVarSection, entryBody).add( + newIdentDefs(valuesSym, valuesTuple)) + publishBody = newStmtList(varSectionInner, entryBody[6]) + turnSym = ident"turn" + bindingsSym = ident"bindings" + handleSym = ident"duringHandle" + entrySym = genSym(nskProc, "during") + duringSym = genSym(nskProc, "during") + if exitBody.isNil: + quote do: + proc `duringSym`(`turnSym`: var Turn; `bindingsSym`: Assertion; `handleSym`: Handle): TurnAction = + `varSectionOuter` + if fromPreserve(`valuesSym`, `bindingsSym`): + `publishBody` + else: + quote do: + proc `duringSym`(`turnSym`: var Turn; `bindingsSym`: Assertion; `handleSym`: Handle): TurnAction = + `varSectionOuter` + if fromPreserve(`valuesSym`, `bindingsSym`): + `publishBody` + proc action(turn: var Turn) = + `exitBody` + result = action + +macro during*(turn: Turn; ds: Ref; pattern: Pattern; publishBody, retractBody: untyped) = + ## Call `publishBody` when an assertion matching `pattern` is published to `ds` and + ## call `retractBody` on retraction. Assertions that match `pattern` but are not + ## convertable to the arguments of `publishBody` are silently discarded. + ## + ## The following symbols are injected into the scope of both bodies: + ## - `turn` - active turn at entry of `publishBody` + ## - `bindings` - raw Preserves sequence that matched `pattern` + ## - `duringHandle` - dataspace handle of the assertion that triggered `publishBody` + let + callbackProc = wrapDuringHandler(publishBody, retractBody) + callbackSym = callbackProc[0] + result = quote do: + `callbackProc` + discard observe(`turn`, `ds`, `pattern`, during(`callbackSym`)) + +macro during*(turn: Turn; ds: Ref; pattern: Pattern; publishBody: untyped) = + ## Variant of `during` without a retract body. + let + callbackProc = wrapDuringHandler(publishBody, nil) + callbackSym = callbackProc[0] + result = quote do: + `callbackProc` + discard observe(`turn`, `ds`, `pattern`, during(`callbackSym`)) diff --git a/src/syndicate/actors.nim b/src/syndicate/actors.nim index ef843af..8cb7b45 100644 --- a/src/syndicate/actors.nim +++ b/src/syndicate/actors.nim @@ -62,7 +62,7 @@ type Turn* = object # an object that should remain on the stack id: TurnId - facet*: Facet + facet: Facet queues: Queues # a ref object that can outlive Turn ParentFacet = Option[Facet] @@ -124,6 +124,8 @@ proc nextHandle(facet: Facet): Handle = inc facet.actor.handleAllocator facet.actor.handleAllocator +proc facet*(turn: var Turn): Facet = turn.facet + proc enqueue(turn: var Turn; target: Facet; action: TurnAction) = if target in turn.queues: turn.queues[target].add action @@ -360,7 +362,7 @@ proc stopIfInertAfter(action: TurnAction): TurnAction = wrapper proc facet*(turn: var Turn; bootProc: TurnAction): Facet = - result =newFacet(turn.facet.actor, some turn.facet) + result = newFacet(turn.facet.actor, some turn.facet) inFacet(turn, result, stopIfInertAfter(bootProc)) proc newActor(name: string; bootProc: TurnAction; initialAssertions: OutboundTable): Actor = diff --git a/src/syndicate/durings.nim b/src/syndicate/durings.nim index d8032b6..a1ab59e 100644 --- a/src/syndicate/durings.nim +++ b/src/syndicate/durings.nim @@ -12,7 +12,7 @@ type Turn = actors.Turn type - DuringProc* = proc (turn: var Turn; a: Assertion): TurnAction {.gcsafe.} + DuringProc* = proc (turn: var Turn; a: Assertion; h: Handle): TurnAction {.gcsafe.} DuringActionKind = enum null, dead, act DuringAction = object case kind: DuringActionKind @@ -24,7 +24,7 @@ type assertionMap: Table[Handle, DuringAction] method publish(de: DuringEntity; turn: var Turn; a: Assertion; h: Handle) = - let action = de.cb(turn, a) + let action = de.cb(turn, a, h) # assert(not action.isNil "should have put in a no-op action") let g = de.assertionMap.getOrDefault h case g.kind @@ -45,7 +45,8 @@ method retract(de: DuringEntity; turn: var Turn; h: Handle) = raiseAssert("during: duplicate handle in retract: " & $h) of act: de.assertionMap.del h - g.action(turn) + if not g.action.isNil: + g.action(turn) proc during*(cb: DuringProc): DuringEntity = DuringEntity(cb: cb) diff --git a/src/syndicate/patterns.nim b/src/syndicate/patterns.nim index c5d3179..e0beff9 100644 --- a/src/syndicate/patterns.nim +++ b/src/syndicate/patterns.nim @@ -2,7 +2,6 @@ # SPDX-License-Identifier: Unlicense import std/[tables, typetraits] -import preserves/private/macros import preserves import ./protocols/dataspacePatterns @@ -59,9 +58,9 @@ proc `?*`*(): Pattern = grab() proc `?`*(T: typedesc; bindings: sink openArray[(int, Pattern)]): Pattern = ## Pattern constructor operator. # TODO: get a pattern for T and then replace the inner patterns with bindings. - when T.hasCustomPragma(preservesRecord): + when T.hasPreservesRecordPragma: var - label = tosymbol(T.getCustomPragmaVal(preservesRecord), Ref) + label = T.recordLabel.tosymbol(Ref) fields = newSeq[Pattern]() for (i, pat) in bindings: if i > fields.high: fields.setLen(succ i) @@ -82,9 +81,9 @@ proc `?`*(T: static typedesc): Pattern = when T is ref: ?pointerBase(T) elif T is Preserve: grab() - elif T.hasCustomPragma(preservesRecord): + elif T.hasPreservesRecordPragma: var - label = tosymbol(T.getCustomPragmaVal(preservesRecord), Ref) + label = T.recordLabel.tosymbol(Ref) fields = newSeq[Pattern]() for key, val in fieldPairs(default T): fields.add ?(typeOf val) @@ -92,14 +91,14 @@ proc `?`*(T: static typedesc): Pattern = orKind: DCompoundKind.rec, rec: DCompoundRec( label: label, fields: fields)) - elif T.hasCustomPragma(preservesTuple): + elif T.hasPreservesTuplePragma: var arr = DCompoundArr() for key, val in fieldPairs(default T): arr.items.add grab() ?DCompound( orKind: DCompoundKind.arr, arr: arr) - elif T.hasCustomPragma(preservesDictionary): + elif T.hasPreservesDictionaryPragma: var dict = DCompoundDict() for key, val in fieldPairs(default T): dict.entries[key.toSymbol(Ref)] = ?(typeOf val) diff --git a/src/syndicate/relays.nim b/src/syndicate/relays.nim index 49a350b..281615d 100644 --- a/src/syndicate/relays.nim +++ b/src/syndicate/relays.nim @@ -287,8 +287,9 @@ method retract(e: ShutdownEntity; turn: var Turn; h: Handle) = type SturdyRef = sturdy.SturdyRef[Ref] Resolve = gatekeeper.Resolve[Ref] + ConnectProc* = proc (turn: var Turn; a: Assertion): TurnAction {.gcsafe.} -proc connectUnix*(turn: var Turn; path: string; cap: SturdyRef; bootProc: DuringProc) = +proc connectUnix*(turn: var Turn; path: string; cap: SturdyRef; bootProc: ConnectProc) = var socket = newAsyncSocket( domain = AF_UNIX, sockType = SOCK_STREAM, @@ -334,7 +335,7 @@ proc connectUnix*(turn: var Turn; path: string; cap: SturdyRef; bootProc: During run(gatekeeper.relay) do (turn: var Turn): reenable() discard publish(turn, shutdownRef, true) - proc duringCallback(turn: var Turn; ds: Preserve[Ref]): TurnAction = + proc duringCallback(turn: var Turn; ds: Assertion; h: Handle): TurnAction = let facet = facet(turn) do (turn: var Turn): discard bootProc(turn, ds) # TODO: what to do with this? proc action(turn: var Turn) = diff --git a/syndicate.nimble b/syndicate.nimble index ceeae35..6d28222 100644 --- a/syndicate.nimble +++ b/syndicate.nimble @@ -1,6 +1,6 @@ # Package -version = "1.2.1" +version = "1.3.0" author = "Emery Hemingway" description = "Syndicated actors for conversational concurrency" license = "Unlicense"