From d3748a286ba08fba53315cfbd96d778d4144c7f1 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Mon, 1 Apr 2024 15:08:11 +0200 Subject: [PATCH] Release independent packages syndicate-server@0.43.1 Generated by cargo-workspaces --- Cargo.lock | 2 +- syndicate-server/Cargo.toml | 2 +- syndicate-server/src/http.rs | 38 ++++++++++++++++++------------------ 3 files changed, 21 insertions(+), 21 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ce807c0..a37143c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1897,7 +1897,7 @@ dependencies = [ [[package]] name = "syndicate-server" -version = "0.43.0" +version = "0.43.1" dependencies = [ "chrono", "futures", diff --git a/syndicate-server/Cargo.toml b/syndicate-server/Cargo.toml index 9a8fef3..e06042d 100644 --- a/syndicate-server/Cargo.toml +++ b/syndicate-server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "syndicate-server" -version = "0.43.0" +version = "0.43.1" authors = ["Tony Garnock-Jones "] edition = "2018" diff --git a/syndicate-server/src/http.rs b/syndicate-server/src/http.rs index ed052d0..0d48c93 100644 --- a/syndicate-server/src/http.rs +++ b/syndicate-server/src/http.rs @@ -32,13 +32,9 @@ pub fn empty_response(code: StatusCode) -> Response { type ChunkItem = Result>; -enum ResponseCollector { - Pending { - tx: oneshot::Sender>, - body_tx: UnboundedSender, - res: Response, - }, - Done +struct ResponseCollector { + tx_res: Option<(oneshot::Sender>, Response)>, + body_tx: Option>, } impl ResponseCollector { @@ -46,37 +42,41 @@ impl ResponseCollector { let (body_tx, body_rx) = unbounded_channel(); let body_stream: Box + Send> = Box::new(UnboundedReceiverStream::new(body_rx)); - ResponseCollector::Pending { - tx, - body_tx, - res: Response::new(body_stream.into()), + ResponseCollector { + tx_res: Some((tx, Response::new(body_stream.into()))), + body_tx: Some(body_tx), } } fn with_res) -> ActorResult>(&mut self, f: F) -> ActorResult { - if let ResponseCollector::Pending { res, .. } = self { + if let ResponseCollector { tx_res: Some((_, res)), .. } = self { f(res)?; } Ok(()) } + fn deliver_res(&mut self) { + if let Some((tx, res)) = std::mem::replace(&mut self.tx_res, None) { + let _ = tx.send(res); + } + } + fn add_chunk(&mut self, value: http::Chunk) -> ActorResult { - if let ResponseCollector::Pending { body_tx, .. } = self { + self.deliver_res(); + + if let Some(body_tx) = self.body_tx.as_mut() { body_tx.send(Ok(match value { http::Chunk::Bytes(bs) => bs.into(), http::Chunk::String(s) => s.as_bytes().to_vec().into(), }))?; } + Ok(()) } fn finish(&mut self) -> ActorResult { - match std::mem::replace(self, ResponseCollector::Done) { - ResponseCollector::Pending { tx, res, .. } => { - let _ = tx.send(res); - } - ResponseCollector::Done => (), - } + self.deliver_res(); + self.body_tx = None; Ok(()) } }