diff --git a/app/estimate_square_size.go b/app/estimate_square_size.go index 1d4eb96c..f3f64878 100644 --- a/app/estimate_square_size.go +++ b/app/estimate_square_size.go @@ -34,7 +34,7 @@ func estimateSquareSize(normalTxs [][]byte, blobTxs []core.BlobTx) (squareSize u // estimate as much totalSharesUsed := uint64(txSharesUsed + pfbTxSharesUsed + blobSharesUsed) totalSharesUsed *= 2 - minSize := uint64(math.Sqrt(float64(totalSharesUsed))) + minSize := uint64(math.Ceil(math.Sqrt(float64(totalSharesUsed)))) squareSize = shares.RoundUpPowerOfTwo(minSize) if squareSize >= appconsts.DefaultMaxSquareSize { squareSize = appconsts.DefaultMaxSquareSize diff --git a/app/estimate_square_size_test.go b/app/estimate_square_size_test.go index 2f43921a..2556d02f 100644 --- a/app/estimate_square_size_test.go +++ b/app/estimate_square_size_test.go @@ -25,12 +25,12 @@ func Test_estimateSquareSize(t *testing.T) { } tests := []test{ {"empty block", 0, 0, 0, appconsts.DefaultMinSquareSize}, - {"one normal tx", 1, 0, 0, appconsts.DefaultMinSquareSize}, + {"one normal tx", 1, 0, 0, 2}, {"one small pfb small block", 0, 1, 100, 2}, {"mixed small block", 10, 12, 500, 16}, {"small block 2", 0, 12, 1000, 16}, {"mixed medium block 2", 10, 20, 10000, 32}, - {"one large pfb large block", 0, 1, 1000000, 64}, + {"one large pfb large block", 0, 1, 1000000, appconsts.DefaultMaxSquareSize}, {"one hundred large pfb large block", 0, 100, 100000, appconsts.DefaultMaxSquareSize}, {"one hundred large pfb medium block", 100, 100, 100000, appconsts.DefaultMaxSquareSize}, {"mixed transactions large block", 100, 100, 100000, appconsts.DefaultMaxSquareSize}, @@ -75,7 +75,7 @@ func Test_estimateSquareSize_MultiBlob(t *testing.T) { func() [][]int { return blobfactory.Repeat([]int{1000}, 10) }, - 8, 8, + 16, 8, }, { "10 multiblob 4 share transactions", diff --git a/testing/txsim/account.go b/testing/txsim/account.go new file mode 100644 index 00000000..97528a76 --- /dev/null +++ b/testing/txsim/account.go @@ -0,0 +1,443 @@ +package txsim + +import ( + "context" + "fmt" + "strings" + "sync" + + "github.com/celestiaorg/celestia-app/app" + "github.com/cosmos/cosmos-sdk/client" + "github.com/cosmos/cosmos-sdk/crypto/hd" + "github.com/cosmos/cosmos-sdk/crypto/keyring" + cryptotypes "github.com/cosmos/cosmos-sdk/crypto/types" + "github.com/cosmos/cosmos-sdk/types" + "github.com/cosmos/cosmos-sdk/types/tx/signing" + authsigning "github.com/cosmos/cosmos-sdk/x/auth/signing" + auth "github.com/cosmos/cosmos-sdk/x/auth/types" + bank "github.com/cosmos/cosmos-sdk/x/bank/types" + "github.com/rs/zerolog/log" +) + +const defaultFee = DefaultGasLimit * DefaultGasPrice + +type AccountManager struct { + keys keyring.Keyring + tx *TxClient + query *QueryClient + pending []*Account + + // to protect from concurrent writes to the map + mtx sync.Mutex + masterAccount *Account + accounts map[string]*Account +} + +type Account struct { + Address types.AccAddress + PubKey cryptotypes.PubKey + Sequence uint64 + AccountNumber uint64 + Balance int64 +} + +func NewAccountManager(ctx context.Context, keys keyring.Keyring, txClient *TxClient, queryClient *QueryClient) (*AccountManager, error) { + records, err := keys.List() + if err != nil { + return nil, err + } + + if len(records) == 0 { + return nil, fmt.Errorf("no accounts found in keyring") + } + + am := &AccountManager{ + keys: keys, + accounts: make(map[string]*Account), + pending: make([]*Account, 0), + tx: txClient, + query: queryClient, + } + + if err := am.setupMasterAccount(ctx); err != nil { + return nil, err + } + + log.Info(). + Str("address", am.masterAccount.Address.String()). + Int64("balance", am.masterAccount.Balance). + Msg("set master account") + am.accounts[am.masterAccount.Address.String()] = am.masterAccount + + return am, nil +} + +// setupMasterAccount loops through all accounts in the keyring and picks out the one with +// the highest balance as the master account. Accounts that don't yet exist on chain are +// ignored. +func (am *AccountManager) setupMasterAccount(ctx context.Context) error { + am.mtx.Lock() + defer am.mtx.Unlock() + + records, err := am.keys.List() + if err != nil { + return err + } + + for _, record := range records { + address, err := record.GetAddress() + if err != nil { + return fmt.Errorf("error getting address for account %s: %w", record.Name, err) + } + + // search for the account on chain + balance, err := am.getBalance(ctx, address) + if err != nil { + log.Err(err).Str("account", record.Name).Msg("error getting initial account balance") + continue + } + + // the master account is the account with the highest balance + if am.masterAccount == nil || balance > am.masterAccount.Balance { + accountNumber, sequence, err := am.getAccountDetails(ctx, address) + if err != nil { + log.Err(err).Str("account", record.Name).Msg("error getting initial account details") + continue + } + pk, err := record.GetPubKey() + if err != nil { + return fmt.Errorf("error getting public key for account %s: %w", record.Name, err) + } + am.masterAccount = &Account{ + Address: address, + PubKey: pk, + Sequence: sequence, + AccountNumber: accountNumber, + Balance: balance, + } + } + } + + if am.masterAccount == nil { + return fmt.Errorf("no suitable account found") + } + + return nil +} + +// AllocateAccounts is used by sequences to specify the number of accounts +// and the balance of each of those accounts. Not concurrently safe. +func (am *AccountManager) AllocateAccounts(n, balance int) []types.AccAddress { + if n < 1 { + panic("n must be greater than 0") + } + if balance < 1 { + panic("balance must be greater than 0") + } + + path := hd.CreateHDPath(118, 0, 0).String() + addresses := make([]types.AccAddress, n) + for i := 0; i < n; i++ { + record, _, err := am.keys.NewMnemonic(am.nextAccountName(), keyring.English, path, keyring.DefaultBIP39Passphrase, hd.Secp256k1) + if err != nil { + panic(err) + } + addresses[i], err = record.GetAddress() + if err != nil { + panic(err) + } + + pk, err := record.GetPubKey() + if err != nil { + panic(err) + } + + am.pending = append(am.pending, &Account{ + Address: addresses[i], + PubKey: pk, + Balance: int64(balance), + }) + } + return addresses +} + +// Submit executes on an operation. This is thread safe. +func (am *AccountManager) Submit(ctx context.Context, op Operation) error { + for _, msg := range op.Msgs { + if err := msg.ValidateBasic(); err != nil { + return fmt.Errorf("error validating message: %w", err) + } + } + + // create the tx builder and add the messages + builder := am.tx.Tx() + err := builder.SetMsgs(op.Msgs...) + if err != nil { + return fmt.Errorf("error setting messages: %w", err) + } + + if op.GasLimit == 0 { + builder.SetGasLimit(DefaultGasLimit) + builder.SetFeeAmount(types.NewCoins(types.NewInt64Coin(app.BondDenom, int64(defaultFee)))) + } else { + builder.SetGasLimit(op.GasLimit) + if op.GasPrice > 0 { + builder.SetFeeAmount(types.NewCoins(types.NewInt64Coin(app.BondDenom, int64(float64(op.GasLimit)*op.GasPrice)))) + } else { + builder.SetFeeAmount(types.NewCoins(types.NewInt64Coin(app.BondDenom, int64(float64(op.GasLimit)*DefaultGasPrice)))) + } + } + + if err := am.signTransaction(builder); err != nil { + return err + } + + // If the sequence specified a delay, then wait for those blocks to be produced + if op.Delay != 0 { + if err := am.tx.WaitForNBlocks(ctx, op.Delay); err != nil { + return fmt.Errorf("error waiting for blocks: %w", err) + } + } + + // broadcast the transaction + resp, err := am.tx.Broadcast(ctx, builder, op.Blobs) + if err != nil { + return fmt.Errorf("error broadcasting transaction: %w", err) + } + + signers := builder.GetTx().GetSigners() + + // increment the sequence number for all the signers + am.incrementSignerSequences(signers) + + log.Info(). + Int64("height", resp.Height). + Str("signers", addrsToString(signers)). + Str("msgs", msgsToString(op.Msgs)). + Msg("tx committed") + + return nil +} + +// Generate the pending accounts by sending the adequate funds and setting up the feegrant permissions. +// This operation is not concurrently safe. +func (am *AccountManager) GenerateAccounts(ctx context.Context) error { + if len(am.pending) == 0 { + return nil + } + + msgs := make([]types.Msg, 0) + // batch together all the messages needed to create all the accounts + for _, acc := range am.pending { + if am.masterAccount.Balance < acc.Balance { + return fmt.Errorf("master account has insufficient funds") + } + + bankMsg := bank.NewMsgSend(am.masterAccount.Address, acc.Address, types.NewCoins(types.NewInt64Coin(app.BondDenom, acc.Balance))) + msgs = append(msgs, bankMsg) + } + + err := am.Submit(ctx, Operation{Msgs: msgs}) + if err != nil { + return fmt.Errorf("error funding accounts: %w", err) + } + + // check that the account now exists + for _, acc := range am.pending { + acc.AccountNumber, acc.Sequence, err = am.getAccountDetails(ctx, acc.Address) + if err != nil { + return fmt.Errorf("getting account %s: %w", acc.Address, err) + } + + // set the account + am.accounts[acc.Address.String()] = acc + log.Info(). + Str("address", acc.Address.String()). + Int64("balance", acc.Balance). + Str("pubkey", acc.PubKey.String()). + Uint64("account number", acc.AccountNumber). + Uint64("sequence", acc.Sequence). + Msg("initialized account") + } + + // update master account + err = am.updateAccount(ctx, am.masterAccount) + if err != nil { + return fmt.Errorf("updating master account: %w", err) + } + + // clear the pending accounts + am.pending = nil + return nil +} + +func (am *AccountManager) signTransaction(builder client.TxBuilder) error { + signers := builder.GetTx().GetSigners() + for _, signer := range signers { + _, ok := am.accounts[signer.String()] + if !ok { + return fmt.Errorf("account %s not found", signer.String()) + } + } + + // To ensure we have the correct bytes to sign over we produce + // a dry run of the signing data + draftsigV2 := make([]signing.SignatureV2, len(signers)) + index := 0 + for _, signer := range signers { + acc := am.accounts[signer.String()] + record, err := am.keys.KeyByAddress(signer) + if err != nil { + return fmt.Errorf("error getting key for account %s: %w", signer.String(), err) + } + pk, _ := record.GetPubKey() + if !pk.Equals(acc.PubKey) { + return fmt.Errorf("public key (%s != %s) mismatch for account %s", pk.String(), acc.PubKey.String(), signer.String()) + } + draftsigV2[index] = signing.SignatureV2{ + PubKey: acc.PubKey, + Data: &signing.SingleSignatureData{ + SignMode: signing.SignMode_SIGN_MODE_DIRECT, + Signature: nil, + }, + Sequence: acc.Sequence, + } + index++ + } + + err := builder.SetSignatures(draftsigV2...) + if err != nil { + return fmt.Errorf("error setting draft signatures: %w", err) + } + + // now we can use the data to produce the signature from each signer + index = 0 + sigV2 := make([]signing.SignatureV2, len(signers)) + for _, signer := range signers { + acc := am.accounts[signer.String()] + signature, err := am.createSignature(acc, builder) + if err != nil { + return fmt.Errorf("error creating signature: %w", err) + } + sigV2[index] = signing.SignatureV2{ + PubKey: acc.PubKey, + Data: &signing.SingleSignatureData{ + SignMode: signing.SignMode_SIGN_MODE_DIRECT, + Signature: signature, + }, + Sequence: acc.Sequence, + } + index++ + } + + err = builder.SetSignatures(sigV2...) + if err != nil { + return fmt.Errorf("error setting signatures: %w", err) + } + + return nil +} + +func (am *AccountManager) createSignature(account *Account, builder client.TxBuilder) ([]byte, error) { + signerData := authsigning.SignerData{ + Address: account.Address.String(), + ChainID: am.tx.ChainID(), + AccountNumber: account.AccountNumber, + Sequence: account.Sequence, + PubKey: account.PubKey, + } + + bytesToSign, err := am.tx.encCfg.TxConfig.SignModeHandler().GetSignBytes( + signing.SignMode_SIGN_MODE_DIRECT, + signerData, + builder.GetTx(), + ) + if err != nil { + return nil, fmt.Errorf("error getting sign bytes: %w", err) + } + + signature, _, err := am.keys.SignByAddress(account.Address, bytesToSign) + if err != nil { + return nil, fmt.Errorf("error signing bytes: %w", err) + } + + return signature, nil +} + +func (am *AccountManager) updateAccount(ctx context.Context, account *Account) error { + newBalance, err := am.getBalance(ctx, account.Address) + if err != nil { + return fmt.Errorf("getting account balance: %w", err) + } + newAccountNumber, newSequence, err := am.getAccountDetails(ctx, account.Address) + if err != nil { + return fmt.Errorf("getting account details: %w", err) + } + + am.mtx.Lock() + defer am.mtx.Unlock() + account.Balance = newBalance + account.AccountNumber = newAccountNumber + account.Sequence = newSequence + return nil +} + +// getBalance returns the balance for the given address +func (am *AccountManager) getBalance(ctx context.Context, address types.AccAddress) (int64, error) { + balanceResp, err := am.query.Bank().Balance(ctx, &bank.QueryBalanceRequest{ + Address: address.String(), + Denom: app.BondDenom, + }) + if err != nil { + return 0, fmt.Errorf("error getting balance for %s: %w", address.String(), err) + } + return balanceResp.GetBalance().Amount.Int64(), nil +} + +// getAccountDetails returns the account number and sequence for the given address +func (am *AccountManager) getAccountDetails(ctx context.Context, address types.AccAddress) (uint64, uint64, error) { + accountResp, err := am.query.Auth().Account(ctx, &auth.QueryAccountRequest{ + Address: address.String(), + }) + if err != nil { + return 0, 0, fmt.Errorf("error getting account state for %s: %w", address.String(), err) + } + + var acc auth.AccountI + err = am.tx.encCfg.InterfaceRegistry.UnpackAny(accountResp.Account, &acc) + if err != nil { + return 0, 0, fmt.Errorf("error unpacking account: %w", err) + } + + return acc.GetAccountNumber(), acc.GetSequence(), nil +} + +func (am *AccountManager) incrementSignerSequences(signers []types.AccAddress) { + am.mtx.Lock() + defer am.mtx.Unlock() + for _, signer := range signers { + am.accounts[signer.String()].Sequence++ + } +} + +func (am *AccountManager) nextAccountName() string { + return accountName(len(am.pending) + len(am.accounts)) +} + +func accountName(n int) string { return fmt.Sprintf("tx-sim-%d", n) } + +func addrsToString(addrs []types.AccAddress) string { + addrsStr := make([]string, len(addrs)) + for i, addr := range addrs { + addrsStr[i] = addr.String() + } + return strings.Join(addrsStr, ",") +} + +func msgsToString(msgs []types.Msg) string { + msgsStr := make([]string, len(msgs)) + for i, msg := range msgs { + msgsStr[i] = types.MsgTypeURL(msg) + } + return strings.Join(msgsStr, ",") +} diff --git a/testing/txsim/blob.go b/testing/txsim/blob.go new file mode 100644 index 00000000..7b0a5805 --- /dev/null +++ b/testing/txsim/blob.go @@ -0,0 +1,124 @@ +package txsim + +import ( + "context" + "fmt" + "math/rand" + + "github.com/celestiaorg/celestia-app/pkg/appconsts" + ns "github.com/celestiaorg/celestia-app/pkg/namespace" + "github.com/celestiaorg/celestia-app/testutil/blobfactory" + blob "github.com/celestiaorg/celestia-app/x/blob/types" + "github.com/cosmos/cosmos-sdk/types" + "github.com/gogo/protobuf/grpc" +) + +var _ Sequence = &BlobSequence{} + +// As napkin math, this would cover the cost of 8267 4KB blobs +const fundsForGas = 1e9 // 1000 TIA + +// BlobSequence defines a pattern whereby a single user repeatedly sends a pay for blob +// message roughly every height. The PFB may consist of several blobs +type BlobSequence struct { + namespace ns.Namespace + sizes Range + blobsPerPFB Range + + account types.AccAddress +} + +func NewBlobSequence(sizes, blobsPerPFB Range) *BlobSequence { + return &BlobSequence{ + sizes: sizes, + blobsPerPFB: blobsPerPFB, + } +} + +// WithNamespace provides the option of fixing a predefined namespace for +// all blobs. +func (s *BlobSequence) WithNamespace(namespace ns.Namespace) *BlobSequence { + s.namespace = namespace + return s +} + +func (s *BlobSequence) Clone(n int) []Sequence { + sequenceGroup := make([]Sequence, n) + for i := 0; i < n; i++ { + sequenceGroup[i] = &BlobSequence{ + namespace: s.namespace, + sizes: s.sizes, + blobsPerPFB: s.blobsPerPFB, + } + } + return sequenceGroup +} + +func (s *BlobSequence) Init(_ context.Context, _ grpc.ClientConn, allocateAccounts AccountAllocator, _ *rand.Rand) { + s.account = allocateAccounts(1, fundsForGas)[0] +} + +func (s *BlobSequence) Next(ctx context.Context, querier grpc.ClientConn, rand *rand.Rand) (Operation, error) { + numBlobs := s.blobsPerPFB.Rand(rand) + sizes := make([]int, numBlobs) + namespaces := make([]ns.Namespace, numBlobs) + for i := range sizes { + if s.namespace.ID != nil { + namespaces[i] = s.namespace + } else { + // generate a random namespace for the blob + namespace := make([]byte, ns.NamespaceVersionZeroIDSize) + _, err := rand.Read(namespace) + if err != nil { + return Operation{}, fmt.Errorf("generating random namespace: %w", err) + } + namespaces[i] = ns.MustNewV0(namespace) + } + sizes[i] = s.sizes.Rand(rand) + } + // generate the blobs + blobs := blobfactory.RandBlobsWithNamespace(namespaces, sizes) + // derive the pay for blob message + msg, err := blob.NewMsgPayForBlobs(s.account.String(), blobs...) + if err != nil { + return Operation{}, err + } + return Operation{ + Msgs: []types.Msg{msg}, + Blobs: blobs, + GasLimit: EstimateGas(sizes), + }, nil +} + +type Range struct { + Min int + Max int +} + +func NewRange(min, max int) Range { + return Range{Min: min, Max: max} +} + +// Rand returns a random number between min (inclusive) and max (exclusive). +func (r Range) Rand(rand *rand.Rand) int { + if r.Max <= r.Min { + return r.Min + } + return rand.Intn(r.Max-r.Min) + r.Min +} + +const ( + perByteGasTolerance = 2 + pfbGasFixedCost = 80000 +) + +// EstimateGas estimates the gas required to pay for a set of blobs in a PFB. +func EstimateGas(blobSizes []int) uint64 { + totalByteCount := 0 + for _, size := range blobSizes { + totalByteCount += size + } + variableGasAmount := (appconsts.DefaultGasPerBlobByte + perByteGasTolerance) * totalByteCount + + return uint64(variableGasAmount + pfbGasFixedCost) +} diff --git a/testing/txsim/client.go b/testing/txsim/client.go new file mode 100644 index 00000000..0e71a956 --- /dev/null +++ b/testing/txsim/client.go @@ -0,0 +1,304 @@ +package txsim + +import ( + "context" + "errors" + "fmt" + "strings" + "sync" + "time" + + "github.com/celestiaorg/celestia-app/app/encoding" + blob "github.com/celestiaorg/celestia-app/x/blob/types" + sdkclient "github.com/cosmos/cosmos-sdk/client" + auth "github.com/cosmos/cosmos-sdk/x/auth/types" + bank "github.com/cosmos/cosmos-sdk/x/bank/types" + protogrpc "github.com/gogo/protobuf/grpc" + "github.com/tendermint/tendermint/rpc/client/http" + coretypes "github.com/tendermint/tendermint/rpc/core/types" + "github.com/tendermint/tendermint/types" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +const ( + // how often to poll the network for the latest height + DefaultPollTime = 3 * time.Second + + // how many times to wait for a transaction to be committed before + // concluding that it has failed + maxRetries = 10 + + rpcContextTimeout = 10 * time.Second +) + +var errTimedOutWaitingForTx = errors.New("timed out waiting for tx to be committed") + +// TxClient is a client for submitting transactions to one of several nodes. It uses a round-robin +// algorithm for multiplexing requests across multiple clients. +type TxClient struct { + rpcClients []*http.HTTP + encCfg encoding.Config + chainID string + pollTime time.Duration + + mtx sync.Mutex + // index indicates which client to use next + index int + height int64 + lastUpdated time.Time +} + +func NewTxClient(ctx context.Context, encCfg encoding.Config, pollTime time.Duration, rpcEndpoints []string) (*TxClient, error) { + if len(rpcEndpoints) == 0 { + return nil, errors.New("must have at least one endpoint specified") + } + + // setup all the rpc clients to communicate with full nodes + rpcClients := make([]*http.HTTP, len(rpcEndpoints)) + var ( + err error + chainID string + height int64 + ) + for i, endpoint := range rpcEndpoints { + rpcClients[i], err = http.New(endpoint, "/websocket") + if err != nil { + return nil, fmt.Errorf("error creating rpc client with endpoint %s: %w", endpoint, err) + } + + // check that the node is up + status, err := rpcClients[i].Status(ctx) + if err != nil { + return nil, fmt.Errorf("error getting status from rpc server %s: %w", endpoint, err) + } + + // set the chainID + if chainID == "" { + chainID = status.NodeInfo.Network + } + + // set the latest height + if status.SyncInfo.EarliestBlockHeight > height { + height = status.SyncInfo.EarliestBlockHeight + } + } + return &TxClient{ + rpcClients: rpcClients, + encCfg: encCfg, + chainID: chainID, + pollTime: pollTime, + height: height, + lastUpdated: time.Now(), + }, nil +} + +func (tc *TxClient) Tx() sdkclient.TxBuilder { + builder := tc.encCfg.TxConfig.NewTxBuilder() + return builder +} + +func (tc *TxClient) ChainID() string { + return tc.chainID +} + +func (tc *TxClient) Height() int64 { + tc.mtx.Lock() + defer tc.mtx.Unlock() + return tc.height +} + +func (tc *TxClient) updateHeight(newHeight int64) int64 { + tc.mtx.Lock() + defer tc.mtx.Unlock() + if newHeight > tc.height { + tc.height = newHeight + tc.lastUpdated = time.Now() + return newHeight + } + return tc.height +} + +func (tc *TxClient) LastUpdated() time.Time { + tc.mtx.Lock() + defer tc.mtx.Unlock() + return tc.lastUpdated +} + +// WaitForNBlocks uses WaitForHeight to wait for the given number of blocks to +// be produced. +func (tc *TxClient) WaitForNBlocks(ctx context.Context, blocks int64) error { + return tc.WaitForHeight(ctx, tc.Height()+blocks) +} + +// WaitForHeight continually polls the network for the latest height. It is +// concurrently safe. +func (tc *TxClient) WaitForHeight(ctx context.Context, height int64) error { + // check if we can immediately return + if height <= tc.Height() { + return nil + } + + ticker := time.NewTicker(tc.pollTime) + for { + select { + case <-ticker.C: + // check if we've reached the target height + if height <= tc.Height() { + return nil + } + // check when the last time we polled to avoid concurrent processes + // from polling the network too often + if time.Since(tc.LastUpdated()) < tc.pollTime { + continue + } + + // ping a node for their latest height + status, err := tc.Client().Status(ctx) + if err != nil { + return fmt.Errorf("error getting status from rpc server: %w", err) + } + + latestHeight := tc.updateHeight(status.SyncInfo.LatestBlockHeight) + // check if the new latest height is greater or equal than the target height + if latestHeight >= height { + return nil + } + + case <-ctx.Done(): + return ctx.Err() + } + } +} + +func (tc *TxClient) WaitForTx(ctx context.Context, txID []byte) (*coretypes.ResultTx, error) { + for i := 0; i < maxRetries; i++ { + subctx, cancel := context.WithTimeout(ctx, rpcContextTimeout) + defer cancel() + + resp, err := tc.Client().Tx(subctx, txID, false) + if err != nil { + // sub context timed out but the parent hasn't (we retry) + if subctx.Err() != nil && ctx.Err() == nil { + continue + } + + // tx still no longer exists + if strings.Contains(err.Error(), "not found") { + time.Sleep(tc.pollTime) + continue + } + return nil, err + } + + if resp.TxResult.Code != 0 { + return nil, fmt.Errorf("non zero code delivering tx (%d): %s", resp.TxResult.Code, resp.TxResult.Log) + } + + return resp, nil + } + return nil, errTimedOutWaitingForTx +} + +// Client multiplexes the RPC clients +func (tc *TxClient) Client() *http.HTTP { + tc.mtx.Lock() + defer tc.mtx.Unlock() + defer tc.next() + return tc.rpcClients[tc.index] +} + +// Broadcast encodes and broadcasts a transaction to the network. If CheckTx fails, +// the error will be returned. The method does not wait for the transaction to be +// included in a block. +func (tc *TxClient) Broadcast(ctx context.Context, txBuilder sdkclient.TxBuilder, blobs []*blob.Blob) (*coretypes.ResultTx, error) { + tx, err := tc.encCfg.TxConfig.TxEncoder()(txBuilder.GetTx()) + if err != nil { + return nil, fmt.Errorf("error encoding tx: %w", err) + } + + // If blobs exist, these are bundled into the existing tx. + if len(blobs) > 0 { + txWithBlobs, err := types.MarshalBlobTx(tx, blobs...) + if err != nil { + return nil, err + } + tx = txWithBlobs + } + + for { + subctx, cancel := context.WithTimeout(ctx, rpcContextTimeout) + defer cancel() + + resp, err := tc.Client().BroadcastTxSync(subctx, tx) + if err != nil { + if subctx.Err() != nil { + continue + } + return nil, err + } + + if resp.Code != 0 { + return nil, fmt.Errorf("non zero code checking tx (%d): %s", resp.Code, resp.Log) + } + + return tc.WaitForTx(ctx, resp.Hash) + } +} + +// next iterates the index of the RPC clients. It is not thread safe and should be called within a mutex. +func (tc *TxClient) next() { + tc.index = (tc.index + 1) % len(tc.rpcClients) +} + +// QueryClient multiplexes requests across multiple running gRPC connections. It does this in a round-robin fashion. +type QueryClient struct { + connections []*grpc.ClientConn + + mtx sync.Mutex + // index indicates which client to be used next + index int +} + +func NewQueryClient(grpcEndpoints []string) (*QueryClient, error) { + connections := make([]*grpc.ClientConn, len(grpcEndpoints)) + for idx, endpoint := range grpcEndpoints { + conn, err := grpc.Dial(grpcEndpoints[0], grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + return nil, fmt.Errorf("dialing %s: %w", endpoint, err) + } + connections[idx] = conn + } + + return &QueryClient{ + connections: connections, + }, nil +} + +// next iterates the index of the RPC clients. It is not thread safe and should be called within a mutex. +func (qc *QueryClient) next() { + qc.index = (qc.index + 1) % len(qc.connections) +} + +func (qc *QueryClient) Conn() protogrpc.ClientConn { + qc.mtx.Lock() + defer qc.mtx.Unlock() + defer qc.next() + return qc.connections[qc.index] +} + +func (qc *QueryClient) Bank() bank.QueryClient { + return bank.NewQueryClient(qc.Conn()) +} + +func (qc *QueryClient) Auth() auth.QueryClient { + return auth.NewQueryClient(qc.Conn()) +} + +func (qc *QueryClient) Close() error { + var err error + for _, conn := range qc.connections { + err = conn.Close() + } + return err +} diff --git a/testing/txsim/run.go b/testing/txsim/run.go new file mode 100644 index 00000000..ac6efb2f --- /dev/null +++ b/testing/txsim/run.go @@ -0,0 +1,104 @@ +package txsim + +import ( + "context" + "errors" + "fmt" + "math/rand" + "time" + + "github.com/celestiaorg/celestia-app/app" + "github.com/celestiaorg/celestia-app/app/encoding" + "github.com/cosmos/cosmos-sdk/crypto/keyring" + "github.com/rs/zerolog/log" +) + +// Run is the entrypoint function for starting the txsim client. The lifecycle of the client is managed +// through the context. At least one grpc and rpc endpoint must be provided. The client relies on a +// single funded master account present in the keyring. The client allocates subaccounts for sequences +// at runtime. A seed can be provided for deterministic randomness. The pollTime dictates the frequency +// that the client should check for updates from state and that transactions have been committed. +// +// This should be used for testing purposes only. +// +// All sequences can be scaled up using the `Clone` method. This allows for a single sequence that +// repeatedly sends random PFBs to be scaled up to 1000 accounts sending PFBs. +func Run( + ctx context.Context, + rpcEndpoints, grpcEndpoints []string, + keys keyring.Keyring, + seed int64, + pollTime time.Duration, + sequences ...Sequence, +) error { + r := rand.New(rand.NewSource(seed)) + + txClient, err := NewTxClient(ctx, encoding.MakeConfig(app.ModuleEncodingRegisters...), pollTime, rpcEndpoints) + if err != nil { + return err + } + + queryClient, err := NewQueryClient(grpcEndpoints) + if err != nil { + return err + } + defer queryClient.Close() + + // Create the account manager to handle account transactions. + manager, err := NewAccountManager(ctx, keys, txClient, queryClient) + if err != nil { + return err + } + + // Initiaize each of the sequences by allowing them to allocate accounts. + for _, sequence := range sequences { + sequence.Init(ctx, manager.query.Conn(), manager.AllocateAccounts, r) + } + + // Generate the allotted accounts on chain by sending them sufficient funds + if err := manager.GenerateAccounts(ctx); err != nil { + return err + } + + errCh := make(chan error, len(sequences)) + + // Spin up a task group to run each of the sequences concurrently. + for idx, sequence := range sequences { + go func(seqID int, sequence Sequence, errCh chan<- error) { + opNum := 0 + r := rand.New(rand.NewSource(seed)) + // each sequence loops through the next set of operations, the new messages are then + // submitted on chain + for { + ops, err := sequence.Next(ctx, manager.query.Conn(), r) + if err != nil { + errCh <- fmt.Errorf("sequence %d: %w", seqID, err) + return + } + + // Submit the messages to the chain. + if err := manager.Submit(ctx, ops); err != nil { + errCh <- fmt.Errorf("sequence %d: %w", seqID, err) + return + } + opNum++ + } + }(idx, sequence, errCh) + } + + var finalErr error + for i := 0; i < len(sequences); i++ { + err := <-errCh + if err == nil { // should never happen + continue + } + if errors.Is(err, EndOfSequence) { + log.Info().Err(err).Msg("sequence terminated") + continue + } + log.Error().Err(err).Msg("sequence failed") + finalErr = err + } + + return finalErr +} diff --git a/testing/txsim/run_test.go b/testing/txsim/run_test.go new file mode 100644 index 00000000..3dacf1c7 --- /dev/null +++ b/testing/txsim/run_test.go @@ -0,0 +1,156 @@ +//go:build !race + +// known race in testnode +// ref: https://github.com/celestiaorg/celestia-app/issues/1369 +package txsim_test + +import ( + "context" + "errors" + "fmt" + "testing" + "time" + + "github.com/celestiaorg/celestia-app/testing/txsim" + "github.com/celestiaorg/celestia-app/testutil/testnode" + "github.com/cosmos/cosmos-sdk/crypto/keyring" + sdk "github.com/cosmos/cosmos-sdk/types" + + blob "github.com/celestiaorg/celestia-app/x/blob/types" + bank "github.com/cosmos/cosmos-sdk/x/bank/types" + distribution "github.com/cosmos/cosmos-sdk/x/distribution/types" + staking "github.com/cosmos/cosmos-sdk/x/staking/types" + "github.com/stretchr/testify/require" +) + +func TestTxSimulator(t *testing.T) { + testCases := []struct { + name string + sequences []txsim.Sequence + expMessages map[string]int64 + }{ + { + name: "send sequence", + sequences: []txsim.Sequence{txsim.NewSendSequence(2, 1000, 100)}, + // we expect at least 5 bank send messages within 30 seconds + expMessages: map[string]int64{sdk.MsgTypeURL(&bank.MsgSend{}): 5}, + }, + { + name: "stake sequence", + sequences: []txsim.Sequence{txsim.NewStakeSequence(1000)}, + expMessages: map[string]int64{ + sdk.MsgTypeURL(&staking.MsgDelegate{}): 1, + sdk.MsgTypeURL(&distribution.MsgWithdrawDelegatorReward{}): 5, + // NOTE: this sequence also makes redelegations but because the + // testnet has only one validator, this never happens + }, + }, + { + name: "blob sequence", + sequences: []txsim.Sequence{ + txsim.NewBlobSequence( + txsim.NewRange(100, 1000), + txsim.NewRange(1, 3)), + }, + expMessages: map[string]int64{sdk.MsgTypeURL(&blob.MsgPayForBlobs{}): 10}, + }, + { + name: "multi blob sequence", + sequences: txsim.NewBlobSequence( + txsim.NewRange(1000, 1000), + txsim.NewRange(3, 3), + ).Clone(4), + expMessages: map[string]int64{sdk.MsgTypeURL(&blob.MsgPayForBlobs{}): 20}, + }, + { + name: "multi mixed sequence", + sequences: append(append( + txsim.NewSendSequence(2, 1000, 100).Clone(3), + txsim.NewStakeSequence(1000).Clone(3)...), + txsim.NewBlobSequence(txsim.NewRange(1000, 1000), txsim.NewRange(1, 3)).Clone(3)...), + expMessages: map[string]int64{ + sdk.MsgTypeURL(&bank.MsgSend{}): 15, + sdk.MsgTypeURL(&staking.MsgDelegate{}): 2, + sdk.MsgTypeURL(&distribution.MsgWithdrawDelegatorReward{}): 10, + sdk.MsgTypeURL(&blob.MsgPayForBlobs{}): 10, + }, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + keyring, rpcAddr, grpcAddr := Setup(t) + + err := txsim.Run( + ctx, + []string{rpcAddr}, + []string{grpcAddr}, + keyring, + 9001, + time.Second, + tc.sequences..., + ) + // Expect all sequences to run for at least 30 seconds without error + require.True(t, errors.Is(err, context.DeadlineExceeded), err.Error()) + + blocks, err := testnode.ReadBlockchain(context.Background(), rpcAddr) + require.NoError(t, err) + for _, block := range blocks { + msgs, err := testnode.DecodeBlockData(block.Data) + require.NoError(t, err, block.Height) + for _, msg := range msgs { + if _, ok := tc.expMessages[sdk.MsgTypeURL(msg)]; ok { + tc.expMessages[sdk.MsgTypeURL(msg)]-- + } + } + } + for msg, count := range tc.expMessages { + if count > 0 { + t.Errorf("missing %d messages of type %s (blocks: %d)", count, msg, len(blocks)) + } + } + }) + } +} + +func Setup(t testing.TB) (keyring.Keyring, string, string) { + t.Helper() + genesis, keyring, err := testnode.DefaultGenesisState() + require.NoError(t, err) + + tmCfg := testnode.DefaultTendermintConfig() + tmCfg.RPC.ListenAddress = fmt.Sprintf("tcp://127.0.0.1:%d", testnode.GetFreePort()) + tmCfg.P2P.ListenAddress = fmt.Sprintf("tcp://127.0.0.1:%d", testnode.GetFreePort()) + tmCfg.RPC.GRPCListenAddress = fmt.Sprintf("tcp://127.0.0.1:%d", testnode.GetFreePort()) + + node, app, cctx, err := testnode.New( + t, + testnode.DefaultParams(), + tmCfg, + true, + genesis, + keyring, + "testnet", + ) + require.NoError(t, err) + + cctx, stopNode, err := testnode.StartNode(node, cctx) + require.NoError(t, err) + + appConf := testnode.DefaultAppConfig() + appConf.GRPC.Address = fmt.Sprintf("127.0.0.1:%d", testnode.GetFreePort()) + appConf.API.Address = fmt.Sprintf("tcp://127.0.0.1:%d", testnode.GetFreePort()) + + _, cleanupGRPC, err := testnode.StartGRPCServer(app, appConf, cctx) + require.NoError(t, err) + + t.Cleanup(func() { + t.Log("tearing down testnode") + require.NoError(t, stopNode()) + require.NoError(t, cleanupGRPC()) + }) + + return keyring, tmCfg.RPC.ListenAddress, appConf.GRPC.Address +} diff --git a/testing/txsim/send.go b/testing/txsim/send.go new file mode 100644 index 00000000..04c74779 --- /dev/null +++ b/testing/txsim/send.go @@ -0,0 +1,69 @@ +package txsim + +import ( + "context" + "math/rand" + + "github.com/celestiaorg/celestia-app/app" + "github.com/cosmos/cosmos-sdk/types" + bank "github.com/cosmos/cosmos-sdk/x/bank/types" + "github.com/gogo/protobuf/grpc" +) + +var _ Sequence = &SendSequence{} + +const ( + sendGasLimit = 100000 + sendFee = sendGasLimit * DefaultGasPrice +) + +// SendSequence sets up an endless sequence of send transactions, moving tokens +// between a set of accounts +type SendSequence struct { + numAccounts int + sendAmount int + maxHeightDelay int + accounts []types.AccAddress + index int + numIterations int +} + +func NewSendSequence(numAccounts, sendAmount, numIterations int) *SendSequence { + return &SendSequence{ + numAccounts: numAccounts, + sendAmount: sendAmount, + maxHeightDelay: 5, + numIterations: numIterations, + } +} + +func (s *SendSequence) Clone(n int) []Sequence { + sequenceGroup := make([]Sequence, n) + for i := 0; i < n; i++ { + sequenceGroup[i] = NewSendSequence(s.numAccounts, s.sendAmount, s.numIterations) + } + return sequenceGroup +} + +// Init sets up the accounts involved in the sequence. It calculates the necessary balance as the fees per transaction +// multiplied by the number of expected iterations plus the amount to be sent from one account to another +func (s *SendSequence) Init(_ context.Context, _ grpc.ClientConn, allocateAccounts AccountAllocator, _ *rand.Rand) { + amount := s.sendAmount + (s.numIterations * int(sendFee)) + s.accounts = allocateAccounts(s.numAccounts, amount) +} + +// Next sumbits a transaction to remove funds from one account to the next +func (s *SendSequence) Next(_ context.Context, _ grpc.ClientConn, rand *rand.Rand) (Operation, error) { + if s.index >= s.numIterations { + return Operation{}, EndOfSequence + } + op := Operation{ + Msgs: []types.Msg{ + bank.NewMsgSend(s.accounts[s.index%s.numAccounts], s.accounts[(s.index+1)%s.numAccounts], types.NewCoins(types.NewInt64Coin(app.BondDenom, int64(s.sendAmount)))), + }, + Delay: rand.Int63n(int64(s.maxHeightDelay)), + GasLimit: sendGasLimit, + } + s.index++ + return op, nil +} diff --git a/testing/txsim/sequence.go b/testing/txsim/sequence.go new file mode 100644 index 00000000..3f088fb0 --- /dev/null +++ b/testing/txsim/sequence.go @@ -0,0 +1,57 @@ +package txsim + +import ( + "context" + "errors" + "math/rand" + + blob "github.com/celestiaorg/celestia-app/x/blob/types" + "github.com/cosmos/cosmos-sdk/types" + "github.com/gogo/protobuf/grpc" +) + +// Sequence is the basic unit for programmatic transaction generation. +// It embodies a pattern of transactions which are executed among a group +// of accounts in isolation from the rest of the state machine. +type Sequence interface { + // Clone replicates n instances of the sequence for scaling up the load + // on a network. This is called before `Init` + Clone(n int) []Sequence + + // Init allows the sequence to initialize itself. It may read the current state of + // the chain and provision accounts for usage throughout the sequence. + // For any randomness, use the rand source provided. + Init(ctx context.Context, querier grpc.ClientConn, accountAllocator AccountAllocator, rand *rand.Rand) + + // Next returns the next operation in the sequence. It returns EndOfSequence + // when the sequence has been exhausted. The sequence may make use of the + // grpc connection to query the state of the network as well as the deterministic + // random number generator. Any error will abort the rest of the sequence. + Next(ctx context.Context, querier grpc.ClientConn, rand *rand.Rand) (Operation, error) +} + +// An operation represents a series of messages and blobs that are to be bundled in a +// single transaction. A delay (in heights) may also be set before the transaction is sent. +// The gas limit and price can also be set. If left at 0, the DefaultGasLimit will be used. +type Operation struct { + Msgs []types.Msg + Blobs []*blob.Blob + Delay int64 + GasLimit uint64 + GasPrice float64 +} + +const ( + // set default gas limit to cover the costs of most transactions + // At 0.001 utia per gas, this equates to 1000utia per transaction + DefaultGasLimit = 1000000 + DefaultGasPrice = 0.001 +) + +// EndOfSequence is a special error which indicates that the sequence has been terminated +// nolint: revive +var EndOfSequence = errors.New("end of sequence") + +// AccountAllocator reserves and funds a series of accounts to be used exclusively by +// the Sequence. +type AccountAllocator func(n, balance int) []types.AccAddress diff --git a/testing/txsim/stake.go b/testing/txsim/stake.go new file mode 100644 index 00000000..c2db0439 --- /dev/null +++ b/testing/txsim/stake.go @@ -0,0 +1,109 @@ +package txsim + +import ( + "context" + "math/rand" + + "github.com/celestiaorg/celestia-app/app" + "github.com/cosmos/cosmos-sdk/types" + distribution "github.com/cosmos/cosmos-sdk/x/distribution/types" + staking "github.com/cosmos/cosmos-sdk/x/staking/types" + "github.com/gogo/protobuf/grpc" +) + +var _ Sequence = &StakeSequence{} + +// StakeSequence sets up an endless sequence whereby an account delegates to a validator, continuously claims +// the reward, and occasionally redelegates to another validator at random. The account only ever delegates +// to a single validator at a time. TODO: Allow for multiple delegations +type StakeSequence struct { + initialStake int + redelegatePropability int + delegatedTo string + account types.AccAddress +} + +func NewStakeSequence(initialStake int) *StakeSequence { + return &StakeSequence{ + initialStake: initialStake, + redelegatePropability: 10, // 1 in every 10 + } +} + +func (s *StakeSequence) Clone(n int) []Sequence { + sequenceGroup := make([]Sequence, n) + for i := 0; i < n; i++ { + sequenceGroup[i] = NewStakeSequence(s.initialStake) + } + return sequenceGroup +} + +func (s *StakeSequence) Init(_ context.Context, _ grpc.ClientConn, allocateAccounts AccountAllocator, _ *rand.Rand) { + s.account = allocateAccounts(1, s.initialStake+fundsForGas)[0] +} + +func (s *StakeSequence) Next(ctx context.Context, querier grpc.ClientConn, rand *rand.Rand) (Operation, error) { + var op Operation + + // for the first operation, the account delegates to a validator + if s.delegatedTo == "" { + val, err := getRandomValidator(ctx, querier, rand) + if err != nil { + return Operation{}, err + } + s.delegatedTo = val.OperatorAddress + return Operation{ + Msgs: []types.Msg{ + &staking.MsgDelegate{ + DelegatorAddress: s.account.String(), + ValidatorAddress: s.delegatedTo, + Amount: types.NewInt64Coin(app.BondDenom, int64(s.initialStake)), + }, + }, + }, nil + } + + // occasionally redelegate the initial stake to another validator at random + if rand.Intn(s.redelegatePropability) == 0 { + val, err := getRandomValidator(ctx, querier, rand) + if err != nil { + return Operation{}, err + } + if val.OperatorAddress != s.delegatedTo { + op = Operation{ + Msgs: []types.Msg{ + &staking.MsgBeginRedelegate{ + DelegatorAddress: s.account.String(), + ValidatorSrcAddress: s.delegatedTo, + ValidatorDstAddress: val.OperatorAddress, + // NOTE: only the initial stake is redelgated (not the entire balance) + Amount: types.NewInt64Coin(app.BondDenom, int64(s.initialStake)), + }, + }, + } + s.delegatedTo = val.OperatorAddress + return op, nil + } + } + + // claim pending rewards + op = Operation{ + Msgs: []types.Msg{ + &distribution.MsgWithdrawDelegatorReward{ + DelegatorAddress: s.account.String(), + ValidatorAddress: s.delegatedTo, + }, + }, + Delay: rand.Int63n(20), + } + + return op, nil +} + +func getRandomValidator(ctx context.Context, conn grpc.ClientConn, rand *rand.Rand) (staking.Validator, error) { + resp, err := staking.NewQueryClient(conn).Validators(ctx, &staking.QueryValidatorsRequest{}) + if err != nil { + return staking.Validator{}, err + } + return resp.Validators[rand.Intn(len(resp.Validators))], nil +} diff --git a/testutil/testfactory/utils.go b/testutil/testfactory/utils.go index 8c9b7ba1..160759af 100644 --- a/testutil/testfactory/utils.go +++ b/testutil/testfactory/utils.go @@ -91,7 +91,7 @@ func FundKeyringAccounts(cdc codec.Codec, accounts ...string) (keyring.Keyring, ) genBalances[i] = banktypes.Balance{Address: addr.String(), Coins: balances.Sort()} - genAccounts[i] = authtypes.NewBaseAccount(addr, nil, 0, 0) + genAccounts[i] = authtypes.NewBaseAccount(addr, nil, uint64(i), 0) } return kr, genBalances, genAccounts } diff --git a/testutil/testnode/full_node.go b/testutil/testnode/full_node.go index efe6880c..3886b63f 100644 --- a/testutil/testnode/full_node.go +++ b/testutil/testnode/full_node.go @@ -41,7 +41,7 @@ import ( // NOTE: the forced delay between blocks, TimeIotaMs in the consensus // parameters, is set to the lowest possible value (1ms). func New( - t *testing.T, + t testing.TB, cparams *tmproto.ConsensusParams, tmCfg *config.Config, supressLog bool, @@ -190,9 +190,9 @@ func DefaultNetwork(t *testing.T, blockTime time.Duration) (accounts []string, c tmCfg := DefaultTendermintConfig() tmCfg.Consensus.TimeoutCommit = blockTime - tmCfg.RPC.ListenAddress = fmt.Sprintf("tcp://127.0.0.1:%d", getFreePort()) - tmCfg.P2P.ListenAddress = fmt.Sprintf("tcp://127.0.0.1:%d", getFreePort()) - tmCfg.RPC.GRPCListenAddress = fmt.Sprintf("tcp://127.0.0.1:%d", getFreePort()) + tmCfg.RPC.ListenAddress = fmt.Sprintf("tcp://127.0.0.1:%d", GetFreePort()) + tmCfg.P2P.ListenAddress = fmt.Sprintf("tcp://127.0.0.1:%d", GetFreePort()) + tmCfg.RPC.GRPCListenAddress = fmt.Sprintf("tcp://127.0.0.1:%d", GetFreePort()) genState, kr, err := DefaultGenesisState(accounts...) require.NoError(t, err) @@ -204,8 +204,8 @@ func DefaultNetwork(t *testing.T, blockTime time.Duration) (accounts []string, c require.NoError(t, err) appConf := DefaultAppConfig() - appConf.GRPC.Address = fmt.Sprintf("127.0.0.1:%d", getFreePort()) - appConf.API.Address = fmt.Sprintf("tcp://127.0.0.1:%d", getFreePort()) + appConf.GRPC.Address = fmt.Sprintf("127.0.0.1:%d", GetFreePort()) + appConf.API.Address = fmt.Sprintf("tcp://127.0.0.1:%d", GetFreePort()) cctx, cleanupGRPC, err := StartGRPCServer(app, appConf, cctx) require.NoError(t, err) @@ -219,7 +219,7 @@ func DefaultNetwork(t *testing.T, blockTime time.Duration) (accounts []string, c return accounts, cctx } -func getFreePort() int { +func GetFreePort() int { a, err := net.ResolveTCPAddr("tcp", "localhost:0") if err == nil { var l *net.TCPListener diff --git a/testutil/testnode/full_node_test.go b/testutil/testnode/full_node_test.go index ede2b270..40f49801 100644 --- a/testutil/testnode/full_node_test.go +++ b/testutil/testnode/full_node_test.go @@ -67,7 +67,7 @@ func TestIntegrationTestSuite(t *testing.T) { func (s *IntegrationTestSuite) Test_FillBlock() { require := s.Require() - for squareSize := 2; squareSize < appconsts.DefaultMaxSquareSize; squareSize *= 2 { + for squareSize := 2; squareSize <= appconsts.DefaultMaxSquareSize; squareSize *= 2 { resp, err := s.cctx.FillBlock(squareSize, s.accounts, flags.BroadcastAsync) require.NoError(err) diff --git a/testutil/testnode/node_init.go b/testutil/testnode/node_init.go index f79197da..e28813b4 100644 --- a/testutil/testnode/node_init.go +++ b/testutil/testnode/node_init.go @@ -188,7 +188,7 @@ func writeFile(name string, dir string, contents []byte) error { return nil } -func initFileStructure(t *testing.T, tmCfg *config.Config) (string, error) { +func initFileStructure(t testing.TB, tmCfg *config.Config) (string, error) { basePath := filepath.Join(t.TempDir(), ".celestia-app") tmCfg.SetRoot(basePath) configPath := filepath.Join(basePath, "config") diff --git a/testutil/testnode/node_interaction_api.go b/testutil/testnode/node_interaction_api.go index 2a451faf..660e041c 100644 --- a/testutil/testnode/node_interaction_api.go +++ b/testutil/testnode/node_interaction_api.go @@ -177,7 +177,7 @@ func (c *Context) FillBlock(squareSize int, accounts []string, broadcastMode str // in order to get the square size that we want, we need to fill half the // square minus a few for the tx (see the square estimation logic in // app/estimate_square_size.go) - shareCount := (squareSize * squareSize / 2) - 1 + shareCount := (squareSize * squareSize / 2) - 2 // we use a formula to guarantee that the tx is the exact size needed to force a specific square size. blobSize := shareCount * appconsts.ContinuationSparseShareContentSize // this last patch allows for the formula above to work on a square size of diff --git a/testutil/testnode/read.go b/testutil/testnode/read.go new file mode 100644 index 00000000..c4d87334 --- /dev/null +++ b/testutil/testnode/read.go @@ -0,0 +1,133 @@ +package testnode + +import ( + "context" + "fmt" + + "github.com/celestiaorg/celestia-app/app" + "github.com/celestiaorg/celestia-app/app/encoding" + sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/tendermint/tendermint/rpc/client/http" + "github.com/tendermint/tendermint/types" +) + +func ReadRecentBlocks(ctx context.Context, rpcAddress string, blocks int64) ([]*types.Block, error) { + client, err := http.New(rpcAddress, "/websocket") + if err != nil { + return nil, err + } + status, err := client.Status(ctx) + if err != nil { + return nil, err + } + if status.SyncInfo.LatestBlockHeight < blocks { + return nil, fmt.Errorf("latest block height %d is less than requested blocks %d", status.SyncInfo.LatestBlockHeight, blocks) + } + return ReadBlockHeights(ctx, rpcAddress, status.SyncInfo.LatestBlockHeight-blocks+1, status.SyncInfo.LatestBlockHeight) +} + +func ReadBlockchain(ctx context.Context, rpcAddress string) ([]*types.Block, error) { + client, err := http.New(rpcAddress, "/websocket") + if err != nil { + return nil, err + } + status, err := client.Status(ctx) + if err != nil { + return nil, err + } + return ReadBlockHeights(ctx, rpcAddress, 1, status.SyncInfo.LatestBlockHeight) +} + +func ReadBlockHeights(ctx context.Context, rpcAddress string, fromHeight, toHeight int64) ([]*types.Block, error) { + client, err := http.New(rpcAddress, "/websocket") + if err != nil { + return nil, err + } + blocks := make([]*types.Block, toHeight-fromHeight+1) + for i := fromHeight; i <= toHeight; i++ { + resp, err := client.Block(ctx, &i) + if err != nil { + return nil, err + } + blocks[i-fromHeight] = resp.Block + } + return blocks, nil +} + +func DecodeBlockData(data types.Data) ([]sdk.Msg, error) { + encCfg := encoding.MakeConfig(app.ModuleEncodingRegisters...) + decoder := encoding.IndexWrapperDecoder(encCfg.TxConfig.TxDecoder()) + msgs := make([]sdk.Msg, 0) + for _, txBytes := range data.Txs { + tx, err := decoder(txBytes) + if err != nil { + return nil, fmt.Errorf("decoding tx: %s: %w", string(txBytes), err) + } + msgs = append(msgs, tx.GetMsgs()...) + } + return msgs, nil +} + +func CalculateMeanGasFromRecentBlocks(ctx context.Context, rpcAddress, msgType string, blocks int64) (float64, int64, error) { + client, err := http.New(rpcAddress, "/websocket") + if err != nil { + return 0.0, 0, err + } + status, err := client.Status(ctx) + if err != nil { + return 0.0, 0, err + } + if status.SyncInfo.LatestBlockHeight <= blocks { + return 0.0, 0, fmt.Errorf("latest block height %d is less than %d", status.SyncInfo.LatestBlockHeight, blocks) + } + return CalculateMeanGas(ctx, rpcAddress, msgType, status.SyncInfo.LatestBlockHeight-blocks+1, status.SyncInfo.LatestBlockHeight) +} + +func CalculateMeanGas(ctx context.Context, rpcAddress, msgType string, fromHeight int64, toHeight int64) (float64, int64, error) { + var ( + encCfg = encoding.MakeConfig(app.ModuleEncodingRegisters...) + decoder = encoding.IndexWrapperDecoder(encCfg.TxConfig.TxDecoder()) + totalGas int64 + count int64 + average = func() float64 { + if count == 0 { + return 0 + } + return float64(totalGas) / float64(count) + } + ) + client, err := http.New(rpcAddress, "/websocket") + if err != nil { + return 0.0, 0, err + } + + for height := fromHeight; height <= toHeight; height++ { + resp, err := client.Block(ctx, &height) + if err != nil { + return average(), count, err + } + indices := make([]int, 0, len(resp.Block.Data.Txs)) + for i, rawTx := range resp.Block.Data.Txs { + tx, err := decoder(rawTx) + if err != nil { + return average(), count, fmt.Errorf("decoding tx (height: %d): %w", height, err) + } + msgs := tx.GetMsgs() + // multi message transactions are not included + if len(msgs) == 1 && sdk.MsgTypeURL(msgs[0]) == msgType { + indices = append(indices, i) + } + } + if len(indices) > 0 { + results, err := client.BlockResults(ctx, &height) + if err != nil { + return average(), count, fmt.Errorf("getting block results (height %d): %w", height, err) + } + for _, i := range indices { + totalGas += results.TxsResults[i].GasUsed + count++ + } + } + } + return average(), count, nil +}