From 2195894a5a1f7c2b0d5e1dc9460f10f7b565e86d Mon Sep 17 00:00:00 2001 From: Dmitrii Shmatko Date: Tue, 29 Apr 2025 16:02:06 +0200 Subject: [PATCH 1/4] DataColumns byRoot RPC changes with list of columns identifiers --- .../phase0/ssz_static/SszTestExecutor.java | 8 +- .../libp2p/rpc/DataColumnIdentifier.java | 65 --------- ...ataColumnSidecarsByRootRequestMessage.java | 6 +- ...umnSidecarsByRootRequestMessageSchema.java | 9 +- .../rpc/DataColumnsByRootIdentifier.java | 61 +++++++++ .../DataColumnsByRootIdentifierSchema.java | 53 ++++++++ .../util/DataColumnIdentifier.java | 24 ++++ .../util/DataColumnSlotAndIdentifier.java | 3 +- .../spec/schemas/SchemaDefinitionsFulu.java | 9 ++ .../registry/SchemaRegistryBuilder.java | 15 ++- .../spec/schemas/registry/SchemaTypes.java | 3 + .../DataColumnSidecarByRootCustody.java | 2 +- .../DataColumnSidecarByRootCustodyImpl.java | 9 +- .../DataColumnSidecarRecoveringCustody.java | 2 +- ...ataColumnSidecarRecoveringCustodyImpl.java | 2 +- .../LateInitDataColumnSidecarCustody.java | 2 +- .../log/rpc/DasByRootResponseLogger.java | 19 +-- .../datacolumns/log/rpc/DasReqRespLogger.java | 6 +- .../log/rpc/DasReqRespLoggerImpl.java | 10 +- .../LoggingBatchDataColumnsByRootReqResp.java | 4 +- .../BatchDataColumnsByRootReqResp.java | 4 +- .../retriever/DataColumnReqResp.java | 2 +- .../DataColumnReqRespBatchingImpl.java | 35 +++-- .../DataColumnReqRespBatchingImplTest.java | 124 ++++++++++++++++++ .../retriever/SimpleSidecarRetrieverTest.java | 5 +- .../datacolumns/retriever/TestPeer.java | 2 +- .../retriever/TestPeerManager.java | 2 +- .../eth2/Eth2P2PNetworkBuilder.java | 11 ++ .../eth2/peers/DataColumnPeerManagerImpl.java | 6 +- .../eth2/peers/DefaultEth2Peer.java | 4 +- .../teku/networking/eth2/peers/Eth2Peer.java | 4 +- .../eth2/peers/Eth2PeerManager.java | 5 + .../rpc/beaconchain/BeaconChainMethods.java | 11 +- ...SidecarsByRootListenerValidatingProxy.java | 20 ++- ...ataColumnSidecarsByRootMessageHandler.java | 39 ++++-- .../DataColumnSidecarsByRootValidator.java | 2 +- ...carsByRootListenerValidatingProxyTest.java | 64 ++++----- .../eth2/peers/RespondingEth2Peer.java | 4 +- .../beaconchain/BeaconChainController.java | 19 ++- 39 files changed, 493 insertions(+), 182 deletions(-) delete mode 100644 ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/DataColumnIdentifier.java create mode 100644 ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/DataColumnsByRootIdentifier.java create mode 100644 ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/DataColumnsByRootIdentifierSchema.java create mode 100644 ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/util/DataColumnIdentifier.java create mode 100644 ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/datacolumns/retriever/DataColumnReqRespBatchingImplTest.java diff --git a/eth-reference-tests/src/referenceTest/java/tech/pegasys/teku/reference/phase0/ssz_static/SszTestExecutor.java b/eth-reference-tests/src/referenceTest/java/tech/pegasys/teku/reference/phase0/ssz_static/SszTestExecutor.java index bf712c66ab9..800e7207df3 100644 --- a/eth-reference-tests/src/referenceTest/java/tech/pegasys/teku/reference/phase0/ssz_static/SszTestExecutor.java +++ b/eth-reference-tests/src/referenceTest/java/tech/pegasys/teku/reference/phase0/ssz_static/SszTestExecutor.java @@ -29,7 +29,6 @@ import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlockHeader; import tech.pegasys.teku.spec.datastructures.blocks.blockbody.versions.altair.BeaconBlockBodySchemaAltair; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobIdentifier; -import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.DataColumnIdentifier; import tech.pegasys.teku.spec.datastructures.operations.AttestationData; import tech.pegasys.teku.spec.datastructures.operations.Deposit; import tech.pegasys.teku.spec.datastructures.operations.DepositData; @@ -230,8 +229,11 @@ public class SszTestExecutor implements TestExecutor { // Fulu types .put( - "ssz_static/DataColumnIdentifier", - new SszTestExecutor<>(schemas -> DataColumnIdentifier.SSZ_SCHEMA)) + "ssz_static/DataColumnsByRootIdentifier", + new SszTestExecutor<>( + schemas -> + SchemaDefinitionsFulu.required(schemas) + .getDataColumnsByRootIdentifierSchema())) .put( "ssz_static/DataColumnSidecar", new SszTestExecutor<>( diff --git a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/DataColumnIdentifier.java b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/DataColumnIdentifier.java deleted file mode 100644 index b333ce6374d..00000000000 --- a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/DataColumnIdentifier.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright Consensys Software Inc., 2023 - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc; - -import org.apache.tuweni.bytes.Bytes32; -import tech.pegasys.teku.infrastructure.ssz.containers.Container2; -import tech.pegasys.teku.infrastructure.ssz.containers.ContainerSchema2; -import tech.pegasys.teku.infrastructure.ssz.primitive.SszBytes32; -import tech.pegasys.teku.infrastructure.ssz.primitive.SszUInt64; -import tech.pegasys.teku.infrastructure.ssz.schema.SszPrimitiveSchemas; -import tech.pegasys.teku.infrastructure.ssz.tree.TreeNode; -import tech.pegasys.teku.infrastructure.unsigned.UInt64; -import tech.pegasys.teku.spec.datastructures.blobs.versions.fulu.DataColumnSidecar; - -public class DataColumnIdentifier extends Container2 { - - public static class DataColumnIdentifierSchema - extends ContainerSchema2 { - - private DataColumnIdentifierSchema() { - super( - "DataColumnIdentifier", - namedSchema("block_root", SszPrimitiveSchemas.BYTES32_SCHEMA), - namedSchema("index", SszPrimitiveSchemas.UINT64_SCHEMA)); - } - - @Override - public DataColumnIdentifier createFromBackingNode(final TreeNode node) { - return new DataColumnIdentifier(node); - } - } - - public static final DataColumnIdentifierSchema SSZ_SCHEMA = new DataColumnIdentifierSchema(); - - public static DataColumnIdentifier createFromSidecar(final DataColumnSidecar sidecar) { - return new DataColumnIdentifier(sidecar.getBlockRoot(), sidecar.getIndex()); - } - - private DataColumnIdentifier(final TreeNode node) { - super(SSZ_SCHEMA, node); - } - - public DataColumnIdentifier(final Bytes32 root, final UInt64 index) { - super(SSZ_SCHEMA, SszBytes32.of(root), SszUInt64.of(index)); - } - - public Bytes32 getBlockRoot() { - return getField0().get(); - } - - public UInt64 getIndex() { - return getField1().get(); - } -} diff --git a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/DataColumnSidecarsByRootRequestMessage.java b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/DataColumnSidecarsByRootRequestMessage.java index fff1a4df8da..87efe97d427 100644 --- a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/DataColumnSidecarsByRootRequestMessage.java +++ b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/DataColumnSidecarsByRootRequestMessage.java @@ -18,12 +18,12 @@ import tech.pegasys.teku.infrastructure.ssz.impl.SszListImpl; import tech.pegasys.teku.infrastructure.ssz.tree.TreeNode; -public class DataColumnSidecarsByRootRequestMessage extends SszListImpl - implements SszList, RpcRequest { +public class DataColumnSidecarsByRootRequestMessage extends SszListImpl + implements SszList, RpcRequest { public DataColumnSidecarsByRootRequestMessage( final DataColumnSidecarsByRootRequestMessageSchema schema, - final List dataColumnIdentifiers) { + final List dataColumnIdentifiers) { super(schema, schema.createTreeFromElements(dataColumnIdentifiers)); } diff --git a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/DataColumnSidecarsByRootRequestMessageSchema.java b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/DataColumnSidecarsByRootRequestMessageSchema.java index afd629068d8..7acee1258cc 100644 --- a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/DataColumnSidecarsByRootRequestMessageSchema.java +++ b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/DataColumnSidecarsByRootRequestMessageSchema.java @@ -18,10 +18,13 @@ import tech.pegasys.teku.spec.config.SpecConfigFulu; public class DataColumnSidecarsByRootRequestMessageSchema - extends AbstractSszListSchema { + extends AbstractSszListSchema< + DataColumnsByRootIdentifier, DataColumnSidecarsByRootRequestMessage> { - public DataColumnSidecarsByRootRequestMessageSchema(final SpecConfigFulu specConfig) { - super(DataColumnIdentifier.SSZ_SCHEMA, specConfig.getMaxRequestDataColumnSidecars()); + public DataColumnSidecarsByRootRequestMessageSchema( + final SpecConfigFulu specConfig, + final DataColumnsByRootIdentifierSchema byRootIdentifierSchema) { + super(byRootIdentifierSchema, specConfig.getMaxRequestDataColumnSidecars()); } @Override diff --git a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/DataColumnsByRootIdentifier.java b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/DataColumnsByRootIdentifier.java new file mode 100644 index 00000000000..81dfd107518 --- /dev/null +++ b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/DataColumnsByRootIdentifier.java @@ -0,0 +1,61 @@ +/* + * Copyright Consensys Software Inc., 2023 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc; + +import java.util.List; +import org.apache.tuweni.bytes.Bytes32; +import tech.pegasys.teku.infrastructure.ssz.collections.SszUInt64List; +import tech.pegasys.teku.infrastructure.ssz.containers.Container2; +import tech.pegasys.teku.infrastructure.ssz.primitive.SszBytes32; +import tech.pegasys.teku.infrastructure.ssz.primitive.SszUInt64; +import tech.pegasys.teku.infrastructure.ssz.tree.TreeNode; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; +import tech.pegasys.teku.spec.datastructures.blobs.versions.fulu.DataColumnSidecar; + +public class DataColumnsByRootIdentifier + extends Container2 { + + public static DataColumnsByRootIdentifier createFromSidecar( + final DataColumnSidecar sidecar, final DataColumnsByRootIdentifierSchema schema) { + return new DataColumnsByRootIdentifier( + sidecar.getBlockRoot(), List.of(sidecar.getIndex()), schema); + } + + DataColumnsByRootIdentifier(final TreeNode node, final DataColumnsByRootIdentifierSchema schema) { + super(schema, node); + } + + public DataColumnsByRootIdentifier( + final Bytes32 root, final UInt64 index, final DataColumnsByRootIdentifierSchema schema) { + this(root, List.of(index), schema); + } + + public DataColumnsByRootIdentifier( + final Bytes32 root, + final List indices, + final DataColumnsByRootIdentifierSchema schema) { + super( + schema, + SszBytes32.of(root), + schema.getColumnsSchema().createFromElements(indices.stream().map(SszUInt64::of).toList())); + } + + public Bytes32 getBlockRoot() { + return getField0().get(); + } + + public List getColumns() { + return getField1().asListUnboxed(); + } +} diff --git a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/DataColumnsByRootIdentifierSchema.java b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/DataColumnsByRootIdentifierSchema.java new file mode 100644 index 00000000000..4d473465c13 --- /dev/null +++ b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/DataColumnsByRootIdentifierSchema.java @@ -0,0 +1,53 @@ +/* + * Copyright Consensys Software Inc., 2025 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc; + +import java.util.List; +import org.apache.tuweni.bytes.Bytes32; +import tech.pegasys.teku.infrastructure.ssz.collections.SszUInt64List; +import tech.pegasys.teku.infrastructure.ssz.containers.ContainerSchema2; +import tech.pegasys.teku.infrastructure.ssz.primitive.SszBytes32; +import tech.pegasys.teku.infrastructure.ssz.schema.SszPrimitiveSchemas; +import tech.pegasys.teku.infrastructure.ssz.schema.collections.SszUInt64ListSchema; +import tech.pegasys.teku.infrastructure.ssz.tree.TreeNode; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; +import tech.pegasys.teku.spec.config.SpecConfigFulu; + +public class DataColumnsByRootIdentifierSchema + extends ContainerSchema2 { + + public DataColumnsByRootIdentifierSchema(final SpecConfigFulu specConfig) { + super( + "DataColumnIdentifier", + namedSchema("block_root", SszPrimitiveSchemas.BYTES32_SCHEMA), + namedSchema("columns", SszUInt64ListSchema.create(specConfig.getNumberOfColumns()))); + } + + public DataColumnsByRootIdentifier create(final Bytes32 root, final UInt64 index) { + return create(root, List.of(index)); + } + + public DataColumnsByRootIdentifier create(final Bytes32 root, final List indices) { + return new DataColumnsByRootIdentifier(root, indices, this); + } + + @Override + public DataColumnsByRootIdentifier createFromBackingNode(final TreeNode node) { + return new DataColumnsByRootIdentifier(node, this); + } + + public SszUInt64ListSchema getColumnsSchema() { + return (SszUInt64ListSchema) getFieldSchema1(); + } +} diff --git a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/util/DataColumnIdentifier.java b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/util/DataColumnIdentifier.java new file mode 100644 index 00000000000..790188a7ec5 --- /dev/null +++ b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/util/DataColumnIdentifier.java @@ -0,0 +1,24 @@ +/* + * Copyright Consensys Software Inc., 2025 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.spec.datastructures.util; + +import org.apache.tuweni.bytes.Bytes32; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; +import tech.pegasys.teku.spec.datastructures.blobs.versions.fulu.DataColumnSidecar; + +public record DataColumnIdentifier(Bytes32 blockRoot, UInt64 columnId) { + public static DataColumnIdentifier createFromSidecar(final DataColumnSidecar sidecar) { + return new DataColumnIdentifier(sidecar.getBlockRoot(), sidecar.getIndex()); + } +} diff --git a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/util/DataColumnSlotAndIdentifier.java b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/util/DataColumnSlotAndIdentifier.java index 53300d6f2aa..19be52a9d36 100644 --- a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/util/DataColumnSlotAndIdentifier.java +++ b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/util/DataColumnSlotAndIdentifier.java @@ -19,14 +19,13 @@ import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.spec.datastructures.blobs.versions.fulu.DataColumnSidecar; import tech.pegasys.teku.spec.datastructures.blocks.SlotAndBlockRoot; -import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.DataColumnIdentifier; public record DataColumnSlotAndIdentifier(UInt64 slot, Bytes32 blockRoot, UInt64 columnIndex) implements Comparable { public DataColumnSlotAndIdentifier( final UInt64 slot, final DataColumnIdentifier dataColumnIdentifier) { - this(slot, dataColumnIdentifier.getBlockRoot(), dataColumnIdentifier.getIndex()); + this(slot, dataColumnIdentifier.blockRoot(), dataColumnIdentifier.columnId()); } public static DataColumnSlotAndIdentifier fromDataColumn( diff --git a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/schemas/SchemaDefinitionsFulu.java b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/schemas/SchemaDefinitionsFulu.java index 4692e547714..982873331ff 100644 --- a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/schemas/SchemaDefinitionsFulu.java +++ b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/schemas/SchemaDefinitionsFulu.java @@ -15,6 +15,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static tech.pegasys.teku.spec.schemas.registry.SchemaTypes.CELL_SCHEMA; +import static tech.pegasys.teku.spec.schemas.registry.SchemaTypes.DATA_COLUMN_BY_ROOT_IDENTIFIER_SCHEMA; import static tech.pegasys.teku.spec.schemas.registry.SchemaTypes.DATA_COLUMN_SCHEMA; import static tech.pegasys.teku.spec.schemas.registry.SchemaTypes.DATA_COLUMN_SIDECARS_BY_RANGE_REQUEST_MESSAGE_SCHEMA; import static tech.pegasys.teku.spec.schemas.registry.SchemaTypes.DATA_COLUMN_SIDECARS_BY_ROOT_REQUEST_MESSAGE_SCHEMA; @@ -33,6 +34,7 @@ import tech.pegasys.teku.spec.datastructures.builder.versions.fulu.ExecutionPayloadAndBlobsCellBundleSchema; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.DataColumnSidecarsByRangeRequestMessage; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.DataColumnSidecarsByRootRequestMessageSchema; +import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.DataColumnsByRootIdentifierSchema; import tech.pegasys.teku.spec.schemas.registry.SchemaRegistry; public class SchemaDefinitionsFulu extends SchemaDefinitionsElectra { @@ -40,6 +42,7 @@ public class SchemaDefinitionsFulu extends SchemaDefinitionsElectra { private final CellSchema cellSchema; private final DataColumnSchema dataColumnSchema; private final DataColumnSidecarSchema dataColumnSidecarSchema; + private final DataColumnsByRootIdentifierSchema dataColumnsByRootIdentifierSchema; private final MatrixEntrySchema matrixEntrySchema; private final ExecutionPayloadAndBlobsCellBundleSchema executionPayloadAndBlobsCellBundleSchema; @@ -54,6 +57,8 @@ public SchemaDefinitionsFulu(final SchemaRegistry schemaRegistry) { this.cellSchema = schemaRegistry.get(CELL_SCHEMA); this.dataColumnSchema = schemaRegistry.get(DATA_COLUMN_SCHEMA); this.dataColumnSidecarSchema = schemaRegistry.get(DATA_COLUMN_SIDECAR_SCHEMA); + this.dataColumnsByRootIdentifierSchema = + schemaRegistry.get(DATA_COLUMN_BY_ROOT_IDENTIFIER_SCHEMA); this.matrixEntrySchema = schemaRegistry.get(MATRIX_ENTRY_SCHEMA); this.executionPayloadAndBlobsCellBundleSchema = schemaRegistry.get(EXECUTION_PAYLOAD_AND_BLOBS_CELL_BUNDLE_SCHEMA); @@ -91,6 +96,10 @@ public DataColumnSidecarSchema getDataColumnSidecarSchema() { return dataColumnSidecarSchema; } + public DataColumnsByRootIdentifierSchema getDataColumnsByRootIdentifierSchema() { + return dataColumnsByRootIdentifierSchema; + } + public MatrixEntrySchema getMatrixEntrySchema() { return matrixEntrySchema; } diff --git a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/schemas/registry/SchemaRegistryBuilder.java b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/schemas/registry/SchemaRegistryBuilder.java index 2136a03891b..da0c39f4340 100644 --- a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/schemas/registry/SchemaRegistryBuilder.java +++ b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/schemas/registry/SchemaRegistryBuilder.java @@ -43,6 +43,7 @@ import static tech.pegasys.teku.spec.schemas.registry.SchemaTypes.BUILDER_BID_SCHEMA; import static tech.pegasys.teku.spec.schemas.registry.SchemaTypes.CELL_SCHEMA; import static tech.pegasys.teku.spec.schemas.registry.SchemaTypes.CONSOLIDATION_REQUEST_SCHEMA; +import static tech.pegasys.teku.spec.schemas.registry.SchemaTypes.DATA_COLUMN_BY_ROOT_IDENTIFIER_SCHEMA; import static tech.pegasys.teku.spec.schemas.registry.SchemaTypes.DATA_COLUMN_SCHEMA; import static tech.pegasys.teku.spec.schemas.registry.SchemaTypes.DATA_COLUMN_SIDECARS_BY_RANGE_REQUEST_MESSAGE_SCHEMA; import static tech.pegasys.teku.spec.schemas.registry.SchemaTypes.DATA_COLUMN_SIDECARS_BY_ROOT_REQUEST_MESSAGE_SCHEMA; @@ -132,6 +133,7 @@ import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobSidecarsByRootRequestMessageSchema; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.DataColumnSidecarsByRangeRequestMessage; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.DataColumnSidecarsByRootRequestMessageSchema; +import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.DataColumnsByRootIdentifierSchema; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.metadata.versions.altair.MetadataMessageSchemaAltair; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.metadata.versions.fulu.MetadataMessageSchemaFulu; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.metadata.versions.phase0.MetadataMessageSchemaPhase0; @@ -221,6 +223,7 @@ public static SchemaRegistryBuilder create() { .addProvider(createCellSchemaProvider()) .addProvider(createDataColumnSchemaProvider()) .addProvider(createDataColumnSidecarSchemaProvider()) + .addProvider(createDataColumnsByRootIdentifierSchemaProvider()) .addProvider(createMatrixEntrySchemaProvider()) .addProvider(createDataColumnSidecarsByRootRequestMessageSchemaProvider()) .addProvider(createDataColumnSidecarsByRangeRequestMessageSchemaProvider()) @@ -751,6 +754,15 @@ private static SchemaProvider createDataColumnSidecarSchemaProvider() { .build(); } + private static SchemaProvider createDataColumnsByRootIdentifierSchemaProvider() { + return providerBuilder(DATA_COLUMN_BY_ROOT_IDENTIFIER_SCHEMA) + .withCreator( + FULU, + (registry, specConfig, schemaName) -> + new DataColumnsByRootIdentifierSchema(SpecConfigFulu.required(specConfig))) + .build(); + } + private static SchemaProvider createMatrixEntrySchemaProvider() { return providerBuilder(MATRIX_ENTRY_SCHEMA) .withCreator( @@ -766,7 +778,8 @@ private static SchemaProvider createDataColumnSidecarsByRootRequestMessageSch FULU, (registry, specConfig, schemaName) -> new DataColumnSidecarsByRootRequestMessageSchema( - SpecConfigFulu.required(specConfig))) + SpecConfigFulu.required(specConfig), + registry.get(DATA_COLUMN_BY_ROOT_IDENTIFIER_SCHEMA))) .build(); } diff --git a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/schemas/registry/SchemaTypes.java b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/schemas/registry/SchemaTypes.java index b8827e7b5ef..3b259702a70 100644 --- a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/schemas/registry/SchemaTypes.java +++ b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/schemas/registry/SchemaTypes.java @@ -58,6 +58,7 @@ import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobSidecarsByRootRequestMessageSchema; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.DataColumnSidecarsByRangeRequestMessage; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.DataColumnSidecarsByRootRequestMessageSchema; +import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.DataColumnsByRootIdentifierSchema; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.metadata.MetadataMessageSchema; import tech.pegasys.teku.spec.datastructures.operations.AggregateAndProof.AggregateAndProofSchema; import tech.pegasys.teku.spec.datastructures.operations.Attestation; @@ -191,6 +192,8 @@ public class SchemaTypes { public static final SchemaId DATA_COLUMN_SCHEMA = create("DATA_COLUMN_SCHEMA"); public static final SchemaId DATA_COLUMN_SIDECAR_SCHEMA = create("DATA_COLUMN_SIDECAR_SCHEMA"); + public static final SchemaId + DATA_COLUMN_BY_ROOT_IDENTIFIER_SCHEMA = create("DATA_COLUMN_BY_ROOT_IDENTIFIER_SCHEMA"); public static final SchemaId MATRIX_ENTRY_SCHEMA = create("MATRIX_ENTRY_SCHEMA"); public static final SchemaId diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/DataColumnSidecarByRootCustody.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/DataColumnSidecarByRootCustody.java index 64d35b8543d..434fe01c389 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/DataColumnSidecarByRootCustody.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/DataColumnSidecarByRootCustody.java @@ -17,7 +17,7 @@ import tech.pegasys.teku.infrastructure.async.SafeFuture; import tech.pegasys.teku.infrastructure.async.stream.AsyncStream; import tech.pegasys.teku.spec.datastructures.blobs.versions.fulu.DataColumnSidecar; -import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.DataColumnIdentifier; +import tech.pegasys.teku.spec.datastructures.util.DataColumnIdentifier; import tech.pegasys.teku.spec.datastructures.util.DataColumnSlotAndIdentifier; public interface DataColumnSidecarByRootCustody extends DataColumnSidecarCustody { diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/DataColumnSidecarByRootCustodyImpl.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/DataColumnSidecarByRootCustodyImpl.java index 794753ae405..5cb75103319 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/DataColumnSidecarByRootCustodyImpl.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/DataColumnSidecarByRootCustodyImpl.java @@ -26,7 +26,7 @@ import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.spec.datastructures.blobs.versions.fulu.DataColumnSidecar; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; -import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.DataColumnIdentifier; +import tech.pegasys.teku.spec.datastructures.util.DataColumnIdentifier; import tech.pegasys.teku.spec.datastructures.util.DataColumnSlotAndIdentifier; import tech.pegasys.teku.storage.client.CombinedChainDataClient; @@ -99,7 +99,7 @@ private static class ColumnSlotCache { public synchronized Optional get( final DataColumnIdentifier dataColumnIdentifier) { - return Optional.ofNullable(blockRootToSlot.get(dataColumnIdentifier.getBlockRoot())) + return Optional.ofNullable(blockRootToSlot.get(dataColumnIdentifier.blockRoot())) .map(slot -> new DataColumnSlotAndIdentifier(slot, dataColumnIdentifier)); } @@ -111,13 +111,12 @@ public SafeFuture> getOrComputeAsync( .orElseGet( () -> asyncRootToSlotSupplier - .apply(dataColumnIdentifier.getBlockRoot()) + .apply(dataColumnIdentifier.blockRoot()) .thenPeek( maybeSlot -> maybeSlot.ifPresent( slot -> - addBlockRootToSlot( - dataColumnIdentifier.getBlockRoot(), slot))) + addBlockRootToSlot(dataColumnIdentifier.blockRoot(), slot))) .thenApply( maybeSlot -> maybeSlot.map( diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/DataColumnSidecarRecoveringCustody.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/DataColumnSidecarRecoveringCustody.java index 60513731681..86297f05c27 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/DataColumnSidecarRecoveringCustody.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/DataColumnSidecarRecoveringCustody.java @@ -20,7 +20,7 @@ import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.spec.datastructures.blobs.versions.fulu.DataColumnSidecar; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; -import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.DataColumnIdentifier; +import tech.pegasys.teku.spec.datastructures.util.DataColumnIdentifier; import tech.pegasys.teku.spec.datastructures.util.DataColumnSlotAndIdentifier; import tech.pegasys.teku.statetransition.blobs.RemoteOrigin; diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/DataColumnSidecarRecoveringCustodyImpl.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/DataColumnSidecarRecoveringCustodyImpl.java index 76fd0493227..40916ba934b 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/DataColumnSidecarRecoveringCustodyImpl.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/DataColumnSidecarRecoveringCustodyImpl.java @@ -44,7 +44,7 @@ import tech.pegasys.teku.spec.datastructures.blocks.BeaconBlock; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; import tech.pegasys.teku.spec.datastructures.blocks.SlotAndBlockRoot; -import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.DataColumnIdentifier; +import tech.pegasys.teku.spec.datastructures.util.DataColumnIdentifier; import tech.pegasys.teku.spec.datastructures.util.DataColumnSlotAndIdentifier; import tech.pegasys.teku.spec.logic.versions.fulu.helpers.MiscHelpersFulu; import tech.pegasys.teku.statetransition.blobs.RemoteOrigin; diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/LateInitDataColumnSidecarCustody.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/LateInitDataColumnSidecarCustody.java index 15e09562c11..dfd85977d35 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/LateInitDataColumnSidecarCustody.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/LateInitDataColumnSidecarCustody.java @@ -19,7 +19,7 @@ import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.spec.datastructures.blobs.versions.fulu.DataColumnSidecar; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; -import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.DataColumnIdentifier; +import tech.pegasys.teku.spec.datastructures.util.DataColumnIdentifier; import tech.pegasys.teku.spec.datastructures.util.DataColumnSlotAndIdentifier; import tech.pegasys.teku.statetransition.blobs.RemoteOrigin; diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/log/rpc/DasByRootResponseLogger.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/log/rpc/DasByRootResponseLogger.java index dd729749913..8d6040130d9 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/log/rpc/DasByRootResponseLogger.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/log/rpc/DasByRootResponseLogger.java @@ -20,16 +20,16 @@ import org.apache.tuweni.bytes.Bytes32; import tech.pegasys.teku.infrastructure.time.TimeProvider; import tech.pegasys.teku.infrastructure.unsigned.UInt64; -import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.DataColumnIdentifier; +import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.DataColumnsByRootIdentifier; import tech.pegasys.teku.spec.datastructures.util.DataColumnSlotAndIdentifier; -class DasByRootResponseLogger extends AbstractDasResponseLogger> { +class DasByRootResponseLogger extends AbstractDasResponseLogger> { public DasByRootResponseLogger( final TimeProvider timeProvider, final Direction direction, final LoggingPeerId peerId, - final List dataColumnIdentifiers) { + final List dataColumnIdentifiers) { super(timeProvider, direction, peerId, dataColumnIdentifiers); } @@ -71,12 +71,15 @@ protected String requestToString(final List respons (s1, s2) -> s1)); final List idsWithMaybeSlot = request.stream() - .map( + .flatMap( it -> - new DataColumnSlotAndIdentifier( - blockRootToSlot.getOrDefault(it.getBlockRoot(), UNKNOWN_SLOT), - it.getBlockRoot(), - it.getIndex())) + it.getColumns().stream() + .map( + column -> + new DataColumnSlotAndIdentifier( + blockRootToSlot.getOrDefault(it.getBlockRoot(), UNKNOWN_SLOT), + it.getBlockRoot(), + column))) .toList(); return columnIdsToString(idsWithMaybeSlot); diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/log/rpc/DasReqRespLogger.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/log/rpc/DasReqRespLogger.java index 3850525fcf0..57494eb3380 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/log/rpc/DasReqRespLogger.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/log/rpc/DasReqRespLogger.java @@ -17,7 +17,7 @@ import tech.pegasys.teku.infrastructure.time.TimeProvider; import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.spec.datastructures.blobs.versions.fulu.DataColumnSidecar; -import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.DataColumnIdentifier; +import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.DataColumnsByRootIdentifier; public interface DasReqRespLogger { @@ -30,7 +30,7 @@ static DasReqRespLogger create(final TimeProvider timeProvider) { DasReqRespLogger NOOP = new DasReqRespLogger() { @Override - public ReqRespMethodLogger, DataColumnSidecar> + public ReqRespMethodLogger, DataColumnSidecar> getDataColumnSidecarsByRootLogger() { return new NoopReqRespMethodLogger<>(); } @@ -42,7 +42,7 @@ static DasReqRespLogger create(final TimeProvider timeProvider) { } }; - ReqRespMethodLogger, DataColumnSidecar> + ReqRespMethodLogger, DataColumnSidecar> getDataColumnSidecarsByRootLogger(); ReqRespMethodLogger getDataColumnSidecarsByRangeLogger(); diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/log/rpc/DasReqRespLoggerImpl.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/log/rpc/DasReqRespLoggerImpl.java index 11584797974..7a038a2ec14 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/log/rpc/DasReqRespLoggerImpl.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/log/rpc/DasReqRespLoggerImpl.java @@ -16,25 +16,25 @@ import java.util.List; import tech.pegasys.teku.infrastructure.time.TimeProvider; import tech.pegasys.teku.spec.datastructures.blobs.versions.fulu.DataColumnSidecar; -import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.DataColumnIdentifier; +import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.DataColumnsByRootIdentifier; class DasReqRespLoggerImpl implements DasReqRespLogger { private final TimeProvider timeProvider; - private final ReqRespMethodLogger, DataColumnSidecar> + private final ReqRespMethodLogger, DataColumnSidecar> byRootMethodLogger = new ReqRespMethodLogger<>() { @Override public ReqRespResponseLogger onInboundRequest( - final LoggingPeerId fromPeer, final List request) { + final LoggingPeerId fromPeer, final List request) { return new DasByRootResponseLogger( timeProvider, AbstractResponseLogger.Direction.INBOUND, fromPeer, request); } @Override public ReqRespResponseLogger onOutboundRequest( - final LoggingPeerId toPeer, final List request) { + final LoggingPeerId toPeer, final List request) { return new DasByRootResponseLogger( timeProvider, AbstractResponseLogger.Direction.OUTBOUND, toPeer, request); } @@ -62,7 +62,7 @@ public DasReqRespLoggerImpl(final TimeProvider timeProvider) { } @Override - public ReqRespMethodLogger, DataColumnSidecar> + public ReqRespMethodLogger, DataColumnSidecar> getDataColumnSidecarsByRootLogger() { return byRootMethodLogger; } diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/log/rpc/LoggingBatchDataColumnsByRootReqResp.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/log/rpc/LoggingBatchDataColumnsByRootReqResp.java index 81efa91a66e..9990cb48c93 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/log/rpc/LoggingBatchDataColumnsByRootReqResp.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/log/rpc/LoggingBatchDataColumnsByRootReqResp.java @@ -17,7 +17,7 @@ import org.apache.tuweni.units.bigints.UInt256; import tech.pegasys.teku.infrastructure.async.stream.AsyncStream; import tech.pegasys.teku.spec.datastructures.blobs.versions.fulu.DataColumnSidecar; -import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.DataColumnIdentifier; +import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.DataColumnsByRootIdentifier; import tech.pegasys.teku.statetransition.datacolumns.retriever.BatchDataColumnsByRootReqResp; public class LoggingBatchDataColumnsByRootReqResp implements BatchDataColumnsByRootReqResp { @@ -32,7 +32,7 @@ public LoggingBatchDataColumnsByRootReqResp( @Override public AsyncStream requestDataColumnSidecarsByRoot( - final UInt256 nodeId, final List columnIdentifiers) { + final UInt256 nodeId, final List columnIdentifiers) { final ReqRespResponseLogger responseLogger = logger .getDataColumnSidecarsByRootLogger() diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/retriever/BatchDataColumnsByRootReqResp.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/retriever/BatchDataColumnsByRootReqResp.java index 5003a33d30e..b47d6305ebd 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/retriever/BatchDataColumnsByRootReqResp.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/retriever/BatchDataColumnsByRootReqResp.java @@ -17,12 +17,12 @@ import org.apache.tuweni.units.bigints.UInt256; import tech.pegasys.teku.infrastructure.async.stream.AsyncStream; import tech.pegasys.teku.spec.datastructures.blobs.versions.fulu.DataColumnSidecar; -import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.DataColumnIdentifier; +import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.DataColumnsByRootIdentifier; public interface BatchDataColumnsByRootReqResp { AsyncStream requestDataColumnSidecarsByRoot( - UInt256 nodeId, List columnIdentifiers); + UInt256 nodeId, List byRootIdentifiers); int getCurrentRequestLimit(UInt256 nodeId); } diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/retriever/DataColumnReqResp.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/retriever/DataColumnReqResp.java index b6f48c9fe3f..81611c8628d 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/retriever/DataColumnReqResp.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/retriever/DataColumnReqResp.java @@ -16,7 +16,7 @@ import org.apache.tuweni.units.bigints.UInt256; import tech.pegasys.teku.infrastructure.async.SafeFuture; import tech.pegasys.teku.spec.datastructures.blobs.versions.fulu.DataColumnSidecar; -import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.DataColumnIdentifier; +import tech.pegasys.teku.spec.datastructures.util.DataColumnIdentifier; public interface DataColumnReqResp { diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/retriever/DataColumnReqRespBatchingImpl.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/retriever/DataColumnReqRespBatchingImpl.java index faffd0cf1b6..1856ceb9fb0 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/retriever/DataColumnReqRespBatchingImpl.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/retriever/DataColumnReqRespBatchingImpl.java @@ -20,18 +20,26 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; +import org.apache.tuweni.bytes.Bytes32; import org.apache.tuweni.units.bigints.UInt256; import tech.pegasys.teku.infrastructure.async.SafeFuture; import tech.pegasys.teku.infrastructure.async.stream.AsyncStream; import tech.pegasys.teku.infrastructure.async.stream.AsyncStreamHandler; import tech.pegasys.teku.spec.datastructures.blobs.versions.fulu.DataColumnSidecar; -import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.DataColumnIdentifier; +import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.DataColumnsByRootIdentifier; +import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.DataColumnsByRootIdentifierSchema; +import tech.pegasys.teku.spec.datastructures.util.DataColumnIdentifier; +import tech.pegasys.teku.spec.schemas.SchemaDefinitionsFulu; public class DataColumnReqRespBatchingImpl implements DataColumnReqResp { private final BatchDataColumnsByRootReqResp batchRpc; + private final DataColumnsByRootIdentifierSchema byRootIdentifierSchema; - public DataColumnReqRespBatchingImpl(final BatchDataColumnsByRootReqResp batchRpc) { + public DataColumnReqRespBatchingImpl( + final BatchDataColumnsByRootReqResp batchRpc, + final SchemaDefinitionsFulu schemaDefinitionsFulu) { this.batchRpc = batchRpc; + this.byRootIdentifierSchema = schemaDefinitionsFulu.getDataColumnsByRootIdentifierSchema(); } private record RequestEntry( @@ -63,9 +71,20 @@ public void flush() { } private void flushForNode(final UInt256 nodeId, final List nodeRequests) { - AsyncStream response = - batchRpc.requestDataColumnSidecarsByRoot( - nodeId, nodeRequests.stream().map(e -> e.columnIdentifier).toList()); + final Map> byRootMap = + nodeRequests.stream() + .map(e -> e.columnIdentifier) + .collect(Collectors.groupingBy(DataColumnIdentifier::blockRoot)); + final List dataColumnsByRootIdentifiers = + byRootMap.entrySet().stream() + .map( + entry -> + byRootIdentifierSchema.create( + entry.getKey(), + entry.getValue().stream().map(DataColumnIdentifier::columnId).toList())) + .toList(); + final AsyncStream response = + batchRpc.requestDataColumnSidecarsByRoot(nodeId, dataColumnsByRootIdentifiers); response.consume( new AsyncStreamHandler<>() { @@ -76,13 +95,13 @@ private void flushForNode(final UInt256 nodeId, final List nodeReq @Override public SafeFuture onNext(final DataColumnSidecar dataColumnSidecar) { - final DataColumnIdentifier colId = + final DataColumnIdentifier dataColumnIdentifier = DataColumnIdentifier.createFromSidecar(dataColumnSidecar); - final RequestEntry request = requestsNyColumnId.get(colId); + final RequestEntry request = requestsNyColumnId.get(dataColumnIdentifier); if (request == null) { return SafeFuture.failedFuture( new IllegalArgumentException( - "Responded data column was not requested: " + colId)); + "Responded data column was not requested: " + dataColumnIdentifier)); } else { request.promise().complete(dataColumnSidecar); count.incrementAndGet(); diff --git a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/datacolumns/retriever/DataColumnReqRespBatchingImplTest.java b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/datacolumns/retriever/DataColumnReqRespBatchingImplTest.java new file mode 100644 index 00000000000..eacceba87db --- /dev/null +++ b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/datacolumns/retriever/DataColumnReqRespBatchingImplTest.java @@ -0,0 +1,124 @@ +/* + * Copyright Consensys Software Inc., 2025 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.statetransition.datacolumns.retriever; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static tech.pegasys.teku.infrastructure.unsigned.UInt64.ONE; +import static tech.pegasys.teku.infrastructure.unsigned.UInt64.ZERO; + +import java.util.List; +import org.apache.tuweni.bytes.Bytes32; +import org.apache.tuweni.units.bigints.UInt256; +import org.junit.jupiter.api.Test; +import tech.pegasys.teku.infrastructure.async.SafeFuture; +import tech.pegasys.teku.infrastructure.async.stream.AsyncStream; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; +import tech.pegasys.teku.spec.Spec; +import tech.pegasys.teku.spec.SpecMilestone; +import tech.pegasys.teku.spec.TestSpecFactory; +import tech.pegasys.teku.spec.datastructures.blobs.versions.fulu.DataColumnSidecar; +import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlockHeader; +import tech.pegasys.teku.spec.datastructures.util.DataColumnIdentifier; +import tech.pegasys.teku.spec.schemas.SchemaDefinitionsFulu; +import tech.pegasys.teku.spec.util.DataStructureUtil; + +public class DataColumnReqRespBatchingImplTest { + final Spec spec = TestSpecFactory.createMinimalFulu(); + final DataStructureUtil dataStructureUtil = new DataStructureUtil(spec); + final BatchDataColumnsByRootReqResp batchRpc = mock(BatchDataColumnsByRootReqResp.class); + final SchemaDefinitionsFulu schemaDefinitionsFulu = + SchemaDefinitionsFulu.required(spec.forMilestone(SpecMilestone.FULU).getSchemaDefinitions()); + final DataColumnReqRespBatchingImpl dataColumnReqResp = + new DataColumnReqRespBatchingImpl(batchRpc, schemaDefinitionsFulu); + + @Test + @SuppressWarnings("JavaCase") + public void sanityCheck() { + final SignedBeaconBlockHeader blockHeader1 = dataStructureUtil.randomSignedBeaconBlockHeader(); + final SignedBeaconBlockHeader blockHeader2 = dataStructureUtil.randomSignedBeaconBlockHeader(); + final Bytes32 blockRoot1 = blockHeader1.getMessage().getRoot(); + final Bytes32 blockRoot2 = blockHeader2.getMessage().getRoot(); + final DataColumnSidecar sidecar1_0 = + dataStructureUtil.randomDataColumnSidecar(blockHeader1, ZERO); + final DataColumnSidecar sidecar1_1 = + dataStructureUtil.randomDataColumnSidecar(blockHeader1, ONE); + final DataColumnSidecar sidecar1_3 = + dataStructureUtil.randomDataColumnSidecar(blockHeader1, UInt64.valueOf(3)); + final DataColumnSidecar sidecar2_0 = + dataStructureUtil.randomDataColumnSidecar(blockHeader2, ZERO); + when(batchRpc.requestDataColumnSidecarsByRoot( + UInt256.ZERO, + List.of( + schemaDefinitionsFulu + .getDataColumnsByRootIdentifierSchema() + .create(blockRoot1, List.of(ZERO, ONE))))) + .thenReturn(AsyncStream.of(sidecar1_0, sidecar1_1)); + when(batchRpc.requestDataColumnSidecarsByRoot( + UInt256.ONE, + List.of( + schemaDefinitionsFulu + .getDataColumnsByRootIdentifierSchema() + .create(blockRoot1, List.of(UInt64.valueOf(3)))))) + .thenReturn(AsyncStream.of(sidecar1_3)); + when(batchRpc.requestDataColumnSidecarsByRoot( + UInt256.valueOf(2), + List.of( + schemaDefinitionsFulu + .getDataColumnsByRootIdentifierSchema() + .create(blockRoot2, List.of(ZERO))))) + .thenReturn(AsyncStream.of(sidecar2_0)); + final SafeFuture dataColumnSidecar1_0_Future = + dataColumnReqResp.requestDataColumnSidecar( + UInt256.ZERO, new DataColumnIdentifier(blockRoot1, ZERO)); + final SafeFuture dataColumnSidecar1_1_Future = + dataColumnReqResp.requestDataColumnSidecar( + UInt256.ZERO, new DataColumnIdentifier(blockRoot1, ONE)); + final SafeFuture dataColumnSidecar1_3_Future = + dataColumnReqResp.requestDataColumnSidecar( + UInt256.ONE, new DataColumnIdentifier(blockRoot1, UInt64.valueOf(3))); + final SafeFuture dataColumnSidecar2_0_Future = + dataColumnReqResp.requestDataColumnSidecar( + UInt256.valueOf(2), new DataColumnIdentifier(blockRoot2, ZERO)); + dataColumnReqResp.flush(); + + assertThat(dataColumnSidecar1_0_Future).isCompletedWithValue(sidecar1_0); + assertThat(dataColumnSidecar1_1_Future).isCompletedWithValue(sidecar1_1); + assertThat(dataColumnSidecar1_3_Future).isCompletedWithValue(sidecar1_3); + assertThat(dataColumnSidecar2_0_Future).isCompletedWithValue(sidecar2_0); + verify(batchRpc) + .requestDataColumnSidecarsByRoot( + UInt256.ZERO, + List.of( + schemaDefinitionsFulu + .getDataColumnsByRootIdentifierSchema() + .create(blockRoot1, List.of(ZERO, ONE)))); + verify(batchRpc) + .requestDataColumnSidecarsByRoot( + UInt256.ONE, + List.of( + schemaDefinitionsFulu + .getDataColumnsByRootIdentifierSchema() + .create(blockRoot1, List.of(UInt64.valueOf(3))))); + verify(batchRpc) + .requestDataColumnSidecarsByRoot( + UInt256.valueOf(2), + List.of( + schemaDefinitionsFulu + .getDataColumnsByRootIdentifierSchema() + .create(blockRoot2, List.of(ZERO)))); + } +} diff --git a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/datacolumns/retriever/SimpleSidecarRetrieverTest.java b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/datacolumns/retriever/SimpleSidecarRetrieverTest.java index c7690a85f10..d5f483e34d5 100644 --- a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/datacolumns/retriever/SimpleSidecarRetrieverTest.java +++ b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/datacolumns/retriever/SimpleSidecarRetrieverTest.java @@ -41,6 +41,7 @@ import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; import tech.pegasys.teku.spec.datastructures.util.DataColumnSlotAndIdentifier; import tech.pegasys.teku.spec.logic.versions.fulu.helpers.MiscHelpersFulu; +import tech.pegasys.teku.spec.schemas.SchemaDefinitionsFulu; import tech.pegasys.teku.spec.util.DataStructureUtil; import tech.pegasys.teku.statetransition.datacolumns.CanonicalBlockResolverStub; @@ -49,9 +50,11 @@ public class SimpleSidecarRetrieverTest { final StubTimeProvider stubTimeProvider = StubTimeProvider.withTimeInSeconds(0); final StubAsyncRunner stubAsyncRunner = new StubAsyncRunner(stubTimeProvider); final DataColumnPeerSearcherStub dataColumnPeerSearcherStub = new DataColumnPeerSearcherStub(); - final TestPeerManager testPeerManager = new TestPeerManager(); final Spec spec = TestSpecFactory.createMinimalFulu(); + final SchemaDefinitionsFulu schemaDefinitionsFulu = + SchemaDefinitionsFulu.required(spec.forMilestone(SpecMilestone.FULU).getSchemaDefinitions()); + final TestPeerManager testPeerManager = new TestPeerManager(); final SpecConfigFulu config = SpecConfigFulu.required(spec.forMilestone(SpecMilestone.FULU).getConfig()); final MiscHelpersFulu miscHelpers = diff --git a/ethereum/statetransition/src/testFixtures/java/tech/pegasys/teku/statetransition/datacolumns/retriever/TestPeer.java b/ethereum/statetransition/src/testFixtures/java/tech/pegasys/teku/statetransition/datacolumns/retriever/TestPeer.java index 3520c36c273..23e957c298c 100644 --- a/ethereum/statetransition/src/testFixtures/java/tech/pegasys/teku/statetransition/datacolumns/retriever/TestPeer.java +++ b/ethereum/statetransition/src/testFixtures/java/tech/pegasys/teku/statetransition/datacolumns/retriever/TestPeer.java @@ -22,7 +22,7 @@ import tech.pegasys.teku.infrastructure.async.AsyncRunner; import tech.pegasys.teku.infrastructure.async.SafeFuture; import tech.pegasys.teku.spec.datastructures.blobs.versions.fulu.DataColumnSidecar; -import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.DataColumnIdentifier; +import tech.pegasys.teku.spec.datastructures.util.DataColumnIdentifier; public class TestPeer { diff --git a/ethereum/statetransition/src/testFixtures/java/tech/pegasys/teku/statetransition/datacolumns/retriever/TestPeerManager.java b/ethereum/statetransition/src/testFixtures/java/tech/pegasys/teku/statetransition/datacolumns/retriever/TestPeerManager.java index 0aaca3dffa0..b380316faa9 100644 --- a/ethereum/statetransition/src/testFixtures/java/tech/pegasys/teku/statetransition/datacolumns/retriever/TestPeerManager.java +++ b/ethereum/statetransition/src/testFixtures/java/tech/pegasys/teku/statetransition/datacolumns/retriever/TestPeerManager.java @@ -18,7 +18,7 @@ import org.apache.tuweni.units.bigints.UInt256; import tech.pegasys.teku.infrastructure.async.SafeFuture; import tech.pegasys.teku.spec.datastructures.blobs.versions.fulu.DataColumnSidecar; -import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.DataColumnIdentifier; +import tech.pegasys.teku.spec.datastructures.util.DataColumnIdentifier; public class TestPeerManager implements DataColumnPeerManager, DataColumnReqResp { private final DataColumnPeerManagerStub dataColumnPeerManagerStub = diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/Eth2P2PNetworkBuilder.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/Eth2P2PNetworkBuilder.java index b0b3cde5d44..107eb485d8b 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/Eth2P2PNetworkBuilder.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/Eth2P2PNetworkBuilder.java @@ -89,6 +89,7 @@ import tech.pegasys.teku.spec.datastructures.state.Checkpoint; import tech.pegasys.teku.spec.datastructures.util.ForkAndSpecMilestone; import tech.pegasys.teku.spec.schemas.SchemaDefinitionsSupplier; +import tech.pegasys.teku.statetransition.datacolumns.CustodyGroupCountManager; import tech.pegasys.teku.statetransition.datacolumns.DataColumnSidecarByRootCustody; import tech.pegasys.teku.statetransition.datacolumns.log.gossip.DasGossipLogger; import tech.pegasys.teku.statetransition.datacolumns.log.rpc.DasReqRespLogger; @@ -110,6 +111,7 @@ public class Eth2P2PNetworkBuilder { protected EventChannels eventChannels; protected CombinedChainDataClient combinedChainDataClient; protected DataColumnSidecarByRootCustody dataColumnSidecarCustody; + protected CustodyGroupCountManager custodyGroupCountManager; protected MetadataMessagesFactory metadataMessagesFactory; protected OperationProcessor gossipedBlockProcessor; protected OperationProcessor gossipedBlobSidecarProcessor; @@ -177,6 +179,7 @@ public Eth2P2PNetwork build() { asyncRunner, combinedChainDataClient, dataColumnSidecarCustody, + custodyGroupCountManager, metadataMessagesFactory, metricsSystem, attestationSubnetService, @@ -489,6 +492,7 @@ private void validate() { assertNotNull("metricsSystem", metricsSystem); assertNotNull("combinedChainDataClient", combinedChainDataClient); assertNotNull("dataColumnSidecarCustody", dataColumnSidecarCustody); + assertNotNull("custodyGroupCountManager", custodyGroupCountManager); assertNotNull("metadataMessagesFactory", metadataMessagesFactory); assertNotNull("keyValueStore", keyValueStore); assertNotNull("timeProvider", timeProvider); @@ -536,6 +540,13 @@ public Eth2P2PNetworkBuilder dataColumnSidecarCustody( return this; } + public Eth2P2PNetworkBuilder custodyGroupCountManager( + final CustodyGroupCountManager custodyGroupCountManager) { + checkNotNull(custodyGroupCountManager); + this.custodyGroupCountManager = custodyGroupCountManager; + return this; + } + public Eth2P2PNetworkBuilder metadataMessagesFactory( final MetadataMessagesFactory metadataMessagesFactory) { checkNotNull(metadataMessagesFactory); diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/DataColumnPeerManagerImpl.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/DataColumnPeerManagerImpl.java index b8e0b85b126..344fcac7932 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/DataColumnPeerManagerImpl.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/DataColumnPeerManagerImpl.java @@ -23,7 +23,7 @@ import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.networking.p2p.peer.PeerConnectedSubscriber; import tech.pegasys.teku.spec.datastructures.blobs.versions.fulu.DataColumnSidecar; -import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.DataColumnIdentifier; +import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.DataColumnsByRootIdentifier; import tech.pegasys.teku.statetransition.datacolumns.retriever.BatchDataColumnsByRangeReqResp; import tech.pegasys.teku.statetransition.datacolumns.retriever.BatchDataColumnsByRootReqResp; import tech.pegasys.teku.statetransition.datacolumns.retriever.DataColumnPeerManager; @@ -68,7 +68,7 @@ public void banNode(final UInt256 node) { @Override public AsyncStream requestDataColumnSidecarsByRoot( - final UInt256 nodeId, final List columnIdentifiers) { + final UInt256 nodeId, final List byRootIdentifiers) { final Eth2Peer eth2Peer = connectedPeers.get(nodeId); final AsyncStreamPublisher ret = AsyncStream.createPublisher(Integer.MAX_VALUE); @@ -76,7 +76,7 @@ public AsyncStream requestDataColumnSidecarsByRoot( ret.onError(new DataColumnReqResp.DasPeerDisconnectedException()); } else { eth2Peer - .requestDataColumnSidecarsByRoot(columnIdentifiers, ret::onNext) + .requestDataColumnSidecarsByRoot(byRootIdentifiers, ret::onNext) .finish(__ -> ret.onComplete(), ret::onError); } return ret; diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/DefaultEth2Peer.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/DefaultEth2Peer.java index 2836d1f9b9e..1ca829ecfeb 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/DefaultEth2Peer.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/DefaultEth2Peer.java @@ -66,10 +66,10 @@ import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobSidecarsByRangeRequestMessage; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobSidecarsByRootRequestMessage; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobSidecarsByRootRequestMessageSchema; -import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.DataColumnIdentifier; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.DataColumnSidecarsByRangeRequestMessage; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.DataColumnSidecarsByRootRequestMessage; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.DataColumnSidecarsByRootRequestMessageSchema; +import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.DataColumnsByRootIdentifier; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.EmptyMessage; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.GoodbyeMessage; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.PingMessage; @@ -312,7 +312,7 @@ public SafeFuture requestBlobSidecarsByRoot( @Override public SafeFuture requestDataColumnSidecarsByRoot( - final List dataColumnIdentifiers, + final List dataColumnIdentifiers, final RpcResponseListener listener) { return rpcMethods .dataColumnSidecarsByRoot() diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/Eth2Peer.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/Eth2Peer.java index d1f45519909..f5cc5ee60df 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/Eth2Peer.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/Eth2Peer.java @@ -37,7 +37,7 @@ import tech.pegasys.teku.spec.datastructures.blobs.versions.fulu.DataColumnSidecar; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobIdentifier; -import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.DataColumnIdentifier; +import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.DataColumnsByRootIdentifier; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.RpcRequest; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.metadata.MetadataMessage; import tech.pegasys.teku.spec.datastructures.state.Checkpoint; @@ -109,7 +109,7 @@ SafeFuture requestBlobSidecarsByRoot( List blobIdentifiers, RpcResponseListener listener); SafeFuture requestDataColumnSidecarsByRoot( - List dataColumnIdentifiers, + List dataColumnIdentifiers, RpcResponseListener listener); SafeFuture> requestBlockBySlot(UInt64 slot); diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/Eth2PeerManager.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/Eth2PeerManager.java index 72356a43c77..3650e666dc7 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/Eth2PeerManager.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/Eth2PeerManager.java @@ -45,6 +45,7 @@ import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.metadata.MetadataMessage; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.metadata.MetadataMessageSchema; import tech.pegasys.teku.spec.datastructures.state.Checkpoint; +import tech.pegasys.teku.statetransition.datacolumns.CustodyGroupCountManager; import tech.pegasys.teku.statetransition.datacolumns.DataColumnSidecarByRootCustody; import tech.pegasys.teku.statetransition.datacolumns.log.rpc.DasReqRespLogger; import tech.pegasys.teku.storage.client.CombinedChainDataClient; @@ -76,6 +77,7 @@ public class Eth2PeerManager implements PeerLookup, PeerHandler { final AsyncRunner asyncRunner, final CombinedChainDataClient combinedChainDataClient, final DataColumnSidecarByRootCustody dataColumnSidecarCustody, + final CustodyGroupCountManager custodyGroupCountManager, final RecentChainData recentChainData, final MetricsSystem metricsSystem, final Eth2PeerFactory eth2PeerFactory, @@ -97,6 +99,7 @@ public class Eth2PeerManager implements PeerLookup, PeerHandler { this, combinedChainDataClient, dataColumnSidecarCustody, + custodyGroupCountManager, recentChainData, metricsSystem, statusMessageFactory, @@ -112,6 +115,7 @@ public static Eth2PeerManager create( final AsyncRunner asyncRunner, final CombinedChainDataClient combinedChainDataClient, final DataColumnSidecarByRootCustody dataColumnSidecarCustody, + final CustodyGroupCountManager custodyGroupCountManager, final MetadataMessagesFactory metadataMessagesFactory, final MetricsSystem metricsSystem, final SubnetSubscriptionService attestationSubnetService, @@ -144,6 +148,7 @@ public static Eth2PeerManager create( asyncRunner, combinedChainDataClient, dataColumnSidecarCustody, + custodyGroupCountManager, combinedChainDataClient.getRecentChainData(), metricsSystem, new Eth2PeerFactory( diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/BeaconChainMethods.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/BeaconChainMethods.java index 30b87a27fc4..e2bc35d467f 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/BeaconChainMethods.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/BeaconChainMethods.java @@ -66,6 +66,7 @@ import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.metadata.MetadataMessage; import tech.pegasys.teku.spec.schemas.SchemaDefinitionsDeneb; import tech.pegasys.teku.spec.schemas.SchemaDefinitionsFulu; +import tech.pegasys.teku.statetransition.datacolumns.CustodyGroupCountManager; import tech.pegasys.teku.statetransition.datacolumns.DataColumnSidecarByRootCustody; import tech.pegasys.teku.statetransition.datacolumns.log.rpc.DasReqRespLogger; import tech.pegasys.teku.storage.client.CombinedChainDataClient; @@ -132,6 +133,7 @@ public static BeaconChainMethods create( final PeerLookup peerLookup, final CombinedChainDataClient combinedChainDataClient, final DataColumnSidecarByRootCustody dataColumnSidecarCustody, + final CustodyGroupCountManager custodyGroupCountManager, final RecentChainData recentChainData, final MetricsSystem metricsSystem, final StatusMessageFactory statusMessageFactory, @@ -173,6 +175,7 @@ public static BeaconChainMethods create( asyncRunner, combinedChainDataClient, dataColumnSidecarCustody, + custodyGroupCountManager, peerLookup, rpcEncoding, recentChainData, @@ -382,6 +385,7 @@ private static Eth2RpcMethod createGoodBye( final AsyncRunner asyncRunner, final CombinedChainDataClient combinedChainDataClient, final DataColumnSidecarByRootCustody dataColumnSidecarCustody, + final CustodyGroupCountManager custodyGroupCountManager, final PeerLookup peerLookup, final RpcEncoding rpcEncoding, final RecentChainData recentChainData, @@ -396,7 +400,12 @@ private static Eth2RpcMethod createGoodBye( final DataColumnSidecarsByRootMessageHandler dataColumnSidecarsByRootMessageHandler = new DataColumnSidecarsByRootMessageHandler( - spec, metricsSystem, combinedChainDataClient, dataColumnSidecarCustody, dasLogger); + spec, + metricsSystem, + combinedChainDataClient, + dataColumnSidecarCustody, + custodyGroupCountManager, + dasLogger); final DataColumnSidecarsByRootRequestMessageSchema dataColumnSidecarsByRootRequestMessageSchema = SchemaDefinitionsFulu.required( diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/methods/DataColumnSidecarsByRootListenerValidatingProxy.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/methods/DataColumnSidecarsByRootListenerValidatingProxy.java index 0d57a5d5f33..45f360c4e6d 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/methods/DataColumnSidecarsByRootListenerValidatingProxy.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/methods/DataColumnSidecarsByRootListenerValidatingProxy.java @@ -22,7 +22,8 @@ import tech.pegasys.teku.networking.p2p.rpc.RpcResponseListener; import tech.pegasys.teku.spec.Spec; import tech.pegasys.teku.spec.datastructures.blobs.versions.fulu.DataColumnSidecar; -import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.DataColumnIdentifier; +import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.DataColumnsByRootIdentifier; +import tech.pegasys.teku.spec.datastructures.util.DataColumnIdentifier; public class DataColumnSidecarsByRootListenerValidatingProxy extends DataColumnSidecarsByRootValidator implements RpcResponseListener { @@ -36,8 +37,21 @@ public DataColumnSidecarsByRootListenerValidatingProxy( final KZG kzg, final MetricsSystem metricsSystem, final TimeProvider timeProvider, - final List expectedDataColumnIdentifiers) { - super(peer, spec, kzg, metricsSystem, timeProvider, expectedDataColumnIdentifiers); + final List expectedByRootIdentifiers) { + super( + peer, + spec, + kzg, + metricsSystem, + timeProvider, + expectedByRootIdentifiers.stream() + .flatMap( + byRootIdentifier -> + byRootIdentifier.getColumns().stream() + .map( + column -> + new DataColumnIdentifier(byRootIdentifier.getBlockRoot(), column))) + .toList()); this.listener = listener; } diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/methods/DataColumnSidecarsByRootMessageHandler.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/methods/DataColumnSidecarsByRootMessageHandler.java index 2e6348d081a..ddcbf89ff89 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/methods/DataColumnSidecarsByRootMessageHandler.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/methods/DataColumnSidecarsByRootMessageHandler.java @@ -17,8 +17,10 @@ import com.google.common.base.Throwables; import java.nio.channels.ClosedChannelException; +import java.util.HashSet; import java.util.List; import java.util.Optional; +import java.util.Set; import java.util.stream.Stream; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -37,9 +39,10 @@ import tech.pegasys.teku.spec.Spec; import tech.pegasys.teku.spec.datastructures.blobs.versions.fulu.DataColumnSidecar; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; -import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.DataColumnIdentifier; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.DataColumnSidecarsByRootRequestMessage; +import tech.pegasys.teku.spec.datastructures.util.DataColumnIdentifier; import tech.pegasys.teku.spec.datastructures.util.DataColumnSlotAndIdentifier; +import tech.pegasys.teku.statetransition.datacolumns.CustodyGroupCountManager; import tech.pegasys.teku.statetransition.datacolumns.DataColumnSidecarByRootCustody; import tech.pegasys.teku.statetransition.datacolumns.log.rpc.DasReqRespLogger; import tech.pegasys.teku.statetransition.datacolumns.log.rpc.LoggingPeerId; @@ -60,6 +63,7 @@ public class DataColumnSidecarsByRootMessageHandler private final Spec spec; private final CombinedChainDataClient combinedChainDataClient; private final DataColumnSidecarByRootCustody dataColumnSidecarCustody; + private final CustodyGroupCountManager custodyGroupCountManager; private final LabelledMetric requestCounter; private final Counter totalDataColumnSidecarsRequestedCounter; @@ -70,22 +74,24 @@ public DataColumnSidecarsByRootMessageHandler( final MetricsSystem metricsSystem, final CombinedChainDataClient combinedChainDataClient, final DataColumnSidecarByRootCustody dataColumnSidecarCustody, + final CustodyGroupCountManager custodyGroupCountManager, final DasReqRespLogger dasLogger) { this.spec = spec; this.combinedChainDataClient = combinedChainDataClient; - requestCounter = + this.custodyGroupCountManager = custodyGroupCountManager; + this.dataColumnSidecarCustody = dataColumnSidecarCustody; + this.dasLogger = dasLogger; + this.requestCounter = metricsSystem.createLabelledCounter( TekuMetricCategory.NETWORK, "rpc_data_column_sidecars_by_root_requests_total", "Total number of data column sidecars by root requests received", "status"); - totalDataColumnSidecarsRequestedCounter = + this.totalDataColumnSidecarsRequestedCounter = metricsSystem.createCounter( TekuMetricCategory.NETWORK, "rpc_data_column_sidecars_by_root_requested_blob_sidecars_total", "Total number of data column sidecars requested in accepted data column sidecars by root requests from peers"); - this.dataColumnSidecarCustody = dataColumnSidecarCustody; - this.dasLogger = dasLogger; } private SafeFuture validateAndSendMaybeRespond( @@ -132,15 +138,24 @@ public void onIncomingMessage( final UInt64 finalizedEpoch = getFinalizedEpoch(); + final Set myCustodyColumns = + new HashSet<>(custodyGroupCountManager.getCustodyColumnIndices()); Stream> responseStream = message.stream() + .flatMap( + byRootIdentifier -> + byRootIdentifier.getColumns().stream() + .filter(myCustodyColumns::contains) + .map( + column -> + new DataColumnIdentifier(byRootIdentifier.getBlockRoot(), column))) .map( - identifier -> - retrieveDataColumnSidecar(identifier) + dataColumnIdentifier -> + retrieveDataColumnSidecar(dataColumnIdentifier) .thenCompose( maybeSidecar -> validateAndSendMaybeRespond( - identifier, + dataColumnIdentifier, maybeSidecar, finalizedEpoch, responseCallbackWithLogging))); @@ -167,14 +182,14 @@ public void onIncomingMessage( private SafeFuture> getNonCanonicalDataColumnSidecar( final DataColumnIdentifier identifier) { return combinedChainDataClient - .getBlockByBlockRoot(identifier.getBlockRoot()) + .getBlockByBlockRoot(identifier.blockRoot()) .thenApply(maybeBlock -> maybeBlock.map(SignedBeaconBlock::getSlot)) .thenCompose( maybeSlot -> { if (maybeSlot.isPresent()) { return combinedChainDataClient.getNonCanonicalSidecar( new DataColumnSlotAndIdentifier( - maybeSlot.get(), identifier.getBlockRoot(), identifier.getIndex())); + maybeSlot.get(), identifier.blockRoot(), identifier.columnId())); } else { return SafeFuture.completedFuture(Optional.empty()); } @@ -205,7 +220,7 @@ private SafeFuture validateMinimumRequestEpoch( .map(sidecar -> SafeFuture.completedFuture(Optional.of(sidecar.getSlot()))) .orElse( combinedChainDataClient - .getBlockByBlockRoot(identifier.getBlockRoot()) + .getBlockByBlockRoot(identifier.blockRoot()) .thenApply(maybeBlock -> maybeBlock.map(SignedBeaconBlock::getSlot))) .thenAcceptChecked( maybeSlot -> { @@ -221,7 +236,7 @@ private SafeFuture validateMinimumRequestEpoch( INVALID_REQUEST_CODE, String.format( "Block root (%s) references a block earlier than the minimum_request_epoch", - identifier.getBlockRoot())); + identifier.blockRoot())); } }); } diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/methods/DataColumnSidecarsByRootValidator.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/methods/DataColumnSidecarsByRootValidator.java index 69782618ae7..1c0c46b264a 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/methods/DataColumnSidecarsByRootValidator.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/methods/DataColumnSidecarsByRootValidator.java @@ -27,7 +27,7 @@ import tech.pegasys.teku.networking.p2p.peer.Peer; import tech.pegasys.teku.spec.Spec; import tech.pegasys.teku.spec.datastructures.blobs.versions.fulu.DataColumnSidecar; -import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.DataColumnIdentifier; +import tech.pegasys.teku.spec.datastructures.util.DataColumnIdentifier; public class DataColumnSidecarsByRootValidator extends AbstractDataColumnSidecarValidator { private final Set expectedDataColumnIdentifiers; diff --git a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/methods/DataColumnSidecarsByRootListenerValidatingProxyTest.java b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/methods/DataColumnSidecarsByRootListenerValidatingProxyTest.java index 58a27c92857..b72c0337286 100644 --- a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/methods/DataColumnSidecarsByRootListenerValidatingProxyTest.java +++ b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/methods/DataColumnSidecarsByRootListenerValidatingProxyTest.java @@ -19,6 +19,8 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static tech.pegasys.teku.infrastructure.unsigned.UInt64.ONE; +import static tech.pegasys.teku.infrastructure.unsigned.UInt64.ZERO; import java.util.List; import org.apache.tuweni.bytes.Bytes; @@ -36,12 +38,14 @@ import tech.pegasys.teku.networking.eth2.peers.Eth2Peer; import tech.pegasys.teku.networking.p2p.rpc.RpcResponseListener; import tech.pegasys.teku.spec.Spec; +import tech.pegasys.teku.spec.SpecMilestone; import tech.pegasys.teku.spec.TestSpecFactory; import tech.pegasys.teku.spec.datastructures.blobs.versions.fulu.DataColumnSidecar; import tech.pegasys.teku.spec.datastructures.blocks.BeaconBlockHeader; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlockHeader; -import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.DataColumnIdentifier; +import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.DataColumnsByRootIdentifier; +import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.DataColumnsByRootIdentifierSchema; import tech.pegasys.teku.spec.schemas.SchemaDefinitionsElectra; import tech.pegasys.teku.spec.schemas.SchemaDefinitionsFulu; import tech.pegasys.teku.spec.util.DataStructureUtil; @@ -49,12 +53,15 @@ @SuppressWarnings("JavaCase") public class DataColumnSidecarsByRootListenerValidatingProxyTest { private final Spec spec = TestSpecFactory.createMainnetFulu(); + final DataColumnsByRootIdentifierSchema schema = + SchemaDefinitionsFulu.required(spec.forMilestone(SpecMilestone.FULU).getSchemaDefinitions()) + .getDataColumnsByRootIdentifierSchema(); private final DataStructureUtil dataStructureUtil = new DataStructureUtil(spec); private DataColumnSidecarsByRootListenerValidatingProxy listenerWrapper; private final Eth2Peer peer = mock(Eth2Peer.class); private final KZG kzg = mock(KZG.class); private final MetricsSystem metricsSystem = new StubMetricsSystem(); - private final TimeProvider timeProvider = StubTimeProvider.withTimeInMillis(UInt64.ZERO); + private final TimeProvider timeProvider = StubTimeProvider.withTimeInMillis(ZERO); @SuppressWarnings("unchecked") private final RpcResponseListener listener = mock(RpcResponseListener.class); @@ -67,33 +74,31 @@ void setUp() { @Test void dataColumnSidecarsAreCorrect() { - final SignedBeaconBlock block1 = dataStructureUtil.randomSignedBeaconBlock(UInt64.ONE); + final SignedBeaconBlock block1 = dataStructureUtil.randomSignedBeaconBlock(ONE); final SignedBeaconBlock block2 = dataStructureUtil.randomSignedBeaconBlock(UInt64.valueOf(2)); final SignedBeaconBlock block3 = dataStructureUtil.randomSignedBeaconBlock(UInt64.valueOf(3)); final SignedBeaconBlock block4 = dataStructureUtil.randomSignedBeaconBlock(UInt64.valueOf(4)); - final List dataColumnIdentifiers = + final List dataColumnIdentifiers = List.of( - new DataColumnIdentifier(block1.getRoot(), UInt64.ZERO), - new DataColumnIdentifier(block1.getRoot(), UInt64.ONE), - new DataColumnIdentifier(block2.getRoot(), UInt64.ZERO), - new DataColumnIdentifier( - block2.getRoot(), UInt64.ONE), // will be missed, shouldn't be fatal - new DataColumnIdentifier(block3.getRoot(), UInt64.ZERO), - new DataColumnIdentifier(block4.getRoot(), UInt64.ZERO)); + schema.create(block1.getRoot(), List.of(ZERO, ONE)), + schema.create( + block2.getRoot(), List.of(ZERO, ONE)), // ONE will be missed, shouldn't be fatal + schema.create(block3.getRoot(), ZERO), + schema.create(block4.getRoot(), ZERO)); listenerWrapper = new DataColumnSidecarsByRootListenerValidatingProxy( peer, spec, listener, kzg, metricsSystem, timeProvider, dataColumnIdentifiers); final DataColumnSidecar dataColumnSidecar1_0 = - dataStructureUtil.randomDataColumnSidecarWithInclusionProof(block1, UInt64.ZERO); + dataStructureUtil.randomDataColumnSidecarWithInclusionProof(block1, ZERO); final DataColumnSidecar dataColumnSidecar1_1 = - dataStructureUtil.randomDataColumnSidecarWithInclusionProof(block1, UInt64.ONE); + dataStructureUtil.randomDataColumnSidecarWithInclusionProof(block1, ONE); final DataColumnSidecar dataColumnSidecar2_0 = - dataStructureUtil.randomDataColumnSidecarWithInclusionProof(block2, UInt64.ZERO); + dataStructureUtil.randomDataColumnSidecarWithInclusionProof(block2, ZERO); final DataColumnSidecar dataColumnSidecar3_0 = - dataStructureUtil.randomDataColumnSidecarWithInclusionProof(block3, UInt64.ZERO); + dataStructureUtil.randomDataColumnSidecarWithInclusionProof(block3, ZERO); final DataColumnSidecar dataColumnSidecar4_0 = - dataStructureUtil.randomDataColumnSidecarWithInclusionProof(block4, UInt64.ZERO); + dataStructureUtil.randomDataColumnSidecarWithInclusionProof(block4, ZERO); assertDoesNotThrow(() -> listenerWrapper.onResponse(dataColumnSidecar1_0).join()); assertDoesNotThrow(() -> listenerWrapper.onResponse(dataColumnSidecar1_1).join()); @@ -104,22 +109,20 @@ void dataColumnSidecarsAreCorrect() { @Test void blobSidecarIdentifierNotRequested() { - final SignedBeaconBlock block1 = dataStructureUtil.randomSignedBeaconBlock(UInt64.ONE); + final SignedBeaconBlock block1 = dataStructureUtil.randomSignedBeaconBlock(ONE); final SignedBeaconBlock block2 = dataStructureUtil.randomSignedBeaconBlock(UInt64.valueOf(2)); - final List dataColumnIdentifiers = - List.of( - new DataColumnIdentifier(block1.getRoot(), UInt64.ZERO), - new DataColumnIdentifier(block1.getRoot(), UInt64.ONE)); + final List dataColumnIdentifiers = + List.of(schema.create(block1.getRoot(), List.of(ZERO, ONE))); listenerWrapper = new DataColumnSidecarsByRootListenerValidatingProxy( peer, spec, listener, kzg, metricsSystem, timeProvider, dataColumnIdentifiers); final DataColumnSidecar datColumnSidecar1_0 = - dataStructureUtil.randomDataColumnSidecarWithInclusionProof(block1, UInt64.ZERO); + dataStructureUtil.randomDataColumnSidecarWithInclusionProof(block1, ZERO); final DataColumnSidecar datColumnSidecar1_1 = - dataStructureUtil.randomDataColumnSidecarWithInclusionProof(block1, UInt64.ONE); + dataStructureUtil.randomDataColumnSidecarWithInclusionProof(block1, ONE); final DataColumnSidecar datColumnSidecar2_0 = - dataStructureUtil.randomDataColumnSidecarWithInclusionProof(block2, UInt64.ZERO); + dataStructureUtil.randomDataColumnSidecarWithInclusionProof(block2, ZERO); assertDoesNotThrow(() -> listenerWrapper.onResponse(datColumnSidecar1_0).join()); assertDoesNotThrow(() -> listenerWrapper.onResponse(datColumnSidecar1_1).join()); @@ -137,16 +140,15 @@ void blobSidecarIdentifierNotRequested() { @Test void dataColumnSidecarFailsKzgVerification() { when(kzg.verifyCellProofBatch(any(), any(), any())).thenReturn(false); - final SignedBeaconBlock block1 = dataStructureUtil.randomSignedBeaconBlock(UInt64.ONE); - final DataColumnIdentifier dataColumnIdentifier = - new DataColumnIdentifier(block1.getRoot(), UInt64.ZERO); + final SignedBeaconBlock block1 = dataStructureUtil.randomSignedBeaconBlock(ONE); + final DataColumnsByRootIdentifier dataColumnIdentifier = schema.create(block1.getRoot(), ZERO); listenerWrapper = new DataColumnSidecarsByRootListenerValidatingProxy( peer, spec, listener, kzg, metricsSystem, timeProvider, List.of(dataColumnIdentifier)); final DataColumnSidecar dataColumnSidecar = dataStructureUtil.randomDataColumnSidecarWithInclusionProof( - block1, dataColumnIdentifier.getIndex()); + block1, dataColumnIdentifier.getColumns().getFirst()); final SafeFuture result = listenerWrapper.onResponse(dataColumnSidecar); assertThat(result).isCompletedExceptionally(); @@ -167,7 +169,7 @@ void dataColumnSidecarFailsInclusionProofVerification() { SchemaDefinitionsFulu.required(schemaDefinitionsElectra) .getDataColumnSidecarSchema() .create( - UInt64.ZERO, + ZERO, SchemaDefinitionsFulu.required(schemaDefinitionsElectra) .getDataColumnSchema() .create(List.of()), @@ -195,8 +197,8 @@ void dataColumnSidecarFailsInclusionProofVerification() { "0xdb56114e00fdd4c1f85c892bf35ac9a89289aaecb1ebd0a96cde606a748b5d71"), Bytes32.fromHexString( "0x9535c3eb42aaf182b13b18aacbcbc1df6593ecafd0bf7d5e94fb727b2dc1f265"))); - final DataColumnIdentifier dataColumnIdentifier = - new DataColumnIdentifier(dataColumnSidecar.getBlockRoot(), UInt64.ZERO); + final DataColumnsByRootIdentifier dataColumnIdentifier = + schema.create(dataColumnSidecar.getBlockRoot(), ZERO); listenerWrapper = new DataColumnSidecarsByRootListenerValidatingProxy( peer, spec, listener, kzg, metricsSystem, timeProvider, List.of(dataColumnIdentifier)); diff --git a/networking/eth2/src/testFixtures/java/tech/pegasys/teku/networking/eth2/peers/RespondingEth2Peer.java b/networking/eth2/src/testFixtures/java/tech/pegasys/teku/networking/eth2/peers/RespondingEth2Peer.java index 73a43e8bc46..d3b4d757f59 100644 --- a/networking/eth2/src/testFixtures/java/tech/pegasys/teku/networking/eth2/peers/RespondingEth2Peer.java +++ b/networking/eth2/src/testFixtures/java/tech/pegasys/teku/networking/eth2/peers/RespondingEth2Peer.java @@ -55,7 +55,7 @@ import tech.pegasys.teku.spec.datastructures.blocks.SignedBlockAndState; import tech.pegasys.teku.spec.datastructures.blocks.StateAndBlockSummary; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobIdentifier; -import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.DataColumnIdentifier; +import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.DataColumnsByRootIdentifier; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.RpcRequest; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.metadata.MetadataMessage; import tech.pegasys.teku.spec.datastructures.state.Checkpoint; @@ -265,7 +265,7 @@ public SafeFuture requestBlobSidecarsByRoot( @Override public SafeFuture requestDataColumnSidecarsByRoot( - final List dataColumnIdentifiers, + final List dataColumnIdentifiers, final RpcResponseListener listener) { // TODO return SafeFuture.COMPLETE; diff --git a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java index 738635a7f8f..e022147f091 100644 --- a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java +++ b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java @@ -105,6 +105,7 @@ import tech.pegasys.teku.services.timer.TimerService; import tech.pegasys.teku.spec.Spec; import tech.pegasys.teku.spec.SpecMilestone; +import tech.pegasys.teku.spec.SpecVersion; import tech.pegasys.teku.spec.config.SpecConfigFulu; import tech.pegasys.teku.spec.datastructures.attestation.ValidatableAttestation; import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar; @@ -129,6 +130,7 @@ import tech.pegasys.teku.spec.logic.versions.deneb.helpers.MiscHelpersDeneb; import tech.pegasys.teku.spec.logic.versions.fulu.helpers.MiscHelpersFulu; import tech.pegasys.teku.spec.networks.Eth2Network; +import tech.pegasys.teku.spec.schemas.SchemaDefinitionsFulu; import tech.pegasys.teku.statetransition.CustodyGroupCountChannel; import tech.pegasys.teku.statetransition.EpochCachePrimer; import tech.pegasys.teku.statetransition.LocalOperationAcceptedFilter; @@ -604,8 +606,8 @@ public void initAll() { initAttestationManager(); initBlockManager(); initSyncCommitteePools(); - initP2PNetwork(); initCustodyGroupCountManager(); + initP2PNetwork(); initDasCustody(); initDataColumnSidecarELRecoveryManager(); initDasSyncPreSampler(); @@ -719,8 +721,8 @@ protected void initDasCustody() { return; } LOG.info("Activating DAS Custody for FULU"); - final SpecConfigFulu specConfigFulu = - SpecConfigFulu.required(spec.forMilestone(SpecMilestone.FULU).getConfig()); + final SpecVersion specVersionFulu = spec.forMilestone(SpecMilestone.FULU); + final SpecConfigFulu specConfigFulu = SpecConfigFulu.required(specVersionFulu.getConfig()); final MinCustodyPeriodSlotCalculator minCustodyPeriodSlotCalculator = MinCustodyPeriodSlotCalculator.createFromSpec(spec); final int slotsPerEpoch = spec.getGenesisSpec().getSlotsPerEpoch(); @@ -746,13 +748,12 @@ protected void initDasCustody() { .getBlockAtSlotExact(slot) .thenApply(sbb -> sbb.flatMap(SignedBeaconBlock::getBeaconBlock)); - MiscHelpersFulu miscHelpersFulu = - MiscHelpersFulu.required(spec.forMilestone(SpecMilestone.FULU).miscHelpers()); + final MiscHelpersFulu miscHelpersFulu = MiscHelpersFulu.required(specVersionFulu.miscHelpers()); final int minCustodyGroupRequirement = specConfigFulu.getCustodyRequirement(); final int maxGroups = specConfigFulu.getNumberOfCustodyGroups(); final int totalMyCustodyGroups = - beaconConfig.p2pConfig().getTotalCustodyGroupCount(spec.forMilestone(SpecMilestone.FULU)); + beaconConfig.p2pConfig().getTotalCustodyGroupCount(specVersionFulu); eventChannels .getPublisher(CustodyGroupCountChannel.class) .onCustodyGroupCountUpdate(totalMyCustodyGroups); @@ -825,7 +826,10 @@ protected void initDasCustody() { BatchDataColumnsByRootReqResp loggingByRootReqResp = new LoggingBatchDataColumnsByRootReqResp(dasPeerManager, dasReqRespLogger); - DataColumnReqResp dasRpc = new DataColumnReqRespBatchingImpl(loggingByRootReqResp); + DataColumnReqResp dasRpc = + new DataColumnReqRespBatchingImpl( + loggingByRootReqResp, + SchemaDefinitionsFulu.required(specVersionFulu.getSchemaDefinitions())); MetadataDasPeerCustodyTracker peerCustodyTracker = new MetadataDasPeerCustodyTracker(); p2pNetwork.subscribeConnect(peerCustodyTracker); @@ -1513,6 +1517,7 @@ protected void initP2PNetwork() { .eventChannels(eventChannels) .combinedChainDataClient(combinedChainDataClient) .dataColumnSidecarCustody(dataColumnSidecarCustody) + .custodyGroupCountManager(custodyGroupCountManager) .metadataMessagesFactory(metadataMessagesFactory) .gossipedBlockProcessor(blockManager::validateAndImportBlock) .gossipedBlobSidecarProcessor(blobSidecarManager::validateAndPrepareForBlockImport) From 4942c6f821a97c38e7395e368eb590888be2e000 Mon Sep 17 00:00:00 2001 From: Dmitrii Shmatko Date: Tue, 29 Apr 2025 16:46:35 +0200 Subject: [PATCH 2/4] fix assemble --- .../pegasys/teku/networking/eth2/peers/Eth2PeerManagerTest.java | 2 ++ .../networking/eth2/rpc/beaconchain/BeaconChainMethodsTest.java | 2 ++ .../networking/eth2/rpc/core/AbstractRequestHandlerTest.java | 2 ++ .../pegasys/teku/networking/eth2/Eth2P2PNetworkFactory.java | 2 ++ 4 files changed, 8 insertions(+) diff --git a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/peers/Eth2PeerManagerTest.java b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/peers/Eth2PeerManagerTest.java index 28009a1d531..ffbd4ac31ef 100644 --- a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/peers/Eth2PeerManagerTest.java +++ b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/peers/Eth2PeerManagerTest.java @@ -45,6 +45,7 @@ import tech.pegasys.teku.networking.p2p.peer.Peer; import tech.pegasys.teku.spec.Spec; import tech.pegasys.teku.spec.TestSpecFactory; +import tech.pegasys.teku.statetransition.datacolumns.CustodyGroupCountManager; import tech.pegasys.teku.statetransition.datacolumns.DataColumnSidecarByRootCustody; import tech.pegasys.teku.statetransition.datacolumns.log.rpc.DasReqRespLogger; import tech.pegasys.teku.storage.client.CombinedChainDataClient; @@ -72,6 +73,7 @@ public class Eth2PeerManagerTest { asyncRunner, combinedChainDataClient, DataColumnSidecarByRootCustody.NOOP, + CustodyGroupCountManager.NOOP, recentChainData, new NoOpMetricsSystem(), eth2PeerFactory, diff --git a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/BeaconChainMethodsTest.java b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/BeaconChainMethodsTest.java index 05b0732ee30..746373b4303 100644 --- a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/BeaconChainMethodsTest.java +++ b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/BeaconChainMethodsTest.java @@ -35,6 +35,7 @@ import tech.pegasys.teku.spec.Spec; import tech.pegasys.teku.spec.TestSpecFactory; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.StatusMessage; +import tech.pegasys.teku.statetransition.datacolumns.CustodyGroupCountManager; import tech.pegasys.teku.statetransition.datacolumns.DataColumnSidecarByRootCustody; import tech.pegasys.teku.statetransition.datacolumns.log.rpc.DasReqRespLogger; import tech.pegasys.teku.storage.client.CombinedChainDataClient; @@ -153,6 +154,7 @@ private BeaconChainMethods getMethods(final Spec spec) { peerLookup, combinedChainDataClient, DataColumnSidecarByRootCustody.NOOP, + CustodyGroupCountManager.NOOP, recentChainData, metricsSystem, statusMessageFactory, diff --git a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/rpc/core/AbstractRequestHandlerTest.java b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/rpc/core/AbstractRequestHandlerTest.java index 9748da2be74..03334367886 100644 --- a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/rpc/core/AbstractRequestHandlerTest.java +++ b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/rpc/core/AbstractRequestHandlerTest.java @@ -37,6 +37,7 @@ import tech.pegasys.teku.spec.Spec; import tech.pegasys.teku.spec.TestSpecFactory; import tech.pegasys.teku.spec.util.DataStructureUtil; +import tech.pegasys.teku.statetransition.datacolumns.CustodyGroupCountManager; import tech.pegasys.teku.statetransition.datacolumns.DataColumnSidecarByRootCustody; import tech.pegasys.teku.statetransition.datacolumns.log.rpc.DasReqRespLogger; import tech.pegasys.teku.storage.client.CombinedChainDataClient; @@ -67,6 +68,7 @@ public void setup() { peerLookup, combinedChainDataClient, DataColumnSidecarByRootCustody.NOOP, + CustodyGroupCountManager.NOOP, recentChainData, new NoOpMetricsSystem(), new StatusMessageFactory(recentChainData), diff --git a/networking/eth2/src/testFixtures/java/tech/pegasys/teku/networking/eth2/Eth2P2PNetworkFactory.java b/networking/eth2/src/testFixtures/java/tech/pegasys/teku/networking/eth2/Eth2P2PNetworkFactory.java index c22523fbfe5..93d30b5584f 100644 --- a/networking/eth2/src/testFixtures/java/tech/pegasys/teku/networking/eth2/Eth2P2PNetworkFactory.java +++ b/networking/eth2/src/testFixtures/java/tech/pegasys/teku/networking/eth2/Eth2P2PNetworkFactory.java @@ -107,6 +107,7 @@ import tech.pegasys.teku.spec.schemas.SchemaDefinitionsSupplier; import tech.pegasys.teku.statetransition.BeaconChainUtil; import tech.pegasys.teku.statetransition.block.VerifiedBlockOperationsListener; +import tech.pegasys.teku.statetransition.datacolumns.CustodyGroupCountManager; import tech.pegasys.teku.statetransition.datacolumns.DataColumnSidecarByRootCustody; import tech.pegasys.teku.statetransition.datacolumns.log.gossip.DasGossipLogger; import tech.pegasys.teku.statetransition.datacolumns.log.rpc.DasReqRespLogger; @@ -240,6 +241,7 @@ protected Eth2P2PNetwork buildNetwork(final P2PConfig config) { asyncRunner, combinedChainDataClient, DataColumnSidecarByRootCustody.NOOP, + CustodyGroupCountManager.NOOP, new MetadataMessagesFactory(), METRICS_SYSTEM, attestationSubnetService, From 76aee9106810c13ae32f0e75aad940f11e9a4f78 Mon Sep 17 00:00:00 2001 From: Dmitrii Shmatko Date: Tue, 29 Apr 2025 18:37:31 +0200 Subject: [PATCH 3/4] fix excessive logging in debug --- .../util/DataColumnSidecarELRecoveryManagerImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/util/DataColumnSidecarELRecoveryManagerImpl.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/util/DataColumnSidecarELRecoveryManagerImpl.java index 590007fe510..a5f0ab53004 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/util/DataColumnSidecarELRecoveryManagerImpl.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/util/DataColumnSidecarELRecoveryManagerImpl.java @@ -324,7 +324,7 @@ private SafeFuture fetchMissingBlobsFromLocalEL(final SlotAndBlockRoot slo .engineGetBlobAndCellProofsList(versionedHashes, slotAndBlockRoot.getSlot()) .thenAccept( blobAndCellProofsList -> { - LOG.info("Found {} blobs", blobAndCellProofsList); + LOG.info("Found {} blobs", blobAndCellProofsList.size()); if (blobAndCellProofsList.isEmpty()) { LOG.debug( "Blobs for {} are not found on local EL, reconstruction is not possible", From 6c606ad59926b921251d6b844fbdb2539ad4fda8 Mon Sep 17 00:00:00 2001 From: Dmitrii Shmatko Date: Tue, 29 Apr 2025 18:37:51 +0200 Subject: [PATCH 4/4] fix not initialized CustodyGroupCountManager --- .../CustodyGroupCountManagerLateInit.java | 56 +++++++++++++++++++ .../beaconchain/BeaconChainController.java | 23 +++++--- 2 files changed, 70 insertions(+), 9 deletions(-) create mode 100644 ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/CustodyGroupCountManagerLateInit.java diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/CustodyGroupCountManagerLateInit.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/CustodyGroupCountManagerLateInit.java new file mode 100644 index 00000000000..56dad7ef073 --- /dev/null +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/CustodyGroupCountManagerLateInit.java @@ -0,0 +1,56 @@ +/* + * Copyright Consensys Software Inc., 2025 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.statetransition.datacolumns; + +import java.util.List; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; + +public class CustodyGroupCountManagerLateInit implements CustodyGroupCountManager { + private volatile CustodyGroupCountManager custodyGroupCountManager; + + public void setCustodyGroupCountManager(final CustodyGroupCountManager custodyGroupCountManager) { + this.custodyGroupCountManager = custodyGroupCountManager; + } + + @Override + public int getCustodyGroupCount() { + checkInitialized(); + return custodyGroupCountManager.getCustodyGroupCount(); + } + + @Override + public List getCustodyColumnIndices() { + checkInitialized(); + return custodyGroupCountManager.getCustodyColumnIndices(); + } + + @Override + public int getCustodyGroupSyncedCount() { + checkInitialized(); + return custodyGroupCountManager.getCustodyGroupSyncedCount(); + } + + @Override + public void setCustodyGroupSyncedCount(final int custodyGroupSyncedCount) { + checkInitialized(); + custodyGroupCountManager.setCustodyGroupSyncedCount(custodyGroupSyncedCount); + } + + private void checkInitialized() { + if (custodyGroupCountManager == null) { + throw new IllegalStateException( + "CustodyGroupCountManagerLateInit has not been initialized yet"); + } + } +} diff --git a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java index e022147f091..0cace3d1111 100644 --- a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java +++ b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java @@ -153,8 +153,8 @@ import tech.pegasys.teku.statetransition.block.ReceivedBlockEventsChannel; import tech.pegasys.teku.statetransition.datacolumns.CanonicalBlockResolver; import tech.pegasys.teku.statetransition.datacolumns.CurrentSlotProvider; -import tech.pegasys.teku.statetransition.datacolumns.CustodyGroupCountManager; import tech.pegasys.teku.statetransition.datacolumns.CustodyGroupCountManagerImpl; +import tech.pegasys.teku.statetransition.datacolumns.CustodyGroupCountManagerLateInit; import tech.pegasys.teku.statetransition.datacolumns.DasCustodySync; import tech.pegasys.teku.statetransition.datacolumns.DasLongPollCustody; import tech.pegasys.teku.statetransition.datacolumns.DasPreSampler; @@ -341,7 +341,7 @@ public class BeaconChainController extends Service implements BeaconChainControl protected volatile KZG kzg; protected volatile BlobSidecarManager blobSidecarManager; protected volatile BlobSidecarGossipValidator blobSidecarValidator; - protected volatile CustodyGroupCountManager custodyGroupCountManager; + protected volatile CustodyGroupCountManagerLateInit custodyGroupCountManagerLateInit; protected volatile DataColumnSidecarManager dataColumnSidecarManager; protected volatile LateInitDataColumnSidecarCustody dataColumnSidecarCustody = new LateInitDataColumnSidecarCustody(); @@ -606,8 +606,9 @@ public void initAll() { initAttestationManager(); initBlockManager(); initSyncCommitteePools(); - initCustodyGroupCountManager(); + preInitCustodyGroupCountManager(); initP2PNetwork(); + initCustodyGroupCountManager(); initDasCustody(); initDataColumnSidecarELRecoveryManager(); initDasSyncPreSampler(); @@ -766,7 +767,7 @@ protected void initDasCustody() { canonicalBlockResolver, dbAccessor, minCustodyPeriodSlotCalculator, - custodyGroupCountManager, + custodyGroupCountManagerLateInit, totalMyCustodyGroups); eventChannels.subscribe(SlotEventsChannel.class, dataColumnSidecarCustodyImpl); eventChannels.subscribe(FinalizedCheckpointChannel.class, dataColumnSidecarCustodyImpl); @@ -800,7 +801,7 @@ protected void initDasCustody() { eventChannels .getPublisher(DataColumnSidecarGossipChannel.class) .publishDataColumnSidecar(dataColumnSidecar, RemoteOrigin.RECOVERED), - custodyGroupCountManager, + custodyGroupCountManagerLateInit, specConfigFulu.getNumberOfColumns(), specConfigFulu.getNumberOfCustodyGroups(), slot -> Duration.ofMillis(spec.getMillisPerSlot(slot).dividedBy(3).longValue()), @@ -874,7 +875,7 @@ protected void initDasCustody() { dbAccessor, custody, recoveringSidecarRetriever, - custodyGroupCountManager); + custodyGroupCountManagerLateInit); LOG.info("DAS Basic Sampler initialized with {} groups to sample", totalMyCustodyGroups); eventChannels.subscribe(FinalizedCheckpointChannel.class, dasSampler); this.dataAvailabilitySampler = dasSampler; @@ -885,6 +886,10 @@ protected void initDasSyncPreSampler() { eventChannels.subscribe(SyncPreImportBlockChannel.class, dasPreSampler::onNewPreImportBlocks); } + protected void preInitCustodyGroupCountManager() { + this.custodyGroupCountManagerLateInit = new CustodyGroupCountManagerLateInit(); + } + protected void initCustodyGroupCountManager() { if (!spec.isMilestoneSupported(SpecMilestone.FULU)) { return; @@ -904,7 +909,7 @@ protected void initCustodyGroupCountManager() { nodeId, metricsSystem); eventChannels.subscribe(SlotEventsChannel.class, custodyGroupCountManager); - this.custodyGroupCountManager = custodyGroupCountManager; + custodyGroupCountManagerLateInit.setCustodyGroupCountManager(custodyGroupCountManager); } protected void initMergeMonitors() { @@ -971,7 +976,7 @@ protected void initDataColumnSidecarELRecoveryManager() { executionLayer, kzg, recoveredDataColumnSidecarPublisher, - custodyGroupCountManager); + custodyGroupCountManagerLateInit); eventChannels.subscribe(SlotEventsChannel.class, recoveryManager); dataColumnSidecarCustody.subscribeToValidDataColumnSidecars( recoveryManager::onNewDataColumnSidecar); @@ -1517,7 +1522,7 @@ protected void initP2PNetwork() { .eventChannels(eventChannels) .combinedChainDataClient(combinedChainDataClient) .dataColumnSidecarCustody(dataColumnSidecarCustody) - .custodyGroupCountManager(custodyGroupCountManager) + .custodyGroupCountManager(custodyGroupCountManagerLateInit) .metadataMessagesFactory(metadataMessagesFactory) .gossipedBlockProcessor(blockManager::validateAndImportBlock) .gossipedBlobSidecarProcessor(blobSidecarManager::validateAndPrepareForBlockImport)