Switch to preserves-schema deserialize; minor performance tweaks

This commit is contained in:
Tony Garnock-Jones 2021-07-21 23:29:53 +02:00
parent 8cf6ace5f6
commit 052da62572
6 changed files with 62 additions and 52 deletions

View File

@ -13,7 +13,7 @@ vendored-openssl = ["openssl/vendored"]
[profile.release] [profile.release]
debug = true debug = true
lto = true # lto = true
[lib] [lib]
name = "syndicate" name = "syndicate"

View File

@ -42,6 +42,14 @@ arm-binary-release:
arm-binary-debug: arm-binary-debug:
cross build --target=armv7-unknown-linux-musleabihf --all-targets --features vendored-openssl cross build --target=armv7-unknown-linux-musleabihf --all-targets --features vendored-openssl
aarch64-binary: aarch64-binary-release
aarch64-binary-release:
cross build --target=aarch64-unknown-linux-musl --release --all-targets --features vendored-openssl
aarch64-binary-debug:
cross build --target=aarch64-unknown-linux-musl --all-targets --features vendored-openssl
pull-protocols: pull-protocols:
git subtree pull -P protocols \ git subtree pull -P protocols \
-m 'Merge latest changes from the syndicate-protocols repository' \ -m 'Merge latest changes from the syndicate-protocols repository' \

View File

@ -27,7 +27,6 @@ use tokio::sync::mpsc::{unbounded_channel, UnboundedSender, UnboundedReceiver};
// use tokio::sync::Notify; // use tokio::sync::Notify;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use tracing;
use tracing::Instrument; use tracing::Instrument;
pub use super::schemas::internal_protocol::_Any; pub use super::schemas::internal_protocol::_Any;
@ -263,10 +262,11 @@ impl<'activation> Activation<'activation> {
if turn.len() == 0 { continue; } if turn.len() == 0 { continue; }
let first_ref = Arc::clone(&turn[0].0); let first_ref = Arc::clone(&turn[0].0);
let target = &first_ref.addr.mailbox; let target = &first_ref.addr.mailbox;
let _ = target.send( let mut turn_events = Vec::new();
&self.debtor, for (r, e) in turn.into_iter() {
Turn(turn.into_iter().map( turn_events.push(TurnEvent { oid: r.addr.oid.clone(), event: e });
|(r, e)| TurnEvent { oid: r.addr.oid.clone(), event: e }).collect())); }
let _ = target.send(&self.debtor, Turn(turn_events));
} }
} }

View File

@ -25,8 +25,6 @@ use syndicate::sturdy;
use tokio::net::TcpListener; use tokio::net::TcpListener;
use tokio::net::TcpStream; use tokio::net::TcpStream;
use tracing::{info, trace};
use tungstenite::Message; use tungstenite::Message;
#[tokio::main] #[tokio::main]
@ -41,37 +39,37 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
const NORMAL: &str = "\x1b[0m"; const NORMAL: &str = "\x1b[0m";
const BRIGHT_YELLOW: &str = "\x1b[93m"; const BRIGHT_YELLOW: &str = "\x1b[93m";
info!(r"{} ______ {}", GREEN, NORMAL); tracing::info!(r"{} ______ {}", GREEN, NORMAL);
info!(r"{} / {}\_{}\{} ", GREEN, BRIGHT_GREEN, GREEN, NORMAL); tracing::info!(r"{} / {}\_{}\{} ", GREEN, BRIGHT_GREEN, GREEN, NORMAL);
info!(r"{} / {},{}__/{} \ {} ____ __", GREEN, RED, BRIGHT_GREEN, GREEN, NORMAL); tracing::info!(r"{} / {},{}__/{} \ {} ____ __", GREEN, RED, BRIGHT_GREEN, GREEN, NORMAL);
info!(r"{} /{}\__/ \{},{} \{} _______ ______ ____/ /_/________ / /____", GREEN, BRIGHT_GREEN, RED, GREEN, NORMAL); tracing::info!(r"{} /{}\__/ \{},{} \{} _______ ______ ____/ /_/________ / /____", GREEN, BRIGHT_GREEN, RED, GREEN, NORMAL);
info!(r"{} \{}/ \__/ {}/{} / ___/ / / / __ \/ __ / / ___/ __ \/ __/ _ \", GREEN, BRIGHT_GREEN, GREEN, NORMAL); tracing::info!(r"{} \{}/ \__/ {}/{} / ___/ / / / __ \/ __ / / ___/ __ \/ __/ _ \", GREEN, BRIGHT_GREEN, GREEN, NORMAL);
info!(r"{} \ {}'{} \__{}/ {} _\_ \/ /_/ / / / / /_/ / / /__/ /_/ / /_/ __/", GREEN, RED, BRIGHT_GREEN, GREEN, NORMAL); tracing::info!(r"{} \ {}'{} \__{}/ {} _\_ \/ /_/ / / / / /_/ / / /__/ /_/ / /_/ __/", GREEN, RED, BRIGHT_GREEN, GREEN, NORMAL);
info!(r"{} \____{}/{}_/ {} /____/\__, /_/ /_/\____/_/\___/\__/_/\__/\___/", GREEN, BRIGHT_GREEN, GREEN, NORMAL); tracing::info!(r"{} \____{}/{}_/ {} /____/\__, /_/ /_/\____/_/\___/\__/_/\__/\___/", GREEN, BRIGHT_GREEN, GREEN, NORMAL);
info!(r" /____/"); tracing::info!(r" /____/");
// info!(r" {} __{}__{}__ {}", GREEN, BRIGHT_GREEN, GREEN, NORMAL); // tracing::info!(r" {} __{}__{}__ {}", GREEN, BRIGHT_GREEN, GREEN, NORMAL);
// info!(r" {} /{}_/ \_{}\ {}", GREEN, BRIGHT_GREEN, GREEN, NORMAL); // tracing::info!(r" {} /{}_/ \_{}\ {}", GREEN, BRIGHT_GREEN, GREEN, NORMAL);
// info!(r" {} / \__/ \ {} __ __", BRIGHT_GREEN, NORMAL); // tracing::info!(r" {} / \__/ \ {} __ __", BRIGHT_GREEN, NORMAL);
// info!(r" {}/{}\__/ \__/{}\{} _______ ______ ____/ /__________ / /____", GREEN, BRIGHT_GREEN, GREEN, NORMAL); // tracing::info!(r" {}/{}\__/ \__/{}\{} _______ ______ ____/ /__________ / /____", GREEN, BRIGHT_GREEN, GREEN, NORMAL);
// info!(r" {}\{}/ \__/ \{}/{} / ___/ / / / __ \/ __ / / ___/ __ \/ __/ _ \", GREEN, BRIGHT_GREEN, GREEN, NORMAL); // tracing::info!(r" {}\{}/ \__/ \{}/{} / ___/ / / / __ \/ __ / / ___/ __ \/ __/ _ \", GREEN, BRIGHT_GREEN, GREEN, NORMAL);
// info!(r" {} \__/ \__/ {} _\_ \/ /_/ / / / / /_/ / / /__/ /_/ / /_/ __/", BRIGHT_GREEN, NORMAL); // tracing::info!(r" {} \__/ \__/ {} _\_ \/ /_/ / / / / /_/ / / /__/ /_/ / /_/ __/", BRIGHT_GREEN, NORMAL);
// info!(r" {} \_{}\__/{}_/ {} /____/\__, /_/ /_/\____/_/\___/\__/_/\__/\___/", GREEN, BRIGHT_GREEN, GREEN, NORMAL); // tracing::info!(r" {} \_{}\__/{}_/ {} /____/\__, /_/ /_/\____/_/\___/\__/_/\__/\___/", GREEN, BRIGHT_GREEN, GREEN, NORMAL);
// info!(r" /____/"); // tracing::info!(r" /____/");
info!(r""); tracing::info!(r"");
info!(r" {}version {}{}", BRIGHT_YELLOW, env!("CARGO_PKG_VERSION"), NORMAL); tracing::info!(r" {}version {}{}", BRIGHT_YELLOW, env!("CARGO_PKG_VERSION"), NORMAL);
info!(r""); tracing::info!(r"");
info!(r" documentation & reference material: https://syndicate-lang.org/"); tracing::info!(r" documentation & reference material: https://syndicate-lang.org/");
info!(r" source code & bugs: https://git.syndicate-lang.org/syndicate-lang/syndicate-rs"); tracing::info!(r" source code & bugs: https://git.syndicate-lang.org/syndicate-lang/syndicate-rs");
info!(r""); tracing::info!(r"");
} }
let config = Arc::new(config::ServerConfig::from_args()); let config = Arc::new(config::ServerConfig::from_args());
let mut daemons = Vec::new(); let mut daemons = Vec::new();
trace!("startup"); tracing::trace!("startup");
let ds = Actor::create_and_start(syndicate::name!("dataspace"), Dataspace::new()); let ds = Actor::create_and_start(syndicate::name!("dataspace"), Dataspace::new());
let gateway = Actor::create_and_start( let gateway = Actor::create_and_start(
@ -140,7 +138,7 @@ async fn run_connection(
let (i, o) = match stream.peek(&mut buf).await? { let (i, o) = match stream.peek(&mut buf).await? {
1 => match buf[0] { 1 => match buf[0] {
71 /* ASCII 'G' for "GET" */ => { 71 /* ASCII 'G' for "GET" */ => {
info!(protocol = display("websocket"), peer = debug(addr)); tracing::info!(protocol = display("websocket"), peer = debug(addr));
let s = tokio_tungstenite::accept_async(stream).await let s = tokio_tungstenite::accept_async(stream).await
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
let (o, i) = s.split(); let (o, i) = s.split();
@ -149,7 +147,7 @@ async fn run_connection(
(relay::Input::Packets(Box::pin(i)), relay::Output::Packets(Box::pin(o))) (relay::Input::Packets(Box::pin(i)), relay::Output::Packets(Box::pin(o)))
}, },
_ => { _ => {
info!(protocol = display("raw"), peer = debug(addr)); tracing::info!(protocol = display("raw"), peer = debug(addr));
let (i, o) = stream.into_split(); let (i, o) = stream.into_split();
(relay::Input::Bytes(Box::pin(i)), (relay::Input::Bytes(Box::pin(i)),
relay::Output::Bytes(Box::pin(o /* BufWriter::new(o) */))) relay::Output::Bytes(Box::pin(o /* BufWriter::new(o) */)))
@ -161,7 +159,7 @@ async fn run_connection(
struct ExitListener; struct ExitListener;
impl Entity for ExitListener { impl Entity for ExitListener {
fn exit_hook(&mut self, _t: &mut Activation, exit_status: &ActorResult) -> BoxFuture<ActorResult> { fn exit_hook(&mut self, _t: &mut Activation, exit_status: &ActorResult) -> BoxFuture<ActorResult> {
info!(exit_status = debug(exit_status), "disconnect"); tracing::info!(exit_status = debug(exit_status), "disconnect");
Box::pin(ready(Ok(()))) Box::pin(ready(Ok(())))
} }
} }

View File

@ -69,7 +69,7 @@ impl Dataspace {
impl Entity for Dataspace { impl Entity for Dataspace {
fn assert(&mut self, t: &mut Activation, a: _Any, h: Handle) -> ActorResult { fn assert(&mut self, t: &mut Activation, a: _Any, h: Handle) -> ActorResult {
tracing::trace!(assertion = debug(&a), handle = debug(&h), "assert"); // tracing::trace!(assertion = debug(&a), handle = debug(&h), "assert");
let old_assertions = self.index.assertion_count(); let old_assertions = self.index.assertion_count();
self.index.insert(t, &a); self.index.insert(t, &a);
@ -87,7 +87,7 @@ impl Entity for Dataspace {
} }
fn retract(&mut self, t: &mut Activation, h: Handle) -> ActorResult { fn retract(&mut self, t: &mut Activation, h: Handle) -> ActorResult {
tracing::trace!(handle = debug(&h), "retract"); // tracing::trace!(handle = debug(&h), "retract");
if let Some((a, maybe_o)) = self.handle_map.remove(&h) { if let Some((a, maybe_o)) = self.handle_map.remove(&h) {
if let Some(o) = maybe_o { if let Some(o) = maybe_o {
@ -104,7 +104,7 @@ impl Entity for Dataspace {
} }
fn message(&mut self, t: &mut Activation, m: _Any) -> ActorResult { fn message(&mut self, t: &mut Activation, m: _Any) -> ActorResult {
tracing::trace!(body = debug(&m), "message"); // tracing::trace!(body = debug(&m), "message");
self.index.send(t, &m, &mut self.churn.messages_delivered); self.index.send(t, &m, &mut self.churn.messages_delivered);
self.churn.messages_injected += 1; self.churn.messages_injected += 1;

View File

@ -15,6 +15,7 @@ use futures::SinkExt;
use futures::Stream; use futures::Stream;
use futures::StreamExt; use futures::StreamExt;
use preserves::error::Error as PreservesError;
use preserves::error::is_eof_io_error; use preserves::error::is_eof_io_error;
use preserves::value::BinarySource; use preserves::value::BinarySource;
use preserves::value::BytesBinarySource; use preserves::value::BytesBinarySource;
@ -30,6 +31,9 @@ use preserves::value::Reader;
use preserves::value::Writer; use preserves::value::Writer;
use preserves::value::signed_integer::SignedInteger; use preserves::value::signed_integer::SignedInteger;
use preserves_schema::support::Deserialize;
use preserves_schema::support::ParseError;
use std::convert::TryFrom; use std::convert::TryFrom;
use std::io; use std::io;
use std::pin::Pin; use std::pin::Pin;
@ -44,8 +48,6 @@ use tokio::io::AsyncWriteExt;
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender, UnboundedReceiver}; use tokio::sync::mpsc::{unbounded_channel, UnboundedSender, UnboundedReceiver};
use tracing;
struct WireSymbol { struct WireSymbol {
oid: sturdy::Oid, oid: sturdy::Oid,
obj: Arc<Ref>, obj: Arc<Ref>,
@ -200,6 +202,7 @@ impl TunnelRelay {
} }
fn handle_inbound_packet(&mut self, t: &mut Activation, p: Packet) -> ActorResult { fn handle_inbound_packet(&mut self, t: &mut Activation, p: Packet) -> ActorResult {
// tracing::trace!(packet = debug(&p), "-->");
match p { match p {
Packet::Error(b) => { Packet::Error(b) => {
tracing::info!(message = debug(b.message.clone()), tracing::info!(message = debug(b.message.clone()),
@ -317,7 +320,7 @@ impl TunnelRelay {
fn encode_packet(&mut self, p: Packet) -> Result<Vec<u8>, Error> { fn encode_packet(&mut self, p: Packet) -> Result<Vec<u8>, Error> {
let item = _Any::from(&p); let item = _Any::from(&p);
tracing::trace!(packet = debug(&item), "<--"); // tracing::trace!(packet = debug(&item), "<--");
Ok(PackedWriter::encode::<_, _Any, _>(&mut self.membranes, &item)?) Ok(PackedWriter::encode::<_, _Any, _>(&mut self.membranes, &item)?)
} }
@ -468,9 +471,10 @@ pub async fn input_loop(
} }
} }
Input::Bytes(mut r) => { Input::Bytes(mut r) => {
let mut buf = BytesMut::with_capacity(1024); const BUFSIZE: usize = 65536;
let mut buf = BytesMut::with_capacity(BUFSIZE);
loop { loop {
buf.reserve(8192); buf.reserve(BUFSIZE);
let n = match r.read_buf(&mut buf).await { let n = match r.read_buf(&mut buf).await {
Ok(n) => n, Ok(n) => n,
Err(e) => Err(e) =>
@ -531,11 +535,10 @@ impl Entity for TunnelRelay {
} }
tunnel_relay::Input::Packet { bs } => { tunnel_relay::Input::Packet { bs } => {
let mut src = BytesBinarySource::new(&bs); let mut src = BytesBinarySource::new(&bs);
let item = src.packed::<_, _Any, _>( let mut dec = ActivatedMembranes(t, &self.self_ref, &mut self.membranes);
&mut ActivatedMembranes(t, &self.self_ref, &mut self.membranes)) let mut r = src.packed::<_, _Any, _>(&mut dec);
.demand_next(false)?; let item = Packet::deserialize(&mut r)?;
tracing::trace!(packet = debug(&item), "-->"); self.handle_inbound_packet(t, item)?;
self.handle_inbound_packet(t, Packet::try_from(&item)?)?;
} }
tunnel_relay::Input::Segment { bs } => { tunnel_relay::Input::Segment { bs } => {
self.input_buffer.extend_from_slice(&bs); self.input_buffer.extend_from_slice(&bs);
@ -544,9 +547,11 @@ impl Entity for TunnelRelay {
let mut src = BytesBinarySource::new(&self.input_buffer); let mut src = BytesBinarySource::new(&self.input_buffer);
let mut dec = ActivatedMembranes(t, &self.self_ref, &mut self.membranes); let mut dec = ActivatedMembranes(t, &self.self_ref, &mut self.membranes);
let mut r = src.packed::<_, _Any, _>(&mut dec); let mut r = src.packed::<_, _Any, _>(&mut dec);
let e = match r.next(false) { let e = match Packet::deserialize(&mut r) {
Err(e) if is_eof_io_error(&e) => None, Err(ParseError::Preserves(PreservesError::Io(e)))
result => result?, if is_eof_io_error(&e) =>
None,
result => Some(result?),
}; };
(e, r.source.index) (e, r.source.index)
}; };
@ -554,8 +559,7 @@ impl Entity for TunnelRelay {
None => break, None => break,
Some(item) => { Some(item) => {
self.input_buffer.advance(count); self.input_buffer.advance(count);
tracing::trace!(packet = debug(&item), "-->"); self.handle_inbound_packet(t, item)?;
self.handle_inbound_packet(t, Packet::try_from(&item)?)?;
} }
} }
} }