Skip to content

Commit

Permalink
[improve] Refactored BK ClientFactory to return futures (#22853)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat committed Jun 6, 2024
1 parent 3e2ca29 commit 217f1f0
Show file tree
Hide file tree
Showing 16 changed files with 188 additions and 186 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, BookKeeper
public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, BookKeeper bookKeeper,
ManagedLedgerFactoryConfig config)
throws Exception {
this(metadataStore, (policyConfig) -> bookKeeper, config);
this(metadataStore, (policyConfig) -> CompletableFuture.completedFuture(bookKeeper), config);
}

public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore,
Expand Down Expand Up @@ -232,8 +232,8 @@ public DefaultBkFactory(ClientConfiguration bkClientConfiguration)
}

@Override
public BookKeeper get(EnsemblePlacementPolicyConfig policy) {
return bkClient;
public CompletableFuture<BookKeeper> get(EnsemblePlacementPolicyConfig policy) {
return CompletableFuture.completedFuture(bkClient);
}
}

Expand Down Expand Up @@ -377,52 +377,59 @@ public void asyncOpen(final String name, final ManagedLedgerConfig config, final
ledgers.computeIfAbsent(name, (mlName) -> {
// Create the managed ledger
CompletableFuture<ManagedLedgerImpl> future = new CompletableFuture<>();
BookKeeper bk = bookkeeperFactory.get(
new EnsemblePlacementPolicyConfig(config.getBookKeeperEnsemblePlacementPolicyClassName(),
config.getBookKeeperEnsemblePlacementPolicyProperties()));
final ManagedLedgerImpl newledger = config.getShadowSource() == null
? new ManagedLedgerImpl(this, bk, store, config, scheduledExecutor, name, mlOwnershipChecker)
: new ShadowManagedLedgerImpl(this, bk, store, config, scheduledExecutor, name,
mlOwnershipChecker);
PendingInitializeManagedLedger pendingLedger = new PendingInitializeManagedLedger(newledger);
pendingInitializeLedgers.put(name, pendingLedger);
newledger.initialize(new ManagedLedgerInitializeLedgerCallback() {
@Override
public void initializeComplete() {
log.info("[{}] Successfully initialize managed ledger", name);
pendingInitializeLedgers.remove(name, pendingLedger);
future.complete(newledger);
bookkeeperFactory.get(
new EnsemblePlacementPolicyConfig(config.getBookKeeperEnsemblePlacementPolicyClassName(),
config.getBookKeeperEnsemblePlacementPolicyProperties()))
.thenAccept(bk -> {
final ManagedLedgerImpl newledger = config.getShadowSource() == null
? new ManagedLedgerImpl(this, bk, store, config, scheduledExecutor, name,
mlOwnershipChecker)
: new ShadowManagedLedgerImpl(this, bk, store, config, scheduledExecutor, name,
mlOwnershipChecker);
PendingInitializeManagedLedger pendingLedger = new PendingInitializeManagedLedger(newledger);
pendingInitializeLedgers.put(name, pendingLedger);
newledger.initialize(new ManagedLedgerInitializeLedgerCallback() {
@Override
public void initializeComplete() {
log.info("[{}] Successfully initialize managed ledger", name);
pendingInitializeLedgers.remove(name, pendingLedger);
future.complete(newledger);

// May need to update the cursor position
newledger.maybeUpdateCursorBeforeTrimmingConsumedLedger();
}
// May need to update the cursor position
newledger.maybeUpdateCursorBeforeTrimmingConsumedLedger();
}

@Override
public void initializeFailed(ManagedLedgerException e) {
if (config.isCreateIfMissing()) {
log.error("[{}] Failed to initialize managed ledger: {}", name, e.getMessage());
}
@Override
public void initializeFailed(ManagedLedgerException e) {
if (config.isCreateIfMissing()) {
log.error("[{}] Failed to initialize managed ledger: {}", name, e.getMessage());
}

// Clean the map if initialization fails
ledgers.remove(name, future);
// Clean the map if initialization fails
ledgers.remove(name, future);

if (pendingInitializeLedgers.remove(name, pendingLedger)) {
pendingLedger.ledger.asyncClose(new CloseCallback() {
@Override
public void closeComplete(Object ctx) {
// no-op
}
if (pendingInitializeLedgers.remove(name, pendingLedger)) {
pendingLedger.ledger.asyncClose(new CloseCallback() {
@Override
public void closeComplete(Object ctx) {
// no-op
}

@Override
public void closeFailed(ManagedLedgerException exception, Object ctx) {
log.warn("[{}] Failed to a pending initialization managed ledger", name, exception);
@Override
public void closeFailed(ManagedLedgerException exception, Object ctx) {
log.warn("[{}] Failed to a pending initialization managed ledger", name,
exception);
}
}, null);
}

future.completeExceptionally(e);
}
}, null);
}

future.completeExceptionally(e);
}
}, null);
}).exceptionally(ex -> {
future.completeExceptionally(ex);
return null;
});
return future;
}).thenAccept(ml -> callback.openLedgerComplete(ml, ctx)).exceptionally(exception -> {
callback.openLedgerFailed((ManagedLedgerException) exception.getCause(), ctx);
Expand All @@ -438,20 +445,22 @@ public void asyncOpenReadOnlyManagedLedger(String managedLedgerName,
callback.openReadOnlyManagedLedgerFailed(
new ManagedLedgerException.ManagedLedgerFactoryClosedException(), ctx);
}
ReadOnlyManagedLedgerImpl roManagedLedger = new ReadOnlyManagedLedgerImpl(this,
bookkeeperFactory
.get(new EnsemblePlacementPolicyConfig(config.getBookKeeperEnsemblePlacementPolicyClassName(),
config.getBookKeeperEnsemblePlacementPolicyProperties())),
store, config, scheduledExecutor, managedLedgerName);
roManagedLedger.initialize().thenRun(() -> {
log.info("[{}] Successfully initialize Read-only managed ledger", managedLedgerName);
callback.openReadOnlyManagedLedgerComplete(roManagedLedger, ctx);

}).exceptionally(e -> {
log.error("[{}] Failed to initialize Read-only managed ledger", managedLedgerName, e);
callback.openReadOnlyManagedLedgerFailed((ManagedLedgerException) e.getCause(), ctx);
return null;
});

bookkeeperFactory
.get(new EnsemblePlacementPolicyConfig(config.getBookKeeperEnsemblePlacementPolicyClassName(),
config.getBookKeeperEnsemblePlacementPolicyProperties()))
.thenCompose(bk -> {
ReadOnlyManagedLedgerImpl roManagedLedger = new ReadOnlyManagedLedgerImpl(this, bk,
store, config, scheduledExecutor, managedLedgerName);
return roManagedLedger.initialize().thenApply(v -> roManagedLedger);
}).thenAccept(roManagedLedger -> {
log.info("[{}] Successfully initialize Read-only managed ledger", managedLedgerName);
callback.openReadOnlyManagedLedgerComplete(roManagedLedger, ctx);
}).exceptionally(e -> {
log.error("[{}] Failed to initialize Read-only managed ledger", managedLedgerName, e);
callback.openReadOnlyManagedLedgerFailed((ManagedLedgerException) e.getCause(), ctx);
return null;
});
}

@Override
Expand Down Expand Up @@ -573,49 +582,35 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
ledgerFuture.completeExceptionally(new ManagedLedgerException.ManagedLedgerFactoryClosedException());
}
}
CompletableFuture<Void> bookkeeperFuture = new CompletableFuture<>();
futures.add(bookkeeperFuture);
futures.add(CompletableFuture.runAsync(() -> {
if (isBookkeeperManaged) {
try {
BookKeeper bookkeeper = bookkeeperFactory.get();
if (bookkeeper != null) {
bookkeeper.close();
}
bookkeeperFuture.complete(null);
} catch (Throwable throwable) {
bookkeeperFuture.completeExceptionally(throwable);
}
} else {
bookkeeperFuture.complete(null);
}
if (!ledgers.isEmpty()) {
log.info("Force closing {} ledgers.", ledgers.size());
//make sure all callbacks is called.
ledgers.forEach(((ledgerName, ledgerFuture) -> {
if (!ledgerFuture.isDone()) {
ledgerFuture.completeExceptionally(
new ManagedLedgerException.ManagedLedgerFactoryClosedException());
} else {
ManagedLedgerImpl managedLedger = ledgerFuture.getNow(null);
if (managedLedger == null) {
return;
}
try {
managedLedger.close();
} catch (Throwable throwable) {
log.warn("[{}] Got exception when closing managed ledger: {}", managedLedger.getName(),
throwable);
CompletableFuture<BookKeeper> bookkeeperFuture = isBookkeeperManaged
? bookkeeperFactory.get()
: CompletableFuture.completedFuture(null);
return bookkeeperFuture
.thenRun(() -> {
log.info("Closing {} ledgers.", ledgers.size());
//make sure all callbacks is called.
ledgers.forEach(((ledgerName, ledgerFuture) -> {
if (!ledgerFuture.isDone()) {
ledgerFuture.completeExceptionally(
new ManagedLedgerException.ManagedLedgerFactoryClosedException());
} else {
ManagedLedgerImpl managedLedger = ledgerFuture.getNow(null);
if (managedLedger == null) {
return;
}
try {
managedLedger.close();
} catch (Throwable throwable) {
log.warn("[{}] Got exception when closing managed ledger: {}", managedLedger.getName(),
throwable);
}
}
}
}));
}
}));
return FutureUtil.waitForAll(futures).thenAcceptAsync(__ -> {
//wait for tasks in scheduledExecutor executed.
scheduledExecutor.shutdownNow();
entryCacheManager.clear();
});
}));
}).thenAcceptAsync(__ -> {
//wait for tasks in scheduledExecutor executed.
scheduledExecutor.shutdownNow();
entryCacheManager.clear();
});
}

@Override
Expand Down Expand Up @@ -856,14 +851,14 @@ void deleteManagedLedger(String managedLedgerName, CompletableFuture<ManagedLedg
asyncGetManagedLedgerInfo(managedLedgerName, new ManagedLedgerInfoCallback() {
@Override
public void getInfoComplete(ManagedLedgerInfo info, Object ctx) {
BookKeeper bkc = getBookKeeper();

// First delete all cursors resources
List<CompletableFuture<Void>> futures = info.cursors.entrySet().stream()
.map(e -> deleteCursor(bkc, managedLedgerName, e.getKey(), e.getValue()))
.collect(Collectors.toList());
Futures.waitForAll(futures).thenRun(() -> {
deleteManagedLedgerData(bkc, managedLedgerName, info, mlConfigFuture, callback, ctx);
getBookKeeper().thenCompose(bk -> {
// First delete all cursors resources
List<CompletableFuture<Void>> futures = info.cursors.entrySet().stream()
.map(e -> deleteCursor(bk, managedLedgerName, e.getKey(), e.getValue()))
.collect(Collectors.toList());
return Futures.waitForAll(futures).thenApply(v -> bk);
}).thenAccept(bk -> {
deleteManagedLedgerData(bk, managedLedgerName, info, mlConfigFuture, callback, ctx);
}).exceptionally(ex -> {
callback.deleteLedgerFailed(new ManagedLedgerException(ex), ctx);
return null;
Expand Down Expand Up @@ -1048,7 +1043,7 @@ public ManagedLedgerFactoryMXBean getCacheStats() {
return this.mbean;
}

public BookKeeper getBookKeeper() {
public CompletableFuture<BookKeeper> getBookKeeper() {
return bookkeeperFactory.get();
}

Expand All @@ -1057,7 +1052,7 @@ public BookKeeper getBookKeeper() {
*
*/
public interface BookkeeperFactoryForCustomEnsemblePlacementPolicy {
default BookKeeper get() {
default CompletableFuture<BookKeeper> get() {
return get(null);
}

Expand All @@ -1068,7 +1063,7 @@ default BookKeeper get() {
* @param ensemblePlacementPolicyMetadata
* @return
*/
BookKeeper get(EnsemblePlacementPolicyConfig ensemblePlacementPolicyMetadata);
CompletableFuture<BookKeeper> get(EnsemblePlacementPolicyConfig ensemblePlacementPolicyMetadata);
}

private static final Logger log = LoggerFactory.getLogger(ManagedLedgerFactoryImpl.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ private void readLedgerMeta(final ManagedLedgerFactoryImpl factory, final TopicN
final NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgers) throws Exception {
String managedLedgerName = topicName.getPersistenceNamingEncoding();
MetaStore store = factory.getMetaStore();
BookKeeper bk = factory.getBookKeeper();

final CountDownLatch mlMetaCounter = new CountDownLatch(1);

store.getManagedLedgerInfo(managedLedgerName, false /* createIfMissing */,
Expand Down Expand Up @@ -180,12 +180,16 @@ public void operationComplete(MLDataFormats.ManagedLedgerInfo mlInfo, Stat stat)
if (log.isDebugEnabled()) {
log.debug("[{}] Opening ledger {}", managedLedgerName, id);
}
try {
bk.asyncOpenLedgerNoRecovery(id, digestType, password, opencb, null);
} catch (Exception e) {
log.warn("[{}] Failed to open ledger {}: {}", managedLedgerName, id, e);
mlMetaCounter.countDown();
}

factory.getBookKeeper()
.thenAccept(bk -> {
bk.asyncOpenLedgerNoRecovery(id, digestType, password, opencb, null);
}).exceptionally(ex -> {
log.warn("[{}] Failed to open ledger {}: {}", managedLedgerName, id, ex);
opencb.openComplete(-1, null, null);
mlMetaCounter.countDown();
return null;
});
} else {
log.warn("[{}] Ledger list empty", managedLedgerName);
mlMetaCounter.countDown();
Expand Down Expand Up @@ -217,7 +221,7 @@ private void calculateCursorBacklogs(final ManagedLedgerFactoryImpl factory, fin
}
String managedLedgerName = topicName.getPersistenceNamingEncoding();
MetaStore store = factory.getMetaStore();
BookKeeper bk = factory.getBookKeeper();
BookKeeper bk = factory.getBookKeeper().get();
final CountDownLatch allCursorsCounter = new CountDownLatch(1);
final long errorInReadingCursor = -1;
ConcurrentOpenHashMap<String, Long> ledgerRetryMap =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
package org.apache.pulsar.broker;

import io.netty.channel.EventLoopGroup;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
import org.apache.bookkeeper.stats.StatsLogger;
Expand All @@ -31,13 +31,16 @@
* Provider of a new BookKeeper client instance.
*/
public interface BookKeeperClientFactory {
BookKeeper create(ServiceConfiguration conf, MetadataStoreExtended store, EventLoopGroup eventLoopGroup,
Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
Map<String, Object> ensemblePlacementPolicyProperties) throws IOException;
CompletableFuture<BookKeeper> create(ServiceConfiguration conf, MetadataStoreExtended store,
EventLoopGroup eventLoopGroup,
Optional<Class<? extends EnsemblePlacementPolicy>> policyClass,
Map<String, Object> ensemblePlacementPolicyProperties);

CompletableFuture<BookKeeper> create(ServiceConfiguration conf, MetadataStoreExtended store,
EventLoopGroup eventLoopGroup,
Optional<Class<? extends EnsemblePlacementPolicy>> policyClass,
Map<String, Object> ensemblePlacementPolicyProperties,
StatsLogger statsLogger);

BookKeeper create(ServiceConfiguration conf, MetadataStoreExtended store, EventLoopGroup eventLoopGroup,
Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
Map<String, Object> ensemblePlacementPolicyProperties,
StatsLogger statsLogger) throws IOException;
void close();
}
Loading

0 comments on commit 217f1f0

Please sign in to comment.