Compare commits

..

9 Commits

Author SHA1 Message Date
Emery Hemingway 15d2e8bfb4 Replace actor and relay API 2024-03-04 18:20:59 +00:00
Emery Hemingway eb5d4d9a57 Port relays to nim-sys 2024-03-04 18:20:29 +00:00
Emery Hemingway 01f26caf7b New timers actor 2024-03-01 14:26:36 +00:00
Emery Hemingway e31069e41a API renaming 2024-03-01 14:10:20 +00:00
Emery Hemingway fdf2994ec4 bags: export some table procs 2024-03-01 14:05:03 +00:00
Emery Hemingway 0cee6670c9 De-async actors 2024-03-01 14:04:47 +00:00
Emery Hemingway 1ce96560f4 Cleanup actors 2024-03-01 14:01:42 +00:00
Emery Hemingway d365a1e6e5 Remove gcsafe declarations
CPS doesn't give gcsafe assurances.
2024-03-01 14:00:31 +00:00
Emery Hemingway 3e5d910d1a Depend on cps and nim-sys 2024-03-01 13:57:48 +00:00
42 changed files with 581 additions and 2394 deletions

View File

@ -16,11 +16,11 @@
"packages": [
"cps"
],
"path": "/nix/store/m9vpcf3dq6z2h1xpi1vlw0ycxp91s5p7-source",
"rev": "2a4d771a715ba45cfba3a82fa625ae7ad6591c8b",
"sha256": "0c62k5wpq9z9mn8cd4rm8jjc4z0xmnak4piyj5dsfbyj6sbdw2bf",
"path": "/nix/store/452hfhasrn3gl6vijfmzs69djl099j0j-source",
"rev": "b7c179f172e3a256a482a9daee3c0815ea423206",
"sha256": "1sn9s7iv83sw1jl5jgi2h7b0xpgsn13f9icp5124jvbp0qkxskx2",
"srcDir": "",
"url": "https://github.com/nim-works/cps/archive/2a4d771a715ba45cfba3a82fa625ae7ad6591c8b.tar.gz"
"url": "https://github.com/nim-works/cps/archive/b7c179f172e3a256a482a9daee3c0815ea423206.tar.gz"
},
{
"method": "fetchzip",
@ -82,11 +82,11 @@
"packages": [
"sys"
],
"path": "/nix/store/ayplzmq7xdzrp3n6ly6dnskf5c5aiihp-source",
"rev": "3b86a5083a4aa178994fe4ffdc046d340aa13b32",
"sha256": "0qz9hag7synp8sx2b6caazm2kidvd0lv2p0h98sslkyzaf4icnal",
"path": "/nix/store/syhxsjlsdqfap0hk4qp3s6kayk8cqknd-source",
"rev": "4ef3b624db86e331ba334e705c1aa235d55b05e1",
"sha256": "1q4qgw4an4mmmcbx48l6xk1jig1vc8p9cq9dbx39kpnb0890j32q",
"srcDir": "src",
"url": "https://github.com/alaviss/nim-sys/archive/3b86a5083a4aa178994fe4ffdc046d340aa13b32.tar.gz"
"url": "https://github.com/ehmry/nim-sys/archive/4ef3b624db86e331ba334e705c1aa235d55b05e1.tar.gz"
}
]
}

2
src/Tupfile Normal file
View File

@ -0,0 +1,2 @@
include_rules
: foreach *.nim |> !nim_check |>

View File

@ -1,677 +0,0 @@
# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense
# Use procedures to call into SAM,
# use continuations to call within SAM.
import std/[deques, hashes, options, sets, tables, times]
import pkg/cps
import preserves
import ./protocols/[protocol, sturdy]
# const traceSyndicate {.booldefine.}: bool = true
const traceSyndicate* = true
when traceSyndicate:
import std/streams
from std/os import getEnv
import ./protocols/trace
export protocol.Handle
type
Cont* = ref object of Continuation
turn: Turn
PublishProc* = proc (e: Entity; v: Value; h: Handle) {.cps: Cont.}
RetractProc* = proc (e: Entity; h: Handle) {.cps: Cont.}
MessageProc* = proc (e: Entity; v: Value) {.cps: Cont.}
SyncProc* = proc (e: Entity; peer: Cap) {.cps: Cont.}
Handler* = proc() {.closure.}
HandlerDeque = seq[ContinuationProc[Continuation]]
FacetState = enum fFresh, fRunning, fEnded
Callback* = proc () {.closure.}
OutboundTable = Table[Handle, OutboundAssertion]
OutboundAssertion = ref object
handle: Handle
peer: Cap
established: bool
Facet* = ref object
## https://synit.org/book/glossary.html#facet
actor: Actor
parent: Facet
children: seq[Facet]
outbound: OutboundTable
stopHandlers: HandlerDeque
stopCallbacks: seq[Callback]
state: FacetState
id: FacetId
FacetProc* = proc (f: Facet) {.closure.}
## Type for callbacks to be called within a turn.
## The `Facet` parameter is the owning facet.
Turn = ref object
## https://synit.org/book/glossary.html#turn
actor: Actor
work: Dequeue[Cont]
actions: seq[Cont]
event: Option[protocol.Event]
rollback: bool
when traceSyndicate:
desc: TurnDescription
Entity* = ref object of RootObj
## https://synit.org/book/glossary.html#entity
publishImpl: PublishProc
retractImpl: RetractProc
messageImpl: MessageProc
syncImpl: SyncProc
facet*: Facet
# This implementation associates Entities to
# Facets, which is not to be taken as a SAM
# axiom.
oid*: sturdy.Oid # oid is how Entities are identified over the wire
Cap* {.final, preservesEmbedded.} = ref object of EmbeddedObj
target*: Entity
attenuation*: seq[sturdy.Caveat]
Actor* = ref object
## https://synit.org/book/glossary.html#actor
# TODO: run on a seperate thread.
# crashHandlers: HandlerDeque
root: Facet
handleAllocator: Handle
facetIdAllocator: int
id: ActorId
when traceSyndicate:
traceStream: FileStream
stopped: bool
var turnQueue {.threadvar, used.}: Deque[Turn]
proc queueTurn(turn: sink Turn) =
turnQueue.addLast(turn)
proc turnsPending*(): bool =
turnQueue.len > 0
template turnWork*(prc: typed): untyped =
## Pragma to mark work that executes in a `Turn` context.
cps(Cont, prc)
proc activeTurn(c: Cont): Turn {.cpsVoodoo.} =
## Return the active `Turn` within a turn context.
assert not c.turn.isNil
c.turn
proc activeFacet(c: Cont): Facet {.cpsVoodoo.} =
## Return the active `Facet` within a turn context.
assert not c.turn.isNil
assert not c.turn.facet.isNil
c.turn.facet
using
actor: Actor
facet: Facet
entity: Entity
cap: Cap
turn: Turn
proc `$`*(facet): string = $facet.id
proc `$`*(cap): string = "#:" & $cast[uint](cap.unsafeAddr)
proc hash*(x: Actor|Facet|Cap): Hash = x.unsafeAddr.hash
proc relay*(cap): Facet =
assert not cap.target.facet.isNil
cap.target.facet
proc collectPath(result: var seq[FacetId]; facet) =
if not facet.parent.isNil:
collectPath(result, facet.parent)
result.add(facet.id)
proc stopped*(facet): bool = facet.state != fRunning
when traceSyndicate:
proc traceFlush(actor) =
if not actor.traceStream.isNil:
actor.traceStream.flush()
proc trace(actor; act: ActorActivation) =
if not actor.traceStream.isNil:
var entry = TraceEntry(
timestamp: getTime().toUnixFloat(),
actor: actor.id,
item: act,
)
actor.traceStream.writeLine($entry.toPreserves)
proc traceTurn(actor) =
if not actor.traceStream.isNil:
actor.trace(ActorActivation(
orKind: ActorActivationKind.turn,
turn: actor.turn.desc,
))
actor.traceFlush()
reset actor.turn.desc
proc traceTarget(facet): trace.Target =
Target(
actor: facet.actor.id,
facet: facet.id,
)
proc traceTarget(cap): trace.Target =
let facet = cap.relay
Target(
actor: facet.actor.id,
facet: facet.id,
oid: cap.target.oid.toPreserves,
)
proc traceEnqueue(actor; e: TargetedTurnEvent) =
actor.turn.desc.actions.add ActionDescription(
orKind: ActionDescriptionKind.enqueue,
enqueue: ActionDescriptionEnqueue(event: e),
)
proc traceDequeue(actor; e: TargetedTurnEvent) =
actor.turn.desc.actions.add ActionDescription(
orKind: ActionDescriptionKind.dequeue,
dequeue: ActionDescriptionDequeue(event: e),
)
proc pass*(a, b: Cont): Cont =
assert not a.facet.isNil
b.facet = a.facet
b
proc queueWork*(facet; c: Cont) =
c.facet = facet
facet.actor.turn.work.addLast(c)
proc queueAction*(facet; c: Cont) =
c.facet = facet
facet.actor.turn.actions.add(c)
proc yieldWork(c: Cont): Cont {.cpsMagic.} =
## Suspend and enqueue the caller until later in the turn.
assert not c.facet.isNil
c.facet.queueWork(c)
nil
proc yieldToActions(c: Cont): Cont {.cpsMagic.} =
assert not c.facet.isNil
c.facet.actor.turn.actions.add(c)
nil
proc terminate(actor; err: ref Exception) =
when traceSyndicate:
actor.traceTurn()
raise err
proc terminate(facet; err: ref Exception) =
terminate(facet.actor, err)
proc complete(c: Cont) =
var c = c
try:
while not c.isNil and not c.fn.isNil:
var y = c.fn
var x = y(c)
c = Cont(x)
except CatchableError as err:
c.facet.actor.turn.rollback = true
if not c.dismissed:
writeStackFrames c
terminate(c.facet, err)
proc run*(actor): bool =
if actor.stopped: return
try:
result = actor.turn.work.len > 0
while actor.turn.work.len > 0:
actor.turn.work.popFirst().complete()
var i: int
while i < actor.turn.actions.len:
complete(move actor.turn.actions[i])
inc i
actor.turn.actions.setLen(0)
when traceSyndicate:
actor.traceTurn()
if actor.stopped:
trace(actor, ActorActivation(orkind: ActorActivationKind.stop))
for child in actor.children:
result = result and run(child)
except Exception as err:
actor.terminate(err)
proc start(actor; cont: Cont) =
when traceSyndicate:
var act = ActorActivation(orkind: ActorActivationKind.start)
trace(actor, act)
actor.root.state = fRunning
actor.root.startExternalTurn()
actor.root.queueWork(cont)
proc stop*(actor)
proc runNextStop(c: Cont; facet: Facet): Cont {.cpsMagic.} =
c.fn = facet.stopHandlers.pop()
result = c
proc runNextFacetStop() {.cps: Cont.} =
activeFacet().runNextStop()
proc stop(facet; reason: FacetStopReason) =
let actor = facet.actor
while facet.stopHandlers.len > 0:
var c = whelp runNextFacetStop()
c.facet = facet
complete(c)
while facet.stopCallbacks.len > 0:
var cb = facet.stopCallbacks.pop()
cb()
while facet.children.len > 0:
stop(facet.children.pop(), FacetStopReason.parentStopping)
when traceSyndicate:
var act = ActionDescription(orKind: ActionDescriptionKind.facetstop)
collectPath(act.facetstop.path, facet)
act.facetStop.reason = reason
actor.turn.desc.actions.add act
if facet.parent.isNil:
actor.root = nil
stop(actor)
proc stop*(actor) =
if not actor.root.isNil:
stop(actor.root, FacetStopReason.actorStopping)
actor.stopped = true
proc stopped*(actor): bool =
actor.`stopped`
# proc stopFacetAction(reason: FacetStopReason) {.syndicate.} =
# stop(c.facet, reason)
proc stopActorAction() {.cps: Cont.} =
yieldToActions()
activeFacet().actor.stop()
proc stopActor*(facet) =
let c = whelp stopActorAction()
c.facet = facet
facet.actor.turn.actions.add(c)
proc stopActor*(cap) =
## Stop an `Actor` from a `Cap`.
stopActor(cap.relay)
type
AssertionRef* = ref object
value*: Value
# if the Enity methods take a Value object then the generated
# C code has "redefinition of struct" problems when orc is enabled
proc newCap*(facet; entity): Cap =
doAssert entity.facet.isNil
entity.facet = facet
Cap(target: entity)
proc nextHandle(facet: Facet): Handle =
inc(facet.actor.handleAllocator)
facet.actor.handleAllocator
proc actor(cap): Actor = cap.relay.actor
type Bindings = Table[Value, Value]
proc attenuate(cap; att: seq[Caveat]): Cap =
if att.len == 0: cap
else: Cap(
target: cap.target,
attenuation: att & cap.attenuation)
proc match(bindings: var Bindings; p: Pattern; v: Value): bool =
case p.orKind
of PatternKind.Pdiscard: result = true
of PatternKind.Patom:
result = case p.patom
of PAtom.Boolean: v.isBoolean
of PAtom.Double: v.isDouble
of PAtom.Signedinteger: v.isInteger
of PAtom.String: v.isString
of PAtom.Bytestring: v.isByteString
of PAtom.Symbol: v.isSymbol
of PatternKind.Pembedded:
result = v.isEmbedded
of PatternKind.Pbind:
if match(bindings, p.pbind.pattern, v):
bindings[p.pbind.pattern.toPreserves] = v
result = true
of PatternKind.Pand:
for pp in p.pand.patterns:
result = match(bindings, pp, v)
if not result: break
of PatternKind.Pnot:
var b: Bindings
result = not match(b, p.pnot.pattern, v)
of PatternKind.Lit:
result = p.lit.value == v
of PatternKind.PCompound:
case p.pcompound.orKind
of PCompoundKind.rec:
if v.isRecord and
p.pcompound.rec.label == v.label and
p.pcompound.rec.fields.len == v.arity:
result = true
for i, pp in p.pcompound.rec.fields:
if not match(bindings, pp, v[i]):
result = false
break
of PCompoundKind.arr:
if v.isSequence and p.pcompound.arr.items.len == v.sequence.len:
result = true
for i, pp in p.pcompound.arr.items:
if not match(bindings, pp, v[i]):
result = false
break
of PCompoundKind.dict:
if v.isDictionary:
result = true
for key, pp in p.pcompound.dict.entries:
let vv = step(v, key)
if vv.isNone or not match(bindings, pp, get vv):
result = true
break
proc match(p: Pattern; v: Value): Option[Bindings] =
var b: Bindings
if match(b, p, v):
result = some b
proc instantiate(t: Template; bindings: Bindings): Value =
case t.orKind
of TemplateKind.Tattenuate:
let v = instantiate(t.tattenuate.template, bindings)
let cap = v.unembed(Cap)
if cap.isNone:
raise newException(ValueError, "Attempt to attenuate non-capability")
result = attenuate(get cap, t.tattenuate.attenuation).embed
of TemplateKind.TRef:
let n = $t.tref.binding.int
try: result = bindings[n.toPreserves]
except KeyError:
raise newException(ValueError, "unbound reference: " & n)
of TemplateKind.Lit:
result = t.lit.value
of TemplateKind.Tcompound:
case t.tcompound.orKind
of TCompoundKind.rec:
result = initRecord(t.tcompound.rec.label, t.tcompound.rec.fields.len)
for i, tt in t.tcompound.rec.fields:
result[i] = instantiate(tt, bindings)
of TCompoundKind.arr:
result = initSequence(t.tcompound.arr.items.len)
for i, tt in t.tcompound.arr.items:
result[i] = instantiate(tt, bindings)
of TCompoundKind.dict:
result = initDictionary()
for key, tt in t.tcompound.dict.entries:
result[key] = instantiate(tt, bindings)
proc rewrite(r: Rewrite; v: Value): Value =
let bindings = match(r.pattern, v)
if bindings.isSome:
result = instantiate(r.template, get bindings)
proc examineAlternatives(cav: Caveat; v: Value): Value =
case cav.orKind
of CaveatKind.Rewrite:
result = rewrite(cav.rewrite, v)
of CaveatKind.Alts:
for r in cav.alts.alternatives:
result = rewrite(r, v)
if not result.isFalse: break
of CaveatKind.Reject: discard
of CaveatKind.unknown: discard
proc runRewrites(v: Value; a: openarray[Caveat]): Value =
result = v
for stage in a:
result = examineAlternatives(stage, result)
if result.isFalse: break
proc setActions*(entity;
publish = PublishProc();
retract = RetractProc();
message = MessageProc();
sync = SyncProc();
): Entity {.discardable} =
## Set the action handlers for an `Entity`.
result = entity
result.publishImpl = publish
result.retractImpl = retract
result.messageImpl = message
result.syncImpl = sync
proc publish(c: Cont; e: Entity; v: Value; h: Handle): Cont {.cpsMagic.} =
if not e.publishImpl.fn.isNil:
result = pass(c, e.publishImpl.call(e, v, h))
proc retract(c: Cont; e: Entity; h: Handle): Cont {.cpsMagic.} =
if not e.retractImpl.fn.isNil:
result = pass(c, e.retractImpl.call(e, h))
proc message(c: Cont; e: Entity; v: Value): Cont {.cpsMagic.} =
if not e.messageImpl.fn.isNil:
result = pass(c, e.messageImpl.call(e, v))
proc sync(c: Cont; e: Entity; p: Cap): Cont {.cpsMagic.} =
if not e.syncImpl.fn.isNil:
result = pass(c, e.syncImpl.call(e, p))
proc turnPublish(cap: Cap; val: Value; h: Handle) {.turnWork.} =
when traceSyndicate:
var traceEvent = TargetedTurnEvent(
target: cap.traceTarget,
detail: trace.TurnEvent(orKind: trace.TurnEventKind.assert)
)
traceEvent.detail = trace.TurnEvent(orKind: TurnEventKind.assert)
traceEvent.detail.assert = TurnEventAssert(
assertion: AssertionDescription(orKind: AssertionDescriptionKind.value),
handle: h,
)
traceEvent.detail.assert.assertion.value.value = val
cap.actor.traceEnqueue(traceEvent)
cap.relay.outbound[h] = OutboundAssertion(handle: h, peer: cap)
yieldToActions()
if activeTurn().rollback:
cap.relay.outbound.del(h)
else:
when traceSyndicate:
cap.actor.traceDequeue(traceEvent)
cap.target.publish(val, h)
cap.relay.outbound[h].established = true
proc turnRetract(cap: Cap; h: Handle) {.turnWork.} =
when traceSyndicate:
var traceEvent = TargetedTurnEvent(
target: cap.traceTarget,
detail: trace.TurnEvent(orKind: trace.TurnEventKind.retract)
)
traceEvent.detail.retract.handle = h
cap.actor.traceEnqueue(traceEvent)
yieldToActions()
if not activeTurn().rollback:
when traceSyndicate:
cap.actor.traceDequeue(traceEvent)
var e: OutboundAssertion
if cap.relay.outbound.pop(h, e):
cap.target.retract(h)
proc turnMessage(cap: Cap; val: Value) {.turnWork.} =
var val = runRewrites(val, cap.attenuation)
when traceSyndicate:
var traceEvent = TargetedTurnEvent(
target: cap.traceTarget,
detail: trace.TurnEvent(orKind: trace.TurnEventKind.message)
)
traceEvent.detail.message.body.value.value = val
cap.actor.traceEnqueue(traceEvent)
yieldToActions()
if not activeTurn().rollback:
when traceSyndicate:
cap.actor.traceDequeue(traceEvent)
cap.target.message(val)
proc turnSync(cap: Cap; peer: Cap) {.turnWork.} =
when traceSyndicate:
var traceEvent = TargetedTurnEvent(
target: cap.traceTarget,
detail: trace.TurnEvent(orKind: trace.TurnEventKind.sync)
)
traceEvent.detail.sync.peer = peer.traceTarget
cap.actor.traceEnqueue(traceEvent)
yieldToActions()
if not activeTurn().rollback:
when traceSyndicate:
cap.actor.traceDequeue(traceEvent)
cap.target.sync(peer)
proc publish*(cap; val: Value): Handle =
## Publish a `Value` to a `Cap` returning `Handle`
## for later retraction.
# The publish action on an Entity always goes through
# here first.
var val = runRewrites(val, cap.attenuation)
# TODO: attenuation to nothing?
result = cap.relay.nextHandle()
cap.relay.queueWork(whelp turnPublish(cap, val, result))
proc publish*[T](cap; x: T): Handle =
## Publish Preserves-convertable value to a `Cap`
## returning `Handle` for later retraction.
publish(cap, x.toPreserves)
proc retract*(cap; h: Handle) =
## Retract a `Handle` from a `Cap`.
cap.relay.queueWork(whelp turnRetract(cap, h))
proc message*(cap; val: Value) =
var val = runRewrites(val, cap.attenuation)
cap.relay.queueWork(whelp turnMessage(cap, val))
proc message*[T](cap; x: T) =
message(cap, x.toPreserves)
proc sync*(cap, peer: Cap) =
cap.relay.queueWork(whelp turnSync(cap, peer))
proc installStopHook(c: Cont, facet: Facet): Cont {.cpsMagic.} =
facet.stopHandlers.add(c.fn)
return c
proc addOnStopHandler(c: Cont; cb: Callback): Cont {.cpsMagic.} =
c.facet.stopCallbacks.add(cb)
result = c
proc onStop*(facet; cb: Callback) =
facet.stopCallbacks.add(cb)
proc facetCall(prc: FacetProc; f: Facet) {.cps: Cont.} =
prc(f)
proc facetCall(prc: FacetProc) {.cps: Cont.} =
prc(activeFacet())
proc workCall(cb: Callback) {.cps: Cont.} =
cb()
proc god(facet): Actor =
## Return the parent of all actors associated with `facet`.
var facet = facet
while not facet.parent.isNil:
facet = facet.parent
facet.actor
proc newExternalTurn(facet; desc: Value) =
result = Turn(facet: facet)
let actor = facet.actor
when traceSyndicate:
result.desc.cause = TurnCause(orKind: TurnCauseKind.external))
result.desc.cause.external description = desc
proc runExternalTurn*(facet: Facet; cb: proc ()) =
echo "startExternalTurn"
startExternalTurn(facet)
facet.queueWork(whelp workCall(cb))
while run(facet.actor): discard
echo "runExternalTurn finished"
assert facet.actor.turn.work.len == 0
assert facet.actor.turn.actions.len == 0
proc newFacet(turn: Turn; actor; parent: Facet): Facet =
inc(actor.facetIdAllocator)
result = Facet(
actor: actor,
parent: parent,
id: actor.facetIdAllocator.toPreserves,
)
if not parent.isNil:
parent.children.add result
when traceSyndicate:
var act = ActionDescription(orKind: ActionDescriptionKind.facetstart)
collectPath(act.facetstart.path, result)
actor.turn.desc.actions.add act
proc newActor(parent: Facet; name: string): Actor =
result = Actor(id: name.toPreserves)
result.root = newFacet(result, parent)
when traceSyndicate:
if parent.isNil:
let path = getEnv("SYNDICATE_TRACE_FILE", "")
case path
of "": discard
of "-": result.traceStream = newFileStream(stderr)
else: result.traceStream = openFileStream(path, fmWrite)
else:
result.traceStream = parent.actor.traceStream
proc spawnActor*(parent: Facet; name: string; bootProc: FacetProc): Actor {.discardable.} =
## Spawn a new `Actor` at `parent`.
result = newActor(parent, name)
var turn = newExternalTurn(result.root, "bootActor".toPreserves)
turn.queueWork(whelp facetCall(bootProc))
queueTurn(turn)
proc bootActor*(name: string; bootProc: FacetProc): Actor =
## Boot a new `Actor`.
result = newActor(nil, name)
var turn = newExternalTurn(result.root, "bootActor".toPreserves)
turn.queueWork(whelp facetCall(bootProc))
queueTurn(turn)
proc run(turn: sink Turn) =
try:
while turn.work.len > 0:
discard trampoline turn.work.pop()
except CatchableError as err:
terminate turn.actor(err)
return
var i: int
while i < turn.actions.len:
discard trampoline turn.actions[i]
proc runTurn*() =
## Run one turn.
turnQueue.pop().run()

View File

@ -1,52 +0,0 @@
# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense
import std/[hashes, options, tables]
import pkg/cps
import preserves
import ./[actors, patterns, skeletons]
from ./protocols/protocol import Handle
from ./protocols/dataspace import Observe
type
Dataspace {.final.} = ref object of Entity
index: Index
handleMap: Table[Handle, Value]
proc dsPublish(e: Entity; v: Value; h: Handle) {.cps: Cont.} =
var ds = Dataspace(e)
if ds.index.add(v):
var obs = v.preservesTo(Observe)
if obs.isSome and obs.get.observer of Cap:
ds.index.add(obs.get.pattern, Cap(obs.get.observer))
ds.handleMap[h] = v
proc dsRetract(e: Entity; h: Handle) {.cps: Cont.} =
var ds = Dataspace(e)
var v = ds.handleMap[h]
if ds.index.remove(v):
ds.handleMap.del h
var obs = v.preservesTo(Observe)
if obs.isSome and obs.get.observer of Cap:
ds.index.remove(obs.get.pattern, Cap(obs.get.observer))
proc dsMessage(e: Entity; v: Value) {.cps: Cont.} =
var ds = Dataspace(e)
ds.index.deliverMessage(v)
proc newDataspace*(f: Facet): Cap =
var ds = Dataspace(
index: initIndex(),
).setActions(
publish = whelp dsPublish,
retract = whelp dsRetract,
message = whelp dsMessage,
)
newCap(f, ds)
proc observe*(cap: Cap; pat: Pattern; peer: Cap): Handle =
publish(cap, Observe(pattern: pat, observer: peer))
proc observe*(cap: Cap; pat: Pattern; e: Entity): Handle =
observe(cap, pat, newCap(cap.relay, e))

View File

@ -1,49 +0,0 @@
# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense
import std/[hashes, tables]
import preserves
import pkg/cps
import ./[actors, patterns]
type
DuringProc* = proc (f: Facet; a: Value; h: Handle): FacetProc {.closure.}
DuringActionKind = enum null, dead, act
DuringAction = object
case kind: DuringActionKind
of null, dead: discard
of act:
retractProc: FacetProc
DuringEntity {.final.}= ref object of Entity
publishProc: DuringProc
assertionMap: Table[Handle, DuringAction]
proc duringPublish(e: Entity; v: Value; h: Handle) {.cps: Cont.} =
var de = DuringEntity(e)
let g = de.assertionMap.getOrDefault h
case g.kind
of null, dead:
var cb = de.publishProc(e.facet, v, h)
de.assertionMap[h] = DuringAction(kind: act, retractProc: cb)
of act:
raiseAssert("during: duplicate handle in publish: " & $h)
proc duringRetract(e: Entity; h: Handle) {.cps: Cont.} =
var de = DuringEntity(e)
let g = de.assertionMap.getOrDefault h
case g.kind
of null:
de.assertionMap[h] = DuringAction(kind: dead)
of dead:
raiseAssert("during: duplicate handle in retract: " & $h)
of act:
de.assertionMap.del h
if not g.retractProc.isNil:
g.retractProc(de.facet)
proc during*(cb: DuringProc): Entity =
## TODO: this doesn't follow Nim idom well.
DuringEntity(publishProc: cb).setActions(
publish = whelp duringPublish,
retract = whelp duringRetract,
)

File diff suppressed because one or more lines are too long

View File

@ -1,9 +0,0 @@
# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense
import preserves
import ./actors, ./patterns, ./protocols/dataspace
proc observe*(turn: var Turn; ds: Cap; pat: Pattern; e: Entity): Cap {.discardable.} =
result = newCap(turn, e)
publish(turn, ds, Observe(pattern: pat, observer: result))

View File

@ -1,19 +0,0 @@
# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense
import ./protocols/[protocol, trace]
export trace
proc traceAction*(e: protocol.Event): trace.TurnEvent =
var act = ActionDescription(orKind: ActionDescriptionKind.enqueue)
act.enqueue.event = TargetedTurnEvent(
target: cap.traceTarget,
detail: trace.TurnEvent(orKind: trace.TurnEventKind.assert)
)
act.enqueue.event.detail = trace.TurnEvent(orKind: TurnEventKind.assert)
act.enqueue.event.detail.assert = TurnEventAssert(
assertion: AssertionDescription(orKind: AssertionDescriptionKind.value),
handle: result,
)
act.enqueue.event.detail.assert.assertion.value.value = val
turn.desc.actions.add act

View File

@ -4,14 +4,13 @@
## This module implements the `Syndicate DSL <https://syndicate-lang.org/doc/syndicate/>`_.
import std/[macros, tables, typetraits]
import pkg/cps
# import pkg/sys/ioqueue
import pkg/sys/ioqueue
import preserves
export preserves
export fromPreserves, toPreserves
import ./[actors, dataspaces, durings, patterns]
import ./protocols/dataspace
import ./syndicate/[actors, dataspaces, durings, patterns]
import ./syndicate/protocols/dataspace
export actors, dataspace, dataspaces, patterns
@ -36,26 +35,22 @@ proc `??`*(pat: Pattern; bindings: openArray[(int, Pattern)]): Pattern {.inline.
patterns.inject(pat, bindings)
type
PublishProc = proc (v: Value; h: Handle) {.closure.}
RetractProc = proc (h: Handle) {.closure.}
MessageProc = proc (v: Value) {.closure.}
PublishProc = proc (turn: var Turn; v: Value; h: Handle) {.closure.}
RetractProc = proc (turn: var Turn; h: Handle) {.closure.}
MessageProc = proc (turn: var Turn; v: Value) {.closure.}
ClosureEntity = ref object of Entity
publishCb*: PublishProc
retractCb*: RetractProc
messageCb*: MessageProc
publishImpl: PublishProc
retractImpl: RetractProc
messageImpl: MessageProc
proc publishCont(e: Entity; v: Value; h: Handle) {.cps: Cont.} =
var ce = ClosureEntity(e)
if not ce.publishCb.isNil: ce.publishCb(v, h)
method publish(e: ClosureEntity; turn: var Turn; a: AssertionRef; h: Handle) =
if not e.publishImpl.isNil: e.publishImpl(turn, a.value, h)
proc retractCont(e: Entity; h: Handle) {.cps: Cont.} =
var ce = ClosureEntity(e)
if not ce.retractCb.isNil: ce.retractCb(h)
method retract(e: ClosureEntity; turn: var Turn; h: Handle) =
if not e.retractImpl.isNil: e.retractImpl(turn, h)
proc messageCont(e: Entity; v: Value) {.cps: Cont.} =
var ce = ClosureEntity(e)
if not ce.messageCb.isNil: ce.messageCb(v)
method message(e: ClosureEntity; turn: var Turn; a: AssertionRef) =
if not e.messageImpl.isNil: e.messageImpl(turn, a.value)
proc argumentCount(handler: NimNode): int =
handler.expectKind {nnkDo, nnkStmtList}
@ -91,7 +86,7 @@ proc generateHandlerNodes(handler: NimNode): HandlerNodes =
result.varSection = newNimNode(nnkVarSection, handler).
add(newIdentDefs(result.valuesSym, valuesTuple))
proc wrapPublishHandler(handler: NimNode): NimNode =
proc wrapPublishHandler(turn, handler: NimNode): NimNode =
var
(valuesSym, varSection, publishBody) =
generateHandlerNodes(handler)
@ -99,24 +94,24 @@ proc wrapPublishHandler(handler: NimNode): NimNode =
handlerSym = genSym(nskProc, "publish")
bindingsSym = ident"bindings"
quote do:
proc `handlerSym`(`bindingsSym`: Value; `handleSym`: Handle) =
proc `handlerSym`(`turn`: var Turn; `bindingsSym`: Value; `handleSym`: Handle) =
`varSection`
if fromPreserves(`valuesSym`, bindings):
`publishBody`
proc wrapMessageHandler(handler: NimNode): NimNode =
proc wrapMessageHandler(turn, handler: NimNode): NimNode =
var
(valuesSym, varSection, body) =
generateHandlerNodes(handler)
handlerSym = genSym(nskProc, "message")
bindingsSym = ident"bindings"
quote do:
proc `handlerSym`(`bindingsSym`: Value) =
proc `handlerSym`(`turn`: var Turn; `bindingsSym`: Value) =
`varSection`
if fromPreserves(`valuesSym`, bindings):
`body`
proc wrapDuringHandler(entryBody, exitBody: NimNode): NimNode =
proc wrapDuringHandler(turn, entryBody, exitBody: NimNode): NimNode =
var
(valuesSym, varSection, publishBody) =
generateHandlerNodes(entryBody)
@ -125,54 +120,46 @@ proc wrapDuringHandler(entryBody, exitBody: NimNode): NimNode =
duringSym = genSym(nskProc, "during")
if exitBody.isNil:
quote do:
proc `duringSym`(f: Facet; `bindingsSym`: Value; `handleSym`: Handle): FacetProc =
proc `duringSym`(`turn`: var Turn; `bindingsSym`: Value; `handleSym`: Handle): TurnAction =
`varSection`
if fromPreserves(`valuesSym`, `bindingsSym`):
`publishBody`
else:
quote do:
proc `duringSym`(f: Facet; `bindingsSym`: Value; `handleSym`: Handle): FacetProc =
proc `duringSym`(`turn`: var Turn; `bindingsSym`: Value; `handleSym`: Handle): TurnAction =
`varSection`
if fromPreserves(`valuesSym`, `bindingsSym`):
`publishBody`
result = proc (facet: Facet) =
proc action(`turn`: var Turn) =
`exitBody`
result = action
macro onPublish*(cap: Cap; pattern: Pattern; handler: untyped) =
## Call `handler` when an assertion matching `pattern` is published at `cap`.
macro onPublish*(turn: untyped; ds: Cap; pattern: Pattern; handler: untyped) =
## Call `handler` when an assertion matching `pattern` is published at `ds`.
let
argCount = argumentCount(handler)
handlerProc = wrapPublishHandler(handler)
handlerProc = wrapPublishHandler(turn, handler)
handlerSym = handlerProc[0]
result = quote do:
if `argCount` != 0 and `pattern`.analyse.capturePaths.len != `argCount`:
raiseAssert($`pattern`.analyse.capturePaths.len & " values captured but handler has " & $`argCount` & " arguments - " & $`pattern`)
`handlerProc`
var ce = ClosureEntity(
publishCb: `handlerSym`,
).setActions(
publish = whelp publishCont,
)
discard observe(`cap`, `pattern`, ce)
discard observe(`turn`, `ds`, `pattern`, ClosureEntity(publishImpl: `handlerSym`))
macro onMessage*(cap: Cap; pattern: Pattern; handler: untyped) =
## Call `handler` when an message matching `pattern` is broadcasted at `cap`.
macro onMessage*(turn: untyped; ds: Cap; pattern: Pattern; handler: untyped) =
## Call `handler` when an message matching `pattern` is broadcasted at `ds`.
let
argCount = argumentCount(handler)
handlerProc = wrapMessageHandler(handler)
handlerProc = wrapMessageHandler(turn, handler)
handlerSym = handlerProc[0]
result = quote do:
if `argCount` != 0 and `pattern`.analyse.capturePaths.len != `argCount`:
raiseAssert($`pattern`.analyse.capturePaths.len & " values captured but handler has " & $`argCount` & " arguments - " & $`pattern`)
`handlerProc`
discard observe(`cap`, `pattern`, ClosureEntity(
messageImpl: whelp messageCont,
messageCb: `handlerSym`,
name: "ClosureEntity",
))
discard observe(`turn`, `ds`, `pattern`, ClosureEntity(messageImpl: `handlerSym`))
macro during*(cap: Cap; pattern: Pattern; publishBody, retractBody: untyped) =
## Call `publishBody` when an assertion matching `pattern` is published to `cap` and
macro during*(turn: untyped; ds: Cap; pattern: Pattern; publishBody, retractBody: untyped) =
## Call `publishBody` when an assertion matching `pattern` is published to `ds` and
## call `retractBody` on retraction. Assertions that match `pattern` but are not
## convertable to the arguments of `publishBody` are silently discarded.
##
@ -181,47 +168,27 @@ macro during*(cap: Cap; pattern: Pattern; publishBody, retractBody: untyped) =
## - `duringHandle` - dataspace handle of the assertion that triggered `publishBody`
let
argCount = argumentCount(publishBody)
callbackProc = wrapDuringHandler(publishBody, retractBody)
callbackProc = wrapDuringHandler(turn, publishBody, retractBody)
callbackSym = callbackProc[0]
result = quote do:
if `argCount` != 0 and `pattern`.analyse.capturePaths.len != `argCount`:
raiseAssert($`pattern`.analyse.capturePaths.len & " values captured but handler has " & $`argCount` & " arguments - " & $`pattern`)
`callbackProc`
discard observe(`cap`, `pattern`, during(`callbackSym`))
discard observe(`turn`, `ds`, `pattern`, during(`callbackSym`))
macro during*(cap: Cap; pattern: Pattern; publishBody: untyped) =
macro during*(turn: untyped; ds: Cap; pattern: Pattern; publishBody: untyped) =
## Variant of `during` without a retract body.
let
`argCount` = argumentCount(publishBody)
callbackProc = wrapDuringHandler(publishBody, nil)
callbackProc = wrapDuringHandler(turn, publishBody, nil)
callbackSym = callbackProc[0]
result = quote do:
if `argCount` != 0 and `pattern`.analyse.capturePaths.len != `argCount`:
raiseAssert($`pattern`.analyse.capturePaths.len & " values captured but handler has " & $`argCount` & " arguments - " & $`pattern`)
`callbackProc`
discard observe(`cap`, `pattern`, during(`callbackSym`))
discard observe(`turn`, `ds`, `pattern`, during(`callbackSym`))
proc wrapHandler(body: NimNode; ident: string): NimNode =
var sym = genSym(nskProc, ident)
quote do:
proc `sym`() =
`body`
#[
macro onStop*(facet: Facet; body: untyped) =
let
handlerDef = wrapHandler(body, "onStop")
handlerSym = handlerDef[0]
result = quote do:
`handlerDef`
addOnStopHandler(facet, `handlerSym`)
macro onStop*(body: untyped) =
quote do:
block:
let facet = activeFacet()
facet.installStopHook()
if facet.stopped():
`body`
return
]#
proc runActor*(name: string; bootProc: TurnAction) =
## Boot an actor `Actor` and churn ioqueue once.
let actor = bootActor(name, bootProc)
ioqueue.run()

View File

@ -1,32 +1,16 @@
# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense
import std/[deques, hashes, monotimes, options, sets, sequtils, tables, times]
import pkg/cps
import std/[hashes, monotimes, options, sets, tables, times]
import preserves
import ../syndicate/protocols/[protocol, sturdy]
export cps
const tracing = defined(traceSyndicate)
const traceSyndicate {.booldefine.}: bool = true
when traceSyndicate:
when tracing:
import std/streams
from std/os import getEnv
import ./protocols/trace
type TraceSink = ref object
stream: FileStream
proc newTraceSink: TraceSink =
new result
let path = getEnv("SYNDICATE_TRACE_FILE", "")
case path
of "": quit"$SYNDICATE_TRACE_FILE unset"
of "-": actor.stream = newFileStream(stderr)
else: result.stream = openFileStream(path, fmWrite)
proc write(s: TraceSink; e: TraceEntry) = s.write(e.toPreserves)
export Handle
@ -78,23 +62,20 @@ type
exitHooks: seq[TurnAction]
id: ActorId
exiting, exited: bool
when traceSyndicate:
when tracing:
turnIdAllocator: ref TurnId
traceStream: FileStream
TurnAction* = proc (t: Turn)
TurnAction* = proc (t: var Turn) {.closure.}
Queues = TableRef[Facet, Deque[Cont]]
Queues = TableRef[Facet, seq[TurnAction]]
Turn* = ref object
Turn* = object # an object that should remain on the stack
facet: Facet
queues: Queues
when traceSyndicate:
queues: Queues # a ref object that can outlive Turn
when tracing:
desc: TurnDescription
Cont* = ref object of Continuation
turn*: Turn
Facet* = ref FacetObj
FacetObj = object
actor*: Actor
@ -106,31 +87,20 @@ type
id: FacetId
isAlive: bool
proc pass*(a, b: Cont): Cont =
assert not a.turn.isNil
b.turn = move a.turn
return b
template turnAction*(prc: typed): untyped =
cps(Cont, prc)
proc activeTurn*(c: Cont): Turn {.cpsVoodoo.} =
assert not c.turn.isNil
c.turn
when traceSyndicate:
when tracing:
proc nextTurnId(facet: Facet): TurnId =
result = succ(facet.actor.turnIdAllocator[])
facet.actor.turnIdAllocator[] = result
proc trace(actor: Actor; act: ActorActivation) =
assert not actor.traceStream.isNil
var entry = TraceEntry(
timestamp: getTime().toUnixFloat(),
actor: initRecord("named", actor.name.toPreserves),
item: act)
actor.traceStream.writeLine($entry.toPreserves)
if not actor.traceStream.isNil:
var entry = TraceEntry(
timestamp: getTime().toUnixFloat(),
actor: initRecord("named", actor.name.toPreserves),
item: act)
actor.traceStream.writeText entry.toPreserves
actor.traceStream.writeLine()
proc path(facet: Facet): seq[trace.FacetId] =
var f = facet
@ -138,15 +108,15 @@ when traceSyndicate:
result.add f.id.toPreserves
f = f.parent
method publish*(e: Entity; turn: Turn; v: AssertionRef; h: Handle) {.base.} = discard
method retract*(e: Entity; turn: Turn; h: Handle) {.base.} = discard
method message*(e: Entity; turn: Turn; v: AssertionRef) {.base.} = discard
method sync*(e: Entity; turn: Turn; peer: Cap) {.base.} = discard
method publish*(e: Entity; turn: var Turn; v: AssertionRef; h: Handle) {.base.} = discard
method retract*(e: Entity; turn: var Turn; h: Handle) {.base.} = discard
method message*(e: Entity; turn: var Turn; v: AssertionRef) {.base.} = discard
method sync*(e: Entity; turn: var Turn; peer: Cap) {.base.} = discard
using
actor: Actor
facet: Facet
turn: Turn
turn: var Turn
action: TurnAction
proc labels(f: Facet): string =
@ -155,7 +125,7 @@ proc labels(f: Facet): string =
if not f.parent.isNil:
catLabels(f.parent, labels)
labels.add ':'
when traceSyndicate:
when tracing:
labels.add $f.id
result.add f.actor.name
catLabels(f, result)
@ -185,14 +155,13 @@ proc nextHandle(facet: Facet): Handle =
result = succ(facet.actor.handleAllocator[])
facet.actor.handleAllocator[] = result
proc facet*(turn: Turn): Facet = turn.facet
proc facet*(turn: var Turn): Facet = turn.facet
proc enqueue(turn: Turn; target: Facet; cont: Cont) =
cont.turn = turn
proc enqueue(turn: var Turn; target: Facet; action: TurnAction) =
if target in turn.queues:
turn.queues[target].addLast cont
turn.queues[target].add action
else:
turn.queues[target] = toDeque([cont])
turn.queues[target] = @[action]
type Bindings = Table[Value, Value]
@ -202,7 +171,7 @@ proc match(bindings: var Bindings; p: Pattern; v: Value): bool =
of PatternKind.Patom:
result = case p.patom
of PAtom.Boolean: v.isBoolean
of PAtom.Double: v.isDouble
of PAtom.Double: v.isFloat
of PAtom.Signedinteger: v.isInteger
of PAtom.String: v.isString
of PAtom.Bytestring: v.isByteString
@ -306,18 +275,15 @@ proc runRewrites*(a: Attenuation; v: Value): Value =
result = examineAlternatives(stage, result)
if result.isFalse: break
proc publish(target: Entity; e: OutboundAssertion; a: AssertionRef) {.turnAction.} =
e.established = true
publish(target, activeTurn(), a, e.handle)
proc publish(turn: Turn; r: Cap; v: Value; h: Handle) =
proc publish(turn: var Turn; r: Cap; v: Value; h: Handle) =
var a = runRewrites(r.attenuation, v)
if not a.isFalse:
let e = OutboundAssertion(
handle: h, peer: r, established: false)
let e = OutboundAssertion(handle: h, peer: r)
turn.facet.outbound[h] = e
enqueue(turn, r.relay, whelp publish(r.target, e, AssertionRef(value: a)))
when traceSyndicate:
enqueue(turn, r.relay) do (turn: var Turn):
e.established = true
publish(r.target, turn, AssertionRef(value: a), e.handle)
when tracing:
var act = ActionDescription(orKind: ActionDescriptionKind.enqueue)
act.enqueue.event.target.actor = turn.facet.actor.id.toPreserves
act.enqueue.event.target.facet = turn.facet.id.toPreserves
@ -328,59 +294,53 @@ proc publish(turn: Turn; r: Cap; v: Value; h: Handle) =
act.enqueue.event.detail.assert.handle = h
turn.desc.actions.add act
proc publish*(turn: Turn; r: Cap; a: Value): Handle =
proc publish*(turn: var Turn; r: Cap; a: Value): Handle {.discardable.} =
result = turn.facet.nextHandle()
publish(turn, r, a, result)
proc publish*(r: Cap; a: Value): Handle {.turnAction.} =
publish(activeTurn(), r, a, result)
proc publish*[T](turn: var Turn; r: Cap; a: T): Handle {.discardable.} =
publish(turn, r, a.toPreserves)
proc retract(e: OutboundAssertion) {.turnAction.} =
if e.established:
e.established = false
e.peer.target.retract(activeTurn(), e.handle)
proc retract(turn: var Turn; e: OutboundAssertion) =
enqueue(turn, e.peer.relay) do (turn: var Turn):
if e.established:
e.established = false
e.peer.target.retract(turn, e.handle)
proc retract(turn: Turn; e: OutboundAssertion) =
enqueue(turn, e.peer.relay, whelp retract(e))
proc retract*(turn: Turn; h: Handle) =
proc retract*(turn: var Turn; h: Handle) =
var e: OutboundAssertion
if turn.facet.outbound.pop(h, e):
turn.retract(e)
proc message(target: Entity; a: AssertionRef) {.turnAction.} =
target.message(activeTurn(), a)
proc message*(turn: Turn; r: Cap; v: Value) =
proc message*(turn: var Turn; r: Cap; v: Value) =
var a = runRewrites(r.attenuation, v)
if not a.isFalse:
enqueue(turn, r.relay, whelp message(r.target, AssertionRef(value: a)))
enqueue(turn, r.relay) do (turn: var Turn):
r.target.message(turn, AssertionRef(value: a))
proc message*(target: Cap; value: Value) {.turnAction.} =
message(activeTurn(), target, value)
proc message*[T](turn: var Turn; r: Cap; v: T) =
message(turn, r, v.toPreserves)
proc sync(e: Entity; peer: Cap) {.turnAction.} =
e.sync(activeTurn(), peer)
proc sync(turn: Turn; e: Entity; peer: Cap) =
proc sync(turn: var Turn; e: Entity; peer: Cap) =
e.sync(turn, peer)
proc sync*(turn: Turn; r, peer: Cap) =
enqueue(turn, r.relay, whelp sync(r.target, peer))
proc sync*(turn: var Turn; r, peer: Cap) =
enqueue(turn, r.relay) do (turn: var Turn):
sync(turn, r.target, peer)
proc replace*[T](turn: Turn; cap: Cap; h: Handle; v: T): Handle =
proc replace*[T](turn: var Turn; cap: Cap; h: Handle; v: T): Handle =
result = publish(turn, cap, v)
if h != default(Handle):
retract(turn, h)
proc replace*[T](turn: Turn; cap: Cap; h: var Handle; v: T): Handle {.discardable.} =
proc replace*[T](turn: var Turn; cap: Cap; h: var Handle; v: T): Handle {.discardable.} =
var old = h
h = publish(turn, cap, v)
if old != default(Handle):
retract(turn, old)
h
proc stop*(turn: Turn)
proc stop*(turn: var Turn)
proc run*(facet; action: TurnAction; zombieTurn = false)
@ -402,7 +362,7 @@ proc isInert(facet): bool =
(facet.outbound.len == 0 or facet.parent.isNil) and
facet.inertCheckPreventers == 0
proc preventInertCheck*(facet): (proc() {.gcsafe.}) {.discardable.} =
proc preventInertCheck*(facet): (proc()) {.discardable.} =
var armed = true
inc facet.inertCheckPreventers
proc disarm() =
@ -411,7 +371,7 @@ proc preventInertCheck*(facet): (proc() {.gcsafe.}) {.discardable.} =
dec facet.inertCheckPreventers
result = disarm
proc inFacet(turn: Turn; facet; act: TurnAction) =
proc inFacet(turn: var Turn; facet; act: TurnAction) =
## Call an action with a facet using a temporary `Turn`
## that shares the `Queues` of the calling `Turn`.
var t = Turn(facet: facet, queues: turn.queues)
@ -419,7 +379,7 @@ proc inFacet(turn: Turn; facet; act: TurnAction) =
proc terminate(actor; turn; reason: ref Exception)
proc terminate(facet; turn: Turn; orderly: bool) =
proc terminate(facet; turn: var Turn; orderly: bool) =
if facet.isAlive:
facet.isAlive = false
let parent = facet.parent
@ -439,41 +399,33 @@ proc terminate(facet; turn: Turn; orderly: bool) =
parent.terminate(turn, true)
else:
terminate(facet.actor, turn, nil)
when traceSyndicate:
when tracing:
var act = ActionDescription(orKind: ActionDescriptionKind.facetStop)
act.facetstop.path = facet.path
turn.desc.actions.add act
proc stopIfInert() {.turnAction.} =
let turn = activeTurn()
if (not turn.facet.parent.isNil and (not turn.facet.parent.isAlive)) or turn.facet.isInert:
stop(turn)
proc stopIfInertAfter(action: TurnAction): TurnAction =
proc wrapper(turn: Turn) =
proc wrapper(turn: var Turn) =
action(turn)
enqueue(turn, turn.facet, whelp stopIfInert())
enqueue(turn, turn.facet) do (turn: var Turn):
if (not turn.facet.parent.isNil and
(not turn.facet.parent.isAlive)) or
turn.facet.isInert:
stop(turn)
wrapper
proc newFacet*(turn: Turn): Facet = newFacet(turn.facet.actor, turn.facet)
proc newFacet*(turn: var Turn): Facet = newFacet(turn.facet.actor, turn.facet)
proc inFacet*(turn: Turn; bootProc: TurnAction): Facet =
proc inFacet*(turn: var Turn; bootProc: TurnAction): Facet =
result = newFacet(turn)
when traceSyndicate:
when tracing:
var act = ActionDescription(orKind: ActionDescriptionKind.facetstart)
act.facetstart.path.add result.path
turn.desc.actions.add act
inFacet(turn, result, stopIfInertAfter(bootProc))
proc facet*(turn: Turn; bootProc: TurnAction): Facet {.deprecated.} = inFacet(turn, bootProc)
proc facet*(turn: var Turn; bootProc: TurnAction): Facet {.deprecated.} = inFacet(turn, bootProc)
proc run(actor; bootProc: TurnAction; initialAssertions: OutboundTable) =
run(newFacet(actor, actor.root, initialAssertions), stopIfInertAfter(bootProc))
proc run(actor; bootProc: TurnAction) =
var initialAssertions: OutboundTable
run(newFacet(actor, actor.root, initialAssertions), stopIfInertAfter(bootProc))
proc newActor(name: string; handleAlloc: ref Handle): Actor =
let
now = getTime()
@ -484,81 +436,70 @@ proc newActor(name: string; handleAlloc: ref Handle): Actor =
handleAllocator: handleAlloc,
)
result.root = newFacet(result, nil)
when traceSyndicate:
new result.turnIdAllocator
proc newActor*(name: string): Actor =
newActor(name, new(ref Handle))
proc bootActor*(name: string; bootProc: TurnAction) =
var
initialAssertions: OutboundTable
actor = newActor(name)
when traceSyndicate:
let path = getEnv("SYNDICATE_TRACE_FILE", "/tmp/" & name & ".trace.pr")
case path
of "": stderr.writeLine "$SYNDICATE_TRACE_FILE unset, not tracing actor ", name
of "-": actor.traceStream = newFileStream(stderr)
else: actor.traceStream = openFileStream(path, fmWrite)
when traceSyndicate:
when tracing:
var act = ActorActivation(orKind: ActorActivationKind.start)
act.start.actorName = Name(orKind: NameKind.named)
act.start.actorName.named.name = name.toPreserves
var entry = TraceEntry(
timestamp: getTime().toUnixFloat(),
item: act)
actor.traceStream.writeLine($entry.toPreserves)
let turn = newTurn(actor, TurnCauseExternal(description: "top-level actor"))
run(actor, bootProc, initialAssertions)
trace(result, act)
proc bootActor*(name: string; cont: Cont) =
bootActor(name) do (turn: Turn):
enqueue(turn, turn.facet, cont)
proc run(actor; bootProc: TurnAction; initialAssertions: OutboundTable) =
run(newFacet(actor, actor.root, initialAssertions), stopIfInertAfter(bootProc))
proc spawnActor(actor: Actor; bootProc: TurnAction; initialAssertions: HashSet[Handle]) {.turnAction.} =
let turn = activeTurn()
var newOutBound: Table[Handle, OutboundAssertion]
for key in initialAssertions:
discard turn.facet.outbound.pop(key, newOutbound[key])
when traceSyndicate:
actor.turnIdAllocator = turn.facet.actor.turnIdAllocator
actor.traceStream = turn.facet.actor.traceStream
var act = ActionDescription(orKind: ActionDescriptionKind.spawn)
act.spawn.id = actor.id.toPreserves
turn.desc.actions.add act
run(actor, bootProc, newOutBound)
proc bootActor*(name: string; bootProc: TurnAction): Actor =
var initialAssertions: OutboundTable
result = newActor(name, new(ref Handle))
when tracing:
new result.turnIdAllocator
let path = getEnv("SYNDICATE_TRACE_FILE", "/tmp/" & name & ".trace.pr")
case path
of "": stderr.writeLine "$SYNDICATE_TRACE_FILE unset, not tracing actor ", name
of "-": result.traceStream = newFileStream(stderr)
else: result.traceStream = openFileStream(path, fmWrite)
run(result, bootProc, initialAssertions)
proc spawn*(name: string; turn: Turn; bootProc: TurnAction; initialAssertions = initHashSet[Handle]()): Actor {.discardable.} =
proc spawnActor*(name: string; turn: var Turn; bootProc: TurnAction; initialAssertions = initHashSet[Handle]()): Actor {.discardable.} =
let actor = newActor(name, turn.facet.actor.handleAllocator)
enqueue(turn, turn.facet, whelp spawnActor(actor, bootProc, initialAssertions))
enqueue(turn, turn.facet) do (turn: var Turn):
var newOutBound: Table[Handle, OutboundAssertion]
for key in initialAssertions:
discard turn.facet.outbound.pop(key, newOutbound[key])
when tracing:
actor.turnIdAllocator = turn.facet.actor.turnIdAllocator
actor.traceStream = turn.facet.actor.traceStream
var act = ActionDescription(orKind: ActionDescriptionKind.spawn)
act.spawn.id = actor.id.toPreserves
turn.desc.actions.add act
run(actor, bootProc, newOutBound)
actor
proc spawn*(name: string; turn: var Turn; bootProc: TurnAction; initialAssertions = initHashSet[Handle]()): Actor {.discardable.} =
spawnActor(name, turn, bootProc, initialAssertions)
proc newInertCap*(): Cap =
let a = newActor("inert")
run(a) do (turn: Turn): turn.stop()
let a = bootActor("inert") do (turn: var Turn): turn.stop()
Cap(relay: a.root)
proc atExit*(actor; action) = actor.exitHooks.add action
proc terminate(actor: Actor; orderly: bool) {.turnAction.} =
actor.root.terminate(activeTurn(), orderly)
actor.exited = true
proc terminate(actor; turn; reason: ref Exception) =
if not actor.exiting:
actor.exiting = true
actor.exitReason = reason
when traceSyndicate:
when tracing:
var act = ActorActivation(orKind: ActorActivationKind.stop)
if not reason.isNil:
act.stop.status = ExitStatus(orKind: ExitStatusKind.Error)
act.stop.status.error.message = reason.msg
trace(actor, act)
for hook in actor.exitHooks: hook(turn)
enqueue(turn, actor.root, whelp terminate(actor, reason.isNil))
proc finish(turn: var Turn) =
actor.root.terminate(turn, reason.isNil)
actor.exited = true
block:
run(actor.root, finish, true)
proc terminate*(facet; e: ref Exception) =
run(facet.actor.root) do (turn: Turn):
run(facet.actor.root) do (turn: var Turn):
facet.actor.terminate(turn, e)
#[
@ -576,40 +517,19 @@ template tryFacet(facet; body: untyped) =
try: body
except CatchableError as err: terminate(facet, err)
proc run(facet: Facet; turn: Turn; deq: var Deque[Cont]): int =
## Return the number of continuations processed.
while deq.len > 0:
var c = deq.popFirst()
try:
while not c.isNil and not c.fn.isNil:
c.turn = turn
var y = c.fn
var x = y(c)
inc(result)
c = Cont(x)
except CatchableError as err:
if not c.dismissed:
writeStackFrames c
terminate(facet, err)
stderr.writeLine("ran ", result, " continuations for ", facet)
proc run*(facet; action: TurnAction; zombieTurn = false) =
if zombieTurn or (facet.actor.exitReason.isNil and facet.isAlive):
if true and zombieTurn or (facet.actor.exitReason.isNil and facet.isAlive):
tryFacet(facet):
var queues = newTable[Facet, Deque[Cont]]()
var turn = Turn(facet: facet, queues: queues)
action(turn)
when traceSyndicate:
turn.desc.id = facet.nextTurnId.toPreserves
facet.actor.trace ActorActivation(
orKind: ActorActivationKind.turn, turn: turn.desc)
assert not turn.isNil
var n = 1
while n > 0:
n = 0
var facets = queues.keys.toSeq
for facet in facets:
n.inc run(facet, turn, queues[facet])
var queues = newTable[Facet, seq[TurnAction]]()
block:
var turn = Turn(facet: facet, queues: queues)
action(turn)
when tracing:
turn.desc.id = facet.nextTurnId.toPreserves
facet.actor.trace ActorActivation(
orKind: ActorActivationKind.turn, turn: turn.desc)
for facet, queue in queues:
for action in queue: run(facet, action)
proc run*(cap: Cap; action: TurnAction) =
## Convenience proc to run a `TurnAction` in the scope of a `Cap`.
@ -622,15 +542,15 @@ proc addCallback*(fut: FutureBase; facet: Facet; act: TurnAction) =
addCallback(fut) do ():
if fut.failed: terminate(facet, fut.error)
else:
when traceSyndicate:
run(facet) do (turn: Turn):
when tracing:
run(facet) do (turn: var Turn):
turn.desc.cause = TurnCause(orKind: TurnCauseKind.external)
turn.desc.cause.external.description = "Future".toPreserves
act(turn)
else:
run(facet, act)
proc addCallback*(fut: FutureBase; turn: Turn; act: TurnAction) =
proc addCallback*(fut: FutureBase; turn: var Turn; act: TurnAction) =
## Add a callback to a `Future` that will be called at a later `Turn`
## with the same context as the current.
if fut.failed:
@ -640,35 +560,43 @@ proc addCallback*(fut: FutureBase; turn: Turn; act: TurnAction) =
else:
addCallback(fut, turn.facet, act)
proc addCallback*[T](fut: Future[T]; turn: Turn; act: proc (t: Turn, x: T) {.gcsafe.}) =
addCallback(fut, turn) do (turn: Turn):
proc addCallback*[T](fut: Future[T]; turn: var Turn; act: proc (t: var Turn, x: T) {.closure.}) =
addCallback(fut, turn) do (turn: var Turn):
if fut.failed: terminate(turn.facet, fut.error)
else:
when traceSyndicate:
when tracing:
turn.desc.cause = TurnCause(orKind: TurnCauseKind.external)
turn.desc.cause.external.description = "Future".toPreserves
act(turn, read fut)
]#
proc stop*(turn: Turn, facet: Facet) =
proc stop*(turn: var Turn, facet: Facet) =
if facet.parent.isNil:
facet.terminate(turn, true)
else:
enqueue(turn, facet.parent, whelp terminate(facet.actor, true))
# TODO: terminate the actor?
enqueue(turn, facet.parent) do (turn: var Turn):
facet.terminate(turn, true)
proc stop*(turn: Turn) =
proc stop*(turn: var Turn) =
stop(turn, turn.facet)
proc onStop*(facet: Facet; act: TurnAction) =
## Add a `proc (turn: Turn)` action to `facet` to be called as it stops.
## Add a `proc (turn: var Turn)` action to `facet` to be called as it stops.
add(facet.shutdownActions, act)
proc stopActor*(turn: Turn) =
let actor = turn.facet.actor
enqueue(turn, actor.root, whelp terminate(actor, true))
proc stopActor*(facet: Facet) =
run(facet) do (turn: var Turn):
let actor = facet.actor
enqueue(turn, actor.root) do (turn: var Turn):
terminate(actor, turn, nil)
proc freshen*(turn: Turn, act: TurnAction) =
proc stopActor*(turn: var Turn) =
stopActor(turn.facet)
proc stop*(actor: Actor) =
stopActor(actor.root)
proc freshen*(turn: var Turn, act: TurnAction) =
assert(turn.queues.len == 0, "Attempt to freshen a non-stale Turn")
run(turn.facet, act)
@ -684,16 +612,13 @@ proc newCap*(e: Entity; turn): Cap =
type SyncContinuation {.final.} = ref object of Entity
action: TurnAction
method message(entity: SyncContinuation; turn: Turn; v: AssertionRef) =
method message(entity: SyncContinuation; turn: var Turn; v: AssertionRef) =
entity.action(turn)
proc sync*(turn: Turn; refer: Cap; act: TurnAction) =
proc sync*(turn: var Turn; refer: Cap; act: TurnAction) =
sync(turn, refer, newCap(turn, SyncContinuation(action: act)))
proc running*(actor): bool =
result = not actor.exited
if not (result or actor.exitReason.isNil):
raise actor.exitReason
proc newCap*(e: Entity): Cap {.turnAction.} =
Cap(relay: activeTurn().facet, target: e)

View File

@ -1,19 +1,19 @@
# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense
import std/[asyncdispatch, times, oserrors, posix, sets, times, epoll]
import std/[sets, times]
import pkg/sys/[handles, ioqueue]
import preserves
import ../[bags, syndicate], ../protocols/[timer, dataspace]
import ../../syndicate, ../bags, ../protocols/[timer, dataspace]
export timer
type
Observe = dataspace.Observe
Time = posix.Time
when defined(linux):
import std/[epoll, posix]
import std/[oserrors, posix]
type Time = posix.Time
{.pragma: timerfd, importc, header: "<sys/timerfd.h>".}
@ -27,6 +27,14 @@ when defined(linux):
TFD_CLOEXEC {.timerfd.}: cint
TFD_TIMER_ABSTIME {.timerfd.}: cint
proc `<`(a, b: Timespec): bool =
a.tv_sec.clong < b.tv_sec.clong or
(a.tv_sec.clong == b.tv_sec.clong and a.tv_nsec < b.tv_nsec)
proc `+`(a, b: Timespec): Timespec =
result.tv_sec = Time a.tv_sec.clong + b.tv_sec.clong
result.tv_nsec = a.tv_nsec + b.tv_nsec
func toFloat(ts: Timespec): float =
ts.tv_sec.float + ts.tv_nsec.float / 1_000_000_000
@ -34,30 +42,24 @@ when defined(linux):
result.tv_sec = Time(f)
result.tv_nsec = clong(uint64(f * 1_000_000_000) mod 1_000_000_000)
proc `<`(a, b: Timespec): bool =
a.tv_sec.clong <= b.tv_sec.clong and a.tv_nsec <= b.tv_nsec
proc `+`(a, b: Timespec): Timespec =
result.tv_sec = Time a.tv_sec.clong + b.tv_sec.clong
result.tv_nsec = a.tv_nsec + b.tv_nsec
proc clock_realtime: Timespec =
if clock_gettime(CLOCK_REALTIME, result) < 0:
raiseOSError(osLastError(), "clock_gettime")
proc nsec(epoch: float): clong =
clong(uint64(epoch * 1_000_000_000) mod 1_000_000_000'u64)
type
TimerDriver = ref object
facet: Facet
## Owning facet of driver.
target: Cap
## Destination for LaterThan assertions.
deadlines: Bag[float]
## Deadlines that other actors are observing.
timers: HashSet[cint]
# TODO: use a single timer descriptor
proc spawnTimerDriver(facet: Facet; cap: Cap): TimerDriver =
let driver = TimerDriver(facet: facet, target: cap)
facet.onStop do ():
facet.onStop do (turn: var Turn):
for fd in driver.timers:
unregister(FD fd)
discard close(fd)
@ -71,43 +73,43 @@ when defined(linux):
result = deadline
proc await(driver: TimerDriver; deadline: float) {.asyncio.} =
## Run timer driver concurrently with actor.
let fd = timerfd_create(CLOCK_REALTIME, TFD_NONBLOCK or TFD_CLOEXEC)
if fd < 0:
raiseOSError(osLastError(), "failed to acquire timer descriptor")
var
old: Itimerspec
its = Itimerspec(it_value: deadline.toTimeSpec)
its = Itimerspec(it_value: deadline.toTimespec)
if timerfd_settime(fd, TFD_TIMER_ABSTIME, its, old) < 0:
raiseOSError(osLastError(), "failed to set timeout")
driver.timers.incl(fd)
while clock_realtime().toFloat() < deadline:
while clock_realtime() < its.it_value:
# Check if the timer is expired which
# could happen before waiting.
wait(FD fd, Read)
if deadline in driver.deadlines:
proc pub() =
echo "publishing later-than"
discard publish(driver.target, LaterThan(seconds: deadline))
runExternalTurn(driver.facet, pub)
# Check if the deadline is still observed.
proc turnWork(turn: var Turn) =
discard publish(turn, driver.target, LaterThan(seconds: deadline))
run(driver.facet, turnWork)
discard close(fd)
driver.timers.excl(fd)
proc spawnTimerActor*(ds: Cap): Actor {.discardable.} =
proc spawnTimerActor*(ds: Cap; turn: var Turn): Actor {.discardable.} =
## Spawn a timer actor that responds to
## dataspace observations of timeouts on `ds`.
spawnActor(ds.relay, "timers") do (facet: Facet):
let driver = spawnTimerDriver(facet, ds)
spawnActor("timers", turn) do (turn: var Turn):
let driver = spawnTimerDriver(turn.facet, ds)
let pat = inject(grab Observe(pattern: dropType LaterThan), {0: grabLit()})
during(ds, pat) do (deadline: float):
echo "timer actor sees observation of ", LaterThan(seconds: deadline)
during(turn, ds, pat) do (deadline: float):
if change(driver.deadlines, deadline, +1) == cdAbsentToPresent:
discard trampoline(whelp await(driver, deadline))
do:
discard change(driver.deadlines, deadline, -1, clamp = true)
# TODO: retract assertions that are unobserved.
proc after*(ds: Cap; dur: Duration; cb: proc () {.closure.}) =
## Execute `cb` after some duration of time.
var
later = clock_realtime().toFloat() +
dur.inMilliseconds.float / 1_000.0
pat = ?LaterThan(seconds: later)
onPublish(ds, pat):
cb()
proc after*(turn: var Turn; ds: Cap; dur: Duration; act: TurnAction) =
## Execute `act` after some duration of time.
var later = clock_realtime().toFloat() + dur.inMilliseconds.float / 1_000.0
onPublish(turn, ds, grab LaterThan(seconds: later)):
act(turn)

View File

@ -51,7 +51,7 @@ iterator observersOf[Sid, Oid](g: Graph[Sid, Oid]; oid: Oid): Sid =
if g.edgesForward.hasKey(oid):
for sid in g.edgesForward[oid]: yield sid
proc repairDamage*[Sid, Oid](g: var Graph[Sid, Oid]; repairNode: proc (sid: Sid) {.gcsafe.}) =
proc repairDamage*[Sid, Oid](g: var Graph[Sid, Oid]; repairNode: proc (sid: Sid) {.closure.}) =
var repairedThisRound: Set[Oid]
while true:
var workSet = move g.damagedNodes

View File

@ -0,0 +1,50 @@
# SPDX-FileCopyrightText: ☭ 2022 Emery Hemingway
# SPDX-License-Identifier: Unlicense
import std/[hashes, options, tables]
import preserves
import ./actors, ./protocols/dataspace, ./skeletons
from ./protocols/protocol import Handle
type
Assertion = Value
Observe = dataspace.Observe
Turn = actors.Turn
Dataspace {.final.} = ref object of Entity
index: Index
handleMap: Table[Handle, Assertion]
method publish(ds: Dataspace; turn: var Turn; a: AssertionRef; h: Handle) =
if add(ds.index, turn, a.value):
var obs = a.value.preservesTo(Observe)
if obs.isSome and obs.get.observer of Cap:
ds.index.add(turn, obs.get.pattern, Cap(obs.get.observer))
ds.handleMap[h] = a.value
method retract(ds: Dataspace; turn: var Turn; h: Handle) =
let v = ds.handleMap[h]
if remove(ds.index, turn, v):
ds.handleMap.del h
var obs = v.preservesTo(Observe)
if obs.isSome and obs.get.observer of Cap:
ds.index.remove(turn, obs.get.pattern, Cap(obs.get.observer))
method message(ds: Dataspace; turn: var Turn; a: AssertionRef) =
ds.index.deliverMessage(turn, a.value)
proc newDataspace*(turn: var Turn): Cap =
newCap(turn, Dataspace(index: initIndex()))
type BootProc = proc (turn: var Turn; ds: Cap) {.closure.}
type DeprecatedBootProc = proc (ds: Cap; turn: var Turn) {.closure.}
proc bootDataspace*(name: string; bootProc: BootProc): Actor =
bootActor(name) do (turn: var Turn):
discard turn.facet.preventInertCheck()
bootProc(turn, newDataspace(turn))
proc bootDataspace*(name: string; bootProc: DeprecatedBootProc): Actor {.deprecated.} =
bootDataspace(name) do (turn: var Turn, ds: Cap):
bootProc(ds, turn)

48
src/syndicate/durings.nim Normal file
View File

@ -0,0 +1,48 @@
# SPDX-FileCopyrightText: ☭ 2021 Emery Hemingway
# SPDX-License-Identifier: Unlicense
import std/[hashes, tables]
import preserves
import ./actors, ./patterns, ./protocols/dataspace
type
DuringProc* = proc (turn: var Turn; a: Value; h: Handle): TurnAction
DuringActionKind = enum null, dead, act
DuringAction = object
case kind: DuringActionKind
of null, dead: discard
of act:
action: TurnAction
DuringEntity {.final.}= ref object of Entity
cb: DuringProc
assertionMap: Table[Handle, DuringAction]
method publish(de: DuringEntity; turn: var Turn; a: AssertionRef; h: Handle) =
let action = de.cb(turn, a.value, h)
# assert(not action.isNil "should have put in a no-op action")
let g = de.assertionMap.getOrDefault h
case g.kind
of null:
de.assertionMap[h] = DuringAction(kind: act, action: action)
of dead:
de.assertionMap.del h
freshen(turn, action)
of act:
raiseAssert("during: duplicate handle in publish: " & $h)
method retract(de: DuringEntity; turn: var Turn; h: Handle) =
let g = de.assertionMap.getOrDefault h
case g.kind
of null:
de.assertionMap[h] = DuringAction(kind: dead)
of dead:
raiseAssert("during: duplicate handle in retract: " & $h)
of act:
de.assertionMap.del h
if not g.action.isNil:
g.action(turn)
proc during*(cb: DuringProc): DuringEntity = DuringEntity(cb: cb)
proc observe*(turn: var Turn; ds: Cap; pat: Pattern; e: Entity): Handle =
publish(turn, ds, Observe(pattern: pat, observer: newCap(turn, e)))

View File

@ -313,7 +313,7 @@ proc grabDictionary*(bindings: sink openArray[(string, Pattern)]): Pattern =
for (key, val) in bindings.items:
result.dcompound.dict.entries[key.toSymbol] = val
proc depattern(comp: DCompound; values: var seq[Value]; index: var int): Value {.gcsafe.}
proc depattern(comp: DCompound; values: var seq[Value]; index: var int): Value
proc depattern(pat: Pattern; values: var seq[Value]; index: var int): Value =
case pat.orKind
@ -328,7 +328,7 @@ proc depattern(pat: Pattern; values: var seq[Value]; index: var int): Value =
of PatternKind.DCompound:
result = depattern(pat.dcompound, values, index)
proc depattern(comp: DCompound; values: var seq[Value]; index: var int): Value {.gcsafe.} =
proc depattern(comp: DCompound; values: var seq[Value]; index: var int): Value =
case comp.orKind
of DCompoundKind.rec:
result = initRecord(comp.rec.label, comp.rec.fields.len)

View File

@ -3,10 +3,9 @@
import std/[options, tables]
from std/os import getEnv, `/`
import pkg/sys/[ioqueue, sockets]
import pkg/sys/ioqueue
import preserves
import ./syndicate, /capabilities, ./durings, ./membranes, ./protocols/[gatekeeper, protocol, sturdy, transportAddress]
import ../syndicate, /capabilities, ./durings, ./membranes, ./protocols/[gatekeeper, protocol, sturdy, transportAddress]
when defined(traceSyndicate):
when defined(posix):
@ -17,6 +16,7 @@ else:
template trace(args: varargs[untyped]): untyped = discard
export `$`
export Stdio, Tcp, WebSocket, Unix
type
@ -27,8 +27,8 @@ type
Turn = syndicate.Turn
WireRef = sturdy.WireRef
PacketWriter = proc (turn: Turn; buf: seq[byte]) {.closure, gcsafe.}
RelaySetup = proc (turn: Turn; relay: Relay) {.closure, gcsafe.}
PacketWriter = proc (turn: var Turn; buf: seq[byte]) {.closure.}
RelaySetup = proc (turn: var Turn; relay: Relay) {.closure.}
Relay* = ref object
facet: Facet
@ -38,7 +38,6 @@ type
exported: Membrane
imported: Membrane
nextLocalOid: Oid
pendingTurn: protocol.Turn
wireBuf: BufferedDecoder
packetWriter: PacketWriter
peer: Cap
@ -57,20 +56,20 @@ type
proc releaseCapOut(r: Relay; e: WireSymbol) =
r.exported.drop e
method publish(spe: SyncPeerEntity; t: Turn; a: AssertionRef; h: Handle) =
method publish(spe: SyncPeerEntity; t: var Turn; a: AssertionRef; h: Handle) =
spe.handleMap[h] = publish(t, spe.peer, a.value)
method retract(se: SyncPeerEntity; t: Turn; h: Handle) =
method retract(se: SyncPeerEntity; t: var Turn; h: Handle) =
var other: Handle
if se.handleMap.pop(h, other):
retract(t, other)
method message(se: SyncPeerEntity; t: Turn; a: AssertionRef) =
method message(se: SyncPeerEntity; t: var Turn; a: AssertionRef) =
if not se.e.isNil:
se.relay.releaseCapOut(se.e)
message(t, se.peer, a.value)
method sync(se: SyncPeerEntity; t: Turn; peer: Cap) =
method sync(se: SyncPeerEntity; t: var Turn; peer: Cap) =
sync(t, se.peer, peer)
proc newSyncPeerEntity(r: Relay; p: Cap): SyncPeerEntity =
@ -90,7 +89,7 @@ proc rewriteCapOut(relay: Relay; cap: Cap; exported: var seq[WireSymbol]): WireR
mine: WireRefMine(oid: ws.oid))
proc rewriteOut(relay: Relay; v: Assertion):
tuple[rewritten: Value, exported: seq[WireSymbol]] {.gcsafe.} =
tuple[rewritten: Value, exported: seq[WireSymbol]] =
var exported: seq[WireSymbol]
result.rewritten = mapEmbeds(v) do (pr: Value) -> Value:
let o = pr.unembed(Cap); if o.isSome:
@ -107,46 +106,40 @@ proc deregister(relay: Relay; h: Handle) =
if relay.outboundAssertions.pop(h, outbound):
for e in outbound: releaseCapOut(relay, e)
proc send(relay: Relay; turn: Turn; rOid: protocol.Oid; m: Event) =
if relay.pendingTurn.len > 0:
stderr.writeLine "relay has pending turn events"
# If the pending queue is empty then schedule a packet
# to be sent after pending I/O is processed.
#[
assert turn.finalizer.isNil
turn.finalizer = proc () =
relay.facet.run do (turn: Turn):
var pkt = Packet(
orKind: PacketKind.Turn,
turn: move relay.pendingTurn)
trace "C: ", pkt
relay.packetWriter(turn, encode pkt)
]#
relay.pendingTurn.add TurnEvent(oid: rOid, event: m)
proc send(relay: Relay; turn: var Turn; rOid: protocol.Oid; m: Event) =
# TODO: don't send right away.
var pendingTurn: protocol.Turn
pendingTurn.add TurnEvent(oid: rOid, event: m)
relay.facet.run do (turn: var Turn):
var pkt = Packet(
orKind: PacketKind.Turn,
turn: pendingTurn)
trace "C: ", pkt
relay.packetWriter(turn, encode pkt)
proc send(re: RelayEntity; turn: Turn; ev: Event) =
proc send(re: RelayEntity; turn: var Turn; ev: Event) =
send(re.relay, turn, protocol.Oid re.oid, ev)
method publish(re: RelayEntity; t: Turn; a: AssertionRef; h: Handle) {.gcsafe.} =
method publish(re: RelayEntity; t: var Turn; a: AssertionRef; h: Handle) =
re.send(t, Event(
orKind: EventKind.Assert,
`assert`: protocol.Assert(
assertion: re.relay.register(a.value, h).rewritten,
handle: h)))
method retract(re: RelayEntity; t: Turn; h: Handle) {.gcsafe.} =
method retract(re: RelayEntity; t: var Turn; h: Handle) =
re.relay.deregister h
re.send(t, Event(
orKind: EventKind.Retract,
retract: Retract(handle: h)))
method message(re: RelayEntity; turn: Turn; msg: AssertionRef) {.gcsafe.} =
method message(re: RelayEntity; turn: var Turn; msg: AssertionRef) =
var (value, exported) = rewriteOut(re.relay, msg.value)
assert(len(exported) == 0, "cannot send a reference in a message")
if len(exported) == 0:
re.send(turn, Event(orKind: EventKind.Message, message: Message(body: value)))
method sync(re: RelayEntity; turn: Turn; peer: Cap) {.gcsafe.} =
method sync(re: RelayEntity; turn: var Turn; peer: Cap) =
var
peerEntity = newSyncPeerEntity(re.relay, peer)
exported: seq[WireSymbol]
@ -189,7 +182,7 @@ proc rewriteCapIn(relay; facet; n: WireRef, imported: var seq[WireSymbol]): Cap
else: raiseAssert "attenuation not implemented"
proc rewriteIn(relay; facet; v: Value):
tuple[rewritten: Assertion; imported: seq[WireSymbol]] {.gcsafe.} =
tuple[rewritten: Assertion; imported: seq[WireSymbol]] =
var imported: seq[WireSymbol]
result.rewritten = mapEmbeds(v) do (pr: Value) -> Value:
let wr = pr.preservesTo WireRef; if wr.isSome:
@ -200,7 +193,7 @@ proc rewriteIn(relay; facet; v: Value):
proc close(r: Relay) = discard
proc dispatch(relay: Relay; turn: Turn; cap: Cap; event: Event) {.gcsafe.} =
proc dispatch(relay: Relay; turn: var Turn; cap: Cap; event: Event) =
case event.orKind
of EventKind.Assert:
let (a, imported) = rewriteIn(relay, turn.facet, event.assert.assertion)
@ -223,14 +216,14 @@ proc dispatch(relay: Relay; turn: Turn; cap: Cap; event: Event) {.gcsafe.} =
#[
var imported: seq[WireSymbol]
let k = relay.rewriteCapIn(turn, evenr.sync.peer, imported)
turn.sync(cap) do (turn: Turn):
turn.sync(cap) do (turn: var Turn):
turn.message(k, true)
for e in imported: relay.imported.del e
]#
proc dispatch(relay: Relay; v: Value) {.gcsafe.} =
proc dispatch(relay: Relay; v: Value) =
trace "S: ", v
run(relay.facet) do (t: Turn):
run(relay.facet) do (t: var Turn):
var pkt: Packet
if pkt.fromPreserves(v):
case pkt.orKind
@ -254,8 +247,8 @@ proc dispatch(relay: Relay; v: Value) {.gcsafe.} =
when defined(posix):
stderr.writeLine("discarding undecoded packet ", v)
proc recv(relay: Relay; buf: seq[byte]) =
feed(relay.wireBuf, buf)
proc recv(relay: Relay; buf: openarray[byte]; slice: Slice[int]) =
feed(relay.wireBuf, buf, slice)
var pr = decode(relay.wireBuf)
if pr.isSome: dispatch(relay, pr.get)
@ -268,8 +261,8 @@ type
initialCap*: Cap
nextLocalOid*: Option[Oid]
proc spawnRelay(name: string; turn: Turn; opts: RelayActorOptions; setup: RelaySetup) =
spawn(name, turn) do (turn: Turn):
proc spawnRelay(name: string; turn: var Turn; opts: RelayActorOptions; setup: RelaySetup) =
spawnActor(name, turn) do (turn: var Turn):
let relay = Relay(
facet: turn.facet,
packetWriter: opts.packetWriter,
@ -304,21 +297,33 @@ proc accepted(cap: Cap): Resolved =
when defined(posix):
#[
import std/asyncfile
export Unix
import std/[oserrors, posix]
import pkg/sys/[files, handles, sockets]
export transportAddress.Unix
type StdioControlEntity = ref object of Entity
buf: ref seq[byte]
relay: Relay
stdin: AsyncFile
method message(entity: StdioControlEntity; turn: Turn; ass: AssertionRef) =
method message(entity: StdioControlEntity; turn: var Turn; ass: AssertionRef) =
if ass.value.preservesTo(ForceDisconnect).isSome:
close(entity.stdin)
close(stdout)
proc connectTransport(turn: Turn; ds: Cap; ta: transportAddress.Stdio) =
proc loop(entity: StdioControlEntity) {.asyncio.} =
new entity.buf
entity.buf[].setLen(0x1000)
while true:
let n = read(entity.stdin, entity.buf)
if n == 0:
stderr.writeLine "empty read on stdin, stopping actor"
stopActor(entity.relay.facet)
else:
entity.relay.recv(entity.buf[], 0..<n)
proc connectTransport(turn: var Turn; ds: Cap; ta: transportAddress.Stdio) =
## Connect to an external dataspace over stdio.
proc stdoutWriter(turn: Turn; buf: seq[byte]) =
proc stdoutWriter(turn: var Turn; buf: seq[byte]) =
## Blocking write to stdout.
let n = writeBytes(stdout, buf, 0, buf.len)
flushFile(stdout)
@ -329,111 +334,114 @@ when defined(posix):
initialCap: ds,
initialOid: 0.Oid.some,
)
spawnRelay("stdio", turn, opts) do (turn: Turn; relay: Relay):
spawnRelay("stdio", turn, opts) do (turn: var Turn; relay: Relay):
let
facet = turn.facet
asyncStdin = openAsync("/dev/stdin") # this is universal now?
fd = stdin.getOsFileHandle()
flags = fcntl(fd.cint, F_GETFL, 0)
if flags < 0: raiseOSError(osLastError())
if fcntl(fd.cint, F_SETFL, flags or O_NONBLOCK) < 0:
raiseOSError(osLastError())
let entity = StdioControlEntity(
relay: relay, stdin: newAsyncFile(FD fd))
publish(turn, ds, TransportConnection(
`addr`: ta.toPreserves,
control: StdioControlEntity(stdin: asyncStdin).newCap(turn),
control: newCap(entity, turn),
resolved: relay.peer.accepted,
))
const stdinReadSize = 0x2000
proc readCb(pktFut: Future[string]) {.gcsafe.} =
if not pktFut.failed:
var buf = pktFut.read
if buf.len == 0:
run(facet) do (turn: Turn): stopActor(turn)
else:
relay.recv(cast[seq[byte]](buf))
asyncStdin.read(stdinReadSize).addCallback(readCb)
asyncStdin.read(stdinReadSize).addCallback(readCb)
discard trampoline:
whelp loop(entity)
proc connectStdio*(turn: Turn; ds: Cap) =
proc connectStdio*(turn: var Turn; ds: Cap) =
## Connect to an external dataspace over stdin and stdout.
connectTransport(turn, ds, transportAddress.Stdio())
import std/asyncnet
from std/nativesockets import AF_INET, AF_UNIX, IPPROTO_TCP, SOCK_STREAM, Protocol
type
TcpEntity = ref object of Entity
relay: Relay
sock: AsyncConn[sockets.Protocol.TCP]
buf: ref seq[byte]
alive: bool
type SocketControlEntity = ref object of Entity
socket: AsyncSocket
UnixEntity = ref object of Entity
relay: Relay
sock: AsyncConn[sockets.Protocol.Unix]
buf: ref seq[byte]
alive: bool
method message(entity: SocketControlEntity; turn: Turn; ass: AssertionRef) =
SocketEntity = TcpEntity | UnixEntity
method message(entity: SocketEntity; turn: var Turn; ass: AssertionRef) =
if ass.value.preservesTo(ForceDisconnect).isSome:
close(entity.socket)
reset entity.alive
close(entity.sock)
type ShutdownEntity* = ref object of Entity
method retract(e: ShutdownEntity; turn: Turn; h: Handle) =
template socketLoop() {.dirty.}=
new entity.buf
entity.buf[].setLen(0x1000)
entity.alive = not entity.alive
while entity.alive:
let n = read(entity.sock, entity.buf)
if n < 0: raiseOSError(osLastError())
elif n == 0:
stderr.writeLine "empty read on socket, stopping actor"
stopActor(entity.relay.facet)
else:
entity.relay.recv(entity.buf[], 0..<n)
stderr.writeLine "breaking socketLoop"
proc loop(entity: TcpEntity) {.asyncio.} =
socketLoop()
proc loop(entity: UnixEntity) {.asyncio.} =
socketLoop()
type ShutdownEntity = ref object of Entity
method retract(e: ShutdownEntity; turn: var Turn; h: Handle) =
stopActor(turn)
proc connect(turn: Turn; ds: Cap; transAddr: Value; socket: AsyncSocket) =
proc socketWriter(turn: Turn; buf: seq[byte]) =
asyncCheck(turn, socket.send(cast[string](buf)))
template bootSocketEntity() {.dirty.} =
proc publish(turn: var Turn) =
publish(turn, ds, TransportConnection(
`addr`: ta.toPreserves,
control: newCap(entity, turn),
resolved: entity.relay.peer.accepted,
))
run(entity.relay.facet, publish)
loop(entity)
proc boot(entity: TcpEntity; ta: transportAddress.Tcp; ds: Cap) {.asyncio.} =
entity.sock = connectTcpAsync(ta.host, Port ta.port)
bootSocketEntity()
proc boot(entity: UnixEntity; ta: transportAddress.Unix; ds: Cap) {.asyncio.} =
entity.sock = connectUnixAsync(ta.path)
bootSocketEntity()
template spawnSocketRelay() {.dirty.} =
proc writeConn(turn: var Turn; buf: seq[byte]) =
discard trampoline:
whelp write(entity.sock, buf)
var ops = RelayActorOptions(
packetWriter: socketWriter,
packetWriter: writeConn,
initialOid: 0.Oid.some,
)
spawnRelay("socket", turn, ops) do (turn: Turn; relay: Relay):
let facet = turn.facet
facet.actor.atExit do (turn: Turn): close(socket)
publish(turn, ds, TransportConnection(
`addr`: transAddr,
control: SocketControlEntity(socket: socket).newCap(turn),
resolved: relay.peer.accepted,
))
const recvSize = 0x4000
proc recvCb(pktFut: Future[string]) {.gcsafe.} =
if pktFut.failed or pktFut.read.len == 0:
run(facet) do (turn: Turn): stopActor(turn)
else:
relay.recv(cast[seq[byte]](pktFut.read))
if not socket.isClosed:
socket.recv(recvSize).addCallback(recvCb)
socket.recv(recvSize).addCallback(recvCb)
spawnRelay("socket", turn, ops) do (turn: var Turn; relay: Relay):
entity.relay = relay
atExit(turn.facet.actor) do (turn: var Turn):
entity.alive = false
close(entity.sock)
discard trampoline:
whelp boot(entity, ta, ds)
proc connect(turn: Turn; ds: Cap; ta: Value; socket: AsyncSocket; fut: Future[void]) =
let facet = turn.facet
fut.addCallback do ():
run(facet) do (turn: Turn):
if fut.failed:
var ass = TransportConnection(
`addr`: ta,
resolved: Resolved(orKind: ResolvedKind.Rejected),
)
ass.resolved.rejected.detail = embed fut.error
publish(turn, ds, ass)
else:
connect(turn, ds, ta, socket)
proc connectTransport(turn: var Turn; ds: Cap; ta: transportAddress.Tcp) =
let entity = TcpEntity()
spawnSocketRelay()
proc connectTransport(turn: Turn; ds: Cap; ta: transportAddress.Tcp) =
let
facet = turn.facet
socket = newAsyncSocket(
domain = AF_INET,
sockType = SOCK_STREAM,
protocol = IPPROTO_TCP,
buffered = false,
)
connect(turn, ds, ta.toPreserves, socket, connect(socket, ta.host, Port ta.port))
]#
proc connectTransport(turn: var Turn; ds: Cap; ta: transportAddress.Unix) =
let entity = UnixEntity()
spawnSocketRelay()
proc connectTransport(turn: Turn; ds: Cap; ta: transportAddress.Tcp) {.asyncio.} =
var conn = connectTcpAsync(ta.host, Port ta.port)
# connect(turn, ds, ta.toPreserves, conn)
#[
proc connectTransport(turn: Turn; ds: Cap; ta: transportAddress.Unix) =
## Relay a dataspace over a UNIX socket.
let socket = newAsyncSocket(
domain = AF_UNIX,
sockType = SOCK_STREAM,
protocol = cast[Protocol](0),
buffered = false)
connect(turn, ds, ta.toPreserves, socket, connectUnix(socket, ta.path))
]#
proc walk(turn: Turn; ds, origin: Cap; route: Route; transOff, stepOff: int) {.gcsafe.} =
proc walk(turn: var Turn; ds, origin: Cap; route: Route; transOff, stepOff: int) =
if stepOff < route.pathSteps.len:
let
step = route.pathSteps[stepOff]
@ -456,7 +464,7 @@ proc walk(turn: Turn; ds, origin: Cap; route: Route; transOff, stepOff: int) {.g
resolved: origin.accepted,
))
proc connectRoute(turn: Turn; ds: Cap; route: Route; transOff: int) =
proc connectRoute(turn: var Turn; ds: Cap; route: Route; transOff: int) =
let rejectPat = TransportConnection ?: {
0: ?route.transports[transOff],
2: ?:Rejected,
@ -474,15 +482,15 @@ proc connectRoute(turn: Turn; ds: Cap; route: Route; transOff: int) =
onPublish(turn, ds, acceptPat) do (origin: Cap):
walk(turn, ds, origin, route, transOff, 0)
type StepCallback = proc (turn: Turn; step: Value; origin, next: Cap) {.gcsafe.}
type StepCallback = proc (turn: var Turn; step: Value; origin, next: Cap) {.closure.}
proc spawnStepResolver(turn: Turn; ds: Cap; stepType: Value; cb: StepCallback) =
spawn($stepType & "-step", turn) do (turn: Turn):
proc spawnStepResolver(turn: var Turn; ds: Cap; stepType: Value; cb: StepCallback) =
spawnActor($stepType & "-step", turn) do (turn: var Turn):
let stepPat = grabRecord(stepType, grab())
let pat = ?Observe(pattern: ResolvedPathStep?:{1: stepPat}) ?? {0: grabLit(), 1: grab()}
during(turn, ds, pat) do (origin: Cap; stepDetail: Literal[Value]):
let step = toRecord(stepType, stepDetail.value)
proc duringCallback(turn: Turn; ass: Value; h: Handle): TurnAction =
proc duringCallback(turn: var Turn; ass: Value; h: Handle): TurnAction =
var res = ass.preservesTo Resolved
if res.isSome:
if res.get.orKind == ResolvedKind.accepted and
@ -491,46 +499,63 @@ proc spawnStepResolver(turn: Turn; ds: Cap; stepType: Value; cb: StepCallback) =
else:
publish(turn, ds, ResolvedPathStep(
origin: origin, pathStep: step, resolved: res.get))
proc action(turn: Turn) =
proc action(turn: var Turn) =
stop(turn)
result = action
publish(turn, origin, Resolve(
step: step, observer: newCap(turn, during(duringCallback))))
proc spawnRelays*(turn: Turn; ds: Cap) =
proc spawnRelays*(turn: var Turn; ds: Cap) =
## Spawn actors that manage routes and appeasing gatekeepers.
spawn("transport-connector", turn) do (turn: Turn):
spawnActor("transport-connector", turn) do (turn: var Turn):
let pat = ?Observe(pattern: !TransportConnection) ?? { 0: grab() }
# Use a generic pattern and type matching
# in the during handler because it is easy.
let stdioPat = ?Observe(pattern: TransportConnection?:{0: ?:Stdio})
# during(turn, ds, stdioPat) do:
# connectTransport(turn, ds, Stdio())
during(turn, ds, stdioPat) do:
connectTransport(turn, ds, Stdio())
# TODO: tcp pattern
during(turn, ds, pat) do (ta: Literal[transportAddress.Tcp]):
connectTransport(turn, ds, ta.value)
try: connectTransport(turn, ds, ta.value)
except CatchableError as e:
publish(turn, ds, TransportConnection(
`addr`: ta.toPreserve,
resolved: rejected(embed e),
))
# TODO: unix pattern
# during(turn, ds, pat) do (ta: Literal[transportAddress.Unix]):
# connectTransport(turn, ds, ta.value)
during(turn, ds, pat) do (ta: Literal[transportAddress.Unix]):
try: connectTransport(turn, ds, ta.value)
except CatchableError as e:
publish(turn, ds, TransportConnection(
`addr`: ta.toPreserve,
resolved: rejected(embed e),
))
spawn("path-resolver", turn) do (turn: Turn):
spawnActor("path-resolver", turn) do (turn: var Turn):
let pat = ?Observe(pattern: !ResolvePath) ?? {0: grab()}
during(turn, ds, pat) do (route: Literal[Route]):
for i, transAddr in route.value.transports:
connectRoute(turn, ds, route.value, i)
spawnStepResolver(turn, ds, "ref".toSymbol) do (
turn: Turn, step: Value, origin: Cap, next: Cap):
turn: var Turn, step: Value, origin: Cap, next: Cap):
publish(turn, ds, ResolvedPathStep(
origin: origin, pathStep: step, resolved: next.accepted))
type BootProc* = proc (turn: Turn; ds: Cap) {.gcsafe.}
type BootProc* = proc (turn: var Turn; ds: Cap) {.closure.}
const defaultRoute* = "<route [<stdio>]>"
proc envRoute*: Route =
var text = getEnv("SYNDICATE_ROUTE")
## Get an route to a Syndicate capability from the calling environment.
## On UNIX this is the SYNDICATE_ROUTE environmental variable with a
## fallack to a defaultRoute_.
## See https://git.syndicate-lang.org/syndicate-lang/syndicate-protocols/raw/branch/main/schemas/gatekeeper.prs.
var text = getEnv("SYNDICATE_ROUTE", defaultRoute)
if text == "":
var tx = (getEnv("XDG_RUNTIME_DIR", "/run/user/1000") / "dataspace").toPreserves
result.transports = @[initRecord("unix", tx)]
@ -540,8 +565,19 @@ proc envRoute*: Route =
if not result.fromPreserves(pr):
raise newException(ValueError, "failed to parse $SYNDICATE_ROUTE " & $pr)
proc resolve*(turn: Turn; ds: Cap; route: Route; bootProc: BootProc) =
proc resolve*(turn: var Turn; ds: Cap; route: Route; bootProc: BootProc) =
## Resolve `route` within `ds` and call `bootProc` with resolved capabilities.
during(turn, ds, ResolvePath ?: {0: ?route, 3: ?:ResolvedAccepted}) do (dst: Cap):
bootProc(turn, dst)
proc resolveEnvironment*(turn: var Turn; bootProc: BootProc) =
## Resolve a capability from the calling environment
## and call `bootProc`. See envRoute_.
let
ds = newDataspace(turn)
pat = ResolvePath ?: {0: ?envRoute(), 3: ?:ResolvedAccepted}
during(turn, ds, pat) do (dst: Cap):
bootProc(turn, dst)
spawnRelays(turn, ds)
# TODO: define a runActor that comes preloaded with relaying

View File

@ -45,8 +45,8 @@ type
AssertionCache = HashSet[Value]
ObserverGroup = ref object # Endpoints
cachedCaptures: Bag[Value]
observers: Table[Cap, TableRef[Value, Handle]]
cachedCaptures: Bag[Captures]
observers: Table[Cap, TableRef[Captures, Handle]]
Leaf = ref object
cache: AssertionCache
@ -67,7 +67,7 @@ func isEmpty(cont: Continuation): bool =
type
ContinuationProc = proc (c: Continuation; v: Value) {.closure.}
LeafProc = proc (l: Leaf; v: Value) {.closure.}
ObserverProc = proc (group: ObserverGroup; vs: Value) {.closure.}
ObserverProc = proc (turn: var Turn; group: ObserverGroup; vs: seq[Value]) {.closure.}
proc getLeaves(cont: Continuation; constPaths: Paths): LeafMap =
result = cont.leafMap.getOrDefault(constPaths)
@ -76,7 +76,7 @@ proc getLeaves(cont: Continuation; constPaths: Paths): LeafMap =
cont.leafMap[constPaths] = result
assert not cont.isEmpty
for ass in cont.cache:
var key = projectPaths(ass, constPaths)
let key = projectPaths(ass, constPaths)
if key.isSome:
var leaf = result.getOrDefault(get key)
if leaf.isNil:
@ -114,36 +114,36 @@ proc top(stack: TermStack): Value =
assert stack.len > 0
stack[stack.high]
proc modify(node: Node; outerValue: Value; event: EventKind;
proc modify(node: Node; turn: var Turn; outerValue: Value; event: EventKind;
modCont: ContinuationProc; modLeaf: LeafProc; modObs: ObserverProc) =
proc walk(cont: Continuation) =
proc walk(cont: Continuation; turn: var Turn) =
modCont(cont, outerValue)
for constPaths, constValMap in cont.leafMap.pairs:
var constVals = projectPaths(outerValue, constPaths)
let constVals = projectPaths(outerValue, constPaths)
if constVals.isSome:
case event
of addedEvent, messageEvent:
let leaf = constValMap.getLeaf(get constVals)
modLeaf(leaf, outerValue)
for capturePaths, observerGroup in leaf.observerGroups.pairs:
var captures = projectPaths(outerValue, capturePaths)
let captures = projectPaths(outerValue, capturePaths)
if captures.isSome:
modObs(observerGroup, captures.get.toPreserves)
modObs(turn, observerGroup, get captures)
of removedEvent:
let leaf = constValMap.getOrDefault(get constVals)
if not leaf.isNil:
modLeaf(leaf, outerValue)
for capturePaths, observerGroup in leaf.observerGroups.pairs:
var captures = projectPaths(outerValue, capturePaths)
let captures = projectPaths(outerValue, capturePaths)
if captures.isSome:
modObs(observerGroup, captures.get.toPreserves)
modObs(turn, observerGroup, get captures)
if leaf.isEmpty:
constValMap.del(get constVals)
proc walk(node: Node; termStack: TermStack) =
walk(node.continuation)
proc walk(node: Node; turn: var Turn; termStack: TermStack) =
walk(node.continuation, turn)
for selector, table in node.edges:
let
nextStack = pop(termStack, selector.popCount)
@ -153,11 +153,11 @@ proc modify(node: Node; outerValue: Value; event: EventKind;
if nextClass.kind != classNone:
let nextNode = table.getOrDefault(nextClass)
if not nextNode.isNil:
walk(nextNode, push(nextStack, get nextValue))
walk(nextNode, turn, push(nextStack, get nextValue))
if event == removedEvent and nextNode.isEmpty:
table.del(nextClass)
walk(node, @[@[outerValue].toPreserves])
walk(node, turn, @[@[outerValue].toPreserves])
proc getOrNew[A, B, C](t: var Table[A, TableRef[B, C]], k: A): TableRef[B, C] =
result = t.getOrDefault(k)
@ -223,11 +223,11 @@ proc getEndpoints(leaf: Leaf; capturePaths: Paths): ObserverGroup =
leaf.observerGroups[capturePaths] = result
for term in leaf.cache:
# leaf.cache would be empty if observers come before assertions
var captures = projectPaths(term, capturePaths)
let captures = projectPaths(term, capturePaths)
if captures.isSome:
discard result.cachedCaptures.change(captures.get.toPreserves, +1)
discard result.cachedCaptures.change(get captures, +1)
proc add*(index: var Index; pattern: Pattern; observer: Cap) =
proc add*(index: var Index; turn: var Turn; pattern: Pattern; observer: Cap) =
let
cont = index.root.extend(pattern)
analysis = analyse pattern
@ -235,12 +235,12 @@ proc add*(index: var Index; pattern: Pattern; observer: Cap) =
leaf = constValMap.getLeaf(analysis.constValues)
endpoints = leaf.getEndpoints(analysis.capturePaths)
# TODO if endpoints.cachedCaptures.len > 0:
var captureMap = newTable[Value, Handle]()
var captureMap = newTable[seq[Value], Handle]()
for capture in endpoints.cachedCaptures.items:
captureMap[capture] = publish(observer, capture.toPreserves)
captureMap[capture] = publish(turn, observer, capture)
endpoints.observers[observer] = captureMap
proc remove*(index: var Index; pattern: Pattern; observer: Cap) =
proc remove*(index: var Index; turn: var Turn; pattern: Pattern; observer: Cap) =
let
cont = index.root.extend(pattern)
analysis = analyse pattern
@ -250,9 +250,9 @@ proc remove*(index: var Index; pattern: Pattern; observer: Cap) =
if not leaf.isNil:
let endpoints = leaf.observerGroups.getOrDefault(analysis.capturePaths)
if not endpoints.isNil:
var captureMap: TableRef[Value, Handle]
var captureMap: TableRef[seq[Value], Handle]
if endpoints.observers.pop(observer, captureMap):
for handle in captureMap.values: retract(observer, handle)
for handle in captureMap.values: retract(turn, handle)
if endpoints.observers.len == 0:
leaf.observerGroups.del(analysis.capturePaths)
if leaf.observerGroups.len == 0:
@ -260,7 +260,7 @@ proc remove*(index: var Index; pattern: Pattern; observer: Cap) =
if constValMap.len == 0:
cont.leafMap.del(analysis.constPaths)
proc adjustAssertion(index: var Index; outerValue: Value; delta: int): bool =
proc adjustAssertion(index: var Index; turn: var Turn; outerValue: Value; delta: int): bool =
case index.allAssertions.change(outerValue, delta)
of cdAbsentToPresent:
result = true
@ -268,37 +268,37 @@ proc adjustAssertion(index: var Index; outerValue: Value; delta: int): bool =
c.cache.incl(v)
proc modLeaf(l: Leaf; v: Value) =
l.cache.incl(v)
proc modObserver(group: ObserverGroup; vs: Value) =
proc modObserver(turn: var Turn; group: ObserverGroup; vs: seq[Value]) =
let change = group.cachedCaptures.change(vs, +1)
if change == cdAbsentToPresent:
for (observer, captureMap) in group.observers.pairs:
captureMap[vs] = publish(observer, vs.toPreserves)
captureMap[vs] = publish(turn, observer, vs.toPreserves)
# TODO: this handle is coming from the facet?
modify(index.root, outerValue, addedEvent, modContinuation, modLeaf, modObserver)
modify(index.root, turn, outerValue, addedEvent, modContinuation, modLeaf, modObserver)
of cdPresentToAbsent:
result = true
proc modContinuation(c: Continuation; v: Value) =
c.cache.excl(v)
proc modLeaf(l: Leaf; v: Value) =
l.cache.excl(v)
proc modObserver(group: ObserverGroup; vs: Value) =
proc modObserver(turn: var Turn; group: ObserverGroup; vs: seq[Value]) =
if group.cachedCaptures.change(vs, -1) == cdPresentToAbsent:
for (observer, captureMap) in group.observers.pairs:
var h: Handle
if captureMap.take(vs, h):
retract(observer, h)
modify(index.root, outerValue, removedEvent, modContinuation, modLeaf, modObserver)
retract(observer.target, turn, h)
modify(index.root, turn, outerValue, removedEvent, modContinuation, modLeaf, modObserver)
else: discard
proc continuationNoop(c: Continuation; v: Value) = discard
proc leafNoop(l: Leaf; v: Value) = discard
proc add*(index: var Index; v: Value): bool =
adjustAssertion(index, v, +1)
proc remove*(index: var Index; v: Value): bool =
adjustAssertion(index, v, -1)
proc add*(index: var Index; turn: var Turn; v: Value): bool =
adjustAssertion(index, turn, v, +1)
proc remove*(index: var Index; turn: var Turn; v: Value): bool =
adjustAssertion(index, turn, v, -1)
proc deliverMessage*(index: var Index; v: Value) =
proc observersCb(group: ObserverGroup; vs: Value) =
for observer in group.observers.keys: message(observer, vs)
index.root.modify(v, messageEvent, continuationNoop, leafNoop, observersCb)
proc deliverMessage*(index: var Index; turn: var Turn; v: Value) =
proc observersCb(turn: var Turn; group: ObserverGroup; vs: seq[Value]) =
for observer in group.observers.keys: message(turn, observer, vs)
index.root.modify(turn, v, messageEvent, continuationNoop, leafNoop, observersCb)

View File

@ -1,6 +1,6 @@
# Package
version = "20240301"
version = "20240304"
author = "Emery Hemingway"
description = "Syndicated actors for conversational concurrency"
license = "Unlicense"
@ -9,4 +9,4 @@ srcDir = "src"
# Dependencies
requires "https://github.com/ehmry/hashlib.git >= 20231130", "nim >= 2.0.0", "https://git.syndicate-lang.org/ehmry/preserves-nim.git >= 20240208", "https://github.com/alaviss/nim-sys.git", "https://github.com/nim-works/cps"
requires "https://github.com/ehmry/hashlib.git >= 20231130", "nim >= 2.0.0", "https://git.syndicate-lang.org/ehmry/preserves-nim.git >= 20240208", "https://github.com/ehmry/nim-sys.git#4ef3b624db86e331ba334e705c1aa235d55b05e1", "https://github.com/nim-works/cps"

View File

@ -1,7 +1,3 @@
include_rules
NIM_FLAGS += --define:traceSyndicate
: foreach *.prs |> !preserves_schema_nim |> | {schema}
: foreach t*.nim | ../../preserves-nim/<tests> {schema} $(SYNDICATE_PROTOCOL) |> !nim |> | ../<test> {test}
: foreach {test} |> SYNDICATE_TRACE_FILE=%o ./%f |> ./%B.trace.bin {bintrace}
: foreach {bintrace} |> preserves-tool convert <%f >%o |> %B
: foreach t*.nim | ../../preserves-nim/<tests> {schema} $(SYNDICATE_PROTOCOL) |> !nim_run |> | ../<test>

View File

@ -23,17 +23,17 @@ proc readStdin(facet: Facet; ds: Cap; username: string) =
readLine()
readLine()
proc chat(facet: Facet; ds: Cap; username: string) =
during(facet, ds, ?:Present) do (who: string):
proc chat(turn: var Turn; ds: Cap; username: string) =
during(turn, ds, ?:Present) do (who: string):
echo who, " joined"
do:
echo who, " left"
onMessage(facet, ds, ?:Says) do (who: string, what: string):
onMessage(turn, ds, ?:Says) do (who: string, what: string):
echo who, ": ", what
discard publish(facet, ds, Present(username: username))
readStdin(facet, ds, username)
discard publish(turn, ds, Present(username: username))
readStdin(turn.facet, ds, username)
proc main =
let route = envRoute()
@ -48,10 +48,9 @@ proc main =
if username == "":
stderr.writeLine "--user: unspecified"
else:
runActor("chat") do (root: Facet):
let ds = facet.newDataspace()
facet.spawnRelays(ds)
resolve(facet, ds, route) do (remote: Cap):
chat(facet, remote, username)
runActor("chat") do (turn: var Turn; root: Cap):
spawnRelays(turn, root)
resolve(turn, root, route) do (turn: var Turn; ds: Cap):
chat(turn, ds, username)
main()

View File

@ -1,32 +1,26 @@
# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense
import std/[os, times]
import pkg/cps
import std/times
import pkg/sys/ioqueue
import sam/syndicate
import sam/actors/timers
import syndicate, syndicate/actors/timers
let actor = bootActor("timer-test") do (facet: Facet):
let timers = facet.newDataspace()
spawnTimerActor(timers)
let actor = bootActor("timer-test") do (turn: var Turn):
let timers = newDataspace(turn)
spawnTimerActor(timers, turn)
onPublish(timers, ?LaterThan(seconds: 1356091200)):
onPublish(turn, timers, ?LaterThan(seconds: 1356100000)):
echo "now in 13th bʼakʼtun"
timers.after(initDuration(seconds = 3)) do ():
after(turn, timers, initDuration(seconds = 3)) do (turn: var Turn):
echo "third timer expired"
stopActor(facet)
stopActor(turn)
timers.after(initDuration(seconds = 1)) do ():
after(turn, timers, initDuration(seconds = 1)) do (turn: var Turn):
echo "first timer expired"
timers.after(initDuration(seconds = 2)) do ():
after(turn, timers, initDuration(seconds = 2)) do (turn: var Turn):
echo "second timer expired"
while not actor.stopped:
echo "running actor"
if not run(actor):
echo "run(actor) did not progress"
ioqueue.run()
echo "ioqueue.run finished"
echo "single run of ioqueue"
ioqueue.run()