racket-matrix-2012/presence/conversation-socket.rkt

167 lines
5.6 KiB
Racket

#lang racket/base
(require racket/port)
(require racket/class)
(require racket/match)
(require racket/tcp)
(require "conversation.rkt")
(require "standard-thread.rkt")
(provide (struct-out tcp-option)
(struct-out tcp-credit)
(struct-out tcp-data)
(struct-out tcp-eof)
(struct-out tcp-address)
(struct-out tcp-stream)
flip-stream
wild-address
wild-stream
tcp-driver)
(struct tcp-option (name value) #:prefab)
(struct tcp-credit (amount) #:prefab)
(struct tcp-data (chunk) #:prefab)
(struct tcp-eof () #:prefab)
;; The address of a TCP endpoint.
;;
;; The field host may be set to #f, in which case it means "some (or
;; all) local interface(s)". If port is a number, it's treated as a
;; TCP port directly; otherwise, it's treated as a local handle for an
;; automatically-allocated port.
(struct tcp-address (host port) #:prefab)
;; Identifies a (unidirectional) TCP stream. A single TCP connection
;; has two of these, one in each direction.
(struct tcp-stream (source sink) #:prefab)
(define (socket-name s)
(call-with-values (lambda () (tcp-addresses s #t)) list))
(define (flip-stream s)
(tcp-stream (tcp-stream-sink s) (tcp-stream-source s)))
(define (wild-address) (tcp-address (wild) (wild)))
(define (wild-stream) (tcp-stream (wild-address) (wild-address)))
(define (tcp-driver room)
(standard-thread
(lambda ()
(define handle (join-room room 'DRIVER))
(send handle assert! (topic-publisher (wild-stream) #:virtual? #t))
(let loop ()
(match (send handle listen)
[(arrived (topic 'subscriber
(tcp-stream (tcp-address (? wild?) (? wild?))
(and local-address (tcp-address #f local-handle)))
_))
;;(write `(listening on ,local-handle)) (newline)
(define listener (tcp-listen (if (number? local-handle) local-handle 0) 4 #t))
(standard-thread (lambda () (tcp-listen-driver room local-address listener)))
(loop)]
[(arrived (topic 'subscriber
(and name (tcp-stream (tcp-address remote-host remote-port)
(tcp-address #f local-handle)))
_))
(when (not (number? local-handle))
;;(write `(connecting to ,remote-host ,remote-port with handle ,local-handle)) (newline)
(define-values (i o)
(tcp-connect remote-host remote-port
#f (if (number? local-handle) local-handle #f)))
(standard-thread (lambda () (tcp-read-driver room name i)))
(standard-thread (lambda () (tcp-write-driver room (flip-stream name) o))))
(loop)]
[_
(loop)])))))
(define (tcp-read-driver room name i)
(define handle (join-room room 'READER))
(send handle assert! (topic-publisher name))
(let loop ((credit 0) (mode 'line))
(sync (wrap-evt (send handle listen-evt)
(lambda (m)
(match m
[(departed (topic 'subscriber (== name) _) _)
;;(write `(closing reader ,name)) (newline)
(tcp-abandon-port i)]
[(says (topic 'subscriber (== name) _) (tcp-option 'mode new-mode))
(loop credit new-mode)]
[(says (topic 'subscriber (== name) _) (tcp-credit amount))
(loop (+ credit amount) mode)]
[_
(loop credit mode)])))
(if (positive? credit)
(match mode
['line (handle-evt (read-line-evt i 'any)
(lambda (v)
(cond
[(eof-object? v)
(send handle say (topic-publisher name) (tcp-eof))
(loop 0 mode)]
[else
;;(write `(relaying line ,v from ,name)) (newline)
(send handle say (topic-publisher name) (tcp-data v))
(loop (- credit 1) mode)])))]
['bytes (handle-evt (read-bytes-evt credit i)
(lambda (v)
(cond
[(eof-object? v)
(send handle say (topic-publisher name) (tcp-eof))
(loop 0 mode)]
[else
;;(write `(relaying bytes ,v from ,name)) (newline)
(send handle say (topic-publisher name) (tcp-data v))
(loop (- credit (bytes-length v)) mode)])))])
never-evt))))
(define (tcp-write-driver room name o)
(define handle (join-room room 'WRITER))
(send handle assert! (topic-subscriber name))
(let loop ((mode 'string))
(sync (wrap-evt (send handle listen-evt)
(lambda (m)
(match m
[(departed (topic 'publisher (== name) _) _)
;;(write `(closing writer ,name)) (newline)
(tcp-abandon-port o)]
[(says (topic 'publisher (== name) _) (tcp-option 'mode new-mode))
(loop new-mode)]
[(says (topic 'publisher (== name) _) (tcp-data v))
;;(write `(writing ,v to ,name)) (newline)
(define credit-amount
(match mode
['string (write-string v o)
1]
['bytes (write-bytes v o)]))
(flush-output o)
(send handle say (topic-subscriber name) (tcp-credit credit-amount))
(loop mode)]
[_
(loop mode)]))))))
(define (tcp-listen-driver room local-address listener)
(define handle (join-room room 'LISTENER))
(define name (tcp-stream (wild-address) local-address))
(send handle assert! (topic-publisher name #:virtual? #t))
(let loop ()
(sync (wrap-evt (send handle listen-evt)
(lambda (m)
(match m
[(departed (topic 'subscriber (== name) _) _)
;;(write `(closing listener ,name)) (newline)
(tcp-close listener)]
[_
;;(write `(listener heard ,m)) (newline)
(loop)])))
(wrap-evt (tcp-accept-evt listener)
(lambda (v)
(match-define (list i o) v)
(match-define (list _ _ remote-host remote-port) (socket-name i))
(define new-stream
(tcp-stream (tcp-address remote-host remote-port) local-address))
(standard-thread
(lambda () (tcp-read-driver room new-stream i)))
(standard-thread
(lambda () (tcp-write-driver room (flip-stream new-stream) o)))
(loop))))))