Skip to content

Commit

Permalink
fix: tx client concurrency test (#4104)
Browse files Browse the repository at this point in the history
This PR makes two small tweaks:
- Fixes `TestConcurrentTxSubmission` by adding a capacity of 1 to the
errCh. Currently errors were being ignored because the wait group meant
that there wasn't a process to read to the channel as it was being
written to. This fixes this
- Catches the case where a user cancels the context when calling
`ConfirmTx`

**This test is broken until
celestiaorg/celestia-core#1553 is resolved**

---------

Co-authored-by: nina / ნინა <[email protected]>
(cherry picked from commit f21716b)
  • Loading branch information
cmwaters authored and mergify[bot] committed Jan 17, 2025
1 parent 6b92156 commit 68a235b
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 40 deletions.
88 changes: 48 additions & 40 deletions pkg/user/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package user_test
import (
"context"
"errors"
"fmt"
"sync"
"testing"
"time"
Expand All @@ -13,6 +14,7 @@ import (
"github.com/celestiaorg/celestia-app/v3/test/util/testnode"
"github.com/celestiaorg/go-square/v2/share"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/config"
tmrand "github.com/tendermint/tendermint/libs/rand"
)

Expand All @@ -21,50 +23,56 @@ func TestConcurrentTxSubmission(t *testing.T) {
t.Skip("skipping in short mode")
}

// Setup network
tmConfig := testnode.DefaultTendermintConfig()
tmConfig.Consensus.TimeoutCommit = 10 * time.Second
ctx, _, _ := testnode.NewNetwork(t, testnode.DefaultConfig().WithTendermintConfig(tmConfig))
_, err := ctx.WaitForHeight(1)
require.NoError(t, err)
// Iterate over all mempool versions
mempools := []string{config.MempoolV0, config.MempoolV1, config.MempoolV2}
for _, mempool := range mempools {
t.Run(fmt.Sprintf("mempool %s", mempool), func(t *testing.T) {
// Setup network
tmConfig := testnode.DefaultTendermintConfig()
tmConfig.Mempool.Version = mempool
tmConfig.Consensus.TimeoutCommit = 10 * time.Second
ctx, _, _ := testnode.NewNetwork(t, testnode.DefaultConfig().WithTendermintConfig(tmConfig))
_, err := ctx.WaitForHeight(1)
require.NoError(t, err)

// Setup signer
txClient, err := testnode.NewTxClientFromContext(ctx)
require.NoError(t, err)
// Setup signer
txClient, err := testnode.NewTxClientFromContext(ctx)
require.NoError(t, err)

// Pregenerate all the blobs
numTxs := 10
blobs := blobfactory.ManyRandBlobs(tmrand.NewRand(), blobfactory.Repeat(2048, numTxs)...)
// Pregenerate all the blobs
numTxs := 100
blobs := blobfactory.ManyRandBlobs(tmrand.NewRand(), blobfactory.Repeat(2048, numTxs)...)

// Prepare transactions
var (
wg sync.WaitGroup
errCh = make(chan error)
)
// Prepare transactions
var (
wg sync.WaitGroup
errCh = make(chan error, 1)
)

subCtx, cancel := context.WithCancel(ctx.GoContext())
defer cancel()
time.AfterFunc(time.Minute, cancel)
for i := 0; i < numTxs; i++ {
wg.Add(1)
go func(b *share.Blob) {
defer wg.Done()
_, err := txClient.SubmitPayForBlob(subCtx, []*share.Blob{b}, user.SetGasLimitAndGasPrice(500_000, appconsts.DefaultMinGasPrice))
if err != nil && !errors.Is(err, context.Canceled) {
// only catch the first error
select {
case errCh <- err:
cancel()
default:
}
subCtx, cancel := context.WithCancel(ctx.GoContext())
defer cancel()
time.AfterFunc(time.Minute, cancel)
for i := 0; i < numTxs; i++ {
wg.Add(1)
go func(b *share.Blob) {
defer wg.Done()
_, err := txClient.SubmitPayForBlob(subCtx, []*share.Blob{b}, user.SetGasLimitAndGasPrice(500_000, appconsts.DefaultMinGasPrice))
if err != nil && !errors.Is(err, context.Canceled) {
// only catch the first error
select {
case errCh <- err:
cancel()
default:
}
}
}(blobs[i])
}
}(blobs[i])
}
wg.Wait()

select {
case err := <-errCh:
require.NoError(t, err)
default:
wg.Wait()
select {
case err := <-errCh:
require.NoError(t, err)
default:
}
})
}
}
3 changes: 3 additions & 0 deletions pkg/user/tx_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,9 @@ func (client *TxClient) ConfirmTx(ctx context.Context, txHash string) (*TxRespon
return nil, client.handleEvictions(txHash)
default:
client.deleteFromTxTracker(txHash)
if ctx.Err() != nil {
return nil, ctx.Err()
}
return nil, fmt.Errorf("transaction with hash %s not found; it was likely rejected", txHash)
}
}
Expand Down

0 comments on commit 68a235b

Please sign in to comment.