[improve][ml] Support Bookkeeper batch read #23180

Draft
wants to merge 12 commits into base: master
@@ -86,6 +86,7 @@ public class ManagedLedgerConfig {
private int minimumBacklogEntriesForCaching = 1000;
private int maxBacklogBetweenCursorsForCaching = 1000;
private boolean triggerOffloadOnTopicLoad = false;
private boolean isUseBookkeeperV2WireProtocol = false;

@Getter
@Setter
@@ -769,5 +770,13 @@ public String getShadowSource() {
return MapUtils.getString(properties, PROPERTY_SOURCE_TOPIC_KEY);
}

public void setUseBookkeeperV2WireProtocol(boolean isUseBookkeeperV2WireProtocol) {
this.isUseBookkeeperV2WireProtocol = isUseBookkeeperV2WireProtocol;
}

public boolean isUseBookkeeperV2WireProtocol() {
return isUseBookkeeperV2WireProtocol;
}

public static final String PROPERTY_SOURCE_TOPIC_KEY = "PULSAR.SHADOW_SOURCE";
}
@@ -39,10 +39,12 @@
public class EntryCacheDisabled implements EntryCache {
private final ManagedLedgerImpl ml;
private final ManagedLedgerInterceptor interceptor;
private final boolean useBookkeeperV2WireProtocol;

public EntryCacheDisabled(ManagedLedgerImpl ml) {
this.ml = ml;
this.interceptor = ml.getManagedLedgerInterceptor();
this.useBookkeeperV2WireProtocol = ml.getConfig().isUseBookkeeperV2WireProtocol();
}

@Override
@@ -79,8 +81,8 @@ public void invalidateEntriesBeforeTimestamp(long timestamp) {
@Override
public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader,
final AsyncCallbacks.ReadEntriesCallback callback, Object ctx) {
ReadEntryUtils.readAsync(ml, lh, firstEntry, lastEntry).thenAcceptAsync(
ledgerEntries -> {
ReadEntryUtils.readAsync(ml, lh, firstEntry, lastEntry, useBookkeeperV2WireProtocol)
.thenAcceptAsync(ledgerEntries -> {
List<Entry> entries = new ArrayList<>();
long totalSize = 0;
try {
@@ -98,17 +100,18 @@ public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boole
ml.getMbean().addReadEntriesSample(entries.size(), totalSize);

callback.readEntriesComplete(entries, ctx);
}, ml.getExecutor()).exceptionally(exception -> {
callback.readEntriesFailed(createManagedLedgerException(exception), ctx);
return null;
});
}, ml.getExecutor())
.exceptionally(exception -> {
callback.readEntriesFailed(createManagedLedgerException(exception), ctx);
return null;
});
}

@Override
public void asyncReadEntry(ReadHandle lh, Position position, AsyncCallbacks.ReadEntryCallback callback,
Object ctx) {
ReadEntryUtils.readAsync(ml, lh, position.getEntryId(), position.getEntryId()).whenCompleteAsync(
(ledgerEntries, exception) -> {
ReadEntryUtils.readAsync(ml, lh, position.getEntryId(), position.getEntryId(), useBookkeeperV2WireProtocol)
.whenCompleteAsync((ledgerEntries, exception) -> {
if (exception != null) {
ml.invalidateLedgerHandle(lh);
callback.readEntryFailed(createManagedLedgerException(exception), ctx);
@@ -65,6 +65,7 @@ public class RangeEntryCacheImpl implements EntryCache {
private final RangeCache<Position, EntryImpl> entries;
private final boolean copyEntries;
private final PendingReadsManager pendingReadsManager;
private final boolean useBookkeeperV2WireProtocol;

private volatile long estimatedEntrySize = 10 * 1024;

@@ -80,6 +81,7 @@ public RangeEntryCacheImpl(RangeEntryCacheManagerImpl manager, ManagedLedgerImpl
this.readEntryTimeoutMillis = getManagedLedgerConfig().getReadEntryTimeoutSeconds();
this.entries = new RangeCache<>(EntryImpl::getLength, EntryImpl::getTimestamp);
this.copyEntries = copyEntries;
this.useBookkeeperV2WireProtocol = ml.getConfig().isUseBookkeeperV2WireProtocol();

if (log.isDebugEnabled()) {
log.debug("[{}] Initialized managed-ledger entry cache", ml.getName());
@@ -249,8 +251,8 @@ private void asyncReadEntry0(ReadHandle lh, Position position, final ReadEntryCa
manager.mlFactoryMBean.recordCacheHit(cachedEntry.getLength());
callback.readEntryComplete(cachedEntry, ctx);
} else {
ReadEntryUtils.readAsync(ml, lh, position.getEntryId(), position.getEntryId()).thenAcceptAsync(
ledgerEntries -> {
ReadEntryUtils.readAsync(ml, lh, position.getEntryId(), position.getEntryId(), useBookkeeperV2WireProtocol)
.thenAcceptAsync(ledgerEntries -> {
try {
Iterator<LedgerEntry> iterator = ledgerEntries.iterator();
if (iterator.hasNext()) {
@@ -264,17 +266,18 @@ private void asyncReadEntry0(ReadHandle lh, Position position, final ReadEntryCa
} else {
// got an empty sequence
callback.readEntryFailed(new ManagedLedgerException("Could not read given position"),
ctx);
ctx);
}
} finally {
ledgerEntries.close();
}
}, ml.getExecutor()).exceptionally(exception -> {
}, ml.getExecutor())
.exceptionally(exception -> {
ml.invalidateLedgerHandle(lh);
pendingReadsManager.invalidateLedger(lh.getId());
callback.readEntryFailed(createManagedLedgerException(exception), ctx);
return null;
});
});
}
}

@@ -429,7 +432,8 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
CompletableFuture<List<EntryImpl>> readFromStorage(ReadHandle lh,
long firstEntry, long lastEntry, boolean shouldCacheEntry) {
final int entriesToRead = (int) (lastEntry - firstEntry) + 1;
CompletableFuture<List<EntryImpl>> readResult = ReadEntryUtils.readAsync(ml, lh, firstEntry, lastEntry)
CompletableFuture<List<EntryImpl>> readResult = ReadEntryUtils
.readAsync(ml, lh, firstEntry, lastEntry, useBookkeeperV2WireProtocol)
.thenApply(
ledgerEntries -> {
requireNonNull(ml.getName());
@@ -18,21 +18,32 @@
*/
package org.apache.bookkeeper.mledger.impl.cache;

import com.google.common.annotations.VisibleForTesting;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;

class ReadEntryUtils {
@VisibleForTesting
public class ReadEntryUtils {

static CompletableFuture<LedgerEntries> readAsync(ManagedLedger ml, ReadHandle handle, long firstEntry,
long lastEntry) {
@VisibleForTesting
public static CompletableFuture<LedgerEntries> readAsync(ManagedLedger ml, ReadHandle handle, long firstEntry,
long lastEntry, boolean useBookkeeperV2WireProtocol) {
if (ml.getOptionalLedgerInfo(handle.getId()).isEmpty()) {
// The read handle comes from another managed ledger, in this case, we can only compare the entry range with
// the LAC of that read handle. Specifically, it happens when this method is called by a
// ReadOnlyManagedLedgerImpl object.
return handle.readAsync(firstEntry, lastEntry);
int entriesToRead = (int) (lastEntry - firstEntry + 1);
// Batch read is not supported for striped ledgers.
LedgerMetadata m = handle.getLedgerMetadata();
boolean isStriped = m.getEnsembleSize() != m.getWriteQuorumSize();
Contributor:

The documentation needs to make clear that enabling batch read does not necessarily mean that reads will actually use the batch path.

if (!useBatchRead(entriesToRead, useBookkeeperV2WireProtocol, isStriped)) {
return handle.readAsync(firstEntry, lastEntry);
}
return handle.batchReadAsync(firstEntry, entriesToRead, 0);
Contributor:

If maxSize = 0, an info log will be printed for each call, which will affect performance.
https://github.com/apache/bookkeeper/blob/7c41204506122ed6904289f4814d4130274874aa/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java#L955-L967

private CompletableFuture<LedgerEntries> batchReadEntriesInternalAsync(long startEntry, int maxCount, long maxSize,
        boolean isRecoveryRead) {
    int nettyMaxFrameSizeBytes = clientCtx.getConf().nettyMaxFrameSizeBytes;
    if (maxSize > nettyMaxFrameSizeBytes) {
        LOG.info(
            "The max size is greater than nettyMaxFrameSizeBytes, use nettyMaxFrameSizeBytes:{} to replace it.",
            nettyMaxFrameSizeBytes);
        maxSize = nettyMaxFrameSizeBytes;
    }
    if (maxSize <= 0) {
        LOG.info("The max size is negative, use nettyMaxFrameSizeBytes:{} to replace it.", nettyMaxFrameSizeBytes);
        maxSize = nettyMaxFrameSizeBytes;
    }

Contributor:

If isAutoSkipNonRecoverableData is set to true, is the batch read failure consistent with the behavior here?

} else if (cursor.getConfig().isAutoSkipNonRecoverableData()
&& exception instanceof NonRecoverableLedgerException) {
log.warn("[{}][{}] read failed from ledger at position:{} : {}", cursor.ledger.getName(), cursor.getName(),
readPosition, exception.getMessage());
final ManagedLedgerImpl ledger = (ManagedLedgerImpl) cursor.getManagedLedger();
Position nexReadPosition;
Long lostLedger = null;
if (exception instanceof ManagedLedgerException.LedgerNotExistException) {
// try to find and move to next valid ledger
nexReadPosition = cursor.getNextLedgerPosition(readPosition.getLedgerId());
lostLedger = readPosition.getLedgerId();
} else {
// Skip this read operation
nexReadPosition = ledger.getValidPositionAfterSkippedEntries(readPosition, count);
}

Member (author), quoting the comment above:
If maxSize = 0, an info log will be printed for each call, which will affect performance. https://github.com/apache/bookkeeper/blob/7c41204506122ed6904289f4814d4130274874aa/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java#L955-L967

I created apache/bookkeeper#4485 to change the log level to debug.
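
A caller-side alternative to the upstream log-level change would be to pass an explicit positive maxSize instead of 0, so that LedgerHandle never enters the maxSize <= 0 branch at all. The following is a minimal sketch, not part of this PR; ASSUMED_BATCH_MAX_BYTES is a placeholder standing in for the BookKeeper client's nettyMaxFrameSizeBytes, which the managed-ledger layer does not currently expose.

import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.ReadHandle;

final class BatchReadMaxSizeSketch {
    // Placeholder cap; the real ceiling is the BK client's nettyMaxFrameSizeBytes.
    private static final long ASSUMED_BATCH_MAX_BYTES = 5L * 1024 * 1024;

    // Passing a positive maxSize keeps LedgerHandle#batchReadEntriesInternalAsync
    // out of the "maxSize <= 0" branch that logs at INFO on every call.
    static CompletableFuture<LedgerEntries> batchRead(ReadHandle handle, long firstEntry, long lastEntry) {
        int entriesToRead = (int) (lastEntry - firstEntry + 1);
        return handle.batchReadAsync(firstEntry, entriesToRead, ASSUMED_BATCH_MAX_BYTES);
    }

    private BatchReadMaxSizeSketch() { }
}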

}
// Compare the entry range with the lastConfirmedEntry maintained by the managed ledger because the entry cache
// of `ShadowManagedLedgerImpl` reads entries via `ReadOnlyLedgerHandle`, which never updates `lastAddConfirmed`
@@ -51,4 +62,8 @@ static CompletableFuture<LedgerEntries> readAsync(ManagedLedger ml, ReadHandle h
}
return handle.readUnconfirmedAsync(firstEntry, lastEntry);
}

private static boolean useBatchRead(int entriesToRead, boolean useBookkeeperV2WireProtocol, boolean isStriped) {
return entriesToRead > 1 && useBookkeeperV2WireProtocol && !isStriped;
}
}
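
Related to the review comment above: enabling batch read does not force every read onto the batch path. The standalone sketch below simply restates, for documentation purposes, the gating that ReadEntryUtils applies; the names are illustrative and the actual decision lives in useBatchRead above.

import org.apache.bookkeeper.client.api.LedgerMetadata;

final class BatchReadGateSketch {
    // Mirrors ReadEntryUtils#useBatchRead: batch read is attempted only when more
    // than one entry is requested, the V2 wire protocol is enabled for the managed
    // ledger, and the ledger is not striped (ensembleSize == writeQuorumSize).
    static boolean wouldUseBatchRead(long firstEntry, long lastEntry,
                                     boolean useBookkeeperV2WireProtocol,
                                     LedgerMetadata metadata) {
        int entriesToRead = (int) (lastEntry - firstEntry + 1);
        boolean isStriped = metadata.getEnsembleSize() != metadata.getWriteQuorumSize();
        return entriesToRead > 1 && useBookkeeperV2WireProtocol && !isStriped;
    }

    private BatchReadGateSketch() { }
}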
@@ -19,11 +19,7 @@
package org.apache.bookkeeper.mledger.impl;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
@@ -85,15 +81,9 @@
import lombok.Cleanup;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.*;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.PulsarMockBookKeeper;
import org.apache.bookkeeper.client.PulsarMockLedgerHandle;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.client.api.ReadHandle;
@@ -128,6 +118,7 @@
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
import org.apache.bookkeeper.mledger.impl.cache.EntryCache;
import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager;
import org.apache.bookkeeper.mledger.impl.cache.ReadEntryUtils;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
@@ -148,7 +139,9 @@
import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.mockito.stubbing.OngoingStubbing;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -4372,4 +4365,36 @@ public void testDeleteCurrentLedgerWhenItIsClosed(boolean closeLedgerByAddEntry)
assertEquals(ml.currentLedgerEntries, 0);
});
}

@Test
public void testBatchReadExceedMaxFrameLength() throws Exception {
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setMaxEntriesPerLedger(10);
config.setUseBookkeeperV2WireProtocol(true);
ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open("testBatchReadExceedMaxFrameLength", config);
ml = Mockito.spy(ml);
Mockito.doReturn(Optional.empty()).when(ml).getOptionalLedgerInfo(Mockito.anyLong());
ManagedCursor cursor = ml.openCursor("c1");
// 1 MB per entry.
byte[] body = new byte[1024 * 1024];
for (int i = 0; i < 20; i++) {
ml.addEntry(body);
}

@Cleanup
MockedStatic<ReadEntryUtils> mockStatic = Mockito.mockStatic(ReadEntryUtils.class);
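// Route all reads through batchReadAsync with maxSize = 0 so the test exercises the batch
// read path with entries whose total size exceeds the max frame length.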
mockStatic.when(() ->
ReadEntryUtils.readAsync(any(), any(), anyLong(), anyLong(),
anyBoolean()))
.thenAnswer(inv -> {
ReadHandle handle = inv.getArgument(1);
long firstEntry = inv.getArgument(2);
long lastEntry = inv.getArgument(3);
int entriesToRead = (int) (lastEntry - firstEntry + 1);
return handle.batchReadAsync(firstEntry, entriesToRead, 0);
});

List<Entry> entries = cursor.readEntries(20);
Assert.assertEquals(entries.size(), 20);
}
}
@@ -312,6 +312,12 @@ public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntr
return CompletableFuture.completedFuture(LedgerEntriesImpl.create(readEntries));
}

@Override
public CompletableFuture<LedgerEntries> batchReadAsync(long startEntry, int maxCount, long maxSize) {
long lastEntry = Math.min(startEntry + maxCount - 1, getLastAddConfirmed());
return readAsync(startEntry, lastEntry);
}

@Override
public CompletableFuture<LedgerEntries> readUnconfirmedAsync(long firstEntry, long lastEntry) {
return readAsync(firstEntry, lastEntry);
@@ -1941,6 +1941,12 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
)
private boolean bookkeeperClientSeparatedIoThreadsEnabled = false;

@FieldContext(
category = CATEGORY_STORAGE_BK,
doc = "Enable Bookkeeper client to read entries in batch mode. Default is false."
)
private boolean bookkeeperEnableBatchRead = false;

/**** --- Managed Ledger. --- ****/
@FieldContext(
minValue = 1,
@@ -137,6 +137,7 @@ ClientConfiguration createBkClientConfiguration(MetadataStoreExtended store, Ser
bkConf.setDiskWeightBasedPlacementEnabled(conf.isBookkeeperDiskWeightBasedPlacementEnabled());
bkConf.setMetadataServiceUri(conf.getBookkeeperMetadataStoreUrl());
bkConf.setLimitStatsLogging(conf.isBookkeeperClientLimitStatsLogging());
bkConf.setBatchReadEnabled(conf.isBookkeeperEnableBatchRead());

if (!conf.isBookkeeperMetadataStoreSeparated()) {
// If we're connecting to the same metadata service, with same config, then
@@ -2010,6 +2010,7 @@ private CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(@Nonnull T
serviceConfig.getManagedLedgerMinimumBacklogEntriesForCaching());
managedLedgerConfig.setMaxBacklogBetweenCursorsForCaching(
serviceConfig.getManagedLedgerMaxBacklogBetweenCursorsForCaching());
managedLedgerConfig.setUseBookkeeperV2WireProtocol(serviceConfig.isBookkeeperUseV2WireProtocol());

OffloadPoliciesImpl nsLevelOffloadPolicies =
(OffloadPoliciesImpl) policies.map(p -> p.offload_policies).orElse(null);
@@ -340,4 +340,24 @@ public void testBookKeeperLimitStatsLoggingConfiguration() throws Exception {
(ClientConfiguration) FieldUtils.readField(builder, "conf", true);
assertFalse(clientConfiguration.getLimitStatsLogging());
}

@Test
public void testBookkeeperBatchReadConfig() throws Exception {
BookKeeperClientFactoryImpl factory = new BookKeeperClientFactoryImpl();
ServiceConfiguration conf = new ServiceConfiguration();
conf.setBookkeeperEnableBatchRead(true);
EventLoopGroup eventLoopGroup = mock(EventLoopGroup.class);
BookKeeper.Builder builder = factory.getBookKeeperBuilder(conf, eventLoopGroup, mock(StatsLogger.class),
factory.createBkClientConfiguration(mock(MetadataStoreExtended.class), conf));
ClientConfiguration clientConfiguration =
(ClientConfiguration) FieldUtils.readField(builder, "conf", true);
assertTrue(clientConfiguration.isBatchReadEnabled());

conf.setBookkeeperEnableBatchRead(false);
builder = factory.getBookKeeperBuilder(conf, eventLoopGroup, mock(StatsLogger.class),
factory.createBkClientConfiguration(mock(MetadataStoreExtended.class), conf));
clientConfiguration =
(ClientConfiguration) FieldUtils.readField(builder, "conf", true);
assertFalse(clientConfiguration.isBatchReadEnabled());
}
}
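
For operators, the two knobs interact: bookkeeperEnableBatchRead switches batch read support on in the BookKeeper client, while the managed-ledger read path additionally requires the V2 wire protocol (getManagedLedgerConfig above copies bookkeeperUseV2WireProtocol into the managed-ledger config). A minimal sketch, assuming the usual Lombok-generated accessors on ServiceConfiguration:

import org.apache.pulsar.broker.ServiceConfiguration;

final class BatchReadBrokerConfigSketch {
    static ServiceConfiguration batchReadEnabledConfig() {
        ServiceConfiguration conf = new ServiceConfiguration();
        // New flag from this PR: lets the BookKeeper client issue batch reads.
        conf.setBookkeeperEnableBatchRead(true);
        // Also required: without the V2 wire protocol the managed-ledger read
        // path falls back to the regular per-range readAsync.
        conf.setBookkeeperUseV2WireProtocol(true);
        return conf;
    }

    private BatchReadBrokerConfigSketch() { }
}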
@@ -240,6 +240,11 @@ public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntr
return readHandle.readAsync(firstEntry, lastEntry);
}

@Override
public CompletableFuture<LedgerEntries> batchReadAsync(long startEntry, int maxCount, long maxSize) {
return readHandle.batchReadAsync(startEntry, maxCount, maxSize);
}

@Override
public CompletableFuture<LedgerEntries> readUnconfirmedAsync(long firstEntry, long lastEntry) {
return readHandle.readUnconfirmedAsync(firstEntry, lastEntry);
@@ -30,6 +30,7 @@
import org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.commons.lang3.mutable.MutableInt;

/**
* Mock implementation of ReadHandle.
@@ -40,6 +41,7 @@ class PulsarMockReadHandle implements ReadHandle {
private final long ledgerId;
private final LedgerMetadata metadata;
private final List<LedgerEntryImpl> entries;
private final int maxFrameSize = 5 * 1024 * 1024;

PulsarMockReadHandle(PulsarMockBookKeeper bk, long ledgerId, LedgerMetadata metadata,
List<LedgerEntryImpl> entries) {
@@ -64,6 +66,26 @@ public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntr
});
}

@Override
public CompletableFuture<LedgerEntries> batchReadAsync(long startEntry, int maxCount, long _maxSize) {
long maxSize = _maxSize > 0 ? _maxSize : maxFrameSize;
long lastEntry = Math.min(startEntry + maxCount - 1, getLastAddConfirmed());
MutableInt size = new MutableInt(0);
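// Collect entries until maxCount is reached or adding the next entry would exceed maxSize,
// mimicking the frame-size limit of a real batch read.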
return bk.getProgrammedFailure().thenComposeAsync(res -> {
List<LedgerEntry> seq = new ArrayList<>();
long entryId = startEntry;
while (entryId <= lastEntry && entryId < entries.size()) {
LedgerEntry entry = entries.get((int) entryId++).duplicate();
if (size.addAndGet(entry.getLength()) > maxSize) {
break;
}
seq.add(entry);
}
log.debug("Entries batch read: {}", seq);
return FutureUtils.value(LedgerEntriesImpl.create(seq));
});
}

@Override
public CompletableFuture<LedgerEntries> readUnconfirmedAsync(long firstEntry, long lastEntry) {
return readAsync(firstEntry, lastEntry);
@@ -175,6 +175,12 @@ public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntr
return promise;
}

@Override
public CompletableFuture<LedgerEntries> batchReadAsync(long startEntry, int maxCount, long maxSize) {
long lastEntry = Math.min(startEntry + maxCount - 1, getLastAddConfirmed());
return readAsync(startEntry, lastEntry);
}

@Override
public CompletableFuture<LedgerEntries> readUnconfirmedAsync(long firstEntry, long lastEntry) {
return readAsync(firstEntry, lastEntry);