@@ -32,6 +32,13 @@ module Make (Io : Io.S) (Errs : Io_errors.S with module Io = Io) = struct
32
32
| `Inconsistent_store
33
33
| `Read_out_of_bounds ]
34
34
35
+ type add_new_error =
36
+ [ open_error
37
+ | Io .close_error
38
+ | `Pending_flush
39
+ | `File_exists of string
40
+ | `Multiple_empty_chunks ]
41
+
35
42
(* * A simple container for chunks. *)
36
43
module Inventory : sig
37
44
type t
@@ -56,13 +63,24 @@ module Make (Io : Io.S) (Errs : Io_errors.S with module Io = Io) = struct
56
63
(t , [> open_error ]) result
57
64
58
65
val close : t -> (unit , [> Io .close_error | `Pending_flush ]) result
66
+
67
+ val add_new_appendable :
68
+ open_chunk :
69
+ (chunk_idx :int ->
70
+ is_legacy :bool ->
71
+ is_appendable :bool ->
72
+ (Ao .t , add_new_error ) result ) ->
73
+ t ->
74
+ (unit , [> add_new_error ]) result
75
+
76
+ val length : t -> int63
59
77
end = struct
60
- type t = chunk Array .t
78
+ type t = { mutable chunks : chunk Array .t }
61
79
62
80
exception OpenInventoryError of open_error
63
81
64
- let v = Array. init
65
- let appendable t = Array. get t (Array. length t - 1 )
82
+ let v num create = { chunks = Array. init num create }
83
+ let appendable t = Array. get t.chunks (Array. length t.chunks - 1 )
66
84
67
85
let find ~off t =
68
86
let open Int63.Syntax in
@@ -72,23 +90,28 @@ module Make (Io : Io.S) (Errs : Io_errors.S with module Io = Io) = struct
72
90
let poff = suffix_off_to_chunk_poff c in
73
91
Int63. zero < = poff && poff < end_poff
74
92
in
75
- match Array. find_opt find t with
93
+ match Array. find_opt find t.chunks with
76
94
| None -> raise (Errors. Pack_error `Read_out_of_bounds )
77
95
| Some c -> (c, suffix_off_to_chunk_poff c)
78
96
97
+ let end_offset_of_chunk start_offset ao =
98
+ let chunk_len = Ao. end_poff ao in
99
+ Int63.Syntax. (start_offset + chunk_len)
100
+
101
+ let is_legacy chunk_idx = chunk_idx = 0
102
+
79
103
let open_ ~start_idx ~chunk_num ~open_chunk =
80
104
let off_acc = ref Int63. zero in
81
105
let create_chunk i =
82
106
let suffix_off = ! off_acc in
83
107
let is_appendable = i = chunk_num - 1 in
84
108
let chunk_idx = start_idx + i in
85
- let is_legacy = chunk_idx = 0 in
109
+ let is_legacy = is_legacy chunk_idx in
86
110
let open_result = open_chunk ~chunk_idx ~is_legacy ~is_appendable in
87
111
match open_result with
88
112
| Error err -> raise (OpenInventoryError err)
89
113
| Ok ao ->
90
- let chunk_len = Ao. end_poff ao in
91
- (off_acc := Int63.Syntax. (suffix_off + chunk_len));
114
+ off_acc := end_offset_of_chunk suffix_off ao;
92
115
{ idx = chunk_idx; suffix_off; ao }
93
116
in
94
117
try Ok (v chunk_num create_chunk)
@@ -98,17 +121,58 @@ module Make (Io : Io.S) (Errs : Io_errors.S with module Io = Io) = struct
98
121
let close t =
99
122
(* Close immutable chunks, ignoring errors. *)
100
123
let _ =
101
- Array. sub t 0 (Array. length t - 1 )
124
+ Array. sub t.chunks 0 (Array. length t.chunks - 1 )
102
125
|> Array. iter @@ fun chunk ->
103
126
let _ = Ao. close chunk.ao in
104
127
()
105
128
in
106
129
(* Close appendable chunk and keep error since this
107
130
is the one that can have a pending flush. *)
108
131
(appendable t).ao |> Ao. close
132
+
133
+ let wrap_error result =
134
+ Result. map_error
135
+ (fun err -> (err : add_new_error :> [> add_new_error ] ))
136
+ result
137
+
138
+ let reopen_last_chunk ~open_chunk t =
139
+ (* Close the previous appendable chunk and reopen as non-appendable. *)
140
+ let open Result_syntax in
141
+ let ({ idx; ao; suffix_off } as last_chunk) = appendable t in
142
+ let is_legacy = is_legacy idx in
143
+ (* Compute the suffix_off for the following chunk. *)
144
+ let length = end_offset_of_chunk suffix_off ao in
145
+ let * () = Ao. close ao in
146
+ let * ao =
147
+ open_chunk ~chunk_idx: idx ~is_legacy ~is_appendable: false |> wrap_error
148
+ in
149
+ let pos = Array. length t.chunks - 1 in
150
+ t.chunks.(pos) < - { last_chunk with ao };
151
+ Ok length
152
+
153
+ let create_appendable_chunk ~open_chunk t suffix_off =
154
+ let open Result_syntax in
155
+ let next_id = succ (appendable t).idx in
156
+ let * ao =
157
+ open_chunk ~chunk_idx: next_id ~is_legacy: false ~is_appendable: true
158
+ in
159
+ Ok { idx = next_id; suffix_off; ao }
160
+
161
+ let add_new_appendable ~open_chunk t =
162
+ let open Result_syntax in
163
+ let * next_suffix_off = reopen_last_chunk ~open_chunk t in
164
+ let * chunk =
165
+ create_appendable_chunk ~open_chunk t next_suffix_off |> wrap_error
166
+ in
167
+ t.chunks < - Array. append t.chunks [| chunk |];
168
+ Ok ()
169
+
170
+ let length t =
171
+ let open Int63.Syntax in
172
+ Array. fold_left (fun sum c -> sum + Ao. end_poff c.ao) Int63. zero t.chunks
109
173
end
110
174
111
- type t = { inventory : Inventory .t }
175
+ type t = { inventory : Inventory .t ; root : string ; dead_header_size : int }
112
176
113
177
let chunk_path = Layout.V4. suffix_chunk
114
178
@@ -122,7 +186,7 @@ module Make (Io : Io.S) (Errs : Io_errors.S with module Io = Io) = struct
122
186
in
123
187
let chunk = { idx = chunk_idx; suffix_off = Int63. zero; ao } in
124
188
let inventory = Inventory. v 1 (Fun. const chunk) in
125
- { inventory }
189
+ { inventory; root; dead_header_size = 0 }
126
190
127
191
(* * A module to adjust values when mapping from chunks to append-only files *)
128
192
module Ao_shim = struct
@@ -156,7 +220,7 @@ module Make (Io : Io.S) (Errs : Io_errors.S with module Io = Io) = struct
156
220
| false -> Ao. open_ro ~path ~end_poff ~dead_header_size
157
221
in
158
222
let + inventory = Inventory. open_ ~start_idx ~chunk_num ~open_chunk in
159
- { inventory }
223
+ { inventory; root; dead_header_size }
160
224
161
225
let open_ro ~root ~end_poff ~dead_header_size ~start_idx ~chunk_num =
162
226
let open Result_syntax in
@@ -168,16 +232,41 @@ module Make (Io : Io.S) (Errs : Io_errors.S with module Io = Io) = struct
168
232
Ao. open_ro ~path ~end_poff ~dead_header_size
169
233
in
170
234
let + inventory = Inventory. open_ ~start_idx ~chunk_num ~open_chunk in
171
- { inventory }
235
+ { inventory; root; dead_header_size }
172
236
173
237
let appendable_ao t = (Inventory. appendable t.inventory).ao
174
238
let end_poff t = appendable_ao t |> Ao. end_poff
239
+ let length t = Inventory. length t.inventory
175
240
176
241
let read_exn t ~off ~len buf =
177
242
let chunk, poff = Inventory. find ~off t.inventory in
178
243
Ao. read_exn chunk.ao ~off: poff ~len buf
179
244
180
245
let append_exn t s = Ao. append_exn (appendable_ao t) s
246
+
247
+ let add_chunk ~auto_flush_threshold ~auto_flush_procedure t =
248
+ let open Result_syntax in
249
+ let * () =
250
+ let end_poff = end_poff t in
251
+ if Int63. (compare end_poff zero = 0 ) then Error `Multiple_empty_chunks
252
+ else Ok ()
253
+ in
254
+ let root = t.root in
255
+ let dead_header_size = t.dead_header_size in
256
+ let open_chunk ~chunk_idx ~is_legacy ~is_appendable =
257
+ let path = chunk_path ~root ~chunk_idx in
258
+ let * { dead_header_size; end_poff } =
259
+ Ao_shim. v ~path ~end_poff: Int63. zero ~dead_header_size ~is_legacy
260
+ ~is_appendable
261
+ in
262
+ match is_appendable with
263
+ | true ->
264
+ Ao. create_rw ~path ~overwrite: true ~auto_flush_threshold
265
+ ~auto_flush_procedure
266
+ | false -> Ao. open_ro ~path ~end_poff ~dead_header_size
267
+ in
268
+ Inventory. add_new_appendable ~open_chunk t.inventory
269
+
181
270
let close t = Inventory. close t.inventory
182
271
let empty_buffer t = appendable_ao t |> Ao. empty_buffer
183
272
let flush t = appendable_ao t |> Ao. flush
0 commit comments