Use generated message-matching code

This commit is contained in:
Tony Garnock-Jones 2011-01-09 19:42:36 -05:00
parent 4ab09181c3
commit 5913f2008a
14 changed files with 272 additions and 197 deletions

2
.gitignore vendored
View File

@ -1,5 +1,7 @@
scratch/ scratch/
*.o *.o
server/messages.h
server/messages.c
server/cmsg server/cmsg
server/test1 server/test1
server/test3 server/test3

View File

@ -1,6 +1,6 @@
TARGET = cmsg TARGET = cmsg
OBJECTS = main.o harness.o net.o util.o relay.o hashtable.o dataq.o sexp.o sexpio.o node.o \ OBJECTS = main.o harness.o net.o util.o relay.o hashtable.o dataq.o sexp.o sexpio.o node.o \
queue.o direct.o fanout.o subscription.o meta.o queue.o direct.o fanout.o subscription.o meta.o messages.o
UUID_CFLAGS:=$(shell uuid-config --cflags) UUID_CFLAGS:=$(shell uuid-config --cflags)
UUID_LDFLAGS:=$(shell uuid-config --ldflags) UUID_LDFLAGS:=$(shell uuid-config --ldflags)
@ -25,14 +25,22 @@ $(TARGET): $(OBJECTS)
%.o: %.c %.o: %.c
$(CC) $(CFLAGS) -c $< $(CC) $(CFLAGS) -c $<
messages.c: messages.json codegen.py
python codegen.py body > $@
messages.h: messages.json codegen.py
python codegen.py header > $@
clean: clean:
rm -f $(TARGET) rm -f $(TARGET)
rm -f $(OBJECTS) rm -f $(OBJECTS)
rm -rf *.dSYM rm -rf *.dSYM
rm -f depend.mk rm -f depend.mk messages.c messages.h
rm -f test1 test3 test1.o test3.o test1_latency test3_latency test1_latency.o test3_latency.o rm -f test1 test3 test1.o test3.o test1_latency test3_latency test1_latency.o test3_latency.o
depend.mk: depend.mk:
touch messages.h
gcc $(CFLAGS) -M *.c > $@ gcc $(CFLAGS) -M *.c > $@
rm messages.h
echo "depend.mk:" Makefile *.c >> $@ echo "depend.mk:" Makefile *.c >> $@
-include depend.mk -include depend.mk

95
server/codegen.py Normal file
View File

@ -0,0 +1,95 @@
from __future__ import with_statement
# Copyright (C) 2010, 2011 Tony Garnock-Jones. All rights reserved.
copyright_stmt = '/* Copyright (C) 2010, 2011 Tony Garnock-Jones. All rights reserved. */'
import sys
import json
def cify(s):
s = s.replace('-', '_')
s = s.replace(' ', '_')
return s
class MessageType:
def __init__(self, j):
self.wire_selector = j['selector']
self.selector = cify(self.wire_selector)
self.wire_argnames = j['args']
self.argnames = map(cify, self.wire_argnames)
def format_args(self, template, separator = ', '):
return separator.join([template % (x,) for x in self.argnames])
with file("messages.json") as f:
spec = map(MessageType, json.load(f))
def entrypoint_header():
print copyright_stmt
print
print '#ifndef cmsg_messages_h'
print '#define cmsg_messages_h'
print
print 'extern void init_messages(void);'
print
for t in spec:
print 'extern sexp_t *selector_%s;' % (t.selector,)
print
for t in spec:
print 'extern sexp_t *message_%s(%s);' % (t.selector, t.format_args('sexp_t *%s'))
print
print 'typedef union parsed_message_t_ {'
for t in spec:
if t.argnames:
print ' struct { sexp_t %s; } %s;' % (t.format_args('*%s'), t.selector)
print '} parsed_message_t;'
for t in spec:
print
print 'static inline int parse_%s(sexp_t *message, parsed_message_t *out) {' % \
(t.selector,)
print ' if (!sexp_pairp(message)) return 0;'
print ' if (sexp_cmp(sexp_head(message), selector_%s) != 0) return 0;' % (t.selector,)
for n in t.argnames:
print ' if (!sexp_pseudo_pop(&message)) return 0;'
print ' out->%s.%s = sexp_head(message);' % (t.selector, n)
print ' return sexp_tail(message) == NULL;'
print '}'
print
print '#endif'
def entrypoint_body():
print copyright_stmt
print
print '#include <stdlib.h>'
print '#include <string.h>'
print '#include <stdio.h>'
print '#include <signal.h>'
print
print '#include <assert.h>'
print
print '#include "cmsg_private.h"'
print '#include "ref.h"'
print '#include "sexp.h"'
print '#include "messages.h"'
print
for t in spec:
print 'sexp_t *selector_%s = NULL;' % (t.selector,)
print
print 'void init_messages(void) {'
for t in spec:
print ' selector_%s = sexp_cstring("%s");' % (t.selector, t.wire_selector)
for t in spec:
print ' INCREF(selector_%s);' % (t.selector,)
print '}'
for t in spec:
print
print 'sexp_t *message_%s(%s) {' % (t.selector, t.format_args('sexp_t *%s'))
print ' sexp_t *m = NULL;'
for n in reversed(t.argnames):
print ' m = sexp_cons(%s, m);' % (n,)
print ' return sexp_cons(selector_%s, m);' % (t.selector,)
print '}'
if __name__ == '__main__':
drivername = sys.argv[1]
globals()['entrypoint_' + drivername]()

View File

@ -17,7 +17,9 @@
#include "sexp.h" #include "sexp.h"
#include "hashtable.h" #include "hashtable.h"
#include "node.h" #include "node.h"
#include "messages.h"
#include "subscription.h" #include "subscription.h"
#include "sexpio.h"
typedef struct direct_extension_t_ { typedef struct direct_extension_t_ {
sexp_t *name; sexp_t *name;
@ -67,47 +69,33 @@ static void route_message(direct_extension_t *d, sexp_t *rk, sexp_t *body) {
static void direct_handle_message(node_t *n, sexp_t *m) { static void direct_handle_message(node_t *n, sexp_t *m) {
direct_extension_t *d = n->extension; direct_extension_t *d = n->extension;
parsed_message_t p;
size_t msglen = sexp_length(m); if (parse_post(m, &p)) {
sexp_t *args; if (sexp_stringp(p.post.name)) {
cmsg_bytes_t selector; route_message(d, p.post.name, p.post.body);
if (msglen == 0 || !sexp_stringp(sexp_head(m))) {
warn("Invalid message in direct\n");
return;
}
selector = sexp_data(sexp_head(m));
args = sexp_tail(m);
if ((msglen == 4) && !cmsg_bytes_cmp(selector, cmsg_cstring_bytes("post"))) {
sexp_t *rk = sexp_listref(args, 0);
if (sexp_stringp(rk)) {
route_message(d, rk, sexp_listref(args, 1));
} else { } else {
warn("Non-string routing key in direct\n"); warn("Non-string routing key in direct\n");
} }
return; return;
} }
if ((msglen == 6) && !cmsg_bytes_cmp(selector, cmsg_cstring_bytes("subscribe"))) { if (parse_subscribe(m, &p)) {
subscription_t *sub = handle_subscribe_message(d->name, &d->subscriptions, args); subscription_t *sub = handle_subscribe_message(d->name, &d->subscriptions, &p);
if (sub != NULL) { if (sub != NULL) {
sexp_t *filter = sexp_listref(args, 0); hashtable_get(&d->routing_table, sexp_data(p.subscribe.filter), (void **) &sub->link);
hashtable_get(&d->routing_table, sexp_data(filter), (void **) &sub->link); hashtable_put(&d->routing_table, sexp_data(p.subscribe.filter), sub);
hashtable_put(&d->routing_table, sexp_data(filter), sub);
} }
return; return;
} }
if ((msglen == 2) && !cmsg_bytes_cmp(selector, cmsg_cstring_bytes("unsubscribe"))) { if (parse_unsubscribe(m, &p)) {
handle_unsubscribe_message(d->name, &d->subscriptions, args); handle_unsubscribe_message(d->name, &d->subscriptions, &p);
return; return;
} }
warn("Message not understood in direct; selector <<%.*s>>, length %u\n", warn("Message not understood in direct: ");
selector.len, selector.bytes, sexp_writeln(stderr_h, m);
msglen);
} }
static node_class_t direct_class = { static node_class_t direct_class = {

View File

@ -17,7 +17,9 @@
#include "sexp.h" #include "sexp.h"
#include "hashtable.h" #include "hashtable.h"
#include "node.h" #include "node.h"
#include "messages.h"
#include "subscription.h" #include "subscription.h"
#include "sexpio.h"
typedef struct fanout_extension_t_ { typedef struct fanout_extension_t_ {
sexp_t *name; sexp_t *name;
@ -60,40 +62,28 @@ static void send_to_sub(void *contextv, cmsg_bytes_t key, void *subv) {
static void fanout_handle_message(node_t *n, sexp_t *m) { static void fanout_handle_message(node_t *n, sexp_t *m) {
fanout_extension_t *f = n->extension; fanout_extension_t *f = n->extension;
parsed_message_t p;
size_t msglen = sexp_length(m); if (parse_post(m, &p)) {
sexp_t *args;
cmsg_bytes_t selector;
if (msglen == 0 || !sexp_stringp(sexp_head(m))) {
warn("Invalid message in fanout\n");
return;
}
selector = sexp_data(sexp_head(m));
args = sexp_tail(m);
if ((msglen == 4) && !cmsg_bytes_cmp(selector, cmsg_cstring_bytes("post"))) {
struct delivery_context context; struct delivery_context context;
context.f = f; context.f = f;
context.body = sexp_listref(args, 1); context.body = p.post.body;
hashtable_foreach(&f->subscriptions, send_to_sub, &context); hashtable_foreach(&f->subscriptions, send_to_sub, &context);
return; return;
} }
if ((msglen == 6) && !cmsg_bytes_cmp(selector, cmsg_cstring_bytes("subscribe"))) { if (parse_subscribe(m, &p)) {
handle_subscribe_message(f->name, &f->subscriptions, args); handle_subscribe_message(f->name, &f->subscriptions, &p);
return; return;
} }
if ((msglen == 2) && !cmsg_bytes_cmp(selector, cmsg_cstring_bytes("unsubscribe"))) { if (parse_unsubscribe(m, &p)) {
handle_unsubscribe_message(f->name, &f->subscriptions, args); handle_unsubscribe_message(f->name, &f->subscriptions, &p);
return; return;
} }
warn("Message not understood in fanout; selector <<%.*s>>, length %u\n", warn("Message not understood in fanout: ");
selector.len, selector.bytes, sexp_writeln(stderr_h, m);
msglen);
} }
static node_class_t fanout_class = { static node_class_t fanout_class = {

View File

@ -24,49 +24,41 @@ typedef unsigned char u_char;
#include "direct.h" #include "direct.h"
#include "fanout.h" #include "fanout.h"
#include "meta.h" #include "meta.h"
#include "messages.h"
#include "sexpio.h"
#define WANT_CONSOLE_LISTENER 1 #define WANT_CONSOLE_LISTENER 1
static void factory_handle_message(node_t *n, sexp_t *m) { static void factory_handle_message(node_t *n, sexp_t *m) {
size_t msglen = sexp_length(m); parsed_message_t p;
sexp_t *args;
cmsg_bytes_t selector;
if (msglen == 0 || !sexp_stringp(sexp_head(m))) { if (parse_create(m, &p)) {
warn("Invalid message in factory\n"); if (sexp_stringp(p.create.classname)
return; && sexp_stringp(p.create.reply_sink)
} && sexp_stringp(p.create.reply_name)) {
cmsg_bytes_t classname_bytes = sexp_data(p.create.classname);
selector = sexp_data(sexp_head(m));
args = sexp_tail(m);
if ((msglen == 5) && !cmsg_bytes_cmp(selector, cmsg_cstring_bytes("create"))) {
sexp_t *classname = sexp_listref(args, 0);
sexp_t *ctor_arg = sexp_listref(args, 1);
sexp_t *reply_sink = sexp_listref(args, 2);
sexp_t *reply_name = sexp_listref(args, 3);
if (sexp_stringp(classname) && sexp_stringp(reply_sink) && sexp_stringp(reply_name)) {
cmsg_bytes_t classname_bytes = sexp_data(classname);
node_class_t *nc = lookup_node_class(classname_bytes); node_class_t *nc = lookup_node_class(classname_bytes);
if (nc == NULL) { if (nc == NULL) {
warn("Node class not found <<%.*s>>\n", classname_bytes.len, classname_bytes.bytes); warn("Node class not found <<%.*s>>\n", classname_bytes.len, classname_bytes.bytes);
} else { } else {
sexp_t *error = NULL; sexp_t *error = NULL;
sexp_t *reply; sexp_t *reply;
if (new_node(nc, ctor_arg, &error) != NULL) { if (new_node(nc, p.create.arg, &error) != NULL) {
reply = sexp_cons(sexp_cstring("create-ok"), NULL); reply = message_create_ok();
} else { } else {
reply = sexp_cons(sexp_cstring("create-failed"), sexp_cons(error, NULL)); reply = message_create_failed(error);
} }
post_node(sexp_data(reply_sink), sexp_data(reply_name), reply, sexp_empty_bytes); post_node(sexp_data(p.create.reply_sink),
sexp_data(p.create.reply_name),
reply,
sexp_empty_bytes);
} }
} }
return; return;
} }
warn("Message not understood in factory; selector <<%.*s>>, length %u\n", warn("Message not understood in factory: ");
selector.len, selector.bytes, sexp_writeln(stderr_h, m);
msglen);
} }
static node_class_t factory_class = { static node_class_t factory_class = {
@ -99,6 +91,7 @@ int main(int argc, char *argv[]) {
signal(SIGPIPE, SIG_IGN); /* avoid EPIPE when connections drop unexpectedly */ signal(SIGPIPE, SIG_IGN); /* avoid EPIPE when connections drop unexpectedly */
info("Using libevent version %s\n", event_get_version()); info("Using libevent version %s\n", event_get_version());
init_sexp(); init_sexp();
init_messages();
init_node(cmsg_cstring_bytes("server")); init_node(cmsg_cstring_bytes("server"));
init_factory(); init_factory();
init_queue(); init_queue();

42
server/messages.json Normal file
View File

@ -0,0 +1,42 @@
[
{
"selector": "create",
"args": ["classname", "arg", "reply-sink", "reply-name"]
},
{
"selector": "create-ok",
"args": []
},
{
"selector": "create-failed",
"args": ["reason"]
},
{
"selector": "subscribed",
"args": ["source", "filter", "sink", "name"]
},
{
"selector": "unsubscribed",
"args": ["source", "filter", "sink", "name"]
},
{
"selector": "post",
"args": ["name", "body", "token"]
},
{
"selector": "subscribe",
"args": ["filter", "sink", "name", "reply_sink", "reply_name"]
},
{
"selector": "subscribe-ok",
"args": ["token"]
},
{
"selector": "unsubscribe",
"args": ["token"]
},
{
"selector": "error",
"args": ["message", "details"]
}
]

View File

@ -15,13 +15,11 @@
#include "hashtable.h" #include "hashtable.h"
#include "node.h" #include "node.h"
#include "meta.h" #include "meta.h"
#include "messages.h"
static sexp_t *meta_sym = NULL;
void init_meta(void) { void init_meta(void) {
sexp_t *args; sexp_t *args;
meta_sym = INCREF(sexp_cstring("meta")); args = INCREF(sexp_cons(sexp_cstring("meta"), NULL));
args = INCREF(sexp_cons(meta_sym, NULL));
new_node(lookup_node_class(cmsg_cstring_bytes("direct")), args, NULL); new_node(lookup_node_class(cmsg_cstring_bytes("direct")), args, NULL);
DECREF(args, sexp_destructor); DECREF(args, sexp_destructor);
} }
@ -32,13 +30,10 @@ void announce_subscription(sexp_t *source,
sexp_t *name, sexp_t *name,
int onoff) int onoff)
{ {
if (meta_sym != NULL) { /* use this as a proxy for whether meta has been initialized or not */ post_node(cmsg_cstring_bytes("meta"),
sexp_t *msg = NULL; sexp_data(source),
msg = sexp_cons(name, msg); onoff
msg = sexp_cons(sink, msg); ? message_subscribed(source, filter, sink, name)
msg = sexp_cons(filter, msg); : message_unsubscribed(source, filter, sink, name),
msg = sexp_cons(source, msg); NULL);
msg = sexp_cons(sexp_cstring(onoff ? "subscribed" : "unsubscribed"), msg);
post_node(sexp_data(meta_sym), sexp_data(source), msg, NULL);
}
} }

View File

@ -17,6 +17,7 @@
#include "hashtable.h" #include "hashtable.h"
#include "node.h" #include "node.h"
#include "meta.h" #include "meta.h"
#include "messages.h"
static cmsg_bytes_t _container_name; static cmsg_bytes_t _container_name;
static hashtable_t node_class_table; static hashtable_t node_class_table;
@ -157,18 +158,7 @@ void unbind_all_names_for_node(node_t *n) {
} }
int post_node(cmsg_bytes_t node, cmsg_bytes_t name, sexp_t *body, sexp_t *token) { int post_node(cmsg_bytes_t node, cmsg_bytes_t name, sexp_t *body, sexp_t *token) {
static sexp_t *post_atom = NULL; return send_node_release(node, message_post(sexp_bytes(name), body, token));
sexp_t *msg = NULL;
if (post_atom == NULL) {
post_atom = INCREF(sexp_cstring("post"));
}
msg = sexp_cons(token, msg);
msg = sexp_cons(body, msg);
msg = sexp_cons(sexp_bytes(name), msg);
msg = sexp_cons(post_atom, msg);
return send_node_release(node, msg);
} }
int send_node(cmsg_bytes_t node, sexp_t *message) { int send_node(cmsg_bytes_t node, sexp_t *message) {

View File

@ -20,6 +20,7 @@
#include "node.h" #include "node.h"
#include "queue.h" #include "queue.h"
#include "dataq.h" #include "dataq.h"
#include "messages.h"
#include "subscription.h" #include "subscription.h"
typedef struct queue_extension_t_ { typedef struct queue_extension_t_ {
@ -156,27 +157,16 @@ static void throck_shovel(queue_extension_t *q) {
static void queue_handle_message(node_t *n, sexp_t *m) { static void queue_handle_message(node_t *n, sexp_t *m) {
queue_extension_t *q = n->extension; queue_extension_t *q = n->extension;
parsed_message_t p;
size_t msglen = sexp_length(m); if (parse_post(m, &p)) {
sexp_t *args; sexp_enqueue(q->backlog_q, p.post.body);
cmsg_bytes_t selector;
if (msglen == 0 || !sexp_stringp(sexp_head(m))) {
warn("Invalid message in queue\n");
return;
}
selector = sexp_data(sexp_head(m));
args = sexp_tail(m);
if ((msglen == 4) && !cmsg_bytes_cmp(selector, cmsg_cstring_bytes("post"))) {
sexp_enqueue(q->backlog_q, sexp_listref(args, 1));
throck_shovel(q); throck_shovel(q);
return; return;
} }
if ((msglen == 6) && !cmsg_bytes_cmp(selector, cmsg_cstring_bytes("subscribe"))) { if (parse_subscribe(m, &p)) {
subscription_t *sub = handle_subscribe_message(q->name, &q->subscriptions, args); subscription_t *sub = handle_subscribe_message(q->name, &q->subscriptions, &p);
if (sub != NULL) { if (sub != NULL) {
enqueue(&q->waiter_q, sub); enqueue(&q->waiter_q, sub);
throck_shovel(q); throck_shovel(q);
@ -184,14 +174,13 @@ static void queue_handle_message(node_t *n, sexp_t *m) {
return; return;
} }
if ((msglen == 2) && !cmsg_bytes_cmp(selector, cmsg_cstring_bytes("unsubscribe"))) { if (parse_unsubscribe(m, &p)) {
handle_unsubscribe_message(q->name, &q->subscriptions, args); handle_unsubscribe_message(q->name, &q->subscriptions, &p);
return; return;
} }
warn("Message not understood in queue; selector <<%.*s>>, length %u\n", warn("Message not understood in queue: ");
selector.len, selector.bytes, sexp_writeln(stderr_h, m);
msglen);
} }
static node_class_t queue_class = { static node_class_t queue_class = {

View File

@ -30,6 +30,7 @@ typedef unsigned char u_char;
#include "sexpio.h" #include "sexpio.h"
#include "hashtable.h" #include "hashtable.h"
#include "node.h" #include "node.h"
#include "messages.h"
#define WANT_MESSAGE_TRACE 0 #define WANT_MESSAGE_TRACE 0
@ -78,12 +79,11 @@ static node_class_t relay_class = {
.handle_message = relay_handle_message .handle_message = relay_handle_message
}; };
static void send_error(IOHandle *h, char const *message, sexp_t *extra) { static void send_error(IOHandle *h, char const *message, sexp_t *details) {
sexp_t *m = extra; sexp_t *m = message_error(sexp_cstring(message), details);
m = sexp_cons(sexp_cstring(message), m);
m = sexp_cons(sexp_cstring("error"), m);
INCREF(m); INCREF(m);
warn("Sending error: ");
sexp_writeln(stderr_h, m);
iohandle_clear_error(h); iohandle_clear_error(h);
BCHECK(!sexp_write(h, m), "send_error sexp_write"); BCHECK(!sexp_write(h, m), "send_error sexp_write");
DECREF(m, sexp_destructor); DECREF(m, sexp_destructor);
@ -92,13 +92,14 @@ static void send_error(IOHandle *h, char const *message, sexp_t *extra) {
static void send_sexp_syntax_error(IOHandle *h, char const *message) { static void send_sexp_syntax_error(IOHandle *h, char const *message) {
char const *url = "http://people.csail.mit.edu/rivest/Sexp.txt"; char const *url = "http://people.csail.mit.edu/rivest/Sexp.txt";
send_error(h, message, sexp_cons(sexp_cstring(url), NULL)); send_error(h, message, sexp_cstring(url));
} }
static void relay_main(node_t *n) { static void relay_main(node_t *n) {
relay_extension_t *r = n->extension; relay_extension_t *r = n->extension;
IOHandle *inh = new_iohandle(r->fd); IOHandle *inh = new_iohandle(r->fd);
sexp_t *message = NULL; /* held */ sexp_t *message = NULL; /* held */
parsed_message_t p;
INCREF(n); /* because the caller doesn't hold a ref, and we need to INCREF(n); /* because the caller doesn't hold a ref, and we need to
drop ours on our death */ drop ours on our death */
@ -109,13 +110,9 @@ static void relay_main(node_t *n) {
ICHECK(iohandle_flush(r->outh), "iohandle_flush greeting"); ICHECK(iohandle_flush(r->outh), "iohandle_flush greeting");
{ {
sexp_t *s = NULL; sexp_t *s = message_subscribe(sexp_bytes(local_container_name()),
s = sexp_cons(sexp_empty_bytes, s); sexp_empty_bytes, sexp_empty_bytes,
s = sexp_cons(sexp_empty_bytes, s); sexp_empty_bytes, sexp_empty_bytes);
s = sexp_cons(sexp_empty_bytes, s);
s = sexp_cons(sexp_empty_bytes, s);
s = sexp_cons(sexp_bytes(local_container_name()), s);
s = sexp_cons(sexp_cstring("subscribe"), s);
INCREF(s); INCREF(s);
sexp_write(r->outh, s); sexp_write(r->outh, s);
DECREF(s, sexp_destructor); DECREF(s, sexp_destructor);
@ -134,54 +131,36 @@ static void relay_main(node_t *n) {
sexp_writeln(stderr_h, message); sexp_writeln(stderr_h, message);
#endif #endif
if (!(sexp_pairp(message) && sexp_stringp(sexp_head(message)))) { if (parse_post(message, &p) && sexp_stringp(p.post.name)) {
info("Ill-formed message\n"); cmsg_bytes_t nodename = sexp_data(p.post.name);
send_error(r->outh, "ill-formed message", NULL); if (!send_node(nodename, p.post.body)) {
goto protocol_error; warn("Was asked to post to unknown node <<%.*s>>\n", nodename.len, nodename.bytes);
}
{
cmsg_bytes_t selector = sexp_data(sexp_head(message));
sexp_t *args = sexp_tail(message);
size_t msglen = sexp_length(message);
/* TODO: have constant atoms for post, subscribe, unsubscribe
etc (see also post_node() and the handler in queue.c etc) */
if (!cmsg_bytes_cmp(selector, cmsg_cstring_bytes("post")) && (msglen == 4)
&& sexp_stringp(sexp_head(args))) {
cmsg_bytes_t nodename = sexp_data(sexp_head(args));
if (!send_node(nodename, sexp_head(sexp_tail(args)))) {
warn("Was asked to post to unknown node <<%.*s>>\n", nodename.len, nodename.bytes);
}
} else if (!cmsg_bytes_cmp(selector, cmsg_cstring_bytes("subscribe")) && (msglen == 6)
&& sexp_stringp(sexp_head(args))
&& sexp_stringp(sexp_head(sexp_tail(sexp_tail(sexp_tail(args)))))
&& sexp_stringp(sexp_head(sexp_tail(sexp_tail(sexp_tail(sexp_tail(args))))))) {
sexp_t *filter_sexp = sexp_head(args);
cmsg_bytes_t filter = sexp_data(filter_sexp);
sexp_t *reply_sink = sexp_listref(args, 3);
sexp_t *reply_name = sexp_listref(args, 4);
if (bind_node(filter, n)) {
sexp_t *subok = sexp_cons(sexp_cstring("subscribe-ok"), sexp_cons(filter_sexp, NULL));
post_node(sexp_data(reply_sink), sexp_data(reply_name), subok, sexp_empty_bytes);
DECREF(r->remote_container_name, sexp_destructor);
r->remote_container_name = INCREF(filter_sexp);
} else {
warn("Bind failed <<%.*s>>\n", filter.len, filter.bytes);
}
} else if (!cmsg_bytes_cmp(selector, cmsg_cstring_bytes("unsubscribe")) && (msglen == 2)
&& sexp_stringp(sexp_head(args))) {
sexp_t *id_sexp = sexp_head(args);
cmsg_bytes_t id = sexp_data(id_sexp);
if (!unbind_node(id)) {
warn("Unbind failed <<%.*s>>\n", id.len, id.bytes);
}
} else {
send_error(r->outh, "message not understood", sexp_cons(message, NULL));
goto protocol_error;
} }
} else if (parse_subscribe(message, &p)
&& sexp_stringp(p.subscribe.filter)
&& sexp_stringp(p.subscribe.reply_sink)
&& sexp_stringp(p.subscribe.reply_name)) {
if (bind_node(sexp_data(p.subscribe.filter), n)) {
post_node(sexp_data(p.subscribe.reply_sink),
sexp_data(p.subscribe.reply_name),
message_subscribe_ok(p.subscribe.filter),
sexp_empty_bytes);
DECREF(r->remote_container_name, sexp_destructor);
r->remote_container_name = INCREF(p.subscribe.filter);
} else {
cmsg_bytes_t filter = sexp_data(p.subscribe.filter);
warn("Bind failed <<%.*s>>\n", filter.len, filter.bytes);
}
} else if (parse_unsubscribe(message, &p)
&& sexp_stringp(p.unsubscribe.token)) {
cmsg_bytes_t id = sexp_data(p.unsubscribe.token);
if (!unbind_node(id)) {
warn("Unbind failed <<%.*s>>\n", id.len, id.bytes);
}
} else {
send_error(r->outh, "message not understood", message);
goto protocol_error;
} }
} }

View File

@ -118,6 +118,11 @@ static inline sexp_t *sexp_pop(sexp_t *oldstack, sexp_t **valp) {
return nextstack; return nextstack;
} }
static inline int sexp_pseudo_pop(sexp_t **px) {
*px = sexp_tail(*px);
return sexp_pairp(*px);
}
static inline sexp_t *sexp_listtail(sexp_t *list, size_t dropcount) { static inline sexp_t *sexp_listtail(sexp_t *list, size_t dropcount) {
while (dropcount) { while (dropcount) {
list = sexp_tail(list); list = sexp_tail(list);

View File

@ -15,9 +15,10 @@
#include "ref.h" #include "ref.h"
#include "sexp.h" #include "sexp.h"
#include "hashtable.h" #include "hashtable.h"
#include "subscription.h"
#include "node.h" #include "node.h"
#include "meta.h" #include "meta.h"
#include "messages.h"
#include "subscription.h"
void free_subscription(subscription_t *sub) { void free_subscription(subscription_t *sub) {
DECREF(sub->uuid, sexp_destructor); DECREF(sub->uuid, sexp_destructor);
@ -79,25 +80,23 @@ subscription_t *send_to_subscription_chain(sexp_t *source,
subscription_t *handle_subscribe_message(sexp_t *source, subscription_t *handle_subscribe_message(sexp_t *source,
hashtable_t *subscriptions, hashtable_t *subscriptions,
sexp_t *args) parsed_message_t *p)
{ {
unsigned char uuid[CMSG_UUID_BUF_SIZE]; unsigned char uuid[CMSG_UUID_BUF_SIZE];
if (gen_uuid(uuid) != 0) { if (gen_uuid(uuid) != 0) {
warn("Could not generate UUID\n"); warn("Could not generate UUID\n");
return NULL; return NULL;
} else { } else {
sexp_t *reply_sink = sexp_listref(args, 3);
sexp_t *reply_name = sexp_listref(args, 4);
subscription_t *sub = malloc(sizeof(*sub)); subscription_t *sub = malloc(sizeof(*sub));
sub->uuid = INCREF(sexp_bytes(CMSG_BYTES(sizeof(uuid), uuid))); sub->uuid = INCREF(sexp_bytes(CMSG_BYTES(sizeof(uuid), uuid)));
sub->filter = sexp_listref(args, 0); sub->filter = p->subscribe.filter;
sub->sink = sexp_listref(args, 1); sub->sink = p->subscribe.sink;
sub->name = sexp_listref(args, 2); sub->name = p->subscribe.name;
sub->link = NULL; sub->link = NULL;
if (!sexp_stringp(sub->filter) || !sexp_stringp(sub->sink) || !sexp_stringp(sub->name) if (!sexp_stringp(sub->filter) || !sexp_stringp(sub->sink) || !sexp_stringp(sub->name)
|| !sexp_stringp(reply_sink) || !sexp_stringp(reply_name)) { || !sexp_stringp(p->subscribe.reply_sink) || !sexp_stringp(p->subscribe.reply_name)) {
DECREF(sub->uuid, sexp_destructor); DECREF(sub->uuid, sexp_destructor);
free(sub); free(sub);
warn("Bad sink/name/reply_sink/reply_name in subscribe"); warn("Bad sink/name/reply_sink/reply_name in subscribe");
@ -112,10 +111,10 @@ subscription_t *handle_subscribe_message(sexp_t *source,
announce_subscription(source, sub->filter, sub->sink, sub->name, 1); announce_subscription(source, sub->filter, sub->sink, sub->name, 1);
{ post_node(sexp_data(p->subscribe.reply_sink),
sexp_t *subok = sexp_cons(sexp_cstring("subscribe-ok"), sexp_cons(sub->uuid, NULL)); sexp_data(p->subscribe.reply_name),
post_node(sexp_data(reply_sink), sexp_data(reply_name), subok, sexp_empty_bytes); message_subscribe_ok(sub->uuid),
} sexp_empty_bytes);
return sub; return sub;
} }
@ -123,17 +122,17 @@ subscription_t *handle_subscribe_message(sexp_t *source,
void handle_unsubscribe_message(sexp_t *source, void handle_unsubscribe_message(sexp_t *source,
hashtable_t *subscriptions, hashtable_t *subscriptions,
sexp_t *args) parsed_message_t *p)
{ {
cmsg_bytes_t uuid; cmsg_bytes_t uuid;
subscription_t *sub; subscription_t *sub;
if (!sexp_stringp(sexp_head(args))) { if (!sexp_stringp(p->unsubscribe.token)) {
warn("Invalid unsubscription\n"); warn("Invalid unsubscription\n");
return; return;
} }
uuid = sexp_data(sexp_head(args)); uuid = sexp_data(p->unsubscribe.token);
if (hashtable_get(subscriptions, uuid, (void **) &sub)) { if (hashtable_get(subscriptions, uuid, (void **) &sub)) {
/* TODO: clean up more eagerly perhaps? */ /* TODO: clean up more eagerly perhaps? */
announce_subscription(source, sub->filter, sub->sink, sub->name, 0); announce_subscription(source, sub->filter, sub->sink, sub->name, 0);

View File

@ -25,10 +25,10 @@ extern subscription_t *send_to_subscription_chain(sexp_t *source,
extern subscription_t *handle_subscribe_message(sexp_t *source, extern subscription_t *handle_subscribe_message(sexp_t *source,
hashtable_t *subscriptions, hashtable_t *subscriptions,
sexp_t *args); parsed_message_t *p);
extern void handle_unsubscribe_message(sexp_t *source, extern void handle_unsubscribe_message(sexp_t *source,
hashtable_t *subscriptions, hashtable_t *subscriptions,
sexp_t *args); parsed_message_t *p);
#endif #endif