/* Copyright (C) 2010 Tony Garnock-Jones. All rights reserved. */ #include #include #include #include #include #include #include #include #include "cmsg_private.h" #include "harness.h" #include "ref.h" #include "sexp.h" #include "sexpio.h" #include "hashtable.h" #include "node.h" #include "queue.h" #include "dataq.h" #include "subscription.h" typedef struct queue_extension_t_ { sexp_t *name; sexp_t *backlog_q; queue_t waiter_q; hashtable_t subscriptions; Process *shovel; int shovel_awake; } queue_extension_t; static sexp_t *queue_extend(node_t *n, sexp_t *args) { if ((sexp_length(args) == 1) && sexp_stringp(sexp_head(args))) { cmsg_bytes_t name = sexp_data(sexp_head(args)); queue_extension_t *q = calloc(1, sizeof(*q)); q->name = INCREF(sexp_head(args)); q->backlog_q = INCREF(sexp_new_queue()); q->waiter_q = EMPTY_QUEUE(subscription_t, link); init_hashtable(&q->subscriptions, 5, NULL, NULL); q->shovel = NULL; q->shovel_awake = 0; n->extension = q; return bind_node(name, n) ? NULL : sexp_cstring("bind failed"); } else { return sexp_cstring("invalid args"); } } static void queue_destructor(node_t *n) { queue_extension_t *q = n->extension; if (q != NULL) { /* can be NULL if queue_extend was given invalid args */ DECREF(q->name, sexp_destructor); DECREF(q->backlog_q, sexp_destructor); { subscription_t *sub = NULL; while ((sub = dequeue(&q->waiter_q)) != NULL) { free_subscription(sub); } } destroy_hashtable(&q->subscriptions); warn("TODO: the shovel needs to be taken down as well here\n"); free(q); } } static void end_burst(queue_extension_t *q, size_t *burst_count_ptr, size_t total_count) { if (*burst_count_ptr > 0) { info("Queue <<%.*s>>: burst count %lu; total %lu\n", sexp_data(q->name).len, sexp_data(q->name).bytes, *burst_count_ptr, total_count); } *burst_count_ptr = 0; } static void shoveller(void *qv) { queue_extension_t *q = qv; size_t burst_count = 0; size_t total_count = 0; sexp_t *body = NULL; /* held */ subscription_t *sub = NULL; check_for_work: //info("Checking for work\n"); if (sexp_queue_emptyp(q->backlog_q)) { //info("Backlog empty\n"); goto wait_and_shovel; } body = INCREF(sexp_dequeue(q->backlog_q)); /* held */ find_valid_waiter: if (q->waiter_q.count == 0) { //info("No waiters\n"); sexp_queue_pushback(q->backlog_q, body); DECREF(body, sexp_destructor); goto wait_and_shovel; } sub = dequeue(&q->waiter_q); /* info("Delivering to <<%.*s>>/<<%.*s>>...\n", sexp_data(sub->sink).len, sexp_data(sub->sink).bytes, sexp_data(sub->name).len, sexp_data(sub->name).bytes); */ if (!send_to_subscription(&q->subscriptions, sub, body)) { goto find_valid_waiter; } burst_count++; total_count++; //info("Delivery successful\n"); DECREF(body, sexp_destructor); enqueue(&q->waiter_q, sub); if (burst_count >= 10000) { end_burst(q, &burst_count, total_count); yield(); } goto check_for_work; wait_and_shovel: end_burst(q, &burst_count, total_count); //info("Waiting for throck\n"); q->shovel_awake = 0; suspend(); //info("Throck received!\n"); goto check_for_work; } static void throck_shovel(queue_extension_t *q) { if (!q->shovel_awake) { if (!q->shovel) { q->shovel = spawn(shoveller, q); } else { resume(q->shovel); } q->shovel_awake = 1; } } static void queue_handle_message(node_t *n, sexp_t *m) { queue_extension_t *q = n->extension; size_t msglen = sexp_length(m); sexp_t *args; cmsg_bytes_t selector; if (msglen == 0 || !sexp_stringp(sexp_head(m))) { warn("Invalid message in queue\n"); return; } selector = sexp_data(sexp_head(m)); args = sexp_tail(m); if ((msglen == 4) && !cmsg_bytes_cmp(selector, cmsg_cstring_bytes("post"))) { sexp_enqueue(q->backlog_q, sexp_listref(args, 1)); throck_shovel(q); return; } if ((msglen == 6) && !cmsg_bytes_cmp(selector, cmsg_cstring_bytes("subscribe"))) { subscription_t *sub = handle_subscribe_message(&q->subscriptions, args); if (sub != NULL) { enqueue(&q->waiter_q, sub); throck_shovel(q); } return; } if ((msglen == 2) && !cmsg_bytes_cmp(selector, cmsg_cstring_bytes("unsubscribe"))) { handle_unsubscribe_message(&q->subscriptions, args); return; } warn("Message not understood in queue; selector <<%.*s>>, length %u\n", selector.len, selector.bytes, msglen); } static node_class_t queue_class = { .name = "queue", .extend = queue_extend, .destroy = queue_destructor, .handle_message = queue_handle_message }; void init_queue(void) { register_node_class(&queue_class); }