Skip to content

Commit

Permalink
Merge pull request 'feat: introduce concept of pipelines for better c…
Browse files Browse the repository at this point in the history
…oncurrency' (#30) from feat/concurrency-refactor into main

Reviewed-on: https://git.numtide.com/numtide/treefmt/pulls/30
  • Loading branch information
Brian McGee committed Apr 26, 2024
2 parents 8333c99 + 40b76b7 commit 5d341f9
Show file tree
Hide file tree
Showing 12 changed files with 484 additions and 450 deletions.
43 changes: 32 additions & 11 deletions cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"fmt"
"io/fs"
"os"
"path/filepath"
"runtime"
"time"

"git.numtide.com/numtide/treefmt/format"
Expand All @@ -22,8 +24,6 @@ import (
const (
pathsBucket = "paths"
formattersBucket = "formatters"

readBatchSize = 1024
)

// Entry represents a cache entry, indicating the last size and modified time for a file path.
Expand All @@ -32,15 +32,19 @@ type Entry struct {
Modified time.Time
}

var db *bolt.DB
var (
db *bolt.DB
ReadBatchSize = 1024 * runtime.NumCPU()
logger *log.Logger
)

// Open creates an instance of bolt.DB for a given treeRoot path.
// If clean is true, Open will delete any existing data in the cache.
//
// The database will be located in `XDG_CACHE_DIR/treefmt/eval-cache/<id>.db`, where <id> is determined by hashing
// the treeRoot path. This associates a given treeRoot with a given instance of the cache.
func Open(treeRoot string, clean bool, formatters map[string]*format.Formatter) (err error) {
l := log.WithPrefix("cache")
logger = log.WithPrefix("cache")

// determine a unique and consistent db name for the tree root
h := sha1.New()
Expand Down Expand Up @@ -85,7 +89,7 @@ func Open(treeRoot string, clean bool, formatters map[string]*format.Formatter)
}

clean = clean || entry == nil || !(entry.Size == stat.Size() && entry.Modified == stat.ModTime())
l.Debug(
logger.Debug(
"checking if formatter has changed",
"name", name,
"clean", clean,
Expand Down Expand Up @@ -174,6 +178,12 @@ func putEntry(bucket *bolt.Bucket, path string, entry *Entry) error {
// ChangeSet is used to walk a filesystem, starting at root, and outputting any new or changed paths using pathsCh.
// It determines if a path is new or has changed by comparing against cache entries.
func ChangeSet(ctx context.Context, walker walk.Walker, pathsCh chan<- string) error {
start := time.Now()

defer func() {
logger.Infof("finished generating change set in %v", time.Since(start))
}()

var tx *bolt.Tx
var bucket *bolt.Bucket
var processed int
Expand All @@ -185,6 +195,9 @@ func ChangeSet(ctx context.Context, walker walk.Walker, pathsCh chan<- string) e
}
}()

// for quick removal of tree root from paths
relPathOffset := len(walker.Root()) + 1

return walker.Walk(ctx, func(path string, info fs.FileInfo, err error) error {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -213,7 +226,8 @@ func ChangeSet(ctx context.Context, walker walk.Walker, pathsCh chan<- string) e
bucket = tx.Bucket([]byte(pathsBucket))
}

cached, err := getEntry(bucket, path)
relPath := path[relPathOffset:]
cached, err := getEntry(bucket, relPath)
if err != nil {
return err
}
Expand All @@ -230,21 +244,28 @@ func ChangeSet(ctx context.Context, walker walk.Walker, pathsCh chan<- string) e
case <-ctx.Done():
return ctx.Err()
default:
pathsCh <- path
pathsCh <- relPath
}

// close the current tx if we have reached the batch size
processed += 1
if processed == readBatchSize {
return tx.Rollback()
if processed == ReadBatchSize {
err = tx.Rollback()
tx = nil
return err
}

return nil
})
}

// Update is used to record updated cache information for the specified list of paths.
func Update(paths []string) (int, error) {
func Update(treeRoot string, paths []string) (int, error) {
start := time.Now()
defer func() {
logger.Infof("finished updating %v paths in %v", len(paths), time.Since(start))
}()

if len(paths) == 0 {
return 0, nil
}
Expand All @@ -260,7 +281,7 @@ func Update(paths []string) (int, error) {
return err
}

pathInfo, err := os.Stat(path)
pathInfo, err := os.Stat(filepath.Join(treeRoot, path))
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 5d341f9

Please sign in to comment.