Skip to content

Commit

Permalink
dump: reduce memory when dump large directories from redis (#4481)
Browse files Browse the repository at this point in the history
Co-authored-by: Sandy Xu <[email protected]>
  • Loading branch information
davies and SandyXSD authored Mar 12, 2024
1 parent fe954c7 commit fa26e0a
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 17 deletions.
1 change: 1 addition & 0 deletions pkg/meta/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ type DumpedEntry struct {
Xattrs []*DumpedXattr `json:"xattrs,omitempty"`
Chunks []*DumpedChunk `json:"chunks,omitempty"`
Entries map[string]*DumpedEntry `json:"entries,omitempty"`
keys []string
}

var CHARS = []byte("0123456789ABCDEF")
Expand Down
45 changes: 28 additions & 17 deletions pkg/meta/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -3658,7 +3658,7 @@ func (m *redisMeta) dumpEntries(es ...*DumpedEntry) error {
var xr = make([]*redis.MapStringStringCmd, len(es))
var sr = make([]*redis.StringCmd, len(es))
var cr = make([]*redis.StringSliceCmd, len(es))
var dr = make([]*redis.MapStringStringCmd, len(es))
var dr = make([]*redis.ScanCmd, len(es))
for i, e := range es {
inode := e.Attr.Inode
ar[i] = p.Get(ctx, m.inodeKey(inode))
Expand All @@ -3667,7 +3667,7 @@ func (m *redisMeta) dumpEntries(es ...*DumpedEntry) error {
case "regular":
cr[i] = p.LRange(ctx, m.chunkKey(inode, 0), 0, -1)
case "directory":
dr[i] = p.HGetAll(ctx, m.entryKey(inode))
dr[i] = p.HScan(ctx, m.entryKey(inode), 0, "*", 1000)
case "symlink":
sr[i] = p.Get(ctx, m.symKey(inode))
}
Expand Down Expand Up @@ -3743,17 +3743,12 @@ func (m *redisMeta) dumpEntries(es ...*DumpedEntry) error {
}
}
case TypeDirectory:
dirs, err := dr[i].Result()
keys, cursor, err := dr[i].Result()
if err != nil {
return err
}
e.Entries = make(map[string]*DumpedEntry)
for name := range dirs {
t, inode := m.parseEntry([]byte(dirs[name]))
ce := entryPool.Get().(*DumpedEntry)
ce.Attr.Inode = inode
ce.Attr.Type = typeToString(t)
e.Entries[name] = ce
if cursor == 0 {
e.keys = keys
}
case TypeSymlink:
if e.Symlink, err = sr[i].Result(); err != nil {
Expand Down Expand Up @@ -3807,12 +3802,28 @@ func (m *redisMeta) dumpDir(inode Ino, tree *DumpedEntry, bw *bufio.Writer, dept
panic(err)
}
}
var err error
entries := make([]*DumpedEntry, 0, len(tree.Entries))
for name, e := range tree.Entries {

if tree.keys == nil {
err := m.hscan(Background, m.entryKey(inode), func(keys []string) error {
tree.keys = append(tree.keys, keys...)
return nil
})
if err != nil {
return err
}
}
entries := make([]*DumpedEntry, len(tree.keys)/2)
for i := range entries {
name := tree.keys[i*2]
t, inode := m.parseEntry([]byte(tree.keys[i*2+1]))
e := entryPool.Get().(*DumpedEntry)
e.Name = name
entries = append(entries, e)
e.Attr.Inode = inode
e.Attr.Type = typeToString(t)
entries[i] = e
}

var err error
if err = tree.writeJsonWithOutEntry(bw, depth); err != nil {
return err
}
Expand Down Expand Up @@ -3865,16 +3876,16 @@ func (m *redisMeta) dumpDir(inode Ino, tree *DumpedEntry, bw *bufio.Writer, dept
return err
}
if e.Attr.Type == "directory" {
err = m.dumpDir(inode, e, bw, depth+2, bar)
err = m.dumpDir(e.Attr.Inode, e, bw, depth+2, bar)
} else {
err = e.writeJSON(bw, depth+2)
}
entries[i] = nil
delete(tree.Entries, e.Name)
e.Name = ""
e.Xattrs = nil
e.Chunks = nil
e.Entries = nil
e.Symlink = ""
e.keys = nil
entryPool.Put(e)
if err != nil {
return err
Expand Down

0 comments on commit fa26e0a

Please sign in to comment.