Skip to content

Commit

Permalink
chore: some cleanup and commenting
Browse files Browse the repository at this point in the history
Signed-off-by: Brian McGee <[email protected]>
  • Loading branch information
brianmcgee committed May 2, 2024
1 parent 2eaf999 commit c720e41
Showing 1 changed file with 91 additions and 56 deletions.
147 changes: 91 additions & 56 deletions cli/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ func (f *Format) Run() (err error) {
pipelines = make(map[string]*format.Pipeline)
formatters = make(map[string]*format.Formatter)

// iterate the formatters in lexicographical order
for _, name := range cfg.Names {
// init formatter
formatterCfg := cfg.Formatters[name]
formatter, err := format.NewFormatter(name, Cli.TreeRoot, formatterCfg, globalExcludes)
if errors.Is(err, format.ErrCommandNotFound) && Cli.AllowMissingFormatter {
Expand All @@ -74,8 +76,12 @@ func (f *Format) Run() (err error) {
return fmt.Errorf("%w: failed to initialise formatter: %v", err, name)
}

// store formatter by name
formatters[name] = formatter

// If no pipeline is configured, we add the formatter to a nominal pipeline of size 1 with the key being the
// formatter's name. If a pipeline is configured, we add the formatter to a pipeline keyed by
// 'p:<pipeline_name>' in which it is sorted by priority.
if formatterCfg.Pipeline == "" {
pipeline := format.Pipeline{}
pipeline.Add(formatter)
Expand Down Expand Up @@ -110,17 +116,17 @@ func (f *Format) Run() (err error) {
// initialise stats collection
stats.Init()

// create some groups for concurrent processing and control flow
// create an overall error group for executing high level tasks concurrently
eg, ctx := errgroup.WithContext(ctx)

// create a channel for paths to be processed
// we use a multiple of batch size here to allow for greater concurrency
// create a channel for files needing to be processed
// we use a multiple of batch size here as a rudimentary concurrency optimization based on the host machine
filesCh = make(chan *walk.File, BatchSize*runtime.NumCPU())

// create a channel for tracking paths that have been processed
// create a channel for files that have been processed
processedCh = make(chan *walk.File, cap(filesCh))

// start concurrent processing tasks
// start concurrent processing tasks in reverse order
eg.Go(updateCache(ctx))
eg.Go(applyFormatters(ctx))
eg.Go(walkFilesystem(ctx))
Expand All @@ -129,15 +135,70 @@ func (f *Format) Run() (err error) {
return eg.Wait()
}

func updateCache(ctx context.Context) func() error {
return func() error {
// used to batch updates for more efficient txs
batch := make([]*walk.File, 0, BatchSize)

// apply a batch
processBatch := func() error {
if err := cache.Update(batch); err != nil {
return err
}
batch = batch[:0]
return nil
}

LOOP:
for {
select {
// detect ctx cancellation
case <-ctx.Done():
return ctx.Err()
// respond to processed files
case file, ok := <-processedCh:
if !ok {
// channel has been closed, no further files to process
break LOOP
}
// append to batch and process if we have enough
batch = append(batch, file)
if len(batch) == BatchSize {
if err := processBatch(); err != nil {
return err
}
}
}
}

// final flush
if err := processBatch(); err != nil {
return err
}

// if fail on change has been enabled, check that no files were actually formatted, throwing an error if so
if Cli.FailOnChange && stats.Value(stats.Formatted) != 0 {
return ErrFailOnChange
}

// print stats to stdout
stats.Print()

return nil
}
}

func walkFilesystem(ctx context.Context) func() error {
return func() error {
paths := Cli.Paths

// we read paths from stdin if the cli flag has been set and no paths were provided as cli args
if len(paths) == 0 && Cli.Stdin {

// determine the current working directory
cwd, err := os.Getwd()
if err != nil {
return fmt.Errorf("%w: failed to determine current working directory", err)
return fmt.Errorf("failed to determine current working directory: %w", err)
}

// read in all the paths
Expand All @@ -149,17 +210,21 @@ func walkFilesystem(ctx context.Context) func() error {
path = filepath.Join(cwd, path)
}

// append the fully qualified path to our paths list
paths = append(paths, path)
}
}

// create a filesystem walker
walker, err := walk.New(Cli.Walk, Cli.TreeRoot, paths)
if err != nil {
return fmt.Errorf("failed to create walker: %w", err)
}

// close the files channel when we're done walking the file system
defer close(filesCh)

// if no cache has been configured, we invoke the walker directly
if Cli.NoCache {
return walker.Walk(ctx, func(file *walk.File, err error) error {
select {
Expand All @@ -177,76 +242,42 @@ func walkFilesystem(ctx context.Context) func() error {
})
}

// otherwise we pass the walker to the cache and have it generate files for processing based on whether or not
// they have been added/changed since the last invocation
if err = cache.ChangeSet(ctx, walker, filesCh); err != nil {
return fmt.Errorf("failed to generate change set: %w", err)
}
return nil
}
}

func updateCache(ctx context.Context) func() error {
return func() error {
batch := make([]*walk.File, 0, BatchSize)

processBatch := func() error {
if err := cache.Update(batch); err != nil {
return err
}
batch = batch[:0]
return nil
}

LOOP:
for {
select {
case <-ctx.Done():
return ctx.Err()
case path, ok := <-processedCh:
if !ok {
break LOOP
}
batch = append(batch, path)
if len(batch) == BatchSize {
if err := processBatch(); err != nil {
return err
}
}
}
}

// final flush
if err := processBatch(); err != nil {
return err
}

if Cli.FailOnChange && stats.Value(stats.Formatted) != 0 {
return ErrFailOnChange
}

stats.Print()
return nil
}
}

func applyFormatters(ctx context.Context) func() error {
// create our own errgroup for concurrent formatting tasks
fg, ctx := errgroup.WithContext(ctx)

// pre-initialise batches keyed by pipeline
batches := make(map[string][]*walk.File)
for key := range pipelines {
batches[key] = make([]*walk.File, 0, BatchSize)
}

// for a given pipeline key, add the provided file to the current batch and trigger a format if the batch size has
// been reached
tryApply := func(key string, file *walk.File) {
batch, ok := batches[key]
if !ok {
batch = make([]*walk.File, 0, BatchSize)
}
batch = append(batch, file)
batches[key] = batch
// append to batch
batches[key] = append(batches[key], file)

// check if the batch is full
batch := batches[key]
if len(batch) == BatchSize {
// get the pipeline
pipeline := pipelines[key]

// copy the batch
files := make([]*walk.File, len(batch))
copy(files, batch)

// apply to the pipeline
fg.Go(func() error {
if err := pipeline.Apply(ctx, files); err != nil {
return err
Expand All @@ -257,10 +288,12 @@ func applyFormatters(ctx context.Context) func() error {
return nil
})

// reset the batch
batches[key] = batch[:0]
}
}

// format any partial batches
flushBatches := func() {
for key, pipeline := range pipelines {

Expand All @@ -287,6 +320,7 @@ func applyFormatters(ctx context.Context) func() error {
close(processedCh)
}()

// iterate the files channel, checking if any pipeline wants it, and attempting to apply if so.
for file := range filesCh {
var matched bool
for key, pipeline := range pipelines {
Expand All @@ -299,6 +333,7 @@ func applyFormatters(ctx context.Context) func() error {
if matched {
stats.Add(stats.Matched, 1)
} else {
// no match, so we send it direct to the processed channel
processedCh <- file
}
}
Expand Down

0 comments on commit c720e41

Please sign in to comment.