The ID in a message is always -1 when UseSchemaID in the serde.Serializer is set. #848

Closed
6 of 7 tasks
falau opened this issue Aug 25, 2022 · 1 comment
falau commented Aug 25, 2022

Description

From here:

type SerializerConfig struct {
	// AutoRegisterSchemas determines whether to automatically register schemas during serialization
	AutoRegisterSchemas bool
	// UseSchemaID specifies a schema ID to use during serialization
	UseSchemaID int
	// UseLatestVersion specifies whether to use the latest schema version during serialization
	UseLatestVersion bool
	// NormalizeSchemas determines whether to normalize schemas during serialization
	NormalizeSchemas bool
}

I expect UseSchemaID to tell the Serializer to use that ID regardless of what is in the Schema Registry.

When serde.Serialize() is invoked (I'm using the specific Avro serializer), the embedded BaseSerializer.GetID is used to get the schema ID of the message:

id, err := s.GetID(topic, avroMsg, info)

However, the UseSchemaID branch looks like the snippet below. No line in the method assigns either the query results or UseSchemaID to the id variable:

		}
	} else if useSchemaID >= 0 {
		info, err = s.Client.GetBySubjectAndID(subject, useSchemaID)
		if err != nil {
			return -1, err
		}
		_, err := s.Client.GetID(subject, info, false)
		if err != nil {
			return -1, err
		}
	} else if useLatest {
		metadata, err := s.Client.GetLatestSchemaMetadata(subject)

As a result, when UseSchemaID >= 0 the GetID method effectively reduces to the following and returns -1:

	var id = -1
	...
	if ... {
	} else if useSchemaID >= 0 {
		// id is never assigned in this branch
	}
	...
	return id, nil
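
The effect can be sketched in isolation. The example below is not the library's code: schemaLookup, fakeRegistry, buggyResolveID and fixedResolveID are hypothetical names invented for illustration, and the fix shown (assigning the configured ID once the lookup succeeds) is only one plausible way to get the behaviour I expect.

```go
package main

import (
	"errors"
	"fmt"
)

// schemaLookup is a hypothetical stand-in for the registry client.
// It is NOT the interface used by confluent-kafka-go.
type schemaLookup interface {
	// Exists reports whether the subject has a schema with the given ID.
	Exists(subject string, id int) (bool, error)
}

// fakeRegistry maps a subject to the set of schema IDs known for it.
type fakeRegistry map[string]map[int]bool

func (r fakeRegistry) Exists(subject string, id int) (bool, error) {
	return r[subject][id], nil
}

var errNotFound = errors.New("schema ID not found for subject")

// buggyResolveID mirrors the shape described in this issue: the branch
// validates the configured ID but never assigns it to id.
func buggyResolveID(c schemaLookup, subject string, useSchemaID int) (int, error) {
	id := -1
	if useSchemaID >= 0 {
		ok, err := c.Exists(subject, useSchemaID)
		if err != nil {
			return -1, err
		}
		if !ok {
			return -1, errNotFound
		}
		// id is left untouched here, so -1 is returned.
	}
	return id, nil
}

// fixedResolveID assigns the configured ID after a successful lookup.
// This is an assumed fix, not necessarily the one the maintainers chose.
func fixedResolveID(c schemaLookup, subject string, useSchemaID int) (int, error) {
	id := -1
	if useSchemaID >= 0 {
		ok, err := c.Exists(subject, useSchemaID)
		if err != nil {
			return -1, err
		}
		if !ok {
			return -1, errNotFound
		}
		id = useSchemaID
	}
	return id, nil
}

func main() {
	reg := fakeRegistry{"my-topic-value": {1234: true}}
	bad, _ := buggyResolveID(reg, "my-topic-value", 1234)
	good, _ := fixedResolveID(reg, "my-topic-value", 1234)
	fmt.Println(bad, good) // prints: -1 1234
}
```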

How to reproduce

With the specific Avro serializer example, the issue can be reproduced by setting UseSchemaID to any value >= 0:

	url := "https://myschemaregistry.example.com"
	client, err := schemaregistry.NewClient(schemaregistry.NewConfig(url))

	if err != nil {
		fmt.Printf("Failed to create schema registry client: %s\n", err)
		os.Exit(1)
	}
	serdeCfg := avro.NewSerializerConfig()
	serdeCfg.AutoRegisterSchemas = false
	serdeCfg.UseLatestVersion = false
	serdeCfg.UseSchemaID = 1234
	ser, err := avro.NewSpecificSerializer(client, serde.ValueSerde, serdeCfg)

	if err != nil {
		fmt.Printf("Failed to create serializer: %s\n", err)
		os.Exit(1)
	}
	var value MyStruct
	payload, err := ser.Serialize(topic, &value)
	// the first 5 bytes of payload are the magic byte 0 followed by the schema ID, which is -1 instead of 1234
	...
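
To check the header of a serialized payload, something like the following can be used. It is a minimal sketch that assumes the standard Confluent framing (one magic byte 0, then a 4-byte big-endian schema ID); schemaIDFromPayload is a hypothetical helper, not part of the library, and the byte slices in main are hand-written samples rather than real serializer output.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// schemaIDFromPayload reads the schema ID from a framed payload:
// byte 0 is the magic byte (0), bytes 1..4 are the big-endian schema ID.
func schemaIDFromPayload(payload []byte) (int32, error) {
	if len(payload) < 5 {
		return 0, errors.New("payload shorter than the 5-byte header")
	}
	if payload[0] != 0 {
		return 0, fmt.Errorf("unexpected magic byte %d", payload[0])
	}
	// Convert through int32 so that 0xFFFFFFFF is reported as -1.
	return int32(binary.BigEndian.Uint32(payload[1:5])), nil
}

func main() {
	// Hand-written sample headers (assumptions, not captured output).
	broken := []byte{0, 0xFF, 0xFF, 0xFF, 0xFF}   // schema ID -1
	expected := []byte{0, 0x00, 0x00, 0x04, 0xD2} // schema ID 1234

	got, _ := schemaIDFromPayload(broken)
	want, _ := schemaIDFromPayload(expected)
	fmt.Println(got, want) // prints: -1 1234
}
```

Passing the payload returned by ser.Serialize to this helper should show -1 with the behaviour described above, and 1234 once the configured ID is actually used.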

Checklist

Please provide the following information:

  • confluent-kafka-go and librdkafka version (LibraryVersion()): v1.9.2 / v1.9.2
  • Apache Kafka broker version: irrelevant
  • Client configuration: ConfigMap{...}: irrelevant
  • Operating system: irrelevant
  • Provide client logs (with "debug": ".." as necessary): irrelevant
  • Provide broker log excerpts: irrelevant
  • Critical issue