Showing 23 changed files with 407 additions and 250 deletions.
1 change: 1 addition & 0 deletions kafka-streams/build.gradle
@@ -21,6 +21,7 @@ dependencies {
testCompile "org.apache.kafka:kafka-clients:${kafkaVersion}:test"
testCompile "org.apache.kafka:kafka_2.12:${kafkaVersion}"
testCompile "org.apache.kafka:kafka_2.12:${kafkaVersion}:test"
testCompile "org.testcontainers:kafka:1.12.3"
}

test {
@@ -29,37 +29,41 @@ import io.micronaut.context.ApplicationContext
import io.micronaut.core.util.CollectionUtils
import io.micronaut.inject.qualifiers.Qualifiers
import org.apache.kafka.streams.KafkaStreams
import org.testcontainers.containers.KafkaContainer
import spock.lang.AutoCleanup
import spock.lang.Shared
import spock.lang.Specification
import spock.util.concurrent.PollingConditions

class KafkaStreamsSpec extends Specification {

@Shared
@AutoCleanup
ApplicationContext context = ApplicationContext.run(
CollectionUtils.mapOf(
"kafka.bootstrap.servers", 'localhost:${random.port}',
AbstractKafkaConfiguration.EMBEDDED, true,
AbstractKafkaConfiguration.EMBEDDED_TOPICS, [
@Shared @AutoCleanup KafkaContainer kafkaContainer = new KafkaContainer()
@Shared @AutoCleanup ApplicationContext context

def setupSpec() {
kafkaContainer.start()
context = ApplicationContext.run(
CollectionUtils.mapOf(
"kafka.bootstrap.servers", 'localhost:${random.port}',
AbstractKafkaConfiguration.EMBEDDED, true,
AbstractKafkaConfiguration.EMBEDDED_TOPICS, [
WordCountStream.INPUT,
WordCountStream.OUTPUT,
WordCountStream.NAMED_WORD_COUNT_INPUT,
WordCountStream.NAMED_WORD_COUNT_OUTPUT,
OptimizationStream.OPTIMIZATION_ON_INPUT,
OptimizationStream.OPTIMIZATION_OFF_INPUT
],
'kafka.generic.config', "hello",
'kafka.streams.my-stream.application.id', 'my-stream',
'kafka.streams.my-stream.num.stream.threads', 10,
'kafka.streams.optimization-on.application.id', 'optimization-on',
'kafka.streams.optimization-on.topology.optimization', 'all',
'kafka.streams.optimization-off.application.id', 'optimization-off',
'kafka.streams.optimization-off.topology.optimization', 'none'
)
)

],
'kafka.generic.config', "hello",
'kafka.streams.my-stream.application.id', 'my-stream',
'kafka.streams.my-stream.num.stream.threads', 10,
'kafka.streams.optimization-on.application.id', 'optimization-on',
'kafka.streams.optimization-on.topology.optimization', 'all',
'kafka.streams.optimization-off.application.id', 'optimization-off',
'kafka.streams.optimization-off.topology.optimization', 'none'
)
)
}
void "test config"() {
when:
def builder = context.getBean(ConfiguredStreamBuilder, Qualifiers.byName('my-stream'))
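The change above is the pattern this commit repeats across the test suite: declare a shared Testcontainers KafkaContainer, start it in setupSpec(), and only then build the ApplicationContext. As a minimal standalone sketch of that lifecycle in plain JUnit 4 (class and method names here are illustrative, assuming the Testcontainers 1.12.x dependency added in build.gradle):

import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.testcontainers.containers.KafkaContainer;

// A minimal sketch of the lifecycle used across these specs. With
// Testcontainers 1.12.x, new KafkaContainer() pulls a default
// confluentinc/cp-kafka image.
public class KafkaContainerLifecycleTest {

    static KafkaContainer kafka = new KafkaContainer();

    @BeforeClass
    public static void startKafka() {
        kafka.start(); // blocks until the broker container is ready
    }

    @AfterClass
    public static void stopKafka() {
        kafka.stop();
    }

    @Test
    public void bootstrapServersAreAdvertised() {
        // e.g. PLAINTEXT://localhost:32789 -- this value replaces the old
        // 'localhost:${random.port}' plus EMBEDDED=true configuration
        System.out.println(kafka.getBootstrapServers());
    }
}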
2 changes: 2 additions & 0 deletions kafka/build.gradle
@@ -36,6 +36,8 @@ dependencies {
testCompile "org.apache.kafka:kafka_2.12:${kafkaVersion}"
testCompile "org.apache.kafka:kafka_2.12:${kafkaVersion}:test"

testCompile "org.testcontainers:kafka:1.12.3"

testCompile 'io.opentracing.contrib:opentracing-kafka-client:0.0.16'
testCompile 'io.opentracing:opentracing-mock:0.31.0'

@@ -45,6 +45,7 @@
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

@@ -98,72 +99,102 @@ public synchronized AbstractKafkaConfiguration onCreated(BeanCreatedEvent<Abstra
} catch (NumberFormatException e) {
return config;
}
} else if (SocketUtils.isTcpPortAvailable(AbstractKafkaConfiguration.DEFAULT_KAFKA_PORT)) {
kafkaPort = AbstractKafkaConfiguration.DEFAULT_KAFKA_PORT;
}

// only handle localhost
if (embeddedConfiguration.isEnabled() &&
kafkaServer == null &&
kafkaPort > -1 &&
SocketUtils.isTcpPortAvailable(kafkaPort) &&
init.compareAndSet(false, true)) {
try {
if (zkServer == null) {
initZooKeeper();
}
boolean randomPort = kafkaPort == -1;
if (embeddedConfiguration.isEnabled()) {

// setup Broker
Properties brokerProps = embeddedConfiguration.getProperties();
String zkConnect = "127.0.0.1:" + zkServer.port();
if (LOG.isWarnEnabled()) {
LOG.warn("Embedded Kafka is deprecated. For Testing please use Test Containers instead: https://www.testcontainers.org/modules/kafka/");
}
int retries = 0;
do {
// only handle localhost
final int targetPort = randomPort ? SocketUtils.findAvailableTcpPort() : kafkaPort;
if (kafkaServer == null &&
targetPort > -1 &&
SocketUtils.isTcpPortAvailable(targetPort) &&
init.compareAndSet(false, true)) {
try {
if (zkServer == null) {
initZooKeeper();
}

brokerProps.setProperty("zookeeper.connect", zkConnect);
brokerProps.putIfAbsent("broker.id", "0");
brokerProps.put("port", kafkaPort);
brokerProps.putIfAbsent("offsets.topic.replication.factor" , "1");
// setup Broker
Properties brokerProps = embeddedConfiguration.getProperties();
String zkConnect = "127.0.0.1:" + zkServer.port();

brokerProps.computeIfAbsent("log.dirs", o -> {
try {
return Files.createTempDirectory("kafka-").toAbsolutePath().toString();
} catch (IOException e) {
throw new ConfigurationException("Error creating log directory for embedded Kafka server: " + e.getMessage(), e);
}
});

brokerProps.setProperty(
"listeners",
"PLAINTEXT://127.0.0.1:" + kafkaPort
);
KafkaConfig kafkaConfig = new KafkaConfig(brokerProps);
this.kafkaServer = TestUtils.createServer(kafkaConfig, new MockTime());
if (LOG.isInfoEnabled()) {
LOG.info("Started Embedded Kafka on Port: {}", kafkaPort);
}
brokerProps.setProperty("zookeeper.connect", zkConnect);
brokerProps.putIfAbsent("broker.id", "0");

List<String> topics = embeddedConfiguration.getTopics();
brokerProps.put("port", targetPort);
brokerProps.putIfAbsent("offsets.topic.replication.factor" , "1");

if (LOG.isDebugEnabled()) {
LOG.debug("Creating Kafka Topics in Embedded Kafka: {}", topics);
}
if (!topics.isEmpty()) {
Properties properties = new Properties();
properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, ("127.0.0.1:" + kafkaPort));
AdminClient adminClient = AdminClient.create(properties);
final CreateTopicsResult result = adminClient.createTopics(topics.stream().map(s -> new NewTopic(s, kafkaConfig.numPartitions(), (short) 1)).collect(Collectors.toList()));
result.all().get();

if (LOG.isInfoEnabled()) {
LOG.info("Created Kafka Topics in Embedded Kafka: {}", topics);
brokerProps.computeIfAbsent("log.dirs", o -> {
try {
return Files.createTempDirectory("kafka-").toAbsolutePath().toString();
} catch (IOException e) {
throw new ConfigurationException("Error creating log directory for embedded Kafka server: " + e.getMessage(), e);
}
});

brokerProps.setProperty(
"listeners",
"PLAINTEXT://127.0.0.1:" + targetPort
);
KafkaConfig kafkaConfig = new KafkaConfig(brokerProps);
this.kafkaServer = TestUtils.createServer(kafkaConfig, new MockTime());
final Integer numPartitions = kafkaConfig.numPartitions();
if (LOG.isInfoEnabled()) {
LOG.info("Started Embedded Kafka on Port: {}", targetPort);
}

createTopics(targetPort, numPartitions);
return config;
} catch (Throwable e) {
// check server not already running
if (!e.getMessage().contains("Address already in use")) {
throw new ConfigurationException("Error starting embedded Kafka server: " + e.getMessage(), e);

}
retries++;
}
}
} catch (Throwable e) {
// check server not already running
if (!e.getMessage().contains("Address already in use")) {
throw new ConfigurationException("Error starting embedded Kafka server: " + e.getMessage(), e);

} while (retries < 3);
throw new ConfigurationException("Error starting embedded Kafka server. Could not start after attempting port binding several times");
} else {
if (kafkaPort > -1) {
try {
createTopics(kafkaPort, 1);
} catch (Throwable e) {
throw new ConfigurationException("Error creating Kafka Topics: " + e.getMessage(), e);
}
}
return config;
}

}

private void createTopics(int targetPort, Integer numPartitions) throws InterruptedException, java.util.concurrent.ExecutionException {
List<String> topics = embeddedConfiguration.getTopics();

if (LOG.isDebugEnabled()) {
LOG.debug("Creating Kafka Topics in Embedded Kafka: {}", topics);
}
if (!topics.isEmpty()) {
Properties properties = new Properties();
properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, ("127.0.0.1:" + targetPort));
AdminClient adminClient = AdminClient.create(properties);
final CreateTopicsResult result = adminClient.createTopics(topics.stream().map(s ->
new NewTopic(s, numPartitions, (short) 1)).collect(Collectors.toList())
);
result.all().get();

if (LOG.isInfoEnabled()) {
LOG.info("Created Kafka Topics in Embedded Kafka: {}", topics);
}
}
return config;
}

@Override
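The refactor above does two things: it wraps broker startup in a do/while loop that retries up to three times, probing for a free TCP port with SocketUtils.findAvailableTcpPort() before each bind attempt and rethrowing anything that is not an "Address already in use" failure, and it extracts topic creation into the createTopics(targetPort, numPartitions) helper. That helper drives Kafka's AdminClient; a self-contained sketch of the same API, assuming a broker is already reachable (the address and topic names below are illustrative):

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicsSketch {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Properties props = new Properties();
        // illustrative address; the helper above uses "127.0.0.1:" + targetPort
        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");

        try (AdminClient adminClient = AdminClient.create(props)) {
            // one partition, replication factor 1, matching the helper's defaults
            CreateTopicsResult result = adminClient.createTopics(Arrays.asList(
                    new NewTopic("words", 1, (short) 1),
                    new NewTopic("books", 1, (short) 1)));
            result.all().get(); // block until every topic exists, or fail
        }
    }
}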
@@ -23,6 +23,7 @@ import io.micronaut.core.util.CollectionUtils
import io.micronaut.messaging.annotation.Header
import io.micronaut.messaging.annotation.SendTo
import io.reactivex.Flowable
import org.testcontainers.containers.KafkaContainer
import reactor.core.publisher.Flux
import spock.lang.AutoCleanup
import spock.lang.Shared
@@ -43,19 +44,24 @@ class KafkaBatchListenerSpec extends Specification {
public static final String BOOKS_ARRAY_TOPIC = 'KafkaBatchListenerSpec-books-array'
public static final String TITLES_TOPIC = 'KafkaBatchListenerSpec-titles'

@Shared @AutoCleanup ApplicationContext context = ApplicationContext.run(
CollectionUtils.mapOf(
"kafka.bootstrap.servers", 'localhost:${random.port}',
AbstractKafkaConfiguration.EMBEDDED, true,
AbstractKafkaConfiguration.EMBEDDED_TOPICS,
[ TITLES_TOPIC,
BOOKS_LIST_TOPIC,
BOOKS_ARRAY_TOPIC,
BOOKS_TOPIC,
BOOKS_FORWARD_LIST_TOPIC
]
)
)
@Shared @AutoCleanup KafkaContainer kafkaContainer = new KafkaContainer()
@Shared @AutoCleanup ApplicationContext context

def setupSpec() {
kafkaContainer.start()
context = ApplicationContext.run(
CollectionUtils.mapOf(
"kafka.bootstrap.servers", kafkaContainer.getBootstrapServers(),
AbstractKafkaConfiguration.EMBEDDED_TOPICS,
[ TITLES_TOPIC,
BOOKS_LIST_TOPIC,
BOOKS_ARRAY_TOPIC,
BOOKS_TOPIC,
BOOKS_FORWARD_LIST_TOPIC
]
)
)
}


void "test send batch list with headers - blocking"() {
@@ -38,6 +38,7 @@ import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.kafka.common.serialization.StringSerializer
import org.testcontainers.containers.KafkaContainer
import spock.lang.AutoCleanup
import spock.lang.Shared
import spock.lang.Specification
@@ -47,25 +48,30 @@ import spock.util.concurrent.PollingConditions
@Stepwise
class KafkaListenerSpec extends Specification {

@Shared @AutoCleanup KafkaContainer kafkaContainer = new KafkaContainer()
@Shared @AutoCleanup ApplicationContext context
@Shared
@AutoCleanup
EmbeddedServer embeddedServer = ApplicationContext.run(EmbeddedServer,
CollectionUtils.mapOf(
"kafka.bootstrap.servers", 'localhost:${random.port}',
"micrometer.metrics.enabled", true,
'endpoints.metrics.sensitive', false,
AbstractKafkaConfiguration.EMBEDDED, true,
AbstractKafkaConfiguration.EMBEDDED_TOPICS, ["words", "books", "words-records", "books-records"]
)
)

EmbeddedServer embeddedServer
@Shared
@AutoCleanup
ApplicationContext context = embeddedServer.applicationContext
RxHttpClient httpClient

def setupSpec() {
kafkaContainer.start()
embeddedServer = ApplicationContext.run(EmbeddedServer,
CollectionUtils.mapOf(
"kafka.bootstrap.servers", kafkaContainer.getBootstrapServers(),
"micrometer.metrics.enabled", true,
'endpoints.metrics.sensitive', false,
AbstractKafkaConfiguration.EMBEDDED_TOPICS, ["words", "books", "words-records", "books-records"]
)
)
context = embeddedServer.applicationContext
httpClient = embeddedServer.applicationContext.createBean(RxHttpClient, embeddedServer.getURL(), new DefaultHttpClientConfiguration(followRedirects: false))
}


@Shared
@AutoCleanup
RxHttpClient httpClient = embeddedServer.applicationContext.createBean(RxHttpClient, embeddedServer.getURL(), new DefaultHttpClientConfiguration(followRedirects: false))

void "test simple consumer"() {
given:
@@ -7,6 +7,7 @@ import io.micronaut.core.util.CollectionUtils
import io.micronaut.messaging.annotation.Body
import io.micronaut.runtime.server.EmbeddedServer
import org.apache.kafka.clients.consumer.Consumer
import org.testcontainers.containers.KafkaContainer
import spock.lang.AutoCleanup
import spock.lang.Shared
import spock.lang.Specification
@@ -16,21 +17,28 @@ import java.util.concurrent.ConcurrentSkipListSet

class KafkaPauseResumeSpec extends Specification {

@Shared @AutoCleanup KafkaContainer kafkaContainer = new KafkaContainer()

@Shared
@AutoCleanup
EmbeddedServer embeddedServer = ApplicationContext.run(EmbeddedServer,
CollectionUtils.mapOf(
"kafka.bootstrap.servers", 'localhost:${random.port}',
"micrometer.metrics.enabled", true,
'endpoints.metrics.sensitive', false,
AbstractKafkaConfiguration.EMBEDDED, true,
AbstractKafkaConfiguration.EMBEDDED_TOPICS, ["fruits"]
)
)
EmbeddedServer embeddedServer

@Shared
@AutoCleanup
ApplicationContext context = embeddedServer.applicationContext
ApplicationContext context

def setupSpec() {
kafkaContainer.start()
embeddedServer = ApplicationContext.run(EmbeddedServer,
CollectionUtils.mapOf(
"kafka.bootstrap.servers", kafkaContainer.getBootstrapServers(),
"micrometer.metrics.enabled", true,
'endpoints.metrics.sensitive', false,
AbstractKafkaConfiguration.EMBEDDED_TOPICS, ["fruits"]
)
)
context = embeddedServer.applicationContext
}

void "test pause / resume listener"() {
given:
@@ -32,6 +32,7 @@ import io.micronaut.messaging.annotation.SendTo
import io.opentracing.mock.MockTracer
import org.apache.kafka.common.serialization.BytesDeserializer
import org.apache.kafka.common.serialization.BytesSerializer
import org.testcontainers.containers.KafkaContainer
import spock.lang.AutoCleanup
import spock.lang.Shared
import spock.lang.Specification
@@ -47,33 +48,39 @@ class KafkaProducerSpec extends Specification {
@Shared
MockTracer mockTracer = new MockTracer()

@Shared @AutoCleanup KafkaContainer kafkaContainer = new KafkaContainer()

@Shared
@AutoCleanup
ApplicationContext context = ApplicationContext.build(
CollectionUtils.mapOf(
'micronaut.application.name', 'test-app',
"kafka.schema.registry.url", "http://localhot:8081",
"kafka.producers.named.key.serializer", "org.apache.kafka.common.serialization.StringSerializer",
"kafka.producers.named.value.serializer", "org.apache.kafka.common.serialization.StringSerializer",
"kafka.producers.default.key.serializer", "org.apache.kafka.common.serialization.StringSerializer",
"kafka.producers.default.key-serializer", "org.apache.kafka.common.serialization.StringSerializer",
"kafka.producers.default.keySerializer", "org.apache.kafka.common.serialization.StringSerializer",
"kafka.producers.default.value.serializer", "org.apache.kafka.common.serialization.StringSerializer",
"kafka.producers.default.value-serializer", "org.apache.kafka.common.serialization.StringSerializer",
"kafka.producers.default.valueSerializer", "org.apache.kafka.common.serialization.StringSerializer",
"kafka.consumers.default.key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer",
"kafka.consumers.default.key-deserializer", "org.apache.kafka.common.serialization.StringDeserializer",
"kafka.consumers.default.keyDeserializer", "org.apache.kafka.common.serialization.StringDeserializer",
"kafka.consumers.default.value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer",
"kafka.consumers.default.value-deserializer", "org.apache.kafka.common.serialization.StringDeserializer",
"kafka.consumers.default.valueDeserializer", "org.apache.kafka.common.serialization.StringDeserializer",
"kafka.bootstrap.servers", 'localhost:${random.port}',
AbstractKafkaConfiguration.EMBEDDED, true,
AbstractKafkaConfiguration.EMBEDDED_TOPICS, [
TOPIC_BLOCKING
]
)
).singletons(mockTracer).start()
ApplicationContext context

def setupSpec() {
kafkaContainer.start()
context = ApplicationContext.build(
CollectionUtils.mapOf(
'micronaut.application.name', 'test-app',
"kafka.schema.registry.url", "http://localhot:8081",
"kafka.producers.named.key.serializer", "org.apache.kafka.common.serialization.StringSerializer",
"kafka.producers.named.value.serializer", "org.apache.kafka.common.serialization.StringSerializer",
"kafka.producers.default.key.serializer", "org.apache.kafka.common.serialization.StringSerializer",
"kafka.producers.default.key-serializer", "org.apache.kafka.common.serialization.StringSerializer",
"kafka.producers.default.keySerializer", "org.apache.kafka.common.serialization.StringSerializer",
"kafka.producers.default.value.serializer", "org.apache.kafka.common.serialization.StringSerializer",
"kafka.producers.default.value-serializer", "org.apache.kafka.common.serialization.StringSerializer",
"kafka.producers.default.valueSerializer", "org.apache.kafka.common.serialization.StringSerializer",
"kafka.consumers.default.key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer",
"kafka.consumers.default.key-deserializer", "org.apache.kafka.common.serialization.StringDeserializer",
"kafka.consumers.default.keyDeserializer", "org.apache.kafka.common.serialization.StringDeserializer",
"kafka.consumers.default.value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer",
"kafka.consumers.default.value-deserializer", "org.apache.kafka.common.serialization.StringDeserializer",
"kafka.consumers.default.valueDeserializer", "org.apache.kafka.common.serialization.StringDeserializer",
"kafka.bootstrap.servers", kafkaContainer.getBootstrapServers(),
AbstractKafkaConfiguration.EMBEDDED_TOPICS, [
TOPIC_BLOCKING
]
)
).singletons(mockTracer).start()
}


def "test customize defaults"() {
@@ -23,6 +23,7 @@ import io.micronaut.core.util.CollectionUtils
import io.reactivex.Flowable
import io.reactivex.Single
import org.apache.kafka.clients.producer.RecordMetadata
import org.testcontainers.containers.KafkaContainer
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import spock.lang.AutoCleanup
@@ -36,15 +37,20 @@ import java.util.concurrent.Future
class KafkaReactiveListenerSpec extends Specification{

public static final String TOPIC_NAME = "KafkaReactiveListenerSpec-books"
@Shared @AutoCleanup ApplicationContext context = ApplicationContext.run(
CollectionUtils.mapOf(
"kafka.bootstrap.servers", 'localhost:${random.port}',
AbstractKafkaConfiguration.EMBEDDED, true,
AbstractKafkaConfiguration.EMBEDDED_TOPICS, [TOPIC_NAME]
)
@Shared @AutoCleanup KafkaContainer kafkaContainer = new KafkaContainer()
@Shared @AutoCleanup ApplicationContext context

def setupSpec() {
kafkaContainer.start()
context = ApplicationContext.run(
CollectionUtils.mapOf(
"kafka.bootstrap.servers", kafkaContainer.getBootstrapServers(),
AbstractKafkaConfiguration.EMBEDDED_TOPICS, [TOPIC_NAME]
)

)

)
}

void "test send and return single"() {
given:
@@ -22,6 +22,7 @@ import io.micronaut.messaging.annotation.SendTo
import io.reactivex.Flowable
import io.reactivex.Single
import org.apache.kafka.clients.producer.RecordMetadata
import org.testcontainers.containers.KafkaContainer
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import spock.lang.AutoCleanup
@@ -40,16 +41,20 @@ class KafkaSendToSpec extends Specification {
public static final String TOPIC_MONO = "KafkaSendToSpec-products-mono"
public static final String TOPIC_QUANTITY = "KafkaSendToSpec-quantity"

@Shared @AutoCleanup ApplicationContext context = ApplicationContext.run(
CollectionUtils.mapOf(
"kafka.bootstrap.servers", 'localhost:${random.port}',
AbstractKafkaConfiguration.EMBEDDED, true,
AbstractKafkaConfiguration.EMBEDDED_TOPICS, [
TOPIC_SINGLE, TOPIC_QUANTITY, TOPIC_FLOWABLE, TOPIC_FLUX, TOPIC_MONO
]
)
@Shared @AutoCleanup KafkaContainer kafkaContainer = new KafkaContainer()
@Shared @AutoCleanup ApplicationContext context

)
def setupSpec() {
kafkaContainer.start()
context = ApplicationContext.run(
CollectionUtils.mapOf(
"kafka.bootstrap.servers", kafkaContainer.getBootstrapServers(),
AbstractKafkaConfiguration.EMBEDDED_TOPICS, [
TOPIC_SINGLE, TOPIC_QUANTITY, TOPIC_FLOWABLE, TOPIC_FLUX, TOPIC_MONO
])

)
}

void "test send to another topic - blocking"() {
given:
@@ -19,6 +19,7 @@ import io.micronaut.configuration.kafka.config.AbstractKafkaConfiguration
import io.micronaut.context.ApplicationContext
import io.micronaut.core.util.CollectionUtils
import org.apache.kafka.clients.producer.ProducerRecord
import org.testcontainers.containers.KafkaContainer
import spock.lang.AutoCleanup
import spock.lang.Shared
import spock.lang.Specification
@@ -30,16 +31,21 @@ class KafkaTimestampSpec extends Specification {

public static final String TOPIC_WORDS = "KafkaTimestampSpec-words"

@Shared @AutoCleanup KafkaContainer kafkaContainer = new KafkaContainer()
@Shared
@AutoCleanup
ApplicationContext context = ApplicationContext.run(
CollectionUtils.mapOf(
"kafka.bootstrap.servers", 'localhost:${random.port}',
AbstractKafkaConfiguration.EMBEDDED, true,
AbstractKafkaConfiguration.EMBEDDED_TOPICS,
[TOPIC_WORDS]
)
)
ApplicationContext context

def setupSpec() {
kafkaContainer.start()
context = ApplicationContext.run(
CollectionUtils.mapOf(
"kafka.bootstrap.servers", kafkaContainer.getBootstrapServers(),
AbstractKafkaConfiguration.EMBEDDED_TOPICS,
[TOPIC_WORDS]
)
)
}

def "test client without timestamp"() {
given:
@@ -21,6 +21,7 @@ import io.micronaut.configuration.kafka.exceptions.KafkaListenerExceptionHandler
import io.micronaut.context.ApplicationContext
import io.micronaut.core.util.CollectionUtils
import org.apache.kafka.common.errors.SerializationException
import org.testcontainers.containers.KafkaContainer
import spock.lang.AutoCleanup
import spock.lang.Shared
import spock.lang.Specification
@@ -33,15 +34,20 @@ import java.util.concurrent.ConcurrentHashMap
@Stepwise
class KafkaTypeConversionSpec extends Specification {

@Shared @AutoCleanup KafkaContainer kafkaContainer = new KafkaContainer()
@Shared
@AutoCleanup
ApplicationContext context = ApplicationContext.run(
CollectionUtils.mapOf(
"kafka.bootstrap.servers", 'localhost:${random.port}',
AbstractKafkaConfiguration.EMBEDDED, true,
AbstractKafkaConfiguration.EMBEDDED_TOPICS, ["uuids"]
)
)
ApplicationContext context

def setupSpec() {
kafkaContainer.start()
context = ApplicationContext.run(
CollectionUtils.mapOf(
"kafka.bootstrap.servers", kafkaContainer.getBootstrapServers(),
AbstractKafkaConfiguration.EMBEDDED_TOPICS, ["uuids"]
)
)
}

void "test send valid UUID key"() {
when:
@@ -87,7 +93,7 @@ class KafkaTypeConversionSpec extends Specification {
@Inject
KafkaListenerExceptionHandler defaultExceptionHandler

@Topic(patterns = "uuids")
@Topic("uuids")
void receive(@KafkaKey UUID key, String message) {
messages.put(key, message)
}
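Note the final chunk of this hunk: @Topic(patterns = "uuids") becomes @Topic("uuids"). The patterns member subscribes by regular expression, while the annotation value names a topic literally, and this spec only needs the literal subscription. A condensed Java rendering of the corrected listener (the enclosing class is abbreviated from the Groovy spec; listener settings such as offsetReset are omitted):

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.Topic;

@KafkaListener
class UuidConsumer {

    final Map<UUID, String> messages = new ConcurrentHashMap<>();

    // literal topic name; @Topic(patterns = "uuids") would instead be
    // treated as a regular-expression subscription
    @Topic("uuids")
    void receive(@KafkaKey UUID key, String message) {
        messages.put(key, message);
    }
}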
@@ -21,6 +21,7 @@ import io.micronaut.core.util.CollectionUtils
import io.micronaut.http.client.DefaultHttpClientConfiguration
import io.micronaut.http.client.RxHttpClient
import io.micronaut.runtime.server.EmbeddedServer
import org.testcontainers.containers.KafkaContainer
import spock.lang.AutoCleanup
import spock.lang.Shared
import spock.lang.Specification
@@ -32,23 +33,30 @@ class KafkaUniqueGroupIdSpec extends Specification {

static final String TOPIC = "groupid-topic"

@Shared @AutoCleanup KafkaContainer kafkaContainer = new KafkaContainer()
@Shared
@AutoCleanup
EmbeddedServer embeddedServer = ApplicationContext.run(EmbeddedServer,
CollectionUtils.mapOf(
"kafka.bootstrap.servers", 'localhost:${random.port}',
AbstractKafkaConfiguration.EMBEDDED, true,
AbstractKafkaConfiguration.EMBEDDED_TOPICS, [KafkaUniqueGroupIdSpec.TOPIC]
)
)
EmbeddedServer embeddedServer

@Shared
@AutoCleanup
ApplicationContext context = embeddedServer.applicationContext
ApplicationContext context

@Shared
@AutoCleanup
RxHttpClient httpClient = embeddedServer.applicationContext.createBean(RxHttpClient, embeddedServer.getURL(), new DefaultHttpClientConfiguration(followRedirects: false))
RxHttpClient httpClient

def setupSpec() {
kafkaContainer.start()
embeddedServer = ApplicationContext.run(EmbeddedServer,
CollectionUtils.mapOf(
"kafka.bootstrap.servers", kafkaContainer.getBootstrapServers(),
AbstractKafkaConfiguration.EMBEDDED_TOPICS, [KafkaUniqueGroupIdSpec.TOPIC]
)
)
context = embeddedServer.applicationContext
httpClient = embeddedServer.applicationContext.createBean(RxHttpClient, embeddedServer.getURL(), new DefaultHttpClientConfiguration(followRedirects: false))
}

void "test multiple consumers - single group id"() {
given:
@@ -15,31 +15,33 @@
*/
package io.micronaut.configuration.kafka.docs;

import io.micronaut.configuration.kafka.config.AbstractKafkaConfiguration;
import io.micronaut.configuration.kafka.docs.quickstart.ProductClient;
import io.micronaut.context.ApplicationContext;
import io.micronaut.core.util.CollectionUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.testcontainers.containers.KafkaContainer;

public class DocTests {

static ApplicationContext applicationContext;
static KafkaContainer kafkaContainer = new KafkaContainer();

@BeforeClass
public static void setup() {
kafkaContainer.start();
applicationContext = ApplicationContext.run(
CollectionUtils.mapOf(
"kafka.bootstrap.servers", "localhost:${random.port}",
AbstractKafkaConfiguration.EMBEDDED, true
"kafka.bootstrap.servers", kafkaContainer.getBootstrapServers()
)
);
}

@AfterClass
public static void cleanup() {
applicationContext.stop();
kafkaContainer.stop();
}
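DocTests manages the container by hand with @BeforeClass and @AfterClass. For comparison only (this is not what the commit does), Testcontainers' JUnit 4 integration can manage the same lifecycle through a @ClassRule, since GenericContainer implements TestRule; a sketch of that alternative:

import io.micronaut.context.ApplicationContext;
import io.micronaut.core.util.CollectionUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.testcontainers.containers.KafkaContainer;

public class DocTestsWithRule {

    // started before @BeforeClass runs and stopped after @AfterClass
    @ClassRule
    public static KafkaContainer kafkaContainer = new KafkaContainer();

    static ApplicationContext applicationContext;

    @BeforeClass
    public static void setup() {
        // the rule has already started the container at this point
        applicationContext = ApplicationContext.run(
                CollectionUtils.mapOf(
                        "kafka.bootstrap.servers", kafkaContainer.getBootstrapServers()
                )
        );
    }

    @AfterClass
    public static void cleanup() {
        applicationContext.stop();
    }
}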


@@ -15,6 +15,7 @@ import io.micronaut.runtime.server.EmbeddedServer
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.testcontainers.containers.KafkaContainer
import spock.lang.AutoCleanup
import spock.lang.Shared
import spock.lang.Specification
@@ -23,15 +24,20 @@ import spock.util.concurrent.PollingConditions
import java.util.concurrent.atomic.AtomicInteger

class KafkaErrorHandlingSpec extends Specification {
@Shared @AutoCleanup KafkaContainer kafkaContainer = new KafkaContainer()
@Shared
@AutoCleanup
EmbeddedServer embeddedServer = ApplicationContext.run(EmbeddedServer,
CollectionUtils.mapOf(
"kafka.bootstrap.servers", 'localhost:${random.port}',
AbstractKafkaConfiguration.EMBEDDED, true,
AbstractKafkaConfiguration.EMBEDDED_TOPICS, ["errors"]
)
)
EmbeddedServer embeddedServer

def setupSpec() {
kafkaContainer.start()
embeddedServer = ApplicationContext.run(EmbeddedServer,
CollectionUtils.mapOf(
"kafka.bootstrap.servers", kafkaContainer.getBootstrapServers(),
AbstractKafkaConfiguration.EMBEDDED_TOPICS, ["errors"]
)
)
}

void "test an exception that is thrown is not committed"() {
when:"A consumer throws an exception"
@@ -27,35 +27,44 @@ import io.micronaut.http.client.DefaultHttpClientConfiguration
import io.micronaut.http.client.RxHttpClient
import io.micronaut.messaging.annotation.Header
import io.micronaut.runtime.server.EmbeddedServer
import org.testcontainers.containers.KafkaContainer
import spock.lang.AutoCleanup
import spock.lang.Shared
import spock.lang.Specification
import spock.util.concurrent.PollingConditions

class KafkaConsumerMetricsSpec extends Specification {

@Shared @AutoCleanup KafkaContainer kafkaContainer = new KafkaContainer()
@Shared
@AutoCleanup
EmbeddedServer embeddedServer = ApplicationContext.run(EmbeddedServer,
CollectionUtils.mapOf(
"kafka.bootstrap.servers", 'localhost:${random.port}',
"micrometer.metrics.enabled", true,
'endpoints.metrics.sensitive', false,
AbstractKafkaConfiguration.EMBEDDED, true,
AbstractKafkaConfiguration.EMBEDDED_TOPICS, ["words-metrics", "words", "books", "words-records", "books-records"]
)
)
EmbeddedServer embeddedServer

@Shared
@AutoCleanup
ApplicationContext context = embeddedServer.applicationContext
ApplicationContext context

@Shared
MeterRegistry meterRegistry = context.getBean(MeterRegistry)
MeterRegistry meterRegistry

@Shared
@AutoCleanup
RxHttpClient httpClient = embeddedServer.applicationContext.createBean(RxHttpClient, embeddedServer.getURL(), new DefaultHttpClientConfiguration(followRedirects: false))
RxHttpClient httpClient

def setupSpec() {
kafkaContainer.start()
embeddedServer = ApplicationContext.run(EmbeddedServer,
CollectionUtils.mapOf(
"kafka.bootstrap.servers", kafkaContainer.getBootstrapServers(),
"micrometer.metrics.enabled", true,
'endpoints.metrics.sensitive', false,
AbstractKafkaConfiguration.EMBEDDED_TOPICS, ["words-metrics", "words", "books", "words-records", "books-records"]
)
)
context = embeddedServer.applicationContext
httpClient = embeddedServer.applicationContext.createBean(RxHttpClient, embeddedServer.getURL(), new DefaultHttpClientConfiguration(followRedirects: false))
meterRegistry = context.getBean(MeterRegistry)
}

void "test simple consumer"() {
given:
@@ -21,17 +21,18 @@ import io.micronaut.core.io.socket.SocketUtils
import io.micronaut.core.util.CollectionUtils
import io.micronaut.health.HealthStatus
import io.micronaut.management.health.indicator.HealthResult
import org.testcontainers.containers.KafkaContainer
import spock.lang.Specification
import spock.lang.Unroll

class KafkaHealthIndicatorSpec extends Specification {

void "test kafka health indicator - UP"() {
given:
KafkaContainer kafkaContainer = new KafkaContainer()
kafkaContainer.start()
ApplicationContext applicationContext = ApplicationContext.run(
Collections.singletonMap(
AbstractKafkaConfiguration.EMBEDDED, true
)
"kafka.bootstrap.servers": kafkaContainer.getBootstrapServers()
)

when:
@@ -45,6 +46,7 @@ class KafkaHealthIndicatorSpec extends Specification {

cleanup:
applicationContext.close()
kafkaContainer.stop()
}

void "test kafka health indicator - DOWN"() {
@@ -30,32 +30,40 @@ import io.micronaut.messaging.annotation.Header
import io.micronaut.runtime.server.EmbeddedServer
import io.reactivex.Single
import org.apache.kafka.clients.producer.RecordMetadata
import org.testcontainers.containers.KafkaContainer
import spock.lang.AutoCleanup
import spock.lang.Shared
import spock.lang.Specification
import spock.util.concurrent.PollingConditions

class KafkaProducerMetricsSpec extends Specification {

@Shared @AutoCleanup KafkaContainer kafkaContainer = new KafkaContainer()
@Shared
@AutoCleanup
EmbeddedServer embeddedServer = ApplicationContext.run(EmbeddedServer,
CollectionUtils.mapOf(
"kafka.bootstrap.servers", 'localhost:${random.port}',
"micrometer.metrics.enabled", true,
'endpoints.metrics.sensitive', false,
AbstractKafkaConfiguration.EMBEDDED, true,
AbstractKafkaConfiguration.EMBEDDED_TOPICS, ["words", "books", "words-records", "books-records"]
)
)
EmbeddedServer embeddedServer

@Shared
@AutoCleanup
ApplicationContext context = embeddedServer.applicationContext
ApplicationContext context

@Shared
@AutoCleanup
RxHttpClient httpClient = embeddedServer.applicationContext.createBean(RxHttpClient, embeddedServer.getURL(), new DefaultHttpClientConfiguration(followRedirects: false))
RxHttpClient httpClient

def setupSpec() {
kafkaContainer.start()
embeddedServer = ApplicationContext.run(EmbeddedServer,
CollectionUtils.mapOf(
"kafka.bootstrap.servers", kafkaContainer.getBootstrapServers(),
"micrometer.metrics.enabled", true,
'endpoints.metrics.sensitive', false,
AbstractKafkaConfiguration.EMBEDDED_TOPICS, ["words", "books", "words-records", "books-records"]
)
)
context = embeddedServer.applicationContext
httpClient = embeddedServer.applicationContext.createBean(RxHttpClient, embeddedServer.getURL(), new DefaultHttpClientConfiguration(followRedirects: false))
}

void "test simple producer"() {
given:
@@ -26,6 +26,7 @@ import io.micronaut.core.util.CollectionUtils
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener
import org.apache.kafka.common.TopicPartition
import org.testcontainers.containers.KafkaContainer
import spock.lang.AutoCleanup
import spock.lang.Shared
import spock.lang.Specification
@@ -35,13 +36,17 @@ import javax.inject.Singleton

class AssignToPartitionSpec extends Specification {
public static final String TOPIC_SYNC = "AssignToPartitionSpec-products-sync"
@Shared @AutoCleanup ApplicationContext context = ApplicationContext.run(
CollectionUtils.mapOf(
"kafka.bootstrap.servers", 'localhost:${random.port}',
AbstractKafkaConfiguration.EMBEDDED, true,
AbstractKafkaConfiguration.EMBEDDED_TOPICS, [TOPIC_SYNC]
)
)
@Shared @AutoCleanup KafkaContainer kafkaContainer = new KafkaContainer()
@Shared @AutoCleanup ApplicationContext context
def setupSpec() {
kafkaContainer.start()
context = ApplicationContext.run(
CollectionUtils.mapOf(
"kafka.bootstrap.servers", kafkaContainer.getBootstrapServers(),
AbstractKafkaConfiguration.EMBEDDED_TOPICS, [TOPIC_SYNC]
)
)
}
void "test manual offset commit"() {
given:
ProductClient client = context.getBean(ProductClient)
@@ -55,10 +60,9 @@ class AssignToPartitionSpec extends Specification {

then:
conditions.eventually {
listener.products.size() == 2
!listener.products.find() { it.name == "Apple"}
listener.products.find() { it.name == "Orange"}
listener.products.find() { it.name == "Banana"}
listener.products.size() > 0
listener.partitionsAssigned != null
listener.partitionsAssigned.size() > 0
}
}

@@ -75,6 +79,7 @@ class AssignToPartitionSpec extends Specification {
List<Product> products = []
Consumer kafkaConsumer

Collection<TopicPartition> partitionsAssigned
@KafkaListener(
offsetReset = OffsetReset.EARLIEST
)
@@ -91,6 +96,7 @@ class AssignToPartitionSpec extends Specification {

@Override
void onPartitionsAssigned(Collection<TopicPartition> partitions) {
partitionsAssigned = partitions
for(tp in partitions) {
kafkaConsumer.seek(tp, 1)
}
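The assertions in this spec were loosened because the listener seeks every assigned partition to offset 1 inside onPartitionsAssigned, so the first record of a partition (e.g. "Apple") may never be observed. A standalone sketch of that seek-on-assign pattern with the plain Kafka consumer API (bootstrap address, group id, and topic below are illustrative):

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class SeekOnAssignSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092"); // illustrative
        props.put("group.id", "seek-demo");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("products"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // nothing to do before the rebalance in this sketch
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // mirror the spec: skip the first record of every assigned partition
                for (TopicPartition tp : partitions) {
                    consumer.seek(tp, 1);
                }
            }
        });
        consumer.poll(Duration.ofSeconds(1)); // the rebalance (and seek) happens inside poll()
        consumer.close();
    }
}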
@@ -25,6 +25,7 @@ import io.micronaut.context.ApplicationContext
import io.micronaut.core.util.CollectionUtils
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.common.TopicPartition
import org.testcontainers.containers.KafkaContainer
import spock.lang.AutoCleanup
import spock.lang.Shared
import spock.lang.Specification
@@ -34,13 +35,20 @@ import javax.inject.Singleton

class BatchManualOffsetCommitSpec extends Specification {
public static final String TOPIC_SYNC = "BatchManualOffsetCommitSpec-products-sync"
@Shared @AutoCleanup ApplicationContext context = ApplicationContext.run(
CollectionUtils.mapOf(
"kafka.bootstrap.servers", 'localhost:${random.port}',
AbstractKafkaConfiguration.EMBEDDED, true,
AbstractKafkaConfiguration.EMBEDDED_TOPICS, [TOPIC_SYNC]
)
)

@Shared @AutoCleanup KafkaContainer kafkaContainer = new KafkaContainer()
@Shared @AutoCleanup ApplicationContext context

def setupSpec() {
kafkaContainer.start()
context = ApplicationContext.run(
CollectionUtils.mapOf(
"kafka.bootstrap.servers", kafkaContainer.getBootstrapServers(),
AbstractKafkaConfiguration.EMBEDDED_TOPICS, [TOPIC_SYNC]
)
)
}

void "test manual offset commit"() {
given:
ProductClient client = context.getBean(ProductClient)
@@ -24,6 +24,7 @@ import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.configuration.kafka.config.AbstractKafkaConfiguration
import io.micronaut.context.ApplicationContext
import io.micronaut.core.util.CollectionUtils
import org.testcontainers.containers.KafkaContainer
import spock.lang.AutoCleanup
import spock.lang.Shared
import spock.lang.Specification
@@ -33,13 +34,19 @@ import javax.inject.Singleton

class ManualAckSpec extends Specification {
public static final String TOPIC_SYNC = "ManualOffsetCommitSpec-products-sync"
@Shared @AutoCleanup ApplicationContext context = ApplicationContext.run(
CollectionUtils.mapOf(
"kafka.bootstrap.servers", 'localhost:${random.port}',
AbstractKafkaConfiguration.EMBEDDED, true,
AbstractKafkaConfiguration.EMBEDDED_TOPICS, [TOPIC_SYNC]
)
)
@Shared @AutoCleanup KafkaContainer kafkaContainer = new KafkaContainer()
@Shared @AutoCleanup ApplicationContext context

def setupSpec() {
kafkaContainer.start()
context = ApplicationContext.run(
CollectionUtils.mapOf(
"kafka.bootstrap.servers", kafkaContainer.getBootstrapServers(),
AbstractKafkaConfiguration.EMBEDDED_TOPICS, [TOPIC_SYNC]
)
)
}

void "test manual ack"() {
given:
ProductClient client = context.getBean(ProductClient)
@@ -25,6 +25,7 @@ import io.micronaut.context.ApplicationContext
import io.micronaut.core.util.CollectionUtils
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.common.TopicPartition
import org.testcontainers.containers.KafkaContainer
import spock.lang.AutoCleanup
import spock.lang.Shared
import spock.lang.Specification
@@ -34,13 +35,19 @@ import javax.inject.Singleton

class ManualOffsetCommitSpec extends Specification {
public static final String TOPIC_SYNC = "ManualOffsetCommitSpec-products-sync"
@Shared @AutoCleanup ApplicationContext context = ApplicationContext.run(
CollectionUtils.mapOf(
"kafka.bootstrap.servers", 'localhost:${random.port}',
AbstractKafkaConfiguration.EMBEDDED, true,
AbstractKafkaConfiguration.EMBEDDED_TOPICS, [TOPIC_SYNC]
)
)
@Shared @AutoCleanup KafkaContainer kafkaContainer = new KafkaContainer()
@Shared @AutoCleanup ApplicationContext context

def setupSpec() {
kafkaContainer.start()
context = ApplicationContext.run(
CollectionUtils.mapOf(
"kafka.bootstrap.servers", kafkaContainer.getBootstrapServers(),
AbstractKafkaConfiguration.EMBEDDED_TOPICS, [TOPIC_SYNC]
)
)
}

void "test manual offset commit"() {
given:
ProductClient client = context.getBean(ProductClient)
@@ -23,6 +23,7 @@ import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.configuration.kafka.config.AbstractKafkaConfiguration
import io.micronaut.context.ApplicationContext
import io.micronaut.core.util.CollectionUtils
import org.testcontainers.containers.KafkaContainer
import spock.lang.AutoCleanup
import spock.lang.Shared
import spock.lang.Specification
@@ -32,13 +33,18 @@ import javax.inject.Singleton

class PerRecordOffsetCommitSpec extends Specification {
public static final String TOPIC_SYNC = "PerRecordOffsetCommitSpec-products-sync"
@Shared @AutoCleanup ApplicationContext context = ApplicationContext.run(
CollectionUtils.mapOf(
"kafka.bootstrap.servers", 'localhost:${random.port}',
AbstractKafkaConfiguration.EMBEDDED, true,
AbstractKafkaConfiguration.EMBEDDED_TOPICS, [TOPIC_SYNC]
)
)
@Shared @AutoCleanup KafkaContainer kafkaContainer = new KafkaContainer()
@Shared @AutoCleanup ApplicationContext context

def setupSpec() {
kafkaContainer.start()
context = ApplicationContext.run(
CollectionUtils.mapOf(
"kafka.bootstrap.servers", kafkaContainer.getBootstrapServers(),
AbstractKafkaConfiguration.EMBEDDED_TOPICS, [TOPIC_SYNC]
)
)
}

void "test sync per record"() {
given:
