Gather and report various stats

This commit is contained in:
Tony Garnock-Jones 2020-05-18 11:36:44 +02:00
parent ad1782e1b6
commit 05ca94066a
3 changed files with 75 additions and 19 deletions

View File

@ -121,13 +121,14 @@ async fn run_listener(spaces: Arc<Mutex<spaces::Spaces>>, port: u16) -> UnitAsyn
} }
async fn periodic_tasks(spaces: Arc<Mutex<spaces::Spaces>>) -> UnitAsyncResult { async fn periodic_tasks(spaces: Arc<Mutex<spaces::Spaces>>) -> UnitAsyncResult {
let mut delay = tokio::time::interval(core::time::Duration::from_secs(5)); let interval = core::time::Duration::from_secs(5);
let mut delay = tokio::time::interval(interval);
loop { loop {
delay.next().await.unwrap(); delay.next().await.unwrap();
{ {
let mut spaces = spaces.lock().unwrap(); let mut spaces = spaces.lock().unwrap();
spaces.cleanup(); spaces.cleanup();
println!("{}", spaces.summary_string()); println!("{}", spaces.stats_string(interval));
} }
} }
} }

View File

@ -22,16 +22,57 @@ struct ActorEndpoint {
assertion: Assertion, assertion: Assertion,
} }
#[derive(Debug)]
pub struct Churn {
pub peers_added: usize,
pub peers_removed: usize,
pub assertions_added: usize,
pub assertions_removed: usize,
pub endpoints_added: usize,
pub endpoints_removed: usize,
pub messages_sent: usize,
}
impl Churn {
pub fn new() -> Self {
Self {
peers_added: 0,
peers_removed: 0,
assertions_added: 0,
assertions_removed: 0,
endpoints_added: 0,
endpoints_removed: 0,
messages_sent: 0,
}
}
pub fn reset(&mut self) {
self.peers_added = 0;
self.peers_removed = 0;
self.assertions_added = 0;
self.assertions_removed = 0;
self.endpoints_added = 0;
self.endpoints_removed = 0;
self.messages_sent = 0;
}
}
#[derive(Debug)] #[derive(Debug)]
pub struct Dataspace { pub struct Dataspace {
name: V, name: V,
peers: Map<ConnId, Actor>, peers: Map<ConnId, Actor>,
index: skeleton::Index, index: skeleton::Index,
pub churn: Churn,
} }
impl Dataspace { impl Dataspace {
pub fn new(name: &V) -> Self { pub fn new(name: &V) -> Self {
Self { name: name.clone(), peers: Map::new(), index: skeleton::Index::new() } Self {
name: name.clone(),
peers: Map::new(),
index: skeleton::Index::new(),
churn: Churn::new(),
}
} }
pub fn new_ref(name: &V) -> DataspaceRef { pub fn new_ref(name: &V) -> DataspaceRef {
@ -44,10 +85,12 @@ impl Dataspace {
tx, tx,
endpoints: Map::new(), endpoints: Map::new(),
}); });
self.churn.peers_added += 1;
} }
pub fn deregister(&mut self, id: ConnId) { pub fn deregister(&mut self, id: ConnId) {
let ac = self.peers.remove(&id).unwrap(); let ac = self.peers.remove(&id).unwrap();
self.churn.peers_removed += 1;
let mut outbound_turns: Map<ConnId, Vec<packets::Event>> = Map::new(); let mut outbound_turns: Map<ConnId, Vec<packets::Event>> = Map::new();
for (epname, ep) in ac.endpoints { for (epname, ep) in ac.endpoints {
self.remove_endpoint(&mut outbound_turns, id, &epname, ep); self.remove_endpoint(&mut outbound_turns, id, &epname, ep);
@ -56,18 +99,6 @@ impl Dataspace {
self.deliver_outbound_turns(outbound_turns); self.deliver_outbound_turns(outbound_turns);
} }
pub fn peer_count(&self) -> usize {
self.peers.len()
}
pub fn assertion_count(&self) -> usize {
self.index.assertion_count()
}
pub fn endpoint_count(&self) -> isize {
self.index.endpoint_count()
}
fn remove_endpoint(&mut self, fn remove_endpoint(&mut self,
mut outbound_turns: &mut Map<ConnId, Vec<packets::Event>>, mut outbound_turns: &mut Map<ConnId, Vec<packets::Event>>,
id: ConnId, id: ConnId,
@ -81,9 +112,12 @@ impl Dataspace {
name: epname.clone(), name: epname.clone(),
}); });
} }
let old_assertions = self.index.assertion_count();
schedule_events(&mut outbound_turns, schedule_events(&mut outbound_turns,
self.index.remove((&assertion).into()), self.index.remove((&assertion).into()),
|epname, cs| packets::Event::Del(epname, cs)); |epname, cs| packets::Event::Del(epname, cs));
self.churn.assertions_removed += old_assertions - self.index.assertion_count();
self.churn.endpoints_removed += 1;
} }
pub fn turn(&mut self, id: ConnId, actions: Vec<packets::Action>) -> pub fn turn(&mut self, id: ConnId, actions: Vec<packets::Action>) ->
@ -113,9 +147,12 @@ impl Dataspace {
None None
}; };
let old_assertions = self.index.assertion_count();
schedule_events(&mut outbound_turns, schedule_events(&mut outbound_turns,
self.index.insert(assertion.into()), self.index.insert(assertion.into()),
|epname, cs| packets::Event::Add(epname, cs)); |epname, cs| packets::Event::Add(epname, cs));
self.churn.assertions_added += self.index.assertion_count() - old_assertions;
self.churn.endpoints_added += 1;
ac.endpoints.insert(epname.clone(), ActorEndpoint { ac.endpoints.insert(epname.clone(), ActorEndpoint {
analysis_results: ar, analysis_results: ar,
@ -140,6 +177,7 @@ impl Dataspace {
schedule_events(&mut outbound_turns, schedule_events(&mut outbound_turns,
self.index.send(assertion.into()), self.index.send(assertion.into()),
|epname, cs| packets::Event::Msg(epname, cs)); |epname, cs| packets::Event::Msg(epname, cs));
self.churn.messages_sent += 1;
} }
} }
} }
@ -152,6 +190,18 @@ impl Dataspace {
let _ = self.peers.get_mut(&target).unwrap().tx.send(packets::S2C::Turn(events)); let _ = self.peers.get_mut(&target).unwrap().tx.send(packets::S2C::Turn(events));
} }
} }
pub fn peer_count(&self) -> usize {
self.peers.len()
}
pub fn assertion_count(&self) -> usize {
self.index.assertion_count()
}
pub fn endpoint_count(&self) -> isize {
self.index.endpoint_count()
}
} }
fn schedule_events<C>(outbound_turns: &mut Map<ConnId, Vec<packets::Event>>, fn schedule_events<C>(outbound_turns: &mut Map<ConnId, Vec<packets::Event>>,

View File

@ -38,16 +38,21 @@ impl Spaces {
.collect(); .collect();
} }
pub fn summary_string(&self) -> String { pub fn stats_string(&self, delta: core::time::Duration) -> String {
let mut v = vec![]; let mut v = vec![];
v.push(format!("{} dataspace(s)", self.index.len())); v.push(format!("{} dataspace(s)", self.index.len()));
for (dsname, dsref) in &self.index { for (dsname, dsref) in &self.index {
let ds = dsref.read().unwrap(); let mut ds = dsref.write().unwrap();
v.push(format!(" {:?}: {} connection(s), {} assertion(s), {} endpoint(s)", v.push(format!(" {:?}: {} connection(s) {}, {} assertion(s) {}, {} endpoint(s) {}, {} messages/sec",
dsname, dsname,
ds.peer_count(), ds.peer_count(),
format!("(+{}/-{})", ds.churn.peers_added, ds.churn.peers_removed),
ds.assertion_count(), ds.assertion_count(),
ds.endpoint_count())); format!("(+{}/-{})", ds.churn.assertions_added, ds.churn.assertions_removed),
ds.endpoint_count(),
format!("(+{}/-{})", ds.churn.endpoints_added, ds.churn.endpoints_removed),
ds.churn.messages_sent as f32 / delta.as_secs() as f32));
ds.churn.reset();
} }
v.join("\n") v.join("\n")
} }