Refactor tcp relay support; add client support; add chat example

This commit is contained in:
Tony Garnock-Jones 2023-01-19 14:47:02 +01:00
parent 0d361dc065
commit 59f133a62f
6 changed files with 152 additions and 50 deletions

View File

@ -0,0 +1,44 @@
#lang syndicate
;;; SPDX-License-Identifier: LGPL-3.0-or-later
;;; SPDX-FileCopyrightText: Copyright © 2023 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
(module+ main
(require syndicate/distributed/gatekeeper)
(require syndicate/distributed/tcp)
(require syndicate/driver-support)
(require syndicate/gensym)
(require syndicate/schemas/simpleChatProtocol)
(require syndicate/sturdy)
(require (only-in file/sha1 hex-string->bytes))
(define me (symbol->string (strong-gensym 'user)))
(define ref (SturdyRef "syndicate" '() (hex-string->bytes "a6480df5306611ddd0d3882b546e1977")))
(standard-actor-system (ds)
(define conn-facet this-facet)
(define (on-connected remote-ds)
(on-stop (stop-facet conn-facet))
(linked-thread
#:name (list 'read-stdin)
(lambda (facet)
(let loop ()
(match (read-line)
[(? eof-object?) (log-info "EOF on stdin.")]
[line (turn! facet (lambda () (send! remote-ds (Says me line))))
(loop)]))))
(at remote-ds
(assert (Present me))
(during (Present $who)
(on-start (log-info "~a arrived" who))
(on-stop (log-info "~a departed" who)))
(on (message (Says $who $what)) (log-info "~a says: ~v" who what))))
(run-tcp-client-relay
ds
#:hostname "localhost"
#:port 8001
#:import (lambda (v) (gatekeeper-resolve (embedded-value v) ref on-connected)))))

View File

@ -6,6 +6,6 @@
(require racket/logging)
(with-logging-to-port (current-error-port)
(lambda ()
(dynamic-require '(submod syndicate/distributed/tcp-server main) #f))
(dynamic-require '(submod syndicate/distributed/tcp main) #f))
'debug 'syndicate/relay
))

View File

@ -3,4 +3,4 @@
#lang racket
(module+ main
(dynamic-require '(submod syndicate/distributed/tcp-server main) #f))
(dynamic-require '(submod syndicate/distributed/tcp main) #f))

View File

@ -0,0 +1,28 @@
#lang syndicate
;;; SPDX-License-Identifier: LGPL-3.0-or-later
;;; SPDX-FileCopyrightText: Copyright © 2021-2023 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
(provide make-gatekeeper
gatekeeper-resolve
(all-from-out syndicate/schemas/gatekeeper))
(require (only-in racket/list append-map))
(require syndicate/rewrite)
(require syndicate/schemas/gatekeeper)
(require syndicate/sturdy)
(define (make-gatekeeper ds #:name [name 'gatekeeper])
(object #:name name
[(Resolve unvalidated-sturdyref observer)
(at ds
(during (Bind (SturdyRef-oid unvalidated-sturdyref) $key $target)
(define sturdyref (validate unvalidated-sturdyref key))
(define attenuation
(append-map Attenuation-value (reverse (SturdyRef-caveatChain sturdyref))))
(define attenuated-target (apply attenuate-entity-ref target attenuation))
(at observer (assert (embedded attenuated-target)))))]))
(define (gatekeeper-resolve gatekeeper ref k)
(at gatekeeper
(assert (Resolve ref (object #:name (list 'gatekeeper-resolve gatekeeper ref)
[(embedded a) (k a)])))))

View File

@ -1,48 +0,0 @@
#lang syndicate
;;; SPDX-License-Identifier: LGPL-3.0-or-later
;;; SPDX-FileCopyrightText: Copyright © 2021-2023 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
(require (only-in sha bytes->hex-string))
(require (only-in racket/list append-map))
(require syndicate/relay)
(require syndicate/rewrite)
(require syndicate/sturdy)
(require syndicate/schemas/gatekeeper)
(require syndicate/sturdy)
(require syndicate/drivers/tcp)
(module+ main
(standard-actor-system (ds)
(define ds-oid "syndicate")
(define ds-key (make-bytes KEY_LENGTH))
(at ds (assert (Bind ds-oid ds-key ds)))
(define root-cap (mint ds-oid ds-key))
(write-preserve/text (->preserve root-cap) #:indent 4 #:commas? #f)
(newline)
(displayln (bytes->hex-string (sturdy-encode (->preserve root-cap))))
(define spec (TcpLocal "0.0.0.0" 8001))
(at ds
(stop-on (asserted (StreamListenerError spec _)))
(during/spawn (StreamConnection $source $sink spec)
#:name (list 'tcp-server source)
(run-relay #:packet-writer (lambda (bs) (send-data sink bs))
#:setup-inputs
(lambda (tr)
(handle-connection source sink
#:on-data (lambda (d _m) (accept-bytes tr d))))
#:initial-ref
(object #:name 'gatekeeper
[(Resolve unvalidated-sturdyref observer)
(at ds
(during (Bind (SturdyRef-oid unvalidated-sturdyref) $key $target)
(define sturdyref (validate unvalidated-sturdyref key))
(define attenuation
(append-map Attenuation-value
(reverse (SturdyRef-caveatChain sturdyref))))
(define attenuated-target
(apply attenuate-entity-ref target attenuation))
(at observer (assert (embedded attenuated-target)))))]))))))

View File

@ -0,0 +1,78 @@
#lang syndicate
;;; SPDX-License-Identifier: LGPL-3.0-or-later
;;; SPDX-FileCopyrightText: Copyright © 2021-2023 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
(provide run-tcp-client-relay
run-tcp-server-relay)
(require (only-in sha bytes->hex-string))
(require syndicate/distributed/gatekeeper)
(require syndicate/drivers/tcp)
(require syndicate/relay)
(require syndicate/rewrite)
(require syndicate/sturdy)
(define-logger syndicate/distributed/tcp)
(define (run-tcp-client-relay ds
#:hostname hostname
#:port port
#:name [name (list 'tcp-client hostname port)]
#:import import-handler)
(define (on-error message)
(stop-current-facet
(log-syndicate/distributed/tcp-error "~a" message)))
(define active-source #f)
(define relay #f)
(establish-connection
ds (TcpRemote hostname port)
#:name (list name 'connection)
#:initial-mode (Mode-bytes)
#:on-connect (lambda (source sink)
(set! active-source source)
(run-relay #:packet-writer (lambda (bs) (send-data sink bs))
#:setup-inputs (lambda (tr) (set! relay tr))
#:then (ref (entity #:name (list name 'import-handler)
#:assert (lambda (a _h) (import-handler a))))
#:name (list name 'relay)
#:initial-oid 0))
#:on-rejected on-error
#:on-disconnect (lambda ()
(stop-current-facet
(log-syndicate/distributed/tcp-info "Disconnected")))
#:on-error on-error
#:on-data (lambda (data _mode)
(send-bytes-credit active-source (bytes-length data))
(accept-bytes relay data))))
(define (run-tcp-server-relay ds #:port port
#:hostname [hostname "0.0.0.0"]
#:name [name (lambda (source)
(list 'tcp-server hostname port source))]
#:export [initial-ref #f])
(define spec (TcpLocal hostname port))
(at ds
(stop-on (asserted (StreamListenerError spec _)))
(during/spawn (StreamConnection $source $sink spec)
#:name (name source)
(run-relay #:packet-writer (lambda (bs) (send-data sink bs))
#:setup-inputs
(lambda (tr)
(handle-connection source sink #:on-data (lambda (d _m) (accept-bytes tr d))))
#:initial-ref initial-ref))))
(module+ main
(standard-actor-system (ds)
(define ds-oid "syndicate")
(define ds-key (make-bytes KEY_LENGTH))
(at ds (assert (Bind ds-oid ds-key ds)))
(define root-cap (mint ds-oid ds-key))
(write-preserve/text (->preserve root-cap) #:indent 4 #:commas? #f)
(newline)
(displayln (bytes->hex-string (sturdy-encode (->preserve root-cap))))
(run-tcp-server-relay ds #:port 8001 #:export (make-gatekeeper ds))))