#lang imperative-syndicate
;; Server side of the broker: bridges external connections (TCP or
;; websocket) to assertions and messages inside this dataspace.
;;
;; Exports two facet-building procedures, `server-facet/tcp` and
;; `server-facet/websocket`. Each one asserts a `server-connection` record
;; and translates between the transport's byte messages and the internal
;; `server-inbound` / `server-outbound` packet messages. The
;; `server-connection-factory` actor below reacts to `server-connection`
;; assertions and interprets the packets.

(provide server-facet/tcp
         server-facet/websocket)

(require "wire-protocol.rkt")
(require "protocol.rkt")
(require imperative-syndicate/term)
(require racket/set)

(require/activate imperative-syndicate/drivers/tcp)
(require/activate imperative-syndicate/drivers/web)

;; Internal connection protocol
;;
;; server-connection : asserted by a transport facet for connection
;;                     `connection-id`, carrying the `scope` it was given.
;; server-inbound    : message carrying a decoded packet from the peer.
;; server-outbound   : message carrying a packet to encode and send to the peer.
(assertion-struct server-connection (connection-id scope))
(assertion-struct server-inbound (connection-id body))
(assertion-struct server-outbound (connection-id body))

;; Internal isolation
;;
;; Everything a connection asserts, observes or sends is wrapped in an
;; `envelope` together with that connection's scope.
(assertion-struct envelope (scope body))

(define-logger syndicate/broker)

;; Handles one `(server-connection id scope)` assertion at a time via
;; `during/spawn`, with `id` and `scope` bound in the body.
(spawn #:name 'server-connection-factory
       (during/spawn (server-connection $id $scope)
         ;; Endpoint names that currently have a live facet for this connection.
         (define endpoints (set))

         ;; An `Assert` packet naming an endpoint that is not yet in
         ;; `endpoints` creates a facet for that endpoint. (Later `Assert`
         ;; packets for the same name are handled inside that facet.)
         (on (message (server-inbound id (Assert $ep $a)))
           (when (not (set-member? endpoints ep))
             (set! endpoints (set-add endpoints ep))
             (react
              ;; Forget the endpoint name once its facet stops.
              (on-stop (set! endpoints (set-remove endpoints ep)))

              ;; The assertion most recently supplied by the peer for `ep`.
              (field [assertion a])

              ;; Produces the two values handed to `add-endpoint!`:
              ;;  - if the current assertion satisfies `observe?`: an
              ;;    `observe` of the scoped pattern, plus a skeleton interest
              ;;    whose callback sends `Add`/`Del`/`Msg` packets (chosen by
              ;;    `op` being '+, '- or '!) for `ep` as `server-outbound`
              ;;    messages;
              ;;  - otherwise: the assertion wrapped in this connection's
              ;;    envelope, and #f.
              (define (recompute-endpoint)
                (define a (assertion))
                (if (observe? a)
                    (let* ((pattern (observe-specification a))
                           (spec (envelope scope pattern)))
                      (values (observe spec)
                              (term->skeleton-interest
                               spec
                               (capture-facet-context
                                (lambda (op . captured-values)
                                  ;; The send is queued as a script on the
                                  ;; current actor rather than performed
                                  ;; directly inside the callback.
                                  (schedule-script!
                                   (current-actor)
                                   (lambda ()
                                     (define ctor (match op ['+ Add] ['- Del] ['! Msg]))
                                     (send! (server-outbound id (ctor ep captured-values))))))))))
                    (values (envelope scope a) #f)))
              (add-endpoint! (current-facet) "server" #t recompute-endpoint)

              ;; A further `Assert` for the same endpoint replaces its
              ;; assertion by updating the field.
              (on (message (server-inbound id (Assert ep $new-a)))
                (assertion new-a))

              ;; A `Clear` for this endpoint stops the facet (which triggers
              ;; the `on-stop` clause above).
              (stop-when (message (server-inbound id (Clear ep)))))))

         ;; A `Message` packet is re-sent inside this connection's envelope.
         (on (message (server-inbound id (Message $body)))
           (send! (envelope scope body)))))

;; server-facet/tcp : id scope -> (facet setup; result not used here)
;;
;; Sets up the endpoints for one TCP connection `id` within `scope`:
;; asserts `tcp-accepted` and `server-connection`, accumulates incoming
;; bytes in `buffer`, forwards each decoded packet as `server-inbound`, and
;; encodes `server-outbound` packets onto `tcp-out`.
(define (server-facet/tcp id scope)
  (assert (tcp-accepted id))
  (assert (server-connection id scope))

  ;; Bytes received so far that have not yet been decoded into a packet.
  (field [buffer #""])

  ;; `decode` yields a packet (or #f) and the remaining bytes. When a packet
  ;; is available, keep only the remainder and forward the packet.
  ;; NOTE(review): presumably this block re-runs whenever `buffer` changes,
  ;; which is what would drain several packets from one read -- confirm
  ;; against the `begin/dataflow` documentation.
  (begin/dataflow
    (define-values (packet remainder) (decode (buffer)))
    (when packet
      (buffer remainder)
      (send! (server-inbound id packet))))

  ;; Append newly received bytes to the buffer.
  (on (message (tcp-in id $bs))
    (buffer (bytes-append (buffer) bs)))

  ;; Encode outbound packets and write them to the connection.
  (on (message (server-outbound id $p))
    (send! (tcp-out id (encode p)))))

;; server-facet/websocket : id scope -> (facet setup; result not used here)
;;
;; Sets up the endpoints for one websocket connection `id` within `scope`:
;; asserts `http-accepted`, `http-response-websocket` and
;; `server-connection`, decodes each incoming websocket message as exactly
;; one packet, and encodes `server-outbound` packets onto `websocket-out`.
;;
;; Raises an error (via `error`) if bytes remain after decoding a message.
(define (server-facet/websocket id scope)
  (assert (http-accepted id))
  (assert (http-response-websocket id))
  (assert (server-connection id scope))

  (on (message (websocket-in id $body))
    (define-values (packet remainder) (decode body))
    ;; Each websocket message must hold a single packet with no trailing bytes.
    (when (not (equal? remainder #""))
      (error 'server-facet/websocket "Multiple packets in a single websocket message"))
    ;; NOTE(review): unlike `server-facet/tcp`, `packet` is forwarded without
    ;; a `(when packet ...)` check -- confirm what `decode` returns for an
    ;; incomplete or empty body.
    (send! (server-inbound id packet)))

  ;; Encode outbound packets and write them to the websocket.
  (on (message (server-outbound id $p))
    (send! (websocket-out id (encode p)))))

;; Debug tracing of connection lifecycle and packet traffic. The log level
;; is checked once, when this top-level form is evaluated; the tracing actor
;; is only spawned if the `syndicate/broker` logger is at 'debug level then.
(when (log-level? syndicate/broker-logger 'debug)
  (spawn #:name 'server-debug
         (on (asserted (server-connection $id $scope))
           (log-syndicate/broker-debug "C+ ~v ~v" id scope))
         (on (retracted (server-connection $id $scope))
           (log-syndicate/broker-debug "C- ~v ~v" id scope))
         (on (message (server-inbound $id $p))
           (log-syndicate/broker-debug "CIN ~v ~v" id p))
         (on (message (server-outbound $id $p))
           (log-syndicate/broker-debug "COUT ~v ~v" id p))))