/* Copyright (C) 2010, 2011 Tony Garnock-Jones. All rights reserved. */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #define EXPECTEDPREFIX "(4:post8:consumer8:" static size_t hunt_for_latencies_in(char *buf, size_t count) { struct timeval now; char *pos = buf; char *sentinel = buf + count; size_t msgsize = 0; gettimeofday(&now, NULL); while (1) { char *openptr = memchr(pos, '(', sentinel - pos); char *closeptr; uint32_t s, us; if (openptr == NULL) break; closeptr = memchr(openptr + 1, ')', sentinel - (openptr + 1)); if (closeptr == NULL) break; memcpy(&s, openptr + strlen(EXPECTEDPREFIX), sizeof(uint32_t)); memcpy(&us, openptr + strlen(EXPECTEDPREFIX) + sizeof(uint32_t), sizeof(uint32_t)); s = ntohl(s); us = ntohl(us); if (s != 0 || us != 0) { double delta = (now.tv_sec - s) * 1000000.0 + (now.tv_usec - us); printf("Latency %g microseconds (%g milliseconds)\n", delta, delta / 1000.0); } msgsize = closeptr + 1 - openptr; pos = closeptr + 1; } return msgsize; } int main(int argc, char *argv[]) { int fd = socket(AF_INET, SOCK_STREAM, 0); struct sockaddr_in s; FILE *f; struct timeval start_time; long bytecount = -1; size_t message_size = 0; long last_report_bytecount = 0; char idchar = '1'; char *qclass = "queue"; if (argc < 2) { fprintf(stderr, "Usage: test1 [ []]\n"); exit(1); } if (argc > 2) { idchar = argv[2][0]; } printf("Idchar: '%c'\n", idchar); if (argc > 3) { qclass = argv[3]; } printf("Qclass: %s\n", qclass); { struct hostent *h = gethostbyname(argv[1]); if (h == NULL) { fprintf(stderr, "serverhostname lookup: %d\n", h_errno); exit(1); } s.sin_family = AF_INET; s.sin_addr.s_addr = * (uint32_t *) h->h_addr_list[0]; s.sin_port = htons(5671); } if (connect(fd, (struct sockaddr *) &s, sizeof(s)) < 0) return 1; { int i = 1; setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &i, sizeof(i)); } f = fdopen(fd, "a+"); fprintf(f, "(9:subscribe5:test%c0:0:5:test%c5:login)\n", idchar, idchar); fprintf(f, "(4:post7:factory(6:create%d:%s(2:q1)5:test%c1:k)0:)\n", (int) strlen(qclass), qclass, idchar); fflush(f); usleep(100000); fprintf(f, "(4:post2:q1(9:subscribe0:5:test%c8:consumer5:test%c1:k)0:)\n", idchar, idchar); fflush(f); while (1) { char buf[1024]; size_t n = read(fd, buf, sizeof(buf)); if (n == 0) break; if (n >= strlen(EXPECTEDPREFIX)) { if (!memcmp(buf, EXPECTEDPREFIX, strlen(EXPECTEDPREFIX))) { if (bytecount == -1) { printf("Buffer at start: <<%.*s>>\n", (int) n, buf); printf("Starting.\n"); bytecount = 0; gettimeofday(&start_time, NULL); } } } if (bytecount >= 0) { size_t detected_msgsize = hunt_for_latencies_in(buf, n); bytecount += n; if (detected_msgsize != 0 && message_size == 0) { message_size = detected_msgsize; printf("message_size = %lu\n", message_size); } if (message_size != 0) { if ((bytecount - last_report_bytecount) > (100000 * message_size)) { struct timeval now; double delta; gettimeofday(&now, NULL); delta = (now.tv_sec - start_time.tv_sec) + (now.tv_usec - start_time.tv_usec) / 1000000.0; printf("So far received %ld bytes in %g seconds = %g bytes/sec and %g msgs/sec\n", bytecount, delta, bytecount / delta, bytecount / (delta * message_size)); fflush(stdout); last_report_bytecount = bytecount; } } } } return 0; }