From 468b6541f9c36f964b999bee95b901b6645b03c7 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Tue, 8 Jun 2021 09:33:56 +0200 Subject: [PATCH] Yesterday's work on relay + TCP distribution --- syndicate/distributed/tcp-server.rkt | 85 ++++++++ syndicate/driver-support.rkt | 40 ++++ syndicate/relay.rkt | 300 +++++++++++++++++++++++++++ 3 files changed, 425 insertions(+) create mode 100644 syndicate/distributed/tcp-server.rkt create mode 100644 syndicate/driver-support.rkt create mode 100644 syndicate/relay.rkt diff --git a/syndicate/distributed/tcp-server.rkt b/syndicate/distributed/tcp-server.rkt new file mode 100644 index 0000000..e0b033f --- /dev/null +++ b/syndicate/distributed/tcp-server.rkt @@ -0,0 +1,85 @@ +#lang syndicate +;;; SPDX-License-Identifier: LGPL-3.0-or-later +;;; SPDX-FileCopyrightText: Copyright © 2021 Tony Garnock-Jones + +(require (only-in sha bytes->hex-string)) + +(require racket/tcp) +(require (only-in racket/list append-map)) + +(require syndicate/relay) +(require syndicate/sturdy) +(require syndicate/schemas/gen/gatekeeper) +(require syndicate/sturdy) +(require syndicate/driver-support) + +(define (read-bytes-avail input-port #:limit [limit 65536]) + (define buffer (make-bytes limit)) + (match (read-bytes-avail! buffer input-port) + [(? number? count) (subbytes buffer 0 count)] + [other other])) + +(module+ main + (actor-system/dataspace (ds) + (define ds-oid "syndicate") + (define ds-key (make-bytes KEY_LENGTH)) + (at ds (assert (Bind ds-oid ds-key ds))) + + (define root-cap (mint ds-oid ds-key)) + (write-preserve/text (SturdyRef->preserves root-cap) #:indent 4 #:commas? #f) + (newline) + (displayln (bytes->hex-string (sturdy-encode (SturdyRef->preserves root-cap)))) + + (define spawn-connection + (action (connection-custodian i o) + (define name-base (call-with-values (lambda () (tcp-addresses i #t)) list)) + (spawn-relay + this-turn + #:name name-base + #:packet-writer (lambda (bs) (write-bytes bs o)) + #:setup-inputs (action (tr) + + (on-stop (close-input-port i) + (close-output-port o)) + + (linked-thread + #:name (cons 'input-thread name-base) + #:custodian connection-custodian + this-turn + (ref (entity #:name (cons 'socket-monitor name-base) + #:retract (action (_handle) (stop-current-facet)))) + (lambda () + (let loop () + (define bs (read-bytes-avail i)) + (when (bytes? bs) + (accept-bytes tr bs) + (loop)))))) + #:initial-ref + (ref (during* #:name (cons 'gatekeeper name-base) + (action (assertion) + (match (parse-Resolve assertion) + [(? eof-object?) (void)] + [(Resolve unvalidated-sturdyref observer) + (at ds + (during (Bind (SturdyRef-oid unvalidated-sturdyref) $key $target) + (define sturdyref (validate unvalidated-sturdyref key)) + (define attenuation + (append-map values (reverse (SturdyRef-caveatChain sturdyref)))) + (define attenuated-target + (apply attenuate-entity-ref target attenuation)) + (at observer (assert (embedded attenuated-target)))))]))))))) + + (spawn + #:name 'tcp-server + (linked-thread + #:name 'tcp-server + this-turn + (ref (entity #:name 'listen-monitor #:retract (action (_handle) (stop-current-facet)))) + (lambda () + (define listener (tcp-listen 5999 512 #t "0.0.0.0")) + (let loop () + (define connection-custodian (make-custodian)) + (define-values (i o) (parameterize ((current-custodian connection-custodian)) + (tcp-accept listener))) + (turn-freshen this-turn (action () (spawn-connection this-turn connection-custodian i o))) + (loop))))))) diff --git a/syndicate/driver-support.rkt b/syndicate/driver-support.rkt new file mode 100644 index 0000000..4435204 --- /dev/null +++ b/syndicate/driver-support.rkt @@ -0,0 +1,40 @@ +#lang syndicate +;;; SPDX-License-Identifier: LGPL-3.0-or-later +;;; SPDX-FileCopyrightText: Copyright © 2021 Tony Garnock-Jones + +(provide linked-thread) + +(require "engine.rkt") + +(define-logger syndicate/driver-support) + +(define linked-thread + (action (peer + thunk + #:name [name (gensym 'linked-thread)] + #:custodian [c (make-custodian)]) + (define handle #f) + + (define armed? #t) + + (define ! + (action () + (when armed? + (set! armed? #f) + (log-syndicate/driver-support-info "~a shutdown" name) + (turn-retract! this-turn handle) + (queue-task! (actor-engine this-actor) (lambda () (custodian-shutdown-all c))) + (actor-remove-exit-hook! this-actor !)))) + + (on-stop (! this-turn)) + (actor-add-exit-hook! this-actor !) + + (log-syndicate/driver-support-info "~a startup" name) + (set! handle + (parameterize ((current-custodian c)) + (turn-assert! this-turn + peer + (embedded + (thread (lambda () + (with-handlers ([(lambda (_e) #t) (lambda (_e) (void))]) (thunk)) + (turn-freshen this-turn !))))))))) diff --git a/syndicate/relay.rkt b/syndicate/relay.rkt new file mode 100644 index 0000000..b37b5e6 --- /dev/null +++ b/syndicate/relay.rkt @@ -0,0 +1,300 @@ +#lang racket/base +;;; SPDX-License-Identifier: LGPL-3.0-or-later +;;; SPDX-FileCopyrightText: Copyright © 2021 Tony Garnock-Jones + +(provide make-tunnel-relay + accept-bytes + spawn-relay) + +(require racket/match) +(require preserves) + +(require "main.rkt") +(require "engine.rkt") +(require "schemas/gen/protocol.rkt") +(require (prefix-in sturdy: "schemas/gen/sturdy.rkt")) + +;;--------------------------------------------------------------------------- + +(struct inbound (local-handle imported)) + +(struct wire-symbol (oid ref [count #:mutable])) + +(struct membrane (oid-map ref-map)) + +;; There are other kinds of relay. This one has exactly two participants connected to each other. +(struct tunnel-relay (facet + name + [read-buffer #:mutable] + packet-writer + inbound-assertions + outbound-assertions + exported-references + imported-references + [next-local-oid #:mutable] + [pending-turn-rev #:mutable] + ) + #:transparent) + +(struct relay-entity (relay oid) #:transparent) + +;;--------------------------------------------------------------------------- + +(define *inert-ref* + (entity-ref *dead-facet* + (entity #:name '*inert-ref*) + '())) + +;;--------------------------------------------------------------------------- + +(define (make-membrane) + (membrane (make-hash) (make-hasheq))) + +(define (grab m getter key transient? f) + (let ((ws (hash-ref (getter m) + key + (lambda () + (define ws (f)) + (hash-set! (membrane-oid-map m) (wire-symbol-oid ws) ws) + (hash-set! (membrane-ref-map m) (wire-symbol-ref ws) ws) + ws)))) + (when (not transient?) (set-wire-symbol-count! ws (+ (wire-symbol-count ws) 1))) + ws)) + +(define (drop m ws) + (set-wire-symbol-count! ws (- (wire-symbol-count ws) 1)) + (when (zero? (wire-symbol-count ws)) + (hash-remove! (membrane-oid-map m) (wire-symbol-oid ws)) + (hash-remove! (membrane-ref-map m) (wire-symbol-ref ws)))) + +;;--------------------------------------------------------------------------- + +(define (make-tunnel-relay turn name packet-writer) + (define tr + (tunnel-relay (turn-active-facet turn) + name + #"" + packet-writer + (make-hash) + (make-hash) + (make-membrane) + (make-membrane) + 0 + '())) + tr) + +(define accept-bytes + (lambda (tr bs) + (turn! (tunnel-relay-facet tr) + (action () + (define buffer (if (positive? (bytes-length (tunnel-relay-read-buffer tr))) + (bytes-append (tunnel-relay-read-buffer tr) bs) + bs)) + (set-tunnel-relay-read-buffer! tr buffer) + (define p (open-input-bytes buffer)) + (let read-more () + (define start-pos (file-position p)) + (match (read-preserve/binary p + #:read-syntax? #f + #:decode-embedded decode-embedded:protocol + #:on-short (lambda () eof)) + [(? eof-object?) + (when (positive? start-pos) + (set-tunnel-relay-read-buffer! tr (subbytes buffer start-pos)))] + [packet + (handle-packet this-turn tr packet) + (read-more)])))))) + +(define (lookup-local tr local-oid) + (define ws (hash-ref (membrane-oid-map (tunnel-relay-exported-references tr)) local-oid #f)) + (if ws (wire-symbol-ref ws) *inert-ref*)) + +(define rewrite-in + (action (tr assertion) + (define imported '()) + (define (save! ws) (set! imported (cons ws imported))) + (define rewritten + ((map-embeddeds (lambda (r) (embedded (rewrite-ref-in this-turn tr r save!)))) assertion)) + (values rewritten imported))) + +(define ref-attenuation-refinements (make-weak-hasheq)) + +(define rewrite-ref-in + (action (tr wire-ref save!) + (match wire-ref + [(sturdy:WireRef-mine oid) + (define ws (grab (tunnel-relay-imported-references tr) + membrane-oid-map + oid + #f + (lambda () + (wire-symbol oid (turn-ref this-turn (make-relay-entity tr oid)) 0)))) + (save! ws) + (wire-symbol-ref ws)] + [(sturdy:WireRef-yours oid attenuation) + (define r (lookup-local tr oid)) + (if (or (null? attenuation) (eq? *inert-ref* r)) + r + (hash-ref! (hash-ref! ref-attenuation-refinements r make-hash) + attenuation + (lambda () (apply attenuate-entity-ref r attenuation))))]))) + +(define handle-packet + (action (tr packet) + (match (parse-Turn packet) + [(? eof-object?) (error 'handle-packet "Invalid IO.Turn")] + [wire-turn + (for [(ev (in-list wire-turn))] + (match-define (TurnEvent oid event) ev) + (define r (lookup-local tr oid)) + (match event + [(Event-Assert (Assert assertion remote-handle)) + (define-values (a imported) (rewrite-in this-turn tr assertion)) + (hash-set! (tunnel-relay-inbound-assertions tr) + remote-handle + (inbound (turn-assert! this-turn r a) + imported))] + [(Event-Retract (Retract remote-handle)) + (define i (hash-ref (tunnel-relay-inbound-assertions tr) remote-handle #f)) + (when (not i) (error 'handle-packet "Peer retracted invalid handle ~a" remote-handle)) + (hash-remove! (tunnel-relay-inbound-assertions tr) remote-handle) + (for [(ws (in-list (inbound-imported i)))] + (drop (tunnel-relay-imported-references tr) ws)) + (turn-retract! this-turn (inbound-local-handle i))] + [(Event-Message (Message body)) + (define-values (a imported) (rewrite-in this-turn tr body)) + (when (not (null? imported)) + (error 'handle-packet "Cannot receive transient reference")) + (turn-message! this-turn r a)] + [(Event-Sync (Sync peer)) + (define imported '()) + (define (save! ws) (set! imported (cons ws imported))) + (define k (rewrite-ref-in this-turn tr peer save!)) + (turn-sync! this-turn + r + (action (_true) + (turn-message! this-turn k #t) + (for [(ws (in-list imported))] + (drop (tunnel-relay-imported-references tr) ws))))]))]))) + +(define (send-event tr event) + (when (null? (tunnel-relay-pending-turn-rev tr)) + (queue-task! (actor-engine (facet-actor (tunnel-relay-facet tr))) + (lambda () + (define pending (reverse (tunnel-relay-pending-turn-rev tr))) + (set-tunnel-relay-pending-turn-rev! tr '()) + ((tunnel-relay-packet-writer tr) + (preserve->bytes (Turn->preserves pending) + #:canonicalizing? #t + #:write-annotations? #f + #:encode-embedded encode-embedded:protocol))))) + (set-tunnel-relay-pending-turn-rev! tr (cons event (tunnel-relay-pending-turn-rev tr)))) + +(define (rewrite-out tr assertion transient?) + (define exported '()) + (define (save! ws) (set! exported (cons ws exported))) + (define rewritten + ((map-embeddeds (lambda (r) (embedded (rewrite-ref-out tr r transient? save!)))) assertion)) + (values rewritten exported)) + +(define (rewrite-ref-out* tr local-ref transient? save!) + (define ws (grab (tunnel-relay-exported-references tr) + membrane-ref-map + local-ref + transient? + (lambda () + (define oid (tunnel-relay-next-local-oid tr)) + (set-tunnel-relay-next-local-oid! tr (+ oid 1)) + (wire-symbol oid local-ref 0)))) + (save! ws) + (sturdy:WireRef-mine (wire-symbol-oid ws))) + +(define (rewrite-ref-out tr local-ref transient? save!) + (define re (entity-data (entity-ref-target local-ref))) + (cond [(or (not (relay-entity? re)) (not (eq? (relay-entity-relay re) tr))) + (rewrite-ref-out* tr local-ref transient? save!)] + [(null? (entity-ref-attenuation local-ref)) + (sturdy:WireRef-yours (relay-entity-oid re) '())] + [else + ;; we may trust the peer to enforce attenuation on our + ;; behalf, in which case we can return (sturdy:WireRef-yours + ;; (relay-entity-oid re) (entity-ref-attenuation local-ref)) + ;; here, but for now we don't. + (rewrite-ref-out* tr local-ref transient? save!)])) + +(define (release-ref-out tr ws) + (drop (tunnel-relay-exported-references tr) ws)) + +(define (register tr assertion maybe-handle) + (define-values (rewritten exported) (rewrite-out tr assertion (eq? maybe-handle #f))) + (when maybe-handle (hash-set! (tunnel-relay-outbound-assertions tr) maybe-handle exported)) + rewritten) + +(define (deregister tr handle) + (for [(ws (in-list (hash-ref (tunnel-relay-outbound-assertions tr) handle '())))] + (release-ref-out tr ws)) + (hash-remove! (tunnel-relay-outbound-assertions tr) handle)) + +(define (make-relay-entity tr oid) + (entity #:name (list (tunnel-relay-name tr) oid) + #:assert (action (assertion handle) + (send-event tr (Event-Assert (Assert (register tr assertion handle) handle)))) + #:retract (action (handle) + (deregister tr handle) + (send-event tr (Event-Retract (Retract handle)))) + #:message (action (body) + (send-event tr (Event-Message (Message (register tr body #f))))) + #:sync (action (peer) + (define exported #f) + (define (save! ws) (set! exported ws)) + (define spe (sync-peer-entity tr oid peer (lambda () exported))) + (send-event tr + (Event-Sync + (Sync (embedded + (rewrite-ref-out tr (turn-ref this-turn spe) #f save!)))))) + #:data (relay-entity tr oid))) + +(define (sync-peer-entity tr oid peer get-export) + (define handle-map (make-hash)) + (entity #:name (list (tunnel-relay-name tr) oid 'sync) + #:assert (action (assertion handle) + (hash-set! handle-map handle (turn-assert! this-turn peer assertion))) + #:retract (action (handle) + (turn-retract! this-turn (hash-ref handle-map handle)) + (hash-remove! handle-map handle)) + #:message (action (body) + (release-ref-out tr (get-export)) + (turn-message! this-turn peer body)) + #:sync (action (peer-k) + (turn-sync! this-turn peer peer-k)))) + +(define (spawn-relay turn + #:packet-writer packet-writer + #:setup-inputs setup-inputs + #:then [then #f] + #:name [name (gensym 'relay)] + #:initial-oid [initial-oid #f] + #:initial-ref [initial-ref #f]) + (turn-spawn! #:name name + turn + (action () + (define tr (make-tunnel-relay this-turn name packet-writer)) + (setup-inputs this-turn tr) + (when initial-ref + (rewrite-ref-out tr initial-ref #f (lambda (_ws) (void)))) + (when then + (turn-assert! this-turn + then + (and initial-oid + (embedded (rewrite-ref-in this-turn + tr + (sturdy:WireRef-mine initial-oid) + (lambda (_ws) (void)))))))))) + +(define-syntax-rule (D v0) + (let ((v v0)) + (log-info "~a: ~v" 'v0 v) + v)) + +(require racket/trace) +(trace rewrite-out)