Skip to content

Commit

Permalink
irmin-pack: add v1 of chunked suffix module
Browse files Browse the repository at this point in the history
  • Loading branch information
metanivek committed Oct 13, 2022
1 parent 6854d5b commit 297a630
Show file tree
Hide file tree
Showing 4 changed files with 306 additions and 0 deletions.
189 changes: 189 additions & 0 deletions src/irmin-pack/unix/chunked_suffix.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
(*
* Copyright (c) 2022-2022 Tarides <[email protected]>
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*)

open Import
include Chunked_suffix_intf

module Make (Io : Io.S) (Errs : Io_errors.S with module Io = Io) = struct
module Io = Io
module Errs = Errs
module Ao = Append_only_file.Make (Io) (Errs)

type chunk = { idx : int; start_off : int63; ao : Ao.t }

let chunk_off_to_poff c off = Int63.Syntax.(off - c.start_off)

type create_error = Io.create_error

type open_error =
[ Io.open_error
| `Closed
| `Invalid_argument
| `Inconsistent_store
| `Read_out_of_bounds ]

(** A simple container for chunks. *)
module Inventory : sig
type t

val v : int -> (int -> chunk) -> t
val iter_skip_appendable : (chunk -> unit) -> t -> unit
val appendable : t -> chunk
val find : off:int63 -> t -> chunk

val open_ :
start_idx:int ->
chunk_num:int ->
open_chunk:
(chunk_idx:int -> is_appendable:bool -> (Ao.t, open_error) result) ->
(t, [> open_error ]) result
end = struct
type t = chunk Array.t

exception OpenInventoryError of open_error

let v = Array.init

let iter_skip_appendable f t =
Array.sub t 0 (Array.length t - 1) |> Array.iter f

let appendable t = Array.get t (Array.length t - 1)

let find ~off t =
let find c =
let end_poff = Ao.end_poff c.ao in
let is_after_start = c.start_off <= off in
let is_before_end = chunk_off_to_poff c off < end_poff in
is_after_start && is_before_end
in
match Array.find_opt find t with
| None -> raise (Errors.Pack_error `Read_out_of_bounds)
| Some c -> c

let open_ ~start_idx ~chunk_num ~open_chunk =
let off = ref Int63.zero in
let create_chunk i =
let start_off = !off in
let chunk_idx = start_idx + i in
let is_appendable = i = chunk_num - 1 in
let open_result = open_chunk ~chunk_idx ~is_appendable in
match open_result with
| Error err -> raise (OpenInventoryError err)
| Ok ao ->
let end_poff = Ao.end_poff ao in
(off := Int63.Syntax.(!off + end_poff));
{ idx = chunk_idx; start_off; ao }
in
try Ok (v chunk_num create_chunk)
with OpenInventoryError err ->
Error (err : open_error :> [> open_error ])
end

type t = { inventory : Inventory.t }

(* A lightweight wrapper around creating/opening append only files as chunks *)
module Ao_chunk = struct
let path = Layout.V4.suffix_chunk

let create_rw ~root ~chunk_idx ~overwrite ~auto_flush_threshold
~auto_flush_procedure =
let path = path ~root ~chunk_idx in
Ao.create_rw ~path ~overwrite ~auto_flush_threshold ~auto_flush_procedure

let open_rw ~root ~chunk_idx ~auto_flush_threshold ~auto_flush_procedure
~end_poff ~dead_header_size =
let path = path ~root ~chunk_idx in
Ao.open_rw ~path ~end_poff ~dead_header_size ~auto_flush_threshold
~auto_flush_procedure

let open_ro ~root ~chunk_idx ~end_poff ~dead_header_size ~is_appendable =
let open Result_syntax in
let path = path ~root ~chunk_idx in
(* Passing [end_poff] is required by Ao for its consistency check (and
only applies to the appendable chunk).
TODO: We could change its API in the future so we can not do
[Io.size_of_path], which is admittedly very silly.*)
let* end_poff =
if is_appendable then Ok end_poff else Io.size_of_path path
in
Ao.open_ro ~path ~end_poff ~dead_header_size
end

let create_rw ~root ~start_idx ~overwrite ~auto_flush_threshold
~auto_flush_procedure =
let open Result_syntax in
let chunk_idx = start_idx in
let+ ao =
Ao_chunk.create_rw ~root ~chunk_idx ~overwrite ~auto_flush_threshold
~auto_flush_procedure
in
let chunk = { idx = chunk_idx; start_off = Int63.zero; ao } in
let inventory = Inventory.v 1 (Fun.const chunk) in
{ inventory }

let open_rw ~root ~end_poff ~start_idx ~chunk_num ~dead_header_size
~auto_flush_threshold ~auto_flush_procedure =
let open Result_syntax in
let open_chunk ~chunk_idx ~is_appendable =
match is_appendable with
| true ->
Ao_chunk.open_rw ~root ~chunk_idx ~end_poff ~auto_flush_threshold
~auto_flush_procedure ~dead_header_size
| false ->
Ao_chunk.open_ro ~root ~chunk_idx ~end_poff ~dead_header_size
~is_appendable
in
let+ inventory = Inventory.open_ ~start_idx ~chunk_num ~open_chunk in
{ inventory }

let open_ro ~root ~end_poff ~dead_header_size ~start_idx ~chunk_num =
let open Result_syntax in
let open_chunk = Ao_chunk.open_ro ~root ~end_poff ~dead_header_size in
let+ inventory = Inventory.open_ ~start_idx ~chunk_num ~open_chunk in
{ inventory }

let appendable_ao t = (Inventory.appendable t.inventory).ao
let end_poff t = appendable_ao t |> Ao.end_poff

let read_exn t ~off ~len buf =
let chunk = Inventory.find ~off t.inventory in
let poff = chunk_off_to_poff chunk off in
Ao.read_exn chunk.ao ~off:poff ~len buf

let append_exn t s = Ao.append_exn (appendable_ao t) s

let close t =
(* Close immutable chunks, ignoring errors. *)
let _ =
t.inventory
|> Inventory.iter_skip_appendable @@ fun chunk ->
let _ = Ao.close chunk.ao in
()
in
(* Close appendable chunk and keep error since this
is the one that can have a pending flush. *)
appendable_ao t |> Ao.close

let empty_buffer t = appendable_ao t |> Ao.empty_buffer
let flush t = appendable_ao t |> Ao.flush
let fsync t = appendable_ao t |> Ao.fsync

let refresh_end_poff t new_end_poff =
Ao.refresh_end_poff (appendable_ao t) new_end_poff

let readonly t = appendable_ao t |> Ao.readonly
let auto_flush_threshold t = appendable_ao t |> Ao.auto_flush_threshold
end
18 changes: 18 additions & 0 deletions src/irmin-pack/unix/chunked_suffix.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
(*
* Copyright (c) 2022-2022 Tarides <[email protected]>
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*)

include Chunked_suffix_intf.Sigs
(** @inline *)
85 changes: 85 additions & 0 deletions src/irmin-pack/unix/chunked_suffix_intf.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
(*
* Copyright (c) 2022-2022 Tarides <[email protected]>
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*)

open Import

module type S = sig
(** Abstraction for a chunked suffix. It is functionally equivalent to
{!Append_only_file} but with a chunked implementation that is
parameterized by
- [start_idx] for {!create_rw} to know the starting file name, and
- [start_idx] and [chunk_num] for the open functions to know the starting
file name and how many files there are. *)

module Io : Io.S
module Errs : Io_errors.S
module Ao : Append_only_file.S

type t
type create_error = Io.create_error

type open_error =
[ Io.open_error
| `Closed
| `Invalid_argument
| `Inconsistent_store
| `Read_out_of_bounds ]

val create_rw :
root:string ->
start_idx:int ->
overwrite:bool ->
auto_flush_threshold:int ->
auto_flush_procedure:Ao.auto_flush_procedure ->
(t, [> create_error ]) result

val open_rw :
root:string ->
end_poff:int63 ->
start_idx:int ->
chunk_num:int ->
dead_header_size:int ->
auto_flush_threshold:int ->
auto_flush_procedure:Ao.auto_flush_procedure ->
(t, [> open_error ]) result

val open_ro :
root:string ->
end_poff:int63 ->
dead_header_size:int ->
start_idx:int ->
chunk_num:int ->
(t, [> open_error ]) result

val close : t -> (unit, [> Io.close_error | `Pending_flush ]) result
val empty_buffer : t -> bool
val flush : t -> (unit, [> Io.write_error ]) result
val fsync : t -> (unit, [> Io.write_error ]) result
val end_poff : t -> int63
val read_exn : t -> off:int63 -> len:int -> bytes -> unit
val append_exn : t -> string -> unit
val refresh_end_poff : t -> int63 -> (unit, [> `Rw_not_allowed ]) result
val readonly : t -> bool
val auto_flush_threshold : t -> int option
end

module type Sigs = sig
module type S = S

module Make (Io : Io.S) (Errs : Io_errors.S with module Io = Io) :
S with module Io = Io and module Errs = Errs
end
14 changes: 14 additions & 0 deletions src/irmin-pack/unix/import.ml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,20 @@ let src = Logs.Src.create "irmin-pack.unix" ~doc:"irmin-pack unix backend"

module Log = (val Logs.src_log src : Logs.LOG)

module Array = struct
include Array

let find_opt p a =
let n = length a in
let rec loop i =
if i = n then None
else
let x = get a i in
if p x then Some x else loop (succ i)
in
loop 0
end

module Int63 = struct
include Optint.Int63

Expand Down

0 comments on commit 297a630

Please sign in to comment.