Skip to content

Commit

Permalink
meta/tkv: skip updating nlink when the transactions conflict a lot
Browse files Browse the repository at this point in the history
  • Loading branch information
SandyXSD committed Mar 3, 2023
1 parent a230794 commit 8cc7d05
Show file tree
Hide file tree
Showing 8 changed files with 34 additions and 29 deletions.
27 changes: 16 additions & 11 deletions pkg/meta/tkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type kvtxn interface {

type tkvClient interface {
name() string
txn(f func(*kvTxn) error) error
txn(f func(*kvTxn) error, retry int) error
scan(prefix []byte, handler func(key, value []byte)) error
reset(prefix []byte) error
close() error
Expand All @@ -62,6 +62,7 @@ type tkvClient interface {

type kvTxn struct {
kvtxn
retry int
}

func (tx *kvTxn) deleteKeys(prefix []byte) {
Expand Down Expand Up @@ -332,7 +333,7 @@ func (m *kvMeta) get(key []byte) ([]byte, error) {
err := m.client.txn(func(tx *kvTxn) error {
value = tx.get(key)
return nil
})
}, 0)
return value, err
}

Expand All @@ -344,7 +345,7 @@ func (m *kvMeta) scanKeys(prefix []byte) ([][]byte, error) {
return true
})
return nil
})
}, 0)
return keys, err
}

Expand All @@ -363,7 +364,7 @@ func (m *kvMeta) scanValues(prefix []byte, limit int, filter func(k, v []byte) b
return limit < 0 || c < limit
})
return nil
})
}, 0)
return values, err
}

Expand Down Expand Up @@ -685,7 +686,7 @@ func (m *kvMeta) txn(f func(tx *kvTxn) error, inodes ...Ino) error {
}
var lastErr error
for i := 0; i < 50; i++ {
err := m.client.txn(f)
err := m.client.txn(f, i)
if eno, ok := err.(syscall.Errno); ok && eno == 0 {
err = nil
}
Expand Down Expand Up @@ -1103,8 +1104,12 @@ func (m *kvMeta) doMknod(ctx Context, parent Ino, name string, _type uint8, mode
now := time.Now()
if parent != TrashInode {
if _type == TypeDirectory {
pattr.Nlink++
updateParent = true
if tx.retry < 5 {
pattr.Nlink++
updateParent = true
} else {
logger.Warnf("Skip updating nlink of directory %d to reduce conflic", parent)
}
}
if updateParent || now.Sub(time.Unix(pattr.Mtime, int64(pattr.Mtimensec))) >= minUpdateTime {
pattr.Mtime = now.Unix()
Expand Down Expand Up @@ -1660,7 +1665,7 @@ func (m *kvMeta) doReaddir(ctx Context, inode Ino, plus uint8, entries *[]*Entry
err := m.client.txn(func(tx *kvTxn) error {
rs = tx.gets(keys...)
return nil
})
}, 0)
if err != nil {
return err
}
Expand Down Expand Up @@ -2110,7 +2115,7 @@ func (m *kvMeta) doCleanupDelayedSlices(edge int64) (int, error) {
return c < batch
})
return nil
}); err != nil {
}, 0); err != nil {
logger.Warnf("Scan delayed slices: %s", err)
return count, err
}
Expand Down Expand Up @@ -2259,7 +2264,7 @@ func (m *kvMeta) compactChunk(inode Ino, indx uint32, force bool) {
if s.id > 0 && m.client.txn(func(tx *kvTxn) error {
refs = tx.incrBy(m.sliceKey(s.id, s.size), 0)
return nil
}) == nil && refs < 0 {
}, 0) == nil && refs < 0 {
m.deleteSlice(s.id, s.size)
}
}
Expand Down Expand Up @@ -2594,7 +2599,7 @@ func (m *kvMeta) dumpEntry(inode Ino, e *DumpedEntry) error {
e.Symlink = string(l)
}
return nil
})
}, 0)
}

func (m *kvMeta) dumpDir(inode Ino, tree *DumpedEntry, bw *bufio.Writer, depth int, showProgress func(totalIncr, currentIncr int64)) error {
Expand Down
4 changes: 2 additions & 2 deletions pkg/meta/tkv_badger.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (c *badgerClient) shouldRetry(err error) bool {
return err == badger.ErrConflict
}

func (c *badgerClient) txn(f func(*kvTxn) error) (err error) {
func (c *badgerClient) txn(f func(*kvTxn) error, retry int) (err error) {
t := c.client.NewTransaction(true)
defer t.Discard()
defer func() {
Expand All @@ -161,7 +161,7 @@ func (c *badgerClient) txn(f func(*kvTxn) error) (err error) {
}
}()
tx := &badgerTxn{t, c.client}
err = f(&kvTxn{tx})
err = f(&kvTxn{tx, retry})
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/meta/tkv_etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (c *etcdClient) shouldRetry(err error) bool {
return errors.Is(err, conflicted)
}

func (c *etcdClient) txn(f func(*kvTxn) error) (err error) {
func (c *etcdClient) txn(f func(*kvTxn) error, retry int) (err error) {
ctx := context.Background()
tx := &etcdTxn{
ctx,
Expand All @@ -231,7 +231,7 @@ func (c *etcdClient) txn(f func(*kvTxn) error) (err error) {
}
}
}()
err = f(&kvTxn{tx})
err = f(&kvTxn{tx, retry})
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/meta/tkv_fdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ func (c *fdbClient) name() string {
return "fdb"
}

func (c *fdbClient) txn(f func(*kvTxn) error) error {
func (c *fdbClient) txn(f func(*kvTxn) error, retry int) error {
_, err := c.client.Transact(func(t fdb.Transaction) (interface{}, error) {
e := f(&kvTxn{&fdbTxn{t}})
e := f(&kvTxn{&fdbTxn{t}, retry})
return nil, e
})
return err
Expand Down
6 changes: 3 additions & 3 deletions pkg/meta/tkv_mem.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,13 +199,13 @@ func (c *memKV) set(key string, value []byte) {
}
}

func (c *memKV) txn(f func(*kvTxn) error) error {
func (c *memKV) txn(f func(*kvTxn) error, retry int) error {
tx := &memTxn{
store: c,
observed: make(map[string]int),
buffer: make(map[string][]byte),
}
if err := f(&kvTxn{tx}); err != nil {
if err := f(&kvTxn{tx, retry}); err != nil {
return err
}

Expand Down Expand Up @@ -264,7 +264,7 @@ func (c *memKV) reset(prefix []byte) error {
return c.scan(prefix, func(key, value []byte) {
kt.delete(key)
})
})
}, 0)
}

func (c *memKV) close() error {
Expand Down
6 changes: 3 additions & 3 deletions pkg/meta/tkv_prefix.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,10 @@ type prefixClient struct {
prefix []byte
}

func (c *prefixClient) txn(f func(*kvTxn) error) error {
func (c *prefixClient) txn(f func(*kvTxn) error, retry int) error {
return c.tkvClient.txn(func(tx *kvTxn) error {
return f(&kvTxn{&prefixTxn{tx, c.prefix}})
})
return f(&kvTxn{&prefixTxn{tx, c.prefix}, retry})
}, retry)
}

func (c *prefixClient) scan(prefix []byte, handler func(key, value []byte)) error {
Expand Down
8 changes: 4 additions & 4 deletions pkg/meta/tkv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func testTKV(t *testing.T, c tkvClient) {
if err := c.txn(func(kt *kvTxn) error {
f(kt)
return nil
}); err != nil {
}, 0); err != nil {
t.Fatal(err)
}
}
Expand Down Expand Up @@ -197,21 +197,21 @@ func testTKV(t *testing.T, c tkvClient) {
c.txn(func(tx *kvTxn) error {
count = tx.incrBy([]byte("counter"), -1)
return nil
})
}, 0)
if count != -1 {
t.Fatalf("counter should be -1, but got %d", count)
}
c.txn(func(tx *kvTxn) error {
count = tx.incrBy([]byte("counter"), 0)
return nil
})
}, 0)
if count != -1 {
t.Fatalf("counter should be -1, but got %d", count)
}
c.txn(func(tx *kvTxn) error {
count = tx.incrBy([]byte("counter"), 2)
return nil
})
}, 0)
if count != 1 {
t.Fatalf("counter should be 1, but got %d", count)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/meta/tkv_tikv.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func (c *tikvClient) shouldRetry(err error) bool {
return strings.Contains(err.Error(), "write conflict") || strings.Contains(err.Error(), "TxnLockNotFound")
}

func (c *tikvClient) txn(f func(*kvTxn) error) (err error) {
func (c *tikvClient) txn(f func(*kvTxn) error, retry int) (err error) {
tx, err := c.client.Begin()
if err != nil {
return err
Expand All @@ -196,7 +196,7 @@ func (c *tikvClient) txn(f func(*kvTxn) error) (err error) {
}
}
}()
if err = f(&kvTxn{&tikvTxn{tx}}); err != nil {
if err = f(&kvTxn{&tikvTxn{tx}, retry}); err != nil {
return err
}
if !tx.IsReadOnly() {
Expand Down

0 comments on commit 8cc7d05

Please sign in to comment.