pub use futures::future::BoxFuture; pub use std::future::ready; use super::ActorId; use super::schemas::sturdy; use super::error::Error; use super::error::error; use super::rewrite::CaveatError; use super::rewrite::CheckedCaveat; use preserves::value::Domain; use preserves::value::IOValue; use preserves::value::Map; use preserves::value::NestedValue; use preserves_schema::support::ParseError; use std::boxed::Box; use std::collections::hash_map::HashMap; use std::convert::TryFrom; use std::convert::TryInto; use std::sync::Arc; use std::sync::RwLock; use std::sync::Weak; use std::sync::atomic::{AtomicI64, AtomicU64, Ordering}; use tokio::select; use tokio::sync::mpsc::{unbounded_channel, UnboundedSender, UnboundedReceiver}; use tokio_util::sync::CancellationToken; use tracing::Instrument; pub use super::schemas::internal_protocol::_Any; pub type Handle = u64; pub type ActorResult = Result<(), Error>; pub type ActorHandle = tokio::task::JoinHandle; pub struct Synced; pub trait Entity: Send + Sync { fn assert(&mut self, _t: &mut Activation, _a: M, _h: Handle) -> ActorResult { Ok(()) } fn retract(&mut self, _t: &mut Activation, _h: Handle) -> ActorResult { Ok(()) } fn message(&mut self, _t: &mut Activation, _m: M) -> ActorResult { Ok(()) } fn sync(&mut self, t: &mut Activation, peer: Arc>) -> ActorResult { t.message(&peer, Synced); Ok(()) } fn turn_end(&mut self, _t: &mut Activation) -> ActorResult { Ok(()) } fn exit_hook(&mut self, _t: &mut Activation, _exit_status: &Arc) -> ActorResult { Ok(()) } } enum Destination { ImmediateSelf(Action), Remote(Arc, Action), } type OutboundAssertions = Map; pub type Action = Box ActorResult>; pub type PendingEventQueue = Vec; // This is what other implementations call a "Turn", renamed here to // avoid conflicts with schemas::internal_protocol::Turn. pub struct Activation<'activation> { pub actor: &'activation mut Actor, pub debtor: Arc, queues: HashMap, PendingEventQueue)>, immediate_self: PendingEventQueue, } #[derive(Debug)] pub struct Debtor { id: u64, debt: Arc, // notify: Notify, } #[derive(Debug)] pub struct LoanedItem { pub debtor: Arc, pub cost: usize, pub item: T, } enum SystemMessage { Release, Turn(LoanedItem), Crash(Error), } pub struct Mailbox { pub actor_id: ActorId, tx: UnboundedSender, } pub struct Actor { actor_id: ActorId, tx: UnboundedSender, mailbox: Weak, rx: Option>, outbound_assertions: OutboundAssertions, next_task_id: u64, linked_tasks: Map, exit_hooks: Vec, exit_status: Option>, } pub struct Ref { pub mailbox: Arc, pub target: RwLock>>>, } #[derive(Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] pub struct Cap { pub underlying: Arc>, pub attenuation: Vec, } pub struct Guard where for<'a> &'a M: Into<_Any>, for<'a> M: TryFrom<&'a _Any>, { underlying: Arc> } //--------------------------------------------------------------------------- static NEXT_DEBTOR_ID: AtomicU64 = AtomicU64::new(4); preserves_schema::support::lazy_static! { pub static ref SYNDICATE_CREDIT: i64 = { let credit = std::env::var("SYNDICATE_CREDIT").unwrap_or("100".to_owned()) .parse::().expect("Valid SYNDICATE_CREDIT environment variable"); tracing::info!("Configured SYNDICATE_CREDIT = {}", credit); credit }; pub static ref DEBTORS: RwLock)>> = RwLock::new(Map::new()); } pub fn start_debt_reporter() { Actor::new().boot(crate::name!("debt-reporter"), |t| Box::pin(async move { t.actor.linked_task(crate::name!("tick"), async move { let mut timer = tokio::time::interval(core::time::Duration::from_secs(1)); loop { timer.tick().await; for (id, (name, debt)) in DEBTORS.read().unwrap().iter() { let _enter = name.enter(); tracing::info!(id, debt = debug(debt.load(Ordering::Relaxed))); } } }); Ok(()) })); } impl TryFrom<&_Any> for Synced { type Error = ParseError; fn try_from(value: &_Any) -> Result { if let Some(true) = value.value().as_boolean() { Ok(Synced) } else { Err(ParseError::conformance_error("Synced")) } } } impl From<&Synced> for _Any { fn from(_value: &Synced) -> Self { _Any::new(true) } } impl<'activation> Activation<'activation> { pub fn new(actor: &'activation mut Actor, debtor: Arc) -> Self { Activation { actor, debtor, queues: HashMap::new(), immediate_self: Vec::new(), } } fn immediate_oid(&self, r: &Arc>) { if r.mailbox.actor_id != self.actor.actor_id { panic!("Cannot use immediate_self to send to remote peers"); } } pub fn assert(&mut self, r: &Arc>, a: M) -> Handle { let handle = crate::next_handle(); { let r = Arc::clone(r); self.queue_for(&r).push(Box::new( move |t| r.with_entity(|e| e.assert(t, a, handle)))); } { let r = Arc::clone(r); self.actor.outbound_assertions.insert( handle, Destination::Remote(Arc::clone(&r.mailbox), Box::new( move |t| r.with_entity(|e| e.retract(t, handle))))); } handle } pub fn assert_immediate_self(&mut self, r: &Arc>, a: M) -> Handle { self.immediate_oid(r); let handle = crate::next_handle(); { let r = Arc::clone(r); self.immediate_self.push(Box::new( move |t| r.with_entity(|e| e.assert(t, a, handle)))); } { let r = Arc::clone(r); self.actor.outbound_assertions.insert( handle, Destination::ImmediateSelf(Box::new( move |t| r.with_entity(|e| e.retract(t, handle))))); } handle } pub fn retract(&mut self, handle: Handle) { if let Some(d) = self.actor.outbound_assertions.remove(&handle) { self.retract_known_ref(d) } } fn retract_known_ref(&mut self, d: Destination) { match d { Destination::Remote(mailbox, action) => self.queue_for_mailbox(&mailbox).push(action), Destination::ImmediateSelf(action) => self.immediate_self.push(action), } } pub fn message(&mut self, r: &Arc>, m: M) { let r = Arc::clone(r); self.queue_for(&r).push(Box::new( move |t| r.with_entity(|e| e.message(t, m)))) } pub fn message_immediate_self(&mut self, r: &Arc>, m: M) { self.immediate_oid(r); let r = Arc::clone(r); self.immediate_self.push(Box::new( move |t| r.with_entity(|e| e.message(t, m)))) } pub fn sync(&mut self, r: &Arc>, peer: Arc>) { let r = Arc::clone(r); self.queue_for(&r).push(Box::new( move |t| r.with_entity(|e| e.sync(t, peer)))) } fn queue_for(&mut self, r: &Arc>) -> &mut PendingEventQueue { self.queue_for_mailbox(&r.mailbox) } fn queue_for_mailbox(&mut self, mailbox: &Arc) -> &mut PendingEventQueue { &mut self.queues.entry(mailbox.actor_id) .or_insert((mailbox.tx.clone(), Vec::new())).1 } fn deliver(&mut self) { if !self.immediate_self.is_empty() { panic!("Unprocessed immediate_self events remain at deliver() time"); } for (_actor_id, (tx, turn)) in std::mem::take(&mut self.queues).into_iter() { let _ = send_actions(&tx, &self.debtor, turn); } } } impl<'activation> Drop for Activation<'activation> { fn drop(&mut self) { self.deliver() } } impl Debtor { pub fn new(name: tracing::Span) -> Arc { let id = NEXT_DEBTOR_ID.fetch_add(1, Ordering::Relaxed); let debt = Arc::new(AtomicI64::new(0)); DEBTORS.write().unwrap().insert(id, (name, Arc::clone(&debt))); Arc::new(Debtor { id, debt, // notify: Notify::new(), }) } pub fn balance(&self) -> i64 { self.debt.load(Ordering::Relaxed) } pub fn borrow(&self, token_count: usize) { let token_count: i64 = token_count.try_into().expect("manageable token count"); self.debt.fetch_add(token_count, Ordering::Relaxed); } pub fn repay(&self, token_count: usize) { let token_count: i64 = token_count.try_into().expect("manageable token count"); let _old_debt = self.debt.fetch_sub(token_count, Ordering::Relaxed); // if _old_debt - token_count <= *SYNDICATE_CREDIT { // self.notify.notify_one(); // } } pub async fn ensure_clear_funds(&self) { let limit = *SYNDICATE_CREDIT; tokio::task::yield_now().await; while self.balance() > limit { tokio::task::yield_now().await; // self.notify.notified().await; } } } impl Drop for Debtor { fn drop(&mut self) { DEBTORS.write().unwrap().remove(&self.id); } } impl LoanedItem { pub fn new(debtor: &Arc, cost: usize, item: T) -> Self { debtor.borrow(cost); LoanedItem { debtor: Arc::clone(debtor), cost, item } } } impl Drop for LoanedItem { fn drop(&mut self) { self.debtor.repay(self.cost); } } #[must_use] fn send_actions( tx: &UnboundedSender, debtor: &Arc, t: PendingEventQueue, ) -> ActorResult { let token_count = t.len(); tx.send(SystemMessage::Turn(LoanedItem::new(debtor, token_count, t))) .map_err(|_| error("Target actor not running", _Any::new(false))) } impl std::fmt::Debug for Mailbox { fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> { write!(f, "#", self.actor_id) } } impl std::hash::Hash for Mailbox { fn hash(&self, state: &mut H) { self.actor_id.hash(state) } } impl Eq for Mailbox {} impl PartialEq for Mailbox { fn eq(&self, other: &Mailbox) -> bool { self.actor_id == other.actor_id } } impl Ord for Mailbox { fn cmp(&self, other: &Mailbox) -> std::cmp::Ordering { return self.actor_id.cmp(&other.actor_id) } } impl PartialOrd for Mailbox { fn partial_cmp(&self, other: &Mailbox) -> Option { return Some(self.cmp(&other)) } } impl Drop for Mailbox { fn drop(&mut self) { let _ = self.tx.send(SystemMessage::Release); () } } impl Actor { pub fn new() -> Self { let (tx, rx) = unbounded_channel(); let actor_id = crate::next_actor_id(); // tracing::trace!(id = actor_id, "Actor::new"); Actor { actor_id, tx, rx: Some(rx), mailbox: Weak::new(), outbound_assertions: Map::new(), next_task_id: 0, linked_tasks: Map::new(), exit_hooks: Vec::new(), exit_status: None, } } pub fn create_and_start + Send + Sync + 'static>( name: tracing::Span, e: E, ) -> Arc> { let r = Self::create_and_start_inert(name); r.become_entity(e); r } pub fn create_and_start_inert(name: tracing::Span) -> Arc> { let mut ac = Self::new(); let r = ac.create_inert(); ac.start(name); r } pub fn id(&self) -> ActorId { self.actor_id } fn mailbox(&mut self) -> Arc { match self.mailbox.upgrade() { None => { let new_mailbox = Arc::new(Mailbox { actor_id: self.actor_id, tx: self.tx.clone(), }); self.mailbox = Arc::downgrade(&new_mailbox); new_mailbox } Some(m) => m } } pub fn shutdown(&mut self) { let _ = self.tx.send(SystemMessage::Release); () } pub fn create + Send + Sync + 'static>(&mut self, e: E) -> Arc> { let r = self.create_inert(); r.become_entity(e); r } pub fn create_inert(&mut self) -> Arc> { Arc::new(Ref { mailbox: self.mailbox(), target: RwLock::new(None), }) } pub fn boot FnOnce(&'a mut Activation) -> BoxFuture<'a, ActorResult>>( mut self, name: tracing::Span, boot: F, ) -> ActorHandle { name.record("actor_id", &self.id()); tokio::spawn(async move { tracing::trace!("start"); let result = self.run(boot).await; self.exit_status = Some(Arc::new(result.clone())); { let mut t = Activation::new(&mut self, Debtor::new(crate::name!("shutdown"))); for action in std::mem::take(&mut t.actor.exit_hooks) { if let Err(err) = action(&mut t) { tracing::error!(err = debug(err), "error in exit hook"); } } } match &result { Ok(()) => { tracing::trace!("normal stop"); () } Err(e) => tracing::error!("error stop: {}", e), } result }.instrument(name)) } pub fn start(self, name: tracing::Span) -> ActorHandle { self.boot(name, |_ac| Box::pin(ready(Ok(())))) } async fn run FnOnce(&'a mut Activation) -> BoxFuture<'a, ActorResult>>( &mut self, boot: F, ) -> ActorResult { let _id = self.id(); // tracing::trace!(_id, "boot"); boot(&mut Activation::new(self, Debtor::new(crate::name!("boot")))).await?; // tracing::trace!(_id, "run"); loop { match self.rx.as_mut().expect("present rx channel half").recv().await { None => Err(error("Unexpected channel close", _Any::new(false)))?, Some(m) => { let should_stop = self.handle(m).await?; if should_stop { return Ok(()); } } } } } pub fn add_exit_hook(&mut self, r: &Arc>) { let r = Arc::clone(r); self.exit_hooks.push(Box::new(move |t| { let exit_status = Arc::clone(t.actor.exit_status.as_ref().expect("exited")); r.with_entity(|e| e.exit_hook(t, &exit_status)) })) } async fn handle(&mut self, m: SystemMessage) -> Result { match m { SystemMessage::Release => { tracing::trace!("SystemMessage::Release"); Ok(true) } SystemMessage::Turn(mut loaned_item) => { let mut actions = std::mem::take(&mut loaned_item.item); let mut t = Activation::new(self, Arc::clone(&loaned_item.debtor)); loop { for action in actions.into_iter() { action(&mut t)? } actions = std::mem::take(&mut t.immediate_self); if actions.is_empty() { break; } } Ok(false) } SystemMessage::Crash(e) => { tracing::trace!("SystemMessage::Crash({:?})", &e); Err(e)? } } } pub fn linked_task + Send + 'static>( &mut self, name: tracing::Span, boot: F, ) { let mailbox = self.mailbox(); let token = CancellationToken::new(); let task_id = self.next_task_id; self.next_task_id += 1; name.record("task_id", &task_id); { let token = token.clone(); tokio::spawn(async move { tracing::trace!(task_id, "linked task start"); select! { _ = token.cancelled() => { tracing::trace!(task_id, "linked task cancelled"); Ok(()) } result = boot => { match &result { Ok(()) => { tracing::trace!(task_id, "linked task normal stop"); () } Err(e) => { tracing::error!(task_id, "linked task error: {}", e); let _ = mailbox.tx.send(SystemMessage::Crash(e.clone())); () } } result } } }.instrument(name)); } self.linked_tasks.insert(task_id, token); } } impl Drop for Actor { fn drop(&mut self) { let mut rx = self.rx.take().expect("present rx channel half during drop"); rx.close(); for (_task_id, token) in std::mem::take(&mut self.linked_tasks).into_iter() { token.cancel(); } let to_clear = std::mem::take(&mut self.outbound_assertions); { let mut t = Activation::new(self, Debtor::new(crate::name!("drop"))); for (_handle, r) in to_clear.into_iter() { tracing::trace!(h = debug(&_handle), "retract on termination"); t.retract_known_ref(r); } } tracing::trace!("Actor::drop"); } } #[must_use] pub fn external_event(mailbox: &Arc, debtor: &Arc, action: Action) -> ActorResult { send_actions(&mailbox.tx, debtor, vec![action]) } #[must_use] pub fn external_events(mailbox: &Arc, debtor: &Arc, events: PendingEventQueue) -> ActorResult { send_actions(&mailbox.tx, debtor, events) } impl Ref { pub fn become_entity>(&self, e: E) { let mut g = self.target.write().expect("unpoisoned"); if g.is_some() { panic!("Double initialization of Ref"); } *g = Some(Box::new(e)); } pub fn with_entity) -> R>(&self, f: F) -> R { let mut g = self.target.write().expect("unpoisoned"); f(g.as_mut().expect("initialized").as_mut()) } } impl Ref { pub fn oid(&self) -> usize { std::ptr::addr_of!(*self) as usize } } impl PartialEq for Ref { fn eq(&self, other: &Self) -> bool { self.oid() == other.oid() } } impl Eq for Ref {} impl std::hash::Hash for Ref { fn hash(&self, hash: &mut H) where H: std::hash::Hasher { self.oid().hash(hash) } } impl PartialOrd for Ref { fn partial_cmp(&self, other: &Self) -> Option { Some(self.cmp(other)) } } impl Ord for Ref { fn cmp(&self, other: &Self) -> std::cmp::Ordering { self.oid().cmp(&other.oid()) } } impl std::fmt::Debug for Ref { fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> { write!(f, "⌜{}:{:016x}⌝", self.mailbox.actor_id, self.oid()) } } impl Cap { pub fn guard(underlying: &Arc>) -> Arc where for<'a> &'a M: Into<_Any>, for<'a> M: TryFrom<&'a _Any>, { Self::new(&Arc::new(Ref { mailbox: Arc::clone(&underlying.mailbox), target: RwLock::new(Some(Box::new(Guard { underlying: underlying.clone() }))), })) } pub fn new(underlying: &Arc>) -> Arc { Arc::new(Cap { underlying: Arc::clone(underlying), attenuation: Vec::new(), }) } pub fn attenuate(&self, attenuation: &sturdy::Attenuation) -> Result, CaveatError> { let mut r = Cap { attenuation: self.attenuation.clone(), .. self.clone() }; r.attenuation.extend(attenuation.check()?); Ok(Arc::new(r)) } pub fn rewrite(&self, mut a: _Any) -> Option<_Any> { for c in &self.attenuation { match c.rewrite(&a) { Some(v) => a = v, None => return None, } } Some(a) } pub fn assert>(&self, t: &mut Activation, m: M) -> Option { self.rewrite(m.into()).map(|m| t.assert(&self.underlying, m)) } pub fn message>(&self, t: &mut Activation, m: M) { if let Some(m) = self.rewrite(m.into()) { t.message(&self.underlying, m) } } } impl std::fmt::Debug for Cap { fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> { if self.attenuation.is_empty() { self.underlying.fmt(f) } else { write!(f, "⌜{}:{:016x}\\{:?}⌝", self.underlying.mailbox.actor_id, self.underlying.oid(), self.attenuation) } } } impl Domain for Cap {} impl std::convert::TryFrom<&IOValue> for Cap { type Error = preserves_schema::support::ParseError; fn try_from(_v: &IOValue) -> Result { panic!("Attempted to serialize Cap via IOValue"); } } impl std::convert::From<&Cap> for IOValue { fn from(_v: &Cap) -> IOValue { panic!("Attempted to deserialize Ref via IOValue"); } } impl Entity<_Any> for Guard where for<'a> &'a M: Into<_Any>, for<'a> M: TryFrom<&'a _Any>, { fn assert(&mut self, t: &mut Activation, a: _Any, h: Handle) -> ActorResult { match M::try_from(&a) { Ok(a) => self.underlying.with_entity(|e| e.assert(t, a, h)), Err(_) => Ok(()), } } fn retract(&mut self, t: &mut Activation, h: Handle) -> ActorResult { self.underlying.with_entity(|e| e.retract(t, h)) } fn message(&mut self, t: &mut Activation, m: _Any) -> ActorResult { match M::try_from(&m) { Ok(m) => self.underlying.with_entity(|e| e.message(t, m)), Err(_) => Ok(()), } } fn sync(&mut self, t: &mut Activation, peer: Arc>) -> ActorResult { self.underlying.with_entity(|e| e.sync(t, peer)) } fn turn_end(&mut self, t: &mut Activation) -> ActorResult { self.underlying.with_entity(|e| e.turn_end(t)) } fn exit_hook(&mut self, t: &mut Activation, exit_status: &Arc) -> ActorResult { self.underlying.with_entity(|e| e.exit_hook(t, exit_status)) } } #[macro_export] macro_rules! name { () => {tracing::info_span!(actor_id = tracing::field::Empty, task_id = tracing::field::Empty, oid = tracing::field::Empty)}; ($($item:tt)*) => {tracing::info_span!($($item)*, actor_id = tracing::field::Empty, task_id = tracing::field::Empty, oid = tracing::field::Empty)} }