spawn_link; reactive debt_reporter service startup
This commit is contained in:
parent
a252cfdfdf
commit
738ac3163a
|
@ -883,7 +883,7 @@ checksum = "ac74c624d6b2d21f425f752262f42188365d7b8ff1aff74c82e45136510a4857"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "preserves"
|
name = "preserves"
|
||||||
version = "0.17.2"
|
version = "0.19.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"base64",
|
"base64",
|
||||||
"dtoa",
|
"dtoa",
|
||||||
|
@ -895,7 +895,7 @@ dependencies = [
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "preserves-schema"
|
name = "preserves-schema"
|
||||||
version = "0.5.0"
|
version = "0.7.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"convert_case",
|
"convert_case",
|
||||||
"glob",
|
"glob",
|
||||||
|
|
|
@ -170,7 +170,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
|
||||||
ds.assert(t, &Observe {
|
ds.assert(t, &Observe {
|
||||||
pattern: {
|
pattern: {
|
||||||
let recv_label = Value::symbol(recv_label).wrap();
|
let recv_label = AnyValue::symbol(recv_label);
|
||||||
syndicate_macros::pattern!("<=recv_label $ $>")
|
syndicate_macros::pattern!("<=recv_label $ $>")
|
||||||
},
|
},
|
||||||
observer: Arc::clone(&consumer),
|
observer: Arc::clone(&consumer),
|
||||||
|
@ -194,7 +194,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
let action_count = c.action_count;
|
let action_count = c.action_count;
|
||||||
let account = Arc::clone(t.account());
|
let account = Arc::clone(t.account());
|
||||||
t.linked_task(syndicate::name!("boot-ping"), async move {
|
t.linked_task(syndicate::name!("boot-ping"), async move {
|
||||||
let padding: AnyValue = Value::ByteString(vec![0; bytes_padding]).wrap();
|
let padding = AnyValue::bytestring(vec![0; bytes_padding]);
|
||||||
for _ in 0..turn_count {
|
for _ in 0..turn_count {
|
||||||
let mut events: PendingEventQueue = vec![];
|
let mut events: PendingEventQueue = vec![];
|
||||||
let current_rec = simple_record2(send_label,
|
let current_rec = simple_record2(send_label,
|
||||||
|
|
|
@ -8,9 +8,9 @@ use syndicate::value::NestedValue;
|
||||||
|
|
||||||
pub fn bind(
|
pub fn bind(
|
||||||
t: &mut Activation,
|
t: &mut Activation,
|
||||||
|
ds: &Arc<Cap>,
|
||||||
oid: syndicate::schemas::sturdy::_Any,
|
oid: syndicate::schemas::sturdy::_Any,
|
||||||
key: [u8; 16],
|
key: [u8; 16],
|
||||||
ds: &Arc<Cap>,
|
|
||||||
target: Arc<Cap>,
|
target: Arc<Cap>,
|
||||||
) {
|
) {
|
||||||
let sr = sturdy::SturdyRef::mint(oid.clone(), &key);
|
let sr = sturdy::SturdyRef::mint(oid.clone(), &key);
|
||||||
|
|
|
@ -60,11 +60,18 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
|
||||||
Actor::new().boot(tracing::Span::current(), move |t| {
|
Actor::new().boot(tracing::Span::current(), move |t| {
|
||||||
let root_ds = Cap::new(&t.create(Dataspace::new()));
|
let root_ds = Cap::new(&t.create(Dataspace::new()));
|
||||||
|
let server_config_ds = Cap::new(&t.create(Dataspace::new()));
|
||||||
|
|
||||||
gatekeeper::bind(t, AnyValue::new("syndicate"), [0; 16], &root_ds, Arc::clone(&root_ds));
|
gatekeeper::bind(t, &root_ds, AnyValue::new("syndicate"), [0; 16],
|
||||||
|
Arc::clone(&root_ds));
|
||||||
|
gatekeeper::bind(t, &root_ds, AnyValue::new("server-config"), [0; 16],
|
||||||
|
Arc::clone(&server_config_ds));
|
||||||
|
|
||||||
|
services::debt_reporter::on_demand(t, Arc::clone(&server_config_ds));
|
||||||
if config.debt_reporter {
|
if config.debt_reporter {
|
||||||
services::debt_reporter::spawn(t);
|
server_config_ds.assert(t, &syndicate::schemas::service::RequireService {
|
||||||
|
service_name: AnyValue::symbol("debt-reporter")
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
if config.inferior {
|
if config.inferior {
|
||||||
|
|
|
@ -1,18 +1,44 @@
|
||||||
use syndicate::actor::*;
|
use std::sync::Arc;
|
||||||
|
|
||||||
pub fn spawn(t: &mut Activation) {
|
use syndicate::actor::*;
|
||||||
t.spawn(syndicate::name!("debt-reporter"), |t| {
|
use syndicate::during::entity;
|
||||||
t.linked_task(syndicate::name!("tick"), async {
|
use syndicate::schemas::dataspace::Observe;
|
||||||
let mut timer = tokio::time::interval(core::time::Duration::from_secs(1));
|
use syndicate::value::NestedValue;
|
||||||
loop {
|
|
||||||
timer.tick().await;
|
const SERVICE_NAME: &str = "debt-reporter";
|
||||||
for (id, (name, debt)) in syndicate::actor::ACCOUNTS.read().unwrap().iter() {
|
|
||||||
let _enter = name.enter();
|
pub fn on_demand(t: &mut Activation, ds: Arc<Cap>) {
|
||||||
tracing::info!(id, debt = debug(
|
t.spawn(syndicate::name!("on_demand", service = SERVICE_NAME), move |t| {
|
||||||
debt.load(std::sync::atomic::Ordering::Relaxed)));
|
let monitor = entity(())
|
||||||
}
|
.on_asserted_facet(|_, t, _| {
|
||||||
}
|
t.spawn_link(syndicate::name!(SERVICE_NAME), run);
|
||||||
|
Ok(())
|
||||||
|
})
|
||||||
|
.create_cap(t);
|
||||||
|
let service_sym = AnyValue::symbol(SERVICE_NAME);
|
||||||
|
ds.assert(t, &Observe {
|
||||||
|
pattern: syndicate_macros::pattern!("<require-service =service_sym>"),
|
||||||
|
observer: monitor,
|
||||||
});
|
});
|
||||||
Ok(())
|
Ok(())
|
||||||
});
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// pub fn spawn(t: &mut Activation) {
|
||||||
|
// t.spawn(syndicate::name!(SERVICE_NAME), run);
|
||||||
|
// }
|
||||||
|
|
||||||
|
fn run(t: &mut Activation) -> ActorResult {
|
||||||
|
t.linked_task(syndicate::name!("tick"), async {
|
||||||
|
let mut timer = tokio::time::interval(core::time::Duration::from_secs(1));
|
||||||
|
loop {
|
||||||
|
timer.tick().await;
|
||||||
|
for (id, (name, debt)) in syndicate::actor::ACCOUNTS.read().unwrap().iter() {
|
||||||
|
let _enter = name.enter();
|
||||||
|
tracing::info!(id, debt = debug(
|
||||||
|
debt.load(std::sync::atomic::Ordering::Relaxed)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,11 +13,11 @@ license = "Apache-2.0"
|
||||||
vendored-openssl = ["openssl/vendored"]
|
vendored-openssl = ["openssl/vendored"]
|
||||||
|
|
||||||
[build-dependencies]
|
[build-dependencies]
|
||||||
preserves-schema = "0.5.0"
|
preserves-schema = "0.7.0"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
preserves = "0.17.2"
|
preserves = "0.19.0"
|
||||||
preserves-schema = "0.5.0"
|
preserves-schema = "0.7.0"
|
||||||
|
|
||||||
tokio = { version = "1.10.0", features = ["io-util", "macros", "sync", "net", "rt", "rt-multi-thread", "time"] }
|
tokio = { version = "1.10.0", features = ["io-util", "macros", "sync", "net", "rt", "rt-multi-thread", "time"] }
|
||||||
tokio-util = "0.6.7"
|
tokio-util = "0.6.7"
|
||||||
|
|
|
@ -104,7 +104,7 @@ pub fn bench_pub(c: &mut Criterion) {
|
||||||
ds.assert(t, &Observe {
|
ds.assert(t, &Observe {
|
||||||
pattern: p::Pattern::DBind(Box::new(p::DBind {
|
pattern: p::Pattern::DBind(Box::new(p::DBind {
|
||||||
pattern: p::Pattern::DLit(Box::new(p::DLit {
|
pattern: p::Pattern::DLit(Box::new(p::DLit {
|
||||||
value: Value::symbol("consumer").wrap(),
|
value: AnyValue::symbol("consumer"),
|
||||||
})),
|
})),
|
||||||
})),
|
})),
|
||||||
observer: shutdown,
|
observer: shutdown,
|
||||||
|
@ -126,7 +126,7 @@ pub fn bench_pub(c: &mut Criterion) {
|
||||||
ds.assert(t, &Observe {
|
ds.assert(t, &Observe {
|
||||||
pattern: p::Pattern::DCompound(Box::new(p::DCompound::Rec {
|
pattern: p::Pattern::DCompound(Box::new(p::DCompound::Rec {
|
||||||
ctor: Box::new(p::CRec {
|
ctor: Box::new(p::CRec {
|
||||||
label: Value::symbol("Says").wrap(),
|
label: AnyValue::symbol("Says"),
|
||||||
arity: 2.into(),
|
arity: 2.into(),
|
||||||
}),
|
}),
|
||||||
members: Map::from_iter(vec![
|
members: Map::from_iter(vec![
|
||||||
|
|
|
@ -393,6 +393,10 @@ where
|
||||||
underlying: Arc<Ref<M>>
|
underlying: Arc<Ref<M>>
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Simple entity that stops its containing facet when any assertion it receives is
|
||||||
|
/// subsequently retracted.
|
||||||
|
pub struct StopOnRetract;
|
||||||
|
|
||||||
//---------------------------------------------------------------------------
|
//---------------------------------------------------------------------------
|
||||||
|
|
||||||
const BUMP_AMOUNT: u8 = 10;
|
const BUMP_AMOUNT: u8 = 10;
|
||||||
|
@ -578,13 +582,7 @@ impl<'activation> Activation<'activation> {
|
||||||
pub fn assert<M: 'static + Send>(&mut self, r: &Arc<Ref<M>>, a: M) -> Handle {
|
pub fn assert<M: 'static + Send>(&mut self, r: &Arc<Ref<M>>, a: M) -> Handle {
|
||||||
let handle = next_handle();
|
let handle = next_handle();
|
||||||
if let Some(f) = self.active_facet() {
|
if let Some(f) = self.active_facet() {
|
||||||
{
|
f.insert_retract_cleanup_action(&r, handle);
|
||||||
let r = Arc::clone(r);
|
|
||||||
f.cleanup_actions.insert(
|
|
||||||
handle,
|
|
||||||
CleanupAction::ForAnother(Arc::clone(&r.mailbox), Box::new(
|
|
||||||
move |t| t.with_entity(&r, |t, e| e.retract(t, handle)))));
|
|
||||||
}
|
|
||||||
drop(f);
|
drop(f);
|
||||||
{
|
{
|
||||||
let r = Arc::clone(r);
|
let r = Arc::clone(r);
|
||||||
|
@ -630,6 +628,13 @@ impl<'activation> Activation<'activation> {
|
||||||
handle
|
handle
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn half_link(&mut self, t_other: &mut Activation) {
|
||||||
|
let entity_ref = t_other.create::<AnyValue, _>(StopOnRetract);
|
||||||
|
let handle = next_handle();
|
||||||
|
self.active_facet().unwrap().insert_retract_cleanup_action(&entity_ref, handle);
|
||||||
|
t_other.with_entity(&entity_ref, |t, e| e.assert(t, AnyValue::new(true), handle)).unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
/// Core API: retract a previously-established assertion.
|
/// Core API: retract a previously-established assertion.
|
||||||
pub fn retract(&mut self, handle: Handle) {
|
pub fn retract(&mut self, handle: Handle) {
|
||||||
if let Some(f) = self.active_facet() {
|
if let Some(f) = self.active_facet() {
|
||||||
|
@ -782,9 +787,28 @@ impl<'activation> Activation<'activation> {
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Schedule the creation of a new actor when the Activation commits.
|
||||||
|
///
|
||||||
|
/// The new actor will be "linked" to the active facet: if the new actor terminates, the
|
||||||
|
/// active facet is stopped, and if the active facet stops, the new actor's root facet is
|
||||||
|
/// stopped.
|
||||||
|
pub fn spawn_link<F: 'static + Send + FnOnce(&mut Activation) -> ActorResult>(
|
||||||
|
&mut self,
|
||||||
|
name: tracing::Span,
|
||||||
|
boot: F,
|
||||||
|
) {
|
||||||
|
let facet_id = self.facet.facet_id;
|
||||||
|
self.enqueue_for_myself_at_commit(Box::new(move |t| {
|
||||||
|
t.with_facet(true, facet_id, move |t| {
|
||||||
|
Actor::new().link(t).boot(name, boot);
|
||||||
|
Ok(())
|
||||||
|
})
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
/// Create a new subfacet of the currently-active facet. Runs `boot` in the new facet's
|
/// Create a new subfacet of the currently-active facet. Runs `boot` in the new facet's
|
||||||
/// context. If `boot` returns leaving the new facet [inert][Facet#inert-facets],
|
/// context. If `boot` returns leaving the new facet [inert][Facet#inert-facets],
|
||||||
pub fn facet<F: 'static + Send + FnOnce(&mut Activation) -> ActorResult>(
|
pub fn facet<F: FnOnce(&mut Activation) -> ActorResult>(
|
||||||
&mut self,
|
&mut self,
|
||||||
boot: F,
|
boot: F,
|
||||||
) -> Result<FacetId, Error> {
|
) -> Result<FacetId, Error> {
|
||||||
|
@ -847,9 +871,13 @@ impl<'activation> Activation<'activation> {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn stop_if_inert(&mut self) {
|
fn stop_if_inert(&mut self) {
|
||||||
if self.state.facet_exists_and_is_inert(self.facet.facet_id) {
|
let facet_id = self.facet.facet_id;
|
||||||
self.stop_facet(self.facet.facet_id, None);
|
self.enqueue_for_myself_at_commit(Box::new(move |t| {
|
||||||
}
|
if t.state.facet_exists_and_is_inert(facet_id) {
|
||||||
|
t.stop_facet(facet_id, None);
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn _terminate_facet(&mut self, facet_id: FacetId, orderly: bool) -> ActorResult {
|
fn _terminate_facet(&mut self, facet_id: FacetId, orderly: bool) -> ActorResult {
|
||||||
|
@ -1082,6 +1110,18 @@ impl Actor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn link(self, t_parent: &mut Activation) -> Self {
|
||||||
|
if t_parent.active_facet().is_none() {
|
||||||
|
panic!("No active facet when calling spawn_link");
|
||||||
|
}
|
||||||
|
self.ac_ref.root_facet_ref().activate(Account::new(crate::name!("link")), |t_child| {
|
||||||
|
t_parent.half_link(t_child);
|
||||||
|
t_child.half_link(t_parent);
|
||||||
|
Ok(())
|
||||||
|
}).expect("Failed during link");
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
/// Start the actor's mainloop. Takes ownership of `self`. The
|
/// Start the actor's mainloop. Takes ownership of `self`. The
|
||||||
/// `name` is used as context for any log messages emitted by the
|
/// `name` is used as context for any log messages emitted by the
|
||||||
/// actor. The `boot` function is called in the actor's context,
|
/// actor. The `boot` function is called in the actor's context,
|
||||||
|
@ -1111,12 +1151,7 @@ impl Actor {
|
||||||
&mut self,
|
&mut self,
|
||||||
boot: F,
|
boot: F,
|
||||||
) -> () {
|
) -> () {
|
||||||
let root = self.ac_ref.access(|s| match s.expect("New actor missing its state") {
|
let root_facet_ref = self.ac_ref.root_facet_ref();
|
||||||
ActorState::Terminated { .. } => panic!("New actor unexpectedly in terminated state"),
|
|
||||||
ActorState::Running(ra) => ra.root, // what a lot of work to get this one number
|
|
||||||
});
|
|
||||||
|
|
||||||
let root_facet_ref = self.ac_ref.facet_ref(root);
|
|
||||||
|
|
||||||
let terminate = |result: ActorResult| {
|
let terminate = |result: ActorResult| {
|
||||||
let _ = root_facet_ref.activate_exit(Account::new(crate::name!("shutdown")),
|
let _ = root_facet_ref.activate_exit(Account::new(crate::name!("shutdown")),
|
||||||
|
@ -1185,6 +1220,14 @@ impl Facet {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn insert_retract_cleanup_action<M: 'static + Send>(&mut self, r: &Arc<Ref<M>>, handle: Handle) {
|
||||||
|
let r = Arc::clone(r);
|
||||||
|
self.cleanup_actions.insert(
|
||||||
|
handle,
|
||||||
|
CleanupAction::ForAnother(Arc::clone(&r.mailbox), Box::new(
|
||||||
|
move |t| t.with_entity(&r, |t, e| e.retract(t, handle)))));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn panicked_err() -> Option<ActorResult> {
|
fn panicked_err() -> Option<ActorResult> {
|
||||||
|
@ -1221,6 +1264,17 @@ impl ActorRef {
|
||||||
facet_id,
|
facet_id,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn root_facet_id(&self) -> FacetId {
|
||||||
|
self.access(|s| match s.expect("Actor missing its state") {
|
||||||
|
ActorState::Terminated { .. } => panic!("Actor unexpectedly in terminated state"),
|
||||||
|
ActorState::Running(ra) => ra.root, // what a lot of work to get this one number
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn root_facet_ref(&self) -> FacetRef {
|
||||||
|
self.facet_ref(self.root_facet_id())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl RunningActor {
|
impl RunningActor {
|
||||||
|
@ -1494,6 +1548,13 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<M> Entity<M> for StopOnRetract {
|
||||||
|
fn retract(&mut self, t: &mut Activation, _h: Handle) -> ActorResult {
|
||||||
|
t.stop();
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// A convenient Syndicate-enhanced variation on
|
/// A convenient Syndicate-enhanced variation on
|
||||||
/// [`tracing::info_span`].
|
/// [`tracing::info_span`].
|
||||||
///
|
///
|
||||||
|
|
|
@ -85,6 +85,22 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn on_asserted_facet<Fa1>(
|
||||||
|
self,
|
||||||
|
mut assertion_handler: Fa1,
|
||||||
|
) -> DuringEntity<M, E, Box<dyn 'static + Send + FnMut(&mut E, &mut Activation, M) -> DuringResult<E>>, Fm>
|
||||||
|
where
|
||||||
|
Fa1: 'static + Send + FnMut(&mut E, &mut Activation, M) -> ActorResult
|
||||||
|
{
|
||||||
|
self.on_asserted(Box::new(move |state, t, a| {
|
||||||
|
let facet_id = t.facet(|t| assertion_handler(state, t, a))?;
|
||||||
|
Ok(Some(Box::new(move |_state, t| {
|
||||||
|
t.stop_facet(facet_id, None);
|
||||||
|
Ok(())
|
||||||
|
})))
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
pub fn on_message<Fm1>(self, message_handler: Fm1) -> DuringEntity<M, E, Fa, Fm1>
|
pub fn on_message<Fm1>(self, message_handler: Fm1) -> DuringEntity<M, E, Fa, Fm1>
|
||||||
where
|
where
|
||||||
Fm1: 'static + Send + FnMut(&mut E, &mut Activation, M) -> ActorResult,
|
Fm1: 'static + Send + FnMut(&mut E, &mut Activation, M) -> ActorResult,
|
||||||
|
|
|
@ -36,12 +36,12 @@ pub fn error<Detail>(message: &str, detail: Detail) -> Error where _Any: From<De
|
||||||
pub fn encode_error(result: Result<(), Error>) -> _Any {
|
pub fn encode_error(result: Result<(), Error>) -> _Any {
|
||||||
match result {
|
match result {
|
||||||
Ok(()) => {
|
Ok(()) => {
|
||||||
let mut r = Value::record(Value::symbol("Ok").wrap(), 1);
|
let mut r = Value::record(_Any::symbol("Ok"), 1);
|
||||||
r.fields_vec_mut().push(Value::record(Value::symbol("tuple").wrap(), 0).finish().wrap());
|
r.fields_vec_mut().push(Value::record(_Any::symbol("tuple"), 0).finish().wrap());
|
||||||
r.finish().wrap()
|
r.finish().wrap()
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
let mut r = Value::record(Value::symbol("Err").wrap(), 1);
|
let mut r = Value::record(_Any::symbol("Err"), 1);
|
||||||
r.fields_vec_mut().push((&e).into());
|
r.fields_vec_mut().push((&e).into());
|
||||||
r.finish().wrap()
|
r.finish().wrap()
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue