Unix socket listener
This commit is contained in:
parent
107a04f4c9
commit
46d6d80b42
|
@ -6,7 +6,9 @@ use preserves::value::NestedValue;
|
||||||
use preserves::value::Value;
|
use preserves::value::Value;
|
||||||
|
|
||||||
use std::future::ready;
|
use std::future::ready;
|
||||||
|
use std::io;
|
||||||
use std::iter::FromIterator;
|
use std::iter::FromIterator;
|
||||||
|
use std::path::PathBuf;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use structopt::StructOpt; // for from_args in main
|
use structopt::StructOpt; // for from_args in main
|
||||||
|
@ -24,6 +26,8 @@ use syndicate::sturdy;
|
||||||
|
|
||||||
use tokio::net::TcpListener;
|
use tokio::net::TcpListener;
|
||||||
use tokio::net::TcpStream;
|
use tokio::net::TcpStream;
|
||||||
|
use tokio::net::UnixListener;
|
||||||
|
use tokio::net::UnixStream;
|
||||||
|
|
||||||
use tungstenite::Message;
|
use tungstenite::Message;
|
||||||
|
|
||||||
|
@ -32,6 +36,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
syndicate::convenient_logging()?;
|
syndicate::convenient_logging()?;
|
||||||
syndicate::actor::start_debt_reporter();
|
syndicate::actor::start_debt_reporter();
|
||||||
|
|
||||||
|
let config = Arc::new(config::ServerConfig::from_args());
|
||||||
|
|
||||||
{
|
{
|
||||||
const BRIGHT_GREEN: &str = "\x1b[92m";
|
const BRIGHT_GREEN: &str = "\x1b[92m";
|
||||||
const RED: &str = "\x1b[31m";
|
const RED: &str = "\x1b[31m";
|
||||||
|
@ -65,8 +71,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
tracing::info!(r"");
|
tracing::info!(r"");
|
||||||
}
|
}
|
||||||
|
|
||||||
let config = Arc::new(config::ServerConfig::from_args());
|
|
||||||
|
|
||||||
let mut daemons = Vec::new();
|
let mut daemons = Vec::new();
|
||||||
|
|
||||||
tracing::trace!("startup");
|
tracing::trace!("startup");
|
||||||
|
@ -94,7 +98,16 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
daemons.push(Actor::new().boot(
|
daemons.push(Actor::new().boot(
|
||||||
syndicate::name!("tcp", port),
|
syndicate::name!("tcp", port),
|
||||||
move |t| Ok(t.state.linked_task(syndicate::name!("listener"),
|
move |t| Ok(t.state.linked_task(syndicate::name!("listener"),
|
||||||
run_listener(gateway, port, config)))));
|
run_tcp_listener(gateway, port, config)))));
|
||||||
|
}
|
||||||
|
|
||||||
|
for path in config.sockets.clone() {
|
||||||
|
let gateway = Arc::clone(&gateway);
|
||||||
|
let config = Arc::clone(&config);
|
||||||
|
daemons.push(Actor::new().boot(
|
||||||
|
syndicate::name!("unix", socket = debug(path.to_str().expect("representable UnixListener path"))),
|
||||||
|
move |t| Ok(t.state.linked_task(syndicate::name!("listener"),
|
||||||
|
run_unix_listener(gateway, path, config)))));
|
||||||
}
|
}
|
||||||
|
|
||||||
futures::future::join_all(daemons).await;
|
futures::future::join_all(daemons).await;
|
||||||
|
@ -127,43 +140,23 @@ fn extract_binary_packets(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn run_connection(
|
struct ExitListener;
|
||||||
|
|
||||||
|
impl Entity<()> for ExitListener {
|
||||||
|
fn exit_hook(&mut self, _t: &mut Activation, exit_status: &Arc<ActorResult>) -> ActorResult {
|
||||||
|
tracing::info!(exit_status = debug(exit_status), "disconnect");
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn run_connection(
|
||||||
ac: ActorRef,
|
ac: ActorRef,
|
||||||
stream: TcpStream,
|
i: relay::Input,
|
||||||
|
o: relay::Output,
|
||||||
gateway: Arc<Cap>,
|
gateway: Arc<Cap>,
|
||||||
addr: std::net::SocketAddr,
|
|
||||||
config: Arc<config::ServerConfig>,
|
config: Arc<config::ServerConfig>,
|
||||||
) -> ActorResult {
|
) -> ActorResult {
|
||||||
let mut buf = [0; 1]; // peek at the first byte to see what kind of connection to expect
|
|
||||||
let (i, o) = match stream.peek(&mut buf).await? {
|
|
||||||
1 => match buf[0] {
|
|
||||||
b'G' /* ASCII 'G' for "GET" */ => {
|
|
||||||
tracing::info!(protocol = display("websocket"), peer = debug(addr));
|
|
||||||
let s = tokio_tungstenite::accept_async(stream).await
|
|
||||||
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
|
|
||||||
let (o, i) = s.split();
|
|
||||||
let i = i.filter_map(|r| ready(extract_binary_packets(r).transpose()));
|
|
||||||
let o = o.sink_map_err(message_error).with(|bs| ready(Ok(Message::Binary(bs))));
|
|
||||||
(relay::Input::Packets(Box::pin(i)), relay::Output::Packets(Box::pin(o)))
|
|
||||||
},
|
|
||||||
_ => {
|
|
||||||
tracing::info!(protocol = display("raw"), peer = debug(addr));
|
|
||||||
let (i, o) = stream.into_split();
|
|
||||||
(relay::Input::Bytes(Box::pin(i)),
|
|
||||||
relay::Output::Bytes(Box::pin(o /* BufWriter::new(o) */)))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
0 => Err(error("closed before starting", _Any::new(false)))?,
|
|
||||||
_ => unreachable!()
|
|
||||||
};
|
|
||||||
Activation::for_actor(&ac, Debtor::new(syndicate::name!("start-session")), |t| {
|
Activation::for_actor(&ac, Debtor::new(syndicate::name!("start-session")), |t| {
|
||||||
struct ExitListener;
|
|
||||||
impl Entity<()> for ExitListener {
|
|
||||||
fn exit_hook(&mut self, _t: &mut Activation, exit_status: &Arc<ActorResult>) -> ActorResult {
|
|
||||||
tracing::info!(exit_status = debug(exit_status), "disconnect");
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
let exit_listener = t.state.create(ExitListener);
|
let exit_listener = t.state.create(ExitListener);
|
||||||
t.state.add_exit_hook(&exit_listener);
|
t.state.add_exit_hook(&exit_listener);
|
||||||
relay::TunnelRelay::run(t, i, o, Some(gateway), None);
|
relay::TunnelRelay::run(t, i, o, Some(gateway), None);
|
||||||
|
@ -171,7 +164,41 @@ async fn run_connection(
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn run_listener(
|
async fn detect_protocol(
|
||||||
|
ac: ActorRef,
|
||||||
|
stream: TcpStream,
|
||||||
|
gateway: Arc<Cap>,
|
||||||
|
addr: std::net::SocketAddr,
|
||||||
|
config: Arc<config::ServerConfig>,
|
||||||
|
) -> ActorResult {
|
||||||
|
let (i, o) = {
|
||||||
|
let mut buf = [0; 1]; // peek at the first byte to see what kind of connection to expect
|
||||||
|
match stream.peek(&mut buf).await? {
|
||||||
|
1 => match buf[0] {
|
||||||
|
b'G' /* ASCII 'G' for "GET" */ => {
|
||||||
|
tracing::info!(protocol = display("websocket"), peer = debug(addr));
|
||||||
|
let s = tokio_tungstenite::accept_async(stream).await
|
||||||
|
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
|
||||||
|
let (o, i) = s.split();
|
||||||
|
let i = i.filter_map(|r| ready(extract_binary_packets(r).transpose()));
|
||||||
|
let o = o.sink_map_err(message_error).with(|bs| ready(Ok(Message::Binary(bs))));
|
||||||
|
(relay::Input::Packets(Box::pin(i)), relay::Output::Packets(Box::pin(o)))
|
||||||
|
},
|
||||||
|
_ => {
|
||||||
|
tracing::info!(protocol = display("raw"), peer = debug(addr));
|
||||||
|
let (i, o) = stream.into_split();
|
||||||
|
(relay::Input::Bytes(Box::pin(i)),
|
||||||
|
relay::Output::Bytes(Box::pin(o /* BufWriter::new(o) */)))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
0 => Err(error("closed before starting", _Any::new(false)))?,
|
||||||
|
_ => unreachable!()
|
||||||
|
}
|
||||||
|
};
|
||||||
|
run_connection(ac, i, o, gateway, config)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn run_tcp_listener(
|
||||||
gateway: Arc<Cap>,
|
gateway: Arc<Cap>,
|
||||||
port: u16,
|
port: u16,
|
||||||
config: Arc<config::ServerConfig>,
|
config: Arc<config::ServerConfig>,
|
||||||
|
@ -184,10 +211,67 @@ async fn run_listener(
|
||||||
let gateway = Arc::clone(&gateway);
|
let gateway = Arc::clone(&gateway);
|
||||||
let config = Arc::clone(&config);
|
let config = Arc::clone(&config);
|
||||||
let ac = Actor::new();
|
let ac = Actor::new();
|
||||||
ac.boot(syndicate::name!(parent: None, "connection"),
|
ac.boot(syndicate::name!(parent: None, "tcp"),
|
||||||
move |t| Ok(t.state.linked_task(
|
move |t| Ok(t.state.linked_task(
|
||||||
tracing::Span::current(),
|
tracing::Span::current(),
|
||||||
run_connection(t.actor.clone(), stream, gateway, addr, config))));
|
detect_protocol(t.actor.clone(), stream, gateway, addr, config))));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn run_unix_listener(
|
||||||
|
gateway: Arc<Cap>,
|
||||||
|
path: PathBuf,
|
||||||
|
config: Arc<config::ServerConfig>,
|
||||||
|
) -> ActorResult {
|
||||||
|
let path_str = path.to_str().expect("representable UnixListener path");
|
||||||
|
tracing::info!("Listening on {:?}", path_str);
|
||||||
|
let listener = bind_unix_listener(&path).await?;
|
||||||
|
loop {
|
||||||
|
let (stream, addr) = listener.accept().await?;
|
||||||
|
let gateway = Arc::clone(&gateway);
|
||||||
|
let config = Arc::clone(&config);
|
||||||
|
let ac = Actor::new();
|
||||||
|
ac.boot(syndicate::name!(parent: None, "unix"),
|
||||||
|
move |t| Ok(t.state.linked_task(
|
||||||
|
tracing::Span::current(),
|
||||||
|
{
|
||||||
|
let ac = t.actor.clone();
|
||||||
|
async move {
|
||||||
|
tracing::info!(protocol = display("unix"), peer = debug(addr));
|
||||||
|
let (i, o) = stream.into_split();
|
||||||
|
run_connection(ac,
|
||||||
|
relay::Input::Bytes(Box::pin(i)),
|
||||||
|
relay::Output::Bytes(Box::pin(o)),
|
||||||
|
gateway,
|
||||||
|
config)
|
||||||
|
}
|
||||||
|
})));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn bind_unix_listener(path: &PathBuf) -> Result<UnixListener, Error> {
|
||||||
|
match UnixListener::bind(path) {
|
||||||
|
Ok(s) => Ok(s),
|
||||||
|
Err(e) if e.kind() == io::ErrorKind::AddrInUse => {
|
||||||
|
// Potentially-stale socket file sitting around. Try
|
||||||
|
// connecting to it to see if it is alive, and remove it
|
||||||
|
// if not.
|
||||||
|
match UnixStream::connect(path).await {
|
||||||
|
Ok(_probe) => Err(e)?, // Someone's already there! Give up.
|
||||||
|
Err(f) if f.kind() == io::ErrorKind::ConnectionRefused => {
|
||||||
|
// Try to steal the socket.
|
||||||
|
tracing::info!("Cleaning stale socket");
|
||||||
|
std::fs::remove_file(path)?;
|
||||||
|
Ok(UnixListener::bind(path)?)
|
||||||
|
}
|
||||||
|
Err(f) => {
|
||||||
|
tracing::error!(error = debug(f),
|
||||||
|
"Problem while probing potentially-stale socket");
|
||||||
|
return Err(e)? // signal the *original* error, not the probe error
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(e) => Err(e)?,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,10 +1,15 @@
|
||||||
use structopt::StructOpt;
|
use structopt::StructOpt;
|
||||||
|
|
||||||
|
use std::path::PathBuf;
|
||||||
|
|
||||||
#[derive(Clone, StructOpt)]
|
#[derive(Clone, StructOpt)]
|
||||||
pub struct ServerConfig {
|
pub struct ServerConfig {
|
||||||
#[structopt(short = "p", long = "port", default_value = "8001")]
|
#[structopt(short = "p", long = "port", default_value = "8001")]
|
||||||
pub ports: Vec<u16>,
|
pub ports: Vec<u16>,
|
||||||
|
|
||||||
|
#[structopt(short = "s", long = "socket")]
|
||||||
|
pub sockets: Vec<PathBuf>,
|
||||||
|
|
||||||
#[structopt(long, default_value = "10000")]
|
#[structopt(long, default_value = "10000")]
|
||||||
pub overload_threshold: usize,
|
pub overload_threshold: usize,
|
||||||
#[structopt(long, default_value = "5")]
|
#[structopt(long, default_value = "5")]
|
||||||
|
|
Loading…
Reference in New Issue