Skip to content

Commit

Permalink
[fix][client] Use dedicated executor for requests in BinaryProtoLooku…
Browse files Browse the repository at this point in the history
…pService (#23378)

Signed-off-by: Zixuan Liu <[email protected]>
  • Loading branch information
nodece authored Oct 15, 2024
1 parent 5a8341a commit f98297f
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
private final ExecutorProvider brokerClientSharedExternalExecutorProvider;
private final ScheduledExecutorProvider brokerClientSharedScheduledExecutorProvider;
private final Timer brokerClientSharedTimer;
private final ExecutorProvider brokerClientSharedLookupExecutorProvider;

private MetricsGenerator metricsGenerator;
private final PulsarBrokerOpenTelemetry openTelemetry;
Expand Down Expand Up @@ -388,6 +389,8 @@ public PulsarService(ServiceConfiguration config,
new ScheduledExecutorProvider(1, "broker-client-shared-scheduled-executor");
this.brokerClientSharedTimer =
new HashedWheelTimer(new DefaultThreadFactory("broker-client-shared-timer"), 1, TimeUnit.MILLISECONDS);
this.brokerClientSharedLookupExecutorProvider =
new ScheduledExecutorProvider(1, "broker-client-shared-lookup-executor");

// here in the constructor we don't have the offloader scheduler yet
this.offloaderStats = LedgerOffloaderStats.create(false, false, null, 0);
Expand Down Expand Up @@ -696,6 +699,7 @@ public CompletableFuture<Void> closeAsync() {
brokerClientSharedExternalExecutorProvider.shutdownNow();
brokerClientSharedInternalExecutorProvider.shutdownNow();
brokerClientSharedScheduledExecutorProvider.shutdownNow();
brokerClientSharedLookupExecutorProvider.shutdownNow();
brokerClientSharedTimer.stop();
monotonicSnapshotClock.close();

Expand Down Expand Up @@ -1687,6 +1691,7 @@ public PulsarClientImpl createClientImpl(ClientConfigurationData clientConf)
.internalExecutorProvider(brokerClientSharedInternalExecutorProvider)
.externalExecutorProvider(brokerClientSharedExternalExecutorProvider)
.scheduledExecutorProvider(brokerClientSharedScheduledExecutorProvider)
.lookupExecutorProvider(brokerClientSharedLookupExecutorProvider)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static java.lang.String.format;
import static org.apache.pulsar.client.api.PulsarClientException.FailedFeatureCheck.SupportsGetPartitionedMetadataWithoutAutoCreation;
import io.netty.buffer.ByteBuf;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.opentelemetry.api.common.Attributes;
import java.net.InetSocketAddress;
import java.net.URI;
Expand All @@ -29,6 +30,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -58,9 +60,11 @@ public class BinaryProtoLookupService implements LookupService {
private final PulsarClientImpl client;
private final ServiceNameResolver serviceNameResolver;
private final boolean useTls;
private final ExecutorService executor;
private final ExecutorService scheduleExecutor;
private final String listenerName;
private final int maxLookupRedirects;
private final ExecutorService lookupPinnedExecutor;
private final boolean createdLookupPinnedExecutor;

private final ConcurrentHashMap<Pair<TopicName, Map<String, String>>, CompletableFuture<LookupTopicResult>>
lookupInProgress = new ConcurrentHashMap<>();
Expand All @@ -73,23 +77,43 @@ public class BinaryProtoLookupService implements LookupService {
private final LatencyHistogram histoGetSchema;
private final LatencyHistogram histoListTopics;

/**
* @deprecated use {@link
* #BinaryProtoLookupService(PulsarClientImpl, String, String, boolean, ExecutorService, ExecutorService)} instead.
*/
@Deprecated
public BinaryProtoLookupService(PulsarClientImpl client,
String serviceUrl,
boolean useTls,
ExecutorService scheduleExecutor)
throws PulsarClientException {
this(client, serviceUrl, null, useTls, scheduleExecutor);
}

/**
* @deprecated use {@link
* #BinaryProtoLookupService(PulsarClientImpl, String, String, boolean, ExecutorService, ExecutorService)} instead.
*/
@Deprecated
public BinaryProtoLookupService(PulsarClientImpl client,
String serviceUrl,
String listenerName,
boolean useTls,
ExecutorService executor)
ExecutorService scheduleExecutor)
throws PulsarClientException {
this(client, serviceUrl, null, useTls, executor);
this(client, serviceUrl, listenerName, useTls, scheduleExecutor, null);
}

public BinaryProtoLookupService(PulsarClientImpl client,
String serviceUrl,
String listenerName,
boolean useTls,
ExecutorService executor)
ExecutorService scheduleExecutor,
ExecutorService lookupPinnedExecutor)
throws PulsarClientException {
this.client = client;
this.useTls = useTls;
this.executor = executor;
this.scheduleExecutor = scheduleExecutor;
this.maxLookupRedirects = client.getConfiguration().getMaxLookupRedirects();
this.serviceNameResolver = new PulsarServiceNameResolver();
this.listenerName = listenerName;
Expand All @@ -103,6 +127,15 @@ public BinaryProtoLookupService(PulsarClientImpl client,
histo.withAttributes(Attributes.builder().put("pulsar.lookup.type", "metadata").build());
histoGetSchema = histo.withAttributes(Attributes.builder().put("pulsar.lookup.type", "schema").build());
histoListTopics = histo.withAttributes(Attributes.builder().put("pulsar.lookup.type", "list-topics").build());

if (lookupPinnedExecutor == null) {
this.createdLookupPinnedExecutor = true;
this.lookupPinnedExecutor =
Executors.newSingleThreadExecutor(new DefaultThreadFactory("pulsar-client-binary-proto-lookup"));
} else {
this.createdLookupPinnedExecutor = false;
this.lookupPinnedExecutor = lookupPinnedExecutor;
}
}

@Override
Expand Down Expand Up @@ -180,7 +213,7 @@ private CompletableFuture<LookupTopicResult> findBroker(InetSocketAddress socket
return addressFuture;
}

client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> {
client.getCnxPool().getConnection(socketAddress).thenAcceptAsync(clientCnx -> {
long requestId = client.newRequestId();
ByteBuf request = Commands.newLookup(topicName.toString(), listenerName, authoritative, requestId,
properties);
Expand Down Expand Up @@ -247,7 +280,7 @@ private CompletableFuture<LookupTopicResult> findBroker(InetSocketAddress socket
}
client.getCnxPool().releaseConnection(clientCnx);
});
}).exceptionally(connectionException -> {
}, lookupPinnedExecutor).exceptionally(connectionException -> {
addressFuture.completeExceptionally(FutureUtil.unwrapCompletionException(connectionException));
return null;
});
Expand All @@ -260,7 +293,7 @@ private CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(
long startTime = System.nanoTime();
CompletableFuture<PartitionedTopicMetadata> partitionFuture = new CompletableFuture<>();

client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> {
client.getCnxPool().getConnection(socketAddress).thenAcceptAsync(clientCnx -> {
boolean finalAutoCreationEnabled = metadataAutoCreationEnabled;
if (!metadataAutoCreationEnabled && !clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation()) {
if (useFallbackForNonPIP344Brokers) {
Expand Down Expand Up @@ -301,7 +334,7 @@ private CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(
}
client.getCnxPool().releaseConnection(clientCnx);
});
}).exceptionally(connectionException -> {
}, lookupPinnedExecutor).exceptionally(connectionException -> {
partitionFuture.completeExceptionally(FutureUtil.unwrapCompletionException(connectionException));
return null;
});
Expand All @@ -324,7 +357,7 @@ public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName topicName, by
return schemaFuture;
}
InetSocketAddress socketAddress = serviceNameResolver.resolveHost();
client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> {
client.getCnxPool().getConnection(socketAddress).thenAcceptAsync(clientCnx -> {
long requestId = client.newRequestId();
ByteBuf request = Commands.newGetSchema(requestId, topicName.toString(),
Optional.ofNullable(BytesSchemaVersion.of(version)));
Expand All @@ -340,7 +373,7 @@ public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName topicName, by
}
client.getCnxPool().releaseConnection(clientCnx);
});
}).exceptionally(ex -> {
}, lookupPinnedExecutor).exceptionally(ex -> {
schemaFuture.completeExceptionally(FutureUtil.unwrapCompletionException(ex));
return null;
});
Expand Down Expand Up @@ -385,7 +418,7 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress,
String topicsHash) {
long startTime = System.nanoTime();

client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> {
client.getCnxPool().getConnection(socketAddress).thenAcceptAsync(clientCnx -> {
long requestId = client.newRequestId();
ByteBuf request = Commands.newGetTopicsOfNamespaceRequest(
namespace.toString(), requestId, mode, topicsPattern, topicsHash);
Expand All @@ -404,7 +437,7 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress,
}
client.getCnxPool().releaseConnection(clientCnx);
});
}).exceptionally((e) -> {
}, lookupPinnedExecutor).exceptionally((e) -> {
long nextDelay = Math.min(backoff.next(), remainingTime.get());
if (nextDelay <= 0) {
getTopicsResultFuture.completeExceptionally(
Expand All @@ -414,7 +447,7 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress,
return null;
}

((ScheduledExecutorService) executor).schedule(() -> {
((ScheduledExecutorService) scheduleExecutor).schedule(() -> {
log.warn("[namespace: {}] Could not get connection while getTopicsUnderNamespace -- Will try again in"
+ " {} ms", namespace, nextDelay);
remainingTime.addAndGet(-nextDelay);
Expand All @@ -428,7 +461,9 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress,

@Override
public void close() throws Exception {
// no-op
if (createdLookupPinnedExecutor && lookupPinnedExecutor != null && !lookupPinnedExecutor.isShutdown()) {
lookupPinnedExecutor.shutdown();
}
}

public static class LookupDataResult {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ public class PulsarClientImpl implements PulsarClient {
private final boolean createdExecutorProviders;

private final boolean createdScheduledProviders;
private final boolean createdLookupProviders;
private LookupService lookup;
private Map<String, LookupService> urlLookupMap = new ConcurrentHashMap<>();
private final ConnectionPool cnxPool;
Expand All @@ -121,6 +122,7 @@ public class PulsarClientImpl implements PulsarClient {
private boolean needStopTimer;
private final ExecutorProvider externalExecutorProvider;
private final ExecutorProvider internalExecutorProvider;
private final ExecutorProvider lookupExecutorProvider;

private final ScheduledExecutorProvider scheduledExecutorProvider;
private final boolean createdEventLoopGroup;
Expand Down Expand Up @@ -163,29 +165,39 @@ public SchemaInfoProvider load(String topicName) {
private TransactionCoordinatorClientImpl tcClient;

public PulsarClientImpl(ClientConfigurationData conf) throws PulsarClientException {
this(conf, null, null, null, null, null, null);
this(conf, null, null, null, null, null, null, null);
}

public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException {
this(conf, eventLoopGroup, null, null, null, null, null);
this(conf, eventLoopGroup, null, null, null, null, null, null);
}

public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool)
throws PulsarClientException {
this(conf, eventLoopGroup, cnxPool, null, null, null, null);
this(conf, eventLoopGroup, cnxPool, null, null, null, null, null);
}

public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool,
Timer timer)
throws PulsarClientException {
this(conf, eventLoopGroup, cnxPool, timer, null, null, null);
this(conf, eventLoopGroup, cnxPool, timer, null, null, null, null);
}

public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool connectionPool,
Timer timer, ExecutorProvider externalExecutorProvider,
ExecutorProvider internalExecutorProvider,
ScheduledExecutorProvider scheduledExecutorProvider)
throws PulsarClientException {
this(conf, eventLoopGroup, connectionPool, timer, externalExecutorProvider, internalExecutorProvider,
scheduledExecutorProvider, null);
}

@Builder(builderClassName = "PulsarClientImplBuilder")
private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool connectionPool,
Timer timer, ExecutorProvider externalExecutorProvider,
ExecutorProvider internalExecutorProvider,
ScheduledExecutorProvider scheduledExecutorProvider) throws PulsarClientException {
ScheduledExecutorProvider scheduledExecutorProvider,
ExecutorProvider lookupExecutorProvider) throws PulsarClientException {

EventLoopGroup eventLoopGroupReference = null;
ConnectionPool connectionPoolReference = null;
Expand All @@ -198,6 +210,7 @@ private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopG
}
this.createdExecutorProviders = externalExecutorProvider == null;
this.createdScheduledProviders = scheduledExecutorProvider == null;
this.createdLookupProviders = lookupExecutorProvider == null;
eventLoopGroupReference = eventLoopGroup != null ? eventLoopGroup : getEventLoopGroup(conf);
this.eventLoopGroup = eventLoopGroupReference;
if (conf == null || isBlank(conf.getServiceUrl())) {
Expand All @@ -218,11 +231,14 @@ private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopG
new ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener");
this.internalExecutorProvider = internalExecutorProvider != null ? internalExecutorProvider :
new ExecutorProvider(conf.getNumIoThreads(), "pulsar-client-internal");
this.lookupExecutorProvider = lookupExecutorProvider != null ? lookupExecutorProvider :
new ExecutorProvider(1, "pulsar-client-lookup");
if (conf.getServiceUrl().startsWith("http")) {
lookup = new HttpLookupService(instrumentProvider, conf, this.eventLoopGroup);
} else {
lookup = new BinaryProtoLookupService(this, conf.getServiceUrl(), conf.getListenerName(),
conf.isUseTls(), this.scheduledExecutorProvider.getExecutor());
conf.isUseTls(), this.scheduledExecutorProvider.getExecutor(),
this.lookupExecutorProvider.getExecutor());
}
if (timer == null) {
this.timer = new HashedWheelTimer(getThreadFactory("pulsar-timer"), 1, TimeUnit.MILLISECONDS);
Expand Down Expand Up @@ -976,6 +992,16 @@ private void shutdownExecutors() throws PulsarClientException {
pulsarClientException = PulsarClientException.unwrap(t);
}
}

if (createdLookupProviders && lookupExecutorProvider != null && !lookupExecutorProvider.isShutdown()) {
try {
lookupExecutorProvider.shutdownNow();
} catch (Throwable t) {
log.warn("Failed to shutdown lookupExecutorProvider", t);
pulsarClientException = PulsarClientException.unwrap(t);
}
}

if (pulsarClientException != null) {
throw pulsarClientException;
}
Expand Down
Loading

0 comments on commit f98297f

Please sign in to comment.