Use `during!` macro in services

This commit is contained in:
Tony Garnock-Jones 2021-09-20 15:10:31 +02:00
parent d5b28097ef
commit 9a09cac5f7
7 changed files with 62 additions and 106 deletions

View File

@ -67,12 +67,20 @@ pub fn during(src: proc_macro::TokenStream) -> proc_macro::TokenStream {
Err(e) => return Error::new(Span::call_site(), e).to_compile_error().into(),
};
(quote_spanned!{Span::mixed_site()=> {
let ds = #ds_stx.clone();
let __ds = #ds_stx.clone();
let __lang = #lang_stx;
let monitor = syndicate::during::entity(())
.on_asserted_facet(move |_, t, captures: syndicate::actor::AnyValue| {
if let Some(captures) = captures.value().as_sequence() {
if let Some(captures) = {
use syndicate::value::NestedValue;
use syndicate::value::Value;
captures.value().as_sequence()
}{
if captures.len() == #binding_count {
#(let #varname_stx: #type_stx = match #lang_stx.parse(&captures[#index_stx]) {
#(let #varname_stx: #type_stx = match {
use syndicate::preserves_schema::Codec;
__lang.parse(&captures[#index_stx])
} {
Ok(v) => v,
Err(_) => return Ok(()),
};)*
@ -82,7 +90,7 @@ pub fn during(src: proc_macro::TokenStream) -> proc_macro::TokenStream {
Ok(())
})
.create_cap(#turn_stx);
ds.assert(#turn_stx, #lang_stx, &syndicate::schemas::dataspace::Observe {
__ds.assert(#turn_stx, __lang, &syndicate::schemas::dataspace::Observe {
pattern: #pat_stx_expr,
observer: monitor,
});

View File

@ -15,8 +15,6 @@ use std::thread;
use std::time::Duration;
use syndicate::actor::*;
use syndicate::during::entity;
use syndicate::schemas::dataspace::Observe;
use syndicate::value::BinarySource;
use syndicate::value::IOBinarySource;
use syndicate::value::Map;
@ -29,24 +27,17 @@ use syndicate::value::ViaCodec;
use crate::language::language;
use crate::schemas::internal_services;
use syndicate_macros::during;
pub fn on_demand(t: &mut Activation, ds: Arc<Cap>) {
t.spawn(syndicate::name!("on_demand", module = module_path!()), move |t| {
let monitor = entity(())
.on_asserted_facet({
let ds = Arc::clone(&ds);
move |_, t, captures| {
let ds = Arc::clone(&ds);
t.spawn_link(syndicate::name!(parent: None, "config", spec = ?captures),
|t| run(t, ds, captures));
Ok(())
}
})
.create_cap(t);
ds.assert(t, language(), &Observe {
pattern: syndicate_macros::pattern!{<require-service ${<config-watcher _>}>},
observer: monitor,
});
Ok(())
Ok(during!(t, ds, language(), <require-service $spec: internal_services::ConfigWatcher>,
|t: &mut Activation| {
let ds = Arc::clone(&ds);
t.spawn_link(syndicate::name!(parent: None, "config", spec = ?spec),
|t| run(t, ds, spec));
Ok(())
}))
});
}
@ -145,8 +136,7 @@ fn initial_scan(
}
}
fn run(t: &mut Activation, ds: Arc<Cap>, captures: AnyValue) -> ActorResult {
let spec: internal_services::ConfigWatcher = language().parse(&captures.value().to_sequence()?[0])?;
fn run(t: &mut Activation, ds: Arc<Cap>, spec: internal_services::ConfigWatcher) -> ActorResult {
{
let spec = language().unparse(&spec);
ds.assert(t, &(), &syndicate_macros::template!("<service-running =spec>"));

View File

@ -4,24 +4,15 @@ use std::sync::Arc;
use syndicate::actor::*;
use syndicate::supervise::{Supervisor, SupervisorConfiguration};
use syndicate::value::NestedValue;
use crate::language::language;
use crate::schemas::external_services::DaemonService;
use syndicate_macros::during;
// use syndicate::schemas::dataspace_patterns::*;
// impl DaemonService {
// fn wildcard_dataspace_pattern() -> Pattern {
// Pattern::DDiscard(Box::new(DDiscard))
// }
// }
pub fn on_demand(t: &mut Activation, config_ds: Arc<Cap>, root_ds: Arc<Cap>) {
t.spawn(syndicate::name!("on_demand", module = module_path!()), move |t| {
during!(t, config_ds, language(), <require-service $spec: DaemonService>, |t| {
Ok(during!(t, config_ds, language(), <require-service $spec: DaemonService>, |t| {
let config_ds = Arc::clone(&config_ds);
let root_ds = Arc::clone(&root_ds);
Ok(Supervisor::start(
@ -29,9 +20,7 @@ pub fn on_demand(t: &mut Activation, config_ds: Arc<Cap>, root_ds: Arc<Cap>) {
syndicate::name!(parent: None, "daemon", service = ?spec),
SupervisorConfiguration::default(),
move |t| run(t, Arc::clone(&config_ds), Arc::clone(&root_ds), spec.clone())))
});
Ok(())
}))
});
}

View File

@ -1,37 +1,25 @@
use preserves_schema::Codec;
use std::sync::Arc;
use syndicate::actor::*;
use syndicate::during::entity;
use syndicate::schemas::dataspace::Observe;
use syndicate::preserves_schema::Codec;
use crate::language::language;
use crate::schemas::internal_services;
use crate::schemas::internal_services::DebtReporter;
use syndicate_macros::during;
pub fn on_demand(t: &mut Activation, ds: Arc<Cap>) {
t.spawn(syndicate::name!("on_demand", module = module_path!()), move |t| {
let monitor = entity(())
.on_asserted_facet({
let ds = Arc::clone(&ds);
move |_, t, _| {
let ds = Arc::clone(&ds);
t.spawn_link(tracing::Span::current(), |t| run(t, ds));
Ok(())
}
})
.create_cap(t);
let spec = language().unparse(&internal_services::DebtReporter);
ds.assert(t, language(), &Observe {
pattern: syndicate_macros::pattern!{<require-service #(spec)>},
observer: monitor,
});
Ok(())
Ok(during!(t, ds, language(), <require-service $_spec: DebtReporter>, |t: &mut Activation| {
let ds = Arc::clone(&ds);
t.spawn_link(tracing::Span::current(), |t| run(t, ds));
Ok(())
}))
});
}
fn run(t: &mut Activation, ds: Arc<Cap>) -> ActorResult {
let spec = language().unparse(&internal_services::DebtReporter);
let spec = language().unparse(&DebtReporter);
ds.assert(t, &(), &syndicate_macros::template!("<service-running =spec>"));
t.linked_task(syndicate::name!("tick"), async {
let mut timer = tokio::time::interval(core::time::Duration::from_secs(1));

View File

@ -4,10 +4,7 @@ use std::convert::TryFrom;
use std::sync::Arc;
use syndicate::actor::*;
use syndicate::during::entity;
use syndicate::schemas::dataspace::Observe;
use syndicate::supervise::{Supervisor, SupervisorConfiguration};
use syndicate::value::NestedValue;
use tokio::net::TcpListener;
@ -15,28 +12,21 @@ use crate::language::language;
use crate::protocol::detect_protocol;
use crate::schemas::internal_services;
use syndicate_macros::during;
pub fn on_demand(t: &mut Activation, ds: Arc<Cap>, gateway: Arc<Cap>) {
t.spawn(syndicate::name!("on_demand", module = module_path!()), move |t| {
let monitor = entity(())
.on_asserted_facet({
let ds = Arc::clone(&ds);
move |_, t, captures: AnyValue| {
let ds = Arc::clone(&ds);
let gateway = Arc::clone(&gateway);
Supervisor::start(
t,
syndicate::name!(parent: None, "relay", addr = ?captures),
SupervisorConfiguration::default(),
move |t| run(t, Arc::clone(&ds), Arc::clone(&gateway), captures.clone()));
Ok(())
}
})
.create_cap(t);
ds.assert(t, language(), &Observe {
pattern: syndicate_macros::pattern!{<require-service ${<relay-listener <tcp _ _>>}>},
observer: monitor,
});
Ok(())
Ok(during!(t, ds, language(), <require-service $spec: internal_services::TcpRelayListener>,
|t| {
let ds = Arc::clone(&ds);
let gateway = Arc::clone(&gateway);
Supervisor::start(
t,
syndicate::name!(parent: None, "relay", addr = ?spec),
SupervisorConfiguration::default(),
move |t| run(t, Arc::clone(&ds), Arc::clone(&gateway), spec.clone()));
Ok(())
}))
});
}
@ -44,9 +34,8 @@ fn run(
t: &'_ mut Activation,
ds: Arc<Cap>,
gateway: Arc<Cap>,
captures: AnyValue,
spec: internal_services::TcpRelayListener,
) -> ActorResult {
let spec: internal_services::TcpRelayListener = language().parse(&captures.value().to_sequence()?[0])?;
let host = spec.addr.host.clone();
let port = u16::try_from(&spec.addr.port).map_err(|_| "Invalid TCP port number")?;
{

View File

@ -5,11 +5,8 @@ use std::path::PathBuf;
use std::sync::Arc;
use syndicate::actor::*;
use syndicate::during::entity;
use syndicate::error::Error;
use syndicate::relay;
use syndicate::schemas::dataspace::Observe;
use syndicate::value::NestedValue;
use tokio::net::UnixListener;
use tokio::net::UnixStream;
@ -18,25 +15,18 @@ use crate::language::language;
use crate::protocol::run_connection;
use crate::schemas::internal_services;
use syndicate_macros::during;
pub fn on_demand(t: &mut Activation, ds: Arc<Cap>, gateway: Arc<Cap>) {
t.spawn(syndicate::name!("on_demand", module = module_path!()), move |t| {
let monitor = entity(())
.on_asserted_facet({
let ds = Arc::clone(&ds);
move |_, t, captures| {
let ds = Arc::clone(&ds);
let gateway = Arc::clone(&gateway);
t.spawn_link(syndicate::name!(parent: None, "relay", addr = ?captures),
|t| run(t, ds, gateway, captures));
Ok(())
}
})
.create_cap(t);
ds.assert(t, language(), &Observe {
pattern: syndicate_macros::pattern!{<require-service ${<relay-listener <unix _>>}>},
observer: monitor,
});
Ok(())
Ok(during!(t, ds, language(), <require-service $spec: internal_services::UnixRelayListener>,
|t: &mut Activation| {
let ds = Arc::clone(&ds);
let gateway = Arc::clone(&gateway);
t.spawn_link(syndicate::name!(parent: None, "relay", addr = ?spec),
|t| run(t, ds, gateway, spec));
Ok(())
}))
});
}
@ -44,9 +34,8 @@ fn run(
t: &'_ mut Activation,
ds: Arc<Cap>,
gateway: Arc<Cap>,
captures: AnyValue,
spec: internal_services::UnixRelayListener,
) -> ActorResult {
let spec: internal_services::UnixRelayListener = language().parse(&captures.value().to_sequence()?[0])?;
let path_str = spec.addr.path.clone();
{
let spec = language().unparse(&spec);

View File

@ -6,6 +6,9 @@ pub use preserves::value;
#[doc(inline)]
pub use preserves;
#[doc(inline)]
pub use preserves_schema;
pub mod actor;
pub mod bag;
pub mod dataspace;