Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import org.apache.tuweni.bytes.Bytes32;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.collections.LimitedMap;
import tech.pegasys.teku.infrastructure.collections.LimitedSet;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.datastructures.blobs.versions.fulu.DataColumnSidecar;
import tech.pegasys.teku.spec.datastructures.util.DataColumnSlotAndIdentifier;
Expand All @@ -31,27 +33,30 @@ class ColumnIdCachingDasDb implements DataColumnSidecarDB {
private final DataColumnSidecarDB delegateDb;
private final Function<UInt64, Integer> slotToNumberOfColumns;

private final Map<UInt64, SlotCache> slotCaches;
private final Map<UInt64, SlotCache> readSlotCaches;
private final Set<DataColumnSlotAndIdentifier> latestAdded;

public ColumnIdCachingDasDb(
final DataColumnSidecarDB delegateDb,
final Function<UInt64, Integer> slotToNumberOfColumns,
final int maxCacheSize) {
final int slotReadCacheSize,
final int sidecarsWriteCacheSize) {
this.delegateDb = delegateDb;
this.slotToNumberOfColumns = slotToNumberOfColumns;
this.slotCaches = LimitedMap.createSynchronizedLRU(maxCacheSize);
this.readSlotCaches = LimitedMap.createSynchronizedLRU(slotReadCacheSize);
this.latestAdded = LimitedSet.createSynchronized(sidecarsWriteCacheSize);
}

private SlotCache getOrCreateSlotCache(final UInt64 slot) {
return slotCaches.computeIfAbsent(
return readSlotCaches.computeIfAbsent(
slot,
__ ->
new SlotCache(
delegateDb.getColumnIdentifiers(slot), slotToNumberOfColumns.apply(slot)));
}

private void invalidateSlotCache(final UInt64 slot) {
slotCaches.remove(slot);
readSlotCaches.remove(slot);
}

@Override
Expand All @@ -61,6 +66,9 @@ public SafeFuture<List<DataColumnSlotAndIdentifier>> getColumnIdentifiers(final

@Override
public SafeFuture<Void> addSidecar(final DataColumnSidecar sidecar) {
if (!latestAdded.add(DataColumnSlotAndIdentifier.fromDataColumn(sidecar))) {
return SafeFuture.COMPLETE;
}
invalidateSlotCache(sidecar.getSlot());
return delegateDb.addSidecar(sidecar);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@
public class DataColumnSidecarDbAccessorBuilder {

// is roughly 600Kb (cache entry for one slot is about 60 bytes)
private static final int DEFAULT_COLUMN_ID_CACHE_MAX_SLOT_COUNT = 10 * 1024;
private static final int DEFAULT_COLUMN_ID_READ_CACHE_MAX_SLOT_COUNT = 10 * 1024;
private static final int DEFAULT_COLUMN_ID_WRITE_CACHE_MAX_COUNT = 3 * 128;

private final DataColumnSidecarDB db;
private Spec spec;
private MinCustodyPeriodSlotCalculator minCustodyPeriodSlotCalculator;
private final AutoPruneDbBuilder autoPruneDbBuilder = new AutoPruneDbBuilder();
private int columnIdCacheMaxSlotCount = DEFAULT_COLUMN_ID_CACHE_MAX_SLOT_COUNT;
private int columnIdReadCacheSlotCount = DEFAULT_COLUMN_ID_READ_CACHE_MAX_SLOT_COUNT;
private int columnIdWriteCacheCount = DEFAULT_COLUMN_ID_WRITE_CACHE_MAX_COUNT;

DataColumnSidecarDbAccessorBuilder(final DataColumnSidecarDB db) {
this.db = db;
Expand All @@ -47,9 +49,15 @@ public DataColumnSidecarDbAccessorBuilder minCustodyPeriodSlotCalculator(
return this;
}

public DataColumnSidecarDbAccessorBuilder columnIdCacheMaxSlotCount(
final int columnIdCacheMaxSlotCount) {
this.columnIdCacheMaxSlotCount = columnIdCacheMaxSlotCount;
public DataColumnSidecarDbAccessorBuilder columnIdCacheSlotCount(
final int columnIdCacheSlotCount) {
this.columnIdReadCacheSlotCount = columnIdCacheSlotCount;
return this;
}

public DataColumnSidecarDbAccessorBuilder columnIdWriteCacheCount(
final int columnIdWriteCacheCount) {
this.columnIdWriteCacheCount = columnIdWriteCacheCount;
return this;
}

Expand All @@ -69,7 +77,11 @@ private int getNumberOfColumnsForSlot(final UInt64 slot) {

public DataColumnSidecarDbAccessor build() {
ColumnIdCachingDasDb columnIdCachingDasDb =
new ColumnIdCachingDasDb(db, this::getNumberOfColumnsForSlot, columnIdCacheMaxSlotCount);
new ColumnIdCachingDasDb(
db,
this::getNumberOfColumnsForSlot,
columnIdReadCacheSlotCount,
columnIdWriteCacheCount);
return autoPruneDbBuilder.build(columnIdCachingDasDb);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,20 @@

@SuppressWarnings("FutureReturnValueIgnored")
public class ColumnIdCachingDasDbTest {
final Spec spec = TestSpecFactory.createMinimalFulu();
final DataStructureUtil dataStructureUtil = new DataStructureUtil(0, spec);
private final Spec spec = TestSpecFactory.createMinimalFulu();
private final DataStructureUtil dataStructureUtil = new DataStructureUtil(0, spec);

final Duration dbDelay = ofMillis(5);
final StubAsync stubAsync = new StubAsync();
private final Duration dbDelay = ofMillis(5);
private final StubAsync stubAsync = new StubAsync();

DataColumnSidecarDBStub db = new DataColumnSidecarDBStub();
DataColumnSidecarDB asyncDb = new DelayedDasDb(this.db, stubAsync.getStubAsyncRunner(), dbDelay);
private final DataColumnSidecarDBStub db = new DataColumnSidecarDBStub();
private final DataColumnSidecarDB asyncDb =
new DelayedDasDb(this.db, stubAsync.getStubAsyncRunner(), dbDelay);

final int cacheSize = 2;
ColumnIdCachingDasDb columnIdCachingDb = new ColumnIdCachingDasDb(asyncDb, __ -> 128, cacheSize);
private final int slotReadCacheSize = 2;
private final int sidecarsWriteCacheSize = 2;
private final ColumnIdCachingDasDb columnIdCachingDb =
new ColumnIdCachingDasDb(asyncDb, __ -> 128, slotReadCacheSize, sidecarsWriteCacheSize);

private DataColumnSidecar createSidecar(final int slot, final int index) {
final UInt64 slotU = UInt64.valueOf(slot);
Expand All @@ -59,58 +62,99 @@ private DataColumnSidecar createSidecar(final int slot, final int index) {

@Test
void sanityTest() {
SafeFuture<List<DataColumnSlotAndIdentifier>> res1 =
final SafeFuture<List<DataColumnSlotAndIdentifier>> res1 =
columnIdCachingDb.getColumnIdentifiers(UInt64.valueOf(777));

assertThat(res1).isNotDone();
stubAsync.advanceTimeGradually(dbDelay);
assertThat(res1).isCompletedWithValue(emptyList());
long reads0 = db.getDbReadCounter().get();
final long reads0 = db.getDbReadCounter().get();
assertThat(reads0).isGreaterThan(0);

SafeFuture<List<DataColumnSlotAndIdentifier>> res2 =
final SafeFuture<List<DataColumnSlotAndIdentifier>> res2 =
columnIdCachingDb.getColumnIdentifiers(UInt64.valueOf(777));

assertThat(res2).isCompletedWithValue(emptyList());
long reads1 = db.getDbReadCounter().get();
final long reads1 = db.getDbReadCounter().get();
assertThat(reads1).isEqualTo(reads0);
}

@Test
void checkCacheIsInvalidated() {
SafeFuture<List<DataColumnSlotAndIdentifier>> res1 =
void checkReadCacheIsUpdated() {
final SafeFuture<List<DataColumnSlotAndIdentifier>> res1 =
columnIdCachingDb.getColumnIdentifiers(UInt64.valueOf(777));

stubAsync.advanceTimeGradually(ofMillis(1));

SafeFuture<Void> addCompleteFuture = columnIdCachingDb.addSidecar(createSidecar(777, 77));
final SafeFuture<Void> addCompleteFuture = columnIdCachingDb.addSidecar(createSidecar(777, 77));
stubAsync.advanceTimeGraduallyUntilAllDone(ofSeconds(1));

assertThat(res1)
.isCompleted(); // no assumptions on result: cache may or may not pick up latest changes
assertThat(addCompleteFuture).isCompleted();
long reads0 = db.getDbReadCounter().get();
final long reads0 = db.getDbReadCounter().get();
assertThat(reads0).isGreaterThan(0);

SafeFuture<List<DataColumnSlotAndIdentifier>> res2 =
final SafeFuture<List<DataColumnSlotAndIdentifier>> res2 =
columnIdCachingDb.getColumnIdentifiers(UInt64.valueOf(777));
stubAsync.advanceTimeGradually(dbDelay);

assertThat(res2).isCompletedWithValueMatching(l -> !l.isEmpty());
}

@Test
void checkWriteCacheIsUsed() {
final SafeFuture<List<DataColumnSlotAndIdentifier>> res1 =
columnIdCachingDb.getColumnIdentifiers(UInt64.valueOf(777));

stubAsync.advanceTimeGradually(ofMillis(1));

final DataColumnSidecar sidecar = createSidecar(777, 77);
final SafeFuture<Void> addCompleteFuture1 = columnIdCachingDb.addSidecar(sidecar);
stubAsync.advanceTimeGraduallyUntilAllDone(ofSeconds(1));

assertThat(res1)
.isCompleted(); // no assumptions on result: cache may or may not pick up latest changes
assertThat(addCompleteFuture1).isCompleted();
final long reads1 = db.getDbReadCounter().get();
assertThat(reads1).isEqualTo(1);
final long writes1 = db.getDbWriteCounter().get();
assertThat(writes1).isEqualTo(1);

final SafeFuture<List<DataColumnSlotAndIdentifier>> res2 =
columnIdCachingDb.getColumnIdentifiers(UInt64.valueOf(777));
stubAsync.advanceTimeGradually(dbDelay);

assertThat(res2).isCompletedWithValueMatching(l -> !l.isEmpty());

final long reads2 = db.getDbReadCounter().get();
assertThat(reads2).isEqualTo(2);
final long writes2 = db.getDbWriteCounter().get();
assertThat(writes2).isEqualTo(writes1);

// Retry saving the same sidecar, db should not be used
final SafeFuture<Void> addCompleteFuture2 = columnIdCachingDb.addSidecar(sidecar);
stubAsync.advanceTimeGraduallyUntilAllDone(ofSeconds(1));

assertThat(addCompleteFuture2).isCompleted();
final long reads3 = db.getDbReadCounter().get();
assertThat(reads3).isEqualTo(reads2);
final long writes3 = db.getDbWriteCounter().get();
assertThat(writes3).isEqualTo(writes1);
}

@Test
void checkCacheIsPruned() {
for (int i = 0; i < cacheSize + 1; i++) {
for (int i = 0; i < slotReadCacheSize + 1; i++) {
columnIdCachingDb.getColumnIdentifiers(UInt64.valueOf(777 + i));
}
stubAsync.advanceTimeGradually(dbDelay);
long reads0 = db.getDbReadCounter().get();
final long reads0 = db.getDbReadCounter().get();
assertThat(reads0).isGreaterThan(0);

columnIdCachingDb.getColumnIdentifiers(UInt64.valueOf(777));
stubAsync.advanceTimeGradually(dbDelay);
long reads1 = db.getDbReadCounter().get();
final long reads1 = db.getDbReadCounter().get();
// the first cache entry (for slot 777) should be evicted and a query to underlying db should be
// done
assertThat(reads1).isGreaterThan(reads0);
Expand Down