Count inbound and outbound messages/sec

This commit is contained in:
Tony Garnock-Jones 2020-05-18 12:08:57 +02:00
parent 1bfee29886
commit 817a4d91a7
3 changed files with 17 additions and 10 deletions

View File

@ -30,7 +30,8 @@ pub struct Churn {
pub assertions_removed: usize, pub assertions_removed: usize,
pub endpoints_added: usize, pub endpoints_added: usize,
pub endpoints_removed: usize, pub endpoints_removed: usize,
pub messages_sent: usize, pub messages_injected: usize,
pub messages_delivered: usize,
} }
impl Churn { impl Churn {
@ -42,7 +43,8 @@ impl Churn {
assertions_removed: 0, assertions_removed: 0,
endpoints_added: 0, endpoints_added: 0,
endpoints_removed: 0, endpoints_removed: 0,
messages_sent: 0, messages_injected: 0,
messages_delivered: 0,
} }
} }
@ -53,7 +55,8 @@ impl Churn {
self.assertions_removed = 0; self.assertions_removed = 0;
self.endpoints_added = 0; self.endpoints_added = 0;
self.endpoints_removed = 0; self.endpoints_removed = 0;
self.messages_sent = 0; self.messages_injected = 0;
self.messages_delivered = 0;
} }
} }
@ -175,9 +178,10 @@ impl Dataspace {
} }
packets::Action::Message(ref assertion) => { packets::Action::Message(ref assertion) => {
schedule_events(&mut outbound_turns, schedule_events(&mut outbound_turns,
self.index.send(assertion.into()), self.index.send(assertion.into(),
&mut self.churn.messages_delivered),
|epname, cs| packets::Event::Msg(epname, cs)); |epname, cs| packets::Event::Msg(epname, cs));
self.churn.messages_sent += 1; self.churn.messages_injected += 1;
} }
} }
} }

View File

@ -157,15 +157,17 @@ impl Index {
outputs outputs
} }
pub fn send(&mut self, outer_value: CachedAssertion) -> Events { pub fn send(&mut self, outer_value: CachedAssertion, delivery_count: &mut usize) -> Events {
let mut outputs = Vec::new(); let mut outputs = Vec::new();
Modification::new( Modification::new(
false, false,
&outer_value, &outer_value,
|_c, _v| (), |_c, _v| (),
|_l, _v| (), |_l, _v| (),
|es, cs| outputs.push((es.endpoints.iter().cloned().collect(), cs))) |es, cs| {
.perform(&mut self.root); *delivery_count += es.endpoints.len();
outputs.push((es.endpoints.iter().cloned().collect(), cs))
}).perform(&mut self.root);
outputs outputs
} }

View File

@ -43,7 +43,7 @@ impl Spaces {
v.push(format!("{} dataspace(s)", self.index.len())); v.push(format!("{} dataspace(s)", self.index.len()));
for (dsname, dsref) in &self.index { for (dsname, dsref) in &self.index {
let mut ds = dsref.write().unwrap(); let mut ds = dsref.write().unwrap();
v.push(format!(" {:?}: {} connection(s) {}, {} assertion(s) {}, {} endpoint(s) {}, {} messages/sec", v.push(format!(" {:?}: {} connection(s) {}, {} assertion(s) {}, {} endpoint(s) {}, msgs in {}/sec, out {}/sec",
dsname, dsname,
ds.peer_count(), ds.peer_count(),
format!("(+{}/-{})", ds.churn.peers_added, ds.churn.peers_removed), format!("(+{}/-{})", ds.churn.peers_added, ds.churn.peers_removed),
@ -51,7 +51,8 @@ impl Spaces {
format!("(+{}/-{})", ds.churn.assertions_added, ds.churn.assertions_removed), format!("(+{}/-{})", ds.churn.assertions_added, ds.churn.assertions_removed),
ds.endpoint_count(), ds.endpoint_count(),
format!("(+{}/-{})", ds.churn.endpoints_added, ds.churn.endpoints_removed), format!("(+{}/-{})", ds.churn.endpoints_added, ds.churn.endpoints_removed),
ds.churn.messages_sent as f32 / delta.as_secs() as f32)); ds.churn.messages_injected as f32 / delta.as_secs() as f32,
ds.churn.messages_delivered as f32 / delta.as_secs() as f32));
ds.churn.reset(); ds.churn.reset();
} }
v.join("\n") v.join("\n")