Abbreviated TCP OS2 interface, and two programs to compare.
This commit is contained in:
parent
cf4ca5c8bd
commit
59469d68f8
|
@ -0,0 +1,48 @@
|
|||
#lang racket/base
|
||||
|
||||
(require racket/port)
|
||||
(require racket/match)
|
||||
(require "os2.rkt")
|
||||
(require "fake-tcp.rkt")
|
||||
|
||||
(define (main)
|
||||
(ground-vm
|
||||
(transition 'none
|
||||
(spawn tcp-driver)
|
||||
(spawn (nested-vm 'chat-vm
|
||||
(transition 'no-state
|
||||
(at-meta-level
|
||||
(role/anon (tcp-listener 5999)
|
||||
#:topic t
|
||||
#:on-presence (spawn (connection-handler t))))))))))
|
||||
|
||||
(define (connection-handler t)
|
||||
(match-define (topic _ (tcp-channel connection-id _ _) _) t)
|
||||
(define-values (cin cout in-topic out-topic) (tcp-accept t))
|
||||
(transition 'no-state
|
||||
(at-meta-level (cin (tcp-mode 'lines)))
|
||||
(at-meta-level (cin (tcp-credit 1)))
|
||||
(at-meta-level (role/anon out-topic))
|
||||
(at-meta-level (role/anon in-topic
|
||||
[(tcp-channel _ _ (? eof-object?)) (kill)]
|
||||
[(tcp-channel _ _ (? bytes? line))
|
||||
(list (at-meta-level (cin (tcp-credit 1)))
|
||||
(send-message `(,connection-id says ,line)))]))
|
||||
(role/anon (topic-publisher `(,connection-id says ,(wild))))
|
||||
(role/anon (topic-subscriber `(,(wild) says ,(wild)))
|
||||
#:topic t
|
||||
#:on-presence (match t [(topic _ `(,who ,_ ,_) _)
|
||||
(at-meta-level (cout (term->bytes (if (equal? who connection-id)
|
||||
`(you-are ,connection-id)
|
||||
`(,who arrived)))))])
|
||||
#:on-absence (match t [(topic _ `(,who ,_ ,_) _)
|
||||
(when (not (equal? who connection-id))
|
||||
(at-meta-level (cout (term->bytes `(,who departed)))))])
|
||||
[message (at-meta-level (cout (term->bytes message)))])))
|
||||
|
||||
(define (term->bytes v)
|
||||
(with-output-to-bytes (lambda () (write v) (newline))))
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
|
||||
(main)
|
|
@ -0,0 +1,62 @@
|
|||
#lang racket/base
|
||||
|
||||
(require racket/set)
|
||||
(require racket/tcp)
|
||||
(require racket/port)
|
||||
|
||||
(define active-connection-mutex (make-semaphore 1))
|
||||
(define active-connections (set))
|
||||
|
||||
(define (main)
|
||||
(define listener (tcp-listen 5999 4 #t))
|
||||
(let loop ()
|
||||
(define-values (cin cout) (tcp-accept listener))
|
||||
(define-values (local-host local-port remote-host remote-port) (tcp-addresses cin #t))
|
||||
(thread (connection-handler cin cout (cons remote-host remote-port)))
|
||||
(loop)))
|
||||
|
||||
(define (connection-handler cin cout connection-id)
|
||||
(lambda ()
|
||||
(with-handlers ((exn:fail? (lambda (e) (depart! connection-id) (raise e))))
|
||||
(arrive! connection-id)
|
||||
(let loop ()
|
||||
(sync (wrap-evt (read-bytes-line-evt cin 'any)
|
||||
(lambda (line)
|
||||
(if (eof-object? line)
|
||||
'done
|
||||
(begin
|
||||
(call-with-semaphore active-connection-mutex
|
||||
(lambda ()
|
||||
(for ([c active-connections])
|
||||
(thread-send (car c) `(says ,connection-id ,line)))))
|
||||
(loop)))))
|
||||
(wrap-evt (thread-receive-evt)
|
||||
(lambda (dummy)
|
||||
(define message (thread-receive))
|
||||
(write message cout)
|
||||
(newline cout)
|
||||
(flush-output cout)
|
||||
(loop)))))
|
||||
(depart! connection-id))))
|
||||
|
||||
(define (arrive! connection-id)
|
||||
(call-with-semaphore active-connection-mutex
|
||||
(lambda ()
|
||||
(thread-send (current-thread) `(you-are ,connection-id))
|
||||
(for ([c active-connections])
|
||||
(thread-send (current-thread) `(arrived ,(cdr c)))
|
||||
(thread-send (car c) `(arrived ,connection-id)))
|
||||
(define new-connections (set-add active-connections (cons (current-thread) connection-id)))
|
||||
(set! active-connections new-connections))))
|
||||
|
||||
(define (depart! connection-id)
|
||||
(call-with-semaphore
|
||||
active-connection-mutex
|
||||
(lambda ()
|
||||
(define new-connections (set-remove active-connections (cons (current-thread) connection-id)))
|
||||
(set! active-connections new-connections)
|
||||
(for ([c active-connections]) (thread-send (car c) `(departed ,connection-id))))))
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
|
||||
(main)
|
|
@ -0,0 +1,36 @@
|
|||
#lang racket/base
|
||||
|
||||
(require racket/match)
|
||||
|
||||
(require "os2.rkt")
|
||||
(require (prefix-in base: "os2-tcp.rkt"))
|
||||
|
||||
(provide (rename-out [base:tcp-driver tcp-driver])
|
||||
(rename-out [base:tcp-spy tcp-spy])
|
||||
|
||||
tcp-listener
|
||||
tcp-accept
|
||||
|
||||
(rename-out [base:tcp-channel tcp-channel])
|
||||
(rename-out [base:tcp-credit tcp-credit])
|
||||
(rename-out [base:tcp-mode tcp-mode]))
|
||||
|
||||
;; Returns a topic that can be subscribed to in order to be told about
|
||||
;; conversations coming and going relating to TCP connections arriving
|
||||
;; at a local server socket. The server socket is created implicitly
|
||||
;; as the result of subscribing to this topic.
|
||||
(define (tcp-listener port)
|
||||
(topic-subscriber (base:tcp-channel (wild) (base:tcp-listener port) (wild))
|
||||
#:monitor? #t))
|
||||
|
||||
;; Given the topic of a newly-arriving connection, creates helper
|
||||
;; functions that create communication requests relating to that
|
||||
;; connection, and also creates topics useful for hearing
|
||||
;; communications relating to that connection.
|
||||
(define (tcp-accept new-topic)
|
||||
(match-define (topic _ (base:tcp-channel remote-addr local-addr _) _) new-topic)
|
||||
(define (cin feedback) (send-feedback (base:tcp-channel remote-addr local-addr feedback)))
|
||||
(define (cout data) (send-message (base:tcp-channel local-addr remote-addr data)))
|
||||
(define in-topic (topic-subscriber (base:tcp-channel remote-addr local-addr (wild))))
|
||||
(define out-topic (topic-publisher (base:tcp-channel local-addr remote-addr (wild))))
|
||||
(values cin cout in-topic out-topic))
|
Loading…
Reference in New Issue