diff --git a/minimart/demand-matcher.rkt b/minimart/demand-matcher.rkt index 5dbd542..ef6be2e 100644 --- a/minimart/demand-matcher.rkt +++ b/minimart/demand-matcher.rkt @@ -69,7 +69,7 @@ (define (spawn-demand-matcher projection increase-handler [decrease-handler unexpected-supply-decrease] - #:demand-is-subscription? [demand-is-subscription? #t] + #:demand-is-subscription? [demand-is-subscription? #f] #:meta-level [meta-level 0] #:demand-level [demand-level 0] #:supply-level [supply-level 0]) diff --git a/minimart/drivers/tcp.rkt b/minimart/drivers/tcp.rkt index 718856f..4e0d89a 100644 --- a/minimart/drivers/tcp.rkt +++ b/minimart/drivers/tcp.rkt @@ -31,11 +31,11 @@ (define (spawn-tcp-driver) (list (spawn-demand-matcher (tcp-channel ? (?! (tcp-listener ?)) ?) + #:demand-is-subscription? #t #:demand-level 1 #:supply-level 2 spawn-tcp-listener) (spawn-demand-matcher (tcp-channel (?! (tcp-handle ?)) (?! (tcp-address ? ?)) ?) - #:demand-is-subscription? #f spawn-tcp-connection))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; diff --git a/minimart/drivers/websocket.rkt b/minimart/drivers/websocket.rkt index 61af07e..a723a4f 100644 --- a/minimart/drivers/websocket.rkt +++ b/minimart/drivers/websocket.rkt @@ -30,6 +30,7 @@ (define (spawn-websocket-driver) (spawn-demand-matcher (websocket-message ? (?! (websocket-local-server ? ?)) ?) + #:demand-is-subscription? #t #:demand-level 1 #:supply-level 2 spawn-websocket-listener)) diff --git a/minimart/examples/chat-client-userland.rkt b/minimart/examples/chat-client-userland.rkt index 2aede1a..f15e9b5 100644 --- a/minimart/examples/chat-client-userland.rkt +++ b/minimart/examples/chat-client-userland.rkt @@ -10,15 +10,21 @@ (spawn-tcp-driver) (userland-thread - #:gestalt (gestalt-union (sub (event (read-bytes-line-evt (current-input-port) 'any) ?) #:meta-level 1) - (sub (tcp-channel remote-handle local-handle ?)) - (pub (tcp-channel local-handle remote-handle ?)) - (pub (tcp-channel local-handle remote-handle ?) #:level 1)) - (wait-for-gestalt (pub (tcp-channel local-handle remote-handle ?) #:level 1)) + #:gestalt + (gestalt-union (sub (event (read-bytes-line-evt (current-input-port) 'any) ?) #:meta-level 1) + (sub (tcp-channel remote-handle local-handle ?)) + (sub (tcp-channel remote-handle local-handle ?) #:level 1) + (pub (tcp-channel local-handle remote-handle ?))) + (wait-for-gestalt (sub (tcp-channel remote-handle local-handle ?) #:level 1)) (let loop () (match (next-event) - [(message (event _ (list (? eof-object?))) 1 #f) (do (quit))] - [(message (event _ (list line)) 1 #f) (do (send (tcp-channel local-handle remote-handle line)))] - [(message (tcp-channel _ _ bs) 0 #f) (write-bytes bs) (flush-output)] - [(routing-update g) (when (gestalt-empty? g) (do (quit)))]) + [(message (event _ (list (? eof-object?))) 1 #f) + (do (quit))] + [(message (event _ (list line)) 1 #f) + (do (send (tcp-channel local-handle remote-handle line)))] + [(message (tcp-channel _ _ bs) 0 #f) + (write-bytes bs) + (flush-output)] + [(routing-update g) + (when (gestalt-empty? g) (do (quit)))]) (loop))) diff --git a/minimart/examples/chat-client.rkt b/minimart/examples/chat-client.rkt index 799c65f..a54c1b2 100644 --- a/minimart/examples/chat-client.rkt +++ b/minimart/examples/chat-client.rkt @@ -26,5 +26,5 @@ #f (gestalt-union (sub (event (read-bytes-line-evt (current-input-port) 'any) ?) #:meta-level 1) (sub (tcp-channel remote-handle local-handle ?)) - (pub (tcp-channel local-handle remote-handle ?)) - (pub (tcp-channel local-handle remote-handle ?) #:level 1))) + (sub (tcp-channel remote-handle local-handle ?) #:level 1) + (pub (tcp-channel local-handle remote-handle ?)))) diff --git a/minimart/examples/chat-userland.rkt b/minimart/examples/chat-userland.rkt index 7e85d08..6e292de 100644 --- a/minimart/examples/chat-userland.rkt +++ b/minimart/examples/chat-userland.rkt @@ -15,17 +15,22 @@ (define (say who fmt . vs) (unless (equal? who user) (send-to-remote "~a ~a\n" who (apply format fmt vs)))) - (define tcp-gestalt (gestalt-union (pub (tcp-channel us them ?) #:meta-level 1 #:level 1) - (pub (tcp-channel us them ?) #:meta-level 1) + (define tcp-gestalt (gestalt-union (pub (tcp-channel us them ?) #:meta-level 1) + (sub (tcp-channel them us ?) #:meta-level 1 #:level 1) (sub (tcp-channel them us ?) #:meta-level 1))) + (define (gestalt->peers g) + (matcher-key-set/single + (gestalt-project g 0 0 #t (compile-gestalt-projection `(,(?!) says ,?))))) + (userland-thread #:gestalt (gestalt-union tcp-gestalt (sub `(,? says ,?)) (sub `(,? says ,?) #:level 1) (pub `(,user says ,?))) - (wait-for-gestalt tcp-gestalt) + (define orig-peers (gestalt->peers (wait-for-gestalt tcp-gestalt))) (send-to-remote "Welcome, ~a.\n" user) - (let loop ((old-peers (set))) + (for/list [(who orig-peers)] (say who "arrived.")) + (let loop ((old-peers orig-peers)) (match (next-event) [(message (tcp-channel _ _ bs) 1 #f) (do (send `(,user says ,(string-trim (bytes->string/utf-8 bs))))) @@ -35,8 +40,7 @@ (loop old-peers)] [(routing-update g) (when (gestalt-empty? (gestalt-filter g tcp-gestalt)) (do (quit))) - (define new-peers (matcher-key-set/single - (gestalt-project g 0 0 #t (compile-gestalt-projection `(,(?!) says ,?))))) + (define new-peers (gestalt->peers g)) (for/list [(who (set-subtract new-peers old-peers))] (say who "arrived.")) (for/list [(who (set-subtract old-peers new-peers))] (say who "departed.")) (loop new-peers)])))) @@ -45,5 +49,4 @@ (spawn-world (spawn-demand-matcher (tcp-channel (?! (tcp-address ? ?)) (?! (tcp-listener 5999)) ?) #:meta-level 1 - #:demand-is-subscription? #f spawn-session)) diff --git a/minimart/examples/chat-userland2.rkt b/minimart/examples/chat-userland2.rkt index 385a409..0f2db70 100644 --- a/minimart/examples/chat-userland2.rkt +++ b/minimart/examples/chat-userland2.rkt @@ -10,8 +10,8 @@ (define (send-to-remote fmt . vs) (do (send #:meta-level 1 (tcp-channel us them (string->bytes/utf-8 (apply format fmt vs)))))) - (define tcp-gestalt (gestalt-union (pub (tcp-channel us them ?) #:meta-level 1 #:level 1) - (pub (tcp-channel us them ?) #:meta-level 1) + (define tcp-gestalt (gestalt-union (pub (tcp-channel us them ?) #:meta-level 1) + (sub (tcp-channel them us ?) #:meta-level 1 #:level 1) (sub (tcp-channel them us ?) #:meta-level 1))) (define (decode-input bs) (string-trim (bytes->string/utf-8 bs))) @@ -43,8 +43,9 @@ (loop old-peers)] [(routing-update g) (when (gestalt-empty? (gestalt-filter g tcp-gestalt)) (do (quit))) - (define new-peers (matcher-key-set/single - (gestalt-project g 0 0 #t (compile-gestalt-projection `(,(?!) says ,?))))) + (define new-peers + (matcher-key-set/single + (gestalt-project g 0 0 #t (compile-gestalt-projection `(,(?!) says ,?))))) (for/list [(who (set-subtract new-peers old-peers))] (say who "arrived.")) (for/list [(who (set-subtract old-peers new-peers))] (say who "departed.")) (loop new-peers)])))) @@ -53,5 +54,4 @@ (spawn-world (spawn-demand-matcher (tcp-channel (?! (tcp-address ? ?)) (?! (tcp-listener 5999)) ?) #:meta-level 1 - #:demand-is-subscription? #f spawn-session)) diff --git a/minimart/examples/chat.rkt b/minimart/examples/chat.rkt index 4bd08cf..d86050f 100644 --- a/minimart/examples/chat.rkt +++ b/minimart/examples/chat.rkt @@ -5,8 +5,6 @@ (require "../drivers/tcp.rkt") (require "../demand-matcher.rkt") -(struct session (seen-remote? peers) #:transparent) - (define (spawn-session them us) (define user (gensym 'user)) (define remote-detector (compile-gestalt-projection (?!))) @@ -16,36 +14,35 @@ (define (say who fmt . vs) (unless (equal? who user) (send-to-remote "~a ~a\n" who (apply format fmt vs)))) (list (send-to-remote "Welcome, ~a.\n" user) - (spawn (lambda (e state) + (spawn (lambda (e old-peers) + (log-info "~a: ~v --> ~v" user e old-peers) (match e [(message (tcp-channel _ _ bs) 1 #f) - (transition state (send `(,user says ,(string-trim (bytes->string/utf-8 bs)))))] + (transition old-peers + (send `(,user says ,(string-trim (bytes->string/utf-8 bs)))))] [(message `(,who says ,what) 0 #f) - (transition state (say who "says: ~a" what))] + (transition old-peers (say who "says: ~a" what))] [(routing-update g) - (match-define (session seen-remote? old-peers) state) - (define remote-present? - (not (matcher-empty? (gestalt-project g 1 0 #f remote-detector)))) - (define new-peers (matcher-key-set/single (gestalt-project g 0 0 #t peer-detector))) + (define new-peers + (matcher-key-set/single (gestalt-project g 0 0 #t peer-detector))) (transition - (struct-copy session state - [seen-remote? (or remote-present? seen-remote?)] - [peers new-peers]) - (list (when (and seen-remote? (not remote-present?)) (quit)) - (for/list [(who (set-subtract new-peers old-peers))] (say who "arrived.")) - (for/list [(who (set-subtract old-peers new-peers))] (say who "departed."))))] + new-peers + (list (when (matcher-empty? (gestalt-project g 1 0 #t remote-detector)) (quit)) + (for/list [(who (set-subtract new-peers old-peers))] + (say who "arrived.")) + (for/list [(who (set-subtract old-peers new-peers))] + (say who "departed."))))] [#f #f])) - (session #f (set)) + (set) (gestalt-union (sub `(,? says ,?)) (sub `(,? says ,?) #:level 1) (pub `(,user says ,?)) (sub (tcp-channel them us ?) #:meta-level 1) - (pub (tcp-channel us them ?) #:meta-level 1) - (pub (tcp-channel us them ?) #:meta-level 1 #:level 1))))) + (sub (tcp-channel them us ?) #:meta-level 1 #:level 1) + (pub (tcp-channel us them ?) #:meta-level 1))))) (spawn-tcp-driver) (spawn-world (spawn-demand-matcher (tcp-channel (?! (tcp-address ? ?)) (?! (tcp-listener 5999)) ?) #:meta-level 1 - #:demand-is-subscription? #f spawn-session)) diff --git a/minimart/examples/tcp-hello.rkt b/minimart/examples/tcp-hello.rkt index 51e6468..4765758 100644 --- a/minimart/examples/tcp-hello.rkt +++ b/minimart/examples/tcp-hello.rkt @@ -20,11 +20,10 @@ (spawn connection-handler 0 (gestalt-union (sub (tcp-channel c server-id ?)) - (pub (tcp-channel server-id c ?)) - (pub (tcp-channel server-id c ?) #:level 1)))) + (sub (tcp-channel c server-id ?) #:level 1) + (pub (tcp-channel server-id c ?))))) (spawn-demand-matcher (tcp-channel (?! (tcp-address ? ?)) server-id ?) - #:demand-is-subscription? #f spawn-connection-handler (lambda (c) (log-info "Connection handler ~v decided to exit" c) diff --git a/minimart/examples/ws-hello-ssl.rkt b/minimart/examples/ws-hello-ssl.rkt index 95b1def..89e5b38 100644 --- a/minimart/examples/ws-hello-ssl.rkt +++ b/minimart/examples/ws-hello-ssl.rkt @@ -26,7 +26,6 @@ (pub (websocket-message server-id c ?))))) (spawn-demand-matcher (websocket-message (?! any-client) server-id ?) - #:demand-is-subscription? #f spawn-connection-handler (lambda (c) (log-info "Connection handler ~v decided to exit" c) diff --git a/minimart/examples/ws-hello.rkt b/minimart/examples/ws-hello.rkt index 035d8b5..209a7f9 100644 --- a/minimart/examples/ws-hello.rkt +++ b/minimart/examples/ws-hello.rkt @@ -25,7 +25,6 @@ (pub (websocket-message server-id c ?))))) (spawn-demand-matcher (websocket-message (?! any-client) server-id ?) - #:demand-is-subscription? #f spawn-connection-handler (lambda (c) (log-info "Connection handler ~v decided to exit" c)