diff --git a/Makefile b/Makefile index 02d14bd..5c6a650 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ TARGET = cmsg OBJECTS = main.o harness.o net.o util.o relay.o hashtable.o dataq.o sexp.o sexpio.o node.o \ - queue.o direct.o + queue.o direct.o fanout.o UUID_CFLAGS:=$(shell uuid-config --cflags) UUID_LDFLAGS:=$(shell uuid-config --ldflags) diff --git a/fanout.c b/fanout.c new file mode 100644 index 0000000..e60938c --- /dev/null +++ b/fanout.c @@ -0,0 +1,147 @@ +/* Copyright (C) 2010 Tony Garnock-Jones. All rights reserved. */ + +#include +#include +#include +#include +#include +#include + +#include + +#include + +#include "cmsg_private.h" +#include "harness.h" +#include "ref.h" +#include "sexp.h" +#include "hashtable.h" +#include "node.h" + +typedef struct fanout_extension_t_ { + sexp_t *name; + hashtable_t subscriptions; +} fanout_extension_t; + +typedef struct subscription_t_ { + sexp_t *uuid; + sexp_t *sink; + sexp_t *name; +} subscription_t; + +static void free_subscription(void *value) { + subscription_t *sub = value; + DECREF(sub->uuid, sexp_destructor); + DECREF(sub->sink, sexp_destructor); + DECREF(sub->name, sexp_destructor); + free(sub); +} + +static sexp_t *fanout_extend(node_t *n, sexp_t *args) { + if ((sexp_length(args) == 1) && sexp_stringp(sexp_head(args))) { + cmsg_bytes_t name = sexp_data(sexp_head(args)); + fanout_extension_t *f = calloc(1, sizeof(*f)); + f->name = INCREF(sexp_head(args)); + init_hashtable(&f->subscriptions, 5, NULL, free_subscription); + + n->extension = f; + return bind_node(name, n) ? NULL : sexp_cstring("bind failed"); + } else { + return sexp_cstring("invalid args"); + } +} + +static void fanout_destructor(node_t *n) { + fanout_extension_t *f = n->extension; + if (f != NULL) { /* can be NULL if fanout_extend was given invalid args */ + DECREF(f->name, sexp_destructor); + destroy_hashtable(&f->subscriptions); + free(f); + } +} + +struct delivery_context { + fanout_extension_t *f; + sexp_t *body; +}; + +static void send_to_sub(void *contextv, cmsg_bytes_t key, void *subv) { + struct delivery_context *context = contextv; + subscription_t *sub = subv; + if (!post_node(sexp_data(sub->sink), sexp_data(sub->name), context->body, sub->uuid)) { + hashtable_erase(&context->f->subscriptions, sexp_data(sub->uuid)); + } +} + +static void fanout_handle_message(node_t *n, sexp_t *m) { + fanout_extension_t *f = n->extension; + + size_t msglen = sexp_length(m); + sexp_t *args; + cmsg_bytes_t selector; + + if (msglen == 0 || !sexp_stringp(sexp_head(m))) { + warn("Invalid message in fanout\n"); + return; + } + + selector = sexp_data(sexp_head(m)); + args = sexp_tail(m); + + if ((msglen == 4) && !cmsg_bytes_cmp(selector, cmsg_cstring_bytes("post"))) { + struct delivery_context context; + context.f = f; + context.body = sexp_listref(args, 1); + hashtable_foreach(&f->subscriptions, send_to_sub, &context); + } else if ((msglen == 6) && !cmsg_bytes_cmp(selector, cmsg_cstring_bytes("subscribe"))) { + unsigned char uuid[CMSG_UUID_BUF_SIZE]; + if (gen_uuid(uuid) != 0) { + warn("Could not generate UUID\n"); + } else { + sexp_t *reply_sink = sexp_listref(args, 3); + sexp_t *reply_name = sexp_listref(args, 4); + subscription_t *sub = malloc(sizeof(*sub)); + sub->uuid = INCREF(sexp_bytes(CMSG_BYTES(sizeof(uuid), uuid))); + sub->sink = sexp_listref(args, 1); + sub->name = sexp_listref(args, 2); + if (!sexp_stringp(sub->sink) || !sexp_stringp(sub->name) + || !sexp_stringp(reply_sink) || !sexp_stringp(reply_name)) { + DECREF(sub->uuid, sexp_destructor); + free(sub); + warn("Bad sink/name/reply_sink/reply_name in subscribe"); + } else { + INCREF(sub->sink); + INCREF(sub->name); + hashtable_put(&f->subscriptions, sexp_data(sub->uuid), sub); + { + sexp_t *subok = sexp_cons(sexp_cstring("subscribe-ok"), sexp_cons(sub->uuid, NULL)); + INCREF(subok); + post_node(sexp_data(reply_sink), sexp_data(reply_name), subok, sexp_empty_bytes); + DECREF(subok, sexp_destructor); + } + } + } + } else if ((msglen == 2) && !cmsg_bytes_cmp(selector, cmsg_cstring_bytes("unsubscribe"))) { + if (sexp_stringp(sexp_head(args))) { + cmsg_bytes_t uuid = sexp_data(sexp_head(args)); + hashtable_erase(&f->subscriptions, uuid); + } else { + warn("Invalid unsubscription\n"); + } + } else { + warn("Message not understood in fanout; selector <<%.*s>>, length %u\n", + selector.len, selector.bytes, + msglen); + } +} + +static node_class_t fanout_class = { + .name = "fanout", + .extend = fanout_extend, + .destroy = fanout_destructor, + .handle_message = fanout_handle_message +}; + +void init_fanout(void) { + register_node_class(&fanout_class); +} diff --git a/fanout.h b/fanout.h new file mode 100644 index 0000000..0c73a39 --- /dev/null +++ b/fanout.h @@ -0,0 +1,6 @@ +#ifndef cmsg_fanout_h +#define cmsg_fanout_h + +extern void init_fanout(void); + +#endif diff --git a/main.c b/main.c index 4a5a55f..cb55f9c 100644 --- a/main.c +++ b/main.c @@ -22,6 +22,7 @@ typedef unsigned char u_char; #include "node.h" #include "queue.h" #include "direct.h" +#include "fanout.h" #define WANT_CONSOLE_LISTENER 1 @@ -103,6 +104,7 @@ int main(int argc, char *argv[]) { init_factory(); init_queue(); init_direct(); + init_fanout(); #if WANT_CONSOLE_LISTENER spawn(console_listener, NULL); #endif