#lang imperative-syndicate (require "wire-protocol.rkt") (require "internal-protocol.rkt") (require racket/set) (require imperative-syndicate/protocol/credit) (spawn #:name 'server-factory ;; Previously, we just had server-envelope. Now, we have both ;; server-envelope and server-proposal. While not everything ;; decided is (locally) suggested, it is true that everything ;; suggested is decided (in this implementation at least), ;; and the following clauses reflect this: (during (server-proposal $scope $assertion) (assert (server-envelope scope assertion))) (on (message (server-proposal $scope $body)) (send! (server-envelope scope body))) (during/spawn (server-poa $id) (on-start (issue-credit! message-poa->server id) (let-event [(message (message-poa->server id $p))] (match p [(Connect scope) (react (connected id scope))] [(Peer scope) (react (assert (federated-link id scope)))] [_ (send-error! id 'connection-not-setup)]))))) (define (send-error! id detail) (send! (message-server->poa id (Err detail)))) (define (connected id scope) (define endpoints (set)) (assert (server-active scope)) (on-start (issue-unbounded-credit! message-poa->server id)) (on (message (message-poa->server id $p)) (match p [(Assert ep a) #:when (not (set-member? endpoints ep)) (set! endpoints (set-add endpoints ep)) (react (on-stop (set! endpoints (set-remove endpoints ep))) (field [assertion a]) (assert (server-proposal scope (assertion))) (let ((! (lambda (ctor) (lambda (cs) (send! (message-server->poa id (ctor ep cs))))))) (add-observer-endpoint! (lambda () (let ((a (assertion))) (when (observe? a) (server-envelope scope (observe-specification a))))) #:on-add (! Add) #:on-remove (! Del) #:on-message (! Msg))) (on (message (message-poa->server id (Assert ep $new-a))) (assertion new-a)) (stop-when (message (message-poa->server id (Clear ep)))))] [(Clear ep) #:when (set-member? endpoints ep) (void)] ;; handled by stop-when clause in facet established by Assert handler [(Message body) (send! (server-proposal scope body))] [other (unhandled-message id other)]))) (define (unhandled-message id p) (match p [(Connect _) (send-error! id 'duplicate-connection-setup)] [(Peer _) (send-error! id 'duplicate-connection-setup)] [(Ping) (send! (message-server->poa id (Pong)))] [(Pong) (void)] [_ (send-error! id 'invalid-message)]))