diff --git a/syndicate/distributed/tcp-server.rkt b/syndicate/distributed/tcp-server.rkt index 40a5d70..4f0c27c 100644 --- a/syndicate/distributed/tcp-server.rkt +++ b/syndicate/distributed/tcp-server.rkt @@ -4,7 +4,6 @@ (require (only-in sha bytes->hex-string)) -(require racket/tcp) (require (only-in racket/list append-map)) (require syndicate/relay) @@ -12,13 +11,7 @@ (require syndicate/sturdy) (require syndicate/schemas/gen/gatekeeper) (require syndicate/sturdy) -(require syndicate/driver-support) - -(define (read-bytes-avail input-port #:limit [limit 65536]) - (define buffer (make-bytes limit)) - (match (read-bytes-avail! buffer input-port) - [(? number? count) (subbytes buffer 0 count)] - [other other])) +(require syndicate/drivers/tcp) (module+ main (actor-system/dataspace (ds) @@ -31,59 +24,32 @@ (newline) (displayln (bytes->hex-string (sturdy-encode (->preserve root-cap)))) - (define spawn-connection - (action (connection-custodian i o) - (define name-base (call-with-values (lambda () (tcp-addresses i #t)) list)) - (spawn-relay - this-turn - #:name name-base - #:packet-writer (lambda (bs) - (write-bytes bs o) - (flush-output o)) - #:setup-inputs (action (tr) - - (on-stop (close-input-port i) - (close-output-port o)) - - (linked-thread - #:name (cons 'input-thread name-base) - #:custodian connection-custodian - this-turn - (ref (entity #:name (cons 'socket-monitor name-base) - #:retract (action (_handle) (stop-current-facet)))) - (lambda () - (let loop () - (define bs (read-bytes-avail i)) - (when (bytes? bs) - (accept-bytes tr bs) - (loop)))))) - #:initial-ref - (action () - (ref (during* #:name (cons 'gatekeeper name-base) - (action (assertion) - (match (parse-Resolve assertion) - [(? eof-object?) (void)] - [(Resolve unvalidated-sturdyref observer) - (at ds - (during (Bind (SturdyRef-oid unvalidated-sturdyref) $key $target) - (define sturdyref (validate unvalidated-sturdyref key)) - (define attenuation - (append-map Attenuation-value (reverse (SturdyRef-caveatChain sturdyref)))) - (define attenuated-target - (apply attenuate-entity-ref target attenuation)) - (at observer (assert (embedded attenuated-target)))))])))))))) - - (spawn - #:name 'tcp-server - (linked-thread - #:name 'tcp-server - this-turn - (ref (entity #:name 'listen-monitor #:retract (action (_handle) (stop-current-facet)))) - (lambda () - (define listener (tcp-listen 5999 512 #t "0.0.0.0")) - (let loop () - (define connection-custodian (make-custodian)) - (define-values (i o) (parameterize ((current-custodian connection-custodian)) - (tcp-accept listener))) - (turn-freshen this-turn (action () (spawn-connection this-turn connection-custodian i o))) - (loop))))))) + (spawn-tcp-driver this-turn ds) + (spawn #:name 'tcp-server + (at ds + (during/spawn (Connection $conn (TcpInbound "0.0.0.0" 5999)) + (define gatekeeper + (ref + (during* #:name (list conn 'gatekeeper) + (action (assertion) + (match (parse-Resolve assertion) + [(? eof-object?) (void)] + [(Resolve unvalidated-sturdyref observer) + (at ds + (during (Bind (SturdyRef-oid unvalidated-sturdyref) $key $target) + (define sturdyref (validate unvalidated-sturdyref key)) + (define attenuation + (append-map Attenuation-value + (reverse (SturdyRef-caveatChain sturdyref)))) + (define attenuated-target + (apply attenuate-entity-ref target attenuation)) + (at observer (assert (embedded attenuated-target)))))]))))) + ((run-relay #:name conn + #:packet-writer (action (bs) (send-data this-turn conn bs)) + #:setup-inputs + (action (tr) + (accept-connection this-turn conn + #:on-data (action (bs) (accept-bytes tr bs)))) + #:initial-ref + (action () gatekeeper)) + this-turn)))))) diff --git a/syndicate/relay.rkt b/syndicate/relay.rkt index b245505..d6dcf0a 100644 --- a/syndicate/relay.rkt +++ b/syndicate/relay.rkt @@ -4,7 +4,7 @@ (provide make-tunnel-relay accept-bytes - spawn-relay) + run-relay) (require racket/match) (require preserves) @@ -77,18 +77,16 @@ ;;--------------------------------------------------------------------------- (define (make-tunnel-relay turn name packet-writer) - (define tr - (tunnel-relay (turn-active-facet turn) - name - #"" - packet-writer - (make-hash) - (make-hash) - (make-membrane) - (make-membrane) - 0 - '())) - tr) + (tunnel-relay (turn-active-facet turn) + name + #"" + packet-writer + (make-hash) + (make-hash) + (make-membrane) + (make-membrane) + 0 + '())) (define accept-bytes (lambda (tr bs) @@ -198,6 +196,7 @@ (log-info "OUT (raw): ~v" (->preserve pending)) (parse-Turn! (->preserve pending)) ((tunnel-relay-packet-writer tr) + this-turn (preserve->bytes (->preserve pending) #:canonicalizing? #t #:write-annotations? #f @@ -294,32 +293,28 @@ (action (peer-k) (turn-sync! this-turn peer peer-k)))) -(define (spawn-relay turn - #:packet-writer packet-writer - #:setup-inputs setup-inputs - #:then [then #f] - #:name [name (gensym 'relay)] - #:initial-oid [initial-oid #f] - #:initial-ref [initial-ref #f]) - (turn-spawn! #:name name - turn - (action () - (define tr (make-tunnel-relay this-turn name packet-writer)) - (setup-inputs this-turn tr) - (when initial-ref - (rewrite-ref-out tr - (if (procedure? initial-ref) - (initial-ref this-turn) - initial-ref) - #f - (lambda (_ws) (void)))) - (when then - (turn-assert! this-turn - then - (and initial-oid - (embedded - (rewrite-ref-in this-turn - tr - (sturdy:WireRef-mine - (sturdy:Oid initial-oid)) - (lambda (_ws) (void)))))))))) +(define (run-relay #:packet-writer packet-writer + #:setup-inputs setup-inputs + #:then [then #f] + #:name [name (gensym 'relay)] + #:initial-oid [initial-oid #f] + #:initial-ref [initial-ref #f]) + (action () + (define tr (make-tunnel-relay this-turn name packet-writer)) + (setup-inputs this-turn tr) + (when initial-ref (rewrite-ref-out tr + (if (procedure? initial-ref) + (initial-ref this-turn) + initial-ref) + #f + (lambda (_ws) (void)))) + (when then + (turn-assert! this-turn + then + (and initial-oid + (embedded + (rewrite-ref-in this-turn + tr + (sturdy:WireRef-mine + (sturdy:Oid initial-oid)) + (lambda (_ws) (void)))))))))