diff --git a/Cargo.toml b/Cargo.toml index 9bf9072..71fb143 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,7 @@ vendored-openssl = ["openssl/vendored"] [profile.release] debug = true -lto = true +# lto = true [lib] name = "syndicate" diff --git a/Makefile b/Makefile index 368ec32..d3909d9 100644 --- a/Makefile +++ b/Makefile @@ -42,6 +42,14 @@ arm-binary-release: arm-binary-debug: cross build --target=armv7-unknown-linux-musleabihf --all-targets --features vendored-openssl +aarch64-binary: aarch64-binary-release + +aarch64-binary-release: + cross build --target=aarch64-unknown-linux-musl --release --all-targets --features vendored-openssl + +aarch64-binary-debug: + cross build --target=aarch64-unknown-linux-musl --all-targets --features vendored-openssl + pull-protocols: git subtree pull -P protocols \ -m 'Merge latest changes from the syndicate-protocols repository' \ diff --git a/src/actor.rs b/src/actor.rs index deada94..b56857d 100644 --- a/src/actor.rs +++ b/src/actor.rs @@ -27,7 +27,6 @@ use tokio::sync::mpsc::{unbounded_channel, UnboundedSender, UnboundedReceiver}; // use tokio::sync::Notify; use tokio_util::sync::CancellationToken; -use tracing; use tracing::Instrument; pub use super::schemas::internal_protocol::_Any; @@ -263,10 +262,11 @@ impl<'activation> Activation<'activation> { if turn.len() == 0 { continue; } let first_ref = Arc::clone(&turn[0].0); let target = &first_ref.addr.mailbox; - let _ = target.send( - &self.debtor, - Turn(turn.into_iter().map( - |(r, e)| TurnEvent { oid: r.addr.oid.clone(), event: e }).collect())); + let mut turn_events = Vec::new(); + for (r, e) in turn.into_iter() { + turn_events.push(TurnEvent { oid: r.addr.oid.clone(), event: e }); + } + let _ = target.send(&self.debtor, Turn(turn_events)); } } diff --git a/src/bin/syndicate-server.rs b/src/bin/syndicate-server.rs index f89b801..9f13eed 100644 --- a/src/bin/syndicate-server.rs +++ b/src/bin/syndicate-server.rs @@ -25,8 +25,6 @@ use syndicate::sturdy; use tokio::net::TcpListener; use tokio::net::TcpStream; -use tracing::{info, trace}; - use tungstenite::Message; #[tokio::main] @@ -41,37 +39,37 @@ async fn main() -> Result<(), Box> { const NORMAL: &str = "\x1b[0m"; const BRIGHT_YELLOW: &str = "\x1b[93m"; - info!(r"{} ______ {}", GREEN, NORMAL); - info!(r"{} / {}\_{}\{} ", GREEN, BRIGHT_GREEN, GREEN, NORMAL); - info!(r"{} / {},{}__/{} \ {} ____ __", GREEN, RED, BRIGHT_GREEN, GREEN, NORMAL); - info!(r"{} /{}\__/ \{},{} \{} _______ ______ ____/ /_/________ / /____", GREEN, BRIGHT_GREEN, RED, GREEN, NORMAL); - info!(r"{} \{}/ \__/ {}/{} / ___/ / / / __ \/ __ / / ___/ __ \/ __/ _ \", GREEN, BRIGHT_GREEN, GREEN, NORMAL); - info!(r"{} \ {}'{} \__{}/ {} _\_ \/ /_/ / / / / /_/ / / /__/ /_/ / /_/ __/", GREEN, RED, BRIGHT_GREEN, GREEN, NORMAL); - info!(r"{} \____{}/{}_/ {} /____/\__, /_/ /_/\____/_/\___/\__/_/\__/\___/", GREEN, BRIGHT_GREEN, GREEN, NORMAL); - info!(r" /____/"); + tracing::info!(r"{} ______ {}", GREEN, NORMAL); + tracing::info!(r"{} / {}\_{}\{} ", GREEN, BRIGHT_GREEN, GREEN, NORMAL); + tracing::info!(r"{} / {},{}__/{} \ {} ____ __", GREEN, RED, BRIGHT_GREEN, GREEN, NORMAL); + tracing::info!(r"{} /{}\__/ \{},{} \{} _______ ______ ____/ /_/________ / /____", GREEN, BRIGHT_GREEN, RED, GREEN, NORMAL); + tracing::info!(r"{} \{}/ \__/ {}/{} / ___/ / / / __ \/ __ / / ___/ __ \/ __/ _ \", GREEN, BRIGHT_GREEN, GREEN, NORMAL); + tracing::info!(r"{} \ {}'{} \__{}/ {} _\_ \/ /_/ / / / / /_/ / / /__/ /_/ / /_/ __/", GREEN, RED, BRIGHT_GREEN, GREEN, NORMAL); + tracing::info!(r"{} \____{}/{}_/ {} /____/\__, /_/ /_/\____/_/\___/\__/_/\__/\___/", GREEN, BRIGHT_GREEN, GREEN, NORMAL); + tracing::info!(r" /____/"); - // info!(r" {} __{}__{}__ {}", GREEN, BRIGHT_GREEN, GREEN, NORMAL); - // info!(r" {} /{}_/ \_{}\ {}", GREEN, BRIGHT_GREEN, GREEN, NORMAL); - // info!(r" {} / \__/ \ {} __ __", BRIGHT_GREEN, NORMAL); - // info!(r" {}/{}\__/ \__/{}\{} _______ ______ ____/ /__________ / /____", GREEN, BRIGHT_GREEN, GREEN, NORMAL); - // info!(r" {}\{}/ \__/ \{}/{} / ___/ / / / __ \/ __ / / ___/ __ \/ __/ _ \", GREEN, BRIGHT_GREEN, GREEN, NORMAL); - // info!(r" {} \__/ \__/ {} _\_ \/ /_/ / / / / /_/ / / /__/ /_/ / /_/ __/", BRIGHT_GREEN, NORMAL); - // info!(r" {} \_{}\__/{}_/ {} /____/\__, /_/ /_/\____/_/\___/\__/_/\__/\___/", GREEN, BRIGHT_GREEN, GREEN, NORMAL); - // info!(r" /____/"); + // tracing::info!(r" {} __{}__{}__ {}", GREEN, BRIGHT_GREEN, GREEN, NORMAL); + // tracing::info!(r" {} /{}_/ \_{}\ {}", GREEN, BRIGHT_GREEN, GREEN, NORMAL); + // tracing::info!(r" {} / \__/ \ {} __ __", BRIGHT_GREEN, NORMAL); + // tracing::info!(r" {}/{}\__/ \__/{}\{} _______ ______ ____/ /__________ / /____", GREEN, BRIGHT_GREEN, GREEN, NORMAL); + // tracing::info!(r" {}\{}/ \__/ \{}/{} / ___/ / / / __ \/ __ / / ___/ __ \/ __/ _ \", GREEN, BRIGHT_GREEN, GREEN, NORMAL); + // tracing::info!(r" {} \__/ \__/ {} _\_ \/ /_/ / / / / /_/ / / /__/ /_/ / /_/ __/", BRIGHT_GREEN, NORMAL); + // tracing::info!(r" {} \_{}\__/{}_/ {} /____/\__, /_/ /_/\____/_/\___/\__/_/\__/\___/", GREEN, BRIGHT_GREEN, GREEN, NORMAL); + // tracing::info!(r" /____/"); - info!(r""); - info!(r" {}version {}{}", BRIGHT_YELLOW, env!("CARGO_PKG_VERSION"), NORMAL); - info!(r""); - info!(r" documentation & reference material: https://syndicate-lang.org/"); - info!(r" source code & bugs: https://git.syndicate-lang.org/syndicate-lang/syndicate-rs"); - info!(r""); + tracing::info!(r""); + tracing::info!(r" {}version {}{}", BRIGHT_YELLOW, env!("CARGO_PKG_VERSION"), NORMAL); + tracing::info!(r""); + tracing::info!(r" documentation & reference material: https://syndicate-lang.org/"); + tracing::info!(r" source code & bugs: https://git.syndicate-lang.org/syndicate-lang/syndicate-rs"); + tracing::info!(r""); } let config = Arc::new(config::ServerConfig::from_args()); let mut daemons = Vec::new(); - trace!("startup"); + tracing::trace!("startup"); let ds = Actor::create_and_start(syndicate::name!("dataspace"), Dataspace::new()); let gateway = Actor::create_and_start( @@ -140,7 +138,7 @@ async fn run_connection( let (i, o) = match stream.peek(&mut buf).await? { 1 => match buf[0] { 71 /* ASCII 'G' for "GET" */ => { - info!(protocol = display("websocket"), peer = debug(addr)); + tracing::info!(protocol = display("websocket"), peer = debug(addr)); let s = tokio_tungstenite::accept_async(stream).await .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; let (o, i) = s.split(); @@ -149,7 +147,7 @@ async fn run_connection( (relay::Input::Packets(Box::pin(i)), relay::Output::Packets(Box::pin(o))) }, _ => { - info!(protocol = display("raw"), peer = debug(addr)); + tracing::info!(protocol = display("raw"), peer = debug(addr)); let (i, o) = stream.into_split(); (relay::Input::Bytes(Box::pin(i)), relay::Output::Bytes(Box::pin(o /* BufWriter::new(o) */))) @@ -161,7 +159,7 @@ async fn run_connection( struct ExitListener; impl Entity for ExitListener { fn exit_hook(&mut self, _t: &mut Activation, exit_status: &ActorResult) -> BoxFuture { - info!(exit_status = debug(exit_status), "disconnect"); + tracing::info!(exit_status = debug(exit_status), "disconnect"); Box::pin(ready(Ok(()))) } } diff --git a/src/dataspace.rs b/src/dataspace.rs index 02a6038..feec6af 100644 --- a/src/dataspace.rs +++ b/src/dataspace.rs @@ -69,7 +69,7 @@ impl Dataspace { impl Entity for Dataspace { fn assert(&mut self, t: &mut Activation, a: _Any, h: Handle) -> ActorResult { - tracing::trace!(assertion = debug(&a), handle = debug(&h), "assert"); + // tracing::trace!(assertion = debug(&a), handle = debug(&h), "assert"); let old_assertions = self.index.assertion_count(); self.index.insert(t, &a); @@ -87,7 +87,7 @@ impl Entity for Dataspace { } fn retract(&mut self, t: &mut Activation, h: Handle) -> ActorResult { - tracing::trace!(handle = debug(&h), "retract"); + // tracing::trace!(handle = debug(&h), "retract"); if let Some((a, maybe_o)) = self.handle_map.remove(&h) { if let Some(o) = maybe_o { @@ -104,7 +104,7 @@ impl Entity for Dataspace { } fn message(&mut self, t: &mut Activation, m: _Any) -> ActorResult { - tracing::trace!(body = debug(&m), "message"); + // tracing::trace!(body = debug(&m), "message"); self.index.send(t, &m, &mut self.churn.messages_delivered); self.churn.messages_injected += 1; diff --git a/src/relay.rs b/src/relay.rs index b8115e4..28fd9d7 100644 --- a/src/relay.rs +++ b/src/relay.rs @@ -15,6 +15,7 @@ use futures::SinkExt; use futures::Stream; use futures::StreamExt; +use preserves::error::Error as PreservesError; use preserves::error::is_eof_io_error; use preserves::value::BinarySource; use preserves::value::BytesBinarySource; @@ -30,6 +31,9 @@ use preserves::value::Reader; use preserves::value::Writer; use preserves::value::signed_integer::SignedInteger; +use preserves_schema::support::Deserialize; +use preserves_schema::support::ParseError; + use std::convert::TryFrom; use std::io; use std::pin::Pin; @@ -44,8 +48,6 @@ use tokio::io::AsyncWriteExt; use tokio::sync::mpsc::{unbounded_channel, UnboundedSender, UnboundedReceiver}; -use tracing; - struct WireSymbol { oid: sturdy::Oid, obj: Arc, @@ -200,6 +202,7 @@ impl TunnelRelay { } fn handle_inbound_packet(&mut self, t: &mut Activation, p: Packet) -> ActorResult { + // tracing::trace!(packet = debug(&p), "-->"); match p { Packet::Error(b) => { tracing::info!(message = debug(b.message.clone()), @@ -317,7 +320,7 @@ impl TunnelRelay { fn encode_packet(&mut self, p: Packet) -> Result, Error> { let item = _Any::from(&p); - tracing::trace!(packet = debug(&item), "<--"); + // tracing::trace!(packet = debug(&item), "<--"); Ok(PackedWriter::encode::<_, _Any, _>(&mut self.membranes, &item)?) } @@ -468,9 +471,10 @@ pub async fn input_loop( } } Input::Bytes(mut r) => { - let mut buf = BytesMut::with_capacity(1024); + const BUFSIZE: usize = 65536; + let mut buf = BytesMut::with_capacity(BUFSIZE); loop { - buf.reserve(8192); + buf.reserve(BUFSIZE); let n = match r.read_buf(&mut buf).await { Ok(n) => n, Err(e) => @@ -531,11 +535,10 @@ impl Entity for TunnelRelay { } tunnel_relay::Input::Packet { bs } => { let mut src = BytesBinarySource::new(&bs); - let item = src.packed::<_, _Any, _>( - &mut ActivatedMembranes(t, &self.self_ref, &mut self.membranes)) - .demand_next(false)?; - tracing::trace!(packet = debug(&item), "-->"); - self.handle_inbound_packet(t, Packet::try_from(&item)?)?; + let mut dec = ActivatedMembranes(t, &self.self_ref, &mut self.membranes); + let mut r = src.packed::<_, _Any, _>(&mut dec); + let item = Packet::deserialize(&mut r)?; + self.handle_inbound_packet(t, item)?; } tunnel_relay::Input::Segment { bs } => { self.input_buffer.extend_from_slice(&bs); @@ -544,9 +547,11 @@ impl Entity for TunnelRelay { let mut src = BytesBinarySource::new(&self.input_buffer); let mut dec = ActivatedMembranes(t, &self.self_ref, &mut self.membranes); let mut r = src.packed::<_, _Any, _>(&mut dec); - let e = match r.next(false) { - Err(e) if is_eof_io_error(&e) => None, - result => result?, + let e = match Packet::deserialize(&mut r) { + Err(ParseError::Preserves(PreservesError::Io(e))) + if is_eof_io_error(&e) => + None, + result => Some(result?), }; (e, r.source.index) }; @@ -554,8 +559,7 @@ impl Entity for TunnelRelay { None => break, Some(item) => { self.input_buffer.advance(count); - tracing::trace!(packet = debug(&item), "-->"); - self.handle_inbound_packet(t, Packet::try_from(&item)?)?; + self.handle_inbound_packet(t, item)?; } } }