diff --git a/main.c b/main.c index 217719d..b701027 100644 --- a/main.c +++ b/main.c @@ -23,10 +23,6 @@ typedef unsigned char u_char; #define WANT_CONSOLE_LISTENER 1 -static node_t *factory_construct(node_class_t *nc, sexp_t *args) { - return malloc(sizeof(node_t)); -} - static void factory_handle_message(node_t *n, sexp_t *m) { size_t msglen = sexp_length(m); sexp_t *args; @@ -52,15 +48,16 @@ static void factory_handle_message(node_t *n, sexp_t *m) { if (nc == NULL) { warn("Node class not found <<%.*s>>\n", classname_bytes.len, classname_bytes.bytes); } else { - new_node(nc, ctor_arg); - { - sexp_t *createok = sexp_cons(sexp_bytes(cmsg_cstring_bytes("create-ok")), NULL); - INCREF(createok); - post_node(sexp_data(reply_sink), - sexp_data(reply_name), - createok); - DECREF(createok, sexp_destructor); + sexp_t *error = NULL; + sexp_t *reply; + if (new_node(nc, ctor_arg, &error) != NULL) { + reply = sexp_cons(sexp_cstring("create-ok"), NULL); + } else { + reply = sexp_cons(sexp_cstring("create-failed"), sexp_cons(error, NULL)); } + INCREF(reply); + post_node(sexp_data(reply_sink), sexp_data(reply_name), reply); + DECREF(reply, sexp_destructor); } } } else { @@ -72,13 +69,13 @@ static void factory_handle_message(node_t *n, sexp_t *m) { static node_class_t factory_class = { .name = "factory", - .construct = factory_construct, - .destroy = (node_destructor_fn_t) free, + .extend = NULL, + .destroy = NULL, .handle_message = factory_handle_message }; static void init_factory(void) { - bind_node(cmsg_cstring_bytes("factory"), new_node(&factory_class, NULL)); + bind_node(cmsg_cstring_bytes("factory"), new_node(&factory_class, NULL, NULL)); } #if WANT_CONSOLE_LISTENER diff --git a/node.c b/node.c index 62c9362..3d4b141 100644 --- a/node.c +++ b/node.c @@ -5,9 +5,13 @@ #include +#include + #include "cmsg_private.h" #include "ref.h" #include "sexp.h" +#include "harness.h" +#include "sexpio.h" #include "hashtable.h" #include "node.h" @@ -51,18 +55,35 @@ static void init_node_names(node_t *n) { init_hashtable(&n->names, 5, NULL, NULL); } -node_t *new_node(node_class_t *nc, sexp_t *args) { - node_t *n = nc->construct(nc, args); +node_t *new_node(node_class_t *nc, sexp_t *args, sexp_t **error_out) { + node_t *n = malloc(sizeof(*n)); n->refcount = ZERO_REFCOUNT(); n->node_class = nc; + n->extension = NULL; init_node_names(n); + if (nc->extend != NULL) { + sexp_t *error = nc->extend(n, args); + if (error != NULL) { + node_destructor(n); + if (error_out != NULL) { + *error_out = error; + } else { + warn("Creating node of class %s failed with ", nc->name); + sexp_writeln(stderr_h, error); + } + return NULL; + } + } return n; } void node_destructor(node_t *n) { + if (n->node_class->destroy != NULL) { + n->node_class->destroy(n); + } unbind_all_names_for_node(n); destroy_hashtable(&n->names); - n->node_class->destroy(n); + free(n); } node_t *lookup_node(cmsg_bytes_t name) { @@ -111,7 +132,7 @@ int post_node(cmsg_bytes_t node, cmsg_bytes_t name, sexp_t *body) { int result; if (post_atom == NULL) { - post_atom = INCREF(sexp_bytes(cmsg_cstring_bytes("post"))); + post_atom = INCREF(sexp_cstring("post")); } msg = sexp_cons(body, msg); diff --git a/node.h b/node.h index ddbf88e..541d5cc 100644 --- a/node.h +++ b/node.h @@ -5,15 +5,16 @@ typedef struct node_t_ { refcount_t refcount; struct node_class_t_ *node_class; hashtable_t names; + void *extension; /* Each node class puts something different here in its instances */ } node_t; -typedef node_t *(*node_constructor_fn_t)(struct node_class_t_ *nc, sexp_t *args); +typedef sexp_t *(*node_extension_fn_t)(node_t *n, sexp_t *args); typedef void (*node_destructor_fn_t)(node_t *n); typedef void (*node_message_handler_fn_t)(node_t *n, sexp_t *m); typedef struct node_class_t_ { char const *name; - node_constructor_fn_t construct; + node_extension_fn_t extend; node_destructor_fn_t destroy; node_message_handler_fn_t handle_message; } node_class_t; @@ -25,7 +26,7 @@ extern void basic_node_destroy(node_t *n); extern void register_node_class(node_class_t *nc); extern node_class_t *lookup_node_class(cmsg_bytes_t name); -extern node_t *new_node(node_class_t *nc, sexp_t *args); +extern node_t *new_node(node_class_t *nc, sexp_t *args, sexp_t **error_out); extern void node_destructor(node_t *n); extern node_t *lookup_node(cmsg_bytes_t name); diff --git a/queue.c b/queue.c index fdb62e3..0e0e856 100644 --- a/queue.c +++ b/queue.c @@ -15,19 +15,19 @@ #include "harness.h" #include "ref.h" #include "sexp.h" +#include "sexpio.h" #include "hashtable.h" #include "node.h" #include "queue.h" #include "dataq.h" -typedef struct queue_node_t_ { - node_t node; +typedef struct queue_extension_t_ { sexp_t *backlog_q; queue_t waiter_q; hashtable_t subscriptions; Process *shovel; int shovel_awake; -} queue_node_t; +} queue_extension_t; typedef struct subscription_t_ { sexp_t *uuid; @@ -43,23 +43,32 @@ static void free_subscription(subscription_t *sub) { free(sub); } -static node_t *queue_construct(node_class_t *nc, sexp_t *args) { - queue_node_t *q = calloc(1, sizeof(*q)); - q->backlog_q = INCREF(sexp_new_queue()); - q->waiter_q = EMPTY_QUEUE(subscription_t, link); - init_hashtable(&q->subscriptions, 5, NULL, NULL); - q->shovel = NULL; - q->shovel_awake = 0; - return (node_t *) q; +static sexp_t *queue_extend(node_t *n, sexp_t *args) { + if ((sexp_length(args) == 1) && sexp_stringp(sexp_head(args))) { + cmsg_bytes_t name = sexp_data(sexp_head(args)); + queue_extension_t *q = calloc(1, sizeof(*q)); + q->backlog_q = INCREF(sexp_new_queue()); + q->waiter_q = EMPTY_QUEUE(subscription_t, link); + init_hashtable(&q->subscriptions, 5, NULL, NULL); + q->shovel = NULL; + q->shovel_awake = 0; + + n->extension = q; + return bind_node(name, n) ? NULL : sexp_cstring("bind failed"); + } else { + return sexp_cstring("invalid args"); + } } static void queue_destructor(node_t *n) { - queue_node_t *q = (queue_node_t *) n; - DECREF(q->backlog_q, sexp_destructor); - /* q->waiter_q will be automatically destroyed as part of the destruction of q->subscriptions */ - destroy_hashtable(&q->subscriptions); - /* TODO: um, take down the shovel too! */ - free(q); + queue_extension_t *q = n->extension; + if (q != NULL) { /* can be NULL if queue_extend was given invalid args */ + DECREF(q->backlog_q, sexp_destructor); + /* q->waiter_q will be automatically destroyed as part of the destruction of q->subscriptions */ + destroy_hashtable(&q->subscriptions); + warn("TODO: the shovel needs to be taken down as well here\n"); + free(q); + } } static int send_to_waiter(subscription_t *sub, sexp_t *body) { @@ -67,14 +76,17 @@ static int send_to_waiter(subscription_t *sub, sexp_t *body) { } static void shoveller(void *qv) { - queue_node_t *q = (queue_node_t *) qv; + queue_extension_t *q = qv; + sexp_t *body = NULL; /* held */ queue_t examined; subscription_t *sub = NULL; check_for_work: + info("Checking for work\n"); - if (!sexp_queue_emptyp(q->backlog_q)) { + if (sexp_queue_emptyp(q->backlog_q)) { + info("Backlog empty\n"); goto wait_and_shovel; } @@ -83,6 +95,7 @@ static void shoveller(void *qv) { find_valid_waiter: if (q->waiter_q.count == 0) { + info("No waiters\n"); sexp_queue_pushback(q->backlog_q, body); DECREF(body, sexp_destructor); q->waiter_q = examined; @@ -91,24 +104,32 @@ static void shoveller(void *qv) { sub = dequeue(&q->waiter_q); + info("Delivering to <<%.*s>>/<<%.*s>>...\n", + sexp_data(sub->sink).len, sexp_data(sub->sink).bytes, + sexp_data(sub->name).len, sexp_data(sub->name).bytes); + if ((sub->uuid == NULL) /* It has been unsubscribed. */ || !send_to_waiter(sub, body)) { /* Destination no longer exists. */ + info((sub->uuid == NULL) ? "Waiter was unsubscribed\n" : "Destination not found\n"); free_subscription(sub); goto find_valid_waiter; } + info("Delivery successful\n"); DECREF(body, sexp_destructor); queue_append(&q->waiter_q, &examined); enqueue(&q->waiter_q, sub); goto check_for_work; wait_and_shovel: + info("Waiting for throck\n"); q->shovel_awake = 0; suspend(); + info("Throck received!\n"); goto check_for_work; } -static void throck_shovel(queue_node_t *q) { +static void throck_shovel(queue_extension_t *q) { if (!q->shovel_awake) { if (!q->shovel) { q->shovel = spawn(shoveller, q); @@ -120,7 +141,7 @@ static void throck_shovel(queue_node_t *q) { } static void queue_handle_message(node_t *n, sexp_t *m) { - queue_node_t *q = (queue_node_t *) n; + queue_extension_t *q = n->extension; size_t msglen = sexp_length(m); sexp_t *args; @@ -160,8 +181,7 @@ static void queue_handle_message(node_t *n, sexp_t *m) { enqueue(&q->waiter_q, sub); throck_shovel(q); { - sexp_t *subok = sexp_cons(sexp_bytes(cmsg_cstring_bytes("subscribe-ok")), - sexp_cons(sub->uuid, NULL)); + sexp_t *subok = sexp_cons(sexp_cstring("subscribe-ok"), sexp_cons(sub->uuid, NULL)); INCREF(subok); post_node(sexp_data(reply_sink), sexp_data(reply_name), subok); DECREF(subok, sexp_destructor); @@ -190,7 +210,7 @@ static void queue_handle_message(node_t *n, sexp_t *m) { static node_class_t queue_class = { .name = "queue", - .construct = queue_construct, + .extend = queue_extend, .destroy = queue_destructor, .handle_message = queue_handle_message }; diff --git a/relay.c b/relay.c index 28e2efb..5c641bb 100644 --- a/relay.c +++ b/relay.c @@ -31,22 +31,21 @@ typedef unsigned char u_char; #include "hashtable.h" #include "node.h" -struct relay_node { - node_t node; +typedef struct relay_extension_t_ { struct sockaddr_in peername; char peername_str[256]; int fd; IOHandle *outh; -}; +} relay_extension_t; -static node_t *relay_construct(node_class_t *nc, sexp_t *args) { +static sexp_t *relay_extend(node_t *n, sexp_t *args) { /* TODO: outbound connections; args==NULL -> server relay, nonNULL -> outbound. */ - struct relay_node *r = calloc(1, sizeof(*r)); - return (node_t *) r; + n->extension = calloc(1, sizeof(relay_extension_t)); + return NULL; } static void relay_destructor(node_t *n) { - struct relay_node *r = (struct relay_node *) n; + relay_extension_t *r = n->extension; delete_iohandle(r->outh); r->outh = NULL; if (close(r->fd) == -1) { @@ -54,30 +53,29 @@ static void relay_destructor(node_t *n) { warn("Closing file descriptor %d produced errno %d: %s\n", r->fd, errno, strerror(errno)); } - free(n); + free(r); } static void relay_handle_message(node_t *n, sexp_t *m) { - struct relay_node *r = (struct relay_node *) n; + relay_extension_t *r = n->extension; info("fd %d <-- ", r->fd); - sexp_write_flush(stderr_h, m); - fprintf(stderr, "\n"); + sexp_writeln(stderr_h, m); BCHECK(!sexp_write(r->outh, m), "relay_handle_message sexp_write"); } static node_class_t relay_class = { .name = "relay", - .construct = relay_construct, + .extend = relay_extend, .destroy = relay_destructor, .handle_message = relay_handle_message }; static void send_error(IOHandle *h, char const *message, sexp_t *extra) { sexp_t *m = extra; - m = sexp_cons(sexp_bytes(cmsg_cstring_bytes(message)), m); - m = sexp_cons(sexp_bytes(cmsg_cstring_bytes("error")), m); + m = sexp_cons(sexp_cstring(message), m); + m = sexp_cons(sexp_cstring("error"), m); INCREF(m); iohandle_clear_error(h); @@ -88,17 +86,16 @@ static void send_error(IOHandle *h, char const *message, sexp_t *extra) { static void send_sexp_syntax_error(IOHandle *h, char const *message) { char const *url = "http://people.csail.mit.edu/rivest/Sexp.txt"; - send_error(h, message, sexp_cons(sexp_bytes(cmsg_cstring_bytes(url)), NULL)); + send_error(h, message, sexp_cons(sexp_cstring(url), NULL)); } -static void relay_main(struct relay_node *r) { +static void relay_main(node_t *n) { + relay_extension_t *r = n->extension; IOHandle *inh = new_iohandle(r->fd); sexp_t *message = NULL; /* held */ - assert((void *) &r->node == (void *) r); - - INCREF(&r->node); /* because the caller doesn't hold a ref, and we - need to drop ours on our death */ + INCREF(n); /* because the caller doesn't hold a ref, and we need to + drop ours on our death */ info("Accepted connection from %s on fd %d\n", r->peername_str, r->fd); @@ -117,8 +114,7 @@ static void relay_main(struct relay_node *r) { if (inh->error_kind != 0) goto network_error; info("fd %d --> ", r->fd); - sexp_write_flush(stderr_h, message); - fprintf(stderr, "\n"); + sexp_writeln(stderr_h, message); if (!(sexp_pairp(message) && sexp_stringp(sexp_head(message)))) { send_error(r->outh, "ill-formed message", NULL); @@ -148,9 +144,8 @@ static void relay_main(struct relay_node *r) { sexp_t *reply_sink_and_name = sexp_tail(sexp_tail(sexp_tail(args))); cmsg_bytes_t reply_sink = sexp_data(sexp_head(reply_sink_and_name)); cmsg_bytes_t reply_name = sexp_data(sexp_head(sexp_tail(reply_sink_and_name))); - if (bind_node(filter, &r->node)) { - sexp_t *subok = sexp_cons(sexp_bytes(cmsg_cstring_bytes("subscribe-ok")), - sexp_cons(filter_sexp, NULL)); + if (bind_node(filter, n)) { + sexp_t *subok = sexp_cons(sexp_cstring("subscribe-ok"), sexp_cons(filter_sexp, NULL)); INCREF(subok); post_node(reply_sink, reply_name, subok); DECREF(subok, sexp_destructor); @@ -165,7 +160,7 @@ static void relay_main(struct relay_node *r) { warn("Unbind failed <<%.*s>>\n", id.len, id.bytes); } } else { - send_error(r->outh, "message not understood", message); + send_error(r->outh, "message not understood", sexp_cons(message, NULL)); goto protocol_error; } } @@ -193,15 +188,16 @@ static void relay_main(struct relay_node *r) { protocol_error: DECREF(message, sexp_destructor); delete_iohandle(inh); - unbind_all_names_for_node(&r->node); - DECREF(&r->node, node_destructor); + unbind_all_names_for_node(n); + DECREF(n, node_destructor); } void start_relay(struct sockaddr_in const *peername, int fd) { - struct relay_node *n = (struct relay_node *) new_node(&relay_class, NULL); - n->peername = *peername; - endpoint_name(&n->peername, CMSG_BYTES(sizeof(n->peername_str), n->peername_str)); - n->fd = fd; - n->outh = new_iohandle(n->fd); + node_t *n = new_node(&relay_class, NULL, NULL); + relay_extension_t *r = n->extension; + r->peername = *peername; + endpoint_name(&r->peername, CMSG_BYTES(sizeof(r->peername_str), r->peername_str)); + r->fd = fd; + r->outh = new_iohandle(r->fd); spawn((process_main_t) relay_main, n); } diff --git a/sexp.c b/sexp.c index 791bece..0bf8a43 100644 --- a/sexp.c +++ b/sexp.c @@ -85,6 +85,10 @@ sexp_data_t *sexp_data_alias(cmsg_bytes_t body) { return data; } +sexp_t *sexp_cstring(char const *str) { + return sexp_bytes(cmsg_cstring_bytes(str)); +} + sexp_t *sexp_bytes(cmsg_bytes_t bytes) { sexp_t *x = alloc_shell(SEXP_BYTES); x->data.bytes = cmsg_bytes_malloc_dup(bytes); diff --git a/sexp.h b/sexp.h index ea457c1..a69830e 100644 --- a/sexp.h +++ b/sexp.h @@ -38,6 +38,7 @@ extern void sexp_destructor(sexp_t *x); extern sexp_data_t *sexp_data_copy(cmsg_bytes_t body, size_t offset, size_t length); extern sexp_data_t *sexp_data_alias(cmsg_bytes_t body); +extern sexp_t *sexp_cstring(char const *str); extern sexp_t *sexp_bytes(cmsg_bytes_t bytes); extern sexp_t *sexp_slice(sexp_data_t *data, size_t offset, size_t length); extern sexp_t *sexp_display_hint(sexp_t *hint, sexp_t *body); diff --git a/sexpio.c b/sexpio.c index d56c3e5..319fb18 100644 --- a/sexpio.c +++ b/sexpio.c @@ -215,11 +215,14 @@ unsigned short sexp_write(IOHandle *h, sexp_t *x) { } } -unsigned short sexp_write_flush(IOHandle *h, sexp_t *x) { +unsigned short sexp_writeln(IOHandle *h, sexp_t *x) { unsigned short result; fflush(NULL); result = sexp_write(h, x); - ICHECK(iohandle_flush(h), "sexp_write_flush iohandle_flush"); + if (result == 0) { + iohandle_write(h, cmsg_cstring_bytes("\n")); + ICHECK(iohandle_flush(h), "sexp_writeln iohandle_flush"); + } return result; } diff --git a/sexpio.h b/sexpio.h index 70a5afd..1b8af06 100644 --- a/sexpio.h +++ b/sexpio.h @@ -7,6 +7,6 @@ extern sexp_t *sexp_read_atom(IOHandle *h); extern sexp_t *sexp_read(IOHandle *h); extern unsigned short sexp_write(IOHandle *h, sexp_t *x); -extern unsigned short sexp_write_flush(IOHandle *h, sexp_t *x); +extern unsigned short sexp_writeln(IOHandle *h, sexp_t *x); #endif diff --git a/t1 b/t1 index 35be9b8..1687325 100644 --- a/t1 +++ b/t1 @@ -1,2 +1,3 @@ (9:subscribe5:test10:0:5:test15:login) -(4:post7:factory(6:create5:queue((4:name2:q1))5:test11:k)0:) +(4:post7:factory(6:create5:queue(2:q1)5:test11:k)0:) +(4:post2:q1(9:subscribe0:5:test18:consumer5:test11:k)0:) diff --git a/t2 b/t2 new file mode 100644 index 0000000..fc76e16 --- /dev/null +++ b/t2 @@ -0,0 +1,4 @@ +(9:subscribe5:test20:0:5:test25:login) +(4:post2:q1(4:post0:8:message10:)0:) +(4:post2:q1(4:post0:8:message20:)0:) +(11:unsubscribe5:test2)