Skip to content
This repository has been archived by the owner on May 3, 2024. It is now read-only.

Support gzip encoding for request and response #301

Merged
merged 14 commits into from
Mar 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 27 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
- No required base classes for events
- Support for both high-level (subscription) and low-level APIs
- Pluggable HTTP client implementations
- Gzip encoding support for publishing and consuming events

## Installation

Expand All @@ -45,8 +46,9 @@ final Listener<SalesOrderPlaced> listener = events -> {
}
};

// Configure client, defaults to using the high level api with ManagedCursorManger and SimpleRequestFactory
final NakadiClient nakadiClient = NakadiClient.builder(NAKADI_URI)
// Configure client, defaults to using the high level api with ManagedCursorManger,
// using the SimpleRequestFactory without compression
final NakadiClient nakadiClient = NakadiClient.builder(NAKADI_URI, new SimpleRequestFactory(ContentEncoding.IDENTITY))
.withAccessTokenProvider(new ZignAccessTokenProvider())
.build();

Expand Down Expand Up @@ -98,7 +100,7 @@ final DataSource dataSource = new HikariDataSource(hikariConfig);

final CursorManager cursorManager = new JdbcCursorManager(dataSource, "fahrschein-demo");

final NakadiClient nakadiClient = NakadiClient.builder(NAKADI_URI)
final NakadiClient nakadiClient = NakadiClient.builder(NAKADI_URI, new SimpleRequestFactory(ContentEncoding.IDENTITY))
.withAccessTokenProvider(new ZignAccessTokenProvider())
.withCursorManager(cursorManager)
.build();
Expand Down Expand Up @@ -213,10 +215,9 @@ final CloseableHttpClient httpClient = HttpClients.custom()
.setMaxConnPerRoute(2)
.build();

final RequestFactory requestFactory = new HttpComponentsRequestFactory(httpClient);
final RequestFactory requestFactory = new HttpComponentsRequestFactory(httpClient, ContentEncoding.GZIP);

final NakadiClient nakadiClient = NakadiClient.builder(NAKADI_URI)
.withRequestFactory(requestFactory)
final NakadiClient nakadiClient = NakadiClient.builder(NAKADI_URI, requestFactory)
.withAccessTokenProvider(new ZignAccessTokenProvider())
.build();
```
Expand All @@ -236,17 +237,33 @@ Example using OkHttp 3.x:
```java

final ClientHttpRequestFactory clientHttpRequestFactory = new OkHttp3ClientHttpRequestFactory();
final RequestFactory requestFactory = new SpringRequestFactory(clientHttpRequestFactory);
final RequestFactory requestFactory = new SpringRequestFactory(clientHttpRequestFactory, ContentEncoding.GZIP);

final NakadiClient nakadiClient = NakadiClient.builder(NAKADI_URI)
.withRequestFactory(requestFactory)
final NakadiClient nakadiClient = NakadiClient.builder(NAKADI_URI, requestFactory)
.withAccessTokenProvider(new ZignAccessTokenProvider())
.build();

```

**Note:** The implementations from spring framework don't handle closing of streams as expected. They will try to consume remaining data, which will usually time out when nakadi does not receive a commit.
**Note:** The implementations from the Spring framework don't handle closing of streams as expected. They will try to consume remaining data, which will usually time out when nakadi does not receive a commit.

## Content-Compression

Fahrschein handles content compression transparently to the API consumer, and mostly independently of the actual HTTP
client implementation. Since version `0.20.0` it can be enabled to both compress HTTP POST bodies when event
publishing, and requesting payload compression from Nakadi when consuming events.

### Consuming

For event consumption the underlying HTTP client implementations send `Accept-Encoding` headers, indicating their supported compression algorithms.
At the time of writing, all tested client implementations default to `gzip` compression. If this is undesired, wrap your
RequestFactory into a `IdentityAcceptEncodingRequestFactory`, which sets the `Accept-Encoding` header to `identity`.

### Publishing

For event publishing, the `Request` body can also get gzip-encoded by Fahrschein, if enabled when building the RequestFactory.
For this, you need to pass `ContentEncoding.GZIP`, or if compression is undesired, pass `ContentEncoding.IDENTITY`.
In the future, we may support other encoding formats, like Zstandard.

## Fahrschein compared to other Nakadi client libraries

Expand Down
19 changes: 17 additions & 2 deletions fahrschein-e2e-test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,31 +19,43 @@
<groupId>org.zalando</groupId>
<artifactId>fahrschein</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.zalando</groupId>
<artifactId>fahrschein-http-api</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.zalando</groupId>
<artifactId>fahrschein-http-apache</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.zalando</groupId>
<artifactId>fahrschein-http-spring</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>4.9.3</version>
<scope>test</scope>
</dependency>
<dependency>
otrosien marked this conversation as resolved.
Show resolved Hide resolved
<groupId>com.squareup.okhttp3</groupId>
<artifactId>logging-interceptor</artifactId>
<version>4.9.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
<version>${version.spring}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.springframework</groupId>
Expand All @@ -63,29 +75,32 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${version.slf4j}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${version.log4j}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${version.log4j}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${version.log4j}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
<version>${version.slf4j}</version>
<scope>test</scope>
</dependency>

<!-- testing -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.testcontainers.containers.DockerComposeContainer;
import org.testcontainers.containers.output.OutputFrame;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.Wait;

import java.io.File;
import java.io.IOException;
Expand All @@ -20,6 +21,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.function.Consumer;
import java.util.stream.Stream;

Expand All @@ -42,7 +44,8 @@ public abstract class NakadiTestWithDockerCompose {
.withExposedService("nakadi_postgres_1", 5432)
.withExposedService("zookeeper_1", 2181)
.withExposedService("kafka_1", 9092)
.withExposedService("nakadi_1", 8080)
.withExposedService("nakadi_1", 8080,
Wait.forListeningPort().withStartupTimeout(Duration.ofSeconds(180)))
.withLogConsumer("nakadi_1", LOG_CONSUMER);
compose.start();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.zalando.fahrschein.http;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategy;
import com.fasterxml.jackson.databind.SerializationFeature;
Expand Down Expand Up @@ -34,6 +35,9 @@ public abstract class AbstractRequestFactoryTest extends NakadiTestWithDockerCom
objectMapper.registerModule(new Jdk8Module());
objectMapper.setPropertyNamingStrategy(PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES);
objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
// After a Nakadi-Docker upgrade, check if the response metadata changed
// by enabling deserialization failure on unknown properties.
objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
}

Expand All @@ -42,9 +46,8 @@ public abstract class AbstractRequestFactoryTest extends NakadiTestWithDockerCom
@Before
public void setUpNakadiClient() {
nakadiClient = NakadiClient
.builder(getNakadiUrl())
.builder(getNakadiUrl(), getRequestFactory())
.withObjectMapper(objectMapper)
.withRequestFactory(getRequestFactory())
.build();
}

Expand Down Expand Up @@ -87,10 +90,10 @@ public void testSubscribe() throws IOException, EventAlreadyProcessedException {
return;
}));
testPublish();
Mockito.verify(listener, timeout(10000).times(1)).accept(anyList());
Mockito.verify(listener, timeout(10000).atLeastOnce()).accept(anyList());
}

public Listener<OrderEvent> subscriptionListener() {
return Mockito.mock(Listener.class, withSettings().verboseLogging());
return Mockito.mock(Listener.class);
}
}
Original file line number Diff line number Diff line change
@@ -1,40 +1,17 @@
package org.zalando.fahrschein.http.apache;

import org.zalando.fahrschein.http.AbstractRequestFactoryTest;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.config.ConnectionConfig;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.zalando.fahrschein.http.AbstractRequestFactoryTest;
import org.zalando.fahrschein.http.api.ContentEncoding;
import org.zalando.fahrschein.http.api.RequestFactory;

import java.util.concurrent.TimeUnit;

public class ApacheNakadiClientTest extends AbstractRequestFactoryTest {

@Override
protected RequestFactory getRequestFactory() {
final RequestConfig requestConfig = RequestConfig.custom()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I simplified the setup of Spring and Apache request factories for the tests, it's not necessarily giving us any benefit.

.setSocketTimeout(60000)
.setConnectTimeout(2000)
.setConnectionRequestTimeout(8000)
.setContentCompressionEnabled(false)
.build();

final ConnectionConfig connectionConfig = ConnectionConfig.custom()
.setBufferSize(512)
.build();

final CloseableHttpClient httpClient = HttpClients.custom()
.setDefaultRequestConfig(requestConfig)
.setDefaultConnectionConfig(connectionConfig)
.setConnectionTimeToLive(30, TimeUnit.SECONDS)
.disableAutomaticRetries()
.disableRedirectHandling()
.setMaxConnTotal(8)
.setMaxConnPerRoute(2)
.build();

return new HttpComponentsRequestFactory(httpClient);
final CloseableHttpClient httpClient = HttpClients.createDefault();
return new HttpComponentsRequestFactory(httpClient, ContentEncoding.GZIP);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package org.zalando.fahrschein.http.apache;

import org.zalando.fahrschein.IdentityAcceptEncodingRequestFactory;
import org.zalando.fahrschein.http.AbstractRequestFactoryTest;
import org.zalando.fahrschein.http.api.ContentEncoding;
import org.zalando.fahrschein.http.api.RequestFactory;

import static org.apache.http.impl.client.HttpClients.*;

public class IdentityEncodingApacheNakadiClientTest extends AbstractRequestFactoryTest {

@Override
protected RequestFactory getRequestFactory() {

return new IdentityAcceptEncodingRequestFactory(new HttpComponentsRequestFactory(createMinimal(), ContentEncoding.IDENTITY));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package org.zalando.fahrschein.http.simple;

import org.zalando.fahrschein.IdentityAcceptEncodingRequestFactory;
import org.zalando.fahrschein.http.AbstractRequestFactoryTest;
import org.zalando.fahrschein.http.api.ContentEncoding;
import org.zalando.fahrschein.http.api.RequestFactory;

public class IdentityEncodingSimpleNakadiClientTest extends AbstractRequestFactoryTest {
@Override
protected RequestFactory getRequestFactory() {
return new IdentityAcceptEncodingRequestFactory(new SimpleRequestFactory(ContentEncoding.IDENTITY));
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package org.zalando.fahrschein.http.simple;

import org.zalando.fahrschein.http.AbstractRequestFactoryTest;
import org.zalando.fahrschein.http.api.ContentEncoding;
import org.zalando.fahrschein.http.api.RequestFactory;

public class SimpleNakadiClientTest extends AbstractRequestFactoryTest {
@Override
protected RequestFactory getRequestFactory() {
return new SimpleRequestFactory();
return new SimpleRequestFactory(ContentEncoding.GZIP);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package org.zalando.fahrschein.http.spring;

import okhttp3.OkHttpClient;
import okhttp3.logging.HttpLoggingInterceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.client.OkHttp3ClientHttpRequestFactory;
import org.zalando.fahrschein.IdentityAcceptEncodingRequestFactory;
import org.zalando.fahrschein.http.AbstractRequestFactoryTest;
import org.zalando.fahrschein.http.api.ContentEncoding;
import org.zalando.fahrschein.http.api.RequestFactory;

import java.util.concurrent.TimeUnit;

public class IdentityEncodingSpringNakadiClientTest extends AbstractRequestFactoryTest {

private static final Logger logger = LoggerFactory.getLogger("okhttp3.wire");
private static final HttpLoggingInterceptor loggingInterceptor = new HttpLoggingInterceptor(logger::debug);
static {
loggingInterceptor.setLevel(HttpLoggingInterceptor.Level.HEADERS);
}

@Override
protected RequestFactory getRequestFactory() {
final OkHttpClient client = new OkHttpClient.Builder()
.addInterceptor(loggingInterceptor)
.build();

final OkHttp3ClientHttpRequestFactory clientHttpRequestFactory = new OkHttp3ClientHttpRequestFactory(client);
return new IdentityAcceptEncodingRequestFactory(new SpringRequestFactory(clientHttpRequestFactory, ContentEncoding.IDENTITY));
}

}
Original file line number Diff line number Diff line change
@@ -1,26 +1,30 @@
package org.zalando.fahrschein.http.spring;

import org.zalando.fahrschein.http.AbstractRequestFactoryTest;
import okhttp3.ConnectionPool;
import okhttp3.OkHttpClient;
import okhttp3.logging.HttpLoggingInterceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.client.OkHttp3ClientHttpRequestFactory;
import org.zalando.fahrschein.http.AbstractRequestFactoryTest;
import org.zalando.fahrschein.http.api.ContentEncoding;
import org.zalando.fahrschein.http.api.RequestFactory;

import java.util.concurrent.TimeUnit;

public class SpringNakadiClientTest extends AbstractRequestFactoryTest {

private static final Logger logger = LoggerFactory.getLogger("okhttp3.wire");
private static final HttpLoggingInterceptor loggingInterceptor = new HttpLoggingInterceptor(logger::debug);
static {
loggingInterceptor.setLevel(HttpLoggingInterceptor.Level.HEADERS);
}

@Override
protected RequestFactory getRequestFactory() {
final OkHttpClient client = new OkHttpClient.Builder()
.readTimeout(60, TimeUnit.SECONDS)
.connectTimeout(2, TimeUnit.SECONDS)
.writeTimeout(10, TimeUnit.SECONDS)
.connectionPool(new ConnectionPool(2, 5*60, TimeUnit.SECONDS))
.addInterceptor(loggingInterceptor)
.build();

final OkHttp3ClientHttpRequestFactory clientHttpRequestFactory = new OkHttp3ClientHttpRequestFactory(client);
return new SpringRequestFactory(clientHttpRequestFactory);
return new SpringRequestFactory(clientHttpRequestFactory, ContentEncoding.GZIP);
}

}
Loading