Improve and re-use happy-eyeballs
This commit is contained in:
parent
e2ba31f00d
commit
3fa3cccb4c
@ -1,4 +1,4 @@
|
||||
# A simple HTTP client (http/1.1 & h2) with [Miou][miou]
|
||||
# A simple HTTP client/server (http/1.1 & h2) with [Miou][miou]
|
||||
|
||||

|
||||
|
||||
|
||||
34
app/pars.ml
34
app/pars.ml
@ -22,9 +22,9 @@ let reporter ppf =
|
||||
{ Logs.report }
|
||||
|
||||
let () = Fmt_tty.setup_std_outputs ~style_renderer:`Ansi_tty ~utf_8:true ()
|
||||
let () = Logs.set_reporter (reporter Fmt.stderr)
|
||||
|
||||
(* let () = Logs.set_level ~all:true (Some Logs.Debug) *)
|
||||
(* let () = Logs.set_reporter (reporter Fmt.stderr) *)
|
||||
let () = Logs.set_level ~all:true (Some Logs.Debug)
|
||||
let () = Logs_threaded.enable ()
|
||||
let () = Printexc.record_backtrace true
|
||||
|
||||
@ -62,7 +62,7 @@ let epr fmt =
|
||||
|
||||
type reporter = All_knowning of int Progress.Reporter.t | Unknown of int
|
||||
|
||||
let download ~orphans ~events ~uid ~uri =
|
||||
let download ~orphans ~events ~uid ~resolver ~uri =
|
||||
let _prm =
|
||||
Miou.call ~orphans @@ fun () ->
|
||||
let got_response = ref false in
|
||||
@ -75,7 +75,7 @@ let download ~orphans ~events ~uid ~uri =
|
||||
end;
|
||||
Miou.Queue.enqueue events (`Data (uid, String.length str))
|
||||
in
|
||||
match Httpcats.request ~uri ~f () with
|
||||
match Httpcats.request ~resolver ~uri ~f () with
|
||||
| Ok (_response, ()) -> Miou.Queue.enqueue events (`End uid)
|
||||
| Error err -> Miou.Queue.enqueue events (`Error (uid, err))
|
||||
in
|
||||
@ -96,9 +96,10 @@ type t =
|
||||
; reporters : reporter array
|
||||
; display : display
|
||||
; align : int
|
||||
; resolver : Happy.stack
|
||||
}
|
||||
|
||||
let make ~filenames =
|
||||
let make ~resolver ~filenames =
|
||||
let gen = Atomic.make 0 in
|
||||
let orphans = Miou.orphans () in
|
||||
let events = Miou.Queue.create () in
|
||||
@ -116,7 +117,7 @@ let make ~filenames =
|
||||
0 filenames
|
||||
in
|
||||
let [] = Progress.Display.reporters display in
|
||||
{ gen; orphans; events; reporters; display; align }
|
||||
{ gen; orphans; events; reporters; display; align; resolver }
|
||||
|
||||
let get_length { Httpcats.headers; _ } =
|
||||
let headers = H2.Headers.to_list headers in
|
||||
@ -165,10 +166,13 @@ let consume t events =
|
||||
| All_knowning reporter -> Progress.Reporter.finalise reporter
|
||||
| _ -> ())
|
||||
| `Error (uid, err) ->
|
||||
Progress.interject_with (fun () ->
|
||||
Fmt.pr "[%a]: %a\n%!"
|
||||
Fmt.(styled `Red string)
|
||||
"ERROR" Httpcats.pp_error err);
|
||||
Logs.err (fun m ->
|
||||
m "Got an error for (%04d): %a" uid Httpcats.pp_error err)
|
||||
in
|
||||
Logs.debug (fun m -> m "Handle %d event(s)" (List.length events));
|
||||
List.iter handle events
|
||||
|
||||
let rec run t uris () =
|
||||
@ -181,7 +185,7 @@ let rec run t uris () =
|
||||
| None, uri :: rest ->
|
||||
download ~orphans:t.orphans ~events:t.events
|
||||
~uid:(Atomic.fetch_and_add t.gen 1)
|
||||
~uri;
|
||||
~resolver:t.resolver ~uri;
|
||||
let events' = Miou.Queue.(to_list (transfer t.events)) in
|
||||
consume t events';
|
||||
run t rest ()
|
||||
@ -189,7 +193,7 @@ let rec run t uris () =
|
||||
Option.iter Miou.await_exn prm;
|
||||
download ~orphans:t.orphans ~events:t.events
|
||||
~uid:(Atomic.fetch_and_add t.gen 1)
|
||||
~uri;
|
||||
~resolver:t.resolver ~uri;
|
||||
let events' = Miou.Queue.(to_list (transfer t.events)) in
|
||||
consume t events';
|
||||
run t rest ()
|
||||
@ -236,6 +240,14 @@ let () =
|
||||
uris
|
||||
in
|
||||
Logs.debug (fun m -> m "Got %d uri(s)" (List.length uris));
|
||||
let t = make ~filenames in
|
||||
let daemon, resolver = Happy.stack () in
|
||||
let nameservers =
|
||||
(`Tcp, [ `Plaintext (Ipaddr.of_string_exn "8.8.8.8", 53) ])
|
||||
in
|
||||
let dns = Dns_miou.create ~nameservers resolver in
|
||||
Happy.inject_resolver ~getaddrinfo:(getaddrinfo dns) resolver;
|
||||
let t = make ~resolver ~filenames in
|
||||
let prm = Miou.call_cc (run t uris) in
|
||||
Miou.await_exn prm
|
||||
let result = Miou.await prm in
|
||||
Happy.kill daemon;
|
||||
match result with Ok () -> () | Error exn -> raise exn
|
||||
|
||||
20
src/happy.ml
20
src/happy.ml
@ -13,7 +13,7 @@ let to_sockaddr (ipaddr, port) =
|
||||
Unix.ADDR_INET (Ipaddr_unix.to_inet_addr ipaddr, port)
|
||||
|
||||
let clock = Mtime_clock.elapsed_ns
|
||||
let he_timer_interval = Duration.(to_f (of_ms 100))
|
||||
let he_timer_interval = Duration.(to_f (of_ms 10))
|
||||
|
||||
type state =
|
||||
| In_progress
|
||||
@ -266,13 +266,13 @@ let await_actions t he () =
|
||||
in
|
||||
List.fold_left fold (he, []) user's_actions
|
||||
|
||||
let get_events t he ~prms =
|
||||
let rec get_events t he ~prms actions =
|
||||
match Option.map (handle t) (Option.join (Miou.care prms)) |> Option.join with
|
||||
| Some event ->
|
||||
let he, actions = Happy_eyeballs.event he (clock ()) event in
|
||||
let he, actions' = Happy_eyeballs.event he (clock ()) event in
|
||||
(* NOTE(dinosaure): prioritise event's actions. *)
|
||||
(he, actions)
|
||||
| None -> (he, [])
|
||||
get_events t he ~prms (actions @ actions')
|
||||
| None -> (he, actions)
|
||||
|
||||
exception Timeout
|
||||
|
||||
@ -284,7 +284,7 @@ let with_timeout ~timeout ?(give = []) fn =
|
||||
Miou.await_first [ Miou.call_cc timeout; Miou.call_cc ~give fn ]
|
||||
|
||||
let suspend t he ~prms =
|
||||
match get_events t he ~prms with
|
||||
match get_events t he ~prms [] with
|
||||
| he, (_ :: _ as actions) -> (he, actions)
|
||||
| he, [] -> (
|
||||
match with_timeout ~timeout:he_timer_interval (await_actions t he) with
|
||||
@ -322,8 +322,8 @@ and go t ~prms he () =
|
||||
Miou.yield ();
|
||||
go t ~prms he ()
|
||||
| _, actions ->
|
||||
let he, actions' = get_events t he ~prms in
|
||||
List.iter (handle_one_action ~prms t) (actions @ actions');
|
||||
let he, actions = get_events t he ~prms actions in
|
||||
List.iter (handle_one_action ~prms t) actions;
|
||||
Miou.yield ();
|
||||
go t ~prms he ()
|
||||
|
||||
@ -416,7 +416,9 @@ let rec read_loop ?(linger = Cstruct.empty) ~id proto fd =
|
||||
let len = Miou_unix.read fd ~off:0 ~len:(Bytes.length buf) buf in
|
||||
Log.debug (fun m -> m "got %d byte(s) from the resolver" len);
|
||||
if len > 0 then process (Cstruct.of_bytes ~off:0 ~len buf)
|
||||
else failwith "End of file reading from resolver"
|
||||
else
|
||||
Fmt.failwith "End of file reading from resolver (linger: %d byte(s))"
|
||||
(Cstruct.length linger)
|
||||
|
||||
external happy_translate_so_type : int -> Unix.socket_type
|
||||
= "happy_translate_so_type"
|
||||
|
||||
@ -232,34 +232,28 @@ let alpn_protocol = function
|
||||
None
|
||||
| None -> None)
|
||||
|
||||
let connect ?port ?tls_config host =
|
||||
let connect ?port ?tls_config ~happy_eyeballs host =
|
||||
let port =
|
||||
match (port, tls_config) with
|
||||
| None, None -> 80
|
||||
| None, Some _ -> 443
|
||||
| Some port, _ -> port
|
||||
in
|
||||
match Unix.gethostbyname host with
|
||||
| { Unix.h_addr_list; _ } when Array.length h_addr_list > 0 ->
|
||||
let inet_addr = h_addr_list.(0) in
|
||||
let fd =
|
||||
if Unix.is_inet6_addr inet_addr then Miou_unix.tcpv6 ()
|
||||
else Miou_unix.tcpv4 ()
|
||||
in
|
||||
let sockaddr = Unix.ADDR_INET (inet_addr, port) in
|
||||
begin
|
||||
match (Miou_unix.connect fd sockaddr, tls_config) with
|
||||
| exception _ -> error_msgf "Impossible to connect to %s" host
|
||||
| (), None -> Ok (`Tcp fd)
|
||||
| (), Some tls_config ->
|
||||
let ( >>= ) = Result.bind in
|
||||
Http_miou_unix.to_tls tls_config fd
|
||||
|> Result.map_error (fun err -> `Tls err)
|
||||
>>= fun socket -> Ok (`Tls socket)
|
||||
end
|
||||
| _ -> error_msgf "%s not found" host
|
||||
match (Happy.connect_endpoint happy_eyeballs host [ port ], tls_config) with
|
||||
| Ok ((ipaddr, port), file_descr), None ->
|
||||
Log.debug (fun m -> m "connect to %a:%d" Ipaddr.pp ipaddr port);
|
||||
Ok (`Tcp file_descr)
|
||||
| Ok ((ipaddr, port), file_descr), Some tls_config ->
|
||||
Log.debug (fun m ->
|
||||
m "connect to %a:%d and start to upgrade to tls" Ipaddr.pp ipaddr port);
|
||||
let ( >>= ) = Result.bind in
|
||||
Http_miou_unix.to_tls tls_config file_descr
|
||||
|> Result.map_error (fun err -> `Tls err)
|
||||
>>= fun file_descr -> Ok (`Tls file_descr)
|
||||
| (Error _ as err), _ -> err
|
||||
|
||||
let single_request ?http_config tls_config ~meth ~headers ?body uri f acc =
|
||||
let single_request ~happy_eyeballs ?http_config tls_config ~meth ~headers ?body
|
||||
uri f acc =
|
||||
let ( let* ) = Result.bind in
|
||||
let ( let+ ) x f = Result.map f x in
|
||||
let* tls, scheme, user_pass, host, port, path = decode_uri uri in
|
||||
@ -276,7 +270,7 @@ let single_request ?http_config tls_config ~meth ~headers ?body uri f acc =
|
||||
| `Default cfg, _ -> Some cfg
|
||||
else Ok None
|
||||
in
|
||||
let* flow = connect ?port ?tls_config host in
|
||||
let* flow = connect ?port ?tls_config ~happy_eyeballs host in
|
||||
match (alpn_protocol flow, http_config) with
|
||||
| (Some `HTTP_1_1 | None), Some (`HTTP_1_1 config) ->
|
||||
single_http_1_1_request ~config flow user_pass host meth path headers body
|
||||
@ -306,7 +300,8 @@ let resolve_location ~uri ~location =
|
||||
| _ -> error_msgf "Unknown location (relative path): %S" location
|
||||
|
||||
let request ?config ?tls_config ?authenticator ?(meth = `GET) ?(headers = [])
|
||||
?body ?(max_redirect = 5) ?(follow_redirect = true) ~f ~uri acc =
|
||||
?body ?(max_redirect = 5) ?(follow_redirect = true) ~resolver:happy_eyeballs
|
||||
~f ~uri acc =
|
||||
let tls_config =
|
||||
match tls_config with
|
||||
| Some cfg -> Ok (`Custom cfg)
|
||||
@ -333,14 +328,16 @@ let request ?config ?tls_config ?authenticator ?(meth = `GET) ?(headers = [])
|
||||
| None -> None
|
||||
in
|
||||
if not follow_redirect then
|
||||
single_request ?http_config tls_config ~meth ~headers ?body uri f acc
|
||||
single_request ~happy_eyeballs ?http_config tls_config ~meth ~headers ?body
|
||||
uri f acc
|
||||
else
|
||||
let ( >>= ) = Result.bind in
|
||||
let rec follow_redirect count uri =
|
||||
if count = 0 then Error (`Msg "Redirect limit exceeded")
|
||||
else
|
||||
match
|
||||
single_request ?http_config tls_config ~meth ~headers ?body uri f acc
|
||||
single_request ~happy_eyeballs ?http_config tls_config ~meth ~headers
|
||||
?body uri f acc
|
||||
with
|
||||
| Error _ as err -> err
|
||||
| Ok (resp, body) ->
|
||||
|
||||
@ -44,6 +44,7 @@ val request :
|
||||
-> ?body:string
|
||||
-> ?max_redirect:int
|
||||
-> ?follow_redirect:bool
|
||||
-> resolver:Happy.stack
|
||||
-> f:(response -> 'a -> string -> 'a)
|
||||
-> uri:string
|
||||
-> 'a
|
||||
|
||||
@ -53,21 +53,24 @@ let test00 =
|
||||
Httpcats.Server.string ~headers ~status:`OK body
|
||||
in
|
||||
let stop, prm = server ~port:4000 handler in
|
||||
let daemon, resolver = Happy.stack () in
|
||||
match
|
||||
Httpcats.request
|
||||
Httpcats.request ~resolver
|
||||
~f:(fun _resp buf str ->
|
||||
Buffer.add_string buf str;
|
||||
buf)
|
||||
~uri:"http://localhost:4000/" (Buffer.create 0x10)
|
||||
~uri:"http://127.0.0.1:4000/" (Buffer.create 0x10)
|
||||
with
|
||||
| Ok (_response, buf) ->
|
||||
Alcotest.(check string)
|
||||
"Hello World!" (Buffer.contents buf) "Hello World!";
|
||||
Atomic.set stop true;
|
||||
Miou.await_exn prm
|
||||
Miou.await_exn prm;
|
||||
Happy.kill daemon
|
||||
| Error err ->
|
||||
Atomic.set stop true;
|
||||
Miou.await_exn prm;
|
||||
Happy.kill daemon;
|
||||
Alcotest.failf "Got an error: %a" Httpcats.pp_error err
|
||||
|
||||
let generate g len =
|
||||
@ -105,24 +108,26 @@ let test01 =
|
||||
go max
|
||||
in
|
||||
let stop, prm = server ~port:4000 handler in
|
||||
let daemon, resolver = Happy.stack () in
|
||||
match
|
||||
Httpcats.request
|
||||
Httpcats.request ~resolver
|
||||
~f:(fun _resp buf str ->
|
||||
Buffer.add_string buf str;
|
||||
buf)
|
||||
~uri:"http://localhost:4000" (Buffer.create 0x1000)
|
||||
~uri:"http://127.0.0.1:4000" (Buffer.create 0x1000)
|
||||
with
|
||||
| Ok (_response, buf) ->
|
||||
Alcotest.(check string) "random" (generate g1 max) (Buffer.contents buf);
|
||||
Atomic.set stop true;
|
||||
Miou.await_exn prm
|
||||
Miou.await_exn prm;
|
||||
Happy.kill daemon
|
||||
| Error err ->
|
||||
Atomic.set stop true;
|
||||
Miou.await_exn prm;
|
||||
Happy.kill daemon;
|
||||
Alcotest.failf "Got an error: %a" Httpcats.pp_error err
|
||||
|
||||
let () =
|
||||
let stdout = Alcotest_engine.Global.make_stdout () in
|
||||
let stderr = Alcotest_engine.Global.make_stderr () in
|
||||
Alcotest.run ~stdout ~stderr "network"
|
||||
[ ("simple", [ test00; test01 ]) ]
|
||||
Alcotest.run ~stdout ~stderr "network" [ ("simple", [ test00; test01 ]) ]
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user