From 7fb20c11afe5414f3bcaed7f5edfa6df1f40b953 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Tue, 6 Jul 2021 20:56:36 +0200 Subject: [PATCH] It actually takes connections again now! Still WIP --- Cargo.lock | 1 - build.rs | 21 ++ local-protocols/Makefile | 8 + local-protocols/schema-bundle.bin | 3 + local-protocols/schemas/tunnelRelay.prs | 4 + src/actor.rs | 283 +++++++++++++++-------- src/bin/syndicate-server.rs | 77 +++---- src/lib.rs | 5 +- src/packets.rs | 35 --- src/{peer.rs => peer.OLD.rs} | 0 src/relay.rs | 293 ++++++++++++++++++++++++ 11 files changed, 549 insertions(+), 181 deletions(-) create mode 100644 build.rs create mode 100644 local-protocols/Makefile create mode 100644 local-protocols/schema-bundle.bin create mode 100644 local-protocols/schemas/tunnelRelay.prs delete mode 100644 src/packets.rs rename src/{peer.rs => peer.OLD.rs} (100%) create mode 100644 src/relay.rs diff --git a/Cargo.lock b/Cargo.lock index 15d8d05..918b15b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -878,7 +878,6 @@ checksum = "ac74c624d6b2d21f425f752262f42188365d7b8ff1aff74c82e45136510a4857" name = "preserves" version = "0.15.0" dependencies = [ - "lazy_static", "num", "serde", "serde_bytes", diff --git a/build.rs b/build.rs new file mode 100644 index 0000000..ceb426a --- /dev/null +++ b/build.rs @@ -0,0 +1,21 @@ +use preserves_schema::compiler::*; + +use std::io::Error; + +fn main() -> Result<(), Error> { + let buildroot = std::env::current_dir()?; + + let mut gen_dir = buildroot.clone(); + gen_dir.push("src/schemas"); + + let mut c = CompilerConfig::new(gen_dir, "crate::schemas".to_owned()); + c.module_aliases.insert(vec!["EntityRef".to_owned()], "crate::actor".to_owned()); + + let inputs = expand_inputs(&vec!["protocols/schema-bundle.bin".to_owned(), + "local-protocols/schema-bundle.bin".to_owned()])?; + for i in &inputs { + println!("cargo:rerun-if-changed={:?}", i); + } + c.load_schemas_and_bundles(&inputs)?; + compile(&c) +} diff --git a/local-protocols/Makefile b/local-protocols/Makefile new file mode 100644 index 0000000..1efe0bd --- /dev/null +++ b/local-protocols/Makefile @@ -0,0 +1,8 @@ +all: schema-bundle.bin + +clean: + rm -f schema-bundle.bin + +schema-bundle.bin: schemas/*.prs + preserves-schemac schemas/*.prs > $@.tmp + mv $@.tmp $@ diff --git a/local-protocols/schema-bundle.bin b/local-protocols/schema-bundle.bin new file mode 100644 index 0000000..b8dd924 --- /dev/null +++ b/local-protocols/schema-bundle.bin @@ -0,0 +1,3 @@ +´³bundle·µ³ tunnelRelay„´³schema·³version‘³ definitions·³Input´³orµµ±eof´³lit³eof„„µ±packet´³rec´³lit³packet„´³tupleµ´³named³bs´³atom³ +ByteString„„„„„„µ±segment´³rec´³lit³segment„´³tupleµ´³named³bs´³atom³ +ByteString„„„„„„„„„³ embeddedType´³refµ³ EntityRef„³Ref„„„„„ \ No newline at end of file diff --git a/local-protocols/schemas/tunnelRelay.prs b/local-protocols/schemas/tunnelRelay.prs new file mode 100644 index 0000000..fcf808c --- /dev/null +++ b/local-protocols/schemas/tunnelRelay.prs @@ -0,0 +1,4 @@ +version 1 . +embeddedType EntityRef.Ref . + +Input = =eof / / . diff --git a/src/actor.rs b/src/actor.rs index 64dcd8a..b7452d0 100644 --- a/src/actor.rs +++ b/src/actor.rs @@ -1,3 +1,7 @@ +pub use futures::future::BoxFuture; + +pub use std::future::ready; + use super::Assertion; use super::ActorId; use super::Handle; @@ -5,16 +9,12 @@ use super::schemas::internal_protocol::*; use super::error::Error; use preserves::value::Domain; -use preserves::value::IOResult; use preserves::value::IOValue; use preserves::value::Map; use preserves::value::NestedValue; use std::boxed::Box; -use std::cell::Cell; use std::collections::hash_map::HashMap; -use std::future::Future; -use std::future::ready; use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; @@ -27,7 +27,7 @@ use tracing::{Instrument, trace, error}; pub type ActorResult = Result<(), Error>; pub type ActorHandle = tokio::task::JoinHandle; -pub trait Entity { +pub trait Entity: Send { fn assert(&mut self, _t: &mut Activation, _a: Assertion, _h: Handle) -> ActorResult { Ok(()) } @@ -41,6 +41,12 @@ pub trait Entity { t.message(peer, Assertion::new(true)); Ok(()) } + fn turn_end(&mut self, _t: &mut Activation) -> ActorResult { + Ok(()) + } + fn exit_hook(&mut self, _t: &mut Activation, _exit_status: &ActorResult) -> BoxFuture { + Box::pin(ready(Ok(()))) + } } type OutboundAssertions = Map>; @@ -48,10 +54,12 @@ type OutboundAssertions = Map>; // This is what other implementations call a "Turn", renamed here to // avoid conflicts with schemas::internal_protocol::Turn. pub struct Activation<'activation> { - outbound_assertions: &'activation mut OutboundAssertions, + pub actor: &'activation mut Actor, queues: HashMap, Event)>>, + turn_end_revisit_flag: bool, } +#[derive(Debug)] enum SystemMessage { Release, ReleaseOid(Oid), @@ -63,17 +71,21 @@ pub struct Mailbox { pub actor_id: ActorId, pub mailbox_id: u64, tx: UnboundedSender, - pub queue_depth: Arc, - pub mailbox_count: Arc, + queue_depth: Arc, + mailbox_count: Arc, } pub struct Actor { - pub template_mailbox: Mailbox, + actor_id: ActorId, + tx: UnboundedSender, rx: UnboundedReceiver, - pub outbound_assertions: OutboundAssertions, - pub oid_map: Map>>, - pub next_task_id: u64, - pub linked_tasks: Map, + queue_depth: Arc, + mailbox_count: Arc, + outbound_assertions: OutboundAssertions, + oid_map: Map>, + next_task_id: u64, + linked_tasks: Map, + exit_hooks: Vec, } #[derive(Debug, PartialEq, Eq, Hash, PartialOrd, Ord)] @@ -87,13 +99,10 @@ pub struct Ref { impl<'activation> Activation<'activation> { pub fn for_actor(actor: &'activation mut Actor) -> Self { - Self::for_actor_details(&mut actor.outbound_assertions) - } - - pub fn for_actor_details(outbound_assertions: &'activation mut OutboundAssertions) -> Self { Activation { - outbound_assertions, + actor, queues: HashMap::new(), + turn_end_revisit_flag: false, } } @@ -101,12 +110,12 @@ impl<'activation> Activation<'activation> { let handle = crate::next_handle(); self.queue_for(&r).push((Arc::clone(&r), Event::Assert(Box::new( Assert { assertion: Assertion(a.into()), handle: handle.clone() })))); - self.outbound_assertions.insert(handle.clone(), r); + self.actor.outbound_assertions.insert(handle.clone(), r); handle } pub fn retract(&mut self, handle: Handle) { - if let Some(r) = self.outbound_assertions.remove(&handle) { + if let Some(r) = self.actor.outbound_assertions.remove(&handle) { self.retract_known_ref(r, handle) } } @@ -120,11 +129,15 @@ impl<'activation> Activation<'activation> { Message { body: Assertion(m.into()) })))) } + pub fn set_turn_end_flag(&mut self) { + self.turn_end_revisit_flag = true; + } + fn queue_for(&mut self, r: &Arc) -> &mut Vec<(Arc, Event)> { self.queues.entry(r.relay.actor_id).or_default() } - pub fn deliver(&mut self) { + fn deliver(&mut self) { for (_actor_id, turn) in std::mem::take(&mut self.queues).into_iter() { if turn.len() == 0 { continue; } let first_ref = Arc::clone(&turn[0].0); @@ -133,6 +146,24 @@ impl<'activation> Activation<'activation> { |(r, e)| TurnEvent { oid: r.target.clone(), event: e }).collect())); } } + + fn with_oid R, + Fs: FnOnce(&mut Self, &mut Box) -> R>( + &mut self, + oid: &Oid, + kf: Ff, + ks: Fs, + ) -> R { + match self.actor.oid_map.remove_entry(&oid) { + None => kf(self), + Some((k, mut e)) => { + let result = ks(self, &mut e); + self.actor.oid_map.insert(k, e); + result + } + } + } } impl<'activation> Drop for Activation<'activation> { @@ -182,20 +213,28 @@ impl PartialOrd for Mailbox { impl Clone for Mailbox { fn clone(&self) -> Self { let Mailbox { actor_id, tx, queue_depth, mailbox_count, .. } = self; - mailbox_count.fetch_add(1, Ordering::SeqCst); - Mailbox { + let _old_refcount = mailbox_count.fetch_add(1, Ordering::SeqCst); + let new_mailbox = Mailbox { actor_id: *actor_id, mailbox_id: crate::next_mailbox_id(), tx: tx.clone(), queue_depth: Arc::clone(queue_depth), mailbox_count: Arc::clone(mailbox_count), - } + }; + trace!(old_mailbox = debug(&self), + new_mailbox = debug(&new_mailbox), + new_mailbox_refcount = debug(_old_refcount + 1)); + new_mailbox } } impl Drop for Mailbox { fn drop(&mut self) { - if self.mailbox_count.fetch_sub(1, Ordering::SeqCst) == 1 { + let old_mailbox_refcount = self.mailbox_count.fetch_sub(1, Ordering::SeqCst); + let new_mailbox_refcount = old_mailbox_refcount - 1; + trace!(mailbox = debug(&self), + new_mailbox_refcount); + if new_mailbox_refcount == 0 { let _ = self.tx.send(SystemMessage::Release); () } @@ -205,58 +244,99 @@ impl Drop for Mailbox { impl Actor { pub fn new() -> Self { let (tx, rx) = unbounded_channel(); + let actor_id = crate::next_actor_id(); + trace!(id = actor_id, "Actor::new"); Actor { - template_mailbox: Mailbox { - actor_id: crate::next_actor_id(), - mailbox_id: crate::next_mailbox_id(), - tx, - queue_depth: Arc::new(AtomicUsize::new(0)), - mailbox_count: Arc::new(AtomicUsize::new(0)), - }, + actor_id, + tx, rx, + queue_depth: Arc::new(AtomicUsize::new(0)), + mailbox_count: Arc::new(AtomicUsize::new(0)), outbound_assertions: Map::new(), oid_map: Map::new(), next_task_id: 0, linked_tasks: Map::new(), + exit_hooks: Vec::new(), } } pub fn id(&self) -> ActorId { - self.template_mailbox.actor_id + self.actor_id + } + + fn mailbox(&mut self) -> Mailbox { + let _old_refcount = self.mailbox_count.fetch_add(1, Ordering::SeqCst); + let new_mailbox = Mailbox { + actor_id: self.actor_id, + mailbox_id: crate::next_mailbox_id(), + tx: self.tx.clone(), + queue_depth: Arc::clone(&self.queue_depth), + mailbox_count: Arc::clone(&self.mailbox_count), + }; + trace!(new_mailbox = debug(&new_mailbox), + new_mailbox_refcount = debug(_old_refcount + 1)); + new_mailbox + } + + pub fn shutdown(&mut self) { + let _ = self.tx.send(SystemMessage::Release); + () } pub fn create(&mut self, e: E) -> Arc { let r = Ref { - relay: self.template_mailbox.clone(), + relay: self.mailbox(), target: crate::next_oid(), }; - self.oid_map.insert(r.target.clone(), Cell::new(Box::new(e))); + self.oid_map.insert(r.target.clone(), Box::new(e)); Arc::new(r) } - pub fn boot + Send + 'static>( + pub fn boot BoxFuture>( mut self, name: tracing::Span, boot: F, ) -> ActorHandle { + let id = self.id(); tokio::spawn(async move { - trace!("start"); - let run_future = self.run(boot); - let result = run_future.await; + trace!(id, "start"); + let result = self.run(boot).await; + { + let mut t = Activation::for_actor(&mut self); + for oid in std::mem::take(&mut t.actor.exit_hooks) { + match t.actor.oid_map.remove_entry(&oid) { + None => (), + Some((k, mut e)) => { + if let Err(err) = e.exit_hook(&mut t, &result).await { + tracing::warn!(err = debug(err), + oid = debug(oid), + "error in exit hook"); + } + t.actor.oid_map.insert(k, e); + } + } + } + } match &result { - Ok(()) => trace!("normal stop"), - Err(e) => error!("{}", e), + Ok(()) => trace!(id, "normal stop"), + Err(e) => error!(id, "error stop: {}", e), } result }.instrument(name)) } pub fn start(self, name: tracing::Span) -> ActorHandle { - self.boot(name, ready(Ok(()))) + self.boot(name, |_ac| Box::pin(ready(Ok(())))) } - async fn run>(&mut self, boot: F) -> ActorResult { - boot.await?; + async fn run BoxFuture>( + &mut self, + boot: F, + ) -> ActorResult { + let id = self.id(); + trace!(id, "boot"); + boot(self).await?; + trace!(id, "run"); loop { match self.rx.recv().await { None => @@ -265,6 +345,7 @@ impl Actor { detail: _Any::new(false), })?, Some(m) => { + trace!(id, m = debug(&m), "system message"); if self.handle(m)? { return Ok(()); } @@ -273,12 +354,16 @@ impl Actor { // (instead zeroing it on queue empty - it only needs to be approximate), // but try_recv has been removed from mpsc at the time of writing. See // https://github.com/tokio-rs/tokio/issues/3350 . - self.template_mailbox.queue_depth.fetch_sub(1, Ordering::Relaxed); + self.queue_depth.fetch_sub(1, Ordering::Relaxed); } } } } + pub fn add_exit_hook(&mut self, oid: &Oid) { + self.exit_hooks.push(oid.clone()) + } + fn handle(&mut self, m: SystemMessage) -> Result { match m { SystemMessage::Release => @@ -288,30 +373,35 @@ impl Actor { Ok(false) } SystemMessage::Turn(Turn(events)) => { + let mut t = Activation::for_actor(self); + let mut revisit_oids = Vec::new(); for TurnEvent { oid, event } in events.into_iter() { - if let Some(e) = self.oid_map.get_mut(&oid) { - let mut t = Activation::for_actor_details(&mut self.outbound_assertions); - let e = e.get_mut(); - match event { - Event::Assert(b) => { - let Assert { assertion: Assertion(assertion), handle } = *b; - e.assert(&mut t, assertion, handle)?; - } - Event::Retract(b) => { - let Retract { handle } = *b; - e.retract(&mut t, handle)?; - } - Event::Message(b) => { - let Message { body: Assertion(body) } = *b; - e.message(&mut t, body)?; - } - Event::Sync(b) => { - let Sync { peer } = *b; - e.sync(&mut t, peer)?; - } + t.with_oid(&oid, |_| Ok(()), |t, e| match event { + Event::Assert(b) => { + let Assert { assertion: Assertion(assertion), handle } = *b; + e.assert(t, assertion, handle) } + Event::Retract(b) => { + let Retract { handle } = *b; + e.retract(t, handle) + } + Event::Message(b) => { + let Message { body: Assertion(body) } = *b; + e.message(t, body) + } + Event::Sync(b) => { + let Sync { peer } = *b; + e.sync(t, peer) + } + })?; + if t.turn_end_revisit_flag { + t.turn_end_revisit_flag = false; + revisit_oids.push(oid); } } + for oid in revisit_oids { + t.with_oid(&oid, |_| Ok(()), |t, e| e.turn_end(t))?; + } Ok(false) } SystemMessage::Crash(e) => @@ -319,33 +409,41 @@ impl Actor { } } - pub fn linked_task + Send + 'static>( + pub fn linked_task + Send + 'static>( &mut self, name: tracing::Span, boot: F, - ) { - let mailbox = self.template_mailbox.clone(); + ) -> ActorHandle { + let id = self.id(); + let mailbox = self.mailbox(); let token = CancellationToken::new(); let task_id = self.next_task_id; self.next_task_id += 1; - { + let handle = { let token = token.clone(); tokio::spawn(async move { - trace!("linked task start"); + trace!(id, task_id, "linked task start"); select! { - _ = token.cancelled() => (), - result = boot => match result { - Ok(()) => trace!("linked task normal stop"), - Err(e) => { - error!("linked task error: {}", e); - let _ = mailbox.tx.send(SystemMessage::Crash(e)); - () + _ = token.cancelled() => { + trace!(id, task_id, "linked task cancelled"); + Ok(()) + } + result = boot => { + match &result { + Ok(()) => trace!(id, task_id, "linked task normal stop"), + Err(e) => { + error!(id, task_id, "linked task error: {}", e); + let _ = mailbox.tx.send(SystemMessage::Crash(e.clone())); + () + } } + result } } - }.instrument(name)); - } + }.instrument(name)) + }; self.linked_tasks.insert(task_id, token); + handle } } @@ -363,6 +461,12 @@ impl Drop for Actor { } } +impl Ref { + pub fn external_event(&self, event: Event) { + self.relay.send(Turn(vec![TurnEvent { oid: self.target.clone(), event }])) + } +} + impl Drop for Ref { fn drop(&mut self) { let _ = self.relay.tx.send(SystemMessage::ReleaseOid(self.target.clone())); @@ -370,20 +474,17 @@ impl Drop for Ref { } } -impl Domain for Ref { - fn from_preserves(v: IOValue) -> IOResult { - panic!("aiee") - } - fn as_preserves(&self) -> IOValue { - panic!("aiee") +impl Domain for Ref {} + +impl std::convert::TryFrom<&IOValue> for Ref { + type Error = preserves_schema::support::ParseError; + fn try_from(_v: &IOValue) -> Result { + panic!("Attempted to serialize Ref via IOValue"); } } -impl Domain for super::schemas::sturdy::WireRef { - fn from_preserves(v: IOValue) -> IOResult { - panic!("aiee") - } - fn as_preserves(&self) -> IOValue { - panic!("aiee") +impl std::convert::From<&Ref> for IOValue { + fn from(_v: &Ref) -> IOValue { + panic!("Attempted to deserialize Ref via IOValue"); } } diff --git a/src/bin/syndicate-server.rs b/src/bin/syndicate-server.rs index c5d2759..04f2f0e 100644 --- a/src/bin/syndicate-server.rs +++ b/src/bin/syndicate-server.rs @@ -1,12 +1,6 @@ -use futures::{SinkExt, StreamExt}; +use futures::SinkExt; +use futures::StreamExt; -use preserves::value::PackedReader; -use preserves::value::PackedWriter; -use preserves::value::Reader; -use preserves::value::Writer; - -use std::convert::TryFrom; -use std::future::Ready; use std::future::ready; use std::sync::Arc; @@ -17,12 +11,10 @@ use syndicate::dataspace::*; use syndicate::error::Error; use syndicate::error::error; use syndicate::config; -use syndicate::packets; -use syndicate::peer::Peer; +use syndicate::relay; use tokio::net::TcpListener; use tokio::net::TcpStream; -use tokio_util::codec::Framed; use tracing::{Level, info, trace}; @@ -31,7 +23,7 @@ use tungstenite::Message; #[tokio::main] async fn main() -> Result<(), Box> { let filter = tracing_subscriber::filter::EnvFilter::from_default_env() - .add_directive(tracing_subscriber::filter::LevelFilter::INFO.into()); + .add_directive(tracing_subscriber::filter::LevelFilter::TRACE.into()); let subscriber = tracing_subscriber::FmtSubscriber::builder() .with_ansi(true) .with_max_level(Level::TRACE) @@ -80,7 +72,7 @@ async fn main() -> Result<(), Box> { trace!("startup"); let ds = { - let ac = Actor::new(); + let mut ac = Actor::new(); let ds = ac.create(Dataspace::new()); daemons.push(ac.start(tracing::info_span!("dataspace"))); ds @@ -89,8 +81,9 @@ async fn main() -> Result<(), Box> { for port in config.ports.clone() { let ds = Arc::clone(&ds); let config = Arc::clone(&config); - let ac = Actor::new(); - ac.linked_task(tracing::info_span!("listener", port), run_listener(ds, port, config)); + let mut ac = Actor::new(); + ac.linked_task(tracing::info_span!("tcp", port), run_listener(ds, port, config)); + daemons.push(ac.start(tracing::info_span!("tcp", port))); } futures::future::join_all(daemons).await; @@ -103,29 +96,15 @@ fn message_error(e: E) -> Error { error(&e.to_string(), false) } -fn encode_message(p: packets::Packet) -> Result { - let mut bs = Vec::with_capacity(128); - PackedWriter::new(&mut bs).write(&(&p).into())?; - Ok(Message::Binary(bs)) -} - -fn message_encoder(p: packets::Packet) -> Ready> -{ - ready(encode_message(p)) -} - -fn message_decoder_inner( +fn extract_binary_packets( r: Result, -) -> Result, Error> { +) -> Result>, Error> { match r { Ok(m) => match m { Message::Text(_) => Err("Text websocket frames are not accepted")?, - Message::Binary(bs) => { - let iov = PackedReader::decode_bytes(&bs).demand_next(false)?; - let p = packets::Packet::try_from(&iov)?; - Ok(Some(p)) - } + Message::Binary(bs) => + Ok(Some(bs)), Message::Ping(_) => Ok(None), // pings are handled by tungstenite before we see them Message::Pong(_) => @@ -137,50 +116,46 @@ fn message_decoder_inner( } } -fn message_decoder(r: Result) -> Ready>> { - ready(message_decoder_inner(r).transpose()) -} - async fn run_connection( - mut stream: TcpStream, + ac: &mut Actor, + stream: TcpStream, ds: Arc, addr: std::net::SocketAddr, config: Arc, ) -> ActorResult { let mut buf = [0; 1]; // peek at the first byte to see what kind of connection to expect - match stream.peek(&mut buf).await? { + let (i, o) = match stream.peek(&mut buf).await? { 1 => match buf[0] { 71 /* ASCII 'G' for "GET" */ => { info!(protocol = display("websocket"), peer = debug(addr)); let s = tokio_tungstenite::accept_async(stream).await .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; let (o, i) = s.split(); - let i = i.filter_map(message_decoder); - let o = o.sink_map_err(message_error).with(message_encoder); - let mut p = Peer::new(i, o, ds, config); - p.run().await? + let i = i.filter_map(|r| ready(extract_binary_packets(r).transpose())); + let o = o.sink_map_err(message_error).with(|bs| ready(Ok(Message::Binary(bs)))); + (relay::Input::Packets(Box::pin(i)), relay::Output::Packets(Box::pin(o))) }, _ => { info!(protocol = display("raw"), peer = debug(addr)); - let (o, i) = Framed::new(stream, packets::Codec).split(); - let mut p = Peer::new(i, o, ds, config); - p.run().await? + let (i, o) = stream.into_split(); + (relay::Input::Bytes(Box::pin(i)), relay::Output::Bytes(Box::pin(o))) } } 0 => Err(error("closed before starting", false))?, _ => unreachable!() - } - Ok(()) + }; + Ok(relay::TunnelRelay::run(ac, i, o)?) } async fn run_listener(ds: Arc, port: u16, config: Arc) -> ActorResult { let listener = TcpListener::bind(format!("0.0.0.0:{}", port)).await?; loop { let (stream, addr) = listener.accept().await?; - let mut ac = Actor::new(); let ds = Arc::clone(&ds); let config = Arc::clone(&config); - ac.linked_task(tracing::info_span!("connection", id = (ac.id())), - run_connection(stream, ds, addr, config)); + let ac = Actor::new(); + let id = ac.id(); + ac.boot(tracing::info_span!(parent: None, "connection", id), + move |ac| Box::pin(run_connection(ac, stream, ds, addr, config))); } } diff --git a/src/lib.rs b/src/lib.rs index 5b363f3..41728f8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -13,10 +13,9 @@ pub mod bag; pub mod config; pub mod dataspace; pub mod error; -pub mod schemas; -pub mod packets; pub mod pattern; -pub mod peer; +pub mod relay; +pub mod schemas; pub mod skeleton; pub type Assertion = schemas::dataspace::_Any; diff --git a/src/packets.rs b/src/packets.rs deleted file mode 100644 index 1200f01..0000000 --- a/src/packets.rs +++ /dev/null @@ -1,35 +0,0 @@ -pub use crate::schemas::internal_protocol::*; - -use bytes::{Buf, BufMut, BytesMut}; - -use std::convert::TryFrom; - -use preserves::value::PackedReader; -use preserves::value::PackedWriter; -use preserves::value::Reader; -use preserves::value::Writer; - -pub struct Codec; - -impl tokio_util::codec::Decoder for Codec { - type Item = Packet; - type Error = Error; - fn decode(&mut self, bs: &mut BytesMut) -> Result, Self::Error> { - let mut r = PackedReader::decode_bytes(bs); - match r.next(false)? { - None => Ok(None), - Some(item) => { - let count = r.source.index; - bs.advance(count); - Ok(Some(Packet::try_from(&item)?)) - } - } - } -} - -impl tokio_util::codec::Encoder<&Packet> for Codec { - type Error = Error; - fn encode(&mut self, item: &Packet, bs: &mut BytesMut) -> Result<(), Self::Error> { - Ok(PackedWriter::new(&mut bs.writer()).write(&item.into())?) - } -} diff --git a/src/peer.rs b/src/peer.OLD.rs similarity index 100% rename from src/peer.rs rename to src/peer.OLD.rs diff --git a/src/relay.rs b/src/relay.rs new file mode 100644 index 0000000..457cdae --- /dev/null +++ b/src/relay.rs @@ -0,0 +1,293 @@ +use bytes::Buf; +use bytes::BytesMut; + +use crate::actor::*; +use crate::error::Error; +use crate::schemas::internal_protocol::*; +use crate::schemas::tunnel_relay; + +use futures::Sink; +use futures::SinkExt; +use futures::Stream; +use futures::StreamExt; + +use preserves::value::BinarySource; +use preserves::value::BytesBinarySource; +use preserves::value::DomainDecode; +use preserves::value::DomainEncode; +use preserves::value::IOValue; +use preserves::value::Map; +use preserves::value::NoEmbeddedDomainCodec; +use preserves::value::PackedReader; +use preserves::value::PackedWriter; +use preserves::value::Reader; +use preserves::value::Writer; +use preserves_schema::support::lazy_static; + +use std::convert::TryFrom; +use std::io; +use std::pin::Pin; +use std::sync::Arc; +use std::sync::atomic; + +use tokio::io::AsyncRead; +use tokio::io::AsyncReadExt; +use tokio::io::AsyncWrite; +use tokio::io::AsyncWriteExt; + +use tracing; + +struct WireSymbol { + oid: Oid, + obj: Arc, + ref_count: atomic::AtomicUsize, +} + +struct Membrane { + oid_map: Map>, + ref_map: Map, Arc>, +} + +struct Membranes { + exported: Membrane, + imported: Membrane, +} + +pub enum Input { + Packets(Pin, Error>> + Send>>), + Bytes(Pin>), +} + +pub enum Output { + Packets(Pin, Error = Error> + Send>>), + Bytes(Pin>), +} + +// There are other kinds of relay. This one has exactly two participants connected to each other. +pub struct TunnelRelay +{ + input_buffer: BytesMut, + o: Output, + inbound_assertions: Map>)>, + outbound_assertions: Map>>, + membranes: Membranes, + pending_outbound: Vec, +} + +lazy_static! { + static ref INERT_REF: Arc = { + struct InertEntity; + impl crate::actor::Entity for InertEntity {} + let mut ac = crate::actor::Actor::new(); + ac.create(InertEntity) + }; +} + +//--------------------------------------------------------------------------- + +impl WireSymbol { + fn acquire(&self) { + self.ref_count.fetch_add(1, atomic::Ordering::Relaxed); + } + + fn release(&self) -> bool { + self.ref_count.fetch_sub(1, atomic::Ordering::Relaxed) == 1 + } +} + +impl Membrane { + fn new() -> Self { + Membrane { + oid_map: Map::new(), + ref_map: Map::new(), + } + } + + fn insert(&mut self, oid: Oid, obj: Arc) -> Arc { + let ws = Arc::new(WireSymbol { + oid: oid.clone(), + obj: Arc::clone(&obj), + ref_count: atomic::AtomicUsize::new(0), + }); + self.oid_map.insert(oid, Arc::clone(&ws)); + self.ref_map.insert(obj, Arc::clone(&ws)); + ws + } + + fn release(&mut self, ws: &Arc) { + if ws.release() { + self.oid_map.remove(&ws.oid); + self.ref_map.remove(&ws.obj); + } + } +} + +impl TunnelRelay { + pub fn run(ac: &mut Actor, i: Input, o: Output) -> ActorResult { + let tr = ac.create(TunnelRelay { + input_buffer: BytesMut::with_capacity(1024), + o, + inbound_assertions: Map::new(), + outbound_assertions: Map::new(), + membranes: Membranes { + exported: Membrane::new(), + imported: Membrane::new(), + }, + pending_outbound: Vec::new(), + }); + ac.add_exit_hook(&tr.target); + ac.linked_task(tracing::info_span!("reader"), input_loop(i, tr)); + Ok(()) + } + + fn handle_packet(&mut self, p: Packet) -> ActorResult { + match p { + Packet::Error(b) => { + tracing::info!(message = debug(b.message.clone()), + detail = debug(b.detail.clone()), + "received Error from peer"); + Err(*b) + }, + Packet::Turn(b) => { + let Turn(events) = *b; + for TurnEvent { oid, event } in events { + tracing::info!(oid = debug(oid), event = debug(event)) + } + Ok(()) + } + } + } + + pub fn send_event(&mut self, oid: Oid, event: Event) -> bool { + let need_flush = self.pending_outbound.is_empty(); + self.pending_outbound.push(TurnEvent { oid, event }); + need_flush + } + + pub fn decode_packet(&mut self, bs: &[u8]) -> Result { + let mut src = BytesBinarySource::new(bs); + Ok(Packet::try_from(&src.packed::<_, _Any, _>(&mut self.membranes).demand_next(false)?)?) + } + + fn encode_packet(&mut self, p: Packet) -> Result, Error> { + Ok(PackedWriter::encode::<_, _Any, _>(&mut self.membranes, &_Any::from(&p))?) + } + + pub async fn send_packet(&mut self, p: Packet) -> ActorResult { + let bs = self.encode_packet(p)?; + match &mut self.o { + Output::Packets(sink) => Ok(sink.send(bs).await?), + Output::Bytes(w) => Ok(w.write_all(&bs).await?), + } + } +} + +impl DomainDecode<_Ptr> for Membranes { + fn decode_embedded<'de, 'src, S: BinarySource<'de>>( + &mut self, + src: &'src mut S, + _read_annotations: bool, + ) -> io::Result<_Ptr> { + let v: IOValue = PackedReader::new(src, NoEmbeddedDomainCodec).demand_next(false)?; + Ok(Arc::new(_Dom::try_from(&v)?)) + } +} + +impl DomainEncode<_Ptr> for Membranes { + fn encode_embedded( + &mut self, + w: &mut W, + d: &_Ptr, + ) -> io::Result<()> { + w.write(&mut NoEmbeddedDomainCodec, &IOValue::from(d.as_ref())) + } +} + +pub async fn input_loop( + i: Input, + relay: Arc, +) -> ActorResult { + fn s>(relay: &Arc, m: M) { + relay.external_event(Event::Message(Box::new(Message { body: Assertion(m.into()) }))) + } + + match i { + Input::Packets(mut src) => { + loop { + match src.next().await { + None => { + s(&relay, &tunnel_relay::Input::Eof); + return Ok(()); + } + Some(bs) => s(&relay, &tunnel_relay::Input::Packet { bs: bs? }), + } + } + } + Input::Bytes(mut r) => { + let mut buf = BytesMut::with_capacity(1024); + loop { + buf.reserve(1024); + let n = r.read_buf(&mut buf).await?; + match n { + 0 => { + s(&relay, &tunnel_relay::Input::Eof); + return Ok(()); + } + _ => { + while buf.has_remaining() { + let bs = buf.chunk(); + let n = bs.len(); + s(&relay, &tunnel_relay::Input::Segment { bs: bs.to_vec() }); + buf.advance(n); + } + } + } + } + } + } +} + +impl Entity for TunnelRelay { + fn message(&mut self, t: &mut Activation, m: _Any) -> ActorResult { + if let Ok(m) = tunnel_relay::Input::try_from(&m) { + match m { + tunnel_relay::Input::Eof => { + tracing::info!("eof"); + t.actor.shutdown(); + } + tunnel_relay::Input::Packet { bs } => { + let p = self.decode_packet(&bs)?; + self.handle_packet(p)? + } + tunnel_relay::Input::Segment { bs } => { + self.input_buffer.extend_from_slice(&bs); + loop { + let (e, count) = { + let mut src = BytesBinarySource::new(&self.input_buffer); + let mut r = src.packed::<_, _Any, _>(&mut self.membranes); + let e = r.next(false)?; + (e, r.source.index) + }; + match e { + None => break, + Some(item) => { + self.input_buffer.advance(count); + self.handle_packet(Packet::try_from(&item)?)?; + } + } + } + } + } + } + Ok(()) + } + + fn exit_hook(&mut self, _t: &mut Activation, exit_status: &ActorResult) -> BoxFuture { + if let Err(e) = exit_status { + let e = e.clone(); + Box::pin(self.send_packet(Packet::Error(Box::new(e)))) + } else { + Box::pin(ready(Ok(()))) + } + } +}