Outbound TCP transport and infrastructure for gatekeeper-client resolution
This commit is contained in:
parent
af4cc732d5
commit
57037d6f8c
|
@ -148,6 +148,8 @@ async fn main() -> ActorResult {
|
|||
services::http_router::on_demand(t, Arc::clone(&server_config_ds));
|
||||
services::tcp_relay_listener::on_demand(t, Arc::clone(&server_config_ds));
|
||||
services::unix_relay_listener::on_demand(t, Arc::clone(&server_config_ds));
|
||||
resolution::client::start(t, Arc::clone(&server_config_ds));
|
||||
resolution::transports::on_demand(t, Arc::clone(&server_config_ds));
|
||||
|
||||
if config.debt_reporter {
|
||||
server_config_ds.assert(t, language(), &service::RunService {
|
||||
|
|
|
@ -0,0 +1,10 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
use syndicate::actor::*;
|
||||
|
||||
use syndicate::schemas::gatekeeper;
|
||||
use syndicate::schemas::transport_address;
|
||||
|
||||
pub fn start(t: &mut Activation, ds: Arc<Cap>) {
|
||||
|
||||
}
|
|
@ -6,8 +6,10 @@ use syndicate::enclose;
|
|||
|
||||
use crate::language;
|
||||
|
||||
pub mod sturdy;
|
||||
pub mod client;
|
||||
pub mod noise;
|
||||
pub mod sturdy;
|
||||
pub mod transports;
|
||||
|
||||
fn handle_direct_resolution(
|
||||
ds: &mut Arc<Cap>,
|
||||
|
|
|
@ -0,0 +1,93 @@
|
|||
use preserves_schema::Codec;
|
||||
use syndicate::relay;
|
||||
use syndicate::schemas::trace;
|
||||
|
||||
use std::convert::TryFrom;
|
||||
use std::sync::Arc;
|
||||
|
||||
use syndicate::actor::*;
|
||||
use syndicate::rpc;
|
||||
use syndicate::supervise::{Supervisor, SupervisorConfiguration};
|
||||
use syndicate::value::NestedValue;
|
||||
|
||||
use syndicate::schemas::transport_address::Tcp;
|
||||
use syndicate::schemas::rpc as R;
|
||||
use syndicate::schemas::gatekeeper as G;
|
||||
|
||||
use tokio::net::TcpStream;
|
||||
|
||||
use crate::language;
|
||||
|
||||
use syndicate::enclose;
|
||||
use syndicate::preserves::rec;
|
||||
use syndicate_macros::during;
|
||||
|
||||
struct TransportControl;
|
||||
|
||||
impl Entity<G::TransportControl> for TransportControl {
|
||||
fn message(&mut self, t: &mut Activation, c: G::TransportControl) -> ActorResult {
|
||||
let G::TransportControl(G::ForceDisconnect) = c;
|
||||
t.stop_root();
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub fn on_demand(t: &mut Activation, ds: Arc<Cap>) {
|
||||
t.spawn(Some(AnyValue::symbol("transport_connector")), move |t| {
|
||||
during!(t, ds, language(), <q <connect-transport $addr: Tcp>>, |t| {
|
||||
Supervisor::start(
|
||||
t,
|
||||
Some(rec![AnyValue::symbol("relay"), language().unparse(&addr)]),
|
||||
SupervisorConfiguration::default(),
|
||||
|_t, _s| Ok(()),
|
||||
enclose!((ds) move |t| enclose!((ds, addr) run(t, ds, addr))))
|
||||
});
|
||||
Ok(())
|
||||
});
|
||||
}
|
||||
|
||||
fn run(t: &mut Activation, ds: Arc<Cap>, addr: Tcp) -> ActorResult {
|
||||
tracing::info!(?addr, "Connecting");
|
||||
let name = AnyValue::new(vec![AnyValue::symbol("connector"), language().unparse(&addr)]);
|
||||
let trace_collector = t.trace_collector();
|
||||
let facet = t.facet_ref();
|
||||
t.linked_task(Some(name.clone()), async move {
|
||||
let port = u16::try_from(&addr.port).map_err(|_| "Invalid TCP port number")?;
|
||||
let account = Account::new(Some(name), trace_collector.clone());
|
||||
let cause = trace_collector.as_ref().map(|_| trace::TurnCause::external("connect"));
|
||||
match TcpStream::connect((addr.host.clone(), port)).await {
|
||||
Ok(stream) => {
|
||||
let (i, o) = stream.into_split();
|
||||
let i = relay::Input::Bytes(Box::pin(i));
|
||||
let o = relay::Output::Bytes(Box::pin(o));
|
||||
let initial_oid = Some(syndicate::sturdy::Oid(0.into()));
|
||||
facet.activate(&account, cause, |t| {
|
||||
let peer = relay::TunnelRelay::run(t, i, o, None, initial_oid, false)
|
||||
.expect("missing initial cap on connection");
|
||||
let control = Cap::guard(&language().syndicate, t.create(TransportControl));
|
||||
ds.assert(t, language(), &rpc::answer(
|
||||
language(),
|
||||
G::ConnectTransport { addr: language().unparse(&addr) },
|
||||
R::Result::Ok { value: language().unparse(&G::ConnectedTransport {
|
||||
addr: language().unparse(&addr),
|
||||
control,
|
||||
responder_session: peer,
|
||||
}) }));
|
||||
Ok(())
|
||||
});
|
||||
Ok(LinkedTaskTermination::KeepFacet)
|
||||
}
|
||||
Err(e) => {
|
||||
facet.activate(&account, cause, |t| {
|
||||
ds.assert(t, language(), &rpc::answer(
|
||||
language(),
|
||||
G::ConnectTransport { addr: language().unparse(&addr) },
|
||||
R::Result::Error { error: AnyValue::symbol(&format!("{:?}", e.kind())) }));
|
||||
Ok(())
|
||||
});
|
||||
Ok(LinkedTaskTermination::Normal)
|
||||
}
|
||||
}
|
||||
});
|
||||
Ok(())
|
||||
}
|
|
@ -20,6 +20,7 @@ pub mod error;
|
|||
pub mod pattern;
|
||||
pub mod relay;
|
||||
pub mod rewrite;
|
||||
pub mod rpc;
|
||||
pub mod supervise;
|
||||
|
||||
pub mod schemas {
|
||||
|
|
|
@ -0,0 +1,17 @@
|
|||
use preserves_schema::support::Codec;
|
||||
use preserves_schema::support::Unparse;
|
||||
|
||||
use crate::actor::AnyValue;
|
||||
use crate::language;
|
||||
use crate::schemas::rpc as R;
|
||||
|
||||
pub fn answer<'a, L, Q: Unparse<&'a L, AnyValue>, A: Unparse<&'a L, AnyValue>>(literals: &'a L, request: Q, response: A) -> R::Answer {
|
||||
R::Answer {
|
||||
request: request.unparse(literals),
|
||||
response: response.unparse(literals),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn answer_value<'a, L, Q: Unparse<&'a L, AnyValue>, A: Unparse<&'a L, AnyValue>>(literals: &'a L, request: Q, response: A) -> AnyValue {
|
||||
language().unparse(&answer(literals, request, response))
|
||||
}
|
Loading…
Reference in New Issue