Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/lwt-ssl/conduit_lwt_ssl.ml
Original file line number Diff line number Diff line change
Expand Up @@ -143,4 +143,6 @@ module TCP = struct

let service =
service_with_ssl service ~file_descr:Protocol.file_descr protocol

include (val Conduit_lwt.repr protocol)
end
7 changes: 7 additions & 0 deletions src/lwt-ssl/conduit_lwt_ssl.mli
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,11 @@ module TCP : sig
context:Ssl.context ->
?verify:verify ->
(Lwt_unix.sockaddr, Protocol.flow) endpoint resolver

type t =
( (Lwt_unix.sockaddr, Conduit_lwt.TCP.Protocol.flow) endpoint,
Lwt_ssl.socket )
Conduit.value

type Conduit_lwt.flow += T of t
end
80 changes: 26 additions & 54 deletions src/lwt/conduit_lwt.ml
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,21 @@ let io_of_flow flow =
let rec rrecv buf off len =
let raw = Cstruct.of_bigarray buf ~off ~len in
Lwt_mutex.with_lock mutex (fun () -> recv flow raw) >>= function
| Ok (`Input 0) -> Lwt_unix.yield () >>= fun () -> rrecv buf off len
| Ok (`Input 0) ->
if len = 0
then Lwt.return 0
else Lwt_unix.yield () >>= fun () -> rrecv buf off len
| Ok (`Input len) -> Lwt.return len
| Ok `End_of_flow -> Lwt.return 0
| Error err -> failwith "%a" pp_error err in
let ic = Lwt_io.make ~close:ic_close ~mode:Lwt_io.input rrecv in
let rec ssend buf off len =
let raw = Cstruct.of_bigarray buf ~off ~len in
Lwt_mutex.with_lock mutex (fun () -> send flow raw) >>= function
| Ok 0 -> Lwt_unix.yield () >>= fun () -> ssend buf off len
| Ok 0 ->
if len = 0
then Lwt.return 0
else Lwt_unix.yield () >>= fun () -> ssend buf off len
| Ok len -> Lwt.return len
| Error err -> failwith "%a" pp_error err in
let oc = Lwt_io.make ~close:oc_close ~mode:Lwt_io.output ssend in
Expand Down Expand Up @@ -114,29 +120,9 @@ module TCP = struct
socket : Lwt_unix.file_descr;
sockaddr : Lwt_unix.sockaddr;
linger : Bytes.t;
recv_first : bool;
mutable closed : bool;
}

(* XXX(dinosaure): [recv_first] is here to fit into [Lwt_io], from what we know,
* a tuple of [Lwt_io] [in_channel/out_channel] tries to receive first. However,
* such behavior is problematic for HTTP:
* - as a HTTP client, we should send first
* - as a HTTP server, we should recv first
* - with TLS layer [conduit-tls], both work - where
* the handshake can be done by send or recv
*
* For my perspective, [Lwt_io] is not the right way to abstract a [Conduit.flow]
* and we should directly use [Conduit.send]/[Conduit.recv] when we need to use
* them. Because [Lwt_io] tries to receive in any case, we must check (with [Lwt_unix.readable])
* if the socket can be read. In that case and if we want to [recv_first], we start
* to waiting something from our peer. In the other case, we returns [`Input 0]
* which gives an opportunity for the scheduler to send something (so, [send_first]).
*
* Such patch is really close to what LWT/[Lwt_io] does. A problem should be a diff
* on behaviors between [Conduit_lwt] and [mirage-tcpip] + [Conduit_mirage]. The best
* way to delete it is to deprecate [io_of_flow]. *)

let peer { sockaddr; _ } = sockaddr

let sock { socket; _ } = Lwt_unix.getsockname socket
Expand Down Expand Up @@ -184,14 +170,7 @@ module TCP = struct
let rec go () =
let process () =
Lwt_unix.connect socket sockaddr >>= fun () ->
Lwt.return_ok
{
socket;
sockaddr;
linger;
closed = false;
recv_first = Lwt_unix.readable socket;
} in
Lwt.return_ok { socket; sockaddr; linger; closed = false } in
Lwt.catch process @@ function
| Unix.(Unix_error ((EACCES | EPERM), _, _)) ->
Lwt.return_error `Operation_not_permitted
Expand Down Expand Up @@ -251,9 +230,9 @@ module TCP = struct
then `End_of_flow
else `Input (filled + len))) in
Lwt.catch (fun () ->
if (not (Lwt_unix.readable t.socket)) && not t.recv_first
then Lwt.return_ok (`Input 0)
else process 0 raw)
if Lwt_unix.readable t.socket
then process 0 raw
else Lwt.return_ok (`Input 0))
@@ function
| Unix.(Unix_error ((EAGAIN | EWOULDBLOCK), _, _)) -> recv t raw
| Unix.(Unix_error (EINTR, _, _)) -> recv t raw
Expand All @@ -273,19 +252,18 @@ module TCP = struct
if closed
then Lwt.return_error `Closed_by_peer
else
let max = Cstruct.len raw in
let len0 = min (Bytes.length t.linger) max in
Cstruct.blit_to_bytes raw 0 t.linger 0 len0 ;
let process () =
Lwt_unix.write socket t.linger 0 len0 >>= fun len1 ->
if len1 = len0
then
if max > len0
then send t (Cstruct.shift raw len0)
else Lwt.return_ok max
else Lwt.return_ok len1
(* worst case *) in
Lwt.catch process @@ function
let rec process pushed raw =
if Cstruct.len raw = 0
then Lwt.return_ok pushed
else
let max = Cstruct.len raw in
let len0 = min (Bytes.length t.linger) max in
Cstruct.blit_to_bytes raw 0 t.linger 0 len0 ;
Lwt_unix.write socket t.linger 0 len0 >>= fun len1 ->
if len1 = len0 && len0 = max
then Lwt.return_ok (pushed + len1)
else process (pushed + len1) (Cstruct.shift raw len1) in
Lwt.catch (fun () -> process 0 raw) @@ function
| Unix.(Unix_error ((EAGAIN | EWOULDBLOCK), _, _)) -> send t raw
| Unix.(Unix_error (EINTR, _, _)) -> send t raw
| Unix.(Unix_error (EACCES, _, _)) ->
Expand Down Expand Up @@ -413,14 +391,8 @@ module TCP = struct
let process () =
Lwt_unix.accept service >>= fun (socket, sockaddr) ->
let linger = Bytes.create 0x1000 in
Lwt.return_ok
{
Protocol.socket;
sockaddr;
linger;
closed = false;
recv_first = Lwt_unix.readable socket;
} in
Lwt.return_ok { Protocol.socket; sockaddr; linger; closed = false }
in
Lwt.catch process @@ function
| Unix.(Unix_error ((EAGAIN | EWOULDBLOCK), _, _)) -> accept service
| Unix.(Unix_error (EINTR, _, _)) -> accept service
Expand Down
10 changes: 10 additions & 0 deletions src/lwt/conduit_lwt.mli
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,16 @@ include

val io_of_flow :
flow -> Lwt_io.input Lwt_io.channel * Lwt_io.output Lwt_io.channel
(** [io_of_flow flow] creates an input flow and an output flow according
to [Lwt_io]. This function, even if it creates something more usable
is {b deprecated}. Indeed, [Lwt_io] has its own way to schedule [read]
and [write] - you should be aware about that more specially when you
use [Conduit_tls] or [Conduit_lwt_ssl].

Due to a specific behavior, [Lwt_io] does not fit with some specific
protocols - non thread-safe protocols, {i send-first} protocols, etc.
From these reasons, and even if {!TCP} try to the best to fit under
an [Lwt_io], you should not use this function. *)

type ('a, 'b, 'c) service = ('a, 'b, 'c) Service.service
(** The type for lwt services. *)
Expand Down
34 changes: 13 additions & 21 deletions src/tls/conduit_tls.ml
Original file line number Diff line number Diff line change
Expand Up @@ -160,12 +160,15 @@ struct
"Got EOF from underlying connection while \
handshake.") ;
return (Ok None)
| `Input 0 -> return (Ok (Some tls))
| `Input 0 ->
Log.debug (fun m ->
m "Underlying connection asks to re-schedule.") ;
return (Ok (Some tls))
| `Input len ->
let uid =
Hashtbl.hash
(Cstruct.to_string (Cstruct.sub raw0 0 len)) in
Log.debug (fun m ->
let uid =
Hashtbl.hash
(Cstruct.to_string (Cstruct.sub raw0 0 len)) in
m
"<~ [%04x] Got %d bytes (handshake in progress: \
true)."
Expand Down Expand Up @@ -225,15 +228,14 @@ struct
t.tls <- None ;
return (Ok `End_of_flow)
| `Input 0 ->
t.tls <- Some tls ;
Log.debug (fun m -> m "We must re-schedule, nothing to read.") ;
return (Ok (`Input 0))
| `Input len -> (
| `Input len ->
Log.debug (fun m -> m "<- Got %d byte(s)." len) ;
let handle raw =
if Tls.Engine.handshake_in_progress tls
then handle_handshake tls t.queue t.flow raw
else handle_tls tls t.queue t.flow raw in
let before = Tls.Engine.handshake_in_progress tls in
Log.debug (fun m ->
let uid =
Hashtbl.hash
Expand All @@ -242,18 +244,8 @@ struct
uid len
(Tls.Engine.handshake_in_progress tls)) ;
handle (Cstruct.sub t.raw 0 len) >>? fun tls ->
let after =
Option.fold ~none:false
~some:Tls.Engine.handshake_in_progress tls in
t.tls <- tls ;
match (tls, before, after) with
| Some _, false, false | Some _, true, false ->
return (Ok (`Input 0))
| Some _, false, true (* renegociate *)
| Some _, true, true (* continue handshake *)
| None, _, _ ->
Log.debug (fun m -> m "Retry to receive something.") ;
recv t raw)))
recv t raw))
| _ ->
let max = Cstruct.len raw in
let len = min (Ke.length t.queue) max in
Expand All @@ -262,7 +254,7 @@ struct
return (Ok (`Input len))

let rec send t raw =
Log.debug (fun m -> m "~> Start to send.") ;
Log.debug (fun m -> m "~> Start to send %d bytes." (Cstruct.len raw)) ;
match t.tls with
| None -> return (Error `Closed_by_peer)
| Some tls when Tls.Engine.can_handle_appdata tls -> (
Expand All @@ -276,11 +268,11 @@ struct
| Some tls -> (
Flow.recv t.flow t.raw >>| reword_error flow_error >>? function
| `End_of_flow ->
Log.warn (fun m -> m "[-] Underlying flow already closed.") ;
Log.debug (fun m -> m "[-] Underlying flow already closed.") ;
t.tls <- None ;
return (Error `Closed_by_peer)
| `Input 0 ->
t.tls <- Some tls ;
Log.debug (fun m -> m "[-] Underlying flow re-schedule.") ;
return (Ok 0)
| `Input len -> (
let res =
Expand Down
20 changes: 0 additions & 20 deletions tests/ping-pong/with_lwt.ml
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,6 @@ let () = Printexc.record_backtrace true

let () = Ssl.init ()

let reporter ppf =
let report src level ~over k msgf =
let k _ =
over () ;
k () in
let with_metadata header _tags k ppf fmt =
Format.kfprintf k ppf
("%a[%a]: " ^^ fmt ^^ "\n%!")
Logs_fmt.pp_header (level, header)
Fmt.(styled `Magenta string)
(Logs.Src.name src) in
msgf @@ fun ?header ?tags fmt -> with_metadata header tags k ppf fmt in
{ Logs.report }

let () = Fmt_tty.setup_std_outputs ~style_renderer:`Ansi_tty ~utf_8:true ()

let () = Logs.set_reporter (reporter Fmt.stderr)

let () = Logs.set_level ~all:true (Some Logs.Debug)

let failwith fmt = Fmt.kstrf (fun err -> Lwt.fail (Failure err)) fmt

module Lwt = struct
Expand Down