Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EIP-7732 support (ePBS) #227

Draft
wants to merge 19 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions clients/consensus/chainspec.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ type ChainSpec struct {
ElectraForkEpoch *uint64 `yaml:"ELECTRA_FORK_EPOCH" check-if-fork:"ElectraForkEpoch"`
Eip7594ForkVersion phase0.Version `yaml:"EIP7594_FORK_VERSION" check-if-fork:"Eip7594ForkEpoch"`
Eip7594ForkEpoch *uint64 `yaml:"EIP7594_FORK_EPOCH" check-if-fork:"Eip7594ForkEpoch"`
Eip7732ForkVersion phase0.Version `yaml:"EIP7732_FORK_VERSION" check-if-fork:"Eip7732ForkEpoch"`
Eip7732ForkEpoch *uint64 `yaml:"EIP7732_FORK_EPOCH"`
SecondsPerSlot time.Duration `yaml:"SECONDS_PER_SLOT"`
SlotsPerEpoch uint64 `yaml:"SLOTS_PER_EPOCH"`
EpochsPerHistoricalVector uint64 `yaml:"EPOCHS_PER_HISTORICAL_VECTOR"`
Expand Down Expand Up @@ -71,6 +73,11 @@ type ChainSpec struct {
DataColumnSidecarSubnetCount *uint64 `yaml:"DATA_COLUMN_SIDECAR_SUBNET_COUNT" check-if-fork:"Eip7594ForkEpoch"`
CustodyRequirement *uint64 `yaml:"CUSTODY_REQUIREMENT" check-if-fork:"Eip7594ForkEpoch"`

// EIP7732: ePBS
PtcSize uint64 `yaml:"PTC_SIZE" check-if-fork:"Eip7732ForkEpoch"`
MaxPayloadAttestations uint64 `yaml:"MAX_PAYLOAD_ATTESTATIONS" check-if-fork:"Eip7732ForkEpoch"`
DomainPtcAttester phase0.DomainType `yaml:"DOMAIN_PTC_ATTESTER" check-if-fork:"Eip7732ForkEpoch"`

// additional dora specific specs
WhiskForkEpoch *uint64
}
Expand Down
8 changes: 8 additions & 0 deletions clients/consensus/chainstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,3 +265,11 @@ func (cs *ChainState) GetValidatorChurnLimit(validatorCount uint64) uint64 {

return adaptable
}

func (cs *ChainState) IsEip7732Enabled(epoch phase0.Epoch) bool {
if cs.specs == nil {
return false
}

return cs.specs.Eip7732ForkEpoch != nil && phase0.Epoch(*cs.specs.Eip7732ForkEpoch) <= epoch
}
65 changes: 35 additions & 30 deletions clients/consensus/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,36 +22,37 @@ type ClientConfig struct {
}

type Client struct {
pool *Pool
clientIdx uint16
endpointConfig *ClientConfig
clientCtx context.Context
clientCtxCancel context.CancelFunc
rpcClient *rpc.BeaconClient
logger *logrus.Entry
isOnline bool
isSyncing bool
isOptimistic bool
versionStr string
nodeIdentity *rpc.NodeIdentity
clientType ClientType
lastEvent time.Time
retryCounter uint64
lastError error
headMutex sync.RWMutex
headRoot phase0.Root
headSlot phase0.Slot
justifiedRoot phase0.Root
justifiedEpoch phase0.Epoch
finalizedRoot phase0.Root
finalizedEpoch phase0.Epoch
lastFinalityUpdateEpoch phase0.Epoch
lastMetadataUpdate time.Time
lastSyncUpdateEpoch phase0.Epoch
peers []*v1.Peer
blockDispatcher Dispatcher[*v1.BlockEvent]
headDispatcher Dispatcher[*v1.HeadEvent]
checkpointDispatcher Dispatcher[*v1.Finality]
pool *Pool
clientIdx uint16
endpointConfig *ClientConfig
clientCtx context.Context
clientCtxCancel context.CancelFunc
rpcClient *rpc.BeaconClient
logger *logrus.Entry
isOnline bool
isSyncing bool
isOptimistic bool
versionStr string
nodeIdentity *rpc.NodeIdentity
clientType ClientType
lastEvent time.Time
retryCounter uint64
lastError error
headMutex sync.RWMutex
headRoot phase0.Root
headSlot phase0.Slot
justifiedRoot phase0.Root
justifiedEpoch phase0.Epoch
finalizedRoot phase0.Root
finalizedEpoch phase0.Epoch
lastFinalityUpdateEpoch phase0.Epoch
lastMetadataUpdate time.Time
lastSyncUpdateEpoch phase0.Epoch
peers []*v1.Peer
blockDispatcher Dispatcher[*v1.BlockEvent]
headDispatcher Dispatcher[*v1.HeadEvent]
checkpointDispatcher Dispatcher[*v1.Finality]
executionPayloadDispatcher Dispatcher[*v1.ExecutionPayloadEvent]
}

func (pool *Pool) newPoolClient(clientIdx uint16, endpoint *ClientConfig) (*Client, error) {
Expand Down Expand Up @@ -96,6 +97,10 @@ func (client *Client) SubscribeFinalizedEvent(capacity int) *Subscription[*v1.Fi
return client.checkpointDispatcher.Subscribe(capacity, false)
}

func (client *Client) SubscribeExecutionPayloadEvent(capacity int, blocking bool) *Subscription[*v1.ExecutionPayloadEvent] {
return client.executionPayloadDispatcher.Subscribe(capacity, blocking)
}

func (client *Client) GetPool() *Pool {
return client.pool
}
Expand Down
18 changes: 17 additions & 1 deletion clients/consensus/clientlogic.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,11 @@ func (client *Client) runClientLogic() error {
}

// start event stream
blockStream := client.rpcClient.NewBlockStream(client.clientCtx, client.logger, rpc.StreamBlockEvent|rpc.StreamHeadEvent|rpc.StreamFinalizedEvent)
blockStream := client.rpcClient.NewBlockStream(
client.clientCtx,
client.logger,
rpc.StreamBlockEvent|rpc.StreamHeadEvent|rpc.StreamFinalizedEvent|rpc.StreamExecutionPayloadEvent,
)
defer blockStream.Close()

// process events
Expand Down Expand Up @@ -162,6 +166,12 @@ func (client *Client) runClientLogic() error {
if err != nil {
client.logger.Warnf("failed processing finalized event: %v", err)
}

case rpc.StreamExecutionPayloadEvent:
err := client.processExecutionPayloadEvent(evt.Data.(*v1.ExecutionPayloadEvent))
if err != nil {
client.logger.Warnf("failed processing execution payload event: %v", err)
}
}

client.logger.Tracef("event (%v) processing time: %v ms", evt.Event, time.Since(now).Milliseconds())
Expand Down Expand Up @@ -394,3 +404,9 @@ func (client *Client) pollClientHead() error {

return nil
}

func (client *Client) processExecutionPayloadEvent(evt *v1.ExecutionPayloadEvent) error {
client.executionPayloadDispatcher.Fire(evt)

return nil
}
17 changes: 17 additions & 0 deletions clients/consensus/rpc/beaconapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/attestantio/go-eth2-client/spec"
"github.com/attestantio/go-eth2-client/spec/capella"
"github.com/attestantio/go-eth2-client/spec/deneb"
"github.com/attestantio/go-eth2-client/spec/eip7732"
"github.com/attestantio/go-eth2-client/spec/phase0"
"github.com/rs/zerolog"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -406,6 +407,22 @@ func (bc *BeaconClient) GetBlockBodyByBlockroot(ctx context.Context, blockroot p
return result.Data, nil
}

func (bc *BeaconClient) GetExecutionPayloadByBlockroot(ctx context.Context, blockroot phase0.Root) (*eip7732.SignedExecutionPayloadEnvelope, error) {
provider, isProvider := bc.clientSvc.(eth2client.ExecutionPayloadProvider)
if !isProvider {
return nil, fmt.Errorf("get execution payload not supported")
}

result, err := provider.SignedExecutionPayloadEnvelope(ctx, &api.SignedExecutionPayloadEnvelopeOpts{
Block: fmt.Sprintf("0x%x", blockroot),
})
if err != nil {
return nil, err
}

return result.Data, nil
}

func (bc *BeaconClient) GetState(ctx context.Context, stateRef string) (*spec.VersionedBeaconState, error) {
provider, isProvider := bc.clientSvc.(eth2client.BeaconStateProvider)
if !isProvider {
Expand Down
34 changes: 31 additions & 3 deletions clients/consensus/rpc/beaconstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ import (
)

const (
StreamBlockEvent uint16 = 0x01
StreamHeadEvent uint16 = 0x02
StreamFinalizedEvent uint16 = 0x04
StreamBlockEvent uint16 = 0x01
StreamHeadEvent uint16 = 0x02
StreamFinalizedEvent uint16 = 0x04
StreamExecutionPayloadEvent uint16 = 0x08
)

type BeaconStreamEvent struct {
Expand Down Expand Up @@ -87,6 +88,8 @@ func (bs *BeaconStream) startStream() {
bs.processHeadEvent(evt)
case "finalized_checkpoint":
bs.processFinalizedEvent(evt)
case "execution_payload":
bs.processExecutionPayloadEvent(evt)
}
case <-stream.Ready:
bs.ReadyChan <- &BeaconStreamStatus{
Expand Down Expand Up @@ -148,6 +151,16 @@ func (bs *BeaconStream) subscribeStream(endpoint string, events uint16) *eventst
topicsCount++
}

if events&StreamExecutionPayloadEvent > 0 {
if topicsCount > 0 {
fmt.Fprintf(&topics, ",")
}

fmt.Fprintf(&topics, "execution_payload")

topicsCount++
}

if topicsCount == 0 {
return nil
}
Expand Down Expand Up @@ -225,6 +238,21 @@ func (bs *BeaconStream) processFinalizedEvent(evt eventsource.Event) {
}
}

func (bs *BeaconStream) processExecutionPayloadEvent(evt eventsource.Event) {
var parsed v1.ExecutionPayloadEvent

err := json.Unmarshal([]byte(evt.Data()), &parsed)
if err != nil {
bs.logger.Warnf("beacon block stream failed to decode execution_payload event: %v", err)
return
}

bs.EventChan <- &BeaconStreamEvent{
Event: StreamExecutionPayloadEvent,
Data: &parsed,
}
}

func getRedactedURL(requrl string) string {
var logurl string

Expand Down
15 changes: 8 additions & 7 deletions db/epochs.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ func InsertEpoch(epoch *dbtypes.Epoch, tx *sqlx.Tx) error {
INSERT INTO epochs (
epoch, validator_count, validator_balance, eligible, voted_target, voted_head, voted_total, block_count, orphaned_count,
attestation_count, deposit_count, exit_count, withdraw_count, withdraw_amount, attester_slashing_count,
proposer_slashing_count, bls_change_count, eth_transaction_count, sync_participation
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19)
proposer_slashing_count, bls_change_count, eth_transaction_count, sync_participation, payload_count
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20)
ON CONFLICT (epoch) DO UPDATE SET
validator_count = excluded.validator_count,
validator_balance = excluded.validator_balance,
Expand All @@ -31,17 +31,18 @@ func InsertEpoch(epoch *dbtypes.Epoch, tx *sqlx.Tx) error {
proposer_slashing_count = excluded.proposer_slashing_count,
bls_change_count = excluded.bls_change_count,
eth_transaction_count = excluded.eth_transaction_count,
sync_participation = excluded.sync_participation`,
sync_participation = excluded.sync_participation,
payload_count = excluded.payload_count`,
dbtypes.DBEngineSqlite: `
INSERT OR REPLACE INTO epochs (
epoch, validator_count, validator_balance, eligible, voted_target, voted_head, voted_total, block_count, orphaned_count,
attestation_count, deposit_count, exit_count, withdraw_count, withdraw_amount, attester_slashing_count,
proposer_slashing_count, bls_change_count, eth_transaction_count, sync_participation
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19)`,
proposer_slashing_count, bls_change_count, eth_transaction_count, sync_participation, payload_count
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20)`,
}),
epoch.Epoch, epoch.ValidatorCount, epoch.ValidatorBalance, epoch.Eligible, epoch.VotedTarget, epoch.VotedHead, epoch.VotedTotal, epoch.BlockCount, epoch.OrphanedCount,
epoch.AttestationCount, epoch.DepositCount, epoch.ExitCount, epoch.WithdrawCount, epoch.WithdrawAmount, epoch.AttesterSlashingCount, epoch.ProposerSlashingCount,
epoch.BLSChangeCount, epoch.EthTransactionCount, epoch.SyncParticipation)
epoch.BLSChangeCount, epoch.EthTransactionCount, epoch.SyncParticipation, epoch.PayloadCount)
if err != nil {
return err
}
Expand All @@ -63,7 +64,7 @@ func GetEpochs(firstEpoch uint64, limit uint32) []*dbtypes.Epoch {
SELECT
epoch, validator_count, validator_balance, eligible, voted_target, voted_head, voted_total, block_count, orphaned_count,
attestation_count, deposit_count, exit_count, withdraw_count, withdraw_amount, attester_slashing_count,
proposer_slashing_count, bls_change_count, eth_transaction_count, sync_participation
proposer_slashing_count, bls_change_count, eth_transaction_count, sync_participation, payload_count
FROM epochs
WHERE epoch <= $1
ORDER BY epoch DESC
Expand Down
12 changes: 6 additions & 6 deletions db/orphaned_blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ func InsertOrphanedBlock(block *dbtypes.OrphanedBlock, tx *sqlx.Tx) error {
_, err := tx.Exec(EngineQuery(map[dbtypes.DBEngineType]string{
dbtypes.DBEnginePgsql: `
INSERT INTO orphaned_blocks (
root, header_ver, header_ssz, block_ver, block_ssz
) VALUES ($1, $2, $3, $4, $5)
root, header_ver, header_ssz, block_ver, block_ssz, payload_ver, payload_ssz
) VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (root) DO NOTHING`,
dbtypes.DBEngineSqlite: `
INSERT OR IGNORE INTO orphaned_blocks (
root, header_ver, header_ssz, block_ver, block_ssz
) VALUES ($1, $2, $3, $4, $5)`,
root, header_ver, header_ssz, block_ver, block_ssz, payload_ver, payload_ssz
) VALUES ($1, $2, $3, $4, $5, $6, $7)`,
}),
block.Root, block.HeaderVer, block.HeaderSSZ, block.BlockVer, block.BlockSSZ)
block.Root, block.HeaderVer, block.HeaderSSZ, block.BlockVer, block.BlockSSZ, block.PayloadVer, block.PayloadSSZ)
if err != nil {
return err
}
Expand All @@ -27,7 +27,7 @@ func InsertOrphanedBlock(block *dbtypes.OrphanedBlock, tx *sqlx.Tx) error {
func GetOrphanedBlock(root []byte) *dbtypes.OrphanedBlock {
block := dbtypes.OrphanedBlock{}
err := ReaderDb.Get(&block, `
SELECT root, header_ver, header_ssz, block_ver, block_ssz
SELECT root, header_ver, header_ssz, block_ver, block_ssz, payload_ver, payload_ssz
FROM orphaned_blocks
WHERE root = $1
`, root)
Expand Down
29 changes: 29 additions & 0 deletions db/schema/pgsql/20250208225212_epbs-payload.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
-- +goose Up
-- +goose StatementBegin

ALTER TABLE public."unfinalized_blocks" ADD
"payload_ver" int NOT NULL DEFAULT 0,
"payload_ssz" bytea NULL;

ALTER TABLE public."orphaned_blocks" ADD
"payload_ver" int NOT NULL DEFAULT 0,
"payload_ssz" bytea NULL;

ALTER TABLE public."slots" ADD
"payload_status" smallint NOT NULL DEFAULT 0;

CREATE INDEX IF NOT EXISTS "slots_payload_status_idx"
ON public."slots"
("payload_status" ASC NULLS LAST);

ALTER TABLE public."epochs" ADD
"payload_count" int NOT NULL DEFAULT 0;

ALTER TABLE public."unfinalized_epochs" ADD
"payload_count" int NOT NULL DEFAULT 0;

-- +goose StatementEnd
-- +goose Down
-- +goose StatementBegin
SELECT 'NOT SUPPORTED';
-- +goose StatementEnd
22 changes: 22 additions & 0 deletions db/schema/sqlite/20250208225212_epbs-payload.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
-- +goose Up
-- +goose StatementBegin

ALTER TABLE "unfinalized_blocks" ADD "payload_ver" int NOT NULL DEFAULT 0;
ALTER TABLE "unfinalized_blocks" ADD "payload_ssz" BLOB NULL;

ALTER TABLE "orphaned_blocks" ADD "payload_ver" int NOT NULL DEFAULT 0;
ALTER TABLE "orphaned_blocks" ADD "payload_ssz" BLOB NULL;

ALTER TABLE "slots" ADD "payload_status" smallint NOT NULL DEFAULT 0;

CREATE INDEX IF NOT EXISTS "slots_payload_status_idx" ON "slots" ("payload_status" ASC);

ALTER TABLE "epochs" ADD "payload_count" int NOT NULL DEFAULT 0;

ALTER TABLE "unfinalized_epochs" ADD "payload_count" int NOT NULL DEFAULT 0;

-- +goose StatementEnd
-- +goose Down
-- +goose StatementBegin
SELECT 'NOT SUPPORTED';
-- +goose StatementEnd
Loading
Loading