diff --git a/experiments/cmsg/.gitignore b/experiments/cmsg/.gitignore new file mode 100644 index 0000000..8c0763f --- /dev/null +++ b/experiments/cmsg/.gitignore @@ -0,0 +1,5 @@ +*.o +messages.h +messages.c +cmsg +depend.mk diff --git a/experiments/cmsg/Makefile b/experiments/cmsg/Makefile new file mode 100644 index 0000000..d225937 --- /dev/null +++ b/experiments/cmsg/Makefile @@ -0,0 +1,45 @@ +TARGET = cmsg +OBJECTS = main.o harness.o net.o util.o relay.o hashtable.o dataq.o sexp.o sexpio.o node.o \ + queue.o direct.o fanout.o subscription.o meta.o messages.o + +UUID_CFLAGS:=$(shell uuid-config --cflags) +UUID_LDFLAGS:=$(shell uuid-config --ldflags) + +# grr +ifeq ($(shell uname -s),Darwin) +UUID_LIB=uuid +else +UUID_LIB=ossp-uuid +endif + +#CFLAGS = -D_XOPEN_SOURCE=600 -Wall -O0 -g $(UUID_CFLAGS) +CFLAGS = -D_XOPEN_SOURCE=600 -Wall -O3 $(UUID_CFLAGS) +#CFLAGS = -D_XOPEN_SOURCE=600 -Wall -O3 -static $(UUID_CFLAGS) + +all: $(TARGET) + +$(TARGET): $(OBJECTS) + $(CC) $(CFLAGS) $(UUID_LDFLAGS) -o $@ $(OBJECTS) -l$(UUID_LIB) -levent +# $(CC) $(CFLAGS) $(UUID_LDFLAGS) -o $@ $(OBJECTS) -l$(UUID_LIB) -levent -lrt + +%.o: %.c + $(CC) $(CFLAGS) -c $< + +messages.c: ../../protocol/messages.json codegen.py + python codegen.py body > $@ + +messages.h: ../../protocol/messages.json codegen.py + python codegen.py header > $@ + +clean: + rm -f $(TARGET) + rm -f $(OBJECTS) + rm -rf *.dSYM + rm -f depend.mk messages.c messages.h + +depend.mk: + touch messages.h + gcc $(CFLAGS) -M *.c > $@ + rm messages.h + echo "depend.mk:" Makefile *.c >> $@ +-include depend.mk diff --git a/experiments/cmsg/TODO b/experiments/cmsg/TODO new file mode 100644 index 0000000..8caef68 --- /dev/null +++ b/experiments/cmsg/TODO @@ -0,0 +1,59 @@ +Cope with possibility of duplicate uuid, in e.g. queue/fanout/direct. +If a subscription token matches an existing subscription, there's a +collision, so choose a new token until there's no collision. + +SAX-style sexp reader/writer, so that we can do something sensible for +enormous (e.g. gigabyte-sized) messages. + +The "meta" exchange probably wants to be a topic exchange. Or +something. + +The "meta" exchange probably wants to emit how-things-are-now messages +when people subscribe to it, to get them started; a kind of +last-value-cache type thing. + +Website + - http://www.flickr.com/photos/elemishra/158211069/ + +Switch from indirect scheduling to direct scheduling + - before, on walk: 173kHz test1/test3 + - after, no change. Probably because we only have a single thread + active in the system when running test1/test3, so every time some + work comes in, it baton-passes to the next guy in the chain. The + change *would* be an improvement but we have a separate worklist + (to process now) and runlist (to process after polling for events + once) so it always goes back to the scheduler because the worklist + is never longer than one. + - Revisit once we start testing multiple concurrent streams. + + +Extension to Sexps: + + - ! : binds the simplestring to the value when + seen. Note that the new binding for simplestring is NOT in scope + during parsing of the value, so no circular data can be constructed + this way. Any previous binding is discarded *after* the value is + completely read. + + - ? : retrieves the bound value of the simplestring. + +So + !1:a11:hello world(?1:a1: ?1:a1: ?1:a) +is equivalent to + (11:hello world1: 11:hello world1: 11:hello world) + +And, more to the point, after a receiver has seen + !1:a36:af49f5dc-0454-4ba1-9f48-a55e9c10ee35 +then a simple + ?1:a +is all that's needed to refer to that gargantuan UUID. + +Another example: + !1:a()!1:a(?1:a?1:a)!1:a(?1:a?1:a)?1:a +is equivalent to + ((()())(()())) + + +"Having metrics around as many things as possible really helped us + identify a difficult problem to diagnose." + -- https://github.com/blog/767-recent-services-interruptions diff --git a/experiments/cmsg/cmsg_private.h b/experiments/cmsg/cmsg_private.h new file mode 100644 index 0000000..b340dad --- /dev/null +++ b/experiments/cmsg/cmsg_private.h @@ -0,0 +1,55 @@ +/* Copyright 2010, 2011, 2012 Tony Garnock-Jones . + * + * This file is part of Hop. + * + * Hop is free software: you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Hop is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public + * License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Hop. If not, see . + */ +#ifndef cmsg_private_h +#define cmsg_private_h + +typedef struct cmsg_bytes_t { + size_t len; + unsigned char *bytes; +} cmsg_bytes_t; + +#define CMSG_BYTES(length, bytes_ptr) ((cmsg_bytes_t) { \ + .len = (length), \ + .bytes = (unsigned char *) (bytes_ptr) \ + }) +#define EMPTY_BYTES CMSG_BYTES(0, NULL) + +static inline cmsg_bytes_t cmsg_cstring_bytes(char const *cstr) { + cmsg_bytes_t result; + result.len = strlen(cstr); + result.bytes = (void *) cstr; + return result; +} + +#define CMSG_UUID_BUF_SIZE 36 +extern int gen_uuid(unsigned char *uuid_buf); /* must be exactly CMSG_UUID_BUF_SIZE bytes long */ + +extern cmsg_bytes_t cmsg_bytes_malloc_dup(cmsg_bytes_t src); +extern cmsg_bytes_t cmsg_bytes_malloc(size_t amount); +extern void cmsg_bytes_free(cmsg_bytes_t bytes); +extern int cmsg_bytes_cmp(cmsg_bytes_t a, cmsg_bytes_t b); + +#define ICHECK(result, message) do { if ((result) == -1) { perror(message); exit(2); } } while (0) +#define BCHECK(result, message) do { if ((result) == 0) { perror(message); exit(2); } } while (0) +#define PCHECK(result, message) do { if ((result) == NULL) { perror(message); exit(2); } } while (0) + +extern __attribute__((noreturn)) void die(char const *format, ...); +extern void warn(char const *format, ...); +extern void info(char const *format, ...); + +#endif diff --git a/experiments/cmsg/codegen.py b/experiments/cmsg/codegen.py new file mode 100644 index 0000000..fa1077e --- /dev/null +++ b/experiments/cmsg/codegen.py @@ -0,0 +1,129 @@ +from __future__ import with_statement + +## Copyright 2010, 2011, 2012 Tony Garnock-Jones . +## +## This file is part of Hop. +## +## Hop is free software: you can redistribute it and/or modify it +## under the terms of the GNU General Public License as published by +## the Free Software Foundation, either version 3 of the License, or +## (at your option) any later version. +## +## Hop is distributed in the hope that it will be useful, but WITHOUT +## ANY WARRANTY; without even the implied warranty of MERCHANTABILITY +## or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public +## License for more details. +## +## You should have received a copy of the GNU General Public License +## along with Hop. If not, see . + +copyright_stmt = \ +'''/* Copyright 2010, 2011, 2012 Tony Garnock-Jones . + * + * This file is part of Hop. + * + * Hop is free software: you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Hop is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public + * License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Hop. If not, see . + */ +''' + +import sys +import json + +def cify(s): + s = s.replace('-', '_') + s = s.replace(' ', '_') + return s + +class MessageType: + def __init__(self, j): + self.wire_selector = j['selector'] + self.selector = cify(self.wire_selector) + self.wire_argnames = j['args'] + self.argnames = map(cify, self.wire_argnames) + + def format_args(self, template, separator = ', '): + return separator.join([template % (x,) for x in self.argnames]) + +with file("../../protocol/messages.json") as f: + spec = map(MessageType, json.load(f)['definitions']) + +def entrypoint_header(): + print copyright_stmt + print + print '#ifndef cmsg_messages_h' + print '#define cmsg_messages_h' + print + print 'extern void init_messages(void);' + print + for t in spec: + print 'extern sexp_t *selector_%s;' % (t.selector,) + print + for t in spec: + print 'extern sexp_t *message_%s(%s);' % (t.selector, t.format_args('sexp_t *%s')) + print + print 'typedef union parsed_message_t_ {' + for t in spec: + if t.argnames: + print ' struct { sexp_t %s; } %s;' % (t.format_args('*%s'), t.selector) + print '} parsed_message_t;' + for t in spec: + print + print 'static inline int parse_%s(sexp_t *message, parsed_message_t *out) {' % \ + (t.selector,) + print ' if (!sexp_pairp(message)) return 0;' + print ' if (sexp_cmp(sexp_head(message), selector_%s) != 0) return 0;' % (t.selector,) + for n in t.argnames: + print ' if (!sexp_pseudo_pop(&message)) return 0;' + print ' out->%s.%s = sexp_head(message);' % (t.selector, n) + print ' return sexp_tail(message) == NULL;' + print '}' + print + print '#endif' + +def entrypoint_body(): + print copyright_stmt + print + print '#include ' + print '#include ' + print '#include ' + print '#include ' + print + print '#include ' + print + print '#include "cmsg_private.h"' + print '#include "ref.h"' + print '#include "sexp.h"' + print '#include "messages.h"' + print + for t in spec: + print 'sexp_t *selector_%s = NULL;' % (t.selector,) + print + print 'void init_messages(void) {' + for t in spec: + print ' selector_%s = sexp_cstring("%s");' % (t.selector, t.wire_selector) + for t in spec: + print ' INCREF(selector_%s);' % (t.selector,) + print '}' + for t in spec: + print + print 'sexp_t *message_%s(%s) {' % (t.selector, t.format_args('sexp_t *%s')) + print ' sexp_t *m = NULL;' + for n in reversed(t.argnames): + print ' m = sexp_cons(%s, m);' % (n,) + print ' return sexp_cons(selector_%s, m);' % (t.selector,) + print '}' + +if __name__ == '__main__': + drivername = sys.argv[1] + globals()['entrypoint_' + drivername]() diff --git a/experiments/cmsg/dataq.c b/experiments/cmsg/dataq.c new file mode 100644 index 0000000..9c4eaa2 --- /dev/null +++ b/experiments/cmsg/dataq.c @@ -0,0 +1,66 @@ +/* Copyright 2010, 2011, 2012 Tony Garnock-Jones . + * + * This file is part of Hop. + * + * Hop is free software: you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Hop is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public + * License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Hop. If not, see . + */ +#include +#include +#include +#include + +#include "dataq.h" + +#define QLINK(q,x) (*((void **)(((char *) x) + (q)->link_offset))) + +void enqueue(queue_t *q, void *x) { + QLINK(q, x) = NULL; + if (q->head == NULL) { + q->head = x; + } else { + QLINK(q, q->tail) = x; + } + q->tail = x; + q->count++; +} + +void *dequeue(queue_t *q) { + if (q->head == NULL) { + return NULL; + } else { + void *x = q->head; + q->head = QLINK(q, x); + QLINK(q, x) = NULL; + if (q->head == NULL) { + q->tail = NULL; + } + q->count--; + return x; + } +} + +void queue_append(queue_t *q1, queue_t *q2) { + assert(q1->link_offset == q2->link_offset); + + if (q2->head != NULL) { + if (q1->head != NULL) { + QLINK(q1, q1->tail) = q2->head; + } else { + q1->head = q2->head; + } + q1->tail = q2->tail; + q2->head = NULL; + q2->tail = NULL; + } +} diff --git a/experiments/cmsg/dataq.h b/experiments/cmsg/dataq.h new file mode 100644 index 0000000..33304fa --- /dev/null +++ b/experiments/cmsg/dataq.h @@ -0,0 +1,36 @@ +/* Copyright 2010, 2011, 2012 Tony Garnock-Jones . + * + * This file is part of Hop. + * + * Hop is free software: you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Hop is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public + * License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Hop. If not, see . + */ +#ifndef cmsg_dataq_h +#define cmsg_dataq_h + +typedef struct queue_t_ { + size_t link_offset; + int count; + void *head; + void *tail; +} queue_t; + +#define EMPTY_QUEUE(element_t, link_field_name) \ + ((queue_t) { offsetof(element_t, link_field_name), 0, NULL, NULL }) + +extern void enqueue(queue_t *q, void *x); +extern void *dequeue(queue_t *q); + +extern void queue_append(queue_t *q1, queue_t *q2); + +#endif diff --git a/experiments/cmsg/direct.c b/experiments/cmsg/direct.c new file mode 100644 index 0000000..01dfe04 --- /dev/null +++ b/experiments/cmsg/direct.c @@ -0,0 +1,125 @@ +/* Copyright 2010, 2011, 2012 Tony Garnock-Jones . + * + * This file is part of Hop. + * + * Hop is free software: you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Hop is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public + * License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Hop. If not, see . + */ +#include +#include +#include +#include +#include +#include + +#include + +#include + +#include "cmsg_private.h" +#include "harness.h" +#include "ref.h" +#include "sexp.h" +#include "hashtable.h" +#include "node.h" +#include "messages.h" +#include "subscription.h" +#include "sexpio.h" + +typedef struct direct_extension_t_ { + sexp_t *name; + hashtable_t routing_table; + hashtable_t subscriptions; +} direct_extension_t; + +static sexp_t *direct_extend(node_t *n, sexp_t *args) { + if ((sexp_length(args) == 1) && sexp_stringp(sexp_head(args))) { + cmsg_bytes_t name = sexp_data(sexp_head(args)); + direct_extension_t *d = calloc(1, sizeof(*d)); + d->name = INCREF(sexp_head(args)); + init_hashtable(&d->routing_table, 5, NULL, NULL); + init_hashtable(&d->subscriptions, 5, NULL, NULL); + + n->extension = d; + return bind_node(name, n) ? NULL : sexp_cstring("bind failed"); + } else { + return sexp_cstring("invalid args"); + } +} + +static void free_direct_chain(void *context, cmsg_bytes_t key, void *value) { + free_subscription_chain(value); +} + +static void direct_destructor(node_t *n) { + direct_extension_t *d = n->extension; + if (d != NULL) { /* can be NULL if direct_extend was given invalid args */ + DECREF(d->name, sexp_destructor); + hashtable_foreach(&d->routing_table, free_direct_chain, NULL); + destroy_hashtable(&d->routing_table); + destroy_hashtable(&d->subscriptions); + free(d); + } +} + +static void route_message(direct_extension_t *d, sexp_t *rk, sexp_t *body) { + subscription_t *chain = NULL; + subscription_t *newchain; + hashtable_get(&d->routing_table, sexp_data(rk), (void **) &chain); + newchain = send_to_subscription_chain(d->name, &d->subscriptions, chain, body); + if (newchain != chain) { + hashtable_put(&d->routing_table, sexp_data(rk), newchain); + } +} + +static void direct_handle_message(node_t *n, sexp_t *m) { + direct_extension_t *d = n->extension; + parsed_message_t p; + + if (parse_post(m, &p)) { + if (sexp_stringp(p.post.name)) { + route_message(d, p.post.name, p.post.body); + } else { + warn("Non-string routing key in direct\n"); + } + return; + } + + if (parse_subscribe(m, &p)) { + subscription_t *sub = handle_subscribe_message(d->name, &d->subscriptions, &p); + if (sub != NULL) { + hashtable_get(&d->routing_table, sexp_data(p.subscribe.filter), (void **) &sub->link); + hashtable_put(&d->routing_table, sexp_data(p.subscribe.filter), sub); + } + return; + } + + if (parse_unsubscribe(m, &p)) { + handle_unsubscribe_message(d->name, &d->subscriptions, &p); + return; + } + + warn("Message not understood in direct: "); + sexp_writeln(stderr_h, m); +} + +static node_class_t direct_class = { + .name = "direct", + .extend = direct_extend, + .destroy = direct_destructor, + .handle_message = direct_handle_message +}; + +void init_direct(void) { + register_node_class(&direct_class); +} diff --git a/experiments/cmsg/direct.h b/experiments/cmsg/direct.h new file mode 100644 index 0000000..9c5a18c --- /dev/null +++ b/experiments/cmsg/direct.h @@ -0,0 +1,23 @@ +/* Copyright 2010, 2011, 2012 Tony Garnock-Jones . + * + * This file is part of Hop. + * + * Hop is free software: you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Hop is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public + * License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Hop. If not, see . + */ +#ifndef cmsg_direct_h +#define cmsg_direct_h + +extern void init_direct(void); + +#endif diff --git a/experiments/cmsg/fanout.c b/experiments/cmsg/fanout.c new file mode 100644 index 0000000..acc4468 --- /dev/null +++ b/experiments/cmsg/fanout.c @@ -0,0 +1,113 @@ +/* Copyright 2010, 2011, 2012 Tony Garnock-Jones . + * + * This file is part of Hop. + * + * Hop is free software: you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Hop is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public + * License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Hop. If not, see . + */ +#include +#include +#include +#include +#include +#include + +#include + +#include + +#include "cmsg_private.h" +#include "harness.h" +#include "ref.h" +#include "sexp.h" +#include "hashtable.h" +#include "node.h" +#include "messages.h" +#include "subscription.h" +#include "sexpio.h" + +typedef struct fanout_extension_t_ { + sexp_t *name; + hashtable_t subscriptions; +} fanout_extension_t; + +static sexp_t *fanout_extend(node_t *n, sexp_t *args) { + if ((sexp_length(args) == 1) && sexp_stringp(sexp_head(args))) { + cmsg_bytes_t name = sexp_data(sexp_head(args)); + fanout_extension_t *f = calloc(1, sizeof(*f)); + f->name = INCREF(sexp_head(args)); + init_hashtable(&f->subscriptions, 5, NULL, (void (*)(void *)) free_subscription); + + n->extension = f; + return bind_node(name, n) ? NULL : sexp_cstring("bind failed"); + } else { + return sexp_cstring("invalid args"); + } +} + +static void fanout_destructor(node_t *n) { + fanout_extension_t *f = n->extension; + if (f != NULL) { /* can be NULL if fanout_extend was given invalid args */ + DECREF(f->name, sexp_destructor); + destroy_hashtable(&f->subscriptions); + free(f); + } +} + +struct delivery_context { + fanout_extension_t *f; + sexp_t *body; +}; + +static void send_to_sub(void *contextv, cmsg_bytes_t key, void *subv) { + struct delivery_context *context = contextv; + subscription_t *sub = subv; + send_to_subscription(context->f->name, &context->f->subscriptions, sub, context->body); +} + +static void fanout_handle_message(node_t *n, sexp_t *m) { + fanout_extension_t *f = n->extension; + parsed_message_t p; + + if (parse_post(m, &p)) { + struct delivery_context context; + context.f = f; + context.body = p.post.body; + hashtable_foreach(&f->subscriptions, send_to_sub, &context); + return; + } + + if (parse_subscribe(m, &p)) { + handle_subscribe_message(f->name, &f->subscriptions, &p); + return; + } + + if (parse_unsubscribe(m, &p)) { + handle_unsubscribe_message(f->name, &f->subscriptions, &p); + return; + } + + warn("Message not understood in fanout: "); + sexp_writeln(stderr_h, m); +} + +static node_class_t fanout_class = { + .name = "fanout", + .extend = fanout_extend, + .destroy = fanout_destructor, + .handle_message = fanout_handle_message +}; + +void init_fanout(void) { + register_node_class(&fanout_class); +} diff --git a/experiments/cmsg/fanout.h b/experiments/cmsg/fanout.h new file mode 100644 index 0000000..aa07c36 --- /dev/null +++ b/experiments/cmsg/fanout.h @@ -0,0 +1,23 @@ +/* Copyright 2010, 2011, 2012 Tony Garnock-Jones . + * + * This file is part of Hop. + * + * Hop is free software: you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Hop is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public + * License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Hop. If not, see . + */ +#ifndef cmsg_fanout_h +#define cmsg_fanout_h + +extern void init_fanout(void); + +#endif diff --git a/experiments/cmsg/harness.c b/experiments/cmsg/harness.c new file mode 100644 index 0000000..d193ea5 --- /dev/null +++ b/experiments/cmsg/harness.c @@ -0,0 +1,334 @@ +/* Copyright 2010, 2011, 2012 Tony Garnock-Jones . + * + * This file is part of Hop. + * + * Hop is free software: you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Hop is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public + * License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Hop. If not, see . + */ +#include +#include +#include +#include +#include + +#include + +#include + +typedef unsigned char u_char; +#include + +#include "cmsg_private.h" +#include "harness.h" +#include "dataq.h" + +#include + +/* What is a sane value for STACK_SIZE? */ +#ifdef __APPLE__ +/* Bollocks. Looks like OS X chokes unless STACK_SIZE is a multiple of 32k. */ +# include +# if !defined(MAC_OS_X_VERSION_10_6) || MAC_OS_X_VERSION_MAX_ALLOWED < MAC_OS_X_VERSION_10_6 +/* Hmm, and looks like 10.5 has more aggressive stack requirements than 10.6. */ +# define STACK_SIZE 65536 +# else +# define STACK_SIZE 32768 +# endif +#elif linux +# define STACK_SIZE 32768 +#else +# error Define STACK_SIZE for your platform. It should probably not be less than 32k? +#endif + +/* TODO: reuse stacks (via a freelist) */ +/* TODO: investigate avoiding syscall in swapcontext, setcontext etc. */ + +IOHandle *stdin_h = NULL; +IOHandle *stdout_h = NULL; +IOHandle *stderr_h = NULL; + +static volatile int harness_running = 1; +Process *current_process = NULL; + +#define EMPTY_PROCESS_QUEUE EMPTY_QUEUE(Process, link) + +static ucontext_t scheduler; +static queue_t runlist = EMPTY_PROCESS_QUEUE; +static queue_t deadlist = EMPTY_PROCESS_QUEUE; + +static void enqueue_runlist(Process *p) { + p->state = PROCESS_RUNNING; + enqueue(&runlist, p); +} + +static void schedule(void) { + //info("schedule %p\n", current_process); + if (current_process == NULL) { + ICHECK(setcontext(&scheduler), "schedule setcontext"); + } else { + ICHECK(swapcontext(¤t_process->context, &scheduler), "schedule swapcontext"); + } +} + +void yield(void) { + enqueue_runlist(current_process); + schedule(); +} + +void killproc(void) { + assert(current_process->state == PROCESS_RUNNING); + current_process->state = PROCESS_DEAD; + enqueue(&deadlist, current_process); + current_process = NULL; + schedule(); +} + +void suspend(void) { + assert(current_process->state == PROCESS_RUNNING); + current_process->state = PROCESS_WAITING; + schedule(); +} + +int resume(Process *p) { + if (p->state == PROCESS_WAITING) { + enqueue_runlist(p); + return 0; + } else { + return -1; + } +} + +static void driver(void (*f)(void *), void *arg) { + f(arg); + killproc(); +} + +Process *spawn(void (*f)(void *), void *arg) { + Process *p = calloc(1, sizeof(*p)); + PCHECK(p, "spawn calloc"); + + p->state = PROCESS_DEAD; + + p->stack_base = malloc(STACK_SIZE); + PCHECK(p->stack_base, "stack pointer malloc"); + + ICHECK(getcontext(&p->context), "spawn getcontext"); + p->context.uc_link = NULL; + p->context.uc_stack.ss_sp = p->stack_base; + p->context.uc_stack.ss_size = STACK_SIZE; + p->context.uc_stack.ss_flags = 0; + makecontext(&p->context, (void (*)(void)) driver, 2, f, arg); + + p->link = NULL; + + enqueue_runlist(p); + + return p; +} + +typedef struct nap_context_t_ { + Process *p; + int timeout_fired; +} nap_context_t; + +void nap_isr(int fd, short what, void *arg) { + nap_context_t *context = arg; + //info("nap_isr %p\n", p); + if ((context->p->state == PROCESS_WAITING) && (context->p->wait_flags & EV_TIMEOUT)) { + context->timeout_fired = 1; + enqueue_runlist(context->p); + } +} + +int nap(long millis) { + struct event ev; + struct timeval tv; + nap_context_t context; + assert(current_process != NULL); + assert(current_process->state == PROCESS_RUNNING); + context.p = current_process; + context.timeout_fired = 0; + tv.tv_sec = millis / 1000; + tv.tv_usec = (millis % 1000) * 1000; + evtimer_set(&ev, nap_isr, &context); + ICHECK(evtimer_add(&ev, &tv), "evtimer_add"); + current_process->state = PROCESS_WAITING; + current_process->wait_flags |= EV_TIMEOUT; + schedule(); + current_process->wait_flags &= ~EV_TIMEOUT; + evtimer_del(&ev); + return context.timeout_fired; +} + +static void awaken_waiters(IOHandle *h, short mask) { + Process *prev = NULL; + Process *p; + Process *next; + for (p = h->waiters; p != NULL; p = next) { + next = p->link; + assert(p->state == PROCESS_WAITING); + if ((p->wait_flags & mask) != 0) { + if (prev == NULL) { + h->waiters = next; + } else { + prev->link = next; + } + enqueue_runlist(p); + } else { + prev = p; + } + } +} + +static void input_isr(struct bufferevent *bufev, IOHandle *h) { + awaken_waiters(h, EV_READ); +} + +static void output_isr(struct bufferevent *bufev, IOHandle *h) { + awaken_waiters(h, EV_WRITE); +} + +static void error_isr(struct bufferevent *bufev, short what, IOHandle *h) { + unsigned short kind = what & ~(EVBUFFER_READ | EVBUFFER_WRITE); + int saved_errno = errno; + info("error_isr 0x%04X fd %d\n", what, h->fd); + h->error_direction = what & (EVBUFFER_READ | EVBUFFER_WRITE); + if (kind == EVBUFFER_EOF) { + h->eof = 1; + } else { + h->error_kind = kind; + h->error_errno = saved_errno; + } + awaken_waiters(h, EV_READ | EV_WRITE); +} + +IOHandle *new_iohandle(int fd) { + IOHandle *h = malloc(sizeof(*h)); + h->waiters = NULL; + h->fd = fd; + h->io = bufferevent_new(fd, + (evbuffercb) input_isr, + (evbuffercb) output_isr, + (everrorcb) error_isr, + h); + PCHECK(h->io, "bufferevent_new"); + bufferevent_setwatermark(h->io, EV_READ, 0, 256 * 1024); + h->eof = 0; + iohandle_clear_error(h); + return h; +} + +void delete_iohandle(IOHandle *h) { + if (h->waiters) { + warn("Deleting IOHandle %p with fd %d: processes are blocked on this handle!\n", + h, + h->fd); + } + bufferevent_free(h->io); + free(h); +} + +void iohandle_clear_error(IOHandle *h) { + h->error_direction = 0; + h->error_kind = 0; + h->error_errno = 0; +} + +static void block_on_io(IOHandle *h, short event) { + assert(current_process->link == NULL); + current_process->link = h->waiters; + h->waiters = current_process; + current_process->state = PROCESS_WAITING; + current_process->wait_flags |= event; + schedule(); + current_process->wait_flags &= ~event; +} + +cmsg_bytes_t iohandle_readwait(IOHandle *h, size_t at_least) { + while (EVBUFFER_LENGTH(h->io->input) < at_least) { + if (h->eof || h->error_kind) { + return EMPTY_BYTES; + } + ICHECK(bufferevent_enable(h->io, EV_READ), "bufferevent_enable"); + block_on_io(h, EV_READ); + ICHECK(bufferevent_disable(h->io, EV_READ), "bufferevent_disable"); + } + return CMSG_BYTES(EVBUFFER_LENGTH(h->io->input), EVBUFFER_DATA(h->io->input)); +} + +void iohandle_drain(IOHandle *h, size_t count) { + evbuffer_drain(h->io->input, count); +} + +void iohandle_write(IOHandle *h, cmsg_bytes_t buf) { + ICHECK(bufferevent_write(h->io, buf.bytes, buf.len), "bufferevent_write"); +} + +int iohandle_flush(IOHandle *h) { + while (EVBUFFER_LENGTH(h->io->output) > 0) { + if (h->error_kind) { + return -1; + } + block_on_io(h, EV_WRITE); + } + return 0; +} + +void iohandle_settimeout(IOHandle *h, int timeout_read, int timeout_write) { + bufferevent_settimeout(h->io, timeout_read, timeout_write); +} + +static void clean_dead_processes(void) { + Process *deadp; + while ((deadp = dequeue(&deadlist)) != NULL) { + free(deadp->stack_base); + free(deadp); + } +} + +void boot_harness(void) { + stdin_h = new_iohandle(0); + stdout_h = new_iohandle(1); + stderr_h = new_iohandle(2); + + ICHECK(getcontext(&scheduler), "boot_harness getcontext"); + + while (1) { + while (runlist.count) { + queue_t work = runlist; + runlist = EMPTY_PROCESS_QUEUE; + //info("Processing %d jobs\n", work.count); + while ((current_process = dequeue(&work)) != NULL) { + //info("entering %p\n", current_process); + ICHECK(swapcontext(&scheduler, ¤t_process->context), "boot_harness swapcontext"); + clean_dead_processes(); + } + //info("Polling for events\n"); + event_loop(EVLOOP_NONBLOCK); + } + if (!harness_running) break; + //info("Blocking for events\n"); + event_loop(EVLOOP_ONCE); + } + + info("Shutting down.\n"); + + delete_iohandle(stdin_h); + delete_iohandle(stdout_h); + delete_iohandle(stderr_h); +} + +void interrupt_harness(void) { + info("Interrupting harness\n"); + harness_running = 0; +} diff --git a/experiments/cmsg/harness.h b/experiments/cmsg/harness.h new file mode 100644 index 0000000..65eb14a --- /dev/null +++ b/experiments/cmsg/harness.h @@ -0,0 +1,72 @@ +/* Copyright 2010, 2011, 2012 Tony Garnock-Jones . + * + * This file is part of Hop. + * + * Hop is free software: you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Hop is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public + * License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Hop. If not, see . + */ +#ifndef cmsg_harness_h +#define cmsg_harness_h + +typedef void (*process_main_t)(void *); + +typedef enum process_state_t_ { + PROCESS_DEAD = 0, + PROCESS_RUNNING, + PROCESS_WAITING +} process_state_t; + +typedef struct Process { + process_state_t state; + int wait_flags; + void *stack_base; + ucontext_t context; + struct Process *link; +} Process; + +typedef struct IOHandle { + Process *waiters; + int fd; + struct bufferevent *io; + int eof; + unsigned short error_direction; + unsigned short error_kind; + int error_errno; +} IOHandle; + +extern IOHandle *stdin_h; +extern IOHandle *stdout_h; +extern IOHandle *stderr_h; + +extern Process *current_process; + +extern void yield(void); +extern Process *spawn(process_main_t f, void *arg); +extern int nap(long millis); /* 1 for timeout expired; 0 for resumed early */ + +extern void suspend(void); +extern int resume(Process *p); + +extern IOHandle *new_iohandle(int fd); +extern void delete_iohandle(IOHandle *h); +extern void iohandle_clear_error(IOHandle *h); +extern cmsg_bytes_t iohandle_readwait(IOHandle *h, size_t at_least); +extern void iohandle_drain(IOHandle *h, size_t count); +extern void iohandle_write(IOHandle *h, cmsg_bytes_t buf); +extern int iohandle_flush(IOHandle *h); +extern void iohandle_settimeout(IOHandle *h, int timeout_read, int timeout_write); + +extern void boot_harness(void); +extern void interrupt_harness(void); + +#endif diff --git a/experiments/cmsg/hashtable.c b/experiments/cmsg/hashtable.c new file mode 100644 index 0000000..566c4c2 --- /dev/null +++ b/experiments/cmsg/hashtable.c @@ -0,0 +1,159 @@ +/* Copyright 2010, 2011, 2012 Tony Garnock-Jones . + * + * This file is part of Hop. + * + * Hop is free software: you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Hop is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public + * License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Hop. If not, see . + */ +#include +#include +#include +#include + +#include + +#include "cmsg_private.h" +#include "hashtable.h" + +uint32_t hash_bytes(cmsg_bytes_t bytes) { + /* http://en.wikipedia.org/wiki/Jenkins_hash_function */ + uint32_t hash = 0; + size_t i; + + for (i = 0; i < bytes.len; i++) { + hash += bytes.bytes[i]; + hash += (hash << 10); + hash ^= (hash >> 6); + } + hash += (hash << 3); + hash ^= (hash >> 11); + hash += (hash << 15); + return hash; +} + +void init_hashtable(hashtable_t *table, + size_t initial_bucket_count, + void *(*dup_value)(void *), + void (*free_value)(void *)) +{ + table->bucket_count = initial_bucket_count; + table->entry_count = 0; + table->buckets = NULL; + table->dup_value = dup_value; + table->free_value = free_value; + + if (initial_bucket_count > 0) { + table->buckets = calloc(initial_bucket_count, sizeof(hashtable_entry_t *)); + } +} + +static void destroy_entry(hashtable_t *table, hashtable_entry_t *entry) { + cmsg_bytes_free(entry->key); + if (table->free_value != NULL) { + table->free_value(entry->value); + } + free(entry); +} + +void destroy_hashtable(hashtable_t *table) { + if (table->buckets != NULL) { + int i; + for (i = 0; i < table->bucket_count; i++) { + hashtable_entry_t *chain = table->buckets[i]; + table->buckets[i] = NULL; + while (chain != NULL) { + hashtable_entry_t *next = chain->next; + destroy_entry(table, chain); + chain = next; + } + } + free(table->buckets); + } +} + +static hashtable_entry_t **hashtable_find(hashtable_t *table, cmsg_bytes_t key) { + uint32_t h = hash_bytes(key) % table->bucket_count; + hashtable_entry_t **entryptr = &(table->buckets[h]); + hashtable_entry_t *entry = *entryptr; + while (entry != NULL) { + if ((entry->key.len == key.len) && !memcmp(entry->key.bytes, key.bytes, key.len)) { + break; + } + entryptr = &entry->next; + entry = *entryptr; + } + return entryptr; +} + +int hashtable_contains(hashtable_t *table, cmsg_bytes_t key) { + hashtable_entry_t **entryptr = hashtable_find(table, key); + return (*entryptr != NULL); +} + +int hashtable_get(hashtable_t *table, cmsg_bytes_t key, void **valueptr) { + hashtable_entry_t **entryptr = hashtable_find(table, key); + if (*entryptr == NULL) { + return 0; + } else { + *valueptr = (*entryptr)->value; + return 1; + } +} + +int hashtable_put(hashtable_t *table, cmsg_bytes_t key, void *value) { + /* TODO: grow and rehash */ + hashtable_entry_t **entryptr = hashtable_find(table, key); + if (*entryptr == NULL) { + hashtable_entry_t *entry = malloc(sizeof(hashtable_entry_t)); + entry->next = NULL; + entry->key = cmsg_bytes_malloc_dup(key); + entry->value = (table->dup_value == NULL) ? value : table->dup_value(value); + *entryptr = entry; + table->entry_count++; + return 1; + } else { + if (table->free_value != NULL) { + table->free_value((*entryptr)->value); + } + (*entryptr)->value = (table->dup_value == NULL) ? value : table->dup_value(value); + return 0; + } +} + +int hashtable_erase(hashtable_t *table, cmsg_bytes_t key) { + hashtable_entry_t **entryptr = hashtable_find(table, key); + if (*entryptr == NULL) { + return 0; + } else { + hashtable_entry_t *entry = *entryptr; + *entryptr = entry->next; + destroy_entry(table, entry); + table->entry_count--; + return 1; + } +} + +void hashtable_foreach(hashtable_t *table, + hashtable_iterator_t iterator, + void *context) +{ + int i; + for (i = 0; i < table->bucket_count; i++) { + hashtable_entry_t *chain = table->buckets[i]; + while (chain != NULL) { + hashtable_entry_t *next = chain->next; + iterator(context, chain->key, chain->value); + chain = next; + } + } +} diff --git a/experiments/cmsg/hashtable.h b/experiments/cmsg/hashtable.h new file mode 100644 index 0000000..dfcecfc --- /dev/null +++ b/experiments/cmsg/hashtable.h @@ -0,0 +1,53 @@ +/* Copyright 2010, 2011, 2012 Tony Garnock-Jones . + * + * This file is part of Hop. + * + * Hop is free software: you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Hop is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public + * License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Hop. If not, see . + */ +#ifndef cmsg_hashtable_h +#define cmsg_hashtable_h + +typedef struct hashtable_entry_t_ { + struct hashtable_entry_t_ *next; + cmsg_bytes_t key; + void *value; +} hashtable_entry_t; + +typedef struct hashtable_t_ { + size_t bucket_count; + size_t entry_count; + hashtable_entry_t **buckets; + void *(*dup_value)(void *); + void (*free_value)(void *); +} hashtable_t; + +typedef void (*hashtable_iterator_t)(void *context, cmsg_bytes_t key, void *value); + +extern uint32_t hash_bytes(cmsg_bytes_t bytes); + +extern void init_hashtable(hashtable_t *table, + size_t initial_bucket_count, + void *(*dup_value)(void *), + void (*free_value)(void *)); +extern void destroy_hashtable(hashtable_t *table); + +extern int hashtable_contains(hashtable_t *table, cmsg_bytes_t key); +extern int hashtable_get(hashtable_t *table, cmsg_bytes_t key, void **valueptr); +extern int hashtable_put(hashtable_t *table, cmsg_bytes_t key, void *value); +extern int hashtable_erase(hashtable_t *table, cmsg_bytes_t key); +extern void hashtable_foreach(hashtable_t *table, + hashtable_iterator_t iterator, + void *context); + +#endif diff --git a/experiments/cmsg/main.c b/experiments/cmsg/main.c new file mode 100644 index 0000000..e3f0e41 --- /dev/null +++ b/experiments/cmsg/main.c @@ -0,0 +1,125 @@ +/* Copyright 2010, 2011, 2012 Tony Garnock-Jones . + * + * This file is part of Hop. + * + * Hop is free software: you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Hop is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public + * License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Hop. If not, see . + */ +#include +#include +#include +#include + +#include +#include + +#include + +typedef unsigned char u_char; +#include + +#include "cmsg_private.h" +#include "harness.h" +#include "net.h" +#include "ref.h" +#include "sexp.h" +#include "hashtable.h" +#include "node.h" +#include "queue.h" +#include "direct.h" +#include "fanout.h" +#include "relay.h" +#include "meta.h" +#include "messages.h" +#include "sexpio.h" + +#define WANT_CONSOLE_LISTENER 1 + +static void factory_handle_message(node_t *n, sexp_t *m) { + parsed_message_t p; + + if (parse_create(m, &p)) { + if (sexp_stringp(p.create.classname) + && sexp_stringp(p.create.reply_sink) + && sexp_stringp(p.create.reply_name)) { + cmsg_bytes_t classname_bytes = sexp_data(p.create.classname); + node_class_t *nc = lookup_node_class(classname_bytes); + if (nc == NULL) { + warn("Node class not found <<%.*s>>\n", classname_bytes.len, classname_bytes.bytes); + } else { + sexp_t *error = NULL; + sexp_t *reply; + if (new_node(nc, p.create.arg, &error) != NULL) { + reply = message_create_ok(NULL); + } else { + reply = message_create_failed(error); + } + post_node(sexp_data(p.create.reply_sink), + sexp_data(p.create.reply_name), + reply, + sexp_empty_bytes); + } + } + return; + } + + warn("Message not understood in factory: "); + sexp_writeln(stderr_h, m); +} + +static node_class_t factory_class = { + .name = "factory", + .extend = NULL, + .destroy = NULL, + .handle_message = factory_handle_message +}; + +static void init_factory(void) { + bind_node(cmsg_cstring_bytes("factory"), new_node(&factory_class, NULL, NULL)); +} + +#if WANT_CONSOLE_LISTENER +static void console_listener(void *arg) { + IOHandle *in_handle = new_iohandle(0); + while (1) { + cmsg_bytes_t buf = iohandle_readwait(in_handle, 1); + if (buf.len == 0) break; + iohandle_drain(in_handle, buf.len); + } + delete_iohandle(in_handle); + interrupt_harness(); +} +#endif + +int main(int argc, char *argv[]) { + info("cmsg ALPHA, Copyright (C) 2010, 2011 Tony Garnock-Jones. All rights reserved.\n"); + event_init(); + signal(SIGPIPE, SIG_IGN); /* avoid EPIPE when connections drop unexpectedly */ + info("Using libevent version %s\n", event_get_version()); + init_sexp(); + init_messages(); + init_node(cmsg_cstring_bytes("server")); + init_factory(); + init_queue(); + init_direct(); + init_fanout(); + init_relay(); + init_meta(); +#if WANT_CONSOLE_LISTENER + spawn(console_listener, NULL); +#endif + start_net(5671); + boot_harness(); + done_sexp(); + return 0; +} diff --git a/experiments/cmsg/meta.c b/experiments/cmsg/meta.c new file mode 100644 index 0000000..5c8f443 --- /dev/null +++ b/experiments/cmsg/meta.c @@ -0,0 +1,54 @@ +/* Copyright 2010, 2011, 2012 Tony Garnock-Jones . + * + * This file is part of Hop. + * + * Hop is free software: you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Hop is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public + * License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Hop. If not, see . + */ +#include +#include +#include +#include +#include +#include + +#include + +#include "cmsg_private.h" +#include "ref.h" +#include "sexp.h" +#include "hashtable.h" +#include "node.h" +#include "meta.h" +#include "messages.h" + +void init_meta(void) { + sexp_t *args; + args = INCREF(sexp_cons(sexp_cstring("meta"), NULL)); + new_node(lookup_node_class(cmsg_cstring_bytes("direct")), args, NULL); + DECREF(args, sexp_destructor); +} + +void announce_subscription(sexp_t *source, + sexp_t *filter, + sexp_t *sink, + sexp_t *name, + int onoff) +{ + post_node(cmsg_cstring_bytes("meta"), + sexp_data(source), + onoff + ? message_subscribed(source, filter, sink, name) + : message_unsubscribed(source, filter, sink, name), + NULL); +} diff --git a/experiments/cmsg/meta.h b/experiments/cmsg/meta.h new file mode 100644 index 0000000..6d423df --- /dev/null +++ b/experiments/cmsg/meta.h @@ -0,0 +1,29 @@ +/* Copyright 2010, 2011, 2012 Tony Garnock-Jones . + * + * This file is part of Hop. + * + * Hop is free software: you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Hop is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public + * License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Hop. If not, see . + */ +#ifndef cmsg_meta_h +#define cmsg_meta_h + +extern void init_meta(void); + +extern void announce_subscription(sexp_t *source, + sexp_t *filter, + sexp_t *sink, + sexp_t *name, + int onoff); + +#endif diff --git a/experiments/cmsg/misc/direct_scheduling.patch b/experiments/cmsg/misc/direct_scheduling.patch new file mode 100644 index 0000000..ba52588 --- /dev/null +++ b/experiments/cmsg/misc/direct_scheduling.patch @@ -0,0 +1,74 @@ +diff --git a/harness.c b/harness.c +index 9c891b3..74061af 100644 +--- a/harness.c ++++ b/harness.c +@@ -50,18 +50,38 @@ Process *current_process = NULL; + static ucontext_t scheduler; + static queue_t runlist = EMPTY_PROCESS_QUEUE; + static queue_t deadlist = EMPTY_PROCESS_QUEUE; ++static queue_t current_worklist = EMPTY_PROCESS_QUEUE; + + static void enqueue_runlist(Process *p) { + p->state = PROCESS_RUNNING; + enqueue(&runlist, p); + } + ++static void clean_dead_processes(void) { ++ Process *deadp; ++ while ((deadp = dequeue(&deadlist)) != NULL) { ++ free(deadp->stack_base); ++ free(deadp); ++ } ++} ++ + static void schedule(void) { +- //info("schedule %p\n", current_process); + if (current_process == NULL) { + ICHECK(setcontext(&scheduler), "schedule setcontext"); + } else { +- ICHECK(swapcontext(¤t_process->context, &scheduler), "schedule swapcontext"); ++ Process *current = current_process; ++ Process *target_process = dequeue(¤t_worklist); ++ ucontext_t *target; ++ ++ if (target_process == NULL) { ++ target = &scheduler; ++ } else { ++ target = &target_process->context; ++ current_process = target_process; ++ } ++ ++ clean_dead_processes(); /* safe because we know we're not dead ourselves at this point */ ++ ICHECK(swapcontext(¤t->context, target), "schedule swapcontext"); + } + } + +@@ -255,14 +275,6 @@ void iohandle_settimeout(IOHandle *h, int timeout_read, int timeout_write) { + bufferevent_settimeout(h->io, timeout_read, timeout_write); + } + +-static void clean_dead_processes(void) { +- Process *deadp; +- while ((deadp = dequeue(&deadlist)) != NULL) { +- free(deadp->stack_base); +- free(deadp); +- } +-} +- + void boot_harness(void) { + stdin_h = new_iohandle(0); + stdout_h = new_iohandle(1); +@@ -272,10 +284,10 @@ void boot_harness(void) { + + while (1) { + while (runlist.count) { +- queue_t work = runlist; ++ current_worklist = runlist; + runlist = EMPTY_PROCESS_QUEUE; +- //info("Processing %d jobs\n", work.count); +- while ((current_process = dequeue(&work)) != NULL) { ++ //info("Processing %d jobs\n", current_worklist.count); ++ while ((current_process = dequeue(¤t_worklist)) != NULL) { + //info("entering %p\n", current_process); + ICHECK(swapcontext(&scheduler, ¤t_process->context), "boot_harness swapcontext"); + clean_dead_processes(); diff --git a/experiments/cmsg/misc/t0 b/experiments/cmsg/misc/t0 new file mode 100644 index 0000000..083b8e1 --- /dev/null +++ b/experiments/cmsg/misc/t0 @@ -0,0 +1,2 @@ +(9:subscribe5:test00:0:5:test05:login) +(4:post4:meta(9:subscribe0:5:test08:presence5:test01:k)0:) diff --git a/experiments/cmsg/misc/t1 b/experiments/cmsg/misc/t1 new file mode 100644 index 0000000..1687325 --- /dev/null +++ b/experiments/cmsg/misc/t1 @@ -0,0 +1,3 @@ +(9:subscribe5:test10:0:5:test15:login) +(4:post7:factory(6:create5:queue(2:q1)5:test11:k)0:) +(4:post2:q1(9:subscribe0:5:test18:consumer5:test11:k)0:) diff --git a/experiments/cmsg/misc/t2 b/experiments/cmsg/misc/t2 new file mode 100644 index 0000000..fc76e16 --- /dev/null +++ b/experiments/cmsg/misc/t2 @@ -0,0 +1,4 @@ +(9:subscribe5:test20:0:5:test25:login) +(4:post2:q1(4:post0:8:message10:)0:) +(4:post2:q1(4:post0:8:message20:)0:) +(11:unsubscribe5:test2) diff --git a/experiments/cmsg/misc/t4 b/experiments/cmsg/misc/t4 new file mode 100644 index 0000000..a304da2 --- /dev/null +++ b/experiments/cmsg/misc/t4 @@ -0,0 +1,4 @@ +(9:subscribe5:test40:0:5:test45:login) +(4:post7:factory(6:create6:direct(2:dx)5:test41:k)0:) +(4:post2:dx(9:subscribe1:a5:test48:consumer5:test41:k)0:) +(4:post2:dx(9:subscribe1:c5:test48:consumer5:test41:k)0:) diff --git a/experiments/cmsg/misc/t5 b/experiments/cmsg/misc/t5 new file mode 100644 index 0000000..12bedce --- /dev/null +++ b/experiments/cmsg/misc/t5 @@ -0,0 +1,7 @@ +(9:subscribe5:test50:0:5:test55:login) +(4:post7:factory(6:create6:direct(2:dx)5:test51:k)0:) +(4:post7:factory(6:create5:queue(2:q5)5:test51:k)0:) +(4:post2:q5(9:subscribe0:5:test59:consumer15:test51:k)0:) +(4:post2:q5(9:subscribe0:5:test59:consumer25:test51:k)0:) +(4:post2:dx(9:subscribe1:a2:q50:5:test51:k)0:) +(4:post2:dx(9:subscribe1:b2:q50:5:test51:k)0:) diff --git a/experiments/cmsg/misc/t6 b/experiments/cmsg/misc/t6 new file mode 100644 index 0000000..bda111c --- /dev/null +++ b/experiments/cmsg/misc/t6 @@ -0,0 +1,6 @@ +(9:subscribe5:test60:0:5:test65:login) +(4:post2:dx(4:post1:a9:messageA10:)0:) +(4:post2:dx(4:post1:a9:messageA20:)0:) +(4:post2:dx(4:post1:b8:messageB0:)0:) +(4:post2:dx(4:post1:c8:messageC0:)0:) +(11:unsubscribe5:test6) diff --git a/experiments/cmsg/net.c b/experiments/cmsg/net.c new file mode 100644 index 0000000..eb6c43c --- /dev/null +++ b/experiments/cmsg/net.c @@ -0,0 +1,116 @@ +/* Copyright 2010, 2011, 2012 Tony Garnock-Jones . + * + * This file is part of Hop. + * + * Hop is free software: you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Hop is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public + * License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Hop. If not, see . + */ +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +typedef unsigned char u_char; +#include + +#include "cmsg_private.h" +#include "relay.h" + +static struct event accept_event; + +void get_addr_name(char *namebuf, size_t buflen, struct sockaddr_in const *sin) { + unsigned char *addr = (unsigned char *) &sin->sin_addr.s_addr; + struct hostent *h = gethostbyaddr(addr, 4, AF_INET); + + if (h == NULL) { + snprintf(namebuf, buflen, "%u.%u.%u.%u", addr[0], addr[1], addr[2], addr[3]); + } else { + snprintf(namebuf, buflen, "%s", h->h_name); + } +} + +void endpoint_name(struct sockaddr_in const *peername, cmsg_bytes_t result) { + char name[256]; + get_addr_name(name, sizeof(name), peername); + snprintf((char *) result.bytes, result.len, "%s:%d", name, ntohs(peername->sin_port)); +} + +static void accept_connection(int servfd, short what, void *arg) { + struct sockaddr_in s; + socklen_t addrlen = sizeof(s); + int fd = accept(servfd, (struct sockaddr *) &s, &addrlen); + + if (fd == -1) { + if (errno != EAGAIN && errno != EINTR) { + warn("accept: errno %d (%s)\n", errno, strerror(errno)); + } + return; + } + + { + int i = 1; + ICHECK(setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &i, sizeof(i)), "setsockopt TCP_NODELAY"); + } + + start_relay(&s, fd); +} + +void start_net(int listen_port) { + int servfd = socket(AF_INET, SOCK_STREAM, 0); + struct sockaddr_in s; + + if (servfd < 0) { + die("Could not open listen socket.\n"); + } + + s.sin_family = AF_INET; + s.sin_addr.s_addr = htonl(INADDR_ANY); + s.sin_port = htons(listen_port); + + { + int i = 1; + setsockopt(servfd, SOL_SOCKET, SO_REUSEADDR, &i, sizeof(i)); // don't care if this fails + } + + if (bind(servfd, (struct sockaddr *) &s, sizeof(s)) < 0) { + die("Could not bind listen socket.\n"); + } + + if (listen(servfd, 5) < 0) { + int savedErrno = errno; + die("Could not listen on listen socket (errno %d: %s).\n", + savedErrno, strerror(savedErrno)); + } + + event_set(&accept_event, servfd, EV_READ | EV_PERSIST, accept_connection, NULL); + if (event_add(&accept_event, NULL) == -1) { + die("Could not add accept_event."); + } + + info("Accepting connections on port %d.\n", listen_port); +} diff --git a/experiments/cmsg/net.h b/experiments/cmsg/net.h new file mode 100644 index 0000000..15332e0 --- /dev/null +++ b/experiments/cmsg/net.h @@ -0,0 +1,26 @@ +/* Copyright 2010, 2011, 2012 Tony Garnock-Jones . + * + * This file is part of Hop. + * + * Hop is free software: you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Hop is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public + * License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Hop. If not, see . + */ +#ifndef cmsg_net_h +#define cmsg_net_h + +extern void get_addr_name(char *namebuf, size_t buflen, struct sockaddr_in const *sin); +extern void endpoint_name(struct sockaddr_in const *peername, cmsg_bytes_t result); + +extern void start_net(int listen_port); + +#endif diff --git a/experiments/cmsg/node.c b/experiments/cmsg/node.c new file mode 100644 index 0000000..f315aa7 --- /dev/null +++ b/experiments/cmsg/node.c @@ -0,0 +1,194 @@ +/* Copyright 2010, 2011, 2012 Tony Garnock-Jones . + * + * This file is part of Hop. + * + * Hop is free software: you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Hop is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public + * License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Hop. If not, see . + */ +#include +#include +#include +#include + +#include + +#include + +#include "cmsg_private.h" +#include "ref.h" +#include "sexp.h" +#include "harness.h" +#include "sexpio.h" +#include "hashtable.h" +#include "node.h" +#include "meta.h" +#include "messages.h" + +static cmsg_bytes_t _container_name; +static hashtable_t node_class_table; +static hashtable_t directory; + +static void *node_incref(void *arg) { + return INCREF((node_t *) arg); +} + +static void node_decref(void *arg) { + DECREF((node_t *) arg, node_destructor); +} + +void init_node(cmsg_bytes_t container_name) { + if (container_name.len == 0) { + unsigned char buf[CMSG_UUID_BUF_SIZE]; + gen_uuid(buf); + _container_name = cmsg_bytes_malloc_dup(CMSG_BYTES(CMSG_UUID_BUF_SIZE, buf)); + } else { + _container_name = cmsg_bytes_malloc_dup(container_name); + } + info("Local container name is <<%.*s>>\n", _container_name.len, _container_name.bytes); + + init_hashtable(&node_class_table, + 31, + NULL, + NULL); + init_hashtable(&directory, + 10007, + node_incref, + node_decref); +} + +cmsg_bytes_t local_container_name(void) { + return _container_name; +} + +void register_node_class(node_class_t *nc) { + cmsg_bytes_t key = cmsg_cstring_bytes(nc->name); + if (hashtable_contains(&node_class_table, key)) { + die("Duplicate node class name %s\n", nc->name); + } + hashtable_put(&node_class_table, key, nc); +} + +node_class_t *lookup_node_class(cmsg_bytes_t name) { + node_class_t *nc = NULL; + hashtable_get(&node_class_table, name, (void **) &nc); + return nc; +} + +static void init_node_names(node_t *n) { + init_hashtable(&n->names, 5, NULL, NULL); +} + +node_t *new_node(node_class_t *nc, sexp_t *args, sexp_t **error_out) { + node_t *n = malloc(sizeof(*n)); + n->refcount = ZERO_REFCOUNT(); + n->node_class = nc; + n->extension = NULL; + init_node_names(n); + if (nc->extend != NULL) { + sexp_t *error = nc->extend(n, args); + if (error != NULL) { + node_destructor(n); + if (error_out != NULL) { + *error_out = error; + } else { + warn("Creating node of class %s failed with ", nc->name); + sexp_writeln(stderr_h, error); + } + return NULL; + } + } + return n; +} + +void node_destructor(node_t *n) { + if (n->node_class->destroy != NULL) { + n->node_class->destroy(n); + } + unbind_all_names_for_node(n); + destroy_hashtable(&n->names); + free(n); +} + +node_t *lookup_node(cmsg_bytes_t name) { + node_t *n = NULL; + hashtable_get(&directory, name, (void **) &n); + return n; +} + +static void announce_binding(cmsg_bytes_t name, int onoff) { + sexp_t *filter = sexp_bytes(name); + INCREF(filter); + announce_subscription(sexp_empty_bytes, filter, sexp_empty_bytes, sexp_empty_bytes, onoff); + DECREF(filter, sexp_destructor); +} + +int bind_node(cmsg_bytes_t name, node_t *n) { + if (name.len == 0) { + warn("Binding to empty name forbidden\n"); + return 0; + } + if (hashtable_contains(&directory, name)) { + return 0; + } + hashtable_put(&directory, name, n); + hashtable_put(&n->names, name, NULL); + info("Binding node <<%.*s>> of class %s\n", name.len, name.bytes, n->node_class->name); + announce_binding(name, 1); + return 1; +} + +int unbind_node(cmsg_bytes_t name) { + node_t *n = NULL; + hashtable_get(&directory, name, (void **) &n); + if (n == NULL) { + return 0; + } else { + info("Unbinding node <<%.*s>> of class %s\n", name.len, name.bytes, n->node_class->name); + hashtable_erase(&n->names, name); + hashtable_erase(&directory, name); + announce_binding(name, 0); + return 1; + } +} + +static void unbind_on_destroy(void *context, cmsg_bytes_t key, void *value) { + unbind_node(key); +} + +void unbind_all_names_for_node(node_t *n) { + hashtable_t names = n->names; + init_node_names(n); + hashtable_foreach(&names, unbind_on_destroy, NULL); + destroy_hashtable(&names); +} + +int post_node(cmsg_bytes_t node, cmsg_bytes_t name, sexp_t *body, sexp_t *token) { + return send_node_release(node, message_post(sexp_bytes(name), body, token)); +} + +int send_node(cmsg_bytes_t node, sexp_t *message) { + node_t *n = lookup_node(node); + if (n == NULL) { + return 0; + } + n->node_class->handle_message(n, message); + return 1; +} + +int send_node_release(cmsg_bytes_t node, sexp_t *message) { + int result; + INCREF(message); + result = send_node(node, message); + DECREF(message, sexp_destructor); + return result; +} diff --git a/experiments/cmsg/node.h b/experiments/cmsg/node.h new file mode 100644 index 0000000..608f5b4 --- /dev/null +++ b/experiments/cmsg/node.h @@ -0,0 +1,60 @@ +/* Copyright 2010, 2011, 2012 Tony Garnock-Jones . + * + * This file is part of Hop. + * + * Hop is free software: you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Hop is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public + * License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Hop. If not, see . + */ +#ifndef cmsg_node_h +#define cmsg_node_h + +typedef struct node_t_ { + refcount_t refcount; + struct node_class_t_ *node_class; + hashtable_t names; + void *extension; /* Each node class puts something different here in its instances */ +} node_t; + +typedef sexp_t *(*node_extension_fn_t)(node_t *n, sexp_t *args); +typedef void (*node_destructor_fn_t)(node_t *n); +typedef void (*node_message_handler_fn_t)(node_t *n, sexp_t *m); + +typedef struct node_class_t_ { + char const *name; + node_extension_fn_t extend; + node_destructor_fn_t destroy; + node_message_handler_fn_t handle_message; +} node_class_t; + +extern void init_node(cmsg_bytes_t container_name); + +extern cmsg_bytes_t local_container_name(void); + +extern void basic_node_destroy(node_t *n); + +extern void register_node_class(node_class_t *nc); +extern node_class_t *lookup_node_class(cmsg_bytes_t name); + +extern node_t *new_node(node_class_t *nc, sexp_t *args, sexp_t **error_out); +extern void node_destructor(node_t *n); + +extern node_t *lookup_node(cmsg_bytes_t name); +extern int bind_node(cmsg_bytes_t name, node_t *n); +extern int unbind_node(cmsg_bytes_t name); +extern void unbind_all_names_for_node(node_t *n); + +extern int post_node(cmsg_bytes_t node, cmsg_bytes_t name, sexp_t *body, sexp_t *token); +extern int send_node(cmsg_bytes_t node, sexp_t *message); +extern int send_node_release(cmsg_bytes_t node, sexp_t *message); + +#endif diff --git a/experiments/cmsg/queue.c b/experiments/cmsg/queue.c new file mode 100644 index 0000000..9ec2fcb --- /dev/null +++ b/experiments/cmsg/queue.c @@ -0,0 +1,243 @@ +/* Copyright 2010, 2011, 2012 Tony Garnock-Jones . + * + * This file is part of Hop. + * + * Hop is free software: you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Hop is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public + * License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Hop. If not, see . + */ +#include +#include +#include +#include +#include +#include + +#include + +#include + +#include "cmsg_private.h" +#include "harness.h" +#include "ref.h" +#include "sexp.h" +#include "sexpio.h" +#include "hashtable.h" +#include "node.h" +#include "queue.h" +#include "dataq.h" +#include "messages.h" +#include "subscription.h" + +typedef struct queue_extension_t_ { + sexp_t *name; + sexp_t *backlog_q; + queue_t waiter_q; + hashtable_t subscriptions; + Process *shovel; + int shovel_awake; +} queue_extension_t; + +static sexp_t *queue_extend(node_t *n, sexp_t *args) { + if ((sexp_length(args) == 1) && sexp_stringp(sexp_head(args))) { + cmsg_bytes_t name = sexp_data(sexp_head(args)); + queue_extension_t *q = calloc(1, sizeof(*q)); + q->name = INCREF(sexp_head(args)); + q->backlog_q = INCREF(sexp_new_queue()); + q->waiter_q = EMPTY_QUEUE(subscription_t, link); + init_hashtable(&q->subscriptions, 5, NULL, NULL); + q->shovel = NULL; + q->shovel_awake = 0; + + n->extension = q; + return bind_node(name, n) ? NULL : sexp_cstring("bind failed"); + } else { + return sexp_cstring("invalid args"); + } +} + +static void queue_destructor(node_t *n) { + queue_extension_t *q = n->extension; + if (q != NULL) { /* can be NULL if queue_extend was given invalid args */ + DECREF(q->name, sexp_destructor); + DECREF(q->backlog_q, sexp_destructor); + { + subscription_t *sub = NULL; + while ((sub = dequeue(&q->waiter_q)) != NULL) { + free_subscription(sub); + } + } + destroy_hashtable(&q->subscriptions); + if (q->shovel) { + warn("TODO: the shovel needs to be taken down as well here\n"); + /* The difficulty is that the shovel may be running at the + moment, so careful ordering of operations is required to + avoid referencing deallocated memory. */ + } + free(q); + } +} + +static void end_burst(queue_extension_t *q, size_t *burst_count_ptr, size_t total_count) { +#if 0 + if (*burst_count_ptr > 0) { + info("Queue <<%.*s>>: burst count %lu; total %lu\n", + sexp_data(q->name).len, sexp_data(q->name).bytes, + *burst_count_ptr, total_count); + } +#endif + *burst_count_ptr = 0; +} + +static void shoveller(void *qv) { + queue_extension_t *q = qv; + + size_t burst_count = 0; + size_t total_count = 0; + sexp_t *body = NULL; /* held */ + subscription_t *sub = NULL; + + { + cmsg_bytes_t n = sexp_data(q->name); + info("Queue <<%.*s>> busy. Shoveller entering\n", n.len, n.bytes); + } + + check_for_work: + //info("Checking for work\n"); + + if (sexp_queue_emptyp(q->backlog_q)) { + //info("Backlog empty\n"); + goto wait_and_shovel; + } + + body = INCREF(sexp_dequeue(q->backlog_q)); /* held */ + + find_valid_waiter: + if (q->waiter_q.count == 0) { + //info("No waiters\n"); + sexp_queue_pushback(q->backlog_q, body); + DECREF(body, sexp_destructor); + goto wait_and_shovel; + } + + sub = dequeue(&q->waiter_q); + + /* + info("Delivering to <<%.*s>>/<<%.*s>>...\n", + sexp_data(sub->sink).len, sexp_data(sub->sink).bytes, + sexp_data(sub->name).len, sexp_data(sub->name).bytes); + */ + + if (!send_to_subscription(q->name, &q->subscriptions, sub, body)) { + goto find_valid_waiter; + } + + burst_count++; + total_count++; + + //info("Delivery successful\n"); + DECREF(body, sexp_destructor); + enqueue(&q->waiter_q, sub); + + if (burst_count >= 10000) { + end_burst(q, &burst_count, total_count); + yield(); + } + + goto check_for_work; + + wait_and_shovel: + end_burst(q, &burst_count, total_count); + //info("Waiting for throck\n"); + q->shovel_awake = 0; + /* TODO: if the number of active processes is large, assume we have + memory pressure, and quit the shovel early rather than waiting + for a few milliseconds to see if we're idle. */ + if (nap(100)) { + cmsg_bytes_t n = sexp_data(q->name); + info("Queue <<%.*s>> idle. Shoveller exiting\n", n.len, n.bytes); + q->shovel = NULL; + return; + } + //info("Throck received!\n"); + goto check_for_work; +} + +static void throck_shovel(queue_extension_t *q) { + //int counter = 0; + retry: + //printf("throck %d %d %p\n", counter++, q->shovel_awake, q->shovel); + if (!q->shovel_awake) { + if (!q->shovel) { + q->shovel_awake = 1; + q->shovel = spawn(shoveller, q); + } else { + if (resume(q->shovel) == -1) { + /* The nap() in the shoveller returned and scheduled the + shoveller *just* before we got to it, but the shoveller + hasn't had a chance to run yet, so hasn't been able to + clear q->shovel and exit. The resume() attempt failed + because q->shovel's state is PROCESS_RUNNING, now that it + has been scheduled by the return of nap(), so we know that + we should back off and try again from the top. */ + yield(); + goto retry; + } else { + /* The resume() was successful, i.e. the nap() hadn't returned + before we tried to resume(). We know that nap() will return + zero (since the timeout didn't fire before the process was + resumed), and so the existing shoveller will continue + running. */ + q->shovel_awake = 1; + } + } + } +} + +static void queue_handle_message(node_t *n, sexp_t *m) { + queue_extension_t *q = n->extension; + parsed_message_t p; + + if (parse_post(m, &p)) { + sexp_enqueue(q->backlog_q, p.post.body); + throck_shovel(q); + return; + } + + if (parse_subscribe(m, &p)) { + subscription_t *sub = handle_subscribe_message(q->name, &q->subscriptions, &p); + if (sub != NULL) { + enqueue(&q->waiter_q, sub); + throck_shovel(q); + } + return; + } + + if (parse_unsubscribe(m, &p)) { + handle_unsubscribe_message(q->name, &q->subscriptions, &p); + return; + } + + warn("Message not understood in queue: "); + sexp_writeln(stderr_h, m); +} + +static node_class_t queue_class = { + .name = "queue", + .extend = queue_extend, + .destroy = queue_destructor, + .handle_message = queue_handle_message +}; + +void init_queue(void) { + register_node_class(&queue_class); +} diff --git a/experiments/cmsg/queue.h b/experiments/cmsg/queue.h new file mode 100644 index 0000000..0ab7de1 --- /dev/null +++ b/experiments/cmsg/queue.h @@ -0,0 +1,23 @@ +/* Copyright 2010, 2011, 2012 Tony Garnock-Jones . + * + * This file is part of Hop. + * + * Hop is free software: you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Hop is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public + * License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Hop. If not, see . + */ +#ifndef cmsg_queue_h +#define cmsg_queue_h + +extern void init_queue(void); + +#endif diff --git a/experiments/cmsg/ref.h b/experiments/cmsg/ref.h new file mode 100644 index 0000000..e7e3d65 --- /dev/null +++ b/experiments/cmsg/ref.h @@ -0,0 +1,56 @@ +/* Copyright 2010, 2011, 2012 Tony Garnock-Jones . + * + * This file is part of Hop. + * + * Hop is free software: you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Hop is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public + * License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Hop. If not, see . + */ +#ifndef cmsg_ref_h +#define cmsg_ref_h + +typedef struct refcount_t_ { + unsigned int count; +} refcount_t; + +#define ZERO_REFCOUNT() ((refcount_t) { .count = 0 }) + +#define INCREF(x) ({ \ + typeof(x) __x = (x); \ + if (__x != NULL) { \ + __x->refcount.count++; \ + } \ + __x; \ + }) + +#define UNGRAB(x) ({ \ + typeof(x) __x = (x); \ + if (__x != NULL) { \ + assert(__x->refcount.count); \ + __x->refcount.count--; \ + } \ + __x; \ + }) + +#define DECREF(x, dtor) ({ \ + typeof(x) __x = (x); \ + if (__x != NULL) { \ + assert(__x->refcount.count); \ + (__x->refcount.count)--; \ + if (__x->refcount.count == 0) { \ + (dtor)(__x); \ + } \ + } \ + (typeof(__x)) 0; \ + }) + +#endif diff --git a/experiments/cmsg/relay.c b/experiments/cmsg/relay.c new file mode 100644 index 0000000..ebf322f --- /dev/null +++ b/experiments/cmsg/relay.c @@ -0,0 +1,234 @@ +/* Copyright 2010, 2011, 2012 Tony Garnock-Jones . + * + * This file is part of Hop. + * + * Hop is free software: you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Hop is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public + * License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Hop. If not, see . + */ +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +typedef unsigned char u_char; +#include + +#include "cmsg_private.h" +#include "harness.h" +#include "relay.h" +#include "net.h" +#include "ref.h" +#include "sexp.h" +#include "sexpio.h" +#include "hashtable.h" +#include "node.h" +#include "messages.h" + +#define WANT_MESSAGE_TRACE 0 + +typedef struct relay_extension_t_ { + struct sockaddr_in peername; + char peername_str[256]; + sexp_t *remote_container_name; + int fd; + IOHandle *outh; +} relay_extension_t; + +static long connection_count = 0; + +static void stats_printer(void *arg) { + while (1) { + info("%ld connections active\n", connection_count); + nap(1000); + } +} + +void init_relay(void) { + spawn(stats_printer, NULL); +} + +static sexp_t *relay_extend(node_t *n, sexp_t *args) { + /* TODO: outbound connections; args==NULL -> server relay, nonNULL -> outbound. */ + n->extension = calloc(1, sizeof(relay_extension_t)); + return NULL; +} + +static void relay_destructor(node_t *n) { + relay_extension_t *r = n->extension; + delete_iohandle(r->outh); + r->outh = NULL; + if (close(r->fd) == -1) { + /* log errors as warnings here and keep on trucking */ + warn("Closing file descriptor %d produced errno %d: %s\n", + r->fd, errno, strerror(errno)); + } + DECREF(r->remote_container_name, sexp_destructor); + free(r); +} + +static void relay_handle_message(node_t *n, sexp_t *m) { + relay_extension_t *r = n->extension; + +#if WANT_MESSAGE_TRACE + info("fd %d <-- ", r->fd); + sexp_writeln(stderr_h, m); +#endif + + BCHECK(!sexp_write(r->outh, m), "relay_handle_message sexp_write"); +} + +static node_class_t relay_class = { + .name = "relay", + .extend = relay_extend, + .destroy = relay_destructor, + .handle_message = relay_handle_message +}; + +static void send_error(IOHandle *h, char const *message, sexp_t *details) { + sexp_t *m = message_error(sexp_cstring(message), details); + INCREF(m); + warn("Sending error: "); + sexp_writeln(stderr_h, m); + iohandle_clear_error(h); + BCHECK(!sexp_write(h, m), "send_error sexp_write"); + DECREF(m, sexp_destructor); + iohandle_flush(h); /* ignore result here, there's not much we can do with it */ +} + +static void send_sexp_syntax_error(IOHandle *h, char const *message) { + char const *url = "http://people.csail.mit.edu/rivest/Sexp.txt"; + send_error(h, message, sexp_cstring(url)); +} + +static void relay_main(node_t *n) { + relay_extension_t *r = n->extension; + IOHandle *inh = new_iohandle(r->fd); + sexp_t *message = NULL; /* held */ + parsed_message_t p; + + INCREF(n); /* because the caller doesn't hold a ref, and we need to + drop ours on our death */ + + info("Accepted connection from %s on fd %d\n", r->peername_str, r->fd); + connection_count++; + + iohandle_write(r->outh, cmsg_cstring_bytes("(3:hop1:0)")); + ICHECK(iohandle_flush(r->outh), "iohandle_flush greeting"); + + { + sexp_t *s = message_subscribe(sexp_bytes(local_container_name()), + sexp_empty_bytes, sexp_empty_bytes, + sexp_empty_bytes, sexp_empty_bytes); + INCREF(s); + sexp_write(r->outh, s); + DECREF(s, sexp_destructor); + } + + //iohandle_settimeout(r->inh, 3, 0); + + while (1) { + DECREF(message, sexp_destructor); + message = NULL; + if (!sexp_read(inh, &message)) goto network_error; + INCREF(message); + +#if WANT_MESSAGE_TRACE + info("fd %d --> ", r->fd); + sexp_writeln(stderr_h, message); +#endif + + if (parse_post(message, &p) && sexp_stringp(p.post.name)) { + cmsg_bytes_t nodename = sexp_data(p.post.name); + if (!send_node(nodename, p.post.body)) { + warn("Was asked to post to unknown node <<%.*s>>\n", nodename.len, nodename.bytes); + } + } else if (parse_subscribe(message, &p) + && sexp_stringp(p.subscribe.filter) + && sexp_stringp(p.subscribe.reply_sink) + && sexp_stringp(p.subscribe.reply_name)) { + if (bind_node(sexp_data(p.subscribe.filter), n)) { + post_node(sexp_data(p.subscribe.reply_sink), + sexp_data(p.subscribe.reply_name), + message_subscribe_ok(p.subscribe.filter), + sexp_empty_bytes); + + DECREF(r->remote_container_name, sexp_destructor); + r->remote_container_name = INCREF(p.subscribe.filter); + } else { + cmsg_bytes_t filter = sexp_data(p.subscribe.filter); + warn("Bind failed <<%.*s>>\n", filter.len, filter.bytes); + } + } else if (parse_unsubscribe(message, &p) + && sexp_stringp(p.unsubscribe.token)) { + cmsg_bytes_t id = sexp_data(p.unsubscribe.token); + if (!unbind_node(id)) { + warn("Unbind failed <<%.*s>>\n", id.len, id.bytes); + } + } else { + send_error(r->outh, "message not understood", message); + goto protocol_error; + } + } + + network_error: + if (inh->eof) { + info("Disconnecting fd %d normally.\n", r->fd); + } else { + switch (inh->error_kind) { + case SEXP_ERROR_OVERFLOW: + send_sexp_syntax_error(r->outh, "sexp too big"); + break; + + case SEXP_ERROR_SYNTAX: + send_sexp_syntax_error(r->outh, "sexp syntax error"); + break; + + default: + warn("Relay handle error 0x%04X on fd %d: %d, %s\n", + inh->error_kind, r->fd, inh->error_errno, strerror(inh->error_errno)); + break; + } + } + + protocol_error: + DECREF(message, sexp_destructor); + delete_iohandle(inh); + unbind_all_names_for_node(n); + DECREF(n, node_destructor); + + connection_count--; +} + +void start_relay(struct sockaddr_in const *peername, int fd) { + node_t *n = new_node(&relay_class, NULL, NULL); + relay_extension_t *r = n->extension; + r->peername = *peername; + endpoint_name(&r->peername, CMSG_BYTES(sizeof(r->peername_str), r->peername_str)); + r->remote_container_name = NULL; + r->fd = fd; + r->outh = new_iohandle(r->fd); + spawn((process_main_t) relay_main, n); +} diff --git a/experiments/cmsg/relay.h b/experiments/cmsg/relay.h new file mode 100644 index 0000000..4402a9d --- /dev/null +++ b/experiments/cmsg/relay.h @@ -0,0 +1,25 @@ +/* Copyright 2010, 2011, 2012 Tony Garnock-Jones . + * + * This file is part of Hop. + * + * Hop is free software: you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Hop is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public + * License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Hop. If not, see . + */ +#ifndef cmsg_relay_h +#define cmsg_relay_h + +extern void init_relay(void); + +extern void start_relay(struct sockaddr_in const *peername, int fd); + +#endif diff --git a/experiments/cmsg/sexp.c b/experiments/cmsg/sexp.c new file mode 100644 index 0000000..cc5af20 --- /dev/null +++ b/experiments/cmsg/sexp.c @@ -0,0 +1,255 @@ +/* Copyright 2010, 2011, 2012 Tony Garnock-Jones . + * + * This file is part of Hop. + * + * Hop is free software: you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Hop is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public + * License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Hop. If not, see . + */ +#include +#include +#include + +#include + +#include "cmsg_private.h" +#include "ref.h" +#include "sexp.h" + +static sexp_t *freelist = NULL; + +sexp_t *sexp_empty_bytes = NULL; + +void init_sexp(void) { + sexp_empty_bytes = INCREF(sexp_cstring("")); +} + +void done_sexp(void) { + int count = 0; + + DECREF(sexp_empty_bytes, sexp_destructor); + sexp_empty_bytes = NULL; + + while (freelist != NULL) { + sexp_t *x = freelist; + freelist = x->data.pair.tail; + free(x); + count++; + } + info("Released %d cached sexp shells.\n", count); +} + +static inline sexp_t *alloc_shell(sexp_type_t kind) { + sexp_t *x = freelist; + if (x == NULL) { + x = malloc(sizeof(*x)); + } else { + freelist = x->data.pair.tail; + } + x->refcount = ZERO_REFCOUNT(); + x->kind = kind; + return x; +} + +static inline void release_shell(sexp_t *x) { + x->data.pair.tail = freelist; + freelist = x; +} + +sexp_t *sexp_incref(sexp_t *x) { + return INCREF(x); +} + +sexp_t *sexp_decref(sexp_t *x) { + return DECREF(x, sexp_destructor); +} + +void sexp_data_destructor(sexp_data_t *data) { + cmsg_bytes_free(data->data); + free(data); +} + +void sexp_destructor(sexp_t *x) { + tail_recursion: + switch (x->kind) { + case SEXP_BYTES: + cmsg_bytes_free(x->data.bytes); + break; + case SEXP_SLICE: + DECREF(x->data.slice.data, sexp_data_destructor); + break; + case SEXP_DISPLAY_HINT: + case SEXP_PAIR: { + sexp_t *next = x->data.pair.tail; + DECREF(x->data.pair.head, sexp_destructor); + if (next != NULL) { + if (next->refcount.count == 1) { + release_shell(x); + x = next; + goto tail_recursion; + } else { + DECREF(next, sexp_destructor); + } + } + break; + } + default: + die("Unknown sexp kind %d in dtor\n", x->kind); + } + release_shell(x); +} + +sexp_data_t *sexp_data_copy(cmsg_bytes_t body, size_t offset, size_t length) { + assert(offset + length <= body.len); + return sexp_data_alias(cmsg_bytes_malloc_dup(CMSG_BYTES(length, body.bytes + offset))); +} + +sexp_data_t *sexp_data_alias(cmsg_bytes_t body) { + sexp_data_t *data = malloc(sizeof(*data)); + data->refcount = ZERO_REFCOUNT(); + data->data = body; + return data; +} + +sexp_t *sexp_cstring(char const *str) { + return sexp_bytes(cmsg_cstring_bytes(str)); +} + +sexp_t *sexp_bytes(cmsg_bytes_t bytes) { + sexp_t *x = alloc_shell(SEXP_BYTES); + x->data.bytes = cmsg_bytes_malloc_dup(bytes); + return x; +} + +sexp_t *sexp_slice(sexp_data_t *data, size_t offset, size_t length) { + sexp_t *x = alloc_shell(SEXP_SLICE); + x->data.slice.data = INCREF(data); + x->data.slice.offset = offset; + x->data.slice.length = length; + return x; +} + +sexp_t *sexp_display_hint(sexp_t *hint, sexp_t *body) { + sexp_t *x = alloc_shell(SEXP_DISPLAY_HINT); + assert(sexp_simple_stringp(hint)); + assert(sexp_simple_stringp(body)); + x->data.pair.head = INCREF(hint); + x->data.pair.tail = INCREF(body); + return x; +} + +sexp_t *sexp_cons(sexp_t *head, sexp_t *tail) { + sexp_t *x = alloc_shell(SEXP_PAIR); + x->data.pair.head = INCREF(head); + x->data.pair.tail = INCREF(tail); + return x; +} + +cmsg_bytes_t sexp_data(sexp_t *x) { + restart: + switch (x->kind) { + case SEXP_BYTES: + return x->data.bytes; + case SEXP_SLICE: + return CMSG_BYTES(x->data.slice.length, + x->data.slice.data->data.bytes + x->data.slice.offset); + case SEXP_DISPLAY_HINT: + x = x->data.pair.tail; + goto restart; + default: + die("Unknown sexp kind %d in data accessor\n", x->kind); + } +} + +int sexp_cmp(sexp_t *a, sexp_t *b) { + tail: + if (a == b) return 0; + if (sexp_stringp(a) && sexp_stringp(b)) { + return cmsg_bytes_cmp(sexp_data(a), sexp_data(b)); + } + if (sexp_pairp(a) && sexp_pairp(b)) { + int result = sexp_cmp(sexp_head(a), sexp_head(b)); + if (result) return result; + a = sexp_tail(a); + b = sexp_tail(b); + goto tail; + } + if (a == NULL) return -1; + if (b == NULL) return 1; + if (a->kind < b->kind) return -1; + return 1; +} + +sexp_t *sexp_assoc(sexp_t *list, cmsg_bytes_t key) { + while (list != NULL) { + sexp_t *candidate = sexp_head(list); + if (sexp_stringp(candidate)) { + cmsg_bytes_t candidate_data = sexp_data(candidate); + if ((candidate_data.len == key.len) + && (!memcmp(candidate_data.bytes, key.bytes, key.len))) { + return candidate; + } + } + list = sexp_tail(list); + } + return NULL; +} + +size_t sexp_length(sexp_t *list) { + size_t result = 0; + while (sexp_pairp(list)) { + result++; + list = sexp_tail(list); + } + return result; +} + +sexp_t *sexp_new_queue(void) { + return sexp_cons(NULL, NULL); +} + +int sexp_queue_emptyp(sexp_t *q) { + return sexp_head(q) == NULL; +} + +void sexp_queue_pushback(sexp_t *q, sexp_t *x) { + sexp_t *cell = sexp_cons(x, sexp_head(q)); + if (sexp_head(q) == NULL) { + sexp_settail(q, cell); + } + sexp_sethead(q, cell); +} + +void sexp_enqueue(sexp_t *q, sexp_t *x) { + sexp_t *cell = sexp_cons(x, NULL); + if (sexp_head(q) == NULL) { + sexp_sethead(q, cell); + } else { + sexp_settail(sexp_tail(q), cell); + } + sexp_settail(q, cell); +} + +sexp_t *sexp_dequeue(sexp_t *q) { + if (sexp_head(q) == NULL) { + return NULL; + } else { + sexp_t *x = INCREF(sexp_head(sexp_head(q))); + sexp_t *cell = sexp_tail(sexp_head(q)); + sexp_sethead(q, cell); + if (cell == NULL) { + sexp_settail(q, NULL); + } + UNGRAB(x); + return x; + } +} diff --git a/experiments/cmsg/sexp.h b/experiments/cmsg/sexp.h new file mode 100644 index 0000000..2e9e233 --- /dev/null +++ b/experiments/cmsg/sexp.h @@ -0,0 +1,162 @@ +/* Copyright 2010, 2011, 2012 Tony Garnock-Jones . + * + * This file is part of Hop. + * + * Hop is free software: you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Hop is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public + * License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Hop. If not, see . + */ +#ifndef cmsg_sexp_h +#define cmsg_sexp_h + +typedef struct sexp_data_t_ { + refcount_t refcount; + cmsg_bytes_t data; +} sexp_data_t; + +typedef enum sexp_type_t_ { + SEXP_BYTES, + SEXP_SLICE, + SEXP_DISPLAY_HINT, + SEXP_PAIR +} sexp_type_t; + +typedef struct sexp_t_ { + refcount_t refcount; + sexp_type_t kind; + union { + cmsg_bytes_t bytes; + struct { + sexp_data_t *data; + size_t offset; + size_t length; + } slice; + struct { + struct sexp_t_ *head; + struct sexp_t_ *tail; + } pair; /* and display-hint */ + } data; +} sexp_t; + +extern sexp_t *sexp_empty_bytes; + +extern void init_sexp(void); +extern void done_sexp(void); + +extern sexp_t *sexp_incref(sexp_t *x); +extern sexp_t *sexp_decref(sexp_t *x); + +extern void sexp_data_destructor(sexp_data_t *data); +extern void sexp_destructor(sexp_t *x); + +extern sexp_data_t *sexp_data_copy(cmsg_bytes_t body, size_t offset, size_t length); +extern sexp_data_t *sexp_data_alias(cmsg_bytes_t body); + +extern sexp_t *sexp_cstring(char const *str); +extern sexp_t *sexp_bytes(cmsg_bytes_t bytes); +extern sexp_t *sexp_slice(sexp_data_t *data, size_t offset, size_t length); +extern sexp_t *sexp_display_hint(sexp_t *hint, sexp_t *body); +extern sexp_t *sexp_cons(sexp_t *head, sexp_t *tail); + +static inline int sexp_simple_stringp(sexp_t *x) { + return (x != NULL) && ((x->kind == SEXP_BYTES) || (x->kind == SEXP_SLICE)); +} + +static inline int sexp_stringp(sexp_t *x) { + return sexp_simple_stringp(x) || ((x != NULL) && (x->kind == SEXP_DISPLAY_HINT)); +} + +static inline int sexp_pairp(sexp_t *x) { + return (x != NULL) && (x->kind == SEXP_PAIR); +} + +extern cmsg_bytes_t sexp_data(sexp_t *x); +extern int sexp_cmp(sexp_t *a, sexp_t *b); + +static inline sexp_t *sexp_head(sexp_t *x) { + assert(x->kind == SEXP_PAIR); + return x->data.pair.head; +} + +static inline sexp_t *sexp_tail(sexp_t *x) { + assert(x->kind == SEXP_PAIR); + return x->data.pair.tail; +} + +static inline sexp_t *sexp_hint(sexp_t *x) { + assert(x->kind == SEXP_DISPLAY_HINT); + return x->data.pair.head; +} + +static inline sexp_t *sexp_body(sexp_t *x) { + assert(x->kind == SEXP_DISPLAY_HINT); + return x->data.pair.tail; +} + +#define sexp_setter_(settername,fieldname) \ + static inline sexp_t *settername(sexp_t *x, sexp_t *y) { \ + sexp_t *old; \ + assert(x->kind == SEXP_PAIR); \ + INCREF(y); \ + old = x->data.pair.fieldname; \ + x->data.pair.fieldname = y; \ + DECREF(old, sexp_destructor); \ + return x; \ + } + +sexp_setter_(sexp_sethead, head) +sexp_setter_(sexp_settail, tail) + +static inline sexp_t *sexp_push(sexp_t *oldstack, sexp_t *val) { + sexp_t *newstack = INCREF(sexp_cons(val, oldstack)); + DECREF(oldstack, sexp_destructor); + return newstack; +} + +static inline sexp_t *sexp_pop(sexp_t *oldstack, sexp_t **valp) { + sexp_t *nextstack = INCREF(sexp_tail(oldstack)); + sexp_t *val = INCREF(sexp_head(oldstack)); + DECREF(oldstack, sexp_destructor); + UNGRAB(val); + if (valp != NULL) { + *valp = val; + } + return nextstack; +} + +static inline int sexp_pseudo_pop(sexp_t **px) { + *px = sexp_tail(*px); + return sexp_pairp(*px); +} + +static inline sexp_t *sexp_listtail(sexp_t *list, size_t dropcount) { + while (dropcount) { + list = sexp_tail(list); + dropcount--; + } + return list; +} + +static inline sexp_t *sexp_listref(sexp_t *list, size_t index) { + return sexp_head(sexp_listtail(list, index)); +} + +extern sexp_t *sexp_assoc(sexp_t *list, cmsg_bytes_t key); +extern size_t sexp_length(sexp_t *list); + +extern sexp_t *sexp_new_queue(void); +extern int sexp_queue_emptyp(sexp_t *q); +extern void sexp_queue_pushback(sexp_t *q, sexp_t *x); +extern void sexp_enqueue(sexp_t *q, sexp_t *x); +extern sexp_t *sexp_dequeue(sexp_t *q); + +#endif diff --git a/experiments/cmsg/sexpio.c b/experiments/cmsg/sexpio.c new file mode 100644 index 0000000..88f4ea3 --- /dev/null +++ b/experiments/cmsg/sexpio.c @@ -0,0 +1,247 @@ +/* Copyright 2010, 2011, 2012 Tony Garnock-Jones . + * + * This file is part of Hop. + * + * Hop is free software: you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Hop is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public + * License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Hop. If not, see . + */ +#include +#include +#include +#include + +#include + +#include + +#include "cmsg_private.h" +#include "ref.h" +#include "sexp.h" +#include "harness.h" +#include "sexpio.h" + +/* TODO: limit size of individual simple strings */ +/* TODO: limit nesting of sexps */ + +static sexp_t *read_simple_string(IOHandle *h, cmsg_bytes_t buf) { + int i = 0; + sexp_t *result; + + while (1) { + buf = iohandle_readwait(h, buf.len + 1); + if (buf.len == 0) return NULL; + /* Don't reset i to zero: avoids scanning the beginning of the + number repeatedly */ + + while (i < buf.len) { + if (i > 10) { + /* More than ten digits of length prefix. We're unlikely to be + able to cope with anything that large. */ + h->error_kind = SEXP_ERROR_OVERFLOW; + return NULL; + } + if (buf.bytes[i] == ':') { + size_t count; + buf.bytes[i] = '\0'; + count = atoi((char *) buf.bytes); + iohandle_drain(h, i + 1); + buf = iohandle_readwait(h, count); + if (buf.len < count) { + /* Error or EOF. */ + return NULL; + } + buf.len = count; + result = sexp_bytes(buf); + iohandle_drain(h, count); + return result; + } + + if (!isdigit(buf.bytes[i])) { + h->error_kind = SEXP_ERROR_SYNTAX; + return NULL; + } + + i++; + } + } +} + +sexp_t *sexp_read_atom(IOHandle *h) { + return read_simple_string(h, EMPTY_BYTES); +} + +#define READ1 \ + buf = iohandle_readwait(h, 1); \ + if (buf.len == 0) goto error; + +int sexp_read(IOHandle *h, sexp_t **result_ptr) { + cmsg_bytes_t buf; + sexp_t *stack = NULL; /* held */ + sexp_t *hint = NULL; /* held */ + sexp_t *body = NULL; /* held */ + sexp_t *accumulator = NULL; /* not held */ + + while (1) { + READ1; + switch (buf.bytes[0]) { + case '[': { + iohandle_drain(h, 1); + hint = INCREF(read_simple_string(h, EMPTY_BYTES)); + if (hint == NULL) goto error; + READ1; + if (buf.bytes[0] != ']') { + h->error_kind = SEXP_ERROR_SYNTAX; + goto error; + } + iohandle_drain(h, 1); + skip_whitespace_in_display_hint: + READ1; + if (isspace(buf.bytes[0])) { + iohandle_drain(h, 1); + goto skip_whitespace_in_display_hint; + } + body = INCREF(read_simple_string(h, EMPTY_BYTES)); + if (body == NULL) goto error; + accumulator = sexp_display_hint(hint, body); + DECREF(hint, sexp_destructor); /* these could be UNGRABs */ + DECREF(body, sexp_destructor); + break; + } + + case '(': + iohandle_drain(h, 1); + stack = sexp_push(stack, sexp_cons(NULL, NULL)); + continue; + + case ')': { + sexp_t *current; + if (stack == NULL) { + h->error_kind = SEXP_ERROR_SYNTAX; + goto error; + } + stack = sexp_pop(stack, ¤t); + INCREF(current); + iohandle_drain(h, 1); + accumulator = INCREF(sexp_head(current)); + DECREF(current, sexp_destructor); + UNGRAB(accumulator); + break; + } + + default: + if (isspace(buf.bytes[0])) { + iohandle_drain(h, 1); + continue; + } + buf.len = 1; /* needed to avoid reading too much in read_simple_string */ + accumulator = read_simple_string(h, buf); + if (accumulator == NULL) goto error; + break; + } + + if (stack == NULL) { + *result_ptr = accumulator; + return 1; + } else { + sexp_t *current = sexp_head(stack); /* not held */ + sexp_t *cell = sexp_cons(accumulator, NULL); + if (sexp_tail(current) == NULL) { + sexp_sethead(current, cell); + } else { + sexp_settail(sexp_tail(current), cell); + } + sexp_settail(current, cell); + } + } + + error: + DECREF(stack, sexp_destructor); + DECREF(hint, sexp_destructor); + DECREF(body, sexp_destructor); + return 0; +} + +void write_simple_string(IOHandle *h, sexp_t *x) { + cmsg_bytes_t data = sexp_data(x); + char lenstr[16]; + snprintf(lenstr, sizeof(lenstr), "%u:", (unsigned int) data.len); + lenstr[sizeof(lenstr) - 1] = '\0'; + iohandle_write(h, cmsg_cstring_bytes(lenstr)); + iohandle_write(h, data); +} + +unsigned short sexp_write(IOHandle *h, sexp_t *x) { + sexp_t *stack = NULL; /* held */ + sexp_t *current = x; + + write1: + if (current == NULL) { + iohandle_write(h, cmsg_cstring_bytes("()")); + } else { + switch (current->kind) { + case SEXP_BYTES: + case SEXP_SLICE: + write_simple_string(h, current); + break; + + case SEXP_DISPLAY_HINT: + iohandle_write(h, cmsg_cstring_bytes("[")); + write_simple_string(h, sexp_hint(current)); + iohandle_write(h, cmsg_cstring_bytes("]")); + write_simple_string(h, sexp_body(current)); + break; + + case SEXP_PAIR: + iohandle_write(h, cmsg_cstring_bytes("(")); + stack = sexp_push(stack, current); + break; + + default: + die("Unknown sexp kind %d in sexp_write\n", current->kind); + } + } + + check_stack: + if (stack == NULL) { + return 0; + } + + { + sexp_t *cell = sexp_head(stack); + if (cell == NULL) { + iohandle_write(h, cmsg_cstring_bytes(")")); + stack = sexp_pop(stack, NULL); /* no need to worry about incref/decref: val is NULL! */ + goto check_stack; + } + + if (sexp_pairp(cell)) { + current = sexp_head(cell); + sexp_sethead(stack, sexp_tail(cell)); + goto write1; + } + + return SEXP_ERROR_SYNTAX; + } +} + +unsigned short sexp_writeln(IOHandle *h, sexp_t *x) { + unsigned short result; + + fflush(NULL); + result = sexp_write(h, x); + if (result == 0) { + iohandle_write(h, cmsg_cstring_bytes("\n")); + ICHECK(iohandle_flush(h), "sexp_writeln iohandle_flush"); + } + return result; +} diff --git a/experiments/cmsg/sexpio.h b/experiments/cmsg/sexpio.h new file mode 100644 index 0000000..b9a6dfc --- /dev/null +++ b/experiments/cmsg/sexpio.h @@ -0,0 +1,29 @@ +/* Copyright 2010, 2011, 2012 Tony Garnock-Jones . + * + * This file is part of Hop. + * + * Hop is free software: you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Hop is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public + * License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Hop. If not, see . + */ +#ifndef cmsg_sexpio_h +#define cmsg_sexpio_h + +#define SEXP_ERROR_OVERFLOW 0x8000 +#define SEXP_ERROR_SYNTAX 0x8001 + +extern sexp_t *sexp_read_atom(IOHandle *h); +extern int sexp_read(IOHandle *h, sexp_t **result_ptr); +extern unsigned short sexp_write(IOHandle *h, sexp_t *x); +extern unsigned short sexp_writeln(IOHandle *h, sexp_t *x); + +#endif diff --git a/experiments/cmsg/subscription.c b/experiments/cmsg/subscription.c new file mode 100644 index 0000000..24636b1 --- /dev/null +++ b/experiments/cmsg/subscription.c @@ -0,0 +1,158 @@ +/* Copyright 2010, 2011, 2012 Tony Garnock-Jones . + * + * This file is part of Hop. + * + * Hop is free software: you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Hop is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public + * License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Hop. If not, see . + */ +#include +#include +#include +#include +#include +#include + +#include + +#include + +#include "cmsg_private.h" +#include "ref.h" +#include "sexp.h" +#include "hashtable.h" +#include "node.h" +#include "meta.h" +#include "messages.h" +#include "subscription.h" + +void free_subscription(subscription_t *sub) { + DECREF(sub->uuid, sexp_destructor); + DECREF(sub->filter, sexp_destructor); + DECREF(sub->sink, sexp_destructor); + DECREF(sub->name, sexp_destructor); + free(sub); +} + +void free_subscription_chain(subscription_t *chain) { + while (chain != NULL) { + subscription_t *next = chain->link; + free_subscription(chain); + chain = next; + } +} + +/* Returns true if the subscription has not been unsubscribed and the + destination of the subscription exists. */ +int send_to_subscription(sexp_t *source, + hashtable_t *subscriptions, + subscription_t *sub, + sexp_t *body) +{ + if (sub->uuid == NULL) { + free_subscription(sub); + return 0; + } else if (!post_node(sexp_data(sub->sink), sexp_data(sub->name), body, sub->uuid)) { + announce_subscription(source, sub->filter, sub->sink, sub->name, 0); + hashtable_erase(subscriptions, sexp_data(sub->uuid)); + free_subscription(sub); + return 0; + } else { + return 1; + } +} + +subscription_t *send_to_subscription_chain(sexp_t *source, + hashtable_t *subscriptions, + subscription_t *chain, + sexp_t *body) +{ + subscription_t *top = chain; + subscription_t *prev = NULL; + while (chain != NULL) { + subscription_t *next = chain->link; + if (!send_to_subscription(source, subscriptions, chain, body)) { + if (prev == NULL) { + top = next; + } else { + prev->link = next; + } + } + prev = chain; + chain = next; + } + return top; +} + +subscription_t *handle_subscribe_message(sexp_t *source, + hashtable_t *subscriptions, + parsed_message_t *p) +{ + unsigned char uuid[CMSG_UUID_BUF_SIZE]; + if (gen_uuid(uuid) != 0) { + warn("Could not generate UUID\n"); + return NULL; + } else { + subscription_t *sub = malloc(sizeof(*sub)); + + sub->uuid = INCREF(sexp_bytes(CMSG_BYTES(sizeof(uuid), uuid))); + sub->filter = p->subscribe.filter; + sub->sink = p->subscribe.sink; + sub->name = p->subscribe.name; + sub->link = NULL; + + if (!sexp_stringp(sub->filter) || !sexp_stringp(sub->sink) || !sexp_stringp(sub->name) + || !sexp_stringp(p->subscribe.reply_sink) || !sexp_stringp(p->subscribe.reply_name)) { + DECREF(sub->uuid, sexp_destructor); + free(sub); + warn("Bad sink/name/reply_sink/reply_name in subscribe"); + return NULL; + } + + INCREF(sub->filter); + INCREF(sub->sink); + INCREF(sub->name); + + hashtable_put(subscriptions, sexp_data(sub->uuid), sub); + + announce_subscription(source, sub->filter, sub->sink, sub->name, 1); + + post_node(sexp_data(p->subscribe.reply_sink), + sexp_data(p->subscribe.reply_name), + message_subscribe_ok(sub->uuid), + sexp_empty_bytes); + + return sub; + } +} + +void handle_unsubscribe_message(sexp_t *source, + hashtable_t *subscriptions, + parsed_message_t *p) +{ + cmsg_bytes_t uuid; + subscription_t *sub; + + if (!sexp_stringp(p->unsubscribe.token)) { + warn("Invalid unsubscription\n"); + return; + } + + uuid = sexp_data(p->unsubscribe.token); + if (hashtable_get(subscriptions, uuid, (void **) &sub)) { + /* TODO: clean up more eagerly perhaps? */ + announce_subscription(source, sub->filter, sub->sink, sub->name, 0); + DECREF(sub->uuid, sexp_destructor); + sub->uuid = NULL; + hashtable_erase(subscriptions, uuid); + } +} diff --git a/experiments/cmsg/subscription.h b/experiments/cmsg/subscription.h new file mode 100644 index 0000000..f48db3f --- /dev/null +++ b/experiments/cmsg/subscription.h @@ -0,0 +1,49 @@ +/* Copyright 2010, 2011, 2012 Tony Garnock-Jones . + * + * This file is part of Hop. + * + * Hop is free software: you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Hop is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public + * License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Hop. If not, see . + */ +#ifndef cmsg_subscription_h +#define cmsg_subscription_h + +typedef struct subscription_t_ { + sexp_t *uuid; + sexp_t *filter; + sexp_t *sink; + sexp_t *name; + struct subscription_t_ *link; +} subscription_t; + +extern void free_subscription(subscription_t *sub); +extern void free_subscription_chain(subscription_t *chain); + +extern int send_to_subscription(sexp_t *source, + hashtable_t *subscriptions, + subscription_t *sub, + sexp_t *body); +extern subscription_t *send_to_subscription_chain(sexp_t *source, + hashtable_t *subscriptions, + subscription_t *chain, + sexp_t *body); + +extern subscription_t *handle_subscribe_message(sexp_t *source, + hashtable_t *subscriptions, + parsed_message_t *p); + +extern void handle_unsubscribe_message(sexp_t *source, + hashtable_t *subscriptions, + parsed_message_t *p); + +#endif diff --git a/experiments/cmsg/util.c b/experiments/cmsg/util.c new file mode 100644 index 0000000..9218ac1 --- /dev/null +++ b/experiments/cmsg/util.c @@ -0,0 +1,113 @@ +/* Copyright 2010, 2011, 2012 Tony Garnock-Jones . + * + * This file is part of Hop. + * + * Hop is free software: you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Hop is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public + * License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Hop. If not, see . + */ +#include +#include +#include + +#include +#include + +/* OSSP UUID */ +#include + +#include "cmsg_private.h" + +#define UUID_CHECK(context) \ + if (result != UUID_RC_OK) { \ + warn("gen_uuid failed with %d at %s\n", result, context); \ + return result; \ + } + +int gen_uuid(unsigned char *uuid_buf) { + uuid_rc_t result; + uuid_t *uuid; + unsigned char temp_buf[UUID_LEN_STR + 1]; + unsigned char *temp_buf_ptr = &temp_buf[0]; /* odd API */ + size_t uuid_buf_len = UUID_LEN_STR + 1; + + assert(CMSG_UUID_BUF_SIZE == UUID_LEN_STR); + + result = uuid_create(&uuid); + UUID_CHECK("uuid_create"); + + result = uuid_make(uuid, UUID_MAKE_V4); + UUID_CHECK("uuid_make"); + + result = uuid_export(uuid, UUID_FMT_STR, &temp_buf_ptr, &uuid_buf_len); + UUID_CHECK("uuid_export"); + assert(uuid_buf_len == (UUID_LEN_STR + 1)); + + memcpy(uuid_buf, temp_buf, CMSG_UUID_BUF_SIZE); + + result = uuid_destroy(uuid); + UUID_CHECK("uuid_destroy"); + + return UUID_RC_OK; +} + +cmsg_bytes_t cmsg_bytes_malloc_dup(cmsg_bytes_t src) { + cmsg_bytes_t result; + result.len = src.len; + result.bytes = malloc(src.len); + if (result.bytes != NULL) { + memcpy(result.bytes, src.bytes, src.len); + } + return result; +} + +cmsg_bytes_t cmsg_bytes_malloc(size_t amount) { + cmsg_bytes_t result; + result.len = amount; + result.bytes = malloc(amount); + return result; +} + +void cmsg_bytes_free(cmsg_bytes_t bytes) { + free(bytes.bytes); +} + +int cmsg_bytes_cmp(cmsg_bytes_t a, cmsg_bytes_t b) { + if (a.len < b.len) return -1; + if (a.len > b.len) return 1; + return memcmp(a.bytes, b.bytes, a.len); +} + +void die(char const *format, ...) { + va_list vl; + va_start(vl, format); + fprintf(stderr, "ERROR: "); + vfprintf(stderr, format, vl); + va_end(vl); + exit(1); +} + +void warn(char const *format, ...) { + va_list vl; + va_start(vl, format); + fprintf(stderr, "WARNING: "); + vfprintf(stderr, format, vl); + va_end(vl); +} + +void info(char const *format, ...) { + va_list vl; + va_start(vl, format); + fprintf(stderr, "INFO: "); + vfprintf(stderr, format, vl); + va_end(vl); +} diff --git a/experiments/lisp/main.lisp b/experiments/lisp/main.lisp new file mode 100644 index 0000000..b8a3325 --- /dev/null +++ b/experiments/lisp/main.lisp @@ -0,0 +1,33 @@ +(ql:quickload "flexi-streams") +;(ql:quickload "babel") +(ql:quickload "usocket") +(ql:quickload "cl-match") + +(ql:quickload "gbbopen") +(require :portable-threads) + +(load "packages.lisp") +(load "sexp.lisp") +(load "network.lisp") + +(in-package :cl-user) + +;; (defun handle-connection (stream) +;; (spki-sexp:write-sexp (spki-sexp:read-sexp stream) stream)) + +;; (defun start-server (port) +;; (usocket:socket-server "localhost" port 'handle-connection '() +;; :in-new-thread t +;; :multi-threading t +;; :reuse-address t +;; :element-type '(unsigned-byte 8))) + +;; (start-server 5671) + +(smsg-network:serve-on-port 5671) + +;; (let ((server-socket (socket-listen "localhost" 5671 +;; :reuse-address t +;; :element-type unsigned-integer))) +;; (loop for conn = (socket-accept server-socket) +;; do (handle-connection conn))) diff --git a/experiments/lisp/network.lisp b/experiments/lisp/network.lisp new file mode 100644 index 0000000..e36803e --- /dev/null +++ b/experiments/lisp/network.lisp @@ -0,0 +1,47 @@ +(in-package :smsg-network) + +(defun command-loop (in out route) + (loop (let ((command (read-sexp in))) + (when (not (handle-inbound-command command in out route)) + (return))))) + +(defun handle-inbound-command (command in out route) + (ematch-sexp command + (("subscribe" filter sink name reply-sink reply-name) + (if (rebind-node filter nil route) + (when (plusp (length reply-sink)) + (post reply-sink reply-name (sexp-build ("subscribe-ok" (= filter))))) + (report! `(rebind-failed ,command)))) + (("unsubscribe" id) + (when (not (rebind-node id route nil)) + (report! `(rebind-failed ,command)))) + (("post" name body token) + (send name body)))) + +(defun relay (in out localname servermode) + (flet ((route (message) + (write-sexp message out) + (write-byte 13 out) + (write-byte 10 out))) + (if servermode + (route (sexp-quote ("hop" "0"))) + (ematch-sexp (read-sexp in) + (("hop" "0") t))) + (force-output out) + (route (sexp-build ("subscribe" (= localname) "" "" "" ""))) + (command-loop in out #'route))) + +(defun handle-connection (stream) + (relay stream stream (sexp-quote "smsg") t)) + +(defun serve-on-port (port) + (usocket:socket-server "localhost" port 'handle-connection '() + :in-new-thread t + :multi-threading t + :reuse-address t + :element-type '(unsigned-byte 8))) + +(defun client (localname hostname portnumber) + (let ((s (usocket:socket-stream + (usocket:socket-connect hostname portnumber :element-type '(unsigned-byte 8))))) + (relay s s localname nil))) diff --git a/experiments/lisp/packages.lisp b/experiments/lisp/packages.lisp new file mode 100644 index 0000000..20d0e8b --- /dev/null +++ b/experiments/lisp/packages.lisp @@ -0,0 +1,33 @@ +(defpackage :spki-sexp + (:use :cl :flexi-streams :cl-match) + (:shadow :read-from-string) + + (:export :read-sexp :write-sexp + + :display-hint + :make-display-hint + :display-hint-p + :copy-display-hint + :display-hint-hint + :display-hint-body + + :syntax-error + :bad-length-prefix + :bad-display-hint + :bad-input-character + :unexpected-close-paren + + :match-failure + + :convert-sexp + :sexp-quote + :sexp-build + + :match-sexp + :ematch-sexp)) + +(defpackage :smsg-network + (:use :cl :flexi-streams :spki-sexp) + (:export :relay + :serve-on-port + :client)) diff --git a/experiments/lisp/sexp.lisp b/experiments/lisp/sexp.lisp new file mode 100644 index 0000000..93d1d9e --- /dev/null +++ b/experiments/lisp/sexp.lisp @@ -0,0 +1,142 @@ +;; SPKI SEXPs for Common Lisp + +(in-package :spki-sexp) + +(define-condition syntax-error (error) ()) +(define-condition bad-length-prefix (syntax-error) ()) +(define-condition bad-display-hint (syntax-error) ()) +(define-condition bad-input-character (syntax-error) ()) +(define-condition unexpected-close-paren (syntax-error) ()) + +(define-condition match-failure (error) ()) + +(defstruct display-hint hint body) + +(defun write-integer (n output-stream) + (labels ((w (n) + (when (plusp n) + (multiple-value-bind (top-half lower-digit) + (floor n 10) + (w top-half) + (write-byte (+ lower-digit 48) output-stream))))) + (if (zerop n) + (write-byte 48 output-stream) + (w n)))) + +(defun write-sexp (sexp &optional (output-stream *standard-output*)) + (etypecase sexp + ((array (unsigned-byte 8)) + (write-integer (length sexp) output-stream) + (write-byte 58 output-stream) ;; #\: + (write-sequence sexp output-stream)) + (cons + (write-byte 40 output-stream) ;; #\( + (loop for v in sexp do (write-sexp v output-stream)) + (write-byte 41 output-stream)) ;; #\) + (display-hint + (write-byte 91 output-stream) + (write-sexp (display-hint-hint sexp) output-stream) + (write-byte 93 output-stream) + (write-sexp (display-hint-body sexp) output-stream)) + (string + (write-sexp (flexi-streams:string-to-octets sexp :external-format :utf-8) output-stream)))) + +(defun read-simple-string (input-stream &optional (len 0)) + (loop (let ((c (read-byte input-stream))) + (if (eql c 58) ;; #\: + (let ((buf (make-array len :element-type '(unsigned-byte 8)))) + (read-sequence buf input-stream) + (return buf)) + (let ((v (digit-char-p c))) + (if v + (setq len (+ (* len 10) v)) + (error 'bad-length-prefix))))))) + +(defun read-sexp-list (input-stream) + (loop for v = (read-sexp-inner input-stream) + until (eq v 'end-of-list-marker) + collect v)) + +(defun read-sexp-inner (input-stream) + (let (result) + (tagbody :retry + (setq result + (let ((c (read-byte input-stream))) + (cond + ((eql c 40) (read-sexp-list input-stream)) ;; #\( + ((eql c 41) 'end-of-list-marker) ;; #\) + ((eql c 91) ;; #\[ + (let ((hint (read-simple-string input-stream))) + (when (not (eql (read-byte input-stream) 93)) ;; #\] + (error 'bad-display-hint)) + (make-display-hint :hint hint :body (read-simple-string input-stream)))) + ((<= 48 c 57) (read-simple-string input-stream (- c 48))) ;; digits + ((<= c 32) ;; whitespace - convenience for testing + (go :retry)) + (t (error 'bad-input-character)))))) + result)) + +(defun read-sexp (&optional (input-stream *standard-input*)) + (let ((v (read-sexp-inner input-stream))) + (if (eq v 'end-of-list-marker) + (error 'unexpected-close-paren) + v))) + +(defun convert-sexp (val) + (etypecase val + ((array (unsigned-byte 8)) val) + (cons (cons (convert-sexp (car val)) + (convert-sexp (cdr val)))) + (null nil) + (display-hint (make-display-hint + :hint (convert-sexp (display-hint-hint val)) + :body (convert-sexp (display-hint-body val)))) + (string (flexi-streams:string-to-octets val :external-format :utf-8)))) + +(defmacro sexp-quote (val) + `(quote ,(convert-sexp val))) + +(defun build-sexp (stx) + (etypecase stx + ((array (unsigned-byte 8)) stx) + (cons (if (eq (car stx) '=) + (cadr stx) + `(cons ,(build-sexp (car stx)) + ,(build-sexp (cdr stx))))) + (null 'nil) + (display-hint `(make-display-hint + :hint ,(build-sexp (display-hint-hint stx)) + :body ,(build-sexp (display-hint-body stx)))) + (string (flexi-streams:string-to-octets stx :external-format :utf-8)))) + +(defmacro sexp-build (template) + (build-sexp template)) + +(defun convert-match-pattern (pattern) + (etypecase pattern + ((array (unsigned-byte 8)) `(array (1 (unsigned-byte 8)) ,(coerce pattern 'list))) + (cons `(cons ,(convert-match-pattern (car pattern)) + ,(convert-match-pattern (cdr pattern)))) + (null 'nil) + (display-hint `(struct display-hint + (:hint ,(convert-match-pattern (display-hint-hint pattern))) + (:body ,(convert-match-pattern (display-hint-body pattern))))) + (string (convert-match-pattern + (flexi-streams:string-to-octets pattern :external-format :utf-8))) + (symbol pattern))) + +(defmacro match-sexp (val &rest clauses) + `(cl-match:match ,val + ,@(mapcar (lambda (clause) + `(,(convert-match-pattern (car clause)) ,@(cdr clause))) + clauses))) + +(defmacro ematch-sexp (val &rest clauses) + `(match-sexp ,val ,@clauses (_ (error 'match-failure)))) + +;; Useful for testing +(defun read-from-string (str &optional (external-format :utf-8)) + (read-sexp (flexi-streams:make-flexi-stream + (flexi-streams:make-in-memory-input-stream + (flexi-streams:string-to-octets str :external-format external-format)) + :external-format external-format)))