diff --git a/functional_java_interop_test.go b/functional_java_interop_test.go index af82a808f..51df49b13 100644 --- a/functional_java_interop_test.go +++ b/functional_java_interop_test.go @@ -8,6 +8,7 @@ import ( "fmt" "os/exec" "strings" + "sync" "testing" "time" @@ -17,208 +18,268 @@ import ( const ( brokerContainer = "kafka-1" brokerAddr = "kafka-1:9091" + zookeeperAddr = "zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181" ) -// verifyMessages verifies that consumed messages match expected messages. -func verifyMessages(t *testing.T, expectedMessages, consumedMessages []string) { - if len(consumedMessages) != len(expectedMessages) { - t.Fatalf("Expected %d messages, got %d", len(expectedMessages), len(consumedMessages)) - } - - for i, expected := range expectedMessages { - if i >= len(consumedMessages) { - t.Fatalf("Missing message at index %d", i) - } - consumed := strings.TrimRight(consumedMessages[i], "\n\r") - if consumed != expected { - t.Errorf("Message %d mismatch:\n Expected: %q\n Got: %q", i+1, expected, consumed) - } - } +var compressionTests = []struct { + codec CompressionCodec + minKafka string +}{ + {CompressionNone, "0.8.0"}, + {CompressionGZIP, "0.8.0"}, + {CompressionSnappy, "0.8.0"}, + {CompressionLZ4, "0.10.0"}, + {CompressionZSTD, "2.1.0"}, } -// TestJavaProducerSnappyRoundTrip tests that messages produced by Kafka's Java -// console producer with snappy compression can be correctly consumed and decompressed -// by Sarama. This verifies compatibility between Java's snappy implementation and -// klauspost/compress/snappy/xerial. -func TestJavaProducerSnappyRoundTrip(t *testing.T) { - checkKafkaVersion(t, "2.0.0") - setupFunctionalTest(t) - defer teardownFunctionalTest(t) - - topic := "test.1" - - expectedMessages := []string{ - "Hello from Java producer with snappy compression!", - "This is message number two.", - "Testing snappy compression round-trip.", - "Message four: verifying decompression works correctly.", - "Final message: Java producer -> Sarama consumer.", - } - +func produceWithJava(t *testing.T, topic string, codec CompressionCodec, messages []string) { + t.Helper() producerPath := fmt.Sprintf("/opt/kafka-%s/bin/kafka-console-producer.sh", FunctionalTestEnv.KafkaVersion) - cmd := exec.Command("docker", "compose", "exec", "-T", brokerContainer, - producerPath, - "--bootstrap-server", brokerAddr, - "--topic", topic, - "--compression-codec", "snappy", + args := append( + []string{"compose", "exec", "-T", brokerContainer, producerPath}, + javaProducerArgs(topic, codec)..., ) + cmd := exec.Command("docker", args...) stdin, err := cmd.StdinPipe() - require.NoError(t, err, "Failed to get stdin pipe") + require.NoError(t, err) - err = cmd.Start() - require.NoError(t, err, "Failed to start producer command") + require.NoError(t, cmd.Start()) - for _, msg := range expectedMessages { + for _, msg := range messages { _, err := fmt.Fprintln(stdin, msg) if err != nil { stdin.Close() - _ = cmd.Wait() - require.NoError(t, err, "Failed to write message") + waitErr := cmd.Wait() + if waitErr != nil { + err = fmt.Errorf("failed to write message: %w; Java producer failed: %w", err, waitErr) + } } + require.NoError(t, err) } stdin.Close() - err = cmd.Wait() - require.NoError(t, err, "Producer command failed") - - t.Logf("Produced %d messages via Java console producer with snappy compression", len(expectedMessages)) + require.NoError(t, cmd.Wait(), "Java producer failed") +} +func consumeWithSarama(t *testing.T, topic string, startOffset int64, count int) []string { + t.Helper() config := NewFunctionalTestConfig() consumer, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, config) - require.NoError(t, err, "Failed to create consumer") + require.NoError(t, err) defer consumer.Close() - partitionConsumer, err := consumer.ConsumePartition(topic, 0, OffsetOldest) - require.NoError(t, err, "Failed to create partition consumer") + partitionConsumer, err := consumer.ConsumePartition(topic, 0, startOffset) + require.NoError(t, err) defer partitionConsumer.Close() - var consumedMessages []string + var messages []string ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - for i := 0; i < len(expectedMessages); i++ { + for i := 0; i < count; i++ { select { case msg := <-partitionConsumer.Messages(): - if msg == nil { - t.Fatal("Received nil message") - } - consumedMessages = append(consumedMessages, string(msg.Value)) - t.Logf("Consumed message %d: offset=%d, value=%s", i+1, msg.Offset, string(msg.Value)) - + require.NotNil(t, msg) + messages = append(messages, string(msg.Value)) case err := <-partitionConsumer.Errors(): - require.NoError(t, err, "Consumer error") - + require.NoError(t, err) case <-ctx.Done(): - t.Fatalf("Timeout waiting for message %d", i+1) + require.Fail(t, "timeout waiting for messages") } } - - verifyMessages(t, expectedMessages, consumedMessages) - t.Logf("Successfully verified round-trip: Java producer (snappy) -> Sarama consumer") + return messages } -// TestJavaConsumerSnappyRoundTrip tests that messages produced by Sarama -// with snappy compression can be correctly consumed and decompressed by Kafka's Java -// console consumer. This verifies compatibility between klauspost/compress/snappy/xerial -// and Java's snappy implementation. -func TestJavaConsumerSnappyRoundTrip(t *testing.T) { - checkKafkaVersion(t, "2.0.0") - setupFunctionalTest(t) - defer teardownFunctionalTest(t) - - topic := "test.1" - - expectedMessages := []string{ - "Hello from Sarama producer with snappy compression!", - "This is message number two from Sarama.", - "Testing snappy compression compatibility with Java consumer.", - "Message four: verifying Sarama -> Java round-trip works correctly.", - "Final message: Sarama producer -> Java consumer with snappy!", - } - - producerConfig := NewFunctionalTestConfig() - producerConfig.Producer.Compression = CompressionSnappy - producerConfig.Producer.Return.Successes = true - producerConfig.Producer.RequiredAcks = WaitForAll +func produceWithSarama(t *testing.T, topic string, codec CompressionCodec, messages []string) { + t.Helper() + config := NewFunctionalTestConfig() + config.Producer.Compression = codec + config.Producer.Return.Successes = true + config.Producer.RequiredAcks = WaitForAll - producer, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, producerConfig) - require.NoError(t, err, "Failed to create producer") + producer, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, config) + require.NoError(t, err) defer producer.Close() - for i, msgText := range expectedMessages { - message := &ProducerMessage{ + for _, msgText := range messages { + _, _, err := producer.SendMessage(&ProducerMessage{ Topic: topic, - Key: StringEncoder(fmt.Sprintf("key-%d", i+1)), Value: StringEncoder(msgText), - } - - partition, offset, err := producer.SendMessage(message) - require.NoError(t, err, "Failed to send message %d", i+1) - t.Logf("Produced message %d: partition=%d, offset=%d, value=%s", i+1, partition, offset, msgText) + }) + require.NoError(t, err) } +} - t.Logf("Produced %d messages via Sarama with snappy compression", len(expectedMessages)) - +func consumeWithJava(t *testing.T, topic string, startOffset int64, count int) []string { + t.Helper() consumerPath := fmt.Sprintf("/opt/kafka-%s/bin/kafka-console-consumer.sh", FunctionalTestEnv.KafkaVersion) - cmd := exec.Command("docker", "compose", "exec", "-T", brokerContainer, - consumerPath, - "--bootstrap-server", brokerAddr, - "--topic", topic, - "--from-beginning", - "--max-messages", fmt.Sprintf("%d", len(expectedMessages)), + args := append( + []string{"compose", "exec", "-T", brokerContainer, consumerPath}, + javaConsumerArgs(topic, startOffset, count)..., ) + cmd := exec.Command("docker", args...) stdout, err := cmd.StdoutPipe() - require.NoError(t, err, "Failed to get stdout pipe") + require.NoError(t, err) stderr, err := cmd.StderrPipe() - require.NoError(t, err, "Failed to get stderr pipe") + require.NoError(t, err) - err = cmd.Start() - require.NoError(t, err, "Failed to start consumer command") + require.NoError(t, cmd.Start()) - var consumedMessages []string + var messages []string scanner := bufio.NewScanner(stdout) ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - done := make(chan bool) + done := make(chan struct{}) + stdoutErrCh := make(chan error, 1) + stderrErrCh := make(chan error, 1) + var wg sync.WaitGroup + wg.Add(2) go func() { + defer wg.Done() for scanner.Scan() { - line := strings.TrimRight(scanner.Text(), "\n\r") - if line != "" { - consumedMessages = append(consumedMessages, line) - t.Logf("Consumed message %d: %s", len(consumedMessages), line) + if line := strings.TrimSpace(scanner.Text()); line != "" { + messages = append(messages, line) } - if len(consumedMessages) >= len(expectedMessages) { + if len(messages) >= count { break } } - done <- true + stdoutErrCh <- scanner.Err() + close(done) }() var stderrOutput strings.Builder go func() { - scanner := bufio.NewScanner(stderr) - for scanner.Scan() { - stderrOutput.WriteString(scanner.Text() + "\n") + defer wg.Done() + s := bufio.NewScanner(stderr) + for s.Scan() { + stderrOutput.WriteString(s.Text() + "\n") } + stderrErrCh <- s.Err() }() select { case <-done: case <-ctx.Done(): - t.Fatalf("Timeout waiting for messages to be consumed") + require.Fail(t, "timeout waiting for Java consumer") + _ = cmd.Process.Kill() } - if err := cmd.Wait(); err != nil { - if len(consumedMessages) < len(expectedMessages) { - t.Logf("Consumer stderr: %s", stderrOutput.String()) - t.Fatalf("Consumer command failed: %v (consumed %d/%d messages)", err, len(consumedMessages), len(expectedMessages)) - } + if err := cmd.Wait(); err != nil && len(messages) < count { + t.Logf("stderr: %s", stderrOutput.String()) + require.NoError(t, err, "Java consumer failed") + } + wg.Wait() + require.NoError(t, <-stdoutErrCh) + require.NoError(t, <-stderrErrCh) + return messages +} + +func endOffsetForPartition(t *testing.T, topic string, partition int32) int64 { + t.Helper() + config := NewFunctionalTestConfig() + client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config) + require.NoError(t, err) + defer safeClose(t, client) + + offset, err := client.GetOffset(topic, partition, OffsetNewest) + require.NoError(t, err) + return offset +} + +// TestJavaProducerCompressionRoundTrip tests that messages produced by Kafka's Java +// console producer with various compression codecs can be correctly consumed and +// decompressed by Sarama. +func TestJavaProducerCompressionRoundTrip(t *testing.T) { + setupFunctionalTest(t) + defer teardownFunctionalTest(t) + + for _, tc := range compressionTests { + t.Run(tc.codec.String(), func(t *testing.T) { + checkKafkaVersion(t, tc.minKafka) + + expected := []string{ + fmt.Sprintf("Message 1 with %s compression", tc.codec), + "Message 2", + "Message 3", + } + + initialOffset := endOffsetForPartition(t, "test.1", 0) + produceWithJava(t, "test.1", tc.codec, expected) + actual := consumeWithSarama(t, "test.1", initialOffset, len(expected)) + + require.Equal(t, expected, actual) + }) } +} - verifyMessages(t, expectedMessages, consumedMessages) - t.Logf("Successfully verified round-trip: Sarama producer (snappy) -> Java consumer") +// TestJavaConsumerCompressionRoundTrip tests that messages produced by Sarama +// with various compression codecs can be correctly consumed and decompressed +// by Kafka's Java console consumer. +func TestJavaConsumerCompressionRoundTrip(t *testing.T) { + setupFunctionalTest(t) + defer teardownFunctionalTest(t) + + for _, tc := range compressionTests { + t.Run(tc.codec.String(), func(t *testing.T) { + checkKafkaVersion(t, tc.minKafka) + + expected := []string{ + fmt.Sprintf("Message 1 with %s compression", tc.codec), + "Message 2", + "Message 3", + } + + initialOffset := endOffsetForPartition(t, "test.1", 0) + produceWithSarama(t, "test.1", tc.codec, expected) + actual := consumeWithJava(t, "test.1", initialOffset, len(expected)) + + require.Equal(t, expected, actual) + }) + } +} + +func kafkaVersionAtLeast(requiredVersion string) bool { + kafkaVersion := FunctionalTestEnv.KafkaVersion + if kafkaVersion == "" { + return false + } + return parseKafkaVersion(kafkaVersion).satisfies(parseKafkaVersion(requiredVersion)) +} + +func javaProducerArgs(topic string, codec CompressionCodec) []string { + args := make([]string, 0, 8) + if kafkaVersionAtLeast("2.0.0") { + args = append(args, "--bootstrap-server", brokerAddr) + } else { + args = append(args, "--broker-list", brokerAddr) + } + args = append(args, "--topic", topic) + return append(args, javaProducerCompressionArgs(codec)...) +} + +func javaProducerCompressionArgs(codec CompressionCodec) []string { + if kafkaVersionAtLeast("0.10.0") { + return []string{"--producer-property", fmt.Sprintf("compression.type=%s", codec.String())} + } + return []string{"--compression-codec", codec.String()} +} + +func javaConsumerArgs(topic string, startOffset int64, count int) []string { + args := make([]string, 0, 12) + if kafkaVersionAtLeast("0.10.0") { + args = append(args, "--bootstrap-server", brokerAddr) + } else { + args = append(args, "--zookeeper", zookeeperAddr) + } + return append(args, + "--topic", topic, + "--partition", "0", + "--offset", fmt.Sprint(startOffset), + "--max-messages", fmt.Sprint(count), + ) }