From 20539da63b5beead963499493baa0007e565e6b1 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Sun, 25 Jul 2021 01:10:43 +0200 Subject: [PATCH] Use recent shared-state changes to avoid scheduling overhead in relay.rs by activating the relay actor right from the input loop --- src/relay.rs | 258 ++++++++++++++++++++++++--------------------------- 1 file changed, 121 insertions(+), 137 deletions(-) diff --git a/src/relay.rs b/src/relay.rs index 2015b2a..5f82e78 100644 --- a/src/relay.rs +++ b/src/relay.rs @@ -37,6 +37,7 @@ use std::convert::TryFrom; use std::io; use std::pin::Pin; use std::sync::Arc; +use std::sync::RwLock; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; @@ -47,19 +48,6 @@ use tokio::io::AsyncWriteExt; use tokio::sync::mpsc::{unbounded_channel, UnboundedSender, UnboundedReceiver}; -enum RelayInput { - Eof, - Packet(Vec), - Segment(Vec), -} - -enum RelayProtocol { - Input(RelayInput), - Output(sturdy::Oid, P::Event), - SyncGc(Arc), - Flush, -} - struct WireSymbol { oid: sturdy::Oid, obj: Arc, @@ -87,17 +75,17 @@ pub enum Output { Bytes(Pin>), } -type TunnelRelayRef = Arc>; +type TunnelRelayRef = Arc>>; // There are other kinds of relay. This one has exactly two participants connected to each other. pub struct TunnelRelay { self_ref: TunnelRelayRef, - input_buffer: BytesMut, inbound_assertions: Map>)>, outbound_assertions: Map>>, membranes: Membranes, pending_outbound: Vec, + self_entity: Arc>, output: UnboundedSender>>, } @@ -106,6 +94,10 @@ struct RelayEntity { oid: sturdy::Oid, } +struct TunnelRefEntity { + relay_ref: TunnelRelayRef, +} + //--------------------------------------------------------------------------- impl WireSymbol { @@ -186,9 +178,12 @@ impl TunnelRelay { initial_oid: Option, ) -> Option> { let (output_tx, output_rx) = unbounded_channel(); + let tr_ref = Arc::new(RwLock::new(None)); + let self_entity = t.state.create(TunnelRefEntity { + relay_ref: Arc::clone(&tr_ref), + }); let mut tr = TunnelRelay { - self_ref: t.state.create_inert(), - input_buffer: BytesMut::with_capacity(1024), + self_ref: Arc::clone(&tr_ref), output: output_tx, inbound_assertions: Map::new(), outbound_assertions: Map::new(), @@ -198,20 +193,52 @@ impl TunnelRelay { next_export_oid: 0, }, pending_outbound: Vec::new(), + self_entity: self_entity.clone(), }; if let Some(ir) = initial_ref { tr.membranes.export_ref(ir, true); } let result = initial_oid.map( - |io| Arc::clone(&tr.membranes.import_oid(t.state, &tr.self_ref, io).obj)); - let tr_ref = Arc::clone(&tr.self_ref); - tr_ref.become_entity(tr); - t.state.add_exit_hook(&tr_ref); + |io| Arc::clone(&tr.membranes.import_oid(t.state, &tr_ref, io).obj)); + *tr_ref.write().unwrap() = Some(tr); t.state.linked_task(crate::name!("writer"), output_loop(o, output_rx)); - t.state.linked_task(crate::name!("reader"), input_loop(i, tr_ref)); + t.state.linked_task(crate::name!("reader"), input_loop(t.actor.clone(), i, tr_ref)); + t.state.add_exit_hook(&self_entity); result } + fn handle_inbound_datagram(&mut self, t: &mut Activation, bs: &[u8]) -> ActorResult { + let mut src = BytesBinarySource::new(&bs); + let mut dec = ActivatedMembranes(t, &self.self_ref, &mut self.membranes); + let mut r = src.packed::<_, _Any, _>(&mut dec); + let item = P::Packet::deserialize(&mut r)?; + self.handle_inbound_packet(t, item) + } + + fn handle_inbound_stream(&mut self, t: &mut Activation, buf: &mut BytesMut) -> ActorResult { + loop { + let (e, count) = { + let mut src = BytesBinarySource::new(buf); + let mut dec = ActivatedMembranes(t, &self.self_ref, &mut self.membranes); + let mut r = src.packed::<_, _Any, _>(&mut dec); + let e = match P::Packet::deserialize(&mut r) { + Err(ParseError::Preserves(PreservesError::Io(e))) + if is_eof_io_error(&e) => + None, + result => Some(result?), + }; + (e, r.source.index) + }; + match e { + None => return Ok(()), + Some(item) => { + buf.advance(count); + self.handle_inbound_packet(t, item)?; + } + } + } + } + fn handle_inbound_packet(&mut self, t: &mut Activation, p: P::Packet) -> ActorResult { // tracing::trace!(packet = debug(&p), "-->"); match p { @@ -270,19 +297,23 @@ impl TunnelRelay { let P::Sync { peer } = *b; self.membranes.imported.acquire(&peer); struct SyncPeer { - tr: TunnelRelayRef, + relay_ref: TunnelRelayRef, peer: Arc, } impl Entity for SyncPeer { fn message(&mut self, t: &mut Activation, _a: Synced) -> ActorResult { self.peer.message(t, _Any::new(true)); - t.message(&self.tr, RelayProtocol::SyncGc( - Arc::clone(&self.peer))); + let mut g = self.relay_ref.write().expect("unpoisoned"); + let tr = g.as_mut().expect("initialized"); + if let Some(ws) = tr.membranes.imported.ref_map.get(&self.peer) { + let ws = Arc::clone(ws); // cloned to release the borrow to permit the release + tr.membranes.imported.release(&ws); + } Ok(()) } } let k = t.state.create(SyncPeer { - tr: Arc::clone(&self.self_ref), + relay_ref: Arc::clone(&self.self_ref), peer: Arc::clone(&peer), }); t.sync(&peer.underlying, k); @@ -338,6 +369,18 @@ impl TunnelRelay { let _ = self.output.send(LoanedItem::new(debtor, cost, bs)); Ok(()) } + + pub fn send_event(&mut self, t: &mut Activation, oid: sturdy::Oid, event: P::Event) -> ActorResult { + if self.pending_outbound.is_empty() { + t.message_for_myself(&self.self_entity, ()); + } + let turn_event = P::TurnEvent { + oid: P::Oid(oid.0), + event: self.handle_outbound_event(t, event)?, + }; + self.pending_outbound.push(turn_event); + Ok(()) + } } impl Membranes { @@ -454,34 +497,24 @@ impl DomainEncode for Membranes { } async fn input_loop( + ac: ActorRef, i: Input, relay: TunnelRelayRef, ) -> ActorResult { - #[must_use] - async fn s( - relay: &TunnelRelayRef, - debtor: &Arc, - m: RelayInput, - ) -> ActorResult { - debtor.ensure_clear_funds().await; - let relay = Arc::clone(relay); - external_event(&Arc::clone(&relay.mailbox), debtor, Box::new( - move |t| relay.with_entity(|e| e.message(t, RelayProtocol::Input(m))))) - } - let debtor = Debtor::new(crate::name!("input-loop")); - match i { Input::Packets(mut src) => { loop { + debtor.ensure_clear_funds().await; match src.next().await { - None => { - s(&relay, &debtor, RelayInput::Eof).await?; - return Ok(()); - } - Some(bs) => { - s(&relay, &debtor, RelayInput::Packet(bs?)).await?; - } + None => return Activation::for_actor(&ac, Arc::clone(&debtor), |t| { + Ok(t.state.shutdown()) + }), + Some(bs) => Activation::for_actor(&ac, Arc::clone(&debtor), |t| { + let mut g = relay.write().expect("unpoisoned"); + let tr = g.as_mut().expect("initialized"); + tr.handle_inbound_datagram(t, &bs?) + })?, } } } @@ -489,30 +522,28 @@ async fn input_loop( const BUFSIZE: usize = 65536; let mut buf = BytesMut::with_capacity(BUFSIZE); loop { + debtor.ensure_clear_funds().await; buf.reserve(BUFSIZE); let n = match r.read_buf(&mut buf).await { Ok(n) => n, Err(e) => if e.kind() == io::ErrorKind::ConnectionReset { - s(&relay, &debtor, RelayInput::Eof).await?; - return Ok(()); + return Activation::for_actor(&ac, Arc::clone(&debtor), |t| { + Ok(t.state.shutdown()) + }); } else { return Err(e)?; }, }; match n { - 0 => { - s(&relay, &debtor, RelayInput::Eof).await?; - return Ok(()); - } - _ => { - while buf.has_remaining() { - let bs = buf.chunk(); - let n = bs.len(); - s(&relay, &debtor, RelayInput::Segment(bs.to_vec())).await?; - buf.advance(n); - } - } + 0 => return Activation::for_actor(&ac, Arc::clone(&debtor), |t| { + Ok(t.state.shutdown()) + }), + _ => Activation::for_actor(&ac, Arc::clone(&debtor), |t| { + let mut g = relay.write().expect("unpoisoned"); + let tr = g.as_mut().expect("initialized"); + tr.handle_inbound_stream(t, &mut buf) + })?, } } } @@ -540,71 +571,20 @@ async fn output_loop( } } -impl Entity for TunnelRelay { - fn message(&mut self, t: &mut Activation, m: RelayProtocol) -> ActorResult { - match m { - RelayProtocol::Input(RelayInput::Eof) => { - t.state.shutdown(); - } - RelayProtocol::Input(RelayInput::Packet(bs)) => { - let mut src = BytesBinarySource::new(&bs); - let mut dec = ActivatedMembranes(t, &self.self_ref, &mut self.membranes); - let mut r = src.packed::<_, _Any, _>(&mut dec); - let item = P::Packet::deserialize(&mut r)?; - self.handle_inbound_packet(t, item)?; - } - RelayProtocol::Input(RelayInput::Segment(bs)) => { - self.input_buffer.extend_from_slice(&bs); - loop { - let (e, count) = { - let mut src = BytesBinarySource::new(&self.input_buffer); - let mut dec = ActivatedMembranes(t, &self.self_ref, &mut self.membranes); - let mut r = src.packed::<_, _Any, _>(&mut dec); - let e = match P::Packet::deserialize(&mut r) { - Err(ParseError::Preserves(PreservesError::Io(e))) - if is_eof_io_error(&e) => - None, - result => Some(result?), - }; - (e, r.source.index) - }; - match e { - None => break, - Some(item) => { - self.input_buffer.advance(count); - self.handle_inbound_packet(t, item)?; - } - } - } - } - RelayProtocol::Output(oid, event) => { - if self.pending_outbound.is_empty() { - t.message_for_myself(&self.self_ref, RelayProtocol::Flush); - } - let turn_event = P::TurnEvent { - oid: P::Oid(oid.0), - event: self.handle_outbound_event(t, event)?, - }; - self.pending_outbound.push(turn_event); - } - RelayProtocol::SyncGc(peer) => { - if let Some(ws) = self.membranes.imported.ref_map.get(&peer) { - let ws = Arc::clone(ws); // cloned to release the borrow to permit the release - self.membranes.imported.release(&ws); - } - } - RelayProtocol::Flush => { - let events = std::mem::take(&mut self.pending_outbound); - self.send_packet(&t.debtor(), events.len(), P::Packet::Turn(Box::new(P::Turn(events))))? - } - } - Ok(()) +impl Entity<()> for TunnelRefEntity { + fn message(&mut self, t: &mut Activation, _m: ()) -> ActorResult { + let mut g = self.relay_ref.write().expect("unpoisoned"); + let tr = g.as_mut().expect("initialized"); + let events = std::mem::take(&mut tr.pending_outbound); + tr.send_packet(&t.debtor(), events.len(), P::Packet::Turn(Box::new(P::Turn(events)))) } fn exit_hook(&mut self, t: &mut Activation, exit_status: &Arc) -> ActorResult { if let Err(e) = &**exit_status { let e = e.clone(); - self.send_packet(&t.debtor(), 1, P::Packet::Error(Box::new(e)))?; + let mut g = self.relay_ref.write().expect("unpoisoned"); + let tr = g.as_mut().expect("initialized"); + tr.send_packet(&t.debtor(), 1, P::Packet::Error(Box::new(e)))?; } Ok(()) } @@ -612,28 +592,32 @@ impl Entity for TunnelRelay { impl Entity<_Any> for RelayEntity { fn assert(&mut self, t: &mut Activation, a: _Any, h: Handle) -> ActorResult { - Ok(t.message(&self.relay_ref, RelayProtocol::Output( - self.oid.clone(), - P::Event::Assert(Box::new(P::Assert { - assertion: P::Assertion(a), - handle: P::Handle(h.into()), - }))))) + let mut g = self.relay_ref.write().expect("unpoisoned"); + let tr = g.as_mut().expect("initialized"); + tr.send_event(t, self.oid.clone(), P::Event::Assert(Box::new(P::Assert { + assertion: P::Assertion(a), + handle: P::Handle(h.into()), + }))) } fn retract(&mut self, t: &mut Activation, h: Handle) -> ActorResult { - Ok(t.message(&self.relay_ref, RelayProtocol::Output( - self.oid.clone(), - P::Event::Retract(Box::new(P::Retract { - handle: P::Handle(h.into()), - }))))) + let mut g = self.relay_ref.write().expect("unpoisoned"); + let tr = g.as_mut().expect("initialized"); + tr.send_event(t, self.oid.clone(), P::Event::Retract(Box::new(P::Retract { + handle: P::Handle(h.into()), + }))) } fn message(&mut self, t: &mut Activation, m: _Any) -> ActorResult { - Ok(t.message(&self.relay_ref, RelayProtocol::Output( - self.oid.clone(), - P::Event::Message(Box::new(P::Message { body: P::Assertion(m) }))))) + let mut g = self.relay_ref.write().expect("unpoisoned"); + let tr = g.as_mut().expect("initialized"); + tr.send_event(t, self.oid.clone(), P::Event::Message(Box::new(P::Message { + body: P::Assertion(m) + }))) } fn sync(&mut self, t: &mut Activation, peer: Arc>) -> ActorResult { - Ok(t.message(&self.relay_ref, RelayProtocol::Output( - self.oid.clone(), - P::Event::Sync(Box::new(P::Sync { peer: Cap::guard(&peer) }))))) + let mut g = self.relay_ref.write().expect("unpoisoned"); + let tr = g.as_mut().expect("initialized"); + tr.send_event(t, self.oid.clone(), P::Event::Sync(Box::new(P::Sync { + peer: Cap::guard(&peer) + }))) } }