Skip to content

Commit

Permalink
Meta.Open support override default expire time of open cache
Browse files Browse the repository at this point in the history
Signed-off-by: xixi <[email protected]>
  • Loading branch information
Hexilee committed Jan 16, 2023
1 parent 395bcc0 commit 02384d2
Show file tree
Hide file tree
Showing 10 changed files with 64 additions and 45 deletions.
35 changes: 20 additions & 15 deletions cmd/warmup.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,10 @@ $ juicefs warmup -f /tmp/filelist`,
Aliases: []string{"b"},
Usage: "run in background",
},
&cli.BoolFlag{
Name: "metadata",
Aliases: []string{"m"},
Usage: "also warm up metadata",
&cli.StringFlag{
Name: "opencache",
Aliases: []string{"o"},
Usage: "the duration to expire metadata openCache, can be human readable",
},
},
}
Expand Down Expand Up @@ -119,24 +119,21 @@ END:
}

// send fill-cache command to controller file
func sendCommand(cf *os.File, batch []string, threads uint, background, metadata bool, dspin *utils.DoubleSpinner) {
func sendCommand(cf *os.File, batch []string, threads uint, background bool, openCache time.Duration, dspin *utils.DoubleSpinner) {
paths := strings.Join(batch, "\n")
var back, withMeta uint8
var back uint8
if background {
back = 1
}
if metadata {
withMeta = 1
}

wb := utils.NewBuffer(8 + 4 + 4 + uint32(len(paths)))
wb := utils.NewBuffer(8 + 4 + 3 + 8 + uint32(len(paths)))
wb.Put32(meta.FillCache)
wb.Put32(4 + 4 + uint32(len(paths)))
wb.Put32(4 + 3 + 8 + uint32(len(paths)))
wb.Put32(uint32(len(paths)))
wb.Put([]byte(paths))
wb.Put16(uint16(threads))
wb.Put8(back)
wb.Put8(withMeta)
wb.Put64(uint64(openCache))
if _, err := cf.Write(wb.Bytes()); err != nil {
logger.Fatalf("Write message: %s", err)
}
Expand Down Expand Up @@ -211,7 +208,15 @@ func warmup(ctx *cli.Context) error {
threads = 1
}
background := ctx.Bool("background")
metadata := ctx.Bool("metadata")
openCache := time.Duration(0)
if rawOpenCache := ctx.String("opencache"); rawOpenCache != "" {
var err error
openCache, err = time.ParseDuration(rawOpenCache)
if err != nil || openCache <= 0 {
logger.Fatalf("invalid opencache flag: %s", rawOpenCache)
}
}

start := len(mp)
batch := make([]string, 0, batchMax)
progress := utils.NewProgress(background, true)
Expand All @@ -231,12 +236,12 @@ func warmup(ctx *cli.Context) error {
continue
}
if len(batch) >= batchMax {
sendCommand(controller, batch, threads, background, metadata, dspin)
sendCommand(controller, batch, threads, background, openCache, dspin)
batch = batch[0:]
}
}
if len(batch) > 0 {
sendCommand(controller, batch, threads, background, metadata, dspin)
sendCommand(controller, batch, threads, background, openCache, dspin)
}
progress.Done()
if !background {
Expand Down
2 changes: 1 addition & 1 deletion pkg/fs/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ func (fs *FileSystem) Open(ctx meta.Context, path string, flags uint32) (f *File
} else if flags&vfs.MODE_MASK_W != 0 {
oflags = syscall.O_RDWR
}
err = fs.m.Open(ctx, fi.inode, oflags, fi.attr)
err = fs.m.Open(ctx, fi.inode, oflags, fi.attr, -1)
if err != 0 {
return
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/meta/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -763,7 +763,7 @@ func (m *baseMeta) Create(ctx Context, parent Ino, name string, mode uint16, cum
eno = 0
}
if eno == 0 && inode != nil {
m.of.Open(*inode, attr)
m.of.Open(*inode, attr, -1)
}
return eno
}
Expand Down Expand Up @@ -870,7 +870,7 @@ func (m *baseMeta) Rename(ctx Context, parentSrc Ino, nameSrc string, parentDst
return m.en.doRename(ctx, m.checkRoot(parentSrc), nameSrc, m.checkRoot(parentDst), nameDst, flags, inode, attr)
}

func (m *baseMeta) Open(ctx Context, inode Ino, flags uint32, attr *Attr) syscall.Errno {
func (m *baseMeta) Open(ctx Context, inode Ino, flags uint32, attr *Attr, expire time.Duration) syscall.Errno {
if m.conf.ReadOnly && flags&(syscall.O_WRONLY|syscall.O_RDWR|syscall.O_TRUNC|syscall.O_APPEND) != 0 {
return syscall.EROFS
}
Expand All @@ -896,7 +896,7 @@ func (m *baseMeta) Open(ctx Context, inode Ino, flags uint32, attr *Attr) syscal
}
}
if err == 0 {
m.of.Open(inode, attr)
m.of.Open(inode, attr, expire)
}
return err
}
Expand Down
21 changes: 11 additions & 10 deletions pkg/meta/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@
* limitations under the License.
*/

// disable_mutate_test
//
//nolint:errcheck
//disable_mutate_test
package meta

import (
Expand Down Expand Up @@ -377,10 +378,10 @@ func testMetaClient(t *testing.T, m Meta) {
// data
var sliceId uint64
// try to open a file that does not exist
if st := m.Open(ctx, 99999, syscall.O_RDWR, &Attr{}); st != syscall.ENOENT {
if st := m.Open(ctx, 99999, syscall.O_RDWR, &Attr{}, -1); st != syscall.ENOENT {
t.Fatalf("open not exist inode got %d, expected %d", st, syscall.ENOENT)
}
if st := m.Open(ctx, inode, syscall.O_RDWR, attr); st != 0 {
if st := m.Open(ctx, inode, syscall.O_RDWR, attr, -1); st != 0 {
t.Fatalf("open f: %s", st)
}
_ = m.Close(ctx, inode)
Expand Down Expand Up @@ -1187,7 +1188,7 @@ func testCloseSession(t *testing.T, m Meta) {
if st := m.Setlk(ctx, inode, 1, false, syscall.F_WRLCK, 0x10000, 0x20000, 1); st != 0 {
t.Fatalf("plock wlock: %s", st)
}
if st := m.Open(ctx, inode, syscall.O_RDWR, attr); st != 0 {
if st := m.Open(ctx, inode, syscall.O_RDWR, attr, -1); st != 0 {
t.Fatalf("open f: %s", st)
}
if st := m.Unlink(ctx, 1, "f"); st != 0 {
Expand Down Expand Up @@ -1405,7 +1406,7 @@ func testOpenCache(t *testing.T, m Meta) {
t.Fatalf("create f: %s", st)
}
defer m.Unlink(ctx, 1, "f")
if st := m.Open(ctx, inode, syscall.O_RDWR, attr); st != 0 {
if st := m.Open(ctx, inode, syscall.O_RDWR, attr, -1); st != 0 {
t.Fatalf("open f: %s", st)
}
defer m.Close(ctx, inode)
Expand Down Expand Up @@ -1447,7 +1448,7 @@ func testReadOnly(t *testing.T, m Meta) {
if st := m.Create(ctx, 1, "f", 0644, 022, 0, &inode, attr); st != syscall.EROFS {
t.Fatalf("create f: %s", st)
}
if st := m.Open(ctx, inode, syscall.O_RDWR, attr); st != syscall.EROFS {
if st := m.Open(ctx, inode, syscall.O_RDWR, attr, -1); st != syscall.EROFS {
t.Fatalf("open f: %s", st)
}
}
Expand Down Expand Up @@ -1534,20 +1535,20 @@ func testAttrFlags(t *testing.T, m Meta) {
if st := m.SetAttr(ctx, inode, SetAttrFlag, 0, attr); st != 0 {
t.Fatalf("setattr f: %s", st)
}
if st := m.Open(ctx, inode, syscall.O_WRONLY, attr); st != syscall.EPERM {
if st := m.Open(ctx, inode, syscall.O_WRONLY, attr, -1); st != syscall.EPERM {
t.Fatalf("open f: %s", st)
}
if st := m.Open(ctx, inode, syscall.O_WRONLY|syscall.O_APPEND, attr); st != 0 {
if st := m.Open(ctx, inode, syscall.O_WRONLY|syscall.O_APPEND, attr, -1); st != 0 {
t.Fatalf("open f: %s", st)
}
attr.Flags = FlagAppend | FlagImmutable
if st := m.SetAttr(ctx, inode, SetAttrFlag, 0, attr); st != 0 {
t.Fatalf("setattr f: %s", st)
}
if st := m.Open(ctx, inode, syscall.O_WRONLY, attr); st != syscall.EPERM {
if st := m.Open(ctx, inode, syscall.O_WRONLY, attr, -1); st != syscall.EPERM {
t.Fatalf("open f: %s", st)
}
if st := m.Open(ctx, inode, syscall.O_WRONLY|syscall.O_APPEND, attr); st != syscall.EPERM {
if st := m.Open(ctx, inode, syscall.O_WRONLY|syscall.O_APPEND, attr, -1); st != syscall.EPERM {
t.Fatalf("open f: %s", st)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/meta/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,8 +319,8 @@ type Meta interface {
Readdir(ctx Context, inode Ino, wantattr uint8, entries *[]*Entry) syscall.Errno
// Create creates a file in a directory with given name.
Create(ctx Context, parent Ino, name string, mode uint16, cumask uint16, flags uint32, inode *Ino, attr *Attr) syscall.Errno
// Open checks permission on a node and track it as open.
Open(ctx Context, inode Ino, flags uint32, attr *Attr) syscall.Errno
// Open checks permission on a node and track it as open, you can override the default expire time of open cache.
Open(ctx Context, inode Ino, flags uint32, attr *Attr, expire time.Duration) syscall.Errno
// Close a file.
Close(ctx Context, inode Ino) syscall.Errno
// Read returns the list of slices on the given chunk.
Expand Down
22 changes: 17 additions & 5 deletions pkg/meta/openfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type openFile struct {
attr Attr
refs int
lastCheck time.Time
expire time.Duration
chunks map[uint32][]Slice
}

Expand All @@ -36,10 +37,13 @@ func newOpenFiles(expire time.Duration) *openfiles {
func (o *openfiles) cleanup() {
for {
o.Lock()
cutoff := time.Now().Add(-time.Hour).Add(time.Second * time.Duration(len(o.files)/1e4))
var cnt, expired int
for ino, of := range o.files {
if of.refs <= 0 && of.lastCheck.Before(cutoff) {
expire := time.Hour - time.Second*time.Duration(len(o.files)/1e4)
if of.expire > 0 {
expire = of.expire
}
if of.refs <= 0 && time.Since(of.lastCheck) > expire {
delete(o.files, ino)
expired++
}
Expand All @@ -53,11 +57,18 @@ func (o *openfiles) cleanup() {
}
}

func (o *openfiles) getExpire(ofExpire time.Duration) time.Duration {
if ofExpire > 0 {
return ofExpire
}
return o.expire
}

func (o *openfiles) OpenCheck(ino Ino, attr *Attr) bool {
o.Lock()
defer o.Unlock()
of, ok := o.files[ino]
if ok && time.Since(of.lastCheck) < o.expire {
if ok && time.Since(of.lastCheck) < o.getExpire(of.expire) {
if attr != nil {
*attr = of.attr
}
Expand All @@ -67,7 +78,7 @@ func (o *openfiles) OpenCheck(ino Ino, attr *Attr) bool {
return false
}

func (o *openfiles) Open(ino Ino, attr *Attr) {
func (o *openfiles) Open(ino Ino, attr *Attr, expire time.Duration) {
o.Lock()
defer o.Unlock()
of, ok := o.files[ino]
Expand All @@ -87,6 +98,7 @@ func (o *openfiles) Open(ino Ino, attr *Attr) {
of.attr.KeepCache = true
of.refs++
of.lastCheck = time.Now()
of.expire = expire
}

func (o *openfiles) Close(ino Ino) bool {
Expand All @@ -107,7 +119,7 @@ func (o *openfiles) Check(ino Ino, attr *Attr) bool {
o.Lock()
defer o.Unlock()
of, ok := o.files[ino]
if ok && time.Since(of.lastCheck) < o.expire {
if ok && time.Since(of.lastCheck) < o.getExpire(of.expire) {
*attr = of.attr
return true
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/vfs/fill.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type _file struct {
size uint64
}

func (v *VFS) fillCache(ctx meta.Context, paths []string, metadata bool, concurrent int, count, bytes *uint64) {
func (v *VFS) fillCache(ctx meta.Context, paths []string, concurrent int, openCache time.Duration, count, bytes *uint64) {
logger.Infof("start to warmup %d paths with %d workers", len(paths), concurrent)
start := time.Now()
todo := make(chan _file, 10240)
Expand All @@ -50,10 +50,11 @@ func (v *VFS) fillCache(ctx meta.Context, paths []string, metadata bool, concurr
if err := v.fillInode(ctx, f.ino, f.size, bytes); err != nil {
logger.Errorf("Inode %d could be corrupted: %s", f.ino, err)
}
if metadata {
if err := v.Meta.Open(ctx, f.ino, syscall.O_RDONLY, &meta.Attr{}); err != 0 {
if openCache > 0 {
if err := v.Meta.Open(ctx, f.ino, syscall.O_RDONLY, &meta.Attr{}, openCache); err != 0 {
logger.Errorf("Inode %d could be opened: %s", f.ino, err)
}
v.Meta.Close(ctx, f.ino)
}
if count != nil {
atomic.AddUint64(count, 1)
Expand Down
4 changes: 2 additions & 2 deletions pkg/vfs/fill_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestFill(t *testing.T) {
_, _ = v.Symlink(ctx, "testfile", 1, "sym3")

// normal cases
v.fillCache(meta.Background, []string{"/test/file", "/test", "/sym", "/"}, false, 2, nil, nil)
v.fillCache(meta.Background, []string{"/test/file", "/test", "/sym", "/"}, 2, -1, nil, nil)

// remove chunk
var slices []meta.Slice
Expand All @@ -45,5 +45,5 @@ func TestFill(t *testing.T) {
_ = v.Store.Remove(s.Id, int(s.Size))
}
// bad cases
v.fillCache(meta.Background, []string{"/test/file", "/sym2", "/sym3", "/.stats", "/not_exists"}, false, 2, nil, nil)
v.fillCache(meta.Background, []string{"/test/file", "/sym2", "/sym3", "/.stats", "/not_exists"}, 2, -1, nil, nil)
}
6 changes: 3 additions & 3 deletions pkg/vfs/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,17 +433,17 @@ func (v *VFS) handleInternalMsg(ctx meta.Context, cmd uint32, r *utils.Buffer, o
paths := strings.Split(string(r.Get(int(r.Get32()))), "\n")
concurrent := r.Get16()
background := r.Get8()
metadata := r.Get8()
openCache := r.Get64()
if background == 0 {
var count, bytes uint64
done := make(chan struct{})
go func() {
v.fillCache(ctx, paths, metadata > 0, int(concurrent), &count, &bytes)
v.fillCache(ctx, paths, int(concurrent), time.Duration(openCache), &count, &bytes)
close(done)
}()
writeProgress(&count, &bytes, out, done)
} else {
go v.fillCache(meta.NewContext(ctx.Pid(), ctx.Uid(), ctx.Gids()), paths, metadata > 0, int(concurrent), nil, nil)
go v.fillCache(meta.NewContext(ctx.Pid(), ctx.Uid(), ctx.Gids()), paths, int(concurrent), time.Duration(openCache), nil, nil)
}
_, _ = out.Write([]byte{0})
default:
Expand Down
2 changes: 1 addition & 1 deletion pkg/vfs/vfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ func (v *VFS) Open(ctx Context, ino Ino, flags uint32) (entry *meta.Entry, fh ui
return
}

err = v.Meta.Open(ctx, ino, flags, attr)
err = v.Meta.Open(ctx, ino, flags, attr, -1)
if err == 0 {
v.UpdateLength(ino, attr)
fh = v.newFileHandle(ino, attr.Length, flags)
Expand Down

0 comments on commit 02384d2

Please sign in to comment.