/* Copyright (C) 2010 Tony Garnock-Jones. All rights reserved. */ #include #include #include #include #include #include #include #include #include "cmsg_private.h" #include "harness.h" #include "ref.h" #include "sexp.h" #include "hashtable.h" #include "node.h" typedef struct direct_extension_t_ { sexp_t *name; hashtable_t routing_table; hashtable_t subscriptions; } direct_extension_t; typedef struct subscription_t_ { sexp_t *uuid; sexp_t *sink; sexp_t *name; struct subscription_t_ *link; } subscription_t; static void free_subscription(subscription_t *sub) { DECREF(sub->uuid, sexp_destructor); DECREF(sub->sink, sexp_destructor); DECREF(sub->name, sexp_destructor); free(sub); } static void free_subscription_chain(void *context, cmsg_bytes_t key, void *value) { subscription_t *chain = value; while (chain != NULL) { subscription_t *next = chain->link; free_subscription(chain); chain = next; } } static sexp_t *direct_extend(node_t *n, sexp_t *args) { if ((sexp_length(args) == 1) && sexp_stringp(sexp_head(args))) { cmsg_bytes_t name = sexp_data(sexp_head(args)); direct_extension_t *d = calloc(1, sizeof(*d)); d->name = INCREF(sexp_head(args)); init_hashtable(&d->routing_table, 5, NULL, NULL); init_hashtable(&d->subscriptions, 5, NULL, NULL); n->extension = d; return bind_node(name, n) ? NULL : sexp_cstring("bind failed"); } else { return sexp_cstring("invalid args"); } } static void direct_destructor(node_t *n) { direct_extension_t *d = n->extension; if (d != NULL) { /* can be NULL if direct_extend was given invalid args */ DECREF(d->name, sexp_destructor); hashtable_foreach(&d->routing_table, free_subscription_chain, NULL); destroy_hashtable(&d->routing_table); destroy_hashtable(&d->subscriptions); free(d); } } static int send_to_sub(subscription_t *sub, sexp_t *body) { return post_node(sexp_data(sub->sink), sexp_data(sub->name), body, sub->uuid); } static void route_message(direct_extension_t *d, sexp_t *rk, sexp_t *body) { subscription_t *chain = NULL; subscription_t *prev = NULL; hashtable_get(&d->routing_table, sexp_data(rk), (void **) &chain); while (chain != NULL) { subscription_t *next = chain->link; if (!send_to_sub(chain, body)) { /* Destination no longer exists. */ info("Destination not found\n"); if (prev == NULL) { hashtable_put(&d->routing_table, sexp_data(rk), chain->link); } else { prev->link = chain->link; } chain->link = NULL; free_subscription(chain); } chain = next; } } static void direct_handle_message(node_t *n, sexp_t *m) { direct_extension_t *d = n->extension; size_t msglen = sexp_length(m); sexp_t *args; cmsg_bytes_t selector; if (msglen == 0 || !sexp_stringp(sexp_head(m))) { warn("Invalid message in direct\n"); return; } selector = sexp_data(sexp_head(m)); args = sexp_tail(m); if ((msglen == 4) && !cmsg_bytes_cmp(selector, cmsg_cstring_bytes("post"))) { sexp_t *rk = sexp_listref(args, 0); if (sexp_stringp(rk)) { route_message(d, rk, sexp_listref(args, 1)); } else { warn("Non-string routing key in direct\n"); } } else if ((msglen == 6) && !cmsg_bytes_cmp(selector, cmsg_cstring_bytes("subscribe"))) { unsigned char uuid[CMSG_UUID_BUF_SIZE]; if (gen_uuid(uuid) != 0) { warn("Could not generate UUID\n"); } else { sexp_t *filter = sexp_listref(args, 0); sexp_t *reply_sink = sexp_listref(args, 3); sexp_t *reply_name = sexp_listref(args, 4); subscription_t *sub = malloc(sizeof(*sub)); sub->uuid = INCREF(sexp_bytes(CMSG_BYTES(sizeof(uuid), uuid))); sub->sink = sexp_listref(args, 1); sub->name = sexp_listref(args, 2); sub->link = NULL; if (!sexp_stringp(filter) || !sexp_stringp(sub->sink) || !sexp_stringp(sub->name) || !sexp_stringp(reply_sink) || !sexp_stringp(reply_name)) { DECREF(sub->uuid, sexp_destructor); free(sub); warn("Bad sink/name/reply_sink/reply_name in subscribe"); } else { INCREF(sub->sink); INCREF(sub->name); hashtable_put(&d->subscriptions, sexp_data(sub->uuid), sub); hashtable_get(&d->routing_table, sexp_data(filter), (void **) &sub->link); hashtable_put(&d->routing_table, sexp_data(filter), sub); { sexp_t *subok = sexp_cons(sexp_cstring("subscribe-ok"), sexp_cons(sub->uuid, NULL)); INCREF(subok); post_node(sexp_data(reply_sink), sexp_data(reply_name), subok, sexp_empty_bytes); DECREF(subok, sexp_destructor); } } } } else if ((msglen == 2) && !cmsg_bytes_cmp(selector, cmsg_cstring_bytes("unsubscribe"))) { if (sexp_stringp(sexp_head(args))) { cmsg_bytes_t uuid = sexp_data(sexp_head(args)); subscription_t *sub; if (hashtable_get(&d->subscriptions, uuid, (void **) &sub)) { /* TODO: clean up more eagerly perhaps? */ hashtable_erase(&d->subscriptions, uuid); DECREF(sub->uuid, sexp_destructor); sub->uuid = NULL; } } else { warn("Invalid unsubscription\n"); } } else { warn("Message not understood in direct; selector <<%.*s>>, length %u\n", selector.len, selector.bytes, msglen); } } static node_class_t direct_class = { .name = "direct", .extend = direct_extend, .destroy = direct_destructor, .handle_message = direct_handle_message }; void init_direct(void) { register_node_class(&direct_class); }