Immediate self-messaging; flush message for relay

This commit is contained in:
Tony Garnock-Jones 2021-07-12 17:41:12 +02:00
parent d968eb34f2
commit 432b7bdf05
4 changed files with 84 additions and 53 deletions

View File

@ -1,3 +1,3 @@
´³bundle·µ³ tunnelRelay„´³schema·³version³ definitions·³Input´³orµµ±eof´³lit³eof„„µ±packet´³rec´³lit³packet„´³tupleµ´³named³bs´³atom³ ´³bundle·µ³ tunnelRelay„´³schema·³version³ definitions·³Input´³orµµ±eof´³lit³eof„„µ±packet´³rec´³lit³packet„´³tupleµ´³named³bs´³atom³
ByteString„„„„„„µ±segment´³rec´³lit³segment„´³tupleµ´³named³bs´³atom³ ByteString„„„„„„µ±segment´³rec´³lit³segment„´³tupleµ´³named³bs´³atom³
ByteString„„„„„„„„³Output´³rec´³lit³event„´³tupleµ´³named³oid´³refµ³sturdy„³Oid„„´³named³event´³refµ³internalProtocol„³Event„„„„„³SyncGc´³rec´³lit³sync-gc„´³tupleµ´³named³peer´³embedded´³lit<69>„„„„„„³ RelayProtocol´³orµµ±Input´³refµ„³Input„„µ±Output´³refµ„³Output„„µ±SyncGc´³refµ„³SyncGc„„„„„³ embeddedType´³refµ³ EntityRef„³Ref„„„„„ ByteString„„„„„„„„³Output´³rec´³lit³event„´³tupleµ´³named³oid´³refµ³sturdy„³Oid„„´³named³event´³refµ³internalProtocol„³Event„„„„„³SyncGc´³rec´³lit³sync-gc„´³tupleµ´³named³peer´³embedded´³lit<69>„„„„„„³ RelayProtocol´³orµµ±Input´³refµ„³Input„„µ±Output´³refµ„³Output„„µ±SyncGc´³refµ„³SyncGc„„µ±flush´³lit³flush„„„„„³ embeddedType´³refµ³ EntityRef„³Ref„„„„„

View File

@ -1,7 +1,7 @@
version 1 . version 1 .
embeddedType EntityRef.Ref . embeddedType EntityRef.Ref .
RelayProtocol = Input / Output / SyncGc . RelayProtocol = Input / Output / SyncGc / =flush .
Input = =eof / <packet @bs bytes> / <segment @bs bytes>. Input = =eof / <packet @bs bytes> / <segment @bs bytes>.
Output = <event @oid sturdy.Oid @event internalProtocol.Event>. Output = <event @oid sturdy.Oid @event internalProtocol.Event>.

View File

@ -1,4 +1,3 @@
use futures::Future;
pub use futures::future::BoxFuture; pub use futures::future::BoxFuture;
pub use std::future::ready; pub use std::future::ready;
@ -54,14 +53,20 @@ pub trait Entity: Send {
} }
} }
type OutboundAssertions = Map<Handle, Arc<Ref>>; enum Destination {
ImmediateSelf(Oid),
Remote(Arc<Ref>),
}
type OutboundAssertions = Map<Handle, Destination>;
type PendingEventQueue = Vec<(Arc<Ref>, Event)>;
// This is what other implementations call a "Turn", renamed here to // This is what other implementations call a "Turn", renamed here to
// avoid conflicts with schemas::internal_protocol::Turn. // avoid conflicts with schemas::internal_protocol::Turn.
pub struct Activation<'activation> { pub struct Activation<'activation> {
pub actor: &'activation mut Actor, pub actor: &'activation mut Actor,
queues: HashMap<ActorId, Vec<(Arc<Ref>, Event)>>, queues: HashMap<ActorId, PendingEventQueue>,
turn_end_revisit_flag: bool, immediate_self: Vec<TurnEvent>,
} }
#[derive(Debug)] #[derive(Debug)]
@ -119,26 +124,53 @@ impl<'activation> Activation<'activation> {
Activation { Activation {
actor, actor,
queues: HashMap::new(), queues: HashMap::new(),
turn_end_revisit_flag: false, immediate_self: Vec::new(),
} }
} }
fn immediate_oid(&self, r: &Arc<Ref>) -> Oid {
if r.relay.actor_id != self.actor.actor_id {
panic!("Cannot use immediate_self to send to remote peers");
}
r.target.clone()
}
pub fn assert<M>(&mut self, r: &Arc<Ref>, a: M) -> Handle where M: Into<Assertion> { pub fn assert<M>(&mut self, r: &Arc<Ref>, a: M) -> Handle where M: Into<Assertion> {
let handle = crate::next_handle(); let handle = crate::next_handle();
self.queue_for(r).push((Arc::clone(r), Event::Assert(Box::new( self.queue_for(r).push((Arc::clone(r), Event::Assert(Box::new(
Assert { assertion: Assertion(a.into()), handle: handle.clone() })))); Assert { assertion: Assertion(a.into()), handle: handle.clone() }))));
self.actor.outbound_assertions.insert(handle.clone(), Arc::clone(r)); self.actor.outbound_assertions.insert(handle.clone(), Destination::Remote(Arc::clone(r)));
handle
}
pub fn assert_immediate_self<M>(&mut self, r: &Arc<Ref>, a: M) -> Handle where M: Into<Assertion> {
let oid = self.immediate_oid(r);
let handle = crate::next_handle();
self.immediate_self.push(TurnEvent {
oid: oid.clone(),
event: Event::Assert(Box::new(
Assert { assertion: Assertion(a.into()), handle: handle.clone() })),
});
self.actor.outbound_assertions.insert(handle.clone(), Destination::ImmediateSelf(oid));
handle handle
} }
pub fn retract(&mut self, handle: Handle) { pub fn retract(&mut self, handle: Handle) {
if let Some(r) = self.actor.outbound_assertions.remove(&handle) { if let Some(d) = self.actor.outbound_assertions.remove(&handle) {
self.retract_known_ref(r, handle) self.retract_known_ref(d, handle)
} }
} }
pub fn retract_known_ref(&mut self, r: Arc<Ref>, handle: Handle) { fn retract_known_ref(&mut self, d: Destination, handle: Handle) {
self.queue_for(&r).push((r, Event::Retract(Box::new(Retract { handle })))); match d {
Destination::Remote(r) =>
self.queue_for(&r).push((r, Event::Retract(Box::new(Retract { handle })))),
Destination::ImmediateSelf(oid) =>
self.immediate_self.push(TurnEvent {
oid,
event: Event::Retract(Box::new(Retract { handle })),
}),
}
} }
pub fn message<M>(&mut self, r: &Arc<Ref>, m: M) where M: Into<Assertion> { pub fn message<M>(&mut self, r: &Arc<Ref>, m: M) where M: Into<Assertion> {
@ -146,19 +178,25 @@ impl<'activation> Activation<'activation> {
Message { body: Assertion(m.into()) })))) Message { body: Assertion(m.into()) }))))
} }
pub fn message_immediate_self<M>(&mut self, r: &Arc<Ref>, m: M) where M: Into<Assertion> {
self.immediate_self.push(TurnEvent {
oid: self.immediate_oid(r),
event: Event::Message(Box::new(Message { body: Assertion(m.into()) })),
})
}
pub fn sync(&mut self, r: &Arc<Ref>, peer: Arc<Ref>) { pub fn sync(&mut self, r: &Arc<Ref>, peer: Arc<Ref>) {
self.queue_for(r).push((Arc::clone(r), Event::Sync(Box::new(Sync { peer })))); self.queue_for(r).push((Arc::clone(r), Event::Sync(Box::new(Sync { peer }))));
} }
pub fn set_turn_end_flag(&mut self) { fn queue_for(&mut self, r: &Arc<Ref>) -> &mut PendingEventQueue {
self.turn_end_revisit_flag = true;
}
fn queue_for(&mut self, r: &Arc<Ref>) -> &mut Vec<(Arc<Ref>, Event)> {
self.queues.entry(r.relay.actor_id).or_default() self.queues.entry(r.relay.actor_id).or_default()
} }
fn deliver(&mut self) { fn deliver(&mut self) {
if !self.immediate_self.is_empty() {
panic!("Unprocessed immediate_self events remain at deliver() time");
}
for (_actor_id, turn) in std::mem::take(&mut self.queues).into_iter() { for (_actor_id, turn) in std::mem::take(&mut self.queues).into_iter() {
if turn.len() == 0 { continue; } if turn.len() == 0 { continue; }
let first_ref = Arc::clone(&turn[0].0); let first_ref = Arc::clone(&turn[0].0);
@ -189,9 +227,6 @@ impl<'activation> Activation<'activation> {
impl<'activation> Drop for Activation<'activation> { impl<'activation> Drop for Activation<'activation> {
fn drop(&mut self) { fn drop(&mut self) {
if self.turn_end_revisit_flag {
panic!("turn_end_revisit_flag is set");
}
self.deliver() self.deliver()
} }
} }
@ -422,35 +457,31 @@ impl Actor {
self.oid_map.remove(&oid); self.oid_map.remove(&oid);
Ok(false) Ok(false)
} }
SystemMessage::Turn(Turn(events)) => { SystemMessage::Turn(Turn(mut events)) => {
let mut t = Activation::for_actor(self); let mut t = Activation::for_actor(self);
let mut revisit_oids = Vec::new(); loop {
for TurnEvent { oid, event } in events.into_iter() { for TurnEvent { oid, event } in events.into_iter() {
t.with_oid(&oid, |_| Ok(()), |t, e| match event { t.with_oid(&oid, |_| Ok(()), |t, e| match event {
Event::Assert(b) => { Event::Assert(b) => {
let Assert { assertion: Assertion(assertion), handle } = *b; let Assert { assertion: Assertion(assertion), handle } = *b;
e.assert(t, assertion, handle) e.assert(t, assertion, handle)
} }
Event::Retract(b) => { Event::Retract(b) => {
let Retract { handle } = *b; let Retract { handle } = *b;
e.retract(t, handle) e.retract(t, handle)
} }
Event::Message(b) => { Event::Message(b) => {
let Message { body: Assertion(body) } = *b; let Message { body: Assertion(body) } = *b;
e.message(t, body) e.message(t, body)
} }
Event::Sync(b) => { Event::Sync(b) => {
let Sync { peer } = *b; let Sync { peer } = *b;
e.sync(t, peer) e.sync(t, peer)
} }
})?; })?;
if t.turn_end_revisit_flag {
t.turn_end_revisit_flag = false;
revisit_oids.push(oid);
} }
} events = std::mem::take(&mut t.immediate_self);
for oid in revisit_oids.into_iter() { if events.is_empty() { break; }
t.with_oid(&oid, |_| Ok(()), |t, e| e.turn_end(t))?;
} }
t.actor.queue_depth.fetch_sub(1, Ordering::Relaxed); // see (***) in this file t.actor.queue_depth.fetch_sub(1, Ordering::Relaxed); // see (***) in this file
Ok(false) Ok(false)

View File

@ -492,7 +492,8 @@ impl Entity for TunnelRelay {
tunnel_relay::RelayProtocol::Output(b) => match *b { tunnel_relay::RelayProtocol::Output(b) => match *b {
tunnel_relay::Output { oid, event } => { tunnel_relay::Output { oid, event } => {
if self.pending_outbound.is_empty() { if self.pending_outbound.is_empty() {
t.set_turn_end_flag(); t.message_immediate_self(
&self.self_ref, &tunnel_relay::RelayProtocol::Flush);
} }
let turn_event = TurnEvent { let turn_event = TurnEvent {
oid: Oid(oid.0), oid: Oid(oid.0),
@ -509,16 +510,15 @@ impl Entity for TunnelRelay {
} }
} }
} }
tunnel_relay::RelayProtocol::Flush => {
let events = std::mem::take(&mut self.pending_outbound);
self.send_packet(Packet::Turn(Box::new(Turn(events))))?
}
} }
} }
Ok(()) Ok(())
} }
fn turn_end(&mut self, _t: &mut Activation) -> ActorResult {
let events = std::mem::take(&mut self.pending_outbound);
self.send_packet(Packet::Turn(Box::new(Turn(events))))
}
fn exit_hook(&mut self, _t: &mut Activation, exit_status: &ActorResult) -> BoxFuture<ActorResult> { fn exit_hook(&mut self, _t: &mut Activation, exit_status: &ActorResult) -> BoxFuture<ActorResult> {
if let Err(e) = exit_status { if let Err(e) = exit_status {
let e = e.clone(); let e = e.clone();