diff --git a/syndicate-server/src/http.rs b/syndicate-server/src/http.rs index 0d48c93..5386ffe 100644 --- a/syndicate-server/src/http.rs +++ b/syndicate-server/src/http.rs @@ -10,6 +10,7 @@ use hyper::header::HeaderValue; use syndicate::actor::*; use syndicate::error::Error; +use syndicate::relay::Mutex; use syndicate::trace; use syndicate::value::Map; use syndicate::value::NestedValue; @@ -33,17 +34,23 @@ pub fn empty_response(code: StatusCode) -> Response { type ChunkItem = Result>; struct ResponseCollector { + framing_handle: Option, + context_handle: Arc>>, tx_res: Option<(oneshot::Sender>, Response)>, body_tx: Option>, } impl ResponseCollector { - fn new(tx: oneshot::Sender>) -> Self { + fn new(tx: oneshot::Sender>, context_handle: Arc>>) -> Self { let (body_tx, body_rx) = unbounded_channel(); let body_stream: Box + Send> = Box::new(UnboundedReceiverStream::new(body_rx)); + let mut res = Response::new(body_stream.into()); + *res.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; ResponseCollector { - tx_res: Some((tx, Response::new(body_stream.into()))), + framing_handle: None, + context_handle, + tx_res: Some((tx, res)), body_tx: Some(body_tx), } } @@ -74,15 +81,42 @@ impl ResponseCollector { Ok(()) } - fn finish(&mut self) -> ActorResult { + fn finish(&mut self, t: &mut Activation) -> ActorResult { self.deliver_res(); self.body_tx = None; + if let Some(h) = self.context_handle.lock().take() { + t.retract(h); + } Ok(()) } } impl Entity for ResponseCollector { - fn message(&mut self, _turn: &mut Activation, message: http::HttpResponse) -> ActorResult { + fn assert(&mut self, _t: &mut Activation, assertion: http::HttpResponse, handle: Handle) -> ActorResult { + match assertion { + http::HttpResponse::Processing => { + self.framing_handle = Some(handle); + self.with_res(|r| { + *r.status_mut() = StatusCode::OK; + Ok(()) + }) + } + _ => Err(format!("Unexpected assertion {:?}", assertion))?, + } + } + + fn retract(&mut self, t: &mut Activation, handle: Handle) -> ActorResult { + if self.framing_handle == Some(handle) { + self.finish(t)?; + } + Ok(()) + } + + fn message(&mut self, t: &mut Activation, message: http::HttpResponse) -> ActorResult { + if self.framing_handle.is_none() { + self.finish(t)?; + Err("Attempt to reply before has been asserted")?; + } match message { http::HttpResponse::Status { code, .. } => self.with_res(|r| { *r.status_mut() = StatusCode::from_u16( @@ -94,11 +128,8 @@ impl Entity for ResponseCollector { HeaderValue::from_str(value.as_str())?); Ok(()) }), - http::HttpResponse::Chunk { chunk } => self.add_chunk(*chunk), - http::HttpResponse::Done { chunk } => { - self.add_chunk(*chunk)?; - self.finish() - } + http::HttpResponse::Body { chunk } => self.add_chunk(*chunk), + _ => Err(format!("Unexpected message {:?}", message))?, } } } @@ -160,7 +191,6 @@ pub async fn serve( let account = Account::new(Some(AnyValue::symbol("http")), trace_collector); let (tx, rx) = oneshot::channel(); - let mut req_handle: Option = None; facet.activate(&account, Some(trace::TurnCause::external("http")), |t| { let sreq = http::HttpRequest { @@ -174,20 +204,17 @@ pub async fn serve( body, }; tracing::debug!(?sreq); - let srep = Cap::guard(&language().syndicate, t.create(ResponseCollector::new(tx))); - req_handle = httpd.assert(t, language(), &http::HttpContext { req: sreq, res: srep }); + let context_handle: Arc>> = Arc::new(Mutex::new(None)); + let srep = Cap::guard(&language().syndicate, t.create(ResponseCollector::new( + tx, + Arc::clone(&context_handle)))); + *(context_handle.lock()) = httpd.assert( + t, language(), &http::HttpContext { req: sreq, res: srep }); Ok(()) }); let response_result = rx.await; - facet.activate(&account, Some(trace::TurnCause::external("http")), |t| { - if let Some(h) = req_handle { - t.retract(h); - } - Ok(()) - }); - match response_result { Ok(response) => Ok(response), Err(_ /* sender dropped */) => Ok(empty_response(StatusCode::INTERNAL_SERVER_ERROR)), diff --git a/syndicate-server/src/services/http_router.rs b/syndicate-server/src/services/http_router.rs index 15ca938..2e98e7d 100644 --- a/syndicate-server/src/services/http_router.rs +++ b/syndicate-server/src/services/http_router.rs @@ -141,12 +141,12 @@ fn run(t: &mut Activation, ds: Arc, spec: HttpRouter) -> ActorResult { http::MethodPattern::Specific(m) => m.to_uppercase(), http::MethodPattern::Any => unreachable!(), }).collect::>().join(", "); + let h = res.assert(t, language(), &http::HttpResponse::Processing); res.message(t, language(), &http::HttpResponse::Status { code: 405.into(), message: "Method Not Allowed".into() }); res.message(t, language(), &http::HttpResponse::Header { name: "allow".into(), value: allowed }); - res.message(t, language(), &http::HttpResponse::Done { - chunk: Box::new(http::Chunk::Bytes(vec![])) }); + if let Some(h) = h { t.retract(h); } return Ok(()) } } @@ -166,10 +166,10 @@ fn run(t: &mut Activation, ds: Arc, spec: HttpRouter) -> ActorResult { } fn send_empty(t: &mut Activation, res: &Arc, code: u16, message: &str) -> ActorResult { + let h = res.assert(t, language(), &http::HttpResponse::Processing); res.message(t, language(), &http::HttpResponse::Status { code: code.into(), message: message.into() }); - res.message(t, language(), &http::HttpResponse::Done { - chunk: Box::new(http::Chunk::Bytes(vec![])) }); + if let Some(h) = h { t.retract(h); } return Ok(()) } @@ -268,12 +268,12 @@ impl HttpStaticFileServer { Ok(mut fh) => { if fh.metadata().is_ok_and(|m| m.is_dir()) { drop(fh); + let h = res.assert(t, language(), &http::HttpResponse::Processing); res.message(t, language(), &http::HttpResponse::Status { code: 301.into(), message: "Moved permanently".into() }); res.message(t, language(), &http::HttpResponse::Header { name: "location".into(), value: format!("/{}/", req.path.join("/")) }); - res.message(t, language(), &http::HttpResponse::Done { - chunk: Box::new(http::Chunk::Bytes(vec![])) }); + if let Some(h) = h { t.retract(h); } return Ok(()) } else { let mut buf = Vec::new(); @@ -287,14 +287,17 @@ impl HttpStaticFileServer { } }; + let h = res.assert(t, language(), &http::HttpResponse::Processing); res.message(t, language(), &http::HttpResponse::Status { code: 200.into(), message: "OK".into() }); if let Some(mime_type) = mime_type { res.message(t, language(), &http::HttpResponse::Header { name: "content-type".into(), value: mime_type.to_owned() }); } - res.message(t, language(), &http::HttpResponse::Done { + res.message(t, language(), &http::HttpResponse::Body { chunk: Box::new(http::Chunk::Bytes(body)) }); + if let Some(h) = h { t.retract(h); } + Ok(()) } }