From 73b7ad75bd7690c306490bdac5193ea6a5d65e6a Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Sun, 25 Jul 2021 23:12:07 +0200 Subject: [PATCH] RwLock -> Mutex --- src/actor.rs | 32 +++++++++++++------------------- src/relay.rs | 26 +++++++++++++------------- 2 files changed, 26 insertions(+), 32 deletions(-) diff --git a/src/actor.rs b/src/actor.rs index 902a1fc..b46b010 100644 --- a/src/actor.rs +++ b/src/actor.rs @@ -21,6 +21,7 @@ use std::collections::hash_map::HashMap; use std::convert::TryFrom; use std::convert::TryInto; use std::sync::Arc; +use std::sync::Mutex; use std::sync::RwLock; use std::sync::Weak; use std::sync::atomic::{AtomicI64, AtomicU64, Ordering}; @@ -120,7 +121,7 @@ pub struct Actor { #[derive(Clone)] pub struct ActorRef { pub actor_id: ActorId, - state: Arc>, + state: Arc>, } pub enum ActorState { @@ -142,7 +143,7 @@ pub struct RunningActor { pub struct Ref { pub mailbox: Arc, - pub target: RwLock>>>, + pub target: Mutex>>>, } #[derive(Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] @@ -241,7 +242,7 @@ impl<'activation> Activation<'activation> { ) -> Option where F: FnOnce(&mut Activation) -> Option, { - match actor.state.write() { + match actor.state.lock() { Err(_) => panicked_err(), Ok(mut g) => match &mut *g { ActorState::Terminated { exit_status } => @@ -501,7 +502,7 @@ impl Actor { rx, ac_ref: ActorRef { actor_id, - state: Arc::new(RwLock::new(ActorState::Running(RunningActor { + state: Arc::new(Mutex::new(ActorState::Running(RunningActor { actor_id, tx, mailbox: Weak::new(), @@ -525,7 +526,7 @@ impl Actor { pub fn create_and_start_inert(name: tracing::Span) -> Arc> { let ac = Self::new(); - let r = ac.ac_ref.write(|s| s.unwrap().expect_running().create_inert()); + let r = ac.ac_ref.access(|s| s.unwrap().expect_running().create_inert()); ac.start(name); r } @@ -603,22 +604,15 @@ fn panicked_err() -> Option { } impl ActorRef { - pub fn read) -> R>(&self, f: F) -> R { - match self.state.read() { - Err(_) => f(None), - Ok(g) => f(Some(&*g)), - } - } - - pub fn write) -> R>(&self, f: F) -> R { - match self.state.write() { + pub fn access) -> R>(&self, f: F) -> R { + match self.state.lock() { Err(_) => f(None), Ok(mut g) => f(Some(&mut *g)), } } pub fn exit_status(&self) -> Option { - self.read(|s| s.map_or_else( + self.access(|s| s.map_or_else( panicked_err, |state| match state { ActorState::Running(_) => None, @@ -668,7 +662,7 @@ impl RunningActor { pub fn create_inert(&mut self) -> Arc> { Arc::new(Ref { mailbox: self.mailbox(), - target: RwLock::new(None), + target: Mutex::new(None), }) } @@ -756,7 +750,7 @@ pub fn external_events(mailbox: &Arc, debtor: &Arc, events: Pen impl Ref { pub fn become_entity>(&self, e: E) { - let mut g = self.target.write().expect("unpoisoned"); + let mut g = self.target.lock().expect("unpoisoned"); if g.is_some() { panic!("Double initialization of Ref"); } @@ -764,7 +758,7 @@ impl Ref { } pub fn with_entity) -> R>(&self, f: F) -> R { - let mut g = self.target.write().expect("unpoisoned"); + let mut g = self.target.lock().expect("unpoisoned"); f(g.as_mut().expect("initialized").as_mut()) } } @@ -815,7 +809,7 @@ impl Cap { { Self::new(&Arc::new(Ref { mailbox: Arc::clone(&underlying.mailbox), - target: RwLock::new(Some(Box::new(Guard { underlying: underlying.clone() }))), + target: Mutex::new(Some(Box::new(Guard { underlying: underlying.clone() }))), })) } diff --git a/src/relay.rs b/src/relay.rs index 5f82e78..1b0df31 100644 --- a/src/relay.rs +++ b/src/relay.rs @@ -37,7 +37,7 @@ use std::convert::TryFrom; use std::io; use std::pin::Pin; use std::sync::Arc; -use std::sync::RwLock; +use std::sync::Mutex; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; @@ -75,7 +75,7 @@ pub enum Output { Bytes(Pin>), } -type TunnelRelayRef = Arc>>; +type TunnelRelayRef = Arc>>; // There are other kinds of relay. This one has exactly two participants connected to each other. pub struct TunnelRelay @@ -178,7 +178,7 @@ impl TunnelRelay { initial_oid: Option, ) -> Option> { let (output_tx, output_rx) = unbounded_channel(); - let tr_ref = Arc::new(RwLock::new(None)); + let tr_ref = Arc::new(Mutex::new(None)); let self_entity = t.state.create(TunnelRefEntity { relay_ref: Arc::clone(&tr_ref), }); @@ -200,7 +200,7 @@ impl TunnelRelay { } let result = initial_oid.map( |io| Arc::clone(&tr.membranes.import_oid(t.state, &tr_ref, io).obj)); - *tr_ref.write().unwrap() = Some(tr); + *tr_ref.lock().unwrap() = Some(tr); t.state.linked_task(crate::name!("writer"), output_loop(o, output_rx)); t.state.linked_task(crate::name!("reader"), input_loop(t.actor.clone(), i, tr_ref)); t.state.add_exit_hook(&self_entity); @@ -303,7 +303,7 @@ impl TunnelRelay { impl Entity for SyncPeer { fn message(&mut self, t: &mut Activation, _a: Synced) -> ActorResult { self.peer.message(t, _Any::new(true)); - let mut g = self.relay_ref.write().expect("unpoisoned"); + let mut g = self.relay_ref.lock().expect("unpoisoned"); let tr = g.as_mut().expect("initialized"); if let Some(ws) = tr.membranes.imported.ref_map.get(&self.peer) { let ws = Arc::clone(ws); // cloned to release the borrow to permit the release @@ -511,7 +511,7 @@ async fn input_loop( Ok(t.state.shutdown()) }), Some(bs) => Activation::for_actor(&ac, Arc::clone(&debtor), |t| { - let mut g = relay.write().expect("unpoisoned"); + let mut g = relay.lock().expect("unpoisoned"); let tr = g.as_mut().expect("initialized"); tr.handle_inbound_datagram(t, &bs?) })?, @@ -540,7 +540,7 @@ async fn input_loop( Ok(t.state.shutdown()) }), _ => Activation::for_actor(&ac, Arc::clone(&debtor), |t| { - let mut g = relay.write().expect("unpoisoned"); + let mut g = relay.lock().expect("unpoisoned"); let tr = g.as_mut().expect("initialized"); tr.handle_inbound_stream(t, &mut buf) })?, @@ -573,7 +573,7 @@ async fn output_loop( impl Entity<()> for TunnelRefEntity { fn message(&mut self, t: &mut Activation, _m: ()) -> ActorResult { - let mut g = self.relay_ref.write().expect("unpoisoned"); + let mut g = self.relay_ref.lock().expect("unpoisoned"); let tr = g.as_mut().expect("initialized"); let events = std::mem::take(&mut tr.pending_outbound); tr.send_packet(&t.debtor(), events.len(), P::Packet::Turn(Box::new(P::Turn(events)))) @@ -582,7 +582,7 @@ impl Entity<()> for TunnelRefEntity { fn exit_hook(&mut self, t: &mut Activation, exit_status: &Arc) -> ActorResult { if let Err(e) = &**exit_status { let e = e.clone(); - let mut g = self.relay_ref.write().expect("unpoisoned"); + let mut g = self.relay_ref.lock().expect("unpoisoned"); let tr = g.as_mut().expect("initialized"); tr.send_packet(&t.debtor(), 1, P::Packet::Error(Box::new(e)))?; } @@ -592,7 +592,7 @@ impl Entity<()> for TunnelRefEntity { impl Entity<_Any> for RelayEntity { fn assert(&mut self, t: &mut Activation, a: _Any, h: Handle) -> ActorResult { - let mut g = self.relay_ref.write().expect("unpoisoned"); + let mut g = self.relay_ref.lock().expect("unpoisoned"); let tr = g.as_mut().expect("initialized"); tr.send_event(t, self.oid.clone(), P::Event::Assert(Box::new(P::Assert { assertion: P::Assertion(a), @@ -600,21 +600,21 @@ impl Entity<_Any> for RelayEntity { }))) } fn retract(&mut self, t: &mut Activation, h: Handle) -> ActorResult { - let mut g = self.relay_ref.write().expect("unpoisoned"); + let mut g = self.relay_ref.lock().expect("unpoisoned"); let tr = g.as_mut().expect("initialized"); tr.send_event(t, self.oid.clone(), P::Event::Retract(Box::new(P::Retract { handle: P::Handle(h.into()), }))) } fn message(&mut self, t: &mut Activation, m: _Any) -> ActorResult { - let mut g = self.relay_ref.write().expect("unpoisoned"); + let mut g = self.relay_ref.lock().expect("unpoisoned"); let tr = g.as_mut().expect("initialized"); tr.send_event(t, self.oid.clone(), P::Event::Message(Box::new(P::Message { body: P::Assertion(m) }))) } fn sync(&mut self, t: &mut Activation, peer: Arc>) -> ActorResult { - let mut g = self.relay_ref.write().expect("unpoisoned"); + let mut g = self.relay_ref.lock().expect("unpoisoned"); let tr = g.as_mut().expect("initialized"); tr.send_event(t, self.oid.clone(), P::Event::Sync(Box::new(P::Sync { peer: Cap::guard(&peer)