Remove quasi-dataspace actor; more precise error handling

This commit is contained in:
Tony Garnock-Jones 2019-10-15 21:23:46 +01:00
parent 9e256376a1
commit 34909828eb
2 changed files with 114 additions and 116 deletions

View File

@ -5,7 +5,7 @@ authors = ["Tony Garnock-Jones <tonyg@leastfixedpoint.com>"]
edition = "2018"
[dependencies]
preserves = "0.2.0"
preserves = "0.2.1"
serde = { version = "1.0", features = ["derive"] }
serde_bytes = "0.11"

View File

@ -1,4 +1,4 @@
#![recursion_limit="256"]
#![recursion_limit="512"]
mod bag;
mod skeleton;
@ -7,9 +7,10 @@ use bytes::BytesMut;
use preserves::value::{self, Map};
use tokio::prelude::*;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::mpsc::{channel, Sender, Receiver};
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender, UnboundedReceiver};
use tokio::codec::{Framed, Encoder, Decoder};
use futures::select;
use std::io;
// use self::skeleton::Index;
@ -58,25 +59,64 @@ mod packets {
}
#[derive(Debug)]
pub enum DataspaceMessage {
Join(ConnId, Sender<packets::Out>),
Input(ConnId, packets::In),
Leave(ConnId),
enum PacketDecodeError {
Read(value::decoder::Error),
Parse(value::error::Error),
}
struct ValueCodec {
codec: value::Codec<V>,
}
impl ValueCodec {
fn new(codec: value::Codec<V>) -> Self {
ValueCodec { codec }
impl From<io::Error> for PacketDecodeError {
fn from(v: io::Error) -> Self {
PacketDecodeError::Read(v.into())
}
}
impl Decoder for ValueCodec {
type Item = V;
type Error = value::decoder::Error;
impl From<value::error::Error> for PacketDecodeError {
fn from(v: value::error::Error) -> Self {
PacketDecodeError::Parse(v)
}
}
#[derive(Debug)]
enum PacketEncodeError {
Write(value::encoder::Error),
Unparse(value::error::Error),
}
impl From<io::Error> for PacketEncodeError {
fn from(v: io::Error) -> Self {
PacketEncodeError::Write(v.into())
}
}
impl From<value::error::Error> for PacketEncodeError {
fn from(v: value::error::Error) -> Self {
PacketEncodeError::Unparse(v)
}
}
impl From<PacketEncodeError> for io::Error {
fn from(v: PacketEncodeError) -> Self {
match v {
PacketEncodeError::Write(e) => e,
PacketEncodeError::Unparse(e) =>
Self::new(io::ErrorKind::InvalidData, format!("{:?}", e)),
}
}
}
struct PacketCodec {
codec: value::Codec<V>,
}
impl PacketCodec {
fn new(codec: value::Codec<V>) -> Self {
PacketCodec { codec }
}
}
impl Decoder for PacketCodec {
type Item = packets::In;
type Error = PacketDecodeError;
fn decode(&mut self, bs: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
let mut buf = &bs[..];
let orig_len = buf.len();
@ -84,68 +124,90 @@ impl Decoder for ValueCodec {
let final_len = buf.len();
bs.advance(orig_len - final_len);
match res {
Ok(v) => Ok(Some(v)),
Err(value::codec::Error::Eof) => Ok(None),
Err(e) => Err(e),
Ok(v) => Ok(Some(value::from_value(&v)?)),
Err(value::decoder::Error::Eof) => Ok(None),
Err(e) => Err(PacketDecodeError::Read(e)),
}
}
}
impl Encoder for ValueCodec {
type Item = V;
type Error = value::encoder::Error;
impl Encoder for PacketCodec {
type Item = packets::Out;
type Error = PacketEncodeError;
fn encode(&mut self, item: Self::Item, bs: &mut BytesMut) -> Result<(), Self::Error> {
bs.extend(self.codec.encode_bytes(&item)?);
let v: V = value::to_value(&item)?;
bs.extend(self.codec.encode_bytes(&v)?);
Ok(())
}
}
fn err(s: &str) -> packets::Out {
packets::Out::Err(s.into())
}
struct Peer {
id: ConnId,
rx: Receiver<packets::Out>,
dataspace: Sender<DataspaceMessage>,
frames: Framed<TcpStream, ValueCodec>,
tx: UnboundedSender<packets::Out>,
rx: UnboundedReceiver<packets::Out>,
frames: Framed<TcpStream, PacketCodec>,
}
impl Peer {
async fn new(id: ConnId, mut dataspace: Sender<DataspaceMessage>, stream: TcpStream) -> Self {
let (tx, rx) = channel(1);
let frames = Framed::new(stream, ValueCodec::new(value::Codec::new({
async fn new(id: ConnId, stream: TcpStream) -> Self {
let (tx, rx) = unbounded_channel();
let frames = Framed::new(stream, PacketCodec::new(value::Codec::new({
let mut m = Map::new();
m.insert(0, value::Value::symbol("Discard"));
m.insert(1, value::Value::symbol("Capture"));
m.insert(2, value::Value::symbol("Observe"));
m
})));
dataspace.send(DataspaceMessage::Join(id, tx)).await.unwrap();
Peer{ id, rx, dataspace, frames }
Peer{ id, tx, rx, frames }
}
async fn run(&mut self) -> Result<(), std::io::Error> {
async fn run(&mut self) -> Result<(), io::Error> {
println!("Got {:?} {:?}", self.id, &self.frames.get_ref());
let firstpacket = self.frames.next().await;
let dsname = if let Some(Ok(packets::In::Connect(dsname))) = firstpacket {
dsname
} else {
let e: String = format!("Expected initial Connect, got {:?}", firstpacket);
println!("{}", e);
self.frames.send(err(&e)).await?;
return Ok(())
};
println!("{}: connected to dataspace {:?}", self.id, dsname);
let mut running = true;
while running {
let mut to_send = Vec::new();
select! {
frame = self.frames.next().boxed().fuse() => match frame {
Some(res) => match res {
Ok(v) => {
println!("Input {}: {:?}", self.id, &v);
match value::from_value(&v) {
Ok(p) => {
let m = DataspaceMessage::Input(self.id, p);
self.dataspace.send(m).await.unwrap()
Ok(p) => {
println!("Input {}: {:?}", self.id, &p);
match p {
packets::In::Turn(actions) => (),
packets::In::Ping() => {
to_send.push(packets::Out::Pong())
}
Err(e) => {
to_send.push(packets::Out::Err(format!("{:?}", e)));
packets::In::Pong() => (),
packets::In::Connect(dsname) => {
to_send.push(err("Unexpected Connect"));
running = false;
}
}
}
Err(value::codec::Error::Eof) => running = false,
Err(value::codec::Error::Io(e)) => return Err(e),
Err(value::codec::Error::Syntax(s)) => {
to_send.push(packets::Out::Err(s.to_string()));
Err(PacketDecodeError::Read(value::decoder::Error::Eof)) => running = false,
Err(PacketDecodeError::Read(value::decoder::Error::Io(e))) => return Err(e),
Err(PacketDecodeError::Read(value::decoder::Error::Syntax(s))) => {
to_send.push(err(s));
running = false;
}
Err(PacketDecodeError::Parse(e)) => {
to_send.push(err(&format!("Packet deserialization error: {:?}", e)));
running = false;
}
}
@ -154,7 +216,11 @@ impl Peer {
msgopt = self.rx.recv().boxed().fuse() => {
match msgopt {
Some(msg) => to_send.push(msg),
None => /* weird. */ running = false,
None => {
/* weird. */
to_send.push(err("Outbound channel closed unexpectedly"));
running = false;
}
}
},
}
@ -164,7 +230,7 @@ impl Peer {
} else {
println!("Output {}: {:?}", self.id, &v);
}
self.frames.send(value::to_value(v).unwrap()).await?;
self.frames.send(v).await?;
}
}
Ok(())
@ -173,67 +239,6 @@ impl Peer {
impl Drop for Peer {
fn drop(&mut self) {
let mut dataspace = self.dataspace.clone();
let id = self.id;
tokio::spawn(async move {
let _ = dataspace.send(DataspaceMessage::Leave(id)).await;
});
}
}
struct Dataspace {
rx: Receiver<DataspaceMessage>,
peers: Map<ConnId, Sender<packets::Out>>,
}
impl Dataspace {
fn new(rx: Receiver<DataspaceMessage>) -> Self {
Dataspace { rx, peers: Map::new() }
}
async fn send(&mut self, i: ConnId, s: &mut Sender<packets::Out>, m: &packets::Out)
-> bool
{
match s.send(m.clone()).await {
Ok(_) => true,
Err(_) => { self.remove(i); false }
}
}
async fn send_to(&mut self, i: ConnId, m: &packets::Out) -> bool {
let mut ms = self.peers.get(&i).map(|s| s.clone());
match ms {
Some(ref mut s) => self.send(i, s, m).await,
None => false,
}
}
fn remove(&mut self, i: ConnId) {
self.peers.remove(&i);
// TODO: cleanup. Previously, this was:
// self.pending.push(PeerMessage::Leave(i));
}
async fn run(&mut self) {
loop {
println!("Dataspace waiting for message ({} connected)", self.peers.len());
let msg = self.rx.recv().await.unwrap();
println!("Dataspace: {:?}", msg);
match msg {
DataspaceMessage::Join(i, s) => {
self.peers.insert(i, s);
}
DataspaceMessage::Input(i, p) => {
match p {
packets::In::Connect(dsname) => (),
packets::In::Turn(actions) => (),
packets::In::Ping() => { self.send_to(i, &packets::Out::Pong()).await; }
packets::In::Pong() => (),
}
}
DataspaceMessage::Leave(i) => self.remove(i),
}
}
}
}
@ -241,12 +246,6 @@ impl Dataspace {
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// let i = Index::new();
// Unlike std channels, a zero buffer is not supported
let (tx, rx) = channel(100); // but ugh a big buffer is needed to avoid deadlocks???
tokio::spawn(async {
Dataspace::new(rx).run().await;
});
let mut id = 0;
let port = 8001;
@ -254,11 +253,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("Listening on port {}", port);
loop {
let (stream, addr) = listener.accept().await?;
let tx = tx.clone();
let connid = id;
id = id + 1;
tokio::spawn(async move {
match Peer::new(connid, tx, stream).await.run().await {
match Peer::new(connid, stream).await.run().await {
Ok(_) => (),
Err(e) => println!("Connection {:?} died with {:?}", addr, e),
}