From 31ca160c7e531f9b6c807c76ab137cab29642657 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Mon, 18 May 2020 13:16:14 +0200 Subject: [PATCH] Overload protection --- src/bin/syndicate-server.rs | 41 +++++++++------------- src/config.rs | 19 ++++++++++ src/dataspace.rs | 13 +++++-- src/lib.rs | 1 + src/peer.rs | 70 +++++++++++++++++++++++++++++++------ 5 files changed, 107 insertions(+), 37 deletions(-) create mode 100644 src/config.rs diff --git a/src/bin/syndicate-server.rs b/src/bin/syndicate-server.rs index 7fffac8..a568ae3 100644 --- a/src/bin/syndicate-server.rs +++ b/src/bin/syndicate-server.rs @@ -1,4 +1,4 @@ -use syndicate::{spaces, packets, ConnId, V, Syndicate}; +use syndicate::{config, spaces, packets, ConnId, V, Syndicate}; use syndicate::peer::{Peer, ResultC2S}; use preserves::value; @@ -11,18 +11,7 @@ use tokio_util::codec::Framed; use tungstenite::Message; -use structopt::StructOpt; - -#[derive(Clone, StructOpt)] -struct Cli { - #[structopt(short = "p", long = "port", default_value = "8001")] - ports: Vec, - - #[structopt(long)] - recv_buffer_size: Option, - #[structopt(long)] - send_buffer_size: Option, -} +use structopt::StructOpt; // for from_args in main type UnitAsyncResult = Result<(), std::io::Error>; @@ -73,7 +62,8 @@ fn message_decoder(codec: &value::Codec) async fn run_connection(connid: ConnId, mut stream: TcpStream, - spaces: Arc>) -> + spaces: Arc>, + config: config::ServerConfigRef) -> UnitAsyncResult { let mut buf = [0; 1]; // peek at the first byte to see what kind of connection to expect @@ -87,13 +77,13 @@ async fn run_connection(connid: ConnId, let i = i.map(message_decoder(&codec)); let o = o.sink_map_err(translate_sink_err).with(message_encoder(&codec)); let mut p = Peer::new(connid, i, o); - p.run(spaces).await? + p.run(spaces, &config).await? }, _ => { println!("First byte: {:?}", buf); let (o, i) = Framed::new(stream, packets::Codec::standard()).split(); let mut p = Peer::new(connid, i, o); - p.run(spaces).await? + p.run(spaces, &config).await? } } 0 => return Err(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, @@ -103,7 +93,9 @@ async fn run_connection(connid: ConnId, Ok(()) } -async fn run_listener(spaces: Arc>, port: u16, args: Cli) -> UnitAsyncResult { +async fn run_listener(spaces: Arc>, port: u16, config: config::ServerConfigRef) -> + UnitAsyncResult +{ let mut listener = TcpListener::bind(format!("0.0.0.0:{}", port)).await?; println!("Listening on port {}", port); let mut id = port as u64 + 100000000000000; @@ -111,12 +103,13 @@ async fn run_listener(spaces: Arc>, port: u16, args: Cli) let (stream, addr) = listener.accept().await?; let connid = id; let spaces = Arc::clone(&spaces); + let config = Arc::clone(&config); id += 100000; - if let Some(n) = args.recv_buffer_size { stream.set_recv_buffer_size(n)?; } - if let Some(n) = args.send_buffer_size { stream.set_send_buffer_size(n)?; } + if let Some(n) = config.recv_buffer_size { stream.set_recv_buffer_size(n)?; } + if let Some(n) = config.send_buffer_size { stream.set_send_buffer_size(n)?; } tokio::spawn(async move { println!("Connection {} ({:?}) accepted from port {}", connid, addr, port); - match run_connection(connid, stream, spaces).await { + match run_connection(connid, stream, spaces, config).await { Ok(()) => println!("Connection {} ({:?}) terminated normally", connid, addr), Err(e) => println!("Connection {} ({:?}) terminated: {}", connid, addr, e), } @@ -139,7 +132,7 @@ async fn periodic_tasks(spaces: Arc>) -> UnitAsyncResult { #[tokio::main] async fn main() -> Result<(), Box> { - let args = Cli::from_args(); + let config = Arc::new(config::ServerConfig::from_args()); let spaces = Arc::new(Mutex::new(spaces::Spaces::new())); let mut daemons = Vec::new(); @@ -151,11 +144,11 @@ async fn main() -> Result<(), Box> { }); } - for port in args.ports.clone() { + for port in config.ports.clone() { let spaces = Arc::clone(&spaces); - let args = args.clone(); + let config = Arc::clone(&config); daemons.push(tokio::spawn(async move { - match run_listener(spaces, port, args).await { + match run_listener(spaces, port, config).await { Ok(()) => (), Err(e) => { eprintln!("Error from listener for port {}: {}", port, e); diff --git a/src/config.rs b/src/config.rs new file mode 100644 index 0000000..192488b --- /dev/null +++ b/src/config.rs @@ -0,0 +1,19 @@ +use structopt::StructOpt; + +#[derive(Clone, StructOpt)] +pub struct ServerConfig { + #[structopt(short = "p", long = "port", default_value = "8001")] + pub ports: Vec, + + #[structopt(long)] + pub recv_buffer_size: Option, + #[structopt(long)] + pub send_buffer_size: Option, + + #[structopt(long, default_value = "10000")] + pub overload_threshold: usize, + #[structopt(long, default_value = "5")] + pub overload_turn_limit: usize, +} + +pub type ServerConfigRef = std::sync::Arc; diff --git a/src/dataspace.rs b/src/dataspace.rs index 86e9dc6..3d29060 100644 --- a/src/dataspace.rs +++ b/src/dataspace.rs @@ -4,7 +4,7 @@ use super::packets::{self, Assertion, EndpointName, Captures}; use super::skeleton; use preserves::value::{self, Map, NestedValue}; -use std::sync::{Arc, RwLock}; +use std::sync::{Arc, RwLock, atomic::{AtomicUsize, Ordering}}; use tokio::sync::mpsc::UnboundedSender; pub type DataspaceRef = Arc>; @@ -13,6 +13,7 @@ pub type DataspaceError = (String, V); #[derive(Debug)] struct Actor { tx: UnboundedSender, + queue_depth: Arc, endpoints: Map, } @@ -82,10 +83,14 @@ impl Dataspace { Arc::new(RwLock::new(Self::new(name))) } - pub fn register(&mut self, id: ConnId, tx: UnboundedSender) { + pub fn register(&mut self, id: ConnId, + tx: UnboundedSender, + queue_depth: Arc) + { assert!(!self.peers.contains_key(&id)); self.peers.insert(id, Actor { tx, + queue_depth, endpoints: Map::new(), }); self.churn.peers_added += 1; @@ -191,7 +196,9 @@ impl Dataspace { fn deliver_outbound_turns(&mut self, outbound_turns: Map>) { for (target, events) in outbound_turns { - let _ = self.peers.get_mut(&target).unwrap().tx.send(packets::S2C::Turn(events)); + let actor = self.peers.get_mut(&target).unwrap(); + let _ = actor.tx.send(packets::S2C::Turn(events)); + actor.queue_depth.fetch_add(1, Ordering::Relaxed); } } diff --git a/src/lib.rs b/src/lib.rs index 3f8aa48..939e29e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,7 @@ #![recursion_limit="512"] pub mod bag; +pub mod config; pub mod dataspace; pub mod packets; pub mod peer; diff --git a/src/peer.rs b/src/peer.rs index d3e163b..463dff9 100644 --- a/src/peer.rs +++ b/src/peer.rs @@ -3,6 +3,7 @@ use super::ConnId; use super::dataspace; use super::packets; use super::spaces; +use super::config; use core::time::Duration; use futures::{Sink, SinkExt, Stream}; @@ -10,9 +11,9 @@ use futures::FutureExt; use futures::select; use preserves::value; use std::pin::Pin; -use std::sync::{Mutex, Arc}; +use std::sync::{Mutex, Arc, atomic::{AtomicUsize, Ordering}}; use tokio::stream::StreamExt; -use tokio::sync::mpsc::{unbounded_channel, UnboundedSender, UnboundedReceiver}; +use tokio::sync::mpsc::{unbounded_channel, UnboundedSender, UnboundedReceiver, error::TryRecvError}; use tokio::time::interval; pub type ResultC2S = Result; @@ -42,7 +43,9 @@ where I: Stream + Send, Peer{ id, tx, rx, i: Box::pin(i), o: Box::pin(o), space: None } } - pub async fn run(&mut self, spaces: Arc>) -> Result<(), std::io::Error> { + pub async fn run(&mut self, spaces: Arc>, config: &config::ServerConfig) -> + Result<(), std::io::Error> + { let firstpacket = self.i.next().await; let dsname = if let Some(Ok(packets::C2S::Connect(dsname))) = firstpacket { dsname @@ -53,13 +56,43 @@ where I: Stream + Send, }; self.space = Some(spaces.lock().unwrap().lookup(&dsname)); - self.space.as_ref().unwrap().write().unwrap().register(self.id, self.tx.clone()); + let queue_depth = Arc::new(AtomicUsize::new(0)); + self.space.as_ref().unwrap().write().unwrap().register( + self.id, + self.tx.clone(), + Arc::clone(&queue_depth)); let mut ping_timer = interval(Duration::from_secs(60)); let mut running = true; + let mut overloaded = None; + let mut previous_sample = None; while running { let mut to_send = Vec::new(); + + let queue_depth_sample = queue_depth.load(Ordering::Relaxed); + if queue_depth_sample > config.overload_threshold { + let n = overloaded.unwrap_or(0); + println!("{:?} overloaded({}): {:?}", self.id, n, queue_depth_sample); + if n == config.overload_turn_limit { + to_send.push(err("Overloaded", + value::Value::from(queue_depth_sample as u64).wrap())); + running = false; + } else { + if queue_depth_sample > previous_sample.unwrap_or(0) { + overloaded = Some(n + 1) + } else { + overloaded = Some(0) + } + } + } else { + if let Some(_) = overloaded { + println!("{:?} recovered: {:?}", self.id, queue_depth_sample); + } + overloaded = None; + } + previous_sample = Some(queue_depth_sample); + select! { _instant = ping_timer.next().boxed().fuse() => to_send.push(packets::S2C::Ping()), frame = self.i.next().fuse() => match frame { @@ -102,14 +135,31 @@ where I: Stream + Send, None => running = false, }, msgopt = self.rx.recv().boxed().fuse() => { + let mut ok = true; match msgopt { - Some(msg) => to_send.push(msg), - None => { - /* weird. */ - to_send.push(err("Outbound channel closed unexpectedly", - value::Value::from(false).wrap())); - running = false; + Some(msg) => { + to_send.push(msg); + loop { + match self.rx.try_recv() { + Ok(m) => to_send.push(m), + Err(TryRecvError::Empty) => { + queue_depth.store(0, Ordering::Relaxed); + break; + } + Err(TryRecvError::Closed) => { + ok = false; + break; + } + } + } } + None => ok = false, + } + if !ok { + /* weird. */ + to_send.push(err("Outbound channel closed unexpectedly", + value::Value::from(false).wrap())); + running = false; } }, }