Stats and dataspace cleanup

This commit is contained in:
Tony Garnock-Jones 2020-05-18 10:44:57 +02:00
parent 23003aad56
commit ad1782e1b6
5 changed files with 70 additions and 1 deletions

View File

@ -16,6 +16,7 @@ pub enum Net {
// Allows negative counts - a "delta"
pub struct BTreeBag<V: std::cmp::Ord> {
counts: BTreeMap<V, Count>,
total: isize,
}
impl<V: std::cmp::Ord> std::default::Default for BTreeBag<V> {
@ -26,7 +27,7 @@ impl<V: std::cmp::Ord> std::default::Default for BTreeBag<V> {
impl<V: std::cmp::Ord> BTreeBag<V> {
pub fn new() -> BTreeBag<V> {
BTreeBag { counts: BTreeMap::new() }
BTreeBag { counts: BTreeMap::new(), total: 0 }
}
pub fn change(&mut self, key: V, delta: Count) -> Net { self._change(key, delta, false) }
@ -36,6 +37,7 @@ impl<V: std::cmp::Ord> BTreeBag<V> {
let old_count = self[&key];
let mut new_count = old_count + delta;
if clamp { new_count = new_count.max(0) }
self.total = self.total + (new_count - old_count) as isize;
if new_count == 0 {
self.counts.remove(&key);
if old_count == 0 { Net::AbsentToAbsent } else { Net::PresentToAbsent }
@ -47,6 +49,7 @@ impl<V: std::cmp::Ord> BTreeBag<V> {
pub fn clear(&mut self) {
self.counts.clear();
self.total = 0;
}
pub fn contains_key(&self, key: &V) -> bool {
@ -61,6 +64,10 @@ impl<V: std::cmp::Ord> BTreeBag<V> {
self.counts.len()
}
pub fn total(&self) -> isize {
self.total
}
pub fn keys(&self) -> Keys<V, Count> {
self.counts.keys()
}

View File

@ -120,6 +120,18 @@ async fn run_listener(spaces: Arc<Mutex<spaces::Spaces>>, port: u16) -> UnitAsyn
}
}
async fn periodic_tasks(spaces: Arc<Mutex<spaces::Spaces>>) -> UnitAsyncResult {
let mut delay = tokio::time::interval(core::time::Duration::from_secs(5));
loop {
delay.next().await.unwrap();
{
let mut spaces = spaces.lock().unwrap();
spaces.cleanup();
println!("{}", spaces.summary_string());
}
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let args = Cli::from_args();
@ -127,6 +139,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let spaces = Arc::new(Mutex::new(spaces::Spaces::new()));
let mut daemons = Vec::new();
{
let spaces = Arc::clone(&spaces);
tokio::spawn(async move {
periodic_tasks(spaces).await
});
}
for port in args.ports {
let spaces = Arc::clone(&spaces);
daemons.push(tokio::spawn(async move {

View File

@ -56,6 +56,18 @@ impl Dataspace {
self.deliver_outbound_turns(outbound_turns);
}
pub fn peer_count(&self) -> usize {
self.peers.len()
}
pub fn assertion_count(&self) -> usize {
self.index.assertion_count()
}
pub fn endpoint_count(&self) -> isize {
self.index.endpoint_count()
}
fn remove_endpoint(&mut self,
mut outbound_turns: &mut Map<ConnId, Vec<packets::Event>>,
id: ConnId,

View File

@ -168,6 +168,14 @@ impl Index {
.perform(&mut self.root);
outputs
}
pub fn assertion_count(&self) -> usize {
return self.all_assertions.len()
}
pub fn endpoint_count(&self) -> isize {
return self.all_assertions.total()
}
}
#[derive(Debug)]

View File

@ -1,6 +1,8 @@
use super::V;
use super::dataspace;
use std::sync::Arc;
use preserves::value::Map;
pub struct Spaces {
@ -28,4 +30,25 @@ impl Spaces {
space
}
pub fn cleanup(&mut self) {
self.index = self.index.iter()
.filter(|s| s.1.read().unwrap().peer_count() > 0)
.map(|(k,v)| (k.clone(), Arc::clone(v)))
.collect();
}
pub fn summary_string(&self) -> String {
let mut v = vec![];
v.push(format!("{} dataspace(s)", self.index.len()));
for (dsname, dsref) in &self.index {
let ds = dsref.read().unwrap();
v.push(format!(" {:?}: {} connection(s), {} assertion(s), {} endpoint(s)",
dsname,
ds.peer_count(),
ds.assertion_count(),
ds.endpoint_count()));
}
v.join("\n")
}
}