@ -4,7 +4,7 @@ HTML=$(subst web/bootstrap/templates/,web/,$(subst .xml,.html,$(TEMPLATES)))
# Static builds. So far I've only seen this work on Linux. OS X complains about missing -lcrt0.o.
# OCAMLBUILD=ocamlbuild -classic-display -use-ocamlfind -X scratch -lflag -cclib -lflag -static
OCAMLBUILD=ocamlbuild -classic-display -use-ocamlfind -X scratch
OCAMLBUILD=ocamlbuild -tag thread -classic-display -use-ocamlfind -X scratch
all: \
message.ml amqp_spec.ml \

@ -241,13 +241,13 @@ def print_codec():
for m in methods:
print m.match_clause
if m.accessible_fields:
print ' Arr [Str "%s"; Str "%s"' % (m.class_name, m.name)
print ' Arr [litstr "%s"; litstr "%s"' % (m.class_name, m.name)
for f in m.accessible_fields:
print ' ; Arr [Str "%s"; sexp_of_%s(%s)]' % \
print ' ; Arr [litstr "%s"; sexp_of_%s(%s)]' % \
(f.name, mlify(f.type), mlify(f.name))
print ' ]'
print ' Arr [Str "%s"; Str "%s"]' % (m.class_name, m.name)
print ' Arr [litstr "%s"; litstr "%s"]' % (m.class_name, m.name)
print 'let method_name class_index method_index = match (class_index, method_index) with'
for m in methods:
@ -300,7 +300,7 @@ def print_codec():
print c.match_clause
print ' let fields__ = [] in'
for f in reversed(c.accessible_fields):
print ' let fields__ = (match %s with Some v -> Arr [Str "%s"; sexp_of_%s(v)] :: fields__ | None -> fields__) in' % \
print ' let fields__ = (match %s with Some v -> Arr [litstr "%s"; sexp_of_%s(v)] :: fields__ | None -> fields__) in' % \
(mlify(f.name), f.name, mlify(f.type))
print ' Arr fields__'
@ -339,7 +339,7 @@ def print_codec():
print ' | Arr ps ->'
print ' List.iter (fun (p) -> match p with'
for f in c.accessible_fields:
print ' | Arr [Str "%s"; v] -> %s := Some (%s_of_sexp v)' % \
print ' | Arr [Str k; v] when k = Bytes.of_string "%s" -> %s := Some (%s_of_sexp v)' % \
(f.name, mlify(f.name), mlify(f.type))
print ' | _ -> ()) ps'
print ' | _ -> ());'

@ -26,7 +26,7 @@ type connection_t = {
cin: Lwt_io.input_channel;
cout: Lwt_io.output_channel;
name: Node.name;
mutable input_buf: string;
mutable input_buf: bytes;
mutable output_buf: Obuffer.t;
mutable frame_max: int;
mutable connection_closed: bool;
@ -42,8 +42,8 @@ let amqp_boot (peername, cin, cout) = return {
mtx = Lwt_mutex.create ();
cin = cin;
cout = cout;
name = Node.name_of_string (Uuid.create ());
input_buf = String.create initial_frame_size;
name = Node.name_of_bytes (Bytes.of_string (Uuid.create ()));
input_buf = Bytes.create initial_frame_size;
output_buf = Obuffer.create initial_frame_size;
frame_max = initial_frame_size;
connection_closed = false;
@ -105,7 +105,7 @@ let deserialize_header buf =
return (body_size, props)
let send_content_body conn channel body =
let len = String.length body in
let len = Bytes.length body in
let rec send_remainder offset =
if offset >= len
then return ()
@ -149,43 +149,43 @@ let send_method conn channel m =
serialize_method conn.output_buf m;
write_frame conn frame_method channel)
let send_content_method conn channel m p body_str =
let send_content_method conn channel m p body_bs =
with_conn_mutex conn (fun () ->
serialize_method conn.output_buf m;
lwt () = write_frame conn frame_method 1 in
serialize_header conn.output_buf (String.length body_str) p;
serialize_header conn.output_buf (Bytes.length body_bs) p;
lwt () = write_frame conn frame_header 1 in
send_content_body conn 1 body_str)
send_content_body conn 1 body_bs)
let send_error conn code message =
if conn.connection_closed
then return ()
(conn.connection_closed <- true;
let m = Connection_close (code, message, 0, 0) in
let m = Connection_close (code, Bytes.of_string message, 0, 0) in
ignore (Log.warn "Sending error" [sexp_of_method m]);
send_method conn 0 m)
let send_warning conn code message =
let m = Channel_close (code, message, 0, 0) in
let m = Channel_close (code, Bytes.of_string message, 0, 0) in
ignore (Log.warn "Sending warning" [sexp_of_method m]);
send_method conn 1 m
let issue_banner cin cout =
let handshake = String.create 8 in
let handshake = Bytes.create 8 in
lwt () = Lwt_io.read_into_exactly cin handshake 0 8 in
if String.sub handshake 0 4 <> "AMQP"
then (lwt () = Lwt_io.write cout "AMQP\000\000\009\001" in return false)
if Bytes.sub handshake 0 4 <> (Bytes.of_string "AMQP")
then (lwt () = Lwt_io.write_from_exactly cout (Bytes.of_string "AMQP\000\000\009\001") 0 8 in return false)
else (ignore (Log.info "AMQP handshake bytes"
[Sexp.Str (string_of_int (int_of_char (String.get handshake 4)));
Sexp.Str (string_of_int (int_of_char (String.get handshake 5)));
Sexp.Str (string_of_int (int_of_char (String.get handshake 6)));
Sexp.Str (string_of_int (int_of_char (String.get handshake 7)))]);
[Sexp.str (string_of_int (int_of_char (Bytes.get handshake 4)));
Sexp.str (string_of_int (int_of_char (Bytes.get handshake 5)));
Sexp.str (string_of_int (int_of_char (Bytes.get handshake 6)));
Sexp.str (string_of_int (int_of_char (Bytes.get handshake 7)))]);
return true)
with End_of_file -> return false
let reference_to_logs = "See server logs for details"
let reference_to_logs = (Bytes.of_string "See server logs for details")
let extract_str v =
match v with
| Sexp.Str s -> s
@ -197,14 +197,14 @@ let reply_to_declaration conn status ok_fn =
send_method conn 1 (ok_fn info)
| Message.Create_failed reason ->
(match reason with
| Sexp.Arr [Sexp.Str "factory"; Sexp.Str "class-not-found"] ->
| Sexp.Arr [Sexp.Str who; Sexp.Str code] when who = (Bytes.of_string "factory") && code = (Bytes.of_string "class-not-found")->
send_error conn command_invalid "Object type not supported by server"
| Sexp.Arr [Sexp.Str "constructor"; Sexp.Str "class-mismatch"] ->
| Sexp.Arr [Sexp.Str who; Sexp.Str code] when who = (Bytes.of_string "constructor") && code = (Bytes.of_string "class-mismatch") ->
send_error conn not_allowed "Redeclaration with different object type not permitted"
| Sexp.Arr [Sexp.Str who; explanation] ->
send_warning conn precondition_failed (who^" failed: "^(extract_str explanation))
send_warning conn precondition_failed ((Bytes.to_string who)^" failed: "^(Bytes.to_string (extract_str explanation)))
| _ ->
send_warning conn precondition_failed reference_to_logs)
send_warning conn precondition_failed (Bytes.to_string reference_to_logs))
| _ -> die internal_error "Declare reply malformed"
let make_queue_declare_ok info =
@ -214,39 +214,47 @@ let make_queue_declare_ok info =
let send_delivery conn consumer_tag body_sexp =
match body_sexp with
| Sexp.Arr [Sexp.Hint {Sexp.hint = "amqp"; Sexp.body = ""};
| Sexp.Arr [Sexp.Hint {Sexp.hint = maybe_amqp; Sexp.body = h_body_bs};
Sexp.Str exchange;
Sexp.Str routing_key;
Sexp.Str body_str] ->
Sexp.Str body_bs] when maybe_amqp = (Bytes.of_string "amqp") && h_body_bs = Bytes.empty ->
lwt tag = with_conn_mutex conn (fun () ->
let v = conn.delivery_tag in conn.delivery_tag <- v + 1; return v)
send_content_method conn 1
(Basic_deliver (consumer_tag, Int64.of_int tag, false, exchange, routing_key))
(properties_of_sexp basic_class_id properties_sexp)
| _ -> die internal_error "Malformed AMQP message body sexp"
let amqp_handler conn n m_sexp =
(match Message.message_of_sexp m_sexp with
| Message.Post (Sexp.Str "Exchange_declare_reply", status, _) ->
reply_to_declaration conn status (fun (_) -> Exchange_declare_ok)
| Message.Post (Sexp.Str "Queue_declare_reply", status, _) ->
reply_to_declaration conn status make_queue_declare_ok
| Message.Post (Sexp.Str "Queue_bind_reply", status, _) ->
(match Message.message_of_sexp status with
| Message.Subscribe_ok _ -> send_method conn 1 Queue_bind_ok
| _ -> die internal_error "Queue bind reply malformed")
| Message.Post (Sexp.Arr [Sexp.Str "Basic_consume_reply"; Sexp.Str consumer_tag], status, _) ->
(match Message.message_of_sexp status with
| Message.Subscribe_ok _ -> send_method conn 1 (Basic_consume_ok consumer_tag)
| _ -> die internal_error "Basic consume reply malformed")
| Message.Post (Sexp.Arr [Sexp.Str "delivery"; Sexp.Str consumer_tag], body, _) ->
send_delivery conn consumer_tag body
| _ ->
Log.warn "AMQP outbound relay ignoring message" [m_sexp])
| Message.Post (Sexp.Str type_bs, status, _) ->
(match Bytes.to_string type_bs with
| "Exchange_declare_reply" ->
reply_to_declaration conn status (fun (_) -> Exchange_declare_ok)
| "Queue_declare_reply" ->
reply_to_declaration conn status make_queue_declare_ok
| "Queue_bind_reply" ->
(match Message.message_of_sexp status with
| Message.Subscribe_ok _ -> send_method conn 1 Queue_bind_ok
| _ -> die internal_error "Queue bind reply malformed")
| _ ->
Log.warn "AMQP outbound relay ignoring message" [m_sexp])
| Message.Post (Sexp.Arr [Sexp.Str type_bs; Sexp.Str consumer_tag], status_or_body, _) ->
(match Bytes.to_string type_bs with
| "Basic_consume_reply" ->
(match Message.message_of_sexp status_or_body with
| Message.Subscribe_ok _ -> send_method conn 1 (Basic_consume_ok consumer_tag)
| _ -> die internal_error "Basic consume reply malformed")
| "delivery" ->
send_delivery conn consumer_tag status_or_body
| _ ->
Log.warn "AMQP outbound relay ignoring message" [m_sexp])
| _ ->
Log.warn "AMQP outbound relay ignoring message" [m_sexp])
| Amqp_exception (code, message) ->
send_error conn code message
@ -260,29 +268,26 @@ let get_recent_queue_name conn =
| None -> die syntax_error "Attempt to use nonexistent most-recently-declared-queue name"
let expand_mrdq conn queue =
match queue with
| "" -> get_recent_queue_name conn
| other -> Node.name_of_string other
if queue = Bytes.empty then get_recent_queue_name conn
else Node.name_of_bytes queue
let handle_method conn channel m =
(* ignore (Log.info "method" [sexp_of_method m]); *)
if channel > 1 then die channel_error "Unsupported channel number" else ();
match m with
| Connection_close (code, text, _, _) ->
ignore (Log.info "Client closed AMQP connection"
[Sexp.Str (string_of_int code); Sexp.Str text]);
ignore (Log.info "Client closed AMQP connection" [Sexp.str (string_of_int code); Sexp.Str text]);
lwt () = send_method conn channel Connection_close_ok in
return (conn.connection_closed <- true)
| Channel_open ->
conn.delivery_tag <- 1;
send_method conn channel Channel_open_ok
| Channel_close (code, text, _, _) ->
ignore (Log.info "Client closed AMQP channel"
[Sexp.Str (string_of_int code); Sexp.Str text]);
ignore (Log.info "Client closed AMQP channel" [Sexp.str (string_of_int code); Sexp.Str text]);
send_method conn channel Channel_close_ok;
| Channel_close_ok ->
return ()
| Exchange_declare ("", type_, passive, durable, no_wait, arguments) ->
| Exchange_declare (maybe_empty, type_, passive, durable, no_wait, arguments) when maybe_empty = Bytes.empty ->
(* Qpid does this bizarre thing of declaring the default exchange. *)
if no_wait
then return ()
@ -290,21 +295,25 @@ let handle_method conn channel m =
| Exchange_declare (exchange, type_, passive, durable, no_wait, arguments) ->
let (reply_sink, reply_name) =
if no_wait
then ("", "")
else (conn.name.Node.label, "Exchange_declare_reply")
then (Bytes.empty, Bytes.empty)
else (conn.name.Node.label, Bytes.of_string "Exchange_declare_reply")
Node.send_ignore' "factory" (Message.create (Sexp.Str type_,
Sexp.Arr [Sexp.Str exchange],
Sexp.Str reply_sink,
Sexp.Str reply_name))
(Bytes.of_string "factory")
(Message.create (Sexp.Str type_,
Sexp.Arr [Sexp.Str exchange],
Sexp.Str reply_sink,
Sexp.Str reply_name))
| Queue_declare (queue, passive, durable, exclusive, auto_delete, no_wait, arguments) ->
let queue = (if queue = "" then Uuid.create () else queue) in
conn.recent_queue_name <- Some (Node.name_of_string queue);
Node.send_ignore' "factory" (Message.create (Sexp.Str "queue",
Sexp.Arr [Sexp.Str queue],
Sexp.Str conn.name.Node.label,
Sexp.Str "Queue_declare_reply"))
| Queue_bind (queue, "", routing_key, no_wait, arguments) ->
let queue = (if queue = Bytes.empty then Bytes.of_string (Uuid.create ()) else queue) in
conn.recent_queue_name <- Some (Node.name_of_bytes queue);
(Bytes.of_string "factory")
(Message.create (Sexp.litstr "queue",
Sexp.Arr [Sexp.Str queue],
Sexp.Str conn.name.Node.label,
Sexp.litstr "Queue_declare_reply"))
| Queue_bind (queue, maybe_empty, routing_key, no_wait, arguments) when maybe_empty = Bytes.empty ->
(* Qpid does this bizarre thing of binding to the default exchange. *)
if no_wait
then return ()
@ -312,54 +321,54 @@ let handle_method conn channel m =
| Queue_bind (queue, exchange, routing_key, no_wait, arguments) ->
let queue = expand_mrdq conn queue in
if not (Node.approx_exists queue)
then send_warning conn not_found ("Queue '"^queue.Node.label^"' not found")
then send_warning conn not_found ("Queue '"^(Bytes.to_string queue.Node.label)^"' not found")
(match_lwt Node.send' exchange (Message.subscribe (Sexp.Str routing_key,
Sexp.Str queue.Node.label,
Sexp.Str "",
Sexp.Str conn.name.Node.label,
Sexp.Str "Queue_bind_reply")) with
Sexp.litstr "Queue_bind_reply")) with
| true -> return ()
| false -> send_warning conn not_found ("Exchange '"^exchange^"' not found"))
| false -> send_warning conn not_found ("Exchange '"^(Bytes.to_string exchange)^"' not found"))
| Basic_consume (queue, consumer_tag, no_local, no_ack, exclusive, no_wait, arguments) ->
let queue = expand_mrdq conn queue in
let consumer_tag = (if consumer_tag = "" then Uuid.create () else consumer_tag) in
let consumer_tag = (if consumer_tag = Bytes.empty then (Bytes.of_string (Uuid.create ())) else consumer_tag) in
(match_lwt Node.send queue (Message.subscribe
(Sexp.Str "",
Sexp.Str conn.name.Node.label,
Sexp.Arr [Sexp.Str "delivery"; Sexp.Str consumer_tag],
Sexp.Arr [Sexp.litstr "delivery"; Sexp.Str consumer_tag],
Sexp.Str conn.name.Node.label,
Sexp.Arr [Sexp.Str "Basic_consume_reply";
Sexp.Arr [Sexp.litstr "Basic_consume_reply";
Sexp.Str consumer_tag])) with
| true -> return ()
| false -> send_warning conn not_found ("Queue '"^queue.Node.label^"' not found"))
| false -> send_warning conn not_found ("Queue '"^(Bytes.to_string queue.Node.label)^"' not found"))
| Basic_publish (exchange, routing_key, false, false) ->
lwt (_, (body_size, properties)) = next_header conn in
lwt body = recv_content_body conn body_size in
let (pseudotype, sink, name) =
if exchange = ""
then ("Queue", routing_key, "")
if exchange = Bytes.empty
then ("Queue", routing_key, Bytes.empty)
else ("Exchange", exchange, routing_key)
Node.post' sink
(Sexp.Str name)
(Sexp.Arr [Sexp.Hint {Sexp.hint = "amqp"; Sexp.body = ""};
(Sexp.Arr [Sexp.Hint {Sexp.hint = (Bytes.of_string "amqp"); Sexp.body = Bytes.empty};
Sexp.Str exchange;
Sexp.Str routing_key;
sexp_of_properties properties;
Sexp.Str body])
(Sexp.Str "")
| true -> return ()
| false -> send_warning conn not_found (pseudotype^" '"^sink^"' not found"))
| false -> send_warning conn not_found (pseudotype^" '"^(Bytes.to_string sink)^"' not found"))
| Basic_ack (delivery_tag, multiple) ->
return ()
| Basic_qos (_, _, _) ->
ignore (Log.warn "Ignoring Basic_qos instruction from client" []);
send_method conn channel Basic_qos_ok
| Channel_flow (on) ->
ignore (Log.warn "Ignoring Channel_flow setting" [Sexp.Str (string_of_bool on)]);
ignore (Log.warn "Ignoring Channel_flow setting" [Sexp.str (string_of_bool on)]);
send_method conn channel (Channel_flow_ok on)
| _ ->
let (cid, mid) = method_index m in
@ -367,37 +376,53 @@ let handle_method conn channel m =
(method_name cid mid))
let server_properties = table_of_list [
("product", Table_string App_info.product);
("version", Table_string App_info.version);
("copyright", Table_string App_info.copyright);
("licence", Table_string App_info.licence_blurb);
("capabilities", Table_table (table_of_list []));
((Bytes.of_string "product"), Table_string App_info.product);
((Bytes.of_string "version"), Table_string App_info.version);
((Bytes.of_string "copyright"), Table_string App_info.copyright);
((Bytes.of_string "licence"), Table_string App_info.licence_blurb);
((Bytes.of_string "capabilities"), Table_table (table_of_list []));
let check_login_details mechanism response =
match mechanism with
| "PLAIN" ->
(match (Str.split (Str.regexp "\000") response) with
| ["guest"; "guest"] -> ()
| _ -> die access_refused "Access refused")
(let fields = decode_named_fields (Ibuffer.of_string response) in
match (field_lookup_some "LOGIN" fields, field_lookup_some "PASSWORD" fields) with
| (Some (Table_string "guest"), Some (Table_string "guest")) -> ()
| _ -> die access_refused "Access refused")
| _ -> die access_refused "Bad auth mechanism"
match Bytes.to_string mechanism with
| "PLAIN" ->
if (match Bytes.index_opt response '\000' with
| Some pos ->
let user = Bytes.sub response 0 pos in
let pass = Bytes.sub response (pos + 1) ((Bytes.length response) - (pos + 1)) in
(Bytes.to_string user) = "guest" && (Bytes.to_string pass) = "guest"
| None -> false)
then ()
else die access_refused "Access refused"
(let fields = decode_named_fields (Ibuffer.of_bytes response) in
(match (field_lookup_some (Bytes.of_string "LOGIN") fields,
field_lookup_some (Bytes.of_string "PASSWORD") fields) with
| (Some (Table_string user), Some (Table_string pass))
when user = Bytes.of_string "guest" && pass = Bytes.of_string "guest" -> true
| _ -> false)
die access_refused "Access refused")
| _ ->
die access_refused "Bad auth mechanism"
let tune_connection conn frame_max =
with_conn_mutex conn (fun () ->
conn.input_buf <- String.create frame_max;
conn.input_buf <- Bytes.create frame_max;
conn.output_buf <- Obuffer.create frame_max;
conn.frame_max <- frame_max;
return ())
let handshake_and_tune conn =
let (major_version, minor_version, revision) = version in
lwt () = send_method conn 0 (Connection_start (major_version, minor_version, server_properties,
"PLAIN AMQPLAIN", "en_US")) in
lwt () = send_method conn 0 (Connection_start (major_version,
(Bytes.of_string "PLAIN AMQPLAIN"),
(Bytes.of_string "en_US"))) in
lwt (client_properties, mechanism, response, locale) =
match_lwt next_method conn with
| (0, Connection_start_ok props) -> return props
@ -443,11 +468,17 @@ let start (s, peername) =
amqp_boot amqp_handler amqp_mainloop (s, peername)
let init () =
lwt () = Node.send_ignore' "factory" (Message.create (Sexp.Str "direct",
Sexp.Arr [Sexp.Str "amq.direct"],
Sexp.Str "", Sexp.Str "")) in
lwt () = Node.send_ignore' "factory" (Message.create (Sexp.Str "fanout",
Sexp.Arr [Sexp.Str "amq.fanout"],
Sexp.Str "", Sexp.Str "")) in
lwt () = Node.send_ignore'
(Bytes.of_string "factory")
(Message.create (Sexp.litstr "direct",
Sexp.Arr [Sexp.litstr "amq.direct"],
Sexp.emptystr)) in
lwt () = Node.send_ignore'
(Bytes.of_string "factory")
(Message.create (Sexp.litstr "fanout",
Sexp.Arr [Sexp.litstr "amq.fanout"],
Sexp.emptystr)) in
let port = Config.get_int "amqp.port" Amqp_spec.port in
Util.create_daemon_thread "AMQP listener" None (Net.start_net "AMQP" port) start
Util.create_daemon_thread (Bytes.of_string "AMQP listener") None (Net.start_net "AMQP" port) start

@ -25,16 +25,16 @@ type octet_t = int
type short_t = int
type long_t = int32
type longlong_t = int64
type shortstr_t = string
type longstr_t = string
type shortstr_t = bytes
type longstr_t = bytes
type bit_t = bool
type timestamp_t = int64
type table_t = { mutable table_body: table_body_t }
and table_body_t =
| Encoded_table of string
| Decoded_table of (string * table_value_t) list
| Both_table of (string * (string * table_value_t) list)
| Encoded_table of bytes
| Decoded_table of (bytes * table_value_t) list
| Both_table of (bytes * (bytes * table_value_t) list)
and table_value_t =
| Table_bool of bool (* t *)
| Table_signed_byte of int (* b *)
@ -45,11 +45,11 @@ and table_value_t =
| Table_unsigned_long of int32 (* i *)
| Table_signed_longlong of int64 (* L *)
| Table_unsigned_longlong of int64 (* l *)
| Table_float of string (* f -- there seems to be no I/O for binary floats? *)
| Table_double of string (* d -- there seems to be no I/O for binary floats? *)
| Table_float of bytes (* f -- there seems to be no I/O for binary floats? *)
| Table_double of bytes (* d -- there seems to be no I/O for binary floats? *)
| Table_decimal of (int * int32) (* D *)
| Table_short_string of string (* s *)
| Table_string of string (* S *)
| Table_short_string of bytes (* s *)
| Table_string of bytes (* S *)
| Table_array of table_value_t list (* A *)
| Table_timestamp of int64 (* T *)
| Table_table of table_t (* F *)
@ -131,7 +131,7 @@ and read_table_value input_buf =
and decoded_table t =
match t.table_body with
| Encoded_table s ->
let fs = decode_named_fields (Ibuffer.create s 0 (String.length s)) in
let fs = decode_named_fields (Ibuffer.create s 0 (Bytes.length s)) in
t.table_body <- Both_table (s, fs);
| Decoded_table fs -> fs
@ -157,11 +157,11 @@ let write_longlong output_buf x =
write_octet output_buf ((Int64.to_int (Int64.shift_right_logical x 8)) land 255);
write_octet output_buf ((Int64.to_int x) land 255)
let write_shortstr output_buf x =
let len = String.length x in
let len = Bytes.length x in
write_octet output_buf len;
Obuffer.add_string output_buf x
let write_longstr output_buf x =
write_long output_buf (Int32.of_int (String.length x));
write_long output_buf (Int32.of_int (Bytes.length x));
Obuffer.add_string output_buf x
let write_timestamp output_buf x = write_longlong output_buf x
@ -221,17 +221,17 @@ and encoded_table t =
and write_table output_buf x = write_longstr output_buf (encoded_table x)
let sexp_of_octet x = Str (string_of_int x)
let sexp_of_short x = Str (string_of_int x)
let sexp_of_long x = Str (Int32.to_string x)
let sexp_of_longlong x = Str (Int64.to_string x)
let sexp_of_octet x = Str (Sexp.bytes_of_int x)
let sexp_of_short x = Str (Sexp.bytes_of_int x)
let sexp_of_long x = Str (Bytes.of_string (Int32.to_string x))
let sexp_of_longlong x = Str (Bytes.of_string (Int64.to_string x))
let sexp_of_shortstr x = Str x
let sexp_of_longstr x = Str x
let sexp_of_bit x = if x then Str "1" else Str ""
let sexp_of_timestamp x = Str (Int64.to_string x)
let sexp_of_bit x = if x then Sexp.litstr "1" else Sexp.emptystr
let sexp_of_timestamp x = Str (Bytes.of_string (Int64.to_string x))
let rec
sexp_of_table x = Arr ((Hint {hint = "table"; body = ""}) ::
sexp_of_table x = Arr ((Hint {hint = Bytes.of_string "table"; body = Bytes.empty}) ::
(List.map sexp_of_named_field (decoded_table x)))
and sexp_of_named_field (s, f) =
let (t, v) = tag_val f in
@ -240,10 +240,10 @@ and sexp_of_unnamed_field f =
let (t, v) = tag_val f in
Arr [t; v]
and tag_val f =
let h hs v = (Str hs, v) in
let h hs v = (Sexp.litstr hs, v) in
match f with
| Table_bool true -> h "t" (Str "1")
| Table_bool false -> h "t" (Str "")
| Table_bool true -> h "t" (Sexp.litstr "1")
| Table_bool false -> h "t" Sexp.emptystr
| Table_signed_byte v -> h "b" (sexp_of_octet (signed_to_unsigned v 256))
| Table_unsigned_byte v -> h "B" (sexp_of_octet v)
| Table_signed_short v -> h "U" (sexp_of_short (signed_to_unsigned v 65536))
@ -254,8 +254,8 @@ and tag_val f =
| Table_unsigned_longlong v -> h "l" (sexp_of_longlong v)
| Table_float v -> h "f" (Str v)
| Table_double v -> h "d" (Str v)
| Table_decimal (scale, v) -> h "D" (Arr [Arr [Str "scale"; sexp_of_octet scale];
Arr [Str "value"; sexp_of_long v]])
| Table_decimal (scale, v) -> h "D" (Arr [Arr [Sexp.litstr "scale"; sexp_of_octet scale];
Arr [Sexp.litstr "value"; sexp_of_long v]])
| Table_short_string v -> h "s" (Str v)
| Table_string v -> h "S" (Str v)
| Table_array vs -> h "A" (Arr (List.map sexp_of_unnamed_field vs))
@ -269,58 +269,60 @@ let reserved_value_octet = 0
let reserved_value_short = 0
let reserved_value_long = Int32.zero
let reserved_value_longlong = Int64.zero
let reserved_value_shortstr = ""
let reserved_value_longstr = ""
let reserved_value_shortstr = Bytes.empty
let reserved_value_longstr = Bytes.empty
let reserved_value_bit = false
let reserved_value_timestamp = Int64.zero
let reserved_value_table = { table_body = Encoded_table "" }
let reserved_value_table = { table_body = Encoded_table Bytes.empty }
let octet_of_sexp v = match v with Str x -> int_of_string x | _ -> reserved_value_octet
let short_of_sexp v = match v with Str x -> int_of_string x | _ -> reserved_value_short
let long_of_sexp v = match v with Str x -> Int32.of_string x | _ -> reserved_value_long
let longlong_of_sexp v = match v with Str x -> Int64.of_string x | _ -> reserved_value_longlong
let octet_of_sexp v = match v with Str x -> Sexp.int_of_bytes x | _ -> reserved_value_octet
let short_of_sexp v = match v with Str x -> Sexp.int_of_bytes x | _ -> reserved_value_short
let long_of_sexp v = match v with Str x -> Int32.of_string (Bytes.to_string x) | _ -> reserved_value_long
let longlong_of_sexp v = match v with Str x -> Int64.of_string (Bytes.to_string x) | _ -> reserved_value_longlong
let shortstr_of_sexp v = match v with Str x -> x | _ -> reserved_value_shortstr
let longstr_of_sexp v = match v with Str x -> x | _ -> reserved_value_longstr
let bit_of_sexp v = match v with Str x -> x <> "" | _ -> reserved_value_bit
let timestamp_of_sexp v = match v with Str x -> Int64.of_string x | _ -> reserved_value_timestamp
let bit_of_sexp v = match v with Str x -> x <> Bytes.empty | _ -> reserved_value_bit
let timestamp_of_sexp v = match v with Str x -> Int64.of_string (Bytes.to_string x) | _ -> reserved_value_timestamp
let rec table_of_sexp v =
match v with
| Arr ((Hint {hint = "table"; body = ""}) :: field_sexps) ->
| Arr ((Hint {hint = hint; body = body}) :: field_sexps)
when hint = Bytes.of_string "table" && body = Bytes.empty ->
table_of_list (List.map named_sexp_field field_sexps)
| _ ->
table_of_list []
and named_sexp_field v =
match v with
| Arr [Str s; Str t; f] -> (s, untag_val (t, f))
| _ -> ("", Table_void)
| _ -> (Bytes.empty, Table_void)
and field_of_sexp v =
match v with
| Arr [Str t; f] -> untag_val (t, f)
| _ -> Table_void
and untag_val (t, v) =
match (t, v) with
| ("t", Str x) -> Table_bool (x <> "")
| ("b", v) -> Table_signed_byte (unsigned_to_signed (octet_of_sexp v) 256)
| ("B", v) -> Table_unsigned_byte (octet_of_sexp v)
| ("U", v) -> Table_signed_short (unsigned_to_signed (short_of_sexp v) 65536)
| ("u", v) -> Table_unsigned_short (short_of_sexp v)
| ("I", v) -> Table_signed_long (long_of_sexp v)
| ("i", v) -> Table_unsigned_long (long_of_sexp v)
| ("L", v) -> Table_signed_longlong (longlong_of_sexp v)
| ("l", v) -> Table_unsigned_longlong (longlong_of_sexp v)
| ("f", Str v) -> Table_float v
| ("d", Str v) -> Table_double v
| ("D", Arr [Arr [Str "scale"; scale];
Arr [Str "value"; v]]) ->
Table_decimal (octet_of_sexp scale, long_of_sexp v)
| ("s", Str v) -> Table_short_string v
| ("S", Str v) -> Table_string v
| ("A", Arr vs) -> Table_array (List.map field_of_sexp vs)
| ("T", v) -> Table_timestamp (longlong_of_sexp v)
| ("F", v) -> Table_table (table_of_sexp v)
| ("V", Arr []) -> Table_void
| _ -> Table_void
if Bytes.length t <> 1 then Table_void else
match (Bytes.get t 0, v) with
| ('t', Str x) -> Table_bool (x <> Bytes.empty)
| ('b', v) -> Table_signed_byte (unsigned_to_signed (octet_of_sexp v) 256)
| ('B', v) -> Table_unsigned_byte (octet_of_sexp v)
| ('U', v) -> Table_signed_short (unsigned_to_signed (short_of_sexp v) 65536)
| ('u', v) -> Table_unsigned_short (short_of_sexp v)
| ('I', v) -> Table_signed_long (long_of_sexp v)
| ('i', v) -> Table_unsigned_long (long_of_sexp v)
| ('L', v) -> Table_signed_longlong (longlong_of_sexp v)
| ('l', v) -> Table_unsigned_longlong (longlong_of_sexp v)
| ('f', Str v) -> Table_float v
| ('d', Str v) -> Table_double v
| ('D', Arr [Arr [Str maybe_scale; scale]; Arr [Str maybe_value; v]])
when maybe_scale = Bytes.of_string "scale" && maybe_value = Bytes.of_string "value" ->
Table_decimal (octet_of_sexp scale, long_of_sexp v)
| ('s', Str v) -> Table_short_string v
| ('S', Str v) -> Table_string v
| ('A', Arr vs) -> Table_array (List.map field_of_sexp vs)
| ('T', v) -> Table_timestamp (longlong_of_sexp v)
| ('F', v) -> Table_table (table_of_sexp v)
| ('V', Arr []) -> Table_void
| _ -> Table_void
let field_lookup k def fs =
try List.assoc k fs

@ -15,11 +15,11 @@
(* You should have received a copy of the GNU General Public License *)
(* along with Hop. If not, see <http://www.gnu.org/licenses/>. *)
let product = "hop"
let version = "ALPHA"
let copyright = "Copyright (C) 2012 Tony Garnock-Jones."
let homepage = "the GNU General Public License (version 3 or later)" (* TODO: real homepage *)
let product = (Bytes.of_string "hop")
let version = (Bytes.of_string "ALPHA")
let copyright = (Bytes.of_string "Copyright (C) 2012 Tony Garnock-Jones.")
let homepage = (Bytes.of_string "the GNU General Public License (version 3 or later)") (* TODO: real homepage *)
let licence_blurb =
"This program comes with ABSOLUTELY NO WARRANTY. This is free software,\n"^
"and you are welcome to redistribute it under certain conditions.\n"^
"See "^homepage^" for details."
(Bytes.of_string ("This program comes with ABSOLUTELY NO WARRANTY. This is free software,\n"^
"and you are welcome to redistribute it under certain conditions.\n"^
"See "^(Bytes.to_string homepage)^" for details."))

@ -81,7 +81,7 @@ def print_codec():
if t.argnames:
print_list(' (', [n for n in t.argnames], ', ', ')')
print ' ->'
sys.stdout.write(' Arr [Str "%s"' % t.wire_selector)
sys.stdout.write(' Arr [Sexp.litstr "%s"' % t.wire_selector)
if t.argnames:
print_list('; ', t.argnames, '; ', '')
print ']'
@ -89,10 +89,10 @@ def print_codec():
print 'let message_of_sexp s = match s with'
for t in spec:
sys.stdout.write(' | Arr [Str "%s"' % t.wire_selector)
sys.stdout.write(' | Arr [Str label_bs')
if t.argnames:
print_list('; ', t.argnames, '; ', '')
print '] ->'
print ('] when label_bs = Bytes.of_string "%s" ->' % t.wire_selector)
sys.stdout.write(' %s' % t.constructor)
if t.argnames:
print_list(' (', [n for n in t.argnames], ', ', ')')

@ -64,15 +64,14 @@ let init () =
then loop (index + 1) (String.sub opt 2 (String.length opt - 2))
let v = (try Json.of_string opt with _ -> Json.Str opt) in
ignore (Log.info "Setting command-line parameter"
[Sexp.Str current_key; Sexpjson.sexp_of_json v]);
ignore (Log.info "Setting command-line parameter" [Sexp.str current_key; Sexpjson.sexp_of_json v]);
set current_key v;
loop (index + 1) current_key)
loop 1 "";
(match get "config-file" with
| Some (Json.Str config_filename) ->
ignore (Log.info "Reading configuration file" [Sexp.Str config_filename]);
ignore (Log.info "Reading configuration file" [Sexp.str config_filename]);
let file_config = try Json.load config_filename with Sys_error _ -> Json.Rec [] in
config := Json.merge_right file_config !config
| _ ->

@ -28,22 +28,21 @@ let endpoint_name n =
| _ -> "??unknown??"
let connection_main class_name peername cin cout issue_banner boot_fn node_fn mainloop =
ignore (Log.info ("Accepted "^class_name) [Str (endpoint_name peername)]);
ignore (Log.info ("Accepted "^class_name) [str (endpoint_name peername)]);
match_lwt issue_banner cin cout with
| true ->
lwt shared_state = boot_fn (peername, cin, cout) in
let n = Node.make class_name (node_fn shared_state) in
let n = Node.make (Bytes.of_string class_name) (node_fn shared_state) in
lwt () =
mainloop shared_state n
| End_of_file ->
Log.info ("Disconnecting "^class_name^" normally") [Str (endpoint_name peername)]
Log.info ("Disconnecting "^class_name^" normally") [str (endpoint_name peername)]
| Sys_error message ->
Log.warn ("Disconnected "^class_name^" by Sys_error")
[Str (endpoint_name peername); Str message]
Log.warn ("Disconnected "^class_name^" by Sys_error") [str (endpoint_name peername); str message]
| exn ->
Log.error ("Uncaught exception in "^class_name) [Str (Printexc.to_string exn)])
Log.error ("Uncaught exception in "^class_name) [str (Printexc.to_string exn)])
Node.unbind_all n
| false ->
@ -60,7 +59,7 @@ let start_connection' class_name issue_banner boot_fn node_fn mainloop (s, peern
let start_connection class_name issue_banner boot_fn node_fn mainloop (s, peername) =
(endpoint_name peername ^ " input")
(Bytes.of_string (endpoint_name peername ^ " input"))
(start_connection' class_name issue_banner boot_fn node_fn mainloop)
(s, peername)

@ -15,12 +15,12 @@
(* You should have received a copy of the GNU General Public License *)
(* along with Hop. If not, see <http://www.gnu.org/licenses/>. *)
module StringSet = Set.Make(String)
module StringMap = Map.Make(String)
module BytesSet = Set.Make(Bytes)
module BytesMap = Map.Make(Bytes)
module SexpMap = Map.Make(Sexp)
module UuidSet = StringSet
module UuidSet = BytesSet
let string_map_keys m = StringMap.fold (fun k _ acc -> k :: acc) m []
let bytes_map_keys m = BytesMap.fold (fun k _ acc -> k :: acc) m []
let classify f xs =
let rec loop acc xs =
@ -30,10 +30,10 @@ let classify f xs =
(match f x with
| Some (classification, v) ->
(v :: (try StringMap.find classification acc with Not_found -> []))
(v :: (try BytesMap.find classification acc with Not_found -> []))
| None -> loop acc xs')
in loop StringMap.empty xs
in loop BytesMap.empty xs

@ -23,22 +23,22 @@ open Status
type t = {
name: Node.name;
subscriptions: Subscription.set_t;
mutable routing_table: UuidSet.t StringMap.t;
mutable routing_table: UuidSet.t BytesMap.t;
let classname = "direct"
let unsubscribe info uuid =
match_lwt Subscription.delete info.name info.subscriptions uuid with
match_lwt Subscription.delete info.name info.subscriptions (Bytes.of_string uuid) with
| Some sub ->
(match sub.Subscription.filter with
| Str binding_key ->
let old_set = StringMap.find binding_key info.routing_table in
let new_set = UuidSet.remove sub.Subscription.uuid old_set in
let old_set = BytesMap.find binding_key info.routing_table in
let new_set = UuidSet.remove (Bytes.of_string sub.Subscription.uuid) old_set in
if UuidSet.is_empty new_set
then info.routing_table <- StringMap.remove binding_key info.routing_table
else info.routing_table <- StringMap.add binding_key new_set info.routing_table
then info.routing_table <- BytesMap.remove binding_key info.routing_table
else info.routing_table <- BytesMap.add binding_key new_set info.routing_table
with Not_found ->
return ()
@ -49,7 +49,7 @@ let route_message info n sexp =
match Message.message_of_sexp sexp with
| Message.Post (Str name, body, token) ->
let routing_snapshot = info.routing_table in
let matching = (try StringMap.find name routing_snapshot with Not_found -> UuidSet.empty) in
let matching = (try BytesMap.find name routing_snapshot with Not_found -> UuidSet.empty) in
(fun (uuid) ->
match Subscription.lookup info.subscriptions uuid with
@ -65,14 +65,14 @@ let route_message info n sexp =
| Subscription.New sub ->
let old_set =
(try StringMap.find binding_key info.routing_table with Not_found -> UuidSet.empty) in
let new_set = UuidSet.add sub.Subscription.uuid old_set in
info.routing_table <- StringMap.add binding_key new_set info.routing_table;
(try BytesMap.find binding_key info.routing_table with Not_found -> UuidSet.empty) in
let new_set = UuidSet.add (Bytes.of_string sub.Subscription.uuid) old_set in
info.routing_table <- BytesMap.add binding_key new_set info.routing_table;
return ()
| Subscription.Old sub ->
return ())
| Message.Unsubscribe (Str token) ->
unsubscribe info token
unsubscribe info (Bytes.to_string token)
| m ->
Util.message_not_understood classname m
@ -80,15 +80,15 @@ let factory arg =
match arg with
| (Arr [Str name_str]) ->
let info = {
name = Node.name_of_string name_str;
name = Node.name_of_bytes name_str;
subscriptions = Subscription.new_set ();
routing_table = StringMap.empty;
routing_table = BytesMap.empty;
} in
(Node.make_idempotent_named classname info.name return (route_message info))
(Node.make_idempotent_named (Bytes.of_string classname) info.name return (route_message info))
(Str name_str)
| _ ->
return (Problem (Str "bad-arg"))
return (Problem (litstr "bad-arg"))
let init () =
Factory.register_class classname factory
Factory.register_class (Bytes.of_string classname) factory

@ -22,22 +22,22 @@ open Datastructures
type factory_t = Sexp.t -> (Sexp.t, Sexp.t) Status.t Lwt.t
let classes = ref StringMap.empty
let classes = ref BytesMap.empty
let register_class name factory =
if StringMap.mem name !classes
if BytesMap.mem name !classes
then (ignore (Log.error "Duplicate node class name" [Str name]);
Server_control.shutdown_now [Str "Duplicate node class name"; Str name];
Server_control.shutdown_now [litstr "Duplicate node class name"; Str name];
Lwt_unix.yield ())
else (ignore (Log.info "Registered node class" [Str name]);
classes := StringMap.add name factory !classes;
classes := BytesMap.add name factory !classes;
return ())
let all_class_names () =
Datastructures.string_map_keys !classes
Datastructures.bytes_map_keys !classes
let lookup_class name =
try Some (StringMap.find name !classes)
try Some (BytesMap.find name !classes)
with Not_found -> None
let factory_handler n sexp =
@ -54,14 +54,14 @@ let factory_handler n sexp =
| Status.Problem explanation ->
ignore (Log.info "Node create failed"
[Str classname; arg; Str reply_sink; Str reply_name; explanation]);
return (Message.create_failed (Arr [Str "constructor"; explanation])))
return (Message.create_failed (Arr [litstr "constructor"; explanation])))
| None ->
ignore (Log.warn "Node class not found" [Str classname]);
return (Message.create_failed (Arr [Str "factory"; Str "class-not-found"]))
return (Message.create_failed (Arr [litstr "factory"; litstr "class-not-found"]))
Node.post_ignore' reply_sink (Str reply_name) reply (Str "")
Node.post_ignore' reply_sink (Str reply_name) reply emptystr
| m ->
Util.message_not_understood "factory" m
let init () =
Node.bind_ignore (Node.name_of_string "factory", Node.make "factory" factory_handler)
Node.bind_ignore (Node.name_of_bytes (Bytes.of_string "factory"), Node.make (Bytes.of_string "factory") factory_handler)

@ -28,7 +28,7 @@ type t = {
let classname = "fanout"
let unsubscribe info uuid =
lwt _ = Subscription.delete info.name info.subscriptions uuid in return ()
lwt _ = Subscription.delete info.name info.subscriptions (Bytes.of_string uuid) in return ()
let route_message info n sexp =
match Message.message_of_sexp sexp with
@ -37,13 +37,13 @@ let route_message info n sexp =
(fun (uuid, sub) ->
lwt _ = Subscription.send_to_subscription' sub body (unsubscribe info) in return ())
(StringMap.bindings snapshot)
(BytesMap.bindings snapshot)
| Message.Subscribe (Str binding_key as filter, Str sink, name, Str reply_sink, reply_name) ->
lwt _ = (Subscription.create
info.name info.subscriptions filter sink name reply_sink reply_name) in
return ()
| Message.Unsubscribe (Str token) ->
unsubscribe info token
unsubscribe info (Bytes.to_string token)
| m ->
Util.message_not_understood classname m
@ -51,14 +51,14 @@ let factory arg =
match arg with
| (Arr [Str name_str]) ->
let info = {
name = Node.name_of_string name_str;
name = Node.name_of_bytes name_str;
subscriptions = Subscription.new_set ()
} in
(Node.make_idempotent_named classname info.name return (route_message info))
(Node.make_idempotent_named (Bytes.of_string classname) info.name return (route_message info))
(Str name_str)
| _ ->
return (Problem (Str "bad-arg"))
return (Problem (litstr "bad-arg"))
let init () =
Factory.register_class classname factory
Factory.register_class (Bytes.of_string classname) factory

@ -17,7 +17,7 @@
type 'a t =
| Index of int
| Field of string
| Field of bytes
| Push
type 'a adapter_t = {
@ -26,8 +26,8 @@ type 'a adapter_t = {
push: 'a -> 'a -> 'a;
empty_array: unit -> 'a;
get_field: string -> 'a -> 'a;
set_field: string -> 'a -> 'a -> 'a;
get_field: bytes -> 'a -> 'a;
set_field: bytes -> 'a -> 'a -> 'a;
empty_record: unit -> 'a;
@ -43,9 +43,7 @@ let parse_single b =
Ibuffer.skip_byte b; (* drop the open bracket *)
let istr = Ibuffer.until_char ']' b in
Ibuffer.skip_byte b; (* drop the close bracket *)
(match istr with
| "+" -> Push
| _ -> Index (int_of_string istr))
if istr = Bytes.of_string "+" then Push else Index (int_of_string (Bytes.to_string istr))
| '.' ->
Ibuffer.skip_byte b;
parse_fieldref b
@ -59,14 +57,14 @@ let rec parse b =
with End_of_file ->
let of_string s = parse (Ibuffer.of_string s)
let of_string s = parse (Ibuffer.of_bytes (Bytes.of_string s))
let to_string ps =
let rec walk is_first ps =
match ps with
| [] -> ""
| Index i :: rest -> "[" ^ string_of_int i ^ "]" ^ walk false rest
| Field s :: rest -> (if is_first then "" else ".") ^ s ^ walk false rest
| Field s :: rest -> (if is_first then "" else ".") ^ (Bytes.to_string s) ^ walk false rest
| Push :: rest -> "[+]" ^ walk false rest
in walk true ps

@ -17,12 +17,12 @@
open Lwt
let n_system_log = Node.name_of_string "system.log"
let n_system_log = Node.name_of_bytes (Bytes.of_string "system.log")
let hook_log () =
let old_hook = !Log.hook in
let new_hook label body =
ignore (Node.post n_system_log (Sexp.Str label) body (Sexp.Str ""));
ignore (Node.post n_system_log (Sexp.str label) body Sexp.emptystr);
old_hook label body
Log.hook := new_hook
@ -30,7 +30,7 @@ let hook_log () =
let create_ready_file () =
match Config.get "ready-file" with
| Some (Json.Str ready_file_path) ->
ignore (Log.info "Creating ready file" [Sexp.Str ready_file_path]);
ignore (Log.info "Creating ready file" [Sexp.str ready_file_path]);
return (close_out (open_out ready_file_path))
| Some other ->
ignore (Log.error "Ready file path not a string" [Sexpjson.sexp_of_json other]);
@ -40,12 +40,15 @@ let create_ready_file () =
let console_watcher () =
lwt _ = Lwt_io.read_line Lwt_io.stdin in
Server_control.milestone "Shutdown requested";
Server_control.milestone (Bytes.of_string "Shutdown requested");
return ()
lwt _ =
Printf.printf "%s %s, %s\n%s\n%!"
App_info.product App_info.version App_info.copyright App_info.licence_blurb;
(Bytes.to_string App_info.product)
(Bytes.to_string App_info.version)
(Bytes.to_string App_info.copyright)
(Bytes.to_string App_info.licence_blurb);
Sys.set_signal Sys.sigpipe Sys.Signal_ignore;
Uuid.init ();
Config.init ();
@ -57,20 +60,20 @@ lwt _ =
hook_log ();
lwt () = Config.conditional "amqp.enabled" true (fun () ->
ignore (Amqp_relay.init ());
Server_control.run_until "AMQP ready")
Server_control.run_until (Bytes.of_string "AMQP ready"))
lwt () = Config.conditional "http.enabled" true (fun () ->
ignore (Ui_main.init ());
ignore (Ui_relay.init ());
Server_control.run_until "HTTP ready")
Server_control.run_until (Bytes.of_string "HTTP ready"))
lwt () = Config.conditional "hop.enabled" true (fun () ->
ignore (Relay.init ());
Server_control.run_until "Hop ready")
Server_control.run_until (Bytes.of_string "Hop ready"))
ignore (console_watcher ());
if Server_control.is_running ()
then (lwt () = create_ready_file () in
Server_control.milestone "Server initialized";
Server_control.run_until "Shutdown requested")
Server_control.milestone (Bytes.of_string "Server initialized");
Server_control.run_until (Bytes.of_string "Shutdown requested"))
else return ()

@ -15,12 +15,15 @@
(* You should have received a copy of the GNU General Public License *)
(* along with Hop. If not, see <http://www.gnu.org/licenses/>. *)
let string_of_revlist acc len =
let buf = String.make len ' ' in
let bytes_of_revlist acc len =
let buf = Bytes.make len ' ' in
let rec fill cs i =
match cs with
| [] -> ()
| c :: cs' -> (String.set buf i c; fill cs' (i - 1))
| c :: cs' -> (Bytes.set buf i c; fill cs' (i - 1))
fill acc (len - 1);
let string_of_revlist acc len =
Bytes.to_string (bytes_of_revlist acc len)

@ -20,14 +20,14 @@ open Hof
type version = [`HTTP_1_0 | `HTTP_1_1]
type resp_version = [version | `SAME_AS_REQUEST]
type content = Fixed of string | Variable of string Lwt_stream.t
type content = Fixed of bytes | Variable of bytes Lwt_stream.t
type body = {
headers: (string * string) list;
content: content
let empty_content = Fixed ""
let empty_content = Fixed Bytes.empty
let empty_body = {headers = []; content = empty_content}
type req = {
@ -81,12 +81,12 @@ let http_error code reason body = raise_lwt (HTTPError (code, reason, body))
let http_error_plain code reason =
http_error code reason
{headers = [text_content_type_header]; content = Fixed reason}
{headers = [text_content_type_header]; content = Fixed (Bytes.of_string reason)}
let http_error_html_doc code reason doc =
http_error code reason
{headers = [html_content_type_header];
content = Variable (Html.stream_of_html_doc doc)}
content = Variable (Streamutil.stream_encode (Html.stream_of_html_doc doc))}
let html_error_doc code reason extra_body =
let code_str = string_of_int code in
@ -115,7 +115,7 @@ let resp_generic_ok headers content =
let resp_html_doc code reason extra_headers doc =
resp_generic code reason
(html_content_type_header :: extra_headers)
(Variable (Html.stream_of_html_doc doc))
(Variable (Streamutil.stream_encode (Html.stream_of_html_doc doc)))
let resp_html_doc_ok extra_headers doc = resp_html_doc 200 "OK" extra_headers doc
@ -173,22 +173,21 @@ let render_header cout (k, v) =
Lwt_io.write cout "\r\n"
let render_chunk cout chunk =
match chunk with
| "" -> return ()
| _ ->
lwt () = Lwt_io.write cout (Printf.sprintf "%x\r\n" (String.length chunk)) in
lwt () = Lwt_io.write cout chunk in
Lwt_io.write cout "\r\n"
let chunk_len = Bytes.length chunk in
if chunk_len = 0 then return () else
lwt () = Lwt_io.write cout (Printf.sprintf "%x\r\n" chunk_len) in
lwt () = Lwt_io.write_from_exactly cout chunk 0 chunk_len in
Lwt_io.write cout "\r\n"
let render_fixed_content cout s headers_only =
lwt () = render_header cout ("Content-Length", string_of_int (String.length s)) in
lwt () = render_header cout ("Content-Length", string_of_int (Bytes.length s)) in
lwt () = Lwt_io.write cout "\r\n" in
if headers_only then return () else Lwt_io.write cout s
if headers_only then return () else Lwt_io.write_from_exactly cout s 0 (Bytes.length s)
let string_of_content c =
let bytes_of_content c =
match c with
| Fixed s -> return s
| Variable s -> Streamutil.stream_to_string s
| Variable s -> Streamutil.stream_to_bytes s
let render_content cout v c headers_only =
match c with
@ -197,7 +196,7 @@ let render_content cout v c headers_only =
| Variable s ->
match v with
| `HTTP_1_0 ->
lwt str = Streamutil.stream_to_string s in
lwt str = Streamutil.stream_to_bytes s in
render_fixed_content cout str headers_only
| `HTTP_1_1 ->
if headers_only
@ -260,12 +259,12 @@ let parse_urlencoded q =
Lwt_list.map_s parse_urlencoded_binding pieces
let find_header' name hs =
let lc_name = String.lowercase name in
let lc_name = String.lowercase_ascii name in
let rec search hs =
match hs with
| [] -> raise Not_found
| (k, v) :: hs' ->
if String.lowercase k = lc_name
if String.lowercase_ascii k = lc_name
then v
else search hs'
@ -299,7 +298,7 @@ let parse_chunks cin =
fun () ->
lwt hexlen_str = input_crlf cin in
let chunk_len = Util.unhex hexlen_str in
let buffer = String.make chunk_len '\000' in
let buffer = Bytes.make chunk_len '\000' in
lwt () = Lwt_io.read_into_exactly cin buffer 0 chunk_len in
lwt chunk_terminator = input_crlf cin in
if chunk_terminator <> ""
@ -319,7 +318,7 @@ let parse_body cin =
return {headers = headers; content = empty_content}
| Some length_str ->
let length = int_of_string length_str in
let buffer = String.make length '\000' in
let buffer = Bytes.make length '\000' in
lwt () = Lwt_io.read_into_exactly cin buffer 0 length in
return {headers = headers; content = Fixed buffer})
| Some "chunked" ->
@ -424,9 +423,9 @@ let main handle_req (s, peername) =
resp_body = body;
completion_callbacks = [] }
| Sys_error message ->
Log.info "Sys_error in httpd handler" [Sexp.Str message]
Log.info "Sys_error in httpd handler" [Sexp.str message]
| exn ->
Log.error "Uncaught exception in httpd handler" [Sexp.Str (Printexc.to_string exn)]
Log.error "Uncaught exception in httpd handler" [Sexp.str (Printexc.to_string exn)]
lwt () = fire_pending_callbacks () in

@ -30,7 +30,7 @@ let sanitize_path p =
String.concat "/" (List.filter visible_path_component (Str.split sanitize_path_re p))
let extension_map ext =
match String.lowercase ext with
match String.lowercase_ascii ext with
| ".txt" -> Httpd.text_content_type
| ".html" | ".htm" -> Httpd.html_content_type
| ".bin" -> "application/octet-stream"
@ -51,11 +51,11 @@ let analyze_path p =
let stream_file handle =
let buflen = 4096 in
let buffer = String.make buflen '\000' in
let buffer = Bytes.make buflen '\000' in
fun () ->
let count = input handle buffer 0 buflen in
if count > 0
then return (Some (String.sub buffer 0 count))
then return (Some (Bytes.sub buffer 0 count))
else return None
let rec read_dir dirhandle =

@ -18,7 +18,7 @@
type t = {
mutable pos: int;
limit: int;
buf: string;
buf: bytes;
let create s ofs len = {
@ -27,7 +27,7 @@ let create s ofs len = {
buf = s
let of_string s = create s 0 (String.length s)
let of_bytes s = create s 0 (Bytes.length s)
let sub b ofs len =
if b.pos + ofs + len > b.limit
@ -46,13 +46,13 @@ let skip_byte b =
else raise End_of_file
let skip_ws b =
while b.pos < b.limit && String.get b.buf b.pos <= ' ' do
while b.pos < b.limit && Bytes.get b.buf b.pos <= ' ' do
b.pos <- b.pos + 1
let peek_char b =
if b.pos < b.limit
then String.get b.buf b.pos
then Bytes.get b.buf b.pos
else raise End_of_file
let peek_byte b = int_of_char (peek_char b)
@ -60,7 +60,7 @@ let peek_byte b = int_of_char (peek_char b)
let next_char b =
if b.pos < b.limit
let v = String.get b.buf b.pos in
let v = Bytes.get b.buf b.pos in
b.pos <- b.pos + 1;
@ -73,8 +73,8 @@ let next_chars b n =
raise End_of_file
let dst = String.create n in
String.blit b.buf b.pos dst 0 n;
let dst = Bytes.create n in
Bytes.blit b.buf b.pos dst 0 n;
b.pos <- b.pos + n;
@ -88,12 +88,12 @@ let until_pred pred b =
if remaining b = 0
if pred None
then Hopstr.string_of_revlist acc len
then Hopstr.bytes_of_revlist acc len
else raise End_of_file
let ch = peek_char b in
if pred (Some ch)
then Hopstr.string_of_revlist acc len
then Hopstr.bytes_of_revlist acc len
else loop (next_char b :: acc) (len + 1)
in loop [] 0

@ -135,7 +135,7 @@ let rec parse_str b (acc, len) =
| 'n' -> parse_str b (Char.chr 10 :: acc, len + 1)
| 'r' -> parse_str b (Char.chr 13 :: acc, len + 1)
| 't' -> parse_str b (Char.chr 9 :: acc, len + 1)
| 'u' -> parse_str b (accumulate_utf8 (Util.unhex (Ibuffer.next_chars b 4)) (acc, len))
| 'u' -> parse_str b (accumulate_utf8 (Util.unhex (Bytes.to_string (Ibuffer.next_chars b 4))) (acc, len))
| c -> parse_str b (c :: acc, len + 1))
| c -> parse_str b (c :: acc, len + 1)
@ -174,21 +174,21 @@ and parse b =
| '\"' -> parse_str b ([], 0)
| '[' -> parse_arr b []
| '{' -> parse_rec b []
| 't' -> if Ibuffer.next_chars b 3 = "rue" then Flg true else raise Syntax_error
| 'f' -> if Ibuffer.next_chars b 4 = "alse" then Flg false else raise Syntax_error
| 'n' -> if Ibuffer.next_chars b 3 = "ull" then Nil else raise Syntax_error
| 't' -> if Ibuffer.next_chars b 3 = (Bytes.of_string "rue") then Flg true else raise Syntax_error
| 'f' -> if Ibuffer.next_chars b 4 = (Bytes.of_string "alse") then Flg false else raise Syntax_error
| 'n' -> if Ibuffer.next_chars b 3 = (Bytes.of_string "ull") then Nil else raise Syntax_error
| '/' -> (* cheating *) skip_line_comment b; parse b
| _ -> raise Syntax_error
let of_string s = parse (Ibuffer.of_string s)
let of_string s = parse (Ibuffer.of_bytes (Bytes.of_string s))
let resp code reason extra_headers j =
Httpd.resp_generic code reason
((Httpd.content_type_header_name, "application/json") :: extra_headers)
(Httpd.Fixed (to_string j))
(Httpd.Fixed (Bytes.of_string (to_string j)))
let resp_ok extra_headers j = resp 200 "OK" extra_headers j
let load filename = of_string (Util.file_contents filename)
let load filename = of_string (Bytes.to_string (Util.file_contents filename))
let get j i =
match j with
@ -197,7 +197,7 @@ let get j i =
let find s j =
match j with
| Rec kvs -> List.assoc s kvs
| Rec kvs -> List.assoc (Bytes.to_string s) kvs
| _ -> failwith "Json.find"
let set j i v =
@ -209,9 +209,11 @@ let set j i v =
| _ ->
failwith "Json.set"
let add k v j =
let add kb v j =
match j with
| Rec kvs -> Rec (List.remove_assoc k kvs @ [k, v])
| Rec kvs ->
let k = Bytes.to_string kb in
Rec (List.remove_assoc k kvs @ [k, v])
| _ -> failwith "Json.add"
let push j v =
@ -250,7 +252,7 @@ let path_fold seed f g j =
let rec loop seed kvs =
match kvs with
| [] -> seed
| (k, v) :: kvs' -> loop (walk seed (Gpath.Field k :: prefixrev) v) kvs'
| (k, v) :: kvs' -> loop (walk seed (Gpath.Field (Bytes.of_string k) :: prefixrev) v) kvs'
in loop (g (List.rev prefixrev) (Rec []) seed) kvs
| other ->
f (List.rev prefixrev) other seed

@ -30,6 +30,6 @@ let write_to_log label body =
let hook = ref write_to_log
let info message args = (!hook) "info" (Arr (Str message :: args))
let warn message args = (!hook) "warn" (Arr (Str message :: args))
let error message args = (!hook) "error" (Arr (Str message :: args))
let info message args = (!hook) "info" (Arr (Sexp.str message :: args))
let warn message args = (!hook) "warn" (Arr (Sexp.str message :: args))
let error message args = (!hook) "error" (Arr (Sexp.str message :: args))

View File

@ -17,14 +17,15 @@
open Sexp
let n_meta = Node.name_of_string "meta"
let n_meta = Node.name_of_bytes (Bytes.of_string "meta")
let announce_subscription source filter sink name on_off =
Node.post_ignore n_meta (Str source.Node.label)
(if on_off
then Message.subscribed (Str source.Node.label, filter, Str sink, name)
else Message.unsubscribed (Str source.Node.label, filter, Str sink, name))
(Str "")
let init () =
Node.send_ignore' "factory" (Message.create (Str "direct", Arr [Str "meta"], Str "", Str ""))
Node.send_ignore' (Bytes.of_string "factory")
(Message.create (litstr "direct", Arr [litstr "meta"], emptystr, emptystr))

@ -26,9 +26,8 @@ let rec accept_loop sock connection_start_fn =
let start_net protocol_name port_number connection_start_fn =
let sock = socket Unix.PF_INET Unix.SOCK_STREAM 0 in
setsockopt sock Unix.SO_REUSEADDR true;
bind sock (Unix.ADDR_INET (Unix.inet_addr_any, port_number));
lwt () = bind sock (Unix.ADDR_INET (Unix.inet_addr_any, port_number)) in
listen sock 5;
Server_control.milestone (protocol_name ^ " ready");
ignore (Log.info "Accepting connections"
[Sexp.Str protocol_name; Sexp.Str (string_of_int port_number)]);
Server_control.milestone (Bytes.of_string (protocol_name ^ " ready"));
ignore (Log.info "Accepting connections" [Sexp.str protocol_name; Sexp.str (string_of_int port_number)]);
accept_loop sock connection_start_fn

@ -22,13 +22,13 @@ open Status
type handle_message_t = t -> Sexp.t -> unit Lwt.t
and t = {
mutable names: StringSet.t;
mutable names: BytesSet.t;
mutable send_counter: int;
class_name: string;
class_name: bytes;
handle_message: handle_message_t
and name = {
label: string;
label: bytes;
mutable binding: t option
@ -39,31 +39,31 @@ module NameTable = Weak.Make(struct
module NameSet = Set.Make(struct
type t = name
let compare a b = String.compare a.label b.label
let compare a b = Bytes.compare a.label b.label
let name_table = NameTable.create 100
let directory = ref NameSet.empty
let name_of_string str =
let name_of_bytes str =
let template = {label = str; binding = None} in
NameTable.merge name_table template
let caching_name_of_string () =
let caching_name_of_bytes () =
let cache = ref None in
fun str ->
match !cache with
| Some ({label = k} as n) when k = str ->
| _ ->
let n = name_of_string str in
let n = name_of_bytes str in
cache := Some n;
let local_container_name () = "server"
let local_container_name () = (Bytes.of_string "server")
let make class_name handler = {
names = StringSet.empty;
names = BytesSet.empty;
send_counter = 0;
class_name = class_name;
handle_message = handler
@ -72,7 +72,7 @@ let make class_name handler = {
let lookup name = name.binding
let all_node_names () = NameSet.elements !directory
let all_node_name_strings () = List.map (fun x -> x.label) (all_node_names ())
let all_node_name_bytes () = List.map (fun x -> x.label) (all_node_names ())
(* Approximate because it doesn't lock or run in a transaction *)
let approx_exists name =
@ -81,14 +81,14 @@ let approx_exists name =
| None -> false
let bind (filter, node) =
if filter.label = ""
if filter.label = Bytes.empty
then (ignore (Log.warn "Binding to empty name forbidden" []); return false)
match filter.binding with
| None ->
filter.binding <- Some node;
directory := NameSet.add filter !directory;
node.names <- StringSet.add filter.label node.names;
node.names <- BytesSet.add filter.label node.names;
ignore (Log.info "Node bound" [Sexp.Str filter.label; Sexp.Str node.class_name]);
return true
| Some _ ->
@ -99,7 +99,7 @@ let make_named class_name node_name handler =
let node = make class_name handler in
match_lwt bind (node_name, node) with
| true -> return (Ok node)
| false -> return (Problem (Sexp.Str "bind-failed"))
| false -> return (Problem (Sexp.litstr "bind-failed"))
(* For use in factory constructor functions, hence the odd return type and values *)
let make_idempotent_named class_name node_name if_new_node handler =
@ -107,18 +107,18 @@ let make_idempotent_named class_name node_name if_new_node handler =
| Some n ->
return (if n.class_name = class_name
then Ok n
else Problem (Sexp.Str "class-mismatch"))
else Problem (Sexp.litstr "class-mismatch"))
| None ->
let node = make class_name handler in
match_lwt bind (node_name, node) with
| true -> lwt () = if_new_node () in return (Ok node)
| false -> return (Problem (Sexp.Str "bind-failed"))
| false -> return (Problem (Sexp.litstr "bind-failed"))
let unbind name =
match lookup name with
| Some n ->
ignore (Log.info "Node unbound" [Sexp.Str name.label; Sexp.Str n.class_name]);
n.names <- StringSet.remove name.label n.names;
n.names <- BytesSet.remove name.label n.names;
name.binding <- None;
directory := NameSet.remove name !directory;
return true
@ -128,10 +128,10 @@ let unbind name =
let unbind_all n =
lwt () =
(fun name -> lwt _ = unbind (name_of_string name) in return ())
(StringSet.elements n.names)
(fun name -> lwt _ = unbind (name_of_bytes name) in return ())
(BytesSet.elements n.names)
n.names <- StringSet.empty;
n.names <- BytesSet.empty;
return ()
let send name body =
@ -141,8 +141,8 @@ let send name body =
(try_lwt n.handle_message n body
with e ->
Log.warn "Node message handler raised exception"
[Sexp.Str name.label;
Sexp.Str (Printexc.to_string e)])
[Sexp.Str name.label;
Sexp.str (Printexc.to_string e)])
n.send_counter <- n.send_counter + 1;
lwt () =
@ -154,12 +154,12 @@ let send name body =
| None ->
return false
let send' str body = send (name_of_string str) body
let send' str body = send (name_of_bytes str) body
let post name label body token =
send name (Message.post (label, body, token))
let post' str label body token = post (name_of_string str) label body token
let post' str label body token = post (name_of_bytes str) label body token
let bind_ignore (filter, node) =
match_lwt bind (filter, node) with
@ -170,13 +170,13 @@ let send_ignore name body =
match_lwt send name body with
| true -> return ()
| false ->
if name.label = ""
if name.label = Bytes.empty
then return ()
else Log.warn "send to missing node" [Sexp.Str name.label; body]
let send_ignore' str body = send_ignore (name_of_string str) body
let send_ignore' str body = send_ignore (name_of_bytes str) body
let post_ignore name label body token =
send_ignore name (Message.post (label, body, token))
let post_ignore' str label body token = post_ignore (name_of_string str) label body token
let post_ignore' str label body token = post_ignore (name_of_bytes str) label body token

@ -20,41 +20,41 @@
(* Extensible buffers *)
type t =
{mutable buffer : string;
{mutable buffer : bytes;
mutable position : int;
mutable length : int;
initial_buffer : string}
initial_buffer : bytes}
let create n =
let n = if n < 1 then 1 else n in
let n = if n > Sys.max_string_length then Sys.max_string_length else n in
let s = String.create n in
let s = Bytes.create n in
{buffer = s; position = 0; length = n; initial_buffer = s}
let contents b = String.sub b.buffer 0 b.position
let contents b = Bytes.sub b.buffer 0 b.position
let sub b ofs len =
if ofs < 0 || len < 0 || ofs > b.position - len
then invalid_arg "Obuffer.sub"
else begin
let r = String.create len in
String.blit b.buffer ofs r 0 len;
let r = Bytes.create len in
Bytes.blit b.buffer ofs r 0 len;
let blit src srcoff dst dstoff len =
if len < 0 || srcoff < 0 || srcoff > src.position - len
|| dstoff < 0 || dstoff > (String.length dst) - len
|| dstoff < 0 || dstoff > (Bytes.length dst) - len
then invalid_arg "Obuffer.blit"
String.blit src.buffer srcoff dst dstoff len
Bytes.blit src.buffer srcoff dst dstoff len
let nth b ofs =
if ofs < 0 || ofs >= b.position then
invalid_arg "Obuffer.nth"
else String.get b.buffer ofs
else Bytes.get b.buffer ofs
let length b = b.position
@ -63,7 +63,7 @@ let clear b = b.position <- 0
let reset b =
b.position <- 0; b.buffer <- b.initial_buffer;
b.length <- String.length b.buffer
b.length <- Bytes.length b.buffer
let resize b more =
let len = b.length in
@ -74,30 +74,30 @@ let resize b more =
then new_len := Sys.max_string_length
else failwith "Obuffer.add: cannot grow buffer"
let new_buffer = String.create !new_len in
String.blit b.buffer 0 new_buffer 0 b.position;
let new_buffer = Bytes.create !new_len in
Bytes.blit b.buffer 0 new_buffer 0 b.position;
b.buffer <- new_buffer;
b.length <- !new_len
let add_char b c =
let pos = b.position in
if pos >= b.length then resize b 1;
b.buffer.[pos] <- c;
Bytes.set b.buffer pos c;
b.position <- pos + 1
let add_substring b s offset len =
if offset < 0 || len < 0 || offset > String.length s - len
if offset < 0 || len < 0 || offset > Bytes.length s - len
then invalid_arg "Obuffer.add_substring";
let new_position = b.position + len in
if new_position > b.length then resize b len;
String.blit s offset b.buffer b.position len;
Bytes.blit s offset b.buffer b.position len;
b.position <- new_position
let add_string b s =
let len = String.length s in
let len = Bytes.length s in
let new_position = b.position + len in
if new_position > b.length then resize b len;
String.blit s 0 b.buffer b.position len;
Bytes.blit s 0 b.buffer b.position len;
b.position <- new_position
let add_buffer b bs =

@ -32,12 +32,12 @@ type t = {
mutable waiters: int;
let classname = "queue"
let classname = (Bytes.of_string "queue")
let report info =
while_lwt true do
lwt () = Log.info (Printf.sprintf "%s: %d backlog, %d waiters"
(Bytes.to_string info.name.Node.label)
info.waiters) [] in
Lwt_unix.sleep 1.0
@ -93,7 +93,7 @@ let queue_factory arg =
let (bin, bout) = Lwt_stream.create () in
let (win, wout) = Lwt_stream.create () in
let info = {
name = Node.name_of_string name_str;
name = Node.name_of_bytes name_str;
subscriptions = Subscription.new_set ();
backlog_in = bin;
backlog_out = bout;
@ -111,7 +111,7 @@ let queue_factory arg =
(queue_handler info))
(Str name_str)
| _ ->
return (Problem (Str "bad-arg"))
return (Problem (litstr "bad-arg"))
let init () =
Factory.register_class classname queue_factory

@ -26,36 +26,36 @@ let send_error ch message details =
ch m
let send_sexp_syntax_error ch explanation =
send_error ch explanation (Str "http://people.csail.mit.edu/rivest/Sexp.txt")
send_error ch explanation (litstr "http://people.csail.mit.edu/rivest/Sexp.txt")
let dispatch_message n ch =
let lookup = Node.caching_name_of_string () in
let lookup = Node.caching_name_of_bytes () in
| Message.Post (Str name, body, token) ->
Node.send_ignore (lookup name) body
| Message.Subscribe (Str filter, sink, name, Str reply_sink, Str reply_name) ->
(match_lwt Node.bind (Node.name_of_string filter, n) with
(match_lwt Node.bind (Node.name_of_bytes filter, n) with
| true ->
(Str reply_name)
(Message.subscribe_ok (Str filter))
(Str "")
| false ->
Log.warn "Bind failed" [Str filter])
| Message.Unsubscribe (Str token) ->
(match_lwt Node.unbind (Node.name_of_string token) with
(match_lwt Node.unbind (Node.name_of_bytes token) with
| true -> return ()
| false -> Log.warn "Unbind failed" [Str token])
| m ->
send_error ch "Message not understood" (Message.sexp_of_message m)
send_error ch (Bytes.of_string "Message not understood") (Message.sexp_of_message m)
let issue_banner cin cout =
lwt () = output_sexp cout (Arr [Str "hop"; Str ""]) in
lwt () = output_sexp cout (Arr [litstr "hop"; emptystr]) in
lwt () = Lwt_io.flush cout in
lwt () = output_sexp cout (Message.subscribe (Str (Node.local_container_name()),
Str "", Str "",
Str "", Str "")) in
emptystr, emptystr,
emptystr, emptystr)) in
lwt () = Lwt_io.flush cout in
return true
@ -74,9 +74,10 @@ let relay_mainloop (peername, mtx, cin, cout) n =
| Sexp.Syntax_error explanation ->
lwt () = send_sexp_syntax_error write_sexp explanation in
lwt () = send_sexp_syntax_error write_sexp (Bytes.of_string explanation) in
Log.info "Disconnected relay for syntax error"
[Str (Connections.endpoint_name peername); Str explanation])
[str (Connections.endpoint_name peername);
str explanation])
let start (s, peername) =
Connections.start_connection "relay" issue_banner
@ -84,4 +85,4 @@ let start (s, peername) =
let init () =
let port = Config.get_int "hop.port" 5671 in
Util.create_daemon_thread "Hop listener" None (Net.start_net "Hop" port) start
Util.create_daemon_thread (Bytes.of_string "Hop listener") None (Net.start_net "Hop" port) start

@ -21,7 +21,7 @@ open Datastructures
let continue_running = ref true
let (cq_in, cq_out) = Lwt_stream.create ()
let achieved_milestones = ref StringSet.empty
let achieved_milestones = ref BytesSet.empty
let milestone name = cq_out (Some (`Milestone name))
@ -30,7 +30,7 @@ let shutdown_now details = cq_out (Some (`Shutdown details))
let is_milestone_achieved m =
match m with
| Some m' ->
StringSet.mem m' !achieved_milestones
BytesSet.mem m' !achieved_milestones
| None ->
@ -46,7 +46,7 @@ let rec run' until_milestone =
return ()
| `Milestone name ->
ignore (Log.info "Achieved milestone" [Sexp.Str name]);
achieved_milestones := StringSet.add name !achieved_milestones;
achieved_milestones := BytesSet.add name !achieved_milestones;
run' until_milestone)
let is_running () = !continue_running

@ -22,33 +22,39 @@ open Lwt_io
exception Syntax_error of string
type display_hint_t = {hint : string; body : string}
type display_hint_t = {hint : bytes; body : bytes}
and t =
| Str of string
| Str of bytes
| Hint of display_hint_t
| Arr of t list
let compare a b = Pervasives.compare a b
let emptystr = Str Bytes.empty
let litstr s = Str (Bytes.unsafe_of_string s)
let str s = Str (Bytes.of_string s)
let compare a b = Stdlib.compare a b
let digit_val c = (int_of_char c) - (int_of_char '0')
let val_digit n = char_of_int (n + 48)
let intstr =
let bytes_of_int =
let siz = 40 in
let buf = String.make siz (* enough for 128 bits *) ' ' in
let buf = Bytes.make siz (* enough for 128 bits *) ' ' in
| 0 -> "0"
| 0 -> Bytes.of_string "0"
| n ->
let rec loop n i =
if n = 0
then String.sub buf (siz - i) i
else (String.unsafe_set buf (siz - i - 1) (val_digit (n mod 10));
then Bytes.sub buf (siz - i) i
else (Bytes.unsafe_set buf (siz - i - 1) (val_digit (n mod 10));
loop (n / 10) (i + 1))
in loop n 0
let int_of_bytes bs = int_of_string (Bytes.to_string bs)
let write_simple_string write s =
lwt () = write (intstr (String.length s)) in
lwt () = write ":" in
lwt () = write (bytes_of_int (Bytes.length s)) in
lwt () = write (Bytes.of_string ":") in
write s
let generic_output_sexp write x =
@ -58,17 +64,19 @@ let generic_output_sexp write x =
| Str s ->
writestr s
| Hint {hint = h; body = b} ->
lwt () = write "[" in
lwt () = write (Bytes.of_string "[") in
lwt () = writestr h in
lwt () = write "]" in
lwt () = write (Bytes.of_string "]") in
writestr b
| Arr xs ->
lwt () = write "(" in
lwt () = write (Bytes.of_string "(") in
lwt () = Lwt_list.iter_s walk xs in
write ")"
write (Bytes.of_string ")")
in walk x
let output_sexp ch x = generic_output_sexp (write ch) x
let write_bytes ch bs = write_from_exactly ch bs 0 (Bytes.length bs)
let output_sexp ch x = generic_output_sexp (write_bytes ch) x
let stream_of_sexp x = Streamutil.stream_generator (fun yield -> generic_output_sexp yield x)
let write_char_escaped ch c =
@ -78,7 +86,7 @@ let write_char_escaped ch c =
let write_simple_string_human ch s =
lwt () = write_char ch '\"' in
lwt () = write ch (String.escaped s) in
lwt () = write_bytes ch (Bytes.escaped s) in
write_char ch '\"'
let rec output_sexp_human ch x =
@ -109,7 +117,7 @@ let char_numeric c = '0' <= c && c <= '9'
let char_whitespace c = c <= ' '
let input_bytes ch count =
let buf = String.create count in (* mutable strings?!?! *)
let buf = Bytes.create count in
lwt () = read_into_exactly ch buf 0 count in
return buf
@ -154,8 +162,8 @@ let parse b =
(fun () -> return (Ibuffer.next_char b))
(fun count -> return (Ibuffer.next_chars b count))
let sexp_of_string s = parse (Ibuffer.of_string s)
let string_of_sexp x = Streamutil.stream_to_string (stream_of_sexp x)
let sexp_of_bytes s = parse (Ibuffer.of_bytes s)
let bytes_of_sexp x = Streamutil.stream_to_bytes (stream_of_sexp x)
let assoc' key v =
match v with

@ -2,29 +2,30 @@ open Sexp
let rec sexp_of_json j =
match j with
| Json.Num f -> Hint {hint = "num"; body = Json.to_string j}
| Json.Str s -> Str s
| Json.Num f -> Hint {hint = Bytes.of_string "num"; body = Bytes.of_string (Json.to_string j)}
| Json.Str s -> str s
| Json.Arr js -> Arr (List.map sexp_of_json js)
| Json.Rec kvs -> Arr ((Hint {hint = "obj"; body = ""}) ::
(List.map (fun (k, v) -> Arr [Str k; sexp_of_json v]) kvs))
| Json.Flg f -> Hint {hint = "bool"; body = string_of_bool f}
| Json.Nil -> Hint {hint = "null"; body = ""}
| Json.Rec kvs -> Arr ((Hint {hint = Bytes.of_string "obj"; body = Bytes.empty}) ::
(List.map (fun (k, v) -> Arr [str k; sexp_of_json v]) kvs))
| Json.Flg f -> Hint {hint = (Bytes.of_string "bool"); body = (Bytes.of_string (string_of_bool f))}
| Json.Nil -> Hint {hint = (Bytes.of_string "null"); body = Bytes.empty}
let json_of_sexp x =
let rec walk x =
match x with
| Hint {hint = "num"; body = n} -> Json.Num (float_of_string n)
| Str s -> Json.Str s
| Arr ((Hint {hint = "obj"; body = ""}) :: kvs) ->
| Hint {hint = hint; body = n} when hint = Bytes.of_string "num" ->
Json.Num (float_of_string (Bytes.to_string n))
| Str s -> Json.Str (Bytes.to_string s)
| Arr ((Hint {hint = hint; body = bs}) :: kvs) when hint = Bytes.of_string "obj" && bs = Bytes.empty ->
Json.Rec (List.map
(fun kv ->
(match kv with
| Arr [Str k; v] -> (k, walk v)
| Arr [Str k; v] -> (Bytes.to_string k, walk v)
| _ -> raise (Syntax_error "Bad JSON-SEXP key-value")))
| Arr xs -> Json.Arr (List.map walk xs)
| Hint {hint = "bool"; body = bs} -> Json.Flg (bool_of_string bs)
| Hint {hint = "null"; body = ""} -> Json.Nil
| Hint {hint = h; body = b} -> Json.Rec ["_hint", Json.Str h; "_body", Json.Str b]
| Hint {hint = hint; body = bs} when hint = Bytes.of_string "bool" -> Json.Flg (bool_of_string (Bytes.to_string bs))
| Hint {hint = hint; body = bs} when hint = Bytes.of_string "null" && bs = Bytes.empty -> Json.Nil
| Hint {hint = h; body = b} -> Json.Rec ["_hint", Json.Str (Bytes.to_string h); "_body", Json.Str (Bytes.to_string b)]
Lwt.wrap1 walk x

@ -17,9 +17,9 @@
open Lwt
let stream_to_string s =
let stream_to_bytes s =
lwt pieces = Lwt_stream.to_list s in
return (String.concat "" pieces)
return (Bytes.concat Bytes.empty pieces)
let stream_generator f =
let mbox = Lwt_mvar.create_empty () in
@ -27,3 +27,5 @@ let stream_generator f =
ignore (lwt () = f yield in
Lwt_mvar.put mbox None);
Lwt_stream.from (fun () -> Lwt_mvar.take mbox)
let stream_encode s = Lwt_stream.map Bytes.of_string s

@ -32,29 +32,29 @@ type creation_t =
type set_t = {
mutable subscription_table: Uuid.t SexpMap.t;
mutable uuid_table: t StringMap.t
mutable uuid_table: t BytesMap.t
let new_set () = {
subscription_table = SexpMap.empty;
uuid_table = StringMap.empty
uuid_table = BytesMap.empty
let count subs = SexpMap.cardinal subs.subscription_table
let key_from sink_str name filter = Sexp.Arr [Sexp.Str sink_str; name; filter]
let key_from sink_bs name filter = Sexp.Arr [Sexp.Str sink_bs; name; filter]
let create source subs filter sink_str name reply_sink reply_name =
let key = key_from sink_str name filter in
let uuid = SexpMap.find key subs.subscription_table in
lwt () =
Node.post_ignore' reply_sink reply_name (Message.subscribe_ok (Sexp.Str uuid)) (Sexp.Str "")
Node.post_ignore' reply_sink reply_name (Message.subscribe_ok (Sexp.str uuid)) Sexp.emptystr
return (Old (StringMap.find uuid subs.uuid_table))
return (Old (BytesMap.find (Bytes.of_string uuid) subs.uuid_table))
with Not_found ->
let uuid = Uuid.create () in
let sink = Node.name_of_string sink_str in
let sink = Node.name_of_bytes sink_str in
let sub = {
live = true;
uuid = uuid;
@ -62,19 +62,19 @@ let create source subs filter sink_str name reply_sink reply_name =
sink = sink;
name = name
} in
subs.uuid_table <- StringMap.add uuid sub subs.uuid_table;
subs.uuid_table <- BytesMap.add (Bytes.of_string uuid) sub subs.uuid_table;
subs.subscription_table <- SexpMap.add key uuid subs.subscription_table;
lwt () = Lwt.join [
Meta.announce_subscription source filter sink_str name true;
Node.post_ignore' reply_sink reply_name (Message.subscribe_ok (Sexp.Str uuid)) (Sexp.Str "")
Node.post_ignore' reply_sink reply_name (Message.subscribe_ok (Sexp.str uuid)) Sexp.emptystr
] in
return (New sub)
let delete source subs uuid =
let sub = StringMap.find uuid subs.uuid_table in
let sub = BytesMap.find uuid subs.uuid_table in
sub.live <- false;
subs.uuid_table <- StringMap.remove uuid subs.uuid_table;
subs.uuid_table <- BytesMap.remove uuid subs.uuid_table;
let key = key_from sub.sink.Node.label sub.name sub.filter in
subs.subscription_table <- SexpMap.remove key subs.subscription_table;
lwt () = Meta.announce_subscription source sub.filter sub.sink.Node.label sub.name false in
@ -83,16 +83,16 @@ let delete source subs uuid =
return None
let lookup subs uuid =
try Some (StringMap.find uuid subs.uuid_table)
try Some (BytesMap.find uuid subs.uuid_table)
with Not_found -> None
let send_to_subscription' sub body delete_action =
if not sub.live
then return false
match_lwt Node.post sub.sink sub.name body (Sexp.Str sub.uuid) with
match_lwt Node.post sub.sink sub.name body (Sexp.str sub.uuid) with
| true -> return true
| false -> (lwt _ = delete_action sub.uuid in return false)
let send_to_subscription source subs sub body =
send_to_subscription' sub body (fun (uuid) -> delete source subs uuid)
send_to_subscription' sub body (fun (uuid) -> delete source subs (Bytes.of_string uuid))

@ -56,7 +56,7 @@ let handle_req id r =
let start (s, peername) =
let id = "http-" ^ Uuid.create () in
Util.create_thread (Connections.endpoint_name peername ^ " HTTP service")
Util.create_thread (Bytes.of_string (Connections.endpoint_name peername ^ " HTTP service"))
(Httpd.main (handle_req id))
(s, peername)
@ -67,7 +67,7 @@ let api_server_stats _ id r =
["connection_count", Json.Num (float_of_int !Connections.connection_count);
"boot_time", Json.Num boot_time;
"uptime", Json.Num (Unix.time () -. boot_time);
"classes", Json.Arr (List.map Json.str (Factory.all_class_names ()))])
"classes", Json.Arr (List.map Json.str (List.map Bytes.to_string (Factory.all_class_names ())))])
|> Httpd.add_date_header
let api_nodes _ id r =
@ -80,16 +80,17 @@ let api_nodes _ id r =
Json.resp_ok []
(fun (class_name, node_names) -> (class_name, Json.Arr (List.map Json.str node_names)))
(StringMap.bindings info)))
(fun (class_name, node_names) -> (Bytes.to_string class_name,
Json.Arr (List.map Json.str (List.map Bytes.to_string node_names))))
(BytesMap.bindings info)))
|> Httpd.add_date_header
let api_node_info suffix id r =
(match Node.lookup (Node.name_of_string suffix) with
(match Node.lookup (Node.name_of_bytes (Bytes.of_string suffix)) with
| Some n ->
Json.resp_ok [] (Json.Rec
["names", Json.Arr (List.map Json.str (StringSet.elements n.Node.names));
"class_name", Json.Str n.Node.class_name])
["names", Json.Arr (List.map Json.str (List.map Bytes.to_string (BytesSet.elements n.Node.names)));
"class_name", Json.Str (Bytes.to_string n.Node.class_name)])
| None ->
Json.resp 404 "No such node name" [] Json.Nil)
|> Httpd.add_date_header
@ -99,4 +100,4 @@ let init () =
register_dispatcher ("/_/nodes", api_nodes);
register_dispatcher ("/_/node/", api_node_info);
let port = Config.get_int "http.port" 5678 in
Util.create_daemon_thread "HTTP listener" None (Net.start_net "HTTP" port) start
Util.create_daemon_thread (Bytes.of_string "HTTP listener") None (Net.start_net "HTTP" port) start

@ -19,20 +19,20 @@ open Lwt
open Hof
open Datastructures
let all_sources = ref StringMap.empty
let all_sources = ref BytesMap.empty
let rec api_tap_source id r =
let mbox = Lwt_mvar.create
(Some (Message.subscribe (Sexp.Str (Node.local_container_name()),
Sexp.Str "", Sexp.Str "",
Sexp.Str "", Sexp.Str ""))) in
Sexp.emptystr, Sexp.emptystr,
Sexp.emptystr, Sexp.emptystr))) in
let handle_message n sexp = Lwt_mvar.put mbox (Some sexp) in
let n = Node.make "http_tap" handle_message in
all_sources := StringMap.add id n !all_sources;
let n = Node.make (Bytes.of_string "http_tap") handle_message in
all_sources := BytesMap.add id n !all_sources;
let shutdown () =
all_sources := StringMap.remove id !all_sources;
all_sources := BytesMap.remove id !all_sources;
lwt () = Node.unbind_all n in
Lwt_mvar.put mbox None
@ -40,10 +40,11 @@ let rec api_tap_source id r =
let generator yield =
let body_counter = ref 0 in
let yield_and_count s =
body_counter := String.length s + !body_counter;
yield s
let bs = Bytes.of_string s in
body_counter := Bytes.length bs + !body_counter;
yield bs
lwt () = yield_and_count (id ^ ";" ^ String.make 2048 'h' ^ ";") in
lwt () = yield_and_count ((Bytes.to_string id) ^ ";" ^ String.make 2048 'h' ^ ";") in
let rec drain_mbox () =
match_lwt Lwt_mvar.take mbox with
| None -> return ()
@ -74,20 +75,20 @@ let dispatch_message n m =
lwt () = Node.send_ignore' name body in
Httpd.resp_generic 202 "Accepted" [] (Httpd.empty_content)
| Message.Subscribe (Sexp.Str filter, sink, name, Sexp.Str reply_sink, Sexp.Str reply_name) ->
(match_lwt Node.bind (Node.name_of_string filter, n) with
(match_lwt Node.bind (Node.name_of_bytes filter, n) with
| true ->
lwt () = Node.post_ignore'
(Sexp.Str reply_name)
(Message.subscribe_ok (Sexp.Str filter))
(Sexp.Str "")
Httpd.resp_generic 204 "Bound" [] (Httpd.empty_content)
| false ->
lwt () = Log.warn "Bind failed" [Sexp.Str filter] in
Httpd.http_error_html 409 "Bind failed" [])
| Message.Unsubscribe (Sexp.Str token) ->
(match_lwt Node.unbind (Node.name_of_string token) with
(match_lwt Node.unbind (Node.name_of_bytes token) with
| true ->
Httpd.resp_generic 204 "Unbound" [] (Httpd.empty_content)
| false ->
@ -97,13 +98,13 @@ let dispatch_message n m =
Httpd.http_error_html 406 "Message not understood" []
let api_tap_sink irrelevant_id r =
lwt content = Httpd.string_of_content r.Httpd.req_body.Httpd.content in
lwt params = Httpd.parse_urlencoded content in
lwt content = Httpd.bytes_of_content r.Httpd.req_body.Httpd.content in
lwt params = Httpd.parse_urlencoded (Bytes.to_string content) in
match Httpd.find_param "metadata.type" params with
| Some (Some "send") ->
(match Httpd.find_param "metadata.id" params with
| Some (Some id) ->
(match (try Some (StringMap.find id !all_sources) with Not_found -> None) with
(match (try Some (BytesMap.find (Bytes.of_string id) !all_sources) with Not_found -> None) with
| Some n ->
(match Httpd.find_param "data" params with
| Some (Some data_str) ->
@ -123,8 +124,8 @@ let api_tap_sink irrelevant_id r =
let api_tap _ id r =
match r.Httpd.verb with
| "GET" -> api_tap_source id r
| "POST" -> api_tap_sink id r
| "GET" -> api_tap_source (Bytes.of_string id) r
| "POST" -> api_tap_sink (Bytes.of_string id) r
| _ -> Httpd.http_error_html 400 "Unsupported tap method" []
let init () =

@ -20,14 +20,14 @@ open Sexp
open Printf
let message_not_understood context m =
Log.warn "Message not understood" [Str context; Message.sexp_of_message m]
Log.warn "Message not understood" [str context; Message.sexp_of_message m]
let create_thread name cleanup main initarg =
let guarded_main initarg =
main initarg
with e ->
lwt () = Log.warn "Thread died with exception" [Str name; Str (Printexc.to_string e)] in
lwt () = Log.warn "Thread died with exception" [Str name; str (Printexc.to_string e)] in
(match cleanup with
| Some cleaner -> cleaner e
| None -> return ())
@ -38,7 +38,7 @@ let daemon_thread_died name nested_cleaner e =
lwt () = (match nested_cleaner with
| Some c -> c e
| None -> return ()) in
Server_control.shutdown_now [Sexp.Str "Daemon thread exited"; Sexp.Str name];
Server_control.shutdown_now [litstr "Daemon thread exited"; Str name];
return ()
let create_daemon_thread name cleanup main initarg =
@ -107,7 +107,7 @@ let stream_generator f =
let file_contents filename =
let ch = open_in filename in
let len = in_channel_length ch in
let buf = String.make len ' ' in
let buf = Bytes.make len ' ' in
really_input ch buf 0 len;
close_in ch;