Separate socket EOF from socket error conditions

This commit is contained in:
Tony Garnock-Jones 2011-01-02 14:51:13 -05:00
parent 0b9c6a3d09
commit 544a719d21
6 changed files with 41 additions and 32 deletions

View File

@ -165,9 +165,14 @@ static void output_isr(struct bufferevent *bufev, IOHandle *h) {
}
static void error_isr(struct bufferevent *bufev, short what, IOHandle *h) {
info("error_isr 0x%04X\n", what);
unsigned short kind = what & ~(EVBUFFER_READ | EVBUFFER_WRITE);
info("error_isr 0x%04X fd %d\n", what, h->fd);
h->error_direction = what & (EVBUFFER_READ | EVBUFFER_WRITE);
h->error_kind = what & ~(EVBUFFER_READ | EVBUFFER_WRITE);
if (kind == EVBUFFER_EOF) {
h->eof = 1;
} else {
h->error_kind = kind;
}
awaken_waiters(h, EV_READ | EV_WRITE);
}
@ -181,6 +186,7 @@ IOHandle *new_iohandle(int fd) {
(everrorcb) error_isr,
h);
PCHECK(h->io, "bufferevent_new");
h->eof = 0;
iohandle_clear_error(h);
return h;
}
@ -212,7 +218,7 @@ static void block_on_io(IOHandle *h, short event) {
cmsg_bytes_t iohandle_readwait(IOHandle *h, size_t at_least) {
while (EVBUFFER_LENGTH(h->io->input) < at_least) {
if (h->error_kind) {
if (h->eof || h->error_kind) {
return EMPTY_BYTES;
}
ICHECK(bufferevent_enable(h->io, EV_READ), "bufferevent_enable");

View File

@ -21,6 +21,7 @@ typedef struct IOHandle {
Process *waiters;
int fd;
struct bufferevent *io;
int eof;
unsigned short error_direction;
unsigned short error_kind;
} IOHandle;

2
main.c
View File

@ -84,7 +84,7 @@ static void console_listener(void *arg) {
IOHandle *in_handle = new_iohandle(0);
while (1) {
cmsg_bytes_t buf = iohandle_readwait(in_handle, 1);
if (in_handle->error_kind) break;
if (buf.len == 0) break;
iohandle_drain(in_handle, buf.len);
}
delete_iohandle(in_handle);

34
relay.c
View File

@ -109,9 +109,8 @@ static void relay_main(node_t *n) {
while (1) {
DECREF(message, sexp_destructor);
message = NULL;
message = INCREF(sexp_read(inh));
if (inh->error_kind != 0) goto network_error;
if (!sexp_read(inh, &message)) goto network_error;
INCREF(message);
/*
info("fd %d --> ", r->fd);
@ -119,6 +118,7 @@ static void relay_main(node_t *n) {
*/
if (!(sexp_pairp(message) && sexp_stringp(sexp_head(message)))) {
info("Ill-formed message\n");
send_error(r->outh, "ill-formed message", NULL);
goto protocol_error;
}
@ -169,22 +169,22 @@ static void relay_main(node_t *n) {
}
network_error:
switch (inh->error_kind) {
case EVBUFFER_EOF:
info("Disconnecting fd %d normally.\n", r->fd);
break;
if (inh->eof) {
info("Disconnecting fd %d normally.\n", r->fd);
} else {
switch (inh->error_kind) {
case SEXP_ERROR_OVERFLOW:
send_sexp_syntax_error(r->outh, "sexp too big");
break;
case SEXP_ERROR_OVERFLOW:
send_sexp_syntax_error(r->outh, "sexp too big");
break;
case SEXP_ERROR_SYNTAX:
send_sexp_syntax_error(r->outh, "sexp syntax error");
break;
case SEXP_ERROR_SYNTAX:
send_sexp_syntax_error(r->outh, "sexp syntax error");
break;
default:
warn("Relay handle error on fd %d: 0x%04X\n", r->fd, inh->error_kind);
break;
default:
warn("Relay handle error on fd %d: 0x%04X\n", r->fd, inh->error_kind);
break;
}
}
protocol_error:

View File

@ -22,7 +22,7 @@ static sexp_t *read_simple_string(IOHandle *h, cmsg_bytes_t buf) {
while (1) {
buf = iohandle_readwait(h, buf.len + 1);
if (h->error_kind) return NULL;
if (buf.len == 0) return NULL;
/* Don't reset i to zero: avoids scanning the beginning of the
number repeatedly */
@ -39,6 +39,10 @@ static sexp_t *read_simple_string(IOHandle *h, cmsg_bytes_t buf) {
count = atoi((char *) buf.bytes);
iohandle_drain(h, i + 1);
buf = iohandle_readwait(h, count);
if (buf.len < count) {
/* Error or EOF. */
return NULL;
}
buf.len = count;
result = sexp_bytes(buf);
iohandle_drain(h, count);
@ -59,14 +63,11 @@ sexp_t *sexp_read_atom(IOHandle *h) {
return read_simple_string(h, EMPTY_BYTES);
}
#define CHECKH \
if (h->error_kind) goto error;
#define READ1 \
buf = iohandle_readwait(h, 1); \
CHECKH;
if (buf.len == 0) goto error;
sexp_t *sexp_read(IOHandle *h) {
int sexp_read(IOHandle *h, sexp_t **result_ptr) {
cmsg_bytes_t buf;
sexp_t *stack = NULL; /* held */
sexp_t *hint = NULL; /* held */
@ -79,7 +80,7 @@ sexp_t *sexp_read(IOHandle *h) {
case '[': {
iohandle_drain(h, 1);
hint = INCREF(read_simple_string(h, EMPTY_BYTES));
CHECKH;
if (hint == NULL) goto error;
READ1;
if (buf.bytes[0] != ']') {
h->error_kind = SEXP_ERROR_SYNTAX;
@ -93,7 +94,7 @@ sexp_t *sexp_read(IOHandle *h) {
goto skip_whitespace_in_display_hint;
}
body = INCREF(read_simple_string(h, EMPTY_BYTES));
CHECKH;
if (body == NULL) goto error;
accumulator = sexp_display_hint(hint, body);
DECREF(hint, sexp_destructor); /* these could be UNGRABs */
DECREF(body, sexp_destructor);
@ -132,7 +133,8 @@ sexp_t *sexp_read(IOHandle *h) {
}
if (stack == NULL) {
return accumulator;
*result_ptr = accumulator;
return 1;
} else {
sexp_t *current = sexp_head(stack); /* not held */
sexp_t *cell = sexp_cons(accumulator, NULL);
@ -149,7 +151,7 @@ sexp_t *sexp_read(IOHandle *h) {
DECREF(stack, sexp_destructor);
DECREF(hint, sexp_destructor);
DECREF(body, sexp_destructor);
return NULL;
return 0;
}
void write_simple_string(IOHandle *h, sexp_t *x) {

View File

@ -5,7 +5,7 @@
#define SEXP_ERROR_SYNTAX 0x8001
extern sexp_t *sexp_read_atom(IOHandle *h);
extern sexp_t *sexp_read(IOHandle *h);
extern int sexp_read(IOHandle *h, sexp_t **result_ptr);
extern unsigned short sexp_write(IOHandle *h, sexp_t *x);
extern unsigned short sexp_writeln(IOHandle *h, sexp_t *x);