Skip to content

Commit

Permalink
Fix Deserialize (avro_specific) and improve tests for specific/generic (
Browse files Browse the repository at this point in the history
#849)

1. Fixes avro_specific Deserialize method which did not return the
   right thing (it did not return anything useable).
2. Update tests for (1) - add Deserialize to tests which were only
   testing DeserializeInto
3. Add Deserialize to avro_generic tests as well.
  • Loading branch information
milindl authored Aug 29, 2022
1 parent 2bb8bdf commit 50d6a03
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 7 deletions.
36 changes: 33 additions & 3 deletions schemaregistry/serde/avro/avro_generic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,31 @@
package avro

import (
"errors"
"testing"

"github.com/confluentinc/confluent-kafka-go/schemaregistry"
"github.com/confluentinc/confluent-kafka-go/schemaregistry/serde"
"github.com/confluentinc/confluent-kafka-go/schemaregistry/test"
)

func testMessageFactoryGeneric(subject string, name string) (interface{}, error) {
if subject != "topic1-value" {
return nil, errors.New("Message factory only handles topic1.")
}

switch name {
case "GenericDemoSchema":
return &GenericDemoSchema{}, nil
case "GenericLinkedList":
return &GenericLinkedList{}, nil
case "GenericNestedTestRecord":
return &GenericNestedTestRecord{}, nil
}

return nil, errors.New("Schema not found.")
}

func TestGenericAvroSerdeWithSimple(t *testing.T) {
serde.MaybeFail = serde.InitFailFunc(t)
var err error
Expand All @@ -47,10 +65,14 @@ func TestGenericAvroSerdeWithSimple(t *testing.T) {
deser, err := NewGenericDeserializer(client, serde.ValueSerde, NewDeserializerConfig())
serde.MaybeFail("Deserializer configuration", err)
deser.Client = ser.Client
deser.MessageFactory = testMessageFactoryGeneric

var newobj GenericDemoSchema
err = deser.DeserializeInto("topic1", bytes, &newobj)
serde.MaybeFail("deserialization", err, serde.Expect(newobj, obj))
serde.MaybeFail("deserialization into", err, serde.Expect(newobj, obj))

msg, err := deser.Deserialize("topic1", bytes)
serde.MaybeFail("deserialization", err, serde.Expect(msg, &obj))
}

func TestGenericAvroSerdeWithNested(t *testing.T) {
Expand Down Expand Up @@ -80,10 +102,14 @@ func TestGenericAvroSerdeWithNested(t *testing.T) {
deser, err := NewGenericDeserializer(client, serde.ValueSerde, NewDeserializerConfig())
serde.MaybeFail("Deserializer configuration", err)
deser.Client = ser.Client
deser.MessageFactory = testMessageFactoryGeneric

var newobj GenericNestedTestRecord
err = deser.DeserializeInto("topic1", bytes, &newobj)
serde.MaybeFail("deserialization", err, serde.Expect(newobj, obj))
serde.MaybeFail("deserialization into", err, serde.Expect(newobj, obj))

msg, err := deser.Deserialize("topic1", bytes)
serde.MaybeFail("deserialization", err, serde.Expect(msg, &obj))
}

func TestGenericAvroSerdeWithCycle(t *testing.T) {
Expand Down Expand Up @@ -111,10 +137,14 @@ func TestGenericAvroSerdeWithCycle(t *testing.T) {
deser, err := NewGenericDeserializer(client, serde.ValueSerde, NewDeserializerConfig())
serde.MaybeFail("Deserializer configuration", err)
deser.Client = ser.Client
deser.MessageFactory = testMessageFactoryGeneric

var newobj GenericLinkedList
err = deser.DeserializeInto("topic1", bytes, &newobj)
serde.MaybeFail("deserialization", err, serde.Expect(newobj, obj))
serde.MaybeFail("deserialization into", err, serde.Expect(newobj, obj))

msg, err := deser.Deserialize("topic1", bytes)
serde.MaybeFail("deserialization", err, serde.Expect(msg, &obj))
}

type GenericDemoSchema struct {
Expand Down
6 changes: 5 additions & 1 deletion schemaregistry/serde/avro/avro_specific.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,11 @@ func (s *SpecificDeserializer) Deserialize(topic string, payload []byte) (interf
return nil, err
}
r := bytes.NewReader(payload[5:])
return vm.Eval(r, deser, avroMsg), nil

if err = vm.Eval(r, deser, avroMsg); err != nil {
return nil, err
}
return avroMsg, nil
}

// DeserializeInto implements deserialization of specific Avro data to the given object
Expand Down
36 changes: 33 additions & 3 deletions schemaregistry/serde/avro/avro_specific_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,31 @@
package avro

import (
"errors"
"testing"

"github.com/confluentinc/confluent-kafka-go/schemaregistry"
"github.com/confluentinc/confluent-kafka-go/schemaregistry/serde"
"github.com/confluentinc/confluent-kafka-go/schemaregistry/test"
)

func testMessageFactorySpecific(subject string, name string) (interface{}, error) {
if subject != "topic1-value" {
return nil, errors.New("Message factory only handles topic1.")
}

switch name {
case "DemoSchema":
return &test.DemoSchema{}, nil
case "NestedTestRecord":
return &test.NestedTestRecord{}, nil
case "RecursiveUnionTestRecord":
return &test.RecursiveUnionTestRecord{}, nil
}

return nil, errors.New("Schema not found.")
}

func TestSpecificAvroSerdeWithSimple(t *testing.T) {
serde.MaybeFail = serde.InitFailFunc(t)
var err error
Expand All @@ -47,10 +65,14 @@ func TestSpecificAvroSerdeWithSimple(t *testing.T) {
deser, err := NewSpecificDeserializer(client, serde.ValueSerde, NewDeserializerConfig())
serde.MaybeFail("Deserializer configuration", err)
deser.Client = ser.Client
deser.MessageFactory = testMessageFactorySpecific

var newobj test.DemoSchema
err = deser.DeserializeInto("topic1", bytes, &newobj)
serde.MaybeFail("deserialization", err, serde.Expect(newobj, obj))
serde.MaybeFail("deserialization into", err, serde.Expect(newobj, obj))

msg, err := deser.Deserialize("topic1", bytes)
serde.MaybeFail("deserialization", err, serde.Expect(msg, &obj))
}

func TestSpecificAvroSerdeWithNested(t *testing.T) {
Expand Down Expand Up @@ -85,10 +107,14 @@ func TestSpecificAvroSerdeWithNested(t *testing.T) {
deser, err := NewSpecificDeserializer(client, serde.ValueSerde, NewDeserializerConfig())
serde.MaybeFail("Deserializer configuration", err)
deser.Client = ser.Client
deser.MessageFactory = testMessageFactorySpecific

var newobj test.NestedTestRecord
err = deser.DeserializeInto("topic1", bytes, &newobj)
serde.MaybeFail("deserialization", err, serde.Expect(newobj, obj))
serde.MaybeFail("deserialization into", err, serde.Expect(newobj, obj))

msg, err := deser.Deserialize("topic1", bytes)
serde.MaybeFail("deserialization", err, serde.Expect(msg, &obj))
}

func TestSpecificAvroSerdeWithCycle(t *testing.T) {
Expand Down Expand Up @@ -118,8 +144,12 @@ func TestSpecificAvroSerdeWithCycle(t *testing.T) {
deser, err := NewSpecificDeserializer(client, serde.ValueSerde, NewDeserializerConfig())
serde.MaybeFail("Deserializer configuration", err)
deser.Client = ser.Client
deser.MessageFactory = testMessageFactorySpecific

var newobj test.RecursiveUnionTestRecord
err = deser.DeserializeInto("topic1", bytes, &newobj)
serde.MaybeFail("deserialization", err, serde.Expect(newobj, obj))
serde.MaybeFail("deserialization into", err, serde.Expect(newobj, obj))

msg, err := deser.Deserialize("topic1", bytes)
serde.MaybeFail("deserialization", err, serde.Expect(msg, &obj))
}

0 comments on commit 50d6a03

Please sign in to comment.