Simple multi-connection testing + stats printing

This commit is contained in:
Tony Garnock-Jones 2011-06-27 12:33:51 -04:00
parent d9d4b02002
commit bd9f124a62
4 changed files with 89 additions and 0 deletions

69
java/hop/TestScale.java Normal file
View File

@ -0,0 +1,69 @@
/*
* Copyright (c) 2011 Tony Garnock-Jones. All rights reserved.
*/
package hop;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
/**
*/
public class TestScale {
static AtomicLong counter = new AtomicLong();
public static void main(final String[] args) {
try {
final String hostname = args[0];
int count = Integer.parseInt(args[1]);
System.out.println("Hostname: " + hostname);
for (int i = 0; i < count; i++) {
new Thread(new Runnable() { public void run() {
try {
runConnection(hostname);
} catch (Exception e) {
e.printStackTrace();
}
} }).start();
Thread.sleep(100);
}
while (true) {
long startTime = System.currentTimeMillis();
long startCount = counter.longValue();
Thread.sleep(1000);
long now = System.currentTimeMillis();
long countNow = counter.longValue();
report(startTime, startCount, now, countNow);
}
} catch (Exception e) {
e.printStackTrace();
}
}
public static void report(long t0, long c0, long t1, long c1) {
double dc = c1 - c0;
double dt = (t1 - t0) / 1000.0;
double rate = dc / dt;
System.out.println(dc + " messages in " + dt + "s = " + rate + " Hz");
}
public static void runConnection(String hostname) throws IOException, InterruptedException {
NodeContainer nc = new NodeContainer();
String qName = nc.getName() + "q";
System.out.println("Queue: " + qName);
Relay r = new Relay(nc, hostname);
ServerApi api = new ServerApi(nc, r.getRemoteName());
api.createQueue(qName);
Subscription sub = api.subscribe(qName, null);
while (true) {
Object in = "a";
api.post(qName, "", in, null);
api.flush();
Object out = sub.getQueue().take();
assert in.equals(out);
counter.incrementAndGet();
}
}
}

View File

@ -23,6 +23,7 @@ typedef unsigned char u_char;
#include "queue.h"
#include "direct.h"
#include "fanout.h"
#include "relay.h"
#include "meta.h"
#include "messages.h"
#include "sexpio.h"
@ -97,6 +98,7 @@ int main(int argc, char *argv[]) {
init_queue();
init_direct();
init_fanout();
init_relay();
init_meta();
#if WANT_CONSOLE_LISTENER
spawn(console_listener, NULL);

View File

@ -42,6 +42,19 @@ typedef struct relay_extension_t_ {
IOHandle *outh;
} relay_extension_t;
static long connection_count = 0;
static void stats_printer(void *arg) {
while (1) {
info("%ld connections active\n", connection_count);
nap(1000);
}
}
void init_relay(void) {
spawn(stats_printer, NULL);
}
static sexp_t *relay_extend(node_t *n, sexp_t *args) {
/* TODO: outbound connections; args==NULL -> server relay, nonNULL -> outbound. */
n->extension = calloc(1, sizeof(relay_extension_t));
@ -105,6 +118,7 @@ static void relay_main(node_t *n) {
drop ours on our death */
info("Accepted connection from %s on fd %d\n", r->peername_str, r->fd);
connection_count++;
iohandle_write(r->outh, cmsg_cstring_bytes("(3:hop1:0)"));
ICHECK(iohandle_flush(r->outh), "iohandle_flush greeting");
@ -189,6 +203,8 @@ static void relay_main(node_t *n) {
delete_iohandle(inh);
unbind_all_names_for_node(n);
DECREF(n, node_destructor);
connection_count--;
}
void start_relay(struct sockaddr_in const *peername, int fd) {

View File

@ -3,6 +3,8 @@
#ifndef cmsg_relay_h
#define cmsg_relay_h
extern void init_relay(void);
extern void start_relay(struct sockaddr_in const *peername, int fd);
#endif