/* Copyright (C) 2010, 2011 Tony Garnock-Jones. All rights reserved. */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include typedef unsigned char u_char; #include #include "cmsg_private.h" #include "harness.h" #include "relay.h" #include "net.h" #include "ref.h" #include "sexp.h" #include "sexpio.h" #include "hashtable.h" #include "node.h" #define WANT_MESSAGE_TRACE 0 typedef struct relay_extension_t_ { struct sockaddr_in peername; char peername_str[256]; sexp_t *remote_container_name; int fd; IOHandle *outh; } relay_extension_t; static sexp_t *relay_extend(node_t *n, sexp_t *args) { /* TODO: outbound connections; args==NULL -> server relay, nonNULL -> outbound. */ n->extension = calloc(1, sizeof(relay_extension_t)); return NULL; } static void relay_destructor(node_t *n) { relay_extension_t *r = n->extension; delete_iohandle(r->outh); r->outh = NULL; if (close(r->fd) == -1) { /* log errors as warnings here and keep on trucking */ warn("Closing file descriptor %d produced errno %d: %s\n", r->fd, errno, strerror(errno)); } DECREF(r->remote_container_name, sexp_destructor); free(r); } static void relay_handle_message(node_t *n, sexp_t *m) { relay_extension_t *r = n->extension; #if WANT_MESSAGE_TRACE info("fd %d <-- ", r->fd); sexp_writeln(stderr_h, m); #endif BCHECK(!sexp_write(r->outh, m), "relay_handle_message sexp_write"); } static node_class_t relay_class = { .name = "relay", .extend = relay_extend, .destroy = relay_destructor, .handle_message = relay_handle_message }; static void send_error(IOHandle *h, char const *message, sexp_t *extra) { sexp_t *m = extra; m = sexp_cons(sexp_cstring(message), m); m = sexp_cons(sexp_cstring("error"), m); INCREF(m); iohandle_clear_error(h); BCHECK(!sexp_write(h, m), "send_error sexp_write"); DECREF(m, sexp_destructor); iohandle_flush(h); /* ignore result here, there's not much we can do with it */ } static void send_sexp_syntax_error(IOHandle *h, char const *message) { char const *url = "http://people.csail.mit.edu/rivest/Sexp.txt"; send_error(h, message, sexp_cons(sexp_cstring(url), NULL)); } static void relay_main(node_t *n) { relay_extension_t *r = n->extension; IOHandle *inh = new_iohandle(r->fd); sexp_t *message = NULL; /* held */ INCREF(n); /* because the caller doesn't hold a ref, and we need to drop ours on our death */ info("Accepted connection from %s on fd %d\n", r->peername_str, r->fd); iohandle_write(r->outh, cmsg_cstring_bytes("(3:hop1:0)")); ICHECK(iohandle_flush(r->outh), "iohandle_flush greeting"); { sexp_t *s = NULL; s = sexp_cons(sexp_empty_bytes, s); s = sexp_cons(sexp_empty_bytes, s); s = sexp_cons(sexp_empty_bytes, s); s = sexp_cons(sexp_empty_bytes, s); s = sexp_cons(sexp_bytes(local_container_name()), s); s = sexp_cons(sexp_cstring("subscribe"), s); INCREF(s); sexp_write(r->outh, s); DECREF(s, sexp_destructor); } //iohandle_settimeout(r->inh, 3, 0); while (1) { DECREF(message, sexp_destructor); message = NULL; if (!sexp_read(inh, &message)) goto network_error; INCREF(message); #if WANT_MESSAGE_TRACE info("fd %d --> ", r->fd); sexp_writeln(stderr_h, message); #endif if (!(sexp_pairp(message) && sexp_stringp(sexp_head(message)))) { info("Ill-formed message\n"); send_error(r->outh, "ill-formed message", NULL); goto protocol_error; } { cmsg_bytes_t selector = sexp_data(sexp_head(message)); sexp_t *args = sexp_tail(message); size_t msglen = sexp_length(message); /* TODO: have constant atoms for post, subscribe, unsubscribe etc (see also post_node() and the handler in queue.c etc) */ if (!cmsg_bytes_cmp(selector, cmsg_cstring_bytes("post")) && (msglen == 4) && sexp_stringp(sexp_head(args))) { cmsg_bytes_t nodename = sexp_data(sexp_head(args)); if (!send_node(nodename, sexp_head(sexp_tail(args)))) { warn("Was asked to post to unknown node <<%.*s>>\n", nodename.len, nodename.bytes); } } else if (!cmsg_bytes_cmp(selector, cmsg_cstring_bytes("subscribe")) && (msglen == 6) && sexp_stringp(sexp_head(args)) && sexp_stringp(sexp_head(sexp_tail(sexp_tail(sexp_tail(args))))) && sexp_stringp(sexp_head(sexp_tail(sexp_tail(sexp_tail(sexp_tail(args))))))) { sexp_t *filter_sexp = sexp_head(args); cmsg_bytes_t filter = sexp_data(filter_sexp); sexp_t *reply_sink = sexp_listref(args, 3); sexp_t *reply_name = sexp_listref(args, 4); if (bind_node(filter, n)) { sexp_t *subok = sexp_cons(sexp_cstring("subscribe-ok"), sexp_cons(filter_sexp, NULL)); post_node(sexp_data(reply_sink), sexp_data(reply_name), subok, sexp_empty_bytes); DECREF(r->remote_container_name, sexp_destructor); r->remote_container_name = INCREF(filter_sexp); } else { warn("Bind failed <<%.*s>>\n", filter.len, filter.bytes); } } else if (!cmsg_bytes_cmp(selector, cmsg_cstring_bytes("unsubscribe")) && (msglen == 2) && sexp_stringp(sexp_head(args))) { sexp_t *id_sexp = sexp_head(args); cmsg_bytes_t id = sexp_data(id_sexp); if (!unbind_node(id)) { warn("Unbind failed <<%.*s>>\n", id.len, id.bytes); } } else { send_error(r->outh, "message not understood", sexp_cons(message, NULL)); goto protocol_error; } } } network_error: if (inh->eof) { info("Disconnecting fd %d normally.\n", r->fd); } else { switch (inh->error_kind) { case SEXP_ERROR_OVERFLOW: send_sexp_syntax_error(r->outh, "sexp too big"); break; case SEXP_ERROR_SYNTAX: send_sexp_syntax_error(r->outh, "sexp syntax error"); break; default: warn("Relay handle error 0x%04X on fd %d: %d, %s\n", inh->error_kind, r->fd, inh->error_errno, strerror(inh->error_errno)); break; } } protocol_error: DECREF(message, sexp_destructor); delete_iohandle(inh); unbind_all_names_for_node(n); DECREF(n, node_destructor); } void start_relay(struct sockaddr_in const *peername, int fd) { node_t *n = new_node(&relay_class, NULL, NULL); relay_extension_t *r = n->extension; r->peername = *peername; endpoint_name(&r->peername, CMSG_BYTES(sizeof(r->peername_str), r->peername_str)); r->remote_container_name = NULL; r->fd = fd; r->outh = new_iohandle(r->fd); spawn((process_main_t) relay_main, n); }