hop-2012/relay.c

203 lines
6.1 KiB
C

/* Copyright (C) 2010 Tony Garnock-Jones. All rights reserved. */
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netdb.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <sys/wait.h>
#include <time.h>
#include <sys/time.h>
#include <assert.h>
typedef unsigned char u_char;
#include <event.h>
#include "cmsg_private.h"
#include "harness.h"
#include "relay.h"
#include "net.h"
#include "ref.h"
#include "sexp.h"
#include "sexpio.h"
#include "hashtable.h"
#include "node.h"
struct relay_node {
node_t node;
struct sockaddr_in peername;
char peername_str[256];
int fd;
IOHandle *outh;
};
static node_t *relay_construct(node_class_t *nc, sexp_t *args) {
/* TODO: outbound connections; args==NULL -> server relay, nonNULL -> outbound. */
struct relay_node *r = calloc(1, sizeof(*r));
return (node_t *) r;
}
static void relay_destructor(node_t *n) {
struct relay_node *r = (struct relay_node *) n;
delete_iohandle(r->outh);
r->outh = NULL;
if (close(r->fd) == -1) {
/* log errors as warnings here and keep on trucking */
warn("Closing file descriptor %d produced errno %d: %s\n",
r->fd, errno, strerror(errno));
}
free(n);
}
static void relay_handle_message(node_t *n, sexp_t *m) {
struct relay_node *r = (struct relay_node *) n;
BCHECK(!sexp_write(r->outh, m), "relay_handle_message sexp_write");
}
static node_class_t relay_class = {
.name = "relay",
.construct = relay_construct,
.destroy = relay_destructor,
.handle_message = relay_handle_message
};
static void send_error(IOHandle *h, char const *message, sexp_t *extra) {
sexp_t *m = extra;
m = sexp_cons(sexp_bytes(cmsg_cstring_bytes(message)), m);
m = sexp_cons(sexp_bytes(cmsg_cstring_bytes("error")), m);
INCREF(m);
iohandle_clear_error(h);
BCHECK(!sexp_write(h, m), "send_error sexp_write");
DECREF(m, sexp_destructor);
iohandle_flush(h); /* ignore result here, there's not much we can do with it */
}
static void send_sexp_syntax_error(IOHandle *h, char const *message) {
char const *url = "http://people.csail.mit.edu/rivest/Sexp.txt";
send_error(h, message, sexp_cons(sexp_bytes(cmsg_cstring_bytes(url)), NULL));
}
static void relay_main(struct relay_node *r) {
IOHandle *inh = new_iohandle(r->fd);
sexp_t *message = NULL; /* held */
assert((void *) &r->node == (void *) r);
INCREF(&r->node); /* because the caller doesn't hold a ref, and we
need to drop ours on our death */
info("Accepted connection from %s on fd %d\n", r->peername_str, r->fd);
iohandle_write(r->outh, cmsg_cstring_bytes("(3:hop1:0)"));
ICHECK(iohandle_flush(r->outh), "iohandle_flush greeting");
//iohandle_settimeout(r->inh, 3, 0);
while (1) {
DECREF(message, sexp_destructor);
message = NULL;
message = INCREF(sexp_read(inh));
if (inh->error_kind != 0) goto network_error;
/* Log received message to console */
/*
fflush(NULL);
sexp_write(out_handle, message);
iohandle_write(out_handle, cmsg_cstring_bytes("\n"));
ICHECK(iohandle_flush(out_handle), "iohandle_flush out_handle");
*/
if (!(sexp_pairp(message) && sexp_stringp(sexp_head(message)))) {
send_error(r->outh, "ill-formed message", NULL);
goto protocol_error;
}
{
cmsg_bytes_t selector = sexp_data(sexp_head(message));
sexp_t *args = sexp_tail(message);
size_t msglen = sexp_length(message);
/* TODO: have constant atoms for post, subscribe, unsubscribe etc (see also post_node()) */
if (!cmsg_bytes_cmp(selector, cmsg_cstring_bytes("post")) && (msglen == 4)
&& sexp_stringp(sexp_head(args))) {
cmsg_bytes_t nodename = sexp_data(sexp_head(args));
if (!send_node(nodename, sexp_head(sexp_tail(args)))) {
warn("Was asked to post to unknown node <<%.*s>>\n", nodename.len, nodename.bytes);
}
} else if (!cmsg_bytes_cmp(selector, cmsg_cstring_bytes("subscribe")) && (msglen == 6)
&& sexp_stringp(sexp_head(args))
&& sexp_stringp(sexp_head(sexp_tail(sexp_tail(sexp_tail(args)))))
&& sexp_stringp(sexp_head(sexp_tail(sexp_tail(sexp_tail(sexp_tail(args))))))) {
sexp_t *filter_sexp = sexp_head(args);
cmsg_bytes_t filter = sexp_data(filter_sexp);
sexp_t *reply_sink_and_name = sexp_tail(sexp_tail(sexp_tail(args)));
cmsg_bytes_t reply_sink = sexp_data(sexp_head(reply_sink_and_name));
cmsg_bytes_t reply_name = sexp_data(sexp_head(sexp_tail(reply_sink_and_name)));
if (bind_node(filter, &r->node)) {
sexp_t *subok = sexp_cons(sexp_bytes(cmsg_cstring_bytes("subscribe-ok")),
sexp_cons(filter_sexp, NULL));
INCREF(subok);
post_node(reply_sink, reply_name, subok);
DECREF(subok, sexp_destructor);
} else {
warn("Bind failed <<%.*s>>\n", filter.len, filter.bytes);
}
} else if (!cmsg_bytes_cmp(selector, cmsg_cstring_bytes("unsubscribe")) && (msglen == 2)
&& sexp_stringp(sexp_head(args))) {
sexp_t *id_sexp = sexp_head(args);
cmsg_bytes_t id = sexp_data(id_sexp);
if (!unbind_node(id)) {
warn("Unbind failed <<%.*s>>\n", id.len, id.bytes);
}
} else {
send_error(r->outh, "message not understood", message);
goto protocol_error;
}
}
}
network_error:
switch (inh->error_kind) {
case EVBUFFER_EOF:
info("Disconnecting fd %d normally.\n", r->fd);
break;
case SEXP_ERROR_OVERFLOW:
send_sexp_syntax_error(r->outh, "sexp too big");
break;
case SEXP_ERROR_SYNTAX:
send_sexp_syntax_error(r->outh, "sexp syntax error");
break;
default:
warn("Relay handle error on fd %d: 0x%04X\n", r->fd, inh->error_kind);
break;
}
protocol_error:
DECREF(message, sexp_destructor);
delete_iohandle(inh);
unbind_all_names_for_node(&r->node);
DECREF(&r->node, node_destructor);
}
void start_relay(struct sockaddr_in const *peername, int fd) {
struct relay_node *n = (struct relay_node *) new_node(&relay_class, NULL);
n->peername = *peername;
endpoint_name(&n->peername, CMSG_BYTES(sizeof(n->peername_str), n->peername_str));
n->fd = fd;
n->outh = new_iohandle(n->fd);
spawn((process_main_t) relay_main, n);
}