diff --git a/beacon-chain/p2p/broadcaster.go b/beacon-chain/p2p/broadcaster.go index 51cba10053a8..2b0c840154f2 100644 --- a/beacon-chain/p2p/broadcaster.go +++ b/beacon-chain/p2p/broadcaster.go @@ -202,6 +202,67 @@ func (s *Service) broadcastSyncCommittee(ctx context.Context, subnet uint64, sMs } } +// BroadcastBlob broadcasts a blob to the p2p network, the message is assumed to be +// broadcasted to the current fork and to the input subnet. +func (s *Service) BroadcastBlob(ctx context.Context, subnet uint64, blob *ethpb.SignedBlobSidecar) error { + ctx, span := trace.StartSpan(ctx, "p2p.BroadcastBlob") + defer span.End() + if blob == nil { + return errors.New("attempted to broadcast nil blob sidecar") + } + forkDigest, err := s.currentForkDigest() + if err != nil { + err := errors.Wrap(err, "could not retrieve fork digest") + tracing.AnnotateError(span, err) + return err + } + + // Non-blocking broadcast, with attempts to discover a subnet peer if none available. + go s.broadcastBlob(ctx, subnet, blob, forkDigest) + + return nil +} + +func (s *Service) broadcastBlob(ctx context.Context, subnet uint64, blobSidecar *ethpb.SignedBlobSidecar, forkDigest [4]byte) { + _, span := trace.StartSpan(ctx, "p2p.broadcastBlob") + defer span.End() + ctx = trace.NewContext(context.Background(), span) // clear parent context / deadline. + + oneSlot := time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second + ctx, cancel := context.WithTimeout(ctx, oneSlot) + defer cancel() + + wrappedSubIdx := subnet + blobSubnetLockerVal + s.subnetLocker(wrappedSubIdx).RLock() + hasPeer := s.hasPeerWithSubnet(blobSubnetToTopic(subnet, forkDigest)) + s.subnetLocker(wrappedSubIdx).RUnlock() + + if !hasPeer { + blobSidecarCommitteeBroadcastAttempts.Inc() + if err := func() error { + s.subnetLocker(wrappedSubIdx).Lock() + defer s.subnetLocker(wrappedSubIdx).Unlock() + ok, err := s.FindPeersWithSubnet(ctx, blobSubnetToTopic(subnet, forkDigest), subnet, 1) + if err != nil { + return err + } + if ok { + blobSidecarCommitteeBroadcasts.Inc() + return nil + } + return errors.New("failed to find peers for subnet") + }(); err != nil { + log.WithError(err).Error("Failed to find peers") + tracing.AnnotateError(span, err) + } + } + + if err := s.broadcastObject(ctx, blobSidecar, blobSubnetToTopic(subnet, forkDigest)); err != nil { + log.WithError(err).Error("Failed to broadcast blob sidecar") + tracing.AnnotateError(span, err) + } +} + // method to broadcast messages to other peers in our gossip mesh. func (s *Service) broadcastObject(ctx context.Context, obj ssz.Marshaler, topic string) error { ctx, span := trace.StartSpan(ctx, "p2p.broadcastObject") @@ -238,3 +299,7 @@ func attestationToTopic(subnet uint64, forkDigest [4]byte) string { func syncCommitteeToTopic(subnet uint64, forkDigest [4]byte) string { return fmt.Sprintf(SyncCommitteeSubnetTopicFormat, forkDigest, subnet) } + +func blobSubnetToTopic(subnet uint64, forkDigest [4]byte) string { + return fmt.Sprintf(BlobSubnetTopicFormat, forkDigest, subnet) +} diff --git a/beacon-chain/p2p/broadcaster_test.go b/beacon-chain/p2p/broadcaster_test.go index 29a965f53e7c..8de1d4659069 100644 --- a/beacon-chain/p2p/broadcaster_test.go +++ b/beacon-chain/p2p/broadcaster_test.go @@ -17,6 +17,7 @@ import ( "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/peers" "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/peers/scorers" p2ptest "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/testing" + fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams" "github.com/prysmaticlabs/prysm/v4/consensus-types/wrapper" "github.com/prysmaticlabs/prysm/v4/encoding/bytesutil" ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" @@ -447,3 +448,76 @@ func TestService_BroadcastSyncCommittee(t *testing.T) { t.Error("Failed to receive pubsub within 1s") } } + +func TestService_BroadcastBlob(t *testing.T) { + p1 := p2ptest.NewTestP2P(t) + p2 := p2ptest.NewTestP2P(t) + p1.Connect(p2) + require.NotEqual(t, 0, len(p1.BHost.Network().Peers()), "No peers") + + p := &Service{ + host: p1.BHost, + pubsub: p1.PubSub(), + joinedTopics: map[string]*pubsub.Topic{}, + cfg: &Config{}, + genesisTime: time.Now(), + genesisValidatorsRoot: bytesutil.PadTo([]byte{'A'}, 32), + subnetsLock: make(map[uint64]*sync.RWMutex), + subnetsLockLock: sync.Mutex{}, + peers: peers.NewStatus(context.Background(), &peers.StatusConfig{ + ScorerParams: &scorers.Config{}, + }), + } + + blobSidecar := ðpb.SignedBlobSidecar{ + Message: ðpb.BlobSidecar{ + BlockRoot: bytesutil.PadTo([]byte{'A'}, fieldparams.RootLength), + Index: 1, + Slot: 2, + BlockParentRoot: bytesutil.PadTo([]byte{'B'}, fieldparams.RootLength), + ProposerIndex: 3, + Blob: bytesutil.PadTo([]byte{'C'}, fieldparams.BlobLength), + KzgCommitment: bytesutil.PadTo([]byte{'D'}, fieldparams.BLSPubkeyLength), + KzgProof: bytesutil.PadTo([]byte{'E'}, fieldparams.BLSPubkeyLength), + }, + Signature: bytesutil.PadTo([]byte{'F'}, fieldparams.BLSSignatureLength), + } + subnet := uint64(0) + + topic := BlobSubnetTopicFormat + GossipTypeMapping[reflect.TypeOf(blobSidecar)] = topic + digest, err := p.currentForkDigest() + require.NoError(t, err) + topic = fmt.Sprintf(topic, digest, subnet) + + // External peer subscribes to the topic. + topic += p.Encoding().ProtocolSuffix() + sub, err := p2.SubscribeToTopic(topic) + require.NoError(t, err) + + time.Sleep(50 * time.Millisecond) // libp2p fails without this delay... + + // Async listen for the pubsub, must be before the broadcast. + var wg sync.WaitGroup + wg.Add(1) + go func(tt *testing.T) { + defer wg.Done() + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + incomingMessage, err := sub.Next(ctx) + require.NoError(t, err) + + result := ðpb.SignedBlobSidecar{} + require.NoError(t, p.Encoding().DecodeGossip(incomingMessage.Data, result)) + require.DeepEqual(t, result, blobSidecar) + }(t) + + // Attempt to broadcast nil object should fail. + ctx := context.Background() + require.ErrorContains(t, "attempted to broadcast nil", p.BroadcastBlob(ctx, subnet, nil)) + + // Broadcast to peers and wait. + require.NoError(t, p.BroadcastBlob(ctx, subnet, blobSidecar)) + require.Equal(t, false, util.WaitTimeout(&wg, 1*time.Second), "Failed to receive pubsub within 1s") +} diff --git a/beacon-chain/p2p/monitoring.go b/beacon-chain/p2p/monitoring.go index 8bd0c5d526ec..7123c9971483 100644 --- a/beacon-chain/p2p/monitoring.go +++ b/beacon-chain/p2p/monitoring.go @@ -60,10 +60,18 @@ var ( "the subnet. The beacon node increments this counter when the broadcast is blocked " + "until a subnet peer can be found.", }) + blobSidecarCommitteeBroadcasts = promauto.NewCounter(prometheus.CounterOpts{ + Name: "p2p_blob_sidecar_committee_broadcasts", + Help: "The number of blob sidecar committee messages that were broadcast with no peer on.", + }) syncCommitteeBroadcastAttempts = promauto.NewCounter(prometheus.CounterOpts{ Name: "p2p_sync_committee_subnet_attempted_broadcasts", Help: "The number of sync committee that were attempted to be broadcast.", }) + blobSidecarCommitteeBroadcastAttempts = promauto.NewCounter(prometheus.CounterOpts{ + Name: "p2p_blob_sidecar_committee_attempted_broadcasts", + Help: "The number of blob sidecar committee messages that were attempted to be broadcast.", + }) // Gossip Tracer Metrics pubsubTopicsActive = promauto.NewGaugeVec(prometheus.GaugeOpts{ diff --git a/beacon-chain/p2p/subnets.go b/beacon-chain/p2p/subnets.go index bd915a1a13ba..9d9a226d6b4e 100644 --- a/beacon-chain/p2p/subnets.go +++ b/beacon-chain/p2p/subnets.go @@ -31,6 +31,13 @@ var syncCommsSubnetEnrKey = params.BeaconNetworkConfig().SyncCommsSubnetKey // chosen as more than 64(attestation subnet count). const syncLockerVal = 100 +// The value used with the blob sidecar subnet, in order +// to create an appropriate key to retrieve +// the relevant lock. This is used to differentiate +// blob subnets from others. This is deliberately +// chosen more than sync and attestation subnet combined. +const blobSubnetLockerVal = 110 + // FindPeersWithSubnet performs a network search for peers // subscribed to a particular subnet. Then we try to connect // with those peers. This method will block until the required amount of