TCP driver; simple TCP example; chat room TCP example.
This commit is contained in:
parent
9782abba70
commit
96247daae3
|
@ -0,0 +1,183 @@
|
|||
#lang racket/base
|
||||
|
||||
(require racket/match)
|
||||
(require (prefix-in tcp: racket/tcp))
|
||||
(require (only-in racket/port read-bytes-avail!-evt))
|
||||
(require (only-in web-server/private/util exn->string))
|
||||
(require "../main.rkt")
|
||||
(require "../demand-matcher.rkt")
|
||||
|
||||
(require racket/unit)
|
||||
(require net/tcp-sig)
|
||||
(require net/tcp-unit)
|
||||
|
||||
(provide (struct-out tcp-address)
|
||||
(struct-out tcp-handle)
|
||||
(struct-out tcp-listener)
|
||||
(struct-out tcp-channel)
|
||||
spawn-tcp-driver)
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
;; Protocol messages
|
||||
|
||||
(struct tcp-address (host port) #:prefab)
|
||||
(struct tcp-handle (id) #:prefab)
|
||||
(struct tcp-listener (port) #:prefab)
|
||||
|
||||
(struct tcp-channel (source destination subpacket) #:prefab)
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
;; Ground-level communication messages
|
||||
|
||||
(struct tcp-accepted (remote-addr local-addr cin cout) #:prefab)
|
||||
;; tcp-channel does double-duty as a ground-level message as well
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
;; Driver
|
||||
|
||||
(define (spawn-tcp-driver)
|
||||
(list (spawn-demand-matcher (advertise (observe (tcp-channel ? (?! (tcp-listener ?)) ?)))
|
||||
(advertise (advertise (tcp-channel ? (?! (tcp-listener ?)) ?)))
|
||||
spawn-tcp-listener)
|
||||
(spawn-demand-matcher (observe (tcp-channel (?! (tcp-handle ?)) (?! (tcp-address ? ?)) ?))
|
||||
(advertise (tcp-channel (?! (tcp-handle ?)) (?! (tcp-address ? ?)) ?))
|
||||
spawn-tcp-connection)))
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
;; Listener
|
||||
|
||||
(struct listener-state (control-ch server-addr) #:transparent)
|
||||
|
||||
(define (tcp-listener-thread control-ch listener server-addr)
|
||||
(let loop ((blocked? #t))
|
||||
(sync (handle-evt control-ch
|
||||
(match-lambda
|
||||
['unblock (loop #f)]
|
||||
['quit (void)]))
|
||||
(if blocked?
|
||||
never-evt
|
||||
(handle-evt (tcp:tcp-accept-evt listener)
|
||||
(lambda (cin+cout)
|
||||
(match-define (list cin cout) cin+cout)
|
||||
(define-values (local-hostname local-port remote-hostname remote-port)
|
||||
(tcp:tcp-addresses cin #t))
|
||||
(send-ground-message
|
||||
(tcp-accepted (tcp-address remote-hostname remote-port)
|
||||
server-addr
|
||||
cin
|
||||
cout))
|
||||
(loop blocked?))))))
|
||||
(tcp:tcp-close listener))
|
||||
|
||||
(define (tcp-listener-behavior e state)
|
||||
(match e
|
||||
[(? patch? p)
|
||||
(define ch (listener-state-control-ch state))
|
||||
(cond [(patch/removed? p) (channel-put ch 'quit) (quit)]
|
||||
[(patch/added? p) (channel-put ch 'unblock) #f]
|
||||
[else #f])]
|
||||
[(message (at-meta (tcp-accepted remote-addr _ cin cout)))
|
||||
(transition state (spawn-connection (listener-state-server-addr state)
|
||||
remote-addr
|
||||
cin
|
||||
cout))]
|
||||
[_ #f]))
|
||||
|
||||
(define (spawn-tcp-listener server-addr)
|
||||
(match-define (tcp-listener port) server-addr)
|
||||
(define listener (tcp:tcp-listen port 4 #t))
|
||||
(define control-ch (make-channel))
|
||||
(thread (lambda () (tcp-listener-thread control-ch listener server-addr)))
|
||||
(spawn tcp-listener-behavior
|
||||
(listener-state control-ch server-addr)
|
||||
(sub (advertise (observe (tcp-channel ? server-addr ?)))) ;; monitor peer
|
||||
(pub (advertise (tcp-channel ? server-addr ?))) ;; declare we might make connections
|
||||
(sub (tcp-accepted ? server-addr ? ?) #:meta-level 1) ;; events from driver thread
|
||||
))
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
;; Outbound Connection
|
||||
|
||||
(define (spawn-tcp-connection local-addr remote-addr)
|
||||
(match-define (tcp-address remote-hostname remote-port) remote-addr)
|
||||
(define-values (cin cout)
|
||||
(with-handlers ([exn:fail:network? (lambda (e)
|
||||
;; TODO: it'd be nice to
|
||||
;; somehow communicate the
|
||||
;; actual error to the local
|
||||
;; peer.
|
||||
(log-error "~a" (exn->string e))
|
||||
(define o (open-output-string))
|
||||
(close-output-port o)
|
||||
(values (open-input-string "")
|
||||
o))])
|
||||
(tcp:tcp-connect remote-hostname remote-port)))
|
||||
(spawn-connection local-addr remote-addr cin cout))
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
;; Connection
|
||||
|
||||
(struct connection-state (control-ch cout) #:transparent)
|
||||
|
||||
(define (read-bytes-avail-evt len input-port)
|
||||
(guard-evt
|
||||
(lambda ()
|
||||
(let ([bstr (make-bytes len)])
|
||||
(handle-evt
|
||||
(read-bytes-avail!-evt bstr input-port)
|
||||
(lambda (v)
|
||||
(if (number? v)
|
||||
(if (= v len) bstr (subbytes bstr 0 v))
|
||||
v)))))))
|
||||
|
||||
(define (tcp-connection-thread remote-addr local-addr control-ch cin)
|
||||
(let loop ((blocked? #t))
|
||||
(sync (handle-evt control-ch
|
||||
(match-lambda
|
||||
['unblock (loop #f)]
|
||||
['quit (void)]))
|
||||
(if blocked?
|
||||
never-evt
|
||||
(handle-evt (read-bytes-avail-evt 32768 cin)
|
||||
(lambda (eof-or-bs)
|
||||
(send-ground-message (tcp-channel remote-addr local-addr eof-or-bs))
|
||||
(loop (or blocked? (eof-object? eof-or-bs))))))))
|
||||
(close-input-port cin))
|
||||
|
||||
(define (shutdown-connection! state)
|
||||
(match-define (connection-state control-ch cout) state)
|
||||
(when control-ch (channel-put control-ch 'quit))
|
||||
(when cout (close-output-port cout)))
|
||||
|
||||
(define (tcp-connection e state)
|
||||
(with-handlers [((lambda (exn) #t)
|
||||
(lambda (exn)
|
||||
(shutdown-connection! state)
|
||||
(raise exn)))]
|
||||
(match e
|
||||
[(message (at-meta (tcp-channel remote-addr local-addr (? eof-object?))))
|
||||
(shutdown-connection! state)
|
||||
(quit)]
|
||||
[(message (at-meta (tcp-channel remote-addr local-addr (? bytes? bs))))
|
||||
(transition state (message (tcp-channel remote-addr local-addr bs)))]
|
||||
[(message (tcp-channel _ _ bs))
|
||||
(write-bytes bs (connection-state-cout state))
|
||||
(flush-output (connection-state-cout state))
|
||||
#f]
|
||||
[(? patch? p)
|
||||
(define ch (connection-state-control-ch state))
|
||||
(cond [(patch/removed? p) (shutdown-connection! state) (quit)]
|
||||
[(patch/added? p) (channel-put ch 'unblock) #f]
|
||||
[else #f])]
|
||||
[#f #f])))
|
||||
|
||||
(define (spawn-connection local-addr remote-addr cin cout)
|
||||
(define control-ch (make-channel))
|
||||
(thread (lambda () (tcp-connection-thread remote-addr local-addr control-ch cin)))
|
||||
(spawn tcp-connection
|
||||
(connection-state control-ch cout)
|
||||
(sub (observe (tcp-channel remote-addr local-addr ?))) ;; monitor peer
|
||||
(pub (tcp-channel remote-addr local-addr ?)) ;; may send segments to peer
|
||||
(sub (tcp-channel local-addr remote-addr ?)) ;; want segments from peer
|
||||
(sub (tcp-channel remote-addr local-addr ?) #:meta-level 1) ;; segments from driver thread
|
||||
))
|
|
@ -0,0 +1,43 @@
|
|||
#lang prospect
|
||||
|
||||
(require (only-in racket/string string-trim))
|
||||
(require "../drivers/tcp.rkt")
|
||||
(require "../demand-matcher.rkt")
|
||||
|
||||
(define (spawn-session them us)
|
||||
(define user (gensym 'user))
|
||||
(define remote-detector (compile-projection (at-meta (?!))))
|
||||
(define peer-detector (compile-projection (advertise `(,(?!) says ,?))))
|
||||
(define (send-to-remote fmt . vs)
|
||||
(message (at-meta (tcp-channel us them (string->bytes/utf-8 (apply format fmt vs))))))
|
||||
(define (say who fmt . vs)
|
||||
(unless (equal? who user) (send-to-remote "~a ~a\n" who (apply format fmt vs))))
|
||||
(list (send-to-remote "Welcome, ~a.\n" user)
|
||||
(spawn/stateless
|
||||
(lambda (e)
|
||||
(match e
|
||||
[(message (at-meta (tcp-channel _ _ bs)))
|
||||
(message `(,user says ,(string-trim (bytes->string/utf-8 bs))))]
|
||||
[(message `(,who says ,what))
|
||||
(say who "says: ~a" what)]
|
||||
[(? patch? p)
|
||||
(if (patch/removed? (patch-project p remote-detector))
|
||||
(quit (send-to-remote "Goodbye!\n"))
|
||||
(let-values (((arrived departed) (patch-project/set/single p peer-detector)))
|
||||
(list (for/list [(who arrived)] (say who "arrived."))
|
||||
(for/list [(who departed)] (say who "departed.")))))]
|
||||
[#f #f]))
|
||||
(sub `(,? says ,?)) ;; read actual chat messages
|
||||
(sub (advertise `(,? says ,?))) ;; observe peer presence
|
||||
(pub `(,user says ,?)) ;; advertise our presence
|
||||
(sub (tcp-channel them us ?) #:meta-level 1) ;; read from remote client
|
||||
(sub (advertise (tcp-channel them us ?)) #:meta-level 1) ;; monitor remote client
|
||||
(pub (tcp-channel us them ?) #:meta-level 1) ;; we will write to remote client
|
||||
)))
|
||||
|
||||
(spawn-tcp-driver)
|
||||
(spawn-world
|
||||
(spawn-demand-matcher (advertise (tcp-channel (?!) (?! (tcp-listener 5999)) ?))
|
||||
(observe (tcp-channel (?!) (?! (tcp-listener 5999)) ?))
|
||||
#:meta-level 1
|
||||
spawn-session))
|
|
@ -0,0 +1,36 @@
|
|||
#lang prospect
|
||||
|
||||
(require "../drivers/tcp.rkt")
|
||||
(require "../demand-matcher.rkt")
|
||||
|
||||
(spawn-tcp-driver)
|
||||
|
||||
(define server-id (tcp-listener 5999))
|
||||
|
||||
(define (spawn-connection-handler c)
|
||||
(log-info "spawn-connection-handler ~v" c)
|
||||
(define (connection-handler e n)
|
||||
(when e (log-info "connection-handler ~v: ~v /// ~v" c e n))
|
||||
(match e
|
||||
[(? patch/removed?) (quit)]
|
||||
[(message (tcp-channel src dst #"quit\n"))
|
||||
(quit (message (tcp-channel dst src #"OK, then.\n")))]
|
||||
[(message (tcp-channel src dst bs))
|
||||
(transition n (message (tcp-channel dst src (string->bytes/utf-8
|
||||
(format "You said: ~a" bs)))))]
|
||||
[_
|
||||
(and (< n 5)
|
||||
(transition (+ n 1) (message (tcp-channel server-id c (string->bytes/utf-8
|
||||
(format "msg ~v\n" n))))))]))
|
||||
(spawn connection-handler
|
||||
0
|
||||
(sub (advertise (tcp-channel c server-id ?)))
|
||||
(sub (tcp-channel c server-id ?))
|
||||
(pub (tcp-channel server-id c ?))))
|
||||
|
||||
(spawn-demand-matcher (advertise (tcp-channel (?!) server-id ?))
|
||||
(observe (tcp-channel (?!) server-id ?))
|
||||
spawn-connection-handler
|
||||
(lambda (c)
|
||||
(log-info "Connection handler ~v decided to exit" c)
|
||||
'()))
|
Loading…
Reference in New Issue