First stab at "meta" exchange.

This commit is contained in:
Tony Garnock-Jones 2011-01-02 22:46:48 -05:00
parent b40930997c
commit a2aae0e938
12 changed files with 127 additions and 22 deletions

View File

@ -1,6 +1,6 @@
TARGET = cmsg
OBJECTS = main.o harness.o net.o util.o relay.o hashtable.o dataq.o sexp.o sexpio.o node.o \
queue.o direct.o fanout.o subscription.o
queue.o direct.o fanout.o subscription.o meta.o
UUID_CFLAGS:=$(shell uuid-config --cflags)
UUID_LDFLAGS:=$(shell uuid-config --ldflags)

7
TODO
View File

@ -4,3 +4,10 @@ collision, so choose a new token until there's no collision.
SAX-style sexp reader/writer, so that we can do something sensible for
enormous (e.g. gigabyte-sized) messages.
The "meta" exchange probably wants to be a topic exchange. Or
something.
The "meta" exchange probably wants to emit how-things-are-now messages
when people subscribe to it, to get them started; a kind of
last-value-cache type thing.

View File

@ -59,7 +59,7 @@ static void route_message(direct_extension_t *d, sexp_t *rk, sexp_t *body) {
subscription_t *chain = NULL;
subscription_t *newchain;
hashtable_get(&d->routing_table, sexp_data(rk), (void **) &chain);
newchain = send_to_subscription_chain(&d->subscriptions, chain, body);
newchain = send_to_subscription_chain(d->name, &d->subscriptions, chain, body);
if (newchain != chain) {
hashtable_put(&d->routing_table, sexp_data(rk), newchain);
}
@ -91,7 +91,7 @@ static void direct_handle_message(node_t *n, sexp_t *m) {
}
if ((msglen == 6) && !cmsg_bytes_cmp(selector, cmsg_cstring_bytes("subscribe"))) {
subscription_t *sub = handle_subscribe_message(&d->subscriptions, args);
subscription_t *sub = handle_subscribe_message(d->name, &d->subscriptions, args);
if (sub != NULL) {
sexp_t *filter = sexp_listref(args, 0);
hashtable_get(&d->routing_table, sexp_data(filter), (void **) &sub->link);
@ -101,7 +101,7 @@ static void direct_handle_message(node_t *n, sexp_t *m) {
}
if ((msglen == 2) && !cmsg_bytes_cmp(selector, cmsg_cstring_bytes("unsubscribe"))) {
handle_unsubscribe_message(&d->subscriptions, args);
handle_unsubscribe_message(d->name, &d->subscriptions, args);
return;
}

View File

@ -55,7 +55,7 @@ struct delivery_context {
static void send_to_sub(void *contextv, cmsg_bytes_t key, void *subv) {
struct delivery_context *context = contextv;
subscription_t *sub = subv;
send_to_subscription(&context->f->subscriptions, sub, context->body);
send_to_subscription(context->f->name, &context->f->subscriptions, sub, context->body);
}
static void fanout_handle_message(node_t *n, sexp_t *m) {
@ -82,12 +82,12 @@ static void fanout_handle_message(node_t *n, sexp_t *m) {
}
if ((msglen == 6) && !cmsg_bytes_cmp(selector, cmsg_cstring_bytes("subscribe"))) {
handle_subscribe_message(&f->subscriptions, args);
handle_subscribe_message(f->name, &f->subscriptions, args);
return;
}
if ((msglen == 2) && !cmsg_bytes_cmp(selector, cmsg_cstring_bytes("unsubscribe"))) {
handle_unsubscribe_message(&f->subscriptions, args);
handle_unsubscribe_message(f->name, &f->subscriptions, args);
return;
}

2
main.c
View File

@ -23,6 +23,7 @@ typedef unsigned char u_char;
#include "queue.h"
#include "direct.h"
#include "fanout.h"
#include "meta.h"
#define WANT_CONSOLE_LISTENER 1
@ -105,6 +106,7 @@ int main(int argc, char *argv[]) {
init_queue();
init_direct();
init_fanout();
init_meta();
#if WANT_CONSOLE_LISTENER
spawn(console_listener, NULL);
#endif

46
meta.c Normal file
View File

@ -0,0 +1,46 @@
/* Copyright (C) 2010 Tony Garnock-Jones. All rights reserved. */
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <errno.h>
#include <stdint.h>
#include <stddef.h>
#include <assert.h>
#include "cmsg_private.h"
#include "ref.h"
#include "sexp.h"
#include "hashtable.h"
#include "node.h"
#include "meta.h"
static sexp_t *meta_sym = NULL;
void init_meta(void) {
sexp_t *args;
meta_sym = INCREF(sexp_cstring("meta"));
args = INCREF(sexp_cons(meta_sym, NULL));
new_node(lookup_node_class(cmsg_cstring_bytes("direct")), args, NULL);
DECREF(args, sexp_destructor);
}
void announce_subscription(sexp_t *source,
sexp_t *filter,
sexp_t *sink,
sexp_t *name,
int onoff)
{
if (meta_sym != NULL) { /* use this as a proxy for whether meta has been initialized or not */
sexp_t *msg = NULL;
msg = sexp_cons(name, msg);
msg = sexp_cons(sink, msg);
msg = sexp_cons(filter, msg);
msg = sexp_cons(source, msg);
msg = sexp_cons(sexp_cstring(onoff ? "subscribed" : "unsubscribed"), msg);
INCREF(msg);
post_node(sexp_data(meta_sym), sexp_data(source), msg, NULL);
DECREF(msg, sexp_destructor);
}
}

12
meta.h Normal file
View File

@ -0,0 +1,12 @@
#ifndef cmsg_meta_h
#define cmsg_meta_h
extern void init_meta(void);
extern void announce_subscription(sexp_t *source,
sexp_t *filter,
sexp_t *sink,
sexp_t *name,
int onoff);
#endif

14
node.c
View File

@ -14,6 +14,7 @@
#include "sexpio.h"
#include "hashtable.h"
#include "node.h"
#include "meta.h"
static hashtable_t node_class_table;
static hashtable_t directory;
@ -92,13 +93,25 @@ node_t *lookup_node(cmsg_bytes_t name) {
return n;
}
static void announce_binding(cmsg_bytes_t name, int onoff) {
sexp_t *filter = sexp_bytes(name);
INCREF(filter);
announce_subscription(sexp_empty_bytes, filter, sexp_empty_bytes, sexp_empty_bytes, onoff);
DECREF(filter, sexp_destructor);
}
int bind_node(cmsg_bytes_t name, node_t *n) {
if (name.len == 0) {
warn("Binding to empty name forbidden\n");
return 0;
}
if (hashtable_contains(&directory, name)) {
return 0;
}
hashtable_put(&directory, name, n);
hashtable_put(&n->names, name, NULL);
info("Binding node <<%.*s>> of class %s\n", name.len, name.bytes, n->node_class->name);
announce_binding(name, 1);
return 1;
}
@ -111,6 +124,7 @@ int unbind_node(cmsg_bytes_t name) {
info("Unbinding node <<%.*s>> of class %s\n", name.len, name.bytes, n->node_class->name);
hashtable_erase(&n->names, name);
hashtable_erase(&directory, name);
announce_binding(name, 0);
return 1;
}
}

View File

@ -109,7 +109,7 @@ static void shoveller(void *qv) {
sexp_data(sub->name).len, sexp_data(sub->name).bytes);
*/
if (!send_to_subscription(&q->subscriptions, sub, body)) {
if (!send_to_subscription(q->name, &q->subscriptions, sub, body)) {
goto find_valid_waiter;
}
@ -169,7 +169,7 @@ static void queue_handle_message(node_t *n, sexp_t *m) {
}
if ((msglen == 6) && !cmsg_bytes_cmp(selector, cmsg_cstring_bytes("subscribe"))) {
subscription_t *sub = handle_subscribe_message(&q->subscriptions, args);
subscription_t *sub = handle_subscribe_message(q->name, &q->subscriptions, args);
if (sub != NULL) {
enqueue(&q->waiter_q, sub);
throck_shovel(q);
@ -178,7 +178,7 @@ static void queue_handle_message(node_t *n, sexp_t *m) {
}
if ((msglen == 2) && !cmsg_bytes_cmp(selector, cmsg_cstring_bytes("unsubscribe"))) {
handle_unsubscribe_message(&q->subscriptions, args);
handle_unsubscribe_message(q->name, &q->subscriptions, args);
return;
}

View File

@ -17,9 +17,11 @@
#include "hashtable.h"
#include "subscription.h"
#include "node.h"
#include "meta.h"
void free_subscription(subscription_t *sub) {
DECREF(sub->uuid, sexp_destructor);
DECREF(sub->filter, sexp_destructor);
DECREF(sub->sink, sexp_destructor);
DECREF(sub->name, sexp_destructor);
free(sub);
@ -35,7 +37,8 @@ void free_subscription_chain(subscription_t *chain) {
/* Returns true if the subscription has not been unsubscribed and the
destination of the subscription exists. */
int send_to_subscription(hashtable_t *subscriptions,
int send_to_subscription(sexp_t *source,
hashtable_t *subscriptions,
subscription_t *sub,
sexp_t *body)
{
@ -43,6 +46,7 @@ int send_to_subscription(hashtable_t *subscriptions,
free_subscription(sub);
return 0;
} else if (!post_node(sexp_data(sub->sink), sexp_data(sub->name), body, sub->uuid)) {
announce_subscription(source, sub->filter, sub->sink, sub->name, 0);
hashtable_erase(subscriptions, sexp_data(sub->uuid));
free_subscription(sub);
return 0;
@ -51,7 +55,8 @@ int send_to_subscription(hashtable_t *subscriptions,
}
}
subscription_t *send_to_subscription_chain(hashtable_t *subscriptions,
subscription_t *send_to_subscription_chain(sexp_t *source,
hashtable_t *subscriptions,
subscription_t *chain,
sexp_t *body)
{
@ -59,7 +64,7 @@ subscription_t *send_to_subscription_chain(hashtable_t *subscriptions,
subscription_t *prev = NULL;
while (chain != NULL) {
subscription_t *next = chain->link;
if (!send_to_subscription(subscriptions, chain, body)) {
if (!send_to_subscription(source, subscriptions, chain, body)) {
if (prev == NULL) {
top = next;
} else {
@ -72,24 +77,26 @@ subscription_t *send_to_subscription_chain(hashtable_t *subscriptions,
return top;
}
subscription_t *handle_subscribe_message(hashtable_t *subscriptions, sexp_t *args) {
subscription_t *handle_subscribe_message(sexp_t *source,
hashtable_t *subscriptions,
sexp_t *args)
{
unsigned char uuid[CMSG_UUID_BUF_SIZE];
if (gen_uuid(uuid) != 0) {
warn("Could not generate UUID\n");
return NULL;
} else {
sexp_t *filter = sexp_listref(args, 0);
sexp_t *reply_sink = sexp_listref(args, 3);
sexp_t *reply_name = sexp_listref(args, 4);
subscription_t *sub = malloc(sizeof(*sub));
sub->uuid = INCREF(sexp_bytes(CMSG_BYTES(sizeof(uuid), uuid)));
sub->filter = sexp_listref(args, 0);
sub->sink = sexp_listref(args, 1);
sub->name = sexp_listref(args, 2);
sub->link = NULL;
if (!sexp_stringp(filter)
|| !sexp_stringp(sub->sink) || !sexp_stringp(sub->name)
if (!sexp_stringp(sub->filter) || !sexp_stringp(sub->sink) || !sexp_stringp(sub->name)
|| !sexp_stringp(reply_sink) || !sexp_stringp(reply_name)) {
DECREF(sub->uuid, sexp_destructor);
free(sub);
@ -97,11 +104,14 @@ subscription_t *handle_subscribe_message(hashtable_t *subscriptions, sexp_t *arg
return NULL;
}
INCREF(sub->filter);
INCREF(sub->sink);
INCREF(sub->name);
hashtable_put(subscriptions, sexp_data(sub->uuid), sub);
announce_subscription(source, sub->filter, sub->sink, sub->name, 1);
{
sexp_t *subok = sexp_cons(sexp_cstring("subscribe-ok"), sexp_cons(sub->uuid, NULL));
INCREF(subok);
@ -113,7 +123,10 @@ subscription_t *handle_subscribe_message(hashtable_t *subscriptions, sexp_t *arg
}
}
void handle_unsubscribe_message(hashtable_t *subscriptions, sexp_t *args) {
void handle_unsubscribe_message(sexp_t *source,
hashtable_t *subscriptions,
sexp_t *args)
{
cmsg_bytes_t uuid;
subscription_t *sub;
@ -125,6 +138,7 @@ void handle_unsubscribe_message(hashtable_t *subscriptions, sexp_t *args) {
uuid = sexp_data(sexp_head(args));
if (hashtable_get(subscriptions, uuid, (void **) &sub)) {
/* TODO: clean up more eagerly perhaps? */
announce_subscription(source, sub->filter, sub->sink, sub->name, 0);
DECREF(sub->uuid, sexp_destructor);
sub->uuid = NULL;
hashtable_erase(subscriptions, uuid);

View File

@ -3,6 +3,7 @@
typedef struct subscription_t_ {
sexp_t *uuid;
sexp_t *filter;
sexp_t *sink;
sexp_t *name;
struct subscription_t_ *link;
@ -11,14 +12,21 @@ typedef struct subscription_t_ {
extern void free_subscription(subscription_t *sub);
extern void free_subscription_chain(subscription_t *chain);
extern int send_to_subscription(hashtable_t *subscriptions,
extern int send_to_subscription(sexp_t *source,
hashtable_t *subscriptions,
subscription_t *sub,
sexp_t *body);
extern subscription_t *send_to_subscription_chain(hashtable_t *subscriptions,
extern subscription_t *send_to_subscription_chain(sexp_t *source,
hashtable_t *subscriptions,
subscription_t *chain,
sexp_t *body);
extern subscription_t *handle_subscribe_message(hashtable_t *subscriptions, sexp_t *args);
extern void handle_unsubscribe_message(hashtable_t *subscriptions, sexp_t *args);
extern subscription_t *handle_subscribe_message(sexp_t *source,
hashtable_t *subscriptions,
sexp_t *args);
extern void handle_unsubscribe_message(sexp_t *source,
hashtable_t *subscriptions,
sexp_t *args);
#endif

2
t0 Normal file
View File

@ -0,0 +1,2 @@
(9:subscribe5:test00:0:5:test05:login)
(4:post4:meta(9:subscribe0:5:test08:presence5:test01:k)0:)