Add Httpcats.Server.get to get the body of a client request
This commit is contained in:
parent
2178799c0e
commit
4edd9850b7
@ -1,4 +1,4 @@
|
||||
version=0.25.1
|
||||
version=0.26.1
|
||||
exp-grouping=preserve
|
||||
break-collection-expressions=wrap
|
||||
break-separators=before
|
||||
|
||||
@ -21,6 +21,7 @@ depends: [
|
||||
"happy-eyeballs"
|
||||
"progress"
|
||||
"alcotest" {with-test}
|
||||
"digestif" {with-test}
|
||||
]
|
||||
conflicts: [ "result" {< "1.5"} ]
|
||||
build: [
|
||||
|
||||
@ -63,15 +63,19 @@ let request_from_httpaf ~scheme { Httpaf.Request.meth; target; headers; _ } =
|
||||
let request_from_h2 { H2.Request.meth; target; scheme; headers } =
|
||||
{ meth; target; scheme; headers }
|
||||
|
||||
type stream =
|
||||
type output =
|
||||
{ write_string : ?off:int -> ?len:int -> string -> unit
|
||||
; write_bigstring : ?off:int -> ?len:int -> Bigstringaf.t -> unit
|
||||
; close : unit -> unit
|
||||
}
|
||||
|
||||
type on_read = Bigstringaf.t -> off:int -> len:int -> unit
|
||||
type on_eof = unit -> unit
|
||||
type input = { schedule : on_eof:on_eof -> on_read:on_read -> unit } [@@unboxed]
|
||||
type _ Effect.t += String : response * string -> unit Effect.t
|
||||
type _ Effect.t += Bigstring : response * Bigstringaf.t -> unit Effect.t
|
||||
type _ Effect.t += Stream : response -> stream Effect.t
|
||||
type _ Effect.t += Stream : response -> output Effect.t
|
||||
type _ Effect.t += Get : input Effect.t
|
||||
|
||||
let string ~status ?(headers = Headers.empty) str =
|
||||
let response = { status; headers } in
|
||||
@ -85,8 +89,10 @@ let stream ?(headers = Headers.empty) status =
|
||||
let response = { status; headers } in
|
||||
Effect.perform (Stream response)
|
||||
|
||||
let get () = Effect.perform Get
|
||||
|
||||
type error_handler =
|
||||
?request:request -> error -> (H2.Headers.t -> stream) -> unit
|
||||
?request:request -> error -> (H2.Headers.t -> output) -> unit
|
||||
|
||||
type handler = request -> unit
|
||||
|
||||
@ -112,18 +118,31 @@ let rec basic_handler ~exnc =
|
||||
let httpaf_handler ~sockaddr ~scheme ~protect:{ Runtime.protect } ~orphans
|
||||
~handler reqd =
|
||||
let open Httpaf in
|
||||
let open Effect.Shallow in
|
||||
let retc = Fun.id in
|
||||
let exnc = protect ~orphans (Reqd.report_exn reqd) in
|
||||
let effc :
|
||||
type c. c Effect.t -> ((c, 'a) Effect.Shallow.continuation -> 'b) option =
|
||||
let open Effect.Deep in
|
||||
let rec retc = Fun.id
|
||||
and exnc = protect ~orphans (Reqd.report_exn reqd)
|
||||
and effc :
|
||||
type c. c Effect.t -> ((c, 'a) Effect.Deep.continuation -> 'b) option =
|
||||
function
|
||||
| Get ->
|
||||
let body = Httpaf.Reqd.request_body reqd in
|
||||
let schedule ~on_eof ~on_read =
|
||||
let on_eof () =
|
||||
match_with on_eof () { retc; exnc; effc };
|
||||
protect ~orphans Httpaf.Body.close_reader body;
|
||||
Runtime.terminate orphans (* XXX(dinosaure): really important! *)
|
||||
in
|
||||
protect ~orphans
|
||||
(match_with (Httpaf.Body.schedule_read ~on_eof ~on_read) body)
|
||||
{ retc; exnc; effc }
|
||||
in
|
||||
Log.debug (fun m -> m "return a stream of the body request");
|
||||
Some (fun k -> continue k { schedule })
|
||||
| String (response, str) ->
|
||||
let response = response_to_httpaf response in
|
||||
Log.debug (fun m -> m "write a http/1.1 response and its body");
|
||||
protect ~orphans (Reqd.respond_with_string reqd response) str;
|
||||
let handler = basic_handler ~exnc in
|
||||
Some (fun k -> continue_with k () handler)
|
||||
Some (fun k -> continue k ())
|
||||
| Stream response ->
|
||||
let response = response_to_httpaf response in
|
||||
let body =
|
||||
@ -137,8 +156,7 @@ let httpaf_handler ~sockaddr ~scheme ~protect:{ Runtime.protect } ~orphans
|
||||
in
|
||||
let close () = protect ~orphans Body.close_writer body in
|
||||
let stream = { write_string; write_bigstring; close } in
|
||||
let handler = basic_handler ~exnc in
|
||||
Some (fun k -> continue_with k stream handler)
|
||||
Some (fun k -> continue k stream)
|
||||
| _ -> None
|
||||
in
|
||||
let fn request =
|
||||
@ -147,7 +165,7 @@ let httpaf_handler ~sockaddr ~scheme ~protect:{ Runtime.protect } ~orphans
|
||||
Runtime.terminate orphans;
|
||||
Log.debug (fun m -> m "the handler for %a has ended" pp_sockaddr sockaddr)
|
||||
in
|
||||
continue_with (fiber fn) (Reqd.request reqd) { retc; exnc; effc }
|
||||
match_with fn (Reqd.request reqd) { retc; exnc; effc }
|
||||
|
||||
let h2_handler ~sockaddr ~protect:{ Runtime.protect } ~orphans ~handler reqd =
|
||||
let open H2 in
|
||||
@ -307,6 +325,7 @@ let https_1_1_server_connection ~config ~sockaddr ~user's_error_handler ~handler
|
||||
Log.debug (fun m -> m "the http/1.1 server connection is launched");
|
||||
let _result = Miou.await prm in
|
||||
Runtime.terminate orphans;
|
||||
Log.debug (fun m -> m "the http/1.1 server connection is ended");
|
||||
close ()
|
||||
|
||||
let h2s_server_connection ~config ~sockaddr ~user's_error_handler ~handler
|
||||
|
||||
@ -9,18 +9,23 @@ module Status = H2.Status
|
||||
type request =
|
||||
{ meth : Method.t; target : string; scheme : string; headers : Headers.t }
|
||||
|
||||
type stream =
|
||||
type output =
|
||||
{ write_string : ?off:int -> ?len:int -> string -> unit
|
||||
; write_bigstring : ?off:int -> ?len:int -> Bigstringaf.t -> unit
|
||||
; close : unit -> unit
|
||||
}
|
||||
|
||||
type on_read = Bigstringaf.t -> off:int -> len:int -> unit
|
||||
type on_eof = unit -> unit
|
||||
type input = { schedule : on_eof:on_eof -> on_read:on_read -> unit } [@@unboxed]
|
||||
|
||||
val string : status:Status.t -> ?headers:Headers.t -> string -> unit
|
||||
val bigstring : status:Status.t -> ?headers:Headers.t -> Bigstringaf.t -> unit
|
||||
val stream : ?headers:Headers.t -> Status.t -> stream
|
||||
val stream : ?headers:Headers.t -> Status.t -> output
|
||||
val get : unit -> input
|
||||
|
||||
type error_handler =
|
||||
?request:request -> error -> (H2.Headers.t -> stream) -> unit
|
||||
?request:request -> error -> (H2.Headers.t -> output) -> unit
|
||||
|
||||
type handler = request -> unit
|
||||
|
||||
|
||||
@ -371,6 +371,9 @@ module Make (Flow : Flow.S) (Runtime : RUNTIME) :
|
||||
match result with
|
||||
| Ok () -> disown flow
|
||||
| Error exn ->
|
||||
Log.err (fun m ->
|
||||
m "got an error at the end of our waiters: %S"
|
||||
(Printexc.to_string exn));
|
||||
close ();
|
||||
raise exn
|
||||
in
|
||||
|
||||
@ -6,4 +6,4 @@
|
||||
(test
|
||||
(name test_clear)
|
||||
(modules test_clear)
|
||||
(libraries logs.fmt fmt.tty logs.threaded httpcats alcotest))
|
||||
(libraries logs.fmt fmt.tty logs.threaded digestif httpcats alcotest))
|
||||
|
||||
@ -22,8 +22,7 @@ let reporter ppf =
|
||||
{ Logs.report }
|
||||
|
||||
let () = Fmt_tty.setup_std_outputs ~style_renderer:`Ansi_tty ~utf_8:true ()
|
||||
|
||||
(* let () = Logs.set_reporter (reporter Fmt.stderr) *)
|
||||
let () = Logs.set_reporter (reporter Fmt.stderr)
|
||||
let () = Logs.set_level ~all:true (Some Logs.Debug)
|
||||
let () = Logs_threaded.enable ()
|
||||
let () = Printexc.record_backtrace true
|
||||
@ -128,7 +127,66 @@ let test01 =
|
||||
Happy.kill daemon;
|
||||
Alcotest.failf "Got an error: %a" Httpcats.pp_error err
|
||||
|
||||
let to_string { Httpcats.Server.schedule } k =
|
||||
let buf = Buffer.create 0x1000 in
|
||||
let rec on_eof () = k (Buffer.contents buf)
|
||||
and on_read bstr ~off ~len =
|
||||
let str = Bigstringaf.substring ~off ~len bstr in
|
||||
Buffer.add_string buf str;
|
||||
schedule ~on_eof ~on_read
|
||||
in
|
||||
schedule ~on_eof ~on_read
|
||||
|
||||
let sha1 = Alcotest.testable Digestif.SHA1.pp Digestif.SHA1.equal
|
||||
|
||||
let random_string ~len =
|
||||
let res = Bytes.create len in
|
||||
for i = 0 to len - 1 do
|
||||
Bytes.set res i (Char.unsafe_chr (Random.bits () land 0xff))
|
||||
done;
|
||||
Bytes.unsafe_to_string res
|
||||
|
||||
let test02 =
|
||||
Alcotest.test_case "post" `Quick @@ fun () ->
|
||||
Miou_unix.run @@ fun () ->
|
||||
let handler _request =
|
||||
let open Httpcats.Server in
|
||||
let stream = get () in
|
||||
to_string stream @@ fun body ->
|
||||
let hash = Digestif.SHA1.digest_string body in
|
||||
let hash = Digestif.SHA1.to_hex hash in
|
||||
let headers =
|
||||
Headers.of_list
|
||||
[ ("content-type", "text/plain")
|
||||
; ("content-length", string_of_int (String.length hash)) ]
|
||||
in
|
||||
Httpcats.Server.string ~headers ~status:`OK hash
|
||||
in
|
||||
let stop, prm = server ~port:4000 handler in
|
||||
let daemon, resolver = Happy.stack () in
|
||||
let body = random_string ~len:0x1000000 in
|
||||
match
|
||||
Httpcats.request ~resolver ~meth:`POST ~body
|
||||
~f:(fun _resp buf str ->
|
||||
Buffer.add_string buf str;
|
||||
buf)
|
||||
~uri:"http://127.0.0.1:4000" (Buffer.create 0x1000)
|
||||
with
|
||||
| Ok (_response, buf) ->
|
||||
let hash' = Digestif.SHA1.of_hex (Buffer.contents buf) in
|
||||
let hash = Digestif.SHA1.digest_string body in
|
||||
Alcotest.(check sha1) "sha1" hash hash';
|
||||
Miou_unix.Cond.signal stop;
|
||||
Miou.await_exn prm;
|
||||
Happy.kill daemon
|
||||
| Error err ->
|
||||
Miou_unix.Cond.signal stop;
|
||||
Miou.await_exn prm;
|
||||
Happy.kill daemon;
|
||||
Alcotest.failf "Got an error: %a" Httpcats.pp_error err
|
||||
|
||||
let () =
|
||||
let stdout = Alcotest_engine.Global.make_stdout () in
|
||||
let stderr = Alcotest_engine.Global.make_stderr () in
|
||||
Alcotest.run ~stdout ~stderr "network" [ ("simple", [ test00; test01 ]) ]
|
||||
Alcotest.run ~stdout ~stderr "network"
|
||||
[ ("simple", [ test00; test01; test02 ]) ]
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user