diff --git a/README.md b/README.md index 70d03e43..140db9c6 100644 --- a/README.md +++ b/README.md @@ -20,6 +20,7 @@ - No required base classes for events - Support for both high-level (subscription) and low-level APIs - Pluggable HTTP client implementations + - Gzip encoding support for publishing and consuming events ## Installation @@ -45,8 +46,9 @@ final Listener listener = events -> { } }; -// Configure client, defaults to using the high level api with ManagedCursorManger and SimpleRequestFactory -final NakadiClient nakadiClient = NakadiClient.builder(NAKADI_URI) +// Configure client, defaults to using the high level api with ManagedCursorManger, +// using the SimpleRequestFactory without compression +final NakadiClient nakadiClient = NakadiClient.builder(NAKADI_URI, new SimpleRequestFactory(ContentEncoding.IDENTITY)) .withAccessTokenProvider(new ZignAccessTokenProvider()) .build(); @@ -98,7 +100,7 @@ final DataSource dataSource = new HikariDataSource(hikariConfig); final CursorManager cursorManager = new JdbcCursorManager(dataSource, "fahrschein-demo"); -final NakadiClient nakadiClient = NakadiClient.builder(NAKADI_URI) +final NakadiClient nakadiClient = NakadiClient.builder(NAKADI_URI, new SimpleRequestFactory(ContentEncoding.IDENTITY)) .withAccessTokenProvider(new ZignAccessTokenProvider()) .withCursorManager(cursorManager) .build(); @@ -213,10 +215,9 @@ final CloseableHttpClient httpClient = HttpClients.custom() .setMaxConnPerRoute(2) .build(); -final RequestFactory requestFactory = new HttpComponentsRequestFactory(httpClient); +final RequestFactory requestFactory = new HttpComponentsRequestFactory(httpClient, ContentEncoding.GZIP); -final NakadiClient nakadiClient = NakadiClient.builder(NAKADI_URI) - .withRequestFactory(requestFactory) +final NakadiClient nakadiClient = NakadiClient.builder(NAKADI_URI, requestFactory) .withAccessTokenProvider(new ZignAccessTokenProvider()) .build(); ``` @@ -236,17 +237,33 @@ Example using OkHttp 3.x: ```java final ClientHttpRequestFactory clientHttpRequestFactory = new OkHttp3ClientHttpRequestFactory(); -final RequestFactory requestFactory = new SpringRequestFactory(clientHttpRequestFactory); +final RequestFactory requestFactory = new SpringRequestFactory(clientHttpRequestFactory, ContentEncoding.GZIP); -final NakadiClient nakadiClient = NakadiClient.builder(NAKADI_URI) - .withRequestFactory(requestFactory) +final NakadiClient nakadiClient = NakadiClient.builder(NAKADI_URI, requestFactory) .withAccessTokenProvider(new ZignAccessTokenProvider()) .build(); ``` -**Note:** The implementations from spring framework don't handle closing of streams as expected. They will try to consume remaining data, which will usually time out when nakadi does not receive a commit. +**Note:** The implementations from the Spring framework don't handle closing of streams as expected. They will try to consume remaining data, which will usually time out when nakadi does not receive a commit. +## Content-Compression + +Fahrschein handles content compression transparently to the API consumer, and mostly independently of the actual HTTP +client implementation. Since version `0.20.0` it can be enabled to both compress HTTP POST bodies when event +publishing, and requesting payload compression from Nakadi when consuming events. + +### Consuming + +For event consumption the underlying HTTP client implementations send `Accept-Encoding` headers, indicating their supported compression algorithms. +At the time of writing, all tested client implementations default to `gzip` compression. If this is undesired, wrap your +RequestFactory into a `IdentityAcceptEncodingRequestFactory`, which sets the `Accept-Encoding` header to `identity`. + +### Publishing + +For event publishing, the `Request` body can also get gzip-encoded by Fahrschein, if enabled when building the RequestFactory. +For this, you need to pass `ContentEncoding.GZIP`, or if compression is undesired, pass `ContentEncoding.IDENTITY`. +In the future, we may support other encoding formats, like Zstandard. ## Fahrschein compared to other Nakadi client libraries diff --git a/fahrschein-e2e-test/pom.xml b/fahrschein-e2e-test/pom.xml index 264f5ee6..fc81563a 100644 --- a/fahrschein-e2e-test/pom.xml +++ b/fahrschein-e2e-test/pom.xml @@ -19,31 +19,43 @@ org.zalando fahrschein ${project.version} + test org.zalando fahrschein-http-api ${project.version} + test org.zalando fahrschein-http-apache ${project.version} + test org.zalando fahrschein-http-spring ${project.version} + test com.squareup.okhttp3 okhttp 4.9.3 + test + + + com.squareup.okhttp3 + logging-interceptor + 4.9.3 + test org.springframework spring-web ${version.spring} + test org.springframework @@ -63,29 +75,32 @@ org.slf4j slf4j-api ${version.slf4j} + test org.apache.logging.log4j log4j-api ${version.log4j} + test org.apache.logging.log4j log4j-core ${version.log4j} + test org.apache.logging.log4j log4j-slf4j-impl ${version.log4j} + test org.slf4j jcl-over-slf4j ${version.slf4j} + test - - junit junit diff --git a/fahrschein-e2e-test/src/test/java/org/zalando/fahrschein/e2e/NakadiTestWithDockerCompose.java b/fahrschein-e2e-test/src/test/java/org/zalando/fahrschein/e2e/NakadiTestWithDockerCompose.java index d77b0c73..82c30cad 100644 --- a/fahrschein-e2e-test/src/test/java/org/zalando/fahrschein/e2e/NakadiTestWithDockerCompose.java +++ b/fahrschein-e2e-test/src/test/java/org/zalando/fahrschein/e2e/NakadiTestWithDockerCompose.java @@ -12,6 +12,7 @@ import org.testcontainers.containers.DockerComposeContainer; import org.testcontainers.containers.output.OutputFrame; import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.Wait; import java.io.File; import java.io.IOException; @@ -20,6 +21,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.time.Duration; import java.util.function.Consumer; import java.util.stream.Stream; @@ -42,7 +44,8 @@ public abstract class NakadiTestWithDockerCompose { .withExposedService("nakadi_postgres_1", 5432) .withExposedService("zookeeper_1", 2181) .withExposedService("kafka_1", 9092) - .withExposedService("nakadi_1", 8080) + .withExposedService("nakadi_1", 8080, + Wait.forListeningPort().withStartupTimeout(Duration.ofSeconds(180))) .withLogConsumer("nakadi_1", LOG_CONSUMER); compose.start(); } diff --git a/fahrschein-e2e-test/src/test/java/org/zalando/fahrschein/http/AbstractRequestFactoryTest.java b/fahrschein-e2e-test/src/test/java/org/zalando/fahrschein/http/AbstractRequestFactoryTest.java index 7244edbf..1a8d7b64 100644 --- a/fahrschein-e2e-test/src/test/java/org/zalando/fahrschein/http/AbstractRequestFactoryTest.java +++ b/fahrschein-e2e-test/src/test/java/org/zalando/fahrschein/http/AbstractRequestFactoryTest.java @@ -1,6 +1,7 @@ package org.zalando.fahrschein.http; import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.PropertyNamingStrategy; import com.fasterxml.jackson.databind.SerializationFeature; @@ -34,6 +35,9 @@ public abstract class AbstractRequestFactoryTest extends NakadiTestWithDockerCom objectMapper.registerModule(new Jdk8Module()); objectMapper.setPropertyNamingStrategy(PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES); objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL); + // After a Nakadi-Docker upgrade, check if the response metadata changed + // by enabling deserialization failure on unknown properties. + objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); } @@ -42,9 +46,8 @@ public abstract class AbstractRequestFactoryTest extends NakadiTestWithDockerCom @Before public void setUpNakadiClient() { nakadiClient = NakadiClient - .builder(getNakadiUrl()) + .builder(getNakadiUrl(), getRequestFactory()) .withObjectMapper(objectMapper) - .withRequestFactory(getRequestFactory()) .build(); } @@ -87,10 +90,10 @@ public void testSubscribe() throws IOException, EventAlreadyProcessedException { return; })); testPublish(); - Mockito.verify(listener, timeout(10000).times(1)).accept(anyList()); + Mockito.verify(listener, timeout(10000).atLeastOnce()).accept(anyList()); } public Listener subscriptionListener() { - return Mockito.mock(Listener.class, withSettings().verboseLogging()); + return Mockito.mock(Listener.class); } } diff --git a/fahrschein-e2e-test/src/test/java/org/zalando/fahrschein/http/apache/ApacheNakadiClientTest.java b/fahrschein-e2e-test/src/test/java/org/zalando/fahrschein/http/apache/ApacheNakadiClientTest.java index 6fed9067..39572a4c 100644 --- a/fahrschein-e2e-test/src/test/java/org/zalando/fahrschein/http/apache/ApacheNakadiClientTest.java +++ b/fahrschein-e2e-test/src/test/java/org/zalando/fahrschein/http/apache/ApacheNakadiClientTest.java @@ -1,40 +1,17 @@ package org.zalando.fahrschein.http.apache; -import org.zalando.fahrschein.http.AbstractRequestFactoryTest; -import org.apache.http.client.config.RequestConfig; -import org.apache.http.config.ConnectionConfig; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; +import org.zalando.fahrschein.http.AbstractRequestFactoryTest; +import org.zalando.fahrschein.http.api.ContentEncoding; import org.zalando.fahrschein.http.api.RequestFactory; -import java.util.concurrent.TimeUnit; - public class ApacheNakadiClientTest extends AbstractRequestFactoryTest { @Override protected RequestFactory getRequestFactory() { - final RequestConfig requestConfig = RequestConfig.custom() - .setSocketTimeout(60000) - .setConnectTimeout(2000) - .setConnectionRequestTimeout(8000) - .setContentCompressionEnabled(false) - .build(); - - final ConnectionConfig connectionConfig = ConnectionConfig.custom() - .setBufferSize(512) - .build(); - - final CloseableHttpClient httpClient = HttpClients.custom() - .setDefaultRequestConfig(requestConfig) - .setDefaultConnectionConfig(connectionConfig) - .setConnectionTimeToLive(30, TimeUnit.SECONDS) - .disableAutomaticRetries() - .disableRedirectHandling() - .setMaxConnTotal(8) - .setMaxConnPerRoute(2) - .build(); - - return new HttpComponentsRequestFactory(httpClient); + final CloseableHttpClient httpClient = HttpClients.createDefault(); + return new HttpComponentsRequestFactory(httpClient, ContentEncoding.GZIP); } } diff --git a/fahrschein-e2e-test/src/test/java/org/zalando/fahrschein/http/apache/IdentityEncodingApacheNakadiClientTest.java b/fahrschein-e2e-test/src/test/java/org/zalando/fahrschein/http/apache/IdentityEncodingApacheNakadiClientTest.java new file mode 100644 index 00000000..4490c8ac --- /dev/null +++ b/fahrschein-e2e-test/src/test/java/org/zalando/fahrschein/http/apache/IdentityEncodingApacheNakadiClientTest.java @@ -0,0 +1,18 @@ +package org.zalando.fahrschein.http.apache; + +import org.zalando.fahrschein.IdentityAcceptEncodingRequestFactory; +import org.zalando.fahrschein.http.AbstractRequestFactoryTest; +import org.zalando.fahrschein.http.api.ContentEncoding; +import org.zalando.fahrschein.http.api.RequestFactory; + +import static org.apache.http.impl.client.HttpClients.*; + +public class IdentityEncodingApacheNakadiClientTest extends AbstractRequestFactoryTest { + + @Override + protected RequestFactory getRequestFactory() { + + return new IdentityAcceptEncodingRequestFactory(new HttpComponentsRequestFactory(createMinimal(), ContentEncoding.IDENTITY)); + } + +} diff --git a/fahrschein-e2e-test/src/test/java/org/zalando/fahrschein/http/simple/IdentityEncodingSimpleNakadiClientTest.java b/fahrschein-e2e-test/src/test/java/org/zalando/fahrschein/http/simple/IdentityEncodingSimpleNakadiClientTest.java new file mode 100644 index 00000000..2cb76075 --- /dev/null +++ b/fahrschein-e2e-test/src/test/java/org/zalando/fahrschein/http/simple/IdentityEncodingSimpleNakadiClientTest.java @@ -0,0 +1,13 @@ +package org.zalando.fahrschein.http.simple; + +import org.zalando.fahrschein.IdentityAcceptEncodingRequestFactory; +import org.zalando.fahrschein.http.AbstractRequestFactoryTest; +import org.zalando.fahrschein.http.api.ContentEncoding; +import org.zalando.fahrschein.http.api.RequestFactory; + +public class IdentityEncodingSimpleNakadiClientTest extends AbstractRequestFactoryTest { + @Override + protected RequestFactory getRequestFactory() { + return new IdentityAcceptEncodingRequestFactory(new SimpleRequestFactory(ContentEncoding.IDENTITY)); + } +} diff --git a/fahrschein-e2e-test/src/test/java/org/zalando/fahrschein/http/simple/SimpleNakadiClientTest.java b/fahrschein-e2e-test/src/test/java/org/zalando/fahrschein/http/simple/SimpleNakadiClientTest.java index 112bd55a..5b3e693b 100644 --- a/fahrschein-e2e-test/src/test/java/org/zalando/fahrschein/http/simple/SimpleNakadiClientTest.java +++ b/fahrschein-e2e-test/src/test/java/org/zalando/fahrschein/http/simple/SimpleNakadiClientTest.java @@ -1,11 +1,12 @@ package org.zalando.fahrschein.http.simple; import org.zalando.fahrschein.http.AbstractRequestFactoryTest; +import org.zalando.fahrschein.http.api.ContentEncoding; import org.zalando.fahrschein.http.api.RequestFactory; public class SimpleNakadiClientTest extends AbstractRequestFactoryTest { @Override protected RequestFactory getRequestFactory() { - return new SimpleRequestFactory(); + return new SimpleRequestFactory(ContentEncoding.GZIP); } } diff --git a/fahrschein-e2e-test/src/test/java/org/zalando/fahrschein/http/spring/IdentityEncodingSpringNakadiClientTest.java b/fahrschein-e2e-test/src/test/java/org/zalando/fahrschein/http/spring/IdentityEncodingSpringNakadiClientTest.java new file mode 100644 index 00000000..f1f16d33 --- /dev/null +++ b/fahrschein-e2e-test/src/test/java/org/zalando/fahrschein/http/spring/IdentityEncodingSpringNakadiClientTest.java @@ -0,0 +1,33 @@ +package org.zalando.fahrschein.http.spring; + +import okhttp3.OkHttpClient; +import okhttp3.logging.HttpLoggingInterceptor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.client.OkHttp3ClientHttpRequestFactory; +import org.zalando.fahrschein.IdentityAcceptEncodingRequestFactory; +import org.zalando.fahrschein.http.AbstractRequestFactoryTest; +import org.zalando.fahrschein.http.api.ContentEncoding; +import org.zalando.fahrschein.http.api.RequestFactory; + +import java.util.concurrent.TimeUnit; + +public class IdentityEncodingSpringNakadiClientTest extends AbstractRequestFactoryTest { + + private static final Logger logger = LoggerFactory.getLogger("okhttp3.wire"); + private static final HttpLoggingInterceptor loggingInterceptor = new HttpLoggingInterceptor(logger::debug); + static { + loggingInterceptor.setLevel(HttpLoggingInterceptor.Level.HEADERS); + } + + @Override + protected RequestFactory getRequestFactory() { + final OkHttpClient client = new OkHttpClient.Builder() + .addInterceptor(loggingInterceptor) + .build(); + + final OkHttp3ClientHttpRequestFactory clientHttpRequestFactory = new OkHttp3ClientHttpRequestFactory(client); + return new IdentityAcceptEncodingRequestFactory(new SpringRequestFactory(clientHttpRequestFactory, ContentEncoding.IDENTITY)); + } + +} diff --git a/fahrschein-e2e-test/src/test/java/org/zalando/fahrschein/http/spring/SpringNakadiClientTest.java b/fahrschein-e2e-test/src/test/java/org/zalando/fahrschein/http/spring/SpringNakadiClientTest.java index d06e4507..fcbfa2b3 100644 --- a/fahrschein-e2e-test/src/test/java/org/zalando/fahrschein/http/spring/SpringNakadiClientTest.java +++ b/fahrschein-e2e-test/src/test/java/org/zalando/fahrschein/http/spring/SpringNakadiClientTest.java @@ -1,26 +1,30 @@ package org.zalando.fahrschein.http.spring; -import org.zalando.fahrschein.http.AbstractRequestFactoryTest; -import okhttp3.ConnectionPool; import okhttp3.OkHttpClient; +import okhttp3.logging.HttpLoggingInterceptor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.http.client.OkHttp3ClientHttpRequestFactory; +import org.zalando.fahrschein.http.AbstractRequestFactoryTest; +import org.zalando.fahrschein.http.api.ContentEncoding; import org.zalando.fahrschein.http.api.RequestFactory; -import java.util.concurrent.TimeUnit; - public class SpringNakadiClientTest extends AbstractRequestFactoryTest { + private static final Logger logger = LoggerFactory.getLogger("okhttp3.wire"); + private static final HttpLoggingInterceptor loggingInterceptor = new HttpLoggingInterceptor(logger::debug); + static { + loggingInterceptor.setLevel(HttpLoggingInterceptor.Level.HEADERS); + } + @Override protected RequestFactory getRequestFactory() { final OkHttpClient client = new OkHttpClient.Builder() - .readTimeout(60, TimeUnit.SECONDS) - .connectTimeout(2, TimeUnit.SECONDS) - .writeTimeout(10, TimeUnit.SECONDS) - .connectionPool(new ConnectionPool(2, 5*60, TimeUnit.SECONDS)) + .addInterceptor(loggingInterceptor) .build(); final OkHttp3ClientHttpRequestFactory clientHttpRequestFactory = new OkHttp3ClientHttpRequestFactory(client); - return new SpringRequestFactory(clientHttpRequestFactory); + return new SpringRequestFactory(clientHttpRequestFactory, ContentEncoding.GZIP); } } diff --git a/fahrschein-e2e-test/src/test/resources/docker-compose.yaml b/fahrschein-e2e-test/src/test/resources/docker-compose.yaml index b1939175..4c0ebf24 100644 --- a/fahrschein-e2e-test/src/test/resources/docker-compose.yaml +++ b/fahrschein-e2e-test/src/test/resources/docker-compose.yaml @@ -1,16 +1,16 @@ version: '3' services: - nakadi-ui: - image: nakadi/nakadi-ui:latest - ports: - - 3000 - depends_on: - - nakadi - environment: - - NAKADI_API_URL=http://nakadi:8080 +# nakadi-ui: +# image: nakadi/nakadi-ui:master-292 +# ports: +# - 3000 +# depends_on: +# - nakadi +# environment: +# - NAKADI_API_URL=http://nakadi:8080 nakadi: - image: adyach/nakadi-docker:latest + image: ghcr.io/adyach/nakadi-docker/nakadi:3.4 ports: - 8080:8080 - 7979 @@ -25,7 +25,7 @@ services: - SPRING_DATASOURCE_URL=jdbc:postgresql://nakadi_postgres:5432/local_nakadi_db nakadi_postgres: - image: adyach/nakadi-postgres:latest + image: ghcr.io/adyach/nakadi-docker/postgres:13.6 ports: - 5432 environment: @@ -34,12 +34,12 @@ services: POSTGRES_DB: local_nakadi_db zookeeper: - image: wurstmeister/zookeeper:3.4.6 + image: ghcr.io/adyach/nakadi-docker/zookeeper:3.6.3 ports: - 2181 kafka: - image: wurstmeister/kafka:2.12-2.5.0 + image: ghcr.io/adyach/kafka-docker/kafka-docker:2.7.0 ports: - 9092 depends_on: diff --git a/fahrschein-e2e-test/src/test/resources/log4j2.xml b/fahrschein-e2e-test/src/test/resources/log4j2.xml new file mode 100644 index 00000000..e765ab35 --- /dev/null +++ b/fahrschein-e2e-test/src/test/resources/log4j2.xml @@ -0,0 +1,32 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/fahrschein-example/src/main/java/org/zalando/fahrschein/example/Main.java b/fahrschein-example/src/main/java/org/zalando/fahrschein/example/Main.java index 036c253a..abc70939 100644 --- a/fahrschein-example/src/main/java/org/zalando/fahrschein/example/Main.java +++ b/fahrschein-example/src/main/java/org/zalando/fahrschein/example/Main.java @@ -31,7 +31,9 @@ import org.zalando.fahrschein.example.domain.SalesOrder; import org.zalando.fahrschein.example.domain.SalesOrderPlaced; import org.zalando.fahrschein.http.apache.HttpComponentsRequestFactory; +import org.zalando.fahrschein.http.api.ContentEncoding; import org.zalando.fahrschein.http.api.RequestFactory; +import org.zalando.fahrschein.http.simple.SimpleRequestFactory; import org.zalando.fahrschein.http.spring.SpringRequestFactory; import org.zalando.fahrschein.inmemory.InMemoryCursorManager; import org.zalando.fahrschein.jdbc.JdbcCursorManager; @@ -121,7 +123,7 @@ private static void subscriptionMultipleEvents(ObjectMapper objectMapper) throws } }; - final NakadiClient nakadiClient = NakadiClient.builder(NAKADI_URI) + final NakadiClient nakadiClient = NakadiClient.builder(NAKADI_URI, new SimpleRequestFactory(ContentEncoding.IDENTITY)) .withAccessTokenProvider(new ZignAccessTokenProvider()) .build(); @@ -149,7 +151,7 @@ private static void subscriptionListenWithPositionCursors(ObjectMapper objectMap new Cursor("6", "000000000000109100", "sales-order-service.order-placed"), new Cursor("7", "000000000000109146", "sales-order-service.order-placed")); - final NakadiClient nakadiClient = NakadiClient.builder(NAKADI_URI) + final NakadiClient nakadiClient = NakadiClient.builder(NAKADI_URI, new SimpleRequestFactory(ContentEncoding.IDENTITY)) .withAccessTokenProvider(new ZignAccessTokenProvider()) .build(); @@ -165,7 +167,7 @@ private static void subscriptionListenWithPositionCursors(ObjectMapper objectMap private static void subscriptionListen(ObjectMapper objectMapper, Listener listener) throws IOException { - final NakadiClient nakadiClient = NakadiClient.builder(NAKADI_URI) + final NakadiClient nakadiClient = NakadiClient.builder(NAKADI_URI, new SimpleRequestFactory(ContentEncoding.IDENTITY)) .withAccessTokenProvider(new ZignAccessTokenProvider()) .build(); @@ -201,10 +203,9 @@ private static void subscriptionListenHttpComponents(ObjectMapper objectMapper, .setMaxConnPerRoute(2) .build(); - final RequestFactory requestFactory = new HttpComponentsRequestFactory(httpClient); + final RequestFactory requestFactory = new HttpComponentsRequestFactory(httpClient, ContentEncoding.IDENTITY); - final NakadiClient nakadiClient = NakadiClient.builder(NAKADI_URI) - .withRequestFactory(requestFactory) + final NakadiClient nakadiClient = NakadiClient.builder(NAKADI_URI, requestFactory) .withAccessTokenProvider(new ZignAccessTokenProvider()) .build(); @@ -231,10 +232,9 @@ private static void subscriptionListenSpringAdapter(ObjectMapper objectMapper, L .build(); final OkHttp3ClientHttpRequestFactory clientHttpRequestFactory = new OkHttp3ClientHttpRequestFactory(client); - final SpringRequestFactory requestFactory = new SpringRequestFactory(clientHttpRequestFactory); + final SpringRequestFactory requestFactory = new SpringRequestFactory(clientHttpRequestFactory, ContentEncoding.IDENTITY); - final NakadiClient nakadiClient = NakadiClient.builder(NAKADI_URI) - .withRequestFactory(requestFactory) + final NakadiClient nakadiClient = NakadiClient.builder(NAKADI_URI, requestFactory) .withAccessTokenProvider(new ZignAccessTokenProvider()) .build(); @@ -252,7 +252,7 @@ private static void subscriptionListenSpringAdapter(ObjectMapper objectMapper, L private static void simpleListen(ObjectMapper objectMapper, Listener listener) throws IOException { final InMemoryCursorManager cursorManager = new InMemoryCursorManager(); - final NakadiClient nakadiClient = NakadiClient.builder(NAKADI_URI) + final NakadiClient nakadiClient = NakadiClient.builder(NAKADI_URI, new SimpleRequestFactory(ContentEncoding.IDENTITY)) .withAccessTokenProvider(new ZignAccessTokenProvider()) .withCursorManager(cursorManager) .build(); @@ -276,7 +276,7 @@ private static void persistentListen(ObjectMapper objectMapper, Listener listener) throws IOException { - final NakadiClient nakadiClient = NakadiClient.builder(NAKADI_URI) + final NakadiClient nakadiClient = NakadiClient.builder(NAKADI_URI, new SimpleRequestFactory(ContentEncoding.IDENTITY)) .withAccessTokenProvider(new ZignAccessTokenProvider()) .build(); @@ -328,7 +328,7 @@ private static void multiInstanceListen(ObjectMapper objectMapper, Listener + pattern="%date {%level} [%thread] [%logger] %message%n"/> diff --git a/fahrschein-http-apache/pom.xml b/fahrschein-http-apache/pom.xml index 68848e87..4604b59f 100644 --- a/fahrschein-http-apache/pom.xml +++ b/fahrschein-http-apache/pom.xml @@ -14,17 +14,6 @@ fahrschein-http-apache Fahrschein HTTP Client using Apache HttpComponents - - - - maven-surefire-plugin - - false - - - - - org.zalando @@ -36,6 +25,18 @@ httpclient 4.5.13 + + org.mockito + mockito-core + ${version.mockito} + test + + + junit + junit + ${version.junit} + test + - \ No newline at end of file + diff --git a/fahrschein-http-apache/src/main/java/org/zalando/fahrschein/http/apache/HttpComponentsRequest.java b/fahrschein-http-apache/src/main/java/org/zalando/fahrschein/http/apache/HttpComponentsRequest.java index 3ede52b4..b4f3bb41 100644 --- a/fahrschein-http-apache/src/main/java/org/zalando/fahrschein/http/apache/HttpComponentsRequest.java +++ b/fahrschein-http-apache/src/main/java/org/zalando/fahrschein/http/apache/HttpComponentsRequest.java @@ -2,11 +2,13 @@ import org.apache.http.HttpEntity; import org.apache.http.HttpEntityEnclosingRequest; +import org.apache.http.HttpHeaders; import org.apache.http.HttpResponse; import org.apache.http.client.HttpClient; import org.apache.http.client.methods.HttpUriRequest; import org.apache.http.entity.ByteArrayEntity; import org.apache.http.protocol.HTTP; +import org.zalando.fahrschein.http.api.ContentEncoding; import org.zalando.fahrschein.http.api.Headers; import org.zalando.fahrschein.http.api.HeadersImpl; import org.zalando.fahrschein.http.api.Request; @@ -16,7 +18,9 @@ import java.io.IOException; import java.io.OutputStream; import java.net.URI; +import java.util.Arrays; import java.util.List; +import java.util.zip.GZIPOutputStream; /** * {@link Request} implementation based on Apache HttpComponents HttpClient. @@ -33,14 +37,18 @@ final class HttpComponentsRequest implements Request { private final HttpClient httpClient; private final HttpUriRequest httpRequest; + private final ContentEncoding contentEncoding; private final Headers headers; private ByteArrayOutputStream bufferedOutput; private boolean executed; - HttpComponentsRequest(HttpClient client, HttpUriRequest request) { + private static final List writeMethods = Arrays.asList("POST", "PATCH", "PUT"); + + HttpComponentsRequest(HttpClient client, HttpUriRequest request, ContentEncoding contentEncoding) { this.httpClient = client; this.httpRequest = request; + this.contentEncoding = contentEncoding; this.headers = new HeadersImpl(); } @@ -93,6 +101,10 @@ public final OutputStream getBody() throws IOException { assertNotExecuted(); if (this.bufferedOutput == null) { this.bufferedOutput = new ByteArrayOutputStream(1024); + if (writeMethods.contains(getMethod()) && ContentEncoding.GZIP.equals(this.contentEncoding)) { + this.httpRequest.setHeader(HttpHeaders.CONTENT_ENCODING, this.contentEncoding.value()); + return new GZIPOutputStream(this.bufferedOutput); + } } return this.bufferedOutput; } diff --git a/fahrschein-http-apache/src/main/java/org/zalando/fahrschein/http/apache/HttpComponentsRequestFactory.java b/fahrschein-http-apache/src/main/java/org/zalando/fahrschein/http/apache/HttpComponentsRequestFactory.java index 978c5c22..76a260c5 100644 --- a/fahrschein-http-apache/src/main/java/org/zalando/fahrschein/http/apache/HttpComponentsRequestFactory.java +++ b/fahrschein-http-apache/src/main/java/org/zalando/fahrschein/http/apache/HttpComponentsRequestFactory.java @@ -10,6 +10,7 @@ import org.apache.http.client.methods.HttpPut; import org.apache.http.client.methods.HttpTrace; import org.apache.http.client.methods.HttpUriRequest; +import org.zalando.fahrschein.http.api.ContentEncoding; import org.zalando.fahrschein.http.api.Request; import org.zalando.fahrschein.http.api.RequestFactory; @@ -35,13 +36,16 @@ public class HttpComponentsRequestFactory implements RequestFactory { private final HttpClient httpClient; + private final ContentEncoding contentEncoding; /** * Create a new instance of the {@code HttpComponentsRequestFactory} * with the given {@link HttpClient} instance. * @param httpClient the HttpClient instance to use for this request factory + * @param contentEncoding content encoding for request payloads. */ - public HttpComponentsRequestFactory(HttpClient httpClient) { + public HttpComponentsRequestFactory(HttpClient httpClient, ContentEncoding contentEncoding) { + this.contentEncoding = contentEncoding; if (httpClient == null) { throw new IllegalArgumentException("HttpClient must not be null"); } @@ -52,7 +56,7 @@ public HttpComponentsRequestFactory(HttpClient httpClient) { public Request createRequest(URI uri, String httpMethod) throws IOException { final HttpUriRequest httpRequest = createHttpUriRequest(httpMethod, uri); - return new HttpComponentsRequest(httpClient, httpRequest); + return new HttpComponentsRequest(httpClient, httpRequest, contentEncoding); } /** diff --git a/fahrschein-http-apache/src/test/java/org/zalando/fahrschein/http/apache/HttpComponentsRequestFactoryTest.java b/fahrschein-http-apache/src/test/java/org/zalando/fahrschein/http/apache/HttpComponentsRequestFactoryTest.java new file mode 100644 index 00000000..e34c0be0 --- /dev/null +++ b/fahrschein-http-apache/src/test/java/org/zalando/fahrschein/http/apache/HttpComponentsRequestFactoryTest.java @@ -0,0 +1,200 @@ +package org.zalando.fahrschein.http.apache; + +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpHandler; +import com.sun.net.httpserver.HttpServer; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; +import org.zalando.fahrschein.http.api.ContentEncoding; +import org.zalando.fahrschein.http.api.ContentType; +import org.zalando.fahrschein.http.api.Request; +import org.zalando.fahrschein.http.api.RequestFactory; +import org.zalando.fahrschein.http.api.Response; + +import java.io.BufferedReader; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.SocketTimeoutException; +import java.net.URI; +import java.util.Arrays; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.stream.Collectors; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertEquals; + +@RunWith(MockitoJUnitRunner.class) +public class HttpComponentsRequestFactoryTest { + + static HttpServer server; + static URI serverAddress; + + @BeforeClass + public static void startServer() throws IOException { + server = HttpServer.create(new InetSocketAddress("localhost", 0), 1); + serverAddress = URI.create("http://localhost:" + server.getAddress().getPort()); + ExecutorService executor = Executors.newSingleThreadExecutor(); + server.setExecutor(executor); + server.start(); + } + + @Captor + public ArgumentCaptor exchangeCaptor; + + @Test + public void testGzippedResponseBody() throws IOException { + // given + String expectedResponse = "{}"; + HttpHandler spy = Mockito.spy(new GzippedResponseContentHandler(expectedResponse)); + server.createContext("/gzipped", spy); + + // when + final RequestFactory f = defaultRequestFactory(ContentEncoding.IDENTITY); + Request r = f.createRequest(serverAddress.resolve("/gzipped"), "GET"); + Response executed = r.execute(); + String actualResponse = readStream(executed.getBody()); + + // then + Mockito.verify(spy).handle(exchangeCaptor.capture()); + HttpExchange capturedArgument = exchangeCaptor.getValue(); + assertThat("accept-encoding header", capturedArgument.getRequestHeaders().get("accept-encoding"), equalTo(Arrays.asList("gzip,deflate"))); + assertThat("no content-encoding header", capturedArgument.getRequestHeaders().get("content-encoding"), nullValue()); + assertEquals(URI.create("/gzipped"), capturedArgument.getRequestURI()); + assertEquals(expectedResponse, actualResponse); + } + + @Test(expected = SocketTimeoutException.class) + public void testTimeout() throws IOException { + // given + server.createContext("/timeout", exchange -> { + try { + Thread.sleep(10l); + exchange.sendResponseHeaders(201, 0); + } catch (InterruptedException e) { } + }); + + // when + RequestConfig requestConfig = RequestConfig.custom().setSocketTimeout(1).build(); + final CloseableHttpClient httpClient = HttpClients.custom().setDefaultRequestConfig(requestConfig).build(); + RequestFactory f = new HttpComponentsRequestFactory(httpClient, ContentEncoding.GZIP); + Request r = f.createRequest(serverAddress.resolve("/timeout"), "GET"); + r.execute(); + } + + @Test + public void testGzippedRequestBody() throws IOException { + // given + String requestBody = "{}"; + String responseBody = "{}"; + SimpleRequestResponseContentHandler spy = Mockito.spy(new SimpleRequestResponseContentHandler(responseBody)); + server.createContext("/gzipped-post", spy); + + // when + RequestFactory f = defaultRequestFactory(ContentEncoding.GZIP); + + Request r = f.createRequest(serverAddress.resolve("/gzipped-post"), "POST"); + r.getHeaders().setContentType(ContentType.APPLICATION_JSON); + try (final OutputStream body = r.getBody()) { + body.write(requestBody.getBytes()); + } + Response executed = r.execute(); + String actualResponse = readStream(executed.getBody()); + + // then + Mockito.verify(spy).handle(exchangeCaptor.capture()); + HttpExchange capturedArgument = exchangeCaptor.getValue(); + assertEquals("POST", capturedArgument.getRequestMethod()); + assertEquals(URI.create("/gzipped-post"), capturedArgument.getRequestURI()); + assertThat("content-encoding header", capturedArgument.getRequestHeaders().get("content-encoding"), equalTo(Arrays.asList("gzip"))); + assertEquals(requestBody, spy.getRequestBody()); + assertEquals(responseBody, actualResponse); + } + + private RequestFactory defaultRequestFactory(ContentEncoding contentEncoding) { + return new HttpComponentsRequestFactory(HttpClients.createDefault(), contentEncoding); + } + + static String readStream(InputStream stream) throws IOException { + String res = new BufferedReader( + new InputStreamReader(stream, UTF_8)) + .lines() + .collect(Collectors.joining("\n")); + stream.close(); + return res; + } + + private static class SimpleRequestResponseContentHandler implements HttpHandler { + + private String requestBody; + private final String responseBody; + + SimpleRequestResponseContentHandler(String responseBody) { + this.responseBody = responseBody; + } + + public String getRequestBody() { + return requestBody; + } + + @Override + public void handle(HttpExchange exchange) throws IOException { + + try { + if (exchange.getRequestHeaders().containsKey("Content-Encoding") && exchange.getRequestHeaders().get("Content-Encoding").contains("gzip")) { + requestBody = readStream(new GZIPInputStream(exchange.getRequestBody())); + } else { + requestBody = readStream(exchange.getRequestBody()); + } + } catch (Exception e) { + e.printStackTrace(); + } + + byte[] bytes = responseBody.getBytes(UTF_8); + exchange.sendResponseHeaders(200, bytes.length); + OutputStream responseBody = exchange.getResponseBody(); + responseBody.write(bytes); + responseBody.close(); + } + } + + private static class GzippedResponseContentHandler implements HttpHandler { + + private final byte[] rawResponse; + + GzippedResponseContentHandler(String response) throws IOException { + byte[] stringResponse = response.getBytes(UTF_8); + ByteArrayOutputStream byteStream = new ByteArrayOutputStream(); + GZIPOutputStream zipStream = new GZIPOutputStream(byteStream); + zipStream.write(stringResponse); + zipStream.close(); + this.rawResponse = byteStream.toByteArray(); + } + + @Override + public void handle(HttpExchange exchange) throws IOException { + exchange.getResponseHeaders().set("Content-Encoding", "gzip"); + exchange.sendResponseHeaders(200, rawResponse.length); + OutputStream responseBody = exchange.getResponseBody(); + responseBody.write(rawResponse); + responseBody.close(); + } + } +} diff --git a/fahrschein-http-api/pom.xml b/fahrschein-http-api/pom.xml index c8a6371b..d8c132d0 100644 --- a/fahrschein-http-api/pom.xml +++ b/fahrschein-http-api/pom.xml @@ -14,17 +14,6 @@ fahrschein-http-api Fahrschein HTTP API - - - - maven-surefire-plugin - - false - - - - - com.google.code.findbugs @@ -40,4 +29,4 @@ - \ No newline at end of file + diff --git a/fahrschein-http-api/src/main/java/org/zalando/fahrschein/http/api/ContentEncoding.java b/fahrschein-http-api/src/main/java/org/zalando/fahrschein/http/api/ContentEncoding.java new file mode 100644 index 00000000..1d39e3d1 --- /dev/null +++ b/fahrschein-http-api/src/main/java/org/zalando/fahrschein/http/api/ContentEncoding.java @@ -0,0 +1,15 @@ +package org.zalando.fahrschein.http.api; + +public enum ContentEncoding { + IDENTITY("identity"), GZIP("gzip"); + + private final String value; + + ContentEncoding(String value) { + this.value = value; + } + + public String value() { + return value; + } +} diff --git a/fahrschein-http-api/src/main/java/org/zalando/fahrschein/http/api/RequestFactory.java b/fahrschein-http-api/src/main/java/org/zalando/fahrschein/http/api/RequestFactory.java index 6c3ddb2d..b5496b4d 100644 --- a/fahrschein-http-api/src/main/java/org/zalando/fahrschein/http/api/RequestFactory.java +++ b/fahrschein-http-api/src/main/java/org/zalando/fahrschein/http/api/RequestFactory.java @@ -5,6 +5,13 @@ public interface RequestFactory { + /** + * Creates a new request using the underlying RequestFactory implementation. + * @param uri request target URI + * @param method request method (GET, POST, ...) + * @return the request + * @throws IOException in case of I/O issues while trying to create the request. + */ Request createRequest(URI uri, String method) throws IOException; } diff --git a/fahrschein-http-simple/pom.xml b/fahrschein-http-simple/pom.xml index 5a7001a9..ea99cdf5 100644 --- a/fahrschein-http-simple/pom.xml +++ b/fahrschein-http-simple/pom.xml @@ -14,22 +14,23 @@ fahrschein-http-simple Fahrschein HTTP Client using HttpURLConnection - - - - maven-surefire-plugin - - false - - - - - org.zalando fahrschein-http-api ${project.version} + + junit + junit + ${version.junit} + test + + + org.mockito + mockito-core + ${version.mockito} + test + diff --git a/fahrschein-http-simple/src/main/java/org/zalando/fahrschein/http/simple/SimpleBufferingRequest.java b/fahrschein-http-simple/src/main/java/org/zalando/fahrschein/http/simple/SimpleBufferingRequest.java index 46747195..4b73654c 100644 --- a/fahrschein-http-simple/src/main/java/org/zalando/fahrschein/http/simple/SimpleBufferingRequest.java +++ b/fahrschein-http-simple/src/main/java/org/zalando/fahrschein/http/simple/SimpleBufferingRequest.java @@ -1,5 +1,6 @@ package org.zalando.fahrschein.http.simple; +import org.zalando.fahrschein.http.api.ContentEncoding; import org.zalando.fahrschein.http.api.Headers; import org.zalando.fahrschein.http.api.HeadersImpl; import org.zalando.fahrschein.http.api.Request; @@ -12,11 +13,15 @@ import java.net.URI; import java.net.URISyntaxException; import java.util.List; +import java.util.zip.GZIPOutputStream; /** * {@link Request} implementation that uses standard JDK facilities to * execute buffered requests. Created via the {@link SimpleRequestFactory}. * + * See original + * code from Spring Framework. + * * @author Arjen Poutsma * @author Juergen Hoeller * @author Joern Horstmann @@ -26,12 +31,14 @@ final class SimpleBufferingRequest implements Request { private final HttpURLConnection connection; private final Headers headers; + private final ContentEncoding contentEncoding; private ByteArrayOutputStream bufferedOutput; private boolean executed; - SimpleBufferingRequest(HttpURLConnection connection) { + SimpleBufferingRequest(HttpURLConnection connection, ContentEncoding contentEncoding) { this.connection = connection; this.headers = new HeadersImpl(); + this.contentEncoding = contentEncoding; } @Override @@ -52,7 +59,6 @@ private Response executeInternal() throws IOException { final int size = this.bufferedOutput != null ? this.bufferedOutput.size() : 0; final long contentLength = this.headers.getContentLength(); - if (contentLength >= 0 && contentLength != size) { throw new IllegalStateException("Invalid Content-Length header [" + contentLength + "], request size is [" + size + "]"); } @@ -67,6 +73,11 @@ private Response executeInternal() throws IOException { } } + // allow gzip-compression from server response + if (connection.getRequestProperty("Accept-Encoding") == null) { + connection.setRequestProperty("Accept-Encoding", "gzip"); + } + if (this.connection.getDoOutput()) { this.connection.setFixedLengthStreamingMode(size); } @@ -97,6 +108,10 @@ public final OutputStream getBody() throws IOException { assertNotExecuted(); if (this.bufferedOutput == null) { this.bufferedOutput = new ByteArrayOutputStream(1024); + if (this.connection.getDoOutput() && ContentEncoding.GZIP.equals(this.contentEncoding)) { + this.connection.setRequestProperty("Content-Encoding", this.contentEncoding.value()); + return new GZIPOutputStream(this.bufferedOutput); + } } return this.bufferedOutput; } diff --git a/fahrschein-http-simple/src/main/java/org/zalando/fahrschein/http/simple/SimpleRequestFactory.java b/fahrschein-http-simple/src/main/java/org/zalando/fahrschein/http/simple/SimpleRequestFactory.java index b152f2fd..d80dcc48 100644 --- a/fahrschein-http-simple/src/main/java/org/zalando/fahrschein/http/simple/SimpleRequestFactory.java +++ b/fahrschein-http-simple/src/main/java/org/zalando/fahrschein/http/simple/SimpleRequestFactory.java @@ -1,5 +1,6 @@ package org.zalando.fahrschein.http.simple; +import org.zalando.fahrschein.http.api.ContentEncoding; import org.zalando.fahrschein.http.api.Request; import org.zalando.fahrschein.http.api.RequestFactory; @@ -12,6 +13,9 @@ /** * {@link RequestFactory} implementation that uses standard JDK facilities. * + * See original + * code from Spring Framework + * * @author Arjen Poutsma * @author Juergen Hoeller * @author Joern Horstmann @@ -19,8 +23,16 @@ */ public class SimpleRequestFactory implements RequestFactory { - private int connectTimeout = -1; - private int readTimeout = -1; + private static final int DEFAULT_CONNECT_TIMEOUT = 500; + private static final int DEFAULT_READ_TIMEOUT = 60 * 1000; + + private int connectTimeout = DEFAULT_CONNECT_TIMEOUT; + private int readTimeout = DEFAULT_READ_TIMEOUT; + private final ContentEncoding contentEncoding; + + public SimpleRequestFactory(ContentEncoding contentEncoding) { + this.contentEncoding = contentEncoding; + } /** * Set the underlying URLConnection's connect timeout (in milliseconds). @@ -49,20 +61,21 @@ public Request createRequest(URI uri, String method) throws IOException { HttpURLConnection connection = openConnection(uri.toURL()); prepareConnection(connection, method); - return new SimpleBufferingRequest(connection); + return new SimpleBufferingRequest(connection, contentEncoding); } /** * Opens and returns a connection to the given URL. * - * @param url the URL to open a connection to + * @param url the URL to open a connection to * @return the opened connection * @throws IOException in case of I/O errors + * @throws IllegalArgumentException in case {{@link java.net.URL#openConnection()}} does not lead to a HttpURLConnection */ private HttpURLConnection openConnection(URL url) throws IOException { URLConnection urlConnection = url.openConnection(); if (!(urlConnection instanceof HttpURLConnection)) { - throw new IllegalStateException("Connection should be an HttpURLConnection"); + throw new IllegalArgumentException("Connection should be an HttpURLConnection"); } return (HttpURLConnection) urlConnection; } diff --git a/fahrschein-http-simple/src/main/java/org/zalando/fahrschein/http/simple/SimpleResponse.java b/fahrschein-http-simple/src/main/java/org/zalando/fahrschein/http/simple/SimpleResponse.java index e88fe9b4..49d95a79 100644 --- a/fahrschein-http-simple/src/main/java/org/zalando/fahrschein/http/simple/SimpleResponse.java +++ b/fahrschein-http-simple/src/main/java/org/zalando/fahrschein/http/simple/SimpleResponse.java @@ -7,11 +7,15 @@ import java.io.IOException; import java.io.InputStream; import java.net.HttpURLConnection; +import java.util.zip.GZIPInputStream; /** * {@link Response} implementation that uses standard JDK facilities. * Obtained via {@link SimpleBufferingRequest#execute()}. * + * See original + * code from Spring Framework. + * * @author Arjen Poutsma * @author Brian Clozel * @author Joern Horstmann @@ -65,6 +69,9 @@ public InputStream getBody() throws IOException { if (this.responseStream == null) { final InputStream errorStream = connection.getErrorStream(); this.responseStream = (errorStream != null ? errorStream : connection.getInputStream()); + if (this.getHeaders().get("Content-Encoding").contains("gzip")) { + this.responseStream = new GZIPInputStream(this.responseStream); + } } return this.responseStream; } @@ -80,5 +87,4 @@ public void close() { } } } - } diff --git a/fahrschein-http-simple/src/test/java/org/zalando/fahrschein/http/simple/SimpleRequestFactoryTest.java b/fahrschein-http-simple/src/test/java/org/zalando/fahrschein/http/simple/SimpleRequestFactoryTest.java new file mode 100644 index 00000000..e4701ea4 --- /dev/null +++ b/fahrschein-http-simple/src/test/java/org/zalando/fahrschein/http/simple/SimpleRequestFactoryTest.java @@ -0,0 +1,237 @@ +package org.zalando.fahrschein.http.simple; + +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpHandler; +import com.sun.net.httpserver.HttpServer; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; +import org.zalando.fahrschein.http.api.ContentEncoding; +import org.zalando.fahrschein.http.api.ContentType; +import org.zalando.fahrschein.http.api.Request; +import org.zalando.fahrschein.http.api.RequestFactory; +import org.zalando.fahrschein.http.api.Response; + +import java.io.*; +import java.net.InetSocketAddress; +import java.net.SocketTimeoutException; +import java.net.URI; +import java.util.Arrays; +import java.util.concurrent.Executors; +import java.util.stream.Collectors; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertEquals; + +@RunWith(MockitoJUnitRunner.class) +public class SimpleRequestFactoryTest { + + static HttpServer server; + static URI serverAddress; + + @BeforeClass + public static void startServer() throws IOException { + server = HttpServer.create(new InetSocketAddress("localhost", 0), 1); + serverAddress = URI.create("http://localhost:" + server.getAddress().getPort()); + server.setExecutor(Executors.newSingleThreadExecutor()); + server.start(); + } + + @Captor + public ArgumentCaptor exchangeCaptor; + + @Test + public void testGetRequest() throws IOException { + // given + String expectedResponse = "{}"; + HttpHandler spy = Mockito.spy(new SimpleRequestResponseContentHandler(expectedResponse)); + server.createContext("/get", spy); + + // when + RequestFactory f = defaultRequestFactory(ContentEncoding.GZIP); + Request r = f.createRequest(serverAddress.resolve("/get"), "GET"); + Response executed = r.execute(); + String actualResponse = readStream(executed.getBody()); + + // then + assertEquals(serverAddress.resolve("/get"), r.getURI()); + Mockito.verify(spy).handle(exchangeCaptor.capture()); + HttpExchange capturedArgument = exchangeCaptor. getValue(); + assertEquals("GET", capturedArgument.getRequestMethod()); + assertEquals(200, executed.getStatusCode()); + assertEquals("OK", executed.getStatusText()); + assertEquals(expectedResponse, actualResponse); + } + + @Test + public void testPostRequest() throws IOException { + // given + String requestBody = "{}"; + String responseBody = "{}"; + SimpleRequestResponseContentHandler spy = Mockito.spy(new SimpleRequestResponseContentHandler(responseBody)); + server.createContext("/post", spy); + + // when + SimpleRequestFactory f = new SimpleRequestFactory(ContentEncoding.IDENTITY); + Request r = f.createRequest(serverAddress.resolve("/post"), "POST"); + r.getHeaders().setContentType(ContentType.APPLICATION_JSON); + try (final OutputStream body = r.getBody()) { + body.write(requestBody.getBytes()); + } + Response executed = r.execute(); + String actualResponse = readStream(executed.getBody()); + + // then + Mockito.verify(spy).handle(exchangeCaptor.capture()); + HttpExchange capturedArgument = exchangeCaptor.getValue(); + assertEquals("POST", capturedArgument.getRequestMethod()); + assertThat("no content-encoding header", capturedArgument.getRequestHeaders().get("content-encoding"), nullValue()); + assertEquals(URI.create("/post"), capturedArgument.getRequestURI()); + assertEquals(requestBody, spy.getRequestBody()); + assertEquals(responseBody, actualResponse); + } + + @Test + public void testGzippedResponseBody() throws IOException { + // given + String expectedResponse = "{}"; + HttpHandler spy = Mockito.spy(new GzippedResponseContentHandler(expectedResponse)); + server.createContext("/gzipped", spy); + + // when + RequestFactory f = defaultRequestFactory(ContentEncoding.IDENTITY); + Request r = f.createRequest(serverAddress.resolve("/gzipped"), "GET"); + Response executed = r.execute(); + String actualResponse = readStream(executed.getBody()); + + // then + Mockito.verify(spy).handle(exchangeCaptor.capture()); + HttpExchange capturedArgument = exchangeCaptor.getValue(); + assertThat("accept-encoding header", capturedArgument.getRequestHeaders().get("accept-encoding"), equalTo(Arrays.asList("gzip"))); + assertThat("no content-encoding header", capturedArgument.getRequestHeaders().get("content-encoding"), nullValue()); + assertEquals(URI.create("/gzipped"), capturedArgument.getRequestURI()); + assertEquals(expectedResponse, actualResponse); + } + + @Test(expected = SocketTimeoutException.class) + public void testTimeout() throws IOException { + // given + server.createContext("/timeout", exchange -> { + try { + Thread.sleep(10l); + exchange.sendResponseHeaders(201, 0); + } catch (InterruptedException e) { } + }); + + // when + SimpleRequestFactory f = new SimpleRequestFactory(ContentEncoding.IDENTITY); + f.setReadTimeout(1); + Request r = f.createRequest(serverAddress.resolve("/timeout"), "GET"); + r.execute(); + } + + @Test + public void testGzippedRequestBody() throws IOException { + // given + String requestBody = "{}"; + String responseBody = "{}"; + SimpleRequestResponseContentHandler spy = Mockito.spy(new SimpleRequestResponseContentHandler(responseBody)); + server.createContext("/gzipped-post", spy); + + // when + RequestFactory f = defaultRequestFactory(ContentEncoding.GZIP); + Request r = f.createRequest(serverAddress.resolve("/gzipped-post"), "POST"); + r.getHeaders().setContentType(ContentType.APPLICATION_JSON); + try (final OutputStream body = r.getBody()) { + body.write("{}".getBytes()); + } + Response executed = r.execute(); + String actualResponse = readStream(executed.getBody()); + + // then + Mockito.verify(spy).handle(exchangeCaptor.capture()); + HttpExchange capturedArgument = exchangeCaptor.getValue(); + assertEquals("POST", capturedArgument.getRequestMethod()); + assertEquals(URI.create("/gzipped-post"), capturedArgument.getRequestURI()); + assertThat("content-encoding header", capturedArgument.getRequestHeaders().get("content-encoding"), equalTo(Arrays.asList("gzip"))); + assertEquals(requestBody, spy.getRequestBody()); + assertEquals(responseBody, actualResponse); + } + + private RequestFactory defaultRequestFactory(ContentEncoding contentEncoding) { + return new SimpleRequestFactory(contentEncoding); + } + + static String readStream(InputStream stream) throws IOException { + String res = new BufferedReader( + new InputStreamReader(stream, UTF_8)) + .lines() + .collect(Collectors.joining("\n")); + stream.close(); + return res; + } + + private static class SimpleRequestResponseContentHandler implements HttpHandler { + + private String requestBody; + private final String responseBody; + + SimpleRequestResponseContentHandler(String responseBody) { + this.responseBody = responseBody; + } + + public String getRequestBody() { + return requestBody; + } + + @Override + public void handle(HttpExchange exchange) throws IOException { + + if (exchange.getRequestHeaders().containsKey("Content-Encoding") && exchange.getRequestHeaders().get("Content-Encoding").contains("gzip")) { + requestBody = readStream(new GZIPInputStream(exchange.getRequestBody())); + } else { + requestBody = readStream(exchange.getRequestBody()); + } + + byte[] bytes = responseBody.getBytes(UTF_8); + exchange.sendResponseHeaders(200, bytes.length); + OutputStream responseBody = exchange.getResponseBody(); + responseBody.write(bytes); + responseBody.flush(); + responseBody.close(); + } + } + + private static class GzippedResponseContentHandler implements HttpHandler { + + private final byte[] rawResponse; + + GzippedResponseContentHandler(String response) throws IOException { + byte[] stringResponse = response.getBytes(UTF_8); + ByteArrayOutputStream byteStream = new ByteArrayOutputStream(); + GZIPOutputStream zipStream = new GZIPOutputStream(byteStream); + zipStream.write(stringResponse); + zipStream.close(); + this.rawResponse = byteStream.toByteArray(); + } + + @Override + public void handle(HttpExchange exchange) throws IOException { + exchange.getResponseHeaders().set("Content-Encoding", "gzip"); + exchange.sendResponseHeaders(200, rawResponse.length); + OutputStream responseBody = exchange.getResponseBody(); + responseBody.write(rawResponse); + responseBody.close(); + } + } + +} diff --git a/fahrschein-http-spring/pom.xml b/fahrschein-http-spring/pom.xml index 22be58b7..49318e76 100644 --- a/fahrschein-http-spring/pom.xml +++ b/fahrschein-http-spring/pom.xml @@ -14,17 +14,6 @@ fahrschein-http-spring Fahrschein HTTP Spring Adapter - - - - maven-surefire-plugin - - false - - - - - org.zalando @@ -62,6 +51,18 @@ ${version.junit} test + + org.mockito + mockito-core + ${version.mockito} + test + + + com.squareup.okhttp3 + okhttp + 4.9.3 + test + - \ No newline at end of file + diff --git a/fahrschein-http-spring/src/main/java/org/zalando/fahrschein/http/spring/RequestAdapter.java b/fahrschein-http-spring/src/main/java/org/zalando/fahrschein/http/spring/RequestAdapter.java index 81027e5e..7119dbc8 100644 --- a/fahrschein-http-spring/src/main/java/org/zalando/fahrschein/http/spring/RequestAdapter.java +++ b/fahrschein-http-spring/src/main/java/org/zalando/fahrschein/http/spring/RequestAdapter.java @@ -1,6 +1,8 @@ package org.zalando.fahrschein.http.spring; +import org.springframework.http.HttpHeaders; import org.springframework.http.client.ClientHttpRequest; +import org.zalando.fahrschein.http.api.ContentEncoding; import org.zalando.fahrschein.http.api.Headers; import org.zalando.fahrschein.http.api.Request; import org.zalando.fahrschein.http.api.Response; @@ -8,12 +10,19 @@ import java.io.IOException; import java.io.OutputStream; import java.net.URI; +import java.util.Arrays; +import java.util.List; +import java.util.zip.GZIPOutputStream; class RequestAdapter implements Request { private final ClientHttpRequest clientHttpRequest; + private final ContentEncoding contentEncoding; - RequestAdapter(ClientHttpRequest clientHttpRequest) { + private static final List writeMethods = Arrays.asList("POST", "PATCH", "PUT"); + + RequestAdapter(ClientHttpRequest clientHttpRequest, ContentEncoding contentEncoding) { this.clientHttpRequest = clientHttpRequest; + this.contentEncoding = contentEncoding; } @Override @@ -33,6 +42,10 @@ public Headers getHeaders() { @Override public OutputStream getBody() throws IOException { + if (writeMethods.contains(getMethod()) && ContentEncoding.GZIP.equals(contentEncoding)) { + clientHttpRequest.getHeaders().set(HttpHeaders.CONTENT_ENCODING, contentEncoding.value()); + return new GZIPOutputStream(clientHttpRequest.getBody()); + } return clientHttpRequest.getBody(); } diff --git a/fahrschein-http-spring/src/main/java/org/zalando/fahrschein/http/spring/SpringRequestFactory.java b/fahrschein-http-spring/src/main/java/org/zalando/fahrschein/http/spring/SpringRequestFactory.java index 82cb0de0..ab4ab1e7 100644 --- a/fahrschein-http-spring/src/main/java/org/zalando/fahrschein/http/spring/SpringRequestFactory.java +++ b/fahrschein-http-spring/src/main/java/org/zalando/fahrschein/http/spring/SpringRequestFactory.java @@ -2,6 +2,7 @@ import org.springframework.http.HttpMethod; import org.springframework.http.client.ClientHttpRequestFactory; +import org.zalando.fahrschein.http.api.ContentEncoding; import org.zalando.fahrschein.http.api.Request; import org.zalando.fahrschein.http.api.RequestFactory; @@ -10,13 +11,15 @@ public class SpringRequestFactory implements RequestFactory { private final ClientHttpRequestFactory clientRequestFactory; + private final ContentEncoding contentEncoding; - public SpringRequestFactory(ClientHttpRequestFactory clientRequestFactory) { + public SpringRequestFactory(ClientHttpRequestFactory clientRequestFactory, ContentEncoding contentEncoding) { this.clientRequestFactory = clientRequestFactory; + this.contentEncoding = contentEncoding; } @Override public Request createRequest(URI uri, String method) throws IOException { - return new RequestAdapter(clientRequestFactory.createRequest(uri, HttpMethod.valueOf(method))); + return new RequestAdapter(clientRequestFactory.createRequest(uri, HttpMethod.valueOf(method)), contentEncoding); } } diff --git a/fahrschein-http-spring/src/test/java/org/zalando/fahrschein/http/spring/SpringRequestFactoryTest.java b/fahrschein-http-spring/src/test/java/org/zalando/fahrschein/http/spring/SpringRequestFactoryTest.java new file mode 100644 index 00000000..65da8646 --- /dev/null +++ b/fahrschein-http-spring/src/test/java/org/zalando/fahrschein/http/spring/SpringRequestFactoryTest.java @@ -0,0 +1,201 @@ +package org.zalando.fahrschein.http.spring; + +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpHandler; +import com.sun.net.httpserver.HttpServer; +import okhttp3.OkHttpClient; +import org.jetbrains.annotations.NotNull; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; +import org.springframework.http.client.OkHttp3ClientHttpRequestFactory; +import org.zalando.fahrschein.http.api.ContentEncoding; +import org.zalando.fahrschein.http.api.ContentType; +import org.zalando.fahrschein.http.api.Request; +import org.zalando.fahrschein.http.api.RequestFactory; +import org.zalando.fahrschein.http.api.Response; + +import java.io.*; +import java.net.InetSocketAddress; +import java.net.SocketTimeoutException; +import java.net.URI; +import java.util.Arrays; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertEquals; + +@RunWith(MockitoJUnitRunner.class) +public class SpringRequestFactoryTest { + + static HttpServer server; + static URI serverAddress; + + @BeforeClass + public static void startServer() throws IOException { + server = HttpServer.create(new InetSocketAddress("localhost", 0), 1); + serverAddress = URI.create("http://localhost:" + server.getAddress().getPort()); + ExecutorService executor = Executors.newSingleThreadExecutor(); + server.setExecutor(executor); + server.start(); + } + + @Captor + public ArgumentCaptor exchangeCaptor; + + @Test + public void testGzippedResponseBody() throws IOException { + // given + String expectedResponse = "{}"; + HttpHandler spy = Mockito.spy(new GzippedResponseContentHandler(expectedResponse)); + server.createContext("/gzipped", spy); + + // when + final RequestFactory f = defaultRequestFactory(ContentEncoding.IDENTITY); + Request r = f.createRequest(serverAddress.resolve("/gzipped"), "GET"); + Response executed = r.execute(); + String actualResponse = readStream(executed.getBody()); + + // then + Mockito.verify(spy).handle(exchangeCaptor.capture()); + HttpExchange capturedArgument = exchangeCaptor.getValue(); + assertThat("accept-encoding header", capturedArgument.getRequestHeaders().get("accept-encoding"), equalTo(Arrays.asList("gzip"))); + assertThat("no content-encoding header", capturedArgument.getRequestHeaders().get("content-encoding"), nullValue()); + assertEquals(URI.create("/gzipped"), capturedArgument.getRequestURI()); + assertEquals(expectedResponse, actualResponse); + } + + @Test(expected = SocketTimeoutException.class) + public void testTimeout() throws IOException { + // given + server.createContext("/timeout", exchange -> { + try { + Thread.sleep(10l); + exchange.sendResponseHeaders(201, 0); + } catch (InterruptedException e) { } + }); + + // when + OkHttpClient client = new OkHttpClient.Builder() + .readTimeout(1, TimeUnit.MILLISECONDS) + .build(); + OkHttp3ClientHttpRequestFactory clientHttpRequestFactory = new OkHttp3ClientHttpRequestFactory(client); + SpringRequestFactory f = new SpringRequestFactory(clientHttpRequestFactory, ContentEncoding.IDENTITY); + Request r = f.createRequest(serverAddress.resolve("/timeout"), "GET"); + r.execute(); + } + + @Test + public void testGzippedRequestBody() throws IOException { + // given + String requestBody = "{}"; + String responseBody = "{}"; + SimpleRequestResponseContentHandler spy = Mockito.spy(new SimpleRequestResponseContentHandler(responseBody)); + server.createContext("/gzipped-post", spy); + + // when + final RequestFactory f = defaultRequestFactory(ContentEncoding.GZIP); + Request r = f.createRequest(serverAddress.resolve("/gzipped-post"), "POST"); + r.getHeaders().setContentType(ContentType.APPLICATION_JSON); + try (final OutputStream body = r.getBody()) { + body.write(requestBody.getBytes()); + } + Response executed = r.execute(); + String actualResponse = readStream(executed.getBody()); + + // then + Mockito.verify(spy).handle(exchangeCaptor.capture()); + HttpExchange capturedArgument = exchangeCaptor.getValue(); + assertEquals("POST", capturedArgument.getRequestMethod()); + assertEquals(URI.create("/gzipped-post"), capturedArgument.getRequestURI()); + assertThat("content-encoding header", capturedArgument.getRequestHeaders().get("content-encoding"), equalTo(Arrays.asList("gzip"))); + assertEquals(requestBody, spy.getRequestBody()); + assertEquals(responseBody, actualResponse); + } + + @NotNull + private RequestFactory defaultRequestFactory(ContentEncoding contentEncoding) { + final OkHttpClient client = new OkHttpClient.Builder() + .build(); + final OkHttp3ClientHttpRequestFactory clientHttpRequestFactory = new OkHttp3ClientHttpRequestFactory(client); + final SpringRequestFactory f = new SpringRequestFactory(clientHttpRequestFactory, contentEncoding); + return f; + } + + static String readStream(InputStream stream) throws IOException { + String res = new BufferedReader( + new InputStreamReader(stream, UTF_8)) + .lines() + .collect(Collectors.joining("\n")); + stream.close(); + return res; + } + + private static class SimpleRequestResponseContentHandler implements HttpHandler { + + private String requestBody; + private final String responseBody; + + SimpleRequestResponseContentHandler(String responseBody) { + this.responseBody = responseBody; + } + + public String getRequestBody() { + return requestBody; + } + + @Override + public void handle(HttpExchange exchange) throws IOException { + try { + if (exchange.getRequestHeaders().containsKey("Content-Encoding") && exchange.getRequestHeaders().get("Content-Encoding").contains("gzip")) { + requestBody = readStream(new GZIPInputStream(exchange.getRequestBody())); + } else { + requestBody = readStream(exchange.getRequestBody()); + } + } catch (Exception e) { + e.printStackTrace(); + } + + byte[] bytes = responseBody.getBytes(UTF_8); + exchange.sendResponseHeaders(200, bytes.length); + OutputStream responseBody = exchange.getResponseBody(); + responseBody.write(bytes); + responseBody.close(); + } + } + + private static class GzippedResponseContentHandler implements HttpHandler { + + private final byte[] rawResponse; + + GzippedResponseContentHandler(String response) throws IOException { + byte[] stringResponse = response.getBytes(UTF_8); + ByteArrayOutputStream byteStream = new ByteArrayOutputStream(); + GZIPOutputStream zipStream = new GZIPOutputStream(byteStream); + zipStream.write(stringResponse); + zipStream.close(); + this.rawResponse = byteStream.toByteArray(); + } + + @Override + public void handle(HttpExchange exchange) throws IOException { + exchange.getResponseHeaders().set("Content-Encoding", "gzip"); + exchange.sendResponseHeaders(200, rawResponse.length); + OutputStream responseBody = exchange.getResponseBody(); + responseBody.write(rawResponse); + responseBody.close(); + } + } +} diff --git a/fahrschein-jdbc/src/test/resources/log4j2.xml b/fahrschein-jdbc/src/test/resources/log4j2.xml index 3fe57178..4822b7a1 100644 --- a/fahrschein-jdbc/src/test/resources/log4j2.xml +++ b/fahrschein-jdbc/src/test/resources/log4j2.xml @@ -4,7 +4,7 @@ + pattern="%date {%level} [%thread] [%logger] %message%n"/> diff --git a/fahrschein/src/main/java/org/zalando/fahrschein/IdentityAcceptEncodingRequestFactory.java b/fahrschein/src/main/java/org/zalando/fahrschein/IdentityAcceptEncodingRequestFactory.java new file mode 100644 index 00000000..60ac148f --- /dev/null +++ b/fahrschein/src/main/java/org/zalando/fahrschein/IdentityAcceptEncodingRequestFactory.java @@ -0,0 +1,23 @@ +package org.zalando.fahrschein; + +import org.zalando.fahrschein.http.api.Request; +import org.zalando.fahrschein.http.api.RequestFactory; + +import java.io.IOException; +import java.net.URI; + +public class IdentityAcceptEncodingRequestFactory implements RequestFactory { + + private final RequestFactory delegate; + + public IdentityAcceptEncodingRequestFactory(RequestFactory delegate) { + this.delegate = delegate; + } + + @Override + public Request createRequest(URI uri, String method) throws IOException { + Request request = delegate.createRequest(uri, method); + request.getHeaders().put("Accept-Encoding", "identity"); + return request; + } +} diff --git a/fahrschein/src/main/java/org/zalando/fahrschein/NakadiClient.java b/fahrschein/src/main/java/org/zalando/fahrschein/NakadiClient.java index fc01a280..3f21f0d8 100644 --- a/fahrschein/src/main/java/org/zalando/fahrschein/NakadiClient.java +++ b/fahrschein/src/main/java/org/zalando/fahrschein/NakadiClient.java @@ -38,8 +38,8 @@ public class NakadiClient { private final ObjectMapper objectMapper; private final CursorManager cursorManager; - public static NakadiClientBuilder builder(URI baseUri) { - return new NakadiClientBuilder(baseUri); + public static NakadiClientBuilder builder(URI baseUri, RequestFactory clientHttpRequestFactory) { + return new NakadiClientBuilder(baseUri, clientHttpRequestFactory); } NakadiClient(URI baseUri, RequestFactory clientHttpRequestFactory, ObjectMapper objectMapper, CursorManager cursorManager) { diff --git a/fahrschein/src/main/java/org/zalando/fahrschein/NakadiClientBuilder.java b/fahrschein/src/main/java/org/zalando/fahrschein/NakadiClientBuilder.java index 9db1928e..c529249d 100644 --- a/fahrschein/src/main/java/org/zalando/fahrschein/NakadiClientBuilder.java +++ b/fahrschein/src/main/java/org/zalando/fahrschein/NakadiClientBuilder.java @@ -9,8 +9,6 @@ import static org.zalando.fahrschein.Preconditions.checkNotNull; public final class NakadiClientBuilder { - public static final int DEFAULT_CONNECT_TIMEOUT = 500; - public static final int DEFAULT_READ_TIMEOUT = 60 * 1000; private final URI baseUri; @Nullable @@ -22,8 +20,8 @@ public final class NakadiClientBuilder { @Nullable private final CursorManager cursorManager; - NakadiClientBuilder(final URI baseUri) { - this(baseUri, DefaultObjectMapper.INSTANCE, null, null, null); + NakadiClientBuilder(final URI baseUri, RequestFactory requestFactory) { + this(baseUri, DefaultObjectMapper.INSTANCE, null, requestFactory, null); } private NakadiClientBuilder(URI baseUri, @Nullable ObjectMapper objectMapper, @Nullable AuthorizationProvider authorizationProvider, @Nullable RequestFactory clientHttpRequestFactory, @Nullable CursorManager cursorManager) { @@ -46,21 +44,10 @@ public NakadiClientBuilder withAuthorizationProvider(AuthorizationProvider autho return new NakadiClientBuilder(baseUri, objectMapper, authorizationProvider, clientHttpRequestFactory, cursorManager); } - public NakadiClientBuilder withRequestFactory(RequestFactory clientHttpRequestFactory) { - return new NakadiClientBuilder(baseUri, objectMapper, authorizationProvider, clientHttpRequestFactory, cursorManager); - } - public NakadiClientBuilder withCursorManager(CursorManager cursorManager) { return new NakadiClientBuilder(baseUri, objectMapper, authorizationProvider, clientHttpRequestFactory, cursorManager); } - private RequestFactory defaultClientHttpRequestFactory() { - final SimpleRequestFactory clientHttpRequestFactory = new SimpleRequestFactory(); - clientHttpRequestFactory.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT); - clientHttpRequestFactory.setReadTimeout(DEFAULT_READ_TIMEOUT); - return clientHttpRequestFactory; - } - static RequestFactory wrapClientHttpRequestFactory(RequestFactory delegate, @Nullable AuthorizationProvider authorizationProvider) { RequestFactory requestFactory = new ProblemHandlingRequestFactory(delegate); if (authorizationProvider != null) { @@ -71,7 +58,7 @@ static RequestFactory wrapClientHttpRequestFactory(RequestFactory delegate, @Nul } public NakadiClient build() { - final RequestFactory clientHttpRequestFactory = wrapClientHttpRequestFactory(this.clientHttpRequestFactory != null ? this.clientHttpRequestFactory : defaultClientHttpRequestFactory(), authorizationProvider); + final RequestFactory clientHttpRequestFactory = wrapClientHttpRequestFactory(this.clientHttpRequestFactory, authorizationProvider); final CursorManager cursorManager = this.cursorManager != null ? this.cursorManager : new ManagedCursorManager(baseUri, clientHttpRequestFactory, true); final ObjectMapper objectMapper = this.objectMapper != null ? this.objectMapper : DefaultObjectMapper.INSTANCE; diff --git a/fahrschein/src/main/java/org/zalando/fahrschein/domain/Metadata.java b/fahrschein/src/main/java/org/zalando/fahrschein/domain/Metadata.java index 5223c0f7..9ab54c09 100644 --- a/fahrschein/src/main/java/org/zalando/fahrschein/domain/Metadata.java +++ b/fahrschein/src/main/java/org/zalando/fahrschein/domain/Metadata.java @@ -15,6 +15,7 @@ public final class Metadata { private final String eid; private final String partition; private final String version; + private final String publishedBy; private final OffsetDateTime occurredAt; private final OffsetDateTime receivedAt; private final String flowId; @@ -22,27 +23,28 @@ public final class Metadata { @JsonCreator @Deprecated - private Metadata(@JsonProperty("event_type") String eventType, @JsonProperty("eid") String eid, @JsonProperty("occurred_at") String occurredAt, @JsonProperty("partition") String partition, @JsonProperty("version") String version, @JsonProperty("received_at") String receivedAt, @JsonProperty("flow_id") String flowId, @JsonProperty("span_ctx") Map spanCtx) { - this(eventType, eid, occurredAt == null ? null : OffsetDateTime.parse(occurredAt), partition, version, receivedAt == null ? null : OffsetDateTime.parse(receivedAt), flowId, spanCtx); + private Metadata(@JsonProperty("event_type") String eventType, @JsonProperty("eid") String eid, @JsonProperty("occurred_at") String occurredAt, @JsonProperty("partition") String partition, @JsonProperty("version") String version, @JsonProperty("published_by") String publishedBy, @JsonProperty("received_at") String receivedAt, @JsonProperty("flow_id") String flowId, @JsonProperty("span_ctx") Map spanCtx) { + this(eventType, eid, occurredAt == null ? null : OffsetDateTime.parse(occurredAt), partition, version, publishedBy, receivedAt == null ? null : OffsetDateTime.parse(receivedAt), flowId, spanCtx); } - public Metadata(String eventType, String eid, OffsetDateTime occurredAt, String partition, String version, OffsetDateTime receivedAt, String flowId) { - this(eventType, eid, occurredAt, partition, version, receivedAt, flowId, null); + public Metadata(String eventType, String eid, OffsetDateTime occurredAt, String partition, String version, String publishedBy, OffsetDateTime receivedAt, String flowId) { + this(eventType, eid, occurredAt, partition, version, publishedBy, receivedAt, flowId, null); } - public Metadata(String eventType, String eid, OffsetDateTime occurredAt, String partition, String version, OffsetDateTime receivedAt, String flowId, Map spanCtx) { + public Metadata(String eventType, String eid, OffsetDateTime occurredAt, String partition, String version, String publishedBy, OffsetDateTime receivedAt, String flowId, Map spanCtx) { this.eventType = eventType; this.eid = eid; this.occurredAt = occurredAt; this.partition = partition; this.version = version; + this.publishedBy = publishedBy; this.receivedAt = receivedAt; this.flowId = flowId; this.spanCtx = spanCtx == null ? Collections.emptyMap() : Collections.unmodifiableMap(new LinkedHashMap<>(spanCtx)); } public Metadata(String eid, OffsetDateTime occurredAt) { - this(null, eid, occurredAt, null, null, null, null, null); + this(null, eid, occurredAt, null, null, null, null, null, null); } public String getEventType() { @@ -73,6 +75,10 @@ public Map getSpanCtx() { return spanCtx; } + public String getPublishedBy() { + return publishedBy; + } + @Override public String toString() { return "Metadata{" + @@ -81,6 +87,7 @@ public String toString() { ", occurredAt=" + occurredAt + ", partition='" + partition + '\'' + ", version='" + version + '\'' + + ", publishedBy='" + publishedBy + '\'' + ", receivedAt=" + receivedAt + ", flowId='" + flowId + '\'' + '}'; diff --git a/fahrschein/src/test/java/org/zalando/fahrschein/IdentityAcceptEncodingRequestFactoryTest.java b/fahrschein/src/test/java/org/zalando/fahrschein/IdentityAcceptEncodingRequestFactoryTest.java new file mode 100644 index 00000000..8f80cdf7 --- /dev/null +++ b/fahrschein/src/test/java/org/zalando/fahrschein/IdentityAcceptEncodingRequestFactoryTest.java @@ -0,0 +1,48 @@ +package org.zalando.fahrschein; + +import org.junit.Test; +import org.mockito.Mockito; +import org.zalando.fahrschein.http.api.Headers; +import org.zalando.fahrschein.http.api.HeadersImpl; +import org.zalando.fahrschein.http.api.Request; +import org.zalando.fahrschein.http.api.RequestFactory; + +import java.io.IOException; +import java.net.URI; +import java.util.Arrays; + +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; + +public class IdentityAcceptEncodingRequestFactoryTest { + + private final Request request = Mockito.mock(Request.class); + private final RequestFactory delegate = Mockito.mock(RequestFactory.class); + + @Test + public void shouldSetAcceptEncodingHeader() throws IOException { + final Headers headers = new HeadersImpl(); + when(request.getHeaders()).thenReturn(headers); + when(delegate.createRequest(any(URI.class), any(String.class))).thenReturn(request); + + IdentityAcceptEncodingRequestFactory SUT = new IdentityAcceptEncodingRequestFactory(delegate); + SUT.createRequest(URI.create("any://uri"), "GET"); + + assertEquals(Arrays.asList("identity"), headers.get("Accept-Encoding")); + } + + @Test + public void shouldOverrideExistingAcceptEncodingHeader() throws IOException { + final Headers headers = new HeadersImpl(); + headers.put("Accept-Encoding", "gzip"); + when(request.getHeaders()).thenReturn(headers); + when(delegate.createRequest(any(URI.class), any(String.class))).thenReturn(request); + + IdentityAcceptEncodingRequestFactory SUT = new IdentityAcceptEncodingRequestFactory(delegate); + SUT.createRequest(URI.create("any://uri"), "GET"); + + assertEquals(Arrays.asList("identity"), headers.get("Accept-Encoding")); + } + +} diff --git a/fahrschein/src/test/java/org/zalando/fahrschein/NakadiClientTest.java b/fahrschein/src/test/java/org/zalando/fahrschein/NakadiClientTest.java index 0bf40ad0..09468861 100644 --- a/fahrschein/src/test/java/org/zalando/fahrschein/NakadiClientTest.java +++ b/fahrschein/src/test/java/org/zalando/fahrschein/NakadiClientTest.java @@ -8,6 +8,7 @@ import org.zalando.fahrschein.domain.Subscription; import org.zalando.fahrschein.domain.SubscriptionRequest; import org.zalando.fahrschein.http.api.ContentType; +import org.zalando.fahrschein.http.simple.SimpleRequestFactory; import java.io.IOException; import java.net.URI; @@ -50,8 +51,7 @@ public void setup() { final CursorManager cursorManager = mock(CursorManager.class); - final NakadiClient nakadiClient = NakadiClient.builder(URI.create("http://example.com/")) - .withRequestFactory(clientHttpRequestFactory) + final NakadiClient nakadiClient = NakadiClient.builder(URI.create("http://example.com/"), clientHttpRequestFactory) .withCursorManager(cursorManager) .build(); diff --git a/fahrschein/src/test/java/org/zalando/fahrschein/domain/MetadataTest.java b/fahrschein/src/test/java/org/zalando/fahrschein/domain/MetadataTest.java index 2cf345b0..735ef43a 100644 --- a/fahrschein/src/test/java/org/zalando/fahrschein/domain/MetadataTest.java +++ b/fahrschein/src/test/java/org/zalando/fahrschein/domain/MetadataTest.java @@ -15,17 +15,19 @@ public void shouldHaveCleanStringRepresentation() { final String eid = "a3e25946-5ae9-3964-91fa-26ecb7588d67"; final String partition = "partition1"; final String version = "v1"; + final String publishedBy = "unauthenticated"; final OffsetDateTime occurredAt = OffsetDateTime.now(); final OffsetDateTime receivedAt = OffsetDateTime.now(); final String flowId = UUID.randomUUID().toString(); - final Metadata metadata = new Metadata(eventType, eid, occurredAt, partition, version, receivedAt, flowId, Collections.emptyMap()); + final Metadata metadata = new Metadata(eventType, eid, occurredAt, partition, version, publishedBy, receivedAt, flowId, Collections.emptyMap()); Assert.assertTrue(metadata.toString().contains(metadata.getEventType())); Assert.assertTrue(metadata.toString().contains(metadata.getEid())); Assert.assertTrue(metadata.toString().contains(metadata.getOccurredAt().toString())); Assert.assertTrue(metadata.toString().contains(metadata.getPartition())); Assert.assertTrue(metadata.toString().contains(metadata.getVersion())); + Assert.assertTrue(metadata.toString().contains(metadata.getPublishedBy())); Assert.assertTrue(metadata.toString().contains(metadata.getReceivedAt().toString())); Assert.assertTrue(metadata.toString().contains(metadata.getFlowId())); } diff --git a/fahrschein/src/test/resources/log4j2.xml b/fahrschein/src/test/resources/log4j2.xml index 5794d77c..81fbc33e 100644 --- a/fahrschein/src/test/resources/log4j2.xml +++ b/fahrschein/src/test/resources/log4j2.xml @@ -4,7 +4,7 @@ + pattern="%date {%level} [%thread] [%logger] %message%n"/>