Lwt port of AMQP driver
This commit is contained in:
parent
f8bfb0e9dd
commit
7d5a29c3d6
|
@ -269,10 +269,10 @@ def print_codec():
|
|||
bits_remaining = bits_remaining - 1
|
||||
else:
|
||||
print ' let %s = read_%s input_buf in' % (target, mlify(f.type))
|
||||
print ' ' + m.pattern()
|
||||
print ' | _ -> raise (Amqp_exception (frame_error,'
|
||||
print ' Printf.sprintf "Unknown method number %d/%d"'
|
||||
print ' class_index method_index))'
|
||||
print ' Lwt.return (' + m.pattern() + ')'
|
||||
print ' | _ -> raise_lwt (Amqp_exception (frame_error,'
|
||||
print ' Printf.sprintf "Unknown method number %d/%d"'
|
||||
print ' class_index method_index))'
|
||||
print
|
||||
print 'let method_index m = match m with'
|
||||
for m in methods:
|
||||
|
@ -319,9 +319,9 @@ def print_codec():
|
|||
'Some (read_%s input_buf) else None in') % \
|
||||
(target, 1 << property_bit, mlify(f.type))
|
||||
property_bit = property_bit - 1
|
||||
print ' ' + c.pattern()
|
||||
print ' | _ -> raise (Amqp_exception (frame_error, Printf.sprintf "Unknown class number %d"'
|
||||
print ' class_index))'
|
||||
print ' Lwt.return (' + c.pattern() + ')'
|
||||
print ' | _ -> raise_lwt (Amqp_exception (frame_error, Printf.sprintf "Unknown class number %d"'
|
||||
print ' class_index))'
|
||||
print
|
||||
print 'let class_index p = match p with'
|
||||
for c in classes:
|
||||
|
|
314
amqp_relay.ml
314
amqp_relay.ml
|
@ -15,64 +15,68 @@
|
|||
(* You should have received a copy of the GNU General Public License *)
|
||||
(* along with Hop. If not, see <http://www.gnu.org/licenses/>. *)
|
||||
|
||||
open Unix
|
||||
open Lwt
|
||||
open Printf
|
||||
open Thread
|
||||
open Amqp_spec
|
||||
open Amqp_wireformat
|
||||
|
||||
type connection_t = {
|
||||
peername: Unix.sockaddr;
|
||||
mtx: Mutex.t;
|
||||
cin: in_channel;
|
||||
cout: out_channel;
|
||||
name: Node.name;
|
||||
mutable input_buf: string;
|
||||
mutable output_buf: Buffer.t;
|
||||
mutable frame_max: int;
|
||||
mutable connection_closed: bool;
|
||||
mutable recent_queue_name: Node.name option;
|
||||
mutable delivery_tag: int
|
||||
}
|
||||
peername: Unix.sockaddr;
|
||||
mtx: Lwt_mutex.t;
|
||||
cin: Lwt_io.input_channel;
|
||||
cout: Lwt_io.output_channel;
|
||||
name: Node.name;
|
||||
mutable input_buf: string;
|
||||
mutable output_buf: Obuffer.t;
|
||||
mutable frame_max: int;
|
||||
mutable connection_closed: bool;
|
||||
mutable recent_queue_name: Node.name option;
|
||||
mutable delivery_tag: int
|
||||
}
|
||||
|
||||
let initial_frame_size = frame_min_size
|
||||
let suggested_frame_max = 131072
|
||||
|
||||
let amqp_boot (peername, mtx, cin, cout) = {
|
||||
let amqp_boot (peername, cin, cout) = return {
|
||||
peername = peername;
|
||||
mtx = mtx;
|
||||
mtx = Lwt_mutex.create ();
|
||||
cin = cin;
|
||||
cout = cout;
|
||||
name = Node.name_of_string (Uuid.create ());
|
||||
input_buf = String.create initial_frame_size;
|
||||
output_buf = Buffer.create initial_frame_size;
|
||||
output_buf = Obuffer.create initial_frame_size;
|
||||
frame_max = initial_frame_size;
|
||||
connection_closed = false;
|
||||
recent_queue_name = None;
|
||||
delivery_tag = 1 (* Not 0: 0 means "all deliveries" in an ack *)
|
||||
}
|
||||
|
||||
let input_byte c = lwt b = Lwt_io.read_char c in return (int_of_char b)
|
||||
|
||||
let read_frame conn =
|
||||
let frame_type = input_byte conn.cin in
|
||||
let channel_hi = input_byte conn.cin in
|
||||
let channel_lo = input_byte conn.cin in
|
||||
lwt frame_type = input_byte conn.cin in
|
||||
lwt channel_hi = input_byte conn.cin in
|
||||
lwt channel_lo = input_byte conn.cin in
|
||||
let channel = (channel_hi lsr 8) lor channel_lo in
|
||||
let length = input_binary_int conn.cin in
|
||||
lwt length = Lwt_io.BE.read_int conn.cin in
|
||||
if length > conn.frame_max
|
||||
then die frame_error "Frame longer than current frame_max"
|
||||
else
|
||||
(really_input conn.cin conn.input_buf 0 length;
|
||||
if input_byte conn.cin <> frame_end
|
||||
(lwt () = Lwt_io.read_into_exactly conn.cin conn.input_buf 0 length in
|
||||
lwt end_marker = input_byte conn.cin in
|
||||
if end_marker <> frame_end
|
||||
then die frame_error "Missing frame_end octet"
|
||||
else (frame_type, channel, length))
|
||||
else return (frame_type, channel, length))
|
||||
|
||||
let output_byte c b = Lwt_io.write_char c (char_of_int b)
|
||||
|
||||
let write_frame conn frame_type channel =
|
||||
output_byte conn.cout frame_type;
|
||||
output_byte conn.cout ((channel lsr 8) land 255);
|
||||
output_byte conn.cout (channel land 255);
|
||||
output_binary_int conn.cout (Buffer.length conn.output_buf);
|
||||
Buffer.output_buffer conn.cout conn.output_buf;
|
||||
Buffer.reset conn.output_buf;
|
||||
lwt () = output_byte conn.cout frame_type in
|
||||
lwt () = output_byte conn.cout ((channel lsr 8) land 255) in
|
||||
lwt () = output_byte conn.cout (channel land 255) in
|
||||
lwt () = Lwt_io.BE.write_int conn.cout (Obuffer.length conn.output_buf) in
|
||||
lwt () = Obuffer.write conn.cout conn.output_buf in
|
||||
Obuffer.reset conn.output_buf;
|
||||
output_byte conn.cout frame_end
|
||||
|
||||
let serialize_method buf m =
|
||||
|
@ -97,82 +101,84 @@ let deserialize_header buf =
|
|||
let class_id = read_short buf in
|
||||
let _ = read_short buf in
|
||||
let body_size = Int64.to_int (read_longlong buf) in
|
||||
(body_size, read_properties class_id buf)
|
||||
lwt props = read_properties class_id buf in
|
||||
return (body_size, props)
|
||||
|
||||
let send_content_body conn channel body =
|
||||
let len = String.length body in
|
||||
let rec send_remainder offset =
|
||||
if offset >= len
|
||||
then ()
|
||||
then return ()
|
||||
else
|
||||
let snip_len = min conn.frame_max (len - offset) in
|
||||
Buffer.add_substring conn.output_buf body offset snip_len;
|
||||
write_frame conn frame_body channel;
|
||||
Obuffer.add_substring conn.output_buf body offset snip_len;
|
||||
lwt () = write_frame conn frame_body channel in
|
||||
send_remainder (offset + snip_len)
|
||||
in send_remainder 0
|
||||
|
||||
let next_frame conn required_type =
|
||||
let (frame_type, channel, length) = read_frame conn in
|
||||
lwt (frame_type, channel, length) = read_frame conn in
|
||||
if frame_type <> required_type
|
||||
then die command_invalid (Printf.sprintf "Unexpected frame type %d" frame_type)
|
||||
else (channel, length)
|
||||
else return (channel, length)
|
||||
|
||||
let next_method conn =
|
||||
let (channel, length) = next_frame conn frame_method in
|
||||
(channel, deserialize_method (Ibuffer.create conn.input_buf 0 length))
|
||||
lwt (channel, length) = next_frame conn frame_method in
|
||||
lwt m = deserialize_method (Ibuffer.create conn.input_buf 0 length) in
|
||||
return (channel, m)
|
||||
|
||||
let next_header conn =
|
||||
let (channel, length) = next_frame conn frame_header in
|
||||
(channel, deserialize_header (Ibuffer.create conn.input_buf 0 length))
|
||||
lwt (channel, length) = next_frame conn frame_header in
|
||||
lwt h = deserialize_header (Ibuffer.create conn.input_buf 0 length) in
|
||||
return (channel, h)
|
||||
|
||||
let recv_content_body conn body_size =
|
||||
let buf = Buffer.create body_size in
|
||||
while Buffer.length buf < body_size do
|
||||
let (_, length) = next_frame conn frame_body in
|
||||
Buffer.add_substring buf conn.input_buf 0 length
|
||||
done;
|
||||
Buffer.contents buf
|
||||
let buf = Obuffer.create body_size in
|
||||
lwt () =
|
||||
while_lwt Obuffer.length buf < body_size do
|
||||
lwt (_, length) = next_frame conn frame_body in
|
||||
return (Obuffer.add_substring buf conn.input_buf 0 length)
|
||||
done
|
||||
in
|
||||
return (Obuffer.contents buf)
|
||||
|
||||
let with_conn_mutex conn thunk = Util.with_mutex0 conn.mtx thunk
|
||||
let with_conn_mutex conn thunk = Lwt_mutex.with_lock conn.mtx thunk
|
||||
|
||||
let send_method conn channel m =
|
||||
with_conn_mutex conn (fun () ->
|
||||
serialize_method conn.output_buf m;
|
||||
write_frame conn frame_method channel;
|
||||
flush conn.cout)
|
||||
write_frame conn frame_method channel)
|
||||
|
||||
let send_content_method conn channel m p body_str =
|
||||
with_conn_mutex conn (fun () ->
|
||||
serialize_method conn.output_buf m;
|
||||
write_frame conn frame_method 1;
|
||||
lwt () = write_frame conn frame_method 1 in
|
||||
serialize_header conn.output_buf (String.length body_str) p;
|
||||
write_frame conn frame_header 1;
|
||||
send_content_body conn 1 body_str;
|
||||
flush conn.cout)
|
||||
lwt () = write_frame conn frame_header 1 in
|
||||
send_content_body conn 1 body_str)
|
||||
|
||||
let send_error conn code message =
|
||||
if conn.connection_closed
|
||||
then
|
||||
()
|
||||
then return ()
|
||||
else
|
||||
conn.connection_closed <- true;
|
||||
let m = Connection_close (code, message, 0, 0) in
|
||||
Log.warn "Sending error" [sexp_of_method m];
|
||||
send_method conn 0 m
|
||||
(conn.connection_closed <- true;
|
||||
let m = Connection_close (code, message, 0, 0) in
|
||||
ignore (Log.warn "Sending error" [sexp_of_method m]);
|
||||
send_method conn 0 m)
|
||||
|
||||
let send_warning conn code message =
|
||||
let m = Channel_close (code, message, 0, 0) in
|
||||
Log.warn "Sending warning" [sexp_of_method m];
|
||||
ignore (Log.warn "Sending warning" [sexp_of_method m]);
|
||||
send_method conn 1 m
|
||||
|
||||
let issue_banner cin cout =
|
||||
let handshake = String.create 8 in
|
||||
try
|
||||
really_input cin handshake 0 8;
|
||||
lwt () = Lwt_io.read_into_exactly cin handshake 0 8 in
|
||||
if String.sub handshake 0 4 <> "AMQP"
|
||||
then (output_string cout "AMQP\000\000\009\001"; false)
|
||||
else true
|
||||
with End_of_file -> false
|
||||
then (lwt () = Lwt_io.write cout "AMQP\000\000\009\001" in return false)
|
||||
else return true
|
||||
with End_of_file -> return false
|
||||
|
||||
let reference_to_logs = "See server logs for details"
|
||||
let extract_str v =
|
||||
|
@ -208,8 +214,8 @@ let send_delivery conn consumer_tag body_sexp =
|
|||
Sexp.Str routing_key;
|
||||
properties_sexp;
|
||||
Sexp.Str body_str]} ->
|
||||
let tag = with_conn_mutex conn (fun () ->
|
||||
let v = conn.delivery_tag in conn.delivery_tag <- v + 1; v)
|
||||
lwt tag = with_conn_mutex conn (fun () ->
|
||||
let v = conn.delivery_tag in conn.delivery_tag <- v + 1; return v)
|
||||
in
|
||||
send_content_method conn 1
|
||||
(Basic_deliver (consumer_tag, Int64.of_int tag, false, exchange, routing_key))
|
||||
|
@ -237,11 +243,11 @@ let amqp_handler conn n m_sexp =
|
|||
| _ ->
|
||||
Log.warn "AMQP outbound relay ignoring message" [m_sexp])
|
||||
with
|
||||
| Amqp_exception (code, message) ->
|
||||
| Amqp_exception (code, message) ->
|
||||
send_error conn code message
|
||||
| exn ->
|
||||
send_error conn internal_error "";
|
||||
raise exn
|
||||
| exn ->
|
||||
lwt () = send_error conn internal_error "" in
|
||||
raise_lwt exn
|
||||
|
||||
let get_recent_queue_name conn =
|
||||
match conn.recent_queue_name with
|
||||
|
@ -256,69 +262,74 @@ let expand_mrdq conn queue =
|
|||
let handle_method conn channel m =
|
||||
if channel > 1 then die channel_error "Unsupported channel number" else ();
|
||||
match m with
|
||||
| Connection_close (code, text, _, _) ->
|
||||
Log.info "Client closed AMQP connection" [Sexp.Str (string_of_int code); Sexp.Str text];
|
||||
send_method conn channel Connection_close_ok;
|
||||
conn.connection_closed <- true
|
||||
| Channel_open ->
|
||||
| Connection_close (code, text, _, _) ->
|
||||
ignore (Log.info "Client closed AMQP connection"
|
||||
[Sexp.Str (string_of_int code); Sexp.Str text]);
|
||||
lwt () = send_method conn channel Connection_close_ok in
|
||||
return (conn.connection_closed <- true)
|
||||
| Channel_open ->
|
||||
conn.delivery_tag <- 1;
|
||||
send_method conn channel Channel_open_ok
|
||||
| Channel_close (code, text, _, _) ->
|
||||
Log.info "Client closed AMQP channel" [Sexp.Str (string_of_int code); Sexp.Str text];
|
||||
| Channel_close (code, text, _, _) ->
|
||||
ignore (Log.info "Client closed AMQP channel"
|
||||
[Sexp.Str (string_of_int code); Sexp.Str text]);
|
||||
send_method conn channel Channel_close_ok;
|
||||
| Channel_close_ok ->
|
||||
()
|
||||
| Exchange_declare (exchange, type_, passive, durable, no_wait, arguments) ->
|
||||
| Channel_close_ok ->
|
||||
return ()
|
||||
| Exchange_declare (exchange, type_, passive, durable, no_wait, arguments) ->
|
||||
Node.send_ignore' "factory" (Message.create (Sexp.Str type_,
|
||||
Sexp.Arr [Sexp.Str exchange],
|
||||
Sexp.Str conn.name.Node.label,
|
||||
Sexp.Str "Exchange_declare_reply"))
|
||||
| Queue_declare (queue, passive, durable, exclusive, auto_delete, no_wait, arguments) ->
|
||||
| Queue_declare (queue, passive, durable, exclusive, auto_delete, no_wait, arguments) ->
|
||||
let queue = (if queue = "" then Uuid.create () else queue) in
|
||||
conn.recent_queue_name <- Some (Node.name_of_string queue);
|
||||
Node.send_ignore' "factory" (Message.create (Sexp.Str "queue",
|
||||
Sexp.Arr [Sexp.Str queue],
|
||||
Sexp.Str conn.name.Node.label,
|
||||
Sexp.Str "Queue_declare_reply"))
|
||||
| Queue_bind (queue, exchange, routing_key, no_wait, arguments) ->
|
||||
| Queue_bind (queue, exchange, routing_key, no_wait, arguments) ->
|
||||
let queue = expand_mrdq conn queue in
|
||||
if not (Node.approx_exists queue)
|
||||
then send_warning conn not_found ("Queue "^queue.Node.label^" not found")
|
||||
else
|
||||
if Node.send' exchange (Message.subscribe (Sexp.Str routing_key,
|
||||
Sexp.Str queue.Node.label,
|
||||
Sexp.Str "",
|
||||
Sexp.Str conn.name.Node.label,
|
||||
Sexp.Str "Queue_bind_reply"))
|
||||
then ()
|
||||
else send_warning conn not_found ("Exchange "^exchange^" not found")
|
||||
| Basic_consume (queue, consumer_tag, no_local, no_ack, exclusive, no_wait, arguments) ->
|
||||
(match_lwt Node.send' exchange (Message.subscribe (Sexp.Str routing_key,
|
||||
Sexp.Str queue.Node.label,
|
||||
Sexp.Str "",
|
||||
Sexp.Str conn.name.Node.label,
|
||||
Sexp.Str "Queue_bind_reply")) with
|
||||
| true -> return ()
|
||||
| false -> send_warning conn not_found ("Exchange "^exchange^" not found"))
|
||||
| Basic_consume (queue, consumer_tag, no_local, no_ack, exclusive, no_wait, arguments) ->
|
||||
let queue = expand_mrdq conn queue in
|
||||
let consumer_tag = (if consumer_tag = "" then Uuid.create () else consumer_tag) in
|
||||
if Node.send queue (Message.subscribe
|
||||
(Sexp.Str "",
|
||||
Sexp.Str conn.name.Node.label,
|
||||
Sexp.Arr [Sexp.Str "delivery"; Sexp.Str consumer_tag],
|
||||
Sexp.Str conn.name.Node.label,
|
||||
Sexp.Arr [Sexp.Str "Basic_consume_reply"; Sexp.Str consumer_tag]))
|
||||
then ()
|
||||
else send_warning conn not_found ("Queue "^queue.Node.label^" not found")
|
||||
| Basic_publish (exchange, routing_key, false, false) ->
|
||||
let (_, (body_size, properties)) = next_header conn in
|
||||
let body = recv_content_body conn body_size in
|
||||
if Node.post' exchange
|
||||
(Sexp.Str routing_key)
|
||||
(Sexp.Hint {Sexp.hint = Sexp.Str "amqp";
|
||||
Sexp.body = Sexp.Arr [Sexp.Str exchange;
|
||||
Sexp.Str routing_key;
|
||||
sexp_of_properties properties;
|
||||
Sexp.Str body]})
|
||||
(Sexp.Str "")
|
||||
then ()
|
||||
else send_warning conn not_found ("Exchange "^exchange^" not found")
|
||||
| Basic_ack (delivery_tag, multiple) ->
|
||||
()
|
||||
| _ ->
|
||||
(match_lwt Node.send queue (Message.subscribe
|
||||
(Sexp.Str "",
|
||||
Sexp.Str conn.name.Node.label,
|
||||
Sexp.Arr [Sexp.Str "delivery"; Sexp.Str consumer_tag],
|
||||
Sexp.Str conn.name.Node.label,
|
||||
Sexp.Arr [Sexp.Str "Basic_consume_reply";
|
||||
Sexp.Str consumer_tag])) with
|
||||
| true -> return ()
|
||||
| false -> send_warning conn not_found ("Queue "^queue.Node.label^" not found"))
|
||||
| Basic_publish (exchange, routing_key, false, false) ->
|
||||
lwt (_, (body_size, properties)) = next_header conn in
|
||||
lwt body = recv_content_body conn body_size in
|
||||
(match_lwt
|
||||
Node.post' exchange
|
||||
(Sexp.Str routing_key)
|
||||
(Sexp.Hint {Sexp.hint = Sexp.Str "amqp";
|
||||
Sexp.body = Sexp.Arr [Sexp.Str exchange;
|
||||
Sexp.Str routing_key;
|
||||
sexp_of_properties properties;
|
||||
Sexp.Str body]})
|
||||
(Sexp.Str "")
|
||||
with
|
||||
| true -> return ()
|
||||
| false -> send_warning conn not_found ("Exchange "^exchange^" not found"))
|
||||
| Basic_ack (delivery_tag, multiple) ->
|
||||
return ()
|
||||
| _ ->
|
||||
let (cid, mid) = method_index m in
|
||||
die not_implemented (Printf.sprintf "Unsupported method (or method arguments) %s"
|
||||
(method_name cid mid))
|
||||
|
@ -347,25 +358,26 @@ let check_login_details mechanism response =
|
|||
let tune_connection conn frame_max =
|
||||
with_conn_mutex conn (fun () ->
|
||||
conn.input_buf <- String.create frame_max;
|
||||
conn.output_buf <- Buffer.create frame_max;
|
||||
conn.frame_max <- frame_max)
|
||||
conn.output_buf <- Obuffer.create frame_max;
|
||||
conn.frame_max <- frame_max;
|
||||
return ())
|
||||
|
||||
let handshake_and_tune conn =
|
||||
let (major_version, minor_version, revision) = version in
|
||||
send_method conn 0 (Connection_start (major_version, minor_version, server_properties,
|
||||
"PLAIN AMQPLAIN", "en_US"));
|
||||
let (client_properties, mechanism, response, locale) =
|
||||
match next_method conn with
|
||||
| (0, Connection_start_ok props) -> props
|
||||
| _ -> die not_allowed "Expected Connection_start_ok on channel 0"
|
||||
lwt () = send_method conn 0 (Connection_start (major_version, minor_version, server_properties,
|
||||
"PLAIN AMQPLAIN", "en_US")) in
|
||||
lwt (client_properties, mechanism, response, locale) =
|
||||
match_lwt next_method conn with
|
||||
| (0, Connection_start_ok props) -> return props
|
||||
| _ -> die not_allowed "Expected Connection_start_ok on channel 0"
|
||||
in
|
||||
check_login_details mechanism response;
|
||||
Log.info "Connection from AMQP client" [sexp_of_table client_properties];
|
||||
send_method conn 0 (Connection_tune (1, Int32.of_int suggested_frame_max, 0));
|
||||
let (channel_max, frame_max, heartbeat) =
|
||||
match next_method conn with
|
||||
| (0, Connection_tune_ok props) -> props
|
||||
| _ -> die not_allowed "Expected Connection_tune_ok on channel 0"
|
||||
ignore (Log.info "Connection from AMQP client" [sexp_of_table client_properties]);
|
||||
lwt () = send_method conn 0 (Connection_tune (1, Int32.of_int suggested_frame_max, 0)) in
|
||||
lwt (channel_max, frame_max, heartbeat) =
|
||||
match_lwt next_method conn with
|
||||
| (0, Connection_tune_ok props) -> return props
|
||||
| _ -> die not_allowed "Expected Connection_tune_ok on channel 0"
|
||||
in
|
||||
if channel_max > 1
|
||||
then die not_implemented "Channel numbers higher than 1 are not supported" else ();
|
||||
|
@ -373,38 +385,36 @@ let handshake_and_tune conn =
|
|||
then die syntax_error "Requested frame max too large" else ();
|
||||
if heartbeat > 0
|
||||
then die not_implemented "Heartbeats not yet implemented (patches welcome)" else ();
|
||||
tune_connection conn (Int32.to_int frame_max);
|
||||
let (virtual_host) =
|
||||
match next_method conn with
|
||||
| (0, Connection_open props) -> props
|
||||
| _ -> die not_allowed "Expected Connection_open on channel 0"
|
||||
lwt () = tune_connection conn (Int32.to_int frame_max) in
|
||||
lwt (virtual_host) =
|
||||
match_lwt next_method conn with
|
||||
| (0, Connection_open props) -> return props
|
||||
| _ -> die not_allowed "Expected Connection_open on channel 0"
|
||||
in
|
||||
Log.info "Connected to vhost" [Sexp.Str virtual_host];
|
||||
ignore (Log.info "Connected to vhost" [Sexp.Str virtual_host]);
|
||||
send_method conn 0 Connection_open_ok
|
||||
|
||||
let amqp_mainloop conn n =
|
||||
Node.bind_ignore (conn.name, n);
|
||||
lwt () = Node.bind_ignore (conn.name, n) in
|
||||
(try
|
||||
handshake_and_tune conn;
|
||||
while not conn.connection_closed do
|
||||
let (channel, m) = next_method conn in
|
||||
handle_method conn channel m
|
||||
done
|
||||
with
|
||||
| Amqp_exception (code, message) ->
|
||||
send_error conn code message
|
||||
)
|
||||
lwt () = handshake_and_tune conn in
|
||||
while_lwt not conn.connection_closed do
|
||||
lwt (channel, m) = next_method conn in
|
||||
handle_method conn channel m
|
||||
done
|
||||
with
|
||||
| Amqp_exception (code, message) ->
|
||||
send_error conn code message)
|
||||
|
||||
let start (s, peername) =
|
||||
Connections.start_connection "amqp" issue_banner
|
||||
amqp_boot amqp_handler amqp_mainloop (s, peername)
|
||||
|
||||
let init () =
|
||||
Node.send_ignore' "factory" (Message.create (Sexp.Str "direct",
|
||||
Sexp.Arr [Sexp.Str "amq.direct"],
|
||||
Sexp.Str "", Sexp.Str ""));
|
||||
Node.send_ignore' "factory" (Message.create (Sexp.Str "fanout",
|
||||
Sexp.Arr [Sexp.Str "amq.fanout"],
|
||||
Sexp.Str "", Sexp.Str ""));
|
||||
ignore (Util.create_daemon_thread
|
||||
"AMQP listener" None (Net.start_net "AMQP" Amqp_spec.port) start)
|
||||
lwt () = Node.send_ignore' "factory" (Message.create (Sexp.Str "direct",
|
||||
Sexp.Arr [Sexp.Str "amq.direct"],
|
||||
Sexp.Str "", Sexp.Str "")) in
|
||||
lwt () = Node.send_ignore' "factory" (Message.create (Sexp.Str "fanout",
|
||||
Sexp.Arr [Sexp.Str "amq.fanout"],
|
||||
Sexp.Str "", Sexp.Str "")) in
|
||||
Util.create_daemon_thread "AMQP listener" None (Net.start_net "AMQP" Amqp_spec.port) start
|
||||
|
|
|
@ -137,7 +137,7 @@ and decoded_table t =
|
|||
| Decoded_table fs -> fs
|
||||
| Both_table (_, fs) -> fs
|
||||
|
||||
let write_octet output_buf x = Buffer.add_char output_buf (char_of_int x)
|
||||
let write_octet output_buf x = Obuffer.add_char output_buf (char_of_int x)
|
||||
let write_char output_buf x = write_octet output_buf (int_of_char x)
|
||||
let write_short output_buf x =
|
||||
write_octet output_buf ((x lsr 8) land 255);
|
||||
|
@ -159,10 +159,10 @@ let write_longlong output_buf x =
|
|||
let write_shortstr output_buf x =
|
||||
let len = String.length x in
|
||||
write_octet output_buf len;
|
||||
Buffer.add_string output_buf x
|
||||
Obuffer.add_string output_buf x
|
||||
let write_longstr output_buf x =
|
||||
write_long output_buf (Int32.of_int (String.length x));
|
||||
Buffer.add_string output_buf x
|
||||
Obuffer.add_string output_buf x
|
||||
let write_timestamp output_buf x = write_longlong output_buf x
|
||||
|
||||
let
|
||||
|
@ -194,16 +194,16 @@ and write_table_value output_buf f =
|
|||
| Table_unsigned_long v -> wcode 'i'; write_long output_buf v
|
||||
| Table_signed_longlong v -> wcode 'L'; write_longlong output_buf v
|
||||
| Table_unsigned_longlong v -> wcode 'l'; write_longlong output_buf v
|
||||
| Table_float v -> wcode 'f'; Buffer.add_string output_buf v
|
||||
| Table_double v -> wcode 'd'; Buffer.add_string output_buf v
|
||||
| Table_float v -> wcode 'f'; Obuffer.add_string output_buf v
|
||||
| Table_double v -> wcode 'd'; Obuffer.add_string output_buf v
|
||||
| Table_decimal (scale, v) -> wcode 'D'; write_octet output_buf scale; write_long output_buf v
|
||||
| Table_short_string v -> wcode 's'; write_shortstr output_buf v
|
||||
| Table_string v -> wcode 'S'; write_longstr output_buf v
|
||||
| Table_array vs ->
|
||||
wcode 'A';
|
||||
let buf = Buffer.create 1024 in
|
||||
let buf = Obuffer.create 1024 in
|
||||
encode_unnamed_fields buf vs;
|
||||
write_longstr output_buf (Buffer.contents buf)
|
||||
write_longstr output_buf (Obuffer.contents buf)
|
||||
| Table_timestamp v -> wcode 'T'; write_longlong output_buf v
|
||||
| Table_table t -> wcode 'F'; write_longstr output_buf (encoded_table t)
|
||||
| Table_void -> wcode 'V'
|
||||
|
@ -212,9 +212,9 @@ and encoded_table t =
|
|||
match t.table_body with
|
||||
| Encoded_table s -> s
|
||||
| Decoded_table fs ->
|
||||
let buf = Buffer.create 1024 in
|
||||
let buf = Obuffer.create 1024 in
|
||||
encode_named_fields buf fs;
|
||||
let s = Buffer.contents buf in
|
||||
let s = Obuffer.contents buf in
|
||||
t.table_body <- Both_table (s, fs);
|
||||
s
|
||||
| Both_table (s, _) -> s
|
||||
|
|
|
@ -47,12 +47,12 @@ lwt _ =
|
|||
lwt () = Directnode.init () in
|
||||
lwt () = Meta.init () in
|
||||
hook_log ();
|
||||
(* Amqp_relay.init ();
|
||||
Ui_main.init ();
|
||||
ignore (Amqp_relay.init ());
|
||||
(* Ui_main.init ();
|
||||
Ui_relay.init (); *)
|
||||
Relay.init ();
|
||||
(* Server_control.run_until "AMQP ready";
|
||||
Server_control.run_until "HTTP ready"; *)
|
||||
ignore (Relay.init ());
|
||||
lwt () = Server_control.run_until "AMQP ready" in
|
||||
(* Server_control.run_until "HTTP ready"; *)
|
||||
lwt () = Server_control.run_until "Hop ready" in
|
||||
if Server_control.is_running ()
|
||||
then (lwt () = create_ready_file () in
|
||||
|
|
|
@ -0,0 +1,135 @@
|
|||
(* Copy of buffer.ml from the ocaml 3.12 source code, extended with a
|
||||
new function write, for integrating with Lwt. Removed a few
|
||||
functions which for some reason don't compile using the Lwt camlp4
|
||||
extensions (?). *)
|
||||
(***********************************************************************)
|
||||
(* *)
|
||||
(* Objective Caml *)
|
||||
(* *)
|
||||
(* Pierre Weis and Xavier Leroy, projet Cristal, INRIA Rocquencourt *)
|
||||
(* *)
|
||||
(* Copyright 1999 Institut National de Recherche en Informatique et *)
|
||||
(* en Automatique. All rights reserved. This file is distributed *)
|
||||
(* under the terms of the GNU Library General Public License, with *)
|
||||
(* the special exception on linking described in file ../LICENSE. *)
|
||||
(* *)
|
||||
(***********************************************************************)
|
||||
|
||||
(* $Id: buffer.ml 10216 2010-03-28 08:16:45Z xleroy $ *)
|
||||
|
||||
(* Extensible buffers *)
|
||||
|
||||
type t =
|
||||
{mutable buffer : string;
|
||||
mutable position : int;
|
||||
mutable length : int;
|
||||
initial_buffer : string}
|
||||
|
||||
let create n =
|
||||
let n = if n < 1 then 1 else n in
|
||||
let n = if n > Sys.max_string_length then Sys.max_string_length else n in
|
||||
let s = String.create n in
|
||||
{buffer = s; position = 0; length = n; initial_buffer = s}
|
||||
|
||||
let contents b = String.sub b.buffer 0 b.position
|
||||
|
||||
let sub b ofs len =
|
||||
if ofs < 0 || len < 0 || ofs > b.position - len
|
||||
then invalid_arg "Obuffer.sub"
|
||||
else begin
|
||||
let r = String.create len in
|
||||
String.blit b.buffer ofs r 0 len;
|
||||
r
|
||||
end
|
||||
;;
|
||||
|
||||
let blit src srcoff dst dstoff len =
|
||||
if len < 0 || srcoff < 0 || srcoff > src.position - len
|
||||
|| dstoff < 0 || dstoff > (String.length dst) - len
|
||||
then invalid_arg "Obuffer.blit"
|
||||
else
|
||||
String.blit src.buffer srcoff dst dstoff len
|
||||
;;
|
||||
|
||||
let nth b ofs =
|
||||
if ofs < 0 || ofs >= b.position then
|
||||
invalid_arg "Obuffer.nth"
|
||||
else String.get b.buffer ofs
|
||||
;;
|
||||
|
||||
let length b = b.position
|
||||
|
||||
let clear b = b.position <- 0
|
||||
|
||||
let reset b =
|
||||
b.position <- 0; b.buffer <- b.initial_buffer;
|
||||
b.length <- String.length b.buffer
|
||||
|
||||
let resize b more =
|
||||
let len = b.length in
|
||||
let new_len = ref len in
|
||||
while b.position + more > !new_len do new_len := 2 * !new_len done;
|
||||
if !new_len > Sys.max_string_length then begin
|
||||
if b.position + more <= Sys.max_string_length
|
||||
then new_len := Sys.max_string_length
|
||||
else failwith "Obuffer.add: cannot grow buffer"
|
||||
end;
|
||||
let new_buffer = String.create !new_len in
|
||||
String.blit b.buffer 0 new_buffer 0 b.position;
|
||||
b.buffer <- new_buffer;
|
||||
b.length <- !new_len
|
||||
|
||||
let add_char b c =
|
||||
let pos = b.position in
|
||||
if pos >= b.length then resize b 1;
|
||||
b.buffer.[pos] <- c;
|
||||
b.position <- pos + 1
|
||||
|
||||
let add_substring b s offset len =
|
||||
if offset < 0 || len < 0 || offset > String.length s - len
|
||||
then invalid_arg "Obuffer.add_substring";
|
||||
let new_position = b.position + len in
|
||||
if new_position > b.length then resize b len;
|
||||
String.blit s offset b.buffer b.position len;
|
||||
b.position <- new_position
|
||||
|
||||
let add_string b s =
|
||||
let len = String.length s in
|
||||
let new_position = b.position + len in
|
||||
if new_position > b.length then resize b len;
|
||||
String.blit s 0 b.buffer b.position len;
|
||||
b.position <- new_position
|
||||
|
||||
let add_buffer b bs =
|
||||
add_substring b bs.buffer 0 bs.position
|
||||
|
||||
let add_channel b ic len =
|
||||
if len < 0 || len > Sys.max_string_length then (* PR#5004 *)
|
||||
invalid_arg "Obuffer.add_channel";
|
||||
if b.position + len > b.length then resize b len;
|
||||
really_input ic b.buffer b.position len;
|
||||
b.position <- b.position + len
|
||||
|
||||
let output_buffer oc b =
|
||||
output oc b.buffer 0 b.position
|
||||
|
||||
let write c b =
|
||||
Lwt_io.write_from_exactly c b.buffer 0 b.position
|
||||
|
||||
let closing = function
|
||||
| '(' -> ')'
|
||||
| '{' -> '}'
|
||||
| _ -> assert false;;
|
||||
|
||||
(* opening and closing: open and close characters, typically ( and )
|
||||
k: balance of opening and closing chars
|
||||
s: the string where we are searching
|
||||
start: the index where we start the search. *)
|
||||
let advance_to_closing opening closing k s start =
|
||||
let rec advance k i lim =
|
||||
if i >= lim then raise Not_found else
|
||||
if s.[i] = opening then advance (k + 1) (i + 1) lim else
|
||||
if s.[i] = closing then
|
||||
if k = 0 then i else advance (k - 1) (i + 1) lim
|
||||
else advance k (i + 1) lim in
|
||||
advance k start (String.length s);;
|
Loading…
Reference in New Issue