Continue to improve httpcats and its server impl.
This commit is contained in:
parent
bc30f06dd3
commit
e2ba31f00d
@ -30,7 +30,8 @@ build: [
|
||||
|
||||
synopsis: "A simple HTTP client using http/af, h2, and miou"
|
||||
pin-depends: [
|
||||
[ "miou.dev" "git+https://github.com/robur-coop/miou.git#dd670fa016da94a7c21a85a493e269fb39efa5ba" ]
|
||||
[ "mirage-crypto.0.11.2" "git+https://github.com/dinosaure/mirage-crypto.git#d0bbca9f30f85fe8a32a49ef06ea56398b84aefd" ]
|
||||
[ "mirage-crypto-rng.0.11.2" "git+https://github.com/dinosaure/mirage-crypto.git#d0bbca9f30f85fe8a32a49ef06ea56398b84aefd" ]
|
||||
[ "miou.dev" "git+https://github.com/robur-coop/miou.git#0a627240e7c4ddae00bb3b3cb8bacdd855a273d3" ]
|
||||
[ "mirage-crypto.0.11.2" "git+https://github.com/dinosaure/mirage-crypto.git#13bd9191f42cfcad84a808a79d97788f65af90e9" ]
|
||||
[ "mirage-crypto-rng.0.11.2" "git+https://github.com/dinosaure/mirage-crypto.git#13bd9191f42cfcad84a808a79d97788f65af90e9" ]
|
||||
[ "alcotest.1.7.0" "git+https://github.com/dinosaure/alcotest.git#f690bdeb7b1c3eb5c9016dc7097c400dd42e492b" ]
|
||||
]
|
||||
|
||||
@ -53,13 +53,16 @@ let response_to_httpaf response =
|
||||
in
|
||||
Httpaf.Response.create ~headers status
|
||||
|
||||
let _response_to_h2 response =
|
||||
let response_to_h2 response =
|
||||
H2.Response.create ~headers:response.headers response.status
|
||||
|
||||
let request_from_httpaf ~scheme { Httpaf.Request.meth; target; headers; _ } =
|
||||
let headers = Headers.of_list (Httpaf.Headers.to_list headers) in
|
||||
{ meth; target; scheme; headers }
|
||||
|
||||
let request_from_h2 { H2.Request.meth; target; scheme; headers } =
|
||||
{ meth; target; scheme; headers }
|
||||
|
||||
type stream =
|
||||
{ write_string : ?off:int -> ?len:int -> string -> unit
|
||||
; write_bigstring : ?off:int -> ?len:int -> Bigstringaf.t -> unit
|
||||
@ -146,6 +149,45 @@ let httpaf_handler ~sockaddr ~scheme ~protect:{ Runtime.protect } ~orphans
|
||||
in
|
||||
continue_with (fiber fn) (Reqd.request reqd) { retc; exnc; effc }
|
||||
|
||||
let h2_handler ~sockaddr ~protect:{ Runtime.protect } ~orphans ~handler reqd =
|
||||
let open H2 in
|
||||
let open Effect.Shallow in
|
||||
let retc = Fun.id in
|
||||
let exnc = protect ~orphans (Reqd.report_exn reqd) in
|
||||
let effc :
|
||||
type c. c Effect.t -> ((c, 'a) Effect.Shallow.continuation -> 'b) option =
|
||||
function
|
||||
| String (response, str) ->
|
||||
let response = response_to_h2 response in
|
||||
Log.debug (fun m -> m "write a h2 response and its body");
|
||||
protect ~orphans (Reqd.respond_with_string reqd response) str;
|
||||
let handler = basic_handler ~exnc in
|
||||
Some (fun k -> continue_with k () handler)
|
||||
| Stream response ->
|
||||
let response = response_to_h2 response in
|
||||
let body =
|
||||
protect ~orphans (Reqd.respond_with_streaming reqd) response
|
||||
in
|
||||
let write_string ?off ?len str =
|
||||
protect ~orphans (Body.Writer.write_string body ?off ?len) str
|
||||
in
|
||||
let write_bigstring ?off ?len bstr =
|
||||
protect ~orphans (Body.Writer.write_bigstring body ?off ?len) bstr
|
||||
in
|
||||
let close () = protect ~orphans Body.Writer.close body in
|
||||
let stream = { write_string; write_bigstring; close } in
|
||||
let handler = basic_handler ~exnc in
|
||||
Some (fun k -> continue_with k stream handler)
|
||||
| _ -> None
|
||||
in
|
||||
let fn request =
|
||||
let request = request_from_h2 request in
|
||||
handler request;
|
||||
Runtime.terminate orphans;
|
||||
Log.debug (fun m -> m "the handler for %a has ended" pp_sockaddr sockaddr)
|
||||
in
|
||||
continue_with (fiber fn) (Reqd.request reqd) { retc; exnc; effc }
|
||||
|
||||
let rec clean orphans =
|
||||
match Miou.care orphans with
|
||||
| Some (Some prm) ->
|
||||
@ -170,69 +212,189 @@ let accept_or_stop ?stop file_descr =
|
||||
| Ok value -> value
|
||||
| Error exn -> raise exn)
|
||||
|
||||
let http_1_1_server_connection ~config ~sockaddr ~user's_error_handler ~handler
|
||||
file_descr =
|
||||
let scheme = "http" in
|
||||
let read_buffer_size = config.Httpaf.Config.read_buffer_size in
|
||||
let give = [ Miou_unix.owner file_descr ] in
|
||||
let orphans = Miou.orphans () in
|
||||
let rec error_handler ?request err respond =
|
||||
let { Runtime.protect }, _, _ = Lazy.force process in
|
||||
let request = Option.map (request_from_httpaf ~scheme) request in
|
||||
let respond hdrs =
|
||||
let open Httpaf in
|
||||
let hdrs = Httpaf.Headers.of_list (H2.Headers.to_list hdrs) in
|
||||
let body = protect ~orphans respond hdrs in
|
||||
let write_string ?off ?len str =
|
||||
protect ~orphans (Body.write_string body ?off ?len) str
|
||||
in
|
||||
let write_bigstring ?off ?len bstr =
|
||||
protect ~orphans (Body.write_bigstring body ?off ?len) bstr
|
||||
in
|
||||
let close () = protect ~orphans Body.close_writer body in
|
||||
{ write_string; write_bigstring; close }
|
||||
in
|
||||
match err with
|
||||
| `Exn (Runtime.Flow msg) ->
|
||||
user's_error_handler ?request (`Protocol msg :> error) respond
|
||||
| err -> user's_error_handler ?request (`V1 err) respond
|
||||
and request_handler reqd =
|
||||
let protect, _, _ = Lazy.force process in
|
||||
httpaf_handler ~sockaddr ~scheme ~protect ~orphans ~handler reqd
|
||||
and conn =
|
||||
lazy
|
||||
(Httpaf.Server_connection.create ~config ~error_handler request_handler)
|
||||
and process =
|
||||
lazy
|
||||
(B.run (Lazy.force conn) ~give ~disown:Miou_unix.disown ~read_buffer_size
|
||||
file_descr)
|
||||
in
|
||||
let _, prm, close = Lazy.force process in
|
||||
Log.debug (fun m -> m "the http/1.1 server connection is launched");
|
||||
let _result = Miou.await prm in
|
||||
Runtime.terminate orphans;
|
||||
(* TODO(dinosaure): are you sure? [httpaf_handler] already did it. *)
|
||||
close ()
|
||||
|
||||
let https_1_1_server_connection ~config ~sockaddr ~user's_error_handler ~handler
|
||||
file_descr =
|
||||
let scheme = "https" in
|
||||
let read_buffer_size = config.Httpaf.Config.read_buffer_size in
|
||||
let give = [ Miou_unix.owner file_descr.TLS.flow ] in
|
||||
let disown flow = Miou_unix.disown flow.TLS.flow in
|
||||
let orphans = Miou.orphans () in
|
||||
let rec error_handler ?request err respond =
|
||||
let { Runtime.protect }, _, _ = Lazy.force process in
|
||||
let request = Option.map (request_from_httpaf ~scheme) request in
|
||||
let respond hdrs =
|
||||
let open Httpaf in
|
||||
let hdrs = Httpaf.Headers.of_list (H2.Headers.to_list hdrs) in
|
||||
let body = protect ~orphans respond hdrs in
|
||||
let write_string ?off ?len str =
|
||||
protect ~orphans (Body.write_string body ?off ?len) str
|
||||
in
|
||||
let write_bigstring ?off ?len bstr =
|
||||
protect ~orphans (Body.write_bigstring body ?off ?len) bstr
|
||||
in
|
||||
let close () = protect ~orphans Body.close_writer body in
|
||||
{ write_string; write_bigstring; close }
|
||||
in
|
||||
match err with
|
||||
| `Exn (Runtime.Flow msg) ->
|
||||
user's_error_handler ?request (`Protocol msg :> error) respond
|
||||
| err -> user's_error_handler ?request (`V1 err) respond
|
||||
and request_handler reqd =
|
||||
let protect, _, _ = Lazy.force process in
|
||||
httpaf_handler ~sockaddr ~scheme ~protect ~orphans ~handler reqd
|
||||
and conn =
|
||||
lazy
|
||||
(Httpaf.Server_connection.create ~config ~error_handler request_handler)
|
||||
and process =
|
||||
lazy (A.run (Lazy.force conn) ~give ~disown ~read_buffer_size file_descr)
|
||||
in
|
||||
let _, prm, close = Lazy.force process in
|
||||
Log.debug (fun m -> m "the http/1.1 server connection is launched");
|
||||
let _result = Miou.await prm in
|
||||
Runtime.terminate orphans;
|
||||
(* TODO(dinosaure): are you sure? [httpaf_handler] already did it. *)
|
||||
close ()
|
||||
|
||||
let h2s_server_connection ~config ~sockaddr ~user's_error_handler ~handler
|
||||
file_descr =
|
||||
let read_buffer_size = config.H2.Config.read_buffer_size in
|
||||
let give = [ Miou_unix.owner file_descr.TLS.flow ] in
|
||||
let disown flow = Miou_unix.disown flow.TLS.flow in
|
||||
let orphans = Miou.orphans () in
|
||||
let rec error_handler ?request err respond =
|
||||
let { Runtime.protect }, _, _ = Lazy.force process in
|
||||
let request = Option.map request_from_h2 request in
|
||||
let respond hdrs =
|
||||
let open H2 in
|
||||
let body = protect ~orphans respond hdrs in
|
||||
let write_string ?off ?len str =
|
||||
protect ~orphans (Body.Writer.write_string body ?off ?len) str
|
||||
in
|
||||
let write_bigstring ?off ?len bstr =
|
||||
protect ~orphans (Body.Writer.write_bigstring body ?off ?len) bstr
|
||||
in
|
||||
let close () = protect ~orphans Body.Writer.close body in
|
||||
{ write_string; write_bigstring; close }
|
||||
in
|
||||
match err with
|
||||
| `Exn (Runtime.Flow msg) ->
|
||||
user's_error_handler ?request (`Protocol msg :> error) respond
|
||||
| err -> user's_error_handler ?request (`V2 err) respond
|
||||
and request_handler reqd =
|
||||
let protect, _, _ = Lazy.force process in
|
||||
h2_handler ~sockaddr ~protect ~orphans ~handler reqd
|
||||
and conn =
|
||||
lazy (H2.Server_connection.create ~config ~error_handler request_handler)
|
||||
and process =
|
||||
lazy (C.run (Lazy.force conn) ~give ~disown ~read_buffer_size file_descr)
|
||||
in
|
||||
let _, prm, close = Lazy.force process in
|
||||
Log.debug (fun m -> m "the http/1.1 server connection is launched");
|
||||
let _result = Miou.await prm in
|
||||
Runtime.terminate orphans;
|
||||
(* TODO(dinosaure): are you sure? [httpaf_handler] already did it. *)
|
||||
close ()
|
||||
|
||||
let clear ?stop ?(config = Httpaf.Config.default)
|
||||
?error_handler:(user's_error_handler = default_error_handler) ~handler
|
||||
file_descr =
|
||||
let read_buffer_size = config.Httpaf.Config.read_buffer_size in
|
||||
let rec go orphans file_descr =
|
||||
Log.debug (fun m ->
|
||||
m "waiting for a new connection or a stop signal from the user");
|
||||
match accept_or_stop ?stop file_descr with
|
||||
| `Stop ->
|
||||
Log.debug (fun m -> m "terminate the clear http server");
|
||||
Runtime.terminate orphans
|
||||
| `Stop -> Runtime.terminate orphans
|
||||
| `Accept (file_descr', sockaddr) ->
|
||||
Log.debug (fun m -> m "receive a new client: %a" pp_sockaddr sockaddr);
|
||||
clean orphans;
|
||||
let give = [ Miou_unix.owner file_descr' ] in
|
||||
let _ =
|
||||
Miou.call ~orphans ~give @@ fun () ->
|
||||
let orphans = Miou.orphans () in
|
||||
let rec error_handler ?request err respond =
|
||||
let { Runtime.protect }, _, _ = Lazy.force process in
|
||||
let request =
|
||||
Option.map (request_from_httpaf ~scheme:"http") request
|
||||
in
|
||||
let respond hdrs =
|
||||
let open Httpaf in
|
||||
let hdrs = Httpaf.Headers.of_list (H2.Headers.to_list hdrs) in
|
||||
let body = protect ~orphans respond hdrs in
|
||||
let write_string ?off ?len str =
|
||||
protect ~orphans (Body.write_string body ?off ?len) str
|
||||
in
|
||||
let write_bigstring ?off ?len bstr =
|
||||
protect ~orphans (Body.write_bigstring body ?off ?len) bstr
|
||||
in
|
||||
let close () = protect ~orphans Body.close_writer body in
|
||||
{ write_string; write_bigstring; close }
|
||||
in
|
||||
match err with
|
||||
| `Exn (Runtime.Flow msg) ->
|
||||
user's_error_handler ?request (`Protocol msg :> error) respond
|
||||
| err -> user's_error_handler ?request (`V1 err) respond
|
||||
and request_handler reqd =
|
||||
let protect, _, _ = Lazy.force process in
|
||||
httpaf_handler ~sockaddr ~scheme:"http" ~protect ~orphans ~handler
|
||||
reqd
|
||||
and conn =
|
||||
lazy
|
||||
(Httpaf.Server_connection.create ~config ~error_handler
|
||||
request_handler)
|
||||
and process =
|
||||
lazy
|
||||
(B.run (Lazy.force conn) ~give ~disown:Miou_unix.disown
|
||||
~read_buffer_size file_descr')
|
||||
in
|
||||
let _, prm, close = Lazy.force process in
|
||||
Log.debug (fun m -> m "the http/1.1 server connection is launched");
|
||||
let _result = Miou.await prm in
|
||||
Log.debug (fun m ->
|
||||
m "clean everything for the client %a" pp_sockaddr sockaddr);
|
||||
Runtime.terminate orphans;
|
||||
(* TODO(dinosaure): are you sure? [httpaf_handler] already did it. *)
|
||||
close ();
|
||||
Log.debug (fun m ->
|
||||
m "the process for %a is cleaned" pp_sockaddr sockaddr)
|
||||
http_1_1_server_connection ~config ~sockaddr ~user's_error_handler
|
||||
~handler file_descr'
|
||||
in
|
||||
Miou_unix.disown file_descr';
|
||||
go orphans file_descr
|
||||
in
|
||||
go (Miou.orphans ()) file_descr
|
||||
|
||||
let with_tls ?stop ?(config = `Both (Httpaf.Config.default, H2.Config.default))
|
||||
?error_handler:(user's_error_handler = default_error_handler) tls_config
|
||||
~handler file_descr =
|
||||
let rec go orphans file_descr =
|
||||
match accept_or_stop ?stop file_descr with
|
||||
| `Stop -> Runtime.terminate orphans
|
||||
| `Accept (file_descr', sockaddr) ->
|
||||
clean orphans;
|
||||
let give = [ Miou_unix.owner file_descr' ] in
|
||||
let _ =
|
||||
Miou.call ~orphans ~give @@ fun () ->
|
||||
match TLS.server_of_flow tls_config file_descr' with
|
||||
| Error err ->
|
||||
Log.err (fun m ->
|
||||
m "got a TLS error during the handshake: %a" TLS.pp_error err);
|
||||
Miou_unix.close file_descr'
|
||||
| Ok tls_flow -> begin
|
||||
match (config, epoch tls_flow) with
|
||||
| `Both (_, h2), Some { Tls.Core.alpn_protocol = Some "h2"; _ }
|
||||
| ( `H2 h2
|
||||
, (Some { Tls.Core.alpn_protocol = Some "h2" | None; _ } | None)
|
||||
) ->
|
||||
h2s_server_connection ~config:h2 ~sockaddr
|
||||
~user's_error_handler ~handler tls_flow
|
||||
| ( `Both (httpaf, _)
|
||||
, Some { Tls.Core.alpn_protocol = Some "http/1.1"; _ } )
|
||||
| ( `HTTP_1_1 httpaf
|
||||
, ( Some { Tls.Core.alpn_protocol = Some "http/1.1" | None; _ }
|
||||
| None ) ) ->
|
||||
https_1_1_server_connection ~config:httpaf ~sockaddr
|
||||
~user's_error_handler ~handler tls_flow
|
||||
| `Both _, (Some { Tls.Core.alpn_protocol = None; _ } | None) ->
|
||||
assert false
|
||||
| _, Some { Tls.Core.alpn_protocol = Some _protocol; _ } ->
|
||||
assert false
|
||||
end
|
||||
in
|
||||
Miou_unix.disown file_descr';
|
||||
go orphans file_descr
|
||||
|
||||
@ -31,3 +31,15 @@ val clear :
|
||||
-> handler:handler
|
||||
-> Miou_unix.file_descr
|
||||
-> unit
|
||||
|
||||
val with_tls :
|
||||
?stop:bool Atomic.t
|
||||
-> ?config:
|
||||
[ `H2 of H2.Config.t
|
||||
| `HTTP_1_1 of Httpaf.Config.t
|
||||
| `Both of Httpaf.Config.t * H2.Config.t ]
|
||||
-> ?error_handler:error_handler
|
||||
-> Tls.Config.server
|
||||
-> handler:handler
|
||||
-> Miou_unix.file_descr
|
||||
-> unit
|
||||
|
||||
@ -146,6 +146,18 @@ module Make (Flow : Flow.S) = struct
|
||||
drain_handshake tls_flow
|
||||
| Error err -> Error (`Write err)
|
||||
|
||||
let server_of_flow config flow =
|
||||
let tls_flow =
|
||||
{ role = `Server
|
||||
; flow
|
||||
; state = `Active (Tls.Engine.server config)
|
||||
; linger = None
|
||||
; recv = Bytes.create 0x1000
|
||||
; writer_closed = false
|
||||
}
|
||||
in
|
||||
drain_handshake tls_flow
|
||||
|
||||
let writev flow bufs =
|
||||
if flow.writer_closed then Error `Closed
|
||||
else
|
||||
|
||||
@ -121,4 +121,8 @@ let test01 =
|
||||
Miou.await_exn prm;
|
||||
Alcotest.failf "Got an error: %a" Httpcats.pp_error err
|
||||
|
||||
let () = Alcotest.run "network" [ ("simple", [ test00; test01 ]) ]
|
||||
let () =
|
||||
let stdout = Alcotest_engine.Global.make_stdout () in
|
||||
let stderr = Alcotest_engine.Global.make_stderr () in
|
||||
Alcotest.run ~stdout ~stderr "network"
|
||||
[ ("simple", [ test00; test01 ]) ]
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user