From 432b7bdf0549689670027dc913e045b346f2413c Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Mon, 12 Jul 2021 17:41:12 +0200 Subject: [PATCH] Immediate self-messaging; flush message for relay --- local-protocols/schema-bundle.bin | 2 +- local-protocols/schemas/tunnelRelay.prs | 2 +- src/actor.rs | 121 +++++++++++++++--------- src/relay.rs | 12 +-- 4 files changed, 84 insertions(+), 53 deletions(-) diff --git a/local-protocols/schema-bundle.bin b/local-protocols/schema-bundle.bin index ed6f0fb..46c4d91 100644 --- a/local-protocols/schema-bundle.bin +++ b/local-protocols/schema-bundle.bin @@ -1,3 +1,3 @@ “³bundle·µ³ tunnelRelay„“³schema·³version‘³ definitions·³Input“³orµµ±eof“³lit³eof„„µ±packet“³rec“³lit³packet„“³tupleµ“³named³bs“³atom³ ByteString„„„„„„µ±segment“³rec“³lit³segment„“³tupleµ“³named³bs“³atom³ -ByteString„„„„„„„„³Output“³rec“³lit³event„“³tupleµ“³named³oid“³refµ³sturdy„³Oid„„“³named³event“³refµ³internalProtocol„³Event„„„„„³SyncGc“³rec“³lit³sync-gc„“³tupleµ“³named³peer“³embedded“³lit„„„„„„³ RelayProtocol“³orµµ±Input“³refµ„³Input„„µ±Output“³refµ„³Output„„µ±SyncGc“³refµ„³SyncGc„„„„„³ embeddedType“³refµ³ EntityRef„³Ref„„„„„ \ No newline at end of file +ByteString„„„„„„„„³Output“³rec“³lit³event„“³tupleµ“³named³oid“³refµ³sturdy„³Oid„„“³named³event“³refµ³internalProtocol„³Event„„„„„³SyncGc“³rec“³lit³sync-gc„“³tupleµ“³named³peer“³embedded“³lit„„„„„„³ RelayProtocol“³orµµ±Input“³refµ„³Input„„µ±Output“³refµ„³Output„„µ±SyncGc“³refµ„³SyncGc„„µ±flush“³lit³flush„„„„„³ embeddedType“³refµ³ EntityRef„³Ref„„„„„ \ No newline at end of file diff --git a/local-protocols/schemas/tunnelRelay.prs b/local-protocols/schemas/tunnelRelay.prs index 66fe602..d7d834d 100644 --- a/local-protocols/schemas/tunnelRelay.prs +++ b/local-protocols/schemas/tunnelRelay.prs @@ -1,7 +1,7 @@ version 1 . embeddedType EntityRef.Ref . -RelayProtocol = Input / Output / SyncGc . +RelayProtocol = Input / Output / SyncGc / =flush . Input = =eof / / . Output = . diff --git a/src/actor.rs b/src/actor.rs index 0ce4397..c1ca918 100644 --- a/src/actor.rs +++ b/src/actor.rs @@ -1,4 +1,3 @@ -use futures::Future; pub use futures::future::BoxFuture; pub use std::future::ready; @@ -54,14 +53,20 @@ pub trait Entity: Send { } } -type OutboundAssertions = Map>; +enum Destination { + ImmediateSelf(Oid), + Remote(Arc), +} + +type OutboundAssertions = Map; +type PendingEventQueue = Vec<(Arc, Event)>; // This is what other implementations call a "Turn", renamed here to // avoid conflicts with schemas::internal_protocol::Turn. pub struct Activation<'activation> { pub actor: &'activation mut Actor, - queues: HashMap, Event)>>, - turn_end_revisit_flag: bool, + queues: HashMap, + immediate_self: Vec, } #[derive(Debug)] @@ -119,26 +124,53 @@ impl<'activation> Activation<'activation> { Activation { actor, queues: HashMap::new(), - turn_end_revisit_flag: false, + immediate_self: Vec::new(), } } + fn immediate_oid(&self, r: &Arc) -> Oid { + if r.relay.actor_id != self.actor.actor_id { + panic!("Cannot use immediate_self to send to remote peers"); + } + r.target.clone() + } + pub fn assert(&mut self, r: &Arc, a: M) -> Handle where M: Into { let handle = crate::next_handle(); self.queue_for(r).push((Arc::clone(r), Event::Assert(Box::new( Assert { assertion: Assertion(a.into()), handle: handle.clone() })))); - self.actor.outbound_assertions.insert(handle.clone(), Arc::clone(r)); + self.actor.outbound_assertions.insert(handle.clone(), Destination::Remote(Arc::clone(r))); + handle + } + + pub fn assert_immediate_self(&mut self, r: &Arc, a: M) -> Handle where M: Into { + let oid = self.immediate_oid(r); + let handle = crate::next_handle(); + self.immediate_self.push(TurnEvent { + oid: oid.clone(), + event: Event::Assert(Box::new( + Assert { assertion: Assertion(a.into()), handle: handle.clone() })), + }); + self.actor.outbound_assertions.insert(handle.clone(), Destination::ImmediateSelf(oid)); handle } pub fn retract(&mut self, handle: Handle) { - if let Some(r) = self.actor.outbound_assertions.remove(&handle) { - self.retract_known_ref(r, handle) + if let Some(d) = self.actor.outbound_assertions.remove(&handle) { + self.retract_known_ref(d, handle) } } - pub fn retract_known_ref(&mut self, r: Arc, handle: Handle) { - self.queue_for(&r).push((r, Event::Retract(Box::new(Retract { handle })))); + fn retract_known_ref(&mut self, d: Destination, handle: Handle) { + match d { + Destination::Remote(r) => + self.queue_for(&r).push((r, Event::Retract(Box::new(Retract { handle })))), + Destination::ImmediateSelf(oid) => + self.immediate_self.push(TurnEvent { + oid, + event: Event::Retract(Box::new(Retract { handle })), + }), + } } pub fn message(&mut self, r: &Arc, m: M) where M: Into { @@ -146,19 +178,25 @@ impl<'activation> Activation<'activation> { Message { body: Assertion(m.into()) })))) } + pub fn message_immediate_self(&mut self, r: &Arc, m: M) where M: Into { + self.immediate_self.push(TurnEvent { + oid: self.immediate_oid(r), + event: Event::Message(Box::new(Message { body: Assertion(m.into()) })), + }) + } + pub fn sync(&mut self, r: &Arc, peer: Arc) { self.queue_for(r).push((Arc::clone(r), Event::Sync(Box::new(Sync { peer })))); } - pub fn set_turn_end_flag(&mut self) { - self.turn_end_revisit_flag = true; - } - - fn queue_for(&mut self, r: &Arc) -> &mut Vec<(Arc, Event)> { + fn queue_for(&mut self, r: &Arc) -> &mut PendingEventQueue { self.queues.entry(r.relay.actor_id).or_default() } fn deliver(&mut self) { + if !self.immediate_self.is_empty() { + panic!("Unprocessed immediate_self events remain at deliver() time"); + } for (_actor_id, turn) in std::mem::take(&mut self.queues).into_iter() { if turn.len() == 0 { continue; } let first_ref = Arc::clone(&turn[0].0); @@ -189,9 +227,6 @@ impl<'activation> Activation<'activation> { impl<'activation> Drop for Activation<'activation> { fn drop(&mut self) { - if self.turn_end_revisit_flag { - panic!("turn_end_revisit_flag is set"); - } self.deliver() } } @@ -422,35 +457,31 @@ impl Actor { self.oid_map.remove(&oid); Ok(false) } - SystemMessage::Turn(Turn(events)) => { + SystemMessage::Turn(Turn(mut events)) => { let mut t = Activation::for_actor(self); - let mut revisit_oids = Vec::new(); - for TurnEvent { oid, event } in events.into_iter() { - t.with_oid(&oid, |_| Ok(()), |t, e| match event { - Event::Assert(b) => { - let Assert { assertion: Assertion(assertion), handle } = *b; - e.assert(t, assertion, handle) - } - Event::Retract(b) => { - let Retract { handle } = *b; - e.retract(t, handle) - } - Event::Message(b) => { - let Message { body: Assertion(body) } = *b; - e.message(t, body) - } - Event::Sync(b) => { - let Sync { peer } = *b; - e.sync(t, peer) - } - })?; - if t.turn_end_revisit_flag { - t.turn_end_revisit_flag = false; - revisit_oids.push(oid); + loop { + for TurnEvent { oid, event } in events.into_iter() { + t.with_oid(&oid, |_| Ok(()), |t, e| match event { + Event::Assert(b) => { + let Assert { assertion: Assertion(assertion), handle } = *b; + e.assert(t, assertion, handle) + } + Event::Retract(b) => { + let Retract { handle } = *b; + e.retract(t, handle) + } + Event::Message(b) => { + let Message { body: Assertion(body) } = *b; + e.message(t, body) + } + Event::Sync(b) => { + let Sync { peer } = *b; + e.sync(t, peer) + } + })?; } - } - for oid in revisit_oids.into_iter() { - t.with_oid(&oid, |_| Ok(()), |t, e| e.turn_end(t))?; + events = std::mem::take(&mut t.immediate_self); + if events.is_empty() { break; } } t.actor.queue_depth.fetch_sub(1, Ordering::Relaxed); // see (***) in this file Ok(false) diff --git a/src/relay.rs b/src/relay.rs index 04bf01f..eff74ed 100644 --- a/src/relay.rs +++ b/src/relay.rs @@ -492,7 +492,8 @@ impl Entity for TunnelRelay { tunnel_relay::RelayProtocol::Output(b) => match *b { tunnel_relay::Output { oid, event } => { if self.pending_outbound.is_empty() { - t.set_turn_end_flag(); + t.message_immediate_self( + &self.self_ref, &tunnel_relay::RelayProtocol::Flush); } let turn_event = TurnEvent { oid: Oid(oid.0), @@ -509,16 +510,15 @@ impl Entity for TunnelRelay { } } } + tunnel_relay::RelayProtocol::Flush => { + let events = std::mem::take(&mut self.pending_outbound); + self.send_packet(Packet::Turn(Box::new(Turn(events))))? + } } } Ok(()) } - fn turn_end(&mut self, _t: &mut Activation) -> ActorResult { - let events = std::mem::take(&mut self.pending_outbound); - self.send_packet(Packet::Turn(Box::new(Turn(events)))) - } - fn exit_hook(&mut self, _t: &mut Activation, exit_status: &ActorResult) -> BoxFuture { if let Err(e) = exit_status { let e = e.clone();