Reduce methods to proc pointers
This commit is contained in:
parent
8cf7beeb0d
commit
3b9bbdf0fe
|
@ -30,8 +30,17 @@ type
|
|||
Caveat = sturdy.Caveat[Ref]
|
||||
Rewrite = sturdy.Rewrite[Ref]
|
||||
|
||||
PublishProc* = proc (e: Entity; turn: var Turn; v: Assertion; h: Handle) {.gcsafe.}
|
||||
RetractProc* = proc (e: Entity; turn: var Turn; h: Handle) {.gcsafe.}
|
||||
MessageProc* = proc (e: Entity; turn: var Turn; v: Assertion) {.gcsafe.}
|
||||
SyncProc* = proc (e: Entity; turn: var Turn; peer: Ref) {.gcsafe.}
|
||||
|
||||
Entity* = ref object of RootObj
|
||||
oid*: Oid # oid is how Entities are identified over the wire
|
||||
publishImpl*: PublishProc
|
||||
retractImpl*: RetractProc
|
||||
messageImpl*: MessageProc
|
||||
syncImpl*: SyncProc
|
||||
|
||||
Ref* {.unpreservable.} = ref object # TODO: rename
|
||||
relay*: Facet
|
||||
|
@ -82,17 +91,36 @@ using
|
|||
turn: var Turn
|
||||
action: TurnAction
|
||||
|
||||
method publish(e: Entity; turn: var Turn; v: Assertion; h: Handle) {.base.} =
|
||||
raiseAssert "Entity does not implement publish"
|
||||
proc setProcs*(
|
||||
result: Entity,
|
||||
publish: PublishProc = nil,
|
||||
retract: RetractProc = nil,
|
||||
message: MessageProc = nil,
|
||||
sync: SyncProc = nil) {.inline.} =
|
||||
result.publishImpl = publish
|
||||
result.retractImpl = retract
|
||||
result.messageImpl = message
|
||||
result.syncImpl = sync
|
||||
|
||||
method retract(e: Entity; turn: var Turn; h: Handle) {.base.} =
|
||||
raiseAssert "Entity does not implement retract"
|
||||
proc newEntity*(
|
||||
publish: PublishProc = nil,
|
||||
retract: RetractProc = nil,
|
||||
message: MessageProc = nil,
|
||||
sync: SyncProc = nil): Entity =
|
||||
new result
|
||||
result.setProcs(publish, retract, message, sync)
|
||||
|
||||
method message(e: Entity; turn: var Turn; v: Assertion) {.base.} =
|
||||
raiseAssert "Entity does not implement message"
|
||||
proc publish*(e: Entity; turn: var Turn; v: Assertion; h: Handle) =
|
||||
if not e.publishImpl.isNil: e.publishImpl(e, turn, v, h)
|
||||
|
||||
method sync(e: Entity; turn: var Turn; peer: Ref) {.base.} =
|
||||
raiseAssert "Entity does not implement sync"
|
||||
proc retract*(e: Entity; turn: var Turn; h: Handle) =
|
||||
if not e.retractImpl.isNil: e.retractImpl(e, turn, h)
|
||||
|
||||
proc message*(e: Entity; turn: var Turn; v: Assertion) =
|
||||
if not e.messageImpl.isNil: e.messageImpl(e, turn, v)
|
||||
|
||||
proc sync*(e: Entity; turn: var Turn; peer: Ref) =
|
||||
if not e.syncImpl.isNil: e.syncImpl(e, turn, peer)
|
||||
|
||||
proc labels(f: Facet): string =
|
||||
proc catLabels(f: Facet; labels: var string) =
|
||||
|
@ -287,6 +315,9 @@ proc message*(turn: var Turn; r: Ref; v: Assertion) =
|
|||
enqueue(turn, r.relay) do (turn: var Turn):
|
||||
r.target.message(turn, a)
|
||||
|
||||
proc message*[T](turn: var Turn; r: Ref; v: T) =
|
||||
message(turn, r, toPreserve(v, Ref))
|
||||
|
||||
proc sync(turn: var Turn; e: Entity; peer: Ref) =
|
||||
e.sync(turn, peer)
|
||||
# or turn.message(peer, true) ?
|
||||
|
|
|
@ -70,10 +70,11 @@ type
|
|||
action: TurnAction
|
||||
|
||||
type DuringEntity = ref object of Entity
|
||||
assertionMap: Table[Handle, DuringAction]
|
||||
cb: DuringProc
|
||||
assertionMap: Table[Handle, DuringAction]
|
||||
|
||||
method publish(de: DuringEntity; turn: var Turn; a: Assertion; h: Handle) =
|
||||
proc duringPublish(e: Entity; turn: var Turn; a: Assertion; h: Handle) =
|
||||
var de = DuringEntity(e)
|
||||
let action = de.cb(turn, a)
|
||||
# assert(not action.isNil "should have put in a no-op action")
|
||||
let g = de.assertionMap.getOrDefault h
|
||||
|
@ -86,7 +87,8 @@ method publish(de: DuringEntity; turn: var Turn; a: Assertion; h: Handle) =
|
|||
of act:
|
||||
raiseAssert("during: duplicate handle in publish: " & $h)
|
||||
|
||||
method retract(de: DuringEntity; turn: var Turn; h: Handle) =
|
||||
proc duringRetract(e: Entity; turn: var Turn; h: Handle) =
|
||||
var de = DuringEntity(e)
|
||||
let g = de.assertionMap.getOrDefault h
|
||||
case g.kind
|
||||
of null:
|
||||
|
@ -99,6 +101,7 @@ method retract(de: DuringEntity; turn: var Turn; h: Handle) =
|
|||
|
||||
proc during*(cb: DuringProc): DuringEntity =
|
||||
result = DuringEntity(cb: cb)
|
||||
result.setProcs(publish = duringPublish, retract = duringRetract)
|
||||
|
||||
proc observe*(turn: var Turn; ds: Ref; pat: Pattern; e: Entity): Handle =
|
||||
publish(turn, ds, Observe(pattern: pat, observer: embed newRef(turn, e)))
|
||||
|
|
|
@ -10,8 +10,10 @@ type Oid = sturdy.Oid
|
|||
|
||||
type
|
||||
Assertion = Preserve[Ref]
|
||||
WireAssertion = Preserve[WireRef]
|
||||
WireRef = sturdy.WireRef[Ref]
|
||||
WireAssertion = Preserve[WireRef]
|
||||
Event = protocol.Event[WireRef]
|
||||
TurnEvent = protocol.TurnEvent[WireRef]
|
||||
Packet = protocol.Packet[WireRef]
|
||||
|
||||
Turn = actors.Turn
|
||||
|
@ -74,32 +76,36 @@ type
|
|||
label: string
|
||||
relay: Relay
|
||||
|
||||
#[
|
||||
proc newSyncPeerEntity(r: Relay; p: Ref): SyncPeerEntity =
|
||||
SyncPeerEntity(relay: r, peer: p)
|
||||
]#
|
||||
|
||||
proc releaseRefOut(r: Relay; e: WireSymbol) =
|
||||
r.exported.drop e
|
||||
|
||||
method publish(se: SyncPeerEntity; t: var Turn; v: Assertion; h: Handle) =
|
||||
proc syncPeerPublish(e: Entity; t: var Turn; v: Assertion; h: Handle) =
|
||||
var se = SyncPeerEntity(e)
|
||||
se.handleMap[h] = publish(t, se.peer, v)
|
||||
|
||||
method retract(se: SyncPeerEntity; t: var Turn; h: Handle) =
|
||||
proc syncPeerRetract(e: Entity; t: var Turn; h: Handle) =
|
||||
var se = SyncPeerEntity(e)
|
||||
var other: Handle
|
||||
if se.handleMap.pop(h, other):
|
||||
retract(t, other)
|
||||
|
||||
method message(se: SyncPeerEntity; t: var Turn; v: Assertion) =
|
||||
proc syncPeerMessage(e: Entity; t: var Turn; v: Assertion) =
|
||||
var se = SyncPeerEntity(e)
|
||||
if not se.e.isNil:
|
||||
se.relay.releaseRefOut(se.e)
|
||||
message(t, se.peer, v)
|
||||
|
||||
method sync(se: SyncPeerEntity; t: var Turn; peer: Ref) =
|
||||
proc syncPeerSync(e: Entity; t: var Turn; peer: Ref) =
|
||||
var se = SyncPeerEntity(e)
|
||||
sync(t, se.peer, peer)
|
||||
|
||||
proc newRelayEntity(label: string; r: Relay; o: Oid): RelayEntity =
|
||||
RelayEntity(label: label, relay: r, oid: o)
|
||||
proc newSyncPeerEntity(r: Relay; p: Ref): SyncPeerEntity =
|
||||
result = SyncPeerEntity(relay: r, peer: p)
|
||||
result.setProcs(
|
||||
syncPeerPublish,
|
||||
syncPeerRetract,
|
||||
syncPeerMessage,
|
||||
syncPeerSync)
|
||||
|
||||
#[
|
||||
proc `$`(ws: WireSymbol): string =
|
||||
|
@ -148,7 +154,7 @@ proc send(r: Relay; msg: seq[byte]): Future[void] =
|
|||
assert(not r.packetWriter.isNil, "missing packetWriter proc")
|
||||
r.packetWriter(msg)
|
||||
|
||||
proc send(r: Relay; rOid: protocol.Oid; m: Event[WireRef]) =
|
||||
proc send(r: Relay; rOid: protocol.Oid; m: Event) =
|
||||
if r.pendingTurn.len == 0:
|
||||
callSoon:
|
||||
r.facet.run do (turn: var Turn):
|
||||
|
@ -158,42 +164,51 @@ proc send(r: Relay; rOid: protocol.Oid; m: Event[WireRef]) =
|
|||
echo "C: ", pkt
|
||||
#asyncCheck(turn, r.send(encode pkt))
|
||||
asyncCheck(turn, r.send(cast[seq[byte]](pkt)))
|
||||
r.pendingTurn.add TurnEvent[WireRef](oid: rOid, event: m)
|
||||
r.pendingTurn.add TurnEvent(oid: rOid, event: m)
|
||||
|
||||
proc send(re: RelayEntity; ev: Event) =
|
||||
send(re.relay, protocol.Oid re.oid, ev)
|
||||
|
||||
method publish(re: RelayEntity; t: var Turn; v: Assertion; h: Handle) =
|
||||
var ev = protocol.Event[WireRef](
|
||||
orKind: EventKind.Assert,
|
||||
`assert`: protocol.Assert[WireRef](
|
||||
assertion: re.relay.register(v, h),
|
||||
handle: h))
|
||||
re.send ev
|
||||
proc relayPublish(e: Entity; t: var Turn; v: Assertion; h: Handle) =
|
||||
var
|
||||
re = RelayEntity(e)
|
||||
re.send Event(
|
||||
orKind: EventKind.Assert,
|
||||
`assert`: protocol.Assert[WireRef](
|
||||
assertion: re.relay.register(v, h),
|
||||
handle: h))
|
||||
|
||||
method retract(re: RelayEntity; t: var Turn; h: Handle) =
|
||||
proc relayRetract(e: Entity; t: var Turn; h: Handle) =
|
||||
var re = RelayEntity(e)
|
||||
re.relay.deregister h
|
||||
re.send Event[WireRef](
|
||||
re.send Event(
|
||||
orKind: EventKind.Retract,
|
||||
retract: Retract(handle: h))
|
||||
|
||||
method message(re: RelayEntity; turn: var Turn; msg: Assertion) =
|
||||
var ev = Event[WireRef](orKind: EventKind.Message)
|
||||
var (body, _) = rewriteOut(re.relay, msg, true)
|
||||
proc relayMessage(e: Entity; turn: var Turn; msg: Assertion) =
|
||||
var
|
||||
re = RelayEntity(e)
|
||||
ev = Event(orKind: EventKind.Message)
|
||||
(body, _) = rewriteOut(re.relay, msg, true)
|
||||
ev.message = Message[WireRef](body: body)
|
||||
re.send ev
|
||||
|
||||
method sync(re: RelayEntity; turn: var Turn; peer: Ref) =
|
||||
proc relaySync(e: Entity; turn: var Turn; peer: Ref) =
|
||||
var
|
||||
peerEntity = SyncPeerEntity(relay: re.relay, peer: peer)
|
||||
re = RelayEntity(e)
|
||||
peerEntity = newSyncPeerEntity(re.relay, peer)
|
||||
exported: seq[WireSymbol]
|
||||
discard rewriteRefOut(re.relay, turn.newRef(peerEntity), false, exported)
|
||||
# TODO: discard?
|
||||
peerEntity.e = exported[0]
|
||||
re.send Event[WireRef](
|
||||
re.send Event(
|
||||
orKind: EventKind.Sync,
|
||||
sync: Sync[WireRef](peer: embed toPreserve(false, WireRef))) # TODO: send the WireRef?
|
||||
|
||||
proc newRelayEntity(label: string; r: Relay; o: Oid): RelayEntity =
|
||||
result = RelayEntity(label: label, relay: r, oid: o)
|
||||
result.setProcs(relayPublish, relayRetract, relayMessage, relaySync)
|
||||
|
||||
using
|
||||
relay: Relay
|
||||
facet: Facet
|
||||
|
@ -231,7 +246,7 @@ proc rewriteIn(relay; facet; a: Preserve[WireRef]):
|
|||
|
||||
proc close(r: Relay) = discard
|
||||
|
||||
proc dispatch(relay: Relay; turn: var Turn; `ref`: Ref; event: Event[WireRef]) =
|
||||
proc dispatch(relay: Relay; turn: var Turn; `ref`: Ref; event: Event) =
|
||||
case event.orKind
|
||||
of EventKind.Assert:
|
||||
let (a, imported) = rewriteIn(relay, turn.activeFacet, event.assert.assertion)
|
||||
|
@ -328,11 +343,13 @@ import protocols/gatekeeper
|
|||
|
||||
type ShutdownEntity = ref object of Entity
|
||||
|
||||
method publish(e: ShutdownEntity; t: var Turn; v: Assertion; h: Handle) = discard
|
||||
|
||||
method retract(e: ShutdownEntity; turn: var Turn; h: Handle) =
|
||||
proc shutdownRetract(e: Entity; turn: var Turn; h: Handle) =
|
||||
stopActor(turn)
|
||||
|
||||
proc newShutdownEntity(): ShutdownEntity =
|
||||
new result
|
||||
result.setProcs(retract = shutdownRetract)
|
||||
|
||||
type
|
||||
SturdyRef = sturdy.SturdyRef[Ref]
|
||||
Resolve = gatekeeper.Resolve[Ref]
|
||||
|
@ -348,7 +365,7 @@ proc connectUnix*(turn: var Turn; path: string; cap: SturdyRef; bootProc: During
|
|||
const recvSize = 4096
|
||||
var shutdownRef: Ref
|
||||
let reenable = turn.activeFacet.preventInertCheck()
|
||||
let connectionClosedRef = newRef(turn, ShutdownEntity())
|
||||
let connectionClosedRef = newRef(turn, newShutdownEntity())
|
||||
proc setup(turn: var Turn; relay: Relay) =
|
||||
let facet = turn.activeFacet
|
||||
proc recvCb(pktFut: Future[string]) {.gcsafe.} =
|
||||
|
@ -362,7 +379,7 @@ proc connectUnix*(turn: var Turn; path: string; cap: SturdyRef; bootProc: During
|
|||
socket.recv(recvSize).addCallback(recvCb)
|
||||
turn.activeFacet.actor.atExit do (turn: var Turn): close(socket)
|
||||
discard publish(turn, connectionClosedRef, true)
|
||||
shutdownRef = newRef(turn, ShutdownEntity())
|
||||
shutdownRef = newRef(turn, newShutdownEntity())
|
||||
var fut = newFuture[void]"connectUnix"
|
||||
connectUnix(socket, path).addCallback do (f: Future[void]):
|
||||
read f
|
||||
|
|
Loading…
Reference in New Issue