diff --git a/Makefile b/Makefile index b8ce632..5238fcf 100644 --- a/Makefile +++ b/Makefile @@ -30,7 +30,7 @@ clean: rm -f $(OBJECTS) rm -rf *.dSYM rm -f depend.mk - rm -f test1 test1.o test3 test3.o + rm -f test{1,3}{,_latency}{,.o} depend.mk: gcc $(CFLAGS) -M *.c > $@ diff --git a/test1_latency.c b/test1_latency.c new file mode 100644 index 0000000..f801853 --- /dev/null +++ b/test1_latency.c @@ -0,0 +1,120 @@ +/* Copyright (C) 2010 Tony Garnock-Jones. All rights reserved. */ + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#define EXPECTEDPREFIX "(4:post8:consumer8:" + +static void hunt_for_latencies_in(char *buf, size_t count) { + struct timeval now; + char *pos = buf; + char *sentinel = buf + count; + + gettimeofday(&now, NULL); + + while (1) { + char *openptr = memchr(pos, '(', sentinel - pos); + char *closeptr; + uint32_t s, us; + + if (openptr == NULL) break; + + closeptr = memchr(openptr + 1, ')', sentinel - (openptr + 1)); + if (closeptr == NULL) break; + + memcpy(&s, openptr + strlen(EXPECTEDPREFIX), sizeof(uint32_t)); + memcpy(&us, openptr + strlen(EXPECTEDPREFIX) + sizeof(uint32_t), sizeof(uint32_t)); + s = ntohl(s); + us = ntohl(us); + + if (s != 0 || us != 0) { + double delta = (now.tv_sec - s) * 1000000.0 + (now.tv_usec - us); + printf("Latency %g microseconds (%g milliseconds)\n", delta, delta / 1000.0); + } + + pos = closeptr + 1; + } +} + +int main(int argc, char *argv[]) { + int fd = socket(AF_INET, SOCK_STREAM, 0); + struct sockaddr_in s; + FILE *f; + struct timeval start_time; + long bytecount = -1; + + if (argc < 2) { + fprintf(stderr, "Usage: test1 \n"); + exit(1); + } + + { + struct hostent *h = gethostbyname(argv[1]); + if (h == NULL) { + fprintf(stderr, "serverhostname lookup: %d\n", h_errno); + exit(1); + } + s.sin_family = AF_INET; + s.sin_addr.s_addr = * (uint32_t *) h->h_addr_list[0]; + s.sin_port = htons(5671); + } + + if (connect(fd, (struct sockaddr *) &s, sizeof(s)) < 0) return 1; + + f = fdopen(fd, "a+"); + + fprintf(f, "(9:subscribe5:test10:0:5:test15:login)(4:post7:factory(6:create5:queue(2:q1)5:test11:k)0:)(4:post2:q1(9:subscribe0:5:test18:consumer5:test11:k)0:)\n"); + fflush(f); + +#define PAYLOADSIZE 8 +#define MESSAGESIZE 59 + PAYLOADSIZE /* 59by overhead, incl 36by subscription token (!) */ + + while (1) { + char buf[1024]; + size_t n = read(fd, buf, sizeof(buf)); + if (n == 0) break; + if (n >= strlen(EXPECTEDPREFIX)) { + if (!memcmp(buf, EXPECTEDPREFIX, strlen(EXPECTEDPREFIX))) { + if (bytecount == -1) { + printf("Buffer at start: <<%.*s>>\n", (int) n, buf); + printf("Starting.\n"); + bytecount = 0; + gettimeofday(&start_time, NULL); + } + } + } + if (bytecount >= 0) { + hunt_for_latencies_in(buf, n); + bytecount += n; + if ((bytecount % 100000) < MESSAGESIZE) { + struct timeval now; + double delta; + gettimeofday(&now, NULL); + delta = (now.tv_sec - start_time.tv_sec) + (now.tv_usec - start_time.tv_usec) / 1000000.0; + printf("So far received %ld bytes in %g seconds = %g bytes/sec and %g msgs/sec\n", + bytecount, + delta, + bytecount / delta, + bytecount / (delta * MESSAGESIZE)); + fflush(stdout); + } + } + } + + return 0; +} diff --git a/test3_latency.c b/test3_latency.c new file mode 100644 index 0000000..981644d --- /dev/null +++ b/test3_latency.c @@ -0,0 +1,137 @@ +/* Copyright (C) 2010 Tony Garnock-Jones. All rights reserved. */ + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +static size_t build_message(char *message, uint32_t s, uint32_t us) { + char const *msg_prefix = "(4:post2:q1(4:post0:8:"; + char const *msg_suffix = "0:)0:)"; + size_t prefix_len = strlen(msg_prefix); + size_t suffix_len = strlen(msg_suffix); + uint32_t v; + size_t total_len = 0; + + memcpy(message + total_len, msg_prefix, prefix_len); + total_len += prefix_len; + v = htonl(s); + memcpy(message + total_len, &v, sizeof(uint32_t)); + total_len += sizeof(uint32_t); + v = htonl(us); + memcpy(message + total_len, &v, sizeof(uint32_t)); + total_len += sizeof(uint32_t); + memcpy(message + total_len, msg_suffix, suffix_len); + total_len += suffix_len; + + /* + printf("%d<<", total_len); + fwrite(message, total_len, 1, stdout); + printf(">>\n"); + */ + return total_len; +} + +int main(int argc, char *argv[]) { + int fd = socket(AF_INET, SOCK_STREAM, 0); + struct sockaddr_in s; + FILE *f; + struct timeval start_time; + long bytecount = 0; + int i; + unsigned long hz_limit = 1000000; + + assert(sizeof(uint32_t) == 4); + + if (argc < 2) { + fprintf(stderr, "Usage: test1 []\n"); + exit(1); + } + + if (argc > 2) { + hz_limit = strtoul(argv[2], NULL, 0); + } + printf("hz_limit = %lu\n", hz_limit); + + { + struct hostent *h = gethostbyname(argv[1]); + if (h == NULL) { + fprintf(stderr, "serverhostname lookup: %d\n", h_errno); + exit(1); + } + s.sin_family = AF_INET; + s.sin_addr.s_addr = * (uint32_t *) h->h_addr_list[0]; + s.sin_port = htons(5671); + } + + if (connect(fd, (struct sockaddr *) &s, sizeof(s)) < 0) return 1; + + f = fdopen(fd, "a+"); + + fprintf(f, "(9:subscribe5:test30:0:5:test35:login)"); + fflush(f); + + usleep(100000); + { + char buf[4096]; + size_t n = read(fd, buf, sizeof(buf)); + printf("Received: <<%.*s>>\n", (int) n, buf); + } + + gettimeofday(&start_time, NULL); + + for (i = 0; i < 10000000; i++) { + char message[1024]; + size_t msglen; + while (1) { + struct timeval now; + double delta; + gettimeofday(&now, NULL); + delta = (now.tv_sec - start_time.tv_sec) + (now.tv_usec - start_time.tv_usec) / 1000000.0; + if (i / delta <= hz_limit) break; + fflush(f); + usleep(1000); + } + if ((i % (hz_limit / 4)) == 0) { + struct timeval now; + gettimeofday(&now, NULL); + msglen = build_message(message, now.tv_sec, now.tv_usec); + } else { + msglen = build_message(message, 0, 0); + } + fwrite(message, msglen, 1, f); + bytecount += msglen; + if ((bytecount % 100000) < msglen) { + struct timeval now; + double delta; + gettimeofday(&now, NULL); + delta = (now.tv_sec - start_time.tv_sec) + (now.tv_usec - start_time.tv_usec) / 1000000.0; + printf("So far sent %ld bytes in %g seconds = %g bytes/sec and %g msgs/sec\n", + bytecount, + delta, + bytecount / delta, + bytecount / (delta * msglen)); + fflush(stdout); + } + } + + fprintf(f, "(11:unsubscribe5:test3)"); + fflush(f); + + fclose(f); + + return 0; +}