Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions compress.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (
"fmt"
"sync"

snappy "github.com/eapache/go-xerial-snappy"
"github.com/klauspost/compress/gzip"
snappy "github.com/klauspost/compress/snappy/xerial"
"github.com/pierrec/lz4/v4"
)

Expand Down Expand Up @@ -116,7 +116,7 @@ func compress(cc CompressionCodec, level int, data []byte) ([]byte, error) {
case CompressionGZIP:
return gzipCompress(level, data)
case CompressionSnappy:
return snappy.Encode(data), nil
return snappy.Encode(nil, data), nil
case CompressionLZ4:
return lz4Compress(data)
case CompressionZSTD:
Expand Down
2 changes: 1 addition & 1 deletion decompress.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (
"fmt"
"sync"

snappy "github.com/eapache/go-xerial-snappy"
"github.com/klauspost/compress/gzip"
snappy "github.com/klauspost/compress/snappy/xerial"
"github.com/pierrec/lz4/v4"
)

Expand Down
224 changes: 224 additions & 0 deletions functional_java_interop_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
//go:build functional

package sarama

import (
"bufio"
"context"
"fmt"
"os/exec"
"strings"
"testing"
"time"

"github.com/stretchr/testify/require"
)

const (
brokerContainer = "kafka-1"
brokerAddr = "kafka-1:9091"
)

// verifyMessages verifies that consumed messages match expected messages.
func verifyMessages(t *testing.T, expectedMessages, consumedMessages []string) {
if len(consumedMessages) != len(expectedMessages) {
t.Fatalf("Expected %d messages, got %d", len(expectedMessages), len(consumedMessages))
}

for i, expected := range expectedMessages {
if i >= len(consumedMessages) {
t.Fatalf("Missing message at index %d", i)
}
consumed := strings.TrimRight(consumedMessages[i], "\n\r")
if consumed != expected {
t.Errorf("Message %d mismatch:\n Expected: %q\n Got: %q", i+1, expected, consumed)
}
}
}

// TestJavaProducerSnappyRoundTrip tests that messages produced by Kafka's Java
// console producer with snappy compression can be correctly consumed and decompressed
// by Sarama. This verifies compatibility between Java's snappy implementation and
// klauspost/compress/snappy/xerial.
func TestJavaProducerSnappyRoundTrip(t *testing.T) {
checkKafkaVersion(t, "2.0.0")
setupFunctionalTest(t)
defer teardownFunctionalTest(t)

topic := "test.1"

expectedMessages := []string{
"Hello from Java producer with snappy compression!",
"This is message number two.",
"Testing snappy compression round-trip.",
"Message four: verifying decompression works correctly.",
"Final message: Java producer -> Sarama consumer.",
}

producerPath := fmt.Sprintf("/opt/kafka-%s/bin/kafka-console-producer.sh", FunctionalTestEnv.KafkaVersion)
cmd := exec.Command("docker", "compose", "exec", "-T", brokerContainer,
producerPath,
"--bootstrap-server", brokerAddr,
"--topic", topic,
"--compression-codec", "snappy",
)

stdin, err := cmd.StdinPipe()
require.NoError(t, err, "Failed to get stdin pipe")

err = cmd.Start()
require.NoError(t, err, "Failed to start producer command")

for _, msg := range expectedMessages {
_, err := fmt.Fprintln(stdin, msg)
if err != nil {
stdin.Close()
_ = cmd.Wait()
require.NoError(t, err, "Failed to write message")
}
}
stdin.Close()

err = cmd.Wait()
require.NoError(t, err, "Producer command failed")

t.Logf("Produced %d messages via Java console producer with snappy compression", len(expectedMessages))

config := NewFunctionalTestConfig()
consumer, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, config)
require.NoError(t, err, "Failed to create consumer")
defer consumer.Close()

partitionConsumer, err := consumer.ConsumePartition(topic, 0, OffsetOldest)
require.NoError(t, err, "Failed to create partition consumer")
defer partitionConsumer.Close()

var consumedMessages []string
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

for i := 0; i < len(expectedMessages); i++ {
select {
case msg := <-partitionConsumer.Messages():
if msg == nil {
t.Fatal("Received nil message")
}
consumedMessages = append(consumedMessages, string(msg.Value))
t.Logf("Consumed message %d: offset=%d, value=%s", i+1, msg.Offset, string(msg.Value))

case err := <-partitionConsumer.Errors():
require.NoError(t, err, "Consumer error")

case <-ctx.Done():
t.Fatalf("Timeout waiting for message %d", i+1)
}
}

verifyMessages(t, expectedMessages, consumedMessages)
t.Logf("Successfully verified round-trip: Java producer (snappy) -> Sarama consumer")
}

// TestJavaConsumerSnappyRoundTrip tests that messages produced by Sarama
// with snappy compression can be correctly consumed and decompressed by Kafka's Java
// console consumer. This verifies compatibility between klauspost/compress/snappy/xerial
// and Java's snappy implementation.
func TestJavaConsumerSnappyRoundTrip(t *testing.T) {
checkKafkaVersion(t, "2.0.0")
setupFunctionalTest(t)
defer teardownFunctionalTest(t)

topic := "test.1"

expectedMessages := []string{
"Hello from Sarama producer with snappy compression!",
"This is message number two from Sarama.",
"Testing snappy compression compatibility with Java consumer.",
"Message four: verifying Sarama -> Java round-trip works correctly.",
"Final message: Sarama producer -> Java consumer with snappy!",
}

producerConfig := NewFunctionalTestConfig()
producerConfig.Producer.Compression = CompressionSnappy
producerConfig.Producer.Return.Successes = true
producerConfig.Producer.RequiredAcks = WaitForAll

producer, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, producerConfig)
require.NoError(t, err, "Failed to create producer")
defer producer.Close()

for i, msgText := range expectedMessages {
message := &ProducerMessage{
Topic: topic,
Key: StringEncoder(fmt.Sprintf("key-%d", i+1)),
Value: StringEncoder(msgText),
}

partition, offset, err := producer.SendMessage(message)
require.NoError(t, err, "Failed to send message %d", i+1)
t.Logf("Produced message %d: partition=%d, offset=%d, value=%s", i+1, partition, offset, msgText)
}

t.Logf("Produced %d messages via Sarama with snappy compression", len(expectedMessages))

consumerPath := fmt.Sprintf("/opt/kafka-%s/bin/kafka-console-consumer.sh", FunctionalTestEnv.KafkaVersion)
cmd := exec.Command("docker", "compose", "exec", "-T", brokerContainer,
consumerPath,
"--bootstrap-server", brokerAddr,
"--topic", topic,
"--from-beginning",
"--max-messages", fmt.Sprintf("%d", len(expectedMessages)),
)

stdout, err := cmd.StdoutPipe()
require.NoError(t, err, "Failed to get stdout pipe")

stderr, err := cmd.StderrPipe()
require.NoError(t, err, "Failed to get stderr pipe")

err = cmd.Start()
require.NoError(t, err, "Failed to start consumer command")

var consumedMessages []string
scanner := bufio.NewScanner(stdout)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

done := make(chan bool)
go func() {
for scanner.Scan() {
line := strings.TrimRight(scanner.Text(), "\n\r")
if line != "" {
consumedMessages = append(consumedMessages, line)
t.Logf("Consumed message %d: %s", len(consumedMessages), line)
}
if len(consumedMessages) >= len(expectedMessages) {
break
}
}
done <- true
}()

var stderrOutput strings.Builder
go func() {
scanner := bufio.NewScanner(stderr)
for scanner.Scan() {
stderrOutput.WriteString(scanner.Text() + "\n")
}
}()

select {
case <-done:
case <-ctx.Done():
t.Fatalf("Timeout waiting for messages to be consumed")
}

if err := cmd.Wait(); err != nil {
if len(consumedMessages) < len(expectedMessages) {
t.Logf("Consumer stderr: %s", stderrOutput.String())
t.Fatalf("Consumer command failed: %v (consumed %d/%d messages)", err, len(consumedMessages), len(expectedMessages))
}
}

verifyMessages(t, expectedMessages, consumedMessages)
t.Logf("Successfully verified round-trip: Sarama producer (snappy) -> Java consumer")
}
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ go 1.24.0
require (
github.com/davecgh/go-spew v1.1.1
github.com/eapache/go-resiliency v1.7.0
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3
github.com/eapache/queue v1.1.0
github.com/fortytw2/leaktest v1.3.0
github.com/jcmturner/gofork v1.7.6
Expand All @@ -19,7 +18,6 @@ require (
)

require (
github.com/golang/snappy v1.0.0 // indirect
github.com/hashicorp/go-uuid v1.0.3 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
Expand Down
4 changes: 0 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,10 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/eapache/go-resiliency v1.7.0 h1:n3NRTnBn5N0Cbi/IeOHuQn9s2UwVUH7Ga0ZWcP+9JTA=
github.com/eapache/go-resiliency v1.7.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho=
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws=
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0=
github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/golang/snappy v1.0.0 h1:Oy607GVXHs7RtbggtPBnr2RmDArIsAefDwvrdWvRhGs=
github.com/golang/snappy v1.0.0/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4=
github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM=
github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
Expand Down
13 changes: 9 additions & 4 deletions record_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,10 @@ func recordBatchTestCases() []struct {
},
encoded: []byte{
0, 0, 0, 0, 0, 0, 0, 0, // First Offset
0, 0, 0, 72, // Length
0, 0, 0, 92, // Length
0, 0, 0, 0, // Partition Leader Epoch
2, // Version
21, 0, 159, 97, // CRC
2, // Version
133, 100, 178, 36, // CRC
0, 2, // Attributes
0, 0, 0, 0, // Last Offset Delta
0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
Expand All @@ -191,7 +191,12 @@ func recordBatchTestCases() []struct {
0, 0, // Producer Epoch
0, 0, 0, 0, // First Sequence
0, 0, 0, 1, // Number of Records
21, 80, 40, 0, 10, 0, 8, 1, 2, 3, 4, 6, 5, 6, 7, 2, 6, 8, 9, 10, 4, 11, 12,
// Xerial framing header + compressed data
130, 83, 78, 65, 80, 80, 89, 0, // SNAPPY magic
0, 0, 0, 1, // min version
0, 0, 0, 1, // default version
0, 0, 0, 23, // compressed length
21, 80, 40, 0, 10, 0, 8, 1, 2, 3, 4, 6, 5, 6, 7, 2, 6, 8, 9, 10, 4, 11, 12, // compressed data
},
},
{
Expand Down
Loading