Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
281 changes: 137 additions & 144 deletions eth/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,13 @@ type EthAPIBackend struct {
pendingBlockMutex sync.RWMutex
pendingBlocks []*types.Block
pendingBlockSlot uint64

// Reconciliation and quarantine for stray pool txs
quarantineMutex sync.RWMutex
quarantinedPoolHashes map[common.Hash]struct{}
reconcileMu sync.Mutex
lastReconcile time.Time
reconcileOnce sync.Once
}

// isXtAborted queries the sequencer coordinator (protocol layer) to determine
Expand Down Expand Up @@ -1177,136 +1184,6 @@ func (b *EthAPIBackend) PrepareSequencerTransactionsForBlock(ctx context.Context
return nil
}

// prepareAndInjectSequencerTransactions re-signs putInbox transactions with fresh nonces
// from the txpool and injects all sequencer transactions.
// SSV
func (b *EthAPIBackend) prepareAndInjectSequencerTransactions(ctx context.Context) error {
if b.txTracker == nil {
return nil
}

type preparedTx struct {
signedTx *types.Transaction
oldHash common.Hash
newHash common.Hash
oldNonce uint64
newNonce uint64
}
var preparedPutInbox []preparedTx

b.sequencerTxMutex.Lock()
putInboxTxs := b.txTracker.transactionsByKind(sequencerTxPutInbox)
if len(putInboxTxs) > 0 {
currentNonce := b.eth.txPool.PoolNonce(b.coordinatorAddr)
log.Info("[SSV] Preparing putInbox transactions with fresh nonces",
"count", len(putInboxTxs),
"startNonce", currentNonce,
"coordinatorAddr", b.coordinatorAddr.Hex())

signer := types.NewLondonSigner(b.ChainConfig().ChainID)

for i, oldTx := range putInboxTxs {
nonce := currentNonce + uint64(i)

var newTx *types.Transaction
if oldTx.Type() == types.DynamicFeeTxType {
newInner := &types.DynamicFeeTx{
ChainID: oldTx.ChainId(),
Nonce: nonce,
GasTipCap: oldTx.GasTipCap(),
GasFeeCap: oldTx.GasFeeCap(),
Gas: oldTx.Gas(),
To: oldTx.To(),
Value: oldTx.Value(),
Data: oldTx.Data(),
AccessList: oldTx.AccessList(),
}
newTx = types.NewTx(newInner)
} else {
log.Error("[SSV] Unexpected tx type for putInbox", "type", oldTx.Type(), "txHash", oldTx.Hash().Hex())
continue
}

signedTx, err := types.SignTx(newTx, signer, b.coordinatorKey)
if err != nil {
log.Error("[SSV] Failed to re-sign putInbox tx", "err", err, "nonce", nonce)
continue
}

oldHash := oldTx.Hash()
newHash := signedTx.Hash()

if record := b.txTracker.record(oldHash); record != nil {
delete(b.txTracker.records, oldHash)
record.tx = signedTx
b.txTracker.records[newHash] = record

for idx, hash := range b.txTracker.staged {
if hash == oldHash {
b.txTracker.staged[idx] = newHash
break
}
}
}

preparedPutInbox = append(preparedPutInbox, preparedTx{
signedTx: signedTx,
oldHash: oldHash,
newHash: newHash,
oldNonce: oldTx.Nonce(),
newNonce: nonce,
})
}
}

originalTxs := b.txTracker.transactionsByKind(sequencerTxOriginal)
b.sequencerTxMutex.Unlock()

for _, prep := range preparedPutInbox {
if err := b.sendTx(ctx, prep.signedTx); err != nil {
reason := reasonForGrep(err)
log.Warn("[SSV] Failed to inject re-signed putInbox tx",
"err", err,
"oldHash", prep.oldHash.Hex(),
"newHash", prep.newHash.Hex(),
"oldNonce", prep.oldNonce,
"newNonce", prep.newNonce,
"reason", reason)
} else {
log.Info("[SSV] Re-signed and injected putInbox tx",
"oldHash", prep.oldHash.Hex(),
"newHash", prep.newHash.Hex(),
"oldNonce", prep.oldNonce,
"newNonce", prep.newNonce)
}
}

if len(originalTxs) > 0 {
log.Info("[SSV] Injecting original transactions into pool", "count", len(originalTxs))

for _, tx := range originalTxs {
if err := b.sendTx(ctx, tx); err != nil {
reason := reasonForGrep(err)
log.Warn("[SSV] Failed to inject original tx",
"err", err,
"txHash", tx.Hash().Hex(),
"nonce", tx.Nonce(),
"reason", reason)
} else {
log.Info("[SSV] Injected original tx",
"txHash", tx.Hash().Hex(),
"nonce", tx.Nonce())
}
}
}

return nil
}

// GetOrderedTransactionsForBlock returns only sequencer-managed transactions in
// the correct order for block inclusion. Normal mempool transactions are
// included by the miner after this list, and must not be returned here.
// SSV
func (b *EthAPIBackend) GetOrderedTransactionsForBlock(ctx context.Context) (types.Transactions, error) {
if b.coordinator == nil {
txs, _, _ := b.assembleSequencerBundle()
Expand All @@ -1320,11 +1197,9 @@ func (b *EthAPIBackend) GetOrderedTransactionsForBlock(ctx context.Context) (typ
// During coordination, exclude cross-chain txs - they'll be included after decision
return types.Transactions{}, nil
case sequencer.StateBuildingFree, sequencer.StateSubmission:
// Re-sign putInbox transactions with fresh nonces and inject into pool.
if err := b.prepareAndInjectSequencerTransactions(ctx); err != nil {
log.Error("[SSV] Failed to prepare sequencer transactions", "err", err)
if b.coordinatorAddr != (common.Address{}) {
b.reconcileOnce.Do(func() { b.reconcileSequencerNonce(ctx, b.coordinatorAddr) })
}

// After SCP completes (BuildingFree) or during final submission, include ready transactions
// This ensures transactions are committed in the first possible block after simulation/decision
txs, orderedRecords, pendingBundles := b.assembleSequencerBundle()
Expand Down Expand Up @@ -1394,24 +1269,75 @@ func (b *EthAPIBackend) assembleSequencerBundle() (types.Transactions, []sequenc
return types.Transactions{}, ready, pending
}

txs := make(types.Transactions, 0, len(ready))
// State-based continuity guard: ensure contiguous nonces per account in this block
filteredReady := make([]sequencerBundleEntry, 0, len(ready))
accountNonces := make(map[common.Address]uint64)
blocked := make(map[common.Address]struct{})
signer := types.LatestSignerForChainID(b.ChainConfig().ChainID)

stateDB, err := b.eth.blockchain.State()
if err != nil {
// Fallback pass-through if state not available
txs := make(types.Transactions, 0, len(ready))
for _, entry := range ready {
if entry.tx != nil {
txs = append(txs, entry.tx)
}
}
return txs, ready, pending
}

for _, entry := range ready {
if entry.tx == nil {
continue
}

if entry.xtID != "" && b.isXtAborted(entry.xtID) {
log.Info("[SSV] Skipping aborted XT in bundle assembly",
"xtID", entry.xtID,
"txHash", entry.tx.Hash().Hex(),
"nonce", entry.tx.Nonce())
"xtID", entry.xtID, "txHash", entry.tx.Hash().Hex(), "nonce", entry.tx.Nonce())
continue
}

txs = append(txs, entry.tx)
from, err := types.Sender(signer, entry.tx)
if err != nil {
log.Warn("[SSV] Failed to extract sender", "txHash", entry.tx.Hash().Hex(), "error", err)
continue
}
if _, stop := blocked[from]; stop {
continue
}
if _, ok := accountNonces[from]; !ok {
accountNonces[from] = stateDB.GetNonce(from)
}
expected := accountNonces[from]
txNonce := entry.tx.Nonce()
if txNonce == expected {
filteredReady = append(filteredReady, entry)
accountNonces[from] = expected + 1
continue
}
if txNonce < expected {
log.Info("[SSV] Skipping transaction with old nonce",
"txHash", entry.tx.Hash().Hex(), "from", from.Hex(),
"txNonce", txNonce, "expectedNonce", expected,
"xtID", entry.xtID, "kind", entry.kind.String())
continue
}
log.Warn("[SSV] Nonce gap detected, stopping account transactions",
"from", from.Hex(), "txNonce", txNonce, "expectedNonce", expected,
"gap", txNonce-expected, "txHash", entry.tx.Hash().Hex(),
"xtID", entry.xtID, "kind", entry.kind.String())
blocked[from] = struct{}{}
if from == b.coordinatorAddr {
go b.reconcileSequencerNonce(context.Background(), from)
}
}

return txs, ready, pending
txs := make(types.Transactions, 0, len(filteredReady))
for _, entry := range filteredReady {
if entry.tx != nil {
txs = append(txs, entry.tx)
}
}
return txs, filteredReady, pending
}

// buildSequencerOnlyList assembles only the sequencer-managed transactions preserving
Expand Down Expand Up @@ -2487,10 +2413,27 @@ func (b *EthAPIBackend) simulateXTRequestForSBCP(
if len(allFulfilledDeps) > 0 {
log.Info("[SSV] Creating putInbox transactions for fulfilled dependencies", "count", len(allFulfilledDeps))

poolNonce, err := b.GetPoolNonce(ctx, b.coordinatorAddr)
// Dedicated mempool: derive nonce from state + locally staged coordinator putInbox count
stateDB, err := b.eth.blockchain.State()
if err != nil {
return false, fmt.Errorf("failed to get nonce: %w", err)
return false, fmt.Errorf("failed to get state for nonce: %w", err)
}
baseNonce := stateDB.GetNonce(b.coordinatorAddr)
signer := types.LatestSignerForChainID(b.ChainConfig().ChainID)
staged := 0
b.sequencerTxMutex.RLock()
if b.txTracker != nil {
for _, rec := range b.txTracker.records {
if rec == nil || rec.tx == nil || rec.kind != sequencerTxPutInbox {
continue
}
if s, err := types.Sender(signer, rec.tx); err == nil && s == b.coordinatorAddr {
staged++
}
}
}
b.sequencerTxMutex.RUnlock()
poolNonce := baseNonce + uint64(staged)

for _, dep := range allFulfilledDeps {
putInboxTx, err := mailboxProcessor.createPutInboxTx(dep, poolNonce)
Expand Down Expand Up @@ -2610,3 +2553,53 @@ func (b *EthAPIBackend) simulateXTRequestForSBCP(

return allSuccessful, nil
}

// Reconcile coordinator's pool view: quarantine unknown txs and mark rejected
func (b *EthAPIBackend) reconcileSequencerNonce(ctx context.Context, addr common.Address) {
b.reconcileMu.Lock()
if time.Since(b.lastReconcile) < 2*time.Second {
b.reconcileMu.Unlock()
return
}
b.lastReconcile = time.Now()
b.reconcileMu.Unlock()

pend, queued := b.eth.txPool.ContentFrom(addr)
quarantined := 0
mark := func(h common.Hash) {
b.quarantineMutex.Lock()
if b.quarantinedPoolHashes == nil {
b.quarantinedPoolHashes = make(map[common.Hash]struct{})
}
if _, ok := b.quarantinedPoolHashes[h]; !ok {
b.quarantinedPoolHashes[h] = struct{}{}
quarantined++
}
b.quarantineMutex.Unlock()
}
handle := func(list []*types.Transaction) {
for _, tx := range list {
if tx == nil {
continue
}
h := tx.Hash()
b.sequencerTxMutex.RLock()
known := b.txTracker != nil && b.txTracker.record(h) != nil
b.sequencerTxMutex.RUnlock()
if !known {
if poolTx := b.eth.txPool.Get(h); poolTx != nil {
poolTx.SetRejected()
}
mark(h)
}
}
}
handle(pend)
handle(queued)
if quarantined > 0 {
log.Warn("[SSV] Reconciled coordinator pool txs; quarantined unknown entries", "addr", addr.Hex(), "quarantined", quarantined)
if miner := b.eth.miner; miner != nil {
miner.InvalidatePendingCache()
}
}
}