Skip to content

Commit

Permalink
meta: add lock for baseMeta.fmt (#3956)
Browse files Browse the repository at this point in the history
Co-authored-by: Davies Liu <[email protected]>
  • Loading branch information
SandyXSD and davies authored Aug 2, 2023
1 parent 6478411 commit 6239442
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 56 deletions.
4 changes: 4 additions & 0 deletions cmd/quota.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ func quota(c *cli.Context) error {
removePassword(c.Args().Get(0))

m := meta.NewClient(c.Args().Get(0), nil)
_, err := m.Load(true)
if err != nil {
logger.Fatalf("Load setting: %s", err)
}
qs := make(map[string]*meta.Quota)
var strict, repair bool
if cmd == meta.QuotaSet {
Expand Down
98 changes: 51 additions & 47 deletions pkg/meta/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ func (m *baseMeta) GetDirStat(ctx Context, inode Ino) (stat *dirStat, st syscall
}

func (m *baseMeta) updateDirStat(ctx Context, ino Ino, length, space, inodes int64) {
if !m.GetFormat().DirStats {
if !m.getFormat().DirStats {
return
}
m.dirStatsLock.Lock()
Expand All @@ -351,7 +351,7 @@ func (m *baseMeta) updateParentStat(ctx Context, inode, parent Ino, length, spac
return
}
m.en.updateStats(space, 0)
if !m.GetFormat().DirStats {
if !m.getFormat().DirStats {
return
}
if parent > 0 {
Expand Down Expand Up @@ -379,7 +379,7 @@ func (m *baseMeta) flushDirStat() {
}

func (m *baseMeta) doFlushDirStat() {
if !m.GetFormat().DirStats {
if !m.getFormat().DirStats {
return
}
m.dirStatsLock.Lock()
Expand Down Expand Up @@ -428,17 +428,19 @@ func (m *baseMeta) Load(checkVersion bool) (*Format, error) {
if err != nil {
return nil, err
}
var format Format
if err = json.Unmarshal(body, &format); err != nil {
var format = new(Format)
if err = json.Unmarshal(body, format); err != nil {
return nil, fmt.Errorf("json: %s", err)
}
if checkVersion {
if err = format.CheckVersion(); err != nil {
return nil, fmt.Errorf("check version: %s", err)
}
}
m.fmt = &format
return m.fmt, nil
m.Lock()
m.fmt = format
m.Unlock()
return format, nil
}

func (m *baseMeta) newSessionInfo() []byte {
Expand Down Expand Up @@ -546,19 +548,19 @@ func (m *baseMeta) refresh() {
}
m.sesMu.Unlock()

old := m.fmt
if _, err := m.Load(false); err != nil {
old := m.getFormat()
if format, err := m.Load(false); err != nil {
logger.Warnf("reload setting: %s", err)
} else if m.fmt.MetaVersion > MaxVersion {
logger.Fatalf("incompatible metadata version %d > max version %d", m.fmt.MetaVersion, MaxVersion)
} else if m.fmt.UUID != old.UUID {
logger.Fatalf("UUID changed from %s to %s", old, m.fmt.UUID)
} else if !reflect.DeepEqual(m.fmt, old) {
} else if format.MetaVersion > MaxVersion {
logger.Fatalf("incompatible metadata version %d > max version %d", format.MetaVersion, MaxVersion)
} else if format.UUID != old.UUID {
logger.Fatalf("UUID changed from %s to %s", old, format.UUID)
} else if !reflect.DeepEqual(format, old) {
m.msgCallbacks.Lock()
cbs := m.reloadCb
m.msgCallbacks.Unlock()
for _, cb := range cbs {
cb(m.fmt)
cb(format)
}
}

Expand Down Expand Up @@ -617,13 +619,14 @@ func (m *baseMeta) checkQuota(ctx Context, space, inodes int64, parents ...Ino)
if space <= 0 && inodes <= 0 {
return 0
}
if space > 0 && m.fmt.Capacity > 0 && atomic.LoadInt64(&m.usedSpace)+atomic.LoadInt64(&m.newSpace)+space > int64(m.fmt.Capacity) {
format := m.getFormat()
if space > 0 && format.Capacity > 0 && atomic.LoadInt64(&m.usedSpace)+atomic.LoadInt64(&m.newSpace)+space > int64(format.Capacity) {
return syscall.ENOSPC
}
if inodes > 0 && m.fmt.Inodes > 0 && atomic.LoadInt64(&m.usedInodes)+atomic.LoadInt64(&m.newInodes)+inodes > int64(m.fmt.Inodes) {
if inodes > 0 && format.Inodes > 0 && atomic.LoadInt64(&m.usedInodes)+atomic.LoadInt64(&m.newInodes)+inodes > int64(format.Inodes) {
return syscall.ENOSPC
}
if !m.GetFormat().DirStats {
if !format.DirStats {
return 0
}
for _, ino := range parents {
Expand All @@ -635,7 +638,7 @@ func (m *baseMeta) checkQuota(ctx Context, space, inodes int64, parents ...Ino)
}

func (m *baseMeta) loadQuotas() {
if !m.GetFormat().DirStats {
if !m.getFormat().DirStats {
return
}
quotas, err := m.en.doLoadQuotas(Background)
Expand Down Expand Up @@ -683,7 +686,7 @@ func (m *baseMeta) getDirParent(ctx Context, inode Ino) (Ino, syscall.Errno) {

// get inode of the first parent (or myself) with quota
func (m *baseMeta) getQuotaParent(ctx Context, inode Ino) Ino {
if !m.GetFormat().DirStats {
if !m.getFormat().DirStats {
return 0
}
var q *Quota
Expand All @@ -707,7 +710,7 @@ func (m *baseMeta) getQuotaParent(ctx Context, inode Ino) Ino {
}

func (m *baseMeta) checkDirQuota(ctx Context, inode Ino, space, inodes int64) bool {
if !m.GetFormat().DirStats {
if !m.getFormat().DirStats {
return false
}
var q *Quota
Expand All @@ -731,7 +734,7 @@ func (m *baseMeta) checkDirQuota(ctx Context, inode Ino, space, inodes int64) bo
}

func (m *baseMeta) updateDirQuota(ctx Context, inode Ino, space, inodes int64) {
if !m.GetFormat().DirStats {
if !m.getFormat().DirStats {
return
}
var q *Quota
Expand All @@ -758,7 +761,7 @@ func (m *baseMeta) flushQuotas() {
var newSpace, newInodes int64
for {
time.Sleep(time.Second * 3)
if !m.GetFormat().DirStats {
if !m.getFormat().DirStats {
continue
}
m.quotaMu.RLock()
Expand Down Expand Up @@ -813,10 +816,7 @@ func (m *baseMeta) HandleQuota(ctx Context, cmd uint8, dpath string, quotas map[

switch cmd {
case QuotaSet:
format, err := m.Load(true)
if err != nil {
return errors.Wrap(err, "load format")
}
format := m.getFormat()
if !format.DirStats {
format.DirStats = true
if err := m.en.doInit(format, false); err != nil {
Expand Down Expand Up @@ -1028,8 +1028,9 @@ func (m *baseMeta) statRootFs(ctx Context, totalspace, availspace, iused, iavail
if used < 0 {
used = 0
}
if m.fmt.Capacity > 0 {
*totalspace = m.fmt.Capacity
format := m.getFormat()
if format.Capacity > 0 {
*totalspace = format.Capacity
if *totalspace < uint64(used) {
*totalspace = uint64(used)
}
Expand All @@ -1044,11 +1045,11 @@ func (m *baseMeta) statRootFs(ctx Context, totalspace, availspace, iused, iavail
inodes = 0
}
*iused = uint64(inodes)
if m.fmt.Inodes > 0 {
if *iused > m.fmt.Inodes {
if format.Inodes > 0 {
if *iused > format.Inodes {
*iavail = 0
} else {
*iavail = m.fmt.Inodes - *iused
*iavail = format.Inodes - *iused
}
} else {
*iavail = 10 << 20
Expand Down Expand Up @@ -2146,15 +2147,14 @@ func (m *baseMeta) resolve(ctx Context, dpath string, inode *Ino) syscall.Errno
return 0
}

func (m *baseMeta) getFormat() *Format {
m.Lock()
defer m.Unlock()
return m.fmt
}

func (m *baseMeta) GetFormat() Format {
if m.fmt == nil {
var err error
m.fmt, err = m.Load(false)
if err != nil {
logger.Fatalf("Load format: %s", err)
}
}
return *m.fmt
return *m.getFormat()
}

func (m *baseMeta) CompactAll(ctx Context, threads int, bar *utils.Bar) syscall.Errno {
Expand Down Expand Up @@ -2230,7 +2230,10 @@ func (m *baseMeta) deleteSlice(id uint64, size uint32) {
}

func (m *baseMeta) toTrash(parent Ino) bool {
return m.fmt.TrashDays > 0 && !isTrash(parent)
if isTrash(parent) {
return false
}
return m.getFormat().TrashDays > 0
}

func (m *baseMeta) checkTrash(parent Ino, trash *Ino) syscall.Errno {
Expand Down Expand Up @@ -2286,8 +2289,9 @@ func (m *baseMeta) cleanupTrash() {
if ok, err := m.en.setIfSmall("lastCleanupTrash", time.Now().Unix(), int64(time.Hour.Seconds())*9/10); err != nil {
logger.Warnf("checking counter lastCleanupTrash: %s", err)
} else if ok {
go m.doCleanupTrash(false)
go m.cleanupDelayedSlices()
days := m.getFormat().TrashDays
go m.doCleanupTrash(days, false)
go m.cleanupDelayedSlices(days)
}
}
}
Expand Down Expand Up @@ -2406,17 +2410,17 @@ func (m *baseMeta) scanTrashFiles(ctx Context, scan trashFileScan) error {
return nil
}

func (m *baseMeta) doCleanupTrash(force bool) {
edge := time.Now().Add(-time.Duration(24*m.fmt.TrashDays+1) * time.Hour)
func (m *baseMeta) doCleanupTrash(days int, force bool) {
edge := time.Now().Add(-time.Duration(24*days+1) * time.Hour)
if force {
edge = time.Now()
}
m.CleanupTrashBefore(Background, edge, nil)
}

func (m *baseMeta) cleanupDelayedSlices() {
func (m *baseMeta) cleanupDelayedSlices(days int) {
now := time.Now()
edge := now.Unix() - int64(m.fmt.TrashDays)*24*3600
edge := now.Unix() - int64(days)*24*3600
logger.Debugf("Cleanup delayed slices: started with edge %d", edge)
if count, err := m.en.doCleanupDelayedSlices(edge); err != nil {
logger.Warnf("Cleanup delayed slices: deleted %d slices in %v, but got error: %s", count, time.Since(now), err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/meta/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1581,7 +1581,7 @@ func testTrash(t *testing.T, m Meta) {
if st := m.Rename(ctx2, TrashInode+1, "d", 1, "f", 0, &inode, attr); st != syscall.EPERM {
t.Fatalf("rename d -> f: %s", st)
}
m.getBase().doCleanupTrash(true)
m.getBase().doCleanupTrash(format.TrashDays, true)
if st := m.GetAttr(ctx2, TrashInode+1, attr); st != syscall.ENOENT {
t.Fatalf("getattr: %s", st)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/meta/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -3175,7 +3175,7 @@ func (m *redisMeta) ListSlices(ctx Context, slices map[Ino][]Slice, delete bool,
}
return nil
})
if err != nil || m.fmt.TrashDays == 0 {
if err != nil || m.getFormat().TrashDays == 0 {
return errno(err)
}

Expand Down Expand Up @@ -3902,7 +3902,7 @@ func (m *redisMeta) DumpMeta(w io.Writer, root Ino, keepSecret bool) (err error)
}

dm := &DumpedMeta{
Setting: *m.fmt,
Setting: *m.getFormat(),
Counters: &DumpedCounters{
UsedSpace: cs[0],
UsedInodes: cs[1],
Expand Down
4 changes: 2 additions & 2 deletions pkg/meta/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -2904,7 +2904,7 @@ func (m *dbMeta) ListSlices(ctx Context, slices map[Ino][]Slice, delete bool, sh
if err != nil {
return errno(err)
}
if m.fmt.TrashDays == 0 {
if m.getFormat().TrashDays == 0 {
return 0
}

Expand Down Expand Up @@ -3573,7 +3573,7 @@ func (m *dbMeta) DumpMeta(w io.Writer, root Ino, keepSecret bool) (err error) {
}

dm := DumpedMeta{
Setting: *m.fmt,
Setting: *m.getFormat(),
Counters: counters,
Sustained: sessions,
DelFiles: dels,
Expand Down
4 changes: 2 additions & 2 deletions pkg/meta/tkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -2524,7 +2524,7 @@ func (m *kvMeta) ListSlices(ctx Context, slices map[Ino][]Slice, delete bool, sh
}
}
}
if m.fmt.TrashDays == 0 {
if m.getFormat().TrashDays == 0 {
return 0
}

Expand Down Expand Up @@ -3113,7 +3113,7 @@ func (m *kvMeta) DumpMeta(w io.Writer, root Ino, keepSecret bool) (err error) {
}

dm := DumpedMeta{
Setting: *m.fmt,
Setting: *m.getFormat(),
Counters: &DumpedCounters{
UsedSpace: cs[0],
UsedInodes: cs[1],
Expand Down
5 changes: 3 additions & 2 deletions pkg/meta/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,8 @@ func (m *baseMeta) GetSummary(ctx Context, inode Ino, summary *Summary, recursiv
func (m *baseMeta) getDirSummary(ctx Context, inode Ino, summary *Summary, recursive bool, strict bool, concurrent chan struct{}, updateProgress func(count uint64, bytes uint64)) syscall.Errno {
var entries []*Entry
var err syscall.Errno
if strict || !m.GetFormat().DirStats {
format := m.getFormat()
if strict || !format.DirStats {
err = m.en.doReaddir(ctx, inode, 1, &entries, -1)
} else {
var st *dirStat
Expand Down Expand Up @@ -410,7 +411,7 @@ func (m *baseMeta) getDirSummary(ctx Context, inode Ino, summary *Summary, recur
} else {
atomic.AddUint64(&summary.Files, 1)
}
if strict || !m.fmt.DirStats {
if strict || !format.DirStats {
atomic.AddUint64(&summary.Size, uint64(align4K(e.Attr.Length)))
if e.Attr.Typ == TypeFile {
atomic.AddUint64(&summary.Length, e.Attr.Length)
Expand Down

0 comments on commit 6239442

Please sign in to comment.