Add forgotten token to delivery posts

This commit is contained in:
Tony Garnock-Jones 2011-01-02 13:58:18 -05:00
parent 334c532a9b
commit 9fcffa8083
8 changed files with 29 additions and 14 deletions

7
main.c
View File

@ -57,7 +57,7 @@ static void factory_handle_message(node_t *n, sexp_t *m) {
reply = sexp_cons(sexp_cstring("create-failed"), sexp_cons(error, NULL));
}
INCREF(reply);
post_node(sexp_data(reply_sink), sexp_data(reply_name), reply);
post_node(sexp_data(reply_sink), sexp_data(reply_name), reply, sexp_empty_bytes);
DECREF(reply, sexp_destructor);
}
}
@ -97,6 +97,7 @@ int main(int argc, char *argv[]) {
event_init();
signal(SIGPIPE, SIG_IGN); /* avoid EPIPE when connections drop unexpectedly */
info("Using libevent version %s\n", event_get_version());
init_sexp();
init_node();
init_factory();
init_queue();
@ -105,8 +106,6 @@ int main(int argc, char *argv[]) {
#endif
start_net(5671);
boot_harness();
#ifndef NDEBUG
release_sexp_cache();
#endif
done_sexp();
return 0;
}

3
node.c
View File

@ -126,7 +126,7 @@ void unbind_all_names_for_node(node_t *n) {
destroy_hashtable(&names);
}
int post_node(cmsg_bytes_t node, cmsg_bytes_t name, sexp_t *body) {
int post_node(cmsg_bytes_t node, cmsg_bytes_t name, sexp_t *body, sexp_t *token) {
static sexp_t *post_atom = NULL;
sexp_t *msg = NULL;
int result;
@ -135,6 +135,7 @@ int post_node(cmsg_bytes_t node, cmsg_bytes_t name, sexp_t *body) {
post_atom = INCREF(sexp_cstring("post"));
}
msg = sexp_cons(token, msg);
msg = sexp_cons(body, msg);
msg = sexp_cons(sexp_bytes(name), msg);
msg = sexp_cons(post_atom, msg);

2
node.h
View File

@ -34,7 +34,7 @@ extern int bind_node(cmsg_bytes_t name, node_t *n);
extern int unbind_node(cmsg_bytes_t name);
extern void unbind_all_names_for_node(node_t *n);
extern int post_node(cmsg_bytes_t node, cmsg_bytes_t name, sexp_t *body);
extern int post_node(cmsg_bytes_t node, cmsg_bytes_t name, sexp_t *body, sexp_t *token);
extern int send_node(cmsg_bytes_t node, sexp_t *message);
#endif

View File

@ -72,7 +72,7 @@ static void queue_destructor(node_t *n) {
}
static int send_to_waiter(subscription_t *sub, sexp_t *body) {
return post_node(sexp_data(sub->sink), sexp_data(sub->name), body);
return post_node(sexp_data(sub->sink), sexp_data(sub->name), body, sub->uuid);
}
static void shoveller(void *qv) {
@ -185,7 +185,7 @@ static void queue_handle_message(node_t *n, sexp_t *m) {
{
sexp_t *subok = sexp_cons(sexp_cstring("subscribe-ok"), sexp_cons(sub->uuid, NULL));
INCREF(subok);
post_node(sexp_data(reply_sink), sexp_data(reply_name), subok);
post_node(sexp_data(reply_sink), sexp_data(reply_name), subok, sexp_empty_bytes);
DECREF(subok, sexp_destructor);
}
}

View File

@ -149,7 +149,7 @@ static void relay_main(node_t *n) {
if (bind_node(filter, n)) {
sexp_t *subok = sexp_cons(sexp_cstring("subscribe-ok"), sexp_cons(filter_sexp, NULL));
INCREF(subok);
post_node(reply_sink, reply_name, subok);
post_node(reply_sink, reply_name, subok, sexp_empty_bytes);
DECREF(subok, sexp_destructor);
} else {
warn("Bind failed <<%.*s>>\n", filter.len, filter.bytes);

12
sexp.c
View File

@ -10,8 +10,18 @@
static sexp_t *freelist = NULL;
void release_sexp_cache(void) {
sexp_t *sexp_empty_bytes = NULL;
void init_sexp(void) {
sexp_empty_bytes = INCREF(sexp_cstring(""));
}
void done_sexp(void) {
int count = 0;
DECREF(sexp_empty_bytes, sexp_destructor);
sexp_empty_bytes = NULL;
while (freelist != NULL) {
sexp_t *x = freelist;
freelist = x->data.pair.tail;

5
sexp.h
View File

@ -30,7 +30,10 @@ typedef struct sexp_t_ {
} data;
} sexp_t;
extern void release_sexp_cache(void);
extern sexp_t *sexp_empty_bytes;
extern void init_sexp(void);
extern void done_sexp(void);
extern void sexp_data_destructor(sexp_data_t *data);
extern void sexp_destructor(sexp_t *x);

View File

@ -36,13 +36,16 @@ int main(int argc, char *argv[]) {
fprintf(f, "(9:subscribe5:test10:0:5:test15:login)(4:post7:factory(6:create5:queue(2:q1)5:test11:k)0:)(4:post2:q1(9:subscribe0:5:test18:consumer5:test11:k)0:)\n");
fflush(f);
#define MESSAGESIZE 65
while (1) {
char buf[1024];
size_t n = read(fd, buf, sizeof(buf));
if (n == 0) break;
if (n >= 7) {
if (!memcmp(buf, "(4:post", 7)) {
if (n >= 16) {
if (!memcmp(buf, "(4:post8:consumer", 16)) {
if (bytecount == -1) {
printf("Buffer at start: <<%.*s>>\n", (int) n, buf);
printf("Starting.\n");
bytecount = 0;
gettimeofday(&start_time, NULL);
@ -51,7 +54,6 @@ int main(int argc, char *argv[]) {
}
if (bytecount >= 0) {
bytecount += n;
#define MESSAGESIZE 26
if ((bytecount % 100000) < MESSAGESIZE) {
struct timeval now;
double delta;