diff --git a/Cargo.lock b/Cargo.lock index e1f515d..e49f82a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -883,7 +883,7 @@ checksum = "ac74c624d6b2d21f425f752262f42188365d7b8ff1aff74c82e45136510a4857" [[package]] name = "preserves" -version = "0.17.2" +version = "0.19.0" dependencies = [ "base64", "dtoa", @@ -895,7 +895,7 @@ dependencies = [ [[package]] name = "preserves-schema" -version = "0.5.0" +version = "0.7.0" dependencies = [ "convert_case", "glob", diff --git a/syndicate-server/examples/pingpong.rs b/syndicate-server/examples/pingpong.rs index f9869ce..b5674ac 100644 --- a/syndicate-server/examples/pingpong.rs +++ b/syndicate-server/examples/pingpong.rs @@ -170,7 +170,7 @@ async fn main() -> Result<(), Box> { ds.assert(t, &Observe { pattern: { - let recv_label = Value::symbol(recv_label).wrap(); + let recv_label = AnyValue::symbol(recv_label); syndicate_macros::pattern!("<=recv_label $ $>") }, observer: Arc::clone(&consumer), @@ -194,7 +194,7 @@ async fn main() -> Result<(), Box> { let action_count = c.action_count; let account = Arc::clone(t.account()); t.linked_task(syndicate::name!("boot-ping"), async move { - let padding: AnyValue = Value::ByteString(vec![0; bytes_padding]).wrap(); + let padding = AnyValue::bytestring(vec![0; bytes_padding]); for _ in 0..turn_count { let mut events: PendingEventQueue = vec![]; let current_rec = simple_record2(send_label, diff --git a/syndicate-server/src/gatekeeper.rs b/syndicate-server/src/gatekeeper.rs index b65bc34..52b35dd 100644 --- a/syndicate-server/src/gatekeeper.rs +++ b/syndicate-server/src/gatekeeper.rs @@ -8,9 +8,9 @@ use syndicate::value::NestedValue; pub fn bind( t: &mut Activation, + ds: &Arc, oid: syndicate::schemas::sturdy::_Any, key: [u8; 16], - ds: &Arc, target: Arc, ) { let sr = sturdy::SturdyRef::mint(oid.clone(), &key); diff --git a/syndicate-server/src/main.rs b/syndicate-server/src/main.rs index 4133e60..3ea7436 100644 --- a/syndicate-server/src/main.rs +++ b/syndicate-server/src/main.rs @@ -60,11 +60,18 @@ async fn main() -> Result<(), Box> { Actor::new().boot(tracing::Span::current(), move |t| { let root_ds = Cap::new(&t.create(Dataspace::new())); + let server_config_ds = Cap::new(&t.create(Dataspace::new())); - gatekeeper::bind(t, AnyValue::new("syndicate"), [0; 16], &root_ds, Arc::clone(&root_ds)); + gatekeeper::bind(t, &root_ds, AnyValue::new("syndicate"), [0; 16], + Arc::clone(&root_ds)); + gatekeeper::bind(t, &root_ds, AnyValue::new("server-config"), [0; 16], + Arc::clone(&server_config_ds)); + services::debt_reporter::on_demand(t, Arc::clone(&server_config_ds)); if config.debt_reporter { - services::debt_reporter::spawn(t); + server_config_ds.assert(t, &syndicate::schemas::service::RequireService { + service_name: AnyValue::symbol("debt-reporter") + }); } if config.inferior { diff --git a/syndicate-server/src/services/debt_reporter.rs b/syndicate-server/src/services/debt_reporter.rs index 139e442..bc4e28e 100644 --- a/syndicate-server/src/services/debt_reporter.rs +++ b/syndicate-server/src/services/debt_reporter.rs @@ -1,18 +1,44 @@ -use syndicate::actor::*; +use std::sync::Arc; -pub fn spawn(t: &mut Activation) { - t.spawn(syndicate::name!("debt-reporter"), |t| { - t.linked_task(syndicate::name!("tick"), async { - let mut timer = tokio::time::interval(core::time::Duration::from_secs(1)); - loop { - timer.tick().await; - for (id, (name, debt)) in syndicate::actor::ACCOUNTS.read().unwrap().iter() { - let _enter = name.enter(); - tracing::info!(id, debt = debug( - debt.load(std::sync::atomic::Ordering::Relaxed))); - } - } +use syndicate::actor::*; +use syndicate::during::entity; +use syndicate::schemas::dataspace::Observe; +use syndicate::value::NestedValue; + +const SERVICE_NAME: &str = "debt-reporter"; + +pub fn on_demand(t: &mut Activation, ds: Arc) { + t.spawn(syndicate::name!("on_demand", service = SERVICE_NAME), move |t| { + let monitor = entity(()) + .on_asserted_facet(|_, t, _| { + t.spawn_link(syndicate::name!(SERVICE_NAME), run); + Ok(()) + }) + .create_cap(t); + let service_sym = AnyValue::symbol(SERVICE_NAME); + ds.assert(t, &Observe { + pattern: syndicate_macros::pattern!(""), + observer: monitor, }); Ok(()) - }); + }) +} + +// pub fn spawn(t: &mut Activation) { +// t.spawn(syndicate::name!(SERVICE_NAME), run); +// } + +fn run(t: &mut Activation) -> ActorResult { + t.linked_task(syndicate::name!("tick"), async { + let mut timer = tokio::time::interval(core::time::Duration::from_secs(1)); + loop { + timer.tick().await; + for (id, (name, debt)) in syndicate::actor::ACCOUNTS.read().unwrap().iter() { + let _enter = name.enter(); + tracing::info!(id, debt = debug( + debt.load(std::sync::atomic::Ordering::Relaxed))); + } + } + }); + Ok(()) } diff --git a/syndicate/Cargo.toml b/syndicate/Cargo.toml index 94c6018..f0d772e 100644 --- a/syndicate/Cargo.toml +++ b/syndicate/Cargo.toml @@ -13,11 +13,11 @@ license = "Apache-2.0" vendored-openssl = ["openssl/vendored"] [build-dependencies] -preserves-schema = "0.5.0" +preserves-schema = "0.7.0" [dependencies] -preserves = "0.17.2" -preserves-schema = "0.5.0" +preserves = "0.19.0" +preserves-schema = "0.7.0" tokio = { version = "1.10.0", features = ["io-util", "macros", "sync", "net", "rt", "rt-multi-thread", "time"] } tokio-util = "0.6.7" diff --git a/syndicate/benches/bench_dataspace.rs b/syndicate/benches/bench_dataspace.rs index 43b0231..2619897 100644 --- a/syndicate/benches/bench_dataspace.rs +++ b/syndicate/benches/bench_dataspace.rs @@ -104,7 +104,7 @@ pub fn bench_pub(c: &mut Criterion) { ds.assert(t, &Observe { pattern: p::Pattern::DBind(Box::new(p::DBind { pattern: p::Pattern::DLit(Box::new(p::DLit { - value: Value::symbol("consumer").wrap(), + value: AnyValue::symbol("consumer"), })), })), observer: shutdown, @@ -126,7 +126,7 @@ pub fn bench_pub(c: &mut Criterion) { ds.assert(t, &Observe { pattern: p::Pattern::DCompound(Box::new(p::DCompound::Rec { ctor: Box::new(p::CRec { - label: Value::symbol("Says").wrap(), + label: AnyValue::symbol("Says"), arity: 2.into(), }), members: Map::from_iter(vec![ diff --git a/syndicate/src/actor.rs b/syndicate/src/actor.rs index 94eb5a4..a68bdc5 100644 --- a/syndicate/src/actor.rs +++ b/syndicate/src/actor.rs @@ -393,6 +393,10 @@ where underlying: Arc> } +/// Simple entity that stops its containing facet when any assertion it receives is +/// subsequently retracted. +pub struct StopOnRetract; + //--------------------------------------------------------------------------- const BUMP_AMOUNT: u8 = 10; @@ -578,13 +582,7 @@ impl<'activation> Activation<'activation> { pub fn assert(&mut self, r: &Arc>, a: M) -> Handle { let handle = next_handle(); if let Some(f) = self.active_facet() { - { - let r = Arc::clone(r); - f.cleanup_actions.insert( - handle, - CleanupAction::ForAnother(Arc::clone(&r.mailbox), Box::new( - move |t| t.with_entity(&r, |t, e| e.retract(t, handle))))); - } + f.insert_retract_cleanup_action(&r, handle); drop(f); { let r = Arc::clone(r); @@ -630,6 +628,13 @@ impl<'activation> Activation<'activation> { handle } + fn half_link(&mut self, t_other: &mut Activation) { + let entity_ref = t_other.create::(StopOnRetract); + let handle = next_handle(); + self.active_facet().unwrap().insert_retract_cleanup_action(&entity_ref, handle); + t_other.with_entity(&entity_ref, |t, e| e.assert(t, AnyValue::new(true), handle)).unwrap(); + } + /// Core API: retract a previously-established assertion. pub fn retract(&mut self, handle: Handle) { if let Some(f) = self.active_facet() { @@ -782,9 +787,28 @@ impl<'activation> Activation<'activation> { })); } + /// Schedule the creation of a new actor when the Activation commits. + /// + /// The new actor will be "linked" to the active facet: if the new actor terminates, the + /// active facet is stopped, and if the active facet stops, the new actor's root facet is + /// stopped. + pub fn spawn_link ActorResult>( + &mut self, + name: tracing::Span, + boot: F, + ) { + let facet_id = self.facet.facet_id; + self.enqueue_for_myself_at_commit(Box::new(move |t| { + t.with_facet(true, facet_id, move |t| { + Actor::new().link(t).boot(name, boot); + Ok(()) + }) + })); + } + /// Create a new subfacet of the currently-active facet. Runs `boot` in the new facet's /// context. If `boot` returns leaving the new facet [inert][Facet#inert-facets], - pub fn facet ActorResult>( + pub fn facet ActorResult>( &mut self, boot: F, ) -> Result { @@ -847,9 +871,13 @@ impl<'activation> Activation<'activation> { } fn stop_if_inert(&mut self) { - if self.state.facet_exists_and_is_inert(self.facet.facet_id) { - self.stop_facet(self.facet.facet_id, None); - } + let facet_id = self.facet.facet_id; + self.enqueue_for_myself_at_commit(Box::new(move |t| { + if t.state.facet_exists_and_is_inert(facet_id) { + t.stop_facet(facet_id, None); + } + Ok(()) + })) } fn _terminate_facet(&mut self, facet_id: FacetId, orderly: bool) -> ActorResult { @@ -1082,6 +1110,18 @@ impl Actor { } } + fn link(self, t_parent: &mut Activation) -> Self { + if t_parent.active_facet().is_none() { + panic!("No active facet when calling spawn_link"); + } + self.ac_ref.root_facet_ref().activate(Account::new(crate::name!("link")), |t_child| { + t_parent.half_link(t_child); + t_child.half_link(t_parent); + Ok(()) + }).expect("Failed during link"); + self + } + /// Start the actor's mainloop. Takes ownership of `self`. The /// `name` is used as context for any log messages emitted by the /// actor. The `boot` function is called in the actor's context, @@ -1111,12 +1151,7 @@ impl Actor { &mut self, boot: F, ) -> () { - let root = self.ac_ref.access(|s| match s.expect("New actor missing its state") { - ActorState::Terminated { .. } => panic!("New actor unexpectedly in terminated state"), - ActorState::Running(ra) => ra.root, // what a lot of work to get this one number - }); - - let root_facet_ref = self.ac_ref.facet_ref(root); + let root_facet_ref = self.ac_ref.root_facet_ref(); let terminate = |result: ActorResult| { let _ = root_facet_ref.activate_exit(Account::new(crate::name!("shutdown")), @@ -1185,6 +1220,14 @@ impl Facet { } } } + + fn insert_retract_cleanup_action(&mut self, r: &Arc>, handle: Handle) { + let r = Arc::clone(r); + self.cleanup_actions.insert( + handle, + CleanupAction::ForAnother(Arc::clone(&r.mailbox), Box::new( + move |t| t.with_entity(&r, |t, e| e.retract(t, handle))))); + } } fn panicked_err() -> Option { @@ -1221,6 +1264,17 @@ impl ActorRef { facet_id, } } + + fn root_facet_id(&self) -> FacetId { + self.access(|s| match s.expect("Actor missing its state") { + ActorState::Terminated { .. } => panic!("Actor unexpectedly in terminated state"), + ActorState::Running(ra) => ra.root, // what a lot of work to get this one number + }) + } + + fn root_facet_ref(&self) -> FacetRef { + self.facet_ref(self.root_facet_id()) + } } impl RunningActor { @@ -1494,6 +1548,13 @@ where } } +impl Entity for StopOnRetract { + fn retract(&mut self, t: &mut Activation, _h: Handle) -> ActorResult { + t.stop(); + Ok(()) + } +} + /// A convenient Syndicate-enhanced variation on /// [`tracing::info_span`]. /// diff --git a/syndicate/src/during.rs b/syndicate/src/during.rs index 911c3eb..a5f9773 100644 --- a/syndicate/src/during.rs +++ b/syndicate/src/during.rs @@ -85,6 +85,22 @@ where } } + pub fn on_asserted_facet( + self, + mut assertion_handler: Fa1, + ) -> DuringEntity DuringResult>, Fm> + where + Fa1: 'static + Send + FnMut(&mut E, &mut Activation, M) -> ActorResult + { + self.on_asserted(Box::new(move |state, t, a| { + let facet_id = t.facet(|t| assertion_handler(state, t, a))?; + Ok(Some(Box::new(move |_state, t| { + t.stop_facet(facet_id, None); + Ok(()) + }))) + })) + } + pub fn on_message(self, message_handler: Fm1) -> DuringEntity where Fm1: 'static + Send + FnMut(&mut E, &mut Activation, M) -> ActorResult, diff --git a/syndicate/src/error.rs b/syndicate/src/error.rs index e13c0e6..16f674c 100644 --- a/syndicate/src/error.rs +++ b/syndicate/src/error.rs @@ -36,12 +36,12 @@ pub fn error(message: &str, detail: Detail) -> Error where _Any: From) -> _Any { match result { Ok(()) => { - let mut r = Value::record(Value::symbol("Ok").wrap(), 1); - r.fields_vec_mut().push(Value::record(Value::symbol("tuple").wrap(), 0).finish().wrap()); + let mut r = Value::record(_Any::symbol("Ok"), 1); + r.fields_vec_mut().push(Value::record(_Any::symbol("tuple"), 0).finish().wrap()); r.finish().wrap() } Err(e) => { - let mut r = Value::record(Value::symbol("Err").wrap(), 1); + let mut r = Value::record(_Any::symbol("Err"), 1); r.fields_vec_mut().push((&e).into()); r.finish().wrap() }