diff --git a/src/bin/syndicate-server.rs b/src/bin/syndicate-server.rs index 19b587e..6459c41 100644 --- a/src/bin/syndicate-server.rs +++ b/src/bin/syndicate-server.rs @@ -121,13 +121,14 @@ async fn run_listener(spaces: Arc>, port: u16) -> UnitAsyn } async fn periodic_tasks(spaces: Arc>) -> UnitAsyncResult { - let mut delay = tokio::time::interval(core::time::Duration::from_secs(5)); + let interval = core::time::Duration::from_secs(5); + let mut delay = tokio::time::interval(interval); loop { delay.next().await.unwrap(); { let mut spaces = spaces.lock().unwrap(); spaces.cleanup(); - println!("{}", spaces.summary_string()); + println!("{}", spaces.stats_string(interval)); } } } diff --git a/src/dataspace.rs b/src/dataspace.rs index 04de707..2fb1c6e 100644 --- a/src/dataspace.rs +++ b/src/dataspace.rs @@ -22,16 +22,57 @@ struct ActorEndpoint { assertion: Assertion, } +#[derive(Debug)] +pub struct Churn { + pub peers_added: usize, + pub peers_removed: usize, + pub assertions_added: usize, + pub assertions_removed: usize, + pub endpoints_added: usize, + pub endpoints_removed: usize, + pub messages_sent: usize, +} + +impl Churn { + pub fn new() -> Self { + Self { + peers_added: 0, + peers_removed: 0, + assertions_added: 0, + assertions_removed: 0, + endpoints_added: 0, + endpoints_removed: 0, + messages_sent: 0, + } + } + + pub fn reset(&mut self) { + self.peers_added = 0; + self.peers_removed = 0; + self.assertions_added = 0; + self.assertions_removed = 0; + self.endpoints_added = 0; + self.endpoints_removed = 0; + self.messages_sent = 0; + } +} + #[derive(Debug)] pub struct Dataspace { name: V, peers: Map, index: skeleton::Index, + pub churn: Churn, } impl Dataspace { pub fn new(name: &V) -> Self { - Self { name: name.clone(), peers: Map::new(), index: skeleton::Index::new() } + Self { + name: name.clone(), + peers: Map::new(), + index: skeleton::Index::new(), + churn: Churn::new(), + } } pub fn new_ref(name: &V) -> DataspaceRef { @@ -44,10 +85,12 @@ impl Dataspace { tx, endpoints: Map::new(), }); + self.churn.peers_added += 1; } pub fn deregister(&mut self, id: ConnId) { let ac = self.peers.remove(&id).unwrap(); + self.churn.peers_removed += 1; let mut outbound_turns: Map> = Map::new(); for (epname, ep) in ac.endpoints { self.remove_endpoint(&mut outbound_turns, id, &epname, ep); @@ -56,18 +99,6 @@ impl Dataspace { self.deliver_outbound_turns(outbound_turns); } - pub fn peer_count(&self) -> usize { - self.peers.len() - } - - pub fn assertion_count(&self) -> usize { - self.index.assertion_count() - } - - pub fn endpoint_count(&self) -> isize { - self.index.endpoint_count() - } - fn remove_endpoint(&mut self, mut outbound_turns: &mut Map>, id: ConnId, @@ -81,9 +112,12 @@ impl Dataspace { name: epname.clone(), }); } + let old_assertions = self.index.assertion_count(); schedule_events(&mut outbound_turns, self.index.remove((&assertion).into()), |epname, cs| packets::Event::Del(epname, cs)); + self.churn.assertions_removed += old_assertions - self.index.assertion_count(); + self.churn.endpoints_removed += 1; } pub fn turn(&mut self, id: ConnId, actions: Vec) -> @@ -113,9 +147,12 @@ impl Dataspace { None }; + let old_assertions = self.index.assertion_count(); schedule_events(&mut outbound_turns, self.index.insert(assertion.into()), |epname, cs| packets::Event::Add(epname, cs)); + self.churn.assertions_added += self.index.assertion_count() - old_assertions; + self.churn.endpoints_added += 1; ac.endpoints.insert(epname.clone(), ActorEndpoint { analysis_results: ar, @@ -140,6 +177,7 @@ impl Dataspace { schedule_events(&mut outbound_turns, self.index.send(assertion.into()), |epname, cs| packets::Event::Msg(epname, cs)); + self.churn.messages_sent += 1; } } } @@ -152,6 +190,18 @@ impl Dataspace { let _ = self.peers.get_mut(&target).unwrap().tx.send(packets::S2C::Turn(events)); } } + + pub fn peer_count(&self) -> usize { + self.peers.len() + } + + pub fn assertion_count(&self) -> usize { + self.index.assertion_count() + } + + pub fn endpoint_count(&self) -> isize { + self.index.endpoint_count() + } } fn schedule_events(outbound_turns: &mut Map>, diff --git a/src/spaces.rs b/src/spaces.rs index a3baf86..d29f209 100644 --- a/src/spaces.rs +++ b/src/spaces.rs @@ -38,16 +38,21 @@ impl Spaces { .collect(); } - pub fn summary_string(&self) -> String { + pub fn stats_string(&self, delta: core::time::Duration) -> String { let mut v = vec![]; v.push(format!("{} dataspace(s)", self.index.len())); for (dsname, dsref) in &self.index { - let ds = dsref.read().unwrap(); - v.push(format!(" {:?}: {} connection(s), {} assertion(s), {} endpoint(s)", + let mut ds = dsref.write().unwrap(); + v.push(format!(" {:?}: {} connection(s) {}, {} assertion(s) {}, {} endpoint(s) {}, {} messages/sec", dsname, ds.peer_count(), + format!("(+{}/-{})", ds.churn.peers_added, ds.churn.peers_removed), ds.assertion_count(), - ds.endpoint_count())); + format!("(+{}/-{})", ds.churn.assertions_added, ds.churn.assertions_removed), + ds.endpoint_count(), + format!("(+{}/-{})", ds.churn.endpoints_added, ds.churn.endpoints_removed), + ds.churn.messages_sent as f32 / delta.as_secs() as f32)); + ds.churn.reset(); } v.join("\n") }