forked from treeverse/lakeFS
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcataloger_delete_entry.go
78 lines (73 loc) · 2.3 KB
/
cataloger_delete_entry.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package catalog
import (
"context"
"errors"
"fmt"
sq "github.com/Masterminds/squirrel"
"github.com/treeverse/lakefs/db"
)
// DeleteEntry deletes the entry stored at path on the given branch of repository.
//
// The work is done inside a single call to c.db.Transact:
//  1. Any uncommitted row for path on the branch is deleted.
//  2. The entries selected by sqEntriesLineage are queried for a non-deleted
//     row at path, reading its is_committed flag.
//  3. If such a row exists and is committed, a tombstone row (min_commit=0,
//     max_commit=0, empty address and checksum) is inserted for path.
//
// Returns:
//   - the error from Validate when the repository or branch name is invalid;
//   - db.ErrNotFound when path is empty;
//   - ErrEntryNotFound when no committed entry was found and no uncommitted
//     row was deleted;
//   - the error from getBranchIDCache unchanged, or any other step failure
//     wrapped (%w) with a short prefix naming the step;
//   - nil when a tombstone was written or an uncommitted row was deleted.
func (c *cataloger) DeleteEntry(ctx context.Context, repository, branch string, path string) error {
	if err := Validate(ValidateFields{
		{Name: "repository", IsValid: ValidateRepositoryName(repository)},
		{Name: "branch", IsValid: ValidateBranchName(branch)},
	}); err != nil {
		return err
	}
	// NOTE(review): an empty path yields db.ErrNotFound while a missing entry
	// below yields ErrEntryNotFound — confirm callers treat both the same way.
	if path == "" {
		return db.ErrNotFound
	}
	_, err := c.db.Transact(func(tx db.Tx) (interface{}, error) {
		branchID, err := c.getBranchIDCache(tx, repository, branch)
		if err != nil {
			return nil, err
		}

		// Delete the uncommitted entry first, if there is one.
		res, err := tx.Exec("DELETE FROM catalog_entries WHERE branch_id=$1 AND path=$2 AND min_commit=0 AND max_commit=catalog_max_commit_id()",
			branchID, path)
		if err != nil {
			return nil, fmt.Errorf("uncommitted: %w", err)
		}
		deletedUncommittedCount, err := res.RowsAffected()
		if err != nil {
			return nil, fmt.Errorf("rows affected: %w", err)
		}

		// Look for a remaining, non-deleted entry at path and learn whether it
		// is committed. This runs after the delete above, so the uncommitted
		// row removed there is no longer part of the result.
		lineage, err := getLineage(tx, branchID, UncommittedID)
		if err != nil {
			return nil, fmt.Errorf("get lineage: %w", err)
		}
		query, args, err := psql.
			Select("is_committed").
			FromSelect(sqEntriesLineage(branchID, UncommittedID, lineage), "entries").
			// Expired objects *can* be successfully deleted!
			Where(sq.Eq{"path": path, "is_deleted": false}).
			ToSql()
		if err != nil {
			return nil, fmt.Errorf("build sql: %w", err)
		}
		var isCommitted bool
		err = tx.Get(&isCommitted, query, args...)
		// A not-found result is an expected outcome and is handled below;
		// isCommitted keeps its zero value (false) in that case. Any other
		// failure aborts the transaction, wrapped like every other step.
		committedNotFound := errors.Is(err, db.ErrNotFound)
		if err != nil && !committedNotFound {
			return nil, fmt.Errorf("get entry: %w", err)
		}

		// 1. found committed record - add tombstone and return success
		// 2. not found committed record:
		//    - if we deleted uncommitted - return success
		//    - if we didn't delete uncommitted - return not found
		if isCommitted {
			_, err = tx.Exec(`INSERT INTO catalog_entries (branch_id,path,physical_address,checksum,size,metadata,min_commit,max_commit)
VALUES ($1,$2,'','',0,'{}',0,0)`,
				branchID, path)
			if err != nil {
				return nil, fmt.Errorf("tombstone: %w", err)
			}
			return nil, nil
		}
		if deletedUncommittedCount == 0 {
			return nil, ErrEntryNotFound
		}
		return nil, nil
	}, c.txOpts(ctx)...)
	return err
}