Make IOHandles shareable
This commit is contained in:
parent
e491109d98
commit
a2fabf1605
55
harness.c
55
harness.c
|
@ -115,39 +115,44 @@ void nap(long millis) {
|
||||||
schedule();
|
schedule();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void awaken_waiters(IOHandle *h, short mask) {
|
||||||
|
Process *prev = NULL;
|
||||||
|
Process *p;
|
||||||
|
Process *next;
|
||||||
|
for (p = h->waiters; p != NULL; p = next) {
|
||||||
|
next = p->link;
|
||||||
|
assert(p->state == PROCESS_WAITING);
|
||||||
|
if ((p->wait_flags & mask) != 0) {
|
||||||
|
if (prev == NULL) {
|
||||||
|
h->waiters = next;
|
||||||
|
} else {
|
||||||
|
prev->link = next;
|
||||||
|
}
|
||||||
|
enqueue_runlist(p);
|
||||||
|
} else {
|
||||||
|
prev = p;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static void input_isr(struct bufferevent *bufev, IOHandle *h) {
|
static void input_isr(struct bufferevent *bufev, IOHandle *h) {
|
||||||
//info("input_isr %p r %d w %d\n", h->p, EVBUFFER_LENGTH(bufev->input), EVBUFFER_LENGTH(bufev->output));
|
awaken_waiters(h, EV_READ);
|
||||||
assert((h->p->state == PROCESS_WAITING) && (h->p->wait_flags & EV_READ));
|
|
||||||
enqueue_runlist(h->p);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void output_isr(struct bufferevent *bufev, IOHandle *h) {
|
static void output_isr(struct bufferevent *bufev, IOHandle *h) {
|
||||||
//info("output_isr %p r %d w %d\n", h->p, EVBUFFER_LENGTH(bufev->input), EVBUFFER_LENGTH(bufev->output));
|
awaken_waiters(h, EV_WRITE);
|
||||||
if ((h->p->state == PROCESS_WAITING) && (h->p->wait_flags & EV_WRITE)) {
|
|
||||||
enqueue_runlist(h->p);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void error_isr(struct bufferevent *bufev, short what, IOHandle *h) {
|
static void error_isr(struct bufferevent *bufev, short what, IOHandle *h) {
|
||||||
info("error_isr 0x%04X %p\n", what, h->p);
|
info("error_isr 0x%04X\n", what);
|
||||||
h->error_direction = what & (EVBUFFER_READ | EVBUFFER_WRITE);
|
h->error_direction = what & (EVBUFFER_READ | EVBUFFER_WRITE);
|
||||||
h->error_kind = what & ~(EVBUFFER_READ | EVBUFFER_WRITE);
|
h->error_kind = what & ~(EVBUFFER_READ | EVBUFFER_WRITE);
|
||||||
if ((h->p->state == PROCESS_WAITING)
|
awaken_waiters(h, EV_READ | EV_WRITE);
|
||||||
&& (h->p->wait_flags & (EV_READ | EV_WRITE))) {
|
|
||||||
enqueue_runlist(h->p);
|
|
||||||
} else {
|
|
||||||
warn("Um, not sure what to do here. Error what %d, fd %d, ioh %p process %p\n",
|
|
||||||
what,
|
|
||||||
bufev->ev_read.ev_fd,
|
|
||||||
h,
|
|
||||||
h->p);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
IOHandle *new_iohandle(int fd) {
|
IOHandle *new_iohandle(int fd) {
|
||||||
IOHandle *h = malloc(sizeof(*h));
|
IOHandle *h = malloc(sizeof(*h));
|
||||||
assert(current_process != NULL);
|
h->waiters = NULL;
|
||||||
h->p = current_process;
|
|
||||||
h->fd = fd;
|
h->fd = fd;
|
||||||
h->io = bufferevent_new(fd,
|
h->io = bufferevent_new(fd,
|
||||||
(evbuffercb) input_isr,
|
(evbuffercb) input_isr,
|
||||||
|
@ -171,11 +176,13 @@ void iohandle_clear_error(IOHandle *h) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void block_on_io(IOHandle *h, short event) {
|
static void block_on_io(IOHandle *h, short event) {
|
||||||
assert(current_process == h->p);
|
assert(current_process->link == NULL);
|
||||||
h->p->state = PROCESS_WAITING;
|
current_process->link = h->waiters;
|
||||||
h->p->wait_flags |= event;
|
h->waiters = current_process;
|
||||||
|
current_process->state = PROCESS_WAITING;
|
||||||
|
current_process->wait_flags |= event;
|
||||||
schedule();
|
schedule();
|
||||||
h->p->wait_flags &= ~event;
|
current_process->wait_flags &= ~event;
|
||||||
}
|
}
|
||||||
|
|
||||||
cmsg_bytes_t iohandle_readwait(IOHandle *h, size_t at_least) {
|
cmsg_bytes_t iohandle_readwait(IOHandle *h, size_t at_least) {
|
||||||
|
|
Loading…
Reference in New Issue