Avoid need for latching on listen-side TCP connections; flip demand-matcher's default orientation

This commit is contained in:
Tony Garnock-Jones 2014-06-06 18:07:58 -04:00
parent f4169206f5
commit 2eb8822c56
11 changed files with 53 additions and 49 deletions

View File

@ -69,7 +69,7 @@
(define (spawn-demand-matcher projection
increase-handler
[decrease-handler unexpected-supply-decrease]
#:demand-is-subscription? [demand-is-subscription? #t]
#:demand-is-subscription? [demand-is-subscription? #f]
#:meta-level [meta-level 0]
#:demand-level [demand-level 0]
#:supply-level [supply-level 0])

View File

@ -31,11 +31,11 @@
(define (spawn-tcp-driver)
(list (spawn-demand-matcher (tcp-channel ? (?! (tcp-listener ?)) ?)
#:demand-is-subscription? #t
#:demand-level 1
#:supply-level 2
spawn-tcp-listener)
(spawn-demand-matcher (tcp-channel (?! (tcp-handle ?)) (?! (tcp-address ? ?)) ?)
#:demand-is-subscription? #f
spawn-tcp-connection)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;

View File

@ -30,6 +30,7 @@
(define (spawn-websocket-driver)
(spawn-demand-matcher (websocket-message ? (?! (websocket-local-server ? ?)) ?)
#:demand-is-subscription? #t
#:demand-level 1
#:supply-level 2
spawn-websocket-listener))

View File

@ -10,15 +10,21 @@
(spawn-tcp-driver)
(userland-thread
#:gestalt (gestalt-union (sub (event (read-bytes-line-evt (current-input-port) 'any) ?) #:meta-level 1)
(sub (tcp-channel remote-handle local-handle ?))
(pub (tcp-channel local-handle remote-handle ?))
(pub (tcp-channel local-handle remote-handle ?) #:level 1))
(wait-for-gestalt (pub (tcp-channel local-handle remote-handle ?) #:level 1))
#:gestalt
(gestalt-union (sub (event (read-bytes-line-evt (current-input-port) 'any) ?) #:meta-level 1)
(sub (tcp-channel remote-handle local-handle ?))
(sub (tcp-channel remote-handle local-handle ?) #:level 1)
(pub (tcp-channel local-handle remote-handle ?)))
(wait-for-gestalt (sub (tcp-channel remote-handle local-handle ?) #:level 1))
(let loop ()
(match (next-event)
[(message (event _ (list (? eof-object?))) 1 #f) (do (quit))]
[(message (event _ (list line)) 1 #f) (do (send (tcp-channel local-handle remote-handle line)))]
[(message (tcp-channel _ _ bs) 0 #f) (write-bytes bs) (flush-output)]
[(routing-update g) (when (gestalt-empty? g) (do (quit)))])
[(message (event _ (list (? eof-object?))) 1 #f)
(do (quit))]
[(message (event _ (list line)) 1 #f)
(do (send (tcp-channel local-handle remote-handle line)))]
[(message (tcp-channel _ _ bs) 0 #f)
(write-bytes bs)
(flush-output)]
[(routing-update g)
(when (gestalt-empty? g) (do (quit)))])
(loop)))

View File

@ -26,5 +26,5 @@
#f
(gestalt-union (sub (event (read-bytes-line-evt (current-input-port) 'any) ?) #:meta-level 1)
(sub (tcp-channel remote-handle local-handle ?))
(pub (tcp-channel local-handle remote-handle ?))
(pub (tcp-channel local-handle remote-handle ?) #:level 1)))
(sub (tcp-channel remote-handle local-handle ?) #:level 1)
(pub (tcp-channel local-handle remote-handle ?))))

View File

@ -15,17 +15,22 @@
(define (say who fmt . vs)
(unless (equal? who user) (send-to-remote "~a ~a\n" who (apply format fmt vs))))
(define tcp-gestalt (gestalt-union (pub (tcp-channel us them ?) #:meta-level 1 #:level 1)
(pub (tcp-channel us them ?) #:meta-level 1)
(define tcp-gestalt (gestalt-union (pub (tcp-channel us them ?) #:meta-level 1)
(sub (tcp-channel them us ?) #:meta-level 1 #:level 1)
(sub (tcp-channel them us ?) #:meta-level 1)))
(define (gestalt->peers g)
(matcher-key-set/single
(gestalt-project g 0 0 #t (compile-gestalt-projection `(,(?!) says ,?)))))
(userland-thread #:gestalt (gestalt-union tcp-gestalt
(sub `(,? says ,?))
(sub `(,? says ,?) #:level 1)
(pub `(,user says ,?)))
(wait-for-gestalt tcp-gestalt)
(define orig-peers (gestalt->peers (wait-for-gestalt tcp-gestalt)))
(send-to-remote "Welcome, ~a.\n" user)
(let loop ((old-peers (set)))
(for/list [(who orig-peers)] (say who "arrived."))
(let loop ((old-peers orig-peers))
(match (next-event)
[(message (tcp-channel _ _ bs) 1 #f)
(do (send `(,user says ,(string-trim (bytes->string/utf-8 bs)))))
@ -35,8 +40,7 @@
(loop old-peers)]
[(routing-update g)
(when (gestalt-empty? (gestalt-filter g tcp-gestalt)) (do (quit)))
(define new-peers (matcher-key-set/single
(gestalt-project g 0 0 #t (compile-gestalt-projection `(,(?!) says ,?)))))
(define new-peers (gestalt->peers g))
(for/list [(who (set-subtract new-peers old-peers))] (say who "arrived."))
(for/list [(who (set-subtract old-peers new-peers))] (say who "departed."))
(loop new-peers)]))))
@ -45,5 +49,4 @@
(spawn-world
(spawn-demand-matcher (tcp-channel (?! (tcp-address ? ?)) (?! (tcp-listener 5999)) ?)
#:meta-level 1
#:demand-is-subscription? #f
spawn-session))

View File

@ -10,8 +10,8 @@
(define (send-to-remote fmt . vs)
(do (send #:meta-level 1 (tcp-channel us them (string->bytes/utf-8 (apply format fmt vs))))))
(define tcp-gestalt (gestalt-union (pub (tcp-channel us them ?) #:meta-level 1 #:level 1)
(pub (tcp-channel us them ?) #:meta-level 1)
(define tcp-gestalt (gestalt-union (pub (tcp-channel us them ?) #:meta-level 1)
(sub (tcp-channel them us ?) #:meta-level 1 #:level 1)
(sub (tcp-channel them us ?) #:meta-level 1)))
(define (decode-input bs) (string-trim (bytes->string/utf-8 bs)))
@ -43,8 +43,9 @@
(loop old-peers)]
[(routing-update g)
(when (gestalt-empty? (gestalt-filter g tcp-gestalt)) (do (quit)))
(define new-peers (matcher-key-set/single
(gestalt-project g 0 0 #t (compile-gestalt-projection `(,(?!) says ,?)))))
(define new-peers
(matcher-key-set/single
(gestalt-project g 0 0 #t (compile-gestalt-projection `(,(?!) says ,?)))))
(for/list [(who (set-subtract new-peers old-peers))] (say who "arrived."))
(for/list [(who (set-subtract old-peers new-peers))] (say who "departed."))
(loop new-peers)]))))
@ -53,5 +54,4 @@
(spawn-world
(spawn-demand-matcher (tcp-channel (?! (tcp-address ? ?)) (?! (tcp-listener 5999)) ?)
#:meta-level 1
#:demand-is-subscription? #f
spawn-session))

View File

@ -5,8 +5,6 @@
(require "../drivers/tcp.rkt")
(require "../demand-matcher.rkt")
(struct session (seen-remote? peers) #:transparent)
(define (spawn-session them us)
(define user (gensym 'user))
(define remote-detector (compile-gestalt-projection (?!)))
@ -16,36 +14,35 @@
(define (say who fmt . vs)
(unless (equal? who user) (send-to-remote "~a ~a\n" who (apply format fmt vs))))
(list (send-to-remote "Welcome, ~a.\n" user)
(spawn (lambda (e state)
(spawn (lambda (e old-peers)
(log-info "~a: ~v --> ~v" user e old-peers)
(match e
[(message (tcp-channel _ _ bs) 1 #f)
(transition state (send `(,user says ,(string-trim (bytes->string/utf-8 bs)))))]
(transition old-peers
(send `(,user says ,(string-trim (bytes->string/utf-8 bs)))))]
[(message `(,who says ,what) 0 #f)
(transition state (say who "says: ~a" what))]
(transition old-peers (say who "says: ~a" what))]
[(routing-update g)
(match-define (session seen-remote? old-peers) state)
(define remote-present?
(not (matcher-empty? (gestalt-project g 1 0 #f remote-detector))))
(define new-peers (matcher-key-set/single (gestalt-project g 0 0 #t peer-detector)))
(define new-peers
(matcher-key-set/single (gestalt-project g 0 0 #t peer-detector)))
(transition
(struct-copy session state
[seen-remote? (or remote-present? seen-remote?)]
[peers new-peers])
(list (when (and seen-remote? (not remote-present?)) (quit))
(for/list [(who (set-subtract new-peers old-peers))] (say who "arrived."))
(for/list [(who (set-subtract old-peers new-peers))] (say who "departed."))))]
new-peers
(list (when (matcher-empty? (gestalt-project g 1 0 #t remote-detector)) (quit))
(for/list [(who (set-subtract new-peers old-peers))]
(say who "arrived."))
(for/list [(who (set-subtract old-peers new-peers))]
(say who "departed."))))]
[#f #f]))
(session #f (set))
(set)
(gestalt-union (sub `(,? says ,?))
(sub `(,? says ,?) #:level 1)
(pub `(,user says ,?))
(sub (tcp-channel them us ?) #:meta-level 1)
(pub (tcp-channel us them ?) #:meta-level 1)
(pub (tcp-channel us them ?) #:meta-level 1 #:level 1)))))
(sub (tcp-channel them us ?) #:meta-level 1 #:level 1)
(pub (tcp-channel us them ?) #:meta-level 1)))))
(spawn-tcp-driver)
(spawn-world
(spawn-demand-matcher (tcp-channel (?! (tcp-address ? ?)) (?! (tcp-listener 5999)) ?)
#:meta-level 1
#:demand-is-subscription? #f
spawn-session))

View File

@ -20,11 +20,10 @@
(spawn connection-handler
0
(gestalt-union (sub (tcp-channel c server-id ?))
(pub (tcp-channel server-id c ?))
(pub (tcp-channel server-id c ?) #:level 1))))
(sub (tcp-channel c server-id ?) #:level 1)
(pub (tcp-channel server-id c ?)))))
(spawn-demand-matcher (tcp-channel (?! (tcp-address ? ?)) server-id ?)
#:demand-is-subscription? #f
spawn-connection-handler
(lambda (c)
(log-info "Connection handler ~v decided to exit" c)

View File

@ -26,7 +26,6 @@
(pub (websocket-message server-id c ?)))))
(spawn-demand-matcher (websocket-message (?! any-client) server-id ?)
#:demand-is-subscription? #f
spawn-connection-handler
(lambda (c)
(log-info "Connection handler ~v decided to exit" c)

View File

@ -25,7 +25,6 @@
(pub (websocket-message server-id c ?)))))
(spawn-demand-matcher (websocket-message (?! any-client) server-id ?)
#:demand-is-subscription? #f
spawn-connection-handler
(lambda (c)
(log-info "Connection handler ~v decided to exit" c)