Use recent shared-state changes to avoid scheduling overhead in relay.rs by activating the relay actor right from the input loop

This commit is contained in:
Tony Garnock-Jones 2021-07-25 01:10:43 +02:00
parent 35f510aa0b
commit 20539da63b
1 changed files with 121 additions and 137 deletions

View File

@ -37,6 +37,7 @@ use std::convert::TryFrom;
use std::io; use std::io;
use std::pin::Pin; use std::pin::Pin;
use std::sync::Arc; use std::sync::Arc;
use std::sync::RwLock;
use std::sync::atomic::AtomicUsize; use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
@ -47,19 +48,6 @@ use tokio::io::AsyncWriteExt;
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender, UnboundedReceiver}; use tokio::sync::mpsc::{unbounded_channel, UnboundedSender, UnboundedReceiver};
enum RelayInput {
Eof,
Packet(Vec<u8>),
Segment(Vec<u8>),
}
enum RelayProtocol {
Input(RelayInput),
Output(sturdy::Oid, P::Event),
SyncGc(Arc<Cap>),
Flush,
}
struct WireSymbol { struct WireSymbol {
oid: sturdy::Oid, oid: sturdy::Oid,
obj: Arc<Cap>, obj: Arc<Cap>,
@ -87,17 +75,17 @@ pub enum Output {
Bytes(Pin<Box<dyn AsyncWrite + Send>>), Bytes(Pin<Box<dyn AsyncWrite + Send>>),
} }
type TunnelRelayRef = Arc<Ref<RelayProtocol>>; type TunnelRelayRef = Arc<RwLock<Option<TunnelRelay>>>;
// There are other kinds of relay. This one has exactly two participants connected to each other. // There are other kinds of relay. This one has exactly two participants connected to each other.
pub struct TunnelRelay pub struct TunnelRelay
{ {
self_ref: TunnelRelayRef, self_ref: TunnelRelayRef,
input_buffer: BytesMut,
inbound_assertions: Map</* remote */ P::Handle, (/* local */ Handle, Vec<Arc<WireSymbol>>)>, inbound_assertions: Map</* remote */ P::Handle, (/* local */ Handle, Vec<Arc<WireSymbol>>)>,
outbound_assertions: Map<P::Handle, Vec<Arc<WireSymbol>>>, outbound_assertions: Map<P::Handle, Vec<Arc<WireSymbol>>>,
membranes: Membranes, membranes: Membranes,
pending_outbound: Vec<P::TurnEvent>, pending_outbound: Vec<P::TurnEvent>,
self_entity: Arc<Ref<()>>,
output: UnboundedSender<LoanedItem<Vec<u8>>>, output: UnboundedSender<LoanedItem<Vec<u8>>>,
} }
@ -106,6 +94,10 @@ struct RelayEntity {
oid: sturdy::Oid, oid: sturdy::Oid,
} }
struct TunnelRefEntity {
relay_ref: TunnelRelayRef,
}
//--------------------------------------------------------------------------- //---------------------------------------------------------------------------
impl WireSymbol { impl WireSymbol {
@ -186,9 +178,12 @@ impl TunnelRelay {
initial_oid: Option<sturdy::Oid>, initial_oid: Option<sturdy::Oid>,
) -> Option<Arc<Cap>> { ) -> Option<Arc<Cap>> {
let (output_tx, output_rx) = unbounded_channel(); let (output_tx, output_rx) = unbounded_channel();
let tr_ref = Arc::new(RwLock::new(None));
let self_entity = t.state.create(TunnelRefEntity {
relay_ref: Arc::clone(&tr_ref),
});
let mut tr = TunnelRelay { let mut tr = TunnelRelay {
self_ref: t.state.create_inert(), self_ref: Arc::clone(&tr_ref),
input_buffer: BytesMut::with_capacity(1024),
output: output_tx, output: output_tx,
inbound_assertions: Map::new(), inbound_assertions: Map::new(),
outbound_assertions: Map::new(), outbound_assertions: Map::new(),
@ -198,20 +193,52 @@ impl TunnelRelay {
next_export_oid: 0, next_export_oid: 0,
}, },
pending_outbound: Vec::new(), pending_outbound: Vec::new(),
self_entity: self_entity.clone(),
}; };
if let Some(ir) = initial_ref { if let Some(ir) = initial_ref {
tr.membranes.export_ref(ir, true); tr.membranes.export_ref(ir, true);
} }
let result = initial_oid.map( let result = initial_oid.map(
|io| Arc::clone(&tr.membranes.import_oid(t.state, &tr.self_ref, io).obj)); |io| Arc::clone(&tr.membranes.import_oid(t.state, &tr_ref, io).obj));
let tr_ref = Arc::clone(&tr.self_ref); *tr_ref.write().unwrap() = Some(tr);
tr_ref.become_entity(tr);
t.state.add_exit_hook(&tr_ref);
t.state.linked_task(crate::name!("writer"), output_loop(o, output_rx)); t.state.linked_task(crate::name!("writer"), output_loop(o, output_rx));
t.state.linked_task(crate::name!("reader"), input_loop(i, tr_ref)); t.state.linked_task(crate::name!("reader"), input_loop(t.actor.clone(), i, tr_ref));
t.state.add_exit_hook(&self_entity);
result result
} }
fn handle_inbound_datagram(&mut self, t: &mut Activation, bs: &[u8]) -> ActorResult {
let mut src = BytesBinarySource::new(&bs);
let mut dec = ActivatedMembranes(t, &self.self_ref, &mut self.membranes);
let mut r = src.packed::<_, _Any, _>(&mut dec);
let item = P::Packet::deserialize(&mut r)?;
self.handle_inbound_packet(t, item)
}
fn handle_inbound_stream(&mut self, t: &mut Activation, buf: &mut BytesMut) -> ActorResult {
loop {
let (e, count) = {
let mut src = BytesBinarySource::new(buf);
let mut dec = ActivatedMembranes(t, &self.self_ref, &mut self.membranes);
let mut r = src.packed::<_, _Any, _>(&mut dec);
let e = match P::Packet::deserialize(&mut r) {
Err(ParseError::Preserves(PreservesError::Io(e)))
if is_eof_io_error(&e) =>
None,
result => Some(result?),
};
(e, r.source.index)
};
match e {
None => return Ok(()),
Some(item) => {
buf.advance(count);
self.handle_inbound_packet(t, item)?;
}
}
}
}
fn handle_inbound_packet(&mut self, t: &mut Activation, p: P::Packet) -> ActorResult { fn handle_inbound_packet(&mut self, t: &mut Activation, p: P::Packet) -> ActorResult {
// tracing::trace!(packet = debug(&p), "-->"); // tracing::trace!(packet = debug(&p), "-->");
match p { match p {
@ -270,19 +297,23 @@ impl TunnelRelay {
let P::Sync { peer } = *b; let P::Sync { peer } = *b;
self.membranes.imported.acquire(&peer); self.membranes.imported.acquire(&peer);
struct SyncPeer { struct SyncPeer {
tr: TunnelRelayRef, relay_ref: TunnelRelayRef,
peer: Arc<Cap>, peer: Arc<Cap>,
} }
impl Entity<Synced> for SyncPeer { impl Entity<Synced> for SyncPeer {
fn message(&mut self, t: &mut Activation, _a: Synced) -> ActorResult { fn message(&mut self, t: &mut Activation, _a: Synced) -> ActorResult {
self.peer.message(t, _Any::new(true)); self.peer.message(t, _Any::new(true));
t.message(&self.tr, RelayProtocol::SyncGc( let mut g = self.relay_ref.write().expect("unpoisoned");
Arc::clone(&self.peer))); let tr = g.as_mut().expect("initialized");
if let Some(ws) = tr.membranes.imported.ref_map.get(&self.peer) {
let ws = Arc::clone(ws); // cloned to release the borrow to permit the release
tr.membranes.imported.release(&ws);
}
Ok(()) Ok(())
} }
} }
let k = t.state.create(SyncPeer { let k = t.state.create(SyncPeer {
tr: Arc::clone(&self.self_ref), relay_ref: Arc::clone(&self.self_ref),
peer: Arc::clone(&peer), peer: Arc::clone(&peer),
}); });
t.sync(&peer.underlying, k); t.sync(&peer.underlying, k);
@ -338,6 +369,18 @@ impl TunnelRelay {
let _ = self.output.send(LoanedItem::new(debtor, cost, bs)); let _ = self.output.send(LoanedItem::new(debtor, cost, bs));
Ok(()) Ok(())
} }
pub fn send_event(&mut self, t: &mut Activation, oid: sturdy::Oid, event: P::Event) -> ActorResult {
if self.pending_outbound.is_empty() {
t.message_for_myself(&self.self_entity, ());
}
let turn_event = P::TurnEvent {
oid: P::Oid(oid.0),
event: self.handle_outbound_event(t, event)?,
};
self.pending_outbound.push(turn_event);
Ok(())
}
} }
impl Membranes { impl Membranes {
@ -454,34 +497,24 @@ impl DomainEncode<P::_Ptr> for Membranes {
} }
async fn input_loop( async fn input_loop(
ac: ActorRef,
i: Input, i: Input,
relay: TunnelRelayRef, relay: TunnelRelayRef,
) -> ActorResult { ) -> ActorResult {
#[must_use]
async fn s(
relay: &TunnelRelayRef,
debtor: &Arc<Debtor>,
m: RelayInput,
) -> ActorResult {
debtor.ensure_clear_funds().await;
let relay = Arc::clone(relay);
external_event(&Arc::clone(&relay.mailbox), debtor, Box::new(
move |t| relay.with_entity(|e| e.message(t, RelayProtocol::Input(m)))))
}
let debtor = Debtor::new(crate::name!("input-loop")); let debtor = Debtor::new(crate::name!("input-loop"));
match i { match i {
Input::Packets(mut src) => { Input::Packets(mut src) => {
loop { loop {
debtor.ensure_clear_funds().await;
match src.next().await { match src.next().await {
None => { None => return Activation::for_actor(&ac, Arc::clone(&debtor), |t| {
s(&relay, &debtor, RelayInput::Eof).await?; Ok(t.state.shutdown())
return Ok(()); }),
} Some(bs) => Activation::for_actor(&ac, Arc::clone(&debtor), |t| {
Some(bs) => { let mut g = relay.write().expect("unpoisoned");
s(&relay, &debtor, RelayInput::Packet(bs?)).await?; let tr = g.as_mut().expect("initialized");
} tr.handle_inbound_datagram(t, &bs?)
})?,
} }
} }
} }
@ -489,30 +522,28 @@ async fn input_loop(
const BUFSIZE: usize = 65536; const BUFSIZE: usize = 65536;
let mut buf = BytesMut::with_capacity(BUFSIZE); let mut buf = BytesMut::with_capacity(BUFSIZE);
loop { loop {
debtor.ensure_clear_funds().await;
buf.reserve(BUFSIZE); buf.reserve(BUFSIZE);
let n = match r.read_buf(&mut buf).await { let n = match r.read_buf(&mut buf).await {
Ok(n) => n, Ok(n) => n,
Err(e) => Err(e) =>
if e.kind() == io::ErrorKind::ConnectionReset { if e.kind() == io::ErrorKind::ConnectionReset {
s(&relay, &debtor, RelayInput::Eof).await?; return Activation::for_actor(&ac, Arc::clone(&debtor), |t| {
return Ok(()); Ok(t.state.shutdown())
});
} else { } else {
return Err(e)?; return Err(e)?;
}, },
}; };
match n { match n {
0 => { 0 => return Activation::for_actor(&ac, Arc::clone(&debtor), |t| {
s(&relay, &debtor, RelayInput::Eof).await?; Ok(t.state.shutdown())
return Ok(()); }),
} _ => Activation::for_actor(&ac, Arc::clone(&debtor), |t| {
_ => { let mut g = relay.write().expect("unpoisoned");
while buf.has_remaining() { let tr = g.as_mut().expect("initialized");
let bs = buf.chunk(); tr.handle_inbound_stream(t, &mut buf)
let n = bs.len(); })?,
s(&relay, &debtor, RelayInput::Segment(bs.to_vec())).await?;
buf.advance(n);
}
}
} }
} }
} }
@ -540,71 +571,20 @@ async fn output_loop(
} }
} }
impl Entity<RelayProtocol> for TunnelRelay { impl Entity<()> for TunnelRefEntity {
fn message(&mut self, t: &mut Activation, m: RelayProtocol) -> ActorResult { fn message(&mut self, t: &mut Activation, _m: ()) -> ActorResult {
match m { let mut g = self.relay_ref.write().expect("unpoisoned");
RelayProtocol::Input(RelayInput::Eof) => { let tr = g.as_mut().expect("initialized");
t.state.shutdown(); let events = std::mem::take(&mut tr.pending_outbound);
} tr.send_packet(&t.debtor(), events.len(), P::Packet::Turn(Box::new(P::Turn(events))))
RelayProtocol::Input(RelayInput::Packet(bs)) => {
let mut src = BytesBinarySource::new(&bs);
let mut dec = ActivatedMembranes(t, &self.self_ref, &mut self.membranes);
let mut r = src.packed::<_, _Any, _>(&mut dec);
let item = P::Packet::deserialize(&mut r)?;
self.handle_inbound_packet(t, item)?;
}
RelayProtocol::Input(RelayInput::Segment(bs)) => {
self.input_buffer.extend_from_slice(&bs);
loop {
let (e, count) = {
let mut src = BytesBinarySource::new(&self.input_buffer);
let mut dec = ActivatedMembranes(t, &self.self_ref, &mut self.membranes);
let mut r = src.packed::<_, _Any, _>(&mut dec);
let e = match P::Packet::deserialize(&mut r) {
Err(ParseError::Preserves(PreservesError::Io(e)))
if is_eof_io_error(&e) =>
None,
result => Some(result?),
};
(e, r.source.index)
};
match e {
None => break,
Some(item) => {
self.input_buffer.advance(count);
self.handle_inbound_packet(t, item)?;
}
}
}
}
RelayProtocol::Output(oid, event) => {
if self.pending_outbound.is_empty() {
t.message_for_myself(&self.self_ref, RelayProtocol::Flush);
}
let turn_event = P::TurnEvent {
oid: P::Oid(oid.0),
event: self.handle_outbound_event(t, event)?,
};
self.pending_outbound.push(turn_event);
}
RelayProtocol::SyncGc(peer) => {
if let Some(ws) = self.membranes.imported.ref_map.get(&peer) {
let ws = Arc::clone(ws); // cloned to release the borrow to permit the release
self.membranes.imported.release(&ws);
}
}
RelayProtocol::Flush => {
let events = std::mem::take(&mut self.pending_outbound);
self.send_packet(&t.debtor(), events.len(), P::Packet::Turn(Box::new(P::Turn(events))))?
}
}
Ok(())
} }
fn exit_hook(&mut self, t: &mut Activation, exit_status: &Arc<ActorResult>) -> ActorResult { fn exit_hook(&mut self, t: &mut Activation, exit_status: &Arc<ActorResult>) -> ActorResult {
if let Err(e) = &**exit_status { if let Err(e) = &**exit_status {
let e = e.clone(); let e = e.clone();
self.send_packet(&t.debtor(), 1, P::Packet::Error(Box::new(e)))?; let mut g = self.relay_ref.write().expect("unpoisoned");
let tr = g.as_mut().expect("initialized");
tr.send_packet(&t.debtor(), 1, P::Packet::Error(Box::new(e)))?;
} }
Ok(()) Ok(())
} }
@ -612,28 +592,32 @@ impl Entity<RelayProtocol> for TunnelRelay {
impl Entity<_Any> for RelayEntity { impl Entity<_Any> for RelayEntity {
fn assert(&mut self, t: &mut Activation, a: _Any, h: Handle) -> ActorResult { fn assert(&mut self, t: &mut Activation, a: _Any, h: Handle) -> ActorResult {
Ok(t.message(&self.relay_ref, RelayProtocol::Output( let mut g = self.relay_ref.write().expect("unpoisoned");
self.oid.clone(), let tr = g.as_mut().expect("initialized");
P::Event::Assert(Box::new(P::Assert { tr.send_event(t, self.oid.clone(), P::Event::Assert(Box::new(P::Assert {
assertion: P::Assertion(a), assertion: P::Assertion(a),
handle: P::Handle(h.into()), handle: P::Handle(h.into()),
}))))) })))
} }
fn retract(&mut self, t: &mut Activation, h: Handle) -> ActorResult { fn retract(&mut self, t: &mut Activation, h: Handle) -> ActorResult {
Ok(t.message(&self.relay_ref, RelayProtocol::Output( let mut g = self.relay_ref.write().expect("unpoisoned");
self.oid.clone(), let tr = g.as_mut().expect("initialized");
P::Event::Retract(Box::new(P::Retract { tr.send_event(t, self.oid.clone(), P::Event::Retract(Box::new(P::Retract {
handle: P::Handle(h.into()), handle: P::Handle(h.into()),
}))))) })))
} }
fn message(&mut self, t: &mut Activation, m: _Any) -> ActorResult { fn message(&mut self, t: &mut Activation, m: _Any) -> ActorResult {
Ok(t.message(&self.relay_ref, RelayProtocol::Output( let mut g = self.relay_ref.write().expect("unpoisoned");
self.oid.clone(), let tr = g.as_mut().expect("initialized");
P::Event::Message(Box::new(P::Message { body: P::Assertion(m) }))))) tr.send_event(t, self.oid.clone(), P::Event::Message(Box::new(P::Message {
body: P::Assertion(m)
})))
} }
fn sync(&mut self, t: &mut Activation, peer: Arc<Ref<Synced>>) -> ActorResult { fn sync(&mut self, t: &mut Activation, peer: Arc<Ref<Synced>>) -> ActorResult {
Ok(t.message(&self.relay_ref, RelayProtocol::Output( let mut g = self.relay_ref.write().expect("unpoisoned");
self.oid.clone(), let tr = g.as_mut().expect("initialized");
P::Event::Sync(Box::new(P::Sync { peer: Cap::guard(&peer) }))))) tr.send_event(t, self.oid.clone(), P::Event::Sync(Box::new(P::Sync {
peer: Cap::guard(&peer)
})))
} }
} }