Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 31 additions & 29 deletions lib/build_log.ml
Original file line number Diff line number Diff line change
Expand Up @@ -26,45 +26,47 @@ let catch_cancel fn =
)

let tail ?switch t dst =
let rec readonly_tail ch buf =
Lwt_io.read_into ch buf 0 max_chunk_size >>= function
| 0 -> Lwt_result.return ()
| n -> dst (Bytes.sub_string buf 0 n); readonly_tail ch buf
in

let rec open_tail fd cond buf i =
match switch with
| Some sw when not (Lwt_switch.is_on sw) -> Lwt_result.fail `Cancelled
| Some _ | None ->
let avail = min (t.len - i) max_chunk_size in
if avail > 0 then (
Lwt_unix.pread fd ~file_offset:i buf 0 avail >>= fun n ->
dst (Bytes.sub_string buf 0 n);
open_tail fd cond buf (i + avail)
) else (
match t.state with
| `Open _ -> Lwt_condition.wait cond >>= fun () -> open_tail fd cond buf i
| `Readonly _ | `Empty | `Finished -> Lwt_result.return ()
)
in

let interrupt th =
catch_cancel @@ fun () ->
Lwt_switch.add_hook_or_exec switch (fun () -> Lwt.cancel th; Lwt.return_unit) >>= fun () ->
th
in

match t.state with
| `Finished -> invalid_arg "tail: log is finished!"
| `Readonly path ->
let flags = [Unix.O_RDONLY; Unix.O_NONBLOCK; Unix.O_CLOEXEC] in
Lwt_io.(with_file ~mode:input ~flags) path @@ fun ch ->
let buf = Bytes.create max_chunk_size in
let rec aux () =
Lwt_io.read_into ch buf 0 max_chunk_size >>= function
| 0 -> Lwt_result.return ()
| n -> dst (Bytes.sub_string buf 0 n); aux ()
in
catch_cancel @@ fun () ->
let th = aux () in
Lwt_switch.add_hook_or_exec switch (fun () -> Lwt.cancel th; Lwt.return_unit) >>= fun () ->
th
interrupt (readonly_tail ch buf)
| `Empty -> Lwt_result.return ()
| `Open (fd, cond) ->
(* Dup [fd], which can still work after [fd] is closed. *)
with_dup fd @@ fun fd ->
let buf = Bytes.create max_chunk_size in
let rec aux i =
match switch with
| Some sw when not (Lwt_switch.is_on sw) -> Lwt_result.fail `Cancelled
| _ ->
let avail = min (t.len - i) max_chunk_size in
if avail > 0 then (
Lwt_unix.pread fd ~file_offset:i buf 0 avail >>= fun n ->
dst (Bytes.sub_string buf 0 n);
aux (i + avail)
) else (
match t.state with
| `Open _ -> Lwt_condition.wait cond >>= fun () -> aux i
| _ -> Lwt_result.return ()
)
in
catch_cancel @@ fun () ->
let th = aux 0 in
Lwt_switch.add_hook_or_exec switch (fun () -> Lwt.cancel th; Lwt.return_unit) >>= fun () ->
th
interrupt (open_tail fd cond buf 0)

let create path =
Lwt_unix.openfile path Lwt_unix.[O_CREAT; O_TRUNC; O_RDWR; O_CLOEXEC] 0o666 >|= fun fd ->
Expand Down Expand Up @@ -114,7 +116,7 @@ let empty = {
}

let copy ~src ~dst =
let buf = Bytes.create 4096 in
let buf = Bytes.create max_chunk_size in
let rec aux () =
Lwt_unix.read src buf 0 (Bytes.length buf) >>= function
| 0 -> Lwt.return_unit
Expand Down