Steps toward node, factory and relay functionality.

This commit is contained in:
Tony Garnock-Jones 2010-12-29 23:56:31 -05:00
parent 17edf15cbd
commit a3ebf4df37
5 changed files with 232 additions and 33 deletions

View File

@ -1,5 +1,5 @@
TARGET = cmsg
OBJECTS = main.o harness.o net.o util.o relay.o hashtable.o dataq.o sexp.o sexpio.o
OBJECTS = main.o harness.o net.o util.o relay.o hashtable.o dataq.o sexp.o sexpio.o node.o
CFLAGS = -D_XOPEN_SOURCE=600 -Wall -O0 -g
#CFLAGS = -D_XOPEN_SOURCE=600 -Wall -O3

25
main.c
View File

@ -13,11 +13,36 @@ typedef unsigned char u_char;
#include "cmsg_private.h"
#include "harness.h"
#include "net.h"
#include "ref.h"
#include "sexp.h"
#include "hashtable.h"
#include "node.h"
static node_t *factory_construct(node_class_t *nc, sexp_t *args) {
return malloc(sizeof(node_t));
}
static void factory_handle_message(node_t *n, sexp_t *m) {
info("factory_handle_message\n");
}
static node_class_t factory_class = {
.name = "factory",
.construct = factory_construct,
.destroy = (node_destructor_fn_t) free,
.handle_message = factory_handle_message
};
static void init_factory(void) {
bind_node(cmsg_cstring_bytes("factory"), new_node(&factory_class, NULL));
}
int main(int argc, char *argv[]) {
info("cmsg, Copyright (C) 2010 Tony Garnock-Jones. All rights reserved.\n");
event_init();
info("Using libevent version %s\n", event_get_version());
init_node();
init_factory();
start_net(5671);
boot_harness();
return 0;

121
node.c Normal file
View File

@ -0,0 +1,121 @@
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <stdint.h>
#include <assert.h>
#include "cmsg_private.h"
#include "ref.h"
#include "sexp.h"
#include "hashtable.h"
#include "node.h"
static hashtable_t node_class_table;
static hashtable_t directory;
static void *node_incref(void *arg) {
return INCREF((node_t *) arg);
}
static void node_decref(void *arg) {
DECREF((node_t *) arg, node_destructor);
}
void init_node(void) {
init_hashtable(&node_class_table,
31,
NULL,
NULL);
init_hashtable(&directory,
10007,
node_incref,
node_decref);
}
void register_node_class(node_class_t *nc) {
cmsg_bytes_t key = cmsg_cstring_bytes(nc->name);
if (hashtable_contains(&node_class_table, key)) {
die("Duplicate node class name %s\n", nc->name);
}
hashtable_put(&node_class_table, key, nc);
}
node_class_t *lookup_node_class(cmsg_bytes_t name) {
node_class_t *nc = NULL;
hashtable_get(&node_class_table, name, (void **) &nc);
return nc;
}
node_t *new_node(node_class_t *nc, sexp_t *args) {
node_t *n = nc->construct(nc, args);
n->refcount = ZERO_REFCOUNT();
n->node_class = nc;
init_hashtable(&n->names, 5, NULL, NULL);
return n;
}
void unbind_on_destroy(void *context, cmsg_bytes_t key, void *value) {
unbind_node(key);
}
void node_destructor(node_t *n) {
hashtable_foreach(&n->names, unbind_on_destroy, NULL);
destroy_hashtable(&n->names);
n->node_class->destroy(n);
}
node_t *lookup_node(cmsg_bytes_t name) {
node_t *n = NULL;
hashtable_get(&directory, name, (void **) &n);
return n;
}
int bind_node(cmsg_bytes_t name, node_t *n) {
if (hashtable_contains(&directory, name)) {
return 0;
}
hashtable_put(&directory, name, n);
hashtable_put(&n->names, name, NULL);
return 1;
}
int unbind_node(cmsg_bytes_t name) {
node_t *n = NULL;
hashtable_get(&directory, name, (void **) &n);
if (n == NULL) {
return 0;
} else {
hashtable_erase(&n->names, name);
hashtable_erase(&directory, name);
return 1;
}
}
int post_node(cmsg_bytes_t node, cmsg_bytes_t name, sexp_t *body) {
static sexp_t *post_atom = NULL;
sexp_t *msg = NULL;
int result;
if (post_atom == NULL) {
post_atom = INCREF(sexp_bytes(cmsg_cstring_bytes("post")));
}
msg = sexp_cons(body, msg);
msg = sexp_cons(sexp_bytes(name), msg);
msg = sexp_cons(post_atom, msg);
INCREF(msg);
result = send_node(node, msg);
DECREF(msg, sexp_destructor);
return result;
}
int send_node(cmsg_bytes_t node, sexp_t *message) {
node_t *n = lookup_node(node);
if (n == NULL) {
return 0;
}
n->node_class->handle_message(n, message);
return 1;
}

30
node.h
View File

@ -2,15 +2,37 @@
#define cmsg_node_h
typedef struct node_t_ {
refcount_t refcount;
struct node_class_t_ *node_class;
cmsg_bytes_t name; /* used as (partial) routing key for metamessages */
hashtable_t names;
} node_t;
typedef node_t *(*node_constructor_fn_t)(struct node_class_t_ *nc, sexp_t *args);
typedef void (*node_destructor_fn_t)(node_t *n);
typedef void (*node_message_handler_fn_t)(node_t *n, sexp_t *m);
typedef struct node_class_t_ {
void (*destroy)(node_t *n);
void (*handle_message)(node_t *n, msg_t *m);
char const *name;
node_constructor_fn_t construct;
node_destructor_fn_t destroy;
node_message_handler_fn_t handle_message;
} node_class_t;
extern node_t *new_node(
extern void init_node(void);
extern void basic_node_destroy(node_t *n);
extern void register_node_class(node_class_t *nc);
extern node_class_t *lookup_node_class(cmsg_bytes_t name);
extern node_t *new_node(node_class_t *nc, sexp_t *args);
extern void node_destructor(node_t *n);
extern node_t *lookup_node(cmsg_bytes_t name);
extern int bind_node(cmsg_bytes_t name, node_t *n);
extern int unbind_node(cmsg_bytes_t name);
extern int post_node(cmsg_bytes_t node, cmsg_bytes_t name, sexp_t *body);
extern int send_node(cmsg_bytes_t node, sexp_t *message);
#endif

87
relay.c
View File

@ -3,6 +3,7 @@
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/uio.h>
@ -31,9 +32,23 @@ struct boot_args {
int fd;
};
static void send_sexp_syntax_error(IOHandle *h, char const *message) {
sexp_t *m = NULL;
m = sexp_cons(sexp_bytes(cmsg_cstring_bytes("http://people.csail.mit.edu/rivest/Sexp.txt")), m);
m = sexp_cons(sexp_bytes(cmsg_cstring_bytes(message)), m);
m = sexp_cons(sexp_bytes(cmsg_cstring_bytes("error")), m);
INCREF(m);
iohandle_clear_error(h);
ICHECK(sexp_write(h, m), "send_sexp_syntax_error sexp_write");
DECREF(m, sexp_destructor);
iohandle_flush(h); /* ignore result here, there's not much we can do with it */
}
static void relay_main(struct boot_args *args) {
IOHandle *h = new_iohandle(args->fd);
IOHandle *out = new_iohandle(1);
IOHandle *out_handle = new_iohandle(1);
sexp_t *message = NULL; /* held */
{
char name[256];
@ -43,41 +58,57 @@ static void relay_main(struct boot_args *args) {
free(args);
iohandle_write(h, cmsg_cstring_bytes("Hi\n"));
iohandle_write(h, cmsg_cstring_bytes("(3:hop1:0)"));
ICHECK(iohandle_flush(h), "iohandle_flush 1");
nap(1000);
iohandle_write(h, cmsg_cstring_bytes("Proceed\n"));
//iohandle_settimeout(h, 3, 0);
loop:
{
sexp_t *x = sexp_read(h);
switch (h->error_kind) {
case 0:
fflush(NULL);
sexp_write(out, x);
iohandle_write(out, cmsg_cstring_bytes("\n"));
ICHECK(iohandle_flush(out), "iohandle_flush out");
DECREF(x, sexp_destructor);
iohandle_write(h, cmsg_cstring_bytes("OK, proceed\n"));
goto loop;
while (1) {
DECREF(message, sexp_destructor);
message = NULL;
message = INCREF(sexp_read(h));
case EVBUFFER_TIMEOUT:
info("Timeout\n");
iohandle_clear_error(h);
iohandle_write(h, cmsg_cstring_bytes("Timed out\n"));
ICHECK(iohandle_flush(h), "iohandle_flush 2");
break;
if (h->error_kind != 0) goto handle_error;
default:
info("Error! 0x%04X\n", h->error_kind);
break;
}
fflush(NULL);
sexp_write(out_handle, message);
iohandle_write(out_handle, cmsg_cstring_bytes("\n"));
ICHECK(iohandle_flush(out_handle), "iohandle_flush out_handle");
iohandle_write(h, cmsg_cstring_bytes("OK, proceed\n"));
}
ICHECK(close(h->fd), "close");
handle_error:
switch (h->error_kind) {
case EVBUFFER_EOF:
info("Disconnecting fd %d normally.\n", h->fd);
break;
case SEXP_ERROR_OVERFLOW:
send_sexp_syntax_error(h, "sexp too big");
break;
case SEXP_ERROR_SYNTAX:
send_sexp_syntax_error(h, "sexp syntax error");
break;
default:
warn("Relay handle error on fd %d: 0x%04X\n", h->fd, h->error_kind);
break;
}
goto cleanup;
cleanup:
goto closedown;
closedown:
if (close(h->fd) == -1) {
/* log errors as warnings here and keep on trucking */
warn("Closing file descriptor %d produced errno %d: %s\n",
h->fd, errno, strerror(errno));
}
delete_iohandle(h);
delete_iohandle(out);
delete_iohandle(out_handle);
}
void start_relay(struct sockaddr_in const *peername, int fd) {