tcp2 line-reader
This commit is contained in:
parent
801470ebaa
commit
22998de0dc
|
@ -3,17 +3,10 @@
|
||||||
(provide (struct-out tcp-channel-line))
|
(provide (struct-out tcp-channel-line))
|
||||||
|
|
||||||
(require "tcp.rkt")
|
(require "tcp.rkt")
|
||||||
|
(require "../support/bytes.rkt")
|
||||||
|
|
||||||
(struct tcp-channel-line (source destination bytes) #:prefab)
|
(struct tcp-channel-line (source destination bytes) #:prefab)
|
||||||
|
|
||||||
;; This should probably be in the standard library.
|
|
||||||
(define (bytes-index bs b)
|
|
||||||
(define len (bytes-length bs))
|
|
||||||
(let loop ((i 0))
|
|
||||||
(cond [(= i len) #f]
|
|
||||||
[(eqv? (bytes-ref bs i) b) i]
|
|
||||||
[else (loop (+ i 1))])))
|
|
||||||
|
|
||||||
(spawn #:name 'line-reader-factory
|
(spawn #:name 'line-reader-factory
|
||||||
(during/spawn (observe (tcp-channel-line $src $dst _))
|
(during/spawn (observe (tcp-channel-line $src $dst _))
|
||||||
#:name `(line-reader ,src ,dst)
|
#:name `(line-reader ,src ,dst)
|
||||||
|
|
|
@ -171,7 +171,9 @@
|
||||||
[(message (inbound (tcp-channel remote-addr local-addr (? bytes? bs))))
|
[(message (inbound (tcp-channel remote-addr local-addr (? bytes? bs))))
|
||||||
(transition state (message (tcp-channel remote-addr local-addr bs)))]
|
(transition state (message (tcp-channel remote-addr local-addr bs)))]
|
||||||
[(message (tcp-channel _ _ bs))
|
[(message (tcp-channel _ _ bs))
|
||||||
(write-bytes bs (connection-state-cout state))
|
(if (string? bs)
|
||||||
|
(write-string bs (connection-state-cout state))
|
||||||
|
(write-bytes bs (connection-state-cout state)))
|
||||||
(flush-output (connection-state-cout state))
|
(flush-output (connection-state-cout state))
|
||||||
#f]
|
#f]
|
||||||
[(? patch? p)
|
[(? patch? p)
|
||||||
|
|
|
@ -12,17 +12,20 @@
|
||||||
(struct-out tcp-accepted)
|
(struct-out tcp-accepted)
|
||||||
(struct-out tcp-out)
|
(struct-out tcp-out)
|
||||||
(struct-out tcp-in)
|
(struct-out tcp-in)
|
||||||
|
(struct-out tcp-in-line)
|
||||||
(struct-out tcp-address) ;; \_ From syndicate/drivers/tcp
|
(struct-out tcp-address) ;; \_ From syndicate/drivers/tcp
|
||||||
(struct-out tcp-listener) ;; /
|
(struct-out tcp-listener) ;; /
|
||||||
)
|
)
|
||||||
|
|
||||||
(require syndicate/protocol/advertise)
|
|
||||||
(require/activate syndicate/drivers/tcp)
|
(require/activate syndicate/drivers/tcp)
|
||||||
|
(require syndicate/protocol/advertise)
|
||||||
|
(require syndicate/support/bytes)
|
||||||
|
|
||||||
(struct tcp-connection (id spec) #:prefab)
|
(assertion-struct tcp-connection (id spec))
|
||||||
(struct tcp-accepted (id) #:prefab)
|
(assertion-struct tcp-accepted (id))
|
||||||
(struct tcp-out (id text) #:prefab)
|
(message-struct tcp-out (id bytes))
|
||||||
(struct tcp-in (id text) #:prefab)
|
(message-struct tcp-in (id bytes))
|
||||||
|
(message-struct tcp-in-line (id bytes))
|
||||||
|
|
||||||
(spawn #:name 'tcp2-listen-driver
|
(spawn #:name 'tcp2-listen-driver
|
||||||
(during/spawn (observe (tcp-connection _ (tcp-listener $port)))
|
(during/spawn (observe (tcp-connection _ (tcp-listener $port)))
|
||||||
|
@ -51,3 +54,14 @@
|
||||||
(on (message (tcp-channel them us $bs)) (send! (tcp-in id bs)))
|
(on (message (tcp-channel them us $bs)) (send! (tcp-in id bs)))
|
||||||
(on (message (tcp-out id $bs)) (send! (tcp-channel us them bs)))))
|
(on (message (tcp-out id $bs)) (send! (tcp-channel us them bs)))))
|
||||||
|
|
||||||
|
(spawn #:name 'tcp2-line-reader-factory
|
||||||
|
(during/spawn (observe (tcp-in-line $id _))
|
||||||
|
#:name (list 'tcp2-line-reader id)
|
||||||
|
(field [buffer #""])
|
||||||
|
(on (message (tcp-in id $bs)) (buffer (bytes-append (buffer) bs)))
|
||||||
|
(begin/dataflow
|
||||||
|
(define newline-pos (bytes-index (buffer) (char->integer #\newline)))
|
||||||
|
(when newline-pos
|
||||||
|
(define line (subbytes (buffer) 0 newline-pos))
|
||||||
|
(buffer (subbytes (buffer) (+ newline-pos 1)))
|
||||||
|
(send! (tcp-in-line id line))))))
|
||||||
|
|
|
@ -0,0 +1,11 @@
|
||||||
|
#lang racket/base
|
||||||
|
|
||||||
|
(provide bytes-index)
|
||||||
|
|
||||||
|
;; This should probably be in the standard library.
|
||||||
|
(define (bytes-index bs b)
|
||||||
|
(define len (bytes-length bs))
|
||||||
|
(let loop ((i 0))
|
||||||
|
(cond [(= i len) #f]
|
||||||
|
[(eqv? (bytes-ref bs i) b) i]
|
||||||
|
[else (loop (+ i 1))])))
|
Loading…
Reference in New Issue