Compare commits
35 Commits
50a77995bc
...
7c72ea5732
Author | SHA1 | Date |
---|---|---|
Emery Hemingway | 7c72ea5732 | |
Emery Hemingway | 2aee79662e | |
Emery Hemingway | 8b79dce5ba | |
Emery Hemingway | 6bcf039dc2 | |
Emery Hemingway | 217a6aacf3 | |
Emery Hemingway | aea9a2e4e6 | |
Emery Hemingway | 78d7efc712 | |
Emery Hemingway | 289754499c | |
Emery Hemingway | 4fe2173d81 | |
Emery Hemingway | 87e730bc5b | |
Emery Hemingway | 399fd4a30c | |
Emery Hemingway | bf8f7e9aaa | |
Emery Hemingway | d654195fb8 | |
Emery Hemingway | 76acf2cb67 | |
Emery Hemingway | 50b00827ce | |
Emery Hemingway | 81ce71d495 | |
Emery Hemingway | a3146f88a5 | |
Emery Hemingway | 9ca073d433 | |
Emery Hemingway | cd846d0d46 | |
Emery Hemingway | 82f2e8ee98 | |
Emery Hemingway | 1592fac3b1 | |
Emery Hemingway | 8ef95c0e1d | |
Emery Hemingway | a014362292 | |
Emery Hemingway | b8c1bec9cf | |
Emery Hemingway | cf395dbfa4 | |
Emery Hemingway | 9d975bab56 | |
Emery Hemingway | 15d2e8bfb4 | |
Emery Hemingway | eb5d4d9a57 | |
Emery Hemingway | 01f26caf7b | |
Emery Hemingway | e31069e41a | |
Emery Hemingway | fdf2994ec4 | |
Emery Hemingway | 0cee6670c9 | |
Emery Hemingway | 1ce96560f4 | |
Emery Hemingway | d365a1e6e5 | |
Emery Hemingway | 3e5d910d1a |
|
@ -54,7 +54,7 @@ The Syndicate DSL can be entered using `runActor` which calls a Nim body with a
|
|||
### Publish
|
||||
|
||||
``` nim
|
||||
runActor("main") do (dataspace: Ref; turn: var Turn):
|
||||
runActor("main") do (dataspace: Ref; turn: Turn):
|
||||
let presenceHandle = publish(turn, dataspace, Present(username: "Judy"))
|
||||
# publish <Present "Judy"> to the dataspace
|
||||
# the assertion can be later retracted by handle
|
||||
|
@ -67,7 +67,7 @@ runActor("main") do (dataspace: Ref; turn: var Turn):
|
|||
We can react to assertions and messages within dataspaces using [patterns](https://synit.org/book/glossary.html#dataspace-pattern). Patterns are constructed using a Nim type and the `?` operator. Again a Nim type is used rather than a raw Preserves for schema consistency.
|
||||
|
||||
``` nim
|
||||
runActor("main") do (dataspace: Ref; turn: var Turn):
|
||||
runActor("main") do (dataspace: Ref; turn: Turn):
|
||||
during(turn, dataspace, ?Present) do (who: string):
|
||||
# This body is active when the ?Present pattern is matched.
|
||||
# The Present type contains two atomic values that can be matched
|
||||
|
|
3
Tupfile
3
Tupfile
|
@ -1,2 +1,3 @@
|
|||
include_rules
|
||||
: lock.json |> !nim_cfg |> | ./<lock>
|
||||
: |> !nim_lk |> {lockfile}
|
||||
: {lockfile} |> !nim_cfg |> | ./<lock>
|
||||
|
|
|
@ -1,2 +1,5 @@
|
|||
include depends.tup
|
||||
NIM_GROUPS += $(TUP_CWD)/<lock>
|
||||
NIM_FLAGS += --path:$(TUP_CWD)/../cps
|
||||
NIM_FLAGS += --path:$(TUP_CWD)/../solo5_dispatcher/pkg
|
||||
NIM_FLAGS += --path:$(TUP_CWD)/../taps/src
|
||||
|
|
144
lock.json
144
lock.json
|
@ -6,12 +6,44 @@
|
|||
"bigints"
|
||||
],
|
||||
"path": "/nix/store/jvrm392g8adfsgf36prgwkbyd7vh5jsw-source",
|
||||
"ref": "20231006",
|
||||
"rev": "86ea14d31eea9275e1408ca34e6bfe9c99989a96",
|
||||
"sha256": "15pcpmnk1bnw3k8769rjzcpg00nahyrypwbxs88jnwr4aczp99j4",
|
||||
"srcDir": "src",
|
||||
"url": "https://github.com/ehmry/nim-bigints/archive/86ea14d31eea9275e1408ca34e6bfe9c99989a96.tar.gz"
|
||||
},
|
||||
{
|
||||
"method": "fetchzip",
|
||||
"packages": [
|
||||
"cps"
|
||||
],
|
||||
"path": "/nix/store/8gbhwni0akqskdb3qhn5nfgv6gkdz0vz-source",
|
||||
"rev": "c90530ac57f98a842b7be969115c6ef08bdcc564",
|
||||
"sha256": "0h8ghs2fqg68j3jdcg7grnxssmllmgg99kym2w0a3vlwca1zvr62",
|
||||
"srcDir": "",
|
||||
"url": "https://github.com/ehmry/cps/archive/c90530ac57f98a842b7be969115c6ef08bdcc564.tar.gz"
|
||||
},
|
||||
{
|
||||
"method": "fetchzip",
|
||||
"packages": [
|
||||
"getdns"
|
||||
],
|
||||
"path": "/nix/store/x9xmn7w4k6jg8nv5bnx148ibhnsfh362-source",
|
||||
"rev": "c73cbe288d9f9480586b8fa87f6d794ffb6a6ce6",
|
||||
"sha256": "1sbgx2x51szr22i72n7c8jglnfmr8m7y7ga0v85d58fwadiv7g6b",
|
||||
"srcDir": "src",
|
||||
"url": "https://git.sr.ht/~ehmry/getdns-nim/archive/c73cbe288d9f9480586b8fa87f6d794ffb6a6ce6.tar.gz"
|
||||
},
|
||||
{
|
||||
"method": "fetchzip",
|
||||
"packages": [
|
||||
"getdns"
|
||||
],
|
||||
"path": "/nix/store/x9xmn7w4k6jg8nv5bnx148ibhnsfh362-source",
|
||||
"rev": "c73cbe288d9f9480586b8fa87f6d794ffb6a6ce6",
|
||||
"sha256": "1sbgx2x51szr22i72n7c8jglnfmr8m7y7ga0v85d58fwadiv7g6b",
|
||||
"srcDir": "src",
|
||||
"url": "https://git.sr.ht/~ehmry/getdns-nim/archive/c73cbe288d9f9480586b8fa87f6d794ffb6a6ce6.tar.gz"
|
||||
},
|
||||
{
|
||||
"method": "fetchzip",
|
||||
"packages": [
|
||||
|
@ -28,36 +60,116 @@
|
|||
"packages": [
|
||||
"nimcrypto"
|
||||
],
|
||||
"path": "/nix/store/zyr8zwh7vaiycn1s4r8cxwc71f2k5l0h-source",
|
||||
"ref": "traditional-api",
|
||||
"rev": "602c5d20c69c76137201b5d41f788f72afb95aa8",
|
||||
"sha256": "1dmdmgb6b9m5f8dyxk781nnd61dsk3hdxqks7idk9ncnpj9fng65",
|
||||
"path": "/nix/store/fkrcpp8lzj2yi21na79xm63xk0ggnqsp-source",
|
||||
"rev": "f147d30c69bc1c9bcf0e37f7699bcf0fbaab97b5",
|
||||
"sha256": "1h3dzdbc9kacwpi10mj73yjglvn7kbizj1x8qc9099ax091cj5xn",
|
||||
"srcDir": "",
|
||||
"url": "https://github.com/cheatfate/nimcrypto/archive/602c5d20c69c76137201b5d41f788f72afb95aa8.tar.gz"
|
||||
"url": "https://github.com/cheatfate/nimcrypto/archive/f147d30c69bc1c9bcf0e37f7699bcf0fbaab97b5.tar.gz"
|
||||
},
|
||||
{
|
||||
"method": "fetchzip",
|
||||
"packages": [
|
||||
"npeg"
|
||||
],
|
||||
"path": "/nix/store/ffkxmjmigfs7zhhiiqm0iw2c34smyciy-source",
|
||||
"ref": "1.2.1",
|
||||
"rev": "26d62fdc40feb84c6533956dc11d5ee9ea9b6c09",
|
||||
"sha256": "0xpzifjkfp49w76qmaylan8q181bs45anmp46l4bwr3lkrr7bpwh",
|
||||
"path": "/nix/store/xpn694ibgipj8xak3j4bky6b3k0vp7hh-source",
|
||||
"rev": "ec0cc6e64ea4c62d2aa382b176a4838474238f8d",
|
||||
"sha256": "1fi9ls3xl20bmv1ikillxywl96i9al6zmmxrbffx448gbrxs86kg",
|
||||
"srcDir": "src",
|
||||
"url": "https://github.com/zevv/npeg/archive/26d62fdc40feb84c6533956dc11d5ee9ea9b6c09.tar.gz"
|
||||
"url": "https://github.com/zevv/npeg/archive/ec0cc6e64ea4c62d2aa382b176a4838474238f8d.tar.gz"
|
||||
},
|
||||
{
|
||||
"method": "fetchzip",
|
||||
"packages": [
|
||||
"preserves"
|
||||
],
|
||||
"path": "/nix/store/6nnn5di5vip1vladlb7z56rbw18d1y7j-source",
|
||||
"ref": "20240208",
|
||||
"rev": "2825bceecf33a15b9b7942db5331a32cbc39b281",
|
||||
"sha256": "145vf46fy3wc52j6vs509fm9bi5lx7c53gskbkpcfbkv82l86dgk",
|
||||
"path": "/nix/store/2hy124xgabz134dxj3wji7mp47fdwy3w-source",
|
||||
"rev": "9ae435a83c6d5028405538af5d24a023af625b6e",
|
||||
"sha256": "1k7ywcp1a53x2fpc6wc2b0qzb264dkifash0s1wcp66rw3lx15k2",
|
||||
"srcDir": "src",
|
||||
"url": "https://git.syndicate-lang.org/ehmry/preserves-nim/archive/2825bceecf33a15b9b7942db5331a32cbc39b281.tar.gz"
|
||||
"url": "https://git.syndicate-lang.org/ehmry/preserves-nim/archive/9ae435a83c6d5028405538af5d24a023af625b6e.tar.gz"
|
||||
},
|
||||
{
|
||||
"method": "fetchzip",
|
||||
"packages": [
|
||||
"stew"
|
||||
],
|
||||
"path": "/nix/store/mqg8qzsbcc8xqabq2yzvlhvcyqypk72c-source",
|
||||
"rev": "3c91b8694e15137a81ec7db37c6c58194ec94a6a",
|
||||
"sha256": "17lfhfxp5nxvld78xa83p258y80ks5jb4n53152cdr57xk86y07w",
|
||||
"srcDir": "",
|
||||
"url": "https://github.com/status-im/nim-stew/archive/3c91b8694e15137a81ec7db37c6c58194ec94a6a.tar.gz"
|
||||
},
|
||||
{
|
||||
"method": "fetchzip",
|
||||
"packages": [
|
||||
"sys"
|
||||
],
|
||||
"path": "/nix/store/syhxsjlsdqfap0hk4qp3s6kayk8cqknd-source",
|
||||
"rev": "4ef3b624db86e331ba334e705c1aa235d55b05e1",
|
||||
"sha256": "1q4qgw4an4mmmcbx48l6xk1jig1vc8p9cq9dbx39kpnb0890j32q",
|
||||
"srcDir": "src",
|
||||
"url": "https://github.com/ehmry/nim-sys/archive/4ef3b624db86e331ba334e705c1aa235d55b05e1.tar.gz"
|
||||
},
|
||||
{
|
||||
"method": "fetchzip",
|
||||
"packages": [
|
||||
"sys"
|
||||
],
|
||||
"path": "/nix/store/vf9ls2wip6d8xhsi3rjh0dqsqg597i6b-source",
|
||||
"rev": "c117ee60542f084525f254e6ade590675a6a2ed6",
|
||||
"sha256": "12qzx2lnh84xqfgypy0pka8nflq0y8n1izfwx8mb4zya5nzawmyf",
|
||||
"srcDir": "src",
|
||||
"url": "https://github.com/alaviss/nim-sys/archive/c117ee60542f084525f254e6ade590675a6a2ed6.tar.gz"
|
||||
},
|
||||
{
|
||||
"method": "fetchzip",
|
||||
"packages": [
|
||||
"taps"
|
||||
],
|
||||
"path": "/nix/store/n86g5fw60z1k53bn35zvrwlwmyk3ixdn-source",
|
||||
"rev": "756cb07b4f874181ad34c370cad6082ee65f646d",
|
||||
"sha256": "0dp7ml3kj2fi6isvjkkxf02hwj0gshx6qra0ghnk2cbfykbcgfp8",
|
||||
"srcDir": "src",
|
||||
"url": "https://git.sr.ht/~ehmry/nim_taps/archive/756cb07b4f874181ad34c370cad6082ee65f646d.tar.gz"
|
||||
},
|
||||
{
|
||||
"date": "2024-04-02T15:38:57+01:00",
|
||||
"deepClone": false,
|
||||
"fetchLFS": false,
|
||||
"fetchSubmodules": true,
|
||||
"hash": "sha256-iZb9aAgYr4FGkqfIg49QWiCqeizIi047kFhugHiP8o0=",
|
||||
"leaveDotGit": false,
|
||||
"method": "git",
|
||||
"packages": [
|
||||
"solo5_dispatcher"
|
||||
],
|
||||
"path": "/nix/store/sf5dgj2ljvahcm6my7d61ibda51vnrii-solo5_dispatcher",
|
||||
"rev": "a7a894a96a2221284012800e6fd32923d83d20bd",
|
||||
"sha256": "13gjixw80vjqj0xlx2y85ixal82sa27q7j57j9383bqq11lgv5l9",
|
||||
"srcDir": "pkg",
|
||||
"url": "https://git.sr.ht/~ehmry/solo5_dispatcher"
|
||||
},
|
||||
{
|
||||
"method": "fetchzip",
|
||||
"packages": [
|
||||
"cps"
|
||||
],
|
||||
"path": "/nix/store/phdf6siqbhj7vx4qq507lzla81si60iz-source",
|
||||
"rev": "58772ff9ddb38a4b2ec52da142d8532ba2fe7039",
|
||||
"sha256": "1lph7v27nqwgm3a0ssi8q348gjrkjwgqc50agw38j7xif6wj80cw",
|
||||
"srcDir": "",
|
||||
"url": "https://github.com/ehmry/cps/archive/58772ff9ddb38a4b2ec52da142d8532ba2fe7039.tar.gz"
|
||||
},
|
||||
{
|
||||
"method": "fetchzip",
|
||||
"packages": [
|
||||
"stew"
|
||||
],
|
||||
"path": "/nix/store/mqg8qzsbcc8xqabq2yzvlhvcyqypk72c-source",
|
||||
"rev": "3c91b8694e15137a81ec7db37c6c58194ec94a6a",
|
||||
"sha256": "17lfhfxp5nxvld78xa83p258y80ks5jb4n53152cdr57xk86y07w",
|
||||
"srcDir": "",
|
||||
"url": "https://github.com/status-im/nim-stew/archive/3c91b8694e15137a81ec7db37c6c58194ec94a6a.tar.gz"
|
||||
}
|
||||
]
|
||||
}
|
||||
|
|
|
@ -1,5 +1,8 @@
|
|||
{ pkgs ? import <nixpkgs> { } }:
|
||||
|
||||
pkgs.buildNimPackage {
|
||||
name = "dummy";
|
||||
lockFile = ./lock.json;
|
||||
buildInputs = builtins.attrValues { inherit (pkgs) getdns solo5; };
|
||||
nativeBuildInputs = builtins.attrValues { inherit (pkgs) pkg-config solo5; };
|
||||
}
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
|
||||
## This module implements the `Syndicate DSL <https://syndicate-lang.org/doc/syndicate/>`_.
|
||||
|
||||
import std/[asyncdispatch, macros, tables, typetraits]
|
||||
import std/[macros, tables, typetraits]
|
||||
|
||||
import preserves
|
||||
export fromPreserves, toPreserves
|
||||
|
@ -34,21 +34,21 @@ proc `??`*(pat: Pattern; bindings: openArray[(int, Pattern)]): Pattern {.inline.
|
|||
patterns.inject(pat, bindings)
|
||||
|
||||
type
|
||||
PublishProc = proc (turn: var Turn; v: Value; h: Handle) {.closure, gcsafe.}
|
||||
RetractProc = proc (turn: var Turn; h: Handle) {.closure, gcsafe.}
|
||||
MessageProc = proc (turn: var Turn; v: Value) {.closure, gcsafe.}
|
||||
PublishProc = proc (turn: var Turn; v: Value; h: Handle) {.closure.}
|
||||
RetractProc = proc (turn: var Turn; h: Handle) {.closure.}
|
||||
MessageProc = proc (turn: var Turn; v: Value) {.closure.}
|
||||
ClosureEntity = ref object of Entity
|
||||
publishImpl: PublishProc
|
||||
retractImpl: RetractProc
|
||||
messageImpl: MessageProc
|
||||
publishImpl*: PublishProc
|
||||
retractImpl*: RetractProc
|
||||
messageImpl*: MessageProc
|
||||
|
||||
method publish(e: ClosureEntity; turn: var Turn; a: AssertionRef; h: Handle) {.gcsafe.} =
|
||||
method publish(e: ClosureEntity; turn: var Turn; a: AssertionRef; h: Handle) =
|
||||
if not e.publishImpl.isNil: e.publishImpl(turn, a.value, h)
|
||||
|
||||
method retract(e: ClosureEntity; turn: var Turn; h: Handle) {.gcsafe.} =
|
||||
method retract(e: ClosureEntity; turn: var Turn; h: Handle) =
|
||||
if not e.retractImpl.isNil: e.retractImpl(turn, h)
|
||||
|
||||
method message(e: ClosureEntity; turn: var Turn; a: AssertionRef) {.gcsafe.} =
|
||||
method message(e: ClosureEntity; turn: var Turn; a: AssertionRef) =
|
||||
if not e.messageImpl.isNil: e.messageImpl(turn, a.value)
|
||||
|
||||
proc argumentCount(handler: NimNode): int =
|
||||
|
@ -187,16 +187,14 @@ macro during*(turn: untyped; ds: Cap; pattern: Pattern; publishBody: untyped) =
|
|||
`callbackProc`
|
||||
discard observe(`turn`, `ds`, `pattern`, during(`callbackSym`))
|
||||
|
||||
type BootProc = proc (turn: var Turn; ds: Cap) {.gcsafe.}
|
||||
type DeprecatedBootProc = proc (ds: Cap; turn: var Turn) {.gcsafe.}
|
||||
|
||||
proc runActor*(name: string; bootProc: BootProc) =
|
||||
## Run an `Actor` to completion.
|
||||
let actor = bootDataspace(name, bootProc)
|
||||
while actor.running:
|
||||
waitFor sleepAsync(500)
|
||||
|
||||
proc runActor*(name: string; bootProc: DeprecatedBootProc) {.deprecated.} =
|
||||
## Run an `Actor` to completion.
|
||||
runActor(name) do (turn: var Turn, ds: Cap):
|
||||
bootProc(ds, turn)
|
||||
when defined(solo5):
|
||||
echo """
|
||||
______
|
||||
/ \_\
|
||||
/ ,__/ \ ____ __
|
||||
/\__/ \, \ _______ ______ ____/ /_/________ / /____
|
||||
\/ \__/ / / ___/ / / / __ \/ __ / / ___/ __ \/ __/ _ \
|
||||
\ ' \__/ _\_ \/ /_/ / / / / /_/ / / /__/ /_/ / /_/ __/
|
||||
\____/_/ /____/\__, /_/ /_/\____/_/\___/\__/_/\__/\___/
|
||||
/____/
|
||||
"""
|
||||
|
|
|
@ -1,30 +1,27 @@
|
|||
# SPDX-FileCopyrightText: ☭ Emery Hemingway
|
||||
# SPDX-License-Identifier: Unlicense
|
||||
|
||||
import std/[asyncfutures, hashes, monotimes, options, sets, tables, times]
|
||||
import std/[assertions, deques, hashes, monotimes, options, sets, tables, times]
|
||||
import cps
|
||||
|
||||
import preserves
|
||||
import ../syndicate/protocols/[protocol, sturdy]
|
||||
import ../syndicate/protocols/trace
|
||||
|
||||
when defined(solo5):
|
||||
import solo5_dispatcher
|
||||
else:
|
||||
import pkg/sys/ioqueue
|
||||
|
||||
const tracing = defined(traceSyndicate)
|
||||
|
||||
when tracing:
|
||||
import std/streams
|
||||
from std/os import getEnv
|
||||
import ./protocols/trace
|
||||
when not defined(solo5):
|
||||
from std/os import getEnv
|
||||
|
||||
export Handle
|
||||
|
||||
template generateIdType(typ: untyped) =
|
||||
type typ* = distinct Natural
|
||||
proc `==`*(x, y: typ): bool {.borrow.}
|
||||
proc `$`*(id: typ): string {.borrow.}
|
||||
|
||||
generateIdType(ActorId)
|
||||
generateIdType(FacetId)
|
||||
generateIdType(EndpointId)
|
||||
generateIdType(FieldId)
|
||||
generateIdType(TurnId)
|
||||
|
||||
type
|
||||
Oid = sturdy.Oid
|
||||
Caveat = sturdy.Caveat
|
||||
|
@ -37,11 +34,13 @@ type
|
|||
# C code has "redefinition of struct" problems when orc is enabled
|
||||
|
||||
Entity* = ref object of RootObj
|
||||
facet*: Facet
|
||||
oid*: Oid # oid is how Entities are identified over the wire
|
||||
|
||||
Cap* {.preservesEmbedded.} = ref object of EmbeddedObj
|
||||
relay*: Facet
|
||||
target*: Entity
|
||||
relay*: Facet
|
||||
# Entity has facet but a Cap is also scoped to a relay Facet
|
||||
attenuation*: Attenuation
|
||||
|
||||
Ref* {.deprecated: "Ref was renamed to Cap".} = Cap
|
||||
|
@ -53,6 +52,7 @@ type
|
|||
OutboundTable = Table[Handle, OutboundAssertion]
|
||||
|
||||
Actor* = ref object
|
||||
next: Actor
|
||||
name: string
|
||||
handleAllocator: ref Handle
|
||||
# a fresh actor gets a new ref Handle and
|
||||
|
@ -61,18 +61,15 @@ type
|
|||
exitReason: ref Exception
|
||||
exitHooks: seq[TurnAction]
|
||||
id: ActorId
|
||||
facetIdAllocator: uint
|
||||
exiting, exited: bool
|
||||
when tracing:
|
||||
turnIdAllocator: ref TurnId
|
||||
traceStream: FileStream
|
||||
|
||||
TurnAction* = proc (t: var Turn) {.gcsafe.}
|
||||
TurnAction* = proc (t: var Turn) {.closure.}
|
||||
|
||||
Queues = TableRef[Facet, seq[TurnAction]]
|
||||
|
||||
Turn* = object # an object that should remain on the stack
|
||||
facet: Facet
|
||||
queues: Queues # a ref object that can outlive Turn
|
||||
Turn* = ref object
|
||||
facet: Facet # active facet that may change during a turn
|
||||
work: Deque[tuple[facet: Facet, act: TurnAction]]
|
||||
effects: Table[Actor, Turn]
|
||||
when tracing:
|
||||
desc: TurnDescription
|
||||
|
||||
|
@ -87,31 +84,75 @@ type
|
|||
id: FacetId
|
||||
isAlive: bool
|
||||
|
||||
var turnQueue {.threadvar.}: Deque[Turn]
|
||||
|
||||
when tracing:
|
||||
|
||||
proc nextTurnId(facet: Facet): TurnId =
|
||||
result = succ(facet.actor.turnIdAllocator[])
|
||||
facet.actor.turnIdAllocator[] = result
|
||||
|
||||
proc trace(actor: Actor; act: ActorActivation) =
|
||||
if not actor.traceStream.isNil:
|
||||
var entry = TraceEntry(
|
||||
when defined(solo5):
|
||||
proc traceActivation(actor: Actor; act: ActorActivation) =
|
||||
discard #[
|
||||
echo TraceEntry(
|
||||
timestamp: getTime().toUnixFloat(),
|
||||
actor: initRecord("named", actor.name.toPreserves),
|
||||
item: act)
|
||||
actor.traceStream.writeText entry.toPreserves
|
||||
actor.traceStream.writeLine()
|
||||
item: act,
|
||||
).toPreserves
|
||||
]#
|
||||
else:
|
||||
proc openTraceStream: FileStream =
|
||||
let path = getEnv("SYNDICATE_TRACE_FILE")
|
||||
case path
|
||||
of "": stderr.writeLine "$SYNDICATE_TRACE_FILE unset"
|
||||
of "-": result = newFileStream(stderr)
|
||||
else: result = openFileStream(path, fmWrite)
|
||||
|
||||
proc path(facet: Facet): seq[trace.FacetId] =
|
||||
let traceStream = openTraceStream()
|
||||
|
||||
proc traceActivation(actor: Actor; act: ActorActivation) =
|
||||
if not traceStream.isNil:
|
||||
var entry = TraceEntry(
|
||||
timestamp: getTime().toUnixFloat(),
|
||||
actor: initRecord("named", actor.name.toPreserves),
|
||||
item: act)
|
||||
traceStream.write(entry.toPreserves)
|
||||
traceStream.flush()
|
||||
|
||||
var turnIdAllocator: uint
|
||||
|
||||
proc nextTurnId(): TurnId =
|
||||
inc(turnIdAllocator)
|
||||
turnIdAllocator.toPreserves
|
||||
|
||||
proc path(facet: Facet): seq[FacetId] =
|
||||
var f = facet
|
||||
while not f.isNil:
|
||||
result.add f.id.toPreserves
|
||||
f = f.parent
|
||||
|
||||
method publish*(e: Entity; turn: var Turn; v: AssertionRef; h: Handle) {.base, gcsafe.} = discard
|
||||
method retract*(e: Entity; turn: var Turn; h: Handle) {.base, gcsafe.} = discard
|
||||
method message*(e: Entity; turn: var Turn; v: AssertionRef) {.base, gcsafe.} = discard
|
||||
method sync*(e: Entity; turn: var Turn; peer: Cap) {.base, gcsafe.} = discard
|
||||
proc initEnqueue(turn: Turn; cap: Cap): ActionDescription =
|
||||
result = ActionDescription(orKind: ActionDescriptionKind.enqueue)
|
||||
result.enqueue.event.target.actor = turn.facet.actor.id.toPreserves
|
||||
result.enqueue.event.target.facet = turn.facet.id.toPreserves
|
||||
result.enqueue.event.target.oid = cap.target.oid.toPreserves
|
||||
|
||||
proc toDequeue(act: sink ActionDescription): ActionDescription =
|
||||
result = ActionDescription(orKind: ActionDescriptionKind.dequeue)
|
||||
result.dequeue.event = move act.enqueue.event
|
||||
|
||||
proc toTraceTarget(cap: Cap): Target =
|
||||
assert not cap.target.isNil
|
||||
assert not cap.target.facet.isNil
|
||||
result.actor = cap.target.facet.actor.id
|
||||
result.facet = cap.target.facet.id
|
||||
result.oid = cap.target.oid.toPreserves
|
||||
|
||||
method publish*(e: Entity; turn: var Turn; v: AssertionRef; h: Handle) {.base.} = discard
|
||||
method retract*(e: Entity; turn: var Turn; h: Handle) {.base.} = discard
|
||||
method message*(e: Entity; turn: var Turn; v: AssertionRef) {.base.} = discard
|
||||
method sync*(e: Entity; turn: var Turn; peer: Cap) {.base.} = discard
|
||||
|
||||
converter toActor(f: Facet): Actor = f.actor
|
||||
converter toActor(t: Turn): Actor = t.facet.actor
|
||||
converter toFacet(a: Actor): Facet = a.root
|
||||
converter toFacet(t: Turn): Facet = t.facet
|
||||
|
||||
using
|
||||
actor: Actor
|
||||
|
@ -120,48 +161,99 @@ using
|
|||
action: TurnAction
|
||||
|
||||
proc labels(f: Facet): string =
|
||||
assert not f.isNil
|
||||
assert not f.actor.isNil
|
||||
result.add f.actor.name
|
||||
proc catLabels(f: Facet; labels: var string) =
|
||||
labels.add ':'
|
||||
if not f.parent.isNil:
|
||||
catLabels(f.parent, labels)
|
||||
labels.add ':'
|
||||
when tracing:
|
||||
labels.add $f.id
|
||||
result.add f.actor.name
|
||||
catLabels(f, result)
|
||||
|
||||
proc `$`*(f: Facet): string =
|
||||
"<Facet:" & f.labels & ">"
|
||||
|
||||
proc `$`*(r: Cap): string =
|
||||
"<Ref:" & r.relay.labels & ">"
|
||||
|
||||
proc `$`*(actor: Actor): string =
|
||||
"<Actor:" & actor.name & ">" # TODO: ambigous
|
||||
|
||||
proc attenuate(r: Cap; a: Attenuation): Cap =
|
||||
when tracing:
|
||||
|
||||
proc `$`*(r: Cap): string =
|
||||
"<Ref:" & r.relay.labels & ">"
|
||||
|
||||
proc `$`*(t: Turn): string =
|
||||
"<Turn:" & $t.desc.id & ">"
|
||||
|
||||
proc attenuate*(r: Cap; a: Attenuation): Cap =
|
||||
if a.len == 0: result = r
|
||||
else: result = Cap(
|
||||
relay: r.relay,
|
||||
target: r.target,
|
||||
relay: r.relay,
|
||||
attenuation: a & r.attenuation)
|
||||
|
||||
proc hash*(actor): Hash =
|
||||
result = actor[].unsafeAddr.hash
|
||||
|
||||
proc hash*(facet): Hash =
|
||||
facet.id.hash
|
||||
facet[].unsafeAddr.hash
|
||||
|
||||
proc hash*(r: Cap): Hash = !$(r.relay.hash !& r.target.unsafeAddr.hash)
|
||||
|
||||
proc actor*(turn): Actor = turn.facet.actor
|
||||
|
||||
proc nextHandle(facet: Facet): Handle =
|
||||
result = succ(facet.actor.handleAllocator[])
|
||||
facet.actor.handleAllocator[] = result
|
||||
|
||||
proc facet*(turn: var Turn): Facet = turn.facet
|
||||
template recallFacet(turn: var Turn; body: untyped): untyped =
|
||||
let facet = turn.facet
|
||||
block:
|
||||
body
|
||||
assert facet.actor == turn.facet.actor
|
||||
turn.facet = facet
|
||||
|
||||
proc enqueue(turn: var Turn; target: Facet; action: TurnAction) =
|
||||
if target in turn.queues:
|
||||
turn.queues[target].add action
|
||||
proc queueWork*(turn: var Turn; facet: Facet; act: TurnAction) =
|
||||
assert not facet.isNil
|
||||
turn.work.addLast((facet, act,))
|
||||
|
||||
proc queueTurn*(facet: Facet; act: TurnAction) =
|
||||
var turn = Turn(facet: facet)
|
||||
assert not facet.isNil
|
||||
turn.work.addLast((facet, act,))
|
||||
when tracing:
|
||||
turn.desc.id = nextTurnId()
|
||||
turnQueue.addLast(turn)
|
||||
|
||||
proc queueTurn*(prev: var Turn; facet: Facet; act: TurnAction) =
|
||||
var next = Turn(facet: facet)
|
||||
assert not facet.isNil
|
||||
next.work.addLast((facet, act,))
|
||||
when tracing:
|
||||
next.desc.id = nextTurnId()
|
||||
next.desc.cause = TurnCause(orKind: TurnCauseKind.turn)
|
||||
next.desc.cause.turn.id = prev.desc.id
|
||||
turnQueue.addLast(next)
|
||||
|
||||
proc run*(facet: Facet; action: TurnAction) = queueTurn(facet, action)
|
||||
## Alias to queueTurn_.
|
||||
|
||||
proc facet*(turn: Turn): Facet = turn.facet
|
||||
|
||||
proc queueEffect*(turn: var Turn; target: Facet; act: TurnAction) =
|
||||
let fremd = target.actor
|
||||
if fremd == turn.facet.actor:
|
||||
turn.work.addLast((target, act,))
|
||||
else:
|
||||
turn.queues[target] = @[action]
|
||||
var fremdTurn = turn.effects.getOrDefault(fremd)
|
||||
if fremdTurn.isNil:
|
||||
fremdTurn = Turn(facet: target)
|
||||
turn.effects[fremd] = fremdTurn
|
||||
when tracing:
|
||||
fremdTurn.desc.id = nextTurnId()
|
||||
fremdTurn.desc.cause = TurnCause(orKind: TurnCauseKind.turn)
|
||||
fremdTurn.desc.cause.turn.id = turn.desc.id
|
||||
fremdTurn.work.addLast((target, act,))
|
||||
|
||||
type Bindings = Table[Value, Value]
|
||||
|
||||
|
@ -171,7 +263,7 @@ proc match(bindings: var Bindings; p: Pattern; v: Value): bool =
|
|||
of PatternKind.Patom:
|
||||
result = case p.patom
|
||||
of PAtom.Boolean: v.isBoolean
|
||||
of PAtom.Double: v.isDouble
|
||||
of PAtom.Double: v.isFloat
|
||||
of PAtom.Signedinteger: v.isInteger
|
||||
of PAtom.String: v.isString
|
||||
of PAtom.Bytestring: v.isByteString
|
||||
|
@ -275,25 +367,26 @@ proc runRewrites*(a: Attenuation; v: Value): Value =
|
|||
result = examineAlternatives(stage, result)
|
||||
if result.isFalse: break
|
||||
|
||||
proc publish(turn: var Turn; r: Cap; v: Value; h: Handle) =
|
||||
var a = runRewrites(r.attenuation, v)
|
||||
proc publish(turn: var Turn; cap: Cap; v: Value; h: Handle) =
|
||||
var a = runRewrites(cap.attenuation, v)
|
||||
if not a.isFalse:
|
||||
let e = OutboundAssertion(
|
||||
handle: h, peer: r, established: false)
|
||||
let e = OutboundAssertion(handle: h, peer: cap)
|
||||
turn.facet.outbound[h] = e
|
||||
enqueue(turn, r.relay) do (turn: var Turn):
|
||||
when tracing:
|
||||
var act = ActionDescription(orKind: ActionDescriptionKind.enqueue)
|
||||
act.enqueue.event.target.actor = turn.facet.actor.id.toPreserves
|
||||
act.enqueue.event.target.facet = turn.facet.id.toPreserves
|
||||
act.enqueue.event.target.oid = cap.target.oid.toPreserves
|
||||
act.enqueue.event.detail = trace.TurnEvent(orKind: trace.TurnEventKind.assert)
|
||||
act.enqueue.event.detail.assert.assertion.value.value =
|
||||
mapEmbeds(v) do (cap: Value) -> Value: discard
|
||||
act.enqueue.event.detail.assert.handle = h
|
||||
turn.desc.actions.add act
|
||||
queueEffect(turn, cap.relay) do (turn: var Turn):
|
||||
e.established = true
|
||||
publish(r.target, turn, AssertionRef(value: a), e.handle)
|
||||
when tracing:
|
||||
var act = ActionDescription(orKind: ActionDescriptionKind.enqueue)
|
||||
act.enqueue.event.target.actor = turn.facet.actor.id.toPreserves
|
||||
act.enqueue.event.target.facet = turn.facet.id.toPreserves
|
||||
act.enqueue.event.target.oid = r.target.oid.toPreserves
|
||||
act.enqueue.event.detail = trace.TurnEvent(orKind: TurnEventKind.assert)
|
||||
act.enqueue.event.detail.assert.assertion.value.value =
|
||||
mapEmbeds(v) do (cap: Value) -> Value: discard
|
||||
act.enqueue.event.detail.assert.handle = h
|
||||
turn.desc.actions.add act
|
||||
when tracing:
|
||||
turn.desc.actions.add act.toDequeue
|
||||
publish(cap.target, turn, AssertionRef(value: a), e.handle)
|
||||
|
||||
proc publish*(turn: var Turn; r: Cap; a: Value): Handle {.discardable.} =
|
||||
result = turn.facet.nextHandle()
|
||||
|
@ -303,7 +396,14 @@ proc publish*[T](turn: var Turn; r: Cap; a: T): Handle {.discardable.} =
|
|||
publish(turn, r, a.toPreserves)
|
||||
|
||||
proc retract(turn: var Turn; e: OutboundAssertion) =
|
||||
enqueue(turn, e.peer.relay) do (turn: var Turn):
|
||||
when tracing:
|
||||
var act = initEnqueue(turn, e.peer)
|
||||
act.enqueue.event.detail = trace.TurnEvent(orKind: TurnEventKind.retract)
|
||||
act.enqueue.event.detail.retract.handle = e.handle
|
||||
turn.desc.actions.add act
|
||||
queueEffect(turn, e.peer.relay) do (turn: var Turn):
|
||||
when tracing:
|
||||
turn.desc.actions.add act.toDequeue
|
||||
if e.established:
|
||||
e.established = false
|
||||
e.peer.target.retract(turn, e.handle)
|
||||
|
@ -316,18 +416,30 @@ proc retract*(turn: var Turn; h: Handle) =
|
|||
proc message*(turn: var Turn; r: Cap; v: Value) =
|
||||
var a = runRewrites(r.attenuation, v)
|
||||
if not a.isFalse:
|
||||
enqueue(turn, r.relay) do (turn: var Turn):
|
||||
when tracing:
|
||||
var act = initEnqueue(turn, r)
|
||||
act.enqueue.event.detail = trace.TurnEvent(orKind: TurnEventKind.message)
|
||||
act.enqueue.event.detail.message.body.value.value =
|
||||
mapEmbeds(a) do (cap: Value) -> Value: discard
|
||||
turn.desc.actions.add act
|
||||
queueEffect(turn, r.relay) do (turn: var Turn):
|
||||
when tracing:
|
||||
turn.desc.actions.add act.toDequeue
|
||||
r.target.message(turn, AssertionRef(value: a))
|
||||
|
||||
proc message*[T](turn: var Turn; r: Cap; v: T) =
|
||||
message(turn, r, v.toPreserves)
|
||||
|
||||
proc sync(turn: var Turn; e: Entity; peer: Cap) =
|
||||
e.sync(turn, peer)
|
||||
|
||||
proc sync*(turn: var Turn; r, peer: Cap) =
|
||||
enqueue(turn, r.relay) do (turn: var Turn):
|
||||
sync(turn, r.target, peer)
|
||||
when tracing:
|
||||
var act = initEnqueue(turn, peer)
|
||||
act.enqueue.event.detail = trace.TurnEvent(orKind: TurnEventKind.sync)
|
||||
act.enqueue.event.detail.sync.peer = peer.toTraceTarget
|
||||
turn.desc.actions.add act
|
||||
queueEffect(turn, r.relay) do (turn: var Turn):
|
||||
when tracing:
|
||||
turn.desc.actions.add act.toDequeue
|
||||
r.target.sync(turn, peer)
|
||||
|
||||
proc replace*[T](turn: var Turn; cap: Cap; h: Handle; v: T): Handle =
|
||||
result = publish(turn, cap, v)
|
||||
|
@ -341,13 +453,12 @@ proc replace*[T](turn: var Turn; cap: Cap; h: var Handle; v: T): Handle {.discar
|
|||
retract(turn, old)
|
||||
h
|
||||
|
||||
proc stop*(turn: var Turn) {.gcsafe.}
|
||||
|
||||
proc run*(facet; action: TurnAction; zombieTurn = false) {.gcsafe.}
|
||||
proc stop*(turn: var Turn)
|
||||
|
||||
proc newFacet(actor; parent: Facet; initialAssertions: OutboundTable): Facet =
|
||||
inc actor.facetIdAllocator
|
||||
result = Facet(
|
||||
id: getMonoTime().ticks.FacetId,
|
||||
id: actor.facetIdAllocator.toPreserves,
|
||||
actor: actor,
|
||||
parent: parent,
|
||||
outbound: initialAssertions,
|
||||
|
@ -359,127 +470,147 @@ proc newFacet(actor; parent: Facet): Facet =
|
|||
newFacet(actor, parent, initialAssertions)
|
||||
|
||||
proc isInert(facet): bool =
|
||||
result = facet.children.len == 0 and
|
||||
(facet.outbound.len == 0 or facet.parent.isNil) and
|
||||
facet.inertCheckPreventers == 0
|
||||
let
|
||||
noKids = facet.children.len == 0
|
||||
noOutboundHandles = facet.outbound.len == 0
|
||||
isRootFacet = facet.parent.isNil
|
||||
noInertCheckPreventers = facet.inertCheckPreventers == 0
|
||||
result = noKids and (noOutboundHandles or isRootFacet) and noInertCheckPreventers
|
||||
|
||||
proc preventInertCheck*(facet): (proc() {.gcsafe.}) {.discardable.} =
|
||||
var armed = true
|
||||
inc facet.inertCheckPreventers
|
||||
proc disarm() =
|
||||
if armed:
|
||||
armed = false
|
||||
dec facet.inertCheckPreventers
|
||||
result = disarm
|
||||
proc preventInertCheck*(turn: Turn) =
|
||||
inc turn.facet.inertCheckPreventers
|
||||
|
||||
proc inFacet(turn: var Turn; facet; act: TurnAction) =
|
||||
## Call an action with a facet using a temporary `Turn`
|
||||
## that shares the `Queues` of the calling `Turn`.
|
||||
var t = Turn(facet: facet, queues: turn.queues)
|
||||
act(t)
|
||||
proc terminateActor(turn; reason: ref Exception)
|
||||
|
||||
proc terminate(actor; turn; reason: ref Exception) {.gcsafe.}
|
||||
|
||||
proc terminate(facet; turn: var Turn; orderly: bool) {.gcsafe.} =
|
||||
proc terminateFacetOrderly(turn: var Turn) =
|
||||
let facet = turn.facet
|
||||
if facet.isAlive:
|
||||
facet.isAlive = false
|
||||
let parent = facet.parent
|
||||
if not parent.isNil:
|
||||
parent.children.excl facet
|
||||
block:
|
||||
var turn = Turn(facet: facet, queues: turn.queues)
|
||||
while facet.children.len > 0:
|
||||
facet.children.pop.terminate(turn, orderly)
|
||||
if orderly:
|
||||
for act in facet.shutdownActions:
|
||||
act(turn)
|
||||
for a in facet.outbound.values: turn.retract(a)
|
||||
if orderly:
|
||||
if not parent.isNil:
|
||||
if parent.isInert:
|
||||
parent.terminate(turn, true)
|
||||
else:
|
||||
terminate(facet.actor, turn, nil)
|
||||
var i = 0
|
||||
while i < facet.shutdownActions.len:
|
||||
facet.shutdownActions[i](turn)
|
||||
inc i
|
||||
setLen facet.shutdownActions, 0
|
||||
for e in facet.outbound.values:
|
||||
retract(turn, e)
|
||||
clear facet.outbound
|
||||
|
||||
proc inertCheck(turn: var Turn) =
|
||||
if (not turn.facet.parent.isNil and
|
||||
(not turn.facet.parent.isAlive)) or
|
||||
turn.facet.isInert:
|
||||
when tracing:
|
||||
var act = ActionDescription(orKind: ActionDescriptionKind.facetStop)
|
||||
act.facetstop.path = facet.path
|
||||
act.facetstop.path = turn.facet.path
|
||||
act.facetstop.reason = FacetStopReason.inert
|
||||
turn.desc.actions.add act
|
||||
stop(turn)
|
||||
|
||||
proc terminateFacet(turn: var Turn) =
|
||||
let facet = turn.facet
|
||||
for child in facet.children:
|
||||
queueWork(turn, child, terminateFacetOrderly)
|
||||
# terminate all children
|
||||
facet.children.clear()
|
||||
# detach all children
|
||||
queueWork(turn, facet, terminateFacetOrderly)
|
||||
# self-termination
|
||||
|
||||
proc stopIfInertAfter(action: TurnAction): TurnAction =
|
||||
proc wrapper(turn: var Turn) =
|
||||
proc work(turn: var Turn) =
|
||||
queueEffect(turn, turn.facet, inertCheck)
|
||||
action(turn)
|
||||
enqueue(turn, turn.facet) do (turn: var Turn):
|
||||
if (not turn.facet.parent.isNil and
|
||||
(not turn.facet.parent.isAlive)) or
|
||||
turn.facet.isInert:
|
||||
stop(turn)
|
||||
wrapper
|
||||
work
|
||||
|
||||
proc newFacet*(turn: var Turn): Facet = newFacet(turn.facet.actor, turn.facet)
|
||||
proc newFacet(turn: var Turn): Facet = newFacet(turn.facet.actor, turn.facet)
|
||||
|
||||
proc inFacet*(turn: var Turn; bootProc: TurnAction): Facet =
|
||||
proc inFacet*(turn: var Turn; bootProc: TurnAction): Facet {.discardable.} =
|
||||
result = newFacet(turn)
|
||||
when tracing:
|
||||
var act = ActionDescription(orKind: ActionDescriptionKind.facetstart)
|
||||
act.facetstart.path.add result.path
|
||||
turn.desc.actions.add act
|
||||
inFacet(turn, result, stopIfInertAfter(bootProc))
|
||||
recallFacet turn:
|
||||
turn.facet = result
|
||||
when tracing:
|
||||
var act = ActionDescription(orKind: ActionDescriptionKind.facetstart)
|
||||
act.facetstart.path.add result.path
|
||||
turn.desc.actions.add act
|
||||
stopIfInertAfter(bootProc)(turn)
|
||||
|
||||
proc facet*(turn: var Turn; bootProc: TurnAction): Facet {.deprecated.} = inFacet(turn, bootProc)
|
||||
|
||||
proc newActor(name: string; handleAlloc: ref Handle): Actor =
|
||||
let
|
||||
now = getTime()
|
||||
seed = now.toUnix * 1_000_000_000 + now.nanosecond
|
||||
proc newActor(name: string; parent: Facet): Actor =
|
||||
result = Actor(
|
||||
name: name,
|
||||
id: ActorId(seed),
|
||||
handleAllocator: handleAlloc,
|
||||
id: name.toPreserves,
|
||||
)
|
||||
result.root = newFacet(result, nil)
|
||||
if parent.isNil:
|
||||
new result.handleAllocator
|
||||
else:
|
||||
result.handleAllocator = parent.actor.handleAllocator
|
||||
result.root = newFacet(result, parent)
|
||||
when tracing:
|
||||
var act = ActorActivation(orKind: ActorActivationKind.start)
|
||||
act.start.actorName = Name(orKind: NameKind.named)
|
||||
act.start.actorName.named.name = name.toPreserves
|
||||
trace(result, act)
|
||||
traceActivation(result, act)
|
||||
|
||||
proc run(actor; bootProc: TurnAction; initialAssertions: OutboundTable) =
|
||||
run(newFacet(actor, actor.root, initialAssertions), stopIfInertAfter(bootProc))
|
||||
proc run(actor: Actor; bootProc: TurnAction; initialAssertions: OutboundTable) =
|
||||
queueTurn(newFacet(actor, actor.root, initialAssertions), stopIfInertAfter(bootProc))
|
||||
|
||||
proc bootActor*(name: string; bootProc: TurnAction): Actor =
|
||||
var initialAssertions: OutboundTable
|
||||
result = newActor(name, new(ref Handle))
|
||||
proc bootActor*(name: string; bootProc: TurnAction): Actor {.discardable.} =
|
||||
## Boot a top-level actor.
|
||||
result = newActor(name, nil)
|
||||
new result.handleAllocator
|
||||
var turn = Turn(facet: result.root)
|
||||
assert not result.root.isNil
|
||||
turn.work.addLast((result.root, bootProc,))
|
||||
when tracing:
|
||||
new result.turnIdAllocator
|
||||
let path = getEnv("SYNDICATE_TRACE_FILE", "/tmp/" & name & ".trace.pr")
|
||||
case path
|
||||
of "": stderr.writeLine "$SYNDICATE_TRACE_FILE unset, not tracing actor ", name
|
||||
of "-": result.traceStream = newFileStream(stderr)
|
||||
else: result.traceStream = openFileStream(path, fmWrite)
|
||||
run(result, bootProc, initialAssertions)
|
||||
turn.desc.id = nextTurnId()
|
||||
turn.desc.cause = TurnCause(orKind: TurnCauseKind.external)
|
||||
turn.desc.cause.external.description = "bootActor".toPreserves
|
||||
turnQueue.addLast turn
|
||||
|
||||
proc spawn*(name: string; turn: var Turn; bootProc: TurnAction; initialAssertions = initHashSet[Handle]()): Actor {.discardable.} =
|
||||
let actor = newActor(name, turn.facet.actor.handleAllocator)
|
||||
enqueue(turn, turn.facet) do (turn: var Turn):
|
||||
proc spawnActor*(turn: var Turn; name: string; bootProc: TurnAction; initialAssertions = initHashSet[Handle]()): Actor {.discardable.} =
|
||||
let actor = newActor(name, turn.facet)
|
||||
queueEffect(turn, actor.root) do (turn: var Turn):
|
||||
var newOutBound: Table[Handle, OutboundAssertion]
|
||||
for key in initialAssertions:
|
||||
discard turn.facet.outbound.pop(key, newOutbound[key])
|
||||
when tracing:
|
||||
actor.turnIdAllocator = turn.facet.actor.turnIdAllocator
|
||||
actor.traceStream = turn.facet.actor.traceStream
|
||||
var act = ActionDescription(orKind: ActionDescriptionKind.spawn)
|
||||
act.spawn.id = actor.id.toPreserves
|
||||
turn.desc.actions.add act
|
||||
run(actor, bootProc, newOutBound)
|
||||
actor
|
||||
|
||||
proc spawn*(name: string; turn: var Turn; bootProc: TurnAction; initialAssertions = initHashSet[Handle]()): Actor {.discardable.} =
|
||||
spawnActor(turn, name, bootProc, initialAssertions)
|
||||
|
||||
type StopOnRetract = ref object of Entity
|
||||
|
||||
method retract*(e: StopOnRetract; turn: var Turn; h: Handle) =
|
||||
stop(turn)
|
||||
|
||||
proc halfLink(facet, other: Facet) =
|
||||
let h = facet.nextHandle()
|
||||
facet.outbound[h] = OutboundAssertion(
|
||||
handle: h,
|
||||
peer: Cap(relay: other, target: StopOnRetract(facet: facet)),
|
||||
established: true,
|
||||
)
|
||||
|
||||
proc linkActor*(turn: var Turn; name: string; bootProc: TurnAction; initialAssertions = initHashSet[Handle]()): Actor {.discardable.} =
|
||||
result = spawnActor(turn, name, bootProc, initialAssertions)
|
||||
halfLink(turn.facet, result.root)
|
||||
halfLink(result.root, turn.facet)
|
||||
|
||||
var inertActor {.threadvar.}: Actor
|
||||
|
||||
proc newInertCap*(): Cap =
|
||||
let a = bootActor("inert") do (turn: var Turn): turn.stop()
|
||||
Cap(relay: a.root)
|
||||
if inertActor.isNil:
|
||||
inertActor = bootActor("inert") do (turn: var Turn): turn.stop()
|
||||
Cap(relay: inertActor.root)
|
||||
|
||||
proc atExit*(actor; action) = actor.exitHooks.add action
|
||||
|
||||
proc terminate(actor; turn; reason: ref Exception) =
|
||||
proc terminateActor(turn; reason: ref Exception) =
|
||||
let actor = turn.actor
|
||||
if not actor.exiting:
|
||||
actor.exiting = true
|
||||
actor.exitReason = reason
|
||||
|
@ -488,113 +619,83 @@ proc terminate(actor; turn; reason: ref Exception) =
|
|||
if not reason.isNil:
|
||||
act.stop.status = ExitStatus(orKind: ExitStatusKind.Error)
|
||||
act.stop.status.error.message = reason.msg
|
||||
trace(actor, act)
|
||||
for hook in actor.exitHooks: hook(turn)
|
||||
traceActivation(actor, act)
|
||||
while actor.exitHooks.len > 0:
|
||||
var hook = actor.exitHooks.pop()
|
||||
try: hook(turn)
|
||||
except CatchableError as err:
|
||||
if reason.isNil:
|
||||
terminateActor(turn, err)
|
||||
return
|
||||
proc finish(turn: var Turn) =
|
||||
actor.root.terminate(turn, reason.isNil)
|
||||
assert not actor.root.isNil, actor.name
|
||||
terminateFacet(turn)
|
||||
actor.root = nil
|
||||
actor.exited = true
|
||||
callSoon do ():
|
||||
run(actor.root, finish, true)
|
||||
queueTurn(actor.root, finish)
|
||||
|
||||
proc terminate*(facet; e: ref Exception) =
|
||||
proc terminateFacet*(facet; e: ref Exception) =
|
||||
run(facet.actor.root) do (turn: var Turn):
|
||||
facet.actor.terminate(turn, e)
|
||||
terminateActor(turn, e)
|
||||
|
||||
proc asyncCheck*(facet: Facet; fut: FutureBase) =
|
||||
## Sets a callback on `fut` which propagates exceptions to `facet`.
|
||||
addCallback(fut) do ():
|
||||
if fut.failed: terminate(facet, fut.error)
|
||||
|
||||
proc asyncCheck*(turn; fut: FutureBase) =
|
||||
## Sets a callback on `fut` which propagates exceptions to the facet of `turn`.
|
||||
asyncCheck(turn.facet, fut)
|
||||
|
||||
template tryFacet(facet; body: untyped) =
|
||||
try: body
|
||||
except CatchableError as err: terminate(facet, err)
|
||||
|
||||
proc run*(facet; action: TurnAction; zombieTurn = false) =
|
||||
if zombieTurn or (facet.actor.exitReason.isNil and facet.isAlive):
|
||||
tryFacet(facet):
|
||||
var queues = newTable[Facet, seq[TurnAction]]()
|
||||
block:
|
||||
var turn = Turn(facet: facet, queues: queues)
|
||||
action(turn)
|
||||
when tracing:
|
||||
turn.desc.id = facet.nextTurnId.toPreserves
|
||||
facet.actor.trace ActorActivation(
|
||||
orKind: ActorActivationKind.turn, turn: turn.desc)
|
||||
for facet, queue in queues:
|
||||
for action in queue: run(facet, action)
|
||||
|
||||
proc run*(cap: Cap; action: TurnAction) =
|
||||
## Convenience proc to run a `TurnAction` in the scope of a `Cap`.
|
||||
run(cap.relay, action)
|
||||
|
||||
proc addCallback*(fut: FutureBase; facet: Facet; act: TurnAction) =
|
||||
## Add a callback to a `Future` that will be called at a later `Turn`
|
||||
## within the context of `facet`.
|
||||
addCallback(fut) do ():
|
||||
if fut.failed: terminate(facet, fut.error)
|
||||
else:
|
||||
when tracing:
|
||||
run(facet) do (turn: var Turn):
|
||||
turn.desc.cause = TurnCause(orKind: TurnCauseKind.external)
|
||||
turn.desc.cause.external.description = "Future".toPreserves
|
||||
act(turn)
|
||||
else:
|
||||
run(facet, act)
|
||||
|
||||
proc addCallback*(fut: FutureBase; turn: var Turn; act: TurnAction) =
|
||||
## Add a callback to a `Future` that will be called at a later `Turn`
|
||||
## with the same context as the current.
|
||||
if fut.failed:
|
||||
terminate(turn.facet, fut.error)
|
||||
elif fut.finished:
|
||||
enqueue(turn, turn.facet, act)
|
||||
else:
|
||||
addCallback(fut, turn.facet, act)
|
||||
|
||||
proc addCallback*[T](fut: Future[T]; turn: var Turn; act: proc (t: var Turn, x: T) {.gcsafe.}) =
|
||||
addCallback(fut, turn) do (turn: var Turn):
|
||||
if fut.failed: terminate(turn.facet, fut.error)
|
||||
else:
|
||||
when tracing:
|
||||
turn.desc.cause = TurnCause(orKind: TurnCauseKind.external)
|
||||
turn.desc.cause.external.description = "Future".toPreserves
|
||||
act(turn, read fut)
|
||||
proc terminate*(turn: var Turn; e: ref Exception) =
|
||||
terminateActor(turn, e)
|
||||
|
||||
proc stop*(turn: var Turn, facet: Facet) =
|
||||
if facet.parent.isNil:
|
||||
facet.terminate(turn, true)
|
||||
else:
|
||||
enqueue(turn, facet.parent) do (turn: var Turn):
|
||||
facet.terminate(turn, true)
|
||||
queueEffect(turn, facet) do (turn: var Turn):
|
||||
when tracing:
|
||||
var act = ActionDescription(orKind: ActionDescriptionKind.facetStop)
|
||||
act.facetstop.path = facet.path
|
||||
act.facetstop.reason = FacetStopReason.explicitAction
|
||||
turn.desc.actions.add act
|
||||
terminateFacet(turn)
|
||||
|
||||
proc stop*(turn: var Turn) =
|
||||
stop(turn, turn.facet)
|
||||
|
||||
proc stop*(facet: Facet) =
|
||||
run(facet, stop)
|
||||
|
||||
proc onStop*(facet: Facet; act: TurnAction) =
|
||||
## Add a `proc (turn: var Turn)` action to `facet` to be called as it stops.
|
||||
add(facet.shutdownActions, act)
|
||||
|
||||
proc stopActor*(turn: var Turn) =
|
||||
let actor = turn.facet.actor
|
||||
enqueue(turn, actor.root) do (turn: var Turn):
|
||||
terminate(actor, turn, nil)
|
||||
proc onStop*(turn: var Turn; act: TurnAction) =
|
||||
onStop(turn.facet, act)
|
||||
|
||||
proc freshen*(turn: var Turn, act: TurnAction) =
|
||||
assert(turn.queues.len == 0, "Attempt to freshen a non-stale Turn")
|
||||
proc isAlive(actor): bool =
|
||||
not(actor.exited or actor.exiting)
|
||||
|
||||
proc stop*(actor: Actor) =
|
||||
if actor.isAlive:
|
||||
queueTurn(actor.root) do (turn: var Turn):
|
||||
assert(not turn.facet.isNil)
|
||||
when tracing:
|
||||
var act = ActionDescription(orKind: ActionDescriptionKind.facetStop)
|
||||
act.facetstop.path = turn.facet.path
|
||||
act.facetstop.reason = FacetStopReason.actorStopping
|
||||
turn.desc.actions.add act
|
||||
stop(turn, turn.facet)
|
||||
|
||||
proc stopActor*(facet: Facet) =
|
||||
stop(facet.actor)
|
||||
|
||||
proc stopActor*(turn: var Turn) =
|
||||
stop(turn, turn.facet.actor.root)
|
||||
|
||||
proc freshen*(turn: var Turn, act: TurnAction) {.deprecated.} =
|
||||
run(turn.facet, act)
|
||||
|
||||
proc newCap*(relay: Facet; e: Entity): Cap =
|
||||
Cap(relay: relay, target: e)
|
||||
proc newCap*(relay: Facet; entity: Entity): Cap =
|
||||
## Create a new capability for `entity` via `relay`.
|
||||
# An Entity has an owning facet and a Cap does as well?
|
||||
if entity.facet.isNil: entity.facet = relay
|
||||
Cap(relay: relay, target: entity)
|
||||
|
||||
proc newCap*(turn; e: Entity): Cap =
|
||||
Cap(relay: turn.facet, target: e)
|
||||
|
||||
newCap(turn.facet, e)
|
||||
proc newCap*(e: Entity; turn): Cap =
|
||||
Cap(relay: turn.facet, target: e)
|
||||
newCap(turn.facet, e)
|
||||
|
||||
type SyncContinuation {.final.} = ref object of Entity
|
||||
action: TurnAction
|
||||
|
@ -609,3 +710,78 @@ proc running*(actor): bool =
|
|||
result = not actor.exited
|
||||
if not (result or actor.exitReason.isNil):
|
||||
raise actor.exitReason
|
||||
|
||||
proc run(turn: var Turn) =
|
||||
while turn.work.len > 0:
|
||||
var (facet, act) = turn.work.popFirst()
|
||||
assert not act.isNil
|
||||
turn.facet = facet
|
||||
act(turn)
|
||||
when tracing:
|
||||
var act = ActorActivation(orKind: ActorActivationKind.turn)
|
||||
act.turn = move turn.desc
|
||||
traceActivation(turn.facet.actor, act)
|
||||
# TODO: catch exceptions here
|
||||
for eff in turn.effects.mvalues:
|
||||
assert not eff.facet.isNil
|
||||
turnQueue.addLast(move eff)
|
||||
turn.facet = nil # invalidate the turn
|
||||
|
||||
proc runPendingTurns* =
|
||||
while turnQueue.len > 0:
|
||||
var turn = turnQueue.popFirst()
|
||||
# TODO: check if actor is still valid
|
||||
try: run(turn)
|
||||
except CatchableError as err:
|
||||
terminateActor(turn, err)
|
||||
raise err
|
||||
|
||||
proc run* =
|
||||
## Run actors to completion
|
||||
when defined(solo5):
|
||||
while turnQueue.len > 0 or solo5_dispatcher.runOnce():
|
||||
runPendingTurns()
|
||||
else:
|
||||
var ready: seq[Continuation]
|
||||
while true:
|
||||
runPendingTurns()
|
||||
ioqueue.poll(ready)
|
||||
if ready.len == 0: break
|
||||
while ready.len > 0:
|
||||
discard trampoline:
|
||||
ready.pop()
|
||||
|
||||
proc runActor*(name: string; bootProc: TurnAction) =
|
||||
## Boot an actor `Actor` and churn ioqueue.
|
||||
let actor = bootActor(name, bootProc)
|
||||
if not actor.exitReason.isNil:
|
||||
raise actor.exitReason
|
||||
when defined(solo5):
|
||||
runPendingTurns()
|
||||
while (actor.isAlive and solo5_dispatcher.runOnce()) or turnQueue.len > 0:
|
||||
runPendingTurns()
|
||||
else:
|
||||
actors.run()
|
||||
if not actor.exitReason.isNil:
|
||||
raise actor.exitReason
|
||||
|
||||
type FacetGuard* = object
|
||||
facet: Facet
|
||||
|
||||
proc initGuard*(f: Facet): FacetGuard =
|
||||
result.facet = f
|
||||
inc result.facet.inertCheckPreventers
|
||||
|
||||
proc disarm*(g: var FacetGuard) =
|
||||
if not g.facet.isNil:
|
||||
assert g.facet.inertCheckPreventers > 0
|
||||
dec g.facet.inertCheckPreventers
|
||||
g.facet = nil
|
||||
|
||||
proc `=destroy`*(g: FacetGuard) =
|
||||
if not g.facet.isNil:
|
||||
dec g.facet.inertCheckPreventers
|
||||
|
||||
proc `=copy`*(dst: var FacetGuard, src: FacetGuard) =
|
||||
dst.facet = src.facet
|
||||
inc dst.facet.inertCheckPreventers
|
||||
|
|
|
@ -1,35 +1,147 @@
|
|||
# SPDX-FileCopyrightText: ☭ Emery Hemingway
|
||||
# SPDX-License-Identifier: Unlicense
|
||||
|
||||
import std/[asyncdispatch, monotimes, times]
|
||||
import std/times
|
||||
import preserves
|
||||
import syndicate
|
||||
import ../../syndicate, ../bags, ../protocols/[timer, dataspace]
|
||||
|
||||
import ../protocols/timer
|
||||
from syndicate/protocols/dataspace import Observe
|
||||
when defined(solo5):
|
||||
import solo5_dispatcher
|
||||
else:
|
||||
import pkg/sys/[handles, ioqueue]
|
||||
|
||||
export timer
|
||||
|
||||
type Observe = dataspace.Observe
|
||||
type
|
||||
Observe = dataspace.Observe
|
||||
|
||||
proc now: float64 = getTime().toUnixFloat()
|
||||
when defined(solo5):
|
||||
import solo5, solo5_dispatcher
|
||||
|
||||
proc spawnTimers*(turn: var Turn; ds: Cap): Actor {.discardable.} =
|
||||
## Spawn a timer actor.
|
||||
spawn("timer", turn) do (turn: var Turn):
|
||||
proc wallFloat: float =
|
||||
solo5_clock_wall().float / 1_000_000_000.0
|
||||
|
||||
during(turn, ds, inject(grab Observe(pattern: dropType LaterThan), {0: grabLit()})) do (seconds: float):
|
||||
let period = seconds - now()
|
||||
if period < 0.001:
|
||||
discard publish(turn, ds, LaterThan(seconds: seconds))
|
||||
else:
|
||||
addCallback(sleepAsync(period * 1_000), turn) do (turn: var Turn):
|
||||
discard publish(turn, ds, LaterThan(seconds: seconds))
|
||||
type
|
||||
TimerDriver = ref object
|
||||
facet: Facet
|
||||
## Owning facet of driver.
|
||||
target: Cap
|
||||
## Destination for LaterThan assertions.
|
||||
deadlines: Bag[float]
|
||||
## Deadlines that other actors are observing.
|
||||
|
||||
proc spawnTimerDriver(facet: Facet; cap: Cap): TimerDriver =
|
||||
TimerDriver(facet: facet, target: cap)
|
||||
|
||||
proc await(driver: TimerDriver; deadline: float) {.solo5dispatch.} =
|
||||
yieldUntil(deadline)
|
||||
if deadline in driver.deadlines:
|
||||
# check if the deadline is still observed
|
||||
proc turnWork(turn: var Turn) =
|
||||
discard publish(turn, driver.target, LaterThan(seconds: deadline))
|
||||
run(driver.facet, turnWork)
|
||||
|
||||
else:
|
||||
import std/[oserrors, posix, sets]
|
||||
type Time = posix.Time
|
||||
|
||||
{.pragma: timerfd, importc, header: "<sys/timerfd.h>".}
|
||||
|
||||
proc timerfd_create(clock_id: ClockId, flags: cint): cint {.timerfd.}
|
||||
proc timerfd_settime(ufd: cint, flags: cint,
|
||||
utmr: var Itimerspec, otmr: var Itimerspec): cint {.timerfd.}
|
||||
proc timerfd_gettime(ufd: cint, curr: var Itimerspec): cint {.timerfd.}
|
||||
|
||||
var
|
||||
TFD_NONBLOCK {.timerfd.}: cint
|
||||
TFD_CLOEXEC {.timerfd.}: cint
|
||||
TFD_TIMER_ABSTIME {.timerfd.}: cint
|
||||
|
||||
proc `<`(a, b: Timespec): bool =
|
||||
a.tv_sec.clong < b.tv_sec.clong or
|
||||
(a.tv_sec.clong == b.tv_sec.clong and a.tv_nsec < b.tv_nsec)
|
||||
|
||||
proc `+`(a, b: Timespec): Timespec =
|
||||
result.tv_sec = Time a.tv_sec.clong + b.tv_sec.clong
|
||||
result.tv_nsec = a.tv_nsec + b.tv_nsec
|
||||
|
||||
func toFloat(ts: Timespec): float =
|
||||
ts.tv_sec.float + ts.tv_nsec.float / 1_000_000_000
|
||||
|
||||
func toTimespec(f: float): Timespec =
|
||||
result.tv_sec = Time(f)
|
||||
result.tv_nsec = clong(uint64(f * 1_000_000_000) mod 1_000_000_000)
|
||||
|
||||
proc wallFloat: float =
|
||||
var ts: Timespec
|
||||
if clock_gettime(CLOCK_REALTIME, ts) < 0:
|
||||
raiseOSError(osLastError(), "clock_gettime")
|
||||
ts.toFloat
|
||||
|
||||
type
|
||||
TimerDriver = ref object
|
||||
facet: Facet
|
||||
## Owning facet of driver.
|
||||
target: Cap
|
||||
## Destination for LaterThan assertions.
|
||||
deadlines: Bag[float]
|
||||
## Deadlines that other actors are observing.
|
||||
timers: HashSet[cint]
|
||||
# TODO: use a single timer descriptor
|
||||
|
||||
proc spawnTimerDriver(facet: Facet; cap: Cap): TimerDriver =
|
||||
let driver = TimerDriver(facet: facet, target: cap)
|
||||
facet.onStop do (turn: var Turn):
|
||||
for fd in driver.timers:
|
||||
unregister(FD fd)
|
||||
discard close(fd)
|
||||
driver
|
||||
|
||||
proc earliestFloat(driver: TimerDriver): float =
|
||||
assert driver.deadlines.len > 0
|
||||
result = high float
|
||||
for deadline in driver.deadlines:
|
||||
if deadline < result:
|
||||
result = deadline
|
||||
|
||||
proc await(driver: TimerDriver; deadline: float) {.asyncio.} =
|
||||
## Run timer driver concurrently with actor.
|
||||
let fd = timerfd_create(CLOCK_REALTIME, TFD_NONBLOCK or TFD_CLOEXEC)
|
||||
if fd < 0:
|
||||
raiseOSError(osLastError(), "failed to acquire timer descriptor")
|
||||
var
|
||||
old: Itimerspec
|
||||
its = Itimerspec(it_value: deadline.toTimespec)
|
||||
if timerfd_settime(fd, TFD_TIMER_ABSTIME, its, old) < 0:
|
||||
raiseOSError(osLastError(), "failed to set timeout")
|
||||
driver.timers.incl(fd)
|
||||
while wallFloat() < deadline:
|
||||
# Check if the timer is expired which
|
||||
# could happen before waiting.
|
||||
wait(FD fd, Read)
|
||||
if deadline in driver.deadlines:
|
||||
# Check if the deadline is still observed.
|
||||
proc turnWork(turn: var Turn) =
|
||||
discard publish(turn, driver.target, LaterThan(seconds: deadline))
|
||||
run(driver.facet, turnWork)
|
||||
discard close(fd)
|
||||
driver.timers.excl(fd)
|
||||
|
||||
proc spawnTimerActor*(turn: var Turn; ds: Cap): Actor {.discardable.} =
|
||||
## Spawn a timer actor that responds to
|
||||
## dataspace observations of timeouts on `ds`.
|
||||
linkActor(turn, "timers") do (turn: var Turn):
|
||||
let driver = spawnTimerDriver(turn.facet, ds)
|
||||
let pat = inject(grab Observe(pattern: dropType LaterThan), {0: grabLit()})
|
||||
during(turn, ds, pat) do (deadline: float):
|
||||
if change(driver.deadlines, deadline, +1) == cdAbsentToPresent:
|
||||
discard trampoline(whelp await(driver, deadline))
|
||||
do:
|
||||
discard change(driver.deadlines, deadline, -1, clamp = true)
|
||||
# TODO: retract assertions that are unobserved.
|
||||
|
||||
proc after*(turn: var Turn; ds: Cap; dur: Duration; act: TurnAction) =
|
||||
## Execute `act` after some duration of time.
|
||||
let later = now() + dur.inMilliseconds.float64 * 1_000.0
|
||||
var later = wallFloat() + dur.inMilliseconds.float / 1_000.0
|
||||
onPublish(turn, ds, grab LaterThan(seconds: later)):
|
||||
act(turn)
|
||||
|
||||
# TODO: periodic timer
|
||||
|
|
|
@ -4,7 +4,7 @@
|
|||
## An unordered association of items to counts.
|
||||
## An item count may be negative, unlike CountTable.
|
||||
|
||||
import tables
|
||||
import std/[assertions, tables]
|
||||
|
||||
type
|
||||
ChangeDescription* = enum
|
||||
|
@ -46,3 +46,5 @@ proc `$`*(bag: Bag): string =
|
|||
if result.len > 1: result.add ' '
|
||||
result.add $x
|
||||
result.add '}'
|
||||
|
||||
export tables.contains, tables.del, tables.len
|
||||
|
|
|
@ -51,7 +51,7 @@ iterator observersOf[Sid, Oid](g: Graph[Sid, Oid]; oid: Oid): Sid =
|
|||
if g.edgesForward.hasKey(oid):
|
||||
for sid in g.edgesForward[oid]: yield sid
|
||||
|
||||
proc repairDamage*[Sid, Oid](g: var Graph[Sid, Oid]; repairNode: proc (sid: Sid) {.gcsafe.}) =
|
||||
proc repairDamage*[Sid, Oid](g: var Graph[Sid, Oid]; repairNode: proc (sid: Sid) {.closure.}) =
|
||||
var repairedThisRound: Set[Oid]
|
||||
while true:
|
||||
var workSet = move g.damagedNodes
|
||||
|
|
|
@ -16,14 +16,14 @@ type
|
|||
index: Index
|
||||
handleMap: Table[Handle, Assertion]
|
||||
|
||||
method publish(ds: Dataspace; turn: var Turn; a: AssertionRef; h: Handle) {.gcsafe.} =
|
||||
method publish(ds: Dataspace; turn: var Turn; a: AssertionRef; h: Handle) =
|
||||
if add(ds.index, turn, a.value):
|
||||
var obs = a.value.preservesTo(Observe)
|
||||
if obs.isSome and obs.get.observer of Cap:
|
||||
ds.index.add(turn, obs.get.pattern, Cap(obs.get.observer))
|
||||
ds.handleMap[h] = a.value
|
||||
|
||||
method retract(ds: Dataspace; turn: var Turn; h: Handle) {.gcsafe.} =
|
||||
method retract(ds: Dataspace; turn: var Turn; h: Handle) =
|
||||
let v = ds.handleMap[h]
|
||||
if remove(ds.index, turn, v):
|
||||
ds.handleMap.del h
|
||||
|
@ -31,18 +31,18 @@ method retract(ds: Dataspace; turn: var Turn; h: Handle) {.gcsafe.} =
|
|||
if obs.isSome and obs.get.observer of Cap:
|
||||
ds.index.remove(turn, obs.get.pattern, Cap(obs.get.observer))
|
||||
|
||||
method message(ds: Dataspace; turn: var Turn; a: AssertionRef) {.gcsafe.} =
|
||||
method message(ds: Dataspace; turn: var Turn; a: AssertionRef) =
|
||||
ds.index.deliverMessage(turn, a.value)
|
||||
|
||||
proc newDataspace*(turn: var Turn): Cap =
|
||||
newCap(turn, Dataspace(index: initIndex()))
|
||||
|
||||
type BootProc = proc (turn: var Turn; ds: Cap) {.gcsafe.}
|
||||
type DeprecatedBootProc = proc (ds: Cap; turn: var Turn) {.gcsafe.}
|
||||
type BootProc = proc (turn: var Turn; ds: Cap) {.closure.}
|
||||
type DeprecatedBootProc = proc (ds: Cap; turn: var Turn) {.closure.}
|
||||
|
||||
proc bootDataspace*(name: string; bootProc: BootProc): Actor =
|
||||
bootActor(name) do (turn: var Turn):
|
||||
discard turn.facet.preventInertCheck()
|
||||
turn.preventInertCheck()
|
||||
bootProc(turn, newDataspace(turn))
|
||||
|
||||
proc bootDataspace*(name: string; bootProc: DeprecatedBootProc): Actor {.deprecated.} =
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
# SPDX-FileCopyrightText: ☭ 2021 Emery Hemingway
|
||||
# SPDX-FileCopyrightText: ☭ Emery Hemingway
|
||||
# SPDX-License-Identifier: Unlicense
|
||||
|
||||
import std/[hashes, tables]
|
||||
|
@ -6,7 +6,7 @@ import preserves
|
|||
import ./actors, ./patterns, ./protocols/dataspace
|
||||
|
||||
type
|
||||
DuringProc* = proc (turn: var Turn; a: Value; h: Handle): TurnAction {.gcsafe.}
|
||||
DuringProc* = proc (turn: var Turn; a: Value; h: Handle): TurnAction
|
||||
DuringActionKind = enum null, dead, act
|
||||
DuringAction = object
|
||||
case kind: DuringActionKind
|
||||
|
@ -18,17 +18,18 @@ type
|
|||
assertionMap: Table[Handle, DuringAction]
|
||||
|
||||
method publish(de: DuringEntity; turn: var Turn; a: AssertionRef; h: Handle) =
|
||||
let action = de.cb(turn, a.value, h)
|
||||
# assert(not action.isNil "should have put in a no-op action")
|
||||
let g = de.assertionMap.getOrDefault h
|
||||
case g.kind
|
||||
of null:
|
||||
de.assertionMap[h] = DuringAction(kind: act, action: action)
|
||||
of dead:
|
||||
de.assertionMap.del h
|
||||
freshen(turn, action)
|
||||
of act:
|
||||
raiseAssert("during: duplicate handle in publish: " & $h)
|
||||
discard inFacet(turn) do (turn: var Turn):
|
||||
let action = de.cb(turn, a.value, h)
|
||||
# assert(not action.isNil "should have put in a no-op action")
|
||||
let g = de.assertionMap.getOrDefault h
|
||||
case g.kind
|
||||
of null:
|
||||
de.assertionMap[h] = DuringAction(kind: act, action: action)
|
||||
of dead:
|
||||
de.assertionMap.del h
|
||||
action(turn)
|
||||
of act:
|
||||
raiseAssert("during: duplicate handle in publish: " & $h)
|
||||
|
||||
method retract(de: DuringEntity; turn: var Turn; h: Handle) =
|
||||
let g = de.assertionMap.getOrDefault h
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
|
||||
import std/[hashes, tables]
|
||||
|
||||
from ./actors import Cap, hash
|
||||
import ./actors
|
||||
from ./protocols/sturdy import Oid
|
||||
|
||||
proc hash(r: Cap): Hash = !$(r.relay.hash !& r.target.unsafeAddr.hash)
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
# SPDX-FileCopyrightText: ☭ Emery Hemingway
|
||||
# SPDX-License-Identifier: Unlicense
|
||||
|
||||
import std/[algorithm, options, sequtils, tables, typetraits]
|
||||
import std/[algorithm, assertions, options, sequtils, tables, typetraits]
|
||||
|
||||
import preserves
|
||||
import ./protocols/dataspacePatterns
|
||||
|
@ -313,7 +313,7 @@ proc grabDictionary*(bindings: sink openArray[(string, Pattern)]): Pattern =
|
|||
for (key, val) in bindings.items:
|
||||
result.dcompound.dict.entries[key.toSymbol] = val
|
||||
|
||||
proc depattern(comp: DCompound; values: var seq[Value]; index: var int): Value {.gcsafe.}
|
||||
proc depattern(comp: DCompound; values: var seq[Value]; index: var int): Value
|
||||
|
||||
proc depattern(pat: Pattern; values: var seq[Value]; index: var int): Value =
|
||||
case pat.orKind
|
||||
|
@ -328,7 +328,7 @@ proc depattern(pat: Pattern; values: var seq[Value]; index: var int): Value =
|
|||
of PatternKind.DCompound:
|
||||
result = depattern(pat.dcompound, values, index)
|
||||
|
||||
proc depattern(comp: DCompound; values: var seq[Value]; index: var int): Value {.gcsafe.} =
|
||||
proc depattern(comp: DCompound; values: var seq[Value]; index: var int): Value =
|
||||
case comp.orKind
|
||||
of DCompoundKind.rec:
|
||||
result = initRecord(comp.rec.label, comp.rec.fields.len)
|
||||
|
|
|
@ -1,10 +1,13 @@
|
|||
# SPDX-FileCopyrightText: ☭ Emery Hemingway
|
||||
# SPDX-License-Identifier: Unlicense
|
||||
|
||||
import std/[asyncdispatch, options, tables]
|
||||
from std/os import getEnv, `/`
|
||||
import std/[options, tables]
|
||||
import preserves
|
||||
import ../syndicate, /capabilities, ./durings, ./membranes, ./protocols/[gatekeeper, protocol, sturdy, transportAddress]
|
||||
import ../syndicate, ./durings, ./membranes, ./protocols/[gatekeeper, protocol, sturdy, transportAddress]
|
||||
|
||||
when defined(posix):
|
||||
import ./capabilities
|
||||
from std/os import getEnv, `/`
|
||||
|
||||
when defined(traceSyndicate):
|
||||
when defined(posix):
|
||||
|
@ -16,29 +19,28 @@ else:
|
|||
|
||||
export `$`
|
||||
|
||||
type
|
||||
Oid = sturdy.Oid
|
||||
|
||||
export Stdio, Tcp, WebSocket, Unix
|
||||
export Route, Stdio, Tcp, WebSocket, Unix
|
||||
|
||||
type
|
||||
Assertion = Value
|
||||
WireRef = sturdy.WireRef
|
||||
Turn = syndicate.Turn
|
||||
Event = protocol.Event
|
||||
Handle = actors.Handle
|
||||
Oid = sturdy.Oid
|
||||
Turn = syndicate.Turn
|
||||
WireRef = sturdy.WireRef
|
||||
|
||||
PacketWriter = proc (turn: var Turn; buf: seq[byte]) {.closure, gcsafe.}
|
||||
RelaySetup = proc (turn: var Turn; relay: Relay) {.closure, gcsafe.}
|
||||
PacketWriter = proc (turn: var Turn; buf: seq[byte]) {.closure.}
|
||||
RelaySetup = proc (turn: var Turn; relay: Relay) {.closure.}
|
||||
|
||||
Relay* = ref object
|
||||
facet: Facet
|
||||
inboundAssertions: Table[Handle,
|
||||
tuple[localHandle: Handle, imported: seq[WireSymbol]]]
|
||||
outboundAssertions: Table[Handle, seq[WireSymbol]]
|
||||
pendingTurn: protocol.Turn
|
||||
exported: Membrane
|
||||
imported: Membrane
|
||||
nextLocalOid: Oid
|
||||
pendingTurn: protocol.Turn
|
||||
wireBuf: BufferedDecoder
|
||||
packetWriter: PacketWriter
|
||||
peer: Cap
|
||||
|
@ -90,7 +92,7 @@ proc rewriteCapOut(relay: Relay; cap: Cap; exported: var seq[WireSymbol]): WireR
|
|||
mine: WireRefMine(oid: ws.oid))
|
||||
|
||||
proc rewriteOut(relay: Relay; v: Assertion):
|
||||
tuple[rewritten: Value, exported: seq[WireSymbol]] {.gcsafe.} =
|
||||
tuple[rewritten: Value, exported: seq[WireSymbol]] =
|
||||
var exported: seq[WireSymbol]
|
||||
result.rewritten = mapEmbeds(v) do (pr: Value) -> Value:
|
||||
let o = pr.unembed(Cap); if o.isSome:
|
||||
|
@ -108,41 +110,39 @@ proc deregister(relay: Relay; h: Handle) =
|
|||
for e in outbound: releaseCapOut(relay, e)
|
||||
|
||||
proc send(relay: Relay; turn: var Turn; rOid: protocol.Oid; m: Event) =
|
||||
if relay.pendingTurn.len == 0:
|
||||
# If the pending queue is empty then schedule a packet
|
||||
# to be sent after pending I/O is processed.
|
||||
callSoon do ():
|
||||
relay.facet.run do (turn: var Turn):
|
||||
var pkt = Packet(
|
||||
orKind: PacketKind.Turn,
|
||||
turn: move relay.pendingTurn)
|
||||
trace "C: ", pkt
|
||||
relay.packetWriter(turn, encode pkt)
|
||||
# TODO: don't send right away.
|
||||
relay.pendingTurn.add TurnEvent(oid: rOid, event: m)
|
||||
queueEffect(turn, relay.facet) do (turn: var Turn):
|
||||
if relay.pendingTurn.len > 0:
|
||||
var pkt = Packet(
|
||||
orKind: PacketKind.Turn,
|
||||
turn: move relay.pendingTurn)
|
||||
trace "C: ", pkt
|
||||
relay.packetWriter(turn, encode pkt)
|
||||
|
||||
proc send(re: RelayEntity; turn: var Turn; ev: Event) =
|
||||
send(re.relay, turn, protocol.Oid re.oid, ev)
|
||||
|
||||
method publish(re: RelayEntity; t: var Turn; a: AssertionRef; h: Handle) {.gcsafe.} =
|
||||
method publish(re: RelayEntity; t: var Turn; a: AssertionRef; h: Handle) =
|
||||
re.send(t, Event(
|
||||
orKind: EventKind.Assert,
|
||||
`assert`: protocol.Assert(
|
||||
assertion: re.relay.register(a.value, h).rewritten,
|
||||
handle: h)))
|
||||
|
||||
method retract(re: RelayEntity; t: var Turn; h: Handle) {.gcsafe.} =
|
||||
method retract(re: RelayEntity; t: var Turn; h: Handle) =
|
||||
re.relay.deregister h
|
||||
re.send(t, Event(
|
||||
orKind: EventKind.Retract,
|
||||
retract: Retract(handle: h)))
|
||||
|
||||
method message(re: RelayEntity; turn: var Turn; msg: AssertionRef) {.gcsafe.} =
|
||||
method message(re: RelayEntity; turn: var Turn; msg: AssertionRef) =
|
||||
var (value, exported) = rewriteOut(re.relay, msg.value)
|
||||
assert(len(exported) == 0, "cannot send a reference in a message")
|
||||
if len(exported) == 0:
|
||||
re.send(turn, Event(orKind: EventKind.Message, message: Message(body: value)))
|
||||
|
||||
method sync(re: RelayEntity; turn: var Turn; peer: Cap) {.gcsafe.} =
|
||||
method sync(re: RelayEntity; turn: var Turn; peer: Cap) =
|
||||
var
|
||||
peerEntity = newSyncPeerEntity(re.relay, peer)
|
||||
exported: seq[WireSymbol]
|
||||
|
@ -161,11 +161,8 @@ using
|
|||
|
||||
proc lookupLocal(relay; oid: Oid): Cap =
|
||||
let sym = relay.exported.grab oid
|
||||
if sym.isNil: newInertCap()
|
||||
else: sym.cap
|
||||
|
||||
proc isInert(r: Cap): bool =
|
||||
r.target.isNil
|
||||
if not sym.isNil:
|
||||
result = sym.cap
|
||||
|
||||
proc rewriteCapIn(relay; facet; n: WireRef, imported: var seq[WireSymbol]): Cap =
|
||||
case n.orKind
|
||||
|
@ -180,12 +177,14 @@ proc rewriteCapIn(relay; facet; n: WireRef, imported: var seq[WireSymbol]): Cap
|
|||
imported.add e
|
||||
result = e.cap
|
||||
of WireRefKind.yours:
|
||||
let r = relay.lookupLocal(n.yours.oid)
|
||||
if n.yours.attenuation.len == 0 or r.isInert: result = r
|
||||
else: raiseAssert "attenuation not implemented"
|
||||
result = relay.lookupLocal(n.yours.oid)
|
||||
if result.isNil:
|
||||
result = newInertCap()
|
||||
elif n.yours.attenuation.len > 0:
|
||||
result = attenuate(result, n.yours.attenuation)
|
||||
|
||||
proc rewriteIn(relay; facet; v: Value):
|
||||
tuple[rewritten: Assertion; imported: seq[WireSymbol]] {.gcsafe.} =
|
||||
tuple[rewritten: Assertion; imported: seq[WireSymbol]] =
|
||||
var imported: seq[WireSymbol]
|
||||
result.rewritten = mapEmbeds(v) do (pr: Value) -> Value:
|
||||
let wr = pr.preservesTo WireRef; if wr.isSome:
|
||||
|
@ -196,7 +195,7 @@ proc rewriteIn(relay; facet; v: Value):
|
|||
|
||||
proc close(r: Relay) = discard
|
||||
|
||||
proc dispatch(relay: Relay; turn: var Turn; cap: Cap; event: Event) {.gcsafe.} =
|
||||
proc dispatch(relay: Relay; turn: var Turn; cap: Cap; event: Event) =
|
||||
case event.orKind
|
||||
of EventKind.Assert:
|
||||
let (a, imported) = rewriteIn(relay, turn.facet, event.assert.assertion)
|
||||
|
@ -215,16 +214,15 @@ proc dispatch(relay: Relay; turn: var Turn; cap: Cap; event: Event) {.gcsafe.} =
|
|||
turn.message(cap, a)
|
||||
|
||||
of EventKind.Sync:
|
||||
discard # TODO
|
||||
#[
|
||||
var imported: seq[WireSymbol]
|
||||
let k = relay.rewriteCapIn(turn, evenr.sync.peer, imported)
|
||||
turn.sync(cap) do (turn: var Turn):
|
||||
turn.message(k, true)
|
||||
for e in imported: relay.imported.del e
|
||||
]#
|
||||
var
|
||||
(v, imported) = rewriteIn(relay, turn.facet, event.sync.peer)
|
||||
peer = unembed(v, Cap)
|
||||
if peer.isSome:
|
||||
turn.message(get peer, true)
|
||||
for e in imported: relay.imported.drop e
|
||||
|
||||
proc dispatch(relay: Relay; v: Value) {.gcsafe.} =
|
||||
proc dispatch(relay: Relay; v: Value) =
|
||||
trace "S: ", v
|
||||
run(relay.facet) do (t: var Turn):
|
||||
var pkt: Packet
|
||||
|
@ -234,10 +232,8 @@ proc dispatch(relay: Relay; v: Value) {.gcsafe.} =
|
|||
# https://synit.org/book/protocol.html#turn-packets
|
||||
for te in pkt.turn:
|
||||
let r = lookupLocal(relay, te.oid.Oid)
|
||||
if not r.isInert:
|
||||
if not r.isNil:
|
||||
dispatch(relay, t, r, te.event)
|
||||
else:
|
||||
stderr.writeLine("discarding event for unknown Cap; ", te.event)
|
||||
of PacketKind.Error:
|
||||
# https://synit.org/book/protocol.html#error-packets
|
||||
when defined(posix):
|
||||
|
@ -250,7 +246,12 @@ proc dispatch(relay: Relay; v: Value) {.gcsafe.} =
|
|||
when defined(posix):
|
||||
stderr.writeLine("discarding undecoded packet ", v)
|
||||
|
||||
proc recv(relay: Relay; buf: seq[byte]) =
|
||||
proc recv(relay: Relay; buf: openarray[byte]; slice: Slice[int]) =
|
||||
feed(relay.wireBuf, buf, slice)
|
||||
var pr = decode(relay.wireBuf)
|
||||
if pr.isSome: dispatch(relay, pr.get)
|
||||
|
||||
proc recv(relay: Relay; buf: openarray[byte]) =
|
||||
feed(relay.wireBuf, buf)
|
||||
var pr = decode(relay.wireBuf)
|
||||
if pr.isSome: dispatch(relay, pr.get)
|
||||
|
@ -265,13 +266,13 @@ type
|
|||
nextLocalOid*: Option[Oid]
|
||||
|
||||
proc spawnRelay(name: string; turn: var Turn; opts: RelayActorOptions; setup: RelaySetup) =
|
||||
spawn(name, turn) do (turn: var Turn):
|
||||
linkActor(turn, name) do (turn: var Turn):
|
||||
turn.preventInertCheck()
|
||||
let relay = Relay(
|
||||
facet: turn.facet,
|
||||
packetWriter: opts.packetWriter,
|
||||
wireBuf: newBufferedDecoder(0),
|
||||
)
|
||||
discard relay.facet.preventInertCheck()
|
||||
if not opts.initialCap.isNil:
|
||||
var exported: seq[WireSymbol]
|
||||
discard rewriteCapOut(relay, opts.initialCap, exported)
|
||||
|
@ -298,21 +299,39 @@ proc accepted(cap: Cap): Resolved =
|
|||
result = Resolved(orKind: ResolvedKind.accepted)
|
||||
result.accepted.responderSession = cap
|
||||
|
||||
type ShutdownEntity = ref object of Entity
|
||||
method retract(e: ShutdownEntity; turn: var Turn; h: Handle) =
|
||||
stopActor(e.facet)
|
||||
|
||||
when defined(posix):
|
||||
|
||||
import std/asyncfile
|
||||
export Unix
|
||||
import std/[oserrors, posix]
|
||||
import pkg/sys/[files, handles, ioqueue, sockets]
|
||||
export transportAddress.Unix
|
||||
|
||||
type StdioControlEntity = ref object of Entity
|
||||
type StdioEntity = ref object of Entity
|
||||
relay: Relay
|
||||
stdin: AsyncFile
|
||||
alive: bool
|
||||
|
||||
method message(entity: StdioControlEntity; turn: var Turn; ass: AssertionRef) =
|
||||
method message(entity: StdioEntity; turn: var Turn; ass: AssertionRef) =
|
||||
if ass.value.preservesTo(ForceDisconnect).isSome:
|
||||
close(entity.stdin)
|
||||
close(stdout)
|
||||
entity.alive = false
|
||||
|
||||
proc loop(entity: StdioEntity) {.asyncio.} =
|
||||
let buf = new seq[byte]
|
||||
entity.alive = true
|
||||
while entity.alive:
|
||||
buf[].setLen(0x1000)
|
||||
let n = read(entity.stdin, buf)
|
||||
if n == 0:
|
||||
stopActor(entity.facet)
|
||||
else:
|
||||
entity.relay.recv(buf[], 0..<n)
|
||||
|
||||
proc connectTransport(turn: var Turn; ds: Cap; ta: transportAddress.Stdio) =
|
||||
## Connect to an external dataspace over stdio.
|
||||
let localDataspace = newDataspace(turn)
|
||||
proc stdoutWriter(turn: var Turn; buf: seq[byte]) =
|
||||
## Blocking write to stdout.
|
||||
let n = writeBytes(stdout, buf, 0, buf.len)
|
||||
|
@ -321,107 +340,175 @@ when defined(posix):
|
|||
stopActor(turn)
|
||||
var opts = RelayActorOptions(
|
||||
packetWriter: stdoutWriter,
|
||||
initialCap: ds,
|
||||
initialCap: localDataspace,
|
||||
initialOid: 0.Oid.some,
|
||||
)
|
||||
spawnRelay("stdio", turn, opts) do (turn: var Turn; relay: Relay):
|
||||
let
|
||||
facet = turn.facet
|
||||
asyncStdin = openAsync("/dev/stdin") # this is universal now?
|
||||
fd = stdin.getOsFileHandle()
|
||||
flags = fcntl(fd.cint, F_GETFL, 0)
|
||||
if flags < 0: raiseOSError(osLastError())
|
||||
if fcntl(fd.cint, F_SETFL, flags or O_NONBLOCK) < 0:
|
||||
raiseOSError(osLastError())
|
||||
let entity = StdioEntity(
|
||||
facet: turn.facet, relay: relay, stdin: newAsyncFile(FD fd))
|
||||
onStop(entity.facet) do (turn: var Turn):
|
||||
entity.alive = false
|
||||
close(entity.stdin)
|
||||
# Close stdin to remove it from the ioqueue
|
||||
discard trampoline:
|
||||
whelp loop(entity)
|
||||
publish(turn, ds, TransportConnection(
|
||||
`addr`: ta.toPreserves,
|
||||
control: StdioControlEntity(stdin: asyncStdin).newCap(turn),
|
||||
resolved: relay.peer.accepted,
|
||||
control: newCap(entity, turn),
|
||||
resolved: localDataspace.accepted,
|
||||
))
|
||||
const stdinReadSize = 0x2000
|
||||
proc readCb(pktFut: Future[string]) {.gcsafe.} =
|
||||
if not pktFut.failed:
|
||||
var buf = pktFut.read
|
||||
if buf.len == 0:
|
||||
run(facet) do (turn: var Turn): stopActor(turn)
|
||||
else:
|
||||
relay.recv(cast[seq[byte]](buf))
|
||||
asyncStdin.read(stdinReadSize).addCallback(readCb)
|
||||
asyncStdin.read(stdinReadSize).addCallback(readCb)
|
||||
|
||||
proc connectStdio*(turn: var Turn; ds: Cap) =
|
||||
## Connect to an external dataspace over stdin and stdout.
|
||||
connectTransport(turn, ds, transportAddress.Stdio())
|
||||
|
||||
import std/asyncnet
|
||||
from std/nativesockets import AF_INET, AF_UNIX, IPPROTO_TCP, SOCK_STREAM, Protocol
|
||||
type
|
||||
TcpEntity = ref object of Entity
|
||||
relay: Relay
|
||||
sock: AsyncConn[sockets.Protocol.TCP]
|
||||
alive: bool
|
||||
|
||||
type SocketControlEntity = ref object of Entity
|
||||
socket: AsyncSocket
|
||||
UnixEntity = ref object of Entity
|
||||
relay: Relay
|
||||
sock: AsyncConn[sockets.Protocol.Unix]
|
||||
alive: bool
|
||||
|
||||
method message(entity: SocketControlEntity; turn: var Turn; ass: AssertionRef) =
|
||||
SocketEntity = TcpEntity | UnixEntity
|
||||
|
||||
method message(entity: SocketEntity; turn: var Turn; ass: AssertionRef) =
|
||||
if ass.value.preservesTo(ForceDisconnect).isSome:
|
||||
close(entity.socket)
|
||||
entity.alive = false
|
||||
|
||||
type ShutdownEntity* = ref object of Entity
|
||||
method retract(e: ShutdownEntity; turn: var Turn; h: Handle) =
|
||||
stopActor(turn)
|
||||
template bootSocketEntity() {.dirty.} =
|
||||
proc setup(turn: var Turn) {.closure.} =
|
||||
proc kill(turn: var Turn) =
|
||||
if entity.alive:
|
||||
entity.alive = false
|
||||
close(entity.sock)
|
||||
onStop(turn, kill)
|
||||
publish(turn, ds, TransportConnection(
|
||||
`addr`: ta.toPreserves,
|
||||
control: newCap(entity, turn),
|
||||
resolved: entity.relay.peer.accepted,
|
||||
))
|
||||
run(entity.relay.facet, setup)
|
||||
let buf = new seq[byte]
|
||||
entity.alive = true
|
||||
while entity.alive:
|
||||
buf[].setLen(0x1000)
|
||||
let n = read(entity.sock, buf)
|
||||
if n < 0: raiseOSError(osLastError())
|
||||
elif n == 0:
|
||||
stopActor(entity.facet)
|
||||
else:
|
||||
entity.relay.recv(buf[], 0..<n)
|
||||
# the socket closes when the actor is stopped
|
||||
|
||||
proc connect(turn: var Turn; ds: Cap; transAddr: Value; socket: AsyncSocket) =
|
||||
proc socketWriter(turn: var Turn; buf: seq[byte]) =
|
||||
asyncCheck(turn, socket.send(cast[string](buf)))
|
||||
proc boot(entity: TcpEntity; ta: transportAddress.Tcp; ds: Cap) {.asyncio.} =
|
||||
entity.sock = connectTcpAsync(ta.host, Port ta.port)
|
||||
bootSocketEntity()
|
||||
|
||||
proc boot(entity: UnixEntity; ta: transportAddress.Unix; ds: Cap) {.asyncio.} =
|
||||
entity.sock = connectUnixAsync(ta.path)
|
||||
bootSocketEntity()
|
||||
|
||||
template spawnSocketRelay() {.dirty.} =
|
||||
proc writeConn(turn: var Turn; buf: seq[byte]) =
|
||||
discard trampoline:
|
||||
whelp write(entity.sock, buf)
|
||||
var ops = RelayActorOptions(
|
||||
packetWriter: socketWriter,
|
||||
packetWriter: writeConn,
|
||||
initialOid: 0.Oid.some,
|
||||
)
|
||||
spawnRelay("socket", turn, ops) do (turn: var Turn; relay: Relay):
|
||||
let facet = turn.facet
|
||||
facet.actor.atExit do (turn: var Turn): close(socket)
|
||||
publish(turn, ds, TransportConnection(
|
||||
`addr`: transAddr,
|
||||
control: SocketControlEntity(socket: socket).newCap(turn),
|
||||
resolved: relay.peer.accepted,
|
||||
))
|
||||
const recvSize = 0x4000
|
||||
proc recvCb(pktFut: Future[string]) {.gcsafe.} =
|
||||
if pktFut.failed or pktFut.read.len == 0:
|
||||
run(facet) do (turn: var Turn): stopActor(turn)
|
||||
else:
|
||||
relay.recv(cast[seq[byte]](pktFut.read))
|
||||
if not socket.isClosed:
|
||||
socket.recv(recvSize).addCallback(recvCb)
|
||||
socket.recv(recvSize).addCallback(recvCb)
|
||||
|
||||
proc connect(turn: var Turn; ds: Cap; ta: Value; socket: AsyncSocket; fut: Future[void]) =
|
||||
let facet = turn.facet
|
||||
fut.addCallback do ():
|
||||
run(facet) do (turn: var Turn):
|
||||
if fut.failed:
|
||||
var ass = TransportConnection(
|
||||
`addr`: ta,
|
||||
resolved: Resolved(orKind: ResolvedKind.Rejected),
|
||||
)
|
||||
ass.resolved.rejected.detail = embed fut.error
|
||||
publish(turn, ds, ass)
|
||||
else:
|
||||
connect(turn, ds, ta, socket)
|
||||
entity.facet = turn.facet
|
||||
entity.relay = relay
|
||||
discard trampoline:
|
||||
whelp boot(entity, ta, ds)
|
||||
|
||||
proc connectTransport(turn: var Turn; ds: Cap; ta: transportAddress.Tcp) =
|
||||
let
|
||||
facet = turn.facet
|
||||
socket = newAsyncSocket(
|
||||
domain = AF_INET,
|
||||
sockType = SOCK_STREAM,
|
||||
protocol = IPPROTO_TCP,
|
||||
buffered = false,
|
||||
)
|
||||
connect(turn, ds, ta.toPreserves, socket, connect(socket, ta.host, Port ta.port))
|
||||
let entity = TcpEntity()
|
||||
spawnSocketRelay()
|
||||
|
||||
proc connectTransport(turn: var Turn; ds: Cap; ta: transportAddress.Unix) =
|
||||
## Relay a dataspace over a UNIX socket.
|
||||
let socket = newAsyncSocket(
|
||||
domain = AF_UNIX,
|
||||
sockType = SOCK_STREAM,
|
||||
protocol = cast[Protocol](0),
|
||||
buffered = false)
|
||||
connect(turn, ds, ta.toPreserves, socket, connectUnix(socket, ta.path))
|
||||
let entity = UnixEntity()
|
||||
spawnSocketRelay()
|
||||
|
||||
proc walk(turn: var Turn; ds, origin: Cap; route: Route; transOff, stepOff: int) {.gcsafe.} =
|
||||
elif defined(solo5):
|
||||
|
||||
import solo5_dispatcher
|
||||
import taps
|
||||
|
||||
type
|
||||
TcpEntity = ref object of Entity
|
||||
relay: Relay
|
||||
conn: Connection
|
||||
decoder: BufferedDecoder
|
||||
|
||||
method message(entity: TcpEntity; turn: var Turn; ass: AssertionRef) =
|
||||
if ass.value.preservesTo(ForceDisconnect).isSome:
|
||||
entity.conn.abort()
|
||||
|
||||
proc connectTransport(turn: var Turn; ds: Cap; ta: transportAddress.Tcp) =
|
||||
let entity = TcpEntity(facet: turn.facet)
|
||||
|
||||
proc writeConn(turn: var Turn; buf: seq[byte]) =
|
||||
assert not entity.conn.isNil
|
||||
entity.conn.batch:
|
||||
entity.conn.send(buf)
|
||||
var ops = RelayActorOptions(
|
||||
packetWriter: writeConn,
|
||||
initialOid: 0.Oid.some,
|
||||
)
|
||||
spawnRelay("socket", turn, ops) do (turn: var Turn; relay: Relay):
|
||||
entity.facet = turn.facet
|
||||
entity.relay = relay
|
||||
|
||||
var ep = newRemoteEndpoint()
|
||||
if ta.host.isIpAddress:
|
||||
ep.with ta.host.parseIpAddress
|
||||
else:
|
||||
ep.withHostname ta.host
|
||||
ep.with ta.port.Port
|
||||
|
||||
var tp = newTransportProperties()
|
||||
tp.require "reliability"
|
||||
tp.ignore "congestion-control"
|
||||
tp.ignore "preserve-order"
|
||||
|
||||
var preconn = newPreconnection(
|
||||
remote=[ep], transport=tp.some)
|
||||
entity.conn = preconn.initiate()
|
||||
entity.facet.onStop do (turn: var Turn):
|
||||
entity.conn.close()
|
||||
entity.conn.onConnectionError do (err: ref Exception):
|
||||
run(entity.facet) do (turn: var Turn):
|
||||
terminate(turn, err)
|
||||
entity.conn.onClosed():
|
||||
stop(entity.facet)
|
||||
entity.conn.onReceivedPartial do (data: seq[byte]; ctx: MessageContext; eom: bool):
|
||||
entity.relay.recv(data)
|
||||
if eom:
|
||||
stop(entity.facet)
|
||||
else:
|
||||
entity.conn.receive()
|
||||
entity.conn.onReady do ():
|
||||
entity.facet.run do (turn: var Turn):
|
||||
publish(turn, ds, TransportConnection(
|
||||
`addr`: ta.toPreserves,
|
||||
control: newCap(entity, turn),
|
||||
resolved: entity.relay.peer.accepted,
|
||||
))
|
||||
entity.conn.receive()
|
||||
|
||||
proc walk(turn: var Turn; ds, origin: Cap; route: Route; transOff, stepOff: int) =
|
||||
if stepOff < route.pathSteps.len:
|
||||
let
|
||||
step = route.pathSteps[stepOff]
|
||||
|
@ -462,74 +549,105 @@ proc connectRoute(turn: var Turn; ds: Cap; route: Route; transOff: int) =
|
|||
onPublish(turn, ds, acceptPat) do (origin: Cap):
|
||||
walk(turn, ds, origin, route, transOff, 0)
|
||||
|
||||
type StepCallback = proc (turn: var Turn; step: Value; origin, next: Cap) {.gcsafe.}
|
||||
type StepCallback = proc (turn: var Turn; step: Value; origin, next: Cap) {.closure.}
|
||||
|
||||
proc spawnStepResolver(turn: var Turn; ds: Cap; stepType: Value; cb: StepCallback) =
|
||||
spawn($stepType & "-step", turn) do (turn: var Turn):
|
||||
let stepPat = grabRecord(stepType, grab())
|
||||
let pat = ?Observe(pattern: ResolvedPathStep?:{1: stepPat}) ?? {0: grabLit(), 1: grab()}
|
||||
during(turn, ds, pat) do (origin: Cap; stepDetail: Literal[Value]):
|
||||
let step = toRecord(stepType, stepDetail.value)
|
||||
proc duringCallback(turn: var Turn; ass: Value; h: Handle): TurnAction =
|
||||
var res = ass.preservesTo Resolved
|
||||
if res.isSome:
|
||||
if res.get.orKind == ResolvedKind.accepted and
|
||||
res.get.accepted.responderSession of Cap:
|
||||
cb(turn, step, origin, res.get.accepted.responderSession.Cap)
|
||||
else:
|
||||
publish(turn, ds, ResolvedPathStep(
|
||||
origin: origin, pathStep: step, resolved: res.get))
|
||||
proc action(turn: var Turn) =
|
||||
stop(turn)
|
||||
result = action
|
||||
publish(turn, origin, Resolve(
|
||||
step: step, observer: newCap(turn, during(duringCallback))))
|
||||
let stepPat = grabRecord(stepType, grab())
|
||||
let pat = ?Observe(pattern: ResolvedPathStep?:{1: stepPat}) ?? {0: grabLit(), 1: grab()}
|
||||
during(turn, ds, pat) do (origin: Cap; stepDetail: Literal[Value]):
|
||||
let step = toRecord(stepType, stepDetail.value)
|
||||
proc duringCallback(turn: var Turn; ass: Value; h: Handle): TurnAction =
|
||||
var res = ass.preservesTo Resolved
|
||||
if res.isSome:
|
||||
if res.get.orKind == ResolvedKind.accepted and
|
||||
res.get.accepted.responderSession of Cap:
|
||||
cb(turn, step, origin, res.get.accepted.responderSession.Cap)
|
||||
else:
|
||||
publish(turn, ds, ResolvedPathStep(
|
||||
origin: origin, pathStep: step, resolved: res.get))
|
||||
proc action(turn: var Turn) =
|
||||
stop(turn)
|
||||
result = action
|
||||
publish(turn, origin, Resolve(
|
||||
step: step, observer: newCap(turn, during(duringCallback))))
|
||||
|
||||
proc spawnRelays*(turn: var Turn; ds: Cap) =
|
||||
## Spawn actors that manage routes and appeasing gatekeepers.
|
||||
spawn("transport-connector", turn) do (turn: var Turn):
|
||||
let pat = ?Observe(pattern: !TransportConnection) ?? { 0: grab() }
|
||||
# Use a generic pattern and type matching
|
||||
# in the during handler because it is easy.
|
||||
let transPat = ?Observe(pattern: !TransportConnection) ?? { 0: grab() }
|
||||
# Use a generic pattern and type matching
|
||||
# in the during handler because it is easy.
|
||||
|
||||
when defined(posix):
|
||||
let stdioPat = ?Observe(pattern: TransportConnection?:{0: ?:Stdio})
|
||||
during(turn, ds, stdioPat) do:
|
||||
connectTransport(turn, ds, Stdio())
|
||||
|
||||
# TODO: tcp pattern
|
||||
during(turn, ds, pat) do (ta: Literal[transportAddress.Tcp]):
|
||||
connectTransport(turn, ds, ta.value)
|
||||
|
||||
# TODO: unix pattern
|
||||
during(turn, ds, pat) do (ta: Literal[transportAddress.Unix]):
|
||||
connectTransport(turn, ds, ta.value)
|
||||
during(turn, ds, transPat) do (ta: Literal[transportAddress.Unix]):
|
||||
try: connectTransport(turn, ds, ta.value)
|
||||
except exceptions.IOError as e:
|
||||
publish(turn, ds, TransportConnection(
|
||||
`addr`: ta.toPreserve,
|
||||
resolved: rejected(embed e),
|
||||
))
|
||||
|
||||
spawn("path-resolver", turn) do (turn: var Turn):
|
||||
let pat = ?Observe(pattern: !ResolvePath) ?? {0: grab()}
|
||||
during(turn, ds, pat) do (route: Literal[Route]):
|
||||
for i, transAddr in route.value.transports:
|
||||
connectRoute(turn, ds, route.value, i)
|
||||
# TODO: tcp pattern
|
||||
during(turn, ds, transPat) do (ta: Literal[transportAddress.Tcp]):
|
||||
try: connectTransport(turn, ds, ta.value)
|
||||
except exceptions.IOError as e:
|
||||
publish(turn, ds, TransportConnection(
|
||||
`addr`: ta.toPreserve,
|
||||
resolved: rejected(embed e),
|
||||
))
|
||||
|
||||
let resolvePat = ?Observe(pattern: !ResolvePath) ?? {0: grab()}
|
||||
during(turn, ds, resolvePat) do (route: Literal[Route]):
|
||||
for i, transAddr in route.value.transports:
|
||||
connectRoute(turn, ds, route.value, i)
|
||||
|
||||
spawnStepResolver(turn, ds, "ref".toSymbol) do (
|
||||
turn: var Turn, step: Value, origin: Cap, next: Cap):
|
||||
publish(turn, ds, ResolvedPathStep(
|
||||
origin: origin, pathStep: step, resolved: next.accepted))
|
||||
|
||||
type BootProc* = proc (turn: var Turn; ds: Cap) {.gcsafe.}
|
||||
|
||||
proc envRoute*: Route =
|
||||
var text = getEnv("SYNDICATE_ROUTE")
|
||||
if text == "":
|
||||
var tx = (getEnv("XDG_RUNTIME_DIR", "/run/user/1000") / "dataspace").toPreserves
|
||||
result.transports = @[initRecord("unix", tx)]
|
||||
result.pathSteps = @[capabilities.mint().toPreserves]
|
||||
else:
|
||||
var pr = parsePreserves(text)
|
||||
if not result.fromPreserves(pr):
|
||||
raise newException(ValueError, "failed to parse $SYNDICATE_ROUTE " & $pr)
|
||||
type BootProc* = proc (turn: var Turn; ds: Cap) {.closure.}
|
||||
|
||||
proc resolve*(turn: var Turn; ds: Cap; route: Route; bootProc: BootProc) =
|
||||
## Resolve `route` within `ds` and call `bootProc` with resolved capabilities.
|
||||
during(turn, ds, ResolvePath ?: {0: ?route, 3: ?:ResolvedAccepted}) do (dst: Cap):
|
||||
bootProc(turn, dst)
|
||||
|
||||
when defined(posix):
|
||||
const defaultRoute* = "<route [<stdio>]>"
|
||||
|
||||
proc envRoute*: Route =
|
||||
## Get an route to a Syndicate capability from the calling environment.
|
||||
## On UNIX this is the SYNDICATE_ROUTE environmental variable with a
|
||||
## fallack to a defaultRoute_.
|
||||
## See https://git.syndicate-lang.org/syndicate-lang/syndicate-protocols/raw/branch/main/schemas/gatekeeper.prs.
|
||||
var text = getEnv("SYNDICATE_ROUTE", defaultRoute)
|
||||
if text == "":
|
||||
var tx = (getEnv("XDG_RUNTIME_DIR", "/run/user/1000") / "dataspace").toPreserves
|
||||
result.transports = @[initRecord("unix", tx)]
|
||||
result.pathSteps = @[capabilities.mint().toPreserves]
|
||||
else:
|
||||
var pr = parsePreserves(text)
|
||||
if not result.fromPreserves(pr):
|
||||
raise newException(ValueError, "failed to parse $SYNDICATE_ROUTE " & $pr)
|
||||
|
||||
proc resolveEnvironment*(turn: var Turn; bootProc: BootProc) =
|
||||
## Resolve a capability from the calling environment
|
||||
## and call `bootProc`. See envRoute_.
|
||||
var resolved = false
|
||||
let
|
||||
ds = newDataspace(turn)
|
||||
pat = ResolvePath ?: {0: ?envRoute(), 3: ?:ResolvedAccepted}
|
||||
during(turn, ds, pat) do (dst: Cap):
|
||||
if not resolved:
|
||||
resolved = true
|
||||
bootProc(turn, dst)
|
||||
do:
|
||||
resolved = false
|
||||
spawnRelays(turn, ds)
|
||||
|
||||
# TODO: define a runActor that comes preloaded with relaying
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
|
||||
## https://git.syndicate-lang.org/syndicate-lang/syndicate-rkt/src/commit/90c4c60699069b496491b81ee63b5a45ffd638cb/syndicate/HOWITWORKS.md
|
||||
|
||||
import std/[hashes, options, sets, tables]
|
||||
import std/[assertions, hashes, options, sets, tables]
|
||||
import preserves
|
||||
import ./actors, ./bags, ./patterns
|
||||
import ./protocols/dataspacePatterns
|
||||
|
@ -65,9 +65,9 @@ func isEmpty(cont: Continuation): bool =
|
|||
cont.cache.len == 0 and cont.leafMap.len == 0
|
||||
|
||||
type
|
||||
ContinuationProc = proc (c: Continuation; v: Value) {.gcsafe.}
|
||||
LeafProc = proc (l: Leaf; v: Value) {.gcsafe.}
|
||||
ObserverProc = proc (turn: var Turn; group: ObserverGroup; vs: seq[Value]) {.gcsafe.}
|
||||
ContinuationProc = proc (c: Continuation; v: Value) {.closure.}
|
||||
LeafProc = proc (l: Leaf; v: Value) {.closure.}
|
||||
ObserverProc = proc (turn: var Turn; group: ObserverGroup; vs: seq[Value]) {.closure.}
|
||||
|
||||
proc getLeaves(cont: Continuation; constPaths: Paths): LeafMap =
|
||||
result = cont.leafMap.getOrDefault(constPaths)
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
# Package
|
||||
|
||||
version = "20240208"
|
||||
version = "20240402"
|
||||
author = "Emery Hemingway"
|
||||
description = "Syndicated actors for conversational concurrency"
|
||||
license = "Unlicense"
|
||||
|
@ -9,4 +9,4 @@ srcDir = "src"
|
|||
|
||||
# Dependencies
|
||||
|
||||
requires "https://github.com/ehmry/hashlib.git#f9455d4be988e14e3dc7933eb7cc7d7c4820b7ac", "nim >= 2.0.0", "https://git.syndicate-lang.org/ehmry/preserves-nim.git >= 20240208"
|
||||
requires "https://github.com/ehmry/hashlib.git >= 20231130", "nim >= 2.0.0", "https://git.syndicate-lang.org/ehmry/preserves-nim.git >= 20240312", "https://github.com/ehmry/nim-sys.git#4ef3b624db86e331ba334e705c1aa235d55b05e1", "https://git.sr.ht/~ehmry/nim_taps >= 20240402"
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
include_rules
|
||||
: foreach *.prs |> !preserves_schema_nim |> | {schema}
|
||||
: foreach t*.nim | ../../preserves-nim/<tests> {schema} $(SYNDICATE_PROTOCOL) |> !nim_run |> | ../<test>
|
||||
: foreach solo5*.nim | ../../taps/<sources> ../../preserves-nim/<tests> {schema} $(SYNDICATE_PROTOCOL) |> !nim_solo5_spt |> | ../<test>
|
||||
|
|
|
@ -1,7 +1,8 @@
|
|||
# SPDX-FileCopyrightText: ☭ Emery Hemingway
|
||||
# SPDX-License-Identifier: Unlicense
|
||||
|
||||
import std/[asyncdispatch, asyncfile, parseopt]
|
||||
import std/[oserrors, parseopt, posix, strutils]
|
||||
import pkg/sys/[files, handles, ioqueue]
|
||||
import preserves, syndicate, syndicate/relays
|
||||
|
||||
type
|
||||
|
@ -10,33 +11,51 @@ type
|
|||
Says {.preservesRecord: "Says".} = object
|
||||
who, what: string
|
||||
|
||||
proc readStdin(facet: Facet; ds: Cap; username: string) =
|
||||
let file = openAsync("/dev/stdin")
|
||||
onStop(facet) do (turn: var Turn): close(file)
|
||||
close(stdin)
|
||||
proc readLine() {.gcsafe.} =
|
||||
let future = readLine(file)
|
||||
addCallback(future, facet) do (turn: var Turn):
|
||||
var msg = read(future)
|
||||
if msg == "": quit()
|
||||
message(turn, ds, Says(who: username, what: msg))
|
||||
readLine()
|
||||
readLine()
|
||||
proc syncAndStop(facet: Facet; cap: Cap) =
|
||||
## Stop the actor responsible for `facet` after
|
||||
## synchronizing with `cap`.
|
||||
run(facet) do (turn: var Turn):
|
||||
sync(turn, cap, stopActor)
|
||||
|
||||
proc readStdin(facet: Facet; ds: Cap; username: string) {.asyncio.} =
|
||||
let
|
||||
fd = stdin.getOsFileHandle()
|
||||
flags = fcntl(fd.cint, F_GETFL, 0)
|
||||
if flags < 0:
|
||||
raiseOSError(osLastError())
|
||||
if fcntl(fd.cint, F_SETFL, flags or O_NONBLOCK) < 0:
|
||||
raiseOSError(osLastError())
|
||||
let
|
||||
file = newAsyncFile(FD fd)
|
||||
buf = new string
|
||||
buf[].setLen(0x1000)
|
||||
while true:
|
||||
let n = read(file, buf)
|
||||
if n < 1:
|
||||
stderr.writeLine "test_chat calls stopsActor ", facet.actor
|
||||
syncAndStop(facet, ds)
|
||||
return
|
||||
else:
|
||||
var msg = buf[][0..<n].strip
|
||||
proc send(turn: var Turn) =
|
||||
message(turn, ds, Says(who: username, what: msg))
|
||||
run(facet, send)
|
||||
|
||||
proc chat(turn: var Turn; ds: Cap; username: string) =
|
||||
during(turn, ds, ?:Present) do (who: string):
|
||||
echo who, " joined"
|
||||
do:
|
||||
echo who, " left"
|
||||
during(turn, ds, ?:Present) do (who: string):
|
||||
echo who, " joined"
|
||||
do:
|
||||
echo who, " left"
|
||||
|
||||
onMessage(turn, ds, ?:Says) do (who: string, what: string):
|
||||
echo who, ": ", what
|
||||
onMessage(turn, ds, ?:Says) do (who: string, what: string):
|
||||
echo who, ": ", what
|
||||
|
||||
discard publish(turn, ds, Present(username: username))
|
||||
readStdin(turn.facet, ds, username)
|
||||
discard publish(turn, ds, Present(username: username))
|
||||
|
||||
discard trampoline:
|
||||
whelp readStdin(turn.facet, ds, username)
|
||||
|
||||
proc main =
|
||||
let route = envRoute()
|
||||
var username = ""
|
||||
|
||||
for kind, key, val in getopt():
|
||||
|
@ -48,9 +67,8 @@ proc main =
|
|||
if username == "":
|
||||
stderr.writeLine "--user: unspecified"
|
||||
else:
|
||||
runActor("chat") do (turn: var Turn; root: Cap):
|
||||
spawnRelays(turn, root)
|
||||
resolve(turn, root, route) do (turn: var Turn; ds: Cap):
|
||||
runActor("chat") do (turn: var Turn):
|
||||
resolveEnvironment(turn) do (turn: var Turn; ds: Cap):
|
||||
chat(turn, ds, username)
|
||||
|
||||
main()
|
||||
|
|
|
@ -4,14 +4,19 @@
|
|||
import std/times
|
||||
import syndicate, syndicate/actors/timers
|
||||
|
||||
proc now: float64 = getTime().toUnixFloat()
|
||||
runActor("timer-test") do (turn: var Turn):
|
||||
let timers = newDataspace(turn)
|
||||
spawnTimerActor(turn, timers)
|
||||
|
||||
runActor("test_timers") do (ds: Cap; turn: var Turn):
|
||||
onPublish(turn, ds, grab(LaterThan(seconds: now()+1.0))) do:
|
||||
stderr.writeLine "slept one second once"
|
||||
onPublish(turn, ds, grab(LaterThan(seconds: now()+1.0))) do:
|
||||
stderr.writeLine "slept one second twice"
|
||||
onPublish(turn, ds, grab(LaterThan(seconds: now()+1.0))) do:
|
||||
stderr.writeLine "slept one second thrice"
|
||||
quit()
|
||||
spawnTimers(turn, ds)
|
||||
onPublish(turn, timers, ?LaterThan(seconds: 1356100000)):
|
||||
echo "now in 13th bʼakʼtun"
|
||||
|
||||
after(turn, timers, initDuration(seconds = 3)) do (turn: var Turn):
|
||||
echo "third timer expired"
|
||||
stopActor(turn)
|
||||
|
||||
after(turn, timers, initDuration(seconds = 1)) do (turn: var Turn):
|
||||
echo "first timer expired"
|
||||
|
||||
after(turn, timers, initDuration(seconds = 2)) do (turn: var Turn):
|
||||
echo "second timer expired"
|
||||
|
|
Loading…
Reference in New Issue