From 59f133a62f071cc2e19f8ca27e70b8d398e2ff6d Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Thu, 19 Jan 2023 14:47:02 +0100 Subject: [PATCH] Refactor tcp relay support; add client support; add chat example --- syndicate-examples/chat.rkt | 44 +++++++++++++ syndicate/bin/syndicate-server-debug.rkt | 2 +- syndicate/bin/syndicate-server.rkt | 2 +- syndicate/distributed/gatekeeper.rkt | 28 +++++++++ syndicate/distributed/tcp-server.rkt | 48 --------------- syndicate/distributed/tcp.rkt | 78 ++++++++++++++++++++++++ 6 files changed, 152 insertions(+), 50 deletions(-) create mode 100644 syndicate-examples/chat.rkt create mode 100644 syndicate/distributed/gatekeeper.rkt delete mode 100644 syndicate/distributed/tcp-server.rkt create mode 100644 syndicate/distributed/tcp.rkt diff --git a/syndicate-examples/chat.rkt b/syndicate-examples/chat.rkt new file mode 100644 index 0000000..3058975 --- /dev/null +++ b/syndicate-examples/chat.rkt @@ -0,0 +1,44 @@ +#lang syndicate +;;; SPDX-License-Identifier: LGPL-3.0-or-later +;;; SPDX-FileCopyrightText: Copyright © 2023 Tony Garnock-Jones + +(module+ main + (require syndicate/distributed/gatekeeper) + (require syndicate/distributed/tcp) + (require syndicate/driver-support) + (require syndicate/gensym) + (require syndicate/schemas/simpleChatProtocol) + (require syndicate/sturdy) + (require (only-in file/sha1 hex-string->bytes)) + + (define me (symbol->string (strong-gensym 'user))) + (define ref (SturdyRef "syndicate" '() (hex-string->bytes "a6480df5306611ddd0d3882b546e1977"))) + + (standard-actor-system (ds) + (define conn-facet this-facet) + + (define (on-connected remote-ds) + (on-stop (stop-facet conn-facet)) + + (linked-thread + #:name (list 'read-stdin) + (lambda (facet) + (let loop () + (match (read-line) + [(? eof-object?) (log-info "EOF on stdin.")] + [line (turn! facet (lambda () (send! remote-ds (Says me line)))) + (loop)])))) + + (at remote-ds + (assert (Present me)) + (during (Present $who) + (on-start (log-info "~a arrived" who)) + (on-stop (log-info "~a departed" who))) + (on (message (Says $who $what)) (log-info "~a says: ~v" who what)))) + + + (run-tcp-client-relay + ds + #:hostname "localhost" + #:port 8001 + #:import (lambda (v) (gatekeeper-resolve (embedded-value v) ref on-connected))))) diff --git a/syndicate/bin/syndicate-server-debug.rkt b/syndicate/bin/syndicate-server-debug.rkt index d555146..c95a9b0 100644 --- a/syndicate/bin/syndicate-server-debug.rkt +++ b/syndicate/bin/syndicate-server-debug.rkt @@ -6,6 +6,6 @@ (require racket/logging) (with-logging-to-port (current-error-port) (lambda () - (dynamic-require '(submod syndicate/distributed/tcp-server main) #f)) + (dynamic-require '(submod syndicate/distributed/tcp main) #f)) 'debug 'syndicate/relay )) diff --git a/syndicate/bin/syndicate-server.rkt b/syndicate/bin/syndicate-server.rkt index 787d99f..8aa48a7 100644 --- a/syndicate/bin/syndicate-server.rkt +++ b/syndicate/bin/syndicate-server.rkt @@ -3,4 +3,4 @@ #lang racket (module+ main - (dynamic-require '(submod syndicate/distributed/tcp-server main) #f)) + (dynamic-require '(submod syndicate/distributed/tcp main) #f)) diff --git a/syndicate/distributed/gatekeeper.rkt b/syndicate/distributed/gatekeeper.rkt new file mode 100644 index 0000000..2b3c489 --- /dev/null +++ b/syndicate/distributed/gatekeeper.rkt @@ -0,0 +1,28 @@ +#lang syndicate +;;; SPDX-License-Identifier: LGPL-3.0-or-later +;;; SPDX-FileCopyrightText: Copyright © 2021-2023 Tony Garnock-Jones + +(provide make-gatekeeper + gatekeeper-resolve + (all-from-out syndicate/schemas/gatekeeper)) + +(require (only-in racket/list append-map)) +(require syndicate/rewrite) +(require syndicate/schemas/gatekeeper) +(require syndicate/sturdy) + +(define (make-gatekeeper ds #:name [name 'gatekeeper]) + (object #:name name + [(Resolve unvalidated-sturdyref observer) + (at ds + (during (Bind (SturdyRef-oid unvalidated-sturdyref) $key $target) + (define sturdyref (validate unvalidated-sturdyref key)) + (define attenuation + (append-map Attenuation-value (reverse (SturdyRef-caveatChain sturdyref)))) + (define attenuated-target (apply attenuate-entity-ref target attenuation)) + (at observer (assert (embedded attenuated-target)))))])) + +(define (gatekeeper-resolve gatekeeper ref k) + (at gatekeeper + (assert (Resolve ref (object #:name (list 'gatekeeper-resolve gatekeeper ref) + [(embedded a) (k a)]))))) diff --git a/syndicate/distributed/tcp-server.rkt b/syndicate/distributed/tcp-server.rkt deleted file mode 100644 index 44fa281..0000000 --- a/syndicate/distributed/tcp-server.rkt +++ /dev/null @@ -1,48 +0,0 @@ -#lang syndicate -;;; SPDX-License-Identifier: LGPL-3.0-or-later -;;; SPDX-FileCopyrightText: Copyright © 2021-2023 Tony Garnock-Jones - -(require (only-in sha bytes->hex-string)) - -(require (only-in racket/list append-map)) - -(require syndicate/relay) -(require syndicate/rewrite) -(require syndicate/sturdy) -(require syndicate/schemas/gatekeeper) -(require syndicate/sturdy) -(require syndicate/drivers/tcp) - -(module+ main - (standard-actor-system (ds) - (define ds-oid "syndicate") - (define ds-key (make-bytes KEY_LENGTH)) - (at ds (assert (Bind ds-oid ds-key ds))) - - (define root-cap (mint ds-oid ds-key)) - (write-preserve/text (->preserve root-cap) #:indent 4 #:commas? #f) - (newline) - (displayln (bytes->hex-string (sturdy-encode (->preserve root-cap)))) - - (define spec (TcpLocal "0.0.0.0" 8001)) - (at ds - (stop-on (asserted (StreamListenerError spec _))) - (during/spawn (StreamConnection $source $sink spec) - #:name (list 'tcp-server source) - (run-relay #:packet-writer (lambda (bs) (send-data sink bs)) - #:setup-inputs - (lambda (tr) - (handle-connection source sink - #:on-data (lambda (d _m) (accept-bytes tr d)))) - #:initial-ref - (object #:name 'gatekeeper - [(Resolve unvalidated-sturdyref observer) - (at ds - (during (Bind (SturdyRef-oid unvalidated-sturdyref) $key $target) - (define sturdyref (validate unvalidated-sturdyref key)) - (define attenuation - (append-map Attenuation-value - (reverse (SturdyRef-caveatChain sturdyref)))) - (define attenuated-target - (apply attenuate-entity-ref target attenuation)) - (at observer (assert (embedded attenuated-target)))))])))))) diff --git a/syndicate/distributed/tcp.rkt b/syndicate/distributed/tcp.rkt new file mode 100644 index 0000000..e150ec7 --- /dev/null +++ b/syndicate/distributed/tcp.rkt @@ -0,0 +1,78 @@ +#lang syndicate +;;; SPDX-License-Identifier: LGPL-3.0-or-later +;;; SPDX-FileCopyrightText: Copyright © 2021-2023 Tony Garnock-Jones + +(provide run-tcp-client-relay + run-tcp-server-relay) + +(require (only-in sha bytes->hex-string)) +(require syndicate/distributed/gatekeeper) +(require syndicate/drivers/tcp) +(require syndicate/relay) +(require syndicate/rewrite) +(require syndicate/sturdy) + +(define-logger syndicate/distributed/tcp) + +(define (run-tcp-client-relay ds + #:hostname hostname + #:port port + #:name [name (list 'tcp-client hostname port)] + #:import import-handler) + + (define (on-error message) + (stop-current-facet + (log-syndicate/distributed/tcp-error "~a" message))) + + (define active-source #f) + (define relay #f) + + (establish-connection + ds (TcpRemote hostname port) + #:name (list name 'connection) + #:initial-mode (Mode-bytes) + #:on-connect (lambda (source sink) + (set! active-source source) + (run-relay #:packet-writer (lambda (bs) (send-data sink bs)) + #:setup-inputs (lambda (tr) (set! relay tr)) + #:then (ref (entity #:name (list name 'import-handler) + #:assert (lambda (a _h) (import-handler a)))) + #:name (list name 'relay) + #:initial-oid 0)) + #:on-rejected on-error + #:on-disconnect (lambda () + (stop-current-facet + (log-syndicate/distributed/tcp-info "Disconnected"))) + #:on-error on-error + #:on-data (lambda (data _mode) + (send-bytes-credit active-source (bytes-length data)) + (accept-bytes relay data)))) + +(define (run-tcp-server-relay ds #:port port + #:hostname [hostname "0.0.0.0"] + #:name [name (lambda (source) + (list 'tcp-server hostname port source))] + #:export [initial-ref #f]) + (define spec (TcpLocal hostname port)) + (at ds + (stop-on (asserted (StreamListenerError spec _))) + (during/spawn (StreamConnection $source $sink spec) + #:name (name source) + (run-relay #:packet-writer (lambda (bs) (send-data sink bs)) + #:setup-inputs + (lambda (tr) + (handle-connection source sink #:on-data (lambda (d _m) (accept-bytes tr d)))) + #:initial-ref initial-ref)))) + +(module+ main + (standard-actor-system (ds) + (define ds-oid "syndicate") + (define ds-key (make-bytes KEY_LENGTH)) + (at ds (assert (Bind ds-oid ds-key ds))) + + (define root-cap (mint ds-oid ds-key)) + (write-preserve/text (->preserve root-cap) #:indent 4 #:commas? #f) + (newline) + (displayln (bytes->hex-string (sturdy-encode (->preserve root-cap)))) + + (run-tcp-server-relay ds #:port 8001 #:export (make-gatekeeper ds))))