From 8af5b3c076017955bca062adab823add4d3f4e45 Mon Sep 17 00:00:00 2001 From: Brian McGee Date: Fri, 19 Apr 2024 10:57:41 +0100 Subject: [PATCH] feat: introduce concept of pipelines for better concurrency Replaces the `Before` config option with an optional `Pipeline` key. This is used to group formatters together in the order in which they are specified within the config file. Signed-off-by: Brian McGee --- cli/format.go | 340 ++++++++++++++++++++----------------- cli/format_test.go | 75 -------- config/formatter.go | 4 +- format/formatter.go | 209 +++++------------------ format/pipeline.go | 31 ++++ test/examples/treefmt.toml | 6 +- 6 files changed, 261 insertions(+), 404 deletions(-) create mode 100644 format/pipeline.go diff --git a/cli/format.go b/cli/format.go index 14ac16cd..c2487539 100644 --- a/cli/format.go +++ b/cli/format.go @@ -8,23 +8,38 @@ import ( "io/fs" "os" "os/signal" - "strings" + "slices" "syscall" "time" + "git.numtide.com/numtide/treefmt/format" + "github.com/gobwas/glob" + "git.numtide.com/numtide/treefmt/cache" "git.numtide.com/numtide/treefmt/config" - format2 "git.numtide.com/numtide/treefmt/format" "git.numtide.com/numtide/treefmt/walk" "github.com/charmbracelet/log" "golang.org/x/sync/errgroup" ) -var ErrFailOnChange = errors.New("unexpected changes detected, --fail-on-change is enabled") +const ( + BatchSize = 1024 +) + +var ( + start time.Time + globalExcludes []glob.Glob + formatters map[string]*format.Formatter + pipelines map[string]*format.Pipeline + pathsCh chan string + processedCh chan string + + ErrFailOnChange = errors.New("unexpected changes detected, --fail-on-change is enabled") +) -func (f *Format) Run() error { - start := time.Now() +func (f *Format) Run() (err error) { + start = time.Now() Cli.Configure() @@ -36,86 +51,40 @@ func (f *Format) Run() error { } }() - // create an overall context - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - // read config cfg, err := config.ReadFile(Cli.ConfigFile) if err != nil { return fmt.Errorf("%w: failed to read config file", err) } - globalExcludes, err := format2.CompileGlobs(cfg.Global.Excludes) - - // create optional formatter filter set - formatterSet := make(map[string]bool) - for _, name := range Cli.Formatters { - _, ok := cfg.Formatters[name] - if !ok { - return fmt.Errorf("%w: formatter not found in config: %v", err, name) - } - formatterSet[name] = true - } - - includeFormatter := func(name string) bool { - if len(formatterSet) == 0 { - return true - } else { - _, include := formatterSet[name] - return include - } + if globalExcludes, err = format.CompileGlobs(cfg.Global.Excludes); err != nil { + return fmt.Errorf("%w: failed to compile global globs", err) } - formatters := make(map[string]*format2.Formatter) + pipelines = make(map[string]*format.Pipeline) + formatters = make(map[string]*format.Formatter) - // detect broken dependencies - for name, formatterCfg := range cfg.Formatters { - before := formatterCfg.Before - if before != "" { - // check child formatter exists - _, ok := cfg.Formatters[before] + // filter formatters + if len(Cli.Formatters) > 0 { + // first check the cli formatter list is valid + for _, name := range Cli.Formatters { + _, ok := cfg.Formatters[name] if !ok { - return fmt.Errorf("formatter %v is before %v but config for %v was not found", name, before, before) + return fmt.Errorf("formatter not found in config: %v", name) } } - } - - // dependency cycle detection - for name, formatterCfg := range cfg.Formatters { - var ok bool - var history []string - childName := name - for { - // add to history - history = append(history, childName) - - if formatterCfg.Before == "" { - break - } else if formatterCfg.Before == name { - return fmt.Errorf("formatter cycle detected %v", strings.Join(history, " -> ")) - } - - // load child config - childName = formatterCfg.Before - formatterCfg, ok = cfg.Formatters[formatterCfg.Before] - if !ok { - return fmt.Errorf("formatter not found: %v", formatterCfg.Before) + // next we remove any formatter configs that were not specified + for name := range cfg.Formatters { + if !slices.Contains(Cli.Formatters, name) { + delete(cfg.Formatters, name) } } } // init formatters for name, formatterCfg := range cfg.Formatters { - if !includeFormatter(name) { - // remove this formatter - delete(cfg.Formatters, name) - l.Debugf("formatter %v is not in formatter list %v, skipping", name, Cli.Formatters) - continue - } - - formatter, err := format2.NewFormatter(name, formatterCfg, globalExcludes) - if errors.Is(err, format2.ErrCommandNotFound) && Cli.AllowMissingFormatter { + formatter, err := format.NewFormatter(name, formatterCfg, globalExcludes) + if errors.Is(err, format.ErrCommandNotFound) && Cli.AllowMissingFormatter { l.Debugf("formatter not found: %v", name) continue } else if err != nil { @@ -123,49 +92,101 @@ func (f *Format) Run() error { } formatters[name] = formatter - } - // iterate the initialised formatters configuring parent/child relationships - for _, formatter := range formatters { - if formatter.Before() != "" { - child, ok := formatters[formatter.Before()] + if formatterCfg.Pipeline == "" { + pipeline := format.Pipeline{} + pipeline.Add(formatter) + pipelines[name] = &pipeline + } else { + key := fmt.Sprintf("p:%s", formatterCfg.Pipeline) + pipeline, ok := pipelines[key] if !ok { - // formatter has been filtered out by the user - formatter.ResetBefore() - continue + pipeline = &format.Pipeline{} + pipelines[key] = pipeline } - formatter.SetChild(child) - child.SetParent(formatter) + pipeline.Add(formatter) } } + // open the cache if err = cache.Open(Cli.TreeRoot, Cli.ClearCache, formatters); err != nil { return err } - // - completedCh := make(chan string, 1024) + // create an app context and listen for shutdown + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() - ctx = format2.SetCompletedChannel(ctx, completedCh) + go func() { + exit := make(chan os.Signal, 1) + signal.Notify(exit, os.Interrupt, syscall.SIGTERM) + <-exit + cancel() + }() - // + // create some groups for concurrent processing and control flow eg, ctx := errgroup.WithContext(ctx) - // start the formatters - for name := range formatters { - formatter := formatters[name] - eg.Go(func() error { - return formatter.Run(ctx) - }) - } + // create a channel for paths to be processed + // we use a multiple of batch size here to allow for greater concurrency + pathsCh = make(chan string, 10*BatchSize) + + // create a channel for tracking paths that have been processed + processedCh = make(chan string, cap(pathsCh)) + + // start concurrent processing tasks + eg.Go(updateCache(ctx)) + eg.Go(applyFormatters(ctx)) + eg.Go(walkFilesystem(ctx)) + + // wait for everything to complete + return eg.Wait() +} + +func walkFilesystem(ctx context.Context) func() error { + return func() error { + paths := Cli.Paths + + if len(paths) == 0 && Cli.Stdin { + // read in all the paths + scanner := bufio.NewScanner(os.Stdin) + for scanner.Scan() { + paths = append(paths, scanner.Text()) + } + } + + walker, err := walk.New(Cli.Walk, Cli.TreeRoot, paths) + if err != nil { + return fmt.Errorf("failed to create walker: %w", err) + } + + defer close(pathsCh) + + if Cli.NoCache { + return walker.Walk(ctx, func(path string, info fs.FileInfo, err error) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + // ignore symlinks and directories + if !(info.IsDir() || info.Mode()&os.ModeSymlink == os.ModeSymlink) { + pathsCh <- path + } + return nil + } + }) + } - // determine paths to be formatted - pathsCh := make(chan string, 1024) + if err = cache.ChangeSet(ctx, walker, pathsCh); err != nil { + return fmt.Errorf("failed to generate change set: %w", err) + } + return nil + } +} - // update cache as paths are completed - eg.Go(func() error { - batchSize := 1024 - batch := make([]string, 0, batchSize) +func updateCache(ctx context.Context) func() error { + return func() error { + batch := make([]string, 0, BatchSize) var changes int @@ -188,13 +209,13 @@ func (f *Format) Run() error { select { case <-ctx.Done(): return ctx.Err() - case path, ok := <-completedCh: + case path, ok := <-processedCh: if !ok { break LOOP } batch = append(batch, path) - if len(batch) == batchSize { - if err = processBatch(); err != nil { + if len(batch) == BatchSize { + if err := processBatch(); err != nil { return err } } @@ -202,7 +223,7 @@ func (f *Format) Run() error { } // final flush - if err = processBatch(); err != nil { + if err := processBatch(); err != nil { return err } @@ -212,81 +233,84 @@ func (f *Format) Run() error { fmt.Printf("%v files changed in %v\n", changes, time.Now().Sub(start)) return nil - }) - - eg.Go(func() error { - // pass paths to each formatter - for path := range pathsCh { - for _, formatter := range formatters { - if formatter.Wants(path) { - formatter.Put(path) - } - } - } + } +} - // indicate no more paths for each formatter - for _, formatter := range formatters { - if formatter.Parent() != nil { - // this formatter is not a root, it will be closed by a parent - continue - } - formatter.Close() - } +func applyFormatters(ctx context.Context) func() error { + fg, ctx := errgroup.WithContext(ctx) + batches := make(map[string][]string) - // await completion - for _, formatter := range formatters { - formatter.AwaitCompletion() + tryApply := func(key string, path string) { + batch, ok := batches[key] + if !ok { + batch = make([]string, 0, BatchSize) } + batch = append(batch, path) + batches[key] = batch - // indicate no more completion events - close(completedCh) + if len(batch) == BatchSize { + pipeline := pipelines[key] - return nil - }) + // copy the batch + paths := make([]string, len(batch)) + copy(paths, batch) - eg.Go(func() (err error) { - paths := Cli.Paths + fg.Go(func() error { + if err := pipeline.Apply(ctx, paths); err != nil { + return err + } + for _, path := range paths { + processedCh <- path + } + return nil + }) - if len(paths) == 0 && Cli.Stdin { - // read in all the paths - scanner := bufio.NewScanner(os.Stdin) - for scanner.Scan() { - paths = append(paths, scanner.Text()) - } + batches[key] = batch[:0] } + } - walker, err := walk.New(Cli.Walk, Cli.TreeRoot, paths) - if err != nil { - return fmt.Errorf("%w: failed to create walker", err) - } + flushBatches := func() { + for key, pipeline := range pipelines { - defer close(pathsCh) + batch := batches[key] + pipeline := pipeline // capture for closure - if Cli.NoCache { - return walker.Walk(ctx, func(path string, info fs.FileInfo, err error) error { - select { - case <-ctx.Done(): - return ctx.Err() - default: - // ignore symlinks and directories - if !(info.IsDir() || info.Mode()&os.ModeSymlink == os.ModeSymlink) { - pathsCh <- path + if len(batch) > 0 { + fg.Go(func() error { + if err := pipeline.Apply(ctx, batch); err != nil { + return fmt.Errorf("%w: pipeline failure, %s", err, key) + } + for _, path := range batch { + processedCh <- path } return nil - } - }) + }) + } } + } - return cache.ChangeSet(ctx, walker, pathsCh) - }) + return func() error { + defer func() { + // close processed channel + close(processedCh) + }() - // listen for shutdown and call cancel if required - go func() { - exit := make(chan os.Signal, 1) - signal.Notify(exit, os.Interrupt, syscall.SIGTERM) - <-exit - cancel() - }() + for path := range pathsCh { + for key, pipeline := range pipelines { + if !pipeline.Wants(path) { + continue + } + tryApply(key, path) + } + } - return eg.Wait() + // flush any partial batches which remain + flushBatches() + + // wait for all outstanding formatting tasks to complete + if err := fg.Wait(); err != nil { + return fmt.Errorf("pipeline processing failure: %w", err) + } + return nil + } } diff --git a/cli/format_test.go b/cli/format_test.go index 4f032318..9030d099 100644 --- a/cli/format_test.go +++ b/cli/format_test.go @@ -41,27 +41,6 @@ func TestAllowMissingFormatter(t *testing.T) { as.NoError(err) } -func TestDependencyCycle(t *testing.T) { - as := require.New(t) - - tempDir := t.TempDir() - configPath := tempDir + "/treefmt.toml" - - test.WriteConfig(t, configPath, config2.Config{ - Formatters: map[string]*config2.Formatter{ - "a": {Command: "echo", Before: "b"}, - "b": {Command: "echo", Before: "c"}, - "c": {Command: "echo", Before: "a"}, - "d": {Command: "echo", Before: "e"}, - "e": {Command: "echo", Before: "f"}, - "f": {Command: "echo"}, - }, - }) - - _, err := cmd(t, "--config-file", configPath, "--tree-root", tempDir) - as.ErrorContains(err, "formatter cycle detected") -} - func TestSpecifyingFormatters(t *testing.T) { as := require.New(t) @@ -451,60 +430,6 @@ func TestGitWorktree(t *testing.T) { as.Contains(string(out), fmt.Sprintf("%d files changed", 57)) } -func TestOrderingFormatters(t *testing.T) { - as := require.New(t) - - tempDir := test.TempExamples(t) - configPath := path.Join(tempDir, "treefmt.toml") - - // missing child - test.WriteConfig(t, configPath, config2.Config{ - Formatters: map[string]*config2.Formatter{ - "hs-a": { - Command: "echo", - Includes: []string{"*.hs"}, - Before: "hs-b", - }, - }, - }) - - out, err := cmd(t, "--config-file", configPath, "--tree-root", tempDir) - as.ErrorContains(err, "formatter hs-a is before hs-b but config for hs-b was not found") - - // multiple roots - test.WriteConfig(t, configPath, config2.Config{ - Formatters: map[string]*config2.Formatter{ - "hs-a": { - Command: "echo", - Includes: []string{"*.hs"}, - Before: "hs-b", - }, - "hs-b": { - Command: "echo", - Includes: []string{"*.hs"}, - Before: "hs-c", - }, - "hs-c": { - Command: "echo", - Includes: []string{"*.hs"}, - }, - "py-a": { - Command: "echo", - Includes: []string{"*.py"}, - Before: "py-b", - }, - "py-b": { - Command: "echo", - Includes: []string{"*.py"}, - }, - }, - }) - - out, err = cmd(t, "--config-file", configPath, "--tree-root", tempDir) - as.NoError(err) - as.Contains(string(out), "8 files changed") -} - func TestPathsArg(t *testing.T) { as := require.New(t) diff --git a/config/formatter.go b/config/formatter.go index 34f4983f..4bc6a760 100644 --- a/config/formatter.go +++ b/config/formatter.go @@ -9,6 +9,6 @@ type Formatter struct { Includes []string // Excludes is an optional list of glob patterns used to exclude certain files from this Formatter. Excludes []string - // Before is the name of another formatter which must process a path after this one - Before string + // + Pipeline string } diff --git a/format/formatter.go b/format/formatter.go index 434addf8..532a2068 100644 --- a/format/formatter.go +++ b/format/formatter.go @@ -24,48 +24,66 @@ type Formatter struct { log *log.Logger executable string // path to the executable described by Command - before string - - child *Formatter - parent *Formatter - // internal compiled versions of Includes and Excludes. includes []glob.Glob excludes []glob.Glob - - // inboxCh is used to accept new paths for formatting. - inboxCh chan string - // completedCh is used to wait for this formatter to finish all processing. - completedCh chan interface{} - - // Entries from inboxCh are batched according to batchSize and stored in batch for processing when the batchSize has - // been reached or Close is invoked. - batch []string - batchSize int } -func (f *Formatter) Before() string { - return f.before +// Executable returns the path to the executable defined by Command +func (f *Formatter) Executable() string { + return f.executable } -func (f *Formatter) ResetBefore() { - f.before = "" +func (f *Formatter) Apply(ctx context.Context, paths []string) error { + // only apply if the resultant batch is not empty + if len(paths) > 0 { + // construct args, starting with config + args := f.config.Options + + // append each file path + for _, path := range paths { + args = append(args, path) + } + + // execute + start := time.Now() + cmd := exec.CommandContext(ctx, f.config.Command, args...) + + if out, err := cmd.CombinedOutput(); err != nil { + f.log.Debugf("\n%v", string(out)) + // todo log output + return err + } + + f.log.Infof("%v files processed in %v", len(paths), time.Now().Sub(start)) + } + + return nil } -// Executable returns the path to the executable defined by Command -func (f *Formatter) Executable() string { - return f.executable +// Wants is used to test if a Formatter wants path based on it's configured Includes and Excludes patterns. +// Returns true if the Formatter should be applied to path, false otherwise. +func (f *Formatter) Wants(path string) bool { + match := !PathMatches(path, f.excludes) && PathMatches(path, f.includes) + if match { + f.log.Debugf("match: %v", path) + } + return match } // NewFormatter is used to create a new Formatter. -func NewFormatter(name string, config *config.Formatter, globalExcludes []glob.Glob) (*Formatter, error) { +func NewFormatter( + name string, + config *config.Formatter, + globalExcludes []glob.Glob, +) (*Formatter, error) { var err error f := Formatter{} - // capture the name from the config file + + // capture config and the formatter's name f.name = name f.config = config - f.before = config.Before // test if the formatter is available executable, err := exec.LookPath(config.Command) @@ -78,10 +96,6 @@ func NewFormatter(name string, config *config.Formatter, globalExcludes []glob.G // initialise internal state f.log = log.WithPrefix("format | " + name) - f.batchSize = 1024 - f.batch = make([]string, 0, f.batchSize) - f.inboxCh = make(chan string, f.batchSize) - f.completedCh = make(chan interface{}, 1) f.includes, err = CompileGlobs(config.Includes) if err != nil { @@ -96,140 +110,3 @@ func NewFormatter(name string, config *config.Formatter, globalExcludes []glob.G return &f, nil } - -func (f *Formatter) SetParent(formatter *Formatter) { - f.parent = formatter -} - -func (f *Formatter) Parent() *Formatter { - return f.parent -} - -func (f *Formatter) SetChild(formatter *Formatter) { - f.child = formatter -} - -// Wants is used to test if a Formatter wants path based on it's configured Includes and Excludes patterns. -// Returns true if the Formatter should be applied to path, false otherwise. -func (f *Formatter) Wants(path string) bool { - if f.parent != nil { - // we don't accept this path directly, our parent will forward it - return false - } - match := !PathMatches(path, f.excludes) && PathMatches(path, f.includes) - if match { - f.log.Debugf("match: %v", path) - } - return match -} - -// Put add path into this Formatter's inboxCh for processing. -func (f *Formatter) Put(path string) { - f.inboxCh <- path -} - -// Run is the main processing loop for this Formatter. -// It accepts a context which is used to lookup certain dependencies and for cancellation. -func (f *Formatter) Run(ctx context.Context) (err error) { - defer func() { - if f.child != nil { - // indicate no further processing for the child formatter - f.child.Close() - } - - // indicate this formatter has finished processing - f.completedCh <- nil - }() - -LOOP: - // keep processing until ctx has been cancelled or inboxCh has been closed - for { - select { - - case <-ctx.Done(): - // ctx has been cancelled - err = ctx.Err() - break LOOP - - case path, ok := <-f.inboxCh: - // check if the inboxCh has been closed - if !ok { - break LOOP - } - - // add path to the current batch - f.batch = append(f.batch, path) - - if len(f.batch) == f.batchSize { - // drain immediately - if err := f.apply(ctx); err != nil { - break LOOP - } - } - } - } - - // check if LOOP was exited due to an error - if err != nil { - return - } - - // processing any lingering batch - return f.apply(ctx) -} - -// apply executes Command against the latest batch of paths. -// It accepts a context which is used to lookup certain dependencies and for cancellation. -func (f *Formatter) apply(ctx context.Context) error { - // empty check - if len(f.batch) == 0 { - return nil - } - - // construct args, starting with config - args := f.config.Options - - // append each file path - for _, path := range f.batch { - args = append(args, path) - } - - // execute - start := time.Now() - cmd := exec.CommandContext(ctx, f.config.Command, args...) - - if out, err := cmd.CombinedOutput(); err != nil { - f.log.Debugf("\n%v", string(out)) - // todo log output - return err - } - - f.log.Infof("%v files processed in %v", len(f.batch), time.Now().Sub(start)) - - if f.child == nil { - // mark each path in this batch as completed - for _, path := range f.batch { - MarkPathComplete(ctx, path) - } - } else { - // otherwise forward each path onto the next formatter for processing - for _, path := range f.batch { - f.child.Put(path) - } - } - - // reset batch - f.batch = f.batch[:0] - - return nil -} - -// Close is used to indicate that a Formatter should process any remaining paths and then stop it's processing loop. -func (f *Formatter) Close() { - close(f.inboxCh) -} - -func (f *Formatter) AwaitCompletion() { - // todo support a timeout - <-f.completedCh -} diff --git a/format/pipeline.go b/format/pipeline.go new file mode 100644 index 00000000..29b94fb5 --- /dev/null +++ b/format/pipeline.go @@ -0,0 +1,31 @@ +package format + +import "context" + +type Pipeline struct { + sequence []*Formatter +} + +func (p *Pipeline) Add(f *Formatter) { + p.sequence = append(p.sequence, f) +} + +func (p *Pipeline) Wants(path string) bool { + var match bool + for _, f := range p.sequence { + match = f.Wants(path) + if match { + break + } + } + return match +} + +func (p *Pipeline) Apply(ctx context.Context, paths []string) error { + for _, f := range p.sequence { + if err := f.Apply(ctx, paths); err != nil { + return err + } + } + return nil +} diff --git a/test/examples/treefmt.toml b/test/examples/treefmt.toml index 699665a0..0207c04d 100644 --- a/test/examples/treefmt.toml +++ b/test/examples/treefmt.toml @@ -31,12 +31,12 @@ command = "alejandra" includes = ["*.nix"] # Act as an example on how to exclude specific files excludes = ["examples/nix/sources.nix"] -# Make this run before deadnix -# Note this formatter determines the file set for any 'downstream' formatters -before = "deadnix" +pipeline = "nix" [formatter.deadnix] command = "deadnix" +includes = ["*.nix"] +pipeline = "nix" [formatter.ruby] command = "rufo"