Add nap(); fix blocking bugs on write flush
This commit is contained in:
parent
e37e81599b
commit
2e43b12616
48
harness.c
48
harness.c
|
@ -68,6 +68,7 @@ static void enqueue_runlist(Process *p) {
|
|||
}
|
||||
|
||||
static void schedule(void) {
|
||||
//info("schedule %p\n", current_process);
|
||||
if (current_process == NULL) {
|
||||
ICHECK(setcontext(&scheduler), "schedule setcontext");
|
||||
} else {
|
||||
|
@ -114,15 +115,45 @@ void spawn(void (*f)(void *), void *arg) {
|
|||
enqueue_runlist(p);
|
||||
}
|
||||
|
||||
static void io_isr(struct bufferevent *bufev, IOHandle *h) {
|
||||
assert(h->p->state == PROCESS_WAITING);
|
||||
void nap_isr(int fd, short what, void *arg) {
|
||||
Process *p = (Process *) arg;
|
||||
//info("nap_isr %p\n", p);
|
||||
assert((p->state == PROCESS_WAITING) && (p->wait_flags & EV_TIMEOUT));
|
||||
p->wait_flags &= ~EV_TIMEOUT;
|
||||
enqueue_runlist(p);
|
||||
}
|
||||
|
||||
void nap(long millis) {
|
||||
struct timeval tv;
|
||||
assert(current_process != NULL);
|
||||
assert(current_process->state == PROCESS_RUNNING);
|
||||
tv.tv_sec = millis / 1000;
|
||||
tv.tv_usec = (millis % 1000) * 1000;
|
||||
ICHECK(event_once(-1, EV_TIMEOUT, nap_isr, current_process, &tv), "event_once");
|
||||
current_process->state = PROCESS_WAITING;
|
||||
current_process->wait_flags |= EV_TIMEOUT;
|
||||
schedule();
|
||||
}
|
||||
|
||||
static void input_isr(struct bufferevent *bufev, IOHandle *h) {
|
||||
//info("input_isr %p r %d w %d\n", h->p, EVBUFFER_LENGTH(bufev->input), EVBUFFER_LENGTH(bufev->output));
|
||||
assert((h->p->state == PROCESS_WAITING) && (h->p->wait_flags & EV_READ));
|
||||
enqueue_runlist(h->p);
|
||||
}
|
||||
|
||||
static void output_isr(struct bufferevent *bufev, IOHandle *h) {
|
||||
//info("output_isr %p r %d w %d\n", h->p, EVBUFFER_LENGTH(bufev->input), EVBUFFER_LENGTH(bufev->output));
|
||||
if ((h->p->state == PROCESS_WAITING) && (h->p->wait_flags & EV_WRITE)) {
|
||||
enqueue_runlist(h->p);
|
||||
}
|
||||
}
|
||||
|
||||
static void error_isr(struct bufferevent *bufev, short what, IOHandle *h) {
|
||||
info("error_isr 0x%04X %p\n", what, h->p);
|
||||
h->error_direction = what & (EVBUFFER_READ | EVBUFFER_WRITE);
|
||||
h->error_kind = what & ~(EVBUFFER_READ | EVBUFFER_WRITE);
|
||||
if (h->p->state == PROCESS_WAITING) {
|
||||
if ((h->p->state == PROCESS_WAITING)
|
||||
&& (h->p->wait_flags & (EV_READ | EV_WRITE))) {
|
||||
enqueue_runlist(h->p);
|
||||
} else {
|
||||
warn("Um, not sure what to do here. Error what %d, fd %d, ioh %p process %p\n",
|
||||
|
@ -139,8 +170,8 @@ IOHandle *new_iohandle(int fd) {
|
|||
h->p = current_process;
|
||||
h->fd = fd;
|
||||
h->io = bufferevent_new(fd,
|
||||
(evbuffercb) io_isr,
|
||||
(evbuffercb) io_isr,
|
||||
(evbuffercb) input_isr,
|
||||
(evbuffercb) output_isr,
|
||||
(everrorcb) error_isr,
|
||||
h);
|
||||
PCHECK(h->io, "bufferevent_new");
|
||||
|
@ -160,11 +191,11 @@ void iohandle_clear_error(IOHandle *h) {
|
|||
}
|
||||
|
||||
static void block_on_io(IOHandle *h, short event) {
|
||||
ICHECK(bufferevent_enable(h->io, event), "bufferevent_enable");
|
||||
assert(current_process == h->p);
|
||||
h->p->state = PROCESS_WAITING;
|
||||
h->p->wait_flags |= event;
|
||||
schedule();
|
||||
ICHECK(bufferevent_disable(h->io, event), "bufferevent_disable");
|
||||
h->p->wait_flags &= ~event;
|
||||
}
|
||||
|
||||
cmsg_bytes_t iohandle_readwait(IOHandle *h, size_t at_least) {
|
||||
|
@ -172,7 +203,9 @@ cmsg_bytes_t iohandle_readwait(IOHandle *h, size_t at_least) {
|
|||
if (h->error_kind) {
|
||||
return EMPTY_BYTES;
|
||||
}
|
||||
ICHECK(bufferevent_enable(h->io, EV_READ), "bufferevent_enable");
|
||||
block_on_io(h, EV_READ);
|
||||
ICHECK(bufferevent_disable(h->io, EV_READ), "bufferevent_disable");
|
||||
}
|
||||
return (cmsg_bytes_t) {
|
||||
.len = EVBUFFER_LENGTH(h->io->input),
|
||||
|
@ -218,6 +251,7 @@ void boot_harness(void) {
|
|||
zero_queue(&runlist);
|
||||
info("Processing %d jobs\n", work.count);
|
||||
while ((current_process = dequeue(&work)) != NULL) {
|
||||
//info("entering %p\n", current_process);
|
||||
ICHECK(swapcontext(&scheduler, ¤t_process->context), "boot_harness swapcontext");
|
||||
clean_dead_processes();
|
||||
}
|
||||
|
|
|
@ -11,6 +11,7 @@ typedef enum process_state_t_ {
|
|||
|
||||
typedef struct Process {
|
||||
process_state_t state;
|
||||
int wait_flags;
|
||||
void *stack_base;
|
||||
ucontext_t context;
|
||||
struct Process *link;
|
||||
|
@ -34,6 +35,7 @@ extern Process *current_process;
|
|||
|
||||
extern void yield(void);
|
||||
extern void spawn(process_main_t f, void *arg);
|
||||
extern void nap(long millis);
|
||||
|
||||
extern IOHandle *new_iohandle(int fd);
|
||||
extern void delete_iohandle(IOHandle *h);
|
||||
|
|
6
relay.c
6
relay.c
|
@ -39,6 +39,10 @@ static void relay_main(struct boot_args *args) {
|
|||
|
||||
free(args);
|
||||
|
||||
iohandle_write(h, cmsg_cstring_bytes("Hi\n"));
|
||||
ICHECK(iohandle_flush(h), "iohandle_flush 1");
|
||||
nap(1000);
|
||||
iohandle_write(h, cmsg_cstring_bytes("Proceed\n"));
|
||||
iohandle_settimeout(h, 3, 0);
|
||||
while (1) {
|
||||
cmsg_bytes_t buf = iohandle_readwait(h, 1);
|
||||
|
@ -61,7 +65,7 @@ static void relay_main(struct boot_args *args) {
|
|||
iohandle_write(h, cmsg_cstring_bytes("OK, proceed\n"));
|
||||
}
|
||||
|
||||
ICHECK(iohandle_flush(h), "iohandle_flush");
|
||||
ICHECK(iohandle_flush(h), "iohandle_flush 2");
|
||||
ICHECK(close(h->fd), "close");
|
||||
delete_iohandle(h);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue