syndicate-rkt/syndicate/distributed/server.rkt

84 lines
3.3 KiB
Racket
Raw Normal View History

2019-03-18 15:34:14 +00:00
#lang imperative-syndicate
(provide (struct-out server-poa)
(struct-out message-poa->server)
(struct-out message-server->poa)
(struct-out server-proposal)
(struct-out server-envelope))
2019-03-18 15:34:14 +00:00
(require "wire-protocol.rkt")
(require racket/set)
;; Internal connection protocol
;;
;; These records link a connection (identified by `connection-id`) to the
;; server logic in this module:
;;  - `server-poa` marks a "Point of Attachment"; the server-factory actor
;;    reacts for as long as such an assertion is present.
;;  - `message-poa->server` carries a packet `body` from the connection to
;;    the server.
;;  - `message-server->poa` carries a packet `body` from the server back to
;;    the connection.
;; NOTE(review): the party asserting `server-poa` is not visible in this
;; file; presumably a transport-specific driver -- confirm.
(assertion-struct server-poa (connection-id)) ;; "Point of Attachment"
(assertion-struct message-poa->server (connection-id body))
(assertion-struct message-server->poa (connection-id body))
;; Internal isolation -- these are isomorphic to `to-server` and `from-server`!
;; (and, for that matter, to `outbound` and `inbound`!)
;;
;; Both records pair a `scope` with a `body`. Within this module, client
;; assertions are published as `server-proposal`s, while observers and
;; relayed messages use `server-envelope`s; the server-factory actor
;; republishes every proposal as an envelope with the same scope and body.
(assertion-struct server-proposal (scope body)) ;; suggestions (~ actions)
(assertion-struct server-envelope (scope body)) ;; decisions (~ events)
(spawn #:name 'server-factory

       ;; Historically only `server-envelope` existed; `server-proposal`
       ;; was added alongside it. Not every decision originates as a
       ;; (local) suggestion, but in this implementation every suggestion
       ;; is accepted, so each proposal is mirrored as an envelope for as
       ;; long as the proposal itself is asserted.
       (during (server-proposal $proposal-scope $proposal-body)
         (assert (server-envelope proposal-scope proposal-body)))

       ;; For each point of attachment, wait for the packet that selects
       ;; the connection's mode. `Connect` and `Peer` each replace this
       ;; setup facet with the corresponding session facet; any other
       ;; packet received before setup is answered with an error.
       (during/spawn (server-poa $conn)
         (on (message (message-poa->server conn $packet))
           (match packet
             [(Connect scope)
              (stop-current-facet (react (connected conn scope)))]
             [(Peer scope)
              (stop-current-facet (react (peering conn scope)))]
             [_
              (send-error! conn 'connection-not-setup)]))))
;; send-error! : connection-id any -> result of `send!`
;; Reports a protocol failure to the point of attachment ID by sending it
;; an `Err` packet wrapping DETAIL (callers in this file pass a symbol).
(define (send-error! id detail)
  (define failure-packet (Err detail))
  (send! (message-server->poa id failure-packet)))
;; unhandled-message : connection-id packet -> result of `send!`
;; Fallback for packets the active session did not consume:
;;  - a second `Connect`/`Peer` is rejected as a duplicate setup attempt;
;;  - `Ping` is answered with `Pong`;
;;  - everything else is rejected as invalid.
(define (unhandled-message id p)
  (match p
    [(or (Connect _) (Peer _))
     (send-error! id 'duplicate-connection-setup)]
    [(Ping)
     (send! (message-server->poa id (Pong)))]
    [_
     (send-error! id 'invalid-message)]))
;; connected : connection-id scope -> (installs endpoints in the current facet)
;;
;; Session behaviour for a client that has sent `Connect`. Handles packets
;; arriving from point of attachment ID within SCOPE:
;;  - `(Assert ep a)` for a new endpoint name EP creates a facet that
;;    publishes A as a `server-proposal` and, when A is an `observe`
;;    assertion, relays matching `server-envelope` events back to the
;;    client as `Add`/`Del`/`Msg` packets tagged with EP.
;;  - `(Assert ep a)` for an existing EP updates that facet's assertion.
;;  - `(Clear ep)` for an existing EP stops that facet.
;;  - `(Message body)` is broadcast as a `server-envelope` message.
;;  - anything else is passed to `unhandled-message`.
(define (connected id scope)
  ;; Names of endpoints that currently have a live facet.
  (define endpoints (set))
  (on (message (message-poa->server id $p))
    (match p
      [(Assert ep a) #:when (not (set-member? endpoints ep))
       (set! endpoints (set-add endpoints ep))
       (react
         ;; Forget the endpoint name once its facet terminates.
         (on-stop (set! endpoints (set-remove endpoints ep)))
         ;; Current assertion for this endpoint; replaced on later Asserts.
         (field [assertion a])
         (assert (server-proposal scope (assertion)))
         ;; `(! ctor)` builds a callback that wraps its argument with CTOR
         ;; (tagged by EP) and sends the result back to the client.
         (let ((! (lambda (ctor) (lambda (cs) (send! (message-server->poa id (ctor ep cs)))))))
           (add-observer-endpoint! (lambda ()
                                     ;; Only `observe` assertions yield a
                                     ;; subscription; otherwise the thunk
                                     ;; returns void.
                                     (let ((current (assertion)))
                                       (when (observe? current)
                                         (server-envelope scope (observe-specification current)))))
                                   #:on-add (! Add)
                                   #:on-remove (! Del)
                                   #:on-message (! Msg)))
         ;; A repeated Assert for the same endpoint replaces the assertion.
         (on (message (message-poa->server id (Assert ep $new-a)))
           (assertion new-a))
         (stop-when (message (message-poa->server id (Clear ep)))))]
      [(Assert _ _)
       ;; The endpoint already exists (the guard above failed): the update is
       ;; handled by the `on` clause in the facet established by the Assert
       ;; handler. Without this clause the packet would fall through to
       ;; `unhandled-message` and the client would receive a spurious
       ;; 'invalid-message error for a legitimate update.
       (void)]
      [(Clear ep) #:when (set-member? endpoints ep)
       (void)] ;; handled by stop-when clause in facet established by Assert handler
      [(Message body)
       (send! (server-envelope scope body))]
      [other
       (unhandled-message id other)])))
;; peering : connection-id scope -> (never returns)
;; Placeholder for the session behaviour selected by a `Peer` packet.
;; Both arguments are currently ignored; calling it always raises an error.
(define (peering id scope)
  (error 'peering
         "Not yet implemented"))