# SPDX-FileCopyrightText: ☭ Emery Hemingway # SPDX-License-Identifier: Unlicense ## This module implements the `Syndicate DSL `_. import std/[asyncdispatch, macros, tables, typetraits] import preserves export fromPreserve, toPreserve import ./syndicate/[actors, dataspaces, durings, patterns] import ./syndicate/protocols/dataspace when defined(posix): from ./syndicate/relays import Tcp, Unix, connect, connectStdio export Tcp, Unix, connect, connectStdio export patterns export Actor, Assertion, Facet, Handle, Ref, Symbol, Turn, TurnAction, `$`, addCallback, analyse, asyncCheck, bootDataspace, facet, future, inFacet, message, newDataspace, onStop, publish, retract, replace, run, stop, unembed, unpackLiterals proc `!`*(typ: static typedesc): Pattern {.inline.} = patterns.dropType(typ) proc `?`*(typ: static typedesc): Pattern {.inline.} = patterns.grabType(typ) proc `?`*(typ: static typedesc; bindings: sink openArray[(int, Pattern)]): Pattern {.inline.} = patterns.grab(typ, bindings) proc `??`*(pat: Pattern; bindings: openArray[(int, Pattern)]): Pattern {.inline.} = patterns.inject(pat, bindings) proc `?`*[T](val: T): Pattern {.inline.} = patterns.grab[T](val) type Observe* = dataspace.Observe[Ref] PublishProc = proc (turn: var Turn; v: Assertion; h: Handle) {.closure, gcsafe.} RetractProc = proc (turn: var Turn; h: Handle) {.closure, gcsafe.} MessageProc = proc (turn: var Turn; v: Assertion) {.closure, gcsafe.} ClosureEntity = ref object of Entity publishImpl: PublishProc retractImpl: RetractProc messageImpl: MessageProc method publish(e: ClosureEntity; turn: var Turn; a: AssertionRef; h: Handle) {.gcsafe.} = if not e.publishImpl.isNil: e.publishImpl(turn, a.value, h) method retract(e: ClosureEntity; turn: var Turn; h: Handle) {.gcsafe.} = if not e.retractImpl.isNil: e.retractImpl(turn, h) method message(e: ClosureEntity; turn: var Turn; a: AssertionRef) {.gcsafe.} = if not e.messageImpl.isNil: e.messageImpl(turn, a.value) proc argumentCount(handler: NimNode): int = handler.expectKind {nnkDo, nnkStmtList} if handler.kind == nnkDo: result = pred handler[3].len proc wrapPublishHandler(handler: NimNode): NimNode = handler.expectKind {nnkDo, nnkStmtList} var innerProc = newNimNode(nnkProcDef) handler.copyChildrenTo innerProc innerProc[0] = genSym(nskProc, "message") var valuesSym = genSym(nskVar, "values") valuesTuple = newNimNode(nnkTupleTy, handler) innerTuple = newNimNode(nnkVarTuple, handler) varSectionInner = newNimNode(nnkVarSection, handler).add(innerTuple) if handler.kind == nnkDo: for i, arg in handler[3]: if i > 0: arg.expectKind nnkIdentDefs if arg[1].kind == nnkEmpty: error("type required for capture", arg) var def = newNimNode(nnkIdentDefs, arg) arg.copyChildrenTo def valuesTuple.add(def) innerTuple.add(arg[0]) innerTuple.add(newEmptyNode(), valuesSym) var varSectionOuter = newNimNode(nnkVarSection, handler).add( newIdentDefs(valuesSym, valuesTuple)) publishBody = if handler.kind == nnkStmtList: handler else: newStmtList(varSectionInner, handler[6]) turnSym = ident"turn" handleSym = ident"handle" handlerSym = genSym(nskProc, "publish") quote do: proc `handlerSym`(`turnSym`: var Turn; bindings: Assertion; `handleSym`: Handle) = `varSectionOuter` if fromPreserve(`valuesSym`, bindings): `publishBody` proc wrapMessageHandler(handler: NimNode): NimNode = handler.expectKind {nnkDo, nnkStmtList} var innerProc = newNimNode(nnkProcDef) handler.copyChildrenTo innerProc innerProc[0] = genSym(nskProc, "message") var valuesSym = genSym(nskVar, "values") valuesTuple = newNimNode(nnkTupleTy, handler) innerTuple = newNimNode(nnkVarTuple, handler) varSectionInner = newNimNode(nnkVarSection, handler).add(innerTuple) if handler.kind == nnkDo: for i, arg in handler[3]: if i > 0: arg.expectKind nnkIdentDefs if arg[1].kind == nnkEmpty: error("type required for capture", arg) var def = newNimNode(nnkIdentDefs, arg) arg.copyChildrenTo def valuesTuple.add(def) innerTuple.add(arg[0]) innerTuple.add(newEmptyNode(), valuesSym) var varSectionOuter = newNimNode(nnkVarSection, handler).add( newIdentDefs(valuesSym, valuesTuple)) body = newStmtList(varSectionInner, handler[6]) turnSym = ident"turn" handlerSym = genSym(nskProc, "message") quote do: proc `handlerSym`(`turnSym`: var Turn; bindings: Assertion) = `varSectionOuter` if fromPreserve(`valuesSym`, bindings): `body` macro onPublish*(turn: Turn; ds: Ref; pattern: Pattern; handler: untyped) = ## Call `handler` when an assertion matching `pattern` is published at `ds`. let argCount = argumentCount(handler) handlerProc = wrapPublishHandler(handler) handlerSym = handlerProc[0] result = quote do: if `pattern`.analyse.capturePaths.len != `argCount`: raiseAssert($`pattern`.analyse.capturePaths.len & " values captured but handler has " & $`argCount` & " arguments") `handlerProc` discard observe(`turn`, `ds`, `pattern`, ClosureEntity(publishImpl: `handlerSym`)) macro onMessage*(turn: Turn; ds: Ref; pattern: Pattern; handler: untyped) = ## Call `handler` when an message matching `pattern` is broadcasted at `ds`. let argCount = argumentCount(handler) handlerProc = wrapMessageHandler(handler) handlerSym = handlerProc[0] result = quote do: if `pattern`.analyse.capturePaths.len != `argCount`: raiseAssert($`pattern`.analyse.capturePaths.len & " values captured but handler has " & $`argCount` & " arguments") `handlerProc` discard observe(`turn`, `ds`, `pattern`, ClosureEntity(messageImpl: `handlerSym`)) proc wrapDuringHandler(entryBody, exitBody: NimNode): NimNode = entryBody.expectKind {nnkDo, nnkStmtList} var innerProc = newNimNode(nnkProcDef) entryBody.copyChildrenTo innerProc innerProc[0] = genSym(nskProc, "during") var valuesSym = ident("rawValues") valuesTuple = newNimNode(nnkTupleTy, entryBody) innerTuple = newNimNode(nnkVarTuple, entryBody) varSectionInner = newNimNode(nnkVarSection, entryBody).add(innerTuple) if entryBody.kind == nnkDo: for i, arg in entryBody[3]: if i > 0: arg.expectKind nnkIdentDefs if arg[1].kind == nnkEmpty: error("type required for capture", arg) var def = newNimNode(nnkIdentDefs, arg) arg.copyChildrenTo def valuesTuple.add(def) innerTuple.add(arg[0]) innerTuple.add(newEmptyNode(), valuesSym) var varSectionOuter = newNimNode(nnkVarSection, entryBody).add( newIdentDefs(valuesSym, valuesTuple)) publishBody = if entryBody.kind == nnkStmtList: entryBody else: newStmtList(varSectionInner, entryBody[6]) turnSym = ident"turn" bindingsSym = ident"bindings" handleSym = ident"duringHandle" duringSym = genSym(nskProc, "during") if exitBody.isNil: quote do: proc `duringSym`(`turnSym`: var Turn; `bindingsSym`: Assertion; `handleSym`: Handle): TurnAction = `varSectionOuter` if fromPreserve(`valuesSym`, `bindingsSym`): `publishBody` else: quote do: proc `duringSym`(`turnSym`: var Turn; `bindingsSym`: Assertion; `handleSym`: Handle): TurnAction = `varSectionOuter` if fromPreserve(`valuesSym`, `bindingsSym`): `publishBody` proc action(`turnSym`: var Turn) = `exitBody` result = action macro during*(turn: var Turn; ds: Ref; pattern: Pattern; publishBody, retractBody: untyped) = ## Call `publishBody` when an assertion matching `pattern` is published to `ds` and ## call `retractBody` on retraction. Assertions that match `pattern` but are not ## convertable to the arguments of `publishBody` are silently discarded. ## ## The following symbols are injected into the scope of both bodies: ## - `turn` - active turn at entry of `publishBody` and `retractBody` ## - `bindings` - raw Preserves sequence that matched `pattern` ## - `duringHandle` - dataspace handle of the assertion that triggered `publishBody` let argCount = argumentCount(publishBody) callbackProc = wrapDuringHandler(publishBody, retractBody) callbackSym = callbackProc[0] result = quote do: if `pattern`.analyse.capturePaths.len != `argCount`: raiseAssert($`pattern`.analyse.capturePaths.len & " values captured but handler has " & $`argCount` & " arguments") `callbackProc` discard observe(`turn`, `ds`, `pattern`, during(`callbackSym`)) macro during*(turn: var Turn; ds: Ref; pattern: Pattern; publishBody: untyped) = ## Variant of `during` without a retract body. let argCount = argumentCount(publishBody) callbackProc = wrapDuringHandler(publishBody, nil) callbackSym = callbackProc[0] result = quote do: if `pattern`.analyse.capturePaths.len != `argCount`: raiseAssert($`pattern`.analyse.capturePaths.len & " values captured but handler has " & $`argCount` & " arguments") `callbackProc` discard observe(`turn`, `ds`, `pattern`, during(`callbackSym`)) type BootProc = proc (ds: Ref; turn: var Turn) {.gcsafe.} from std/os import getEnv proc runActor*(name: string; bootProc: BootProc) = ## Run an `Actor` to completion. let actor = bootDataspace(name, bootProc) if getEnv"SYNDICATE_DEBUG" == "": while not actor.future.finished: poll() else: while not actor.future.finished: stderr.writeLine("Polling ", name, " actor…") poll() read(actor.future)