diff --git a/presence/blocking-box.rkt b/presence/blocking-box.rkt new file mode 100644 index 0000000..d005af8 --- /dev/null +++ b/presence/blocking-box.rkt @@ -0,0 +1,38 @@ +#lang racket/base +;; A box whose value can only be set once, that starts life with no +;; value, and that supports an event waiting for a value to arrive. + +(provide make-blocking-box + blocking-box-evt + blocking-box-value + set-blocking-box!) + +(struct blocking-box (thread set-ch get-ch)) + +(define (make-blocking-box) + (define set-ch (make-channel)) + (define get-ch (make-channel)) + (blocking-box (thread/suspend-to-kill (lambda () (manager set-ch get-ch))) + set-ch + get-ch)) + +(define (manager s g) + (define v (channel-get s)) + (let loop () + (sync s (channel-put-evt g v)) ;; ignore any future settings, answer all future gettings + (loop))) + +(define (blocking-box-evt b) + (guard-evt + (lambda () + ;; Ensure the manager is running within our custodian: + (thread-resume (blocking-box-thread b) (current-thread)) + (blocking-box-get-ch b)))) + +(define (blocking-box-value b) + (sync (blocking-box-evt b))) + +(define (set-blocking-box! b v) + ;; Ensure the manager is running within our custodian: + (thread-resume (blocking-box-thread b) (current-thread)) + (channel-put (blocking-box-set-ch b) v)) diff --git a/presence/conversation-socket.rkt b/presence/conversation-socket.rkt new file mode 100644 index 0000000..49a0197 --- /dev/null +++ b/presence/conversation-socket.rkt @@ -0,0 +1,143 @@ +#lang racket/base + +(require racket/port) +(require racket/class) +(require racket/match) +(require racket/tcp) +(require "conversation.rkt") + +(provide (struct-out set-options) + tcp-server-actor + tcp-client-actor) + +(struct set-options (new-values) #:prefab) + +(define (socket-name role s) + (define-values (local-ip local-port remote-ip remote-port) + (tcp-addresses s #t)) + (list role local-ip local-port remote-ip remote-port)) + +(define (option-value options key [missing-value #f]) + (cond + ((assq key options) => cadr) + (else missing-value))) + +(define (tcp-server-actor room options . tcp-listener-args) + (define listener (apply tcp-listen tcp-listener-args)) + (define name (socket-name 'listener listener)) + (thread (lambda () + (define handle (join-room room name)) + (log-info (format "Listening on ~v" name)) + (let loop ((owner #f) + (remaining-credit (option-value options 'initial-accept-credit 0))) + (sync (handle-evt (send handle disconnected-evt) + (lambda (reason) + (log-error (format "~v: conversation closed: ~v" name reason)) + (tcp-close listener))) + (handle-evt (send handle listen-evt) + (match-lambda + ((arrived who) + (log-info (format "~v: New owner: ~v" name who)) + (loop who remaining-credit)) + ((departed who why) + (if (equal? owner who) + (begin (log-info (format "~v: Owner departed, closing" name)) + (tcp-close listener)) + (loop owner remaining-credit))) + ((says _ (credit _ amount) _) + (define new-credit (+ remaining-credit amount)) + (log-info (format "~v: Credit now ~v" name new-credit)) + (loop owner new-credit)) + (unexpected + (log-warning (format "~v: Ignoring message: ~v" name unexpected)) + (loop owner remaining-credit)))) + (if (positive? remaining-credit) + (handle-evt (tcp-accept-evt listener) + (match-lambda + ((list i o) + (send handle say + (tcp-socket-actor 'inbound-connection options i o) + 'accepted) + (loop owner (- remaining-credit 1))))) + never-evt))))) + room) + +(define (tcp-client-actor room options . tcp-connect-args) + (define-values (i o) (apply tcp-connect tcp-connect-args)) + (tcp-socket-actor 'outbound-connection options i o)) + +(define (tcp-socket-actor role options i o [room (make-room)]) + (define name (socket-name role i)) + (define (close-ports) + (close-input-port i) + (close-output-port o)) + (define (compute-terminator options) + ;; See read-line-evt and friends. + (option-value options 'line-terminator 'any)) + (define (compute-read-evt options) + (define read-mode (option-value options 'read-mode 'bytes)) + (case read-mode + ((bytes) (values (lambda (credit) (read-bytes-evt credit i)) + bytes-length)) + ((lines) (values (lambda (credit) (read-line-evt i (compute-terminator options))) + (lambda (v) 1))) + ((bytes-lines) (values (lambda (credit) (read-bytes-line-evt i (compute-terminator options))) + (lambda (v) 1))) + (else (error 'tcp-socket-actor "Illegal read-evt mode ~v" read-mode)))) + (thread (lambda () + (define handle (join-room room name)) + (log-info (format "~v: New connection" name)) + (with-handlers + ((exn? (lambda (e) + (close-ports) + (raise e)))) + (let loop ((options options) + (peer-count 0) + (remaining-credit (option-value options 'initial-read-credit 0))) + ;;(write `(connection-loop ,options ,peer-count ,remaining-credit)) (newline) + (sync (handle-evt (send handle disconnected-evt) + (lambda (reason) + (log-error (format "~v: conversation closed: ~v" name reason)))) + (handle-evt (send handle listen-evt) + (match-lambda + ((arrived _) + (loop options (+ peer-count 1) remaining-credit)) + ((departed _ _) + (if (= peer-count 1) + (log-info (format "~v: Last peer departed" name)) + (loop options (- peer-count 1) remaining-credit))) + ((says _ (credit _ amount) _) + (loop options peer-count (+ remaining-credit amount))) + ((says _ (? eof-object?) _) + (close-output-port o) + (loop options peer-count remaining-credit)) + ((says _ (? bytes? bs) _) + ;; TODO: credit flow the other way? + (write-bytes bs o) + (flush-output o) + (loop options peer-count remaining-credit)) + ((says _ (? string? s) _) + ;; TODO: credit flow the other way? + (write-string s o) + (flush-output o) + (loop options peer-count remaining-credit)) + ((says _ (set-options new-values) _) + (loop new-values peer-count remaining-credit)) + (unexpected + (log-warning (format "~v: Ignoring message: ~v" + name unexpected)) + (loop options peer-count remaining-credit)))) + (if (positive? remaining-credit) + (let-values (((e-maker credit-adjuster) (compute-read-evt options))) + (handle-evt (e-maker remaining-credit) + (lambda (v) + (if (eof-object? v) + (begin (send handle say v 'eof) + (loop options peer-count 0)) + (begin (send handle say v 'data) + (loop options peer-count + (- remaining-credit + (credit-adjuster v)))))))) + never-evt))) + (close-ports)))) + room) diff --git a/presence/conversation.rkt b/presence/conversation.rkt new file mode 100644 index 0000000..f8f7f3f --- /dev/null +++ b/presence/conversation.rkt @@ -0,0 +1,331 @@ +#lang racket/base + +(require racket/set) +(require racket/match) +(require racket/class) +(require racket/list) + +(require "functional-queue.rkt") +(require "blocking-box.rkt") +(require "standard-thread.rkt") +(require "unify.rkt") + +(provide make-room + room? + room-name + join-room + + ;; Management and communication + (struct-out arrived) + (struct-out departed) + (struct-out says) + + ;; Presence, Advertisement and Subscription + (struct-out topic) + ) + +(struct arrived (who) #:prefab) ;; someone arrived +(struct departed (who why) #:prefab) ;; someone departed with a reason +(struct says (who what) #:prefab) ;; someone said something + +(struct topic (role pattern) #:prefab) + +;; A flow is an intersection between topic patterns: a substitution +;; produced by unify, roughly. Flows are signalled embedded in +;; appropriate topic records, since they're a kind of filtered form of +;; an upstream topic advertisement. + +;; The variables in an advertised topic are +;; universally-quantified. When we unify two such topics, we're asking +;; for the fewest extra restrictions on those variables that satisfy +;; *both* topic when seen as predicates. The remaining variables are +;; again to be interpreted as universally quantified. + +(struct join-message (member-to-room ;; sync channel + room-to-member ;; sync channel + disconnect-box ;; blocking-box + member-thread ;; thread + exit-status) ;; exit-status structure from standard-thread.rkt + #:prefab) +(struct leave-message (reason) #:prefab) + +(struct room (name ch)) + +(struct room-state (name ch members) #:transparent) +(struct binding (name ;; any + flows-box ;; map from advertised topic to + ;; map from flow to set of (cons advertised-co-topic binding) + in-ch ;; sync channel + out-ch ;; sync channel + disconnect-box ;; blocking-box + queue-box ;; box of functional queue + thread ;; thread + exit-status) ;; maybe exit-status + ;; WARNING: do not make this transparent! The equivalence we + ;; want for bindings is eq?, not equal?, because (sadly) + ;; equal? descends into boxes! If we make this transparent, + ;; then set-add (etc) on anything involving a binding will + ;; take a long time, leading to exponential (?) behaviour on + ;; arrivals. + ) + +(define (make-room [name (gensym 'room)]) + (define ch (make-channel)) + (thread (lambda () (room-main (room-state name ch '())))) + (room name ch)) + +(define (join-room room) + (make-object membership% (room-ch room))) + +(define membership% + (class* object% () + (init room-init) + + (super-new) + + (define room room-init) + (define names (set)) ;; set of all our advertised topics + (define flows (set)) ;; set of all current flows + (define in-ch (make-channel)) + (define out-ch (make-channel)) + (define disconnect-box (make-blocking-box)) + (define connected #t) + (define reason #f) + + (define the-disconnected-evt (wrap-evt (blocking-box-evt disconnect-box) + (lambda (v) + (set! connected #f) + (set! reason v) + v))) + (channel-put room (join-message out-ch in-ch disconnect-box + (current-thread) + (current-thread-exit-status))) + + (define/public (connected?) + connected) + + (define/public (disconnect-reason) + reason) + + (define/public (disconnected-evt) + the-disconnected-evt) + + (define/public (assert!-evt name) + (define cname (upper-case-symbols->canonical name)) + (choice-evt the-disconnected-evt + (wrap-evt (channel-put-evt out-ch (arrived cname)) + (lambda (v) + (set! names (set-add names cname)))))) + + (define/public (assert! name) + (sync (assert!-evt name))) + + (define/public (retract!-evt name [why #f]) + (define cname (upper-case-symbols->canonical name)) + (choice-evt the-disconnected-evt + (wrap-evt (channel-put-evt out-ch (departed cname why)) + (lambda (v) + (set! names (set-remove names cname)))))) + + (define/public (retract! name) + (sync (retract!-evt name))) + + (define/public (say-evt who what) + (define cname (upper-case-symbols->canonical who)) + (when (not (set-member? names cname)) + ;; TODO: Overly restrictive. Topics of conversation should be + ;; contained by registered topics, not equal to them. + (error 'say "Attempt to speak on unregistered topic ~v" cname)) + (choice-evt the-disconnected-evt + (channel-put-evt out-ch (says cname what)))) + + (define/public (say who what) + (sync (say-evt who what))) + + (define/public (depart-evt [why #f]) + (choice-evt the-disconnected-evt + (wrap-evt (channel-put-evt out-ch (leave-message why)) + (lambda (v) + (set! connected #f) + (set! reason why))))) + + (define/public (depart [why #f]) + (sync (depart-evt why))) + + ;; Maybe<(or arrived departed says)> -> Maybe<(or arrived departed says)> + ;; Identity function with side-effects to update our set of flows as required. + (define (apply-message v) + (match v + ((arrived who) (set! flows (set-add flows who))) + ((departed who _) (set! flows (set-remove flows who))) + (_ (void))) + v) + + (define/public (listen-evt) + ;; we wrap this event for two reasons: firstly, because + ;; otherwise we leak authority, and secondly, to update our set + ;; of flows as arrived/departed events come by. + (wrap-evt in-ch apply-message)) + + (define/public (try-listen) + (apply-message (channel-try-get in-ch))) + + (define/public (listen) + (sync (wrap-evt the-disconnected-evt + (lambda (reason) + (error 'listen "Disconnected with reason ~v while listening" reason))) + (listen-evt))))) + +(define (room-main state) + ;;(write `(room-main ,state)) (newline) + (define handler + (sync (foldl (lambda (b acc) + (choice-evt (let ((qb (binding-queue-box b))) + (if (queue-empty? (unbox qb)) + acc + (choice-evt acc + (let-values (((first rest) (dequeue (unbox qb)))) + (handle-evt (channel-put-evt (binding-out-ch b) + first) + (lambda (dummy) + (lambda (state) + (set-box! qb rest) + state))))))) + (handle-evt (binding-in-ch b) + (handle-binding-message b)) + (handle-evt (thread-dead-evt (binding-thread b)) + (handle-binding-death b)))) + (handle-evt (room-state-ch state) join-message-handler) + (room-state-members state)))) + (room-main (handler state))) + +(define (((handle-binding-death b) dummy) state) + (part state b (binding-death-reason b))) + +(define ((join-message-handler message) state) + (match message + [(join-message in-ch out-ch disconnect-box thread exit-status) + (define b (binding (gensym 'binding) + (box (hash)) + in-ch + out-ch + disconnect-box + (box (make-queue)) + thread + exit-status)) + (add-binding state b)] + [unexpected (log-warning (format "room-main: unexpected message ~v" unexpected)) + state])) + +(define (binding-death-reason b) + (define es (binding-exit-status b)) + (and es ;; some threads are not standard-threads + (exit-status-exception es))) + +;; Both left and right must be canonicalized. +(define (topic-intersection left right) + (and ;; They are matching roles: + (or (and (eq? (topic-role left) 'publisher) + (eq? (topic-role right) 'subscriber)) + (and (eq? (topic-role left) 'subscriber) + (eq? (topic-role right) 'publisher))) + ;; They unify: + (mgu-canonical (freshen (topic-pattern left)) (freshen (topic-pattern right))))) + +;; Topic must be canonicalized. +(define (binding-has-topic? b topic) + (hash-has-key? (unbox (binding-flows-box b)) topic)) + +(define (binding-topics b) + (hash-keys (unbox (binding-flows-box b)))) + +;; Topic must be canonicalized. +(define (binding-flows-for-topic b topic) + (hash-ref (unbox (binding-flows-box b)) topic hash)) + +;; Topic must be canonicalized. +(define (set-binding-flows-for-topic! b topic flows) + (set-box! (binding-flows-box b) + (hash-set (unbox (binding-flows-box b)) topic flows))) + +;; Inserts a new flow in the records of b1, and signals b1 if the flow +;; is new to it. +;; Topics and flow must all be canonicalized. +(define (insert-flow! b1 topic1 flow topic2 b2) + (set-binding-flows-for-topic! + b1 topic1 + (let ((old-flows (binding-flows-for-topic b1 topic1))) + (when (not (hash-has-key? old-flows flow)) + (enqueue-message! b1 (arrived (topic (topic-role topic2) flow)))) + (hash-update old-flows + flow + (lambda (old-counterparties) + (set-add old-counterparties (cons topic2 b2))) + set)))) + +;; Removes a flow from the records of b1, signalling b1 if the flow +;; ended after the removal. +;; Topic and flow must be canonicalized. +(define (remove-flow! why b1 topic1 flow topic2 b2) + (define old-flows (binding-flows-for-topic b1 topic1)) + (define old-counterparties (hash-ref old-flows flow set)) + (define new-counterparties (set-remove old-counterparties (cons topic2 b2))) + (define new-flows (if (set-empty? new-counterparties) + (begin (enqueue-message! b1 (departed (topic (topic-role topic2) flow) why)) + (hash-remove old-flows flow)) + (hash-set old-flows flow new-counterparties))) + (set-binding-flows-for-topic! b1 topic1 new-flows)) + +(define (depart! b topic why) + (for* ([(flow counterparties) (binding-flows-for-topic b topic)] + [counterparty counterparties]) + (define other-topic (car counterparty)) + (define other-binding (cdr counterparty)) + (remove-flow! why other-binding other-topic flow topic b)) + (set-box! (binding-flows-box b) (hash-remove (unbox (binding-flows-box b)) topic))) + +(define (((handle-binding-message b) message) state) + ;;(write `(considering ,message from ,(binding-name b))) (newline) + (match message + [(leave-message why) (part state b why)] + [(arrived this-topic) + (when (not (binding-has-topic? b this-topic)) + (set-binding-flows-for-topic! b this-topic (hash)) + (for* ([other-binding (room-state-members state)] + [other-topic (binding-topics other-binding)]) + (let ((flow (topic-intersection this-topic other-topic))) + (when flow + (insert-flow! b this-topic flow other-topic other-binding) + (insert-flow! other-binding other-topic flow this-topic b))))) + state] + [(departed who why) + (depart! b who why) + state] + [(says this-topic what) + (for* ([(flow counterparties) (binding-flows-for-topic b this-topic)] + [counterparty counterparties]) + (define other-topic (car counterparty)) + (define other-binding (cdr counterparty)) + (enqueue-message! other-binding (says (topic (topic-role this-topic) flow) what))) + state] + [else + (log-warning (format "handle-binding-message: unexpected message ~v" message)) + state])) + +(define (part state b why) + (set-blocking-box! (binding-disconnect-box b) why) + (for ([topic (binding-topics b)]) (depart! b topic why)) + (remove-binding state b)) + +(define (add-binding state b) + (struct-copy room-state state + [members (cons b (room-state-members state))])) + +(define (remove-binding state b) + (struct-copy room-state state + [members (remove b (room-state-members state) eq?)])) + +(define (enqueue-message! b message) + (define qb (binding-queue-box b)) + ;;(write `(enqueued ,message for ,(binding-name b))) (newline) + (set-box! qb (enqueue (unbox qb) message))) diff --git a/presence/functional-queue.rkt b/presence/functional-queue.rkt new file mode 100644 index 0000000..14334e1 --- /dev/null +++ b/presence/functional-queue.rkt @@ -0,0 +1,75 @@ +#lang racket/base + +(provide make-queue + queue? + enqueue + enqueue-all + dequeue + list->queue + queue->list + queue-length + queue-empty? + queue-append + queue-extract) + +(struct queue (head tail) #:transparent) + +(define (make-queue) + (queue '() '())) + +(define (enqueue q v) + (queue (queue-head q) + (cons v (queue-tail q)))) + +(define (enqueue-all q v) + (queue (queue-head q) + (append (reverse v) (queue-tail q)))) + +(define (shuffle q) + (if (null? (queue-head q)) + (queue (reverse (queue-tail q)) '()) + q)) + +(define (dequeue q) + (let ((q1 (shuffle q))) + (values (car (queue-head q1)) + (queue (cdr (queue-head q1)) (queue-tail q1))))) + +(define (list->queue xs) + (queue xs '())) + +(define (queue->list q) + (append (queue-head q) (reverse (queue-tail q)))) + +(define (queue-length q) + (+ (length (queue-head q)) + (length (queue-tail q)))) + +(define (queue-empty? q) + (and (null? (queue-head q)) + (null? (queue-tail q)))) + +(define (queue-append q1 q2) + (queue (append (queue-head q1) + (reverse (queue-tail q1)) + (queue-head q2)) + (queue-tail q2))) + +(define (queue-extract q predicate [default-value #f]) + (let search-head ((head (queue-head q)) + (rejected-head-rev '())) + (cond + ((null? head) (let search-tail ((tail (reverse (queue-tail q))) + (rejected-tail-rev '())) + (cond + ((null? tail) (values default-value q)) + ((predicate (car tail)) (values (car tail) + (queue (queue-head q) + (append (reverse (cdr tail)) + rejected-tail-rev)))) + (else (search-tail (cdr tail) (cons (car tail) rejected-tail-rev)))))) + ((predicate (car head)) (values (car head) + (queue (append (reverse rejected-head-rev) + (cdr head)) + (queue-tail q)))) + (else (search-head (cdr head) (cons (car head) rejected-head-rev)))))) diff --git a/presence/standard-thread.rkt b/presence/standard-thread.rkt new file mode 100644 index 0000000..1e168e6 --- /dev/null +++ b/presence/standard-thread.rkt @@ -0,0 +1,48 @@ +#lang racket/base +;; Standard Thread + +(provide exit-status? + exit-status-exception + + current-thread-exit-status + exit-status-evt + + standard-thread) + +(struct exit-status (thread + [exception #:mutable] + ready)) + +(define *current-thread-exit-status* (make-parameter #f)) + +(define (current-thread-exit-status) + (define v (*current-thread-exit-status*)) + (if (exit-status? v) + (if (eq? (current-thread) (exit-status-thread v)) + v + (begin (*current-thread-exit-status* #f) + #f)) + #f)) + +(define (exit-status-evt es) + (wrap-evt (semaphore-peek-evt (exit-status-ready es)) + (lambda (dummy) es))) + +(define (fill-exit-status! es exn) + (set-exit-status-exception! es exn) + (semaphore-post (exit-status-ready es))) + +(define (call-capturing-exit-status thunk) + (define es (exit-status (current-thread) #f (make-semaphore 0))) + (parameterize ((*current-thread-exit-status* es)) + (with-handlers + ((exn? (lambda (e) + (fill-exit-status! es e) + (raise e)))) + (define result (thunk)) + (fill-exit-status! es #f) + result))) + +(define (standard-thread thunk) + (thread (lambda () + (call-capturing-exit-status thunk)))) diff --git a/presence/struct-map.rkt b/presence/struct-map.rkt new file mode 100644 index 0000000..6d6f40c --- /dev/null +++ b/presence/struct-map.rkt @@ -0,0 +1,42 @@ +#lang racket/base + +(provide current-struct-mappers + install-struct-mapper! + struct-map + struct-map/accumulator) + +;; Parameter> +(define current-struct-mappers (make-parameter (hash))) + +;; StructType Mapper -> Void +(define (install-struct-mapper! struct-type m) + (current-struct-mappers (hash-set (current-struct-mappers) struct-type m))) + +;; (X -> Y) Struct -> Struct +(define (struct-map f x) + (define-values (result acc) + (struct-map* 'struct-map (lambda (v acc) (values (f v) acc)) (void) x)) + result) + +;; (X Seed -> Y Seed) Seed Struct -> Struct Seed +(define (struct-map/accumulator f seed x) + (struct-map* 'struct-map/accumulator f seed x)) + +(define (struct-map* name f seed x) + (define-values (i skipped) (struct-info x)) + (when (not i) (error name "Cannot retrieve struct-info for ~v" x)) + (define m (hash-ref (current-struct-mappers) + i + (lambda () + (define key (prefab-struct-key x)) + (when (not key) (error name "No mapper for ~v" x)) + (prefab-struct-mapper key)))) + (m f seed x)) + +(define ((prefab-struct-mapper key) f initial-seed x) + (define-values (new-fields final-seed) + (for/fold ([new-fields '()] [old-seed initial-seed]) + ([old-field (cdr (vector->list (struct->vector x)))]) + (define-values (new-field new-seed) (f old-field old-seed)) + (values (cons new-field new-fields) new-seed))) + (values (apply make-prefab-struct key (reverse new-fields)) final-seed)) diff --git a/presence/test-conversation-socket.rkt b/presence/test-conversation-socket.rkt new file mode 100644 index 0000000..b267d2e --- /dev/null +++ b/presence/test-conversation-socket.rkt @@ -0,0 +1,73 @@ +#lang racket/base + +(require racket/class) +(require racket/match) + +(require "conversation.rkt") +(require "conversation-socket.rkt") + +(define pool (make-room 'everybody)) + +(define (handle-connection sock quit-proc) + (join-room pool) + (define h (join-room sock)) + (match (send h listen) + ((arrived peer-name) + (let loop () + (send h say "Ready>> ") + (sync (handle-evt (send h listen-evt) + (match-lambda + ((says _ _ 'eof) + (send h say "OK, bye\n")) + ((says _ "quit" 'data) + (send h say (credit peer-name 1)) + (quit-proc) + (send h say "OK, will quit accepting\n") + (loop)) + ((says _ what 'data) + (write what) + (newline) + (send h say (credit #f 1)) + (send h say "Carry on\n") + (loop)) + ((departed _ _) (void)) + (else (loop)))) + (handle-evt (send h disconnected-evt) void)))))) + +(define (listen port-no) + (define r (make-room)) + (tcp-server-actor r + `((initial-accept-credit 1) + (read-mode lines) + (initial-read-credit 1)) + port-no) + (define h (join-room r 'main)) + (match (send h listen) + ((arrived listener-name) + (let loop () + (match (send h listen) + ((says _ sock 'accepted) + (thread (lambda () + (handle-connection sock + (lambda () + (send h depart 'told-to-quit))))) + (send h say (credit listener-name 1))) + (unexpected + (write `(unexpected ,unexpected)) + (newline))) + (loop))))) + +(thread (lambda () + (join-room pool) + (listen 5001))) + +(define (wait-until-pool-empty) + (define h (join-room pool)) + (let loop ((count 0)) + (match (send h listen) + ((arrived _) (loop (+ count 1))) + ((departed _ _) (if (= count 1) + 'done + (loop (- count 1)))) + (_ (loop count))))) +(wait-until-pool-empty) diff --git a/presence/test-conversation.rkt b/presence/test-conversation.rkt new file mode 100644 index 0000000..66b9aba --- /dev/null +++ b/presence/test-conversation.rkt @@ -0,0 +1,85 @@ +#lang racket/base + +(require racket/tcp) +(require racket/port) +(require racket/class) +(require racket/match) + +(require "conversation.rkt") +(require "standard-thread.rkt") + +(define r (make-room)) + +(standard-thread + (lambda () + (define handle (join-room r)) + (send handle assert! (topic 'subscriber 'Any)) + (send handle assert! (topic 'publisher 'Any)) + (let loop () + (define m (send handle listen)) + ;;(write `(robot heard ,m)) (newline) + (match m + [(says _ "die") + (error 'robot "Following orders!")] + [(says (topic 'publisher _) _) + (send handle say (topic 'subscriber 'Any) `(robot hears ,m))] + [else (void)]) + (loop)))) + +(define (interaction i o) + (display "What is your name? > " o) + (flush-output o) + (define name (read-line i)) + (if (eof-object? name) + (begin (display "OK, bye then!" o) + (flush-output o)) + (let ((handle (join-room r))) + (define talk-topic (topic 'publisher (list name 'Sink 'speech))) + (define listen-topic (topic 'subscriber (list 'Speaker name 'speech))) + (send handle assert! talk-topic) + (send handle assert! listen-topic) + (let loop () + (display name o) + (display "@ROOM>> " o) + (flush-output o) + (sync (handle-evt (send handle listen-evt) + (lambda (m) + (write `(,name hears ,m) o) + (newline o) + (match m + [(says (topic 'publisher (list (== name) _ _)) _) + (write `(,name not acking own utterance) o) + (newline o)] + [(says (and specific-topic (topic 'publisher _)) _) + (write `(,name acking) o) + (newline o) + (send handle say specific-topic (list name 'ack))] + [_ (void)]) + (flush-output o) + (loop))) + (handle-evt (read-line-evt i 'any) + (lambda (utterance) + (when (equal? utterance "error") + (error 'interaction "Following orders!")) + (when (not (eof-object? utterance)) + (send handle say talk-topic utterance) + (loop))))))))) + +(standard-thread + (lambda () + (interaction (current-input-port) (current-output-port)))) + +(define port-number 5001) +(display "Listening on port ") +(display port-number) +(newline) +(let ((s (tcp-listen port-number 4 #t))) + (let accept-loop () + (define-values (i o) (tcp-accept s)) + (standard-thread + (lambda () + (interaction i o) + (close-input-port i) + (close-output-port o))) + (accept-loop))) + diff --git a/presence/test-unify.rkt b/presence/test-unify.rkt new file mode 100644 index 0000000..2dea4f5 --- /dev/null +++ b/presence/test-unify.rkt @@ -0,0 +1,62 @@ +#lang racket + +(require "unify.rkt") + +(require rackunit) + +(define (uc->v x) + (define-values (x1 env) (upper-case-symbols->variables x)) + x1) + +(define-syntax-rule (check-match expected pat) + (check-pred (lambda (v) (match v [pat #t] [_ #f])) expected)) + +(check-equal? (list->set (set-map (variables-in (uc->v '(a b c))) variable-info)) (set)) +(check-equal? (list->set (set-map (variables-in (uc->v '(a B B))) variable-info)) (set 'B)) +(check-equal? (list->set (set-map (variables-in (uc->v '(a B C))) variable-info)) (set 'B 'C)) + +(check-equal? (unify/vars '(a B c) '(a b c)) '((B . b))) +(check-equal? (unify/vars '(a B B) '(a b c)) #f) +(check-equal? (unify/vars '(a B B) '(a b B)) '((B . b))) + +(check-match (unify/vars '(a b C) '(a b C)) `((C . ,(variable 'C)))) + +(check-equal? (unify/vars '#(A 2 3) '#(1 2 B)) '((B . 3) (A . 1))) + +(check-equal? (unify-match/vars '#(A 2 3) '#(1 2 B)) '((A . 1))) +(check-equal? (unify-match/vars '#(1 2 B) '#(A 2 3)) '((B . 3))) + +(check-equal? (unify-match/vars '(a B B) '(a b C)) '((B . b))) +(check-equal? (unify-match/vars '(a b C) '(a B B)) '((C . b))) + +(check-equal? (unify-match/vars '(a C C) '(a b c)) #f) +(check-equal? (unify-match/vars '(a b c) '(a C C)) #f) +(check-match (unify-match/vars '(a b C) '(a D B)) `((C . ,(variable 'B)))) +(check-match (unify-match/vars '(a C C) '(a D B)) `((C . ,(variable 'D)))) + +(check-equal? (mgu-canonical (uc->v '(a b C)) (uc->v '(a D B))) `(a b ,(canonical-variable 0))) +(check-equal? (canonicalize (uc->v `(A (B A) D))) + (canonicalize (uc->v `(E (F E) H)))) + +(check-equal? (upper-case-symbols->canonical '(A (B A) D)) + (list (canonical-variable 0) + (list (canonical-variable 1) (canonical-variable 0)) + (canonical-variable 2))) + +(let* ((left (uc->v `(A (b A) D))) + (right (freshen left))) + (check-true (not (eq? (car left) (car right)))) + (check-eq? (variable-info (car left)) (variable-info (car right))) + (check-equal? (canonicalize left) (list (canonical-variable 0) + (list 'b (canonical-variable 0)) + (canonical-variable 1))) + (check-equal? (canonicalize left) (canonicalize right))) + +(struct x (y) #:transparent) +(let ((A (variable 'A))) + (check-equal? (unify (x A) (x 123)) `((,A . 123)))) + +(struct x1 (y z) #:prefab) +(check-equal? (unify/vars (x1 'A 'a) (x1 'b 'a)) '((A . b))) +(let ((A (variable 'A))) + (check-equal? (apply-subst `((,A . b)) (x1 A 'a)) (x1 'b 'a))) diff --git a/presence/unify.rkt b/presence/unify.rkt new file mode 100644 index 0000000..65e58fb --- /dev/null +++ b/presence/unify.rkt @@ -0,0 +1,239 @@ +#lang racket/base + +(require racket/set) +(require racket/match) +(require "struct-map.rkt") + +(provide (struct-out variable) + (struct-out canonical-variable) + variables-in + unify + unify/env + unify/vars + unify-match/vars + freshen + canonicalize + mgu-freshen + mgu-canonical + apply-subst + upper-case-symbols->variables + upper-case-symbols->canonical) + +;; A Subst is a Maybe>. +;; TODO: semantics + +;; Compared by eq?, not equal?. In particular, info is not involved in +;; equivalence. +(struct variable (info) + #:property prop:custom-write + (lambda (v port mode) + (display "?" port) + (write (variable-info v) port))) + +;; Compared by equal?, not eq?. The number is a part of the +;; appropriate equivalence relation for canonical-variables. +(struct canonical-variable (index) #:transparent + #:property prop:custom-write + (lambda (v port mode) + (display "?!" port) + (write (canonical-variable-index v) port))) + +;; Any -> Set +(define (variables-in x) + (let walk ((x x) (acc (set))) + (cond + [(variable? x) (set-add acc x)] + [(pair? x) (walk (car x) (walk (cdr x) acc))] + [(vector? x) (foldl walk acc (vector->list x))] + [(struct? x) (walk (struct->vector x #f) acc)] + [else acc]))) + +;; Variable Any -> Boolean +(define (occurs? var val) + (let walk ((x val)) + (cond + [(eq? var x) #t] + [(pair? x) (or (walk (car x)) (walk (cdr x)))] + [(vector? x) (ormap walk (vector->list x))] + [(struct? x) (walk (struct->vector x #f))] + [else #f]))) + +;; Variable Any Subst -> Subst +(define (extend-subst var val env) + (cond + [(eq? var val) + ;; Avoid trivial tautologies. Less trivial ones are not detected, + ;; but are harmless. + env] + [(occurs? var val) + ;; Occurs check. + #f] + [else + (cons (cons var val) env)])) + +;; Any Subst Set -> Any +(define (chase x env seen) + (if (variable? x) + (cond [(set-member? seen x) x] + [(assq x env) => (lambda (cell) (chase (cdr cell) env (set-add seen x)))] + [else x]) + x)) + +;; Any Any -> Subst +(define (unify a b) + (unify/env a b '())) + +;; Any Any Subst -> Subst +(define (unify/env a0 b0 env) + (let walk ((a0 a0) (b0 b0) (env env)) + (and env + (let ((a (chase a0 env (set))) + (b (chase b0 env (set)))) + (cond + [(variable? a) (extend-subst a b env)] + [(variable? b) (extend-subst b a env)] + [(and (pair? a) (pair? b)) + (walk (car a) (car b) (walk (cdr a) (cdr b) env))] + [(and (vector? a) (vector? b) (= (vector-length a) (vector-length b))) + (for/fold ([env env]) ([ea a] [eb b]) (walk ea eb env))] + [(and (struct? a) (struct? b)) + (walk (struct->vector a #f) (struct->vector b #f) env)] + [else (and (equal? a b) env)]))))) + +;; Any -> (values Any AList) +;; Converts upper-case symbols to variables, making sure that +;; eq? symbols map to eq? variables. +(define (upper-case-symbols->variables x) + (let walk ((x x) (env '())) + (cond + [(upper-case-symbol? x) + (cond [(assq x env) => (lambda (cell) (values (cdr cell) env))] + [else (let ((v (variable x))) (values v (cons (cons x v) env)))])] + [(pair? x) + (define-values (a env1) (walk (car x) env)) + (define-values (d env2) (walk (cdr x) env1)) + (values (cons a d) env2)] + [(vector? x) + (define result (make-vector (vector-length x))) + (values result (for/fold ([env env]) ([i (vector-length x)]) + (define-values (val env1) (walk (vector-ref x i) env)) + (vector-set! result i val) + env1))] + [(struct? x) (struct-map/accumulator walk env x)] + [else (values x env)]))) + +;; Any -> Any +(define (upper-case-symbols->canonical t) + (define env (make-hash)) ;; cheeky use of mutation + (let walk ((t t)) + (cond + [(upper-case-symbol? t) + (cond [(hash-ref env t #f)] + [else (define v (canonical-variable (hash-count env))) (hash-set! env t v) v])] + [(pair? t) (cons (walk (car t)) (walk (cdr t)))] + [(vector? t) (list->vector (map walk (vector->list t)))] + [(struct? t) (struct-map walk t)] + [else t]))) + +;; Any -> Boolean +(define (upper-case-symbol? x) + (and (symbol? x) + (let ((name (symbol->string x))) + (and (positive? (string-length name)) + (char-upper-case? (string-ref name 0)))))) + +;; AList -> AList +(define (flip-env env) + (map (lambda (x) (cons (cdr x) (car x))) env)) + +;; Any Any -> Subst +;; Like unify after upper-case-symbols->variables on both arguments. +(define (unify/vars a b) + (define-values (processed env) (upper-case-symbols->variables (cons a b))) + (define s (unify (car processed) (cdr processed))) + (and s (apply-subst s env))) + +;; Any Any -> Subst +;; Like unify-match after upper-case-symbols->variables on both +;; arguments, extracting bindings only from the first argument. +(define (unify-match/vars a b) + (define-values (pa a-env) (upper-case-symbols->variables a)) + (define-values (pb b-env) (upper-case-symbols->variables b)) + (define s (unify pa pb)) + (and s (apply-subst s a-env))) + +;; Utility used by freshen and canonicalize below. +(define (freshen* t var-handler canon-handler) + (define env (make-hash)) ;; cheeky use of mutation + (let walk ((t t)) + (cond + [(or (variable? t) (canonical-variable? t)) + (cond [(hash-ref env t #f)] + [else (define v ((if (canonical-variable? t) canon-handler var-handler) t env)) + (hash-set! env t v) + v])] + [(pair? t) (cons (walk (car t)) (walk (cdr t)))] + [(vector? t) (list->vector (map walk (vector->list t)))] + [(struct? t) (struct-map walk t)] + [else t]))) + +;; Any -> Any +;; +;; Freshens a term by substituting out variables in the term with +;; fresh variables to produce an arbitrary member of the term's +;; alpha-equivalence-class that shares no variables with the original. +;; +;; Treats canonical-variables just like regular ones, freshening them +;; with new ordinary (non-canonical) variables. +(define (freshen t) + (freshen* t + (lambda (var env) (variable (variable-info var))) + (lambda (var env) (variable (canonical-variable-index var))))) + +;; Any -> Any +;; +;; Canonicalizes a term by substituting out variables in the term with +;; canonical-variables to produce a canonical member of the term's +;; alpha-equivalence-class. +;; +;; Canonical variables are used in a structurally-determined order +;; related to print order: generally, all unseen variables to the left +;; of a term's print representation are given canonical equivalents +;; before those to the right. +;; +;; Canonical-variables may not appear in the input term. +(define (canonicalize t) + (freshen* t + (lambda (var env) (canonical-variable (hash-count env))) + (lambda (var env) + (error 'canonicalize "Canonical variables are forbidden in input to canonicalize")))) + +;; Any Any -> Any +;; If the arguments unify, applies the substitution to one of them, +;; yielding a most general unifier, and then freshens the result. +(define (mgu-freshen a b) + (define sub (unify a b)) + (and sub (freshen (apply-subst sub a)))) + +;; Any Any -> Any +;; If the arguments unify, applies the substitution to one of them, +;; yielding a most general unifier, and then canonicalizes the result. +(define (mgu-canonical a b) + (define sub (unify a b)) + (and sub (canonicalize (apply-subst sub a)))) + +;; Subst Any -> Any +(define (apply-subst env x) + (let walk ((x0 x)) + (define x (chase x0 env (set))) + (cond + [(pair? x) (cons (walk (car x)) (walk (cdr x)))] + [(vector? x) (list->vector (map walk (vector->list x)))] + [(struct? x) (struct-map walk x)] + [else x]))) + +(require racket/trace) +(trace ;;unify/env + ;;upper-case-symbols->variables + ;;apply-subst + )