Yesterday's work on relay + TCP distribution

This commit is contained in:
Tony Garnock-Jones 2021-06-08 09:33:56 +02:00
parent 6f9ee4eb44
commit 468b6541f9
3 changed files with 425 additions and 0 deletions

View File

@ -0,0 +1,85 @@
#lang syndicate
;;; SPDX-License-Identifier: LGPL-3.0-or-later
;;; SPDX-FileCopyrightText: Copyright © 2021 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
(require (only-in sha bytes->hex-string))
(require racket/tcp)
(require (only-in racket/list append-map))
(require syndicate/relay)
(require syndicate/sturdy)
(require syndicate/schemas/gen/gatekeeper)
(require syndicate/sturdy)
(require syndicate/driver-support)
(define (read-bytes-avail input-port #:limit [limit 65536])
(define buffer (make-bytes limit))
(match (read-bytes-avail! buffer input-port)
[(? number? count) (subbytes buffer 0 count)]
[other other]))
(module+ main
(actor-system/dataspace (ds)
(define ds-oid "syndicate")
(define ds-key (make-bytes KEY_LENGTH))
(at ds (assert (Bind ds-oid ds-key ds)))
(define root-cap (mint ds-oid ds-key))
(write-preserve/text (SturdyRef->preserves root-cap) #:indent 4 #:commas? #f)
(newline)
(displayln (bytes->hex-string (sturdy-encode (SturdyRef->preserves root-cap))))
(define spawn-connection
(action (connection-custodian i o)
(define name-base (call-with-values (lambda () (tcp-addresses i #t)) list))
(spawn-relay
this-turn
#:name name-base
#:packet-writer (lambda (bs) (write-bytes bs o))
#:setup-inputs (action (tr)
(on-stop (close-input-port i)
(close-output-port o))
(linked-thread
#:name (cons 'input-thread name-base)
#:custodian connection-custodian
this-turn
(ref (entity #:name (cons 'socket-monitor name-base)
#:retract (action (_handle) (stop-current-facet))))
(lambda ()
(let loop ()
(define bs (read-bytes-avail i))
(when (bytes? bs)
(accept-bytes tr bs)
(loop))))))
#:initial-ref
(ref (during* #:name (cons 'gatekeeper name-base)
(action (assertion)
(match (parse-Resolve assertion)
[(? eof-object?) (void)]
[(Resolve unvalidated-sturdyref observer)
(at ds
(during (Bind (SturdyRef-oid unvalidated-sturdyref) $key $target)
(define sturdyref (validate unvalidated-sturdyref key))
(define attenuation
(append-map values (reverse (SturdyRef-caveatChain sturdyref))))
(define attenuated-target
(apply attenuate-entity-ref target attenuation))
(at observer (assert (embedded attenuated-target)))))])))))))
(spawn
#:name 'tcp-server
(linked-thread
#:name 'tcp-server
this-turn
(ref (entity #:name 'listen-monitor #:retract (action (_handle) (stop-current-facet))))
(lambda ()
(define listener (tcp-listen 5999 512 #t "0.0.0.0"))
(let loop ()
(define connection-custodian (make-custodian))
(define-values (i o) (parameterize ((current-custodian connection-custodian))
(tcp-accept listener)))
(turn-freshen this-turn (action () (spawn-connection this-turn connection-custodian i o)))
(loop)))))))

View File

@ -0,0 +1,40 @@
#lang syndicate
;;; SPDX-License-Identifier: LGPL-3.0-or-later
;;; SPDX-FileCopyrightText: Copyright © 2021 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
(provide linked-thread)
(require "engine.rkt")
(define-logger syndicate/driver-support)
(define linked-thread
(action (peer
thunk
#:name [name (gensym 'linked-thread)]
#:custodian [c (make-custodian)])
(define handle #f)
(define armed? #t)
(define !
(action ()
(when armed?
(set! armed? #f)
(log-syndicate/driver-support-info "~a shutdown" name)
(turn-retract! this-turn handle)
(queue-task! (actor-engine this-actor) (lambda () (custodian-shutdown-all c)))
(actor-remove-exit-hook! this-actor !))))
(on-stop (! this-turn))
(actor-add-exit-hook! this-actor !)
(log-syndicate/driver-support-info "~a startup" name)
(set! handle
(parameterize ((current-custodian c))
(turn-assert! this-turn
peer
(embedded
(thread (lambda ()
(with-handlers ([(lambda (_e) #t) (lambda (_e) (void))]) (thunk))
(turn-freshen this-turn !)))))))))

300
syndicate/relay.rkt Normal file
View File

@ -0,0 +1,300 @@
#lang racket/base
;;; SPDX-License-Identifier: LGPL-3.0-or-later
;;; SPDX-FileCopyrightText: Copyright © 2021 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
(provide make-tunnel-relay
accept-bytes
spawn-relay)
(require racket/match)
(require preserves)
(require "main.rkt")
(require "engine.rkt")
(require "schemas/gen/protocol.rkt")
(require (prefix-in sturdy: "schemas/gen/sturdy.rkt"))
;;---------------------------------------------------------------------------
(struct inbound (local-handle imported))
(struct wire-symbol (oid ref [count #:mutable]))
(struct membrane (oid-map ref-map))
;; There are other kinds of relay. This one has exactly two participants connected to each other.
(struct tunnel-relay (facet
name
[read-buffer #:mutable]
packet-writer
inbound-assertions
outbound-assertions
exported-references
imported-references
[next-local-oid #:mutable]
[pending-turn-rev #:mutable]
)
#:transparent)
(struct relay-entity (relay oid) #:transparent)
;;---------------------------------------------------------------------------
(define *inert-ref*
(entity-ref *dead-facet*
(entity #:name '*inert-ref*)
'()))
;;---------------------------------------------------------------------------
(define (make-membrane)
(membrane (make-hash) (make-hasheq)))
(define (grab m getter key transient? f)
(let ((ws (hash-ref (getter m)
key
(lambda ()
(define ws (f))
(hash-set! (membrane-oid-map m) (wire-symbol-oid ws) ws)
(hash-set! (membrane-ref-map m) (wire-symbol-ref ws) ws)
ws))))
(when (not transient?) (set-wire-symbol-count! ws (+ (wire-symbol-count ws) 1)))
ws))
(define (drop m ws)
(set-wire-symbol-count! ws (- (wire-symbol-count ws) 1))
(when (zero? (wire-symbol-count ws))
(hash-remove! (membrane-oid-map m) (wire-symbol-oid ws))
(hash-remove! (membrane-ref-map m) (wire-symbol-ref ws))))
;;---------------------------------------------------------------------------
(define (make-tunnel-relay turn name packet-writer)
(define tr
(tunnel-relay (turn-active-facet turn)
name
#""
packet-writer
(make-hash)
(make-hash)
(make-membrane)
(make-membrane)
0
'()))
tr)
(define accept-bytes
(lambda (tr bs)
(turn! (tunnel-relay-facet tr)
(action ()
(define buffer (if (positive? (bytes-length (tunnel-relay-read-buffer tr)))
(bytes-append (tunnel-relay-read-buffer tr) bs)
bs))
(set-tunnel-relay-read-buffer! tr buffer)
(define p (open-input-bytes buffer))
(let read-more ()
(define start-pos (file-position p))
(match (read-preserve/binary p
#:read-syntax? #f
#:decode-embedded decode-embedded:protocol
#:on-short (lambda () eof))
[(? eof-object?)
(when (positive? start-pos)
(set-tunnel-relay-read-buffer! tr (subbytes buffer start-pos)))]
[packet
(handle-packet this-turn tr packet)
(read-more)]))))))
(define (lookup-local tr local-oid)
(define ws (hash-ref (membrane-oid-map (tunnel-relay-exported-references tr)) local-oid #f))
(if ws (wire-symbol-ref ws) *inert-ref*))
(define rewrite-in
(action (tr assertion)
(define imported '())
(define (save! ws) (set! imported (cons ws imported)))
(define rewritten
((map-embeddeds (lambda (r) (embedded (rewrite-ref-in this-turn tr r save!)))) assertion))
(values rewritten imported)))
(define ref-attenuation-refinements (make-weak-hasheq))
(define rewrite-ref-in
(action (tr wire-ref save!)
(match wire-ref
[(sturdy:WireRef-mine oid)
(define ws (grab (tunnel-relay-imported-references tr)
membrane-oid-map
oid
#f
(lambda ()
(wire-symbol oid (turn-ref this-turn (make-relay-entity tr oid)) 0))))
(save! ws)
(wire-symbol-ref ws)]
[(sturdy:WireRef-yours oid attenuation)
(define r (lookup-local tr oid))
(if (or (null? attenuation) (eq? *inert-ref* r))
r
(hash-ref! (hash-ref! ref-attenuation-refinements r make-hash)
attenuation
(lambda () (apply attenuate-entity-ref r attenuation))))])))
(define handle-packet
(action (tr packet)
(match (parse-Turn packet)
[(? eof-object?) (error 'handle-packet "Invalid IO.Turn")]
[wire-turn
(for [(ev (in-list wire-turn))]
(match-define (TurnEvent oid event) ev)
(define r (lookup-local tr oid))
(match event
[(Event-Assert (Assert assertion remote-handle))
(define-values (a imported) (rewrite-in this-turn tr assertion))
(hash-set! (tunnel-relay-inbound-assertions tr)
remote-handle
(inbound (turn-assert! this-turn r a)
imported))]
[(Event-Retract (Retract remote-handle))
(define i (hash-ref (tunnel-relay-inbound-assertions tr) remote-handle #f))
(when (not i) (error 'handle-packet "Peer retracted invalid handle ~a" remote-handle))
(hash-remove! (tunnel-relay-inbound-assertions tr) remote-handle)
(for [(ws (in-list (inbound-imported i)))]
(drop (tunnel-relay-imported-references tr) ws))
(turn-retract! this-turn (inbound-local-handle i))]
[(Event-Message (Message body))
(define-values (a imported) (rewrite-in this-turn tr body))
(when (not (null? imported))
(error 'handle-packet "Cannot receive transient reference"))
(turn-message! this-turn r a)]
[(Event-Sync (Sync peer))
(define imported '())
(define (save! ws) (set! imported (cons ws imported)))
(define k (rewrite-ref-in this-turn tr peer save!))
(turn-sync! this-turn
r
(action (_true)
(turn-message! this-turn k #t)
(for [(ws (in-list imported))]
(drop (tunnel-relay-imported-references tr) ws))))]))])))
(define (send-event tr event)
(when (null? (tunnel-relay-pending-turn-rev tr))
(queue-task! (actor-engine (facet-actor (tunnel-relay-facet tr)))
(lambda ()
(define pending (reverse (tunnel-relay-pending-turn-rev tr)))
(set-tunnel-relay-pending-turn-rev! tr '())
((tunnel-relay-packet-writer tr)
(preserve->bytes (Turn->preserves pending)
#:canonicalizing? #t
#:write-annotations? #f
#:encode-embedded encode-embedded:protocol)))))
(set-tunnel-relay-pending-turn-rev! tr (cons event (tunnel-relay-pending-turn-rev tr))))
(define (rewrite-out tr assertion transient?)
(define exported '())
(define (save! ws) (set! exported (cons ws exported)))
(define rewritten
((map-embeddeds (lambda (r) (embedded (rewrite-ref-out tr r transient? save!)))) assertion))
(values rewritten exported))
(define (rewrite-ref-out* tr local-ref transient? save!)
(define ws (grab (tunnel-relay-exported-references tr)
membrane-ref-map
local-ref
transient?
(lambda ()
(define oid (tunnel-relay-next-local-oid tr))
(set-tunnel-relay-next-local-oid! tr (+ oid 1))
(wire-symbol oid local-ref 0))))
(save! ws)
(sturdy:WireRef-mine (wire-symbol-oid ws)))
(define (rewrite-ref-out tr local-ref transient? save!)
(define re (entity-data (entity-ref-target local-ref)))
(cond [(or (not (relay-entity? re)) (not (eq? (relay-entity-relay re) tr)))
(rewrite-ref-out* tr local-ref transient? save!)]
[(null? (entity-ref-attenuation local-ref))
(sturdy:WireRef-yours (relay-entity-oid re) '())]
[else
;; we may trust the peer to enforce attenuation on our
;; behalf, in which case we can return (sturdy:WireRef-yours
;; (relay-entity-oid re) (entity-ref-attenuation local-ref))
;; here, but for now we don't.
(rewrite-ref-out* tr local-ref transient? save!)]))
(define (release-ref-out tr ws)
(drop (tunnel-relay-exported-references tr) ws))
(define (register tr assertion maybe-handle)
(define-values (rewritten exported) (rewrite-out tr assertion (eq? maybe-handle #f)))
(when maybe-handle (hash-set! (tunnel-relay-outbound-assertions tr) maybe-handle exported))
rewritten)
(define (deregister tr handle)
(for [(ws (in-list (hash-ref (tunnel-relay-outbound-assertions tr) handle '())))]
(release-ref-out tr ws))
(hash-remove! (tunnel-relay-outbound-assertions tr) handle))
(define (make-relay-entity tr oid)
(entity #:name (list (tunnel-relay-name tr) oid)
#:assert (action (assertion handle)
(send-event tr (Event-Assert (Assert (register tr assertion handle) handle))))
#:retract (action (handle)
(deregister tr handle)
(send-event tr (Event-Retract (Retract handle))))
#:message (action (body)
(send-event tr (Event-Message (Message (register tr body #f)))))
#:sync (action (peer)
(define exported #f)
(define (save! ws) (set! exported ws))
(define spe (sync-peer-entity tr oid peer (lambda () exported)))
(send-event tr
(Event-Sync
(Sync (embedded
(rewrite-ref-out tr (turn-ref this-turn spe) #f save!))))))
#:data (relay-entity tr oid)))
(define (sync-peer-entity tr oid peer get-export)
(define handle-map (make-hash))
(entity #:name (list (tunnel-relay-name tr) oid 'sync)
#:assert (action (assertion handle)
(hash-set! handle-map handle (turn-assert! this-turn peer assertion)))
#:retract (action (handle)
(turn-retract! this-turn (hash-ref handle-map handle))
(hash-remove! handle-map handle))
#:message (action (body)
(release-ref-out tr (get-export))
(turn-message! this-turn peer body))
#:sync (action (peer-k)
(turn-sync! this-turn peer peer-k))))
(define (spawn-relay turn
#:packet-writer packet-writer
#:setup-inputs setup-inputs
#:then [then #f]
#:name [name (gensym 'relay)]
#:initial-oid [initial-oid #f]
#:initial-ref [initial-ref #f])
(turn-spawn! #:name name
turn
(action ()
(define tr (make-tunnel-relay this-turn name packet-writer))
(setup-inputs this-turn tr)
(when initial-ref
(rewrite-ref-out tr initial-ref #f (lambda (_ws) (void))))
(when then
(turn-assert! this-turn
then
(and initial-oid
(embedded (rewrite-ref-in this-turn
tr
(sturdy:WireRef-mine initial-oid)
(lambda (_ws) (void))))))))))
(define-syntax-rule (D v0)
(let ((v v0))
(log-info "~a: ~v" 'v0 v)
v))
(require racket/trace)
(trace rewrite-out)