From 801a9b785884f8a0f23c8a771c58c4e5c2322a38 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Thu, 10 May 2012 17:45:37 -0400 Subject: [PATCH] Very stupid straight-line throughput tests --- server/experiments/.gitignore | 2 + server/experiments/Makefile | 7 ++ server/experiments/consumer.c | 150 ++++++++++++++++++++++++++++++++++ server/experiments/producer.c | 149 +++++++++++++++++++++++++++++++++ 4 files changed, 308 insertions(+) create mode 100644 server/experiments/.gitignore create mode 100644 server/experiments/Makefile create mode 100644 server/experiments/consumer.c create mode 100644 server/experiments/producer.c diff --git a/server/experiments/.gitignore b/server/experiments/.gitignore new file mode 100644 index 0000000..f77eb46 --- /dev/null +++ b/server/experiments/.gitignore @@ -0,0 +1,2 @@ +consumer +producer diff --git a/server/experiments/Makefile b/server/experiments/Makefile new file mode 100644 index 0000000..e3a42a9 --- /dev/null +++ b/server/experiments/Makefile @@ -0,0 +1,7 @@ +all: consumer producer + +clean: + rm -f consumer producer + +%: %.c + $(CC) -O3 -o $@ $< diff --git a/server/experiments/consumer.c b/server/experiments/consumer.c new file mode 100644 index 0000000..fb7d2f9 --- /dev/null +++ b/server/experiments/consumer.c @@ -0,0 +1,150 @@ +/* Copyright (C) 2010, 2011 Tony Garnock-Jones. All rights reserved. */ + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#define EXPECTEDPREFIX "(4:post8:consumer8:" + +static size_t hunt_for_latencies_in(char *buf, size_t count) { + struct timeval now; + char *pos = buf; + char *sentinel = buf + count; + size_t msgsize = 0; + + gettimeofday(&now, NULL); + + while (1) { + char *openptr = memchr(pos, '(', sentinel - pos); + char *closeptr; + uint32_t s, us; + + if (openptr == NULL) break; + + closeptr = memchr(openptr + 1, ')', sentinel - (openptr + 1)); + if (closeptr == NULL) break; + + memcpy(&s, openptr + strlen(EXPECTEDPREFIX), sizeof(uint32_t)); + memcpy(&us, openptr + strlen(EXPECTEDPREFIX) + sizeof(uint32_t), sizeof(uint32_t)); + s = ntohl(s); + us = ntohl(us); + + if (s != 0 || us != 0) { + double delta = (now.tv_sec - s) * 1000000.0 + (now.tv_usec - us); + printf("Latency %g microseconds (%g milliseconds)\n", delta, delta / 1000.0); + } + + msgsize = closeptr + 1 - openptr; + + pos = closeptr + 1; + } + + return msgsize; +} + +int main(int argc, char *argv[]) { + int fd = socket(AF_INET, SOCK_STREAM, 0); + struct sockaddr_in s; + FILE *f; + struct timeval start_time; + long bytecount = -1; + size_t message_size = 0; + long last_report_bytecount = 0; + char idchar = '1'; + char *qclass = "queue"; + + if (argc < 2) { + fprintf(stderr, "Usage: test1 [ []]\n"); + exit(1); + } + + if (argc > 2) { + idchar = argv[2][0]; + } + printf("Idchar: '%c'\n", idchar); + + if (argc > 3) { + qclass = argv[3]; + } + printf("Qclass: %s\n", qclass); + + { + struct hostent *h = gethostbyname(argv[1]); + if (h == NULL) { + fprintf(stderr, "serverhostname lookup: %d\n", h_errno); + exit(1); + } + s.sin_family = AF_INET; + s.sin_addr.s_addr = * (uint32_t *) h->h_addr_list[0]; + s.sin_port = htons(5671); + } + + if (connect(fd, (struct sockaddr *) &s, sizeof(s)) < 0) return 1; + + { + int i = 1; + setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &i, sizeof(i)); + } + + f = fdopen(fd, "a+"); + + fprintf(f, "(9:subscribe5:test%c0:0:5:test%c5:login)(4:post7:factory(6:create%d:%s(2:q1)5:test%c1:k)0:)(4:post2:q1(9:subscribe0:5:test%c8:consumer5:test%c1:k)0:)\n", + idchar, idchar, (int) strlen(qclass), qclass, idchar, idchar, idchar); + fflush(f); + + while (1) { + char buf[1024]; + size_t n = read(fd, buf, sizeof(buf)); + if (n == 0) break; + if (n >= strlen(EXPECTEDPREFIX)) { + if (!memcmp(buf, EXPECTEDPREFIX, strlen(EXPECTEDPREFIX))) { + if (bytecount == -1) { + printf("Buffer at start: <<%.*s>>\n", (int) n, buf); + printf("Starting.\n"); + bytecount = 0; + gettimeofday(&start_time, NULL); + } + } + } + if (bytecount >= 0) { + size_t detected_msgsize = hunt_for_latencies_in(buf, n); + bytecount += n; + if (detected_msgsize != 0 && message_size == 0) { + message_size = detected_msgsize; + printf("message_size = %lu\n", message_size); + } + if (message_size != 0) { + if ((bytecount - last_report_bytecount) > (100000 * message_size)) { + struct timeval now; + double delta; + gettimeofday(&now, NULL); + delta = (now.tv_sec - start_time.tv_sec) + (now.tv_usec - start_time.tv_usec) / 1000000.0; + printf("So far received %ld bytes in %g seconds = %g bytes/sec and %g msgs/sec\n", + bytecount, + delta, + bytecount / delta, + bytecount / (delta * message_size)); + fflush(stdout); + last_report_bytecount = bytecount; + } + } + } + } + + return 0; +} diff --git a/server/experiments/producer.c b/server/experiments/producer.c new file mode 100644 index 0000000..8cb2fcb --- /dev/null +++ b/server/experiments/producer.c @@ -0,0 +1,149 @@ +/* Copyright (C) 2010, 2011 Tony Garnock-Jones. All rights reserved. */ + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +static size_t build_message(char *message, uint32_t s, uint32_t us) { + char const *msg_prefix = "(4:post2:q1(4:post0:8:"; + char const *msg_suffix = "0:)0:)"; + size_t prefix_len = strlen(msg_prefix); + size_t suffix_len = strlen(msg_suffix); + uint32_t v; + size_t total_len = 0; + + memcpy(message + total_len, msg_prefix, prefix_len); + total_len += prefix_len; + v = htonl(s); + memcpy(message + total_len, &v, sizeof(uint32_t)); + total_len += sizeof(uint32_t); + v = htonl(us); + memcpy(message + total_len, &v, sizeof(uint32_t)); + total_len += sizeof(uint32_t); + memcpy(message + total_len, msg_suffix, suffix_len); + total_len += suffix_len; + + /* + printf("%d<<", total_len); + fwrite(message, total_len, 1, stdout); + printf(">>\n"); + */ + return total_len; +} + +int main(int argc, char *argv[]) { + int fd = socket(AF_INET, SOCK_STREAM, 0); + struct sockaddr_in s; + FILE *f; + struct timeval start_time; + long bytecount = 0; + int i; + unsigned long hz_limit = 1000000; + unsigned long msgcount = 10000000; + + assert(sizeof(uint32_t) == 4); + + if (argc < 2) { + fprintf(stderr, "Usage: test1 [ []]\n"); + exit(1); + } + + if (argc > 2) { + hz_limit = strtoul(argv[2], NULL, 0); + } + printf("hz_limit = %lu\n", hz_limit); + + if (argc > 3) { + msgcount = strtoul(argv[3], NULL, 0); + } + printf("msgcount = %lu\n", msgcount); + + { + struct hostent *h = gethostbyname(argv[1]); + if (h == NULL) { + fprintf(stderr, "serverhostname lookup: %d\n", h_errno); + exit(1); + } + s.sin_family = AF_INET; + s.sin_addr.s_addr = * (uint32_t *) h->h_addr_list[0]; + s.sin_port = htons(5671); + } + + if (connect(fd, (struct sockaddr *) &s, sizeof(s)) < 0) return 1; + + { + int i = 1; + setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &i, sizeof(i)); + } + + f = fdopen(fd, "a+"); + + fprintf(f, "(9:subscribe5:test30:0:5:test35:login)"); + fflush(f); + + usleep(100000); + { + char buf[4096]; + size_t n = read(fd, buf, sizeof(buf)); + printf("Received: <<%.*s>>\n", (int) n, buf); + } + + gettimeofday(&start_time, NULL); + + for (i = 0; i < msgcount; i++) { + char message[1024]; + size_t msglen; + while (1) { + struct timeval now; + double delta; + gettimeofday(&now, NULL); + delta = (now.tv_sec - start_time.tv_sec) + (now.tv_usec - start_time.tv_usec) / 1000000.0; + if (i / delta <= hz_limit) break; + fflush(f); + usleep(1000); + } + if ((i % (hz_limit / 4)) == 0) { + struct timeval now; + gettimeofday(&now, NULL); + msglen = build_message(message, now.tv_sec, now.tv_usec); + } else { + msglen = build_message(message, 0, 0); + } + fwrite(message, msglen, 1, f); + bytecount += msglen; + if ((bytecount % 100000) < msglen) { + struct timeval now; + double delta; + gettimeofday(&now, NULL); + delta = (now.tv_sec - start_time.tv_sec) + (now.tv_usec - start_time.tv_usec) / 1000000.0; + printf("So far sent %ld bytes in %g seconds = %g bytes/sec and %g msgs/sec\n", + bytecount, + delta, + bytecount / delta, + bytecount / (delta * msglen)); + fflush(stdout); + } + } + + fprintf(f, "(11:unsubscribe5:test3)"); + fflush(f); + + fclose(f); + + return 0; +}