Compare commits

..

1 Commits
main ... WIP

Author SHA1 Message Date
Tony Garnock-Jones 6a00f105af WIP 2016-03-14 13:17:06 -04:00
18 changed files with 74 additions and 706 deletions

View File

@ -10,7 +10,6 @@
- echoserver.erl is a matching ping server in Erlang
- echo-server-minimart-tcp-driver.rkt is a server using Minimart's TCP driver
- echo-server-prospect-tcp-driver.rkt is a server using Prospect's TCP driver
- echo-server-imperative-syndicate-tcp-driver.rkt is a server using Imperative Syndicate's TCP driver
- echo-server.rkt is a server using Minimart, but eschewing the TCP driver
- plain-racket-server.rkt is a server using built-in Racket threads
- uvserver.c is a server written in C with libuv
@ -21,14 +20,11 @@
symbols instead of fixnums to identify peers.
- internal-latency-prospect.rkt is the same, but using Prospect
instead of Minimart.
- internal-latency-imperative-syndicate.rkt is the same, but using
Imperative Syndicate instead of Minimart.
- observe-all-minimart.rkt, observe-all-prospect.rkt,
observe-all-imperative-syndicate.rkt, sum-all-minimart.rkt,
sum-all-prospect.rkt, and sum-all-imperative-syndicate.rkt measure
the costs of presence notification and processing in Minimart,
Prospect, and Imperative Syndicate worlds, respectively.
sum-all-minimart.rkt and sum-all-prospect.rkt measure the costs of
presence notification and processing in Minimart and Prospect
worlds, respectively.
- pingpong.rkt and pingpong.erl are simple measurements of Racket's
and Erlang's built-in thread communication latency, respectively.

View File

@ -1,92 +0,0 @@
#lang imperative-syndicate/test-implementation
;; Measurement of *broadcast* message delivery latency.
(require logbook)
(message-struct ping (src dst timestamp))
(message-struct pong (timestamp))
(define (send-ping! src dst)
(send! (ping src dst (current-inexact-milliseconds))))
(define (run #:echoer-count [echoer-count 100]
#:run-time [run-time 10000])
(define total-latency 0)
(define total-roundtrips 0)
(define boot-start-time (current-inexact-milliseconds))
(define run-start-time #f)
(define (rate-at count)
;; count is the number of roundtrips
;; each roundtrip involves (+ echoer-count 1) messages
;; we want messages per second
(/ (* count (+ echoer-count 1)) ;; echoer-count pings and one pong per roundtrip
(/ total-latency 1000.0) ;; latency in seconds
))
(test-case
[(for [(id (in-range echoer-count))]
(spawn (on (message (ping $src $dst $stamp))
(when (= dst id) (send! (pong stamp))))))
(let ((src echoer-count)
(dst 0))
(spawn (on-start (flush!)
(set! run-start-time (current-inexact-milliseconds))
(send-ping! src dst))
(on (message (pong $start-time))
(define stop-time (current-inexact-milliseconds))
;; TODO: is there a way of reducing the measurement error here,
;; perhaps by recording against run-start-time instead of start-time, somehow?
;; TODO: first, characterize the measurement error
(define delta (- stop-time start-time))
(set! total-latency (+ total-latency delta))
(set! total-roundtrips (+ total-roundtrips 1))
(when (zero? (modulo total-roundtrips 10000))
(log-info "After ~a roundtrips, ~a milliseconds; ~a Hz"
total-roundtrips
total-latency
(rate-at total-roundtrips)))
(when (< (- stop-time run-start-time) run-time)
(send-ping! src dst)))))])
(values total-roundtrips
(rate-at total-roundtrips)
(- run-start-time boot-start-time)))
(module+ main
(define t 5000)
(define E (standard-logbook-entry (default-logbook #:verbose? #t) "minimart" "broadcast-latency-imperative-syndicate"))
(define T (logbook-table E "broadcast-latency"
#:column-spec '(number-of-echoers
secs/msg
msgs/sec
boot-delay-ms
secs/process-booted
roundtrip-count
run-duration-ms)))
;; Warmup
(let ()
(run #:echoer-count 1 #:run-time 1000)
(run #:echoer-count 10 #:run-time 1000)
(void))
;; Real run
(for ((n
(list* 1 2 5
(let loop ((n 10))
(if (>= n 30000)
'()
(cons (inexact->exact (round n))
(loop (* n (sqrt (sqrt 2))))))))
))
(collect-garbage)
(collect-garbage)
(collect-garbage)
(define-values (count v boot-delay-ms) (run #:echoer-count n #:run-time t))
(write-logbook-datum! T (list n
(/ 1.0 v)
v
boot-delay-ms
(/ (/ boot-delay-ms 1000.0) n)
count
t))))

View File

@ -2,7 +2,7 @@
;; Measurement of *broadcast* message delivery latency.
(require racket/match)
(require syndicate)
(require prospect)
(require logbook)
(provide run)
@ -25,15 +25,11 @@
;; each roundtrip involves (+ echoer-count 1) messages
;; we want messages per second
(/ (* count (+ echoer-count 1)) ;; echoer-count pings and one pong per roundtrip
(max 0.000001 (/ total-latency 1000.0)) ;; latency in seconds
(/ total-latency 1000.0) ;; latency in seconds
))
(define (pinger src dst)
(actor #:name 'pinger
#:assertions* (patch->initial-assertions
(patch-seq (sub (pong ?))
(sub 'kickoff)))
(lambda (e s)
(spawn (lambda (e s)
(match e
[(message 'kickoff)
(set! run-start-time (current-inexact-milliseconds))
@ -46,7 +42,7 @@
(define delta (- stop-time start-time))
(set! total-latency (+ total-latency delta))
(set! total-roundtrips (+ total-roundtrips 1))
(when (zero? (modulo total-roundtrips 10000))
(when (zero? (modulo total-roundtrips 1000))
(log-info "After ~a roundtrips, ~a milliseconds; ~a Hz"
total-roundtrips
total-latency
@ -57,19 +53,19 @@
'()))]
[_ #f]))
#f
'()))
(patch-seq (sub (pong ?))
(sub 'kickoff)
(pub (ping src dst ?)))))
(define (echoer id)
(actor #:name (list 'echoer id)
#:assertions* (patch->initial-assertions
(patch-seq (sub (ping ? ? ?))))
(lambda (e s)
(spawn (lambda (e s)
(match e
[(message (ping src (== id) stamp))
(transition s (message (pong stamp)))]
[_ #f]))
#f
'()))
(patch-seq (sub (ping ? ? ?))
(pub (pong ?)))))
(begin
(run-ground (for/list [(id (in-range echoer-count))] (echoer id))
@ -79,7 +75,7 @@
(module+ main
(define t 10000)
(define E (standard-logbook-entry (default-logbook #:verbose? #t) "minimart" "broadcast-latency-syndicate"))
(define E (standard-logbook-entry (default-logbook #:verbose? #t) "minimart" "broadcast-latency-prospect"))
(define T (logbook-table E "broadcast-latency"
#:column-spec '(number-of-echoers
secs/msg

View File

@ -1,66 +0,0 @@
#lang imperative-syndicate
(require racket/port)
(require logbook)
(require racket/cmdline)
(require/activate imperative-syndicate/drivers/timer)
(require/activate imperative-syndicate/drivers/tcp)
(begin-for-declarations
(define server-entry-name #f)
(define server-entry-type #f)
(command-line #:program "echo-server-imperative-syndicate-tcp-driver.rkt"
#:once-each
["--logbook-entry-name" name
"set logbook entry name to use when recording run statistics"
(set! server-entry-name name)]
["--logbook-entry-type" type
"set logbook entry type to use"
(set! server-entry-type type)])
(when (not server-entry-type)
(error 'echo-server "Please supply the --logbook-entry-type command-line argument."))
(define L (default-logbook))
(define E (logbook-entry L "minimart" server-entry-name server-entry-type))
(define Tmem (logbook-table E "server-memory-use" #:column-spec '(time-seconds memory-use)))
(define Tconn (logbook-table E "server-connections" #:column-spec '(time-seconds connection-count)))
(define connection-count 0)
(define first-connection-seen? #f)
(define statistics-poll-interval 2000))
(define (statistician)
(spawn #:name 'statistician
(field [sample-deadline (+ (current-inexact-milliseconds) statistics-poll-interval)])
(on (asserted (later-than (sample-deadline)))
;; (collect-garbage)
;; (collect-garbage)
;; (collect-garbage)
(define now (sample-deadline))
(write-logbook-datum! Tmem (list (/ now 1000.0) (current-memory-use)))
(write-logbook-datum! Tconn (list (/ now 1000.0) connection-count))
(when (and first-connection-seen? (zero? connection-count))
(exit 0))
(sample-deadline (+ (sample-deadline) statistics-poll-interval)))))
(define (listener port-number)
(spawn #:name 'listener
(during/spawn (tcp-connection $id (tcp-listener port-number))
#:name (list 'connection id)
(assert (tcp-accepted id))
(on-start
(issue-credit! (tcp-listener port-number))
(issue-unbounded-credit! tcp-in id)
(set! connection-count (+ connection-count 1))
(set! first-connection-seen? #t))
(on-stop
(set! connection-count (- connection-count 1)))
(on (message (tcp-in id $bs))
(send! (tcp-out id bs))))))
(listener 5999)
(statistician)

View File

@ -27,15 +27,7 @@
["--prospect+tcp"
"use prospect server with TCP driver"
(set! server-variation 'prospect+tcp)]
["--imperative-syndicate+tcp"
"use imperative-syndicate server with TCP driver"
(set! server-variation 'imperative-syndicate+tcp)]
["--syndicate-js"
"use Syndicate/js"
(set! server-variation 'syndicate-js)]
["--racket" "use threaded racket server" (set! server-variation 'racket)]
["--racket-evt" "use evented racket server" (set! server-variation 'racket-evt)]
["--racket-semi-threaded" "use semi-threaded racket server" (set! server-variation 'racket-semi-threaded)]
["--racket" "use plain racket server" (set! server-variation 'racket)]
["--other" name "use other server" (set! server-variation name)])
(when (not server-variation)
@ -72,10 +64,7 @@
['minimart (format-racket-server-command-line "echo-server.rkt")]
['minimart+tcp (format-racket-server-command-line "echo-server-minimart-tcp-driver.rkt")]
['prospect+tcp (format-racket-server-command-line "echo-server-prospect-tcp-driver.rkt")]
['imperative-syndicate+tcp (format-racket-server-command-line "echo-server-imperative-syndicate-tcp-driver.rkt")]
['racket (format-racket-server-command-line "plain-racket-server.rkt")]
['racket-evt (format-racket-server-command-line "plain-racket-server-evt.rkt")]
['racket-semi-threaded (format-racket-server-command-line "plain-racket-server-semi-threaded.rkt")]
['erlang
(define erlang-version-command
"erl -noshell -eval 'io:format(erlang:system_info(otp_release)), halt().'")
@ -86,8 +75,6 @@
['uv
(write-logbook-datum! Tmachine #:label "uv-banner" (capture-output "./uvserver -v"))
"./uvserver"]
['syndicate-js
"node echo-server-syndicate-js.js"]
[(? string? other)
(printf "Please start the other server on hostname '~a' now.\n" server-hostname)
(printf "Press enter when it has started.\n")

View File

@ -1,88 +0,0 @@
#lang imperative-syndicate/test-implementation
;; Measurement of message delivery latency.
(require logbook)
(message-struct ping (src dst timestamp))
(define (send-ping! src dst)
(send! (ping src dst (current-inexact-milliseconds))))
(define (run #:echoer-count [echoer-count 100]
#:run-time [run-time 10000])
(define total-latency 0)
(define total-roundtrips 0)
(define boot-start-time (current-inexact-milliseconds))
(define run-start-time #f)
(define (rate-at count)
(/ (* count 2) ;; two messages per roundtrip
(/ total-latency 1000.0) ;; latency in seconds
))
(test-case
[(for [(id (in-range echoer-count))]
(spawn (on (message (ping $src id $stamp))
(send! (ping id src stamp)))))
(let ((src echoer-count)
(dst 0))
(spawn (on-start
(flush!)
(set! run-start-time (current-inexact-milliseconds))
(send-ping! src dst))
(on (message (ping dst src $start-time))
(define stop-time (current-inexact-milliseconds))
;; TODO: is there a way of reducing the measurement error here,
;; perhaps by recording against run-start-time instead of start-time, somehow?
;; TODO: first, characterize the measurement error
(define delta (- stop-time start-time))
(set! total-latency (+ total-latency delta))
(set! total-roundtrips (+ total-roundtrips 1))
(when (zero? (modulo total-roundtrips 10000))
(log-info "After ~a roundtrips, ~a milliseconds; ~a Hz"
total-roundtrips
total-latency
(rate-at total-roundtrips)))
(when (< (- stop-time run-start-time) run-time)
(send-ping! src dst)))))])
(values total-roundtrips (rate-at total-roundtrips) (- run-start-time boot-start-time)))
(module+ main
(define t 2000)
(define E (standard-logbook-entry (default-logbook #:verbose? #t) "minimart" "internal-latency-imperative-syndicate"))
(define T (logbook-table E "internal-latency"
#:column-spec '(number-of-echoers
secs/msg
msgs/sec
boot-delay-ms
secs/process-booted
roundtrip-count
run-duration-ms)))
;; Warmup
(let ()
(run #:echoer-count 1 #:run-time 1000)
(run #:echoer-count 10 #:run-time 1000)
(void))
;; Real run
(for ((n
(list* 1 2 5
(let loop ((n 10))
(if (>= n 30000)
'()
(cons (inexact->exact (round n))
(loop (* n (sqrt (sqrt 2))))))))
))
(collect-garbage)
(collect-garbage)
(collect-garbage)
(define-values (count v boot-delay-ms) (run #:echoer-count n #:run-time t))
(write-logbook-datum! T (list n
(/ 1.0 v)
v
boot-delay-ms
(/ (/ boot-delay-ms 1000.0) n)
count
t))))

View File

@ -2,68 +2,75 @@
;; Measurement of message delivery latency.
(require racket/match)
(require syndicate)
(require prospect)
(require logbook)
(provide run)
(struct ping (src dst timestamp) #:transparent)
;; (define (current-microseconds)
;; (* (current-inexact-milliseconds) 1000.0))
(define (current-microseconds)
(modulo (* (current-inexact-milliseconds) 1000.0) 1000000000))
;; (define (current-microseconds)
;; (truncate (modulo (* (current-inexact-milliseconds) 1000.0) 1000000000)))
;; (define (current-microseconds)
;; (inexact->exact (truncate (modulo (* (current-inexact-milliseconds) 1000.0) 1000000000))))
(define (send-ping src dst)
(message (ping src dst (current-inexact-milliseconds))))
(message (ping src dst (current-microseconds))))
(define (run #:echoer-count [echoer-count 100]
#:run-time [run-time 10000])
(define total-latency 0)
(define total-roundtrips 0)
(define boot-start-time (current-inexact-milliseconds))
(define boot-start-time (current-microseconds))
(define run-start-time #f)
(define (rate-at count)
(/ (* count 2) ;; two messages per roundtrip
(/ total-latency 1000.0) ;; latency in seconds
(/ total-latency 1000000.0) ;; latency in seconds
))
(define (pinger src dst)
(actor #:assertions* (patch->initial-assertions
(patch-seq (sub (ping dst src ?))
(sub 'kickoff)))
(lambda (e s)
(spawn (lambda (e s)
(match e
[(message 'kickoff)
(set! run-start-time (current-inexact-milliseconds))
(set! run-start-time (current-microseconds))
(transition s (send-ping src dst))]
[(message (ping (== dst) (== src) start-time))
(define stop-time (current-inexact-milliseconds))
(define stop-time (current-microseconds))
;; TODO: is there a way of reducing the measurement error here,
;; perhaps by recording against run-start-time instead of start-time, somehow?
;; TODO: first, characterize the measurement error
(define delta (- stop-time start-time))
(set! total-latency (+ total-latency delta))
(set! total-roundtrips (+ total-roundtrips 1))
(when (zero? (modulo total-roundtrips 10000))
(log-info "After ~a roundtrips, ~a milliseconds; ~a Hz"
(when (zero? (modulo total-roundtrips 1000))
(log-info "After ~a roundtrips, ~a milliseconds; ~a Hz."
total-roundtrips
total-latency
(rate-at total-roundtrips)))
(transition s
(if (< (- stop-time run-start-time) run-time)
(if (< (/ (- stop-time run-start-time) 1000.0) run-time)
(send-ping src dst)
'()))]
[_ #f]))
#f
'()))
(patch-seq (sub (ping dst src ?))
(sub 'kickoff)
(pub (ping src dst ?)))))
(define (echoer id)
(actor #:assertions* (patch->initial-assertions
(sub (ping ? id ?)))
(lambda (e s)
(spawn (lambda (e s)
(match e
[(message (ping src (== id) stamp))
(transition s (message (ping id src stamp)))]
[_ #f]))
#f
'()))
(patch-seq (sub (ping ? id ?))
(pub (ping id ? ?)))))
(begin
(run-ground (for/list [(id (in-range echoer-count))] (echoer id))
@ -73,15 +80,15 @@
(module+ main
(define t 10000)
(define E (standard-logbook-entry (default-logbook #:verbose? #t) "minimart" "internal-latency-syndicate"))
(define T (logbook-table E "internal-latency"
#:column-spec '(number-of-echoers
secs/msg
msgs/sec
boot-delay-ms
secs/process-booted
roundtrip-count
run-duration-ms)))
;; (define E (standard-logbook-entry (default-logbook #:verbose? #t) "minimart" "internal-latency-prospect"))
;; (define T (logbook-table E "internal-latency"
;; #:column-spec '(number-of-echoers
;; secs/msg
;; msgs/sec
;; boot-delay-ms
;; secs/process-booted
;; roundtrip-count
;; run-duration-ms)))
;; Warmup
(let ()
(run #:echoer-count 1 #:run-time 1000)
@ -90,7 +97,7 @@
;; Real run
(for ((n
(list* 1 2 5
(let loop ((n 10))
'()#;(let loop ((n 10))
(if (>= n 30000)
'()
(cons (inexact->exact (round n))
@ -100,7 +107,7 @@
(collect-garbage)
(collect-garbage)
(define-values (count v boot-delay-ms) (run #:echoer-count n #:run-time t))
(write-logbook-datum! T (list n
(void)#;(write-logbook-datum! T (list n
(/ 1.0 v)
v
boot-delay-ms

View File

@ -1,58 +0,0 @@
#lang imperative-syndicate/test-implementation
;; Measurement of presence processing.
;; Peers observe each other, but do not process the resulting routing events.
(require logbook)
(assertion-struct presence (id))
(define (run #:peer-count [peer-count 100])
(define event-count 0)
(define-values (results cpu-time wall-clock-time gc-time)
(time-apply
(lambda ()
(test-case
[(for [(id (in-range peer-count))]
(spawn (assert (presence id))
(on (asserted (presence $peer))
(set! event-count (+ event-count 1)))))]))
'()))
(values event-count (max 1 (- cpu-time gc-time))))
(module+ main
(define E (standard-logbook-entry (default-logbook #:verbose? #t) "minimart" "observe-all-imperative-syndicate"))
(define T (logbook-table E "presence-processing"
#:column-spec '(number-of-peers
secs/routing-update
routing-updates/sec
secs/peer
peers/sec
event-count
run-duration-ms)))
;; Warmup
(let ()
(run #:peer-count 1)
(run #:peer-count 10)
(void))
;; Real run
(for ((n
(list* 1 2 5
(let loop ((n 10))
(if (>= n 1000)
'()
(cons (inexact->exact (round n))
(loop (* n (sqrt (sqrt 2))))))))
))
(collect-garbage)
(collect-garbage)
(collect-garbage)
(define-values (event-count run-duration-ms) (run #:peer-count n))
(write-logbook-datum! T (list n
(/ (/ run-duration-ms 1000.0) event-count)
(/ event-count (/ run-duration-ms 1000.0))
(/ (/ run-duration-ms 1000.0) n)
(/ n (/ run-duration-ms 1000.0))
event-count
run-duration-ms))))

View File

@ -1,82 +0,0 @@
#lang imperative-syndicate/test-implementation
;; Measurement of presence processing.
;; Peers observe just one distinguished peer, and do not process resulting routing events.
(require logbook)
(provide run)
(assertion-struct client ())
(define (run #:peer-count [peer-count 100] #:client-first? [client-first? #f])
(define event-count 0)
(define-values (results cpu-total-ms wall-ms cpu-gc-ms)
(time-apply
(lambda ()
(test-case
[(define (spawn-client)
(spawn (assert (client))))
(define (spawn-server id)
(spawn (on (asserted (client))
(set! event-count (+ event-count 1)))))
(define (spawn-servers)
(for [(id (in-range (- peer-count 1)))]
(spawn-server id)))
(if client-first?
(begin (spawn-client) (spawn-servers))
(begin (spawn-servers) (spawn-client)))]))
'()))
(values event-count (max 1 (- cpu-total-ms cpu-gc-ms))))
(module+ main
(define E (standard-logbook-entry (default-logbook #:verbose? #t) "minimart" "observe-some-imperative-syndicate"))
(define T-client-first (logbook-table E "presence-processing-narrow-client-first"
#:column-spec '(number-of-peers
secs/routing-update
routing-updates/sec
secs/peer
peers/sec
event-count
run-duration-ms)))
(define T-client-last (logbook-table E "presence-processing-narrow-client-last"
#:column-spec '(number-of-peers
secs/routing-update
routing-updates/sec
secs/peer
peers/sec
event-count
run-duration-ms)))
;; Warmup
(let ()
(run #:peer-count 1)
(run #:peer-count 10)
(void))
;; Real run
(for ((n
#;(list 5)
(list* 2 5
(let loop ((n 10))
(if (>= n 100000)
'()
(cons (inexact->exact (round n))
(loop (* n (sqrt (sqrt 2))))))))
))
(collect-garbage)
(collect-garbage)
(collect-garbage)
(for ((client-first? (list #t #f)))
(define-values (event-count run-duration-ms)
(run #:peer-count n #:client-first? client-first?))
(write-logbook-datum! (if client-first? T-client-first T-client-last)
(list n
(/ (/ run-duration-ms 1000.0) event-count)
(/ event-count (/ run-duration-ms 1000.0))
(/ (/ run-duration-ms 1000.0) n)
(/ n (/ run-duration-ms 1000.0))
event-count
run-duration-ms)))))

View File

@ -1,74 +0,0 @@
#lang racket/base
(require racket/match)
(require racket/tcp)
(require racket/port)
(require logbook)
(require racket/cmdline)
(define server-entry-name #f)
(define server-entry-type #f)
(command-line #:program "plain-racket-server-evt.rkt"
#:once-each
["--logbook-entry-name" name
"set logbook entry name to use when recording run statistics"
(set! server-entry-name name)]
["--logbook-entry-type" type
"set logbook entry type to use"
(set! server-entry-type type)])
(when (not server-entry-type)
(error 'plain-racket-server "Please supply the --logbook-entry-type command-line argument."))
(define L (default-logbook))
(define E (logbook-entry L "minimart" server-entry-name server-entry-type))
(define Tmem (logbook-table E "server-memory-use" #:column-spec '(time-seconds memory-use)))
(define Tconn (logbook-table E "server-connections" #:column-spec '(time-seconds connection-count)))
(define connection-count 0)
(define first-connection-seen? #f)
(define statistics-poll-interval 2000)
(void (thread
(lambda ()
(let loop ()
(sleep (/ statistics-poll-interval 1000.0))
(printf "~a connections\n" connection-count)
(flush-output)
(define now (current-inexact-milliseconds))
(write-logbook-datum! Tmem (list (/ now 1000.0) (current-memory-use)))
(write-logbook-datum! Tconn (list (/ now 1000.0) connection-count))
(when (and first-connection-seen? (zero? connection-count))
(close-logbook L)
(exit 0))
(loop)))))
(define (connection in out)
(set! connection-count (+ connection-count 1))
(set! first-connection-seen? #t)
(define reader-evt
(handle-evt (read-line-evt in 'any)
(lambda (item)
(match item
[(? eof-object?)
(close-input-port in)
(close-output-port out)
(set! connection-count (- connection-count 1))
(lambda (events) (remq reader-evt events))]
[line
(fprintf out "~a\n" line)
(flush-output out)
values]))))
(lambda (events) (cons reader-evt events)))
(define (listener port-number)
(define s (tcp-listen port-number 128 #t))
(let loop ((events (list (handle-evt (tcp-accept-evt s)
(lambda (ports)
(match-define (list in out) ports)
(connection in out))))))
(loop ((or (apply sync/timeout 1.0 events) values) events))))
(listener 5999)

View File

@ -1,74 +0,0 @@
#lang racket/base
(require racket/match)
(require racket/tcp)
(require racket/port)
(require logbook)
(require racket/cmdline)
(define server-entry-name #f)
(define server-entry-type #f)
(command-line #:program "plain-racket-server-semi-threaded.rkt"
#:once-each
["--logbook-entry-name" name
"set logbook entry name to use when recording run statistics"
(set! server-entry-name name)]
["--logbook-entry-type" type
"set logbook entry type to use"
(set! server-entry-type type)])
(when (not server-entry-type)
(error 'plain-racket-server "Please supply the --logbook-entry-type command-line argument."))
(define L (default-logbook))
(define E (logbook-entry L "minimart" server-entry-name server-entry-type))
(define Tmem (logbook-table E "server-memory-use" #:column-spec '(time-seconds memory-use)))
(define Tconn (logbook-table E "server-connections" #:column-spec '(time-seconds connection-count)))
(define connection-count 0)
(define first-connection-seen? #f)
(define statistics-poll-interval 2000)
(void (thread
(lambda ()
(let loop ()
(sleep (/ statistics-poll-interval 1000.0))
(printf "~a connections\n" connection-count)
(flush-output)
(define now (current-inexact-milliseconds))
(write-logbook-datum! Tmem (list (/ now 1000.0) (current-memory-use)))
(write-logbook-datum! Tconn (list (/ now 1000.0) connection-count))
(when (and first-connection-seen? (zero? connection-count))
(close-logbook L)
(exit 0))
(loop)))))
(define (connection in out)
(set! connection-count (+ connection-count 1))
(set! first-connection-seen? #t)
(thread
(lambda ()
(let loop ()
(sync (handle-evt (read-line-evt in 'any)
(match-lambda
[(? eof-object?)
(close-input-port in)
(close-output-port out)
(set! connection-count (- connection-count 1))]
[line
(fprintf out "~a\n" line)
(flush-output out)
(loop)])))))))
(define (listener port-number)
(define s (tcp-listen port-number 128 #t))
(let loop ()
(sync (handle-evt (tcp-accept-evt s)
(lambda (item)
(match-define (list in out) item)
(connection in out)
(loop))))))
(listener 5999)

View File

@ -14,21 +14,14 @@ racket external-latency.rkt --max-waypoint $MAX_WAYPOINT --uv
## racket external-latency.rkt --max-waypoint $MAX_MINIMART_WAYPOINT --minimart ## don't bother
racket external-latency.rkt --max-waypoint $MAX_MINIMART_WAYPOINT --minimart+tcp
racket external-latency.rkt --max-waypoint $MAX_WAYPOINT --prospect+tcp
racket external-latency.rkt --max-waypoint $MAX_WAYPOINT --imperative-syndicate+tcp
racket external-latency.rkt --max-waypoint $MAX_WAYPOINT --racket
racket external-latency.rkt --max-waypoint $MAX_WAYPOINT --racket-evt
racket internal-latency.rkt
racket internal-latency-prospect.rkt
racket internal-latency-imperative-syndicate.rkt
racket broadcast-latency-prospect.rkt
racket broadcast-latency-imperative-syndicate.rkt
racket observe-all-minimart.rkt
racket observe-all-prospect.rkt
racket observe-all-imperative-syndicate.rkt
racket observe-some-minimart.rkt
racket observe-some-prospect.rkt
racket observe-some-imperative-syndicate.rkt

View File

@ -1,56 +0,0 @@
#lang imperative-syndicate/test-implementation
;; Measurement of presence processing.
;; Peers observe each other, AND ALSO process the resulting routing events.
(require logbook)
(assertion-struct presence (id))
(define (run #:peer-count [peer-count 100])
(define event-count 0)
(define start-time (current-inexact-milliseconds))
(test-case [(for [(id (in-range peer-count))]
(spawn (assert (presence id))
(field [peer-count 0])
(on (asserted (presence $peer))
(set! event-count (+ event-count 1))
(peer-count (+ (peer-count) 1)))))])
(define stop-time (current-inexact-milliseconds))
(define delta (- stop-time start-time))
(values event-count delta))
(module+ main
(define E (standard-logbook-entry (default-logbook #:verbose? #t) "minimart" "sum-all-imperative-syndicate"))
(define T (logbook-table E "presence-processing"
#:column-spec '(number-of-peers
secs/event
events/sec
secs/peer
peers/sec
event-count
run-duration-ms)))
;; Warmup
(let ()
(run #:peer-count 1)
(run #:peer-count 10)
(void))
;; Real run
(for ((n
(list* 1 2 5
(let loop ((n 10))
(if (>= n 1000)
'()
(cons (inexact->exact (round n))
(loop (* n (sqrt (sqrt 2))))))))
))
(collect-garbage)
(collect-garbage)
(collect-garbage)
(define-values (event-count run-duration-ms) (run #:peer-count n))
(write-logbook-datum! T (list n
(/ (/ run-duration-ms 1000.0) event-count)
(/ event-count (/ run-duration-ms 1000.0))
(/ (/ run-duration-ms 1000.0) n)
(/ n (/ run-duration-ms 1000.0))
event-count
run-duration-ms))))

View File

@ -5,17 +5,14 @@
#include <uv.h>
static uv_tcp_t serversock;
static uv_timer_t stats;
static int connection_count = 0;
static void on_timer(uv_timer_t *timer) {
static void on_timer(uv_timer_t *timer, int status) {
printf("%d connections\n", connection_count);
}
static void alloc_buffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) {
buf->base = calloc(1, suggested_size);
buf->len = suggested_size;
static uv_buf_t alloc_buffer(uv_handle_t *handle, size_t suggested_size) {
return uv_buf_init(calloc(1, suggested_size), suggested_size);
}
static void on_write_complete(uv_write_t *req, int status) {
@ -24,40 +21,22 @@ static void on_write_complete(uv_write_t *req, int status) {
free(req);
}
static void on_timer_closed(uv_handle_t *timer) {
printf("Exiting after timer shutdown.\n");
uv_stop(timer->loop);
}
static void on_serversock_closed(uv_handle_t *serversock) {
printf("Shutting down statistics timer after server socket shutdown.\n");
uv_timer_stop(&stats);
uv_close((uv_handle_t *) &stats, on_timer_closed);
}
static void on_conn_closed(uv_handle_t *conn) {
connection_count--;
if (connection_count == 0) {
printf("Shutting down server socket on zero connection count.\n");
uv_close((uv_handle_t *) &serversock, on_serversock_closed);
}
free(conn);
}
static void on_input(uv_stream_t *conn, ssize_t nread, uv_buf_t const *buf) {
if (nread < 0) {
static void on_input(uv_stream_t *conn, ssize_t nread, uv_buf_t buf) {
if (nread == -1) {
/* here we could check uv_last_error(conn->loop).code to see whether it's UV_EOF or not */
/* but we don't care; just close, and be done */
uv_close((uv_handle_t *) conn, on_conn_closed);
if (buf->base != NULL) {
free(buf->base);
connection_count--;
if (connection_count == 0) {
printf("Exiting on zero connection count.\n");
uv_stop(conn->loop);
}
uv_close((uv_handle_t *) conn, NULL);
} else {
/* we just echo what came in. */
uv_write_t *req = calloc(1, sizeof(uv_write_t));
uv_buf_t outbuf = { .base = buf->base, .len = nread };
req->data = buf->base;
uv_write(req, conn, &outbuf, 1, on_write_complete);
/* we just echo what came in. */
req->data = buf.base;
buf.len = nread;
uv_write(req, conn, &buf, 1, on_write_complete);
}
}
@ -82,7 +61,9 @@ static void on_connection(uv_stream_t *serversock, int status) {
static int const PORTNUMBER = 5999;
int main(int argc, char const *argv[]) {
uv_loop_t uv;
uv_loop_t *uv = uv_default_loop();
uv_tcp_t serversock;
uv_timer_t stats;
struct sockaddr_in bind_addr;
printf("uvserver; libuv version %s\n", uv_version_string());
@ -91,19 +72,17 @@ int main(int argc, char const *argv[]) {
return 0;
}
uv_loop_init(&uv);
printf("Accepting connections on port %d.\n", PORTNUMBER);
uv_tcp_init(&uv, &serversock);
uv_ip4_addr("0.0.0.0", PORTNUMBER, &bind_addr);
uv_tcp_bind(&serversock, (struct sockaddr const *) &bind_addr, 0);
uv_tcp_init(uv, &serversock);
bind_addr = uv_ip4_addr("0.0.0.0", PORTNUMBER);
uv_tcp_bind(&serversock, bind_addr);
uv_listen((uv_stream_t *) &serversock, 4, on_connection);
uv_timer_init(&uv, &stats);
uv_timer_init(uv, &stats);
uv_timer_start(&stats, on_timer, 2000, 2000);
uv_run(&uv, UV_RUN_DEFAULT);
uv_run(uv, UV_RUN_DEFAULT);
uv_loop_close(&uv);
uv_loop_delete(uv);
return 0;
}