#lang racket/base (require racket/port) (require racket/class) (require racket/match) (require racket/tcp) (require "conversation.rkt") (require "standard-thread.rkt") (provide (struct-out tcp-option) (struct-out tcp-credit) (struct-out tcp-data) (struct-out tcp-eof) (struct-out tcp-address) (struct-out tcp-stream) flip-stream wild-address wild-stream tcp-driver) (struct tcp-option (name value) #:prefab) (struct tcp-credit (amount) #:prefab) (struct tcp-data (chunk) #:prefab) (struct tcp-eof () #:prefab) ;; The address of a TCP endpoint. ;; ;; The field host may be set to #f, in which case it means "some (or ;; all) local interface(s)". If port is a number, it's treated as a ;; TCP port directly; otherwise, it's treated as a local handle for an ;; automatically-allocated port. (struct tcp-address (host port) #:prefab) ;; Identifies a (unidirectional) TCP stream. A single TCP connection ;; has two of these, one in each direction. (struct tcp-stream (source sink) #:prefab) (define (socket-name s) (call-with-values (lambda () (tcp-addresses s #t)) list)) (define (flip-stream s) (tcp-stream (tcp-stream-sink s) (tcp-stream-source s))) (define (wild-address) (tcp-address (wild) (wild))) (define (wild-stream) (tcp-stream (wild-address) (wild-address))) (define (tcp-driver room) (standard-thread (lambda () (define handle (join-room room 'DRIVER)) (send handle assert! (topic-publisher (wild-stream) #:virtual? #t)) (let loop () (match (send handle listen) [(arrived (topic 'subscriber (tcp-stream (tcp-address (? wild?) (? wild?)) (and local-address (tcp-address #f local-handle))) _)) ;;(write `(listening on ,local-handle)) (newline) (define listener (tcp-listen (if (number? local-handle) local-handle 0) 4 #t)) (standard-thread (lambda () (tcp-listen-driver room local-address listener))) (loop)] [(arrived (topic 'subscriber (and name (tcp-stream (tcp-address remote-host remote-port) (tcp-address #f local-handle))) _)) (when (not (number? local-handle)) ;;(write `(connecting to ,remote-host ,remote-port with handle ,local-handle)) (newline) (define-values (i o) (tcp-connect remote-host remote-port #f (if (number? local-handle) local-handle #f))) (standard-thread (lambda () (tcp-read-driver room name i))) (standard-thread (lambda () (tcp-write-driver room (flip-stream name) o)))) (loop)] [_ (loop)]))))) (define (tcp-read-driver room name i) (define handle (join-room room 'READER)) (send handle assert! (topic-publisher name)) (let loop ((credit 0) (mode 'line)) (sync (wrap-evt (send handle listen-evt) (lambda (m) (match m [(departed (topic 'subscriber (== name) _) _) ;;(write `(closing reader ,name)) (newline) (tcp-abandon-port i)] [(says (topic 'subscriber (== name) _) (tcp-option 'mode new-mode)) (loop credit new-mode)] [(says (topic 'subscriber (== name) _) (tcp-credit amount)) (loop (+ credit amount) mode)] [_ (loop credit mode)]))) (if (positive? credit) (match mode ['line (handle-evt (read-line-evt i 'any) (lambda (v) (cond [(eof-object? v) (send handle say (topic-publisher name) (tcp-eof)) (loop 0 mode)] [else ;;(write `(relaying line ,v from ,name)) (newline) (send handle say (topic-publisher name) (tcp-data v)) (loop (- credit 1) mode)])))] ['bytes (handle-evt (read-bytes-evt credit i) (lambda (v) (cond [(eof-object? v) (send handle say (topic-publisher name) (tcp-eof)) (loop 0 mode)] [else ;;(write `(relaying bytes ,v from ,name)) (newline) (send handle say (topic-publisher name) (tcp-data v)) (loop (- credit (bytes-length v)) mode)])))]) never-evt)))) (define (tcp-write-driver room name o) (define handle (join-room room 'WRITER)) (send handle assert! (topic-subscriber name)) (let loop ((mode 'string)) (sync (wrap-evt (send handle listen-evt) (lambda (m) (match m [(departed (topic 'publisher (== name) _) _) ;;(write `(closing writer ,name)) (newline) (tcp-abandon-port o)] [(says (topic 'publisher (== name) _) (tcp-option 'mode new-mode)) (loop new-mode)] [(says (topic 'publisher (== name) _) (tcp-data v)) ;;(write `(writing ,v to ,name)) (newline) (define credit-amount (match mode ['string (write-string v o) 1] ['bytes (write-bytes v o)])) (flush-output o) (send handle say (topic-subscriber name) (tcp-credit credit-amount)) (loop mode)] [_ (loop mode)])))))) (define (tcp-listen-driver room local-address listener) (define handle (join-room room 'LISTENER)) (define name (tcp-stream (wild-address) local-address)) (send handle assert! (topic-publisher name #:virtual? #t)) (let loop () (sync (wrap-evt (send handle listen-evt) (lambda (m) (match m [(departed (topic 'subscriber (== name) _) _) ;;(write `(closing listener ,name)) (newline) (tcp-close listener)] [_ ;;(write `(listener heard ,m)) (newline) (loop)]))) (wrap-evt (tcp-accept-evt listener) (lambda (v) (match-define (list i o) v) (match-define (list _ _ remote-host remote-port) (socket-name i)) (define new-stream (tcp-stream (tcp-address remote-host remote-port) local-address)) (standard-thread (lambda () (tcp-read-driver room new-stream i))) (standard-thread (lambda () (tcp-write-driver room (flip-stream new-stream) o))) (loop))))))