diff --git a/compress.go b/compress.go index b752cb8a0..72dc0eea8 100644 --- a/compress.go +++ b/compress.go @@ -5,8 +5,8 @@ import ( "fmt" "sync" - snappy "github.com/eapache/go-xerial-snappy" "github.com/klauspost/compress/gzip" + snappy "github.com/klauspost/compress/snappy/xerial" "github.com/pierrec/lz4/v4" ) @@ -116,7 +116,7 @@ func compress(cc CompressionCodec, level int, data []byte) ([]byte, error) { case CompressionGZIP: return gzipCompress(level, data) case CompressionSnappy: - return snappy.Encode(data), nil + return snappy.Encode(nil, data), nil case CompressionLZ4: return lz4Compress(data) case CompressionZSTD: diff --git a/decompress.go b/decompress.go index 0a0998329..f98299687 100644 --- a/decompress.go +++ b/decompress.go @@ -5,8 +5,8 @@ import ( "fmt" "sync" - snappy "github.com/eapache/go-xerial-snappy" "github.com/klauspost/compress/gzip" + snappy "github.com/klauspost/compress/snappy/xerial" "github.com/pierrec/lz4/v4" ) diff --git a/functional_java_interop_test.go b/functional_java_interop_test.go new file mode 100644 index 000000000..af82a808f --- /dev/null +++ b/functional_java_interop_test.go @@ -0,0 +1,224 @@ +//go:build functional + +package sarama + +import ( + "bufio" + "context" + "fmt" + "os/exec" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +const ( + brokerContainer = "kafka-1" + brokerAddr = "kafka-1:9091" +) + +// verifyMessages verifies that consumed messages match expected messages. +func verifyMessages(t *testing.T, expectedMessages, consumedMessages []string) { + if len(consumedMessages) != len(expectedMessages) { + t.Fatalf("Expected %d messages, got %d", len(expectedMessages), len(consumedMessages)) + } + + for i, expected := range expectedMessages { + if i >= len(consumedMessages) { + t.Fatalf("Missing message at index %d", i) + } + consumed := strings.TrimRight(consumedMessages[i], "\n\r") + if consumed != expected { + t.Errorf("Message %d mismatch:\n Expected: %q\n Got: %q", i+1, expected, consumed) + } + } +} + +// TestJavaProducerSnappyRoundTrip tests that messages produced by Kafka's Java +// console producer with snappy compression can be correctly consumed and decompressed +// by Sarama. This verifies compatibility between Java's snappy implementation and +// klauspost/compress/snappy/xerial. +func TestJavaProducerSnappyRoundTrip(t *testing.T) { + checkKafkaVersion(t, "2.0.0") + setupFunctionalTest(t) + defer teardownFunctionalTest(t) + + topic := "test.1" + + expectedMessages := []string{ + "Hello from Java producer with snappy compression!", + "This is message number two.", + "Testing snappy compression round-trip.", + "Message four: verifying decompression works correctly.", + "Final message: Java producer -> Sarama consumer.", + } + + producerPath := fmt.Sprintf("/opt/kafka-%s/bin/kafka-console-producer.sh", FunctionalTestEnv.KafkaVersion) + cmd := exec.Command("docker", "compose", "exec", "-T", brokerContainer, + producerPath, + "--bootstrap-server", brokerAddr, + "--topic", topic, + "--compression-codec", "snappy", + ) + + stdin, err := cmd.StdinPipe() + require.NoError(t, err, "Failed to get stdin pipe") + + err = cmd.Start() + require.NoError(t, err, "Failed to start producer command") + + for _, msg := range expectedMessages { + _, err := fmt.Fprintln(stdin, msg) + if err != nil { + stdin.Close() + _ = cmd.Wait() + require.NoError(t, err, "Failed to write message") + } + } + stdin.Close() + + err = cmd.Wait() + require.NoError(t, err, "Producer command failed") + + t.Logf("Produced %d messages via Java console producer with snappy compression", len(expectedMessages)) + + config := NewFunctionalTestConfig() + consumer, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, config) + require.NoError(t, err, "Failed to create consumer") + defer consumer.Close() + + partitionConsumer, err := consumer.ConsumePartition(topic, 0, OffsetOldest) + require.NoError(t, err, "Failed to create partition consumer") + defer partitionConsumer.Close() + + var consumedMessages []string + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + for i := 0; i < len(expectedMessages); i++ { + select { + case msg := <-partitionConsumer.Messages(): + if msg == nil { + t.Fatal("Received nil message") + } + consumedMessages = append(consumedMessages, string(msg.Value)) + t.Logf("Consumed message %d: offset=%d, value=%s", i+1, msg.Offset, string(msg.Value)) + + case err := <-partitionConsumer.Errors(): + require.NoError(t, err, "Consumer error") + + case <-ctx.Done(): + t.Fatalf("Timeout waiting for message %d", i+1) + } + } + + verifyMessages(t, expectedMessages, consumedMessages) + t.Logf("Successfully verified round-trip: Java producer (snappy) -> Sarama consumer") +} + +// TestJavaConsumerSnappyRoundTrip tests that messages produced by Sarama +// with snappy compression can be correctly consumed and decompressed by Kafka's Java +// console consumer. This verifies compatibility between klauspost/compress/snappy/xerial +// and Java's snappy implementation. +func TestJavaConsumerSnappyRoundTrip(t *testing.T) { + checkKafkaVersion(t, "2.0.0") + setupFunctionalTest(t) + defer teardownFunctionalTest(t) + + topic := "test.1" + + expectedMessages := []string{ + "Hello from Sarama producer with snappy compression!", + "This is message number two from Sarama.", + "Testing snappy compression compatibility with Java consumer.", + "Message four: verifying Sarama -> Java round-trip works correctly.", + "Final message: Sarama producer -> Java consumer with snappy!", + } + + producerConfig := NewFunctionalTestConfig() + producerConfig.Producer.Compression = CompressionSnappy + producerConfig.Producer.Return.Successes = true + producerConfig.Producer.RequiredAcks = WaitForAll + + producer, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, producerConfig) + require.NoError(t, err, "Failed to create producer") + defer producer.Close() + + for i, msgText := range expectedMessages { + message := &ProducerMessage{ + Topic: topic, + Key: StringEncoder(fmt.Sprintf("key-%d", i+1)), + Value: StringEncoder(msgText), + } + + partition, offset, err := producer.SendMessage(message) + require.NoError(t, err, "Failed to send message %d", i+1) + t.Logf("Produced message %d: partition=%d, offset=%d, value=%s", i+1, partition, offset, msgText) + } + + t.Logf("Produced %d messages via Sarama with snappy compression", len(expectedMessages)) + + consumerPath := fmt.Sprintf("/opt/kafka-%s/bin/kafka-console-consumer.sh", FunctionalTestEnv.KafkaVersion) + cmd := exec.Command("docker", "compose", "exec", "-T", brokerContainer, + consumerPath, + "--bootstrap-server", brokerAddr, + "--topic", topic, + "--from-beginning", + "--max-messages", fmt.Sprintf("%d", len(expectedMessages)), + ) + + stdout, err := cmd.StdoutPipe() + require.NoError(t, err, "Failed to get stdout pipe") + + stderr, err := cmd.StderrPipe() + require.NoError(t, err, "Failed to get stderr pipe") + + err = cmd.Start() + require.NoError(t, err, "Failed to start consumer command") + + var consumedMessages []string + scanner := bufio.NewScanner(stdout) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + done := make(chan bool) + go func() { + for scanner.Scan() { + line := strings.TrimRight(scanner.Text(), "\n\r") + if line != "" { + consumedMessages = append(consumedMessages, line) + t.Logf("Consumed message %d: %s", len(consumedMessages), line) + } + if len(consumedMessages) >= len(expectedMessages) { + break + } + } + done <- true + }() + + var stderrOutput strings.Builder + go func() { + scanner := bufio.NewScanner(stderr) + for scanner.Scan() { + stderrOutput.WriteString(scanner.Text() + "\n") + } + }() + + select { + case <-done: + case <-ctx.Done(): + t.Fatalf("Timeout waiting for messages to be consumed") + } + + if err := cmd.Wait(); err != nil { + if len(consumedMessages) < len(expectedMessages) { + t.Logf("Consumer stderr: %s", stderrOutput.String()) + t.Fatalf("Consumer command failed: %v (consumed %d/%d messages)", err, len(consumedMessages), len(expectedMessages)) + } + } + + verifyMessages(t, expectedMessages, consumedMessages) + t.Logf("Successfully verified round-trip: Sarama producer (snappy) -> Java consumer") +} diff --git a/go.mod b/go.mod index 710a38c6c..2986e3ae7 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,6 @@ go 1.24.0 require ( github.com/davecgh/go-spew v1.1.1 github.com/eapache/go-resiliency v1.7.0 - github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 github.com/eapache/queue v1.1.0 github.com/fortytw2/leaktest v1.3.0 github.com/jcmturner/gofork v1.7.6 @@ -19,7 +18,6 @@ require ( ) require ( - github.com/golang/snappy v1.0.0 // indirect github.com/hashicorp/go-uuid v1.0.3 // indirect github.com/jcmturner/aescts/v2 v2.0.0 // indirect github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect diff --git a/go.sum b/go.sum index 8f526947d..f91d83d3a 100644 --- a/go.sum +++ b/go.sum @@ -4,14 +4,10 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/eapache/go-resiliency v1.7.0 h1:n3NRTnBn5N0Cbi/IeOHuQn9s2UwVUH7Ga0ZWcP+9JTA= github.com/eapache/go-resiliency v1.7.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= -github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws= -github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0= github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= -github.com/golang/snappy v1.0.0 h1:Oy607GVXHs7RtbggtPBnr2RmDArIsAefDwvrdWvRhGs= -github.com/golang/snappy v1.0.0/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= diff --git a/record_test.go b/record_test.go index 02c594d1e..40ce9ad2f 100644 --- a/record_test.go +++ b/record_test.go @@ -179,10 +179,10 @@ func recordBatchTestCases() []struct { }, encoded: []byte{ 0, 0, 0, 0, 0, 0, 0, 0, // First Offset - 0, 0, 0, 72, // Length + 0, 0, 0, 92, // Length 0, 0, 0, 0, // Partition Leader Epoch - 2, // Version - 21, 0, 159, 97, // CRC + 2, // Version + 133, 100, 178, 36, // CRC 0, 2, // Attributes 0, 0, 0, 0, // Last Offset Delta 0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp @@ -191,7 +191,12 @@ func recordBatchTestCases() []struct { 0, 0, // Producer Epoch 0, 0, 0, 0, // First Sequence 0, 0, 0, 1, // Number of Records - 21, 80, 40, 0, 10, 0, 8, 1, 2, 3, 4, 6, 5, 6, 7, 2, 6, 8, 9, 10, 4, 11, 12, + // Xerial framing header + compressed data + 130, 83, 78, 65, 80, 80, 89, 0, // SNAPPY magic + 0, 0, 0, 1, // min version + 0, 0, 0, 1, // default version + 0, 0, 0, 23, // compressed length + 21, 80, 40, 0, 10, 0, 8, 1, 2, 3, 4, 6, 5, 6, 7, 2, 6, 8, 9, 10, 4, 11, 12, // compressed data }, }, {