diff --git a/chat-os2.rkt b/chat-os2.rkt new file mode 100644 index 0000000..3e16ec0 --- /dev/null +++ b/chat-os2.rkt @@ -0,0 +1,48 @@ +#lang racket/base + +(require racket/port) +(require racket/match) +(require "os2.rkt") +(require "fake-tcp.rkt") + +(define (main) + (ground-vm + (transition 'none + (spawn tcp-driver) + (spawn (nested-vm 'chat-vm + (transition 'no-state + (at-meta-level + (role/anon (tcp-listener 5999) + #:topic t + #:on-presence (spawn (connection-handler t)))))))))) + +(define (connection-handler t) + (match-define (topic _ (tcp-channel connection-id _ _) _) t) + (define-values (cin cout in-topic out-topic) (tcp-accept t)) + (transition 'no-state + (at-meta-level (cin (tcp-mode 'lines))) + (at-meta-level (cin (tcp-credit 1))) + (at-meta-level (role/anon out-topic)) + (at-meta-level (role/anon in-topic + [(tcp-channel _ _ (? eof-object?)) (kill)] + [(tcp-channel _ _ (? bytes? line)) + (list (at-meta-level (cin (tcp-credit 1))) + (send-message `(,connection-id says ,line)))])) + (role/anon (topic-publisher `(,connection-id says ,(wild)))) + (role/anon (topic-subscriber `(,(wild) says ,(wild))) + #:topic t + #:on-presence (match t [(topic _ `(,who ,_ ,_) _) + (at-meta-level (cout (term->bytes (if (equal? who connection-id) + `(you-are ,connection-id) + `(,who arrived)))))]) + #:on-absence (match t [(topic _ `(,who ,_ ,_) _) + (when (not (equal? who connection-id)) + (at-meta-level (cout (term->bytes `(,who departed)))))]) + [message (at-meta-level (cout (term->bytes message)))]))) + +(define (term->bytes v) + (with-output-to-bytes (lambda () (write v) (newline)))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(main) diff --git a/chat-sync.rkt b/chat-sync.rkt new file mode 100644 index 0000000..0e61bdc --- /dev/null +++ b/chat-sync.rkt @@ -0,0 +1,62 @@ +#lang racket/base + +(require racket/set) +(require racket/tcp) +(require racket/port) + +(define active-connection-mutex (make-semaphore 1)) +(define active-connections (set)) + +(define (main) + (define listener (tcp-listen 5999 4 #t)) + (let loop () + (define-values (cin cout) (tcp-accept listener)) + (define-values (local-host local-port remote-host remote-port) (tcp-addresses cin #t)) + (thread (connection-handler cin cout (cons remote-host remote-port))) + (loop))) + +(define (connection-handler cin cout connection-id) + (lambda () + (with-handlers ((exn:fail? (lambda (e) (depart! connection-id) (raise e)))) + (arrive! connection-id) + (let loop () + (sync (wrap-evt (read-bytes-line-evt cin 'any) + (lambda (line) + (if (eof-object? line) + 'done + (begin + (call-with-semaphore active-connection-mutex + (lambda () + (for ([c active-connections]) + (thread-send (car c) `(says ,connection-id ,line))))) + (loop))))) + (wrap-evt (thread-receive-evt) + (lambda (dummy) + (define message (thread-receive)) + (write message cout) + (newline cout) + (flush-output cout) + (loop))))) + (depart! connection-id)))) + +(define (arrive! connection-id) + (call-with-semaphore active-connection-mutex + (lambda () + (thread-send (current-thread) `(you-are ,connection-id)) + (for ([c active-connections]) + (thread-send (current-thread) `(arrived ,(cdr c))) + (thread-send (car c) `(arrived ,connection-id))) + (define new-connections (set-add active-connections (cons (current-thread) connection-id))) + (set! active-connections new-connections)))) + +(define (depart! connection-id) + (call-with-semaphore + active-connection-mutex + (lambda () + (define new-connections (set-remove active-connections (cons (current-thread) connection-id))) + (set! active-connections new-connections) + (for ([c active-connections]) (thread-send (car c) `(departed ,connection-id)))))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(main) diff --git a/fake-tcp.rkt b/fake-tcp.rkt new file mode 100644 index 0000000..921363c --- /dev/null +++ b/fake-tcp.rkt @@ -0,0 +1,36 @@ +#lang racket/base + +(require racket/match) + +(require "os2.rkt") +(require (prefix-in base: "os2-tcp.rkt")) + +(provide (rename-out [base:tcp-driver tcp-driver]) + (rename-out [base:tcp-spy tcp-spy]) + + tcp-listener + tcp-accept + + (rename-out [base:tcp-channel tcp-channel]) + (rename-out [base:tcp-credit tcp-credit]) + (rename-out [base:tcp-mode tcp-mode])) + +;; Returns a topic that can be subscribed to in order to be told about +;; conversations coming and going relating to TCP connections arriving +;; at a local server socket. The server socket is created implicitly +;; as the result of subscribing to this topic. +(define (tcp-listener port) + (topic-subscriber (base:tcp-channel (wild) (base:tcp-listener port) (wild)) + #:monitor? #t)) + +;; Given the topic of a newly-arriving connection, creates helper +;; functions that create communication requests relating to that +;; connection, and also creates topics useful for hearing +;; communications relating to that connection. +(define (tcp-accept new-topic) + (match-define (topic _ (base:tcp-channel remote-addr local-addr _) _) new-topic) + (define (cin feedback) (send-feedback (base:tcp-channel remote-addr local-addr feedback))) + (define (cout data) (send-message (base:tcp-channel local-addr remote-addr data))) + (define in-topic (topic-subscriber (base:tcp-channel remote-addr local-addr (wild)))) + (define out-topic (topic-publisher (base:tcp-channel local-addr remote-addr (wild)))) + (values cin cout in-topic out-topic))