/* Copyright 2010, 2011, 2012 Tony Garnock-Jones . * * This file is part of Hop. * * Hop is free software: you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * Hop is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public * License for more details. * * You should have received a copy of the GNU General Public License * along with Hop. If not, see . */ #include #include #include #include #include #include #include #include #include "cmsg_private.h" #include "harness.h" #include "ref.h" #include "sexp.h" #include "hashtable.h" #include "node.h" #include "messages.h" #include "subscription.h" #include "sexpio.h" typedef struct direct_extension_t_ { sexp_t *name; hashtable_t routing_table; hashtable_t subscriptions; } direct_extension_t; static sexp_t *direct_extend(node_t *n, sexp_t *args) { if ((sexp_length(args) == 1) && sexp_stringp(sexp_head(args))) { cmsg_bytes_t name = sexp_data(sexp_head(args)); direct_extension_t *d = calloc(1, sizeof(*d)); d->name = INCREF(sexp_head(args)); init_hashtable(&d->routing_table, 5, NULL, NULL); init_hashtable(&d->subscriptions, 5, NULL, NULL); n->extension = d; return bind_node(name, n) ? NULL : sexp_cstring("bind failed"); } else { return sexp_cstring("invalid args"); } } static void free_direct_chain(void *context, cmsg_bytes_t key, void *value) { free_subscription_chain(value); } static void direct_destructor(node_t *n) { direct_extension_t *d = n->extension; if (d != NULL) { /* can be NULL if direct_extend was given invalid args */ DECREF(d->name, sexp_destructor); hashtable_foreach(&d->routing_table, free_direct_chain, NULL); destroy_hashtable(&d->routing_table); destroy_hashtable(&d->subscriptions); free(d); } } static void route_message(direct_extension_t *d, sexp_t *rk, sexp_t *body) { subscription_t *chain = NULL; subscription_t *newchain; hashtable_get(&d->routing_table, sexp_data(rk), (void **) &chain); newchain = send_to_subscription_chain(d->name, &d->subscriptions, chain, body); if (newchain != chain) { hashtable_put(&d->routing_table, sexp_data(rk), newchain); } } static void direct_handle_message(node_t *n, sexp_t *m) { direct_extension_t *d = n->extension; parsed_message_t p; if (parse_post(m, &p)) { if (sexp_stringp(p.post.name)) { route_message(d, p.post.name, p.post.body); } else { warn("Non-string routing key in direct\n"); } return; } if (parse_subscribe(m, &p)) { subscription_t *sub = handle_subscribe_message(d->name, &d->subscriptions, &p); if (sub != NULL) { hashtable_get(&d->routing_table, sexp_data(p.subscribe.filter), (void **) &sub->link); hashtable_put(&d->routing_table, sexp_data(p.subscribe.filter), sub); } return; } if (parse_unsubscribe(m, &p)) { handle_unsubscribe_message(d->name, &d->subscriptions, &p); return; } warn("Message not understood in direct: "); sexp_writeln(stderr_h, m); } static node_class_t direct_class = { .name = "direct", .extend = direct_extend, .destroy = direct_destructor, .handle_message = direct_handle_message }; void init_direct(void) { register_node_class(&direct_class); }