Factor out commonality in subscription management

This commit is contained in:
Tony Garnock-Jones 2011-01-02 21:23:43 -05:00
parent 62ff086c7b
commit 0cdf9f6e68
7 changed files with 184 additions and 205 deletions

View File

@ -1,6 +1,6 @@
TARGET = cmsg
OBJECTS = main.o harness.o net.o util.o relay.o hashtable.o dataq.o sexp.o sexpio.o node.o \
queue.o direct.o fanout.o
queue.o direct.o fanout.o subscription.o
UUID_CFLAGS:=$(shell uuid-config --cflags)
UUID_LDFLAGS:=$(shell uuid-config --ldflags)

2
TODO
View File

@ -2,7 +2,5 @@ Cope with possibility of duplicate uuid, in e.g. queue/fanout/direct.
If a subscription token matches an existing subscription, there's a
collision, so choose a new token until there's no collision.
Factor out commonality in subscription-management from queue/fanout/direct.
SAX-style sexp reader/writer, so that we can do something sensible for
enormous (e.g. gigabyte-sized) messages.

View File

@ -17,6 +17,7 @@
#include "sexp.h"
#include "hashtable.h"
#include "node.h"
#include "subscription.h"
typedef struct direct_extension_t_ {
sexp_t *name;
@ -24,29 +25,6 @@ typedef struct direct_extension_t_ {
hashtable_t subscriptions;
} direct_extension_t;
typedef struct subscription_t_ {
sexp_t *uuid;
sexp_t *sink;
sexp_t *name;
struct subscription_t_ *link;
} subscription_t;
static void free_subscription(subscription_t *sub) {
DECREF(sub->uuid, sexp_destructor);
DECREF(sub->sink, sexp_destructor);
DECREF(sub->name, sexp_destructor);
free(sub);
}
static void free_subscription_chain(void *context, cmsg_bytes_t key, void *value) {
subscription_t *chain = value;
while (chain != NULL) {
subscription_t *next = chain->link;
free_subscription(chain);
chain = next;
}
}
static sexp_t *direct_extend(node_t *n, sexp_t *args) {
if ((sexp_length(args) == 1) && sexp_stringp(sexp_head(args))) {
cmsg_bytes_t name = sexp_data(sexp_head(args));
@ -62,38 +40,28 @@ static sexp_t *direct_extend(node_t *n, sexp_t *args) {
}
}
static void free_direct_chain(void *context, cmsg_bytes_t key, void *value) {
free_subscription_chain(value);
}
static void direct_destructor(node_t *n) {
direct_extension_t *d = n->extension;
if (d != NULL) { /* can be NULL if direct_extend was given invalid args */
DECREF(d->name, sexp_destructor);
hashtable_foreach(&d->routing_table, free_subscription_chain, NULL);
hashtable_foreach(&d->routing_table, free_direct_chain, NULL);
destroy_hashtable(&d->routing_table);
destroy_hashtable(&d->subscriptions);
free(d);
}
}
static int send_to_sub(subscription_t *sub, sexp_t *body) {
return post_node(sexp_data(sub->sink), sexp_data(sub->name), body, sub->uuid);
}
static void route_message(direct_extension_t *d, sexp_t *rk, sexp_t *body) {
subscription_t *chain = NULL;
subscription_t *prev = NULL;
subscription_t *newchain;
hashtable_get(&d->routing_table, sexp_data(rk), (void **) &chain);
while (chain != NULL) {
subscription_t *next = chain->link;
if (!send_to_sub(chain, body)) { /* Destination no longer exists. */
info("Destination not found\n");
if (prev == NULL) {
hashtable_put(&d->routing_table, sexp_data(rk), chain->link);
} else {
prev->link = chain->link;
}
chain->link = NULL;
free_subscription(chain);
}
chain = next;
newchain = send_to_subscription_chain(&d->subscriptions, chain, body);
if (newchain != chain) {
hashtable_put(&d->routing_table, sexp_data(rk), newchain);
}
}
@ -120,51 +88,14 @@ static void direct_handle_message(node_t *n, sexp_t *m) {
warn("Non-string routing key in direct\n");
}
} else if ((msglen == 6) && !cmsg_bytes_cmp(selector, cmsg_cstring_bytes("subscribe"))) {
unsigned char uuid[CMSG_UUID_BUF_SIZE];
if (gen_uuid(uuid) != 0) {
warn("Could not generate UUID\n");
} else {
subscription_t *sub = handle_subscribe_message(&d->subscriptions, args);
if (sub != NULL) {
sexp_t *filter = sexp_listref(args, 0);
sexp_t *reply_sink = sexp_listref(args, 3);
sexp_t *reply_name = sexp_listref(args, 4);
subscription_t *sub = malloc(sizeof(*sub));
sub->uuid = INCREF(sexp_bytes(CMSG_BYTES(sizeof(uuid), uuid)));
sub->sink = sexp_listref(args, 1);
sub->name = sexp_listref(args, 2);
sub->link = NULL;
if (!sexp_stringp(filter)
|| !sexp_stringp(sub->sink) || !sexp_stringp(sub->name)
|| !sexp_stringp(reply_sink) || !sexp_stringp(reply_name)) {
DECREF(sub->uuid, sexp_destructor);
free(sub);
warn("Bad sink/name/reply_sink/reply_name in subscribe");
} else {
INCREF(sub->sink);
INCREF(sub->name);
hashtable_put(&d->subscriptions, sexp_data(sub->uuid), sub);
hashtable_get(&d->routing_table, sexp_data(filter), (void **) &sub->link);
hashtable_put(&d->routing_table, sexp_data(filter), sub);
{
sexp_t *subok = sexp_cons(sexp_cstring("subscribe-ok"), sexp_cons(sub->uuid, NULL));
INCREF(subok);
post_node(sexp_data(reply_sink), sexp_data(reply_name), subok, sexp_empty_bytes);
DECREF(subok, sexp_destructor);
}
}
hashtable_get(&d->routing_table, sexp_data(filter), (void **) &sub->link);
hashtable_put(&d->routing_table, sexp_data(filter), sub);
}
} else if ((msglen == 2) && !cmsg_bytes_cmp(selector, cmsg_cstring_bytes("unsubscribe"))) {
if (sexp_stringp(sexp_head(args))) {
cmsg_bytes_t uuid = sexp_data(sexp_head(args));
subscription_t *sub;
if (hashtable_get(&d->subscriptions, uuid, (void **) &sub)) {
/* TODO: clean up more eagerly perhaps? */
hashtable_erase(&d->subscriptions, uuid);
DECREF(sub->uuid, sexp_destructor);
sub->uuid = NULL;
}
} else {
warn("Invalid unsubscription\n");
}
handle_unsubscribe_message(&d->subscriptions, args);
} else {
warn("Message not understood in direct; selector <<%.*s>>, length %u\n",
selector.len, selector.bytes,

View File

@ -17,32 +17,19 @@
#include "sexp.h"
#include "hashtable.h"
#include "node.h"
#include "subscription.h"
typedef struct fanout_extension_t_ {
sexp_t *name;
hashtable_t subscriptions;
} fanout_extension_t;
typedef struct subscription_t_ {
sexp_t *uuid;
sexp_t *sink;
sexp_t *name;
} subscription_t;
static void free_subscription(void *value) {
subscription_t *sub = value;
DECREF(sub->uuid, sexp_destructor);
DECREF(sub->sink, sexp_destructor);
DECREF(sub->name, sexp_destructor);
free(sub);
}
static sexp_t *fanout_extend(node_t *n, sexp_t *args) {
if ((sexp_length(args) == 1) && sexp_stringp(sexp_head(args))) {
cmsg_bytes_t name = sexp_data(sexp_head(args));
fanout_extension_t *f = calloc(1, sizeof(*f));
f->name = INCREF(sexp_head(args));
init_hashtable(&f->subscriptions, 5, NULL, free_subscription);
init_hashtable(&f->subscriptions, 5, NULL, (void (*)(void *)) free_subscription);
n->extension = f;
return bind_node(name, n) ? NULL : sexp_cstring("bind failed");
@ -68,9 +55,7 @@ struct delivery_context {
static void send_to_sub(void *contextv, cmsg_bytes_t key, void *subv) {
struct delivery_context *context = contextv;
subscription_t *sub = subv;
if (!post_node(sexp_data(sub->sink), sexp_data(sub->name), context->body, sub->uuid)) {
hashtable_erase(&context->f->subscriptions, sexp_data(sub->uuid));
}
send_to_subscription(&context->f->subscriptions, sub, context->body);
}
static void fanout_handle_message(node_t *n, sexp_t *m) {
@ -94,40 +79,9 @@ static void fanout_handle_message(node_t *n, sexp_t *m) {
context.body = sexp_listref(args, 1);
hashtable_foreach(&f->subscriptions, send_to_sub, &context);
} else if ((msglen == 6) && !cmsg_bytes_cmp(selector, cmsg_cstring_bytes("subscribe"))) {
unsigned char uuid[CMSG_UUID_BUF_SIZE];
if (gen_uuid(uuid) != 0) {
warn("Could not generate UUID\n");
} else {
sexp_t *reply_sink = sexp_listref(args, 3);
sexp_t *reply_name = sexp_listref(args, 4);
subscription_t *sub = malloc(sizeof(*sub));
sub->uuid = INCREF(sexp_bytes(CMSG_BYTES(sizeof(uuid), uuid)));
sub->sink = sexp_listref(args, 1);
sub->name = sexp_listref(args, 2);
if (!sexp_stringp(sub->sink) || !sexp_stringp(sub->name)
|| !sexp_stringp(reply_sink) || !sexp_stringp(reply_name)) {
DECREF(sub->uuid, sexp_destructor);
free(sub);
warn("Bad sink/name/reply_sink/reply_name in subscribe");
} else {
INCREF(sub->sink);
INCREF(sub->name);
hashtable_put(&f->subscriptions, sexp_data(sub->uuid), sub);
{
sexp_t *subok = sexp_cons(sexp_cstring("subscribe-ok"), sexp_cons(sub->uuid, NULL));
INCREF(subok);
post_node(sexp_data(reply_sink), sexp_data(reply_name), subok, sexp_empty_bytes);
DECREF(subok, sexp_destructor);
}
}
}
handle_subscribe_message(&f->subscriptions, args);
} else if ((msglen == 2) && !cmsg_bytes_cmp(selector, cmsg_cstring_bytes("unsubscribe"))) {
if (sexp_stringp(sexp_head(args))) {
cmsg_bytes_t uuid = sexp_data(sexp_head(args));
hashtable_erase(&f->subscriptions, uuid);
} else {
warn("Invalid unsubscription\n");
}
handle_unsubscribe_message(&f->subscriptions, args);
} else {
warn("Message not understood in fanout; selector <<%.*s>>, length %u\n",
selector.len, selector.bytes,

74
queue.c
View File

@ -20,6 +20,7 @@
#include "node.h"
#include "queue.h"
#include "dataq.h"
#include "subscription.h"
typedef struct queue_extension_t_ {
sexp_t *name;
@ -30,20 +31,6 @@ typedef struct queue_extension_t_ {
int shovel_awake;
} queue_extension_t;
typedef struct subscription_t_ {
sexp_t *uuid;
sexp_t *sink;
sexp_t *name;
struct subscription_t_ *link;
} subscription_t;
static void free_subscription(subscription_t *sub) {
DECREF(sub->uuid, sexp_destructor);
DECREF(sub->sink, sexp_destructor);
DECREF(sub->name, sexp_destructor);
free(sub);
}
static sexp_t *queue_extend(node_t *n, sexp_t *args) {
if ((sexp_length(args) == 1) && sexp_stringp(sexp_head(args))) {
cmsg_bytes_t name = sexp_data(sexp_head(args));
@ -79,10 +66,6 @@ static void queue_destructor(node_t *n) {
}
}
static int send_to_waiter(subscription_t *sub, sexp_t *body) {
return post_node(sexp_data(sub->sink), sexp_data(sub->name), body, sub->uuid);
}
static void end_burst(queue_extension_t *q, size_t *burst_count_ptr, size_t total_count) {
if (*burst_count_ptr > 0) {
info("Queue <<%.*s>>: burst count %lu; total %lu\n",
@ -98,7 +81,6 @@ static void shoveller(void *qv) {
size_t burst_count = 0;
size_t total_count = 0;
sexp_t *body = NULL; /* held */
queue_t examined;
subscription_t *sub = NULL;
check_for_work:
@ -110,14 +92,12 @@ static void shoveller(void *qv) {
}
body = INCREF(sexp_dequeue(q->backlog_q)); /* held */
examined = EMPTY_QUEUE(subscription_t, link);
find_valid_waiter:
if (q->waiter_q.count == 0) {
//info("No waiters\n");
sexp_queue_pushback(q->backlog_q, body);
DECREF(body, sexp_destructor);
q->waiter_q = examined;
goto wait_and_shovel;
}
@ -129,10 +109,7 @@ static void shoveller(void *qv) {
sexp_data(sub->name).len, sexp_data(sub->name).bytes);
*/
if ((sub->uuid == NULL) /* It has been unsubscribed. */
|| !send_to_waiter(sub, body)) { /* Destination no longer exists. */
info((sub->uuid == NULL) ? "Waiter was unsubscribed\n" : "Destination not found\n");
free_subscription(sub);
if (!send_to_subscription(&q->subscriptions, sub, body)) {
goto find_valid_waiter;
}
@ -141,7 +118,6 @@ static void shoveller(void *qv) {
//info("Delivery successful\n");
DECREF(body, sexp_destructor);
queue_append(&q->waiter_q, &examined);
enqueue(&q->waiter_q, sub);
if (burst_count >= 10000) {
@ -190,49 +166,13 @@ static void queue_handle_message(node_t *n, sexp_t *m) {
sexp_enqueue(q->backlog_q, sexp_listref(args, 1));
throck_shovel(q);
} else if ((msglen == 6) && !cmsg_bytes_cmp(selector, cmsg_cstring_bytes("subscribe"))) {
unsigned char uuid[CMSG_UUID_BUF_SIZE];
if (gen_uuid(uuid) != 0) {
warn("Could not generate UUID\n");
} else {
sexp_t *reply_sink = sexp_listref(args, 3);
sexp_t *reply_name = sexp_listref(args, 4);
subscription_t *sub = malloc(sizeof(*sub));
sub->uuid = INCREF(sexp_bytes(CMSG_BYTES(sizeof(uuid), uuid)));
sub->sink = sexp_listref(args, 1);
sub->name = sexp_listref(args, 2);
sub->link = NULL;
if (!sexp_stringp(sub->sink) || !sexp_stringp(sub->name)
|| !sexp_stringp(reply_sink) || !sexp_stringp(reply_name)) {
DECREF(sub->uuid, sexp_destructor);
free(sub);
warn("Bad sink/name/reply_sink/reply_name in subscribe");
} else {
INCREF(sub->sink);
INCREF(sub->name);
hashtable_put(&q->subscriptions, sexp_data(sub->uuid), sub);
enqueue(&q->waiter_q, sub);
throck_shovel(q);
{
sexp_t *subok = sexp_cons(sexp_cstring("subscribe-ok"), sexp_cons(sub->uuid, NULL));
INCREF(subok);
post_node(sexp_data(reply_sink), sexp_data(reply_name), subok, sexp_empty_bytes);
DECREF(subok, sexp_destructor);
}
}
subscription_t *sub = handle_subscribe_message(&q->subscriptions, args);
if (sub != NULL) {
enqueue(&q->waiter_q, sub);
throck_shovel(q);
}
} else if ((msglen == 2) && !cmsg_bytes_cmp(selector, cmsg_cstring_bytes("unsubscribe"))) {
if (sexp_stringp(sexp_head(args))) {
cmsg_bytes_t uuid = sexp_data(sexp_head(args));
subscription_t *sub;
if (hashtable_get(&q->subscriptions, uuid, (void **) &sub)) {
/* TODO: clean up more eagerly perhaps? */
hashtable_erase(&q->subscriptions, uuid);
DECREF(sub->uuid, sexp_destructor);
sub->uuid = NULL;
}
} else {
warn("Invalid unsubscription\n");
}
handle_unsubscribe_message(&q->subscriptions, args);
} else {
warn("Message not understood in queue; selector <<%.*s>>, length %u\n",
selector.len, selector.bytes,

132
subscription.c Normal file
View File

@ -0,0 +1,132 @@
/* Copyright (C) 2010 Tony Garnock-Jones. All rights reserved. */
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <errno.h>
#include <stdint.h>
#include <stddef.h>
#include <assert.h>
#include <ucontext.h>
#include "cmsg_private.h"
#include "ref.h"
#include "sexp.h"
#include "hashtable.h"
#include "subscription.h"
#include "node.h"
void free_subscription(subscription_t *sub) {
DECREF(sub->uuid, sexp_destructor);
DECREF(sub->sink, sexp_destructor);
DECREF(sub->name, sexp_destructor);
free(sub);
}
void free_subscription_chain(subscription_t *chain) {
while (chain != NULL) {
subscription_t *next = chain->link;
free_subscription(chain);
chain = next;
}
}
/* Returns true if the subscription has not been unsubscribed and the
destination of the subscription exists. */
int send_to_subscription(hashtable_t *subscriptions,
subscription_t *sub,
sexp_t *body)
{
if (sub->uuid == NULL) {
free_subscription(sub);
return 0;
} else if (!post_node(sexp_data(sub->sink), sexp_data(sub->name), body, sub->uuid)) {
hashtable_erase(subscriptions, sexp_data(sub->uuid));
free_subscription(sub);
return 0;
} else {
return 1;
}
}
subscription_t *send_to_subscription_chain(hashtable_t *subscriptions,
subscription_t *chain,
sexp_t *body)
{
subscription_t *top = chain;
subscription_t *prev = NULL;
while (chain != NULL) {
subscription_t *next = chain->link;
if (!send_to_subscription(subscriptions, chain, body)) {
if (prev == NULL) {
top = next;
} else {
prev->link = next;
}
}
prev = chain;
chain = next;
}
return top;
}
subscription_t *handle_subscribe_message(hashtable_t *subscriptions, sexp_t *args) {
unsigned char uuid[CMSG_UUID_BUF_SIZE];
if (gen_uuid(uuid) != 0) {
warn("Could not generate UUID\n");
return NULL;
} else {
sexp_t *filter = sexp_listref(args, 0);
sexp_t *reply_sink = sexp_listref(args, 3);
sexp_t *reply_name = sexp_listref(args, 4);
subscription_t *sub = malloc(sizeof(*sub));
sub->uuid = INCREF(sexp_bytes(CMSG_BYTES(sizeof(uuid), uuid)));
sub->sink = sexp_listref(args, 1);
sub->name = sexp_listref(args, 2);
sub->link = NULL;
if (!sexp_stringp(filter)
|| !sexp_stringp(sub->sink) || !sexp_stringp(sub->name)
|| !sexp_stringp(reply_sink) || !sexp_stringp(reply_name)) {
DECREF(sub->uuid, sexp_destructor);
free(sub);
warn("Bad sink/name/reply_sink/reply_name in subscribe");
return NULL;
}
INCREF(sub->sink);
INCREF(sub->name);
hashtable_put(subscriptions, sexp_data(sub->uuid), sub);
{
sexp_t *subok = sexp_cons(sexp_cstring("subscribe-ok"), sexp_cons(sub->uuid, NULL));
INCREF(subok);
post_node(sexp_data(reply_sink), sexp_data(reply_name), subok, sexp_empty_bytes);
DECREF(subok, sexp_destructor);
}
return sub;
}
}
void handle_unsubscribe_message(hashtable_t *subscriptions, sexp_t *args) {
cmsg_bytes_t uuid;
subscription_t *sub;
if (!sexp_stringp(sexp_head(args))) {
warn("Invalid unsubscription\n");
return;
}
uuid = sexp_data(sexp_head(args));
if (hashtable_get(subscriptions, uuid, (void **) &sub)) {
/* TODO: clean up more eagerly perhaps? */
DECREF(sub->uuid, sexp_destructor);
sub->uuid = NULL;
hashtable_erase(subscriptions, uuid);
}
}

24
subscription.h Normal file
View File

@ -0,0 +1,24 @@
#ifndef cmsg_subscription_h
#define cmsg_subscription_h
typedef struct subscription_t_ {
sexp_t *uuid;
sexp_t *sink;
sexp_t *name;
struct subscription_t_ *link;
} subscription_t;
extern void free_subscription(subscription_t *sub);
extern void free_subscription_chain(subscription_t *chain);
extern int send_to_subscription(hashtable_t *subscriptions,
subscription_t *sub,
sexp_t *body);
extern subscription_t *send_to_subscription_chain(hashtable_t *subscriptions,
subscription_t *chain,
sexp_t *body);
extern subscription_t *handle_subscribe_message(hashtable_t *subscriptions, sexp_t *args);
extern void handle_unsubscribe_message(hashtable_t *subscriptions, sexp_t *args);
#endif