diff --git a/app/ante/msg_gatekeeper.go b/app/ante/msg_gatekeeper.go index 9f4841448a..f9ebd49c42 100644 --- a/app/ante/msg_gatekeeper.go +++ b/app/ante/msg_gatekeeper.go @@ -65,10 +65,11 @@ func (mgk MsgVersioningGateKeeper) hasInvalidMsg(ctx sdk.Context, acceptedMsgs m } func (mgk MsgVersioningGateKeeper) IsAllowed(ctx context.Context, msgName string) (bool, error) { - appVersion := sdk.UnwrapSDKContext(ctx).BlockHeader().Version.App + sdkCtx := sdk.UnwrapSDKContext(ctx) + appVersion := sdkCtx.BlockHeader().Version.App acceptedMsgs, exists := mgk.acceptedMsgs[appVersion] if !exists { - return false, sdkerrors.ErrNotSupported.Wrapf("app version %d is not supported", appVersion) + return false, sdkerrors.ErrNotSupported.Wrapf("circuit breaker: app version %d is not supported", appVersion) } _, exists = acceptedMsgs[msgName] if !exists { diff --git a/app/grpc/tx/server.go b/app/grpc/tx/server.go index 417b7c088b..93b3c9da36 100644 --- a/app/grpc/tx/server.go +++ b/app/grpc/tx/server.go @@ -76,7 +76,6 @@ func (s *txServer) TxStatus(ctx context.Context, req *TxStatusRequest) (*TxStatu Height: resTx.Height, Index: resTx.Index, ExecutionCode: resTx.ExecutionCode, - Error: resTx.Error, Status: resTx.Status, }, nil } diff --git a/go.mod b/go.mod index 7ad4054fc1..15341387e8 100644 --- a/go.mod +++ b/go.mod @@ -86,7 +86,6 @@ require ( github.com/cosmos/ledger-cosmos-go v0.13.2 // indirect github.com/crate-crypto/go-ipa v0.0.0-20240223125850-b1e8a79f509c // indirect github.com/crate-crypto/go-kzg-4844 v1.0.0 // indirect - github.com/creachadair/taskgroup v0.3.2 // indirect github.com/danieljoos/wincred v1.1.2 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/deckarep/golang-set/v2 v2.6.0 // indirect @@ -260,5 +259,6 @@ replace ( github.com/cosmos/ledger-cosmos-go => github.com/cosmos/ledger-cosmos-go v0.12.4 github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.3-alpha.regen.1 github.com/syndtr/goleveldb => github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 - github.com/tendermint/tendermint => github.com/celestiaorg/celestia-core v1.43.0-tm-v0.34.35 + //v1.39.0-tm-v0.34.29 + github.com/tendermint/tendermint => github.com/celestiaorg/celestia-core v1.43.0-tm-v0.34.35.0.20241028112242-a31a8fe76311 ) diff --git a/go.sum b/go.sum index 22fd80bf43..c82543a946 100644 --- a/go.sum +++ b/go.sum @@ -317,8 +317,8 @@ github.com/celestiaorg/bittwister v0.0.0-20231213180407-65cdbaf5b8c7 h1:nxplQi8w github.com/celestiaorg/bittwister v0.0.0-20231213180407-65cdbaf5b8c7/go.mod h1:1EF5MfOxVf0WC51Gb7pJ6bcZxnXKNAf9pqWtjgPBAYc= github.com/celestiaorg/blobstream-contracts/v3 v3.1.0 h1:h1Y4V3EMQ2mFmNtWt2sIhZIuyASInj1a9ExI8xOsTOw= github.com/celestiaorg/blobstream-contracts/v3 v3.1.0/go.mod h1:x4DKyfKOSv1ZJM9NwV+Pw01kH2CD7N5zTFclXIVJ6GQ= -github.com/celestiaorg/celestia-core v1.43.0-tm-v0.34.35 h1:L4GTm+JUXhB0a/nGPMq6jEqqe6THuYSQ8m2kUCtZYqw= -github.com/celestiaorg/celestia-core v1.43.0-tm-v0.34.35/go.mod h1:bFr0lAGwaJ0mOHSBmib5/ca5pbBf1yKWGPs93Td0HPw= +github.com/celestiaorg/celestia-core v1.43.0-tm-v0.34.35.0.20241028112242-a31a8fe76311 h1:h5DHE1WwvQnbP9u6REwZN6TAEWtQTomEnqy4Yl90DLw= +github.com/celestiaorg/celestia-core v1.43.0-tm-v0.34.35.0.20241028112242-a31a8fe76311/go.mod h1:bFr0lAGwaJ0mOHSBmib5/ca5pbBf1yKWGPs93Td0HPw= github.com/celestiaorg/cosmos-sdk v1.25.0-sdk-v0.46.16 h1:f+fTe7GGk0/qgdzyqB8kk8EcDf9d6MC22khBTQiDXsU= github.com/celestiaorg/cosmos-sdk v1.25.0-sdk-v0.46.16/go.mod h1:07Z8HJqS8Rw4XlZ+ok3D3NM/X/in8mvcGLvl0Zb5wrA= github.com/celestiaorg/go-square v1.1.1 h1:Cy3p8WVspVcyOqHM8BWFuuYPwMitO1pYGe+ImILFZRA= diff --git a/pkg/user/tx_client.go b/pkg/user/tx_client.go index ce2f7b8933..6deed1c342 100644 --- a/pkg/user/tx_client.go +++ b/pkg/user/tx_client.go @@ -433,37 +433,39 @@ func (client *TxClient) ConfirmTx(ctx context.Context, txHash string) (*TxRespon return nil, err } - switch resp.Status { - case core.TxStatusPending: - // Continue polling if the transaction is still pending - select { - case <-ctx.Done(): - return nil, ctx.Err() - case <-pollTicker.C: - continue - } - case core.TxStatusCommitted: - txResponse := &TxResponse{ - Height: resp.Height, - TxHash: txHash, - Code: resp.ExecutionCode, - } - if resp.ExecutionCode != abci.CodeTypeOK { - executionErr := &ExecutionError{ - TxHash: txHash, - Code: resp.ExecutionCode, - ErrorLog: resp.Error, + if resp != nil { + switch resp.Status { + case core.TxStatusPending: + // Continue polling if the transaction is still pending + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-pollTicker.C: + continue + } + case core.TxStatusCommitted: + txResponse := &TxResponse{ + Height: resp.Height, + TxHash: txHash, + Code: resp.ExecutionCode, } + if resp.ExecutionCode != abci.CodeTypeOK { + executionErr := &ExecutionError{ + TxHash: txHash, + Code: resp.ExecutionCode, + ErrorLog: resp.Error, + } + client.deleteFromTxTracker(txHash) + return nil, executionErr + } + client.deleteFromTxTracker(txHash) + return txResponse, nil + case core.TxStatusEvicted: + return nil, client.handleEvictions(txHash) + default: client.deleteFromTxTracker(txHash) - return nil, executionErr + return nil, fmt.Errorf("transaction with hash %s not found; it was likely rejected", txHash) } - client.deleteFromTxTracker(txHash) - return txResponse, nil - case core.TxStatusEvicted: - return nil, client.handleEvictions(txHash) - default: - client.deleteFromTxTracker(txHash) - return nil, fmt.Errorf("transaction with hash %s not found; it was likely rejected", txHash) } } } diff --git a/test/cmd/txsim/cli.go b/test/cmd/txsim/cli.go index 55cac6d4fd..4a0af58447 100644 --- a/test/cmd/txsim/cli.go +++ b/test/cmd/txsim/cli.go @@ -39,7 +39,7 @@ var ( pollTime time.Duration send, sendIterations, sendAmount int stake, stakeValue, blob int - useFeegrant, suppressLogs bool + useFeegrant, suppressLogs, ignoreFailures bool upgradeSchedule string blobShareVersion int ) @@ -177,6 +177,10 @@ well funded account that can act as the master account. The command runs until a opts.SuppressLogs() } + if ignoreFailures { + opts.IgnoreFailures() + } + encCfg := encoding.MakeConfig(app.ModuleEncodingRegisters...) err = txsim.Run( cmd.Context(), @@ -215,6 +219,7 @@ func flags() *flag.FlagSet { flags.StringVar(&blobSizes, "blob-sizes", "100-1000", "range of blob sizes to send") flags.StringVar(&blobAmounts, "blob-amounts", "1", "range of blobs per PFB specified as a single value or a min-max range (e.g., 10 or 5-10). A single value indicates the exact number of blobs to be created.") flags.BoolVar(&useFeegrant, "feegrant", false, "use the feegrant module to pay for fees") + flags.BoolVar(&ignoreFailures, "ignore-failures", false, "ignore failures") flags.BoolVar(&suppressLogs, "suppressLogs", false, "disable logging") flags.IntVar(&blobShareVersion, "blob-share-version", -1, "optionally specify a share version to use for the blob sequences") return flags diff --git a/test/e2e/benchmark/throughput.go b/test/e2e/benchmark/throughput.go index 75bf1bdf6c..0827f6538e 100644 --- a/test/e2e/benchmark/throughput.go +++ b/test/e2e/benchmark/throughput.go @@ -66,7 +66,7 @@ func TwoNodeSimple(logger *log.Logger) error { ValidatorResource: testnet.DefaultResources, TxClientsResource: testnet.DefaultResources, SelfDelegation: 10000000, - CelestiaAppVersion: latestVersion, + CelestiaAppVersion: "6f334a2", TxClientVersion: testnet.TxsimVersion, EnableLatency: false, LatencyParams: LatencyParams{70, 0}, // in milliseconds @@ -76,15 +76,15 @@ func TwoNodeSimple(logger *log.Logger) error { PerPeerBandwidth: 5 * testnet.MB, UpgradeHeight: 0, TimeoutCommit: 1 * time.Second, - TimeoutPropose: 1 * time.Second, - Mempool: "v1", + TimeoutPropose: 5 * time.Second, + Mempool: "v2", BroadcastTxs: true, Prometheus: true, GovMaxSquareSize: appconsts.DefaultGovMaxSquareSize, MaxBlockBytes: appconsts.DefaultMaxBytes, LocalTracingType: "local", - PushTrace: false, - DownloadTraces: false, + PushTrace: true, + DownloadTraces: true, TestDuration: 3 * time.Minute, TxClients: 2, DisableBBR: true, diff --git a/test/e2e/experiment/compact_blocks/main.go b/test/e2e/experiment/compact_blocks/main.go new file mode 100644 index 0000000000..511666315d --- /dev/null +++ b/test/e2e/experiment/compact_blocks/main.go @@ -0,0 +1,178 @@ +package main + +import ( + "context" + "fmt" + "log" + "strings" + "time" + + "github.com/celestiaorg/celestia-app/v3/app" + "github.com/celestiaorg/celestia-app/v3/app/encoding" + "github.com/celestiaorg/celestia-app/v3/test/e2e/testnet" + "github.com/celestiaorg/celestia-app/v3/test/util/genesis" + blobtypes "github.com/celestiaorg/celestia-app/v3/x/blob/types" + "github.com/celestiaorg/knuu/pkg/knuu" + "github.com/tendermint/tendermint/config" + + "github.com/tendermint/tendermint/pkg/trace" + "github.com/tendermint/tendermint/pkg/trace/schema" +) + +const ( + compactBlocksVersion = "70e7354" +) + +func main() { + if err := Run(); err != nil { + log.Fatalf("failed to run experiment: %v", err) + } +} + +func Run() error { + const ( + nodes = 8 + timeoutCommit = 3 * time.Second + timeoutPropose = 4 * time.Second + version = compactBlocksVersion + timeFormat = "20060102_150405" + ) + + blobParams := blobtypes.DefaultParams() + // set the square size to 128 + blobParams.GovMaxSquareSize = 128 + ecfg := encoding.MakeConfig(app.ModuleBasics) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + identifier := fmt.Sprintf("%s_%s", "compact-blocks", time.Now().Format(timeFormat)) + kn, err := knuu.New(ctx, knuu.Options{ + Scope: identifier, + ProxyEnabled: true, + }) + testnet.NoError("failed to initialize Knuu", err) + + network, err := testnet.New(kn, testnet.Options{ + GenesisModifiers: []genesis.Modifier{ + genesis.SetBlobParams(ecfg.Codec, blobParams), + }, + ChainID: identifier, + }) + if err != nil { + return err + } + defer network.Cleanup(ctx) + + cparams := app.DefaultConsensusParams() + cparams.Block.MaxBytes = 8 * 1024 * 1024 + network.SetConsensusParams(cparams) + + err = network.CreateGenesisNodes(ctx, nodes, version, 10000000, 0, testnet.DefaultResources, true) + if err != nil { + return err + } + + gRPCEndpoints, err := network.RemoteGRPCEndpoints(ctx) + if err != nil { + return err + } + + err = network.CreateTxClients( + ctx, + compactBlocksVersion, + 40, + "128000-256000", + 1, + testnet.DefaultResources, + gRPCEndpoints[:2], + map[int64]uint64{}, + ) + if err != nil { + return err + } + + log.Printf("Setting up network\n") + err = network.Setup( + ctx, + testnet.WithTimeoutCommit(timeoutCommit), + testnet.WithTimeoutPropose(timeoutPropose), + testnet.WithMempool("v2"), + func(cfg *config.Config) { + // create a partially connected network by only dialing 5 peers + cfg.P2P.MaxNumOutboundPeers = 3 + cfg.P2P.MaxNumInboundPeers = 4 + cfg.Mempool.MaxTxsBytes = 100 * 1024 * 1024 + cfg.Instrumentation.TraceType = "local" + cfg.Instrumentation.TracingTables = strings.Join([]string{ + schema.RoundStateTable, + schema.BlockTable, + schema.ProposalTable, + schema.CompactBlockTable, + schema.MempoolRecoveryTable, + }, ",") + }, + ) + if err != nil { + return err + } + + pushConfig, err := trace.GetPushConfigFromEnv() + if err != nil { + return err + } + log.Print("Setting up trace push config") + for _, node := range network.Nodes() { + if err = node.Instance.Build().SetEnvironmentVariable(trace.PushBucketName, pushConfig.BucketName); err != nil { + return fmt.Errorf("failed to set TRACE_PUSH_BUCKET_NAME: %v", err) + } + if err = node.Instance.Build().SetEnvironmentVariable(trace.PushRegion, pushConfig.Region); err != nil { + return fmt.Errorf("failed to set TRACE_PUSH_REGION: %v", err) + } + if err = node.Instance.Build().SetEnvironmentVariable(trace.PushAccessKey, pushConfig.AccessKey); err != nil { + return fmt.Errorf("failed to set TRACE_PUSH_ACCESS_KEY: %v", err) + } + if err = node.Instance.Build().SetEnvironmentVariable(trace.PushKey, pushConfig.SecretKey); err != nil { + return fmt.Errorf("failed to set TRACE_PUSH_SECRET_KEY: %v", err) + } + if err = node.Instance.Build().SetEnvironmentVariable(trace.PushDelay, fmt.Sprintf("%d", pushConfig.PushDelay)); err != nil { + return fmt.Errorf("failed to set TRACE_PUSH_DELAY: %v", err) + } + } + + log.Printf("Starting network\n") + err = network.Start(ctx) + if err != nil { + return err + } + + // run the test for 5 minutes + heightTicker := time.NewTicker(20 * time.Second) + timeout := time.NewTimer(5 * time.Minute) + client, err := network.Node(0).Client() + if err != nil { + return err + } + log.Println("--- RUNNING TESTNET") + for { + select { + case <-heightTicker.C: + status, err := client.Status(context.Background()) + if err != nil { + log.Printf("Error getting status: %v", err) + continue + } + log.Printf("Height: %v", status.SyncInfo.LatestBlockHeight) + + case <-timeout.C: + log.Println("--- COLLECTING DATA") + file := "/Users/callum/Developer/go/src/github.com/celestiaorg/big-blocks-research/traces" + if err := trace.S3Download(file, identifier, pushConfig, schema.RoundStateTable, schema.BlockTable, schema.ProposalTable, schema.CompactBlockTable, schema.MempoolRecoveryTable); err != nil { + return fmt.Errorf("failed to download traces from S3: %w", err) + } + + log.Println("--- FINISHED ✅: ChainID: ", identifier) + return nil + } + } +} diff --git a/test/txsim/account.go b/test/txsim/account.go index 9796f60b04..8205206fe7 100644 --- a/test/txsim/account.go +++ b/test/txsim/account.go @@ -26,12 +26,13 @@ import ( const defaultFee = DefaultGasLimit * appconsts.DefaultMinGasPrice type AccountManager struct { - keys keyring.Keyring - conn *grpc.ClientConn - pending []*account - encCfg encoding.Config - pollTime time.Duration - useFeegrant bool + keys keyring.Keyring + conn *grpc.ClientConn + pending []*account + encCfg encoding.Config + pollTime time.Duration + useFeegrant bool + ignoreFailures bool // to protect from concurrent writes to the map mtx sync.Mutex @@ -51,6 +52,7 @@ func NewAccountManager( conn *grpc.ClientConn, pollTime time.Duration, useFeegrant bool, + ignoreFailures bool, ) (*AccountManager, error) { records, err := keys.List() if err != nil { @@ -62,14 +64,15 @@ func NewAccountManager( } am := &AccountManager{ - keys: keys, - encCfg: encCfg, - pending: make([]*account, 0), - conn: conn, - pollTime: pollTime, - useFeegrant: useFeegrant, - addressMap: make(map[string]string), - accountIndex: len(records), + keys: keys, + encCfg: encCfg, + pending: make([]*account, 0), + conn: conn, + pollTime: pollTime, + useFeegrant: useFeegrant, + ignoreFailures: ignoreFailures, + addressMap: make(map[string]string), + accountIndex: len(records), } if masterAccName == "" { @@ -268,6 +271,9 @@ func (am *AccountManager) Submit(ctx context.Context, op Operation) error { } } if err != nil { + if am.ignoreFailures { + return nil + } return err } diff --git a/test/txsim/blob.go b/test/txsim/blob.go index 3fcef2fa33..c68d29b947 100644 --- a/test/txsim/blob.go +++ b/test/txsim/blob.go @@ -28,6 +28,7 @@ type BlobSequence struct { account types.AccAddress useFeegrant bool + gasPrice float64 } func NewBlobSequence(sizes, blobsPerPFB Range) *BlobSequence { @@ -35,6 +36,7 @@ func NewBlobSequence(sizes, blobsPerPFB Range) *BlobSequence { sizes: sizes, blobsPerPFB: blobsPerPFB, shareVersions: []uint8{share.ShareVersionZero, share.ShareVersionOne}, + gasPrice: appconsts.DefaultMinGasPrice * 10, } } @@ -111,10 +113,13 @@ func (s *BlobSequence) Next(_ context.Context, _ grpc.ClientConn, rand *rand.Ran if err != nil { return Operation{}, err } + // increment the gas price by 0.0001 + defer func() { s.gasPrice += 0.0001 }() return Operation{ Msgs: []types.Msg{msg}, Blobs: blobs, GasLimit: estimateGas(sizes, s.useFeegrant), + GasPrice: s.gasPrice, }, nil } diff --git a/test/txsim/run.go b/test/txsim/run.go index a8d57c6a0e..1bc6ee9529 100644 --- a/test/txsim/run.go +++ b/test/txsim/run.go @@ -58,7 +58,16 @@ func Run( } // Create the account manager to handle account transactions. - manager, err := NewAccountManager(ctx, keys, encCfg, opts.masterAcc, conn, opts.pollTime, opts.useFeeGrant) + manager, err := NewAccountManager( + ctx, + keys, + encCfg, + opts.masterAcc, + conn, + opts.pollTime, + opts.useFeeGrant, + opts.ignoreFailures, + ) if err != nil { return err } @@ -129,6 +138,9 @@ type Options struct { pollTime time.Duration useFeeGrant bool suppressLogger bool + // If set to true, if a sequence fails to submit a transction + // it will not halt. + ignoreFailures bool } func (o *Options) Fill() { @@ -156,6 +168,11 @@ func (o *Options) UseFeeGrant() *Options { return o } +func (o *Options) IgnoreFailures() *Options { + o.ignoreFailures = true + return o +} + func (o *Options) SpecifyMasterAccount(name string) *Options { o.masterAcc = name return o