From 6b642645f9b8cee11000568fb86a87ede30cf9a2 Mon Sep 17 00:00:00 2001 From: Emery Hemingway Date: Sun, 14 Jan 2024 12:09:49 +0200 Subject: [PATCH] Make sync work --- src/syndicate/actors.nim | 12 +++++++++--- src/syndicate/protocols/protocol.nim | 2 +- src/syndicate/relays.nim | 18 +++++++++++------- 3 files changed, 21 insertions(+), 11 deletions(-) diff --git a/src/syndicate/actors.nim b/src/syndicate/actors.nim index cc42ea3..8a10275 100644 --- a/src/syndicate/actors.nim +++ b/src/syndicate/actors.nim @@ -325,7 +325,6 @@ proc message*[T](turn: var Turn; r: Cap; v: T) = proc sync(turn: var Turn; e: Entity; peer: Cap) = e.sync(turn, peer) - # or turn.message(peer, true) ? proc sync*(turn: var Turn; r, peer: Cap) = enqueue(turn, r.relay) do (turn: var Turn): @@ -595,8 +594,15 @@ proc newCap*(relay: Facet; e: Entity): Cap = proc newCap*(turn; e: Entity): Cap = Cap(relay: turn.facet, target: e) -proc sync*(turn, refer: Cap, cb: proc(t: Turn) {.gcsafe.}) = - raiseAssert "not implemented" +type SyncContinuation {.final.} = ref object of Entity + action: TurnAction + +method message(entity: SyncContinuation; turn: var Turn; v: AssertionRef) = + entity.action(turn) + +proc sync*(turn: var Turn; refer: Cap; act: TurnAction) = + let e = SyncContinuation(action: act) + sync(turn, refer, newCap(turn, SyncContinuation(action: act))) proc running*(actor): bool = result = not actor.exited diff --git a/src/syndicate/protocols/protocol.nim b/src/syndicate/protocols/protocol.nim index b9daadb..dfdba81 100644 --- a/src/syndicate/protocols/protocol.nim +++ b/src/syndicate/protocols/protocol.nim @@ -20,7 +20,7 @@ type Extension* = Value Sync* {.preservesRecord: "sync".} = object - `peer`* {.preservesLiteral: "#!#t".}: tuple[] + `peer`* {.preservesEmbedded.}: Value TurnEvent* {.preservesTuple.} = object `oid`*: Oid diff --git a/src/syndicate/relays.nim b/src/syndicate/relays.nim index c325f9a..7758631 100644 --- a/src/syndicate/relays.nim +++ b/src/syndicate/relays.nim @@ -25,6 +25,7 @@ type Assertion = Value WireRef = sturdy.WireRef Turn = syndicate.Turn + Handle = actors.Handle type PacketWriter = proc (pkt: sink Packet): Future[void] {.gcsafe.} @@ -146,10 +147,11 @@ method sync(re: RelayEntity; turn: var Turn; peer: Cap) {.gcsafe.} = var peerEntity = newSyncPeerEntity(re.relay, peer) exported: seq[WireSymbol] - discard rewriteCapOut(re.relay, turn.newCap(peerEntity), exported) - # TODO: discard? + wr = rewriteCapOut(re.relay, turn.newCap(peerEntity), exported) peerEntity.e = exported[0] - re.send(turn, Event(orKind: EventKind.Sync)) + var ev = Event(orKind: EventKind.Sync) + ev.sync.peer = wr.toPreserves.embed + re.send(turn, ev) proc newRelayEntity(label: string; r: Relay; o: Oid): RelayEntity = RelayEntity(label: label, relay: r, oid: o) @@ -170,10 +172,12 @@ proc rewriteCapIn(relay; facet; n: WireRef, imported: var seq[WireSymbol]): Cap case n.orKind of WireRefKind.mine: var e = relay.imported.grab(n.mine.oid) - if e.isNil: e = newWireSymbol( - relay.imported, - n.mine.oid, - newCap(facet, newRelayEntity("rewriteCapIn", relay, n.mine.oid))) + if e.isNil: + e = newWireSymbol( + relay.imported, + n.mine.oid, + newCap(facet, newRelayEntity("rewriteCapIn", relay, n.mine.oid)), + ) imported.add e result = e.cap of WireRefKind.yours: