Update HTTP service protocol

This commit is contained in:
Tony Garnock-Jones 2024-04-01 16:52:10 +02:00
parent 80ad0914ed
commit 94598a574b
2 changed files with 56 additions and 26 deletions

View File

@ -10,6 +10,7 @@ use hyper::header::HeaderValue;
use syndicate::actor::*;
use syndicate::error::Error;
use syndicate::relay::Mutex;
use syndicate::trace;
use syndicate::value::Map;
use syndicate::value::NestedValue;
@ -33,17 +34,23 @@ pub fn empty_response(code: StatusCode) -> Response<Body> {
type ChunkItem = Result<body::Bytes, Box<dyn std::error::Error + Send + Sync>>;
struct ResponseCollector {
framing_handle: Option<Handle>,
context_handle: Arc<Mutex<Option<Handle>>>,
tx_res: Option<(oneshot::Sender<Response<Body>>, Response<Body>)>,
body_tx: Option<UnboundedSender<ChunkItem>>,
}
impl ResponseCollector {
fn new(tx: oneshot::Sender<Response<Body>>) -> Self {
fn new(tx: oneshot::Sender<Response<Body>>, context_handle: Arc<Mutex<Option<Handle>>>) -> Self {
let (body_tx, body_rx) = unbounded_channel();
let body_stream: Box<dyn futures::Stream<Item = ChunkItem> + Send> =
Box::new(UnboundedReceiverStream::new(body_rx));
let mut res = Response::new(body_stream.into());
*res.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
ResponseCollector {
tx_res: Some((tx, Response::new(body_stream.into()))),
framing_handle: None,
context_handle,
tx_res: Some((tx, res)),
body_tx: Some(body_tx),
}
}
@ -74,15 +81,42 @@ impl ResponseCollector {
Ok(())
}
fn finish(&mut self) -> ActorResult {
fn finish(&mut self, t: &mut Activation) -> ActorResult {
self.deliver_res();
self.body_tx = None;
if let Some(h) = self.context_handle.lock().take() {
t.retract(h);
}
Ok(())
}
}
impl Entity<http::HttpResponse> for ResponseCollector {
fn message(&mut self, _turn: &mut Activation, message: http::HttpResponse) -> ActorResult {
fn assert(&mut self, _t: &mut Activation, assertion: http::HttpResponse, handle: Handle) -> ActorResult {
match assertion {
http::HttpResponse::Processing => {
self.framing_handle = Some(handle);
self.with_res(|r| {
*r.status_mut() = StatusCode::OK;
Ok(())
})
}
_ => Err(format!("Unexpected assertion {:?}", assertion))?,
}
}
fn retract(&mut self, t: &mut Activation, handle: Handle) -> ActorResult {
if self.framing_handle == Some(handle) {
self.finish(t)?;
}
Ok(())
}
fn message(&mut self, t: &mut Activation, message: http::HttpResponse) -> ActorResult {
if self.framing_handle.is_none() {
self.finish(t)?;
Err("Attempt to reply before <processing> has been asserted")?;
}
match message {
http::HttpResponse::Status { code, .. } => self.with_res(|r| {
*r.status_mut() = StatusCode::from_u16(
@ -94,11 +128,8 @@ impl Entity<http::HttpResponse> for ResponseCollector {
HeaderValue::from_str(value.as_str())?);
Ok(())
}),
http::HttpResponse::Chunk { chunk } => self.add_chunk(*chunk),
http::HttpResponse::Done { chunk } => {
self.add_chunk(*chunk)?;
self.finish()
}
http::HttpResponse::Body { chunk } => self.add_chunk(*chunk),
_ => Err(format!("Unexpected message {:?}", message))?,
}
}
}
@ -160,7 +191,6 @@ pub async fn serve(
let account = Account::new(Some(AnyValue::symbol("http")), trace_collector);
let (tx, rx) = oneshot::channel();
let mut req_handle: Option<Handle> = None;
facet.activate(&account, Some(trace::TurnCause::external("http")), |t| {
let sreq = http::HttpRequest {
@ -174,20 +204,17 @@ pub async fn serve(
body,
};
tracing::debug!(?sreq);
let srep = Cap::guard(&language().syndicate, t.create(ResponseCollector::new(tx)));
req_handle = httpd.assert(t, language(), &http::HttpContext { req: sreq, res: srep });
let context_handle: Arc<Mutex<Option<Handle>>> = Arc::new(Mutex::new(None));
let srep = Cap::guard(&language().syndicate, t.create(ResponseCollector::new(
tx,
Arc::clone(&context_handle))));
*(context_handle.lock()) = httpd.assert(
t, language(), &http::HttpContext { req: sreq, res: srep });
Ok(())
});
let response_result = rx.await;
facet.activate(&account, Some(trace::TurnCause::external("http")), |t| {
if let Some(h) = req_handle {
t.retract(h);
}
Ok(())
});
match response_result {
Ok(response) => Ok(response),
Err(_ /* sender dropped */) => Ok(empty_response(StatusCode::INTERNAL_SERVER_ERROR)),

View File

@ -141,12 +141,12 @@ fn run(t: &mut Activation, ds: Arc<Cap>, spec: HttpRouter) -> ActorResult {
http::MethodPattern::Specific(m) => m.to_uppercase(),
http::MethodPattern::Any => unreachable!(),
}).collect::<Vec<String>>().join(", ");
let h = res.assert(t, language(), &http::HttpResponse::Processing);
res.message(t, language(), &http::HttpResponse::Status {
code: 405.into(), message: "Method Not Allowed".into() });
res.message(t, language(), &http::HttpResponse::Header {
name: "allow".into(), value: allowed });
res.message(t, language(), &http::HttpResponse::Done {
chunk: Box::new(http::Chunk::Bytes(vec![])) });
if let Some(h) = h { t.retract(h); }
return Ok(())
}
}
@ -166,10 +166,10 @@ fn run(t: &mut Activation, ds: Arc<Cap>, spec: HttpRouter) -> ActorResult {
}
fn send_empty(t: &mut Activation, res: &Arc<Cap>, code: u16, message: &str) -> ActorResult {
let h = res.assert(t, language(), &http::HttpResponse::Processing);
res.message(t, language(), &http::HttpResponse::Status {
code: code.into(), message: message.into() });
res.message(t, language(), &http::HttpResponse::Done {
chunk: Box::new(http::Chunk::Bytes(vec![])) });
if let Some(h) = h { t.retract(h); }
return Ok(())
}
@ -268,12 +268,12 @@ impl HttpStaticFileServer {
Ok(mut fh) => {
if fh.metadata().is_ok_and(|m| m.is_dir()) {
drop(fh);
let h = res.assert(t, language(), &http::HttpResponse::Processing);
res.message(t, language(), &http::HttpResponse::Status {
code: 301.into(), message: "Moved permanently".into() });
res.message(t, language(), &http::HttpResponse::Header {
name: "location".into(), value: format!("/{}/", req.path.join("/")) });
res.message(t, language(), &http::HttpResponse::Done {
chunk: Box::new(http::Chunk::Bytes(vec![])) });
if let Some(h) = h { t.retract(h); }
return Ok(())
} else {
let mut buf = Vec::new();
@ -287,14 +287,17 @@ impl HttpStaticFileServer {
}
};
let h = res.assert(t, language(), &http::HttpResponse::Processing);
res.message(t, language(), &http::HttpResponse::Status {
code: 200.into(), message: "OK".into() });
if let Some(mime_type) = mime_type {
res.message(t, language(), &http::HttpResponse::Header {
name: "content-type".into(), value: mime_type.to_owned() });
}
res.message(t, language(), &http::HttpResponse::Done {
res.message(t, language(), &http::HttpResponse::Body {
chunk: Box::new(http::Chunk::Bytes(body)) });
if let Some(h) = h { t.retract(h); }
Ok(())
}
}