diff --git a/syndicate-examples/speed-tests/box-and-client/without-dataspace.rkt b/syndicate-examples/speed-tests/box-and-client/without-dataspace.rkt index 0af8a26..30e340c 100644 --- a/syndicate-examples/speed-tests/box-and-client/without-dataspace.rkt +++ b/syndicate-examples/speed-tests/box-and-client/without-dataspace.rkt @@ -12,18 +12,17 @@ (define reporter (report-stats REPORT_EVERY)) (send! k (hash 'getter (embedded - (ref (during* #:name 'subscription-handler - (lambda (observer) - (log-info "observer ~v" observer) - (at (embedded-value observer) (assert (value))))))) + (object #:name 'subscription-handler + [#:asserted observer + (log-info "observer ~v" observer) + (at (embedded-value observer) (assert (value)))])) 'setter (embedded - (ref (entity #:name 'update-handler - #:message - (lambda (new-value) - (reporter new-value) - (when (= new-value LIMIT) (stop-facet root-facet)) - (value new-value))))))))) + (object #:name 'update-handler + [#:asserted new-value + (reporter new-value) + (when (= new-value LIMIT) (stop-facet root-facet)) + (value new-value)])))))) (define (client getter setter) (log-info "client start") @@ -32,33 +31,29 @@ (define count 0) (at getter (assert (embedded - (ref (entity #:name 'termination-detector - #:assert - (lambda (_value _handle) - (set! count (+ count 1))) - #:retract - (lambda (_handle) - (set! count (- count 1)) - (when (zero? count) - (log-info "Client detected box termination") - (stop-facet root-facet))))))) + (object #:name 'termination-detector + [#:asserted _ + (set! count (+ count 1)) + #:retracted + (set! count (- count 1)) + (when (zero? count) + (log-info "Client detected box termination") + (stop-facet root-facet))]))) (assert (embedded - (ref (entity #:name 'update-handler - #:assert - (lambda (value _handle) (send! setter (+ value 1)))))))))) + (object #:name 'update-handler + [#:asserted value (send! setter (+ value 1))])))))) (module+ main (time (actor-system (define root-facet this-facet) (define disarm (facet-prevent-inert-check! this-facet)) - (box (ref (entity #:name 'box-boot-handler - #:message - (lambda (refs) - (log-info "refs ~v" refs) - (match-define (hash-table ('getter (embedded g)) - ('setter (embedded s))) refs) - (client g s) - (stop-facet root-facet)))) + (box (object #:name 'box-boot-handler + [#:asserted refs + (log-info "refs ~v" refs) + (match-define (hash-table ('getter (embedded g)) + ('setter (embedded s))) refs) + (client g s) + (stop-facet root-facet)]) 500000 100000)))) diff --git a/syndicate/distributed/tcp-server.rkt b/syndicate/distributed/tcp-server.rkt index 73ea5c9..2df4e64 100644 --- a/syndicate/distributed/tcp-server.rkt +++ b/syndicate/distributed/tcp-server.rkt @@ -28,25 +28,20 @@ (spawn #:name 'tcp-server (at ds (during/spawn (Connection $conn (TcpInbound "0.0.0.0" 5999)) - (define gatekeeper - (ref - (during* #:name (list conn 'gatekeeper) - (lambda (assertion) - (match (parse-Resolve assertion) - [(? eof-object?) (void)] - [(Resolve unvalidated-sturdyref observer) - (at ds - (during (Bind (SturdyRef-oid unvalidated-sturdyref) $key $target) - (define sturdyref (validate unvalidated-sturdyref key)) - (define attenuation - (append-map Attenuation-value - (reverse (SturdyRef-caveatChain sturdyref)))) - (define attenuated-target - (apply attenuate-entity-ref target attenuation)) - (at observer (assert (embedded attenuated-target)))))]))))) (run-relay #:name conn #:packet-writer (lambda (bs) (send-data conn bs)) #:setup-inputs (lambda (tr) (accept-connection conn #:on-data (lambda (bs) (accept-bytes tr bs)))) - #:initial-ref gatekeeper)))))) + #:initial-ref + (object #:name (list conn 'gatekeeper) + [(Resolve unvalidated-sturdyref observer) + (at ds + (during (Bind (SturdyRef-oid unvalidated-sturdyref) $key $target) + (define sturdyref (validate unvalidated-sturdyref key)) + (define attenuation + (append-map Attenuation-value + (reverse (SturdyRef-caveatChain sturdyref)))) + (define attenuated-target + (apply attenuate-entity-ref target attenuation)) + (at observer (assert (embedded attenuated-target)))))]))))))) diff --git a/syndicate/drivers/tcp.rkt b/syndicate/drivers/tcp.rkt index 6a7bc0a..ecdc32e 100644 --- a/syndicate/drivers/tcp.rkt +++ b/syndicate/drivers/tcp.rkt @@ -61,10 +61,11 @@ (on-stop (close-input-port i) (close-output-port o)) (start-inbound-relay connection-custodian name local-peer i) + (define relay (outbound-relay name o)) (at local-peer (assert (ActiveSocket-controller - (ref (entity #:name (list name 'socket) - #:message (outbound-relay name o)))))))))) + (object #:name (list name 'socket) + [#:asserted (Socket data) (relay data)])))))))) (define (spawn-inbound ds custodian i o spec) (define name (call-with-values (lambda () (tcp-addresses i #t)) list)) @@ -74,29 +75,25 @@ (close-output-port o)) (define active-controller #f) - (define active-controller-handle #f) + (define relay (outbound-relay name o)) (at ds (assert (Connection - (ref (entity #:name (list name 'active-socket) - #:assert - (lambda (m handle) - (match (parse-ActiveSocket m) - [(ActiveSocket-controller controller) - (log-syndicate/drivers/tcp-debug "~v controller for ~v" controller this-actor) - (when (not active-controller) - (start-inbound-relay custodian name controller i)) - (set! active-controller controller) - (set! active-controller-handle handle)] - [(ActiveSocket-close message) - (log-syndicate/drivers/tcp-debug "closing ~v:\n~a" this-actor message) - (stop-current-facet)])) - #:retract - (lambda (handle) - (when (equal? handle active-controller-handle) - (log-syndicate/drivers/tcp-debug "peer withdrawn ~v" this-actor) - (stop-current-facet))) - #:message - (outbound-relay name o))) + (object + #:name (list name 'active-socket) + [#:asserted (ActiveSocket-controller controller) + (log-syndicate/drivers/tcp-debug "~v controller for ~v" controller this-actor) + (when (not active-controller) + (start-inbound-relay custodian name controller i)) + (set! active-controller controller) + #:retracted + (when (eq? controller active-controller) + (log-syndicate/drivers/tcp-debug "peer withdrawn ~v" this-actor) + (stop-current-facet))] + [#:asserted (ActiveSocket-close message) + (log-syndicate/drivers/tcp-debug "closing ~v:\n~a" this-actor message) + (stop-current-facet)] + [#:asserted (ActiveSocket-Socket (Socket data)) + (relay data)]) spec))))) (define (start-inbound-relay custodian name target i) @@ -113,17 +110,15 @@ (define (outbound-relay name o) (define flush-pending #f) - (lambda (m) - (match (parse-ActiveSocket m) - [(ActiveSocket-Socket (Socket payload)) - (log-syndicate/drivers/tcp-debug "outbound data ~v for ~v" payload name) - (write-bytes payload o) - (when (not flush-pending) - (set! flush-pending #t) - (facet-on-end-of-turn! this-facet - (lambda () - (set! flush-pending #f) - (flush-output o))))]))) + (lambda (payload) + (log-syndicate/drivers/tcp-debug "outbound data ~v for ~v" payload name) + (write-bytes payload o) + (when (not flush-pending) + (set! flush-pending #t) + (facet-on-end-of-turn! this-facet + (lambda () + (set! flush-pending #f) + (flush-output o)))))) (define (read-bytes-avail input-port #:limit [limit 65536]) (define buffer (make-bytes limit)) @@ -134,11 +129,8 @@ (define (accept-connection conn #:on-data on-data) (at conn (assert (ActiveSocket-controller - (ref (entity #:name 'inbound-socket-controller - #:message - (lambda (m) - (match-define (Socket data) (parse-Socket m)) - (on-data data)))))))) + (object #:name 'inbound-socket-controller + [#:asserted (Socket data) (on-data data)]))))) (define (establish-connection ds spec #:on-connected on-connected @@ -146,16 +138,16 @@ #:on-disconnected [on-disconnected (lambda () (stop-current-facet))] #:on-rejected [on-rejected (lambda () (stop-current-facet))]) (define s - (entity #:name 'outbound-socket - #:assert (lambda (m _handle) - (match (parse-ActiveSocket m) - [(ActiveSocket-controller peer) (on-connected peer)] - [(ActiveSocket-close message) (on-rejected message)])) - #:retract (lambda (_handle) (on-disconnected)) - #:message (lambda (m) - (match (parse-ActiveSocket m) - [(ActiveSocket-Socket (Socket data)) (on-data data)])))) - (at ds (assert (Connection (ref s) spec)))) + (object #:name 'outbound-socket + [#:asserted (ActiveSocket-controller peer) + (on-connected peer) + #:retracted + (on-disconnected)] + [#:asserted (ActiveSocket-close message) + (on-rejected message)] + [#:asserted (ActiveSocket-Socket (Socket data)) + (on-data data)])) + (at ds (assert (Connection s spec)))) (define (send-data conn data) (send! conn (Socket (if (bytes? data) data (string->bytes/utf-8 data))))) diff --git a/syndicate/pattern.rkt b/syndicate/pattern.rkt index 23dee46..058eada 100644 --- a/syndicate/pattern.rkt +++ b/syndicate/pattern.rkt @@ -6,9 +6,11 @@ (provide (for-syntax preserves-pattern-registry register-preserves-pattern! analyse-pattern - analyse-pattern-bindings) + analyse-pattern-bindings + analyse-match-pattern) define-preserves-pattern :pattern + :parse pattern->constant-values pattern->constant-paths @@ -42,7 +44,7 @@ (define-syntax (define-preserves-pattern stx) (syntax-case stx () - [(_ top-type-name (ctor-stx field-stxs ...) pattern-stx bindings-stx) + [(_ top-type-name parser-name (ctor-stx field-stxs ...) pattern-stx bindings-stx) #`(begin (begin-for-syntax (register-preserves-pattern! #'ctor-stx @@ -52,6 +54,10 @@ (syntax-case s () [(_ field-stxs ...) pattern-stx] [_ (raise-syntax-error 'ctor-stx "Invalid pattern")])] + ['match-pattern + (syntax-case s () + [(_ field-stxs ...) #`(app parser-name (ctor-stx field-stxs ...))] + [_ (raise-syntax-error 'ctor-stx "Invalid match-pattern")])] ['bindings (syntax-case s () [(_ field-stxs ...) bindings-stx] @@ -130,8 +136,7 @@ [(expander args ...) (pattern-expander-id? #'expander) (pattern-expander-transform disarmed-stx - (lambda (result) - (walk (syntax-rearm result stx))))] + (lambda (result) (walk (syntax-rearm result stx))))] ;; Extremely limited support for quasiquoting and quoting [(quasiquote (unquote p)) (walk #'p)] @@ -231,6 +236,54 @@ [other '()]))) + (define (analyse-match-pattern stx) + (let walk ((stx stx)) + (define disarmed-stx (syntax-disarm stx orig-insp)) + (syntax-case disarmed-stx ($ quasiquote unquote quote) + [(ctor args ...) + (constructor-registered? #'ctor) + ((free-id-table-ref preserves-pattern-registry #'ctor) 'match-pattern disarmed-stx)] + + [(expander args ...) + (pattern-expander-id? #'expander) + (pattern-expander-transform disarmed-stx + (lambda (result) (walk (syntax-rearm result stx))))] + + ;; Extremely limited support for quasiquoting and quoting + [(quasiquote (unquote p)) (walk #'p)] + [(quasiquote (p ...)) (walk #'(list (quasiquote p) ...))] + [(quasiquote p) #''p] + [(quote p) #''p] + + [(unquote p) (raise-syntax-error #f "Out-of-place unquote in match-pattern")] + + [(ctor piece ...) + (struct-info? (id-value #'ctor)) + #`(ctor (:parse piece) ...)] + + [(list-stx piece ...) + (list-id? #'list-stx) + #`(list-stx (:parse piece) ...)] + + [(hash-stx piece ...) + (hash-or-hasheqv-id? #'hash-stx) + #`(hash-table #,@(let loop ((pieces (syntax->list #'(piece ...)))) + (match pieces + ['() '()] + [(list* k v more) (list* k #`(:parse #,v) (loop more))])) + [_ _] ___)] + + [(or-stx piece ...) + (and (identifier? #'or-stx) (free-identifier=? #'or #'or-stx)) + #`(or (:parse piece) ...)] + + [(and-stx piece ...) + (and (identifier? #'and-stx) (free-identifier=? #'and #'and-stx)) + #`(and (:parse piece) ...)] + + [other + #`other]))) + (define (expand-:pattern stx) (syntax-case stx () [(_ pat-stx atomic-literal-transformer) @@ -252,6 +305,11 @@ (lambda (stx) (expand-:pattern stx))) +(define-match-expander :parse + (lambda (stx) + (syntax-case stx () + [(_ pat-stx) (analyse-match-pattern #'pat-stx)]))) + ;;--------------------------------------------------------------------------- (define (select-pattern-leaves desc capture-fn lit-fn) diff --git a/syndicate/schema-compiler.rkt b/syndicate/schema-compiler.rkt index a5e5bfb..4b70837 100644 --- a/syndicate/schema-compiler.rkt +++ b/syndicate/schema-compiler.rkt @@ -117,7 +117,10 @@ [_ #f]))) (if (not fields) (k-nonrecord) - `(define-preserves-pattern ,top-name (,name ,@(map escape (map ty-field-name fields))) + `(define-preserves-pattern + ,top-name + ,(format-symbol "parse-~a" top-name) + (,name ,@(map escape (map ty-field-name fields))) (quasisyntax ,(pat-pattern p)) (append ,@(for/list [(f (in-list fields))] `(map ,(final-atom-destructurer (unwrap (ty-field-pattern f))) @@ -126,16 +129,20 @@ (match def [(Definition-or p0 p1 pN) - `(begin ,@(for/list [(named-alt (in-list (list* p0 p1 pN))) - (alt-ty (in-list (map ty-variant-type (ty-union-variants (definition-ty def)))))] - (match-define (NamedAlternative variant-label-str pattern) named-alt) - (define full-name (format-symbol "~a-~a" name variant-label-str)) - (top-pat name full-name pattern alt-ty - (lambda () - `(define-preserves-pattern ,name (,full-name value) - (analyse-pattern #'value) - (map ,(final-atom-destructurer (unwrap pattern)) - (analyse-pattern-bindings (syntax value))))))))] + `(begin + ,@(for/list [(named-alt (in-list (list* p0 p1 pN))) + (alt-ty (in-list (map ty-variant-type (ty-union-variants (definition-ty def)))))] + (match-define (NamedAlternative variant-label-str pattern) named-alt) + (define full-name (format-symbol "~a-~a" name variant-label-str)) + (top-pat name full-name pattern alt-ty + (lambda () + `(define-preserves-pattern + ,name + ,(format-symbol "parse-~a" name) + (,full-name value) + (analyse-pattern #'value) + (map ,(final-atom-destructurer (unwrap pattern)) + (analyse-pattern-bindings (syntax value))))))))] [(? Definition-and?) `(begin)] [(Definition-Pattern p) diff --git a/syndicate/syntax.rkt b/syndicate/syntax.rkt index e45890e..64f07a8 100644 --- a/syndicate/syntax.rkt +++ b/syndicate/syntax.rkt @@ -9,6 +9,7 @@ entity actor-system + object ref react @@ -20,6 +21,7 @@ sync! send! spawn + spawn/link begin/dataflow define/dataflow @@ -75,6 +77,73 @@ [(_ name: expr ...) #'(actor:actor-system #:name name.N (lambda () expr ...))])) +(define-syntax (object stx) + (syntax-parse stx + [(_ name: handler ...) + #`(let ((state (make-hash))) + (define (handler-function assertion) + (-object-clauses assertion [] [handler ...])) + (ref (entity #:name name.N + #:assert (lambda (m h) (-object-assert state handler-function m h)) + #:retract (lambda (h) (-object-retract state h)) + #:message (lambda (m) (-object-message handler-function m)))))])) + +(define (-object-assert state handler-function assertion handle) + (define k (handler-function assertion)) + (when k (hash-set! state handle k))) + +(define (-object-retract state handle) + (define k (hash-ref state handle #f)) + (when k + (hash-remove! state handle) + (k))) + +(define (-object-message handler-function message) + (define k (handler-function message)) + (when k + (k))) + +(define-syntax (-object-clauses stx) + (syntax-parse stx + [(_ input [completed ...] []) + #'(match input + completed ... + [_ #f])] + + [(_ input [completed ...] [ [#:spawn pat body ...] more ... ]) + #'(-object-clauses input + [completed ...] + [ [#:during pat (spawn/link body ...)] more ... ])] + + [(_ input [completed ...] [ [#:asserted pat body+ ... #:retracted body- ...] more ... ]) + #`(-object-clauses input + [ completed ... [(-object-pattern pat) + body+ ... + #,(if (null? (syntax->list #'(body- ...))) + #`#f + #`(lambda () body- ...))] ] + [more ...])] + + [(_ input [completed ...] [ [#:asserted pat body+ ...] more ... ]) + #'(-object-clauses input + [completed ...] + [ [#:asserted pat body+ ... #:retracted] more ... ])] + + [(_ input [completed ...] [ [pat body ...] more ... ]) + #'(-object-clauses input + [completed ...] + [ [#:asserted pat + (define f (react (facet-prevent-inert-check! this-facet) body ...)) + #:retracted + (stop-facet f)] + more ... ])])) + +(define-match-expander -object-pattern + (lambda (stx) + (syntax-case stx () + [(_ pat-stx) + (analyse-match-pattern #'pat-stx)]))) + (define (ref entity) (entity-ref this-facet entity '())) @@ -114,6 +183,21 @@ this-turn (lambda () setup-expr ...))])) +(define-syntax (spawn/link stx) + (syntax-parse stx + [(_ name-stx: daemon: setup-expr ...) + #`(begin + (define name name-stx.N) + (define monitor (ref (entity/stop-on-retract #:name (list name 'monitor-in-parent)))) + (define monitor-handle (turn-assert! this-turn monitor 'alive)) + (turn-spawn! this-turn + #:name name + #:daemon? daemon.D + #:link + (entity/stop-on-retract #:name (list name 'monitor-in-child)) + (lambda () setup-expr ...) + (hasheq monitor-handle #t)))])) + (define-syntax-rule (begin/dataflow expr ...) (turn-dataflow! this-turn (lambda () expr ...))) @@ -216,23 +300,10 @@ (lambda (stx) (syntax-parse stx [(_ pat name-stx: daemon: expr ...) - #`(assert - (Observe (:pattern pat) - (ref (during* - (lambda (bindings) - (match-define (list #,@(analyse-pattern-bindings #'pat)) bindings) - (define name name-stx.N) - (define monitor - (ref (entity/stop-on-retract #:name (list name 'monitor-in-parent)))) - (define monitor-handle (turn-assert! this-turn monitor 'alive)) - (turn-spawn! this-turn - #:name name - #:daemon? daemon.D - #:link - (entity/stop-on-retract #:name - (list name 'monitor-in-child)) - (lambda () expr ...) - (hasheq monitor-handle #t)))))))]))) + #`(assert (Observe (:pattern pat) + (ref (during* (lambda (bindings) + (match-define (list #,@(analyse-pattern-bindings #'pat)) bindings) + (spawn/link #:name name-stx.N #:daemon? daemon.D expr ...))))))]))) (define (during* f #:name [name '?]) (define assertion-map (make-hash)) @@ -255,6 +326,7 @@ ;;; Local Variables: ;;; eval: (put 'actor-system/dataspace 'racket-indent-function 1) ;;; eval: (put 'at 'racket-indent-function 1) +;;; eval: (put 'object 'racket-indent-function 0) ;;; eval: (put 'react 'racket-indent-function 0) ;;; eval: (put 'spawn 'racket-indent-function 0) ;;; eval: (put 'stop-when 'racket-indent-function 1)