From 0a895ac5c0e1e34beddfb26f86660d8366b9540f Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Mon, 27 Dec 2010 18:50:42 -0500 Subject: [PATCH] IO framework in harness --- harness.c | 115 +++++++++++++++++++++++++++++++++++++++++++++++++++--- harness.h | 30 +++++++++++++- 2 files changed, 137 insertions(+), 8 deletions(-) diff --git a/harness.c b/harness.c index b50f32d..e21f11b 100644 --- a/harness.c +++ b/harness.c @@ -62,19 +62,30 @@ static Process *dequeue(ProcessQueue *pq) { } } -void yield(void) { +static void enqueue_runlist(Process *p) { + p->state = PROCESS_RUNNING; + enqueue(&runlist, p); +} + +static void schedule(void) { if (current_process == NULL) { - ICHECK(setcontext(&scheduler), "yield setcontext"); + ICHECK(setcontext(&scheduler), "schedule setcontext"); } else { - enqueue(&runlist, current_process); - ICHECK(swapcontext(¤t_process->context, &scheduler), "yield swapcontext"); + ICHECK(swapcontext(¤t_process->context, &scheduler), "schedule swapcontext"); } } +void yield(void) { + enqueue_runlist(current_process); + schedule(); +} + void killproc(void) { + assert(current_process->state == PROCESS_RUNNING); + current_process->state = PROCESS_DEAD; enqueue(&deadlist, current_process); current_process = NULL; - yield(); + schedule(); } static void driver(void (*f)(void *), void *arg) { @@ -86,6 +97,8 @@ void spawn(void (*f)(void *), void *arg) { Process *p = calloc(1, sizeof(*p)); PCHECK(p, "spawn calloc"); + p->state = PROCESS_DEAD; + p->stack_base = malloc(STACK_SIZE); /* what is a sane value here? 32k for mac... */ PCHECK(p->stack_base, "stack pointer malloc"); @@ -96,7 +109,97 @@ void spawn(void (*f)(void *), void *arg) { p->context.uc_stack.ss_flags = 0; makecontext(&p->context, (void (*)(void)) driver, 2, f, arg); - enqueue(&runlist, p); + p->link = NULL; + + enqueue_runlist(p); +} + +static void io_isr(struct bufferevent *bufev, IOHandle *h) { + assert(h->p->state == PROCESS_WAITING); + enqueue_runlist(h->p); +} + +static void error_isr(struct bufferevent *bufev, short what, IOHandle *h) { + h->error_direction = what & (EVBUFFER_READ | EVBUFFER_WRITE); + h->error_kind = what & ~(EVBUFFER_READ | EVBUFFER_WRITE); + if (h->p->state == PROCESS_WAITING) { + enqueue_runlist(h->p); + } else { + warn("Um, not sure what to do here. Error what %d, fd %d, ioh %p process %p\n", + what, + bufev->ev_read.ev_fd, + h, + h->p); + } +} + +IOHandle *new_iohandle(int fd) { + IOHandle *h = malloc(sizeof(*h)); + assert(current_process != NULL); + h->p = current_process; + h->fd = fd; + h->io = bufferevent_new(fd, + (evbuffercb) io_isr, + (evbuffercb) io_isr, + (everrorcb) error_isr, + h); + PCHECK(h->io, "bufferevent_new"); + h->error_direction = 0; + h->error_kind = 0; + return h; +} + +void delete_iohandle(IOHandle *h) { + bufferevent_free(h->io); + free(h); +} + +void iohandle_clear_error(IOHandle *h) { + h->error_direction = 0; + h->error_kind = 0; +} + +static void block_on_io(IOHandle *h, short event) { + ICHECK(bufferevent_enable(h->io, event), "bufferevent_enable"); + assert(current_process == h->p); + h->p->state = PROCESS_WAITING; + schedule(); + ICHECK(bufferevent_disable(h->io, event), "bufferevent_disable"); +} + +cmsg_bytes_t iohandle_readwait(IOHandle *h, size_t at_least) { + while (EVBUFFER_LENGTH(h->io->input) < at_least) { + if (h->error_kind) { + return EMPTY_BYTES; + } + block_on_io(h, EV_READ); + } + return (cmsg_bytes_t) { + .len = EVBUFFER_LENGTH(h->io->input), + .bytes = EVBUFFER_DATA(h->io->input) + }; +} + +void iohandle_drain(IOHandle *h, size_t count) { + evbuffer_drain(h->io->input, count); +} + +void iohandle_write(IOHandle *h, cmsg_bytes_t buf) { + ICHECK(bufferevent_write(h->io, buf.bytes, buf.len), "bufferevent_write"); +} + +int iohandle_flush(IOHandle *h) { + while (EVBUFFER_LENGTH(h->io->output) > 0) { + if (h->error_kind) { + return -1; + } + block_on_io(h, EV_WRITE); + } + return 0; +} + +void iohandle_settimeout(IOHandle *h, int timeout_read, int timeout_write) { + bufferevent_settimeout(h->io, timeout_read, timeout_write); } static void clean_dead_processes(void) { diff --git a/harness.h b/harness.h index 217069a..69b3369 100644 --- a/harness.h +++ b/harness.h @@ -1,12 +1,29 @@ #ifndef cmsg_harness_h #define cmsg_harness_h +typedef void (*process_main_t)(void *); + +typedef enum process_state_t_ { + PROCESS_DEAD = 0, + PROCESS_RUNNING, + PROCESS_WAITING +} process_state_t; + typedef struct Process { + process_state_t state; + void *stack_base; ucontext_t context; struct Process *link; - void *stack_base; } Process; +typedef struct IOHandle { + Process *p; + int fd; + struct bufferevent *io; + short error_direction; + short error_kind; +} IOHandle; + typedef struct ProcessQueue { int count; Process *head; @@ -16,7 +33,16 @@ typedef struct ProcessQueue { extern Process *current_process; extern void yield(void); -extern void spawn(void (*f)(void *), void *arg); +extern void spawn(process_main_t f, void *arg); + +extern IOHandle *new_iohandle(int fd); +extern void delete_iohandle(IOHandle *h); +extern void iohandle_clear_error(IOHandle *h); +extern cmsg_bytes_t iohandle_readwait(IOHandle *h, size_t at_least); +extern void iohandle_drain(IOHandle *h, size_t count); +extern void iohandle_write(IOHandle *h, cmsg_bytes_t buf); +extern int iohandle_flush(IOHandle *h); +extern void iohandle_settimeout(IOHandle *h, int timeout_read, int timeout_write); extern void boot_harness(void);