racket-matrix-2012/chat-sync.rkt

58 lines
1.9 KiB
Racket

#lang racket/base
(require racket/set)
(require racket/tcp)
(require racket/port)
;; Binary semaphore used as a mutex: arrive! and depart! hold it (via
;; call-with-semaphore) while they read and replace `active-connections`.
(define active-connection-mutex (make-semaphore 1))
;; Immutable set of (cons handler-thread connection-id) pairs, one per
;; registered connection. It is never mutated in place; arrive! and depart!
;; build a new set and install it with set!.
(define active-connections (set))
;; main : -> (does not return)
;; Listens on TCP port 5999 (backlog 4, address reuse enabled) and, for every
;; accepted connection, spawns a thread running the thunk produced by
;; `connection-handler`. The connection is identified by the pair
;; (remote-host . remote-port) reported by tcp-addresses.
(define (main)
  (define listener (tcp-listen 5999 4 #t))
  (let accept-next ()
    (let*-values ([(cin cout) (tcp-accept listener)]
                  ;; Only the remote half of the address is used below.
                  [(local-host local-port remote-host remote-port)
                   (tcp-addresses cin #t)])
      (thread (connection-handler cin cout (cons remote-host remote-port))))
    (accept-next)))
;; connection-handler : input-port output-port any -> (-> any)
;; Returns a thunk intended to be run in its own thread. The thunk:
;;   1. registers the connection with arrive!,
;;   2. loops, multiplexing two event sources:
;;        - a line read from `cin` is broadcast to every registered
;;          connection as `(says ,connection-id ,line)`; EOF ends the loop;
;;        - a message in this thread's mailbox is `write`n to `cout`,
;;          followed by a newline and a flush,
;;   3. deregisters with depart! and closes both ports.
;; If an exn:fail is raised anywhere in steps 1-3, the connection is
;; deregistered, the ports are closed, and the exception is re-raised.
(define (connection-handler cin cout connection-id)
  ;; Best-effort close of both ports. Failures are ignored so that cleanup
  ;; can never mask the exception that triggered it (closing a TCP output
  ;; port flushes it, which may fail if the peer has already gone away).
  (define (close-ports!)
    (with-handlers ((exn:fail? void))
      (close-input-port cin))
    (with-handlers ((exn:fail? void))
      (close-output-port cout)))
  (lambda ()
    (with-handlers ((exn:fail? (lambda (e)
                                 (depart! connection-id)
                                 (close-ports!)
                                 (raise e))))
      (arrive! connection-id)
      (let loop ()
        ;; handle-evt (rather than wrap-evt) calls its procedure in tail
        ;; position with respect to `sync` and without disabling breaks, so
        ;; the recursive (loop) calls below are proper tail calls and the
        ;; continuation does not grow with every message processed.
        (sync (handle-evt (read-bytes-line-evt cin 'any)
                          (lambda (line)
                            (cond [(eof-object? line) 'done]
                                  [else
                                   (for ([c (in-set active-connections)])
                                     ;; #f as the fail-thunk: a peer that
                                     ;; terminated after this snapshot of the
                                     ;; set was taken is skipped instead of
                                     ;; raising in (and killing) this sender.
                                     (thread-send (car c)
                                                  `(says ,connection-id ,line)
                                                  #f))
                                   (loop)])))
              (handle-evt (thread-receive-evt)
                          (lambda (dummy)
                            (write (thread-receive) cout)
                            (newline cout)
                            (flush-output cout)
                            (loop)))))
      (depart! connection-id)
      ;; Release the socket on a clean disconnect as well.
      (close-ports!))))
;; arrive! : any -> void
;; Registers the calling thread as the handler for `connection-id`.
;; While holding active-connection-mutex it:
;;   - sends the calling thread an `(arrived ...)` message for itself,
;;   - for each already-registered connection, sends the calling thread an
;;     `(arrived ...)` message naming that peer and sends the peer an
;;     `(arrived ...)` message naming the newcomer,
;;   - then adds (cons calling-thread connection-id) to active-connections.
(define (arrive! connection-id)
  (define me (current-thread))
  (define announcement `(arrived ,connection-id))
  (call-with-semaphore
   active-connection-mutex
   (lambda ()
     (thread-send me announcement)
     (for ([peer (in-set active-connections)])
       (thread-send me `(arrived ,(cdr peer)))
       (thread-send (car peer) announcement))
     (set! active-connections
           (set-add active-connections (cons me connection-id))))))
;; depart! : any -> void
;; Deregisters the calling thread's entry for `connection-id`.
;; While holding active-connection-mutex it removes
;; (cons calling-thread connection-id) from active-connections and then sends
;; `(departed ,connection-id)` to the thread of every connection that remains.
(define (depart! connection-id)
  (define my-entry (cons (current-thread) connection-id))
  (define notice `(departed ,connection-id))
  (call-with-semaphore
   active-connection-mutex
   (lambda ()
     (set! active-connections (set-remove active-connections my-entry))
     (for ([peer (in-set active-connections)])
       (thread-send (car peer) notice)))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Entry point: start the server when this module is run. `main` loops
;; forever accepting connections, so nothing after this call is reached.
(main)