Peers introduce each other at connect time

This commit is contained in:
Tony Garnock-Jones 2011-01-05 21:29:58 -05:00
parent a3f5e89db8
commit e357165ed2
4 changed files with 38 additions and 3 deletions

View File

@ -101,7 +101,7 @@ int main(int argc, char *argv[]) {
signal(SIGPIPE, SIG_IGN); /* avoid EPIPE when connections drop unexpectedly */
info("Using libevent version %s\n", event_get_version());
init_sexp();
init_node();
init_node(cmsg_cstring_bytes("server"));
init_factory();
init_queue();
init_direct();

View File

@ -18,6 +18,7 @@
#include "node.h"
#include "meta.h"
static cmsg_bytes_t _container_name;
static hashtable_t node_class_table;
static hashtable_t directory;
@ -29,7 +30,16 @@ static void node_decref(void *arg) {
DECREF((node_t *) arg, node_destructor);
}
void init_node(void) {
void init_node(cmsg_bytes_t container_name) {
if (container_name.len == 0) {
unsigned char buf[CMSG_UUID_BUF_SIZE];
gen_uuid(buf);
_container_name = cmsg_bytes_malloc_dup(CMSG_BYTES(CMSG_UUID_BUF_SIZE, buf));
} else {
_container_name = cmsg_bytes_malloc_dup(container_name);
}
info("Local container name is <<%.*s>>\n", _container_name.len, _container_name.bytes);
init_hashtable(&node_class_table,
31,
NULL,
@ -40,6 +50,10 @@ void init_node(void) {
node_decref);
}
cmsg_bytes_t local_container_name(void) {
return _container_name;
}
void register_node_class(node_class_t *nc) {
cmsg_bytes_t key = cmsg_cstring_bytes(nc->name);
if (hashtable_contains(&node_class_table, key)) {

View File

@ -21,7 +21,9 @@ typedef struct node_class_t_ {
node_message_handler_fn_t handle_message;
} node_class_t;
extern void init_node(void);
extern void init_node(cmsg_bytes_t container_name);
extern cmsg_bytes_t local_container_name(void);
extern void basic_node_destroy(node_t *n);

View File

@ -36,6 +36,7 @@ typedef unsigned char u_char;
typedef struct relay_extension_t_ {
struct sockaddr_in peername;
char peername_str[256];
sexp_t *remote_container_name;
int fd;
IOHandle *outh;
} relay_extension_t;
@ -55,6 +56,7 @@ static void relay_destructor(node_t *n) {
warn("Closing file descriptor %d produced errno %d: %s\n",
r->fd, errno, strerror(errno));
}
DECREF(r->remote_container_name, sexp_destructor);
free(r);
}
@ -106,6 +108,19 @@ static void relay_main(node_t *n) {
iohandle_write(r->outh, cmsg_cstring_bytes("(3:hop1:0)"));
ICHECK(iohandle_flush(r->outh), "iohandle_flush greeting");
{
sexp_t *s = NULL;
s = sexp_cons(sexp_empty_bytes, s);
s = sexp_cons(sexp_empty_bytes, s);
s = sexp_cons(sexp_empty_bytes, s);
s = sexp_cons(sexp_empty_bytes, s);
s = sexp_cons(sexp_bytes(local_container_name()), s);
s = sexp_cons(sexp_cstring("subscribe"), s);
INCREF(s);
sexp_write(r->outh, s);
DECREF(s, sexp_destructor);
}
//iohandle_settimeout(r->inh, 3, 0);
while (1) {
@ -152,6 +167,9 @@ static void relay_main(node_t *n) {
INCREF(subok);
post_node(sexp_data(reply_sink), sexp_data(reply_name), subok, sexp_empty_bytes);
DECREF(subok, sexp_destructor);
DECREF(r->remote_container_name, sexp_destructor);
r->remote_container_name = INCREF(filter_sexp);
} else {
warn("Bind failed <<%.*s>>\n", filter.len, filter.bytes);
}
@ -201,6 +219,7 @@ void start_relay(struct sockaddr_in const *peername, int fd) {
relay_extension_t *r = n->extension;
r->peername = *peername;
endpoint_name(&r->peername, CMSG_BYTES(sizeof(r->peername_str), r->peername_str));
r->remote_container_name = NULL;
r->fd = fd;
r->outh = new_iohandle(r->fd);
spawn((process_main_t) relay_main, n);