Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions beacon-chain/p2p/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,67 @@ func (s *Service) broadcastSyncCommittee(ctx context.Context, subnet uint64, sMs
}
}

// BroadcastBlob broadcasts a blob to the p2p network, the message is assumed to be
// broadcasted to the current fork and to the input subnet.
func (s *Service) BroadcastBlob(ctx context.Context, subnet uint64, blob *ethpb.SignedBlobSidecar) error {
ctx, span := trace.StartSpan(ctx, "p2p.BroadcastBlob")
defer span.End()
if blob == nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this the only validation we need to do before a blob sidecar is broadcasted? do we need to validate blob with kzg?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just broadcast from p2p's perspective. Validations (if there are any) are done before this

return errors.New("attempted to broadcast nil blob sidecar")
}
forkDigest, err := s.currentForkDigest()
if err != nil {
err := errors.Wrap(err, "could not retrieve fork digest")
tracing.AnnotateError(span, err)
return err
}

// Non-blocking broadcast, with attempts to discover a subnet peer if none available.
go s.broadcastBlob(ctx, subnet, blob, forkDigest)

return nil
}

func (s *Service) broadcastBlob(ctx context.Context, subnet uint64, blobSidecar *ethpb.SignedBlobSidecar, forkDigest [4]byte) {
_, span := trace.StartSpan(ctx, "p2p.broadcastBlob")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
_, span := trace.StartSpan(ctx, "p2p.broadcastBlob")
_, span := trace.StartSpan(ctx, "p2p.broadcastBlobBackground")

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

renamed the function to broadcastBlob

defer span.End()
ctx = trace.NewContext(context.Background(), span) // clear parent context / deadline.

oneSlot := time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second
ctx, cancel := context.WithTimeout(ctx, oneSlot)
defer cancel()

wrappedSubIdx := subnet + blobSubnetLockerVal
s.subnetLocker(wrappedSubIdx).RLock()
hasPeer := s.hasPeerWithSubnet(blobSubnetToTopic(subnet, forkDigest))
s.subnetLocker(wrappedSubIdx).RUnlock()

if !hasPeer {
blobSidecarCommitteeBroadcastAttempts.Inc()
if err := func() error {
s.subnetLocker(wrappedSubIdx).Lock()
defer s.subnetLocker(wrappedSubIdx).Unlock()
ok, err := s.FindPeersWithSubnet(ctx, blobSubnetToTopic(subnet, forkDigest), subnet, 1)
if err != nil {
return err
}
if ok {
blobSidecarCommitteeBroadcasts.Inc()
return nil
}
return errors.New("failed to find peers for subnet")
}(); err != nil {
log.WithError(err).Error("Failed to find peers")
tracing.AnnotateError(span, err)
}
}

if err := s.broadcastObject(ctx, blobSidecar, blobSubnetToTopic(subnet, forkDigest)); err != nil {
log.WithError(err).Error("Failed to broadcast blob sidecar")
tracing.AnnotateError(span, err)
}
}

// method to broadcast messages to other peers in our gossip mesh.
func (s *Service) broadcastObject(ctx context.Context, obj ssz.Marshaler, topic string) error {
ctx, span := trace.StartSpan(ctx, "p2p.broadcastObject")
Expand Down Expand Up @@ -238,3 +299,7 @@ func attestationToTopic(subnet uint64, forkDigest [4]byte) string {
func syncCommitteeToTopic(subnet uint64, forkDigest [4]byte) string {
return fmt.Sprintf(SyncCommitteeSubnetTopicFormat, forkDigest, subnet)
}

func blobSubnetToTopic(subnet uint64, forkDigest [4]byte) string {
return fmt.Sprintf(BlobSubnetTopicFormat, forkDigest, subnet)
}
74 changes: 74 additions & 0 deletions beacon-chain/p2p/broadcaster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/peers"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/peers/scorers"
p2ptest "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/testing"
fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
"github.com/prysmaticlabs/prysm/v4/consensus-types/wrapper"
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
Expand Down Expand Up @@ -447,3 +448,76 @@ func TestService_BroadcastSyncCommittee(t *testing.T) {
t.Error("Failed to receive pubsub within 1s")
}
}

func TestService_BroadcastBlob(t *testing.T) {
p1 := p2ptest.NewTestP2P(t)
p2 := p2ptest.NewTestP2P(t)
p1.Connect(p2)
require.NotEqual(t, 0, len(p1.BHost.Network().Peers()), "No peers")

p := &Service{
host: p1.BHost,
pubsub: p1.PubSub(),
joinedTopics: map[string]*pubsub.Topic{},
cfg: &Config{},
genesisTime: time.Now(),
genesisValidatorsRoot: bytesutil.PadTo([]byte{'A'}, 32),
subnetsLock: make(map[uint64]*sync.RWMutex),
subnetsLockLock: sync.Mutex{},
peers: peers.NewStatus(context.Background(), &peers.StatusConfig{
ScorerParams: &scorers.Config{},
}),
}

blobSidecar := &ethpb.SignedBlobSidecar{
Message: &ethpb.BlobSidecar{
BlockRoot: bytesutil.PadTo([]byte{'A'}, fieldparams.RootLength),
Index: 1,
Slot: 2,
BlockParentRoot: bytesutil.PadTo([]byte{'B'}, fieldparams.RootLength),
ProposerIndex: 3,
Blob: bytesutil.PadTo([]byte{'C'}, fieldparams.BlobLength),
KzgCommitment: bytesutil.PadTo([]byte{'D'}, fieldparams.BLSPubkeyLength),
KzgProof: bytesutil.PadTo([]byte{'E'}, fieldparams.BLSPubkeyLength),
},
Signature: bytesutil.PadTo([]byte{'F'}, fieldparams.BLSSignatureLength),
}
subnet := uint64(0)

topic := BlobSubnetTopicFormat
GossipTypeMapping[reflect.TypeOf(blobSidecar)] = topic
digest, err := p.currentForkDigest()
require.NoError(t, err)
topic = fmt.Sprintf(topic, digest, subnet)

// External peer subscribes to the topic.
topic += p.Encoding().ProtocolSuffix()
sub, err := p2.SubscribeToTopic(topic)
require.NoError(t, err)

time.Sleep(50 * time.Millisecond) // libp2p fails without this delay...

// Async listen for the pubsub, must be before the broadcast.
var wg sync.WaitGroup
wg.Add(1)
go func(tt *testing.T) {
defer wg.Done()
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

incomingMessage, err := sub.Next(ctx)
require.NoError(t, err)

result := &ethpb.SignedBlobSidecar{}
require.NoError(t, p.Encoding().DecodeGossip(incomingMessage.Data, result))
require.DeepEqual(t, result, blobSidecar)
}(t)

// Attempt to broadcast nil object should fail.
ctx := context.Background()
require.ErrorContains(t, "attempted to broadcast nil", p.BroadcastBlob(ctx, subnet, nil))

// Broadcast to peers and wait.
require.NoError(t, p.BroadcastBlob(ctx, subnet, blobSidecar))
require.Equal(t, false, util.WaitTimeout(&wg, 1*time.Second), "Failed to receive pubsub within 1s")
}
8 changes: 8 additions & 0 deletions beacon-chain/p2p/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,18 @@ var (
"the subnet. The beacon node increments this counter when the broadcast is blocked " +
"until a subnet peer can be found.",
})
blobSidecarCommitteeBroadcasts = promauto.NewCounter(prometheus.CounterOpts{
Name: "p2p_blob_sidecar_committee_broadcasts",
Help: "The number of blob sidecar committee messages that were broadcast with no peer on.",
})
syncCommitteeBroadcastAttempts = promauto.NewCounter(prometheus.CounterOpts{
Name: "p2p_sync_committee_subnet_attempted_broadcasts",
Help: "The number of sync committee that were attempted to be broadcast.",
})
blobSidecarCommitteeBroadcastAttempts = promauto.NewCounter(prometheus.CounterOpts{
Name: "p2p_blob_sidecar_committee_attempted_broadcasts",
Help: "The number of blob sidecar committee messages that were attempted to be broadcast.",
})

// Gossip Tracer Metrics
pubsubTopicsActive = promauto.NewGaugeVec(prometheus.GaugeOpts{
Expand Down
7 changes: 7 additions & 0 deletions beacon-chain/p2p/subnets.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ var syncCommsSubnetEnrKey = params.BeaconNetworkConfig().SyncCommsSubnetKey
// chosen as more than 64(attestation subnet count).
const syncLockerVal = 100

// The value used with the blob sidecar subnet, in order
// to create an appropriate key to retrieve
// the relevant lock. This is used to differentiate
// blob subnets from others. This is deliberately
// chosen more than sync and attestation subnet combined.
const blobSubnetLockerVal = 110

// FindPeersWithSubnet performs a network search for peers
// subscribed to a particular subnet. Then we try to connect
// with those peers. This method will block until the required amount of
Expand Down