Split out initial services in syndicate-server
This commit is contained in:
parent
f56c0df10f
commit
0eff672c30
|
@ -106,6 +106,8 @@ fn compile_pattern(v: &IOValue) -> TokenStream {
|
||||||
let P_: TokenStream = quote!(syndicate::schemas::dataspace_patterns);
|
let P_: TokenStream = quote!(syndicate::schemas::dataspace_patterns);
|
||||||
#[allow(non_snake_case)]
|
#[allow(non_snake_case)]
|
||||||
let V_: TokenStream = quote!(syndicate::value);
|
let V_: TokenStream = quote!(syndicate::value);
|
||||||
|
#[allow(non_snake_case)]
|
||||||
|
let MapFromIterator_: TokenStream = quote!(<#V_::Map<_, _> as std::iter::FromIterator<_>>::from_iter);
|
||||||
|
|
||||||
match v.value() {
|
match v.value() {
|
||||||
Value::Symbol(s) => match s.as_str() {
|
Value::Symbol(s) => match s.as_str() {
|
||||||
|
@ -139,7 +141,7 @@ fn compile_pattern(v: &IOValue) -> TokenStream {
|
||||||
label: #label_stx,
|
label: #label_stx,
|
||||||
arity: #arity .into(),
|
arity: #arity .into(),
|
||||||
}),
|
}),
|
||||||
members: #V_::Map::from_iter(vec![#(#members),*])
|
members: #MapFromIterator_(vec![#(#members),*])
|
||||||
})))
|
})))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -151,7 +153,7 @@ fn compile_pattern(v: &IOValue) -> TokenStream {
|
||||||
ctor: Box::new(#P_::CArr {
|
ctor: Box::new(#P_::CArr {
|
||||||
arity: #arity .into(),
|
arity: #arity .into(),
|
||||||
}),
|
}),
|
||||||
members: #V_::Map::from_iter(vec![#(#members),*])
|
members: #MapFromIterator_(vec![#(#members),*])
|
||||||
})))
|
})))
|
||||||
}
|
}
|
||||||
Value::Set(_) =>
|
Value::Set(_) =>
|
||||||
|
@ -164,7 +166,7 @@ fn compile_pattern(v: &IOValue) -> TokenStream {
|
||||||
}).collect::<Vec<_>>();
|
}).collect::<Vec<_>>();
|
||||||
quote!(#P_::Pattern::DCompound(Box::new(#P_::DCompound::Dict {
|
quote!(#P_::Pattern::DCompound(Box::new(#P_::DCompound::Dict {
|
||||||
ctor: Box::new(#P_::CDict),
|
ctor: Box::new(#P_::CDict),
|
||||||
members: #V_::Map::from_iter(vec![#(#members),*])
|
members: #MapFromIterator_(vec![#(#members),*])
|
||||||
})))
|
})))
|
||||||
}
|
}
|
||||||
_ => lit(compile_value(v)),
|
_ => lit(compile_value(v)),
|
||||||
|
|
|
@ -1,4 +1,3 @@
|
||||||
use std::iter::FromIterator;
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use structopt::StructOpt;
|
use structopt::StructOpt;
|
||||||
|
|
|
@ -1,4 +1,3 @@
|
||||||
use std::iter::FromIterator;
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::SystemTime;
|
use std::time::SystemTime;
|
||||||
|
|
||||||
|
|
|
@ -1,4 +1,3 @@
|
||||||
use std::iter::FromIterator;
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use structopt::StructOpt;
|
use structopt::StructOpt;
|
||||||
|
|
|
@ -0,0 +1,50 @@
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use syndicate::actor::*;
|
||||||
|
use syndicate::during::DuringResult;
|
||||||
|
use syndicate::schemas::gatekeeper;
|
||||||
|
use syndicate::value::NestedValue;
|
||||||
|
|
||||||
|
pub fn handle_resolve(
|
||||||
|
ds: &mut Arc<Cap>,
|
||||||
|
t: &mut Activation,
|
||||||
|
a: gatekeeper::Resolve,
|
||||||
|
) -> DuringResult<Arc<Cap>> {
|
||||||
|
use syndicate::schemas::dataspace;
|
||||||
|
|
||||||
|
let gatekeeper::Resolve { sturdyref, observer } = a;
|
||||||
|
let queried_oid = sturdyref.oid.clone();
|
||||||
|
let handler = syndicate::entity(observer)
|
||||||
|
.on_asserted(move |observer, t, a: AnyValue| {
|
||||||
|
let bindings = a.value().to_sequence()?;
|
||||||
|
let key = bindings[0].value().to_bytestring()?;
|
||||||
|
let unattenuated_target = bindings[1].value().to_embedded()?;
|
||||||
|
match sturdyref.validate_and_attenuate(key, unattenuated_target) {
|
||||||
|
Err(e) => {
|
||||||
|
tracing::warn!(sturdyref = debug(&AnyValue::from(&sturdyref)),
|
||||||
|
"sturdyref failed validation: {}", e);
|
||||||
|
Ok(None)
|
||||||
|
},
|
||||||
|
Ok(target) => {
|
||||||
|
tracing::trace!(sturdyref = debug(&AnyValue::from(&sturdyref)),
|
||||||
|
target = debug(&target),
|
||||||
|
"sturdyref resolved");
|
||||||
|
if let Some(h) = observer.assert(t, AnyValue::domain(target)) {
|
||||||
|
Ok(Some(Box::new(move |_observer, t| Ok(t.retract(h)))))
|
||||||
|
} else {
|
||||||
|
Ok(None)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.create_cap(t);
|
||||||
|
if let Some(oh) = ds.assert(t, &dataspace::Observe {
|
||||||
|
// TODO: codegen plugin to generate pattern constructors
|
||||||
|
pattern: syndicate_macros::pattern!("<bind =queried_oid $ $>"),
|
||||||
|
observer: handler,
|
||||||
|
}) {
|
||||||
|
Ok(Some(Box::new(move |_ds, t| Ok(t.retract(oh)))))
|
||||||
|
} else {
|
||||||
|
Ok(None)
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,9 +1,3 @@
|
||||||
use futures::SinkExt;
|
|
||||||
use futures::StreamExt;
|
|
||||||
|
|
||||||
use std::future::ready;
|
|
||||||
use std::io;
|
|
||||||
use std::iter::FromIterator;
|
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
@ -11,22 +5,13 @@ use structopt::StructOpt;
|
||||||
|
|
||||||
use syndicate::actor::*;
|
use syndicate::actor::*;
|
||||||
use syndicate::dataspace::*;
|
use syndicate::dataspace::*;
|
||||||
use syndicate::during::DuringResult;
|
|
||||||
use syndicate::error::Error;
|
|
||||||
use syndicate::error::error;
|
|
||||||
use syndicate::relay;
|
|
||||||
use syndicate::schemas::internal_protocol::_Any;
|
|
||||||
use syndicate::schemas::gatekeeper;
|
|
||||||
use syndicate::sturdy;
|
use syndicate::sturdy;
|
||||||
|
|
||||||
use syndicate::value::NestedValue;
|
use syndicate::value::NestedValue;
|
||||||
|
|
||||||
use tokio::net::TcpListener;
|
mod gatekeeper;
|
||||||
use tokio::net::TcpStream;
|
mod protocol;
|
||||||
use tokio::net::UnixListener;
|
mod services;
|
||||||
use tokio::net::UnixStream;
|
|
||||||
|
|
||||||
use tungstenite::Message;
|
|
||||||
|
|
||||||
#[derive(Clone, StructOpt)]
|
#[derive(Clone, StructOpt)]
|
||||||
struct ServerConfig {
|
struct ServerConfig {
|
||||||
|
@ -74,61 +59,35 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
|
||||||
tracing::trace!("startup");
|
tracing::trace!("startup");
|
||||||
|
|
||||||
if config.debt_reporter {
|
|
||||||
Actor::new().boot(syndicate::name!("debt-reporter"), |t| {
|
|
||||||
t.linked_task(syndicate::name!("tick"), async {
|
|
||||||
let mut timer = tokio::time::interval(core::time::Duration::from_secs(1));
|
|
||||||
loop {
|
|
||||||
timer.tick().await;
|
|
||||||
for (id, (name, debt)) in syndicate::actor::ACCOUNTS.read().unwrap().iter() {
|
|
||||||
let _enter = name.enter();
|
|
||||||
tracing::info!(id, debt = debug(
|
|
||||||
debt.load(std::sync::atomic::Ordering::Relaxed)));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
Ok(())
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
Actor::new().boot(syndicate::name!("dataspace"), move |t| {
|
Actor::new().boot(syndicate::name!("dataspace"), move |t| {
|
||||||
let ds = Cap::new(&t.create(Dataspace::new()));
|
let ds = Cap::new(&t.create(Dataspace::new()));
|
||||||
|
|
||||||
{
|
{
|
||||||
use syndicate::schemas::gatekeeper;
|
use syndicate::schemas::gatekeeper;
|
||||||
let key = vec![0; 16];
|
let key = vec![0; 16];
|
||||||
let sr = sturdy::SturdyRef::mint(_Any::new("syndicate"), &key);
|
let sr = sturdy::SturdyRef::mint(AnyValue::new("syndicate"), &key);
|
||||||
tracing::info!(rootcap = debug(&_Any::from(&sr)));
|
tracing::info!(rootcap = debug(&AnyValue::from(&sr)));
|
||||||
tracing::info!(rootcap = display(sr.to_hex()));
|
tracing::info!(rootcap = display(sr.to_hex()));
|
||||||
ds.assert(t, &gatekeeper::Bind { oid: sr.oid.clone(), key, target: ds.clone() });
|
ds.assert(t, &gatekeeper::Bind { oid: sr.oid.clone(), key, target: ds.clone() });
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if config.debt_reporter {
|
||||||
|
services::debt_reporter::spawn(t);
|
||||||
|
}
|
||||||
|
|
||||||
if config.inferior {
|
if config.inferior {
|
||||||
let ds = Arc::clone(&ds);
|
services::stdio_relay_listener::spawn(t, Arc::clone(&ds));
|
||||||
Actor::new().boot(syndicate::name!("parent"), move |t| run_io_relay(
|
|
||||||
t,
|
|
||||||
relay::Input::Bytes(Box::pin(tokio::io::stdin())),
|
|
||||||
relay::Output::Bytes(Box::pin(tokio::io::stdout())),
|
|
||||||
ds));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let gateway = Cap::guard(&t.create(
|
let gateway = Cap::guard(&t.create(
|
||||||
syndicate::entity(Arc::clone(&ds)).on_asserted(handle_resolve)));
|
syndicate::entity(Arc::clone(&ds)).on_asserted(gatekeeper::handle_resolve)));
|
||||||
|
|
||||||
for port in config.ports.clone() {
|
for port in config.ports.clone() {
|
||||||
let gateway = Arc::clone(&gateway);
|
services::tcp_relay_listener::spawn(t, Arc::clone(&gateway), port);
|
||||||
Actor::new().boot(
|
|
||||||
syndicate::name!("tcp", port),
|
|
||||||
move |t| Ok(t.linked_task(syndicate::name!("listener"),
|
|
||||||
run_tcp_listener(gateway, port))));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for path in config.sockets.clone() {
|
for path in config.sockets.clone() {
|
||||||
let gateway = Arc::clone(&gateway);
|
services::unix_relay_listener::spawn(t, Arc::clone(&gateway), path);
|
||||||
Actor::new().boot(
|
|
||||||
syndicate::name!("unix", socket = debug(path.to_str().expect("representable UnixListener path"))),
|
|
||||||
move |t| Ok(t.linked_task(syndicate::name!("listener"),
|
|
||||||
run_unix_listener(gateway, path))));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -136,213 +95,3 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
//---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
fn message_error<E: std::fmt::Display>(e: E) -> Error {
|
|
||||||
error(&e.to_string(), _Any::new(false))
|
|
||||||
}
|
|
||||||
|
|
||||||
fn extract_binary_packets(
|
|
||||||
r: Result<Message, tungstenite::Error>,
|
|
||||||
) -> Result<Option<Vec<u8>>, Error> {
|
|
||||||
match r {
|
|
||||||
Ok(m) => match m {
|
|
||||||
Message::Text(_) =>
|
|
||||||
Err("Text websocket frames are not accepted")?,
|
|
||||||
Message::Binary(bs) =>
|
|
||||||
Ok(Some(bs)),
|
|
||||||
Message::Ping(_) =>
|
|
||||||
Ok(None), // pings are handled by tungstenite before we see them
|
|
||||||
Message::Pong(_) =>
|
|
||||||
Ok(None), // unsolicited pongs are to be ignored
|
|
||||||
Message::Close(_) =>
|
|
||||||
Ok(None), // we're about to see the end of the stream, so ignore this
|
|
||||||
},
|
|
||||||
Err(e) => Err(message_error(e)),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[doc(hidden)]
|
|
||||||
struct ExitListener;
|
|
||||||
|
|
||||||
impl Entity<()> for ExitListener {
|
|
||||||
fn exit_hook(&mut self, _t: &mut Activation, exit_status: &Arc<ActorResult>) -> ActorResult {
|
|
||||||
tracing::info!(exit_status = debug(exit_status), "disconnect");
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn run_io_relay(
|
|
||||||
t: &mut Activation,
|
|
||||||
i: relay::Input,
|
|
||||||
o: relay::Output,
|
|
||||||
initial_ref: Arc<Cap>,
|
|
||||||
) -> ActorResult {
|
|
||||||
let exit_listener = t.create(ExitListener);
|
|
||||||
t.state.add_exit_hook(&exit_listener);
|
|
||||||
relay::TunnelRelay::run(t, i, o, Some(initial_ref), None);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn run_connection(
|
|
||||||
facet: FacetRef,
|
|
||||||
i: relay::Input,
|
|
||||||
o: relay::Output,
|
|
||||||
initial_ref: Arc<Cap>,
|
|
||||||
) -> ActorResult {
|
|
||||||
facet.activate(Account::new(syndicate::name!("start-session")),
|
|
||||||
|t| run_io_relay(t, i, o, initial_ref))
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn detect_protocol(
|
|
||||||
facet: FacetRef,
|
|
||||||
stream: TcpStream,
|
|
||||||
gateway: Arc<Cap>,
|
|
||||||
addr: std::net::SocketAddr,
|
|
||||||
) -> ActorResult {
|
|
||||||
let (i, o) = {
|
|
||||||
let mut buf = [0; 1]; // peek at the first byte to see what kind of connection to expect
|
|
||||||
match stream.peek(&mut buf).await? {
|
|
||||||
1 => match buf[0] {
|
|
||||||
b'G' /* ASCII 'G' for "GET" */ => {
|
|
||||||
tracing::info!(protocol = display("websocket"), peer = debug(addr));
|
|
||||||
let s = tokio_tungstenite::accept_async(stream).await
|
|
||||||
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
|
|
||||||
let (o, i) = s.split();
|
|
||||||
let i = i.filter_map(|r| ready(extract_binary_packets(r).transpose()));
|
|
||||||
let o = o.sink_map_err(message_error).with(|bs| ready(Ok(Message::Binary(bs))));
|
|
||||||
(relay::Input::Packets(Box::pin(i)), relay::Output::Packets(Box::pin(o)))
|
|
||||||
},
|
|
||||||
_ => {
|
|
||||||
tracing::info!(protocol = display("raw"), peer = debug(addr));
|
|
||||||
let (i, o) = stream.into_split();
|
|
||||||
(relay::Input::Bytes(Box::pin(i)),
|
|
||||||
relay::Output::Bytes(Box::pin(o /* BufWriter::new(o) */)))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
0 => Err(error("closed before starting", _Any::new(false)))?,
|
|
||||||
_ => unreachable!()
|
|
||||||
}
|
|
||||||
};
|
|
||||||
run_connection(facet, i, o, gateway)
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn run_tcp_listener(
|
|
||||||
gateway: Arc<Cap>,
|
|
||||||
port: u16,
|
|
||||||
) -> ActorResult {
|
|
||||||
let listen_addr = format!("0.0.0.0:{}", port);
|
|
||||||
tracing::info!("Listening on {}", listen_addr);
|
|
||||||
let listener = TcpListener::bind(listen_addr).await?;
|
|
||||||
loop {
|
|
||||||
let (stream, addr) = listener.accept().await?;
|
|
||||||
let gateway = Arc::clone(&gateway);
|
|
||||||
let ac = Actor::new();
|
|
||||||
ac.boot(syndicate::name!(parent: None, "tcp"),
|
|
||||||
move |t| Ok(t.linked_task(
|
|
||||||
tracing::Span::current(),
|
|
||||||
detect_protocol(t.facet.clone(), stream, gateway, addr))));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn run_unix_listener(
|
|
||||||
gateway: Arc<Cap>,
|
|
||||||
path: PathBuf,
|
|
||||||
) -> ActorResult {
|
|
||||||
let path_str = path.to_str().expect("representable UnixListener path");
|
|
||||||
tracing::info!("Listening on {:?}", path_str);
|
|
||||||
let listener = bind_unix_listener(&path).await?;
|
|
||||||
loop {
|
|
||||||
let (stream, _addr) = listener.accept().await?;
|
|
||||||
let peer = stream.peer_cred()?;
|
|
||||||
let gateway = Arc::clone(&gateway);
|
|
||||||
Actor::new().boot(
|
|
||||||
syndicate::name!(parent: None, "unix", pid = debug(peer.pid().unwrap_or(-1)), uid = peer.uid()),
|
|
||||||
|t| Ok(t.linked_task(
|
|
||||||
tracing::Span::current(),
|
|
||||||
{
|
|
||||||
let facet = t.facet.clone();
|
|
||||||
async move {
|
|
||||||
tracing::info!(protocol = display("unix"));
|
|
||||||
let (i, o) = stream.into_split();
|
|
||||||
run_connection(facet,
|
|
||||||
relay::Input::Bytes(Box::pin(i)),
|
|
||||||
relay::Output::Bytes(Box::pin(o)),
|
|
||||||
gateway)
|
|
||||||
}
|
|
||||||
})));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn bind_unix_listener(path: &PathBuf) -> Result<UnixListener, Error> {
|
|
||||||
match UnixListener::bind(path) {
|
|
||||||
Ok(s) => Ok(s),
|
|
||||||
Err(e) if e.kind() == io::ErrorKind::AddrInUse => {
|
|
||||||
// Potentially-stale socket file sitting around. Try
|
|
||||||
// connecting to it to see if it is alive, and remove it
|
|
||||||
// if not.
|
|
||||||
match UnixStream::connect(path).await {
|
|
||||||
Ok(_probe) => Err(e)?, // Someone's already there! Give up.
|
|
||||||
Err(f) if f.kind() == io::ErrorKind::ConnectionRefused => {
|
|
||||||
// Try to steal the socket.
|
|
||||||
tracing::info!("Cleaning stale socket");
|
|
||||||
std::fs::remove_file(path)?;
|
|
||||||
Ok(UnixListener::bind(path)?)
|
|
||||||
}
|
|
||||||
Err(f) => {
|
|
||||||
tracing::error!(error = debug(f),
|
|
||||||
"Problem while probing potentially-stale socket");
|
|
||||||
return Err(e)? // signal the *original* error, not the probe error
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Err(e) => Err(e)?,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
//---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
fn handle_resolve(
|
|
||||||
ds: &mut Arc<Cap>,
|
|
||||||
t: &mut Activation,
|
|
||||||
a: gatekeeper::Resolve,
|
|
||||||
) -> DuringResult<Arc<Cap>> {
|
|
||||||
use syndicate::schemas::dataspace;
|
|
||||||
|
|
||||||
let gatekeeper::Resolve { sturdyref, observer } = a;
|
|
||||||
let queried_oid = sturdyref.oid.clone();
|
|
||||||
let handler = syndicate::entity(observer)
|
|
||||||
.on_asserted(move |observer, t, a: _Any| {
|
|
||||||
let bindings = a.value().to_sequence()?;
|
|
||||||
let key = bindings[0].value().to_bytestring()?;
|
|
||||||
let unattenuated_target = bindings[1].value().to_embedded()?;
|
|
||||||
match sturdyref.validate_and_attenuate(key, unattenuated_target) {
|
|
||||||
Err(e) => {
|
|
||||||
tracing::warn!(sturdyref = debug(&_Any::from(&sturdyref)),
|
|
||||||
"sturdyref failed validation: {}", e);
|
|
||||||
Ok(None)
|
|
||||||
},
|
|
||||||
Ok(target) => {
|
|
||||||
tracing::trace!(sturdyref = debug(&_Any::from(&sturdyref)),
|
|
||||||
target = debug(&target),
|
|
||||||
"sturdyref resolved");
|
|
||||||
if let Some(h) = observer.assert(t, _Any::domain(target)) {
|
|
||||||
Ok(Some(Box::new(move |_observer, t| Ok(t.retract(h)))))
|
|
||||||
} else {
|
|
||||||
Ok(None)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.create_cap(t);
|
|
||||||
if let Some(oh) = ds.assert(t, &dataspace::Observe {
|
|
||||||
// TODO: codegen plugin to generate pattern constructors
|
|
||||||
pattern: syndicate_macros::pattern!("<bind =queried_oid $ $>"),
|
|
||||||
observer: handler,
|
|
||||||
}) {
|
|
||||||
Ok(Some(Box::new(move |_ds, t| Ok(t.retract(oh)))))
|
|
||||||
} else {
|
|
||||||
Ok(None)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -0,0 +1,104 @@
|
||||||
|
use futures::SinkExt;
|
||||||
|
use futures::StreamExt;
|
||||||
|
|
||||||
|
use std::future::ready;
|
||||||
|
use std::io;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use syndicate::actor::*;
|
||||||
|
use syndicate::error::Error;
|
||||||
|
use syndicate::error::error;
|
||||||
|
use syndicate::relay;
|
||||||
|
use syndicate::value::NestedValue;
|
||||||
|
|
||||||
|
use tokio::net::TcpStream;
|
||||||
|
|
||||||
|
use tungstenite::Message;
|
||||||
|
|
||||||
|
struct ExitListener;
|
||||||
|
|
||||||
|
impl Entity<()> for ExitListener {
|
||||||
|
fn exit_hook(&mut self, _t: &mut Activation, exit_status: &Arc<ActorResult>) -> ActorResult {
|
||||||
|
tracing::info!(exit_status = debug(exit_status), "disconnect");
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn run_io_relay(
|
||||||
|
t: &mut Activation,
|
||||||
|
i: relay::Input,
|
||||||
|
o: relay::Output,
|
||||||
|
initial_ref: Arc<Cap>,
|
||||||
|
) -> ActorResult {
|
||||||
|
let exit_listener = t.create(ExitListener);
|
||||||
|
t.state.add_exit_hook(&exit_listener);
|
||||||
|
relay::TunnelRelay::run(t, i, o, Some(initial_ref), None);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn run_connection(
|
||||||
|
facet: FacetRef,
|
||||||
|
i: relay::Input,
|
||||||
|
o: relay::Output,
|
||||||
|
initial_ref: Arc<Cap>,
|
||||||
|
) -> ActorResult {
|
||||||
|
facet.activate(Account::new(syndicate::name!("start-session")),
|
||||||
|
|t| run_io_relay(t, i, o, initial_ref))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn detect_protocol(
|
||||||
|
facet: FacetRef,
|
||||||
|
stream: TcpStream,
|
||||||
|
gateway: Arc<Cap>,
|
||||||
|
addr: std::net::SocketAddr,
|
||||||
|
) -> ActorResult {
|
||||||
|
let (i, o) = {
|
||||||
|
let mut buf = [0; 1]; // peek at the first byte to see what kind of connection to expect
|
||||||
|
match stream.peek(&mut buf).await? {
|
||||||
|
1 => match buf[0] {
|
||||||
|
b'G' /* ASCII 'G' for "GET" */ => {
|
||||||
|
tracing::info!(protocol = display("websocket"), peer = debug(addr));
|
||||||
|
let s = tokio_tungstenite::accept_async(stream).await
|
||||||
|
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
|
||||||
|
let (o, i) = s.split();
|
||||||
|
let i = i.filter_map(|r| ready(extract_binary_packets(r).transpose()));
|
||||||
|
let o = o.sink_map_err(message_error).with(|bs| ready(Ok(Message::Binary(bs))));
|
||||||
|
(relay::Input::Packets(Box::pin(i)), relay::Output::Packets(Box::pin(o)))
|
||||||
|
},
|
||||||
|
_ => {
|
||||||
|
tracing::info!(protocol = display("raw"), peer = debug(addr));
|
||||||
|
let (i, o) = stream.into_split();
|
||||||
|
(relay::Input::Bytes(Box::pin(i)),
|
||||||
|
relay::Output::Bytes(Box::pin(o /* BufWriter::new(o) */)))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
0 => Err(error("closed before starting", AnyValue::new(false)))?,
|
||||||
|
_ => unreachable!()
|
||||||
|
}
|
||||||
|
};
|
||||||
|
run_connection(facet, i, o, gateway)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn message_error<E: std::fmt::Display>(e: E) -> Error {
|
||||||
|
error(&e.to_string(), AnyValue::new(false))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn extract_binary_packets(
|
||||||
|
r: Result<Message, tungstenite::Error>,
|
||||||
|
) -> Result<Option<Vec<u8>>, Error> {
|
||||||
|
match r {
|
||||||
|
Ok(m) => match m {
|
||||||
|
Message::Text(_) =>
|
||||||
|
Err("Text websocket frames are not accepted")?,
|
||||||
|
Message::Binary(bs) =>
|
||||||
|
Ok(Some(bs)),
|
||||||
|
Message::Ping(_) =>
|
||||||
|
Ok(None), // pings are handled by tungstenite before we see them
|
||||||
|
Message::Pong(_) =>
|
||||||
|
Ok(None), // unsolicited pongs are to be ignored
|
||||||
|
Message::Close(_) =>
|
||||||
|
Ok(None), // we're about to see the end of the stream, so ignore this
|
||||||
|
},
|
||||||
|
Err(e) => Err(message_error(e)),
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,18 @@
|
||||||
|
use syndicate::actor::*;
|
||||||
|
|
||||||
|
pub fn spawn(t: &mut Activation) {
|
||||||
|
t.spawn(syndicate::name!("debt-reporter"), |t| {
|
||||||
|
t.linked_task(syndicate::name!("tick"), async {
|
||||||
|
let mut timer = tokio::time::interval(core::time::Duration::from_secs(1));
|
||||||
|
loop {
|
||||||
|
timer.tick().await;
|
||||||
|
for (id, (name, debt)) in syndicate::actor::ACCOUNTS.read().unwrap().iter() {
|
||||||
|
let _enter = name.enter();
|
||||||
|
tracing::info!(id, debt = debug(
|
||||||
|
debt.load(std::sync::atomic::Ordering::Relaxed)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
Ok(())
|
||||||
|
});
|
||||||
|
}
|
|
@ -0,0 +1,4 @@
|
||||||
|
pub mod debt_reporter;
|
||||||
|
pub mod stdio_relay_listener;
|
||||||
|
pub mod tcp_relay_listener;
|
||||||
|
pub mod unix_relay_listener;
|
|
@ -0,0 +1,14 @@
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use syndicate::actor::*;
|
||||||
|
use syndicate::relay;
|
||||||
|
|
||||||
|
use crate::protocol::run_io_relay;
|
||||||
|
|
||||||
|
pub fn spawn(t: &mut Activation, ds: Arc<Cap>) {
|
||||||
|
t.spawn(syndicate::name!("parent"), move |t| run_io_relay(
|
||||||
|
t,
|
||||||
|
relay::Input::Bytes(Box::pin(tokio::io::stdin())),
|
||||||
|
relay::Output::Bytes(Box::pin(tokio::io::stdout())),
|
||||||
|
ds))
|
||||||
|
}
|
|
@ -0,0 +1,29 @@
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use syndicate::actor::*;
|
||||||
|
|
||||||
|
use tokio::net::TcpListener;
|
||||||
|
|
||||||
|
use crate::protocol::detect_protocol;
|
||||||
|
|
||||||
|
pub fn spawn(t: &mut Activation, gateway: Arc<Cap>, port: u16) {
|
||||||
|
t.spawn(syndicate::name!("tcp", port),
|
||||||
|
move |t| Ok(t.linked_task(syndicate::name!("listener"), run(gateway, port))))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn run(
|
||||||
|
gateway: Arc<Cap>,
|
||||||
|
port: u16,
|
||||||
|
) -> ActorResult {
|
||||||
|
let listen_addr = format!("0.0.0.0:{}", port);
|
||||||
|
tracing::info!("Listening on {}", listen_addr);
|
||||||
|
let listener = TcpListener::bind(listen_addr).await?;
|
||||||
|
loop {
|
||||||
|
let (stream, addr) = listener.accept().await?;
|
||||||
|
let gateway = Arc::clone(&gateway);
|
||||||
|
Actor::new().boot(syndicate::name!(parent: None, "tcp"),
|
||||||
|
move |t| Ok(t.linked_task(
|
||||||
|
tracing::Span::current(),
|
||||||
|
detect_protocol(t.facet.clone(), stream, gateway, addr))));
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,72 @@
|
||||||
|
use std::io;
|
||||||
|
use std::path::PathBuf;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use syndicate::actor::*;
|
||||||
|
use syndicate::error::Error;
|
||||||
|
use syndicate::relay;
|
||||||
|
|
||||||
|
use tokio::net::UnixListener;
|
||||||
|
use tokio::net::UnixStream;
|
||||||
|
|
||||||
|
use crate::protocol::run_connection;
|
||||||
|
|
||||||
|
pub fn spawn(t: &mut Activation, gateway: Arc<Cap>, path: PathBuf) {
|
||||||
|
t.spawn(syndicate::name!("unix", socket = debug(path.to_str().expect("representable UnixListener path"))),
|
||||||
|
|t| Ok(t.linked_task(syndicate::name!("listener!"), run(gateway, path))))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn run(
|
||||||
|
gateway: Arc<Cap>,
|
||||||
|
path: PathBuf,
|
||||||
|
) -> ActorResult {
|
||||||
|
let path_str = path.to_str().expect("representable UnixListener path");
|
||||||
|
tracing::info!("Listening on {:?}", path_str);
|
||||||
|
let listener = bind_unix_listener(&path).await?;
|
||||||
|
loop {
|
||||||
|
let (stream, _addr) = listener.accept().await?;
|
||||||
|
let peer = stream.peer_cred()?;
|
||||||
|
let gateway = Arc::clone(&gateway);
|
||||||
|
Actor::new().boot(
|
||||||
|
syndicate::name!(parent: None, "unix", pid = debug(peer.pid().unwrap_or(-1)), uid = peer.uid()),
|
||||||
|
|t| Ok(t.linked_task(
|
||||||
|
tracing::Span::current(),
|
||||||
|
{
|
||||||
|
let facet = t.facet.clone();
|
||||||
|
async move {
|
||||||
|
tracing::info!(protocol = display("unix"));
|
||||||
|
let (i, o) = stream.into_split();
|
||||||
|
run_connection(facet,
|
||||||
|
relay::Input::Bytes(Box::pin(i)),
|
||||||
|
relay::Output::Bytes(Box::pin(o)),
|
||||||
|
gateway)
|
||||||
|
}
|
||||||
|
})));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn bind_unix_listener(path: &PathBuf) -> Result<UnixListener, Error> {
|
||||||
|
match UnixListener::bind(path) {
|
||||||
|
Ok(s) => Ok(s),
|
||||||
|
Err(e) if e.kind() == io::ErrorKind::AddrInUse => {
|
||||||
|
// Potentially-stale socket file sitting around. Try
|
||||||
|
// connecting to it to see if it is alive, and remove it
|
||||||
|
// if not.
|
||||||
|
match UnixStream::connect(path).await {
|
||||||
|
Ok(_probe) => Err(e)?, // Someone's already there! Give up.
|
||||||
|
Err(f) if f.kind() == io::ErrorKind::ConnectionRefused => {
|
||||||
|
// Try to steal the socket.
|
||||||
|
tracing::info!("Cleaning stale socket");
|
||||||
|
std::fs::remove_file(path)?;
|
||||||
|
Ok(UnixListener::bind(path)?)
|
||||||
|
}
|
||||||
|
Err(f) => {
|
||||||
|
tracing::error!(error = debug(f),
|
||||||
|
"Problem while probing potentially-stale socket");
|
||||||
|
return Err(e)? // signal the *original* error, not the probe error
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(e) => Err(e)?,
|
||||||
|
}
|
||||||
|
}
|
|
@ -765,6 +765,23 @@ impl<'activation> Activation<'activation> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn enqueue_for_myself_at_commit(&mut self, action: Action) {
|
||||||
|
let mailbox = self.state.mailbox();
|
||||||
|
self.pending.queue_for_mailbox(&mailbox).push(action);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Schedule the creation of a new actor when the Activation commits.
|
||||||
|
pub fn spawn<F: 'static + Send + FnOnce(&mut Activation) -> ActorResult>(
|
||||||
|
&mut self,
|
||||||
|
name: tracing::Span,
|
||||||
|
boot: F,
|
||||||
|
) {
|
||||||
|
self.enqueue_for_myself_at_commit(Box::new(move |_| {
|
||||||
|
Actor::new().boot(name, boot);
|
||||||
|
Ok(())
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
/// Create a new subfacet of the currently-active facet. Runs `boot` in the new facet's
|
/// Create a new subfacet of the currently-active facet. Runs `boot` in the new facet's
|
||||||
/// context. If `boot` returns leaving the new facet [inert][Facet#inert-facets],
|
/// context. If `boot` returns leaving the new facet [inert][Facet#inert-facets],
|
||||||
pub fn facet<F: 'static + Send + FnOnce(&mut Activation) -> ActorResult>(
|
pub fn facet<F: 'static + Send + FnOnce(&mut Activation) -> ActorResult>(
|
||||||
|
@ -810,18 +827,16 @@ impl<'activation> Activation<'activation> {
|
||||||
/// yet, none of the shutdown handlers yields an error, and the facet's parent facet is
|
/// yet, none of the shutdown handlers yields an error, and the facet's parent facet is
|
||||||
/// alive, executes `continuation` in the parent facet's context.
|
/// alive, executes `continuation` in the parent facet's context.
|
||||||
pub fn stop_facet(&mut self, facet_id: FacetId, continuation: Option<Action>) {
|
pub fn stop_facet(&mut self, facet_id: FacetId, continuation: Option<Action>) {
|
||||||
let mailbox = self.state.mailbox();
|
|
||||||
let maybe_parent_id = self.active_facet().and_then(|f| f.parent_facet_id);
|
let maybe_parent_id = self.active_facet().and_then(|f| f.parent_facet_id);
|
||||||
self.pending.queue_for_mailbox(&mailbox).push(Box::new(
|
self.enqueue_for_myself_at_commit(Box::new(move |t| {
|
||||||
move |t| {
|
t._terminate_facet(facet_id, true)?;
|
||||||
t._terminate_facet(facet_id, true)?;
|
if let Some(k) = continuation {
|
||||||
if let Some(k) = continuation {
|
if let Some(parent_id) = maybe_parent_id {
|
||||||
if let Some(parent_id) = maybe_parent_id {
|
t.with_facet(true, parent_id, k)?;
|
||||||
t.with_facet(true, parent_id, k)?;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
Ok(())
|
}
|
||||||
}));
|
Ok(())
|
||||||
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Arranges for the active facet to be stopped cleanly when `self` commits.
|
/// Arranges for the active facet to be stopped cleanly when `self` commits.
|
||||||
|
|
Loading…
Reference in New Issue