/* Copyright (C) 2010 Tony Garnock-Jones. All rights reserved. */ #include #include #include #include #include #include #include #include #include "cmsg_private.h" #include "harness.h" #include "ref.h" #include "sexp.h" #include "sexpio.h" #include "hashtable.h" #include "node.h" #include "queue.h" #include "dataq.h" typedef struct queue_extension_t_ { sexp_t *backlog_q; queue_t waiter_q; hashtable_t subscriptions; Process *shovel; int shovel_awake; } queue_extension_t; typedef struct subscription_t_ { sexp_t *uuid; sexp_t *sink; sexp_t *name; struct subscription_t_ *link; } subscription_t; static void free_subscription(subscription_t *sub) { DECREF(sub->uuid, sexp_destructor); DECREF(sub->sink, sexp_destructor); DECREF(sub->name, sexp_destructor); free(sub); } static sexp_t *queue_extend(node_t *n, sexp_t *args) { if ((sexp_length(args) == 1) && sexp_stringp(sexp_head(args))) { cmsg_bytes_t name = sexp_data(sexp_head(args)); queue_extension_t *q = calloc(1, sizeof(*q)); q->backlog_q = INCREF(sexp_new_queue()); q->waiter_q = EMPTY_QUEUE(subscription_t, link); init_hashtable(&q->subscriptions, 5, NULL, NULL); q->shovel = NULL; q->shovel_awake = 0; n->extension = q; return bind_node(name, n) ? NULL : sexp_cstring("bind failed"); } else { return sexp_cstring("invalid args"); } } static void queue_destructor(node_t *n) { queue_extension_t *q = n->extension; if (q != NULL) { /* can be NULL if queue_extend was given invalid args */ DECREF(q->backlog_q, sexp_destructor); /* q->waiter_q will be automatically destroyed as part of the destruction of q->subscriptions */ destroy_hashtable(&q->subscriptions); warn("TODO: the shovel needs to be taken down as well here\n"); free(q); } } static int send_to_waiter(subscription_t *sub, sexp_t *body) { return post_node(sexp_data(sub->sink), sexp_data(sub->name), body); } static void shoveller(void *qv) { queue_extension_t *q = qv; sexp_t *body = NULL; /* held */ queue_t examined; subscription_t *sub = NULL; check_for_work: info("Checking for work\n"); if (sexp_queue_emptyp(q->backlog_q)) { info("Backlog empty\n"); goto wait_and_shovel; } body = INCREF(sexp_dequeue(q->backlog_q)); /* held */ examined = EMPTY_QUEUE(subscription_t, link); find_valid_waiter: if (q->waiter_q.count == 0) { info("No waiters\n"); sexp_queue_pushback(q->backlog_q, body); DECREF(body, sexp_destructor); q->waiter_q = examined; goto wait_and_shovel; } sub = dequeue(&q->waiter_q); info("Delivering to <<%.*s>>/<<%.*s>>...\n", sexp_data(sub->sink).len, sexp_data(sub->sink).bytes, sexp_data(sub->name).len, sexp_data(sub->name).bytes); if ((sub->uuid == NULL) /* It has been unsubscribed. */ || !send_to_waiter(sub, body)) { /* Destination no longer exists. */ info((sub->uuid == NULL) ? "Waiter was unsubscribed\n" : "Destination not found\n"); free_subscription(sub); goto find_valid_waiter; } info("Delivery successful\n"); DECREF(body, sexp_destructor); queue_append(&q->waiter_q, &examined); enqueue(&q->waiter_q, sub); goto check_for_work; wait_and_shovel: info("Waiting for throck\n"); q->shovel_awake = 0; suspend(); info("Throck received!\n"); goto check_for_work; } static void throck_shovel(queue_extension_t *q) { if (!q->shovel_awake) { if (!q->shovel) { q->shovel = spawn(shoveller, q); } else { resume(q->shovel); } q->shovel_awake = 1; } } static void queue_handle_message(node_t *n, sexp_t *m) { queue_extension_t *q = n->extension; size_t msglen = sexp_length(m); sexp_t *args; cmsg_bytes_t selector; if (msglen == 0 || !sexp_stringp(sexp_head(m))) { warn("Invalid message in factory\n"); return; } selector = sexp_data(sexp_head(m)); args = sexp_tail(m); if ((msglen == 4) && !cmsg_bytes_cmp(selector, cmsg_cstring_bytes("post"))) { sexp_enqueue(q->backlog_q, sexp_listref(args, 1)); throck_shovel(q); } else if ((msglen == 6) && !cmsg_bytes_cmp(selector, cmsg_cstring_bytes("subscribe"))) { unsigned char uuid[CMSG_UUID_BUF_SIZE]; if (gen_uuid(uuid) != 0) { warn("Could not generate UUID\n"); } else { sexp_t *reply_sink = sexp_listref(args, 3); sexp_t *reply_name = sexp_listref(args, 4); subscription_t *sub = malloc(sizeof(*sub)); sub->uuid = INCREF(sexp_bytes(CMSG_BYTES(sizeof(uuid), uuid))); sub->sink = sexp_listref(args, 1); sub->name = sexp_listref(args, 2); sub->link = NULL; if (!sexp_stringp(sub->sink) || !sexp_stringp(sub->name) || !sexp_stringp(reply_sink) || !sexp_stringp(reply_name)) { DECREF(sub->uuid, sexp_destructor); warn("Bad sink/name/reply_sink/replay_name in subscribe"); } else { INCREF(sub->sink); INCREF(sub->name); hashtable_put(&q->subscriptions, sexp_data(sub->uuid), sub); enqueue(&q->waiter_q, sub); throck_shovel(q); { sexp_t *subok = sexp_cons(sexp_cstring("subscribe-ok"), sexp_cons(sub->uuid, NULL)); INCREF(subok); post_node(sexp_data(reply_sink), sexp_data(reply_name), subok); DECREF(subok, sexp_destructor); } } } } else if ((msglen == 2) && !cmsg_bytes_cmp(selector, cmsg_cstring_bytes("unsubscribe"))) { if (sexp_stringp(sexp_head(args))) { cmsg_bytes_t uuid = sexp_data(sexp_head(args)); subscription_t *sub; if (hashtable_get(&q->subscriptions, uuid, (void **) &sub)) { /* TODO: clean up more eagerly perhaps? */ hashtable_erase(&q->subscriptions, uuid); DECREF(sub->uuid, sexp_destructor); sub->uuid = NULL; } } else { warn("Invalid unsubscription\n"); } } else { warn("Message not understood in queue; selector <<%.*s>>, length %u\n", selector.len, selector.bytes, msglen); } } static node_class_t queue_class = { .name = "queue", .extend = queue_extend, .destroy = queue_destructor, .handle_message = queue_handle_message }; void init_queue(void) { register_node_class(&queue_class); }