cataloger.go
package catalog

import (
	"context"
	"errors"
	"io"
	"sync"
	"time"

	"github.com/treeverse/lakefs/catalog/params"
	"github.com/treeverse/lakefs/db"
	"github.com/treeverse/lakefs/logging"
)
const (
	CatalogerCommitter                 = ""
	DefaultPathDelimiter               = "/"
	dedupBatchSize                     = 10
	dedupBatchTimeout                  = 50 * time.Millisecond
	dedupChannelSize                   = 5000
	dedupReportChannelSize             = 5000
	defaultCatalogerCacheSize          = 1024
	defaultCatalogerCacheExpiry        = 20 * time.Second
	defaultCatalogerCacheJitter        = 5 * time.Second
	MaxReadQueue                       = 10
	defaultBatchReadEntryMaxWait       = 15 * time.Second
	defaultBatchScanTimeout            = 500 * time.Microsecond
	defaultBatchDelay                  = 1000 * time.Microsecond
	defaultBatchEntriesReadAtOnce      = 64
	defaultBatchReaders                = 8
	defaultBatchWriteEntriesInsertSize = 10
)
type DedupReport struct {
	Repository         string
	StorageNamespace   string
	DedupID            string
	Entry              *Entry
	NewPhysicalAddress string
	Timestamp          time.Time
}

type DedupParams struct {
	ID               string
	StorageNamespace string
}

type ExpireResult struct {
	Repository        string
	Branch            string
	PhysicalAddress   string
	InternalReference string
}

type RepositoryCataloger interface {
	CreateRepository(ctx context.Context, repository string, storageNamespace string, branch string) error
	GetRepository(ctx context.Context, repository string) (*Repository, error)
	DeleteRepository(ctx context.Context, repository string) error
	ListRepositories(ctx context.Context, limit int, after string) ([]*Repository, bool, error)
}

type BranchCataloger interface {
	CreateBranch(ctx context.Context, repository, branch string, sourceBranch string) (*CommitLog, error)
	DeleteBranch(ctx context.Context, repository, branch string) error
	ListBranches(ctx context.Context, repository string, prefix string, limit int, after string) ([]*Branch, bool, error)
	BranchExists(ctx context.Context, repository string, branch string) (bool, error)
	GetBranchReference(ctx context.Context, repository, branch string) (string, error)
	ResetBranch(ctx context.Context, repository, branch string) error
}

var ErrExpired = errors.New("expired from storage")
// ExpiryRows is a database iterator over ExpireResults. Use Next to advance from row to row.
type ExpiryRows interface {
	io.Closer
	Next() bool
	Err() error
	// Read returns the current row from ExpiryRows, or an error on failure. Call it only after
	// successfully calling Next.
	Read() (*ExpireResult, error)
}
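
// A hedged usage sketch for ExpiryRows (not part of the original file): the interface
// follows the common database-iterator pattern, so a caller loops on Next, Reads each
// row, and checks Err once iteration stops. Variable names are illustrative.
//
//	rows, err := c.QueryEntriesToExpire(ctx, "my-repo", policy)
//	if err != nil {
//		return err
//	}
//	defer func() { _ = rows.Close() }()
//	for rows.Next() {
//		res, err := rows.Read()
//		if err != nil {
//			return err
//		}
//		_ = res // process the *ExpireResult
//	}
//	if err := rows.Err(); err != nil {
//		return err
//	}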
// GetEntryParams configures what entries GetEntry returns.
type GetEntryParams struct {
	// Entries of expired objects have their Expired bit set. If ReturnExpired is true,
	// GetEntry returns such entries successfully; otherwise it returns the entry along
	// with ErrExpired.
	ReturnExpired bool
}
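
// Illustrative caller sketch (assumed, not from this file): reading an entry while
// tolerating expired objects via ReturnExpired. The Expired field on Entry is assumed
// from the comment above.
//
//	entry, err := c.GetEntry(ctx, "my-repo", "main", "path/to/object",
//		GetEntryParams{ReturnExpired: true})
//	if err != nil {
//		return err
//	}
//	if entry.Expired {
//		// handle an entry whose object expired from the underlying storage
//	}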
type CreateEntryParams struct {
	Dedup DedupParams
}
type EntryCataloger interface {
	// GetEntry returns the current entry for path in repository branch reference. Returns
	// the entry with ErrExpired if it has expired from the underlying storage.
	GetEntry(ctx context.Context, repository, reference string, path string, params GetEntryParams) (*Entry, error)
	CreateEntry(ctx context.Context, repository, branch string, entry Entry, params CreateEntryParams) error
	CreateEntries(ctx context.Context, repository, branch string, entries []Entry) error
	DeleteEntry(ctx context.Context, repository, branch string, path string) error
	ListEntries(ctx context.Context, repository, reference string, prefix, after string, delimiter string, limit int) ([]*Entry, bool, error)
	ResetEntry(ctx context.Context, repository, branch string, path string) error
	ResetEntries(ctx context.Context, repository, branch string, prefix string) error
	// QueryEntriesToExpire returns ExpiryRows iterating over all objects to expire on
	// repositoryName according to policy.
	QueryEntriesToExpire(ctx context.Context, repositoryName string, policy *Policy) (ExpiryRows, error)
	// MarkEntriesExpired marks all entries identified by expireResults as expired. It is a batch operation.
	MarkEntriesExpired(ctx context.Context, repositoryName string, expireResults []*ExpireResult) error
	// MarkObjectsForDeletion marks objects in catalog_object_dedup as "deleting" if all
	// their entries are expired, and returns the new total number of objects marked (or an
	// error). These objects are not yet safe to delete: there could be a race between
	// marking objects as expired and deduping newly-uploaded objects. See
	// DeleteOrUnmarkObjectsForDeletion for the actual deletion.
	MarkObjectsForDeletion(ctx context.Context, repositoryName string) (int64, error)
	// DeleteOrUnmarkObjectsForDeletion scans objects in catalog_object_dedup for objects
	// marked "deleting" and returns an iterator over the physical addresses of those objects
	// all of whose referring entries are still expired. If called after MarkEntriesExpired
	// and MarkObjectsForDeletion this is safe, because no further entries can refer to
	// expired objects. It also removes the "deleting" mark from objects that have an
	// entry _not_ marked as expiring and are therefore not included in the returned rows.
	DeleteOrUnmarkObjectsForDeletion(ctx context.Context, repositoryName string) (StringRows, error)
	DedupReportChannel() chan *DedupReport
}
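
// Hedged sketch of the expiry flow implied by the comments above (variable names are
// illustrative, error handling elided): query candidates, mark their entries expired,
// mark fully-expired objects for deletion, then collect the addresses that are actually
// safe to delete.
//
//	rows, _ := c.QueryEntriesToExpire(ctx, repo, policy)
//	// ... collect expireResults from rows ...
//	_ = c.MarkEntriesExpired(ctx, repo, expireResults)
//	_, _ = c.MarkObjectsForDeletion(ctx, repo)
//	addrs, _ := c.DeleteOrUnmarkObjectsForDeletion(ctx, repo)
//	// iterate addrs (StringRows) and delete the physical objects from storage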
type MultipartUpdateCataloger interface {
	CreateMultipartUpload(ctx context.Context, repository, uploadID, path, physicalAddress string, creationTime time.Time) error
	GetMultipartUpload(ctx context.Context, repository, uploadID string) (*MultipartUpload, error)
	DeleteMultipartUpload(ctx context.Context, repository, uploadID string) error
}

type Committer interface {
	Commit(ctx context.Context, repository, branch string, message string, committer string, metadata Metadata) (*CommitLog, error)
	GetCommit(ctx context.Context, repository, reference string) (*CommitLog, error)
	ListCommits(ctx context.Context, repository, branch string, fromReference string, limit int) ([]*CommitLog, bool, error)
	RollbackCommit(ctx context.Context, repository, reference string) error
}

type Differ interface {
	Diff(ctx context.Context, repository, leftBranch string, rightBranch string, limit int, after string) (Differences, bool, error)
	DiffUncommitted(ctx context.Context, repository, branch string, limit int, after string) (Differences, bool, error)
}

type Merger interface {
	Merge(ctx context.Context, repository, leftBranch, rightBranch, committer, message string, metadata Metadata) (*MergeResult, error)
}

type Cataloger interface {
	RepositoryCataloger
	BranchCataloger
	EntryCataloger
	Committer
	MultipartUpdateCataloger
	Differ
	Merger
	io.Closer
}
type dedupRequest struct {
	Repository       string
	StorageNamespace string
	DedupID          string
	Entry            *Entry
	EntryCTID        string
}

type CacheConfig struct {
	Enabled bool
	Size    int
	Expiry  time.Duration
	Jitter  time.Duration
}
// cataloger is the main catalog implementation, based on MVCC.
type cataloger struct {
	params.Catalog
	log                  logging.Logger
	db                   db.Database
	wg                   sync.WaitGroup
	cache                Cache
	dedupCh              chan *dedupRequest
	dedupReportEnabled   bool
	dedupReportCh        chan *DedupReport
	readEntryRequestChan chan *readRequest
}
type CatalogerOption func(*cataloger)

func WithCacheEnabled(b bool) CatalogerOption {
	return func(c *cataloger) {
		c.Cache.Enabled = b
	}
}

func WithDedupReportChannel(b bool) CatalogerOption {
	return func(c *cataloger) {
		c.dedupReportEnabled = b
	}
}
func WithParams(p params.Catalog) CatalogerOption {
	return func(c *cataloger) {
		if p.BatchRead.ScanTimeout != 0 {
			c.BatchRead.ScanTimeout = p.BatchRead.ScanTimeout
		}
		if p.BatchRead.Delay != 0 {
			c.BatchRead.Delay = p.BatchRead.Delay
		}
		if p.BatchRead.EntriesAtOnce != 0 {
			c.BatchRead.EntriesAtOnce = p.BatchRead.EntriesAtOnce
		}
		if p.BatchRead.EntryMaxWait != 0 {
			c.BatchRead.EntryMaxWait = p.BatchRead.EntryMaxWait
		}
		if p.BatchRead.Readers != 0 {
			c.BatchRead.Readers = p.BatchRead.Readers
		}
		if p.BatchWrite.EntriesInsertSize != 0 {
			c.BatchWrite.EntriesInsertSize = p.BatchWrite.EntriesInsertSize
		}
		if p.Cache.Size != 0 {
			c.Cache.Size = p.Cache.Size
		}
		if p.Cache.Expiry != 0 {
			c.Cache.Expiry = p.Cache.Expiry
		}
		if p.Cache.Jitter != 0 {
			c.Cache.Jitter = p.Cache.Jitter
		}
		c.Cache.Enabled = p.Cache.Enabled
	}
}
func NewCataloger(db db.Database, options ...CatalogerOption) Cataloger {
	c := &cataloger{
		log:                logging.Default().WithField("service_name", "cataloger"),
		db:                 db,
		dedupCh:            make(chan *dedupRequest, dedupChannelSize),
		dedupReportEnabled: true,
		Catalog: params.Catalog{
			BatchRead: params.BatchRead{
				EntryMaxWait:  defaultBatchReadEntryMaxWait,
				ScanTimeout:   defaultBatchScanTimeout,
				Delay:         defaultBatchDelay,
				EntriesAtOnce: defaultBatchEntriesReadAtOnce,
				Readers:       defaultBatchReaders,
			},
			BatchWrite: params.BatchWrite{
				EntriesInsertSize: defaultBatchWriteEntriesInsertSize,
			},
			Cache: params.Cache{
				Enabled: false,
				Size:    defaultCatalogerCacheSize,
				Expiry:  defaultCatalogerCacheExpiry,
				Jitter:  defaultCatalogerCacheJitter,
			},
		},
	}
	for _, opt := range options {
		opt(c)
	}
	if c.Cache.Enabled {
		c.cache = NewLRUCache(c.Cache.Size, c.Cache.Expiry, c.Cache.Jitter)
	} else {
		c.cache = &DummyCache{}
	}
	if c.dedupReportEnabled {
		c.dedupReportCh = make(chan *DedupReport, dedupReportChannelSize)
	}
	c.processDedupBatches()
	c.startReadOrchestrator()
	return c
}
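
// A minimal construction sketch (assumes a db.Database value `database` obtained
// elsewhere; the option values are illustrative, not defaults from this file):
//
//	c := NewCataloger(database,
//		WithCacheEnabled(true),
//		WithDedupReportChannel(true),
//		WithParams(params.Catalog{
//			Cache: params.Cache{Size: 4096, Expiry: time.Minute},
//		}),
//	)
//	defer func() { _ = c.Close() }()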
func (c *cataloger) startReadOrchestrator() {
	c.readEntryRequestChan = make(chan *readRequest, MaxReadQueue)
	c.wg.Add(1)
	go c.readEntriesBatchOrchestrator()
}

func (c *cataloger) txOpts(ctx context.Context, opts ...db.TxOpt) []db.TxOpt {
	o := []db.TxOpt{
		db.WithContext(ctx),
		db.WithLogger(c.log),
	}
	return append(o, opts...)
}
func (c *cataloger) Close() error {
	if c != nil {
		close(c.dedupCh)
		close(c.readEntryRequestChan)
		c.wg.Wait()
		if c.dedupReportCh != nil {
			// the report channel exists only when dedup reporting is enabled;
			// guard against closing a nil channel
			close(c.dedupReportCh)
		}
	}
	return nil
}
func (c *cataloger) DedupReportChannel() chan *DedupReport {
	return c.dedupReportCh
}
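
// Hedged consumer sketch: when dedup reporting is enabled, a goroutine can drain the
// channel until Close closes it (loop shape assumed, not from the original file;
// requires the standard "log" package).
//
//	go func() {
//		for report := range c.DedupReportChannel() {
//			log.Printf("dedup: %s now at %s", report.DedupID, report.NewPhysicalAddress)
//		}
//	}()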
// processDedupBatches starts a worker goroutine that accumulates dedup requests and
// flushes them as a batch, either when the batch fills up or when the batch timeout fires.
func (c *cataloger) processDedupBatches() {
	c.wg.Add(1)
	go func() {
		defer c.wg.Done()
		batch := make([]*dedupRequest, 0, dedupBatchSize)
		timer := time.NewTimer(dedupBatchTimeout)
		for {
			processBatch := false
			select {
			case req, ok := <-c.dedupCh:
				if !ok {
					return
				}
				batch = append(batch, req)
				l := len(batch)
				if l == 1 {
					timer.Reset(dedupBatchTimeout)
				}
				if l == dedupBatchSize {
					processBatch = true
				}
			case <-timer.C:
				if len(batch) > 0 {
					processBatch = true
				}
			}
			if processBatch {
				c.dedupBatch(batch)
				batch = batch[:0]
			}
		}
	}()
}
// dedupBatch registers a batch of dedup requests in a single transaction: it records each
// dedup ID, and when an ID already exists it points the entry at the existing physical address.
func (c *cataloger) dedupBatch(batch []*dedupRequest) {
	ctx := context.Background()
	dedupBatchSizeHistogram.Observe(float64(len(batch)))
	res, err := c.db.Transact(func(tx db.Tx) (interface{}, error) {
		addresses := make([]string, len(batch))
		for i, r := range batch {
			repoID, err := c.getRepositoryIDCache(tx, r.Repository)
			if err != nil {
				return nil, err
			}
			// add dedup record
			res, err := tx.Exec(`INSERT INTO catalog_object_dedup (repository_id, dedup_id, physical_address) values ($1, decode($2,'hex'), $3)
				ON CONFLICT DO NOTHING`,
				repoID, r.DedupID, r.Entry.PhysicalAddress)
			if err != nil {
				return nil, err
			}
			if rowsAffected, err := res.RowsAffected(); err != nil {
				return nil, err
			} else if rowsAffected == 1 {
				// new address was added - continue
				continue
			}
			// the dedup ID already exists - fetch its physical address into the right slot
			err = tx.Get(&addresses[i], `SELECT physical_address FROM catalog_object_dedup WHERE repository_id=$1 AND dedup_id=decode($2,'hex')`,
				repoID, r.DedupID)
			if err != nil {
				return nil, err
			}
			// update the entry to point at the deduplicated physical address
			_, err = tx.Exec(`UPDATE catalog_entries SET physical_address=$2 WHERE ctid=$1 AND physical_address=$3`,
				r.EntryCTID, addresses[i], r.Entry.PhysicalAddress)
			if err != nil {
				return nil, err
			}
		}
		return addresses, nil
	}, c.txOpts(ctx)...)
	if err != nil {
		c.log.WithError(err).Errorf("Dedup batch failed (%d requests)", len(batch))
		return
	}
	// report each entry whose address was deduplicated; drop reports if the channel is full
	if c.dedupReportEnabled {
		addresses := res.([]string)
		for i, r := range batch {
			if addresses[i] == "" {
				continue
			}
			report := &DedupReport{
				Timestamp:          time.Now(),
				Repository:         r.Repository,
				StorageNamespace:   r.StorageNamespace,
				Entry:              r.Entry,
				DedupID:            r.DedupID,
				NewPhysicalAddress: addresses[i],
			}
			select {
			case c.dedupReportCh <- report:
			default:
				dedupRemoveObjectDroppedCounter.Inc()
			}
		}
	}
}