/* Copyright (C) 2010 Tony Garnock-Jones. All rights reserved. */ #include #include #include #include #include typedef unsigned char u_char; #include #include "cmsg_private.h" #include "harness.h" #include #ifdef __APPLE__ /* Bollocks. Looks like OS X chokes unless STACK_SIZE is a multiple of 32k. */ #define STACK_SIZE 32768 #elif linux #define STACK_SIZE 32768 #else #error Define STACK_SIZE for your platform. It should probably not be less than 32k? #endif Process *current_process = NULL; static ucontext_t scheduler; static ProcessQueue runlist = { 0, NULL, NULL }; static ProcessQueue deadlist = { 0, NULL, NULL }; static void zero_queue(ProcessQueue *pq) { pq->count = 0; pq->head = NULL; pq->tail = NULL; } static void enqueue(ProcessQueue *pq, Process *p) { p->link = NULL; if (pq->head == NULL) { pq->head = p; } else { pq->tail->link = p; } pq->tail = p; pq->count++; } static Process *dequeue(ProcessQueue *pq) { if (pq->head == NULL) { return NULL; } else { Process *p = pq->head; pq->head = p->link; if (pq->head == NULL) { pq->tail = NULL; } pq->count--; return p; } } static void enqueue_runlist(Process *p) { p->state = PROCESS_RUNNING; enqueue(&runlist, p); } static void schedule(void) { if (current_process == NULL) { ICHECK(setcontext(&scheduler), "schedule setcontext"); } else { ICHECK(swapcontext(¤t_process->context, &scheduler), "schedule swapcontext"); } } void yield(void) { enqueue_runlist(current_process); schedule(); } void killproc(void) { assert(current_process->state == PROCESS_RUNNING); current_process->state = PROCESS_DEAD; enqueue(&deadlist, current_process); current_process = NULL; schedule(); } static void driver(void (*f)(void *), void *arg) { f(arg); killproc(); } void spawn(void (*f)(void *), void *arg) { Process *p = calloc(1, sizeof(*p)); PCHECK(p, "spawn calloc"); p->state = PROCESS_DEAD; p->stack_base = malloc(STACK_SIZE); /* what is a sane value here? 32k for mac... */ PCHECK(p->stack_base, "stack pointer malloc"); ICHECK(getcontext(&p->context), "spawn getcontext"); p->context.uc_link = NULL; p->context.uc_stack.ss_sp = p->stack_base; p->context.uc_stack.ss_size = STACK_SIZE; p->context.uc_stack.ss_flags = 0; makecontext(&p->context, (void (*)(void)) driver, 2, f, arg); p->link = NULL; enqueue_runlist(p); } static void io_isr(struct bufferevent *bufev, IOHandle *h) { assert(h->p->state == PROCESS_WAITING); enqueue_runlist(h->p); } static void error_isr(struct bufferevent *bufev, short what, IOHandle *h) { h->error_direction = what & (EVBUFFER_READ | EVBUFFER_WRITE); h->error_kind = what & ~(EVBUFFER_READ | EVBUFFER_WRITE); if (h->p->state == PROCESS_WAITING) { enqueue_runlist(h->p); } else { warn("Um, not sure what to do here. Error what %d, fd %d, ioh %p process %p\n", what, bufev->ev_read.ev_fd, h, h->p); } } IOHandle *new_iohandle(int fd) { IOHandle *h = malloc(sizeof(*h)); assert(current_process != NULL); h->p = current_process; h->fd = fd; h->io = bufferevent_new(fd, (evbuffercb) io_isr, (evbuffercb) io_isr, (everrorcb) error_isr, h); PCHECK(h->io, "bufferevent_new"); h->error_direction = 0; h->error_kind = 0; return h; } void delete_iohandle(IOHandle *h) { bufferevent_free(h->io); free(h); } void iohandle_clear_error(IOHandle *h) { h->error_direction = 0; h->error_kind = 0; } static void block_on_io(IOHandle *h, short event) { ICHECK(bufferevent_enable(h->io, event), "bufferevent_enable"); assert(current_process == h->p); h->p->state = PROCESS_WAITING; schedule(); ICHECK(bufferevent_disable(h->io, event), "bufferevent_disable"); } cmsg_bytes_t iohandle_readwait(IOHandle *h, size_t at_least) { while (EVBUFFER_LENGTH(h->io->input) < at_least) { if (h->error_kind) { return EMPTY_BYTES; } block_on_io(h, EV_READ); } return (cmsg_bytes_t) { .len = EVBUFFER_LENGTH(h->io->input), .bytes = EVBUFFER_DATA(h->io->input) }; } void iohandle_drain(IOHandle *h, size_t count) { evbuffer_drain(h->io->input, count); } void iohandle_write(IOHandle *h, cmsg_bytes_t buf) { ICHECK(bufferevent_write(h->io, buf.bytes, buf.len), "bufferevent_write"); } int iohandle_flush(IOHandle *h) { while (EVBUFFER_LENGTH(h->io->output) > 0) { if (h->error_kind) { return -1; } block_on_io(h, EV_WRITE); } return 0; } void iohandle_settimeout(IOHandle *h, int timeout_read, int timeout_write) { bufferevent_settimeout(h->io, timeout_read, timeout_write); } static void clean_dead_processes(void) { Process *deadp; while ((deadp = dequeue(&deadlist)) != NULL) { free(deadp->stack_base); free(deadp); } } void boot_harness(void) { ICHECK(getcontext(&scheduler), "boot_harness getcontext"); while (1) { while (runlist.count) { ProcessQueue work = runlist; zero_queue(&runlist); info("Processing %d jobs\n", work.count); while ((current_process = dequeue(&work)) != NULL) { ICHECK(swapcontext(&scheduler, ¤t_process->context), "boot_harness swapcontext"); clean_dead_processes(); } info("Polling for events\n"); event_loop(EVLOOP_NONBLOCK); } info("Blocking for events\n"); event_loop(EVLOOP_ONCE); } }