syndicate-rkt/syndicate/async.rkt

75 lines
2.1 KiB
Racket

#lang syndicate
;;; SPDX-License-Identifier: LGPL-3.0-or-later
;;; SPDX-FileCopyrightText: Copyright © 2022 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
(provide async
async?
suspend
await)
(require (only-in (submod "actor.rkt" internals) with-active-facet))
(define prompt-tag (make-continuation-prompt-tag 'syndicate))
(define-syntax-rule (async body ...)
(async* (lambda () body ...)))
(define (async* thunk)
(call-with-continuation-prompt thunk prompt-tag))
(define (async?)
(continuation-prompt-available? prompt-tag))
(define (capture-chain proc)
(call-with-composable-continuation
(lambda (k)
(let ((k (lambda results
(async* (lambda () (apply k results))))))
(abort-current-continuation prompt-tag (lambda () (proc k)))))
prompt-tag))
(define (suspend proc)
(unless (async?) (error 'suspend "Cannot suspend async chain outside async block"))
(define facet this-facet)
(capture-chain
(lambda (k)
(proc (lambda results
(with-active-facet facet
(lambda ()
(apply k results))))))))
(define-syntax-rule (await k body ...)
(suspend (lambda (k)
(react
(define facet this-facet)
(let ((k (lambda results (stop-facet facet (apply k results)))))
body ...)))))
(module+ test
(require "dataspace.rkt")
(actor-system/dataspace (ds)
(spawn
#:name 'service
(at ds
(during 'waiting
(assert 'ready)
(on-stop (stop-current-facet)))))
(at ds
(define (D x) (log-info "~v ~a" this-facet x))
(on-start (D "outer+"))
(on-stop (D "outer-"))
(async (D "1")
(await k (D "@") (k (void)))
(D "2")
(await k
(on-start (D "A1+"))
(on-stop (D "A1-"))
(assert 'waiting)
(on (asserted 'ready)
(D "A2")
(k (void))))
(D "3")
(await k (D "B") (k (void)))
(D "4")
(stop-current-facet)))))