diff --git a/.github/workflows/astria-build-and-publish-image.yml b/.github/workflows/astria-build-and-publish-image.yml index 73654354e..3329474bb 100644 --- a/.github/workflows/astria-build-and-publish-image.yml +++ b/.github/workflows/astria-build-and-publish-image.yml @@ -72,4 +72,4 @@ jobs: push: true tags: ${{ steps.metadata.outputs.tags }} labels: ${{ steps.metadata.outputs.labels }} - project: w2d6w0spqz \ No newline at end of file + project: w2d6w0spqz diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 15fd5e580..f2410a9ce 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -18,7 +18,7 @@ package utils import ( - optimisticGrpc "buf.build/gen/go/astria/execution-apis/grpc/go/astria/bundle/v1alpha1/bundlev1alpha1grpc" + optimisticGrpc "buf.build/gen/go/astria/execution-apis/grpc/go/astria/auction/v1alpha1/auctionv1alpha1grpc" "context" "crypto/ecdsa" "encoding/hex" @@ -2003,8 +2003,8 @@ func RegisterGraphQLService(stack *node.Node, backend ethapi.Backend, filterSyst // RegisterGRPCServices adds the gRPC API to the node. // It was done this way so that our grpc execution server can access the ethapi.Backend -func RegisterGRPCServices(stack *node.Node, execServ astriaGrpc.ExecutionServiceServer, optimisticExecutionServ optimisticGrpc.OptimisticExecutionServiceServer, bundleStreamingServ optimisticGrpc.BundleServiceServer, cfg *node.Config) { - if err := node.NewGRPCServerHandler(stack, execServ, optimisticExecutionServ, bundleStreamingServ, cfg); err != nil { +func RegisterGRPCServices(stack *node.Node, execServ astriaGrpc.ExecutionServiceServer, optimisticExecutionServ optimisticGrpc.OptimisticExecutionServiceServer, auctionServiceServer optimisticGrpc.AuctionServiceServer, cfg *node.Config) { + if err := node.NewGRPCServerHandler(stack, execServ, optimisticExecutionServ, auctionServiceServer, cfg); err != nil { Fatalf("Failed to register the gRPC service: %v", err) } } diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index 81dfb958e..b1397292a 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -1782,9 +1782,12 @@ func (pool *LegacyPool) truncateQueue() { // it assumes that the pool lock is being held func (pool *LegacyPool) clearPendingAndQueued(newHead *types.Header) { // Iterate over all accounts and demote any non-executable transactions + addrsForWhichTxsRemoved := map[common.Address]bool{} + for addr, list := range pool.pending { dropped, invalids := list.ClearList() - pendingGauge.Dec(int64(len(dropped) + len(invalids))) + + pendingGauge.Dec(int64(dropped.Len() + invalids.Len())) for _, tx := range dropped { pool.all.Remove(tx.Hash()) @@ -1796,12 +1799,14 @@ func (pool *LegacyPool) clearPendingAndQueued(newHead *types.Header) { if list.Empty() { delete(pool.pending, addr) delete(pool.beats, addr) + + addrsForWhichTxsRemoved[addr] = true } } for addr, list := range pool.queue { dropped, invalids := list.ClearList() - queuedGauge.Dec(int64(len(dropped) + len(invalids))) + queuedGauge.Dec(int64(dropped.Len() + invalids.Len())) for _, tx := range dropped { pool.all.Remove(tx.Hash()) @@ -1811,12 +1816,15 @@ func (pool *LegacyPool) clearPendingAndQueued(newHead *types.Header) { } if list.Empty() { - if _, ok := pool.queue[addr]; !ok { - pool.reserve(addr, false) - } delete(pool.queue, addr) + + addrsForWhichTxsRemoved[addr] = true } } + + for addr := range addrsForWhichTxsRemoved { + pool.reserve(addr, false) + } } // demoteUnexecutables removes invalid and processed transactions from the pools diff --git a/core/txpool/legacypool/legacypool_test.go b/core/txpool/legacypool/legacypool_test.go index 00620ec68..a1395bebc 100644 --- a/core/txpool/legacypool/legacypool_test.go +++ b/core/txpool/legacypool/legacypool_test.go @@ -689,21 +689,15 @@ func TestDropping(t *testing.T) { tx11 = transaction(11, 200, key) tx12 = transaction(12, 300, key) ) - pool.all.Add(tx0, false) - pool.priced.Put(tx0, false) - pool.promoteTx(account, tx0.Hash(), tx0) - pool.all.Add(tx1, false) - pool.priced.Put(tx1, false) - pool.promoteTx(account, tx1.Hash(), tx1) + pool.add(tx0, false) + pool.add(tx1, false) + pool.add(tx2, false) + pool.add(tx10, false) + pool.add(tx11, false) + pool.add(tx12, false) - pool.all.Add(tx2, false) - pool.priced.Put(tx2, false) - pool.promoteTx(account, tx2.Hash(), tx2) - - pool.enqueueTx(tx10.Hash(), tx10, false, true) - pool.enqueueTx(tx11.Hash(), tx11, false, true) - pool.enqueueTx(tx12.Hash(), tx12, false, true) + pool.promoteExecutables([]common.Address{account}) // Check that pre and post validations leave the pool as is if pool.pending[account].Len() != 3 { diff --git a/eth/api_backend.go b/eth/api_backend.go index 304904365..2b5c820c4 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -143,6 +143,13 @@ func (b *EthAPIBackend) BlockByNumber(ctx context.Context, number rpc.BlockNumbe } return b.eth.blockchain.GetBlock(header.Hash(), header.Number.Uint64()), nil } + if number == rpc.OptimisticBlockNumber { + header := b.eth.blockchain.CurrentOptimisticBlock() + if header == nil { + return nil, errors.New("optimistic block not found") + } + return b.eth.blockchain.GetBlock(header.Hash(), header.Number.Uint64()), nil + } return b.eth.blockchain.GetBlockByNumber(uint64(number)), nil } diff --git a/go.mod b/go.mod index 44aeb9066..55ab2532b 100644 --- a/go.mod +++ b/go.mod @@ -3,10 +3,10 @@ module github.com/ethereum/go-ethereum go 1.21 require ( - buf.build/gen/go/astria/execution-apis/grpc/go v1.5.1-00000000000000-e09c7fd3fe26.1 - buf.build/gen/go/astria/execution-apis/protocolbuffers/go v1.35.2-00000000000000-e09c7fd3fe26.1 - buf.build/gen/go/astria/primitives/protocolbuffers/go v1.35.2-00000000000000-2f2e9ce53f59.1 - buf.build/gen/go/astria/sequencerblock-apis/protocolbuffers/go v1.35.2-00000000000000-0eda7df0ee38.1 + buf.build/gen/go/astria/execution-apis/grpc/go v1.5.1-00000000000000-42cbdd5aad4c.2 + buf.build/gen/go/astria/execution-apis/protocolbuffers/go v1.36.2-00000000000000-42cbdd5aad4c.1 + buf.build/gen/go/astria/primitives/protocolbuffers/go v1.36.2-00000000000000-9a039a6ed8db.1 + buf.build/gen/go/astria/sequencerblock-apis/protocolbuffers/go v1.36.2-00000000000000-e54e1c9ad405.1 github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.2.0 github.com/Microsoft/go-winio v0.6.1 github.com/VictoriaMetrics/fastcache v1.12.1 @@ -79,7 +79,7 @@ require ( golang.org/x/time v0.5.0 golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d google.golang.org/grpc v1.64.1 - google.golang.org/protobuf v1.35.2 + google.golang.org/protobuf v1.36.2 gopkg.in/natefinch/lumberjack.v2 v2.0.0 gopkg.in/yaml.v3 v3.0.1 ) diff --git a/go.sum b/go.sum index 5ad9748dd..992acbb76 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,7 @@ +buf.build/gen/go/astria/execution-apis/grpc/go v1.5.1-00000000000000-1f40f333891d.2 h1:9rMXnvPR2EX56tMIqbhOK+DvqKjWb++p5s1/bookIl8= +buf.build/gen/go/astria/execution-apis/grpc/go v1.5.1-00000000000000-1f40f333891d.2/go.mod h1:hdCXwnxpMeoqXK5LCQ6gLMcmMLUDX8T9+hbxYrtj+wQ= +buf.build/gen/go/astria/execution-apis/grpc/go v1.5.1-00000000000000-42cbdd5aad4c.2 h1:W0lzc0sAzlzyKWWXLcuGW+GDsB9VRT+P/4ffP/hwJ4U= +buf.build/gen/go/astria/execution-apis/grpc/go v1.5.1-00000000000000-42cbdd5aad4c.2/go.mod h1:jXiXYlSxLrhrUCAIuLq4cVcfXydbsz9mRVftWx/8eGs= buf.build/gen/go/astria/execution-apis/grpc/go v1.5.1-00000000000000-cc31a327d543.1 h1:wOry49zAbse0G4mt2tFTwa4P2AUMuYCR/0mYcPrpcbs= buf.build/gen/go/astria/execution-apis/grpc/go v1.5.1-00000000000000-cc31a327d543.1/go.mod h1:+pVCkEpJNp2JtooS8NiydT7bO9+hu11XUZ5Z47DPtXo= buf.build/gen/go/astria/execution-apis/grpc/go v1.5.1-00000000000000-e09c7fd3fe26.1 h1:gS4erruX5XeMN0MZ7xe4JmEIR3uCWrvzG5HGV725WiI= @@ -10,14 +14,28 @@ buf.build/gen/go/astria/execution-apis/protocolbuffers/go v1.35.1-20241017141511 buf.build/gen/go/astria/execution-apis/protocolbuffers/go v1.35.1-20241017141511-7e4bcc0ebba5.1/go.mod h1:U4LUlabiYNYBd1pqYS9o8SsHjBRoEBysrfRVnebzJH0= buf.build/gen/go/astria/execution-apis/protocolbuffers/go v1.35.2-00000000000000-e09c7fd3fe26.1 h1:Twi169wrd7ssCnK27Bymlytv5LmvwFV0zhKhJ64nCYM= buf.build/gen/go/astria/execution-apis/protocolbuffers/go v1.35.2-00000000000000-e09c7fd3fe26.1/go.mod h1:PWzMbPHJ+Y31iNFrtSc5vy/wvm2805ZXyDZndzzFLa0= +buf.build/gen/go/astria/execution-apis/protocolbuffers/go v1.36.1-00000000000000-1f40f333891d.1 h1:CSMft5/33d/88j3ziC4zid4DOP7X1Xv71I6pW3BUOvA= +buf.build/gen/go/astria/execution-apis/protocolbuffers/go v1.36.1-00000000000000-1f40f333891d.1/go.mod h1:7azHjtjY3sk38xuZGlf2X6DpAPgQMoeZZMix+JkqsdU= +buf.build/gen/go/astria/execution-apis/protocolbuffers/go v1.36.2-00000000000000-1f40f333891d.1 h1:cRvRFDg3/KPgEB2+8/orNwCWBhZO0wVZKij4TTKBj9w= +buf.build/gen/go/astria/execution-apis/protocolbuffers/go v1.36.2-00000000000000-1f40f333891d.1/go.mod h1:oB3M+Fq9RgyUWGMqYk2FqRobQpdH1yZQZ9TYOoc4yIw= +buf.build/gen/go/astria/execution-apis/protocolbuffers/go v1.36.2-00000000000000-42cbdd5aad4c.1 h1:GnqNuwC6UjXvtjGscDekiO+/lstY7NWOILlsOMGNpC4= +buf.build/gen/go/astria/execution-apis/protocolbuffers/go v1.36.2-00000000000000-42cbdd5aad4c.1/go.mod h1:oB3M+Fq9RgyUWGMqYk2FqRobQpdH1yZQZ9TYOoc4yIw= buf.build/gen/go/astria/primitives/protocolbuffers/go v1.35.1-20240911152449-eeebd3decdce.1 h1:kG4riHqlF9X6iZ1Oxs5/6ul6aue7MS+A6DK6HAchuTk= buf.build/gen/go/astria/primitives/protocolbuffers/go v1.35.1-20240911152449-eeebd3decdce.1/go.mod h1:n9L7X3VAj4od4VHf2ScJuHARUUQTSxJqtRHZk/7Ptt0= buf.build/gen/go/astria/primitives/protocolbuffers/go v1.35.2-00000000000000-2f2e9ce53f59.1 h1:C1bT0G1In6Z6tBERd1XqwDjdxTK+PatSOJYlVk5Is60= buf.build/gen/go/astria/primitives/protocolbuffers/go v1.35.2-00000000000000-2f2e9ce53f59.1/go.mod h1:I9FcB1oNqT1nI+ny0GD8gF9YrIYrHmczgNu6MTE9fAo= +buf.build/gen/go/astria/primitives/protocolbuffers/go v1.36.1-00000000000000-9a039a6ed8db.1 h1:v+RKpd5zE6rqOMA44OLRpDLPYlakjmddvmFFrKxzb48= +buf.build/gen/go/astria/primitives/protocolbuffers/go v1.36.1-00000000000000-9a039a6ed8db.1/go.mod h1:HnX2FkSKZuD3zPFBR+Q17WzloqvIbFd0pYE++or/x2Q= +buf.build/gen/go/astria/primitives/protocolbuffers/go v1.36.2-00000000000000-9a039a6ed8db.1 h1:inT/lOAbHunpGP9YLqtAQNssrxEIgH/OmxXNwbXjUqs= +buf.build/gen/go/astria/primitives/protocolbuffers/go v1.36.2-00000000000000-9a039a6ed8db.1/go.mod h1:Lk1TBSGhOGvbtj0lb7eTeq+Z4N86/67Ay+WWxbqhh6s= buf.build/gen/go/astria/sequencerblock-apis/protocolbuffers/go v1.35.1-20241017141511-71aab1871615.1 h1:hPMoxTiT7jJjnIbWqneBbL05VeVOTD9UeC/qdvzHL8g= buf.build/gen/go/astria/sequencerblock-apis/protocolbuffers/go v1.35.1-20241017141511-71aab1871615.1/go.mod h1:2uasRFMH+a3DaF34c1o+w7/YtYnoknmARyYpb9W2QIc= buf.build/gen/go/astria/sequencerblock-apis/protocolbuffers/go v1.35.2-00000000000000-0eda7df0ee38.1 h1:uJm/22xugluY5AL2NkIDbNEFBxzN6UcI8vts/bGEDBs= buf.build/gen/go/astria/sequencerblock-apis/protocolbuffers/go v1.35.2-00000000000000-0eda7df0ee38.1/go.mod h1:1Z9P18WNTOT+KvLlc0+2FkcBJ7l5eRUUFcnOxHmLeRA= +buf.build/gen/go/astria/sequencerblock-apis/protocolbuffers/go v1.36.1-00000000000000-e54e1c9ad405.1 h1:querphz/TCGphT0qGG4DJo6p8qAsfL5/8SEBgfemVhk= +buf.build/gen/go/astria/sequencerblock-apis/protocolbuffers/go v1.36.1-00000000000000-e54e1c9ad405.1/go.mod h1:D6ou7OxkQXmiZDDNNrT147dA9wC9rhJPchCIfVbw9wM= +buf.build/gen/go/astria/sequencerblock-apis/protocolbuffers/go v1.36.2-00000000000000-e54e1c9ad405.1 h1:n2embOKwJS+YIyjHRDvOAo7c/kuv3fw9U+gQ/g2Yis8= +buf.build/gen/go/astria/sequencerblock-apis/protocolbuffers/go v1.36.2-00000000000000-e54e1c9ad405.1/go.mod h1:dHPKfn7RW6FSo7EkD0LqPhZUmRm5NXMB+tWvTrTnZTQ= cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU= @@ -923,6 +941,10 @@ google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFyt google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= google.golang.org/protobuf v1.35.2 h1:8Ar7bF+apOIoThw1EdZl0p1oWvMqTHmpA2fRTyZO8io= google.golang.org/protobuf v1.35.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +google.golang.org/protobuf v1.36.1 h1:yBPeRvTftaleIgM3PZ/WBIZ7XM/eEYAaEyCwvyjq/gk= +google.golang.org/protobuf v1.36.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +google.golang.org/protobuf v1.36.2 h1:R8FeyR1/eLmkutZOM5CWghmo5itiG9z0ktFlTVLuTmU= +google.golang.org/protobuf v1.36.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/grpc/execution/server.go b/grpc/execution/server.go index 49f12c7b4..1bb17cb1e 100644 --- a/grpc/execution/server.go +++ b/grpc/execution/server.go @@ -100,7 +100,7 @@ func (s *ExecutionServiceServerV1) GetBlock(ctx context.Context, req *astriaPb.G res, err := s.getBlockFromIdentifier(req.GetIdentifier()) if err != nil { log.Error("failed finding block", err) - return nil, err + return nil, shared.WrapError(err, "failed finding block") } log.Debug("GetBlock completed", "request", req, "response", res) @@ -125,7 +125,7 @@ func (s *ExecutionServiceServerV1) BatchGetBlocks(ctx context.Context, req *astr block, err := s.getBlockFromIdentifier(id) if err != nil { log.Error("failed finding block with id", id, "error", err) - return nil, err + return nil, shared.WrapError(err, fmt.Sprintf("failed finding block with id %s", id.String())) } blocks = append(blocks, block) @@ -190,7 +190,7 @@ func (s *ExecutionServiceServerV1) ExecuteBlock(ctx context.Context, req *astria payload, err := s.Eth().Miner().BuildPayload(payloadAttributes) if err != nil { log.Error("failed to build payload", "err", err) - return nil, status.Error(codes.InvalidArgument, "Could not build block with provided txs") + return nil, status.Errorf(codes.InvalidArgument, shared.WrapError(err, "Could not build block with provided txs").Error()) } // call blockchain.InsertChain to actually execute and write the blocks to @@ -198,12 +198,12 @@ func (s *ExecutionServiceServerV1) ExecuteBlock(ctx context.Context, req *astria block, err := engine.ExecutableDataToBlock(*payload.Resolve().ExecutionPayload, nil, nil) if err != nil { log.Error("failed to convert executable data to block", err) - return nil, status.Error(codes.Internal, "failed to execute block") + return nil, status.Error(codes.Internal, shared.WrapError(err, "failed to convert executable data to block").Error()) } err = s.Bc().InsertBlockWithoutSetHead(block) if err != nil { log.Error("failed to insert block to chain", "hash", block.Hash(), "prevHash", req.PrevBlockHash, "err", err) - return nil, status.Error(codes.Internal, "failed to insert block to chain") + return nil, status.Error(codes.Internal, shared.WrapError(err, "failed to insert block to chain").Error()) } // remove txs from original mempool @@ -244,12 +244,12 @@ func (s *ExecutionServiceServerV1) GetCommitmentState(ctx context.Context, req * softBlock, err := ethHeaderToExecutionBlock(s.Bc().CurrentSafeBlock()) if err != nil { log.Error("error finding safe block", err) - return nil, status.Error(codes.Internal, "could not locate soft block") + return nil, status.Error(codes.Internal, shared.WrapError(err, "could not locate soft block").Error()) } firmBlock, err := ethHeaderToExecutionBlock(s.Bc().CurrentFinalBlock()) if err != nil { log.Error("error finding final block", err) - return nil, status.Error(codes.Internal, "could not locate firm block") + return nil, status.Error(codes.Internal, shared.WrapError(err, "could not locate firm block").Error()) } celestiaBlock := s.Bc().CurrentBaseCelestiaHeight() @@ -312,7 +312,7 @@ func (s *ExecutionServiceServerV1) UpdateCommitmentState(ctx context.Context, re if currentHead != softEthHash { if _, err := s.Bc().SetCanonical(softBlock); err != nil { log.Error("failed updating canonical chain to soft block", err) - return nil, status.Error(codes.Internal, "Could not update head to safe hash") + return nil, status.Error(codes.Internal, shared.WrapError(err, "Could not update head to safe hash").Error()) } } @@ -368,7 +368,7 @@ func (s *ExecutionServiceServerV1) getBlockFromIdentifier(identifier *astriaPb.B res, err := ethHeaderToExecutionBlock(header) if err != nil { // This should never happen since we validate header exists above. - return nil, status.Error(codes.Internal, "internal error") + return nil, status.Error(codes.Internal, shared.WrapError(err, "internal error").Error()) } return res, nil diff --git a/grpc/optimistic/server.go b/grpc/optimistic/server.go index 20202fecc..52cf8116c 100644 --- a/grpc/optimistic/server.go +++ b/grpc/optimistic/server.go @@ -1,12 +1,11 @@ package optimistic import ( - optimisticGrpc "buf.build/gen/go/astria/execution-apis/grpc/go/astria/bundle/v1alpha1/bundlev1alpha1grpc" - optimsticPb "buf.build/gen/go/astria/execution-apis/protocolbuffers/go/astria/bundle/v1alpha1" + optimisticGrpc "buf.build/gen/go/astria/execution-apis/grpc/go/astria/auction/v1alpha1/auctionv1alpha1grpc" + optimsticPb "buf.build/gen/go/astria/execution-apis/protocolbuffers/go/astria/auction/v1alpha1" astriaPb "buf.build/gen/go/astria/execution-apis/protocolbuffers/go/astria/execution/v1" "context" "errors" - "fmt" "github.com/ethereum/go-ethereum/beacon/engine" "github.com/ethereum/go-ethereum/common" cmath "github.com/ethereum/go-ethereum/common/math" @@ -29,7 +28,7 @@ import ( type OptimisticServiceV1Alpha1 struct { optimisticGrpc.UnimplementedOptimisticExecutionServiceServer - optimisticGrpc.UnimplementedBundleServiceServer + optimisticGrpc.UnimplementedAuctionServiceServer sharedServiceContainer *shared.SharedServiceContainer @@ -39,6 +38,8 @@ type OptimisticServiceV1Alpha1 struct { var ( executeOptimisticBlockRequestCount = metrics.GetOrRegisterCounter("astria/optimistic/execute_optimistic_block_requests", nil) executeOptimisticBlockSuccessCount = metrics.GetOrRegisterCounter("astria/optimistic/execute_optimistic_block_success", nil) + optimisticBlockHeight = metrics.GetOrRegisterGauge("astria/execution/optimistic_block_height", nil) + txsStreamedCount = metrics.GetOrRegisterCounter("astria/optimistic/txs_streamed", nil) executionOptimisticBlockTimer = metrics.GetOrRegisterTimer("astria/optimistic/execute_optimistic_block_time", nil) ) @@ -53,7 +54,9 @@ func NewOptimisticServiceV1Alpha(sharedServiceContainer *shared.SharedServiceCon return optimisticService } -func (o *OptimisticServiceV1Alpha1) GetBundleStream(_ *optimsticPb.GetBundleStreamRequest, stream optimisticGrpc.BundleService_GetBundleStreamServer) error { +func (o *OptimisticServiceV1Alpha1) GetBidStream(_ *optimsticPb.GetBidStreamRequest, stream optimisticGrpc.AuctionService_GetBidStreamServer) error { + log.Debug("GetBidStream called") + pendingTxEventCh := make(chan core.NewTxsEvent) pendingTxEvent := o.Eth().TxPool().SubscribeTransactions(pendingTxEventCh, false) defer pendingTxEvent.Unsubscribe() @@ -66,7 +69,7 @@ func (o *OptimisticServiceV1Alpha1) GetBundleStream(_ *optimsticPb.GetBundleStre optimisticBlock := o.Eth().BlockChain().CurrentOptimisticBlock() for _, pendingTx := range pendingTxs.Txs { - bundle := optimsticPb.Bundle{} + bid := optimsticPb.Bid{} totalCost := big.NewInt(0) effectiveTip := cmath.BigMin(pendingTx.GasTipCap(), new(big.Int).Sub(pendingTx.GasFeeCap(), optimisticBlock.BaseFee)) @@ -75,32 +78,41 @@ func (o *OptimisticServiceV1Alpha1) GetBundleStream(_ *optimsticPb.GetBundleStre marshalledTxs := [][]byte{} marshalledTx, err := pendingTx.MarshalBinary() if err != nil { - return status.Errorf(codes.Internal, "error marshalling tx: %v", err) + return status.Errorf(codes.Internal, shared.WrapError(err, "error marshalling tx").Error()) } marshalledTxs = append(marshalledTxs, marshalledTx) - bundle.Fee = totalCost.Uint64() - bundle.Transactions = marshalledTxs - bundle.BaseSequencerBlockHash = *o.currentOptimisticSequencerBlock.Load() - bundle.PrevRollupBlockHash = optimisticBlock.Hash().Bytes() + bid.Fee = totalCost.Uint64() + bid.Transactions = marshalledTxs + bid.SequencerParentBlockHash = *o.currentOptimisticSequencerBlock.Load() + bid.RollupParentBlockHash = optimisticBlock.Hash().Bytes() - err = stream.Send(&optimsticPb.GetBundleStreamResponse{Bundle: &bundle}) + txsStreamedCount.Inc(1) + err = stream.Send(&optimsticPb.GetBidStreamResponse{Bid: &bid}) if err != nil { - return status.Errorf(codes.Internal, "error sending bundle over stream: %v", err) + log.Error("error sending bid over stream", "err", err) + return status.Error(codes.Internal, shared.WrapError(err, "error sending bid over stream").Error()) } } case err := <-pendingTxEvent.Err(): - return status.Errorf(codes.Internal, "error waiting for pending transactions: %v", err) + if err != nil { + log.Error("error waiting for pending transactions", "err", err) + return status.Error(codes.Internal, shared.WrapError(err, "error waiting for pending transactions").Error()) + } else { + // TODO - what is the right error code here? + return status.Error(codes.Internal, "tx pool subscription closed") + } case <-stream.Context().Done(): - log.Debug("GetBundleStream stream closed with error", "err", stream.Context().Err()) return stream.Context().Err() } } } func (o *OptimisticServiceV1Alpha1) ExecuteOptimisticBlockStream(stream optimisticGrpc.OptimisticExecutionService_ExecuteOptimisticBlockStreamServer) error { + log.Debug("ExecuteOptimisticBlockStream called") + mempoolClearingEventCh := make(chan core.NewMempoolCleared) mempoolClearingEvent := o.Eth().TxPool().SubscribeMempoolClearance(mempoolClearingEventCh) defer mempoolClearingEvent.Unsubscribe() @@ -115,12 +127,14 @@ func (o *OptimisticServiceV1Alpha1) ExecuteOptimisticBlockStream(stream optimist return err } + executeOptimisticBlockRequestCount.Inc(1) + baseBlock := msg.GetBaseBlock() // execute the optimistic block and wait for the mempool clearing event optimisticBlock, err := o.ExecuteOptimisticBlock(stream.Context(), baseBlock) if err != nil { - return status.Error(codes.Internal, "failed to execute optimistic block") + return status.Errorf(codes.Internal, shared.WrapError(err, "failed to execute optimistic block").Error()) } optimisticBlockHash := common.BytesToHash(optimisticBlock.Hash) @@ -131,14 +145,24 @@ func (o *OptimisticServiceV1Alpha1) ExecuteOptimisticBlockStream(stream optimist return status.Error(codes.Internal, "failed to clear mempool after optimistic block execution") } o.currentOptimisticSequencerBlock.Store(&baseBlock.SequencerBlockHash) + executeOptimisticBlockSuccessCount.Inc(1) err = stream.Send(&optimsticPb.ExecuteOptimisticBlockStreamResponse{ Block: optimisticBlock, BaseSequencerBlockHash: baseBlock.SequencerBlockHash, }) case <-time.After(500 * time.Millisecond): + log.Error("timed out waiting for mempool to clear after optimistic block execution") return status.Error(codes.DeadlineExceeded, "timed out waiting for mempool to clear after optimistic block execution") case err := <-mempoolClearingEvent.Err(): - return status.Errorf(codes.Internal, "error waiting for mempool clearing event: %v", err) + if err != nil { + log.Error("error waiting for mempool clearing event", "err", err) + return status.Errorf(codes.Internal, shared.WrapError(err, "error waiting for mempool clearing event").Error()) + } else { + // TODO - what is the right error code here? + return status.Error(codes.Internal, "mempool clearance subscription closed") + } + case <-stream.Context().Done(): + return stream.Context().Err() } } } @@ -146,26 +170,23 @@ func (o *OptimisticServiceV1Alpha1) ExecuteOptimisticBlockStream(stream optimist func (o *OptimisticServiceV1Alpha1) ExecuteOptimisticBlock(ctx context.Context, req *optimsticPb.BaseBlock) (*astriaPb.Block, error) { // we need to execute the optimistic block log.Debug("ExecuteOptimisticBlock called", "timestamp", req.Timestamp, "sequencer_block_hash", req.SequencerBlockHash) - executeOptimisticBlockRequestCount.Inc(1) + + // Deliberately called after lock, to more directly measure the time spent executing + executionStart := time.Now() + defer executionOptimisticBlockTimer.UpdateSince(executionStart) if err := validateStaticExecuteOptimisticBlockRequest(req); err != nil { log.Error("ExecuteOptimisticBlock called with invalid BaseBlock", "err", err) - return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("BaseBlock is invalid: %s", err.Error())) + return nil, status.Error(codes.InvalidArgument, shared.WrapError(err, "invalid BaseBlock").Error()) } if !o.SyncMethodsCalled() { return nil, status.Error(codes.PermissionDenied, "Cannot execute block until GetGenesisInfo && GetCommitmentState methods are called") } - // Deliberately called after lock, to more directly measure the time spent executing - executionStart := time.Now() - defer executionOptimisticBlockTimer.UpdateSince(executionStart) - softBlock := o.Bc().CurrentSafeBlock() - o.BlockExecutionLock().Lock() nextFeeRecipient := o.NextFeeRecipient() - o.BlockExecutionLock().Unlock() // the height that this block will be at height := o.Bc().CurrentBlock().Number.Uint64() + 1 @@ -186,13 +207,13 @@ func (o *OptimisticServiceV1Alpha1) ExecuteOptimisticBlock(ctx context.Context, payload, err := o.Eth().Miner().BuildPayload(payloadAttributes) if err != nil { log.Error("failed to build payload", "err", err) - return nil, status.Error(codes.InvalidArgument, "Could not build block with provided txs") + return nil, status.Errorf(codes.InvalidArgument, shared.WrapError(err, "failed to build payload").Error()) } block, err := engine.ExecutableDataToBlock(*payload.Resolve().ExecutionPayload, nil, nil) if err != nil { log.Error("failed to convert executable data to block", err) - return nil, status.Error(codes.Internal, "failed to execute block") + return nil, status.Error(codes.Internal, shared.WrapError(err, "failed to convert executable data to block").Error()) } // this will insert the optimistic block into the chain and persist it's state without @@ -200,11 +221,13 @@ func (o *OptimisticServiceV1Alpha1) ExecuteOptimisticBlock(ctx context.Context, err = o.Bc().InsertBlockWithoutSetHead(block) if err != nil { log.Error("failed to insert block to chain", "hash", block.Hash(), "prevHash", block.ParentHash(), "err", err) - return nil, status.Error(codes.Internal, "failed to insert block to chain") + return nil, status.Error(codes.Internal, shared.WrapError(err, "failed to insert block to chain").Error()) } // we store a pointer to the optimistic block in the chain so that we can use it // to retrieve the state of the optimistic block + // this method also sends an event which indicates that a new optimistic block has been set + // the mempool clearing logic is triggered when this event is received o.Bc().SetOptimistic(block) res := &astriaPb.Block{ @@ -216,8 +239,9 @@ func (o *OptimisticServiceV1Alpha1) ExecuteOptimisticBlock(ctx context.Context, }, } + optimisticBlockHeight.Update(int64(block.NumberU64())) + log.Info("ExecuteOptimisticBlock completed", "block_num", res.Number, "timestamp", res.Timestamp) - executeOptimisticBlockSuccessCount.Inc(1) return res, nil } diff --git a/grpc/optimistic/server_test.go b/grpc/optimistic/server_test.go index ff2359520..538b0433b 100644 --- a/grpc/optimistic/server_test.go +++ b/grpc/optimistic/server_test.go @@ -1,7 +1,7 @@ package optimistic import ( - optimsticPb "buf.build/gen/go/astria/execution-apis/protocolbuffers/go/astria/bundle/v1alpha1" + optimsticPb "buf.build/gen/go/astria/execution-apis/protocolbuffers/go/astria/auction/v1alpha1" astriaPb "buf.build/gen/go/astria/execution-apis/protocolbuffers/go/astria/execution/v1" primitivev1 "buf.build/gen/go/astria/primitives/protocolbuffers/go/astria/primitive/v1" sequencerblockv1 "buf.build/gen/go/astria/sequencerblock-apis/protocolbuffers/go/astria/sequencerblock/v1" @@ -193,7 +193,7 @@ func TestOptimisticServiceServerV1Alpha1_ExecuteOptimisticBlock(t *testing.T) { } } -func TestNewOptimisticServiceServerV1Alpha_StreamBundles(t *testing.T) { +func TestNewOptimisticServiceServerV1Alpha_StreamBids(t *testing.T) { ethservice, sharedService, _, _ := shared.SetupSharedService(t, 10) optimisticServiceV1Alpha1 := SetupOptimisticService(t, sharedService) @@ -286,13 +286,13 @@ func TestNewOptimisticServiceServerV1Alpha_StreamBundles(t *testing.T) { require.Equal(t, pending, 0, "Mempool should have 0 pending txs") require.Equal(t, queued, 0, "Mempool should have 0 queued txs") - mockServerSideStreaming := MockServerSideStreaming[optimsticPb.GetBundleStreamResponse]{ - sentResponses: []*optimsticPb.GetBundleStreamResponse{}, + mockServerSideStreaming := MockServerSideStreaming[optimsticPb.GetBidStreamResponse]{ + sentResponses: []*optimsticPb.GetBidStreamResponse{}, } errorCh = make(chan error) go func() { - errorCh <- optimisticServiceV1Alpha1.GetBundleStream(&optimsticPb.GetBundleStreamRequest{}, &mockServerSideStreaming) + errorCh <- optimisticServiceV1Alpha1.GetBidStream(&optimsticPb.GetBidStreamRequest{}, &mockServerSideStreaming) }() stateDb, err := ethservice.BlockChain().StateAt(currentOptimisticBlock.Root) @@ -334,26 +334,26 @@ func TestNewOptimisticServiceServerV1Alpha_StreamBundles(t *testing.T) { select { case err := <-errorCh: - require.ErrorContains(t, err, "error waiting for pending transactions") + require.ErrorContains(t, err, "tx pool subscription closed") } require.Len(t, mockServerSideStreaming.sentResponses, 5, "Number of responses should match the number of requests") txIndx := 0 for _, resp := range mockServerSideStreaming.sentResponses { - bundle := resp.GetBundle() + bid := resp.GetBid() - require.Len(t, bundle.Transactions, 1, "Bundle should have 1 tx") + require.Len(t, bid.Transactions, 1, "Bid should have 1 tx") - receivedTx := bundle.Transactions[0] + receivedTx := bid.Transactions[0] sentTx := txs[txIndx] marshalledSentTx, err := sentTx.MarshalBinary() require.Nil(t, err, "Failed to marshal tx") require.True(t, bytes.Equal(receivedTx, marshalledSentTx), "Received tx does not match sent tx") txIndx += 1 - require.True(t, bytes.Equal(bundle.PrevRollupBlockHash, currentOptimisticBlock.Hash().Bytes()), "PrevRollupBlockHash should match the current optimistic block hash") - require.True(t, bytes.Equal(bundle.BaseSequencerBlockHash, *optimisticServiceV1Alpha1.currentOptimisticSequencerBlock.Load()), "BaseSequencerBlockHash should match the current optimistic sequencer block hash") + require.True(t, bytes.Equal(bid.RollupParentBlockHash, currentOptimisticBlock.Hash().Bytes()), "PrevRollupBlockHash should match the current optimistic block hash") + require.True(t, bytes.Equal(bid.SequencerParentBlockHash, *optimisticServiceV1Alpha1.currentOptimisticSequencerBlock.Load()), "BaseSequencerBlockHash should match the current optimistic sequencer block hash") } } diff --git a/grpc/optimistic/validation.go b/grpc/optimistic/validation.go index a59420d73..cbd6c62e6 100644 --- a/grpc/optimistic/validation.go +++ b/grpc/optimistic/validation.go @@ -1,7 +1,7 @@ package optimistic import ( - optimsticPb "buf.build/gen/go/astria/execution-apis/protocolbuffers/go/astria/bundle/v1alpha1" + optimsticPb "buf.build/gen/go/astria/execution-apis/protocolbuffers/go/astria/auction/v1alpha1" "fmt" ) diff --git a/grpc/shared/container.go b/grpc/shared/container.go index bf5e23b78..263971c5e 100644 --- a/grpc/shared/container.go +++ b/grpc/shared/container.go @@ -28,8 +28,7 @@ type SharedServiceContainer struct { // auctioneer address is a bech32m address auctioneerAddress atomic.Pointer[string] - // TODO: bharath - we could make this an atomic pointer??? - nextFeeRecipient common.Address // Fee recipient for the next block + nextFeeRecipient atomic.Pointer[common.Address] // Fee recipient for the next block } func NewSharedServiceContainer(eth *eth.Ethereum) (*SharedServiceContainer, error) { @@ -123,10 +122,10 @@ func NewSharedServiceContainer(eth *eth.Ethereum) (*SharedServiceContainer, erro bc: bc, bridgeAddresses: bridgeAddresses, bridgeAllowedAssets: bridgeAllowedAssets, - nextFeeRecipient: nextFeeRecipient, } sharedServiceContainer.SetAuctioneerAddress(auctioneerAddress) + sharedServiceContainer.SetNextFeeRecipient(nextFeeRecipient) return sharedServiceContainer, nil } @@ -168,12 +167,12 @@ func (s *SharedServiceContainer) BlockExecutionLock() *sync.Mutex { } func (s *SharedServiceContainer) NextFeeRecipient() common.Address { - return s.nextFeeRecipient + return *s.nextFeeRecipient.Load() } // assumes that the block execution lock is being held func (s *SharedServiceContainer) SetNextFeeRecipient(nextFeeRecipient common.Address) { - s.nextFeeRecipient = nextFeeRecipient + s.nextFeeRecipient.Store(&nextFeeRecipient) } func (s *SharedServiceContainer) BridgeAddresses() map[string]*params.AstriaBridgeAddressConfig { diff --git a/grpc/shared/validation.go b/grpc/shared/validation.go index b6cd82839..90ae13619 100644 --- a/grpc/shared/validation.go +++ b/grpc/shared/validation.go @@ -1,23 +1,40 @@ package shared import ( - bundlev1alpha1 "buf.build/gen/go/astria/execution-apis/protocolbuffers/go/astria/bundle/v1alpha1" + auctionv1alpha1 "buf.build/gen/go/astria/execution-apis/protocolbuffers/go/astria/auction/v1alpha1" primitivev1 "buf.build/gen/go/astria/primitives/protocolbuffers/go/astria/primitive/v1" sequencerblockv1 "buf.build/gen/go/astria/sequencerblock-apis/protocolbuffers/go/astria/sequencerblock/v1" "bytes" "crypto/ed25519" "crypto/sha256" + "errors" "fmt" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/contracts" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/params" "github.com/golang/protobuf/proto" - "github.com/pkg/errors" + proto2 "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" "math/big" + "time" ) +var ( + successfulUnbundledAllocations = metrics.GetOrRegisterGauge("astria/optimistic/successful_unbundled_allocations", nil) + allocationsWithInvalidPrevBlockHash = metrics.GetOrRegisterGauge("astria/optimistic/allocations_with_invalid_prev_block_hash", nil) + allocationsWithInvalidPubKey = metrics.GetOrRegisterGauge("astria/optimistic/allocations_with_invalid_pub_key", nil) + allocationsWithInvalidSignature = metrics.GetOrRegisterGauge("astria/optimistic/allocations_with_invalid_signature", nil) + + allocationUnbundlingTimer = metrics.GetOrRegisterTimer("astria/optimistic/allocation_unbundling_time", nil) +) + +func WrapError(err error, msg string) error { + return fmt.Errorf("%s: %w", msg, err) +} + func protoU128ToBigInt(u128 *primitivev1.Uint128) *big.Int { lo := big.NewInt(0).SetUint64(u128.Lo) hi := big.NewInt(0).SetUint64(u128.Hi) @@ -113,43 +130,65 @@ func validateAndUnmarshallSequenceAction(tx *sequencerblockv1.RollupData) (*type return ethTx, nil } -func unmarshallAllocationTxs(allocation *bundlev1alpha1.Allocation, prevBlockHash []byte, auctioneerBech32Address string, addressPrefix string) (types.Transactions, error) { +func unmarshallAllocationTxs(allocation *auctionv1alpha1.Allocation, prevBlockHash []byte, auctioneerBech32Address string, addressPrefix string) (types.Transactions, error) { + unbundlingStart := time.Now() + defer allocationUnbundlingTimer.UpdateSince(unbundlingStart) + processedTxs := types.Transactions{} - payload := allocation.GetPayload() + bid := &auctionv1alpha1.Bid{} + + unprocessedBid := allocation.GetBid() - if !bytes.Equal(payload.PrevRollupBlockHash, prevBlockHash) { - return nil, errors.New("prev block hash do not match in allocation") + err := anypb.UnmarshalTo(unprocessedBid, bid, proto2.UnmarshalOptions{ + Merge: false, + AllowPartial: false, + }) + if err != nil { + return nil, WrapError(err, "failed to unmarshal bid") + } + + log.Debug("Found a potential allocation in the rollup data. Checking if it is valid.", "prevBlockHash", common.BytesToHash(prevBlockHash).String(), "auctioneerBech32Address", auctioneerBech32Address) + + if !bytes.Equal(bid.GetRollupParentBlockHash(), prevBlockHash) { + allocationsWithInvalidPrevBlockHash.Inc(1) + return nil, errors.New("prev block hash in allocation does not match the previous block hash") } publicKey := ed25519.PublicKey(allocation.GetPublicKey()) bech32Address, err := EncodeFromPublicKey(addressPrefix, publicKey) if err != nil { - return nil, errors.Wrapf(err, "failed to encode public key to bech32m address: %s", publicKey) + return nil, WrapError(err, fmt.Sprintf("failed to encode public key to bech32m address: %s", publicKey)) } + if auctioneerBech32Address != bech32Address { - return nil, errors.Errorf("address in allocation does not match auctioneer address. expected: %s, got: %s", auctioneerBech32Address, bech32Address) + allocationsWithInvalidPubKey.Inc(1) + return nil, fmt.Errorf("address in allocation does not match auctioneer address. expected: %s, got: %s", auctioneerBech32Address, bech32Address) } - message, err := proto.Marshal(allocation.GetPayload()) + message, err := proto.Marshal(bid) if err != nil { - return nil, errors.Wrap(err, "failed to marshal allocation") + return nil, WrapError(err, "failed to marshal allocation to verify signature") } signature := allocation.GetSignature() if !ed25519.Verify(publicKey, message, signature) { - return nil, errors.New("failed to verify signature") + allocationsWithInvalidSignature.Inc(1) + return nil, fmt.Errorf("signature in allocation does not match the public key") } - // unmarshall the transactions in the bundle - for _, allocationTx := range payload.GetTransactions() { + log.Debug("Allocation is valid. Unmarshalling the transactions in the bid.") + // unmarshall the transactions in the bid + for _, allocationTx := range bid.GetTransactions() { ethtx := new(types.Transaction) err := ethtx.UnmarshalBinary(allocationTx) if err != nil { - return nil, errors.Wrap(err, "failed to unmarshall allocation transaction") + return nil, WrapError(err, "failed to unmarshall allocation transaction") } processedTxs = append(processedTxs, ethtx) } + successfulUnbundledAllocations.Inc(1) + return processedTxs, nil } @@ -158,10 +197,11 @@ func unmarshallAllocationTxs(allocation *bundlev1alpha1.Allocation, prevBlockHas // TODO - this function has become too big. we should start breaking it down func UnbundleRollupDataTransactions(txs []*sequencerblockv1.RollupData, height uint64, bridgeAddresses map[string]*params.AstriaBridgeAddressConfig, bridgeAllowedAssets map[string]struct{}, prevBlockHash []byte, auctioneerBech32Address string, addressPrefix string) types.Transactions { + processedTxs := types.Transactions{} allocationTxs := types.Transactions{} - // we just return the allocation here and do not unmarshall the transactions in the bundle if we find it - var allocation *bundlev1alpha1.Allocation + // we just return the allocation here and do not unmarshall the transactions in the bid if we find it + var allocation *auctionv1alpha1.Allocation for _, tx := range txs { if deposit := tx.GetDeposit(); deposit != nil { depositTx, err := validateAndUnmarshalDepositTx(deposit, height, bridgeAddresses, bridgeAllowedAssets) @@ -176,7 +216,7 @@ func UnbundleRollupDataTransactions(txs []*sequencerblockv1.RollupData, height u // check if sequence data is of type Allocation if allocation == nil { // TODO - check if we can avoid a temp value - tempAllocation := &bundlev1alpha1.Allocation{} + tempAllocation := &auctionv1alpha1.Allocation{} err := proto.Unmarshal(sequenceData, tempAllocation) if err == nil { unmarshalledAllocationTxs, err := unmarshallAllocationTxs(tempAllocation, prevBlockHash, auctioneerBech32Address, addressPrefix) diff --git a/grpc/shared/validation_test.go b/grpc/shared/validation_test.go index 57c404366..3292d5334 100644 --- a/grpc/shared/validation_test.go +++ b/grpc/shared/validation_test.go @@ -1,11 +1,12 @@ package shared import ( - bundlev1alpha1 "buf.build/gen/go/astria/execution-apis/protocolbuffers/go/astria/bundle/v1alpha1" + auctionv1alpha1 "buf.build/gen/go/astria/execution-apis/protocolbuffers/go/astria/auction/v1alpha1" "bytes" "crypto/ecdsa" "crypto/ed25519" "github.com/golang/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" "math/big" "testing" @@ -20,6 +21,25 @@ import ( "github.com/stretchr/testify/require" ) +type allocationInfo struct { + signature []byte + publicKey []byte + bid *auctionv1alpha1.Bid +} + +func (a *allocationInfo) convertToAllocation() (*auctionv1alpha1.Allocation, error) { + convertedBid, err := anypb.New(a.bid) + if err != nil { + return nil, err + } + + return &auctionv1alpha1.Allocation{ + Signature: a.signature, + PublicKey: a.publicKey, + Bid: convertedBid, + }, nil +} + func transaction(nonce uint64, gaslimit uint64, key *ecdsa.PrivateKey) *types.Transaction { return pricedTransaction(nonce, gaslimit, big.NewInt(1), key) } @@ -88,14 +108,14 @@ func TestUnmarshallAllocationTxs(t *testing.T) { validMarshalledTx3, err := tx3.MarshalBinary() require.NoError(t, err, "failed to marshal valid tx: %v", err) - validPayload := &bundlev1alpha1.Bundle{ - Fee: 100, - Transactions: [][]byte{validMarshalledTx1, validMarshalledTx2, validMarshalledTx3}, - BaseSequencerBlockHash: []byte("sequencer block hash"), - PrevRollupBlockHash: []byte("prev rollup block hash"), + validBid := &auctionv1alpha1.Bid{ + Fee: 100, + Transactions: [][]byte{validMarshalledTx1, validMarshalledTx2, validMarshalledTx3}, + SequencerParentBlockHash: []byte("sequencer block hash"), + RollupParentBlockHash: []byte("prev rollup block hash"), } - marshalledAllocation, err := proto.Marshal(validPayload) + marshalledAllocation, err := proto.Marshal(validBid) require.NoError(t, err, "failed to marshal payload: %v", err) signedAllocation, err := auctioneerPrivKey.Sign(nil, marshalledAllocation, &ed25519.Options{ @@ -105,8 +125,10 @@ func TestUnmarshallAllocationTxs(t *testing.T) { require.NoError(t, err, "failed to sign allocation: %v", err) tests := []struct { - description string - allocation *bundlev1alpha1.Allocation + description string + + allocationInfo allocationInfo + prevBlockHash []byte expectedOutput types.Transactions // just check if error contains the string since error contains other details @@ -114,31 +136,30 @@ func TestUnmarshallAllocationTxs(t *testing.T) { }{ { description: "previous block hash mismatch", - allocation: &bundlev1alpha1.Allocation{ - // TODO - add signature and public key validation - Signature: make([]byte, 0), - PublicKey: make([]byte, 0), - Payload: &bundlev1alpha1.Bundle{ - Fee: 100, - Transactions: [][]byte{[]byte("unmarshallable tx")}, - BaseSequencerBlockHash: []byte("sequencer block hash"), - PrevRollupBlockHash: []byte("prev rollup block hash"), + allocationInfo: allocationInfo{ + signature: make([]byte, 0), + publicKey: make([]byte, 0), + bid: &auctionv1alpha1.Bid{ + Fee: 100, + Transactions: [][]byte{[]byte("unmarshallable tx")}, + SequencerParentBlockHash: []byte("sequencer block hash"), + RollupParentBlockHash: []byte("prev rollup block hash"), }, }, prevBlockHash: []byte("not prev rollup block hash"), expectedOutput: types.Transactions{}, - wantErr: "prev block hash do not match in allocation", + wantErr: "prev block hash in allocation does not match the previous block hash", }, { description: "public key doesn't match", - allocation: &bundlev1alpha1.Allocation{ - Signature: []byte("invalid signature"), - PublicKey: []byte("invalid public key"), - Payload: &bundlev1alpha1.Bundle{ - Fee: 100, - Transactions: [][]byte{[]byte("unmarshallable tx")}, - BaseSequencerBlockHash: []byte("sequencer block hash"), - PrevRollupBlockHash: []byte("prev rollup block hash"), + allocationInfo: allocationInfo{ + signature: []byte("invalid signature"), + publicKey: []byte("invalid public key"), + bid: &auctionv1alpha1.Bid{ + Fee: 100, + Transactions: [][]byte{[]byte("unmarshallable tx")}, + SequencerParentBlockHash: []byte("sequencer block hash"), + RollupParentBlockHash: []byte("prev rollup block hash"), }, }, prevBlockHash: []byte("prev rollup block hash"), @@ -147,26 +168,26 @@ func TestUnmarshallAllocationTxs(t *testing.T) { }, { description: "invalid signature", - allocation: &bundlev1alpha1.Allocation{ - Signature: []byte("invalid signature"), - PublicKey: auctioneerPubKey, - Payload: &bundlev1alpha1.Bundle{ - Fee: 100, - Transactions: [][]byte{[]byte("unmarshallable tx")}, - BaseSequencerBlockHash: []byte("sequencer block hash"), - PrevRollupBlockHash: []byte("prev rollup block hash"), + allocationInfo: allocationInfo{ + signature: []byte("invalid signature"), + publicKey: auctioneerPubKey, + bid: &auctionv1alpha1.Bid{ + Fee: 100, + Transactions: [][]byte{[]byte("unmarshallable tx")}, + SequencerParentBlockHash: []byte("sequencer block hash"), + RollupParentBlockHash: []byte("prev rollup block hash"), }, }, prevBlockHash: []byte("prev rollup block hash"), expectedOutput: types.Transactions{}, - wantErr: "failed to verify signature", + wantErr: "signature in allocation does not match the public key", }, { description: "valid allocation", - allocation: &bundlev1alpha1.Allocation{ - Signature: signedAllocation, - PublicKey: auctioneerPubKey, - Payload: validPayload, + allocationInfo: allocationInfo{ + signature: signedAllocation, + publicKey: auctioneerPubKey, + bid: validBid, }, prevBlockHash: []byte("prev rollup block hash"), expectedOutput: types.Transactions{tx1, tx2, tx3}, @@ -176,7 +197,10 @@ func TestUnmarshallAllocationTxs(t *testing.T) { for _, test := range tests { t.Run(test.description, func(t *testing.T) { - finalTxs, err := unmarshallAllocationTxs(test.allocation, test.prevBlockHash, serviceV1Alpha1.AuctioneerAddress(), addressPrefix) + allocation, err := test.allocationInfo.convertToAllocation() + require.NoError(t, err, "failed to convert allocation info to allocation: %v", err) + + finalTxs, err := unmarshallAllocationTxs(allocation, test.prevBlockHash, serviceV1Alpha1.AuctioneerAddress(), addressPrefix) if test.wantErr == "" && err == nil { for _, tx := range test.expectedOutput { foundTx := false @@ -394,25 +418,28 @@ func TestUnbundleRollupData(t *testing.T) { validMarshalledTx5, err := tx5.MarshalBinary() require.NoError(t, err, "failed to marshal valid tx: %v", err) - payload := &bundlev1alpha1.Bundle{ - Fee: 100, - Transactions: [][]byte{validMarshalledTx1, validMarshalledTx2, validMarshalledTx3}, - BaseSequencerBlockHash: baseSequencerBlockHash, - PrevRollupBlockHash: prevRollupBlockHash, + bid := &auctionv1alpha1.Bid{ + Fee: 100, + Transactions: [][]byte{validMarshalledTx1, validMarshalledTx2, validMarshalledTx3}, + SequencerParentBlockHash: baseSequencerBlockHash, + RollupParentBlockHash: prevRollupBlockHash, } - marshalledPayload, err := proto.Marshal(payload) + marshalledBid, err := proto.Marshal(bid) require.NoError(t, err, "failed to marshal payload: %v", err) - signedPayload, err := auctioneerPrivKey.Sign(nil, marshalledPayload, &ed25519.Options{ + signedBid, err := auctioneerPrivKey.Sign(nil, marshalledBid, &ed25519.Options{ Hash: 0, Context: "", }) require.NoError(t, err, "failed to sign payload: %v", err) - allocation := &bundlev1alpha1.Allocation{ - Signature: signedPayload, + // TODO - we need better naming here! + finalBid, err := anypb.New(bid) + + allocation := &auctionv1alpha1.Allocation{ + Signature: signedBid, PublicKey: auctioneerPubKey, - Payload: payload, + Bid: finalBid, } marshalledAllocation, err := proto.Marshal(allocation) @@ -492,25 +519,28 @@ func TestUnbundleRollupDataWithDuplicateAllocations(t *testing.T) { validMarshalledTx5, err := tx5.MarshalBinary() require.NoError(t, err, "failed to marshal valid tx: %v", err) - payload := &bundlev1alpha1.Bundle{ - Fee: 100, - Transactions: [][]byte{validMarshalledTx1, validMarshalledTx2, validMarshalledTx3}, - BaseSequencerBlockHash: baseSequencerBlockHash, - PrevRollupBlockHash: prevRollupBlockHash, + bid := &auctionv1alpha1.Bid{ + Fee: 100, + Transactions: [][]byte{validMarshalledTx1, validMarshalledTx2, validMarshalledTx3}, + SequencerParentBlockHash: baseSequencerBlockHash, + RollupParentBlockHash: prevRollupBlockHash, } - marshalledPayload, err := proto.Marshal(payload) + marshalledBid, err := proto.Marshal(bid) require.NoError(t, err, "failed to marshal payload: %v", err) - signedPayload, err := auctioneerPrivKey.Sign(nil, marshalledPayload, &ed25519.Options{ + signedPayload, err := auctioneerPrivKey.Sign(nil, marshalledBid, &ed25519.Options{ Hash: 0, Context: "", }) require.NoError(t, err, "failed to sign payload: %v", err) - allocation := &bundlev1alpha1.Allocation{ + finalBid, err := anypb.New(bid) + require.NoError(t, err, "failed to convert bid to anypb: %v", err) + + allocation := &auctionv1alpha1.Allocation{ Signature: signedPayload, PublicKey: auctioneerPubKey, - Payload: payload, + Bid: finalBid, } marshalledAllocation, err := proto.Marshal(allocation) @@ -607,50 +637,55 @@ func TestUnbundleRollupDataWithDuplicateInvalidAllocations(t *testing.T) { invalidMarshalledTx2, err := invalidTx2.MarshalBinary() require.NoError(t, err, "failed to marshal valid tx: %v", err) - payload := &bundlev1alpha1.Bundle{ - Fee: 100, - Transactions: [][]byte{validMarshalledTx1, validMarshalledTx2, validMarshalledTx3}, - BaseSequencerBlockHash: baseSequencerBlockHash, - PrevRollupBlockHash: prevRollupBlockHash, + bid := &auctionv1alpha1.Bid{ + Fee: 100, + Transactions: [][]byte{validMarshalledTx1, validMarshalledTx2, validMarshalledTx3}, + SequencerParentBlockHash: baseSequencerBlockHash, + RollupParentBlockHash: prevRollupBlockHash, } + validBidAny, err := anypb.New(bid) + require.NoError(t, err, "failed to convert bid to anypb: %v", err) - marshalledPayload, err := proto.Marshal(payload) + marshalledBid, err := proto.Marshal(bid) require.NoError(t, err, "failed to marshal allocation: %v", err) - signedPayload, err := auctioneerPrivKey.Sign(nil, marshalledPayload, &ed25519.Options{ + signedBid, err := auctioneerPrivKey.Sign(nil, marshalledBid, &ed25519.Options{ Hash: 0, Context: "", }) require.NoError(t, err, "failed to sign allocation: %v", err) - invalidPayload := &bundlev1alpha1.Bundle{ - Fee: 100, - Transactions: [][]byte{invalidMarshalledTx1, invalidMarshalledTx2}, - BaseSequencerBlockHash: baseSequencerBlockHash, - PrevRollupBlockHash: prevRollupBlockHash, + invalidBid := &auctionv1alpha1.Bid{ + Fee: 100, + Transactions: [][]byte{invalidMarshalledTx1, invalidMarshalledTx2}, + SequencerParentBlockHash: baseSequencerBlockHash, + RollupParentBlockHash: prevRollupBlockHash, } - marshalledInvalidPayload, err := proto.Marshal(invalidPayload) + invalidBidAny, err := anypb.New(invalidBid) + require.NoError(t, err, "failed to convert bid to anypb: %v", err) + + marshalledInvalidBid, err := proto.Marshal(invalidBid) require.NoError(t, err, "failed to marshal invalid allocation: %v", err) - signedInvalidPayload, err := invalidAuctioneerprivkey.Sign(nil, marshalledInvalidPayload, &ed25519.Options{ + signedInvalidBid, err := invalidAuctioneerprivkey.Sign(nil, marshalledInvalidBid, &ed25519.Options{ Hash: 0, Context: "", }) require.NoError(t, err, "failed to sign allocation: %v", err) - allocation := &bundlev1alpha1.Allocation{ - Signature: signedPayload, + allocation := &auctionv1alpha1.Allocation{ + Signature: signedBid, PublicKey: auctioneerPubKey, - Payload: payload, + Bid: validBidAny, } marshalledAllocation, err := proto.Marshal(allocation) require.NoError(t, err, "failed to marshal allocation: %v", err) - invalidAllocation := &bundlev1alpha1.Allocation{ - Signature: signedInvalidPayload, + invalidAllocation := &auctionv1alpha1.Allocation{ + Signature: signedInvalidBid, // trying to spoof the actual auctioneer key PublicKey: auctioneerPubKey, - Payload: invalidPayload, + Bid: invalidBidAny, } marshalledInvalidAllocation, err := proto.Marshal(invalidAllocation) require.NoError(t, err, "failed to marshal invalid allocation: %v", err) diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index ffa20fb62..7086f8156 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -834,6 +834,7 @@ func (s *BlockChainAPI) GetHeaderByHash(ctx context.Context, hash common.Hash) m // - When blockNr is -2 the chain latest block is returned. // - When blockNr is -3 the chain finalized block is returned. // - When blockNr is -4 the chain safe block is returned. +// - When blockNr is -5 the chain optimistic block is returned. // - When fullTx is true all transactions in the block are returned, otherwise // only the transaction hash is returned. func (s *BlockChainAPI) GetBlockByNumber(ctx context.Context, number rpc.BlockNumber, fullTx bool) (map[string]interface{}, error) { diff --git a/node/grpcstack.go b/node/grpcstack.go index b3a34c2ca..15000f9ab 100644 --- a/node/grpcstack.go +++ b/node/grpcstack.go @@ -1,7 +1,7 @@ package node import ( - optimisticGrpc "buf.build/gen/go/astria/execution-apis/grpc/go/astria/bundle/v1alpha1/bundlev1alpha1grpc" + optimisticGrpc "buf.build/gen/go/astria/execution-apis/grpc/go/astria/auction/v1alpha1/auctionv1alpha1grpc" "net" "sync" @@ -19,7 +19,7 @@ type GRPCServerHandler struct { execServer *grpc.Server executionServiceServerV1a2 *astriaGrpc.ExecutionServiceServer optimisticExecServ *optimisticGrpc.OptimisticExecutionServiceServer - streamBundleServ *optimisticGrpc.BundleServiceServer + auctionServiceServ *optimisticGrpc.AuctionServiceServer enableAuctioneer bool } @@ -27,7 +27,7 @@ type GRPCServerHandler struct { // NewServer creates a new gRPC server. // It registers the execution service server. // It registers the gRPC server with the node so it can be stopped on shutdown. -func NewGRPCServerHandler(node *Node, execServ astriaGrpc.ExecutionServiceServer, optimisticExecServ optimisticGrpc.OptimisticExecutionServiceServer, streamBundleServ optimisticGrpc.BundleServiceServer, cfg *Config) error { +func NewGRPCServerHandler(node *Node, execServ astriaGrpc.ExecutionServiceServer, optimisticExecServ optimisticGrpc.OptimisticExecutionServiceServer, auctionServiceServ optimisticGrpc.AuctionServiceServer, cfg *Config) error { execServer := grpc.NewServer() log.Info("gRPC server enabled", "endpoint", cfg.GRPCEndpoint()) @@ -37,14 +37,14 @@ func NewGRPCServerHandler(node *Node, execServ astriaGrpc.ExecutionServiceServer execServer: execServer, executionServiceServerV1a2: &execServ, optimisticExecServ: &optimisticExecServ, - streamBundleServ: &streamBundleServ, + auctionServiceServ: &auctionServiceServ, enableAuctioneer: cfg.EnableAuctioneer, } astriaGrpc.RegisterExecutionServiceServer(execServer, execServ) if cfg.EnableAuctioneer { optimisticGrpc.RegisterOptimisticExecutionServiceServer(execServer, optimisticExecServ) - optimisticGrpc.RegisterBundleServiceServer(execServer, streamBundleServ) + optimisticGrpc.RegisterAuctionServiceServer(execServer, auctionServiceServ) } node.RegisterGRPCServer(serverHandler) diff --git a/rpc/types.go b/rpc/types.go index 2e53174b8..249efc51a 100644 --- a/rpc/types.go +++ b/rpc/types.go @@ -63,11 +63,12 @@ type jsonWriter interface { type BlockNumber int64 const ( - SafeBlockNumber = BlockNumber(-4) - FinalizedBlockNumber = BlockNumber(-3) - LatestBlockNumber = BlockNumber(-2) - PendingBlockNumber = BlockNumber(-1) - EarliestBlockNumber = BlockNumber(0) + OptimisticBlockNumber = BlockNumber(-5) + SafeBlockNumber = BlockNumber(-4) + FinalizedBlockNumber = BlockNumber(-3) + LatestBlockNumber = BlockNumber(-2) + PendingBlockNumber = BlockNumber(-1) + EarliestBlockNumber = BlockNumber(0) ) // UnmarshalJSON parses the given JSON fragment into a BlockNumber. It supports: @@ -98,6 +99,9 @@ func (bn *BlockNumber) UnmarshalJSON(data []byte) error { case "safe": *bn = SafeBlockNumber return nil + case "optimistic": + *bn = OptimisticBlockNumber + return nil } blckNum, err := hexutil.DecodeUint64(input) @@ -135,6 +139,8 @@ func (bn BlockNumber) String() string { return "finalized" case SafeBlockNumber: return "safe" + case OptimisticBlockNumber: + return "optimistic" default: if bn < 0 { return fmt.Sprintf("", bn) @@ -188,6 +194,10 @@ func (bnh *BlockNumberOrHash) UnmarshalJSON(data []byte) error { bn := SafeBlockNumber bnh.BlockNumber = &bn return nil + case "optimistic": + bn := OptimisticBlockNumber + bnh.BlockNumber = &bn + return nil default: if len(input) == 66 { hash := common.Hash{}