Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a Time To Seal metric to access node to track time it takes to seal a transaction #6512

Merged
merged 26 commits into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
def498c
Added TTS metrics
AndriiDiachuk Oct 1, 2024
8fffbb6
Generated mocks
AndriiDiachuk Oct 1, 2024
90addfd
Merge branch 'master' into add-time-to-seal-metrics
AndriiDiachuk Oct 1, 2024
9f4feb5
Merge branch 'master' into add-time-to-seal-metrics
AndriiDiachuk Oct 2, 2024
4558746
Fixed failing test
AndriiDiachuk Oct 2, 2024
41b0fad
Merge branch 'master' of github.com:AndriiDiachuk/flow-go into add-ti…
AndriiDiachuk Oct 2, 2024
6348f03
Renamed struct and id field
AndriiDiachuk Oct 3, 2024
9bd79db
Using cached data for if statement
AndriiDiachuk Oct 3, 2024
a4e8537
Merge branch 'master' of github.com:AndriiDiachuk/flow-go into add-ti…
AndriiDiachuk Oct 14, 2024
0cb72f8
Expanded CollectionExecutedMetricImpl with map for sealed metrics
AndriiDiachuk Oct 15, 2024
82abb35
Removed ByID mocks and added filed init in constructor
AndriiDiachuk Oct 15, 2024
a2c12e7
Merge branch 'master' into add-time-to-seal-metrics
AndriiDiachuk Oct 16, 2024
2401406
Added mutex for sealed metrics
AndriiDiachuk Oct 16, 2024
944451f
Merge branch 'master' into add-time-to-seal-metrics
AndriiDiachuk Oct 18, 2024
2ebce19
Update module/state_synchronization/indexer/collection_executed_metri…
AndriiDiachuk Oct 18, 2024
b76f46b
Update module/state_synchronization/indexer/collection_executed_metri…
AndriiDiachuk Oct 18, 2024
b3a1c84
Update module/state_synchronization/indexer/collection_executed_metri…
AndriiDiachuk Oct 18, 2024
70b82e6
removed comment
AndriiDiachuk Oct 18, 2024
bf4d8e8
Using IdentifierMap instead of map
AndriiDiachuk Oct 18, 2024
ef8d3af
Added err handling
AndriiDiachuk Oct 18, 2024
7630bf9
Adding transactions also in Collection finilized method
AndriiDiachuk Oct 18, 2024
61695dd
Noved lookup from the loop, refactored
AndriiDiachuk Oct 21, 2024
8523e65
Merge branch 'master' into add-time-to-seal-metrics
AndriiDiachuk Oct 21, 2024
e664d09
Merge branch 'master' into add-time-to-seal-metrics
AndriiDiachuk Oct 22, 2024
738324e
Update cmd/access/node_builder/access_node_builder.go
Guitarheroua Oct 24, 2024
e316783
Merge branch 'master' into add-time-to-seal-metrics
Guitarheroua Oct 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ type AccessNodeConfig struct {
logTxTimeToFinalized bool
logTxTimeToExecuted bool
logTxTimeToFinalizedExecuted bool
logTxTimeToSealed bool
retryEnabled bool
rpcMetricsEnabled bool
executionDataSyncEnabled bool
Expand Down Expand Up @@ -243,6 +244,7 @@ func DefaultAccessNodeConfig() *AccessNodeConfig {
logTxTimeToFinalized: false,
logTxTimeToExecuted: false,
logTxTimeToFinalizedExecuted: false,
logTxTimeToSealed: false,
pingEnabled: false,
retryEnabled: false,
rpcMetricsEnabled: false,
Expand Down Expand Up @@ -304,6 +306,7 @@ type FlowAccessNodeBuilder struct {
CollectionsToMarkFinalized *stdmap.Times
CollectionsToMarkExecuted *stdmap.Times
BlocksToMarkExecuted *stdmap.Times
BlockTransactions *stdmap.IdentifierMap
TransactionMetrics *metrics.TransactionCollector
TransactionValidationMetrics *metrics.TransactionValidationCollector
RestMetrics *metrics.RestCollector
Expand Down Expand Up @@ -1239,6 +1242,10 @@ func (builder *FlowAccessNodeBuilder) extraFlags() {
"log-tx-time-to-finalized-executed",
defaultConfig.logTxTimeToFinalizedExecuted,
"log transaction time to finalized and executed")
flags.BoolVar(&builder.logTxTimeToSealed,
"log-tx-time-to-sealed",
defaultConfig.logTxTimeToSealed,
"log transaction time to sealed")
flags.BoolVar(&builder.pingEnabled,
"ping-enabled",
defaultConfig.pingEnabled,
Expand Down Expand Up @@ -1682,6 +1689,11 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
return err
}

builder.BlockTransactions, err = stdmap.NewIdentifierMap(10000)
if err != nil {
return err
}

builder.BlocksToMarkExecuted, err = stdmap.NewTimes(1 * 300) // assume 1 block per second * 300 seconds

return err
Expand All @@ -1693,6 +1705,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
builder.logTxTimeToFinalized,
builder.logTxTimeToExecuted,
builder.logTxTimeToFinalizedExecuted,
builder.logTxTimeToSealed,
)
return nil
}).
Expand Down Expand Up @@ -1727,6 +1740,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
builder.BlocksToMarkExecuted,
builder.Storage.Collections,
builder.Storage.Blocks,
builder.BlockTransactions,
)
if err != nil {
return err
Expand Down
7 changes: 7 additions & 0 deletions cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ type ObserverServiceConfig struct {
logTxTimeToFinalized bool
logTxTimeToExecuted bool
logTxTimeToFinalizedExecuted bool
logTxTimeToSealed bool
executionDataSyncEnabled bool
executionDataIndexingEnabled bool
executionDataDBMode string
Expand Down Expand Up @@ -222,6 +223,7 @@ func DefaultObserverServiceConfig() *ObserverServiceConfig {
logTxTimeToFinalized: false,
logTxTimeToExecuted: false,
logTxTimeToFinalizedExecuted: false,
logTxTimeToSealed: false,
executionDataSyncEnabled: false,
executionDataIndexingEnabled: false,
executionDataDBMode: execution_data.ExecutionDataDBModeBadger.String(),
Expand Down Expand Up @@ -658,6 +660,10 @@ func (builder *ObserverServiceBuilder) extraFlags() {
"log-tx-time-to-finalized-executed",
defaultConfig.logTxTimeToFinalizedExecuted,
"log transaction time to finalized and executed")
flags.BoolVar(&builder.logTxTimeToSealed,
"log-tx-time-to-sealed",
defaultConfig.logTxTimeToSealed,
"log transaction time to sealed")
flags.BoolVar(&builder.rpcMetricsEnabled, "rpc-metrics-enabled", defaultConfig.rpcMetricsEnabled, "whether to enable the rpc metrics")
flags.BoolVar(&builder.executionDataIndexingEnabled,
"execution-data-indexing-enabled",
Expand Down Expand Up @@ -1670,6 +1676,7 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() {
builder.logTxTimeToFinalized,
builder.logTxTimeToExecuted,
builder.logTxTimeToFinalizedExecuted,
builder.logTxTimeToSealed,
)
return nil
})
Expand Down
9 changes: 9 additions & 0 deletions engine/access/access_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,8 @@ func (suite *Suite) TestGetSealedTransaction() {
require.NoError(suite.T(), err)
blocksToMarkExecuted, err := stdmap.NewTimes(100)
require.NoError(suite.T(), err)
blockTransactions, err := stdmap.NewIdentifierMap(100)
require.NoError(suite.T(), err)

bnd, err := backend.New(backend.Params{State: suite.state,
CollectionRPC: suite.collClient,
Expand Down Expand Up @@ -672,6 +674,7 @@ func (suite *Suite) TestGetSealedTransaction() {
blocksToMarkExecuted,
collections,
all.Blocks,
blockTransactions,
)
require.NoError(suite.T(), err)

Expand Down Expand Up @@ -820,6 +823,8 @@ func (suite *Suite) TestGetTransactionResult() {
require.NoError(suite.T(), err)
blocksToMarkExecuted, err := stdmap.NewTimes(100)
require.NoError(suite.T(), err)
blockTransactions, err := stdmap.NewIdentifierMap(100)
require.NoError(suite.T(), err)

bnd, err := backend.New(backend.Params{State: suite.state,
CollectionRPC: suite.collClient,
Expand Down Expand Up @@ -851,6 +856,7 @@ func (suite *Suite) TestGetTransactionResult() {
blocksToMarkExecuted,
collections,
all.Blocks,
blockTransactions,
)
require.NoError(suite.T(), err)

Expand Down Expand Up @@ -1083,6 +1089,8 @@ func (suite *Suite) TestExecuteScript() {
require.NoError(suite.T(), err)
blocksToMarkExecuted, err := stdmap.NewTimes(100)
require.NoError(suite.T(), err)
blockTransactions, err := stdmap.NewIdentifierMap(100)
require.NoError(suite.T(), err)

collectionExecutedMetric, err := indexer.NewCollectionExecutedMetricImpl(
suite.log,
Expand All @@ -1092,6 +1100,7 @@ func (suite *Suite) TestExecuteScript() {
blocksToMarkExecuted,
collections,
all.Blocks,
blockTransactions,
)
require.NoError(suite.T(), err)

Expand Down
3 changes: 3 additions & 0 deletions engine/access/ingestion/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ func (s *Suite) SetupTest() {
require.NoError(s.T(), err)
blocksToMarkExecuted, err := stdmap.NewTimes(100)
require.NoError(s.T(), err)
blockTransactions, err := stdmap.NewIdentifierMap(100)
require.NoError(s.T(), err)

s.proto.state.On("Identity").Return(s.obsIdentity, nil)
s.proto.state.On("Params").Return(s.proto.params)
Expand Down Expand Up @@ -177,6 +179,7 @@ func (s *Suite) SetupTest() {
blocksToMarkExecuted,
s.collections,
s.blocks,
blockTransactions,
)
require.NoError(s.T(), err)
}
Expand Down
1 change: 1 addition & 0 deletions model/flow/transaction_timing.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ type TransactionTiming struct {
Received time.Time
Finalized time.Time
Executed time.Time
Sealed time.Time
}

func (t TransactionTiming) ID() Identifier {
Expand Down
4 changes: 4 additions & 0 deletions module/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -1076,6 +1076,10 @@ type TransactionMetrics interface {
// works if the transaction was earlier added as received.
TransactionFinalized(txID flow.Identifier, when time.Time)

// TransactionSealed reports the time spent between the transaction being received and sealed. Reporting only
// works if the transaction was earlier added as received.
TransactionSealed(txID flow.Identifier, when time.Time)

// TransactionExecuted reports the time spent between the transaction being received and executed. Reporting only
// works if the transaction was earlier added as received.
TransactionExecuted(txID flow.Identifier, when time.Time)
Expand Down
1 change: 1 addition & 0 deletions module/metrics/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ func (nc *NoopCollector) ScriptExecutionNotIndexed()
func (nc *NoopCollector) TransactionResultFetched(dur time.Duration, size int) {}
func (nc *NoopCollector) TransactionReceived(txID flow.Identifier, when time.Time) {}
func (nc *NoopCollector) TransactionFinalized(txID flow.Identifier, when time.Time) {}
func (nc *NoopCollector) TransactionSealed(txID flow.Identifier, when time.Time) {}
func (nc *NoopCollector) TransactionExecuted(txID flow.Identifier, when time.Time) {}
func (nc *NoopCollector) TransactionExpired(txID flow.Identifier) {}
func (nc *NoopCollector) TransactionValidated() {}
Expand Down
53 changes: 53 additions & 0 deletions module/metrics/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ type TransactionCollector struct {
logTimeToFinalized bool
logTimeToExecuted bool
logTimeToFinalizedExecuted bool
logTimeToSealed bool
timeToFinalized prometheus.Summary
timeToExecuted prometheus.Summary
timeToFinalizedExecuted prometheus.Summary
timeToSealed prometheus.Summary
transactionSubmission *prometheus.CounterVec
transactionSize prometheus.Histogram
scriptExecutedDuration *prometheus.HistogramVec
Expand All @@ -40,6 +42,7 @@ func NewTransactionCollector(
logTimeToFinalized bool,
logTimeToExecuted bool,
logTimeToFinalizedExecuted bool,
logTimeToSealed bool,
) *TransactionCollector {

tc := &TransactionCollector{
Expand All @@ -48,6 +51,7 @@ func NewTransactionCollector(
logTimeToFinalized: logTimeToFinalized,
logTimeToExecuted: logTimeToExecuted,
logTimeToFinalizedExecuted: logTimeToFinalizedExecuted,
logTimeToSealed: logTimeToSealed,
timeToFinalized: promauto.NewSummary(prometheus.SummaryOpts{
Name: "time_to_finalized_seconds",
Namespace: namespaceAccess,
Expand Down Expand Up @@ -91,6 +95,20 @@ func NewTransactionCollector(
AgeBuckets: 5,
BufCap: 500,
}),
timeToSealed: promauto.NewSummary(prometheus.SummaryOpts{
Name: "time_to_seal_seconds",
Namespace: namespaceAccess,
Subsystem: subsystemTransactionTiming,
Help: "the duration of how long it took between the transaction was received until it was sealed",
Objectives: map[float64]float64{
0.01: 0.001,
0.5: 0.05,
0.99: 0.001,
},
MaxAge: 10 * time.Minute,
AgeBuckets: 5,
BufCap: 500,
}),
transactionSubmission: promauto.NewCounterVec(prometheus.CounterOpts{
Name: "transaction_submission",
Namespace: namespaceAccess,
Expand Down Expand Up @@ -269,6 +287,27 @@ func (tc *TransactionCollector) TransactionExecuted(txID flow.Identifier, when t
}
}

func (tc *TransactionCollector) TransactionSealed(txID flow.Identifier, when time.Time) {
t, updated := tc.transactionTimings.Adjust(txID, func(t *flow.TransactionTiming) *flow.TransactionTiming {
t.Sealed = when
return t
})

if !updated {
tc.log.Debug().
Str("transaction_id", txID.String()).
Msg("failed to update TransactionSealed metric")
return
}

tc.trackTTS(t, tc.logTimeToSealed)

// remove transaction timing from mempool if sealed
if !t.Sealed.IsZero() {
tc.transactionTimings.Remove(txID)
}
}

func (tc *TransactionCollector) trackTTF(t *flow.TransactionTiming, log bool) {
if t.Received.IsZero() || t.Finalized.IsZero() {
return
Expand Down Expand Up @@ -317,6 +356,20 @@ func (tc *TransactionCollector) trackTTFE(t *flow.TransactionTiming, log bool) {
}
}

func (tc *TransactionCollector) trackTTS(t *flow.TransactionTiming, log bool) {
if t.Received.IsZero() || t.Sealed.IsZero() {
return
}
duration := t.Sealed.Sub(t.Received).Seconds()

tc.timeToSealed.Observe(duration)

if log {
tc.log.Info().Str("transaction_id", t.TransactionID.String()).Float64("duration", duration).
Msg("transaction time to sealed")
}
}

func (tc *TransactionCollector) TransactionSubmissionFailed() {
tc.transactionSubmission.WithLabelValues("failed").Inc()
}
Expand Down
5 changes: 5 additions & 0 deletions module/mock/access_metrics.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions module/mock/transaction_metrics.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ type CollectionExecutedMetricImpl struct {

collections storage.Collections
blocks storage.Blocks

blockTransactions *stdmap.IdentifierMap // Map to track transactions for each block for sealed metrics
}

func NewCollectionExecutedMetricImpl(
Expand All @@ -35,6 +37,7 @@ func NewCollectionExecutedMetricImpl(
blocksToMarkExecuted *stdmap.Times,
collections storage.Collections,
blocks storage.Blocks,
blockTransactions *stdmap.IdentifierMap,
) (*CollectionExecutedMetricImpl, error) {
return &CollectionExecutedMetricImpl{
log: log,
Expand All @@ -44,16 +47,32 @@ func NewCollectionExecutedMetricImpl(
blocksToMarkExecuted: blocksToMarkExecuted,
collections: collections,
blocks: blocks,
blockTransactions: blockTransactions,
}, nil
}

// CollectionFinalized tracks collections to mark finalized
func (c *CollectionExecutedMetricImpl) CollectionFinalized(light flow.LightCollection) {
if ti, found := c.collectionsToMarkFinalized.ByID(light.ID()); found {
lightID := light.ID()
if ti, found := c.collectionsToMarkFinalized.ByID(lightID); found {

block, err := c.blocks.ByCollectionID(lightID)
if err != nil {
c.log.Warn().Err(err).Msg("could not find block by collection ID")
return
}
blockID := block.ID()

for _, t := range light.Transactions {
c.accessMetrics.TransactionFinalized(t, ti)

err = c.blockTransactions.Append(blockID, t)
if err != nil {
c.log.Warn().Err(err).Msg("could not append finalized tx to track sealed transactions")
continue
}
}
c.collectionsToMarkFinalized.Remove(light.ID())
c.collectionsToMarkFinalized.Remove(lightID)
}
}

Expand Down Expand Up @@ -88,6 +107,24 @@ func (c *CollectionExecutedMetricImpl) BlockFinalized(block *flow.Block) {

for _, t := range l.Transactions {
c.accessMetrics.TransactionFinalized(t, now)
err = c.blockTransactions.Append(blockID, t)

if err != nil {
c.log.Warn().Err(err).Msg("could not append finalized tx to track sealed transactions")
continue
}
}
}

// Process block seals
for _, s := range block.Payload.Seals {
transactions, found := c.blockTransactions.Get(s.BlockID)

if found {
for _, t := range transactions {
c.accessMetrics.TransactionSealed(t, now)
}
c.blockTransactions.Remove(s.BlockID)
}
}

Expand Down
Loading
Loading