Skip to content

Commit

Permalink
[improve][client] Implement HTTP client using javax.ws.rs
Browse files Browse the repository at this point in the history
Signed-off-by: Zixuan Liu <[email protected]>
  • Loading branch information
nodece committed Jan 28, 2025
1 parent 52e8730 commit d69ab83
Show file tree
Hide file tree
Showing 13 changed files with 172 additions and 205 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.Functions;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.internal.http.AsyncHttpRequestExecutor;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.internal.http.AsyncHttpRequestExecutor;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.FunctionDefinition;
import org.apache.pulsar.common.functions.FunctionState;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@
import javax.ws.rs.core.Response;
import org.apache.pulsar.client.admin.Packages;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.internal.http.AsyncHttpRequestExecutor;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.internal.http.AsyncHttpRequestExecutor;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.packages.management.core.common.PackageMetadata;
import org.apache.pulsar.packages.management.core.common.PackageName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,14 @@
import org.apache.pulsar.client.admin.Topics;
import org.apache.pulsar.client.admin.Transactions;
import org.apache.pulsar.client.admin.Worker;
import org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector;
import org.apache.pulsar.client.admin.internal.http.AsyncHttpConnectorProvider;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.PulsarServiceNameResolver;
import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.internal.http.AsyncHttpConnector;
import org.apache.pulsar.client.internal.http.AsyncHttpConnectorProvider;
import org.apache.pulsar.common.net.ServiceURI;
import org.glassfish.jersey.client.ClientConfig;
import org.glassfish.jersey.client.ClientProperties;
Expand Down Expand Up @@ -124,8 +125,10 @@ public PulsarAdminImpl(String serviceUrl, ClientConfigurationData clientConfigDa
clientConfigData.setServiceUrl(serviceUrl);
}

PulsarServiceNameResolver pulsarServiceNameResolver = new PulsarServiceNameResolver();
pulsarServiceNameResolver.updateServiceUrl(serviceUrl);
AsyncHttpConnectorProvider asyncConnectorProvider = new AsyncHttpConnectorProvider(clientConfigData,
clientConfigData.getAutoCertRefreshSeconds(), acceptGzipCompression);
clientConfigData.getAutoCertRefreshSeconds(), acceptGzipCompression, pulsarServiceNameResolver);

ClientConfig httpConfig = new ClientConfig();
httpConfig.property(ClientProperties.FOLLOW_REDIRECTS, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Sink;
import org.apache.pulsar.client.admin.Sinks;
import org.apache.pulsar.client.admin.internal.http.AsyncHttpRequestExecutor;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.internal.http.AsyncHttpRequestExecutor;
import org.apache.pulsar.common.functions.UpdateOptions;
import org.apache.pulsar.common.functions.UpdateOptionsImpl;
import org.apache.pulsar.common.io.ConnectorDefinition;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Source;
import org.apache.pulsar.client.admin.Sources;
import org.apache.pulsar.client.admin.internal.http.AsyncHttpRequestExecutor;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.internal.http.AsyncHttpRequestExecutor;
import org.apache.pulsar.common.functions.UpdateOptions;
import org.apache.pulsar.common.functions.UpdateOptionsImpl;
import org.apache.pulsar.common.io.ConnectorDefinition;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.admin.internal.http;
package org.apache.pulsar.client.internal.http;

import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
import static com.github.tomakehurst.wiremock.client.WireMock.get;
Expand Down Expand Up @@ -44,6 +44,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Cleanup;
import org.apache.pulsar.client.api.PulsarClientException.InvalidServiceURL;
import org.apache.pulsar.client.impl.PulsarServiceNameResolver;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.util.FutureUtil;
import org.asynchttpclient.Request;
Expand Down Expand Up @@ -174,9 +176,11 @@ public void testShouldStopRetriesWhenTimeoutOccurs() throws IOException, Executi
Executor delayedExecutor = runnable -> {
scheduledExecutor.schedule(runnable, requestTimeout, TimeUnit.MILLISECONDS);
};
PulsarServiceNameResolver pulsarServiceNameResolver = new PulsarServiceNameResolver();
pulsarServiceNameResolver.updateServiceUrl(conf.getServiceUrl());
@Cleanup
AsyncHttpConnector connector = new AsyncHttpConnector(5000, requestTimeout,
requestTimeout, 0, conf, false) {
requestTimeout, 0, conf, false, pulsarServiceNameResolver) {
@Override
protected CompletableFuture<Response> oneShot(InetSocketAddress host, ClientRequest request) {
// delay the response to simulate a timeout
Expand Down Expand Up @@ -214,7 +218,7 @@ public void failure(Throwable failure) {
}

@Test
void testMaxRedirects() {
void testMaxRedirects() throws InvalidServiceURL {
// Redirect to itself to test max redirects
server.stubFor(get(urlEqualTo("/admin/v2/clusters"))
.willReturn(aResponse()
Expand All @@ -224,9 +228,11 @@ void testMaxRedirects() {
ClientConfigurationData conf = new ClientConfigurationData();
conf.setServiceUrl("http://localhost:" + server.port());

PulsarServiceNameResolver pulsarServiceNameResolver = new PulsarServiceNameResolver();
pulsarServiceNameResolver.updateServiceUrl(conf.getServiceUrl());
@Cleanup
AsyncHttpConnector connector = new AsyncHttpConnector(5000, 5000,
5000, 0, conf, false);
5000, 0, conf, false, pulsarServiceNameResolver);

Request request = new RequestBuilder("GET")
.setUrl("http://localhost:" + server.port() + "/admin/v2/clusters")
Expand All @@ -243,21 +249,21 @@ void testMaxRedirects() {
}

@Test
void testRelativeRedirect() throws ExecutionException, InterruptedException {
void testRelativeRedirect() throws ExecutionException, InterruptedException, InvalidServiceURL {
doTestRedirect("path2");
}

@Test
void testAbsoluteRedirect() throws ExecutionException, InterruptedException {
void testAbsoluteRedirect() throws ExecutionException, InterruptedException, InvalidServiceURL {
doTestRedirect("/path2");
}

@Test
void testUrlRedirect() throws ExecutionException, InterruptedException {
void testUrlRedirect() throws ExecutionException, InterruptedException, InvalidServiceURL {
doTestRedirect("http://localhost:" + server.port() + "/path2");
}

private void doTestRedirect(String location) throws InterruptedException, ExecutionException {
private void doTestRedirect(String location) throws InterruptedException, ExecutionException, InvalidServiceURL {
server.stubFor(get(urlEqualTo("/path1"))
.willReturn(aResponse()
.withStatus(301)
Expand All @@ -270,9 +276,11 @@ private void doTestRedirect(String location) throws InterruptedException, Execut
ClientConfigurationData conf = new ClientConfigurationData();
conf.setServiceUrl("http://localhost:" + server.port());

PulsarServiceNameResolver pulsarServiceNameResolver = new PulsarServiceNameResolver();
pulsarServiceNameResolver.updateServiceUrl(conf.getServiceUrl());
@Cleanup
AsyncHttpConnector connector = new AsyncHttpConnector(5000, 5000,
5000, 0, conf, false);
5000, 0, conf, false, pulsarServiceNameResolver);

Request request = new RequestBuilder("GET")
.setUrl("http://localhost:" + server.port() + "/path1")
Expand All @@ -283,7 +291,7 @@ private void doTestRedirect(String location) throws InterruptedException, Execut
}

@Test
void testRedirectWithBody() throws ExecutionException, InterruptedException {
void testRedirectWithBody() throws ExecutionException, InterruptedException, InvalidServiceURL {
server.stubFor(post(urlEqualTo("/path1"))
.willReturn(aResponse()
.withStatus(307)
Expand All @@ -296,9 +304,12 @@ void testRedirectWithBody() throws ExecutionException, InterruptedException {
ClientConfigurationData conf = new ClientConfigurationData();
conf.setServiceUrl("http://localhost:" + server.port());

PulsarServiceNameResolver pulsarServiceNameResolver = new PulsarServiceNameResolver();
pulsarServiceNameResolver.updateServiceUrl(conf.getServiceUrl());
@Cleanup
AsyncHttpConnector connector = new AsyncHttpConnector(5000, 5000,
5000, 0, conf, false);
5000, 0, conf, false, pulsarServiceNameResolver);


Request request = new RequestBuilder("POST")
.setUrl("http://localhost:" + server.port() + "/path1")
Expand All @@ -310,7 +321,7 @@ void testRedirectWithBody() throws ExecutionException, InterruptedException {
}

@Test
void testMaxConnections() throws ExecutionException, InterruptedException {
void testMaxConnections() throws ExecutionException, InterruptedException, InvalidServiceURL {
server.stubFor(post(urlEqualTo("/concurrency-test"))
.willReturn(aResponse()
.withTransformers("concurrency-test")));
Expand All @@ -320,9 +331,11 @@ void testMaxConnections() throws ExecutionException, InterruptedException {
conf.setConnectionsPerBroker(maxConnections);
conf.setServiceUrl("http://localhost:" + server.port());

PulsarServiceNameResolver pulsarServiceNameResolver = new PulsarServiceNameResolver();
pulsarServiceNameResolver.updateServiceUrl(conf.getServiceUrl());
@Cleanup
AsyncHttpConnector connector = new AsyncHttpConnector(5000, 5000,
5000, 0, conf, false);
5000, 0, conf, false, pulsarServiceNameResolver);

Request request = new RequestBuilder("POST")
.setUrl("http://localhost:" + server.port() + "/concurrency-test")
Expand Down
22 changes: 22 additions & 0 deletions pulsar-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,21 @@
<optional>true</optional>
</dependency>

<dependency>
<groupId>jakarta.ws.rs</groupId>
<artifactId>jakarta.ws.rs-api</artifactId>
</dependency>

<dependency>
<groupId>org.glassfish.jersey.core</groupId>
<artifactId>jersey-client</artifactId>
</dependency>

<dependency>
<groupId>org.glassfish.jersey.media</groupId>
<artifactId>jersey-media-multipart</artifactId>
</dependency>

<!-- Testing dependencies -->
<dependency>
<groupId>${project.groupId}</groupId>
Expand Down Expand Up @@ -217,6 +232,13 @@
<artifactId>fastutil</artifactId>
</dependency>

<dependency>
<groupId>com.github.tomakehurst</groupId>
<artifactId>wiremock-jre8</artifactId>
<version>${wiremock.version}</version>
<scope>test</scope>
</dependency>

</dependencies>

<build>
Expand Down
Loading

0 comments on commit d69ab83

Please sign in to comment.