Important
Kafka for JUnit will not be updated regularly any longer. Unfortunately, I do not have the time to allocate my attention to this project in a sustainable fashion. Updates and bugfixes may still happen (support for new versions, ...), but if they do, it will be solely based on my personal requirements. Thank you for your understanding.
Kafka for JUnit enables developers to start and stop a complete Kafka cluster comprised of Kafka brokers and distributed Kafka Connect workers from within a JUnit test. It also provides a rich set of convenient accessors to interact with such an embedded Kafka cluster in a lean and non-obtrusive way.
Kafka for JUnit can be used to both whitebox-test individual Kafka-based components of your application or to blackbox-test applications that offer an incoming and/or outgoing Kafka-based interface.
Kafka for JUnit provides the necessary infrastructure to exercise your Kafka-based components against an embeddable Kafka cluster. However, Kafka for JUnit got you covered as well if you are simply interested in using the convenient accessors against Kafka clusters that are already present in your infrastructure. Checkout sections Working with an embedded Kafka cluster and Working with an external Kafka cluster in the user's guide for more information.
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static net.mguenther.kafka.junit.EmbeddedKafkaCluster.provisionWith;
import static net.mguenther.kafka.junit.EmbeddedKafkaClusterConfig.defaultClusterConfig;
class KafkaTest {
private EmbeddedKafkaCluster kafka;
@BeforeEach
void setupKafka() {
kafka = provisionWith(defaultClusterConfig());
kafka.start();
}
@AfterEach
void tearDownKafka() {
kafka.stop();
}
@Test
void shouldWaitForRecordsToBePublished() throws Exception {
kafka.send(to("test-topic", "a", "b", "c"));
kafka.observe(on("test-topic", 3));
}
}
This starts an embedded Kafka cluster and submits three records to the topic named test-topic
. The call to kafka.observe(on("test-topic", 3))
watches that same topic for a configurable amount of time and checks if it observes the previously submitted records. If it doesn't, Kafka for JUnit raises an AssertionError
which would fail the test. Surely, Kafka for JUnit provides lots of more ways to interact with a Kafka cluster.
Since EmbeddedKafkaCluster
implements the AutoCloseable
interface, you can achieve the same behavior using a try-with-resources
-construct.
import org.junit.jupiter.api.Test;
import static net.mguenther.kafka.junit.EmbeddedKafkaCluster.provisionWith;
import static net.mguenther.kafka.junit.EmbeddedKafkaClusterConfig.defaultClusterConfig;
class KafkaTest {
@Test
void shouldWaitForRecordsToBePublished() throws Exception {
try (EmbeddedKafkaCluster kafka = provisionWith(defaultClusterConfig())) {
kafka.start();
kafka.send(to("test-topic", "a", "b", "c"));
kafka.observe(on("test-topic", 3));
}
}
}
Version of Kafka for JUnit | Supports (up to) |
---|---|
3.6.0 | Apache Kafka 3.6.1 |
3.5.1 | Apache Kafka 3.5.1 |
3.4.0 | Apache Kafka 3.4.0 |
3.3.0 | Apache Kafka 3.3.1 |
3.2.2 | Apache Kafka 3.2.3 |
3.1.1 | Apache Kafka 3.1.0 |
3.0.1 | Apache Kafka 3.0.0 |
2.8.0 | Apache Kafka 2.8.0 |
2.7.0 | Apache Kafka 2.7.0 |
2.6.0 | Apache Kafka 2.6.0 |
2.5.1 | Apache Kafka 2.5.1 |
2.4.0 | Apache Kafka 2.4.0 |
2.3.0 | Apache Kafka 2.3.0 |
2.2.0 | Apache Kafka 2.2.1 |
2.1.1 | Apache Kafka 2.1.1 |
2.0.0 | Apache Kafka 2.0.0 |
1.0.0 | Apache Kafka 1.1.1 |
See the comprehensive user's guide for examples on how to interact with the Kafka cluster from within your JUnit test.
This work is released under the terms of the Apache 2.0 license.