Skip to content

Commit

Permalink
fix: stalling on large file sets (#18)
Browse files Browse the repository at this point in the history
When running against nixpkgs, we were stalling. This was due to a long-running read tx, which was preventing any writes.

This change splits the cache reads performed while walking the filesystem into many smaller read txs.

On my laptop I'm now getting the following with the echo sample:

```console
# fresh cache

❯ nix run .# -- -c --config-file ./test/echo.toml --tree-root ../../../github.com/nixos/nixpkgs
38825 files changed in 320.655826ms

# hot cache

❯ nix run .# -- --config-file ./test/echo.toml --tree-root ../../../github.com/nixos/nixpkgs
0 files changed in 252.920853ms%
```

Signed-off-by: Brian McGee <[email protected]>

Reviewed-on: https://git.numtide.com/numtide/treefmt/pulls/18
Reviewed-by: Jonas Chevalier <[email protected]>
Co-authored-by: Brian McGee <[email protected]>
Co-committed-by: Brian McGee <[email protected]>
  • Loading branch information
brianmcgee authored and Brian McGee committed Jan 7, 2024
1 parent a3ca782 commit 55ca446
Showing 1 changed file with 51 additions and 25 deletions.
76 changes: 51 additions & 25 deletions internal/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
const (
// pathsBucket is the name of the bolt bucket that ChangeSet reads cached path entries from.
pathsBucket = "paths"
// formattersBucket is not referenced in the visible code; presumably the bucket name for formatter entries — TODO confirm.
formattersBucket = "formatters"

// readBatchSize is the number of paths ChangeSet passes on before it rolls back its current read tx.
readBatchSize = 1024
)

// Entry represents a cache entry, indicating the last size and modified time for a file path.
Expand Down Expand Up @@ -171,40 +173,64 @@ func putEntry(bucket *bolt.Bucket, path string, entry *Entry) error {
// ChangeSet is used to walk a filesystem, starting at root, and outputting any new or changed paths using pathsCh.
// It determines if a path is new or has changed by comparing against cache entries.
func ChangeSet(ctx context.Context, root string, pathsCh chan<- string) error {
return db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(pathsBucket))
var tx *bolt.Tx
var bucket *bolt.Bucket
var processed int

defer func() {
// close any pending read tx
if tx != nil {
_ = tx.Rollback()
}
}()

return filepath.Walk(root, func(path string, info fs.FileInfo, err error) error {
if err != nil {
return fmt.Errorf("%w: failed to walk path", err)
} else if ctx.Err() != nil {
return ctx.Err()
} else if info.IsDir() {
// todo what about symlinks?
return nil
}
return filepath.Walk(root, func(path string, info fs.FileInfo, err error) error {
if err != nil {
return fmt.Errorf("%w: failed to walk path", err)
} else if ctx.Err() != nil {
return ctx.Err()
} else if info.IsDir() {
// todo what about symlinks?
return nil
}

if info.Mode()&os.ModeSymlink == os.ModeSymlink {
// skip symlinks
return nil
}
// ignore symlinks
if info.Mode()&os.ModeSymlink == os.ModeSymlink {
return nil
}

cached, err := getEntry(bucket, path)
// open a new read tx if there isn't one in progress
// we have to periodically open a new read tx to prevent writes from being blocked
if tx == nil {
tx, err = db.Begin(false)
if err != nil {
return err
return fmt.Errorf("%w: failed to open a new read tx", err)
}
bucket = tx.Bucket([]byte(pathsBucket))
}

changedOrNew := cached == nil || !(cached.Modified == info.ModTime() && cached.Size == info.Size())
cached, err := getEntry(bucket, path)
if err != nil {
return err
}

if !changedOrNew {
// no change
return nil
}
changedOrNew := cached == nil || !(cached.Modified == info.ModTime() && cached.Size == info.Size())

// pass on the path
pathsCh <- path
if !changedOrNew {
// no change
return nil
})
}

// pass on the path
pathsCh <- path

// close the current tx if we have reached the batch size
processed += 1
if processed == readBatchSize {
return tx.Rollback()
}

return nil
})
}

Expand Down

0 comments on commit 55ca446

Please sign in to comment.