From 9b7febb8d7f6055b787bd23afa1133ec46613f01 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Mon, 30 Aug 2021 12:08:11 +0200 Subject: [PATCH] ConfigWatcher --- Cargo.lock | 247 ++++++++++++++++-- syndicate-server/Cargo.toml | 2 + syndicate-server/protocols/schema-bundle.bin | 2 +- .../protocols/schemas/internalServices.prs | 1 + syndicate-server/src/main.rs | 13 + .../src/services/config_watcher.rs | 219 ++++++++++++++++ syndicate-server/src/services/mod.rs | 1 + 7 files changed, 457 insertions(+), 28 deletions(-) create mode 100644 syndicate-server/src/services/config_watcher.rs diff --git a/Cargo.lock b/Cargo.lock index 0585b0e..d36733b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,7 +17,7 @@ version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ee49baf6cb617b853aa8d93bf420db2383fab46d314482ca2803b40d5fde979b" dependencies = [ - "winapi", + "winapi 0.3.9", ] [[package]] @@ -26,7 +26,7 @@ version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d52a9bb7ec0cf484c551830a7ce27bd20d67eac647e1befb56b0be4ee39a55d2" dependencies = [ - "winapi", + "winapi 0.3.9", ] [[package]] @@ -37,7 +37,7 @@ checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" dependencies = [ "hermit-abi", "libc", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -112,6 +112,12 @@ version = "1.0.69" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e70cc2f62c6ce1868963827bd677764c62d07c3d9a3e1fb1177ee1a9ab199eb2" +[[package]] +name = "cfg-if" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" + [[package]] name = "cfg-if" version = "1.0.0" @@ -127,7 +133,7 @@ dependencies = [ "libc", "num-integer", "num-traits", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -218,7 +224,7 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06ed27e177f16d65f0f0c22a213e17c696ace5dd64b14258b52f9417ccb52db4" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "crossbeam-utils", ] @@ -228,7 +234,7 @@ version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6455c0ca19f0d2fbf751b908d5c55c1f5cbc65e03c4225427254b46890bdde1e" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "crossbeam-epoch", "crossbeam-utils", ] @@ -239,7 +245,7 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ec02e091aa634e2c3ada4a392989e7c3116673ef0ac5b72232439094d73b7fd" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "crossbeam-utils", "lazy_static", "memoffset", @@ -252,7 +258,7 @@ version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d82cfc11ce7f2c3faef78d8a684447b40d503d9681acebed6cb728d45940c4db" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "lazy_static", ] @@ -309,6 +315,18 @@ version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457" +[[package]] +name = "filetime" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "975ccf83d8d9d0d84682850a38c8169027be83368805971cc4f238c2b245bc98" +dependencies = [ + "cfg-if 1.0.0", + "libc", + "redox_syscall", + "winapi 0.3.9", +] + [[package]] name = "fnv" version = "1.0.7" @@ -340,6 +358,41 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fsevent" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ab7d1bd1bd33cc98b0889831b72da23c0aa4df9cec7e0702f46ecea04b35db6" +dependencies = [ + "bitflags", + "fsevent-sys", +] + +[[package]] +name = "fsevent-sys" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f41b048a94555da0f42f1d632e2e19510084fb8e303b0daa2816e733fb3644a0" +dependencies = [ + "libc", +] + +[[package]] +name = "fuchsia-zircon" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e9763c69ebaae630ba35f74888db465e49e259ba1bc0eda7d06f4a067615d82" +dependencies = [ + "bitflags", + "fuchsia-zircon-sys", +] + +[[package]] +name = "fuchsia-zircon-sys" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7" + [[package]] name = "futures" version = "0.3.16" @@ -450,7 +503,7 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7fcd999463524c52659517fe2cea98493cfe485d10565e7b0fb07dbba7ad2753" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "libc", "wasi", ] @@ -523,6 +576,26 @@ dependencies = [ "unicode-normalization", ] +[[package]] +name = "inotify" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4816c66d2c8ae673df83366c18341538f234a26d65a9ecea5c348b453ac1d02f" +dependencies = [ + "bitflags", + "inotify-sys", + "libc", +] + +[[package]] +name = "inotify-sys" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb" +dependencies = [ + "libc", +] + [[package]] name = "input_buffer" version = "0.4.0" @@ -532,6 +605,15 @@ dependencies = [ "bytes", ] +[[package]] +name = "iovec" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2b3ea6ff95e175473f8ffe6a7eb7c00d054240321b84c57051175fe3c1e075e" +dependencies = [ + "libc", +] + [[package]] name = "itertools" version = "0.10.1" @@ -556,12 +638,28 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "kernel32-sys" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d" +dependencies = [ + "winapi 0.2.8", + "winapi-build", +] + [[package]] name = "lazy_static" version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" +[[package]] +name = "lazycell" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" + [[package]] name = "libc" version = "0.2.99" @@ -574,7 +672,7 @@ version = "0.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "51b9bbe6c47d51fc3e1a9b945965946b4c44142ab8792c50835a980d362c2710" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", ] [[package]] @@ -607,6 +705,25 @@ dependencies = [ "autocfg", ] +[[package]] +name = "mio" +version = "0.6.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4afd66f5b91bf2a3bc13fad0e21caedac168ca4c707504e75585648ae80e4cc4" +dependencies = [ + "cfg-if 0.1.10", + "fuchsia-zircon", + "fuchsia-zircon-sys", + "iovec", + "kernel32-sys", + "libc", + "log", + "miow 0.2.2", + "net2", + "slab", + "winapi 0.2.8", +] + [[package]] name = "mio" version = "0.7.13" @@ -615,9 +732,33 @@ checksum = "8c2bdb6314ec10835cd3293dd268473a835c02b7b352e788be788b3c6ca6bb16" dependencies = [ "libc", "log", - "miow", + "miow 0.3.7", "ntapi", - "winapi", + "winapi 0.3.9", +] + +[[package]] +name = "mio-extras" +version = "2.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52403fe290012ce777c4626790c8951324a2b9e3316b3143779c72b029742f19" +dependencies = [ + "lazycell", + "log", + "mio 0.6.23", + "slab", +] + +[[package]] +name = "miow" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebd808424166322d4a38da87083bfddd3ac4c131334ed55856112eb06d46944d" +dependencies = [ + "kernel32-sys", + "net2", + "winapi 0.2.8", + "ws2_32-sys", ] [[package]] @@ -626,7 +767,7 @@ version = "0.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b9f1c5b025cda876f66ef43a113f91ebc9f4ccef34843000e0adf6ebbab84e21" dependencies = [ - "winapi", + "winapi 0.3.9", ] [[package]] @@ -647,13 +788,42 @@ dependencies = [ "tempfile", ] +[[package]] +name = "net2" +version = "0.2.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "391630d12b68002ae1e25e8f974306474966550ad82dac6886fb8910c19568ae" +dependencies = [ + "cfg-if 0.1.10", + "libc", + "winapi 0.3.9", +] + +[[package]] +name = "notify" +version = "4.0.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae03c8c853dba7bfd23e571ff0cff7bc9dceb40a4cd684cd1681824183f45257" +dependencies = [ + "bitflags", + "filetime", + "fsevent", + "fsevent-sys", + "inotify", + "libc", + "mio 0.6.23", + "mio-extras", + "walkdir", + "winapi 0.3.9", +] + [[package]] name = "ntapi" version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f6bb902e437b6d86e03cce10a7e2af662292c5dfef23b65899ea3ac9354ad44" dependencies = [ - "winapi", + "winapi 0.3.9", ] [[package]] @@ -767,7 +937,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "549430950c79ae24e6d02e0b7404534ecf311d94cc9f861e9e4020187d13d885" dependencies = [ "bitflags", - "cfg-if", + "cfg-if 1.0.0", "foreign-types", "libc", "once_cell", @@ -1066,7 +1236,7 @@ version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3acd125665422973a33ac9d3dd2df85edad0f4ae9b00dafb1a05e43a9f5ef8e7" dependencies = [ - "winapi", + "winapi 0.3.9", ] [[package]] @@ -1100,7 +1270,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f05ba609c234e60bee0d547fe94a4c7e9da733d1c962cf6e59efa4cd9c8bc75" dependencies = [ "lazy_static", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -1195,7 +1365,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1a0c8611594e2ab4ebbf06ec7cbbf0a99450b8570e96cbf5188b5d5f6ef18d81" dependencies = [ "block-buffer", - "cfg-if", + "cfg-if 1.0.0", "cpufeatures", "digest", "opaque-debug", @@ -1208,7 +1378,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b362ae5752fd2137731f9fa25fd4d9058af34666ca1966fb969119cc35719f12" dependencies = [ "block-buffer", - "cfg-if", + "cfg-if 1.0.0", "cpufeatures", "digest", "opaque-debug", @@ -1317,6 +1487,7 @@ name = "syndicate-server" version = "0.9.2" dependencies = [ "futures", + "notify", "preserves-schema", "structopt", "syndicate", @@ -1336,12 +1507,12 @@ version = "3.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dac1c663cfc93810f88aed9b8941d48cabf856a1b111c29a40439018d870eb22" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "libc", "rand", "redox_syscall", "remove_dir_all", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -1417,11 +1588,11 @@ dependencies = [ "bytes", "libc", "memchr", - "mio", + "mio 0.7.13", "num_cpus", "pin-project-lite", "tokio-macros", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -1468,7 +1639,7 @@ version = "0.1.26" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09adeb8c97449311ccd28a427f96fb563e7fd31aabf994189879d9da2394b89d" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "pin-project-lite", "tracing-attributes", "tracing-core", @@ -1653,7 +1824,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "808cf2735cd4b6866113f648b791c6adc5714537bc222d9347bb203386ffda56" dependencies = [ "same-file", - "winapi", + "winapi 0.3.9", "winapi-util", ] @@ -1669,7 +1840,7 @@ version = "0.2.75" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b608ecc8f4198fe8680e2ed18eccab5f0cd4caaf3d83516fa5fb2e927fda2586" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "wasm-bindgen-macro", ] @@ -1727,6 +1898,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "winapi" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a" + [[package]] name = "winapi" version = "0.3.9" @@ -1737,6 +1914,12 @@ dependencies = [ "winapi-x86_64-pc-windows-gnu", ] +[[package]] +name = "winapi-build" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d315eee3b34aca4797b2da6b13ed88266e6d612562a0c46390af8299fc699bc" + [[package]] name = "winapi-i686-pc-windows-gnu" version = "0.4.0" @@ -1749,7 +1932,7 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" dependencies = [ - "winapi", + "winapi 0.3.9", ] [[package]] @@ -1757,3 +1940,13 @@ name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + +[[package]] +name = "ws2_32-sys" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d59cefebd0c892fa2dd6de581e937301d8552cb44489cdff035c6187cb63fa5e" +dependencies = [ + "winapi 0.2.8", + "winapi-build", +] diff --git a/syndicate-server/Cargo.toml b/syndicate-server/Cargo.toml index 70dae8d..e2f129b 100644 --- a/syndicate-server/Cargo.toml +++ b/syndicate-server/Cargo.toml @@ -19,6 +19,8 @@ syndicate-macros = { path = "../syndicate-macros", version = "^0.4.0"} futures = "0.3.5" +notify = "4.0.17" + structopt = "0.3.14" tungstenite = "0.13.0" diff --git a/syndicate-server/protocols/schema-bundle.bin b/syndicate-server/protocols/schema-bundle.bin index 6786710..14e4804 100644 --- a/syndicate-server/protocols/schema-bundle.bin +++ b/syndicate-server/protocols/schema-bundle.bin @@ -1 +1 @@ -´³bundle·µ³internalServices„´³schema·³version‘³ definitions·³ DebtReporter´³lit³ debt-reporter„³TcpRelayListener´³rec´³lit³relay-listener„´³tupleµ´³named³addr´³refµ³TransportAddress„³Tcp„„„„„³UnixRelayListener´³rec´³lit³relay-listener„´³tupleµ´³named³addr´³refµ³TransportAddress„³Unix„„„„„„³ embeddedType€„„„„ \ No newline at end of file +´³bundle·µ³internalServices„´³schema·³version‘³ definitions·³ DebtReporter´³lit³ debt-reporter„³ ConfigWatcher´³rec´³lit³config-watcher„´³tupleµ´³named³path´³atom³String„„„„„³TcpRelayListener´³rec´³lit³relay-listener„´³tupleµ´³named³addr´³refµ³TransportAddress„³Tcp„„„„„³UnixRelayListener´³rec´³lit³relay-listener„´³tupleµ´³named³addr´³refµ³TransportAddress„³Unix„„„„„„³ embeddedType€„„„„ \ No newline at end of file diff --git a/syndicate-server/protocols/schemas/internalServices.prs b/syndicate-server/protocols/schemas/internalServices.prs index 68919fc..1c8fdfd 100644 --- a/syndicate-server/protocols/schemas/internalServices.prs +++ b/syndicate-server/protocols/schemas/internalServices.prs @@ -4,3 +4,4 @@ DebtReporter = =debt-reporter . TcpRelayListener = . UnixRelayListener = . +ConfigWatcher = . diff --git a/syndicate-server/src/main.rs b/syndicate-server/src/main.rs index 24e32f4..88b3d55 100644 --- a/syndicate-server/src/main.rs +++ b/syndicate-server/src/main.rs @@ -35,6 +35,9 @@ struct ServerConfig { #[structopt(long)] debt_reporter: bool, + + #[structopt(short = "c", long)] + config: Vec, } #[tokio::main] @@ -94,6 +97,7 @@ async fn main() -> Result<(), Box> { services::debt_reporter::on_demand(t, Arc::clone(&server_config_ds)); services::tcp_relay_listener::on_demand(t, Arc::clone(&server_config_ds), Arc::clone(&gateway)); services::unix_relay_listener::on_demand(t, Arc::clone(&server_config_ds), Arc::clone(&gateway)); + services::config_watcher::on_demand(t, Arc::clone(&server_config_ds)); if config.debt_reporter { server_config_ds.assert(t, &service::RequireService { @@ -124,6 +128,15 @@ async fn main() -> Result<(), Box> { }); } + for path in config.config.clone() { + server_config_ds.assert(t, &service::RequireService { + service_name: from_io_value( + &internal_services::ConfigWatcher { + path: path.to_str().expect("representable ConfigWatcher path").to_owned(), + })?, + }); + } + Ok(()) }).await??; diff --git a/syndicate-server/src/services/config_watcher.rs b/syndicate-server/src/services/config_watcher.rs new file mode 100644 index 0000000..4fa7d10 --- /dev/null +++ b/syndicate-server/src/services/config_watcher.rs @@ -0,0 +1,219 @@ +use notify::DebouncedEvent; +use notify::Watcher; +use notify::RecursiveMode; +use notify::watcher; + +use std::convert::TryFrom; +use std::fs; +use std::future; +use std::io; +use std::path::PathBuf; +use std::sync::Arc; +use std::sync::mpsc::channel; +use std::thread; +use std::time::Duration; + +use syndicate::actor::*; +use syndicate::convert::*; +use syndicate::during::entity; +use syndicate::schemas::dataspace::Observe; +use syndicate::value::BinarySource; +use syndicate::value::IOBinarySource; +use syndicate::value::Map; +use syndicate::value::NestedValue; +use syndicate::value::NoEmbeddedDomainCodec; +use syndicate::value::Reader; +use syndicate::value::Set; +use syndicate::value::ViaCodec; + +use crate::schemas::internal_services; + +pub fn on_demand(t: &mut Activation, ds: Arc) { + t.spawn(syndicate::name!("on_demand", module = module_path!()), move |t| { + let monitor = entity(()) + .on_asserted_facet({ + let ds = Arc::clone(&ds); + move |_, t, captures| { + let ds = Arc::clone(&ds); + t.spawn_link(syndicate::name!(parent: None, "config", spec = ?captures), + |t| run(t, ds, captures)); + Ok(()) + } + }) + .create_cap(t); + ds.assert(t, &Observe { + pattern: syndicate_macros::pattern!(">>"), + observer: monitor, + }); + Ok(()) + }) +} + +fn convert_notify_error(e: notify::Error) -> syndicate::error::Error { + syndicate::error::error(&format!("Notify error: {:?}", e), AnyValue::new(false)) +} + +fn assertions_at_existing_file(t: &mut Activation, ds: &Arc, path: &PathBuf) -> io::Result> { + let mut handles = Set::new(); + let fh = fs::File::open(path)?; + let mut src = IOBinarySource::new(fh); + let mut r = src.text::<_, AnyValue, _>(ViaCodec::new(NoEmbeddedDomainCodec)); + while let Some(value) = Reader::<_, AnyValue>::next(&mut r, true)? { + if let Some(handle) = ds.assert(t, value.clone()) { + tracing::debug!("asserted {:?} -> {:?}", value, handle); + handles.insert(handle); + } + } + Ok(handles) +} + +fn assertions_at_path(t: &mut Activation, ds: &Arc, path: &PathBuf) -> io::Result> { + match fs::metadata(path) { + Ok(md) => if md.is_file() { + assertions_at_existing_file(t, ds, path) + } else { + Ok(Set::new()) + } + Err(e) => if e.kind() != io::ErrorKind::NotFound { + Err(e)? + } else { + Ok(Set::new()) + } + } +} + +fn is_hidden(path: &PathBuf) -> bool { + match path.file_name().and_then(|n| n.to_str()) { + Some(n) => n.starts_with("."), + None => true, // ? + } +} + +fn scan_file( + t: &mut Activation, + path_state: &mut Map>, + ds: &Arc, + path: &PathBuf, +) { + if is_hidden(path) { + return; + } + tracing::info!("scan_file: {:?}", path); + match assertions_at_path(t, ds, path) { + Ok(new_handles) => if !new_handles.is_empty() { + path_state.insert(path.clone(), new_handles); + }, + Err(e) => tracing::warn!("scan_file: {:?}: {:?}", path, e), + } +} + +fn initial_scan( + t: &mut Activation, + path_state: &mut Map>, + ds: &Arc, + path: &PathBuf, +) { + if is_hidden(path) { + return; + } + match fs::metadata(path) { + Ok(md) => if md.is_file() { + scan_file(t, path_state, ds, path) + } else { + match fs::read_dir(path) { + Ok(entries) => for er in entries { + match er { + Ok(e) => initial_scan(t, path_state, ds, &e.path()), + Err(e) => tracing::warn!( + "initial_scan: transient during scan of {:?}: {:?}", path, e), + } + } + Err(e) => tracing::warn!("initial_scan: enumerating {:?}: {:?}", path, e), + } + }, + Err(e) => tracing::warn!("initial_scan: `stat`ing {:?}: {:?}", path, e), + } +} + +fn run(t: &mut Activation, ds: Arc, captures: AnyValue) -> ActorResult { + let spec = internal_services::ConfigWatcher::try_from(&from_any_value( + &captures.value().to_sequence()?[0])?)?; + { + let spec = from_io_value(&spec)?; + ds.assert(t, syndicate_macros::template!("")); + } + let path = fs::canonicalize(spec.path)?; + + tracing::info!("watching {:?}", &path); + let (tx, rx) = channel(); + + let mut watcher = watcher(tx, Duration::from_secs(1)).map_err(convert_notify_error)?; + watcher.watch(&path, RecursiveMode::Recursive).map_err(convert_notify_error)?; + + let facet = t.facet.clone(); + thread::spawn(move || { + let mut path_state: Map> = Map::new(); + + let account = Account::new(syndicate::name!("watcher")); + + { + let root_path = path.clone().into(); + facet.activate(Arc::clone(&account), |t| { + initial_scan(t, &mut path_state, &ds, &root_path); + Ok(()) + }).unwrap(); + tracing::trace!("initial_scan complete"); + } + + let mut rescan = |paths: Vec| { + facet.activate(Arc::clone(&account), |t| { + let mut to_retract = Set::new(); + for path in paths.into_iter() { + if let Some(handles) = path_state.remove(&path) { + to_retract.extend(handles.into_iter()); + } + scan_file(t, &mut path_state, &ds, &path); + } + for h in to_retract.into_iter() { + tracing::debug!("retract {:?}", h); + t.retract(h); + } + Ok(()) + }).unwrap() + }; + + while let Ok(event) = rx.recv() { + tracing::trace!("notification: {:?}", &event); + match event { + DebouncedEvent::NoticeWrite(_p) | + DebouncedEvent::NoticeRemove(_p) => + (), + DebouncedEvent::Create(p) | + DebouncedEvent::Write(p) | + DebouncedEvent::Chmod(p) | + DebouncedEvent::Remove(p) => + rescan(vec![p]), + DebouncedEvent::Rename(p, q) => + rescan(vec![p, q]), + _ => + tracing::info!("{:?}", event), + } + } + + let _ = facet.activate(Arc::clone(&account), |t| { + tracing::trace!("linked thread terminating associated facet"); + t.stop(); + Ok(()) + }); + + tracing::trace!("linked thread done"); + }); + + t.linked_task(syndicate::name!("cancel-wait"), async move { + future::pending::<()>().await; + drop(watcher); + Ok(()) + }); + + Ok(()) +} diff --git a/syndicate-server/src/services/mod.rs b/syndicate-server/src/services/mod.rs index 6d7bc31..520a9fd 100644 --- a/syndicate-server/src/services/mod.rs +++ b/syndicate-server/src/services/mod.rs @@ -1,3 +1,4 @@ +pub mod config_watcher; pub mod debt_reporter; pub mod tcp_relay_listener; pub mod unix_relay_listener;