forked from treeverse/lakeFS
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcataloger_commit.go
114 lines (101 loc) · 3.57 KB
/
cataloger_commit.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package catalog
import (
"context"
"fmt"
"time"
"github.com/jmoiron/sqlx"
"github.com/treeverse/lakefs/db"
)
func (c *cataloger) Commit(ctx context.Context, repository, branch string, message string, committer string, metadata Metadata) (*CommitLog, error) {
if err := Validate(ValidateFields{
{Name: "branch", IsValid: ValidateBranchName(branch)},
{Name: "message", IsValid: ValidateCommitMessage(message)},
{Name: "committer", IsValid: ValidateCommitter(committer)},
}); err != nil {
return nil, err
}
res, err := c.db.Transact(func(tx db.Tx) (interface{}, error) {
branchID, err := getBranchID(tx, repository, branch, LockTypeUpdate)
if err != nil {
return nil, fmt.Errorf("get branch id: %w", err)
}
lastCommitID, err := getLastCommitIDByBranchID(tx, branchID)
if err != nil {
return nil, fmt.Errorf("last commit id: %w", err)
}
committedAffected, err := commitUpdateCommittedEntriesWithMaxCommit(tx, branchID, lastCommitID)
if err != nil {
return nil, fmt.Errorf("update commit entries: %w", err)
}
_, err = commitDeleteUncommittedTombstones(tx, branchID, lastCommitID)
if err != nil {
return nil, fmt.Errorf("delete uncommitted tombstones: %w", err)
}
// uncommitted to committed entries
commitID, err := getNextCommitID(tx)
if err != nil {
return nil, fmt.Errorf("next commit id: %w", err)
}
// commit entries (include the tombstones)
affectedNew, err := commitEntries(tx, branchID, commitID)
if err != nil {
return nil, fmt.Errorf("commit entries: %w", err)
}
if (affectedNew + committedAffected) == 0 {
return nil, ErrNothingToCommit
}
// insert commit record
var creationDate time.Time
if err = tx.Get(&creationDate,
`INSERT INTO catalog_commits (branch_id,commit_id,committer,message,creation_date,metadata,merge_type,previous_commit_id)
VALUES ($1,$2,$3,$4,transaction_timestamp(),$5,$6,$7)
RETURNING creation_date`,
branchID, commitID, committer, message, metadata, RelationTypeNone, lastCommitID,
); err != nil {
return nil, err
}
reference := MakeReference(branch, commitID)
parentReference := MakeReference(branch, lastCommitID)
commitLog := &CommitLog{
Committer: committer,
Message: message,
CreationDate: creationDate,
Metadata: metadata,
Reference: reference,
Parents: []string{parentReference},
}
return commitLog, nil
}, c.txOpts(ctx)...)
if err != nil {
return nil, err
}
return res.(*CommitLog), nil
}
func commitUpdateCommittedEntriesWithMaxCommit(tx sqlx.Execer, branchID int64, commitID CommitID) (int64, error) {
res, err := tx.Exec(`UPDATE catalog_entries_v SET max_commit = $2
WHERE branch_id = $1 AND is_committed
AND max_commit = catalog_max_commit_id()
AND path in (SELECT path FROM catalog_entries_v WHERE branch_id = $1 AND NOT is_committed)`,
branchID, commitID)
if err != nil {
return 0, err
}
return res.RowsAffected()
}
func commitDeleteUncommittedTombstones(tx sqlx.Execer, branchID int64, commitID CommitID) (int64, error) {
res, err := tx.Exec(`DELETE FROM catalog_entries_v WHERE branch_id = $1 AND NOT is_committed AND is_tombstone AND path IN (
SELECT path FROM catalog_entries_v WHERE branch_id = $1 AND is_committed AND max_commit = $2)`,
branchID, commitID)
if err != nil {
return 0, err
}
return res.RowsAffected()
}
func commitEntries(tx sqlx.Execer, branchID int64, commitID CommitID) (int64, error) {
res, err := tx.Exec(`UPDATE catalog_entries_v SET min_commit = $2 WHERE branch_id = $1 AND NOT is_committed`,
branchID, commitID)
if err != nil {
return 0, err
}
return res.RowsAffected()
}