203 lines
8.2 KiB
Rust
203 lines
8.2 KiB
Rust
use preserves_schema::Codec;
|
|
|
|
use std::path::PathBuf;
|
|
use std::sync::Arc;
|
|
|
|
use structopt::StructOpt;
|
|
|
|
use syndicate::actor::*;
|
|
use syndicate::dataspace::*;
|
|
use syndicate::enclose;
|
|
use syndicate::relay;
|
|
use syndicate::schemas::service;
|
|
use syndicate::schemas::transport_address;
|
|
|
|
use syndicate::value::Map;
|
|
use syndicate::value::NestedValue;
|
|
|
|
mod counter;
|
|
mod dependencies;
|
|
mod gatekeeper;
|
|
mod language;
|
|
mod lifecycle;
|
|
mod protocol;
|
|
mod script;
|
|
mod services;
|
|
|
|
mod schemas {
|
|
include!(concat!(env!("OUT_DIR"), "/src/schemas/mod.rs"));
|
|
}
|
|
|
|
use language::language;
|
|
use schemas::internal_services;
|
|
|
|
#[derive(Clone, StructOpt)]
|
|
struct ServerConfig {
|
|
#[structopt(short = "p", long = "port")]
|
|
ports: Vec<u16>,
|
|
|
|
#[structopt(short = "s", long = "socket")]
|
|
sockets: Vec<PathBuf>,
|
|
|
|
#[structopt(long)]
|
|
inferior: bool,
|
|
|
|
#[structopt(long)]
|
|
debt_reporter: bool,
|
|
|
|
#[structopt(short = "c", long)]
|
|
config: Vec<PathBuf>,
|
|
|
|
#[structopt(long)]
|
|
no_banner: bool,
|
|
}
|
|
|
|
#[tokio::main]
|
|
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|
let config = Arc::new(ServerConfig::from_args());
|
|
|
|
syndicate::convenient_logging()?;
|
|
|
|
if !config.no_banner && !config.inferior {
|
|
const BRIGHT_GREEN: &str = "\x1b[92m";
|
|
const RED: &str = "\x1b[31m";
|
|
const GREEN: &str = "\x1b[32m";
|
|
const NORMAL: &str = "\x1b[0m";
|
|
const BRIGHT_YELLOW: &str = "\x1b[93m";
|
|
|
|
eprintln!(r"{} ______ {}", GREEN, NORMAL);
|
|
eprintln!(r"{} / {}\_{}\{} ", GREEN, BRIGHT_GREEN, GREEN, NORMAL);
|
|
eprintln!(r"{} / {},{}__/{} \ {} ____ __", GREEN, RED, BRIGHT_GREEN, GREEN, NORMAL);
|
|
eprintln!(r"{} /{}\__/ \{},{} \{} _______ ______ ____/ /_/________ / /____", GREEN, BRIGHT_GREEN, RED, GREEN, NORMAL);
|
|
eprintln!(r"{} \{}/ \__/ {}/{} / ___/ / / / __ \/ __ / / ___/ __ \/ __/ _ \", GREEN, BRIGHT_GREEN, GREEN, NORMAL);
|
|
eprintln!(r"{} \ {}'{} \__{}/ {} _\_ \/ /_/ / / / / /_/ / / /__/ /_/ / /_/ __/", GREEN, RED, BRIGHT_GREEN, GREEN, NORMAL);
|
|
eprintln!(r"{} \____{}/{}_/ {} /____/\__, /_/ /_/\____/_/\___/\__/_/\__/\___/", GREEN, BRIGHT_GREEN, GREEN, NORMAL);
|
|
eprintln!(r" /____/");
|
|
eprintln!(r"");
|
|
eprintln!(r" {}version {}{}", BRIGHT_YELLOW, env!("CARGO_PKG_VERSION"), NORMAL);
|
|
eprintln!(r"");
|
|
eprintln!(r" documentation & reference material: https://syndicate-lang.org/");
|
|
eprintln!(r" source code & bugs: https://git.syndicate-lang.org/syndicate-lang/syndicate-rs");
|
|
eprintln!(r"");
|
|
}
|
|
|
|
tracing::trace!("startup");
|
|
|
|
Actor::new().boot(tracing::Span::current(), move |t| {
|
|
let server_config_ds = Cap::new(&t.create(Dataspace::new()));
|
|
let log_ds = Cap::new(&t.create(Dataspace::new()));
|
|
|
|
if config.inferior {
|
|
tracing::info!("inferior server instance");
|
|
t.spawn(syndicate::name!("parent"), enclose!((server_config_ds) move |t| {
|
|
protocol::run_io_relay(t,
|
|
relay::Input::Bytes(Box::pin(tokio::io::stdin())),
|
|
relay::Output::Bytes(Box::pin(tokio::io::stdout())),
|
|
server_config_ds)
|
|
}));
|
|
}
|
|
|
|
let gatekeeper = Cap::guard(Arc::clone(&language().syndicate), t.create(
|
|
syndicate::entity(Arc::clone(&server_config_ds))
|
|
.on_asserted(gatekeeper::handle_resolve)));
|
|
|
|
let mut env = Map::new();
|
|
env.insert("config".to_owned(), AnyValue::domain(Arc::clone(&server_config_ds)));
|
|
env.insert("log".to_owned(), AnyValue::domain(Arc::clone(&log_ds)));
|
|
env.insert("gatekeeper".to_owned(), AnyValue::domain(Arc::clone(&gatekeeper)));
|
|
|
|
dependencies::boot(t, Arc::clone(&server_config_ds));
|
|
services::config_watcher::on_demand(t, Arc::clone(&server_config_ds));
|
|
services::daemon::on_demand(t, Arc::clone(&server_config_ds), Arc::clone(&log_ds));
|
|
services::debt_reporter::on_demand(t, Arc::clone(&server_config_ds));
|
|
services::milestone::on_demand(t, Arc::clone(&server_config_ds));
|
|
services::tcp_relay_listener::on_demand(t, Arc::clone(&server_config_ds));
|
|
services::unix_relay_listener::on_demand(t, Arc::clone(&server_config_ds));
|
|
|
|
if config.debt_reporter {
|
|
server_config_ds.assert(t, language(), &service::RunService {
|
|
service_name: language().unparse(&internal_services::DebtReporter),
|
|
});
|
|
}
|
|
|
|
for port in config.ports.clone() {
|
|
server_config_ds.assert(t, language(), &service::RunService {
|
|
service_name: language().unparse(&internal_services::TcpRelayListener {
|
|
addr: transport_address::Tcp {
|
|
host: "0.0.0.0".to_owned(),
|
|
port: (port as i32).into(),
|
|
},
|
|
gatekeeper: gatekeeper.clone(),
|
|
}),
|
|
});
|
|
}
|
|
|
|
for path in config.sockets.clone() {
|
|
server_config_ds.assert(t, language(), &service::RunService {
|
|
service_name: language().unparse(&internal_services::UnixRelayListener {
|
|
addr: transport_address::Unix {
|
|
path: path.to_str().expect("representable UnixListener path").to_owned(),
|
|
},
|
|
gatekeeper: gatekeeper.clone(),
|
|
}),
|
|
});
|
|
}
|
|
|
|
for path in config.config.clone() {
|
|
server_config_ds.assert(t, language(), &service::RunService {
|
|
service_name: language().unparse(&internal_services::ConfigWatcher {
|
|
path: path.to_str().expect("representable ConfigWatcher path").to_owned(),
|
|
env: internal_services::ConfigEnv(env.clone()),
|
|
}),
|
|
});
|
|
}
|
|
|
|
t.spawn(tracing::Span::current(), enclose!((log_ds) move |t| {
|
|
let n_unknown: AnyValue = AnyValue::symbol("-");
|
|
let n_pid: AnyValue = AnyValue::symbol("pid");
|
|
let n_line: AnyValue = AnyValue::symbol("line");
|
|
let n_service: AnyValue = AnyValue::symbol("service");
|
|
let n_stream: AnyValue = AnyValue::symbol("stream");
|
|
let e = syndicate::during::entity(())
|
|
.on_message(move |(), _t, captures: AnyValue| {
|
|
if let Some(captures) = captures.value_owned().into_sequence() {
|
|
let mut captures = captures.into_iter();
|
|
let timestamp = captures.next()
|
|
.and_then(|t| t.value_owned().into_string())
|
|
.unwrap_or_else(|| "-".to_owned());
|
|
if let Some(mut d) = captures.next()
|
|
.and_then(|d| d.value_owned().into_dictionary())
|
|
{
|
|
let pid = d.remove(&n_pid).unwrap_or_else(|| n_unknown.clone());
|
|
let service = d.remove(&n_service).unwrap_or_else(|| n_unknown.clone());
|
|
let line = d.remove(&n_line).unwrap_or_else(|| n_unknown.clone());
|
|
let stream = d.remove(&n_stream).unwrap_or_else(|| n_unknown.clone());
|
|
let message = format!("{} {:?}[{:?}] {:?}: {:?}",
|
|
timestamp,
|
|
service,
|
|
pid,
|
|
stream,
|
|
line);
|
|
if d.is_empty() {
|
|
tracing::info!(target: "", "{}", message);
|
|
} else {
|
|
tracing::info!(target: "", "{} {:?}", message, d);
|
|
}
|
|
}
|
|
}
|
|
Ok(())
|
|
})
|
|
.create_cap(t);
|
|
log_ds.assert(t, language(), &syndicate::schemas::dataspace::Observe {
|
|
pattern: syndicate_macros::pattern!(<log $ $>),
|
|
observer: e,
|
|
});
|
|
Ok(())
|
|
}));
|
|
|
|
Ok(())
|
|
}).await??;
|
|
|
|
Ok(())
|
|
}
|