Compare commits
9 Commits
98580c2bb6
...
15d2e8bfb4
Author | SHA1 | Date |
---|---|---|
Emery Hemingway | 15d2e8bfb4 | |
Emery Hemingway | eb5d4d9a57 | |
Emery Hemingway | 01f26caf7b | |
Emery Hemingway | e31069e41a | |
Emery Hemingway | fdf2994ec4 | |
Emery Hemingway | 0cee6670c9 | |
Emery Hemingway | 1ce96560f4 | |
Emery Hemingway | d365a1e6e5 | |
Emery Hemingway | 3e5d910d1a |
3
Tupfile
3
Tupfile
|
@ -1,2 +1,3 @@
|
||||||
include_rules
|
include_rules
|
||||||
: lock.json |> !nim_cfg |> | ./<lock>
|
: |> !nim_lk |> {lockfile}
|
||||||
|
: {lockfile} |> !nim_cfg |> | ./<lock>
|
||||||
|
|
45
lock.json
45
lock.json
|
@ -6,12 +6,22 @@
|
||||||
"bigints"
|
"bigints"
|
||||||
],
|
],
|
||||||
"path": "/nix/store/jvrm392g8adfsgf36prgwkbyd7vh5jsw-source",
|
"path": "/nix/store/jvrm392g8adfsgf36prgwkbyd7vh5jsw-source",
|
||||||
"ref": "20231006",
|
|
||||||
"rev": "86ea14d31eea9275e1408ca34e6bfe9c99989a96",
|
"rev": "86ea14d31eea9275e1408ca34e6bfe9c99989a96",
|
||||||
"sha256": "15pcpmnk1bnw3k8769rjzcpg00nahyrypwbxs88jnwr4aczp99j4",
|
"sha256": "15pcpmnk1bnw3k8769rjzcpg00nahyrypwbxs88jnwr4aczp99j4",
|
||||||
"srcDir": "src",
|
"srcDir": "src",
|
||||||
"url": "https://github.com/ehmry/nim-bigints/archive/86ea14d31eea9275e1408ca34e6bfe9c99989a96.tar.gz"
|
"url": "https://github.com/ehmry/nim-bigints/archive/86ea14d31eea9275e1408ca34e6bfe9c99989a96.tar.gz"
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"method": "fetchzip",
|
||||||
|
"packages": [
|
||||||
|
"cps"
|
||||||
|
],
|
||||||
|
"path": "/nix/store/452hfhasrn3gl6vijfmzs69djl099j0j-source",
|
||||||
|
"rev": "b7c179f172e3a256a482a9daee3c0815ea423206",
|
||||||
|
"sha256": "1sn9s7iv83sw1jl5jgi2h7b0xpgsn13f9icp5124jvbp0qkxskx2",
|
||||||
|
"srcDir": "",
|
||||||
|
"url": "https://github.com/nim-works/cps/archive/b7c179f172e3a256a482a9daee3c0815ea423206.tar.gz"
|
||||||
|
},
|
||||||
{
|
{
|
||||||
"method": "fetchzip",
|
"method": "fetchzip",
|
||||||
"packages": [
|
"packages": [
|
||||||
|
@ -28,12 +38,11 @@
|
||||||
"packages": [
|
"packages": [
|
||||||
"nimcrypto"
|
"nimcrypto"
|
||||||
],
|
],
|
||||||
"path": "/nix/store/zyr8zwh7vaiycn1s4r8cxwc71f2k5l0h-source",
|
"path": "/nix/store/jwz8pqbv6rsm8w4fjzdb37r0wzjn5hv0-source",
|
||||||
"ref": "traditional-api",
|
"rev": "d58da671799c69c0b3208b96c154e13c8b1a9e90",
|
||||||
"rev": "602c5d20c69c76137201b5d41f788f72afb95aa8",
|
"sha256": "12dm0gsy10ppga7zf7hpf4adaqjrd9b740n2w926xyazq1njf6k9",
|
||||||
"sha256": "1dmdmgb6b9m5f8dyxk781nnd61dsk3hdxqks7idk9ncnpj9fng65",
|
|
||||||
"srcDir": "",
|
"srcDir": "",
|
||||||
"url": "https://github.com/cheatfate/nimcrypto/archive/602c5d20c69c76137201b5d41f788f72afb95aa8.tar.gz"
|
"url": "https://github.com/cheatfate/nimcrypto/archive/d58da671799c69c0b3208b96c154e13c8b1a9e90.tar.gz"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"method": "fetchzip",
|
"method": "fetchzip",
|
||||||
|
@ -41,7 +50,6 @@
|
||||||
"npeg"
|
"npeg"
|
||||||
],
|
],
|
||||||
"path": "/nix/store/ffkxmjmigfs7zhhiiqm0iw2c34smyciy-source",
|
"path": "/nix/store/ffkxmjmigfs7zhhiiqm0iw2c34smyciy-source",
|
||||||
"ref": "1.2.1",
|
|
||||||
"rev": "26d62fdc40feb84c6533956dc11d5ee9ea9b6c09",
|
"rev": "26d62fdc40feb84c6533956dc11d5ee9ea9b6c09",
|
||||||
"sha256": "0xpzifjkfp49w76qmaylan8q181bs45anmp46l4bwr3lkrr7bpwh",
|
"sha256": "0xpzifjkfp49w76qmaylan8q181bs45anmp46l4bwr3lkrr7bpwh",
|
||||||
"srcDir": "src",
|
"srcDir": "src",
|
||||||
|
@ -53,11 +61,32 @@
|
||||||
"preserves"
|
"preserves"
|
||||||
],
|
],
|
||||||
"path": "/nix/store/6nnn5di5vip1vladlb7z56rbw18d1y7j-source",
|
"path": "/nix/store/6nnn5di5vip1vladlb7z56rbw18d1y7j-source",
|
||||||
"ref": "20240208",
|
|
||||||
"rev": "2825bceecf33a15b9b7942db5331a32cbc39b281",
|
"rev": "2825bceecf33a15b9b7942db5331a32cbc39b281",
|
||||||
"sha256": "145vf46fy3wc52j6vs509fm9bi5lx7c53gskbkpcfbkv82l86dgk",
|
"sha256": "145vf46fy3wc52j6vs509fm9bi5lx7c53gskbkpcfbkv82l86dgk",
|
||||||
"srcDir": "src",
|
"srcDir": "src",
|
||||||
"url": "https://git.syndicate-lang.org/ehmry/preserves-nim/archive/2825bceecf33a15b9b7942db5331a32cbc39b281.tar.gz"
|
"url": "https://git.syndicate-lang.org/ehmry/preserves-nim/archive/2825bceecf33a15b9b7942db5331a32cbc39b281.tar.gz"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"method": "fetchzip",
|
||||||
|
"packages": [
|
||||||
|
"stew"
|
||||||
|
],
|
||||||
|
"path": "/nix/store/mqg8qzsbcc8xqabq2yzvlhvcyqypk72c-source",
|
||||||
|
"rev": "3c91b8694e15137a81ec7db37c6c58194ec94a6a",
|
||||||
|
"sha256": "17lfhfxp5nxvld78xa83p258y80ks5jb4n53152cdr57xk86y07w",
|
||||||
|
"srcDir": "",
|
||||||
|
"url": "https://github.com/status-im/nim-stew/archive/3c91b8694e15137a81ec7db37c6c58194ec94a6a.tar.gz"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"method": "fetchzip",
|
||||||
|
"packages": [
|
||||||
|
"sys"
|
||||||
|
],
|
||||||
|
"path": "/nix/store/syhxsjlsdqfap0hk4qp3s6kayk8cqknd-source",
|
||||||
|
"rev": "4ef3b624db86e331ba334e705c1aa235d55b05e1",
|
||||||
|
"sha256": "1q4qgw4an4mmmcbx48l6xk1jig1vc8p9cq9dbx39kpnb0890j32q",
|
||||||
|
"srcDir": "src",
|
||||||
|
"url": "https://github.com/ehmry/nim-sys/archive/4ef3b624db86e331ba334e705c1aa235d55b05e1.tar.gz"
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,7 +3,8 @@
|
||||||
|
|
||||||
## This module implements the `Syndicate DSL <https://syndicate-lang.org/doc/syndicate/>`_.
|
## This module implements the `Syndicate DSL <https://syndicate-lang.org/doc/syndicate/>`_.
|
||||||
|
|
||||||
import std/[asyncdispatch, macros, tables, typetraits]
|
import std/[macros, tables, typetraits]
|
||||||
|
import pkg/sys/ioqueue
|
||||||
|
|
||||||
import preserves
|
import preserves
|
||||||
export fromPreserves, toPreserves
|
export fromPreserves, toPreserves
|
||||||
|
@ -34,21 +35,21 @@ proc `??`*(pat: Pattern; bindings: openArray[(int, Pattern)]): Pattern {.inline.
|
||||||
patterns.inject(pat, bindings)
|
patterns.inject(pat, bindings)
|
||||||
|
|
||||||
type
|
type
|
||||||
PublishProc = proc (turn: var Turn; v: Value; h: Handle) {.closure, gcsafe.}
|
PublishProc = proc (turn: var Turn; v: Value; h: Handle) {.closure.}
|
||||||
RetractProc = proc (turn: var Turn; h: Handle) {.closure, gcsafe.}
|
RetractProc = proc (turn: var Turn; h: Handle) {.closure.}
|
||||||
MessageProc = proc (turn: var Turn; v: Value) {.closure, gcsafe.}
|
MessageProc = proc (turn: var Turn; v: Value) {.closure.}
|
||||||
ClosureEntity = ref object of Entity
|
ClosureEntity = ref object of Entity
|
||||||
publishImpl: PublishProc
|
publishImpl: PublishProc
|
||||||
retractImpl: RetractProc
|
retractImpl: RetractProc
|
||||||
messageImpl: MessageProc
|
messageImpl: MessageProc
|
||||||
|
|
||||||
method publish(e: ClosureEntity; turn: var Turn; a: AssertionRef; h: Handle) {.gcsafe.} =
|
method publish(e: ClosureEntity; turn: var Turn; a: AssertionRef; h: Handle) =
|
||||||
if not e.publishImpl.isNil: e.publishImpl(turn, a.value, h)
|
if not e.publishImpl.isNil: e.publishImpl(turn, a.value, h)
|
||||||
|
|
||||||
method retract(e: ClosureEntity; turn: var Turn; h: Handle) {.gcsafe.} =
|
method retract(e: ClosureEntity; turn: var Turn; h: Handle) =
|
||||||
if not e.retractImpl.isNil: e.retractImpl(turn, h)
|
if not e.retractImpl.isNil: e.retractImpl(turn, h)
|
||||||
|
|
||||||
method message(e: ClosureEntity; turn: var Turn; a: AssertionRef) {.gcsafe.} =
|
method message(e: ClosureEntity; turn: var Turn; a: AssertionRef) =
|
||||||
if not e.messageImpl.isNil: e.messageImpl(turn, a.value)
|
if not e.messageImpl.isNil: e.messageImpl(turn, a.value)
|
||||||
|
|
||||||
proc argumentCount(handler: NimNode): int =
|
proc argumentCount(handler: NimNode): int =
|
||||||
|
@ -187,16 +188,7 @@ macro during*(turn: untyped; ds: Cap; pattern: Pattern; publishBody: untyped) =
|
||||||
`callbackProc`
|
`callbackProc`
|
||||||
discard observe(`turn`, `ds`, `pattern`, during(`callbackSym`))
|
discard observe(`turn`, `ds`, `pattern`, during(`callbackSym`))
|
||||||
|
|
||||||
type BootProc = proc (turn: var Turn; ds: Cap) {.gcsafe.}
|
proc runActor*(name: string; bootProc: TurnAction) =
|
||||||
type DeprecatedBootProc = proc (ds: Cap; turn: var Turn) {.gcsafe.}
|
## Boot an actor `Actor` and churn ioqueue once.
|
||||||
|
let actor = bootActor(name, bootProc)
|
||||||
proc runActor*(name: string; bootProc: BootProc) =
|
ioqueue.run()
|
||||||
## Run an `Actor` to completion.
|
|
||||||
let actor = bootDataspace(name, bootProc)
|
|
||||||
while actor.running:
|
|
||||||
waitFor sleepAsync(500)
|
|
||||||
|
|
||||||
proc runActor*(name: string; bootProc: DeprecatedBootProc) {.deprecated.} =
|
|
||||||
## Run an `Actor` to completion.
|
|
||||||
runActor(name) do (turn: var Turn, ds: Cap):
|
|
||||||
bootProc(ds, turn)
|
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
# SPDX-FileCopyrightText: ☭ Emery Hemingway
|
# SPDX-FileCopyrightText: ☭ Emery Hemingway
|
||||||
# SPDX-License-Identifier: Unlicense
|
# SPDX-License-Identifier: Unlicense
|
||||||
|
|
||||||
import std/[asyncfutures, hashes, monotimes, options, sets, tables, times]
|
import std/[hashes, monotimes, options, sets, tables, times]
|
||||||
import preserves
|
import preserves
|
||||||
import ../syndicate/protocols/[protocol, sturdy]
|
import ../syndicate/protocols/[protocol, sturdy]
|
||||||
|
|
||||||
|
@ -66,7 +66,7 @@ type
|
||||||
turnIdAllocator: ref TurnId
|
turnIdAllocator: ref TurnId
|
||||||
traceStream: FileStream
|
traceStream: FileStream
|
||||||
|
|
||||||
TurnAction* = proc (t: var Turn) {.gcsafe.}
|
TurnAction* = proc (t: var Turn) {.closure.}
|
||||||
|
|
||||||
Queues = TableRef[Facet, seq[TurnAction]]
|
Queues = TableRef[Facet, seq[TurnAction]]
|
||||||
|
|
||||||
|
@ -108,10 +108,10 @@ when tracing:
|
||||||
result.add f.id.toPreserves
|
result.add f.id.toPreserves
|
||||||
f = f.parent
|
f = f.parent
|
||||||
|
|
||||||
method publish*(e: Entity; turn: var Turn; v: AssertionRef; h: Handle) {.base, gcsafe.} = discard
|
method publish*(e: Entity; turn: var Turn; v: AssertionRef; h: Handle) {.base.} = discard
|
||||||
method retract*(e: Entity; turn: var Turn; h: Handle) {.base, gcsafe.} = discard
|
method retract*(e: Entity; turn: var Turn; h: Handle) {.base.} = discard
|
||||||
method message*(e: Entity; turn: var Turn; v: AssertionRef) {.base, gcsafe.} = discard
|
method message*(e: Entity; turn: var Turn; v: AssertionRef) {.base.} = discard
|
||||||
method sync*(e: Entity; turn: var Turn; peer: Cap) {.base, gcsafe.} = discard
|
method sync*(e: Entity; turn: var Turn; peer: Cap) {.base.} = discard
|
||||||
|
|
||||||
using
|
using
|
||||||
actor: Actor
|
actor: Actor
|
||||||
|
@ -171,7 +171,7 @@ proc match(bindings: var Bindings; p: Pattern; v: Value): bool =
|
||||||
of PatternKind.Patom:
|
of PatternKind.Patom:
|
||||||
result = case p.patom
|
result = case p.patom
|
||||||
of PAtom.Boolean: v.isBoolean
|
of PAtom.Boolean: v.isBoolean
|
||||||
of PAtom.Double: v.isDouble
|
of PAtom.Double: v.isFloat
|
||||||
of PAtom.Signedinteger: v.isInteger
|
of PAtom.Signedinteger: v.isInteger
|
||||||
of PAtom.String: v.isString
|
of PAtom.String: v.isString
|
||||||
of PAtom.Bytestring: v.isByteString
|
of PAtom.Bytestring: v.isByteString
|
||||||
|
@ -278,8 +278,7 @@ proc runRewrites*(a: Attenuation; v: Value): Value =
|
||||||
proc publish(turn: var Turn; r: Cap; v: Value; h: Handle) =
|
proc publish(turn: var Turn; r: Cap; v: Value; h: Handle) =
|
||||||
var a = runRewrites(r.attenuation, v)
|
var a = runRewrites(r.attenuation, v)
|
||||||
if not a.isFalse:
|
if not a.isFalse:
|
||||||
let e = OutboundAssertion(
|
let e = OutboundAssertion(handle: h, peer: r)
|
||||||
handle: h, peer: r, established: false)
|
|
||||||
turn.facet.outbound[h] = e
|
turn.facet.outbound[h] = e
|
||||||
enqueue(turn, r.relay) do (turn: var Turn):
|
enqueue(turn, r.relay) do (turn: var Turn):
|
||||||
e.established = true
|
e.established = true
|
||||||
|
@ -341,9 +340,9 @@ proc replace*[T](turn: var Turn; cap: Cap; h: var Handle; v: T): Handle {.discar
|
||||||
retract(turn, old)
|
retract(turn, old)
|
||||||
h
|
h
|
||||||
|
|
||||||
proc stop*(turn: var Turn) {.gcsafe.}
|
proc stop*(turn: var Turn)
|
||||||
|
|
||||||
proc run*(facet; action: TurnAction; zombieTurn = false) {.gcsafe.}
|
proc run*(facet; action: TurnAction; zombieTurn = false)
|
||||||
|
|
||||||
proc newFacet(actor; parent: Facet; initialAssertions: OutboundTable): Facet =
|
proc newFacet(actor; parent: Facet; initialAssertions: OutboundTable): Facet =
|
||||||
result = Facet(
|
result = Facet(
|
||||||
|
@ -363,7 +362,7 @@ proc isInert(facet): bool =
|
||||||
(facet.outbound.len == 0 or facet.parent.isNil) and
|
(facet.outbound.len == 0 or facet.parent.isNil) and
|
||||||
facet.inertCheckPreventers == 0
|
facet.inertCheckPreventers == 0
|
||||||
|
|
||||||
proc preventInertCheck*(facet): (proc() {.gcsafe.}) {.discardable.} =
|
proc preventInertCheck*(facet): (proc()) {.discardable.} =
|
||||||
var armed = true
|
var armed = true
|
||||||
inc facet.inertCheckPreventers
|
inc facet.inertCheckPreventers
|
||||||
proc disarm() =
|
proc disarm() =
|
||||||
|
@ -378,9 +377,9 @@ proc inFacet(turn: var Turn; facet; act: TurnAction) =
|
||||||
var t = Turn(facet: facet, queues: turn.queues)
|
var t = Turn(facet: facet, queues: turn.queues)
|
||||||
act(t)
|
act(t)
|
||||||
|
|
||||||
proc terminate(actor; turn; reason: ref Exception) {.gcsafe.}
|
proc terminate(actor; turn; reason: ref Exception)
|
||||||
|
|
||||||
proc terminate(facet; turn: var Turn; orderly: bool) {.gcsafe.} =
|
proc terminate(facet; turn: var Turn; orderly: bool) =
|
||||||
if facet.isAlive:
|
if facet.isAlive:
|
||||||
facet.isAlive = false
|
facet.isAlive = false
|
||||||
let parent = facet.parent
|
let parent = facet.parent
|
||||||
|
@ -458,7 +457,7 @@ proc bootActor*(name: string; bootProc: TurnAction): Actor =
|
||||||
else: result.traceStream = openFileStream(path, fmWrite)
|
else: result.traceStream = openFileStream(path, fmWrite)
|
||||||
run(result, bootProc, initialAssertions)
|
run(result, bootProc, initialAssertions)
|
||||||
|
|
||||||
proc spawn*(name: string; turn: var Turn; bootProc: TurnAction; initialAssertions = initHashSet[Handle]()): Actor {.discardable.} =
|
proc spawnActor*(name: string; turn: var Turn; bootProc: TurnAction; initialAssertions = initHashSet[Handle]()): Actor {.discardable.} =
|
||||||
let actor = newActor(name, turn.facet.actor.handleAllocator)
|
let actor = newActor(name, turn.facet.actor.handleAllocator)
|
||||||
enqueue(turn, turn.facet) do (turn: var Turn):
|
enqueue(turn, turn.facet) do (turn: var Turn):
|
||||||
var newOutBound: Table[Handle, OutboundAssertion]
|
var newOutBound: Table[Handle, OutboundAssertion]
|
||||||
|
@ -473,6 +472,9 @@ proc spawn*(name: string; turn: var Turn; bootProc: TurnAction; initialAssertion
|
||||||
run(actor, bootProc, newOutBound)
|
run(actor, bootProc, newOutBound)
|
||||||
actor
|
actor
|
||||||
|
|
||||||
|
proc spawn*(name: string; turn: var Turn; bootProc: TurnAction; initialAssertions = initHashSet[Handle]()): Actor {.discardable.} =
|
||||||
|
spawnActor(name, turn, bootProc, initialAssertions)
|
||||||
|
|
||||||
proc newInertCap*(): Cap =
|
proc newInertCap*(): Cap =
|
||||||
let a = bootActor("inert") do (turn: var Turn): turn.stop()
|
let a = bootActor("inert") do (turn: var Turn): turn.stop()
|
||||||
Cap(relay: a.root)
|
Cap(relay: a.root)
|
||||||
|
@ -493,13 +495,14 @@ proc terminate(actor; turn; reason: ref Exception) =
|
||||||
proc finish(turn: var Turn) =
|
proc finish(turn: var Turn) =
|
||||||
actor.root.terminate(turn, reason.isNil)
|
actor.root.terminate(turn, reason.isNil)
|
||||||
actor.exited = true
|
actor.exited = true
|
||||||
callSoon do ():
|
block:
|
||||||
run(actor.root, finish, true)
|
run(actor.root, finish, true)
|
||||||
|
|
||||||
proc terminate*(facet; e: ref Exception) =
|
proc terminate*(facet; e: ref Exception) =
|
||||||
run(facet.actor.root) do (turn: var Turn):
|
run(facet.actor.root) do (turn: var Turn):
|
||||||
facet.actor.terminate(turn, e)
|
facet.actor.terminate(turn, e)
|
||||||
|
|
||||||
|
#[
|
||||||
proc asyncCheck*(facet: Facet; fut: FutureBase) =
|
proc asyncCheck*(facet: Facet; fut: FutureBase) =
|
||||||
## Sets a callback on `fut` which propagates exceptions to `facet`.
|
## Sets a callback on `fut` which propagates exceptions to `facet`.
|
||||||
addCallback(fut) do ():
|
addCallback(fut) do ():
|
||||||
|
@ -508,13 +511,14 @@ proc asyncCheck*(facet: Facet; fut: FutureBase) =
|
||||||
proc asyncCheck*(turn; fut: FutureBase) =
|
proc asyncCheck*(turn; fut: FutureBase) =
|
||||||
## Sets a callback on `fut` which propagates exceptions to the facet of `turn`.
|
## Sets a callback on `fut` which propagates exceptions to the facet of `turn`.
|
||||||
asyncCheck(turn.facet, fut)
|
asyncCheck(turn.facet, fut)
|
||||||
|
]#
|
||||||
|
|
||||||
template tryFacet(facet; body: untyped) =
|
template tryFacet(facet; body: untyped) =
|
||||||
try: body
|
try: body
|
||||||
except CatchableError as err: terminate(facet, err)
|
except CatchableError as err: terminate(facet, err)
|
||||||
|
|
||||||
proc run*(facet; action: TurnAction; zombieTurn = false) =
|
proc run*(facet; action: TurnAction; zombieTurn = false) =
|
||||||
if zombieTurn or (facet.actor.exitReason.isNil and facet.isAlive):
|
if true and zombieTurn or (facet.actor.exitReason.isNil and facet.isAlive):
|
||||||
tryFacet(facet):
|
tryFacet(facet):
|
||||||
var queues = newTable[Facet, seq[TurnAction]]()
|
var queues = newTable[Facet, seq[TurnAction]]()
|
||||||
block:
|
block:
|
||||||
|
@ -531,6 +535,7 @@ proc run*(cap: Cap; action: TurnAction) =
|
||||||
## Convenience proc to run a `TurnAction` in the scope of a `Cap`.
|
## Convenience proc to run a `TurnAction` in the scope of a `Cap`.
|
||||||
run(cap.relay, action)
|
run(cap.relay, action)
|
||||||
|
|
||||||
|
#[
|
||||||
proc addCallback*(fut: FutureBase; facet: Facet; act: TurnAction) =
|
proc addCallback*(fut: FutureBase; facet: Facet; act: TurnAction) =
|
||||||
## Add a callback to a `Future` that will be called at a later `Turn`
|
## Add a callback to a `Future` that will be called at a later `Turn`
|
||||||
## within the context of `facet`.
|
## within the context of `facet`.
|
||||||
|
@ -555,7 +560,7 @@ proc addCallback*(fut: FutureBase; turn: var Turn; act: TurnAction) =
|
||||||
else:
|
else:
|
||||||
addCallback(fut, turn.facet, act)
|
addCallback(fut, turn.facet, act)
|
||||||
|
|
||||||
proc addCallback*[T](fut: Future[T]; turn: var Turn; act: proc (t: var Turn, x: T) {.gcsafe.}) =
|
proc addCallback*[T](fut: Future[T]; turn: var Turn; act: proc (t: var Turn, x: T) {.closure.}) =
|
||||||
addCallback(fut, turn) do (turn: var Turn):
|
addCallback(fut, turn) do (turn: var Turn):
|
||||||
if fut.failed: terminate(turn.facet, fut.error)
|
if fut.failed: terminate(turn.facet, fut.error)
|
||||||
else:
|
else:
|
||||||
|
@ -563,6 +568,7 @@ proc addCallback*[T](fut: Future[T]; turn: var Turn; act: proc (t: var Turn, x:
|
||||||
turn.desc.cause = TurnCause(orKind: TurnCauseKind.external)
|
turn.desc.cause = TurnCause(orKind: TurnCauseKind.external)
|
||||||
turn.desc.cause.external.description = "Future".toPreserves
|
turn.desc.cause.external.description = "Future".toPreserves
|
||||||
act(turn, read fut)
|
act(turn, read fut)
|
||||||
|
]#
|
||||||
|
|
||||||
proc stop*(turn: var Turn, facet: Facet) =
|
proc stop*(turn: var Turn, facet: Facet) =
|
||||||
if facet.parent.isNil:
|
if facet.parent.isNil:
|
||||||
|
@ -578,10 +584,17 @@ proc onStop*(facet: Facet; act: TurnAction) =
|
||||||
## Add a `proc (turn: var Turn)` action to `facet` to be called as it stops.
|
## Add a `proc (turn: var Turn)` action to `facet` to be called as it stops.
|
||||||
add(facet.shutdownActions, act)
|
add(facet.shutdownActions, act)
|
||||||
|
|
||||||
|
proc stopActor*(facet: Facet) =
|
||||||
|
run(facet) do (turn: var Turn):
|
||||||
|
let actor = facet.actor
|
||||||
|
enqueue(turn, actor.root) do (turn: var Turn):
|
||||||
|
terminate(actor, turn, nil)
|
||||||
|
|
||||||
proc stopActor*(turn: var Turn) =
|
proc stopActor*(turn: var Turn) =
|
||||||
let actor = turn.facet.actor
|
stopActor(turn.facet)
|
||||||
enqueue(turn, actor.root) do (turn: var Turn):
|
|
||||||
terminate(actor, turn, nil)
|
proc stop*(actor: Actor) =
|
||||||
|
stopActor(actor.root)
|
||||||
|
|
||||||
proc freshen*(turn: var Turn, act: TurnAction) =
|
proc freshen*(turn: var Turn, act: TurnAction) =
|
||||||
assert(turn.queues.len == 0, "Attempt to freshen a non-stale Turn")
|
assert(turn.queues.len == 0, "Attempt to freshen a non-stale Turn")
|
||||||
|
|
|
@ -1,35 +1,115 @@
|
||||||
# SPDX-FileCopyrightText: ☭ Emery Hemingway
|
# SPDX-FileCopyrightText: ☭ Emery Hemingway
|
||||||
# SPDX-License-Identifier: Unlicense
|
# SPDX-License-Identifier: Unlicense
|
||||||
|
|
||||||
import std/[asyncdispatch, monotimes, times]
|
import std/[sets, times]
|
||||||
|
import pkg/sys/[handles, ioqueue]
|
||||||
import preserves
|
import preserves
|
||||||
import syndicate
|
import ../../syndicate, ../bags, ../protocols/[timer, dataspace]
|
||||||
|
|
||||||
import ../protocols/timer
|
|
||||||
from syndicate/protocols/dataspace import Observe
|
|
||||||
|
|
||||||
export timer
|
export timer
|
||||||
|
|
||||||
type Observe = dataspace.Observe
|
type
|
||||||
|
Observe = dataspace.Observe
|
||||||
|
|
||||||
proc now: float64 = getTime().toUnixFloat()
|
when defined(linux):
|
||||||
|
import std/[oserrors, posix]
|
||||||
|
type Time = posix.Time
|
||||||
|
|
||||||
proc spawnTimers*(turn: var Turn; ds: Cap): Actor {.discardable.} =
|
{.pragma: timerfd, importc, header: "<sys/timerfd.h>".}
|
||||||
## Spawn a timer actor.
|
|
||||||
spawn("timer", turn) do (turn: var Turn):
|
|
||||||
|
|
||||||
during(turn, ds, inject(grab Observe(pattern: dropType LaterThan), {0: grabLit()})) do (seconds: float):
|
proc timerfd_create(clock_id: ClockId, flags: cint): cint {.timerfd.}
|
||||||
let period = seconds - now()
|
proc timerfd_settime(ufd: cint, flags: cint,
|
||||||
if period < 0.001:
|
utmr: var Itimerspec, otmr: var Itimerspec): cint {.timerfd.}
|
||||||
discard publish(turn, ds, LaterThan(seconds: seconds))
|
proc timerfd_gettime(ufd: cint, curr: var Itimerspec): cint {.timerfd.}
|
||||||
else:
|
|
||||||
addCallback(sleepAsync(period * 1_000), turn) do (turn: var Turn):
|
var
|
||||||
discard publish(turn, ds, LaterThan(seconds: seconds))
|
TFD_NONBLOCK {.timerfd.}: cint
|
||||||
|
TFD_CLOEXEC {.timerfd.}: cint
|
||||||
|
TFD_TIMER_ABSTIME {.timerfd.}: cint
|
||||||
|
|
||||||
|
proc `<`(a, b: Timespec): bool =
|
||||||
|
a.tv_sec.clong < b.tv_sec.clong or
|
||||||
|
(a.tv_sec.clong == b.tv_sec.clong and a.tv_nsec < b.tv_nsec)
|
||||||
|
|
||||||
|
proc `+`(a, b: Timespec): Timespec =
|
||||||
|
result.tv_sec = Time a.tv_sec.clong + b.tv_sec.clong
|
||||||
|
result.tv_nsec = a.tv_nsec + b.tv_nsec
|
||||||
|
|
||||||
|
func toFloat(ts: Timespec): float =
|
||||||
|
ts.tv_sec.float + ts.tv_nsec.float / 1_000_000_000
|
||||||
|
|
||||||
|
func toTimespec(f: float): Timespec =
|
||||||
|
result.tv_sec = Time(f)
|
||||||
|
result.tv_nsec = clong(uint64(f * 1_000_000_000) mod 1_000_000_000)
|
||||||
|
|
||||||
|
proc clock_realtime: Timespec =
|
||||||
|
if clock_gettime(CLOCK_REALTIME, result) < 0:
|
||||||
|
raiseOSError(osLastError(), "clock_gettime")
|
||||||
|
|
||||||
|
type
|
||||||
|
TimerDriver = ref object
|
||||||
|
facet: Facet
|
||||||
|
## Owning facet of driver.
|
||||||
|
target: Cap
|
||||||
|
## Destination for LaterThan assertions.
|
||||||
|
deadlines: Bag[float]
|
||||||
|
## Deadlines that other actors are observing.
|
||||||
|
timers: HashSet[cint]
|
||||||
|
# TODO: use a single timer descriptor
|
||||||
|
|
||||||
|
proc spawnTimerDriver(facet: Facet; cap: Cap): TimerDriver =
|
||||||
|
let driver = TimerDriver(facet: facet, target: cap)
|
||||||
|
facet.onStop do (turn: var Turn):
|
||||||
|
for fd in driver.timers:
|
||||||
|
unregister(FD fd)
|
||||||
|
discard close(fd)
|
||||||
|
driver
|
||||||
|
|
||||||
|
proc earliestFloat(driver: TimerDriver): float =
|
||||||
|
assert driver.deadlines.len > 0
|
||||||
|
result = high float
|
||||||
|
for deadline in driver.deadlines:
|
||||||
|
if deadline < result:
|
||||||
|
result = deadline
|
||||||
|
|
||||||
|
proc await(driver: TimerDriver; deadline: float) {.asyncio.} =
|
||||||
|
## Run timer driver concurrently with actor.
|
||||||
|
let fd = timerfd_create(CLOCK_REALTIME, TFD_NONBLOCK or TFD_CLOEXEC)
|
||||||
|
if fd < 0:
|
||||||
|
raiseOSError(osLastError(), "failed to acquire timer descriptor")
|
||||||
|
var
|
||||||
|
old: Itimerspec
|
||||||
|
its = Itimerspec(it_value: deadline.toTimespec)
|
||||||
|
if timerfd_settime(fd, TFD_TIMER_ABSTIME, its, old) < 0:
|
||||||
|
raiseOSError(osLastError(), "failed to set timeout")
|
||||||
|
driver.timers.incl(fd)
|
||||||
|
while clock_realtime() < its.it_value:
|
||||||
|
# Check if the timer is expired which
|
||||||
|
# could happen before waiting.
|
||||||
|
wait(FD fd, Read)
|
||||||
|
if deadline in driver.deadlines:
|
||||||
|
# Check if the deadline is still observed.
|
||||||
|
proc turnWork(turn: var Turn) =
|
||||||
|
discard publish(turn, driver.target, LaterThan(seconds: deadline))
|
||||||
|
run(driver.facet, turnWork)
|
||||||
|
discard close(fd)
|
||||||
|
driver.timers.excl(fd)
|
||||||
|
|
||||||
|
proc spawnTimerActor*(ds: Cap; turn: var Turn): Actor {.discardable.} =
|
||||||
|
## Spawn a timer actor that responds to
|
||||||
|
## dataspace observations of timeouts on `ds`.
|
||||||
|
spawnActor("timers", turn) do (turn: var Turn):
|
||||||
|
let driver = spawnTimerDriver(turn.facet, ds)
|
||||||
|
let pat = inject(grab Observe(pattern: dropType LaterThan), {0: grabLit()})
|
||||||
|
during(turn, ds, pat) do (deadline: float):
|
||||||
|
if change(driver.deadlines, deadline, +1) == cdAbsentToPresent:
|
||||||
|
discard trampoline(whelp await(driver, deadline))
|
||||||
|
do:
|
||||||
|
discard change(driver.deadlines, deadline, -1, clamp = true)
|
||||||
|
# TODO: retract assertions that are unobserved.
|
||||||
|
|
||||||
proc after*(turn: var Turn; ds: Cap; dur: Duration; act: TurnAction) =
|
proc after*(turn: var Turn; ds: Cap; dur: Duration; act: TurnAction) =
|
||||||
## Execute `act` after some duration of time.
|
## Execute `act` after some duration of time.
|
||||||
let later = now() + dur.inMilliseconds.float64 * 1_000.0
|
var later = clock_realtime().toFloat() + dur.inMilliseconds.float / 1_000.0
|
||||||
onPublish(turn, ds, grab LaterThan(seconds: later)):
|
onPublish(turn, ds, grab LaterThan(seconds: later)):
|
||||||
act(turn)
|
act(turn)
|
||||||
|
|
||||||
# TODO: periodic timer
|
|
||||||
|
|
|
@ -46,3 +46,5 @@ proc `$`*(bag: Bag): string =
|
||||||
if result.len > 1: result.add ' '
|
if result.len > 1: result.add ' '
|
||||||
result.add $x
|
result.add $x
|
||||||
result.add '}'
|
result.add '}'
|
||||||
|
|
||||||
|
export tables.contains, tables.del, tables.len
|
||||||
|
|
|
@ -51,7 +51,7 @@ iterator observersOf[Sid, Oid](g: Graph[Sid, Oid]; oid: Oid): Sid =
|
||||||
if g.edgesForward.hasKey(oid):
|
if g.edgesForward.hasKey(oid):
|
||||||
for sid in g.edgesForward[oid]: yield sid
|
for sid in g.edgesForward[oid]: yield sid
|
||||||
|
|
||||||
proc repairDamage*[Sid, Oid](g: var Graph[Sid, Oid]; repairNode: proc (sid: Sid) {.gcsafe.}) =
|
proc repairDamage*[Sid, Oid](g: var Graph[Sid, Oid]; repairNode: proc (sid: Sid) {.closure.}) =
|
||||||
var repairedThisRound: Set[Oid]
|
var repairedThisRound: Set[Oid]
|
||||||
while true:
|
while true:
|
||||||
var workSet = move g.damagedNodes
|
var workSet = move g.damagedNodes
|
||||||
|
|
|
@ -16,14 +16,14 @@ type
|
||||||
index: Index
|
index: Index
|
||||||
handleMap: Table[Handle, Assertion]
|
handleMap: Table[Handle, Assertion]
|
||||||
|
|
||||||
method publish(ds: Dataspace; turn: var Turn; a: AssertionRef; h: Handle) {.gcsafe.} =
|
method publish(ds: Dataspace; turn: var Turn; a: AssertionRef; h: Handle) =
|
||||||
if add(ds.index, turn, a.value):
|
if add(ds.index, turn, a.value):
|
||||||
var obs = a.value.preservesTo(Observe)
|
var obs = a.value.preservesTo(Observe)
|
||||||
if obs.isSome and obs.get.observer of Cap:
|
if obs.isSome and obs.get.observer of Cap:
|
||||||
ds.index.add(turn, obs.get.pattern, Cap(obs.get.observer))
|
ds.index.add(turn, obs.get.pattern, Cap(obs.get.observer))
|
||||||
ds.handleMap[h] = a.value
|
ds.handleMap[h] = a.value
|
||||||
|
|
||||||
method retract(ds: Dataspace; turn: var Turn; h: Handle) {.gcsafe.} =
|
method retract(ds: Dataspace; turn: var Turn; h: Handle) =
|
||||||
let v = ds.handleMap[h]
|
let v = ds.handleMap[h]
|
||||||
if remove(ds.index, turn, v):
|
if remove(ds.index, turn, v):
|
||||||
ds.handleMap.del h
|
ds.handleMap.del h
|
||||||
|
@ -31,14 +31,14 @@ method retract(ds: Dataspace; turn: var Turn; h: Handle) {.gcsafe.} =
|
||||||
if obs.isSome and obs.get.observer of Cap:
|
if obs.isSome and obs.get.observer of Cap:
|
||||||
ds.index.remove(turn, obs.get.pattern, Cap(obs.get.observer))
|
ds.index.remove(turn, obs.get.pattern, Cap(obs.get.observer))
|
||||||
|
|
||||||
method message(ds: Dataspace; turn: var Turn; a: AssertionRef) {.gcsafe.} =
|
method message(ds: Dataspace; turn: var Turn; a: AssertionRef) =
|
||||||
ds.index.deliverMessage(turn, a.value)
|
ds.index.deliverMessage(turn, a.value)
|
||||||
|
|
||||||
proc newDataspace*(turn: var Turn): Cap =
|
proc newDataspace*(turn: var Turn): Cap =
|
||||||
newCap(turn, Dataspace(index: initIndex()))
|
newCap(turn, Dataspace(index: initIndex()))
|
||||||
|
|
||||||
type BootProc = proc (turn: var Turn; ds: Cap) {.gcsafe.}
|
type BootProc = proc (turn: var Turn; ds: Cap) {.closure.}
|
||||||
type DeprecatedBootProc = proc (ds: Cap; turn: var Turn) {.gcsafe.}
|
type DeprecatedBootProc = proc (ds: Cap; turn: var Turn) {.closure.}
|
||||||
|
|
||||||
proc bootDataspace*(name: string; bootProc: BootProc): Actor =
|
proc bootDataspace*(name: string; bootProc: BootProc): Actor =
|
||||||
bootActor(name) do (turn: var Turn):
|
bootActor(name) do (turn: var Turn):
|
||||||
|
|
|
@ -6,7 +6,7 @@ import preserves
|
||||||
import ./actors, ./patterns, ./protocols/dataspace
|
import ./actors, ./patterns, ./protocols/dataspace
|
||||||
|
|
||||||
type
|
type
|
||||||
DuringProc* = proc (turn: var Turn; a: Value; h: Handle): TurnAction {.gcsafe.}
|
DuringProc* = proc (turn: var Turn; a: Value; h: Handle): TurnAction
|
||||||
DuringActionKind = enum null, dead, act
|
DuringActionKind = enum null, dead, act
|
||||||
DuringAction = object
|
DuringAction = object
|
||||||
case kind: DuringActionKind
|
case kind: DuringActionKind
|
||||||
|
|
|
@ -313,7 +313,7 @@ proc grabDictionary*(bindings: sink openArray[(string, Pattern)]): Pattern =
|
||||||
for (key, val) in bindings.items:
|
for (key, val) in bindings.items:
|
||||||
result.dcompound.dict.entries[key.toSymbol] = val
|
result.dcompound.dict.entries[key.toSymbol] = val
|
||||||
|
|
||||||
proc depattern(comp: DCompound; values: var seq[Value]; index: var int): Value {.gcsafe.}
|
proc depattern(comp: DCompound; values: var seq[Value]; index: var int): Value
|
||||||
|
|
||||||
proc depattern(pat: Pattern; values: var seq[Value]; index: var int): Value =
|
proc depattern(pat: Pattern; values: var seq[Value]; index: var int): Value =
|
||||||
case pat.orKind
|
case pat.orKind
|
||||||
|
@ -328,7 +328,7 @@ proc depattern(pat: Pattern; values: var seq[Value]; index: var int): Value =
|
||||||
of PatternKind.DCompound:
|
of PatternKind.DCompound:
|
||||||
result = depattern(pat.dcompound, values, index)
|
result = depattern(pat.dcompound, values, index)
|
||||||
|
|
||||||
proc depattern(comp: DCompound; values: var seq[Value]; index: var int): Value {.gcsafe.} =
|
proc depattern(comp: DCompound; values: var seq[Value]; index: var int): Value =
|
||||||
case comp.orKind
|
case comp.orKind
|
||||||
of DCompoundKind.rec:
|
of DCompoundKind.rec:
|
||||||
result = initRecord(comp.rec.label, comp.rec.fields.len)
|
result = initRecord(comp.rec.label, comp.rec.fields.len)
|
||||||
|
|
|
@ -1,8 +1,9 @@
|
||||||
# SPDX-FileCopyrightText: ☭ Emery Hemingway
|
# SPDX-FileCopyrightText: ☭ Emery Hemingway
|
||||||
# SPDX-License-Identifier: Unlicense
|
# SPDX-License-Identifier: Unlicense
|
||||||
|
|
||||||
import std/[asyncdispatch, options, tables]
|
import std/[options, tables]
|
||||||
from std/os import getEnv, `/`
|
from std/os import getEnv, `/`
|
||||||
|
import pkg/sys/ioqueue
|
||||||
import preserves
|
import preserves
|
||||||
import ../syndicate, /capabilities, ./durings, ./membranes, ./protocols/[gatekeeper, protocol, sturdy, transportAddress]
|
import ../syndicate, /capabilities, ./durings, ./membranes, ./protocols/[gatekeeper, protocol, sturdy, transportAddress]
|
||||||
|
|
||||||
|
@ -16,19 +17,18 @@ else:
|
||||||
|
|
||||||
export `$`
|
export `$`
|
||||||
|
|
||||||
type
|
|
||||||
Oid = sturdy.Oid
|
|
||||||
|
|
||||||
export Stdio, Tcp, WebSocket, Unix
|
export Stdio, Tcp, WebSocket, Unix
|
||||||
|
|
||||||
type
|
type
|
||||||
Assertion = Value
|
Assertion = Value
|
||||||
WireRef = sturdy.WireRef
|
Event = protocol.Event
|
||||||
Turn = syndicate.Turn
|
|
||||||
Handle = actors.Handle
|
Handle = actors.Handle
|
||||||
|
Oid = sturdy.Oid
|
||||||
|
Turn = syndicate.Turn
|
||||||
|
WireRef = sturdy.WireRef
|
||||||
|
|
||||||
PacketWriter = proc (turn: var Turn; buf: seq[byte]) {.closure, gcsafe.}
|
PacketWriter = proc (turn: var Turn; buf: seq[byte]) {.closure.}
|
||||||
RelaySetup = proc (turn: var Turn; relay: Relay) {.closure, gcsafe.}
|
RelaySetup = proc (turn: var Turn; relay: Relay) {.closure.}
|
||||||
|
|
||||||
Relay* = ref object
|
Relay* = ref object
|
||||||
facet: Facet
|
facet: Facet
|
||||||
|
@ -38,7 +38,6 @@ type
|
||||||
exported: Membrane
|
exported: Membrane
|
||||||
imported: Membrane
|
imported: Membrane
|
||||||
nextLocalOid: Oid
|
nextLocalOid: Oid
|
||||||
pendingTurn: protocol.Turn
|
|
||||||
wireBuf: BufferedDecoder
|
wireBuf: BufferedDecoder
|
||||||
packetWriter: PacketWriter
|
packetWriter: PacketWriter
|
||||||
peer: Cap
|
peer: Cap
|
||||||
|
@ -90,7 +89,7 @@ proc rewriteCapOut(relay: Relay; cap: Cap; exported: var seq[WireSymbol]): WireR
|
||||||
mine: WireRefMine(oid: ws.oid))
|
mine: WireRefMine(oid: ws.oid))
|
||||||
|
|
||||||
proc rewriteOut(relay: Relay; v: Assertion):
|
proc rewriteOut(relay: Relay; v: Assertion):
|
||||||
tuple[rewritten: Value, exported: seq[WireSymbol]] {.gcsafe.} =
|
tuple[rewritten: Value, exported: seq[WireSymbol]] =
|
||||||
var exported: seq[WireSymbol]
|
var exported: seq[WireSymbol]
|
||||||
result.rewritten = mapEmbeds(v) do (pr: Value) -> Value:
|
result.rewritten = mapEmbeds(v) do (pr: Value) -> Value:
|
||||||
let o = pr.unembed(Cap); if o.isSome:
|
let o = pr.unembed(Cap); if o.isSome:
|
||||||
|
@ -108,41 +107,39 @@ proc deregister(relay: Relay; h: Handle) =
|
||||||
for e in outbound: releaseCapOut(relay, e)
|
for e in outbound: releaseCapOut(relay, e)
|
||||||
|
|
||||||
proc send(relay: Relay; turn: var Turn; rOid: protocol.Oid; m: Event) =
|
proc send(relay: Relay; turn: var Turn; rOid: protocol.Oid; m: Event) =
|
||||||
if relay.pendingTurn.len == 0:
|
# TODO: don't send right away.
|
||||||
# If the pending queue is empty then schedule a packet
|
var pendingTurn: protocol.Turn
|
||||||
# to be sent after pending I/O is processed.
|
pendingTurn.add TurnEvent(oid: rOid, event: m)
|
||||||
callSoon do ():
|
relay.facet.run do (turn: var Turn):
|
||||||
relay.facet.run do (turn: var Turn):
|
var pkt = Packet(
|
||||||
var pkt = Packet(
|
orKind: PacketKind.Turn,
|
||||||
orKind: PacketKind.Turn,
|
turn: pendingTurn)
|
||||||
turn: move relay.pendingTurn)
|
trace "C: ", pkt
|
||||||
trace "C: ", pkt
|
relay.packetWriter(turn, encode pkt)
|
||||||
relay.packetWriter(turn, encode pkt)
|
|
||||||
relay.pendingTurn.add TurnEvent(oid: rOid, event: m)
|
|
||||||
|
|
||||||
proc send(re: RelayEntity; turn: var Turn; ev: Event) =
|
proc send(re: RelayEntity; turn: var Turn; ev: Event) =
|
||||||
send(re.relay, turn, protocol.Oid re.oid, ev)
|
send(re.relay, turn, protocol.Oid re.oid, ev)
|
||||||
|
|
||||||
method publish(re: RelayEntity; t: var Turn; a: AssertionRef; h: Handle) {.gcsafe.} =
|
method publish(re: RelayEntity; t: var Turn; a: AssertionRef; h: Handle) =
|
||||||
re.send(t, Event(
|
re.send(t, Event(
|
||||||
orKind: EventKind.Assert,
|
orKind: EventKind.Assert,
|
||||||
`assert`: protocol.Assert(
|
`assert`: protocol.Assert(
|
||||||
assertion: re.relay.register(a.value, h).rewritten,
|
assertion: re.relay.register(a.value, h).rewritten,
|
||||||
handle: h)))
|
handle: h)))
|
||||||
|
|
||||||
method retract(re: RelayEntity; t: var Turn; h: Handle) {.gcsafe.} =
|
method retract(re: RelayEntity; t: var Turn; h: Handle) =
|
||||||
re.relay.deregister h
|
re.relay.deregister h
|
||||||
re.send(t, Event(
|
re.send(t, Event(
|
||||||
orKind: EventKind.Retract,
|
orKind: EventKind.Retract,
|
||||||
retract: Retract(handle: h)))
|
retract: Retract(handle: h)))
|
||||||
|
|
||||||
method message(re: RelayEntity; turn: var Turn; msg: AssertionRef) {.gcsafe.} =
|
method message(re: RelayEntity; turn: var Turn; msg: AssertionRef) =
|
||||||
var (value, exported) = rewriteOut(re.relay, msg.value)
|
var (value, exported) = rewriteOut(re.relay, msg.value)
|
||||||
assert(len(exported) == 0, "cannot send a reference in a message")
|
assert(len(exported) == 0, "cannot send a reference in a message")
|
||||||
if len(exported) == 0:
|
if len(exported) == 0:
|
||||||
re.send(turn, Event(orKind: EventKind.Message, message: Message(body: value)))
|
re.send(turn, Event(orKind: EventKind.Message, message: Message(body: value)))
|
||||||
|
|
||||||
method sync(re: RelayEntity; turn: var Turn; peer: Cap) {.gcsafe.} =
|
method sync(re: RelayEntity; turn: var Turn; peer: Cap) =
|
||||||
var
|
var
|
||||||
peerEntity = newSyncPeerEntity(re.relay, peer)
|
peerEntity = newSyncPeerEntity(re.relay, peer)
|
||||||
exported: seq[WireSymbol]
|
exported: seq[WireSymbol]
|
||||||
|
@ -185,7 +182,7 @@ proc rewriteCapIn(relay; facet; n: WireRef, imported: var seq[WireSymbol]): Cap
|
||||||
else: raiseAssert "attenuation not implemented"
|
else: raiseAssert "attenuation not implemented"
|
||||||
|
|
||||||
proc rewriteIn(relay; facet; v: Value):
|
proc rewriteIn(relay; facet; v: Value):
|
||||||
tuple[rewritten: Assertion; imported: seq[WireSymbol]] {.gcsafe.} =
|
tuple[rewritten: Assertion; imported: seq[WireSymbol]] =
|
||||||
var imported: seq[WireSymbol]
|
var imported: seq[WireSymbol]
|
||||||
result.rewritten = mapEmbeds(v) do (pr: Value) -> Value:
|
result.rewritten = mapEmbeds(v) do (pr: Value) -> Value:
|
||||||
let wr = pr.preservesTo WireRef; if wr.isSome:
|
let wr = pr.preservesTo WireRef; if wr.isSome:
|
||||||
|
@ -196,7 +193,7 @@ proc rewriteIn(relay; facet; v: Value):
|
||||||
|
|
||||||
proc close(r: Relay) = discard
|
proc close(r: Relay) = discard
|
||||||
|
|
||||||
proc dispatch(relay: Relay; turn: var Turn; cap: Cap; event: Event) {.gcsafe.} =
|
proc dispatch(relay: Relay; turn: var Turn; cap: Cap; event: Event) =
|
||||||
case event.orKind
|
case event.orKind
|
||||||
of EventKind.Assert:
|
of EventKind.Assert:
|
||||||
let (a, imported) = rewriteIn(relay, turn.facet, event.assert.assertion)
|
let (a, imported) = rewriteIn(relay, turn.facet, event.assert.assertion)
|
||||||
|
@ -224,7 +221,7 @@ proc dispatch(relay: Relay; turn: var Turn; cap: Cap; event: Event) {.gcsafe.} =
|
||||||
for e in imported: relay.imported.del e
|
for e in imported: relay.imported.del e
|
||||||
]#
|
]#
|
||||||
|
|
||||||
proc dispatch(relay: Relay; v: Value) {.gcsafe.} =
|
proc dispatch(relay: Relay; v: Value) =
|
||||||
trace "S: ", v
|
trace "S: ", v
|
||||||
run(relay.facet) do (t: var Turn):
|
run(relay.facet) do (t: var Turn):
|
||||||
var pkt: Packet
|
var pkt: Packet
|
||||||
|
@ -250,8 +247,8 @@ proc dispatch(relay: Relay; v: Value) {.gcsafe.} =
|
||||||
when defined(posix):
|
when defined(posix):
|
||||||
stderr.writeLine("discarding undecoded packet ", v)
|
stderr.writeLine("discarding undecoded packet ", v)
|
||||||
|
|
||||||
proc recv(relay: Relay; buf: seq[byte]) =
|
proc recv(relay: Relay; buf: openarray[byte]; slice: Slice[int]) =
|
||||||
feed(relay.wireBuf, buf)
|
feed(relay.wireBuf, buf, slice)
|
||||||
var pr = decode(relay.wireBuf)
|
var pr = decode(relay.wireBuf)
|
||||||
if pr.isSome: dispatch(relay, pr.get)
|
if pr.isSome: dispatch(relay, pr.get)
|
||||||
|
|
||||||
|
@ -265,7 +262,7 @@ type
|
||||||
nextLocalOid*: Option[Oid]
|
nextLocalOid*: Option[Oid]
|
||||||
|
|
||||||
proc spawnRelay(name: string; turn: var Turn; opts: RelayActorOptions; setup: RelaySetup) =
|
proc spawnRelay(name: string; turn: var Turn; opts: RelayActorOptions; setup: RelaySetup) =
|
||||||
spawn(name, turn) do (turn: var Turn):
|
spawnActor(name, turn) do (turn: var Turn):
|
||||||
let relay = Relay(
|
let relay = Relay(
|
||||||
facet: turn.facet,
|
facet: turn.facet,
|
||||||
packetWriter: opts.packetWriter,
|
packetWriter: opts.packetWriter,
|
||||||
|
@ -300,16 +297,29 @@ proc accepted(cap: Cap): Resolved =
|
||||||
|
|
||||||
when defined(posix):
|
when defined(posix):
|
||||||
|
|
||||||
import std/asyncfile
|
import std/[oserrors, posix]
|
||||||
export Unix
|
import pkg/sys/[files, handles, sockets]
|
||||||
|
export transportAddress.Unix
|
||||||
|
|
||||||
type StdioControlEntity = ref object of Entity
|
type StdioControlEntity = ref object of Entity
|
||||||
|
buf: ref seq[byte]
|
||||||
|
relay: Relay
|
||||||
stdin: AsyncFile
|
stdin: AsyncFile
|
||||||
|
|
||||||
method message(entity: StdioControlEntity; turn: var Turn; ass: AssertionRef) =
|
method message(entity: StdioControlEntity; turn: var Turn; ass: AssertionRef) =
|
||||||
if ass.value.preservesTo(ForceDisconnect).isSome:
|
if ass.value.preservesTo(ForceDisconnect).isSome:
|
||||||
close(entity.stdin)
|
close(entity.stdin)
|
||||||
close(stdout)
|
|
||||||
|
proc loop(entity: StdioControlEntity) {.asyncio.} =
|
||||||
|
new entity.buf
|
||||||
|
entity.buf[].setLen(0x1000)
|
||||||
|
while true:
|
||||||
|
let n = read(entity.stdin, entity.buf)
|
||||||
|
if n == 0:
|
||||||
|
stderr.writeLine "empty read on stdin, stopping actor"
|
||||||
|
stopActor(entity.relay.facet)
|
||||||
|
else:
|
||||||
|
entity.relay.recv(entity.buf[], 0..<n)
|
||||||
|
|
||||||
proc connectTransport(turn: var Turn; ds: Cap; ta: transportAddress.Stdio) =
|
proc connectTransport(turn: var Turn; ds: Cap; ta: transportAddress.Stdio) =
|
||||||
## Connect to an external dataspace over stdio.
|
## Connect to an external dataspace over stdio.
|
||||||
|
@ -327,101 +337,111 @@ when defined(posix):
|
||||||
spawnRelay("stdio", turn, opts) do (turn: var Turn; relay: Relay):
|
spawnRelay("stdio", turn, opts) do (turn: var Turn; relay: Relay):
|
||||||
let
|
let
|
||||||
facet = turn.facet
|
facet = turn.facet
|
||||||
asyncStdin = openAsync("/dev/stdin") # this is universal now?
|
fd = stdin.getOsFileHandle()
|
||||||
|
flags = fcntl(fd.cint, F_GETFL, 0)
|
||||||
|
if flags < 0: raiseOSError(osLastError())
|
||||||
|
if fcntl(fd.cint, F_SETFL, flags or O_NONBLOCK) < 0:
|
||||||
|
raiseOSError(osLastError())
|
||||||
|
let entity = StdioControlEntity(
|
||||||
|
relay: relay, stdin: newAsyncFile(FD fd))
|
||||||
publish(turn, ds, TransportConnection(
|
publish(turn, ds, TransportConnection(
|
||||||
`addr`: ta.toPreserves,
|
`addr`: ta.toPreserves,
|
||||||
control: StdioControlEntity(stdin: asyncStdin).newCap(turn),
|
control: newCap(entity, turn),
|
||||||
resolved: relay.peer.accepted,
|
resolved: relay.peer.accepted,
|
||||||
))
|
))
|
||||||
const stdinReadSize = 0x2000
|
discard trampoline:
|
||||||
proc readCb(pktFut: Future[string]) {.gcsafe.} =
|
whelp loop(entity)
|
||||||
if not pktFut.failed:
|
|
||||||
var buf = pktFut.read
|
|
||||||
if buf.len == 0:
|
|
||||||
run(facet) do (turn: var Turn): stopActor(turn)
|
|
||||||
else:
|
|
||||||
relay.recv(cast[seq[byte]](buf))
|
|
||||||
asyncStdin.read(stdinReadSize).addCallback(readCb)
|
|
||||||
asyncStdin.read(stdinReadSize).addCallback(readCb)
|
|
||||||
|
|
||||||
proc connectStdio*(turn: var Turn; ds: Cap) =
|
proc connectStdio*(turn: var Turn; ds: Cap) =
|
||||||
## Connect to an external dataspace over stdin and stdout.
|
## Connect to an external dataspace over stdin and stdout.
|
||||||
connectTransport(turn, ds, transportAddress.Stdio())
|
connectTransport(turn, ds, transportAddress.Stdio())
|
||||||
|
|
||||||
import std/asyncnet
|
type
|
||||||
from std/nativesockets import AF_INET, AF_UNIX, IPPROTO_TCP, SOCK_STREAM, Protocol
|
TcpEntity = ref object of Entity
|
||||||
|
relay: Relay
|
||||||
|
sock: AsyncConn[sockets.Protocol.TCP]
|
||||||
|
buf: ref seq[byte]
|
||||||
|
alive: bool
|
||||||
|
|
||||||
type SocketControlEntity = ref object of Entity
|
UnixEntity = ref object of Entity
|
||||||
socket: AsyncSocket
|
relay: Relay
|
||||||
|
sock: AsyncConn[sockets.Protocol.Unix]
|
||||||
|
buf: ref seq[byte]
|
||||||
|
alive: bool
|
||||||
|
|
||||||
method message(entity: SocketControlEntity; turn: var Turn; ass: AssertionRef) =
|
SocketEntity = TcpEntity | UnixEntity
|
||||||
|
|
||||||
|
method message(entity: SocketEntity; turn: var Turn; ass: AssertionRef) =
|
||||||
if ass.value.preservesTo(ForceDisconnect).isSome:
|
if ass.value.preservesTo(ForceDisconnect).isSome:
|
||||||
close(entity.socket)
|
reset entity.alive
|
||||||
|
close(entity.sock)
|
||||||
|
|
||||||
type ShutdownEntity* = ref object of Entity
|
template socketLoop() {.dirty.}=
|
||||||
|
new entity.buf
|
||||||
|
entity.buf[].setLen(0x1000)
|
||||||
|
entity.alive = not entity.alive
|
||||||
|
while entity.alive:
|
||||||
|
let n = read(entity.sock, entity.buf)
|
||||||
|
if n < 0: raiseOSError(osLastError())
|
||||||
|
elif n == 0:
|
||||||
|
stderr.writeLine "empty read on socket, stopping actor"
|
||||||
|
stopActor(entity.relay.facet)
|
||||||
|
else:
|
||||||
|
entity.relay.recv(entity.buf[], 0..<n)
|
||||||
|
stderr.writeLine "breaking socketLoop"
|
||||||
|
|
||||||
|
proc loop(entity: TcpEntity) {.asyncio.} =
|
||||||
|
socketLoop()
|
||||||
|
proc loop(entity: UnixEntity) {.asyncio.} =
|
||||||
|
socketLoop()
|
||||||
|
|
||||||
|
type ShutdownEntity = ref object of Entity
|
||||||
method retract(e: ShutdownEntity; turn: var Turn; h: Handle) =
|
method retract(e: ShutdownEntity; turn: var Turn; h: Handle) =
|
||||||
stopActor(turn)
|
stopActor(turn)
|
||||||
|
|
||||||
proc connect(turn: var Turn; ds: Cap; transAddr: Value; socket: AsyncSocket) =
|
template bootSocketEntity() {.dirty.} =
|
||||||
proc socketWriter(turn: var Turn; buf: seq[byte]) =
|
proc publish(turn: var Turn) =
|
||||||
asyncCheck(turn, socket.send(cast[string](buf)))
|
publish(turn, ds, TransportConnection(
|
||||||
|
`addr`: ta.toPreserves,
|
||||||
|
control: newCap(entity, turn),
|
||||||
|
resolved: entity.relay.peer.accepted,
|
||||||
|
))
|
||||||
|
run(entity.relay.facet, publish)
|
||||||
|
loop(entity)
|
||||||
|
|
||||||
|
proc boot(entity: TcpEntity; ta: transportAddress.Tcp; ds: Cap) {.asyncio.} =
|
||||||
|
entity.sock = connectTcpAsync(ta.host, Port ta.port)
|
||||||
|
bootSocketEntity()
|
||||||
|
|
||||||
|
proc boot(entity: UnixEntity; ta: transportAddress.Unix; ds: Cap) {.asyncio.} =
|
||||||
|
entity.sock = connectUnixAsync(ta.path)
|
||||||
|
bootSocketEntity()
|
||||||
|
|
||||||
|
template spawnSocketRelay() {.dirty.} =
|
||||||
|
proc writeConn(turn: var Turn; buf: seq[byte]) =
|
||||||
|
discard trampoline:
|
||||||
|
whelp write(entity.sock, buf)
|
||||||
var ops = RelayActorOptions(
|
var ops = RelayActorOptions(
|
||||||
packetWriter: socketWriter,
|
packetWriter: writeConn,
|
||||||
initialOid: 0.Oid.some,
|
initialOid: 0.Oid.some,
|
||||||
)
|
)
|
||||||
spawnRelay("socket", turn, ops) do (turn: var Turn; relay: Relay):
|
spawnRelay("socket", turn, ops) do (turn: var Turn; relay: Relay):
|
||||||
let facet = turn.facet
|
entity.relay = relay
|
||||||
facet.actor.atExit do (turn: var Turn): close(socket)
|
atExit(turn.facet.actor) do (turn: var Turn):
|
||||||
publish(turn, ds, TransportConnection(
|
entity.alive = false
|
||||||
`addr`: transAddr,
|
close(entity.sock)
|
||||||
control: SocketControlEntity(socket: socket).newCap(turn),
|
discard trampoline:
|
||||||
resolved: relay.peer.accepted,
|
whelp boot(entity, ta, ds)
|
||||||
))
|
|
||||||
const recvSize = 0x4000
|
|
||||||
proc recvCb(pktFut: Future[string]) {.gcsafe.} =
|
|
||||||
if pktFut.failed or pktFut.read.len == 0:
|
|
||||||
run(facet) do (turn: var Turn): stopActor(turn)
|
|
||||||
else:
|
|
||||||
relay.recv(cast[seq[byte]](pktFut.read))
|
|
||||||
if not socket.isClosed:
|
|
||||||
socket.recv(recvSize).addCallback(recvCb)
|
|
||||||
socket.recv(recvSize).addCallback(recvCb)
|
|
||||||
|
|
||||||
proc connect(turn: var Turn; ds: Cap; ta: Value; socket: AsyncSocket; fut: Future[void]) =
|
|
||||||
let facet = turn.facet
|
|
||||||
fut.addCallback do ():
|
|
||||||
run(facet) do (turn: var Turn):
|
|
||||||
if fut.failed:
|
|
||||||
var ass = TransportConnection(
|
|
||||||
`addr`: ta,
|
|
||||||
resolved: Resolved(orKind: ResolvedKind.Rejected),
|
|
||||||
)
|
|
||||||
ass.resolved.rejected.detail = embed fut.error
|
|
||||||
publish(turn, ds, ass)
|
|
||||||
else:
|
|
||||||
connect(turn, ds, ta, socket)
|
|
||||||
|
|
||||||
proc connectTransport(turn: var Turn; ds: Cap; ta: transportAddress.Tcp) =
|
proc connectTransport(turn: var Turn; ds: Cap; ta: transportAddress.Tcp) =
|
||||||
let
|
let entity = TcpEntity()
|
||||||
facet = turn.facet
|
spawnSocketRelay()
|
||||||
socket = newAsyncSocket(
|
|
||||||
domain = AF_INET,
|
|
||||||
sockType = SOCK_STREAM,
|
|
||||||
protocol = IPPROTO_TCP,
|
|
||||||
buffered = false,
|
|
||||||
)
|
|
||||||
connect(turn, ds, ta.toPreserves, socket, connect(socket, ta.host, Port ta.port))
|
|
||||||
|
|
||||||
proc connectTransport(turn: var Turn; ds: Cap; ta: transportAddress.Unix) =
|
proc connectTransport(turn: var Turn; ds: Cap; ta: transportAddress.Unix) =
|
||||||
## Relay a dataspace over a UNIX socket.
|
let entity = UnixEntity()
|
||||||
let socket = newAsyncSocket(
|
spawnSocketRelay()
|
||||||
domain = AF_UNIX,
|
|
||||||
sockType = SOCK_STREAM,
|
|
||||||
protocol = cast[Protocol](0),
|
|
||||||
buffered = false)
|
|
||||||
connect(turn, ds, ta.toPreserves, socket, connectUnix(socket, ta.path))
|
|
||||||
|
|
||||||
proc walk(turn: var Turn; ds, origin: Cap; route: Route; transOff, stepOff: int) {.gcsafe.} =
|
proc walk(turn: var Turn; ds, origin: Cap; route: Route; transOff, stepOff: int) =
|
||||||
if stepOff < route.pathSteps.len:
|
if stepOff < route.pathSteps.len:
|
||||||
let
|
let
|
||||||
step = route.pathSteps[stepOff]
|
step = route.pathSteps[stepOff]
|
||||||
|
@ -462,10 +482,10 @@ proc connectRoute(turn: var Turn; ds: Cap; route: Route; transOff: int) =
|
||||||
onPublish(turn, ds, acceptPat) do (origin: Cap):
|
onPublish(turn, ds, acceptPat) do (origin: Cap):
|
||||||
walk(turn, ds, origin, route, transOff, 0)
|
walk(turn, ds, origin, route, transOff, 0)
|
||||||
|
|
||||||
type StepCallback = proc (turn: var Turn; step: Value; origin, next: Cap) {.gcsafe.}
|
type StepCallback = proc (turn: var Turn; step: Value; origin, next: Cap) {.closure.}
|
||||||
|
|
||||||
proc spawnStepResolver(turn: var Turn; ds: Cap; stepType: Value; cb: StepCallback) =
|
proc spawnStepResolver(turn: var Turn; ds: Cap; stepType: Value; cb: StepCallback) =
|
||||||
spawn($stepType & "-step", turn) do (turn: var Turn):
|
spawnActor($stepType & "-step", turn) do (turn: var Turn):
|
||||||
let stepPat = grabRecord(stepType, grab())
|
let stepPat = grabRecord(stepType, grab())
|
||||||
let pat = ?Observe(pattern: ResolvedPathStep?:{1: stepPat}) ?? {0: grabLit(), 1: grab()}
|
let pat = ?Observe(pattern: ResolvedPathStep?:{1: stepPat}) ?? {0: grabLit(), 1: grab()}
|
||||||
during(turn, ds, pat) do (origin: Cap; stepDetail: Literal[Value]):
|
during(turn, ds, pat) do (origin: Cap; stepDetail: Literal[Value]):
|
||||||
|
@ -487,7 +507,7 @@ proc spawnStepResolver(turn: var Turn; ds: Cap; stepType: Value; cb: StepCallbac
|
||||||
|
|
||||||
proc spawnRelays*(turn: var Turn; ds: Cap) =
|
proc spawnRelays*(turn: var Turn; ds: Cap) =
|
||||||
## Spawn actors that manage routes and appeasing gatekeepers.
|
## Spawn actors that manage routes and appeasing gatekeepers.
|
||||||
spawn("transport-connector", turn) do (turn: var Turn):
|
spawnActor("transport-connector", turn) do (turn: var Turn):
|
||||||
let pat = ?Observe(pattern: !TransportConnection) ?? { 0: grab() }
|
let pat = ?Observe(pattern: !TransportConnection) ?? { 0: grab() }
|
||||||
# Use a generic pattern and type matching
|
# Use a generic pattern and type matching
|
||||||
# in the during handler because it is easy.
|
# in the during handler because it is easy.
|
||||||
|
@ -496,15 +516,26 @@ proc spawnRelays*(turn: var Turn; ds: Cap) =
|
||||||
during(turn, ds, stdioPat) do:
|
during(turn, ds, stdioPat) do:
|
||||||
connectTransport(turn, ds, Stdio())
|
connectTransport(turn, ds, Stdio())
|
||||||
|
|
||||||
|
|
||||||
# TODO: tcp pattern
|
# TODO: tcp pattern
|
||||||
during(turn, ds, pat) do (ta: Literal[transportAddress.Tcp]):
|
during(turn, ds, pat) do (ta: Literal[transportAddress.Tcp]):
|
||||||
connectTransport(turn, ds, ta.value)
|
try: connectTransport(turn, ds, ta.value)
|
||||||
|
except CatchableError as e:
|
||||||
|
publish(turn, ds, TransportConnection(
|
||||||
|
`addr`: ta.toPreserve,
|
||||||
|
resolved: rejected(embed e),
|
||||||
|
))
|
||||||
|
|
||||||
# TODO: unix pattern
|
# TODO: unix pattern
|
||||||
during(turn, ds, pat) do (ta: Literal[transportAddress.Unix]):
|
during(turn, ds, pat) do (ta: Literal[transportAddress.Unix]):
|
||||||
connectTransport(turn, ds, ta.value)
|
try: connectTransport(turn, ds, ta.value)
|
||||||
|
except CatchableError as e:
|
||||||
|
publish(turn, ds, TransportConnection(
|
||||||
|
`addr`: ta.toPreserve,
|
||||||
|
resolved: rejected(embed e),
|
||||||
|
))
|
||||||
|
|
||||||
spawn("path-resolver", turn) do (turn: var Turn):
|
spawnActor("path-resolver", turn) do (turn: var Turn):
|
||||||
let pat = ?Observe(pattern: !ResolvePath) ?? {0: grab()}
|
let pat = ?Observe(pattern: !ResolvePath) ?? {0: grab()}
|
||||||
during(turn, ds, pat) do (route: Literal[Route]):
|
during(turn, ds, pat) do (route: Literal[Route]):
|
||||||
for i, transAddr in route.value.transports:
|
for i, transAddr in route.value.transports:
|
||||||
|
@ -515,10 +546,16 @@ proc spawnRelays*(turn: var Turn; ds: Cap) =
|
||||||
publish(turn, ds, ResolvedPathStep(
|
publish(turn, ds, ResolvedPathStep(
|
||||||
origin: origin, pathStep: step, resolved: next.accepted))
|
origin: origin, pathStep: step, resolved: next.accepted))
|
||||||
|
|
||||||
type BootProc* = proc (turn: var Turn; ds: Cap) {.gcsafe.}
|
type BootProc* = proc (turn: var Turn; ds: Cap) {.closure.}
|
||||||
|
|
||||||
|
const defaultRoute* = "<route [<stdio>]>"
|
||||||
|
|
||||||
proc envRoute*: Route =
|
proc envRoute*: Route =
|
||||||
var text = getEnv("SYNDICATE_ROUTE")
|
## Get an route to a Syndicate capability from the calling environment.
|
||||||
|
## On UNIX this is the SYNDICATE_ROUTE environmental variable with a
|
||||||
|
## fallack to a defaultRoute_.
|
||||||
|
## See https://git.syndicate-lang.org/syndicate-lang/syndicate-protocols/raw/branch/main/schemas/gatekeeper.prs.
|
||||||
|
var text = getEnv("SYNDICATE_ROUTE", defaultRoute)
|
||||||
if text == "":
|
if text == "":
|
||||||
var tx = (getEnv("XDG_RUNTIME_DIR", "/run/user/1000") / "dataspace").toPreserves
|
var tx = (getEnv("XDG_RUNTIME_DIR", "/run/user/1000") / "dataspace").toPreserves
|
||||||
result.transports = @[initRecord("unix", tx)]
|
result.transports = @[initRecord("unix", tx)]
|
||||||
|
@ -529,7 +566,18 @@ proc envRoute*: Route =
|
||||||
raise newException(ValueError, "failed to parse $SYNDICATE_ROUTE " & $pr)
|
raise newException(ValueError, "failed to parse $SYNDICATE_ROUTE " & $pr)
|
||||||
|
|
||||||
proc resolve*(turn: var Turn; ds: Cap; route: Route; bootProc: BootProc) =
|
proc resolve*(turn: var Turn; ds: Cap; route: Route; bootProc: BootProc) =
|
||||||
|
## Resolve `route` within `ds` and call `bootProc` with resolved capabilities.
|
||||||
during(turn, ds, ResolvePath ?: {0: ?route, 3: ?:ResolvedAccepted}) do (dst: Cap):
|
during(turn, ds, ResolvePath ?: {0: ?route, 3: ?:ResolvedAccepted}) do (dst: Cap):
|
||||||
bootProc(turn, dst)
|
bootProc(turn, dst)
|
||||||
|
|
||||||
|
proc resolveEnvironment*(turn: var Turn; bootProc: BootProc) =
|
||||||
|
## Resolve a capability from the calling environment
|
||||||
|
## and call `bootProc`. See envRoute_.
|
||||||
|
let
|
||||||
|
ds = newDataspace(turn)
|
||||||
|
pat = ResolvePath ?: {0: ?envRoute(), 3: ?:ResolvedAccepted}
|
||||||
|
during(turn, ds, pat) do (dst: Cap):
|
||||||
|
bootProc(turn, dst)
|
||||||
|
spawnRelays(turn, ds)
|
||||||
|
|
||||||
# TODO: define a runActor that comes preloaded with relaying
|
# TODO: define a runActor that comes preloaded with relaying
|
||||||
|
|
|
@ -65,9 +65,9 @@ func isEmpty(cont: Continuation): bool =
|
||||||
cont.cache.len == 0 and cont.leafMap.len == 0
|
cont.cache.len == 0 and cont.leafMap.len == 0
|
||||||
|
|
||||||
type
|
type
|
||||||
ContinuationProc = proc (c: Continuation; v: Value) {.gcsafe.}
|
ContinuationProc = proc (c: Continuation; v: Value) {.closure.}
|
||||||
LeafProc = proc (l: Leaf; v: Value) {.gcsafe.}
|
LeafProc = proc (l: Leaf; v: Value) {.closure.}
|
||||||
ObserverProc = proc (turn: var Turn; group: ObserverGroup; vs: seq[Value]) {.gcsafe.}
|
ObserverProc = proc (turn: var Turn; group: ObserverGroup; vs: seq[Value]) {.closure.}
|
||||||
|
|
||||||
proc getLeaves(cont: Continuation; constPaths: Paths): LeafMap =
|
proc getLeaves(cont: Continuation; constPaths: Paths): LeafMap =
|
||||||
result = cont.leafMap.getOrDefault(constPaths)
|
result = cont.leafMap.getOrDefault(constPaths)
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
# Package
|
# Package
|
||||||
|
|
||||||
version = "20240208"
|
version = "20240304"
|
||||||
author = "Emery Hemingway"
|
author = "Emery Hemingway"
|
||||||
description = "Syndicated actors for conversational concurrency"
|
description = "Syndicated actors for conversational concurrency"
|
||||||
license = "Unlicense"
|
license = "Unlicense"
|
||||||
|
@ -9,4 +9,4 @@ srcDir = "src"
|
||||||
|
|
||||||
# Dependencies
|
# Dependencies
|
||||||
|
|
||||||
requires "https://github.com/ehmry/hashlib.git#f9455d4be988e14e3dc7933eb7cc7d7c4820b7ac", "nim >= 2.0.0", "https://git.syndicate-lang.org/ehmry/preserves-nim.git >= 20240208"
|
requires "https://github.com/ehmry/hashlib.git >= 20231130", "nim >= 2.0.0", "https://git.syndicate-lang.org/ehmry/preserves-nim.git >= 20240208", "https://github.com/ehmry/nim-sys.git#4ef3b624db86e331ba334e705c1aa235d55b05e1", "https://github.com/nim-works/cps"
|
||||||
|
|
|
@ -2,16 +2,25 @@
|
||||||
# SPDX-License-Identifier: Unlicense
|
# SPDX-License-Identifier: Unlicense
|
||||||
|
|
||||||
import std/times
|
import std/times
|
||||||
|
import pkg/sys/ioqueue
|
||||||
import syndicate, syndicate/actors/timers
|
import syndicate, syndicate/actors/timers
|
||||||
|
|
||||||
proc now: float64 = getTime().toUnixFloat()
|
let actor = bootActor("timer-test") do (turn: var Turn):
|
||||||
|
let timers = newDataspace(turn)
|
||||||
|
spawnTimerActor(timers, turn)
|
||||||
|
|
||||||
runActor("test_timers") do (ds: Cap; turn: var Turn):
|
onPublish(turn, timers, ?LaterThan(seconds: 1356100000)):
|
||||||
onPublish(turn, ds, grab(LaterThan(seconds: now()+1.0))) do:
|
echo "now in 13th bʼakʼtun"
|
||||||
stderr.writeLine "slept one second once"
|
|
||||||
onPublish(turn, ds, grab(LaterThan(seconds: now()+1.0))) do:
|
after(turn, timers, initDuration(seconds = 3)) do (turn: var Turn):
|
||||||
stderr.writeLine "slept one second twice"
|
echo "third timer expired"
|
||||||
onPublish(turn, ds, grab(LaterThan(seconds: now()+1.0))) do:
|
stopActor(turn)
|
||||||
stderr.writeLine "slept one second thrice"
|
|
||||||
quit()
|
after(turn, timers, initDuration(seconds = 1)) do (turn: var Turn):
|
||||||
spawnTimers(turn, ds)
|
echo "first timer expired"
|
||||||
|
|
||||||
|
after(turn, timers, initDuration(seconds = 2)) do (turn: var Turn):
|
||||||
|
echo "second timer expired"
|
||||||
|
|
||||||
|
echo "single run of ioqueue"
|
||||||
|
ioqueue.run()
|
||||||
|
|
Loading…
Reference in New Issue