From 474a8f1f745c4709cedb3dd111a0b5c98c3c37cf Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Mon, 1 Jun 2020 14:34:55 +0200 Subject: [PATCH] Quick-and-dirty port forward to ocaml 4.08.1 --- server/Makefile | 2 +- server/amqp_codegen.py | 10 +- server/amqp_relay.ml | 235 +++++++++++++++++++++----------------- server/amqp_wireformat.ml | 116 ++++++++++--------- server/app_info.ml | 14 +-- server/codegen.py | 6 +- server/config.ml | 5 +- server/connections.ml | 13 +-- server/datastructures.ml | 14 +-- server/directnode.ml | 32 +++--- server/factory.ml | 20 ++-- server/fanoutnode.ml | 14 +-- server/gpath.ml | 14 +-- server/hop_server.ml | 23 ++-- server/hopstr.ml | 9 +- server/httpd.ml | 43 ++++--- server/httpd_file.ml | 6 +- server/ibuffer.ml | 18 +-- server/json.ml | 24 ++-- server/log.ml | 6 +- server/meta.ml | 7 +- server/net.ml | 7 +- server/node.ml | 52 ++++----- server/obuffer.ml | 34 +++--- server/queuenode.ml | 8 +- server/relay.ml | 25 ++-- server/server_control.ml | 6 +- server/sexp.ml | 46 +++++--- server/sexpjson.ml | 27 ++--- server/streamutil.ml | 6 +- server/subscription.ml | 26 ++--- server/ui_main.ml | 17 +-- server/ui_relay.ml | 35 +++--- server/util.ml | 8 +- 34 files changed, 489 insertions(+), 439 deletions(-) diff --git a/server/Makefile b/server/Makefile index 0a38e2d..ab8c0e3 100644 --- a/server/Makefile +++ b/server/Makefile @@ -4,7 +4,7 @@ HTML=$(subst web/bootstrap/templates/,web/,$(subst .xml,.html,$(TEMPLATES))) # Static builds. So far I've only seen this work on Linux. OS X complains about missing -lcrt0.o. # OCAMLBUILD=ocamlbuild -classic-display -use-ocamlfind -X scratch -lflag -cclib -lflag -static -OCAMLBUILD=ocamlbuild -classic-display -use-ocamlfind -X scratch +OCAMLBUILD=ocamlbuild -tag thread -classic-display -use-ocamlfind -X scratch all: \ message.ml amqp_spec.ml \ diff --git a/server/amqp_codegen.py b/server/amqp_codegen.py index 29a9bef..75fecda 100644 --- a/server/amqp_codegen.py +++ b/server/amqp_codegen.py @@ -241,13 +241,13 @@ def print_codec(): for m in methods: print m.match_clause if m.accessible_fields: - print ' Arr [Str "%s"; Str "%s"' % (m.class_name, m.name) + print ' Arr [litstr "%s"; litstr "%s"' % (m.class_name, m.name) for f in m.accessible_fields: - print ' ; Arr [Str "%s"; sexp_of_%s(%s)]' % \ + print ' ; Arr [litstr "%s"; sexp_of_%s(%s)]' % \ (f.name, mlify(f.type), mlify(f.name)) print ' ]' else: - print ' Arr [Str "%s"; Str "%s"]' % (m.class_name, m.name) + print ' Arr [litstr "%s"; litstr "%s"]' % (m.class_name, m.name) print print 'let method_name class_index method_index = match (class_index, method_index) with' for m in methods: @@ -300,7 +300,7 @@ def print_codec(): print c.match_clause print ' let fields__ = [] in' for f in reversed(c.accessible_fields): - print ' let fields__ = (match %s with Some v -> Arr [Str "%s"; sexp_of_%s(v)] :: fields__ | None -> fields__) in' % \ + print ' let fields__ = (match %s with Some v -> Arr [litstr "%s"; sexp_of_%s(v)] :: fields__ | None -> fields__) in' % \ (mlify(f.name), f.name, mlify(f.type)) print ' Arr fields__' print @@ -339,7 +339,7 @@ def print_codec(): print ' | Arr ps ->' print ' List.iter (fun (p) -> match p with' for f in c.accessible_fields: - print ' | Arr [Str "%s"; v] -> %s := Some (%s_of_sexp v)' % \ + print ' | Arr [Str k; v] when k = Bytes.of_string "%s" -> %s := Some (%s_of_sexp v)' % \ (f.name, mlify(f.name), mlify(f.type)) print ' | _ -> ()) ps' print ' | _ -> ());' diff --git a/server/amqp_relay.ml b/server/amqp_relay.ml index d91c1f2..8d226d7 100644 --- a/server/amqp_relay.ml +++ b/server/amqp_relay.ml @@ -26,7 +26,7 @@ type connection_t = { cin: Lwt_io.input_channel; cout: Lwt_io.output_channel; name: Node.name; - mutable input_buf: string; + mutable input_buf: bytes; mutable output_buf: Obuffer.t; mutable frame_max: int; mutable connection_closed: bool; @@ -42,8 +42,8 @@ let amqp_boot (peername, cin, cout) = return { mtx = Lwt_mutex.create (); cin = cin; cout = cout; - name = Node.name_of_string (Uuid.create ()); - input_buf = String.create initial_frame_size; + name = Node.name_of_bytes (Bytes.of_string (Uuid.create ())); + input_buf = Bytes.create initial_frame_size; output_buf = Obuffer.create initial_frame_size; frame_max = initial_frame_size; connection_closed = false; @@ -105,7 +105,7 @@ let deserialize_header buf = return (body_size, props) let send_content_body conn channel body = - let len = String.length body in + let len = Bytes.length body in let rec send_remainder offset = if offset >= len then return () @@ -149,43 +149,43 @@ let send_method conn channel m = serialize_method conn.output_buf m; write_frame conn frame_method channel) -let send_content_method conn channel m p body_str = +let send_content_method conn channel m p body_bs = with_conn_mutex conn (fun () -> serialize_method conn.output_buf m; lwt () = write_frame conn frame_method 1 in - serialize_header conn.output_buf (String.length body_str) p; + serialize_header conn.output_buf (Bytes.length body_bs) p; lwt () = write_frame conn frame_header 1 in - send_content_body conn 1 body_str) + send_content_body conn 1 body_bs) let send_error conn code message = if conn.connection_closed then return () else (conn.connection_closed <- true; - let m = Connection_close (code, message, 0, 0) in + let m = Connection_close (code, Bytes.of_string message, 0, 0) in ignore (Log.warn "Sending error" [sexp_of_method m]); send_method conn 0 m) let send_warning conn code message = - let m = Channel_close (code, message, 0, 0) in + let m = Channel_close (code, Bytes.of_string message, 0, 0) in ignore (Log.warn "Sending warning" [sexp_of_method m]); send_method conn 1 m let issue_banner cin cout = - let handshake = String.create 8 in + let handshake = Bytes.create 8 in try lwt () = Lwt_io.read_into_exactly cin handshake 0 8 in - if String.sub handshake 0 4 <> "AMQP" - then (lwt () = Lwt_io.write cout "AMQP\000\000\009\001" in return false) + if Bytes.sub handshake 0 4 <> (Bytes.of_string "AMQP") + then (lwt () = Lwt_io.write_from_exactly cout (Bytes.of_string "AMQP\000\000\009\001") 0 8 in return false) else (ignore (Log.info "AMQP handshake bytes" - [Sexp.Str (string_of_int (int_of_char (String.get handshake 4))); - Sexp.Str (string_of_int (int_of_char (String.get handshake 5))); - Sexp.Str (string_of_int (int_of_char (String.get handshake 6))); - Sexp.Str (string_of_int (int_of_char (String.get handshake 7)))]); + [Sexp.str (string_of_int (int_of_char (Bytes.get handshake 4))); + Sexp.str (string_of_int (int_of_char (Bytes.get handshake 5))); + Sexp.str (string_of_int (int_of_char (Bytes.get handshake 6))); + Sexp.str (string_of_int (int_of_char (Bytes.get handshake 7)))]); return true) with End_of_file -> return false -let reference_to_logs = "See server logs for details" +let reference_to_logs = (Bytes.of_string "See server logs for details") let extract_str v = match v with | Sexp.Str s -> s @@ -197,14 +197,14 @@ let reply_to_declaration conn status ok_fn = send_method conn 1 (ok_fn info) | Message.Create_failed reason -> (match reason with - | Sexp.Arr [Sexp.Str "factory"; Sexp.Str "class-not-found"] -> + | Sexp.Arr [Sexp.Str who; Sexp.Str code] when who = (Bytes.of_string "factory") && code = (Bytes.of_string "class-not-found")-> send_error conn command_invalid "Object type not supported by server" - | Sexp.Arr [Sexp.Str "constructor"; Sexp.Str "class-mismatch"] -> + | Sexp.Arr [Sexp.Str who; Sexp.Str code] when who = (Bytes.of_string "constructor") && code = (Bytes.of_string "class-mismatch") -> send_error conn not_allowed "Redeclaration with different object type not permitted" | Sexp.Arr [Sexp.Str who; explanation] -> - send_warning conn precondition_failed (who^" failed: "^(extract_str explanation)) + send_warning conn precondition_failed ((Bytes.to_string who)^" failed: "^(Bytes.to_string (extract_str explanation))) | _ -> - send_warning conn precondition_failed reference_to_logs) + send_warning conn precondition_failed (Bytes.to_string reference_to_logs)) | _ -> die internal_error "Declare reply malformed" let make_queue_declare_ok info = @@ -214,39 +214,47 @@ let make_queue_declare_ok info = let send_delivery conn consumer_tag body_sexp = match body_sexp with - | Sexp.Arr [Sexp.Hint {Sexp.hint = "amqp"; Sexp.body = ""}; + | Sexp.Arr [Sexp.Hint {Sexp.hint = maybe_amqp; Sexp.body = h_body_bs}; Sexp.Str exchange; Sexp.Str routing_key; properties_sexp; - Sexp.Str body_str] -> + Sexp.Str body_bs] when maybe_amqp = (Bytes.of_string "amqp") && h_body_bs = Bytes.empty -> lwt tag = with_conn_mutex conn (fun () -> let v = conn.delivery_tag in conn.delivery_tag <- v + 1; return v) in send_content_method conn 1 (Basic_deliver (consumer_tag, Int64.of_int tag, false, exchange, routing_key)) (properties_of_sexp basic_class_id properties_sexp) - body_str + body_bs | _ -> die internal_error "Malformed AMQP message body sexp" let amqp_handler conn n m_sexp = try (match Message.message_of_sexp m_sexp with - | Message.Post (Sexp.Str "Exchange_declare_reply", status, _) -> - reply_to_declaration conn status (fun (_) -> Exchange_declare_ok) - | Message.Post (Sexp.Str "Queue_declare_reply", status, _) -> - reply_to_declaration conn status make_queue_declare_ok - | Message.Post (Sexp.Str "Queue_bind_reply", status, _) -> - (match Message.message_of_sexp status with - | Message.Subscribe_ok _ -> send_method conn 1 Queue_bind_ok - | _ -> die internal_error "Queue bind reply malformed") - | Message.Post (Sexp.Arr [Sexp.Str "Basic_consume_reply"; Sexp.Str consumer_tag], status, _) -> - (match Message.message_of_sexp status with - | Message.Subscribe_ok _ -> send_method conn 1 (Basic_consume_ok consumer_tag) - | _ -> die internal_error "Basic consume reply malformed") - | Message.Post (Sexp.Arr [Sexp.Str "delivery"; Sexp.Str consumer_tag], body, _) -> - send_delivery conn consumer_tag body - | _ -> - Log.warn "AMQP outbound relay ignoring message" [m_sexp]) + | Message.Post (Sexp.Str type_bs, status, _) -> + (match Bytes.to_string type_bs with + | "Exchange_declare_reply" -> + reply_to_declaration conn status (fun (_) -> Exchange_declare_ok) + | "Queue_declare_reply" -> + reply_to_declaration conn status make_queue_declare_ok + | "Queue_bind_reply" -> + (match Message.message_of_sexp status with + | Message.Subscribe_ok _ -> send_method conn 1 Queue_bind_ok + | _ -> die internal_error "Queue bind reply malformed") + | _ -> + Log.warn "AMQP outbound relay ignoring message" [m_sexp]) + | Message.Post (Sexp.Arr [Sexp.Str type_bs; Sexp.Str consumer_tag], status_or_body, _) -> + (match Bytes.to_string type_bs with + | "Basic_consume_reply" -> + (match Message.message_of_sexp status_or_body with + | Message.Subscribe_ok _ -> send_method conn 1 (Basic_consume_ok consumer_tag) + | _ -> die internal_error "Basic consume reply malformed") + | "delivery" -> + send_delivery conn consumer_tag status_or_body + | _ -> + Log.warn "AMQP outbound relay ignoring message" [m_sexp]) + | _ -> + Log.warn "AMQP outbound relay ignoring message" [m_sexp]) with | Amqp_exception (code, message) -> send_error conn code message @@ -260,29 +268,26 @@ let get_recent_queue_name conn = | None -> die syntax_error "Attempt to use nonexistent most-recently-declared-queue name" let expand_mrdq conn queue = - match queue with - | "" -> get_recent_queue_name conn - | other -> Node.name_of_string other + if queue = Bytes.empty then get_recent_queue_name conn + else Node.name_of_bytes queue let handle_method conn channel m = (* ignore (Log.info "method" [sexp_of_method m]); *) if channel > 1 then die channel_error "Unsupported channel number" else (); match m with | Connection_close (code, text, _, _) -> - ignore (Log.info "Client closed AMQP connection" - [Sexp.Str (string_of_int code); Sexp.Str text]); + ignore (Log.info "Client closed AMQP connection" [Sexp.str (string_of_int code); Sexp.Str text]); lwt () = send_method conn channel Connection_close_ok in return (conn.connection_closed <- true) | Channel_open -> conn.delivery_tag <- 1; send_method conn channel Channel_open_ok | Channel_close (code, text, _, _) -> - ignore (Log.info "Client closed AMQP channel" - [Sexp.Str (string_of_int code); Sexp.Str text]); + ignore (Log.info "Client closed AMQP channel" [Sexp.str (string_of_int code); Sexp.Str text]); send_method conn channel Channel_close_ok; | Channel_close_ok -> return () - | Exchange_declare ("", type_, passive, durable, no_wait, arguments) -> + | Exchange_declare (maybe_empty, type_, passive, durable, no_wait, arguments) when maybe_empty = Bytes.empty -> (* Qpid does this bizarre thing of declaring the default exchange. *) if no_wait then return () @@ -290,21 +295,25 @@ let handle_method conn channel m = | Exchange_declare (exchange, type_, passive, durable, no_wait, arguments) -> let (reply_sink, reply_name) = if no_wait - then ("", "") - else (conn.name.Node.label, "Exchange_declare_reply") + then (Bytes.empty, Bytes.empty) + else (conn.name.Node.label, Bytes.of_string "Exchange_declare_reply") in - Node.send_ignore' "factory" (Message.create (Sexp.Str type_, - Sexp.Arr [Sexp.Str exchange], - Sexp.Str reply_sink, - Sexp.Str reply_name)) + Node.send_ignore' + (Bytes.of_string "factory") + (Message.create (Sexp.Str type_, + Sexp.Arr [Sexp.Str exchange], + Sexp.Str reply_sink, + Sexp.Str reply_name)) | Queue_declare (queue, passive, durable, exclusive, auto_delete, no_wait, arguments) -> - let queue = (if queue = "" then Uuid.create () else queue) in - conn.recent_queue_name <- Some (Node.name_of_string queue); - Node.send_ignore' "factory" (Message.create (Sexp.Str "queue", - Sexp.Arr [Sexp.Str queue], - Sexp.Str conn.name.Node.label, - Sexp.Str "Queue_declare_reply")) - | Queue_bind (queue, "", routing_key, no_wait, arguments) -> + let queue = (if queue = Bytes.empty then Bytes.of_string (Uuid.create ()) else queue) in + conn.recent_queue_name <- Some (Node.name_of_bytes queue); + Node.send_ignore' + (Bytes.of_string "factory") + (Message.create (Sexp.litstr "queue", + Sexp.Arr [Sexp.Str queue], + Sexp.Str conn.name.Node.label, + Sexp.litstr "Queue_declare_reply")) + | Queue_bind (queue, maybe_empty, routing_key, no_wait, arguments) when maybe_empty = Bytes.empty -> (* Qpid does this bizarre thing of binding to the default exchange. *) if no_wait then return () @@ -312,54 +321,54 @@ let handle_method conn channel m = | Queue_bind (queue, exchange, routing_key, no_wait, arguments) -> let queue = expand_mrdq conn queue in if not (Node.approx_exists queue) - then send_warning conn not_found ("Queue '"^queue.Node.label^"' not found") + then send_warning conn not_found ("Queue '"^(Bytes.to_string queue.Node.label)^"' not found") else (match_lwt Node.send' exchange (Message.subscribe (Sexp.Str routing_key, Sexp.Str queue.Node.label, - Sexp.Str "", + Sexp.emptystr, Sexp.Str conn.name.Node.label, - Sexp.Str "Queue_bind_reply")) with + Sexp.litstr "Queue_bind_reply")) with | true -> return () - | false -> send_warning conn not_found ("Exchange '"^exchange^"' not found")) + | false -> send_warning conn not_found ("Exchange '"^(Bytes.to_string exchange)^"' not found")) | Basic_consume (queue, consumer_tag, no_local, no_ack, exclusive, no_wait, arguments) -> let queue = expand_mrdq conn queue in - let consumer_tag = (if consumer_tag = "" then Uuid.create () else consumer_tag) in + let consumer_tag = (if consumer_tag = Bytes.empty then (Bytes.of_string (Uuid.create ())) else consumer_tag) in (match_lwt Node.send queue (Message.subscribe - (Sexp.Str "", + (Sexp.emptystr, Sexp.Str conn.name.Node.label, - Sexp.Arr [Sexp.Str "delivery"; Sexp.Str consumer_tag], + Sexp.Arr [Sexp.litstr "delivery"; Sexp.Str consumer_tag], Sexp.Str conn.name.Node.label, - Sexp.Arr [Sexp.Str "Basic_consume_reply"; + Sexp.Arr [Sexp.litstr "Basic_consume_reply"; Sexp.Str consumer_tag])) with | true -> return () - | false -> send_warning conn not_found ("Queue '"^queue.Node.label^"' not found")) + | false -> send_warning conn not_found ("Queue '"^(Bytes.to_string queue.Node.label)^"' not found")) | Basic_publish (exchange, routing_key, false, false) -> lwt (_, (body_size, properties)) = next_header conn in lwt body = recv_content_body conn body_size in let (pseudotype, sink, name) = - if exchange = "" - then ("Queue", routing_key, "") + if exchange = Bytes.empty + then ("Queue", routing_key, Bytes.empty) else ("Exchange", exchange, routing_key) in (match_lwt Node.post' sink (Sexp.Str name) - (Sexp.Arr [Sexp.Hint {Sexp.hint = "amqp"; Sexp.body = ""}; + (Sexp.Arr [Sexp.Hint {Sexp.hint = (Bytes.of_string "amqp"); Sexp.body = Bytes.empty}; Sexp.Str exchange; Sexp.Str routing_key; sexp_of_properties properties; Sexp.Str body]) - (Sexp.Str "") + Sexp.emptystr with | true -> return () - | false -> send_warning conn not_found (pseudotype^" '"^sink^"' not found")) + | false -> send_warning conn not_found (pseudotype^" '"^(Bytes.to_string sink)^"' not found")) | Basic_ack (delivery_tag, multiple) -> return () | Basic_qos (_, _, _) -> ignore (Log.warn "Ignoring Basic_qos instruction from client" []); send_method conn channel Basic_qos_ok | Channel_flow (on) -> - ignore (Log.warn "Ignoring Channel_flow setting" [Sexp.Str (string_of_bool on)]); + ignore (Log.warn "Ignoring Channel_flow setting" [Sexp.str (string_of_bool on)]); send_method conn channel (Channel_flow_ok on) | _ -> let (cid, mid) = method_index m in @@ -367,37 +376,53 @@ let handle_method conn channel m = (method_name cid mid)) let server_properties = table_of_list [ - ("product", Table_string App_info.product); - ("version", Table_string App_info.version); - ("copyright", Table_string App_info.copyright); - ("licence", Table_string App_info.licence_blurb); - ("capabilities", Table_table (table_of_list [])); + ((Bytes.of_string "product"), Table_string App_info.product); + ((Bytes.of_string "version"), Table_string App_info.version); + ((Bytes.of_string "copyright"), Table_string App_info.copyright); + ((Bytes.of_string "licence"), Table_string App_info.licence_blurb); + ((Bytes.of_string "capabilities"), Table_table (table_of_list [])); ] let check_login_details mechanism response = - match mechanism with - | "PLAIN" -> - (match (Str.split (Str.regexp "\000") response) with - | ["guest"; "guest"] -> () - | _ -> die access_refused "Access refused") - | "AMQPLAIN" -> - (let fields = decode_named_fields (Ibuffer.of_string response) in - match (field_lookup_some "LOGIN" fields, field_lookup_some "PASSWORD" fields) with - | (Some (Table_string "guest"), Some (Table_string "guest")) -> () - | _ -> die access_refused "Access refused") - | _ -> die access_refused "Bad auth mechanism" + match Bytes.to_string mechanism with + | "PLAIN" -> + if (match Bytes.index_opt response '\000' with + | Some pos -> + let user = Bytes.sub response 0 pos in + let pass = Bytes.sub response (pos + 1) ((Bytes.length response) - (pos + 1)) in + (Bytes.to_string user) = "guest" && (Bytes.to_string pass) = "guest" + | None -> false) + then () + else die access_refused "Access refused" + | "AMQPLAIN" -> + (let fields = decode_named_fields (Ibuffer.of_bytes response) in + if + (match (field_lookup_some (Bytes.of_string "LOGIN") fields, + field_lookup_some (Bytes.of_string "PASSWORD") fields) with + | (Some (Table_string user), Some (Table_string pass)) + when user = Bytes.of_string "guest" && pass = Bytes.of_string "guest" -> true + | _ -> false) + then + () + else + die access_refused "Access refused") + | _ -> + die access_refused "Bad auth mechanism" let tune_connection conn frame_max = with_conn_mutex conn (fun () -> - conn.input_buf <- String.create frame_max; + conn.input_buf <- Bytes.create frame_max; conn.output_buf <- Obuffer.create frame_max; conn.frame_max <- frame_max; return ()) let handshake_and_tune conn = let (major_version, minor_version, revision) = version in - lwt () = send_method conn 0 (Connection_start (major_version, minor_version, server_properties, - "PLAIN AMQPLAIN", "en_US")) in + lwt () = send_method conn 0 (Connection_start (major_version, + minor_version, + server_properties, + (Bytes.of_string "PLAIN AMQPLAIN"), + (Bytes.of_string "en_US"))) in lwt (client_properties, mechanism, response, locale) = match_lwt next_method conn with | (0, Connection_start_ok props) -> return props @@ -443,11 +468,17 @@ let start (s, peername) = amqp_boot amqp_handler amqp_mainloop (s, peername) let init () = - lwt () = Node.send_ignore' "factory" (Message.create (Sexp.Str "direct", - Sexp.Arr [Sexp.Str "amq.direct"], - Sexp.Str "", Sexp.Str "")) in - lwt () = Node.send_ignore' "factory" (Message.create (Sexp.Str "fanout", - Sexp.Arr [Sexp.Str "amq.fanout"], - Sexp.Str "", Sexp.Str "")) in + lwt () = Node.send_ignore' + (Bytes.of_string "factory") + (Message.create (Sexp.litstr "direct", + Sexp.Arr [Sexp.litstr "amq.direct"], + Sexp.emptystr, + Sexp.emptystr)) in + lwt () = Node.send_ignore' + (Bytes.of_string "factory") + (Message.create (Sexp.litstr "fanout", + Sexp.Arr [Sexp.litstr "amq.fanout"], + Sexp.emptystr, + Sexp.emptystr)) in let port = Config.get_int "amqp.port" Amqp_spec.port in - Util.create_daemon_thread "AMQP listener" None (Net.start_net "AMQP" port) start + Util.create_daemon_thread (Bytes.of_string "AMQP listener") None (Net.start_net "AMQP" port) start diff --git a/server/amqp_wireformat.ml b/server/amqp_wireformat.ml index 0deb634..8a1a704 100644 --- a/server/amqp_wireformat.ml +++ b/server/amqp_wireformat.ml @@ -25,16 +25,16 @@ type octet_t = int type short_t = int type long_t = int32 type longlong_t = int64 -type shortstr_t = string -type longstr_t = string +type shortstr_t = bytes +type longstr_t = bytes type bit_t = bool type timestamp_t = int64 type table_t = { mutable table_body: table_body_t } and table_body_t = - | Encoded_table of string - | Decoded_table of (string * table_value_t) list - | Both_table of (string * (string * table_value_t) list) + | Encoded_table of bytes + | Decoded_table of (bytes * table_value_t) list + | Both_table of (bytes * (bytes * table_value_t) list) and table_value_t = | Table_bool of bool (* t *) | Table_signed_byte of int (* b *) @@ -45,11 +45,11 @@ and table_value_t = | Table_unsigned_long of int32 (* i *) | Table_signed_longlong of int64 (* L *) | Table_unsigned_longlong of int64 (* l *) - | Table_float of string (* f -- there seems to be no I/O for binary floats? *) - | Table_double of string (* d -- there seems to be no I/O for binary floats? *) + | Table_float of bytes (* f -- there seems to be no I/O for binary floats? *) + | Table_double of bytes (* d -- there seems to be no I/O for binary floats? *) | Table_decimal of (int * int32) (* D *) - | Table_short_string of string (* s *) - | Table_string of string (* S *) + | Table_short_string of bytes (* s *) + | Table_string of bytes (* S *) | Table_array of table_value_t list (* A *) | Table_timestamp of int64 (* T *) | Table_table of table_t (* F *) @@ -131,7 +131,7 @@ and read_table_value input_buf = and decoded_table t = match t.table_body with | Encoded_table s -> - let fs = decode_named_fields (Ibuffer.create s 0 (String.length s)) in + let fs = decode_named_fields (Ibuffer.create s 0 (Bytes.length s)) in t.table_body <- Both_table (s, fs); fs | Decoded_table fs -> fs @@ -157,11 +157,11 @@ let write_longlong output_buf x = write_octet output_buf ((Int64.to_int (Int64.shift_right_logical x 8)) land 255); write_octet output_buf ((Int64.to_int x) land 255) let write_shortstr output_buf x = - let len = String.length x in + let len = Bytes.length x in write_octet output_buf len; Obuffer.add_string output_buf x let write_longstr output_buf x = - write_long output_buf (Int32.of_int (String.length x)); + write_long output_buf (Int32.of_int (Bytes.length x)); Obuffer.add_string output_buf x let write_timestamp output_buf x = write_longlong output_buf x @@ -221,17 +221,17 @@ and encoded_table t = and write_table output_buf x = write_longstr output_buf (encoded_table x) -let sexp_of_octet x = Str (string_of_int x) -let sexp_of_short x = Str (string_of_int x) -let sexp_of_long x = Str (Int32.to_string x) -let sexp_of_longlong x = Str (Int64.to_string x) +let sexp_of_octet x = Str (Sexp.bytes_of_int x) +let sexp_of_short x = Str (Sexp.bytes_of_int x) +let sexp_of_long x = Str (Bytes.of_string (Int32.to_string x)) +let sexp_of_longlong x = Str (Bytes.of_string (Int64.to_string x)) let sexp_of_shortstr x = Str x let sexp_of_longstr x = Str x -let sexp_of_bit x = if x then Str "1" else Str "" -let sexp_of_timestamp x = Str (Int64.to_string x) +let sexp_of_bit x = if x then Sexp.litstr "1" else Sexp.emptystr +let sexp_of_timestamp x = Str (Bytes.of_string (Int64.to_string x)) let rec - sexp_of_table x = Arr ((Hint {hint = "table"; body = ""}) :: + sexp_of_table x = Arr ((Hint {hint = Bytes.of_string "table"; body = Bytes.empty}) :: (List.map sexp_of_named_field (decoded_table x))) and sexp_of_named_field (s, f) = let (t, v) = tag_val f in @@ -240,10 +240,10 @@ and sexp_of_unnamed_field f = let (t, v) = tag_val f in Arr [t; v] and tag_val f = - let h hs v = (Str hs, v) in + let h hs v = (Sexp.litstr hs, v) in match f with - | Table_bool true -> h "t" (Str "1") - | Table_bool false -> h "t" (Str "") + | Table_bool true -> h "t" (Sexp.litstr "1") + | Table_bool false -> h "t" Sexp.emptystr | Table_signed_byte v -> h "b" (sexp_of_octet (signed_to_unsigned v 256)) | Table_unsigned_byte v -> h "B" (sexp_of_octet v) | Table_signed_short v -> h "U" (sexp_of_short (signed_to_unsigned v 65536)) @@ -254,8 +254,8 @@ and tag_val f = | Table_unsigned_longlong v -> h "l" (sexp_of_longlong v) | Table_float v -> h "f" (Str v) | Table_double v -> h "d" (Str v) - | Table_decimal (scale, v) -> h "D" (Arr [Arr [Str "scale"; sexp_of_octet scale]; - Arr [Str "value"; sexp_of_long v]]) + | Table_decimal (scale, v) -> h "D" (Arr [Arr [Sexp.litstr "scale"; sexp_of_octet scale]; + Arr [Sexp.litstr "value"; sexp_of_long v]]) | Table_short_string v -> h "s" (Str v) | Table_string v -> h "S" (Str v) | Table_array vs -> h "A" (Arr (List.map sexp_of_unnamed_field vs)) @@ -269,58 +269,60 @@ let reserved_value_octet = 0 let reserved_value_short = 0 let reserved_value_long = Int32.zero let reserved_value_longlong = Int64.zero -let reserved_value_shortstr = "" -let reserved_value_longstr = "" +let reserved_value_shortstr = Bytes.empty +let reserved_value_longstr = Bytes.empty let reserved_value_bit = false let reserved_value_timestamp = Int64.zero -let reserved_value_table = { table_body = Encoded_table "" } +let reserved_value_table = { table_body = Encoded_table Bytes.empty } -let octet_of_sexp v = match v with Str x -> int_of_string x | _ -> reserved_value_octet -let short_of_sexp v = match v with Str x -> int_of_string x | _ -> reserved_value_short -let long_of_sexp v = match v with Str x -> Int32.of_string x | _ -> reserved_value_long -let longlong_of_sexp v = match v with Str x -> Int64.of_string x | _ -> reserved_value_longlong +let octet_of_sexp v = match v with Str x -> Sexp.int_of_bytes x | _ -> reserved_value_octet +let short_of_sexp v = match v with Str x -> Sexp.int_of_bytes x | _ -> reserved_value_short +let long_of_sexp v = match v with Str x -> Int32.of_string (Bytes.to_string x) | _ -> reserved_value_long +let longlong_of_sexp v = match v with Str x -> Int64.of_string (Bytes.to_string x) | _ -> reserved_value_longlong let shortstr_of_sexp v = match v with Str x -> x | _ -> reserved_value_shortstr let longstr_of_sexp v = match v with Str x -> x | _ -> reserved_value_longstr -let bit_of_sexp v = match v with Str x -> x <> "" | _ -> reserved_value_bit -let timestamp_of_sexp v = match v with Str x -> Int64.of_string x | _ -> reserved_value_timestamp +let bit_of_sexp v = match v with Str x -> x <> Bytes.empty | _ -> reserved_value_bit +let timestamp_of_sexp v = match v with Str x -> Int64.of_string (Bytes.to_string x) | _ -> reserved_value_timestamp let rec table_of_sexp v = match v with - | Arr ((Hint {hint = "table"; body = ""}) :: field_sexps) -> + | Arr ((Hint {hint = hint; body = body}) :: field_sexps) + when hint = Bytes.of_string "table" && body = Bytes.empty -> table_of_list (List.map named_sexp_field field_sexps) | _ -> table_of_list [] and named_sexp_field v = match v with | Arr [Str s; Str t; f] -> (s, untag_val (t, f)) - | _ -> ("", Table_void) + | _ -> (Bytes.empty, Table_void) and field_of_sexp v = match v with | Arr [Str t; f] -> untag_val (t, f) | _ -> Table_void and untag_val (t, v) = - match (t, v) with - | ("t", Str x) -> Table_bool (x <> "") - | ("b", v) -> Table_signed_byte (unsigned_to_signed (octet_of_sexp v) 256) - | ("B", v) -> Table_unsigned_byte (octet_of_sexp v) - | ("U", v) -> Table_signed_short (unsigned_to_signed (short_of_sexp v) 65536) - | ("u", v) -> Table_unsigned_short (short_of_sexp v) - | ("I", v) -> Table_signed_long (long_of_sexp v) - | ("i", v) -> Table_unsigned_long (long_of_sexp v) - | ("L", v) -> Table_signed_longlong (longlong_of_sexp v) - | ("l", v) -> Table_unsigned_longlong (longlong_of_sexp v) - | ("f", Str v) -> Table_float v - | ("d", Str v) -> Table_double v - | ("D", Arr [Arr [Str "scale"; scale]; - Arr [Str "value"; v]]) -> - Table_decimal (octet_of_sexp scale, long_of_sexp v) - | ("s", Str v) -> Table_short_string v - | ("S", Str v) -> Table_string v - | ("A", Arr vs) -> Table_array (List.map field_of_sexp vs) - | ("T", v) -> Table_timestamp (longlong_of_sexp v) - | ("F", v) -> Table_table (table_of_sexp v) - | ("V", Arr []) -> Table_void - | _ -> Table_void + if Bytes.length t <> 1 then Table_void else + match (Bytes.get t 0, v) with + | ('t', Str x) -> Table_bool (x <> Bytes.empty) + | ('b', v) -> Table_signed_byte (unsigned_to_signed (octet_of_sexp v) 256) + | ('B', v) -> Table_unsigned_byte (octet_of_sexp v) + | ('U', v) -> Table_signed_short (unsigned_to_signed (short_of_sexp v) 65536) + | ('u', v) -> Table_unsigned_short (short_of_sexp v) + | ('I', v) -> Table_signed_long (long_of_sexp v) + | ('i', v) -> Table_unsigned_long (long_of_sexp v) + | ('L', v) -> Table_signed_longlong (longlong_of_sexp v) + | ('l', v) -> Table_unsigned_longlong (longlong_of_sexp v) + | ('f', Str v) -> Table_float v + | ('d', Str v) -> Table_double v + | ('D', Arr [Arr [Str maybe_scale; scale]; Arr [Str maybe_value; v]]) + when maybe_scale = Bytes.of_string "scale" && maybe_value = Bytes.of_string "value" -> + Table_decimal (octet_of_sexp scale, long_of_sexp v) + | ('s', Str v) -> Table_short_string v + | ('S', Str v) -> Table_string v + | ('A', Arr vs) -> Table_array (List.map field_of_sexp vs) + | ('T', v) -> Table_timestamp (longlong_of_sexp v) + | ('F', v) -> Table_table (table_of_sexp v) + | ('V', Arr []) -> Table_void + | _ -> Table_void let field_lookup k def fs = try List.assoc k fs diff --git a/server/app_info.ml b/server/app_info.ml index 710f9ad..049c1d2 100644 --- a/server/app_info.ml +++ b/server/app_info.ml @@ -15,11 +15,11 @@ (* You should have received a copy of the GNU General Public License *) (* along with Hop. If not, see . *) -let product = "hop" -let version = "ALPHA" -let copyright = "Copyright (C) 2012 Tony Garnock-Jones." -let homepage = "the GNU General Public License (version 3 or later)" (* TODO: real homepage *) +let product = (Bytes.of_string "hop") +let version = (Bytes.of_string "ALPHA") +let copyright = (Bytes.of_string "Copyright (C) 2012 Tony Garnock-Jones.") +let homepage = (Bytes.of_string "the GNU General Public License (version 3 or later)") (* TODO: real homepage *) let licence_blurb = - "This program comes with ABSOLUTELY NO WARRANTY. This is free software,\n"^ - "and you are welcome to redistribute it under certain conditions.\n"^ - "See "^homepage^" for details." + (Bytes.of_string ("This program comes with ABSOLUTELY NO WARRANTY. This is free software,\n"^ + "and you are welcome to redistribute it under certain conditions.\n"^ + "See "^(Bytes.to_string homepage)^" for details.")) diff --git a/server/codegen.py b/server/codegen.py index 4b9d1c1..fea16a4 100644 --- a/server/codegen.py +++ b/server/codegen.py @@ -81,7 +81,7 @@ def print_codec(): if t.argnames: print_list(' (', [n for n in t.argnames], ', ', ')') print ' ->' - sys.stdout.write(' Arr [Str "%s"' % t.wire_selector) + sys.stdout.write(' Arr [Sexp.litstr "%s"' % t.wire_selector) if t.argnames: print_list('; ', t.argnames, '; ', '') print ']' @@ -89,10 +89,10 @@ def print_codec(): print print 'let message_of_sexp s = match s with' for t in spec: - sys.stdout.write(' | Arr [Str "%s"' % t.wire_selector) + sys.stdout.write(' | Arr [Str label_bs') if t.argnames: print_list('; ', t.argnames, '; ', '') - print '] ->' + print ('] when label_bs = Bytes.of_string "%s" ->' % t.wire_selector) sys.stdout.write(' %s' % t.constructor) if t.argnames: print_list(' (', [n for n in t.argnames], ', ', ')') diff --git a/server/config.ml b/server/config.ml index c0bf8ed..755d57d 100644 --- a/server/config.ml +++ b/server/config.ml @@ -64,15 +64,14 @@ let init () = then loop (index + 1) (String.sub opt 2 (String.length opt - 2)) else let v = (try Json.of_string opt with _ -> Json.Str opt) in - ignore (Log.info "Setting command-line parameter" - [Sexp.Str current_key; Sexpjson.sexp_of_json v]); + ignore (Log.info "Setting command-line parameter" [Sexp.str current_key; Sexpjson.sexp_of_json v]); set current_key v; loop (index + 1) current_key) in loop 1 ""; (match get "config-file" with | Some (Json.Str config_filename) -> - ignore (Log.info "Reading configuration file" [Sexp.Str config_filename]); + ignore (Log.info "Reading configuration file" [Sexp.str config_filename]); let file_config = try Json.load config_filename with Sys_error _ -> Json.Rec [] in config := Json.merge_right file_config !config | _ -> diff --git a/server/connections.ml b/server/connections.ml index 057fecb..df2a6bc 100644 --- a/server/connections.ml +++ b/server/connections.ml @@ -28,22 +28,21 @@ let endpoint_name n = | _ -> "??unknown??" let connection_main class_name peername cin cout issue_banner boot_fn node_fn mainloop = - ignore (Log.info ("Accepted "^class_name) [Str (endpoint_name peername)]); + ignore (Log.info ("Accepted "^class_name) [str (endpoint_name peername)]); match_lwt issue_banner cin cout with | true -> lwt shared_state = boot_fn (peername, cin, cout) in - let n = Node.make class_name (node_fn shared_state) in + let n = Node.make (Bytes.of_string class_name) (node_fn shared_state) in lwt () = (try_lwt mainloop shared_state n with | End_of_file -> - Log.info ("Disconnecting "^class_name^" normally") [Str (endpoint_name peername)] + Log.info ("Disconnecting "^class_name^" normally") [str (endpoint_name peername)] | Sys_error message -> - Log.warn ("Disconnected "^class_name^" by Sys_error") - [Str (endpoint_name peername); Str message] + Log.warn ("Disconnected "^class_name^" by Sys_error") [str (endpoint_name peername); str message] | exn -> - Log.error ("Uncaught exception in "^class_name) [Str (Printexc.to_string exn)]) + Log.error ("Uncaught exception in "^class_name) [str (Printexc.to_string exn)]) in Node.unbind_all n | false -> @@ -60,7 +59,7 @@ let start_connection' class_name issue_banner boot_fn node_fn mainloop (s, peern let start_connection class_name issue_banner boot_fn node_fn mainloop (s, peername) = Util.create_thread - (endpoint_name peername ^ " input") + (Bytes.of_string (endpoint_name peername ^ " input")) None (start_connection' class_name issue_banner boot_fn node_fn mainloop) (s, peername) diff --git a/server/datastructures.ml b/server/datastructures.ml index 093c136..2ce1ce2 100644 --- a/server/datastructures.ml +++ b/server/datastructures.ml @@ -15,12 +15,12 @@ (* You should have received a copy of the GNU General Public License *) (* along with Hop. If not, see . *) -module StringSet = Set.Make(String) -module StringMap = Map.Make(String) +module BytesSet = Set.Make(Bytes) +module BytesMap = Map.Make(Bytes) module SexpMap = Map.Make(Sexp) -module UuidSet = StringSet +module UuidSet = BytesSet -let string_map_keys m = StringMap.fold (fun k _ acc -> k :: acc) m [] +let bytes_map_keys m = BytesMap.fold (fun k _ acc -> k :: acc) m [] let classify f xs = let rec loop acc xs = @@ -30,10 +30,10 @@ let classify f xs = (match f x with | Some (classification, v) -> loop - (StringMap.add + (BytesMap.add classification - (v :: (try StringMap.find classification acc with Not_found -> [])) + (v :: (try BytesMap.find classification acc with Not_found -> [])) acc) xs' | None -> loop acc xs') - in loop StringMap.empty xs + in loop BytesMap.empty xs diff --git a/server/directnode.ml b/server/directnode.ml index a9e7993..793cc78 100644 --- a/server/directnode.ml +++ b/server/directnode.ml @@ -23,22 +23,22 @@ open Status type t = { name: Node.name; subscriptions: Subscription.set_t; - mutable routing_table: UuidSet.t StringMap.t; + mutable routing_table: UuidSet.t BytesMap.t; } let classname = "direct" let unsubscribe info uuid = - match_lwt Subscription.delete info.name info.subscriptions uuid with + match_lwt Subscription.delete info.name info.subscriptions (Bytes.of_string uuid) with | Some sub -> (match sub.Subscription.filter with | Str binding_key -> (try - let old_set = StringMap.find binding_key info.routing_table in - let new_set = UuidSet.remove sub.Subscription.uuid old_set in + let old_set = BytesMap.find binding_key info.routing_table in + let new_set = UuidSet.remove (Bytes.of_string sub.Subscription.uuid) old_set in if UuidSet.is_empty new_set - then info.routing_table <- StringMap.remove binding_key info.routing_table - else info.routing_table <- StringMap.add binding_key new_set info.routing_table + then info.routing_table <- BytesMap.remove binding_key info.routing_table + else info.routing_table <- BytesMap.add binding_key new_set info.routing_table with Not_found -> ()); return () @@ -49,7 +49,7 @@ let route_message info n sexp = match Message.message_of_sexp sexp with | Message.Post (Str name, body, token) -> let routing_snapshot = info.routing_table in - let matching = (try StringMap.find name routing_snapshot with Not_found -> UuidSet.empty) in + let matching = (try BytesMap.find name routing_snapshot with Not_found -> UuidSet.empty) in Lwt_list.iter_s (fun (uuid) -> match Subscription.lookup info.subscriptions uuid with @@ -65,14 +65,14 @@ let route_message info n sexp = with | Subscription.New sub -> let old_set = - (try StringMap.find binding_key info.routing_table with Not_found -> UuidSet.empty) in - let new_set = UuidSet.add sub.Subscription.uuid old_set in - info.routing_table <- StringMap.add binding_key new_set info.routing_table; + (try BytesMap.find binding_key info.routing_table with Not_found -> UuidSet.empty) in + let new_set = UuidSet.add (Bytes.of_string sub.Subscription.uuid) old_set in + info.routing_table <- BytesMap.add binding_key new_set info.routing_table; return () | Subscription.Old sub -> return ()) | Message.Unsubscribe (Str token) -> - unsubscribe info token + unsubscribe info (Bytes.to_string token) | m -> Util.message_not_understood classname m @@ -80,15 +80,15 @@ let factory arg = match arg with | (Arr [Str name_str]) -> let info = { - name = Node.name_of_string name_str; + name = Node.name_of_bytes name_str; subscriptions = Subscription.new_set (); - routing_table = StringMap.empty; + routing_table = BytesMap.empty; } in replace_ok - (Node.make_idempotent_named classname info.name return (route_message info)) + (Node.make_idempotent_named (Bytes.of_string classname) info.name return (route_message info)) (Str name_str) | _ -> - return (Problem (Str "bad-arg")) + return (Problem (litstr "bad-arg")) let init () = - Factory.register_class classname factory + Factory.register_class (Bytes.of_string classname) factory diff --git a/server/factory.ml b/server/factory.ml index 133f78c..66f091e 100644 --- a/server/factory.ml +++ b/server/factory.ml @@ -22,22 +22,22 @@ open Datastructures type factory_t = Sexp.t -> (Sexp.t, Sexp.t) Status.t Lwt.t -let classes = ref StringMap.empty +let classes = ref BytesMap.empty let register_class name factory = - if StringMap.mem name !classes + if BytesMap.mem name !classes then (ignore (Log.error "Duplicate node class name" [Str name]); - Server_control.shutdown_now [Str "Duplicate node class name"; Str name]; + Server_control.shutdown_now [litstr "Duplicate node class name"; Str name]; Lwt_unix.yield ()) else (ignore (Log.info "Registered node class" [Str name]); - classes := StringMap.add name factory !classes; + classes := BytesMap.add name factory !classes; return ()) let all_class_names () = - Datastructures.string_map_keys !classes + Datastructures.bytes_map_keys !classes let lookup_class name = - try Some (StringMap.find name !classes) + try Some (BytesMap.find name !classes) with Not_found -> None let factory_handler n sexp = @@ -54,14 +54,14 @@ let factory_handler n sexp = | Status.Problem explanation -> ignore (Log.info "Node create failed" [Str classname; arg; Str reply_sink; Str reply_name; explanation]); - return (Message.create_failed (Arr [Str "constructor"; explanation]))) + return (Message.create_failed (Arr [litstr "constructor"; explanation]))) | None -> ignore (Log.warn "Node class not found" [Str classname]); - return (Message.create_failed (Arr [Str "factory"; Str "class-not-found"])) + return (Message.create_failed (Arr [litstr "factory"; litstr "class-not-found"])) in - Node.post_ignore' reply_sink (Str reply_name) reply (Str "") + Node.post_ignore' reply_sink (Str reply_name) reply emptystr | m -> Util.message_not_understood "factory" m let init () = - Node.bind_ignore (Node.name_of_string "factory", Node.make "factory" factory_handler) + Node.bind_ignore (Node.name_of_bytes (Bytes.of_string "factory"), Node.make (Bytes.of_string "factory") factory_handler) diff --git a/server/fanoutnode.ml b/server/fanoutnode.ml index dcfdf7a..24c1a09 100644 --- a/server/fanoutnode.ml +++ b/server/fanoutnode.ml @@ -28,7 +28,7 @@ type t = { let classname = "fanout" let unsubscribe info uuid = - lwt _ = Subscription.delete info.name info.subscriptions uuid in return () + lwt _ = Subscription.delete info.name info.subscriptions (Bytes.of_string uuid) in return () let route_message info n sexp = match Message.message_of_sexp sexp with @@ -37,13 +37,13 @@ let route_message info n sexp = Lwt_list.iter_s (fun (uuid, sub) -> lwt _ = Subscription.send_to_subscription' sub body (unsubscribe info) in return ()) - (StringMap.bindings snapshot) + (BytesMap.bindings snapshot) | Message.Subscribe (Str binding_key as filter, Str sink, name, Str reply_sink, reply_name) -> lwt _ = (Subscription.create info.name info.subscriptions filter sink name reply_sink reply_name) in return () | Message.Unsubscribe (Str token) -> - unsubscribe info token + unsubscribe info (Bytes.to_string token) | m -> Util.message_not_understood classname m @@ -51,14 +51,14 @@ let factory arg = match arg with | (Arr [Str name_str]) -> let info = { - name = Node.name_of_string name_str; + name = Node.name_of_bytes name_str; subscriptions = Subscription.new_set () } in replace_ok - (Node.make_idempotent_named classname info.name return (route_message info)) + (Node.make_idempotent_named (Bytes.of_string classname) info.name return (route_message info)) (Str name_str) | _ -> - return (Problem (Str "bad-arg")) + return (Problem (litstr "bad-arg")) let init () = - Factory.register_class classname factory + Factory.register_class (Bytes.of_string classname) factory diff --git a/server/gpath.ml b/server/gpath.ml index 44e5156..e12e059 100644 --- a/server/gpath.ml +++ b/server/gpath.ml @@ -17,7 +17,7 @@ type 'a t = | Index of int - | Field of string + | Field of bytes | Push type 'a adapter_t = { @@ -26,8 +26,8 @@ type 'a adapter_t = { push: 'a -> 'a -> 'a; empty_array: unit -> 'a; - get_field: string -> 'a -> 'a; - set_field: string -> 'a -> 'a -> 'a; + get_field: bytes -> 'a -> 'a; + set_field: bytes -> 'a -> 'a -> 'a; empty_record: unit -> 'a; } @@ -43,9 +43,7 @@ let parse_single b = Ibuffer.skip_byte b; (* drop the open bracket *) let istr = Ibuffer.until_char ']' b in Ibuffer.skip_byte b; (* drop the close bracket *) - (match istr with - | "+" -> Push - | _ -> Index (int_of_string istr)) + if istr = Bytes.of_string "+" then Push else Index (int_of_string (Bytes.to_string istr)) | '.' -> Ibuffer.skip_byte b; parse_fieldref b @@ -59,14 +57,14 @@ let rec parse b = with End_of_file -> [] -let of_string s = parse (Ibuffer.of_string s) +let of_string s = parse (Ibuffer.of_bytes (Bytes.of_string s)) let to_string ps = let rec walk is_first ps = match ps with | [] -> "" | Index i :: rest -> "[" ^ string_of_int i ^ "]" ^ walk false rest - | Field s :: rest -> (if is_first then "" else ".") ^ s ^ walk false rest + | Field s :: rest -> (if is_first then "" else ".") ^ (Bytes.to_string s) ^ walk false rest | Push :: rest -> "[+]" ^ walk false rest in walk true ps diff --git a/server/hop_server.ml b/server/hop_server.ml index cb71e80..6efbd58 100644 --- a/server/hop_server.ml +++ b/server/hop_server.ml @@ -17,12 +17,12 @@ open Lwt -let n_system_log = Node.name_of_string "system.log" +let n_system_log = Node.name_of_bytes (Bytes.of_string "system.log") let hook_log () = let old_hook = !Log.hook in let new_hook label body = - ignore (Node.post n_system_log (Sexp.Str label) body (Sexp.Str "")); + ignore (Node.post n_system_log (Sexp.str label) body Sexp.emptystr); old_hook label body in Log.hook := new_hook @@ -30,7 +30,7 @@ let hook_log () = let create_ready_file () = match Config.get "ready-file" with | Some (Json.Str ready_file_path) -> - ignore (Log.info "Creating ready file" [Sexp.Str ready_file_path]); + ignore (Log.info "Creating ready file" [Sexp.str ready_file_path]); return (close_out (open_out ready_file_path)) | Some other -> ignore (Log.error "Ready file path not a string" [Sexpjson.sexp_of_json other]); @@ -40,12 +40,15 @@ let create_ready_file () = let console_watcher () = lwt _ = Lwt_io.read_line Lwt_io.stdin in - Server_control.milestone "Shutdown requested"; + Server_control.milestone (Bytes.of_string "Shutdown requested"); return () lwt _ = Printf.printf "%s %s, %s\n%s\n%!" - App_info.product App_info.version App_info.copyright App_info.licence_blurb; + (Bytes.to_string App_info.product) + (Bytes.to_string App_info.version) + (Bytes.to_string App_info.copyright) + (Bytes.to_string App_info.licence_blurb); Sys.set_signal Sys.sigpipe Sys.Signal_ignore; Uuid.init (); Config.init (); @@ -57,20 +60,20 @@ lwt _ = hook_log (); lwt () = Config.conditional "amqp.enabled" true (fun () -> ignore (Amqp_relay.init ()); - Server_control.run_until "AMQP ready") + Server_control.run_until (Bytes.of_string "AMQP ready")) in lwt () = Config.conditional "http.enabled" true (fun () -> ignore (Ui_main.init ()); ignore (Ui_relay.init ()); - Server_control.run_until "HTTP ready") + Server_control.run_until (Bytes.of_string "HTTP ready")) in lwt () = Config.conditional "hop.enabled" true (fun () -> ignore (Relay.init ()); - Server_control.run_until "Hop ready") + Server_control.run_until (Bytes.of_string "Hop ready")) in ignore (console_watcher ()); if Server_control.is_running () then (lwt () = create_ready_file () in - Server_control.milestone "Server initialized"; - Server_control.run_until "Shutdown requested") + Server_control.milestone (Bytes.of_string "Server initialized"); + Server_control.run_until (Bytes.of_string "Shutdown requested")) else return () diff --git a/server/hopstr.ml b/server/hopstr.ml index 2028bcf..f9df40c 100644 --- a/server/hopstr.ml +++ b/server/hopstr.ml @@ -15,12 +15,15 @@ (* You should have received a copy of the GNU General Public License *) (* along with Hop. If not, see . *) -let string_of_revlist acc len = - let buf = String.make len ' ' in +let bytes_of_revlist acc len = + let buf = Bytes.make len ' ' in let rec fill cs i = match cs with | [] -> () - | c :: cs' -> (String.set buf i c; fill cs' (i - 1)) + | c :: cs' -> (Bytes.set buf i c; fill cs' (i - 1)) in fill acc (len - 1); buf + +let string_of_revlist acc len = + Bytes.to_string (bytes_of_revlist acc len) diff --git a/server/httpd.ml b/server/httpd.ml index c1f42af..709124a 100644 --- a/server/httpd.ml +++ b/server/httpd.ml @@ -20,14 +20,14 @@ open Hof type version = [`HTTP_1_0 | `HTTP_1_1] type resp_version = [version | `SAME_AS_REQUEST] -type content = Fixed of string | Variable of string Lwt_stream.t +type content = Fixed of bytes | Variable of bytes Lwt_stream.t type body = { headers: (string * string) list; content: content } -let empty_content = Fixed "" +let empty_content = Fixed Bytes.empty let empty_body = {headers = []; content = empty_content} type req = { @@ -81,12 +81,12 @@ let http_error code reason body = raise_lwt (HTTPError (code, reason, body)) let http_error_plain code reason = http_error code reason - {headers = [text_content_type_header]; content = Fixed reason} + {headers = [text_content_type_header]; content = Fixed (Bytes.of_string reason)} let http_error_html_doc code reason doc = http_error code reason {headers = [html_content_type_header]; - content = Variable (Html.stream_of_html_doc doc)} + content = Variable (Streamutil.stream_encode (Html.stream_of_html_doc doc))} let html_error_doc code reason extra_body = let code_str = string_of_int code in @@ -115,7 +115,7 @@ let resp_generic_ok headers content = let resp_html_doc code reason extra_headers doc = resp_generic code reason (html_content_type_header :: extra_headers) - (Variable (Html.stream_of_html_doc doc)) + (Variable (Streamutil.stream_encode (Html.stream_of_html_doc doc))) let resp_html_doc_ok extra_headers doc = resp_html_doc 200 "OK" extra_headers doc @@ -173,22 +173,21 @@ let render_header cout (k, v) = Lwt_io.write cout "\r\n" let render_chunk cout chunk = - match chunk with - | "" -> return () - | _ -> - lwt () = Lwt_io.write cout (Printf.sprintf "%x\r\n" (String.length chunk)) in - lwt () = Lwt_io.write cout chunk in - Lwt_io.write cout "\r\n" + let chunk_len = Bytes.length chunk in + if chunk_len = 0 then return () else + lwt () = Lwt_io.write cout (Printf.sprintf "%x\r\n" chunk_len) in + lwt () = Lwt_io.write_from_exactly cout chunk 0 chunk_len in + Lwt_io.write cout "\r\n" let render_fixed_content cout s headers_only = - lwt () = render_header cout ("Content-Length", string_of_int (String.length s)) in + lwt () = render_header cout ("Content-Length", string_of_int (Bytes.length s)) in lwt () = Lwt_io.write cout "\r\n" in - if headers_only then return () else Lwt_io.write cout s + if headers_only then return () else Lwt_io.write_from_exactly cout s 0 (Bytes.length s) -let string_of_content c = +let bytes_of_content c = match c with | Fixed s -> return s - | Variable s -> Streamutil.stream_to_string s + | Variable s -> Streamutil.stream_to_bytes s let render_content cout v c headers_only = match c with @@ -197,7 +196,7 @@ let render_content cout v c headers_only = | Variable s -> match v with | `HTTP_1_0 -> - lwt str = Streamutil.stream_to_string s in + lwt str = Streamutil.stream_to_bytes s in render_fixed_content cout str headers_only | `HTTP_1_1 -> if headers_only @@ -260,12 +259,12 @@ let parse_urlencoded q = Lwt_list.map_s parse_urlencoded_binding pieces let find_header' name hs = - let lc_name = String.lowercase name in + let lc_name = String.lowercase_ascii name in let rec search hs = match hs with | [] -> raise Not_found | (k, v) :: hs' -> - if String.lowercase k = lc_name + if String.lowercase_ascii k = lc_name then v else search hs' in @@ -299,7 +298,7 @@ let parse_chunks cin = fun () -> lwt hexlen_str = input_crlf cin in let chunk_len = Util.unhex hexlen_str in - let buffer = String.make chunk_len '\000' in + let buffer = Bytes.make chunk_len '\000' in lwt () = Lwt_io.read_into_exactly cin buffer 0 chunk_len in lwt chunk_terminator = input_crlf cin in if chunk_terminator <> "" @@ -319,7 +318,7 @@ let parse_body cin = return {headers = headers; content = empty_content} | Some length_str -> let length = int_of_string length_str in - let buffer = String.make length '\000' in + let buffer = Bytes.make length '\000' in lwt () = Lwt_io.read_into_exactly cin buffer 0 length in return {headers = headers; content = Fixed buffer}) | Some "chunked" -> @@ -424,9 +423,9 @@ let main handle_req (s, peername) = resp_body = body; completion_callbacks = [] } | Sys_error message -> - Log.info "Sys_error in httpd handler" [Sexp.Str message] + Log.info "Sys_error in httpd handler" [Sexp.str message] | exn -> - Log.error "Uncaught exception in httpd handler" [Sexp.Str (Printexc.to_string exn)] + Log.error "Uncaught exception in httpd handler" [Sexp.str (Printexc.to_string exn)] in lwt () = fire_pending_callbacks () in diff --git a/server/httpd_file.ml b/server/httpd_file.ml index ccd286c..7c0b288 100644 --- a/server/httpd_file.ml +++ b/server/httpd_file.ml @@ -30,7 +30,7 @@ let sanitize_path p = String.concat "/" (List.filter visible_path_component (Str.split sanitize_path_re p)) let extension_map ext = - match String.lowercase ext with + match String.lowercase_ascii ext with | ".txt" -> Httpd.text_content_type | ".html" | ".htm" -> Httpd.html_content_type | ".bin" -> "application/octet-stream" @@ -51,11 +51,11 @@ let analyze_path p = let stream_file handle = let buflen = 4096 in - let buffer = String.make buflen '\000' in + let buffer = Bytes.make buflen '\000' in fun () -> let count = input handle buffer 0 buflen in if count > 0 - then return (Some (String.sub buffer 0 count)) + then return (Some (Bytes.sub buffer 0 count)) else return None let rec read_dir dirhandle = diff --git a/server/ibuffer.ml b/server/ibuffer.ml index 5ce1d58..9473b56 100644 --- a/server/ibuffer.ml +++ b/server/ibuffer.ml @@ -18,7 +18,7 @@ type t = { mutable pos: int; limit: int; - buf: string; + buf: bytes; } let create s ofs len = { @@ -27,7 +27,7 @@ let create s ofs len = { buf = s } -let of_string s = create s 0 (String.length s) +let of_bytes s = create s 0 (Bytes.length s) let sub b ofs len = if b.pos + ofs + len > b.limit @@ -46,13 +46,13 @@ let skip_byte b = else raise End_of_file let skip_ws b = - while b.pos < b.limit && String.get b.buf b.pos <= ' ' do + while b.pos < b.limit && Bytes.get b.buf b.pos <= ' ' do b.pos <- b.pos + 1 done let peek_char b = if b.pos < b.limit - then String.get b.buf b.pos + then Bytes.get b.buf b.pos else raise End_of_file let peek_byte b = int_of_char (peek_char b) @@ -60,7 +60,7 @@ let peek_byte b = int_of_char (peek_char b) let next_char b = if b.pos < b.limit then - let v = String.get b.buf b.pos in + let v = Bytes.get b.buf b.pos in b.pos <- b.pos + 1; v else @@ -73,8 +73,8 @@ let next_chars b n = then raise End_of_file else - let dst = String.create n in - String.blit b.buf b.pos dst 0 n; + let dst = Bytes.create n in + Bytes.blit b.buf b.pos dst 0 n; b.pos <- b.pos + n; dst @@ -88,12 +88,12 @@ let until_pred pred b = if remaining b = 0 then if pred None - then Hopstr.string_of_revlist acc len + then Hopstr.bytes_of_revlist acc len else raise End_of_file else let ch = peek_char b in if pred (Some ch) - then Hopstr.string_of_revlist acc len + then Hopstr.bytes_of_revlist acc len else loop (next_char b :: acc) (len + 1) in loop [] 0 diff --git a/server/json.ml b/server/json.ml index 4c849c9..a7bda0d 100644 --- a/server/json.ml +++ b/server/json.ml @@ -135,7 +135,7 @@ let rec parse_str b (acc, len) = | 'n' -> parse_str b (Char.chr 10 :: acc, len + 1) | 'r' -> parse_str b (Char.chr 13 :: acc, len + 1) | 't' -> parse_str b (Char.chr 9 :: acc, len + 1) - | 'u' -> parse_str b (accumulate_utf8 (Util.unhex (Ibuffer.next_chars b 4)) (acc, len)) + | 'u' -> parse_str b (accumulate_utf8 (Util.unhex (Bytes.to_string (Ibuffer.next_chars b 4))) (acc, len)) | c -> parse_str b (c :: acc, len + 1)) | c -> parse_str b (c :: acc, len + 1) @@ -174,21 +174,21 @@ and parse b = | '\"' -> parse_str b ([], 0) | '[' -> parse_arr b [] | '{' -> parse_rec b [] - | 't' -> if Ibuffer.next_chars b 3 = "rue" then Flg true else raise Syntax_error - | 'f' -> if Ibuffer.next_chars b 4 = "alse" then Flg false else raise Syntax_error - | 'n' -> if Ibuffer.next_chars b 3 = "ull" then Nil else raise Syntax_error + | 't' -> if Ibuffer.next_chars b 3 = (Bytes.of_string "rue") then Flg true else raise Syntax_error + | 'f' -> if Ibuffer.next_chars b 4 = (Bytes.of_string "alse") then Flg false else raise Syntax_error + | 'n' -> if Ibuffer.next_chars b 3 = (Bytes.of_string "ull") then Nil else raise Syntax_error | '/' -> (* cheating *) skip_line_comment b; parse b | _ -> raise Syntax_error -let of_string s = parse (Ibuffer.of_string s) +let of_string s = parse (Ibuffer.of_bytes (Bytes.of_string s)) let resp code reason extra_headers j = Httpd.resp_generic code reason ((Httpd.content_type_header_name, "application/json") :: extra_headers) - (Httpd.Fixed (to_string j)) + (Httpd.Fixed (Bytes.of_string (to_string j))) let resp_ok extra_headers j = resp 200 "OK" extra_headers j -let load filename = of_string (Util.file_contents filename) +let load filename = of_string (Bytes.to_string (Util.file_contents filename)) let get j i = match j with @@ -197,7 +197,7 @@ let get j i = let find s j = match j with - | Rec kvs -> List.assoc s kvs + | Rec kvs -> List.assoc (Bytes.to_string s) kvs | _ -> failwith "Json.find" let set j i v = @@ -209,9 +209,11 @@ let set j i v = | _ -> failwith "Json.set" -let add k v j = +let add kb v j = match j with - | Rec kvs -> Rec (List.remove_assoc k kvs @ [k, v]) + | Rec kvs -> + let k = Bytes.to_string kb in + Rec (List.remove_assoc k kvs @ [k, v]) | _ -> failwith "Json.add" let push j v = @@ -250,7 +252,7 @@ let path_fold seed f g j = let rec loop seed kvs = match kvs with | [] -> seed - | (k, v) :: kvs' -> loop (walk seed (Gpath.Field k :: prefixrev) v) kvs' + | (k, v) :: kvs' -> loop (walk seed (Gpath.Field (Bytes.of_string k) :: prefixrev) v) kvs' in loop (g (List.rev prefixrev) (Rec []) seed) kvs | other -> f (List.rev prefixrev) other seed diff --git a/server/log.ml b/server/log.ml index e704da6..9d4bea9 100644 --- a/server/log.ml +++ b/server/log.ml @@ -30,6 +30,6 @@ let write_to_log label body = let hook = ref write_to_log -let info message args = (!hook) "info" (Arr (Str message :: args)) -let warn message args = (!hook) "warn" (Arr (Str message :: args)) -let error message args = (!hook) "error" (Arr (Str message :: args)) +let info message args = (!hook) "info" (Arr (Sexp.str message :: args)) +let warn message args = (!hook) "warn" (Arr (Sexp.str message :: args)) +let error message args = (!hook) "error" (Arr (Sexp.str message :: args)) diff --git a/server/meta.ml b/server/meta.ml index 521ea84..9f03c81 100644 --- a/server/meta.ml +++ b/server/meta.ml @@ -17,14 +17,15 @@ open Sexp -let n_meta = Node.name_of_string "meta" +let n_meta = Node.name_of_bytes (Bytes.of_string "meta") let announce_subscription source filter sink name on_off = Node.post_ignore n_meta (Str source.Node.label) (if on_off then Message.subscribed (Str source.Node.label, filter, Str sink, name) else Message.unsubscribed (Str source.Node.label, filter, Str sink, name)) - (Str "") + emptystr let init () = - Node.send_ignore' "factory" (Message.create (Str "direct", Arr [Str "meta"], Str "", Str "")) + Node.send_ignore' (Bytes.of_string "factory") + (Message.create (litstr "direct", Arr [litstr "meta"], emptystr, emptystr)) diff --git a/server/net.ml b/server/net.ml index f7b86ae..b4f919d 100644 --- a/server/net.ml +++ b/server/net.ml @@ -26,9 +26,8 @@ let rec accept_loop sock connection_start_fn = let start_net protocol_name port_number connection_start_fn = let sock = socket Unix.PF_INET Unix.SOCK_STREAM 0 in setsockopt sock Unix.SO_REUSEADDR true; - bind sock (Unix.ADDR_INET (Unix.inet_addr_any, port_number)); + lwt () = bind sock (Unix.ADDR_INET (Unix.inet_addr_any, port_number)) in listen sock 5; - Server_control.milestone (protocol_name ^ " ready"); - ignore (Log.info "Accepting connections" - [Sexp.Str protocol_name; Sexp.Str (string_of_int port_number)]); + Server_control.milestone (Bytes.of_string (protocol_name ^ " ready")); + ignore (Log.info "Accepting connections" [Sexp.str protocol_name; Sexp.str (string_of_int port_number)]); accept_loop sock connection_start_fn diff --git a/server/node.ml b/server/node.ml index d88da53..b0c8ee9 100644 --- a/server/node.ml +++ b/server/node.ml @@ -22,13 +22,13 @@ open Status type handle_message_t = t -> Sexp.t -> unit Lwt.t and t = { - mutable names: StringSet.t; + mutable names: BytesSet.t; mutable send_counter: int; - class_name: string; + class_name: bytes; handle_message: handle_message_t } and name = { - label: string; + label: bytes; mutable binding: t option } @@ -39,31 +39,31 @@ module NameTable = Weak.Make(struct end) module NameSet = Set.Make(struct type t = name - let compare a b = String.compare a.label b.label + let compare a b = Bytes.compare a.label b.label end) let name_table = NameTable.create 100 let directory = ref NameSet.empty -let name_of_string str = +let name_of_bytes str = let template = {label = str; binding = None} in NameTable.merge name_table template -let caching_name_of_string () = +let caching_name_of_bytes () = let cache = ref None in fun str -> match !cache with | Some ({label = k} as n) when k = str -> n | _ -> - let n = name_of_string str in + let n = name_of_bytes str in cache := Some n; n -let local_container_name () = "server" +let local_container_name () = (Bytes.of_string "server") let make class_name handler = { - names = StringSet.empty; + names = BytesSet.empty; send_counter = 0; class_name = class_name; handle_message = handler @@ -72,7 +72,7 @@ let make class_name handler = { let lookup name = name.binding let all_node_names () = NameSet.elements !directory -let all_node_name_strings () = List.map (fun x -> x.label) (all_node_names ()) +let all_node_name_bytes () = List.map (fun x -> x.label) (all_node_names ()) (* Approximate because it doesn't lock or run in a transaction *) let approx_exists name = @@ -81,14 +81,14 @@ let approx_exists name = | None -> false let bind (filter, node) = - if filter.label = "" + if filter.label = Bytes.empty then (ignore (Log.warn "Binding to empty name forbidden" []); return false) else match filter.binding with | None -> filter.binding <- Some node; directory := NameSet.add filter !directory; - node.names <- StringSet.add filter.label node.names; + node.names <- BytesSet.add filter.label node.names; ignore (Log.info "Node bound" [Sexp.Str filter.label; Sexp.Str node.class_name]); return true | Some _ -> @@ -99,7 +99,7 @@ let make_named class_name node_name handler = let node = make class_name handler in match_lwt bind (node_name, node) with | true -> return (Ok node) - | false -> return (Problem (Sexp.Str "bind-failed")) + | false -> return (Problem (Sexp.litstr "bind-failed")) (* For use in factory constructor functions, hence the odd return type and values *) let make_idempotent_named class_name node_name if_new_node handler = @@ -107,18 +107,18 @@ let make_idempotent_named class_name node_name if_new_node handler = | Some n -> return (if n.class_name = class_name then Ok n - else Problem (Sexp.Str "class-mismatch")) + else Problem (Sexp.litstr "class-mismatch")) | None -> let node = make class_name handler in match_lwt bind (node_name, node) with | true -> lwt () = if_new_node () in return (Ok node) - | false -> return (Problem (Sexp.Str "bind-failed")) + | false -> return (Problem (Sexp.litstr "bind-failed")) let unbind name = match lookup name with | Some n -> ignore (Log.info "Node unbound" [Sexp.Str name.label; Sexp.Str n.class_name]); - n.names <- StringSet.remove name.label n.names; + n.names <- BytesSet.remove name.label n.names; name.binding <- None; directory := NameSet.remove name !directory; return true @@ -128,10 +128,10 @@ let unbind name = let unbind_all n = lwt () = Lwt_list.iter_s - (fun name -> lwt _ = unbind (name_of_string name) in return ()) - (StringSet.elements n.names) + (fun name -> lwt _ = unbind (name_of_bytes name) in return ()) + (BytesSet.elements n.names) in - n.names <- StringSet.empty; + n.names <- BytesSet.empty; return () let send name body = @@ -141,8 +141,8 @@ let send name body = (try_lwt n.handle_message n body with e -> Log.warn "Node message handler raised exception" - [Sexp.Str name.label; - Sexp.Str (Printexc.to_string e)]) + [Sexp.Str name.label; + Sexp.str (Printexc.to_string e)]) in n.send_counter <- n.send_counter + 1; lwt () = @@ -154,12 +154,12 @@ let send name body = | None -> return false -let send' str body = send (name_of_string str) body +let send' str body = send (name_of_bytes str) body let post name label body token = send name (Message.post (label, body, token)) -let post' str label body token = post (name_of_string str) label body token +let post' str label body token = post (name_of_bytes str) label body token let bind_ignore (filter, node) = match_lwt bind (filter, node) with @@ -170,13 +170,13 @@ let send_ignore name body = match_lwt send name body with | true -> return () | false -> - if name.label = "" + if name.label = Bytes.empty then return () else Log.warn "send to missing node" [Sexp.Str name.label; body] -let send_ignore' str body = send_ignore (name_of_string str) body +let send_ignore' str body = send_ignore (name_of_bytes str) body let post_ignore name label body token = send_ignore name (Message.post (label, body, token)) -let post_ignore' str label body token = post_ignore (name_of_string str) label body token +let post_ignore' str label body token = post_ignore (name_of_bytes str) label body token diff --git a/server/obuffer.ml b/server/obuffer.ml index 5ea3ab0..4e1cff1 100644 --- a/server/obuffer.ml +++ b/server/obuffer.ml @@ -20,41 +20,41 @@ (* Extensible buffers *) type t = - {mutable buffer : string; + {mutable buffer : bytes; mutable position : int; mutable length : int; - initial_buffer : string} + initial_buffer : bytes} let create n = let n = if n < 1 then 1 else n in let n = if n > Sys.max_string_length then Sys.max_string_length else n in - let s = String.create n in + let s = Bytes.create n in {buffer = s; position = 0; length = n; initial_buffer = s} -let contents b = String.sub b.buffer 0 b.position +let contents b = Bytes.sub b.buffer 0 b.position let sub b ofs len = if ofs < 0 || len < 0 || ofs > b.position - len then invalid_arg "Obuffer.sub" else begin - let r = String.create len in - String.blit b.buffer ofs r 0 len; + let r = Bytes.create len in + Bytes.blit b.buffer ofs r 0 len; r end ;; let blit src srcoff dst dstoff len = if len < 0 || srcoff < 0 || srcoff > src.position - len - || dstoff < 0 || dstoff > (String.length dst) - len + || dstoff < 0 || dstoff > (Bytes.length dst) - len then invalid_arg "Obuffer.blit" else - String.blit src.buffer srcoff dst dstoff len + Bytes.blit src.buffer srcoff dst dstoff len ;; let nth b ofs = if ofs < 0 || ofs >= b.position then invalid_arg "Obuffer.nth" - else String.get b.buffer ofs + else Bytes.get b.buffer ofs ;; let length b = b.position @@ -63,7 +63,7 @@ let clear b = b.position <- 0 let reset b = b.position <- 0; b.buffer <- b.initial_buffer; - b.length <- String.length b.buffer + b.length <- Bytes.length b.buffer let resize b more = let len = b.length in @@ -74,30 +74,30 @@ let resize b more = then new_len := Sys.max_string_length else failwith "Obuffer.add: cannot grow buffer" end; - let new_buffer = String.create !new_len in - String.blit b.buffer 0 new_buffer 0 b.position; + let new_buffer = Bytes.create !new_len in + Bytes.blit b.buffer 0 new_buffer 0 b.position; b.buffer <- new_buffer; b.length <- !new_len let add_char b c = let pos = b.position in if pos >= b.length then resize b 1; - b.buffer.[pos] <- c; + Bytes.set b.buffer pos c; b.position <- pos + 1 let add_substring b s offset len = - if offset < 0 || len < 0 || offset > String.length s - len + if offset < 0 || len < 0 || offset > Bytes.length s - len then invalid_arg "Obuffer.add_substring"; let new_position = b.position + len in if new_position > b.length then resize b len; - String.blit s offset b.buffer b.position len; + Bytes.blit s offset b.buffer b.position len; b.position <- new_position let add_string b s = - let len = String.length s in + let len = Bytes.length s in let new_position = b.position + len in if new_position > b.length then resize b len; - String.blit s 0 b.buffer b.position len; + Bytes.blit s 0 b.buffer b.position len; b.position <- new_position let add_buffer b bs = diff --git a/server/queuenode.ml b/server/queuenode.ml index ad346fe..06067f0 100644 --- a/server/queuenode.ml +++ b/server/queuenode.ml @@ -32,12 +32,12 @@ type t = { mutable waiters: int; } -let classname = "queue" +let classname = (Bytes.of_string "queue") let report info = while_lwt true do lwt () = Log.info (Printf.sprintf "%s: %d backlog, %d waiters" - info.name.Node.label + (Bytes.to_string info.name.Node.label) info.backlog info.waiters) [] in Lwt_unix.sleep 1.0 @@ -93,7 +93,7 @@ let queue_factory arg = let (bin, bout) = Lwt_stream.create () in let (win, wout) = Lwt_stream.create () in let info = { - name = Node.name_of_string name_str; + name = Node.name_of_bytes name_str; subscriptions = Subscription.new_set (); backlog_in = bin; backlog_out = bout; @@ -111,7 +111,7 @@ let queue_factory arg = (queue_handler info)) (Str name_str) | _ -> - return (Problem (Str "bad-arg")) + return (Problem (litstr "bad-arg")) let init () = Factory.register_class classname queue_factory diff --git a/server/relay.ml b/server/relay.ml index 27ccd80..3a36178 100644 --- a/server/relay.ml +++ b/server/relay.ml @@ -26,36 +26,36 @@ let send_error ch message details = ch m let send_sexp_syntax_error ch explanation = - send_error ch explanation (Str "http://people.csail.mit.edu/rivest/Sexp.txt") + send_error ch explanation (litstr "http://people.csail.mit.edu/rivest/Sexp.txt") let dispatch_message n ch = - let lookup = Node.caching_name_of_string () in + let lookup = Node.caching_name_of_bytes () in function | Message.Post (Str name, body, token) -> Node.send_ignore (lookup name) body | Message.Subscribe (Str filter, sink, name, Str reply_sink, Str reply_name) -> - (match_lwt Node.bind (Node.name_of_string filter, n) with + (match_lwt Node.bind (Node.name_of_bytes filter, n) with | true -> Node.post_ignore' reply_sink (Str reply_name) (Message.subscribe_ok (Str filter)) - (Str "") + emptystr | false -> Log.warn "Bind failed" [Str filter]) | Message.Unsubscribe (Str token) -> - (match_lwt Node.unbind (Node.name_of_string token) with + (match_lwt Node.unbind (Node.name_of_bytes token) with | true -> return () | false -> Log.warn "Unbind failed" [Str token]) | m -> - send_error ch "Message not understood" (Message.sexp_of_message m) + send_error ch (Bytes.of_string "Message not understood") (Message.sexp_of_message m) let issue_banner cin cout = - lwt () = output_sexp cout (Arr [Str "hop"; Str ""]) in + lwt () = output_sexp cout (Arr [litstr "hop"; emptystr]) in lwt () = Lwt_io.flush cout in lwt () = output_sexp cout (Message.subscribe (Str (Node.local_container_name()), - Str "", Str "", - Str "", Str "")) in + emptystr, emptystr, + emptystr, emptystr)) in lwt () = Lwt_io.flush cout in return true @@ -74,9 +74,10 @@ let relay_mainloop (peername, mtx, cin, cout) n = done with | Sexp.Syntax_error explanation -> - lwt () = send_sexp_syntax_error write_sexp explanation in + lwt () = send_sexp_syntax_error write_sexp (Bytes.of_string explanation) in Log.info "Disconnected relay for syntax error" - [Str (Connections.endpoint_name peername); Str explanation]) + [str (Connections.endpoint_name peername); + str explanation]) let start (s, peername) = Connections.start_connection "relay" issue_banner @@ -84,4 +85,4 @@ let start (s, peername) = let init () = let port = Config.get_int "hop.port" 5671 in - Util.create_daemon_thread "Hop listener" None (Net.start_net "Hop" port) start + Util.create_daemon_thread (Bytes.of_string "Hop listener") None (Net.start_net "Hop" port) start diff --git a/server/server_control.ml b/server/server_control.ml index 3ebb3b5..13473eb 100644 --- a/server/server_control.ml +++ b/server/server_control.ml @@ -21,7 +21,7 @@ open Datastructures let continue_running = ref true let (cq_in, cq_out) = Lwt_stream.create () -let achieved_milestones = ref StringSet.empty +let achieved_milestones = ref BytesSet.empty let milestone name = cq_out (Some (`Milestone name)) @@ -30,7 +30,7 @@ let shutdown_now details = cq_out (Some (`Shutdown details)) let is_milestone_achieved m = match m with | Some m' -> - StringSet.mem m' !achieved_milestones + BytesSet.mem m' !achieved_milestones | None -> false @@ -46,7 +46,7 @@ let rec run' until_milestone = return () | `Milestone name -> ignore (Log.info "Achieved milestone" [Sexp.Str name]); - achieved_milestones := StringSet.add name !achieved_milestones; + achieved_milestones := BytesSet.add name !achieved_milestones; run' until_milestone) let is_running () = !continue_running diff --git a/server/sexp.ml b/server/sexp.ml index 05da56b..63ca094 100644 --- a/server/sexp.ml +++ b/server/sexp.ml @@ -22,33 +22,39 @@ open Lwt_io exception Syntax_error of string -type display_hint_t = {hint : string; body : string} +type display_hint_t = {hint : bytes; body : bytes} and t = - | Str of string + | Str of bytes | Hint of display_hint_t | Arr of t list -let compare a b = Pervasives.compare a b +let emptystr = Str Bytes.empty +let litstr s = Str (Bytes.unsafe_of_string s) +let str s = Str (Bytes.of_string s) + +let compare a b = Stdlib.compare a b let digit_val c = (int_of_char c) - (int_of_char '0') let val_digit n = char_of_int (n + 48) -let intstr = +let bytes_of_int = let siz = 40 in - let buf = String.make siz (* enough for 128 bits *) ' ' in + let buf = Bytes.make siz (* enough for 128 bits *) ' ' in function - | 0 -> "0" + | 0 -> Bytes.of_string "0" | n -> let rec loop n i = if n = 0 - then String.sub buf (siz - i) i - else (String.unsafe_set buf (siz - i - 1) (val_digit (n mod 10)); + then Bytes.sub buf (siz - i) i + else (Bytes.unsafe_set buf (siz - i - 1) (val_digit (n mod 10)); loop (n / 10) (i + 1)) in loop n 0 +let int_of_bytes bs = int_of_string (Bytes.to_string bs) + let write_simple_string write s = - lwt () = write (intstr (String.length s)) in - lwt () = write ":" in + lwt () = write (bytes_of_int (Bytes.length s)) in + lwt () = write (Bytes.of_string ":") in write s let generic_output_sexp write x = @@ -58,17 +64,19 @@ let generic_output_sexp write x = | Str s -> writestr s | Hint {hint = h; body = b} -> - lwt () = write "[" in + lwt () = write (Bytes.of_string "[") in lwt () = writestr h in - lwt () = write "]" in + lwt () = write (Bytes.of_string "]") in writestr b | Arr xs -> - lwt () = write "(" in + lwt () = write (Bytes.of_string "(") in lwt () = Lwt_list.iter_s walk xs in - write ")" + write (Bytes.of_string ")") in walk x -let output_sexp ch x = generic_output_sexp (write ch) x +let write_bytes ch bs = write_from_exactly ch bs 0 (Bytes.length bs) + +let output_sexp ch x = generic_output_sexp (write_bytes ch) x let stream_of_sexp x = Streamutil.stream_generator (fun yield -> generic_output_sexp yield x) let write_char_escaped ch c = @@ -78,7 +86,7 @@ let write_char_escaped ch c = let write_simple_string_human ch s = lwt () = write_char ch '\"' in - lwt () = write ch (String.escaped s) in + lwt () = write_bytes ch (Bytes.escaped s) in write_char ch '\"' let rec output_sexp_human ch x = @@ -109,7 +117,7 @@ let char_numeric c = '0' <= c && c <= '9' let char_whitespace c = c <= ' ' let input_bytes ch count = - let buf = String.create count in (* mutable strings?!?! *) + let buf = Bytes.create count in lwt () = read_into_exactly ch buf 0 count in return buf @@ -154,8 +162,8 @@ let parse b = input_sexp_outer (fun () -> return (Ibuffer.next_char b)) (fun count -> return (Ibuffer.next_chars b count)) -let sexp_of_string s = parse (Ibuffer.of_string s) -let string_of_sexp x = Streamutil.stream_to_string (stream_of_sexp x) +let sexp_of_bytes s = parse (Ibuffer.of_bytes s) +let bytes_of_sexp x = Streamutil.stream_to_bytes (stream_of_sexp x) let assoc' key v = match v with diff --git a/server/sexpjson.ml b/server/sexpjson.ml index ea5fc5b..8f78f1e 100644 --- a/server/sexpjson.ml +++ b/server/sexpjson.ml @@ -2,29 +2,30 @@ open Sexp let rec sexp_of_json j = match j with - | Json.Num f -> Hint {hint = "num"; body = Json.to_string j} - | Json.Str s -> Str s + | Json.Num f -> Hint {hint = Bytes.of_string "num"; body = Bytes.of_string (Json.to_string j)} + | Json.Str s -> str s | Json.Arr js -> Arr (List.map sexp_of_json js) - | Json.Rec kvs -> Arr ((Hint {hint = "obj"; body = ""}) :: - (List.map (fun (k, v) -> Arr [Str k; sexp_of_json v]) kvs)) - | Json.Flg f -> Hint {hint = "bool"; body = string_of_bool f} - | Json.Nil -> Hint {hint = "null"; body = ""} + | Json.Rec kvs -> Arr ((Hint {hint = Bytes.of_string "obj"; body = Bytes.empty}) :: + (List.map (fun (k, v) -> Arr [str k; sexp_of_json v]) kvs)) + | Json.Flg f -> Hint {hint = (Bytes.of_string "bool"); body = (Bytes.of_string (string_of_bool f))} + | Json.Nil -> Hint {hint = (Bytes.of_string "null"); body = Bytes.empty} let json_of_sexp x = let rec walk x = match x with - | Hint {hint = "num"; body = n} -> Json.Num (float_of_string n) - | Str s -> Json.Str s - | Arr ((Hint {hint = "obj"; body = ""}) :: kvs) -> + | Hint {hint = hint; body = n} when hint = Bytes.of_string "num" -> + Json.Num (float_of_string (Bytes.to_string n)) + | Str s -> Json.Str (Bytes.to_string s) + | Arr ((Hint {hint = hint; body = bs}) :: kvs) when hint = Bytes.of_string "obj" && bs = Bytes.empty -> Json.Rec (List.map (fun kv -> (match kv with - | Arr [Str k; v] -> (k, walk v) + | Arr [Str k; v] -> (Bytes.to_string k, walk v) | _ -> raise (Syntax_error "Bad JSON-SEXP key-value"))) kvs) | Arr xs -> Json.Arr (List.map walk xs) - | Hint {hint = "bool"; body = bs} -> Json.Flg (bool_of_string bs) - | Hint {hint = "null"; body = ""} -> Json.Nil - | Hint {hint = h; body = b} -> Json.Rec ["_hint", Json.Str h; "_body", Json.Str b] + | Hint {hint = hint; body = bs} when hint = Bytes.of_string "bool" -> Json.Flg (bool_of_string (Bytes.to_string bs)) + | Hint {hint = hint; body = bs} when hint = Bytes.of_string "null" && bs = Bytes.empty -> Json.Nil + | Hint {hint = h; body = b} -> Json.Rec ["_hint", Json.Str (Bytes.to_string h); "_body", Json.Str (Bytes.to_string b)] in Lwt.wrap1 walk x diff --git a/server/streamutil.ml b/server/streamutil.ml index 821beee..d63bf98 100644 --- a/server/streamutil.ml +++ b/server/streamutil.ml @@ -17,9 +17,9 @@ open Lwt -let stream_to_string s = +let stream_to_bytes s = lwt pieces = Lwt_stream.to_list s in - return (String.concat "" pieces) + return (Bytes.concat Bytes.empty pieces) let stream_generator f = let mbox = Lwt_mvar.create_empty () in @@ -27,3 +27,5 @@ let stream_generator f = ignore (lwt () = f yield in Lwt_mvar.put mbox None); Lwt_stream.from (fun () -> Lwt_mvar.take mbox) + +let stream_encode s = Lwt_stream.map Bytes.of_string s diff --git a/server/subscription.ml b/server/subscription.ml index c462b23..51723f0 100644 --- a/server/subscription.ml +++ b/server/subscription.ml @@ -32,29 +32,29 @@ type creation_t = type set_t = { mutable subscription_table: Uuid.t SexpMap.t; - mutable uuid_table: t StringMap.t + mutable uuid_table: t BytesMap.t } let new_set () = { subscription_table = SexpMap.empty; - uuid_table = StringMap.empty + uuid_table = BytesMap.empty } let count subs = SexpMap.cardinal subs.subscription_table -let key_from sink_str name filter = Sexp.Arr [Sexp.Str sink_str; name; filter] +let key_from sink_bs name filter = Sexp.Arr [Sexp.Str sink_bs; name; filter] let create source subs filter sink_str name reply_sink reply_name = let key = key_from sink_str name filter in try let uuid = SexpMap.find key subs.subscription_table in lwt () = - Node.post_ignore' reply_sink reply_name (Message.subscribe_ok (Sexp.Str uuid)) (Sexp.Str "") + Node.post_ignore' reply_sink reply_name (Message.subscribe_ok (Sexp.str uuid)) Sexp.emptystr in - return (Old (StringMap.find uuid subs.uuid_table)) + return (Old (BytesMap.find (Bytes.of_string uuid) subs.uuid_table)) with Not_found -> let uuid = Uuid.create () in - let sink = Node.name_of_string sink_str in + let sink = Node.name_of_bytes sink_str in let sub = { live = true; uuid = uuid; @@ -62,19 +62,19 @@ let create source subs filter sink_str name reply_sink reply_name = sink = sink; name = name } in - subs.uuid_table <- StringMap.add uuid sub subs.uuid_table; + subs.uuid_table <- BytesMap.add (Bytes.of_string uuid) sub subs.uuid_table; subs.subscription_table <- SexpMap.add key uuid subs.subscription_table; lwt () = Lwt.join [ Meta.announce_subscription source filter sink_str name true; - Node.post_ignore' reply_sink reply_name (Message.subscribe_ok (Sexp.Str uuid)) (Sexp.Str "") + Node.post_ignore' reply_sink reply_name (Message.subscribe_ok (Sexp.str uuid)) Sexp.emptystr ] in return (New sub) let delete source subs uuid = try - let sub = StringMap.find uuid subs.uuid_table in + let sub = BytesMap.find uuid subs.uuid_table in sub.live <- false; - subs.uuid_table <- StringMap.remove uuid subs.uuid_table; + subs.uuid_table <- BytesMap.remove uuid subs.uuid_table; let key = key_from sub.sink.Node.label sub.name sub.filter in subs.subscription_table <- SexpMap.remove key subs.subscription_table; lwt () = Meta.announce_subscription source sub.filter sub.sink.Node.label sub.name false in @@ -83,16 +83,16 @@ let delete source subs uuid = return None let lookup subs uuid = - try Some (StringMap.find uuid subs.uuid_table) + try Some (BytesMap.find uuid subs.uuid_table) with Not_found -> None let send_to_subscription' sub body delete_action = if not sub.live then return false else - match_lwt Node.post sub.sink sub.name body (Sexp.Str sub.uuid) with + match_lwt Node.post sub.sink sub.name body (Sexp.str sub.uuid) with | true -> return true | false -> (lwt _ = delete_action sub.uuid in return false) let send_to_subscription source subs sub body = - send_to_subscription' sub body (fun (uuid) -> delete source subs uuid) + send_to_subscription' sub body (fun (uuid) -> delete source subs (Bytes.of_string uuid)) diff --git a/server/ui_main.ml b/server/ui_main.ml index 68bd21b..a8ada36 100644 --- a/server/ui_main.ml +++ b/server/ui_main.ml @@ -56,7 +56,7 @@ let handle_req id r = let start (s, peername) = let id = "http-" ^ Uuid.create () in - Util.create_thread (Connections.endpoint_name peername ^ " HTTP service") + Util.create_thread (Bytes.of_string (Connections.endpoint_name peername ^ " HTTP service")) None (Httpd.main (handle_req id)) (s, peername) @@ -67,7 +67,7 @@ let api_server_stats _ id r = ["connection_count", Json.Num (float_of_int !Connections.connection_count); "boot_time", Json.Num boot_time; "uptime", Json.Num (Unix.time () -. boot_time); - "classes", Json.Arr (List.map Json.str (Factory.all_class_names ()))]) + "classes", Json.Arr (List.map Json.str (List.map Bytes.to_string (Factory.all_class_names ())))]) |> Httpd.add_date_header let api_nodes _ id r = @@ -80,16 +80,17 @@ let api_nodes _ id r = Json.resp_ok [] (Json.Rec (List.map - (fun (class_name, node_names) -> (class_name, Json.Arr (List.map Json.str node_names))) - (StringMap.bindings info))) + (fun (class_name, node_names) -> (Bytes.to_string class_name, + Json.Arr (List.map Json.str (List.map Bytes.to_string node_names)))) + (BytesMap.bindings info))) |> Httpd.add_date_header let api_node_info suffix id r = - (match Node.lookup (Node.name_of_string suffix) with + (match Node.lookup (Node.name_of_bytes (Bytes.of_string suffix)) with | Some n -> Json.resp_ok [] (Json.Rec - ["names", Json.Arr (List.map Json.str (StringSet.elements n.Node.names)); - "class_name", Json.Str n.Node.class_name]) + ["names", Json.Arr (List.map Json.str (List.map Bytes.to_string (BytesSet.elements n.Node.names))); + "class_name", Json.Str (Bytes.to_string n.Node.class_name)]) | None -> Json.resp 404 "No such node name" [] Json.Nil) |> Httpd.add_date_header @@ -99,4 +100,4 @@ let init () = register_dispatcher ("/_/nodes", api_nodes); register_dispatcher ("/_/node/", api_node_info); let port = Config.get_int "http.port" 5678 in - Util.create_daemon_thread "HTTP listener" None (Net.start_net "HTTP" port) start + Util.create_daemon_thread (Bytes.of_string "HTTP listener") None (Net.start_net "HTTP" port) start diff --git a/server/ui_relay.ml b/server/ui_relay.ml index 25a4d55..e0d39d8 100644 --- a/server/ui_relay.ml +++ b/server/ui_relay.ml @@ -19,20 +19,20 @@ open Lwt open Hof open Datastructures -let all_sources = ref StringMap.empty +let all_sources = ref BytesMap.empty let rec api_tap_source id r = let mbox = Lwt_mvar.create (Some (Message.subscribe (Sexp.Str (Node.local_container_name()), - Sexp.Str "", Sexp.Str "", - Sexp.Str "", Sexp.Str ""))) in + Sexp.emptystr, Sexp.emptystr, + Sexp.emptystr, Sexp.emptystr))) in let handle_message n sexp = Lwt_mvar.put mbox (Some sexp) in - let n = Node.make "http_tap" handle_message in - all_sources := StringMap.add id n !all_sources; + let n = Node.make (Bytes.of_string "http_tap") handle_message in + all_sources := BytesMap.add id n !all_sources; let shutdown () = - all_sources := StringMap.remove id !all_sources; + all_sources := BytesMap.remove id !all_sources; lwt () = Node.unbind_all n in Lwt_mvar.put mbox None in @@ -40,10 +40,11 @@ let rec api_tap_source id r = let generator yield = let body_counter = ref 0 in let yield_and_count s = - body_counter := String.length s + !body_counter; - yield s + let bs = Bytes.of_string s in + body_counter := Bytes.length bs + !body_counter; + yield bs in - lwt () = yield_and_count (id ^ ";" ^ String.make 2048 'h' ^ ";") in + lwt () = yield_and_count ((Bytes.to_string id) ^ ";" ^ String.make 2048 'h' ^ ";") in let rec drain_mbox () = match_lwt Lwt_mvar.take mbox with | None -> return () @@ -74,20 +75,20 @@ let dispatch_message n m = lwt () = Node.send_ignore' name body in Httpd.resp_generic 202 "Accepted" [] (Httpd.empty_content) | Message.Subscribe (Sexp.Str filter, sink, name, Sexp.Str reply_sink, Sexp.Str reply_name) -> - (match_lwt Node.bind (Node.name_of_string filter, n) with + (match_lwt Node.bind (Node.name_of_bytes filter, n) with | true -> lwt () = Node.post_ignore' reply_sink (Sexp.Str reply_name) (Message.subscribe_ok (Sexp.Str filter)) - (Sexp.Str "") + Sexp.emptystr in Httpd.resp_generic 204 "Bound" [] (Httpd.empty_content) | false -> lwt () = Log.warn "Bind failed" [Sexp.Str filter] in Httpd.http_error_html 409 "Bind failed" []) | Message.Unsubscribe (Sexp.Str token) -> - (match_lwt Node.unbind (Node.name_of_string token) with + (match_lwt Node.unbind (Node.name_of_bytes token) with | true -> Httpd.resp_generic 204 "Unbound" [] (Httpd.empty_content) | false -> @@ -97,13 +98,13 @@ let dispatch_message n m = Httpd.http_error_html 406 "Message not understood" [] let api_tap_sink irrelevant_id r = - lwt content = Httpd.string_of_content r.Httpd.req_body.Httpd.content in - lwt params = Httpd.parse_urlencoded content in + lwt content = Httpd.bytes_of_content r.Httpd.req_body.Httpd.content in + lwt params = Httpd.parse_urlencoded (Bytes.to_string content) in match Httpd.find_param "metadata.type" params with | Some (Some "send") -> (match Httpd.find_param "metadata.id" params with | Some (Some id) -> - (match (try Some (StringMap.find id !all_sources) with Not_found -> None) with + (match (try Some (BytesMap.find (Bytes.of_string id) !all_sources) with Not_found -> None) with | Some n -> (match Httpd.find_param "data" params with | Some (Some data_str) -> @@ -123,8 +124,8 @@ let api_tap_sink irrelevant_id r = let api_tap _ id r = match r.Httpd.verb with - | "GET" -> api_tap_source id r - | "POST" -> api_tap_sink id r + | "GET" -> api_tap_source (Bytes.of_string id) r + | "POST" -> api_tap_sink (Bytes.of_string id) r | _ -> Httpd.http_error_html 400 "Unsupported tap method" [] let init () = diff --git a/server/util.ml b/server/util.ml index 7724d6e..7571be1 100644 --- a/server/util.ml +++ b/server/util.ml @@ -20,14 +20,14 @@ open Sexp open Printf let message_not_understood context m = - Log.warn "Message not understood" [Str context; Message.sexp_of_message m] + Log.warn "Message not understood" [str context; Message.sexp_of_message m] let create_thread name cleanup main initarg = let guarded_main initarg = try_lwt main initarg with e -> - lwt () = Log.warn "Thread died with exception" [Str name; Str (Printexc.to_string e)] in + lwt () = Log.warn "Thread died with exception" [Str name; str (Printexc.to_string e)] in (match cleanup with | Some cleaner -> cleaner e | None -> return ()) @@ -38,7 +38,7 @@ let daemon_thread_died name nested_cleaner e = lwt () = (match nested_cleaner with | Some c -> c e | None -> return ()) in - Server_control.shutdown_now [Sexp.Str "Daemon thread exited"; Sexp.Str name]; + Server_control.shutdown_now [litstr "Daemon thread exited"; Str name]; return () let create_daemon_thread name cleanup main initarg = @@ -107,7 +107,7 @@ let stream_generator f = let file_contents filename = let ch = open_in filename in let len = in_channel_length ch in - let buf = String.make len ' ' in + let buf = Bytes.make len ' ' in really_input ch buf 0 len; close_in ch; buf