From f966b37d7ade78cb2e500805112be64271ffea5c Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Mon, 27 Dec 2010 16:56:42 -0500 Subject: [PATCH] Initial commit --- .gitignore | 3 ++ Makefile | 18 +++++++ cmsg_private.h | 24 ++++++++++ harness.c | 125 +++++++++++++++++++++++++++++++++++++++++++++++++ harness.h | 23 +++++++++ main.c | 24 ++++++++++ net.c | 97 ++++++++++++++++++++++++++++++++++++++ net.h | 9 ++++ node.h | 14 ++++++ relay.c | 45 ++++++++++++++++++ relay.h | 6 +++ util.c | 62 ++++++++++++++++++++++++ 12 files changed, 450 insertions(+) create mode 100644 .gitignore create mode 100644 Makefile create mode 100644 cmsg_private.h create mode 100644 harness.c create mode 100644 harness.h create mode 100644 main.c create mode 100644 net.c create mode 100644 net.h create mode 100644 node.h create mode 100644 relay.c create mode 100644 relay.h create mode 100644 util.c diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..3f0df94 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +scratch/ +*.o +cmsg diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..b84df12 --- /dev/null +++ b/Makefile @@ -0,0 +1,18 @@ +TARGET = cmsg +OBJECTS = main.o harness.o net.o util.o relay.o + +CFLAGS = -D_XOPEN_SOURCE=600 -Wall -O0 -g +#CFLAGS = -D_XOPEN_SOURCE=600 -Wall -O3 + +all: $(TARGET) + +$(TARGET): $(OBJECTS) + $(CC) $(CFLAGS) -o $@ $(OBJECTS) -levent + +%.o: %.c + $(CC) $(CFLAGS) -c $< + +clean: + rm -f $(TARGET) + rm -f $(OBJECTS) + rm -rf *.dSYM diff --git a/cmsg_private.h b/cmsg_private.h new file mode 100644 index 0000000..37fc507 --- /dev/null +++ b/cmsg_private.h @@ -0,0 +1,24 @@ +#ifndef cmsg_private_h +#define cmsg_private_h + +typedef struct cmsg_bytes_t { + size_t len; + void *bytes; +} cmsg_bytes_t; + +#define EMPTY_BYTES ((cmsg_bytes_t) { NULL, 0 }) + +extern cmsg_bytes_t cmsg_cstring_bytes(char const *cstr); +extern cmsg_bytes_t cmsg_bytes_malloc_dup(cmsg_bytes_t src); +extern cmsg_bytes_t cmsg_bytes_malloc(size_t amount); +extern void cmsg_bytes_free(cmsg_bytes_t bytes); + +#define ICHECK(result, message) do { if ((result) == -1) { perror(message); exit(2); } } while (0) +#define BCHECK(result, message) do { if (!(result)) { perror(message); exit(2); } } while (0) +#define PCHECK(result, message) do { if (!(result)) { perror(message); exit(2); } } while (0) + +extern void die(char const *format, ...); +extern void warn(char const *format, ...); +extern void info(char const *format, ...); + +#endif diff --git a/harness.c b/harness.c new file mode 100644 index 0000000..0c9b840 --- /dev/null +++ b/harness.c @@ -0,0 +1,125 @@ +/* Copyright (C) 2010 Tony Garnock-Jones. All rights reserved. */ + +#include +#include +#include + +#include + +#include + +typedef unsigned char u_char; +#include + +#include "cmsg_private.h" +#include "harness.h" + +#ifdef __APPLE__ +/* Bollocks. Looks like OS X chokes unless STACK_SIZE is a multiple of 32k. */ +#define STACK_SIZE 32768 +#elif linux +#define STACK_SIZE 4096 +#else +#error Define STACK_SIZE for your platform. +#endif + +Process *current_process = NULL; + +static ucontext_t scheduler; +static ProcessQueue runlist = { 0, NULL, NULL }; +static ProcessQueue deadlist = { 0, NULL, NULL }; + +static void zero_queue(ProcessQueue *pq) { + pq->count = 0; + pq->head = NULL; + pq->tail = NULL; +} + +static void enqueue(ProcessQueue *pq, Process *p) { + p->link = NULL; + if (pq->head == NULL) { + pq->head = p; + } else { + pq->tail->link = p; + } + pq->tail = p; + pq->count++; +} + +static Process *dequeue(ProcessQueue *pq) { + if (pq->head == NULL) { + return NULL; + } else { + Process *p = pq->head; + pq->head = p->link; + if (pq->head == NULL) { + pq->tail = NULL; + } + pq->count--; + return p; + } +} + +void yield(void) { + if (current_process == NULL) { + ICHECK(setcontext(&scheduler), "yield setcontext"); + } else { + enqueue(&runlist, current_process); + ICHECK(swapcontext(¤t_process->context, &scheduler), "yield swapcontext"); + } +} + +void killproc(void) { + enqueue(&deadlist, current_process); + current_process = NULL; + yield(); +} + +static void driver(void (*f)(void *), void *arg) { + f(arg); + killproc(); +} + +void spawn(void (*f)(void *), void *arg) { + Process *p = calloc(1, sizeof(*p)); + PCHECK(p, "spawn calloc"); + + p->stack_base = malloc(STACK_SIZE); /* what is a sane value here? 32k for mac... */ + PCHECK(p->stack_base, "stack pointer malloc"); + + ICHECK(getcontext(&p->context), "spawn getcontext"); + p->context.uc_link = NULL; + p->context.uc_stack.ss_sp = p->stack_base; + p->context.uc_stack.ss_size = STACK_SIZE; + p->context.uc_stack.ss_flags = 0; + makecontext(&p->context, (void (*)(void)) driver, 2, f, arg); + + enqueue(&runlist, p); +} + +static void clean_dead_processes(void) { + Process *deadp; + while ((deadp = dequeue(&deadlist)) != NULL) { + free(deadp->stack_base); + free(deadp); + } +} + +void boot_harness(void) { + ICHECK(getcontext(&scheduler), "boot_harness getcontext"); + while (1) { + while (runlist.count) { + ProcessQueue work = runlist; + zero_queue(&runlist); + info("Processing %d jobs\n", work.count); + while ((current_process = dequeue(&work)) != NULL) { + ICHECK(swapcontext(&scheduler, ¤t_process->context), "boot_harness swapcontext"); + clean_dead_processes(); + } + info("Polling for events\n"); + event_loop(EVLOOP_NONBLOCK); + } + info("Blocking for events\n"); + event_loop(EVLOOP_ONCE); + } +} diff --git a/harness.h b/harness.h new file mode 100644 index 0000000..217069a --- /dev/null +++ b/harness.h @@ -0,0 +1,23 @@ +#ifndef cmsg_harness_h +#define cmsg_harness_h + +typedef struct Process { + ucontext_t context; + struct Process *link; + void *stack_base; +} Process; + +typedef struct ProcessQueue { + int count; + Process *head; + Process *tail; +} ProcessQueue; + +extern Process *current_process; + +extern void yield(void); +extern void spawn(void (*f)(void *), void *arg); + +extern void boot_harness(void); + +#endif diff --git a/main.c b/main.c new file mode 100644 index 0000000..ef7b606 --- /dev/null +++ b/main.c @@ -0,0 +1,24 @@ +/* Copyright (C) 2010 Tony Garnock-Jones. All rights reserved. */ + +#include +#include +#include + +#include +#include + +typedef unsigned char u_char; +#include + +#include "cmsg_private.h" +#include "harness.h" +#include "net.h" + +int main(int argc, char *argv[]) { + info("cmsg, Copyright (C) 2010 Tony Garnock-Jones. All rights reserved.\n"); + event_init(); + info("Using libevent version %s\n", event_get_version()); + start_net(5671); + boot_harness(); + return 0; +} diff --git a/net.c b/net.c new file mode 100644 index 0000000..90b532f --- /dev/null +++ b/net.c @@ -0,0 +1,97 @@ +/* Copyright (C) 2010 Tony Garnock-Jones. All rights reserved. */ + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +typedef unsigned char u_char; +#include + +#include "cmsg_private.h" +#include "relay.h" + +static struct event accept_event; + +void get_addr_name(char *namebuf, size_t buflen, struct sockaddr_in const *sin) { + unsigned char *addr = (unsigned char *) &sin->sin_addr.s_addr; + struct hostent *h = gethostbyaddr(addr, 4, AF_INET); + + if (h == NULL) { + snprintf(namebuf, buflen, "%u.%u.%u.%u", addr[0], addr[1], addr[2], addr[3]); + } else { + snprintf(namebuf, buflen, "%s", h->h_name); + } +} + +char const *endpoint_name(struct sockaddr_in const *peername) { + char name[256]; + static char result[256]; + get_addr_name(name, sizeof(name), peername); + snprintf(result, sizeof(result), "%s:%d", name, ntohs(peername->sin_port)); + return result; +} + +static void accept_connection(int servfd, short what, void *arg) { + struct sockaddr_in s; + socklen_t addrlen = sizeof(s); + int fd = accept(servfd, (struct sockaddr *) &s, &addrlen); + + if (fd == -1) { + if (errno != EAGAIN && errno != EINTR) { + warn("accept: errno %d (%s)\n", errno, strerror(errno)); + } + return; + } + + start_relay(&s, fd); +} + +void start_net(int listen_port) { + int servfd = socket(AF_INET, SOCK_STREAM, 0); + struct sockaddr_in s; + + if (servfd < 0) { + die("Could not open listen socket.\n"); + } + + s.sin_family = AF_INET; + s.sin_addr.s_addr = htonl(INADDR_ANY); + s.sin_port = htons(listen_port); + + { + int i = 1; + setsockopt(servfd, SOL_SOCKET, SO_REUSEADDR, &i, sizeof(i)); // don't care if this fails + } + + if (bind(servfd, (struct sockaddr *) &s, sizeof(s)) < 0) { + die("Could not bind listen socket.\n"); + } + + if (listen(servfd, 5) < 0) { + int savedErrno = errno; + die("Could not listen on listen socket (errno %d: %s).\n", + savedErrno, strerror(savedErrno)); + } + + event_set(&accept_event, servfd, EV_READ | EV_PERSIST, accept_connection, NULL); + if (event_add(&accept_event, NULL) == -1) { + die("Could not add accept_event."); + } + + info("Accepting connections on port %d.\n", listen_port); +} diff --git a/net.h b/net.h new file mode 100644 index 0000000..351bf2f --- /dev/null +++ b/net.h @@ -0,0 +1,9 @@ +#ifndef cmsg_net_h +#define cmsg_net_h + +extern void get_addr_name(char *namebuf, size_t buflen, struct sockaddr_in const *sin); +extern char const *endpoint_name(struct sockaddr_in const *peername); + +extern void start_net(int listen_port); + +#endif diff --git a/node.h b/node.h new file mode 100644 index 0000000..f917c4a --- /dev/null +++ b/node.h @@ -0,0 +1,14 @@ +#ifndef cmsg_node_h +#define cmsg_node_h + +typedef struct Node { + struct NodeClass *node_class; + cmsg_bytes_t name; /* used as (partial) routing key for metamessages */ +} Node; + +typedef struct NodeClass { + void (*destroy)(Node *n); + void (*handle_message)(Node *n, void *buffer, size_t len); +} NodeClass; + +#endif diff --git a/relay.c b/relay.c new file mode 100644 index 0000000..6f37e77 --- /dev/null +++ b/relay.c @@ -0,0 +1,45 @@ +/* Copyright (C) 2010 Tony Garnock-Jones. All rights reserved. */ + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +typedef unsigned char u_char; +#include + +#include "cmsg_private.h" +#include "harness.h" +#include "relay.h" +#include "net.h" + +void start_relay(struct sockaddr_in const *peername, int fd) { + /* + connstate_t *conn = calloc(1, sizeof(connstate_t)); + conn->peername = *peername; + conn->fd = fd; + conn->amqp_conn = amqp_new_connection(); + amqp_set_sockfd(conn->amqp_conn, fd); + conn->io = bufferevent_new(fd, + (evbuffercb) read_callback, + NULL, + (everrorcb) error_callback, + conn); + bufferevent_settimeout(conn->io, PROTOCOL_HEADER_TIMEOUT, 0); + bufferevent_enable(conn->io, EV_READ | EV_WRITE); + conn->state = CONNECTION_STATE_INITIAL; + conn->vhost = NULL; + */ + + info("Accepted connection from %s on fd %d\n", endpoint_name(peername), fd); +} diff --git a/relay.h b/relay.h new file mode 100644 index 0000000..dc7d92e --- /dev/null +++ b/relay.h @@ -0,0 +1,6 @@ +#ifndef cmsg_relay_h +#define cmsg_relay_h + +extern void start_relay(struct sockaddr_in const *peername, int fd); + +#endif diff --git a/util.c b/util.c new file mode 100644 index 0000000..f2d280b --- /dev/null +++ b/util.c @@ -0,0 +1,62 @@ +/* Copyright (C) 2010 Tony Garnock-Jones. All rights reserved. */ + +#include +#include +#include + +#include + +#include "cmsg_private.h" + +cmsg_bytes_t cmsg_cstring_bytes(char const *cstr) { + cmsg_bytes_t result; + result.len = strlen(cstr); + result.bytes = (void *) cstr; + return result; +} + +cmsg_bytes_t cmsg_bytes_malloc_dup(cmsg_bytes_t src) { + cmsg_bytes_t result; + result.len = src.len; + result.bytes = malloc(src.len); + if (result.bytes != NULL) { + memcpy(result.bytes, src.bytes, src.len); + } + return result; +} + +cmsg_bytes_t cmsg_bytes_malloc(size_t amount) { + cmsg_bytes_t result; + result.len = amount; + result.bytes = malloc(amount); + return result; +} + +void cmsg_bytes_free(cmsg_bytes_t bytes) { + free(bytes.bytes); +} + +void die(char const *format, ...) { + va_list vl; + va_start(vl, format); + fprintf(stderr, "ERROR: "); + vfprintf(stderr, format, vl); + va_end(vl); + exit(1); +} + +void warn(char const *format, ...) { + va_list vl; + va_start(vl, format); + fprintf(stderr, "WARNING: "); + vfprintf(stderr, format, vl); + va_end(vl); +} + +void info(char const *format, ...) { + va_list vl; + va_start(vl, format); + fprintf(stderr, "INFO: "); + vfprintf(stderr, format, vl); + va_end(vl); +}