#lang racket/base ;;; SPDX-License-Identifier: LGPL-3.0-or-later ;;; SPDX-FileCopyrightText: Copyright © 2021-2024 Tony Garnock-Jones (provide make-tunnel-relay accept-bytes accept-packet encode/canonicalization run-relay) (require racket/match) (require preserves) (require "main.rkt") (require (only-in "actor.rkt" current-turn)) (require "engine.rkt") (require "rewrite.rkt") (require "schemas/protocol.rkt") (require (prefix-in sturdy: "schemas/sturdy.rkt")) (define-logger syndicate/relay) ;;--------------------------------------------------------------------------- (struct inbound (local-handle imported)) (struct wire-symbol (oid ref [count #:mutable]) #:methods gen:custom-write [(define (write-proc ws port mode) (fprintf port "#" (wire-symbol-oid ws) (wire-symbol-count ws) (wire-symbol-ref ws)))]) (struct membrane (oid-map ref-map)) ;; There are other kinds of relay. This one has exactly two participants connected to each other. (struct tunnel-relay (facet name [read-buffer #:mutable] packet-writer inbound-assertions outbound-assertions exported-references imported-references [next-local-oid #:mutable] [pending-turn-rev #:mutable] ) #:transparent) (struct relay-entity (relay oid) #:transparent) ;;--------------------------------------------------------------------------- (define *inert-ref* (entity-ref *dead-facet* (entity #:name '*inert-ref*) '())) ;;--------------------------------------------------------------------------- (define (make-membrane) (membrane (make-hash) (make-hasheq))) (define (grab m getter key transient? f) (let ((ws (hash-ref (getter m) key (lambda () (define ws (f)) (hash-set! (membrane-oid-map m) (wire-symbol-oid ws) ws) (hash-set! (membrane-ref-map m) (wire-symbol-ref ws) ws) ws)))) (when (not transient?) (set-wire-symbol-count! ws (+ (wire-symbol-count ws) 1))) ws)) (define (drop m ws) (set-wire-symbol-count! ws (- (wire-symbol-count ws) 1)) (when (zero? (wire-symbol-count ws)) (hash-remove! (membrane-oid-map m) (wire-symbol-oid ws)) (hash-remove! (membrane-ref-map m) (wire-symbol-ref ws)))) ;;--------------------------------------------------------------------------- (define (make-tunnel-relay name packet-writer) (tunnel-relay (turn-active-facet (current-turn)) name #"" packet-writer (make-hash) (make-hash) (make-membrane) (make-membrane) 0 '())) (define accept-bytes (lambda (tr bs) (turn! (tunnel-relay-facet tr) (lambda () (define buffer (if (positive? (bytes-length (tunnel-relay-read-buffer tr))) (bytes-append (tunnel-relay-read-buffer tr) bs) bs)) (set-tunnel-relay-read-buffer! tr buffer) (define p (open-input-bytes buffer)) (let read-more () (define start-pos (file-position p)) (match (read-preserve/binary p #:read-syntax? #f #:decode-embedded sturdy:parse-WireRef! #:on-short (lambda () eof)) [(? eof-object?) (when (positive? start-pos) (set-tunnel-relay-read-buffer! tr (subbytes buffer start-pos)))] [packet (accept-packet tr packet) (read-more)])))))) (define (lookup-local tr local-oid) (define ws (hash-ref (membrane-oid-map (tunnel-relay-exported-references tr)) local-oid #f)) (if ws (wire-symbol-ref ws) *inert-ref*)) (define (rewrite-in tr assertion) (define imported '()) (define (save! ws) (set! imported (cons ws imported))) (define rewritten ((map-embeddeds (lambda (r) (embedded (rewrite-ref-in tr r save!)))) assertion)) (values rewritten imported)) (define ref-attenuation-refinements (make-weak-hasheq)) (define (rewrite-ref-in tr wire-ref save!) (match wire-ref [(sturdy:WireRef-mine (sturdy:Oid oid)) (define ws (grab (tunnel-relay-imported-references tr) membrane-oid-map oid #f (lambda () (wire-symbol oid (turn-ref this-turn (make-relay-entity tr oid)) 0)))) (save! ws) (wire-symbol-ref ws)] [(sturdy:WireRef-yours (sturdy:Oid oid) attenuation) (define r (lookup-local tr oid)) (if (or (null? attenuation) (eq? *inert-ref* r)) r (hash-ref! (hash-ref! ref-attenuation-refinements r make-hash) attenuation (lambda () (apply attenuate-entity-ref r attenuation))))])) (define (accept-packet tr packet) (match (parse-Turn packet) [(? eof-object?) (error 'accept-packet "Invalid IO.Turn")] [(Turn wire-turn) (log-syndicate/relay-debug "--> ~a" (preserve->string packet)) (for [(ev (in-list wire-turn))] (match-define (TurnEvent (Oid oid) event) ev) (define r (lookup-local tr oid)) (match event [(Event-Assert (Assert (Assertion assertion) (Handle remote-handle))) (define-values (a imported) (rewrite-in tr assertion)) (hash-set! (tunnel-relay-inbound-assertions tr) remote-handle (inbound (turn-assert! this-turn r a) imported))] [(Event-Retract (Retract (Handle remote-handle))) (define i (hash-ref (tunnel-relay-inbound-assertions tr) remote-handle #f)) (when (not i) (error 'accept-packet "Peer retracted invalid handle ~a" remote-handle)) (hash-remove! (tunnel-relay-inbound-assertions tr) remote-handle) (for [(ws (in-list (inbound-imported i)))] (drop (tunnel-relay-imported-references tr) ws)) (turn-retract! this-turn (inbound-local-handle i))] [(Event-Message (Message (Assertion body))) (define-values (a imported) (rewrite-in tr body)) (when (not (null? imported)) (error 'accept-packet "Cannot receive transient reference")) (turn-message! this-turn r a)] [(Event-Sync (Sync peer)) (define imported '()) (define (save! ws) (set! imported (cons ws imported))) (define k (rewrite-ref-in tr peer save!)) (turn-sync! this-turn r (lambda (_true) (turn-message! this-turn k #t) (for [(ws (in-list imported))] (drop (tunnel-relay-imported-references tr) ws))))]))])) (define (send-event tr oid event) (when (null? (tunnel-relay-pending-turn-rev tr)) (queue-task! (actor-engine (facet-actor (tunnel-relay-facet tr))) (lambda () (turn! (tunnel-relay-facet tr) (lambda () (define pending (Turn (reverse (tunnel-relay-pending-turn-rev tr)))) (set-tunnel-relay-pending-turn-rev! tr '()) (log-syndicate/relay-debug "<-- ~a" (preserve->string (->preserve pending))) (parse-Turn! (->preserve pending)) ((tunnel-relay-packet-writer tr) pending)))))) (set-tunnel-relay-pending-turn-rev! tr (cons (TurnEvent (Oid oid) event) (tunnel-relay-pending-turn-rev tr)))) (define (rewrite-out tr assertion transient?) (define exported '()) (define (save! ws) (set! exported (cons ws exported))) (define rewritten ((map-embeddeds (lambda (r) (embedded (rewrite-ref-out tr r transient? save!)))) assertion)) (values rewritten exported)) (define (rewrite-ref-out* tr local-ref transient? save!) (define ws (grab (tunnel-relay-exported-references tr) membrane-ref-map local-ref transient? (lambda () (when transient? (error 'rewrite-ref-out* "Cannot send transient reference")) (define oid (tunnel-relay-next-local-oid tr)) (set-tunnel-relay-next-local-oid! tr (+ oid 1)) (wire-symbol oid local-ref 0)))) (save! ws) (sturdy:WireRef-mine (sturdy:Oid (wire-symbol-oid ws)))) (define (rewrite-ref-out tr local-ref transient? save!) (define re (entity-data (entity-ref-target local-ref))) (cond [(or (not (relay-entity? re)) (not (eq? (relay-entity-relay re) tr))) (rewrite-ref-out* tr local-ref transient? save!)] [(null? (entity-ref-attenuation local-ref)) (sturdy:WireRef-yours (sturdy:Oid (relay-entity-oid re)) '())] [else ;; we may trust the peer to enforce attenuation on our ;; behalf, in which case we can return (sturdy:WireRef-yours ;; (relay-entity-oid re) (entity-ref-attenuation local-ref)) ;; here, but for now we don't. (rewrite-ref-out* tr local-ref transient? save!)])) (define (release-ref-out tr ws) (drop (tunnel-relay-exported-references tr) ws)) (define (register tr assertion maybe-handle) (define-values (rewritten exported) (rewrite-out tr assertion (eq? maybe-handle #f))) (when maybe-handle (hash-set! (tunnel-relay-outbound-assertions tr) maybe-handle exported)) rewritten) (define (deregister tr handle) (for [(ws (in-list (hash-ref (tunnel-relay-outbound-assertions tr) handle '())))] (release-ref-out tr ws)) (hash-remove! (tunnel-relay-outbound-assertions tr) handle)) (define (make-relay-entity tr oid) (entity #:name (list (tunnel-relay-name tr) oid) #:assert (lambda (assertion handle) (send-event tr oid (Event-Assert (Assert (Assertion (register tr assertion handle)) (Handle handle))))) #:retract (lambda (handle) (deregister tr handle) (send-event tr oid (Event-Retract (Retract (Handle handle))))) #:message (lambda (body) (send-event tr oid (Event-Message (Message (Assertion (register tr body #f)))))) #:sync (lambda (peer) (define exported #f) (define (save! ws) (set! exported ws)) (define spe (sync-peer-entity tr oid peer (lambda () exported))) (send-event tr oid (Event-Sync (Sync (rewrite-ref-out tr (turn-ref this-turn spe) #f save!))))) #:data (relay-entity tr oid))) (define (sync-peer-entity tr oid peer get-export) (define handle-map (make-hash)) (entity #:name (list (tunnel-relay-name tr) oid 'sync) #:assert (lambda (assertion handle) (hash-set! handle-map handle (turn-assert! this-turn peer assertion))) #:retract (lambda (handle) (turn-retract! this-turn (hash-ref handle-map handle)) (hash-remove! handle-map handle)) #:message (lambda (body) (release-ref-out tr (get-export)) (turn-message! this-turn peer body)) #:sync (lambda (peer-k) (turn-sync! this-turn peer peer-k)))) (define (encode/canonicalization v) (preserve->bytes (->preserve v) #:canonicalizing? #t #:write-annotations? #f #:encode-embedded ->preserve)) (define (run-relay #:packet-writer packet-writer #:auto-encode? [auto-encode? #t] #:setup-inputs setup-inputs #:then [then #f] #:name [name (gensym 'relay)] #:initial-oid [initial-oid #f] #:initial-ref [initial-ref #f]) (define tr (make-tunnel-relay name (if auto-encode? (lambda (pending) (packet-writer (encode/canonicalization pending))) packet-writer))) (setup-inputs tr) (when initial-ref (rewrite-ref-out tr (if (procedure? initial-ref) (initial-ref) initial-ref) #f (lambda (_ws) (void)))) (when then (turn-assert! this-turn then (and initial-oid (embedded (rewrite-ref-in tr (sturdy:WireRef-mine (sturdy:Oid initial-oid)) (lambda (_ws) (void))))))))