diff --git a/Makefile b/Makefile index 5c6a650..2e5765d 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ TARGET = cmsg OBJECTS = main.o harness.o net.o util.o relay.o hashtable.o dataq.o sexp.o sexpio.o node.o \ - queue.o direct.o fanout.o + queue.o direct.o fanout.o subscription.o UUID_CFLAGS:=$(shell uuid-config --cflags) UUID_LDFLAGS:=$(shell uuid-config --ldflags) diff --git a/TODO b/TODO index 8d33684..62f7b68 100644 --- a/TODO +++ b/TODO @@ -2,7 +2,5 @@ Cope with possibility of duplicate uuid, in e.g. queue/fanout/direct. If a subscription token matches an existing subscription, there's a collision, so choose a new token until there's no collision. -Factor out commonality in subscription-management from queue/fanout/direct. - SAX-style sexp reader/writer, so that we can do something sensible for enormous (e.g. gigabyte-sized) messages. diff --git a/direct.c b/direct.c index 8428b31..dfecc85 100644 --- a/direct.c +++ b/direct.c @@ -17,6 +17,7 @@ #include "sexp.h" #include "hashtable.h" #include "node.h" +#include "subscription.h" typedef struct direct_extension_t_ { sexp_t *name; @@ -24,29 +25,6 @@ typedef struct direct_extension_t_ { hashtable_t subscriptions; } direct_extension_t; -typedef struct subscription_t_ { - sexp_t *uuid; - sexp_t *sink; - sexp_t *name; - struct subscription_t_ *link; -} subscription_t; - -static void free_subscription(subscription_t *sub) { - DECREF(sub->uuid, sexp_destructor); - DECREF(sub->sink, sexp_destructor); - DECREF(sub->name, sexp_destructor); - free(sub); -} - -static void free_subscription_chain(void *context, cmsg_bytes_t key, void *value) { - subscription_t *chain = value; - while (chain != NULL) { - subscription_t *next = chain->link; - free_subscription(chain); - chain = next; - } -} - static sexp_t *direct_extend(node_t *n, sexp_t *args) { if ((sexp_length(args) == 1) && sexp_stringp(sexp_head(args))) { cmsg_bytes_t name = sexp_data(sexp_head(args)); @@ -62,38 +40,28 @@ static sexp_t *direct_extend(node_t *n, sexp_t *args) { } } +static void free_direct_chain(void *context, cmsg_bytes_t key, void *value) { + free_subscription_chain(value); +} + static void direct_destructor(node_t *n) { direct_extension_t *d = n->extension; if (d != NULL) { /* can be NULL if direct_extend was given invalid args */ DECREF(d->name, sexp_destructor); - hashtable_foreach(&d->routing_table, free_subscription_chain, NULL); + hashtable_foreach(&d->routing_table, free_direct_chain, NULL); destroy_hashtable(&d->routing_table); destroy_hashtable(&d->subscriptions); free(d); } } -static int send_to_sub(subscription_t *sub, sexp_t *body) { - return post_node(sexp_data(sub->sink), sexp_data(sub->name), body, sub->uuid); -} - static void route_message(direct_extension_t *d, sexp_t *rk, sexp_t *body) { subscription_t *chain = NULL; - subscription_t *prev = NULL; + subscription_t *newchain; hashtable_get(&d->routing_table, sexp_data(rk), (void **) &chain); - while (chain != NULL) { - subscription_t *next = chain->link; - if (!send_to_sub(chain, body)) { /* Destination no longer exists. */ - info("Destination not found\n"); - if (prev == NULL) { - hashtable_put(&d->routing_table, sexp_data(rk), chain->link); - } else { - prev->link = chain->link; - } - chain->link = NULL; - free_subscription(chain); - } - chain = next; + newchain = send_to_subscription_chain(&d->subscriptions, chain, body); + if (newchain != chain) { + hashtable_put(&d->routing_table, sexp_data(rk), newchain); } } @@ -120,51 +88,14 @@ static void direct_handle_message(node_t *n, sexp_t *m) { warn("Non-string routing key in direct\n"); } } else if ((msglen == 6) && !cmsg_bytes_cmp(selector, cmsg_cstring_bytes("subscribe"))) { - unsigned char uuid[CMSG_UUID_BUF_SIZE]; - if (gen_uuid(uuid) != 0) { - warn("Could not generate UUID\n"); - } else { + subscription_t *sub = handle_subscribe_message(&d->subscriptions, args); + if (sub != NULL) { sexp_t *filter = sexp_listref(args, 0); - sexp_t *reply_sink = sexp_listref(args, 3); - sexp_t *reply_name = sexp_listref(args, 4); - subscription_t *sub = malloc(sizeof(*sub)); - sub->uuid = INCREF(sexp_bytes(CMSG_BYTES(sizeof(uuid), uuid))); - sub->sink = sexp_listref(args, 1); - sub->name = sexp_listref(args, 2); - sub->link = NULL; - if (!sexp_stringp(filter) - || !sexp_stringp(sub->sink) || !sexp_stringp(sub->name) - || !sexp_stringp(reply_sink) || !sexp_stringp(reply_name)) { - DECREF(sub->uuid, sexp_destructor); - free(sub); - warn("Bad sink/name/reply_sink/reply_name in subscribe"); - } else { - INCREF(sub->sink); - INCREF(sub->name); - hashtable_put(&d->subscriptions, sexp_data(sub->uuid), sub); - hashtable_get(&d->routing_table, sexp_data(filter), (void **) &sub->link); - hashtable_put(&d->routing_table, sexp_data(filter), sub); - { - sexp_t *subok = sexp_cons(sexp_cstring("subscribe-ok"), sexp_cons(sub->uuid, NULL)); - INCREF(subok); - post_node(sexp_data(reply_sink), sexp_data(reply_name), subok, sexp_empty_bytes); - DECREF(subok, sexp_destructor); - } - } + hashtable_get(&d->routing_table, sexp_data(filter), (void **) &sub->link); + hashtable_put(&d->routing_table, sexp_data(filter), sub); } } else if ((msglen == 2) && !cmsg_bytes_cmp(selector, cmsg_cstring_bytes("unsubscribe"))) { - if (sexp_stringp(sexp_head(args))) { - cmsg_bytes_t uuid = sexp_data(sexp_head(args)); - subscription_t *sub; - if (hashtable_get(&d->subscriptions, uuid, (void **) &sub)) { - /* TODO: clean up more eagerly perhaps? */ - hashtable_erase(&d->subscriptions, uuid); - DECREF(sub->uuid, sexp_destructor); - sub->uuid = NULL; - } - } else { - warn("Invalid unsubscription\n"); - } + handle_unsubscribe_message(&d->subscriptions, args); } else { warn("Message not understood in direct; selector <<%.*s>>, length %u\n", selector.len, selector.bytes, diff --git a/fanout.c b/fanout.c index e60938c..02722d7 100644 --- a/fanout.c +++ b/fanout.c @@ -17,32 +17,19 @@ #include "sexp.h" #include "hashtable.h" #include "node.h" +#include "subscription.h" typedef struct fanout_extension_t_ { sexp_t *name; hashtable_t subscriptions; } fanout_extension_t; -typedef struct subscription_t_ { - sexp_t *uuid; - sexp_t *sink; - sexp_t *name; -} subscription_t; - -static void free_subscription(void *value) { - subscription_t *sub = value; - DECREF(sub->uuid, sexp_destructor); - DECREF(sub->sink, sexp_destructor); - DECREF(sub->name, sexp_destructor); - free(sub); -} - static sexp_t *fanout_extend(node_t *n, sexp_t *args) { if ((sexp_length(args) == 1) && sexp_stringp(sexp_head(args))) { cmsg_bytes_t name = sexp_data(sexp_head(args)); fanout_extension_t *f = calloc(1, sizeof(*f)); f->name = INCREF(sexp_head(args)); - init_hashtable(&f->subscriptions, 5, NULL, free_subscription); + init_hashtable(&f->subscriptions, 5, NULL, (void (*)(void *)) free_subscription); n->extension = f; return bind_node(name, n) ? NULL : sexp_cstring("bind failed"); @@ -68,9 +55,7 @@ struct delivery_context { static void send_to_sub(void *contextv, cmsg_bytes_t key, void *subv) { struct delivery_context *context = contextv; subscription_t *sub = subv; - if (!post_node(sexp_data(sub->sink), sexp_data(sub->name), context->body, sub->uuid)) { - hashtable_erase(&context->f->subscriptions, sexp_data(sub->uuid)); - } + send_to_subscription(&context->f->subscriptions, sub, context->body); } static void fanout_handle_message(node_t *n, sexp_t *m) { @@ -94,40 +79,9 @@ static void fanout_handle_message(node_t *n, sexp_t *m) { context.body = sexp_listref(args, 1); hashtable_foreach(&f->subscriptions, send_to_sub, &context); } else if ((msglen == 6) && !cmsg_bytes_cmp(selector, cmsg_cstring_bytes("subscribe"))) { - unsigned char uuid[CMSG_UUID_BUF_SIZE]; - if (gen_uuid(uuid) != 0) { - warn("Could not generate UUID\n"); - } else { - sexp_t *reply_sink = sexp_listref(args, 3); - sexp_t *reply_name = sexp_listref(args, 4); - subscription_t *sub = malloc(sizeof(*sub)); - sub->uuid = INCREF(sexp_bytes(CMSG_BYTES(sizeof(uuid), uuid))); - sub->sink = sexp_listref(args, 1); - sub->name = sexp_listref(args, 2); - if (!sexp_stringp(sub->sink) || !sexp_stringp(sub->name) - || !sexp_stringp(reply_sink) || !sexp_stringp(reply_name)) { - DECREF(sub->uuid, sexp_destructor); - free(sub); - warn("Bad sink/name/reply_sink/reply_name in subscribe"); - } else { - INCREF(sub->sink); - INCREF(sub->name); - hashtable_put(&f->subscriptions, sexp_data(sub->uuid), sub); - { - sexp_t *subok = sexp_cons(sexp_cstring("subscribe-ok"), sexp_cons(sub->uuid, NULL)); - INCREF(subok); - post_node(sexp_data(reply_sink), sexp_data(reply_name), subok, sexp_empty_bytes); - DECREF(subok, sexp_destructor); - } - } - } + handle_subscribe_message(&f->subscriptions, args); } else if ((msglen == 2) && !cmsg_bytes_cmp(selector, cmsg_cstring_bytes("unsubscribe"))) { - if (sexp_stringp(sexp_head(args))) { - cmsg_bytes_t uuid = sexp_data(sexp_head(args)); - hashtable_erase(&f->subscriptions, uuid); - } else { - warn("Invalid unsubscription\n"); - } + handle_unsubscribe_message(&f->subscriptions, args); } else { warn("Message not understood in fanout; selector <<%.*s>>, length %u\n", selector.len, selector.bytes, diff --git a/queue.c b/queue.c index 18d1a37..c011a56 100644 --- a/queue.c +++ b/queue.c @@ -20,6 +20,7 @@ #include "node.h" #include "queue.h" #include "dataq.h" +#include "subscription.h" typedef struct queue_extension_t_ { sexp_t *name; @@ -30,20 +31,6 @@ typedef struct queue_extension_t_ { int shovel_awake; } queue_extension_t; -typedef struct subscription_t_ { - sexp_t *uuid; - sexp_t *sink; - sexp_t *name; - struct subscription_t_ *link; -} subscription_t; - -static void free_subscription(subscription_t *sub) { - DECREF(sub->uuid, sexp_destructor); - DECREF(sub->sink, sexp_destructor); - DECREF(sub->name, sexp_destructor); - free(sub); -} - static sexp_t *queue_extend(node_t *n, sexp_t *args) { if ((sexp_length(args) == 1) && sexp_stringp(sexp_head(args))) { cmsg_bytes_t name = sexp_data(sexp_head(args)); @@ -79,10 +66,6 @@ static void queue_destructor(node_t *n) { } } -static int send_to_waiter(subscription_t *sub, sexp_t *body) { - return post_node(sexp_data(sub->sink), sexp_data(sub->name), body, sub->uuid); -} - static void end_burst(queue_extension_t *q, size_t *burst_count_ptr, size_t total_count) { if (*burst_count_ptr > 0) { info("Queue <<%.*s>>: burst count %lu; total %lu\n", @@ -98,7 +81,6 @@ static void shoveller(void *qv) { size_t burst_count = 0; size_t total_count = 0; sexp_t *body = NULL; /* held */ - queue_t examined; subscription_t *sub = NULL; check_for_work: @@ -110,14 +92,12 @@ static void shoveller(void *qv) { } body = INCREF(sexp_dequeue(q->backlog_q)); /* held */ - examined = EMPTY_QUEUE(subscription_t, link); find_valid_waiter: if (q->waiter_q.count == 0) { //info("No waiters\n"); sexp_queue_pushback(q->backlog_q, body); DECREF(body, sexp_destructor); - q->waiter_q = examined; goto wait_and_shovel; } @@ -129,10 +109,7 @@ static void shoveller(void *qv) { sexp_data(sub->name).len, sexp_data(sub->name).bytes); */ - if ((sub->uuid == NULL) /* It has been unsubscribed. */ - || !send_to_waiter(sub, body)) { /* Destination no longer exists. */ - info((sub->uuid == NULL) ? "Waiter was unsubscribed\n" : "Destination not found\n"); - free_subscription(sub); + if (!send_to_subscription(&q->subscriptions, sub, body)) { goto find_valid_waiter; } @@ -141,7 +118,6 @@ static void shoveller(void *qv) { //info("Delivery successful\n"); DECREF(body, sexp_destructor); - queue_append(&q->waiter_q, &examined); enqueue(&q->waiter_q, sub); if (burst_count >= 10000) { @@ -190,49 +166,13 @@ static void queue_handle_message(node_t *n, sexp_t *m) { sexp_enqueue(q->backlog_q, sexp_listref(args, 1)); throck_shovel(q); } else if ((msglen == 6) && !cmsg_bytes_cmp(selector, cmsg_cstring_bytes("subscribe"))) { - unsigned char uuid[CMSG_UUID_BUF_SIZE]; - if (gen_uuid(uuid) != 0) { - warn("Could not generate UUID\n"); - } else { - sexp_t *reply_sink = sexp_listref(args, 3); - sexp_t *reply_name = sexp_listref(args, 4); - subscription_t *sub = malloc(sizeof(*sub)); - sub->uuid = INCREF(sexp_bytes(CMSG_BYTES(sizeof(uuid), uuid))); - sub->sink = sexp_listref(args, 1); - sub->name = sexp_listref(args, 2); - sub->link = NULL; - if (!sexp_stringp(sub->sink) || !sexp_stringp(sub->name) - || !sexp_stringp(reply_sink) || !sexp_stringp(reply_name)) { - DECREF(sub->uuid, sexp_destructor); - free(sub); - warn("Bad sink/name/reply_sink/reply_name in subscribe"); - } else { - INCREF(sub->sink); - INCREF(sub->name); - hashtable_put(&q->subscriptions, sexp_data(sub->uuid), sub); - enqueue(&q->waiter_q, sub); - throck_shovel(q); - { - sexp_t *subok = sexp_cons(sexp_cstring("subscribe-ok"), sexp_cons(sub->uuid, NULL)); - INCREF(subok); - post_node(sexp_data(reply_sink), sexp_data(reply_name), subok, sexp_empty_bytes); - DECREF(subok, sexp_destructor); - } - } + subscription_t *sub = handle_subscribe_message(&q->subscriptions, args); + if (sub != NULL) { + enqueue(&q->waiter_q, sub); + throck_shovel(q); } } else if ((msglen == 2) && !cmsg_bytes_cmp(selector, cmsg_cstring_bytes("unsubscribe"))) { - if (sexp_stringp(sexp_head(args))) { - cmsg_bytes_t uuid = sexp_data(sexp_head(args)); - subscription_t *sub; - if (hashtable_get(&q->subscriptions, uuid, (void **) &sub)) { - /* TODO: clean up more eagerly perhaps? */ - hashtable_erase(&q->subscriptions, uuid); - DECREF(sub->uuid, sexp_destructor); - sub->uuid = NULL; - } - } else { - warn("Invalid unsubscription\n"); - } + handle_unsubscribe_message(&q->subscriptions, args); } else { warn("Message not understood in queue; selector <<%.*s>>, length %u\n", selector.len, selector.bytes, diff --git a/subscription.c b/subscription.c new file mode 100644 index 0000000..3f71981 --- /dev/null +++ b/subscription.c @@ -0,0 +1,132 @@ +/* Copyright (C) 2010 Tony Garnock-Jones. All rights reserved. */ + +#include +#include +#include +#include +#include +#include + +#include + +#include + +#include "cmsg_private.h" +#include "ref.h" +#include "sexp.h" +#include "hashtable.h" +#include "subscription.h" +#include "node.h" + +void free_subscription(subscription_t *sub) { + DECREF(sub->uuid, sexp_destructor); + DECREF(sub->sink, sexp_destructor); + DECREF(sub->name, sexp_destructor); + free(sub); +} + +void free_subscription_chain(subscription_t *chain) { + while (chain != NULL) { + subscription_t *next = chain->link; + free_subscription(chain); + chain = next; + } +} + +/* Returns true if the subscription has not been unsubscribed and the + destination of the subscription exists. */ +int send_to_subscription(hashtable_t *subscriptions, + subscription_t *sub, + sexp_t *body) +{ + if (sub->uuid == NULL) { + free_subscription(sub); + return 0; + } else if (!post_node(sexp_data(sub->sink), sexp_data(sub->name), body, sub->uuid)) { + hashtable_erase(subscriptions, sexp_data(sub->uuid)); + free_subscription(sub); + return 0; + } else { + return 1; + } +} + +subscription_t *send_to_subscription_chain(hashtable_t *subscriptions, + subscription_t *chain, + sexp_t *body) +{ + subscription_t *top = chain; + subscription_t *prev = NULL; + while (chain != NULL) { + subscription_t *next = chain->link; + if (!send_to_subscription(subscriptions, chain, body)) { + if (prev == NULL) { + top = next; + } else { + prev->link = next; + } + } + prev = chain; + chain = next; + } + return top; +} + +subscription_t *handle_subscribe_message(hashtable_t *subscriptions, sexp_t *args) { + unsigned char uuid[CMSG_UUID_BUF_SIZE]; + if (gen_uuid(uuid) != 0) { + warn("Could not generate UUID\n"); + return NULL; + } else { + sexp_t *filter = sexp_listref(args, 0); + sexp_t *reply_sink = sexp_listref(args, 3); + sexp_t *reply_name = sexp_listref(args, 4); + subscription_t *sub = malloc(sizeof(*sub)); + + sub->uuid = INCREF(sexp_bytes(CMSG_BYTES(sizeof(uuid), uuid))); + sub->sink = sexp_listref(args, 1); + sub->name = sexp_listref(args, 2); + sub->link = NULL; + + if (!sexp_stringp(filter) + || !sexp_stringp(sub->sink) || !sexp_stringp(sub->name) + || !sexp_stringp(reply_sink) || !sexp_stringp(reply_name)) { + DECREF(sub->uuid, sexp_destructor); + free(sub); + warn("Bad sink/name/reply_sink/reply_name in subscribe"); + return NULL; + } + + INCREF(sub->sink); + INCREF(sub->name); + + hashtable_put(subscriptions, sexp_data(sub->uuid), sub); + + { + sexp_t *subok = sexp_cons(sexp_cstring("subscribe-ok"), sexp_cons(sub->uuid, NULL)); + INCREF(subok); + post_node(sexp_data(reply_sink), sexp_data(reply_name), subok, sexp_empty_bytes); + DECREF(subok, sexp_destructor); + } + + return sub; + } +} + +void handle_unsubscribe_message(hashtable_t *subscriptions, sexp_t *args) { + cmsg_bytes_t uuid; + subscription_t *sub; + + if (!sexp_stringp(sexp_head(args))) { + warn("Invalid unsubscription\n"); + return; + } + + uuid = sexp_data(sexp_head(args)); + if (hashtable_get(subscriptions, uuid, (void **) &sub)) { + /* TODO: clean up more eagerly perhaps? */ + DECREF(sub->uuid, sexp_destructor); + sub->uuid = NULL; + hashtable_erase(subscriptions, uuid); + } +} diff --git a/subscription.h b/subscription.h new file mode 100644 index 0000000..e6ea884 --- /dev/null +++ b/subscription.h @@ -0,0 +1,24 @@ +#ifndef cmsg_subscription_h +#define cmsg_subscription_h + +typedef struct subscription_t_ { + sexp_t *uuid; + sexp_t *sink; + sexp_t *name; + struct subscription_t_ *link; +} subscription_t; + +extern void free_subscription(subscription_t *sub); +extern void free_subscription_chain(subscription_t *chain); + +extern int send_to_subscription(hashtable_t *subscriptions, + subscription_t *sub, + sexp_t *body); +extern subscription_t *send_to_subscription_chain(hashtable_t *subscriptions, + subscription_t *chain, + sexp_t *body); + +extern subscription_t *handle_subscribe_message(hashtable_t *subscriptions, sexp_t *args); +extern void handle_unsubscribe_message(hashtable_t *subscriptions, sexp_t *args); + +#endif