diff --git a/syndicate-server/src/main.rs b/syndicate-server/src/main.rs index a974562..b63a224 100644 --- a/syndicate-server/src/main.rs +++ b/syndicate-server/src/main.rs @@ -148,6 +148,8 @@ async fn main() -> ActorResult { services::http_router::on_demand(t, Arc::clone(&server_config_ds)); services::tcp_relay_listener::on_demand(t, Arc::clone(&server_config_ds)); services::unix_relay_listener::on_demand(t, Arc::clone(&server_config_ds)); + resolution::client::start(t, Arc::clone(&server_config_ds)); + resolution::transports::on_demand(t, Arc::clone(&server_config_ds)); if config.debt_reporter { server_config_ds.assert(t, language(), &service::RunService { diff --git a/syndicate-server/src/resolution/client.rs b/syndicate-server/src/resolution/client.rs new file mode 100644 index 0000000..b31ba86 --- /dev/null +++ b/syndicate-server/src/resolution/client.rs @@ -0,0 +1,10 @@ +use std::sync::Arc; + +use syndicate::actor::*; + +use syndicate::schemas::gatekeeper; +use syndicate::schemas::transport_address; + +pub fn start(t: &mut Activation, ds: Arc) { + +} diff --git a/syndicate-server/src/resolution/mod.rs b/syndicate-server/src/resolution/mod.rs index fbb09ee..47d2a1c 100644 --- a/syndicate-server/src/resolution/mod.rs +++ b/syndicate-server/src/resolution/mod.rs @@ -6,8 +6,10 @@ use syndicate::enclose; use crate::language; -pub mod sturdy; +pub mod client; pub mod noise; +pub mod sturdy; +pub mod transports; fn handle_direct_resolution( ds: &mut Arc, diff --git a/syndicate-server/src/resolution/transports.rs b/syndicate-server/src/resolution/transports.rs new file mode 100644 index 0000000..7a63434 --- /dev/null +++ b/syndicate-server/src/resolution/transports.rs @@ -0,0 +1,93 @@ +use preserves_schema::Codec; +use syndicate::relay; +use syndicate::schemas::trace; + +use std::convert::TryFrom; +use std::sync::Arc; + +use syndicate::actor::*; +use syndicate::rpc; +use syndicate::supervise::{Supervisor, SupervisorConfiguration}; +use syndicate::value::NestedValue; + +use syndicate::schemas::transport_address::Tcp; +use syndicate::schemas::rpc as R; +use syndicate::schemas::gatekeeper as G; + +use tokio::net::TcpStream; + +use crate::language; + +use syndicate::enclose; +use syndicate::preserves::rec; +use syndicate_macros::during; + +struct TransportControl; + +impl Entity for TransportControl { + fn message(&mut self, t: &mut Activation, c: G::TransportControl) -> ActorResult { + let G::TransportControl(G::ForceDisconnect) = c; + t.stop_root(); + Ok(()) + } +} + +pub fn on_demand(t: &mut Activation, ds: Arc) { + t.spawn(Some(AnyValue::symbol("transport_connector")), move |t| { + during!(t, ds, language(), >, |t| { + Supervisor::start( + t, + Some(rec![AnyValue::symbol("relay"), language().unparse(&addr)]), + SupervisorConfiguration::default(), + |_t, _s| Ok(()), + enclose!((ds) move |t| enclose!((ds, addr) run(t, ds, addr)))) + }); + Ok(()) + }); +} + +fn run(t: &mut Activation, ds: Arc, addr: Tcp) -> ActorResult { + tracing::info!(?addr, "Connecting"); + let name = AnyValue::new(vec![AnyValue::symbol("connector"), language().unparse(&addr)]); + let trace_collector = t.trace_collector(); + let facet = t.facet_ref(); + t.linked_task(Some(name.clone()), async move { + let port = u16::try_from(&addr.port).map_err(|_| "Invalid TCP port number")?; + let account = Account::new(Some(name), trace_collector.clone()); + let cause = trace_collector.as_ref().map(|_| trace::TurnCause::external("connect")); + match TcpStream::connect((addr.host.clone(), port)).await { + Ok(stream) => { + let (i, o) = stream.into_split(); + let i = relay::Input::Bytes(Box::pin(i)); + let o = relay::Output::Bytes(Box::pin(o)); + let initial_oid = Some(syndicate::sturdy::Oid(0.into())); + facet.activate(&account, cause, |t| { + let peer = relay::TunnelRelay::run(t, i, o, None, initial_oid, false) + .expect("missing initial cap on connection"); + let control = Cap::guard(&language().syndicate, t.create(TransportControl)); + ds.assert(t, language(), &rpc::answer( + language(), + G::ConnectTransport { addr: language().unparse(&addr) }, + R::Result::Ok { value: language().unparse(&G::ConnectedTransport { + addr: language().unparse(&addr), + control, + responder_session: peer, + }) })); + Ok(()) + }); + Ok(LinkedTaskTermination::KeepFacet) + } + Err(e) => { + facet.activate(&account, cause, |t| { + ds.assert(t, language(), &rpc::answer( + language(), + G::ConnectTransport { addr: language().unparse(&addr) }, + R::Result::Error { error: AnyValue::symbol(&format!("{:?}", e.kind())) })); + Ok(()) + }); + Ok(LinkedTaskTermination::Normal) + } + } + }); + Ok(()) +} diff --git a/syndicate/src/lib.rs b/syndicate/src/lib.rs index 4e13a96..8c18fd1 100644 --- a/syndicate/src/lib.rs +++ b/syndicate/src/lib.rs @@ -20,6 +20,7 @@ pub mod error; pub mod pattern; pub mod relay; pub mod rewrite; +pub mod rpc; pub mod supervise; pub mod schemas { diff --git a/syndicate/src/rpc.rs b/syndicate/src/rpc.rs new file mode 100644 index 0000000..21982f3 --- /dev/null +++ b/syndicate/src/rpc.rs @@ -0,0 +1,17 @@ +use preserves_schema::support::Codec; +use preserves_schema::support::Unparse; + +use crate::actor::AnyValue; +use crate::language; +use crate::schemas::rpc as R; + +pub fn answer<'a, L, Q: Unparse<&'a L, AnyValue>, A: Unparse<&'a L, AnyValue>>(literals: &'a L, request: Q, response: A) -> R::Answer { + R::Answer { + request: request.unparse(literals), + response: response.unparse(literals), + } +} + +pub fn answer_value<'a, L, Q: Unparse<&'a L, AnyValue>, A: Unparse<&'a L, AnyValue>>(literals: &'a L, request: Q, response: A) -> AnyValue { + language().unparse(&answer(literals, request, response)) +}