From 5913f2008a94f1fe8c357c4da02e29c78bfecf7d Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Sun, 9 Jan 2011 19:42:36 -0500 Subject: [PATCH] Use generated message-matching code --- .gitignore | 2 + server/Makefile | 12 +++++- server/codegen.py | 95 +++++++++++++++++++++++++++++++++++++++++ server/direct.c | 40 ++++++----------- server/fanout.c | 32 +++++--------- server/main.c | 43 ++++++++----------- server/messages.json | 42 ++++++++++++++++++ server/meta.c | 21 ++++----- server/node.c | 14 +----- server/queue.c | 31 +++++--------- server/relay.c | 99 +++++++++++++++++-------------------------- server/sexp.h | 5 +++ server/subscription.c | 29 ++++++------- server/subscription.h | 4 +- 14 files changed, 272 insertions(+), 197 deletions(-) create mode 100644 server/codegen.py create mode 100644 server/messages.json diff --git a/.gitignore b/.gitignore index 77321e9..c235ef9 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,7 @@ scratch/ *.o +server/messages.h +server/messages.c server/cmsg server/test1 server/test3 diff --git a/server/Makefile b/server/Makefile index 44ba6f9..dda4a5a 100644 --- a/server/Makefile +++ b/server/Makefile @@ -1,6 +1,6 @@ TARGET = cmsg OBJECTS = main.o harness.o net.o util.o relay.o hashtable.o dataq.o sexp.o sexpio.o node.o \ - queue.o direct.o fanout.o subscription.o meta.o + queue.o direct.o fanout.o subscription.o meta.o messages.o UUID_CFLAGS:=$(shell uuid-config --cflags) UUID_LDFLAGS:=$(shell uuid-config --ldflags) @@ -25,14 +25,22 @@ $(TARGET): $(OBJECTS) %.o: %.c $(CC) $(CFLAGS) -c $< +messages.c: messages.json codegen.py + python codegen.py body > $@ + +messages.h: messages.json codegen.py + python codegen.py header > $@ + clean: rm -f $(TARGET) rm -f $(OBJECTS) rm -rf *.dSYM - rm -f depend.mk + rm -f depend.mk messages.c messages.h rm -f test1 test3 test1.o test3.o test1_latency test3_latency test1_latency.o test3_latency.o depend.mk: + touch messages.h gcc $(CFLAGS) -M *.c > $@ + rm messages.h echo "depend.mk:" Makefile *.c >> $@ -include depend.mk diff --git a/server/codegen.py b/server/codegen.py new file mode 100644 index 0000000..0810bd1 --- /dev/null +++ b/server/codegen.py @@ -0,0 +1,95 @@ +from __future__ import with_statement + +# Copyright (C) 2010, 2011 Tony Garnock-Jones. All rights reserved. +copyright_stmt = '/* Copyright (C) 2010, 2011 Tony Garnock-Jones. All rights reserved. */' + +import sys +import json + +def cify(s): + s = s.replace('-', '_') + s = s.replace(' ', '_') + return s + +class MessageType: + def __init__(self, j): + self.wire_selector = j['selector'] + self.selector = cify(self.wire_selector) + self.wire_argnames = j['args'] + self.argnames = map(cify, self.wire_argnames) + + def format_args(self, template, separator = ', '): + return separator.join([template % (x,) for x in self.argnames]) + +with file("messages.json") as f: + spec = map(MessageType, json.load(f)) + +def entrypoint_header(): + print copyright_stmt + print + print '#ifndef cmsg_messages_h' + print '#define cmsg_messages_h' + print + print 'extern void init_messages(void);' + print + for t in spec: + print 'extern sexp_t *selector_%s;' % (t.selector,) + print + for t in spec: + print 'extern sexp_t *message_%s(%s);' % (t.selector, t.format_args('sexp_t *%s')) + print + print 'typedef union parsed_message_t_ {' + for t in spec: + if t.argnames: + print ' struct { sexp_t %s; } %s;' % (t.format_args('*%s'), t.selector) + print '} parsed_message_t;' + for t in spec: + print + print 'static inline int parse_%s(sexp_t *message, parsed_message_t *out) {' % \ + (t.selector,) + print ' if (!sexp_pairp(message)) return 0;' + print ' if (sexp_cmp(sexp_head(message), selector_%s) != 0) return 0;' % (t.selector,) + for n in t.argnames: + print ' if (!sexp_pseudo_pop(&message)) return 0;' + print ' out->%s.%s = sexp_head(message);' % (t.selector, n) + print ' return sexp_tail(message) == NULL;' + print '}' + print + print '#endif' + +def entrypoint_body(): + print copyright_stmt + print + print '#include ' + print '#include ' + print '#include ' + print '#include ' + print + print '#include ' + print + print '#include "cmsg_private.h"' + print '#include "ref.h"' + print '#include "sexp.h"' + print '#include "messages.h"' + print + for t in spec: + print 'sexp_t *selector_%s = NULL;' % (t.selector,) + print + print 'void init_messages(void) {' + for t in spec: + print ' selector_%s = sexp_cstring("%s");' % (t.selector, t.wire_selector) + for t in spec: + print ' INCREF(selector_%s);' % (t.selector,) + print '}' + for t in spec: + print + print 'sexp_t *message_%s(%s) {' % (t.selector, t.format_args('sexp_t *%s')) + print ' sexp_t *m = NULL;' + for n in reversed(t.argnames): + print ' m = sexp_cons(%s, m);' % (n,) + print ' return sexp_cons(selector_%s, m);' % (t.selector,) + print '}' + +if __name__ == '__main__': + drivername = sys.argv[1] + globals()['entrypoint_' + drivername]() diff --git a/server/direct.c b/server/direct.c index e6b0f6f..5bd0b21 100644 --- a/server/direct.c +++ b/server/direct.c @@ -17,7 +17,9 @@ #include "sexp.h" #include "hashtable.h" #include "node.h" +#include "messages.h" #include "subscription.h" +#include "sexpio.h" typedef struct direct_extension_t_ { sexp_t *name; @@ -67,47 +69,33 @@ static void route_message(direct_extension_t *d, sexp_t *rk, sexp_t *body) { static void direct_handle_message(node_t *n, sexp_t *m) { direct_extension_t *d = n->extension; + parsed_message_t p; - size_t msglen = sexp_length(m); - sexp_t *args; - cmsg_bytes_t selector; - - if (msglen == 0 || !sexp_stringp(sexp_head(m))) { - warn("Invalid message in direct\n"); - return; - } - - selector = sexp_data(sexp_head(m)); - args = sexp_tail(m); - - if ((msglen == 4) && !cmsg_bytes_cmp(selector, cmsg_cstring_bytes("post"))) { - sexp_t *rk = sexp_listref(args, 0); - if (sexp_stringp(rk)) { - route_message(d, rk, sexp_listref(args, 1)); + if (parse_post(m, &p)) { + if (sexp_stringp(p.post.name)) { + route_message(d, p.post.name, p.post.body); } else { warn("Non-string routing key in direct\n"); } return; } - if ((msglen == 6) && !cmsg_bytes_cmp(selector, cmsg_cstring_bytes("subscribe"))) { - subscription_t *sub = handle_subscribe_message(d->name, &d->subscriptions, args); + if (parse_subscribe(m, &p)) { + subscription_t *sub = handle_subscribe_message(d->name, &d->subscriptions, &p); if (sub != NULL) { - sexp_t *filter = sexp_listref(args, 0); - hashtable_get(&d->routing_table, sexp_data(filter), (void **) &sub->link); - hashtable_put(&d->routing_table, sexp_data(filter), sub); + hashtable_get(&d->routing_table, sexp_data(p.subscribe.filter), (void **) &sub->link); + hashtable_put(&d->routing_table, sexp_data(p.subscribe.filter), sub); } return; } - if ((msglen == 2) && !cmsg_bytes_cmp(selector, cmsg_cstring_bytes("unsubscribe"))) { - handle_unsubscribe_message(d->name, &d->subscriptions, args); + if (parse_unsubscribe(m, &p)) { + handle_unsubscribe_message(d->name, &d->subscriptions, &p); return; } - warn("Message not understood in direct; selector <<%.*s>>, length %u\n", - selector.len, selector.bytes, - msglen); + warn("Message not understood in direct: "); + sexp_writeln(stderr_h, m); } static node_class_t direct_class = { diff --git a/server/fanout.c b/server/fanout.c index 5cf503b..47d3418 100644 --- a/server/fanout.c +++ b/server/fanout.c @@ -17,7 +17,9 @@ #include "sexp.h" #include "hashtable.h" #include "node.h" +#include "messages.h" #include "subscription.h" +#include "sexpio.h" typedef struct fanout_extension_t_ { sexp_t *name; @@ -60,40 +62,28 @@ static void send_to_sub(void *contextv, cmsg_bytes_t key, void *subv) { static void fanout_handle_message(node_t *n, sexp_t *m) { fanout_extension_t *f = n->extension; + parsed_message_t p; - size_t msglen = sexp_length(m); - sexp_t *args; - cmsg_bytes_t selector; - - if (msglen == 0 || !sexp_stringp(sexp_head(m))) { - warn("Invalid message in fanout\n"); - return; - } - - selector = sexp_data(sexp_head(m)); - args = sexp_tail(m); - - if ((msglen == 4) && !cmsg_bytes_cmp(selector, cmsg_cstring_bytes("post"))) { + if (parse_post(m, &p)) { struct delivery_context context; context.f = f; - context.body = sexp_listref(args, 1); + context.body = p.post.body; hashtable_foreach(&f->subscriptions, send_to_sub, &context); return; } - if ((msglen == 6) && !cmsg_bytes_cmp(selector, cmsg_cstring_bytes("subscribe"))) { - handle_subscribe_message(f->name, &f->subscriptions, args); + if (parse_subscribe(m, &p)) { + handle_subscribe_message(f->name, &f->subscriptions, &p); return; } - if ((msglen == 2) && !cmsg_bytes_cmp(selector, cmsg_cstring_bytes("unsubscribe"))) { - handle_unsubscribe_message(f->name, &f->subscriptions, args); + if (parse_unsubscribe(m, &p)) { + handle_unsubscribe_message(f->name, &f->subscriptions, &p); return; } - warn("Message not understood in fanout; selector <<%.*s>>, length %u\n", - selector.len, selector.bytes, - msglen); + warn("Message not understood in fanout: "); + sexp_writeln(stderr_h, m); } static node_class_t fanout_class = { diff --git a/server/main.c b/server/main.c index e04d8c2..5bae2b2 100644 --- a/server/main.c +++ b/server/main.c @@ -24,49 +24,41 @@ typedef unsigned char u_char; #include "direct.h" #include "fanout.h" #include "meta.h" +#include "messages.h" +#include "sexpio.h" #define WANT_CONSOLE_LISTENER 1 static void factory_handle_message(node_t *n, sexp_t *m) { - size_t msglen = sexp_length(m); - sexp_t *args; - cmsg_bytes_t selector; + parsed_message_t p; - if (msglen == 0 || !sexp_stringp(sexp_head(m))) { - warn("Invalid message in factory\n"); - return; - } - - selector = sexp_data(sexp_head(m)); - args = sexp_tail(m); - - if ((msglen == 5) && !cmsg_bytes_cmp(selector, cmsg_cstring_bytes("create"))) { - sexp_t *classname = sexp_listref(args, 0); - sexp_t *ctor_arg = sexp_listref(args, 1); - sexp_t *reply_sink = sexp_listref(args, 2); - sexp_t *reply_name = sexp_listref(args, 3); - if (sexp_stringp(classname) && sexp_stringp(reply_sink) && sexp_stringp(reply_name)) { - cmsg_bytes_t classname_bytes = sexp_data(classname); + if (parse_create(m, &p)) { + if (sexp_stringp(p.create.classname) + && sexp_stringp(p.create.reply_sink) + && sexp_stringp(p.create.reply_name)) { + cmsg_bytes_t classname_bytes = sexp_data(p.create.classname); node_class_t *nc = lookup_node_class(classname_bytes); if (nc == NULL) { warn("Node class not found <<%.*s>>\n", classname_bytes.len, classname_bytes.bytes); } else { sexp_t *error = NULL; sexp_t *reply; - if (new_node(nc, ctor_arg, &error) != NULL) { - reply = sexp_cons(sexp_cstring("create-ok"), NULL); + if (new_node(nc, p.create.arg, &error) != NULL) { + reply = message_create_ok(); } else { - reply = sexp_cons(sexp_cstring("create-failed"), sexp_cons(error, NULL)); + reply = message_create_failed(error); } - post_node(sexp_data(reply_sink), sexp_data(reply_name), reply, sexp_empty_bytes); + post_node(sexp_data(p.create.reply_sink), + sexp_data(p.create.reply_name), + reply, + sexp_empty_bytes); } } return; } - warn("Message not understood in factory; selector <<%.*s>>, length %u\n", - selector.len, selector.bytes, - msglen); + warn("Message not understood in factory: "); + sexp_writeln(stderr_h, m); } static node_class_t factory_class = { @@ -99,6 +91,7 @@ int main(int argc, char *argv[]) { signal(SIGPIPE, SIG_IGN); /* avoid EPIPE when connections drop unexpectedly */ info("Using libevent version %s\n", event_get_version()); init_sexp(); + init_messages(); init_node(cmsg_cstring_bytes("server")); init_factory(); init_queue(); diff --git a/server/messages.json b/server/messages.json new file mode 100644 index 0000000..163aab7 --- /dev/null +++ b/server/messages.json @@ -0,0 +1,42 @@ +[ + { + "selector": "create", + "args": ["classname", "arg", "reply-sink", "reply-name"] + }, + { + "selector": "create-ok", + "args": [] + }, + { + "selector": "create-failed", + "args": ["reason"] + }, + { + "selector": "subscribed", + "args": ["source", "filter", "sink", "name"] + }, + { + "selector": "unsubscribed", + "args": ["source", "filter", "sink", "name"] + }, + { + "selector": "post", + "args": ["name", "body", "token"] + }, + { + "selector": "subscribe", + "args": ["filter", "sink", "name", "reply_sink", "reply_name"] + }, + { + "selector": "subscribe-ok", + "args": ["token"] + }, + { + "selector": "unsubscribe", + "args": ["token"] + }, + { + "selector": "error", + "args": ["message", "details"] + } +] diff --git a/server/meta.c b/server/meta.c index 17074f1..bdd6a32 100644 --- a/server/meta.c +++ b/server/meta.c @@ -15,13 +15,11 @@ #include "hashtable.h" #include "node.h" #include "meta.h" - -static sexp_t *meta_sym = NULL; +#include "messages.h" void init_meta(void) { sexp_t *args; - meta_sym = INCREF(sexp_cstring("meta")); - args = INCREF(sexp_cons(meta_sym, NULL)); + args = INCREF(sexp_cons(sexp_cstring("meta"), NULL)); new_node(lookup_node_class(cmsg_cstring_bytes("direct")), args, NULL); DECREF(args, sexp_destructor); } @@ -32,13 +30,10 @@ void announce_subscription(sexp_t *source, sexp_t *name, int onoff) { - if (meta_sym != NULL) { /* use this as a proxy for whether meta has been initialized or not */ - sexp_t *msg = NULL; - msg = sexp_cons(name, msg); - msg = sexp_cons(sink, msg); - msg = sexp_cons(filter, msg); - msg = sexp_cons(source, msg); - msg = sexp_cons(sexp_cstring(onoff ? "subscribed" : "unsubscribed"), msg); - post_node(sexp_data(meta_sym), sexp_data(source), msg, NULL); - } + post_node(cmsg_cstring_bytes("meta"), + sexp_data(source), + onoff + ? message_subscribed(source, filter, sink, name) + : message_unsubscribed(source, filter, sink, name), + NULL); } diff --git a/server/node.c b/server/node.c index 25da8fe..427e95a 100644 --- a/server/node.c +++ b/server/node.c @@ -17,6 +17,7 @@ #include "hashtable.h" #include "node.h" #include "meta.h" +#include "messages.h" static cmsg_bytes_t _container_name; static hashtable_t node_class_table; @@ -157,18 +158,7 @@ void unbind_all_names_for_node(node_t *n) { } int post_node(cmsg_bytes_t node, cmsg_bytes_t name, sexp_t *body, sexp_t *token) { - static sexp_t *post_atom = NULL; - sexp_t *msg = NULL; - - if (post_atom == NULL) { - post_atom = INCREF(sexp_cstring("post")); - } - - msg = sexp_cons(token, msg); - msg = sexp_cons(body, msg); - msg = sexp_cons(sexp_bytes(name), msg); - msg = sexp_cons(post_atom, msg); - return send_node_release(node, msg); + return send_node_release(node, message_post(sexp_bytes(name), body, token)); } int send_node(cmsg_bytes_t node, sexp_t *message) { diff --git a/server/queue.c b/server/queue.c index 4ec5c73..6ac9488 100644 --- a/server/queue.c +++ b/server/queue.c @@ -20,6 +20,7 @@ #include "node.h" #include "queue.h" #include "dataq.h" +#include "messages.h" #include "subscription.h" typedef struct queue_extension_t_ { @@ -156,27 +157,16 @@ static void throck_shovel(queue_extension_t *q) { static void queue_handle_message(node_t *n, sexp_t *m) { queue_extension_t *q = n->extension; + parsed_message_t p; - size_t msglen = sexp_length(m); - sexp_t *args; - cmsg_bytes_t selector; - - if (msglen == 0 || !sexp_stringp(sexp_head(m))) { - warn("Invalid message in queue\n"); - return; - } - - selector = sexp_data(sexp_head(m)); - args = sexp_tail(m); - - if ((msglen == 4) && !cmsg_bytes_cmp(selector, cmsg_cstring_bytes("post"))) { - sexp_enqueue(q->backlog_q, sexp_listref(args, 1)); + if (parse_post(m, &p)) { + sexp_enqueue(q->backlog_q, p.post.body); throck_shovel(q); return; } - if ((msglen == 6) && !cmsg_bytes_cmp(selector, cmsg_cstring_bytes("subscribe"))) { - subscription_t *sub = handle_subscribe_message(q->name, &q->subscriptions, args); + if (parse_subscribe(m, &p)) { + subscription_t *sub = handle_subscribe_message(q->name, &q->subscriptions, &p); if (sub != NULL) { enqueue(&q->waiter_q, sub); throck_shovel(q); @@ -184,14 +174,13 @@ static void queue_handle_message(node_t *n, sexp_t *m) { return; } - if ((msglen == 2) && !cmsg_bytes_cmp(selector, cmsg_cstring_bytes("unsubscribe"))) { - handle_unsubscribe_message(q->name, &q->subscriptions, args); + if (parse_unsubscribe(m, &p)) { + handle_unsubscribe_message(q->name, &q->subscriptions, &p); return; } - warn("Message not understood in queue; selector <<%.*s>>, length %u\n", - selector.len, selector.bytes, - msglen); + warn("Message not understood in queue: "); + sexp_writeln(stderr_h, m); } static node_class_t queue_class = { diff --git a/server/relay.c b/server/relay.c index b5f73b4..35c6469 100644 --- a/server/relay.c +++ b/server/relay.c @@ -30,6 +30,7 @@ typedef unsigned char u_char; #include "sexpio.h" #include "hashtable.h" #include "node.h" +#include "messages.h" #define WANT_MESSAGE_TRACE 0 @@ -78,12 +79,11 @@ static node_class_t relay_class = { .handle_message = relay_handle_message }; -static void send_error(IOHandle *h, char const *message, sexp_t *extra) { - sexp_t *m = extra; - m = sexp_cons(sexp_cstring(message), m); - m = sexp_cons(sexp_cstring("error"), m); +static void send_error(IOHandle *h, char const *message, sexp_t *details) { + sexp_t *m = message_error(sexp_cstring(message), details); INCREF(m); - + warn("Sending error: "); + sexp_writeln(stderr_h, m); iohandle_clear_error(h); BCHECK(!sexp_write(h, m), "send_error sexp_write"); DECREF(m, sexp_destructor); @@ -92,13 +92,14 @@ static void send_error(IOHandle *h, char const *message, sexp_t *extra) { static void send_sexp_syntax_error(IOHandle *h, char const *message) { char const *url = "http://people.csail.mit.edu/rivest/Sexp.txt"; - send_error(h, message, sexp_cons(sexp_cstring(url), NULL)); + send_error(h, message, sexp_cstring(url)); } static void relay_main(node_t *n) { relay_extension_t *r = n->extension; IOHandle *inh = new_iohandle(r->fd); sexp_t *message = NULL; /* held */ + parsed_message_t p; INCREF(n); /* because the caller doesn't hold a ref, and we need to drop ours on our death */ @@ -109,13 +110,9 @@ static void relay_main(node_t *n) { ICHECK(iohandle_flush(r->outh), "iohandle_flush greeting"); { - sexp_t *s = NULL; - s = sexp_cons(sexp_empty_bytes, s); - s = sexp_cons(sexp_empty_bytes, s); - s = sexp_cons(sexp_empty_bytes, s); - s = sexp_cons(sexp_empty_bytes, s); - s = sexp_cons(sexp_bytes(local_container_name()), s); - s = sexp_cons(sexp_cstring("subscribe"), s); + sexp_t *s = message_subscribe(sexp_bytes(local_container_name()), + sexp_empty_bytes, sexp_empty_bytes, + sexp_empty_bytes, sexp_empty_bytes); INCREF(s); sexp_write(r->outh, s); DECREF(s, sexp_destructor); @@ -134,54 +131,36 @@ static void relay_main(node_t *n) { sexp_writeln(stderr_h, message); #endif - if (!(sexp_pairp(message) && sexp_stringp(sexp_head(message)))) { - info("Ill-formed message\n"); - send_error(r->outh, "ill-formed message", NULL); - goto protocol_error; - } - - { - cmsg_bytes_t selector = sexp_data(sexp_head(message)); - sexp_t *args = sexp_tail(message); - size_t msglen = sexp_length(message); - - /* TODO: have constant atoms for post, subscribe, unsubscribe - etc (see also post_node() and the handler in queue.c etc) */ - - if (!cmsg_bytes_cmp(selector, cmsg_cstring_bytes("post")) && (msglen == 4) - && sexp_stringp(sexp_head(args))) { - cmsg_bytes_t nodename = sexp_data(sexp_head(args)); - if (!send_node(nodename, sexp_head(sexp_tail(args)))) { - warn("Was asked to post to unknown node <<%.*s>>\n", nodename.len, nodename.bytes); - } - } else if (!cmsg_bytes_cmp(selector, cmsg_cstring_bytes("subscribe")) && (msglen == 6) - && sexp_stringp(sexp_head(args)) - && sexp_stringp(sexp_head(sexp_tail(sexp_tail(sexp_tail(args))))) - && sexp_stringp(sexp_head(sexp_tail(sexp_tail(sexp_tail(sexp_tail(args))))))) { - sexp_t *filter_sexp = sexp_head(args); - cmsg_bytes_t filter = sexp_data(filter_sexp); - sexp_t *reply_sink = sexp_listref(args, 3); - sexp_t *reply_name = sexp_listref(args, 4); - if (bind_node(filter, n)) { - sexp_t *subok = sexp_cons(sexp_cstring("subscribe-ok"), sexp_cons(filter_sexp, NULL)); - post_node(sexp_data(reply_sink), sexp_data(reply_name), subok, sexp_empty_bytes); - - DECREF(r->remote_container_name, sexp_destructor); - r->remote_container_name = INCREF(filter_sexp); - } else { - warn("Bind failed <<%.*s>>\n", filter.len, filter.bytes); - } - } else if (!cmsg_bytes_cmp(selector, cmsg_cstring_bytes("unsubscribe")) && (msglen == 2) - && sexp_stringp(sexp_head(args))) { - sexp_t *id_sexp = sexp_head(args); - cmsg_bytes_t id = sexp_data(id_sexp); - if (!unbind_node(id)) { - warn("Unbind failed <<%.*s>>\n", id.len, id.bytes); - } - } else { - send_error(r->outh, "message not understood", sexp_cons(message, NULL)); - goto protocol_error; + if (parse_post(message, &p) && sexp_stringp(p.post.name)) { + cmsg_bytes_t nodename = sexp_data(p.post.name); + if (!send_node(nodename, p.post.body)) { + warn("Was asked to post to unknown node <<%.*s>>\n", nodename.len, nodename.bytes); } + } else if (parse_subscribe(message, &p) + && sexp_stringp(p.subscribe.filter) + && sexp_stringp(p.subscribe.reply_sink) + && sexp_stringp(p.subscribe.reply_name)) { + if (bind_node(sexp_data(p.subscribe.filter), n)) { + post_node(sexp_data(p.subscribe.reply_sink), + sexp_data(p.subscribe.reply_name), + message_subscribe_ok(p.subscribe.filter), + sexp_empty_bytes); + + DECREF(r->remote_container_name, sexp_destructor); + r->remote_container_name = INCREF(p.subscribe.filter); + } else { + cmsg_bytes_t filter = sexp_data(p.subscribe.filter); + warn("Bind failed <<%.*s>>\n", filter.len, filter.bytes); + } + } else if (parse_unsubscribe(message, &p) + && sexp_stringp(p.unsubscribe.token)) { + cmsg_bytes_t id = sexp_data(p.unsubscribe.token); + if (!unbind_node(id)) { + warn("Unbind failed <<%.*s>>\n", id.len, id.bytes); + } + } else { + send_error(r->outh, "message not understood", message); + goto protocol_error; } } diff --git a/server/sexp.h b/server/sexp.h index 2c9058f..efacc97 100644 --- a/server/sexp.h +++ b/server/sexp.h @@ -118,6 +118,11 @@ static inline sexp_t *sexp_pop(sexp_t *oldstack, sexp_t **valp) { return nextstack; } +static inline int sexp_pseudo_pop(sexp_t **px) { + *px = sexp_tail(*px); + return sexp_pairp(*px); +} + static inline sexp_t *sexp_listtail(sexp_t *list, size_t dropcount) { while (dropcount) { list = sexp_tail(list); diff --git a/server/subscription.c b/server/subscription.c index 6d42b71..e9032fd 100644 --- a/server/subscription.c +++ b/server/subscription.c @@ -15,9 +15,10 @@ #include "ref.h" #include "sexp.h" #include "hashtable.h" -#include "subscription.h" #include "node.h" #include "meta.h" +#include "messages.h" +#include "subscription.h" void free_subscription(subscription_t *sub) { DECREF(sub->uuid, sexp_destructor); @@ -79,25 +80,23 @@ subscription_t *send_to_subscription_chain(sexp_t *source, subscription_t *handle_subscribe_message(sexp_t *source, hashtable_t *subscriptions, - sexp_t *args) + parsed_message_t *p) { unsigned char uuid[CMSG_UUID_BUF_SIZE]; if (gen_uuid(uuid) != 0) { warn("Could not generate UUID\n"); return NULL; } else { - sexp_t *reply_sink = sexp_listref(args, 3); - sexp_t *reply_name = sexp_listref(args, 4); subscription_t *sub = malloc(sizeof(*sub)); sub->uuid = INCREF(sexp_bytes(CMSG_BYTES(sizeof(uuid), uuid))); - sub->filter = sexp_listref(args, 0); - sub->sink = sexp_listref(args, 1); - sub->name = sexp_listref(args, 2); + sub->filter = p->subscribe.filter; + sub->sink = p->subscribe.sink; + sub->name = p->subscribe.name; sub->link = NULL; if (!sexp_stringp(sub->filter) || !sexp_stringp(sub->sink) || !sexp_stringp(sub->name) - || !sexp_stringp(reply_sink) || !sexp_stringp(reply_name)) { + || !sexp_stringp(p->subscribe.reply_sink) || !sexp_stringp(p->subscribe.reply_name)) { DECREF(sub->uuid, sexp_destructor); free(sub); warn("Bad sink/name/reply_sink/reply_name in subscribe"); @@ -112,10 +111,10 @@ subscription_t *handle_subscribe_message(sexp_t *source, announce_subscription(source, sub->filter, sub->sink, sub->name, 1); - { - sexp_t *subok = sexp_cons(sexp_cstring("subscribe-ok"), sexp_cons(sub->uuid, NULL)); - post_node(sexp_data(reply_sink), sexp_data(reply_name), subok, sexp_empty_bytes); - } + post_node(sexp_data(p->subscribe.reply_sink), + sexp_data(p->subscribe.reply_name), + message_subscribe_ok(sub->uuid), + sexp_empty_bytes); return sub; } @@ -123,17 +122,17 @@ subscription_t *handle_subscribe_message(sexp_t *source, void handle_unsubscribe_message(sexp_t *source, hashtable_t *subscriptions, - sexp_t *args) + parsed_message_t *p) { cmsg_bytes_t uuid; subscription_t *sub; - if (!sexp_stringp(sexp_head(args))) { + if (!sexp_stringp(p->unsubscribe.token)) { warn("Invalid unsubscription\n"); return; } - uuid = sexp_data(sexp_head(args)); + uuid = sexp_data(p->unsubscribe.token); if (hashtable_get(subscriptions, uuid, (void **) &sub)) { /* TODO: clean up more eagerly perhaps? */ announce_subscription(source, sub->filter, sub->sink, sub->name, 0); diff --git a/server/subscription.h b/server/subscription.h index b514208..273d1e9 100644 --- a/server/subscription.h +++ b/server/subscription.h @@ -25,10 +25,10 @@ extern subscription_t *send_to_subscription_chain(sexp_t *source, extern subscription_t *handle_subscribe_message(sexp_t *source, hashtable_t *subscriptions, - sexp_t *args); + parsed_message_t *p); extern void handle_unsubscribe_message(sexp_t *source, hashtable_t *subscriptions, - sexp_t *args); + parsed_message_t *p); #endif