diff --git a/harness.c b/harness.c index e21f11b..a43d7d3 100644 --- a/harness.c +++ b/harness.c @@ -68,6 +68,7 @@ static void enqueue_runlist(Process *p) { } static void schedule(void) { + //info("schedule %p\n", current_process); if (current_process == NULL) { ICHECK(setcontext(&scheduler), "schedule setcontext"); } else { @@ -114,15 +115,45 @@ void spawn(void (*f)(void *), void *arg) { enqueue_runlist(p); } -static void io_isr(struct bufferevent *bufev, IOHandle *h) { - assert(h->p->state == PROCESS_WAITING); +void nap_isr(int fd, short what, void *arg) { + Process *p = (Process *) arg; + //info("nap_isr %p\n", p); + assert((p->state == PROCESS_WAITING) && (p->wait_flags & EV_TIMEOUT)); + p->wait_flags &= ~EV_TIMEOUT; + enqueue_runlist(p); +} + +void nap(long millis) { + struct timeval tv; + assert(current_process != NULL); + assert(current_process->state == PROCESS_RUNNING); + tv.tv_sec = millis / 1000; + tv.tv_usec = (millis % 1000) * 1000; + ICHECK(event_once(-1, EV_TIMEOUT, nap_isr, current_process, &tv), "event_once"); + current_process->state = PROCESS_WAITING; + current_process->wait_flags |= EV_TIMEOUT; + schedule(); +} + +static void input_isr(struct bufferevent *bufev, IOHandle *h) { + //info("input_isr %p r %d w %d\n", h->p, EVBUFFER_LENGTH(bufev->input), EVBUFFER_LENGTH(bufev->output)); + assert((h->p->state == PROCESS_WAITING) && (h->p->wait_flags & EV_READ)); enqueue_runlist(h->p); } +static void output_isr(struct bufferevent *bufev, IOHandle *h) { + //info("output_isr %p r %d w %d\n", h->p, EVBUFFER_LENGTH(bufev->input), EVBUFFER_LENGTH(bufev->output)); + if ((h->p->state == PROCESS_WAITING) && (h->p->wait_flags & EV_WRITE)) { + enqueue_runlist(h->p); + } +} + static void error_isr(struct bufferevent *bufev, short what, IOHandle *h) { + info("error_isr 0x%04X %p\n", what, h->p); h->error_direction = what & (EVBUFFER_READ | EVBUFFER_WRITE); h->error_kind = what & ~(EVBUFFER_READ | EVBUFFER_WRITE); - if (h->p->state == PROCESS_WAITING) { + if ((h->p->state == PROCESS_WAITING) + && (h->p->wait_flags & (EV_READ | EV_WRITE))) { enqueue_runlist(h->p); } else { warn("Um, not sure what to do here. Error what %d, fd %d, ioh %p process %p\n", @@ -139,8 +170,8 @@ IOHandle *new_iohandle(int fd) { h->p = current_process; h->fd = fd; h->io = bufferevent_new(fd, - (evbuffercb) io_isr, - (evbuffercb) io_isr, + (evbuffercb) input_isr, + (evbuffercb) output_isr, (everrorcb) error_isr, h); PCHECK(h->io, "bufferevent_new"); @@ -160,11 +191,11 @@ void iohandle_clear_error(IOHandle *h) { } static void block_on_io(IOHandle *h, short event) { - ICHECK(bufferevent_enable(h->io, event), "bufferevent_enable"); assert(current_process == h->p); h->p->state = PROCESS_WAITING; + h->p->wait_flags |= event; schedule(); - ICHECK(bufferevent_disable(h->io, event), "bufferevent_disable"); + h->p->wait_flags &= ~event; } cmsg_bytes_t iohandle_readwait(IOHandle *h, size_t at_least) { @@ -172,7 +203,9 @@ cmsg_bytes_t iohandle_readwait(IOHandle *h, size_t at_least) { if (h->error_kind) { return EMPTY_BYTES; } + ICHECK(bufferevent_enable(h->io, EV_READ), "bufferevent_enable"); block_on_io(h, EV_READ); + ICHECK(bufferevent_disable(h->io, EV_READ), "bufferevent_disable"); } return (cmsg_bytes_t) { .len = EVBUFFER_LENGTH(h->io->input), @@ -218,6 +251,7 @@ void boot_harness(void) { zero_queue(&runlist); info("Processing %d jobs\n", work.count); while ((current_process = dequeue(&work)) != NULL) { + //info("entering %p\n", current_process); ICHECK(swapcontext(&scheduler, ¤t_process->context), "boot_harness swapcontext"); clean_dead_processes(); } diff --git a/harness.h b/harness.h index 69b3369..5d5cda6 100644 --- a/harness.h +++ b/harness.h @@ -11,6 +11,7 @@ typedef enum process_state_t_ { typedef struct Process { process_state_t state; + int wait_flags; void *stack_base; ucontext_t context; struct Process *link; @@ -34,6 +35,7 @@ extern Process *current_process; extern void yield(void); extern void spawn(process_main_t f, void *arg); +extern void nap(long millis); extern IOHandle *new_iohandle(int fd); extern void delete_iohandle(IOHandle *h); diff --git a/relay.c b/relay.c index ed6c3ae..984e272 100644 --- a/relay.c +++ b/relay.c @@ -39,6 +39,10 @@ static void relay_main(struct boot_args *args) { free(args); + iohandle_write(h, cmsg_cstring_bytes("Hi\n")); + ICHECK(iohandle_flush(h), "iohandle_flush 1"); + nap(1000); + iohandle_write(h, cmsg_cstring_bytes("Proceed\n")); iohandle_settimeout(h, 3, 0); while (1) { cmsg_bytes_t buf = iohandle_readwait(h, 1); @@ -61,7 +65,7 @@ static void relay_main(struct boot_args *args) { iohandle_write(h, cmsg_cstring_bytes("OK, proceed\n")); } - ICHECK(iohandle_flush(h), "iohandle_flush"); + ICHECK(iohandle_flush(h), "iohandle_flush 2"); ICHECK(close(h->fd), "close"); delete_iohandle(h); }