Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions beacon-chain/blockchain/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ func (mb *mockBroadcaster) BroadcastSyncCommitteeMessage(_ context.Context, _ ui
return nil
}

func (mb *mockBroadcaster) BroadcastBlob(_ context.Context, _ uint64, _ *ethpb.SignedBlobSidecar) error {
mb.broadcastCalled = true
return nil
}

func (mb *mockBroadcaster) BroadcastBLSChanges(_ context.Context, _ []*ethpb.SignedBLSToExecutionChange) {
}

Expand Down
1 change: 1 addition & 0 deletions beacon-chain/p2p/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type Broadcaster interface {
Broadcast(context.Context, proto.Message) error
BroadcastAttestation(ctx context.Context, subnet uint64, att *ethpb.Attestation) error
BroadcastSyncCommitteeMessage(ctx context.Context, subnet uint64, sMsg *ethpb.SyncCommitteeMessage) error
BroadcastBlob(ctx context.Context, subnet uint64, blob *ethpb.SignedBlobSidecar) error
}

// SetStreamHandler configures p2p to handle streams of a certain topic ID.
Expand Down
5 changes: 5 additions & 0 deletions beacon-chain/p2p/testing/fuzz_p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,11 @@ func (_ *FakeP2P) BroadcastSyncCommitteeMessage(_ context.Context, _ uint64, _ *
return nil
}

// BroadcastBlob -- fake.
func (_ *FakeP2P) BroadcastBlob(_ context.Context, _ uint64, _ *ethpb.SignedBlobSidecar) error {
return nil
}

// InterceptPeerDial -- fake.
func (_ *FakeP2P) InterceptPeerDial(peer.ID) (allow bool) {
return true
Expand Down
6 changes: 6 additions & 0 deletions beacon-chain/p2p/testing/mock_broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,9 @@ func (m *MockBroadcaster) BroadcastSyncCommitteeMessage(_ context.Context, _ uin
m.BroadcastCalled = true
return nil
}

// BroadcastBlob broadcasts a blob for mock.
func (m *MockBroadcaster) BroadcastBlob(context.Context, uint64, *ethpb.SignedBlobSidecar) error {
m.BroadcastCalled = true
return nil
}
6 changes: 6 additions & 0 deletions beacon-chain/p2p/testing/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,12 @@ func (p *TestP2P) BroadcastSyncCommitteeMessage(_ context.Context, _ uint64, _ *
return nil
}

// BroadcastBlob broadcasts a blob for mock.
func (p *TestP2P) BroadcastBlob(context.Context, uint64, *ethpb.SignedBlobSidecar) error {
p.BroadcastCalled = true
return nil
}

// SetStreamHandler for RPC.
func (p *TestP2P) SetStreamHandler(topic string, handler network.StreamHandler) {
p.BHost.SetStreamHandler(protocol.ID(topic), handler)
Expand Down
118 changes: 69 additions & 49 deletions beacon-chain/rpc/prysm/v1alpha1/validator/proposer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ import (
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/transition"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/db/kv"
fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
"github.com/prysmaticlabs/prysm/v4/config/params"
"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v4/runtime/version"
"github.com/prysmaticlabs/prysm/v4/time/slots"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
Expand Down Expand Up @@ -215,11 +217,77 @@ func (vs *Server) GetBeaconBlock(ctx context.Context, req *ethpb.BlockRequest) (
func (vs *Server) ProposeBeaconBlock(ctx context.Context, req *ethpb.GenericSignedBeaconBlock) (*ethpb.ProposeResponse, error) {
ctx, span := trace.StartSpan(ctx, "ProposerServer.ProposeBeaconBlock")
defer span.End()

blk, err := blocks.NewSignedBeaconBlock(req.Block)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "%s: %v", CouldNotDecodeBlock, err)
}
return vs.proposeGenericBeaconBlock(ctx, blk)

unblinder, err := newUnblinder(blk, vs.BlockBuilder)
if err != nil {
return nil, errors.Wrap(err, "could not create unblinder")
}
blk, err = unblinder.unblindBuilderBlock(ctx)
if err != nil {
return nil, errors.Wrap(err, "could not unblind builder block")
}

// Broadcast the new block to the network.
blkPb, err := blk.Proto()
if err != nil {
return nil, errors.Wrap(err, "could not get protobuf block")
}
if err := vs.P2P.Broadcast(ctx, blkPb); err != nil {
return nil, fmt.Errorf("could not broadcast block: %v", err)
}

root, err := blk.Block().HashTreeRoot()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this check after the broadcast?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not really a check, it calculates the root for logging, and we can remove the log. But it's also later used in ReceiveBlock anyway. Moving it after the broadcast to reduce latency before the gossip

if err != nil {
return nil, fmt.Errorf("could not tree hash block: %v", err)
}
log.WithFields(logrus.Fields{
"blockRoot": hex.EncodeToString(root[:]),
}).Debug("Broadcasting block")

if blk.Version() >= version.Deneb {
b, ok := req.GetBlock().(*ethpb.GenericSignedBeaconBlock_Deneb)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could still be blinded right? , it was unblinded above but the request block would still be possibly blinded. that being said if it was blinded the blobs would also need to be unblinded below.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, I'm not handling the blinded logic in this PR. I'll clarify this in the title

if !ok {
return nil, status.Error(codes.Internal, "Could not cast block to Deneb")
}
if len(b.Deneb.Blobs) > fieldparams.MaxBlobsPerBlock {
return nil, status.Errorf(codes.InvalidArgument, "Too many blobs in block: %d", len(b.Deneb.Blobs))
}
scs := make([]*ethpb.BlobSidecar, len(b.Deneb.Blobs))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this need a validation check on len for blobs, or does receiving handle that

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that's a good check to have, I'll add it in

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for i, blob := range b.Deneb.Blobs {
if err := vs.P2P.BroadcastBlob(ctx, blob.Message.Index, blob); err != nil {
log.WithError(err).Errorf("Could not broadcast blob index %d / %d", i, len(b.Deneb.Blobs))
}
scs[i] = blob.Message
}
if len(scs) > 0 {
if err := vs.BeaconDB.SaveBlobSidecar(ctx, scs); err != nil {
return nil, err
}
}
}

// Do not block proposal critical path with debug logging or block feed updates.
defer func() {
log.WithField("blockRoot", fmt.Sprintf("%#x", bytesutil.Trunc(root[:]))).Debugf(
"Block proposal received via RPC")
vs.BlockNotifier.BlockFeed().Send(&feed.Event{
Type: blockfeed.ReceivedBlock,
Data: &blockfeed.ReceivedBlockData{SignedBlock: blk},
})
}()

if err := vs.BlockReceiver.ReceiveBlock(ctx, blk, root); err != nil {
return nil, fmt.Errorf("could not process beacon block: %v", err)
}

return &ethpb.ProposeResponse{
BlockRoot: root[:],
}, nil
}

// PrepareBeaconProposer caches and updates the fee recipient for the given proposer.
Expand Down Expand Up @@ -301,54 +369,6 @@ func (vs *Server) GetFeeRecipientByPubKey(ctx context.Context, request *ethpb.Fe
}, nil
}

func (vs *Server) proposeGenericBeaconBlock(ctx context.Context, blk interfaces.SignedBeaconBlock) (*ethpb.ProposeResponse, error) {
ctx, span := trace.StartSpan(ctx, "ProposerServer.proposeGenericBeaconBlock")
defer span.End()
root, err := blk.Block().HashTreeRoot()
if err != nil {
return nil, fmt.Errorf("could not tree hash block: %v", err)
}

unblinder, err := newUnblinder(blk, vs.BlockBuilder)
if err != nil {
return nil, errors.Wrap(err, "could not create unblinder")
}
blk, err = unblinder.unblindBuilderBlock(ctx)
if err != nil {
return nil, errors.Wrap(err, "could not unblind builder block")
}

// Do not block proposal critical path with debug logging or block feed updates.
defer func() {
log.WithField("blockRoot", fmt.Sprintf("%#x", bytesutil.Trunc(root[:]))).Debugf(
"Block proposal received via RPC")
vs.BlockNotifier.BlockFeed().Send(&feed.Event{
Type: blockfeed.ReceivedBlock,
Data: &blockfeed.ReceivedBlockData{SignedBlock: blk},
})
}()

// Broadcast the new block to the network.
blkPb, err := blk.Proto()
if err != nil {
return nil, errors.Wrap(err, "could not get protobuf block")
}
if err := vs.P2P.Broadcast(ctx, blkPb); err != nil {
return nil, fmt.Errorf("could not broadcast block: %v", err)
}
log.WithFields(logrus.Fields{
"blockRoot": hex.EncodeToString(root[:]),
}).Debug("Broadcasting block")

if err := vs.BlockReceiver.ReceiveBlock(ctx, blk, root); err != nil {
return nil, fmt.Errorf("could not process beacon block: %v", err)
}

return &ethpb.ProposeResponse{
BlockRoot: root[:],
}, nil
}

// computeStateRoot computes the state root after a block has been processed through a state transition and
// returns it to the validator client.
func (vs *Server) computeStateRoot(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock) ([]byte, error) {
Expand Down
81 changes: 78 additions & 3 deletions beacon-chain/rpc/prysm/v1alpha1/validator/proposer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,7 @@ func TestProposer_ProposeBlock_OK(t *testing.T) {
tests := []struct {
name string
block func([32]byte) *ethpb.GenericSignedBeaconBlock
err string
}{
{
name: "phase0",
Expand Down Expand Up @@ -678,6 +679,66 @@ func TestProposer_ProposeBlock_OK(t *testing.T) {
return &ethpb.GenericSignedBeaconBlock{Block: blk}
},
},
{
name: "bellatrix",
block: func(parent [32]byte) *ethpb.GenericSignedBeaconBlock {
blockToPropose := util.NewBeaconBlockBellatrix()
blockToPropose.Block.Slot = 5
blockToPropose.Block.ParentRoot = parent[:]
blk := &ethpb.GenericSignedBeaconBlock_Bellatrix{Bellatrix: blockToPropose}
return &ethpb.GenericSignedBeaconBlock{Block: blk}
},
},
{
name: "deneb block no blob",
block: func(parent [32]byte) *ethpb.GenericSignedBeaconBlock {
blockToPropose := util.NewBeaconBlockDeneb()
blockToPropose.Block.Slot = 5
blockToPropose.Block.ParentRoot = parent[:]
blk := &ethpb.GenericSignedBeaconBlock_Deneb{Deneb: &ethpb.SignedBeaconBlockAndBlobsDeneb{
Block: blockToPropose,
}}
return &ethpb.GenericSignedBeaconBlock{Block: blk}
},
},
{
name: "deneb block has blobs",
block: func(parent [32]byte) *ethpb.GenericSignedBeaconBlock {
blockToPropose := util.NewBeaconBlockDeneb()
blockToPropose.Block.Slot = 5
blockToPropose.Block.ParentRoot = parent[:]
blk := &ethpb.GenericSignedBeaconBlock_Deneb{Deneb: &ethpb.SignedBeaconBlockAndBlobsDeneb{
Block: blockToPropose,
Blobs: []*ethpb.SignedBlobSidecar{
{Message: &ethpb.BlobSidecar{Index: 0, Slot: 5, BlockParentRoot: parent[:]}},
{Message: &ethpb.BlobSidecar{Index: 1, Slot: 5, BlockParentRoot: parent[:]}},
{Message: &ethpb.BlobSidecar{Index: 2, Slot: 5, BlockParentRoot: parent[:]}},
{Message: &ethpb.BlobSidecar{Index: 3, Slot: 5, BlockParentRoot: parent[:]}},
},
}}
return &ethpb.GenericSignedBeaconBlock{Block: blk}
},
},
{
name: "deneb block has too many blobs",
err: "Too many blobs in block: 5",
block: func(parent [32]byte) *ethpb.GenericSignedBeaconBlock {
blockToPropose := util.NewBeaconBlockDeneb()
blockToPropose.Block.Slot = 5
blockToPropose.Block.ParentRoot = parent[:]
blk := &ethpb.GenericSignedBeaconBlock_Deneb{Deneb: &ethpb.SignedBeaconBlockAndBlobsDeneb{
Block: blockToPropose,
Blobs: []*ethpb.SignedBlobSidecar{
{Message: &ethpb.BlobSidecar{Index: 0, Slot: 5, BlockParentRoot: parent[:]}},
{Message: &ethpb.BlobSidecar{Index: 1, Slot: 5, BlockParentRoot: parent[:]}},
{Message: &ethpb.BlobSidecar{Index: 2, Slot: 5, BlockParentRoot: parent[:]}},
{Message: &ethpb.BlobSidecar{Index: 3, Slot: 5, BlockParentRoot: parent[:]}},
{Message: &ethpb.BlobSidecar{Index: 4, Slot: 5, BlockParentRoot: parent[:]}},
},
}}
return &ethpb.GenericSignedBeaconBlock{Block: blk}
},
},
}

for _, tt := range tests {
Expand All @@ -690,17 +751,31 @@ func TestProposer_ProposeBlock_OK(t *testing.T) {
require.NoError(t, err)

c := &mock.ChainService{Root: bsRoot[:], State: beaconState}
db := dbutil.SetupDB(t)
proposerServer := &Server{
BlockReceiver: c,
BlockNotifier: c.BlockNotifier(),
P2P: mockp2p.NewTestP2P(t),
BlockBuilder: &builderTest.MockBuilderService{HasConfigured: true, PayloadCapella: emptyPayloadCapella()},
BeaconDB: db,
}
blockToPropose := tt.block(bsRoot)
res, err := proposerServer.ProposeBeaconBlock(context.Background(), blockToPropose)
assert.NoError(t, err, "Could not propose block correctly")
if res == nil || len(res.BlockRoot) == 0 {
t.Error("No block root was returned")
if tt.err != "" { // Expecting an error
require.ErrorContains(t, tt.err, err)
} else {
assert.NoError(t, err, "Could not propose block correctly")
if res == nil || len(res.BlockRoot) == 0 {
t.Error("No block root was returned")
}
}
if tt.name == "deneb block has blobs" {
scs, err := db.BlobSidecarsBySlot(ctx, blockToPropose.GetDeneb().Block.Block.Slot)
require.NoError(t, err)
assert.Equal(t, 4, len(scs))
for i, sc := range scs {
require.Equal(t, uint64(i), sc.Index)
}
}
})
}
Expand Down