diff --git a/data/provider/src/main/java/tech/pegasys/teku/api/blobselector/BlobSidecarSelectorFactory.java b/data/provider/src/main/java/tech/pegasys/teku/api/blobselector/BlobSidecarSelectorFactory.java index 26ecd633d5f..59dacb11089 100644 --- a/data/provider/src/main/java/tech/pegasys/teku/api/blobselector/BlobSidecarSelectorFactory.java +++ b/data/provider/src/main/java/tech/pegasys/teku/api/blobselector/BlobSidecarSelectorFactory.java @@ -178,12 +178,32 @@ private SafeFuture>> getBlobSidecarsForBlock( private SafeFuture>> getBlobSidecars( final SlotAndBlockRoot slotAndBlockRoot, final List indices) { - return client.getBlobSidecars(slotAndBlockRoot, indices).thenApply(Optional::of); + return client + .getBlobSidecars(slotAndBlockRoot, indices) + .thenCompose( + blobSidecars -> { + if (blobSidecars.isEmpty()) { + // attempt retrieving from archive (when enabled) + return client.getArchivedBlobSidecars(slotAndBlockRoot, indices); + } + return SafeFuture.completedFuture(blobSidecars); + }) + .thenApply(Optional::of); } private SafeFuture>> getBlobSidecars( final UInt64 slot, final List indices) { - return client.getBlobSidecars(slot, indices).thenApply(Optional::of); + return client + .getBlobSidecars(slot, indices) + .thenCompose( + blobSidecars -> { + if (blobSidecars.isEmpty()) { + // attempt retrieving from archive (when enabled) + return client.getArchivedBlobSidecars(slot, indices); + } + return SafeFuture.completedFuture(blobSidecars); + }) + .thenApply(Optional::of); } private Optional addMetaData( diff --git a/data/provider/src/test/java/tech/pegasys/teku/api/blobselector/BlobSidecarSelectorFactoryTest.java b/data/provider/src/test/java/tech/pegasys/teku/api/blobselector/BlobSidecarSelectorFactoryTest.java index 030618f3769..b725db051a5 100644 --- a/data/provider/src/test/java/tech/pegasys/teku/api/blobselector/BlobSidecarSelectorFactoryTest.java +++ b/data/provider/src/test/java/tech/pegasys/teku/api/blobselector/BlobSidecarSelectorFactoryTest.java @@ -119,6 +119,31 @@ public void blockRootSelector_shouldGetBlobSidecarsForFinalizedSlot() assertThat(result.get().getData()).isEqualTo(blobSidecars); } + @Test + public void blockRootSelector_shouldAttemptToGetFinalizedBlobSidecarsFromArchive() + throws ExecutionException, InterruptedException { + final UInt64 finalizedSlot = UInt64.valueOf(42); + final SignedBlockAndState blockAndState = data.randomSignedBlockAndState(100); + when(client.getChainHead()).thenReturn(Optional.of(ChainHead.create(blockAndState))); + + when(client.getFinalizedSlotByBlockRoot(block.getRoot())) + .thenReturn(SafeFuture.completedFuture(Optional.of(finalizedSlot))); + final SlotAndBlockRoot slotAndBlockRoot = new SlotAndBlockRoot(finalizedSlot, block.getRoot()); + // not available in DB + when(client.getBlobSidecars(slotAndBlockRoot, indices)) + .thenReturn(SafeFuture.completedFuture(List.of())); + // available in archive + when(client.getArchivedBlobSidecars(slotAndBlockRoot, indices)) + .thenReturn(SafeFuture.completedFuture(blobSidecars)); + + final Optional result = + blobSidecarSelectorFactory + .blockRootSelector(block.getRoot()) + .getBlobSidecars(indices) + .get(); + assertThat(result.get().getData()).isEqualTo(blobSidecars); + } + @Test public void blockRootSelector_shouldGetBlobSidecarsByRetrievingBlock() throws ExecutionException, InterruptedException { @@ -153,6 +178,23 @@ public void slotSelector_shouldGetBlobSidecarsFromFinalizedSlot() assertThat(result.get().getData()).isEqualTo(blobSidecars); } + @Test + public void slotSelector_shouldAttemptToGetFinalizedBlobSidecarsFromArchive() + throws ExecutionException, InterruptedException { + when(client.isFinalized(block.getSlot())).thenReturn(true); + // not available in DB + when(client.getBlobSidecars(block.getSlot(), indices)) + .thenReturn(SafeFuture.completedFuture(List.of())); + // available in archive + when(client.getArchivedBlobSidecars(block.getSlot(), indices)) + .thenReturn(SafeFuture.completedFuture(blobSidecars)); + when(client.isChainHeadOptimistic()).thenReturn(false); + + final Optional result = + blobSidecarSelectorFactory.slotSelector(block.getSlot()).getBlobSidecars(indices).get(); + assertThat(result.get().getData()).isEqualTo(blobSidecars); + } + @Test public void slotSelector_shouldGetBlobSidecarsByRetrievingBlockWhenSlotNotFinalized() throws ExecutionException, InterruptedException { @@ -228,6 +270,11 @@ public void shouldLookForBlobSidecarsOnlyAfterDeneb( .thenReturn(SafeFuture.completedFuture(Optional.of(block))); when(client.getBlobSidecars(any(SlotAndBlockRoot.class), anyList())) .thenReturn(SafeFuture.completedFuture(List.of())); + when(client.getArchivedBlobSidecars(any(SlotAndBlockRoot.class), anyList())) + .thenReturn(SafeFuture.completedFuture(List.of())); + when(client.getArchivedBlobSidecars(any(UInt64.class), anyList())) + .thenReturn(SafeFuture.completedFuture(List.of())); + blobSidecarSelectorFactory.slotSelector(block.getSlot()).getBlobSidecars(indices).get(); if (ctx.getSpec().isMilestoneSupported(SpecMilestone.DENEB)) { verify(client).getBlobSidecars(any(SlotAndBlockRoot.class), anyList()); @@ -273,8 +320,6 @@ public void slotSelector_whenSelectingNonFinalizedBlockMetadataReturnsFinalizedF when(client.isFinalized(block.getSlot())).thenReturn(false); when(client.getBlockAtSlotExact(block.getSlot())) .thenReturn(SafeFuture.completedFuture(Optional.of(block))); - when(client.getBlobSidecars(block.getSlot(), indices)) - .thenReturn(SafeFuture.completedFuture(blobSidecars)); when(client.isChainHeadOptimistic()).thenReturn(false); when(client.getBlobSidecars(block.getSlotAndBlockRoot(), indices)) .thenReturn(SafeFuture.completedFuture(blobSidecars)); diff --git a/infrastructure/metrics/src/main/java/tech/pegasys/teku/infrastructure/metrics/TekuMetricCategory.java b/infrastructure/metrics/src/main/java/tech/pegasys/teku/infrastructure/metrics/TekuMetricCategory.java index 2a0966c0e37..5970bf7a8ed 100644 --- a/infrastructure/metrics/src/main/java/tech/pegasys/teku/infrastructure/metrics/TekuMetricCategory.java +++ b/infrastructure/metrics/src/main/java/tech/pegasys/teku/infrastructure/metrics/TekuMetricCategory.java @@ -27,7 +27,6 @@ public enum TekuMetricCategory implements MetricCategory { STORAGE("storage"), STORAGE_HOT_DB("storage_hot"), STORAGE_FINALIZED_DB("storage_finalized"), - REMOTE_VALIDATOR("remote_validator"), VALIDATOR("validator"), VALIDATOR_PERFORMANCE("validator_performance"), VALIDATOR_DUTY("validator_duty"); diff --git a/services/chainstorage/src/main/java/tech/pegasys/teku/services/chainstorage/StorageService.java b/services/chainstorage/src/main/java/tech/pegasys/teku/services/chainstorage/StorageService.java index 4f74c2c0f1d..e077e8b96ec 100644 --- a/services/chainstorage/src/main/java/tech/pegasys/teku/services/chainstorage/StorageService.java +++ b/services/chainstorage/src/main/java/tech/pegasys/teku/services/chainstorage/StorageService.java @@ -37,9 +37,8 @@ import tech.pegasys.teku.storage.api.CombinedStorageChannel; import tech.pegasys.teku.storage.api.Eth1DepositStorageChannel; import tech.pegasys.teku.storage.api.VoteUpdateChannel; -import tech.pegasys.teku.storage.archive.DataArchive; -import tech.pegasys.teku.storage.archive.fsarchive.FileSystemArchive; -import tech.pegasys.teku.storage.archive.nooparchive.NoopDataArchive; +import tech.pegasys.teku.storage.archive.BlobSidecarsArchiver; +import tech.pegasys.teku.storage.archive.filesystem.FileSystemBlobSidecarsArchiver; import tech.pegasys.teku.storage.server.BatchingVoteUpdateChannel; import tech.pegasys.teku.storage.server.ChainStorage; import tech.pegasys.teku.storage.server.CombinedStorageChannelSplitter; @@ -159,11 +158,13 @@ protected SafeFuture doStart() { pruningActiveLabelledGauge); } - final DataArchive dataArchive = + final BlobSidecarsArchiver blobSidecarsArchiver = config .getBlobsArchivePath() - .map(path -> new FileSystemArchive(Path.of(path))) - .orElse(new NoopDataArchive()); + .map( + path -> + new FileSystemBlobSidecarsArchiver(config.getSpec(), Path.of(path))) + .orElse(BlobSidecarsArchiver.NOOP); if (config.getSpec().isMilestoneSupported(SpecMilestone.DENEB)) { blobsPruner = @@ -171,7 +172,7 @@ protected SafeFuture doStart() { new BlobSidecarPruner( config.getSpec(), database, - dataArchive, + blobSidecarsArchiver, serviceConfig.getMetricsSystem(), storagePrunerAsyncRunner, serviceConfig.getTimeProvider(), @@ -183,13 +184,16 @@ protected SafeFuture doStart() { pruningActiveLabelledGauge, config.isStoreNonCanonicalBlocksEnabled())); } - final EventChannels eventChannels = serviceConfig.getEventChannels(); chainStorage = ChainStorage.create( database, config.getSpec(), config.getDataStorageMode(), - config.getStateRebuildTimeoutSeconds()); + config.getStateRebuildTimeoutSeconds(), + blobSidecarsArchiver); + + final EventChannels eventChannels = serviceConfig.getEventChannels(); + final DepositStorage depositStorage = DepositStorage.create( eventChannels.getPublisher(Eth1EventsChannel.class), diff --git a/storage/api/src/main/java/tech/pegasys/teku/storage/api/StorageQueryChannel.java b/storage/api/src/main/java/tech/pegasys/teku/storage/api/StorageQueryChannel.java index ad6bc10ed6a..4c895e03979 100644 --- a/storage/api/src/main/java/tech/pegasys/teku/storage/api/StorageQueryChannel.java +++ b/storage/api/src/main/java/tech/pegasys/teku/storage/api/StorageQueryChannel.java @@ -56,7 +56,7 @@ public interface StorageQueryChannel extends ChannelInterface { SafeFuture> getHotBlockAndStateByBlockRoot(Bytes32 blockRoot); SafeFuture> getHotStateAndBlockSummaryByBlockRoot( - final Bytes32 blockRoot); + Bytes32 blockRoot); /** * Returns "hot" blocks - the latest finalized block or blocks that descend from the latest @@ -109,4 +109,9 @@ SafeFuture> getBlobSidecarKeys( SafeFuture> getBlobSidecarKeys( SlotAndBlockRoot slotAndBlockRoot); + + // Methods for retrieving archived blobs (when enabled) + SafeFuture> getArchivedBlobSidecars(SlotAndBlockRoot slotAndBlockRoot); + + SafeFuture> getArchivedBlobSidecars(UInt64 slot); } diff --git a/storage/src/integration-test/java/tech/pegasys/teku/storage/server/kvstore/DatabaseTest.java b/storage/src/integration-test/java/tech/pegasys/teku/storage/server/kvstore/DatabaseTest.java index 237a17b53df..fef61a7934d 100644 --- a/storage/src/integration-test/java/tech/pegasys/teku/storage/server/kvstore/DatabaseTest.java +++ b/storage/src/integration-test/java/tech/pegasys/teku/storage/server/kvstore/DatabaseTest.java @@ -82,7 +82,7 @@ import tech.pegasys.teku.storage.api.OnDiskStoreData; import tech.pegasys.teku.storage.api.StorageUpdate; import tech.pegasys.teku.storage.api.WeakSubjectivityUpdate; -import tech.pegasys.teku.storage.archive.fsarchive.FileSystemArchive; +import tech.pegasys.teku.storage.archive.filesystem.FileSystemBlobSidecarsArchiver; import tech.pegasys.teku.storage.client.RecentChainData; import tech.pegasys.teku.storage.server.Database; import tech.pegasys.teku.storage.server.DatabaseContext; @@ -122,7 +122,7 @@ public class DatabaseTest { private StateStorageMode storageMode; private StorageSystem storageSystem; private Database database; - private FileSystemArchive fileSystemDataArchive; + private FileSystemBlobSidecarsArchiver blobSidecarsArchiver; private RecentChainData recentChainData; private UpdatableStore store; private final List storageSystems = new ArrayList<>(); @@ -139,7 +139,7 @@ private void setupWithSpec(final Spec spec) throws IOException { this.chainProperties = new ChainProperties(spec); final Path blobsArchive = Files.createTempDirectory("blobs"); tmpDirectories.add(blobsArchive.toFile()); - this.fileSystemDataArchive = new FileSystemArchive(blobsArchive); + this.blobSidecarsArchiver = new FileSystemBlobSidecarsArchiver(spec, blobsArchive); genesisBlockAndState = chainBuilder.generateGenesis(genesisTime, true); genesisCheckpoint = getCheckpointForBlock(genesisBlockAndState.getBlock()); genesisAnchor = AnchorPoint.fromGenesisState(spec, genesisBlockAndState.getState()); @@ -300,9 +300,7 @@ public void verifyBlobsLifecycle(final DatabaseContext context) throws IOExcepti List.of(blobSidecar5_0))); // let's prune with limit to 1 - assertThat( - database.pruneOldestBlobSidecars( - UInt64.MAX_VALUE, 1, fileSystemDataArchive.getBlobSidecarWriter())) + assertThat(database.pruneOldestBlobSidecars(UInt64.MAX_VALUE, 1, blobSidecarsArchiver)) .isTrue(); assertBlobSidecarKeys( blobSidecar2_0.getSlot(), @@ -324,9 +322,7 @@ public void verifyBlobsLifecycle(final DatabaseContext context) throws IOExcepti assertThat(getSlotBlobsArchiveFile(blobSidecar2_0)).doesNotExist(); // let's prune up to slot 1 (nothing will be pruned) - assertThat( - database.pruneOldestBlobSidecars(ONE, 10, fileSystemDataArchive.getBlobSidecarWriter())) - .isFalse(); + assertThat(database.pruneOldestBlobSidecars(ONE, 10, blobSidecarsArchiver)).isFalse(); assertBlobSidecarKeys( blobSidecar2_0.getSlot(), blobSidecar5_0.getSlot(), @@ -343,9 +339,7 @@ public void verifyBlobsLifecycle(final DatabaseContext context) throws IOExcepti assertThat(database.getBlobSidecarColumnCount()).isEqualTo(4L); // let's prune all from slot 4 excluded - assertThat( - database.pruneOldestBlobSidecars( - UInt64.valueOf(3), 10, fileSystemDataArchive.getBlobSidecarWriter())) + assertThat(database.pruneOldestBlobSidecars(UInt64.valueOf(3), 10, blobSidecarsArchiver)) .isFalse(); assertBlobSidecarKeys( blobSidecar1_0.getSlot(), blobSidecar5_0.getSlot(), blobSidecarToKey(blobSidecar5_0)); @@ -359,9 +353,7 @@ public void verifyBlobsLifecycle(final DatabaseContext context) throws IOExcepti assertThat(getSlotBlobsArchiveFile(blobSidecar5_0)).doesNotExist(); // let's prune all - assertThat( - database.pruneOldestBlobSidecars( - UInt64.valueOf(5), 1, fileSystemDataArchive.getBlobSidecarWriter())) + assertThat(database.pruneOldestBlobSidecars(UInt64.valueOf(5), 1, blobSidecarsArchiver)) .isTrue(); // all empty now assertBlobSidecarKeys(ZERO, UInt64.valueOf(10)); @@ -372,8 +364,9 @@ public void verifyBlobsLifecycle(final DatabaseContext context) throws IOExcepti assertThat(getSlotBlobsArchiveFile(blobSidecar5_0)).exists(); } - private File getSlotBlobsArchiveFile(final BlobSidecar blobSidecar) { - return fileSystemDataArchive.resolve(blobSidecar.getSlotAndBlockRoot()); + private Path getSlotBlobsArchiveFile(final BlobSidecar blobSidecar) { + return blobSidecarsArchiver.resolveArchivePath( + blobSidecar.getSlotAndBlockRoot().getBlockRoot()); } @TestTemplate @@ -473,8 +466,7 @@ public void verifyNonCanonicalBlobsLifecycle(final DatabaseContext context) thro // Pruning with a prune limit set to 1: Only blobSidecar1 will be pruned assertThat( - database.pruneOldestNonCanonicalBlobSidecars( - UInt64.MAX_VALUE, 1, fileSystemDataArchive.getBlobSidecarWriter())) + database.pruneOldestNonCanonicalBlobSidecars(UInt64.MAX_VALUE, 1, blobSidecarsArchiver)) .isTrue(); assertNonCanonicalBlobSidecarKeys( blobSidecar2_0.getSlot(), @@ -495,9 +487,7 @@ public void verifyNonCanonicalBlobsLifecycle(final DatabaseContext context) thro assertThat(getSlotBlobsArchiveFile(blobSidecar2_0)).doesNotExist(); // Pruning up to slot 1: No blobs pruned - assertThat( - database.pruneOldestNonCanonicalBlobSidecars( - ONE, 10, fileSystemDataArchive.getBlobSidecarWriter())) + assertThat(database.pruneOldestNonCanonicalBlobSidecars(ONE, 10, blobSidecarsArchiver)) .isFalse(); assertNonCanonicalBlobSidecarKeys( blobSidecar2_0.getSlot(), @@ -520,7 +510,7 @@ public void verifyNonCanonicalBlobsLifecycle(final DatabaseContext context) thro // Prune blobs up to slot 3 assertThat( database.pruneOldestNonCanonicalBlobSidecars( - UInt64.valueOf(3), 10, fileSystemDataArchive.getBlobSidecarWriter())) + UInt64.valueOf(3), 10, blobSidecarsArchiver)) .isFalse(); assertNonCanonicalBlobSidecarKeys( blobSidecar1_0.getSlot(), blobSidecar5_0.getSlot(), blobSidecarToKey(blobSidecar5_0)); @@ -535,7 +525,7 @@ public void verifyNonCanonicalBlobsLifecycle(final DatabaseContext context) thro // Pruning all blobs assertThat( database.pruneOldestNonCanonicalBlobSidecars( - UInt64.valueOf(5), 1, fileSystemDataArchive.getBlobSidecarWriter())) + UInt64.valueOf(5), 1, blobSidecarsArchiver)) .isTrue(); // No blobs should be left assertNonCanonicalBlobSidecarKeys(ZERO, UInt64.valueOf(10)); diff --git a/storage/src/main/java/tech/pegasys/teku/storage/archive/BlobSidecarsArchiver.java b/storage/src/main/java/tech/pegasys/teku/storage/archive/BlobSidecarsArchiver.java new file mode 100644 index 00000000000..5c8dffc509f --- /dev/null +++ b/storage/src/main/java/tech/pegasys/teku/storage/archive/BlobSidecarsArchiver.java @@ -0,0 +1,46 @@ +/* + * Copyright Consensys Software Inc., 2025 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.storage.archive; + +import java.util.List; +import java.util.Optional; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; +import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar; +import tech.pegasys.teku.spec.datastructures.blocks.SlotAndBlockRoot; + +public interface BlobSidecarsArchiver { + + BlobSidecarsArchiver NOOP = + new BlobSidecarsArchiver() { + @Override + public void archive( + final SlotAndBlockRoot slotAndBlockRoot, final List blobSidecars) {} + + @Override + public Optional> retrieve(final SlotAndBlockRoot slotAndBlockRoot) { + return Optional.empty(); + } + + @Override + public Optional> retrieve(final UInt64 slot) { + return Optional.empty(); + } + }; + + void archive(SlotAndBlockRoot slotAndBlockRoot, List blobSidecars); + + Optional> retrieve(SlotAndBlockRoot slotAndBlockRoot); + + Optional> retrieve(UInt64 slot); +} diff --git a/storage/src/main/java/tech/pegasys/teku/storage/archive/DataArchive.java b/storage/src/main/java/tech/pegasys/teku/storage/archive/DataArchive.java deleted file mode 100644 index 201a1fd4ef0..00000000000 --- a/storage/src/main/java/tech/pegasys/teku/storage/archive/DataArchive.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright Consensys Software Inc., 2025 - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package tech.pegasys.teku.storage.archive; - -import java.io.IOException; -import java.util.List; -import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar; - -/** - * Interface for a data archive which stores prunable BlobSidecars outside the data availability - * window and could be extended later to include other data types. It is expected that the - * DataArchive is on disk or externally stored with slow write and recovery times. Initial interface - * is write only, but may be expanded to include read operations later. - */ -public interface DataArchive { - - /** - * Returns the archive writer capable of storing BlobSidecars. - * - * @return a closeable DataArchiveWriter for writing BlobSidecars - * @throws IOException throw exception if it fails to get a writer. - */ - DataArchiveWriter> getBlobSidecarWriter() throws IOException; -} diff --git a/storage/src/main/java/tech/pegasys/teku/storage/archive/DataArchiveWriter.java b/storage/src/main/java/tech/pegasys/teku/storage/archive/DataArchiveWriter.java deleted file mode 100644 index 1331365fab6..00000000000 --- a/storage/src/main/java/tech/pegasys/teku/storage/archive/DataArchiveWriter.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright Consensys Software Inc., 2025 - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package tech.pegasys.teku.storage.archive; - -import java.io.Closeable; - -/** - * An interface to allow storing data that is to be pruned from the Database. If the store function - * is successful it returns true, signalling the data can be pruned. If the store function fails, - * the data was not stored and the data should not be pruned. - * - * @param the data to be stored. - */ -public interface DataArchiveWriter extends Closeable { - boolean archive(final T data); -} diff --git a/storage/src/main/java/tech/pegasys/teku/storage/archive/filesystem/FileSystemBlobSidecarsArchiver.java b/storage/src/main/java/tech/pegasys/teku/storage/archive/filesystem/FileSystemBlobSidecarsArchiver.java new file mode 100644 index 00000000000..cf43ee28080 --- /dev/null +++ b/storage/src/main/java/tech/pegasys/teku/storage/archive/filesystem/FileSystemBlobSidecarsArchiver.java @@ -0,0 +1,209 @@ +/* + * Copyright Consensys Software Inc., 2025 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.storage.archive.filesystem; + +import static tech.pegasys.teku.infrastructure.json.types.DeserializableTypeDefinition.listOf; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Splitter; +import com.google.common.collect.Iterables; +import java.io.BufferedWriter; +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Stream; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.tuweni.bytes.Bytes32; +import tech.pegasys.teku.infrastructure.json.JsonUtil; +import tech.pegasys.teku.infrastructure.json.types.DeserializableTypeDefinition; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; +import tech.pegasys.teku.spec.Spec; +import tech.pegasys.teku.spec.SpecMilestone; +import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar; +import tech.pegasys.teku.spec.datastructures.blocks.SlotAndBlockRoot; +import tech.pegasys.teku.spec.schemas.SchemaDefinitionsDeneb; +import tech.pegasys.teku.storage.archive.BlobSidecarsArchiver; + +public class FileSystemBlobSidecarsArchiver implements BlobSidecarsArchiver { + + private static final String INDEX_FILE_SUFFIX = "index.dat"; + private static final long INDEX_FILE_SLOT_RANGE_SIZE = 100_000; + + private static final Logger LOG = LogManager.getLogger(); + + private final Spec spec; + private final Path baseDirectory; + + private final Map>> + milestoneToTypeDefinitionCache = new HashMap<>(); + + public FileSystemBlobSidecarsArchiver(final Spec spec, final Path baseDirectory) { + this.spec = spec; + this.baseDirectory = baseDirectory; + } + + @Override + public void archive( + final SlotAndBlockRoot slotAndBlockRoot, final List blobSidecars) { + final Path archivePath = resolveArchivePath(slotAndBlockRoot.getBlockRoot()); + + if (Files.exists(archivePath)) { + LOG.error( + "Failed to archive blob sidecars for {}. File exists: {}", slotAndBlockRoot, archivePath); + return; + } + + try { + Files.createDirectories(archivePath.getParent()); + } catch (final IOException __) { + LOG.error( + "Failed to archive blob sidecars for {}. Could not create directories: {}", + slotAndBlockRoot, + archivePath.getParent()); + return; + } + + final Path indexFile = resolveIndexFile(slotAndBlockRoot.getSlot()); + + try (final OutputStream output = Files.newOutputStream(archivePath); + final BufferedWriter indexWriter = + Files.newBufferedWriter( + indexFile, + StandardCharsets.UTF_8, + StandardOpenOption.CREATE, + StandardOpenOption.APPEND)) { + if (blobSidecars.isEmpty()) { + // empty list + output.write("[]".getBytes(StandardCharsets.UTF_8)); + } else { + writeBlobSidecars(output, slotAndBlockRoot.getSlot(), blobSidecars); + } + indexWriter.write(formatIndexFileOutput(slotAndBlockRoot)); + indexWriter.newLine(); + } catch (final IOException ex) { + LOG.error(String.format("Failed to archive blob sidecars for %s", slotAndBlockRoot), ex); + } + } + + @Override + public Optional> retrieve(final SlotAndBlockRoot slotAndBlockRoot) { + try { + final Path archivePath = resolveArchivePath(slotAndBlockRoot.getBlockRoot()); + if (!Files.exists(archivePath)) { + return Optional.empty(); + } + final String blobSidecarsJson = Files.readString(archivePath); + final List blobSidecars = + JsonUtil.parse(blobSidecarsJson, getJsonTypeDefinition(slotAndBlockRoot.getSlot())); + return Optional.of(blobSidecars); + } catch (IOException ex) { + LOG.error( + String.format( + "Failed to retrieve blob sidecars for slot %s and block root %s", + slotAndBlockRoot.getSlot(), slotAndBlockRoot.getBlockRoot()), + ex); + return Optional.empty(); + } + } + + @Override + public Optional> retrieve(final UInt64 slot) { + final Path indexFile = resolveIndexFile(slot); + if (!Files.exists(indexFile)) { + return Optional.empty(); + } + try (final Stream lines = Files.lines(indexFile)) { + return lines + .filter(line -> line.startsWith(slot.toString())) + .findFirst() + .flatMap( + line -> { + // lines in the index file are in the format of: " " + final Bytes32 blockRoot = + Bytes32.fromHexString(Iterables.get(Splitter.on(' ').split(line), 1)); + return retrieve(new SlotAndBlockRoot(slot, blockRoot)); + }); + } catch (IOException ex) { + LOG.error(String.format("Failed to retrieve blob sidecars for slot %s", slot), ex); + return Optional.empty(); + } + } + + /** + * Given a basePath, block root, return where to store/find the BlobSidecar. Initial + * implementation uses blockRoot as a hex string in the directory of the first two characters. + * + * @param blockRoot the block root. + * @return a path of where to store or find the BlobSidecar + */ + @VisibleForTesting + public Path resolveArchivePath(final Bytes32 blockRoot) { + // For blockroot 0x1a2bcd... the directory is basePath/1a/2b/1a2bcd... + // 256 * 256 directories = 65,536. + // Assume 8000 to 10000 blobs per day. With perfect hash distribution, + // all directories have one file after a week. After 1 year, expect 50 files in each directory. + final String blockRootString = blockRoot.toUnprefixedHexString(); + final String dir1 = blockRootString.substring(0, 2); + final String dir2 = blockRootString.substring(2, 4); + final String blobSidecarFilename = + dir1 + File.separator + dir2 + File.separator + blockRootString; + return baseDirectory.resolve(blobSidecarFilename); + } + + /** + * Given a basePath, slot, return where to store/find the slot -> root index file + * + * @param slot the slot. + * @return a path of where to store or find the slot -> root index file + */ + @VisibleForTesting + Path resolveIndexFile(final UInt64 slot) { + final UInt64 lowerBound = + slot.dividedBy(INDEX_FILE_SLOT_RANGE_SIZE).times(INDEX_FILE_SLOT_RANGE_SIZE); + final UInt64 upperBound = lowerBound.plus(INDEX_FILE_SLOT_RANGE_SIZE).minusMinZero(1); + final String fileName = String.format("%s-%s_%s", lowerBound, upperBound, INDEX_FILE_SUFFIX); + return baseDirectory.resolve(fileName); + } + + private void writeBlobSidecars( + final OutputStream out, final UInt64 slot, final List blobSidecars) + throws IOException { + JsonUtil.serializeToBytes(blobSidecars, getJsonTypeDefinition(slot), out); + } + + private DeserializableTypeDefinition> getJsonTypeDefinition(final UInt64 slot) { + return milestoneToTypeDefinitionCache.computeIfAbsent( + spec.atSlot(slot).getMilestone(), + milestone -> + listOf( + SchemaDefinitionsDeneb.required(spec.forMilestone(milestone).getSchemaDefinitions()) + .getBlobSidecarSchema() + .getJsonTypeDefinition())); + } + + private String formatIndexFileOutput(final SlotAndBlockRoot slotAndBlockRoot) { + return slotAndBlockRoot.getSlot() + + " " + + slotAndBlockRoot.getBlockRoot().toUnprefixedHexString(); + } +} diff --git a/storage/src/main/java/tech/pegasys/teku/storage/archive/fsarchive/BlobSidecarJsonWriter.java b/storage/src/main/java/tech/pegasys/teku/storage/archive/fsarchive/BlobSidecarJsonWriter.java deleted file mode 100644 index 2c85a310235..00000000000 --- a/storage/src/main/java/tech/pegasys/teku/storage/archive/fsarchive/BlobSidecarJsonWriter.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright Consensys Software Inc., 2025 - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package tech.pegasys.teku.storage.archive.fsarchive; - -import static java.nio.charset.StandardCharsets.UTF_8; -import static tech.pegasys.teku.infrastructure.json.types.DeserializableTypeDefinition.listOf; - -import java.io.IOException; -import java.io.OutputStream; -import java.util.List; -import java.util.Objects; -import tech.pegasys.teku.infrastructure.json.JsonUtil; -import tech.pegasys.teku.infrastructure.json.types.SerializableTypeDefinition; -import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar; - -public class BlobSidecarJsonWriter { - - public void writeSlotBlobSidecars(final OutputStream out, final List blobSidecars) - throws IOException { - Objects.requireNonNull(out); - Objects.requireNonNull(blobSidecars); - - // Technically not possible as pruner prunes sidecars and not slots. - if (blobSidecars.isEmpty()) { - out.write("[]".getBytes(UTF_8)); - return; - } - - final SerializableTypeDefinition> type = - listOf(blobSidecars.getFirst().getSchema().getJsonTypeDefinition()); - JsonUtil.serializeToBytes(blobSidecars, type, out); - } -} diff --git a/storage/src/main/java/tech/pegasys/teku/storage/archive/fsarchive/FileSystemArchive.java b/storage/src/main/java/tech/pegasys/teku/storage/archive/fsarchive/FileSystemArchive.java deleted file mode 100644 index 0839207063b..00000000000 --- a/storage/src/main/java/tech/pegasys/teku/storage/archive/fsarchive/FileSystemArchive.java +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Copyright Consensys Software Inc., 2025 - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package tech.pegasys.teku.storage.archive.fsarchive; - -import java.io.BufferedWriter; -import java.io.Closeable; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStreamWriter; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.List; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar; -import tech.pegasys.teku.spec.datastructures.blocks.SlotAndBlockRoot; -import tech.pegasys.teku.storage.archive.DataArchive; -import tech.pegasys.teku.storage.archive.DataArchiveWriter; - -/** - * A file system based implementations of the DataArchive. Writes to a directory using the - * PathResolver method to decide where to write the files. - */ -public class FileSystemArchive implements DataArchive { - static final String INDEX_FILE = "index.dat"; - private static final Logger LOG = LogManager.getLogger(); - - private final Path baseDirectory; - private final BlobSidecarJsonWriter jsonWriter; - - public FileSystemArchive(final Path baseDirectory) { - this.baseDirectory = baseDirectory; - this.jsonWriter = new BlobSidecarJsonWriter(); - } - - @Override - public DataArchiveWriter> getBlobSidecarWriter() throws IOException { - - try { - final File indexFile = baseDirectory.resolve(INDEX_FILE).toFile(); - return new FileSystemBlobSidecarWriter(indexFile); - } catch (IOException e) { - LOG.warn("Unable to create BlobSidecar archive writer", e); - throw e; - } - } - - private class FileSystemBlobSidecarWriter - implements DataArchiveWriter>, Closeable { - final BufferedWriter indexWriter; - - public FileSystemBlobSidecarWriter(final File indexFile) throws IOException { - indexWriter = - new BufferedWriter( - new OutputStreamWriter( - new FileOutputStream(indexFile, true), StandardCharsets.UTF_8)); - } - - @Override - public boolean archive(final List blobSidecars) { - if (blobSidecars == null || blobSidecars.isEmpty()) { - return true; - } - - final SlotAndBlockRoot slotAndBlockRoot = blobSidecars.getFirst().getSlotAndBlockRoot(); - final File file = resolve(slotAndBlockRoot); - if (file.exists()) { - LOG.error("Failed to write BlobSidecar. File exists: {}", file.toString()); - return false; - } - - try { - Files.createDirectories(file.toPath().getParent()); - } catch (IOException e) { - LOG.error( - "Failed to write BlobSidecar. Could not make directories to: {}", - file.getParentFile().toString()); - return false; - } - - try (FileOutputStream output = new FileOutputStream(file)) { - jsonWriter.writeSlotBlobSidecars(output, blobSidecars); - indexWriter.write(formatIndexOutput(slotAndBlockRoot)); - indexWriter.newLine(); - return true; - } catch (IOException | NullPointerException e) { - LOG.error("Failed to write BlobSidecar.", e); - return false; - } - } - - private String formatIndexOutput(final SlotAndBlockRoot slotAndBlockRoot) { - return slotAndBlockRoot.getSlot() - + " " - + slotAndBlockRoot.getBlockRoot().toUnprefixedHexString(); - } - - @Override - public void close() throws IOException { - indexWriter.flush(); - indexWriter.close(); - } - } - - /** - * Given a basePath, slot and block root, return where to store/find the BlobSidecar. Initial - * implementation uses blockRoot as a hex string in the directory of the first two characters. - * - * @param slotAndBlockRoot The slot and block root. - * @return a path of where to store or find the BlobSidecar - */ - public File resolve(final SlotAndBlockRoot slotAndBlockRoot) { - // For blockroot 0x1a2bcd... the directory is basePath/1a/2b/1a2bcd... - // 256 * 256 directories = 65,536. - // Assume 8000 to 10000 blobs per day. With perfect hash distribution, - // all directories have one file after a week. After 1 year, expect 50 files in each directory. - String blockRootString = slotAndBlockRoot.getBlockRoot().toUnprefixedHexString(); - final String dir1 = blockRootString.substring(0, 2); - final String dir2 = blockRootString.substring(2, 4); - final String blobSidecarFilename = - dir1 + File.separator + dir2 + File.separator + blockRootString; - return baseDirectory.resolve(blobSidecarFilename).toFile(); - } -} diff --git a/storage/src/main/java/tech/pegasys/teku/storage/archive/nooparchive/DataArchiveNoopWriter.java b/storage/src/main/java/tech/pegasys/teku/storage/archive/nooparchive/DataArchiveNoopWriter.java deleted file mode 100644 index 00a631f5baa..00000000000 --- a/storage/src/main/java/tech/pegasys/teku/storage/archive/nooparchive/DataArchiveNoopWriter.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright Consensys Software Inc., 2025 - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package tech.pegasys.teku.storage.archive.nooparchive; - -import java.io.IOException; -import tech.pegasys.teku.storage.archive.DataArchiveWriter; - -public class DataArchiveNoopWriter implements DataArchiveWriter { - - @Override - public boolean archive(final T data) { - return true; - } - - @Override - public void close() throws IOException {} -} diff --git a/storage/src/main/java/tech/pegasys/teku/storage/archive/nooparchive/NoopDataArchive.java b/storage/src/main/java/tech/pegasys/teku/storage/archive/nooparchive/NoopDataArchive.java deleted file mode 100644 index 6de2de80585..00000000000 --- a/storage/src/main/java/tech/pegasys/teku/storage/archive/nooparchive/NoopDataArchive.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright Consensys Software Inc., 2025 - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package tech.pegasys.teku.storage.archive.nooparchive; - -import java.util.List; -import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar; -import tech.pegasys.teku.storage.archive.DataArchive; -import tech.pegasys.teku.storage.archive.DataArchiveWriter; - -public class NoopDataArchive implements DataArchive { - - private static final DataArchiveWriter> BLOB_SIDECAR_WRITER = - new DataArchiveNoopWriter<>(); - - @Override - public DataArchiveWriter> getBlobSidecarWriter() { - return BLOB_SIDECAR_WRITER; - } -} diff --git a/storage/src/main/java/tech/pegasys/teku/storage/client/CombinedChainDataClient.java b/storage/src/main/java/tech/pegasys/teku/storage/client/CombinedChainDataClient.java index e4193a5cf2b..78309233c5d 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/client/CombinedChainDataClient.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/client/CombinedChainDataClient.java @@ -177,15 +177,15 @@ public SafeFuture> getBlobSidecars( public SafeFuture> getBlobSidecars( final SlotAndBlockRoot slotAndBlockRoot, final List indices) { - final Optional> maybeBlobSidecars = - recentChainData.getBlobSidecars(slotAndBlockRoot); - if (maybeBlobSidecars.isPresent()) { - return SafeFuture.completedFuture(filterBlobSidecars(maybeBlobSidecars.get(), indices)); - } - return historicalChainData - .getBlobSidecarKeys(slotAndBlockRoot) - .thenApply(keys -> filterBlobSidecarKeys(keys, indices)) - .thenCompose(this::getBlobSidecars); + return recentChainData + .getBlobSidecars(slotAndBlockRoot) + .map(blobSidecars -> SafeFuture.completedFuture(filterBlobSidecars(blobSidecars, indices))) + .orElseGet( + () -> + historicalChainData + .getBlobSidecarKeys(slotAndBlockRoot) + .thenApply(keys -> filterBlobSidecarKeys(keys, indices)) + .thenCompose(this::getBlobSidecars)); } public SafeFuture> getAllBlobSidecars( @@ -193,7 +193,21 @@ public SafeFuture> getAllBlobSidecars( return historicalChainData .getAllBlobSidecarKeys(slot) .thenApply(keys -> filterBlobSidecarKeys(keys, indices)) - .thenCompose(this::getAllBlobSidecars); + .thenCompose(this::getBlobSidecars); + } + + public SafeFuture> getArchivedBlobSidecars( + final SlotAndBlockRoot slotAndBlockRoot, final List indices) { + return historicalChainData + .getArchivedBlobSidecars(slotAndBlockRoot) + .thenApply(blobSidecars -> filterBlobSidecars(blobSidecars, indices)); + } + + public SafeFuture> getArchivedBlobSidecars( + final UInt64 slot, final List indices) { + return historicalChainData + .getArchivedBlobSidecars(slot) + .thenApply(blobSidecars -> filterBlobSidecars(blobSidecars, indices)); } public SafeFuture> getStateAtSlotExact(final UInt64 slot) { @@ -514,15 +528,15 @@ public SafeFuture> getBlobSidecarByBlockRootAndIndex( public SafeFuture> getBlobSidecarByKey( final SlotAndBlockRootAndBlobIndex key) { - final Optional> maybeBlobSidecars = - recentChainData.getBlobSidecars(key.getSlotAndBlockRoot()); - if (maybeBlobSidecars.isPresent()) { - return key.getBlobIndex().isLessThan(maybeBlobSidecars.get().size()) - ? SafeFuture.completedFuture( - Optional.of(maybeBlobSidecars.get().get(key.getBlobIndex().intValue()))) - : SafeFuture.completedFuture(Optional.empty()); - } - return historicalChainData.getBlobSidecar(key); + return recentChainData + .getBlobSidecars(key.getSlotAndBlockRoot()) + .>>map( + blobSidecars -> + key.getBlobIndex().isLessThan(blobSidecars.size()) + ? SafeFuture.completedFuture( + Optional.of(blobSidecars.get(key.getBlobIndex().intValue()))) + : SafeFuture.completedFuture(Optional.empty())) + .orElseGet(() -> historicalChainData.getBlobSidecar(key)); } public SafeFuture> getBlobSidecarKeys( @@ -817,12 +831,6 @@ private SafeFuture> getBlobSidecars( .thenApply(blobSidecars -> blobSidecars.stream().flatMap(Optional::stream).toList()); } - private SafeFuture> getAllBlobSidecars( - final Stream keys) { - return SafeFuture.collectAll(keys.map(this::getAllBlobSidecarByKey)) - .thenApply(blobSidecars -> blobSidecars.stream().flatMap(Optional::stream).toList()); - } - private Optional getFinalizedCheckpoint() { return Optional.ofNullable(getStore()).map(ReadOnlyStore::getFinalizedCheckpoint); } diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/ChainStorage.java b/storage/src/main/java/tech/pegasys/teku/storage/server/ChainStorage.java index 85b446b5dac..7b14357ceab 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/ChainStorage.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/ChainStorage.java @@ -48,38 +48,44 @@ import tech.pegasys.teku.storage.api.VoteUpdateChannel; import tech.pegasys.teku.storage.api.WeakSubjectivityState; import tech.pegasys.teku.storage.api.WeakSubjectivityUpdate; +import tech.pegasys.teku.storage.archive.BlobSidecarsArchiver; import tech.pegasys.teku.storage.server.state.FinalizedStateCache; public class ChainStorage implements StorageUpdateChannel, StorageQueryChannel, VoteUpdateChannel, ChainStorageFacade { private static final Logger LOG = LogManager.getLogger(); + private final Database database; private final FinalizedStateCache finalizedStateCache; - private final StateStorageMode dataStorageMode; + private final BlobSidecarsArchiver blobSidecarsArchiver; private Optional cachedStoreData = Optional.empty(); private ChainStorage( final Database database, final FinalizedStateCache finalizedStateCache, - final StateStorageMode dataStorageMode) { + final StateStorageMode dataStorageMode, + final BlobSidecarsArchiver blobSidecarsArchiver) { this.database = database; this.finalizedStateCache = finalizedStateCache; this.dataStorageMode = dataStorageMode; + this.blobSidecarsArchiver = blobSidecarsArchiver; } public static ChainStorage create( final Database database, final Spec spec, final StateStorageMode dataStorageMode, - final int stateRebuildTimeoutSeconds) { + final int stateRebuildTimeoutSeconds, + final BlobSidecarsArchiver blobSidecarsArchiver) { final int finalizedStateCacheSize = spec.getSlotsPerEpoch(SpecConfig.GENESIS_EPOCH) * 3; return new ChainStorage( database, new FinalizedStateCache( spec, database, finalizedStateCacheSize, true, stateRebuildTimeoutSeconds), - dataStorageMode); + dataStorageMode, + blobSidecarsArchiver); } private synchronized Optional getStore() { @@ -363,4 +369,15 @@ public SafeFuture> getBlobSidecarKeys( final SlotAndBlockRoot slotAndBlockRoot) { return SafeFuture.of(() -> database.getBlobSidecarKeys(slotAndBlockRoot)); } + + @Override + public SafeFuture> getArchivedBlobSidecars( + final SlotAndBlockRoot slotAndBlockRoot) { + return SafeFuture.of(() -> blobSidecarsArchiver.retrieve(slotAndBlockRoot).orElse(List.of())); + } + + @Override + public SafeFuture> getArchivedBlobSidecars(final UInt64 slot) { + return SafeFuture.of(() -> blobSidecarsArchiver.retrieve(slot).orElse(List.of())); + } } diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/CombinedStorageChannelSplitter.java b/storage/src/main/java/tech/pegasys/teku/storage/server/CombinedStorageChannelSplitter.java index 163a5066a15..843f0b12a92 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/CombinedStorageChannelSplitter.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/CombinedStorageChannelSplitter.java @@ -249,4 +249,15 @@ public SafeFuture> getBlobSidecarKeys( final SlotAndBlockRoot slotAndBlockRoot) { return asyncRunner.runAsync(() -> queryDelegate.getBlobSidecarKeys(slotAndBlockRoot)); } + + @Override + public SafeFuture> getArchivedBlobSidecars( + final SlotAndBlockRoot slotAndBlockRoot) { + return asyncRunner.runAsync(() -> queryDelegate.getArchivedBlobSidecars(slotAndBlockRoot)); + } + + @Override + public SafeFuture> getArchivedBlobSidecars(final UInt64 slot) { + return asyncRunner.runAsync(() -> queryDelegate.getArchivedBlobSidecars(slot)); + } } diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/Database.java b/storage/src/main/java/tech/pegasys/teku/storage/server/Database.java index 4b1ddfd3f89..182f98ff894 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/Database.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/Database.java @@ -40,7 +40,7 @@ import tech.pegasys.teku.storage.api.UpdateResult; import tech.pegasys.teku.storage.api.WeakSubjectivityState; import tech.pegasys.teku.storage.api.WeakSubjectivityUpdate; -import tech.pegasys.teku.storage.archive.DataArchiveWriter; +import tech.pegasys.teku.storage.archive.BlobSidecarsArchiver; public interface Database extends AutoCloseable { @@ -74,16 +74,14 @@ void storeFinalizedBlocks( * * @param lastSlotToPrune inclusive, not reached if limit happens first * @param pruneLimit maximum number of slots to prune. - * @param archiveWriter write BlobSidecars to archive when pruning. + * @param blobSidecarsArchiver write BlobSidecars to archive when pruning. * @return true if number of pruned blobs reached the pruneLimit, false otherwise */ boolean pruneOldestBlobSidecars( - UInt64 lastSlotToPrune, - int pruneLimit, - final DataArchiveWriter> archiveWriter); + UInt64 lastSlotToPrune, int pruneLimit, BlobSidecarsArchiver blobSidecarsArchiver); boolean pruneOldestNonCanonicalBlobSidecars( - UInt64 lastSlotToPrune, int pruneLimit, DataArchiveWriter> archiveWriter); + UInt64 lastSlotToPrune, int pruneLimit, BlobSidecarsArchiver blobSidecarsArchiver); @MustBeClosed Stream streamBlobSidecarKeys(UInt64 startSlot, UInt64 endSlot); diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/KvStoreDatabase.java b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/KvStoreDatabase.java index 5e1107e433b..805a5f523dd 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/KvStoreDatabase.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/KvStoreDatabase.java @@ -71,7 +71,7 @@ import tech.pegasys.teku.storage.api.UpdateResult; import tech.pegasys.teku.storage.api.WeakSubjectivityState; import tech.pegasys.teku.storage.api.WeakSubjectivityUpdate; -import tech.pegasys.teku.storage.archive.DataArchiveWriter; +import tech.pegasys.teku.storage.archive.BlobSidecarsArchiver; import tech.pegasys.teku.storage.server.Database; import tech.pegasys.teku.storage.server.StateStorageMode; import tech.pegasys.teku.storage.server.kvstore.dataaccess.CombinedKvStoreDao; @@ -943,7 +943,7 @@ public Optional getNonCanonicalBlobSidecar(final SlotAndBlockRootAn public boolean pruneOldestBlobSidecars( final UInt64 lastSlotToPrune, final int pruneLimit, - final DataArchiveWriter> archiveWriter) { + final BlobSidecarsArchiver blobSidecarsArchiver) { final Optional earliestBlobSidecarSlot = getEarliestBlobSidecarSlot(); if (earliestBlobSidecarSlot.isPresent() && earliestBlobSidecarSlot.get().isGreaterThan(lastSlotToPrune)) { @@ -951,7 +951,7 @@ public boolean pruneOldestBlobSidecars( } try (final Stream prunableBlobKeys = streamBlobSidecarKeys(earliestBlobSidecarSlot.orElse(UInt64.ZERO), lastSlotToPrune)) { - return pruneBlobSidecars(pruneLimit, prunableBlobKeys, archiveWriter, false); + return pruneBlobSidecars(pruneLimit, prunableBlobKeys, blobSidecarsArchiver, false); } } @@ -959,7 +959,7 @@ public boolean pruneOldestBlobSidecars( public boolean pruneOldestNonCanonicalBlobSidecars( final UInt64 lastSlotToPrune, final int pruneLimit, - final DataArchiveWriter> archiveWriter) { + final BlobSidecarsArchiver blobSidecarsArchiver) { final Optional earliestBlobSidecarSlot = getEarliestBlobSidecarSlot(); if (earliestBlobSidecarSlot.isPresent() && earliestBlobSidecarSlot.get().isGreaterThan(lastSlotToPrune)) { @@ -968,14 +968,15 @@ public boolean pruneOldestNonCanonicalBlobSidecars( try (final Stream prunableNoncanonicalBlobKeys = streamNonCanonicalBlobSidecarKeys( earliestBlobSidecarSlot.orElse(UInt64.ZERO), lastSlotToPrune)) { - return pruneBlobSidecars(pruneLimit, prunableNoncanonicalBlobKeys, archiveWriter, true); + return pruneBlobSidecars( + pruneLimit, prunableNoncanonicalBlobKeys, blobSidecarsArchiver, true); } } private boolean pruneBlobSidecars( final int pruneLimit, final Stream prunableBlobKeys, - final DataArchiveWriter> archiveWriter, + final BlobSidecarsArchiver blobSidecarsArchiver, final boolean nonCanonicalBlobSidecars) { int pruned = 0; @@ -1008,11 +1009,9 @@ private boolean pruneBlobSidecars( LOG.warn("Failed to retrieve BlobSidecars for keys: {}", keys); } - // Attempt to archive the BlobSidecars. - final boolean blobSidecarArchived = archiveWriter.archive(blobSidecars); - if (!blobSidecarArchived) { - LOG.error("Failed to archive and prune BlobSidecars. Stopping pruning"); - break; + if (!keys.isEmpty()) { + final SlotAndBlockRoot slotAndBlockRoot = keys.getFirst().getSlotAndBlockRoot(); + blobSidecarsArchiver.archive(slotAndBlockRoot, blobSidecars); } // Remove the BlobSidecars from the database. diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/noop/NoOpDatabase.java b/storage/src/main/java/tech/pegasys/teku/storage/server/noop/NoOpDatabase.java index 185cc19e865..268eeebb543 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/noop/NoOpDatabase.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/noop/NoOpDatabase.java @@ -43,7 +43,7 @@ import tech.pegasys.teku.storage.api.UpdateResult; import tech.pegasys.teku.storage.api.WeakSubjectivityState; import tech.pegasys.teku.storage.api.WeakSubjectivityUpdate; -import tech.pegasys.teku.storage.archive.DataArchiveWriter; +import tech.pegasys.teku.storage.archive.BlobSidecarsArchiver; import tech.pegasys.teku.storage.server.Database; public class NoOpDatabase implements Database { @@ -305,6 +305,22 @@ public Optional getNonCanonicalBlobSidecar(final SlotAndBlockRootAn return Optional.empty(); } + @Override + public boolean pruneOldestBlobSidecars( + final UInt64 lastSlotToPrune, + final int pruneLimit, + final BlobSidecarsArchiver blobSidecarsArchiver) { + return false; + } + + @Override + public boolean pruneOldestNonCanonicalBlobSidecars( + final UInt64 lastSlotToPrune, + final int pruneLimit, + final BlobSidecarsArchiver blobSidecarsArchiver) { + return false; + } + @Override public Stream streamBlobSidecarKeys( final UInt64 startSlot, final UInt64 endSlot) { @@ -333,22 +349,6 @@ public Optional getEarliestBlobSidecarSlot() { return Optional.empty(); } - @Override - public boolean pruneOldestBlobSidecars( - final UInt64 lastSlotToPrune, - final int pruneLimit, - final DataArchiveWriter> archiveWriter) { - return false; - } - - @Override - public boolean pruneOldestNonCanonicalBlobSidecars( - final UInt64 lastSlotToPrune, - final int pruneLimit, - final DataArchiveWriter> archiveWriter) { - return false; - } - @Override public void close() {} } diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/pruner/BlobSidecarPruner.java b/storage/src/main/java/tech/pegasys/teku/storage/server/pruner/BlobSidecarPruner.java index fb37cb1c9e6..c35f2b8ab09 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/pruner/BlobSidecarPruner.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/pruner/BlobSidecarPruner.java @@ -13,9 +13,7 @@ package tech.pegasys.teku.storage.server.pruner; -import java.io.IOException; import java.time.Duration; -import java.util.List; import java.util.Optional; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicLong; @@ -33,9 +31,7 @@ import tech.pegasys.teku.service.serviceutils.Service; import tech.pegasys.teku.spec.Spec; import tech.pegasys.teku.spec.config.SpecConfigDeneb; -import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar; -import tech.pegasys.teku.storage.archive.DataArchive; -import tech.pegasys.teku.storage.archive.DataArchiveWriter; +import tech.pegasys.teku.storage.archive.BlobSidecarsArchiver; import tech.pegasys.teku.storage.server.Database; import tech.pegasys.teku.storage.server.ShuttingDownException; @@ -59,12 +55,12 @@ public class BlobSidecarPruner extends Service { private final AtomicLong blobColumnSize = new AtomicLong(0); private final AtomicLong earliestBlobSidecarSlot = new AtomicLong(-1); private final boolean storeNonCanonicalBlobSidecars; - private final DataArchive dataArchive; + private final BlobSidecarsArchiver blobSidecarsArchiver; public BlobSidecarPruner( final Spec spec, final Database database, - final DataArchive dataArchive, + final BlobSidecarsArchiver blobSidecarsArchiver, final MetricsSystem metricsSystem, final AsyncRunner asyncRunner, final TimeProvider timeProvider, @@ -77,7 +73,7 @@ public BlobSidecarPruner( final boolean storeNonCanonicalBlobSidecars) { this.spec = spec; this.database = database; - this.dataArchive = dataArchive; + this.blobSidecarsArchiver = blobSidecarsArchiver; this.asyncRunner = asyncRunner; this.pruneInterval = pruneInterval; this.pruneLimit = pruneLimit; @@ -152,10 +148,10 @@ private void pruneBlobsPriorToAvailabilityWindow() { return; } LOG.debug("Pruning blobs up to slot {}, limit {}", latestPrunableSlot, pruneLimit); - try (DataArchiveWriter> archiveWriter = dataArchive.getBlobSidecarWriter()) { + try { final long blobsPruningStart = System.currentTimeMillis(); final boolean blobsPruningLimitReached = - database.pruneOldestBlobSidecars(latestPrunableSlot, pruneLimit, archiveWriter); + database.pruneOldestBlobSidecars(latestPrunableSlot, pruneLimit, blobSidecarsArchiver); logPruningResult( "Blobs pruning finished in {} ms. Limit reached: {}", blobsPruningStart, @@ -165,14 +161,12 @@ private void pruneBlobsPriorToAvailabilityWindow() { final long nonCanonicalBlobsPruningStart = System.currentTimeMillis(); final boolean nonCanonicalBlobsLimitReached = database.pruneOldestNonCanonicalBlobSidecars( - latestPrunableSlot, pruneLimit, archiveWriter); + latestPrunableSlot, pruneLimit, blobSidecarsArchiver); logPruningResult( "Non canonical Blobs pruning finished in {} ms. Limit reached: {}", nonCanonicalBlobsPruningStart, nonCanonicalBlobsLimitReached); } - } catch (IOException ex) { - LOG.error("Failed to get the BlobSidecar archive writer", ex); } catch (ShuttingDownException | RejectedExecutionException ex) { LOG.debug("Shutting down", ex); } diff --git a/storage/src/test/java/tech/pegasys/teku/storage/archive/filesystem/FileSystemBlobSidecarsArchiverTest.java b/storage/src/test/java/tech/pegasys/teku/storage/archive/filesystem/FileSystemBlobSidecarsArchiverTest.java new file mode 100644 index 00000000000..0c3b76d1f6c --- /dev/null +++ b/storage/src/test/java/tech/pegasys/teku/storage/archive/filesystem/FileSystemBlobSidecarsArchiverTest.java @@ -0,0 +1,171 @@ +/* + * Copyright Consensys Software Inc., 2025 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.storage.archive.filesystem; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; +import java.util.Optional; +import java.util.stream.Stream; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; +import tech.pegasys.teku.spec.Spec; +import tech.pegasys.teku.spec.TestSpecFactory; +import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.Blob; +import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar; +import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; +import tech.pegasys.teku.spec.datastructures.blocks.SlotAndBlockRoot; +import tech.pegasys.teku.spec.datastructures.type.SszKZGProof; +import tech.pegasys.teku.spec.datastructures.util.SlotAndBlockRootAndBlobIndex; +import tech.pegasys.teku.spec.logic.common.helpers.Predicates; +import tech.pegasys.teku.spec.logic.versions.deneb.helpers.MiscHelpersDeneb; +import tech.pegasys.teku.spec.schemas.SchemaDefinitionsDeneb; +import tech.pegasys.teku.spec.util.DataStructureUtil; + +public class FileSystemBlobSidecarsArchiverTest { + + private static final Spec SPEC = TestSpecFactory.createMinimalDeneb(); + private final Predicates predicates = new Predicates(SPEC.getGenesisSpecConfig()); + private final SchemaDefinitionsDeneb schemaDefinitionsDeneb = + SchemaDefinitionsDeneb.required(SPEC.getGenesisSchemaDefinitions()); + private final MiscHelpersDeneb miscHelpersDeneb = + new MiscHelpersDeneb( + SPEC.getGenesisSpecConfig().toVersionDeneb().orElseThrow(), + predicates, + schemaDefinitionsDeneb); + private final DataStructureUtil dataStructureUtil = new DataStructureUtil(SPEC); + + static Path testTempDir; + static FileSystemBlobSidecarsArchiver blobSidecarsArchiver; + + @BeforeAll + static void beforeEach() throws IOException { + testTempDir = Files.createTempDirectory("blobs"); + blobSidecarsArchiver = new FileSystemBlobSidecarsArchiver(SPEC, testTempDir); + } + + @AfterEach + public void tearDown() throws IOException { + // Delete the temporary directory after each test + if (Files.exists(testTempDir)) { + try (Stream walk = Files.walk(testTempDir)) { + walk.map(Path::toFile) + .forEach( + file -> { + if (!file.delete()) { + file.deleteOnExit(); + } + }); + } + } + } + + @Test + void testResolveArchivePath() { + final SlotAndBlockRootAndBlobIndex slotAndBlockRootAndBlobIndex = + new SlotAndBlockRootAndBlobIndex( + UInt64.ONE, dataStructureUtil.randomBytes32(), UInt64.ZERO); + final Path path = + blobSidecarsArchiver.resolveArchivePath( + slotAndBlockRootAndBlobIndex.getSlotAndBlockRoot().getBlockRoot()); + + // Check if the file path is correct. Doesn't check the intermediate directories. + assertTrue(path.toString().startsWith(testTempDir.toString())); + assertTrue( + path.toString() + .endsWith(slotAndBlockRootAndBlobIndex.getBlockRoot().toUnprefixedHexString())); + } + + @ParameterizedTest + @MethodSource("testResolveIndexPathArguments") + void testResolveIndexPath(final UInt64 slot, final String expectedIndexFileName) { + final Path actualIndexFile = blobSidecarsArchiver.resolveIndexFile(slot); + assertThat(actualIndexFile).hasFileName(expectedIndexFileName); + } + + @Test + void testArchiveWithEmptyList() { + final SlotAndBlockRoot slotAndBlockRoot = dataStructureUtil.randomSlotAndBlockRoot(); + // archiving + blobSidecarsArchiver.archive(slotAndBlockRoot, List.of()); + + // retrieving + final Optional> blobSidecars = + blobSidecarsArchiver.retrieve(slotAndBlockRoot); + // empty list + assertThat(blobSidecars).hasValue(List.of()); + } + + @Test + void testArchiveAndRetrieveBlobSidecars() { + final UInt64 slot = UInt64.valueOf(42); + final SlotAndBlockRoot slotAndBlockRoot = + new SlotAndBlockRoot(slot, dataStructureUtil.randomBytes32()); + final List blobSidecars = + List.of(createBlobSidecar(slot), createBlobSidecar(slot)); + + // archiving + blobSidecarsArchiver.archive(slotAndBlockRoot, blobSidecars); + + // test index file exists + assertThat(testTempDir.resolve("0-99999_index.dat")).exists(); + + // retrieving by slot and block root + final Optional> retrievedBlobSidecarsByRoot = + blobSidecarsArchiver.retrieve(slotAndBlockRoot); + + assertThat(retrievedBlobSidecarsByRoot).hasValue(blobSidecars); + + // retrieving by slot (using index file) + final Optional> retrievedBlobSidecarsBySlot = + blobSidecarsArchiver.retrieve(slotAndBlockRoot.getSlot()); + + assertThat(retrievedBlobSidecarsBySlot).hasValue(blobSidecars); + } + + private BlobSidecar createBlobSidecar(final UInt64 slot) { + final SignedBeaconBlock signedBeaconBlock = + dataStructureUtil.randomSignedBeaconBlockWithCommitments(slot, 1); + final Blob blob = dataStructureUtil.randomBlob(); + final SszKZGProof proof = dataStructureUtil.randomSszKZGProof(); + return miscHelpersDeneb.constructBlobSidecar(signedBeaconBlock, UInt64.ZERO, blob, proof); + } + + private static Stream testResolveIndexPathArguments() { + return Stream.of( + Arguments.of(UInt64.valueOf(0L), "0-99999_index.dat"), + Arguments.of(UInt64.valueOf(42L), "0-99999_index.dat"), + Arguments.of(UInt64.valueOf(99999L), "0-99999_index.dat"), + Arguments.of(UInt64.valueOf(100000L), "100000-199999_index.dat"), + Arguments.of(UInt64.valueOf(100001L), "100000-199999_index.dat"), + Arguments.of(UInt64.valueOf(199999L), "100000-199999_index.dat"), + Arguments.of(UInt64.valueOf(200000L), "200000-299999_index.dat"), + Arguments.of(UInt64.valueOf(999999999L), "999900000-999999999_index.dat"), + Arguments.of(UInt64.valueOf(1000000000L), "1000000000-1000099999_index.dat"), + Arguments.of(UInt64.valueOf(1L), "0-99999_index.dat"), + Arguments.of(UInt64.valueOf(99998L), "0-99999_index.dat"), + Arguments.of(UInt64.valueOf(99999L), "0-99999_index.dat"), + Arguments.of(UInt64.valueOf(100000L), "100000-199999_index.dat"), + Arguments.of(UInt64.valueOf(123456789L), "123400000-123499999_index.dat")); + } +} diff --git a/storage/src/test/java/tech/pegasys/teku/storage/archive/fsarchive/BlobSidecarJsonWriterTest.java b/storage/src/test/java/tech/pegasys/teku/storage/archive/fsarchive/BlobSidecarJsonWriterTest.java deleted file mode 100644 index 974e47d87c9..00000000000 --- a/storage/src/test/java/tech/pegasys/teku/storage/archive/fsarchive/BlobSidecarJsonWriterTest.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Copyright Consensys Software Inc., 2025 - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package tech.pegasys.teku.storage.archive.fsarchive; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.List; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import tech.pegasys.teku.spec.Spec; -import tech.pegasys.teku.spec.TestSpecFactory; -import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar; -import tech.pegasys.teku.spec.util.DataStructureUtil; - -public class BlobSidecarJsonWriterTest { - private static final Spec SPEC = TestSpecFactory.createMinimalDeneb(); - - BlobSidecarJsonWriter blobSidecarJsonWriter; - private DataStructureUtil dataStructureUtil; - - @BeforeEach - public void test() { - this.blobSidecarJsonWriter = new BlobSidecarJsonWriter(); - this.dataStructureUtil = new DataStructureUtil(SPEC); - } - - @Test - void testWriteSlotBlobSidecarsWithEmptyList() throws IOException { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - List blobSidecars = new ArrayList<>(); - blobSidecarJsonWriter.writeSlotBlobSidecars(out, blobSidecars); - String json = out.toString(StandardCharsets.UTF_8); - assertEquals("[]", json); - } - - @Test - void testWriteSlotBlobSidecarsWithSingleElement() throws IOException { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - List blobSidecars = new ArrayList<>(); - final BlobSidecar blobSidecar = - dataStructureUtil.randomBlobSidecarForBlock( - dataStructureUtil.randomSignedBeaconBlock(1), 0); - blobSidecars.add(blobSidecar); - blobSidecarJsonWriter.writeSlotBlobSidecars(out, blobSidecars); - String json = out.toString(StandardCharsets.UTF_8); - assertTrue(json.contains("index")); - assertTrue(json.contains("blob")); - assertTrue(json.contains("kzg_commitment")); - assertTrue(json.contains("kzg_proof")); - assertTrue(json.contains("signed_block_header")); - assertTrue(json.contains("parent_root")); - assertTrue(json.contains("state_root")); - assertTrue(json.contains("body_root")); - assertTrue(json.contains("signature")); - } - - @Test - void testWriteSlotBlobSidecarsNulls() { - assertThrows( - NullPointerException.class, () -> blobSidecarJsonWriter.writeSlotBlobSidecars(null, null)); - } - - @Test - void testWriteSlotBlobSidecarsNullOut() { - assertThrows( - NullPointerException.class, - () -> { - List blobSidecars = new ArrayList<>(); - blobSidecarJsonWriter.writeSlotBlobSidecars(null, blobSidecars); - }); - } - - @Test - void testWriteSlotBlobSidecarsNullList() { - assertThrows( - NullPointerException.class, - () -> { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - blobSidecarJsonWriter.writeSlotBlobSidecars(out, null); - }); - } -} diff --git a/storage/src/test/java/tech/pegasys/teku/storage/archive/fsarchive/FileSystemArchiveTest.java b/storage/src/test/java/tech/pegasys/teku/storage/archive/fsarchive/FileSystemArchiveTest.java deleted file mode 100644 index 59f81ab9717..00000000000 --- a/storage/src/test/java/tech/pegasys/teku/storage/archive/fsarchive/FileSystemArchiveTest.java +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Copyright Consensys Software Inc., 2025 - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package tech.pegasys.teku.storage.archive.fsarchive; - -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.ArrayList; -import java.util.List; -import java.util.stream.Stream; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; -import tech.pegasys.teku.infrastructure.unsigned.UInt64; -import tech.pegasys.teku.spec.Spec; -import tech.pegasys.teku.spec.TestSpecFactory; -import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.Blob; -import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar; -import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; -import tech.pegasys.teku.spec.datastructures.type.SszKZGProof; -import tech.pegasys.teku.spec.datastructures.util.SlotAndBlockRootAndBlobIndex; -import tech.pegasys.teku.spec.logic.common.helpers.Predicates; -import tech.pegasys.teku.spec.logic.versions.deneb.helpers.MiscHelpersDeneb; -import tech.pegasys.teku.spec.schemas.SchemaDefinitionsDeneb; -import tech.pegasys.teku.spec.util.DataStructureUtil; -import tech.pegasys.teku.storage.archive.DataArchiveWriter; - -public class FileSystemArchiveTest { - private static final Spec SPEC = TestSpecFactory.createMinimalDeneb(); - private final Predicates predicates = new Predicates(SPEC.getGenesisSpecConfig()); - private final SchemaDefinitionsDeneb schemaDefinitionsDeneb = - SchemaDefinitionsDeneb.required(SPEC.getGenesisSchemaDefinitions()); - private final MiscHelpersDeneb miscHelpersDeneb = - new MiscHelpersDeneb( - SPEC.getGenesisSpecConfig().toVersionDeneb().orElseThrow(), - predicates, - schemaDefinitionsDeneb); - private final DataStructureUtil dataStructureUtil = new DataStructureUtil(SPEC); - - static Path testTempDir; - static FileSystemArchive dataArchive; - - @BeforeAll - static void beforeEach() throws IOException { - testTempDir = Files.createTempDirectory("blobs"); - dataArchive = new FileSystemArchive(testTempDir); - } - - @AfterEach - public void tearDown() throws IOException { - // Delete the temporary directory after each test - if (Files.exists(testTempDir)) { - try (Stream walk = Files.walk(testTempDir)) { - walk.map(Path::toFile) - .forEach( - file -> { - if (!file.delete()) { - file.deleteOnExit(); - } - }); - } - } - } - - BlobSidecar createBlobSidecar() { - final SignedBeaconBlock signedBeaconBlock = - dataStructureUtil.randomSignedBeaconBlockWithCommitments(1); - final Blob blob = dataStructureUtil.randomBlob(); - final SszKZGProof proof = dataStructureUtil.randomSszKZGProof(); - - return miscHelpersDeneb.constructBlobSidecar(signedBeaconBlock, UInt64.ZERO, blob, proof); - } - - @Test - void testResolve() { - SlotAndBlockRootAndBlobIndex slotAndBlockRootAndBlobIndex = - new SlotAndBlockRootAndBlobIndex( - UInt64.ONE, dataStructureUtil.randomBytes32(), UInt64.ZERO); - File file = dataArchive.resolve(slotAndBlockRootAndBlobIndex.getSlotAndBlockRoot()); - - // Check if the file path is correct. Doesn't check the intermediate directories. - assertTrue(file.toString().startsWith(testTempDir.toString())); - assertTrue( - file.toString() - .endsWith(slotAndBlockRootAndBlobIndex.getBlockRoot().toUnprefixedHexString())); - } - - @Test - void testArchiveWithEmptyList() throws IOException { - DataArchiveWriter> blobWriter = dataArchive.getBlobSidecarWriter(); - ArrayList list = new ArrayList<>(); - assertTrue(blobWriter.archive(list)); - blobWriter.close(); - } - - @Test - void testArchiveWithNullList() throws IOException { - DataArchiveWriter> blobWriter = dataArchive.getBlobSidecarWriter(); - assertTrue(blobWriter.archive(null)); - blobWriter.close(); - } - - @Test - void testWriteBlobSidecar() throws IOException { - DataArchiveWriter> blobWriter = dataArchive.getBlobSidecarWriter(); - ArrayList list = new ArrayList<>(); - BlobSidecar blobSidecar = createBlobSidecar(); - list.add(blobSidecar); - assertTrue(blobWriter.archive(list)); - blobWriter.close(); - - // Check if the file was written - try (FileInputStream fis = - new FileInputStream(testTempDir.resolve(FileSystemArchive.INDEX_FILE).toFile())) { - String content = new String(fis.readAllBytes(), StandardCharsets.UTF_8); - String expected = - blobSidecar.getSlot().toString() - + " " - + blobSidecar.getSlotAndBlockRoot().getBlockRoot().toUnprefixedHexString(); - - // Windows new lines are different, so don't include new lines in the comparison. - assertTrue(content.contains(expected)); - } - } - - @Test - void testFileAlreadyExists() throws IOException { - DataArchiveWriter> blobWriter = dataArchive.getBlobSidecarWriter(); - ArrayList list = new ArrayList<>(); - list.add(createBlobSidecar()); - assertTrue(blobWriter.archive(list)); - // Try to write the same file again - assertFalse(blobWriter.archive(list)); - blobWriter.close(); - } -} diff --git a/storage/src/test/java/tech/pegasys/teku/storage/server/pruner/BlobSidecarPrunerTest.java b/storage/src/test/java/tech/pegasys/teku/storage/server/pruner/BlobSidecarPrunerTest.java index 378e7306035..ed6065df9c5 100644 --- a/storage/src/test/java/tech/pegasys/teku/storage/server/pruner/BlobSidecarPrunerTest.java +++ b/storage/src/test/java/tech/pegasys/teku/storage/server/pruner/BlobSidecarPrunerTest.java @@ -23,7 +23,6 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import java.io.IOException; import java.time.Duration; import java.util.Optional; import org.junit.jupiter.api.BeforeEach; @@ -38,8 +37,7 @@ import tech.pegasys.teku.spec.TestSpecFactory; import tech.pegasys.teku.spec.config.SpecConfig; import tech.pegasys.teku.spec.config.SpecConfigDeneb; -import tech.pegasys.teku.storage.archive.DataArchive; -import tech.pegasys.teku.storage.archive.nooparchive.NoopDataArchive; +import tech.pegasys.teku.storage.archive.BlobSidecarsArchiver; import tech.pegasys.teku.storage.server.Database; public class BlobSidecarPrunerTest { @@ -57,13 +55,13 @@ public class BlobSidecarPrunerTest { private final StubAsyncRunner asyncRunner = new StubAsyncRunner(timeProvider); private final Database database = mock(Database.class); private final StubMetricsSystem stubMetricsSystem = new StubMetricsSystem(); - private final DataArchive dataArchive = new NoopDataArchive(); + private final BlobSidecarsArchiver blobSidecarsArchiver = mock(BlobSidecarsArchiver.class); private final BlobSidecarPruner blobsPruner = new BlobSidecarPruner( spec, database, - dataArchive, + blobSidecarsArchiver, stubMetricsSystem, asyncRunner, timeProvider, @@ -118,7 +116,7 @@ void shouldNotPruneWhenLatestPrunableIncludeGenesis() { } @Test - void shouldPruneWhenLatestPrunableSlotIsGreaterThanOldestDAEpoch() throws IOException { + void shouldPruneWhenLatestPrunableSlotIsGreaterThanOldestDAEpoch() { final SpecConfig config = spec.forMilestone(SpecMilestone.DENEB).getConfig(); final SpecConfigDeneb specConfigDeneb = SpecConfigDeneb.required(config); // set current slot to MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS + 1 epoch + half epoch @@ -133,18 +131,14 @@ void shouldPruneWhenLatestPrunableSlotIsGreaterThanOldestDAEpoch() throws IOExce asyncRunner.executeDueActions(); verify(database) .pruneOldestBlobSidecars( - UInt64.valueOf((slotsPerEpoch / 2) - 1), - PRUNE_LIMIT, - dataArchive.getBlobSidecarWriter()); + UInt64.valueOf((slotsPerEpoch / 2) - 1), PRUNE_LIMIT, blobSidecarsArchiver); verify(database) .pruneOldestNonCanonicalBlobSidecars( - UInt64.valueOf((slotsPerEpoch / 2) - 1), - PRUNE_LIMIT, - dataArchive.getBlobSidecarWriter()); + UInt64.valueOf((slotsPerEpoch / 2) - 1), PRUNE_LIMIT, blobSidecarsArchiver); } @Test - void shouldUseEpochsStoreBlobs() throws IOException { + void shouldUseEpochsStoreBlobs() { final SpecConfig config = spec.forMilestone(SpecMilestone.DENEB).getConfig(); final SpecConfigDeneb specConfigDeneb = SpecConfigDeneb.required(config); final int defaultValue = specConfigDeneb.getMinEpochsForBlobSidecarsRequests(); @@ -164,7 +158,7 @@ void shouldUseEpochsStoreBlobs() throws IOException { new BlobSidecarPruner( specOverride, databaseOverride, - dataArchive, + blobSidecarsArchiver, stubMetricsSystem, asyncRunner, timeProvider, @@ -200,14 +194,10 @@ void shouldUseEpochsStoreBlobs() throws IOException { asyncRunner.executeDueActions(); verify(databaseOverride) .pruneOldestBlobSidecars( - UInt64.valueOf((slotsPerEpoch / 2) - 1), - PRUNE_LIMIT, - dataArchive.getBlobSidecarWriter()); + UInt64.valueOf((slotsPerEpoch / 2) - 1), PRUNE_LIMIT, blobSidecarsArchiver); verify(databaseOverride) .pruneOldestNonCanonicalBlobSidecars( - UInt64.valueOf((slotsPerEpoch / 2) - 1), - PRUNE_LIMIT, - dataArchive.getBlobSidecarWriter()); + UInt64.valueOf((slotsPerEpoch / 2) - 1), PRUNE_LIMIT, blobSidecarsArchiver); } @Test @@ -228,7 +218,7 @@ void shouldNotPruneWhenEpochsStoreBlobsIsMax() { new BlobSidecarPruner( specOverride, databaseOverride, - dataArchive, + blobSidecarsArchiver, stubMetricsSystem, asyncRunner, timeProvider, diff --git a/storage/src/test/java/tech/pegasys/teku/storage/store/StoreTest.java b/storage/src/test/java/tech/pegasys/teku/storage/store/StoreTest.java index 8c7aef79f74..d5153c79b59 100644 --- a/storage/src/test/java/tech/pegasys/teku/storage/store/StoreTest.java +++ b/storage/src/test/java/tech/pegasys/teku/storage/store/StoreTest.java @@ -51,7 +51,7 @@ import tech.pegasys.teku.spec.generator.ChainBuilder; import tech.pegasys.teku.storage.api.StubStorageUpdateChannel; import tech.pegasys.teku.storage.api.StubStorageUpdateChannelWithDelays; -import tech.pegasys.teku.storage.archive.nooparchive.DataArchiveNoopWriter; +import tech.pegasys.teku.storage.archive.BlobSidecarsArchiver; import tech.pegasys.teku.storage.storageSystem.InMemoryStorageSystemBuilder; import tech.pegasys.teku.storage.storageSystem.StorageSystem; import tech.pegasys.teku.storage.store.UpdatableStore.StoreTransaction; @@ -396,7 +396,7 @@ public void retrieveEarliestBlobSidecarSlot_shouldReturnUpdatedValue() { storageSystem .database() - .pruneOldestBlobSidecars(UInt64.valueOf(5), 3, new DataArchiveNoopWriter<>()); + .pruneOldestBlobSidecars(UInt64.valueOf(5), 3, BlobSidecarsArchiver.NOOP); assertThat(store.retrieveEarliestBlobSidecarSlot()) .isCompletedWithValueMatching( diff --git a/storage/src/testFixtures/java/tech/pegasys/teku/storage/api/StubStorageQueryChannel.java b/storage/src/testFixtures/java/tech/pegasys/teku/storage/api/StubStorageQueryChannel.java index de8d78bb2d9..ca24a9dbdd1 100644 --- a/storage/src/testFixtures/java/tech/pegasys/teku/storage/api/StubStorageQueryChannel.java +++ b/storage/src/testFixtures/java/tech/pegasys/teku/storage/api/StubStorageQueryChannel.java @@ -176,6 +176,17 @@ public SafeFuture> getBlobSidecarKeys( return SafeFuture.completedFuture(List.of()); } + @Override + public SafeFuture> getArchivedBlobSidecars( + final SlotAndBlockRoot slotAndBlockRoot) { + return SafeFuture.completedFuture(List.of()); + } + + @Override + public SafeFuture> getArchivedBlobSidecars(final UInt64 slot) { + return SafeFuture.completedFuture(List.of()); + } + @Override public SafeFuture> getBlobSidecarsBySlotAndBlockRoot( final SlotAndBlockRoot slotAndBlockRoot) { diff --git a/storage/src/testFixtures/java/tech/pegasys/teku/storage/storageSystem/StorageSystem.java b/storage/src/testFixtures/java/tech/pegasys/teku/storage/storageSystem/StorageSystem.java index a986d5cc950..a6c30d7683f 100644 --- a/storage/src/testFixtures/java/tech/pegasys/teku/storage/storageSystem/StorageSystem.java +++ b/storage/src/testFixtures/java/tech/pegasys/teku/storage/storageSystem/StorageSystem.java @@ -29,6 +29,7 @@ import tech.pegasys.teku.storage.api.FinalizedCheckpointChannel; import tech.pegasys.teku.storage.api.StubFinalizedCheckpointChannel; import tech.pegasys.teku.storage.api.TrackingChainHeadChannel; +import tech.pegasys.teku.storage.archive.BlobSidecarsArchiver; import tech.pegasys.teku.storage.client.ChainUpdater; import tech.pegasys.teku.storage.client.CombinedChainDataClient; import tech.pegasys.teku.storage.client.EarliestAvailableBlockSlot; @@ -97,7 +98,8 @@ static StorageSystem create( // Create and start storage server final ChainStorage chainStorageServer = - ChainStorage.create(database, spec, storageMode, stateRebuildTimeoutSeconds); + ChainStorage.create( + database, spec, storageMode, stateRebuildTimeoutSeconds, BlobSidecarsArchiver.NOOP); // Create recent chain data final FinalizedCheckpointChannel finalizedCheckpointChannel =