Skip to content

DEV2: Implementation of multiple goroutines for concurrent flushing#15

Merged
maoueh merged 35 commits intofirehose-fh3.0from
david-zhou/worker-queue-2
Jun 3, 2025
Merged

DEV2: Implementation of multiple goroutines for concurrent flushing#15
maoueh merged 35 commits intofirehose-fh3.0from
david-zhou/worker-queue-2

Conversation

@Rampex1
Copy link

@Rampex1 Rampex1 commented May 15, 2025

Changes Introduced

Implemention of multiple goroutines for concurrent flushing

  • User determines the number of concurrent goroutines through configuration

Benchmarking report

  • Full benchmark report in md file

Note

Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR introduces configurable concurrent flushing of firehose blocks via multiple goroutines, updates tests to support this new configuration, and includes benchmarking reports.

  • Add ConcurrentBlockFlushing config and implement worker queues for asynchronous block serialization and ordered output
  • Update tests to run in sequential and concurrent modes and add new concurrency-specific tests
  • Include performance benchmarking report in firehose_concurrency_2.md

Reviewed Changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 2 comments.

Show a summary per file
File Description
eth/tracers/internal/tracetest/firehose/helpers_test.go Update newFirehoseTestTracer signature to accept FirehoseConfig
eth/tracers/internal/tracetest/firehose/firehose_test.go Add loops for sequential and concurrent test runs and pass new config
eth/tracers/firehose_concurrency_test.go New unit tests validating concurrent flushing behavior
eth/tracers/firehose_concurrency_2.md Benchmarking report for concurrent block flushing
eth/tracers/firehose.go Implement core concurrency logic, added config field, worker queues, and clean shutdown
Comments suppressed due to low confidence (2)

eth/tracers/firehose.go:154

  • [nitpick] The field name closeChannels is ambiguous for a sync.Once; consider renaming it to closeOnce or similar to better reflect its purpose.
closeChannels sync.Once

eth/tracers/internal/tracetest/firehose/firehose_test.go:43

  • Tests only cover ConcurrentBlockFlushing values 0 and 1; add cases with higher values (e.g., >1) to verify behavior under multiple worker goroutines.
for _, concurrent := range []int{0, 1} {

@Rampex1 Rampex1 force-pushed the david-zhou/worker-queue-2 branch from f1ff669 to 8eb3d90 Compare May 15, 2025 15:14
@Rampex1 Rampex1 changed the base branch from release/geth-1.x-fh3.0 to firehose-fh3.0 May 15, 2025 19:10
@Rampex1 Rampex1 changed the title DEV: Implementation of multiple goroutines for concurrent flushing DEV2: Implementation of multiple goroutines for concurrent flushing May 15, 2025
@Rampex1 Rampex1 force-pushed the david-zhou/worker-queue-2 branch from 12070e8 to b105796 Compare May 21, 2025 14:35
@Rampex1
Copy link
Author

Rampex1 commented May 21, 2025

Ready for review

@Rampex1
Copy link
Author

Rampex1 commented May 23, 2025

One last change I made which isn't showing on Github for some reasonn (Github is saying "prcessing update")
I refactored the following line in firehose.go to the global state
Line 266 --> concurrentFlushBufferSize: 100,

// if it's a network for which we must reproduce the legacy bugs.
applyBackwardCompatibility *bool
concurrentBlockFlushing bool
concurrentBlockFlushing int
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's remove and use config block flushing count directly everywehre.

}

type ConcurrentFlushQueue struct {
bufferSize int
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove, unnecessary to keep in state.

Comment on lines +32 to +36
func NewConcurrentFlushQueue(bufferSize int, printBlockFunc func(*pbeth.Block, *FinalityStatus), outputFunc func([]byte)) *ConcurrentFlushQueue {
return &ConcurrentFlushQueue{
startSignal: make(chan uint64, 1),
jobQueue: make(chan *blockPrintJob, bufferSize),
outputQueue: make(chan *outputJob, bufferSize),
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Buffer size should be the same as number of worker taken in start, might be good to merge Start input with this one, which would mean that we keep the bufferSize struct variable. Maybe with another name though.

@maoueh maoueh merged commit cccc2d8 into firehose-fh3.0 Jun 3, 2025
@maoueh maoueh deleted the david-zhou/worker-queue-2 branch June 3, 2025 21:04
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants