syndicate-rkt/syndicate/distributed/ports.rkt

44 lines
1.8 KiB
Racket

#lang syndicate
;;; SPDX-License-Identifier: LGPL-3.0-or-later
;;; SPDX-FileCopyrightText: Copyright © 2022-2024 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
(provide run-port-relay)
(require syndicate/relay)
(require syndicate/driver-support)
(define-logger syndicate/distributed/ports)
(define (run-port-relay #:input-port [input-port (current-input-port)]
#:output-port [output-port (current-output-port)]
#:name [name (gensym 'stdio-relay)]
#:export [initial-ref #f]
#:import [import-handler #f])
(run-relay #:packet-writer
(lambda (bs)
(log-syndicate/distributed/ports-debug "OUT ~v" bs)
(write-bytes bs output-port)
(flush-output output-port))
#:setup-inputs
(lambda (tr)
(linked-thread
#:name (list name 'input)
(lambda (facet)
(define buffer (make-bytes 2048))
(let loop ()
(match (read-bytes-avail! buffer input-port)
[(? number? n)
(define bs (subbytes buffer 0 n))
(log-syndicate/distributed/ports-debug "IN ~v" bs)
(turn! facet (lambda () (accept-bytes tr bs)))
(loop)]
[(? eof-object?)
(turn! facet (stop-current-facet))])))))
#:then
(and import-handler
(ref (entity #:name (list name 'import-handler)
#:assert (lambda (a _h) (import-handler a)))))
#:name name
#:initial-oid (and import-handler 0)
#:initial-ref initial-ref))