Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

meta: add lock for baseMeta.fmt #3956

Merged
merged 2 commits into from
Aug 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cmd/quota.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ func quota(c *cli.Context) error {
removePassword(c.Args().Get(0))

m := meta.NewClient(c.Args().Get(0), nil)
_, err := m.Load(true)
if err != nil {
logger.Fatalf("Load setting: %s", err)
}
qs := make(map[string]*meta.Quota)
var strict, repair bool
if cmd == meta.QuotaSet {
Expand Down
98 changes: 51 additions & 47 deletions pkg/meta/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ func (m *baseMeta) GetDirStat(ctx Context, inode Ino) (stat *dirStat, st syscall
}

func (m *baseMeta) updateDirStat(ctx Context, ino Ino, length, space, inodes int64) {
if !m.GetFormat().DirStats {
if !m.getFormat().DirStats {
return
}
m.dirStatsLock.Lock()
Expand All @@ -351,7 +351,7 @@ func (m *baseMeta) updateParentStat(ctx Context, inode, parent Ino, length, spac
return
}
m.en.updateStats(space, 0)
if !m.GetFormat().DirStats {
if !m.getFormat().DirStats {
return
}
if parent > 0 {
Expand Down Expand Up @@ -379,7 +379,7 @@ func (m *baseMeta) flushDirStat() {
}

func (m *baseMeta) doFlushDirStat() {
if !m.GetFormat().DirStats {
if !m.getFormat().DirStats {
return
}
m.dirStatsLock.Lock()
Expand Down Expand Up @@ -428,17 +428,19 @@ func (m *baseMeta) Load(checkVersion bool) (*Format, error) {
if err != nil {
return nil, err
}
var format Format
if err = json.Unmarshal(body, &format); err != nil {
var format = new(Format)
if err = json.Unmarshal(body, format); err != nil {
return nil, fmt.Errorf("json: %s", err)
}
if checkVersion {
if err = format.CheckVersion(); err != nil {
return nil, fmt.Errorf("check version: %s", err)
}
}
m.fmt = &format
return m.fmt, nil
m.Lock()
m.fmt = format
m.Unlock()
return format, nil
}

func (m *baseMeta) newSessionInfo() []byte {
Expand Down Expand Up @@ -546,19 +548,19 @@ func (m *baseMeta) refresh() {
}
m.sesMu.Unlock()

old := m.fmt
if _, err := m.Load(false); err != nil {
old := m.getFormat()
if format, err := m.Load(false); err != nil {
logger.Warnf("reload setting: %s", err)
} else if m.fmt.MetaVersion > MaxVersion {
logger.Fatalf("incompatible metadata version %d > max version %d", m.fmt.MetaVersion, MaxVersion)
} else if m.fmt.UUID != old.UUID {
logger.Fatalf("UUID changed from %s to %s", old, m.fmt.UUID)
} else if !reflect.DeepEqual(m.fmt, old) {
} else if format.MetaVersion > MaxVersion {
logger.Fatalf("incompatible metadata version %d > max version %d", format.MetaVersion, MaxVersion)
} else if format.UUID != old.UUID {
logger.Fatalf("UUID changed from %s to %s", old, format.UUID)
} else if !reflect.DeepEqual(format, old) {
m.msgCallbacks.Lock()
cbs := m.reloadCb
m.msgCallbacks.Unlock()
for _, cb := range cbs {
cb(m.fmt)
cb(format)
}
}

Expand Down Expand Up @@ -617,13 +619,14 @@ func (m *baseMeta) checkQuota(ctx Context, space, inodes int64, parents ...Ino)
if space <= 0 && inodes <= 0 {
return 0
}
if space > 0 && m.fmt.Capacity > 0 && atomic.LoadInt64(&m.usedSpace)+atomic.LoadInt64(&m.newSpace)+space > int64(m.fmt.Capacity) {
format := m.getFormat()
if space > 0 && format.Capacity > 0 && atomic.LoadInt64(&m.usedSpace)+atomic.LoadInt64(&m.newSpace)+space > int64(format.Capacity) {
return syscall.ENOSPC
}
if inodes > 0 && m.fmt.Inodes > 0 && atomic.LoadInt64(&m.usedInodes)+atomic.LoadInt64(&m.newInodes)+inodes > int64(m.fmt.Inodes) {
if inodes > 0 && format.Inodes > 0 && atomic.LoadInt64(&m.usedInodes)+atomic.LoadInt64(&m.newInodes)+inodes > int64(format.Inodes) {
return syscall.ENOSPC
}
if !m.GetFormat().DirStats {
if !format.DirStats {
return 0
}
for _, ino := range parents {
Expand All @@ -635,7 +638,7 @@ func (m *baseMeta) checkQuota(ctx Context, space, inodes int64, parents ...Ino)
}

func (m *baseMeta) loadQuotas() {
if !m.GetFormat().DirStats {
if !m.getFormat().DirStats {
return
}
quotas, err := m.en.doLoadQuotas(Background)
Expand Down Expand Up @@ -683,7 +686,7 @@ func (m *baseMeta) getDirParent(ctx Context, inode Ino) (Ino, syscall.Errno) {

// get inode of the first parent (or myself) with quota
func (m *baseMeta) getQuotaParent(ctx Context, inode Ino) Ino {
if !m.GetFormat().DirStats {
if !m.getFormat().DirStats {
return 0
}
var q *Quota
Expand All @@ -707,7 +710,7 @@ func (m *baseMeta) getQuotaParent(ctx Context, inode Ino) Ino {
}

func (m *baseMeta) checkDirQuota(ctx Context, inode Ino, space, inodes int64) bool {
if !m.GetFormat().DirStats {
if !m.getFormat().DirStats {
return false
}
var q *Quota
Expand All @@ -731,7 +734,7 @@ func (m *baseMeta) checkDirQuota(ctx Context, inode Ino, space, inodes int64) bo
}

func (m *baseMeta) updateDirQuota(ctx Context, inode Ino, space, inodes int64) {
if !m.GetFormat().DirStats {
if !m.getFormat().DirStats {
return
}
var q *Quota
Expand All @@ -758,7 +761,7 @@ func (m *baseMeta) flushQuotas() {
var newSpace, newInodes int64
for {
time.Sleep(time.Second * 3)
if !m.GetFormat().DirStats {
if !m.getFormat().DirStats {
continue
}
m.quotaMu.RLock()
Expand Down Expand Up @@ -813,10 +816,7 @@ func (m *baseMeta) HandleQuota(ctx Context, cmd uint8, dpath string, quotas map[

switch cmd {
case QuotaSet:
format, err := m.Load(true)
if err != nil {
return errors.Wrap(err, "load format")
}
format := m.getFormat()
if !format.DirStats {
format.DirStats = true
if err := m.en.doInit(format, false); err != nil {
Expand Down Expand Up @@ -1028,8 +1028,9 @@ func (m *baseMeta) statRootFs(ctx Context, totalspace, availspace, iused, iavail
if used < 0 {
used = 0
}
if m.fmt.Capacity > 0 {
*totalspace = m.fmt.Capacity
format := m.getFormat()
if format.Capacity > 0 {
*totalspace = format.Capacity
if *totalspace < uint64(used) {
*totalspace = uint64(used)
}
Expand All @@ -1044,11 +1045,11 @@ func (m *baseMeta) statRootFs(ctx Context, totalspace, availspace, iused, iavail
inodes = 0
}
*iused = uint64(inodes)
if m.fmt.Inodes > 0 {
if *iused > m.fmt.Inodes {
if format.Inodes > 0 {
if *iused > format.Inodes {
*iavail = 0
} else {
*iavail = m.fmt.Inodes - *iused
*iavail = format.Inodes - *iused
}
} else {
*iavail = 10 << 20
Expand Down Expand Up @@ -2146,15 +2147,14 @@ func (m *baseMeta) resolve(ctx Context, dpath string, inode *Ino) syscall.Errno
return 0
}

func (m *baseMeta) getFormat() *Format {
m.Lock()
defer m.Unlock()
return m.fmt
}

func (m *baseMeta) GetFormat() Format {
if m.fmt == nil {
var err error
m.fmt, err = m.Load(false)
if err != nil {
logger.Fatalf("Load format: %s", err)
}
}
return *m.fmt
return *m.getFormat()
}

func (m *baseMeta) CompactAll(ctx Context, threads int, bar *utils.Bar) syscall.Errno {
Expand Down Expand Up @@ -2230,7 +2230,10 @@ func (m *baseMeta) deleteSlice(id uint64, size uint32) {
}

func (m *baseMeta) toTrash(parent Ino) bool {
return m.fmt.TrashDays > 0 && !isTrash(parent)
if isTrash(parent) {
return false
}
return m.getFormat().TrashDays > 0
}

func (m *baseMeta) checkTrash(parent Ino, trash *Ino) syscall.Errno {
Expand Down Expand Up @@ -2286,8 +2289,9 @@ func (m *baseMeta) cleanupTrash() {
if ok, err := m.en.setIfSmall("lastCleanupTrash", time.Now().Unix(), int64(time.Hour.Seconds())*9/10); err != nil {
logger.Warnf("checking counter lastCleanupTrash: %s", err)
} else if ok {
go m.doCleanupTrash(false)
go m.cleanupDelayedSlices()
days := m.getFormat().TrashDays
go m.doCleanupTrash(days, false)
go m.cleanupDelayedSlices(days)
}
}
}
Expand Down Expand Up @@ -2406,17 +2410,17 @@ func (m *baseMeta) scanTrashFiles(ctx Context, scan trashFileScan) error {
return nil
}

func (m *baseMeta) doCleanupTrash(force bool) {
edge := time.Now().Add(-time.Duration(24*m.fmt.TrashDays+1) * time.Hour)
func (m *baseMeta) doCleanupTrash(days int, force bool) {
edge := time.Now().Add(-time.Duration(24*days+1) * time.Hour)
if force {
edge = time.Now()
}
m.CleanupTrashBefore(Background, edge, nil)
}

func (m *baseMeta) cleanupDelayedSlices() {
func (m *baseMeta) cleanupDelayedSlices(days int) {
now := time.Now()
edge := now.Unix() - int64(m.fmt.TrashDays)*24*3600
edge := now.Unix() - int64(days)*24*3600
logger.Debugf("Cleanup delayed slices: started with edge %d", edge)
if count, err := m.en.doCleanupDelayedSlices(edge); err != nil {
logger.Warnf("Cleanup delayed slices: deleted %d slices in %v, but got error: %s", count, time.Since(now), err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/meta/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1581,7 +1581,7 @@ func testTrash(t *testing.T, m Meta) {
if st := m.Rename(ctx2, TrashInode+1, "d", 1, "f", 0, &inode, attr); st != syscall.EPERM {
t.Fatalf("rename d -> f: %s", st)
}
m.getBase().doCleanupTrash(true)
m.getBase().doCleanupTrash(format.TrashDays, true)
if st := m.GetAttr(ctx2, TrashInode+1, attr); st != syscall.ENOENT {
t.Fatalf("getattr: %s", st)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/meta/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -3175,7 +3175,7 @@ func (m *redisMeta) ListSlices(ctx Context, slices map[Ino][]Slice, delete bool,
}
return nil
})
if err != nil || m.fmt.TrashDays == 0 {
if err != nil || m.getFormat().TrashDays == 0 {
return errno(err)
}

Expand Down Expand Up @@ -3902,7 +3902,7 @@ func (m *redisMeta) DumpMeta(w io.Writer, root Ino, keepSecret bool) (err error)
}

dm := &DumpedMeta{
Setting: *m.fmt,
Setting: *m.getFormat(),
Counters: &DumpedCounters{
UsedSpace: cs[0],
UsedInodes: cs[1],
Expand Down
4 changes: 2 additions & 2 deletions pkg/meta/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -2904,7 +2904,7 @@ func (m *dbMeta) ListSlices(ctx Context, slices map[Ino][]Slice, delete bool, sh
if err != nil {
return errno(err)
}
if m.fmt.TrashDays == 0 {
if m.getFormat().TrashDays == 0 {
return 0
}

Expand Down Expand Up @@ -3573,7 +3573,7 @@ func (m *dbMeta) DumpMeta(w io.Writer, root Ino, keepSecret bool) (err error) {
}

dm := DumpedMeta{
Setting: *m.fmt,
Setting: *m.getFormat(),
Counters: counters,
Sustained: sessions,
DelFiles: dels,
Expand Down
4 changes: 2 additions & 2 deletions pkg/meta/tkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -2524,7 +2524,7 @@ func (m *kvMeta) ListSlices(ctx Context, slices map[Ino][]Slice, delete bool, sh
}
}
}
if m.fmt.TrashDays == 0 {
if m.getFormat().TrashDays == 0 {
return 0
}

Expand Down Expand Up @@ -3113,7 +3113,7 @@ func (m *kvMeta) DumpMeta(w io.Writer, root Ino, keepSecret bool) (err error) {
}

dm := DumpedMeta{
Setting: *m.fmt,
Setting: *m.getFormat(),
Counters: &DumpedCounters{
UsedSpace: cs[0],
UsedInodes: cs[1],
Expand Down
5 changes: 3 additions & 2 deletions pkg/meta/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,8 @@ func (m *baseMeta) GetSummary(ctx Context, inode Ino, summary *Summary, recursiv
func (m *baseMeta) getDirSummary(ctx Context, inode Ino, summary *Summary, recursive bool, strict bool, concurrent chan struct{}, updateProgress func(count uint64, bytes uint64)) syscall.Errno {
var entries []*Entry
var err syscall.Errno
if strict || !m.GetFormat().DirStats {
format := m.getFormat()
if strict || !format.DirStats {
err = m.en.doReaddir(ctx, inode, 1, &entries, -1)
} else {
var st *dirStat
Expand Down Expand Up @@ -410,7 +411,7 @@ func (m *baseMeta) getDirSummary(ctx Context, inode Ino, summary *Summary, recur
} else {
atomic.AddUint64(&summary.Files, 1)
}
if strict || !m.fmt.DirStats {
if strict || !format.DirStats {
atomic.AddUint64(&summary.Size, uint64(align4K(e.Attr.Length)))
if e.Attr.Typ == TypeFile {
atomic.AddUint64(&summary.Length, e.Attr.Length)
Expand Down