Merge remote branch 'c/master'

This commit is contained in:
Tony Garnock-Jones 2012-05-11 09:04:36 -04:00
commit 3a303015f8
45 changed files with 3850 additions and 0 deletions

5
experiments/cmsg/.gitignore vendored Normal file
View File

@ -0,0 +1,5 @@
*.o
messages.h
messages.c
cmsg
depend.mk

45
experiments/cmsg/Makefile Normal file
View File

@ -0,0 +1,45 @@
TARGET = cmsg
OBJECTS = main.o harness.o net.o util.o relay.o hashtable.o dataq.o sexp.o sexpio.o node.o \
queue.o direct.o fanout.o subscription.o meta.o messages.o
UUID_CFLAGS:=$(shell uuid-config --cflags)
UUID_LDFLAGS:=$(shell uuid-config --ldflags)
# grr
ifeq ($(shell uname -s),Darwin)
UUID_LIB=uuid
else
UUID_LIB=ossp-uuid
endif
#CFLAGS = -D_XOPEN_SOURCE=600 -Wall -O0 -g $(UUID_CFLAGS)
CFLAGS = -D_XOPEN_SOURCE=600 -Wall -O3 $(UUID_CFLAGS)
#CFLAGS = -D_XOPEN_SOURCE=600 -Wall -O3 -static $(UUID_CFLAGS)
all: $(TARGET)
$(TARGET): $(OBJECTS)
$(CC) $(CFLAGS) $(UUID_LDFLAGS) -o $@ $(OBJECTS) -l$(UUID_LIB) -levent
# $(CC) $(CFLAGS) $(UUID_LDFLAGS) -o $@ $(OBJECTS) -l$(UUID_LIB) -levent -lrt
%.o: %.c
$(CC) $(CFLAGS) -c $<
messages.c: ../../protocol/messages.json codegen.py
python codegen.py body > $@
messages.h: ../../protocol/messages.json codegen.py
python codegen.py header > $@
clean:
rm -f $(TARGET)
rm -f $(OBJECTS)
rm -rf *.dSYM
rm -f depend.mk messages.c messages.h
depend.mk:
touch messages.h
gcc $(CFLAGS) -M *.c > $@
rm messages.h
echo "depend.mk:" Makefile *.c >> $@
-include depend.mk

59
experiments/cmsg/TODO Normal file
View File

@ -0,0 +1,59 @@
Cope with possibility of duplicate uuid, in e.g. queue/fanout/direct.
If a subscription token matches an existing subscription, there's a
collision, so choose a new token until there's no collision.
SAX-style sexp reader/writer, so that we can do something sensible for
enormous (e.g. gigabyte-sized) messages.
The "meta" exchange probably wants to be a topic exchange. Or
something.
The "meta" exchange probably wants to emit how-things-are-now messages
when people subscribe to it, to get them started; a kind of
last-value-cache type thing.
Website
- http://www.flickr.com/photos/elemishra/158211069/
Switch from indirect scheduling to direct scheduling
- before, on walk: 173kHz test1/test3
- after, no change. Probably because we only have a single thread
active in the system when running test1/test3, so every time some
work comes in, it baton-passes to the next guy in the chain. The
change *would* be an improvement but we have a separate worklist
(to process now) and runlist (to process after polling for events
once) so it always goes back to the scheduler because the worklist
is never longer than one.
- Revisit once we start testing multiple concurrent streams.
Extension to Sexps:
- ! <simplestring> <value> : binds the simplestring to the value when
seen. Note that the new binding for simplestring is NOT in scope
during parsing of the value, so no circular data can be constructed
this way. Any previous binding is discarded *after* the value is
completely read.
- ? <simplestring> : retrieves the bound value of the simplestring.
So
!1:a11:hello world(?1:a1: ?1:a1: ?1:a)
is equivalent to
(11:hello world1: 11:hello world1: 11:hello world)
And, more to the point, after a receiver has seen
!1:a36:af49f5dc-0454-4ba1-9f48-a55e9c10ee35
then a simple
?1:a
is all that's needed to refer to that gargantuan UUID.
Another example:
!1:a()!1:a(?1:a?1:a)!1:a(?1:a?1:a)?1:a
is equivalent to
((()())(()()))
"Having metrics around as many things as possible really helped us
identify a difficult problem to diagnose."
-- https://github.com/blog/767-recent-services-interruptions

View File

@ -0,0 +1,55 @@
/* Copyright 2010, 2011, 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>.
*
* This file is part of Hop.
*
* Hop is free software: you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Hop is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
* License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Hop. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef cmsg_private_h
#define cmsg_private_h
typedef struct cmsg_bytes_t {
size_t len;
unsigned char *bytes;
} cmsg_bytes_t;
#define CMSG_BYTES(length, bytes_ptr) ((cmsg_bytes_t) { \
.len = (length), \
.bytes = (unsigned char *) (bytes_ptr) \
})
#define EMPTY_BYTES CMSG_BYTES(0, NULL)
static inline cmsg_bytes_t cmsg_cstring_bytes(char const *cstr) {
cmsg_bytes_t result;
result.len = strlen(cstr);
result.bytes = (void *) cstr;
return result;
}
#define CMSG_UUID_BUF_SIZE 36
extern int gen_uuid(unsigned char *uuid_buf); /* must be exactly CMSG_UUID_BUF_SIZE bytes long */
extern cmsg_bytes_t cmsg_bytes_malloc_dup(cmsg_bytes_t src);
extern cmsg_bytes_t cmsg_bytes_malloc(size_t amount);
extern void cmsg_bytes_free(cmsg_bytes_t bytes);
extern int cmsg_bytes_cmp(cmsg_bytes_t a, cmsg_bytes_t b);
#define ICHECK(result, message) do { if ((result) == -1) { perror(message); exit(2); } } while (0)
#define BCHECK(result, message) do { if ((result) == 0) { perror(message); exit(2); } } while (0)
#define PCHECK(result, message) do { if ((result) == NULL) { perror(message); exit(2); } } while (0)
extern __attribute__((noreturn)) void die(char const *format, ...);
extern void warn(char const *format, ...);
extern void info(char const *format, ...);
#endif

129
experiments/cmsg/codegen.py Normal file
View File

@ -0,0 +1,129 @@
from __future__ import with_statement
## Copyright 2010, 2011, 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>.
##
## This file is part of Hop.
##
## Hop is free software: you can redistribute it and/or modify it
## under the terms of the GNU General Public License as published by
## the Free Software Foundation, either version 3 of the License, or
## (at your option) any later version.
##
## Hop is distributed in the hope that it will be useful, but WITHOUT
## ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
## or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
## License for more details.
##
## You should have received a copy of the GNU General Public License
## along with Hop. If not, see <http://www.gnu.org/licenses/>.
copyright_stmt = \
'''/* Copyright 2010, 2011, 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>.
*
* This file is part of Hop.
*
* Hop is free software: you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Hop is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
* License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Hop. If not, see <http://www.gnu.org/licenses/>.
*/
'''
import sys
import json
def cify(s):
s = s.replace('-', '_')
s = s.replace(' ', '_')
return s
class MessageType:
def __init__(self, j):
self.wire_selector = j['selector']
self.selector = cify(self.wire_selector)
self.wire_argnames = j['args']
self.argnames = map(cify, self.wire_argnames)
def format_args(self, template, separator = ', '):
return separator.join([template % (x,) for x in self.argnames])
with file("../../protocol/messages.json") as f:
spec = map(MessageType, json.load(f)['definitions'])
def entrypoint_header():
print copyright_stmt
print
print '#ifndef cmsg_messages_h'
print '#define cmsg_messages_h'
print
print 'extern void init_messages(void);'
print
for t in spec:
print 'extern sexp_t *selector_%s;' % (t.selector,)
print
for t in spec:
print 'extern sexp_t *message_%s(%s);' % (t.selector, t.format_args('sexp_t *%s'))
print
print 'typedef union parsed_message_t_ {'
for t in spec:
if t.argnames:
print ' struct { sexp_t %s; } %s;' % (t.format_args('*%s'), t.selector)
print '} parsed_message_t;'
for t in spec:
print
print 'static inline int parse_%s(sexp_t *message, parsed_message_t *out) {' % \
(t.selector,)
print ' if (!sexp_pairp(message)) return 0;'
print ' if (sexp_cmp(sexp_head(message), selector_%s) != 0) return 0;' % (t.selector,)
for n in t.argnames:
print ' if (!sexp_pseudo_pop(&message)) return 0;'
print ' out->%s.%s = sexp_head(message);' % (t.selector, n)
print ' return sexp_tail(message) == NULL;'
print '}'
print
print '#endif'
def entrypoint_body():
print copyright_stmt
print
print '#include <stdlib.h>'
print '#include <string.h>'
print '#include <stdio.h>'
print '#include <signal.h>'
print
print '#include <assert.h>'
print
print '#include "cmsg_private.h"'
print '#include "ref.h"'
print '#include "sexp.h"'
print '#include "messages.h"'
print
for t in spec:
print 'sexp_t *selector_%s = NULL;' % (t.selector,)
print
print 'void init_messages(void) {'
for t in spec:
print ' selector_%s = sexp_cstring("%s");' % (t.selector, t.wire_selector)
for t in spec:
print ' INCREF(selector_%s);' % (t.selector,)
print '}'
for t in spec:
print
print 'sexp_t *message_%s(%s) {' % (t.selector, t.format_args('sexp_t *%s'))
print ' sexp_t *m = NULL;'
for n in reversed(t.argnames):
print ' m = sexp_cons(%s, m);' % (n,)
print ' return sexp_cons(selector_%s, m);' % (t.selector,)
print '}'
if __name__ == '__main__':
drivername = sys.argv[1]
globals()['entrypoint_' + drivername]()

66
experiments/cmsg/dataq.c Normal file
View File

@ -0,0 +1,66 @@
/* Copyright 2010, 2011, 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>.
*
* This file is part of Hop.
*
* Hop is free software: you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Hop is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
* License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Hop. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <assert.h>
#include "dataq.h"
#define QLINK(q,x) (*((void **)(((char *) x) + (q)->link_offset)))
void enqueue(queue_t *q, void *x) {
QLINK(q, x) = NULL;
if (q->head == NULL) {
q->head = x;
} else {
QLINK(q, q->tail) = x;
}
q->tail = x;
q->count++;
}
void *dequeue(queue_t *q) {
if (q->head == NULL) {
return NULL;
} else {
void *x = q->head;
q->head = QLINK(q, x);
QLINK(q, x) = NULL;
if (q->head == NULL) {
q->tail = NULL;
}
q->count--;
return x;
}
}
void queue_append(queue_t *q1, queue_t *q2) {
assert(q1->link_offset == q2->link_offset);
if (q2->head != NULL) {
if (q1->head != NULL) {
QLINK(q1, q1->tail) = q2->head;
} else {
q1->head = q2->head;
}
q1->tail = q2->tail;
q2->head = NULL;
q2->tail = NULL;
}
}

36
experiments/cmsg/dataq.h Normal file
View File

@ -0,0 +1,36 @@
/* Copyright 2010, 2011, 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>.
*
* This file is part of Hop.
*
* Hop is free software: you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Hop is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
* License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Hop. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef cmsg_dataq_h
#define cmsg_dataq_h
typedef struct queue_t_ {
size_t link_offset;
int count;
void *head;
void *tail;
} queue_t;
#define EMPTY_QUEUE(element_t, link_field_name) \
((queue_t) { offsetof(element_t, link_field_name), 0, NULL, NULL })
extern void enqueue(queue_t *q, void *x);
extern void *dequeue(queue_t *q);
extern void queue_append(queue_t *q1, queue_t *q2);
#endif

125
experiments/cmsg/direct.c Normal file
View File

@ -0,0 +1,125 @@
/* Copyright 2010, 2011, 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>.
*
* This file is part of Hop.
*
* Hop is free software: you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Hop is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
* License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Hop. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <errno.h>
#include <stdint.h>
#include <stddef.h>
#include <assert.h>
#include <ucontext.h>
#include "cmsg_private.h"
#include "harness.h"
#include "ref.h"
#include "sexp.h"
#include "hashtable.h"
#include "node.h"
#include "messages.h"
#include "subscription.h"
#include "sexpio.h"
typedef struct direct_extension_t_ {
sexp_t *name;
hashtable_t routing_table;
hashtable_t subscriptions;
} direct_extension_t;
static sexp_t *direct_extend(node_t *n, sexp_t *args) {
if ((sexp_length(args) == 1) && sexp_stringp(sexp_head(args))) {
cmsg_bytes_t name = sexp_data(sexp_head(args));
direct_extension_t *d = calloc(1, sizeof(*d));
d->name = INCREF(sexp_head(args));
init_hashtable(&d->routing_table, 5, NULL, NULL);
init_hashtable(&d->subscriptions, 5, NULL, NULL);
n->extension = d;
return bind_node(name, n) ? NULL : sexp_cstring("bind failed");
} else {
return sexp_cstring("invalid args");
}
}
static void free_direct_chain(void *context, cmsg_bytes_t key, void *value) {
free_subscription_chain(value);
}
static void direct_destructor(node_t *n) {
direct_extension_t *d = n->extension;
if (d != NULL) { /* can be NULL if direct_extend was given invalid args */
DECREF(d->name, sexp_destructor);
hashtable_foreach(&d->routing_table, free_direct_chain, NULL);
destroy_hashtable(&d->routing_table);
destroy_hashtable(&d->subscriptions);
free(d);
}
}
static void route_message(direct_extension_t *d, sexp_t *rk, sexp_t *body) {
subscription_t *chain = NULL;
subscription_t *newchain;
hashtable_get(&d->routing_table, sexp_data(rk), (void **) &chain);
newchain = send_to_subscription_chain(d->name, &d->subscriptions, chain, body);
if (newchain != chain) {
hashtable_put(&d->routing_table, sexp_data(rk), newchain);
}
}
static void direct_handle_message(node_t *n, sexp_t *m) {
direct_extension_t *d = n->extension;
parsed_message_t p;
if (parse_post(m, &p)) {
if (sexp_stringp(p.post.name)) {
route_message(d, p.post.name, p.post.body);
} else {
warn("Non-string routing key in direct\n");
}
return;
}
if (parse_subscribe(m, &p)) {
subscription_t *sub = handle_subscribe_message(d->name, &d->subscriptions, &p);
if (sub != NULL) {
hashtable_get(&d->routing_table, sexp_data(p.subscribe.filter), (void **) &sub->link);
hashtable_put(&d->routing_table, sexp_data(p.subscribe.filter), sub);
}
return;
}
if (parse_unsubscribe(m, &p)) {
handle_unsubscribe_message(d->name, &d->subscriptions, &p);
return;
}
warn("Message not understood in direct: ");
sexp_writeln(stderr_h, m);
}
static node_class_t direct_class = {
.name = "direct",
.extend = direct_extend,
.destroy = direct_destructor,
.handle_message = direct_handle_message
};
void init_direct(void) {
register_node_class(&direct_class);
}

23
experiments/cmsg/direct.h Normal file
View File

@ -0,0 +1,23 @@
/* Copyright 2010, 2011, 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>.
*
* This file is part of Hop.
*
* Hop is free software: you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Hop is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
* License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Hop. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef cmsg_direct_h
#define cmsg_direct_h
extern void init_direct(void);
#endif

113
experiments/cmsg/fanout.c Normal file
View File

@ -0,0 +1,113 @@
/* Copyright 2010, 2011, 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>.
*
* This file is part of Hop.
*
* Hop is free software: you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Hop is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
* License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Hop. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <errno.h>
#include <stdint.h>
#include <stddef.h>
#include <assert.h>
#include <ucontext.h>
#include "cmsg_private.h"
#include "harness.h"
#include "ref.h"
#include "sexp.h"
#include "hashtable.h"
#include "node.h"
#include "messages.h"
#include "subscription.h"
#include "sexpio.h"
typedef struct fanout_extension_t_ {
sexp_t *name;
hashtable_t subscriptions;
} fanout_extension_t;
static sexp_t *fanout_extend(node_t *n, sexp_t *args) {
if ((sexp_length(args) == 1) && sexp_stringp(sexp_head(args))) {
cmsg_bytes_t name = sexp_data(sexp_head(args));
fanout_extension_t *f = calloc(1, sizeof(*f));
f->name = INCREF(sexp_head(args));
init_hashtable(&f->subscriptions, 5, NULL, (void (*)(void *)) free_subscription);
n->extension = f;
return bind_node(name, n) ? NULL : sexp_cstring("bind failed");
} else {
return sexp_cstring("invalid args");
}
}
static void fanout_destructor(node_t *n) {
fanout_extension_t *f = n->extension;
if (f != NULL) { /* can be NULL if fanout_extend was given invalid args */
DECREF(f->name, sexp_destructor);
destroy_hashtable(&f->subscriptions);
free(f);
}
}
struct delivery_context {
fanout_extension_t *f;
sexp_t *body;
};
static void send_to_sub(void *contextv, cmsg_bytes_t key, void *subv) {
struct delivery_context *context = contextv;
subscription_t *sub = subv;
send_to_subscription(context->f->name, &context->f->subscriptions, sub, context->body);
}
static void fanout_handle_message(node_t *n, sexp_t *m) {
fanout_extension_t *f = n->extension;
parsed_message_t p;
if (parse_post(m, &p)) {
struct delivery_context context;
context.f = f;
context.body = p.post.body;
hashtable_foreach(&f->subscriptions, send_to_sub, &context);
return;
}
if (parse_subscribe(m, &p)) {
handle_subscribe_message(f->name, &f->subscriptions, &p);
return;
}
if (parse_unsubscribe(m, &p)) {
handle_unsubscribe_message(f->name, &f->subscriptions, &p);
return;
}
warn("Message not understood in fanout: ");
sexp_writeln(stderr_h, m);
}
static node_class_t fanout_class = {
.name = "fanout",
.extend = fanout_extend,
.destroy = fanout_destructor,
.handle_message = fanout_handle_message
};
void init_fanout(void) {
register_node_class(&fanout_class);
}

23
experiments/cmsg/fanout.h Normal file
View File

@ -0,0 +1,23 @@
/* Copyright 2010, 2011, 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>.
*
* This file is part of Hop.
*
* Hop is free software: you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Hop is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
* License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Hop. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef cmsg_fanout_h
#define cmsg_fanout_h
extern void init_fanout(void);
#endif

334
experiments/cmsg/harness.c Normal file
View File

@ -0,0 +1,334 @@
/* Copyright 2010, 2011, 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>.
*
* This file is part of Hop.
*
* Hop is free software: you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Hop is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
* License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Hop. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <stddef.h>
#include <errno.h>
#include <sys/time.h>
#include <ucontext.h>
typedef unsigned char u_char;
#include <event.h>
#include "cmsg_private.h"
#include "harness.h"
#include "dataq.h"
#include <assert.h>
/* What is a sane value for STACK_SIZE? */
#ifdef __APPLE__
/* Bollocks. Looks like OS X chokes unless STACK_SIZE is a multiple of 32k. */
# include <AvailabilityMacros.h>
# if !defined(MAC_OS_X_VERSION_10_6) || MAC_OS_X_VERSION_MAX_ALLOWED < MAC_OS_X_VERSION_10_6
/* Hmm, and looks like 10.5 has more aggressive stack requirements than 10.6. */
# define STACK_SIZE 65536
# else
# define STACK_SIZE 32768
# endif
#elif linux
# define STACK_SIZE 32768
#else
# error Define STACK_SIZE for your platform. It should probably not be less than 32k?
#endif
/* TODO: reuse stacks (via a freelist) */
/* TODO: investigate avoiding syscall in swapcontext, setcontext etc. */
IOHandle *stdin_h = NULL;
IOHandle *stdout_h = NULL;
IOHandle *stderr_h = NULL;
static volatile int harness_running = 1;
Process *current_process = NULL;
#define EMPTY_PROCESS_QUEUE EMPTY_QUEUE(Process, link)
static ucontext_t scheduler;
static queue_t runlist = EMPTY_PROCESS_QUEUE;
static queue_t deadlist = EMPTY_PROCESS_QUEUE;
static void enqueue_runlist(Process *p) {
p->state = PROCESS_RUNNING;
enqueue(&runlist, p);
}
static void schedule(void) {
//info("schedule %p\n", current_process);
if (current_process == NULL) {
ICHECK(setcontext(&scheduler), "schedule setcontext");
} else {
ICHECK(swapcontext(&current_process->context, &scheduler), "schedule swapcontext");
}
}
void yield(void) {
enqueue_runlist(current_process);
schedule();
}
void killproc(void) {
assert(current_process->state == PROCESS_RUNNING);
current_process->state = PROCESS_DEAD;
enqueue(&deadlist, current_process);
current_process = NULL;
schedule();
}
void suspend(void) {
assert(current_process->state == PROCESS_RUNNING);
current_process->state = PROCESS_WAITING;
schedule();
}
int resume(Process *p) {
if (p->state == PROCESS_WAITING) {
enqueue_runlist(p);
return 0;
} else {
return -1;
}
}
static void driver(void (*f)(void *), void *arg) {
f(arg);
killproc();
}
Process *spawn(void (*f)(void *), void *arg) {
Process *p = calloc(1, sizeof(*p));
PCHECK(p, "spawn calloc");
p->state = PROCESS_DEAD;
p->stack_base = malloc(STACK_SIZE);
PCHECK(p->stack_base, "stack pointer malloc");
ICHECK(getcontext(&p->context), "spawn getcontext");
p->context.uc_link = NULL;
p->context.uc_stack.ss_sp = p->stack_base;
p->context.uc_stack.ss_size = STACK_SIZE;
p->context.uc_stack.ss_flags = 0;
makecontext(&p->context, (void (*)(void)) driver, 2, f, arg);
p->link = NULL;
enqueue_runlist(p);
return p;
}
typedef struct nap_context_t_ {
Process *p;
int timeout_fired;
} nap_context_t;
void nap_isr(int fd, short what, void *arg) {
nap_context_t *context = arg;
//info("nap_isr %p\n", p);
if ((context->p->state == PROCESS_WAITING) && (context->p->wait_flags & EV_TIMEOUT)) {
context->timeout_fired = 1;
enqueue_runlist(context->p);
}
}
int nap(long millis) {
struct event ev;
struct timeval tv;
nap_context_t context;
assert(current_process != NULL);
assert(current_process->state == PROCESS_RUNNING);
context.p = current_process;
context.timeout_fired = 0;
tv.tv_sec = millis / 1000;
tv.tv_usec = (millis % 1000) * 1000;
evtimer_set(&ev, nap_isr, &context);
ICHECK(evtimer_add(&ev, &tv), "evtimer_add");
current_process->state = PROCESS_WAITING;
current_process->wait_flags |= EV_TIMEOUT;
schedule();
current_process->wait_flags &= ~EV_TIMEOUT;
evtimer_del(&ev);
return context.timeout_fired;
}
static void awaken_waiters(IOHandle *h, short mask) {
Process *prev = NULL;
Process *p;
Process *next;
for (p = h->waiters; p != NULL; p = next) {
next = p->link;
assert(p->state == PROCESS_WAITING);
if ((p->wait_flags & mask) != 0) {
if (prev == NULL) {
h->waiters = next;
} else {
prev->link = next;
}
enqueue_runlist(p);
} else {
prev = p;
}
}
}
static void input_isr(struct bufferevent *bufev, IOHandle *h) {
awaken_waiters(h, EV_READ);
}
static void output_isr(struct bufferevent *bufev, IOHandle *h) {
awaken_waiters(h, EV_WRITE);
}
static void error_isr(struct bufferevent *bufev, short what, IOHandle *h) {
unsigned short kind = what & ~(EVBUFFER_READ | EVBUFFER_WRITE);
int saved_errno = errno;
info("error_isr 0x%04X fd %d\n", what, h->fd);
h->error_direction = what & (EVBUFFER_READ | EVBUFFER_WRITE);
if (kind == EVBUFFER_EOF) {
h->eof = 1;
} else {
h->error_kind = kind;
h->error_errno = saved_errno;
}
awaken_waiters(h, EV_READ | EV_WRITE);
}
IOHandle *new_iohandle(int fd) {
IOHandle *h = malloc(sizeof(*h));
h->waiters = NULL;
h->fd = fd;
h->io = bufferevent_new(fd,
(evbuffercb) input_isr,
(evbuffercb) output_isr,
(everrorcb) error_isr,
h);
PCHECK(h->io, "bufferevent_new");
bufferevent_setwatermark(h->io, EV_READ, 0, 256 * 1024);
h->eof = 0;
iohandle_clear_error(h);
return h;
}
void delete_iohandle(IOHandle *h) {
if (h->waiters) {
warn("Deleting IOHandle %p with fd %d: processes are blocked on this handle!\n",
h,
h->fd);
}
bufferevent_free(h->io);
free(h);
}
void iohandle_clear_error(IOHandle *h) {
h->error_direction = 0;
h->error_kind = 0;
h->error_errno = 0;
}
static void block_on_io(IOHandle *h, short event) {
assert(current_process->link == NULL);
current_process->link = h->waiters;
h->waiters = current_process;
current_process->state = PROCESS_WAITING;
current_process->wait_flags |= event;
schedule();
current_process->wait_flags &= ~event;
}
cmsg_bytes_t iohandle_readwait(IOHandle *h, size_t at_least) {
while (EVBUFFER_LENGTH(h->io->input) < at_least) {
if (h->eof || h->error_kind) {
return EMPTY_BYTES;
}
ICHECK(bufferevent_enable(h->io, EV_READ), "bufferevent_enable");
block_on_io(h, EV_READ);
ICHECK(bufferevent_disable(h->io, EV_READ), "bufferevent_disable");
}
return CMSG_BYTES(EVBUFFER_LENGTH(h->io->input), EVBUFFER_DATA(h->io->input));
}
void iohandle_drain(IOHandle *h, size_t count) {
evbuffer_drain(h->io->input, count);
}
void iohandle_write(IOHandle *h, cmsg_bytes_t buf) {
ICHECK(bufferevent_write(h->io, buf.bytes, buf.len), "bufferevent_write");
}
int iohandle_flush(IOHandle *h) {
while (EVBUFFER_LENGTH(h->io->output) > 0) {
if (h->error_kind) {
return -1;
}
block_on_io(h, EV_WRITE);
}
return 0;
}
void iohandle_settimeout(IOHandle *h, int timeout_read, int timeout_write) {
bufferevent_settimeout(h->io, timeout_read, timeout_write);
}
static void clean_dead_processes(void) {
Process *deadp;
while ((deadp = dequeue(&deadlist)) != NULL) {
free(deadp->stack_base);
free(deadp);
}
}
void boot_harness(void) {
stdin_h = new_iohandle(0);
stdout_h = new_iohandle(1);
stderr_h = new_iohandle(2);
ICHECK(getcontext(&scheduler), "boot_harness getcontext");
while (1) {
while (runlist.count) {
queue_t work = runlist;
runlist = EMPTY_PROCESS_QUEUE;
//info("Processing %d jobs\n", work.count);
while ((current_process = dequeue(&work)) != NULL) {
//info("entering %p\n", current_process);
ICHECK(swapcontext(&scheduler, &current_process->context), "boot_harness swapcontext");
clean_dead_processes();
}
//info("Polling for events\n");
event_loop(EVLOOP_NONBLOCK);
}
if (!harness_running) break;
//info("Blocking for events\n");
event_loop(EVLOOP_ONCE);
}
info("Shutting down.\n");
delete_iohandle(stdin_h);
delete_iohandle(stdout_h);
delete_iohandle(stderr_h);
}
void interrupt_harness(void) {
info("Interrupting harness\n");
harness_running = 0;
}

View File

@ -0,0 +1,72 @@
/* Copyright 2010, 2011, 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>.
*
* This file is part of Hop.
*
* Hop is free software: you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Hop is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
* License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Hop. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef cmsg_harness_h
#define cmsg_harness_h
typedef void (*process_main_t)(void *);
typedef enum process_state_t_ {
PROCESS_DEAD = 0,
PROCESS_RUNNING,
PROCESS_WAITING
} process_state_t;
typedef struct Process {
process_state_t state;
int wait_flags;
void *stack_base;
ucontext_t context;
struct Process *link;
} Process;
typedef struct IOHandle {
Process *waiters;
int fd;
struct bufferevent *io;
int eof;
unsigned short error_direction;
unsigned short error_kind;
int error_errno;
} IOHandle;
extern IOHandle *stdin_h;
extern IOHandle *stdout_h;
extern IOHandle *stderr_h;
extern Process *current_process;
extern void yield(void);
extern Process *spawn(process_main_t f, void *arg);
extern int nap(long millis); /* 1 for timeout expired; 0 for resumed early */
extern void suspend(void);
extern int resume(Process *p);
extern IOHandle *new_iohandle(int fd);
extern void delete_iohandle(IOHandle *h);
extern void iohandle_clear_error(IOHandle *h);
extern cmsg_bytes_t iohandle_readwait(IOHandle *h, size_t at_least);
extern void iohandle_drain(IOHandle *h, size_t count);
extern void iohandle_write(IOHandle *h, cmsg_bytes_t buf);
extern int iohandle_flush(IOHandle *h);
extern void iohandle_settimeout(IOHandle *h, int timeout_read, int timeout_write);
extern void boot_harness(void);
extern void interrupt_harness(void);
#endif

View File

@ -0,0 +1,159 @@
/* Copyright 2010, 2011, 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>.
*
* This file is part of Hop.
*
* Hop is free software: you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Hop is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
* License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Hop. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <stdint.h>
#include <assert.h>
#include "cmsg_private.h"
#include "hashtable.h"
uint32_t hash_bytes(cmsg_bytes_t bytes) {
/* http://en.wikipedia.org/wiki/Jenkins_hash_function */
uint32_t hash = 0;
size_t i;
for (i = 0; i < bytes.len; i++) {
hash += bytes.bytes[i];
hash += (hash << 10);
hash ^= (hash >> 6);
}
hash += (hash << 3);
hash ^= (hash >> 11);
hash += (hash << 15);
return hash;
}
void init_hashtable(hashtable_t *table,
size_t initial_bucket_count,
void *(*dup_value)(void *),
void (*free_value)(void *))
{
table->bucket_count = initial_bucket_count;
table->entry_count = 0;
table->buckets = NULL;
table->dup_value = dup_value;
table->free_value = free_value;
if (initial_bucket_count > 0) {
table->buckets = calloc(initial_bucket_count, sizeof(hashtable_entry_t *));
}
}
static void destroy_entry(hashtable_t *table, hashtable_entry_t *entry) {
cmsg_bytes_free(entry->key);
if (table->free_value != NULL) {
table->free_value(entry->value);
}
free(entry);
}
void destroy_hashtable(hashtable_t *table) {
if (table->buckets != NULL) {
int i;
for (i = 0; i < table->bucket_count; i++) {
hashtable_entry_t *chain = table->buckets[i];
table->buckets[i] = NULL;
while (chain != NULL) {
hashtable_entry_t *next = chain->next;
destroy_entry(table, chain);
chain = next;
}
}
free(table->buckets);
}
}
static hashtable_entry_t **hashtable_find(hashtable_t *table, cmsg_bytes_t key) {
uint32_t h = hash_bytes(key) % table->bucket_count;
hashtable_entry_t **entryptr = &(table->buckets[h]);
hashtable_entry_t *entry = *entryptr;
while (entry != NULL) {
if ((entry->key.len == key.len) && !memcmp(entry->key.bytes, key.bytes, key.len)) {
break;
}
entryptr = &entry->next;
entry = *entryptr;
}
return entryptr;
}
int hashtable_contains(hashtable_t *table, cmsg_bytes_t key) {
hashtable_entry_t **entryptr = hashtable_find(table, key);
return (*entryptr != NULL);
}
int hashtable_get(hashtable_t *table, cmsg_bytes_t key, void **valueptr) {
hashtable_entry_t **entryptr = hashtable_find(table, key);
if (*entryptr == NULL) {
return 0;
} else {
*valueptr = (*entryptr)->value;
return 1;
}
}
int hashtable_put(hashtable_t *table, cmsg_bytes_t key, void *value) {
/* TODO: grow and rehash */
hashtable_entry_t **entryptr = hashtable_find(table, key);
if (*entryptr == NULL) {
hashtable_entry_t *entry = malloc(sizeof(hashtable_entry_t));
entry->next = NULL;
entry->key = cmsg_bytes_malloc_dup(key);
entry->value = (table->dup_value == NULL) ? value : table->dup_value(value);
*entryptr = entry;
table->entry_count++;
return 1;
} else {
if (table->free_value != NULL) {
table->free_value((*entryptr)->value);
}
(*entryptr)->value = (table->dup_value == NULL) ? value : table->dup_value(value);
return 0;
}
}
int hashtable_erase(hashtable_t *table, cmsg_bytes_t key) {
hashtable_entry_t **entryptr = hashtable_find(table, key);
if (*entryptr == NULL) {
return 0;
} else {
hashtable_entry_t *entry = *entryptr;
*entryptr = entry->next;
destroy_entry(table, entry);
table->entry_count--;
return 1;
}
}
void hashtable_foreach(hashtable_t *table,
hashtable_iterator_t iterator,
void *context)
{
int i;
for (i = 0; i < table->bucket_count; i++) {
hashtable_entry_t *chain = table->buckets[i];
while (chain != NULL) {
hashtable_entry_t *next = chain->next;
iterator(context, chain->key, chain->value);
chain = next;
}
}
}

View File

@ -0,0 +1,53 @@
/* Copyright 2010, 2011, 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>.
*
* This file is part of Hop.
*
* Hop is free software: you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Hop is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
* License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Hop. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef cmsg_hashtable_h
#define cmsg_hashtable_h
typedef struct hashtable_entry_t_ {
struct hashtable_entry_t_ *next;
cmsg_bytes_t key;
void *value;
} hashtable_entry_t;
typedef struct hashtable_t_ {
size_t bucket_count;
size_t entry_count;
hashtable_entry_t **buckets;
void *(*dup_value)(void *);
void (*free_value)(void *);
} hashtable_t;
typedef void (*hashtable_iterator_t)(void *context, cmsg_bytes_t key, void *value);
extern uint32_t hash_bytes(cmsg_bytes_t bytes);
extern void init_hashtable(hashtable_t *table,
size_t initial_bucket_count,
void *(*dup_value)(void *),
void (*free_value)(void *));
extern void destroy_hashtable(hashtable_t *table);
extern int hashtable_contains(hashtable_t *table, cmsg_bytes_t key);
extern int hashtable_get(hashtable_t *table, cmsg_bytes_t key, void **valueptr);
extern int hashtable_put(hashtable_t *table, cmsg_bytes_t key, void *value);
extern int hashtable_erase(hashtable_t *table, cmsg_bytes_t key);
extern void hashtable_foreach(hashtable_t *table,
hashtable_iterator_t iterator,
void *context);
#endif

125
experiments/cmsg/main.c Normal file
View File

@ -0,0 +1,125 @@
/* Copyright 2010, 2011, 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>.
*
* This file is part of Hop.
*
* Hop is free software: you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Hop is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
* License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Hop. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <signal.h>
#include <netinet/in.h>
#include <ucontext.h>
#include <assert.h>
typedef unsigned char u_char;
#include <event.h>
#include "cmsg_private.h"
#include "harness.h"
#include "net.h"
#include "ref.h"
#include "sexp.h"
#include "hashtable.h"
#include "node.h"
#include "queue.h"
#include "direct.h"
#include "fanout.h"
#include "relay.h"
#include "meta.h"
#include "messages.h"
#include "sexpio.h"
#define WANT_CONSOLE_LISTENER 1
static void factory_handle_message(node_t *n, sexp_t *m) {
parsed_message_t p;
if (parse_create(m, &p)) {
if (sexp_stringp(p.create.classname)
&& sexp_stringp(p.create.reply_sink)
&& sexp_stringp(p.create.reply_name)) {
cmsg_bytes_t classname_bytes = sexp_data(p.create.classname);
node_class_t *nc = lookup_node_class(classname_bytes);
if (nc == NULL) {
warn("Node class not found <<%.*s>>\n", classname_bytes.len, classname_bytes.bytes);
} else {
sexp_t *error = NULL;
sexp_t *reply;
if (new_node(nc, p.create.arg, &error) != NULL) {
reply = message_create_ok(NULL);
} else {
reply = message_create_failed(error);
}
post_node(sexp_data(p.create.reply_sink),
sexp_data(p.create.reply_name),
reply,
sexp_empty_bytes);
}
}
return;
}
warn("Message not understood in factory: ");
sexp_writeln(stderr_h, m);
}
static node_class_t factory_class = {
.name = "factory",
.extend = NULL,
.destroy = NULL,
.handle_message = factory_handle_message
};
static void init_factory(void) {
bind_node(cmsg_cstring_bytes("factory"), new_node(&factory_class, NULL, NULL));
}
#if WANT_CONSOLE_LISTENER
static void console_listener(void *arg) {
IOHandle *in_handle = new_iohandle(0);
while (1) {
cmsg_bytes_t buf = iohandle_readwait(in_handle, 1);
if (buf.len == 0) break;
iohandle_drain(in_handle, buf.len);
}
delete_iohandle(in_handle);
interrupt_harness();
}
#endif
int main(int argc, char *argv[]) {
info("cmsg ALPHA, Copyright (C) 2010, 2011 Tony Garnock-Jones. All rights reserved.\n");
event_init();
signal(SIGPIPE, SIG_IGN); /* avoid EPIPE when connections drop unexpectedly */
info("Using libevent version %s\n", event_get_version());
init_sexp();
init_messages();
init_node(cmsg_cstring_bytes("server"));
init_factory();
init_queue();
init_direct();
init_fanout();
init_relay();
init_meta();
#if WANT_CONSOLE_LISTENER
spawn(console_listener, NULL);
#endif
start_net(5671);
boot_harness();
done_sexp();
return 0;
}

54
experiments/cmsg/meta.c Normal file
View File

@ -0,0 +1,54 @@
/* Copyright 2010, 2011, 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>.
*
* This file is part of Hop.
*
* Hop is free software: you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Hop is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
* License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Hop. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <errno.h>
#include <stdint.h>
#include <stddef.h>
#include <assert.h>
#include "cmsg_private.h"
#include "ref.h"
#include "sexp.h"
#include "hashtable.h"
#include "node.h"
#include "meta.h"
#include "messages.h"
void init_meta(void) {
sexp_t *args;
args = INCREF(sexp_cons(sexp_cstring("meta"), NULL));
new_node(lookup_node_class(cmsg_cstring_bytes("direct")), args, NULL);
DECREF(args, sexp_destructor);
}
void announce_subscription(sexp_t *source,
sexp_t *filter,
sexp_t *sink,
sexp_t *name,
int onoff)
{
post_node(cmsg_cstring_bytes("meta"),
sexp_data(source),
onoff
? message_subscribed(source, filter, sink, name)
: message_unsubscribed(source, filter, sink, name),
NULL);
}

29
experiments/cmsg/meta.h Normal file
View File

@ -0,0 +1,29 @@
/* Copyright 2010, 2011, 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>.
*
* This file is part of Hop.
*
* Hop is free software: you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Hop is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
* License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Hop. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef cmsg_meta_h
#define cmsg_meta_h
extern void init_meta(void);
extern void announce_subscription(sexp_t *source,
sexp_t *filter,
sexp_t *sink,
sexp_t *name,
int onoff);
#endif

View File

@ -0,0 +1,74 @@
diff --git a/harness.c b/harness.c
index 9c891b3..74061af 100644
--- a/harness.c
+++ b/harness.c
@@ -50,18 +50,38 @@ Process *current_process = NULL;
static ucontext_t scheduler;
static queue_t runlist = EMPTY_PROCESS_QUEUE;
static queue_t deadlist = EMPTY_PROCESS_QUEUE;
+static queue_t current_worklist = EMPTY_PROCESS_QUEUE;
static void enqueue_runlist(Process *p) {
p->state = PROCESS_RUNNING;
enqueue(&runlist, p);
}
+static void clean_dead_processes(void) {
+ Process *deadp;
+ while ((deadp = dequeue(&deadlist)) != NULL) {
+ free(deadp->stack_base);
+ free(deadp);
+ }
+}
+
static void schedule(void) {
- //info("schedule %p\n", current_process);
if (current_process == NULL) {
ICHECK(setcontext(&scheduler), "schedule setcontext");
} else {
- ICHECK(swapcontext(&current_process->context, &scheduler), "schedule swapcontext");
+ Process *current = current_process;
+ Process *target_process = dequeue(&current_worklist);
+ ucontext_t *target;
+
+ if (target_process == NULL) {
+ target = &scheduler;
+ } else {
+ target = &target_process->context;
+ current_process = target_process;
+ }
+
+ clean_dead_processes(); /* safe because we know we're not dead ourselves at this point */
+ ICHECK(swapcontext(&current->context, target), "schedule swapcontext");
}
}
@@ -255,14 +275,6 @@ void iohandle_settimeout(IOHandle *h, int timeout_read, int timeout_write) {
bufferevent_settimeout(h->io, timeout_read, timeout_write);
}
-static void clean_dead_processes(void) {
- Process *deadp;
- while ((deadp = dequeue(&deadlist)) != NULL) {
- free(deadp->stack_base);
- free(deadp);
- }
-}
-
void boot_harness(void) {
stdin_h = new_iohandle(0);
stdout_h = new_iohandle(1);
@@ -272,10 +284,10 @@ void boot_harness(void) {
while (1) {
while (runlist.count) {
- queue_t work = runlist;
+ current_worklist = runlist;
runlist = EMPTY_PROCESS_QUEUE;
- //info("Processing %d jobs\n", work.count);
- while ((current_process = dequeue(&work)) != NULL) {
+ //info("Processing %d jobs\n", current_worklist.count);
+ while ((current_process = dequeue(&current_worklist)) != NULL) {
//info("entering %p\n", current_process);
ICHECK(swapcontext(&scheduler, &current_process->context), "boot_harness swapcontext");
clean_dead_processes();

2
experiments/cmsg/misc/t0 Normal file
View File

@ -0,0 +1,2 @@
(9:subscribe5:test00:0:5:test05:login)
(4:post4:meta(9:subscribe0:5:test08:presence5:test01:k)0:)

3
experiments/cmsg/misc/t1 Normal file
View File

@ -0,0 +1,3 @@
(9:subscribe5:test10:0:5:test15:login)
(4:post7:factory(6:create5:queue(2:q1)5:test11:k)0:)
(4:post2:q1(9:subscribe0:5:test18:consumer5:test11:k)0:)

4
experiments/cmsg/misc/t2 Normal file
View File

@ -0,0 +1,4 @@
(9:subscribe5:test20:0:5:test25:login)
(4:post2:q1(4:post0:8:message10:)0:)
(4:post2:q1(4:post0:8:message20:)0:)
(11:unsubscribe5:test2)

4
experiments/cmsg/misc/t4 Normal file
View File

@ -0,0 +1,4 @@
(9:subscribe5:test40:0:5:test45:login)
(4:post7:factory(6:create6:direct(2:dx)5:test41:k)0:)
(4:post2:dx(9:subscribe1:a5:test48:consumer5:test41:k)0:)
(4:post2:dx(9:subscribe1:c5:test48:consumer5:test41:k)0:)

7
experiments/cmsg/misc/t5 Normal file
View File

@ -0,0 +1,7 @@
(9:subscribe5:test50:0:5:test55:login)
(4:post7:factory(6:create6:direct(2:dx)5:test51:k)0:)
(4:post7:factory(6:create5:queue(2:q5)5:test51:k)0:)
(4:post2:q5(9:subscribe0:5:test59:consumer15:test51:k)0:)
(4:post2:q5(9:subscribe0:5:test59:consumer25:test51:k)0:)
(4:post2:dx(9:subscribe1:a2:q50:5:test51:k)0:)
(4:post2:dx(9:subscribe1:b2:q50:5:test51:k)0:)

6
experiments/cmsg/misc/t6 Normal file
View File

@ -0,0 +1,6 @@
(9:subscribe5:test60:0:5:test65:login)
(4:post2:dx(4:post1:a9:messageA10:)0:)
(4:post2:dx(4:post1:a9:messageA20:)0:)
(4:post2:dx(4:post1:b8:messageB0:)0:)
(4:post2:dx(4:post1:c8:messageC0:)0:)
(11:unsubscribe5:test6)

116
experiments/cmsg/net.c Normal file
View File

@ -0,0 +1,116 @@
/* Copyright 2010, 2011, 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>.
*
* This file is part of Hop.
*
* Hop is free software: you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Hop is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
* License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Hop. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netdb.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <fcntl.h>
#include <sys/wait.h>
#include <time.h>
#include <sys/time.h>
#include <stdarg.h>
#include <assert.h>
typedef unsigned char u_char;
#include <event.h>
#include "cmsg_private.h"
#include "relay.h"
static struct event accept_event;
void get_addr_name(char *namebuf, size_t buflen, struct sockaddr_in const *sin) {
unsigned char *addr = (unsigned char *) &sin->sin_addr.s_addr;
struct hostent *h = gethostbyaddr(addr, 4, AF_INET);
if (h == NULL) {
snprintf(namebuf, buflen, "%u.%u.%u.%u", addr[0], addr[1], addr[2], addr[3]);
} else {
snprintf(namebuf, buflen, "%s", h->h_name);
}
}
void endpoint_name(struct sockaddr_in const *peername, cmsg_bytes_t result) {
char name[256];
get_addr_name(name, sizeof(name), peername);
snprintf((char *) result.bytes, result.len, "%s:%d", name, ntohs(peername->sin_port));
}
static void accept_connection(int servfd, short what, void *arg) {
struct sockaddr_in s;
socklen_t addrlen = sizeof(s);
int fd = accept(servfd, (struct sockaddr *) &s, &addrlen);
if (fd == -1) {
if (errno != EAGAIN && errno != EINTR) {
warn("accept: errno %d (%s)\n", errno, strerror(errno));
}
return;
}
{
int i = 1;
ICHECK(setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &i, sizeof(i)), "setsockopt TCP_NODELAY");
}
start_relay(&s, fd);
}
void start_net(int listen_port) {
int servfd = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in s;
if (servfd < 0) {
die("Could not open listen socket.\n");
}
s.sin_family = AF_INET;
s.sin_addr.s_addr = htonl(INADDR_ANY);
s.sin_port = htons(listen_port);
{
int i = 1;
setsockopt(servfd, SOL_SOCKET, SO_REUSEADDR, &i, sizeof(i)); // don't care if this fails
}
if (bind(servfd, (struct sockaddr *) &s, sizeof(s)) < 0) {
die("Could not bind listen socket.\n");
}
if (listen(servfd, 5) < 0) {
int savedErrno = errno;
die("Could not listen on listen socket (errno %d: %s).\n",
savedErrno, strerror(savedErrno));
}
event_set(&accept_event, servfd, EV_READ | EV_PERSIST, accept_connection, NULL);
if (event_add(&accept_event, NULL) == -1) {
die("Could not add accept_event.");
}
info("Accepting connections on port %d.\n", listen_port);
}

26
experiments/cmsg/net.h Normal file
View File

@ -0,0 +1,26 @@
/* Copyright 2010, 2011, 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>.
*
* This file is part of Hop.
*
* Hop is free software: you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Hop is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
* License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Hop. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef cmsg_net_h
#define cmsg_net_h
extern void get_addr_name(char *namebuf, size_t buflen, struct sockaddr_in const *sin);
extern void endpoint_name(struct sockaddr_in const *peername, cmsg_bytes_t result);
extern void start_net(int listen_port);
#endif

194
experiments/cmsg/node.c Normal file
View File

@ -0,0 +1,194 @@
/* Copyright 2010, 2011, 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>.
*
* This file is part of Hop.
*
* Hop is free software: you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Hop is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
* License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Hop. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <stdint.h>
#include <assert.h>
#include <ucontext.h>
#include "cmsg_private.h"
#include "ref.h"
#include "sexp.h"
#include "harness.h"
#include "sexpio.h"
#include "hashtable.h"
#include "node.h"
#include "meta.h"
#include "messages.h"
static cmsg_bytes_t _container_name;
static hashtable_t node_class_table;
static hashtable_t directory;
static void *node_incref(void *arg) {
return INCREF((node_t *) arg);
}
static void node_decref(void *arg) {
DECREF((node_t *) arg, node_destructor);
}
void init_node(cmsg_bytes_t container_name) {
if (container_name.len == 0) {
unsigned char buf[CMSG_UUID_BUF_SIZE];
gen_uuid(buf);
_container_name = cmsg_bytes_malloc_dup(CMSG_BYTES(CMSG_UUID_BUF_SIZE, buf));
} else {
_container_name = cmsg_bytes_malloc_dup(container_name);
}
info("Local container name is <<%.*s>>\n", _container_name.len, _container_name.bytes);
init_hashtable(&node_class_table,
31,
NULL,
NULL);
init_hashtable(&directory,
10007,
node_incref,
node_decref);
}
cmsg_bytes_t local_container_name(void) {
return _container_name;
}
void register_node_class(node_class_t *nc) {
cmsg_bytes_t key = cmsg_cstring_bytes(nc->name);
if (hashtable_contains(&node_class_table, key)) {
die("Duplicate node class name %s\n", nc->name);
}
hashtable_put(&node_class_table, key, nc);
}
node_class_t *lookup_node_class(cmsg_bytes_t name) {
node_class_t *nc = NULL;
hashtable_get(&node_class_table, name, (void **) &nc);
return nc;
}
static void init_node_names(node_t *n) {
init_hashtable(&n->names, 5, NULL, NULL);
}
node_t *new_node(node_class_t *nc, sexp_t *args, sexp_t **error_out) {
node_t *n = malloc(sizeof(*n));
n->refcount = ZERO_REFCOUNT();
n->node_class = nc;
n->extension = NULL;
init_node_names(n);
if (nc->extend != NULL) {
sexp_t *error = nc->extend(n, args);
if (error != NULL) {
node_destructor(n);
if (error_out != NULL) {
*error_out = error;
} else {
warn("Creating node of class %s failed with ", nc->name);
sexp_writeln(stderr_h, error);
}
return NULL;
}
}
return n;
}
void node_destructor(node_t *n) {
if (n->node_class->destroy != NULL) {
n->node_class->destroy(n);
}
unbind_all_names_for_node(n);
destroy_hashtable(&n->names);
free(n);
}
node_t *lookup_node(cmsg_bytes_t name) {
node_t *n = NULL;
hashtable_get(&directory, name, (void **) &n);
return n;
}
static void announce_binding(cmsg_bytes_t name, int onoff) {
sexp_t *filter = sexp_bytes(name);
INCREF(filter);
announce_subscription(sexp_empty_bytes, filter, sexp_empty_bytes, sexp_empty_bytes, onoff);
DECREF(filter, sexp_destructor);
}
int bind_node(cmsg_bytes_t name, node_t *n) {
if (name.len == 0) {
warn("Binding to empty name forbidden\n");
return 0;
}
if (hashtable_contains(&directory, name)) {
return 0;
}
hashtable_put(&directory, name, n);
hashtable_put(&n->names, name, NULL);
info("Binding node <<%.*s>> of class %s\n", name.len, name.bytes, n->node_class->name);
announce_binding(name, 1);
return 1;
}
int unbind_node(cmsg_bytes_t name) {
node_t *n = NULL;
hashtable_get(&directory, name, (void **) &n);
if (n == NULL) {
return 0;
} else {
info("Unbinding node <<%.*s>> of class %s\n", name.len, name.bytes, n->node_class->name);
hashtable_erase(&n->names, name);
hashtable_erase(&directory, name);
announce_binding(name, 0);
return 1;
}
}
static void unbind_on_destroy(void *context, cmsg_bytes_t key, void *value) {
unbind_node(key);
}
void unbind_all_names_for_node(node_t *n) {
hashtable_t names = n->names;
init_node_names(n);
hashtable_foreach(&names, unbind_on_destroy, NULL);
destroy_hashtable(&names);
}
int post_node(cmsg_bytes_t node, cmsg_bytes_t name, sexp_t *body, sexp_t *token) {
return send_node_release(node, message_post(sexp_bytes(name), body, token));
}
int send_node(cmsg_bytes_t node, sexp_t *message) {
node_t *n = lookup_node(node);
if (n == NULL) {
return 0;
}
n->node_class->handle_message(n, message);
return 1;
}
int send_node_release(cmsg_bytes_t node, sexp_t *message) {
int result;
INCREF(message);
result = send_node(node, message);
DECREF(message, sexp_destructor);
return result;
}

60
experiments/cmsg/node.h Normal file
View File

@ -0,0 +1,60 @@
/* Copyright 2010, 2011, 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>.
*
* This file is part of Hop.
*
* Hop is free software: you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Hop is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
* License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Hop. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef cmsg_node_h
#define cmsg_node_h
typedef struct node_t_ {
refcount_t refcount;
struct node_class_t_ *node_class;
hashtable_t names;
void *extension; /* Each node class puts something different here in its instances */
} node_t;
typedef sexp_t *(*node_extension_fn_t)(node_t *n, sexp_t *args);
typedef void (*node_destructor_fn_t)(node_t *n);
typedef void (*node_message_handler_fn_t)(node_t *n, sexp_t *m);
typedef struct node_class_t_ {
char const *name;
node_extension_fn_t extend;
node_destructor_fn_t destroy;
node_message_handler_fn_t handle_message;
} node_class_t;
extern void init_node(cmsg_bytes_t container_name);
extern cmsg_bytes_t local_container_name(void);
extern void basic_node_destroy(node_t *n);
extern void register_node_class(node_class_t *nc);
extern node_class_t *lookup_node_class(cmsg_bytes_t name);
extern node_t *new_node(node_class_t *nc, sexp_t *args, sexp_t **error_out);
extern void node_destructor(node_t *n);
extern node_t *lookup_node(cmsg_bytes_t name);
extern int bind_node(cmsg_bytes_t name, node_t *n);
extern int unbind_node(cmsg_bytes_t name);
extern void unbind_all_names_for_node(node_t *n);
extern int post_node(cmsg_bytes_t node, cmsg_bytes_t name, sexp_t *body, sexp_t *token);
extern int send_node(cmsg_bytes_t node, sexp_t *message);
extern int send_node_release(cmsg_bytes_t node, sexp_t *message);
#endif

243
experiments/cmsg/queue.c Normal file
View File

@ -0,0 +1,243 @@
/* Copyright 2010, 2011, 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>.
*
* This file is part of Hop.
*
* Hop is free software: you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Hop is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
* License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Hop. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <errno.h>
#include <stdint.h>
#include <stddef.h>
#include <assert.h>
#include <ucontext.h>
#include "cmsg_private.h"
#include "harness.h"
#include "ref.h"
#include "sexp.h"
#include "sexpio.h"
#include "hashtable.h"
#include "node.h"
#include "queue.h"
#include "dataq.h"
#include "messages.h"
#include "subscription.h"
typedef struct queue_extension_t_ {
sexp_t *name;
sexp_t *backlog_q;
queue_t waiter_q;
hashtable_t subscriptions;
Process *shovel;
int shovel_awake;
} queue_extension_t;
static sexp_t *queue_extend(node_t *n, sexp_t *args) {
if ((sexp_length(args) == 1) && sexp_stringp(sexp_head(args))) {
cmsg_bytes_t name = sexp_data(sexp_head(args));
queue_extension_t *q = calloc(1, sizeof(*q));
q->name = INCREF(sexp_head(args));
q->backlog_q = INCREF(sexp_new_queue());
q->waiter_q = EMPTY_QUEUE(subscription_t, link);
init_hashtable(&q->subscriptions, 5, NULL, NULL);
q->shovel = NULL;
q->shovel_awake = 0;
n->extension = q;
return bind_node(name, n) ? NULL : sexp_cstring("bind failed");
} else {
return sexp_cstring("invalid args");
}
}
static void queue_destructor(node_t *n) {
queue_extension_t *q = n->extension;
if (q != NULL) { /* can be NULL if queue_extend was given invalid args */
DECREF(q->name, sexp_destructor);
DECREF(q->backlog_q, sexp_destructor);
{
subscription_t *sub = NULL;
while ((sub = dequeue(&q->waiter_q)) != NULL) {
free_subscription(sub);
}
}
destroy_hashtable(&q->subscriptions);
if (q->shovel) {
warn("TODO: the shovel needs to be taken down as well here\n");
/* The difficulty is that the shovel may be running at the
moment, so careful ordering of operations is required to
avoid referencing deallocated memory. */
}
free(q);
}
}
static void end_burst(queue_extension_t *q, size_t *burst_count_ptr, size_t total_count) {
#if 0
if (*burst_count_ptr > 0) {
info("Queue <<%.*s>>: burst count %lu; total %lu\n",
sexp_data(q->name).len, sexp_data(q->name).bytes,
*burst_count_ptr, total_count);
}
#endif
*burst_count_ptr = 0;
}
static void shoveller(void *qv) {
queue_extension_t *q = qv;
size_t burst_count = 0;
size_t total_count = 0;
sexp_t *body = NULL; /* held */
subscription_t *sub = NULL;
{
cmsg_bytes_t n = sexp_data(q->name);
info("Queue <<%.*s>> busy. Shoveller entering\n", n.len, n.bytes);
}
check_for_work:
//info("Checking for work\n");
if (sexp_queue_emptyp(q->backlog_q)) {
//info("Backlog empty\n");
goto wait_and_shovel;
}
body = INCREF(sexp_dequeue(q->backlog_q)); /* held */
find_valid_waiter:
if (q->waiter_q.count == 0) {
//info("No waiters\n");
sexp_queue_pushback(q->backlog_q, body);
DECREF(body, sexp_destructor);
goto wait_and_shovel;
}
sub = dequeue(&q->waiter_q);
/*
info("Delivering to <<%.*s>>/<<%.*s>>...\n",
sexp_data(sub->sink).len, sexp_data(sub->sink).bytes,
sexp_data(sub->name).len, sexp_data(sub->name).bytes);
*/
if (!send_to_subscription(q->name, &q->subscriptions, sub, body)) {
goto find_valid_waiter;
}
burst_count++;
total_count++;
//info("Delivery successful\n");
DECREF(body, sexp_destructor);
enqueue(&q->waiter_q, sub);
if (burst_count >= 10000) {
end_burst(q, &burst_count, total_count);
yield();
}
goto check_for_work;
wait_and_shovel:
end_burst(q, &burst_count, total_count);
//info("Waiting for throck\n");
q->shovel_awake = 0;
/* TODO: if the number of active processes is large, assume we have
memory pressure, and quit the shovel early rather than waiting
for a few milliseconds to see if we're idle. */
if (nap(100)) {
cmsg_bytes_t n = sexp_data(q->name);
info("Queue <<%.*s>> idle. Shoveller exiting\n", n.len, n.bytes);
q->shovel = NULL;
return;
}
//info("Throck received!\n");
goto check_for_work;
}
static void throck_shovel(queue_extension_t *q) {
//int counter = 0;
retry:
//printf("throck %d %d %p\n", counter++, q->shovel_awake, q->shovel);
if (!q->shovel_awake) {
if (!q->shovel) {
q->shovel_awake = 1;
q->shovel = spawn(shoveller, q);
} else {
if (resume(q->shovel) == -1) {
/* The nap() in the shoveller returned and scheduled the
shoveller *just* before we got to it, but the shoveller
hasn't had a chance to run yet, so hasn't been able to
clear q->shovel and exit. The resume() attempt failed
because q->shovel's state is PROCESS_RUNNING, now that it
has been scheduled by the return of nap(), so we know that
we should back off and try again from the top. */
yield();
goto retry;
} else {
/* The resume() was successful, i.e. the nap() hadn't returned
before we tried to resume(). We know that nap() will return
zero (since the timeout didn't fire before the process was
resumed), and so the existing shoveller will continue
running. */
q->shovel_awake = 1;
}
}
}
}
static void queue_handle_message(node_t *n, sexp_t *m) {
queue_extension_t *q = n->extension;
parsed_message_t p;
if (parse_post(m, &p)) {
sexp_enqueue(q->backlog_q, p.post.body);
throck_shovel(q);
return;
}
if (parse_subscribe(m, &p)) {
subscription_t *sub = handle_subscribe_message(q->name, &q->subscriptions, &p);
if (sub != NULL) {
enqueue(&q->waiter_q, sub);
throck_shovel(q);
}
return;
}
if (parse_unsubscribe(m, &p)) {
handle_unsubscribe_message(q->name, &q->subscriptions, &p);
return;
}
warn("Message not understood in queue: ");
sexp_writeln(stderr_h, m);
}
static node_class_t queue_class = {
.name = "queue",
.extend = queue_extend,
.destroy = queue_destructor,
.handle_message = queue_handle_message
};
void init_queue(void) {
register_node_class(&queue_class);
}

23
experiments/cmsg/queue.h Normal file
View File

@ -0,0 +1,23 @@
/* Copyright 2010, 2011, 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>.
*
* This file is part of Hop.
*
* Hop is free software: you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Hop is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
* License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Hop. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef cmsg_queue_h
#define cmsg_queue_h
extern void init_queue(void);
#endif

56
experiments/cmsg/ref.h Normal file
View File

@ -0,0 +1,56 @@
/* Copyright 2010, 2011, 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>.
*
* This file is part of Hop.
*
* Hop is free software: you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Hop is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
* License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Hop. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef cmsg_ref_h
#define cmsg_ref_h
typedef struct refcount_t_ {
unsigned int count;
} refcount_t;
#define ZERO_REFCOUNT() ((refcount_t) { .count = 0 })
#define INCREF(x) ({ \
typeof(x) __x = (x); \
if (__x != NULL) { \
__x->refcount.count++; \
} \
__x; \
})
#define UNGRAB(x) ({ \
typeof(x) __x = (x); \
if (__x != NULL) { \
assert(__x->refcount.count); \
__x->refcount.count--; \
} \
__x; \
})
#define DECREF(x, dtor) ({ \
typeof(x) __x = (x); \
if (__x != NULL) { \
assert(__x->refcount.count); \
(__x->refcount.count)--; \
if (__x->refcount.count == 0) { \
(dtor)(__x); \
} \
} \
(typeof(__x)) 0; \
})
#endif

234
experiments/cmsg/relay.c Normal file
View File

@ -0,0 +1,234 @@
/* Copyright 2010, 2011, 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>.
*
* This file is part of Hop.
*
* Hop is free software: you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Hop is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
* License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Hop. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netdb.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <sys/wait.h>
#include <time.h>
#include <sys/time.h>
#include <assert.h>
typedef unsigned char u_char;
#include <event.h>
#include "cmsg_private.h"
#include "harness.h"
#include "relay.h"
#include "net.h"
#include "ref.h"
#include "sexp.h"
#include "sexpio.h"
#include "hashtable.h"
#include "node.h"
#include "messages.h"
#define WANT_MESSAGE_TRACE 0
typedef struct relay_extension_t_ {
struct sockaddr_in peername;
char peername_str[256];
sexp_t *remote_container_name;
int fd;
IOHandle *outh;
} relay_extension_t;
static long connection_count = 0;
static void stats_printer(void *arg) {
while (1) {
info("%ld connections active\n", connection_count);
nap(1000);
}
}
void init_relay(void) {
spawn(stats_printer, NULL);
}
static sexp_t *relay_extend(node_t *n, sexp_t *args) {
/* TODO: outbound connections; args==NULL -> server relay, nonNULL -> outbound. */
n->extension = calloc(1, sizeof(relay_extension_t));
return NULL;
}
static void relay_destructor(node_t *n) {
relay_extension_t *r = n->extension;
delete_iohandle(r->outh);
r->outh = NULL;
if (close(r->fd) == -1) {
/* log errors as warnings here and keep on trucking */
warn("Closing file descriptor %d produced errno %d: %s\n",
r->fd, errno, strerror(errno));
}
DECREF(r->remote_container_name, sexp_destructor);
free(r);
}
static void relay_handle_message(node_t *n, sexp_t *m) {
relay_extension_t *r = n->extension;
#if WANT_MESSAGE_TRACE
info("fd %d <-- ", r->fd);
sexp_writeln(stderr_h, m);
#endif
BCHECK(!sexp_write(r->outh, m), "relay_handle_message sexp_write");
}
static node_class_t relay_class = {
.name = "relay",
.extend = relay_extend,
.destroy = relay_destructor,
.handle_message = relay_handle_message
};
static void send_error(IOHandle *h, char const *message, sexp_t *details) {
sexp_t *m = message_error(sexp_cstring(message), details);
INCREF(m);
warn("Sending error: ");
sexp_writeln(stderr_h, m);
iohandle_clear_error(h);
BCHECK(!sexp_write(h, m), "send_error sexp_write");
DECREF(m, sexp_destructor);
iohandle_flush(h); /* ignore result here, there's not much we can do with it */
}
static void send_sexp_syntax_error(IOHandle *h, char const *message) {
char const *url = "http://people.csail.mit.edu/rivest/Sexp.txt";
send_error(h, message, sexp_cstring(url));
}
static void relay_main(node_t *n) {
relay_extension_t *r = n->extension;
IOHandle *inh = new_iohandle(r->fd);
sexp_t *message = NULL; /* held */
parsed_message_t p;
INCREF(n); /* because the caller doesn't hold a ref, and we need to
drop ours on our death */
info("Accepted connection from %s on fd %d\n", r->peername_str, r->fd);
connection_count++;
iohandle_write(r->outh, cmsg_cstring_bytes("(3:hop1:0)"));
ICHECK(iohandle_flush(r->outh), "iohandle_flush greeting");
{
sexp_t *s = message_subscribe(sexp_bytes(local_container_name()),
sexp_empty_bytes, sexp_empty_bytes,
sexp_empty_bytes, sexp_empty_bytes);
INCREF(s);
sexp_write(r->outh, s);
DECREF(s, sexp_destructor);
}
//iohandle_settimeout(r->inh, 3, 0);
while (1) {
DECREF(message, sexp_destructor);
message = NULL;
if (!sexp_read(inh, &message)) goto network_error;
INCREF(message);
#if WANT_MESSAGE_TRACE
info("fd %d --> ", r->fd);
sexp_writeln(stderr_h, message);
#endif
if (parse_post(message, &p) && sexp_stringp(p.post.name)) {
cmsg_bytes_t nodename = sexp_data(p.post.name);
if (!send_node(nodename, p.post.body)) {
warn("Was asked to post to unknown node <<%.*s>>\n", nodename.len, nodename.bytes);
}
} else if (parse_subscribe(message, &p)
&& sexp_stringp(p.subscribe.filter)
&& sexp_stringp(p.subscribe.reply_sink)
&& sexp_stringp(p.subscribe.reply_name)) {
if (bind_node(sexp_data(p.subscribe.filter), n)) {
post_node(sexp_data(p.subscribe.reply_sink),
sexp_data(p.subscribe.reply_name),
message_subscribe_ok(p.subscribe.filter),
sexp_empty_bytes);
DECREF(r->remote_container_name, sexp_destructor);
r->remote_container_name = INCREF(p.subscribe.filter);
} else {
cmsg_bytes_t filter = sexp_data(p.subscribe.filter);
warn("Bind failed <<%.*s>>\n", filter.len, filter.bytes);
}
} else if (parse_unsubscribe(message, &p)
&& sexp_stringp(p.unsubscribe.token)) {
cmsg_bytes_t id = sexp_data(p.unsubscribe.token);
if (!unbind_node(id)) {
warn("Unbind failed <<%.*s>>\n", id.len, id.bytes);
}
} else {
send_error(r->outh, "message not understood", message);
goto protocol_error;
}
}
network_error:
if (inh->eof) {
info("Disconnecting fd %d normally.\n", r->fd);
} else {
switch (inh->error_kind) {
case SEXP_ERROR_OVERFLOW:
send_sexp_syntax_error(r->outh, "sexp too big");
break;
case SEXP_ERROR_SYNTAX:
send_sexp_syntax_error(r->outh, "sexp syntax error");
break;
default:
warn("Relay handle error 0x%04X on fd %d: %d, %s\n",
inh->error_kind, r->fd, inh->error_errno, strerror(inh->error_errno));
break;
}
}
protocol_error:
DECREF(message, sexp_destructor);
delete_iohandle(inh);
unbind_all_names_for_node(n);
DECREF(n, node_destructor);
connection_count--;
}
void start_relay(struct sockaddr_in const *peername, int fd) {
node_t *n = new_node(&relay_class, NULL, NULL);
relay_extension_t *r = n->extension;
r->peername = *peername;
endpoint_name(&r->peername, CMSG_BYTES(sizeof(r->peername_str), r->peername_str));
r->remote_container_name = NULL;
r->fd = fd;
r->outh = new_iohandle(r->fd);
spawn((process_main_t) relay_main, n);
}

25
experiments/cmsg/relay.h Normal file
View File

@ -0,0 +1,25 @@
/* Copyright 2010, 2011, 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>.
*
* This file is part of Hop.
*
* Hop is free software: you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Hop is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
* License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Hop. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef cmsg_relay_h
#define cmsg_relay_h
extern void init_relay(void);
extern void start_relay(struct sockaddr_in const *peername, int fd);
#endif

255
experiments/cmsg/sexp.c Normal file
View File

@ -0,0 +1,255 @@
/* Copyright 2010, 2011, 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>.
*
* This file is part of Hop.
*
* Hop is free software: you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Hop is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
* License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Hop. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <assert.h>
#include "cmsg_private.h"
#include "ref.h"
#include "sexp.h"
static sexp_t *freelist = NULL;
sexp_t *sexp_empty_bytes = NULL;
void init_sexp(void) {
sexp_empty_bytes = INCREF(sexp_cstring(""));
}
void done_sexp(void) {
int count = 0;
DECREF(sexp_empty_bytes, sexp_destructor);
sexp_empty_bytes = NULL;
while (freelist != NULL) {
sexp_t *x = freelist;
freelist = x->data.pair.tail;
free(x);
count++;
}
info("Released %d cached sexp shells.\n", count);
}
static inline sexp_t *alloc_shell(sexp_type_t kind) {
sexp_t *x = freelist;
if (x == NULL) {
x = malloc(sizeof(*x));
} else {
freelist = x->data.pair.tail;
}
x->refcount = ZERO_REFCOUNT();
x->kind = kind;
return x;
}
static inline void release_shell(sexp_t *x) {
x->data.pair.tail = freelist;
freelist = x;
}
sexp_t *sexp_incref(sexp_t *x) {
return INCREF(x);
}
sexp_t *sexp_decref(sexp_t *x) {
return DECREF(x, sexp_destructor);
}
void sexp_data_destructor(sexp_data_t *data) {
cmsg_bytes_free(data->data);
free(data);
}
void sexp_destructor(sexp_t *x) {
tail_recursion:
switch (x->kind) {
case SEXP_BYTES:
cmsg_bytes_free(x->data.bytes);
break;
case SEXP_SLICE:
DECREF(x->data.slice.data, sexp_data_destructor);
break;
case SEXP_DISPLAY_HINT:
case SEXP_PAIR: {
sexp_t *next = x->data.pair.tail;
DECREF(x->data.pair.head, sexp_destructor);
if (next != NULL) {
if (next->refcount.count == 1) {
release_shell(x);
x = next;
goto tail_recursion;
} else {
DECREF(next, sexp_destructor);
}
}
break;
}
default:
die("Unknown sexp kind %d in dtor\n", x->kind);
}
release_shell(x);
}
sexp_data_t *sexp_data_copy(cmsg_bytes_t body, size_t offset, size_t length) {
assert(offset + length <= body.len);
return sexp_data_alias(cmsg_bytes_malloc_dup(CMSG_BYTES(length, body.bytes + offset)));
}
sexp_data_t *sexp_data_alias(cmsg_bytes_t body) {
sexp_data_t *data = malloc(sizeof(*data));
data->refcount = ZERO_REFCOUNT();
data->data = body;
return data;
}
sexp_t *sexp_cstring(char const *str) {
return sexp_bytes(cmsg_cstring_bytes(str));
}
sexp_t *sexp_bytes(cmsg_bytes_t bytes) {
sexp_t *x = alloc_shell(SEXP_BYTES);
x->data.bytes = cmsg_bytes_malloc_dup(bytes);
return x;
}
sexp_t *sexp_slice(sexp_data_t *data, size_t offset, size_t length) {
sexp_t *x = alloc_shell(SEXP_SLICE);
x->data.slice.data = INCREF(data);
x->data.slice.offset = offset;
x->data.slice.length = length;
return x;
}
sexp_t *sexp_display_hint(sexp_t *hint, sexp_t *body) {
sexp_t *x = alloc_shell(SEXP_DISPLAY_HINT);
assert(sexp_simple_stringp(hint));
assert(sexp_simple_stringp(body));
x->data.pair.head = INCREF(hint);
x->data.pair.tail = INCREF(body);
return x;
}
sexp_t *sexp_cons(sexp_t *head, sexp_t *tail) {
sexp_t *x = alloc_shell(SEXP_PAIR);
x->data.pair.head = INCREF(head);
x->data.pair.tail = INCREF(tail);
return x;
}
cmsg_bytes_t sexp_data(sexp_t *x) {
restart:
switch (x->kind) {
case SEXP_BYTES:
return x->data.bytes;
case SEXP_SLICE:
return CMSG_BYTES(x->data.slice.length,
x->data.slice.data->data.bytes + x->data.slice.offset);
case SEXP_DISPLAY_HINT:
x = x->data.pair.tail;
goto restart;
default:
die("Unknown sexp kind %d in data accessor\n", x->kind);
}
}
int sexp_cmp(sexp_t *a, sexp_t *b) {
tail:
if (a == b) return 0;
if (sexp_stringp(a) && sexp_stringp(b)) {
return cmsg_bytes_cmp(sexp_data(a), sexp_data(b));
}
if (sexp_pairp(a) && sexp_pairp(b)) {
int result = sexp_cmp(sexp_head(a), sexp_head(b));
if (result) return result;
a = sexp_tail(a);
b = sexp_tail(b);
goto tail;
}
if (a == NULL) return -1;
if (b == NULL) return 1;
if (a->kind < b->kind) return -1;
return 1;
}
sexp_t *sexp_assoc(sexp_t *list, cmsg_bytes_t key) {
while (list != NULL) {
sexp_t *candidate = sexp_head(list);
if (sexp_stringp(candidate)) {
cmsg_bytes_t candidate_data = sexp_data(candidate);
if ((candidate_data.len == key.len)
&& (!memcmp(candidate_data.bytes, key.bytes, key.len))) {
return candidate;
}
}
list = sexp_tail(list);
}
return NULL;
}
size_t sexp_length(sexp_t *list) {
size_t result = 0;
while (sexp_pairp(list)) {
result++;
list = sexp_tail(list);
}
return result;
}
sexp_t *sexp_new_queue(void) {
return sexp_cons(NULL, NULL);
}
int sexp_queue_emptyp(sexp_t *q) {
return sexp_head(q) == NULL;
}
void sexp_queue_pushback(sexp_t *q, sexp_t *x) {
sexp_t *cell = sexp_cons(x, sexp_head(q));
if (sexp_head(q) == NULL) {
sexp_settail(q, cell);
}
sexp_sethead(q, cell);
}
void sexp_enqueue(sexp_t *q, sexp_t *x) {
sexp_t *cell = sexp_cons(x, NULL);
if (sexp_head(q) == NULL) {
sexp_sethead(q, cell);
} else {
sexp_settail(sexp_tail(q), cell);
}
sexp_settail(q, cell);
}
sexp_t *sexp_dequeue(sexp_t *q) {
if (sexp_head(q) == NULL) {
return NULL;
} else {
sexp_t *x = INCREF(sexp_head(sexp_head(q)));
sexp_t *cell = sexp_tail(sexp_head(q));
sexp_sethead(q, cell);
if (cell == NULL) {
sexp_settail(q, NULL);
}
UNGRAB(x);
return x;
}
}

162
experiments/cmsg/sexp.h Normal file
View File

@ -0,0 +1,162 @@
/* Copyright 2010, 2011, 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>.
*
* This file is part of Hop.
*
* Hop is free software: you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Hop is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
* License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Hop. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef cmsg_sexp_h
#define cmsg_sexp_h
typedef struct sexp_data_t_ {
refcount_t refcount;
cmsg_bytes_t data;
} sexp_data_t;
typedef enum sexp_type_t_ {
SEXP_BYTES,
SEXP_SLICE,
SEXP_DISPLAY_HINT,
SEXP_PAIR
} sexp_type_t;
typedef struct sexp_t_ {
refcount_t refcount;
sexp_type_t kind;
union {
cmsg_bytes_t bytes;
struct {
sexp_data_t *data;
size_t offset;
size_t length;
} slice;
struct {
struct sexp_t_ *head;
struct sexp_t_ *tail;
} pair; /* and display-hint */
} data;
} sexp_t;
extern sexp_t *sexp_empty_bytes;
extern void init_sexp(void);
extern void done_sexp(void);
extern sexp_t *sexp_incref(sexp_t *x);
extern sexp_t *sexp_decref(sexp_t *x);
extern void sexp_data_destructor(sexp_data_t *data);
extern void sexp_destructor(sexp_t *x);
extern sexp_data_t *sexp_data_copy(cmsg_bytes_t body, size_t offset, size_t length);
extern sexp_data_t *sexp_data_alias(cmsg_bytes_t body);
extern sexp_t *sexp_cstring(char const *str);
extern sexp_t *sexp_bytes(cmsg_bytes_t bytes);
extern sexp_t *sexp_slice(sexp_data_t *data, size_t offset, size_t length);
extern sexp_t *sexp_display_hint(sexp_t *hint, sexp_t *body);
extern sexp_t *sexp_cons(sexp_t *head, sexp_t *tail);
static inline int sexp_simple_stringp(sexp_t *x) {
return (x != NULL) && ((x->kind == SEXP_BYTES) || (x->kind == SEXP_SLICE));
}
static inline int sexp_stringp(sexp_t *x) {
return sexp_simple_stringp(x) || ((x != NULL) && (x->kind == SEXP_DISPLAY_HINT));
}
static inline int sexp_pairp(sexp_t *x) {
return (x != NULL) && (x->kind == SEXP_PAIR);
}
extern cmsg_bytes_t sexp_data(sexp_t *x);
extern int sexp_cmp(sexp_t *a, sexp_t *b);
static inline sexp_t *sexp_head(sexp_t *x) {
assert(x->kind == SEXP_PAIR);
return x->data.pair.head;
}
static inline sexp_t *sexp_tail(sexp_t *x) {
assert(x->kind == SEXP_PAIR);
return x->data.pair.tail;
}
static inline sexp_t *sexp_hint(sexp_t *x) {
assert(x->kind == SEXP_DISPLAY_HINT);
return x->data.pair.head;
}
static inline sexp_t *sexp_body(sexp_t *x) {
assert(x->kind == SEXP_DISPLAY_HINT);
return x->data.pair.tail;
}
#define sexp_setter_(settername,fieldname) \
static inline sexp_t *settername(sexp_t *x, sexp_t *y) { \
sexp_t *old; \
assert(x->kind == SEXP_PAIR); \
INCREF(y); \
old = x->data.pair.fieldname; \
x->data.pair.fieldname = y; \
DECREF(old, sexp_destructor); \
return x; \
}
sexp_setter_(sexp_sethead, head)
sexp_setter_(sexp_settail, tail)
static inline sexp_t *sexp_push(sexp_t *oldstack, sexp_t *val) {
sexp_t *newstack = INCREF(sexp_cons(val, oldstack));
DECREF(oldstack, sexp_destructor);
return newstack;
}
static inline sexp_t *sexp_pop(sexp_t *oldstack, sexp_t **valp) {
sexp_t *nextstack = INCREF(sexp_tail(oldstack));
sexp_t *val = INCREF(sexp_head(oldstack));
DECREF(oldstack, sexp_destructor);
UNGRAB(val);
if (valp != NULL) {
*valp = val;
}
return nextstack;
}
static inline int sexp_pseudo_pop(sexp_t **px) {
*px = sexp_tail(*px);
return sexp_pairp(*px);
}
static inline sexp_t *sexp_listtail(sexp_t *list, size_t dropcount) {
while (dropcount) {
list = sexp_tail(list);
dropcount--;
}
return list;
}
static inline sexp_t *sexp_listref(sexp_t *list, size_t index) {
return sexp_head(sexp_listtail(list, index));
}
extern sexp_t *sexp_assoc(sexp_t *list, cmsg_bytes_t key);
extern size_t sexp_length(sexp_t *list);
extern sexp_t *sexp_new_queue(void);
extern int sexp_queue_emptyp(sexp_t *q);
extern void sexp_queue_pushback(sexp_t *q, sexp_t *x);
extern void sexp_enqueue(sexp_t *q, sexp_t *x);
extern sexp_t *sexp_dequeue(sexp_t *q);
#endif

247
experiments/cmsg/sexpio.c Normal file
View File

@ -0,0 +1,247 @@
/* Copyright 2010, 2011, 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>.
*
* This file is part of Hop.
*
* Hop is free software: you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Hop is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
* License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Hop. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <ctype.h>
#include <assert.h>
#include <ucontext.h>
#include "cmsg_private.h"
#include "ref.h"
#include "sexp.h"
#include "harness.h"
#include "sexpio.h"
/* TODO: limit size of individual simple strings */
/* TODO: limit nesting of sexps */
static sexp_t *read_simple_string(IOHandle *h, cmsg_bytes_t buf) {
int i = 0;
sexp_t *result;
while (1) {
buf = iohandle_readwait(h, buf.len + 1);
if (buf.len == 0) return NULL;
/* Don't reset i to zero: avoids scanning the beginning of the
number repeatedly */
while (i < buf.len) {
if (i > 10) {
/* More than ten digits of length prefix. We're unlikely to be
able to cope with anything that large. */
h->error_kind = SEXP_ERROR_OVERFLOW;
return NULL;
}
if (buf.bytes[i] == ':') {
size_t count;
buf.bytes[i] = '\0';
count = atoi((char *) buf.bytes);
iohandle_drain(h, i + 1);
buf = iohandle_readwait(h, count);
if (buf.len < count) {
/* Error or EOF. */
return NULL;
}
buf.len = count;
result = sexp_bytes(buf);
iohandle_drain(h, count);
return result;
}
if (!isdigit(buf.bytes[i])) {
h->error_kind = SEXP_ERROR_SYNTAX;
return NULL;
}
i++;
}
}
}
sexp_t *sexp_read_atom(IOHandle *h) {
return read_simple_string(h, EMPTY_BYTES);
}
#define READ1 \
buf = iohandle_readwait(h, 1); \
if (buf.len == 0) goto error;
int sexp_read(IOHandle *h, sexp_t **result_ptr) {
cmsg_bytes_t buf;
sexp_t *stack = NULL; /* held */
sexp_t *hint = NULL; /* held */
sexp_t *body = NULL; /* held */
sexp_t *accumulator = NULL; /* not held */
while (1) {
READ1;
switch (buf.bytes[0]) {
case '[': {
iohandle_drain(h, 1);
hint = INCREF(read_simple_string(h, EMPTY_BYTES));
if (hint == NULL) goto error;
READ1;
if (buf.bytes[0] != ']') {
h->error_kind = SEXP_ERROR_SYNTAX;
goto error;
}
iohandle_drain(h, 1);
skip_whitespace_in_display_hint:
READ1;
if (isspace(buf.bytes[0])) {
iohandle_drain(h, 1);
goto skip_whitespace_in_display_hint;
}
body = INCREF(read_simple_string(h, EMPTY_BYTES));
if (body == NULL) goto error;
accumulator = sexp_display_hint(hint, body);
DECREF(hint, sexp_destructor); /* these could be UNGRABs */
DECREF(body, sexp_destructor);
break;
}
case '(':
iohandle_drain(h, 1);
stack = sexp_push(stack, sexp_cons(NULL, NULL));
continue;
case ')': {
sexp_t *current;
if (stack == NULL) {
h->error_kind = SEXP_ERROR_SYNTAX;
goto error;
}
stack = sexp_pop(stack, &current);
INCREF(current);
iohandle_drain(h, 1);
accumulator = INCREF(sexp_head(current));
DECREF(current, sexp_destructor);
UNGRAB(accumulator);
break;
}
default:
if (isspace(buf.bytes[0])) {
iohandle_drain(h, 1);
continue;
}
buf.len = 1; /* needed to avoid reading too much in read_simple_string */
accumulator = read_simple_string(h, buf);
if (accumulator == NULL) goto error;
break;
}
if (stack == NULL) {
*result_ptr = accumulator;
return 1;
} else {
sexp_t *current = sexp_head(stack); /* not held */
sexp_t *cell = sexp_cons(accumulator, NULL);
if (sexp_tail(current) == NULL) {
sexp_sethead(current, cell);
} else {
sexp_settail(sexp_tail(current), cell);
}
sexp_settail(current, cell);
}
}
error:
DECREF(stack, sexp_destructor);
DECREF(hint, sexp_destructor);
DECREF(body, sexp_destructor);
return 0;
}
void write_simple_string(IOHandle *h, sexp_t *x) {
cmsg_bytes_t data = sexp_data(x);
char lenstr[16];
snprintf(lenstr, sizeof(lenstr), "%u:", (unsigned int) data.len);
lenstr[sizeof(lenstr) - 1] = '\0';
iohandle_write(h, cmsg_cstring_bytes(lenstr));
iohandle_write(h, data);
}
unsigned short sexp_write(IOHandle *h, sexp_t *x) {
sexp_t *stack = NULL; /* held */
sexp_t *current = x;
write1:
if (current == NULL) {
iohandle_write(h, cmsg_cstring_bytes("()"));
} else {
switch (current->kind) {
case SEXP_BYTES:
case SEXP_SLICE:
write_simple_string(h, current);
break;
case SEXP_DISPLAY_HINT:
iohandle_write(h, cmsg_cstring_bytes("["));
write_simple_string(h, sexp_hint(current));
iohandle_write(h, cmsg_cstring_bytes("]"));
write_simple_string(h, sexp_body(current));
break;
case SEXP_PAIR:
iohandle_write(h, cmsg_cstring_bytes("("));
stack = sexp_push(stack, current);
break;
default:
die("Unknown sexp kind %d in sexp_write\n", current->kind);
}
}
check_stack:
if (stack == NULL) {
return 0;
}
{
sexp_t *cell = sexp_head(stack);
if (cell == NULL) {
iohandle_write(h, cmsg_cstring_bytes(")"));
stack = sexp_pop(stack, NULL); /* no need to worry about incref/decref: val is NULL! */
goto check_stack;
}
if (sexp_pairp(cell)) {
current = sexp_head(cell);
sexp_sethead(stack, sexp_tail(cell));
goto write1;
}
return SEXP_ERROR_SYNTAX;
}
}
unsigned short sexp_writeln(IOHandle *h, sexp_t *x) {
unsigned short result;
fflush(NULL);
result = sexp_write(h, x);
if (result == 0) {
iohandle_write(h, cmsg_cstring_bytes("\n"));
ICHECK(iohandle_flush(h), "sexp_writeln iohandle_flush");
}
return result;
}

29
experiments/cmsg/sexpio.h Normal file
View File

@ -0,0 +1,29 @@
/* Copyright 2010, 2011, 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>.
*
* This file is part of Hop.
*
* Hop is free software: you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Hop is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
* License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Hop. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef cmsg_sexpio_h
#define cmsg_sexpio_h
#define SEXP_ERROR_OVERFLOW 0x8000
#define SEXP_ERROR_SYNTAX 0x8001
extern sexp_t *sexp_read_atom(IOHandle *h);
extern int sexp_read(IOHandle *h, sexp_t **result_ptr);
extern unsigned short sexp_write(IOHandle *h, sexp_t *x);
extern unsigned short sexp_writeln(IOHandle *h, sexp_t *x);
#endif

View File

@ -0,0 +1,158 @@
/* Copyright 2010, 2011, 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>.
*
* This file is part of Hop.
*
* Hop is free software: you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Hop is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
* License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Hop. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <errno.h>
#include <stdint.h>
#include <stddef.h>
#include <assert.h>
#include <ucontext.h>
#include "cmsg_private.h"
#include "ref.h"
#include "sexp.h"
#include "hashtable.h"
#include "node.h"
#include "meta.h"
#include "messages.h"
#include "subscription.h"
void free_subscription(subscription_t *sub) {
DECREF(sub->uuid, sexp_destructor);
DECREF(sub->filter, sexp_destructor);
DECREF(sub->sink, sexp_destructor);
DECREF(sub->name, sexp_destructor);
free(sub);
}
void free_subscription_chain(subscription_t *chain) {
while (chain != NULL) {
subscription_t *next = chain->link;
free_subscription(chain);
chain = next;
}
}
/* Returns true if the subscription has not been unsubscribed and the
destination of the subscription exists. */
int send_to_subscription(sexp_t *source,
hashtable_t *subscriptions,
subscription_t *sub,
sexp_t *body)
{
if (sub->uuid == NULL) {
free_subscription(sub);
return 0;
} else if (!post_node(sexp_data(sub->sink), sexp_data(sub->name), body, sub->uuid)) {
announce_subscription(source, sub->filter, sub->sink, sub->name, 0);
hashtable_erase(subscriptions, sexp_data(sub->uuid));
free_subscription(sub);
return 0;
} else {
return 1;
}
}
subscription_t *send_to_subscription_chain(sexp_t *source,
hashtable_t *subscriptions,
subscription_t *chain,
sexp_t *body)
{
subscription_t *top = chain;
subscription_t *prev = NULL;
while (chain != NULL) {
subscription_t *next = chain->link;
if (!send_to_subscription(source, subscriptions, chain, body)) {
if (prev == NULL) {
top = next;
} else {
prev->link = next;
}
}
prev = chain;
chain = next;
}
return top;
}
subscription_t *handle_subscribe_message(sexp_t *source,
hashtable_t *subscriptions,
parsed_message_t *p)
{
unsigned char uuid[CMSG_UUID_BUF_SIZE];
if (gen_uuid(uuid) != 0) {
warn("Could not generate UUID\n");
return NULL;
} else {
subscription_t *sub = malloc(sizeof(*sub));
sub->uuid = INCREF(sexp_bytes(CMSG_BYTES(sizeof(uuid), uuid)));
sub->filter = p->subscribe.filter;
sub->sink = p->subscribe.sink;
sub->name = p->subscribe.name;
sub->link = NULL;
if (!sexp_stringp(sub->filter) || !sexp_stringp(sub->sink) || !sexp_stringp(sub->name)
|| !sexp_stringp(p->subscribe.reply_sink) || !sexp_stringp(p->subscribe.reply_name)) {
DECREF(sub->uuid, sexp_destructor);
free(sub);
warn("Bad sink/name/reply_sink/reply_name in subscribe");
return NULL;
}
INCREF(sub->filter);
INCREF(sub->sink);
INCREF(sub->name);
hashtable_put(subscriptions, sexp_data(sub->uuid), sub);
announce_subscription(source, sub->filter, sub->sink, sub->name, 1);
post_node(sexp_data(p->subscribe.reply_sink),
sexp_data(p->subscribe.reply_name),
message_subscribe_ok(sub->uuid),
sexp_empty_bytes);
return sub;
}
}
void handle_unsubscribe_message(sexp_t *source,
hashtable_t *subscriptions,
parsed_message_t *p)
{
cmsg_bytes_t uuid;
subscription_t *sub;
if (!sexp_stringp(p->unsubscribe.token)) {
warn("Invalid unsubscription\n");
return;
}
uuid = sexp_data(p->unsubscribe.token);
if (hashtable_get(subscriptions, uuid, (void **) &sub)) {
/* TODO: clean up more eagerly perhaps? */
announce_subscription(source, sub->filter, sub->sink, sub->name, 0);
DECREF(sub->uuid, sexp_destructor);
sub->uuid = NULL;
hashtable_erase(subscriptions, uuid);
}
}

View File

@ -0,0 +1,49 @@
/* Copyright 2010, 2011, 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>.
*
* This file is part of Hop.
*
* Hop is free software: you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Hop is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
* License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Hop. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef cmsg_subscription_h
#define cmsg_subscription_h
typedef struct subscription_t_ {
sexp_t *uuid;
sexp_t *filter;
sexp_t *sink;
sexp_t *name;
struct subscription_t_ *link;
} subscription_t;
extern void free_subscription(subscription_t *sub);
extern void free_subscription_chain(subscription_t *chain);
extern int send_to_subscription(sexp_t *source,
hashtable_t *subscriptions,
subscription_t *sub,
sexp_t *body);
extern subscription_t *send_to_subscription_chain(sexp_t *source,
hashtable_t *subscriptions,
subscription_t *chain,
sexp_t *body);
extern subscription_t *handle_subscribe_message(sexp_t *source,
hashtable_t *subscriptions,
parsed_message_t *p);
extern void handle_unsubscribe_message(sexp_t *source,
hashtable_t *subscriptions,
parsed_message_t *p);
#endif

113
experiments/cmsg/util.c Normal file
View File

@ -0,0 +1,113 @@
/* Copyright 2010, 2011, 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>.
*
* This file is part of Hop.
*
* Hop is free software: you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Hop is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
* License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Hop. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <assert.h>
#include <stdarg.h>
/* OSSP UUID */
#include <uuid.h>
#include "cmsg_private.h"
#define UUID_CHECK(context) \
if (result != UUID_RC_OK) { \
warn("gen_uuid failed with %d at %s\n", result, context); \
return result; \
}
int gen_uuid(unsigned char *uuid_buf) {
uuid_rc_t result;
uuid_t *uuid;
unsigned char temp_buf[UUID_LEN_STR + 1];
unsigned char *temp_buf_ptr = &temp_buf[0]; /* odd API */
size_t uuid_buf_len = UUID_LEN_STR + 1;
assert(CMSG_UUID_BUF_SIZE == UUID_LEN_STR);
result = uuid_create(&uuid);
UUID_CHECK("uuid_create");
result = uuid_make(uuid, UUID_MAKE_V4);
UUID_CHECK("uuid_make");
result = uuid_export(uuid, UUID_FMT_STR, &temp_buf_ptr, &uuid_buf_len);
UUID_CHECK("uuid_export");
assert(uuid_buf_len == (UUID_LEN_STR + 1));
memcpy(uuid_buf, temp_buf, CMSG_UUID_BUF_SIZE);
result = uuid_destroy(uuid);
UUID_CHECK("uuid_destroy");
return UUID_RC_OK;
}
cmsg_bytes_t cmsg_bytes_malloc_dup(cmsg_bytes_t src) {
cmsg_bytes_t result;
result.len = src.len;
result.bytes = malloc(src.len);
if (result.bytes != NULL) {
memcpy(result.bytes, src.bytes, src.len);
}
return result;
}
cmsg_bytes_t cmsg_bytes_malloc(size_t amount) {
cmsg_bytes_t result;
result.len = amount;
result.bytes = malloc(amount);
return result;
}
void cmsg_bytes_free(cmsg_bytes_t bytes) {
free(bytes.bytes);
}
int cmsg_bytes_cmp(cmsg_bytes_t a, cmsg_bytes_t b) {
if (a.len < b.len) return -1;
if (a.len > b.len) return 1;
return memcmp(a.bytes, b.bytes, a.len);
}
void die(char const *format, ...) {
va_list vl;
va_start(vl, format);
fprintf(stderr, "ERROR: ");
vfprintf(stderr, format, vl);
va_end(vl);
exit(1);
}
void warn(char const *format, ...) {
va_list vl;
va_start(vl, format);
fprintf(stderr, "WARNING: ");
vfprintf(stderr, format, vl);
va_end(vl);
}
void info(char const *format, ...) {
va_list vl;
va_start(vl, format);
fprintf(stderr, "INFO: ");
vfprintf(stderr, format, vl);
va_end(vl);
}

View File

@ -0,0 +1,33 @@
(ql:quickload "flexi-streams")
;(ql:quickload "babel")
(ql:quickload "usocket")
(ql:quickload "cl-match")
(ql:quickload "gbbopen")
(require :portable-threads)
(load "packages.lisp")
(load "sexp.lisp")
(load "network.lisp")
(in-package :cl-user)
;; (defun handle-connection (stream)
;; (spki-sexp:write-sexp (spki-sexp:read-sexp stream) stream))
;; (defun start-server (port)
;; (usocket:socket-server "localhost" port 'handle-connection '()
;; :in-new-thread t
;; :multi-threading t
;; :reuse-address t
;; :element-type '(unsigned-byte 8)))
;; (start-server 5671)
(smsg-network:serve-on-port 5671)
;; (let ((server-socket (socket-listen "localhost" 5671
;; :reuse-address t
;; :element-type unsigned-integer)))
;; (loop for conn = (socket-accept server-socket)
;; do (handle-connection conn)))

View File

@ -0,0 +1,47 @@
(in-package :smsg-network)
(defun command-loop (in out route)
(loop (let ((command (read-sexp in)))
(when (not (handle-inbound-command command in out route))
(return)))))
(defun handle-inbound-command (command in out route)
(ematch-sexp command
(("subscribe" filter sink name reply-sink reply-name)
(if (rebind-node filter nil route)
(when (plusp (length reply-sink))
(post reply-sink reply-name (sexp-build ("subscribe-ok" (= filter)))))
(report! `(rebind-failed ,command))))
(("unsubscribe" id)
(when (not (rebind-node id route nil))
(report! `(rebind-failed ,command))))
(("post" name body token)
(send name body))))
(defun relay (in out localname servermode)
(flet ((route (message)
(write-sexp message out)
(write-byte 13 out)
(write-byte 10 out)))
(if servermode
(route (sexp-quote ("hop" "0")))
(ematch-sexp (read-sexp in)
(("hop" "0") t)))
(force-output out)
(route (sexp-build ("subscribe" (= localname) "" "" "" "")))
(command-loop in out #'route)))
(defun handle-connection (stream)
(relay stream stream (sexp-quote "smsg") t))
(defun serve-on-port (port)
(usocket:socket-server "localhost" port 'handle-connection '()
:in-new-thread t
:multi-threading t
:reuse-address t
:element-type '(unsigned-byte 8)))
(defun client (localname hostname portnumber)
(let ((s (usocket:socket-stream
(usocket:socket-connect hostname portnumber :element-type '(unsigned-byte 8)))))
(relay s s localname nil)))

View File

@ -0,0 +1,33 @@
(defpackage :spki-sexp
(:use :cl :flexi-streams :cl-match)
(:shadow :read-from-string)
(:export :read-sexp :write-sexp
:display-hint
:make-display-hint
:display-hint-p
:copy-display-hint
:display-hint-hint
:display-hint-body
:syntax-error
:bad-length-prefix
:bad-display-hint
:bad-input-character
:unexpected-close-paren
:match-failure
:convert-sexp
:sexp-quote
:sexp-build
:match-sexp
:ematch-sexp))
(defpackage :smsg-network
(:use :cl :flexi-streams :spki-sexp)
(:export :relay
:serve-on-port
:client))

142
experiments/lisp/sexp.lisp Normal file
View File

@ -0,0 +1,142 @@
;; SPKI SEXPs for Common Lisp
(in-package :spki-sexp)
(define-condition syntax-error (error) ())
(define-condition bad-length-prefix (syntax-error) ())
(define-condition bad-display-hint (syntax-error) ())
(define-condition bad-input-character (syntax-error) ())
(define-condition unexpected-close-paren (syntax-error) ())
(define-condition match-failure (error) ())
(defstruct display-hint hint body)
(defun write-integer (n output-stream)
(labels ((w (n)
(when (plusp n)
(multiple-value-bind (top-half lower-digit)
(floor n 10)
(w top-half)
(write-byte (+ lower-digit 48) output-stream)))))
(if (zerop n)
(write-byte 48 output-stream)
(w n))))
(defun write-sexp (sexp &optional (output-stream *standard-output*))
(etypecase sexp
((array (unsigned-byte 8))
(write-integer (length sexp) output-stream)
(write-byte 58 output-stream) ;; #\:
(write-sequence sexp output-stream))
(cons
(write-byte 40 output-stream) ;; #\(
(loop for v in sexp do (write-sexp v output-stream))
(write-byte 41 output-stream)) ;; #\)
(display-hint
(write-byte 91 output-stream)
(write-sexp (display-hint-hint sexp) output-stream)
(write-byte 93 output-stream)
(write-sexp (display-hint-body sexp) output-stream))
(string
(write-sexp (flexi-streams:string-to-octets sexp :external-format :utf-8) output-stream))))
(defun read-simple-string (input-stream &optional (len 0))
(loop (let ((c (read-byte input-stream)))
(if (eql c 58) ;; #\:
(let ((buf (make-array len :element-type '(unsigned-byte 8))))
(read-sequence buf input-stream)
(return buf))
(let ((v (digit-char-p c)))
(if v
(setq len (+ (* len 10) v))
(error 'bad-length-prefix)))))))
(defun read-sexp-list (input-stream)
(loop for v = (read-sexp-inner input-stream)
until (eq v 'end-of-list-marker)
collect v))
(defun read-sexp-inner (input-stream)
(let (result)
(tagbody :retry
(setq result
(let ((c (read-byte input-stream)))
(cond
((eql c 40) (read-sexp-list input-stream)) ;; #\(
((eql c 41) 'end-of-list-marker) ;; #\)
((eql c 91) ;; #\[
(let ((hint (read-simple-string input-stream)))
(when (not (eql (read-byte input-stream) 93)) ;; #\]
(error 'bad-display-hint))
(make-display-hint :hint hint :body (read-simple-string input-stream))))
((<= 48 c 57) (read-simple-string input-stream (- c 48))) ;; digits
((<= c 32) ;; whitespace - convenience for testing
(go :retry))
(t (error 'bad-input-character))))))
result))
(defun read-sexp (&optional (input-stream *standard-input*))
(let ((v (read-sexp-inner input-stream)))
(if (eq v 'end-of-list-marker)
(error 'unexpected-close-paren)
v)))
(defun convert-sexp (val)
(etypecase val
((array (unsigned-byte 8)) val)
(cons (cons (convert-sexp (car val))
(convert-sexp (cdr val))))
(null nil)
(display-hint (make-display-hint
:hint (convert-sexp (display-hint-hint val))
:body (convert-sexp (display-hint-body val))))
(string (flexi-streams:string-to-octets val :external-format :utf-8))))
(defmacro sexp-quote (val)
`(quote ,(convert-sexp val)))
(defun build-sexp (stx)
(etypecase stx
((array (unsigned-byte 8)) stx)
(cons (if (eq (car stx) '=)
(cadr stx)
`(cons ,(build-sexp (car stx))
,(build-sexp (cdr stx)))))
(null 'nil)
(display-hint `(make-display-hint
:hint ,(build-sexp (display-hint-hint stx))
:body ,(build-sexp (display-hint-body stx))))
(string (flexi-streams:string-to-octets stx :external-format :utf-8))))
(defmacro sexp-build (template)
(build-sexp template))
(defun convert-match-pattern (pattern)
(etypecase pattern
((array (unsigned-byte 8)) `(array (1 (unsigned-byte 8)) ,(coerce pattern 'list)))
(cons `(cons ,(convert-match-pattern (car pattern))
,(convert-match-pattern (cdr pattern))))
(null 'nil)
(display-hint `(struct display-hint
(:hint ,(convert-match-pattern (display-hint-hint pattern)))
(:body ,(convert-match-pattern (display-hint-body pattern)))))
(string (convert-match-pattern
(flexi-streams:string-to-octets pattern :external-format :utf-8)))
(symbol pattern)))
(defmacro match-sexp (val &rest clauses)
`(cl-match:match ,val
,@(mapcar (lambda (clause)
`(,(convert-match-pattern (car clause)) ,@(cdr clause)))
clauses)))
(defmacro ematch-sexp (val &rest clauses)
`(match-sexp ,val ,@clauses (_ (error 'match-failure))))
;; Useful for testing
(defun read-from-string (str &optional (external-format :utf-8))
(read-sexp (flexi-streams:make-flexi-stream
(flexi-streams:make-in-memory-input-stream
(flexi-streams:string-to-octets str :external-format external-format))
:external-format external-format)))