Skip to content

Commit

Permalink
[feat][admin] Enable Gzip Compression by Default in Admin Client (#22464
Browse files Browse the repository at this point in the history
)
  • Loading branch information
lhotari authored Apr 12, 2024
1 parent 4a5400f commit 7984cc2
Show file tree
Hide file tree
Showing 7 changed files with 183 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -327,4 +327,13 @@ PulsarAdminBuilder authentication(String authPluginClassName, Map<String, String
*/
PulsarAdminBuilder setContextClassLoader(ClassLoader clientBuilderClassLoader);

}
/**
* Determines whether to include the "Accept-Encoding: gzip" header in HTTP requests.
* By default, the "Accept-Encoding: gzip" header is included in HTTP requests.
* If this is set to false, the "Accept-Encoding: gzip" header will not be included in the requests.
*
* @param acceptGzipCompression A flag that indicates whether to include the "Accept-Encoding: gzip" header in HTTP
* requests
*/
PulsarAdminBuilder acceptGzipCompression(boolean acceptGzipCompression);
}
7 changes: 7 additions & 0 deletions pulsar-client-admin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,13 @@
<artifactId>hamcrest</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.github.tomakehurst</groupId>
<artifactId>wiremock-jre8</artifactId>
<version>${wiremock.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,11 @@ public class PulsarAdminBuilderImpl implements PulsarAdminBuilder {
protected ClientConfigurationData conf;

private ClassLoader clientBuilderClassLoader = null;
private boolean acceptGzipCompression = true;

@Override
public PulsarAdmin build() throws PulsarClientException {
return new PulsarAdminImpl(conf.getServiceUrl(), conf, clientBuilderClassLoader);
return new PulsarAdminImpl(conf.getServiceUrl(), conf, clientBuilderClassLoader, acceptGzipCompression);
}

public PulsarAdminBuilderImpl() {
Expand All @@ -54,13 +55,24 @@ private PulsarAdminBuilderImpl(ClientConfigurationData conf) {

@Override
public PulsarAdminBuilder clone() {
return new PulsarAdminBuilderImpl(conf.clone());
PulsarAdminBuilderImpl pulsarAdminBuilder = new PulsarAdminBuilderImpl(conf.clone());
pulsarAdminBuilder.clientBuilderClassLoader = clientBuilderClassLoader;
pulsarAdminBuilder.acceptGzipCompression = acceptGzipCompression;
return pulsarAdminBuilder;
}

@Override
public PulsarAdminBuilder loadConf(Map<String, Object> config) {
conf = ConfigurationDataUtils.loadData(config, conf, ClientConfigurationData.class);
setAuthenticationFromPropsIfAvailable(conf);
if (config.containsKey("acceptGzipCompression")) {
Object acceptGzipCompressionObj = config.get("acceptGzipCompression");
if (acceptGzipCompressionObj instanceof Boolean) {
acceptGzipCompression = (Boolean) acceptGzipCompressionObj;
} else {
acceptGzipCompression = Boolean.parseBoolean(acceptGzipCompressionObj.toString());
}
}
return this;
}

Expand Down Expand Up @@ -227,4 +239,10 @@ public PulsarAdminBuilder setContextClassLoader(ClassLoader clientBuilderClassLo
this.clientBuilderClassLoader = clientBuilderClassLoader;
return this;
}

@Override
public PulsarAdminBuilder acceptGzipCompression(boolean acceptGzipCompression) {
this.acceptGzipCompression = acceptGzipCompression;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,12 @@ public class PulsarAdminImpl implements PulsarAdmin {

public PulsarAdminImpl(String serviceUrl, ClientConfigurationData clientConfigData,
ClassLoader clientBuilderClassLoader) throws PulsarClientException {
this(serviceUrl, clientConfigData, clientBuilderClassLoader, true);
}

public PulsarAdminImpl(String serviceUrl, ClientConfigurationData clientConfigData,
ClassLoader clientBuilderClassLoader, boolean acceptGzipCompression)
throws PulsarClientException {
checkArgument(StringUtils.isNotBlank(serviceUrl), "Service URL needs to be specified");

this.clientConfigData = clientConfigData;
Expand All @@ -119,7 +125,7 @@ public PulsarAdminImpl(String serviceUrl, ClientConfigurationData clientConfigDa
}

AsyncHttpConnectorProvider asyncConnectorProvider = new AsyncHttpConnectorProvider(clientConfigData,
clientConfigData.getAutoCertRefreshSeconds());
clientConfigData.getAutoCertRefreshSeconds(), acceptGzipCompression);

ClientConfig httpConfig = new ClientConfig();
httpConfig.property(ClientProperties.FOLLOW_REDIRECTS, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,19 +83,23 @@ public class AsyncHttpConnector implements Connector {
private final PulsarServiceNameResolver serviceNameResolver;
private final ScheduledExecutorService delayer = Executors.newScheduledThreadPool(1,
new DefaultThreadFactory("delayer"));
private final boolean acceptGzipCompression;

public AsyncHttpConnector(Client client, ClientConfigurationData conf, int autoCertRefreshTimeSeconds) {
public AsyncHttpConnector(Client client, ClientConfigurationData conf, int autoCertRefreshTimeSeconds,
boolean acceptGzipCompression) {
this((int) client.getConfiguration().getProperty(ClientProperties.CONNECT_TIMEOUT),
(int) client.getConfiguration().getProperty(ClientProperties.READ_TIMEOUT),
PulsarAdminImpl.DEFAULT_REQUEST_TIMEOUT_SECONDS * 1000,
autoCertRefreshTimeSeconds,
conf);
conf, acceptGzipCompression);
}

@SneakyThrows
public AsyncHttpConnector(int connectTimeoutMs, int readTimeoutMs,
int requestTimeoutMs,
int autoCertRefreshTimeSeconds, ClientConfigurationData conf) {
int autoCertRefreshTimeSeconds, ClientConfigurationData conf,
boolean acceptGzipCompression) {
this.acceptGzipCompression = acceptGzipCompression;
DefaultAsyncHttpClientConfig.Builder confBuilder = new DefaultAsyncHttpClientConfig.Builder();
confBuilder.setUseProxyProperties(true);
confBuilder.setFollowRedirect(true);
Expand Down Expand Up @@ -339,6 +343,10 @@ private CompletableFuture<Response> oneShot(InetSocketAddress host, ClientReques
}
});

if (acceptGzipCompression) {
builder.setHeader(HttpHeaders.ACCEPT_ENCODING, "gzip");
}

return builder.execute().toCompletableFuture();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,19 @@ public class AsyncHttpConnectorProvider implements ConnectorProvider {
private final ClientConfigurationData conf;
private Connector connector;
private final int autoCertRefreshTimeSeconds;
private final boolean acceptGzipCompression;

public AsyncHttpConnectorProvider(ClientConfigurationData conf, int autoCertRefreshTimeSeconds) {
public AsyncHttpConnectorProvider(ClientConfigurationData conf, int autoCertRefreshTimeSeconds,
boolean acceptGzipCompression) {
this.conf = conf;
this.autoCertRefreshTimeSeconds = autoCertRefreshTimeSeconds;
this.acceptGzipCompression = acceptGzipCompression;
}

@Override
public Connector getConnector(Client client, Configuration runtimeConfig) {
if (connector == null) {
connector = new AsyncHttpConnector(client, conf, autoCertRefreshTimeSeconds);
connector = new AsyncHttpConnector(client, conf, autoCertRefreshTimeSeconds, acceptGzipCompression);
}
return connector;
}
Expand All @@ -50,6 +53,6 @@ public Connector getConnector(Client client, Configuration runtimeConfig) {
public AsyncHttpConnector getConnector(int connectTimeoutMs, int readTimeoutMs, int requestTimeoutMs,
int autoCertRefreshTimeSeconds) {
return new AsyncHttpConnector(connectTimeoutMs, readTimeoutMs, requestTimeoutMs, autoCertRefreshTimeSeconds,
conf);
conf, acceptGzipCompression);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.admin.internal;

import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
import static com.github.tomakehurst.wiremock.client.WireMock.absent;
import static com.github.tomakehurst.wiremock.client.WireMock.equalTo;
import static com.github.tomakehurst.wiremock.client.WireMock.get;
import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
import static org.testng.Assert.assertEquals;
import com.github.tomakehurst.wiremock.WireMockServer;
import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.GZIPOutputStream;
import lombok.Cleanup;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

public class PulsarAdminGzipTest {
WireMockServer server;

@BeforeClass(alwaysRun = true)
void beforeClass() throws IOException {
server = new WireMockServer(WireMockConfiguration.wireMockConfig()
.port(0));
server.start();
}

@AfterClass(alwaysRun = true)
void afterClass() {
if (server != null) {
server.stop();
}
}

static byte[] gzipContent(String content) throws IOException {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
try(GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream)) {
gzipOutputStream.write(content.getBytes(StandardCharsets.UTF_8));
}
return byteArrayOutputStream.toByteArray();
}

@AfterMethod
void resetAllMocks() {
server.resetAll();
}

@Test
public void testGzipRequestedGzipResponse() throws Exception {
server.stubFor(get(urlEqualTo("/admin/v2/clusters"))
.withHeader("Accept-Encoding", equalTo("gzip"))
.willReturn(aResponse()
.withHeader("Content-Type", "application/json")
.withHeader("Content-Encoding", "gzip")
.withBody(gzipContent("[\"gzip-test\", \"gzip-test2\"]"))));

@Cleanup
PulsarAdmin admin = PulsarAdmin.builder()
.serviceHttpUrl("http://localhost:" + server.port())
.acceptGzipCompression(true)
.build();

assertEquals(admin.clusters().getClusters(), Arrays.asList("gzip-test", "gzip-test2"));
}

@Test
public void testGzipRequestedNoGzipResponse() throws Exception {
server.stubFor(get(urlEqualTo("/admin/v2/clusters"))
.withHeader("Accept-Encoding", equalTo("gzip"))
.willReturn(aResponse()
.withHeader("Content-Type", "application/json")
.withBody("[\"test\", \"test2\"]")));

@Cleanup
PulsarAdmin admin = PulsarAdmin.builder()
.serviceHttpUrl("http://localhost:" + server.port())
.acceptGzipCompression(true)
.build();

assertEquals(admin.clusters().getClusters(), Arrays.asList("test", "test2"));
}

@Test
public void testNoGzipRequestedNoGzipResponse() throws Exception {
server.stubFor(get(urlEqualTo("/admin/v2/clusters"))
.withHeader("Accept-Encoding", absent())
.willReturn(aResponse()
.withHeader("Content-Type", "application/json")
.withBody("[\"test\", \"test2\"]")));

@Cleanup
PulsarAdmin admin = PulsarAdmin.builder()
.serviceHttpUrl("http://localhost:" + server.port())
.acceptGzipCompression(false)
.build();

assertEquals(admin.clusters().getClusters(), Arrays.asList("test", "test2"));
}
}

0 comments on commit 7984cc2

Please sign in to comment.