/* Copyright (C) 2010 Tony Garnock-Jones. All rights reserved. */ #include #include #include #include #include typedef unsigned char u_char; #include #include "cmsg_private.h" #include "harness.h" #include #ifdef __APPLE__ /* Bollocks. Looks like OS X chokes unless STACK_SIZE is a multiple of 32k. */ # include # if !defined(MAC_OS_X_VERSION_10_6) || MAC_OS_X_VERSION_MAX_ALLOWED < MAC_OS_X_VERSION_10_6 /* Hmm, and looks like 10.5 has more aggressive stack requirements than 10.6. */ # define STACK_SIZE 65536 # else # define STACK_SIZE 32768 # endif #elif linux # define STACK_SIZE 32768 #else # error Define STACK_SIZE for your platform. It should probably not be less than 32k? #endif Process *current_process = NULL; static ucontext_t scheduler; static ProcessQueue runlist = { 0, NULL, NULL }; static ProcessQueue deadlist = { 0, NULL, NULL }; static void zero_queue(ProcessQueue *pq) { pq->count = 0; pq->head = NULL; pq->tail = NULL; } static void enqueue(ProcessQueue *pq, Process *p) { p->link = NULL; if (pq->head == NULL) { pq->head = p; } else { pq->tail->link = p; } pq->tail = p; pq->count++; } static Process *dequeue(ProcessQueue *pq) { if (pq->head == NULL) { return NULL; } else { Process *p = pq->head; pq->head = p->link; if (pq->head == NULL) { pq->tail = NULL; } pq->count--; return p; } } static void enqueue_runlist(Process *p) { p->state = PROCESS_RUNNING; enqueue(&runlist, p); } static void schedule(void) { //info("schedule %p\n", current_process); if (current_process == NULL) { ICHECK(setcontext(&scheduler), "schedule setcontext"); } else { ICHECK(swapcontext(¤t_process->context, &scheduler), "schedule swapcontext"); } } void yield(void) { enqueue_runlist(current_process); schedule(); } void killproc(void) { assert(current_process->state == PROCESS_RUNNING); current_process->state = PROCESS_DEAD; enqueue(&deadlist, current_process); current_process = NULL; schedule(); } static void driver(void (*f)(void *), void *arg) { f(arg); killproc(); } void spawn(void (*f)(void *), void *arg) { Process *p = calloc(1, sizeof(*p)); PCHECK(p, "spawn calloc"); p->state = PROCESS_DEAD; p->stack_base = malloc(STACK_SIZE); /* what is a sane value here? 32k for mac... */ PCHECK(p->stack_base, "stack pointer malloc"); ICHECK(getcontext(&p->context), "spawn getcontext"); p->context.uc_link = NULL; p->context.uc_stack.ss_sp = p->stack_base; p->context.uc_stack.ss_size = STACK_SIZE; p->context.uc_stack.ss_flags = 0; makecontext(&p->context, (void (*)(void)) driver, 2, f, arg); p->link = NULL; enqueue_runlist(p); } void nap_isr(int fd, short what, void *arg) { Process *p = (Process *) arg; //info("nap_isr %p\n", p); assert((p->state == PROCESS_WAITING) && (p->wait_flags & EV_TIMEOUT)); p->wait_flags &= ~EV_TIMEOUT; enqueue_runlist(p); } void nap(long millis) { struct timeval tv; assert(current_process != NULL); assert(current_process->state == PROCESS_RUNNING); tv.tv_sec = millis / 1000; tv.tv_usec = (millis % 1000) * 1000; ICHECK(event_once(-1, EV_TIMEOUT, nap_isr, current_process, &tv), "event_once"); current_process->state = PROCESS_WAITING; current_process->wait_flags |= EV_TIMEOUT; schedule(); } static void input_isr(struct bufferevent *bufev, IOHandle *h) { //info("input_isr %p r %d w %d\n", h->p, EVBUFFER_LENGTH(bufev->input), EVBUFFER_LENGTH(bufev->output)); assert((h->p->state == PROCESS_WAITING) && (h->p->wait_flags & EV_READ)); enqueue_runlist(h->p); } static void output_isr(struct bufferevent *bufev, IOHandle *h) { //info("output_isr %p r %d w %d\n", h->p, EVBUFFER_LENGTH(bufev->input), EVBUFFER_LENGTH(bufev->output)); if ((h->p->state == PROCESS_WAITING) && (h->p->wait_flags & EV_WRITE)) { enqueue_runlist(h->p); } } static void error_isr(struct bufferevent *bufev, short what, IOHandle *h) { info("error_isr 0x%04X %p\n", what, h->p); h->error_direction = what & (EVBUFFER_READ | EVBUFFER_WRITE); h->error_kind = what & ~(EVBUFFER_READ | EVBUFFER_WRITE); if ((h->p->state == PROCESS_WAITING) && (h->p->wait_flags & (EV_READ | EV_WRITE))) { enqueue_runlist(h->p); } else { warn("Um, not sure what to do here. Error what %d, fd %d, ioh %p process %p\n", what, bufev->ev_read.ev_fd, h, h->p); } } IOHandle *new_iohandle(int fd) { IOHandle *h = malloc(sizeof(*h)); assert(current_process != NULL); h->p = current_process; h->fd = fd; h->io = bufferevent_new(fd, (evbuffercb) input_isr, (evbuffercb) output_isr, (everrorcb) error_isr, h); PCHECK(h->io, "bufferevent_new"); h->error_direction = 0; h->error_kind = 0; return h; } void delete_iohandle(IOHandle *h) { bufferevent_free(h->io); free(h); } void iohandle_clear_error(IOHandle *h) { h->error_direction = 0; h->error_kind = 0; } static void block_on_io(IOHandle *h, short event) { assert(current_process == h->p); h->p->state = PROCESS_WAITING; h->p->wait_flags |= event; schedule(); h->p->wait_flags &= ~event; } cmsg_bytes_t iohandle_readwait(IOHandle *h, size_t at_least) { while (EVBUFFER_LENGTH(h->io->input) < at_least) { if (h->error_kind) { return EMPTY_BYTES; } ICHECK(bufferevent_enable(h->io, EV_READ), "bufferevent_enable"); block_on_io(h, EV_READ); ICHECK(bufferevent_disable(h->io, EV_READ), "bufferevent_disable"); } return (cmsg_bytes_t) { .len = EVBUFFER_LENGTH(h->io->input), .bytes = EVBUFFER_DATA(h->io->input) }; } void iohandle_drain(IOHandle *h, size_t count) { evbuffer_drain(h->io->input, count); } void iohandle_write(IOHandle *h, cmsg_bytes_t buf) { ICHECK(bufferevent_write(h->io, buf.bytes, buf.len), "bufferevent_write"); } int iohandle_flush(IOHandle *h) { while (EVBUFFER_LENGTH(h->io->output) > 0) { if (h->error_kind) { return -1; } block_on_io(h, EV_WRITE); } return 0; } void iohandle_settimeout(IOHandle *h, int timeout_read, int timeout_write) { bufferevent_settimeout(h->io, timeout_read, timeout_write); } static void clean_dead_processes(void) { Process *deadp; while ((deadp = dequeue(&deadlist)) != NULL) { free(deadp->stack_base); free(deadp); } } void boot_harness(void) { ICHECK(getcontext(&scheduler), "boot_harness getcontext"); while (1) { while (runlist.count) { ProcessQueue work = runlist; zero_queue(&runlist); info("Processing %d jobs\n", work.count); while ((current_process = dequeue(&work)) != NULL) { //info("entering %p\n", current_process); ICHECK(swapcontext(&scheduler, ¤t_process->context), "boot_harness swapcontext"); clean_dead_processes(); } //info("Polling for events\n"); event_loop(EVLOOP_NONBLOCK); } //info("Blocking for events\n"); event_loop(EVLOOP_ONCE); } }