Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import tech.pegasys.teku.spec.datastructures.blocks.SlotAndBlockRoot;
import tech.pegasys.teku.spec.datastructures.type.SszKZGCommitment;
import tech.pegasys.teku.spec.datastructures.type.SszKZGProof;
import tech.pegasys.teku.spec.datastructures.util.DataColumnSlotAndIdentifier;

public class DataColumnSidecar
extends Container6<
Expand Down Expand Up @@ -131,6 +132,10 @@ public SlotAndBlockRoot getSlotAndBlockRoot() {
return new SlotAndBlockRoot(getSlot(), getBlockRoot());
}

public DataColumnSlotAndIdentifier getDataColumnSlotAndIdentifier() {
return new DataColumnSlotAndIdentifier(getSlot(), getBlockRoot(), getIndex());
}

public String toLogString() {
return LogFormatter.formatDataColumnSidecar(
getSlot(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package tech.pegasys.teku.statetransition.datacolumns.db;

import com.google.common.collect.Iterables;
import java.util.BitSet;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -47,11 +48,8 @@ private SlotCache getOrCreateSlotCache(final UInt64 slot) {
slot,
__ ->
new SlotCache(
delegateDb.getColumnIdentifiers(slot), slotToNumberOfColumns.apply(slot)));
}

private void invalidateSlotCache(final UInt64 slot) {
slotCaches.remove(slot);
delegateDb.getColumnIdentifiers(slot).thenApply(Function.identity()),
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You just need to change param type to:

        final SafeFuture<? extends Iterable<DataColumnSlotAndIdentifier>> dbResponseFuture,

slotToNumberOfColumns.apply(slot)));
}

@Override
Expand All @@ -61,15 +59,42 @@ public SafeFuture<List<DataColumnSlotAndIdentifier>> getColumnIdentifiers(final

@Override
public SafeFuture<Void> addSidecar(final DataColumnSidecar sidecar) {
invalidateSlotCache(sidecar.getSlot());
return delegateDb.addSidecar(sidecar);
return getOrCreateSlotCache(sidecar.getSlot())
.contains(sidecar.getDataColumnSlotAndIdentifier())
.thenCompose(
inDb -> {
if (inDb) {
return SafeFuture.COMPLETE;
} else {
return delegateDb
.addSidecar(sidecar)
.thenPeek(
__ -> {
final UInt64 slot = sidecar.getSlot();
updateSlotCache(slot, sidecar.getDataColumnSlotAndIdentifier());
});
}
});
}

private synchronized void updateSlotCache(
final UInt64 slot, final DataColumnSlotAndIdentifier newIdentifier) {
final SlotCache slotCache = getOrCreateSlotCache(slot);
final Map<Bytes32, BitSet> oldSet = slotCache.compactCacheFuture.getImmediately();
final List<DataColumnSlotAndIdentifier> columnIdentifiers =
SlotCache.toColumnIdentifiers(slot, oldSet);
final SlotCache updatedCache =
new SlotCache(
SafeFuture.completedFuture(Iterables.concat(columnIdentifiers, List.of(newIdentifier))),
slotToNumberOfColumns.apply(slot));
slotCaches.put(slot, updatedCache);
}

private static class SlotCache {
private final SafeFuture<Map<Bytes32, BitSet>> compactCacheFuture;

public SlotCache(
final SafeFuture<List<DataColumnSlotAndIdentifier>> dbResponseFuture,
final SafeFuture<Iterable<DataColumnSlotAndIdentifier>> dbResponseFuture,
final int numberOfColumns) {
this.compactCacheFuture =
dbResponseFuture.thenApply(slotColumns -> toCompactCache(slotColumns, numberOfColumns));
Expand All @@ -80,8 +105,13 @@ public SafeFuture<List<DataColumnSlotAndIdentifier>> generateColumnIdentifiers(
return compactCacheFuture.thenApply(compactCache -> toColumnIdentifiers(slot, compactCache));
}

public SafeFuture<Boolean> contains(final DataColumnSlotAndIdentifier id) {
return compactCacheFuture.thenApply(
cache -> toColumnIdentifiers(id.slot(), cache).contains(id));
}

private static Map<Bytes32, BitSet> toCompactCache(
final List<DataColumnSlotAndIdentifier> slotColumns, final int numberOfColumns) {
final Iterable<DataColumnSlotAndIdentifier> slotColumns, final int numberOfColumns) {
final Map<Bytes32, BitSet> compactCache = new HashMap<>();
slotColumns.forEach(
colId ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ void testPendingRequestIsExecutedWhenLongReadQuickWrite() throws Exception {
delayedDb.setDelay(ofMillis(5));
custody.onNewValidatedDataColumnSidecar(sidecar10_0);

advanceTimeGradually(ofMillis(10));
advanceTimeGradually(ofMillis(15));
assertThat(fRet0).isCompletedWithValue(Optional.ofNullable(sidecar10_0));
assertThat(fHas0).isCompletedWithValue(true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,26 +77,39 @@ void sanityTest() {
}

@Test
void checkCacheIsInvalidated() {
void checkCacheIsUpdated() {
SafeFuture<List<DataColumnSlotAndIdentifier>> res1 =
columnIdCachingDb.getColumnIdentifiers(UInt64.valueOf(777));

stubAsync.advanceTimeGradually(ofMillis(1));

SafeFuture<Void> addCompleteFuture = columnIdCachingDb.addSidecar(createSidecar(777, 77));
final DataColumnSidecar sidecar = createSidecar(777, 77);
SafeFuture<Void> addCompleteFuture1 = columnIdCachingDb.addSidecar(sidecar);
stubAsync.advanceTimeGraduallyUntilAllDone(ofSeconds(1));

assertThat(res1)
.isCompleted(); // no assumptions on result: cache may or may not pick up latest changes
assertThat(addCompleteFuture).isCompleted();
long reads0 = db.getDbReadCounter().get();
assertThat(reads0).isGreaterThan(0);
assertThat(addCompleteFuture1).isCompleted();
long reads1 = db.getDbReadCounter().get();
assertThat(reads1).isEqualTo(1);
long writes1 = db.getDbWriteCounter().get();
assertThat(writes1).isEqualTo(1);

SafeFuture<List<DataColumnSlotAndIdentifier>> res2 =
columnIdCachingDb.getColumnIdentifiers(UInt64.valueOf(777));
stubAsync.advanceTimeGradually(dbDelay);

assertThat(res2).isCompletedWithValueMatching(l -> !l.isEmpty());

// Retry saving the same sidecar, db should not be used
SafeFuture<Void> addCompleteFuture2 = columnIdCachingDb.addSidecar(sidecar);
stubAsync.advanceTimeGraduallyUntilAllDone(ofSeconds(1));

assertThat(addCompleteFuture2).isCompleted();
long reads2 = db.getDbReadCounter().get();
assertThat(reads2).isEqualTo(reads1);
long writes2 = db.getDbWriteCounter().get();
assertThat(writes2).isEqualTo(writes1);
}

@Test
Expand Down