2021-08-30 10:08:11 +00:00
|
|
|
use notify::DebouncedEvent;
|
|
|
|
use notify::Watcher;
|
|
|
|
use notify::RecursiveMode;
|
|
|
|
use notify::watcher;
|
|
|
|
|
2022-01-19 13:40:50 +00:00
|
|
|
use syndicate::preserves::rec;
|
|
|
|
|
2021-08-30 10:08:11 +00:00
|
|
|
use std::fs;
|
|
|
|
use std::future;
|
|
|
|
use std::io;
|
|
|
|
use std::path::PathBuf;
|
|
|
|
use std::sync::Arc;
|
|
|
|
use std::sync::mpsc::channel;
|
|
|
|
use std::thread;
|
|
|
|
use std::time::Duration;
|
|
|
|
|
|
|
|
use syndicate::actor::*;
|
2021-10-06 20:03:12 +00:00
|
|
|
use syndicate::error::Error;
|
2021-09-23 19:46:10 +00:00
|
|
|
use syndicate::enclose;
|
2021-09-28 10:53:11 +00:00
|
|
|
use syndicate::supervise::{Supervisor, SupervisorConfiguration};
|
2022-01-19 13:40:50 +00:00
|
|
|
use syndicate::trace;
|
2021-08-30 10:08:11 +00:00
|
|
|
use syndicate::value::BinarySource;
|
2022-01-07 16:18:16 +00:00
|
|
|
use syndicate::value::BytesBinarySource;
|
2021-08-30 10:08:11 +00:00
|
|
|
use syndicate::value::Map;
|
|
|
|
use syndicate::value::NestedValue;
|
|
|
|
use syndicate::value::NoEmbeddedDomainCodec;
|
|
|
|
use syndicate::value::Reader;
|
|
|
|
use syndicate::value::ViaCodec;
|
|
|
|
|
2021-09-19 14:53:37 +00:00
|
|
|
use crate::language::language;
|
2021-09-28 10:53:11 +00:00
|
|
|
use crate::lifecycle;
|
2021-08-30 10:08:11 +00:00
|
|
|
use crate::schemas::internal_services;
|
2021-10-07 15:00:04 +00:00
|
|
|
use crate::script;
|
2021-08-30 10:08:11 +00:00
|
|
|
|
2021-09-20 13:10:31 +00:00
|
|
|
use syndicate_macros::during;
|
|
|
|
|
2021-10-07 15:00:04 +00:00
|
|
|
pub fn on_demand(t: &mut Activation, config_ds: Arc<Cap>) {
|
2022-01-19 13:40:50 +00:00
|
|
|
t.spawn(Some(AnyValue::symbol("config_watcher")), move |t| {
|
2023-11-10 22:19:22 +00:00
|
|
|
Ok(during!(t, config_ds, language(), <run-service $spec: internal_services::ConfigWatcher::<AnyValue>>, |t| {
|
2021-10-07 15:00:04 +00:00
|
|
|
Supervisor::start(
|
|
|
|
t,
|
2022-01-19 13:40:50 +00:00
|
|
|
Some(rec![AnyValue::symbol("config"), AnyValue::new(spec.path.clone())]),
|
2021-10-07 15:00:04 +00:00
|
|
|
SupervisorConfiguration::default(),
|
|
|
|
enclose!((config_ds, spec) lifecycle::updater(config_ds, spec)),
|
|
|
|
enclose!((config_ds) move |t| enclose!((config_ds, spec) run(t, config_ds, spec))))
|
|
|
|
}))
|
2021-09-01 15:31:01 +00:00
|
|
|
});
|
2021-08-30 10:08:11 +00:00
|
|
|
}
|
|
|
|
|
2021-10-06 20:03:12 +00:00
|
|
|
fn convert_notify_error(e: notify::Error) -> Error {
|
2021-08-30 10:08:11 +00:00
|
|
|
syndicate::error::error(&format!("Notify error: {:?}", e), AnyValue::new(false))
|
|
|
|
}
|
|
|
|
|
2021-10-06 20:03:12 +00:00
|
|
|
fn process_existing_file(
|
|
|
|
t: &mut Activation,
|
2021-10-07 15:00:04 +00:00
|
|
|
mut env: script::Env,
|
2021-10-06 20:03:12 +00:00
|
|
|
) -> io::Result<Option<FacetId>> {
|
2022-01-07 16:18:16 +00:00
|
|
|
let mut contents = fs::read(&env.path)?;
|
|
|
|
contents.append(&mut Vec::from("\n[]".as_bytes())); // improved ergonomics of trailing comments
|
|
|
|
let tokens: Vec<AnyValue> = BytesBinarySource::new(&contents)
|
2021-10-06 20:03:12 +00:00
|
|
|
.text::<AnyValue, _>(ViaCodec::new(NoEmbeddedDomainCodec))
|
|
|
|
.configured(true)
|
|
|
|
.collect::<Result<Vec<_>, _>>()?;
|
2021-10-07 15:00:04 +00:00
|
|
|
match script::Parser::new(&tokens).parse_top("config") {
|
|
|
|
Ok(Some(i)) => Ok(Some(t.facet(|t| {
|
2022-01-15 22:23:48 +00:00
|
|
|
tracing::debug!("Instructions for file {:?}: {:#?}", &env.path, &i);
|
2021-10-07 15:00:04 +00:00
|
|
|
env.safe_eval(t, &i);
|
|
|
|
Ok(())
|
|
|
|
}).expect("Successful facet startup"))),
|
|
|
|
Ok(None) => Ok(None),
|
|
|
|
Err(errors) => {
|
|
|
|
for e in errors {
|
|
|
|
tracing::error!(path = ?env.path, message = %e);
|
2021-10-06 20:03:12 +00:00
|
|
|
}
|
2021-10-07 16:10:59 +00:00
|
|
|
Ok(None)
|
2021-08-30 10:08:11 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-10-06 20:03:12 +00:00
|
|
|
fn process_path(
|
|
|
|
t: &mut Activation,
|
2021-10-07 15:00:04 +00:00
|
|
|
env: script::Env,
|
2021-10-06 20:03:12 +00:00
|
|
|
) -> io::Result<Option<FacetId>> {
|
2021-10-07 15:00:04 +00:00
|
|
|
match fs::metadata(&env.path) {
|
2021-08-30 10:08:11 +00:00
|
|
|
Ok(md) => if md.is_file() {
|
2021-10-07 15:00:04 +00:00
|
|
|
process_existing_file(t, env)
|
2021-08-30 10:08:11 +00:00
|
|
|
} else {
|
2021-10-06 20:03:12 +00:00
|
|
|
Ok(None)
|
2021-08-30 10:08:11 +00:00
|
|
|
}
|
|
|
|
Err(e) => if e.kind() != io::ErrorKind::NotFound {
|
|
|
|
Err(e)?
|
|
|
|
} else {
|
2021-10-06 20:03:12 +00:00
|
|
|
Ok(None)
|
2021-08-30 10:08:11 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
fn is_hidden(path: &PathBuf) -> bool {
|
|
|
|
match path.file_name().and_then(|n| n.to_str()) {
|
|
|
|
Some(n) => n.starts_with("."),
|
|
|
|
None => true, // ?
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-10-07 19:37:24 +00:00
|
|
|
fn should_process(path: &PathBuf) -> bool {
|
|
|
|
path.file_name().and_then(|n| n.to_str()).map(|n| n.ends_with(".pr")).unwrap_or(false)
|
|
|
|
}
|
|
|
|
|
2021-08-30 10:08:11 +00:00
|
|
|
fn scan_file(
|
|
|
|
t: &mut Activation,
|
2021-10-06 20:03:12 +00:00
|
|
|
path_state: &mut Map<PathBuf, FacetId>,
|
2021-10-07 15:00:04 +00:00
|
|
|
env: script::Env,
|
2021-08-30 11:24:00 +00:00
|
|
|
) -> bool {
|
2021-10-07 15:00:04 +00:00
|
|
|
let path = env.path.clone();
|
2021-10-07 19:37:24 +00:00
|
|
|
if is_hidden(&path) || !should_process(&path) {
|
2021-08-30 11:24:00 +00:00
|
|
|
return true;
|
2021-08-30 10:08:11 +00:00
|
|
|
}
|
2021-10-07 15:00:04 +00:00
|
|
|
tracing::trace!("scan_file: scanning {:?}", &path);
|
|
|
|
match process_path(t, env) {
|
2021-10-06 20:03:12 +00:00
|
|
|
Ok(maybe_facet_id) => {
|
|
|
|
if let Some(facet_id) = maybe_facet_id {
|
2021-10-07 15:00:04 +00:00
|
|
|
tracing::info!("scan_file: processed {:?}", &path);
|
|
|
|
path_state.insert(path, facet_id);
|
2021-08-30 11:24:00 +00:00
|
|
|
}
|
|
|
|
true
|
2021-08-30 10:08:11 +00:00
|
|
|
},
|
2021-08-30 11:24:00 +00:00
|
|
|
Err(e) => {
|
2021-10-07 16:10:59 +00:00
|
|
|
tracing::error!("scan_file: {:?}: {:?}", &path, e);
|
2021-08-30 11:24:00 +00:00
|
|
|
false
|
|
|
|
}
|
2021-08-30 10:08:11 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
fn initial_scan(
|
|
|
|
t: &mut Activation,
|
2021-10-06 20:03:12 +00:00
|
|
|
path_state: &mut Map<PathBuf, FacetId>,
|
|
|
|
config_ds: &Arc<Cap>,
|
2021-10-07 15:00:04 +00:00
|
|
|
env: script::Env,
|
2021-08-30 10:08:11 +00:00
|
|
|
) {
|
2021-10-07 15:00:04 +00:00
|
|
|
if is_hidden(&env.path) {
|
2021-08-30 10:08:11 +00:00
|
|
|
return;
|
|
|
|
}
|
2021-10-07 15:00:04 +00:00
|
|
|
match fs::metadata(&env.path) {
|
2021-08-30 10:08:11 +00:00
|
|
|
Ok(md) => if md.is_file() {
|
2021-10-07 15:00:04 +00:00
|
|
|
scan_file(t, path_state, env);
|
2021-08-30 10:08:11 +00:00
|
|
|
} else {
|
2021-10-07 15:00:04 +00:00
|
|
|
match fs::read_dir(&env.path) {
|
2022-02-04 15:59:29 +00:00
|
|
|
Ok(unsorted_entries) => {
|
|
|
|
let mut entries: Vec<fs::DirEntry> = Vec::new();
|
|
|
|
for er in unsorted_entries {
|
|
|
|
match er {
|
|
|
|
Ok(e) =>
|
|
|
|
entries.push(e),
|
|
|
|
Err(e) =>
|
|
|
|
tracing::warn!(
|
|
|
|
"initial_scan: transient during scan of {:?}: {:?}", &env.path, e),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
entries.sort_by_key(|e| e.file_name());
|
|
|
|
for e in entries {
|
|
|
|
initial_scan(t, path_state, config_ds, env.clone_with_path(e.path()));
|
2021-08-30 10:08:11 +00:00
|
|
|
}
|
|
|
|
}
|
2021-10-07 15:00:04 +00:00
|
|
|
Err(e) => tracing::warn!("initial_scan: enumerating {:?}: {:?}", &env.path, e),
|
2021-08-30 10:08:11 +00:00
|
|
|
}
|
|
|
|
},
|
2021-10-07 15:00:04 +00:00
|
|
|
Err(e) => tracing::warn!("initial_scan: `stat`ing {:?}: {:?}", &env.path, e),
|
2021-08-30 10:08:11 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-10-06 20:03:12 +00:00
|
|
|
fn run(
|
|
|
|
t: &mut Activation,
|
|
|
|
config_ds: Arc<Cap>,
|
|
|
|
spec: internal_services::ConfigWatcher,
|
|
|
|
) -> ActorResult {
|
2022-01-07 16:18:00 +00:00
|
|
|
lifecycle::terminate_on_service_restart(t, &config_ds, &spec);
|
|
|
|
|
2021-09-28 10:53:11 +00:00
|
|
|
let path = fs::canonicalize(spec.path.clone())?;
|
2021-10-07 15:00:04 +00:00
|
|
|
let env = script::Env::new(path, spec.env.0.clone());
|
2021-08-30 10:08:11 +00:00
|
|
|
|
2021-10-07 20:21:38 +00:00
|
|
|
tracing::info!(?env);
|
2021-08-30 10:08:11 +00:00
|
|
|
let (tx, rx) = channel();
|
|
|
|
|
2021-10-05 17:09:32 +00:00
|
|
|
let mut watcher = watcher(tx, Duration::from_millis(100)).map_err(convert_notify_error)?;
|
2021-10-07 15:00:04 +00:00
|
|
|
watcher.watch(&env.path, RecursiveMode::Recursive).map_err(convert_notify_error)?;
|
2021-08-30 10:08:11 +00:00
|
|
|
|
2024-03-03 17:28:51 +00:00
|
|
|
let facet = t.facet_ref();
|
2022-01-19 13:40:50 +00:00
|
|
|
let trace_collector = t.trace_collector();
|
2021-10-07 20:21:38 +00:00
|
|
|
let span = tracing::Span::current();
|
2021-08-30 10:08:11 +00:00
|
|
|
thread::spawn(move || {
|
2021-10-07 20:21:38 +00:00
|
|
|
let _entry = span.enter();
|
|
|
|
|
2021-10-06 20:03:12 +00:00
|
|
|
let mut path_state: Map<PathBuf, FacetId> = Map::new();
|
2021-08-30 10:08:11 +00:00
|
|
|
|
2022-01-15 23:02:33 +00:00
|
|
|
{
|
2022-01-19 13:40:50 +00:00
|
|
|
let cause = trace_collector.as_ref().map(|_| trace::TurnCause::external("initial_scan"));
|
|
|
|
let account = Account::new(Some(AnyValue::symbol("initial_scan")), trace_collector.clone());
|
|
|
|
if !facet.activate(
|
|
|
|
&account, cause, |t| {
|
|
|
|
initial_scan(t, &mut path_state, &config_ds, env.clone());
|
|
|
|
config_ds.assert(t, language(), &lifecycle::ready(&spec));
|
|
|
|
Ok(())
|
|
|
|
})
|
|
|
|
{
|
|
|
|
return;
|
|
|
|
}
|
2021-08-30 10:08:11 +00:00
|
|
|
}
|
2022-01-15 23:02:33 +00:00
|
|
|
tracing::trace!("initial_scan complete");
|
2021-08-30 10:08:11 +00:00
|
|
|
|
|
|
|
let mut rescan = |paths: Vec<PathBuf>| {
|
2022-01-19 13:40:50 +00:00
|
|
|
let cause = trace_collector.as_ref().map(|_| trace::TurnCause::external("rescan"));
|
|
|
|
let account = Account::new(Some(AnyValue::symbol("rescan")), trace_collector.clone());
|
|
|
|
facet.activate(&account, cause, |t| {
|
2021-10-06 20:03:12 +00:00
|
|
|
let mut to_stop = Vec::new();
|
2021-08-30 10:08:11 +00:00
|
|
|
for path in paths.into_iter() {
|
2021-10-06 20:03:12 +00:00
|
|
|
let maybe_facet_id = path_state.remove(&path);
|
2021-10-07 15:00:04 +00:00
|
|
|
let new_content_ok =
|
|
|
|
scan_file(t, &mut path_state, env.clone_with_path(path.clone()));
|
2021-10-06 20:03:12 +00:00
|
|
|
if let Some(old_facet_id) = maybe_facet_id {
|
2021-08-30 11:24:00 +00:00
|
|
|
if new_content_ok {
|
2021-10-06 20:03:12 +00:00
|
|
|
to_stop.push(old_facet_id);
|
2021-08-30 11:24:00 +00:00
|
|
|
} else {
|
2021-10-06 20:03:12 +00:00
|
|
|
path_state.insert(path, old_facet_id);
|
2021-08-30 11:24:00 +00:00
|
|
|
}
|
2021-08-30 10:08:11 +00:00
|
|
|
}
|
|
|
|
}
|
2021-10-06 20:03:12 +00:00
|
|
|
for facet_id in to_stop.into_iter() {
|
2021-10-07 14:59:34 +00:00
|
|
|
t.stop_facet(facet_id);
|
2021-08-30 10:08:11 +00:00
|
|
|
}
|
|
|
|
Ok(())
|
2022-01-15 23:02:33 +00:00
|
|
|
})
|
2021-08-30 10:08:11 +00:00
|
|
|
};
|
|
|
|
|
|
|
|
while let Ok(event) = rx.recv() {
|
|
|
|
tracing::trace!("notification: {:?}", &event);
|
2022-01-15 23:02:33 +00:00
|
|
|
let keep_running = match event {
|
2021-08-30 10:08:11 +00:00
|
|
|
DebouncedEvent::NoticeWrite(_p) |
|
|
|
|
DebouncedEvent::NoticeRemove(_p) =>
|
2022-01-15 23:02:33 +00:00
|
|
|
true,
|
2021-08-30 10:08:11 +00:00
|
|
|
DebouncedEvent::Create(p) |
|
|
|
|
DebouncedEvent::Write(p) |
|
|
|
|
DebouncedEvent::Chmod(p) |
|
|
|
|
DebouncedEvent::Remove(p) =>
|
|
|
|
rescan(vec![p]),
|
|
|
|
DebouncedEvent::Rename(p, q) =>
|
|
|
|
rescan(vec![p, q]),
|
2022-01-15 23:02:33 +00:00
|
|
|
_ => {
|
|
|
|
tracing::info!("{:?}", event);
|
|
|
|
true
|
|
|
|
}
|
|
|
|
};
|
|
|
|
if !keep_running { break; }
|
2021-08-30 10:08:11 +00:00
|
|
|
}
|
|
|
|
|
2022-01-19 13:40:50 +00:00
|
|
|
{
|
|
|
|
let cause = trace_collector.as_ref().map(|_| trace::TurnCause::external("termination"));
|
|
|
|
let account = Account::new(Some(AnyValue::symbol("termination")), trace_collector);
|
|
|
|
facet.activate(&account, cause, |t| {
|
|
|
|
tracing::trace!("linked thread terminating associated facet");
|
|
|
|
Ok(t.stop())
|
|
|
|
});
|
|
|
|
}
|
2021-08-30 10:08:11 +00:00
|
|
|
|
|
|
|
tracing::trace!("linked thread done");
|
|
|
|
});
|
|
|
|
|
2022-01-19 13:40:50 +00:00
|
|
|
t.linked_task(Some(AnyValue::symbol("cancel-wait")), async move {
|
2021-08-30 10:08:11 +00:00
|
|
|
future::pending::<()>().await;
|
|
|
|
drop(watcher);
|
2021-09-28 10:53:11 +00:00
|
|
|
Ok(LinkedTaskTermination::KeepFacet)
|
2021-08-30 10:08:11 +00:00
|
|
|
});
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|