diff --git a/eth/api_backend.go b/eth/api_backend.go index 1e064280ef..01622e1d82 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -144,6 +144,13 @@ type EthAPIBackend struct { pendingBlockMutex sync.RWMutex pendingBlocks []*types.Block pendingBlockSlot uint64 + + // Reconciliation and quarantine for stray pool txs + quarantineMutex sync.RWMutex + quarantinedPoolHashes map[common.Hash]struct{} + reconcileMu sync.Mutex + lastReconcile time.Time + reconcileOnce sync.Once } // isXtAborted queries the sequencer coordinator (protocol layer) to determine @@ -1177,136 +1184,6 @@ func (b *EthAPIBackend) PrepareSequencerTransactionsForBlock(ctx context.Context return nil } -// prepareAndInjectSequencerTransactions re-signs putInbox transactions with fresh nonces -// from the txpool and injects all sequencer transactions. -// SSV -func (b *EthAPIBackend) prepareAndInjectSequencerTransactions(ctx context.Context) error { - if b.txTracker == nil { - return nil - } - - type preparedTx struct { - signedTx *types.Transaction - oldHash common.Hash - newHash common.Hash - oldNonce uint64 - newNonce uint64 - } - var preparedPutInbox []preparedTx - - b.sequencerTxMutex.Lock() - putInboxTxs := b.txTracker.transactionsByKind(sequencerTxPutInbox) - if len(putInboxTxs) > 0 { - currentNonce := b.eth.txPool.PoolNonce(b.coordinatorAddr) - log.Info("[SSV] Preparing putInbox transactions with fresh nonces", - "count", len(putInboxTxs), - "startNonce", currentNonce, - "coordinatorAddr", b.coordinatorAddr.Hex()) - - signer := types.NewLondonSigner(b.ChainConfig().ChainID) - - for i, oldTx := range putInboxTxs { - nonce := currentNonce + uint64(i) - - var newTx *types.Transaction - if oldTx.Type() == types.DynamicFeeTxType { - newInner := &types.DynamicFeeTx{ - ChainID: oldTx.ChainId(), - Nonce: nonce, - GasTipCap: oldTx.GasTipCap(), - GasFeeCap: oldTx.GasFeeCap(), - Gas: oldTx.Gas(), - To: oldTx.To(), - Value: oldTx.Value(), - Data: oldTx.Data(), - AccessList: oldTx.AccessList(), - } - newTx = types.NewTx(newInner) - } else { - log.Error("[SSV] Unexpected tx type for putInbox", "type", oldTx.Type(), "txHash", oldTx.Hash().Hex()) - continue - } - - signedTx, err := types.SignTx(newTx, signer, b.coordinatorKey) - if err != nil { - log.Error("[SSV] Failed to re-sign putInbox tx", "err", err, "nonce", nonce) - continue - } - - oldHash := oldTx.Hash() - newHash := signedTx.Hash() - - if record := b.txTracker.record(oldHash); record != nil { - delete(b.txTracker.records, oldHash) - record.tx = signedTx - b.txTracker.records[newHash] = record - - for idx, hash := range b.txTracker.staged { - if hash == oldHash { - b.txTracker.staged[idx] = newHash - break - } - } - } - - preparedPutInbox = append(preparedPutInbox, preparedTx{ - signedTx: signedTx, - oldHash: oldHash, - newHash: newHash, - oldNonce: oldTx.Nonce(), - newNonce: nonce, - }) - } - } - - originalTxs := b.txTracker.transactionsByKind(sequencerTxOriginal) - b.sequencerTxMutex.Unlock() - - for _, prep := range preparedPutInbox { - if err := b.sendTx(ctx, prep.signedTx); err != nil { - reason := reasonForGrep(err) - log.Warn("[SSV] Failed to inject re-signed putInbox tx", - "err", err, - "oldHash", prep.oldHash.Hex(), - "newHash", prep.newHash.Hex(), - "oldNonce", prep.oldNonce, - "newNonce", prep.newNonce, - "reason", reason) - } else { - log.Info("[SSV] Re-signed and injected putInbox tx", - "oldHash", prep.oldHash.Hex(), - "newHash", prep.newHash.Hex(), - "oldNonce", prep.oldNonce, - "newNonce", prep.newNonce) - } - } - - if len(originalTxs) > 0 { - log.Info("[SSV] Injecting original transactions into pool", "count", len(originalTxs)) - - for _, tx := range originalTxs { - if err := b.sendTx(ctx, tx); err != nil { - reason := reasonForGrep(err) - log.Warn("[SSV] Failed to inject original tx", - "err", err, - "txHash", tx.Hash().Hex(), - "nonce", tx.Nonce(), - "reason", reason) - } else { - log.Info("[SSV] Injected original tx", - "txHash", tx.Hash().Hex(), - "nonce", tx.Nonce()) - } - } - } - - return nil -} - -// GetOrderedTransactionsForBlock returns only sequencer-managed transactions in -// the correct order for block inclusion. Normal mempool transactions are -// included by the miner after this list, and must not be returned here. -// SSV func (b *EthAPIBackend) GetOrderedTransactionsForBlock(ctx context.Context) (types.Transactions, error) { if b.coordinator == nil { txs, _, _ := b.assembleSequencerBundle() @@ -1320,11 +1197,9 @@ func (b *EthAPIBackend) GetOrderedTransactionsForBlock(ctx context.Context) (typ // During coordination, exclude cross-chain txs - they'll be included after decision return types.Transactions{}, nil case sequencer.StateBuildingFree, sequencer.StateSubmission: - // Re-sign putInbox transactions with fresh nonces and inject into pool. - if err := b.prepareAndInjectSequencerTransactions(ctx); err != nil { - log.Error("[SSV] Failed to prepare sequencer transactions", "err", err) + if b.coordinatorAddr != (common.Address{}) { + b.reconcileOnce.Do(func() { b.reconcileSequencerNonce(ctx, b.coordinatorAddr) }) } - // After SCP completes (BuildingFree) or during final submission, include ready transactions // This ensures transactions are committed in the first possible block after simulation/decision txs, orderedRecords, pendingBundles := b.assembleSequencerBundle() @@ -1394,24 +1269,75 @@ func (b *EthAPIBackend) assembleSequencerBundle() (types.Transactions, []sequenc return types.Transactions{}, ready, pending } - txs := make(types.Transactions, 0, len(ready)) + // State-based continuity guard: ensure contiguous nonces per account in this block + filteredReady := make([]sequencerBundleEntry, 0, len(ready)) + accountNonces := make(map[common.Address]uint64) + blocked := make(map[common.Address]struct{}) + signer := types.LatestSignerForChainID(b.ChainConfig().ChainID) + + stateDB, err := b.eth.blockchain.State() + if err != nil { + // Fallback pass-through if state not available + txs := make(types.Transactions, 0, len(ready)) + for _, entry := range ready { + if entry.tx != nil { + txs = append(txs, entry.tx) + } + } + return txs, ready, pending + } + for _, entry := range ready { if entry.tx == nil { continue } - if entry.xtID != "" && b.isXtAborted(entry.xtID) { log.Info("[SSV] Skipping aborted XT in bundle assembly", - "xtID", entry.xtID, - "txHash", entry.tx.Hash().Hex(), - "nonce", entry.tx.Nonce()) + "xtID", entry.xtID, "txHash", entry.tx.Hash().Hex(), "nonce", entry.tx.Nonce()) continue } - - txs = append(txs, entry.tx) + from, err := types.Sender(signer, entry.tx) + if err != nil { + log.Warn("[SSV] Failed to extract sender", "txHash", entry.tx.Hash().Hex(), "error", err) + continue + } + if _, stop := blocked[from]; stop { + continue + } + if _, ok := accountNonces[from]; !ok { + accountNonces[from] = stateDB.GetNonce(from) + } + expected := accountNonces[from] + txNonce := entry.tx.Nonce() + if txNonce == expected { + filteredReady = append(filteredReady, entry) + accountNonces[from] = expected + 1 + continue + } + if txNonce < expected { + log.Info("[SSV] Skipping transaction with old nonce", + "txHash", entry.tx.Hash().Hex(), "from", from.Hex(), + "txNonce", txNonce, "expectedNonce", expected, + "xtID", entry.xtID, "kind", entry.kind.String()) + continue + } + log.Warn("[SSV] Nonce gap detected, stopping account transactions", + "from", from.Hex(), "txNonce", txNonce, "expectedNonce", expected, + "gap", txNonce-expected, "txHash", entry.tx.Hash().Hex(), + "xtID", entry.xtID, "kind", entry.kind.String()) + blocked[from] = struct{}{} + if from == b.coordinatorAddr { + go b.reconcileSequencerNonce(context.Background(), from) + } } - return txs, ready, pending + txs := make(types.Transactions, 0, len(filteredReady)) + for _, entry := range filteredReady { + if entry.tx != nil { + txs = append(txs, entry.tx) + } + } + return txs, filteredReady, pending } // buildSequencerOnlyList assembles only the sequencer-managed transactions preserving @@ -2487,10 +2413,27 @@ func (b *EthAPIBackend) simulateXTRequestForSBCP( if len(allFulfilledDeps) > 0 { log.Info("[SSV] Creating putInbox transactions for fulfilled dependencies", "count", len(allFulfilledDeps)) - poolNonce, err := b.GetPoolNonce(ctx, b.coordinatorAddr) + // Dedicated mempool: derive nonce from state + locally staged coordinator putInbox count + stateDB, err := b.eth.blockchain.State() if err != nil { - return false, fmt.Errorf("failed to get nonce: %w", err) + return false, fmt.Errorf("failed to get state for nonce: %w", err) } + baseNonce := stateDB.GetNonce(b.coordinatorAddr) + signer := types.LatestSignerForChainID(b.ChainConfig().ChainID) + staged := 0 + b.sequencerTxMutex.RLock() + if b.txTracker != nil { + for _, rec := range b.txTracker.records { + if rec == nil || rec.tx == nil || rec.kind != sequencerTxPutInbox { + continue + } + if s, err := types.Sender(signer, rec.tx); err == nil && s == b.coordinatorAddr { + staged++ + } + } + } + b.sequencerTxMutex.RUnlock() + poolNonce := baseNonce + uint64(staged) for _, dep := range allFulfilledDeps { putInboxTx, err := mailboxProcessor.createPutInboxTx(dep, poolNonce) @@ -2610,3 +2553,53 @@ func (b *EthAPIBackend) simulateXTRequestForSBCP( return allSuccessful, nil } + +// Reconcile coordinator's pool view: quarantine unknown txs and mark rejected +func (b *EthAPIBackend) reconcileSequencerNonce(ctx context.Context, addr common.Address) { + b.reconcileMu.Lock() + if time.Since(b.lastReconcile) < 2*time.Second { + b.reconcileMu.Unlock() + return + } + b.lastReconcile = time.Now() + b.reconcileMu.Unlock() + + pend, queued := b.eth.txPool.ContentFrom(addr) + quarantined := 0 + mark := func(h common.Hash) { + b.quarantineMutex.Lock() + if b.quarantinedPoolHashes == nil { + b.quarantinedPoolHashes = make(map[common.Hash]struct{}) + } + if _, ok := b.quarantinedPoolHashes[h]; !ok { + b.quarantinedPoolHashes[h] = struct{}{} + quarantined++ + } + b.quarantineMutex.Unlock() + } + handle := func(list []*types.Transaction) { + for _, tx := range list { + if tx == nil { + continue + } + h := tx.Hash() + b.sequencerTxMutex.RLock() + known := b.txTracker != nil && b.txTracker.record(h) != nil + b.sequencerTxMutex.RUnlock() + if !known { + if poolTx := b.eth.txPool.Get(h); poolTx != nil { + poolTx.SetRejected() + } + mark(h) + } + } + } + handle(pend) + handle(queued) + if quarantined > 0 { + log.Warn("[SSV] Reconciled coordinator pool txs; quarantined unknown entries", "addr", addr.Hex(), "quarantined", quarantined) + if miner := b.eth.miner; miner != nil { + miner.InvalidatePendingCache() + } + } +}