Skip to content

Commit

Permalink
Add a split function to create new chunk file
Browse files Browse the repository at this point in the history
  • Loading branch information
icristescu committed Oct 24, 2022
1 parent ae36632 commit 2be7958
Show file tree
Hide file tree
Showing 14 changed files with 413 additions and 60 deletions.
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
`finished` callback (#2089, @Ngoguey42)
- Add `Gc.oldest_live_commit` which returns the key of the commit on which the
latest gc was called. (#2110, @icristescu)
- Add `split` to create a new append-only file on disk; subsequent writes go
to this new file. (#2118, @icristescu)

### Changed

Expand Down
113 changes: 101 additions & 12 deletions src/irmin-pack/unix/chunked_suffix.ml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ module Make (Io : Io.S) (Errs : Io_errors.S with module Io = Io) = struct
| `Inconsistent_store
| `Read_out_of_bounds ]

(** Errors that can be reported when a new appendable chunk is added to the
    suffix.

    - [open_error]: failures from (re)opening a chunk file.
    - [Io.close_error] and [`Pending_flush]: failures from closing the
      previous appendable chunk before it is reopened as non-appendable.
    - [`File_exists]: NOTE(review): presumably produced when creating the new
      chunk file; the producer is not visible here, confirm.
    - [`Multiple_empty_chunks]: returned by [add_chunk] when the current
      appendable chunk is still empty (its [end_poff] is zero). *)
type add_new_error =
[ open_error
| Io.close_error
| `Pending_flush
| `File_exists of string
| `Multiple_empty_chunks ]

(** A simple container for chunks. *)
module Inventory : sig
type t
Expand All @@ -56,13 +63,24 @@ module Make (Io : Io.S) (Errs : Io_errors.S with module Io = Io) = struct
(t, [> open_error ]) result

val close : t -> (unit, [> Io.close_error | `Pending_flush ]) result

val add_new_appendable :
open_chunk:
(chunk_idx:int ->
is_legacy:bool ->
is_appendable:bool ->
(Ao.t, add_new_error) result) ->
t ->
(unit, [> add_new_error ]) result

val length : t -> int63
end = struct
type t = chunk Array.t
type t = { mutable chunks : chunk Array.t }

exception OpenInventoryError of open_error

let v = Array.init
let appendable t = Array.get t (Array.length t - 1)
let v num create = { chunks = Array.init num create }
let appendable t = Array.get t.chunks (Array.length t.chunks - 1)

let find ~off t =
let open Int63.Syntax in
Expand All @@ -72,23 +90,28 @@ module Make (Io : Io.S) (Errs : Io_errors.S with module Io = Io) = struct
let poff = suffix_off_to_chunk_poff c in
Int63.zero <= poff && poff < end_poff
in
match Array.find_opt find t with
match Array.find_opt find t.chunks with
| None -> raise (Errors.Pack_error `Read_out_of_bounds)
| Some c -> (c, suffix_off_to_chunk_poff c)

let end_offset_of_chunk start_offset ao =
let chunk_len = Ao.end_poff ao in
Int63.Syntax.(start_offset + chunk_len)

let is_legacy chunk_idx = chunk_idx = 0

let open_ ~start_idx ~chunk_num ~open_chunk =
let off_acc = ref Int63.zero in
let create_chunk i =
let suffix_off = !off_acc in
let is_appendable = i = chunk_num - 1 in
let chunk_idx = start_idx + i in
let is_legacy = chunk_idx = 0 in
let is_legacy = is_legacy chunk_idx in
let open_result = open_chunk ~chunk_idx ~is_legacy ~is_appendable in
match open_result with
| Error err -> raise (OpenInventoryError err)
| Ok ao ->
let chunk_len = Ao.end_poff ao in
(off_acc := Int63.Syntax.(suffix_off + chunk_len));
off_acc := end_offset_of_chunk suffix_off ao;
{ idx = chunk_idx; suffix_off; ao }
in
try Ok (v chunk_num create_chunk)
Expand All @@ -98,17 +121,58 @@ module Make (Io : Io.S) (Errs : Io_errors.S with module Io = Io) = struct
let close t =
(* Close immutable chunks, ignoring errors. *)
let _ =
Array.sub t 0 (Array.length t - 1)
Array.sub t.chunks 0 (Array.length t.chunks - 1)
|> Array.iter @@ fun chunk ->
let _ = Ao.close chunk.ao in
()
in
(* Close appendable chunk and keep error since this
is the one that can have a pending flush. *)
(appendable t).ao |> Ao.close

(** [wrap_error r] widens the error of [r] from the closed type
    [add_new_error] to the open row [[> add_new_error ]]. An [Ok] value is
    returned unchanged. *)
let wrap_error = function
  | Ok value -> Ok value
  | Error err -> Error (err : add_new_error :> [> add_new_error ])

(** [reopen_last_chunk ~open_chunk t] closes the current appendable chunk of
    [t] and reopens it through [open_chunk] with [~is_appendable:false],
    replacing the last slot of [t.chunks] with the reopened chunk.

    Returns the suffix offset at which the next chunk starts, i.e. the
    previous chunk's [suffix_off] plus its [Ao.end_poff].

    NOTE(review): if [open_chunk] fails after [Ao.close] succeeded, the last
    slot of [t.chunks] still refers to the closed file; presumably callers
    treat that error as fatal, confirm. *)
let reopen_last_chunk ~open_chunk t =
  let open Result_syntax in
  let last_pos = Array.length t.chunks - 1 in
  let previous = appendable t in
  (* The end offset is read before closing the file. *)
  let next_suffix_off = end_offset_of_chunk previous.suffix_off previous.ao in
  let* () = Ao.close previous.ao in
  let* reopened =
    wrap_error
      (open_chunk ~chunk_idx:previous.idx
         ~is_legacy:(is_legacy previous.idx)
         ~is_appendable:false)
  in
  (* Only swap the slot once the read-only handle is available. *)
  t.chunks.(last_pos) <- { previous with ao = reopened };
  Ok next_suffix_off

(** [create_appendable_chunk ~open_chunk t suffix_off] opens, through
    [open_chunk], a non-legacy appendable chunk whose index is one more than
    the index of the current appendable chunk of [t], and returns it as a
    chunk record starting at [suffix_off]. The inventory [t] itself is not
    modified. *)
let create_appendable_chunk ~open_chunk t suffix_off =
  let open Result_syntax in
  let idx = (appendable t).idx + 1 in
  let+ ao = open_chunk ~chunk_idx:idx ~is_legacy:false ~is_appendable:true in
  { idx; suffix_off; ao }

(** [add_new_appendable ~open_chunk t] turns the current appendable chunk of
    [t] into a non-appendable one, then creates a new appendable chunk that
    starts where the previous one ends and appends it to [t.chunks].

    The steps run in that order: if creating the new chunk fails, the
    previous chunk has already been reopened as non-appendable. *)
let add_new_appendable ~open_chunk t =
  let open Result_syntax in
  let* start_off = reopen_last_chunk ~open_chunk t in
  let+ fresh =
    wrap_error (create_appendable_chunk ~open_chunk t start_off)
  in
  t.chunks <- Array.append t.chunks [| fresh |]

(** [length t] is the sum of [Ao.end_poff] over every chunk of [t]. *)
let length t =
  let add_chunk_len total chunk =
    Int63.Syntax.(total + Ao.end_poff chunk.ao)
  in
  Array.fold_left add_chunk_len Int63.zero t.chunks
end

type t = { inventory : Inventory.t }
type t = { inventory : Inventory.t; root : string; dead_header_size : int }

let chunk_path = Layout.V4.suffix_chunk

Expand All @@ -122,7 +186,7 @@ module Make (Io : Io.S) (Errs : Io_errors.S with module Io = Io) = struct
in
let chunk = { idx = chunk_idx; suffix_off = Int63.zero; ao } in
let inventory = Inventory.v 1 (Fun.const chunk) in
{ inventory }
{ inventory; root; dead_header_size = 0 }

(** A module to adjust values when mapping from chunks to append-only files *)
module Ao_shim = struct
Expand Down Expand Up @@ -156,7 +220,7 @@ module Make (Io : Io.S) (Errs : Io_errors.S with module Io = Io) = struct
| false -> Ao.open_ro ~path ~end_poff ~dead_header_size
in
let+ inventory = Inventory.open_ ~start_idx ~chunk_num ~open_chunk in
{ inventory }
{ inventory; root; dead_header_size }

let open_ro ~root ~end_poff ~dead_header_size ~start_idx ~chunk_num =
let open Result_syntax in
Expand All @@ -168,16 +232,41 @@ module Make (Io : Io.S) (Errs : Io_errors.S with module Io = Io) = struct
Ao.open_ro ~path ~end_poff ~dead_header_size
in
let+ inventory = Inventory.open_ ~start_idx ~chunk_num ~open_chunk in
{ inventory }
{ inventory; root; dead_header_size }

let appendable_ao t = (Inventory.appendable t.inventory).ao
let end_poff t = appendable_ao t |> Ao.end_poff
let length t = Inventory.length t.inventory

let read_exn t ~off ~len buf =
let chunk, poff = Inventory.find ~off t.inventory in
Ao.read_exn chunk.ao ~off:poff ~len buf

let append_exn t s = Ao.append_exn (appendable_ao t) s

(** [add_chunk ~auto_flush_threshold ~auto_flush_procedure t] adds a new
    appendable chunk to the suffix [t]. The previous appendable chunk is
    reopened read-only and the new chunk file is created read-write with the
    given auto-flush settings.

    Returns [Error `Multiple_empty_chunks] when the current appendable chunk
    is empty (its [end_poff] is zero), so that two consecutive empty chunks
    are never created.

    NOTE(review): the new chunk file is created with [~overwrite:true], so a
    file already present at that path is presumably replaced rather than
    reported as [`File_exists]; confirm this is intended. *)
let add_chunk ~auto_flush_threshold ~auto_flush_procedure t =
  let open Result_syntax in
  if Int63.compare (end_poff t) Int63.zero = 0 then
    Error `Multiple_empty_chunks
  else
    let open_chunk ~chunk_idx ~is_legacy ~is_appendable =
      let path = chunk_path ~root:t.root ~chunk_idx in
      (* Adjust the header size and end offset for this particular chunk. *)
      let* { dead_header_size; end_poff } =
        Ao_shim.v ~path ~end_poff:Int63.zero
          ~dead_header_size:t.dead_header_size ~is_legacy ~is_appendable
      in
      if is_appendable then
        Ao.create_rw ~path ~overwrite:true ~auto_flush_threshold
          ~auto_flush_procedure
      else Ao.open_ro ~path ~end_poff ~dead_header_size
    in
    Inventory.add_new_appendable ~open_chunk t.inventory

let close t = Inventory.close t.inventory
let empty_buffer t = appendable_ao t |> Ao.empty_buffer
let flush t = appendable_ao t |> Ao.flush
Expand Down
14 changes: 14 additions & 0 deletions src/irmin-pack/unix/chunked_suffix_intf.ml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ module type S = sig
| `Inconsistent_store
| `Read_out_of_bounds ]

(** Errors that [add_chunk] can return: failures from opening a chunk file
    ([open_error]), from closing the previous appendable chunk
    ([Io.close_error], [`Pending_flush]), [`File_exists] (NOTE(review):
    presumably raised when creating the new chunk file; confirm), and
    [`Multiple_empty_chunks] when the current appendable chunk is still
    empty. *)
type add_new_error =
[ open_error
| Io.close_error
| `Pending_flush
| `File_exists of string
| `Multiple_empty_chunks ]

val create_rw :
root:string ->
start_idx:int ->
Expand All @@ -65,6 +72,12 @@ module type S = sig
chunk_num:int ->
(t, [> open_error ]) result

(** [add_chunk ~auto_flush_threshold ~auto_flush_procedure t] creates a new
    appendable chunk for [t]; the previous appendable chunk is reopened as
    non-appendable. [auto_flush_threshold] and [auto_flush_procedure] are
    passed on when creating the new chunk file.

    Returns [Error `Multiple_empty_chunks] if the current appendable chunk is
    empty. *)
val add_chunk :
auto_flush_threshold:int ->
auto_flush_procedure:Ao.auto_flush_procedure ->
t ->
(unit, [> add_new_error ]) result

val close : t -> (unit, [> Io.close_error | `Pending_flush ]) result
val empty_buffer : t -> bool
val flush : t -> (unit, [> Io.write_error ]) result
Expand All @@ -77,6 +90,7 @@ module type S = sig
Possible new names: [consistency_poff], [persisted_poff].
*)
val end_poff : t -> int63
(** [length t] is the total length of the suffix: the sum of the end offsets
    of all of its chunks, whereas [end_poff] only reports the end offset of
    the appendable chunk. *)
val length : t -> int63
val read_exn : t -> off:int63 -> len:int -> bytes -> unit
val append_exn : t -> string -> unit

Expand Down
4 changes: 2 additions & 2 deletions src/irmin-pack/unix/dispatcher.ml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ module Make (Fm : File_manager.S with module Io = Io.Unix) :
offsets are translated into real ones (i.e. in prefix or suffix offsets). *)
let end_offset t =
let open Int63.Syntax in
Suffix.end_poff (Fm.suffix t.fm) + suffix_start_offset t
Suffix.length (Fm.suffix t.fm) + suffix_start_offset t

module Suffix_arithmetic = struct
(* Adjust the read in suffix, as the global offset [off] is
Expand Down Expand Up @@ -319,7 +319,7 @@ module Make (Fm : File_manager.S with module Io = Io.Unix) :
List.rev !preffix_chunks
| None -> []
in
let suffix_end_poff = Fm.Suffix.end_poff (Fm.suffix t.fm) in
let suffix_end_poff = Fm.Suffix.length (Fm.suffix t.fm) in
let suffix_start_offset = suffix_start_offset t in
let get_entry_accessor rem_len location poff =
let accessor =
Expand Down
4 changes: 3 additions & 1 deletion src/irmin-pack/unix/errors.ml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ type base_error =
| `Invalid_prefix_read of string
| `Invalid_mapping_read of string
| `Invalid_read_of_gced_object of string
| `Inconsistent_store ]
| `Inconsistent_store
| `Split_forbidden_during_batch
| `Multiple_empty_chunks ]
[@@deriving irmin ~pp]
(** [base_error] is the type of most errors that can occur in a [result], except
for errors that have associated exceptions (see below) and backend-specific
Expand Down
13 changes: 13 additions & 0 deletions src/irmin-pack/unix/ext.ml
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,18 @@ module Maker (Config : Conf.S) = struct
let fsync t = File_manager.fsync t.fm |> Errs.raise_if_error
let reload t = File_manager.reload t.fm |> Errs.raise_if_error

(** [split t] asks the file manager of [t] to split the store.

    Fails with [`Ro_not_allowed] when the repository is configured read-only
    and with [`Split_forbidden_during_batch] when a batch is in progress; the
    read-only check is performed first. *)
let split t =
  if Irmin_pack.Conf.readonly t.config then Error `Ro_not_allowed
  else if t.during_batch then Error `Split_forbidden_during_batch
  else File_manager.split t.fm

(** [split_exn repo] is [split repo], with any error passed to
    [Errs.raise_if_error] instead of being returned. *)
let split_exn repo = Errs.raise_if_error (split repo)

module Gc = struct
let cancel t =
match t.running_gc with
Expand Down Expand Up @@ -525,6 +537,7 @@ module Maker (Config : Conf.S) = struct
let reload = X.Repo.reload
let flush = X.Repo.flush
let fsync = X.Repo.fsync
(* Exposes the raising variant: [split] here is [X.Repo.split_exn], so errors
   are raised rather than returned as a [result]. *)
let split = X.Repo.split_exn

module Gc = struct
type msg = [ `Msg of string ]
Expand Down
Loading

0 comments on commit 2be7958

Please sign in to comment.