144 lines
5.1 KiB
Racket
144 lines
5.1 KiB
Racket
#lang racket/base
|
|
|
|
(require racket/port)
|
|
(require racket/class)
|
|
(require racket/match)
|
|
(require racket/tcp)
|
|
(require "conversation.rkt")
|
|
|
|
;; Public interface of this module: the two actor constructors plus the
;; `set-options` message structure (constructor, predicate and accessor).
(provide tcp-server-actor
         tcp-client-actor
         (struct-out set-options))
|
|
|
|
;; Prefab message structure carrying a replacement option list.
;; A connection actor in this file that receives it inside a `says`
;; message continues its loop with `new-values` as its options.
(struct set-options (new-values) #:prefab)
|
|
|
|
;; socket-name : any tcp-port-or-listener -> list
;; Builds an identifying name for the socket `s`: a five-element list
;; consisting of `role` followed by the four values produced by
;; (tcp-addresses s #t), i.e. local address, local port, remote address
;; and remote port.
(define (socket-name role s)
  (cons role
        (call-with-values (lambda () (tcp-addresses s #t))
                          list)))
|
|
|
|
;; option-value : (listof (list key value)) key [any] -> any
;; Looks `key` up (by `assq`, i.e. `eq?` comparison) in the association
;; list `options`.  When an entry exists its second element is returned,
;; even if that element is #f; otherwise `missing-value` (default #f) is
;; returned.
(define (option-value options key [missing-value #f])
  (define entry (assq key options))
  (if entry
      (cadr entry)
      missing-value))
|
|
|
|
;; tcp-server-actor : room options tcp-listen-arg ... -> room
;;
;; Opens a TCP listener by applying `tcp-listen` to `tcp-listener-args`
;; and starts a thread that joins `room` under a name derived from the
;; listener's addresses.  Returns `room` immediately.
;;
;; `options` is an association list read with `option-value`; the key
;; 'initial-accept-credit (default 0) gives the number of connections
;; that may be accepted before further credit is granted.
;;
;; The thread reacts to:
;;  - the handle's disconnected event: logs an error, closes the listener
;;    and stops;
;;  - (arrived who): `who` becomes the owner;
;;  - (departed who why): if `who` is the current owner the listener is
;;    closed and the thread stops, otherwise the message is ignored;
;;  - (says _ (credit _ amount) _): `amount` is added to the credit;
;;  - any other message: logged as a warning and ignored;
;;  - an incoming connection (only while credit is positive): the port
;;    pair is handed to `tcp-socket-actor`, the room it returns is said
;;    on the handle tagged 'accepted, and one unit of credit is consumed.
;;
;; If anything inside the thread raises, the listener is closed before
;; the exception is re-raised, so the listening socket is not leaked.
;; This mirrors the port cleanup done by `tcp-socket-actor`.
(define (tcp-server-actor room options . tcp-listener-args)
  (define listener (apply tcp-listen tcp-listener-args))
  (define name (socket-name 'listener listener))

  ;; Cleanup used only on the error path.  A failure of `tcp-close`
  ;; itself (presumably possible when the listener is already closed)
  ;; is swallowed so that it cannot mask the exception being propagated.
  (define (close-listener-quietly)
    (with-handlers ((exn:fail? void))
      (tcp-close listener)))

  (thread
   (lambda ()
     (with-handlers ((exn? (lambda (e)
                             (close-listener-quietly)
                             (raise e))))
       (let ((handle (join-room room name)))
         (log-info (format "Listening on ~v" name))
         (let loop ((owner #f)
                    (remaining-credit (option-value options 'initial-accept-credit 0)))
           (sync
            ;; The conversation itself went away: shut down.
            (handle-evt (send handle disconnected-evt)
                        (lambda (reason)
                          (log-error (format "~v: conversation closed: ~v" name reason))
                          (tcp-close listener)))
            ;; Messages from the room.
            (handle-evt (send handle listen-evt)
                        (match-lambda
                          ((arrived who)
                           (log-info (format "~v: New owner: ~v" name who))
                           (loop who remaining-credit))
                          ((departed who why)
                           (if (equal? owner who)
                               (begin (log-info (format "~v: Owner departed, closing" name))
                                      (tcp-close listener))
                               (loop owner remaining-credit)))
                          ((says _ (credit _ amount) _)
                           (define new-credit (+ remaining-credit amount))
                           (log-info (format "~v: Credit now ~v" name new-credit))
                           (loop owner new-credit))
                          (unexpected
                           (log-warning (format "~v: Ignoring message: ~v" name unexpected))
                           (loop owner remaining-credit))))
            ;; Only offer to accept while there is credit left.
            (if (positive? remaining-credit)
                (handle-evt (tcp-accept-evt listener)
                            (match-lambda
                              ((list i o)
                               (send handle say
                                     (tcp-socket-actor 'inbound-connection options i o)
                                     'accepted)
                               (loop owner (- remaining-credit 1)))))
                never-evt)))))))
  room)
|
|
|
|
;; tcp-client-actor : room options tcp-connect-arg ... -> room
;;
;; Opens an outbound TCP connection by applying `tcp-connect` to
;; `tcp-connect-args` and wraps the resulting port pair in a
;; `tcp-socket-actor` with role 'outbound-connection.  Returns the room
;; the connection actor joined.
;;
;; `room` is forwarded to `tcp-socket-actor` so the connection actor
;; joins the room supplied by the caller; previously the argument was
;; silently ignored and a fresh room was always created.  Passing #f for
;; `room` keeps that behaviour: `tcp-socket-actor` then falls back to its
;; own default room.
;;
;; `options` is passed through unchanged to `tcp-socket-actor`.
(define (tcp-client-actor room options . tcp-connect-args)
  (define-values (i o) (apply tcp-connect tcp-connect-args))
  (if room
      (tcp-socket-actor 'outbound-connection options i o room)
      (tcp-socket-actor 'outbound-connection options i o)))
|
|
|
|
;; tcp-socket-actor : any options input-port output-port [room] -> room
;;
;; Starts a thread that joins `room` (a fresh `(make-room)` by default)
;; under a name built from `role` and the addresses of `i`, and relays
;; data between the room and the port pair `i`/`o`.  Returns `room`
;; immediately.
;;
;; Options (an association list read with `option-value`):
;;  - 'initial-read-credit (default 0): starting read credit;
;;  - 'read-mode (default 'bytes): one of 'bytes, 'lines, 'bytes-lines;
;;    any other value raises an error once a read is attempted;
;;  - 'line-terminator (default 'any): passed to the line-reading events.
;;
;; Reading happens only while the credit is positive.  In 'bytes mode a
;; read asks for as many bytes as the current credit and costs the length
;; of the bytes delivered; in the line modes each line costs 1.  Data
;; read is said on the handle tagged 'data; end-of-file is said tagged
;; 'eof and resets the credit to 0.
;;
;; Messages from the room:
;;  - (arrived _) / (departed _ _): maintain a peer count; when the
;;    departure happens at a count of 1 the loop ends;
;;  - (says _ (credit _ amount) _): adds `amount` to the read credit;
;;  - an eof object: closes the output port;
;;  - bytes or a string: written to the output port, which is flushed;
;;  - (set-options new-values): replaces the current options;
;;  - anything else: logged as a warning and ignored.
;;
;; Both ports are closed when the loop ends, and also (before re-raising)
;; when an exception escapes the loop.
(define (tcp-socket-actor role options i o [room (make-room)])
  (define name (socket-name role i))

  (define (close-ports)
    (close-input-port i)
    (close-output-port o))

  ;; See read-line-evt and friends.
  (define (terminator-for opts)
    (option-value opts 'line-terminator 'any))

  ;; In the line-oriented modes every delivered value costs one credit.
  (define (one-per-line _v) 1)

  ;; Produces two values for the read mode selected in `opts`: a
  ;; procedure from the current credit to a read event on `i`, and a
  ;; procedure from a delivered value to the credit it consumes.
  (define (reader-for opts)
    (define mode (option-value opts 'read-mode 'bytes))
    (cond
      [(eq? mode 'bytes)
       (values (lambda (credit) (read-bytes-evt credit i))
               bytes-length)]
      [(eq? mode 'lines)
       (values (lambda (credit) (read-line-evt i (terminator-for opts)))
               one-per-line)]
      [(eq? mode 'bytes-lines)
       (values (lambda (credit) (read-bytes-line-evt i (terminator-for opts)))
               one-per-line)]
      [else
       (error 'tcp-socket-actor "Illegal read-evt mode ~v" mode)]))

  ;; Writes `payload` with `writer` and pushes it out immediately.
  (define (write-and-flush writer payload)
    (writer payload o)
    (flush-output o))

  (define (run)
    (define handle (join-room room name))
    (log-info (format "~v: New connection" name))
    (with-handlers ([exn? (lambda (e)
                            (close-ports)
                            (raise e))])
      (let loop ([opts options]
                 [peers 0]
                 [credit-left (option-value options 'initial-read-credit 0)])

        ;; Re-enter the loop with nothing changed.
        (define (continue)
          (loop opts peers credit-left))

        (define (on-disconnect reason)
          (log-error (format "~v: conversation closed: ~v" name reason)))

        (define (on-message msg)
          (match msg
            [(arrived _)
             (loop opts (+ peers 1) credit-left)]
            [(departed _ _)
             (cond
               [(= peers 1)
                (log-info (format "~v: Last peer departed" name))]
               [else
                (loop opts (- peers 1) credit-left)])]
            [(says _ (credit _ amount) _)
             (loop opts peers (+ credit-left amount))]
            [(says _ (? eof-object?) _)
             (close-output-port o)
             (continue)]
            [(says _ (? bytes? bs) _)
             ;; TODO: credit flow the other way?
             (write-and-flush write-bytes bs)
             (continue)]
            [(says _ (? string? s) _)
             ;; TODO: credit flow the other way?
             (write-and-flush write-string s)
             (continue)]
            [(says _ (set-options new-values) _)
             (loop new-values peers credit-left)]
            [unexpected
             (log-warning (format "~v: Ignoring message: ~v"
                                  name unexpected))
             (continue)]))

        (sync (handle-evt (send handle disconnected-evt) on-disconnect)
              (handle-evt (send handle listen-evt) on-message)
              ;; Only read from the socket while there is credit.
              (cond
                [(positive? credit-left)
                 (let-values ([(make-read-evt cost-of) (reader-for opts)])
                   (handle-evt
                    (make-read-evt credit-left)
                    (lambda (v)
                      (cond
                        [(eof-object? v)
                         (send handle say v 'eof)
                         (loop opts peers 0)]
                        [else
                         (send handle say v 'data)
                         (loop opts peers (- credit-left (cost-of v)))]))))]
                [else never-evt])))
      ;; Normal end of the loop: release both ports.
      (close-ports)))

  (thread run)
  room)
|