racket-ssh-2012/conversation-socket.rkt

144 lines
5.1 KiB
Racket

#lang racket/base
(require racket/port)
(require racket/class)
(require racket/match)
(require racket/tcp)
(require "conversation.rkt")
(provide (struct-out set-options)
tcp-server-actor
tcp-client-actor)
;; Message body carrying a replacement option list. When a peer says a
;; set-options value to a socket actor, the actor's loop continues with
;; new-values as its options in place of the ones it had been using.
(struct set-options (new-values) #:prefab)
;; socket-name : any (or tcp-port tcp-listener) -> list
;; Builds the identifying name used for an actor: the role followed by the
;; four values tcp-addresses reports for s when port numbers are requested
;; (local address, local port, remote address, remote port).
(define (socket-name role s)
  (call-with-values
   (lambda () (tcp-addresses s #t))
   (lambda (local-ip local-port remote-ip remote-port)
     (list role local-ip local-port remote-ip remote-port))))
;; option-value : (listof (list key value)) key [any] -> any
;; Looks key up (by eq?) in an association list whose entries are two-element
;; lists and returns the second element of the first matching entry. When no
;; entry matches, returns missing-value, which defaults to #f.
(define (option-value options key [missing-value #f])
  (let ((entry (assq key options)))
    (if entry
        (cadr entry)
        missing-value)))
;; tcp-server-actor : room options tcp-listen-arg ... -> room
;;
;; Opens a TCP listener by applying tcp-listen to tcp-listener-args, then
;; starts a thread that joins `room` under the listener's socket-name and
;; services it until the listener is closed. Returns `room`.
;;
;; Behaviour of the actor thread:
;;  - The most recently arrived participant is remembered as the owner; when
;;    that participant departs, the listener is closed and the thread ends.
;;  - A said `credit` value adds its amount to the accept credit, which starts
;;    at the 'initial-accept-credit option (default 0).
;;  - While accept credit is positive, incoming connections are accepted. Each
;;    one consumes one credit, gets its own socket actor (in a fresh room), and
;;    that actor's room is said into `room` together with 'accepted.
;;  - If the conversation is disconnected, the listener is closed.
;;  - If anything in the thread raises, the listener is closed (best effort)
;;    and the exception is re-raised, mirroring how the socket actor releases
;;    its ports on failure.
(define (tcp-server-actor room options . tcp-listener-args)
  (define listener (apply tcp-listen tcp-listener-args))
  (define name (socket-name 'listener listener))
  ;; Error-path cleanup only. The failure being handled may itself have come
  ;; from closing the listener, so a second failure here is ignored in order
  ;; to let the original exception propagate.
  (define (close-listener/best-effort)
    (with-handlers ((exn:fail? void))
      (tcp-close listener)))
  (thread
   (lambda ()
     (with-handlers
         ((exn? (lambda (e)
                  (close-listener/best-effort)
                  (raise e))))
       (let ((handle (join-room room name)))
         (log-info (format "Listening on ~v" name))
         (let loop ((owner #f)
                    (remaining-credit (option-value options 'initial-accept-credit 0)))
           (sync
            (handle-evt (send handle disconnected-evt)
                        (lambda (reason)
                          (log-error (format "~v: conversation closed: ~v" name reason))
                          (tcp-close listener)))
            (handle-evt (send handle listen-evt)
                        (match-lambda
                          ((arrived who)
                           (log-info (format "~v: New owner: ~v" name who))
                           (loop who remaining-credit))
                          ((departed who why)
                           (if (equal? owner who)
                               (begin (log-info (format "~v: Owner departed, closing" name))
                                      (tcp-close listener))
                               (loop owner remaining-credit)))
                          ((says _ (credit _ amount) _)
                           (define new-credit (+ remaining-credit amount))
                           (log-info (format "~v: Credit now ~v" name new-credit))
                           (loop owner new-credit))
                          (unexpected
                           (log-warning (format "~v: Ignoring message: ~v" name unexpected))
                           (loop owner remaining-credit))))
            ;; Only offer to accept while credit remains; never-evt keeps the
            ;; sync shape uniform when accepting is paused.
            (if (positive? remaining-credit)
                (handle-evt (tcp-accept-evt listener)
                            (match-lambda
                              ((list i o)
                               (send handle say
                                     (tcp-socket-actor 'inbound-connection options i o)
                                     'accepted)
                               (loop owner (- remaining-credit 1)))))
                never-evt)))))))
  room)
;; tcp-client-actor : room options tcp-connect-arg ... -> room
;;
;; Opens an outbound TCP connection by applying tcp-connect to
;; tcp-connect-args and starts a socket actor for it that joins `room`.
;; Returns the room the socket actor joined.
(define (tcp-client-actor room options . tcp-connect-args)
  (define-values (i o) (apply tcp-connect tcp-connect-args))
  ;; Hand the caller's room to the socket actor. Leaving it out made
  ;; tcp-socket-actor fall back to its default fresh (make-room), so the
  ;; `room` argument supplied by the caller was silently ignored.
  (tcp-socket-actor 'outbound-connection options i o room))
;; tcp-socket-actor : symbol options input-port output-port [room] -> room
;;
;; Starts a thread that bridges the TCP connection (i, o) with `room`, which
;; defaults to a fresh room, and returns that room. The actor joins the room
;; under (socket-name role i).
;;
;; Loop state: the current options, the number of peers present, and the
;; remaining read credit (initially the 'initial-read-credit option, default 0).
;;
;; Messages from the room:
;;  - arrived / departed adjust the peer count; when the count is 1 and a peer
;;    departs, the loop ends.
;;  - a said `credit` adds its amount to the read credit.
;;  - a said eof object closes the output port.
;;  - said bytes or strings are written to the output port and flushed.
;;  - a said set-options replaces the current options.
;;  - anything else is logged and ignored.
;;
;; Reading from the socket happens only while read credit is positive, using
;; the 'read-mode option (bytes, lines, or bytes-lines). Data is said into the
;; room with 'data; end-of-file is said with 'eof and zeroes the credit.
;;
;; Both ports are closed when the loop ends, and also (before re-raising) when
;; the loop raises an exception.
(define (tcp-socket-actor role options i o [room (make-room)])
  (define name (socket-name role i))

  ;; Shut both directions of the connection.
  (define (close-ports)
    (close-input-port i)
    (close-output-port o))

  ;; Terminator mode for the line readers; see read-line-evt and friends.
  (define (compute-terminator opts)
    (option-value opts 'line-terminator 'any))

  ;; Returns two values for the configured 'read-mode (default 'bytes):
  ;; a procedure from the available credit to a read event on i, and a
  ;; procedure from a value that was read to the credit it consumed.
  (define (compute-read-evt opts)
    (define read-mode (option-value opts 'read-mode 'bytes))
    ;; Line-oriented modes charge one credit per line, whatever its length.
    (define (one-unit v) 1)
    (match read-mode
      ('bytes
       (values (lambda (credit) (read-bytes-evt credit i))
               bytes-length))
      ('lines
       (values (lambda (credit) (read-line-evt i (compute-terminator opts)))
               one-unit))
      ('bytes-lines
       (values (lambda (credit) (read-bytes-line-evt i (compute-terminator opts)))
               one-unit))
      (_
       (error 'tcp-socket-actor "Illegal read-evt mode ~v" read-mode))))

  (thread
   (lambda ()
     (define handle (join-room room name))
     (log-info (format "~v: New connection" name))
     (with-handlers ((exn? (lambda (e)
                             (close-ports)
                             (raise e))))
       (let loop ((opts options)
                  (peers 0)
                  (credit-left (option-value options 'initial-read-credit 0)))
         ;;(write `(connection-loop ,opts ,peers ,credit-left)) (newline)

         ;; Go around again with the state untouched.
         (define (continue)
           (loop opts peers credit-left))

         ;; The conversation went away: log it and fall out of the loop.
         (define closed-evt
           (handle-evt (send handle disconnected-evt)
                       (lambda (reason)
                         (log-error (format "~v: conversation closed: ~v" name reason)))))

         ;; Something happened in the room.
         (define message-evt
           (handle-evt
            (send handle listen-evt)
            (match-lambda
              ((arrived _)
               (loop opts (+ peers 1) credit-left))
              ((departed _ _)
               (cond
                 ((= peers 1)
                  (log-info (format "~v: Last peer departed" name)))
                 (else
                  (loop opts (- peers 1) credit-left))))
              ((says _ (credit _ amount) _)
               (loop opts peers (+ credit-left amount)))
              ((says _ (? eof-object?) _)
               (close-output-port o)
               (continue))
              ((says _ (? bytes? bs) _)
               ;; TODO: credit flow the other way?
               (write-bytes bs o)
               (flush-output o)
               (continue))
              ((says _ (? string? s) _)
               ;; TODO: credit flow the other way?
               (write-string s o)
               (flush-output o)
               (continue))
              ((says _ (set-options new-values) _)
               (loop new-values peers credit-left))
              (unexpected
               (log-warning (format "~v: Ignoring message: ~v"
                                    name unexpected))
               (continue)))))

         ;; Input from the socket, offered only while read credit remains.
         (define read-evt
           (if (positive? credit-left)
               (let-values (((make-evt cost-of) (compute-read-evt opts)))
                 (handle-evt (make-evt credit-left)
                             (lambda (v)
                               (cond
                                 ((eof-object? v)
                                  (send handle say v 'eof)
                                  (loop opts peers 0))
                                 (else
                                  (send handle say v 'data)
                                  (loop opts peers (- credit-left (cost-of v))))))))
               never-evt))

         (sync closed-evt message-evt read-evt)))
     (close-ports)))
  room)