diff --git a/op-challenger/sender/sender_test.go b/op-challenger/sender/sender_test.go index 5169ae67192ad..1d4e17047b74a 100644 --- a/op-challenger/sender/sender_test.go +++ b/op-challenger/sender/sender_test.go @@ -129,8 +129,14 @@ func (s *stubTxMgr) Send(ctx context.Context, candidate txmgr.TxCandidate) (*typ return <-ch, nil } +// SendAsync simply wraps Send to make it non blocking. It does not guarantee transaction nonce ordering, +// unlike the production txMgr. func (s *stubTxMgr) SendAsync(ctx context.Context, candidate txmgr.TxCandidate, ch chan txmgr.SendResponse) { - panic("unimplemented") + go func() { + receipt, err := s.Send(ctx, candidate) + resp := txmgr.SendResponse{Receipt: receipt, Err: err} + ch <- resp + }() } func (s *stubTxMgr) recordTx(candidate txmgr.TxCandidate) chan *types.Receipt { diff --git a/op-service/txmgr/queue.go b/op-service/txmgr/queue.go index ee7a03ffa9288..c48687d459ece 100644 --- a/op-service/txmgr/queue.go +++ b/op-service/txmgr/queue.go @@ -51,17 +51,36 @@ func (q *Queue[T]) Wait() error { return q.group.Wait() } +// handleResponse will wait for the response on the first passed channel, +// and then forward it on the second passed channel (attaching the id). It returns +// the response error or the context error if the context is canceled. +func handleResponse[T any](ctx context.Context, c chan SendResponse, d chan TxReceipt[T], id T) error { + select { + case response := <-c: + d <- TxReceipt[T]{ID: id, Receipt: response.Receipt, Err: response.Err} + return response.Err + case <-ctx.Done(): + d <- TxReceipt[T]{ID: id, Err: ctx.Err()} + return ctx.Err() + } +} + // Send will wait until the number of pending txs is below the max pending, -// and then send the next tx. +// and then send the next tx asynchronously. The nonce of the transaction is +// determined synchronously, so transactions should be confirmed on chain in +// the order they are sent using this method. // // The actual tx sending is non-blocking, with the receipt returned on the // provided receipt channel. If the channel is unbuffered, the goroutine is // blocked from completing until the channel is read from. func (q *Queue[T]) Send(id T, candidate TxCandidate, receiptCh chan TxReceipt[T]) { group, ctx := q.groupContext() - group.Go(func() error { - return q.sendTx(ctx, id, candidate, receiptCh) - }) + responseChan := make(chan SendResponse, 1) + handleResponse := func() error { + return handleResponse(ctx, responseChan, receiptCh, id) + } + group.Go(handleResponse) // This blocks until the number of handlers is below the limit + q.txMgr.SendAsync(ctx, candidate, responseChan) // Nonce management handled synchronously, i.e. before this returns } // TrySend sends the next tx, but only if the number of pending txs is below the @@ -75,19 +94,17 @@ func (q *Queue[T]) Send(id T, candidate TxCandidate, receiptCh chan TxReceipt[T] // blocked from completing until the channel is read from. func (q *Queue[T]) TrySend(id T, candidate TxCandidate, receiptCh chan TxReceipt[T]) bool { group, ctx := q.groupContext() - return group.TryGo(func() error { - return q.sendTx(ctx, id, candidate, receiptCh) - }) -} - -func (q *Queue[T]) sendTx(ctx context.Context, id T, candidate TxCandidate, receiptCh chan TxReceipt[T]) error { - receipt, err := q.txMgr.Send(ctx, candidate) - receiptCh <- TxReceipt[T]{ - ID: id, - Receipt: receipt, - Err: err, + responseChan := make(chan SendResponse, 1) + handleResponse := func() error { + return handleResponse(ctx, responseChan, receiptCh, id) + } + ok := group.TryGo(handleResponse) + if !ok { + return false + } else { + q.txMgr.SendAsync(ctx, candidate, responseChan) + return true } - return err } // groupContext returns a Group and a Context to use when sending a tx.