255 lines
6.5 KiB
C
255 lines
6.5 KiB
C
/* Copyright (C) 2010 Tony Garnock-Jones. All rights reserved. */
|
|
|
|
#include <stdlib.h>
|
|
#include <string.h>
|
|
#include <stdio.h>
|
|
#include <stddef.h>
|
|
|
|
#include <sys/time.h>
|
|
|
|
#include <ucontext.h>
|
|
|
|
typedef unsigned char u_char;
|
|
#include <event.h>
|
|
|
|
#include "cmsg_private.h"
|
|
#include "harness.h"
|
|
#include "dataq.h"
|
|
|
|
#include <assert.h>
|
|
|
|
#ifdef __APPLE__
|
|
/* Bollocks. Looks like OS X chokes unless STACK_SIZE is a multiple of 32k. */
|
|
# include <AvailabilityMacros.h>
|
|
# if !defined(MAC_OS_X_VERSION_10_6) || MAC_OS_X_VERSION_MAX_ALLOWED < MAC_OS_X_VERSION_10_6
|
|
/* Hmm, and looks like 10.5 has more aggressive stack requirements than 10.6. */
|
|
# define STACK_SIZE 65536
|
|
# else
|
|
# define STACK_SIZE 32768
|
|
# endif
|
|
#elif linux
|
|
# define STACK_SIZE 32768
|
|
#else
|
|
# error Define STACK_SIZE for your platform. It should probably not be less than 32k?
|
|
#endif
|
|
|
|
static volatile int harness_running = 1;
|
|
Process *current_process = NULL;
|
|
|
|
#define EMPTY_PROCESS_QUEUE ((queue_t) { offsetof(Process, link), 0, NULL, NULL })
|
|
|
|
static ucontext_t scheduler;
|
|
static queue_t runlist = EMPTY_PROCESS_QUEUE;
|
|
static queue_t deadlist = EMPTY_PROCESS_QUEUE;
|
|
|
|
/* Mark p runnable and append it to the scheduler's run queue.
 * boot_harness drains runlist and resumes each entry with swapcontext. */
static void enqueue_runlist(Process *p) {
  p->state = PROCESS_RUNNING;
  enqueue(&runlist, p);
}
|
|
|
|
static void schedule(void) {
|
|
//info("schedule %p\n", current_process);
|
|
if (current_process == NULL) {
|
|
ICHECK(setcontext(&scheduler), "schedule setcontext");
|
|
} else {
|
|
ICHECK(swapcontext(¤t_process->context, &scheduler), "schedule swapcontext");
|
|
}
|
|
}
|
|
|
|
/* Cooperatively give up the CPU: put the calling process back on the
 * run queue and switch to the scheduler.  Returns when the scheduler
 * next resumes this process. */
void yield(void) {
  enqueue_runlist(current_process);
  schedule();
}
|
|
|
|
/* Terminate the calling process.  It is moved to deadlist rather than
 * freed here — we are still executing on its stack, so the scheduler
 * frees it later via clean_dead_processes.  Never returns. */
void killproc(void) {
  assert(current_process->state == PROCESS_RUNNING);
  current_process->state = PROCESS_DEAD;
  enqueue(&deadlist, current_process);
  current_process = NULL;  /* makes schedule() take the one-way setcontext path */
  schedule();
}
|
|
|
|
/* Trampoline installed by spawn via makecontext: run the user function,
 * then tear the process down.  killproc() never returns, so control
 * cannot fall off the end of a makecontext'd function. */
static void driver(void (*f)(void *), void *arg) {
  f(arg);
  killproc();
}
|
|
|
|
/* Create a new green process that runs f(arg) on its own STACK_SIZE
 * stack and queue it for execution.  Aborts (via PCHECK/ICHECK) on
 * allocation or context-setup failure. */
void spawn(void (*f)(void *), void *arg) {
  Process *p = calloc(1, sizeof(*p));
  PCHECK(p, "spawn calloc");

  p->state = PROCESS_DEAD;  /* placeholder; enqueue_runlist below sets PROCESS_RUNNING */

  p->stack_base = malloc(STACK_SIZE); /* what is a sane value here? 32k for mac... */
  PCHECK(p->stack_base, "stack pointer malloc");

  ICHECK(getcontext(&p->context), "spawn getcontext");
  p->context.uc_link = NULL;  /* driver never returns, so no successor context needed */
  p->context.uc_stack.ss_sp = p->stack_base;
  p->context.uc_stack.ss_size = STACK_SIZE;
  p->context.uc_stack.ss_flags = 0;
  /* NOTE(review): makecontext's variadic arguments are specified as
   * ints; passing pointers this way is not strictly portable on LP64
   * targets, though it works on the platforms guarded for above.
   * Confirm before porting to a new ABI. */
  makecontext(&p->context, (void (*)(void)) driver, 2, f, arg);

  p->link = NULL;

  enqueue_runlist(p);
}
|
|
|
|
/* libevent one-shot timer callback armed by nap(): the timeout fired,
 * so clear the process's EV_TIMEOUT wait flag and make it runnable
 * again.  arg is the napping Process; fd is -1 (see event_once in nap). */
void nap_isr(int fd, short what, void *arg) {
  Process *p = (Process *) arg;
  //info("nap_isr %p\n", p);
  assert((p->state == PROCESS_WAITING) && (p->wait_flags & EV_TIMEOUT));
  p->wait_flags &= ~EV_TIMEOUT;
  enqueue_runlist(p);
}
|
|
|
|
void nap(long millis) {
|
|
struct timeval tv;
|
|
assert(current_process != NULL);
|
|
assert(current_process->state == PROCESS_RUNNING);
|
|
tv.tv_sec = millis / 1000;
|
|
tv.tv_usec = (millis % 1000) * 1000;
|
|
ICHECK(event_once(-1, EV_TIMEOUT, nap_isr, current_process, &tv), "event_once");
|
|
current_process->state = PROCESS_WAITING;
|
|
current_process->wait_flags |= EV_TIMEOUT;
|
|
schedule();
|
|
}
|
|
|
|
/* Wake every process on h's waiter list whose wait_flags intersect
 * mask: unlink it from the singly-linked list and move it to the run
 * queue.  Processes waiting on other events are left in place.
 * Uses the pointer-to-slot idiom instead of a trailing prev pointer. */
static void awaken_waiters(IOHandle *h, short mask) {
  Process **slot = &h->waiters;
  while (*slot != NULL) {
    Process *p = *slot;
    /* Save the successor first: enqueue_runlist hands p to enqueue,
       whose queue_t is built over offsetof(Process, link), so p->link
       presumably gets reused — confirm against dataq.h. */
    Process *rest = p->link;
    assert(p->state == PROCESS_WAITING);
    if (p->wait_flags & mask) {
      *slot = rest;       /* unlink p, do not advance the slot */
      enqueue_runlist(p);
    } else {
      slot = &p->link;    /* p stays waiting; advance past it */
    }
  }
}
|
|
|
|
/* bufferevent read callback: input data arrived, wake processes
 * blocked waiting for EV_READ on this handle. */
static void input_isr(struct bufferevent *bufev, IOHandle *h) {
  awaken_waiters(h, EV_READ);
}
|
|
|
|
/* bufferevent write callback: output buffer drained, wake processes
 * blocked waiting for EV_WRITE on this handle. */
static void output_isr(struct bufferevent *bufev, IOHandle *h) {
  awaken_waiters(h, EV_WRITE);
}
|
|
|
|
/* bufferevent error callback: split `what` into the direction bits
 * (EVBUFFER_READ/EVBUFFER_WRITE) and the error-kind bits, record both
 * on the handle, then wake ALL waiters so they observe h->error_kind
 * and bail out of their wait loops. */
static void error_isr(struct bufferevent *bufev, short what, IOHandle *h) {
  info("error_isr 0x%04X\n", what);
  h->error_direction = what & (EVBUFFER_READ | EVBUFFER_WRITE);
  h->error_kind = what & ~(EVBUFFER_READ | EVBUFFER_WRITE);
  awaken_waiters(h, EV_READ | EV_WRITE);
}
|
|
|
|
IOHandle *new_iohandle(int fd) {
|
|
IOHandle *h = malloc(sizeof(*h));
|
|
h->waiters = NULL;
|
|
h->fd = fd;
|
|
h->io = bufferevent_new(fd,
|
|
(evbuffercb) input_isr,
|
|
(evbuffercb) output_isr,
|
|
(everrorcb) error_isr,
|
|
h);
|
|
PCHECK(h->io, "bufferevent_new");
|
|
h->error_direction = 0;
|
|
h->error_kind = 0;
|
|
return h;
|
|
}
|
|
|
|
/* Release a handle created by new_iohandle: free the bufferevent, then
 * the handle itself.  Does not close h->fd.
 * NOTE(review): h->waiters is not drained here — callers appear
 * responsible for ensuring no process is still blocked on h; confirm. */
void delete_iohandle(IOHandle *h) {
  bufferevent_free(h->io);
  free(h);
}
|
|
|
|
/* Reset the sticky error record set by error_isr so subsequent
 * iohandle_readwait / iohandle_flush calls stop short-circuiting. */
void iohandle_clear_error(IOHandle *h) {
  h->error_kind = 0;
  h->error_direction = 0;
}
|
|
|
|
/* Park the calling process on h's waiter list until `event` (EV_READ
 * or EV_WRITE) is delivered by one of the *_isr callbacks via
 * awaken_waiters.  The link field doubles as the run-queue link, so it
 * must be free here (asserted). */
static void block_on_io(IOHandle *h, short event) {
  assert(current_process->link == NULL);
  current_process->link = h->waiters;   /* push-front onto the waiter stack */
  h->waiters = current_process;
  current_process->state = PROCESS_WAITING;
  current_process->wait_flags |= event;
  schedule();                           /* resumes after awaken_waiters runs us */
  current_process->wait_flags &= ~event;
}
|
|
|
|
/* Block the calling process until at least at_least bytes are buffered
 * on h's input, or an error has been recorded on the handle.  Returns
 * a view of the ENTIRE input buffer (possibly more than at_least
 * bytes), or EMPTY_BYTES on error.  The bytes remain owned by the
 * buffer — consume them with iohandle_drain. */
cmsg_bytes_t iohandle_readwait(IOHandle *h, size_t at_least) {
  while (EVBUFFER_LENGTH(h->io->input) < at_least) {
    if (h->error_kind) {
      return EMPTY_BYTES;
    }
    /* Reading is only enabled while someone is actually waiting —
       presumably to avoid buffering input nobody wants yet; confirm. */
    ICHECK(bufferevent_enable(h->io, EV_READ), "bufferevent_enable");
    block_on_io(h, EV_READ);
    ICHECK(bufferevent_disable(h->io, EV_READ), "bufferevent_disable");
  }
  return CMSG_BYTES(EVBUFFER_LENGTH(h->io->input), EVBUFFER_DATA(h->io->input));
}
|
|
|
|
/* Discard count bytes from the front of h's input buffer.  Call after
 * consuming bytes returned by iohandle_readwait. */
void iohandle_drain(IOHandle *h, size_t count) {
  evbuffer_drain(h->io->input, count);
}
|
|
|
|
/* Queue buf for asynchronous writing on h.  The data is copied into
 * the bufferevent's output buffer; use iohandle_flush to wait for it
 * to actually drain to the fd. */
void iohandle_write(IOHandle *h, cmsg_bytes_t buf) {
  ICHECK(bufferevent_write(h->io, buf.bytes, buf.len), "bufferevent_write");
}
|
|
|
|
/* Block the calling process until h's output buffer has completely
 * drained to the underlying fd.  Returns 0 on success, or -1 if an
 * error has been recorded on the handle (see error_isr). */
int iohandle_flush(IOHandle *h) {
  for (;;) {
    if (EVBUFFER_LENGTH(h->io->output) == 0) {
      return 0;
    }
    if (h->error_kind) {
      return -1;
    }
    block_on_io(h, EV_WRITE);  /* output_isr wakes us as the buffer drains */
  }
}
|
|
|
|
/* Set read/write timeouts on the underlying bufferevent (seconds, per
 * libevent's bufferevent_settimeout; 0 disables).  Expired timeouts
 * surface through error_isr. */
void iohandle_settimeout(IOHandle *h, int timeout_read, int timeout_write) {
  bufferevent_settimeout(h->io, timeout_read, timeout_write);
}
|
|
|
|
static void clean_dead_processes(void) {
|
|
Process *deadp;
|
|
while ((deadp = dequeue(&deadlist)) != NULL) {
|
|
free(deadp->stack_base);
|
|
free(deadp);
|
|
}
|
|
}
|
|
|
|
void boot_harness(void) {
|
|
ICHECK(getcontext(&scheduler), "boot_harness getcontext");
|
|
while (1) {
|
|
while (runlist.count) {
|
|
queue_t work = runlist;
|
|
runlist = EMPTY_PROCESS_QUEUE;
|
|
info("Processing %d jobs\n", work.count);
|
|
while ((current_process = dequeue(&work)) != NULL) {
|
|
//info("entering %p\n", current_process);
|
|
ICHECK(swapcontext(&scheduler, ¤t_process->context), "boot_harness swapcontext");
|
|
clean_dead_processes();
|
|
}
|
|
//info("Polling for events\n");
|
|
event_loop(EVLOOP_NONBLOCK);
|
|
}
|
|
if (!harness_running) break;
|
|
//info("Blocking for events\n");
|
|
event_loop(EVLOOP_ONCE);
|
|
}
|
|
}
|
|
|
|
/* Request scheduler shutdown: boot_harness exits the next time its run
 * queue drains.  harness_running is declared volatile, presumably so
 * this can be called from an event/signal context — confirm callers. */
void interrupt_harness(void) {
  info("Interrupting harness\n");
  harness_running = 0;
}
|