diff --git a/imperative/broker/server-connection.rkt b/imperative/broker/server-connection.rkt new file mode 100644 index 0000000..797e67b --- /dev/null +++ b/imperative/broker/server-connection.rkt @@ -0,0 +1,59 @@ +#lang imperative-syndicate + +(provide (struct-out server-connection) + (struct-out server-inbound) + (struct-out server-outbound) + (struct-out server-envelope)) + +(require "wire-protocol.rkt") +(require imperative-syndicate/term) +(require racket/set) + +;; Internal connection protocol +(assertion-struct server-connection (connection-id scope)) +(assertion-struct server-inbound (connection-id body)) +(assertion-struct server-outbound (connection-id body)) + +;; Internal isolation +(assertion-struct server-envelope (scope body)) + +(spawn #:name 'server-connection-factory + (during/spawn (server-connection $id $scope) + (define endpoints (set)) + + (on (message (server-inbound id (Assert $ep $a))) + (when (not (set-member? endpoints ep)) + (set! endpoints (set-add endpoints ep)) + (react + (on-stop (set! endpoints (set-remove endpoints ep))) + + (field [assertion a]) + + (define (recompute-endpoint) + (define a (assertion)) + (if (observe? a) + (let* ((pattern (observe-specification a)) + (spec (server-envelope scope pattern))) + (values (observe spec) + (term->skeleton-interest + spec + (capture-facet-context + (lambda (op . captured-values) + (schedule-script! + (current-actor) + (lambda () + (define ctor (match op ['+ Add] ['- Del] ['! Msg])) + (send! (server-outbound id (ctor ep captured-values)))))))))) + (values (server-envelope scope a) #f))) + (add-endpoint! (current-facet) "server" #t recompute-endpoint) + + (on (message (server-inbound id (Assert ep $new-a))) + (assertion new-a)) + + (stop-when (message (server-inbound id (Clear ep))))))) + + (on (message (server-inbound id (Message $body))) + (send! (server-envelope scope body))) + + (on (message (server-inbound id (Ping))) + (send! (server-outbound id (Pong)))))) diff --git a/imperative/broker/server.rkt b/imperative/broker/server.rkt index e8d3435..3d54ce2 100644 --- a/imperative/broker/server.rkt +++ b/imperative/broker/server.rkt @@ -4,65 +4,14 @@ server-facet/websocket) (require "wire-protocol.rkt") -(require "protocol.rkt") -(require imperative-syndicate/term) -(require racket/set) (require/activate imperative-syndicate/drivers/tcp) (require/activate imperative-syndicate/drivers/web) (require/activate imperative-syndicate/drivers/timer) - -;; Internal connection protocol -(assertion-struct server-connection (connection-id scope)) -(assertion-struct server-inbound (connection-id body)) -(assertion-struct server-outbound (connection-id body)) - -;; Internal isolation -(assertion-struct envelope (scope body)) +(require/activate imperative-syndicate/broker/server-connection) (define-logger syndicate/broker) -(spawn #:name 'server-connection-factory - (during/spawn (server-connection $id $scope) - (define endpoints (set)) - - (on (message (server-inbound id (Assert $ep $a))) - (when (not (set-member? endpoints ep)) - (set! endpoints (set-add endpoints ep)) - (react - (on-stop (set! endpoints (set-remove endpoints ep))) - - (field [assertion a]) - - (define (recompute-endpoint) - (define a (assertion)) - (if (observe? a) - (let* ((pattern (observe-specification a)) - (spec (envelope scope pattern))) - (values (observe spec) - (term->skeleton-interest - spec - (capture-facet-context - (lambda (op . captured-values) - (schedule-script! - (current-actor) - (lambda () - (define ctor (match op ['+ Add] ['- Del] ['! Msg])) - (send! (server-outbound id (ctor ep captured-values)))))))))) - (values (envelope scope a) #f))) - (add-endpoint! (current-facet) "server" #t recompute-endpoint) - - (on (message (server-inbound id (Assert ep $new-a))) - (assertion new-a)) - - (stop-when (message (server-inbound id (Clear ep))))))) - - (on (message (server-inbound id (Message $body))) - (send! (envelope scope body))) - - (on (message (server-inbound id (Ping))) - (send! (server-outbound id (Pong)))))) - (define (server-facet/tcp id scope) (assert (tcp-accepted id)) (assert (server-connection id scope))