From a2aae0e93847839cb349ca6b9ee063fdfd4d951c Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Sun, 2 Jan 2011 22:46:48 -0500 Subject: [PATCH] First stab at "meta" exchange. --- Makefile | 2 +- TODO | 7 +++++++ direct.c | 6 +++--- fanout.c | 6 +++--- main.c | 2 ++ meta.c | 46 ++++++++++++++++++++++++++++++++++++++++++++++ meta.h | 12 ++++++++++++ node.c | 14 ++++++++++++++ queue.c | 6 +++--- subscription.c | 30 ++++++++++++++++++++++-------- subscription.h | 16 ++++++++++++---- t0 | 2 ++ 12 files changed, 127 insertions(+), 22 deletions(-) create mode 100644 meta.c create mode 100644 meta.h create mode 100644 t0 diff --git a/Makefile b/Makefile index 2e5765d..b8ce632 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ TARGET = cmsg OBJECTS = main.o harness.o net.o util.o relay.o hashtable.o dataq.o sexp.o sexpio.o node.o \ - queue.o direct.o fanout.o subscription.o + queue.o direct.o fanout.o subscription.o meta.o UUID_CFLAGS:=$(shell uuid-config --cflags) UUID_LDFLAGS:=$(shell uuid-config --ldflags) diff --git a/TODO b/TODO index 62f7b68..d2505fc 100644 --- a/TODO +++ b/TODO @@ -4,3 +4,10 @@ collision, so choose a new token until there's no collision. SAX-style sexp reader/writer, so that we can do something sensible for enormous (e.g. gigabyte-sized) messages. + +The "meta" exchange probably wants to be a topic exchange. Or +something. + +The "meta" exchange probably wants to emit how-things-are-now messages +when people subscribe to it, to get them started; a kind of +last-value-cache type thing. diff --git a/direct.c b/direct.c index 3162416..c9d4532 100644 --- a/direct.c +++ b/direct.c @@ -59,7 +59,7 @@ static void route_message(direct_extension_t *d, sexp_t *rk, sexp_t *body) { subscription_t *chain = NULL; subscription_t *newchain; hashtable_get(&d->routing_table, sexp_data(rk), (void **) &chain); - newchain = send_to_subscription_chain(&d->subscriptions, chain, body); + newchain = send_to_subscription_chain(d->name, &d->subscriptions, chain, body); if (newchain != chain) { hashtable_put(&d->routing_table, sexp_data(rk), newchain); } @@ -91,7 +91,7 @@ static void direct_handle_message(node_t *n, sexp_t *m) { } if ((msglen == 6) && !cmsg_bytes_cmp(selector, cmsg_cstring_bytes("subscribe"))) { - subscription_t *sub = handle_subscribe_message(&d->subscriptions, args); + subscription_t *sub = handle_subscribe_message(d->name, &d->subscriptions, args); if (sub != NULL) { sexp_t *filter = sexp_listref(args, 0); hashtable_get(&d->routing_table, sexp_data(filter), (void **) &sub->link); @@ -101,7 +101,7 @@ static void direct_handle_message(node_t *n, sexp_t *m) { } if ((msglen == 2) && !cmsg_bytes_cmp(selector, cmsg_cstring_bytes("unsubscribe"))) { - handle_unsubscribe_message(&d->subscriptions, args); + handle_unsubscribe_message(d->name, &d->subscriptions, args); return; } diff --git a/fanout.c b/fanout.c index 4f65dda..25753bf 100644 --- a/fanout.c +++ b/fanout.c @@ -55,7 +55,7 @@ struct delivery_context { static void send_to_sub(void *contextv, cmsg_bytes_t key, void *subv) { struct delivery_context *context = contextv; subscription_t *sub = subv; - send_to_subscription(&context->f->subscriptions, sub, context->body); + send_to_subscription(context->f->name, &context->f->subscriptions, sub, context->body); } static void fanout_handle_message(node_t *n, sexp_t *m) { @@ -82,12 +82,12 @@ static void fanout_handle_message(node_t *n, sexp_t *m) { } if ((msglen == 6) && !cmsg_bytes_cmp(selector, cmsg_cstring_bytes("subscribe"))) { - handle_subscribe_message(&f->subscriptions, args); + handle_subscribe_message(f->name, &f->subscriptions, args); return; } if ((msglen == 2) && !cmsg_bytes_cmp(selector, cmsg_cstring_bytes("unsubscribe"))) { - handle_unsubscribe_message(&f->subscriptions, args); + handle_unsubscribe_message(f->name, &f->subscriptions, args); return; } diff --git a/main.c b/main.c index 78f4ae7..538d89e 100644 --- a/main.c +++ b/main.c @@ -23,6 +23,7 @@ typedef unsigned char u_char; #include "queue.h" #include "direct.h" #include "fanout.h" +#include "meta.h" #define WANT_CONSOLE_LISTENER 1 @@ -105,6 +106,7 @@ int main(int argc, char *argv[]) { init_queue(); init_direct(); init_fanout(); + init_meta(); #if WANT_CONSOLE_LISTENER spawn(console_listener, NULL); #endif diff --git a/meta.c b/meta.c new file mode 100644 index 0000000..158c1b7 --- /dev/null +++ b/meta.c @@ -0,0 +1,46 @@ +/* Copyright (C) 2010 Tony Garnock-Jones. All rights reserved. */ + +#include +#include +#include +#include +#include +#include + +#include + +#include "cmsg_private.h" +#include "ref.h" +#include "sexp.h" +#include "hashtable.h" +#include "node.h" +#include "meta.h" + +static sexp_t *meta_sym = NULL; + +void init_meta(void) { + sexp_t *args; + meta_sym = INCREF(sexp_cstring("meta")); + args = INCREF(sexp_cons(meta_sym, NULL)); + new_node(lookup_node_class(cmsg_cstring_bytes("direct")), args, NULL); + DECREF(args, sexp_destructor); +} + +void announce_subscription(sexp_t *source, + sexp_t *filter, + sexp_t *sink, + sexp_t *name, + int onoff) +{ + if (meta_sym != NULL) { /* use this as a proxy for whether meta has been initialized or not */ + sexp_t *msg = NULL; + msg = sexp_cons(name, msg); + msg = sexp_cons(sink, msg); + msg = sexp_cons(filter, msg); + msg = sexp_cons(source, msg); + msg = sexp_cons(sexp_cstring(onoff ? "subscribed" : "unsubscribed"), msg); + INCREF(msg); + post_node(sexp_data(meta_sym), sexp_data(source), msg, NULL); + DECREF(msg, sexp_destructor); + } +} diff --git a/meta.h b/meta.h new file mode 100644 index 0000000..1e878b7 --- /dev/null +++ b/meta.h @@ -0,0 +1,12 @@ +#ifndef cmsg_meta_h +#define cmsg_meta_h + +extern void init_meta(void); + +extern void announce_subscription(sexp_t *source, + sexp_t *filter, + sexp_t *sink, + sexp_t *name, + int onoff); + +#endif diff --git a/node.c b/node.c index da16273..b3fc2fd 100644 --- a/node.c +++ b/node.c @@ -14,6 +14,7 @@ #include "sexpio.h" #include "hashtable.h" #include "node.h" +#include "meta.h" static hashtable_t node_class_table; static hashtable_t directory; @@ -92,13 +93,25 @@ node_t *lookup_node(cmsg_bytes_t name) { return n; } +static void announce_binding(cmsg_bytes_t name, int onoff) { + sexp_t *filter = sexp_bytes(name); + INCREF(filter); + announce_subscription(sexp_empty_bytes, filter, sexp_empty_bytes, sexp_empty_bytes, onoff); + DECREF(filter, sexp_destructor); +} + int bind_node(cmsg_bytes_t name, node_t *n) { + if (name.len == 0) { + warn("Binding to empty name forbidden\n"); + return 0; + } if (hashtable_contains(&directory, name)) { return 0; } hashtable_put(&directory, name, n); hashtable_put(&n->names, name, NULL); info("Binding node <<%.*s>> of class %s\n", name.len, name.bytes, n->node_class->name); + announce_binding(name, 1); return 1; } @@ -111,6 +124,7 @@ int unbind_node(cmsg_bytes_t name) { info("Unbinding node <<%.*s>> of class %s\n", name.len, name.bytes, n->node_class->name); hashtable_erase(&n->names, name); hashtable_erase(&directory, name); + announce_binding(name, 0); return 1; } } diff --git a/queue.c b/queue.c index 14852f2..756225c 100644 --- a/queue.c +++ b/queue.c @@ -109,7 +109,7 @@ static void shoveller(void *qv) { sexp_data(sub->name).len, sexp_data(sub->name).bytes); */ - if (!send_to_subscription(&q->subscriptions, sub, body)) { + if (!send_to_subscription(q->name, &q->subscriptions, sub, body)) { goto find_valid_waiter; } @@ -169,7 +169,7 @@ static void queue_handle_message(node_t *n, sexp_t *m) { } if ((msglen == 6) && !cmsg_bytes_cmp(selector, cmsg_cstring_bytes("subscribe"))) { - subscription_t *sub = handle_subscribe_message(&q->subscriptions, args); + subscription_t *sub = handle_subscribe_message(q->name, &q->subscriptions, args); if (sub != NULL) { enqueue(&q->waiter_q, sub); throck_shovel(q); @@ -178,7 +178,7 @@ static void queue_handle_message(node_t *n, sexp_t *m) { } if ((msglen == 2) && !cmsg_bytes_cmp(selector, cmsg_cstring_bytes("unsubscribe"))) { - handle_unsubscribe_message(&q->subscriptions, args); + handle_unsubscribe_message(q->name, &q->subscriptions, args); return; } diff --git a/subscription.c b/subscription.c index 3f71981..53482cd 100644 --- a/subscription.c +++ b/subscription.c @@ -17,9 +17,11 @@ #include "hashtable.h" #include "subscription.h" #include "node.h" +#include "meta.h" void free_subscription(subscription_t *sub) { DECREF(sub->uuid, sexp_destructor); + DECREF(sub->filter, sexp_destructor); DECREF(sub->sink, sexp_destructor); DECREF(sub->name, sexp_destructor); free(sub); @@ -35,7 +37,8 @@ void free_subscription_chain(subscription_t *chain) { /* Returns true if the subscription has not been unsubscribed and the destination of the subscription exists. */ -int send_to_subscription(hashtable_t *subscriptions, +int send_to_subscription(sexp_t *source, + hashtable_t *subscriptions, subscription_t *sub, sexp_t *body) { @@ -43,6 +46,7 @@ int send_to_subscription(hashtable_t *subscriptions, free_subscription(sub); return 0; } else if (!post_node(sexp_data(sub->sink), sexp_data(sub->name), body, sub->uuid)) { + announce_subscription(source, sub->filter, sub->sink, sub->name, 0); hashtable_erase(subscriptions, sexp_data(sub->uuid)); free_subscription(sub); return 0; @@ -51,7 +55,8 @@ int send_to_subscription(hashtable_t *subscriptions, } } -subscription_t *send_to_subscription_chain(hashtable_t *subscriptions, +subscription_t *send_to_subscription_chain(sexp_t *source, + hashtable_t *subscriptions, subscription_t *chain, sexp_t *body) { @@ -59,7 +64,7 @@ subscription_t *send_to_subscription_chain(hashtable_t *subscriptions, subscription_t *prev = NULL; while (chain != NULL) { subscription_t *next = chain->link; - if (!send_to_subscription(subscriptions, chain, body)) { + if (!send_to_subscription(source, subscriptions, chain, body)) { if (prev == NULL) { top = next; } else { @@ -72,24 +77,26 @@ subscription_t *send_to_subscription_chain(hashtable_t *subscriptions, return top; } -subscription_t *handle_subscribe_message(hashtable_t *subscriptions, sexp_t *args) { +subscription_t *handle_subscribe_message(sexp_t *source, + hashtable_t *subscriptions, + sexp_t *args) +{ unsigned char uuid[CMSG_UUID_BUF_SIZE]; if (gen_uuid(uuid) != 0) { warn("Could not generate UUID\n"); return NULL; } else { - sexp_t *filter = sexp_listref(args, 0); sexp_t *reply_sink = sexp_listref(args, 3); sexp_t *reply_name = sexp_listref(args, 4); subscription_t *sub = malloc(sizeof(*sub)); sub->uuid = INCREF(sexp_bytes(CMSG_BYTES(sizeof(uuid), uuid))); + sub->filter = sexp_listref(args, 0); sub->sink = sexp_listref(args, 1); sub->name = sexp_listref(args, 2); sub->link = NULL; - if (!sexp_stringp(filter) - || !sexp_stringp(sub->sink) || !sexp_stringp(sub->name) + if (!sexp_stringp(sub->filter) || !sexp_stringp(sub->sink) || !sexp_stringp(sub->name) || !sexp_stringp(reply_sink) || !sexp_stringp(reply_name)) { DECREF(sub->uuid, sexp_destructor); free(sub); @@ -97,11 +104,14 @@ subscription_t *handle_subscribe_message(hashtable_t *subscriptions, sexp_t *arg return NULL; } + INCREF(sub->filter); INCREF(sub->sink); INCREF(sub->name); hashtable_put(subscriptions, sexp_data(sub->uuid), sub); + announce_subscription(source, sub->filter, sub->sink, sub->name, 1); + { sexp_t *subok = sexp_cons(sexp_cstring("subscribe-ok"), sexp_cons(sub->uuid, NULL)); INCREF(subok); @@ -113,7 +123,10 @@ subscription_t *handle_subscribe_message(hashtable_t *subscriptions, sexp_t *arg } } -void handle_unsubscribe_message(hashtable_t *subscriptions, sexp_t *args) { +void handle_unsubscribe_message(sexp_t *source, + hashtable_t *subscriptions, + sexp_t *args) +{ cmsg_bytes_t uuid; subscription_t *sub; @@ -125,6 +138,7 @@ void handle_unsubscribe_message(hashtable_t *subscriptions, sexp_t *args) { uuid = sexp_data(sexp_head(args)); if (hashtable_get(subscriptions, uuid, (void **) &sub)) { /* TODO: clean up more eagerly perhaps? */ + announce_subscription(source, sub->filter, sub->sink, sub->name, 0); DECREF(sub->uuid, sexp_destructor); sub->uuid = NULL; hashtable_erase(subscriptions, uuid); diff --git a/subscription.h b/subscription.h index e6ea884..9b5aeec 100644 --- a/subscription.h +++ b/subscription.h @@ -3,6 +3,7 @@ typedef struct subscription_t_ { sexp_t *uuid; + sexp_t *filter; sexp_t *sink; sexp_t *name; struct subscription_t_ *link; @@ -11,14 +12,21 @@ typedef struct subscription_t_ { extern void free_subscription(subscription_t *sub); extern void free_subscription_chain(subscription_t *chain); -extern int send_to_subscription(hashtable_t *subscriptions, +extern int send_to_subscription(sexp_t *source, + hashtable_t *subscriptions, subscription_t *sub, sexp_t *body); -extern subscription_t *send_to_subscription_chain(hashtable_t *subscriptions, +extern subscription_t *send_to_subscription_chain(sexp_t *source, + hashtable_t *subscriptions, subscription_t *chain, sexp_t *body); -extern subscription_t *handle_subscribe_message(hashtable_t *subscriptions, sexp_t *args); -extern void handle_unsubscribe_message(hashtable_t *subscriptions, sexp_t *args); +extern subscription_t *handle_subscribe_message(sexp_t *source, + hashtable_t *subscriptions, + sexp_t *args); + +extern void handle_unsubscribe_message(sexp_t *source, + hashtable_t *subscriptions, + sexp_t *args); #endif diff --git a/t0 b/t0 new file mode 100644 index 0000000..083b8e1 --- /dev/null +++ b/t0 @@ -0,0 +1,2 @@ +(9:subscribe5:test00:0:5:test05:login) +(4:post4:meta(9:subscribe0:5:test08:presence5:test01:k)0:)