Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions data/transactions/verify/txnBatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/algorand/go-algorand/data/transactions"
"github.com/algorand/go-algorand/data/transactions/logic"
"github.com/algorand/go-algorand/ledger/ledgercore"
"github.com/algorand/go-algorand/util/execpool"
)

// UnverifiedTxnSigJob is the sig verification job passed to the Stream verifier
Expand Down Expand Up @@ -113,7 +114,7 @@ type LedgerForStreamVerifier interface {
BlockHdr(rnd basics.Round) (blk bookkeeping.BlockHeader, err error)
}

func (tbp *txnSigBatchProcessor) Cleanup(pending []InputJob, err error) {
func (tbp *txnSigBatchProcessor) Cleanup(pending []execpool.InputJob, err error) {
// report an error for the unchecked txns
// drop the messages without reporting if the receiver does not consume
for i := range pending {
Expand All @@ -122,7 +123,7 @@ func (tbp *txnSigBatchProcessor) Cleanup(pending []InputJob, err error) {
}
}

func (tbp txnSigBatchProcessor) GetErredUnprocessed(ue InputJob, err error) {
func (tbp txnSigBatchProcessor) GetErredUnprocessed(ue execpool.InputJob, err error) {
uelt := ue.(*UnverifiedTxnSigJob)
tbp.sendResult(uelt.TxnGroup, uelt.BacklogMessage, err)
}
Expand All @@ -143,7 +144,7 @@ func (tbp txnSigBatchProcessor) sendResult(veTxnGroup []transactions.SignedTxn,

// MakeSigVerifyJobProcessor returns the object implementing the stream verifier Helper interface
func MakeSigVerifyJobProcessor(ledger LedgerForStreamVerifier, cache VerifiedTransactionCache,
resultChan chan<- *VerificationResult, droppedChan chan<- *UnverifiedTxnSigJob) (svp BatchProcessor, err error) {
resultChan chan<- *VerificationResult, droppedChan chan<- *UnverifiedTxnSigJob) (svp execpool.BatchProcessor, err error) {
latest := ledger.Latest()
latestHdr, err := ledger.BlockHdr(latest)
if err != nil {
Expand All @@ -162,14 +163,14 @@ func MakeSigVerifyJobProcessor(ledger LedgerForStreamVerifier, cache VerifiedTra
}, nil
}

func (tbp *txnSigBatchProcessor) ProcessBatch(txns []InputJob) {
func (tbp *txnSigBatchProcessor) ProcessBatch(txns []execpool.InputJob) {
batchVerifier, ctx := tbp.preProcessUnverifiedTxns(txns)
failed, err := batchVerifier.VerifyWithFeedback()
// this error can only be crypto.ErrBatchHasFailedSigs
tbp.postProcessVerifiedJobs(ctx, failed, err)
}

func (tbp *txnSigBatchProcessor) preProcessUnverifiedTxns(uTxns []InputJob) (batchVerifier *crypto.BatchVerifier, ctx interface{}) {
func (tbp *txnSigBatchProcessor) preProcessUnverifiedTxns(uTxns []execpool.InputJob) (batchVerifier *crypto.BatchVerifier, ctx interface{}) {
batchVerifier = crypto.MakeBatchVerifier()
bl := makeBatchLoad(len(uTxns))
// TODO: separate operations here, and get the sig verification inside the LogicSig to the batch here
Expand Down
48 changes: 24 additions & 24 deletions data/transactions/verify/txnBatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ import (
var droppedFromPool = metrics.MakeCounter(metrics.MetricName{Name: "test_streamVerifierTestCore_messages_dropped_pool", Description: "Test streamVerifierTestCore messages dropped from pool"})

func streamVerifierTestCore(txnGroups [][]transactions.SignedTxn, badTxnGroups map[uint64]struct{},
expectedError error, t *testing.T) (sv *StreamToBatch) {
expectedError error, t *testing.T) (sv *execpool.StreamToBatch) {

numOfTxnGroups := len(txnGroups)
verificationPool := execpool.MakeBacklog(nil, 0, execpool.LowPriority, t)
Expand All @@ -57,12 +57,12 @@ func streamVerifierTestCore(txnGroups [][]transactions.SignedTxn, badTxnGroups m

defer cancel()

inputChan := make(chan InputJob)
inputChan := make(chan execpool.InputJob)
resultChan := make(chan *VerificationResult, txBacklogSize)
droppedChan := make(chan *UnverifiedTxnSigJob)
ep, err := MakeSigVerifyJobProcessor(&DummyLedgerForSignature{}, cache, resultChan, droppedChan)
require.NoError(t, err)
sv = MakeStreamToBatch(inputChan, verificationPool, ep)
sv = execpool.MakeStreamToBatch(inputChan, verificationPool, ep)
sv.Start(ctx)

wg := sync.WaitGroup{}
Expand Down Expand Up @@ -408,12 +408,12 @@ func TestStreamToBatchPoolShutdown(t *testing.T) { //nolint:paralleltest // Not
ctx, cancel := context.WithCancel(context.Background())
cache := MakeVerifiedTransactionCache(50000)

inputChan := make(chan InputJob)
inputChan := make(chan execpool.InputJob)
resultChan := make(chan *VerificationResult, txBacklogSize)
droppedChan := make(chan *UnverifiedTxnSigJob)
ep, err := MakeSigVerifyJobProcessor(&DummyLedgerForSignature{}, cache, resultChan, droppedChan)
require.NoError(t, err)
sv := MakeStreamToBatch(inputChan, verificationPool, ep)
sv := execpool.MakeStreamToBatch(inputChan, verificationPool, ep)
sv.Start(ctx)

errChan := make(chan error)
Expand Down Expand Up @@ -446,7 +446,7 @@ func TestStreamToBatchPoolShutdown(t *testing.T) { //nolint:paralleltest // Not
}
}()
for err := range errChan {
require.ErrorIs(t, err, ErrShuttingDownError)
require.ErrorIs(t, err, execpool.ErrShuttingDownError)
}
require.Contains(t, logBuffer.String(), "addBatchToThePoolNow: EnqueueBacklog returned an error and StreamToBatch will stop: context canceled")
wg.Wait()
Expand All @@ -468,14 +468,14 @@ func TestStreamToBatchRestart(t *testing.T) {

cache := MakeVerifiedTransactionCache(50)

inputChan := make(chan InputJob)
inputChan := make(chan execpool.InputJob)
resultChan := make(chan *VerificationResult, txBacklogSize)
droppedChan := make(chan *UnverifiedTxnSigJob)

ctx, cancel := context.WithCancel(context.Background())
ep, err := MakeSigVerifyJobProcessor(&DummyLedgerForSignature{}, cache, resultChan, droppedChan)
require.NoError(t, err)
sv := MakeStreamToBatch(inputChan, verificationPool, ep)
sv := execpool.MakeStreamToBatch(inputChan, verificationPool, ep)
sv.Start(ctx)

errChan := make(chan error)
Expand Down Expand Up @@ -509,7 +509,7 @@ func TestStreamToBatchRestart(t *testing.T) {
cancel()
}()
for err := range errChan {
require.ErrorIs(t, err, ErrShuttingDownError)
require.ErrorIs(t, err, execpool.ErrShuttingDownError)
}
wg.Wait()
sv.WaitForStop()
Expand Down Expand Up @@ -588,12 +588,12 @@ func TestStreamToBatchCtxCancel(t *testing.T) {
defer verificationPool.Shutdown()
ctx, cancel := context.WithCancel(context.Background())
cache := MakeVerifiedTransactionCache(50)
inputChan := make(chan InputJob)
inputChan := make(chan execpool.InputJob)
resultChan := make(chan *VerificationResult, txBacklogSize)
droppedChan := make(chan *UnverifiedTxnSigJob)
ep, err := MakeSigVerifyJobProcessor(&DummyLedgerForSignature{}, cache, resultChan, droppedChan)
require.NoError(t, err)
sv := MakeStreamToBatch(inputChan, verificationPool, ep)
sv := execpool.MakeStreamToBatch(inputChan, verificationPool, ep)
sv.Start(ctx)

var result *VerificationResult
Expand All @@ -620,7 +620,7 @@ func TestStreamToBatchCtxCancel(t *testing.T) {
close(holdTasks)

wg.Wait()
require.ErrorIs(t, result.Err, ErrShuttingDownError)
require.ErrorIs(t, result.Err, execpool.ErrShuttingDownError)
}

// TestStreamToBatchCtxCancelPoolQueue tests the termination when the ctx is canceled
Expand All @@ -643,12 +643,12 @@ func TestStreamToBatchCtxCancelPoolQueue(t *testing.T) { //nolint:paralleltest /

ctx, cancel := context.WithCancel(context.Background())
cache := MakeVerifiedTransactionCache(50)
inputChan := make(chan InputJob)
inputChan := make(chan execpool.InputJob)
resultChan := make(chan *VerificationResult, txBacklogSize)
droppedChan := make(chan *UnverifiedTxnSigJob)
ep, err := MakeSigVerifyJobProcessor(&DummyLedgerForSignature{}, cache, resultChan, droppedChan)
require.NoError(t, err)
sv := MakeStreamToBatch(inputChan, verificationPool, ep)
sv := execpool.MakeStreamToBatch(inputChan, verificationPool, ep)
sv.Start(ctx)

var result *VerificationResult
Expand All @@ -659,7 +659,7 @@ func TestStreamToBatchCtxCancelPoolQueue(t *testing.T) { //nolint:paralleltest /
for {
result = <-resultChan
// at least one ErrShuttingDownError is expected
if result.Err != ErrShuttingDownError {
if result.Err != execpool.ErrShuttingDownError {
continue
}
break
Expand Down Expand Up @@ -690,7 +690,7 @@ func TestStreamToBatchCtxCancelPoolQueue(t *testing.T) { //nolint:paralleltest /
// cancel the ctx as the sig is not yet sent to the exec pool
// the test might sporadically fail if between sending the txn above
// and the cancelation, 2 x waitForNextTxnDuration elapses (10ms)
time.Sleep(6 * waitForNextJobDuration)
time.Sleep(12)
go func() {
// wait a bit before releasing the tasks, so that the verificationPool ctx first gets canceled
time.Sleep(20 * time.Millisecond)
Expand All @@ -703,7 +703,7 @@ func TestStreamToBatchCtxCancelPoolQueue(t *testing.T) { //nolint:paralleltest /
cancel()

wg.Wait()
require.ErrorIs(t, result.Err, ErrShuttingDownError)
require.ErrorIs(t, result.Err, execpool.ErrShuttingDownError)
require.Contains(t, logBuffer.String(), "addBatchToThePoolNow: EnqueueBacklog returned an error and StreamToBatch will stop: context canceled")
}

Expand All @@ -725,12 +725,12 @@ func TestStreamToBatchPostVBlocked(t *testing.T) {

txBacklogSizeMod := txBacklogSize / 20

inputChan := make(chan InputJob)
inputChan := make(chan execpool.InputJob)
resultChan := make(chan *VerificationResult, txBacklogSizeMod)
droppedChan := make(chan *UnverifiedTxnSigJob)
ep, err := MakeSigVerifyJobProcessor(&DummyLedgerForSignature{}, cache, resultChan, droppedChan)
require.NoError(t, err)
sv := MakeStreamToBatch(inputChan, verificationPool, ep)
sv := execpool.MakeStreamToBatch(inputChan, verificationPool, ep)

defer close(droppedChan)
go func() {
Expand Down Expand Up @@ -768,7 +768,7 @@ func TestStreamToBatchPostVBlocked(t *testing.T) {
go processResults(ctx, errChan, resultChan, numOfTxnGroups-overflow, badTxnGroups, &badSigResultCounter, &goodSigResultCounter, &wg)

for err := range errChan {
require.ErrorIs(t, err, ErrShuttingDownError)
require.ErrorIs(t, err, execpool.ErrShuttingDownError)
fmt.Println(badTxnGroups)
}

Expand All @@ -789,7 +789,7 @@ func TestStreamToBatchPostVBlocked(t *testing.T) {
}

for err := range errChan {
require.ErrorIs(t, err, ErrShuttingDownError)
require.ErrorIs(t, err, execpool.ErrShuttingDownError)
fmt.Println(badTxnGroups)
}

Expand Down Expand Up @@ -818,13 +818,13 @@ func TestStreamToBatchCancelWhenPooled(t *testing.T) {

cache := MakeVerifiedTransactionCache(50)

inputChan := make(chan InputJob)
inputChan := make(chan execpool.InputJob)
resultChan := make(chan *VerificationResult, txBacklogSize)
droppedChan := make(chan *UnverifiedTxnSigJob)
ctx, cancel := context.WithCancel(context.Background())
ep, err := MakeSigVerifyJobProcessor(&DummyLedgerForSignature{}, cache, resultChan, droppedChan)
require.NoError(t, err)
sv := MakeStreamToBatch(inputChan, verificationPool, ep)
sv := execpool.MakeStreamToBatch(inputChan, verificationPool, ep)
sv.Start(ctx)

errChan := make(chan error)
Expand All @@ -849,7 +849,7 @@ func TestStreamToBatchCancelWhenPooled(t *testing.T) {
cancel()
}()
for err := range errChan {
require.ErrorIs(t, err, ErrShuttingDownError)
require.ErrorIs(t, err, execpool.ErrShuttingDownError)
}
wg.Wait()
sv.WaitForStop()
Expand Down
8 changes: 4 additions & 4 deletions data/txHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,8 @@ type TxHandler struct {
cacheConfig txHandlerConfig
ctx context.Context
ctxCancel context.CancelFunc
streamVerifier *verify.StreamToBatch
streamVerifierChan chan verify.InputJob
streamVerifier *execpool.StreamToBatch
streamVerifierChan chan execpool.InputJob
streamVerifierDropped chan *verify.UnverifiedTxnSigJob
erl *util.ElasticRateLimiter
}
Expand Down Expand Up @@ -177,7 +177,7 @@ func MakeTxHandler(opts TxHandlerOpts) (*TxHandler, error) {
msgCache: makeSaltedCache(2 * txBacklogSize),
txCanonicalCache: makeDigestCache(2 * txBacklogSize),
cacheConfig: txHandlerConfig{opts.Config.TxFilterRawMsgEnabled(), opts.Config.TxFilterCanonicalEnabled()},
streamVerifierChan: make(chan verify.InputJob),
streamVerifierChan: make(chan execpool.InputJob),
streamVerifierDropped: make(chan *verify.UnverifiedTxnSigJob),
}

Expand All @@ -198,7 +198,7 @@ func MakeTxHandler(opts TxHandlerOpts) (*TxHandler, error) {
if err != nil {
return nil, err
}
handler.streamVerifier = verify.MakeStreamToBatch(handler.streamVerifierChan, handler.txVerificationPool, txnElementProcessor)
handler.streamVerifier = execpool.MakeStreamToBatch(handler.streamVerifierChan, handler.txVerificationPool, txnElementProcessor)
go handler.droppedTxnWatcher()
return handler, nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.

package verify
package execpool

import (
"context"
Expand All @@ -23,7 +23,6 @@ import (
"time"

"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/util/execpool"
)

// ErrShuttingDownError is the error returned when a job is not processed because the service is shutting down
Expand All @@ -38,6 +37,8 @@ var ErrShuttingDownError = errors.New("not processed, execpool service is shutti
// for processing before waitForNextJobDuration.
const waitForNextJobDuration = 2 * time.Millisecond

const txnPerWorksetThreshold = 32
Comment thread
algorandskiy marked this conversation as resolved.

// batchSizeBlockLimit is the limit when the batch exceeds, will be added to the exec pool, even if the pool is saturated
// and the stream will be blocked until the exec pool accepts the batch
const batchSizeBlockLimit = 1024
Expand All @@ -61,14 +62,14 @@ type BatchProcessor interface {
// StreamToBatch makes batches from incoming stream of jobs, and submits the batches to the exec pool
type StreamToBatch struct {
inputChan <-chan InputJob
executionPool execpool.BacklogPool
executionPool BacklogPool
ctx context.Context
activeLoopWg sync.WaitGroup
batchProcessor BatchProcessor
}

// MakeStreamToBatch creates a new stream to batch converter
func MakeStreamToBatch(inputChan <-chan InputJob, execPool execpool.BacklogPool,
func MakeStreamToBatch(inputChan <-chan InputJob, execPool BacklogPool,
batchProcessor BatchProcessor) *StreamToBatch {

return &StreamToBatch{
Expand Down
Loading