Merge branch 'master' of vapour:racket-matrix

This commit is contained in:
Tony Garnock-Jones 2012-05-09 13:44:34 -04:00
commit 7d515e4e58
3 changed files with 124 additions and 54 deletions

View File

@ -3,4 +3,4 @@
(mapcar #'(lambda (x) (put x 'scheme-indent-function 1))
'(transition extend-transition))
(mapcar #'(lambda (x) (put x 'scheme-indent-function 2))
'(role yield)))
'(role role/fresh yield)))

View File

@ -62,7 +62,8 @@
[(set-member? active-handles local-addr) active-handles]
[else
(transition (set-add active-handles local-addr)
(spawn (udp-socket-manager local-addr)))])]))
(spawn (udp-socket-manager local-addr)
#:debug-name (list 'udp-socket local-addr)))])]))
(role 'handle-mapping-reaper
(topic-subscriber (handle-mapping (wild) (wild)) #:virtual? #t)
#:state active-handles
@ -71,10 +72,16 @@
[(topic _ (handle-mapping local-addr socket) _)
(transition (set-remove active-handles local-addr))]))))
(define (bind-socket! s local-addr)
(cond
[(udp-listener? local-addr) (udp-bind! s #f (udp-listener-port local-addr))]
[(udp-handle? local-addr) (udp-bind! s #f 0)]
[else (void)]))
;; UdpAddress -> BootK
(define ((udp-socket-manager local-addr) self-pid)
(define s (udp-open-socket #f #f))
(when (udp-listener? local-addr) (udp-bind! s #f (udp-listener-port local-addr)))
(bind-socket! s local-addr)
(define buffer (make-bytes 65536)) ;; TODO: buffer size control
(transition 'socket-is-open
;; Offers a handle-mapping on the local network so that the
@ -96,7 +103,8 @@
(when (eq? state 'socket-is-open)
(spawn (lambda (dummy-pid)
(udp-close s)
(transition 'dummy (kill))))))
(transition 'dummy (kill)))
#:debug-name (list 'udp-socket-closer local-addr))))
[(udp-packet (== local-addr) (udp-address remote-host remote-port) body)
(udp-send-to s remote-host remote-port body)
state])

162
os2.rkt
View File

@ -23,6 +23,7 @@
extend-transition
role
role/fresh
(except-out (struct-out add-role) add-role)
(rename-out [make-add-role add-role])
(except-out (struct-out delete-role) delete-role)
@ -57,7 +58,10 @@
non-wild?
;; Reexports from racket/match for convenience
(all-from-out racket/match))
(all-from-out racket/match)
;; For debugging
current-ground-transition)
;; Endpoints are the units of deduplication.
;; Flows (in canonical form) are the units of presence.
@ -93,7 +97,8 @@
;; A QuasiQueue<X> is a list of Xs in *reversed* order.
(struct vm (processes ;; Hash<PID, Process>
(struct vm (name ;; Any - for debugging complex VM trees
processes ;; Hash<PID, Process>
endpoints ;; Hash<EID, Endpoint>
next-process-id ;; PID
pending-actions ;; QuasiQueue<(cons PID Action)>
@ -104,9 +109,10 @@
;; topic) in a conversation.
(struct endpoint (id topics handlers) #:transparent)
;; A Process is an Exists State . (process PID State Set<EID>),
;; representing a VM process and its collection of active endpoints.
(struct process (id state endpoints) #:transparent)
;; A Process is an Exists State . (process Any PID State Set<EID>
;; Set<EID>), representing a VM process and its collection of active
;; endpoints at this level and at the VM's container's level.
(struct process (name id state endpoints meta-endpoints) #:transparent)
;; A Topic is a (topic Role Pattern Boolean), describing an Endpoint's
;; role in a conversation.
@ -137,8 +143,8 @@
;; (send-message Any Role)
(struct send-message (body role) #:prefab)
;;
;; (spawn BootK Maybe<TrapK<PID>>)
(struct spawn (main k) #:prefab)
;; (spawn BootK Maybe<TrapK<PID>> Any)
(struct spawn (main k debug-name) #:prefab)
;;
;; (kill Maybe<PID> Any)
(struct kill (pid reason) #:prefab)
@ -184,6 +190,10 @@
topics-expr
(handlers presence-handler absence-handler message-handler)))])))
(define-syntax-rule (role/fresh pre-eid-var rest ...)
(let ((pre-eid-var (gensym 'role)))
(role pre-eid-var rest ...)))
(define-syntax-rule (yield-macro #:state state-pattern body ...)
(yield (match-lambda [state-pattern body ...])))
@ -194,7 +204,7 @@
(define make-add-role add-role) ;; no special treatment required at present
(define (make-delete-role pre-eid [reason #f]) (delete-role pre-eid reason))
(define (make-send-message body [role 'publisher]) (send-message body role))
(define (make-spawn main [k #f]) (spawn main k))
(define (make-spawn main [k #f] #:debug-name [debug-name #f]) (spawn main k debug-name))
(define (make-kill [pid #f] #:reason [reason #f]) (kill pid reason))
(define (extend-transition t . more-actions)
@ -248,11 +258,12 @@
;;---------------------------------------------------------------------------
;; Core virtualizable virtual machine.
(define (make-vm boot)
(vm (hash)
(define (make-vm name boot)
(vm name
(hash)
(hash)
0
(list (cons -1 (spawn boot #f)))))
(list (cons -1 (spawn boot #f 'primordial-process)))))
(define (run-vm state)
(let loop ((remaining-actions (reverse (vm-pending-actions state)))
@ -267,22 +278,32 @@
[(cons (cons pid action) rest)
(match action
[(at-meta-level preaction)
(define transformed-preaction (transform-meta-action pid preaction))
(loop rest state (cons transformed-preaction outbound-actions))]
(let-values (((transformed-preaction state) (transform-meta-action pid preaction state)))
(loop rest state (cons transformed-preaction outbound-actions)))]
[(yield k)
(loop rest (run-ready state pid k) outbound-actions)]
[preaction
(loop rest (perform-action pid preaction state) outbound-actions)])])))
(let-values (((new-outbound-actions-rev state) (perform-action pid preaction state)))
(loop rest state (append new-outbound-actions-rev outbound-actions)))])])))
(define (vm-idle? state)
(null? (vm-pending-actions state)))
(define (collect-dead-processes state)
;; dns-read-driver is being collected because it only has a metarole.%%%
(define (process-alive? pid p)
(or (not (set-empty? (process-endpoints p)))
(not (set-empty? (process-meta-endpoints p)))
(ormap (lambda (entry) (= (car entry) pid))
(vm-pending-actions state))))
(struct-copy vm state
[processes (for/hash ([(pid p) (in-hash (vm-processes state))]
#:when (or (not (set-empty? (process-endpoints p)))
(ormap (lambda (entry) (= (car entry) pid))
(vm-pending-actions state))))
#:when (or (process-alive? pid p)
(begin (printf "~a PID ~v (~a) garbage-collected~n"
(vm-name state)
pid
(process-name p))
#f)))
(values pid p))]))
(define (send-to-user failure-proc f . args)
@ -299,11 +320,16 @@
(define (perform-action pid preaction state)
(match preaction
[(add-role pre-eid topics hs) (do-subscribe pid pre-eid (ensure-topic-union topics) hs state)]
[(delete-role pre-eid reason) (do-unsubscribe pid pre-eid reason state)]
[(send-message body role) (route-and-deliver role body state)]
[(spawn main k) (do-spawn pid main k state)]
[(kill pid-to-kill reason) (do-kill (or pid-to-kill pid) reason state)]))
[(add-role pre-eid topics hs)
(values '() (do-subscribe pid pre-eid (ensure-topic-union topics) hs state))]
[(delete-role pre-eid reason)
(values '() (do-unsubscribe pid pre-eid reason state))]
[(send-message body role)
(values '() (route-and-deliver role body state))]
[(spawn main k debug-name)
(values '() (do-spawn pid main k debug-name state))]
[(kill pid-to-kill reason)
(do-kill (or pid-to-kill pid) reason state)]))
(define (do-subscribe pid pre-eid topics hs state)
(cond
@ -322,9 +348,15 @@
(define ((add-process-eid new-eid) p)
(struct-copy process p [endpoints (set-add (process-endpoints p) new-eid)]))
(define ((add-process-meta-eid new-eid) p)
(struct-copy process p [meta-endpoints (set-add (process-meta-endpoints p) new-eid)]))
(define ((remove-process-eid old-eid) p)
(struct-copy process p [endpoints (set-remove (process-endpoints p) old-eid)]))
(define ((remove-process-meta-eid old-eid) p)
(struct-copy process p [meta-endpoints (set-remove (process-meta-endpoints p) old-eid)]))
(define (install-endpoint state new-eid new-endpoint)
(struct-copy vm state [endpoints (hash-set (vm-endpoints state) new-eid new-endpoint)]))
@ -394,36 +426,52 @@
(match-define (endpoint (eid pid _) _ (handlers _ _ message-handler)) e)
(run-trapk state pid message-handler message-topic body)))
(define (do-spawn spawning-pid main k state)
(define (do-spawn spawning-pid main k debug-name state)
(define new-pid (vm-next-process-id state))
(define new-name (or debug-name new-pid))
(match-define (transition initial-state initial-actions)
(cond
[(procedure? main) (send-to-user (lambda (e) (transition #f (kill #f e))) main new-pid)]
[(transition? main) main]))
(define initial-process (process new-pid initial-state (set)))
(define initial-process (process new-name new-pid initial-state (set) (set)))
(define spawned-state
(struct-copy vm (enqueue-actions state new-pid initial-actions)
[processes (hash-set (vm-processes state) new-pid initial-process)]
[next-process-id (+ new-pid 1)]))
(printf "~a PID ~v (~a) started~n" (vm-name state) new-pid new-name)
(run-trapk spawned-state spawning-pid k new-pid))
(define (print-kill pid-to-kill reason)
(define (print-kill vm-name pid-to-kill process-name reason)
(cond
[(eq? reason #f) (printf "PID ~v exited normally~n" pid-to-kill)]
[(eq? reason #f) (printf "~a PID ~v (~a) exited normally~n"
vm-name
pid-to-kill
process-name)]
[(exn? reason) ((error-display-handler)
(format "PID ~v exited with exception~n~a" pid-to-kill (exn-message reason))
(format "~a PID ~v (~a) exited with exception~n~a"
vm-name
pid-to-kill
process-name
(exn-message reason))
reason)]
[else (printf "PID ~v exited with reason: ~a~n" pid-to-kill reason)]))
[else (printf "~a PID ~v (~a) exited with reason: ~a~n"
vm-name
pid-to-kill
process-name
reason)]))
(define (do-kill pid-to-kill reason state)
(cond
[(hash-has-key? (vm-processes state) pid-to-kill)
(print-kill pid-to-kill reason)
(define dying-endpoints (process-endpoints (hash-ref (vm-processes state) pid-to-kill)))
(let* ((state (for/fold ([state state]) ([eid (in-set dying-endpoints)])
(do-unsubscribe pid-to-kill (eid-pre-eid eid) reason state))))
(struct-copy vm state [processes (hash-remove (vm-processes state) pid-to-kill)]))]
[else state]))
(define dying-process (hash-ref (vm-processes state) pid-to-kill))
(print-kill (vm-name state) pid-to-kill (process-name dying-process) reason)
(let* ((state (for/fold ([state state]) ([eid (in-set (process-endpoints dying-process))])
(do-unsubscribe pid-to-kill (eid-pre-eid eid) reason state)))
(new-outbound-actions (for/list ([eid (in-set (process-meta-endpoints dying-process))])
(delete-role eid reason))))
(values new-outbound-actions
(struct-copy vm state [processes (hash-remove (vm-processes state) pid-to-kill)])))]
[else (values '() state)]))
(define (run-trapk state pid trap-k . args)
(if trap-k
@ -472,29 +520,41 @@
[pending-actions (append (reverse flat-actions) (vm-pending-actions state))]))
(define (((wrap-trapk pid trapk) . args) state)
(apply run-trapk state pid trapk args))
(if (hash-has-key? (vm-processes state) pid)
(run-vm (apply run-trapk state pid trapk args))
state))
(define (transform-meta-action pid preaction)
(define (transform-meta-action pid preaction state)
(match preaction
[(add-role pre-eid topics hs)
(add-role (eid pid pre-eid)
topics
(handlers (wrap-trapk pid (handlers-presence hs))
(wrap-trapk pid (handlers-absence hs))
(wrap-trapk pid (handlers-message hs))))]
(define new-eid (eid pid pre-eid))
(values (add-role new-eid
topics
(handlers (wrap-trapk pid (handlers-presence hs))
(wrap-trapk pid (handlers-absence hs))
(wrap-trapk pid (handlers-message hs))))
(if (hash-has-key? (vm-processes state) pid)
(generic-update-process state pid (add-process-meta-eid new-eid))
state))]
[(delete-role pre-eid reason)
(delete-role (eid pid pre-eid) reason)]
[(? send-message? p) p]
[(spawn main k)
(spawn main (wrap-trapk pid k))]
[(? kill? p) p]))
(define old-eid (eid pid pre-eid))
(values (delete-role old-eid reason)
(if (hash-has-key? (vm-processes state) pid)
(generic-update-process state pid (remove-process-meta-eid old-eid))
state))]
[(? send-message? p)
(values p state)]
[(spawn main k debug-name)
(values (spawn main (wrap-trapk pid k) debug-name) state)]
[(? kill? p)
(values p state)]))
(define (nested-vm boot)
(lambda (self-pid) (run-vm (make-vm boot))))
(define (nested-vm name boot)
(lambda (self-pid) (run-vm (make-vm name boot))))
(define (ground-vm boot)
(let loop ((state (make-vm boot)))
(match (run-vm state)
(let loop ((state (make-vm 'ground-vm boot)))
(match (let ((t (run-vm state))) (set! current-ground-transition t) t)
[(transition state actions)
(define is-blocking?
(match actions
@ -521,3 +581,5 @@
(wrap-evt always-evt (lambda (dummy) values)))
active-events)))
(loop (interruptk state))))])))
(define current-ground-transition #f)