Fix Ref rewriting

This commit is contained in:
Emery Hemingway 2022-03-16 10:34:29 -05:00
parent 697ef56a27
commit 829c0bf61a
4 changed files with 22 additions and 24 deletions

View File

@ -7,7 +7,7 @@ import ./syndicate/[actors, dataspaces, durings, patterns]
from ./syndicate/relays import connectStdio, connectUnix
export Assertion, Facet, Handle, Ref, Turn, TurnAction, bootDataspace, `?`, connectStdio, connectUnix, drop, grab, publish, replace, run
export Assertion, Facet, Handle, Ref, Turn, TurnAction, bootDataspace, `?`, `$`, connectStdio, connectUnix, drop, grab, message, publish, replace, run
type
PublishProc = proc (turn: var Turn; v: Assertion; h: Handle) {.closure.}

View File

@ -336,8 +336,8 @@ proc terminate(facet; turn: var Turn; orderly: bool) {.gcsafe.} =
parent.get.children.excl facet
block:
var turn = Turn(facet: facet, queues: turn.queues)
for child in facet.children:
child.terminate(turn, orderly)
while facet.children.len > 0:
facet.children.pop.terminate(turn, orderly)
if orderly:
for act in facet.shutdownActions:
act(turn)

View File

@ -34,7 +34,8 @@ proc grab*(mem: Membrane; key: Oid|Ref): WireSymbol =
proc drop*(mem: var Membrane; sym: WireSymbol) =
## Drop a `WireSymbol` from a `Membrane`.
when not defined(release): assert sym.mem == mem
when not defined(release): assert sym.mem == mem,
"cannot drop WireSymbol at the wrong Membrane"
assert sym.count > 0
dec sym.count
if sym.count < 1:
@ -43,7 +44,7 @@ proc drop*(mem: var Membrane; sym: WireSymbol) =
proc newWireSymbol*(mem: var Membrane; o: Oid; r: Ref): WireSymbol =
## Allocate a `WireSymbol` at a `Membrane`.
result = WireSymbol(oid: o, `ref`: r)
when not defined(release): result.mem = mem
result = WireSymbol(oid: o, `ref`: r, count: 1)
mem.byOid[result.oid] = result
mem.byRef[result.`ref`] = result
when not defined(release): result.mem = mem

View File

@ -73,28 +73,28 @@ proc newSyncPeerEntity(r: Relay; p: Ref): SyncPeerEntity =
SyncPeerEntity(relay: r, peer: p)
proc rewriteRefOut(relay: Relay; `ref`: Ref; transient: bool; exported: var seq[WireSymbol]): WireRef =
if `ref`.target of RelayEntity and `ref`.target.RelayEntity.relay == relay:
trace "do the rewriteRefOut that wasn't being done before"
result = WireRef(
orKind: WirerefKind.yours,
yours: WireRefYours[Ref](oid: `ref`.target.oid))
if `ref`.target of RelayEntity and `ref`.target.RelayEntity.relay == relay and `ref`.attenuation.len == 0:
WireRef(orKind: WireRefKind.yours, yours: WireRefYours[Ref](oid: `ref`.target.oid))
else:
var ws = grab(relay.exported, `ref`)
if ws.isNil:
assert(not transient, "Cannot send transient reference")
inc relay.nextLocalOid
doAssert(not transient, "Cannot send transient reference")
ws = newWireSymbol(relay.exported, relay.nextLocalOid, `ref`)
inc relay.nextLocalOid
exported.add ws
result = WireRef(
orKind: WireRefKind.mine,
mine: WireRefMine(oid: ws.oid))
WireRef(
orKind: WireRefKind.mine,
mine: WireRefMine(oid: ws.oid))
proc rewriteOut(relay: Relay; v: Assertion; transient: bool):
tuple[rewritten: WireAssertion, exported: seq[WireSymbol]] =
var exported: seq[WireSymbol]
var rewritten = mapEmbeds[Ref, WireRef](v) do (r: Ref) -> WireRef:
result = rewriteRefOut(relay, r, transient, exported)
(rewritten, exported)
result.rewritten = mapEmbeds[Ref, WireRef](v) do (r: Ref) -> WireRef:
rewriteRefOut(relay, r, transient, exported)
result.exported = exported
proc register(relay: Relay; v: Assertion): WireAssertion =
rewriteOut(relay, v, false).rewritten
proc register(relay: Relay; v: Assertion; h: Handle): WireAssertion =
var (rewritten, exported) = rewriteOut(relay, v, false)
@ -138,11 +138,8 @@ method retract(re: RelayEntity; t: var Turn; h: Handle) =
retract: Retract(handle: h))
method message(re: RelayEntity; turn: var Turn; msg: Assertion) =
var
ev = Event(orKind: EventKind.Message)
(body, _) = rewriteOut(re.relay, msg, true)
ev.message = Message[WireRef](body: body)
re.send ev
re.send Event(orKind: EventKind.Message,
message: Message[WireRef](body: register(re.relay, msg)))
method sync(re: RelayEntity; turn: var Turn; peer: Ref) =
var