syndicate-rkt/syndicate/relay.rkt

327 lines
13 KiB
Racket

#lang racket/base
;;; SPDX-License-Identifier: LGPL-3.0-or-later
;;; SPDX-FileCopyrightText: Copyright © 2021-2024 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
(provide make-tunnel-relay
accept-bytes
accept-packet
encode/canonicalization
run-relay)
(require racket/match)
(require preserves)
(require "main.rkt")
(require (only-in "actor.rkt" current-turn))
(require "engine.rkt")
(require "rewrite.rkt")
(require "schemas/protocol.rkt")
(require (prefix-in sturdy: "schemas/sturdy.rkt"))
(define-logger syndicate/relay)
;;---------------------------------------------------------------------------
(struct inbound (local-handle imported))
(struct wire-symbol (oid ref [count #:mutable])
#:methods gen:custom-write
[(define (write-proc ws port mode)
(fprintf port "#<wire-symbol:~a/~a ~v>"
(wire-symbol-oid ws)
(wire-symbol-count ws)
(wire-symbol-ref ws)))])
(struct membrane (oid-map ref-map))
;; There are other kinds of relay. This one has exactly two participants connected to each other.
(struct tunnel-relay (facet
name
[read-buffer #:mutable]
packet-writer
inbound-assertions
outbound-assertions
exported-references
imported-references
[next-local-oid #:mutable]
[pending-turn-rev #:mutable]
)
#:transparent)
(struct relay-entity (relay oid) #:transparent)
;;---------------------------------------------------------------------------
(define *inert-ref*
(entity-ref *dead-facet*
(entity #:name '*inert-ref*)
'()))
;;---------------------------------------------------------------------------
(define (make-membrane)
(membrane (make-hash) (make-hasheq)))
(define (grab m getter key transient? f)
(let ((ws (hash-ref (getter m)
key
(lambda ()
(define ws (f))
(hash-set! (membrane-oid-map m) (wire-symbol-oid ws) ws)
(hash-set! (membrane-ref-map m) (wire-symbol-ref ws) ws)
ws))))
(when (not transient?) (set-wire-symbol-count! ws (+ (wire-symbol-count ws) 1)))
ws))
(define (drop m ws)
(set-wire-symbol-count! ws (- (wire-symbol-count ws) 1))
(when (zero? (wire-symbol-count ws))
(hash-remove! (membrane-oid-map m) (wire-symbol-oid ws))
(hash-remove! (membrane-ref-map m) (wire-symbol-ref ws))))
;;---------------------------------------------------------------------------
(define (make-tunnel-relay name packet-writer)
(tunnel-relay (turn-active-facet (current-turn))
name
#""
packet-writer
(make-hash)
(make-hash)
(make-membrane)
(make-membrane)
0
'()))
(define accept-bytes
(lambda (tr bs)
(turn! (tunnel-relay-facet tr)
(lambda ()
(define buffer (if (positive? (bytes-length (tunnel-relay-read-buffer tr)))
(bytes-append (tunnel-relay-read-buffer tr) bs)
bs))
(set-tunnel-relay-read-buffer! tr buffer)
(define p (open-input-bytes buffer))
(let read-more ()
(define start-pos (file-position p))
(match (read-preserve/binary p
#:read-syntax? #f
#:decode-embedded sturdy:parse-WireRef!
#:on-short (lambda () eof))
[(? eof-object?)
(when (positive? start-pos)
(set-tunnel-relay-read-buffer! tr (subbytes buffer start-pos)))]
[packet
(accept-packet tr packet)
(read-more)]))))))
(define (lookup-local tr local-oid)
(define ws (hash-ref (membrane-oid-map (tunnel-relay-exported-references tr)) local-oid #f))
(if ws (wire-symbol-ref ws) *inert-ref*))
(define (rewrite-in tr assertion)
(define imported '())
(define (save! ws) (set! imported (cons ws imported)))
(define rewritten ((map-embeddeds (lambda (r) (embedded (rewrite-ref-in tr r save!)))) assertion))
(values rewritten imported))
(define ref-attenuation-refinements (make-weak-hasheq))
(define (rewrite-ref-in tr wire-ref save!)
(match wire-ref
[(sturdy:WireRef-mine (sturdy:Oid oid))
(define ws (grab (tunnel-relay-imported-references tr)
membrane-oid-map
oid
#f
(lambda ()
(wire-symbol oid (turn-ref this-turn (make-relay-entity tr oid)) 0))))
(save! ws)
(wire-symbol-ref ws)]
[(sturdy:WireRef-yours (sturdy:Oid oid) attenuation)
(define r (lookup-local tr oid))
(if (or (null? attenuation) (eq? *inert-ref* r))
r
(hash-ref! (hash-ref! ref-attenuation-refinements r make-hash)
attenuation
(lambda () (apply attenuate-entity-ref r attenuation))))]))
(define (accept-packet tr packet)
(match (parse-Turn packet)
[(? eof-object?) (error 'accept-packet "Invalid IO.Turn")]
[(Turn wire-turn)
(log-syndicate/relay-debug "--> ~a" (preserve->string packet))
(for [(ev (in-list wire-turn))]
(match-define (TurnEvent (Oid oid) event) ev)
(define r (lookup-local tr oid))
(match event
[(Event-Assert (Assert (Assertion assertion) (Handle remote-handle)))
(define-values (a imported) (rewrite-in tr assertion))
(hash-set! (tunnel-relay-inbound-assertions tr)
remote-handle
(inbound (turn-assert! this-turn r a)
imported))]
[(Event-Retract (Retract (Handle remote-handle)))
(define i (hash-ref (tunnel-relay-inbound-assertions tr) remote-handle #f))
(when (not i) (error 'accept-packet "Peer retracted invalid handle ~a" remote-handle))
(hash-remove! (tunnel-relay-inbound-assertions tr) remote-handle)
(for [(ws (in-list (inbound-imported i)))]
(drop (tunnel-relay-imported-references tr) ws))
(turn-retract! this-turn (inbound-local-handle i))]
[(Event-Message (Message (Assertion body)))
(define-values (a imported) (rewrite-in tr body))
(when (not (null? imported))
(error 'accept-packet "Cannot receive transient reference"))
(turn-message! this-turn r a)]
[(Event-Sync (Sync peer))
(define imported '())
(define (save! ws) (set! imported (cons ws imported)))
(define k (rewrite-ref-in tr peer save!))
(turn-sync! this-turn
r
(lambda (_true)
(turn-message! this-turn k #t)
(for [(ws (in-list imported))]
(drop (tunnel-relay-imported-references tr) ws))))]))]))
(define (send-event tr oid event)
(when (null? (tunnel-relay-pending-turn-rev tr))
(queue-task! (actor-engine (facet-actor (tunnel-relay-facet tr)))
(lambda ()
(turn! (tunnel-relay-facet tr)
(lambda ()
(define pending (Turn (reverse (tunnel-relay-pending-turn-rev tr))))
(set-tunnel-relay-pending-turn-rev! tr '())
(log-syndicate/relay-debug "<-- ~a"
(preserve->string (->preserve pending)))
(parse-Turn! (->preserve pending))
((tunnel-relay-packet-writer tr) pending))))))
(set-tunnel-relay-pending-turn-rev! tr (cons (TurnEvent (Oid oid) event)
(tunnel-relay-pending-turn-rev tr))))
(define (rewrite-out tr assertion transient?)
(define exported '())
(define (save! ws) (set! exported (cons ws exported)))
(define rewritten
((map-embeddeds (lambda (r) (embedded (rewrite-ref-out tr r transient? save!)))) assertion))
(values rewritten exported))
(define (rewrite-ref-out* tr local-ref transient? save!)
(define ws (grab (tunnel-relay-exported-references tr)
membrane-ref-map
local-ref
transient?
(lambda ()
(when transient?
(error 'rewrite-ref-out* "Cannot send transient reference"))
(define oid (tunnel-relay-next-local-oid tr))
(set-tunnel-relay-next-local-oid! tr (+ oid 1))
(wire-symbol oid local-ref 0))))
(save! ws)
(sturdy:WireRef-mine (sturdy:Oid (wire-symbol-oid ws))))
(define (rewrite-ref-out tr local-ref transient? save!)
(define re (entity-data (entity-ref-target local-ref)))
(cond [(or (not (relay-entity? re)) (not (eq? (relay-entity-relay re) tr)))
(rewrite-ref-out* tr local-ref transient? save!)]
[(null? (entity-ref-attenuation local-ref))
(sturdy:WireRef-yours (sturdy:Oid (relay-entity-oid re)) '())]
[else
;; we may trust the peer to enforce attenuation on our
;; behalf, in which case we can return (sturdy:WireRef-yours
;; (relay-entity-oid re) (entity-ref-attenuation local-ref))
;; here, but for now we don't.
(rewrite-ref-out* tr local-ref transient? save!)]))
(define (release-ref-out tr ws)
(drop (tunnel-relay-exported-references tr) ws))
(define (register tr assertion maybe-handle)
(define-values (rewritten exported) (rewrite-out tr assertion (eq? maybe-handle #f)))
(when maybe-handle (hash-set! (tunnel-relay-outbound-assertions tr) maybe-handle exported))
rewritten)
(define (deregister tr handle)
(for [(ws (in-list (hash-ref (tunnel-relay-outbound-assertions tr) handle '())))]
(release-ref-out tr ws))
(hash-remove! (tunnel-relay-outbound-assertions tr) handle))
(define (make-relay-entity tr oid)
(entity #:name
(list (tunnel-relay-name tr) oid)
#:assert
(lambda (assertion handle)
(send-event tr oid (Event-Assert
(Assert (Assertion (register tr assertion handle))
(Handle handle)))))
#:retract
(lambda (handle)
(deregister tr handle)
(send-event tr oid (Event-Retract (Retract (Handle handle)))))
#:message
(lambda (body)
(send-event tr oid (Event-Message (Message (Assertion (register tr body #f))))))
#:sync
(lambda (peer)
(define exported #f)
(define (save! ws) (set! exported ws))
(define spe (sync-peer-entity tr oid peer (lambda () exported)))
(send-event tr oid (Event-Sync
(Sync (rewrite-ref-out tr (turn-ref this-turn spe) #f save!)))))
#:data
(relay-entity tr oid)))
(define (sync-peer-entity tr oid peer get-export)
(define handle-map (make-hash))
(entity #:name
(list (tunnel-relay-name tr) oid 'sync)
#:assert
(lambda (assertion handle)
(hash-set! handle-map handle (turn-assert! this-turn peer assertion)))
#:retract
(lambda (handle)
(turn-retract! this-turn (hash-ref handle-map handle))
(hash-remove! handle-map handle))
#:message
(lambda (body)
(release-ref-out tr (get-export))
(turn-message! this-turn peer body))
#:sync
(lambda (peer-k)
(turn-sync! this-turn peer peer-k))))
(define (encode/canonicalization v)
(preserve->bytes (->preserve v)
#:canonicalizing? #t
#:write-annotations? #f
#:encode-embedded ->preserve))
(define (run-relay #:packet-writer packet-writer
#:auto-encode? [auto-encode? #t]
#:setup-inputs setup-inputs
#:then [then #f]
#:name [name (gensym 'relay)]
#:initial-oid [initial-oid #f]
#:initial-ref [initial-ref #f])
(define tr (make-tunnel-relay name
(if auto-encode?
(lambda (pending)
(packet-writer (encode/canonicalization pending)))
packet-writer)))
(setup-inputs tr)
(when initial-ref (rewrite-ref-out tr
(if (procedure? initial-ref)
(initial-ref)
initial-ref)
#f
(lambda (_ws) (void))))
(when then
(turn-assert! this-turn
then
(and initial-oid
(embedded
(rewrite-ref-in tr
(sturdy:WireRef-mine
(sturdy:Oid initial-oid))
(lambda (_ws) (void))))))))