Skip to content

Commit

Permalink
fix: record cache entries for files that don't match formatters
Browse files Browse the repository at this point in the history
Signed-off-by: Brian McGee <[email protected]>
  • Loading branch information
brianmcgee committed May 2, 2024
1 parent 618f6f7 commit 5a5c1ea
Show file tree
Hide file tree
Showing 11 changed files with 215 additions and 171 deletions.
56 changes: 19 additions & 37 deletions cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ import (
"crypto/sha1"
"encoding/hex"
"fmt"
"io/fs"
"os"
"path/filepath"
"runtime"
"time"

Expand Down Expand Up @@ -180,7 +178,7 @@ func putEntry(bucket *bolt.Bucket, path string, entry *Entry) error {

// ChangeSet is used to walk a filesystem, starting at root, and outputting any new or changed files using filesCh.
// It determines if a file is new or has changed by comparing against cache entries.
func ChangeSet(ctx context.Context, walker walk.Walker, pathsCh chan<- string) error {
func ChangeSet(ctx context.Context, walker walk.Walker, filesCh chan<- *walk.File) error {
start := time.Now()

defer func() {
Expand All @@ -198,24 +196,21 @@ func ChangeSet(ctx context.Context, walker walk.Walker, pathsCh chan<- string) e
}
}()

// for quick removal of tree root from paths
relPathOffset := len(walker.Root()) + 1

return walker.Walk(ctx, func(path string, info fs.FileInfo, err error) error {
return walker.Walk(ctx, func(file *walk.File, err error) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
if err != nil {
return fmt.Errorf("%w: failed to walk path", err)
} else if info.IsDir() {
} else if file.Info.IsDir() {
// ignore directories
return nil
}
}

// ignore symlinks
if info.Mode()&os.ModeSymlink == os.ModeSymlink {
if file.Info.Mode()&os.ModeSymlink == os.ModeSymlink {
return nil
}

Expand All @@ -229,13 +224,12 @@ func ChangeSet(ctx context.Context, walker walk.Walker, pathsCh chan<- string) e
bucket = tx.Bucket([]byte(pathsBucket))
}

relPath := path[relPathOffset:]
cached, err := getEntry(bucket, relPath)
cached, err := getEntry(bucket, file.RelPath)
if err != nil {
return err
}

changedOrNew := cached == nil || !(cached.Modified == info.ModTime() && cached.Size == info.Size())
changedOrNew := cached == nil || !(cached.Modified == file.Info.ModTime() && cached.Size == file.Info.Size())

stats.Add(stats.Traversed, 1)
if !changedOrNew {
Expand All @@ -250,7 +244,7 @@ func ChangeSet(ctx context.Context, walker walk.Walker, pathsCh chan<- string) e
case <-ctx.Done():
return ctx.Err()
default:
pathsCh <- relPath
filesCh <- file
}

// close the current tx if we have reached the batch size
Expand All @@ -266,47 +260,35 @@ func ChangeSet(ctx context.Context, walker walk.Walker, pathsCh chan<- string) e
}

// Update is used to record updated cache information for the specified list of files.
func Update(treeRoot string, paths []string) (int, error) {
func Update(files []*walk.File) error {
start := time.Now()
defer func() {
logger.Infof("finished updating %v paths in %v", len(paths), time.Since(start))
logger.Infof("finished processing %v paths in %v", len(files), time.Since(start))
}()

if len(paths) == 0 {
return 0, nil
if len(files) == 0 {
return nil
}

var changes int

return changes, db.Update(func(tx *bolt.Tx) error {
return db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(pathsBucket))

for _, path := range paths {
cached, err := getEntry(bucket, path)
if err != nil {
return err
}

pathInfo, err := os.Stat(filepath.Join(treeRoot, path))
for _, f := range files {
currentInfo, err := os.Stat(f.Path)
if err != nil {
return err
}

if cached == nil || !(cached.Modified == pathInfo.ModTime() && cached.Size == pathInfo.Size()) {
changes += 1
} else {
// no change to write
continue
if !(f.Info.ModTime() == currentInfo.ModTime() && f.Info.Size() == currentInfo.Size()) {
stats.Add(stats.Formatted, 1)
}

stats.Add(stats.Formatted, 1)

entry := Entry{
Size: pathInfo.Size(),
Modified: pathInfo.ModTime(),
Size: currentInfo.Size(),
Modified: currentInfo.ModTime(),
}

if err = putEntry(bucket, path, &entry); err != nil {
if err = putEntry(bucket, f.RelPath, &entry); err != nil {
return err
}
}
Expand Down
59 changes: 26 additions & 33 deletions cli/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"context"
"errors"
"fmt"
"io/fs"
"os"
"os/signal"
"path/filepath"
Expand Down Expand Up @@ -35,8 +34,8 @@ var (
globalExcludes []glob.Glob
formatters map[string]*format.Formatter
pipelines map[string]*format.Pipeline
pathsCh chan string
processedCh chan string
filesCh chan *walk.File
processedCh chan *walk.File

ErrFailOnChange = errors.New("unexpected changes detected, --fail-on-change is enabled")
)
Expand Down Expand Up @@ -142,10 +141,10 @@ func (f *Format) Run() (err error) {

// create a channel for paths to be processed
// we use a multiple of batch size here to allow for greater concurrency
pathsCh = make(chan string, BatchSize*runtime.NumCPU())
filesCh = make(chan *walk.File, BatchSize*runtime.NumCPU())

// create a channel for tracking paths that have been processed
processedCh = make(chan string, cap(pathsCh))
processedCh = make(chan *walk.File, cap(filesCh))

// start concurrent processing tasks
eg.Go(updateCache(ctx))
Expand Down Expand Up @@ -185,26 +184,26 @@ func walkFilesystem(ctx context.Context) func() error {
return fmt.Errorf("failed to create walker: %w", err)
}

defer close(pathsCh)
defer close(filesCh)

if Cli.NoCache {
return walker.Walk(ctx, func(path string, info fs.FileInfo, err error) error {
return walker.Walk(ctx, func(file *walk.File, err error) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
// ignore symlinks and directories
if !(info.IsDir() || info.Mode()&os.ModeSymlink == os.ModeSymlink) {
if !(file.Info.IsDir() || file.Info.Mode()&os.ModeSymlink == os.ModeSymlink) {
stats.Add(stats.Traversed, 1)
stats.Add(stats.Emitted, 1)
pathsCh <- path
filesCh <- file
}
return nil
}
})
}

if err = cache.ChangeSet(ctx, walker, pathsCh); err != nil {
if err = cache.ChangeSet(ctx, walker, filesCh); err != nil {
return fmt.Errorf("failed to generate change set: %w", err)
}
return nil
Expand All @@ -213,19 +212,11 @@ func walkFilesystem(ctx context.Context) func() error {

func updateCache(ctx context.Context) func() error {
return func() error {
batch := make([]string, 0, BatchSize)

var changes int
batch := make([]*walk.File, 0, BatchSize)

processBatch := func() error {
if Cli.NoCache {
changes += len(batch)
} else {
count, err := cache.Update(Cli.TreeRoot, batch)
if err != nil {
return err
}
changes += count
if err := cache.Update(batch); err != nil {
return err
}
batch = batch[:0]
return nil
Expand Down Expand Up @@ -254,7 +245,7 @@ func updateCache(ctx context.Context) func() error {
return err
}

if Cli.FailOnChange && changes != 0 {
if Cli.FailOnChange && stats.Value(stats.Formatted) != 0 {
return ErrFailOnChange
}

Expand All @@ -265,28 +256,28 @@ func updateCache(ctx context.Context) func() error {

func applyFormatters(ctx context.Context) func() error {
fg, ctx := errgroup.WithContext(ctx)
batches := make(map[string][]string)
batches := make(map[string][]*walk.File)

tryApply := func(key string, path string) {
tryApply := func(key string, file *walk.File) {
batch, ok := batches[key]
if !ok {
batch = make([]string, 0, BatchSize)
batch = make([]*walk.File, 0, BatchSize)
}
batch = append(batch, path)
batch = append(batch, file)
batches[key] = batch

if len(batch) == BatchSize {
pipeline := pipelines[key]

// copy the batch
paths := make([]string, len(batch))
copy(paths, batch)
files := make([]*walk.File, len(batch))
copy(files, batch)

fg.Go(func() error {
if err := pipeline.Apply(ctx, paths); err != nil {
if err := pipeline.Apply(ctx, files); err != nil {
return err
}
for _, path := range paths {
for _, path := range files {
processedCh <- path
}
return nil
Expand Down Expand Up @@ -322,17 +313,19 @@ func applyFormatters(ctx context.Context) func() error {
close(processedCh)
}()

for path := range pathsCh {
for file := range filesCh {
var matched bool
for key, pipeline := range pipelines {
if !pipeline.Wants(path) {
if !pipeline.Wants(file) {
continue
}
matched = true
tryApply(key, path)
tryApply(key, file)
}
if matched {
stats.Add(stats.Matched, 1)
} else {
processedCh <- file
}
}

Expand Down
Loading

0 comments on commit 5a5c1ea

Please sign in to comment.