From ef5b53d52b91650727913e677862d46bbea3ba88 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Tue, 10 Aug 2021 07:48:09 -0400 Subject: [PATCH] Remove old peer code --- src/peer.OLD.rs | 130 ------------------------------------------------ 1 file changed, 130 deletions(-) delete mode 100644 src/peer.OLD.rs diff --git a/src/peer.OLD.rs b/src/peer.OLD.rs deleted file mode 100644 index 9191215..0000000 --- a/src/peer.OLD.rs +++ /dev/null @@ -1,130 +0,0 @@ -use futures::FutureExt; -use futures::StreamExt; -use futures::select; -use futures::{Sink, SinkExt, Stream}; - -use preserves::value; - -use std::pin::Pin; -use std::sync::Arc; -use std::sync::atomic::{AtomicUsize, Ordering}; - -use super::actor::*; -use super::config; -use super::error::Error; -use super::error::error; -use super::packets; - -pub struct Peer -where I: Stream> + Send, - O: Sink, -{ - i: Pin>, - o: Pin>, - ds: Arc, - config: Arc, -} - -impl Peer -where I: Stream> + Send, - O: Sink, -{ - pub fn new(i: I, o: O, ds: Arc, config: Arc) -> Self { - Peer{ i: Box::pin(i), o: Box::pin(o), ds, config } - } - - pub async fn run(mut self) -> Result<(), packets::Error> { - let queue_depth = Arc::new(AtomicUsize::new(0)); - - let mut running = true; - let mut overloaded = None; - let mut previous_sample = None; - while running { - let mut to_send = Vec::new(); - - let queue_depth_sample = queue_depth.load(Ordering::Relaxed); - if queue_depth_sample > self.config.overload_threshold { - let n = overloaded.unwrap_or(0); - tracing::warn!(turns=n, queue_depth=queue_depth_sample, "overloaded"); - if n == self.config.overload_turn_limit { - to_send.push(error("Overloaded", queue_depth_sample as u128)); - running = false; - } else { - if queue_depth_sample > previous_sample.unwrap_or(0) { - overloaded = Some(n + 1) - } else { - overloaded = Some(0) - } - } - } else { - if let Some(_) = overloaded { - tracing::info!(queue_depth=queue_depth_sample, "recovered"); - } - overloaded = None; - } - previous_sample = Some(queue_depth_sample); - - select! { - frame = self.i.next().fuse() => match frame { - Some(res) => match res { - Ok(p) => { - tracing::trace!(packet = debug(&p), "input"); - match p { - packets::Packet::Turn(b) => { - let packets::Turn(actions) = &*b; - /* ... */ - } - packets::Packet::Error(b) => { - let e = &*b; - /* ... */ - } - } - } - Err(e) => return Err(e), - } - None => { - tracing::trace!("remote has closed"); - running = false; - } - }, - msgopt = self.rx.recv().boxed().fuse() => { - let mut ok = true; - match msgopt { - Some(msg) => { - to_send.push(msg); - loop { - match self.rx.try_recv() { - Ok(m) => to_send.push(m), - Err(TryRecvError::Empty) => { - queue_depth.store(0, Ordering::Relaxed); - break; - } - Err(TryRecvError::Closed) => { - ok = false; - break; - } - } - } - } - None => ok = false, - } - if !ok { - /* weird. */ - to_send.push(error("Outbound channel closed unexpectedly", value::FALSE.clone())); - running = false; - } - }, - } - for v in to_send { - if let packets::S2C::Err(ref msg, ref ctx) = v { - tracing::error!(context = debug(ctx), msg = display(msg), "error"); - } else { - tracing::trace!(packet = debug(&v), "output"); - } - self.o.send(v).await?; - } - tokio::task::yield_now().await; - } - Ok(()) - } -}