2012-03-11 01:26:02 +00:00
|
|
|
#lang racket/base
|
|
|
|
|
|
|
|
(require racket/port)
|
|
|
|
(require racket/class)
|
|
|
|
(require racket/match)
|
|
|
|
(require racket/tcp)
|
|
|
|
(require "conversation.rkt")
|
2012-03-11 17:08:04 +00:00
|
|
|
(require "standard-thread.rkt")
|
2012-03-11 01:26:02 +00:00
|
|
|
|
2012-03-11 17:08:04 +00:00
|
|
|
(provide (struct-out tcp-option)
|
|
|
|
(struct-out tcp-credit)
|
|
|
|
(struct-out tcp-data)
|
|
|
|
(struct-out tcp-eof)
|
|
|
|
(struct-out tcp-address)
|
|
|
|
(struct-out tcp-stream)
|
|
|
|
flip-stream
|
|
|
|
wild-address
|
|
|
|
wild-stream
|
|
|
|
tcp-driver)
|
2012-03-11 01:26:02 +00:00
|
|
|
|
2012-03-11 17:08:04 +00:00
|
|
|
;; tcp-option: message body carrying a named setting for a stream driver.
;; The drivers in this file react to the option named 'mode.
(struct tcp-option
  (name    ; symbol naming the option
   value)  ; the option's new value
  #:prefab)
|
|
|
|
;; tcp-credit: message body granting flow-control credit.
;; The read driver in this file adds `amount` to its running credit.
(struct tcp-credit
  (amount)  ; number of credit units being granted
  #:prefab)
|
2012-03-13 19:08:32 +00:00
|
|
|
;; tcp-data: message body carrying one unit of stream payload.
;; The read driver fills `chunk` with a line or a byte string it read;
;; the write driver writes `chunk` to its output port.
(struct tcp-data
  (chunk)  ; the payload
  #:prefab)
|
2012-03-11 17:08:04 +00:00
|
|
|
;; tcp-eof: field-less message body that the read driver publishes when
;; it reads an end-of-file from its input port.
(struct tcp-eof
  ()
  #:prefab)
|
2012-03-11 01:26:02 +00:00
|
|
|
|
2012-03-11 17:08:04 +00:00
|
|
|
;; tcp-address: names one endpoint of a TCP conversation.
;;
;; host -- a host designator, or #f, meaning "some (or all) local
;;         interface(s)".
;; port -- when it is a number it is used directly as the TCP port;
;;         any other value is a local handle standing for an
;;         automatically-allocated port.
(struct tcp-address
  (host
   port)
  #:prefab)
|
2012-03-11 01:26:02 +00:00
|
|
|
|
2012-03-11 17:08:04 +00:00
|
|
|
;; tcp-stream: identifies ONE direction of a TCP connection. Every
;; connection is therefore described by two tcp-streams, one per
;; direction.
;;
;; source -- address the data comes from
;; sink   -- address the data goes to
(struct tcp-stream
  (source
   sink)
  #:prefab)
|
2012-03-11 01:26:02 +00:00
|
|
|
|
2012-03-11 17:08:04 +00:00
|
|
|
;; socket-name : tcp-port-or-listener -> (list local-host local-port
;;                                             remote-host remote-port)
;; Collects the four values returned by (tcp-addresses s #t) into a
;; single list, in the order tcp-addresses produces them.
(define (socket-name s)
  (define-values (local-host local-port remote-host remote-port)
    (tcp-addresses s #t))
  (list local-host local-port remote-host remote-port))
|
2012-03-11 01:26:02 +00:00
|
|
|
|
2012-03-11 17:08:04 +00:00
|
|
|
;; flip-stream : tcp-stream -> tcp-stream
;; Returns the stream for the opposite direction of the same
;; connection: the old sink becomes the source and vice versa.
(define (flip-stream s)
  (let* ([new-source (tcp-stream-sink s)]
         [new-sink (tcp-stream-source s)])
    (tcp-stream new-source new-sink)))
|
2012-03-11 01:26:02 +00:00
|
|
|
|
2012-03-11 17:08:04 +00:00
|
|
|
;; wild-address : -> tcp-address
;; Builds a tcp-address whose host and port are both fresh (wild)
;; values, for use in topic patterns.
(define (wild-address)
  (let* ([any-host (wild)]
         [any-port (wild)])
    (tcp-address any-host any-port)))
|
|
|
|
;; wild-stream : -> tcp-stream
;; Builds a tcp-stream whose source and sink are both wild addresses.
(define (wild-stream)
  (let* ([any-source (wild-address)]
         [any-sink (wild-address)])
    (tcp-stream any-source any-sink)))
|
|
|
|
|
|
|
|
;; tcp-driver : room -> result of standard-thread
;;
;; Starts the top-level TCP driver in its own standard-thread. The
;; thread joins `room` as 'DRIVER, asserts a virtual publisher topic
;; covering every stream, and then reacts to arriving subscribers:
;;
;;  * subscriber to (wild remote -> local address with host #f):
;;    open a listening socket and start a tcp-listen-driver for it;
;;  * subscriber to (specific remote -> local address with host #f)
;;    whose local port is NOT a number: open an outbound connection and
;;    start a read driver and a write driver for it;
;;  * anything else is ignored.
(define (tcp-driver room)
  ;; Listen on the numeric port if one was given, otherwise on port 0;
  ;; backlog 4, address reuse enabled.
  (define (start-listener! local-address local-handle)
    (define port-number (if (number? local-handle) local-handle 0))
    (define listener (tcp-listen port-number 4 #t))
    (standard-thread
     (lambda () (tcp-listen-driver room local-address listener))))

  ;; Connect out and start one driver per direction. This is only
  ;; reached when the local handle is not a number, so no local port is
  ;; ever requested from tcp-connect.
  (define (start-connection! name remote-host remote-port)
    (define-values (i o) (tcp-connect remote-host remote-port #f #f))
    (standard-thread (lambda () (tcp-read-driver room name i)))
    (standard-thread
     (lambda () (tcp-write-driver room (flip-stream name) o))))

  (standard-thread
   (lambda ()
     (define handle (join-room room 'DRIVER))
     (send handle assert! (topic-publisher (wild-stream) #:virtual? #t))
     (let next-message ()
       (match (send handle listen)
         [(arrived (topic 'subscriber
                          (tcp-stream (tcp-address (? wild?) (? wild?))
                                      (and local-address
                                           (tcp-address #f local-handle)))
                          _))
          (start-listener! local-address local-handle)
          (next-message)]
         [(arrived (topic 'subscriber
                          (and name
                               (tcp-stream (tcp-address remote-host remote-port)
                                           (tcp-address #f local-handle)))
                          _))
          ;; A numeric local port is skipped here; no connection is made.
          (unless (number? local-handle)
            (start-connection! name remote-host remote-port))
          (next-message)]
         [_
          (next-message)])))))
|
|
|
|
|
|
|
|
;; tcp-read-driver : room tcp-stream input-port -> any
;;
;; Relays data read from the TCP input port `i` into `room` as the
;; publisher of stream `name`. Joins the room as 'READER.
;;
;; State carried by the loop:
;;   credit -- how much the driver may still read; starts at 0 and is
;;             increased by (tcp-credit amount) messages from a
;;             subscriber of `name`. Nothing is read while credit is
;;             not positive.
;;   mode   -- 'line  : read one line at a time; each line costs 1 credit
;;             'bytes : read `credit` bytes at once; costs the number of
;;                      bytes actually delivered
;;             Starts as 'line; changed by (tcp-option 'mode new-mode).
;;
;; On end-of-file a (tcp-eof) is published and credit is reset to 0.
;; When a subscriber of `name` departs, the port is abandoned and the
;; driver returns.
(define (tcp-read-driver room name i)
  (define handle (join-room room 'READER))
  (send handle assert! (topic-publisher name))
  (let loop ((credit 0) (mode 'line))
    ;; Event that publishes whatever `source-evt` yields and continues
    ;; the loop. `cost` maps a delivered value to the credit it uses up.
    (define (relay-evt source-evt cost)
      (handle-evt source-evt
                  (lambda (v)
                    (cond
                      [(eof-object? v)
                       (send handle say (topic-publisher name) (tcp-eof))
                       (loop 0 mode)]
                      [else
                       (send handle say (topic-publisher name) (tcp-data v))
                       (loop (- credit (cost v)) mode)]))))
    (sync
     ;; handle-evt, not wrap-evt: wrap-evt runs its procedure in
     ;; non-tail position with breaks disabled, so looping from inside
     ;; it would nest one continuation frame (and one break-disabled
     ;; region) per control message for the life of the connection.
     (handle-evt (send handle listen-evt)
                 (lambda (m)
                   (match m
                     [(departed (topic 'subscriber (== name) _) _)
                      (tcp-abandon-port i)]
                     [(says (topic 'subscriber (== name) _)
                            (tcp-option 'mode new-mode))
                      (loop credit new-mode)]
                     [(says (topic 'subscriber (== name) _)
                            (tcp-credit amount))
                      (loop (+ credit amount) mode)]
                     [_
                      (loop credit mode)])))
     (if (positive? credit)
         (match mode
           ['line (relay-evt (read-line-evt i 'any) (lambda (v) 1))]
           ['bytes (relay-evt (read-bytes-evt credit i) bytes-length)])
         never-evt))))
|
|
|
|
|
|
|
|
;; tcp-write-driver : room tcp-stream output-port -> any
;;
;; Writes data published on stream `name` in `room` to the TCP output
;; port `o`. Joins the room as 'WRITER and subscribes to `name`.
;;
;; mode -- 'string : each tcp-data chunk is written with write-string
;;                   and acknowledged with 1 credit
;;         'bytes  : each chunk is written with write-bytes and
;;                   acknowledged with its length in bytes
;;         Starts as 'string; changed by (tcp-option 'mode new-mode)
;;         from a publisher of `name`.
;;
;; After every write the port is flushed and a (tcp-credit n) is sent
;; back. When a publisher of `name` departs, the port is abandoned and
;; the driver returns.
(define (tcp-write-driver room name o)
  (define handle (join-room room 'WRITER))
  (send handle assert! (topic-subscriber name))
  (let loop ((mode 'string))
    (sync
     ;; handle-evt, not wrap-evt: wrap-evt runs its procedure in
     ;; non-tail position with breaks disabled, so looping from inside
     ;; it would nest one continuation frame per message handled.
     (handle-evt (send handle listen-evt)
                 (lambda (m)
                   (match m
                     [(departed (topic 'publisher (== name) _) _)
                      (tcp-abandon-port o)]
                     [(says (topic 'publisher (== name) _)
                            (tcp-option 'mode new-mode))
                      (loop new-mode)]
                     [(says (topic 'publisher (== name) _) (tcp-data v))
                      (define credit-amount
                        (match mode
                          ['string
                           (write-string v o)
                           1]
                          ['bytes
                           ;; write-bytes returns void, so the credit
                           ;; has to be computed from the chunk itself.
                           (write-bytes v o)
                           (bytes-length v)]))
                      (flush-output o)
                      (send handle say
                            (topic-subscriber name)
                            (tcp-credit credit-amount))
                      (loop mode)]
                     [_
                      (loop mode)]))))))
|
|
|
|
|
|
|
|
;; tcp-listen-driver : room tcp-address tcp-listener -> any
;;
;; Accepts connections on `listener` on behalf of `local-address`.
;; Joins `room` as 'LISTENER and asserts a virtual publisher topic for
;; the stream (wild address -> local-address).
;;
;; For every accepted connection it looks up the peer's host and port,
;; names the inbound stream (peer -> local-address), and starts a read
;; driver for that stream and a write driver for the flipped stream,
;; each in its own standard-thread.
;;
;; When a subscriber of the listener's stream departs, the listener is
;; closed and the driver returns.
(define (tcp-listen-driver room local-address listener)
  (define handle (join-room room 'LISTENER))
  (define name (tcp-stream (wild-address) local-address))
  (send handle assert! (topic-publisher name #:virtual? #t))
  (let loop ()
    (sync
     ;; Both events use handle-evt rather than wrap-evt. wrap-evt runs
     ;; its procedure in non-tail position with breaks disabled, so
     ;; looping from inside it would nest one continuation frame per
     ;; message heard or connection accepted.
     (handle-evt (send handle listen-evt)
                 (lambda (m)
                   (match m
                     [(departed (topic 'subscriber (== name) _) _)
                      (tcp-close listener)]
                     [_
                      (loop)])))
     (handle-evt (tcp-accept-evt listener)
                 (lambda (v)
                   (match-define (list i o) v)
                   ;; socket-name yields local host/port first; only
                   ;; the remote half is needed here.
                   (match-define (list _ _ remote-host remote-port)
                     (socket-name i))
                   (define new-stream
                     (tcp-stream (tcp-address remote-host remote-port)
                                 local-address))
                   (standard-thread
                    (lambda () (tcp-read-driver room new-stream i)))
                   (standard-thread
                    (lambda ()
                      (tcp-write-driver room (flip-stream new-stream) o)))
                   (loop))))))
|