Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Access] Refactor executionNodesForBlockID by encapsulating its parameters into a struct #6499

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
91773f3
Created ExecutionNodeIdentitiesProvider, refactored ExecutionNodesFor…
UlyanaAndrukhiv Sep 25, 2024
a3b1fed
Moved ExecutionNodeIdentitiesProvider to node builder, updated tests
UlyanaAndrukhiv Sep 25, 2024
69ea579
Renamed field to execNodeIdentitiesProvider
UlyanaAndrukhiv Sep 25, 2024
4df1b76
Merged
UlyanaAndrukhiv Sep 25, 2024
010492e
Removed unused storages in test
UlyanaAndrukhiv Sep 30, 2024
5544cb4
Merge branch 'UlyanaAndrukhiv/6302-store-transaction-result-error-mes…
UlyanaAndrukhiv Oct 1, 2024
cce8ebd
Added testing db Exists
UlyanaAndrukhiv Oct 1, 2024
039366c
Merged with UlyanaAndrukhiv/6302-store-transaction-result-error-messages
UlyanaAndrukhiv Oct 3, 2024
bd5fe3a
Merged with master
UlyanaAndrukhiv Oct 3, 2024
ccbc4bb
Merge branch 'master' into UlyanaAndrukhiv/6497-refactor-executionNod…
UlyanaAndrukhiv Oct 7, 2024
38467d5
Merge branch 'master' into UlyanaAndrukhiv/6497-refactor-executionNod…
UlyanaAndrukhiv Oct 8, 2024
9d5211c
Merged with UlyanaAndrukhiv/6302-store-transaction-result-error-mess…
UlyanaAndrukhiv Oct 8, 2024
8901f9e
Merge branch 'master' into UlyanaAndrukhiv/6497-refactor-executionNod…
UlyanaAndrukhiv Oct 14, 2024
b728d5e
Merge branch 'master' into UlyanaAndrukhiv/6497-refactor-executionNod…
UlyanaAndrukhiv Oct 22, 2024
8dbf63c
Merged with UlyanaAndrukhiv/6302-store-transaction-result-error-messages
UlyanaAndrukhiv Oct 22, 2024
95bbc46
Merged with master
UlyanaAndrukhiv Oct 24, 2024
54ffb07
Updated godoc and tests
UlyanaAndrukhiv Oct 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 52 additions & 47 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,8 @@ type FlowAccessNodeBuilder struct {
stateStreamBackend *statestreambackend.StateStreamBackend
nodeBackend *backend.Backend

TxResultErrorMessagesCore *tx_error_messages.TxErrorMessagesCore
ExecNodeIdentitiesProvider *commonrpc.ExecutionNodeIdentitiesProvider
TxResultErrorMessagesCore *tx_error_messages.TxErrorMessagesCore
}

func (builder *FlowAccessNodeBuilder) buildFollowerState() *FlowAccessNodeBuilder {
Expand Down Expand Up @@ -1956,46 +1957,63 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {

}

preferredENIdentifiers, err := commonrpc.IdentifierList(backendConfig.PreferredExecutionNodeIDs)
if err != nil {
return nil, fmt.Errorf("failed to convert node id string to Flow Identifier for preferred EN map: %w", err)
}

fixedENIdentifiers, err := commonrpc.IdentifierList(backendConfig.FixedExecutionNodeIDs)
if err != nil {
return nil, fmt.Errorf("failed to convert node id string to Flow Identifier for fixed EN map: %w", err)
}

builder.ExecNodeIdentitiesProvider = commonrpc.NewExecutionNodeIdentitiesProvider(
node.Logger,
node.State,
node.Storage.Receipts,
preferredENIdentifiers,
fixedENIdentifiers,
)

builder.nodeBackend, err = backend.New(backend.Params{
State: node.State,
CollectionRPC: builder.CollectionRPC,
HistoricalAccessNodes: builder.HistoricalAccessRPCs,
Blocks: node.Storage.Blocks,
Headers: node.Storage.Headers,
Collections: node.Storage.Collections,
Transactions: node.Storage.Transactions,
ExecutionReceipts: node.Storage.Receipts,
ExecutionResults: node.Storage.Results,
TxResultErrorMessages: node.Storage.TransactionResultErrorMessages,
ChainID: node.RootChainID,
AccessMetrics: builder.AccessMetrics,
ConnFactory: connFactory,
RetryEnabled: builder.retryEnabled,
MaxHeightRange: backendConfig.MaxHeightRange,
PreferredExecutionNodeIDs: backendConfig.PreferredExecutionNodeIDs,
FixedExecutionNodeIDs: backendConfig.FixedExecutionNodeIDs,
Log: node.Logger,
SnapshotHistoryLimit: backend.DefaultSnapshotHistoryLimit,
Communicator: backend.NewNodeCommunicator(backendConfig.CircuitBreakerConfig.Enabled),
TxResultCacheSize: builder.TxResultCacheSize,
ScriptExecutor: builder.ScriptExecutor,
ScriptExecutionMode: scriptExecMode,
CheckPayerBalanceMode: checkPayerBalanceMode,
EventQueryMode: eventQueryMode,
BlockTracker: blockTracker,
State: node.State,
CollectionRPC: builder.CollectionRPC,
HistoricalAccessNodes: builder.HistoricalAccessRPCs,
Blocks: node.Storage.Blocks,
Headers: node.Storage.Headers,
Collections: node.Storage.Collections,
Transactions: node.Storage.Transactions,
ExecutionReceipts: node.Storage.Receipts,
ExecutionResults: node.Storage.Results,
TxResultErrorMessages: node.Storage.TransactionResultErrorMessages,
ChainID: node.RootChainID,
AccessMetrics: builder.AccessMetrics,
ConnFactory: connFactory,
RetryEnabled: builder.retryEnabled,
MaxHeightRange: backendConfig.MaxHeightRange,
Log: node.Logger,
SnapshotHistoryLimit: backend.DefaultSnapshotHistoryLimit,
Communicator: backend.NewNodeCommunicator(backendConfig.CircuitBreakerConfig.Enabled),
TxResultCacheSize: builder.TxResultCacheSize,
ScriptExecutor: builder.ScriptExecutor,
ScriptExecutionMode: scriptExecMode,
CheckPayerBalanceMode: checkPayerBalanceMode,
EventQueryMode: eventQueryMode,
BlockTracker: blockTracker,
SubscriptionHandler: subscription.NewSubscriptionHandler(
builder.Logger,
broadcaster,
builder.stateStreamConf.ClientSendTimeout,
builder.stateStreamConf.ResponseLimit,
builder.stateStreamConf.ClientSendBufferSize,
),
EventsIndex: builder.EventsIndex,
TxResultQueryMode: txResultQueryMode,
TxResultsIndex: builder.TxResultsIndex,
LastFullBlockHeight: lastFullBlockHeight,
IndexReporter: indexReporter,
VersionControl: builder.VersionControl,
EventsIndex: builder.EventsIndex,
TxResultQueryMode: txResultQueryMode,
TxResultsIndex: builder.TxResultsIndex,
LastFullBlockHeight: lastFullBlockHeight,
IndexReporter: indexReporter,
VersionControl: builder.VersionControl,
ExecNodeIdentitiesProvider: builder.ExecNodeIdentitiesProvider,
})
if err != nil {
return nil, fmt.Errorf("could not initialize backend: %w", err)
Expand Down Expand Up @@ -2049,25 +2067,12 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
return nil, fmt.Errorf("could not create requester engine: %w", err)
}

preferredENIdentifiers, err := commonrpc.IdentifierList(builder.rpcConf.BackendConfig.PreferredExecutionNodeIDs)
if err != nil {
return nil, fmt.Errorf("failed to convert node id string to Flow Identifier for preferred EN map: %w", err)
}

fixedENIdentifiers, err := commonrpc.IdentifierList(builder.rpcConf.BackendConfig.FixedExecutionNodeIDs)
if err != nil {
return nil, fmt.Errorf("failed to convert node id string to Flow Identifier for fixed EN map: %w", err)
}

if builder.storeTxResultErrorMessages {
builder.TxResultErrorMessagesCore = tx_error_messages.NewTxErrorMessagesCore(
node.Logger,
node.State,
builder.nodeBackend,
node.Storage.Receipts,
node.Storage.TransactionResultErrorMessages,
preferredENIdentifiers,
fixedENIdentifiers,
builder.ExecNodeIdentitiesProvider,
)
}

Expand Down
58 changes: 38 additions & 20 deletions cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import (
statestreambackend "github.com/onflow/flow-go/engine/access/state_stream/backend"
"github.com/onflow/flow-go/engine/access/subscription"
"github.com/onflow/flow-go/engine/common/follower"
commonrpc "github.com/onflow/flow-go/engine/common/rpc"
"github.com/onflow/flow-go/engine/common/stop"
synceng "github.com/onflow/flow-go/engine/common/synchronization"
"github.com/onflow/flow-go/engine/common/version"
Expand Down Expand Up @@ -1862,34 +1863,51 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() {
indexReporter = builder.Reporter
}

preferredENIdentifiers, err := commonrpc.IdentifierList(backendConfig.PreferredExecutionNodeIDs)
if err != nil {
return nil, fmt.Errorf("failed to convert node id string to Flow Identifier for preferred EN map: %w", err)
}

fixedENIdentifiers, err := commonrpc.IdentifierList(backendConfig.FixedExecutionNodeIDs)
if err != nil {
return nil, fmt.Errorf("failed to convert node id string to Flow Identifier for fixed EN map: %w", err)
}

execNodeIdentitiesProvider := commonrpc.NewExecutionNodeIdentitiesProvider(
node.Logger,
node.State,
node.Storage.Receipts,
preferredENIdentifiers,
fixedENIdentifiers,
)

backendParams := backend.Params{
State: node.State,
Blocks: node.Storage.Blocks,
Headers: node.Storage.Headers,
Collections: node.Storage.Collections,
Transactions: node.Storage.Transactions,
ExecutionReceipts: node.Storage.Receipts,
ExecutionResults: node.Storage.Results,
ChainID: node.RootChainID,
AccessMetrics: accessMetrics,
ConnFactory: connFactory,
RetryEnabled: false,
MaxHeightRange: backendConfig.MaxHeightRange,
PreferredExecutionNodeIDs: backendConfig.PreferredExecutionNodeIDs,
FixedExecutionNodeIDs: backendConfig.FixedExecutionNodeIDs,
Log: node.Logger,
SnapshotHistoryLimit: backend.DefaultSnapshotHistoryLimit,
Communicator: backend.NewNodeCommunicator(backendConfig.CircuitBreakerConfig.Enabled),
BlockTracker: blockTracker,
State: node.State,
Blocks: node.Storage.Blocks,
Headers: node.Storage.Headers,
Collections: node.Storage.Collections,
Transactions: node.Storage.Transactions,
ExecutionReceipts: node.Storage.Receipts,
ExecutionResults: node.Storage.Results,
ChainID: node.RootChainID,
AccessMetrics: accessMetrics,
ConnFactory: connFactory,
RetryEnabled: false,
MaxHeightRange: backendConfig.MaxHeightRange,
Log: node.Logger,
SnapshotHistoryLimit: backend.DefaultSnapshotHistoryLimit,
Communicator: backend.NewNodeCommunicator(backendConfig.CircuitBreakerConfig.Enabled),
BlockTracker: blockTracker,
SubscriptionHandler: subscription.NewSubscriptionHandler(
builder.Logger,
broadcaster,
builder.stateStreamConf.ClientSendTimeout,
builder.stateStreamConf.ResponseLimit,
builder.stateStreamConf.ClientSendBufferSize,
),
IndexReporter: indexReporter,
VersionControl: builder.VersionControl,
IndexReporter: indexReporter,
VersionControl: builder.VersionControl,
ExecNodeIdentitiesProvider: execNodeIdentitiesProvider,
}

if builder.localServiceAPIEnabled {
Expand Down
128 changes: 77 additions & 51 deletions engine/access/access_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/onflow/flow-go/engine/access/rpc/backend"
connectionmock "github.com/onflow/flow-go/engine/access/rpc/connection/mock"
"github.com/onflow/flow-go/engine/access/subscription"
commonrpc "github.com/onflow/flow-go/engine/common/rpc"
"github.com/onflow/flow-go/engine/common/rpc/convert"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/model/flow/factory"
Expand Down Expand Up @@ -642,23 +643,32 @@ func (suite *Suite) TestGetSealedTransaction() {
blocksToMarkExecuted, err := stdmap.NewTimes(100)
require.NoError(suite.T(), err)

bnd, err := backend.New(backend.Params{State: suite.state,
CollectionRPC: suite.collClient,
Blocks: all.Blocks,
Headers: all.Headers,
Collections: collections,
Transactions: transactions,
ExecutionReceipts: receipts,
ExecutionResults: results,
ChainID: suite.chainID,
AccessMetrics: suite.metrics,
ConnFactory: connFactory,
MaxHeightRange: backend.DefaultMaxHeightRange,
PreferredExecutionNodeIDs: enNodeIDs.Strings(),
Log: suite.log,
SnapshotHistoryLimit: backend.DefaultSnapshotHistoryLimit,
Communicator: backend.NewNodeCommunicator(false),
TxResultQueryMode: backend.IndexQueryModeExecutionNodesOnly,
execNodeIdentitiesProvider := commonrpc.NewExecutionNodeIdentitiesProvider(
suite.log,
suite.state,
receipts,
enNodeIDs,
nil,
)

bnd, err := backend.New(backend.Params{
State: suite.state,
CollectionRPC: suite.collClient,
Blocks: all.Blocks,
Headers: all.Headers,
Collections: collections,
Transactions: transactions,
ExecutionReceipts: receipts,
ExecutionResults: results,
ChainID: suite.chainID,
AccessMetrics: suite.metrics,
ConnFactory: connFactory,
MaxHeightRange: backend.DefaultMaxHeightRange,
Log: suite.log,
SnapshotHistoryLimit: backend.DefaultSnapshotHistoryLimit,
Communicator: backend.NewNodeCommunicator(false),
TxResultQueryMode: backend.IndexQueryModeExecutionNodesOnly,
ExecNodeIdentitiesProvider: execNodeIdentitiesProvider,
})
require.NoError(suite.T(), err)

Expand Down Expand Up @@ -821,23 +831,31 @@ func (suite *Suite) TestGetTransactionResult() {
blocksToMarkExecuted, err := stdmap.NewTimes(100)
require.NoError(suite.T(), err)

execNodeIdentitiesProvider := commonrpc.NewExecutionNodeIdentitiesProvider(
suite.log,
suite.state,
receipts,
enNodeIDs,
nil,
)

bnd, err := backend.New(backend.Params{State: suite.state,
CollectionRPC: suite.collClient,
Blocks: all.Blocks,
Headers: all.Headers,
Collections: collections,
Transactions: transactions,
ExecutionReceipts: receipts,
ExecutionResults: results,
ChainID: suite.chainID,
AccessMetrics: suite.metrics,
ConnFactory: connFactory,
MaxHeightRange: backend.DefaultMaxHeightRange,
PreferredExecutionNodeIDs: enNodeIDs.Strings(),
Log: suite.log,
SnapshotHistoryLimit: backend.DefaultSnapshotHistoryLimit,
Communicator: backend.NewNodeCommunicator(false),
TxResultQueryMode: backend.IndexQueryModeExecutionNodesOnly,
CollectionRPC: suite.collClient,
Blocks: all.Blocks,
Headers: all.Headers,
Collections: collections,
Transactions: transactions,
ExecutionReceipts: receipts,
ExecutionResults: results,
ChainID: suite.chainID,
AccessMetrics: suite.metrics,
ConnFactory: connFactory,
MaxHeightRange: backend.DefaultMaxHeightRange,
Log: suite.log,
SnapshotHistoryLimit: backend.DefaultSnapshotHistoryLimit,
Communicator: backend.NewNodeCommunicator(false),
TxResultQueryMode: backend.IndexQueryModeExecutionNodesOnly,
ExecNodeIdentitiesProvider: execNodeIdentitiesProvider,
})
require.NoError(suite.T(), err)

Expand Down Expand Up @@ -1050,26 +1068,34 @@ func (suite *Suite) TestExecuteScript() {
connFactory := connectionmock.NewConnectionFactory(suite.T())
connFactory.On("GetExecutionAPIClient", mock.Anything).Return(suite.execClient, &mockCloser{}, nil)

execNodeIdentitiesProvider := commonrpc.NewExecutionNodeIdentitiesProvider(
suite.log,
suite.state,
receipts,
nil,
identities.NodeIDs(),
)

var err error
suite.backend, err = backend.New(backend.Params{
State: suite.state,
CollectionRPC: suite.collClient,
Blocks: all.Blocks,
Headers: all.Headers,
Collections: collections,
Transactions: transactions,
ExecutionReceipts: receipts,
ExecutionResults: results,
ChainID: suite.chainID,
AccessMetrics: suite.metrics,
ConnFactory: connFactory,
MaxHeightRange: backend.DefaultMaxHeightRange,
FixedExecutionNodeIDs: (identities.NodeIDs()).Strings(),
Log: suite.log,
SnapshotHistoryLimit: backend.DefaultSnapshotHistoryLimit,
Communicator: backend.NewNodeCommunicator(false),
ScriptExecutionMode: backend.IndexQueryModeExecutionNodesOnly,
TxResultQueryMode: backend.IndexQueryModeExecutionNodesOnly,
State: suite.state,
CollectionRPC: suite.collClient,
Blocks: all.Blocks,
Headers: all.Headers,
Collections: collections,
Transactions: transactions,
ExecutionReceipts: receipts,
ExecutionResults: results,
ChainID: suite.chainID,
AccessMetrics: suite.metrics,
ConnFactory: connFactory,
MaxHeightRange: backend.DefaultMaxHeightRange,
Log: suite.log,
SnapshotHistoryLimit: backend.DefaultSnapshotHistoryLimit,
Communicator: backend.NewNodeCommunicator(false),
ScriptExecutionMode: backend.IndexQueryModeExecutionNodesOnly,
TxResultQueryMode: backend.IndexQueryModeExecutionNodesOnly,
ExecNodeIdentitiesProvider: execNodeIdentitiesProvider,
})
require.NoError(suite.T(), err)

Expand Down
Loading
Loading