Delimited-continuation based threaded style

This commit is contained in:
Tony Garnock-Jones 2014-06-02 16:36:55 -04:00
parent 52faa78444
commit a572f270f0
4 changed files with 225 additions and 0 deletions

View File

@ -0,0 +1,24 @@
#lang minimart
(require (only-in racket/port read-bytes-line-evt))
(require "../drivers/tcp.rkt")
(require "../userland.rkt")
(define local-handle (tcp-handle 'chat))
(define remote-handle (tcp-address "localhost" 5999))
(spawn-tcp-driver)
(userland-thread
#:gestalt (gestalt-union (sub (event (read-bytes-line-evt (current-input-port) 'any) ?) #:meta-level 1)
(sub (tcp-channel remote-handle local-handle ?))
(pub (tcp-channel local-handle remote-handle ?))
(pub (tcp-channel local-handle remote-handle ?) #:level 1))
(wait-for-gestalt (pub (tcp-channel local-handle remote-handle ?) #:level 1))
(let loop ()
(match (next-event)
[(message (event _ (list (? eof-object?))) 1 #f) (do (quit))]
[(message (event _ (list line)) 1 #f) (do (send (tcp-channel local-handle remote-handle line)))]
[(message (tcp-channel _ _ bs) 0 #f) (write-bytes bs) (flush-output)]
[(routing-update g) (when (gestalt-empty? g) (do (quit)))])
(loop)))

View File

@ -0,0 +1,50 @@
#lang minimart
(require racket/set)
(require (only-in racket/string string-trim))
(require "../drivers/tcp.rkt")
(require "../demand-matcher.rkt")
(require "../userland.rkt")
(define (spawn-session them us)
(define user (gensym 'user))
(define (send-to-remote fmt . vs)
(do (send #:meta-level 1 (tcp-channel us them (string->bytes/utf-8 (apply format fmt vs))))))
(define (say who fmt . vs)
(unless (equal? who user) (send-to-remote "~a ~a\n" who (apply format fmt vs))))
(define remote-detector (pub #:meta-level 1 #:level 1 (tcp-channel us them ?)))
(userland-thread #:gestalt (gestalt-union (sub `(,? says ,?))
(sub `(,? says ,?) #:level 1)
(pub `(,user says ,?))
(sub (tcp-channel them us ?) #:meta-level 1)
(pub (tcp-channel us them ?) #:meta-level 1)
remote-detector)
(wait-for-gestalt remote-detector)
(send-to-remote "Welcome, ~a.\n" user)
(let loop ((old-peers (set)))
(match (next-event)
[(message (tcp-channel _ _ bs) 1 #f)
(do (send `(,user says ,(string-trim (bytes->string/utf-8 bs)))))
(loop old-peers)]
[(message `(,who says ,what) 0 #f)
(say who "says: ~a" what)
(loop old-peers)]
[(routing-update g)
(define remote-present? (not (gestalt-empty? (gestalt-filter g remote-detector))))
(define new-peers (matcher-key-set/single
(gestalt-project g 0 0 #t (compile-gestalt-projection `(,(?!) says ,?)))))
(when (not remote-present?) (do (quit)))
(for/list [(who (set-subtract new-peers old-peers))] (say who "arrived."))
(for/list [(who (set-subtract old-peers new-peers))] (say who "departed."))
(loop new-peers)]))))
(spawn-tcp-driver)
(spawn-world
(spawn-demand-matcher (tcp-channel (?! (tcp-address ? ?)) (?! (tcp-listener 5999)) ?)
#:meta-level 1
#:demand-is-subscription? #f
spawn-session))

View File

@ -0,0 +1,57 @@
#lang minimart
(require racket/set)
(require (only-in racket/string string-trim))
(require "../drivers/tcp.rkt")
(require "../demand-matcher.rkt")
(require "../userland.rkt")
(define (spawn-session them us)
(define (send-to-remote fmt . vs)
(do (send #:meta-level 1 (tcp-channel us them (string->bytes/utf-8 (apply format fmt vs))))))
(define tcp-gestalt (gestalt-union (pub (tcp-channel us them ?) #:meta-level 1 #:level 1)
(pub (tcp-channel us them ?) #:meta-level 1)
(sub (tcp-channel them us ?) #:meta-level 1)))
(define (decode-input bs) (string-trim (bytes->string/utf-8 bs)))
(define (read-chunk) (receive [(message (tcp-channel _ _ bs) 1 #f) (decode-input bs)]))
(userland-thread #:gestalt tcp-gestalt
(wait-for-gestalt tcp-gestalt)
(send-to-remote "Welcome. What is your name? > ")
(define user (read-chunk))
(do (routing-update (gestalt-union tcp-gestalt
(sub `(,? says ,?) #:level 1)
(sub `(,? says ,?))
(pub `(,user says ,?)))))
(define (say who fmt . vs)
(unless (equal? who user)
(send-to-remote "~a ~a\n" who (apply format fmt vs))))
(let loop ((old-peers (set)))
(match (next-event)
[(message (tcp-channel _ _ bs) 1 #f)
(do (send `(,user says ,(decode-input bs))))
(loop old-peers)]
[(message `(,who says ,what) 0 #f)
(say who "says: ~a" what)
(loop old-peers)]
[(routing-update g)
(define remote-present? (not (gestalt-empty? (gestalt-filter g tcp-gestalt))))
(define new-peers (matcher-key-set/single
(gestalt-project g 0 0 #t (compile-gestalt-projection `(,(?!) says ,?)))))
(when (not remote-present?) (do (quit)))
(for/list [(who (set-subtract new-peers old-peers))] (say who "arrived."))
(for/list [(who (set-subtract old-peers new-peers))] (say who "departed."))
(loop new-peers)]))))
(spawn-tcp-driver)
(spawn-world
(spawn-demand-matcher (tcp-channel (?! (tcp-address ? ?)) (?! (tcp-listener 5999)) ?)
#:meta-level 1
#:demand-is-subscription? #f
spawn-session))

94
minimart/userland.rkt Normal file
View File

@ -0,0 +1,94 @@
#lang racket/base
(require (for-syntax syntax/parse))
(require (for-syntax racket/base))
(require racket/match)
(require "main.rkt")
(require "functional-queue.rkt")
(provide userland-thread
receive
do
next-event
all-queued-events
pushback-events!
wait-for-gestalt)
(struct do-command (actions k) #:transparent)
(struct receive-command (single? k) #:transparent)
(struct pushback-command (events k) #:transparent)
(define-syntax userland-thread
(lambda (stx)
(syntax-parse stx
[(_ (~or (~optional (~seq #:gestalt g) #:defaults ([g #'(gestalt-empty)]) #:name "#:gestalt")) ...
body ...)
#`(spawn-userland* (lambda () body ...) g)])))
(define (spawn-userland* main [initial-gestalt (gestalt-empty)])
(spawn (lambda (e k) (k e))
(lambda (first-event)
(interpret-command (make-queue)
(list->queue (list first-event))
((reply-to (lambda (dummy)
(main)
(do (quit))))
(void))))
initial-gestalt))
(define-syntax-rule (receive [pat clausebody ...] ...)
(receive* (lambda (e sentinel) (match e [pat clausebody ...] ... [_ sentinel]))))
(define sentinel (cons 'sentinel '()))
(define (receive* f)
(let loop ((events (all-queued-events)) (discarded-rev '()))
(match events
[(cons e rest)
(call-with-values (lambda () (f e sentinel))
(lambda vs
(if (equal? vs (list sentinel))
(loop rest (cons e discarded-rev))
(begin (pushback-events! (reverse discarded-rev))
(apply values vs)))))]
['()
(loop (all-queued-events) discarded-rev)])))
(define (wait-for-gestalt probe)
(receive [(routing-update g)
(if (gestalt-empty? (gestalt-filter g probe))
(wait-for-gestalt probe)
g)]))
(define (do . actions) (call-in-raw-context (lambda (k) (do-command actions k))))
(define (next-event) (call-in-raw-context (lambda (k) (receive-command #t k))))
(define (all-queued-events) (call-in-raw-context (lambda (k) (receive-command #f k))))
(define (pushback-events! events) (call-in-raw-context (lambda (k) (pushback-command events k))))
(define prompt (make-continuation-prompt-tag 'minimart-userland))
(define (reply-to k)
(lambda (reply)
(call-with-continuation-prompt (lambda () (k reply)) prompt)))
(define (call-in-raw-context proc)
(call-with-composable-continuation
(lambda (k) (abort-current-continuation prompt (lambda () (proc (reply-to k)))))
prompt))
(define (interpret-command actions events command)
(match command
[(do-command new-action-chunk k)
(interpret-command (enqueue actions new-action-chunk) events (k (void)))]
[(receive-command single? k)
(cond
[(queue-empty? events)
(transition (lambda (e) (and e (interpret-command (make-queue) (list->queue (list e)) command)))
(queue->list actions))]
[single?
(define-values (e rest) (dequeue events))
(interpret-command actions rest (k e))]
[else
(interpret-command actions (make-queue) (k (queue->list events)))])]
[(pushback-command events-to-push k)
(interpret-command actions (queue-append (list->queue events-to-push) events) (k (void)))]))