From a2fabf160552344bf75c4c3b46da866c53891088 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Fri, 31 Dec 2010 17:26:39 -0500 Subject: [PATCH] Make IOHandles shareable --- harness.c | 55 +++++++++++++++++++++++++++++++------------------------ harness.h | 2 +- 2 files changed, 32 insertions(+), 25 deletions(-) diff --git a/harness.c b/harness.c index 3339c18..861e756 100644 --- a/harness.c +++ b/harness.c @@ -115,39 +115,44 @@ void nap(long millis) { schedule(); } +static void awaken_waiters(IOHandle *h, short mask) { + Process *prev = NULL; + Process *p; + Process *next; + for (p = h->waiters; p != NULL; p = next) { + next = p->link; + assert(p->state == PROCESS_WAITING); + if ((p->wait_flags & mask) != 0) { + if (prev == NULL) { + h->waiters = next; + } else { + prev->link = next; + } + enqueue_runlist(p); + } else { + prev = p; + } + } +} + static void input_isr(struct bufferevent *bufev, IOHandle *h) { - //info("input_isr %p r %d w %d\n", h->p, EVBUFFER_LENGTH(bufev->input), EVBUFFER_LENGTH(bufev->output)); - assert((h->p->state == PROCESS_WAITING) && (h->p->wait_flags & EV_READ)); - enqueue_runlist(h->p); + awaken_waiters(h, EV_READ); } static void output_isr(struct bufferevent *bufev, IOHandle *h) { - //info("output_isr %p r %d w %d\n", h->p, EVBUFFER_LENGTH(bufev->input), EVBUFFER_LENGTH(bufev->output)); - if ((h->p->state == PROCESS_WAITING) && (h->p->wait_flags & EV_WRITE)) { - enqueue_runlist(h->p); - } + awaken_waiters(h, EV_WRITE); } static void error_isr(struct bufferevent *bufev, short what, IOHandle *h) { - info("error_isr 0x%04X %p\n", what, h->p); + info("error_isr 0x%04X\n", what); h->error_direction = what & (EVBUFFER_READ | EVBUFFER_WRITE); h->error_kind = what & ~(EVBUFFER_READ | EVBUFFER_WRITE); - if ((h->p->state == PROCESS_WAITING) - && (h->p->wait_flags & (EV_READ | EV_WRITE))) { - enqueue_runlist(h->p); - } else { - warn("Um, not sure what to do here. Error what %d, fd %d, ioh %p process %p\n", - what, - bufev->ev_read.ev_fd, - h, - h->p); - } + awaken_waiters(h, EV_READ | EV_WRITE); } IOHandle *new_iohandle(int fd) { IOHandle *h = malloc(sizeof(*h)); - assert(current_process != NULL); - h->p = current_process; + h->waiters = NULL; h->fd = fd; h->io = bufferevent_new(fd, (evbuffercb) input_isr, @@ -171,11 +176,13 @@ void iohandle_clear_error(IOHandle *h) { } static void block_on_io(IOHandle *h, short event) { - assert(current_process == h->p); - h->p->state = PROCESS_WAITING; - h->p->wait_flags |= event; + assert(current_process->link == NULL); + current_process->link = h->waiters; + h->waiters = current_process; + current_process->state = PROCESS_WAITING; + current_process->wait_flags |= event; schedule(); - h->p->wait_flags &= ~event; + current_process->wait_flags &= ~event; } cmsg_bytes_t iohandle_readwait(IOHandle *h, size_t at_least) { diff --git a/harness.h b/harness.h index 7926e0b..f171f52 100644 --- a/harness.h +++ b/harness.h @@ -18,7 +18,7 @@ typedef struct Process { } Process; typedef struct IOHandle { - Process *p; + Process *waiters; int fd; struct bufferevent *io; unsigned short error_direction;