RwLock -> Mutex
This commit is contained in:
parent
5b97628137
commit
73b7ad75bd
32
src/actor.rs
32
src/actor.rs
|
@ -21,6 +21,7 @@ use std::collections::hash_map::HashMap;
|
||||||
use std::convert::TryFrom;
|
use std::convert::TryFrom;
|
||||||
use std::convert::TryInto;
|
use std::convert::TryInto;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use std::sync::Mutex;
|
||||||
use std::sync::RwLock;
|
use std::sync::RwLock;
|
||||||
use std::sync::Weak;
|
use std::sync::Weak;
|
||||||
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
|
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
|
||||||
|
@ -120,7 +121,7 @@ pub struct Actor {
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct ActorRef {
|
pub struct ActorRef {
|
||||||
pub actor_id: ActorId,
|
pub actor_id: ActorId,
|
||||||
state: Arc<RwLock<ActorState>>,
|
state: Arc<Mutex<ActorState>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub enum ActorState {
|
pub enum ActorState {
|
||||||
|
@ -142,7 +143,7 @@ pub struct RunningActor {
|
||||||
|
|
||||||
pub struct Ref<M> {
|
pub struct Ref<M> {
|
||||||
pub mailbox: Arc<Mailbox>,
|
pub mailbox: Arc<Mailbox>,
|
||||||
pub target: RwLock<Option<Box<dyn Entity<M>>>>,
|
pub target: Mutex<Option<Box<dyn Entity<M>>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
|
#[derive(Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
|
||||||
|
@ -241,7 +242,7 @@ impl<'activation> Activation<'activation> {
|
||||||
) -> Option<ActorResult> where
|
) -> Option<ActorResult> where
|
||||||
F: FnOnce(&mut Activation) -> Option<ActorResult>,
|
F: FnOnce(&mut Activation) -> Option<ActorResult>,
|
||||||
{
|
{
|
||||||
match actor.state.write() {
|
match actor.state.lock() {
|
||||||
Err(_) => panicked_err(),
|
Err(_) => panicked_err(),
|
||||||
Ok(mut g) => match &mut *g {
|
Ok(mut g) => match &mut *g {
|
||||||
ActorState::Terminated { exit_status } =>
|
ActorState::Terminated { exit_status } =>
|
||||||
|
@ -501,7 +502,7 @@ impl Actor {
|
||||||
rx,
|
rx,
|
||||||
ac_ref: ActorRef {
|
ac_ref: ActorRef {
|
||||||
actor_id,
|
actor_id,
|
||||||
state: Arc::new(RwLock::new(ActorState::Running(RunningActor {
|
state: Arc::new(Mutex::new(ActorState::Running(RunningActor {
|
||||||
actor_id,
|
actor_id,
|
||||||
tx,
|
tx,
|
||||||
mailbox: Weak::new(),
|
mailbox: Weak::new(),
|
||||||
|
@ -525,7 +526,7 @@ impl Actor {
|
||||||
|
|
||||||
pub fn create_and_start_inert<M>(name: tracing::Span) -> Arc<Ref<M>> {
|
pub fn create_and_start_inert<M>(name: tracing::Span) -> Arc<Ref<M>> {
|
||||||
let ac = Self::new();
|
let ac = Self::new();
|
||||||
let r = ac.ac_ref.write(|s| s.unwrap().expect_running().create_inert());
|
let r = ac.ac_ref.access(|s| s.unwrap().expect_running().create_inert());
|
||||||
ac.start(name);
|
ac.start(name);
|
||||||
r
|
r
|
||||||
}
|
}
|
||||||
|
@ -603,22 +604,15 @@ fn panicked_err() -> Option<ActorResult> {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ActorRef {
|
impl ActorRef {
|
||||||
pub fn read<R, F: FnOnce(Option<&ActorState>) -> R>(&self, f: F) -> R {
|
pub fn access<R, F: FnOnce(Option<&mut ActorState>) -> R>(&self, f: F) -> R {
|
||||||
match self.state.read() {
|
match self.state.lock() {
|
||||||
Err(_) => f(None),
|
|
||||||
Ok(g) => f(Some(&*g)),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn write<R, F: FnOnce(Option<&mut ActorState>) -> R>(&self, f: F) -> R {
|
|
||||||
match self.state.write() {
|
|
||||||
Err(_) => f(None),
|
Err(_) => f(None),
|
||||||
Ok(mut g) => f(Some(&mut *g)),
|
Ok(mut g) => f(Some(&mut *g)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn exit_status(&self) -> Option<ActorResult> {
|
pub fn exit_status(&self) -> Option<ActorResult> {
|
||||||
self.read(|s| s.map_or_else(
|
self.access(|s| s.map_or_else(
|
||||||
panicked_err,
|
panicked_err,
|
||||||
|state| match state {
|
|state| match state {
|
||||||
ActorState::Running(_) => None,
|
ActorState::Running(_) => None,
|
||||||
|
@ -668,7 +662,7 @@ impl RunningActor {
|
||||||
pub fn create_inert<M>(&mut self) -> Arc<Ref<M>> {
|
pub fn create_inert<M>(&mut self) -> Arc<Ref<M>> {
|
||||||
Arc::new(Ref {
|
Arc::new(Ref {
|
||||||
mailbox: self.mailbox(),
|
mailbox: self.mailbox(),
|
||||||
target: RwLock::new(None),
|
target: Mutex::new(None),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -756,7 +750,7 @@ pub fn external_events(mailbox: &Arc<Mailbox>, debtor: &Arc<Debtor>, events: Pen
|
||||||
|
|
||||||
impl<M> Ref<M> {
|
impl<M> Ref<M> {
|
||||||
pub fn become_entity<E: 'static + Entity<M>>(&self, e: E) {
|
pub fn become_entity<E: 'static + Entity<M>>(&self, e: E) {
|
||||||
let mut g = self.target.write().expect("unpoisoned");
|
let mut g = self.target.lock().expect("unpoisoned");
|
||||||
if g.is_some() {
|
if g.is_some() {
|
||||||
panic!("Double initialization of Ref");
|
panic!("Double initialization of Ref");
|
||||||
}
|
}
|
||||||
|
@ -764,7 +758,7 @@ impl<M> Ref<M> {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn with_entity<R, F: FnOnce(&mut dyn Entity<M>) -> R>(&self, f: F) -> R {
|
pub fn with_entity<R, F: FnOnce(&mut dyn Entity<M>) -> R>(&self, f: F) -> R {
|
||||||
let mut g = self.target.write().expect("unpoisoned");
|
let mut g = self.target.lock().expect("unpoisoned");
|
||||||
f(g.as_mut().expect("initialized").as_mut())
|
f(g.as_mut().expect("initialized").as_mut())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -815,7 +809,7 @@ impl Cap {
|
||||||
{
|
{
|
||||||
Self::new(&Arc::new(Ref {
|
Self::new(&Arc::new(Ref {
|
||||||
mailbox: Arc::clone(&underlying.mailbox),
|
mailbox: Arc::clone(&underlying.mailbox),
|
||||||
target: RwLock::new(Some(Box::new(Guard { underlying: underlying.clone() }))),
|
target: Mutex::new(Some(Box::new(Guard { underlying: underlying.clone() }))),
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
26
src/relay.rs
26
src/relay.rs
|
@ -37,7 +37,7 @@ use std::convert::TryFrom;
|
||||||
use std::io;
|
use std::io;
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::sync::RwLock;
|
use std::sync::Mutex;
|
||||||
use std::sync::atomic::AtomicUsize;
|
use std::sync::atomic::AtomicUsize;
|
||||||
use std::sync::atomic::Ordering;
|
use std::sync::atomic::Ordering;
|
||||||
|
|
||||||
|
@ -75,7 +75,7 @@ pub enum Output {
|
||||||
Bytes(Pin<Box<dyn AsyncWrite + Send>>),
|
Bytes(Pin<Box<dyn AsyncWrite + Send>>),
|
||||||
}
|
}
|
||||||
|
|
||||||
type TunnelRelayRef = Arc<RwLock<Option<TunnelRelay>>>;
|
type TunnelRelayRef = Arc<Mutex<Option<TunnelRelay>>>;
|
||||||
|
|
||||||
// There are other kinds of relay. This one has exactly two participants connected to each other.
|
// There are other kinds of relay. This one has exactly two participants connected to each other.
|
||||||
pub struct TunnelRelay
|
pub struct TunnelRelay
|
||||||
|
@ -178,7 +178,7 @@ impl TunnelRelay {
|
||||||
initial_oid: Option<sturdy::Oid>,
|
initial_oid: Option<sturdy::Oid>,
|
||||||
) -> Option<Arc<Cap>> {
|
) -> Option<Arc<Cap>> {
|
||||||
let (output_tx, output_rx) = unbounded_channel();
|
let (output_tx, output_rx) = unbounded_channel();
|
||||||
let tr_ref = Arc::new(RwLock::new(None));
|
let tr_ref = Arc::new(Mutex::new(None));
|
||||||
let self_entity = t.state.create(TunnelRefEntity {
|
let self_entity = t.state.create(TunnelRefEntity {
|
||||||
relay_ref: Arc::clone(&tr_ref),
|
relay_ref: Arc::clone(&tr_ref),
|
||||||
});
|
});
|
||||||
|
@ -200,7 +200,7 @@ impl TunnelRelay {
|
||||||
}
|
}
|
||||||
let result = initial_oid.map(
|
let result = initial_oid.map(
|
||||||
|io| Arc::clone(&tr.membranes.import_oid(t.state, &tr_ref, io).obj));
|
|io| Arc::clone(&tr.membranes.import_oid(t.state, &tr_ref, io).obj));
|
||||||
*tr_ref.write().unwrap() = Some(tr);
|
*tr_ref.lock().unwrap() = Some(tr);
|
||||||
t.state.linked_task(crate::name!("writer"), output_loop(o, output_rx));
|
t.state.linked_task(crate::name!("writer"), output_loop(o, output_rx));
|
||||||
t.state.linked_task(crate::name!("reader"), input_loop(t.actor.clone(), i, tr_ref));
|
t.state.linked_task(crate::name!("reader"), input_loop(t.actor.clone(), i, tr_ref));
|
||||||
t.state.add_exit_hook(&self_entity);
|
t.state.add_exit_hook(&self_entity);
|
||||||
|
@ -303,7 +303,7 @@ impl TunnelRelay {
|
||||||
impl Entity<Synced> for SyncPeer {
|
impl Entity<Synced> for SyncPeer {
|
||||||
fn message(&mut self, t: &mut Activation, _a: Synced) -> ActorResult {
|
fn message(&mut self, t: &mut Activation, _a: Synced) -> ActorResult {
|
||||||
self.peer.message(t, _Any::new(true));
|
self.peer.message(t, _Any::new(true));
|
||||||
let mut g = self.relay_ref.write().expect("unpoisoned");
|
let mut g = self.relay_ref.lock().expect("unpoisoned");
|
||||||
let tr = g.as_mut().expect("initialized");
|
let tr = g.as_mut().expect("initialized");
|
||||||
if let Some(ws) = tr.membranes.imported.ref_map.get(&self.peer) {
|
if let Some(ws) = tr.membranes.imported.ref_map.get(&self.peer) {
|
||||||
let ws = Arc::clone(ws); // cloned to release the borrow to permit the release
|
let ws = Arc::clone(ws); // cloned to release the borrow to permit the release
|
||||||
|
@ -511,7 +511,7 @@ async fn input_loop(
|
||||||
Ok(t.state.shutdown())
|
Ok(t.state.shutdown())
|
||||||
}),
|
}),
|
||||||
Some(bs) => Activation::for_actor(&ac, Arc::clone(&debtor), |t| {
|
Some(bs) => Activation::for_actor(&ac, Arc::clone(&debtor), |t| {
|
||||||
let mut g = relay.write().expect("unpoisoned");
|
let mut g = relay.lock().expect("unpoisoned");
|
||||||
let tr = g.as_mut().expect("initialized");
|
let tr = g.as_mut().expect("initialized");
|
||||||
tr.handle_inbound_datagram(t, &bs?)
|
tr.handle_inbound_datagram(t, &bs?)
|
||||||
})?,
|
})?,
|
||||||
|
@ -540,7 +540,7 @@ async fn input_loop(
|
||||||
Ok(t.state.shutdown())
|
Ok(t.state.shutdown())
|
||||||
}),
|
}),
|
||||||
_ => Activation::for_actor(&ac, Arc::clone(&debtor), |t| {
|
_ => Activation::for_actor(&ac, Arc::clone(&debtor), |t| {
|
||||||
let mut g = relay.write().expect("unpoisoned");
|
let mut g = relay.lock().expect("unpoisoned");
|
||||||
let tr = g.as_mut().expect("initialized");
|
let tr = g.as_mut().expect("initialized");
|
||||||
tr.handle_inbound_stream(t, &mut buf)
|
tr.handle_inbound_stream(t, &mut buf)
|
||||||
})?,
|
})?,
|
||||||
|
@ -573,7 +573,7 @@ async fn output_loop(
|
||||||
|
|
||||||
impl Entity<()> for TunnelRefEntity {
|
impl Entity<()> for TunnelRefEntity {
|
||||||
fn message(&mut self, t: &mut Activation, _m: ()) -> ActorResult {
|
fn message(&mut self, t: &mut Activation, _m: ()) -> ActorResult {
|
||||||
let mut g = self.relay_ref.write().expect("unpoisoned");
|
let mut g = self.relay_ref.lock().expect("unpoisoned");
|
||||||
let tr = g.as_mut().expect("initialized");
|
let tr = g.as_mut().expect("initialized");
|
||||||
let events = std::mem::take(&mut tr.pending_outbound);
|
let events = std::mem::take(&mut tr.pending_outbound);
|
||||||
tr.send_packet(&t.debtor(), events.len(), P::Packet::Turn(Box::new(P::Turn(events))))
|
tr.send_packet(&t.debtor(), events.len(), P::Packet::Turn(Box::new(P::Turn(events))))
|
||||||
|
@ -582,7 +582,7 @@ impl Entity<()> for TunnelRefEntity {
|
||||||
fn exit_hook(&mut self, t: &mut Activation, exit_status: &Arc<ActorResult>) -> ActorResult {
|
fn exit_hook(&mut self, t: &mut Activation, exit_status: &Arc<ActorResult>) -> ActorResult {
|
||||||
if let Err(e) = &**exit_status {
|
if let Err(e) = &**exit_status {
|
||||||
let e = e.clone();
|
let e = e.clone();
|
||||||
let mut g = self.relay_ref.write().expect("unpoisoned");
|
let mut g = self.relay_ref.lock().expect("unpoisoned");
|
||||||
let tr = g.as_mut().expect("initialized");
|
let tr = g.as_mut().expect("initialized");
|
||||||
tr.send_packet(&t.debtor(), 1, P::Packet::Error(Box::new(e)))?;
|
tr.send_packet(&t.debtor(), 1, P::Packet::Error(Box::new(e)))?;
|
||||||
}
|
}
|
||||||
|
@ -592,7 +592,7 @@ impl Entity<()> for TunnelRefEntity {
|
||||||
|
|
||||||
impl Entity<_Any> for RelayEntity {
|
impl Entity<_Any> for RelayEntity {
|
||||||
fn assert(&mut self, t: &mut Activation, a: _Any, h: Handle) -> ActorResult {
|
fn assert(&mut self, t: &mut Activation, a: _Any, h: Handle) -> ActorResult {
|
||||||
let mut g = self.relay_ref.write().expect("unpoisoned");
|
let mut g = self.relay_ref.lock().expect("unpoisoned");
|
||||||
let tr = g.as_mut().expect("initialized");
|
let tr = g.as_mut().expect("initialized");
|
||||||
tr.send_event(t, self.oid.clone(), P::Event::Assert(Box::new(P::Assert {
|
tr.send_event(t, self.oid.clone(), P::Event::Assert(Box::new(P::Assert {
|
||||||
assertion: P::Assertion(a),
|
assertion: P::Assertion(a),
|
||||||
|
@ -600,21 +600,21 @@ impl Entity<_Any> for RelayEntity {
|
||||||
})))
|
})))
|
||||||
}
|
}
|
||||||
fn retract(&mut self, t: &mut Activation, h: Handle) -> ActorResult {
|
fn retract(&mut self, t: &mut Activation, h: Handle) -> ActorResult {
|
||||||
let mut g = self.relay_ref.write().expect("unpoisoned");
|
let mut g = self.relay_ref.lock().expect("unpoisoned");
|
||||||
let tr = g.as_mut().expect("initialized");
|
let tr = g.as_mut().expect("initialized");
|
||||||
tr.send_event(t, self.oid.clone(), P::Event::Retract(Box::new(P::Retract {
|
tr.send_event(t, self.oid.clone(), P::Event::Retract(Box::new(P::Retract {
|
||||||
handle: P::Handle(h.into()),
|
handle: P::Handle(h.into()),
|
||||||
})))
|
})))
|
||||||
}
|
}
|
||||||
fn message(&mut self, t: &mut Activation, m: _Any) -> ActorResult {
|
fn message(&mut self, t: &mut Activation, m: _Any) -> ActorResult {
|
||||||
let mut g = self.relay_ref.write().expect("unpoisoned");
|
let mut g = self.relay_ref.lock().expect("unpoisoned");
|
||||||
let tr = g.as_mut().expect("initialized");
|
let tr = g.as_mut().expect("initialized");
|
||||||
tr.send_event(t, self.oid.clone(), P::Event::Message(Box::new(P::Message {
|
tr.send_event(t, self.oid.clone(), P::Event::Message(Box::new(P::Message {
|
||||||
body: P::Assertion(m)
|
body: P::Assertion(m)
|
||||||
})))
|
})))
|
||||||
}
|
}
|
||||||
fn sync(&mut self, t: &mut Activation, peer: Arc<Ref<Synced>>) -> ActorResult {
|
fn sync(&mut self, t: &mut Activation, peer: Arc<Ref<Synced>>) -> ActorResult {
|
||||||
let mut g = self.relay_ref.write().expect("unpoisoned");
|
let mut g = self.relay_ref.lock().expect("unpoisoned");
|
||||||
let tr = g.as_mut().expect("initialized");
|
let tr = g.as_mut().expect("initialized");
|
||||||
tr.send_event(t, self.oid.clone(), P::Event::Sync(Box::new(P::Sync {
|
tr.send_event(t, self.oid.clone(), P::Event::Sync(Box::new(P::Sync {
|
||||||
peer: Cap::guard(&peer)
|
peer: Cap::guard(&peer)
|
||||||
|
|
Loading…
Reference in New Issue