diff --git a/Tuprules.tup b/Tuprules.tup index 5eb62bf..5828100 100644 --- a/Tuprules.tup +++ b/Tuprules.tup @@ -1,2 +1,5 @@ include depends.tup NIM_GROUPS += $(TUP_CWD)/ +NIM_FLAGS += --path:$(TUP_CWD)/../cps +NIM_FLAGS += --path:$(TUP_CWD)/../solo5_dispatcher/pkg +NIM_FLAGS += --path:$(TUP_CWD)/../taps/src diff --git a/lock.json b/lock.json index 585f031..27592ff 100644 --- a/lock.json +++ b/lock.json @@ -16,11 +16,33 @@ "packages": [ "cps" ], - "path": "/nix/store/452hfhasrn3gl6vijfmzs69djl099j0j-source", - "rev": "b7c179f172e3a256a482a9daee3c0815ea423206", - "sha256": "1sn9s7iv83sw1jl5jgi2h7b0xpgsn13f9icp5124jvbp0qkxskx2", + "path": "/nix/store/8gbhwni0akqskdb3qhn5nfgv6gkdz0vz-source", + "rev": "c90530ac57f98a842b7be969115c6ef08bdcc564", + "sha256": "0h8ghs2fqg68j3jdcg7grnxssmllmgg99kym2w0a3vlwca1zvr62", "srcDir": "", - "url": "https://github.com/nim-works/cps/archive/b7c179f172e3a256a482a9daee3c0815ea423206.tar.gz" + "url": "https://github.com/ehmry/cps/archive/c90530ac57f98a842b7be969115c6ef08bdcc564.tar.gz" + }, + { + "method": "fetchzip", + "packages": [ + "getdns" + ], + "path": "/nix/store/x9xmn7w4k6jg8nv5bnx148ibhnsfh362-source", + "rev": "c73cbe288d9f9480586b8fa87f6d794ffb6a6ce6", + "sha256": "1sbgx2x51szr22i72n7c8jglnfmr8m7y7ga0v85d58fwadiv7g6b", + "srcDir": "src", + "url": "https://git.sr.ht/~ehmry/getdns-nim/archive/c73cbe288d9f9480586b8fa87f6d794ffb6a6ce6.tar.gz" + }, + { + "method": "fetchzip", + "packages": [ + "getdns" + ], + "path": "/nix/store/x9xmn7w4k6jg8nv5bnx148ibhnsfh362-source", + "rev": "c73cbe288d9f9480586b8fa87f6d794ffb6a6ce6", + "sha256": "1sbgx2x51szr22i72n7c8jglnfmr8m7y7ga0v85d58fwadiv7g6b", + "srcDir": "src", + "url": "https://git.sr.ht/~ehmry/getdns-nim/archive/c73cbe288d9f9480586b8fa87f6d794ffb6a6ce6.tar.gz" }, { "method": "fetchzip", @@ -38,22 +60,22 @@ "packages": [ "nimcrypto" ], - "path": "/nix/store/jwz8pqbv6rsm8w4fjzdb37r0wzjn5hv0-source", - "rev": "d58da671799c69c0b3208b96c154e13c8b1a9e90", - "sha256": "12dm0gsy10ppga7zf7hpf4adaqjrd9b740n2w926xyazq1njf6k9", + "path": "/nix/store/fkrcpp8lzj2yi21na79xm63xk0ggnqsp-source", + "rev": "f147d30c69bc1c9bcf0e37f7699bcf0fbaab97b5", + "sha256": "1h3dzdbc9kacwpi10mj73yjglvn7kbizj1x8qc9099ax091cj5xn", "srcDir": "", - "url": "https://github.com/cheatfate/nimcrypto/archive/d58da671799c69c0b3208b96c154e13c8b1a9e90.tar.gz" + "url": "https://github.com/cheatfate/nimcrypto/archive/f147d30c69bc1c9bcf0e37f7699bcf0fbaab97b5.tar.gz" }, { "method": "fetchzip", "packages": [ "npeg" ], - "path": "/nix/store/ffkxmjmigfs7zhhiiqm0iw2c34smyciy-source", - "rev": "26d62fdc40feb84c6533956dc11d5ee9ea9b6c09", - "sha256": "0xpzifjkfp49w76qmaylan8q181bs45anmp46l4bwr3lkrr7bpwh", + "path": "/nix/store/xpn694ibgipj8xak3j4bky6b3k0vp7hh-source", + "rev": "ec0cc6e64ea4c62d2aa382b176a4838474238f8d", + "sha256": "1fi9ls3xl20bmv1ikillxywl96i9al6zmmxrbffx448gbrxs86kg", "srcDir": "src", - "url": "https://github.com/zevv/npeg/archive/26d62fdc40feb84c6533956dc11d5ee9ea9b6c09.tar.gz" + "url": "https://github.com/zevv/npeg/archive/ec0cc6e64ea4c62d2aa382b176a4838474238f8d.tar.gz" }, { "method": "fetchzip", @@ -87,6 +109,67 @@ "sha256": "1q4qgw4an4mmmcbx48l6xk1jig1vc8p9cq9dbx39kpnb0890j32q", "srcDir": "src", "url": "https://github.com/ehmry/nim-sys/archive/4ef3b624db86e331ba334e705c1aa235d55b05e1.tar.gz" + }, + { + "method": "fetchzip", + "packages": [ + "sys" + ], + "path": "/nix/store/vf9ls2wip6d8xhsi3rjh0dqsqg597i6b-source", + "rev": "c117ee60542f084525f254e6ade590675a6a2ed6", + "sha256": "12qzx2lnh84xqfgypy0pka8nflq0y8n1izfwx8mb4zya5nzawmyf", + "srcDir": "src", + "url": "https://github.com/alaviss/nim-sys/archive/c117ee60542f084525f254e6ade590675a6a2ed6.tar.gz" + }, + { + "method": "fetchzip", + "packages": [ + "taps" + ], + "path": "/nix/store/n86g5fw60z1k53bn35zvrwlwmyk3ixdn-source", + "rev": "756cb07b4f874181ad34c370cad6082ee65f646d", + "sha256": "0dp7ml3kj2fi6isvjkkxf02hwj0gshx6qra0ghnk2cbfykbcgfp8", + "srcDir": "src", + "url": "https://git.sr.ht/~ehmry/nim_taps/archive/756cb07b4f874181ad34c370cad6082ee65f646d.tar.gz" + }, + { + "date": "2024-04-02T15:38:57+01:00", + "deepClone": false, + "fetchLFS": false, + "fetchSubmodules": true, + "hash": "sha256-iZb9aAgYr4FGkqfIg49QWiCqeizIi047kFhugHiP8o0=", + "leaveDotGit": false, + "method": "git", + "packages": [ + "solo5_dispatcher" + ], + "path": "/nix/store/sf5dgj2ljvahcm6my7d61ibda51vnrii-solo5_dispatcher", + "rev": "a7a894a96a2221284012800e6fd32923d83d20bd", + "sha256": "13gjixw80vjqj0xlx2y85ixal82sa27q7j57j9383bqq11lgv5l9", + "srcDir": "pkg", + "url": "https://git.sr.ht/~ehmry/solo5_dispatcher" + }, + { + "method": "fetchzip", + "packages": [ + "cps" + ], + "path": "/nix/store/phdf6siqbhj7vx4qq507lzla81si60iz-source", + "rev": "58772ff9ddb38a4b2ec52da142d8532ba2fe7039", + "sha256": "1lph7v27nqwgm3a0ssi8q348gjrkjwgqc50agw38j7xif6wj80cw", + "srcDir": "", + "url": "https://github.com/ehmry/cps/archive/58772ff9ddb38a4b2ec52da142d8532ba2fe7039.tar.gz" + }, + { + "method": "fetchzip", + "packages": [ + "stew" + ], + "path": "/nix/store/mqg8qzsbcc8xqabq2yzvlhvcyqypk72c-source", + "rev": "3c91b8694e15137a81ec7db37c6c58194ec94a6a", + "sha256": "17lfhfxp5nxvld78xa83p258y80ks5jb4n53152cdr57xk86y07w", + "srcDir": "", + "url": "https://github.com/status-im/nim-stew/archive/3c91b8694e15137a81ec7db37c6c58194ec94a6a.tar.gz" } ] } diff --git a/shell.nix b/shell.nix index 3a6cdd4..f3076d4 100644 --- a/shell.nix +++ b/shell.nix @@ -1,5 +1,8 @@ { pkgs ? import { } }: + pkgs.buildNimPackage { name = "dummy"; lockFile = ./lock.json; + buildInputs = builtins.attrValues { inherit (pkgs) getdns solo5; }; + nativeBuildInputs = builtins.attrValues { inherit (pkgs) pkg-config solo5; }; } diff --git a/src/syndicate.nim b/src/syndicate.nim index 63ada31..e296e12 100644 --- a/src/syndicate.nim +++ b/src/syndicate.nim @@ -186,3 +186,15 @@ macro during*(turn: untyped; ds: Cap; pattern: Pattern; publishBody: untyped) = raiseAssert($`pattern`.analyse.capturePaths.len & " values captured but handler has " & $`argCount` & " arguments - " & $`pattern`) `callbackProc` discard observe(`turn`, `ds`, `pattern`, during(`callbackSym`)) + +when defined(solo5): + echo """ + ______ + / \_\ + / ,__/ \ ____ __ + /\__/ \, \ _______ ______ ____/ /_/________ / /____ + \/ \__/ / / ___/ / / / __ \/ __ / / ___/ __ \/ __/ _ \ + \ ' \__/ _\_ \/ /_/ / / / / /_/ / / /__/ /_/ / /_/ __/ + \____/_/ /____/\__, /_/ /_/\____/_/\___/\__/_/\__/\___/ + /____/ +""" diff --git a/src/syndicate/actors.nim b/src/syndicate/actors.nim index 7f88b22..683a4ca 100644 --- a/src/syndicate/actors.nim +++ b/src/syndicate/actors.nim @@ -1,18 +1,24 @@ # SPDX-FileCopyrightText: ☭ Emery Hemingway # SPDX-License-Identifier: Unlicense -import std/[deques, hashes, monotimes, options, sets, tables, times] -import pkg/cps -import pkg/sys/ioqueue +import std/[assertions, deques, hashes, monotimes, options, sets, tables, times] +import cps import preserves -import ../syndicate/protocols/[protocol, sturdy, trace] +import ../syndicate/protocols/[protocol, sturdy] +import ../syndicate/protocols/trace + +when defined(solo5): + import solo5_dispatcher +else: + import pkg/sys/ioqueue const tracing = defined(traceSyndicate) when tracing: import std/streams - from std/os import getEnv + when not defined(solo5): + from std/os import getEnv export Handle @@ -81,30 +87,41 @@ type var turnQueue {.threadvar.}: Deque[Turn] when tracing: - proc openTraceStream: FileStream = - let path = getEnv("SYNDICATE_TRACE_FILE") - case path - of "": stderr.writeLine "$SYNDICATE_TRACE_FILE unset" - of "-": result = newFileStream(stderr) - else: result = openFileStream(path, fmWrite) + when defined(solo5): + proc traceActivation(actor: Actor; act: ActorActivation) = + discard #[ + echo TraceEntry( + timestamp: getTime().toUnixFloat(), + actor: initRecord("named", actor.name.toPreserves), + item: act, + ).toPreserves + ]# + else: + proc openTraceStream: FileStream = + let path = getEnv("SYNDICATE_TRACE_FILE") + case path + of "": stderr.writeLine "$SYNDICATE_TRACE_FILE unset" + of "-": result = newFileStream(stderr) + else: result = openFileStream(path, fmWrite) + + let traceStream = openTraceStream() + + proc traceActivation(actor: Actor; act: ActorActivation) = + if not traceStream.isNil: + var entry = TraceEntry( + timestamp: getTime().toUnixFloat(), + actor: initRecord("named", actor.name.toPreserves), + item: act) + traceStream.write(entry.toPreserves) + traceStream.flush() - let traceStream = openTraceStream() var turnIdAllocator: uint proc nextTurnId(): TurnId = inc(turnIdAllocator) turnIdAllocator.toPreserves - proc trace(actor: Actor; act: ActorActivation) = - if not traceStream.isNil: - var entry = TraceEntry( - timestamp: getTime().toUnixFloat(), - actor: initRecord("named", actor.name.toPreserves), - item: act) - traceStream.write(entry.toPreserves) - traceStream.flush() - - proc path(facet: Facet): seq[trace.FacetId] = + proc path(facet: Facet): seq[FacetId] = var f = facet while not f.isNil: result.add f.id.toPreserves @@ -120,7 +137,7 @@ when tracing: result = ActionDescription(orKind: ActionDescriptionKind.dequeue) result.dequeue.event = move act.enqueue.event - proc toTraceTarget(cap: Cap): trace.Target = + proc toTraceTarget(cap: Cap): Target = assert not cap.target.isNil assert not cap.target.facet.isNil result.actor = cap.target.facet.actor.id @@ -165,7 +182,6 @@ when tracing: proc `$`*(r: Cap): string = "" - proc `$`*(t: Turn): string = "" @@ -361,7 +377,7 @@ proc publish(turn: var Turn; cap: Cap; v: Value; h: Handle) = act.enqueue.event.target.actor = turn.facet.actor.id.toPreserves act.enqueue.event.target.facet = turn.facet.id.toPreserves act.enqueue.event.target.oid = cap.target.oid.toPreserves - act.enqueue.event.detail = trace.TurnEvent(orKind: TurnEventKind.assert) + act.enqueue.event.detail = trace.TurnEvent(orKind: trace.TurnEventKind.assert) act.enqueue.event.detail.assert.assertion.value.value = mapEmbeds(v) do (cap: Value) -> Value: discard act.enqueue.event.detail.assert.handle = h @@ -532,7 +548,7 @@ proc newActor(name: string; parent: Facet): Actor = var act = ActorActivation(orKind: ActorActivationKind.start) act.start.actorName = Name(orKind: NameKind.named) act.start.actorName.named.name = name.toPreserves - trace(result, act) + traceActivation(result, act) proc run(actor: Actor; bootProc: TurnAction; initialAssertions: OutboundTable) = queueTurn(newFacet(actor, actor.root, initialAssertions), stopIfInertAfter(bootProc)) @@ -603,7 +619,7 @@ proc terminateActor(turn; reason: ref Exception) = if not reason.isNil: act.stop.status = ExitStatus(orKind: ExitStatusKind.Error) act.stop.status.error.message = reason.msg - trace(actor, act) + traceActivation(actor, act) while actor.exitHooks.len > 0: var hook = actor.exitHooks.pop() try: hook(turn) @@ -704,7 +720,7 @@ proc run(turn: var Turn) = when tracing: var act = ActorActivation(orKind: ActorActivationKind.turn) act.turn = move turn.desc - trace(turn.facet.actor, act) + traceActivation(turn.facet.actor, act) # TODO: catch exceptions here for eff in turn.effects.mvalues: assert not eff.facet.isNil @@ -717,30 +733,35 @@ proc runPendingTurns* = # TODO: check if actor is still valid try: run(turn) except CatchableError as err: - stderr.writeLine("actor ", turn.actor.name, " threw an error during a turn") terminateActor(turn, err) + raise err proc run* = ## Run actors to completion - var ready: seq[Continuation] - while true: - runPendingTurns() - ioqueue.poll(ready) - if ready.len == 0: break - while ready.len > 0: - try: + when defined(solo5): + while turnQueue.len > 0 or solo5_dispatcher.runOnce(): + runPendingTurns() + else: + var ready: seq[Continuation] + while true: + runPendingTurns() + ioqueue.poll(ready) + if ready.len == 0: break + while ready.len > 0: discard trampoline: ready.pop() - except CatchableError as err: - stderr.writeLine "ioqueue continuation threw an error" - raise err proc runActor*(name: string; bootProc: TurnAction) = ## Boot an actor `Actor` and churn ioqueue. let actor = bootActor(name, bootProc) if not actor.exitReason.isNil: raise actor.exitReason - actors.run() + when defined(solo5): + runPendingTurns() + while (actor.isAlive and solo5_dispatcher.runOnce()) or turnQueue.len > 0: + runPendingTurns() + else: + actors.run() if not actor.exitReason.isNil: raise actor.exitReason diff --git a/src/syndicate/actors/timers.nim b/src/syndicate/actors/timers.nim index 9fbdfa3..c668127 100644 --- a/src/syndicate/actors/timers.nim +++ b/src/syndicate/actors/timers.nim @@ -1,18 +1,48 @@ # SPDX-FileCopyrightText: ☭ Emery Hemingway # SPDX-License-Identifier: Unlicense -import std/[sets, times] -import pkg/sys/[handles, ioqueue] +import std/times import preserves import ../../syndicate, ../bags, ../protocols/[timer, dataspace] +when defined(solo5): + import solo5_dispatcher +else: + import pkg/sys/[handles, ioqueue] + export timer type Observe = dataspace.Observe -when defined(linux): - import std/[oserrors, posix] +when defined(solo5): + import solo5, solo5_dispatcher + + proc wallFloat: float = + solo5_clock_wall().float / 1_000_000_000.0 + + type + TimerDriver = ref object + facet: Facet + ## Owning facet of driver. + target: Cap + ## Destination for LaterThan assertions. + deadlines: Bag[float] + ## Deadlines that other actors are observing. + + proc spawnTimerDriver(facet: Facet; cap: Cap): TimerDriver = + TimerDriver(facet: facet, target: cap) + + proc await(driver: TimerDriver; deadline: float) {.solo5dispatch.} = + yieldUntil(deadline) + if deadline in driver.deadlines: + # check if the deadline is still observed + proc turnWork(turn: var Turn) = + discard publish(turn, driver.target, LaterThan(seconds: deadline)) + run(driver.facet, turnWork) + +else: + import std/[oserrors, posix, sets] type Time = posix.Time {.pragma: timerfd, importc, header: "".} @@ -42,9 +72,11 @@ when defined(linux): result.tv_sec = Time(f) result.tv_nsec = clong(uint64(f * 1_000_000_000) mod 1_000_000_000) - proc clock_realtime: Timespec = - if clock_gettime(CLOCK_REALTIME, result) < 0: + proc wallFloat: float = + var ts: Timespec + if clock_gettime(CLOCK_REALTIME, ts) < 0: raiseOSError(osLastError(), "clock_gettime") + ts.toFloat type TimerDriver = ref object @@ -83,7 +115,7 @@ when defined(linux): if timerfd_settime(fd, TFD_TIMER_ABSTIME, its, old) < 0: raiseOSError(osLastError(), "failed to set timeout") driver.timers.incl(fd) - while clock_realtime() < its.it_value: + while wallFloat() < deadline: # Check if the timer is expired which # could happen before waiting. wait(FD fd, Read) @@ -95,21 +127,21 @@ when defined(linux): discard close(fd) driver.timers.excl(fd) - proc spawnTimerActor*(turn: var Turn; ds: Cap): Actor {.discardable.} = - ## Spawn a timer actor that responds to - ## dataspace observations of timeouts on `ds`. - linkActor("timers", turn) do (turn: var Turn): - let driver = spawnTimerDriver(turn.facet, ds) - let pat = inject(grab Observe(pattern: dropType LaterThan), {0: grabLit()}) - during(turn, ds, pat) do (deadline: float): - if change(driver.deadlines, deadline, +1) == cdAbsentToPresent: - discard trampoline(whelp await(driver, deadline)) - do: - discard change(driver.deadlines, deadline, -1, clamp = true) - # TODO: retract assertions that are unobserved. +proc spawnTimerActor*(turn: var Turn; ds: Cap): Actor {.discardable.} = + ## Spawn a timer actor that responds to + ## dataspace observations of timeouts on `ds`. + linkActor(turn, "timers") do (turn: var Turn): + let driver = spawnTimerDriver(turn.facet, ds) + let pat = inject(grab Observe(pattern: dropType LaterThan), {0: grabLit()}) + during(turn, ds, pat) do (deadline: float): + if change(driver.deadlines, deadline, +1) == cdAbsentToPresent: + discard trampoline(whelp await(driver, deadline)) + do: + discard change(driver.deadlines, deadline, -1, clamp = true) + # TODO: retract assertions that are unobserved. proc after*(turn: var Turn; ds: Cap; dur: Duration; act: TurnAction) = ## Execute `act` after some duration of time. - var later = clock_realtime().toFloat() + dur.inMilliseconds.float / 1_000.0 + var later = wallFloat() + dur.inMilliseconds.float / 1_000.0 onPublish(turn, ds, grab LaterThan(seconds: later)): act(turn) diff --git a/src/syndicate/bags.nim b/src/syndicate/bags.nim index 0b9ae42..492a3d5 100644 --- a/src/syndicate/bags.nim +++ b/src/syndicate/bags.nim @@ -4,7 +4,7 @@ ## An unordered association of items to counts. ## An item count may be negative, unlike CountTable. -import tables +import std/[assertions, tables] type ChangeDescription* = enum diff --git a/src/syndicate/patterns.nim b/src/syndicate/patterns.nim index 3d77926..7fa0f92 100644 --- a/src/syndicate/patterns.nim +++ b/src/syndicate/patterns.nim @@ -1,7 +1,7 @@ # SPDX-FileCopyrightText: ☭ Emery Hemingway # SPDX-License-Identifier: Unlicense -import std/[algorithm, options, sequtils, tables, typetraits] +import std/[algorithm, assertions, options, sequtils, tables, typetraits] import preserves import ./protocols/dataspacePatterns diff --git a/src/syndicate/relays.nim b/src/syndicate/relays.nim index 1cd124b..f631366 100644 --- a/src/syndicate/relays.nim +++ b/src/syndicate/relays.nim @@ -2,10 +2,12 @@ # SPDX-License-Identifier: Unlicense import std/[options, tables] -from std/os import getEnv, `/` -import pkg/sys/ioqueue import preserves -import ../syndicate, /capabilities, ./durings, ./membranes, ./protocols/[gatekeeper, protocol, sturdy, transportAddress] +import ../syndicate, ./durings, ./membranes, ./protocols/[gatekeeper, protocol, sturdy, transportAddress] + +when defined(posix): + import ./capabilities + from std/os import getEnv, `/` when defined(traceSyndicate): when defined(posix): @@ -17,7 +19,7 @@ else: export `$` -export Stdio, Tcp, WebSocket, Unix +export Route, Stdio, Tcp, WebSocket, Unix type Assertion = Value @@ -249,6 +251,11 @@ proc recv(relay: Relay; buf: openarray[byte]; slice: Slice[int]) = var pr = decode(relay.wireBuf) if pr.isSome: dispatch(relay, pr.get) +proc recv(relay: Relay; buf: openarray[byte]) = + feed(relay.wireBuf, buf) + var pr = decode(relay.wireBuf) + if pr.isSome: dispatch(relay, pr.get) + type RelayOptions* = object of RootObj packetWriter*: PacketWriter @@ -292,10 +299,14 @@ proc accepted(cap: Cap): Resolved = result = Resolved(orKind: ResolvedKind.accepted) result.accepted.responderSession = cap +type ShutdownEntity = ref object of Entity +method retract(e: ShutdownEntity; turn: var Turn; h: Handle) = + stopActor(e.facet) + when defined(posix): import std/[oserrors, posix] - import pkg/sys/[files, handles, sockets] + import pkg/sys/[files, handles, ioqueue, sockets] export transportAddress.Unix type StdioEntity = ref object of Entity @@ -375,10 +386,6 @@ when defined(posix): if ass.value.preservesTo(ForceDisconnect).isSome: entity.alive = false - type ShutdownEntity = ref object of Entity - method retract(e: ShutdownEntity; turn: var Turn; h: Handle) = - stopActor(e.facet) - template bootSocketEntity() {.dirty.} = proc setup(turn: var Turn) {.closure.} = proc kill(turn: var Turn) = @@ -434,6 +441,73 @@ when defined(posix): let entity = UnixEntity() spawnSocketRelay() +elif defined(solo5): + + import solo5_dispatcher + import taps + + type + TcpEntity = ref object of Entity + relay: Relay + conn: Connection + decoder: BufferedDecoder + + method message(entity: TcpEntity; turn: var Turn; ass: AssertionRef) = + if ass.value.preservesTo(ForceDisconnect).isSome: + entity.conn.abort() + + proc connectTransport(turn: var Turn; ds: Cap; ta: transportAddress.Tcp) = + let entity = TcpEntity(facet: turn.facet) + + proc writeConn(turn: var Turn; buf: seq[byte]) = + assert not entity.conn.isNil + entity.conn.batch: + entity.conn.send(buf) + var ops = RelayActorOptions( + packetWriter: writeConn, + initialOid: 0.Oid.some, + ) + spawnRelay("socket", turn, ops) do (turn: var Turn; relay: Relay): + entity.facet = turn.facet + entity.relay = relay + + var ep = newRemoteEndpoint() + if ta.host.isIpAddress: + ep.with ta.host.parseIpAddress + else: + ep.withHostname ta.host + ep.with ta.port.Port + + var tp = newTransportProperties() + tp.require "reliability" + tp.ignore "congestion-control" + tp.ignore "preserve-order" + + var preconn = newPreconnection( + remote=[ep], transport=tp.some) + entity.conn = preconn.initiate() + entity.facet.onStop do (turn: var Turn): + entity.conn.close() + entity.conn.onConnectionError do (err: ref Exception): + run(entity.facet) do (turn: var Turn): + terminate(turn, err) + entity.conn.onClosed(): + stop(entity.facet) + entity.conn.onReceivedPartial do (data: seq[byte]; ctx: MessageContext; eom: bool): + entity.relay.recv(data) + if eom: + stop(entity.facet) + else: + entity.conn.receive() + entity.conn.onReady do (): + entity.facet.run do (turn: var Turn): + publish(turn, ds, TransportConnection( + `addr`: ta.toPreserves, + control: newCap(entity, turn), + resolved: entity.relay.peer.accepted, + )) + entity.conn.receive() + proc walk(turn: var Turn; ds, origin: Cap; route: Route; transOff, stepOff: int) = if stepOff < route.pathSteps.len: let @@ -503,9 +577,19 @@ proc spawnRelays*(turn: var Turn; ds: Cap) = # Use a generic pattern and type matching # in the during handler because it is easy. - let stdioPat = ?Observe(pattern: TransportConnection?:{0: ?:Stdio}) - during(turn, ds, stdioPat) do: - connectTransport(turn, ds, Stdio()) + when defined(posix): + let stdioPat = ?Observe(pattern: TransportConnection?:{0: ?:Stdio}) + during(turn, ds, stdioPat) do: + connectTransport(turn, ds, Stdio()) + + # TODO: unix pattern + during(turn, ds, transPat) do (ta: Literal[transportAddress.Unix]): + try: connectTransport(turn, ds, ta.value) + except exceptions.IOError as e: + publish(turn, ds, TransportConnection( + `addr`: ta.toPreserve, + resolved: rejected(embed e), + )) # TODO: tcp pattern during(turn, ds, transPat) do (ta: Literal[transportAddress.Tcp]): @@ -516,15 +600,6 @@ proc spawnRelays*(turn: var Turn; ds: Cap) = resolved: rejected(embed e), )) - # TODO: unix pattern - during(turn, ds, transPat) do (ta: Literal[transportAddress.Unix]): - try: connectTransport(turn, ds, ta.value) - except exceptions.IOError as e: - publish(turn, ds, TransportConnection( - `addr`: ta.toPreserve, - resolved: rejected(embed e), - )) - let resolvePat = ?Observe(pattern: !ResolvePath) ?? {0: grab()} during(turn, ds, resolvePat) do (route: Literal[Route]): for i, transAddr in route.value.transports: @@ -537,41 +612,42 @@ proc spawnRelays*(turn: var Turn; ds: Cap) = type BootProc* = proc (turn: var Turn; ds: Cap) {.closure.} -const defaultRoute* = "]>" - -proc envRoute*: Route = - ## Get an route to a Syndicate capability from the calling environment. - ## On UNIX this is the SYNDICATE_ROUTE environmental variable with a - ## fallack to a defaultRoute_. - ## See https://git.syndicate-lang.org/syndicate-lang/syndicate-protocols/raw/branch/main/schemas/gatekeeper.prs. - var text = getEnv("SYNDICATE_ROUTE", defaultRoute) - if text == "": - var tx = (getEnv("XDG_RUNTIME_DIR", "/run/user/1000") / "dataspace").toPreserves - result.transports = @[initRecord("unix", tx)] - result.pathSteps = @[capabilities.mint().toPreserves] - else: - var pr = parsePreserves(text) - if not result.fromPreserves(pr): - raise newException(ValueError, "failed to parse $SYNDICATE_ROUTE " & $pr) - proc resolve*(turn: var Turn; ds: Cap; route: Route; bootProc: BootProc) = ## Resolve `route` within `ds` and call `bootProc` with resolved capabilities. during(turn, ds, ResolvePath ?: {0: ?route, 3: ?:ResolvedAccepted}) do (dst: Cap): bootProc(turn, dst) -proc resolveEnvironment*(turn: var Turn; bootProc: BootProc) = - ## Resolve a capability from the calling environment - ## and call `bootProc`. See envRoute_. - var resolved = false - let - ds = newDataspace(turn) - pat = ResolvePath ?: {0: ?envRoute(), 3: ?:ResolvedAccepted} - during(turn, ds, pat) do (dst: Cap): - if not resolved: - resolved = true - bootProc(turn, dst) - do: - resolved = false - spawnRelays(turn, ds) +when defined(posix): + const defaultRoute* = "]>" + + proc envRoute*: Route = + ## Get an route to a Syndicate capability from the calling environment. + ## On UNIX this is the SYNDICATE_ROUTE environmental variable with a + ## fallack to a defaultRoute_. + ## See https://git.syndicate-lang.org/syndicate-lang/syndicate-protocols/raw/branch/main/schemas/gatekeeper.prs. + var text = getEnv("SYNDICATE_ROUTE", defaultRoute) + if text == "": + var tx = (getEnv("XDG_RUNTIME_DIR", "/run/user/1000") / "dataspace").toPreserves + result.transports = @[initRecord("unix", tx)] + result.pathSteps = @[capabilities.mint().toPreserves] + else: + var pr = parsePreserves(text) + if not result.fromPreserves(pr): + raise newException(ValueError, "failed to parse $SYNDICATE_ROUTE " & $pr) + + proc resolveEnvironment*(turn: var Turn; bootProc: BootProc) = + ## Resolve a capability from the calling environment + ## and call `bootProc`. See envRoute_. + var resolved = false + let + ds = newDataspace(turn) + pat = ResolvePath ?: {0: ?envRoute(), 3: ?:ResolvedAccepted} + during(turn, ds, pat) do (dst: Cap): + if not resolved: + resolved = true + bootProc(turn, dst) + do: + resolved = false + spawnRelays(turn, ds) # TODO: define a runActor that comes preloaded with relaying diff --git a/src/syndicate/skeletons.nim b/src/syndicate/skeletons.nim index d231112..2b3c583 100644 --- a/src/syndicate/skeletons.nim +++ b/src/syndicate/skeletons.nim @@ -3,7 +3,7 @@ ## https://git.syndicate-lang.org/syndicate-lang/syndicate-rkt/src/commit/90c4c60699069b496491b81ee63b5a45ffd638cb/syndicate/HOWITWORKS.md -import std/[hashes, options, sets, tables] +import std/[assertions, hashes, options, sets, tables] import preserves import ./actors, ./bags, ./patterns import ./protocols/dataspacePatterns diff --git a/syndicate.nimble b/syndicate.nimble index 25a4d15..3e0dc02 100644 --- a/syndicate.nimble +++ b/syndicate.nimble @@ -1,6 +1,6 @@ # Package -version = "20240327" +version = "20240402" author = "Emery Hemingway" description = "Syndicated actors for conversational concurrency" license = "Unlicense" @@ -9,4 +9,4 @@ srcDir = "src" # Dependencies -requires "https://github.com/ehmry/hashlib.git >= 20231130", "nim >= 2.0.0", "https://git.syndicate-lang.org/ehmry/preserves-nim.git >= 20240312", "https://github.com/ehmry/nim-sys.git#4ef3b624db86e331ba334e705c1aa235d55b05e1", "https://github.com/nim-works/cps" +requires "https://github.com/ehmry/hashlib.git >= 20231130", "nim >= 2.0.0", "https://git.syndicate-lang.org/ehmry/preserves-nim.git >= 20240312", "https://github.com/ehmry/nim-sys.git#4ef3b624db86e331ba334e705c1aa235d55b05e1", "https://git.sr.ht/~ehmry/nim_taps >= 20240402" diff --git a/tests/Tupfile b/tests/Tupfile index 944a36b..9ee1533 100644 --- a/tests/Tupfile +++ b/tests/Tupfile @@ -1,3 +1,4 @@ include_rules : foreach *.prs |> !preserves_schema_nim |> | {schema} : foreach t*.nim | ../../preserves-nim/ {schema} $(SYNDICATE_PROTOCOL) |> !nim_run |> | ../ +: foreach solo5*.nim | ../../taps/ ../../preserves-nim/ {schema} $(SYNDICATE_PROTOCOL) |> !nim_solo5_spt |> | ../ diff --git a/tests/test_timers.nim b/tests/test_timers.nim index b39d030..f4830d2 100644 --- a/tests/test_timers.nim +++ b/tests/test_timers.nim @@ -2,10 +2,9 @@ # SPDX-License-Identifier: Unlicense import std/times -import pkg/sys/ioqueue import syndicate, syndicate/actors/timers -let actor = bootActor("timer-test") do (turn: var Turn): +runActor("timer-test") do (turn: var Turn): let timers = newDataspace(turn) spawnTimerActor(turn, timers) @@ -21,6 +20,3 @@ let actor = bootActor("timer-test") do (turn: var Turn): after(turn, timers, initDuration(seconds = 2)) do (turn: var Turn): echo "second timer expired" - -echo "single run of ioqueue" -ioqueue.run()