Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions otherlibs/dune-rpc-lwt/src/dune_rpc_lwt.ml
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,10 @@ module V1 = struct
loop 0 Stack.Empty
;;

let write (_, o) = function
| None -> Lwt_io.close o
| Some csexps ->
Lwt_list.iter_s (fun sexp -> Lwt_io.write o (Csexp.to_string sexp)) csexps
let close (_, o) = Lwt_io.close o

let write (_, o) csexps =
Lwt_list.iter_s (fun sexp -> Lwt_io.write o (Csexp.to_string sexp)) csexps
;;
end)

Expand Down
42 changes: 23 additions & 19 deletions otherlibs/dune-rpc/private/dune_rpc_private.ml
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ module Client = struct
(Chan : sig
type t

val write : t -> Sexp.t list option -> unit Fiber.t
val close : t -> unit Fiber.t
val write : t -> Sexp.t list -> unit Fiber.t
val read : t -> Sexp.t option Fiber.t
end) =
struct
Expand All @@ -149,7 +150,8 @@ module Client = struct
module Chan = struct
type t =
{ read : unit -> Sexp.t option Fiber.t
; write : Sexp.t list option -> unit Fiber.t
; write : Sexp.t list -> unit Fiber.t
; close : unit -> unit Fiber.t
; closed_read : bool
; mutable closed_write : bool
; disconnected : unit Fiber.Ivar.t
Expand All @@ -167,22 +169,25 @@ module Client = struct
in
{ read
; write = (fun s -> Chan.write c s)
; close = (fun () -> Chan.close c)
; closed_read = false
; closed_write = false
; disconnected
}
;;

let close t =
let* () = Fiber.return () in
if t.closed_write
then Fiber.return ()
else (
t.closed_write <- true;
t.close ())
;;

let write t s =
let* () = Fiber.return () in
match s with
| Some _ -> t.write s
| None ->
if t.closed_write
then Fiber.return ()
else (
t.closed_write <- true;
t.write None)
t.write s
;;

let read t =
Expand Down Expand Up @@ -253,7 +258,7 @@ module Client = struct
Some x)
in
Fiber.fork_and_join_unit
(fun () -> Chan.write t.chan None)
(fun () -> Chan.close t.chan)
(fun () ->
Fiber.parallel_iter ivars ~f:(fun status ->
match status with
Expand All @@ -271,9 +276,8 @@ module Client = struct
Code_error.raise message info)
;;

let send conn (packet : Packet.t list option) =
let sexps = Option.map packet ~f:(List.map ~f:(Conv.to_sexp Packet.sexp)) in
Chan.write conn.chan sexps
let send conn (packet : Packet.t list) =
List.map ~f:(Conv.to_sexp Packet.sexp) packet |> Chan.write conn.chan
;;

let create ~chan ~initialize ~handler ~on_preemptive_abort =
Expand Down Expand Up @@ -317,7 +321,7 @@ module Client = struct
match prepare_request' conn (id, req) with
| Error e -> Fiber.return (`Completed (Error e))
| Ok ivar ->
let* () = send conn (Some [ Request (id, req) ]) in
let* () = send conn [ Request (id, req) ] in
Fiber.Ivar.read ivar
;;

Expand Down Expand Up @@ -400,7 +404,7 @@ module Client = struct

let notification (type a) t (stg : a Versioned.notification) (n : a) =
let* () = Fiber.return () in
make_notification t stg n (fun call -> send t (Some [ Notification call ]))
make_notification t stg n (fun call -> send t [ Notification call ])
;;

let disconnected t = Fiber.Ivar.read t.chan.disconnected
Expand Down Expand Up @@ -539,7 +543,7 @@ module Client = struct
let* () = Fiber.return () in
let pending = List.rev t.pending in
t.pending <- [];
send t.client (Some pending)
send t.client pending
;;
end

Expand Down Expand Up @@ -573,7 +577,7 @@ module Client = struct
| Request (id, req) ->
let* handler = t.handler in
let* result = V.Handler.handle_request handler () (id, req) in
send t (Some [ Response (id, result) ])
send t [ Response (id, result) ]
| Response (id, response) ->
(match Table.find t.requests id with
| Some status ->
Expand Down Expand Up @@ -737,7 +741,7 @@ module Client = struct
in
client.handler_initialized <- true;
let* () = Fiber.Ivar.fill handler_var handler in
Fiber.finalize (fun () -> f client) ~finally:(fun () -> Chan.write chan None)
Fiber.finalize (fun () -> f client) ~finally:(fun () -> Chan.close chan)
in
Fiber.fork_and_join_unit (fun () -> read_packets client packets) run
;;
Expand Down
3 changes: 2 additions & 1 deletion otherlibs/dune-rpc/private/dune_rpc_private.mli
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,8 @@ module Client : sig
(Chan : sig
type t

val write : t -> Csexp.t list option -> unit Fiber.t
val close : t -> unit Fiber.t
val write : t -> Csexp.t list -> unit Fiber.t
val read : t -> Csexp.t option Fiber.t
end) : S with type 'a fiber := 'a Fiber.t and type chan := Chan.t
end
Expand Down
8 changes: 5 additions & 3 deletions otherlibs/dune-rpc/v1.mli
Original file line number Diff line number Diff line change
Expand Up @@ -477,9 +477,11 @@ module Client : sig
(Chan : sig
type t

(* [write t x] writes the s-expression when [x] is [Some _], and closes
the session if [x = None] *)
val write : t -> Csexp.t list option -> unit Fiber.t
(* [write t x] writes the s-expression*)
val write : t -> Csexp.t list -> unit Fiber.t

(* closes the session *)
val close : t -> unit Fiber.t

(* [read t] attempts to read from [t]. If an s-expression is read, it is
returned as [Some sexp], otherwise [None] is returned and the session
Expand Down
12 changes: 5 additions & 7 deletions src/dune_rpc_client/client.ml
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,11 @@ include
(struct
include Csexp_rpc.Session

let write t = function
| None -> close t
| Some packets ->
write t packets
>>| (function
| Ok () -> ()
| Error `Closed -> raise Dune_util.Report_error.Already_reported)
let write t packets =
write t packets
>>| function
| Ok () -> ()
| Error `Closed -> raise Dune_util.Report_error.Already_reported
;;
end)

Expand Down
5 changes: 1 addition & 4 deletions test/expect-tests/dune_rpc/dune_rpc_tests.ml
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,7 @@ module Drpc = struct
(struct
include Chan

let write t = function
| None -> close t
| Some packets -> write t packets >>| Result.ok_exn
;;
let write t packets = write t packets >>| Result.ok_exn
end)

module Server = Dune_rpc_server.Make (Chan)
Expand Down
Loading