101 lines
3.7 KiB
Racket
101 lines
3.7 KiB
Racket
#lang imperative-syndicate
|
|
|
|
(provide server-facet/tcp
|
|
server-facet/websocket)
|
|
|
|
(require "wire-protocol.rkt")
|
|
(require "protocol.rkt")
|
|
(require imperative-syndicate/term)
|
|
(require racket/set)
|
|
|
|
(require/activate imperative-syndicate/drivers/tcp)
|
|
(require/activate imperative-syndicate/drivers/web)
|
|
|
|
;; Internal connection protocol
|
|
(assertion-struct server-connection (connection-id scope))
|
|
(assertion-struct server-inbound (connection-id body))
|
|
(assertion-struct server-outbound (connection-id body))
|
|
|
|
;; Internal isolation
|
|
(assertion-struct envelope (scope body))
|
|
|
|
(define-logger syndicate/broker)
|
|
|
|
(spawn #:name 'server-connection-factory
|
|
(during/spawn (server-connection $id $scope)
|
|
(define endpoints (set))
|
|
|
|
(on (message (server-inbound id (Assert $ep $a)))
|
|
(when (not (set-member? endpoints ep))
|
|
(set! endpoints (set-add endpoints ep))
|
|
(react
|
|
(on-stop (set! endpoints (set-remove endpoints ep)))
|
|
|
|
(field [assertion a])
|
|
|
|
(define (recompute-endpoint)
|
|
(define a (assertion))
|
|
(if (observe? a)
|
|
(let* ((pattern (observe-specification a))
|
|
(spec (envelope scope pattern)))
|
|
(values (observe spec)
|
|
(term->skeleton-interest
|
|
spec
|
|
(capture-facet-context
|
|
(lambda (op . captured-values)
|
|
(schedule-script!
|
|
(current-actor)
|
|
(lambda ()
|
|
(define ctor (match op ['+ Add] ['- Del] ['! Msg]))
|
|
(send! (server-outbound id (ctor ep captured-values))))))))))
|
|
(values (envelope scope a) #f)))
|
|
(add-endpoint! (current-facet) "server" #t recompute-endpoint)
|
|
|
|
(on (message (server-inbound id (Assert ep $new-a)))
|
|
(assertion new-a))
|
|
|
|
(stop-when (message (server-inbound id (Clear ep)))))))
|
|
|
|
(on (message (server-inbound id (Message $body)))
|
|
(send! (envelope scope body)))))
|
|
|
|
(define (server-facet/tcp id scope)
|
|
(assert (tcp-accepted id))
|
|
(assert (server-connection id scope))
|
|
|
|
(field [buffer #""])
|
|
(begin/dataflow
|
|
(define-values (packet remainder) (decode (buffer)))
|
|
(when packet
|
|
(buffer remainder)
|
|
(send! (server-inbound id packet))))
|
|
|
|
(on (message (tcp-in id $bs))
|
|
(buffer (bytes-append (buffer) bs)))
|
|
(on (message (server-outbound id $p))
|
|
(send! (tcp-out id (encode p)))))
|
|
|
|
(define (server-facet/websocket id scope)
|
|
(assert (http-accepted id))
|
|
(assert (http-response-websocket id))
|
|
(assert (server-connection id scope))
|
|
|
|
(on (message (websocket-in id $body))
|
|
(define-values (packet remainder) (decode body))
|
|
(when (not (equal? remainder #""))
|
|
(error 'server-facet/websocket "Multiple packets in a single websocket message"))
|
|
(send! (server-inbound id packet)))
|
|
(on (message (server-outbound id $p))
|
|
(send! (websocket-out id (encode p)))))
|
|
|
|
(when (log-level? syndicate/broker-logger 'debug)
|
|
(spawn #:name 'server-debug
|
|
(on (asserted (server-connection $id $scope))
|
|
(log-syndicate/broker-debug "C+ ~v ~v" id scope))
|
|
(on (retracted (server-connection $id $scope))
|
|
(log-syndicate/broker-debug "C- ~v ~v" id scope))
|
|
(on (message (server-inbound $id $p))
|
|
(log-syndicate/broker-debug "CIN ~v ~v" id p))
|
|
(on (message (server-outbound $id $p))
|
|
(log-syndicate/broker-debug "COUT ~v ~v" id p))))
|