From a1766875fba7ad061f4f52e4a144cc9821b579f1 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Thu, 15 Jul 2021 13:13:22 +0200 Subject: [PATCH] A really interesting and apparently effective approach to internal flow control --- benches/bench_dataspace.rs | 18 ++-- examples/consumer.rs | 7 +- examples/pingpong.rs | 10 +- examples/producer.rs | 26 +++--- examples/state-consumer.rs | 7 +- examples/state-producer.rs | 47 +++++----- src/actor.rs | 180 ++++++++++++++++++++++++++---------- src/bin/syndicate-server.rs | 1 + src/relay.rs | 46 +++++---- 9 files changed, 215 insertions(+), 127 deletions(-) diff --git a/benches/bench_dataspace.rs b/benches/bench_dataspace.rs index 6247560..d84a86b 100644 --- a/benches/bench_dataspace.rs +++ b/benches/bench_dataspace.rs @@ -56,16 +56,17 @@ pub fn bench_pub(c: &mut Criterion) { let mut ac = Actor::new(); let ds = ac.create(Dataspace::new()); let shutdown = ac.create(ShutdownEntity); + let debtor = Debtor::new(syndicate::name!("sender-debtor")); ac.linked_task(syndicate::name!("sender"), async move { for _ in 0..iters { - ds.external_event(Event::Message(Box::new(Message { + ds.external_event(&debtor, Event::Message(Box::new(Message { body: Assertion(says(_Any::new("bench_pub"), Value::ByteString(vec![]).wrap())), - }))).await + }))).await? } - shutdown.external_event(Event::Message(Box::new(Message { + shutdown.external_event(&debtor, Event::Message(Box::new(Message { body: Assertion(_Any::new(true)), - }))).await; + }))).await?; Ok(()) }); ac.start(syndicate::name!("dataspace")).await.unwrap().unwrap(); @@ -123,16 +124,17 @@ pub fn bench_pub(c: &mut Criterion) { })), observer: shutdown, }); + let debtor = t.debtor.clone(); t.actor.linked_task(syndicate::name!("sender"), async move { for _ in 0..iters { - ds.external_event(Event::Message(Box::new(Message { + ds.external_event(&debtor, Event::Message(Box::new(Message { body: Assertion(says(_Any::new("bench_pub"), Value::ByteString(vec![]).wrap())), - }))).await + }))).await? } - ds.external_event(Event::Message(Box::new(Message { + ds.external_event(&debtor, Event::Message(Box::new(Message { body: Assertion(_Any::new(true)), - }))).await; + }))).await?; Ok(()) }); Ok(()) diff --git a/examples/consumer.rs b/examples/consumer.rs index 082b11e..5bb4efd 100644 --- a/examples/consumer.rs +++ b/examples/consumer.rs @@ -68,9 +68,10 @@ async fn main() -> Result<(), Box> { let mut stats_timer = interval(Duration::from_secs(1)); loop { stats_timer.tick().await; - consumer.external_event(Event::Message(Box::new(Message { - body: Assertion(_Any::new(true)), - }))).await; + consumer.external_event(&Debtor::new(syndicate::name!("debtor")), + Event::Message(Box::new(Message { + body: Assertion(_Any::new(true)), + }))).await?; } }); Ok(None) diff --git a/examples/pingpong.rs b/examples/pingpong.rs index 0e9588e..2f95733 100644 --- a/examples/pingpong.rs +++ b/examples/pingpong.rs @@ -190,15 +190,17 @@ async fn main() -> Result<(), Box> { let mut stats_timer = interval(Duration::from_secs(1)); loop { stats_timer.tick().await; - consumer.external_event(Event::Message(Box::new(Message { - body: Assertion(_Any::new(true)), - }))).await; + consumer.external_event(&Debtor::new(syndicate::name!("debtor")), + Event::Message(Box::new(Message { + body: Assertion(_Any::new(true)), + }))).await?; } }); if let PingPongMode::Ping(c) = &config.mode { let turn_count = c.turn_count; let action_count = c.action_count; + let debtor = t.debtor.clone(); t.actor.linked_task(syndicate::name!("boot-ping"), async move { let padding: _Any = Value::ByteString(vec![0; bytes_padding]).wrap(); for _ in 0..turn_count { @@ -211,7 +213,7 @@ async fn main() -> Result<(), Box> { body: Assertion(current_rec.clone()), }))); } - ds.external_events(events).await + ds.external_events(&debtor, events).await? } Ok(()) }); diff --git a/examples/producer.rs b/examples/producer.rs index b3705ca..909f1ac 100644 --- a/examples/producer.rs +++ b/examples/producer.rs @@ -1,11 +1,9 @@ -use std::sync::Arc; - use structopt::StructOpt; use syndicate::actor::*; use syndicate::relay; +use syndicate::schemas::internal_protocol::*; use syndicate::sturdy; -use syndicate::value::NestedValue; use syndicate::value::Value; use tokio::net::TcpStream; @@ -33,6 +31,7 @@ fn says(who: _Any, what: _Any) -> _Any { #[tokio::main] async fn main() -> Result<(), Box> { syndicate::convenient_logging()?; + syndicate::actor::start_debt_reporter(); Actor::new().boot(syndicate::name!("producer"), |t| Box::pin(async move { let config = Config::from_args(); let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?; @@ -40,18 +39,19 @@ async fn main() -> Result<(), Box> { relay::connect_stream(t, i, o, sturdyref, (), move |_state, t, ds| { let padding: _Any = Value::ByteString(vec![0; config.bytes_padding]).wrap(); let action_count = config.action_count; - - let producer = syndicate::entity(Arc::clone(&*INERT_REF)) - .on_message(move |self_ref, t, _m| { + let debtor = Debtor::new(syndicate::name!("debtor")); + t.actor.linked_task(syndicate::name!("sender"), async move { + loop { + debtor.ensure_clear_funds().await; + let mut events = Vec::new(); for _ in 0..action_count { - t.message(&ds, says(Value::from("producer").wrap(), padding.clone())); + events.push(Event::Message(Box::new(Message { + body: Assertion(says(Value::from("producer").wrap(), padding.clone())), + }))); } - t.message(&self_ref, _Any::new(true)); - Ok(()) - }) - .create_rec(t.actor, |_ac, self_ref, p_ref| *self_ref = Arc::clone(p_ref)); - - t.message(&producer, _Any::new(true)); + ds.external_events(&debtor, events).await?; + } + }); Ok(None) }); Ok(()) diff --git a/examples/state-consumer.rs b/examples/state-consumer.rs index 3ddb388..6b0793f 100644 --- a/examples/state-consumer.rs +++ b/examples/state-consumer.rs @@ -84,9 +84,10 @@ async fn main() -> Result<(), Box> { let mut stats_timer = interval(Duration::from_secs(1)); loop { stats_timer.tick().await; - consumer.external_event(Event::Message(Box::new(Message { - body: Assertion(_Any::new(true)), - }))).await; + consumer.external_event(&Debtor::new(syndicate::name!("debtor")), + Event::Message(Box::new(Message { + body: Assertion(_Any::new(true)), + }))).await?; } }); Ok(None) diff --git a/examples/state-producer.rs b/examples/state-producer.rs index dabe476..248df29 100644 --- a/examples/state-producer.rs +++ b/examples/state-producer.rs @@ -1,11 +1,9 @@ -use std::sync::Arc; - use structopt::StructOpt; use syndicate::actor::*; use syndicate::relay; +use syndicate::schemas::internal_protocol::*; use syndicate::sturdy; -use syndicate::value::NestedValue; use syndicate::value::Value; use tokio::net::TcpStream; @@ -24,29 +22,26 @@ async fn main() -> Result<(), Box> { let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?; let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split(); relay::connect_stream(t, i, o, sturdyref, (), move |_state, t, ds| { - let presence: _Any = Value::simple_record1( - "Present", - Value::from(std::process::id()).wrap()).wrap(); - - let mut handle = Some(t.assert(&ds, presence.clone())); - - let producer = syndicate::entity(Arc::clone(&*INERT_REF)) - .on_message(move |self_ref, t, m| { - match m.value().to_boolean()? { - true => { - handle = Some(t.assert(&ds, presence.clone())); - t.message(&self_ref, _Any::new(false)); - } - false => { - t.retract(handle.take().unwrap()); - t.message(&self_ref, _Any::new(true)); - } - } - Ok(()) - }) - .create_rec(t.actor, |_ac, self_ref, p_ref| *self_ref = Arc::clone(p_ref)); - - t.message(&producer, _Any::new(false)); + let debtor = Debtor::new(syndicate::name!("debtor")); + t.actor.linked_task(syndicate::name!("sender"), async move { + let presence: _Any = Value::simple_record1( + "Present", + Value::from(std::process::id()).wrap()).wrap(); + let handle = syndicate::next_handle(); + let assert_e = Event::Assert(Box::new(Assert { + assertion: Assertion(presence), + handle: handle.clone(), + })); + let retract_e = Event::Retract(Box::new(Retract { + handle, + })); + ds.external_event(&debtor, assert_e.clone()).await?; + loop { + debtor.ensure_clear_funds().await; + ds.external_event(&debtor, retract_e.clone()).await?; + ds.external_event(&debtor, assert_e.clone()).await?; + } + }); Ok(None) }); Ok(()) diff --git a/src/actor.rs b/src/actor.rs index ad86d16..deada94 100644 --- a/src/actor.rs +++ b/src/actor.rs @@ -17,11 +17,14 @@ use preserves::value::NestedValue; use std::boxed::Box; use std::collections::hash_map::HashMap; +use std::convert::TryInto; use std::sync::Arc; -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::RwLock; +use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering}; use tokio::select; use tokio::sync::mpsc::{unbounded_channel, UnboundedSender, UnboundedReceiver}; +// use tokio::sync::Notify; use tokio_util::sync::CancellationToken; use tracing; @@ -68,15 +71,30 @@ type PendingEventQueue = Vec<(Arc, Event)>; // avoid conflicts with schemas::internal_protocol::Turn. pub struct Activation<'activation> { pub actor: &'activation mut Actor, + pub debtor: Arc, queues: HashMap, immediate_self: Vec, } +#[derive(Debug)] +pub struct Debtor { + id: u64, + debt: Arc, + // notify: Notify, +} + +#[derive(Debug)] +pub struct LoanedItem { + pub debtor: Arc, + pub cost: usize, + pub item: T, +} + #[derive(Debug)] enum SystemMessage { Release, ReleaseOid(Oid), - Turn(Turn), + Turn(LoanedItem), Crash(Error), } @@ -84,7 +102,6 @@ pub struct Mailbox { pub actor_id: ActorId, pub mailbox_id: u64, tx: UnboundedSender, - queue_depth: Arc, mailbox_count: Arc, } @@ -92,7 +109,6 @@ pub struct Actor { actor_id: ActorId, tx: UnboundedSender, rx: Option>, - queue_depth: Arc, mailbox_count: Arc, outbound_assertions: OutboundAssertions, oid_map: Map>, @@ -115,6 +131,8 @@ pub struct Ref { //--------------------------------------------------------------------------- +static NEXT_DEBTOR_ID: AtomicU64 = AtomicU64::new(4); + preserves_schema::support::lazy_static! { pub static ref INERT_REF: Arc = { struct InertEntity; @@ -125,12 +143,40 @@ preserves_schema::support::lazy_static! { |t| Box::pin(ready(Ok(t.actor.shutdown())))); e }; + + pub static ref SYNDICATE_CREDIT: i64 = { + let credit = + std::env::var("SYNDICATE_CREDIT").unwrap_or("100".to_owned()) + .parse::().expect("Valid SYNDICATE_CREDIT environment variable"); + tracing::info!("Configured SYNDICATE_CREDIT = {}", credit); + credit + }; + + pub static ref DEBTORS: RwLock)>> = + RwLock::new(Map::new()); +} + +pub fn start_debt_reporter() { + Actor::new().boot(crate::name!("debt-reporter"), |t| Box::pin(async move { + t.actor.linked_task(crate::name!("tick"), async move { + let mut timer = tokio::time::interval(core::time::Duration::from_secs(1)); + loop { + timer.tick().await; + for (id, (name, debt)) in DEBTORS.read().unwrap().iter() { + let _enter = name.enter(); + tracing::info!(id, debt = debug(debt.load(Ordering::Relaxed))); + } + } + }); + Ok(()) + })); } impl<'activation> Activation<'activation> { - pub fn for_actor(actor: &'activation mut Actor) -> Self { + pub fn new(actor: &'activation mut Actor, debtor: Arc) -> Self { Activation { actor, + debtor, queues: HashMap::new(), immediate_self: Vec::new(), } @@ -217,8 +263,10 @@ impl<'activation> Activation<'activation> { if turn.len() == 0 { continue; } let first_ref = Arc::clone(&turn[0].0); let target = &first_ref.addr.mailbox; - target.send(Turn(turn.into_iter().map( - |(r, e)| TurnEvent { oid: r.addr.oid.clone(), event: e }).collect())); + let _ = target.send( + &self.debtor, + Turn(turn.into_iter().map( + |(r, e)| TurnEvent { oid: r.addr.oid.clone(), event: e }).collect())); } } @@ -247,15 +295,70 @@ impl<'activation> Drop for Activation<'activation> { } } -impl Mailbox { - pub fn send(&self, t: Turn) { - if let Ok(()) = self.tx.send(SystemMessage::Turn(t)) { - self.queue_depth.fetch_add(1, Ordering::Relaxed); - } +impl Debtor { + pub fn new(name: tracing::Span) -> Arc { + let id = NEXT_DEBTOR_ID.fetch_add(1, Ordering::Relaxed); + let debt = Arc::new(AtomicI64::new(0)); + DEBTORS.write().unwrap().insert(id, (name, Arc::clone(&debt))); + Arc::new(Debtor { + id, + debt, + // notify: Notify::new(), + }) } - pub fn current_queue_depth(&self) -> usize { - self.queue_depth.load(Ordering::Relaxed) + pub fn balance(&self) -> i64 { + self.debt.load(Ordering::Relaxed) + } + + pub fn borrow(&self, token_count: usize) { + let token_count: i64 = token_count.try_into().expect("manageable token count"); + self.debt.fetch_add(token_count, Ordering::Relaxed); + } + + pub fn repay(&self, token_count: usize) { + let token_count: i64 = token_count.try_into().expect("manageable token count"); + let _old_debt = self.debt.fetch_sub(token_count, Ordering::Relaxed); + // if _old_debt - token_count <= *SYNDICATE_CREDIT { + // self.notify.notify_one(); + // } + } + + pub async fn ensure_clear_funds(&self) { + let limit = *SYNDICATE_CREDIT; + tokio::task::yield_now().await; + while self.balance() > limit { + tokio::task::yield_now().await; + // self.notify.notified().await; + } + } +} + +impl Drop for Debtor { + fn drop(&mut self) { + DEBTORS.write().unwrap().remove(&self.id); + } +} + +impl LoanedItem { + pub fn new(debtor: &Arc, cost: usize, item: T) -> Self { + debtor.borrow(cost); + LoanedItem { debtor: Arc::clone(debtor), cost, item } + } +} + +impl Drop for LoanedItem { + fn drop(&mut self) { + self.debtor.repay(self.cost); + } +} + +impl Mailbox { + #[must_use] + pub fn send(&self, debtor: &Arc, t: Turn) -> ActorResult { + let token_count = t.0.len(); + self.tx.send(SystemMessage::Turn(LoanedItem::new(debtor, token_count, t))) + .map_err(|_| error("Target actor not running", _Any::new(false))) } } @@ -292,13 +395,12 @@ impl PartialOrd for Mailbox { impl Clone for Mailbox { fn clone(&self) -> Self { - let Mailbox { actor_id, tx, queue_depth, mailbox_count, .. } = self; + let Mailbox { actor_id, tx, mailbox_count, .. } = self; let _old_refcount = mailbox_count.fetch_add(1, Ordering::SeqCst); let new_mailbox = Mailbox { actor_id: *actor_id, mailbox_id: crate::next_mailbox_id(), tx: tx.clone(), - queue_depth: Arc::clone(queue_depth), mailbox_count: Arc::clone(mailbox_count), }; // tracing::trace!(old_mailbox = debug(&self), @@ -330,7 +432,6 @@ impl Actor { actor_id, tx, rx: Some(rx), - queue_depth: Arc::new(AtomicUsize::new(0)), mailbox_count: Arc::new(AtomicUsize::new(0)), outbound_assertions: Map::new(), oid_map: Map::new(), @@ -366,7 +467,6 @@ impl Actor { actor_id: self.actor_id, mailbox_id: crate::next_mailbox_id(), tx: self.tx.clone(), - queue_depth: Arc::clone(&self.queue_depth), mailbox_count: Arc::clone(&self.mailbox_count), }; // tracing::trace!(new_mailbox = debug(&new_mailbox), @@ -410,19 +510,9 @@ impl Actor { name.record("actor_id", &self.id()); tokio::spawn(async move { tracing::trace!("start"); - { - let queue_depth = Arc::clone(&self.queue_depth); - self.linked_task(crate::name!("queue-monitor"), async move { - let mut timer = tokio::time::interval(core::time::Duration::from_secs(1)); - loop { - timer.tick().await; - tracing::info!(queue_depth = debug(queue_depth.load(Ordering::Relaxed))); - } - }); - } let result = self.run(boot).await; { - let mut t = Activation::for_actor(&mut self); + let mut t = Activation::new(&mut self, Debtor::new(crate::name!("shutdown"))); for r in std::mem::take(&mut t.actor.exit_hooks) { match t.actor.oid_map.remove_entry(&r.addr.oid) { None => (), @@ -458,7 +548,7 @@ impl Actor { ) -> ActorResult { let _id = self.id(); // tracing::trace!(_id, "boot"); - boot(&mut Activation::for_actor(self)).await?; + boot(&mut Activation::new(self, Debtor::new(crate::name!("boot")))).await?; // tracing::trace!(_id, "run"); loop { match self.rx.as_mut().expect("present rx channel half").recv().await { @@ -469,11 +559,6 @@ impl Actor { if should_stop { return Ok(()); } - // We would have a loop calling try_recv until it answers "no more at - // present" here, to avoid decrementing queue_depth for every message - // (instead zeroing it on queue empty - it only needs to be approximate), - // but try_recv has been removed from mpsc at the time of writing. See - // https://github.com/tokio-rs/tokio/issues/3350 . (***) } } } @@ -494,8 +579,9 @@ impl Actor { self.oid_map.remove(&oid); Ok(false) } - SystemMessage::Turn(Turn(mut events)) => { - let mut t = Activation::for_actor(self); + SystemMessage::Turn(mut loaned_item) => { + let mut events = std::mem::take(&mut loaned_item.item.0); + let mut t = Activation::new(self, Arc::clone(&loaned_item.debtor)); loop { for TurnEvent { oid, event } in events.into_iter() { t.with_oid(&oid, |_| Ok(()), |t, e| match event { @@ -520,7 +606,6 @@ impl Actor { events = std::mem::take(&mut t.immediate_self); if events.is_empty() { break; } } - t.actor.queue_depth.fetch_sub(1, Ordering::Relaxed); // see (***) in this file Ok(false) } SystemMessage::Crash(e) => { @@ -581,31 +666,26 @@ impl Drop for Actor { let to_clear = std::mem::take(&mut self.outbound_assertions); { - let mut t = Activation::for_actor(self); + let mut t = Activation::new(self, Debtor::new(crate::name!("drop"))); for (handle, r) in to_clear.into_iter() { tracing::trace!(h = debug(&handle), "retract on termination"); t.retract_known_ref(r, handle); } } - // In future, could do this: - // tokio::spawn(async move { - // while let Some(m) = rx.recv().await { - // match m { ... } - // } - // }); - tracing::trace!("Actor::drop"); } } impl Ref { - pub async fn external_event(&self, event: Event) { - self.addr.mailbox.send(Turn(vec![TurnEvent { oid: self.addr.oid.clone(), event }])) + #[must_use] + pub async fn external_event(&self, debtor: &Arc, event: Event) -> ActorResult { + self.addr.mailbox.send(debtor, Turn(vec![TurnEvent { oid: self.addr.oid.clone(), event }])) } - pub async fn external_events(&self, events: Vec) { - self.addr.mailbox.send(Turn(events.into_iter().map(|event| TurnEvent { + #[must_use] + pub async fn external_events(&self, debtor: &Arc, events: Vec) -> ActorResult { + self.addr.mailbox.send(debtor, Turn(events.into_iter().map(|event| TurnEvent { oid: self.addr.oid.clone(), event, }).collect())) diff --git a/src/bin/syndicate-server.rs b/src/bin/syndicate-server.rs index 18dfedc..f89b801 100644 --- a/src/bin/syndicate-server.rs +++ b/src/bin/syndicate-server.rs @@ -32,6 +32,7 @@ use tungstenite::Message; #[tokio::main] async fn main() -> Result<(), Box> { syndicate::convenient_logging()?; + syndicate::actor::start_debt_reporter(); { const BRIGHT_GREEN: &str = "\x1b[92m"; diff --git a/src/relay.rs b/src/relay.rs index 55fe2b8..b8115e4 100644 --- a/src/relay.rs +++ b/src/relay.rs @@ -82,7 +82,7 @@ pub struct TunnelRelay outbound_assertions: Map>>, membranes: Membranes, pending_outbound: Vec, - output: UnboundedSender>, + output: UnboundedSender>>, } struct RelayEntity { @@ -208,7 +208,7 @@ impl TunnelRelay { Err(*b) }, Packet::Turn(b) => { - let t = &mut Activation::for_actor(t.actor); + let t = &mut Activation::new(t.actor, Arc::clone(&t.debtor)); let Turn(events) = *b; for TurnEvent { oid, event } in events { let target = match self.membranes.exported.oid_map.get(&sturdy::Oid(oid.0.clone())) { @@ -321,9 +321,9 @@ impl TunnelRelay { Ok(PackedWriter::encode::<_, _Any, _>(&mut self.membranes, &item)?) } - pub fn send_packet(&mut self, p: Packet) -> ActorResult { + pub fn send_packet(&mut self, debtor: &Arc, cost: usize, p: Packet) -> ActorResult { let bs = self.encode_packet(p)?; - let _ = self.output.send(bs); + let _ = self.output.send(LoanedItem::new(debtor, cost, bs)); Ok(()) } } @@ -445,20 +445,24 @@ pub async fn input_loop( i: Input, relay: Arc, ) -> ActorResult { - async fn s>(relay: &Arc, m: M) -> () { - relay.external_event(Event::Message(Box::new(Message { body: Assertion(m.into()) }))).await + #[must_use] + async fn s>(relay: &Arc, debtor: &Arc, m: M) -> ActorResult { + debtor.ensure_clear_funds().await; + relay.external_event(debtor, Event::Message(Box::new(Message { body: Assertion(m.into()) }))).await } + let debtor = Debtor::new(crate::name!("input-loop")); + match i { Input::Packets(mut src) => { loop { match src.next().await { None => { - s(&relay, &tunnel_relay::Input::Eof).await; + s(&relay, &debtor, &tunnel_relay::Input::Eof).await?; return Ok(()); } Some(bs) => { - s(&relay, &tunnel_relay::Input::Packet { bs: bs? }).await; + s(&relay, &debtor, &tunnel_relay::Input::Packet { bs: bs? }).await?; } } } @@ -471,7 +475,7 @@ pub async fn input_loop( Ok(n) => n, Err(e) => if e.kind() == io::ErrorKind::ConnectionReset { - s(&relay, &tunnel_relay::Input::Eof).await; + s(&relay, &debtor, &tunnel_relay::Input::Eof).await?; return Ok(()); } else { return Err(e)?; @@ -479,14 +483,14 @@ pub async fn input_loop( }; match n { 0 => { - s(&relay, &tunnel_relay::Input::Eof).await; + s(&relay, &debtor, &tunnel_relay::Input::Eof).await?; return Ok(()); } _ => { while buf.has_remaining() { let bs = buf.chunk(); let n = bs.len(); - s(&relay, &tunnel_relay::Input::Segment { bs: bs.to_vec() }).await; + s(&relay, &debtor, &tunnel_relay::Input::Segment { bs: bs.to_vec() }).await?; buf.advance(n); } } @@ -498,17 +502,19 @@ pub async fn input_loop( pub async fn output_loop( mut o: Output, - mut output_rx: UnboundedReceiver>, + mut output_rx: UnboundedReceiver>>, ) -> ActorResult { loop { match output_rx.recv().await { None => return Ok(()), - Some(bs) => match &mut o { - Output::Packets(sink) => sink.send(bs).await?, - Output::Bytes(w) => { - w.write_all(&bs).await?; - w.flush().await?; + Some(mut loaned_item) => { + match &mut o { + Output::Packets(sink) => sink.send(std::mem::take(&mut loaned_item.item)).await?, + Output::Bytes(w) => { + w.write_all(&loaned_item.item).await?; + w.flush().await?; + } } } } @@ -578,17 +584,17 @@ impl Entity for TunnelRelay { } tunnel_relay::RelayProtocol::Flush => { let events = std::mem::take(&mut self.pending_outbound); - self.send_packet(Packet::Turn(Box::new(Turn(events))))? + self.send_packet(&t.debtor, events.len(), Packet::Turn(Box::new(Turn(events))))? } } } Ok(()) } - fn exit_hook(&mut self, _t: &mut Activation, exit_status: &ActorResult) -> BoxFuture { + fn exit_hook(&mut self, t: &mut Activation, exit_status: &ActorResult) -> BoxFuture { if let Err(e) = exit_status { let e = e.clone(); - Box::pin(ready(self.send_packet(Packet::Error(Box::new(e))))) + Box::pin(ready(self.send_packet(&t.debtor, 1, Packet::Error(Box::new(e))))) } else { Box::pin(ready(Ok(()))) }