Supervision; delayed actions; better tracing (incl `M: Debug`); linked task release

This commit is contained in:
Tony Garnock-Jones 2021-08-30 23:41:51 +02:00
parent 5861f91971
commit d8c3e37d17
6 changed files with 255 additions and 18 deletions

View File

@ -64,7 +64,6 @@ fn assertions_at_existing_file(t: &mut Activation, ds: &Arc<Cap>, path: &PathBuf
}
for value in values.into_iter() {
if let Some(handle) = ds.assert(t, value.clone()) {
tracing::debug!("asserted {:?} -> {:?}", value, handle);
handles.insert(handle);
}
}
@ -190,7 +189,6 @@ fn run(t: &mut Activation, ds: Arc<Cap>, captures: AnyValue) -> ActorResult {
}
}
for h in to_retract.into_iter() {
tracing::debug!("retract {:?}", h);
t.retract(h);
}
Ok(())

View File

@ -5,6 +5,7 @@ use syndicate::actor::*;
use syndicate::convert::*;
use syndicate::during::entity;
use syndicate::schemas::dataspace::Observe;
use syndicate::supervise::{Supervisor, SupervisorConfiguration};
use syndicate::value::NestedValue;
use tokio::net::TcpListener;
@ -17,11 +18,14 @@ pub fn on_demand(t: &mut Activation, ds: Arc<Cap>, gateway: Arc<Cap>) {
let monitor = entity(())
.on_asserted_facet({
let ds = Arc::clone(&ds);
move |_, t, captures| {
move |_, t, captures: AnyValue| {
let ds = Arc::clone(&ds);
let gateway = Arc::clone(&gateway);
t.spawn_link(syndicate::name!(parent: None, "relay", addr = ?captures),
|t| run(t, ds, gateway, captures));
Supervisor::start(
t,
syndicate::name!(parent: None, "relay", addr = ?captures),
SupervisorConfiguration::default(),
move |t| run(t, Arc::clone(&ds), Arc::clone(&gateway), captures.clone()));
Ok(())
}
})

View File

@ -19,7 +19,7 @@ preserves-schema = "0.8.0"
preserves = "0.20.0"
preserves-schema = "0.8.0"
tokio = { version = "1.10.0", features = ["io-util", "macros", "rt", "rt-multi-thread"] }
tokio = { version = "1.10.0", features = ["io-util", "macros", "rt", "rt-multi-thread", "time"] }
tokio-util = "0.6.7"
bytes = "1.0.1"

View File

@ -29,6 +29,7 @@ use std::sync::Mutex;
use std::sync::RwLock;
use std::sync::Weak;
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
use std::time;
use tokio::select;
use tokio::sync::Notify;
@ -69,8 +70,12 @@ pub type ActorResult = Result<(), Error>;
/// the actor's mainloop task.
pub type ActorHandle = tokio::task::JoinHandle<ActorResult>;
/// The type of the "disarm" function returned from [`Activation::prevent_inert_check`].
pub type DisarmFn = Box<dyn Send + FnOnce()>;
/// A small protocol for indicating successful synchronisation with
/// some peer; see [Entity::sync].
#[derive(Debug)]
pub struct Synced;
/// The core metaprotocol implemented by every object.
@ -594,15 +599,19 @@ impl<'activation> Activation<'activation> {
/// Core API: assert `a` at recipient `r`.
///
/// Returns the [`Handle`] for the new assertion.
pub fn assert<M: 'static + Send>(&mut self, r: &Arc<Ref<M>>, a: M) -> Handle {
pub fn assert<M: 'static + Send + std::fmt::Debug>(&mut self, r: &Arc<Ref<M>>, a: M) -> Handle {
let handle = next_handle();
if let Some(f) = self.active_facet() {
tracing::trace!(?r, ?handle, ?a, "assert");
f.insert_retract_cleanup_action(&r, handle);
drop(f);
{
let r = Arc::clone(r);
self.pending.queue_for(&r).push(Box::new(
move |t| t.with_entity(&r, |t, e| e.assert(t, a, handle))));
move |t| t.with_entity(&r, |t, e| {
tracing::trace!(?handle, ?a, "asserted");
e.assert(t, a, handle)
})));
}
}
handle
@ -622,22 +631,29 @@ impl<'activation> Activation<'activation> {
/// # Panics
///
/// Panics if `r` is not part of the active actor.
pub fn assert_for_myself<M: 'static + Send>(&mut self, r: &Arc<Ref<M>>, a: M) -> Handle {
pub fn assert_for_myself<M: 'static + Send + std::fmt::Debug>(&mut self, r: &Arc<Ref<M>>, a: M) -> Handle {
self.immediate_oid(r);
let handle = next_handle();
if let Some(f) = self.active_facet() {
tracing::trace!(?r, ?handle, ?a, "assert_for_myself");
{
let r = Arc::clone(r);
f.cleanup_actions.insert(
handle,
CleanupAction::ForMyself(Box::new(
move |t| t.with_entity(&r, |t, e| e.retract(t, handle)))));
move |t| t.with_entity(&r, |t, e| {
tracing::trace!(?handle, "retracted");
e.retract(t, handle)
}))));
}
drop(f);
{
let r = Arc::clone(r);
self.pending.for_myself.push(Box::new(
move |t| t.with_entity(&r, |t, e| e.assert(t, a, handle))));
move |t| t.with_entity(&r, |t, e| {
tracing::trace!(?handle, ?a, "asserted");
e.assert(t, a, handle)
})));
}
}
handle
@ -646,6 +662,7 @@ impl<'activation> Activation<'activation> {
fn half_link(&mut self, t_other: &mut Activation) {
let entity_ref = t_other.create::<AnyValue, _>(StopOnRetract);
let handle = next_handle();
tracing::trace!(?handle, ?entity_ref, "half_link");
self.active_facet().unwrap().insert_retract_cleanup_action(&entity_ref, handle);
t_other.with_entity(&entity_ref, |t, e| e.assert(t, AnyValue::new(true), handle)).unwrap();
}
@ -660,10 +677,14 @@ impl<'activation> Activation<'activation> {
}
/// Core API: send message `m` to recipient `r`.
pub fn message<M: 'static + Send>(&mut self, r: &Arc<Ref<M>>, m: M) {
pub fn message<M: 'static + Send + std::fmt::Debug>(&mut self, r: &Arc<Ref<M>>, m: M) {
tracing::trace!(?r, ?m, "message");
let r = Arc::clone(r);
self.pending.queue_for(&r).push(Box::new(
move |t| t.with_entity(&r, |t, e| e.message(t, m))))
move |t| t.with_entity(&r, |t, e| {
tracing::trace!(?m, "delivered");
e.message(t, m)
})))
}
/// Core API: send message `m` to recipient `r`, which must be a
@ -766,6 +787,7 @@ impl<'activation> Activation<'activation> {
boot: F,
) {
let mailbox = self.state.mailbox();
let facet = self.facet.clone();
if let Some(f) = self.active_facet() {
let token = CancellationToken::new();
let task_id = NEXT_TASK_ID.fetch_add(BUMP_AMOUNT.into(), Ordering::Relaxed);
@ -774,7 +796,7 @@ impl<'activation> Activation<'activation> {
let token = token.clone();
tokio::spawn(async move {
tracing::trace!(task_id, "linked task start");
select! {
let result = select! {
_ = token.cancelled() => {
tracing::trace!(task_id, "linked task cancelled");
Ok(())
@ -793,13 +815,40 @@ impl<'activation> Activation<'activation> {
}
result
}
}
};
let _ = facet.activate(
Account::new(crate::name!("release_linked_task")), |t| {
if let Some(f) = t.active_facet() {
tracing::trace!(task_id, "cancellation token removed");
f.linked_tasks.remove(&task_id);
}
Ok(())
});
result
}.instrument(name));
}
f.linked_tasks.insert(task_id, token);
}
}
/// Executes the given action after the given duration has elapsed (so long as the active
/// facet still exists at that time).
pub fn after(&mut self, duration: time::Duration, a: Action) {
self.at(time::Instant::now() + duration, a)
}
/// Executes the given action at the given instant (so long as the active facet still
/// exists at that time).
pub fn at<I: Into<tokio::time::Instant>>(&mut self, instant: I, a: Action) {
let facet = self.facet.clone();
let account = Arc::clone(self.account());
let instant = instant.into();
self.linked_task(crate::name!("Activation::at"), async move {
tokio::time::sleep_until(instant.into()).await;
facet.activate(account, a)
});
}
fn enqueue_for_myself_at_commit(&mut self, action: Action) {
let mailbox = self.state.mailbox();
self.pending.queue_for_mailbox(&mailbox).push(action);
@ -868,7 +917,7 @@ impl<'activation> Activation<'activation> {
/// its life, the Dataspace actor will have no outbound assertions, no child facets, and no
/// linked tasks, so the only way to prevent it from being prematurely garbage collected is
/// to use `prevent_inert_check` in its boot function.
pub fn prevent_inert_check(&mut self) -> Box<dyn FnOnce()> {
pub fn prevent_inert_check(&mut self) -> DisarmFn {
if let Some(f) = self.active_facet() {
Box::new(f.prevent_inert_check())
} else {
@ -986,6 +1035,7 @@ impl EventBuffer {
}
fn deliver(&mut self) {
tracing::trace!("EventBuffer::deliver");
if !self.for_myself.is_empty() {
panic!("Unprocessed for_myself events remain at deliver() time");
}
@ -1223,7 +1273,8 @@ impl Actor {
if r.is_err() { return; }
}
SystemMessage::Crash(e) => {
tracing::trace!(actor_id = ?self.ac_ref.actor_id, "SystemMessage::Crash({:?})", &e);
tracing::trace!(actor_id = ?self.ac_ref.actor_id,
"SystemMessage::Crash({:?})", &e);
return terminate(Err(e));
}
}
@ -1264,7 +1315,10 @@ impl Facet {
self.cleanup_actions.insert(
handle,
CleanupAction::ForAnother(Arc::clone(&r.mailbox), Box::new(
move |t| t.with_entity(&r, |t, e| e.retract(t, handle)))));
move |t| t.with_entity(&r, |t, e| {
tracing::trace!(?handle, "retracted");
e.retract(t, handle)
}))));
}
}

View File

@ -16,6 +16,7 @@ pub mod error;
pub mod pattern;
pub mod relay;
pub mod rewrite;
pub mod supervise;
pub mod schemas {
//! Auto-generated codecs for [Syndicate protocol

180
syndicate/src/supervise.rs Normal file
View File

@ -0,0 +1,180 @@
//! Extremely simple single-actor supervision. Vastly simplified compared to the available
//! options in [Erlang/OTP](https://erlang.org/doc/man/supervisor.html).
use std::collections::VecDeque;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::Instant;
use crate::actor::*;
enum Protocol<Boot: 'static + Send + FnMut(&mut Activation) -> ActorResult> {
SuperviseeStarted, // assertion
BootFunction(Boot), // message
Retry, // message
}
#[derive(Debug)]
pub struct SupervisorConfiguration {
pub intensity: usize,
pub period: Duration,
pub pause_time: Duration,
pub sleep_time: Duration,
}
pub struct Supervisor<Boot: 'static + Send + FnMut(&mut Activation) -> ActorResult> {
self_ref: Arc<Ref<Protocol<Boot>>>,
name: tracing::Span,
config: SupervisorConfiguration,
boot_fn: Option<Boot>,
restarts: VecDeque<Instant>,
supervisee: Supervisee,
}
#[derive(Debug, PartialEq, Eq)]
enum Supervisee {
NotRunning,
Booting,
Running,
Recovering,
}
impl<Boot: 'static + Send + FnMut(&mut Activation) -> ActorResult> std::fmt::Debug for Protocol<Boot> {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
Protocol::SuperviseeStarted => write!(f, "Protocol::SuperviseeStarted"),
Protocol::BootFunction(_) => write!(f, "Protocol::BootFunction(_)"),
Protocol::Retry => write!(f, "Protocol::Retry"),
}
}
}
impl Default for SupervisorConfiguration {
fn default() -> Self {
Self {
intensity: 1,
period: Duration::from_secs(5),
pause_time: Duration::from_millis(200),
sleep_time: Duration::from_secs(10),
}
}
}
impl<Boot: 'static + Send + FnMut(&mut Activation) -> ActorResult>
Entity<Protocol<Boot>> for Supervisor<Boot>
{
fn assert(&mut self, _t: &mut Activation, m: Protocol<Boot>, _h: Handle) -> ActorResult {
match m {
Protocol::SuperviseeStarted => self.enter_state(Supervisee::Booting),
_ => (),
}
Ok(())
}
fn retract(&mut self, t: &mut Activation, _h: Handle) -> ActorResult {
self.enter_state(Supervisee::Recovering);
let now = Instant::now();
let oldest_to_keep = now - self.config.period;
self.restarts.push_back(now);
while let Some(stamp) = self.restarts.front() {
if stamp < &oldest_to_keep {
self.restarts.pop_front();
} else {
break;
}
}
let self_ref = Arc::clone(&self.self_ref);
let wait_time = if self.restarts.len() > self.config.intensity {
self.config.sleep_time
} else {
self.config.pause_time
};
t.after(wait_time, Box::new(move |t| {
t.message(&self_ref, Protocol::Retry);
Ok(())
}));
Ok(())
}
fn message(&mut self, t: &mut Activation, m: Protocol<Boot>) -> ActorResult {
match m {
Protocol::BootFunction(b) => {
self.enter_state(Supervisee::Running);
self.boot_fn = Some(b);
Ok(())
}
Protocol::Retry => {
self.enter_state(Supervisee::NotRunning);
self.ensure_started(t)
}
_ => Ok(())
}
}
}
impl<Boot: 'static + Send + FnMut(&mut Activation) -> ActorResult> Supervisor<Boot> {
pub fn start(
t: &mut Activation,
name: tracing::Span,
config: SupervisorConfiguration,
boot_fn: Boot,
) {
tracing::trace!(?config);
let self_ref = t.create_inert();
let mut supervisor = Supervisor {
self_ref: Arc::clone(&self_ref),
name,
config,
boot_fn: Some(boot_fn),
restarts: VecDeque::new(),
supervisee: Supervisee::NotRunning,
};
// In cases where we are the only Entity in our Facet, and our
// supervisee terminates, we will often be "inert" until we
// can restart it. So we prevent_inert_check to signal to the
// system that there's something going on for that moment of
// time between the supervisee terminating and our responding
// to it.
let _ = t.prevent_inert_check();
supervisor.ensure_started(t).unwrap();
self_ref.become_entity(supervisor);
}
fn enter_state(&mut self, supervisee: Supervisee) {
let _entry = self.name.enter();
tracing::info!("{:?} --> {:?}", self.supervisee, supervisee);
self.supervisee = supervisee;
}
fn ensure_started(&mut self, t: &mut Activation) -> ActorResult {
match self.boot_fn.take() {
None =>
tracing::error!("Cannot restart supervisee, because it panicked at startup"),
Some(mut boot_fn) => {
let self_ref = Arc::clone(&self.self_ref);
t.facet(|t: &mut Activation| {
t.assert(&self.self_ref, Protocol::SuperviseeStarted);
t.spawn_link(crate::name!(parent: &self.name, "supervisee"), move |t| {
match boot_fn(t) {
Ok(()) => {
t.message(&self_ref, Protocol::BootFunction(boot_fn));
Ok(())
}
Err(e) => {
t.clear();
t.message(&self_ref, Protocol::BootFunction(boot_fn));
t.deliver();
Err(e)
}
}
});
Ok(())
})?;
}
}
Ok(())
}
}