diff --git a/syndicate/src/actor.rs b/syndicate/src/actor.rs index da6149a..37ed169 100644 --- a/syndicate/src/actor.rs +++ b/syndicate/src/actor.rs @@ -275,7 +275,10 @@ struct EventBuffer { pub desc: Option, pub trace_collector: Option, pub account: Arc, - queues: HashMap, PendingEventQueue)>, + + // INVARIANT: At most one of single_queue and multiple_queues is non-None at any given time. + single_queue: Option<(ActorId, UnboundedSender, PendingEventQueue)>, + multiple_queues: Option, PendingEventQueue)>>, } /// An `Account` records a "debt" in terms of outstanding work items. @@ -1551,7 +1554,8 @@ impl EventBuffer { desc, trace_collector, account, - queues: HashMap::new(), + single_queue: None, + multiple_queues: None, } } @@ -1560,8 +1564,26 @@ impl EventBuffer { } fn queue_for_mailbox(&mut self, mailbox: &Arc) -> &mut PendingEventQueue { - &mut self.queues.entry(mailbox.actor_id) - .or_insert((mailbox.tx.clone(), Vec::new())).1 + if self.multiple_queues.is_some() { + return &mut self.multiple_queues.as_mut().unwrap().entry(mailbox.actor_id) + .or_insert((mailbox.tx.clone(), Vec::new())).1; + } + + if let None = self.single_queue { + self.single_queue = Some((mailbox.actor_id, mailbox.tx.clone(), Vec::with_capacity(3))); + return &mut self.single_queue.as_mut().unwrap().2; + } + + if Some(mailbox.actor_id) == self.single_queue.as_ref().map(|e| e.0) { + return &mut self.single_queue.as_mut().unwrap().2; + } + + let (aid, tx, q) = std::mem::take(&mut self.single_queue).unwrap(); + let mut table = HashMap::new(); + table.insert(aid, (tx, q)); + self.multiple_queues = Some(table); + return &mut self.multiple_queues.as_mut().unwrap().entry(mailbox.actor_id) + .or_insert((mailbox.tx.clone(), Vec::new())).1; } fn commit(&mut self) { @@ -1571,19 +1593,22 @@ impl EventBuffer { c.record(self.source_actor_id, trace::ActorActivation::Turn(Box::new(d.take()))); } } - for (_actor_id, (tx, turn)) in std::mem::take(&mut self.queues).into_iter() { - // Deliberately ignore send errors here: they indicate that the recipient is no - // longer alive. But we don't care about that case, since we have to be robust to - // crashes anyway. (When we were printing errors we saw here, an example of the - // problems it caused was a relay output_loop that received EPIPE causing the relay - // to crash, just as it was receiving thousands of messages a second, leading to - // many, many log reports of failed send_actions from the following line.) - let _ = send_actions(&tx, - self.desc.as_ref().map(|d| trace::TurnCause::Turn { - id: Box::new(d.id.clone()), - }), - &self.account, - turn); + + // Deliberately ignore send errors here: they indicate that the recipient is no + // longer alive. But we don't care about that case, since we have to be robust to + // crashes anyway. (When we were printing errors we saw here, an example of the + // problems it caused was a relay output_loop that received EPIPE causing the relay + // to crash, just as it was receiving thousands of messages a second, leading to + // many, many log reports of failed send_actions from the following line.) + if let Some((_actor_id, tx, turn)) = std::mem::take(&mut self.single_queue) { + let desc = self.desc.as_ref().map(|d| trace::TurnCause::Turn { id: Box::new(d.id.clone()) }); + let _ = send_actions(&tx, desc, &self.account, turn); + } + if let Some(table) = std::mem::take(&mut self.multiple_queues) { + for (_actor_id, (tx, turn)) in table.into_iter() { + let desc = self.desc.as_ref().map(|d| trace::TurnCause::Turn { id: Box::new(d.id.clone()) }); + let _ = send_actions(&tx, desc, &self.account, turn); + } } }