Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion op-challenger/sender/sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,14 @@ func (s *stubTxMgr) Send(ctx context.Context, candidate txmgr.TxCandidate) (*typ
return <-ch, nil
}

// SendAsync simply wraps Send to make it non blocking. It does not guarantee transaction nonce ordering,
// unlike the production txMgr.
func (s *stubTxMgr) SendAsync(ctx context.Context, candidate txmgr.TxCandidate, ch chan txmgr.SendResponse) {
panic("unimplemented")
go func() {
receipt, err := s.Send(ctx, candidate)
resp := txmgr.SendResponse{Receipt: receipt, Err: err}
ch <- resp
}()
}

func (s *stubTxMgr) recordTx(candidate txmgr.TxCandidate) chan *types.Receipt {
Expand Down
49 changes: 33 additions & 16 deletions op-service/txmgr/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,36 @@ func (q *Queue[T]) Wait() error {
return q.group.Wait()
}

// handleResponse will wait for the response on the first passed channel,
// and then forward it on the second passed channel (attaching the id). It returns
// the response error or the context error if the context is canceled.
func handleResponse[T any](ctx context.Context, c chan SendResponse, d chan TxReceipt[T], id T) error {
select {
case response := <-c:
d <- TxReceipt[T]{ID: id, Receipt: response.Receipt, Err: response.Err}
return response.Err
case <-ctx.Done():
d <- TxReceipt[T]{ID: id, Err: ctx.Err()}
return ctx.Err()
}
}

// Send will wait until the number of pending txs is below the max pending,
// and then send the next tx.
// and then send the next tx asynchronously. The nonce of the transaction is
// determined synchronously, so transactions should be confirmed on chain in
// the order they are sent using this method.
//
// The actual tx sending is non-blocking, with the receipt returned on the
// provided receipt channel. If the channel is unbuffered, the goroutine is
// blocked from completing until the channel is read from.
func (q *Queue[T]) Send(id T, candidate TxCandidate, receiptCh chan TxReceipt[T]) {
group, ctx := q.groupContext()
group.Go(func() error {
return q.sendTx(ctx, id, candidate, receiptCh)
})
responseChan := make(chan SendResponse, 1)
handleResponse := func() error {
return handleResponse(ctx, responseChan, receiptCh, id)
}
group.Go(handleResponse) // This blocks until the number of handlers is below the limit
q.txMgr.SendAsync(ctx, candidate, responseChan) // Nonce management handled synchronously, i.e. before this returns
}

// TrySend sends the next tx, but only if the number of pending txs is below the
Expand All @@ -75,19 +94,17 @@ func (q *Queue[T]) Send(id T, candidate TxCandidate, receiptCh chan TxReceipt[T]
// blocked from completing until the channel is read from.
func (q *Queue[T]) TrySend(id T, candidate TxCandidate, receiptCh chan TxReceipt[T]) bool {
group, ctx := q.groupContext()
return group.TryGo(func() error {
return q.sendTx(ctx, id, candidate, receiptCh)
})
}

func (q *Queue[T]) sendTx(ctx context.Context, id T, candidate TxCandidate, receiptCh chan TxReceipt[T]) error {
receipt, err := q.txMgr.Send(ctx, candidate)
receiptCh <- TxReceipt[T]{
ID: id,
Receipt: receipt,
Err: err,
responseChan := make(chan SendResponse, 1)
handleResponse := func() error {
return handleResponse(ctx, responseChan, receiptCh, id)
}
ok := group.TryGo(handleResponse)
if !ok {
return false
} else {
q.txMgr.SendAsync(ctx, candidate, responseChan)
return true
}
return err
}

// groupContext returns a Group and a Context to use when sending a tx.
Expand Down