Release independent packages

syndicate-server@0.43.1

Generated by cargo-workspaces
This commit is contained in:
Tony Garnock-Jones 2024-04-01 15:08:11 +02:00
parent a56aec2c30
commit d3748a286b
3 changed files with 21 additions and 21 deletions

2
Cargo.lock generated
View File

@ -1897,7 +1897,7 @@ dependencies = [
[[package]] [[package]]
name = "syndicate-server" name = "syndicate-server"
version = "0.43.0" version = "0.43.1"
dependencies = [ dependencies = [
"chrono", "chrono",
"futures", "futures",

View File

@ -1,6 +1,6 @@
[package] [package]
name = "syndicate-server" name = "syndicate-server"
version = "0.43.0" version = "0.43.1"
authors = ["Tony Garnock-Jones <tonyg@leastfixedpoint.com>"] authors = ["Tony Garnock-Jones <tonyg@leastfixedpoint.com>"]
edition = "2018" edition = "2018"

View File

@ -32,13 +32,9 @@ pub fn empty_response(code: StatusCode) -> Response<Body> {
type ChunkItem = Result<body::Bytes, Box<dyn std::error::Error + Send + Sync>>; type ChunkItem = Result<body::Bytes, Box<dyn std::error::Error + Send + Sync>>;
enum ResponseCollector { struct ResponseCollector {
Pending { tx_res: Option<(oneshot::Sender<Response<Body>>, Response<Body>)>,
tx: oneshot::Sender<Response<Body>>, body_tx: Option<UnboundedSender<ChunkItem>>,
body_tx: UnboundedSender<ChunkItem>,
res: Response<Body>,
},
Done
} }
impl ResponseCollector { impl ResponseCollector {
@ -46,37 +42,41 @@ impl ResponseCollector {
let (body_tx, body_rx) = unbounded_channel(); let (body_tx, body_rx) = unbounded_channel();
let body_stream: Box<dyn futures::Stream<Item = ChunkItem> + Send> = let body_stream: Box<dyn futures::Stream<Item = ChunkItem> + Send> =
Box::new(UnboundedReceiverStream::new(body_rx)); Box::new(UnboundedReceiverStream::new(body_rx));
ResponseCollector::Pending { ResponseCollector {
tx, tx_res: Some((tx, Response::new(body_stream.into()))),
body_tx, body_tx: Some(body_tx),
res: Response::new(body_stream.into()),
} }
} }
fn with_res<F: FnOnce(&mut Response<Body>) -> ActorResult>(&mut self, f: F) -> ActorResult { fn with_res<F: FnOnce(&mut Response<Body>) -> ActorResult>(&mut self, f: F) -> ActorResult {
if let ResponseCollector::Pending { res, .. } = self { if let ResponseCollector { tx_res: Some((_, res)), .. } = self {
f(res)?; f(res)?;
} }
Ok(()) Ok(())
} }
fn deliver_res(&mut self) {
if let Some((tx, res)) = std::mem::replace(&mut self.tx_res, None) {
let _ = tx.send(res);
}
}
fn add_chunk(&mut self, value: http::Chunk) -> ActorResult { fn add_chunk(&mut self, value: http::Chunk) -> ActorResult {
if let ResponseCollector::Pending { body_tx, .. } = self { self.deliver_res();
if let Some(body_tx) = self.body_tx.as_mut() {
body_tx.send(Ok(match value { body_tx.send(Ok(match value {
http::Chunk::Bytes(bs) => bs.into(), http::Chunk::Bytes(bs) => bs.into(),
http::Chunk::String(s) => s.as_bytes().to_vec().into(), http::Chunk::String(s) => s.as_bytes().to_vec().into(),
}))?; }))?;
} }
Ok(()) Ok(())
} }
fn finish(&mut self) -> ActorResult { fn finish(&mut self) -> ActorResult {
match std::mem::replace(self, ResponseCollector::Done) { self.deliver_res();
ResponseCollector::Pending { tx, res, .. } => { self.body_tx = None;
let _ = tx.send(res);
}
ResponseCollector::Done => (),
}
Ok(()) Ok(())
} }
} }