This commit is contained in:
Tony Garnock-Jones 2021-07-23 08:10:09 +02:00
parent 908ab08f4c
commit d550ba2705
1 changed files with 4 additions and 5 deletions

View File

@ -106,8 +106,8 @@ pub struct Mailbox {
pub struct Actor { pub struct Actor {
actor_id: ActorId, actor_id: ActorId,
tx: UnboundedSender<SystemMessage>, tx: UnboundedSender<SystemMessage>,
rx: UnboundedReceiver<SystemMessage>,
mailbox: Weak<Mailbox>, mailbox: Weak<Mailbox>,
rx: Option<UnboundedReceiver<SystemMessage>>,
outbound_assertions: OutboundAssertions, outbound_assertions: OutboundAssertions,
next_task_id: u64, next_task_id: u64,
linked_tasks: Map<u64, CancellationToken>, linked_tasks: Map<u64, CancellationToken>,
@ -409,7 +409,7 @@ impl Actor {
Actor { Actor {
actor_id, actor_id,
tx, tx,
rx: Some(rx), rx,
mailbox: Weak::new(), mailbox: Weak::new(),
outbound_assertions: Map::new(), outbound_assertions: Map::new(),
next_task_id: 0, next_task_id: 0,
@ -513,7 +513,7 @@ impl Actor {
boot(&mut Activation::new(self, Debtor::new(crate::name!("boot")))).await?; boot(&mut Activation::new(self, Debtor::new(crate::name!("boot")))).await?;
// tracing::trace!(_id, "run"); // tracing::trace!(_id, "run");
loop { loop {
match self.rx.as_mut().expect("present rx channel half").recv().await { match self.rx.recv().await {
None => None =>
Err(error("Unexpected channel close", _Any::new(false)))?, Err(error("Unexpected channel close", _Any::new(false)))?,
Some(m) => { Some(m) => {
@ -599,8 +599,7 @@ impl Actor {
impl Drop for Actor { impl Drop for Actor {
fn drop(&mut self) { fn drop(&mut self) {
let mut rx = self.rx.take().expect("present rx channel half during drop"); self.rx.close();
rx.close();
for (_task_id, token) in std::mem::take(&mut self.linked_tasks).into_iter() { for (_task_id, token) in std::mem::take(&mut self.linked_tasks).into_iter() {
token.cancel(); token.cancel();