forked from treeverse/lakeFS
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdb_branch_reader.go
126 lines (118 loc) · 3.43 KB
/
db_branch_reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package catalog
import (
"fmt"
sq "github.com/Masterminds/squirrel"
"github.com/treeverse/lakefs/db"
)
// DBBranchReader reads the entries of a single branch from catalog_entries in
// path order, returning one entry per path from Next. Rows are fetched from
// the database lazily, in pages of bufSize rows, through tx.
type DBBranchReader struct {
	tx       db.Tx
	branchID int64
	// buf holds rows that were fetched but not yet consumed by Next. It is nil
	// until Next loads the first page.
	buf     []*DBReaderEntry
	bufSize int
	// EOF is set once Next runs out of rows; afterwards Next returns (nil, nil).
	EOF bool
	// after is an exclusive lower bound on the paths read by the first query.
	after string
	// commitID selects which rows are visible (see sqBranchReaderSelectWithCommitID)
	// and is passed to findSignificantEntry as the lineage commit.
	commitID CommitID
}
// NewDBBranchReader returns a reader over the entries of branch branchID as
// visible at commitID, starting strictly after path `after` and fetching rows
// through tx in pages of bufSize rows.
//
// A non-positive bufSize is raised to 1. A page size of 0 would make the first
// query "LIMIT 0", so the reader would report an empty branch. A negative page
// size would make Next panic when it allocates its buffer.
func NewDBBranchReader(tx db.Tx, branchID int64, commitID CommitID, bufSize int, after string) *DBBranchReader {
	if bufSize < 1 {
		bufSize = 1
	}
	return &DBBranchReader{
		tx:       tx,
		branchID: branchID,
		bufSize:  bufSize,
		after:    after,
		commitID: commitID,
	}
}
// Next returns the significant entry of the next path in the branch, or
// (nil, nil) once no rows are left.
//
// On the first call it loads a page of up to bufSize rows with paths after
// r.after. It then collapses all leading rows that share the first path into a
// single entry via findSignificantEntry, and drops those rows from the buffer.
func (r *DBBranchReader) Next() (*DBReaderEntry, error) {
	if r.EOF {
		return nil, nil
	}

	// First call: load the initial page.
	if r.buf == nil {
		r.buf = make([]*DBReaderEntry, 0, r.bufSize)
		query := sqBranchReaderSelectWithCommitID(r.branchID, r.commitID).
			Limit(uint64(r.bufSize)).
			Where("path > ?", r.after)
		stmt, args, err := query.PlaceholderFormat(sq.Dollar).ToSql()
		if err != nil {
			return nil, fmt.Errorf("next query format: %w", err)
		}
		if err := r.tx.Select(&r.buf, stmt, args...); err != nil {
			return nil, fmt.Errorf("next select: %w", err)
		}
	}

	if len(r.buf) == 0 {
		r.EOF = true
		return nil, nil
	}

	// When every buffered row belongs to one path, that path may continue
	// beyond the rows read so far: fetch the rest of it plus the next page.
	if r.buf[0].Path == r.buf[len(r.buf)-1].Path {
		if err := r.extendBuf(); err != nil {
			return nil, fmt.Errorf("error getting entry : %w", err)
		}
		if len(r.buf) == 0 {
			r.EOF = true
			return nil, nil
		}
	}

	// Measure the run of leading rows that share the first path.
	path := r.buf[0].Path
	n := 1
	for n < len(r.buf) && r.buf[n].Path == path {
		n++
	}
	group := r.buf[:n]
	r.buf = r.buf[n:]
	return findSignificantEntry(group, r.commitID), nil
}
// findSignificantEntry reduces the rows of a single path to the one entry that
// represents that path, and adjusts its MaxCommit as seen from lineageCommitID.
//
// buf must be non-empty and hold rows of one path in the order produced by
// sqBranchReaderSelectWithCommitID (min_commit descending). The chosen entry is
// modified in place and returned.
func findSignificantEntry(buf []*DBReaderEntry, lineageCommitID CommitID) *DBReaderEntry {
	// With min_commit descending, an uncommitted row (min_commit 0) sorts last,
	// presumably because min_commit is never negative. Such a row appears only
	// when the read includes uncommitted entries, and it takes precedence.
	// Otherwise the first row (highest min_commit) is used.
	entry := buf[0]
	if last := buf[len(buf)-1]; last.MinCommit == 0 {
		entry = last
	}

	switch {
	case lineageCommitID == CommittedID, lineageCommitID == UncommittedID:
		// Not reading as of a specific commit: keep MaxCommit as stored.
	case entry.MaxCommit == MaxCommitID:
		// Already marked as not deleted: nothing to adjust.
	case entry.MaxCommit >= lineageCommitID:
		// Deleted only after the newest commit this read can see, so the entry
		// must be reported as undeleted.
		entry.MaxCommit = MaxCommitID
	}
	return entry
}
// sqBranchReaderSelectWithCommitID builds the base query over catalog_entries
// for one branch. Rows are ordered by path and, within a path, by min_commit
// descending. commitID narrows which rows are visible:
//   - CommittedID: only rows with min_commit > 0;
//   - a positive commit ID: rows with min_commit between 1 and commitID, inclusive;
//   - anything else (presumably UncommittedID): no min_commit filter.
func sqBranchReaderSelectWithCommitID(branchID int64, commitID CommitID) sq.SelectBuilder {
	query := sq.Select("branch_id", "path", "min_commit", "max_commit", "ctid").
		From("catalog_entries").
		Where("branch_id = ? ", branchID).
		OrderBy("branch_id", "path", "min_commit desc")

	switch {
	case commitID == CommittedID:
		return query.Where("min_commit > 0")
	case commitID > 0:
		return query.Where("min_commit between 1 and ?", commitID)
	default:
		return query
	}
}
// extendBuf fetches more rows when the buffer may end in the middle of a path.
// It runs a single UNION ALL query made of two parts:
//   - completion: the remaining rows of the last buffered path, i.e. rows with
//     that path and a lower min_commit (rows are ordered min_commit descending);
//   - continuation: up to bufSize rows of the paths that follow it.
//
// The fetched rows are appended after the rows already in the buffer. This
// keeps all rows of the current path together at the front, where Next groups
// them. It returns an error wrapping ErrUnexpected if the buffer is empty.
func (r *DBBranchReader) extendBuf() error {
	l := len(r.buf)
	if l == 0 {
		return fmt.Errorf("empty buffer - internal error : %w", ErrUnexpected)
	}
	lastRow := r.buf[l-1]
	completionQuery := sqBranchReaderSelectWithCommitID(r.branchID, r.commitID).
		Where("path = ? and min_commit < ?", lastRow.Path, lastRow.MinCommit)
	continuationQuery := sqBranchReaderSelectWithCommitID(r.branchID, r.commitID).
		Where("path > ?", lastRow.Path).
		Limit(uint64(r.bufSize))
	// Each part keeps its own ORDER BY inside parentheses.
	// NOTE(review): PostgreSQL does not formally promise that UNION ALL emits
	// the completion rows before the continuation rows; confirm this holds.
	unionQuery := completionQuery.
		Prefix("(").
		SuffixExpr(sq.ConcatExpr(")\n UNION ALL \n(", continuationQuery, ")"))
	sql, args, err := unionQuery.PlaceholderFormat(sq.Dollar).ToSql()
	if err != nil {
		return fmt.Errorf("format union query: %w", err)
	}
	// Scan into a separate slice. db.Tx.Select may reset its destination slice
	// before scanning (sqlx's Select does). Selecting straight into r.buf would
	// then drop the rows already buffered for the current path.
	var more []*DBReaderEntry
	if err := r.tx.Select(&more, sql, args...); err != nil {
		return fmt.Errorf("select union query: %w", err)
	}
	r.buf = append(r.buf, more...)
	return nil
}