2021-07-03 07:03:52 +00:00
|
|
|
use super::ActorId;
|
2021-07-15 07:13:31 +00:00
|
|
|
use super::schemas::sturdy;
|
2021-07-03 07:03:52 +00:00
|
|
|
use super::error::Error;
|
2021-07-24 21:22:01 +00:00
|
|
|
use super::error::encode_error;
|
2021-07-08 22:04:11 +00:00
|
|
|
use super::error::error;
|
2021-07-15 07:13:31 +00:00
|
|
|
use super::rewrite::CaveatError;
|
|
|
|
use super::rewrite::CheckedCaveat;
|
2021-07-03 07:03:52 +00:00
|
|
|
|
|
|
|
use preserves::value::Domain;
|
|
|
|
use preserves::value::IOValue;
|
|
|
|
use preserves::value::Map;
|
|
|
|
use preserves::value::NestedValue;
|
2021-07-22 14:53:56 +00:00
|
|
|
use preserves_schema::support::ParseError;
|
2021-07-03 07:03:52 +00:00
|
|
|
|
|
|
|
use std::boxed::Box;
|
|
|
|
use std::collections::hash_map::HashMap;
|
2021-07-22 14:53:56 +00:00
|
|
|
use std::convert::TryFrom;
|
2021-07-15 11:13:22 +00:00
|
|
|
use std::convert::TryInto;
|
2021-07-03 07:03:52 +00:00
|
|
|
use std::sync::Arc;
|
2021-07-25 21:12:07 +00:00
|
|
|
use std::sync::Mutex;
|
2021-07-15 11:13:22 +00:00
|
|
|
use std::sync::RwLock;
|
2021-07-22 14:53:56 +00:00
|
|
|
use std::sync::Weak;
|
|
|
|
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
|
2021-07-03 07:03:52 +00:00
|
|
|
|
|
|
|
use tokio::select;
|
2021-07-27 14:30:42 +00:00
|
|
|
use tokio::sync::Notify;
|
2021-07-03 07:03:52 +00:00
|
|
|
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender, UnboundedReceiver};
|
|
|
|
use tokio_util::sync::CancellationToken;
|
|
|
|
|
2021-07-08 22:04:11 +00:00
|
|
|
use tracing::Instrument;
|
|
|
|
|
2021-08-11 21:16:01 +00:00
|
|
|
/// Dynamically-typed value used by [`Cap`] and by the error constructors in this module;
/// an alias for the schema-generated `_Any` type.
pub type AnyValue = super::schemas::internal_protocol::_Any;
|
|
|
|
|
2021-07-22 08:07:49 +00:00
|
|
|
/// Identifier of an assertion, as returned by `Activation::assert` and accepted by
/// `Activation::retract`.
pub type Handle = u64;
|
2021-07-03 07:03:52 +00:00
|
|
|
|
|
|
|
/// Outcome of an actor, an action or an entity callback: `Ok(())` or an [`Error`].
pub type ActorResult = Result<(), Error>;
|
|
|
|
/// Join handle of the tokio task running an actor (see `Actor::boot`); resolves to the
/// actor's exit status.
pub type ActorHandle = tokio::task::JoinHandle<ActorResult>;
|
|
|
|
|
2021-07-22 14:53:56 +00:00
|
|
|
/// Message sent to the peer by the default implementation of `Entity::sync`.
/// Converts to and from the boolean `true` as an [`AnyValue`].
pub struct Synced;
|
2021-07-21 23:05:08 +00:00
|
|
|
|
2021-07-26 08:53:56 +00:00
|
|
|
pub trait Entity<M>: Send {
|
2021-07-22 14:53:56 +00:00
|
|
|
fn assert(&mut self, _t: &mut Activation, _a: M, _h: Handle) -> ActorResult {
|
2021-07-03 07:03:52 +00:00
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
fn retract(&mut self, _t: &mut Activation, _h: Handle) -> ActorResult {
|
|
|
|
Ok(())
|
|
|
|
}
|
2021-07-22 14:53:56 +00:00
|
|
|
fn message(&mut self, _t: &mut Activation, _m: M) -> ActorResult {
|
2021-07-03 07:03:52 +00:00
|
|
|
Ok(())
|
|
|
|
}
|
2021-07-22 14:53:56 +00:00
|
|
|
fn sync(&mut self, t: &mut Activation, peer: Arc<Ref<Synced>>) -> ActorResult {
|
|
|
|
t.message(&peer, Synced);
|
2021-07-03 07:03:52 +00:00
|
|
|
Ok(())
|
|
|
|
}
|
2021-07-06 18:56:36 +00:00
|
|
|
fn turn_end(&mut self, _t: &mut Activation) -> ActorResult {
|
|
|
|
Ok(())
|
|
|
|
}
|
2021-07-22 14:53:56 +00:00
|
|
|
fn exit_hook(&mut self, _t: &mut Activation, _exit_status: &Arc<ActorResult>) -> ActorResult {
|
2021-07-21 23:05:08 +00:00
|
|
|
Ok(())
|
2021-07-06 18:56:36 +00:00
|
|
|
}
|
2021-07-03 07:03:52 +00:00
|
|
|
}
|
|
|
|
|
2021-07-24 21:22:01 +00:00
|
|
|
/// Entity that relies entirely on the default (no-op) handlers of [`Entity`].
pub struct InertEntity;
|
|
|
|
// Uses every default handler, for any payload type `M`.
impl<M> Entity<M> for InertEntity {}
|
|
|
|
|
2021-07-23 06:11:48 +00:00
|
|
|
enum CleanupAction {
|
2021-07-24 21:22:01 +00:00
|
|
|
ForMyself(Action),
|
|
|
|
ForAnother(Arc<Mailbox>, Action),
|
2021-07-12 15:41:12 +00:00
|
|
|
}
|
|
|
|
|
2021-07-23 06:11:48 +00:00
|
|
|
/// Cleanup actions of an actor, keyed by the assertion handle they belong to.
type CleanupActions = Map<Handle, CleanupAction>;
|
2021-07-26 08:53:56 +00:00
|
|
|
/// A sendable one-shot closure that is run against an [`Activation`].
pub type Action = Box<dyn Send + FnOnce(&mut Activation) -> ActorResult>;
|
2021-07-22 07:56:21 +00:00
|
|
|
/// Ordered batch of actions waiting to be executed.
pub type PendingEventQueue = Vec<Action>;
|
2021-07-03 07:03:52 +00:00
|
|
|
|
|
|
|
// This is what other implementations call a "Turn", renamed here to
|
|
|
|
// avoid conflicts with schemas::internal_protocol::Turn.
|
|
|
|
pub struct Activation<'activation> {
|
2021-07-24 21:22:01 +00:00
|
|
|
pub actor: ActorRef,
|
|
|
|
pub state: &'activation mut RunningActor,
|
|
|
|
pending: EventBuffer,
|
|
|
|
}
|
|
|
|
|
|
|
|
struct EventBuffer {
|
2021-07-15 11:13:22 +00:00
|
|
|
pub debtor: Arc<Debtor>,
|
2021-07-22 07:56:21 +00:00
|
|
|
queues: HashMap<ActorId, (UnboundedSender<SystemMessage>, PendingEventQueue)>,
|
2021-07-24 21:22:01 +00:00
|
|
|
for_myself: PendingEventQueue,
|
2021-07-03 07:03:52 +00:00
|
|
|
}
|
|
|
|
|
2021-07-15 11:13:22 +00:00
|
|
|
#[derive(Debug)]
|
|
|
|
pub struct Debtor {
|
|
|
|
id: u64,
|
|
|
|
debt: Arc<AtomicI64>,
|
2021-07-27 14:30:42 +00:00
|
|
|
notify: Notify,
|
2021-07-15 11:13:22 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
#[derive(Debug)]
|
|
|
|
pub struct LoanedItem<T> {
|
|
|
|
pub debtor: Arc<Debtor>,
|
|
|
|
pub cost: usize,
|
|
|
|
pub item: T,
|
|
|
|
}
|
|
|
|
|
2021-07-03 07:03:52 +00:00
|
|
|
enum SystemMessage {
|
|
|
|
Release,
|
2021-07-21 21:53:55 +00:00
|
|
|
Turn(LoanedItem<PendingEventQueue>),
|
2021-07-03 07:03:52 +00:00
|
|
|
Crash(Error),
|
|
|
|
}
|
|
|
|
|
|
|
|
pub struct Mailbox {
|
|
|
|
pub actor_id: ActorId,
|
|
|
|
tx: UnboundedSender<SystemMessage>,
|
|
|
|
}
|
|
|
|
|
|
|
|
pub struct Actor {
|
2021-07-23 06:10:09 +00:00
|
|
|
rx: UnboundedReceiver<SystemMessage>,
|
2021-07-24 21:22:01 +00:00
|
|
|
ac_ref: ActorRef,
|
|
|
|
}
|
|
|
|
|
|
|
|
#[derive(Clone)]
|
|
|
|
pub struct ActorRef {
|
|
|
|
pub actor_id: ActorId,
|
2021-07-25 21:12:07 +00:00
|
|
|
state: Arc<Mutex<ActorState>>,
|
2021-07-24 21:22:01 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
pub enum ActorState {
|
|
|
|
Running(RunningActor),
|
|
|
|
Terminated {
|
|
|
|
exit_status: Arc<ActorResult>,
|
|
|
|
},
|
|
|
|
}
|
|
|
|
|
|
|
|
pub struct RunningActor {
|
|
|
|
pub actor_id: ActorId,
|
|
|
|
tx: UnboundedSender<SystemMessage>,
|
2021-07-22 14:53:56 +00:00
|
|
|
mailbox: Weak<Mailbox>,
|
2021-07-23 06:11:48 +00:00
|
|
|
cleanup_actions: CleanupActions,
|
2021-07-06 18:56:36 +00:00
|
|
|
next_task_id: u64,
|
|
|
|
linked_tasks: Map<u64, CancellationToken>,
|
2021-07-26 08:53:56 +00:00
|
|
|
exit_hooks: Vec<Box<dyn Send + FnOnce(&mut Activation, &Arc<ActorResult>) -> ActorResult>>,
|
2021-07-15 07:13:31 +00:00
|
|
|
}
|
|
|
|
|
2021-07-22 14:53:56 +00:00
|
|
|
pub struct Ref<M> {
|
|
|
|
pub mailbox: Arc<Mailbox>,
|
2021-07-25 21:12:07 +00:00
|
|
|
pub target: Mutex<Option<Box<dyn Entity<M>>>>,
|
2021-07-21 23:05:08 +00:00
|
|
|
}
|
|
|
|
|
2021-07-22 14:53:56 +00:00
|
|
|
#[derive(Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
|
|
|
|
pub struct Cap {
|
2021-08-11 21:16:01 +00:00
|
|
|
pub underlying: Arc<Ref<AnyValue>>,
|
2021-07-22 14:53:56 +00:00
|
|
|
pub attenuation: Vec<CheckedCaveat>,
|
2021-07-03 07:03:52 +00:00
|
|
|
}
|
|
|
|
|
2021-07-22 14:53:56 +00:00
|
|
|
pub struct Guard<M>
|
|
|
|
where
|
2021-08-11 21:16:01 +00:00
|
|
|
for<'a> &'a M: Into<AnyValue>,
|
|
|
|
for<'a> M: TryFrom<&'a AnyValue>,
|
2021-07-22 14:53:56 +00:00
|
|
|
{
|
|
|
|
underlying: Arc<Ref<M>>
|
2021-07-03 07:03:52 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
//---------------------------------------------------------------------------
|
|
|
|
|
2021-07-15 11:13:22 +00:00
|
|
|
/// Source of fresh debtor ids; `Debtor::new` takes the current value and increments it.
/// The first id handed out is 4.
static NEXT_DEBTOR_ID: AtomicU64 = AtomicU64::new(4);
|
|
|
|
|
2021-07-08 22:04:11 +00:00
|
|
|
preserves_schema::support::lazy_static! {
    /// Credit limit used by `Debtor::repay` and `Debtor::ensure_clear_funds`.
    ///
    /// Read from the `SYNDICATE_CREDIT` environment variable on first use,
    /// defaulting to 100 when the variable cannot be read. Panics if the
    /// value is not a valid `i64`.
    pub static ref SYNDICATE_CREDIT: i64 = {
        // Build the fallback lazily so nothing is allocated when the variable is set.
        let credit =
            std::env::var("SYNDICATE_CREDIT").unwrap_or_else(|_| "100".to_owned())
            .parse::<i64>().expect("Valid SYNDICATE_CREDIT environment variable");
        tracing::info!("Configured SYNDICATE_CREDIT = {}", credit);
        credit
    };

    /// Registry of live debtors: id -> (tracing span naming the debtor, its debt counter).
    /// Entries are added by `Debtor::new` and removed when the debtor is dropped.
    pub static ref DEBTORS: RwLock<Map<u64, (tracing::Span, Arc<AtomicI64>)>> =
        RwLock::new(Map::new());
}
|
|
|
|
|
|
|
|
pub fn start_debt_reporter() {
|
2021-07-24 21:22:01 +00:00
|
|
|
Actor::new().boot(crate::name!("debt-reporter"), |t| {
|
|
|
|
t.state.linked_task(crate::name!("tick"), async {
|
2021-07-15 11:13:22 +00:00
|
|
|
let mut timer = tokio::time::interval(core::time::Duration::from_secs(1));
|
|
|
|
loop {
|
|
|
|
timer.tick().await;
|
|
|
|
for (id, (name, debt)) in DEBTORS.read().unwrap().iter() {
|
|
|
|
let _enter = name.enter();
|
|
|
|
tracing::info!(id, debt = debug(debt.load(Ordering::Relaxed)));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
});
|
|
|
|
Ok(())
|
2021-07-24 21:22:01 +00:00
|
|
|
});
|
2021-07-08 22:04:11 +00:00
|
|
|
}
|
|
|
|
|
2021-08-11 21:16:01 +00:00
|
|
|
impl TryFrom<&AnyValue> for Synced {
|
2021-07-22 14:53:56 +00:00
|
|
|
type Error = ParseError;
|
2021-08-11 21:16:01 +00:00
|
|
|
fn try_from(value: &AnyValue) -> Result<Self, Self::Error> {
|
2021-07-22 14:53:56 +00:00
|
|
|
if let Some(true) = value.value().as_boolean() {
|
|
|
|
Ok(Synced)
|
|
|
|
} else {
|
|
|
|
Err(ParseError::conformance_error("Synced"))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-08-11 21:16:01 +00:00
|
|
|
/// Encodes a [`Synced`] as the boolean `true`.
impl From<&Synced> for AnyValue {
    fn from(_value: &Synced) -> Self {
        Self::new(true)
    }
}
|
|
|
|
|
2021-07-03 07:03:52 +00:00
|
|
|
impl<'activation> Activation<'activation> {
|
2021-07-24 21:22:01 +00:00
|
|
|
fn make(actor: &ActorRef, debtor: Arc<Debtor>, state: &'activation mut RunningActor) -> Self {
|
2021-07-03 07:03:52 +00:00
|
|
|
Activation {
|
2021-07-24 21:22:01 +00:00
|
|
|
actor: actor.clone(),
|
|
|
|
state,
|
|
|
|
pending: EventBuffer::new(debtor),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn for_actor<F>(
|
|
|
|
actor: &ActorRef,
|
|
|
|
debtor: Arc<Debtor>,
|
|
|
|
f: F,
|
|
|
|
) -> ActorResult where
|
|
|
|
F: FnOnce(&mut Activation) -> ActorResult,
|
|
|
|
{
|
|
|
|
match Self::for_actor_exit(actor, debtor, |t| match f(t) {
|
|
|
|
Ok(()) => None,
|
|
|
|
Err(e) => Some(Err(e)),
|
|
|
|
}) {
|
|
|
|
None => Ok(()),
|
|
|
|
Some(e) => Err(error("Could not activate terminated actor", encode_error(e))),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn for_actor_exit<F>(
|
|
|
|
actor: &ActorRef,
|
|
|
|
debtor: Arc<Debtor>,
|
|
|
|
f: F,
|
|
|
|
) -> Option<ActorResult> where
|
|
|
|
F: FnOnce(&mut Activation) -> Option<ActorResult>,
|
|
|
|
{
|
2021-07-25 21:12:07 +00:00
|
|
|
match actor.state.lock() {
|
2021-07-24 21:22:01 +00:00
|
|
|
Err(_) => panicked_err(),
|
|
|
|
Ok(mut g) => match &mut *g {
|
|
|
|
ActorState::Terminated { exit_status } =>
|
|
|
|
Some((**exit_status).clone()),
|
|
|
|
ActorState::Running(state) =>
|
|
|
|
match f(&mut Activation::make(actor, debtor, state)) {
|
|
|
|
None => None,
|
|
|
|
Some(exit_status) => {
|
|
|
|
let exit_status = Arc::new(exit_status);
|
|
|
|
let mut t = Activation::make(actor, Debtor::new(crate::name!("shutdown")), state);
|
|
|
|
for action in std::mem::take(&mut t.state.exit_hooks) {
|
|
|
|
if let Err(err) = action(&mut t, &exit_status) {
|
|
|
|
tracing::error!(err = debug(err), "error in exit hook");
|
|
|
|
}
|
|
|
|
}
|
|
|
|
*g = ActorState::Terminated {
|
|
|
|
exit_status: Arc::clone(&exit_status),
|
|
|
|
};
|
|
|
|
Some((*exit_status).clone())
|
|
|
|
}
|
|
|
|
},
|
|
|
|
}
|
2021-07-03 07:03:52 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-07-22 14:53:56 +00:00
|
|
|
fn immediate_oid<M>(&self, r: &Arc<Ref<M>>) {
|
|
|
|
if r.mailbox.actor_id != self.actor.actor_id {
|
2021-07-24 21:22:01 +00:00
|
|
|
panic!("Cannot use for_myself to send to remote peers");
|
2021-07-12 15:41:12 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-07-26 08:53:56 +00:00
|
|
|
pub fn assert<M: 'static + Send>(&mut self, r: &Arc<Ref<M>>, a: M) -> Handle {
|
2021-07-03 07:03:52 +00:00
|
|
|
let handle = crate::next_handle();
|
2021-07-22 14:53:56 +00:00
|
|
|
{
|
|
|
|
let r = Arc::clone(r);
|
2021-07-24 21:22:01 +00:00
|
|
|
self.pending.queue_for(&r).push(Box::new(
|
2021-07-22 14:53:56 +00:00
|
|
|
move |t| r.with_entity(|e| e.assert(t, a, handle))));
|
|
|
|
}
|
|
|
|
{
|
|
|
|
let r = Arc::clone(r);
|
2021-07-24 21:22:01 +00:00
|
|
|
self.state.cleanup_actions.insert(
|
2021-07-22 14:53:56 +00:00
|
|
|
handle,
|
2021-07-24 21:22:01 +00:00
|
|
|
CleanupAction::ForAnother(Arc::clone(&r.mailbox), Box::new(
|
2021-07-22 14:53:56 +00:00
|
|
|
move |t| r.with_entity(|e| e.retract(t, handle)))));
|
2021-07-15 07:13:31 +00:00
|
|
|
}
|
2021-07-12 15:41:12 +00:00
|
|
|
handle
|
|
|
|
}
|
|
|
|
|
2021-07-26 08:53:56 +00:00
|
|
|
pub fn assert_for_myself<M: 'static + Send>(&mut self, r: &Arc<Ref<M>>, a: M) -> Handle {
|
2021-07-21 21:53:55 +00:00
|
|
|
self.immediate_oid(r);
|
2021-07-12 15:41:12 +00:00
|
|
|
let handle = crate::next_handle();
|
2021-07-22 14:53:56 +00:00
|
|
|
{
|
|
|
|
let r = Arc::clone(r);
|
2021-07-24 21:22:01 +00:00
|
|
|
self.pending.for_myself.push(Box::new(
|
2021-07-22 14:53:56 +00:00
|
|
|
move |t| r.with_entity(|e| e.assert(t, a, handle))));
|
|
|
|
}
|
|
|
|
{
|
|
|
|
let r = Arc::clone(r);
|
2021-07-24 21:22:01 +00:00
|
|
|
self.state.cleanup_actions.insert(
|
2021-07-22 14:53:56 +00:00
|
|
|
handle,
|
2021-07-24 21:22:01 +00:00
|
|
|
CleanupAction::ForMyself(Box::new(
|
2021-07-22 14:53:56 +00:00
|
|
|
move |t| r.with_entity(|e| e.retract(t, handle)))));
|
2021-07-15 07:13:31 +00:00
|
|
|
}
|
2021-07-03 07:03:52 +00:00
|
|
|
handle
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn retract(&mut self, handle: Handle) {
|
2021-07-24 21:22:01 +00:00
|
|
|
if let Some(d) = self.state.cleanup_actions.remove(&handle) {
|
|
|
|
self.pending.execute_cleanup_action(d)
|
2021-07-12 15:41:12 +00:00
|
|
|
}
|
2021-07-03 07:03:52 +00:00
|
|
|
}
|
|
|
|
|
2021-07-26 08:53:56 +00:00
|
|
|
pub fn message<M: 'static + Send>(&mut self, r: &Arc<Ref<M>>, m: M) {
|
2021-07-22 14:53:56 +00:00
|
|
|
let r = Arc::clone(r);
|
2021-07-24 21:22:01 +00:00
|
|
|
self.pending.queue_for(&r).push(Box::new(
|
2021-07-22 14:53:56 +00:00
|
|
|
move |t| r.with_entity(|e| e.message(t, m))))
|
2021-07-03 07:03:52 +00:00
|
|
|
}
|
|
|
|
|
2021-07-26 08:53:56 +00:00
|
|
|
pub fn message_for_myself<M: 'static + Send>(&mut self, r: &Arc<Ref<M>>, m: M) {
|
2021-07-21 21:53:55 +00:00
|
|
|
self.immediate_oid(r);
|
2021-07-22 14:53:56 +00:00
|
|
|
let r = Arc::clone(r);
|
2021-07-24 21:22:01 +00:00
|
|
|
self.pending.for_myself.push(Box::new(
|
2021-07-22 14:53:56 +00:00
|
|
|
move |t| r.with_entity(|e| e.message(t, m))))
|
2021-07-08 22:04:11 +00:00
|
|
|
}
|
|
|
|
|
2021-07-26 08:53:56 +00:00
|
|
|
pub fn sync<M: 'static + Send>(&mut self, r: &Arc<Ref<M>>, peer: Arc<Ref<Synced>>) {
|
2021-07-22 07:56:21 +00:00
|
|
|
let r = Arc::clone(r);
|
2021-07-24 21:22:01 +00:00
|
|
|
self.pending.queue_for(&r).push(Box::new(
|
2021-07-22 07:56:21 +00:00
|
|
|
move |t| r.with_entity(|e| e.sync(t, peer))))
|
2021-07-06 18:56:36 +00:00
|
|
|
}
|
|
|
|
|
2021-07-24 21:22:01 +00:00
|
|
|
pub fn debtor(&self) -> &Arc<Debtor> {
|
|
|
|
&self.pending.debtor
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn deliver(&mut self) {
|
|
|
|
self.pending.deliver();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl EventBuffer {
|
|
|
|
fn new(debtor: Arc<Debtor>) -> Self {
|
|
|
|
EventBuffer {
|
|
|
|
debtor,
|
|
|
|
queues: HashMap::new(),
|
|
|
|
for_myself: Vec::new(),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
fn execute_cleanup_action(&mut self, d: CleanupAction) {
|
|
|
|
match d {
|
|
|
|
CleanupAction::ForAnother(mailbox, action) =>
|
|
|
|
self.queue_for_mailbox(&mailbox).push(action),
|
|
|
|
CleanupAction::ForMyself(action) =>
|
|
|
|
self.for_myself.push(action),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-07-22 14:53:56 +00:00
|
|
|
fn queue_for<M>(&mut self, r: &Arc<Ref<M>>) -> &mut PendingEventQueue {
|
|
|
|
self.queue_for_mailbox(&r.mailbox)
|
|
|
|
}
|
|
|
|
|
|
|
|
fn queue_for_mailbox(&mut self, mailbox: &Arc<Mailbox>) -> &mut PendingEventQueue {
|
|
|
|
&mut self.queues.entry(mailbox.actor_id)
|
|
|
|
.or_insert((mailbox.tx.clone(), Vec::new())).1
|
2021-07-03 07:03:52 +00:00
|
|
|
}
|
|
|
|
|
2021-07-06 18:56:36 +00:00
|
|
|
fn deliver(&mut self) {
|
2021-07-24 21:22:01 +00:00
|
|
|
if !self.for_myself.is_empty() {
|
|
|
|
panic!("Unprocessed for_myself events remain at deliver() time");
|
2021-07-12 15:41:12 +00:00
|
|
|
}
|
2021-07-22 07:56:21 +00:00
|
|
|
for (_actor_id, (tx, turn)) in std::mem::take(&mut self.queues).into_iter() {
|
|
|
|
let _ = send_actions(&tx, &self.debtor, turn);
|
2021-07-03 07:03:52 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-07-24 21:22:01 +00:00
|
|
|
impl Drop for EventBuffer {
|
2021-07-03 07:03:52 +00:00
|
|
|
fn drop(&mut self) {
|
|
|
|
self.deliver()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-07-15 11:13:22 +00:00
|
|
|
impl Debtor {
|
|
|
|
pub fn new(name: tracing::Span) -> Arc<Self> {
|
|
|
|
let id = NEXT_DEBTOR_ID.fetch_add(1, Ordering::Relaxed);
|
|
|
|
let debt = Arc::new(AtomicI64::new(0));
|
|
|
|
DEBTORS.write().unwrap().insert(id, (name, Arc::clone(&debt)));
|
|
|
|
Arc::new(Debtor {
|
|
|
|
id,
|
|
|
|
debt,
|
2021-07-27 14:30:42 +00:00
|
|
|
notify: Notify::new(),
|
2021-07-15 11:13:22 +00:00
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn balance(&self) -> i64 {
|
|
|
|
self.debt.load(Ordering::Relaxed)
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn borrow(&self, token_count: usize) {
|
|
|
|
let token_count: i64 = token_count.try_into().expect("manageable token count");
|
|
|
|
self.debt.fetch_add(token_count, Ordering::Relaxed);
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn repay(&self, token_count: usize) {
|
|
|
|
let token_count: i64 = token_count.try_into().expect("manageable token count");
|
|
|
|
let _old_debt = self.debt.fetch_sub(token_count, Ordering::Relaxed);
|
2021-07-27 14:30:42 +00:00
|
|
|
if _old_debt - token_count <= *SYNDICATE_CREDIT {
|
|
|
|
self.notify.notify_one();
|
|
|
|
}
|
2021-07-15 11:13:22 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
pub async fn ensure_clear_funds(&self) {
|
|
|
|
let limit = *SYNDICATE_CREDIT;
|
2021-07-27 14:30:42 +00:00
|
|
|
// tokio::task::yield_now().await;
|
2021-07-15 11:13:22 +00:00
|
|
|
while self.balance() > limit {
|
2021-07-27 14:30:42 +00:00
|
|
|
// tokio::task::yield_now().await;
|
|
|
|
self.notify.notified().await;
|
2021-07-15 07:13:31 +00:00
|
|
|
}
|
|
|
|
}
|
2021-07-15 11:13:22 +00:00
|
|
|
}
|
2021-07-15 07:13:31 +00:00
|
|
|
|
2021-07-15 11:13:22 +00:00
|
|
|
impl Drop for Debtor {
|
|
|
|
fn drop(&mut self) {
|
|
|
|
DEBTORS.write().unwrap().remove(&self.id);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<T> LoanedItem<T> {
|
|
|
|
pub fn new(debtor: &Arc<Debtor>, cost: usize, item: T) -> Self {
|
|
|
|
debtor.borrow(cost);
|
|
|
|
LoanedItem { debtor: Arc::clone(debtor), cost, item }
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<T> Drop for LoanedItem<T> {
|
|
|
|
fn drop(&mut self) {
|
|
|
|
self.debtor.repay(self.cost);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-07-22 07:56:21 +00:00
|
|
|
#[must_use]
|
|
|
|
fn send_actions(
|
|
|
|
tx: &UnboundedSender<SystemMessage>,
|
|
|
|
debtor: &Arc<Debtor>,
|
|
|
|
t: PendingEventQueue,
|
|
|
|
) -> ActorResult {
|
|
|
|
let token_count = t.len();
|
|
|
|
tx.send(SystemMessage::Turn(LoanedItem::new(debtor, token_count, t)))
|
2021-08-11 21:16:01 +00:00
|
|
|
.map_err(|_| error("Target actor not running", AnyValue::new(false)))
|
2021-07-03 07:03:52 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
impl std::fmt::Debug for Mailbox {
|
|
|
|
fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> {
|
2021-07-22 14:53:56 +00:00
|
|
|
write!(f, "#<Mailbox {}>", self.actor_id)
|
2021-07-03 07:03:52 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl std::hash::Hash for Mailbox {
|
|
|
|
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
|
2021-07-22 14:53:56 +00:00
|
|
|
self.actor_id.hash(state)
|
2021-07-03 07:03:52 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Mailbox equality compares `actor_id` (see the `PartialEq` impl).
impl Eq for Mailbox {}
|
|
|
|
/// Two mailboxes are equal when they belong to the same actor.
impl PartialEq for Mailbox {
    fn eq(&self, other: &Mailbox) -> bool {
        let (mine, theirs) = (&self.actor_id, &other.actor_id);
        mine == theirs
    }
}
|
|
|
|
|
|
|
|
/// Mailboxes are ordered by the id of their owning actor.
impl Ord for Mailbox {
    fn cmp(&self, other: &Mailbox) -> std::cmp::Ordering {
        self.actor_id.cmp(&other.actor_id)
    }
}
|
|
|
|
|
|
|
|
/// Total order delegated to `Ord`.
impl PartialOrd for Mailbox {
    fn partial_cmp(&self, other: &Mailbox) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
|
|
|
|
|
|
|
|
impl Drop for Mailbox {
|
|
|
|
fn drop(&mut self) {
|
2021-07-22 14:53:56 +00:00
|
|
|
let _ = self.tx.send(SystemMessage::Release);
|
|
|
|
()
|
2021-07-03 07:03:52 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Actor {
|
|
|
|
pub fn new() -> Self {
|
|
|
|
let (tx, rx) = unbounded_channel();
|
2021-07-06 18:56:36 +00:00
|
|
|
let actor_id = crate::next_actor_id();
|
2021-07-08 22:04:11 +00:00
|
|
|
// tracing::trace!(id = actor_id, "Actor::new");
|
2021-07-03 07:03:52 +00:00
|
|
|
Actor {
|
2021-07-23 06:10:09 +00:00
|
|
|
rx,
|
2021-07-24 21:22:01 +00:00
|
|
|
ac_ref: ActorRef {
|
|
|
|
actor_id,
|
2021-07-25 21:12:07 +00:00
|
|
|
state: Arc::new(Mutex::new(ActorState::Running(RunningActor {
|
2021-07-24 21:22:01 +00:00
|
|
|
actor_id,
|
|
|
|
tx,
|
|
|
|
mailbox: Weak::new(),
|
|
|
|
cleanup_actions: Map::new(),
|
|
|
|
next_task_id: 0,
|
|
|
|
linked_tasks: Map::new(),
|
|
|
|
exit_hooks: Vec::new(),
|
|
|
|
}))),
|
|
|
|
},
|
2021-07-03 07:03:52 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-07-26 08:53:56 +00:00
|
|
|
pub fn create_and_start<M, E: Entity<M> + Send + 'static>(
|
2021-07-21 23:05:08 +00:00
|
|
|
name: tracing::Span,
|
|
|
|
e: E,
|
2021-07-22 14:53:56 +00:00
|
|
|
) -> Arc<Ref<M>> {
|
|
|
|
let r = Self::create_and_start_inert(name);
|
|
|
|
r.become_entity(e);
|
|
|
|
r
|
2021-07-08 22:04:11 +00:00
|
|
|
}
|
|
|
|
|
2021-07-22 14:53:56 +00:00
|
|
|
pub fn create_and_start_inert<M>(name: tracing::Span) -> Arc<Ref<M>> {
|
2021-07-24 21:22:01 +00:00
|
|
|
let ac = Self::new();
|
2021-07-25 21:12:07 +00:00
|
|
|
let r = ac.ac_ref.access(|s| s.unwrap().expect_running().create_inert());
|
2021-07-08 22:04:11 +00:00
|
|
|
ac.start(name);
|
|
|
|
r
|
|
|
|
}
|
|
|
|
|
2021-07-24 21:22:01 +00:00
|
|
|
pub fn boot<F: 'static + Send + FnOnce(&mut Activation) -> ActorResult>(
|
|
|
|
mut self,
|
|
|
|
name: tracing::Span,
|
|
|
|
boot: F,
|
|
|
|
) -> ActorHandle {
|
|
|
|
name.record("actor_id", &self.ac_ref.actor_id);
|
|
|
|
tokio::spawn(async move {
|
|
|
|
tracing::trace!("start");
|
|
|
|
self.run(boot).await;
|
|
|
|
let result = self.ac_ref.exit_status().expect("terminated");
|
|
|
|
match &result {
|
|
|
|
Ok(()) => tracing::trace!("normal stop"),
|
|
|
|
Err(e) => tracing::error!("error stop: {}", e),
|
|
|
|
}
|
|
|
|
result
|
|
|
|
}.instrument(name))
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn start(self, name: tracing::Span) -> ActorHandle {
|
|
|
|
self.boot(name, |_ac| Ok(()))
|
|
|
|
}
|
|
|
|
|
|
|
|
fn terminate(&mut self, result: ActorResult) {
|
|
|
|
let _ = Activation::for_actor_exit(
|
|
|
|
&self.ac_ref, Debtor::new(crate::name!("shutdown")), |_| Some(result));
|
|
|
|
}
|
|
|
|
|
|
|
|
async fn run<F: 'static + Send + FnOnce(&mut Activation) -> ActorResult>(
|
|
|
|
&mut self,
|
|
|
|
boot: F,
|
|
|
|
) -> () {
|
|
|
|
if Activation::for_actor(&self.ac_ref, Debtor::new(crate::name!("boot")), boot).is_err() {
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
loop {
|
|
|
|
match self.rx.recv().await {
|
|
|
|
None => {
|
2021-08-11 21:16:01 +00:00
|
|
|
return self.terminate(Err(error("Unexpected channel close", AnyValue::new(false))));
|
2021-07-24 21:22:01 +00:00
|
|
|
}
|
|
|
|
Some(m) => match m {
|
|
|
|
SystemMessage::Release => {
|
|
|
|
tracing::trace!("SystemMessage::Release");
|
|
|
|
return self.terminate(Ok(()));
|
|
|
|
}
|
|
|
|
SystemMessage::Turn(mut loaned_item) => {
|
|
|
|
let mut actions = std::mem::take(&mut loaned_item.item);
|
|
|
|
let r = Activation::for_actor(
|
|
|
|
&self.ac_ref, Arc::clone(&loaned_item.debtor), |t| {
|
|
|
|
loop {
|
|
|
|
for action in actions.into_iter() { action(t)? }
|
|
|
|
actions = std::mem::take(&mut t.pending.for_myself);
|
|
|
|
if actions.is_empty() { break; }
|
|
|
|
}
|
|
|
|
Ok(())
|
|
|
|
});
|
|
|
|
if r.is_err() { return; }
|
|
|
|
}
|
|
|
|
SystemMessage::Crash(e) => {
|
|
|
|
tracing::trace!("SystemMessage::Crash({:?})", &e);
|
|
|
|
return self.terminate(Err(e));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
fn panicked_err() -> Option<ActorResult> {
|
2021-08-11 21:16:01 +00:00
|
|
|
Some(Err(error("Actor panicked", AnyValue::new(false))))
|
2021-07-24 21:22:01 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
impl ActorRef {
|
2021-07-25 21:12:07 +00:00
|
|
|
pub fn access<R, F: FnOnce(Option<&mut ActorState>) -> R>(&self, f: F) -> R {
|
|
|
|
match self.state.lock() {
|
2021-07-24 21:22:01 +00:00
|
|
|
Err(_) => f(None),
|
|
|
|
Ok(mut g) => f(Some(&mut *g)),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn exit_status(&self) -> Option<ActorResult> {
|
2021-07-25 21:12:07 +00:00
|
|
|
self.access(|s| s.map_or_else(
|
2021-07-24 21:22:01 +00:00
|
|
|
panicked_err,
|
|
|
|
|state| match state {
|
|
|
|
ActorState::Running(_) => None,
|
|
|
|
ActorState::Terminated { exit_status } => Some((**exit_status).clone()),
|
|
|
|
}))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl ActorState {
|
|
|
|
fn expect_running(&mut self) -> &mut RunningActor {
|
|
|
|
match self {
|
|
|
|
ActorState::Terminated { .. } => panic!("Expected a running actor"),
|
|
|
|
ActorState::Running(r) => r,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl RunningActor {
|
|
|
|
pub fn shutdown(&self) {
|
|
|
|
let _ = self.tx.send(SystemMessage::Release);
|
2021-07-06 18:56:36 +00:00
|
|
|
}
|
|
|
|
|
2021-07-22 14:53:56 +00:00
|
|
|
fn mailbox(&mut self) -> Arc<Mailbox> {
|
|
|
|
match self.mailbox.upgrade() {
|
|
|
|
None => {
|
|
|
|
let new_mailbox = Arc::new(Mailbox {
|
|
|
|
actor_id: self.actor_id,
|
|
|
|
tx: self.tx.clone(),
|
|
|
|
});
|
|
|
|
self.mailbox = Arc::downgrade(&new_mailbox);
|
|
|
|
new_mailbox
|
|
|
|
}
|
|
|
|
Some(m) => m
|
|
|
|
}
|
2021-07-06 18:56:36 +00:00
|
|
|
}
|
|
|
|
|
2021-07-24 21:22:01 +00:00
|
|
|
pub fn inert_entity<M>(&mut self) -> Arc<Ref<M>> {
|
|
|
|
self.create(InertEntity)
|
2021-07-03 07:03:52 +00:00
|
|
|
}
|
|
|
|
|
2021-07-26 08:53:56 +00:00
|
|
|
pub fn create<M, E: Entity<M> + Send + 'static>(&mut self, e: E) -> Arc<Ref<M>> {
|
2021-07-22 14:53:56 +00:00
|
|
|
let r = self.create_inert();
|
|
|
|
r.become_entity(e);
|
|
|
|
r
|
2021-07-08 22:04:11 +00:00
|
|
|
}
|
|
|
|
|
2021-07-22 14:53:56 +00:00
|
|
|
pub fn create_inert<M>(&mut self) -> Arc<Ref<M>> {
|
|
|
|
Arc::new(Ref {
|
|
|
|
mailbox: self.mailbox(),
|
2021-07-25 21:12:07 +00:00
|
|
|
target: Mutex::new(None),
|
2021-07-22 14:53:56 +00:00
|
|
|
})
|
2021-07-03 07:03:52 +00:00
|
|
|
}
|
|
|
|
|
2021-07-26 08:53:56 +00:00
|
|
|
pub fn add_exit_hook<M: 'static + Send>(&mut self, r: &Arc<Ref<M>>) {
|
2021-07-22 14:53:56 +00:00
|
|
|
let r = Arc::clone(r);
|
2021-07-24 21:22:01 +00:00
|
|
|
self.exit_hooks.push(Box::new(move |t, exit_status| {
|
2021-07-22 14:53:56 +00:00
|
|
|
r.with_entity(|e| e.exit_hook(t, &exit_status))
|
|
|
|
}))
|
2021-07-06 18:56:36 +00:00
|
|
|
}
|
|
|
|
|
2021-07-24 21:22:01 +00:00
|
|
|
pub fn linked_task<F: 'static + Send + futures::Future<Output = ActorResult>>(
|
2021-07-03 07:03:52 +00:00
|
|
|
&mut self,
|
|
|
|
name: tracing::Span,
|
|
|
|
boot: F,
|
2021-07-08 22:04:11 +00:00
|
|
|
) {
|
2021-07-06 18:56:36 +00:00
|
|
|
let mailbox = self.mailbox();
|
2021-07-03 07:03:52 +00:00
|
|
|
let token = CancellationToken::new();
|
|
|
|
let task_id = self.next_task_id;
|
|
|
|
self.next_task_id += 1;
|
2021-07-08 22:04:11 +00:00
|
|
|
name.record("task_id", &task_id);
|
|
|
|
{
|
2021-07-03 07:03:52 +00:00
|
|
|
let token = token.clone();
|
|
|
|
tokio::spawn(async move {
|
2021-07-08 22:04:11 +00:00
|
|
|
tracing::trace!(task_id, "linked task start");
|
2021-07-03 07:03:52 +00:00
|
|
|
select! {
|
2021-07-06 18:56:36 +00:00
|
|
|
_ = token.cancelled() => {
|
2021-07-08 22:04:11 +00:00
|
|
|
tracing::trace!(task_id, "linked task cancelled");
|
2021-07-06 18:56:36 +00:00
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
result = boot => {
|
|
|
|
match &result {
|
2021-07-08 22:04:11 +00:00
|
|
|
Ok(()) => {
|
|
|
|
tracing::trace!(task_id, "linked task normal stop");
|
|
|
|
()
|
|
|
|
}
|
2021-07-06 18:56:36 +00:00
|
|
|
Err(e) => {
|
2021-07-08 22:04:11 +00:00
|
|
|
tracing::error!(task_id, "linked task error: {}", e);
|
2021-07-06 18:56:36 +00:00
|
|
|
let _ = mailbox.tx.send(SystemMessage::Crash(e.clone()));
|
|
|
|
()
|
|
|
|
}
|
2021-07-03 07:03:52 +00:00
|
|
|
}
|
2021-07-06 18:56:36 +00:00
|
|
|
result
|
2021-07-03 07:03:52 +00:00
|
|
|
}
|
|
|
|
}
|
2021-07-08 22:04:11 +00:00
|
|
|
}.instrument(name));
|
|
|
|
}
|
2021-07-03 07:03:52 +00:00
|
|
|
self.linked_tasks.insert(task_id, token);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Drop for Actor {
|
|
|
|
fn drop(&mut self) {
|
2021-07-23 06:10:09 +00:00
|
|
|
self.rx.close();
|
2021-07-24 21:22:01 +00:00
|
|
|
}
|
|
|
|
}
|
2021-07-15 07:13:31 +00:00
|
|
|
|
2021-07-24 21:22:01 +00:00
|
|
|
impl Drop for RunningActor {
|
|
|
|
fn drop(&mut self) {
|
2021-07-03 07:03:52 +00:00
|
|
|
for (_task_id, token) in std::mem::take(&mut self.linked_tasks).into_iter() {
|
|
|
|
token.cancel();
|
|
|
|
}
|
|
|
|
|
2021-07-23 06:11:48 +00:00
|
|
|
let to_clear = std::mem::take(&mut self.cleanup_actions);
|
2021-07-15 07:13:31 +00:00
|
|
|
{
|
2021-07-24 21:22:01 +00:00
|
|
|
let mut b = EventBuffer::new(Debtor::new(crate::name!("drop")));
|
2021-07-22 07:56:21 +00:00
|
|
|
for (_handle, r) in to_clear.into_iter() {
|
|
|
|
tracing::trace!(h = debug(&_handle), "retract on termination");
|
2021-07-24 21:22:01 +00:00
|
|
|
b.execute_cleanup_action(r);
|
2021-07-15 07:13:31 +00:00
|
|
|
}
|
2021-07-03 07:03:52 +00:00
|
|
|
}
|
2021-07-15 07:13:31 +00:00
|
|
|
|
2021-07-08 22:04:11 +00:00
|
|
|
tracing::trace!("Actor::drop");
|
2021-07-03 07:03:52 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-07-21 21:53:55 +00:00
|
|
|
#[must_use]
|
2021-07-22 14:53:56 +00:00
|
|
|
pub fn external_event(mailbox: &Arc<Mailbox>, debtor: &Arc<Debtor>, action: Action) -> ActorResult {
|
|
|
|
send_actions(&mailbox.tx, debtor, vec![action])
|
2021-07-21 21:53:55 +00:00
|
|
|
}
|
2021-07-15 07:13:31 +00:00
|
|
|
|
2021-07-21 21:53:55 +00:00
|
|
|
#[must_use]
|
2021-07-22 14:53:56 +00:00
|
|
|
pub fn external_events(mailbox: &Arc<Mailbox>, debtor: &Arc<Debtor>, events: PendingEventQueue) -> ActorResult {
|
|
|
|
send_actions(&mailbox.tx, debtor, events)
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<M> Ref<M> {
|
|
|
|
pub fn become_entity<E: 'static + Entity<M>>(&self, e: E) {
|
2021-07-25 21:12:07 +00:00
|
|
|
let mut g = self.target.lock().expect("unpoisoned");
|
2021-07-22 14:53:56 +00:00
|
|
|
if g.is_some() {
|
|
|
|
panic!("Double initialization of Ref");
|
|
|
|
}
|
|
|
|
*g = Some(Box::new(e));
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn with_entity<R, F: FnOnce(&mut dyn Entity<M>) -> R>(&self, f: F) -> R {
|
2021-07-25 21:12:07 +00:00
|
|
|
let mut g = self.target.lock().expect("unpoisoned");
|
2021-07-22 14:53:56 +00:00
|
|
|
f(g.as_mut().expect("initialized").as_mut())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<M> Ref<M> {
|
|
|
|
pub fn oid(&self) -> usize {
|
|
|
|
std::ptr::addr_of!(*self) as usize
|
|
|
|
}
|
2021-07-21 21:53:55 +00:00
|
|
|
}
|
2021-07-15 07:13:31 +00:00
|
|
|
|
2021-07-22 14:53:56 +00:00
|
|
|
impl<M> PartialEq for Ref<M> {
|
|
|
|
fn eq(&self, other: &Self) -> bool {
|
|
|
|
self.oid() == other.oid()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Equality of `Ref` is identity of the object (see `oid`).
impl<M> Eq for Ref<M> {}
|
|
|
|
|
|
|
|
impl<M> std::hash::Hash for Ref<M> {
|
|
|
|
fn hash<H>(&self, hash: &mut H) where H: std::hash::Hasher {
|
|
|
|
self.oid().hash(hash)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<M> PartialOrd for Ref<M> {
|
|
|
|
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
|
|
|
|
Some(self.cmp(other))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<M> Ord for Ref<M> {
|
|
|
|
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
|
|
|
|
self.oid().cmp(&other.oid())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<M> std::fmt::Debug for Ref<M> {
|
|
|
|
fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> {
|
|
|
|
write!(f, "⌜{}:{:016x}⌝", self.mailbox.actor_id, self.oid())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Cap {
|
2021-07-26 08:53:56 +00:00
|
|
|
pub fn guard<M: 'static + Send>(underlying: &Arc<Ref<M>>) -> Arc<Self>
|
2021-07-22 14:53:56 +00:00
|
|
|
where
|
2021-08-11 21:16:01 +00:00
|
|
|
for<'a> &'a M: Into<AnyValue>,
|
|
|
|
for<'a> M: TryFrom<&'a AnyValue>,
|
2021-07-22 14:53:56 +00:00
|
|
|
{
|
|
|
|
Self::new(&Arc::new(Ref {
|
|
|
|
mailbox: Arc::clone(&underlying.mailbox),
|
2021-07-25 21:12:07 +00:00
|
|
|
target: Mutex::new(Some(Box::new(Guard { underlying: underlying.clone() }))),
|
2021-07-22 14:53:56 +00:00
|
|
|
}))
|
|
|
|
}
|
|
|
|
|
2021-08-11 21:16:01 +00:00
|
|
|
pub fn new(underlying: &Arc<Ref<AnyValue>>) -> Arc<Self> {
|
2021-07-22 14:53:56 +00:00
|
|
|
Arc::new(Cap {
|
|
|
|
underlying: Arc::clone(underlying),
|
|
|
|
attenuation: Vec::new(),
|
|
|
|
})
|
2021-07-22 07:56:21 +00:00
|
|
|
}
|
|
|
|
|
2021-07-15 07:13:31 +00:00
|
|
|
pub fn attenuate(&self, attenuation: &sturdy::Attenuation) -> Result<Arc<Self>, CaveatError> {
|
2021-07-22 14:53:56 +00:00
|
|
|
let mut r = Cap { attenuation: self.attenuation.clone(), .. self.clone() };
|
2021-07-15 07:13:31 +00:00
|
|
|
r.attenuation.extend(attenuation.check()?);
|
|
|
|
Ok(Arc::new(r))
|
|
|
|
}
|
|
|
|
|
2021-08-11 21:16:01 +00:00
|
|
|
pub fn rewrite(&self, mut a: AnyValue) -> Option<AnyValue> {
|
2021-07-15 07:13:31 +00:00
|
|
|
for c in &self.attenuation {
|
|
|
|
match c.rewrite(&a) {
|
|
|
|
Some(v) => a = v,
|
|
|
|
None => return None,
|
|
|
|
}
|
|
|
|
}
|
2021-07-22 07:56:21 +00:00
|
|
|
Some(a)
|
2021-07-06 18:56:36 +00:00
|
|
|
}
|
2021-07-22 14:53:56 +00:00
|
|
|
|
2021-08-11 21:16:01 +00:00
|
|
|
pub fn assert<M: Into<AnyValue>>(&self, t: &mut Activation, m: M) -> Option<Handle> {
|
2021-07-22 14:53:56 +00:00
|
|
|
self.rewrite(m.into()).map(|m| t.assert(&self.underlying, m))
|
|
|
|
}
|
|
|
|
|
2021-08-11 21:16:01 +00:00
|
|
|
pub fn message<M: Into<AnyValue>>(&self, t: &mut Activation, m: M) {
|
2021-07-22 14:53:56 +00:00
|
|
|
if let Some(m) = self.rewrite(m.into()) {
|
|
|
|
t.message(&self.underlying, m)
|
|
|
|
}
|
|
|
|
}
|
2021-07-06 18:56:36 +00:00
|
|
|
}
|
|
|
|
|
2021-07-22 14:53:56 +00:00
|
|
|
impl std::fmt::Debug for Cap {
|
2021-07-08 22:04:11 +00:00
|
|
|
fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> {
|
2021-07-15 07:13:31 +00:00
|
|
|
if self.attenuation.is_empty() {
|
2021-07-22 14:53:56 +00:00
|
|
|
self.underlying.fmt(f)
|
2021-07-15 07:13:31 +00:00
|
|
|
} else {
|
2021-07-22 14:53:56 +00:00
|
|
|
write!(f, "⌜{}:{:016x}\\{:?}⌝",
|
|
|
|
self.underlying.mailbox.actor_id,
|
|
|
|
self.underlying.oid(),
|
|
|
|
self.attenuation)
|
2021-07-15 07:13:31 +00:00
|
|
|
}
|
2021-07-08 22:04:11 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-07-22 14:53:56 +00:00
|
|
|
/// Implements `preserves::value::Domain` for `Cap` with an empty body, so
/// no trait items are overridden here.
impl Domain for Cap {}
|
2021-07-06 18:56:36 +00:00
|
|
|
|
2021-07-22 14:53:56 +00:00
|
|
|
impl std::convert::TryFrom<&IOValue> for Cap {
|
2021-07-06 18:56:36 +00:00
|
|
|
type Error = preserves_schema::support::ParseError;
|
|
|
|
fn try_from(_v: &IOValue) -> Result<Self, Self::Error> {
|
2021-07-22 14:53:56 +00:00
|
|
|
panic!("Attempted to serialize Cap via IOValue");
|
2021-07-03 07:03:52 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-07-22 14:53:56 +00:00
|
|
|
impl std::convert::From<&Cap> for IOValue {
|
|
|
|
fn from(_v: &Cap) -> IOValue {
|
2021-07-06 18:56:36 +00:00
|
|
|
panic!("Attempted to deserialize Ref via IOValue");
|
2021-07-03 07:03:52 +00:00
|
|
|
}
|
|
|
|
}
|
2021-07-08 22:04:11 +00:00
|
|
|
|
2021-08-11 21:16:01 +00:00
|
|
|
impl<M> Entity<AnyValue> for Guard<M>
|
2021-07-22 14:53:56 +00:00
|
|
|
where
|
2021-08-11 21:16:01 +00:00
|
|
|
for<'a> &'a M: Into<AnyValue>,
|
|
|
|
for<'a> M: TryFrom<&'a AnyValue>,
|
2021-07-22 14:53:56 +00:00
|
|
|
{
|
2021-08-11 21:16:01 +00:00
|
|
|
fn assert(&mut self, t: &mut Activation, a: AnyValue, h: Handle) -> ActorResult {
|
2021-07-22 14:53:56 +00:00
|
|
|
match M::try_from(&a) {
|
|
|
|
Ok(a) => self.underlying.with_entity(|e| e.assert(t, a, h)),
|
|
|
|
Err(_) => Ok(()),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
fn retract(&mut self, t: &mut Activation, h: Handle) -> ActorResult {
|
|
|
|
self.underlying.with_entity(|e| e.retract(t, h))
|
|
|
|
}
|
2021-08-11 21:16:01 +00:00
|
|
|
fn message(&mut self, t: &mut Activation, m: AnyValue) -> ActorResult {
|
2021-07-22 14:53:56 +00:00
|
|
|
match M::try_from(&m) {
|
|
|
|
Ok(m) => self.underlying.with_entity(|e| e.message(t, m)),
|
|
|
|
Err(_) => Ok(()),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
fn sync(&mut self, t: &mut Activation, peer: Arc<Ref<Synced>>) -> ActorResult {
|
|
|
|
self.underlying.with_entity(|e| e.sync(t, peer))
|
|
|
|
}
|
|
|
|
fn turn_end(&mut self, t: &mut Activation) -> ActorResult {
|
|
|
|
self.underlying.with_entity(|e| e.turn_end(t))
|
|
|
|
}
|
|
|
|
fn exit_hook(&mut self, t: &mut Activation, exit_status: &Arc<ActorResult>) -> ActorResult {
|
|
|
|
self.underlying.with_entity(|e| e.exit_hook(t, exit_status))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-07-08 22:04:11 +00:00
|
|
|
/// Expands to a `tracing::info_span!` invocation that always declares the
/// fields `actor_id`, `task_id` and `oid`, each set to
/// `tracing::field::Empty`.
///
/// Any tokens given to the macro are placed, unchanged, before those three
/// fields in the `info_span!` argument list.
#[macro_export]
macro_rules! name {
    // No arguments: only the three placeholder fields are passed.
    // NOTE(review): this arm supplies no span-name literal to `info_span!`;
    // confirm it expands as intended before relying on `name!()`.
    () => {
        tracing::info_span!(
            actor_id = tracing::field::Empty,
            task_id = tracing::field::Empty,
            oid = tracing::field::Empty
        )
    };
    // Caller-supplied tokens come first, followed by the placeholder fields.
    ($($spec:tt)*) => {
        tracing::info_span!(
            $($spec)*,
            actor_id = tracing::field::Empty,
            task_id = tracing::field::Empty,
            oid = tracing::field::Empty
        )
    }
}
|