Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions api/server/structs/endpoints_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@ type BlockGossipEvent struct {
Block string `json:"block"`
}

type DataColumnGossipEvent struct {
Slot string `json:"slot"`
Index string `json:"index"`
BlockRoot string `json:"block_root"`
KzgCommitments []string `json:"kzg_commitments"`
}

type AggregatedAttEventSource struct {
Aggregate *Attestation `json:"aggregate"`
}
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/core/feed/operation/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ go_library(
"//async/event:go_default_library",
"//consensus-types/blocks:go_default_library",
"//consensus-types/interfaces:go_default_library",
"//consensus-types/primitives:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
],
)
11 changes: 11 additions & 0 deletions beacon-chain/core/feed/operation/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package operation
import (
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v6/consensus-types/interfaces"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
)

Expand Down Expand Up @@ -39,6 +40,9 @@ const (

// BlockGossipReceived is sent after a block has been received from gossip or API that passes validation rules.
BlockGossipReceived = 10

// DataColumnReceived is sent after a data column has been seen after gossip validation rules.
DataColumnReceived = 11
)

// UnAggregatedAttReceivedData is the data sent with UnaggregatedAttReceived events.
Expand Down Expand Up @@ -95,3 +99,10 @@ type BlockGossipReceivedData struct {
// SignedBlock is the block that was received.
SignedBlock interfaces.ReadOnlySignedBeaconBlock
}

type DataColumnReceivedData struct {
Slot primitives.Slot
Index uint64
BlockRoot [32]byte
KzgCommitments [][]byte
}
18 changes: 18 additions & 0 deletions beacon-chain/rpc/eth/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ const (
LightClientFinalityUpdateTopic = "light_client_finality_update"
// LightClientOptimisticUpdateTopic represents a new light client optimistic update event topic.
LightClientOptimisticUpdateTopic = "light_client_optimistic_update"
// DataColumnTopic represents a data column sidecar event topic
DataColumnTopic = "data_column_sidecar"
)

var (
Expand Down Expand Up @@ -105,6 +107,7 @@ var opsFeedEventTopics = map[feed.EventType]string{
operation.AttesterSlashingReceived: AttesterSlashingTopic,
operation.ProposerSlashingReceived: ProposerSlashingTopic,
operation.BlockGossipReceived: BlockGossipTopic,
operation.DataColumnReceived: DataColumnTopic,
}

var stateFeedEventTopics = map[feed.EventType]string{
Expand Down Expand Up @@ -461,6 +464,8 @@ func topicForEvent(event *feed.Event) string {
return BlockTopic
case payloadattribute.EventData:
return PayloadAttributesTopic
case *operation.DataColumnReceivedData:
return DataColumnTopic
default:
return InvalidTopic
}
Expand Down Expand Up @@ -495,6 +500,19 @@ func (s *Server) lazyReaderForEvent(ctx context.Context, event *feed.Event, topi
}
return jsonMarshalReader(eventName, blk)
}, nil
case *operation.DataColumnReceivedData:
return func() io.Reader {
kzgCommitments := make([]string, len(v.KzgCommitments))
for i, kzgCommitment := range v.KzgCommitments {
kzgCommitments[i] = hexutil.Encode(kzgCommitment)
}
return jsonMarshalReader(eventName, &structs.DataColumnGossipEvent{
Slot: fmt.Sprintf("%d", v.Slot),
Index: fmt.Sprintf("%d", v.Index),
BlockRoot: hexutil.Encode(v.BlockRoot[:]),
KzgCommitments: kzgCommitments,
})
}, nil
case *operation.AggregatedAttReceivedData:
switch att := v.Attestation.AggregateVal().(type) {
case *eth.Attestation:
Expand Down
14 changes: 12 additions & 2 deletions beacon-chain/rpc/eth/events/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
"github.com/OffchainLabs/prysm/v6/testing/require"
"github.com/OffchainLabs/prysm/v6/testing/util"
"github.com/ethereum/go-ethereum/common"
sse "github.com/r3labs/sse/v2"
"github.com/r3labs/sse/v2"
"github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -121,6 +121,7 @@ func operationEventsFixtures(t *testing.T) (*topicRequest, []*feed.Event) {
AttesterSlashingTopic,
ProposerSlashingTopic,
BlockGossipTopic,
DataColumnTopic,
})
require.NoError(t, err)
ro, err := blocks.NewROBlob(util.HydrateBlobSidecar(&eth.BlobSidecar{}))
Expand Down Expand Up @@ -301,6 +302,15 @@ func operationEventsFixtures(t *testing.T) (*topicRequest, []*feed.Event) {
SignedBlock: signedBlock,
},
},
{
Type: operation.DataColumnReceived,
Data: &operation.DataColumnReceivedData{
Slot: 1,
Index: 2,
BlockRoot: [32]byte{'a'},
KzgCommitments: [][]byte{{'a'}, {'b'}, {'c'}},
},
},
}
}

Expand Down Expand Up @@ -709,7 +719,7 @@ func TestStuckReaderScenarios(t *testing.T) {

func wedgedWriterTestCase(t *testing.T, queueDepth func([]*feed.Event) int) {
topics, events := operationEventsFixtures(t)
require.Equal(t, 11, len(events))
require.Equal(t, 12, len(events))

// set eventFeedDepth to a number lower than the events we intend to send to force the server to drop the reader.
stn := mockChain.NewEventFeedWrapper()
Expand Down
14 changes: 14 additions & 0 deletions beacon-chain/sync/validate_data_column.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"math"
"time"

"github.com/OffchainLabs/prysm/v6/beacon-chain/core/feed"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/feed/operation"
"github.com/OffchainLabs/prysm/v6/beacon-chain/verification"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
Expand Down Expand Up @@ -205,6 +207,18 @@ func (s *Service) validateDataColumn(ctx context.Context, pid peer.ID, msg *pubs
log.WithField("slot", roDataColumn.Slot()).Warn("Failed to send data column log entry")
}

if s.cfg.operationNotifier != nil {
s.cfg.operationNotifier.OperationFeed().Send(&feed.Event{
Type: operation.DataColumnReceived,
Data: &operation.DataColumnReceivedData{
Slot: roDataColumn.Slot(),
Index: roDataColumn.Index,
BlockRoot: roDataColumn.BlockRoot(),
KzgCommitments: bytesutil.SafeCopy2dBytes(roDataColumn.KzgCommitments),
},
})
}

return pubsub.ValidationAccept, nil
}

Expand Down
3 changes: 3 additions & 0 deletions changelog/tt_fish.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
### Added

- Data column support for beacon api event end point
Loading