hop-2012/experiments/cmsg/queue.c

229 lines
5.9 KiB
C

/* Copyright (C) 2010, 2011 Tony Garnock-Jones. All rights reserved. */
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <errno.h>
#include <stdint.h>
#include <stddef.h>
#include <assert.h>
#include <ucontext.h>
#include "cmsg_private.h"
#include "harness.h"
#include "ref.h"
#include "sexp.h"
#include "sexpio.h"
#include "hashtable.h"
#include "node.h"
#include "queue.h"
#include "dataq.h"
#include "messages.h"
#include "subscription.h"
/* Per-node state for a "queue" node; stored in node_t::extension by
   queue_extend() and released by queue_destructor(). */
typedef struct queue_extension_t_ {
/* Queue name (the single string argument given to queue_extend); a
   reference is held via INCREF. */
sexp_t *name;
/* Message bodies posted to the queue and not yet delivered; created
   with sexp_new_queue() and held via INCREF. */
sexp_t *backlog_q;
/* Subscriptions (subscription_t, linked through their `link` member)
   waiting to be handed a message by the shoveller. */
queue_t waiter_q;
/* Subscription table passed to the handle_subscribe_message /
   handle_unsubscribe_message / send_to_subscription helpers. */
hashtable_t subscriptions;
/* The shoveller process, or NULL when none exists (cleared by the
   shoveller itself just before it exits). */
Process *shovel;
/* Nonzero while the shoveller is considered awake; cleared by the
   shoveller right before it naps, set by throck_shovel(). */
int shovel_awake;
} queue_extension_t;
/* Node-class "extend" hook: initialise a freshly created queue node.
 *
 * args must be a one-element list whose single element is a string; that
 * string becomes the queue's name and the name the node is bound under.
 *
 * Returns NULL on success, or an error-message sexp on failure. On the
 * "invalid args" and "out of memory" paths n->extension is left untouched
 * (queue_destructor copes with a NULL extension). On "bind failed" the
 * extension has already been attached to the node, so the node's
 * destructor is responsible for releasing it.
 */
static sexp_t *queue_extend(node_t *n, sexp_t *args) {
  if ((sexp_length(args) != 1) || !sexp_stringp(sexp_head(args))) {
    return sexp_cstring("invalid args");
  }

  cmsg_bytes_t name = sexp_data(sexp_head(args));
  queue_extension_t *q = calloc(1, sizeof(*q));
  if (q == NULL) {
    /* Previously this fell through and dereferenced NULL. Nothing has
       been INCREF'd yet, so there is nothing to unwind. */
    return sexp_cstring("out of memory");
  }

  q->name = INCREF(sexp_head(args));
  q->backlog_q = INCREF(sexp_new_queue());
  q->waiter_q = EMPTY_QUEUE(subscription_t, link);
  init_hashtable(&q->subscriptions, 5, NULL, NULL);
  q->shovel = NULL;       /* no shoveller until the first throck */
  q->shovel_awake = 0;
  n->extension = q;

  return bind_node(name, n) ? NULL : sexp_cstring("bind failed");
}
/* Node-class "destroy" hook: release everything queue_extend() attached
 * to the node. A NULL extension is tolerated, since queue_extend() can
 * fail before attaching one (e.g. when given invalid args).
 */
static void queue_destructor(node_t *n) {
  queue_extension_t *ext = n->extension;
  subscription_t *waiter;

  if (ext == NULL) {
    return;
  }

  /* Drop the references taken in queue_extend(). */
  DECREF(ext->name, sexp_destructor);
  DECREF(ext->backlog_q, sexp_destructor);

  /* Drain the waiter queue, freeing each subscription still on it. */
  for (waiter = dequeue(&ext->waiter_q);
       waiter != NULL;
       waiter = dequeue(&ext->waiter_q)) {
    free_subscription(waiter);
  }

  destroy_hashtable(&ext->subscriptions);

  if (ext->shovel) {
    warn("TODO: the shovel needs to be taken down as well here\n");
    /* Not done yet: the shoveller process may be running right now, so
       tearing it down needs careful ordering to avoid it touching the
       memory released below. */
  }

  free(ext);
}
/* Mark the end of a delivery burst by resetting *burst_count_ptr to 0.
 *
 * q and total_count are only used by the diagnostic below, which is
 * currently compiled out.
 * NOTE(review): if the diagnostic is re-enabled, "%lu" does not match the
 * size_t arguments (the matching conversion is "%zu"), and "%.*s" requires
 * an int precision -- check the type of the .len member.
 */
static void end_burst(queue_extension_t *q, size_t *burst_count_ptr, size_t total_count) {
#if 0
if (*burst_count_ptr > 0) {
info("Queue <<%.*s>>: burst count %lu; total %lu\n",
sexp_data(q->name).len, sexp_data(q->name).bytes,
*burst_count_ptr, total_count);
}
#endif
*burst_count_ptr = 0;
}
/* Body of the per-queue shoveller process (started via spawn() in
 * throck_shovel). qv is the queue's queue_extension_t.
 *
 * Repeatedly takes one message body off backlog_q and tries to hand it to
 * a subscription taken from waiter_q. When either queue runs dry it clears
 * shovel_awake and naps; if the nap reports a timeout it clears q->shovel
 * and returns, otherwise it goes back to looking for work.
 *
 * The function is a small goto-driven state machine with three labels:
 *   check_for_work    - pick the next body from the backlog
 *   find_valid_waiter - find a subscription that accepts the body
 *   wait_and_shovel   - nothing to do; nap, then exit or retry
 */
static void shoveller(void *qv) {
queue_extension_t *q = qv;
size_t burst_count = 0;
size_t total_count = 0;
sexp_t *body = NULL; /* held */
subscription_t *sub = NULL;
{
cmsg_bytes_t n = sexp_data(q->name);
info("Queue <<%.*s>> busy. Shoveller entering\n", n.len, n.bytes);
}
check_for_work:
//info("Checking for work\n");
if (sexp_queue_emptyp(q->backlog_q)) {
//info("Backlog empty\n");
goto wait_and_shovel;
}
/* Take our own reference on the body for the duration of the delivery
   attempt; every path out of find_valid_waiter releases it again. */
body = INCREF(sexp_dequeue(q->backlog_q)); /* held */
find_valid_waiter:
if (q->waiter_q.count == 0) {
//info("No waiters\n");
/* Nobody can take the message: return it to the backlog, drop our
   reference, and go to sleep. */
sexp_queue_pushback(q->backlog_q, body);
DECREF(body, sexp_destructor);
goto wait_and_shovel;
}
sub = dequeue(&q->waiter_q);
/*
info("Delivering to <<%.*s>>/<<%.*s>>...\n",
sexp_data(sub->sink).len, sexp_data(sub->sink).bytes,
sexp_data(sub->name).len, sexp_data(sub->name).bytes);
*/
/* On failure the subscription is NOT put back on waiter_q and the same
   body is offered to the next waiter.
   NOTE(review): presumably send_to_subscription disposes of the failed
   subscription itself -- confirm. */
if (!send_to_subscription(q->name, &q->subscriptions, sub, body)) {
goto find_valid_waiter;
}
burst_count++;
total_count++;
//info("Delivery successful\n");
DECREF(body, sexp_destructor);
/* Successful subscriptions go back on waiter_q so they can receive
   later messages. */
enqueue(&q->waiter_q, sub);
/* After 10000 consecutive deliveries, end the burst and yield() before
   continuing. */
if (burst_count >= 10000) {
end_burst(q, &burst_count, total_count);
yield();
}
goto check_for_work;
wait_and_shovel:
end_burst(q, &burst_count, total_count);
//info("Waiting for throck\n");
/* Must be cleared before napping so that throck_shovel() knows it has to
   resume() us when new work arrives. */
q->shovel_awake = 0;
/* TODO: if the number of active processes is large, assume we have
memory pressure, and quit the shovel early rather than waiting
for a few milliseconds to see if we're idle. */
/* A nonzero result from nap() is treated as "idle timeout": the shoveller
   unregisters itself and exits. Zero means throck_shovel() resumed us.
   NOTE(review): the argument 100 is presumably milliseconds -- confirm
   against nap()'s declaration. */
if (nap(100)) {
cmsg_bytes_t n = sexp_data(q->name);
info("Queue <<%.*s>> idle. Shoveller exiting\n", n.len, n.bytes);
q->shovel = NULL;
return;
}
//info("Throck received!\n");
goto check_for_work;
}
/* Make sure a shoveller is running (or about to run) for queue q.
 *
 * Does nothing if the shoveller is already marked awake. Otherwise it
 * either spawns a new shoveller (none exists) or resumes the napping one,
 * retrying after a yield() if the resume attempt loses a race with the
 * nap timeout.
 */
static void throck_shovel(queue_extension_t *q) {
  while (!q->shovel_awake) {
    if (q->shovel == NULL) {
      /* No shoveller at all: flag it awake, then start one. */
      q->shovel_awake = 1;
      q->shovel = spawn(shoveller, q);
      return;
    }

    if (resume(q->shovel) != -1) {
      /* We got in before the shoveller's nap() timed out, so nap() will
         return zero and the existing shoveller carries on running. */
      q->shovel_awake = 1;
      return;
    }

    /* resume() failed: the nap() timeout fired and scheduled the
       shoveller just ahead of us (its state is already PROCESS_RUNNING),
       but it has not yet run far enough to clear q->shovel and exit.
       Let it run, then re-evaluate from the top. */
    yield();
  }
}
/* Node-class "handle_message" hook: dispatch one incoming message m.
 *
 *   post        - append the body to the backlog and wake the shoveller
 *   subscribe   - register the subscription; if that yields one, add it
 *                 to the waiter queue and wake the shoveller
 *   unsubscribe - hand off to handle_unsubscribe_message
 *   otherwise   - warn and dump the message to stderr_h
 *
 * The parsers are tried in that order and the first match wins.
 */
static void queue_handle_message(node_t *n, sexp_t *m) {
  queue_extension_t *ext = n->extension;
  parsed_message_t parsed;

  if (parse_post(m, &parsed)) {
    sexp_enqueue(ext->backlog_q, parsed.post.body);
    throck_shovel(ext);
  } else if (parse_subscribe(m, &parsed)) {
    subscription_t *waiter =
      handle_subscribe_message(ext->name, &ext->subscriptions, &parsed);
    if (waiter != NULL) {
      enqueue(&ext->waiter_q, waiter);
      throck_shovel(ext);
    }
  } else if (parse_unsubscribe(m, &parsed)) {
    handle_unsubscribe_message(ext->name, &ext->subscriptions, &parsed);
  } else {
    warn("Message not understood in queue: ");
    sexp_writeln(stderr_h, m);
  }
}
/* Class descriptor for "queue" nodes: wires this file's extend, destroy
   and message-handling hooks together under the class name "queue".
   Registered by init_queue(). */
static node_class_t queue_class = {
.name = "queue",
.extend = queue_extend,
.destroy = queue_destructor,
.handle_message = queue_handle_message
};
/* Public entry point of this file: registers the "queue" node class via
   register_node_class(). */
void init_queue(void) {
register_node_class(&queue_class);
}