diff --git a/server/main.c b/server/main.c index f160bb6..c6b0c24 100644 --- a/server/main.c +++ b/server/main.c @@ -101,7 +101,7 @@ int main(int argc, char *argv[]) { signal(SIGPIPE, SIG_IGN); /* avoid EPIPE when connections drop unexpectedly */ info("Using libevent version %s\n", event_get_version()); init_sexp(); - init_node(); + init_node(cmsg_cstring_bytes("server")); init_factory(); init_queue(); init_direct(); diff --git a/server/node.c b/server/node.c index d6ece44..fc7bace 100644 --- a/server/node.c +++ b/server/node.c @@ -18,6 +18,7 @@ #include "node.h" #include "meta.h" +static cmsg_bytes_t _container_name; static hashtable_t node_class_table; static hashtable_t directory; @@ -29,7 +30,16 @@ static void node_decref(void *arg) { DECREF((node_t *) arg, node_destructor); } -void init_node(void) { +void init_node(cmsg_bytes_t container_name) { + if (container_name.len == 0) { + unsigned char buf[CMSG_UUID_BUF_SIZE]; + gen_uuid(buf); + _container_name = cmsg_bytes_malloc_dup(CMSG_BYTES(CMSG_UUID_BUF_SIZE, buf)); + } else { + _container_name = cmsg_bytes_malloc_dup(container_name); + } + info("Local container name is <<%.*s>>\n", _container_name.len, _container_name.bytes); + init_hashtable(&node_class_table, 31, NULL, @@ -40,6 +50,10 @@ void init_node(void) { node_decref); } +cmsg_bytes_t local_container_name(void) { + return _container_name; +} + void register_node_class(node_class_t *nc) { cmsg_bytes_t key = cmsg_cstring_bytes(nc->name); if (hashtable_contains(&node_class_table, key)) { diff --git a/server/node.h b/server/node.h index d33e99e..172e11b 100644 --- a/server/node.h +++ b/server/node.h @@ -21,7 +21,9 @@ typedef struct node_class_t_ { node_message_handler_fn_t handle_message; } node_class_t; -extern void init_node(void); +extern void init_node(cmsg_bytes_t container_name); + +extern cmsg_bytes_t local_container_name(void); extern void basic_node_destroy(node_t *n); diff --git a/server/relay.c b/server/relay.c index 11f0dae..964dfe0 100644 --- a/server/relay.c +++ b/server/relay.c @@ -36,6 +36,7 @@ typedef unsigned char u_char; typedef struct relay_extension_t_ { struct sockaddr_in peername; char peername_str[256]; + sexp_t *remote_container_name; int fd; IOHandle *outh; } relay_extension_t; @@ -55,6 +56,7 @@ static void relay_destructor(node_t *n) { warn("Closing file descriptor %d produced errno %d: %s\n", r->fd, errno, strerror(errno)); } + DECREF(r->remote_container_name, sexp_destructor); free(r); } @@ -106,6 +108,19 @@ static void relay_main(node_t *n) { iohandle_write(r->outh, cmsg_cstring_bytes("(3:hop1:0)")); ICHECK(iohandle_flush(r->outh), "iohandle_flush greeting"); + { + sexp_t *s = NULL; + s = sexp_cons(sexp_empty_bytes, s); + s = sexp_cons(sexp_empty_bytes, s); + s = sexp_cons(sexp_empty_bytes, s); + s = sexp_cons(sexp_empty_bytes, s); + s = sexp_cons(sexp_bytes(local_container_name()), s); + s = sexp_cons(sexp_cstring("subscribe"), s); + INCREF(s); + sexp_write(r->outh, s); + DECREF(s, sexp_destructor); + } + //iohandle_settimeout(r->inh, 3, 0); while (1) { @@ -152,6 +167,9 @@ static void relay_main(node_t *n) { INCREF(subok); post_node(sexp_data(reply_sink), sexp_data(reply_name), subok, sexp_empty_bytes); DECREF(subok, sexp_destructor); + + DECREF(r->remote_container_name, sexp_destructor); + r->remote_container_name = INCREF(filter_sexp); } else { warn("Bind failed <<%.*s>>\n", filter.len, filter.bytes); } @@ -201,6 +219,7 @@ void start_relay(struct sockaddr_in const *peername, int fd) { relay_extension_t *r = n->extension; r->peername = *peername; endpoint_name(&r->peername, CMSG_BYTES(sizeof(r->peername_str), r->peername_str)); + r->remote_container_name = NULL; r->fd = fd; r->outh = new_iohandle(r->fd); spawn((process_main_t) relay_main, n);