forked from treeverse/lakeFS
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcataloger_create_entry.go
65 lines (59 loc) · 1.9 KB
/
cataloger_create_entry.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package catalog
import (
"context"
"database/sql"
"fmt"
"github.com/treeverse/lakefs/db"
)
// CreateEntry writes entry to the given branch of repository.
//
// The repository name, branch name and entry path are validated first; a
// validation failure is returned as-is. The branch ID is then resolved and the
// entry is inserted via insertEntry, both inside one c.db.Transact call whose
// options come from c.txOpts(ctx). Any error from that call is returned
// unchanged.
//
// When params.Dedup.ID is non-empty, a dedupRequest carrying the value
// returned by the transaction (the row's ctid) is sent on c.dedupCh after the
// transaction has succeeded. Note that this send is a plain channel send and
// is not guarded by ctx.
func (c *cataloger) CreateEntry(ctx context.Context, repository, branch string, entry Entry, params CreateEntryParams) error {
	checks := ValidateFields{
		{Name: "repository", IsValid: ValidateRepositoryName(repository)},
		{Name: "branch", IsValid: ValidateBranchName(branch)},
		{Name: "path", IsValid: ValidatePath(entry.Path)},
	}
	if err := Validate(checks); err != nil {
		return err
	}

	// Resolve the branch and write the entry within the same transaction.
	writeEntry := func(tx db.Tx) (interface{}, error) {
		id, err := c.getBranchIDCache(tx, repository, branch)
		if err != nil {
			return nil, err
		}
		return insertEntry(tx, id, &entry)
	}
	res, err := c.db.Transact(writeEntry, c.txOpts(ctx)...)
	if err != nil {
		return err
	}

	// Nothing more to do unless the caller asked for deduplication.
	if params.Dedup.ID == "" {
		return nil
	}

	// Hand the freshly written row over to the dedup worker.
	c.dedupCh <- &dedupRequest{
		Repository:       repository,
		StorageNamespace: params.Dedup.StorageNamespace,
		DedupID:          params.Dedup.ID,
		Entry:            &entry,
		EntryCTID:        res.(string),
	}
	return nil
}
// insertEntry inserts entry into catalog_entries for branchID using tx and
// returns the ctid reported by the statement's RETURNING clause.
//
// If a row with the same (branch_id, path, min_commit) already exists, the
// statement updates that row instead: it overwrites the physical address,
// checksum, size, metadata, creation date and expiry flag, and sets
// max_commit to catalog_max_commit_id().
//
// A zero entry.CreationDate is passed as an invalid sql.NullTime, in which
// case the statement's COALESCE falls back to NOW().
//
// On failure it returns an empty string and the underlying error wrapped with
// the "insert entry" prefix.
func insertEntry(tx db.Tx, branchID int64, entry *Entry) (string, error) {
	const upsertSQL = `INSERT INTO catalog_entries (branch_id,path,physical_address,checksum,size,metadata,creation_date,is_expired)
VALUES ($1,$2,$3,$4,$5,$6, COALESCE($7, NOW()), $8)
ON CONFLICT (branch_id,path,min_commit)
DO UPDATE SET physical_address=$3, checksum=$4, size=$5, metadata=$6, creation_date=EXCLUDED.creation_date, is_expired=EXCLUDED.is_expired, max_commit=catalog_max_commit_id()
RETURNING ctid`

	// Leave the timestamp invalid when the caller did not supply one, so the
	// query substitutes the current database time.
	var createdAt sql.NullTime
	if !entry.CreationDate.IsZero() {
		createdAt = sql.NullTime{Time: entry.CreationDate, Valid: true}
	}

	var rowCTID string
	if err := tx.Get(&rowCTID, upsertSQL,
		branchID, entry.Path, entry.PhysicalAddress, entry.Checksum, entry.Size, entry.Metadata, createdAt, entry.Expired); err != nil {
		return "", fmt.Errorf("insert entry: %w", err)
	}
	return rowCTID, nil
}