diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/db/ColumnIdCachingDasDb.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/db/ColumnIdCachingDasDb.java index 53d86bb210a..376748b2fbc 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/db/ColumnIdCachingDasDb.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/db/ColumnIdCachingDasDb.java @@ -18,10 +18,12 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.function.Function; import org.apache.tuweni.bytes.Bytes32; import tech.pegasys.teku.infrastructure.async.SafeFuture; import tech.pegasys.teku.infrastructure.collections.LimitedMap; +import tech.pegasys.teku.infrastructure.collections.LimitedSet; import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.spec.datastructures.blobs.versions.fulu.DataColumnSidecar; import tech.pegasys.teku.spec.datastructures.util.DataColumnSlotAndIdentifier; @@ -31,19 +33,22 @@ class ColumnIdCachingDasDb implements DataColumnSidecarDB { private final DataColumnSidecarDB delegateDb; private final Function slotToNumberOfColumns; - private final Map slotCaches; + private final Map readSlotCaches; + private final Set latestAdded; public ColumnIdCachingDasDb( final DataColumnSidecarDB delegateDb, final Function slotToNumberOfColumns, - final int maxCacheSize) { + final int slotReadCacheSize, + final int sidecarsWriteCacheSize) { this.delegateDb = delegateDb; this.slotToNumberOfColumns = slotToNumberOfColumns; - this.slotCaches = LimitedMap.createSynchronizedLRU(maxCacheSize); + this.readSlotCaches = LimitedMap.createSynchronizedLRU(slotReadCacheSize); + this.latestAdded = LimitedSet.createSynchronized(sidecarsWriteCacheSize); } private SlotCache getOrCreateSlotCache(final UInt64 slot) { - return slotCaches.computeIfAbsent( + return readSlotCaches.computeIfAbsent( slot, __ -> new SlotCache( @@ -51,7 +56,7 @@ private SlotCache getOrCreateSlotCache(final UInt64 slot) { } private void invalidateSlotCache(final UInt64 slot) { - slotCaches.remove(slot); + readSlotCaches.remove(slot); } @Override @@ -61,6 +66,9 @@ public SafeFuture> getColumnIdentifiers(final @Override public SafeFuture addSidecar(final DataColumnSidecar sidecar) { + if (!latestAdded.add(DataColumnSlotAndIdentifier.fromDataColumn(sidecar))) { + return SafeFuture.COMPLETE; + } invalidateSlotCache(sidecar.getSlot()); return delegateDb.addSidecar(sidecar); } diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/db/DataColumnSidecarDbAccessorBuilder.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/db/DataColumnSidecarDbAccessorBuilder.java index 98bd2fba4ba..d498f994afe 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/db/DataColumnSidecarDbAccessorBuilder.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/db/DataColumnSidecarDbAccessorBuilder.java @@ -24,13 +24,15 @@ public class DataColumnSidecarDbAccessorBuilder { // is roughly 600Kb (cache entry for one slot is about 60 bytes) - private static final int DEFAULT_COLUMN_ID_CACHE_MAX_SLOT_COUNT = 10 * 1024; + private static final int DEFAULT_COLUMN_ID_READ_CACHE_MAX_SLOT_COUNT = 10 * 1024; + private static final int DEFAULT_COLUMN_ID_WRITE_CACHE_MAX_COUNT = 3 * 128; private final DataColumnSidecarDB db; private Spec spec; private MinCustodyPeriodSlotCalculator minCustodyPeriodSlotCalculator; private final AutoPruneDbBuilder autoPruneDbBuilder = new AutoPruneDbBuilder(); - private int columnIdCacheMaxSlotCount = DEFAULT_COLUMN_ID_CACHE_MAX_SLOT_COUNT; + private int columnIdReadCacheSlotCount = DEFAULT_COLUMN_ID_READ_CACHE_MAX_SLOT_COUNT; + private int columnIdWriteCacheCount = DEFAULT_COLUMN_ID_WRITE_CACHE_MAX_COUNT; DataColumnSidecarDbAccessorBuilder(final DataColumnSidecarDB db) { this.db = db; @@ -47,9 +49,15 @@ public DataColumnSidecarDbAccessorBuilder minCustodyPeriodSlotCalculator( return this; } - public DataColumnSidecarDbAccessorBuilder columnIdCacheMaxSlotCount( - final int columnIdCacheMaxSlotCount) { - this.columnIdCacheMaxSlotCount = columnIdCacheMaxSlotCount; + public DataColumnSidecarDbAccessorBuilder columnIdCacheSlotCount( + final int columnIdCacheSlotCount) { + this.columnIdReadCacheSlotCount = columnIdCacheSlotCount; + return this; + } + + public DataColumnSidecarDbAccessorBuilder columnIdWriteCacheCount( + final int columnIdWriteCacheCount) { + this.columnIdWriteCacheCount = columnIdWriteCacheCount; return this; } @@ -69,7 +77,11 @@ private int getNumberOfColumnsForSlot(final UInt64 slot) { public DataColumnSidecarDbAccessor build() { ColumnIdCachingDasDb columnIdCachingDasDb = - new ColumnIdCachingDasDb(db, this::getNumberOfColumnsForSlot, columnIdCacheMaxSlotCount); + new ColumnIdCachingDasDb( + db, + this::getNumberOfColumnsForSlot, + columnIdReadCacheSlotCount, + columnIdWriteCacheCount); return autoPruneDbBuilder.build(columnIdCachingDasDb); } diff --git a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/datacolumns/db/ColumnIdCachingDasDbTest.java b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/datacolumns/db/ColumnIdCachingDasDbTest.java index 3f275fd53b6..6443753a85d 100644 --- a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/datacolumns/db/ColumnIdCachingDasDbTest.java +++ b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/datacolumns/db/ColumnIdCachingDasDbTest.java @@ -36,17 +36,20 @@ @SuppressWarnings("FutureReturnValueIgnored") public class ColumnIdCachingDasDbTest { - final Spec spec = TestSpecFactory.createMinimalFulu(); - final DataStructureUtil dataStructureUtil = new DataStructureUtil(0, spec); + private final Spec spec = TestSpecFactory.createMinimalFulu(); + private final DataStructureUtil dataStructureUtil = new DataStructureUtil(0, spec); - final Duration dbDelay = ofMillis(5); - final StubAsync stubAsync = new StubAsync(); + private final Duration dbDelay = ofMillis(5); + private final StubAsync stubAsync = new StubAsync(); - DataColumnSidecarDBStub db = new DataColumnSidecarDBStub(); - DataColumnSidecarDB asyncDb = new DelayedDasDb(this.db, stubAsync.getStubAsyncRunner(), dbDelay); + private final DataColumnSidecarDBStub db = new DataColumnSidecarDBStub(); + private final DataColumnSidecarDB asyncDb = + new DelayedDasDb(this.db, stubAsync.getStubAsyncRunner(), dbDelay); - final int cacheSize = 2; - ColumnIdCachingDasDb columnIdCachingDb = new ColumnIdCachingDasDb(asyncDb, __ -> 128, cacheSize); + private final int slotReadCacheSize = 2; + private final int sidecarsWriteCacheSize = 2; + private final ColumnIdCachingDasDb columnIdCachingDb = + new ColumnIdCachingDasDb(asyncDb, __ -> 128, slotReadCacheSize, sidecarsWriteCacheSize); private DataColumnSidecar createSidecar(final int slot, final int index) { final UInt64 slotU = UInt64.valueOf(slot); @@ -59,58 +62,99 @@ private DataColumnSidecar createSidecar(final int slot, final int index) { @Test void sanityTest() { - SafeFuture> res1 = + final SafeFuture> res1 = columnIdCachingDb.getColumnIdentifiers(UInt64.valueOf(777)); assertThat(res1).isNotDone(); stubAsync.advanceTimeGradually(dbDelay); assertThat(res1).isCompletedWithValue(emptyList()); - long reads0 = db.getDbReadCounter().get(); + final long reads0 = db.getDbReadCounter().get(); assertThat(reads0).isGreaterThan(0); - SafeFuture> res2 = + final SafeFuture> res2 = columnIdCachingDb.getColumnIdentifiers(UInt64.valueOf(777)); assertThat(res2).isCompletedWithValue(emptyList()); - long reads1 = db.getDbReadCounter().get(); + final long reads1 = db.getDbReadCounter().get(); assertThat(reads1).isEqualTo(reads0); } @Test - void checkCacheIsInvalidated() { - SafeFuture> res1 = + void checkReadCacheIsUpdated() { + final SafeFuture> res1 = columnIdCachingDb.getColumnIdentifiers(UInt64.valueOf(777)); stubAsync.advanceTimeGradually(ofMillis(1)); - SafeFuture addCompleteFuture = columnIdCachingDb.addSidecar(createSidecar(777, 77)); + final SafeFuture addCompleteFuture = columnIdCachingDb.addSidecar(createSidecar(777, 77)); stubAsync.advanceTimeGraduallyUntilAllDone(ofSeconds(1)); assertThat(res1) .isCompleted(); // no assumptions on result: cache may or may not pick up latest changes assertThat(addCompleteFuture).isCompleted(); - long reads0 = db.getDbReadCounter().get(); + final long reads0 = db.getDbReadCounter().get(); assertThat(reads0).isGreaterThan(0); - SafeFuture> res2 = + final SafeFuture> res2 = columnIdCachingDb.getColumnIdentifiers(UInt64.valueOf(777)); stubAsync.advanceTimeGradually(dbDelay); assertThat(res2).isCompletedWithValueMatching(l -> !l.isEmpty()); } + @Test + void checkWriteCacheIsUsed() { + final SafeFuture> res1 = + columnIdCachingDb.getColumnIdentifiers(UInt64.valueOf(777)); + + stubAsync.advanceTimeGradually(ofMillis(1)); + + final DataColumnSidecar sidecar = createSidecar(777, 77); + final SafeFuture addCompleteFuture1 = columnIdCachingDb.addSidecar(sidecar); + stubAsync.advanceTimeGraduallyUntilAllDone(ofSeconds(1)); + + assertThat(res1) + .isCompleted(); // no assumptions on result: cache may or may not pick up latest changes + assertThat(addCompleteFuture1).isCompleted(); + final long reads1 = db.getDbReadCounter().get(); + assertThat(reads1).isEqualTo(1); + final long writes1 = db.getDbWriteCounter().get(); + assertThat(writes1).isEqualTo(1); + + final SafeFuture> res2 = + columnIdCachingDb.getColumnIdentifiers(UInt64.valueOf(777)); + stubAsync.advanceTimeGradually(dbDelay); + + assertThat(res2).isCompletedWithValueMatching(l -> !l.isEmpty()); + + final long reads2 = db.getDbReadCounter().get(); + assertThat(reads2).isEqualTo(2); + final long writes2 = db.getDbWriteCounter().get(); + assertThat(writes2).isEqualTo(writes1); + + // Retry saving the same sidecar, db should not be used + final SafeFuture addCompleteFuture2 = columnIdCachingDb.addSidecar(sidecar); + stubAsync.advanceTimeGraduallyUntilAllDone(ofSeconds(1)); + + assertThat(addCompleteFuture2).isCompleted(); + final long reads3 = db.getDbReadCounter().get(); + assertThat(reads3).isEqualTo(reads2); + final long writes3 = db.getDbWriteCounter().get(); + assertThat(writes3).isEqualTo(writes1); + } + @Test void checkCacheIsPruned() { - for (int i = 0; i < cacheSize + 1; i++) { + for (int i = 0; i < slotReadCacheSize + 1; i++) { columnIdCachingDb.getColumnIdentifiers(UInt64.valueOf(777 + i)); } stubAsync.advanceTimeGradually(dbDelay); - long reads0 = db.getDbReadCounter().get(); + final long reads0 = db.getDbReadCounter().get(); assertThat(reads0).isGreaterThan(0); columnIdCachingDb.getColumnIdentifiers(UInt64.valueOf(777)); stubAsync.advanceTimeGradually(dbDelay); - long reads1 = db.getDbReadCounter().get(); + final long reads1 = db.getDbReadCounter().get(); // the first cache entry (for slot 777) should be evicted and a query to underlying db should be // done assertThat(reads1).isGreaterThan(reads0);