;;; syndicate-rkt/syndicate/distributed/tcp.rkt (Racket; 82 lines, 3.2 KiB)

#lang syndicate
;;; SPDX-License-Identifier: LGPL-3.0-or-later
;;; SPDX-FileCopyrightText: Copyright © 2021-2024 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
(provide run-tcp-client-relay
run-tcp-server-relay)
(require (only-in file/sha1 bytes->hex-string))
(require syndicate/distributed/gatekeeper)
(require syndicate/drivers/tcp)
(require syndicate/relay)
(require syndicate/rewrite)
(require syndicate/sturdy)
;; Declares the `syndicate/distributed/tcp` logger topic. `define-logger` also
;; binds the per-level logging forms used in this file, e.g.
;; `log-syndicate/distributed/tcp-error` and `log-syndicate/distributed/tcp-info`.
(define-logger syndicate/distributed/tcp)
(define (run-tcp-client-relay ds
                              #:hostname hostname
                              #:port port
                              #:name [name (list 'tcp-client hostname port)]
                              #:import import-handler)
  ;; Opens a byte-mode stream connection to (TcpRemote hostname port) through
  ;; the dataspace `ds` and runs a relay over it.
  ;;
  ;;   #:hostname, #:port  remote endpoint handed to `TcpRemote`
  ;;   #:name              base name; sub-names are derived from it as
  ;;                       (list name 'connection), (list name 'relay) and
  ;;                       (list name 'import-handler)
  ;;   #:import            one-argument procedure; called with each assertion
  ;;                       delivered to the entity passed to the relay as #:then
  ;;
  ;; Rejection, error and disconnection all stop the current facet after
  ;; logging.

  ;; Stream source of the live connection; assigned in `on-connected` and
  ;; read in `on-bytes`.
  (define inbound-source #f)

  ;; Value the relay passes to its #:setup-inputs callback; incoming bytes are
  ;; fed to it via `accept-bytes`.
  (define relay-input #f)

  ;; Shared handler for #:on-rejected and #:on-error: log, then stop the facet.
  (define (fail message)
    (stop-current-facet
     (log-syndicate/distributed/tcp-error "~a" message)))

  ;; Peer went away: log at info level, then stop the facet.
  (define (on-disconnected)
    (stop-current-facet
     (log-syndicate/distributed/tcp-info "Disconnected")))

  ;; Connection established: remember the source and start the relay, whose
  ;; outgoing packets are written to `sink`.
  (define (on-connected source sink)
    (set! inbound-source source)
    (run-relay #:packet-writer (lambda (packet) (send-data sink packet))
               #:setup-inputs (lambda (input) (set! relay-input input))
               #:then (ref (entity #:name (list name 'import-handler)
                                   #:assert (lambda (assertion _handle)
                                              (import-handler assertion))))
               #:name (list name 'relay)
               #:initial-oid 0))

  ;; Bytes arrived: issue credit for exactly the number of bytes just
  ;; received, then pass them on to the relay.
  (define (on-bytes data _mode)
    (send-bytes-credit inbound-source (bytes-length data))
    (accept-bytes relay-input data))

  (establish-connection
   ds (TcpRemote hostname port)
   #:name (list name 'connection)
   #:initial-mode (Mode-bytes)
   #:on-connect on-connected
   #:on-rejected fail
   #:on-disconnect on-disconnected
   #:on-error fail
   #:on-data on-bytes))
(define (run-tcp-server-relay ds #:port port
                              #:hostname [hostname "0.0.0.0"]
                              #:name [name (lambda (source)
                                             (list 'tcp-server hostname port source))]
                              #:export [initial-ref #f])
  ;; Watches dataspace `ds` for stream connections on (TcpLocal hostname port)
  ;; and spawns one relay actor per connection.
  ;;
  ;;   #:port      local port handed to `TcpLocal`
  ;;   #:hostname  local address handed to `TcpLocal`; defaults to "0.0.0.0"
  ;;   #:name      procedure mapping a connection's source to the name of the
  ;;               actor spawned for that connection
  ;;   #:export    value passed to each relay as #:initial-ref; defaults to #f
  ;;
  ;; Stops when a `StreamListenerError` for the listener spec is asserted.
  (define listen-spec (TcpLocal hostname port))

  ;; Builds the relay's packet writer: each outgoing packet goes to `sink`.
  (define (packet-writer-for sink)
    (lambda (packet) (send-data sink packet)))

  ;; Builds the relay's #:setup-inputs callback: once the relay supplies its
  ;; input value, attach a connection handler that feeds received bytes to it.
  (define (input-setup-for source sink)
    (lambda (relay-input)
      (handle-connection source sink
                         #:on-data (lambda (data _mode)
                                     (accept-bytes relay-input data)))))

  (at ds
    (stop-on (asserted (StreamListenerError listen-spec _)))
    (during/spawn (StreamConnection $src $snk listen-spec)
      #:name (name src)
      (run-relay #:packet-writer (packet-writer-for snk)
                 #:setup-inputs (input-setup-for src snk)
                 #:initial-ref initial-ref))))
(module+ main
  ;; Stand-alone entry point: binds the actor system's dataspace under a
  ;; sturdy-ref description, prints the matching root capability in two forms,
  ;; and serves a gatekeeper for that dataspace on TCP port 9001.
  (standard-actor-system (ds)
    (define root-oid "syndicate")
    ;; `make-bytes` without a fill argument yields KEY_LENGTH zero bytes.
    ;; NOTE(review): an all-zero key looks intended for demo use only — confirm
    ;; before relying on this entry point outside of testing.
    (define root-key (make-bytes KEY_LENGTH))

    (at ds
      (assert (Bind (Description (SturdyStepType)
                                 (SturdyDescriptionDetail root-key root-oid))
                    ds
                    (BindObserver-absent))))

    ;; Mint the root capability and convert it once; both printed forms below
    ;; are derived from the same value.
    (define root-cap (mint root-oid root-key))
    (define root-cap-value (->preserve root-cap))

    ;; Text form of the capability.
    (write-preserve/text root-cap-value #:indent 4 #:commas? #f)
    (newline)
    ;; Hex dump of the `sturdy-encode`d capability.
    (displayln (bytes->hex-string (sturdy-encode root-cap-value)))

    (run-tcp-server-relay ds #:port 9001 #:export (make-gatekeeper ds))))