Re-implement onPublish and onMessage
This commit is contained in:
parent
272b6dfcb7
commit
166152cd84
|
@ -1,239 +1,91 @@
|
|||
# SPDX-FileCopyrightText: ☭ 2021 Emery Hemingway
|
||||
# SPDX-License-Identifier: Unlicense
|
||||
|
||||
import std/[asyncdispatch, macros, options]
|
||||
import std/macros
|
||||
import preserves
|
||||
import syndicate/[actors, patterns]
|
||||
export patterns
|
||||
|
||||
import syndicate/[actors]
|
||||
|
||||
#[
|
||||
import syndicate/[assertions, dataspaces, events, skeletons]
|
||||
|
||||
|
||||
export preserves.fromPreserve
|
||||
export assertions.Observe
|
||||
export dataspaces.Facet
|
||||
export dataspaces.FieldId
|
||||
export dataspaces.Fields
|
||||
export dataspaces.`==`
|
||||
export dataspaces.addEndpoint
|
||||
export dataspaces.addStartScript
|
||||
export dataspaces.addStopScript
|
||||
export dataspaces.beginExternalTask
|
||||
export dataspaces.defineObservableProperty
|
||||
export dataspaces.endExternalTask
|
||||
export dataspaces.generateId
|
||||
export dataspaces.hash
|
||||
export dataspaces.recordDamage
|
||||
export dataspaces.recordObservation
|
||||
export dataspaces.scheduleScript
|
||||
export dataspaces.stop
|
||||
export events.EventKind
|
||||
export skeletons.Analysis
|
||||
|
||||
export asyncdispatch.`callback=`
|
||||
export options.get
|
||||
|
||||
proc getCurrentFacet*(): Facet = raiseAssert("must be called from within the DSL")
|
||||
## Return the current `Facet` for this context.
|
||||
|
||||
template stopIf*(cond, body: untyped): untyped =
|
||||
## Stop the current facet if `cond` is true and
|
||||
## invoke `body` after the facet has stopped.
|
||||
mixin getCurrentFacet
|
||||
getCurrentFacet().addDataflow do (facet: Facet):
|
||||
if cond:
|
||||
facet.stop do (facet: Facet):
|
||||
proc getCurrentFacet(): Facet {.inject, used.} = facet
|
||||
body
|
||||
|
||||
template send*(msg: Preserve): untyped =
|
||||
mixin getCurrentFacet
|
||||
send(getCurrentFacet(), msg)
|
||||
|
||||
proc wrapDoHandler(pattern, handler: NimNode): NimNode =
|
||||
## Generate a procedure that unpacks a `pattern` match to fit the
|
||||
## parameters of `handler`, and calls the body of `handler`.
|
||||
proc wrapPublishHandler(handler: NimNode): NimNode =
|
||||
handler.expectKind nnkDo
|
||||
var innerProc = newNimNode(nnkProcDef)
|
||||
handler.copyChildrenTo innerProc
|
||||
innerProc[0] = genSym(nskProc, "message")
|
||||
var
|
||||
formalArgs = handler[3]
|
||||
cbFacetSym = genSym(nskParam, "facet")
|
||||
scriptFacetSym = genSym(nskParam, "facet")
|
||||
recSym = genSym(nskParam, "bindings")
|
||||
varSection = newNimNode(nnkVarSection, handler)
|
||||
conditional: NimNode
|
||||
argCount: int
|
||||
valuesSym = genSym(nskVar, "values")
|
||||
valuesTuple = newNimNode(nnkTupleTy, handler)
|
||||
innerTuple = newNimNode(nnkVarTuple, handler)
|
||||
varSectionInner = newNimNode(nnkVarSection, handler).add(innerTuple)
|
||||
for i, arg in formalArgs:
|
||||
if i > 0:
|
||||
arg.expectKind nnkIdentDefs
|
||||
if arg[0] == ident"_" or arg[0] == ident"*":
|
||||
if arg[1].kind != nnkEmpty:
|
||||
error("placeholders may not be typed", arg)
|
||||
else:
|
||||
if arg[1].kind == nnkEmpty:
|
||||
error("type required for capture", arg)
|
||||
var varDef = newNimNode(nnkIdentDefs, arg)
|
||||
arg.copyChildrenTo varDef
|
||||
varSection.add(varDef)
|
||||
var conversion = newCall("fromPreserve", varDef[0],
|
||||
newNimNode(nnkBracketExpr).add(recSym, newLit(pred i)))
|
||||
if conditional.isNil:
|
||||
conditional = conversion
|
||||
else:
|
||||
conditional = infix(conditional, "and", conversion)
|
||||
inc(argCount)
|
||||
var scriptBody = newStmtList()
|
||||
if argCount > 0:
|
||||
scriptBody.add(
|
||||
varSection,
|
||||
newNimNode(nnkIfStmt).add(
|
||||
newNimNode(nnkElifBranch).add(
|
||||
conditional, handler[6])))
|
||||
else:
|
||||
scriptBody.add(handler[6])
|
||||
if arg[1].kind == nnkEmpty:
|
||||
error("type required for capture", arg)
|
||||
var def = newNimNode(nnkIdentDefs, arg)
|
||||
arg.copyChildrenTo def
|
||||
valuesTuple.add(def)
|
||||
innerTuple.add(arg[0])
|
||||
innerTuple.add(newEmptyNode(), valuesSym)
|
||||
var
|
||||
scriptSym = genSym(nskProc, "script")
|
||||
handlerSym = genSym(nskProc, "handler")
|
||||
litArgCount = newLit argCount
|
||||
varSectionOuter = newNimNode(nnkVarSection, handler).add(
|
||||
newIdentDefs(valuesSym, valuesTuple))
|
||||
body = newStmtList(varSectionInner, handler[6])
|
||||
turnSym = ident"turn"
|
||||
handleSym = ident"handle"
|
||||
handlerSym = genSym(nskProc, "publish")
|
||||
quote do:
|
||||
proc `handlerSym`(`cbFacetSym`: Facet; `recSym`: seq[Preserve]) =
|
||||
assert(`litArgCount` == captureCount(`pattern`), "pattern does not match handler")
|
||||
# this should be a compile-time check
|
||||
assert(
|
||||
`litArgCount` == len(`recSym`),
|
||||
"cannot unpack " & $`litArgCount` & " bindings from " & $(toPreserve `recSym`))
|
||||
proc `scriptSym`(`scriptFacetSym`: Facet) =
|
||||
proc getCurrentFacet(): Facet {.inject, used.} = `scriptFacetSym`
|
||||
`scriptBody`
|
||||
scheduleScript(`cbFacetSym`, `scriptSym`)
|
||||
proc `handlerSym`(_: Entity; `turnSym`: var Turn; bindings: Assertion; `handleSym`: Handle) =
|
||||
`varSectionOuter`
|
||||
if fromPreserve(`valuesSym`, bindings):
|
||||
`body`
|
||||
|
||||
proc wrapHandler(pattern, handler: NimNode): NimNode =
|
||||
case handler.kind
|
||||
of nnkDo:
|
||||
result = wrapDoHandler(pattern, handler)
|
||||
of nnkStmtList:
|
||||
let sym = genSym(nskProc, "handler")
|
||||
result = quote do:
|
||||
proc `sym`(facet: Facet; _: seq[Preserve]) =
|
||||
proc getCurrentFacet(): Facet {.inject, used.} = facet
|
||||
`handler`
|
||||
else:
|
||||
error("unhandled event handler", handler)
|
||||
proc wrapMessageHandler(handler: NimNode): NimNode =
|
||||
handler.expectKind nnkDo
|
||||
var innerProc = newNimNode(nnkProcDef)
|
||||
handler.copyChildrenTo innerProc
|
||||
innerProc[0] = genSym(nskProc, "message")
|
||||
var
|
||||
formalArgs = handler[3]
|
||||
valuesSym = genSym(nskVar, "values")
|
||||
valuesTuple = newNimNode(nnkTupleTy, handler)
|
||||
innerTuple = newNimNode(nnkVarTuple, handler)
|
||||
varSectionInner = newNimNode(nnkVarSection, handler).add(innerTuple)
|
||||
for i, arg in formalArgs:
|
||||
if i > 0:
|
||||
arg.expectKind nnkIdentDefs
|
||||
if arg[1].kind == nnkEmpty:
|
||||
error("type required for capture", arg)
|
||||
var def = newNimNode(nnkIdentDefs, arg)
|
||||
arg.copyChildrenTo def
|
||||
valuesTuple.add(def)
|
||||
innerTuple.add(arg[0])
|
||||
innerTuple.add(newEmptyNode(), valuesSym)
|
||||
var
|
||||
varSectionOuter = newNimNode(nnkVarSection, handler).add(
|
||||
newIdentDefs(valuesSym, valuesTuple))
|
||||
body = newStmtList(varSectionInner, handler[6])
|
||||
turnSym = ident"turn"
|
||||
handlerSym = genSym(nskProc, "message")
|
||||
quote do:
|
||||
proc `handlerSym`(_: Entity; `turnSym`: var Turn; bindings: Assertion) =
|
||||
`varSectionOuter`
|
||||
if fromPreserve(`valuesSym`, bindings):
|
||||
`body`
|
||||
|
||||
proc onEvent(event: EventKind, pattern, handler: NimNode): NimNode =
|
||||
macro onPublish*(turn: Turn; ds: Ref; pattern: Pattern; doHandler: untyped) =
|
||||
let
|
||||
handler = wrapHandler(pattern, handler)
|
||||
handlerSym = handler[0]
|
||||
handlerProc = wrapPublishHandler(doHandler)
|
||||
handlerSym = handlerProc[0]
|
||||
result = quote do:
|
||||
mixin getCurrentFacet
|
||||
getCurrentFacet().addEndpoint do (facet: Facet) -> EndpointSpec:
|
||||
proc getCurrentFacet(): Facet {.inject, used.} = facet
|
||||
`handler`
|
||||
let a = `pattern`
|
||||
result.assertion = observe(a)
|
||||
result.analysis = some(analyzeAssertion(a))
|
||||
result.callback = wrap(facet, EventKind(`event`), `handlerSym`)
|
||||
`handlerProc`
|
||||
discard observe(`turn`, `ds`, `pattern`, newEntity(publish = `handlerSym`))
|
||||
|
||||
macro onAsserted*(pattern: Preserve; handler: untyped) =
|
||||
onEvent(addedEvent, pattern, handler)
|
||||
macro onMessage*(turn: Turn; ds: Ref; pattern: Pattern; doHandler: untyped) =
|
||||
let
|
||||
handlerProc = wrapMessageHandler(doHandler)
|
||||
handlerSym = handlerProc[0]
|
||||
result = quote do:
|
||||
`handlerProc`
|
||||
discard observe(`turn`, `ds`, `pattern`, newEntity(message = `handlerSym`))
|
||||
|
||||
macro onRetracted*(pattern: Preserve; handler: untyped) =
|
||||
onEvent(removedEvent, pattern, handler)
|
||||
|
||||
macro onMessage*(pattern: Preserve; doHandler: untyped) =
|
||||
onEvent(messageEvent, pattern, doHandler)
|
||||
|
||||
template onStart*(body: untyped): untyped =
|
||||
mixin getCurrentFacet
|
||||
getCurrentFacet().addStartScript do (facet: Facet):
|
||||
proc getCurrentFacet(): Facet {.inject, used.} = facet
|
||||
body
|
||||
|
||||
template onStop*(body: untyped): untyped =
|
||||
mixin getCurrentFacet
|
||||
getCurrentFacet().addStopScript do (facet: Facet):
|
||||
proc getCurrentFacet(): Facet {.inject, used.} = facet
|
||||
body
|
||||
|
||||
template publish*(a: Preserve): untyped =
|
||||
mixin getCurrentFacet
|
||||
getCurrentFacet().addEndpoint do (_: Facet) -> EndpointSpec:
|
||||
result.assertion = a
|
||||
|
||||
template field*(F: untyped; T: typedesc; initial: T): untyped =
|
||||
## Declare a field. The identifier `F` shall be a value with
|
||||
## `get` and `set` procs.
|
||||
mixin getCurrentFacet
|
||||
declareField(getCurrentFacet(), F, T, initial)
|
||||
# use the template defined in dataspaces
|
||||
|
||||
template react*(body: untyped): untyped =
|
||||
mixin getCurrentFacet
|
||||
addChildFacet(getCurrentFacet()) do (facet: Facet):
|
||||
proc getCurrentFacet(): Facet {.inject, used.} = facet
|
||||
body
|
||||
|
||||
template stop*(body: untyped): untyped =
|
||||
mixin getCurrentFacet
|
||||
stop(getCurrentFacet()) do (facet: Facet):
|
||||
proc getCurrentFacet(): Facet {.inject, used.} = facet
|
||||
body
|
||||
|
||||
template stop*(): untyped =
|
||||
mixin getCurrentFacet
|
||||
stop(getCurrentFacet())
|
||||
|
||||
template during*(pattern: Preserve; handler: untyped) =
|
||||
onAsserted(pattern):
|
||||
react:
|
||||
onAsserted(pattern, handler)
|
||||
onRetracted(pattern): stop()
|
||||
|
||||
template spawn*(name: string; spawnBody: untyped): untyped =
|
||||
mixin getCurrentFacet
|
||||
spawn(getCurrentFacet(), name) do (spawnFacet: Facet):
|
||||
proc getCurrentFacet(): Facet {.inject, used.} = spawnFacet
|
||||
spawnBody
|
||||
|
||||
template withFacet*(f: Facet; body: untyped): untyped =
|
||||
## Execute a Syndicate ``body`` using the ``Facet`` at ``f``.
|
||||
runnableExamples:
|
||||
import preserves, preserves/records
|
||||
type Foo {.record: "foo".} = ref object
|
||||
facet: Facet
|
||||
i: int
|
||||
proc incAndAssert(foo: Foo) =
|
||||
inc(foo.i)
|
||||
withFacet foo.facet:
|
||||
react: assert: foo
|
||||
proc getCurrentFacet(): Facet {.inject, used.} = f
|
||||
body
|
||||
|
||||
template syndicate*(ident, dataspaceBody: untyped): untyped =
|
||||
proc `ident`*(facet: Facet) =
|
||||
proc getCurrentFacet(): Facet {.inject, used.} = facet
|
||||
dataspaceBody
|
||||
proc `ident`*(name = ""): Future[void] =
|
||||
bootModule(name, `ident`)
|
||||
|
||||
type BootProc* = proc (facet: Facet) {.gcsafe.}
|
||||
|
||||
template boot*(module: BootProc) =
|
||||
mixin getCurrentFacet
|
||||
module(getCurrentFacet())
|
||||
|
||||
macro `?`*(x: untyped): untyped =
|
||||
## Sugar for generating Syndicate patterns.
|
||||
## `?_` is a pattern that matches but discards arbitrary
|
||||
## values and `?` combined with any other identifier is
|
||||
## a match and capture.
|
||||
if eqIdent(x, "_"):
|
||||
quote: toPreserve(Discard())
|
||||
else:
|
||||
quote: toPreserve(Capture())
|
||||
]#
|
||||
|
||||
proc startActorSystem*(name: string; bootProc: TurnAction): Future[void] =
|
||||
# from python
|
||||
let actor = newActor(name, bootProc)
|
||||
result = actor.future
|
||||
|
|
|
@ -1,9 +1,9 @@
|
|||
# SPDX-FileCopyrightText: ☭ 2021 Emery Hemingway
|
||||
# SPDX-License-Identifier: Unlicense
|
||||
|
||||
import std/[asyncdispatch, asyncfile, strutils]
|
||||
import std/[asyncdispatch, strutils]
|
||||
import preserves, preserves/parse
|
||||
import syndicate/protocols/[simpleChatProtocol]
|
||||
import syndicate, syndicate/protocols/[simpleChatProtocol]
|
||||
import syndicate/[actors, capabilities, dataspaces, patterns, relay]
|
||||
|
||||
from syndicate/protocols/protocol import Handle
|
||||
|
@ -15,13 +15,12 @@ when defined(linux):
|
|||
importc, header: "sys/random.h".}
|
||||
|
||||
proc mint(): SturdyRef =
|
||||
#var key: array[16, byte]
|
||||
|
||||
#var key: array[32, byte]
|
||||
#doAssert getEntropy(addr key[0], csize_t key.len) == 0
|
||||
#mint(key, "syndicate")
|
||||
let pr = parsePreserves("""<ref "syndicate" [] #x"a6480df5306611ddd0d3882b546e1977">""", Ref)
|
||||
assert fromPreserve(result, pr)
|
||||
|
||||
proc noOp(turn: var Turn) = discard
|
||||
doAssert fromPreserve(result, pr)
|
||||
|
||||
waitFor runActor("chat") do (turn: var Turn):
|
||||
|
||||
|
@ -41,22 +40,13 @@ waitFor runActor("chat") do (turn: var Turn):
|
|||
|
||||
updateUsername(turn, "user" & $getCurrentProcessId())
|
||||
|
||||
proc duringPresent(turn: var Turn; v: Assertion): TurnAction =
|
||||
var a: tuple[username: string]
|
||||
assert fromPreserve(a, v), $v
|
||||
echo a.username, " arrived"
|
||||
proc onRetract(turn: var Turn) =
|
||||
echo a.username, " left"
|
||||
onRetract
|
||||
onPublish(turn, ds, Present ? {0: `?`()}) do (username: string):
|
||||
echo username, " arrived"
|
||||
#onRetract:
|
||||
# echo username, " left"
|
||||
|
||||
discard observe(turn, ds, Present ? {0: `?`()}, during(duringPresent))
|
||||
|
||||
discard observe(turn, ds, Says ? {0: `?`(), 1: `?`()}, newEntity(
|
||||
message = proc (e: Entity; turn: var Turn; v: Assertion) =
|
||||
var msg: tuple[who: string, what: string]
|
||||
assert fromPreserve(msg, v), $v
|
||||
echo msg.who, ": ", msg.what
|
||||
))
|
||||
onMessage(turn, ds, Says ? {0: `?`(), 1: `?`()}) do (who: string; what: string):
|
||||
echo who, ": ", what
|
||||
|
||||
message(turn, ds, Says(who: username, what: "hello"))
|
||||
|
||||
|
|
Loading…
Reference in New Issue