Skip to content

Commit

Permalink
support parallel check
Browse files Browse the repository at this point in the history
  • Loading branch information
zhijian-pro committed Sep 22, 2022
1 parent 5c3e468 commit 5d50d06
Showing 1 changed file with 93 additions and 34 deletions.
127 changes: 93 additions & 34 deletions pkg/meta/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -1131,29 +1131,19 @@ func (m *baseMeta) Check(ctx Context, fpath string, repair bool, recursive bool)
}
}

if st = m.walk(ctx, inode, fpath, &attr, func(ctx Context, inode1 Ino, path string, attr *Attr, st1 syscall.Errno) syscall.Errno {
if st1 != 0 {
logger.Errorf("Walk path %s inode %d: %s", path, inode1, st1)
return st1
}
if attr.Typ != TypeDirectory {
logger.Debugf("Path %s (inode %d) is a file, skip the nlink check", path, inode1)
return 0
}
if st1 := m.GetAttr(ctx, inode1, attr); st1 != 0 {
logger.Errorf("GetAttr inode %d: %s", inode1, st1)
return st1
if !recursive {
var attr Attr
if st = m.GetAttr(ctx, inode, &attr); st != 0 {
logger.Errorf("GetAttr inode %d: %s", inode, st)
}
nlink, st1 := m.countDirNlink(ctx, inode1)
if st1 != 0 {
logger.Errorf("Count nlink for inode %d: %s", inode1, st1)
return st1
var nlink uint32
nlink, st = m.countDirNlink(ctx, inode)
if st != 0 {
logger.Errorf("Count nlink for inode %d: %s", inode, st)
}
if attr.Full && attr.Nlink == nlink {
if !recursive && inode1 == inode {
return skipDir
}
return 0
logger.Infof("Path %s inode %d is valid", fpath, inode)
return
}
if !attr.Full {
now := time.Now().Unix()
Expand All @@ -1168,26 +1158,95 @@ func (m *baseMeta) Check(ctx Context, fpath string, repair bool, recursive bool)
}
attr.Nlink = nlink
if repair {
if st1 = m.en.doRepair(ctx, inode1, attr); st1 == 0 {
logger.Infof("Path %s (inode %d) is successfully repaired", path, inode1)
if !recursive && inode1 == inode {
return skipDir
}
if st = m.en.doRepair(ctx, inode, &attr); st == 0 {
logger.Infof("Path %s (inode %d) is successfully repaired", fpath, inode)
} else {
logger.Errorf("Repair path %s inode %d: %s", path, inode1, st1)
logger.Errorf("Repair path %s inode %d: %s", fpath, inode, st)
}
return st1
}
logger.Warnf("Path %s (inode %d) can be repaired, please re-check with 'repair' enabled", path, inode1)
if !recursive && inode1 == inode {
return skipDir
} else {
logger.Warnf("Path %s (inode %d) can be repaired, please re-run with '--path %s --repair' to fix it", fpath, inode, fpath)
}
return 0
}); st != 0 {
logger.Infof("Path %s inode %d is valid", fpath, inode)
return
}

type ent struct {
inode Ino
path string
}
pipeCh := make(chan ent)
stCh := make(chan syscall.Errno, 1)
go func() {
defer close(pipeCh)
stCh <- m.walk(ctx, inode, fpath, &attr, func(ctx Context, inode1 Ino, path string, attr *Attr, st1 syscall.Errno) syscall.Errno {
if st1 != 0 {
logger.Errorf("Walk path %s inode %d: %s", path, inode1, st1)
return st1
}
if attr.Typ != TypeDirectory {
logger.Debugf("Path %s (inode %d) is a file, skip the nlink check", path, inode1)
return 0
}
pipeCh <- ent{inode: inode1, path: path}
return 0
})
}()

var wg sync.WaitGroup
const threads = 20
wg.Add(threads)
for i := 0; i < threads; i++ {
go func() {
for e := range pipeCh {
var attr Attr
var st1 syscall.Errno
if st1 = m.GetAttr(ctx, e.inode, &attr); st1 != 0 {
logger.Errorf("GetAttr inode %d: %s", e.inode, st1)
continue
}
var nlink uint32
nlink, st1 = m.countDirNlink(ctx, e.inode)
if st1 != 0 {
logger.Errorf("Count nlink for inode %d: %s", e.inode, st1)
continue
}
if attr.Full && attr.Nlink == nlink {
continue
}
if !attr.Full {
now := time.Now().Unix()
attr.Mode = 0644
attr.Uid = ctx.Uid()
attr.Gid = ctx.Gid()
attr.Atime = now
attr.Mtime = now
attr.Ctime = now
attr.Length = 4 << 10
attr.Parent = parent
}
attr.Nlink = nlink
if repair {
if st1 = m.en.doRepair(ctx, e.inode, &attr); st1 == 0 {
logger.Infof("Path %s (inode %d) is successfully repaired", e.path, e.inode)
} else {
logger.Errorf("Repair path %s inode %d: %s", e.path, e.inode, st1)
}
continue
}
logger.Warnf("Path %s (inode %d) can be repaired, please re-run with '--path %s --repair' to fix it", e.path, e.inode, e.path)
}
wg.Done()
}()
}

finish := make(chan struct{})
go func() {
wg.Wait()
finish <- struct{}{}
}()
<-finish
logger.Infof("Path %s inode %d is valid", fpath, inode)
return
return <-stCh
}

func (m *baseMeta) fileDeleted(opened bool, inode Ino, length uint64) {
Expand Down

0 comments on commit 5d50d06

Please sign in to comment.