Skip to content

Commit

Permalink
Merge pull request #2336 from onflow/fxamacker/drop-file-cache-using-sys
Browse files Browse the repository at this point in the history
Advise Linux to drop files from page cache (it was holding 425 GB of which 198+ GB were 3 checkpoint files)
  • Loading branch information
fxamacker authored Apr 26, 2022
2 parents fdcb6b3 + d303663 commit b341963
Show file tree
Hide file tree
Showing 9 changed files with 173 additions and 53 deletions.
9 changes: 9 additions & 0 deletions cmd/execution/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,15 @@ func main() {
pendingBlocks = buffer.NewPendingBlocks() // for following main chain consensus
return nil
}).
Module("Linux page cache adviser", func(node *cmd.NodeConfig) error {
logger := node.Logger.With().Str("subcomponent", "checkpointer").Logger()
_, err := wal.EvictAllCheckpointsFromLinuxPageCache(triedir, &logger)
if err != nil {
logger.Warn().Msgf("failed to evict checkpoint files from Linux page cache: %s", err)
}
// Don't return error because we only advise Linux to evict files.
return nil
}).
Component("GCP block data uploader", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
if enableBlockDataUpload && gcpBucketName != "" {
logger := node.Logger.With().Str("component_name", "gcp_block_data_uploader").Logger()
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ require (
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/sys v0.0.0-20220209214540-3681064d5158 // indirect
golang.org/x/sys v0.0.0-20220209214540-3681064d5158
golang.org/x/text v0.3.7
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba
google.golang.org/api v0.63.0
Expand Down
108 changes: 66 additions & 42 deletions ledger/complete/wal/checkpointer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@ import (
"fmt"
"io"
"os"
"os/exec"
"path"
"runtime"
"path/filepath"
"sort"
"strconv"
"strings"
Expand Down Expand Up @@ -422,9 +421,13 @@ func LoadCheckpoint(filepath string, logger *zerolog.Logger) ([]*trie.MTrie, err
return nil, fmt.Errorf("cannot open checkpoint file %s: %w", filepath, err)
}
defer func() {
_ = file.Close()
evictErr := evictFileFromLinuxPageCache(file, false, logger)
if evictErr != nil {
logger.Warn().Msgf("failed to evict file %s from Linux page cache: %s", filepath, evictErr)
// No need to return this error because it's possible to continue normal operations.
}

_ = requestDropFromOSFileCache(filepath, logger)
_ = file.Close()
}()

return readCheckpoint(file)
Expand Down Expand Up @@ -773,50 +776,71 @@ func readCheckpointV5(f *os.File) ([]*trie.MTrie, error) {
return tries, nil
}

// requestDropFromOSFileCache requests the specified file be dropped from OS file cache.
// The use case is when a new checkpoint is loaded or created, OS file cache can hold the entire
// checkpoint file in memory until requestDropFromOSFileCache() causes it to be dropped from
// the file cache. Not calling requestDropFromOSFileCache() causes two checkpoint files
// to be cached by the OS file cache for each checkpointing, eventually caching hundreds of GB.
// CAUTION: Returns nil without doing anything if GOOS != linux.
func requestDropFromOSFileCache(fileName string, logger *zerolog.Logger) error {
if runtime.GOOS != "linux" {
return nil
}

// Try using /bin/dd (Debian, Ubuntu, etc.)
cmdFileName := "/bin/dd"

// If /bin/dd isn't found, then try /usr/bin/dd (OpenSUSE Leap, etc.)
_, err := os.Stat(cmdFileName)
if os.IsNotExist(err) {
cmdFileName = "/usr/bin/dd"
_, err := os.Stat(cmdFileName)
if os.IsNotExist(err) {
return fmt.Errorf("required program dd not found in /bin/ and /usr/bin/")
// EvictAllCheckpointsFromLinuxPageCache advises Linux to evict all checkpoint files
// in dir from Linux page cache. It returns list of files that Linux was
// successfully advised to evict and first error encountered (if any).
// Even after error advising eviction, it continues to advise eviction of remaining files.
func EvictAllCheckpointsFromLinuxPageCache(dir string, logger *zerolog.Logger) ([]string, error) {
var err error
matches, err := filepath.Glob(filepath.Join(dir, checkpointFilenamePrefix+"*"))
if err != nil {
return nil, fmt.Errorf("failed to enumerate checkpoints: %w", err)
}
evictedFileNames := make([]string, 0, len(matches))
for _, fn := range matches {
base := filepath.Base(fn)
if !strings.HasPrefix(base, checkpointFilenamePrefix) {
continue
}
justNumber := base[len(checkpointFilenamePrefix):]
_, err := strconv.Atoi(justNumber)
if err != nil {
continue
}
evictErr := evictFileFromLinuxPageCacheByName(fn, false, logger)
if evictErr != nil {
if err == nil {
err = evictErr // Save first evict error encountered
}
logger.Warn().Msgf("failed to evict file %s from Linux page cache: %s", fn, err)
continue
}
evictedFileNames = append(evictedFileNames, fn)
}
// return the first error encountered
return evictedFileNames, err
}

// Remove some special chars from fileName just in case.
// Regex would be shorter but not as easy to read.
s := strings.ReplaceAll(fileName, " ", "")
s = strings.ReplaceAll(s, ";", "")
s = strings.ReplaceAll(s, "$", "")
s = strings.ReplaceAll(s, "|", "")
s = strings.ReplaceAll(s, ">", "")
s = strings.ReplaceAll(s, "<", "")
s = strings.ReplaceAll(s, "*", "")

_, err = os.Stat(s)
if os.IsNotExist(err) {
return fmt.Errorf("sanitized filename %s does not exist", s)
// evictFileFromLinuxPageCacheByName advises Linux to evict the file from Linux page cache.
func evictFileFromLinuxPageCacheByName(fileName string, fsync bool, logger *zerolog.Logger) error {
f, err := os.Open(fileName)
if err != nil {
return err
}
defer f.Close()

cmd := exec.Command(cmdFileName, "if="+s, "iflag=nocache", "count=0")
return evictFileFromLinuxPageCache(f, fsync, logger)
}

if logger != nil {
logger.Info().Msgf("run %q to drop file from OS file cache", cmd.String())
// evictFileFromLinuxPageCache advises Linux to evict a file from Linux page cache.
// A use case is when a new checkpoint is loaded or created, Linux may cache big
// checkpoint files in memory until evictFileFromLinuxPageCache causes them to be
// evicted from the Linux page cache. Not calling eviceFileFromLinuxPageCache()
// causes two checkpoint files to be cached for each checkpointing, eventually
// caching hundreds of GB.
// CAUTION: no-op when GOOS != linux.
func evictFileFromLinuxPageCache(f *os.File, fsync bool, logger *zerolog.Logger) error {
err := fadviseNoLinuxPageCache(f.Fd(), fsync)
if err != nil {
return err
}

return cmd.Run()
fstat, err := f.Stat()
if err == nil {
fsize := fstat.Size()
logger.Info().Msgf("advised Linux to evict file %s (%d MiB) from page cache", f.Name(), fsize/1024/1024)
} else {
logger.Info().Msgf("advised Linux to evict file %s from page cache", f.Name())
}
return nil
}
16 changes: 14 additions & 2 deletions ledger/complete/wal/checkpointer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/onflow/flow-go/ledger/complete"
"github.com/onflow/flow-go/ledger/complete/mtrie"
"github.com/onflow/flow-go/ledger/complete/mtrie/trie"
"github.com/onflow/flow-go/ledger/complete/wal"
realWAL "github.com/onflow/flow-go/ledger/complete/wal"
"github.com/onflow/flow-go/module/metrics"
"github.com/onflow/flow-go/utils/unittest"
Expand Down Expand Up @@ -336,6 +337,14 @@ func Test_Checkpointing(t *testing.T) {
}
})

t.Run("advise to evict checkpoints from page cache", func(t *testing.T) {
logger := zerolog.Nop()
evictedFileNames, err := wal.EvictAllCheckpointsFromLinuxPageCache(dir, &logger)
require.NoError(t, err)
require.Equal(t, 1, len(evictedFileNames))
require.Equal(t, path.Join(dir, "checkpoint.00000010"), evictedFileNames[0])
})

t.Run("corrupted checkpoints are skipped", func(t *testing.T) {

f6, err := mtrie.NewForest(size*10, metricsCollector, nil)
Expand Down Expand Up @@ -418,6 +427,7 @@ func Test_Checkpointing(t *testing.T) {
}

})

})
}

Expand Down Expand Up @@ -534,7 +544,8 @@ func Test_StoringLoadingCheckpoints(t *testing.T) {
file.Close()

t.Run("works without data modification", func(t *testing.T) {
tries, err := realWAL.LoadCheckpoint(filepath, nil)
logger := zerolog.Nop()
tries, err := realWAL.LoadCheckpoint(filepath, &logger)
require.NoError(t, err)
require.Equal(t, 1, len(tries))
require.Equal(t, updatedTrie, tries[0])
Expand All @@ -551,7 +562,8 @@ func Test_StoringLoadingCheckpoints(t *testing.T) {
err = os.WriteFile(filepath, b, 0644)
require.NoError(t, err)

tries, err := realWAL.LoadCheckpoint(filepath, nil)
logger := zerolog.Nop()
tries, err := realWAL.LoadCheckpoint(filepath, &logger)
require.Error(t, err)
require.Nil(t, tries)
require.Contains(t, err.Error(), "checksum")
Expand Down
10 changes: 7 additions & 3 deletions ledger/complete/wal/checkpointer_versioning_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/hex"
"testing"

"github.com/rs/zerolog"
"github.com/stretchr/testify/require"

"github.com/onflow/flow-go/ledger"
Expand All @@ -18,7 +19,8 @@ func TestLoadCheckpointV1(t *testing.T) {
mustToHash("63df641430e5e0745c3d99ece6ac209467ccfdb77e362e7490a830db8e8803ae"),
}

tries, err := LoadCheckpoint("test_data/checkpoint.v1", nil)
logger := zerolog.Nop()
tries, err := LoadCheckpoint("test_data/checkpoint.v1", &logger)
require.NoError(t, err)
require.Equal(t, len(expectedRootHash), len(tries))

Expand All @@ -37,7 +39,8 @@ func TestLoadCheckpointV3(t *testing.T) {
mustToHash("63df641430e5e0745c3d99ece6ac209467ccfdb77e362e7490a830db8e8803ae"),
}

tries, err := LoadCheckpoint("test_data/checkpoint.v3", nil)
logger := zerolog.Nop()
tries, err := LoadCheckpoint("test_data/checkpoint.v3", &logger)
require.NoError(t, err)
require.Equal(t, len(expectedRootHash), len(tries))

Expand All @@ -56,7 +59,8 @@ func TestLoadCheckpointV4(t *testing.T) {
mustToHash("63df641430e5e0745c3d99ece6ac209467ccfdb77e362e7490a830db8e8803ae"),
}

tries, err := LoadCheckpoint("test_data/checkpoint.v4", nil)
logger := zerolog.Nop()
tries, err := LoadCheckpoint("test_data/checkpoint.v4", &logger)
require.NoError(t, err)
require.Equal(t, len(expectedRootHash), len(tries))

Expand Down
25 changes: 25 additions & 0 deletions ledger/complete/wal/fadvise.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
//go:build !linux
// +build !linux

package wal

// fadviseNoLinuxPageCache does nothing if GOOS != "linux".
// Otherwise, fadviseNoLinuxPageCache advises Linux to evict
// a file from Linux page cache. This calls unix.Fadvise which
// in turn calls posix_fadvise with POSIX_FADV_DONTNEED.
// CAUTION: If fsync=true, this will call unix.Fsync which
// can cause performance hit especially when used inside a loop.
func fadviseNoLinuxPageCache(fd uintptr, fsync bool) error {
return nil
}

// fadviseNoLinuxPageCache does nothing if GOOS != "linux".
// Otherwise, fadviseSegmentNoLinuxPageCache advises Linux to evict
// file segment from Linux page cache. This calls unix.Fadvise
// which in turn calls posix_fadvise with POSIX_FADV_DONTNEED.
// If GOOS != "linux" then this function does nothing.
// CAUTION: If fsync=true, this will call unix.Fsync which
// can cause performance hit especially when used inside a loop.
func fadviseSegmentNoLinuxPageCache(fd uintptr, off, len int64, fsync bool) error {
return nil
}
41 changes: 41 additions & 0 deletions ledger/complete/wal/fadvise_linux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
//go:build linux
// +build linux

package wal

import (
"golang.org/x/sys/unix"
)

// fadviseNoLinuxPageCache advises Linux to evict
// a file from Linux page cache. This calls unix.Fadvise which
// in turn calls posix_fadvise with POSIX_FADV_DONTNEED.
// If GOOS != "linux" then this function does nothing.
// CAUTION: If fsync=true, this will call unix.Fsync which
// can cause performance hit especially when used inside a loop.
func fadviseNoLinuxPageCache(fd uintptr, fsync bool) error {
return fadviseSegmentNoLinuxPageCache(fd, 0, 0, fsync)
}

// fadviseSegmentNoLinuxPageCache advises Linux to evict the
// file segment from Linux page cache. This calls unix.Fadvise
// which in turn calls posix_fadvise with POSIX_FADV_DONTNEED.
// If GOOS != "linux" then this function does nothing.
// CAUTION: If fsync=true, this will call unix.Fsync which
// can cause performance hit especially when used inside a loop.
func fadviseSegmentNoLinuxPageCache(fd uintptr, off, len int64, fsync bool) error {
// Optionally call fsync because dirty pages won't be evicted.
if fsync {
_ = unix.Fsync(int(fd)) // ignore error because this is optional
}

// Fadvise under the hood calls posix_fadvise.
// posix_fadvise doc from man page says:
// "posix_fadvise - predeclare an access pattern for file data"
// "The advice applies to a (not necessarily existent) region
// starting at offset and extending for len bytes (or until
// the end of the file if len is 0) within the file referred
// to by fd. The advice is not binding; it merely constitutes
// an expectation on behalf of the application."
return unix.Fadvise(int(fd), off, len, unix.FADV_DONTNEED)
}
12 changes: 7 additions & 5 deletions ledger/complete/wal/syncrename.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,13 @@ func (s *SyncOnCloseRenameFile) Close() error {
return fmt.Errorf("cannot sync file %s: %w", s.file.Name(), err)
}

// s.file.Sync() was already called, so we pass fsync=false
err = evictFileFromLinuxPageCache(s.file, false, s.logger)
if err != nil {
s.logger.Warn().Msgf("failed to evict file %s from Linux page cache: %s", s.targetName, err)
// No need to return this error because we're only "advising" Linux to evict a file from cache.
}

err = s.file.Close()
if err != nil {
return fmt.Errorf("error while closing file %s: %w", s.file.Name(), err)
Expand All @@ -63,11 +70,6 @@ func (s *SyncOnCloseRenameFile) Close() error {
return fmt.Errorf("error while renaming from %s to %s: %w", s.file.Name(), s.targetName, err)
}

err = requestDropFromOSFileCache(s.targetName, s.logger)
if err != nil {
return fmt.Errorf("error while requesting drop of %s from OS file cache : %w", s.targetName, err)
}

return nil
}

Expand Down
3 changes: 3 additions & 0 deletions ledger/complete/wal/syncrename_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"path"
"testing"

"github.com/rs/zerolog"
"github.com/stretchr/testify/require"

"github.com/onflow/flow-go/utils/unittest"
Expand All @@ -28,10 +29,12 @@ func Test_RenameHappensAfterClosing(t *testing.T) {

writer := bufio.NewWriter(file)

logger := zerolog.Nop()
syncer := &SyncOnCloseRenameFile{
file: file,
targetName: fullFileName,
Writer: writer,
logger: &logger,
}

sampleBytes := []byte{2, 1, 3, 7}
Expand Down

0 comments on commit b341963

Please sign in to comment.