diff --git a/src/bag.rs b/src/bag.rs index 0573f3c..ec3eefa 100644 --- a/src/bag.rs +++ b/src/bag.rs @@ -16,6 +16,7 @@ pub enum Net { // Allows negative counts - a "delta" pub struct BTreeBag { counts: BTreeMap, + total: isize, } impl std::default::Default for BTreeBag { @@ -26,7 +27,7 @@ impl std::default::Default for BTreeBag { impl BTreeBag { pub fn new() -> BTreeBag { - BTreeBag { counts: BTreeMap::new() } + BTreeBag { counts: BTreeMap::new(), total: 0 } } pub fn change(&mut self, key: V, delta: Count) -> Net { self._change(key, delta, false) } @@ -36,6 +37,7 @@ impl BTreeBag { let old_count = self[&key]; let mut new_count = old_count + delta; if clamp { new_count = new_count.max(0) } + self.total = self.total + (new_count - old_count) as isize; if new_count == 0 { self.counts.remove(&key); if old_count == 0 { Net::AbsentToAbsent } else { Net::PresentToAbsent } @@ -47,6 +49,7 @@ impl BTreeBag { pub fn clear(&mut self) { self.counts.clear(); + self.total = 0; } pub fn contains_key(&self, key: &V) -> bool { @@ -61,6 +64,10 @@ impl BTreeBag { self.counts.len() } + pub fn total(&self) -> isize { + self.total + } + pub fn keys(&self) -> Keys { self.counts.keys() } diff --git a/src/bin/syndicate-server.rs b/src/bin/syndicate-server.rs index d427b1d..19b587e 100644 --- a/src/bin/syndicate-server.rs +++ b/src/bin/syndicate-server.rs @@ -120,6 +120,18 @@ async fn run_listener(spaces: Arc>, port: u16) -> UnitAsyn } } +async fn periodic_tasks(spaces: Arc>) -> UnitAsyncResult { + let mut delay = tokio::time::interval(core::time::Duration::from_secs(5)); + loop { + delay.next().await.unwrap(); + { + let mut spaces = spaces.lock().unwrap(); + spaces.cleanup(); + println!("{}", spaces.summary_string()); + } + } +} + #[tokio::main] async fn main() -> Result<(), Box> { let args = Cli::from_args(); @@ -127,6 +139,13 @@ async fn main() -> Result<(), Box> { let spaces = Arc::new(Mutex::new(spaces::Spaces::new())); let mut daemons = Vec::new(); + { + let spaces = Arc::clone(&spaces); + tokio::spawn(async move { + periodic_tasks(spaces).await + }); + } + for port in args.ports { let spaces = Arc::clone(&spaces); daemons.push(tokio::spawn(async move { diff --git a/src/dataspace.rs b/src/dataspace.rs index 1091771..04de707 100644 --- a/src/dataspace.rs +++ b/src/dataspace.rs @@ -56,6 +56,18 @@ impl Dataspace { self.deliver_outbound_turns(outbound_turns); } + pub fn peer_count(&self) -> usize { + self.peers.len() + } + + pub fn assertion_count(&self) -> usize { + self.index.assertion_count() + } + + pub fn endpoint_count(&self) -> isize { + self.index.endpoint_count() + } + fn remove_endpoint(&mut self, mut outbound_turns: &mut Map>, id: ConnId, diff --git a/src/skeleton.rs b/src/skeleton.rs index 1ee29cf..862ec75 100644 --- a/src/skeleton.rs +++ b/src/skeleton.rs @@ -168,6 +168,14 @@ impl Index { .perform(&mut self.root); outputs } + + pub fn assertion_count(&self) -> usize { + return self.all_assertions.len() + } + + pub fn endpoint_count(&self) -> isize { + return self.all_assertions.total() + } } #[derive(Debug)] diff --git a/src/spaces.rs b/src/spaces.rs index 9e53200..a3baf86 100644 --- a/src/spaces.rs +++ b/src/spaces.rs @@ -1,6 +1,8 @@ use super::V; use super::dataspace; +use std::sync::Arc; + use preserves::value::Map; pub struct Spaces { @@ -28,4 +30,25 @@ impl Spaces { space } + + pub fn cleanup(&mut self) { + self.index = self.index.iter() + .filter(|s| s.1.read().unwrap().peer_count() > 0) + .map(|(k,v)| (k.clone(), Arc::clone(v))) + .collect(); + } + + pub fn summary_string(&self) -> String { + let mut v = vec![]; + v.push(format!("{} dataspace(s)", self.index.len())); + for (dsname, dsref) in &self.index { + let ds = dsref.read().unwrap(); + v.push(format!(" {:?}: {} connection(s), {} assertion(s), {} endpoint(s)", + dsname, + ds.peer_count(), + ds.assertion_count(), + ds.endpoint_count())); + } + v.join("\n") + } }