(* Copyright 2012 Tony Garnock-Jones . *) (* This file is part of Hop. *) (* Hop is free software: you can redistribute it and/or modify it *) (* under the terms of the GNU General Public License as published by the *) (* Free Software Foundation, either version 3 of the License, or (at your *) (* option) any later version. *) (* Hop is distributed in the hope that it will be useful, but *) (* WITHOUT ANY WARRANTY; without even the implied warranty of *) (* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *) (* General Public License for more details. *) (* You should have received a copy of the GNU General Public License *) (* along with Hop. If not, see . *) open Lwt open Sexp open Status (* TODO: on unsubscribe, wake up the shoveller to make it clean out its waiters queue *) type t = { name: Node.name; subscriptions: Subscription.set_t; backlog_in: Sexp.t Lwt_stream.t; backlog_out: Sexp.t option -> unit; waiters_in: Subscription.t Lwt_stream.t; waiters_out: Subscription.t option -> unit; mutable backlog: int; mutable waiters: int; } let classname = "queue" let report info n = Log.info (Printf.sprintf "do_burst %d backlog, %d waiters, %d ticks left\n%!" info.backlog info.waiters n) [] let shoveller info = let rec message_loop () = lwt body = Lwt_stream.next info.backlog_in in let rec waiter_loop () = lwt sub = Lwt_stream.next info.waiters_in in match_lwt Subscription.send_to_subscription info.name info.subscriptions sub body with | true -> info.backlog <- info.backlog - 1; info.waiters_out (Some sub); message_loop () | false -> waiter_loop () in waiter_loop () in message_loop () let queue_handler info n sexp = match Message.message_of_sexp sexp with | Message.Post (name, body, token) -> info.backlog <- info.backlog + 1; info.backlog_out (Some body); return () | Message.Subscribe (filter, Str sink, name, Str reply_sink, reply_name) -> lwt sub = Subscription.create info.name info.subscriptions filter sink name reply_sink reply_name in info.waiters <- info.waiters + 1; info.waiters_out (Some sub); return () | Message.Unsubscribe (Str token) -> ignore (Subscription.delete info.name info.subscriptions token); info.waiters <- info.waiters - 1; return () | m -> Util.message_not_understood "queue" m let queue_factory arg = match arg with | (Arr [Str name_str]) -> let (bin, bout) = Lwt_stream.create () in let (win, wout) = Lwt_stream.create () in let info = { name = Node.name_of_string name_str; subscriptions = Subscription.new_set (); backlog_in = bin; backlog_out = bout; waiters_in = win; waiters_out = wout; backlog = 0; waiters = 0 } in ignore (Util.create_thread name_str None shoveller info); replace_ok (Node.make_idempotent_named classname info.name (queue_handler info)) (Str name_str) | _ -> return (Problem (Str "bad-arg")) let init () = Factory.register_class classname queue_factory