diff --git a/src/syndicate.nim b/src/syndicate.nim index 6c18d65..5d51f1b 100644 --- a/src/syndicate.nim +++ b/src/syndicate.nim @@ -3,36 +3,17 @@ ## This module implements the `Syndicate DSL `_. -runnableExamples: - from syndicate/protocols/simpleChatProtocol import Present, Says - import std/asyncdispatch - - discard bootDataspace("example") do (ds: Ref; turn: var Turn): - let - me = "user" - presenceHandle = publish(turn, ds, Present(username: me)) - - onMessage(turn, ds, ?Says) do (who: string; what: string): - echo who, ": ", what - retract(turn, presenceHandle) - - during(turn, ds, ?Present) do (username: string): - echo "[", username, " arrived]" - message(turn, ds, Says(who: me, what: "users are losers")) - do: - echo "[", username, "departed]" - import std/[asyncdispatch, macros, tables, typetraits] + import preserves +export fromPreserve, toPreserve + import ./syndicate/[actors, dataspaces, durings, patterns] import ./syndicate/protocols/dataspace when defined(posix): - from ./syndicate/relays import connectStdio, connectUnix, SturdyRef - export connectStdio, connectUnix -else: - from ./syndicate/relays import SturdyRef -export SturdyRef + from ./syndicate/relays import Tcp, Unix, connect, connectStdio + export Tcp, Unix, connect, connectStdio export Actor, Assertion, Facet, Handle, Ref, Symbol, Turn, TurnAction, `$`, addCallback, analyse, asyncCheck, bootDataspace, diff --git a/src/syndicate/actors.nim b/src/syndicate/actors.nim index ef34568..a6b0902 100644 --- a/src/syndicate/actors.nim +++ b/src/syndicate/actors.nim @@ -19,10 +19,10 @@ generateIdType(FieldId) generateIdType(TurnId) type - Attenuation = sturdy.Attenuation[Ref] Oid = sturdy.Oid Assertion* = Preserve[Ref] Caveat = sturdy.Caveat[Ref] + Attenuation = seq[Caveat] Rewrite = sturdy.Rewrite[Ref] Entity* = ref object of RootObj @@ -199,7 +199,7 @@ proc instantiate(t: Template; bindings: Bindings): Assertion = raise newException(ValueError, "Attempt to attenuate non-capability") result = embed(attenuate(v.embed, t.tattenuate.attenuation)) of TemplateKind.TRef: - let n = $t.tref.binding + let n = $t.tref.binding.int try: result = bindings[toPreserve(n, Ref)] except KeyError: raise newException(ValueError, "unbound reference: " & n) @@ -227,12 +227,14 @@ proc rewrite(r: Rewrite; v: Assertion): Assertion = proc examineAlternatives(cav: Caveat; v: Assertion): Assertion = case cav.orKind - of CaveatKind.`Rewrite`: + of CaveatKind.Rewrite: result = rewrite(cav.rewrite, v) - of CaveatKind.`Alts`: + of CaveatKind.Alts: for r in cav.alts.alternatives: result = rewrite(r, v) if not result.isFalse: break + of CaveatKind.Reject: discard + of CaveatKind.unknown: discard proc runRewrites*(a: Attenuation; v: Assertion): Assertion = result = v @@ -466,7 +468,7 @@ proc addCallback*(fut: FutureBase; turn: var Turn; act: TurnAction) = elif fut.finished: enqueue(turn, turn.facet, act) else: - addCallback(fut, turn.facet, act) + addCallback(fut, turn.facet, act) proc stop*(turn: var Turn, facet: Facet) = enqueue(turn, facet.parent.get) do (turn: var Turn): diff --git a/src/syndicate/capabilities.nim b/src/syndicate/capabilities.nim index 4b4fbb5..f0a8227 100644 --- a/src/syndicate/capabilities.nim +++ b/src/syndicate/capabilities.nim @@ -1,6 +1,12 @@ # SPDX-FileCopyrightText: ☭ Emery Hemingway # SPDX-License-Identifier: Unlicense +runnableExamples: + from std/unittest import check + let sturdy = mint() + check $sturdy == """""" + +import std/options from std/sequtils import toSeq import hashlib/misc/blake2 @@ -14,28 +20,34 @@ proc hmac(key, data: openarray[byte]): seq[byte] = count[Hmac[BLAKE2S_256]](key, data).data[0..15].toSeq proc mint*[T](key: openarray[byte]; oid: Preserve[T]): SturdyRef[T] = - SturdyRef[T](oid: oid, sig: hmac(key, encode oid)) - -proc mint*[T](key: openarray[byte]; oid: T; E = void): SturdyRef[E] = - var oidPr = toPreserve(oid, E) - SturdyRef[E](oid: oidPr, sig: hmac(key, encode oidPr)) + SturdyRef[T](parameters: { + "oid": oid, + "sig": hmac(key, encode(oid)).toPreserve(T), + }.toDictionary, + ) proc mint*(): SturdyRef[Ref] = var key: array[16, byte] - cast[SturdyRef[Ref]](mint(key, "syndicate", Ref)) + mint(key, toPreserve("syndicate", Ref)) -proc attenuate*[T](r: SturdyRef[T]; caveats: Attenuation): SturdyRef[T] = +proc attenuate*[T](r: SturdyRef[T]; caveats: seq[Caveat]): SturdyRef[T] = result = SturdyRef[T]( oid: r.oid, caveatChain: r.caveatChain, sig: hmac(r.sig, encode caveats)) result.caveatChain.add caveats -proc validate*[T](key: openarray[byte]; r: SturdyRef[T]): bool = - var sig = hmac(key, encode r.oid) - for a in r.caveatChain: - sig = hmac(sig, encode a) - r.sig == sig +proc validate*[T](key: openarray[byte]; sturdy: SturdyRef[T]): bool = + let oid = step(sturdy.parameters, Symbol"oid") + if oid.isSome: + let ctrl = step(sturdy.parameters, Symbol"sig") + if ctrl.isSome: + var sig = hmac(key, oid.get.encode) + let caveats = step(sturdy.parameters, Symbol"caveats") + if caveats.isSome and caveats.get.isSequence: + for cav in caveats.get.sequence: + sig = hmac(sig, encode cav) + result = (sig == ctrl.get.bytes) when isMainModule: from os import commandLineParams diff --git a/src/syndicate/membranes.nim b/src/syndicate/membranes.nim index 23d8439..bb0675d 100644 --- a/src/syndicate/membranes.nim +++ b/src/syndicate/membranes.nim @@ -11,6 +11,7 @@ proc hash(r: Ref): Hash = !$(r.relay.hash !& r.target.unsafeAddr.hash) type Membrane* = object ## Bidirectional mapping between `Oid` and `Ref` values. + ## https://synit.org/book/protocol.html#membranes byOid: Table[Oid, WireSymbol] byRef: Table[Ref, WireSymbol] diff --git a/src/syndicate/patterns.nim b/src/syndicate/patterns.nim index ef1c259..4ca1422 100644 --- a/src/syndicate/patterns.nim +++ b/src/syndicate/patterns.nim @@ -93,13 +93,11 @@ proc grab*[T](pr: Preserve[T]): Pattern = proc grab*[T](val: T): Pattern = ## Construct a `Pattern` from value of type `T`. runnableExamples: - import syndicate/protocols/simpleChatProtocol from std/unittest import check check: $grab(true) == "" $grab(3.14) == "" $grab([0, 1, 2, 3]) == " ]>" - $grab(Present(username: "Carol")) == """]>""" grab (toPreserve(val, Ref)) proc patternOfType(typ: static typedesc; `bind`: static bool): Pattern = @@ -203,7 +201,6 @@ proc grab*(typ: static typedesc; bindings: sink openArray[(int, Pattern)]): Patt proc grabLit*(): Pattern = runnableExamples: - import syndicate/protocols/simpleChatProtocol from std/unittest import check check: $grabLit() == """>]>""" @@ -215,11 +212,6 @@ proc grabDict*(): Pattern = proc inject*(pat: Pattern; bindings: openArray[(int, Pattern)]): Pattern = ## Construct a `Pattern` from `pat` with injected overrides from `bindings`. ## Injects are made at offsets indexed by the discard (`<_>`) patterns in `pat`. - runnableExamples: - import syndicate/protocols/simpleChatProtocol - from std/unittest import check - check: - $inject(dropType(Says), {1: grabLit()}) == """ >]>]>""" proc inject(pat: Pattern; bindings: openArray[(int, Pattern)]; offset: var int): Pattern = case pat.orKind of PatternKind.DDiscard: @@ -312,17 +304,6 @@ func projectPaths*(v: Value; paths: seq[Path]): seq[Value] = if vv.isSome: result[i] = get(vv) func matches*(pat: Pattern; pr: Value): bool = - runnableExamples: - import preserves - from syndicate/actors import Ref - from std/unittest import check, checkpoint - - import syndicate/protocols/simpleChatProtocol - - let pat = grabType(Says) - let val = parsePreserves("""""", Ref) - check matches(pat, val) - let analysis = analyse(pat) assert analysis.constPaths.len == analysis.constValues.len for i, path in analysis.constPaths: @@ -334,24 +315,6 @@ func matches*(pat: Pattern; pr: Value): bool = true func capture*(pat: Pattern; pr: Value): seq[Value] = - runnableExamples: - import preserves - from syndicate/actors import Ref - from std/unittest import check, checkpoint - from syndicate/protocols/simpleChatProtocol import Says - from syndicate/protocols/dataspace import Observe - - type Observe = dataspace.Observe[Ref] - - let - pat = grab(Says, {0: grab("Mike"), 1: grab()}) - obs = Observe(pattern: pat) - obsPat = inject(grab(Observe(pattern: grabType(Says))), {0: grabLit()}) - obsVal = toPreserve(obs, Ref) - checkpoint "observer pattern: " & $obsPat - checkpoint "value: " & $obsVal - check capture(obsPat, obsVal) == @[toPreserve("Mike", Ref)] - let analysis = analyse(pat) assert analysis.constPaths.len == analysis.constValues.len for i, path in analysis.constPaths: diff --git a/src/syndicate/peers.nim b/src/syndicate/peers.nim deleted file mode 100644 index b0fc80f..0000000 --- a/src/syndicate/peers.nim +++ /dev/null @@ -1,91 +0,0 @@ -# SPDX-FileCopyrightText: ☭ 2022 Emery Hemingway -# SPDX-License-Identifier: Unlicense - -## Module for peering with remote dataspaces over network. - -import std/[asyncdispatch, net, options, tables] -import preserves -import ./actors, ./durings, ./relays, ./protocols/protocol - -import taps - -export `$` - -type - Turn = actors.Turn - Assertion = Preserve[Ref] - Value = Preserve[void] - -proc connectTcp(remote: RemoteSpecifier): Connection = - var transport = newTransportProperties() - transport.require("reliability") - transport.require("preserve-order") - var preConn = newPreConnection( - transport = some transport, - remote = @[remote]) - preconn.initiate() - -proc connectNet*(turn: var Turn; remote: RemoteSpecifier; cap: SturdyRef; bootProc: ConnectProc) = - let - facet = turn.facet - reenable = facet.preventInertCheck() - connectionClosedRef = newRef(turn, ShutdownEntity()) - conn = connectTcp(remote) - conn.onReady do (): - discard bootActor("net") do (turn: var Turn): - var shutdownRef: Ref - proc tapsWriter(pkt: sink Packet): Future[void] = - let fut = newFuture[void]("tapsWriter") - send(conn, encode(pkt)) - onSent(conn) do (ctx: MessageContext): - complete(fut) - onSendError(conn) do (ctx: MessageContext; reason: ref Exception): - fail(fut, reason) - fut - var ops = RelayActorOptions( - packetWriter: tapsWriter, - initialOid: 0.Oid.some) - let relayFut = spawnRelay("net", turn, ops) do (turn: var Turn; relay: Relay): - let facet = turn.facet - facet.actor.atExit do (turn: var Turn): - close(conn) - conn.onConnectionError do (reason: ref Exception): - terminate(facet, reason) - conn.onReceiveError do (ctx: MessageContext; reason: ref Exception): - terminate(facet, reason) - conn.onClosed do (): - run(facet) do (turn: var Turn): - stopActor(turn) - var wireBuf = newBufferedDecoder() - conn.onReceived do (buf: seq[byte]; ctx: MessageContext): - feed(wireBuf, buf) - var (success, pr) = decode(wireBuf) - if success: dispatch(relay, pr) - receive(conn) - receive(conn) - discard publish(turn, connectionClosedRef, true) - shutdownRef = newRef(turn, ShutdownEntity()) - relayFut.addCallback do (refFut: Future[Ref]): - let gatekeeper = read refFut - run(gatekeeper.relay) do (turn: var Turn): - reenable() - discard publish(turn, shutdownRef, true) - proc duringCallback(turn: var Turn; a: Assertion; h: Handle): TurnAction = - let facet = inFacet(turn) do (turn: var Turn): - bootProc(turn, unembed a) - proc action(turn: var Turn) = - stop(turn, facet) - result = action - var res = Resolve( - sturdyref: cap, - observer: newRef(turn, during(duringCallback))) - discard publish(turn, gatekeeper, res) - -proc connectNet*(turn: var Turn; host: string; port: Port; cap: SturdyRef; bootProc: ConnectProc) = - var remote = newRemoteEndpoint() - remote.with(port) - if isIpAddress host: - remote.with(parseIpAddress(host)) - else: - remote.withHostname(host) - connectNet(turn, remote, cap, bootProc) diff --git a/src/syndicate/protocols/dataspace.nim b/src/syndicate/protocols/dataspace.nim index 4add75d..f5a6278 100644 --- a/src/syndicate/protocols/dataspace.nim +++ b/src/syndicate/protocols/dataspace.nim @@ -1,6 +1,6 @@ import - std/typetraits, preserves, dataspacePatterns + preserves, dataspacePatterns type Observe*[Cap] {.preservesRecord: "Observe".} = ref object diff --git a/src/syndicate/protocols/dataspacePatterns.nim b/src/syndicate/protocols/dataspacePatterns.nim index f9d7994..e8d7d03 100644 --- a/src/syndicate/protocols/dataspacePatterns.nim +++ b/src/syndicate/protocols/dataspacePatterns.nim @@ -1,6 +1,6 @@ import - std/typetraits, preserves, std/tables + preserves, std/tables type AnyAtomKind* {.pure.} = enum diff --git a/src/syndicate/protocols/gatekeeper.nim b/src/syndicate/protocols/gatekeeper.nim index fa6d835..86aea9b 100644 --- a/src/syndicate/protocols/gatekeeper.nim +++ b/src/syndicate/protocols/gatekeeper.nim @@ -1,19 +1,108 @@ import - std/typetraits, preserves, sturdy + preserves type Bind*[Cap] {.preservesRecord: "bind".} = object - `oid`*: Preserve[Cap] - `key`*: seq[byte] + `description`*: Description[Cap] `target`*: Cap + `observer`*: BindObserver[Cap] - Resolve*[Cap] {.preservesRecord: "resolve".} = ref object - `sturdyref`*: sturdy.SturdyRef[Cap] + Route*[Cap] {.preservesRecord: "route".} = object + `transports`*: seq[Preserve[Cap]] + `pathSteps`* {.preservesTupleTail.}: seq[PathStep[Cap]] + + BindObserverKind* {.pure.} = enum + `present`, `absent` + BindObserverPresent*[Cap] = Cap + `BindObserver`*[Cap] {.preservesOr.} = object + case orKind*: BindObserverKind + of BindObserverKind.`present`: + `present`*: BindObserverPresent[Cap] + + of BindObserverKind.`absent`: + `absent`* {.preservesLiteral: "#f".}: bool + + + TransportConnection*[Cap] {.preservesRecord: "connect-transport".} = object + `addr`*: Preserve[Cap] + `control`*: Cap + `resolved`*: Resolved[Cap] + + Step*[Cap] = Preserve[Cap] + ResolvedPathStep*[Cap] {.preservesRecord: "path-step".} = object + `origin`*: Cap + `pathStep`*: PathStep[Cap] + `resolved`*: Resolved[Cap] + + BoundKind* {.pure.} = enum + `bound`, `Rejected` + BoundBound*[Cap] {.preservesRecord: "bound".} = object + `pathStep`*: PathStep[Cap] + + `Bound`*[Cap] {.preservesOr.} = object + case orKind*: BoundKind + of BoundKind.`bound`: + `bound`*: BoundBound[Cap] + + of BoundKind.`Rejected`: + `rejected`*: Rejected[Cap] + + + ForceDisconnect* {.preservesRecord: "force-disconnect".} = object + + Description*[Cap] = Preserve[Cap] + Rejected*[Cap] {.preservesRecord: "rejected".} = object + `detail`*: Preserve[Cap] + + Resolve*[Cap] {.preservesRecord: "resolve".} = object + `step`*: Step[Cap] `observer`*: Cap -proc `$`*[Cap](x: Bind[Cap] | Resolve[Cap]): string = + ResolvedKind* {.pure.} = enum + `accepted`, `Rejected` + ResolvedAccepted*[Cap] {.preservesRecord: "accepted".} = object + `responderSession`*: Cap + + `Resolved`*[Cap] {.preservesOr.} = object + case orKind*: ResolvedKind + of ResolvedKind.`accepted`: + `accepted`*: ResolvedAccepted[Cap] + + of ResolvedKind.`Rejected`: + `rejected`*: Rejected[Cap] + + + TransportControl* = ForceDisconnect + ResolvePath*[Cap] {.preservesRecord: "resolve-path".} = object + `route`*: Route[Cap] + `addr`*: Preserve[Cap] + `control`*: Cap + `resolved`*: Resolved[Cap] + + PathStep*[Cap] = Preserve[Cap] +proc `$`*[Cap](x: Bind[Cap] | Route[Cap] | BindObserver[Cap] | + TransportConnection[Cap] | + ResolvedPathStep[Cap] | + Bound[Cap] | + Rejected[Cap] | + Resolve[Cap] | + Resolved[Cap] | + ResolvePath[Cap]): string = `$`(toPreserve(x, Cap)) -proc encode*[Cap](x: Bind[Cap] | Resolve[Cap]): seq[byte] = +proc encode*[Cap](x: Bind[Cap] | Route[Cap] | BindObserver[Cap] | + TransportConnection[Cap] | + ResolvedPathStep[Cap] | + Bound[Cap] | + Rejected[Cap] | + Resolve[Cap] | + Resolved[Cap] | + ResolvePath[Cap]): seq[byte] = encode(toPreserve(x, Cap)) + +proc `$`*(x: ForceDisconnect | TransportControl): string = + `$`(toPreserve(x)) + +proc encode*(x: ForceDisconnect | TransportControl): seq[byte] = + encode(toPreserve(x)) diff --git a/src/syndicate/protocols/protocol.nim b/src/syndicate/protocols/protocol.nim index c8a022a..17f66c8 100644 --- a/src/syndicate/protocols/protocol.nim +++ b/src/syndicate/protocols/protocol.nim @@ -1,6 +1,6 @@ import - std/typetraits, preserves + preserves type Error* {.preservesRecord: "error".} = object @@ -18,9 +18,7 @@ type `assertion`*: Assertion `handle`*: Handle - Extension* {.preservesRecord: "label".} = object - `field0`*: seq[Preserve[void]] - + Extension* = Preserve[void] Sync* {.preservesRecord: "sync".} = object `peer`* {.preservesLiteral: "#!".}: tuple[] @@ -62,16 +60,13 @@ type `sync`*: Sync -proc `$`*(x: Error | Turn | Message | Retract | Assert | Extension | Sync | - TurnEvent | - Oid | +proc `$`*(x: Error | Turn | Message | Retract | Assert | Sync | TurnEvent | Oid | Handle | Packet | Event): string = `$`(toPreserve(x)) -proc encode*(x: Error | Turn | Message | Retract | Assert | Extension | Sync | - TurnEvent | +proc encode*(x: Error | Turn | Message | Retract | Assert | Sync | TurnEvent | Oid | Handle | Packet | diff --git a/src/syndicate/protocols/racketEvent.nim b/src/syndicate/protocols/racketEvent.nim deleted file mode 100644 index b3ce914..0000000 --- a/src/syndicate/protocols/racketEvent.nim +++ /dev/null @@ -1,14 +0,0 @@ - -import - std/typetraits, preserves - -type - RacketEvent* {.preservesRecord: "racket-event".} = object - `source`* {.preservesEmbedded.}: Preserve[void] - `event`* {.preservesEmbedded.}: Preserve[void] - -proc `$`*(x: RacketEvent): string = - `$`(toPreserve(x)) - -proc encode*(x: RacketEvent): seq[byte] = - encode(toPreserve(x)) diff --git a/src/syndicate/protocols/secureChatProtocol.nim b/src/syndicate/protocols/secureChatProtocol.nim deleted file mode 100644 index 83498c9..0000000 --- a/src/syndicate/protocols/secureChatProtocol.nim +++ /dev/null @@ -1,73 +0,0 @@ - -import - std/typetraits, preserves - -type - UserId* = BiggestInt - NickConflict* {.preservesRecord: "nickConflict".} = object - - NickClaimResponseKind* {.pure.} = enum - `true`, `NickConflict` - `NickClaimResponse`* {.preservesOr.} = object - case orKind*: NickClaimResponseKind - of NickClaimResponseKind.`true`: - `true`* {.preservesLiteral: "#t".}: bool - - of NickClaimResponseKind.`NickConflict`: - `nickconflict`*: NickConflict - - - Join*[Cap] {.preservesRecord: "joinedUser".} = object - `uid`*: UserId - `handle`*: Cap - - SessionKind* {.pure.} = enum - `observeUsers`, `observeSpeech`, `NickClaim`, `Says` - SessionObserveUsers*[Cap] {.preservesRecord: "Observe".} = object - `field0`* {.preservesLiteral: "user".}: tuple[] - `observer`*: Cap - - SessionObserveSpeech*[Cap] {.preservesRecord: "Observe".} = object - `field0`* {.preservesLiteral: "says".}: tuple[] - `observer`*: Cap - - `Session`*[Cap] {.preservesOr.} = object - case orKind*: SessionKind - of SessionKind.`observeUsers`: - `observeusers`*: SessionObserveUsers[Cap] - - of SessionKind.`observeSpeech`: - `observespeech`*: SessionObserveSpeech[Cap] - - of SessionKind.`NickClaim`: - `nickclaim`*: NickClaim[Cap] - - of SessionKind.`Says`: - `says`*: Says - - - UserInfo* {.preservesRecord: "user".} = object - `uid`*: UserId - `name`*: string - - NickClaim*[Cap] {.preservesRecord: "claimNick".} = object - `uid`*: UserId - `name`*: string - `k`*: Cap - - Says* {.preservesRecord: "says".} = object - `who`*: UserId - `what`*: string - -proc `$`*[Cap](x: Join[Cap] | Session[Cap] | NickClaim[Cap]): string = - `$`(toPreserve(x, Cap)) - -proc encode*[Cap](x: Join[Cap] | Session[Cap] | NickClaim[Cap]): seq[byte] = - encode(toPreserve(x, Cap)) - -proc `$`*(x: UserId | NickConflict | NickClaimResponse | UserInfo | Says): string = - `$`(toPreserve(x)) - -proc encode*(x: UserId | NickConflict | NickClaimResponse | UserInfo | Says): seq[ - byte] = - encode(toPreserve(x)) diff --git a/src/syndicate/protocols/service.nim b/src/syndicate/protocols/service.nim index 20c7ca9..e0496cd 100644 --- a/src/syndicate/protocols/service.nim +++ b/src/syndicate/protocols/service.nim @@ -1,6 +1,6 @@ import - std/typetraits, preserves + preserves type StateKind* {.pure.} = enum diff --git a/src/syndicate/protocols/simpleChatProtocol.nim b/src/syndicate/protocols/simpleChatProtocol.nim deleted file mode 100644 index c4ce95c..0000000 --- a/src/syndicate/protocols/simpleChatProtocol.nim +++ /dev/null @@ -1,17 +0,0 @@ - -import - std/typetraits, preserves - -type - Says* {.preservesRecord: "Says".} = object - `who`*: string - `what`*: string - - Present* {.preservesRecord: "Present".} = object - `username`*: string - -proc `$`*(x: Says | Present): string = - `$`(toPreserve(x)) - -proc encode*(x: Says | Present): seq[byte] = - encode(toPreserve(x)) diff --git a/src/syndicate/protocols/stream.nim b/src/syndicate/protocols/stream.nim index ca247db..d1b9784 100644 --- a/src/syndicate/protocols/stream.nim +++ b/src/syndicate/protocols/stream.nim @@ -1,6 +1,6 @@ import - std/typetraits, preserves + preserves type CreditAmountKind* {.pure.} = enum diff --git a/src/syndicate/protocols/sturdy.nim b/src/syndicate/protocols/sturdy.nim index ade3ee7..925c50c 100644 --- a/src/syndicate/protocols/sturdy.nim +++ b/src/syndicate/protocols/sturdy.nim @@ -1,6 +1,6 @@ import - std/typetraits, preserves, std/tables, std/tables + preserves, std/tables type PCompoundKind* {.pure.} = enum @@ -27,13 +27,44 @@ type `dict`*: PCompoundDict[Cap] + Reject*[Cap] {.preservesRecord: "reject".} = ref object + `pattern`*: Pattern[Cap] + + CaveatsFieldKind* {.pure.} = enum + `present`, `invalid`, `absent` + CaveatsFieldPresent*[Cap] {.preservesDictionary.} = ref object + `caveats`*: seq[Caveat[Cap]] + + CaveatsFieldInvalid*[Cap] {.preservesDictionary.} = object + `caveats`*: Preserve[Cap] + + CaveatsFieldAbsent* {.preservesDictionary.} = object + + `CaveatsField`*[Cap] {.preservesOr.} = ref object + case orKind*: CaveatsFieldKind + of CaveatsFieldKind.`present`: + `present`*: CaveatsFieldPresent[Cap] + + of CaveatsFieldKind.`invalid`: + `invalid`*: CaveatsFieldInvalid[Cap] + + of CaveatsFieldKind.`absent`: + `absent`*: CaveatsFieldAbsent + + + SturdyDescriptionDetail*[Cap] {.preservesDictionary.} = object + `oid`*: Preserve[Cap] + `key`*: seq[byte] + PAnd*[Cap] {.preservesRecord: "and".} = ref object `patterns`*: seq[Pattern[Cap]] + SturdyStepDetail*[Cap] = Parameters[Cap] Rewrite*[Cap] {.preservesRecord: "rewrite".} = ref object `pattern`*: Pattern[Cap] `template`*: Template[Cap] + Parameters*[Cap] = Preserve[Cap] TRef* {.preservesRecord: "ref".} = object `binding`*: BiggestInt @@ -67,10 +98,10 @@ type `dict`*: TCompoundDict[Cap] + SturdyPathStepDetail*[Cap] = Parameters[Cap] `PAtom`* {.preservesOr, pure.} = enum `Boolean`, `Float`, `Double`, `SignedInteger`, `String`, `ByteString`, `Symbol` - Attenuation*[Cap] = seq[Caveat[Cap]] PDiscard* {.preservesRecord: "_".} = object TemplateKind* {.pure.} = enum @@ -91,7 +122,8 @@ type CaveatKind* {.pure.} = enum - `Rewrite`, `Alts` + `Rewrite`, `Alts`, `Reject`, `unknown` + CaveatUnknown*[Cap] = Preserve[Cap] `Caveat`*[Cap] {.preservesOr.} = ref object case orKind*: CaveatKind of CaveatKind.`Rewrite`: @@ -100,14 +132,18 @@ type of CaveatKind.`Alts`: `alts`*: Alts[Cap] + of CaveatKind.`Reject`: + `reject`*: Reject[Cap] + + of CaveatKind.`unknown`: + `unknown`*: CaveatUnknown[Cap] + PNot*[Cap] {.preservesRecord: "not".} = ref object `pattern`*: Pattern[Cap] SturdyRef*[Cap] {.preservesRecord: "ref".} = ref object - `oid`*: Preserve[Cap] - `caveatChain`*: seq[Attenuation[Cap]] - `sig`*: seq[byte] + `parameters`*: Parameters[Cap] WireRefKind* {.pure.} = enum `mine`, `yours` @@ -131,7 +167,7 @@ type TAttenuate*[Cap] {.preservesRecord: "attenuate".} = ref object `template`*: Template[Cap] - `attenuation`*: Attenuation[Cap] + `attenuation`*: seq[Caveat[Cap]] Oid* = BiggestInt Alts*[Cap] {.preservesRecord: "or".} = ref object @@ -167,10 +203,13 @@ type `pcompound`*: PCompound[Cap] -proc `$`*[Cap](x: PCompound[Cap] | PAnd[Cap] | Rewrite[Cap] | PBind[Cap] | +proc `$`*[Cap](x: PCompound[Cap] | Reject[Cap] | CaveatsField[Cap] | + SturdyDescriptionDetail[Cap] | + PAnd[Cap] | + Rewrite[Cap] | + PBind[Cap] | Lit[Cap] | TCompound[Cap] | - Attenuation[Cap] | Template[Cap] | Caveat[Cap] | PNot[Cap] | @@ -181,10 +220,13 @@ proc `$`*[Cap](x: PCompound[Cap] | PAnd[Cap] | Rewrite[Cap] | PBind[Cap] | Pattern[Cap]): string = `$`(toPreserve(x, Cap)) -proc encode*[Cap](x: PCompound[Cap] | PAnd[Cap] | Rewrite[Cap] | PBind[Cap] | +proc encode*[Cap](x: PCompound[Cap] | Reject[Cap] | CaveatsField[Cap] | + SturdyDescriptionDetail[Cap] | + PAnd[Cap] | + Rewrite[Cap] | + PBind[Cap] | Lit[Cap] | TCompound[Cap] | - Attenuation[Cap] | Template[Cap] | Caveat[Cap] | PNot[Cap] | diff --git a/src/syndicate/protocols/tcp.nim b/src/syndicate/protocols/tcp.nim index 1604757..8cf1830 100644 --- a/src/syndicate/protocols/tcp.nim +++ b/src/syndicate/protocols/tcp.nim @@ -1,6 +1,6 @@ import - std/typetraits, preserves + preserves type TcpLocal* {.preservesRecord: "tcp-local".} = object diff --git a/src/syndicate/protocols/timer.nim b/src/syndicate/protocols/timer.nim index e3a40b0..13a051e 100644 --- a/src/syndicate/protocols/timer.nim +++ b/src/syndicate/protocols/timer.nim @@ -1,21 +1,21 @@ import - std/typetraits, preserves + preserves type TimerExpired* {.preservesRecord: "timer-expired".} = object `label`*: Preserve[void] - `msecs`*: float64 + `seconds`*: float64 SetTimer* {.preservesRecord: "set-timer".} = object `label`*: Preserve[void] - `msecs`*: float64 + `seconds`*: float64 `kind`*: TimerKind `TimerKind`* {.preservesOr, pure.} = enum `relative`, `absolute`, `clear` LaterThan* {.preservesRecord: "later-than".} = object - `msecs`*: float64 + `seconds`*: float64 proc `$`*(x: TimerExpired | SetTimer | LaterThan): string = `$`(toPreserve(x)) diff --git a/src/syndicate/protocols/trace.nim b/src/syndicate/protocols/trace.nim index c42d8db..ee0ddde 100644 --- a/src/syndicate/protocols/trace.nim +++ b/src/syndicate/protocols/trace.nim @@ -1,7 +1,6 @@ import - std/typetraits, preserves, protocol, protocol, protocol, protocol, protocol, - protocol + preserves, protocol type TargetedTurnEvent*[Cap] {.preservesRecord: "event".} = object diff --git a/src/syndicate/protocols/transportAddress.nim b/src/syndicate/protocols/transportAddress.nim index 3053178..25ec7fa 100644 --- a/src/syndicate/protocols/transportAddress.nim +++ b/src/syndicate/protocols/transportAddress.nim @@ -1,6 +1,6 @@ import - std/typetraits, preserves + preserves type WebSocket* {.preservesRecord: "ws".} = object diff --git a/src/syndicate/protocols/worker.nim b/src/syndicate/protocols/worker.nim index ea43609..c05e84b 100644 --- a/src/syndicate/protocols/worker.nim +++ b/src/syndicate/protocols/worker.nim @@ -1,6 +1,6 @@ import - std/typetraits, preserves + preserves type Instance*[Cap] {.preservesRecord: "Instance".} = object diff --git a/src/syndicate/relays.nim b/src/syndicate/relays.nim index 396170a..ee7c204 100644 --- a/src/syndicate/relays.nim +++ b/src/syndicate/relays.nim @@ -3,7 +3,7 @@ import std/[asyncdispatch, options, streams, tables] import preserves -import ./actors, ./durings, ./membranes, ./protocols/[protocol, sturdy] +import ./actors, ./durings, ./membranes, ./protocols/[protocol, sturdy, transportAddress] when defined(traceSyndicate): when defined(posix): @@ -47,6 +47,7 @@ type e: WireSymbol RelayEntity = ref object of Entity + ## https://synit.org/book/protocol.html#relay-entities label: string relay: Relay @@ -72,13 +73,12 @@ method sync(se: SyncPeerEntity; t: var Turn; peer: Ref) = proc newSyncPeerEntity(r: Relay; p: Ref): SyncPeerEntity = SyncPeerEntity(relay: r, peer: p) -proc rewriteRefOut(relay: Relay; `ref`: Ref; transient: bool; exported: var seq[WireSymbol]): WireRef = +proc rewriteRefOut(relay: Relay; `ref`: Ref; exported: var seq[WireSymbol]): WireRef = if `ref`.target of RelayEntity and `ref`.target.RelayEntity.relay == relay and `ref`.attenuation.len == 0: WireRef(orKind: WireRefKind.yours, yours: WireRefYours[void](oid: `ref`.target.oid)) else: var ws = grab(relay.exported, `ref`) if ws.isNil: - doAssert(not transient, "Cannot send transient reference") ws = newWireSymbol(relay.exported, relay.nextLocalOid, `ref`) inc relay.nextLocalOid exported.add ws @@ -86,20 +86,16 @@ proc rewriteRefOut(relay: Relay; `ref`: Ref; transient: bool; exported: var seq[ orKind: WireRefKind.mine, mine: WireRefMine(oid: ws.oid)) -proc rewriteOut(relay: Relay; v: Assertion; transient: bool): +proc rewriteOut(relay: Relay; v: Assertion): tuple[rewritten: Value, exported: seq[WireSymbol]] {.gcsafe.} = var exported: seq[WireSymbol] result.rewritten = contract(v) do (r: Ref) -> Value: - rewriteRefOut(relay, r, transient, exported).toPreserve + rewriteRefOut(relay, r, exported).toPreserve result.exported = exported -proc register(relay: Relay; v: Assertion): Value = - rewriteOut(relay, v, false).rewritten - -proc register(relay: Relay; v: Assertion; h: Handle): Value = - var (rewritten, exported) = rewriteOut(relay, v, false) - relay.outboundAssertions[h] = exported - rewritten +proc register(relay: Relay; v: Assertion; h: Handle): tuple[rewritten: Value, exported: seq[WireSymbol]] = + result = rewriteOut(relay, v) + relay.outboundAssertions[h] = result.exported proc deregister(relay: Relay; h: Handle) = var outbound: seq[WireSymbol] @@ -130,7 +126,7 @@ method publish(re: RelayEntity; t: var Turn; a: AssertionRef; h: Handle) {.gcsaf re.send Event( orKind: EventKind.Assert, `assert`: protocol.Assert( - assertion: re.relay.register(a.value, h), + assertion: re.relay.register(a.value, h).rewritten, handle: h)) method retract(re: RelayEntity; t: var Turn; h: Handle) {.gcsafe.} = @@ -140,14 +136,16 @@ method retract(re: RelayEntity; t: var Turn; h: Handle) {.gcsafe.} = retract: Retract(handle: h)) method message(re: RelayEntity; turn: var Turn; msg: AssertionRef) {.gcsafe.} = - re.send Event(orKind: EventKind.Message, - message: Message(body: register(re.relay, msg.value))) + var (value, exported) = rewriteOut(re.relay, msg.value) + assert(len(exported) == 0, "cannot send a reference in a message") + if len(exported) == 0: + re.send Event(orKind: EventKind.Message, message: Message(body: value)) method sync(re: RelayEntity; turn: var Turn; peer: Ref) {.gcsafe.} = var peerEntity = newSyncPeerEntity(re.relay, peer) exported: seq[WireSymbol] - discard rewriteRefOut(re.relay, turn.newRef(peerEntity), false, exported) + discard rewriteRefOut(re.relay, turn.newRef(peerEntity), exported) # TODO: discard? peerEntity.e = exported[0] re.send Event(orKind: EventKind.Sync) @@ -229,13 +227,20 @@ proc dispatch*(relay: Relay; v: Value) {.gcsafe.} = if fromPreserve(pkt, v): case pkt.orKind of PacketKind.Turn: + # https://synit.org/book/protocol.html#turn-packets for te in pkt.turn: - dispatch(relay, t, lookupLocal(relay, te.oid.Oid), te.event) + let r = lookupLocal(relay, te.oid.Oid) + if not r.isInert: + dispatch(relay, t, r, te.event) + else: + stderr.writeLine("discarding event for unknown Ref; ", te.event) of PacketKind.Error: + # https://synit.org/book/protocol.html#error-packets when defined(posix): stderr.writeLine("Error from server: ", pkt.error.message, " (detail: ", pkt.error.detail, ")") close relay of PacketKind.Extension: + # https://synit.org/book/protocol.html#extension-packets discard else: when defined(posix): @@ -264,7 +269,7 @@ proc spawnRelay*(name: string; turn: var Turn; opts: RelayActorOptions; setup: R let relay = newRelay(turn, opts, setup) if not opts.initialRef.isNil: var exported: seq[WireSymbol] - discard rewriteRefOut(relay, opts.initialRef, false, exported) + discard rewriteRefOut(relay, opts.initialRef, exported) if opts.initialOid.isSome: var imported: seq[WireSymbol] var wr = WireRef( @@ -281,78 +286,101 @@ proc spawnRelay*(name: string; turn: var Turn; opts: RelayActorOptions; setup: R when defined(posix): import std/asyncnet - from std/nativesockets import AF_UNIX, SOCK_STREAM, Protocol + from std/nativesockets import AF_INET, AF_UNIX, IPPROTO_TCP, SOCK_STREAM, Protocol -import protocols/gatekeeper +import protocols/[gatekeeper, sturdy] type ShutdownEntity* = ref object of Entity method retract(e: ShutdownEntity; turn: var Turn; h: Handle) = stopActor(turn) -type - SturdyRef* = sturdy.SturdyRef[Ref] - Resolve* = gatekeeper.Resolve[Ref] - ConnectProc* = proc (turn: var Turn; ds: Ref) {.gcsafe.} +type ConnectProc* = proc (turn: var Turn; ds: Ref) {.gcsafe.} + +export Tcp when defined(posix): - proc connectUnix*(turn: var Turn; path: string; cap: SturdyRef; bootProc: ConnectProc) = - var socket = newAsyncSocket( - domain = AF_UNIX, - sockType = SOCK_STREAM, - protocol = cast[Protocol](0), - buffered = false) + export Unix + + proc connect*(turn: var Turn; socket: AsyncSocket; step: Preserve[Ref]; bootProc: ConnectProc) = proc socketWriter(packet: sink Packet): Future[void] = socket.send(cast[string](encode(packet))) const recvSize = 0x2000 var shutdownRef: Ref - let reenable = turn.facet.preventInertCheck() - let connectionClosedRef = newRef(turn, ShutdownEntity()) - var fut = newFuture[void]"connectUnix" - connectUnix(socket, path).addCallback do (f: Future[void]): - read f - discard bootActor("unix") do (turn: var Turn): - var ops = RelayActorOptions( - packetWriter: socketWriter, - initialOid: 0.Oid.some) - let relayFut = spawnRelay("unix", turn, ops) do (turn: var Turn; relay: Relay): - let facet = turn.facet - var wireBuf = newBufferedDecoder(0) - proc recvCb(pktFut: Future[string]) {.gcsafe.} = - if pktFut.failed: + let + reenable = turn.facet.preventInertCheck() + connectionClosedRef = newRef(turn, ShutdownEntity()) + fut = newFuture[void]"connect" + discard bootActor("socket") do (turn: var Turn): + var ops = RelayActorOptions( + packetWriter: socketWriter, + initialOid: 0.Oid.some) + let refFut = spawnRelay("socket", turn, ops) do (turn: var Turn; relay: Relay): + let facet = turn.facet + var wireBuf = newBufferedDecoder(0) + proc recvCb(pktFut: Future[string]) {.gcsafe.} = + if pktFut.failed: + run(facet) do (turn: var Turn): stopActor(turn) + else: + var buf = pktFut.read + if buf.len == 0: run(facet) do (turn: var Turn): stopActor(turn) else: - var buf = pktFut.read - if buf.len == 0: - run(facet) do (turn: var Turn): stopActor(turn) + feed(wireBuf, buf) + var (success, pr) = decode(wireBuf) + if success: + dispatch(relay, pr) + socket.recv(recvSize).addCallback(recvCb) + socket.recv(recvSize).addCallback(recvCb) + turn.facet.actor.atExit do (turn: var Turn): close(socket) + discard publish(turn, connectionClosedRef, true) + shutdownRef = newRef(turn, ShutdownEntity()) + addCallback(refFut) do (): + let gatekeeper = read refFut + run(gatekeeper.relay) do (turn: var Turn): + reenable() + discard publish(turn, shutdownRef, true) + proc duringCallback(turn: var Turn; a: Assertion; h: Handle): TurnAction = + let facet = inFacet(turn) do (turn: var Turn): + var + accepted: ResolvedAccepted[Ref] + rejected: Rejected[Ref] + if fromPreserve(accepted, a): + bootProc(turn, accepted.responderSession) + elif fromPreserve(rejected, a): + fail(fut, newException(CatchableError, $rejected.detail)) else: - feed(wireBuf, buf) - var (success, pr) = decode(wireBuf) - if success: - dispatch(relay, pr) - socket.recv(recvSize).addCallback(recvCb) - socket.recv(recvSize).addCallback(recvCb) - turn.facet.actor.atExit do (turn: var Turn): close(socket) - discard publish(turn, connectionClosedRef, true) - shutdownRef = newRef(turn, ShutdownEntity()) - relayFut.addCallback do (refFut: Future[Ref]): - let gatekeeper = read refFut - run(gatekeeper.relay) do (turn: var Turn): - reenable() - discard publish(turn, shutdownRef, true) - proc duringCallback(turn: var Turn; a: Assertion; h: Handle): TurnAction = - let facet = inFacet(turn) do (turn: var Turn): - bootProc(turn, unembed a) - proc action(turn: var Turn) = - stop(turn, facet) - result = action - var res = Resolve( - sturdyref: cap, - observer: newRef(turn, during(duringCallback))) - discard publish(turn, gatekeeper, res) - fut.complete() + fail(fut, newException(CatchableError, $a)) + proc action(turn: var Turn) = + stop(turn, facet) + result = action + discard publish(turn, gatekeeper, Resolve[Ref]( + step: step, + observer: newRef(turn, during(duringCallback)), + )) + fut.complete() asyncCheck(turn, fut) + proc connect*(turn: var Turn; transport: Tcp; step: Preserve[Ref]; bootProc: ConnectProc) = + let socket = newAsyncSocket( + domain = AF_INET, + sockType = SOCK_STREAM, + protocol = IPPROTO_TCP, + buffered = false) + let fut = connect(socket, transport.host, Port transport.port) + addCallback(fut, turn) do (turn: var Turn): + connect(turn, socket, step, bootProc) + + proc connect*(turn: var Turn; transport: Unix; step: Preserve[Ref]; bootProc: ConnectProc) = + let socket = newAsyncSocket( + domain = AF_UNIX, + sockType = SOCK_STREAM, + protocol = cast[Protocol](0), + buffered = false) + let fut = connectUnix(socket, transport.path) + addCallback(fut, turn) do (turn: var Turn): + connect(turn, socket, step, bootProc) + import std/asyncfile const stdinReadSize = 128 diff --git a/syndicate.nimble b/syndicate.nimble index ab25076..b1d5529 100644 --- a/syndicate.nimble +++ b/syndicate.nimble @@ -1,6 +1,6 @@ # Package -version = "20230517" +version = "20230518" author = "Emery Hemingway" description = "Syndicated actors for conversational concurrency" license = "Unlicense" diff --git a/tests/stdio_client.nim b/tests/stdio_client.nim deleted file mode 100644 index 79f6585..0000000 --- a/tests/stdio_client.nim +++ /dev/null @@ -1,36 +0,0 @@ -# SPDX-FileCopyrightText: ☭ Emery Hemingway -# SPDX-License-Identifier: Unlicense - -import std/[asyncdispatch, random] -import preserves -import syndicate - -import syndicate/[actors, capabilities] - -randomize() - -type - A* {.preservesRecord: "A".} = object - str*: string - B* {.preservesRecord: "B".} = object - str*: string - -runActor("x") do (ds: Ref; turn: var Turn): - connectStdio(ds, turn) - - discard publish(turn, ds, A(str: "A stdio")) - discard publish(turn, ds, B(str: "B stdio")) - - onPublish(turn, ds, ?A) do (v: Assertion): - stderr.writeLine "received over stdio ", v - -bootDataspace("y") do (ds: Ref; turn: var Turn): - - connectUnix(turn, "/run/user/1000/dataspace", capabilities.mint()) do (turn: var Turn; a: Assertion) -> TurnAction: - let ds = unembed a - - discard publish(turn, ds, A(str: "A unix")) - discard publish(turn, ds, B(str: "B unix")) - - onPublish(turn, ds, ?B) do (v: Assertion): - stderr.writeLine "received over unix ", v diff --git a/tests/test_chat.nim b/tests/test_chat.nim index de56c1c..63f9e6e 100644 --- a/tests/test_chat.nim +++ b/tests/test_chat.nim @@ -22,6 +22,18 @@ proc readStdin(facet: Facet; ds: Ref; username: string) = readLine() readLine() +proc chat(turn: var Turn; ds: Ref; username: string) = + during(turn, ds, ?Present) do (who: string): + echo who, " joined" + do: + echo who, " left" + + onMessage(turn, ds, ?Says) do (who: string, what: string): + echo who, ": ", what + + discard publish(turn, ds, Present(username: username)) + readStdin(turn.facet, ds, username) + proc main = var transport: Preserve[void] @@ -41,20 +53,14 @@ proc main = if calledWithArguments: runActor("chat") do (root: Ref; turn: var Turn): - var unixAddr: transportAddress.Unix + var + unixAddr: transportAddress.Unix + tcpAddr: transportAddress.Tcp if fromPreserve(unixAddr, transport): - stderr.writeLine "connect to ", unixAddr, " with ", cap connect(turn, unixAddr, cap) do (turn: var Turn; ds: Ref): - - during(turn, ds, ?Present) do (who: string): - echo who, " joined" - do: - echo who, " left" - - onMessage(turn, ds, ?Says) do (who: string, what: string): - echo who, ": ", what - - discard publish(turn, ds, Present(username: username)) - readStdin(turn.facet, ds, username) + chat(turn, ds, username) + elif fromPreserve(tcpAddr, transport): + connect(turn, tcpAddr, cap) do (turn: var Turn; ds: Ref): + chat(turn, ds, username) main()