2021-08-30 10:08:11 +00:00
|
|
|
use notify::DebouncedEvent;
|
|
|
|
use notify::Watcher;
|
|
|
|
use notify::RecursiveMode;
|
|
|
|
use notify::watcher;
|
|
|
|
|
|
|
|
use std::convert::TryFrom;
|
|
|
|
use std::fs;
|
|
|
|
use std::future;
|
|
|
|
use std::io;
|
|
|
|
use std::path::PathBuf;
|
|
|
|
use std::sync::Arc;
|
|
|
|
use std::sync::mpsc::channel;
|
|
|
|
use std::thread;
|
|
|
|
use std::time::Duration;
|
|
|
|
|
|
|
|
use syndicate::actor::*;
|
|
|
|
use syndicate::convert::*;
|
|
|
|
use syndicate::during::entity;
|
|
|
|
use syndicate::schemas::dataspace::Observe;
|
|
|
|
use syndicate::value::BinarySource;
|
|
|
|
use syndicate::value::IOBinarySource;
|
|
|
|
use syndicate::value::Map;
|
|
|
|
use syndicate::value::NestedValue;
|
|
|
|
use syndicate::value::NoEmbeddedDomainCodec;
|
|
|
|
use syndicate::value::Reader;
|
|
|
|
use syndicate::value::Set;
|
|
|
|
use syndicate::value::ViaCodec;
|
|
|
|
|
|
|
|
use crate::schemas::internal_services;
|
|
|
|
|
|
|
|
pub fn on_demand(t: &mut Activation, ds: Arc<Cap>) {
|
|
|
|
t.spawn(syndicate::name!("on_demand", module = module_path!()), move |t| {
|
|
|
|
let monitor = entity(())
|
|
|
|
.on_asserted_facet({
|
|
|
|
let ds = Arc::clone(&ds);
|
|
|
|
move |_, t, captures| {
|
|
|
|
let ds = Arc::clone(&ds);
|
|
|
|
t.spawn_link(syndicate::name!(parent: None, "config", spec = ?captures),
|
|
|
|
|t| run(t, ds, captures));
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
})
|
|
|
|
.create_cap(t);
|
|
|
|
ds.assert(t, &Observe {
|
2021-08-31 14:19:29 +00:00
|
|
|
pattern: syndicate_macros::pattern!{<require-service $(<config-watcher _>)>},
|
2021-08-30 10:08:11 +00:00
|
|
|
observer: monitor,
|
|
|
|
});
|
|
|
|
Ok(())
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
fn convert_notify_error(e: notify::Error) -> syndicate::error::Error {
|
|
|
|
syndicate::error::error(&format!("Notify error: {:?}", e), AnyValue::new(false))
|
|
|
|
}
|
|
|
|
|
|
|
|
fn assertions_at_existing_file(t: &mut Activation, ds: &Arc<Cap>, path: &PathBuf) -> io::Result<Set<Handle>> {
|
|
|
|
let mut handles = Set::new();
|
|
|
|
let fh = fs::File::open(path)?;
|
|
|
|
let mut src = IOBinarySource::new(fh);
|
|
|
|
let mut r = src.text::<_, AnyValue, _>(ViaCodec::new(NoEmbeddedDomainCodec));
|
2021-08-30 11:24:00 +00:00
|
|
|
let mut values = Vec::new();
|
2021-08-30 10:08:11 +00:00
|
|
|
while let Some(value) = Reader::<_, AnyValue>::next(&mut r, true)? {
|
2021-08-30 11:24:00 +00:00
|
|
|
values.push(value);
|
|
|
|
}
|
|
|
|
for value in values.into_iter() {
|
2021-08-30 10:08:11 +00:00
|
|
|
if let Some(handle) = ds.assert(t, value.clone()) {
|
|
|
|
handles.insert(handle);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
Ok(handles)
|
|
|
|
}
|
|
|
|
|
|
|
|
fn assertions_at_path(t: &mut Activation, ds: &Arc<Cap>, path: &PathBuf) -> io::Result<Set<Handle>> {
|
|
|
|
match fs::metadata(path) {
|
|
|
|
Ok(md) => if md.is_file() {
|
|
|
|
assertions_at_existing_file(t, ds, path)
|
|
|
|
} else {
|
|
|
|
Ok(Set::new())
|
|
|
|
}
|
|
|
|
Err(e) => if e.kind() != io::ErrorKind::NotFound {
|
|
|
|
Err(e)?
|
|
|
|
} else {
|
|
|
|
Ok(Set::new())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
fn is_hidden(path: &PathBuf) -> bool {
|
|
|
|
match path.file_name().and_then(|n| n.to_str()) {
|
|
|
|
Some(n) => n.starts_with("."),
|
|
|
|
None => true, // ?
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
fn scan_file(
|
|
|
|
t: &mut Activation,
|
|
|
|
path_state: &mut Map<PathBuf, Set<Handle>>,
|
|
|
|
ds: &Arc<Cap>,
|
|
|
|
path: &PathBuf,
|
2021-08-30 11:24:00 +00:00
|
|
|
) -> bool {
|
2021-08-30 10:08:11 +00:00
|
|
|
if is_hidden(path) {
|
2021-08-30 11:24:00 +00:00
|
|
|
return true;
|
2021-08-30 10:08:11 +00:00
|
|
|
}
|
|
|
|
tracing::info!("scan_file: {:?}", path);
|
|
|
|
match assertions_at_path(t, ds, path) {
|
2021-08-30 11:24:00 +00:00
|
|
|
Ok(new_handles) => {
|
|
|
|
if !new_handles.is_empty() {
|
|
|
|
path_state.insert(path.clone(), new_handles);
|
|
|
|
}
|
|
|
|
true
|
2021-08-30 10:08:11 +00:00
|
|
|
},
|
2021-08-30 11:24:00 +00:00
|
|
|
Err(e) => {
|
|
|
|
tracing::warn!("scan_file: {:?}: {:?}", path, e);
|
|
|
|
false
|
|
|
|
}
|
2021-08-30 10:08:11 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
fn initial_scan(
|
|
|
|
t: &mut Activation,
|
|
|
|
path_state: &mut Map<PathBuf, Set<Handle>>,
|
|
|
|
ds: &Arc<Cap>,
|
|
|
|
path: &PathBuf,
|
|
|
|
) {
|
|
|
|
if is_hidden(path) {
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
match fs::metadata(path) {
|
|
|
|
Ok(md) => if md.is_file() {
|
2021-08-30 11:24:00 +00:00
|
|
|
scan_file(t, path_state, ds, path);
|
2021-08-30 10:08:11 +00:00
|
|
|
} else {
|
|
|
|
match fs::read_dir(path) {
|
|
|
|
Ok(entries) => for er in entries {
|
|
|
|
match er {
|
|
|
|
Ok(e) => initial_scan(t, path_state, ds, &e.path()),
|
|
|
|
Err(e) => tracing::warn!(
|
|
|
|
"initial_scan: transient during scan of {:?}: {:?}", path, e),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
Err(e) => tracing::warn!("initial_scan: enumerating {:?}: {:?}", path, e),
|
|
|
|
}
|
|
|
|
},
|
|
|
|
Err(e) => tracing::warn!("initial_scan: `stat`ing {:?}: {:?}", path, e),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
fn run(t: &mut Activation, ds: Arc<Cap>, captures: AnyValue) -> ActorResult {
|
|
|
|
let spec = internal_services::ConfigWatcher::try_from(&from_any_value(
|
|
|
|
&captures.value().to_sequence()?[0])?)?;
|
|
|
|
{
|
|
|
|
let spec = from_io_value(&spec)?;
|
|
|
|
ds.assert(t, syndicate_macros::template!("<service-running =spec>"));
|
|
|
|
}
|
|
|
|
let path = fs::canonicalize(spec.path)?;
|
|
|
|
|
|
|
|
tracing::info!("watching {:?}", &path);
|
|
|
|
let (tx, rx) = channel();
|
|
|
|
|
|
|
|
let mut watcher = watcher(tx, Duration::from_secs(1)).map_err(convert_notify_error)?;
|
|
|
|
watcher.watch(&path, RecursiveMode::Recursive).map_err(convert_notify_error)?;
|
|
|
|
|
|
|
|
let facet = t.facet.clone();
|
|
|
|
thread::spawn(move || {
|
|
|
|
let mut path_state: Map<PathBuf, Set<Handle>> = Map::new();
|
|
|
|
|
|
|
|
{
|
|
|
|
let root_path = path.clone().into();
|
2021-08-31 14:21:00 +00:00
|
|
|
facet.activate(Account::new(syndicate::name!("initial_scan")), |t| {
|
2021-08-30 10:08:11 +00:00
|
|
|
initial_scan(t, &mut path_state, &ds, &root_path);
|
|
|
|
Ok(())
|
|
|
|
}).unwrap();
|
|
|
|
tracing::trace!("initial_scan complete");
|
|
|
|
}
|
|
|
|
|
|
|
|
let mut rescan = |paths: Vec<PathBuf>| {
|
2021-08-31 14:21:00 +00:00
|
|
|
facet.activate(Account::new(syndicate::name!("rescan")), |t| {
|
2021-08-30 10:08:11 +00:00
|
|
|
let mut to_retract = Set::new();
|
|
|
|
for path in paths.into_iter() {
|
2021-08-30 11:24:00 +00:00
|
|
|
let maybe_handles = path_state.remove(&path);
|
|
|
|
let new_content_ok = scan_file(t, &mut path_state, &ds, &path);
|
|
|
|
if let Some(old_handles) = maybe_handles {
|
|
|
|
if new_content_ok {
|
|
|
|
to_retract.extend(old_handles.into_iter());
|
|
|
|
} else {
|
|
|
|
path_state.insert(path, old_handles);
|
|
|
|
}
|
2021-08-30 10:08:11 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
for h in to_retract.into_iter() {
|
|
|
|
t.retract(h);
|
|
|
|
}
|
|
|
|
Ok(())
|
|
|
|
}).unwrap()
|
|
|
|
};
|
|
|
|
|
|
|
|
while let Ok(event) = rx.recv() {
|
|
|
|
tracing::trace!("notification: {:?}", &event);
|
|
|
|
match event {
|
|
|
|
DebouncedEvent::NoticeWrite(_p) |
|
|
|
|
DebouncedEvent::NoticeRemove(_p) =>
|
|
|
|
(),
|
|
|
|
DebouncedEvent::Create(p) |
|
|
|
|
DebouncedEvent::Write(p) |
|
|
|
|
DebouncedEvent::Chmod(p) |
|
|
|
|
DebouncedEvent::Remove(p) =>
|
|
|
|
rescan(vec![p]),
|
|
|
|
DebouncedEvent::Rename(p, q) =>
|
|
|
|
rescan(vec![p, q]),
|
|
|
|
_ =>
|
|
|
|
tracing::info!("{:?}", event),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-08-31 14:21:00 +00:00
|
|
|
let _ = facet.activate(Account::new(syndicate::name!("termination")), |t| {
|
2021-08-30 10:08:11 +00:00
|
|
|
tracing::trace!("linked thread terminating associated facet");
|
|
|
|
t.stop();
|
|
|
|
Ok(())
|
|
|
|
});
|
|
|
|
|
|
|
|
tracing::trace!("linked thread done");
|
|
|
|
});
|
|
|
|
|
|
|
|
t.linked_task(syndicate::name!("cancel-wait"), async move {
|
|
|
|
future::pending::<()>().await;
|
|
|
|
drop(watcher);
|
|
|
|
Ok(())
|
|
|
|
});
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|