Skip to content

Commit cdc3849

Browse files
committed
#654 Some code cleanup
1 parent 3396e85 commit cdc3849

File tree

5 files changed

+62
-51
lines changed

5 files changed

+62
-51
lines changed

gateleen-kafka/README_kafka.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ The following topic configuration values are required:
4848
Besides these required configuration values, additional string values can be added. See documentation from Apache Kafka [here](https://kafka.apache.org/documentation/#producerconfigs).
4949

5050
## Usage
51-
To use the gateleen-kafka module, the [KafkaHandler](src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java) class has to be initialized as described in the _configuration_ section. Also the [KafkaHandler](src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java) has to be integrated in the "MainVerticle" handling all
51+
To use the gateleen-kafka module, the [KafkaHandler](src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java) class has to be initialized as described in the _configuration_ section. Also, the [KafkaHandler](src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java) has to be integrated in the "MainVerticle" handling all
5252
incoming requests. See [Playground Server](../gateleen-playground/src/main/java/org/swisspush/gateleen/playground/Server.java) and [Runconfig](../gateleen-runconfig/src/main/java/org/swisspush/gateleen/runconfig/RunConfig.java).
5353

5454
The following sequence diagram shows the setup of the "MainVerticle". The `streamingPath` (KafkaHandler) is configured to `/playground/server/streaming/`

gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaMessageSender.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,11 @@ Future<Void> sendMessages(KafkaProducer<String, String> kafkaProducer,
4343
Promise<Void> promise = Promise.promise();
4444
log.debug("Start processing {} messages for kafka", messages.size());
4545

46-
@SuppressWarnings("rawtypes") //https://github.com/eclipse-vertx/vert.x/issues/2627
47-
List<Future> futures = messages.stream()
46+
List<Future<Void>> futures = messages.stream()
4847
.map(message -> KafkaMessageSender.this.sendMessage(kafkaProducer, message))
4948
.collect(toList());
5049

51-
CompositeFuture.all(futures).<Void>mapEmpty().onComplete(result -> {
50+
Future.all(futures).<Void>mapEmpty().onComplete(result -> {
5251
if (result.succeeded()) {
5352
promise.complete();
5453
log.debug("Batch messages successfully sent to Kafka.");

gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaProducerRepository.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,10 @@ Optional<Pair<KafkaProducer<String, String>, Pattern>> findMatchingKafkaProducer
4646
Promise<Void> closeAll() {
4747
log.info("About to close all kafka producers");
4848
Promise<Void> promise = Promise.promise();
49-
List<Future> futures = new ArrayList<>();
49+
List<Future<Void>> futures = new ArrayList<>();
5050

5151
for (Map.Entry<Pattern, KafkaProducer<String, String>> entry : kafkaProducers.entrySet()) {
52-
Promise entryFuture = Promise.promise();
52+
Promise<Void> entryFuture = Promise.promise();
5353
futures.add(entryFuture.future());
5454
entry.getValue().close(event -> {
5555
if (event.succeeded()) {
@@ -62,7 +62,7 @@ Promise<Void> closeAll() {
6262
}
6363

6464
// wait for all producers to be closed
65-
CompositeFuture.all(futures).onComplete(event -> {
65+
Future.all(futures).onComplete(event -> {
6666
kafkaProducers.clear();
6767
promise.complete();
6868
});

gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaHandlerTest.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import io.vertx.core.Future;
44
import io.vertx.core.MultiMap;
5-
import io.vertx.core.Promise;
65
import io.vertx.core.Vertx;
76
import io.vertx.core.http.HttpMethod;
87
import io.vertx.core.http.HttpServerRequest;
@@ -30,7 +29,6 @@
3029
import java.util.List;
3130
import java.util.Map;
3231
import java.util.concurrent.Callable;
33-
import java.util.concurrent.atomic.AtomicReference;
3432
import java.util.regex.Pattern;
3533

3634
import static java.lang.Thread.currentThread;
@@ -66,7 +64,7 @@ public class KafkaHandlerTest {
6664
private ConfigurationResourceManager configurationResourceManager;
6765
private KafkaHandler handler;
6866
private MockResourceStorage storage;
69-
private GateleenExceptionFactory exceptionFactory = newGateleenWastefulExceptionFactory();
67+
private final GateleenExceptionFactory exceptionFactory = newGateleenWastefulExceptionFactory();
7068
private Vertx vertxMock;
7169

7270
private final String configResourceUri = "/kafka/topicsConfig";

gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaProducerRecordBuilderTest.java

+55-41
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,14 @@
88
import io.vertx.ext.unit.TestContext;
99
import io.vertx.ext.unit.junit.VertxUnitRunner;
1010
import io.vertx.kafka.client.producer.KafkaProducerRecord;
11-
import org.junit.Rule;
1211
import org.junit.Test;
13-
import org.junit.rules.ExpectedException;
1412
import org.junit.runner.RunWith;
1513
import org.swisspush.gateleen.core.util.JsonObjectUtils;
1614
import org.swisspush.gateleen.validation.ValidationException;
1715

1816
import java.util.List;
1917

18+
import static org.junit.Assert.assertThrows;
2019
import static org.swisspush.gateleen.kafka.KafkaProducerRecordBuilder.buildRecords;
2120

2221
/**
@@ -27,35 +26,40 @@
2726
@RunWith(VertxUnitRunner.class)
2827
public class KafkaProducerRecordBuilderTest {
2928

30-
@Rule
31-
public ExpectedException thrown = ExpectedException.none();
32-
3329
@Test
34-
public void buildRecordsInvalidJson() throws ValidationException {
35-
thrown.expect( ValidationException.class );
36-
thrown.expectMessage("Error while parsing payload");
37-
buildRecords("myTopic", Buffer.buffer("notValidJson"));
30+
public void buildRecordsInvalidJson(TestContext context) {
31+
Exception exception = assertThrows(ValidationException.class, () -> {
32+
buildRecords("myTopic", Buffer.buffer("notValidJson"));
33+
});
34+
35+
context.assertEquals("Error while parsing payload", exception.getMessage());
3836
}
3937

4038
@Test
41-
public void buildRecordsMissingRecordsArray() throws ValidationException {
42-
thrown.expect( ValidationException.class );
43-
thrown.expectMessage("Missing 'records' array");
44-
buildRecords("myTopic", Buffer.buffer("{}"));
39+
public void buildRecordsMissingRecordsArray(TestContext context) {
40+
Exception exception = assertThrows(ValidationException.class, () -> {
41+
buildRecords("myTopic", Buffer.buffer("{}"));
42+
});
43+
44+
context.assertEquals("Missing 'records' array", exception.getMessage());
4545
}
4646

4747
@Test
48-
public void buildRecordsNotArray() throws ValidationException {
49-
thrown.expect( ValidationException.class );
50-
thrown.expectMessage("Property 'records' must be of type JsonArray holding JsonObject objects");
51-
buildRecords("myTopic", Buffer.buffer("{\"records\": \"shouldBeAnArray\"}"));
48+
public void buildRecordsNotArray(TestContext context) {
49+
Exception exception = assertThrows(ValidationException.class, () -> {
50+
buildRecords("myTopic", Buffer.buffer("{\"records\": \"shouldBeAnArray\"}"));
51+
});
52+
53+
context.assertEquals("Property 'records' must be of type JsonArray holding JsonObject objects", exception.getMessage());
5254
}
5355

5456
@Test
55-
public void buildRecordsInvalidRecordsType() throws ValidationException {
56-
thrown.expect( ValidationException.class );
57-
thrown.expectMessage("Property 'records' must be of type JsonArray holding JsonObject objects");
58-
buildRecords("myTopic", Buffer.buffer("{\"records\": [123]}"));
57+
public void buildRecordsInvalidRecordsType(TestContext context) {
58+
Exception exception = assertThrows(ValidationException.class, () -> {
59+
buildRecords("myTopic", Buffer.buffer("{\"records\": [123]}"));
60+
});
61+
62+
context.assertEquals("Property 'records' must be of type JsonArray holding JsonObject objects", exception.getMessage());
5963
}
6064

6165
@Test
@@ -66,38 +70,48 @@ public void buildRecordsEmptyRecordsArray(TestContext context) throws Validation
6670
}
6771

6872
@Test
69-
public void buildRecordsInvalidKeyType() throws ValidationException {
70-
thrown.expect( ValidationException.class );
71-
thrown.expectMessage("Property 'key' must be of type String");
72-
buildRecords("myTopic", Buffer.buffer("{\"records\": [{\"key\": 123,\"value\": {}}]}"));
73+
public void buildRecordsInvalidKeyType(TestContext context) {
74+
Exception exception = assertThrows(ValidationException.class, () -> {
75+
buildRecords("myTopic", Buffer.buffer("{\"records\": [{\"key\": 123,\"value\": {}}]}"));
76+
});
77+
78+
context.assertEquals("Property 'key' must be of type String", exception.getMessage());
7379
}
7480

7581
@Test
76-
public void buildRecordsInvalidValueType() throws ValidationException {
77-
thrown.expect( ValidationException.class );
78-
thrown.expectMessage("Property 'value' must be of type JsonObject");
79-
buildRecords("myTopic", Buffer.buffer("{\"records\":[{\"value\":123}]}"));
82+
public void buildRecordsInvalidValueType(TestContext context) {
83+
Exception exception = assertThrows(ValidationException.class, () -> {
84+
buildRecords("myTopic", Buffer.buffer("{\"records\":[{\"value\":123}]}"));
85+
});
86+
87+
context.assertEquals("Property 'value' must be of type JsonObject", exception.getMessage());
8088
}
8189

8290
@Test
83-
public void buildRecordsMissingValue() throws ValidationException {
84-
thrown.expect( ValidationException.class );
85-
thrown.expectMessage("Property 'value' is required");
86-
buildRecords("myTopic", Buffer.buffer("{\"records\":[{}]}"));
91+
public void buildRecordsMissingValue(TestContext context) {
92+
Exception exception = assertThrows(ValidationException.class, () -> {
93+
buildRecords("myTopic", Buffer.buffer("{\"records\":[{}]}"));
94+
});
95+
96+
context.assertEquals("Property 'value' is required", exception.getMessage());
8797
}
8898

8999
@Test
90-
public void buildRecordsInvalidHeadersType() throws ValidationException {
91-
thrown.expect( ValidationException.class );
92-
thrown.expectMessage("Property 'headers' must be of type JsonObject");
93-
buildRecords("myTopic", Buffer.buffer("{\"records\": [{\"value\":{},\"headers\": 123}]}"));
100+
public void buildRecordsInvalidHeadersType(TestContext context) {
101+
Exception exception = assertThrows(ValidationException.class, () -> {
102+
buildRecords("myTopic", Buffer.buffer("{\"records\": [{\"value\":{},\"headers\": 123}]}"));
103+
});
104+
105+
context.assertEquals("Property 'headers' must be of type JsonObject", exception.getMessage());
94106
}
95107

96108
@Test
97-
public void buildRecordsInvalidHeadersValueType() throws ValidationException {
98-
thrown.expect( ValidationException.class );
99-
thrown.expectMessage("Property 'headers' must be of type JsonObject holding String values only");
100-
buildRecords("myTopic", Buffer.buffer("{\"records\": [{\"value\": {},\"headers\": {\"key\": 555}}]}"));
109+
public void buildRecordsInvalidHeadersValueType(TestContext context) {
110+
Exception exception = assertThrows(ValidationException.class, () -> {
111+
buildRecords("myTopic", Buffer.buffer("{\"records\": [{\"value\": {},\"headers\": {\"key\": 555}}]}"));
112+
});
113+
114+
context.assertEquals("Property 'headers' must be of type JsonObject holding String values only", exception.getMessage());
101115
}
102116

103117
@Test

0 commit comments

Comments
 (0)