79 lines
3.0 KiB
Racket
79 lines
3.0 KiB
Racket
|
#lang syndicate
|
||
|
;;; SPDX-License-Identifier: LGPL-3.0-or-later
|
||
|
;;; SPDX-FileCopyrightText: Copyright © 2021-2023 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
|
||
|
|
||
|
(provide run-tcp-client-relay
|
||
|
run-tcp-server-relay)
|
||
|
|
||
|
(require (only-in sha bytes->hex-string))
|
||
|
(require syndicate/distributed/gatekeeper)
|
||
|
(require syndicate/drivers/tcp)
|
||
|
(require syndicate/relay)
|
||
|
(require syndicate/rewrite)
|
||
|
(require syndicate/sturdy)
|
||
|
|
||
|
(define-logger syndicate/distributed/tcp)
|
||
|
|
||
|
(define (run-tcp-client-relay ds
|
||
|
#:hostname hostname
|
||
|
#:port port
|
||
|
#:name [name (list 'tcp-client hostname port)]
|
||
|
#:import import-handler)
|
||
|
|
||
|
(define (on-error message)
|
||
|
(stop-current-facet
|
||
|
(log-syndicate/distributed/tcp-error "~a" message)))
|
||
|
|
||
|
(define active-source #f)
|
||
|
(define relay #f)
|
||
|
|
||
|
(establish-connection
|
||
|
ds (TcpRemote hostname port)
|
||
|
#:name (list name 'connection)
|
||
|
#:initial-mode (Mode-bytes)
|
||
|
#:on-connect (lambda (source sink)
|
||
|
(set! active-source source)
|
||
|
(run-relay #:packet-writer (lambda (bs) (send-data sink bs))
|
||
|
#:setup-inputs (lambda (tr) (set! relay tr))
|
||
|
#:then (ref (entity #:name (list name 'import-handler)
|
||
|
#:assert (lambda (a _h) (import-handler a))))
|
||
|
#:name (list name 'relay)
|
||
|
#:initial-oid 0))
|
||
|
#:on-rejected on-error
|
||
|
#:on-disconnect (lambda ()
|
||
|
(stop-current-facet
|
||
|
(log-syndicate/distributed/tcp-info "Disconnected")))
|
||
|
#:on-error on-error
|
||
|
#:on-data (lambda (data _mode)
|
||
|
(send-bytes-credit active-source (bytes-length data))
|
||
|
(accept-bytes relay data))))
|
||
|
|
||
|
(define (run-tcp-server-relay ds #:port port
|
||
|
#:hostname [hostname "0.0.0.0"]
|
||
|
#:name [name (lambda (source)
|
||
|
(list 'tcp-server hostname port source))]
|
||
|
#:export [initial-ref #f])
|
||
|
(define spec (TcpLocal hostname port))
|
||
|
(at ds
|
||
|
(stop-on (asserted (StreamListenerError spec _)))
|
||
|
(during/spawn (StreamConnection $source $sink spec)
|
||
|
#:name (name source)
|
||
|
(run-relay #:packet-writer (lambda (bs) (send-data sink bs))
|
||
|
#:setup-inputs
|
||
|
(lambda (tr)
|
||
|
(handle-connection source sink #:on-data (lambda (d _m) (accept-bytes tr d))))
|
||
|
#:initial-ref initial-ref))))
|
||
|
|
||
|
(module+ main
|
||
|
(standard-actor-system (ds)
|
||
|
(define ds-oid "syndicate")
|
||
|
(define ds-key (make-bytes KEY_LENGTH))
|
||
|
(at ds (assert (Bind ds-oid ds-key ds)))
|
||
|
|
||
|
(define root-cap (mint ds-oid ds-key))
|
||
|
(write-preserve/text (->preserve root-cap) #:indent 4 #:commas? #f)
|
||
|
(newline)
|
||
|
(displayln (bytes->hex-string (sturdy-encode (->preserve root-cap))))
|
||
|
|
||
|
(run-tcp-server-relay ds #:port 8001 #:export (make-gatekeeper ds))))
|