From bd348bfa378d182e6bee360879e99460ac25df60 Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Thu, 27 Mar 2025 22:39:51 +1100 Subject: [PATCH 1/3] chore: make fiximports run in parallel --- scripts/fiximports/main.go | 174 ++++++++++++++++++++++++++++++------- 1 file changed, 144 insertions(+), 30 deletions(-) diff --git a/scripts/fiximports/main.go b/scripts/fiximports/main.go index 427975855bc..912d592eee6 100644 --- a/scripts/fiximports/main.go +++ b/scripts/fiximports/main.go @@ -3,12 +3,14 @@ package main import ( "bytes" "fmt" - "io" "io/fs" "os" "path/filepath" "regexp" + "runtime" "strings" + "sync" + "sync/atomic" "golang.org/x/tools/imports" ) @@ -25,7 +27,18 @@ var ( consecutiveNewlinesRegex = regexp.MustCompile(`\n\s*\n`) ) +type fileContent struct { + path string + original []byte + current []byte + changed bool +} + func main() { + numWorkers := runtime.NumCPU() + + // Collect all the filenames that we want to process + var files []string if err := filepath.Walk(".", func(path string, info fs.FileInfo, err error) error { switch { case err != nil: @@ -39,49 +52,150 @@ func main() { !strings.HasSuffix(info.Name(), ".go"): return nil } - return fixGoImports(path) + files = append(files, path) + return nil }); err != nil { - fmt.Printf("Error fixing go imports: %v\n", err) + _, _ = fmt.Fprintf(os.Stderr, "Error walking directory: %v\n", err) os.Exit(1) } + + // Read all file contents in parallel + fileContents := readFilesParallel(files, numWorkers) + + // Because we have multiple ways of separating imports, we have to imports.Process for each one + // but imports.LocalPrefix is a global, so we have to set it for each group and process files + // in parallel. + for _, prefix := range groupByPrefixes { + imports.LocalPrefix = prefix + processFilesParallel(fileContents, numWorkers) + } + + // Write modified files in parallel + writeFilesParallel(fileContents, numWorkers) } -func fixGoImports(path string) error { - sourceFile, err := os.OpenFile(path, os.O_RDWR, 0666) - if err != nil { - return err +func readFilesParallel(files []string, numWorkers int) []*fileContent { + var readErrors int64 + var wg sync.WaitGroup + fileContents := make([]*fileContent, len(files)) + filesChan := make(chan int, len(files)) + + // Fill a queue with file indices that we can consume in parallel + for i := range files { + filesChan <- i } - defer func() { _ = sourceFile.Close() }() + close(filesChan) - source, err := io.ReadAll(sourceFile) - if err != nil { - return err + for w := 0; w < numWorkers; w++ { + wg.Add(1) + go func() { + defer wg.Done() + for i := range filesChan { + path := files[i] + content, err := os.ReadFile(path) + if err != nil { + _, _ = fmt.Fprintf(os.Stderr, "Error reading file %s: %v\n", path, err) + atomic.AddInt64(&readErrors, 1) + continue + } + + // Collapse is a cheap operation to do here + collapsed := collapseImportNewlines(content) + fileContents[i] = &fileContent{ + path: path, + original: content, + current: collapsed, + changed: !bytes.Equal(content, collapsed), + } + } + }() } - formatted := collapseImportNewlines(source) - for _, prefix := range groupByPrefixes { - imports.LocalPrefix = prefix - formatted, err = imports.Process(path, formatted, nil) - if err != nil { - return err - } + + wg.Wait() + + if readErrors > 0 { + _, _ = fmt.Fprintf(os.Stderr, "Failed to read %d files\n", readErrors) + os.Exit(1) } - if !bytes.Equal(source, formatted) { - if err := replaceFileContent(sourceFile, formatted); err != nil { - return err - } + + return fileContents +} + +func processFilesParallel(fileContents []*fileContent, numWorkers int) { + var processErrors int64 + var wg sync.WaitGroup + filesChan := make(chan int, len(fileContents)) + + // Fill a queue with file indices that we can consume in parallel + for i := range fileContents { + filesChan <- i + } + close(filesChan) + + for w := 0; w < numWorkers; w++ { + wg.Add(1) + go func() { + defer wg.Done() + for i := range filesChan { + file := fileContents[i] + formatted, err := imports.Process(file.path, file.current, nil) + if err != nil { + atomic.AddInt64(&processErrors, 1) + _, _ = fmt.Fprintf(os.Stderr, "Error processing %s: %v", file.path, err) + continue + } + + if !bytes.Equal(file.current, formatted) { + file.current = formatted + file.changed = true + } + } + }() + } + + wg.Wait() + + if processErrors > 0 { + _, _ = fmt.Fprintf(os.Stderr, "Failed to process %d files\n", processErrors) + os.Exit(1) } - return nil } -func replaceFileContent(target *os.File, replacement []byte) error { - if _, err := target.Seek(0, io.SeekStart); err != nil { - return err +func writeFilesParallel(fileContents []*fileContent, numWorkers int) { + var writeErrors int64 + var wg sync.WaitGroup + + // Only process changed files + changedFiles := make(chan *fileContent, len(fileContents)) + for _, file := range fileContents { + if file != nil && file.changed { + changedFiles <- file + } } - written, err := target.Write(replacement) - if err != nil { - return err + close(changedFiles) + + for w := 0; w < numWorkers; w++ { + wg.Add(1) + go func() { + defer wg.Done() + for file := range changedFiles { + // Only write if content has actually changed from original + if !bytes.Equal(file.original, file.current) { + if err := os.WriteFile(file.path, file.current, 0666); err != nil { + atomic.AddInt64(&writeErrors, 1) + _, _ = fmt.Fprintf(os.Stderr, "Error writing file %s: %v\n", file.path, err) + } + } + } + }() + } + + wg.Wait() + + if writeErrors > 0 { + _, _ = fmt.Fprintf(os.Stderr, "Failed to write %d files\n", writeErrors) + os.Exit(1) } - return target.Truncate(int64(written)) } func collapseImportNewlines(content []byte) []byte { From 51cabb493ecfb94bda4823414192bf1abfee1453 Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Thu, 27 Mar 2025 22:55:04 +1100 Subject: [PATCH 2/3] fixup! chore: make fiximports run in parallel --- scripts/fiximports/main.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/scripts/fiximports/main.go b/scripts/fiximports/main.go index 912d592eee6..381ffbf76ed 100644 --- a/scripts/fiximports/main.go +++ b/scripts/fiximports/main.go @@ -128,7 +128,9 @@ func processFilesParallel(fileContents []*fileContent, numWorkers int) { // Fill a queue with file indices that we can consume in parallel for i := range fileContents { - filesChan <- i + if fileContents[i] != nil { // shouldn't be nil, but just in case + filesChan <- i + } } close(filesChan) From c9e5ef47130a9e28cf9a3ea35f850191ea11ff05 Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Fri, 28 Mar 2025 11:50:18 +1100 Subject: [PATCH 3/3] fixup! chore: make fiximports run in parallel --- scripts/fiximports/main.go | 174 ++++++++++++++----------------------- 1 file changed, 67 insertions(+), 107 deletions(-) diff --git a/scripts/fiximports/main.go b/scripts/fiximports/main.go index 381ffbf76ed..3a5a188814f 100644 --- a/scripts/fiximports/main.go +++ b/scripts/fiximports/main.go @@ -9,10 +9,10 @@ import ( "regexp" "runtime" "strings" - "sync" - "sync/atomic" + "golang.org/x/sync/errgroup" "golang.org/x/tools/imports" + "golang.org/x/xerrors" ) var ( @@ -60,144 +60,104 @@ func main() { } // Read all file contents in parallel - fileContents := readFilesParallel(files, numWorkers) + fileContents, err := readFilesParallel(files, numWorkers) + if err != nil { + _, _ = fmt.Fprintf(os.Stderr, "Error reading files: %v\n", err) + os.Exit(1) + } // Because we have multiple ways of separating imports, we have to imports.Process for each one // but imports.LocalPrefix is a global, so we have to set it for each group and process files // in parallel. for _, prefix := range groupByPrefixes { imports.LocalPrefix = prefix - processFilesParallel(fileContents, numWorkers) + if err := processFilesParallel(fileContents, numWorkers); err != nil { + _, _ = fmt.Fprintf(os.Stderr, "Error processing files with prefix %s: %v\n", prefix, err) + os.Exit(1) + } } // Write modified files in parallel - writeFilesParallel(fileContents, numWorkers) + if err := writeFilesParallel(fileContents, numWorkers); err != nil { + _, _ = fmt.Fprintf(os.Stderr, "Error writing files: %v\n", err) + os.Exit(1) + } } -func readFilesParallel(files []string, numWorkers int) []*fileContent { - var readErrors int64 - var wg sync.WaitGroup +func readFilesParallel(files []string, numWorkers int) ([]*fileContent, error) { fileContents := make([]*fileContent, len(files)) - filesChan := make(chan int, len(files)) - // Fill a queue with file indices that we can consume in parallel - for i := range files { - filesChan <- i - } - close(filesChan) - - for w := 0; w < numWorkers; w++ { - wg.Add(1) - go func() { - defer wg.Done() - for i := range filesChan { - path := files[i] - content, err := os.ReadFile(path) - if err != nil { - _, _ = fmt.Fprintf(os.Stderr, "Error reading file %s: %v\n", path, err) - atomic.AddInt64(&readErrors, 1) - continue - } - - // Collapse is a cheap operation to do here - collapsed := collapseImportNewlines(content) - fileContents[i] = &fileContent{ - path: path, - original: content, - current: collapsed, - changed: !bytes.Equal(content, collapsed), - } + var g errgroup.Group + g.SetLimit(numWorkers) + + for i, path := range files { + g.Go(func() error { + content, err := os.ReadFile(path) + if err != nil { + return xerrors.Errorf("reading %s: %w", path, err) } - }() - } - wg.Wait() + // Collapse is a cheap operation to do here + collapsed := collapseImportNewlines(content) + fileContents[i] = &fileContent{ + path: path, + original: content, + current: collapsed, + changed: !bytes.Equal(content, collapsed), + } + return nil + }) + } - if readErrors > 0 { - _, _ = fmt.Fprintf(os.Stderr, "Failed to read %d files\n", readErrors) - os.Exit(1) + if err := g.Wait(); err != nil { + return nil, err } - return fileContents + return fileContents, nil } -func processFilesParallel(fileContents []*fileContent, numWorkers int) { - var processErrors int64 - var wg sync.WaitGroup - filesChan := make(chan int, len(fileContents)) +func processFilesParallel(fileContents []*fileContent, numWorkers int) error { + var g errgroup.Group + g.SetLimit(numWorkers) - // Fill a queue with file indices that we can consume in parallel - for i := range fileContents { - if fileContents[i] != nil { // shouldn't be nil, but just in case - filesChan <- i + for _, file := range fileContents { + if file == nil { + continue } - } - close(filesChan) - - for w := 0; w < numWorkers; w++ { - wg.Add(1) - go func() { - defer wg.Done() - for i := range filesChan { - file := fileContents[i] - formatted, err := imports.Process(file.path, file.current, nil) - if err != nil { - atomic.AddInt64(&processErrors, 1) - _, _ = fmt.Fprintf(os.Stderr, "Error processing %s: %v", file.path, err) - continue - } - - if !bytes.Equal(file.current, formatted) { - file.current = formatted - file.changed = true - } + g.Go(func() error { + formatted, err := imports.Process(file.path, file.current, nil) + if err != nil { + return xerrors.Errorf("processing %s: %w", file.path, err) } - }() - } - - wg.Wait() - if processErrors > 0 { - _, _ = fmt.Fprintf(os.Stderr, "Failed to process %d files\n", processErrors) - os.Exit(1) + if !bytes.Equal(file.current, formatted) { + file.current = formatted + file.changed = true + } + return nil + }) } + + return g.Wait() } -func writeFilesParallel(fileContents []*fileContent, numWorkers int) { - var writeErrors int64 - var wg sync.WaitGroup +func writeFilesParallel(fileContents []*fileContent, numWorkers int) error { + var g errgroup.Group + g.SetLimit(numWorkers) - // Only process changed files - changedFiles := make(chan *fileContent, len(fileContents)) for _, file := range fileContents { - if file != nil && file.changed { - changedFiles <- file + if file == nil || !file.changed { + continue } - } - close(changedFiles) - - for w := 0; w < numWorkers; w++ { - wg.Add(1) - go func() { - defer wg.Done() - for file := range changedFiles { - // Only write if content has actually changed from original - if !bytes.Equal(file.original, file.current) { - if err := os.WriteFile(file.path, file.current, 0666); err != nil { - atomic.AddInt64(&writeErrors, 1) - _, _ = fmt.Fprintf(os.Stderr, "Error writing file %s: %v\n", file.path, err) - } - } + g.Go(func() error { + if err := os.WriteFile(file.path, file.current, 0666); err != nil { + return xerrors.Errorf("writing %s: %w", file.path, err) } - }() + return nil + }) } - wg.Wait() - - if writeErrors > 0 { - _, _ = fmt.Fprintf(os.Stderr, "Failed to write %d files\n", writeErrors) - os.Exit(1) - } + return g.Wait() } func collapseImportNewlines(content []byte) []byte {