First commit

This commit is contained in:
Calascibetta Romain 2023-10-03 12:15:25 +02:00
commit 228a334df2
No known key found for this signature in database
GPG Key ID: 8CC4DC3365A666B0
19 changed files with 1940 additions and 0 deletions

7
.gitignore vendored Normal file
View File

@ -0,0 +1,7 @@
_build
*.native
*.byte
*.so
*.tar.gz
*.merlin
*.install

1
.ocamlformat Normal file
View File

@ -0,0 +1 @@
exp-grouping=preserve

31
LICENSE.md Normal file
View File

@ -0,0 +1,31 @@
Copyright (c) 2016, Inhabited Type LLC
Copyright (c) 2023, Robur
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
1. Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
3. Neither the name of the author nor the names of his contributors
may be used to endorse or promote products derived from this software
without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE CONTRIBUTORS ``AS IS'' AND ANY EXPRESS
OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.

28
README.md Normal file
View File

@ -0,0 +1,28 @@
# A simple HTTP client (http/1.1 & h2) with [Miou][miou]
```ocaml
let () =
Miou_unix.run @@ fun () ->
let daemon, resolver = Happy.stack () in (* happy-eyeballs *)
let dns = Dns_miou.create resolve in
Happy.inject_resolver ~getaddrinfo:(getaddrinfo dns) resolver; (* dns x happy-eyeballs *)
let acc = Buffer.create 0x100 in
let f buf str = Buffer.add_string buf str; buf in
begin match Httpcats.request ~resolver ~f ~uri:"https://blog.osau.re/" acc with
| Ok (_response, buf) ->
Format.printf "%s%!" (Buffer.contents buf)
| Error err ->
Format.eprintf "Got an error: %a\n%!" Httpcats.pp_error err
end;
Happy.kill daemon
```
**NOTE**: it requires the upstream version of `miou`!
- [ ] Fix the issue between HTTP/1.1 and TLS (and close-notify)
- [ ] Implement some tests
- [ ] Documentation (.ml & .mli)
- [ ] DNS resolution over UDP
- [ ] DNS over TLS
[miou]: https://github.com/robur-coop/miou

5
app/dune Normal file
View File

@ -0,0 +1,5 @@
(executable
(name hurle)
(modules hurle)
(libraries logs.fmt fmt.tty logs.threaded mirage-crypto-rng.unix httpcats
httpcats.happy hxd.string hxd.core))

74
app/hurle.ml Normal file
View File

@ -0,0 +1,74 @@
let anchor = Unix.gettimeofday ()
let sigpipe = 13
let () = Sys.set_signal sigpipe Sys.Signal_ignore
let reporter ppf =
let report src level ~over k msgf =
let k _ =
over ();
k ()
in
let with_metadata header _tags k ppf fmt =
Format.kfprintf k ppf
("[%a]%a[%a][%a]: " ^^ fmt ^^ "\n%!")
Fmt.(styled `Cyan (fmt "%.04f"))
(Unix.gettimeofday () -. anchor)
Logs_fmt.pp_header (level, header)
Fmt.(styled `Blue int)
(Stdlib.Domain.self () :> int)
Fmt.(styled `Magenta string)
(Logs.Src.name src)
in
msgf @@ fun ?header ?tags fmt -> with_metadata header tags k ppf fmt
in
{ Logs.report }
let () = Fmt_tty.setup_std_outputs ~style_renderer:`Ansi_tty ~utf_8:true ()
let () = Logs.set_reporter (reporter Fmt.stderr)
let () = Logs.set_level ~all:true (Some Logs.Debug)
let () = Logs_threaded.enable ()
let () = Printexc.record_backtrace true
let out = Mutex.create ()
let pr fmt =
let finally () = Mutex.unlock out in
Mutex.lock out;
Fun.protect ~finally @@ fun () -> Format.printf fmt
let epr fmt =
let finally () = Mutex.unlock out in
Mutex.lock out;
Fun.protect ~finally @@ fun () -> Format.eprintf fmt
let getaddrinfo dns =
{
Happy.getaddrinfo =
(fun record host -> Dns_miou.getaddrinfo dns record host);
}
let () = Mirage_crypto_rng_unix.initialize (module Mirage_crypto_rng.Fortuna)
let () =
Miou_unix.run @@ fun () ->
match Sys.argv with
| [| _; uri |] ->
let daemon, resolver = Happy.stack () in
let dns = Dns_miou.create resolver in
Happy.inject_resolver ~getaddrinfo:(getaddrinfo dns) resolver;
let acc = Buffer.create 0x100 in
let f buf str =
Buffer.add_string buf str;
buf
in
let prm =
Miou.call @@ fun () ->
match Httpcats.request ~resolver ~f ~uri acc with
| Ok (_response, buf) ->
pr "@[<hov>%a@]\n%!"
(Hxd_string.pp Hxd.default)
(Buffer.contents buf)
| Error err -> epr "Got an error: %a\n%!" Httpcats.pp_error err
in
Miou.await_exn prm;
Happy.kill daemon
| _ -> epr "%s <uri>\n%!" Sys.argv.(0)

2
dune-project Normal file
View File

@ -0,0 +1,2 @@
(lang dune 2.0)
(name http-client)

28
httpcats.opam Normal file
View File

@ -0,0 +1,28 @@
opam-version: "2.0"
maintainer: "Robur <team@robur.coop>"
authors: ["Robur <team@robur.coop>"]
homepage: "https://github.com/robur-coop/httpcats"
dev-repo: "git+https://github.com/robur-coop/httpcats.git"
bug-reports: "https://github.com/robur-coop/httpcats/issues"
license: "BSD-3-clause"
depends: [
"ocaml" {>= "5.0.0"}
"dune" {>= "2.0.0"}
"logs"
"miou"
"httpaf" {>= "0.7.0"}
"tls" {>= "0.16.0"}
"fmt"
"h2" {>= "0.10.0"}
]
conflicts: [ "result" {< "1.5"} ]
build: [
["dune" "subst"] {dev}
["dune" "build" "-p" name "-j" jobs]
]
synopsis: "A simple HTTP client using http/af, h2, and miou"
pin-depends: [
"miou.dev" "git+https://github.com/robur-coop/miou.git#55ff54ddfd585f52fcda4d56caf2458760a7f949"
]

1
src/dns_miou.ml Normal file
View File

@ -0,0 +1 @@
include Dns_client.Make (Happy)

17
src/dune Normal file
View File

@ -0,0 +1,17 @@
(library
(name httpcats)
(public_name httpcats)
(modules flow http_miou_unix httpcats tls_miou)
(libraries ca-certs httpaf h2 httpcats.happy tls miou miou.unix))
(library
(name happy)
(public_name httpcats.happy)
(modules happy dns_miou)
(foreign_stubs
(language c)
(names happy)
(flags (:standard)))
(wrapped false)
(libraries mtime.clock.os ipaddr.unix mirage-crypto-rng happy-eyeballs dns
dns-client miou miou.unix))

15
src/flow.ml Normal file
View File

@ -0,0 +1,15 @@
module type S = sig
type t
type error
val pp_error : error Fmt.t
val read :
?read_buffer_size:int ->
t ->
([ `Data of Cstruct.t | `End_of_input ], error) result
val writev : t -> Cstruct.t list -> (unit, error) result
val close : t -> unit
val shutdown : t -> [ `Recv | `Send ] -> unit
end

19
src/happy.c Normal file
View File

@ -0,0 +1,19 @@
#include <caml/memory.h>
#include <caml/fail.h>
extern int caml_unix_socket_type_table[];
CAMLprim value
happy_translate_so_type(value so_type) {
int n = Int_val (so_type);
int r = -1;
for (int i = 0; i < 4; i++)
if (caml_unix_socket_type_table[i] == n)
r = i;
if (r == -1)
caml_invalid_argument("invalid type of socket");
return Val_int (r);
}

493
src/happy.ml Normal file
View File

@ -0,0 +1,493 @@
let src = Logs.Src.create "happy"
module Log = (val Logs.src_log src : Logs.LOG)
let error_msgf fmt = Format.kasprintf (fun msg -> Error (`Msg msg)) fmt
let pp_sockaddr ppf = function
| Unix.ADDR_UNIX str -> Format.fprintf ppf "<%s>" str
| Unix.ADDR_INET (inet_addr, port) ->
Format.fprintf ppf "%s:%u" (Unix.string_of_inet_addr inet_addr) port
let to_sockaddr (ipaddr, port) =
Unix.ADDR_INET (Ipaddr_unix.to_inet_addr ipaddr, port)
let clock = Mtime_clock.elapsed_ns
let he_timer_interval = Duration.(to_f (of_ms 10))
type state =
| In_progress
| Connected of (Ipaddr.t * int) * Unix.file_descr
| Failed of string
type entry = Happy_eyeballs.id * attempt * [ `host ] Domain_name.t * addr
and attempt = int
and addr = Ipaddr.t * int
and cancel = attempt * [ `Connection of Miou_unix.file_descr ] Miou.t
and action =
[ `Connect_ip of state Atomic.t * addr list
| `Connect of state Atomic.t * [ `host ] Domain_name.t * int list ]
and value =
[ `Connection of Miou_unix.file_descr
| `Resolution_v4 of
[ `host ] Domain_name.t * (Ipaddr.V4.Set.t, [ `Msg of string ]) result
| `Resolution_v6 of
[ `host ] Domain_name.t * (Ipaddr.V6.Set.t, [ `Msg of string ]) result ]
and getaddrinfo = {
getaddrinfo :
'response 'a.
'response Dns.Rr_map.key ->
'a Domain_name.t ->
('response, [ `Msg of string ]) result;
}
[@@unboxed]
let dummy =
let getaddrinfo _ _ = Error (`Msg "Not implemented") in
{ getaddrinfo }
type stack = {
mutable cancel_connecting : cancel list Happy_eyeballs.Waiter_map.t;
mutable waiters : state Atomic.t Happy_eyeballs.Waiter_map.t;
condition : Miou_unix.Cond.t;
queue : action Miou.Queue.t;
connections : (Miou.Promise.Uid.t, entry) Hashtbl.t;
mutable getaddrinfo : getaddrinfo;
}
let create_stack () =
{
cancel_connecting = Happy_eyeballs.Waiter_map.empty;
waiters = Happy_eyeballs.Waiter_map.empty;
condition = Miou_unix.Cond.make ();
queue = Miou.Queue.create ();
connections = Hashtbl.create 0x100;
getaddrinfo = dummy;
}
let try_connect addr () =
let addr = to_sockaddr addr in
let socket =
match Unix.domain_of_sockaddr addr with
| Unix.PF_UNIX -> Fmt.invalid_arg "Invalid address: %a" pp_sockaddr addr
| Unix.PF_INET -> Miou_unix.tcpv4 ()
| Unix.PF_INET6 -> Miou_unix.tcpv6 ()
in
try
Log.debug (fun m -> m "connect to %a" pp_sockaddr addr);
Miou_unix.connect socket addr;
`Connection (Miou_unix.transfer socket)
with Unix.Unix_error (err, _, _) ->
Fmt.failwith "error connecting to nameserver %a: %s" pp_sockaddr addr
(Unix.error_message err)
let disown fd =
Miou_unix.disown fd;
Miou_unix.to_file_descr fd
let getpeername fd = try Some (Unix.getpeername fd) with _exn -> None
external to_cancel :
value Miou.t -> [ `Connection of Miou_unix.file_descr ] Miou.t = "%identity"
let connect t ~prms:orphans host id attempt addr =
let connection : value Miou.t = Miou.call ~orphans (try_connect addr) in
Hashtbl.add t.connections
(Miou.Promise.uid connection)
(id, attempt, host, addr);
let entry = ((attempt, to_cancel connection) :> cancel) in
t.cancel_connecting <-
Happy_eyeballs.Waiter_map.update id
(function None -> Some [ entry ] | Some cs -> Some (entry :: cs))
t.cancel_connecting
let handle_one_action t ~prms = function
| Happy_eyeballs.Connect (host, id, attempt, addr) ->
connect t ~prms host id attempt addr
| Happy_eyeballs.Connect_failed (host, id, reason) ->
Log.warn (fun m ->
m "connection to %a failed: %s" Domain_name.pp host reason);
let cancel_connecting, others =
Happy_eyeballs.Waiter_map.find_and_remove id t.cancel_connecting
in
t.cancel_connecting <- cancel_connecting;
List.iter
(fun (_, prm) -> Miou.cancel prm)
(Option.value ~default:[] others);
(* clean waiter *)
let waiters, waiter =
Happy_eyeballs.Waiter_map.find_and_remove id t.waiters
in
t.waiters <- waiters;
let msg =
Fmt.str "Connection to %a failed: %s" Domain_name.pp host reason
in
let transition waiter =
let set = Atomic.compare_and_set waiter In_progress (Failed msg) in
if not set then begin
match Atomic.get waiter with
| Connected (_, fd) ->
let sockaddr = getpeername fd in
let fd = Miou_unix.of_file_descr fd in
Log.warn (fun m ->
m "close the file-descriptor of %a (%a): %s" Domain_name.pp
host
Fmt.(option ~none:(const string "<none>") pp_sockaddr)
sockaddr reason);
Miou_unix.close fd;
Atomic.set waiter (Failed msg)
| In_progress -> Atomic.set waiter (Failed msg)
| Failed _ -> ()
end
in
Option.iter transition waiter
| Happy_eyeballs.Resolve_a host ->
let _ =
Miou.call_cc ~orphans:prms @@ fun () ->
match t.getaddrinfo.getaddrinfo Dns.Rr_map.A host with
| Ok (_ttl, res) -> `Resolution_v4 (host, Ok res)
| Error _ as err -> `Resolution_v4 (host, err)
in
()
| Happy_eyeballs.Resolve_aaaa host ->
let _ =
Miou.call_cc ~orphans:prms @@ fun () ->
match t.getaddrinfo.getaddrinfo Dns.Rr_map.Aaaa host with
| Ok (_ttl, res) -> `Resolution_v6 (host, Ok res)
| Error _ as err -> `Resolution_v6 (host, err)
in
()
let handle t prm =
let entry = Hashtbl.find_opt t.connections (Miou.Promise.uid prm) in
match (Miou.await prm, entry) with
| Error exn, Some (id, attempt, host, addr) ->
Hashtbl.remove t.connections (Miou.Promise.uid prm);
let msg =
match exn with
| Invalid_argument str | Failure str -> str
| Miou.Cancelled -> "cancelled"
| exn -> Printexc.to_string exn
in
let fold = function
| None -> None
| Some cs -> (
match List.filter (fun (att, _) -> not (att = attempt)) cs with
| [] -> None
| cs -> Some cs)
in
t.cancel_connecting <-
Happy_eyeballs.Waiter_map.update id fold t.cancel_connecting;
Some (Happy_eyeballs.Connection_failed (host, id, addr, msg))
| Ok (`Connection fd), Some (id, attempt, host, addr) ->
let cancel_connecting, others =
Happy_eyeballs.Waiter_map.find_and_remove id t.cancel_connecting
in
t.cancel_connecting <- cancel_connecting;
List.iter
(fun (att, prm) -> if att <> attempt then Miou.cancel prm)
(Option.value ~default:[] others);
let waiters, waiter =
Happy_eyeballs.Waiter_map.find_and_remove id t.waiters
in
t.waiters <- waiters;
let transition waiter =
Log.debug (fun m -> m "disown the file-descr of %a" Domain_name.pp host);
let fd = disown fd in
let connected = Connected (addr, fd) in
let set = Atomic.compare_and_set waiter In_progress connected in
Log.debug (fun m ->
m "transfer the file-descriptor for %a" Domain_name.pp host);
if not set then (
let fd = Miou_unix.of_file_descr fd in
Log.warn (fun m ->
m "close the file-descriptor of %a" Domain_name.pp host);
Miou_unix.close fd)
in
Option.iter transition waiter;
Some (Happy_eyeballs.Connected (host, id, addr))
| Ok (`Connection fd), None ->
Miou_unix.close fd;
None
| Ok (`Resolution_v4 (host, Ok ips)), _ ->
Log.debug (fun m -> m "%a resolved" Domain_name.pp host);
Some (Happy_eyeballs.Resolved_a (host, ips))
| Ok (`Resolution_v4 (host, Error (`Msg msg))), _ ->
Log.warn (fun m ->
m "impossible to resolve %a: %s" Domain_name.pp host msg);
Some (Happy_eyeballs.Resolved_a_failed (host, msg))
| Ok (`Resolution_v6 (host, Ok ips)), _ ->
Log.debug (fun m -> m "%a resolved" Domain_name.pp host);
Some (Happy_eyeballs.Resolved_aaaa (host, ips))
| Ok (`Resolution_v6 (host, Error (`Msg msg))), _ ->
Log.warn (fun m ->
m "impossible to resolve %a: %s" Domain_name.pp host msg);
Some (Happy_eyeballs.Resolved_aaaa_failed (host, msg))
| Error exn, None ->
Log.err (fun m ->
m "got an unexpected error from a promise: %S"
(Printexc.to_string exn));
None
let await_actions t he () =
Log.debug (fun m -> m "wait for user's actions");
let user's_actions =
Miou_unix.Cond.until
~predicate:(fun () ->
Log.debug (fun m ->
m "got an user's action? %b"
(Fun.negate Miou.Queue.is_empty t.queue));
Miou.Queue.is_empty t.queue)
~fn:(fun () -> Miou.Queue.(to_list (transfer t.queue)))
t.condition
in
Log.debug (fun m -> m "got %d action(s)" (List.length user's_actions));
let fold (he, actions) = function
| `Connect_ip (waiter, addrs) ->
let waiters, id = Happy_eyeballs.Waiter_map.register waiter t.waiters in
t.waiters <- waiters;
let he, actions' = Happy_eyeballs.connect_ip he (clock ()) ~id addrs in
Log.debug (fun m ->
m "+%d action(s) for connect-ip" (List.length actions'));
(he, actions @ actions')
| `Connect (waiter, host, ports) ->
let waiters, id = Happy_eyeballs.Waiter_map.register waiter t.waiters in
t.waiters <- waiters;
let he, actions' =
Happy_eyeballs.connect he (clock ()) ~id host ports
in
Log.debug (fun m ->
m "+%d action(s) for connect" (List.length actions'));
(he, actions @ actions')
in
List.fold_left fold (he, []) user's_actions
let get_events t he ~prms =
match Option.map (handle t) (Option.join (Miou.care prms)) |> Option.join with
| Some event ->
let he, actions = Happy_eyeballs.event he (clock ()) event in
(* NOTE(dinosaure): prioritise event's actions. *)
(he, actions)
| None -> (he, [])
exception Timeout
let with_timeout ~timeout ?(give = []) fn =
let timeout () =
Log.debug (fun m -> m "wait with a timeout (%fs)" timeout);
Miou_unix.sleep timeout;
Log.debug (fun m -> m "raise timeout");
List.iter Miou.Ownership.disown give;
raise Timeout
in
Miou.await_first [ Miou.call_cc ~give timeout; Miou.call_cc ~give fn ]
let suspend t he ~prms =
match get_events t he ~prms with
| he, (_ :: _ as actions) -> (he, actions)
| he, [] -> (
match with_timeout ~timeout:he_timer_interval (await_actions t he) with
| Error Timeout -> (he, [])
| Error exn ->
Log.err (fun m ->
m "got an unexpected exception: %S" (Printexc.to_string exn));
raise exn
| Ok (he, actions) ->
Log.debug (fun m -> m "return %d action(s)" (List.length actions));
(he, actions))
let rec launch_stack ?aaaa_timeout ?connect_delay ?connect_timeout
?resolve_timeout ?resolve_retries t () =
let prms = Miou.orphans () in
let he =
Happy_eyeballs.create ?aaaa_timeout ?connect_delay ?connect_timeout
?resolve_timeout ?resolve_retries (clock ())
in
Log.debug (fun m -> m "the daemon is launched");
Miou.call (go t ~prms he)
and go t ~prms he () =
let he, cont, actions =
if Miou.Queue.is_empty t.queue then Happy_eyeballs.timer he (clock ())
else (he, `Suspend, [])
in
match (cont, actions) with
| `Suspend, [] ->
Log.debug (fun m -> m "the daemon is suspended to a new event");
let he, actions = suspend t he ~prms in
Log.debug (fun m -> m "consume %d action(s)" (List.length actions));
List.iter (handle_one_action ~prms t) actions;
Log.debug (fun m -> m "action(s) launched");
Miou.yield ();
go t ~prms he ()
| _, actions ->
let he, actions' = get_events t he ~prms in
List.iter (handle_one_action ~prms t) (actions @ actions');
Miou.yield ();
go t ~prms he ()
let connect_ip t ips =
let waiter = Atomic.make In_progress in
Miou.Queue.enqueue t.queue (`Connect_ip (waiter, ips));
Miou_unix.Cond.signal t.condition;
Log.debug (fun m -> m "the daemon was signaled about another user's action");
waiter
let to_pairs lst =
List.map (fun (`Plaintext (ipaddr, port)) -> (ipaddr, port)) lst
let rec wait value =
match Atomic.get value with
| In_progress ->
Miou.yield ();
wait value
| Connected (addr, fd) -> (addr, fd)
| Failed msg -> failwith msg
let connect_to_nameservers t nameservers =
let nss = to_pairs nameservers in
let waiter = connect_ip t nss in
let addr, fd = Miou.await_exn (Miou.call_cc (fun () -> wait waiter)) in
(addr, Miou_unix.of_file_descr fd)
type daemon = unit Miou.t
let stack ?aaaa_timeout ?connect_delay ?connect_timeout ?resolve_timeout
?resolve_retries () =
let v = create_stack () in
( launch_stack ?aaaa_timeout ?connect_delay ?connect_timeout ?resolve_timeout
?resolve_retries v (),
v )
let inject_resolver ~getaddrinfo stack = stack.getaddrinfo <- getaddrinfo
let kill = Miou.cancel
type +'a io = 'a
type io_addr = [ `Plaintext of Ipaddr.t * int ]
type t = {
nameservers : io_addr list;
proto : Dns.proto;
timeout : float;
stack : stack;
}
type context = float * Miou_unix.file_descr
let nameservers { nameservers; proto; _ } = (proto, nameservers)
let bind x f = f x
let lift = Fun.id
let rng = Mirage_crypto_rng.generate ?g:None
let connect t =
try
Log.debug (fun m -> m "connect to nameservers");
let _addr, fd = connect_to_nameservers t.stack t.nameservers in
Ok (`Tcp, (t.timeout, fd))
with Failure msg -> Error (`Msg msg)
let rec read_loop ?(linger = Cstruct.empty) ~id proto fd =
let process rx =
let rec handle_data ({ Cstruct.len = rx_len; _ } as rx) =
if rx_len > 2 then
let len = Cstruct.BE.get_uint16 rx 0 in
if rx_len - 2 >= len then
let packet, rest =
if rx_len - 2 = len then (rx, Cstruct.empty)
else Cstruct.split rx (len + 2)
in
let id' = Cstruct.BE.get_uint16 packet 2 in
if id = id' then packet else handle_data rest
else read_loop ~linger:rx ~id proto fd
else read_loop ~linger:rx ~id proto fd
in
let rx =
if Cstruct.length linger = 0 then rx else Cstruct.append linger rx
in
handle_data rx
in
match proto with
| `Tcp ->
let buf = Bytes.create 0x10 in
let len = Miou_unix.read fd ~off:0 ~len:(Bytes.length buf) buf in
Log.debug (fun m -> m "got %d byte(s) from the resolver" len);
if len > 0 then process (Cstruct.of_bytes ~off:0 ~len buf)
else failwith "End of file reading from resolver"
external happy_translate_so_type : int -> Unix.socket_type
= "happy_translate_so_type"
let type_of_socket fd =
let fd = Miou_unix.to_file_descr fd in
let ty = Unix.getsockopt_int fd Unix.SO_TYPE in
happy_translate_so_type ty
let send_recv (timeout, fd) ({ Cstruct.len; _ } as tx) =
if len > 4 then begin
match type_of_socket fd with
| Unix.SOCK_STREAM -> (
let fn () =
Log.debug (fun m -> m "send a packet to resolver");
Miou_unix.write fd ~off:0 ~len (Cstruct.to_string tx);
let id = Cstruct.BE.get_uint16 tx 2 in
Log.debug (fun m -> m "recv a packet from resolver");
let packet = read_loop ~id `Tcp fd in
(packet, Miou_unix.transfer fd)
in
Miou_unix.disown fd;
match with_timeout ~timeout ~give:[ Miou_unix.owner fd ] fn with
| Ok (packet, _) ->
Log.debug (fun m -> m "got a DNS packet from the resolver");
Ok packet
| Error Timeout ->
Log.warn (fun m -> m "DNS request timeout");
error_msgf "DNS request timeout"
| Error (Failure msg) ->
Log.warn (fun m -> m "Got a failure: %s" msg);
Error (`Msg msg)
| Error exn ->
error_msgf "Got an unexpected exception: %S"
(Printexc.to_string exn))
| _ -> error_msgf "Invalid type of file-descriptor"
end
else error_msgf "Invalid context (data length <= 4)"
let close (_, fd) = Miou_unix.close fd
let of_ns ns = Int64.to_float ns /. 1_000_000_000.
let create ?nameservers ~timeout stack =
let proto, nameservers =
match nameservers with
| None -> (`Tcp, [ `Plaintext (Ipaddr.of_string_exn "8.8.8.8", 53) ])
| Some (a, nss) -> (a, nss)
in
{ nameservers; proto; timeout = of_ns timeout; stack }
external reraise : exn -> 'a = "%reraise"
let connect_ip t ips =
let waiter = connect_ip t ips in
match Miou.await (Miou.call_cc (fun () -> wait waiter)) with
| Ok (addr, fd) -> Ok (addr, Miou_unix.of_file_descr fd)
| Error (Failure msg) -> Error (`Msg msg)
| Error exn -> reraise exn
let connect_host t host ports =
let waiter = Atomic.make In_progress in
Miou.Queue.enqueue t.queue (`Connect (waiter, host, ports));
Miou_unix.Cond.signal t.condition;
match Miou.await (Miou.call_cc (fun () -> wait waiter)) with
| Ok (addr, fd) -> Ok (addr, Miou_unix.of_file_descr fd)
| Error (Failure msg) -> Error (`Msg msg)
| Error exn -> reraise exn
let connect_endpoint t str ports =
match Ipaddr.of_string str with
| Ok ipaddr -> connect_ip t (List.map (fun port -> (ipaddr, port)) ports)
| Error _ -> (
match Result.bind (Domain_name.of_string str) Domain_name.host with
| Ok domain_name -> connect_host t domain_name ports
| Error _ -> error_msgf "Invalid endpoint: %S" str)

64
src/happy.mli Normal file
View File

@ -0,0 +1,64 @@
include
Dns_client.S
with type io_addr = [ `Plaintext of Ipaddr.t * int ]
and type +'a io = 'a
type daemon
type getaddrinfo = {
getaddrinfo :
'response 'a.
'response Dns.Rr_map.key ->
'a Domain_name.t ->
('response, [ `Msg of string ]) result;
}
[@@unboxed]
val stack :
?aaaa_timeout:int64 ->
?connect_delay:int64 ->
?connect_timeout:int64 ->
?resolve_timeout:int64 ->
?resolve_retries:int ->
unit ->
daemon * stack
val inject_resolver : getaddrinfo:getaddrinfo -> stack -> unit
(** [inject_resolver ~getaddrinfo stack] injects a DNS resolver into the given
{i happy-eyeballs} [stack]. Initially, the {i happy-eyeballs} stack (created
by {!val:stack}) can not resolve domain-name. When the user is able to
resolve a domain-name (via the DNS protocol for example), he/she can
{i inject} its resolver into the {i happy-eyeballs} stack.
Only after injection the user can use {!val:connect_host} &
{!val:connect_endpoint}. *)
val kill : daemon -> unit
(** [kill daemon] cleans resources required by our happy-eyeballs implementation.
The user {b must} call at the end of its application this function. *)
val connect_ip :
stack ->
(Ipaddr.t * int) list ->
((Ipaddr.t * int) * Miou_unix.file_descr, [> `Msg of string ]) result
(** [connect_ip t addresses] establishes a connection to [addresses]. *)
val connect_host :
stack ->
[ `host ] Domain_name.t ->
int list ->
((Ipaddr.t * int) * Miou_unix.file_descr, [> `Msg of string ]) result
(** [connect_host t host ports] establishes a connection to [host] on [ports]
(tried in sequence).
@raise Failure if [ports] is empty. *)
val connect_endpoint :
stack ->
string ->
int list ->
((Ipaddr.t * int) * Miou_unix.file_descr, [> `Msg of string ]) result
(** [connect_endpoint t host ports] establishes a connection to [host] on
[ports], which may be a host name or an IP address.
@raise Failure if [ports] is the empty list. *)

576
src/http_miou_unix.ml Normal file
View File

@ -0,0 +1,576 @@
(*----------------------------------------------------------------------------
Copyright (c) 2018 Inhabited Type LLC.
Copyright (c) 2018 Anton Bachin
Copyright (c) 2023 Robur
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
1. Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
3. Neither the name of the author nor the names of his contributors
may be used to endorse or promote products derived from this software
without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE CONTRIBUTORS ``AS IS'' AND ANY EXPRESS
OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
----------------------------------------------------------------------------*)
module type RUNTIME = sig
type t
val next_read_operation : t -> [ `Read | `Yield | `Close ]
val read : t -> Bigstringaf.t -> off:int -> len:int -> int
val read_eof : t -> Bigstringaf.t -> off:int -> len:int -> int
val yield_reader : t -> (unit -> unit) -> unit
val next_write_operation :
t -> [ `Write of Bigstringaf.t Faraday.iovec list | `Close of int | `Yield ]
val report_write_result : t -> [ `Ok of int | `Closed ] -> unit
val yield_writer : t -> (unit -> unit) -> unit
val report_exn : t -> exn -> unit
end
module Buffer : sig
type t
val create : int -> t
val get : t -> f:(Bigstringaf.t -> off:int -> len:int -> int) -> int
val put : t -> f:(Bigstringaf.t -> off:int -> len:int -> int) -> int
end = struct
type t = {
mutable buffer : Bigstringaf.t;
mutable off : int;
mutable len : int;
}
let create size =
let buffer = Bigstringaf.create size in
{ buffer; off = 0; len = 0 }
let compress t =
if t.len = 0 then begin
t.off <- 0;
t.len <- 0
end
else if t.off > 0 then begin
Bigstringaf.blit t.buffer ~src_off:t.off t.buffer ~dst_off:0 ~len:t.len;
t.off <- 0
end
let get t ~f =
let n = f t.buffer ~off:t.off ~len:t.len in
t.off <- t.off + n;
t.len <- t.len - n;
if t.len = 0 then t.off <- 0;
n
let put t ~f =
compress t;
let off = t.off + t.len in
let buf = t.buffer in
if Bigstringaf.length buf = t.len then begin
t.buffer <- Bigstringaf.create (2 * Bigstringaf.length buf);
Bigstringaf.blit buf ~src_off:t.off t.buffer ~dst_off:0 ~len:t.len
end;
let n = f t.buffer ~off ~len:(Bigstringaf.length t.buffer - off) in
t.len <- t.len + n;
n
end
let src = Logs.Src.create "http-miou-unix"
module Log = (val Logs.src_log src : Logs.LOG)
let catch ~on fn =
try fn ()
with exn ->
Log.err (fun m ->
m "Got an unexpected exception: %S" (Printexc.to_string exn));
on exn
exception Flow of string
module type S = sig
type conn
type flow
type protect = {
protect : 'a 'b. orphans:unit Miou.orphans -> ('a -> 'b) -> 'a -> 'b;
}
[@@unboxed]
val run :
conn ->
?give:Miou.Ownership.t list ->
?disown:(flow -> unit) ->
read_buffer_size:int ->
flow ->
protect * unit Miou.t
end
let rec terminate orphans =
match Miou.care orphans with
| None -> ()
| Some None ->
Miou.yield ();
terminate orphans
| Some (Some prm) ->
Miou.await_exn prm;
terminate orphans
module Make (Flow : Flow.S) (Runtime : RUNTIME) :
S with type conn = Runtime.t and type flow = Flow.t = struct
type conn = Runtime.t
type flow = Flow.t
let recv flow buffer =
let bytes_read =
Buffer.put buffer ~f:(fun bstr ~off:dst_off ~len ->
match Flow.read ~read_buffer_size:len flow with
| Ok (`Data { Cstruct.buffer; off = src_off; len }) ->
Bigstringaf.blit buffer ~src_off bstr ~dst_off ~len;
len
| Ok `End_of_input -> 0
| Error err ->
Log.debug (fun m -> m "close the socket (recv)");
Flow.close flow;
raise (Flow (Fmt.str "%a" Flow.pp_error err)))
in
if bytes_read = 0 then `Eof else `Ok bytes_read
let writev flow bstrs =
let copy { Faraday.buffer; off; len } = Bigstringaf.copy buffer ~off ~len in
let css = List.map copy bstrs |> List.map Cstruct.of_bigarray in
match Flow.writev flow css with
| Ok () ->
let len = List.fold_left (fun a { Cstruct.len; _ } -> a + len) 0 css in
`Ok len
| Error err ->
Log.err (fun m -> m "got an error: %a" Flow.pp_error err);
Log.debug (fun m -> m "close the socket (writev)");
Flow.close flow;
`Closed
type _ Effect.t += Spawn : (unit -> unit) -> unit Effect.t
let launch ?give ?orphans fn k =
let _prm = Miou.call_cc ?orphans ?give fn in
Effect.Deep.continue k ()
(* Protected runtime operations.
*)
let protect ?give ?orphans fn v =
let retc = Fun.id in
let exnc = raise in
let open Effect.Deep in
let effc : type c. c Effect.t -> ((c, 'b) continuation -> 'b) option =
function
| Spawn fn -> Some (launch ?give ?orphans fn)
| _ -> None
in
match_with fn v { retc; exnc; effc }
let next_read_operation ?give ?orphans =
protect ?give ?orphans Runtime.next_read_operation
let next_write_operation ?give ?orphans =
protect ?give ?orphans Runtime.next_write_operation
let read ?give ?orphans conn bstr ~off ~len =
protect ?give ?orphans (Runtime.read conn ~off ~len) bstr
let read_eof ?give ?orphans conn bstr ~off ~len =
protect ?give ?orphans (Runtime.read_eof conn ~off ~len) bstr
let report_exn ?give ?orphans conn exn =
Log.err (fun m -> m "report an exception: %S" (Printexc.to_string exn));
protect ?give ?orphans (Runtime.report_exn conn) exn
let report_write_result ?give ?orphans conn =
protect ?give ?orphans (Runtime.report_write_result conn)
let yield_reader ?give ?orphans conn =
protect ?give ?orphans (Runtime.yield_reader conn)
let yield_writer ?give ?orphans conn =
protect ?give ?orphans (Runtime.yield_writer conn)
type protect = {
protect : 'a 'b. orphans:unit Miou.orphans -> ('a -> 'b) -> 'a -> 'b;
}
[@@unboxed]
let run conn ?(give = []) ?(disown = Fun.const ()) ~read_buffer_size flow =
let buffer = Buffer.create read_buffer_size in
let rec reader () =
let rec go orphans () =
match next_read_operation ~orphans ~give conn with
| `Read -> (
Log.debug (fun m -> m "next read operation: `read");
let read_eof = read_eof ~orphans ~give in
let read = read ~orphans ~give in
match recv flow buffer with
| `Eof ->
Buffer.get buffer ~f:(fun bstr ~off ~len ->
read_eof conn bstr ~off ~len)
|> ignore;
go orphans ()
| `Ok _ ->
Buffer.get buffer ~f:(fun bstr ~off ~len ->
read conn bstr ~off ~len)
|> ignore;
go orphans ())
| `Yield ->
Log.debug (fun m -> m "next read operation: `yield");
let continuation () = Effect.perform (Spawn reader) in
yield_reader conn ~orphans ~give continuation;
disown flow;
terminate orphans
| `Close ->
Log.debug (fun m -> m "read: disown the file-descriptor");
disown flow;
terminate orphans
in
let orphans = Miou.orphans () in
catch ~on:(report_exn conn ~orphans ~give) @@ fun () -> go orphans ()
in
let rec writer () =
let rec go orphans () =
match next_write_operation ~orphans ~give conn with
| `Write iovecs ->
Log.debug (fun m -> m "next write operation: `write");
writev flow iovecs |> report_write_result conn ~orphans ~give;
go orphans ()
| `Yield ->
Log.debug (fun m -> m "next write operation: `yield");
let continuation () = Effect.perform (Spawn writer) in
yield_writer conn ~orphans ~give continuation;
disown flow;
terminate orphans
| `Close _ ->
Log.debug (fun m -> m "next write operation: `close");
(* TODO(dinosaure): something already closed the socket when we use http/1.1 *)
Flow.shutdown flow `Send;
terminate orphans
in
let orphans = Miou.orphans () in
catch ~on:(report_exn conn ~orphans ~give) @@ fun () -> go orphans ()
in
let protect ~orphans = protect ~orphans ~give in
let prm =
Miou.call_cc ~give @@ fun () ->
let p0 = Miou.call_cc ~give reader in
let p1 = Miou.call_cc ~give writer in
let result =
match Miou.await_all [ p0; p1 ] with
| [ Ok (); Ok () ] -> Ok ()
| [ Error exn; _ ] | [ _; Error exn ] -> Error exn
| _ -> assert false
in
Log.debug (fun m -> m "close the file-descriptor");
disown flow;
match result with Ok () -> () | Error exn -> raise exn
in
Log.debug (fun m -> m "the main task is: %a" Miou.Promise.pp prm);
({ protect }, prm)
end
module TCP = struct
type t = Miou_unix.file_descr
type error = Unix.error * string * string
let pp_error ppf (err, f, v) =
Fmt.pf ppf "%s(%s): %s" f v (Unix.error_message err)
let read ?(read_buffer_size = 0x1000) flow =
let buf = Bytes.create read_buffer_size in
match Miou_unix.read flow buf ~off:0 ~len:(Bytes.length buf) with
| 0 -> Ok `End_of_input
| len -> Ok (`Data (Cstruct.of_string (Bytes.sub_string buf 0 len)))
| exception Unix.Unix_error (Unix.ECONNRESET, _, _) -> Ok `End_of_input
| exception Unix.Unix_error (err, f, v) -> Error (err, f, v)
let write flow ({ Cstruct.len; _ } as cs) =
let str = Cstruct.to_string cs in
try Ok (Miou_unix.write flow str ~off:0 ~len)
with Unix.Unix_error (err, f, v) -> Error (err, f, v)
let writev flow css =
let rec go = function
| [] -> Ok ()
| x :: r -> begin
match write flow x with Ok () -> go r | Error _ as err -> err
end
in
go css
let close = Miou_unix.close
let shutdown flow = function
| `Recv ->
Logs.debug (fun m -> m "shutdown the receiving side");
Miou_unix.shutdown flow Unix.SHUTDOWN_RECEIVE
| `Send ->
Logs.debug (fun m -> m "shutdown the sending side");
Miou_unix.shutdown flow Unix.SHUTDOWN_SEND
end
module TLS = Tls_miou.Make (TCP)
type tls = TLS.t
type tls_error = TLS.error
let pp_tls_error = TLS.pp_error
let to_tls = TLS.client_of_flow
let epoch tls =
match tls.TLS.state with
| `End_of_input | `Error _ -> None
| `Active tls -> (
match Tls.Engine.epoch tls with
| `InitialEpoch -> assert false
| `Epoch data -> Some data)
module Httpaf_Client_connection = struct
include Httpaf.Client_connection
let yield_reader _ = assert false
let next_read_operation t =
(next_read_operation t :> [ `Close | `Read | `Yield ])
end
(* Implementations. *)
type config = [ `V1 of Httpaf.Config.t | `V2 of H2.Config.t ]
type flow = [ `Tls of TLS.t | `Tcp of Miou_unix.file_descr ]
type request = [ `V1 of Httpaf.Request.t | `V2 of H2.Request.t ]
type response = [ `V1 of Httpaf.Response.t | `V2 of H2.Response.t ]
type 'body body = {
body : 'body;
write_string : 'body -> ?off:int -> ?len:int -> string -> unit;
close : 'body -> unit;
}
type ('resp, 'body) version =
| V1 : (Httpaf.Response.t, [ `write ] Httpaf.Body.t body) version
| V2 : (H2.Response.t, H2.Body.Writer.t body) version
type error =
[ `V1 of Httpaf.Client_connection.error
| `V2 of H2.Client_connection.error
| `Protocol of string ]
let pp_error ppf = function
| `V1 (`Malformed_response msg) ->
Fmt.pf ppf "Malformed HTTP/1.1 response: %s" msg
| `V1 (`Invalid_response_body_length _resp) ->
Fmt.pf ppf "Invalid response body length"
| `V1 (`Exn exn) | `V2 (`Exn exn) ->
Fmt.pf ppf "Got an unexpected exception: %S" (Printexc.to_string exn)
| `V2 (`Malformed_response msg) -> Fmt.pf ppf "Malformed H2 response: %s" msg
| `V2 (`Invalid_response_body_length _resp) ->
Fmt.pf ppf "Invalid response body length"
| `V2 (`Protocol_error (err, msg)) ->
Fmt.pf ppf "Protocol error %a: %s" H2.Error_code.pp_hum err msg
| `Protocol msg -> Fmt.string ppf msg
type ('resp, 'acc) await = unit -> ('resp * 'acc, error) result
type 'acc process =
| Process :
('resp, 'body) version * ('resp, 'acc) await * 'body
-> 'acc process
module A = Make (TLS) (Httpaf_Client_connection)
module B = Make (TCP) (Httpaf_Client_connection)
module C = Make (TLS) (H2.Client_connection)
module D = Make (TCP) (H2.Client_connection)
(* NOTE(dinosaure): we avoid first-class module here. *)
let run ~f acc config flow request =
let response : response option ref = ref None
and error = ref None
and acc = ref acc in
let error_handler err =
Log.err (fun m -> m "Got an error: %a" pp_error err);
match err with
| `V1 (`Exn (Flow msg)) | `V2 (`Exn (Flow msg)) ->
error := Some (`Protocol msg)
| err -> error := Some err
in
let response_handler ?(shutdown = Fun.const ()) = function
| `V1 (resp, body) ->
let rec on_eof = shutdown
and on_read bstr ~off ~len =
let str = Bigstringaf.substring bstr ~off ~len in
acc := f !acc str;
Httpaf.Body.schedule_read body ~on_read ~on_eof
in
response := Some (`V1 resp);
Httpaf.Body.schedule_read body ~on_read ~on_eof
| `V2 (resp, body) ->
let rec on_eof () =
Log.debug (fun m ->
m "shutdown the connection from the application level");
shutdown ()
and on_read bstr ~off ~len =
let str = Bigstringaf.substring bstr ~off ~len in
acc := f !acc str;
H2.Body.Reader.schedule_read body ~on_read ~on_eof
in
response := Some (`V2 resp);
H2.Body.Reader.schedule_read body ~on_read ~on_eof
in
let give =
match flow with
| `Tls flow -> [ Miou_unix.owner flow.TLS.flow ]
| `Tcp flow -> [ Miou_unix.owner flow ]
in
match (flow, config, request) with
| `Tls flow, `V1 config, `V1 request ->
let read_buffer_size = config.Httpaf.Config.read_buffer_size in
let disown flow = Miou_unix.disown flow.TLS.flow in
let response_handler resp body = response_handler (`V1 (resp, body)) in
let error_handler error = error_handler (`V1 error) in
let body, conn =
Httpaf.Client_connection.request ~config request ~error_handler
~response_handler
in
let orphans = Miou.orphans () in
let { A.protect }, prm =
disown flow;
Log.debug (fun m -> m "start an http/1.1 request over TLS");
A.run conn ~give ~disown ~read_buffer_size flow
in
let await () =
match (Miou.await prm, !error, !response) with
| _, Some error, _ -> Error error
| Error exn, _, _ -> Error (`V1 (`Exn exn))
| Ok (), None, Some (`V1 response) -> Ok (response, !acc)
| Ok (), None, (Some (`V2 _) | None) -> assert false
in
let write_string body ?off ?len str =
protect ~orphans (Httpaf.Body.write_string body ?off ?len) str
in
let close body = protect ~orphans Httpaf.Body.close_writer body in
let body = { body; write_string; close } in
(orphans, Process (V1, await, body))
| `Tcp flow, `V1 config, `V1 request ->
let read_buffer_size = config.Httpaf.Config.read_buffer_size in
let disown = Miou_unix.disown in
let response_handler resp body = response_handler (`V1 (resp, body)) in
let error_handler error = error_handler (`V1 error) in
let body, conn =
Httpaf.Client_connection.request ~config request ~error_handler
~response_handler
in
let orphans = Miou.orphans () in
let { B.protect }, prm =
disown flow;
B.run conn ~give ~disown ~read_buffer_size flow
in
let await () =
match (Miou.await prm, !error, !response) with
| _, Some error, _ -> Error error
| Error exn, _, _ -> Error (`V1 (`Exn exn))
| Ok (), None, Some (`V1 response) -> Ok (response, !acc)
| Ok (), None, (Some (`V2 _) | None) -> assert false
in
let write_string body ?off ?len str =
protect ~orphans (Httpaf.Body.write_string body ?off ?len) str
in
let close body = protect ~orphans Httpaf.Body.close_writer body in
let body = { body; write_string; close } in
(orphans, Process (V1, await, body))
| `Tls flow, `V2 config, `V2 request ->
let read_buffer_size = config.H2.Config.read_buffer_size in
let disown flow = Miou_unix.disown flow.TLS.flow in
let error_handler error = error_handler (`V2 error) in
let conn = H2.Client_connection.create ~config ~error_handler () in
let shutdown () = H2.Client_connection.shutdown conn in
let response_handler resp body =
response_handler ~shutdown (`V2 (resp, body))
in
let body =
H2.Client_connection.request conn request ~error_handler
~response_handler
in
let orphans = Miou.orphans () in
let { C.protect }, prm =
disown flow;
Log.debug (fun m -> m "start an h2 request over TLS");
C.run conn ~give ~disown ~read_buffer_size flow
in
let await () =
match (Miou.await prm, !error, !response) with
| _, Some error, _ -> Error error
| Error exn, _, _ -> Error (`V2 (`Exn exn))
| Ok (), None, Some (`V2 response) -> Ok (response, !acc)
| Ok (), None, (Some (`V1 _) | None) -> assert false
in
let write_string body ?off ?len str =
protect ~orphans (H2.Body.Writer.write_string body ?off ?len) str
in
let close body =
Log.debug (fun m -> m "close the stream from the application level");
protect ~orphans H2.Body.Writer.close body
in
let body = { body; write_string; close } in
(orphans, Process (V2, await, body))
| `Tcp flow, `V2 config, `V2 request ->
let read_buffer_size = config.H2.Config.read_buffer_size in
let disown = Miou_unix.disown in
let error_handler error = error_handler (`V2 error) in
let conn = H2.Client_connection.create ~config ~error_handler () in
let shutdown () = H2.Client_connection.shutdown conn in
let response_handler resp body =
response_handler ~shutdown (`V2 (resp, body))
in
let body =
H2.Client_connection.request conn request ~error_handler
~response_handler
in
let orphans = Miou.orphans () in
let { D.protect }, prm =
disown flow;
D.run conn ~give ~disown ~read_buffer_size flow
in
let await () =
match (Miou.await prm, !error, !response) with
| _, Some error, _ -> Error error
| Error exn, _, _ -> Error (`V2 (`Exn exn))
| Ok (), None, Some (`V2 response) -> Ok (response, !acc)
| Ok (), None, (Some (`V1 _) | None) -> assert false
in
let write_string body ?off ?len str =
protect ~orphans (H2.Body.Writer.write_string body ?off ?len) str
in
let close body = protect ~orphans H2.Body.Writer.close body in
let body = { body; write_string; close } in
(orphans, Process (V2, await, body))
| _ -> Fmt.invalid_arg "Http_miou_unix.run: incompatible arguments"

50
src/http_miou_unix.mli Normal file
View File

@ -0,0 +1,50 @@
type tls
type tls_error
val pp_tls_error : tls_error Fmt.t
val to_tls :
Tls.Config.client ->
?host:[ `host ] Domain_name.t ->
Miou_unix.file_descr ->
(tls, tls_error) result
val epoch : tls -> Tls.Core.epoch_data option
type config = [ `V1 of Httpaf.Config.t | `V2 of H2.Config.t ]
type flow = [ `Tls of tls | `Tcp of Miou_unix.file_descr ]
type request = [ `V1 of Httpaf.Request.t | `V2 of H2.Request.t ]
type error =
[ `V1 of Httpaf.Client_connection.error
| `V2 of H2.Client_connection.error
| `Protocol of string ]
val pp_error : error Fmt.t
type 'body body = {
body : 'body;
write_string : 'body -> ?off:int -> ?len:int -> string -> unit;
close : 'body -> unit;
}
type ('resp, 'body) version =
| V1 : (Httpaf.Response.t, [ `write ] Httpaf.Body.t body) version
| V2 : (H2.Response.t, H2.Body.Writer.t body) version
type ('resp, 'acc) await = unit -> ('resp * 'acc, error) result
type 'acc process =
| Process :
('resp, 'body) version * ('resp, 'acc) await * 'body
-> 'acc process
val terminate : unit Miou.orphans -> unit
val run :
f:('acc -> string -> 'acc) ->
'acc ->
config ->
flow ->
request ->
unit Miou.orphans * 'acc process

307
src/httpcats.ml Normal file
View File

@ -0,0 +1,307 @@
let src = Logs.Src.create "http-client"
module Log = (val Logs.src_log src : Logs.LOG)
let error_msgf fmt = Fmt.kstr (fun msg -> Error (`Msg msg)) fmt
let decode_host_port str =
match String.split_on_char ':' str with
| [] -> Error (`Msg "Empty host part")
| [ host ] -> Ok (host, None)
| hd :: tl -> (
let port, host =
match List.rev (hd :: tl) with
| hd :: tl -> (hd, String.concat ":" (List.rev tl))
| _ -> assert false
in
try Ok (host, Some (int_of_string port))
with _ -> Error (`Msg "Couln't decode port"))
let decode_uri uri =
(* proto :// user : pass @ host : port / path *)
let ( >>= ) = Result.bind in
match String.split_on_char '/' uri with
| proto :: "" :: user_pass_host_port :: path ->
(if String.equal proto "http:" then Ok ("http", false)
else if String.equal proto "https:" then Ok ("https", true)
else Error (`Msg "Unknown protocol"))
>>= fun (scheme, is_tls) ->
let decode_user_pass up =
match String.split_on_char ':' up with
| [ user; pass ] -> Ok (user, pass)
| _ -> Error (`Msg "Couldn't decode user and password")
in
(match String.split_on_char '@' user_pass_host_port with
| [ host_port ] -> Ok (None, host_port)
| [ user_pass; host_port ] ->
decode_user_pass user_pass >>= fun up -> Ok (Some up, host_port)
| _ -> Error (`Msg "Couldn't decode URI"))
>>= fun (user_pass, host_port) ->
decode_host_port host_port >>= fun (host, port) ->
Ok (is_tls, scheme, user_pass, host, port, "/" ^ String.concat "/" path)
| _ -> Error (`Msg "Could't decode URI on top")
let add_authentication ~add headers = function
| None -> headers
| Some (user, pass) ->
let data = Base64.encode_string (user ^ ":" ^ pass) in
let s = "Basic " ^ data in
add headers "authorization" s
let user_agent = "http-client/%%VERSION_NUM%%"
let prep_http_1_1_headers headers host user_pass blen =
let headers = Httpaf.Headers.of_list headers in
let add = Httpaf.Headers.add_unless_exists in
let headers = add headers "user-agent" user_agent in
let headers = add headers "host" host in
let headers = add headers "connection" "close" in
let headers =
add headers "content-length" (string_of_int (Option.value ~default:0 blen))
in
add_authentication ~add headers user_pass
let prep_h2_headers headers (host : string) user_pass blen =
(* please note, that h2 (at least in version 0.10.0) encodes the headers
in reverse order ; and for http/2 compatibility we need to retain the
:authority pseudo-header first (after method/scheme/... that are encoded
specially *)
(* also note that "host" is no longer a thing, but :authority is -- so if
we find a host header, we'll rephrase that as authority. *)
let headers =
List.rev_map (fun (k, v) -> (String.lowercase_ascii k, v)) headers
in
let headers = H2.Headers.of_rev_list headers in
let headers, authority =
match
(H2.Headers.get headers "host", H2.Headers.get headers ":authority")
with
| None, None -> (headers, host)
| Some h, None ->
Log.debug (fun m ->
m "removing host header (inserting authority instead)");
(H2.Headers.remove headers "host", h)
| None, Some a -> (H2.Headers.remove headers ":authority", a)
| Some h, Some a ->
if String.equal h a then
(H2.Headers.remove (H2.Headers.remove headers ":authority") "host", h)
else begin
Log.warn (fun m ->
m "authority header %s mismatches host %s (keeping both)" a h);
(H2.Headers.remove headers ":authority", a)
end
in
let add hdr = H2.Headers.add_unless_exists hdr ?sensitive:None in
let hdr = H2.Headers.empty in
let hdr = add hdr ":authority" authority in
let hdr = H2.Headers.add_list hdr (H2.Headers.to_rev_list headers) in
let hdr = add hdr "user-agent" user_agent in
let hdr =
add hdr "content-length" (string_of_int (Option.value ~default:0 blen))
in
add_authentication ~add hdr user_pass
module Version = Httpaf.Version
module Status = H2.Status
module Headers = H2.Headers
type response = {
version : Version.t;
status : Status.t;
reason : string;
headers : Headers.t;
}
type error =
[ `V1 of Httpaf.Client_connection.error
| `V2 of H2.Client_connection.error
| `Protocol of string
| `Msg of string
| `Tls of Http_miou_unix.tls_error ]
let pp_error ppf = function
| #Http_miou_unix.error as err -> Http_miou_unix.pp_error ppf err
| `Msg msg -> Fmt.string ppf msg
| `Tls err -> Http_miou_unix.pp_tls_error ppf err
let from_httpaf response =
{
version = response.Httpaf.Response.version;
status = (response.Httpaf.Response.status :> H2.Status.t);
reason = response.Httpaf.Response.reason;
headers =
H2.Headers.of_list
(Httpaf.Headers.to_list response.Httpaf.Response.headers);
}
let single_http_1_1_request ?(config = Httpaf.Config.default) flow user_pass
host meth path headers body f acc =
let body_length = Option.map String.length body in
let headers = prep_http_1_1_headers headers host user_pass body_length in
let request = Httpaf.Request.create ~headers meth path in
match Http_miou_unix.run ~f acc (`V1 config) flow (`V1 request) with
| _, Process (V2, _, _) -> assert false
| orphans, Process (V1, await, { body = stream; write_string; close }) -> (
Option.iter (write_string stream) body;
close stream;
let result = await () in
Http_miou_unix.terminate orphans;
match result with
| Ok (response, acc) -> Ok (from_httpaf response, acc)
| Error (#Http_miou_unix.error as err) -> Error (err :> error))
let from_h2 response =
{
version = { major = 2; minor = 0 };
status = response.H2.Response.status;
reason = "";
headers = response.H2.Response.headers;
}
let single_h2_request ?(config = H2.Config.default) flow scheme user_pass host
meth path headers body f acc =
let body_length = Option.map String.length body in
let headers = prep_h2_headers headers host user_pass body_length in
let request = H2.Request.create ~scheme ~headers meth path in
match Http_miou_unix.run ~f acc (`V2 config) flow (`V2 request) with
| _, Process (V1, _, _) -> assert false
| orphans, Process (V2, await, { body = stream; write_string; close }) -> (
Option.iter (write_string stream) body;
close stream;
let result = await () in
Http_miou_unix.terminate orphans;
match result with
| Ok (response, acc) -> Ok (from_h2 response, acc)
| Error (#Http_miou_unix.error as err) -> Error (err :> error))
let alpn_protocol = function
| `Tcp _ -> None
| `Tls tls -> (
match Http_miou_unix.epoch tls with
| Some { Tls.Core.alpn_protocol = Some "h2"; _ } -> Some `H2
| Some { Tls.Core.alpn_protocol = Some "http/1.1"; _ } -> Some `HTTP_1_1
| Some { Tls.Core.alpn_protocol = None; _ } -> None
| Some { Tls.Core.alpn_protocol = Some protocol; _ } ->
Log.warn (fun m ->
m "The ALPN negotiation unexpectedly resulted in %S." protocol);
None
| None -> None)
let connect resolver ?port ?tls_config host =
let port =
match (port, tls_config) with
| None, None -> 80
| None, Some _ -> 443
| Some port, _ -> port
in
let ( >>= ) x f = Result.bind x f in
Happy.connect_endpoint resolver host [ port ] >>= fun (_addr, socket) ->
match tls_config with
| Some config ->
Http_miou_unix.to_tls config socket
|> Result.map_error (fun err -> `Tls err)
>>= fun socket -> Ok (`Tls socket)
| None -> Ok (`Tcp socket)
let single_request resolver ?http_config tls_config ~meth ~headers ?body uri f
acc =
let ( let* ) = Result.bind in
let ( let+ ) x f = Result.map f x in
let* tls, scheme, user_pass, host, port, path = decode_uri uri in
let* tls_config =
if tls then
let+ tls_config = Lazy.force tls_config in
let host =
let* domain_name = Domain_name.of_string host in
Domain_name.host domain_name
in
match (tls_config, host) with
| `Custom cfg, _ -> Some cfg
| `Default cfg, Ok host -> Some (Tls.Config.peer cfg host)
| `Default cfg, _ -> Some cfg
else Ok None
in
let* flow = connect resolver ?port ?tls_config host in
match (alpn_protocol flow, http_config) with
| (Some `HTTP_1_1 | None), Some (`HTTP_1_1 config) ->
single_http_1_1_request ~config flow user_pass host meth path headers body
f acc
| (Some `HTTP_1_1 | None), None ->
single_http_1_1_request flow user_pass host meth path headers body f acc
| (Some `H2 | None), Some (`H2 config) ->
single_h2_request ~config flow scheme user_pass host meth path headers
body f acc
| Some `H2, None ->
single_h2_request flow scheme user_pass host meth path headers body f acc
| _ -> assert false
let default_authenticator = lazy (Ca_certs.authenticator ())
let resolve_location ~uri ~location =
match String.split_on_char '/' location with
| "http:" :: "" :: _ -> Ok location
| "https:" :: "" :: _ -> Ok location
| "" :: "" :: _ ->
let schema = String.sub uri 0 (String.index uri '/') in
Ok (schema ^ location)
| "" :: _ -> begin
match String.split_on_char '/' uri with
| schema :: "" :: user_pass_host_port :: _ ->
Ok (String.concat "/" [ schema; ""; user_pass_host_port ^ location ])
| _ -> error_msgf "Expected an absolute uri, got: %S" uri
end
| _ -> error_msgf "Unknown location (relative path): %S" location
let request ?config ?tls_config ?authenticator ?(meth = `GET) ?(headers = [])
?body ?(max_redirect = 5) ?(follow_redirect = true) ~resolver ~f ~uri acc =
let tls_config =
lazy
begin
match tls_config with
| Some cfg -> Ok (`Custom cfg)
| None ->
let alpn_protocols =
match config with
| None -> [ "h2"; "http/1.1" ]
| Some (`H2 _) -> [ "h2" ]
| Some (`HTTP_1_1 _) -> [ "http/1.1" ]
and authenticator =
match authenticator with
| None -> Lazy.force default_authenticator
| Some authenticator -> Ok authenticator
in
Result.map
(fun authenticator ->
`Default (Tls.Config.client ~alpn_protocols ~authenticator ()))
authenticator
end
in
let http_config =
match config with
| Some (`H2 cfg) -> Some (`V2 cfg)
| Some (`HTTP_1_1 cfg) -> Some (`V1 cfg)
| None -> None
in
if not follow_redirect then
single_request resolver ?http_config tls_config ~meth ~headers ?body uri f
acc
else
let ( >>= ) = Result.bind in
let rec follow_redirect count uri =
if count = 0 then Error (`Msg "Redirect limit exceeded")
else
match
single_request resolver ?http_config tls_config ~meth ~headers ?body
uri f acc
with
| Error _ as err -> err
| Ok (resp, body) ->
if Status.is_redirection resp.status then
match Headers.get resp.headers "location" with
| Some location ->
resolve_location ~uri ~location >>= fun uri ->
follow_redirect (pred count) uri
| None -> Ok (resp, body)
else Ok (resp, body)
in
follow_redirect max_redirect uri

29
src/httpcats.mli Normal file
View File

@ -0,0 +1,29 @@
type error
val pp_error : error Fmt.t
module Version = Httpaf.Version
module Status = H2.Status
module Headers = H2.Headers
type response = {
version : Version.t;
status : Status.t;
reason : string;
headers : Headers.t;
}
val request :
?config:[ `HTTP_1_1 of Httpaf.Config.t | `H2 of H2.Config.t ] ->
?tls_config:Tls.Config.client ->
?authenticator:X509.Authenticator.t ->
?meth:Httpaf.Method.t ->
?headers:(string * string) list ->
?body:string ->
?max_redirect:int ->
?follow_redirect:bool ->
resolver:Happy.stack ->
f:('a -> string -> 'a) ->
uri:string ->
'a ->
(response * 'a, error) result

193
src/tls_miou.ml Normal file
View File

@ -0,0 +1,193 @@
let src = Logs.Src.create "tls-miou"
module Log = (val Logs.src_log src : Logs.LOG)
module Make (Flow : Flow.S) = struct
type error =
[ `Tls_alert of Tls.Packet.alert_type
| `Tls_failure of Tls.Engine.failure
| `Read of Flow.error
| `Write of Flow.error
| `Closed ]
let pp_error ppf = function
| `Tls_failure failure -> Tls.Engine.pp_failure ppf failure
| `Tls_alert alert -> Fmt.string ppf (Tls.Packet.alert_type_to_string alert)
| `Read err -> Flow.pp_error ppf err
| `Write err -> Flow.pp_error ppf err
| `Closed -> Fmt.string ppf "Connection closed by peer"
type state = [ `Active of Tls.Engine.state | `End_of_input | `Error of error ]
type t = {
role : [ `Server | `Client ];
flow : Flow.t;
mutable state : state;
mutable linger : Cstruct.t list;
mutable writer_closed : bool;
}
let tls_alert alert = `Error (`Tls_alert alert)
let tls_fail failure = `Error (`Tls_failure failure)
let lift_read_result = function
| Ok ((`Data _ | `End_of_input) as x) -> x
| Error err -> `Error (`Read err)
let lift_write_result = function
| Ok () -> `Ok ()
| Error err -> `Error (`Write err)
let check_write flow res =
let () =
match (flow.state, lift_write_result res) with
| `Active _, ((`End_of_input | `Error _) as err) ->
flow.state <- err;
Log.warn (fun m -> m "close the socket due to a writing error");
Flow.close flow.flow
| _ -> ()
in
match res with Ok () -> Ok () | Error err -> Error (`Write err)
let read_react ~read_buffer_size flow =
let handle tls buf =
match Tls.Engine.handle_tls tls buf with
| Ok (res, `Response resp, `Data data) ->
let state =
match res with
| `Ok tls -> `Active tls
| `Eof -> `End_of_input
| `Alert alert -> tls_alert alert
in
flow.state <- state;
let _ =
match resp with
| None -> Ok ()
| Some buf -> check_write flow (Flow.writev flow.flow [ buf ])
in
(*
NOTE(dinosaure): with the /shutdown/ thing, we should not close
the connection and let the user to do so.
let () =
match res with
| `Ok _ -> ()
| _ ->
Log.warn (fun m -> m "close the socket due to a reading error");
Flow.close flow.flow
in
*)
let data =
match data with
| None -> None
| Some data when Cstruct.length data > read_buffer_size ->
let data, linger = Cstruct.split data read_buffer_size in
flow.linger <- linger :: flow.linger;
Some data
| Some data -> Some data
in
`Ok data
| Error (failure, `Response resp) ->
let reason = tls_fail failure in
flow.state <- reason;
let _ = Flow.writev flow.flow [ resp ] in
Log.warn (fun m -> m "close the socket due to a reading error");
Flow.close flow.flow;
reason
in
match flow.state with
| (`End_of_input | `Error _) as err -> err
| `Active _ -> (
match lift_read_result (Flow.read ~read_buffer_size flow.flow) with
| (`End_of_input | `Error _) as v ->
flow.state <- v;
v
| `Data buf -> (
match flow.state with
| `Active tls -> handle tls buf
| (`End_of_input | `Error _) as v -> v))
let split ~len cs =
if Cstruct.length cs >= len then Cstruct.split cs len
else (cs, Cstruct.empty)
let rec read ?(read_buffer_size = 0x1000) flow =
match flow.linger with
| _ :: _ as bufs ->
let cs = Cstruct.concat (List.rev bufs) in
let data, linger = split ~len:read_buffer_size cs in
if Cstruct.length linger > 0 then flow.linger <- [ linger ]
else flow.linger <- [];
Ok (`Data data)
| [] -> (
match read_react ~read_buffer_size flow with
| `Ok None -> read ~read_buffer_size flow
| `Ok (Some buf) -> Ok (`Data buf)
| `End_of_input -> Ok `End_of_input
| `Error e -> Error e)
let writev flow bufs =
if flow.writer_closed then Error `Closed
else
match flow.state with
| `End_of_input -> Error `Closed
| `Error e -> Error e
| `Active tls -> (
match Tls.Engine.send_application_data tls bufs with
| Some (tls, answer) ->
flow.state <- `Active tls;
check_write flow (Flow.writev flow.flow [ answer ])
| None -> assert false)
let write flow buf = writev flow [ buf ]
let rec drain_handshake flow =
match flow.state with
| `Active tls when not (Tls.Engine.handshake_in_progress tls) -> Ok flow
| _ -> (
match read_react ~read_buffer_size:0x1000 flow with
| `Ok mbuf ->
flow.linger <- Option.to_list mbuf @ flow.linger;
drain_handshake flow
| `Error e -> Error e
| `End_of_input -> Error `Closed)
let close flow =
match flow.state with
| `Active tls ->
flow.state <- `End_of_input;
let _, buf = Tls.Engine.send_close_notify tls in
let _ = Flow.writev flow.flow [ buf ] in
Log.debug (fun m -> m "close the socket");
Flow.close flow.flow
| _ -> ()
let shutdown flow v =
match (flow.state, v) with
| `Active tls, `Send when not flow.writer_closed ->
let tls, buf = Tls.Engine.send_close_notify tls in
flow.state <- `Active tls;
flow.writer_closed <- true;
let _ = Flow.writev flow.flow [ buf ] in
Flow.shutdown flow.flow `Send
| `Active _, _ -> close flow
| _ -> ()
let client_of_flow conf ?host flow =
let conf' =
match host with None -> conf | Some host -> Tls.Config.peer conf host
in
let tls, init = Tls.Engine.client conf' in
let tls_flow =
{
role = `Client;
flow;
state = `Active tls;
linger = [];
writer_closed = false;
}
in
match check_write tls_flow (Flow.writev flow [ init ]) with
| Ok () -> drain_handshake tls_flow
| Error _ as err -> err
end