subscribe-to-topic -> subscriber; publish-on-topic -> publisher
This commit is contained in:
parent
e51276baa1
commit
e8a7c253dd
|
@ -18,7 +18,7 @@
|
|||
(flush-output)
|
||||
(at-meta-level: Void
|
||||
(name-endpoint `(event-relay ,self-id ,e)
|
||||
(subscribe-to-topic: Void (cons e ?)
|
||||
(subscriber: Void (cons e ?)
|
||||
(on-message
|
||||
[msg (begin (printf "FIRED ~v -> ~v~n" e msg)
|
||||
(flush-output)
|
||||
|
|
|
@ -108,7 +108,7 @@
|
|||
(match-state state
|
||||
(match-conversation c
|
||||
(on-absence (handle-absence 'subscriber c state)))))
|
||||
(subscribe-to-topic (cons (tcp:tcp-accept-evt listener) (wild))
|
||||
(subscriber (cons (tcp:tcp-accept-evt listener) (wild))
|
||||
(on-message
|
||||
[(cons _ (list cin cout))
|
||||
(let-values (((local-hostname local-port remote-hostname remote-port)
|
||||
|
@ -151,16 +151,16 @@
|
|||
(quit)))
|
||||
|
||||
(transition #t ;; open
|
||||
(subscribe-to-topic (cons (read-bytes-avail-evt 4096 cin) (wild))
|
||||
(subscriber (cons (read-bytes-avail-evt 4096 cin) (wild))
|
||||
(match-state is-open
|
||||
(on-message
|
||||
[(cons _ (? eof-object?)) (close-transition is-open #t)]
|
||||
[(cons _ (? bytes? bs)) (transition is-open
|
||||
(send-message (tcp-channel remote-addr local-addr bs)))])))
|
||||
(subscribe-to-topic (cons (eof-evt cin) (wild))
|
||||
(subscriber (cons (eof-evt cin) (wild))
|
||||
(match-state is-open
|
||||
(on-message [(cons (? evt?) _) (close-transition is-open #t)])))
|
||||
(subscribe-to-topic (tcp-channel local-addr remote-addr (wild))
|
||||
(subscriber (tcp-channel local-addr remote-addr (wild))
|
||||
(match-state is-open
|
||||
(on-absence (close-transition is-open #f))
|
||||
(on-message
|
||||
|
@ -173,6 +173,6 @@
|
|||
[(? bytes? bs) (begin (write-bytes bs cout)
|
||||
(flush-output cout)
|
||||
(transition is-open))])])))
|
||||
(publish-on-topic (tcp-channel remote-addr local-addr (wild))
|
||||
(publisher (tcp-channel remote-addr local-addr (wild))
|
||||
(match-state is-open
|
||||
(on-absence (close-transition is-open #f))))))
|
||||
|
|
|
@ -162,7 +162,7 @@
|
|||
(match-state state
|
||||
(match-conversation c
|
||||
(on-absence (handle-absence 'subscriber c state)))))
|
||||
(subscribe-to-topic (cons (tcp:tcp-accept-evt listener) (wild))
|
||||
(subscriber (cons (tcp:tcp-accept-evt listener) (wild))
|
||||
(on-message
|
||||
[(cons _ (list cin cout))
|
||||
(let-values (((local-hostname local-port remote-hostname remote-port)
|
||||
|
@ -200,7 +200,7 @@
|
|||
(case (tcp-connection-state-mode state)
|
||||
[(lines)
|
||||
(name-endpoint 'inbound-relay
|
||||
(subscribe-to-topic (cons (read-bytes-line-evt cin 'any) (wild))
|
||||
(subscriber (cons (read-bytes-line-evt cin 'any) (wild))
|
||||
(match-state state
|
||||
(on-message
|
||||
[(cons _ (? eof-object?))
|
||||
|
@ -210,7 +210,7 @@
|
|||
(send-message (tcp-channel remote-addr local-addr bs)))]))))]
|
||||
[(bytes)
|
||||
(name-endpoint 'inbound-relay
|
||||
(subscribe-to-topic (cons (read-bytes-evt new-credit cin) (wild))
|
||||
(subscriber (cons (read-bytes-evt new-credit cin) (wild))
|
||||
(match-state state
|
||||
(on-message
|
||||
[(cons _ (? eof-object?))
|
||||
|
@ -221,12 +221,12 @@
|
|||
(send-message
|
||||
(tcp-channel remote-addr local-addr bs))))]))))])))))
|
||||
(transition (tcp-connection-state 'bytes 0)
|
||||
(subscribe-to-topic (cons (eof-evt cin) (wild))
|
||||
(subscriber (cons (eof-evt cin) (wild))
|
||||
(match-state state
|
||||
(on-message
|
||||
[(cons (? evt?) _)
|
||||
(close-transition state #t)])))
|
||||
(subscribe-to-topic (tcp-channel local-addr remote-addr (wild))
|
||||
(subscriber (tcp-channel local-addr remote-addr (wild))
|
||||
(match-state state
|
||||
(on-absence (close-transition state #f))
|
||||
(on-message
|
||||
|
@ -241,7 +241,7 @@
|
|||
[_
|
||||
(error 'tcp-connection-manager*
|
||||
"Publisher on a channel isn't supposed to issue channel control messages")])])))
|
||||
(publish-on-topic (tcp-channel remote-addr local-addr (wild))
|
||||
(publisher (tcp-channel remote-addr local-addr (wild))
|
||||
(match-state state
|
||||
(on-absence (close-transition state #f))
|
||||
(on-message
|
||||
|
|
|
@ -152,14 +152,14 @@
|
|||
(spawn: #:parent : ParentState
|
||||
#:child : DriverState
|
||||
(transition: (driver-state (make-timer-heap)) : DriverState
|
||||
(subscribe-to-topic: DriverState (set-timer-pattern (wild) (wild) (wild))
|
||||
(subscriber: DriverState (set-timer-pattern (wild) (wild) (wild))
|
||||
(match-state state
|
||||
(on-message
|
||||
[(set-timer label msecs 'relative)
|
||||
(install-timer! state label (+ (current-inexact-milliseconds) msecs))]
|
||||
[(set-timer label msecs 'absolute)
|
||||
(install-timer! state label msecs)])))
|
||||
(publish-on-topic: DriverState (timer-expired-pattern (wild) (wild)))))))
|
||||
(publisher: DriverState (timer-expired-pattern (wild) (wild)))))))
|
||||
|
||||
(: install-timer! : DriverState TimerLabel Real -> (Transition DriverState))
|
||||
(define (install-timer! state label deadline)
|
||||
|
@ -173,7 +173,7 @@
|
|||
(delete-endpoint 'time-listener)
|
||||
(and next
|
||||
(name-endpoint 'time-listener
|
||||
(subscribe-to-topic: DriverState (cons (timer-evt (pending-timer-deadline next)) (wild))
|
||||
(subscriber: DriverState (cons (timer-evt (pending-timer-deadline next)) (wild))
|
||||
(match-state state
|
||||
(on-message
|
||||
[(cons (? evt?) (? real? now))
|
||||
|
@ -193,7 +193,7 @@
|
|||
#:child : RelayState
|
||||
(transition: (relay-state 0 (make-immutable-hash '())) : RelayState
|
||||
(at-meta-level: RelayState
|
||||
(subscribe-to-topic: RelayState (timer-expired-pattern (wild) (wild))
|
||||
(subscriber: RelayState (timer-expired-pattern (wild) (wild))
|
||||
(match-state (relay-state next-counter active-timers)
|
||||
(on-message
|
||||
[(timer-expired (list (== self-id) (? exact-nonnegative-integer? counter))
|
||||
|
@ -203,7 +203,7 @@
|
|||
(and (hash-has-key? active-timers counter)
|
||||
(send-message (timer-expired (hash-ref active-timers counter)
|
||||
now))))]))))
|
||||
(subscribe-to-topic: RelayState (set-timer-pattern (wild) (wild) (wild))
|
||||
(subscriber: RelayState (set-timer-pattern (wild) (wild) (wild))
|
||||
(match-state (relay-state next-counter active-timers)
|
||||
(on-message
|
||||
[(set-timer label msecs kind)
|
||||
|
@ -212,4 +212,4 @@
|
|||
: RelayState
|
||||
(at-meta-level: RelayState
|
||||
(send-message (set-timer (list self-id next-counter) msecs kind))))])))
|
||||
(publish-on-topic: RelayState (timer-expired-pattern (wild) (wild)))))))
|
||||
(publisher: RelayState (timer-expired-pattern (wild) (wild)))))))
|
||||
|
|
|
@ -201,17 +201,17 @@
|
|||
(transition: #t : SocketManagerState
|
||||
;; Offers a handle-mapping on the local network so that
|
||||
;; the driver/factory can clean up when this process dies.
|
||||
(publish-on-topic: SocketManagerState (handle-mapping local-addr s))
|
||||
(publisher: SocketManagerState (handle-mapping local-addr s))
|
||||
;; If our counterparty removes either of their endpoints
|
||||
;; as the subscriber end of the remote-to-local stream or
|
||||
;; the publisher end of the local-to-remote stream, shut
|
||||
;; ourselves down. Also, relay messages published on the
|
||||
;; local-to-remote stream out on the actual socket.
|
||||
(publish-on-topic: SocketManagerState
|
||||
(publisher: SocketManagerState
|
||||
(udp-packet-pattern any-remote local-addr (wild))
|
||||
(match-state socket-is-open?
|
||||
(on-absence (handle-absence socket-is-open?))))
|
||||
(subscribe-to-topic: SocketManagerState
|
||||
(subscriber: SocketManagerState
|
||||
(udp-packet-pattern local-addr any-remote (wild))
|
||||
(match-state socket-is-open?
|
||||
(on-absence (handle-absence socket-is-open?))
|
||||
|
@ -223,7 +223,7 @@
|
|||
(transition: socket-is-open? : SocketManagerState))])))
|
||||
;; Listen for messages arriving on the actual socket using
|
||||
;; a ground event, and relay them at this level.
|
||||
(subscribe-to-topic: SocketManagerState (cons (udp-receive!-evt s buffer) (wild))
|
||||
(subscriber: SocketManagerState (cons (udp-receive!-evt s buffer) (wild))
|
||||
(on-message
|
||||
[(cons (? evt?) (list (? exact-integer? packet-length)
|
||||
(? string? remote-host)
|
||||
|
|
|
@ -6,7 +6,7 @@
|
|||
;; Here we have a console output driver just to show how it's done.
|
||||
(name-process 'console-output-driver
|
||||
(spawn (transition/no-state
|
||||
(subscribe-to-topic (list 'console-output ?)
|
||||
(subscriber (list 'console-output ?)
|
||||
(on-message [(list 'console-output item)
|
||||
(printf "~a" item)
|
||||
(void)])))))
|
||||
|
@ -14,11 +14,11 @@
|
|||
(name-process 'console-input-driver
|
||||
(spawn (transition/no-state
|
||||
(name-endpoint 'input-relay
|
||||
(publish-on-topic (list 'console-input ?)
|
||||
(publisher (list 'console-input ?)
|
||||
(on-absence
|
||||
(send-message (list 'console-output "Connection terminated.\n"))
|
||||
(quit))))
|
||||
(subscribe-to-topic (cons (read-line-evt (current-input-port) 'any) ?)
|
||||
(subscriber (cons (read-line-evt (current-input-port) 'any) ?)
|
||||
(on-message
|
||||
[(cons _ (? eof-object?))
|
||||
(send-message (list 'console-output "Terminating on local EOF.\n"))
|
||||
|
@ -30,14 +30,14 @@
|
|||
(spawn (let ((local (tcp-handle 'outbound))
|
||||
(remote (tcp-address "localhost" 5999)))
|
||||
(transition/no-state
|
||||
(subscribe-to-topic (list 'console-input ?)
|
||||
(subscriber (list 'console-input ?)
|
||||
(on-absence (quit))
|
||||
(on-message
|
||||
[(list 'console-input line)
|
||||
(send-message (list 'console-output (format "> ~a \n" line)))
|
||||
(send-message (tcp-channel local remote (string-append line "\n")))]))
|
||||
(publish-on-topic (tcp-channel local remote ?))
|
||||
(subscribe-to-topic (tcp-channel remote local ?)
|
||||
(publisher (tcp-channel local remote ?))
|
||||
(subscriber (tcp-channel remote local ?)
|
||||
(on-absence (quit))
|
||||
(on-message
|
||||
[(tcp-channel _ _ (? eof-object?))
|
||||
|
|
|
@ -18,12 +18,12 @@
|
|||
(define (listen-to-user user them us)
|
||||
(list
|
||||
(at-meta-level
|
||||
(subscribe-to-topic (tcp-channel them us ?)
|
||||
(subscriber (tcp-channel them us ?)
|
||||
(on-absence (quit))
|
||||
(on-message
|
||||
[(tcp-channel _ _ (? bytes? text))
|
||||
(send-message `(,user says ,text))])))
|
||||
(publish-on-topic `(,user says ,?))))
|
||||
(publisher `(,user says ,?))))
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
(define (speak-to-user user them us)
|
||||
|
@ -37,8 +37,8 @@
|
|||
(list
|
||||
(say "You are ~s.~n" user)
|
||||
(at-meta-level
|
||||
(publish-on-topic (tcp-channel us them ?)))
|
||||
(subscribe-to-topic `(,? says ,?)
|
||||
(publisher (tcp-channel us them ?)))
|
||||
(subscriber `(,? says ,?)
|
||||
(match-conversation `(,who says ,_)
|
||||
(on-presence (announce who 'arrived))
|
||||
(on-absence (announce who 'departed))
|
||||
|
|
|
@ -19,9 +19,9 @@
|
|||
|
||||
(define (listen-to-user user them us)
|
||||
(list
|
||||
(publish-on-topic `(,user says ,?))
|
||||
(publisher `(,user says ,?))
|
||||
(at-meta-level
|
||||
(subscribe-to-topic (tcp-channel them us ?)
|
||||
(subscriber (tcp-channel them us ?)
|
||||
(on-absence (quit))
|
||||
(on-message
|
||||
[(tcp-channel _ _ (? bytes? text))
|
||||
|
@ -38,8 +38,8 @@
|
|||
(list
|
||||
(say "You are ~s.~n" user)
|
||||
(at-meta-level
|
||||
(publish-on-topic (tcp-channel us them ?)))
|
||||
(subscribe-to-topic `(,? says ,?)
|
||||
(publisher (tcp-channel us them ?)))
|
||||
(subscriber `(,? says ,?)
|
||||
(match-conversation `(,who says ,_)
|
||||
(on-presence (announce who 'arrived))
|
||||
(on-absence (announce who 'departed))
|
||||
|
|
|
@ -6,7 +6,7 @@
|
|||
|
||||
(define (echoer from to)
|
||||
(transition stateless
|
||||
(subscribe-to-topic (tcp-channel from to ?)
|
||||
(subscriber (tcp-channel from to ?)
|
||||
(on-absence (quit))
|
||||
(on-message
|
||||
[(tcp-channel _ _ data)
|
||||
|
|
|
@ -6,13 +6,13 @@
|
|||
|
||||
(define (echoer from to)
|
||||
(transition/no-state
|
||||
(subscribe-to-topic (tcp-channel from to ?)
|
||||
(subscriber (tcp-channel from to ?)
|
||||
(on-absence (quit))
|
||||
(on-message
|
||||
[(tcp-channel _ _ data)
|
||||
(send-message (tcp-channel to from data))]))))
|
||||
|
||||
(ground-vm tcp
|
||||
(subscribe-to-topic (tcp-channel ? (tcp-listener 5999) ?)
|
||||
(subscriber (tcp-channel ? (tcp-listener 5999) ?)
|
||||
(match-conversation (tcp-channel from to _)
|
||||
(on-presence (spawn (echoer from to))))))
|
||||
|
|
|
@ -15,8 +15,8 @@
|
|||
observe-subscribers/everything:
|
||||
observe-publishers:
|
||||
observe-publishers/everything:
|
||||
publish-on-topic:
|
||||
subscribe-to-topic:
|
||||
publisher:
|
||||
subscriber:
|
||||
build-endpoint:)
|
||||
|
||||
(define&provide-dsl-helper-syntaxes "endpoint definition context"
|
||||
|
@ -73,13 +73,13 @@
|
|||
(core:role 'subscriber (cast topic core:Topic) 'everything)
|
||||
clause ...))
|
||||
|
||||
(define-syntax-rule (publish-on-topic: State topic clause ...)
|
||||
(define-syntax-rule (publisher: State topic clause ...)
|
||||
(build-endpoint: State
|
||||
(gensym 'anonymous-endpoint)
|
||||
(core:role 'publisher (cast topic core:Topic) 'participant)
|
||||
clause ...))
|
||||
|
||||
(define-syntax-rule (subscribe-to-topic: State topic clause ...)
|
||||
(define-syntax-rule (subscriber: State topic clause ...)
|
||||
(build-endpoint: State
|
||||
(gensym 'anonymous-endpoint)
|
||||
(core:role 'subscriber (cast topic core:Topic) 'participant)
|
||||
|
@ -247,8 +247,8 @@
|
|||
;;; eval: (put 'observe-subscribers/everything: 'scheme-indent-function 2)
|
||||
;;; eval: (put 'observe-publishers: 'scheme-indent-function 2)
|
||||
;;; eval: (put 'observe-publishers/everything: 'scheme-indent-function 2)
|
||||
;;; eval: (put 'publish-on-topic: 'scheme-indent-function 2)
|
||||
;;; eval: (put 'subscribe-to-topic: 'scheme-indent-function 2)
|
||||
;;; eval: (put 'publisher: 'scheme-indent-function 2)
|
||||
;;; eval: (put 'subscriber: 'scheme-indent-function 2)
|
||||
;;; eval: (put 'match-state 'scheme-indent-function 1)
|
||||
;;; eval: (put 'match-orientation 'scheme-indent-function 1)
|
||||
;;; eval: (put 'match-conversation 'scheme-indent-function 1)
|
||||
|
|
|
@ -14,8 +14,8 @@
|
|||
observe-subscribers/everything
|
||||
observe-publishers
|
||||
observe-publishers/everything
|
||||
publish-on-topic
|
||||
subscribe-to-topic
|
||||
publisher
|
||||
subscriber
|
||||
build-endpoint)
|
||||
|
||||
(define&provide-dsl-helper-syntaxes "endpoint definition context"
|
||||
|
@ -67,12 +67,12 @@
|
|||
(core:role 'subscriber topic 'everything)
|
||||
clause ...))
|
||||
|
||||
(define-syntax-rule (publish-on-topic topic clause ...)
|
||||
(define-syntax-rule (publisher topic clause ...)
|
||||
(build-endpoint (gensym 'anonymous-endpoint)
|
||||
(core:role 'publisher topic 'participant)
|
||||
clause ...))
|
||||
|
||||
(define-syntax-rule (subscribe-to-topic topic clause ...)
|
||||
(define-syntax-rule (subscriber topic clause ...)
|
||||
(build-endpoint (gensym 'anonymous-endpoint)
|
||||
(core:role 'subscriber topic 'participant)
|
||||
clause ...))
|
||||
|
@ -230,8 +230,8 @@
|
|||
;;; eval: (put 'observe-subscribers/everything 'scheme-indent-function 1)
|
||||
;;; eval: (put 'observe-publishers 'scheme-indent-function 1)
|
||||
;;; eval: (put 'observe-publishers/everything 'scheme-indent-function 1)
|
||||
;;; eval: (put 'publish-on-topic 'scheme-indent-function 1)
|
||||
;;; eval: (put 'subscribe-to-topic 'scheme-indent-function 1)
|
||||
;;; eval: (put 'publisher 'scheme-indent-function 1)
|
||||
;;; eval: (put 'subscriber 'scheme-indent-function 1)
|
||||
;;; eval: (put 'match-state 'scheme-indent-function 1)
|
||||
;;; eval: (put 'match-orientation 'scheme-indent-function 1)
|
||||
;;; eval: (put 'match-conversation 'scheme-indent-function 1)
|
||||
|
|
Loading…
Reference in New Issue