multicast ttl support

This commit is contained in:
Tony Garnock-Jones 2024-06-11 21:05:33 +02:00
parent b88c336714
commit 9b5f2f9e80
3 changed files with 77 additions and 39 deletions

View File

@ -4,4 +4,4 @@ sourcePort
ByteString„„„„„³ PacketOut´³rec´³lit³udp-out„´³tupleµ´³named³ ByteString„„„„„³ PacketOut´³rec´³lit³udp-out„´³tupleµ´³named³
targetAddr´³atom³String„„´³named³ targetAddr´³atom³String„„´³named³
targetPort´³atom³ SignedInteger„„´³named³body´³atom³ targetPort´³atom³ SignedInteger„„´³named³body´³atom³
ByteString„„„„„³MulticastLoopback´³rec´³lit³udp-multicast-loopback„´³tupleµ´³named³enabled´³atom³Boolean„„„„„³MulticastGroupMember´³rec´³lit³udp-multicast-group-member„´³tupleµ´³named³ groupAddress´³atom³String„„´³named³groupInterface´³atom³String„„„„„„³ embeddedType€„„„„ ByteString„„„„„³ MulticastTtl´³rec´³lit³udp-multicast-ttl„´³tupleµ´³named³hopCount´³atom³ SignedInteger„„„„„³MulticastLoopback´³rec´³lit³udp-multicast-loopback„´³tupleµ´³named³enabled´³atom³Boolean„„„„„³MulticastGroupMember´³rec´³lit³udp-multicast-group-member„´³tupleµ´³named³ groupAddress´³atom³String„„´³named³groupInterface´³atom³String„„„„„„³ embeddedType€„„„„

View File

@ -13,8 +13,9 @@ Error = <udp-error @detail any> .
PacketIn = <udp-in @sourceAddr string @sourcePort int @body bytes> . PacketIn = <udp-in @sourceAddr string @sourcePort int @body bytes> .
PacketOut = <udp-out @targetAddr string @targetPort int @body bytes> . PacketOut = <udp-out @targetAddr string @targetPort int @body bytes> .
# Assertion from local application # Assertions from local application
MulticastGroupMember = <udp-multicast-group-member @groupAddress string @groupInterface string> . MulticastGroupMember = <udp-multicast-group-member @groupAddress string @groupInterface string> .
MulticastTtl = <udp-multicast-ttl @hopCount int> .
# Message from local application # Message from local application
MulticastLoopback = <udp-multicast-loopback @enabled bool> . MulticastLoopback = <udp-multicast-loopback @enabled bool> .

View File

@ -2,7 +2,13 @@ use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
use preserves_schema::Codec; use preserves_schema::Codec;
use syndicate::{actor::{Account, Activation, ActorResult, AnyValue, Cap, FacetRef, LinkedTaskTermination}, dataspace::Dataspace, relay, schemas::trace, value::NestedValue}; use syndicate::{
actor::{Account, Activation, ActorResult, AnyValue, Cap, FacetRef, LinkedTaskTermination},
dataspace::Dataspace,
relay,
schemas::trace,
value::NestedValue,
};
mod schemas { mod schemas {
include!(concat!(env!("OUT_DIR"), "/src/schemas/mod.rs")); include!(concat!(env!("OUT_DIR"), "/src/schemas/mod.rs"));
@ -41,41 +47,72 @@ async fn run_udp_socket(
addr: local_addr.ip().to_string(), addr: local_addr.ip().to_string(),
port: local_addr.port().into(), port: local_addr.port().into(),
}); });
enclose!((sock) on_message!(t, space, language(), $p: udp::PacketOut, enclose!((sock) move |_t: &mut Activation| { enclose!(
tracing::trace!(?p, "outbound packet"); (sock) on_message!(t, space, language(), $p: udp::PacketOut, enclose!(
let target: SocketAddr = str::parse(&format!("{}:{}", p.target_addr, p.target_port))?; (sock) move |_t: &mut Activation| {
match sock.try_send_to(&p.body[..], target) { tracing::trace!(?p, "outbound packet");
Ok(_) => (), let target: SocketAddr =
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => (), str::parse(&format!("{}:{}", p.target_addr, p.target_port))?;
Err(e) => Err(e)?, match sock.try_send_to(&p.body[..], target) {
} Ok(_) => (),
Ok(()) Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => (),
}))); Err(e) => Err(e)?,
enclose!((sock, space) during!(t, space, language(), $g: udp::MulticastGroupMember, enclose!((sock, space) move |t: &mut Activation| { }
tracing::debug!(?g, "adding multicast group"); Ok(())
let group_address = str::parse(&g.group_address)?; })));
let group_interface = str::parse(&g.group_interface)?; enclose!(
let mut ok = true; (sock, space) during!(t, space, language(), $g: udp::MulticastGroupMember, enclose!(
if let Err(err) = sock.join_multicast_v4(group_address, group_interface) { (sock, space) move |t: &mut Activation| {
ok = false; tracing::debug!(?g, "adding multicast group");
space.assert(t, language(), &udp::Error { detail: AnyValue::new(format!("{}", err)) }); let group_address = str::parse(&g.group_address)?;
} let group_interface = str::parse(&g.group_interface)?;
t.on_stop(enclose!((g) move |_t| { let mut ok = true;
if ok { if let Err(err) = sock.join_multicast_v4(group_address, group_interface) {
tracing::debug!(?g, "removing multicast group"); ok = false;
sock.leave_multicast_v4(group_address, group_interface)?; space.assert(t, language(), &udp::Error {
} detail: AnyValue::new(format!("join_multicast_v4: {}", err)),
Ok(()) });
})); }
Ok(()) t.on_stop(enclose!((g) move |_t| {
}))); if ok {
enclose!((sock, space) on_message!(t, space, language(), $l: udp::MulticastLoopback, enclose!((sock, space) move |t: &mut Activation| { tracing::debug!(?g, "removing multicast group");
tracing::debug!(?l, "setting multicast loopback"); sock.leave_multicast_v4(group_address, group_interface)?;
if let Err(err) = sock.set_multicast_loop_v4(l.enabled) { }
space.assert(t, language(), &udp::Error { detail: AnyValue::new(format!("{}", err)) }); Ok(())
} }));
Ok(()) Ok(())
}))); })));
enclose!(
(sock, space) on_message!(t, space, language(), $l: udp::MulticastLoopback, enclose!(
(sock, space) move |t: &mut Activation| {
tracing::debug!(?l, "setting multicast loopback");
if let Err(err) = sock.set_multicast_loop_v4(l.enabled) {
space.assert(t, language(), &udp::Error {
detail: AnyValue::new(format!("set_multicast_loop_v4: {}", err)),
});
}
Ok(())
})));
enclose!(
(sock, space) during!(t, space, language(), $n: udp::MulticastTtl, enclose!(
(sock, space) move |t: &mut Activation| {
tracing::debug!(?n, "setting multicast ttl");
let hop_count = match u32::try_from(&n.hop_count) {
Ok(hop_count) => hop_count,
Err(_) => {
space.assert(t, language(), &udp::Error {
detail: AnyValue::new("set_multicast_ttl_v4: bad hop count"),
});
return Ok(());
}
};
if let Err(err) = sock.set_multicast_ttl_v4(hop_count) {
space.assert(t, language(), &udp::Error {
detail: AnyValue::new(format!("set_multicast_ttl_v4: {}", err)),
});
}
Ok(())
})));
Ok(()) Ok(())
})); }));
let mut buf = [0; 65536]; let mut buf = [0; 65536];
@ -111,7 +148,7 @@ async fn main() -> ActorResult {
match run_udp_socket(&b, account.clone(), cause, facet.clone()).await { match run_udp_socket(&b, account.clone(), cause, facet.clone()).await {
Ok(()) => Ok(LinkedTaskTermination::Normal), Ok(()) => Ok(LinkedTaskTermination::Normal),
Err(err) => { Err(err) => {
let err_val = AnyValue::new(format!("{}", err)); let err_val = AnyValue::new(format!("udp_io: {}", err));
facet.activate(&account, err_cause, move |t| { facet.activate(&account, err_cause, move |t| {
b.space.assert(t, language(), &udp::Error { detail: err_val }); b.space.assert(t, language(), &udp::Error { detail: err_val });
Ok(()) Ok(())