From a572f270f0759b83c7ab13f8eb707b933cb4c829 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Mon, 2 Jun 2014 16:36:55 -0400 Subject: [PATCH] Delimited-continuation based threaded style --- minimart/examples/chat-client-userland.rkt | 24 ++++++ minimart/examples/chat-userland.rkt | 50 ++++++++++++ minimart/examples/chat-userland2.rkt | 57 +++++++++++++ minimart/userland.rkt | 94 ++++++++++++++++++++++ 4 files changed, 225 insertions(+) create mode 100644 minimart/examples/chat-client-userland.rkt create mode 100644 minimart/examples/chat-userland.rkt create mode 100644 minimart/examples/chat-userland2.rkt create mode 100644 minimart/userland.rkt diff --git a/minimart/examples/chat-client-userland.rkt b/minimart/examples/chat-client-userland.rkt new file mode 100644 index 0000000..2aede1a --- /dev/null +++ b/minimart/examples/chat-client-userland.rkt @@ -0,0 +1,24 @@ +#lang minimart + +(require (only-in racket/port read-bytes-line-evt)) +(require "../drivers/tcp.rkt") +(require "../userland.rkt") + +(define local-handle (tcp-handle 'chat)) +(define remote-handle (tcp-address "localhost" 5999)) + +(spawn-tcp-driver) + +(userland-thread + #:gestalt (gestalt-union (sub (event (read-bytes-line-evt (current-input-port) 'any) ?) #:meta-level 1) + (sub (tcp-channel remote-handle local-handle ?)) + (pub (tcp-channel local-handle remote-handle ?)) + (pub (tcp-channel local-handle remote-handle ?) #:level 1)) + (wait-for-gestalt (pub (tcp-channel local-handle remote-handle ?) #:level 1)) + (let loop () + (match (next-event) + [(message (event _ (list (? eof-object?))) 1 #f) (do (quit))] + [(message (event _ (list line)) 1 #f) (do (send (tcp-channel local-handle remote-handle line)))] + [(message (tcp-channel _ _ bs) 0 #f) (write-bytes bs) (flush-output)] + [(routing-update g) (when (gestalt-empty? g) (do (quit)))]) + (loop))) diff --git a/minimart/examples/chat-userland.rkt b/minimart/examples/chat-userland.rkt new file mode 100644 index 0000000..f3815eb --- /dev/null +++ b/minimart/examples/chat-userland.rkt @@ -0,0 +1,50 @@ +#lang minimart + +(require racket/set) +(require (only-in racket/string string-trim)) +(require "../drivers/tcp.rkt") +(require "../demand-matcher.rkt") +(require "../userland.rkt") + +(define (spawn-session them us) + (define user (gensym 'user)) + + (define (send-to-remote fmt . vs) + (do (send #:meta-level 1 (tcp-channel us them (string->bytes/utf-8 (apply format fmt vs)))))) + + (define (say who fmt . vs) + (unless (equal? who user) (send-to-remote "~a ~a\n" who (apply format fmt vs)))) + + (define remote-detector (pub #:meta-level 1 #:level 1 (tcp-channel us them ?))) + + (userland-thread #:gestalt (gestalt-union (sub `(,? says ,?)) + (sub `(,? says ,?) #:level 1) + (pub `(,user says ,?)) + (sub (tcp-channel them us ?) #:meta-level 1) + (pub (tcp-channel us them ?) #:meta-level 1) + remote-detector) + (wait-for-gestalt remote-detector) + (send-to-remote "Welcome, ~a.\n" user) + (let loop ((old-peers (set))) + (match (next-event) + [(message (tcp-channel _ _ bs) 1 #f) + (do (send `(,user says ,(string-trim (bytes->string/utf-8 bs))))) + (loop old-peers)] + [(message `(,who says ,what) 0 #f) + (say who "says: ~a" what) + (loop old-peers)] + [(routing-update g) + (define remote-present? (not (gestalt-empty? (gestalt-filter g remote-detector)))) + (define new-peers (matcher-key-set/single + (gestalt-project g 0 0 #t (compile-gestalt-projection `(,(?!) says ,?))))) + (when (not remote-present?) (do (quit))) + (for/list [(who (set-subtract new-peers old-peers))] (say who "arrived.")) + (for/list [(who (set-subtract old-peers new-peers))] (say who "departed.")) + (loop new-peers)])))) + +(spawn-tcp-driver) +(spawn-world + (spawn-demand-matcher (tcp-channel (?! (tcp-address ? ?)) (?! (tcp-listener 5999)) ?) + #:meta-level 1 + #:demand-is-subscription? #f + spawn-session)) diff --git a/minimart/examples/chat-userland2.rkt b/minimart/examples/chat-userland2.rkt new file mode 100644 index 0000000..e7f085a --- /dev/null +++ b/minimart/examples/chat-userland2.rkt @@ -0,0 +1,57 @@ +#lang minimart + +(require racket/set) +(require (only-in racket/string string-trim)) +(require "../drivers/tcp.rkt") +(require "../demand-matcher.rkt") +(require "../userland.rkt") + +(define (spawn-session them us) + (define (send-to-remote fmt . vs) + (do (send #:meta-level 1 (tcp-channel us them (string->bytes/utf-8 (apply format fmt vs)))))) + + (define tcp-gestalt (gestalt-union (pub (tcp-channel us them ?) #:meta-level 1 #:level 1) + (pub (tcp-channel us them ?) #:meta-level 1) + (sub (tcp-channel them us ?) #:meta-level 1))) + + (define (decode-input bs) (string-trim (bytes->string/utf-8 bs))) + (define (read-chunk) (receive [(message (tcp-channel _ _ bs) 1 #f) (decode-input bs)])) + + (userland-thread #:gestalt tcp-gestalt + + (wait-for-gestalt tcp-gestalt) + (send-to-remote "Welcome. What is your name? > ") + (define user (read-chunk)) + + (do (routing-update (gestalt-union tcp-gestalt + (sub `(,? says ,?) #:level 1) + (sub `(,? says ,?)) + (pub `(,user says ,?))))) + + (define (say who fmt . vs) + (unless (equal? who user) + (send-to-remote "~a ~a\n" who (apply format fmt vs)))) + + (let loop ((old-peers (set))) + (match (next-event) + [(message (tcp-channel _ _ bs) 1 #f) + (do (send `(,user says ,(decode-input bs)))) + (loop old-peers)] + [(message `(,who says ,what) 0 #f) + (say who "says: ~a" what) + (loop old-peers)] + [(routing-update g) + (define remote-present? (not (gestalt-empty? (gestalt-filter g tcp-gestalt)))) + (define new-peers (matcher-key-set/single + (gestalt-project g 0 0 #t (compile-gestalt-projection `(,(?!) says ,?))))) + (when (not remote-present?) (do (quit))) + (for/list [(who (set-subtract new-peers old-peers))] (say who "arrived.")) + (for/list [(who (set-subtract old-peers new-peers))] (say who "departed.")) + (loop new-peers)])))) + +(spawn-tcp-driver) +(spawn-world + (spawn-demand-matcher (tcp-channel (?! (tcp-address ? ?)) (?! (tcp-listener 5999)) ?) + #:meta-level 1 + #:demand-is-subscription? #f + spawn-session)) diff --git a/minimart/userland.rkt b/minimart/userland.rkt new file mode 100644 index 0000000..f84f950 --- /dev/null +++ b/minimart/userland.rkt @@ -0,0 +1,94 @@ +#lang racket/base + +(require (for-syntax syntax/parse)) +(require (for-syntax racket/base)) + +(require racket/match) +(require "main.rkt") +(require "functional-queue.rkt") + +(provide userland-thread + receive + do + next-event + all-queued-events + pushback-events! + wait-for-gestalt) + +(struct do-command (actions k) #:transparent) +(struct receive-command (single? k) #:transparent) +(struct pushback-command (events k) #:transparent) + +(define-syntax userland-thread + (lambda (stx) + (syntax-parse stx + [(_ (~or (~optional (~seq #:gestalt g) #:defaults ([g #'(gestalt-empty)]) #:name "#:gestalt")) ... + body ...) + #`(spawn-userland* (lambda () body ...) g)]))) + +(define (spawn-userland* main [initial-gestalt (gestalt-empty)]) + (spawn (lambda (e k) (k e)) + (lambda (first-event) + (interpret-command (make-queue) + (list->queue (list first-event)) + ((reply-to (lambda (dummy) + (main) + (do (quit)))) + (void)))) + initial-gestalt)) + +(define-syntax-rule (receive [pat clausebody ...] ...) + (receive* (lambda (e sentinel) (match e [pat clausebody ...] ... [_ sentinel])))) + +(define sentinel (cons 'sentinel '())) +(define (receive* f) + (let loop ((events (all-queued-events)) (discarded-rev '())) + (match events + [(cons e rest) + (call-with-values (lambda () (f e sentinel)) + (lambda vs + (if (equal? vs (list sentinel)) + (loop rest (cons e discarded-rev)) + (begin (pushback-events! (reverse discarded-rev)) + (apply values vs)))))] + ['() + (loop (all-queued-events) discarded-rev)]))) + +(define (wait-for-gestalt probe) + (receive [(routing-update g) + (if (gestalt-empty? (gestalt-filter g probe)) + (wait-for-gestalt probe) + g)])) + +(define (do . actions) (call-in-raw-context (lambda (k) (do-command actions k)))) +(define (next-event) (call-in-raw-context (lambda (k) (receive-command #t k)))) +(define (all-queued-events) (call-in-raw-context (lambda (k) (receive-command #f k)))) +(define (pushback-events! events) (call-in-raw-context (lambda (k) (pushback-command events k)))) + +(define prompt (make-continuation-prompt-tag 'minimart-userland)) + +(define (reply-to k) + (lambda (reply) + (call-with-continuation-prompt (lambda () (k reply)) prompt))) + +(define (call-in-raw-context proc) + (call-with-composable-continuation + (lambda (k) (abort-current-continuation prompt (lambda () (proc (reply-to k))))) + prompt)) + +(define (interpret-command actions events command) + (match command + [(do-command new-action-chunk k) + (interpret-command (enqueue actions new-action-chunk) events (k (void)))] + [(receive-command single? k) + (cond + [(queue-empty? events) + (transition (lambda (e) (and e (interpret-command (make-queue) (list->queue (list e)) command))) + (queue->list actions))] + [single? + (define-values (e rest) (dequeue events)) + (interpret-command actions rest (k e))] + [else + (interpret-command actions (make-queue) (k (queue->list events)))])] + [(pushback-command events-to-push k) + (interpret-command actions (queue-append (list->queue events-to-push) events) (k (void)))]))