(* Copyright 2012 Tony Garnock-Jones . *) (* This file is part of Hop. *) (* Hop is free software: you can redistribute it and/or modify it *) (* under the terms of the GNU General Public License as published by the *) (* Free Software Foundation, either version 3 of the License, or (at your *) (* option) any later version. *) (* Hop is distributed in the hope that it will be useful, but *) (* WITHOUT ANY WARRANTY; without even the implied warranty of *) (* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *) (* General Public License for more details. *) (* You should have received a copy of the GNU General Public License *) (* along with Hop. If not, see . *) open Lwt open Sexp open Status (* TODO: on unsubscribe, wake up the shoveller to make it clean out its waiters queue *) type t = { name: Node.name; subscriptions: Subscription.set_t; backlog_in: Sexp.t Lwt_stream.t; backlog_out: Sexp.t option -> unit; waiters_in: Subscription.t Lwt_stream.t; waiters_out: Subscription.t option -> unit; mutable backlog: int; mutable waiters: int; } let classname = (Bytes.of_string "queue") let report info = while_lwt true do lwt () = Log.info (Printf.sprintf "%s: %d backlog, %d waiters" (Bytes.to_string info.name.Node.label) info.backlog info.waiters) [] in Lwt_unix.sleep 1.0 done let shoveller info = let rec message_loop () = lwt body = Lwt_stream.next info.backlog_in in let rec waiter_loop () = lwt sub = Lwt_stream.next info.waiters_in in match_lwt Subscription.send_to_subscription info.name info.subscriptions sub body with | true -> info.backlog <- info.backlog - 1; info.waiters_out (Some sub); message_loop () | false -> info.waiters <- info.waiters - 1; waiter_loop () in waiter_loop () in message_loop () let queue_handler info n sexp = match Message.message_of_sexp sexp with | Message.Post (name, body, token) -> lwt () = if info.backlog > 1000 && info.waiters > 0 then Lwt_unix.yield () else return () in info.backlog <- info.backlog + 1; info.backlog_out (Some body); return () | Message.Subscribe (filter, Str sink, name, Str reply_sink, reply_name) -> (match_lwt Subscription.create info.name info.subscriptions filter sink name reply_sink reply_name with | Subscription.New sub -> info.waiters <- info.waiters + 1; info.waiters_out (Some sub); return () | Subscription.Old sub -> return ()) | Message.Unsubscribe (Str token) -> ignore (Subscription.delete info.name info.subscriptions token); info.waiters <- info.waiters - 1; return () | m -> Util.message_not_understood "queue" m let queue_factory arg = match arg with | (Arr [Str name_str]) -> let (bin, bout) = Lwt_stream.create () in let (win, wout) = Lwt_stream.create () in let info = { name = Node.name_of_bytes name_str; subscriptions = Subscription.new_set (); backlog_in = bin; backlog_out = bout; waiters_in = win; waiters_out = wout; backlog = 0; waiters = 0; } in replace_ok (Node.make_idempotent_named classname info.name (fun () -> ignore (Util.create_thread name_str None shoveller info); ignore (report info); return ()) (queue_handler info)) (Str name_str) | _ -> return (Problem (litstr "bad-arg")) let init () = Factory.register_class classname queue_factory