From bd9f124a6246652baa07f66966f48bf424cb3781 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Mon, 27 Jun 2011 12:33:51 -0400 Subject: [PATCH] Simple multi-connection testing + stats printing --- java/hop/TestScale.java | 69 +++++++++++++++++++++++++++++++++++++++++ server/main.c | 2 ++ server/relay.c | 16 ++++++++++ server/relay.h | 2 ++ 4 files changed, 89 insertions(+) create mode 100644 java/hop/TestScale.java diff --git a/java/hop/TestScale.java b/java/hop/TestScale.java new file mode 100644 index 0000000..b921299 --- /dev/null +++ b/java/hop/TestScale.java @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2011 Tony Garnock-Jones. All rights reserved. + */ + +package hop; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicLong; + +/** + */ +public class TestScale { + static AtomicLong counter = new AtomicLong(); + + public static void main(final String[] args) { + try { + final String hostname = args[0]; + int count = Integer.parseInt(args[1]); + System.out.println("Hostname: " + hostname); + for (int i = 0; i < count; i++) { + new Thread(new Runnable() { public void run() { + try { + runConnection(hostname); + } catch (Exception e) { + e.printStackTrace(); + } + } }).start(); + Thread.sleep(100); + } + while (true) { + long startTime = System.currentTimeMillis(); + long startCount = counter.longValue(); + Thread.sleep(1000); + long now = System.currentTimeMillis(); + long countNow = counter.longValue(); + report(startTime, startCount, now, countNow); + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + public static void report(long t0, long c0, long t1, long c1) { + double dc = c1 - c0; + double dt = (t1 - t0) / 1000.0; + double rate = dc / dt; + System.out.println(dc + " messages in " + dt + "s = " + rate + " Hz"); + } + + public static void runConnection(String hostname) throws IOException, InterruptedException { + NodeContainer nc = new NodeContainer(); + String qName = nc.getName() + "q"; + System.out.println("Queue: " + qName); + + Relay r = new Relay(nc, hostname); + ServerApi api = new ServerApi(nc, r.getRemoteName()); + + api.createQueue(qName); + Subscription sub = api.subscribe(qName, null); + while (true) { + Object in = "a"; + api.post(qName, "", in, null); + api.flush(); + Object out = sub.getQueue().take(); + assert in.equals(out); + counter.incrementAndGet(); + } + } +} diff --git a/server/main.c b/server/main.c index 5bae2b2..4a1cc59 100644 --- a/server/main.c +++ b/server/main.c @@ -23,6 +23,7 @@ typedef unsigned char u_char; #include "queue.h" #include "direct.h" #include "fanout.h" +#include "relay.h" #include "meta.h" #include "messages.h" #include "sexpio.h" @@ -97,6 +98,7 @@ int main(int argc, char *argv[]) { init_queue(); init_direct(); init_fanout(); + init_relay(); init_meta(); #if WANT_CONSOLE_LISTENER spawn(console_listener, NULL); diff --git a/server/relay.c b/server/relay.c index 35c6469..8092d79 100644 --- a/server/relay.c +++ b/server/relay.c @@ -42,6 +42,19 @@ typedef struct relay_extension_t_ { IOHandle *outh; } relay_extension_t; +static long connection_count = 0; + +static void stats_printer(void *arg) { + while (1) { + info("%ld connections active\n", connection_count); + nap(1000); + } +} + +void init_relay(void) { + spawn(stats_printer, NULL); +} + static sexp_t *relay_extend(node_t *n, sexp_t *args) { /* TODO: outbound connections; args==NULL -> server relay, nonNULL -> outbound. */ n->extension = calloc(1, sizeof(relay_extension_t)); @@ -105,6 +118,7 @@ static void relay_main(node_t *n) { drop ours on our death */ info("Accepted connection from %s on fd %d\n", r->peername_str, r->fd); + connection_count++; iohandle_write(r->outh, cmsg_cstring_bytes("(3:hop1:0)")); ICHECK(iohandle_flush(r->outh), "iohandle_flush greeting"); @@ -189,6 +203,8 @@ static void relay_main(node_t *n) { delete_iohandle(inh); unbind_all_names_for_node(n); DECREF(n, node_destructor); + + connection_count--; } void start_relay(struct sockaddr_in const *peername, int fd) { diff --git a/server/relay.h b/server/relay.h index 01e60bc..b5b69f8 100644 --- a/server/relay.h +++ b/server/relay.h @@ -3,6 +3,8 @@ #ifndef cmsg_relay_h #define cmsg_relay_h +extern void init_relay(void); + extern void start_relay(struct sockaddr_in const *peername, int fd); #endif