Add a simple clear http/1.1 server

This commit is contained in:
Calascibetta Romain 2023-10-11 14:24:55 +02:00
parent bce86fe466
commit 119f7a1f02
No known key found for this signature in database
GPG Key ID: 8CC4DC3365A666B0
22 changed files with 1299 additions and 846 deletions

View File

@ -1,2 +1,5 @@
version=0.25.1
exp-grouping=preserve
break-collection-expressions=wrap
break-separators=before
dock-collection-brackets=false

View File

@ -1,8 +1,8 @@
(executable
(name hurle)
(modules hurle)
(libraries logs.fmt fmt.tty logs.threaded mirage-crypto-rng.unix httpcats
httpcats.happy hxd.string hxd.core))
; (executable
; (name hurle)
; (modules hurle)
; (libraries logs.fmt fmt.tty logs.threaded mirage-crypto-rng.unix httpcats
; httpcats.happy hxd.string hxd.core))
(executable
(name pars)

View File

@ -1,3 +1,27 @@
type arguments =
{ json : bool
; form : bool
; multipart : bool
; boundary : string option
; raw : string option
; compress : bool
; pretty : [ `All | `Colors | `Format | `None ] (* style : style *)
; sorted : bool
; charset : Rosetta.encoding
; response_mime : string option
; headers : bool
; body : bool
; verbose : [ `Verbose | `Quiet ]
; output : Fpath.t option
; continue : bool
; auth : (string * string option) option
; auth_type : [ `Basic | `Digest ]
; follow : bool
; chunked : bool
; meth : Method.t
; uri : string
}
let anchor = Unix.gettimeofday ()
let sigpipe = 13
let () = Sys.set_signal sigpipe Sys.Signal_ignore
@ -41,9 +65,8 @@ let epr fmt =
Fun.protect ~finally @@ fun () -> Format.eprintf fmt
let getaddrinfo dns =
{
Happy.getaddrinfo =
(fun record host -> Dns_miou.getaddrinfo dns record host);
{ Happy.getaddrinfo =
(fun record host -> Dns_miou.getaddrinfo dns record host)
}
let () = Mirage_crypto_rng_unix.initialize (module Mirage_crypto_rng.Fortuna)

View File

@ -89,14 +89,14 @@ type event =
type display = (unit, unit) Progress.Display.t
type t = {
gen : int Atomic.t;
orphans : unit Miou.orphans;
events : event Miou.Queue.t;
reporters : reporter array;
display : display;
align : int;
}
type t =
{ gen : int Atomic.t
; orphans : unit Miou.orphans
; events : event Miou.Queue.t
; reporters : reporter array
; display : display
; align : int
}
let make ~filenames =
let gen = Atomic.make 0 in
@ -208,9 +208,8 @@ let get_uris_from_stdin () =
go []
let getaddrinfo dns =
{
Happy.getaddrinfo =
(fun record host -> Dns_miou.getaddrinfo dns record host);
{ Happy.getaddrinfo =
(fun record host -> Dns_miou.getaddrinfo dns record host)
}
let sigpipe = 13

View File

@ -1,21 +1,20 @@
type 'a t = {
buffer : 'a array;
mutable rdpos : int;
mutable wrpos : int;
lock : Mutex.t;
non_empty : Miou_unix.Cond.t;
non_full : Miou_unix.Cond.t;
}
type 'a t =
{ buffer : 'a array
; mutable rdpos : int
; mutable wrpos : int
; lock : Mutex.t
; non_empty : Miou_unix.Cond.t
; non_full : Miou_unix.Cond.t
}
let make size v =
let lock = Mutex.create () in
{
buffer = Array.make size v;
lock;
rdpos = 0;
wrpos = 0;
non_empty = Miou_unix.Cond.make ~mutex:lock ();
non_full = Miou_unix.Cond.make ~mutex:lock ();
{ buffer = Array.make size v
; lock
; rdpos = 0
; wrpos = 0
; non_empty = Miou_unix.Cond.make ~mutex:lock ()
; non_full = Miou_unix.Cond.make ~mutex:lock ()
}
let is_empty t =

View File

@ -1,7 +1,8 @@
(library
(name httpcats)
(public_name httpcats)
(modules flow http_miou_unix httpcats tls_miou)
(modules flow runtime http_miou_unix http_miou_client http_miou_server
httpcats tls_miou)
(libraries ca-certs httpaf h2 httpcats.happy tls miou miou.unix))
(library

View File

@ -36,36 +36,35 @@ and value =
| `Resolution_v6 of
[ `host ] Domain_name.t * (Ipaddr.V6.Set.t, [ `Msg of string ]) result ]
and getaddrinfo = {
getaddrinfo :
'response 'a.
'response Dns.Rr_map.key ->
'a Domain_name.t ->
('response, [ `Msg of string ]) result;
}
and getaddrinfo =
{ getaddrinfo :
'response 'a.
'response Dns.Rr_map.key
-> 'a Domain_name.t
-> ('response, [ `Msg of string ]) result
}
[@@unboxed]
let dummy =
let getaddrinfo _ _ = Error (`Msg "Not implemented") in
{ getaddrinfo }
type stack = {
mutable cancel_connecting : cancel list Happy_eyeballs.Waiter_map.t;
mutable waiters : state Atomic.t Happy_eyeballs.Waiter_map.t;
condition : Miou_unix.Cond.t;
queue : action Miou.Queue.t;
connections : (Miou.Promise.Uid.t, entry) Hashtbl.t;
mutable getaddrinfo : getaddrinfo;
}
type stack =
{ mutable cancel_connecting : cancel list Happy_eyeballs.Waiter_map.t
; mutable waiters : state Atomic.t Happy_eyeballs.Waiter_map.t
; condition : Miou_unix.Cond.t
; queue : action Miou.Queue.t
; connections : (Miou.Promise.Uid.t, entry) Hashtbl.t
; mutable getaddrinfo : getaddrinfo
}
let create_stack () =
{
cancel_connecting = Happy_eyeballs.Waiter_map.empty;
waiters = Happy_eyeballs.Waiter_map.empty;
condition = Miou_unix.Cond.make ();
queue = Miou.Queue.create ();
connections = Hashtbl.create 0x100;
getaddrinfo = dummy;
{ cancel_connecting = Happy_eyeballs.Waiter_map.empty
; waiters = Happy_eyeballs.Waiter_map.empty
; condition = Miou_unix.Cond.make ()
; queue = Miou.Queue.create ()
; connections = Hashtbl.create 0x100
; getaddrinfo = dummy
}
let try_connect addr () =
@ -358,8 +357,8 @@ let stack ?aaaa_timeout ?connect_delay ?connect_timeout ?resolve_timeout
?resolve_retries () =
let v = create_stack () in
( launch_stack ?aaaa_timeout ?connect_delay ?connect_timeout ?resolve_timeout
?resolve_retries v (),
v )
?resolve_retries v ()
, v )
let inject_resolver ~getaddrinfo stack = stack.getaddrinfo <- getaddrinfo
let kill = Miou.cancel
@ -367,12 +366,12 @@ let kill = Miou.cancel
type +'a io = 'a
type io_addr = [ `Plaintext of Ipaddr.t * int ]
type t = {
nameservers : io_addr list;
proto : Dns.proto;
timeout : float;
stack : stack;
}
type t =
{ nameservers : io_addr list
; proto : Dns.proto
; timeout : float
; stack : stack
}
type context = float * bool ref * Miou_unix.file_descr

View File

@ -5,23 +5,23 @@ include
type daemon
type getaddrinfo = {
getaddrinfo :
'response 'a.
'response Dns.Rr_map.key ->
'a Domain_name.t ->
('response, [ `Msg of string ]) result;
}
type getaddrinfo =
{ getaddrinfo :
'response 'a.
'response Dns.Rr_map.key
-> 'a Domain_name.t
-> ('response, [ `Msg of string ]) result
}
[@@unboxed]
val stack :
?aaaa_timeout:int64 ->
?connect_delay:int64 ->
?connect_timeout:int64 ->
?resolve_timeout:int64 ->
?resolve_retries:int ->
unit ->
daemon * stack
?aaaa_timeout:int64
-> ?connect_delay:int64
-> ?connect_timeout:int64
-> ?resolve_timeout:int64
-> ?resolve_retries:int
-> unit
-> daemon * stack
val inject_resolver : getaddrinfo:getaddrinfo -> stack -> unit
(** [inject_resolver ~getaddrinfo stack] injects a DNS resolver into the given
@ -38,26 +38,26 @@ val kill : daemon -> unit
The user {b must} call at the end of its application this function. *)
val connect_ip :
stack ->
(Ipaddr.t * int) list ->
((Ipaddr.t * int) * Miou_unix.file_descr, [> `Msg of string ]) result
stack
-> (Ipaddr.t * int) list
-> ((Ipaddr.t * int) * Miou_unix.file_descr, [> `Msg of string ]) result
(** [connect_ip t addresses] establishes a connection to [addresses]. *)
val connect_host :
stack ->
[ `host ] Domain_name.t ->
int list ->
((Ipaddr.t * int) * Miou_unix.file_descr, [> `Msg of string ]) result
stack
-> [ `host ] Domain_name.t
-> int list
-> ((Ipaddr.t * int) * Miou_unix.file_descr, [> `Msg of string ]) result
(** [connect_host t host ports] establishes a connection to [host] on [ports]
(tried in sequence).
@raise Failure if [ports] is empty. *)
val connect_endpoint :
stack ->
string ->
int list ->
((Ipaddr.t * int) * Miou_unix.file_descr, [> `Msg of string ]) result
stack
-> string
-> int list
-> ((Ipaddr.t * int) * Miou_unix.file_descr, [> `Msg of string ]) result
(** [connect_endpoint t host ports] establishes a connection to [host] on
[ports], which may be a host name or an IP address.

240
src/http_miou_client.ml Normal file
View File

@ -0,0 +1,240 @@
open Http_miou_unix
module TLS_for_httpaf = struct
include TLS
let shutdown flow _ = Miou_unix.disown flow.flow
end
module Httpaf_Client_connection = struct
include Httpaf.Client_connection
let yield_reader _ = assert false
let next_read_operation t =
(next_read_operation t :> [ `Close | `Read | `Yield ])
end
module A = Runtime.Make (TLS_for_httpaf) (Httpaf_Client_connection)
module B = Runtime.Make (TCP) (Httpaf_Client_connection)
module C = Runtime.Make (TLS) (H2.Client_connection)
module D = Runtime.Make (TCP) (H2.Client_connection)
type config = [ `V1 of Httpaf.Config.t | `V2 of H2.Config.t ]
type flow = [ `Tls of TLS.t | `Tcp of Miou_unix.file_descr ]
type request = [ `V1 of Httpaf.Request.t | `V2 of H2.Request.t ]
type response = [ `V1 of Httpaf.Response.t | `V2 of H2.Response.t ]
type 'body body =
{ body : 'body
; write_string : 'body -> ?off:int -> ?len:int -> string -> unit
; close : 'body -> unit
; release : unit -> unit
}
type ('resp, 'body) version =
| V1 : (Httpaf.Response.t, [ `write ] Httpaf.Body.t body) version
| V2 : (H2.Response.t, H2.Body.Writer.t body) version
type error =
[ `V1 of Httpaf.Client_connection.error
| `V2 of H2.Client_connection.error
| `Protocol of string ]
let pp_error ppf = function
| `V1 (`Malformed_response msg) ->
Fmt.pf ppf "Malformed HTTP/1.1 response: %s" msg
| `V1 (`Invalid_response_body_length _resp) ->
Fmt.pf ppf "Invalid response body length"
| `V1 (`Exn exn) | `V2 (`Exn exn) ->
Fmt.pf ppf "Got an unexpected exception: %S" (Printexc.to_string exn)
| `V2 (`Malformed_response msg) -> Fmt.pf ppf "Malformed H2 response: %s" msg
| `V2 (`Invalid_response_body_length _resp) ->
Fmt.pf ppf "Invalid response body length"
| `V2 (`Protocol_error (err, msg)) ->
Fmt.pf ppf "Protocol error %a: %s" H2.Error_code.pp_hum err msg
| `Protocol msg -> Fmt.string ppf msg
type ('resp, 'acc) await = unit -> ('resp * 'acc, error) result
type 'acc process =
| Process :
('resp, 'body) version * ('resp, 'acc) await * 'body
-> 'acc process
let src = Logs.Src.create "http-miou-client"
module Log = (val Logs.src_log src : Logs.LOG)
(* NOTE(dinosaure): we avoid first-class module here. *)
let run ~f acc config flow request =
let response : response option ref = ref None
and error = ref None
and acc = ref acc in
let error_handler err =
Log.err (fun m -> m "got an error: %a" pp_error err);
match err with
| `V1 (`Exn (Runtime.Flow msg)) | `V2 (`Exn (Runtime.Flow msg)) ->
error := Some (`Protocol msg)
| err -> error := Some err
in
let response_handler ?(shutdown = Fun.const ()) = function
| `V1 (resp, body) ->
let rec on_eof = shutdown
and on_read bstr ~off ~len =
let str = Bigstringaf.substring bstr ~off ~len in
acc := f (`V1 resp) !acc str;
Httpaf.Body.schedule_read body ~on_read ~on_eof
in
response := Some (`V1 resp);
Httpaf.Body.schedule_read body ~on_read ~on_eof
| `V2 (resp, body) ->
let rec on_eof = shutdown
and on_read bstr ~off ~len =
let str = Bigstringaf.substring bstr ~off ~len in
acc := f (`V2 resp) !acc str;
H2.Body.Reader.schedule_read body ~on_read ~on_eof
in
response := Some (`V2 resp);
H2.Body.Reader.schedule_read body ~on_read ~on_eof
in
let give =
match flow with
| `Tls flow -> [ Miou_unix.owner flow.TLS.flow ]
| `Tcp flow -> [ Miou_unix.owner flow ]
in
match (flow, config, request) with
| `Tls flow, `V1 config, `V1 request ->
let read_buffer_size = config.Httpaf.Config.read_buffer_size in
let disown flow = Miou_unix.disown flow.TLS.flow in
let response_handler resp body = response_handler (`V1 (resp, body)) in
let error_handler error = error_handler (`V1 error) in
let body, conn =
Httpaf.Client_connection.request ~config request ~error_handler
~response_handler
in
let orphans = Miou.orphans () in
let { Runtime.protect }, prm, close =
Log.debug (fun m -> m "start an http/1.1 request over TLS");
A.run conn ~give ~disown ~read_buffer_size flow
in
let await () =
match (Miou.await prm, !error, !response) with
| _, Some error, _ -> Error error
| Error exn, _, _ -> Error (`V1 (`Exn exn))
| Ok (), None, Some (`V1 response) -> Ok (response, !acc)
| Ok (), None, (Some (`V2 _) | None) -> assert false
in
let release () =
Runtime.terminate orphans;
close ()
in
let write_string body ?off ?len str =
protect ~orphans (Httpaf.Body.write_string body ?off ?len) str
in
let close body = protect ~orphans Httpaf.Body.close_writer body in
let body = { body; write_string; close; release } in
Process (V1, await, body)
| `Tcp flow, `V1 config, `V1 request ->
let read_buffer_size = config.Httpaf.Config.read_buffer_size in
let disown = Miou_unix.disown in
let response_handler resp body = response_handler (`V1 (resp, body)) in
let error_handler error = error_handler (`V1 error) in
let body, conn =
Httpaf.Client_connection.request ~config request ~error_handler
~response_handler
in
let orphans = Miou.orphans () in
let { Runtime.protect }, prm, close =
B.run conn ~give ~disown ~read_buffer_size flow
in
let await () =
match (Miou.await prm, !error, !response) with
| _, Some error, _ -> Error error
| Error exn, _, _ -> Error (`V1 (`Exn exn))
| Ok (), None, Some (`V1 response) -> Ok (response, !acc)
| Ok (), None, (Some (`V2 _) | None) -> assert false
in
let release () =
Runtime.terminate orphans;
close ()
in
let write_string body ?off ?len str =
protect ~orphans (Httpaf.Body.write_string body ?off ?len) str
in
let close body = protect ~orphans Httpaf.Body.close_writer body in
let body = { body; write_string; close; release } in
Process (V1, await, body)
| `Tls flow, `V2 config, `V2 request ->
let read_buffer_size = config.H2.Config.read_buffer_size in
let disown flow = Miou_unix.disown flow.TLS.flow in
let error_handler error = error_handler (`V2 error) in
let conn = H2.Client_connection.create ~config ~error_handler () in
let shutdown () = H2.Client_connection.shutdown conn in
let response_handler resp body =
response_handler ~shutdown (`V2 (resp, body))
in
let body =
H2.Client_connection.request conn request ~error_handler
~response_handler
in
let orphans = Miou.orphans () in
let { Runtime.protect }, prm, close =
Log.debug (fun m -> m "start an h2 request over TLS");
C.run conn ~give ~disown ~read_buffer_size flow
in
let await () =
match (Miou.await prm, !error, !response) with
| _, Some error, _ -> Error error
| Error exn, _, _ -> Error (`V2 (`Exn exn))
| Ok (), None, Some (`V2 response) -> Ok (response, !acc)
| Ok (), None, (Some (`V1 _) | None) -> assert false
in
let release () =
Runtime.terminate orphans;
close ()
in
let write_string body ?off ?len str =
protect ~orphans (H2.Body.Writer.write_string body ?off ?len) str
in
let close body =
Log.debug (fun m -> m "close the stream from the application level");
protect ~orphans H2.Body.Writer.close body
in
let body = { body; write_string; close; release } in
Process (V2, await, body)
| `Tcp flow, `V2 config, `V2 request ->
let read_buffer_size = config.H2.Config.read_buffer_size in
let disown = Miou_unix.disown in
let error_handler error = error_handler (`V2 error) in
let conn = H2.Client_connection.create ~config ~error_handler () in
let shutdown () = H2.Client_connection.shutdown conn in
let response_handler resp body =
response_handler ~shutdown (`V2 (resp, body))
in
let body =
H2.Client_connection.request conn request ~error_handler
~response_handler
in
let orphans = Miou.orphans () in
let { Runtime.protect }, prm, close =
D.run conn ~give ~disown ~read_buffer_size flow
in
let await () =
match (Miou.await prm, !error, !response) with
| _, Some error, _ -> Error error
| Error exn, _, _ -> Error (`V2 (`Exn exn))
| Ok (), None, Some (`V2 response) -> Ok (response, !acc)
| Ok (), None, (Some (`V1 _) | None) -> assert false
in
let release () =
Runtime.terminate orphans;
close ()
in
let write_string body ?off ?len str =
protect ~orphans (H2.Body.Writer.write_string body ?off ?len) str
in
let close body = protect ~orphans H2.Body.Writer.close body in
let body = { body; write_string; close; release } in
Process (V2, await, body)
| _ -> Fmt.invalid_arg "Http_miou_unix.run: incompatible arguments"

37
src/http_miou_client.mli Normal file
View File

@ -0,0 +1,37 @@
type config = [ `V1 of Httpaf.Config.t | `V2 of H2.Config.t ]
type flow = [ `Tls of Http_miou_unix.TLS.t | `Tcp of Miou_unix.file_descr ]
type request = [ `V1 of Httpaf.Request.t | `V2 of H2.Request.t ]
type response = [ `V1 of Httpaf.Response.t | `V2 of H2.Response.t ]
type error =
[ `V1 of Httpaf.Client_connection.error
| `V2 of H2.Client_connection.error
| `Protocol of string ]
val pp_error : error Fmt.t
type 'body body =
{ body : 'body
; write_string : 'body -> ?off:int -> ?len:int -> string -> unit
; close : 'body -> unit
; release : unit -> unit
}
type ('resp, 'body) version =
| V1 : (Httpaf.Response.t, [ `write ] Httpaf.Body.t body) version
| V2 : (H2.Response.t, H2.Body.Writer.t body) version
type ('resp, 'acc) await = unit -> ('resp * 'acc, error) result
type 'acc process =
| Process :
('resp, 'body) version * ('resp, 'acc) await * 'body
-> 'acc process
val run :
f:(response -> 'acc -> string -> 'acc)
-> 'acc
-> config
-> flow
-> request
-> 'acc process

204
src/http_miou_server.ml Normal file
View File

@ -0,0 +1,204 @@
open Http_miou_unix
module TLS_for_httpaf = struct
include TLS
let shutdown flow _ = Miou_unix.disown flow.flow
end
module A = Runtime.Make (TLS_for_httpaf) (Httpaf.Server_connection)
module B = Runtime.Make (TCP) (Httpaf.Server_connection)
module C = Runtime.Make (TLS) (H2.Server_connection)
[@@@warning "-34"]
type config = [ `V1 of Httpaf.Config.t | `V2 of H2.Config.t ]
type flow = [ `Tls of TLS.t | `Tcp of Miou_unix.file_descr ]
type request = [ `V1 of Httpaf.Request.t | `V2 of H2.Request.t ]
type response = [ `V1 of Httpaf.Response.t | `V2 of H2.Response.t ]
type error =
[ `V1 of Httpaf.Server_connection.error
| `V2 of H2.Server_connection.error
| `Protocol of string ]
let pp_error ppf = function
| `V1 `Bad_request -> Fmt.string ppf "Bad HTTP/1.1 request"
| `V1 `Bad_gateway -> Fmt.string ppf "Bad HTTP/1.1 gateway"
| `V1 `Internal_server_error | `V2 `Internal_server_error ->
Fmt.string ppf "Internal server error"
| `V1 (`Exn exn) | `V2 (`Exn exn) ->
Fmt.pf ppf "Got an unexpected exception: %S" (Printexc.to_string exn)
| `V2 `Bad_request -> Fmt.string ppf "Bad H2 request"
| `Protocol msg -> Fmt.string ppf msg
let src = Logs.Src.create "http-miou-server"
module Log = (val Logs.src_log src : Logs.LOG)
exception Body_already_sent
type stream =
{ write_string : ?off:int -> ?len:int -> string -> unit
; write_bigstring : ?off:int -> ?len:int -> Bigstringaf.t -> unit
; close : unit -> unit
}
type _ Effect.t += String : response * string -> unit Effect.t
type _ Effect.t += Bigstring : response * Bigstringaf.t -> unit Effect.t
type _ Effect.t += Stream : response -> stream Effect.t
let string response str = Effect.perform (String (response, str))
let bigstring response bstr = Effect.perform (Bigstring (response, bstr))
let stream response = Effect.perform (Stream response)
type error_handler =
?request:request -> error -> (H2.Headers.t -> stream) -> unit
type handler = request -> unit
let pp_sockaddr ppf = function
| Unix.ADDR_UNIX name -> Fmt.pf ppf "<%s>" name
| Unix.ADDR_INET (inet_addr, port) ->
Fmt.pf ppf "%s:%d" (Unix.string_of_inet_addr inet_addr) port
let rec basic_handler ~exnc =
let open Effect.Shallow in
let fail k = discontinue_with k Body_already_sent (basic_handler ~exnc) in
let retc = Fun.id in
let effc :
type c. c Effect.t -> ((c, 'a) Effect.Shallow.continuation -> 'b) option =
function
| String _ | Bigstring _ | Stream _ ->
Log.err (fun m -> m "the user wants to write to the peer a second time");
Some fail
| _ -> None
in
{ retc; exnc; effc }
let httpaf_handler ~sockaddr ~protect:{ Runtime.protect } ~orphans ~handler reqd
=
let open Httpaf in
let open Effect.Shallow in
let retc = Fun.id in
let exnc = protect ~orphans (Reqd.report_exn reqd) in
let effc :
type c. c Effect.t -> ((c, 'a) Effect.Shallow.continuation -> 'b) option =
function
| String (`V1 response, str) ->
Log.debug (fun m -> m "write a http/1.1 response and its body");
protect ~orphans (Reqd.respond_with_string reqd response) str;
let handler = basic_handler ~exnc in
Some (fun k -> continue_with k () handler)
| Stream (`V1 response) ->
let body =
protect ~orphans (Reqd.respond_with_streaming reqd) response
in
let write_string ?off ?len str =
protect ~orphans (Body.write_string body ?off ?len) str
in
let write_bigstring ?off ?len bstr =
protect ~orphans (Body.write_bigstring body ?off ?len) bstr
in
let close () = protect ~orphans Body.close_writer body in
let stream = { write_string; write_bigstring; close } in
let handler = basic_handler ~exnc in
Some (fun k -> continue_with k stream handler)
| _ -> None
in
let fn request =
handler request;
Runtime.terminate orphans;
Log.debug (fun m -> m "the handler for %a has ended" pp_sockaddr sockaddr)
in
continue_with (fiber fn) (`V1 (Reqd.request reqd)) { retc; exnc; effc }
let rec clean orphans =
match Miou.care orphans with
| Some (Some prm) ->
Miou.await_exn prm;
clean orphans
| Some None | None -> ()
let default_error_handler ?request:_ _err _respond = ()
let accept_or_stop ?stop file_descr =
match stop with
| None -> `Accept (Miou_unix.accept file_descr)
| Some stop -> (
let accept =
Miou.call_cc ~give:[ Miou_unix.owner file_descr ] @@ fun () ->
let file_descr', sockaddr = Miou_unix.accept file_descr in
Miou_unix.disown file_descr;
`Accept (Miou_unix.transfer file_descr', sockaddr)
in
let rec go () = if Atomic.get stop then `Stop else go (Miou.yield ()) in
Miou.await_first [ accept; Miou.call_cc go ] |> function
| Ok value -> value
| Error exn -> raise exn)
let clear ?stop ?(config = Httpaf.Config.default)
?error_handler:(user's_error_handler = default_error_handler) ~handler
file_descr =
let read_buffer_size = config.Httpaf.Config.read_buffer_size in
let rec go orphans file_descr =
Log.debug (fun m ->
m "waiting for a new connection or a stop signal from the user");
match accept_or_stop ?stop file_descr with
| `Stop ->
Log.debug (fun m -> m "terminate the clear http server");
Runtime.terminate orphans
| `Accept (file_descr', sockaddr) ->
Log.debug (fun m -> m "receive a new client: %a" pp_sockaddr sockaddr);
clean orphans;
let give = [ Miou_unix.owner file_descr' ] in
let _ =
Miou.call ~orphans ~give @@ fun () ->
let orphans = Miou.orphans () in
let rec error_handler ?request err respond =
let { Runtime.protect }, _, _ = Lazy.force process in
let respond hdrs =
let open Httpaf in
let hdrs = Httpaf.Headers.of_list (H2.Headers.to_list hdrs) in
let body = protect ~orphans respond hdrs in
let write_string ?off ?len str =
protect ~orphans (Body.write_string body ?off ?len) str
in
let write_bigstring ?off ?len bstr =
protect ~orphans (Body.write_bigstring body ?off ?len) bstr
in
let close () = protect ~orphans Body.close_writer body in
{ write_string; write_bigstring; close }
in
let request = Option.map (fun request -> `V1 request) request in
match err with
| `Exn (Runtime.Flow msg) ->
user's_error_handler ?request (`Protocol msg :> error) respond
| err -> user's_error_handler ?request (`V1 err) respond
and request_handler reqd =
let protect, _, _ = Lazy.force process in
httpaf_handler ~sockaddr ~protect ~orphans ~handler reqd
and conn =
lazy
(Httpaf.Server_connection.create ~config ~error_handler
request_handler)
and process =
lazy
(B.run (Lazy.force conn) ~give ~disown:Miou_unix.disown
~read_buffer_size file_descr')
in
let _, prm, close = Lazy.force process in
Log.debug (fun m -> m "the http/1.1 server connection is launched");
let _result = Miou.await prm in
Log.debug (fun m ->
m "clean everything for the client %a" pp_sockaddr sockaddr);
Runtime.terminate orphans;
(* TODO(dinosaure): are you sure? [httpaf_handler] already did it. *)
close ();
Log.debug (fun m ->
m "the process for %a is cleaned" pp_sockaddr sockaddr)
in
Miou_unix.disown file_descr';
go orphans file_descr
in
go (Miou.orphans ()) file_descr

29
src/http_miou_server.mli Normal file
View File

@ -0,0 +1,29 @@
type error
val pp_error : error Fmt.t
type request = [ `V1 of Httpaf.Request.t | `V2 of H2.Request.t ]
type response = [ `V1 of Httpaf.Response.t | `V2 of H2.Response.t ]
type stream =
{ write_string : ?off:int -> ?len:int -> string -> unit
; write_bigstring : ?off:int -> ?len:int -> Bigstringaf.t -> unit
; close : unit -> unit
}
val string : response -> string -> unit
val bigstring : response -> Bigstringaf.t -> unit
val stream : response -> stream
type error_handler =
?request:request -> error -> (H2.Headers.t -> stream) -> unit
type handler = request -> unit
val clear :
?stop:bool Atomic.t
-> ?config:Httpaf.Config.t
-> ?error_handler:error_handler
-> handler:handler
-> Miou_unix.file_descr
-> unit

View File

@ -1,391 +1,7 @@
(*----------------------------------------------------------------------------
Copyright (c) 2018 Inhabited Type LLC.
Copyright (c) 2018 Anton Bachin
Copyright (c) 2023 Robur
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
1. Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
3. Neither the name of the author nor the names of his contributors
may be used to endorse or promote products derived from this software
without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE CONTRIBUTORS ``AS IS'' AND ANY EXPRESS
OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
----------------------------------------------------------------------------*)
module type RUNTIME = sig
type t
val next_read_operation : t -> [ `Read | `Yield | `Close ]
val read : t -> Bigstringaf.t -> off:int -> len:int -> int
val read_eof : t -> Bigstringaf.t -> off:int -> len:int -> int
val yield_reader : t -> (unit -> unit) -> unit
val next_write_operation :
t -> [ `Write of Bigstringaf.t Faraday.iovec list | `Close of int | `Yield ]
val report_write_result : t -> [ `Ok of int | `Closed ] -> unit
val yield_writer : t -> (unit -> unit) -> unit
val report_exn : t -> exn -> unit
end
module Buffer : sig
type t
val create : int -> t
val get : t -> f:(Bigstringaf.t -> off:int -> len:int -> int) -> int
val put : t -> f:(Bigstringaf.t -> off:int -> len:int -> int) -> int
end = struct
type t = {
mutable buffer : Bigstringaf.t;
mutable off : int;
mutable len : int;
}
let create size =
let buffer = Bigstringaf.create size in
{ buffer; off = 0; len = 0 }
let compress t =
if t.len = 0 then begin
t.off <- 0;
t.len <- 0
end
else if t.off > 0 then begin
Bigstringaf.blit t.buffer ~src_off:t.off t.buffer ~dst_off:0 ~len:t.len;
t.off <- 0
end
let get t ~f =
let n = f t.buffer ~off:t.off ~len:t.len in
t.off <- t.off + n;
t.len <- t.len - n;
if t.len = 0 then t.off <- 0;
n
let put t ~f =
compress t;
let off = t.off + t.len in
let buf = t.buffer in
if Bigstringaf.length buf = t.len then begin
t.buffer <- Bigstringaf.create (2 * Bigstringaf.length buf);
Bigstringaf.blit buf ~src_off:t.off t.buffer ~dst_off:0 ~len:t.len
end;
let n = f t.buffer ~off ~len:(Bigstringaf.length t.buffer - off) in
t.len <- t.len + n;
n
end
let src = Logs.Src.create "http-miou-unix"
module Log = (val Logs.src_log src : Logs.LOG)
let catch ~on fn =
try fn ()
with exn ->
Log.err (fun m ->
m "Got an unexpected exception: %S" (Printexc.to_string exn));
on exn
exception Flow of string
module type S = sig
type conn
type flow
type protect = {
protect : 'a 'b. orphans:unit Miou.orphans -> ('a -> 'b) -> 'a -> 'b;
}
[@@unboxed]
val run :
conn ->
?give:Miou.Ownership.t list ->
?disown:(flow -> unit) ->
read_buffer_size:int ->
flow ->
protect * unit Miou.t * (unit -> unit)
end
let rec terminate orphans =
match Miou.care orphans with
| None -> Miou.yield ()
| Some None ->
Miou.yield ();
terminate orphans
| Some (Some prm) ->
Miou.await_exn prm;
terminate orphans
module Make (Flow : Flow.S) (Runtime : RUNTIME) :
S with type conn = Runtime.t and type flow = Flow.t = struct
type conn = Runtime.t
type flow = Flow.t
let recv flow buffer =
let bytes_read =
Buffer.put buffer ~f:(fun bstr ~off:dst_off ~len ->
let buf = Bytes.create len in
match Flow.read flow buf ~off:0 ~len with
| Ok 0 -> 0
| Ok len ->
Bigstringaf.blit_from_bytes buf ~src_off:0 bstr ~dst_off ~len;
len
| Error err ->
Log.err (fun m ->
m "close the socket (recv) due to: %a" Flow.pp_error err);
Flow.close flow;
raise (Flow (Fmt.str "%a" Flow.pp_error err)))
in
if bytes_read = 0 then `Eof else `Ok bytes_read
let writev flow bstrs =
let copy { Faraday.buffer; off; len } = Bigstringaf.copy buffer ~off ~len in
let css = List.map copy bstrs |> List.map Cstruct.of_bigarray in
match Flow.writev flow css with
| Ok () ->
let len = List.fold_left (fun a { Cstruct.len; _ } -> a + len) 0 css in
`Ok len
| Error err ->
Log.err (fun m ->
m "close the socket (writev) due to: %a" Flow.pp_error err);
Flow.close flow;
`Closed
type prm = Miou.Promise.Uid.t * Miou.Domain.Uid.t * int
type _ Effect.t += Spawn : (prm:prm -> unit -> unit) -> unit Effect.t
let pp_prm ppf (uid, runner, resources) =
Fmt.pf ppf "[%a:%a](%d)" Miou.Domain.Uid.pp runner Miou.Promise.Uid.pp uid
resources
let launch ?give ?orphans fn k =
let prm =
Miou.call_cc ?orphans ?give @@ fun () ->
let prm = Miou.self () in
Log.debug (fun m -> m "%a launched" pp_prm prm);
fn ~prm ()
in
let cs = Effect.Deep.get_callstack k 1_000_000 in
Log.debug (fun m -> m "%a is launched from:" Miou.Promise.pp prm);
Log.debug (fun m -> m "%s" (Printexc.raw_backtrace_to_string cs));
Effect.Deep.continue k ()
(* Protected runtime operations.
A note on design and the need to "protect" the appearance of a new task.
Miou constrains us on 2 points:
1) we are obliged to observe the result of a task ([Miou.await]), otherwise
Miou fails.
2) only the task creator can observe the result. For example, this code
doesn't work:
{[
let prm = Miou.call @@ fun () -> Miou.call (Fun.const ()) in
Miou.await_exn (Miou.await_exn prm)
(* the second promise (in [prm]) can only be awaited in [prm]. *)
]}
The first rule requires us to use [Miou.orphans], which allows tasks to be
stored in the background. We can then catch up with them, which is what the
[terminate] function does. The second rule is more complicated...
The runtime ([httpaf] or [h2]) may be able to stop a task but force the
user to give a "continuation" allowing the same task to be restarted at the
right time (when the user wants to do a [Body.write_string], for example).
This is the case when [`Yield] is received from the runtime. This means
that there are continuations (in the runtime) which can cause tasks to
appear and which should be saved in a [Miou.orphans].
The problem is that it's not the task that gives the continuation that will
launch the subtask... So the best protection is to execute runtime
functions (which may launch subtasks) with a specific [Miou.orphan].
Continuation, on the other hand, consists in raising an effect that will be
protected with a specific [Miou.orphans]. In other words, the subtask is
saved not in the task [Miou.orphans] that creates the continuation, but in
the [Miou.orphans]'s task that creates **effectfully** the subtask.
So, after the ceremony to start an HTTP request, we need to protect the
functions that interact with the Body as well, because they can create
tasks too...
{[
let orphans, body, prm = run_http_request flow in
Body.write_string body "Hello World";
(* a new task appears to write ["Hello World!"] which will be saved into
orphans. *)
Body.close body;
(* a new task appears to finish the HTTP request. *)
let result = await prm in
terminate orphans
]}
*)
let protect ?give ?orphans fn v =
let retc = Fun.id in
let exnc = raise in
let open Effect.Deep in
let effc : type c. c Effect.t -> ((c, 'b) continuation -> 'b) option =
function
| Spawn fn -> Some (launch ?give ?orphans fn)
| _ -> None
in
match_with fn v { retc; exnc; effc }
let next_read_operation ?give ?orphans =
protect ?give ?orphans Runtime.next_read_operation
let next_write_operation ?give ?orphans =
protect ?give ?orphans Runtime.next_write_operation
let read ?give ?orphans conn bstr ~off ~len =
protect ?give ?orphans (Runtime.read conn ~off ~len) bstr
let read_eof ?give ?orphans conn bstr ~off ~len =
protect ?give ?orphans (Runtime.read_eof conn ~off ~len) bstr
let report_exn ?give ?orphans ?(close = Fun.const ()) conn exn =
Log.err (fun m -> m "report an exception: %S" (Printexc.to_string exn));
protect ?give ?orphans (Runtime.report_exn conn) exn;
Option.iter terminate orphans;
close ()
let report_write_result ?give ?orphans conn =
protect ?give ?orphans (Runtime.report_write_result conn)
let yield_reader ?give ?orphans conn =
protect ?give ?orphans (Runtime.yield_reader conn)
let yield_writer ?give ?orphans conn =
protect ?give ?orphans (Runtime.yield_writer conn)
type protect = {
protect : 'a 'b. orphans:unit Miou.orphans -> ('a -> 'b) -> 'a -> 'b;
}
[@@unboxed]
let run conn ?(give = []) ?(disown = Fun.const ()) ~read_buffer_size flow =
let buffer = Buffer.create read_buffer_size in
let closed = ref false in
let close () =
if not !closed then (
Flow.close flow;
closed := true)
else disown flow
in
let rec reader ~prm () =
Log.debug (fun m -> m "%a starts the reading loop" pp_prm prm);
let rec go orphans () =
match next_read_operation ~orphans ~give conn with
| `Read -> (
Log.debug (fun m -> m "%a next read operation: `read" pp_prm prm);
let read_eof = read_eof ~orphans ~give in
let read = read ~orphans ~give in
match recv flow buffer with
| `Eof ->
Buffer.get buffer ~f:(fun bstr ~off ~len ->
read_eof conn bstr ~off ~len)
|> ignore;
go orphans ()
| `Ok _ ->
Buffer.get buffer ~f:(fun bstr ~off ~len ->
read conn bstr ~off ~len)
|> ignore;
go orphans ())
| `Yield ->
Log.debug (fun m -> m "%a next read operation: `yield" pp_prm prm);
let continuation () =
let prm = Miou.self () in
Log.debug (fun m -> m "%a launches a new task" pp_prm prm);
Effect.perform (Spawn reader)
in
yield_reader conn ~orphans ~give continuation;
disown flow;
terminate orphans
| `Close ->
Log.debug (fun m ->
m "%a read: disown the file-descriptor" pp_prm prm);
disown flow;
terminate orphans
in
let orphans = Miou.orphans () in
catch ~on:(report_exn conn ~orphans ~give ~close) @@ fun () ->
go orphans ()
in
let rec writer ~prm () =
Log.debug (fun m -> m "%a starts to the writing loop" pp_prm prm);
let rec go orphans () =
match next_write_operation ~orphans ~give conn with
| `Write iovecs ->
Log.debug (fun m -> m "%a next write operation: `write" pp_prm prm);
writev flow iovecs |> report_write_result conn ~orphans ~give;
go orphans ()
| `Yield ->
Log.debug (fun m -> m "%a next write operation: `yield" pp_prm prm);
let continuation () =
let prm = Miou.self () in
Log.debug (fun m -> m "%a launches a new task" pp_prm prm);
Effect.perform (Spawn writer)
in
yield_writer conn ~orphans ~give continuation;
disown flow;
terminate orphans
| `Close _ ->
Log.debug (fun m -> m "%a next write operation: `close" pp_prm prm);
Flow.shutdown flow `Send;
terminate orphans
in
let orphans = Miou.orphans () in
catch ~on:(report_exn conn ~orphans ~give ~close) @@ fun () ->
go orphans ()
in
let protect ~orphans = protect ~orphans ~give in
let prm =
Miou.call_cc ~give @@ fun () ->
let p0 = Miou.call_cc ~give @@ fun () -> reader ~prm:(Miou.self ()) () in
let p1 = Miou.call_cc ~give @@ fun () -> writer ~prm:(Miou.self ()) () in
let result =
match Miou.await_all [ p0; p1 ] with
| [ Ok (); Ok () ] -> Ok ()
| [ Error exn; _ ] | [ _; Error exn ] -> Error exn
| _ -> assert false
in
Log.debug (fun m -> m "close the file-descriptor");
(* TODO(dinosaure): we should probably check the state of the underlying
flow and see if it's closed or not. I suspect that on [http/1.1], it's
not the case but Miou does not complain because we [disown]
everywhere. *)
match result with
| Ok () -> disown flow
| Error exn ->
close ();
raise exn
in
Log.debug (fun m -> m "the main task is: %a" Miou.Promise.pp prm);
({ protect }, prm, close)
end
module TCP = struct
type t = Miou_unix.file_descr
type error = Unix.error * string * string
@ -429,12 +45,14 @@ module TCP = struct
| `Send -> Miou_unix.shutdown flow Unix.SHUTDOWN_SEND
end
module TLS = Tls_miou.Make (TCP)
module TLS = struct
include Tls_miou.Make (TCP)
type tls = TLS.t
type tls_error = TLS.error
let pp_tls_error = TLS.pp_error
let close flow =
match flow.state with
| `Active _ -> close flow
| _ -> Miou_unix.disown flow.flow
end
let to_tls cfg ?host flow =
match TLS.client_of_flow cfg ?host flow with
@ -451,76 +69,8 @@ let epoch tls =
| `InitialEpoch -> assert false
| `Epoch data -> Some data)
module Httpaf_Client_connection = struct
include Httpaf.Client_connection
let yield_reader _ = assert false
let next_read_operation t =
(next_read_operation t :> [ `Close | `Read | `Yield ])
end
(* Implementations. *)
type config = [ `V1 of Httpaf.Config.t | `V2 of H2.Config.t ]
type flow = [ `Tls of TLS.t | `Tcp of Miou_unix.file_descr ]
type request = [ `V1 of Httpaf.Request.t | `V2 of H2.Request.t ]
type response = [ `V1 of Httpaf.Response.t | `V2 of H2.Response.t ]
type 'body body = {
body : 'body;
write_string : 'body -> ?off:int -> ?len:int -> string -> unit;
close : 'body -> unit;
release : unit -> unit;
}
type ('resp, 'body) version =
| V1 : (Httpaf.Response.t, [ `write ] Httpaf.Body.t body) version
| V2 : (H2.Response.t, H2.Body.Writer.t body) version
type error =
[ `V1 of Httpaf.Client_connection.error
| `V2 of H2.Client_connection.error
| `Protocol of string ]
let pp_error ppf = function
| `V1 (`Malformed_response msg) ->
Fmt.pf ppf "Malformed HTTP/1.1 response: %s" msg
| `V1 (`Invalid_response_body_length _resp) ->
Fmt.pf ppf "Invalid response body length"
| `V1 (`Exn exn) | `V2 (`Exn exn) ->
Fmt.pf ppf "Got an unexpected exception: %S" (Printexc.to_string exn)
| `V2 (`Malformed_response msg) -> Fmt.pf ppf "Malformed H2 response: %s" msg
| `V2 (`Invalid_response_body_length _resp) ->
Fmt.pf ppf "Invalid response body length"
| `V2 (`Protocol_error (err, msg)) ->
Fmt.pf ppf "Protocol error %a: %s" H2.Error_code.pp_hum err msg
| `Protocol msg -> Fmt.string ppf msg
type ('resp, 'acc) await = unit -> ('resp * 'acc, error) result
type 'acc process =
| Process :
('resp, 'body) version * ('resp, 'acc) await * 'body
-> 'acc process
module TLS' = struct
include TLS
let close flow =
match flow.TLS.state with
| `Active _ -> close flow
| _ -> Miou_unix.disown flow.TLS.flow
end
module A =
Make
(struct
include TLS'
let shutdown flow _ = Miou_unix.disown flow.flow
end)
(Httpaf_Client_connection)
(* XXX(dinosaure): We need to make a serious note of this. The behavior of
http/1.1 with TLS seems rather "random" in the sense that some servers
deliver a close-notify to the client while others do nothing... Worse still,
@ -542,180 +92,3 @@ module A =
can be quite complex (long polling, h2, websocket...) and we're not immune to
forgetting or double-clicking. Miou fails in such situations, which forced me
to rethink the execution of an HTTP request without [Lwt.async] ;) ! *)
module B = Make (TCP) (Httpaf_Client_connection)
module C = Make (TLS') (H2.Client_connection)
module D = Make (TCP) (H2.Client_connection)
(* NOTE(dinosaure): we avoid first-class module here. *)
let run ~f acc config flow request =
let response : response option ref = ref None
and error = ref None
and acc = ref acc in
let error_handler err =
Log.err (fun m -> m "Got an error: %a" pp_error err);
match err with
| `V1 (`Exn (Flow msg)) | `V2 (`Exn (Flow msg)) ->
error := Some (`Protocol msg)
| err -> error := Some err
in
let response_handler ?(shutdown = Fun.const ()) = function
| `V1 (resp, body) ->
let rec on_eof = shutdown
and on_read bstr ~off ~len =
let str = Bigstringaf.substring bstr ~off ~len in
acc := f (`V1 resp) !acc str;
Httpaf.Body.schedule_read body ~on_read ~on_eof
in
response := Some (`V1 resp);
Httpaf.Body.schedule_read body ~on_read ~on_eof
| `V2 (resp, body) ->
let rec on_eof = shutdown
and on_read bstr ~off ~len =
let str = Bigstringaf.substring bstr ~off ~len in
acc := f (`V2 resp) !acc str;
H2.Body.Reader.schedule_read body ~on_read ~on_eof
in
response := Some (`V2 resp);
H2.Body.Reader.schedule_read body ~on_read ~on_eof
in
let give =
match flow with
| `Tls flow -> [ Miou_unix.owner flow.TLS.flow ]
| `Tcp flow -> [ Miou_unix.owner flow ]
in
match (flow, config, request) with
| `Tls flow, `V1 config, `V1 request ->
let read_buffer_size = config.Httpaf.Config.read_buffer_size in
let disown flow = Miou_unix.disown flow.TLS.flow in
let response_handler resp body = response_handler (`V1 (resp, body)) in
let error_handler error = error_handler (`V1 error) in
let body, conn =
Httpaf.Client_connection.request ~config request ~error_handler
~response_handler
in
let orphans = Miou.orphans () in
let { A.protect }, prm, close =
Log.debug (fun m -> m "start an http/1.1 request over TLS");
A.run conn ~give ~disown ~read_buffer_size flow
in
let await () =
match (Miou.await prm, !error, !response) with
| _, Some error, _ -> Error error
| Error exn, _, _ -> Error (`V1 (`Exn exn))
| Ok (), None, Some (`V1 response) -> Ok (response, !acc)
| Ok (), None, (Some (`V2 _) | None) -> assert false
in
let release () =
terminate orphans;
close ()
in
let write_string body ?off ?len str =
protect ~orphans (Httpaf.Body.write_string body ?off ?len) str
in
let close body = protect ~orphans Httpaf.Body.close_writer body in
let body = { body; write_string; close; release } in
Process (V1, await, body)
| `Tcp flow, `V1 config, `V1 request ->
let read_buffer_size = config.Httpaf.Config.read_buffer_size in
let disown = Miou_unix.disown in
let response_handler resp body = response_handler (`V1 (resp, body)) in
let error_handler error = error_handler (`V1 error) in
let body, conn =
Httpaf.Client_connection.request ~config request ~error_handler
~response_handler
in
let orphans = Miou.orphans () in
let { B.protect }, prm, close =
B.run conn ~give ~disown ~read_buffer_size flow
in
let await () =
match (Miou.await prm, !error, !response) with
| _, Some error, _ -> Error error
| Error exn, _, _ -> Error (`V1 (`Exn exn))
| Ok (), None, Some (`V1 response) -> Ok (response, !acc)
| Ok (), None, (Some (`V2 _) | None) -> assert false
in
let release () =
terminate orphans;
close ()
in
let write_string body ?off ?len str =
protect ~orphans (Httpaf.Body.write_string body ?off ?len) str
in
let close body = protect ~orphans Httpaf.Body.close_writer body in
let body = { body; write_string; close; release } in
Process (V1, await, body)
| `Tls flow, `V2 config, `V2 request ->
let read_buffer_size = config.H2.Config.read_buffer_size in
let disown flow = Miou_unix.disown flow.TLS.flow in
let error_handler error = error_handler (`V2 error) in
let conn = H2.Client_connection.create ~config ~error_handler () in
let shutdown () = H2.Client_connection.shutdown conn in
let response_handler resp body =
response_handler ~shutdown (`V2 (resp, body))
in
let body =
H2.Client_connection.request conn request ~error_handler
~response_handler
in
let orphans = Miou.orphans () in
let { C.protect }, prm, close =
Log.debug (fun m -> m "start an h2 request over TLS");
C.run conn ~give ~disown ~read_buffer_size flow
in
let await () =
match (Miou.await prm, !error, !response) with
| _, Some error, _ -> Error error
| Error exn, _, _ -> Error (`V2 (`Exn exn))
| Ok (), None, Some (`V2 response) -> Ok (response, !acc)
| Ok (), None, (Some (`V1 _) | None) -> assert false
in
let release () =
terminate orphans;
close ()
in
let write_string body ?off ?len str =
protect ~orphans (H2.Body.Writer.write_string body ?off ?len) str
in
let close body =
Log.debug (fun m -> m "close the stream from the application level");
protect ~orphans H2.Body.Writer.close body
in
let body = { body; write_string; close; release } in
Process (V2, await, body)
| `Tcp flow, `V2 config, `V2 request ->
let read_buffer_size = config.H2.Config.read_buffer_size in
let disown = Miou_unix.disown in
let error_handler error = error_handler (`V2 error) in
let conn = H2.Client_connection.create ~config ~error_handler () in
let shutdown () = H2.Client_connection.shutdown conn in
let response_handler resp body =
response_handler ~shutdown (`V2 (resp, body))
in
let body =
H2.Client_connection.request conn request ~error_handler
~response_handler
in
let orphans = Miou.orphans () in
let { D.protect }, prm, close =
D.run conn ~give ~disown ~read_buffer_size flow
in
let await () =
match (Miou.await prm, !error, !response) with
| _, Some error, _ -> Error error
| Error exn, _, _ -> Error (`V2 (`Exn exn))
| Ok (), None, Some (`V2 response) -> Ok (response, !acc)
| Ok (), None, (Some (`V1 _) | None) -> assert false
in
let release () =
terminate orphans;
close ()
in
let write_string body ?off ?len str =
protect ~orphans (H2.Body.Writer.write_string body ?off ?len) str
in
let close body = protect ~orphans H2.Body.Writer.close body in
let body = { body; write_string; close; release } in
Process (V2, await, body)
| _ -> Fmt.invalid_arg "Http_miou_unix.run: incompatible arguments"

View File

@ -1,52 +1,10 @@
type tls
type tls_error
val pp_tls_error : tls_error Fmt.t
module TCP : Flow.S with type t = Miou_unix.file_descr
module TLS : module type of Tls_miou.Make (TCP)
val to_tls :
Tls.Config.client ->
?host:[ `host ] Domain_name.t ->
Miou_unix.file_descr ->
(tls, tls_error) result
Tls.Config.client
-> ?host:[ `host ] Domain_name.t
-> Miou_unix.file_descr
-> (TLS.t, TLS.error) result
val epoch : tls -> Tls.Core.epoch_data option
type config = [ `V1 of Httpaf.Config.t | `V2 of H2.Config.t ]
type flow = [ `Tls of tls | `Tcp of Miou_unix.file_descr ]
type request = [ `V1 of Httpaf.Request.t | `V2 of H2.Request.t ]
type response = [ `V1 of Httpaf.Response.t | `V2 of H2.Response.t ]
type error =
[ `V1 of Httpaf.Client_connection.error
| `V2 of H2.Client_connection.error
| `Protocol of string ]
val pp_error : error Fmt.t
type 'body body = {
body : 'body;
write_string : 'body -> ?off:int -> ?len:int -> string -> unit;
close : 'body -> unit;
release : unit -> unit;
}
type ('resp, 'body) version =
| V1 : (Httpaf.Response.t, [ `write ] Httpaf.Body.t body) version
| V2 : (H2.Response.t, H2.Body.Writer.t body) version
type ('resp, 'acc) await = unit -> ('resp * 'acc, error) result
type 'acc process =
| Process :
('resp, 'body) version * ('resp, 'acc) await * 'body
-> 'acc process
val terminate : unit Miou.orphans -> unit
val run :
f:(response -> 'acc -> string -> 'acc) ->
'acc ->
config ->
flow ->
request ->
'acc process
val epoch : TLS.t -> Tls.Core.epoch_data option

View File

@ -8,6 +8,7 @@ let decode_host_port str =
match String.split_on_char ':' str with
| [] -> Error (`Msg "Empty host part")
| [ host ] -> Ok (host, None)
| [ host; "" ] -> Ok (host, None)
| hd :: tl -> (
let port, host =
match List.rev (hd :: tl) with
@ -17,8 +18,14 @@ let decode_host_port str =
try Ok (host, Some (int_of_string port))
with _ -> Error (`Msg "Couln't decode port"))
let decode_user_pass up =
match String.split_on_char ':' up with
| [ user; pass ] -> Ok (user, Some pass)
| [ user ] -> Ok (user, None)
| _ -> assert false
type uri =
bool * string * (string * string) option * string * int option * string
bool * string * (string * string option) option * string * int option * string
let decode_uri uri =
(* proto :// user : pass @ host : port / path *)
@ -29,11 +36,6 @@ let decode_uri uri =
else if String.equal proto "https:" then Ok ("https", true)
else Error (`Msg "Unknown protocol"))
>>= fun (scheme, is_tls) ->
let decode_user_pass up =
match String.split_on_char ':' up with
| [ user; pass ] -> Ok (user, pass)
| _ -> Error (`Msg "Couldn't decode user and password")
in
(match String.split_on_char '@' user_pass_host_port with
| [ host_port ] -> Ok (None, host_port)
| [ user_pass; host_port ] ->
@ -42,14 +44,37 @@ let decode_uri uri =
>>= fun (user_pass, host_port) ->
decode_host_port host_port >>= fun (host, port) ->
Ok (is_tls, scheme, user_pass, host, port, "/" ^ String.concat "/" path)
| [ user_pass_host_port ] ->
(match String.split_on_char '@' user_pass_host_port with
| [ host_port ] -> Ok (None, host_port)
| [ user_pass; host_port ] ->
decode_user_pass user_pass >>= fun up -> Ok (Some up, host_port)
| _ -> Error (`Msg "Couldn't decode URI"))
>>= fun (user_pass, host_port) ->
decode_host_port host_port >>= fun (host, port) ->
Ok (false, "", user_pass, host, port, "/")
| user_pass_host_port :: path ->
(match String.split_on_char '@' user_pass_host_port with
| [ host_port ] -> Ok (None, host_port)
| [ user_pass; host_port ] ->
decode_user_pass user_pass >>= fun up -> Ok (Some up, host_port)
| _ -> Error (`Msg "Couldn't decode URI"))
>>= fun (user_pass, host_port) ->
decode_host_port host_port >>= fun (host, port) ->
Ok (false, "", user_pass, host, port, "/" ^ String.concat "/" path)
| _ -> Error (`Msg "Could't decode URI on top")
let add_authentication ~add headers = function
| None -> headers
| Some (user, pass) ->
let add_authentication ?(meth = `Basic) ~add headers user_pass =
match (user_pass, meth) with
| None, _ -> headers
| Some (user, Some pass), `Basic ->
let data = Base64.encode_string (user ^ ":" ^ pass) in
let s = "Basic " ^ data in
add headers "authorization" s
let str = "Basic " ^ data in
add headers "authorization" str
| Some (user, None), `Basic ->
let data = Base64.encode_string user in
let str = "Basic " ^ data in
add headers "authorization" str
let user_agent = "http-client/%%VERSION_NUM%%"
@ -108,33 +133,43 @@ module Version = Httpaf.Version
module Status = H2.Status
module Headers = H2.Headers
type response = {
version : Version.t;
status : Status.t;
reason : string;
headers : Headers.t;
}
type response =
{ version : Version.t
; status : Status.t
; reason : string
; headers : Headers.t
}
type error =
[ `V1 of Httpaf.Client_connection.error
| `V2 of H2.Client_connection.error
| `Protocol of string
| `Msg of string
| `Tls of Http_miou_unix.tls_error ]
| `Tls of Http_miou_unix.TLS.error ]
let pp_error ppf = function
| #Http_miou_unix.error as err -> Http_miou_unix.pp_error ppf err
| `Protocol msg -> Fmt.string ppf msg
| `Msg msg -> Fmt.string ppf msg
| `Tls err -> Http_miou_unix.pp_tls_error ppf err
| `Tls err -> Http_miou_unix.TLS.pp_error ppf err
| `V1 (`Malformed_response msg) ->
Fmt.pf ppf "Malformed HTTP/1.1 response: %s" msg
| `V1 (`Invalid_response_body_length _resp) ->
Fmt.pf ppf "Invalid response body length"
| `V1 (`Exn exn) | `V2 (`Exn exn) ->
Fmt.pf ppf "Got an unexpected exception: %S" (Printexc.to_string exn)
| `V2 (`Malformed_response msg) -> Fmt.pf ppf "Malformed H2 response: %s" msg
| `V2 (`Invalid_response_body_length _resp) ->
Fmt.pf ppf "Invalid response body length"
| `V2 (`Protocol_error (err, msg)) ->
Fmt.pf ppf "Protocol error %a: %s" H2.Error_code.pp_hum err msg
let from_httpaf response =
{
version = response.Httpaf.Response.version;
status = (response.Httpaf.Response.status :> H2.Status.t);
reason = response.Httpaf.Response.reason;
headers =
{ version = response.Httpaf.Response.version
; status = (response.Httpaf.Response.status :> H2.Status.t)
; reason = response.Httpaf.Response.reason
; headers =
H2.Headers.of_list
(Httpaf.Headers.to_list response.Httpaf.Response.headers);
(Httpaf.Headers.to_list response.Httpaf.Response.headers)
}
let single_http_1_1_request ?(config = Httpaf.Config.default) flow user_pass
@ -143,10 +178,10 @@ let single_http_1_1_request ?(config = Httpaf.Config.default) flow user_pass
let headers = prep_http_1_1_headers headers host user_pass body_length in
let request = Httpaf.Request.create ~headers meth path in
let f response acc str =
let[@warning "-8"] (`V1 response : Http_miou_unix.response) = response in
let[@warning "-8"] (`V1 response : Http_miou_client.response) = response in
f (from_httpaf response) acc str
in
match Http_miou_unix.run ~f acc (`V1 config) flow (`V1 request) with
match Http_miou_client.run ~f acc (`V1 config) flow (`V1 request) with
| Process (V2, _, _) -> assert false
| Process (V1, await, { body = stream; write_string; close; release }) -> (
Option.iter (write_string stream) body;
@ -155,14 +190,13 @@ let single_http_1_1_request ?(config = Httpaf.Config.default) flow user_pass
release ();
match result with
| Ok (response, acc) -> Ok (from_httpaf response, acc)
| Error (#Http_miou_unix.error as err) -> Error (err :> error))
| Error (#Http_miou_client.error as err) -> Error (err :> error))
let from_h2 response =
{
version = { major = 2; minor = 0 };
status = response.H2.Response.status;
reason = "";
headers = response.H2.Response.headers;
{ version = { major = 2; minor = 0 }
; status = response.H2.Response.status
; reason = ""
; headers = response.H2.Response.headers
}
let single_h2_request ?(config = H2.Config.default) flow scheme user_pass host
@ -171,10 +205,10 @@ let single_h2_request ?(config = H2.Config.default) flow scheme user_pass host
let headers = prep_h2_headers headers host user_pass body_length in
let request = H2.Request.create ~scheme ~headers meth path in
let f response acc str =
let[@warning "-8"] (`V2 response : Http_miou_unix.response) = response in
let[@warning "-8"] (`V2 response : Http_miou_client.response) = response in
f (from_h2 response) acc str
in
match Http_miou_unix.run ~f acc (`V2 config) flow (`V2 request) with
match Http_miou_client.run ~f acc (`V2 config) flow (`V2 request) with
| Process (V1, _, _) -> assert false
| Process (V2, await, { body = stream; write_string; close; release }) -> (
Option.iter (write_string stream) body;
@ -183,7 +217,7 @@ let single_h2_request ?(config = H2.Config.default) flow scheme user_pass host
release ();
match result with
| Ok (response, acc) -> Ok (from_h2 response, acc)
| Error (#Http_miou_unix.error as err) -> Error (err :> error))
| Error (#Http_miou_client.error as err) -> Error (err :> error))
let alpn_protocol = function
| `Tcp _ -> None
@ -319,3 +353,5 @@ let request ?config ?tls_config ?authenticator ?(meth = `GET) ?(headers = [])
else Ok (resp, body)
in
follow_redirect max_redirect uri
module Server = Http_miou_server

View File

@ -1,35 +1,59 @@
(** HTTP client with Miou.
A HTTP client using the Miou scheduler. It does a single HTTP request
(though may follow redirects) to a remote uri. Both HTTP protocol 1.1 and
2.0 are supported. Both http and https (via the pure implementation
[ocaml-tls]) are supported. A connection is established via the
happy-eyeballs algorithm.
*)
type error
val pp_error : error Fmt.t
module Version = Httpaf.Version
module Status = H2.Status
module Headers = H2.Headers
(** Protocol Version
type response = {
version : Version.t;
status : Status.t;
reason : string;
headers : Headers.t;
}
Consists of [major.minor], in H2 this is [2.0]. *)
module Status = H2.Status
(** Response Status codes
A three-digit integer, the result of the request. *)
module Headers = H2.Headers
(** Header fields
Case-insensitive key-value pairs. *)
type response =
{ version : Version.t
; status : Status.t
; reason : string
; headers : Headers.t
}
(** A response, consisting of version, status, reason (HTTP 1.1 only), and
headers. *)
val request :
?config:[ `HTTP_1_1 of Httpaf.Config.t | `H2 of H2.Config.t ] ->
?tls_config:Tls.Config.client ->
?authenticator:X509.Authenticator.t ->
?meth:Httpaf.Method.t ->
?headers:(string * string) list ->
?body:string ->
?max_redirect:int ->
?follow_redirect:bool ->
f:(response -> 'a -> string -> 'a) ->
uri:string ->
'a ->
(response * 'a, error) result
?config:[ `HTTP_1_1 of Httpaf.Config.t | `H2 of H2.Config.t ]
-> ?tls_config:Tls.Config.client
-> ?authenticator:X509.Authenticator.t
-> ?meth:Httpaf.Method.t
-> ?headers:(string * string) list
-> ?body:string
-> ?max_redirect:int
-> ?follow_redirect:bool
-> f:(response -> 'a -> string -> 'a)
-> uri:string
-> 'a
-> (response * 'a, error) result
module Server = Http_miou_server
(**/**)
type uri =
bool * string * (string * string) option * string * int option * string
bool * string * (string * string option) option * string * int option * string
val decode_uri : string -> (uri, [> `Msg of string ]) result

379
src/runtime.ml Normal file
View File

@ -0,0 +1,379 @@
(*----------------------------------------------------------------------------
Copyright (c) 2018 Inhabited Type LLC.
Copyright (c) 2018 Anton Bachin
Copyright (c) 2023 Robur
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
1. Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
3. Neither the name of the author nor the names of his contributors
may be used to endorse or promote products derived from this software
without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE CONTRIBUTORS ``AS IS'' AND ANY EXPRESS
OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
----------------------------------------------------------------------------*)
module type RUNTIME = sig
type t
val next_read_operation : t -> [ `Read | `Yield | `Close ]
val read : t -> Bigstringaf.t -> off:int -> len:int -> int
val read_eof : t -> Bigstringaf.t -> off:int -> len:int -> int
val yield_reader : t -> (unit -> unit) -> unit
val next_write_operation :
t -> [ `Write of Bigstringaf.t Faraday.iovec list | `Close of int | `Yield ]
val report_write_result : t -> [ `Ok of int | `Closed ] -> unit
val yield_writer : t -> (unit -> unit) -> unit
val report_exn : t -> exn -> unit
end
let src = Logs.Src.create "runtime"
module Log = (val Logs.src_log src : Logs.LOG)
module Buffer : sig
type t
val create : int -> t
val get : t -> f:(Bigstringaf.t -> off:int -> len:int -> int) -> int
val put : t -> f:(Bigstringaf.t -> off:int -> len:int -> int) -> int
end = struct
type t =
{ mutable buffer : Bigstringaf.t; mutable off : int; mutable len : int }
let create size =
let buffer = Bigstringaf.create size in
{ buffer; off = 0; len = 0 }
let compress t =
if t.len = 0 then begin
t.off <- 0;
t.len <- 0
end
else if t.off > 0 then begin
Bigstringaf.blit t.buffer ~src_off:t.off t.buffer ~dst_off:0 ~len:t.len;
t.off <- 0
end
let get t ~f =
let n = f t.buffer ~off:t.off ~len:t.len in
t.off <- t.off + n;
t.len <- t.len - n;
if t.len = 0 then t.off <- 0;
n
let put t ~f =
compress t;
let off = t.off + t.len in
let buf = t.buffer in
if Bigstringaf.length buf = t.len then begin
t.buffer <- Bigstringaf.create (2 * Bigstringaf.length buf);
Bigstringaf.blit buf ~src_off:t.off t.buffer ~dst_off:0 ~len:t.len
end;
let n = f t.buffer ~off ~len:(Bigstringaf.length t.buffer - off) in
t.len <- t.len + n;
n
end
type protect =
{ protect : 'a 'b. orphans:unit Miou.orphans -> ('a -> 'b) -> 'a -> 'b }
[@@unboxed]
let catch ~on fn =
try fn ()
with exn ->
Log.err (fun m ->
m "Got an unexpected exception: %S" (Printexc.to_string exn));
on exn
exception Flow of string
module type S = sig
type conn
type flow
val run :
conn
-> ?give:Miou.Ownership.t list
-> ?disown:(flow -> unit)
-> read_buffer_size:int
-> flow
-> protect * unit Miou.t * (unit -> unit)
end
let rec terminate orphans =
match Miou.care orphans with
| None -> Miou.yield ()
| Some None ->
Miou.yield ();
terminate orphans
| Some (Some prm) ->
Miou.await_exn prm;
terminate orphans
module Make (Flow : Flow.S) (Runtime : RUNTIME) :
S with type conn = Runtime.t and type flow = Flow.t = struct
type conn = Runtime.t
type flow = Flow.t
let recv flow buffer =
let bytes_read =
Buffer.put buffer ~f:(fun bstr ~off:dst_off ~len ->
let buf = Bytes.create len in
match Flow.read flow buf ~off:0 ~len with
| Ok 0 -> 0
| Ok len ->
Bigstringaf.blit_from_bytes buf ~src_off:0 bstr ~dst_off ~len;
len
| Error err ->
Log.err (fun m ->
m "close the socket (recv) due to: %a" Flow.pp_error err);
Flow.close flow;
raise (Flow (Fmt.str "%a" Flow.pp_error err)))
in
if bytes_read = 0 then `Eof else `Ok bytes_read
let writev flow bstrs =
let copy { Faraday.buffer; off; len } = Bigstringaf.copy buffer ~off ~len in
let css = List.map copy bstrs |> List.map Cstruct.of_bigarray in
match Flow.writev flow css with
| Ok () ->
let len = List.fold_left (fun a { Cstruct.len; _ } -> a + len) 0 css in
`Ok len
| Error err ->
Log.err (fun m ->
m "close the socket (writev) due to: %a" Flow.pp_error err);
Flow.close flow;
`Closed
type prm = Miou.Promise.Uid.t * Miou.Domain.Uid.t * int
type _ Effect.t += Spawn : (prm:prm -> unit -> unit) -> unit Effect.t
let pp_prm ppf (uid, runner, resources) =
Fmt.pf ppf "[%a:%a](%d)" Miou.Domain.Uid.pp runner Miou.Promise.Uid.pp uid
resources
let launch ?give ~orphans fn k =
let prm = Miou.self () in
let prm' =
Miou.call_cc ~orphans ?give @@ fun () ->
let prm = Miou.self () in
Log.debug (fun m -> m "%a launched" pp_prm prm);
fn ~prm ()
in
Log.debug (fun m ->
m "%a (child) is attached to %a (parent)" Miou.Promise.pp prm' pp_prm
prm);
Effect.Deep.continue k ()
(* Protected runtime operations.
A note on design and the need to "protect" the appearance of a new task.
Miou constrains us on 2 points:
1) we are obliged to observe the result of a task ([Miou.await]), otherwise
Miou fails.
2) only the task creator can observe the result. For example, this code
doesn't work:
{[
let prm = Miou.call @@ fun () -> Miou.call (Fun.const ()) in
Miou.await_exn (Miou.await_exn prm)
(* the second promise (in [prm]) can only be awaited in [prm]. *)
]}
The first rule requires us to use [Miou.orphans], which allows tasks to be
stored in the background. We can then catch up with them, which is what the
[terminate] function does. The second rule is more complicated...
The runtime ([httpaf] or [h2]) may be able to stop a task but force the
user to give a "continuation" allowing the same task to be restarted at the
right time (when the user wants to do a [Body.write_string], for example).
This is the case when [`Yield] is received from the runtime. This means
that there are continuations (in the runtime) which can cause tasks to
appear and which should be saved in a [Miou.orphans].
The problem is that it's not the task that gives the continuation that will
launch the subtask... So the best protection is to execute runtime
functions (which may launch subtasks) with a specific [Miou.orphan].
Continuation, on the other hand, consists in raising an effect that will be
protected with a specific [Miou.orphans]. In other words, the subtask is
saved not in the task [Miou.orphans] that creates the continuation, but in
the [Miou.orphans]'s task that creates **effectfully** the subtask.
So, after the ceremony to start an HTTP request, we need to protect the
functions that interact with the Body as well, because they can create
tasks too...
{[
let orphans, body, prm = run_http_request flow in
Body.write_string body "Hello World";
(* a new task appears to write ["Hello World!"] which will be saved into
orphans. *)
Body.close body;
(* a new task appears to finish the HTTP request. *)
let result = await prm in
terminate orphans
]}
*)
let protect ?give ~orphans fn v =
let retc = Fun.id in
let exnc = raise in
let open Effect.Deep in
let effc : type c. c Effect.t -> ((c, 'b) continuation -> 'b) option =
function
| Spawn fn -> Some (launch ?give ~orphans fn)
| _ -> None
in
match_with fn v { retc; exnc; effc }
let next_read_operation ?give ~orphans =
protect ?give ~orphans Runtime.next_read_operation
let next_write_operation ?give ~orphans =
protect ?give ~orphans Runtime.next_write_operation
let read ?give ~orphans conn bstr ~off ~len =
protect ?give ~orphans (Runtime.read conn ~off ~len) bstr
let read_eof ?give ~orphans conn bstr ~off ~len =
protect ?give ~orphans (Runtime.read_eof conn ~off ~len) bstr
let report_exn ?give ~orphans ?(close = Fun.const ()) conn exn =
Log.err (fun m -> m "report an exception: %S" (Printexc.to_string exn));
protect ?give ~orphans (Runtime.report_exn conn) exn;
terminate orphans;
close ()
let report_write_result ?give ~orphans conn =
protect ?give ~orphans (Runtime.report_write_result conn)
let yield_reader ?give ~orphans conn =
protect ?give ~orphans (Runtime.yield_reader conn)
let yield_writer ?give ~orphans conn =
protect ?give ~orphans (Runtime.yield_writer conn)
let run conn ?(give = []) ?(disown = Fun.const ()) ~read_buffer_size flow =
let buffer = Buffer.create read_buffer_size in
let closed = ref false in
let close () =
if not !closed then (
Flow.close flow;
closed := true)
else disown flow
in
let rec reader ~prm () =
Log.debug (fun m -> m "%a starts the reading loop" pp_prm prm);
let rec go orphans () =
match next_read_operation ~orphans ~give conn with
| `Read -> (
Log.debug (fun m -> m "%a next read operation: `read" pp_prm prm);
let read_eof = read_eof ~orphans ~give in
let read = read ~orphans ~give in
match recv flow buffer with
| `Eof ->
Buffer.get buffer ~f:(fun bstr ~off ~len ->
read_eof conn bstr ~off ~len)
|> ignore;
go orphans ()
| `Ok _ ->
Buffer.get buffer ~f:(fun bstr ~off ~len ->
read conn bstr ~off ~len)
|> ignore;
go orphans ())
| `Yield ->
Log.debug (fun m -> m "%a next read operation: `yield" pp_prm prm);
let continuation () =
let prm = Miou.self () in
Log.debug (fun m -> m "%a launches a new task" pp_prm prm);
Effect.perform (Spawn reader)
in
yield_reader conn ~orphans ~give continuation;
disown flow;
terminate orphans
| `Close ->
Log.debug (fun m ->
m "%a read: disown the file-descriptor" pp_prm prm);
disown flow;
terminate orphans
in
let orphans = Miou.orphans () in
catch ~on:(report_exn conn ~orphans ~give ~close) @@ fun () ->
go orphans ()
in
let rec writer ~prm () =
Log.debug (fun m -> m "%a starts to the writing loop" pp_prm prm);
let rec go orphans () =
match next_write_operation ~orphans ~give conn with
| `Write iovecs ->
Log.debug (fun m -> m "%a next write operation: `write" pp_prm prm);
writev flow iovecs |> report_write_result conn ~orphans ~give;
go orphans ()
| `Yield ->
Log.debug (fun m -> m "%a next write operation: `yield" pp_prm prm);
let continuation () =
let prm = Miou.self () in
Log.debug (fun m -> m "%a launches a new task" pp_prm prm);
Effect.perform (Spawn writer)
in
yield_writer conn ~orphans ~give continuation;
disown flow;
terminate orphans
| `Close _ ->
Log.debug (fun m -> m "%a next write operation: `close" pp_prm prm);
Flow.shutdown flow `Send;
terminate orphans
in
let orphans = Miou.orphans () in
catch ~on:(report_exn conn ~orphans ~give ~close) @@ fun () ->
go orphans ()
in
let protect ~orphans = protect ~orphans ~give in
let prm =
Miou.call_cc ~give @@ fun () ->
let p0 = Miou.call_cc ~give @@ fun () -> reader ~prm:(Miou.self ()) () in
let p1 = Miou.call_cc ~give @@ fun () -> writer ~prm:(Miou.self ()) () in
let result =
match Miou.await_all [ p0; p1 ] with
| [ Ok (); Ok () ] -> Ok ()
| [ Error exn; _ ] | [ _; Error exn ] -> Error exn
| _ -> assert false
in
Log.debug (fun m -> m "close the file-descriptor");
(* TODO(dinosaure): we should probably check the state of the underlying
flow and see if it's closed or not. I suspect that on [http/1.1], it's
not the case but Miou does not complain because we [disown]
everywhere. *)
match result with
| Ok () -> disown flow
| Error exn ->
close ();
raise exn
in
Log.debug (fun m -> m "the main task is: %a" Miou.Promise.pp prm);
({ protect }, prm, close)
end

39
src/runtime.mli Normal file
View File

@ -0,0 +1,39 @@
module type RUNTIME = sig
type t
val next_read_operation : t -> [ `Read | `Yield | `Close ]
val read : t -> Bigstringaf.t -> off:int -> len:int -> int
val read_eof : t -> Bigstringaf.t -> off:int -> len:int -> int
val yield_reader : t -> (unit -> unit) -> unit
val next_write_operation :
t -> [ `Write of Bigstringaf.t Faraday.iovec list | `Close of int | `Yield ]
val report_write_result : t -> [ `Ok of int | `Closed ] -> unit
val yield_writer : t -> (unit -> unit) -> unit
val report_exn : t -> exn -> unit
end
type protect =
{ protect : 'a 'b. orphans:unit Miou.orphans -> ('a -> 'b) -> 'a -> 'b }
[@@unboxed]
exception Flow of string
module type S = sig
type conn
type flow
val run :
conn
-> ?give:Miou.Ownership.t list
-> ?disown:(flow -> unit)
-> read_buffer_size:int
-> flow
-> protect * unit Miou.t * (unit -> unit)
end
module Make (Flow : Flow.S) (Runtime : RUNTIME) :
S with type conn = Runtime.t and type flow = Flow.t
val terminate : unit Miou.orphans -> unit

View File

@ -19,14 +19,14 @@ module Make (Flow : Flow.S) = struct
type state = [ `Active of Tls.Engine.state | `End_of_input | `Error of error ]
type t = {
role : [ `Server | `Client ];
flow : Flow.t;
mutable state : state;
recv : bytes;
mutable linger : Cstruct.t option;
mutable writer_closed : bool;
}
type t =
{ role : [ `Server | `Client ]
; flow : Flow.t
; mutable state : state
; recv : bytes
; mutable linger : Cstruct.t option
; mutable writer_closed : bool
}
let write flow buf =
match Flow.writev flow.flow [ buf ] with
@ -132,13 +132,12 @@ module Make (Flow : Flow.S) = struct
in
let tls, init = Tls.Engine.client conf' in
let tls_flow =
{
role = `Client;
flow;
state = `Active tls;
linger = None;
recv = Bytes.create 0x1000;
writer_closed = false;
{ role = `Client
; flow
; state = `Active tls
; linger = None
; recv = Bytes.create 0x1000
; writer_closed = false
}
in
match write tls_flow init with

9
test/dune Normal file
View File

@ -0,0 +1,9 @@
(test
(name test_decode_uri)
(modules test_decode_uri)
(libraries httpcats alcotest))
(test
(name test_clear)
(modules test_clear)
(libraries logs.fmt fmt.tty logs.threaded httpcats alcotest))

74
test/test_clear.ml Normal file
View File

@ -0,0 +1,74 @@
let anchor = Unix.gettimeofday ()
let reporter ppf =
let report src level ~over k msgf =
let k _ =
over ();
k ()
in
let with_metadata header _tags k ppf fmt =
Format.kfprintf k ppf
("[%a]%a[%a][%a]: " ^^ fmt ^^ "\n%!")
Fmt.(styled `Cyan (fmt "%.04f"))
(Unix.gettimeofday () -. anchor)
Logs_fmt.pp_header (level, header)
Fmt.(styled `Blue int)
(Stdlib.Domain.self () :> int)
Fmt.(styled `Magenta string)
(Logs.Src.name src)
in
msgf @@ fun ?header ?tags fmt -> with_metadata header tags k ppf fmt
in
{ Logs.report }
let () = Fmt_tty.setup_std_outputs ~style_renderer:`Ansi_tty ~utf_8:true ()
let () = Logs.set_reporter (reporter Fmt.stderr)
let () = Logs.set_level ~all:true (Some Logs.Debug)
let () = Logs_threaded.enable ()
let () = Printexc.record_backtrace true
let server ?(port = 8080) handler =
let stop = Atomic.make false in
let prm =
Miou.call @@ fun () ->
let file_descr = Miou_unix.tcpv4 () in
Miou_unix.bind_and_listen file_descr
(Unix.ADDR_INET (Unix.inet_addr_loopback, port));
Httpcats.Server.clear ~stop ~handler file_descr;
Miou_unix.close file_descr
in
(stop, prm)
let test00 =
Alcotest.test_case "simple" `Quick @@ fun () ->
Miou_unix.run @@ fun () ->
let handler _request =
let open Httpaf in
let body = "Hello World!" in
let headers =
Headers.of_list
[ ("content-type", "text/plain")
; ("content-length", string_of_int (String.length body)) ]
in
let response = Response.create ~headers `OK in
Httpcats.Server.string (`V1 response) body
in
let stop, prm = server ~port:4000 handler in
match
Httpcats.request
~f:(fun _resp buf str ->
Buffer.add_string buf str;
buf)
~uri:"http://localhost:4000/" (Buffer.create 0x10)
with
| Ok (_response, buf) ->
Alcotest.(check string)
"Hello World!" (Buffer.contents buf) "Hello World!";
Atomic.set stop true;
Miou.await_exn prm
| Error err ->
Atomic.set stop true;
Miou.await_exn prm;
Alcotest.failf "Got an error: %a" Httpcats.pp_error err
let () = Alcotest.run "client / server" [ ("simple", [ test00 ]) ]

28
test/test_decode_uri.ml Normal file
View File

@ -0,0 +1,28 @@
let pp_uri ppf (is_tls, scheme, user_pass, host, port, path) =
let pp_user_pass ppf (user, pass) =
Fmt.pf ppf "(@[<1>%S,@ @[%a@]@])" user Fmt.(Dump.option string) pass
in
Fmt.pf ppf "(@[<1>%b,@ %S,@ @[<hov>%a@],@ %S,@ @[<hov>%a@],@ %S@])" is_tls
scheme
Fmt.(Dump.option pp_user_pass)
user_pass host
Fmt.(Dump.option int)
port path
let uri = Alcotest.testable pp_uri ( = )
let msg = Alcotest.testable (fun ppf (`Msg msg) -> Fmt.string ppf msg) ( = )
let value = Alcotest.result uri msg
let test00 =
Alcotest.test_case "simple" `Quick @@ fun () ->
let uri = (false, "", None, "example.org", None, "/") in
Alcotest.(check value)
"example.org"
(Httpcats.decode_uri "example.org")
(Ok uri);
let uri = (false, "", None, "", Some 3000, "/") in
Alcotest.(check value) ":3000" (Httpcats.decode_uri ":3000") (Ok uri);
let uri = (false, "", None, "", None, "/foo") in
Alcotest.(check value) ":/foo" (Httpcats.decode_uri ":/foo") (Ok uri)
let () = Alcotest.run "decode_uri" [ ("simple", [ test00 ]) ]