From def498cce9cf05c05f725e6feab3cf8e9973c8c2 Mon Sep 17 00:00:00 2001 From: Andrii Date: Tue, 1 Oct 2024 15:48:35 +0300 Subject: [PATCH 01/17] Added TTS metrics --- .../node_builder/access_node_builder.go | 7 +++ cmd/bootstrap/utils/md5.go | 2 +- cmd/observer/node_builder/observer_builder.go | 7 +++ model/flow/transaction_timing.go | 1 + module/metrics.go | 4 ++ module/metrics/noop.go | 1 + module/metrics/transaction.go | 53 +++++++++++++++++++ .../indexer/collection_executed_metric.go | 21 ++++++++ 8 files changed, 95 insertions(+), 1 deletion(-) diff --git a/cmd/access/node_builder/access_node_builder.go b/cmd/access/node_builder/access_node_builder.go index 3c1305a3e96..320cd03be3c 100644 --- a/cmd/access/node_builder/access_node_builder.go +++ b/cmd/access/node_builder/access_node_builder.go @@ -149,6 +149,7 @@ type AccessNodeConfig struct { logTxTimeToFinalized bool logTxTimeToExecuted bool logTxTimeToFinalizedExecuted bool + logTxTimeToSealed bool retryEnabled bool rpcMetricsEnabled bool executionDataSyncEnabled bool @@ -239,6 +240,7 @@ func DefaultAccessNodeConfig() *AccessNodeConfig { logTxTimeToFinalized: false, logTxTimeToExecuted: false, logTxTimeToFinalizedExecuted: false, + logTxTimeToSealed: false, pingEnabled: false, retryEnabled: false, rpcMetricsEnabled: false, @@ -1231,6 +1233,10 @@ func (builder *FlowAccessNodeBuilder) extraFlags() { "log-tx-time-to-finalized-executed", defaultConfig.logTxTimeToFinalizedExecuted, "log transaction time to finalized and executed") + flags.BoolVar(&builder.logTxTimeToSealed, + "log-tx-time-to-sealed", + defaultConfig.logTxTimeToSealed, + "log transaction time to sealed") flags.BoolVar(&builder.pingEnabled, "ping-enabled", defaultConfig.pingEnabled, @@ -1677,6 +1683,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) { builder.logTxTimeToFinalized, builder.logTxTimeToExecuted, builder.logTxTimeToFinalizedExecuted, + builder.logTxTimeToSealed, ) return nil }). diff --git a/cmd/bootstrap/utils/md5.go b/cmd/bootstrap/utils/md5.go index 3abe9c42948..e885ed891e2 100644 --- a/cmd/bootstrap/utils/md5.go +++ b/cmd/bootstrap/utils/md5.go @@ -3,7 +3,7 @@ package utils // The google storage API only provides md5 and crc32 hence overriding the linter flag for md5 // #nosec import ( - "crypto/md5" + "crypto/md5" //nolint:gosec "io" "os" ) diff --git a/cmd/observer/node_builder/observer_builder.go b/cmd/observer/node_builder/observer_builder.go index a1d1dbcf6a1..3748a468ec7 100644 --- a/cmd/observer/node_builder/observer_builder.go +++ b/cmd/observer/node_builder/observer_builder.go @@ -157,6 +157,7 @@ type ObserverServiceConfig struct { logTxTimeToFinalized bool logTxTimeToExecuted bool logTxTimeToFinalizedExecuted bool + logTxTimeToSealed bool executionDataSyncEnabled bool executionDataIndexingEnabled bool executionDataDBMode string @@ -233,6 +234,7 @@ func DefaultObserverServiceConfig() *ObserverServiceConfig { logTxTimeToFinalized: false, logTxTimeToExecuted: false, logTxTimeToFinalizedExecuted: false, + logTxTimeToSealed: false, executionDataSyncEnabled: false, executionDataIndexingEnabled: false, executionDataDBMode: execution_data.ExecutionDataDBModeBadger.String(), @@ -676,6 +678,10 @@ func (builder *ObserverServiceBuilder) extraFlags() { "log-tx-time-to-finalized-executed", defaultConfig.logTxTimeToFinalizedExecuted, "log transaction time to finalized and executed") + flags.BoolVar(&builder.logTxTimeToSealed, + "log-tx-time-to-sealed", + defaultConfig.logTxTimeToSealed, + "log transaction time to sealed") flags.BoolVar(&builder.rpcMetricsEnabled, "rpc-metrics-enabled", defaultConfig.rpcMetricsEnabled, "whether to enable the rpc metrics") flags.BoolVar(&builder.executionDataIndexingEnabled, "execution-data-indexing-enabled", @@ -1751,6 +1757,7 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() { builder.logTxTimeToFinalized, builder.logTxTimeToExecuted, builder.logTxTimeToFinalizedExecuted, + builder.logTxTimeToSealed, ) return nil }) diff --git a/model/flow/transaction_timing.go b/model/flow/transaction_timing.go index 5f2c58812de..3a9da43eee1 100644 --- a/model/flow/transaction_timing.go +++ b/model/flow/transaction_timing.go @@ -10,6 +10,7 @@ type TransactionTiming struct { Received time.Time Finalized time.Time Executed time.Time + Sealed time.Time } func (t TransactionTiming) ID() Identifier { diff --git a/module/metrics.go b/module/metrics.go index f43c8b9325e..9727da98afc 100644 --- a/module/metrics.go +++ b/module/metrics.go @@ -1076,6 +1076,10 @@ type TransactionMetrics interface { // works if the transaction was earlier added as received. TransactionFinalized(txID flow.Identifier, when time.Time) + // TransactionSealed reports the time spent between the transaction being received and sealed. Reporting only + // works if the transaction was earlier added as received. + TransactionSealed(txID flow.Identifier, when time.Time) + // TransactionExecuted reports the time spent between the transaction being received and executed. Reporting only // works if the transaction was earlier added as received. TransactionExecuted(txID flow.Identifier, when time.Time) diff --git a/module/metrics/noop.go b/module/metrics/noop.go index 17460bf460a..60d7e17a287 100644 --- a/module/metrics/noop.go +++ b/module/metrics/noop.go @@ -214,6 +214,7 @@ func (nc *NoopCollector) ScriptExecutionNotIndexed() func (nc *NoopCollector) TransactionResultFetched(dur time.Duration, size int) {} func (nc *NoopCollector) TransactionReceived(txID flow.Identifier, when time.Time) {} func (nc *NoopCollector) TransactionFinalized(txID flow.Identifier, when time.Time) {} +func (nc *NoopCollector) TransactionSealed(txID flow.Identifier, when time.Time) {} func (nc *NoopCollector) TransactionExecuted(txID flow.Identifier, when time.Time) {} func (nc *NoopCollector) TransactionExpired(txID flow.Identifier) {} func (nc *NoopCollector) TransactionValidated() {} diff --git a/module/metrics/transaction.go b/module/metrics/transaction.go index 8bea3e9adea..c079f1f20d7 100644 --- a/module/metrics/transaction.go +++ b/module/metrics/transaction.go @@ -18,9 +18,11 @@ type TransactionCollector struct { logTimeToFinalized bool logTimeToExecuted bool logTimeToFinalizedExecuted bool + logTimeToSealed bool timeToFinalized prometheus.Summary timeToExecuted prometheus.Summary timeToFinalizedExecuted prometheus.Summary + timeToSealed prometheus.Summary transactionSubmission *prometheus.CounterVec transactionSize prometheus.Histogram scriptExecutedDuration *prometheus.HistogramVec @@ -40,6 +42,7 @@ func NewTransactionCollector( logTimeToFinalized bool, logTimeToExecuted bool, logTimeToFinalizedExecuted bool, + logTimeToSealed bool, ) *TransactionCollector { tc := &TransactionCollector{ @@ -48,6 +51,7 @@ func NewTransactionCollector( logTimeToFinalized: logTimeToFinalized, logTimeToExecuted: logTimeToExecuted, logTimeToFinalizedExecuted: logTimeToFinalizedExecuted, + logTimeToSealed: logTimeToSealed, timeToFinalized: promauto.NewSummary(prometheus.SummaryOpts{ Name: "time_to_finalized_seconds", Namespace: namespaceAccess, @@ -91,6 +95,20 @@ func NewTransactionCollector( AgeBuckets: 5, BufCap: 500, }), + timeToSealed: promauto.NewSummary(prometheus.SummaryOpts{ + Name: "time_to_seal_seconds", + Namespace: namespaceAccess, + Subsystem: subsystemTransactionTiming, + Help: "the duration of how long it took between the transaction was received until it was sealed", + Objectives: map[float64]float64{ + 0.01: 0.001, + 0.5: 0.05, + 0.99: 0.001, + }, + MaxAge: 10 * time.Minute, + AgeBuckets: 5, + BufCap: 500, + }), transactionSubmission: promauto.NewCounterVec(prometheus.CounterOpts{ Name: "transaction_submission", Namespace: namespaceAccess, @@ -269,6 +287,27 @@ func (tc *TransactionCollector) TransactionExecuted(txID flow.Identifier, when t } } +func (tc *TransactionCollector) TransactionSealed(txID flow.Identifier, when time.Time) { + t, updated := tc.transactionTimings.Adjust(txID, func(t *flow.TransactionTiming) *flow.TransactionTiming { + t.Sealed = when + return t + }) + + if !updated { + tc.log.Debug(). + Str("transaction_id", txID.String()). + Msg("failed to update TransactionSealed metric") + return + } + + tc.trackTTS(t, tc.logTimeToSealed) + + // remove transaction timing from mempool if finalized and sealed + if !t.Finalized.IsZero() && !t.Sealed.IsZero() { + tc.transactionTimings.Remove(txID) + } +} + func (tc *TransactionCollector) trackTTF(t *flow.TransactionTiming, log bool) { if t.Received.IsZero() || t.Finalized.IsZero() { return @@ -317,6 +356,20 @@ func (tc *TransactionCollector) trackTTFE(t *flow.TransactionTiming, log bool) { } } +func (tc *TransactionCollector) trackTTS(t *flow.TransactionTiming, log bool) { + if t.Received.IsZero() || t.Finalized.IsZero() || t.Sealed.IsZero() { + return + } + duration := t.Sealed.Sub(t.Received).Seconds() + + tc.timeToSealed.Observe(duration) + + if log { + tc.log.Info().Str("transaction_id", t.TransactionID.String()).Float64("duration", duration). + Msg("transaction time to sealed") + } +} + func (tc *TransactionCollector) TransactionSubmissionFailed() { tc.transactionSubmission.WithLabelValues("failed").Inc() } diff --git a/module/state_synchronization/indexer/collection_executed_metric.go b/module/state_synchronization/indexer/collection_executed_metric.go index 814afbb3325..e7418d31387 100644 --- a/module/state_synchronization/indexer/collection_executed_metric.go +++ b/module/state_synchronization/indexer/collection_executed_metric.go @@ -27,6 +27,11 @@ type CollectionExecutedMetricImpl struct { blocks storage.Blocks } +type TransactionsToBlock struct { + blockId flow.Identifier + transactions []flow.Identifier +} + func NewCollectionExecutedMetricImpl( log zerolog.Logger, accessMetrics module.AccessMetrics, @@ -72,6 +77,7 @@ func (c *CollectionExecutedMetricImpl) BlockFinalized(block *flow.Block) { // TODO: lookup actual finalization time by looking at the block finalizing `b` now := time.Now().UTC() blockID := block.ID() + txToBlock := TransactionsToBlock{blockId: blockID} // mark all transactions as finalized // TODO: sample to reduce performance overhead @@ -87,10 +93,25 @@ func (c *CollectionExecutedMetricImpl) BlockFinalized(block *flow.Block) { } for _, t := range l.Transactions { + txToBlock.transactions = append(txToBlock.transactions, t) c.accessMetrics.TransactionFinalized(t, now) } } + for _, s := range block.Payload.Seals { + block, err := c.blocks.ByID(s.BlockID) + if err != nil { + c.log.Warn().Err(err).Msg("could not find block") + continue + } + + if block.ID() == txToBlock.blockId && len(txToBlock.transactions) != 0 { + for _, t := range txToBlock.transactions { + c.accessMetrics.TransactionSealed(t, now) + } + } + } + if ti, found := c.blocksToMarkExecuted.ByID(blockID); found { c.blockExecuted(block, ti) c.accessMetrics.UpdateExecutionReceiptMaxHeight(block.Header.Height) From 8fffbb6d1a2526d96127d2df0c69eae1282b9fcf Mon Sep 17 00:00:00 2001 From: Andrii Date: Tue, 1 Oct 2024 17:00:12 +0300 Subject: [PATCH 02/17] Generated mocks --- module/mock/access_metrics.go | 5 +++++ module/mock/transaction_metrics.go | 5 +++++ 2 files changed, 10 insertions(+) diff --git a/module/mock/access_metrics.go b/module/mock/access_metrics.go index 21ecc03740f..df3cb8ad8c2 100644 --- a/module/mock/access_metrics.go +++ b/module/mock/access_metrics.go @@ -138,6 +138,11 @@ func (_m *AccessMetrics) TransactionResultFetched(dur time.Duration, size int) { _m.Called(dur, size) } +// TransactionSealed provides a mock function with given fields: txID, when +func (_m *AccessMetrics) TransactionSealed(txID flow.Identifier, when time.Time) { + _m.Called(txID, when) +} + // TransactionSubmissionFailed provides a mock function with given fields: func (_m *AccessMetrics) TransactionSubmissionFailed() { _m.Called() diff --git a/module/mock/transaction_metrics.go b/module/mock/transaction_metrics.go index 9345b934a9a..5e96e52a4ad 100644 --- a/module/mock/transaction_metrics.go +++ b/module/mock/transaction_metrics.go @@ -39,6 +39,11 @@ func (_m *TransactionMetrics) TransactionResultFetched(dur time.Duration, size i _m.Called(dur, size) } +// TransactionSealed provides a mock function with given fields: txID, when +func (_m *TransactionMetrics) TransactionSealed(txID flow.Identifier, when time.Time) { + _m.Called(txID, when) +} + // TransactionSubmissionFailed provides a mock function with given fields: func (_m *TransactionMetrics) TransactionSubmissionFailed() { _m.Called() From 4558746cc923bf98cabc106adb7e796b5a343f4b Mon Sep 17 00:00:00 2001 From: Andrii Date: Wed, 2 Oct 2024 17:00:54 +0300 Subject: [PATCH 03/17] Fixed failing test --- engine/access/ingestion/engine_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/engine/access/ingestion/engine_test.go b/engine/access/ingestion/engine_test.go index d93056bd80c..ab885f6bccf 100644 --- a/engine/access/ingestion/engine_test.go +++ b/engine/access/ingestion/engine_test.go @@ -277,6 +277,8 @@ func (s *Suite) TestOnFinalizedBlockSingle() { }).Once() } + s.blocks.On("ByID", mock.Anything).Return(block, nil) + // process the block through the finalized callback eng.OnFinalizedBlock(&hotstuffBlock) @@ -335,6 +337,7 @@ func (s *Suite) TestOnFinalizedBlockSeveralBlocksAhead() { // expected all new blocks after last block processed for _, block := range blocks { s.blocks.On("IndexBlockForCollections", block.ID(), []flow.Identifier(flow.GetIDs(block.Payload.Guarantees))).Return(nil).Once() + s.blocks.On("ByID", mock.Anything).Return(&block, nil) for _, cg := range block.Payload.Guarantees { s.request.On("EntityByID", cg.CollectionID, mock.Anything).Return().Run(func(args mock.Arguments) { From 6348f03acb04458d3255e50d5af58993000d380d Mon Sep 17 00:00:00 2001 From: Andrii Date: Thu, 3 Oct 2024 13:13:23 +0300 Subject: [PATCH 04/17] Renamed struct and id field --- .../indexer/collection_executed_metric.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/module/state_synchronization/indexer/collection_executed_metric.go b/module/state_synchronization/indexer/collection_executed_metric.go index e7418d31387..622d53398b8 100644 --- a/module/state_synchronization/indexer/collection_executed_metric.go +++ b/module/state_synchronization/indexer/collection_executed_metric.go @@ -27,8 +27,8 @@ type CollectionExecutedMetricImpl struct { blocks storage.Blocks } -type TransactionsToBlock struct { - blockId flow.Identifier +type BlockTransactions struct { + blockID flow.Identifier transactions []flow.Identifier } @@ -77,7 +77,7 @@ func (c *CollectionExecutedMetricImpl) BlockFinalized(block *flow.Block) { // TODO: lookup actual finalization time by looking at the block finalizing `b` now := time.Now().UTC() blockID := block.ID() - txToBlock := TransactionsToBlock{blockId: blockID} + txToBlock := BlockTransactions{blockID: blockID} // mark all transactions as finalized // TODO: sample to reduce performance overhead @@ -105,7 +105,7 @@ func (c *CollectionExecutedMetricImpl) BlockFinalized(block *flow.Block) { continue } - if block.ID() == txToBlock.blockId && len(txToBlock.transactions) != 0 { + if block.ID() == txToBlock.blockID && len(txToBlock.transactions) != 0 { for _, t := range txToBlock.transactions { c.accessMetrics.TransactionSealed(t, now) } From 9bd79db4aaca8c27b608a783c2657a8b6f54ce4a Mon Sep 17 00:00:00 2001 From: Andrii Date: Thu, 3 Oct 2024 13:25:26 +0300 Subject: [PATCH 05/17] Using cached data for if statement --- engine/access/ingestion/engine_test.go | 2 +- .../indexer/collection_executed_metric.go | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/engine/access/ingestion/engine_test.go b/engine/access/ingestion/engine_test.go index ab885f6bccf..e32f8a8ae76 100644 --- a/engine/access/ingestion/engine_test.go +++ b/engine/access/ingestion/engine_test.go @@ -277,7 +277,7 @@ func (s *Suite) TestOnFinalizedBlockSingle() { }).Once() } - s.blocks.On("ByID", mock.Anything).Return(block, nil) + s.blocks.On("ByID", mock.Anything).Return(&block, nil) // process the block through the finalized callback eng.OnFinalizedBlock(&hotstuffBlock) diff --git a/module/state_synchronization/indexer/collection_executed_metric.go b/module/state_synchronization/indexer/collection_executed_metric.go index 622d53398b8..282a038157d 100644 --- a/module/state_synchronization/indexer/collection_executed_metric.go +++ b/module/state_synchronization/indexer/collection_executed_metric.go @@ -77,7 +77,7 @@ func (c *CollectionExecutedMetricImpl) BlockFinalized(block *flow.Block) { // TODO: lookup actual finalization time by looking at the block finalizing `b` now := time.Now().UTC() blockID := block.ID() - txToBlock := BlockTransactions{blockID: blockID} + blockTransactions := BlockTransactions{blockID: blockID} // mark all transactions as finalized // TODO: sample to reduce performance overhead @@ -93,20 +93,20 @@ func (c *CollectionExecutedMetricImpl) BlockFinalized(block *flow.Block) { } for _, t := range l.Transactions { - txToBlock.transactions = append(txToBlock.transactions, t) + blockTransactions.transactions = append(blockTransactions.transactions, t) c.accessMetrics.TransactionFinalized(t, now) } } for _, s := range block.Payload.Seals { - block, err := c.blocks.ByID(s.BlockID) + _, err := c.blocks.ByID(s.BlockID) if err != nil { c.log.Warn().Err(err).Msg("could not find block") continue } - if block.ID() == txToBlock.blockID && len(txToBlock.transactions) != 0 { - for _, t := range txToBlock.transactions { + if s.BlockID == blockTransactions.blockID && len(blockTransactions.transactions) != 0 { + for _, t := range blockTransactions.transactions { c.accessMetrics.TransactionSealed(t, now) } } From 0cb72f8742d3952f0f1a4b66a6b6d2b087d493d4 Mon Sep 17 00:00:00 2001 From: Andrii Date: Tue, 15 Oct 2024 12:47:35 +0300 Subject: [PATCH 06/17] Expanded CollectionExecutedMetricImpl with map for sealed metrics --- module/metrics/transaction.go | 6 +-- .../indexer/collection_executed_metric.go | 38 ++++++++++++------- 2 files changed, 27 insertions(+), 17 deletions(-) diff --git a/module/metrics/transaction.go b/module/metrics/transaction.go index c079f1f20d7..474d1ba5fe1 100644 --- a/module/metrics/transaction.go +++ b/module/metrics/transaction.go @@ -302,8 +302,8 @@ func (tc *TransactionCollector) TransactionSealed(txID flow.Identifier, when tim tc.trackTTS(t, tc.logTimeToSealed) - // remove transaction timing from mempool if finalized and sealed - if !t.Finalized.IsZero() && !t.Sealed.IsZero() { + // remove transaction timing from mempool if sealed + if !t.Sealed.IsZero() { tc.transactionTimings.Remove(txID) } } @@ -357,7 +357,7 @@ func (tc *TransactionCollector) trackTTFE(t *flow.TransactionTiming, log bool) { } func (tc *TransactionCollector) trackTTS(t *flow.TransactionTiming, log bool) { - if t.Received.IsZero() || t.Finalized.IsZero() || t.Sealed.IsZero() { + if t.Received.IsZero() || t.Sealed.IsZero() { return } duration := t.Sealed.Sub(t.Received).Seconds() diff --git a/module/state_synchronization/indexer/collection_executed_metric.go b/module/state_synchronization/indexer/collection_executed_metric.go index 282a038157d..63138b36230 100644 --- a/module/state_synchronization/indexer/collection_executed_metric.go +++ b/module/state_synchronization/indexer/collection_executed_metric.go @@ -25,11 +25,8 @@ type CollectionExecutedMetricImpl struct { collections storage.Collections blocks storage.Blocks -} -type BlockTransactions struct { - blockID flow.Identifier - transactions []flow.Identifier + blockTransactions map[flow.Identifier][]flow.Identifier // Map to track transactions for each block for sealed metrics } func NewCollectionExecutedMetricImpl( @@ -77,7 +74,11 @@ func (c *CollectionExecutedMetricImpl) BlockFinalized(block *flow.Block) { // TODO: lookup actual finalization time by looking at the block finalizing `b` now := time.Now().UTC() blockID := block.ID() - blockTransactions := BlockTransactions{blockID: blockID} + + // Initialize the transactions slice for this block ID if it doesn't already exist + if _, exists := c.blockTransactions[blockID]; !exists { + c.blockTransactions[blockID] = []flow.Identifier{} + } // mark all transactions as finalized // TODO: sample to reduce performance overhead @@ -93,21 +94,19 @@ func (c *CollectionExecutedMetricImpl) BlockFinalized(block *flow.Block) { } for _, t := range l.Transactions { - blockTransactions.transactions = append(blockTransactions.transactions, t) + c.blockTransactions[blockID] = append(c.blockTransactions[blockID], t) c.accessMetrics.TransactionFinalized(t, now) } } + // Process block seals for _, s := range block.Payload.Seals { - _, err := c.blocks.ByID(s.BlockID) - if err != nil { - c.log.Warn().Err(err).Msg("could not find block") - continue - } - - if s.BlockID == blockTransactions.blockID && len(blockTransactions.transactions) != 0 { - for _, t := range blockTransactions.transactions { + if transactions, found := c.blockTransactions[s.BlockID]; found && len(transactions) != 0 { + for _, t := range transactions { c.accessMetrics.TransactionSealed(t, now) + + // Remove the transaction by transaction ID + c.blockTransactions[s.BlockID] = removeTransactionByID(c.blockTransactions[s.BlockID], t) } } } @@ -119,6 +118,17 @@ func (c *CollectionExecutedMetricImpl) BlockFinalized(block *flow.Block) { } } +// Helper function to remove a transaction from the slice by transaction ID +func removeTransactionByID(transactions []flow.Identifier, txID flow.Identifier) []flow.Identifier { + for i, t := range transactions { + if t == txID { + // Remove the transaction by slicing around it + return append(transactions[:i], transactions[i+1:]...) + } + } + return transactions +} + // ExecutionReceiptReceived tracks execution receipt metrics func (c *CollectionExecutedMetricImpl) ExecutionReceiptReceived(r *flow.ExecutionReceipt) { // TODO add actual execution time to execution receipt? From 82abb35fcc16380d2254a12085424f3f1514d6e5 Mon Sep 17 00:00:00 2001 From: Andrii Date: Tue, 15 Oct 2024 13:27:22 +0300 Subject: [PATCH 07/17] Removed ByID mocks and added filed init in constructor --- engine/access/ingestion/engine_test.go | 3 --- .../indexer/collection_executed_metric.go | 1 + 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/engine/access/ingestion/engine_test.go b/engine/access/ingestion/engine_test.go index e32f8a8ae76..d93056bd80c 100644 --- a/engine/access/ingestion/engine_test.go +++ b/engine/access/ingestion/engine_test.go @@ -277,8 +277,6 @@ func (s *Suite) TestOnFinalizedBlockSingle() { }).Once() } - s.blocks.On("ByID", mock.Anything).Return(&block, nil) - // process the block through the finalized callback eng.OnFinalizedBlock(&hotstuffBlock) @@ -337,7 +335,6 @@ func (s *Suite) TestOnFinalizedBlockSeveralBlocksAhead() { // expected all new blocks after last block processed for _, block := range blocks { s.blocks.On("IndexBlockForCollections", block.ID(), []flow.Identifier(flow.GetIDs(block.Payload.Guarantees))).Return(nil).Once() - s.blocks.On("ByID", mock.Anything).Return(&block, nil) for _, cg := range block.Payload.Guarantees { s.request.On("EntityByID", cg.CollectionID, mock.Anything).Return().Run(func(args mock.Arguments) { diff --git a/module/state_synchronization/indexer/collection_executed_metric.go b/module/state_synchronization/indexer/collection_executed_metric.go index 63138b36230..ba7f503aa7a 100644 --- a/module/state_synchronization/indexer/collection_executed_metric.go +++ b/module/state_synchronization/indexer/collection_executed_metric.go @@ -46,6 +46,7 @@ func NewCollectionExecutedMetricImpl( blocksToMarkExecuted: blocksToMarkExecuted, collections: collections, blocks: blocks, + blockTransactions: make(map[flow.Identifier][]flow.Identifier), }, nil } From 2401406ecbec3fd04aef642c121806af348c6de8 Mon Sep 17 00:00:00 2001 From: Andrii Date: Wed, 16 Oct 2024 12:57:24 +0300 Subject: [PATCH 08/17] Added mutex for sealed metrics --- .../indexer/collection_executed_metric.go | 37 +++++++++---------- 1 file changed, 17 insertions(+), 20 deletions(-) diff --git a/module/state_synchronization/indexer/collection_executed_metric.go b/module/state_synchronization/indexer/collection_executed_metric.go index ba7f503aa7a..caeb07a52db 100644 --- a/module/state_synchronization/indexer/collection_executed_metric.go +++ b/module/state_synchronization/indexer/collection_executed_metric.go @@ -2,6 +2,7 @@ package indexer import ( "errors" + "sync" "time" "github.com/rs/zerolog" @@ -27,6 +28,7 @@ type CollectionExecutedMetricImpl struct { blocks storage.Blocks blockTransactions map[flow.Identifier][]flow.Identifier // Map to track transactions for each block for sealed metrics + mutex sync.RWMutex } func NewCollectionExecutedMetricImpl( @@ -76,11 +78,6 @@ func (c *CollectionExecutedMetricImpl) BlockFinalized(block *flow.Block) { now := time.Now().UTC() blockID := block.ID() - // Initialize the transactions slice for this block ID if it doesn't already exist - if _, exists := c.blockTransactions[blockID]; !exists { - c.blockTransactions[blockID] = []flow.Identifier{} - } - // mark all transactions as finalized // TODO: sample to reduce performance overhead for _, g := range block.Payload.Guarantees { @@ -94,21 +91,32 @@ func (c *CollectionExecutedMetricImpl) BlockFinalized(block *flow.Block) { continue } + c.mutex.Lock() for _, t := range l.Transactions { c.blockTransactions[blockID] = append(c.blockTransactions[blockID], t) c.accessMetrics.TransactionFinalized(t, now) } + c.mutex.Unlock() } // Process block seals for _, s := range block.Payload.Seals { - if transactions, found := c.blockTransactions[s.BlockID]; found && len(transactions) != 0 { + c.mutex.RLock() + transactions, found := c.blockTransactions[s.BlockID] + c.mutex.RUnlock() // release the read lock after reading + + if found && len(transactions) != 0 { + // Lock for writing since we will modify the transactions list + c.mutex.Lock() + // Process all transactions for _, t := range transactions { c.accessMetrics.TransactionSealed(t, now) - - // Remove the transaction by transaction ID - c.blockTransactions[s.BlockID] = removeTransactionByID(c.blockTransactions[s.BlockID], t) } + + // Remove the entire block of transactions once processed + delete(c.blockTransactions, s.BlockID) + // Unlock after modifications are done + c.mutex.Unlock() } } @@ -119,17 +127,6 @@ func (c *CollectionExecutedMetricImpl) BlockFinalized(block *flow.Block) { } } -// Helper function to remove a transaction from the slice by transaction ID -func removeTransactionByID(transactions []flow.Identifier, txID flow.Identifier) []flow.Identifier { - for i, t := range transactions { - if t == txID { - // Remove the transaction by slicing around it - return append(transactions[:i], transactions[i+1:]...) - } - } - return transactions -} - // ExecutionReceiptReceived tracks execution receipt metrics func (c *CollectionExecutedMetricImpl) ExecutionReceiptReceived(r *flow.ExecutionReceipt) { // TODO add actual execution time to execution receipt? From 2ebce19382ffb676de2ba83696ddc0e6392a12ec Mon Sep 17 00:00:00 2001 From: Andrii Diachuk Date: Fri, 18 Oct 2024 12:48:16 +0300 Subject: [PATCH 09/17] Update module/state_synchronization/indexer/collection_executed_metric.go Co-authored-by: Peter Argue <89119817+peterargue@users.noreply.github.com> --- .../indexer/collection_executed_metric.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/module/state_synchronization/indexer/collection_executed_metric.go b/module/state_synchronization/indexer/collection_executed_metric.go index caeb07a52db..0e3e3af02b6 100644 --- a/module/state_synchronization/indexer/collection_executed_metric.go +++ b/module/state_synchronization/indexer/collection_executed_metric.go @@ -106,16 +106,12 @@ func (c *CollectionExecutedMetricImpl) BlockFinalized(block *flow.Block) { c.mutex.RUnlock() // release the read lock after reading if found && len(transactions) != 0 { - // Lock for writing since we will modify the transactions list - c.mutex.Lock() - // Process all transactions for _, t := range transactions { c.accessMetrics.TransactionSealed(t, now) } - // Remove the entire block of transactions once processed + c.mutex.Lock() delete(c.blockTransactions, s.BlockID) - // Unlock after modifications are done c.mutex.Unlock() } } From b76f46b35db7dd670ec1fc1cdd03edbb9836158e Mon Sep 17 00:00:00 2001 From: Andrii Diachuk Date: Fri, 18 Oct 2024 12:48:24 +0300 Subject: [PATCH 10/17] Update module/state_synchronization/indexer/collection_executed_metric.go Co-authored-by: Peter Argue <89119817+peterargue@users.noreply.github.com> --- .../indexer/collection_executed_metric.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/module/state_synchronization/indexer/collection_executed_metric.go b/module/state_synchronization/indexer/collection_executed_metric.go index 0e3e3af02b6..a048ce8f94c 100644 --- a/module/state_synchronization/indexer/collection_executed_metric.go +++ b/module/state_synchronization/indexer/collection_executed_metric.go @@ -91,11 +91,11 @@ func (c *CollectionExecutedMetricImpl) BlockFinalized(block *flow.Block) { continue } - c.mutex.Lock() for _, t := range l.Transactions { - c.blockTransactions[blockID] = append(c.blockTransactions[blockID], t) c.accessMetrics.TransactionFinalized(t, now) } + c.mutex.Lock() + c.blockTransactions[blockID] = l.Transactions c.mutex.Unlock() } From b3a1c844b7a4b8e1dcba63660bf1293a0b17f87d Mon Sep 17 00:00:00 2001 From: Andrii Diachuk Date: Fri, 18 Oct 2024 12:48:49 +0300 Subject: [PATCH 11/17] Update module/state_synchronization/indexer/collection_executed_metric.go Co-authored-by: Peter Argue <89119817+peterargue@users.noreply.github.com> --- .../state_synchronization/indexer/collection_executed_metric.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/module/state_synchronization/indexer/collection_executed_metric.go b/module/state_synchronization/indexer/collection_executed_metric.go index a048ce8f94c..051e8159c08 100644 --- a/module/state_synchronization/indexer/collection_executed_metric.go +++ b/module/state_synchronization/indexer/collection_executed_metric.go @@ -105,7 +105,7 @@ func (c *CollectionExecutedMetricImpl) BlockFinalized(block *flow.Block) { transactions, found := c.blockTransactions[s.BlockID] c.mutex.RUnlock() // release the read lock after reading - if found && len(transactions) != 0 { + if found { for _, t := range transactions { c.accessMetrics.TransactionSealed(t, now) } From 70b82e6a333cb3062c29b3a85c7f9c542a4a62f5 Mon Sep 17 00:00:00 2001 From: Andrii Date: Fri, 18 Oct 2024 12:56:59 +0300 Subject: [PATCH 12/17] removed comment --- .../indexer/collection_executed_metric.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/module/state_synchronization/indexer/collection_executed_metric.go b/module/state_synchronization/indexer/collection_executed_metric.go index 051e8159c08..7053ea99dcd 100644 --- a/module/state_synchronization/indexer/collection_executed_metric.go +++ b/module/state_synchronization/indexer/collection_executed_metric.go @@ -28,7 +28,7 @@ type CollectionExecutedMetricImpl struct { blocks storage.Blocks blockTransactions map[flow.Identifier][]flow.Identifier // Map to track transactions for each block for sealed metrics - mutex sync.RWMutex + mu sync.RWMutex } func NewCollectionExecutedMetricImpl( @@ -94,25 +94,25 @@ func (c *CollectionExecutedMetricImpl) BlockFinalized(block *flow.Block) { for _, t := range l.Transactions { c.accessMetrics.TransactionFinalized(t, now) } - c.mutex.Lock() + c.mu.Lock() c.blockTransactions[blockID] = l.Transactions - c.mutex.Unlock() + c.mu.Unlock() } // Process block seals for _, s := range block.Payload.Seals { - c.mutex.RLock() + c.mu.RLock() transactions, found := c.blockTransactions[s.BlockID] - c.mutex.RUnlock() // release the read lock after reading + c.mu.RUnlock() if found { for _, t := range transactions { c.accessMetrics.TransactionSealed(t, now) } - c.mutex.Lock() + c.mu.Lock() delete(c.blockTransactions, s.BlockID) - c.mutex.Unlock() + c.mu.Unlock() } } From bf4d8e826a55e5c4bd2be1ba79ca24ea07d8b9d9 Mon Sep 17 00:00:00 2001 From: Andrii Date: Fri, 18 Oct 2024 14:34:00 +0300 Subject: [PATCH 13/17] Using IdentifierMap instead of map --- .../indexer/collection_executed_metric.go | 30 +++++++++---------- .../indexer/indexer_core.go | 2 +- 2 files changed, 15 insertions(+), 17 deletions(-) diff --git a/module/state_synchronization/indexer/collection_executed_metric.go b/module/state_synchronization/indexer/collection_executed_metric.go index 7053ea99dcd..0c8493153f8 100644 --- a/module/state_synchronization/indexer/collection_executed_metric.go +++ b/module/state_synchronization/indexer/collection_executed_metric.go @@ -2,7 +2,6 @@ package indexer import ( "errors" - "sync" "time" "github.com/rs/zerolog" @@ -27,8 +26,7 @@ type CollectionExecutedMetricImpl struct { collections storage.Collections blocks storage.Blocks - blockTransactions map[flow.Identifier][]flow.Identifier // Map to track transactions for each block for sealed metrics - mu sync.RWMutex + blockTransactions *stdmap.IdentifierMap // Map to track transactions for each block for sealed metrics } func NewCollectionExecutedMetricImpl( @@ -40,7 +38,7 @@ func NewCollectionExecutedMetricImpl( collections storage.Collections, blocks storage.Blocks, ) (*CollectionExecutedMetricImpl, error) { - return &CollectionExecutedMetricImpl{ + collectionExecutedMetricImpl := &CollectionExecutedMetricImpl{ log: log, accessMetrics: accessMetrics, collectionsToMarkFinalized: collectionsToMarkFinalized, @@ -48,8 +46,15 @@ func NewCollectionExecutedMetricImpl( blocksToMarkExecuted: blocksToMarkExecuted, collections: collections, blocks: blocks, - blockTransactions: make(map[flow.Identifier][]flow.Identifier), - }, nil + } + + var err error + collectionExecutedMetricImpl.blockTransactions, err = stdmap.NewIdentifierMap(100) + if err != nil { + return nil, err + } + + return collectionExecutedMetricImpl, nil } // CollectionFinalized tracks collections to mark finalized @@ -93,26 +98,19 @@ func (c *CollectionExecutedMetricImpl) BlockFinalized(block *flow.Block) { for _, t := range l.Transactions { c.accessMetrics.TransactionFinalized(t, now) + c.blockTransactions.Append(blockID, t) } - c.mu.Lock() - c.blockTransactions[blockID] = l.Transactions - c.mu.Unlock() } // Process block seals for _, s := range block.Payload.Seals { - c.mu.RLock() - transactions, found := c.blockTransactions[s.BlockID] - c.mu.RUnlock() + transactions, found := c.blockTransactions.Get(s.BlockID) if found { for _, t := range transactions { c.accessMetrics.TransactionSealed(t, now) } - - c.mu.Lock() - delete(c.blockTransactions, s.BlockID) - c.mu.Unlock() + c.blockTransactions.Remove(s.BlockID) } } diff --git a/module/state_synchronization/indexer/indexer_core.go b/module/state_synchronization/indexer/indexer_core.go index aede5d6ac4f..9f15cc05e34 100644 --- a/module/state_synchronization/indexer/indexer_core.go +++ b/module/state_synchronization/indexer/indexer_core.go @@ -325,7 +325,7 @@ func (c *IndexerCore) indexRegisters(registers map[ledger.Path]*ledger.Payload, return c.registers.Store(regEntries, height) } -// HandleCollection handles the response of the a collection request made earlier when a block was received. +// HandleCollection handles the response of the collection request made earlier when a block was received. // No errors expected during normal operations. func HandleCollection( collection *flow.Collection, From ef8d3afc8185f269605c2bbde10f9417cc1347b1 Mon Sep 17 00:00:00 2001 From: Andrii Date: Fri, 18 Oct 2024 15:03:16 +0300 Subject: [PATCH 14/17] Added err handling --- .../indexer/collection_executed_metric.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/module/state_synchronization/indexer/collection_executed_metric.go b/module/state_synchronization/indexer/collection_executed_metric.go index 0c8493153f8..554934caf03 100644 --- a/module/state_synchronization/indexer/collection_executed_metric.go +++ b/module/state_synchronization/indexer/collection_executed_metric.go @@ -98,7 +98,12 @@ func (c *CollectionExecutedMetricImpl) BlockFinalized(block *flow.Block) { for _, t := range l.Transactions { c.accessMetrics.TransactionFinalized(t, now) - c.blockTransactions.Append(blockID, t) + err = c.blockTransactions.Append(blockID, t) + + if err != nil { + c.log.Warn().Err(err).Msg("could not append finalized tx to track sealed transactions") + continue + } } } From 7630bf91d7893a45b8f6ecfd83320d816b70b124 Mon Sep 17 00:00:00 2001 From: Andrii Date: Fri, 18 Oct 2024 16:35:20 +0300 Subject: [PATCH 15/17] Adding transactions also in Collection finilized method --- .../indexer/collection_executed_metric.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/module/state_synchronization/indexer/collection_executed_metric.go b/module/state_synchronization/indexer/collection_executed_metric.go index 554934caf03..3582648d0d3 100644 --- a/module/state_synchronization/indexer/collection_executed_metric.go +++ b/module/state_synchronization/indexer/collection_executed_metric.go @@ -62,6 +62,18 @@ func (c *CollectionExecutedMetricImpl) CollectionFinalized(light flow.LightColle if ti, found := c.collectionsToMarkFinalized.ByID(light.ID()); found { for _, t := range light.Transactions { c.accessMetrics.TransactionFinalized(t, ti) + + block, err := c.blocks.ByCollectionID(light.ID()) + if err != nil { + c.log.Warn().Err(err).Msg("could not find block by collection ID") + continue + } + + err = c.blockTransactions.Append(block.ID(), t) + if err != nil { + c.log.Warn().Err(err).Msg("could not append finalized tx to track sealed transactions") + continue + } } c.collectionsToMarkFinalized.Remove(light.ID()) } From 61695dd300aa984c73d911847c728d1dd445b406 Mon Sep 17 00:00:00 2001 From: Andrii Date: Mon, 21 Oct 2024 15:48:47 +0300 Subject: [PATCH 16/17] Noved lookup from the loop, refactored --- .../node_builder/access_node_builder.go | 7 ++++ engine/access/access_test.go | 9 +++++ engine/access/ingestion/engine_test.go | 3 ++ .../indexer/collection_executed_metric.go | 35 +++++++++---------- .../indexer/indexer_core_test.go | 3 ++ 5 files changed, 38 insertions(+), 19 deletions(-) diff --git a/cmd/access/node_builder/access_node_builder.go b/cmd/access/node_builder/access_node_builder.go index f80568a94ba..b8b6eab11de 100644 --- a/cmd/access/node_builder/access_node_builder.go +++ b/cmd/access/node_builder/access_node_builder.go @@ -304,6 +304,7 @@ type FlowAccessNodeBuilder struct { CollectionsToMarkFinalized *stdmap.Times CollectionsToMarkExecuted *stdmap.Times BlocksToMarkExecuted *stdmap.Times + BlockTransactions *stdmap.IdentifierMap TransactionMetrics *metrics.TransactionCollector TransactionValidationMetrics *metrics.TransactionValidationCollector RestMetrics *metrics.RestCollector @@ -1683,6 +1684,11 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) { return err } + builder.BlockTransactions, err = stdmap.NewIdentifierMap(100) + if err != nil { + return err + } + builder.BlocksToMarkExecuted, err = stdmap.NewTimes(1 * 300) // assume 1 block per second * 300 seconds return err @@ -1729,6 +1735,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) { builder.BlocksToMarkExecuted, builder.Storage.Collections, builder.Storage.Blocks, + builder.BlockTransactions, ) if err != nil { return err diff --git a/engine/access/access_test.go b/engine/access/access_test.go index d7ea12e613c..6999a74d8bb 100644 --- a/engine/access/access_test.go +++ b/engine/access/access_test.go @@ -642,6 +642,8 @@ func (suite *Suite) TestGetSealedTransaction() { require.NoError(suite.T(), err) blocksToMarkExecuted, err := stdmap.NewTimes(100) require.NoError(suite.T(), err) + blockTransactions, err := stdmap.NewIdentifierMap(100) + require.NoError(suite.T(), err) bnd, err := backend.New(backend.Params{State: suite.state, CollectionRPC: suite.collClient, @@ -674,6 +676,7 @@ func (suite *Suite) TestGetSealedTransaction() { blocksToMarkExecuted, collections, all.Blocks, + blockTransactions, ) require.NoError(suite.T(), err) @@ -804,6 +807,8 @@ func (suite *Suite) TestGetTransactionResult() { require.NoError(suite.T(), err) blocksToMarkExecuted, err := stdmap.NewTimes(100) require.NoError(suite.T(), err) + blockTransactions, err := stdmap.NewIdentifierMap(100) + require.NoError(suite.T(), err) bnd, err := backend.New(backend.Params{State: suite.state, CollectionRPC: suite.collClient, @@ -836,6 +841,7 @@ func (suite *Suite) TestGetTransactionResult() { blocksToMarkExecuted, collections, all.Blocks, + blockTransactions, ) require.NoError(suite.T(), err) @@ -1052,6 +1058,8 @@ func (suite *Suite) TestExecuteScript() { require.NoError(suite.T(), err) blocksToMarkExecuted, err := stdmap.NewTimes(100) require.NoError(suite.T(), err) + blockTransactions, err := stdmap.NewIdentifierMap(100) + require.NoError(suite.T(), err) collectionExecutedMetric, err := indexer.NewCollectionExecutedMetricImpl( suite.log, @@ -1061,6 +1069,7 @@ func (suite *Suite) TestExecuteScript() { blocksToMarkExecuted, collections, all.Blocks, + blockTransactions, ) require.NoError(suite.T(), err) diff --git a/engine/access/ingestion/engine_test.go b/engine/access/ingestion/engine_test.go index d93056bd80c..4768e93bc3a 100644 --- a/engine/access/ingestion/engine_test.go +++ b/engine/access/ingestion/engine_test.go @@ -133,6 +133,8 @@ func (s *Suite) SetupTest() { require.NoError(s.T(), err) blocksToMarkExecuted, err := stdmap.NewTimes(100) require.NoError(s.T(), err) + blockTransactions, err := stdmap.NewIdentifierMap(100) + require.NoError(s.T(), err) s.proto.state.On("Identity").Return(s.obsIdentity, nil) s.proto.state.On("Params").Return(s.proto.params) @@ -174,6 +176,7 @@ func (s *Suite) SetupTest() { blocksToMarkExecuted, s.collections, s.blocks, + blockTransactions, ) require.NoError(s.T(), err) } diff --git a/module/state_synchronization/indexer/collection_executed_metric.go b/module/state_synchronization/indexer/collection_executed_metric.go index 3582648d0d3..bc1ee3fd341 100644 --- a/module/state_synchronization/indexer/collection_executed_metric.go +++ b/module/state_synchronization/indexer/collection_executed_metric.go @@ -37,8 +37,9 @@ func NewCollectionExecutedMetricImpl( blocksToMarkExecuted *stdmap.Times, collections storage.Collections, blocks storage.Blocks, + blockTransactions *stdmap.IdentifierMap, ) (*CollectionExecutedMetricImpl, error) { - collectionExecutedMetricImpl := &CollectionExecutedMetricImpl{ + return &CollectionExecutedMetricImpl{ log: log, accessMetrics: accessMetrics, collectionsToMarkFinalized: collectionsToMarkFinalized, @@ -46,36 +47,32 @@ func NewCollectionExecutedMetricImpl( blocksToMarkExecuted: blocksToMarkExecuted, collections: collections, blocks: blocks, - } - - var err error - collectionExecutedMetricImpl.blockTransactions, err = stdmap.NewIdentifierMap(100) - if err != nil { - return nil, err - } - - return collectionExecutedMetricImpl, nil + blockTransactions: blockTransactions, + }, nil } // CollectionFinalized tracks collections to mark finalized func (c *CollectionExecutedMetricImpl) CollectionFinalized(light flow.LightCollection) { - if ti, found := c.collectionsToMarkFinalized.ByID(light.ID()); found { + lightID := light.ID() + if ti, found := c.collectionsToMarkFinalized.ByID(lightID); found { + + block, err := c.blocks.ByCollectionID(lightID) + if err != nil { + c.log.Warn().Err(err).Msg("could not find block by collection ID") + return + } + blockID := block.ID() + for _, t := range light.Transactions { c.accessMetrics.TransactionFinalized(t, ti) - block, err := c.blocks.ByCollectionID(light.ID()) - if err != nil { - c.log.Warn().Err(err).Msg("could not find block by collection ID") - continue - } - - err = c.blockTransactions.Append(block.ID(), t) + err = c.blockTransactions.Append(blockID, t) if err != nil { c.log.Warn().Err(err).Msg("could not append finalized tx to track sealed transactions") continue } } - c.collectionsToMarkFinalized.Remove(light.ID()) + c.collectionsToMarkFinalized.Remove(lightID) } } diff --git a/module/state_synchronization/indexer/indexer_core_test.go b/module/state_synchronization/indexer/indexer_core_test.go index 5fd93d4b824..f446c0740a0 100644 --- a/module/state_synchronization/indexer/indexer_core_test.go +++ b/module/state_synchronization/indexer/indexer_core_test.go @@ -197,6 +197,8 @@ func (i *indexCoreTest) initIndexer() *indexCoreTest { require.NoError(i.t, err) blocksToMarkExecuted, err := stdmap.NewTimes(100) require.NoError(i.t, err) + blockTransactions, err := stdmap.NewIdentifierMap(100) + require.NoError(i.t, err) log := zerolog.New(os.Stdout) blocks := storagemock.NewBlocks(i.t) @@ -209,6 +211,7 @@ func (i *indexCoreTest) initIndexer() *indexCoreTest { blocksToMarkExecuted, i.collections, blocks, + blockTransactions, ) require.NoError(i.t, err) From 738324e98cfe4e08af116703cdd50e6eac0ad49c Mon Sep 17 00:00:00 2001 From: Andrii Slisarchuk Date: Thu, 24 Oct 2024 12:30:00 +0300 Subject: [PATCH 17/17] Update cmd/access/node_builder/access_node_builder.go Co-authored-by: Leo Zhang --- cmd/access/node_builder/access_node_builder.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/access/node_builder/access_node_builder.go b/cmd/access/node_builder/access_node_builder.go index b8b6eab11de..01b793e7edc 100644 --- a/cmd/access/node_builder/access_node_builder.go +++ b/cmd/access/node_builder/access_node_builder.go @@ -1684,7 +1684,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) { return err } - builder.BlockTransactions, err = stdmap.NewIdentifierMap(100) + builder.BlockTransactions, err = stdmap.NewIdentifierMap(10000) if err != nil { return err }