Turn-based federation and client/server protocol
This commit is contained in:
parent
be8fefaf4d
commit
72b4326651
|
@ -0,0 +1,14 @@
|
|||
#lang imperative-syndicate
|
||||
|
||||
(provide make-buffer)
|
||||
|
||||
(define (make-buffer)
|
||||
(field [pending '()])
|
||||
(define (push item)
|
||||
(pending (cons item (pending))))
|
||||
(define (drain handler)
|
||||
(begin/dataflow
|
||||
(when (pair? (pending))
|
||||
(for [(item (in-list (reverse (pending))))] (handler item))
|
||||
(pending '()))))
|
||||
(values push drain))
|
|
@ -5,6 +5,7 @@
|
|||
(require "wire-protocol.rkt")
|
||||
(require "internal-protocol.rkt")
|
||||
(require "protocol.rkt")
|
||||
(require "turn.rkt")
|
||||
(require imperative-syndicate/term)
|
||||
|
||||
(define-logger syndicate/distributed)
|
||||
|
@ -14,6 +15,8 @@
|
|||
(during (observe (from-server $a _)) (assert (server-connection a)))
|
||||
(during (observe (server-connected $a)) (assert (server-connection a))))
|
||||
|
||||
(struct sub (spec [captures #:mutable]) #:transparent)
|
||||
|
||||
(define (generic-client-session-facet address scope w)
|
||||
(on-start (log-syndicate/distributed-info "Connected to ~v" address))
|
||||
(on-stop (log-syndicate/distributed-info "Disconnected from ~v" address))
|
||||
|
@ -25,7 +28,7 @@
|
|||
(log-syndicate/distributed-debug "C OUT ~v ~v" address p)
|
||||
(w* p)))))
|
||||
|
||||
(on-start (w (Connect scope)))
|
||||
(define turn (turn-recorder (lambda (items) (w (Turn items)))))
|
||||
|
||||
(define next-ep
|
||||
(let ((counter 0))
|
||||
|
@ -33,13 +36,38 @@
|
|||
(begin0 counter
|
||||
(set! counter (+ counter 1))))))
|
||||
|
||||
(during (to-server address $a)
|
||||
(define ep (next-ep))
|
||||
(on-start (w (Assert ep a)))
|
||||
(on-stop (w (Clear ep))))
|
||||
(define pubs (hash))
|
||||
(define subs (hash))
|
||||
(define matches (hash))
|
||||
|
||||
(on-start (w (Connect scope)))
|
||||
(on-stop (for* [(s (in-hash-values matches)) (a (in-hash-values (sub-captures s)))] (retract! a)))
|
||||
|
||||
(define (instantiate s vs)
|
||||
(instantiate-term->value (from-server address (sub-spec s)) vs))
|
||||
|
||||
(on (asserted (to-server address $a))
|
||||
(define ep (next-ep))
|
||||
(extend-turn! turn (Assert ep a))
|
||||
(set! pubs (hash-set pubs a ep)))
|
||||
|
||||
(on (retracted (to-server address $a))
|
||||
(define ep (hash-ref pubs a))
|
||||
(extend-turn! turn (Clear ep))
|
||||
(set! pubs (hash-remove pubs a)))
|
||||
|
||||
(on (message (to-server address $a))
|
||||
(w (Message a)))
|
||||
(extend-turn! turn (Message a)))
|
||||
|
||||
(on (asserted (observe (from-server address $spec)))
|
||||
(define ep (next-ep))
|
||||
(extend-turn! turn (Assert ep (observe spec)))
|
||||
(set! subs (hash-set subs spec ep))
|
||||
(set! matches (hash-set matches ep (sub spec (hash)))))
|
||||
|
||||
(on (retracted (observe (from-server address $spec)))
|
||||
(extend-turn! turn (Clear (hash-ref subs spec)))
|
||||
(set! subs (hash-remove subs spec)))
|
||||
|
||||
(on (message (server-packet address (Ping)))
|
||||
(w (Pong)))
|
||||
|
@ -48,12 +76,20 @@
|
|||
(log-syndicate/distributed-error "Error from ~a: ~v" address detail)
|
||||
(stop-current-facet))
|
||||
|
||||
(during (observe ($ pat (from-server address $spec)))
|
||||
(define ep (next-ep))
|
||||
(on-start (w (Assert ep (observe spec))))
|
||||
(on-stop (w (Clear ep)))
|
||||
(on (message (server-packet address (Add ep $vs)))
|
||||
(react (assert (instantiate-term->value pat vs))
|
||||
(stop-when (message (server-packet address (Del ep vs))))))
|
||||
(on (message (server-packet address (Msg ep $vs)))
|
||||
(send! (instantiate-term->value pat vs)))))
|
||||
(on (message (server-packet address (Turn $items)))
|
||||
(for [(item (in-list items))]
|
||||
(match item
|
||||
[(Add ep vs) (let* ((s (hash-ref matches ep))
|
||||
(a (instantiate s vs)))
|
||||
(set-sub-captures! s (hash-set (sub-captures s) vs a))
|
||||
(assert! a))]
|
||||
[(Del ep vs) (let* ((s (hash-ref matches ep))
|
||||
(a (hash-ref (sub-captures s) vs)))
|
||||
(retract! a)
|
||||
(set-sub-captures! s (hash-remove (sub-captures s) vs)))]
|
||||
[(Msg ep vs) (let* ((s (hash-ref matches ep)))
|
||||
(send! (instantiate s vs)))]
|
||||
[(End ep) (let* ((s (hash-ref matches ep #f)))
|
||||
(when s
|
||||
(for [(a (in-hash-values (sub-captures s)))] (retract! a))
|
||||
(set! matches (hash-remove matches ep))))]))))
|
||||
|
|
|
@ -4,6 +4,8 @@
|
|||
(require "wire-protocol.rkt")
|
||||
(require "internal-protocol.rkt")
|
||||
(require "protocol.rkt")
|
||||
(require "buffer.rkt")
|
||||
(require "turn.rkt")
|
||||
|
||||
(require imperative-syndicate/term)
|
||||
(require imperative-syndicate/reassert)
|
||||
|
@ -100,33 +102,27 @@
|
|||
;; We have to buffer in both directions, because at startup there's latency
|
||||
;; between asserting a federated-link record and it being ready to receive
|
||||
;; message-poa->server records.
|
||||
(field [pending-in '()]
|
||||
[pending-out '()])
|
||||
(define-values (push-in drain-in) (make-buffer))
|
||||
(define-values (push-out drain-out) (make-buffer))
|
||||
|
||||
(on (message (from-server peer-addr (message-server->poa session-id $p)))
|
||||
(pending-in (cons p (pending-in))))
|
||||
|
||||
(push-in p))
|
||||
(on (message (server-envelope management-scope (message-server->poa session-id $p)))
|
||||
(pending-out (cons p (pending-out))))
|
||||
(push-out p))
|
||||
|
||||
(define (wrap p) (message-poa->server session-id p))
|
||||
(during (server-envelope management-scope (observe (message-poa->server session-id _)))
|
||||
(during (from-server peer-addr (observe (message-poa->server session-id _)))
|
||||
(begin/dataflow
|
||||
(when (pair? (pending-in))
|
||||
(for [(p (reverse (pending-in)))]
|
||||
(send! (server-proposal management-scope
|
||||
(message-poa->server session-id p))))
|
||||
(pending-in '())))
|
||||
(begin/dataflow
|
||||
(when (pair? (pending-out))
|
||||
(for [(p (reverse (pending-out)))]
|
||||
(send! (to-server peer-addr (message-poa->server session-id p))))
|
||||
(pending-out '())))))))))
|
||||
(drain-in (lambda (p) (send! (server-proposal management-scope (wrap p)))))
|
||||
(drain-out (lambda (p) (send! (to-server peer-addr (wrap p)))))))))))
|
||||
|
||||
;;---------------------------------------------------------------------------
|
||||
;; Local links.
|
||||
|
||||
(spawn #:name 'federated-local-link-factory
|
||||
|
||||
(struct sub (spec [captures #:mutable]) #:transparent)
|
||||
|
||||
(during (federation-management-scope $management-scope)
|
||||
(during (server-envelope management-scope (federated-link _ $scope))
|
||||
(during/spawn (server-active scope)
|
||||
|
@ -138,93 +134,121 @@
|
|||
(define (!! m)
|
||||
(send! (server-proposal management-scope (message-poa->server session-id m))))
|
||||
|
||||
(on (message (server-envelope management-scope
|
||||
(message-server->poa session-id
|
||||
(Assert $subid (observe $spec)))))
|
||||
(react
|
||||
(define ((! ctor) cs) (!! (ctor subid cs)))
|
||||
(add-observer-endpoint! (lambda () (server-proposal scope spec))
|
||||
#:on-add (! Add)
|
||||
#:on-remove (! Del)
|
||||
#:on-message (! Msg))
|
||||
(assert (server-envelope scope (observe spec)))
|
||||
(stop-when (message
|
||||
(server-envelope management-scope
|
||||
(message-server->poa session-id (Clear subid)))))))
|
||||
(define turn (turn-recorder (lambda (items) (!! (Turn items)))))
|
||||
|
||||
(during (observe ($ pat (server-envelope scope $spec)))
|
||||
(define ep (gensym 'ep))
|
||||
(on-start (!! (Assert ep (observe spec))))
|
||||
(on-stop (!! (Clear ep)))
|
||||
(on (message (server-envelope management-scope
|
||||
(message-server->poa session-id (Add ep $captures))))
|
||||
(react (assert (instantiate-term->value pat captures))
|
||||
(stop-when (message
|
||||
(server-envelope management-scope
|
||||
(message-server->poa session-id
|
||||
(Del ep captures)))))))
|
||||
(on (message (server-envelope management-scope
|
||||
(message-server->poa session-id (Msg ep $captures))))
|
||||
(send! (instantiate-term->value pat captures))))))))
|
||||
(define remote-endpoints (hash))
|
||||
(define local-endpoints (hash))
|
||||
(define local-matches (hash))
|
||||
|
||||
(define (instantiate s vs)
|
||||
(instantiate-term->value (server-envelope scope (sub-spec s)) vs))
|
||||
|
||||
(on (asserted (observe (server-envelope scope $spec)))
|
||||
(define ep (gensym 'ep))
|
||||
(extend-turn! turn (Assert ep (observe spec)))
|
||||
(set! local-endpoints (hash-set local-endpoints spec ep))
|
||||
(set! local-matches (hash-set local-matches ep (sub spec (hash)))))
|
||||
|
||||
(on (retracted (observe (server-envelope scope $spec)))
|
||||
(define ep (hash-ref local-endpoints spec))
|
||||
(extend-turn! turn (Clear ep))
|
||||
(set! local-endpoints (hash-remove local-endpoints spec)))
|
||||
|
||||
(on (message (server-envelope management-scope
|
||||
(message-server->poa session-id (Turn $items))))
|
||||
(for [(item (in-list items))]
|
||||
(match item
|
||||
[(Assert subid (observe spec))
|
||||
(when (hash-has-key? remote-endpoints subid)
|
||||
(error 'local-link "Duplicate endpoint" subid))
|
||||
(react
|
||||
(define ep-facet (current-facet))
|
||||
(set! remote-endpoints (hash-set remote-endpoints subid ep-facet))
|
||||
(on-stop (set! remote-endpoints (hash-remove remote-endpoints subid)))
|
||||
(assert (server-envelope scope (observe spec)))
|
||||
(define ((! ctor) cs) (extend-turn! turn (ctor subid cs)))
|
||||
(add-observer-endpoint! (lambda () (server-proposal scope spec))
|
||||
#:on-add (! Add)
|
||||
#:on-remove (! Del)
|
||||
#:on-message (! Msg)))]
|
||||
[(Clear subid)
|
||||
(stop-facet (hash-ref remote-endpoints subid)
|
||||
(extend-turn! turn (End subid)))]
|
||||
[(Add ep vs) (let* ((s (hash-ref local-matches ep))
|
||||
(a (instantiate s vs)))
|
||||
(set-sub-captures! s (hash-set (sub-captures s) vs a))
|
||||
(assert! a))]
|
||||
[(Del ep vs) (let* ((s (hash-ref local-matches ep))
|
||||
(a (hash-ref (sub-captures s) vs)))
|
||||
(retract! a)
|
||||
(set-sub-captures! s (hash-remove (sub-captures s) vs)))]
|
||||
[(Msg ep vs) (let* ((s (hash-ref local-matches ep)))
|
||||
(send! (instantiate s vs)))]
|
||||
[(End ep) (let* ((s (hash-ref local-matches ep #f)))
|
||||
(when s
|
||||
(for [(a (in-hash-values (sub-captures s)))] (retract! a))
|
||||
(set! local-matches (hash-remove local-matches ep))))])))))))
|
||||
|
||||
;;---------------------------------------------------------------------------
|
||||
;; Federated scopes.
|
||||
|
||||
;; Internal state
|
||||
(struct subscription (id ;; LocalID
|
||||
spec ;; Assertion
|
||||
holders ;; (Hash LinkID SubscriptionID)
|
||||
matches ;; (Hash (Listof Assertion) (Set LinkID))
|
||||
)
|
||||
#:transparent)
|
||||
|
||||
(spawn #:name 'federated-scope-factory
|
||||
|
||||
(struct subscription (id ;; LocalID
|
||||
spec ;; Assertion
|
||||
holders ;; (Hash LinkID SubscriptionID)
|
||||
matches ;; (Hash (Listof Assertion) (Set LinkID))
|
||||
)
|
||||
#:transparent)
|
||||
|
||||
(during (federation-management-scope $management-scope)
|
||||
|
||||
(define (send-to-link! linkid m)
|
||||
(send! (server-proposal management-scope (message-server->poa linkid m))))
|
||||
|
||||
(during/spawn (server-envelope management-scope (federated-link _ $scope))
|
||||
#:name (list 'federated-scope management-scope scope)
|
||||
|
||||
;; Generates a fresh local ID naming a subscription propagated to our peers.
|
||||
(define make-localid (let ((next 0)) (lambda () (begin0 next (set! next (+ next 1))))))
|
||||
|
||||
(field [peers (set)] ;; (Set LinkID)
|
||||
(field [turns (hash)] ;; (Map LinkID Turn)
|
||||
[specs (hash)] ;; (Hash Spec LocalID)
|
||||
[subs (hasheq)] ;; (Hash LocalID Subscription)
|
||||
)
|
||||
|
||||
(when (log-level? syndicate/federation-logger 'debug)
|
||||
(begin/dataflow (log-syndicate/federation-debug "~a peers:" scope)
|
||||
(for [(peer (in-set (peers)))]
|
||||
(log-syndicate/federation-debug " link ~v" peer))
|
||||
(log-syndicate/federation-debug "-"))
|
||||
(begin/dataflow (log-syndicate/federation-debug "~a specs:" scope)
|
||||
(for [((spec local) (in-hash (specs)))]
|
||||
(log-syndicate/federation-debug " spec ~v -> local ~a" spec local))
|
||||
(log-syndicate/federation-debug "-"))
|
||||
(begin/dataflow (log-syndicate/federation-debug "~a subs:" scope)
|
||||
(for [((local sub) (in-hash (subs)))]
|
||||
(match-define (subscription _id spec holders matches) sub)
|
||||
(log-syndicate/federation-debug " local ~a -> sub spec ~v" local spec)
|
||||
(when (not (hash-empty? holders))
|
||||
(log-syndicate/federation-debug " holders:")
|
||||
(for [((link ep) (in-hash holders))]
|
||||
(log-syndicate/federation-debug " link ~a -> ep ~a" link ep)))
|
||||
(when (not (hash-empty? matches))
|
||||
(log-syndicate/federation-debug " matches:")
|
||||
(for [((captures holders) (in-hash matches))]
|
||||
(log-syndicate/federation-debug " captures ~v held by ~a"
|
||||
captures holders))))
|
||||
(log-syndicate/federation-debug "-")))
|
||||
(define (send-to-link! peer p)
|
||||
(extend-turn! (hash-ref (turns) peer) p))
|
||||
|
||||
(define (call-with-sub localid linkid f)
|
||||
(when (log-level? syndicate/federation-logger 'debug)
|
||||
(begin/dataflow
|
||||
(log-syndicate/federation-debug "~a turns:" scope)
|
||||
(for [((peer turn) (in-hash (turns)))]
|
||||
(log-syndicate/federation-debug " link ~v -> ~v" peer (turn 'debug)))
|
||||
(log-syndicate/federation-debug "-"))
|
||||
(begin/dataflow
|
||||
(log-syndicate/federation-debug "~a specs:" scope)
|
||||
(for [((spec local) (in-hash (specs)))]
|
||||
(log-syndicate/federation-debug " spec ~v -> local ~a" spec local))
|
||||
(log-syndicate/federation-debug "-"))
|
||||
(begin/dataflow
|
||||
(log-syndicate/federation-debug "~a subs:" scope)
|
||||
(for [((local sub) (in-hash (subs)))]
|
||||
(match-define (subscription _id spec holders matches) sub)
|
||||
(log-syndicate/federation-debug " local ~a -> sub spec ~v" local spec)
|
||||
(when (not (hash-empty? holders))
|
||||
(log-syndicate/federation-debug " holders:")
|
||||
(for [((link ep) (in-hash holders))]
|
||||
(log-syndicate/federation-debug " link ~a -> ep ~a" link ep)))
|
||||
(when (not (hash-empty? matches))
|
||||
(log-syndicate/federation-debug " matches:")
|
||||
(for [((captures holders) (in-hash matches))]
|
||||
(log-syndicate/federation-debug " captures ~v held by ~a"
|
||||
captures holders))))
|
||||
(log-syndicate/federation-debug "-")))
|
||||
|
||||
(define (call-with-sub localid linkid f #:not-found-ok? [not-found-ok? #t])
|
||||
(match (hash-ref (subs) localid #f)
|
||||
[#f (log-syndicate/federation-error
|
||||
"Mention of nonexistent local ID ~v from link ~v. Ignoring."
|
||||
localid
|
||||
linkid)]
|
||||
[#f (when (not not-found-ok?)
|
||||
(log-syndicate/federation-error
|
||||
"Mention of nonexistent local ID ~v from link ~v. Ignoring."
|
||||
localid linkid))]
|
||||
[sub (f sub)]))
|
||||
|
||||
(define (store-sub! sub)
|
||||
|
@ -236,6 +260,7 @@
|
|||
|
||||
(define (unsubscribe! localid linkid)
|
||||
(call-with-sub
|
||||
#:not-found-ok? #f
|
||||
localid linkid
|
||||
(lambda (sub)
|
||||
(define new-holders (hash-remove (subscription-holders sub) linkid))
|
||||
|
@ -248,9 +273,9 @@
|
|||
;; - if =0, we retract the subscription from all peers except linkid
|
||||
|
||||
(match (hash-count new-holders)
|
||||
[0 (for [(peer (in-set (peers)))]
|
||||
[0 (for [((peer turn) (in-hash (turns)))]
|
||||
(when (not (equal? peer linkid))
|
||||
(send-to-link! peer (Clear localid))))]
|
||||
(extend-turn! turn (Clear localid))))]
|
||||
[1 (for [(peer (in-hash-keys new-holders))] ;; there will only be one, ≠ linkid
|
||||
(send-to-link! peer (Clear localid)))]
|
||||
[_ (void)]))))
|
||||
|
@ -278,15 +303,35 @@
|
|||
|
||||
(during (server-envelope management-scope (federated-link $linkid scope))
|
||||
|
||||
(on-start (log-syndicate/federation-debug "+PEER ~a link ~a" scope linkid)
|
||||
(peers (set-add (peers) linkid)))
|
||||
(on-stop (log-syndicate/federation-debug "-PEER ~a link ~a" scope linkid)
|
||||
(peers (set-remove (peers) linkid)))
|
||||
(define turn (turn-recorder
|
||||
(lambda (items)
|
||||
(send! (server-proposal management-scope
|
||||
(message-server->poa linkid (Turn items)))))))
|
||||
|
||||
(field [link-subs (hash)] ;; (Hash SubscriptionID LocalID)
|
||||
[link-matches (hash)] ;; (Hash LocalID (Set (Listof Assertion)))
|
||||
)
|
||||
|
||||
(define (err! detail)
|
||||
(send! (server-proposal management-scope (message-server->poa linkid (Err detail))))
|
||||
(reset-turn! turn)
|
||||
(stop-current-facet))
|
||||
|
||||
(on-start (log-syndicate/federation-debug "+PEER ~a link ~a" scope linkid)
|
||||
(turns (hash-set (turns) linkid turn))
|
||||
(for ([(spec localid) (in-hash (specs))])
|
||||
(extend-turn! turn (Assert localid (observe spec))))
|
||||
(commit-turn! turn))
|
||||
|
||||
(on-stop (log-syndicate/federation-debug "-PEER ~a link ~a" scope linkid)
|
||||
(turns (hash-remove (turns) linkid))
|
||||
(for [((localid matches) (in-hash (link-matches)))]
|
||||
(for [(captures (in-set matches))]
|
||||
(remove-match! localid captures linkid)))
|
||||
(for ([localid (in-hash-values (link-subs))])
|
||||
(unsubscribe! localid linkid))
|
||||
(commit-turn! turn))
|
||||
|
||||
(when (log-level? syndicate/federation-logger 'debug)
|
||||
(begin/dataflow (log-syndicate/federation-debug "~a ~a link-subs:" scope linkid)
|
||||
(for [((sub local) (in-hash (link-subs)))]
|
||||
|
@ -299,136 +344,111 @@
|
|||
local captures)))
|
||||
(log-syndicate/federation-debug "-")))
|
||||
|
||||
(define (err! detail)
|
||||
(send-to-link! linkid (Err detail))
|
||||
(stop-current-facet))
|
||||
|
||||
(on-start (for ([(spec localid) (in-hash (specs))])
|
||||
(send-to-link! linkid (Assert localid (observe spec)))))
|
||||
|
||||
(on-stop (for [((localid matches) (in-hash (link-matches)))]
|
||||
(for [(captures (in-set matches))]
|
||||
(remove-match! localid captures linkid)))
|
||||
(for ([localid (in-hash-values (link-subs))])
|
||||
(unsubscribe! localid linkid)))
|
||||
|
||||
(on (message (server-envelope management-scope
|
||||
(message-poa->server linkid
|
||||
(Assert $subid (observe $spec)))))
|
||||
(define known? (hash-has-key? (specs) spec))
|
||||
(define localid (if known? (hash-ref (specs) spec) (make-localid)))
|
||||
(define sub
|
||||
(hash-ref (subs) localid (lambda () (subscription localid spec (hash) (hash)))))
|
||||
(define holders (subscription-holders sub))
|
||||
(cond
|
||||
[(hash-has-key? holders linkid)
|
||||
(log-syndicate/federation-error
|
||||
"Duplicate subscription ~a, ID ~a, from link ~a. Ignoring."
|
||||
spec
|
||||
subid
|
||||
linkid)]
|
||||
[else
|
||||
(link-subs (hash-set (link-subs) subid localid))
|
||||
(when (not known?) (specs (hash-set (specs) spec localid)))
|
||||
(subs (hash-set (subs) localid (struct-copy subscription sub
|
||||
[holders (hash-set holders linkid subid)])))
|
||||
(message-poa->server linkid (Turn $items))))
|
||||
(for [(item (in-list items))]
|
||||
(match item
|
||||
[(Assert subid (observe spec))
|
||||
(define known? (hash-has-key? (specs) spec))
|
||||
(define localid (if known? (hash-ref (specs) spec) (make-localid)))
|
||||
(define sub (hash-ref (subs) localid (lambda () (subscription localid
|
||||
spec
|
||||
(hash)
|
||||
(hash)))))
|
||||
(define holders (subscription-holders sub))
|
||||
(cond
|
||||
[(hash-has-key? holders linkid)
|
||||
(log-syndicate/federation-error
|
||||
"Duplicate subscription ~a, ID ~a, from link ~a."
|
||||
spec subid linkid)
|
||||
(err! 'duplicate-subscription)]
|
||||
[else
|
||||
(link-subs (hash-set (link-subs) subid localid))
|
||||
(when (not known?) (specs (hash-set (specs) spec localid)))
|
||||
(subs (hash-set (subs)
|
||||
localid
|
||||
(struct-copy subscription sub
|
||||
[holders (hash-set holders linkid subid)])))
|
||||
|
||||
;; If not known, then relay the subscription to all peers except `linkid`.
|
||||
;;
|
||||
;; If known, then one or more links that aren't this one have previously
|
||||
;; subscribed with this spec. If exactly one other link has previously
|
||||
;; subscribed, the only subscription that needs sent is to that peer;
|
||||
;; otherwise, no subscriptions at all need sent, since everyone has already
|
||||
;; been informed of this subscription.
|
||||
;; If not known, then relay the subscription to all peers except `linkid`.
|
||||
;;
|
||||
;; If known, then one or more links that aren't this one have previously
|
||||
;; subscribed with this spec. If exactly one other link has previously
|
||||
;; subscribed, the only subscription that needs sent is to that peer;
|
||||
;; otherwise, no subscriptions at all need sent, since everyone has already
|
||||
;; been informed of this subscription.
|
||||
|
||||
(cond
|
||||
[(not known?)
|
||||
(for [(peer (in-set (peers)))]
|
||||
(when (not (equal? peer linkid))
|
||||
(send-to-link! peer (Assert localid (observe spec)))))]
|
||||
[(= (hash-count holders) 1)
|
||||
(for [(peer (in-hash-keys holders))] ;; there will only be one, ≠ linkid
|
||||
(send-to-link! peer (Assert localid (observe spec))))]
|
||||
[else
|
||||
(void)])
|
||||
(cond
|
||||
[(not known?)
|
||||
(for [((peer peer-turn) (in-hash (turns)))]
|
||||
(when (not (equal? peer linkid))
|
||||
(extend-turn! peer-turn (Assert localid (observe spec)))))]
|
||||
[(= (hash-count holders) 1)
|
||||
(for [(peer (in-hash-keys holders))] ;; there will only be one, ≠ linkid
|
||||
(send-to-link! peer (Assert localid (observe spec))))]
|
||||
[else
|
||||
(void)])
|
||||
|
||||
;; Once subscription relaying has taken place, send up matches to the active
|
||||
;; link.
|
||||
(for [((captures match-holders) (in-hash (subscription-matches sub)))]
|
||||
;; Compute the number of times someone OTHER THAN this link has asserted
|
||||
;; a match to this spec. If it's nonzero, we need to hear about it:
|
||||
(when (not (set-empty? (set-remove match-holders linkid)))
|
||||
(send-to-link! linkid (Add subid captures))))
|
||||
;; Once subscription relaying has taken place, send up matches to the active
|
||||
;; link.
|
||||
(for [((captures match-holders) (in-hash (subscription-matches sub)))]
|
||||
;; Compute the number of times someone OTHER THAN this link has asserted
|
||||
;; a match to this spec. If it's nonzero, we need to hear about it:
|
||||
(when (not (set-empty? (set-remove match-holders linkid)))
|
||||
(extend-turn! turn (Add subid captures))))
|
||||
|
||||
]))
|
||||
|
||||
(on (message (server-envelope management-scope
|
||||
(message-poa->server linkid (Clear $subid))))
|
||||
(match (hash-ref (link-subs) subid #f)
|
||||
[#f (log-syndicate/federation-error
|
||||
"Mention of nonexistent subscription ID ~v from link ~v. Ignoring."
|
||||
subid
|
||||
linkid)]
|
||||
[localid
|
||||
(link-subs (hash-remove (link-subs) subid))
|
||||
(unsubscribe! localid linkid)]))
|
||||
|
||||
;; OPTIMISATION. Logically, the remote peer receiving a `Clear` should retract any
|
||||
;; matching `Add`s that it had previously sent. However we don't want to incur the
|
||||
;; cost of the calculation and the network traffic. Happily, we have all the
|
||||
;; information we need locally! So instead we make use of the multicast (!)
|
||||
;; facility Syndicate offers and snoop on outbound messages.
|
||||
(on (message (server-envelope management-scope
|
||||
(message-server->poa linkid (Clear $localid))))
|
||||
(for [(captures (in-set (hash-ref (link-matches) localid set)))]
|
||||
(log-syndicate/federation-debug "Retracting on clear ~v ~v ~v"
|
||||
linkid localid captures)
|
||||
(remove-match! localid captures linkid))
|
||||
(link-matches (hash-remove (link-matches) localid)))
|
||||
|
||||
(on (message (server-envelope management-scope
|
||||
(message-poa->server linkid (Add $localid $captures))))
|
||||
(define matches (hash-ref (link-matches) localid set))
|
||||
(if (set-member? matches captures)
|
||||
(err! 'duplicate-capture)
|
||||
(begin
|
||||
(link-matches (hash-set (link-matches) localid (set-add matches captures)))
|
||||
(call-with-sub
|
||||
localid linkid
|
||||
(lambda (sub)
|
||||
(define old-matches (subscription-matches sub))
|
||||
(define old-match-holders (hash-ref old-matches captures set))
|
||||
(define new-match-holders (set-add old-match-holders linkid))
|
||||
(define new-matches (hash-set old-matches captures new-match-holders))
|
||||
(store-sub! (struct-copy subscription sub [matches new-matches]))
|
||||
(match (set-count old-match-holders)
|
||||
[0 (for [((peer peer-subid) (in-hash (subscription-holders sub)))]
|
||||
(when (not (equal? peer linkid))
|
||||
(send-to-link! peer (Add peer-subid captures))))]
|
||||
[1 (for [(peer (in-set old-match-holders))] ;; only one, ≠ linkid
|
||||
(define maybe-peer-subid (hash-ref (subscription-holders sub) peer #f))
|
||||
(when maybe-peer-subid ;; the other holder may not itself subscribe!
|
||||
(send-to-link! peer (Add maybe-peer-subid captures))))]
|
||||
[_ (void)]))))))
|
||||
|
||||
(on (message (server-envelope management-scope
|
||||
(message-poa->server linkid (Del $localid $captures))))
|
||||
(define matches (hash-ref (link-matches) localid set))
|
||||
(if (not (set-member? matches captures))
|
||||
(err! 'nonexistent-capture)
|
||||
(let ((new-matches (set-remove matches captures)))
|
||||
(link-matches (if (set-empty? new-matches)
|
||||
(hash-remove (link-matches) localid)
|
||||
(hash-set (link-matches) localid new-matches)))
|
||||
(remove-match! localid captures linkid))))
|
||||
|
||||
(on (message (server-envelope management-scope
|
||||
(message-poa->server linkid (Msg $localid $captures))))
|
||||
(call-with-sub
|
||||
localid linkid
|
||||
(lambda (sub)
|
||||
(for ([(peer peer-subid) (in-hash (subscription-holders sub))])
|
||||
(when (not (equal? peer linkid))
|
||||
(send-to-link! peer (Msg peer-subid captures)))))))
|
||||
|
||||
))))
|
||||
])]
|
||||
[(Clear subid)
|
||||
(match (hash-ref (link-subs) subid #f)
|
||||
[#f (log-syndicate/federation-error
|
||||
"Mention of nonexistent subscription ID ~v from link ~v."
|
||||
subid linkid)
|
||||
(err! 'nonexistent-subscription)]
|
||||
[localid
|
||||
(link-subs (hash-remove (link-subs) subid))
|
||||
(unsubscribe! localid linkid)])]
|
||||
[(End localid)
|
||||
(for [(captures (in-set (hash-ref (link-matches) localid set)))]
|
||||
(remove-match! localid captures linkid))
|
||||
(link-matches (hash-remove (link-matches) localid))]
|
||||
[(Add localid captures)
|
||||
(define matches (hash-ref (link-matches) localid set))
|
||||
(cond
|
||||
[(set-member? matches captures)
|
||||
(err! 'duplicate-capture)]
|
||||
[else
|
||||
(link-matches (hash-set (link-matches) localid (set-add matches captures)))
|
||||
(call-with-sub
|
||||
localid linkid
|
||||
(lambda (sub)
|
||||
(define old-matches (subscription-matches sub))
|
||||
(define old-match-holders (hash-ref old-matches captures set))
|
||||
(define new-match-holders (set-add old-match-holders linkid))
|
||||
(define new-matches (hash-set old-matches captures new-match-holders))
|
||||
(store-sub! (struct-copy subscription sub [matches new-matches]))
|
||||
(match (set-count old-match-holders)
|
||||
[0 (for [((peer peer-subid) (in-hash (subscription-holders sub)))]
|
||||
(when (not (equal? peer linkid))
|
||||
(send-to-link! peer (Add peer-subid captures))))]
|
||||
[1 (for [(peer (in-set old-match-holders))] ;; only one, ≠ linkid
|
||||
(define peer-subid (hash-ref (subscription-holders sub) peer #f))
|
||||
(when peer-subid ;; the other holder may not itself subscribe!
|
||||
(send-to-link! peer (Add peer-subid captures))))]
|
||||
[_ (void)])))])]
|
||||
[(Del localid captures)
|
||||
(define matches (hash-ref (link-matches) localid set))
|
||||
(if (not (set-member? matches captures))
|
||||
(err! 'nonexistent-capture)
|
||||
(let ((new-matches (set-remove matches captures)))
|
||||
(link-matches (if (set-empty? new-matches)
|
||||
(hash-remove (link-matches) localid)
|
||||
(hash-set (link-matches) localid new-matches)))
|
||||
(remove-match! localid captures linkid)))]
|
||||
[(Msg localid captures)
|
||||
(call-with-sub
|
||||
localid linkid
|
||||
(lambda (sub)
|
||||
(for ([(peer peer-subid) (in-hash (subscription-holders sub))])
|
||||
(when (not (equal? peer linkid))
|
||||
(send-to-link! peer (Msg peer-subid captures))))))]
|
||||
)))))))
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
|
||||
(require "wire-protocol.rkt")
|
||||
(require "internal-protocol.rkt")
|
||||
(require racket/set)
|
||||
(require "turn.rkt")
|
||||
|
||||
(spawn #:name 'server-factory
|
||||
|
||||
|
@ -11,59 +11,67 @@
|
|||
;; decided is (locally) suggested, it is true that everything
|
||||
;; suggested is decided (in this implementation at least),
|
||||
;; and the following clauses reflect this:
|
||||
(during (server-proposal $scope $assertion)
|
||||
(assert (server-envelope scope assertion)))
|
||||
(on (asserted (server-proposal $scope $assertion))
|
||||
(assert! (server-envelope scope assertion)))
|
||||
(on (retracted (server-proposal $scope $assertion))
|
||||
(retract! (server-envelope scope assertion)))
|
||||
(on (message (server-proposal $scope $body))
|
||||
(send! (server-envelope scope body)))
|
||||
(during (observe (server-envelope $scope $spec))
|
||||
(assert (server-proposal scope (observe spec))))
|
||||
(on (asserted (observe (server-envelope $scope $spec)))
|
||||
(assert! (server-proposal scope (observe spec))))
|
||||
(on (retracted (observe (server-envelope $scope $spec)))
|
||||
(retract! (server-proposal scope (observe spec))))
|
||||
|
||||
(during/spawn (server-poa $id)
|
||||
(define root-facet (current-facet))
|
||||
(on-start
|
||||
(match (let-event [(message (message-poa->server id $p))] p)
|
||||
[(Connect scope) (react (connected id scope))]
|
||||
[_ (send-error! id 'connection-not-setup)]))))
|
||||
[(Connect scope) (react (connected id scope root-facet))]
|
||||
[_ (send! (message-server->poa id (Err 'connection-not-setup)))]))))
|
||||
|
||||
(define (send-error! id detail)
|
||||
(send! (message-server->poa id (Err detail))))
|
||||
(define (connected id scope root-facet)
|
||||
(define endpoints (hash))
|
||||
|
||||
(define turn (turn-recorder (lambda (items) (send! (message-server->poa id (Turn items))))))
|
||||
|
||||
(define (connected id scope)
|
||||
(define endpoints (set))
|
||||
(assert (server-active scope))
|
||||
|
||||
(define (send-error! detail)
|
||||
(send! (message-server->poa id (Err detail)))
|
||||
(reset-turn! turn)
|
||||
(stop-facet root-facet))
|
||||
|
||||
(on (message (message-poa->server id $p))
|
||||
(match p
|
||||
[(Assert ep a) #:when (not (set-member? endpoints ep))
|
||||
(set! endpoints (set-add endpoints ep))
|
||||
(react
|
||||
(on-stop (set! endpoints (set-remove endpoints ep)))
|
||||
[(Turn items)
|
||||
(for [(item (in-list items))]
|
||||
(match item
|
||||
[(Assert ep a)
|
||||
(if (hash-has-key? endpoints ep)
|
||||
(send-error! 'duplicate-endpoint)
|
||||
(react
|
||||
(define ep-facet (current-facet))
|
||||
(set! endpoints (hash-set endpoints ep ep-facet))
|
||||
(on-stop (set! endpoints (hash-remove endpoints ep)))
|
||||
|
||||
(field [assertion a])
|
||||
(assert (server-proposal scope a))
|
||||
|
||||
(assert (server-proposal scope (assertion)))
|
||||
|
||||
(let ((! (lambda (ctor) (lambda (cs) (send! (message-server->poa id (ctor ep cs)))))))
|
||||
(add-observer-endpoint! (lambda ()
|
||||
(let ((a (assertion)))
|
||||
(when (observe? a)
|
||||
(server-envelope scope (observe-specification a)))))
|
||||
#:on-add (! Add)
|
||||
#:on-remove (! Del)
|
||||
#:on-message (! Msg)))
|
||||
|
||||
(on (message (message-poa->server id (Assert ep $new-a)))
|
||||
(assertion new-a))
|
||||
|
||||
(stop-when (message (message-poa->server id (Clear ep)))))]
|
||||
[(Clear ep) #:when (set-member? endpoints ep)
|
||||
(void)] ;; handled by stop-when clause in facet established by Assert handler
|
||||
[(Message body)
|
||||
(send! (server-proposal scope body))]
|
||||
[other
|
||||
(unhandled-message id other)])))
|
||||
|
||||
(define (unhandled-message id p)
|
||||
(match p
|
||||
[(Connect _) (send-error! id 'duplicate-connection-setup)]
|
||||
[(Ping) (send! (message-server->poa id (Pong)))]
|
||||
[(Pong) (void)]
|
||||
[_ (send-error! id 'invalid-message)]))
|
||||
(when (observe? a)
|
||||
(define ((! ctor) cs) (extend-turn! turn (ctor ep cs)))
|
||||
(add-observer-endpoint!
|
||||
(lambda () (server-envelope scope (observe-specification a)))
|
||||
#:on-add (! Add)
|
||||
#:on-remove (! Del)
|
||||
#:on-message (! Msg)))))]
|
||||
[(Clear ep)
|
||||
(match (hash-ref endpoints ep #f)
|
||||
[#f (send-error! 'unknown-endpoint)]
|
||||
[ep-facet (stop-facet ep-facet (extend-turn! turn (End ep)))])]
|
||||
[(Message body)
|
||||
(send! (server-proposal scope body))]))]
|
||||
[(Ping)
|
||||
(send! (message-server->poa id (Pong)))]
|
||||
[(Pong)
|
||||
(void)]
|
||||
[_
|
||||
(send-error! 'invalid-message)])))
|
||||
|
|
|
@ -0,0 +1,34 @@
|
|||
#lang imperative-syndicate
|
||||
|
||||
(provide turn-recorder
|
||||
extend-turn!
|
||||
commit-turn!
|
||||
reset-turn!)
|
||||
|
||||
(require (submod "../dataspace.rkt" priorities))
|
||||
|
||||
(define (extend-turn! t item) (t 'extend item))
|
||||
(define (commit-turn! t) (t 'commit))
|
||||
(define (reset-turn! t) (t 'reset))
|
||||
|
||||
(define (turn-recorder on-commit)
|
||||
(field [commit-needed #f])
|
||||
(define items '())
|
||||
(define t
|
||||
(match-lambda*
|
||||
[(list 'extend item)
|
||||
(set! items (cons item items))
|
||||
(commit-needed #t)]
|
||||
[(list 'commit)
|
||||
(when (commit-needed)
|
||||
(on-commit (reverse items))
|
||||
(reset-turn! t))]
|
||||
[(list 'reset)
|
||||
(set! items '())
|
||||
(commit-needed #f)]
|
||||
[(list 'debug)
|
||||
(reverse items)]))
|
||||
(begin/dataflow
|
||||
#:priority *idle-priority*
|
||||
(commit-turn! t))
|
||||
t)
|
|
@ -9,6 +9,9 @@
|
|||
;; Enrolment
|
||||
(message-struct Connect (scope)) ;; Client --> Server
|
||||
|
||||
;; Transactions
|
||||
(message-struct Turn (items)) ;; Bidirectional
|
||||
|
||||
;; Actions; Client --> Server (and Peer --> Peer, except for Message)
|
||||
(message-struct Assert (endpoint-name assertion))
|
||||
(message-struct Clear (endpoint-name))
|
||||
|
@ -18,6 +21,7 @@
|
|||
(message-struct Add (endpoint-name captures))
|
||||
(message-struct Del (endpoint-name captures))
|
||||
(message-struct Msg (endpoint-name captures))
|
||||
(message-struct End (endpoint-name))
|
||||
(message-struct Err (detail))
|
||||
|
||||
;; Transport-related; Bidirectional
|
||||
|
|
Loading…
Reference in New Issue