Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
90 commits
Select commit Hold shift + click to select a range
a800a16
use optimistic head event in txpool's maintanance loop
bharath-123 Sep 24, 2024
0fac052
fix tests
bharath-123 Oct 24, 2024
c62b0bd
add logic to clear mempool
bharath-123 Sep 26, 2024
e09cfde
remove invalids
bharath-123 Sep 26, 2024
878c5c0
add comments
bharath-123 Sep 26, 2024
f2fdd0f
remove mempool conditionals
bharath-123 Oct 16, 2024
6a8e446
split out reset logic and head only reset logc
bharath-123 Oct 24, 2024
01be512
ensure that unreserved addresses are removed
bharath-123 Nov 10, 2024
2051f8a
add event for mempool clearance
bharath-123 Nov 10, 2024
864138e
update subscription interfaces
bharath-123 Sep 27, 2024
f7bb86f
fix potential panic while subscribing to mempool clearance
bharath-123 Nov 4, 2024
9a352f1
dont send mempool clearing event while holding mempool lock
bharath-123 Nov 10, 2024
7dc78f5
implement stream execute optimistic block
bharath-123 Oct 29, 2024
f311e0a
unit tests
bharath-123 Nov 4, 2024
b0ec901
use generics to implement mock bi directional stream
bharath-123 Oct 1, 2024
56dc5cd
wip
bharath-123 Oct 16, 2024
98edfc9
use an atomic pointer for sequencer block hash
bharath-123 Oct 29, 2024
c929957
reduce mempool clearing timeout
bharath-123 Oct 24, 2024
a1152d7
fix imports
bharath-123 Oct 29, 2024
17a333c
update grpc method names
bharath-123 Nov 5, 2024
f1356b2
only allow 1 client to be connected to the execute optimistic block s…
bharath-123 Nov 12, 2024
e37bba3
rename executeBlockStreamConnected to executeOptimisticBlockStreamCon…
bharath-123 Nov 12, 2024
db067e1
remove restrictions to allow just one client to connect to the optimi…
bharath-123 Dec 2, 2024
c529b84
implement bundle streaming
bharath-123 Nov 5, 2024
bea3622
update unit tests
bharath-123 Nov 5, 2024
35f3a6c
only send the effective tip as part of the fee
bharath-123 Oct 1, 2024
4b612d6
minor test updates
bharath-123 Oct 16, 2024
5acc678
rename grpc methods
bharath-123 Nov 5, 2024
e57148b
close the bundle stream when client closes the connection
bharath-123 Nov 11, 2024
e4cf569
allow only 1 client to connect to the bundle stream
bharath-123 Dec 2, 2024
f795b75
fix minor error
bharath-123 Dec 2, 2024
0c02e8c
fetch the next fee recipient under the block execution lock
bharath-123 Oct 11, 2024
a5fb8a5
validate txs before optimistically executing them
bharath-123 Oct 16, 2024
a114f0d
support uds endpoints for auctioneer
bharath-123 Oct 16, 2024
edfddc9
remove duplicate code
bharath-123 Oct 24, 2024
12e3ab2
add uds flag to options
bharath-123 Nov 4, 2024
14c097e
separate out execution api services and optimistic execution api serv…
bharath-123 Dec 2, 2024
2f6e1eb
minor updates
bharath-123 Oct 17, 2024
df7504f
minor nits
bharath-123 Oct 17, 2024
128be57
make tests more robust
bharath-123 Oct 17, 2024
99560fa
move BigIntoToProtoU128 to shared test utils
bharath-123 Oct 24, 2024
a62047e
minor updates
bharath-123 Oct 29, 2024
e5065e5
update grpc methods
bharath-123 Nov 5, 2024
0af0e92
close the bundle stream when client closes the connection
bharath-123 Nov 11, 2024
258c8e2
remove UDS references
bharath-123 Nov 12, 2024
54381f4
rename a wrongly renamed word
bharath-123 Nov 12, 2024
f28ac18
re add single client connection checks
bharath-123 Nov 12, 2024
12eec4d
maintain only 1 server instance
bharath-123 Nov 12, 2024
651a6ca
renaming
bharath-123 Nov 20, 2024
9bf573a
remove atomic bools to restrict client connections to 1
bharath-123 Dec 3, 2024
6abc0f4
feature flag auctioneer
bharath-123 Nov 12, 2024
e48458f
add flags
bharath-123 Oct 29, 2024
591e827
save
bharath-123 Nov 12, 2024
7444772
maintain a copy of legacy pool tests to test the cases when auctionee…
bharath-123 Nov 12, 2024
61af0ec
unmarshall auction result
bharath-123 Nov 13, 2024
c4662a7
add signature verification
bharath-123 Nov 13, 2024
64758bb
set the trusted builder public key in genesis
bharath-123 Nov 17, 2024
6985314
add some tests for the auction results
bharath-123 Nov 17, 2024
3661b01
dilineate trusted builder public keys by block number
bharath-123 Nov 19, 2024
9e0e612
renaming
bharath-123 Nov 20, 2024
f4bc725
update protos
bharath-123 Nov 20, 2024
2992b5b
change trusted builder instances to auctioneer
bharath-123 Nov 21, 2024
a82a00e
fix typo
bharath-123 Nov 21, 2024
9809d0c
add an api to query the optimistic block
bharath-123 Nov 20, 2024
e6f1345
support querying optimistic block using the optimistic string
bharath-123 Nov 21, 2024
2561dc3
close the stream when we get the done signal
bharath-123 Nov 27, 2024
df8ead5
add debug logs for when the stream rpcs are called
bharath-123 Dec 3, 2024
7aed2a0
add some logs
bharath-123 Nov 27, 2024
634e962
add some logs
bharath-123 Dec 3, 2024
e5d90d2
use atomic pointer for fee recipient
bharath-123 Dec 3, 2024
9e7abe7
remove unecessary lock
bharath-123 Dec 3, 2024
f7d5584
wrap errors
bharath-123 Dec 3, 2024
1457ce3
buffer the reserved addresses and remove them at once
bharath-123 Jan 3, 2025
b6e5401
avoid cleaning up duplicate addresses
bharath-123 Jan 6, 2025
1cd5701
add metrics
bharath-123 Jan 3, 2025
b555cc9
update depot token
bharath-123 Jan 9, 2025
3284998
add a few debug logs
bharath-123 Jan 10, 2025
17dbebd
update to use new protos
bharath-123 Jan 13, 2025
1c1a935
update protobufs
bharath-123 Jan 13, 2025
5b99a85
Merge pull request #28 from astriaorg/bharath/update-protos
bharath-123 Jan 13, 2025
d5628b5
Merge pull request #25 from astriaorg/bharath/add-auctioneer-metrics
bharath-123 Jan 13, 2025
e49d7f4
Merge pull request #24 from astriaorg/bharath/code-cleanups
bharath-123 Jan 13, 2025
98e817c
Merge pull request #22 from astriaorg/bharath/api-to-query-optimistic…
bharath-123 Jan 13, 2025
2b9a863
Merge pull request #21 from astriaorg/bharath/auction-result
bharath-123 Jan 13, 2025
e384e18
Merge pull request #17 from astriaorg/bharath/feature-flag-auctioneer
bharath-123 Jan 13, 2025
9a7f5ad
Merge pull request #16 from astriaorg/bharath/refactor-services
bharath-123 Jan 13, 2025
43fdcea
Merge pull request #12 from astriaorg/bharath/implement-bundle-streaming
bharath-123 Jan 13, 2025
9496cb9
Merge pull request #11 from astriaorg/bharath/implement-optimistic-ex…
bharath-123 Jan 13, 2025
41fb511
Merge pull request #9 from astriaorg/bharath/add-mempool-clearing-event
bharath-123 Jan 13, 2025
11cebfd
Merge pull request #8 from astriaorg/bharath/clear-mempool
bharath-123 Jan 13, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/astria-build-and-publish-image.yml
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,4 @@ jobs:
push: true
tags: ${{ steps.metadata.outputs.tags }}
labels: ${{ steps.metadata.outputs.labels }}
project: w2d6w0spqz
project: w2d6w0spqz
2 changes: 2 additions & 0 deletions cmd/devp2p/internal/ethtest/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func TestEthSuite(t *testing.T) {
if err != nil {
t.Fatalf("could not create new test suite: %v", err)
}

for _, test := range suite.EthTests() {
t.Run(test.Name, func(t *testing.T) {
if test.Slow && testing.Short() {
Expand Down Expand Up @@ -149,5 +150,6 @@ func setupGeth(stack *node.Node, dir string) error {
return fmt.Errorf("failed to register catalyst service: %v", err)
}
_, err = backend.BlockChain().InsertChain(chain.blocks[1:])
backend.BlockChain().SetOptimistic(chain.blocks[len(chain.blocks)-1])
return err
}
13 changes: 10 additions & 3 deletions cmd/geth/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"errors"
"fmt"
"github.com/ethereum/go-ethereum/eth/catalyst"
"github.com/ethereum/go-ethereum/grpc/optimistic"
"github.com/ethereum/go-ethereum/grpc/shared"
"os"
"reflect"
"runtime"
Expand Down Expand Up @@ -206,11 +208,16 @@ func makeFullNode(ctx *cli.Context) *node.Node {

// Configure gRPC if requested.
if ctx.IsSet(utils.GRPCEnabledFlag.Name) {
serviceV1, err := execution.NewExecutionServiceServerV1(eth)
sharedService, err := shared.NewSharedServiceContainer(eth)
if err != nil {
utils.Fatalf("failed to create execution service: %v", err)
utils.Fatalf("failed to create shared service container: %v", err)
}
utils.RegisterGRPCExecutionService(stack, serviceV1, &cfg.Node)

serviceV1a2 := execution.NewExecutionServiceServerV1(sharedService)

optimisticServiceV1a1 := optimistic.NewOptimisticServiceV1Alpha(sharedService)

utils.RegisterGRPCServices(stack, serviceV1a2, optimisticServiceV1a1, optimisticServiceV1a1, &cfg.Node)
}

// Add the Ethereum Stats daemon if requested.
Expand Down
1 change: 1 addition & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ var (
utils.MinerRecommitIntervalFlag,
utils.MinerPendingFeeRecipientFlag,
utils.MinerNewPayloadTimeoutFlag, // deprecated
utils.AuctioneerEnabledFlag,
utils.NATFlag,
utils.NoDiscoverFlag,
utils.DiscoveryV4Flag,
Expand Down
20 changes: 17 additions & 3 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package utils

import (
optimisticGrpc "buf.build/gen/go/astria/execution-apis/grpc/go/astria/auction/v1alpha1/auctionv1alpha1grpc"
"context"
"crypto/ecdsa"
"encoding/hex"
Expand Down Expand Up @@ -769,6 +770,13 @@ var (
Category: flags.APICategory,
}

// auctioneer
AuctioneerEnabledFlag = &cli.BoolFlag{
Name: "auctioneer",
Usage: "Enable the auctioneer server",
Category: flags.MinerCategory,
}

// Network Settings
MaxPeersFlag = &cli.IntFlag{
Name: "maxpeers",
Expand Down Expand Up @@ -1438,6 +1446,12 @@ func SetNodeConfig(ctx *cli.Context, cfg *node.Config) {
SetDataDir(ctx, cfg)
setSmartCard(ctx, cfg)

if ctx.Bool(AuctioneerEnabledFlag.Name) {
cfg.EnableAuctioneer = true
} else {
cfg.EnableAuctioneer = false
}

if ctx.IsSet(JWTSecretFlag.Name) {
cfg.JWTSecret = ctx.String(JWTSecretFlag.Name)
}
Expand Down Expand Up @@ -1987,10 +2001,10 @@ func RegisterGraphQLService(stack *node.Node, backend ethapi.Backend, filterSyst
}
}

// RegisterGRPCExecutionService adds the gRPC API to the node.
// RegisterGRPCServices adds the gRPC API to the node.
// It was done this way so that our grpc execution server can access the ethapi.Backend
func RegisterGRPCExecutionService(stack *node.Node, execServ astriaGrpc.ExecutionServiceServer, cfg *node.Config) {
if err := node.NewGRPCServerHandler(stack, execServ, cfg); err != nil {
func RegisterGRPCServices(stack *node.Node, execServ astriaGrpc.ExecutionServiceServer, optimisticExecutionServ optimisticGrpc.OptimisticExecutionServiceServer, auctionServiceServer optimisticGrpc.AuctionServiceServer, cfg *node.Config) {
if err := node.NewGRPCServerHandler(stack, execServ, optimisticExecutionServ, auctionServiceServer, cfg); err != nil {
Fatalf("Failed to register the gRPC service: %v", err)
}
}
Expand Down
6 changes: 6 additions & 0 deletions core/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ import (
// NewTxsEvent is posted when a batch of transactions enter the transaction pool.
type NewTxsEvent struct{ Txs []*types.Transaction }

// NewMempoolClearedEvent is posted when the mempool is cleared after a head reset for trusted auctioneer
type NewMempoolCleared struct {
// the new head to which the mempool state was reset to before clearing the mempool
NewHead *types.Header
}

// NewMinedBlockEvent is posted when a block has been imported.
type NewMinedBlockEvent struct{ Block *types.Block }

Expand Down
4 changes: 4 additions & 0 deletions core/txpool/blobpool/blobpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -1602,6 +1602,10 @@ func (p *BlobPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool
}
}

func (p *BlobPool) SubscribeMempoolClearance(ch chan<- core.NewMempoolCleared) event.Subscription {
return nil
}

// Nonce returns the next nonce of an account, with all transactions executable
// by the pool already applied on top.
func (p *BlobPool) Nonce(addr common.Address) uint64 {
Expand Down
148 changes: 122 additions & 26 deletions core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,13 +205,14 @@ func (config *Config) sanitize() Config {
// current state) and future transactions. Transactions move between those
// two states over time as they are received and processed.
type LegacyPool struct {
config Config
chainconfig *params.ChainConfig
chain BlockChain
gasTip atomic.Pointer[uint256.Int]
txFeed event.Feed
signer types.Signer
mu sync.RWMutex
config Config
chainconfig *params.ChainConfig
chain BlockChain
gasTip atomic.Pointer[uint256.Int]
txFeed event.Feed
mempoolClearFeed event.Feed
signer types.Signer
mu sync.RWMutex

astria *astriaOrdered

Expand All @@ -238,6 +239,8 @@ type LegacyPool struct {
initDoneCh chan struct{} // is closed once the pool is initialized (for tests)

changesSinceReorg int // A counter for how many drops we've performed in-between reorg.

auctioneerEnabled bool
}

type txpoolResetRequest struct {
Expand All @@ -246,26 +249,27 @@ type txpoolResetRequest struct {

// New creates a new transaction pool to gather, sort and filter inbound
// transactions from the network.
func New(config Config, chain BlockChain) *LegacyPool {
func New(config Config, chain BlockChain, auctioneerEnabled bool) *LegacyPool {
// Sanitize the input to ensure no vulnerable gas prices are set
config = (&config).sanitize()

// Create the transaction pool with its initial settings
pool := &LegacyPool{
config: config,
chain: chain,
chainconfig: chain.Config(),
signer: types.LatestSigner(chain.Config()),
pending: make(map[common.Address]*list),
queue: make(map[common.Address]*list),
beats: make(map[common.Address]time.Time),
all: newLookup(),
reqResetCh: make(chan *txpoolResetRequest),
reqPromoteCh: make(chan *accountSet),
queueTxEventCh: make(chan *types.Transaction),
reorgDoneCh: make(chan chan struct{}),
reorgShutdownCh: make(chan struct{}),
initDoneCh: make(chan struct{}),
config: config,
chain: chain,
chainconfig: chain.Config(),
signer: types.LatestSigner(chain.Config()),
pending: make(map[common.Address]*list),
queue: make(map[common.Address]*list),
beats: make(map[common.Address]time.Time),
all: newLookup(),
reqResetCh: make(chan *txpoolResetRequest),
reqPromoteCh: make(chan *accountSet),
queueTxEventCh: make(chan *types.Transaction),
reorgDoneCh: make(chan chan struct{}),
reorgShutdownCh: make(chan struct{}),
initDoneCh: make(chan struct{}),
auctioneerEnabled: auctioneerEnabled,
}
pool.locals = newAccountSet(pool.signer)
for _, addr := range config.Locals {
Expand Down Expand Up @@ -521,6 +525,12 @@ func (pool *LegacyPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs
return pool.txFeed.Subscribe(ch)
}

// SubscribeTransactions registers a subscription for the event which is triggered
// when the mempool is cleared after a reset
func (pool *LegacyPool) SubscribeMempoolClearance(ch chan<- core.NewMempoolCleared) event.Subscription {
return pool.mempoolClearFeed.Subscribe(ch)
}

// SetGasTip updates the minimum gas tip required by the transaction pool for a
// new transaction, and drops all transactions below this threshold.
func (pool *LegacyPool) SetGasTip(tip *big.Int) {
Expand Down Expand Up @@ -1366,8 +1376,16 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
}
pool.mu.Lock()
if reset != nil {
// Reset from the old head to the new, rescheduling any reorged transactions
pool.reset(reset.oldHead, reset.newHead)
// only reset the state root and the head of the txpool when we are running the auctioneer node.
// when we are not running the auctioneer node, we re-inject any re-orged transactions which is similar
// to the current functionality of geth
if pool.auctioneerEnabled {
// only reset from the old head to the new head
pool.resetHeadOnly(reset.oldHead, reset.newHead)
} else {
// Reset from the old head to the new, rescheduling any reorged transactions
pool.reset(reset.oldHead, reset.newHead)
}

// Nonces were reset, discard any events that became stale
for addr := range events {
Expand All @@ -1376,7 +1394,6 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
delete(events, addr)
}
}
// Reset needs promote for all addresses
promoteAddrs = make([]common.Address, 0, len(pool.queue))
for addr := range pool.queue {
promoteAddrs = append(promoteAddrs, addr)
Expand All @@ -1389,7 +1406,13 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
// remove any transaction that has been included in the block or was invalidated
// because of another transaction (e.g. higher gas price).
if reset != nil {
pool.demoteUnexecutables()
if pool.auctioneerEnabled {
// if we are running the pool as an auctioneer, then we should clear the mempool each time the head
// is reset
pool.clearPendingAndQueued(reset.newHead)
} else {
pool.demoteUnexecutables()
}
if reset.newHead != nil {
if pool.chainconfig.IsLondon(new(big.Int).Add(reset.newHead.Number, big.NewInt(1))) {
pendingBaseFee := eip1559.CalcBaseFee(pool.chainconfig, reset.newHead)
Expand All @@ -1414,6 +1437,10 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
pool.changesSinceReorg = 0 // Reset change counter
pool.mu.Unlock()

if reset != nil {
pool.mempoolClearFeed.Send(core.NewMempoolCleared{NewHead: reset.newHead})
}

// Notify subsystems for newly added transactions
for _, tx := range promoted {
addr, _ := types.Sender(pool.signer, tx)
Expand Down Expand Up @@ -1511,6 +1538,7 @@ func (pool *LegacyPool) reset(oldHead, newHead *types.Header) {
}
}
}

// Initialize the internal state to the current head
if newHead == nil {
newHead = pool.chain.CurrentBlock() // Special case during testing
Expand All @@ -1524,12 +1552,30 @@ func (pool *LegacyPool) reset(oldHead, newHead *types.Header) {
pool.currentState = statedb
pool.pendingNonces = newNoncer(statedb)

// we don't care about these
// Inject any transactions discarded due to reorgs
log.Debug("Reinjecting stale transactions", "count", len(reinject))
core.SenderCacher.Recover(pool.signer, reinject)
pool.addTxsLocked(reinject, false)
}

// reset retrieves the current state of the blockchain and ensures the content
// of the transaction pool is valid with regard to the chain state.
func (pool *LegacyPool) resetHeadOnly(oldHead, newHead *types.Header) {
// Initialize the internal state to the current head
if newHead == nil {
newHead = pool.chain.CurrentBlock() // Special case during testing
}
statedb, err := pool.chain.StateAt(newHead.Root)
if err != nil {
log.Error("Failed to reset txpool state", "err", err)
return
}
pool.currentHead.Store(newHead)
pool.currentState = statedb
pool.pendingNonces = newNoncer(statedb)
}

// promoteExecutables moves transactions that have become processable from the
// future queue to the set of pending transactions. During this process, all
// invalidated transactions (low nonce, low balance) are deleted.
Expand Down Expand Up @@ -1732,6 +1778,55 @@ func (pool *LegacyPool) truncateQueue() {
}
}

// clearPendingAndQueued removes invalid and processed transactions from the pools
// it assumes that the pool lock is being held
func (pool *LegacyPool) clearPendingAndQueued(newHead *types.Header) {
// Iterate over all accounts and demote any non-executable transactions
addrsForWhichTxsRemoved := map[common.Address]bool{}

for addr, list := range pool.pending {
dropped, invalids := list.ClearList()

pendingGauge.Dec(int64(dropped.Len() + invalids.Len()))

for _, tx := range dropped {
pool.all.Remove(tx.Hash())
}
for _, tx := range invalids {
pool.all.Remove(tx.Hash())
}

if list.Empty() {
delete(pool.pending, addr)
delete(pool.beats, addr)

addrsForWhichTxsRemoved[addr] = true
}
}

for addr, list := range pool.queue {
dropped, invalids := list.ClearList()
queuedGauge.Dec(int64(dropped.Len() + invalids.Len()))

for _, tx := range dropped {
pool.all.Remove(tx.Hash())
}
for _, tx := range invalids {
pool.all.Remove(tx.Hash())
}

if list.Empty() {
delete(pool.queue, addr)

addrsForWhichTxsRemoved[addr] = true
}
}

for addr := range addrsForWhichTxsRemoved {
pool.reserve(addr, false)
}
}

// demoteUnexecutables removes invalid and processed transactions from the pools
// executable/pending queue and any subsequent transactions that become unexecutable
// are moved back into the future queue.
Expand All @@ -1742,6 +1837,7 @@ func (pool *LegacyPool) truncateQueue() {
func (pool *LegacyPool) demoteUnexecutables() {
// Iterate over all accounts and demote any non-executable transactions
gasLimit := pool.currentHead.Load().GasLimit

for addr, list := range pool.pending {
nonce := pool.currentState.GetNonce(addr)

Expand Down
Loading