From 62135146ff61cae0916c47a093510fce010819f9 Mon Sep 17 00:00:00 2001 From: terence tsao Date: Mon, 5 Jun 2023 15:44:51 -0700 Subject: [PATCH 1/2] Update ProposeBlock Prysm RPC for Deneb --- beacon-chain/blockchain/setup_test.go | 5 + beacon-chain/p2p/interfaces.go | 1 + beacon-chain/p2p/testing/fuzz_p2p.go | 5 + beacon-chain/p2p/testing/mock_broadcaster.go | 6 + beacon-chain/p2p/testing/p2p.go | 6 + .../rpc/prysm/v1alpha1/validator/proposer.go | 114 ++++++++++-------- .../prysm/v1alpha1/validator/proposer_test.go | 51 ++++++++ 7 files changed, 139 insertions(+), 49 deletions(-) diff --git a/beacon-chain/blockchain/setup_test.go b/beacon-chain/blockchain/setup_test.go index 89b4ad04ed19..0a7627f152c8 100644 --- a/beacon-chain/blockchain/setup_test.go +++ b/beacon-chain/blockchain/setup_test.go @@ -52,6 +52,11 @@ func (mb *mockBroadcaster) BroadcastSyncCommitteeMessage(_ context.Context, _ ui return nil } +func (mb *mockBroadcaster) BroadcastBlob(_ context.Context, _ uint64, _ *ethpb.SignedBlobSidecar) error { + mb.broadcastCalled = true + return nil +} + func (mb *mockBroadcaster) BroadcastBLSChanges(_ context.Context, _ []*ethpb.SignedBLSToExecutionChange) { } diff --git a/beacon-chain/p2p/interfaces.go b/beacon-chain/p2p/interfaces.go index 1bbd65363678..98c92d617075 100644 --- a/beacon-chain/p2p/interfaces.go +++ b/beacon-chain/p2p/interfaces.go @@ -35,6 +35,7 @@ type Broadcaster interface { Broadcast(context.Context, proto.Message) error BroadcastAttestation(ctx context.Context, subnet uint64, att *ethpb.Attestation) error BroadcastSyncCommitteeMessage(ctx context.Context, subnet uint64, sMsg *ethpb.SyncCommitteeMessage) error + BroadcastBlob(ctx context.Context, subnet uint64, blob *ethpb.SignedBlobSidecar) error } // SetStreamHandler configures p2p to handle streams of a certain topic ID. diff --git a/beacon-chain/p2p/testing/fuzz_p2p.go b/beacon-chain/p2p/testing/fuzz_p2p.go index e9a6e1afeb31..ed902fb06f99 100644 --- a/beacon-chain/p2p/testing/fuzz_p2p.go +++ b/beacon-chain/p2p/testing/fuzz_p2p.go @@ -143,6 +143,11 @@ func (_ *FakeP2P) BroadcastSyncCommitteeMessage(_ context.Context, _ uint64, _ * return nil } +// BroadcastBlob -- fake. +func (_ *FakeP2P) BroadcastBlob(_ context.Context, _ uint64, _ *ethpb.SignedBlobSidecar) error { + return nil +} + // InterceptPeerDial -- fake. func (_ *FakeP2P) InterceptPeerDial(peer.ID) (allow bool) { return true diff --git a/beacon-chain/p2p/testing/mock_broadcaster.go b/beacon-chain/p2p/testing/mock_broadcaster.go index b5f88097dcb0..c312bf47c359 100644 --- a/beacon-chain/p2p/testing/mock_broadcaster.go +++ b/beacon-chain/p2p/testing/mock_broadcaster.go @@ -33,3 +33,9 @@ func (m *MockBroadcaster) BroadcastSyncCommitteeMessage(_ context.Context, _ uin m.BroadcastCalled = true return nil } + +// BroadcastBlob broadcasts a blob for mock. +func (m *MockBroadcaster) BroadcastBlob(context.Context, uint64, *ethpb.SignedBlobSidecar) error { + m.BroadcastCalled = true + return nil +} diff --git a/beacon-chain/p2p/testing/p2p.go b/beacon-chain/p2p/testing/p2p.go index 2da2c9a92ebc..9574183a76b5 100644 --- a/beacon-chain/p2p/testing/p2p.go +++ b/beacon-chain/p2p/testing/p2p.go @@ -176,6 +176,12 @@ func (p *TestP2P) BroadcastSyncCommitteeMessage(_ context.Context, _ uint64, _ * return nil } +// BroadcastBlob broadcasts a blob for mock. +func (p *TestP2P) BroadcastBlob(context.Context, uint64, *ethpb.SignedBlobSidecar) error { + p.BroadcastCalled = true + return nil +} + // SetStreamHandler for RPC. func (p *TestP2P) SetStreamHandler(topic string, handler network.StreamHandler) { p.BHost.SetStreamHandler(protocol.ID(topic), handler) diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer.go b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer.go index cb53176b6728..f227f538c2d1 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer.go +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer.go @@ -25,6 +25,7 @@ import ( "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" "github.com/prysmaticlabs/prysm/v4/encoding/bytesutil" ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" + "github.com/prysmaticlabs/prysm/v4/runtime/version" "github.com/prysmaticlabs/prysm/v4/time/slots" "github.com/sirupsen/logrus" "go.opencensus.io/trace" @@ -215,11 +216,74 @@ func (vs *Server) GetBeaconBlock(ctx context.Context, req *ethpb.BlockRequest) ( func (vs *Server) ProposeBeaconBlock(ctx context.Context, req *ethpb.GenericSignedBeaconBlock) (*ethpb.ProposeResponse, error) { ctx, span := trace.StartSpan(ctx, "ProposerServer.ProposeBeaconBlock") defer span.End() + blk, err := blocks.NewSignedBeaconBlock(req.Block) if err != nil { return nil, status.Errorf(codes.InvalidArgument, "%s: %v", CouldNotDecodeBlock, err) } - return vs.proposeGenericBeaconBlock(ctx, blk) + + unblinder, err := newUnblinder(blk, vs.BlockBuilder) + if err != nil { + return nil, errors.Wrap(err, "could not create unblinder") + } + blk, err = unblinder.unblindBuilderBlock(ctx) + if err != nil { + return nil, errors.Wrap(err, "could not unblind builder block") + } + + // Broadcast the new block to the network. + blkPb, err := blk.Proto() + if err != nil { + return nil, errors.Wrap(err, "could not get protobuf block") + } + if err := vs.P2P.Broadcast(ctx, blkPb); err != nil { + return nil, fmt.Errorf("could not broadcast block: %v", err) + } + + root, err := blk.Block().HashTreeRoot() + if err != nil { + return nil, fmt.Errorf("could not tree hash block: %v", err) + } + log.WithFields(logrus.Fields{ + "blockRoot": hex.EncodeToString(root[:]), + }).Debug("Broadcasting block") + + if blk.Version() >= version.Deneb { + b, ok := req.GetBlock().(*ethpb.GenericSignedBeaconBlock_Deneb) + if !ok { + return nil, status.Error(codes.Internal, "Could not cast block to Deneb") + } + scs := make([]*ethpb.BlobSidecar, len(b.Deneb.Blobs)) + for i, blob := range b.Deneb.Blobs { + if err := vs.P2P.BroadcastBlob(ctx, blob.Message.Index, blob); err != nil { + log.WithError(err).Error("Could not broadcast blob") + } + scs[i] = blob.Message + } + if len(scs) > 0 { + if err := vs.BeaconDB.SaveBlobSidecar(ctx, scs); err != nil { + return nil, err + } + } + } + + // Do not block proposal critical path with debug logging or block feed updates. + defer func() { + log.WithField("blockRoot", fmt.Sprintf("%#x", bytesutil.Trunc(root[:]))).Debugf( + "Block proposal received via RPC") + vs.BlockNotifier.BlockFeed().Send(&feed.Event{ + Type: blockfeed.ReceivedBlock, + Data: &blockfeed.ReceivedBlockData{SignedBlock: blk}, + }) + }() + + if err := vs.BlockReceiver.ReceiveBlock(ctx, blk, root); err != nil { + return nil, fmt.Errorf("could not process beacon block: %v", err) + } + + return ðpb.ProposeResponse{ + BlockRoot: root[:], + }, nil } // PrepareBeaconProposer caches and updates the fee recipient for the given proposer. @@ -301,54 +365,6 @@ func (vs *Server) GetFeeRecipientByPubKey(ctx context.Context, request *ethpb.Fe }, nil } -func (vs *Server) proposeGenericBeaconBlock(ctx context.Context, blk interfaces.SignedBeaconBlock) (*ethpb.ProposeResponse, error) { - ctx, span := trace.StartSpan(ctx, "ProposerServer.proposeGenericBeaconBlock") - defer span.End() - root, err := blk.Block().HashTreeRoot() - if err != nil { - return nil, fmt.Errorf("could not tree hash block: %v", err) - } - - unblinder, err := newUnblinder(blk, vs.BlockBuilder) - if err != nil { - return nil, errors.Wrap(err, "could not create unblinder") - } - blk, err = unblinder.unblindBuilderBlock(ctx) - if err != nil { - return nil, errors.Wrap(err, "could not unblind builder block") - } - - // Do not block proposal critical path with debug logging or block feed updates. - defer func() { - log.WithField("blockRoot", fmt.Sprintf("%#x", bytesutil.Trunc(root[:]))).Debugf( - "Block proposal received via RPC") - vs.BlockNotifier.BlockFeed().Send(&feed.Event{ - Type: blockfeed.ReceivedBlock, - Data: &blockfeed.ReceivedBlockData{SignedBlock: blk}, - }) - }() - - // Broadcast the new block to the network. - blkPb, err := blk.Proto() - if err != nil { - return nil, errors.Wrap(err, "could not get protobuf block") - } - if err := vs.P2P.Broadcast(ctx, blkPb); err != nil { - return nil, fmt.Errorf("could not broadcast block: %v", err) - } - log.WithFields(logrus.Fields{ - "blockRoot": hex.EncodeToString(root[:]), - }).Debug("Broadcasting block") - - if err := vs.BlockReceiver.ReceiveBlock(ctx, blk, root); err != nil { - return nil, fmt.Errorf("could not process beacon block: %v", err) - } - - return ðpb.ProposeResponse{ - BlockRoot: root[:], - }, nil -} - // computeStateRoot computes the state root after a block has been processed through a state transition and // returns it to the validator client. func (vs *Server) computeStateRoot(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock) ([]byte, error) { diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_test.go b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_test.go index 803ebf19c914..7f3df43aab75 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_test.go +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_test.go @@ -678,6 +678,46 @@ func TestProposer_ProposeBlock_OK(t *testing.T) { return ðpb.GenericSignedBeaconBlock{Block: blk} }, }, + { + name: "bellatrix", + block: func(parent [32]byte) *ethpb.GenericSignedBeaconBlock { + blockToPropose := util.NewBeaconBlockBellatrix() + blockToPropose.Block.Slot = 5 + blockToPropose.Block.ParentRoot = parent[:] + blk := ðpb.GenericSignedBeaconBlock_Bellatrix{Bellatrix: blockToPropose} + return ðpb.GenericSignedBeaconBlock{Block: blk} + }, + }, + { + name: "deneb block no blob", + block: func(parent [32]byte) *ethpb.GenericSignedBeaconBlock { + blockToPropose := util.NewBeaconBlockDeneb() + blockToPropose.Block.Slot = 5 + blockToPropose.Block.ParentRoot = parent[:] + blk := ðpb.GenericSignedBeaconBlock_Deneb{Deneb: ðpb.SignedBeaconBlockAndBlobsDeneb{ + Block: blockToPropose, + }} + return ðpb.GenericSignedBeaconBlock{Block: blk} + }, + }, + { + name: "deneb block has blobs", + block: func(parent [32]byte) *ethpb.GenericSignedBeaconBlock { + blockToPropose := util.NewBeaconBlockDeneb() + blockToPropose.Block.Slot = 5 + blockToPropose.Block.ParentRoot = parent[:] + blk := ðpb.GenericSignedBeaconBlock_Deneb{Deneb: ðpb.SignedBeaconBlockAndBlobsDeneb{ + Block: blockToPropose, + Blobs: []*ethpb.SignedBlobSidecar{ + {Message: ðpb.BlobSidecar{Index: 0, Slot: 5, BlockParentRoot: parent[:]}}, + {Message: ðpb.BlobSidecar{Index: 1, Slot: 5, BlockParentRoot: parent[:]}}, + {Message: ðpb.BlobSidecar{Index: 2, Slot: 5, BlockParentRoot: parent[:]}}, + {Message: ðpb.BlobSidecar{Index: 3, Slot: 5, BlockParentRoot: parent[:]}}, + }, + }} + return ðpb.GenericSignedBeaconBlock{Block: blk} + }, + }, } for _, tt := range tests { @@ -690,11 +730,13 @@ func TestProposer_ProposeBlock_OK(t *testing.T) { require.NoError(t, err) c := &mock.ChainService{Root: bsRoot[:], State: beaconState} + db := dbutil.SetupDB(t) proposerServer := &Server{ BlockReceiver: c, BlockNotifier: c.BlockNotifier(), P2P: mockp2p.NewTestP2P(t), BlockBuilder: &builderTest.MockBuilderService{HasConfigured: true, PayloadCapella: emptyPayloadCapella()}, + BeaconDB: db, } blockToPropose := tt.block(bsRoot) res, err := proposerServer.ProposeBeaconBlock(context.Background(), blockToPropose) @@ -702,6 +744,15 @@ func TestProposer_ProposeBlock_OK(t *testing.T) { if res == nil || len(res.BlockRoot) == 0 { t.Error("No block root was returned") } + + if tt.name == "deneb block has blobs" { + scs, err := db.BlobSidecarsBySlot(ctx, blockToPropose.GetDeneb().Block.Block.Slot) + require.NoError(t, err) + assert.Equal(t, 4, len(scs)) + for i, sc := range scs { + require.Equal(t, uint64(i), sc.Index) + } + } }) } } From c098fd93ea6bce4834e6823e8889c56d875ce431 Mon Sep 17 00:00:00 2001 From: terence tsao Date: Tue, 6 Jun 2023 13:55:50 -0700 Subject: [PATCH 2/2] Blob length check --- .../rpc/prysm/v1alpha1/validator/proposer.go | 6 +++- .../prysm/v1alpha1/validator/proposer_test.go | 32 ++++++++++++++++--- 2 files changed, 33 insertions(+), 5 deletions(-) diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer.go b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer.go index f227f538c2d1..18587390ab5b 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer.go +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer.go @@ -19,6 +19,7 @@ import ( "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/helpers" "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/transition" "github.com/prysmaticlabs/prysm/v4/beacon-chain/db/kv" + fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams" "github.com/prysmaticlabs/prysm/v4/config/params" "github.com/prysmaticlabs/prysm/v4/consensus-types/blocks" "github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces" @@ -253,10 +254,13 @@ func (vs *Server) ProposeBeaconBlock(ctx context.Context, req *ethpb.GenericSign if !ok { return nil, status.Error(codes.Internal, "Could not cast block to Deneb") } + if len(b.Deneb.Blobs) > fieldparams.MaxBlobsPerBlock { + return nil, status.Errorf(codes.InvalidArgument, "Too many blobs in block: %d", len(b.Deneb.Blobs)) + } scs := make([]*ethpb.BlobSidecar, len(b.Deneb.Blobs)) for i, blob := range b.Deneb.Blobs { if err := vs.P2P.BroadcastBlob(ctx, blob.Message.Index, blob); err != nil { - log.WithError(err).Error("Could not broadcast blob") + log.WithError(err).Errorf("Could not broadcast blob index %d / %d", i, len(b.Deneb.Blobs)) } scs[i] = blob.Message } diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_test.go b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_test.go index 7f3df43aab75..5a54c3f9733e 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_test.go +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_test.go @@ -631,6 +631,7 @@ func TestProposer_ProposeBlock_OK(t *testing.T) { tests := []struct { name string block func([32]byte) *ethpb.GenericSignedBeaconBlock + err string }{ { name: "phase0", @@ -718,6 +719,26 @@ func TestProposer_ProposeBlock_OK(t *testing.T) { return ðpb.GenericSignedBeaconBlock{Block: blk} }, }, + { + name: "deneb block has too many blobs", + err: "Too many blobs in block: 5", + block: func(parent [32]byte) *ethpb.GenericSignedBeaconBlock { + blockToPropose := util.NewBeaconBlockDeneb() + blockToPropose.Block.Slot = 5 + blockToPropose.Block.ParentRoot = parent[:] + blk := ðpb.GenericSignedBeaconBlock_Deneb{Deneb: ðpb.SignedBeaconBlockAndBlobsDeneb{ + Block: blockToPropose, + Blobs: []*ethpb.SignedBlobSidecar{ + {Message: ðpb.BlobSidecar{Index: 0, Slot: 5, BlockParentRoot: parent[:]}}, + {Message: ðpb.BlobSidecar{Index: 1, Slot: 5, BlockParentRoot: parent[:]}}, + {Message: ðpb.BlobSidecar{Index: 2, Slot: 5, BlockParentRoot: parent[:]}}, + {Message: ðpb.BlobSidecar{Index: 3, Slot: 5, BlockParentRoot: parent[:]}}, + {Message: ðpb.BlobSidecar{Index: 4, Slot: 5, BlockParentRoot: parent[:]}}, + }, + }} + return ðpb.GenericSignedBeaconBlock{Block: blk} + }, + }, } for _, tt := range tests { @@ -740,11 +761,14 @@ func TestProposer_ProposeBlock_OK(t *testing.T) { } blockToPropose := tt.block(bsRoot) res, err := proposerServer.ProposeBeaconBlock(context.Background(), blockToPropose) - assert.NoError(t, err, "Could not propose block correctly") - if res == nil || len(res.BlockRoot) == 0 { - t.Error("No block root was returned") + if tt.err != "" { // Expecting an error + require.ErrorContains(t, tt.err, err) + } else { + assert.NoError(t, err, "Could not propose block correctly") + if res == nil || len(res.BlockRoot) == 0 { + t.Error("No block root was returned") + } } - if tt.name == "deneb block has blobs" { scs, err := db.BlobSidecarsBySlot(ctx, blockToPropose.GetDeneb().Block.Block.Slot) require.NoError(t, err)