Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -178,12 +178,32 @@ private SafeFuture<Optional<List<BlobSidecar>>> getBlobSidecarsForBlock(

private SafeFuture<Optional<List<BlobSidecar>>> getBlobSidecars(
final SlotAndBlockRoot slotAndBlockRoot, final List<UInt64> indices) {
return client.getBlobSidecars(slotAndBlockRoot, indices).thenApply(Optional::of);
return client
.getBlobSidecars(slotAndBlockRoot, indices)
.thenCompose(
blobSidecars -> {
if (blobSidecars.isEmpty()) {
// attempt retrieving from archive (when enabled)
return client.getArchivedBlobSidecars(slotAndBlockRoot, indices);
}
return SafeFuture.completedFuture(blobSidecars);
})
.thenApply(Optional::of);
}

private SafeFuture<Optional<List<BlobSidecar>>> getBlobSidecars(
final UInt64 slot, final List<UInt64> indices) {
return client.getBlobSidecars(slot, indices).thenApply(Optional::of);
return client
.getBlobSidecars(slot, indices)
.thenCompose(
blobSidecars -> {
if (blobSidecars.isEmpty()) {
// attempt retrieving from archive (when enabled)
return client.getArchivedBlobSidecars(slot, indices);
}
return SafeFuture.completedFuture(blobSidecars);
})
.thenApply(Optional::of);
}

private Optional<BlobSidecarsAndMetaData> addMetaData(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,31 @@ public void blockRootSelector_shouldGetBlobSidecarsForFinalizedSlot()
assertThat(result.get().getData()).isEqualTo(blobSidecars);
}

@Test
public void blockRootSelector_shouldAttemptToGetFinalizedBlobSidecarsFromArchive()
throws ExecutionException, InterruptedException {
final UInt64 finalizedSlot = UInt64.valueOf(42);
final SignedBlockAndState blockAndState = data.randomSignedBlockAndState(100);
when(client.getChainHead()).thenReturn(Optional.of(ChainHead.create(blockAndState)));

when(client.getFinalizedSlotByBlockRoot(block.getRoot()))
.thenReturn(SafeFuture.completedFuture(Optional.of(finalizedSlot)));
final SlotAndBlockRoot slotAndBlockRoot = new SlotAndBlockRoot(finalizedSlot, block.getRoot());
// not available in DB
when(client.getBlobSidecars(slotAndBlockRoot, indices))
.thenReturn(SafeFuture.completedFuture(List.of()));
// available in archive
when(client.getArchivedBlobSidecars(slotAndBlockRoot, indices))
.thenReturn(SafeFuture.completedFuture(blobSidecars));

final Optional<BlobSidecarsAndMetaData> result =
blobSidecarSelectorFactory
.blockRootSelector(block.getRoot())
.getBlobSidecars(indices)
.get();
assertThat(result.get().getData()).isEqualTo(blobSidecars);
}

@Test
public void blockRootSelector_shouldGetBlobSidecarsByRetrievingBlock()
throws ExecutionException, InterruptedException {
Expand Down Expand Up @@ -153,6 +178,23 @@ public void slotSelector_shouldGetBlobSidecarsFromFinalizedSlot()
assertThat(result.get().getData()).isEqualTo(blobSidecars);
}

@Test
public void slotSelector_shouldAttemptToGetFinalizedBlobSidecarsFromArchive()
throws ExecutionException, InterruptedException {
when(client.isFinalized(block.getSlot())).thenReturn(true);
// not available in DB
when(client.getBlobSidecars(block.getSlot(), indices))
.thenReturn(SafeFuture.completedFuture(List.of()));
// available in archive
when(client.getArchivedBlobSidecars(block.getSlot(), indices))
.thenReturn(SafeFuture.completedFuture(blobSidecars));
when(client.isChainHeadOptimistic()).thenReturn(false);

final Optional<BlobSidecarsAndMetaData> result =
blobSidecarSelectorFactory.slotSelector(block.getSlot()).getBlobSidecars(indices).get();
assertThat(result.get().getData()).isEqualTo(blobSidecars);
}

@Test
public void slotSelector_shouldGetBlobSidecarsByRetrievingBlockWhenSlotNotFinalized()
throws ExecutionException, InterruptedException {
Expand Down Expand Up @@ -228,6 +270,11 @@ public void shouldLookForBlobSidecarsOnlyAfterDeneb(
.thenReturn(SafeFuture.completedFuture(Optional.of(block)));
when(client.getBlobSidecars(any(SlotAndBlockRoot.class), anyList()))
.thenReturn(SafeFuture.completedFuture(List.of()));
when(client.getArchivedBlobSidecars(any(SlotAndBlockRoot.class), anyList()))
.thenReturn(SafeFuture.completedFuture(List.of()));
when(client.getArchivedBlobSidecars(any(UInt64.class), anyList()))
.thenReturn(SafeFuture.completedFuture(List.of()));

blobSidecarSelectorFactory.slotSelector(block.getSlot()).getBlobSidecars(indices).get();
if (ctx.getSpec().isMilestoneSupported(SpecMilestone.DENEB)) {
verify(client).getBlobSidecars(any(SlotAndBlockRoot.class), anyList());
Expand Down Expand Up @@ -273,8 +320,6 @@ public void slotSelector_whenSelectingNonFinalizedBlockMetadataReturnsFinalizedF
when(client.isFinalized(block.getSlot())).thenReturn(false);
when(client.getBlockAtSlotExact(block.getSlot()))
.thenReturn(SafeFuture.completedFuture(Optional.of(block)));
when(client.getBlobSidecars(block.getSlot(), indices))
.thenReturn(SafeFuture.completedFuture(blobSidecars));
when(client.isChainHeadOptimistic()).thenReturn(false);
when(client.getBlobSidecars(block.getSlotAndBlockRoot(), indices))
.thenReturn(SafeFuture.completedFuture(blobSidecars));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ public enum TekuMetricCategory implements MetricCategory {
STORAGE("storage"),
STORAGE_HOT_DB("storage_hot"),
STORAGE_FINALIZED_DB("storage_finalized"),
REMOTE_VALIDATOR("remote_validator"),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this intentional?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it is not used anywhere. Even though not really relevant to this PR.

VALIDATOR("validator"),
VALIDATOR_PERFORMANCE("validator_performance"),
VALIDATOR_DUTY("validator_duty");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,8 @@
import tech.pegasys.teku.storage.api.CombinedStorageChannel;
import tech.pegasys.teku.storage.api.Eth1DepositStorageChannel;
import tech.pegasys.teku.storage.api.VoteUpdateChannel;
import tech.pegasys.teku.storage.archive.DataArchive;
import tech.pegasys.teku.storage.archive.fsarchive.FileSystemArchive;
import tech.pegasys.teku.storage.archive.nooparchive.NoopDataArchive;
import tech.pegasys.teku.storage.archive.BlobSidecarsArchiver;
import tech.pegasys.teku.storage.archive.filesystem.FileSystemBlobSidecarsArchiver;
import tech.pegasys.teku.storage.server.BatchingVoteUpdateChannel;
import tech.pegasys.teku.storage.server.ChainStorage;
import tech.pegasys.teku.storage.server.CombinedStorageChannelSplitter;
Expand Down Expand Up @@ -159,19 +158,21 @@ protected SafeFuture<?> doStart() {
pruningActiveLabelledGauge);
}

final DataArchive dataArchive =
final BlobSidecarsArchiver blobSidecarsArchiver =
config
.getBlobsArchivePath()
.<DataArchive>map(path -> new FileSystemArchive(Path.of(path)))
.orElse(new NoopDataArchive());
.<BlobSidecarsArchiver>map(
path ->
new FileSystemBlobSidecarsArchiver(config.getSpec(), Path.of(path)))
.orElse(BlobSidecarsArchiver.NOOP);

if (config.getSpec().isMilestoneSupported(SpecMilestone.DENEB)) {
blobsPruner =
Optional.of(
new BlobSidecarPruner(
config.getSpec(),
database,
dataArchive,
blobSidecarsArchiver,
serviceConfig.getMetricsSystem(),
storagePrunerAsyncRunner,
serviceConfig.getTimeProvider(),
Expand All @@ -183,13 +184,16 @@ protected SafeFuture<?> doStart() {
pruningActiveLabelledGauge,
config.isStoreNonCanonicalBlocksEnabled()));
}
final EventChannels eventChannels = serviceConfig.getEventChannels();
chainStorage =
ChainStorage.create(
database,
config.getSpec(),
config.getDataStorageMode(),
config.getStateRebuildTimeoutSeconds());
config.getStateRebuildTimeoutSeconds(),
blobSidecarsArchiver);

final EventChannels eventChannels = serviceConfig.getEventChannels();

final DepositStorage depositStorage =
DepositStorage.create(
eventChannels.getPublisher(Eth1EventsChannel.class),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public interface StorageQueryChannel extends ChannelInterface {
SafeFuture<Optional<SignedBlockAndState>> getHotBlockAndStateByBlockRoot(Bytes32 blockRoot);

SafeFuture<Optional<StateAndBlockSummary>> getHotStateAndBlockSummaryByBlockRoot(
final Bytes32 blockRoot);
Bytes32 blockRoot);

/**
* Returns "hot" blocks - the latest finalized block or blocks that descend from the latest
Expand Down Expand Up @@ -109,4 +109,9 @@ SafeFuture<List<SlotAndBlockRootAndBlobIndex>> getBlobSidecarKeys(

SafeFuture<List<SlotAndBlockRootAndBlobIndex>> getBlobSidecarKeys(
SlotAndBlockRoot slotAndBlockRoot);

// Methods for retrieving archived blobs (when enabled)
SafeFuture<List<BlobSidecar>> getArchivedBlobSidecars(SlotAndBlockRoot slotAndBlockRoot);

SafeFuture<List<BlobSidecar>> getArchivedBlobSidecars(UInt64 slot);
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@
import tech.pegasys.teku.storage.api.OnDiskStoreData;
import tech.pegasys.teku.storage.api.StorageUpdate;
import tech.pegasys.teku.storage.api.WeakSubjectivityUpdate;
import tech.pegasys.teku.storage.archive.fsarchive.FileSystemArchive;
import tech.pegasys.teku.storage.archive.filesystem.FileSystemBlobSidecarsArchiver;
import tech.pegasys.teku.storage.client.RecentChainData;
import tech.pegasys.teku.storage.server.Database;
import tech.pegasys.teku.storage.server.DatabaseContext;
Expand Down Expand Up @@ -122,7 +122,7 @@ public class DatabaseTest {
private StateStorageMode storageMode;
private StorageSystem storageSystem;
private Database database;
private FileSystemArchive fileSystemDataArchive;
private FileSystemBlobSidecarsArchiver blobSidecarsArchiver;
private RecentChainData recentChainData;
private UpdatableStore store;
private final List<StorageSystem> storageSystems = new ArrayList<>();
Expand All @@ -139,7 +139,7 @@ private void setupWithSpec(final Spec spec) throws IOException {
this.chainProperties = new ChainProperties(spec);
final Path blobsArchive = Files.createTempDirectory("blobs");
tmpDirectories.add(blobsArchive.toFile());
this.fileSystemDataArchive = new FileSystemArchive(blobsArchive);
this.blobSidecarsArchiver = new FileSystemBlobSidecarsArchiver(spec, blobsArchive);
genesisBlockAndState = chainBuilder.generateGenesis(genesisTime, true);
genesisCheckpoint = getCheckpointForBlock(genesisBlockAndState.getBlock());
genesisAnchor = AnchorPoint.fromGenesisState(spec, genesisBlockAndState.getState());
Expand Down Expand Up @@ -300,9 +300,7 @@ public void verifyBlobsLifecycle(final DatabaseContext context) throws IOExcepti
List.of(blobSidecar5_0)));

// let's prune with limit to 1
assertThat(
database.pruneOldestBlobSidecars(
UInt64.MAX_VALUE, 1, fileSystemDataArchive.getBlobSidecarWriter()))
assertThat(database.pruneOldestBlobSidecars(UInt64.MAX_VALUE, 1, blobSidecarsArchiver))
.isTrue();
assertBlobSidecarKeys(
blobSidecar2_0.getSlot(),
Expand All @@ -324,9 +322,7 @@ public void verifyBlobsLifecycle(final DatabaseContext context) throws IOExcepti
assertThat(getSlotBlobsArchiveFile(blobSidecar2_0)).doesNotExist();

// let's prune up to slot 1 (nothing will be pruned)
assertThat(
database.pruneOldestBlobSidecars(ONE, 10, fileSystemDataArchive.getBlobSidecarWriter()))
.isFalse();
assertThat(database.pruneOldestBlobSidecars(ONE, 10, blobSidecarsArchiver)).isFalse();
assertBlobSidecarKeys(
blobSidecar2_0.getSlot(),
blobSidecar5_0.getSlot(),
Expand All @@ -343,9 +339,7 @@ public void verifyBlobsLifecycle(final DatabaseContext context) throws IOExcepti
assertThat(database.getBlobSidecarColumnCount()).isEqualTo(4L);

// let's prune all from slot 4 excluded
assertThat(
database.pruneOldestBlobSidecars(
UInt64.valueOf(3), 10, fileSystemDataArchive.getBlobSidecarWriter()))
assertThat(database.pruneOldestBlobSidecars(UInt64.valueOf(3), 10, blobSidecarsArchiver))
.isFalse();
assertBlobSidecarKeys(
blobSidecar1_0.getSlot(), blobSidecar5_0.getSlot(), blobSidecarToKey(blobSidecar5_0));
Expand All @@ -359,9 +353,7 @@ public void verifyBlobsLifecycle(final DatabaseContext context) throws IOExcepti
assertThat(getSlotBlobsArchiveFile(blobSidecar5_0)).doesNotExist();

// let's prune all
assertThat(
database.pruneOldestBlobSidecars(
UInt64.valueOf(5), 1, fileSystemDataArchive.getBlobSidecarWriter()))
assertThat(database.pruneOldestBlobSidecars(UInt64.valueOf(5), 1, blobSidecarsArchiver))
.isTrue();
// all empty now
assertBlobSidecarKeys(ZERO, UInt64.valueOf(10));
Expand All @@ -372,8 +364,9 @@ public void verifyBlobsLifecycle(final DatabaseContext context) throws IOExcepti
assertThat(getSlotBlobsArchiveFile(blobSidecar5_0)).exists();
}

private File getSlotBlobsArchiveFile(final BlobSidecar blobSidecar) {
return fileSystemDataArchive.resolve(blobSidecar.getSlotAndBlockRoot());
private Path getSlotBlobsArchiveFile(final BlobSidecar blobSidecar) {
return blobSidecarsArchiver.resolveArchivePath(
blobSidecar.getSlotAndBlockRoot().getBlockRoot());
}

@TestTemplate
Expand Down Expand Up @@ -473,8 +466,7 @@ public void verifyNonCanonicalBlobsLifecycle(final DatabaseContext context) thro

// Pruning with a prune limit set to 1: Only blobSidecar1 will be pruned
assertThat(
database.pruneOldestNonCanonicalBlobSidecars(
UInt64.MAX_VALUE, 1, fileSystemDataArchive.getBlobSidecarWriter()))
database.pruneOldestNonCanonicalBlobSidecars(UInt64.MAX_VALUE, 1, blobSidecarsArchiver))
.isTrue();
assertNonCanonicalBlobSidecarKeys(
blobSidecar2_0.getSlot(),
Expand All @@ -495,9 +487,7 @@ public void verifyNonCanonicalBlobsLifecycle(final DatabaseContext context) thro
assertThat(getSlotBlobsArchiveFile(blobSidecar2_0)).doesNotExist();

// Pruning up to slot 1: No blobs pruned
assertThat(
database.pruneOldestNonCanonicalBlobSidecars(
ONE, 10, fileSystemDataArchive.getBlobSidecarWriter()))
assertThat(database.pruneOldestNonCanonicalBlobSidecars(ONE, 10, blobSidecarsArchiver))
.isFalse();
assertNonCanonicalBlobSidecarKeys(
blobSidecar2_0.getSlot(),
Expand All @@ -520,7 +510,7 @@ public void verifyNonCanonicalBlobsLifecycle(final DatabaseContext context) thro
// Prune blobs up to slot 3
assertThat(
database.pruneOldestNonCanonicalBlobSidecars(
UInt64.valueOf(3), 10, fileSystemDataArchive.getBlobSidecarWriter()))
UInt64.valueOf(3), 10, blobSidecarsArchiver))
.isFalse();
assertNonCanonicalBlobSidecarKeys(
blobSidecar1_0.getSlot(), blobSidecar5_0.getSlot(), blobSidecarToKey(blobSidecar5_0));
Expand All @@ -535,7 +525,7 @@ public void verifyNonCanonicalBlobsLifecycle(final DatabaseContext context) thro
// Pruning all blobs
assertThat(
database.pruneOldestNonCanonicalBlobSidecars(
UInt64.valueOf(5), 1, fileSystemDataArchive.getBlobSidecarWriter()))
UInt64.valueOf(5), 1, blobSidecarsArchiver))
.isTrue();
// No blobs should be left
assertNonCanonicalBlobSidecarKeys(ZERO, UInt64.valueOf(10));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright Consensys Software Inc., 2025
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.storage.archive;

import java.util.List;
import java.util.Optional;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.blocks.SlotAndBlockRoot;

public interface BlobSidecarsArchiver {

BlobSidecarsArchiver NOOP =
new BlobSidecarsArchiver() {
@Override
public void archive(
final SlotAndBlockRoot slotAndBlockRoot, final List<BlobSidecar> blobSidecars) {}

@Override
public Optional<List<BlobSidecar>> retrieve(final SlotAndBlockRoot slotAndBlockRoot) {
return Optional.empty();
}

@Override
public Optional<List<BlobSidecar>> retrieve(final UInt64 slot) {
return Optional.empty();
}
};

void archive(SlotAndBlockRoot slotAndBlockRoot, List<BlobSidecar> blobSidecars);

Optional<List<BlobSidecar>> retrieve(SlotAndBlockRoot slotAndBlockRoot);

Optional<List<BlobSidecar>> retrieve(UInt64 slot);
}
Loading