forked from treeverse/lakeFS
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcataloger_get_entry.go
96 lines (86 loc) · 2.54 KB
/
cataloger_get_entry.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package catalog
import (
"context"
"fmt"
"time"
sq "github.com/Masterminds/squirrel"
"github.com/treeverse/lakefs/db"
)
// useEntryReadBatched selects the read path used by GetEntry: when true the
// entry is fetched via getEntryBatchMaybeExpired, otherwise via
// getEntryMaybeExpired.
const useEntryReadBatched = true
// GetEntry looks up the entry stored at path for the given repository and
// reference.
//
// The repository name and the reference are validated first and the validation
// error is returned as-is. An empty path yields db.ErrNotFound. A reference
// that fails to parse yields the parse error.
//
// If an entry is found and is marked Expired while params.ReturnExpired is
// false, the entry is returned together with ErrExpired. In every other case
// the entry and error produced by the underlying read are returned unchanged.
func (c *cataloger) GetEntry(ctx context.Context, repository, reference string, path string, params GetEntryParams) (*Entry, error) {
	fields := ValidateFields{
		{Name: "repository", IsValid: ValidateRepositoryName(repository)},
		{Name: "reference", IsValid: ValidateReference(reference)},
	}
	if err := Validate(fields); err != nil {
		return nil, err
	}
	if len(path) == 0 {
		return nil, db.ErrNotFound
	}

	parsedRef, err := ParseRef(reference)
	if err != nil {
		return nil, err
	}

	// Both read paths share a signature, so choose one and call it once.
	read := c.getEntryMaybeExpired
	if useEntryReadBatched {
		read = c.getEntryBatchMaybeExpired
	}
	found, err := read(ctx, repository, *parsedRef, path)

	// An expired entry is still handed back, but flagged with ErrExpired unless
	// the caller explicitly asked to receive expired entries.
	hideExpired := found != nil && found.Expired && !params.ReturnExpired
	if hideExpired {
		return found, ErrExpired
	}
	return found, err
}
// getEntryBatchMaybeExpired fetches the entry at path by queueing a read
// request on c.readEntryRequestChan and waiting for the reply on a channel
// dedicated to this call. Expiry is not checked here; that is left to the
// caller.
//
// It returns ctx.Err() if ctx is done while the request is being queued or
// while waiting for the reply, and ErrReadEntryTimeout if no reply arrives
// within c.BatchRead.EntryMaxWait after the request was queued.
func (c *cataloger) getEntryBatchMaybeExpired(ctx context.Context, repository string, ref Ref, path string) (*Entry, error) {
	// Used for a single return status message; written to and closed by
	// readEntriesBatch. The buffer of one lets that single reply be sent even
	// if this call has already returned on timeout or cancellation.
	replyChan := make(chan readResponse, 1)
	request := &readRequest{
		bufKey: bufferingKey{
			repository: repository,
			ref:        ref,
		},
		pathReq: pathRequest{
			path:      path,
			replyChan: replyChan,
		},
	}

	// Queueing can block while the consumer is busy; do not outlive the
	// caller's context while waiting for a slot.
	select {
	case c.readEntryRequestChan <- request:
	case <-ctx.Done():
		return nil, ctx.Err()
	}

	// An explicit timer (rather than time.After) can be stopped as soon as the
	// reply or cancellation wins the select below.
	timer := time.NewTimer(c.BatchRead.EntryMaxWait)
	defer timer.Stop()

	select {
	case response := <-replyChan:
		return response.entry, response.err
	case <-timer.C:
		return nil, ErrReadEntryTimeout
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}
// getEntryMaybeExpired reads the entry at path directly from the database
// inside a read-only transaction. Expiry is not checked here; the is_expired
// column is selected and it is up to the caller to act on it.
//
// Within the transaction it resolves the branch ID for ref.Branch, loads the
// lineage for that branch and ref.CommitID, and selects the single
// non-deleted row matching path from the lineage entries view. Errors from
// the branch lookup and from the row fetch are returned unwrapped; lineage and
// SQL-building errors are wrapped with a short prefix.
func (c *cataloger) getEntryMaybeExpired(ctx context.Context, repository string, ref Ref, path string) (*Entry, error) {
	fetch := func(tx db.Tx) (interface{}, error) {
		branchID, err := c.getBranchIDCache(tx, repository, ref.Branch)
		if err != nil {
			return nil, err
		}

		lineage, err := getLineage(tx, branchID, ref.CommitID)
		if err != nil {
			return nil, fmt.Errorf("get lineage: %w", err)
		}

		// Query the lineage-aware entries view for the one live row at path.
		query := psql.
			Select("path", "physical_address", "creation_date", "size", "checksum", "metadata", "is_expired").
			FromSelect(sqEntriesLineage(branchID, ref.CommitID, lineage), "entries").
			Where(sq.Eq{"path": path, "is_deleted": false})
		stmt, stmtArgs, err := query.ToSql()
		if err != nil {
			return nil, fmt.Errorf("build sql: %w", err)
		}

		row := new(Entry)
		if err := tx.Get(row, stmt, stmtArgs...); err != nil {
			return nil, err
		}
		return row, nil
	}

	result, err := c.db.Transact(fetch, c.txOpts(ctx, db.ReadOnly())...)
	if err != nil {
		return nil, err
	}
	return result.(*Entry), nil
}