Upgrade httpcats with the last version of miou

This commit is contained in:
Calascibetta Romain 2024-03-26 16:38:13 +01:00
parent 26b88af737
commit 2f6ebec92c
29 changed files with 1291 additions and 1620 deletions

View File

@ -1,5 +1,12 @@
version=0.26.1
exp-grouping=preserve
break-infix=wrap-or-vertical
break-collection-expressions=wrap
break-sequences=false
break-infix-before-func=false
dock-collection-brackets=true
break-separators=before
dock-collection-brackets=false
field-space=tight
if-then-else=compact
break-sequences=false
sequence-blank-line=compact

View File

@ -1,11 +1,11 @@
; (executable
; (name hurle)
; (modules hurle)
; (libraries logs.fmt fmt.tty logs.threaded mirage-crypto-rng.unix httpcats
; httpcats.happy hxd.string hxd.core))
(executable
(name main)
(modules main)
(libraries logs.threaded logs.fmt fmt.tty mirage-crypto-rng.unix hxd.string
hxd.core dns_miou httpcats))
(executable
(name pars)
(modules q pars)
(modules pars)
(libraries duration fmt.tty logs.fmt logs.threaded mirage-crypto-rng.unix
httpcats httpcats.happy progress))
httpcats dns_miou happy progress))

View File

@ -1,93 +0,0 @@
type arguments =
{ json : bool
; form : bool
; multipart : bool
; boundary : string option
; raw : string option
; compress : bool
; pretty : [ `All | `Colors | `Format | `None ] (* style : style *)
; sorted : bool
; charset : Rosetta.encoding
; response_mime : string option
; headers : bool
; body : bool
; verbose : [ `Verbose | `Quiet ]
; output : Fpath.t option
; continue : bool
; auth : (string * string option) option
; auth_type : [ `Basic | `Digest ]
; follow : bool
; chunked : bool
; meth : Method.t
; uri : string
}
let anchor = Unix.gettimeofday ()
let sigpipe = 13
let () = Sys.set_signal sigpipe Sys.Signal_ignore
let reporter ppf =
let report src level ~over k msgf =
let k _ =
over ();
k ()
in
let with_metadata header _tags k ppf fmt =
Format.kfprintf k ppf
("[%a]%a[%a][%a]: " ^^ fmt ^^ "\n%!")
Fmt.(styled `Cyan (fmt "%.04f"))
(Unix.gettimeofday () -. anchor)
Logs_fmt.pp_header (level, header)
Fmt.(styled `Blue int)
(Stdlib.Domain.self () :> int)
Fmt.(styled `Magenta string)
(Logs.Src.name src)
in
msgf @@ fun ?header ?tags fmt -> with_metadata header tags k ppf fmt
in
{ Logs.report }
let () = Fmt_tty.setup_std_outputs ~style_renderer:`Ansi_tty ~utf_8:true ()
let () = Logs.set_reporter (reporter Fmt.stderr)
let () = Logs.set_level ~all:true (Some Logs.Debug)
let () = Logs_threaded.enable ()
let () = Printexc.record_backtrace true
let out = Mutex.create ()
let pr fmt =
let finally () = Mutex.unlock out in
Mutex.lock out;
Fun.protect ~finally @@ fun () -> Format.printf fmt
let epr fmt =
let finally () = Mutex.unlock out in
Mutex.lock out;
Fun.protect ~finally @@ fun () -> Format.eprintf fmt
let getaddrinfo dns =
{ Happy.getaddrinfo =
(fun record host -> Dns_miou.getaddrinfo dns record host)
}
let () = Mirage_crypto_rng_unix.initialize (module Mirage_crypto_rng.Fortuna)
let () =
Miou_unix.run @@ fun () ->
match Sys.argv with
| [| _; uri |] ->
let acc = Buffer.create 0x100 in
let f _ buf str =
Buffer.add_string buf str;
buf
in
let prm =
Miou.call @@ fun () ->
match Httpcats.request ~f ~uri acc with
| Ok (_response, buf) ->
pr "@[<hov>%a@]\n%!"
(Hxd_string.pp Hxd.default)
(Buffer.contents buf)
| Error err -> epr "Got an error: %a\n%!" Httpcats.pp_error err
in
Miou.await_exn prm
| _ -> epr "%s <uri>\n%!" Sys.argv.(0)

52
app/main.ml Normal file
View File

@ -0,0 +1,52 @@
let () = Printexc.record_backtrace true
let anchor = Mtime_clock.now ()
let reporter ppf =
let report src level ~over k msgf =
let k _ = over (); k () in
let with_metadata header _tags k ppf fmt =
let timestamp = Mtime.span (Mtime_clock.now ()) anchor in
Format.kfprintf k ppf
("[+%a]%a[%a][%a]: " ^^ fmt ^^ "\n%!")
Fmt.(
styled `Magenta
(using (fun ns -> Mtime.Span.to_float_ns ns /. 1e9) (fmt "%06.04f")))
timestamp Logs_fmt.pp_header (level, header)
Fmt.(styled `Blue int)
(Stdlib.Domain.self () :> int)
Fmt.(styled `Magenta string)
(Logs.Src.name src)
in
msgf @@ fun ?header ?tags fmt -> with_metadata header tags k ppf fmt
in
{ Logs.report }
let () = Fmt_tty.setup_std_outputs ~style_renderer:`Ansi_tty ~utf_8:true ()
let () = Logs.set_reporter (reporter Fmt.stderr)
let () = Logs.set_level ~all:true (Some Logs.Debug)
let () = Logs_threaded.enable ()
let () = Mirage_crypto_rng_unix.initialize (module Mirage_crypto_rng.Fortuna)
let getaddrinfo dns =
{
Happy.getaddrinfo= (fun record host -> Dns_miou.getaddrinfo dns record host)
}
let () =
Miou_unix.run @@ fun () ->
let daemon, resolver = Happy.stack () in
let dns = Dns_miou.create resolver in
Happy.inject_resolver ~getaddrinfo:(getaddrinfo dns) resolver;
let f _resp buf str = Buffer.add_string buf str; buf in
match
Httpcats.request ~resolver ~f ~uri:Sys.argv.(1) (Buffer.create 0x100)
with
| Ok (_, body) ->
Happy.kill daemon;
Format.printf "@[<hov>%a@]\n%!"
(Hxd_string.pp Hxd.default)
(Buffer.contents body)
| Error err ->
Happy.kill daemon;
Format.eprintf "%a\n%!" Httpcats.pp_error err;
exit 1

View File

@ -1,11 +1,9 @@
let anchor = Unix.gettimeofday ()
let () = Printexc.record_backtrace true
let reporter ppf =
let report src level ~over k msgf =
let k _ =
over ();
k ()
in
let k _ = over (); k () in
let with_metadata header _tags k ppf fmt =
Format.kfprintf k ppf
("[%a]%a[%a][%a]: " ^^ fmt ^^ "\n%!")
@ -22,9 +20,9 @@ let reporter ppf =
{ Logs.report }
let () = Fmt_tty.setup_std_outputs ~style_renderer:`Ansi_tty ~utf_8:true ()
let () = Logs.set_reporter (reporter Fmt.stderr)
(* let () = Logs.set_reporter (reporter Fmt.stderr) *)
let () = Logs.set_level ~all:true (Some Logs.Debug)
(* let () = Logs.set_level ~all:true (Some Logs.Debug) *)
let () = Logs_threaded.enable ()
let () = Printexc.record_backtrace true
@ -62,21 +60,37 @@ let epr fmt =
type reporter = All_knowning of int Progress.Reporter.t | Unknown of int
let get_length { Httpcats.headers; _ } =
let headers = H2.Headers.to_list headers in
let headers =
List.map (fun (k, v) -> (String.lowercase_ascii k, v)) headers
in
Option.map int_of_string (List.assoc_opt "content-length" headers)
let download ~orphans ~events ~uid ~resolver ~uri =
let _prm =
Miou.call ~orphans @@ fun () ->
let got_response = ref false in
let counter = ref 0 in
let[@warning "-8"] (Ok (_, _, _, _, _, path)) = Httpcats.decode_uri uri in
let f resp () str =
if not !got_response then begin
Miou.Queue.enqueue events
(`Response (uid, Filename.basename path, resp));
Logs.debug (fun m ->
m "response for %s: %a" uri Httpcats.pp_response resp);
got_response := true
end;
let max = Option.value ~default:0 (get_length resp) in
counter := !counter + String.length str;
Logs.debug (fun m ->
m "got %d/%d byte(s) from %d:%s" !counter max uid uri);
Miou.Queue.enqueue events (`Data (uid, String.length str))
in
match Httpcats.request ~resolver ~uri ~f () with
| Ok (_response, ()) -> Miou.Queue.enqueue events (`End uid)
| Ok (_response, ()) ->
Logs.debug (fun m -> m "%d:%s downloaded" uid uri);
Miou.Queue.enqueue events (`End uid)
| Error err -> Miou.Queue.enqueue events (`Error (uid, err))
in
()
@ -89,15 +103,15 @@ type event =
type display = (unit, unit) Progress.Display.t
type t =
{ gen : int Atomic.t
; orphans : unit Miou.orphans
; events : event Miou.Queue.t
; reporters : reporter array
; display : display
; align : int
; resolver : Happy.stack
}
type t = {
gen: int Atomic.t
; orphans: unit Miou.orphans
; events: event Miou.Queue.t
; reporters: reporter array
; display: display
; align: int
; resolver: Happy.stack
}
let make ~resolver ~filenames =
let gen = Atomic.make 0 in
@ -119,13 +133,6 @@ let make ~resolver ~filenames =
let [] = Progress.Display.reporters display in
{ gen; orphans; events; reporters; display; align; resolver }
let get_length { Httpcats.headers; _ } =
let headers = H2.Headers.to_list headers in
let headers =
List.map (fun (k, v) -> (String.lowercase_ascii k, v)) headers
in
Option.map int_of_string (List.assoc_opt "content-length" headers)
let bar t ~filename ~response =
match get_length response with
| Some total ->
@ -188,7 +195,7 @@ let rec run t uris () =
~resolver:t.resolver ~uri;
let events' = Miou.Queue.(to_list (transfer t.events)) in
consume t events';
run t rest ()
run t rest (Miou.yield ())
| Some prm, uri :: rest ->
Option.iter Miou.await_exn prm;
download ~orphans:t.orphans ~events:t.events
@ -196,12 +203,12 @@ let rec run t uris () =
~resolver:t.resolver ~uri;
let events' = Miou.Queue.(to_list (transfer t.events)) in
consume t events';
run t rest ()
run t rest (Miou.yield ())
| Some prm, [] ->
Option.iter Miou.await_exn prm;
let events' = Miou.Queue.(to_list (transfer t.events)) in
consume t events';
run t [] ()
run t [] (Miou.yield ())
let get_uris_from_stdin () =
let rec go acc =
@ -212,8 +219,8 @@ let get_uris_from_stdin () =
go []
let getaddrinfo dns =
{ Happy.getaddrinfo =
(fun record host -> Dns_miou.getaddrinfo dns record host)
{
Happy.getaddrinfo= (fun record host -> Dns_miou.getaddrinfo dns record host)
}
let sigpipe = 13

View File

@ -1,43 +0,0 @@
type 'a t =
{ buffer : 'a array
; mutable rdpos : int
; mutable wrpos : int
; lock : Mutex.t
; non_empty : Miou_unix.Cond.t
; non_full : Miou_unix.Cond.t
}
let make size v =
let lock = Mutex.create () in
{ buffer = Array.make size v
; lock
; rdpos = 0
; wrpos = 0
; non_empty = Miou_unix.Cond.make ~mutex:lock ()
; non_full = Miou_unix.Cond.make ~mutex:lock ()
}
let is_empty t =
Mutex.lock t.lock;
let is_empty = t.rdpos = t.wrpos in
Mutex.unlock t.lock;
is_empty
let put t v =
let predicate () = (t.wrpos + 1) mod Array.length t.buffer = t.rdpos in
let fn () =
t.buffer.(t.wrpos) <- v;
t.wrpos <- (t.wrpos + 1) mod Array.length t.buffer;
Miou_unix.Cond.signal t.non_empty
in
Miou_unix.Cond.until ~predicate ~fn t.non_full
let get t =
let predicate () = t.wrpos = t.rdpos in
let fn () =
let v = t.buffer.(t.rdpos) in
t.rdpos <- (t.rdpos + 1) mod Array.length t.buffer;
Miou_unix.Cond.signal t.non_full;
v
in
Miou_unix.Cond.until ~predicate ~fn t.non_empty

View File

@ -1,6 +0,0 @@
type 'a t
val make : int -> 'a -> 'a t
val put : 'a t -> 'a -> unit
val get : 'a t -> 'a
val is_empty : 'a t -> bool

View File

@ -1,13 +1,8 @@
open Httpcats.Server
let anchor = Unix.gettimeofday ()
let reporter ppf =
let report src level ~over k msgf =
let k _ =
over ();
k ()
in
let k _ = over (); k () in
let with_metadata header _tags k ppf fmt =
Format.kfprintf k ppf
("[%a]%a[%a][%a]: " ^^ fmt ^^ "\n%!")
@ -82,38 +77,38 @@ let listen sockaddr =
let rec cleanup orphans =
match Miou.care orphans with
| None | Some None -> ()
| Some (Some prm) ->
Miou.await_exn prm;
cleanup orphans
| Some (Some prm) -> Miou.await_exn prm; cleanup orphans
let handler request =
match request.target with
| "" | "/" | "/index.html" ->
let headers =
Headers.of_list
[ ("content-type", "text/html; charset=utf-8")
; ("content-length", string_of_int (String.length index_html)) ]
in
string ~headers ~status:`OK index_html
| _ ->
let headers = Headers.of_list [ ("content-length", "0") ] in
string ~headers ~status:`Not_found ""
let handler = function
| `V2 _ -> assert false
| `V1 reqd -> (
let open Httpaf in
let request = Reqd.request reqd in
match request.Request.target with
| "" | "/" | "/index.html" ->
let headers =
Headers.of_list
[
("content-type", "text/html; charset=utf-8")
; ("content-length", string_of_int (String.length index_html))
]
in
let resp = Response.create ~headers `OK in
let body = Reqd.request_body reqd in
Body.close_reader body;
Reqd.respond_with_string reqd resp index_html
| _ ->
let headers = Headers.of_list [ ("content-length", "0") ] in
let resp = Response.create ~headers `Not_found in
Reqd.respond_with_string reqd resp "")
let stop = Miou_unix.Cond.make ()
let server sockaddr =
let file_descr = listen sockaddr in
Httpcats.Server.clear ~stop ~handler file_descr;
Miou_unix.disown file_descr
let stop _ = Miou_unix.Cond.broadcast stop
let server sockaddr = Httpcats.Server.clear ~handler sockaddr
let () =
let addr = sockaddr_of_arguments () in
let () = Sys.set_signal Sys.sigint (Signal_handle stop) in
let () = Printexc.record_backtrace true in
Miou_unix.run @@ fun () ->
Miou_unix.run ~domains:3 @@ fun () ->
let prm = Miou.call_cc @@ fun () -> server addr in
Miou.parallel server (List.init (Miou.Domain.count ()) (Fun.const addr))
Miou.parallel server (List.init 3 (Fun.const addr))
|> List.iter (function Ok () -> () | Error exn -> raise exn);
Miou.await_exn prm

15
happy/README.md Normal file
View File

@ -0,0 +1,15 @@
# DNS resolution
This example shows the implementation of a "pool" of connections. In other
words, we try several connections and choose the one that finishes its TCP/IP
handshake as quickly as possible. A DNS resolution is performed. The idea is to
allocate a "daemon" to manage all the TCP/IP connections (which is the role of
happy-eyeballs). As soon as it is switched off, it makes sure that all the
resources have been released.
You can test the code with it:
```shell-session
$ dune exec examples/happy/main.exe --
Connected to google.com via *:443
...
```

20
happy/dune Normal file
View File

@ -0,0 +1,20 @@
(library
(name happy)
(modules happy)
(libraries miou.unix mtime.clock.os ipaddr.unix cstruct dns dns-client
happy-eyeballs mirage-crypto-rng hxd.string hxd.core)
(foreign_stubs
(language c)
(names happy)
(flags (:standard))))
(library
(name dns_miou)
(modules dns_miou)
(libraries happy))
(executable
(name main)
(modules main)
(libraries mirage-crypto-rng.unix logs.fmt logs.threaded fmt.tty duration
dns_miou))

View File

@ -1,6 +1,10 @@
let src = Logs.Src.create "happy"
let src_daemon = Logs.Src.create "happy-daemon"
module Log = (val Logs.src_log src : Logs.LOG)
module Logd = (val Logs.src_log src_daemon : Logs.LOG)
let src_client = Logs.Src.create "happy"
module Logc = (val Logs.src_log src_client : Logs.LOG)
let error_msgf fmt = Format.kasprintf (fun msg -> Error (`Msg msg)) fmt
@ -13,7 +17,6 @@ let to_sockaddr (ipaddr, port) =
Unix.ADDR_INET (Ipaddr_unix.to_inet_addr ipaddr, port)
let clock = Mtime_clock.elapsed_ns
let he_timer_interval = Duration.(to_f (of_ms 10))
type state =
| In_progress
@ -23,52 +26,60 @@ type state =
type entry = Happy_eyeballs.id * attempt * [ `host ] Domain_name.t * addr
and attempt = int
and addr = Ipaddr.t * int
and cancel = attempt * [ `Connection of Miou_unix.file_descr ] Miou.t
and action =
type cancel = attempt * unit Miou.t
type action =
[ `Connect_ip of state Atomic.t * addr list
| `Connect of state Atomic.t * [ `host ] Domain_name.t * int list ]
and value =
[ `Connection of Miou_unix.file_descr
type connected = [ `Connected of entry * Miou_unix.file_descr ]
type event =
[ connected
| `Connection_failed of entry * string
| `Resolution_v4 of
[ `host ] Domain_name.t * (Ipaddr.V4.Set.t, [ `Msg of string ]) result
| `Resolution_v6 of
[ `host ] Domain_name.t * (Ipaddr.V6.Set.t, [ `Msg of string ]) result ]
and getaddrinfo =
{ getaddrinfo :
and getaddrinfo = {
getaddrinfo:
'response 'a.
'response Dns.Rr_map.key
-> 'a Domain_name.t
-> ('response, [ `Msg of string ]) result
}
}
[@@unboxed]
let dummy =
let getaddrinfo _ _ = Error (`Msg "Not implemented") in
{ getaddrinfo }
type stack =
{ mutable cancel_connecting : cancel list Happy_eyeballs.Waiter_map.t
; mutable waiters : state Atomic.t Happy_eyeballs.Waiter_map.t
; condition : Miou_unix.Cond.t
; queue : action Miou.Queue.t
; connections : (Miou.Promise.Uid.t, entry) Hashtbl.t
; mutable getaddrinfo : getaddrinfo
}
type stack = {
mutable cancel_connecting: cancel list Happy_eyeballs.Waiter_map.t
; mutable waiters: state Atomic.t Happy_eyeballs.Waiter_map.t
; condition: Miou.Condition.t
; mutex: Miou.Mutex.t
; queue: [ action | event ] Miou.Queue.t
; mutable getaddrinfo: getaddrinfo
}
let create_stack () =
{ cancel_connecting = Happy_eyeballs.Waiter_map.empty
; waiters = Happy_eyeballs.Waiter_map.empty
; condition = Miou_unix.Cond.make ()
; queue = Miou.Queue.create ()
; connections = Hashtbl.create 0x100
; getaddrinfo = dummy
{
cancel_connecting= Happy_eyeballs.Waiter_map.empty
; waiters= Happy_eyeballs.Waiter_map.empty
; condition= Miou.Condition.create ()
; mutex= Miou.Mutex.create ()
; queue= Miou.Queue.create ()
; getaddrinfo= dummy
}
let try_connect addr () =
let try_connect t ~meta addr () =
let id, attempt, _, _ = meta in
let addr = to_sockaddr addr in
Logd.debug (fun m ->
m "connect to %a (%d:%d)" pp_sockaddr addr (Obj.magic id) attempt);
let socket =
match Unix.domain_of_sockaddr addr with
| Unix.PF_UNIX -> Fmt.invalid_arg "Invalid address: %a" pp_sockaddr addr
@ -76,40 +87,43 @@ let try_connect addr () =
| Unix.PF_INET6 -> Miou_unix.tcpv6 ()
in
try
Log.debug (fun m -> m "connect to %a" pp_sockaddr addr);
Miou_unix.connect socket addr;
`Connection (Miou_unix.transfer socket)
Logd.debug (fun m ->
m "connected to %a (%d:%d)" pp_sockaddr addr (Obj.magic id) attempt);
Miou.Mutex.protect t.mutex @@ fun () ->
Miou.Queue.enqueue t.queue (`Connected (meta, socket));
Miou.Condition.signal t.condition
with Unix.Unix_error (err, _, _) ->
Log.err (fun m ->
Logd.err (fun m ->
m "error connecting to %a: %s" pp_sockaddr addr (Unix.error_message err));
Fmt.failwith "error connecting to %a: %s" pp_sockaddr addr
(Unix.error_message err)
let disown fd =
Miou_unix.disown fd;
Miou_unix.to_file_descr fd
Miou_unix.close socket;
let msg =
Fmt.str "error connecting to %a: %s" pp_sockaddr addr
(Unix.error_message err)
in
Miou.Mutex.protect t.mutex @@ fun () ->
Miou.Queue.enqueue t.queue (`Connection_failed (meta, msg));
Miou.Condition.signal t.condition
let getpeername fd = try Some (Unix.getpeername fd) with _exn -> None
external to_cancel :
value Miou.t -> [ `Connection of Miou_unix.file_descr ] Miou.t = "%identity"
let connect t ~prms:orphans host id attempt addr =
let connection : value Miou.t = Miou.call ~orphans (try_connect addr) in
Hashtbl.add t.connections
(Miou.Promise.uid connection)
(id, attempt, host, addr);
let entry = ((attempt, to_cancel connection) :> cancel) in
let meta = (id, attempt, host, addr) in
Logd.debug (fun m ->
m "connect to %a (%d:%d)" Domain_name.pp host (Obj.magic id) attempt);
let prm : unit Miou.t = Miou.call_cc ~orphans (try_connect t ~meta addr) in
let entry = (attempt, prm) in
t.cancel_connecting <-
Happy_eyeballs.Waiter_map.update id
(function None -> Some [ entry ] | Some cs -> Some (entry :: cs))
t.cancel_connecting
let handle_one_action t ~prms = function
let handle_one_action t ~prms action =
match action with
| Happy_eyeballs.Connect (host, id, attempt, addr) ->
connect t ~prms host id attempt addr
| Happy_eyeballs.Connect_failed (host, id, reason) ->
Log.warn (fun m ->
Logd.warn (fun m ->
m "connection to %a failed: %s" Domain_name.pp host reason);
let cancel_connecting, others =
Happy_eyeballs.Waiter_map.find_and_remove id t.cancel_connecting
@ -133,7 +147,7 @@ let handle_one_action t ~prms = function
| Connected (_, fd) ->
let sockaddr = getpeername fd in
let fd = Miou_unix.of_file_descr fd in
Log.warn (fun m ->
Logd.warn (fun m ->
m "close the file-descriptor of %a (%a): %s" Domain_name.pp
host
Fmt.(option ~none:(const string "<none>") pp_sockaddr)
@ -148,31 +162,32 @@ let handle_one_action t ~prms = function
| Happy_eyeballs.Resolve_a host ->
let _ =
Miou.call_cc ~orphans:prms @@ fun () ->
match t.getaddrinfo.getaddrinfo Dns.Rr_map.A host with
| Ok (_ttl, res) -> `Resolution_v4 (host, Ok res)
| Error _ as err -> `Resolution_v4 (host, err)
let result =
match t.getaddrinfo.getaddrinfo Dns.Rr_map.A host with
| Ok (_ttl, res) -> Ok res
| Error _ as err -> err
in
Miou.Mutex.protect t.mutex @@ fun () ->
Miou.Queue.enqueue t.queue (`Resolution_v4 (host, result));
Miou.Condition.signal t.condition
in
()
| Happy_eyeballs.Resolve_aaaa host ->
let _ =
Miou.call_cc ~orphans:prms @@ fun () ->
match t.getaddrinfo.getaddrinfo Dns.Rr_map.Aaaa host with
| Ok (_ttl, res) -> `Resolution_v6 (host, Ok res)
| Error _ as err -> `Resolution_v6 (host, err)
let result =
match t.getaddrinfo.getaddrinfo Dns.Rr_map.Aaaa host with
| Ok (_ttl, res) -> Ok res
| Error _ as err -> err
in
Miou.Mutex.protect t.mutex @@ fun () ->
Miou.Queue.enqueue t.queue (`Resolution_v6 (host, result));
Miou.Condition.signal t.condition
in
()
let handle t prm =
let entry = Hashtbl.find_opt t.connections (Miou.Promise.uid prm) in
match (Miou.await prm, entry) with
| Error exn, Some (id, attempt, host, addr) ->
Hashtbl.remove t.connections (Miou.Promise.uid prm);
let msg =
match exn with
| Invalid_argument str | Failure str -> str
| Miou.Cancelled -> "cancelled"
| exn -> Printexc.to_string exn
in
let to_event t = function
| `Connection_failed ((id, attempt, host, addr), msg) ->
let fold = function
| None -> None
| Some cs -> (
@ -182,77 +197,61 @@ let handle t prm =
in
t.cancel_connecting <-
Happy_eyeballs.Waiter_map.update id fold t.cancel_connecting;
Some (Happy_eyeballs.Connection_failed (host, id, addr, msg))
| Ok (`Connection fd), Some (id, attempt, host, addr) ->
Happy_eyeballs.Connection_failed (host, id, addr, msg)
| `Connected ((id, attempt, host, addr), fd) as event ->
let cancel_connecting, others =
Happy_eyeballs.Waiter_map.find_and_remove id t.cancel_connecting
in
t.cancel_connecting <- cancel_connecting;
List.iter
(fun (att, prm) -> if att <> attempt then Miou.cancel prm)
(fun (att, prm) ->
if att <> attempt then begin
Logd.debug (fun m -> m "cancel (%d:%d)" (Obj.magic id) attempt);
Miou.cancel prm
end)
(Option.value ~default:[] others);
let waiters, waiter =
Happy_eyeballs.Waiter_map.find_and_remove id t.waiters
in
t.waiters <- waiters;
let transition waiter =
Log.debug (fun m -> m "disown the file-descr of %a" Domain_name.pp host);
let fd = disown fd in
let connected = Connected (addr, fd) in
let set = Atomic.compare_and_set waiter In_progress connected in
Log.debug (fun m ->
m "transfer the file-descriptor for %a" Domain_name.pp host);
if not set then (
let fd = Miou_unix.of_file_descr fd in
Log.warn (fun m ->
m "close the file-descriptor of %a" Domain_name.pp host);
Miou_unix.close fd)
let () =
match waiter with
| None ->
Format.eprintf ">>> not wait!!!!\n%!";
Miou.Mutex.protect t.mutex @@ fun () ->
Miou.Queue.enqueue t.queue event;
Miou.Condition.signal t.condition
| Some waiter ->
let connected = Connected (addr, Miou_unix.to_file_descr fd) in
let set = Atomic.compare_and_set waiter In_progress connected in
Logd.debug (fun m -> m "file-descr transmitted? %b" set);
if not set then Miou_unix.close fd
in
Option.iter transition waiter;
Some (Happy_eyeballs.Connected (host, id, addr))
| Ok (`Connection fd), None ->
Miou_unix.close fd;
None
| Ok (`Resolution_v4 (host, Ok ips)), _ ->
Log.debug (fun m -> m "%a resolved" Domain_name.pp host);
Some (Happy_eyeballs.Resolved_a (host, ips))
| Ok (`Resolution_v4 (host, Error (`Msg msg))), _ ->
Log.warn (fun m ->
Logd.debug (fun m ->
m "connected to %a (%a) (%d:%d)" Domain_name.pp host pp_sockaddr
(to_sockaddr addr) (Obj.magic id) attempt);
Happy_eyeballs.Connected (host, id, addr)
| `Resolution_v4 (host, Ok ips) ->
Logd.debug (fun m -> m "%a resolved" Domain_name.pp host);
Happy_eyeballs.Resolved_a (host, ips)
| `Resolution_v4 (host, Error (`Msg msg)) ->
Logd.warn (fun m ->
m "impossible to resolve %a: %s" Domain_name.pp host msg);
Some (Happy_eyeballs.Resolved_a_failed (host, msg))
| Ok (`Resolution_v6 (host, Ok ips)), _ ->
Log.debug (fun m -> m "%a resolved" Domain_name.pp host);
Some (Happy_eyeballs.Resolved_aaaa (host, ips))
| Ok (`Resolution_v6 (host, Error (`Msg msg))), _ ->
Log.warn (fun m ->
Happy_eyeballs.Resolved_a_failed (host, msg)
| `Resolution_v6 (host, Ok ips) ->
Logd.debug (fun m -> m "%a resolved" Domain_name.pp host);
Happy_eyeballs.Resolved_aaaa (host, ips)
| `Resolution_v6 (host, Error (`Msg msg)) ->
Logd.warn (fun m ->
m "impossible to resolve %a: %s" Domain_name.pp host msg);
Some (Happy_eyeballs.Resolved_aaaa_failed (host, msg))
| Error exn, None ->
Log.err (fun m ->
m "got an unexpected error from a promise: %S"
(Printexc.to_string exn));
None
Happy_eyeballs.Resolved_aaaa_failed (host, msg)
let await_actions t he () =
Log.debug (fun m -> m "wait for user's actions");
let user's_actions =
Miou_unix.Cond.until
~predicate:(fun () ->
Log.debug (fun m ->
m "got an user's action? %b"
(Fun.negate Miou.Queue.is_empty t.queue));
Miou.Queue.is_empty t.queue)
~fn:(fun () -> Miou.Queue.(to_list (transfer t.queue)))
t.condition
in
Log.debug (fun m -> m "got %d action(s)" (List.length user's_actions));
let to_actions t he user's_actions =
let fold (he, actions) = function
| `Connect_ip (waiter, addrs) ->
let waiters, id = Happy_eyeballs.Waiter_map.register waiter t.waiters in
t.waiters <- waiters;
let he, actions' = Happy_eyeballs.connect_ip he (clock ()) ~id addrs in
Log.debug (fun m ->
m "+%d action(s) for connect-ip" (List.length actions'));
(he, actions @ actions')
| `Connect (waiter, host, ports) ->
let waiters, id = Happy_eyeballs.Waiter_map.register waiter t.waiters in
@ -260,86 +259,116 @@ let await_actions t he () =
let he, actions' =
Happy_eyeballs.connect he (clock ()) ~id host ports
in
Log.debug (fun m ->
m "+%d action(s) for connect" (List.length actions'));
(he, actions @ actions')
in
List.fold_left fold (he, []) user's_actions
let rec get_events t he ~prms actions =
match Option.map (handle t) (Option.join (Miou.care prms)) |> Option.join with
| Some event ->
let he, actions' = Happy_eyeballs.event he (clock ()) event in
(* NOTE(dinosaure): prioritise event's actions. *)
get_events t he ~prms (actions @ actions')
| None -> (he, actions)
let await_actions_or_events t () =
Miou.Mutex.protect t.mutex @@ fun () ->
while Miou.Queue.is_empty t.queue do
Miou.Condition.wait t.condition t.mutex
done
exception Timeout
let with_timeout ~timeout ?(give = []) fn =
let timeout () =
Miou_unix.sleep timeout;
raise Timeout
let with_timeout ~timeout:ts fn =
let timeout () = Miou_unix.sleep ts; raise Timeout in
let prm1 = Miou.call_cc timeout in
let prm0 = Miou.call_cc fn in
Miou.await_first [ prm0; prm1 ]
let he_timer_interval = Duration.(to_f (of_ms 10))
let suspend t _cont he =
let timeout = he_timer_interval in
match with_timeout ~timeout (await_actions_or_events t) with
| Error Timeout -> (he, [], [])
| Ok () ->
let user's_actions_and_events = Miou.Queue.(to_list (transfer t.queue)) in
Logd.debug (fun m ->
m "got %d actions or events" (List.length user's_actions_and_events));
let user's_actions, events =
List.partition_map
(function
| #action as action -> Either.Left action
| #event as event -> Either.Right event)
user's_actions_and_events
in
Logd.debug (fun m ->
m "got %d actions and %d events"
(List.length user's_actions)
(List.length events));
let he, actions = to_actions t he user's_actions in
(he, actions, events)
| Error exn ->
Logd.err (fun m ->
m "Got an unexpected exception (suspend): %s" (Printexc.to_string exn));
raise exn
let rec clean_up prms =
match Miou.care prms with
| Some (Some prm) ->
let _ = Miou.await prm in
clean_up prms
| Some None | None -> Miou.yield ()
let rec go t ~prms he () =
Logd.debug (fun m -> m "daemon tick");
clean_up prms;
let he, cont, actions = Happy_eyeballs.timer he (clock ()) in
List.iter (handle_one_action ~prms t) actions;
let he, actions, events = suspend t cont he in
Logd.debug (fun m ->
m "got %d action(s) and %d event(s)" (List.length actions)
(List.length events));
let he, actions =
List.fold_left
(fun (he, actions) event ->
let he, actions' =
Happy_eyeballs.event he (clock ()) (to_event t event)
in
(he, List.rev_append actions actions'))
(he, actions) events
in
Miou.await_first [ Miou.call_cc timeout; Miou.call_cc ~give fn ]
Logd.debug (fun m -> m "daemon handles %d action(s)" (List.length actions));
List.iter (handle_one_action ~prms t) actions;
go t ~prms he ()
let suspend t he ~prms =
match get_events t he ~prms [] with
| he, (_ :: _ as actions) -> (he, actions)
| he, [] -> (
match with_timeout ~timeout:he_timer_interval (await_actions t he) with
| Error Timeout -> (he, [])
| Error exn ->
Log.err (fun m ->
m "got an unexpected exception: %S" (Printexc.to_string exn));
raise exn
| Ok (he, actions) ->
Log.debug (fun m -> m "return %d action(s)" (List.length actions));
(he, actions))
let rec launch_stack ?aaaa_timeout ?connect_delay ?connect_timeout
?resolve_timeout ?resolve_retries t () =
let launch_daemon ?aaaa_timeout ?connect_delay ?connect_timeout ?resolve_timeout
?resolve_retries t () =
let prms = Miou.orphans () in
let he =
Happy_eyeballs.create ?aaaa_timeout ?connect_delay ?connect_timeout
?resolve_timeout ?resolve_retries (clock ())
in
Log.debug (fun m -> m "the daemon is launched");
Miou.call (go t ~prms he)
and go t ~prms he () =
let he, cont, actions =
if Miou.Queue.is_empty t.queue then Happy_eyeballs.timer he (clock ())
else (he, `Suspend, [])
in
match (cont, actions) with
| `Suspend, [] ->
Log.debug (fun m -> m "the daemon is suspended to a new event");
let he, actions = suspend t he ~prms in
Log.debug (fun m -> m "consume %d action(s)" (List.length actions));
List.iter (handle_one_action ~prms t) actions;
Log.debug (fun m -> m "action(s) launched");
Miou.yield ();
go t ~prms he ()
| _, actions ->
let he, actions = get_events t he ~prms actions in
List.iter (handle_one_action ~prms t) actions;
Miou.yield ();
go t ~prms he ()
let _pp_addr ppf (ipaddr, port) = Fmt.pf ppf "%a:%d" Ipaddr.pp ipaddr port
let connect_ip t ips =
let waiter = Atomic.make In_progress in
Miou.Mutex.protect t.mutex @@ fun () ->
Miou.Queue.enqueue t.queue (`Connect_ip (waiter, ips));
Miou_unix.Cond.signal t.condition;
Log.debug (fun m -> m "the daemon was signaled about another user's action");
Miou.Condition.signal t.condition;
waiter
let connect_ip t ips =
try connect_ip t ips
with exn ->
Logc.err (fun m ->
m "Got an unexpected exception: %S" (Printexc.to_string exn));
raise exn
let to_pairs lst =
List.map (fun (`Plaintext (ipaddr, port)) -> (ipaddr, port)) lst
let he_wait_interval = Duration.(to_f (of_ms 10))
let rec wait value =
Logc.debug (fun m -> m "wait for a connected socket");
match Atomic.get value with
| In_progress ->
Miou_unix.sleep he_wait_interval;
Miou.yield ();
wait value
| Connected (addr, fd) -> (addr, fd)
@ -348,7 +377,8 @@ let rec wait value =
let connect_to_nameservers t nameservers =
let nss = to_pairs nameservers in
let waiter = connect_ip t nss in
let addr, fd = Miou.await_exn (Miou.call_cc (fun () -> wait waiter)) in
let prm = Miou.call_cc @@ fun () -> wait waiter in
let addr, fd = Miou.await_exn prm in
(addr, Miou_unix.of_file_descr fd)
type daemon = unit Miou.t
@ -356,7 +386,7 @@ type daemon = unit Miou.t
let stack ?aaaa_timeout ?connect_delay ?connect_timeout ?resolve_timeout
?resolve_retries () =
let v = create_stack () in
( launch_stack ?aaaa_timeout ?connect_delay ?connect_timeout ?resolve_timeout
( launch_daemon ?aaaa_timeout ?connect_delay ?connect_timeout ?resolve_timeout
?resolve_retries v ()
, v )
@ -366,14 +396,14 @@ let kill = Miou.cancel
type +'a io = 'a
type io_addr = [ `Plaintext of Ipaddr.t * int ]
type t =
{ nameservers : io_addr list
; proto : Dns.proto
; timeout : float
; stack : stack
}
type t = {
nameservers: io_addr list
; proto: Dns.proto
; timeout: float
; stack: stack
}
type context = float * bool ref * Miou_unix.file_descr
type context = float * Miou_unix.file_descr
let nameservers { nameservers; proto; _ } = (proto, nameservers)
let bind x f = f x
@ -384,15 +414,19 @@ let connect t =
match t.proto with
| `Tcp -> (
try
Log.debug (fun m -> m "connect to nameservers");
let _addr, fd = connect_to_nameservers t.stack t.nameservers in
Ok (`Tcp, (t.timeout, ref false, fd))
with Failure msg -> Error (`Msg msg))
Ok (`Tcp, (t.timeout, fd))
with
| Failure msg -> Error (`Msg msg)
| exn ->
Logc.err (fun m ->
m "Got unexpected exception: %S" (Printexc.to_string exn));
raise exn)
| `Udp -> error_msgf "Invalid protocol"
let rec read_loop ?(linger = Cstruct.empty) ~id proto fd =
let process rx =
let rec handle_data ({ Cstruct.len = rx_len; _ } as rx) =
let rec handle_data ({ Cstruct.len= rx_len; _ } as rx) =
if rx_len > 2 then
let len = Cstruct.BE.get_uint16 rx 0 in
if rx_len - 2 >= len then
@ -413,8 +447,8 @@ let rec read_loop ?(linger = Cstruct.empty) ~id proto fd =
match proto with
| `Tcp ->
let buf = Bytes.create 0x10 in
let len = Miou_unix.read fd ~off:0 ~len:(Bytes.length buf) buf in
Log.debug (fun m -> m "got %d byte(s) from the resolver" len);
let len = Miou_unix.read fd buf 0 (Bytes.length buf) in
Logc.debug (fun m -> m "got %d byte(s) from the resolver" len);
if len > 0 then process (Cstruct.of_bytes ~off:0 ~len buf)
else
Fmt.failwith "End of file reading from resolver (linger: %d byte(s))"
@ -428,40 +462,38 @@ let type_of_socket fd =
let ty = Unix.getsockopt_int fd Unix.SO_TYPE in
happy_translate_so_type ty
let send_recv (timeout, closed, fd) ({ Cstruct.len; _ } as tx) =
let send_recv (timeout, fd) ({ Cstruct.len; _ } as tx) =
Logc.debug (fun m ->
m "<=> @[<hov>%a@]" (Hxd_string.pp Hxd.default) (Cstruct.to_string tx));
if len > 4 then begin
match type_of_socket fd with
| Unix.SOCK_STREAM -> (
let fn () =
Log.debug (fun m -> m "send a packet to resolver");
Miou_unix.write fd ~off:0 ~len (Cstruct.to_string tx);
Logc.debug (fun m -> m "sending a dns packet to resolver");
Miou_unix.write fd (Cstruct.to_string tx) 0 len;
let id = Cstruct.BE.get_uint16 tx 2 in
Miou.Ownership.check (Miou_unix.owner fd);
Log.debug (fun m -> m "recv a packet from resolver");
Logc.debug (fun m -> m "recving a dns packet from resolver");
let packet = read_loop ~id `Tcp fd in
(packet, Miou_unix.transfer fd)
(packet, fd)
in
Miou_unix.disown fd;
match with_timeout ~timeout ~give:[ Miou_unix.owner fd ] fn with
match with_timeout ~timeout fn with
| Ok (packet, _) ->
Log.debug (fun m -> m "got a DNS packet from the resolver");
Logc.debug (fun m -> m "got a DNS packet from the resolver");
Ok packet
| Error Timeout ->
Log.warn (fun m -> m "DNS request timeout");
Logc.warn (fun m -> m "DNS request timeout");
error_msgf "DNS request timeout"
| Error (Failure msg) ->
Log.warn (fun m -> m "Got a failure: %s" msg);
closed := true;
Logc.warn (fun m -> m "Got a failure: %s" msg);
Error (`Msg msg)
| Error exn ->
closed := true;
error_msgf "Got an unexpected exception: %S"
(Printexc.to_string exn))
| _ -> error_msgf "Invalid type of file-descriptor"
end
else error_msgf "Invalid context (data length <= 4)"
let close (_, closed, fd) = if not !closed then Miou_unix.close fd
let close (_, fd) = Miou_unix.close fd
let of_ns ns = Int64.to_float ns /. 1_000_000_000.
let create ?nameservers ~timeout stack =
@ -470,22 +502,27 @@ let create ?nameservers ~timeout stack =
| None -> (`Tcp, [ `Plaintext (Ipaddr.of_string_exn "8.8.8.8", 53) ])
| Some (a, nss) -> (a, nss)
in
{ nameservers; proto; timeout = of_ns timeout; stack }
{ nameservers; proto; timeout= of_ns timeout; stack }
external reraise : exn -> 'a = "%reraise"
let connect_ip t ips =
let waiter = connect_ip t ips in
match Miou.await (Miou.call_cc (fun () -> wait waiter)) with
let prm = Miou.call_cc @@ fun () -> wait waiter in
match Miou.await prm with
| Ok (addr, fd) -> Ok (addr, Miou_unix.of_file_descr fd)
| Error (Failure msg) -> Error (`Msg msg)
| Error exn -> reraise exn
let connect_host t host ports =
let waiter = Atomic.make In_progress in
Miou.Queue.enqueue t.queue (`Connect (waiter, host, ports));
Miou_unix.Cond.signal t.condition;
match Miou.await (Miou.call_cc (fun () -> wait waiter)) with
let () =
Miou.Mutex.protect t.mutex @@ fun () ->
Miou.Queue.enqueue t.queue (`Connect (waiter, host, ports));
Miou.Condition.signal t.condition
in
let prm = Miou.call_cc @@ fun () -> wait waiter in
match Miou.await prm with
| Ok (addr, fd) -> Ok (addr, Miou_unix.of_file_descr fd)
| Error (Failure msg) -> Error (`Msg msg)
| Error exn -> reraise exn

View File

@ -5,13 +5,13 @@ include
type daemon
type getaddrinfo =
{ getaddrinfo :
type getaddrinfo = {
getaddrinfo:
'response 'a.
'response Dns.Rr_map.key
-> 'a Domain_name.t
-> ('response, [ `Msg of string ]) result
}
}
[@@unboxed]
val stack :
@ -34,8 +34,6 @@ val inject_resolver : getaddrinfo:getaddrinfo -> stack -> unit
{!val:connect_endpoint}. *)
val kill : daemon -> unit
(** [kill daemon] cleans resources required by our happy-eyeballs implementation.
The user {b must} call at the end of its application this function. *)
val connect_ip :
stack

63
happy/main.ml Normal file
View File

@ -0,0 +1,63 @@
let anchor = Mtime_clock.now ()
let reporter ppf =
let report src level ~over k msgf =
let k _ = over (); k () in
let with_metadata header _tags k ppf fmt =
let ts = Mtime.span (Mtime_clock.now ()) anchor in
let ts = Mtime.Span.to_float_ns ts /. 1e9 in
Format.kfprintf k ppf
("[+%as]%a[%a][%a]: " ^^ fmt ^^ "\n%!")
Fmt.(styled `Magenta (fmt "%04.04f"))
ts Logs_fmt.pp_header (level, header)
Fmt.(styled `Blue int)
(Stdlib.Domain.self () :> int)
Fmt.(styled `Magenta string)
(Logs.Src.name src)
in
msgf @@ fun ?header ?tags fmt -> with_metadata header tags k ppf fmt
in
{ Logs.report }
let () = Fmt_tty.setup_std_outputs ~style_renderer:`Ansi_tty ~utf_8:true ()
let () = Logs.set_reporter (reporter Fmt.stderr)
let () = Logs.set_level ~all:true (Some Logs.Debug)
let () = Logs_threaded.enable ()
let () = Mirage_crypto_rng_unix.initialize (module Mirage_crypto_rng.Fortuna)
let out = Mutex.create ()
let pr fmt =
let finally () = Mutex.unlock out in
Mutex.lock out;
Fun.protect ~finally @@ fun () -> Format.printf fmt
let epr fmt =
let finally () = Mutex.unlock out in
Mutex.lock out;
Fun.protect ~finally @@ fun () -> Format.eprintf fmt
let pp_msg ppf (`Msg msg) = Fmt.string ppf msg
let getaddrinfo dns =
{
Happy.getaddrinfo= (fun record host -> Dns_miou.getaddrinfo dns record host)
}
let pp_sockaddr ppf (ipaddr, port) =
Format.fprintf ppf "%a:%d" Ipaddr.pp ipaddr port
let () =
Miou_unix.run @@ fun () ->
let daemon, stack = Happy.stack () in
let dns = Dns_miou.create stack in
Happy.inject_resolver ~getaddrinfo:(getaddrinfo dns) stack;
for _ = 0 to 10_000 do
match Happy.connect_endpoint stack "google.com" [ 443 ] with
| Ok (sockaddr, fd) ->
Format.printf "Connected to google.com via %a\n%!" pp_sockaddr sockaddr;
Miou_unix.close fd
| Error (`Msg msg) ->
Logs.err (fun m -> m "Got an error: %s" msg);
failwith msg
done;
Happy.kill daemon

View File

@ -32,7 +32,7 @@ build: [
synopsis: "A simple HTTP client using http/af, h2, and miou"
pin-depends: [
[ "miou.dev" "git+https://github.com/robur-coop/miou.git#d795f08fc64f3e53077172dbfedfcefe47a2b832" ]
[ "mirage-crypto.0.11.2" "git+https://github.com/dinosaure/mirage-crypto.git#13bd9191f42cfcad84a808a79d97788f65af90e9" ]
[ "mirage-crypto-rng.0.11.2" "git+https://github.com/dinosaure/mirage-crypto.git#13bd9191f42cfcad84a808a79d97788f65af90e9" ]
[ "mirage-crypto.0.11.3" "git+https://github.com/dinosaure/mirage-crypto.git#13bd9191f42cfcad84a808a79d97788f65af90e9" ]
[ "mirage-crypto-rng.0.11.3" "git+https://github.com/dinosaure/mirage-crypto.git#13bd9191f42cfcad84a808a79d97788f65af90e9" ]
[ "alcotest.1.7.0" "git+https://github.com/dinosaure/alcotest.git#f690bdeb7b1c3eb5c9016dc7097c400dd42e492b" ]
]

View File

@ -1,18 +1,5 @@
(library
(name httpcats)
(public_name httpcats)
(modules flow runtime http_miou_unix http_miou_client http_miou_server
httpcats tls_miou)
(libraries ca-certs httpaf h2 httpcats.happy tls miou miou.unix))
(library
(name happy)
(public_name httpcats.happy)
(modules happy dns_miou)
(foreign_stubs
(language c)
(names happy)
(flags (:standard)))
(wrapped false)
(libraries mtime.clock.os ipaddr.unix mirage-crypto-rng happy-eyeballs dns
dns-client miou miou.unix))
(modules httpcats tls_miou http_miou_unix http_miou_client http_miou_server
flow runtime)
(libraries hxd.string hxd.core ca-certs happy httpaf h2 tls miou.unix))

View File

@ -1,10 +1,10 @@
module type S = sig
type t
type error
type error = private [> `Closed ]
val pp_error : error Fmt.t
val read : t -> bytes -> off:int -> len:int -> (int, error) result
val writev : t -> Cstruct.t list -> (unit, error) result
val close : t -> unit
val shutdown : t -> [ `Recv | `Send ] -> unit
val shutdown : t -> [ `read | `write | `read_write ] -> unit
end

View File

@ -1,11 +1,8 @@
let src = Logs.Src.create "http-miou-client"
module Log = (val Logs.src_log src : Logs.LOG)
open Http_miou_unix
module TLS_for_httpaf = struct
include TLS
let shutdown flow _ = Miou_unix.disown flow.flow
end
module Httpaf_Client_connection = struct
include Httpaf.Client_connection
@ -15,27 +12,16 @@ module Httpaf_Client_connection = struct
(next_read_operation t :> [ `Close | `Read | `Yield ])
end
module A = Runtime.Make (TLS_for_httpaf) (Httpaf_Client_connection)
module A = Runtime.Make (TLS) (Httpaf_Client_connection)
module B = Runtime.Make (TCP) (Httpaf_Client_connection)
module C = Runtime.Make (TLS) (H2.Client_connection)
module D = Runtime.Make (TCP) (H2.Client_connection)
type config = [ `V1 of Httpaf.Config.t | `V2 of H2.Config.t ]
type flow = [ `Tls of TLS.t | `Tcp of Miou_unix.file_descr ]
type flow = [ `Tls of Http_miou_unix.TLS.t | `Tcp of Miou_unix.file_descr ]
type request = [ `V1 of Httpaf.Request.t | `V2 of H2.Request.t ]
type response = [ `V1 of Httpaf.Response.t | `V2 of H2.Response.t ]
type 'body body =
{ body : 'body
; write_string : 'body -> ?off:int -> ?len:int -> string -> unit
; close : 'body -> unit
; release : unit -> unit
}
type ('resp, 'body) version =
| V1 : (Httpaf.Response.t, [ `write ] Httpaf.Body.t body) version
| V2 : (H2.Response.t, H2.Body.Writer.t body) version
type error =
[ `V1 of Httpaf.Client_connection.error
| `V2 of H2.Client_connection.error
@ -55,186 +41,148 @@ let pp_error ppf = function
Fmt.pf ppf "Protocol error %a: %s" H2.Error_code.pp_hum err msg
| `Protocol msg -> Fmt.string ppf msg
type ('resp, 'acc) await = unit -> ('resp * 'acc, error) result
type ('resp, 'body) version =
| V1 : (Httpaf.Response.t, [ `write ] Httpaf.Body.t) version
| V2 : (H2.Response.t, H2.Body.Writer.t) version
type 'resp await = unit -> ('resp, error) result
type 'acc process =
| Process :
('resp, 'body) version * ('resp, 'acc) await * 'body
('resp, 'body) version * ('resp * 'acc) await * 'body
-> 'acc process
let src = Logs.Src.create "http-miou-client"
let http_1_1_response_handler ~f acc =
let acc = ref acc in
let response = ref None in
let go resp body orphans =
let rec on_eof () = Httpaf.Body.close_reader body
and on_read bstr ~off ~len =
let str = Bigstringaf.substring bstr ~off ~len in
acc := f (`V1 resp) !acc str;
Httpaf.Body.schedule_read body ~on_read ~on_eof
in
response := Some (`V1 resp);
Httpaf.Body.schedule_read body ~on_read ~on_eof;
Runtime.terminate orphans
in
let response_handler resp body = Runtime.flat_tasks (go resp body) in
(response_handler, response, acc)
module Log = (val Logs.src_log src : Logs.LOG)
let http_1_1_error_handler () =
let error = ref None in
let error_handler = function
| `Exn (Runtime.Flow msg) -> error := Some (`Protocol msg)
| err -> error := Some (`V1 err)
in
(error_handler, error)
let h2_response_handler conn ~f acc =
let acc = ref acc in
let response = ref None in
let go resp body orphans =
let rec on_eof () =
H2.Body.Reader.close body;
H2.Client_connection.shutdown conn
and on_read bstr ~off ~len =
let str = Bigstringaf.substring bstr ~off ~len in
acc := f (`V2 resp) !acc str;
H2.Body.Reader.schedule_read body ~on_read ~on_eof
in
response := Some (`V1 resp);
H2.Body.Reader.schedule_read body ~on_read ~on_eof;
Log.debug (fun m -> m "reader terminates");
Runtime.terminate orphans
in
let response_handler resp body = Runtime.flat_tasks (go resp body) in
(response_handler, response, acc)
let h2_error_handler () =
let error = ref None in
let error_handler = function
| `Exn (Runtime.Flow msg) -> error := Some (`Protocol msg)
| err -> error := Some (`V2 err)
in
(error_handler, error)
let pp_request ppf (flow, request) =
match (flow, request) with
| `Tls _, `V1 _ -> Fmt.string ppf "http/1.1 + tls"
| `Tcp _, `V1 _ -> Fmt.string ppf "http/1.1"
| `Tls _, `V2 _ -> Fmt.string ppf "h2 + tls"
| `Tcp _, `V2 _ -> Fmt.string ppf "h2"
(* NOTE(dinosaure): we avoid first-class module here. *)
let run ~f acc config flow request =
let response : response option ref = ref None
and error = ref None
and acc = ref acc in
let error_handler err =
Log.err (fun m -> m "got an error: %a" pp_error err);
match err with
| `V1 (`Exn (Runtime.Flow msg)) | `V2 (`Exn (Runtime.Flow msg)) ->
error := Some (`Protocol msg)
| err -> error := Some err
in
let response_handler ?(shutdown = Fun.const ()) = function
| `V1 (resp, body) ->
let rec on_eof = shutdown
and on_read bstr ~off ~len =
let str = Bigstringaf.substring bstr ~off ~len in
acc := f (`V1 resp) !acc str;
Httpaf.Body.schedule_read body ~on_read ~on_eof
in
response := Some (`V1 resp);
Httpaf.Body.schedule_read body ~on_read ~on_eof
| `V2 (resp, body) ->
let rec on_eof = shutdown
and on_read bstr ~off ~len =
let str = Bigstringaf.substring bstr ~off ~len in
acc := f (`V2 resp) !acc str;
H2.Body.Reader.schedule_read body ~on_read ~on_eof
in
response := Some (`V2 resp);
H2.Body.Reader.schedule_read body ~on_read ~on_eof
in
let give =
match flow with
| `Tls flow -> [ Miou_unix.owner flow.TLS.flow ]
| `Tcp flow -> [ Miou_unix.owner flow ]
in
Log.debug (fun m -> m "Start a new %a request" pp_request (flow, request));
match (flow, config, request) with
| `Tls flow, `V1 config, `V1 request ->
let read_buffer_size = config.Httpaf.Config.read_buffer_size in
let disown flow = Miou_unix.disown flow.TLS.flow in
let response_handler resp body = response_handler (`V1 (resp, body)) in
let error_handler error = error_handler (`V1 error) in
let response_handler, response, acc = http_1_1_response_handler ~f acc in
let error_handler, error = http_1_1_error_handler () in
let body, conn =
Httpaf.Client_connection.request ~config request ~error_handler
~response_handler
in
let orphans = Miou.orphans () in
let { Runtime.protect }, prm, close =
Log.debug (fun m -> m "start an http/1.1 request over TLS");
A.run conn ~give ~disown ~read_buffer_size flow
in
let prm = A.run conn ~read_buffer_size flow in
let await () =
match (Miou.await prm, !error, !response) with
| _, Some error, _ -> Error error
| Error exn, _, _ -> Error (`V1 (`Exn exn))
| Ok (), None, Some (`V1 response) -> Ok (response, !acc)
| Ok (), None, Some (`V1 resp) -> Ok (resp, !acc)
| Ok (), None, (Some (`V2 _) | None) -> assert false
in
let release () =
Runtime.terminate orphans;
close ()
in
let write_string body ?off ?len str =
protect ~orphans (Httpaf.Body.write_string body ?off ?len) str
in
let close body = protect ~orphans Httpaf.Body.close_writer body in
let body = { body; write_string; close; release } in
Process (V1, await, body)
| `Tcp flow, `V1 config, `V1 request ->
let read_buffer_size = config.Httpaf.Config.read_buffer_size in
let disown = Miou_unix.disown in
let response_handler resp body = response_handler (`V1 (resp, body)) in
let error_handler error = error_handler (`V1 error) in
let response_handler, response, acc = http_1_1_response_handler ~f acc in
let error_handler, error = http_1_1_error_handler () in
let body, conn =
Httpaf.Client_connection.request ~config request ~error_handler
~response_handler
in
let orphans = Miou.orphans () in
let { Runtime.protect }, prm, close =
B.run conn ~give ~disown ~read_buffer_size flow
in
let prm = B.run conn ~read_buffer_size flow in
let await () =
match (Miou.await prm, !error, !response) with
| _, Some error, _ -> Error error
| Error exn, _, _ -> Error (`V1 (`Exn exn))
| Ok (), None, Some (`V1 response) -> Ok (response, !acc)
| Ok (), None, Some (`V1 resp) -> Ok (resp, !acc)
| Ok (), None, (Some (`V2 _) | None) -> assert false
in
let release () =
Runtime.terminate orphans;
close ()
in
let write_string body ?off ?len str =
protect ~orphans (Httpaf.Body.write_string body ?off ?len) str
in
let close body = protect ~orphans Httpaf.Body.close_writer body in
let body = { body; write_string; close; release } in
Process (V1, await, body)
| `Tls flow, `V2 config, `V2 request ->
let read_buffer_size = config.H2.Config.read_buffer_size in
let disown flow = Miou_unix.disown flow.TLS.flow in
let error_handler error = error_handler (`V2 error) in
let error_handler, error = h2_error_handler () in
let conn = H2.Client_connection.create ~config ~error_handler () in
let shutdown () = H2.Client_connection.shutdown conn in
let response_handler resp body =
response_handler ~shutdown (`V2 (resp, body))
in
let response_handler, response, acc = h2_response_handler conn ~f acc in
let body =
H2.Client_connection.request conn request ~error_handler
~response_handler
in
let orphans = Miou.orphans () in
let { Runtime.protect }, prm, close =
Log.debug (fun m -> m "start an h2 request over TLS");
C.run conn ~give ~disown ~read_buffer_size flow
H2.Client_connection.request conn ~error_handler ~response_handler
request
in
let prm = C.run conn ~read_buffer_size flow in
let await () =
match (Miou.await prm, !error, !response) with
| _, Some error, _ -> Error error
| Error exn, _, _ -> Error (`V2 (`Exn exn))
| Ok (), None, Some (`V2 response) -> Ok (response, !acc)
| Ok (), None, (Some (`V1 _) | None) -> assert false
| Error exn, _, _ -> Error (`V1 (`Exn exn))
| Ok (), None, Some (`V1 resp) -> Ok (resp, !acc)
| Ok (), None, (Some (`V2 _) | None) -> assert false
in
let release () =
Runtime.terminate orphans;
close ()
in
let write_string body ?off ?len str =
protect ~orphans (H2.Body.Writer.write_string body ?off ?len) str
in
let close body =
Log.debug (fun m -> m "close the stream from the application level");
protect ~orphans H2.Body.Writer.close body
in
let body = { body; write_string; close; release } in
Process (V2, await, body)
| `Tcp flow, `V2 config, `V2 request ->
let read_buffer_size = config.H2.Config.read_buffer_size in
let disown = Miou_unix.disown in
let error_handler error = error_handler (`V2 error) in
let error_handler, error = h2_error_handler () in
let conn = H2.Client_connection.create ~config ~error_handler () in
let shutdown () = H2.Client_connection.shutdown conn in
let response_handler resp body =
response_handler ~shutdown (`V2 (resp, body))
in
let response_handler, response, acc = h2_response_handler conn ~f acc in
let body =
H2.Client_connection.request conn request ~error_handler
~response_handler
in
let orphans = Miou.orphans () in
let { Runtime.protect }, prm, close =
D.run conn ~give ~disown ~read_buffer_size flow
H2.Client_connection.request conn ~error_handler ~response_handler
request
in
let prm = D.run conn ~read_buffer_size flow in
let await () =
match (Miou.await prm, !error, !response) with
| _, Some error, _ -> Error error
| Error exn, _, _ -> Error (`V2 (`Exn exn))
| Ok (), None, Some (`V2 response) -> Ok (response, !acc)
| Ok (), None, (Some (`V1 _) | None) -> assert false
| Error exn, _, _ -> Error (`V1 (`Exn exn))
| Ok (), None, Some (`V1 resp) -> Ok (resp, !acc)
| Ok (), None, (Some (`V2 _) | None) -> assert false
in
let release () =
Runtime.terminate orphans;
close ()
in
let write_string body ?off ?len str =
protect ~orphans (H2.Body.Writer.write_string body ?off ?len) str
in
let close body = protect ~orphans H2.Body.Writer.close body in
let body = { body; write_string; close; release } in
Process (V2, await, body)
| _ -> Fmt.invalid_arg "Http_miou_unix.run: incompatible arguments"
| _ -> invalid_arg "Http_miou_client.run"

View File

@ -10,22 +10,15 @@ type error =
val pp_error : error Fmt.t
type 'body body =
{ body : 'body
; write_string : 'body -> ?off:int -> ?len:int -> string -> unit
; close : 'body -> unit
; release : unit -> unit
}
type ('resp, 'body) version =
| V1 : (Httpaf.Response.t, [ `write ] Httpaf.Body.t body) version
| V2 : (H2.Response.t, H2.Body.Writer.t body) version
| V1 : (Httpaf.Response.t, [ `write ] Httpaf.Body.t) version
| V2 : (H2.Response.t, H2.Body.Writer.t) version
type ('resp, 'acc) await = unit -> ('resp * 'acc, error) result
type 'resp await = unit -> ('resp, error) result
type 'acc process =
| Process :
('resp, 'body) version * ('resp, 'acc) await * 'body
('resp, 'body) version * ('resp * 'acc) await * 'body
-> 'acc process
val run :

View File

@ -1,25 +1,22 @@
open Http_miou_unix
module A = Runtime.Make (TLS) (Httpaf.Server_connection)
module TLS_for_httpaf = struct
include TLS
module TCP_and_httpaf = struct
include TCP
let shutdown flow _ = Miou_unix.disown flow.flow
let shutdown flow = function `read -> () | value -> shutdown flow value
end
module A = Runtime.Make (TLS_for_httpaf) (Httpaf.Server_connection)
module B = Runtime.Make (TCP) (Httpaf.Server_connection)
module B = Runtime.Make (TCP_and_httpaf) (Httpaf.Server_connection)
module C = Runtime.Make (TLS) (H2.Server_connection)
[@@@warning "-34"]
type config = [ `V1 of Httpaf.Config.t | `V2 of H2.Config.t ]
type flow = [ `Tls of TLS.t | `Tcp of Miou_unix.file_descr ]
type error =
[ `V1 of Httpaf.Server_connection.error
| `V2 of H2.Server_connection.error
| `Protocol of string ]
type stop = Miou.Mutex.t * Miou.Condition.t * bool ref
let pp_error ppf = function
| `V1 `Bad_request -> Fmt.string ppf "Bad HTTP/1.1 request"
| `V1 `Bad_gateway -> Fmt.string ppf "Bad HTTP/1.1 gateway"
@ -37,24 +34,18 @@ module Method = H2.Method
module Headers = H2.Headers
module Status = H2.Status
exception Body_already_sent
type request = {
meth: Method.t
; target: string
; scheme: string
; headers: Headers.t
}
type request =
{ meth : Method.t; target : string; scheme : string; headers : Headers.t }
type response = { status : Status.t; headers : Headers.t }
let response_to_httpaf response =
let headers = Httpaf.Headers.of_list (H2.Headers.to_list response.headers) in
let status =
match response.status with
| #Httpaf.Status.t as status -> status
| _ -> invalid_arg "Invalid HTTP/1.1 status"
in
Httpaf.Response.create ~headers status
let response_to_h2 response =
H2.Response.create ~headers:response.headers response.status
type response = { status: Status.t; headers: Headers.t }
type body = [ `V1 of [ `write ] Httpaf.Body.t | `V2 of H2.Body.Writer.t ]
type reqd = [ `V1 of Httpaf.Reqd.t | `V2 of H2.Reqd.t ]
type error_handler = ?request:request -> error -> (Headers.t -> body) -> unit
type handler = reqd -> unit
let request_from_httpaf ~scheme { Httpaf.Request.meth; target; headers; _ } =
let headers = Headers.of_list (Httpaf.Headers.to_list headers) in
@ -63,368 +54,207 @@ let request_from_httpaf ~scheme { Httpaf.Request.meth; target; headers; _ } =
let request_from_h2 { H2.Request.meth; target; scheme; headers } =
{ meth; target; scheme; headers }
type output =
{ write_string : ?off:int -> ?len:int -> string -> unit
; write_bigstring : ?off:int -> ?len:int -> Bigstringaf.t -> unit
; close : unit -> unit
}
type on_read = Bigstringaf.t -> off:int -> len:int -> unit
type on_eof = unit -> unit
type input = { schedule : on_eof:on_eof -> on_read:on_read -> unit } [@@unboxed]
type _ Effect.t += String : response * string -> unit Effect.t
type _ Effect.t += Bigstring : response * Bigstringaf.t -> unit Effect.t
type _ Effect.t += Stream : response -> output Effect.t
type _ Effect.t += Get : input Effect.t
let string ~status ?(headers = Headers.empty) str =
let response = { status; headers } in
Effect.perform (String (response, str))
let bigstring ~status ?(headers = Headers.empty) bstr =
let response = { status; headers } in
Effect.perform (Bigstring (response, bstr))
let stream ?(headers = Headers.empty) status =
let response = { status; headers } in
Effect.perform (Stream response)
let get () = Effect.perform Get
type error_handler =
?request:request -> error -> (H2.Headers.t -> output) -> unit
type handler = request -> unit
let pp_sockaddr ppf = function
| Unix.ADDR_UNIX name -> Fmt.pf ppf "<%s>" name
| Unix.ADDR_INET (inet_addr, port) ->
Fmt.pf ppf "%s:%d" (Unix.string_of_inet_addr inet_addr) port
let rec basic_handler ~exnc =
let open Effect.Shallow in
let fail k = discontinue_with k Body_already_sent (basic_handler ~exnc) in
let retc = Fun.id in
let effc :
type c. c Effect.t -> ((c, 'a) Effect.Shallow.continuation -> 'b) option =
function
| String _ | Bigstring _ | Stream _ ->
Log.err (fun m -> m "the user wants to write to the peer a second time");
Some fail
| _ -> None
in
{ retc; exnc; effc }
let httpaf_handler ~sockaddr ~scheme ~protect:{ Runtime.protect } ~orphans
~handler reqd =
let open Httpaf in
let open Effect.Deep in
let rec retc = Fun.id
and exnc = protect ~orphans (Reqd.report_exn reqd)
and effc :
type c. c Effect.t -> ((c, 'a) Effect.Deep.continuation -> 'b) option =
function
| Get ->
let body = Httpaf.Reqd.request_body reqd in
let schedule ~on_eof ~on_read =
let on_eof () =
match_with on_eof () { retc; exnc; effc };
protect ~orphans Httpaf.Body.close_reader body;
Runtime.terminate orphans (* XXX(dinosaure): really important! *)
in
protect ~orphans
(match_with (Httpaf.Body.schedule_read ~on_eof ~on_read) body)
{ retc; exnc; effc }
in
Log.debug (fun m -> m "return a stream of the body request");
Some (fun k -> continue k { schedule })
| String (response, str) ->
let response = response_to_httpaf response in
Log.debug (fun m -> m "write a http/1.1 response and its body");
protect ~orphans (Reqd.respond_with_string reqd response) str;
Some (fun k -> continue k ())
| Stream response ->
let response = response_to_httpaf response in
let body =
protect ~orphans (Reqd.respond_with_streaming reqd) response
in
let write_string ?off ?len str =
protect ~orphans (Body.write_string body ?off ?len) str
in
let write_bigstring ?off ?len bstr =
protect ~orphans (Body.write_bigstring body ?off ?len) bstr
in
let close () = protect ~orphans Body.close_writer body in
let stream = { write_string; write_bigstring; close } in
Some (fun k -> continue k stream)
| _ -> None
in
let fn request =
let request = request_from_httpaf ~scheme request in
handler request;
Runtime.terminate orphans;
Log.debug (fun m -> m "the handler for %a has ended" pp_sockaddr sockaddr)
in
match_with fn (Reqd.request reqd) { retc; exnc; effc }
let h2_handler ~sockaddr ~protect:{ Runtime.protect } ~orphans ~handler reqd =
let open H2 in
let open Effect.Shallow in
let retc = Fun.id in
let exnc = protect ~orphans (Reqd.report_exn reqd) in
let effc :
type c. c Effect.t -> ((c, 'a) Effect.Shallow.continuation -> 'b) option =
function
| String (response, str) ->
let response = response_to_h2 response in
Log.debug (fun m -> m "write a h2 response and its body");
protect ~orphans (Reqd.respond_with_string reqd response) str;
let handler = basic_handler ~exnc in
Some (fun k -> continue_with k () handler)
| Stream response ->
let response = response_to_h2 response in
let body =
protect ~orphans (Reqd.respond_with_streaming reqd) response
in
let write_string ?off ?len str =
protect ~orphans (Body.Writer.write_string body ?off ?len) str
in
let write_bigstring ?off ?len bstr =
protect ~orphans (Body.Writer.write_bigstring body ?off ?len) bstr
in
let close () = protect ~orphans Body.Writer.close body in
let stream = { write_string; write_bigstring; close } in
let handler = basic_handler ~exnc in
Some (fun k -> continue_with k stream handler)
| _ -> None
in
let fn request =
let request = request_from_h2 request in
handler request;
Runtime.terminate orphans;
Log.debug (fun m -> m "the handler for %a has ended" pp_sockaddr sockaddr)
in
continue_with (fiber fn) (Reqd.request reqd) { retc; exnc; effc }
let rec clean orphans =
match Miou.care orphans with
| Some (Some prm) ->
Miou.await_exn prm;
clean orphans
| Some None | None -> ()
let default_error_handler ?request:_ _err _respond = ()
let http_1_1_server_connection ~config ~user's_error_handler ~user's_handler
flow =
let scheme = "http" in
let read_buffer_size = config.Httpaf.Config.read_buffer_size in
let error_handler ?request err respond =
let request = Option.map (request_from_httpaf ~scheme) request in
let err =
match err with `Exn (Runtime.Flow msg) -> `Protocol msg | err -> `V1 err
in
Runtime.flat_tasks @@ fun orphans ->
let respond hdrs =
let hdrs = Httpaf.Headers.of_list (Headers.to_list hdrs) in
let body = respond hdrs in
`V1 body
in
user's_error_handler ?request err respond;
Runtime.terminate orphans
in
let request_handler reqd =
Runtime.flat_tasks @@ fun orphans ->
user's_handler (`V1 reqd);
Runtime.terminate orphans
in
let conn =
Httpaf.Server_connection.create ~config ~error_handler request_handler
in
Miou.await_exn (B.run conn ~read_buffer_size flow)
let https_1_1_server_connection ~config ~user's_error_handler ~user's_handler
flow =
let scheme = "https" in
let read_buffer_size = config.Httpaf.Config.read_buffer_size in
let error_handler ?request err respond =
let request = Option.map (request_from_httpaf ~scheme) request in
let err =
match err with `Exn (Runtime.Flow msg) -> `Protocol msg | err -> `V1 err
in
Runtime.flat_tasks @@ fun orphans ->
let respond hdrs =
let hdrs = Httpaf.Headers.of_list (Headers.to_list hdrs) in
let body = respond hdrs in
`V1 body
in
user's_error_handler ?request err respond;
Runtime.terminate orphans
in
let request_handler reqd =
Runtime.flat_tasks @@ fun orphans ->
user's_handler (`V1 reqd);
Runtime.terminate orphans
in
let conn =
Httpaf.Server_connection.create ~config ~error_handler request_handler
in
Miou.await_exn (A.run conn ~read_buffer_size flow)
let h2s_server_connection ~config ~user's_error_handler ~user's_handler flow =
let read_buffer_size = config.H2.Config.read_buffer_size in
let error_handler ?request err respond =
let request = Option.map request_from_h2 request in
let err =
match err with `Exn (Runtime.Flow msg) -> `Protocol msg | err -> `V2 err
in
Runtime.flat_tasks @@ fun orphans ->
let respond hdrs = `V2 (respond hdrs) in
user's_error_handler ?request err respond;
Runtime.terminate orphans
in
let request_handler reqd =
Runtime.flat_tasks @@ fun orphans ->
user's_handler (`V2 reqd);
Runtime.terminate orphans
in
let conn =
H2.Server_connection.create ~config ~error_handler request_handler
in
Miou.await_exn (C.run conn ~read_buffer_size flow)
let rec clean_up orphans =
match Miou.care orphans with
| None | Some None -> ()
| Some (Some prm) -> (
match Miou.await prm with
| Ok () -> clean_up orphans
| Error exn ->
Log.err (fun m ->
m "unexpected exception: %s" (Printexc.to_string exn));
clean_up orphans)
exception Stop
let wait ~stop () =
Miou_unix.Cond.wait stop;
raise_notrace Stop
let rec wait ((m, c, v) as stop) () =
let value =
Miou.Mutex.protect m @@ fun () ->
while not !v do
Miou.Condition.wait c m
done;
!v
in
if value then raise Stop else wait stop ()
let stop () = (Miou.Mutex.create (), Miou.Condition.create (), ref false)
let switch (m, c, v) =
Miou.Mutex.protect m @@ fun () ->
v := true;
Miou.Condition.broadcast c
let accept_or_stop ?stop file_descr =
match stop with
| None -> Some (Miou_unix.accept file_descr)
| Some stop -> (
let accept =
Miou.call_cc ~give:[ Miou_unix.owner file_descr ] @@ fun () ->
let file_descr', sockaddr = Miou_unix.accept file_descr in
Log.debug (fun m ->
m "receive a new tcp/ip connection from: %a" pp_sockaddr sockaddr);
Miou_unix.disown file_descr;
(Miou_unix.transfer file_descr', sockaddr)
in
let wait = Miou.call_cc (wait ~stop) in
Miou.await_first [ accept; wait ] |> function
| Ok value ->
let _ = Miou.await wait in
Some value
let accept = Miou.call_cc @@ fun () -> Miou_unix.accept file_descr in
let wait = Miou.call_cc (wait stop) in
Log.debug (fun m -> m "waiting for a client");
match Miou.await_first [ accept; wait ] with
| Ok (fd, sockaddr) -> Some (fd, sockaddr)
| Error Stop -> None
| Error exn -> raise exn)
| Error exn ->
Log.err (fun m ->
m "unexpected exception: %S" (Printexc.to_string exn));
raise exn)
let http_1_1_server_connection ~config ~sockaddr ~user's_error_handler ~handler
file_descr =
let scheme = "http" in
let read_buffer_size = config.Httpaf.Config.read_buffer_size in
let give = [ Miou_unix.owner file_descr ] in
let orphans = Miou.orphans () in
let rec error_handler ?request err respond =
let { Runtime.protect }, _, _ = Lazy.force process in
let request = Option.map (request_from_httpaf ~scheme) request in
let respond hdrs =
let open Httpaf in
let hdrs = Httpaf.Headers.of_list (H2.Headers.to_list hdrs) in
let body = protect ~orphans respond hdrs in
let write_string ?off ?len str =
protect ~orphans (Body.write_string body ?off ?len) str
in
let write_bigstring ?off ?len bstr =
protect ~orphans (Body.write_bigstring body ?off ?len) bstr
in
let close () = protect ~orphans Body.close_writer body in
{ write_string; write_bigstring; close }
in
match err with
| `Exn (Runtime.Flow msg) ->
user's_error_handler ?request (`Protocol msg :> error) respond
| err -> user's_error_handler ?request (`V1 err) respond
and request_handler reqd =
let protect, _, _ = Lazy.force process in
httpaf_handler ~sockaddr ~scheme ~protect ~orphans ~handler reqd
and conn =
lazy
(Httpaf.Server_connection.create ~config ~error_handler request_handler)
and process =
lazy
(B.run (Lazy.force conn) ~give ~disown:Miou_unix.disown ~read_buffer_size
file_descr)
in
let _, prm, close = Lazy.force process in
Log.debug (fun m -> m "the http/1.1 server connection is launched");
let _result = Miou.await prm in
Runtime.terminate orphans;
Log.debug (fun m -> m "the http/1.1 server connection is ended");
close ()
let pp_sockaddr ppf = function
| Unix.ADDR_UNIX str -> Fmt.pf ppf "<%s>" str
| Unix.ADDR_INET (inet_addr, port) ->
Fmt.pf ppf "%s:%d" (Unix.string_of_inet_addr inet_addr) port
let https_1_1_server_connection ~config ~sockaddr ~user's_error_handler ~handler
file_descr =
let scheme = "https" in
let read_buffer_size = config.Httpaf.Config.read_buffer_size in
let give = [ Miou_unix.owner file_descr.TLS.flow ] in
let disown flow = Miou_unix.disown flow.TLS.flow in
let orphans = Miou.orphans () in
let rec error_handler ?request err respond =
let { Runtime.protect }, _, _ = Lazy.force process in
let request = Option.map (request_from_httpaf ~scheme) request in
let respond hdrs =
let open Httpaf in
let hdrs = Httpaf.Headers.of_list (H2.Headers.to_list hdrs) in
let body = protect ~orphans respond hdrs in
let write_string ?off ?len str =
protect ~orphans (Body.write_string body ?off ?len) str
in
let write_bigstring ?off ?len bstr =
protect ~orphans (Body.write_bigstring body ?off ?len) bstr
in
let close () = protect ~orphans Body.close_writer body in
{ write_string; write_bigstring; close }
in
match err with
| `Exn (Runtime.Flow msg) ->
user's_error_handler ?request (`Protocol msg :> error) respond
| err -> user's_error_handler ?request (`V1 err) respond
and request_handler reqd =
let protect, _, _ = Lazy.force process in
httpaf_handler ~sockaddr ~scheme ~protect ~orphans ~handler reqd
and conn =
lazy
(Httpaf.Server_connection.create ~config ~error_handler request_handler)
and process =
lazy (A.run (Lazy.force conn) ~give ~disown ~read_buffer_size file_descr)
in
let _, prm, close = Lazy.force process in
Log.debug (fun m -> m "the http/1.1 server connection is launched");
let _result = Miou.await prm in
Runtime.terminate orphans;
Log.debug (fun m -> m "the http/1.1 server connection is ended");
close ()
let h2s_server_connection ~config ~sockaddr ~user's_error_handler ~handler
file_descr =
let read_buffer_size = config.H2.Config.read_buffer_size in
let give = [ Miou_unix.owner file_descr.TLS.flow ] in
let disown flow = Miou_unix.disown flow.TLS.flow in
let orphans = Miou.orphans () in
let rec error_handler ?request err respond =
let { Runtime.protect }, _, _ = Lazy.force process in
let request = Option.map request_from_h2 request in
let respond hdrs =
let open H2 in
let body = protect ~orphans respond hdrs in
let write_string ?off ?len str =
protect ~orphans (Body.Writer.write_string body ?off ?len) str
in
let write_bigstring ?off ?len bstr =
protect ~orphans (Body.Writer.write_bigstring body ?off ?len) bstr
in
let close () = protect ~orphans Body.Writer.close body in
{ write_string; write_bigstring; close }
in
match err with
| `Exn (Runtime.Flow msg) ->
user's_error_handler ?request (`Protocol msg :> error) respond
| err -> user's_error_handler ?request (`V2 err) respond
and request_handler reqd =
let protect, _, _ = Lazy.force process in
h2_handler ~sockaddr ~protect ~orphans ~handler reqd
and conn =
lazy (H2.Server_connection.create ~config ~error_handler request_handler)
and process =
lazy (C.run (Lazy.force conn) ~give ~disown ~read_buffer_size file_descr)
in
let _, prm, close = Lazy.force process in
Log.debug (fun m -> m "the h2 server connection is launched");
let _result = Miou.await prm in
Runtime.terminate orphans;
close ()
let clear ?stop ?(config = Httpaf.Config.default)
?error_handler:(user's_error_handler = default_error_handler) ~handler
file_descr =
let clear ?stop ?(config = Httpaf.Config.default) ?backlog
?error_handler:(user's_error_handler = default_error_handler)
~handler:user's_handler sockaddr =
let rec go orphans file_descr =
match accept_or_stop ?stop file_descr with
| None -> Runtime.terminate orphans
| Some (file_descr', sockaddr) ->
Log.debug (fun m -> m "receive a new client: %a" pp_sockaddr sockaddr);
clean orphans;
let give = [ Miou_unix.owner file_descr' ] in
| None ->
Log.debug (fun m -> m "stop the server");
Runtime.terminate orphans;
Miou_unix.close file_descr
| Some (fd', sockaddr) ->
Log.debug (fun m ->
m "receive a connection from: %a" pp_sockaddr sockaddr);
clean_up orphans;
let _ =
Miou.call ~orphans ~give @@ fun () ->
http_1_1_server_connection ~config ~sockaddr ~user's_error_handler
~handler file_descr'
Miou.call ~orphans @@ fun () ->
http_1_1_server_connection ~config ~user's_error_handler
~user's_handler fd'
in
Miou_unix.disown file_descr';
go orphans file_descr
in
go (Miou.orphans ()) file_descr
let socket =
match sockaddr with
| Unix.ADDR_UNIX _ -> invalid_arg "Impossible to create a Unix socket"
| Unix.ADDR_INET (inet_addr, _) ->
if Unix.is_inet6_addr inet_addr then Miou_unix.tcpv6 ()
else Miou_unix.tcpv4 ()
in
Miou_unix.bind_and_listen ?backlog socket sockaddr;
go (Miou.orphans ()) socket
let alpn tls =
match Http_miou_unix.epoch tls with
| Some { Tls.Core.alpn_protocol= protocol; _ } -> protocol
| None -> None
let with_tls ?stop ?(config = `Both (Httpaf.Config.default, H2.Config.default))
?error_handler:(user's_error_handler = default_error_handler) tls_config
~handler file_descr =
?backlog ?error_handler:(user's_error_handler = default_error_handler)
tls_config ~handler:user's_handler sockaddr =
let rec go orphans file_descr =
match accept_or_stop ?stop file_descr with
| None -> Runtime.terminate orphans
| Some (file_descr', sockaddr) ->
clean orphans;
let give = [ Miou_unix.owner file_descr' ] in
| None -> Runtime.terminate orphans; Miou_unix.close file_descr
| Some (fd', _sockaddr) ->
clean_up orphans;
let _ =
Miou.call ~orphans ~give @@ fun () ->
match TLS.server_of_flow tls_config file_descr' with
Miou.call ~orphans @@ fun () ->
match TLS.server_of_flow tls_config fd' with
| Error err ->
Log.err (fun m ->
m "got a TLS error during the handshake: %a" TLS.pp_error err);
Miou_unix.close file_descr'
Miou_unix.close fd'
| Ok tls_flow -> begin
match (config, epoch tls_flow) with
| `Both (_, h2), Some { Tls.Core.alpn_protocol = Some "h2"; _ }
| ( `H2 h2
, (Some { Tls.Core.alpn_protocol = Some "h2" | None; _ } | None)
) ->
h2s_server_connection ~config:h2 ~sockaddr
~user's_error_handler ~handler tls_flow
| ( `Both (httpaf, _)
, Some { Tls.Core.alpn_protocol = Some "http/1.1"; _ } )
| ( `HTTP_1_1 httpaf
, ( Some { Tls.Core.alpn_protocol = Some "http/1.1" | None; _ }
| None ) ) ->
https_1_1_server_connection ~config:httpaf ~sockaddr
~user's_error_handler ~handler tls_flow
| `Both _, (Some { Tls.Core.alpn_protocol = None; _ } | None) ->
assert false
| _, Some { Tls.Core.alpn_protocol = Some _protocol; _ } ->
assert false
match (config, alpn tls_flow) with
| `Both (_, h2), Some "h2" | `H2 h2, (Some "h2" | None) ->
h2s_server_connection ~config:h2 ~user's_error_handler
~user's_handler tls_flow
| `Both (httpaf, _), Some "http/1.1"
| `HTTP_1_1 httpaf, (Some "http/1.1" | None) ->
https_1_1_server_connection ~config:httpaf
~user's_error_handler ~user's_handler tls_flow
| `Both _, None -> assert false
| _, Some _protocol -> assert false
end
in
Miou_unix.disown file_descr';
go orphans file_descr
in
go (Miou.orphans ()) file_descr
let socket =
match sockaddr with
| Unix.ADDR_UNIX _ -> invalid_arg "Impossible to create a Unix socket"
| Unix.ADDR_INET (inet_addr, _) ->
if Unix.is_inet6_addr inet_addr then Miou_unix.tcpv6 ()
else Miou_unix.tcpv4 ()
in
Miou_unix.bind_and_listen ?backlog socket sockaddr;
go (Miou.orphans ()) socket

View File

@ -6,45 +6,42 @@ module Method = H2.Method
module Headers = H2.Headers
module Status = H2.Status
type request =
{ meth : Method.t; target : string; scheme : string; headers : Headers.t }
type stop
type output =
{ write_string : ?off:int -> ?len:int -> string -> unit
; write_bigstring : ?off:int -> ?len:int -> Bigstringaf.t -> unit
; close : unit -> unit
}
val stop : unit -> stop
val switch : stop -> unit
type on_read = Bigstringaf.t -> off:int -> len:int -> unit
type on_eof = unit -> unit
type input = { schedule : on_eof:on_eof -> on_read:on_read -> unit } [@@unboxed]
type request = {
meth: Method.t
; target: string
; scheme: string
; headers: Headers.t
}
val string : status:Status.t -> ?headers:Headers.t -> string -> unit
val bigstring : status:Status.t -> ?headers:Headers.t -> Bigstringaf.t -> unit
val stream : ?headers:Headers.t -> Status.t -> output
val get : unit -> input
type error_handler =
?request:request -> error -> (H2.Headers.t -> output) -> unit
type handler = request -> unit
type response = { status: Status.t; headers: Headers.t }
type body = [ `V1 of [ `write ] Httpaf.Body.t | `V2 of H2.Body.Writer.t ]
type reqd = [ `V1 of Httpaf.Reqd.t | `V2 of H2.Reqd.t ]
type error_handler = ?request:request -> error -> (Headers.t -> body) -> unit
type handler = reqd -> unit
val clear :
?stop:Miou_unix.Cond.t
?stop:stop
-> ?config:Httpaf.Config.t
-> ?backlog:int
-> ?error_handler:error_handler
-> handler:handler
-> Miou_unix.file_descr
-> Unix.sockaddr
-> unit
val with_tls :
?stop:Miou_unix.Cond.t
?stop:stop
-> ?config:
[ `H2 of H2.Config.t
[ `Both of Httpaf.Config.t * H2.Config.t
| `HTTP_1_1 of Httpaf.Config.t
| `Both of Httpaf.Config.t * H2.Config.t ]
| `H2 of H2.Config.t ]
-> ?backlog:int
-> ?error_handler:error_handler
-> Tls.Config.server
-> handler:handler
-> Miou_unix.file_descr
-> Unix.sockaddr
-> unit

View File

@ -4,91 +4,44 @@ module Log = (val Logs.src_log src : Logs.LOG)
module TCP = struct
type t = Miou_unix.file_descr
type error = Unix.error * string * string
type error = [ `Unix of Unix.error * string * string | `Closed ]
let pp_error ppf (err, f, v) =
Fmt.pf ppf "%s(%s): %s" f v (Unix.error_message err)
let pp_error ppf = function
| `Unix (err, f, v) -> Fmt.pf ppf "%s(%s): %s" f v (Unix.error_message err)
| `Closed -> Fmt.string ppf "Connection closed by peer (tcp)"
let read flow buf ~off ~len =
match Miou_unix.read flow buf ~off ~len with
match Miou_unix.read flow buf off len with
| len -> Ok len
| exception Unix.Unix_error (Unix.ECONNRESET, _, _) -> Ok 0
| exception Unix.Unix_error (err, f, v) -> Error (err, f, v)
| exception Unix.Unix_error (err, f, v) -> Error (`Unix (err, f, v))
let full_write flow ({ Cstruct.len; _ } as cs) =
let str = Cstruct.to_string cs in
let rec go fd buf off len =
if len = 0 then Ok ()
else
match Unix.select [] [ fd ] [] (-1.0) with
| [], [ _ ], [] -> begin
try
let len' = Unix.single_write fd buf off len in
go fd buf (off + len') (len - len')
with
| Unix.Unix_error (Unix.EINTR, _, _) -> go fd buf off len
| Unix.Unix_error (err, f, v) -> Error (err, f, v)
end
| _ -> go fd buf off len
| exception Unix.Unix_error (err, f, v) -> Error (err, f, v)
in
go (Miou_unix.to_file_descr flow) (Bytes.unsafe_of_string str) 0 len
Miou_unix.write flow str 0 len
let writev flow css =
let cs = Cstruct.concat css in
full_write flow cs
full_write flow cs; Ok ()
let close = Miou_unix.close
let shutdown flow = function
| `Recv -> Miou_unix.shutdown flow Unix.SHUTDOWN_RECEIVE
| `Send -> Miou_unix.shutdown flow Unix.SHUTDOWN_SEND
| `read -> Unix.shutdown (Miou_unix.to_file_descr flow) Unix.SHUTDOWN_RECEIVE
| `write -> Unix.shutdown (Miou_unix.to_file_descr flow) Unix.SHUTDOWN_SEND
| `read_write -> Unix.close (Miou_unix.to_file_descr flow)
[@@ocamlformat "disable"]
end
module TLS = struct
include Tls_miou.Make (TCP)
module TLS = Tls_miou.Make (TCP)
let close flow =
match flow.state with
| `Active _ -> close flow
| _ -> Miou_unix.disown flow.flow
end
let to_tls cfg ?host flow =
match TLS.client_of_flow cfg ?host flow with
| Ok tls_flow -> Ok tls_flow
| Error _ as err ->
Miou_unix.disown flow;
err
let to_tls cfg ?host flow = TLS.client_of_flow cfg ?host flow
let epoch tls =
match tls.TLS.state with
| `End_of_input | `Error _ -> None
| `Active tls -> (
match Tls.Engine.epoch tls with
| `Active tls ->
( match Tls.Engine.epoch tls with
| Error () -> assert false
| Ok data -> Some data)
(* Implementations. *)
(* XXX(dinosaure): We need to make a serious note of this. The behavior of
http/1.1 with TLS seems rather "random" in the sense that some servers
deliver a close-notify to the client while others do nothing... Worse still,
if we want to deliver a close-notify to the server to say we no longer want
to write (but still want to read), some servers close the connection and we
get an [ECONNRESET] when we want to read (and we wouldn't have read the whole
HTML document...).
In other words, as far as HTTP/1.1 and TLS are concerned, we should NOT send
a close-notify. The other subtlety is (and this is not yet the case in our
code) to close the connection if the TLS layer fails (which is not the case
with H2). In short, these two subtleties mean that I've spent quite a few
days experimenting with and without the shutdown, and finally realized that
both behaviors are necessary for HTTP/1.1 AND H2...
However, this clarifies something quite important: thanks to Miou, we can be
sure that no task remains in the background after the request has been sent
and the response has been received. The interaction model with an HTTP server
can be quite complex (long polling, h2, websocket...) and we're not immune to
forgetting or double-clicking. Miou fails in such situations, which forced me
to rethink the execution of an HTTP request without [Lwt.async] ;) ! *)
| Ok data -> Some data )
| _ -> None
[@@ocamlformat "disable"]

View File

@ -1,6 +1,10 @@
let src = Logs.Src.create "http-client"
let src = Logs.Src.create "httpcats"
module Log = (val Logs.src_log src : Logs.LOG)
module Flow = Flow
module Runtime = Runtime
module Client = Http_miou_client
module Server = Http_miou_server
let error_msgf fmt = Fmt.kstr (fun msg -> Error (`Msg msg)) fmt
@ -105,19 +109,12 @@ let prep_h2_headers headers (host : string) user_pass blen =
(H2.Headers.get headers "host", H2.Headers.get headers ":authority")
with
| None, None -> (headers, host)
| Some h, None ->
Log.debug (fun m ->
m "removing host header (inserting authority instead)");
(H2.Headers.remove headers "host", h)
| Some h, None -> (H2.Headers.remove headers "host", h)
| None, Some a -> (H2.Headers.remove headers ":authority", a)
| Some h, Some a ->
if String.equal h a then
(H2.Headers.remove (H2.Headers.remove headers ":authority") "host", h)
else begin
Log.warn (fun m ->
m "authority header %s mismatches host %s (keeping both)" a h);
(H2.Headers.remove headers ":authority", a)
end
else (H2.Headers.remove headers ":authority", a)
in
let add hdr = H2.Headers.add_unless_exists hdr ?sensitive:None in
let hdr = H2.Headers.empty in
@ -133,12 +130,20 @@ module Version = Httpaf.Version
module Status = H2.Status
module Headers = H2.Headers
type response =
{ version : Version.t
; status : Status.t
; reason : string
; headers : Headers.t
}
type response = {
version: Version.t
; status: Status.t
; reason: string
; headers: Headers.t
}
let pp_response ppf resp =
Fmt.pf ppf
"@[<hov>{ version=@ %a;@ status=@ %s;@ reason=@ %S;@ headers=@ @[<hov>%a@] \
}@]"
Version.pp_hum resp.version
(Status.to_string resp.status)
resp.reason Headers.pp_hum resp.headers
type error =
[ `V1 of Httpaf.Client_connection.error
@ -148,89 +153,90 @@ type error =
| `Tls of Http_miou_unix.TLS.error ]
let pp_error ppf = function
| `Protocol msg -> Fmt.string ppf msg
| `Msg msg -> Fmt.string ppf msg
| `Tls err -> Http_miou_unix.TLS.pp_error ppf err
| `V1 (`Malformed_response msg) ->
Fmt.pf ppf "Malformed HTTP/1.1 response: %s" msg
| `V1 (`Invalid_response_body_length _resp) ->
Fmt.pf ppf "Invalid response body length"
| `V1 (`Exn exn) | `V2 (`Exn exn) ->
Fmt.pf ppf "Got an unexpected exception: %S" (Printexc.to_string exn)
| `V2 (`Malformed_response msg) -> Fmt.pf ppf "Malformed H2 response: %s" msg
| `V2 (`Invalid_response_body_length _resp) ->
Fmt.pf ppf "Invalid response body length"
| `V2 (`Protocol_error (err, msg)) ->
Fmt.pf ppf "Protocol error %a: %s" H2.Error_code.pp_hum err msg
| #Client.error as err -> Client.pp_error ppf err
let from_httpaf response =
{ version = response.Httpaf.Response.version
; status = (response.Httpaf.Response.status :> H2.Status.t)
; reason = response.Httpaf.Response.reason
; headers =
{
version= response.Httpaf.Response.version
; status= (response.Httpaf.Response.status :> H2.Status.t)
; reason= response.Httpaf.Response.reason
; headers=
H2.Headers.of_list
(Httpaf.Headers.to_list response.Httpaf.Response.headers)
}
let single_http_1_1_request ?(config = Httpaf.Config.default) flow user_pass
host meth path headers body f acc =
let body_length = Option.map String.length body in
let headers = prep_http_1_1_headers headers host user_pass body_length in
let request = Httpaf.Request.create ~headers meth path in
let f response acc str =
let[@warning "-8"] (`V1 response : Http_miou_client.response) = response in
f (from_httpaf response) acc str
in
Log.debug (fun m -> m "start to send the http/1.1 request");
match Http_miou_client.run ~f acc (`V1 config) flow (`V1 request) with
| Process (V2, _, _) -> assert false
| Process (V1, await, { body = stream; write_string; close; release }) -> (
Option.iter (write_string stream) body;
close stream;
let result = await () in
release ();
match result with
| Ok (response, acc) -> Ok (from_httpaf response, acc)
| Error (#Http_miou_client.error as err) -> Error (err :> error))
let from_h2 response =
{ version = { major = 2; minor = 0 }
; status = response.H2.Response.status
; reason = ""
; headers = response.H2.Response.headers
{
version= { major= 2; minor= 0 }
; status= response.H2.Response.status
; reason= ""
; headers= response.H2.Response.headers
}
let single_h2_request ?(config = H2.Config.default) flow scheme user_pass host
meth path headers body f acc =
let body_length = Option.map String.length body in
let headers = prep_h2_headers headers host user_pass body_length in
let request = H2.Request.create ~scheme ~headers meth path in
let single_http_1_1_request ?(config = Httpaf.Config.default) flow user_pass
host meth path headers contents f acc =
let contents_length = Option.map String.length contents in
let headers = prep_http_1_1_headers headers host user_pass contents_length in
let request = Httpaf.Request.create ~headers meth path in
let f response acc str =
let[@warning "-8"] (`V2 response : Http_miou_client.response) = response in
let[@warning "-8"] (`V1 response : Client.response) = response in
f (from_httpaf response) acc str
in
match Client.run ~f acc (`V1 config) flow (`V1 request) with
| Process (V2, _, _) -> assert false
| Process (V1, await, body) -> (
let go orphans =
Option.iter (Httpaf.Body.write_string body) contents;
Runtime.terminate orphans
in
Runtime.flat_tasks go;
let finally () =
match flow with
| `Tls flow -> Http_miou_unix.TLS.close flow
| `Tcp flow -> Http_miou_unix.TCP.close flow
in
Fun.protect ~finally @@ fun () ->
match await () with
| Ok (response, acc) -> Ok (from_httpaf response, acc)
| Error (#Client.error as err) -> Error (err :> error))
let single_h2_request ?(config = H2.Config.default) flow scheme user_pass host
meth path headers contents f acc =
let contents_length = Option.map String.length contents in
let headers = prep_h2_headers headers host user_pass contents_length in
let request = H2.Request.create ~scheme ~headers meth path in
let first = ref false in
let f response acc str =
let[@warning "-8"] (`V2 response : Client.response) = response in
if !first then (
Log.debug (fun m -> m "Response: %a" pp_response (from_h2 response));
first := false);
f (from_h2 response) acc str
in
match Http_miou_client.run ~f acc (`V2 config) flow (`V2 request) with
match Client.run ~f acc (`V2 config) flow (`V2 request) with
| Process (V1, _, _) -> assert false
| Process (V2, await, { body = stream; write_string; close; release }) -> (
Option.iter (write_string stream) body;
close stream;
let result = await () in
release ();
match result with
| Process (V2, await, body) -> (
let go orphans =
Option.iter (H2.Body.Writer.write_string body) contents;
H2.Body.Writer.close body;
Log.debug (fun m -> m "writer terminates");
Runtime.terminate orphans
in
Runtime.flat_tasks go;
match await () with
| Ok (response, acc) -> Ok (from_h2 response, acc)
| Error (#Http_miou_client.error as err) -> Error (err :> error))
| Error (#Client.error as err) -> Error (err :> error))
let alpn_protocol = function
| `Tcp _ -> None
| `Tls tls -> (
match Http_miou_unix.epoch tls with
| Some { Tls.Core.alpn_protocol = Some "h2"; _ } -> Some `H2
| Some { Tls.Core.alpn_protocol = Some "http/1.1"; _ } -> Some `HTTP_1_1
| Some { Tls.Core.alpn_protocol = None; _ } -> None
| Some { Tls.Core.alpn_protocol = Some protocol; _ } ->
Log.warn (fun m ->
m "The ALPN negotiation unexpectedly resulted in %S." protocol);
None
| Some { Tls.Core.alpn_protocol= Some "h2"; _ } -> Some `H2
| Some { Tls.Core.alpn_protocol= Some "http/1.1"; _ } -> Some `HTTP_1_1
| Some { Tls.Core.alpn_protocol= None; _ } -> None
| Some { Tls.Core.alpn_protocol= Some _; _ } -> None
| None -> None)
let connect ?port ?tls_config ~happy_eyeballs host =
@ -240,13 +246,10 @@ let connect ?port ?tls_config ~happy_eyeballs host =
| None, Some _ -> 443
| Some port, _ -> port
in
Log.debug (fun m -> m "Try to connect to %s" host);
match (Happy.connect_endpoint happy_eyeballs host [ port ], tls_config) with
| Ok ((ipaddr, port), file_descr), None ->
Log.debug (fun m -> m "connected to %a:%d" Ipaddr.pp ipaddr port);
Ok (`Tcp file_descr)
| Ok ((ipaddr, port), file_descr), Some tls_config ->
Log.debug (fun m ->
m "connect to %a:%d and start to upgrade to tls" Ipaddr.pp ipaddr port);
| Ok ((_ipaddr, _port), file_descr), None -> Ok (`Tcp file_descr)
| Ok ((_ipaddr, _port), file_descr), Some tls_config ->
let ( >>= ) = Result.bind in
Http_miou_unix.to_tls tls_config file_descr
|> Result.map_error (fun err -> `Tls err)
@ -271,19 +274,26 @@ let single_request ~happy_eyeballs ?http_config tls_config ~meth ~headers ?body
| `Default cfg, _ -> Some cfg
else Ok None
in
Log.debug (fun m -> m "connect to %s (connected)" uri);
let* flow = connect ?port ?tls_config ~happy_eyeballs host in
Log.debug (fun m -> m "single request to %s (connected)" uri);
match (alpn_protocol flow, http_config) with
| (Some `HTTP_1_1 | None), Some (`HTTP_1_1 config) ->
| (Some `HTTP_1_1 | None), Some (`V1 config) ->
single_http_1_1_request ~config flow user_pass host meth path headers body
f acc
| (Some `HTTP_1_1 | None), None ->
single_http_1_1_request flow user_pass host meth path headers body f acc
| (Some `H2 | None), Some (`H2 config) ->
| (Some `H2 | None), Some (`V2 config) ->
single_h2_request ~config flow scheme user_pass host meth path headers
body f acc
| Some `H2, None ->
single_h2_request flow scheme user_pass host meth path headers body f acc
| _ -> assert false
| Some `H2, Some (`V1 _) ->
Log.warn (fun m -> m "ALPN protocol is h2 where user forces http/1.1");
single_h2_request flow scheme user_pass host meth path headers body f acc
| Some `HTTP_1_1, Some (`V2 _) ->
Log.warn (fun m -> m "ALPN protocol is http/1.1 where user forces h2");
single_http_1_1_request flow user_pass host meth path headers body f acc
let resolve_location ~uri ~location =
match String.split_on_char '/' location with
@ -351,5 +361,3 @@ let request ?config ?tls_config ?authenticator ?(meth = `GET) ?(headers = [])
else Ok (resp, body)
in
follow_redirect max_redirect uri
module Server = Http_miou_server

View File

@ -26,15 +26,17 @@ module Headers = H2.Headers
Case-insensitive key-value pairs. *)
type response =
{ version : Version.t
; status : Status.t
; reason : string
; headers : Headers.t
}
type response = {
version: Version.t
; status: Status.t
; reason: string
; headers: Headers.t
}
(** A response, consisting of version, status, reason (HTTP 1.1 only), and
headers. *)
val pp_response : response Fmt.t
val request :
?config:[ `HTTP_1_1 of Httpaf.Config.t | `H2 of H2.Config.t ]
-> ?tls_config:Tls.Config.client
@ -50,6 +52,7 @@ val request :
-> 'a
-> (response * 'a, error) result
module Client = Http_miou_client
module Server = Http_miou_server
(**/**)

View File

@ -1,39 +1,8 @@
(*----------------------------------------------------------------------------
Copyright (c) 2018 Inhabited Type LLC.
Copyright (c) 2018 Anton Bachin
Copyright (c) 2023 Robur
let src = Logs.Src.create "runtime"
All rights reserved.
module Log = (val Logs.src_log src : Logs.LOG)
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
1. Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
3. Neither the name of the author nor the names of his contributors
may be used to endorse or promote products derived from this software
without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE CONTRIBUTORS ``AS IS'' AND ANY EXPRESS
OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
----------------------------------------------------------------------------*)
module type RUNTIME = sig
module type S = sig
type t
val next_read_operation : t -> [ `Read | `Yield | `Close ]
@ -49,10 +18,6 @@ module type RUNTIME = sig
val report_exn : t -> exn -> unit
end
let src = Logs.Src.create "runtime"
module Log = (val Logs.src_log src : Logs.LOG)
module Buffer : sig
type t
@ -60,12 +25,11 @@ module Buffer : sig
val get : t -> f:(Bigstringaf.t -> off:int -> len:int -> int) -> int
val put : t -> f:(Bigstringaf.t -> off:int -> len:int -> int) -> int
end = struct
type t =
{ mutable buffer : Bigstringaf.t; mutable off : int; mutable len : int }
type t = { mutable buffer: Bigstringaf.t; mutable off: int; mutable len: int }
let create size =
let buffer = Bigstringaf.create size in
{ buffer; off = 0; len = 0 }
{ buffer; off= 0; len= 0 }
let compress t =
if t.len = 0 then begin
@ -78,6 +42,10 @@ end = struct
end
let get t ~f =
Log.debug (fun m ->
m "<- @[<hov>%a@]"
(Hxd_string.pp Hxd.default)
(Bigstringaf.substring t.buffer ~off:t.off ~len:t.len));
let n = f t.buffer ~off:t.off ~len:t.len in
t.off <- t.off + n;
t.len <- t.len - n;
@ -97,44 +65,48 @@ end = struct
n
end
type protect =
{ protect : 'a 'b. orphans:unit Miou.orphans -> ('a -> 'b) -> 'a -> 'b }
[@@unboxed]
let catch ~on fn =
try fn ()
with exn ->
Log.err (fun m ->
m "Got an unexpected exception: %S" (Printexc.to_string exn));
on exn
exception Flow of string
module type S = sig
type conn
type flow
val run :
conn
-> ?give:Miou.Ownership.t list
-> ?disown:(flow -> unit)
-> read_buffer_size:int
-> flow
-> protect * unit Miou.t * (unit -> unit)
end
let rec terminate orphans =
match Miou.care orphans with
| None -> Miou.yield ()
| Some None ->
Miou.yield ();
terminate orphans
| Some (Some prm) ->
Miou.await_exn prm;
terminate orphans
| Some None -> Miou.yield (); terminate orphans
| Some (Some prm) -> (
match Miou.await prm with
| Ok () -> terminate orphans
| Error exn ->
Log.err (fun m ->
m "Unexpected exception: %S" (Printexc.to_string exn));
terminate orphans)
module Make (Flow : Flow.S) (Runtime : RUNTIME) :
S with type conn = Runtime.t and type flow = Flow.t = struct
type _ Effect.t += Spawn : (unit -> unit) -> unit Effect.t
let flat_tasks fn =
let orphans = Miou.orphans () in
let open Effect.Deep in
let retc = Fun.id
and exnc exn =
Log.err (fun m -> m "Unexpected exception: %S" (Printexc.to_string exn));
raise exn
and effc : type c. c Effect.t -> ((c, 'a) continuation -> 'a) option =
function
| Spawn fn ->
Log.debug (fun m -> m "spawn a new task");
let _ =
Miou.call_cc ~orphans @@ fun () ->
Log.debug (fun m -> m "function spawned");
try fn ()
with exn ->
Log.err (fun m ->
m "Unexpected exception: %S" (Printexc.to_string exn));
raise exn
in
Some (fun k -> continue k ())
| _ -> None
in
match_with fn orphans { retc; exnc; effc }
module Make (Flow : Flow.S) (Runtime : S) = struct
type conn = Runtime.t
type flow = Flow.t
@ -148,8 +120,6 @@ module Make (Flow : Flow.S) (Runtime : RUNTIME) :
Bigstringaf.blit_from_bytes buf ~src_off:0 bstr ~dst_off ~len;
len
| Error err ->
Log.err (fun m ->
m "close the socket (recv) due to: %a" Flow.pp_error err);
Flow.close flow;
raise (Flow (Fmt.str "%a" Flow.pp_error err)))
in
@ -158,225 +128,97 @@ module Make (Flow : Flow.S) (Runtime : RUNTIME) :
let writev flow bstrs =
let copy { Faraday.buffer; off; len } = Bigstringaf.copy buffer ~off ~len in
let css = List.map copy bstrs |> List.map Cstruct.of_bigarray in
List.iter
(fun cs ->
Log.debug (fun m ->
m "-> @[<hov>%a@]"
(Hxd_string.pp Hxd.default)
(Cstruct.to_string cs)))
css;
match Flow.writev flow css with
| Ok () ->
let len = List.fold_left (fun a { Cstruct.len; _ } -> a + len) 0 css in
`Ok len
| Error err ->
Log.err (fun m ->
m "close the socket (writev) due to: %a" Flow.pp_error err);
Flow.close flow;
`Closed
| Error _ -> Flow.close flow; `Closed
type prm = Miou.Promise.Uid.t * Miou.Domain.Uid.t * int
type _ Effect.t += Spawn : (prm:prm -> unit -> unit) -> unit Effect.t
let pp_prm ppf (uid, runner, resources) =
Fmt.pf ppf "[%a:%a](%d)" Miou.Domain.Uid.pp runner Miou.Promise.Uid.pp uid
resources
let launch ?give ~orphans fn k =
let prm = Miou.self () in
let prm' =
Miou.call_cc ~orphans ?give @@ fun () ->
let prm = Miou.self () in
Log.debug (fun m -> m "%a launched" pp_prm prm);
fn ~prm ()
in
Log.debug (fun m ->
m "%a (child) is attached to %a (parent)" Miou.Promise.pp prm' pp_prm
prm);
Effect.Deep.continue k ()
(* Protected runtime operations.
A note on design and the need to "protect" the appearance of a new task.
Miou constrains us on 2 points:
1) we are obliged to observe the result of a task ([Miou.await]), otherwise
Miou fails.
2) only the task creator can observe the result. For example, this code
doesn't work:
{[
let prm = Miou.call @@ fun () -> Miou.call (Fun.const ()) in
Miou.await_exn (Miou.await_exn prm)
(* the second promise (in [prm]) can only be awaited in [prm]. *)
]}
The first rule requires us to use [Miou.orphans], which allows tasks to be
stored in the background. We can then catch up with them, which is what the
[terminate] function does. The second rule is more complicated...
The runtime ([httpaf] or [h2]) may be able to stop a task but force the
user to give a "continuation" allowing the same task to be restarted at the
right time (when the user wants to do a [Body.write_string], for example).
This is the case when [`Yield] is received from the runtime. This means
that there are continuations (in the runtime) which can cause tasks to
appear and which should be saved in a [Miou.orphans].
The problem is that it's not the task that gives the continuation that will
launch the subtask... So the best protection is to execute runtime
functions (which may launch subtasks) with a specific [Miou.orphan].
Continuation, on the other hand, consists in raising an effect that will be
protected with a specific [Miou.orphans]. In other words, the subtask is
saved not in the task [Miou.orphans] that creates the continuation, but in
the [Miou.orphans]'s task that creates **effectfully** the subtask.
So, after the ceremony to start an HTTP request, we need to protect the
functions that interact with the Body as well, because they can create
tasks too...
{[
let orphans, body, prm = run_http_request flow in
Body.write_string body "Hello World";
(* a new task appears to write ["Hello World!"] which will be saved into
orphans. *)
Body.close body;
(* a new task appears to finish the HTTP request. *)
let result = await prm in
terminate orphans
]}
*)
let protect ?give ~orphans fn v =
let retc = Fun.id in
let exnc = raise in
let open Effect.Deep in
let effc : type c. c Effect.t -> ((c, 'b) continuation -> 'b) option =
function
| Spawn fn -> Some (launch ?give ~orphans fn)
| _ -> None
in
match_with fn v { retc; exnc; effc }
let next_read_operation ?give ~orphans =
protect ?give ~orphans Runtime.next_read_operation
let next_write_operation ?give ~orphans =
protect ?give ~orphans Runtime.next_write_operation
let read ?give ~orphans conn bstr ~off ~len =
protect ?give ~orphans (Runtime.read conn ~off ~len) bstr
let read_eof ?give ~orphans conn bstr ~off ~len =
protect ?give ~orphans (Runtime.read_eof conn ~off ~len) bstr
let report_exn ?give ~orphans ?(close = Fun.const ()) conn exn =
Log.err (fun m -> m "report an exception: %S" (Printexc.to_string exn));
protect ?give ~orphans (Runtime.report_exn conn) exn;
terminate orphans;
close ()
let report_write_result ?give ~orphans conn =
protect ?give ~orphans (Runtime.report_write_result conn)
let yield_reader ?give ~orphans conn =
protect ?give ~orphans (Runtime.yield_reader conn)
let yield_writer ?give ~orphans conn =
protect ?give ~orphans (Runtime.yield_writer conn)
let run conn ?(give = []) ?(disown = Fun.const ()) ~read_buffer_size flow =
let run conn ~read_buffer_size flow =
let buffer = Buffer.create read_buffer_size in
let closed = ref false in
let close () =
if not !closed then (
Flow.close flow;
closed := true)
else disown flow
in
let rec reader ~prm () =
Log.debug (fun m -> m "%a starts the reading loop" pp_prm prm);
let rec go orphans () =
match next_read_operation ~orphans ~give conn with
| `Read -> (
Log.debug (fun m -> m "%a next read operation: `read" pp_prm prm);
let read_eof = read_eof ~orphans ~give in
let read = read ~orphans ~give in
let rec reader () =
let rec go orphans =
Log.debug (fun m -> m "next_read_operation");
match Runtime.next_read_operation conn with
| `Read -> begin
match recv flow buffer with
| `Eof ->
Buffer.get buffer ~f:(fun bstr ~off ~len ->
read_eof conn bstr ~off ~len)
|> ignore;
go orphans ()
| `Ok _ ->
Buffer.get buffer ~f:(fun bstr ~off ~len ->
read conn bstr ~off ~len)
|> ignore;
go orphans ())
Log.debug (fun m -> m "the connection is closed by peer");
let _ = Buffer.get buffer ~f:(Runtime.read_eof conn) in
go orphans
| `Ok len ->
Log.debug (fun m -> m "read %d byte(s)" len);
let _ = Buffer.get buffer ~f:(Runtime.read conn) in
go orphans
end
| `Yield ->
Log.debug (fun m -> m "%a next read operation: `yield" pp_prm prm);
let continuation () =
let prm = Miou.self () in
Log.debug (fun m -> m "%a launches a new task" pp_prm prm);
let k () =
Log.debug (fun m -> m "spawn reader");
Effect.perform (Spawn reader)
in
yield_reader conn ~orphans ~give continuation;
disown flow;
Runtime.yield_reader conn k;
Log.debug (fun m -> m "yield the reader");
terminate orphans
| `Close ->
Log.debug (fun m ->
m "%a read: disown the file-descriptor" pp_prm prm);
disown flow;
Log.debug (fun m -> m "shutdown the reader");
Flow.shutdown flow `read;
terminate orphans
in
let orphans = Miou.orphans () in
catch ~on:(report_exn conn ~orphans ~give ~close) @@ fun () ->
go orphans ()
try flat_tasks go
with exn ->
Log.err (fun m -> m "report an exception: %S" (Printexc.to_string exn));
let go orphans =
Runtime.report_exn conn exn;
terminate orphans
in
flat_tasks go
in
let rec writer ~prm () =
Log.debug (fun m -> m "%a starts to the writing loop" pp_prm prm);
let rec go orphans () =
match next_write_operation ~orphans ~give conn with
let rec writer () =
let rec go orphans =
Log.debug (fun m -> m "next_write_operation");
match Runtime.next_write_operation conn with
| `Write iovecs ->
Log.debug (fun m -> m "%a next write operation: `write" pp_prm prm);
writev flow iovecs |> report_write_result conn ~orphans ~give;
go orphans ()
let len =
List.fold_left (fun acc { Faraday.len; _ } -> acc + len) 0 iovecs
in
Log.debug (fun m -> m "write %d byte(s)" len);
writev flow iovecs |> Runtime.report_write_result conn;
go orphans
| `Yield ->
Log.debug (fun m -> m "%a next write operation: `yield" pp_prm prm);
let continuation () =
let prm = Miou.self () in
Log.debug (fun m -> m "%a launches a new task" pp_prm prm);
let k () =
Log.debug (fun m -> m "spawn writer");
Effect.perform (Spawn writer)
in
yield_writer conn ~orphans ~give continuation;
disown flow;
Runtime.yield_writer conn k;
Log.debug (fun m -> m "yield the writer");
terminate orphans
| `Close _ ->
Log.debug (fun m -> m "%a next write operation: `close" pp_prm prm);
let () = try Flow.shutdown flow `Send with _exn -> disown flow in
Log.debug (fun m -> m "shutdown the writer");
Flow.shutdown flow `write;
terminate orphans
in
let orphans = Miou.orphans () in
catch ~on:(report_exn conn ~orphans ~give ~close) @@ fun () ->
go orphans ()
try flat_tasks go
with exn ->
Log.err (fun m -> m "report an exception: %S" (Printexc.to_string exn));
let go orphans =
Runtime.report_exn conn exn;
terminate orphans
in
flat_tasks go
in
let protect ~orphans = protect ~orphans ~give in
let prm =
Miou.call_cc ~give @@ fun () ->
let p0 = Miou.call_cc ~give @@ fun () -> reader ~prm:(Miou.self ()) () in
let p1 = Miou.call_cc ~give @@ fun () -> writer ~prm:(Miou.self ()) () in
let result =
match Miou.await_all [ p0; p1 ] with
| [ Ok (); Ok () ] -> Ok ()
| [ Error exn; _ ] | [ _; Error exn ] -> Error exn
| _ -> assert false
in
Log.debug (fun m -> m "close the file-descriptor");
(* TODO(dinosaure): we should probably check the state of the underlying
flow and see if it's closed or not. I suspect that on [http/1.1], it's
not the case but Miou does not complain because we [disown]
everywhere. *)
match result with
| Ok () -> disown flow
| Error exn ->
Log.err (fun m ->
m "got an error at the end of our waiters: %S"
(Printexc.to_string exn));
close ();
raise exn
in
Log.debug (fun m -> m "the main task is: %a" Miou.Promise.pp prm);
({ protect }, prm, close)
Miou.call_cc @@ fun () ->
let p0 = Miou.call_cc reader and p1 = Miou.call_cc writer in
match Miou.await_all [ p0; p1 ] with
| [ Ok (); Ok () ] -> Log.debug (fun m -> m "reader / writer terminates")
| [ Error exn; _ ] | [ _; Error exn ] ->
Log.err (fun m -> m "got an exception: %S" (Printexc.to_string exn));
raise exn
| _ -> Log.err (fun m -> m "impossible")
end

View File

@ -1,4 +1,4 @@
module type RUNTIME = sig
module type S = sig
type t
val next_read_operation : t -> [ `Read | `Yield | `Close ]
@ -14,26 +14,14 @@ module type RUNTIME = sig
val report_exn : t -> exn -> unit
end
type protect =
{ protect : 'a 'b. orphans:unit Miou.orphans -> ('a -> 'b) -> 'a -> 'b }
[@@unboxed]
exception Flow of string
module type S = sig
type conn
type flow
module Make (Flow : Flow.S) (Runtime : S) : sig
type conn = Runtime.t
type flow = Flow.t
val run :
conn
-> ?give:Miou.Ownership.t list
-> ?disown:(flow -> unit)
-> read_buffer_size:int
-> flow
-> protect * unit Miou.t * (unit -> unit)
val run : conn -> read_buffer_size:int -> flow -> unit Miou.t
end
module Make (Flow : Flow.S) (Runtime : RUNTIME) :
S with type conn = Runtime.t and type flow = Flow.t
val terminate : unit Miou.orphans -> unit
val flat_tasks : (unit Miou.orphans -> unit) -> unit

View File

@ -15,161 +15,226 @@ module Make (Flow : Flow.S) = struct
| `Tls_alert alert -> Fmt.string ppf (Tls.Packet.alert_type_to_string alert)
| `Read err -> Flow.pp_error ppf err
| `Write err -> Flow.pp_error ppf err
| `Closed -> Fmt.string ppf "Connection closed by peer"
| `Closed -> Fmt.string ppf "Connection closed by peer (tls)"
type state = [ `Active of Tls.Engine.state | `End_of_input | `Error of error ]
type state =
[ `Active of Tls.Engine.state
| `Read_closed of Tls.Engine.state
| `Write_closed of Tls.Engine.state
| `Closed
| `Error of error ]
type t =
{ role : [ `Server | `Client ]
; flow : Flow.t
; mutable state : state
; recv : bytes
; mutable linger : Cstruct.t option
; mutable writer_closed : bool
}
type t = {
role: [ `Server | `Client ]
; flow: Flow.t
; mutable state: state
; mutable linger: Cstruct.t list
; mutable rest: Cstruct.t list
; read_buffer_size: int
}
let write flow buf =
let half_close state mode =
match (state, mode) with
| `Active tls, `read -> `Read_closed tls
| `Active tls, `write -> `Write_closed tls
| `Active _, `read_write -> `Closed
| `Read_closed tls, `read -> `Read_closed tls
| `Read_closed _, (`write | `read_write) -> `Closed
| `Write_closed tls, `write -> `Write_closed tls
| `Write_closed _, (`read | `read_write) -> `Closed
| ((`Closed | `Error _) as e), (`read | `write | `read_write) -> e
let inject_state tls = function
| `Active _ -> `Active tls
| `Read_closed _ -> `Read_closed tls
| `Write_closed _ -> `Write_closed tls
| (`Closed | `Error _) as e -> e
let tls_alert a = `Error (`Tls_alert a)
let tls_fail f = `Error (`Tls_failure f)
let write_flow flow buf =
match Flow.writev flow.flow [ buf ] with
| Ok () -> Ok ()
| Error err ->
match flow.state with
| `Active _ ->
flow.state <- `Error (`Write err);
Error err
| _ -> Error err
[@@ocamlformat "disable"]
let write_ign flow buf = ignore (Flow.writev flow.flow [ buf ])
let tls_alert alert = `Error (`Tls_alert alert)
let tls_fail failure = `Error (`Tls_failure failure)
| Ok _ as o -> o
| Error `Closed ->
flow.state <- half_close flow.state `write;
Error `Closed
| Error e ->
flow.state <- `Error (`Write e);
Error (`Write e)
let handle flow tls buf =
match Tls.Engine.handle_tls tls buf with
| Ok (state', `Response resp, `Data data) ->
let state' = begin match state' with
| `Ok tls -> `Active tls
| `Eof -> `End_of_input
| `Alert alert -> tls_alert alert end in
flow.state <- state';
let _ = Option.map (write flow) resp in
| Ok (state, eof, `Response resp, `Data data) ->
let state = inject_state state flow.state in
let state = Option.(value ~default:state (map (fun `Eof -> half_close state `read) eof)) in
flow.state <- state;
let _ = match resp with
| None -> Ok ()
| Some buf -> write_flow flow buf in
`Ok data
| Error (alert, `Response resp) ->
flow.state <- tls_fail alert;
match write flow resp with
| Ok () -> tls_fail alert
| Error err -> `Error (`Write err)
| Error (fail, `Response resp) ->
let reason = match fail with
| `Alert a -> tls_alert a | f -> tls_fail f in
flow.state <- reason;
let _ = Flow.writev flow.flow [resp] in
reason
[@@ocamlformat "disable"]
let read flow =
let buf = Bytes.create flow.read_buffer_size in
match Flow.read flow.flow buf ~off:0 ~len:(Bytes.length buf) with
| Error _ as err -> err
| Ok 0 -> Ok `Eof
| Ok len -> Ok (`Data (Cstruct.of_bytes buf ~off:0 ~len))
let read_react flow =
match flow.state with
| (`End_of_input | `Error _) as v -> v
| `Active _ ->
let result =
Flow.read flow.flow flow.recv ~off:0 ~len:(Bytes.length flow.recv) in
match flow.state, result with
| `Active _, Error err -> flow.state <- `Error (`Read err); `Error (`Read err)
| `Active _, Ok 0 -> flow.state <- `End_of_input; `End_of_input
| `Active tls, Ok len ->
handle flow tls (Cstruct.of_bytes flow.recv ~off:0 ~len)
| `Error e, _ -> `Error e
| `End_of_input, _ -> `End_of_input
| `Error _ as e -> e
| `Read_closed _ -> `Eof
| `Closed -> `Eof (* XXX(dinosaure): ECONNRESET? *)
| `Active _ | `Write_closed _ ->
match read flow with
| Error e ->
flow.state <- `Error (`Read e);
`Error (`Read e)
| Ok `Eof ->
flow.state <- half_close flow.state `read;
`Eof
| Ok `Data buf ->
match flow.state with
| `Active tls | `Write_closed tls -> handle flow tls buf
| `Read_closed _ -> `Eof
| `Closed -> `Error (`Write `Closed)
| `Error _ as e -> e
[@@ocamlformat "disable"]
let write_out buf ~off ~len flow res =
let open Cstruct in
let rlen = length res in
let n = min len rlen in
Cstruct.blit_to_bytes res 0 buf off n;
flow.linger <- (if n < rlen then Some (sub res n (rlen - n)) else None);
Ok n
let rec read flow buf ~off ~len =
let rec read_in flow =
match flow.linger with
| Some res -> write_out buf ~off ~len flow res
| None -> (
| _ :: _ as bufs ->
flow.linger <- [];
Ok (`Data (Cstruct.concat @@ List.rev bufs))
| [] -> (
match read_react flow with
| `Ok None -> read flow buf ~off ~len
| `Ok (Some res) -> write_out buf ~off ~len flow res
| `End_of_input -> Ok 0
| `Error err -> Error err)
| `Ok None -> read_in flow
| `Ok (Some buf) -> Ok (`Data buf)
| `Eof -> Ok `Eof
| `Error e -> Error e)
let writev flow bufs =
match flow.state with
| `Closed | `Write_closed _ -> Error `Closed
| `Error e -> Error (e :> error)
| `Active tls | `Read_closed tls -> (
match Tls.Engine.send_application_data tls bufs with
| Some (tls, answer) ->
flow.state <- `Active tls;
write_flow flow answer
| None -> assert false)
let write flow buf = writev flow [ buf ]
let rec drain_handshake flow =
let push_linger flow mcs =
match (mcs, flow.linger) with
| None, _ -> ()
| scs, None -> flow.linger <- scs
| Some cs, Some l -> flow.linger <- Some (Cstruct.append l cs)
in
Log.debug (fun m -> m "drain the handshake");
match flow.state with
| `Active tls when not (Tls.Engine.handshake_in_progress tls) ->
Log.debug (fun m -> m "handshake finished");
flow.rest <- flow.linger;
flow.linger <- [];
Ok flow
| _ -> (
match read_react flow with
| `Ok cs ->
push_linger flow cs;
| `Ok mbuf ->
flow.linger <- Option.to_list mbuf @ flow.linger;
drain_handshake flow
| `Error err -> Error err
| `End_of_input -> Error `Closed)
| `Error e -> Error (e :> error)
| `Eof -> Error `Closed)
let close flow =
let () =
match flow.state with
| `Active tls | `Read_closed tls ->
let tls, buf = Tls.Engine.send_close_notify tls in
flow.state <- inject_state tls flow.state;
flow.state <- `Closed;
let _ = write_flow flow buf in
()
| `Write_closed _ -> flow.state <- `Closed
| _ -> ()
in
Flow.close flow.flow
let shutdown flow mode =
match flow.state with
| `Active tls ->
Log.debug (fun m -> m "close the socket");
let _, buf = Tls.Engine.send_close_notify tls in
flow.state <- `End_of_input;
write_ign flow buf;
Flow.close flow.flow
| _ -> ()
| `Active tls | `Read_closed tls | `Write_closed tls ->
let tls, buf =
match (flow.state, mode) with
| (`Active tls | `Read_closed tls), (`write | `read_write) ->
let tls, buf = Tls.Engine.send_close_notify tls in
(tls, Some buf)
| _, _ -> (tls, None)
in
flow.state <- inject_state tls (half_close flow.state mode);
Option.fold ~none:()
~some:(fun b ->
let _ = write_flow flow b in
())
buf;
if flow.state = `Closed then Flow.close flow.flow
| `Error _ | `Closed -> Flow.close flow.flow
let shutdown flow v =
flow.writer_closed <- true;
Flow.shutdown flow.flow v
let client_of_flow conf ?host flow =
let client_of_flow conf ?(read_buffer_size = 0x1000) ?host flow =
let conf' =
match host with None -> conf | Some host -> Tls.Config.peer conf host
in
let tls, init = Tls.Engine.client conf' in
let tls_flow =
{ role = `Client
{
role= `Client
; flow
; state = `Active tls
; linger = None
; recv = Bytes.create 0x1000
; writer_closed = false
; state= `Active tls
; linger= []
; rest= []
; read_buffer_size
}
in
match write tls_flow init with
| Ok () ->
Log.debug (fun m -> m "start tls handshake");
drain_handshake tls_flow
| Error err -> Error (`Write err)
match write_flow tls_flow init with
| Ok () -> drain_handshake tls_flow
| Error err -> Error err
let server_of_flow config flow =
let server_of_flow conf ?(read_buffer_size = 0x1000) flow =
let tls = Tls.Engine.server conf in
let tls_flow =
{ role = `Server
{
role= `Server
; flow
; state = `Active (Tls.Engine.server config)
; linger = None
; recv = Bytes.create 0x1000
; writer_closed = false
; state= `Active tls
; linger= []
; rest= []
; read_buffer_size
}
in
drain_handshake tls_flow
let writev flow bufs =
if flow.writer_closed then Error `Closed
else
match flow.state with
| `End_of_input -> Error `Closed
| `Error e -> Error e
| `Active tls -> (
match Tls.Engine.send_application_data tls bufs with
| Some (tls, answer) ->
flow.state <- `Active tls;
Result.map_error (fun err -> `Write err) (write flow answer)
| None -> assert false)
let write flow buf = writev flow [ buf ]
let read t buf ~off ~len =
match t.rest with
| cs :: rest ->
let len' = min (Cstruct.length cs) len in
Log.debug (fun m -> m "transmit some saved bytes (%d byte(s))" len');
Cstruct.blit_to_bytes cs 0 buf off len';
let cs = Cstruct.shift cs len' in
if Cstruct.length cs = 0 then t.rest <- rest
else t.rest <- cs :: rest;
Ok len'
| [] ->
Log.debug (fun m -> m "start to read some bytes from the underlying flow");
match read_in t with
| Ok `Eof -> Ok 0
| Ok (`Data cs) ->
let len = min (Cstruct.length cs) len in
Cstruct.blit_to_bytes cs 0 buf off len;
if Cstruct.length cs > len
then t.rest <- [ Cstruct.shift cs len ];
Ok len
| Error _ as err -> err
[@@ocamlformat "disable"]
end

View File

@ -2,10 +2,7 @@ let anchor = Unix.gettimeofday ()
let reporter ppf =
let report src level ~over k msgf =
let k _ =
over ();
k ()
in
let k _ = over (); k () in
let with_metadata header _tags k ppf fmt =
Format.kfprintf k ppf
("[%a]%a[%a][%a]: " ^^ fmt ^^ "\n%!")
@ -28,47 +25,47 @@ let () = Logs_threaded.enable ()
let () = Printexc.record_backtrace true
let server ?(port = 8080) handler =
let stop = Miou_unix.Cond.make () in
let stop = Httpcats.Server.stop () in
let prm =
Miou.call @@ fun () ->
let file_descr = Miou_unix.tcpv4 () in
Miou_unix.bind_and_listen file_descr
(Unix.ADDR_INET (Unix.inet_addr_loopback, port));
Httpcats.Server.clear ~stop ~handler file_descr;
Miou_unix.disown file_descr
let sockaddr = Unix.ADDR_INET (Unix.inet_addr_loopback, port) in
Httpcats.Server.clear ~stop ~handler sockaddr
in
(stop, prm)
let test00 =
Alcotest.test_case "simple" `Quick @@ fun () ->
Miou_unix.run @@ fun () ->
let handler _request =
let open Httpcats.Server in
let body = "Hello World!" in
let headers =
Headers.of_list
[ ("content-type", "text/plain")
; ("content-length", string_of_int (String.length body)) ]
in
Httpcats.Server.string ~headers ~status:`OK body
let handler = function
| `V2 _ -> assert false
| `V1 reqd ->
let open Httpaf in
let body = "Hello World!" in
let headers =
Headers.of_list
[
("content-type", "text/plain")
; ("content-length", string_of_int (String.length body))
]
in
let resp = Response.create ~headers `OK in
Reqd.respond_with_string reqd resp body
in
let stop, prm = server ~port:4000 handler in
let daemon, resolver = Happy.stack () in
match
Httpcats.request ~resolver
~f:(fun _resp buf str ->
Buffer.add_string buf str;
buf)
~f:(fun _resp buf str -> Buffer.add_string buf str; buf)
~uri:"http://127.0.0.1:4000/" (Buffer.create 0x10)
with
| Ok (_response, buf) ->
Alcotest.(check string)
"Hello World!" (Buffer.contents buf) "Hello World!";
Miou_unix.Cond.signal stop;
Httpcats.Server.switch stop;
Miou.await_exn prm;
Happy.kill daemon
| Error err ->
Miou_unix.Cond.signal stop;
Httpcats.Server.switch stop;
Miou.await_exn prm;
Happy.kill daemon;
Alcotest.failf "Got an error: %a" Httpcats.pp_error err
@ -87,56 +84,48 @@ let test01 =
let g1 = Random.State.copy g0 in
let max = 0x100000 in
let chunk = 0x10 in
let handler _request =
let open Httpcats.Server in
let headers =
Headers.of_list
[ ("content-type", "text/plain"); ("content-length", string_of_int max)
]
in
let stream = Httpcats.Server.stream ~headers `OK in
let rec go rest =
if rest <= 0 then stream.close ()
else
let len = min chunk rest in
let str = generate g0 len in
begin
stream.write_string str;
go (rest - len)
end
in
go max
let handler = function
| `V2 _ -> assert false
| `V1 reqd ->
Logs.debug (fun m -> m "Got a request");
let open Httpaf in
let headers =
Headers.of_list
[
("content-type", "text/plain")
; ("content-length", string_of_int max)
]
in
let resp = Response.create ~headers `OK in
let body = Reqd.respond_with_streaming reqd resp in
let rec go rest =
if rest <= 0 then Body.close_writer body
else
let len = min chunk rest in
let str = generate g0 len in
Body.write_string body str;
go (rest - len)
in
go max
in
let stop, prm = server ~port:4000 handler in
let daemon, resolver = Happy.stack () in
match
Httpcats.request ~resolver
~f:(fun _resp buf str ->
Buffer.add_string buf str;
buf)
~f:(fun _resp buf str -> Buffer.add_string buf str; buf)
~uri:"http://127.0.0.1:4000" (Buffer.create 0x1000)
with
| Ok (_response, buf) ->
Alcotest.(check string) "random" (generate g1 max) (Buffer.contents buf);
Miou_unix.Cond.signal stop;
Httpcats.Server.switch stop;
Miou.await_exn prm;
Happy.kill daemon
| Error err ->
Miou_unix.Cond.signal stop;
Httpcats.Server.switch stop;
Miou.await_exn prm;
Happy.kill daemon;
Alcotest.failf "Got an error: %a" Httpcats.pp_error err
let to_string { Httpcats.Server.schedule } k =
let buf = Buffer.create 0x1000 in
let rec on_eof () = k (Buffer.contents buf)
and on_read bstr ~off ~len =
let str = Bigstringaf.substring ~off ~len bstr in
Buffer.add_string buf str;
schedule ~on_eof ~on_read
in
schedule ~on_eof ~on_read
let sha1 = Alcotest.testable Digestif.SHA1.pp Digestif.SHA1.equal
let random_string ~len =
@ -146,47 +135,63 @@ let random_string ~len =
done;
Bytes.unsafe_to_string res
let fold ~finally ~f acc body =
let open Httpaf in
let acc = ref acc in
let rec on_eof () = Body.close_reader body; finally !acc
and on_read bstr ~off ~len =
let str = Bigstringaf.substring bstr ~off ~len in
Logs.debug (fun m -> m "Feed the context");
acc := f !acc str;
Body.schedule_read body ~on_eof ~on_read
in
Body.schedule_read body ~on_eof ~on_read
let test02 =
Alcotest.test_case "post" `Quick @@ fun () ->
Miou_unix.run @@ fun () ->
let handler _request =
let open Httpcats.Server in
let stream = get () in
to_string stream @@ fun body ->
let hash = Digestif.SHA1.digest_string body in
let hash = Digestif.SHA1.to_hex hash in
let headers =
Headers.of_list
[ ("content-type", "text/plain")
; ("content-length", string_of_int (String.length hash)) ]
in
Httpcats.Server.string ~headers ~status:`OK hash
let handler = function
| `V2 _ -> assert false
| `V1 reqd ->
let open Httpaf in
let f ctx str = Digestif.SHA1.feed_string ctx str in
let finally ctx =
let hash = Digestif.SHA1.(to_hex (get ctx)) in
let headers =
Headers.of_list
[
("content-type", "text/plain")
; ("content-length", string_of_int (String.length hash))
]
in
let resp = Response.create ~headers `OK in
Reqd.respond_with_string reqd resp hash
in
fold ~finally ~f Digestif.SHA1.empty (Reqd.request_body reqd)
in
let stop, prm = server ~port:4000 handler in
let daemon, resolver = Happy.stack () in
let body = random_string ~len:0x4000 in
match
Httpcats.request ~resolver ~meth:`POST ~body
~f:(fun _resp buf str ->
Buffer.add_string buf str;
buf)
~f:(fun _resp buf str -> Buffer.add_string buf str; buf)
~uri:"http://127.0.0.1:4000" (Buffer.create 0x1000)
with
| Ok (_response, buf) ->
let hash' = Digestif.SHA1.of_hex (Buffer.contents buf) in
let hash = Digestif.SHA1.digest_string body in
Alcotest.(check sha1) "sha1" hash hash';
Miou_unix.Cond.signal stop;
Httpcats.Server.switch stop;
Miou.await_exn prm;
Happy.kill daemon
| Error err ->
Miou_unix.Cond.signal stop;
Httpcats.Server.switch stop;
Miou.await_exn prm;
Happy.kill daemon;
Alcotest.failf "Got an error: %a" Httpcats.pp_error err
let () =
let stdout = Alcotest_engine.Global.make_stdout () in
let stderr = Alcotest_engine.Global.make_stderr () in
let stdout = Alcotest_engine.Formatters.make_stdout () in
let stderr = Alcotest_engine.Formatters.make_stderr () in
Alcotest.run ~stdout ~stderr "network"
[ ("simple", [ test00; test01; test02 ]) ]