[Schema Resolve] When the parameter of the listener method is an Object, JSON Schema cannot be used, and Byte Schema will be automatically used as a replacement. #773

Open
kaifahm opened this issue Aug 10, 2024 · 1 comment
Assignees
Labels
status: waiting-for-feedback We need additional information before we can continue

Comments

kaifahm commented Aug 10, 2024

My Code

Below is my code. I want to use Object to receive the message and use JSON Schema. However, in practice, I cannot use JSON Schema; instead, Byte Schema is used, even though I explicitly specified schemaType = SchemaType.JSON.

@PulsarListener(topics = "persistent://public/default/test-topic",
    subscriptionName = "test-subscription",
    subscriptionType = SubscriptionType.Shared,
    schemaType = SchemaType.JSON
)
public void listen(Object message) {
  System.out.println("Received message: " + message);
}

Problem Analysis

After reading the source code, the issue might be here:

If rawClass is Object.class, isContainerType will return true, leading to a ResolvableType with a null rawClass. This eventually causes an error in the SchemaResolver when parsing the Schema (essentially resulting in JSONSchema.of(null);), which leads to the use of the default ByteSchema.

My suggestion is: the isContainerType method should check whether rawClass is Object. If it is, return false directly; otherwise, continue checking isAssignableFrom.
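The suggested guard can be sketched with plain JDK types. This is an illustration under assumptions, not the framework's source: `Envelope` is a hypothetical stand-in for a message wrapper type, and both method names are made up for the example. It shows why `Object` passes an `isAssignableFrom`-based check (it is a supertype of everything) and how an early return changes that.

```java
import java.util.List;

public class ContainerTypeCheck {

    // Hypothetical stand-in for a message wrapper type (assumption, not a real API).
    public interface Envelope<T> {
        T getValue();
    }

    // Check in the shape the report describes: any supertype of List or Envelope
    // matches, and Object is a supertype of both.
    public static boolean isContainerTypeLoose(Class<?> rawClass) {
        return rawClass.isAssignableFrom(List.class)
                || rawClass.isAssignableFrom(Envelope.class);
    }

    // Suggested variant: Object is never treated as a container type.
    public static boolean isContainerTypeGuarded(Class<?> rawClass) {
        if (rawClass == Object.class) {
            return false;
        }
        return isContainerTypeLoose(rawClass);
    }

    public static void main(String[] args) {
        System.out.println(isContainerTypeLoose(Object.class));   // true
        System.out.println(isContainerTypeGuarded(Object.class)); // false
        System.out.println(isContainerTypeGuarded(List.class));   // true
        System.out.println(isContainerTypeGuarded(String.class)); // false
    }
}
```

With the guard in place, a parameter declared as `Object` would be treated as the message type itself rather than as a wrapper around an unknown element type.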

Expectations

  • The parameter of the listener method should be able to use Object and use JSON Schema.
  • If JSON Schema is explicitly specified in @PulsarListener or elsewhere, but SchemaResolver cannot resolve it correctly, an exception should be thrown instead of automatically using Byte Schema as a fallback.
  • If no Schema Type is explicitly specified and SchemaResolver fails to resolve it correctly, a warning-level log should be output instead of a debug-level log.
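The last two expectations amount to a fallback policy, which could look roughly like the sketch below. Everything here is hypothetical and JDK-only: `RequestedSchema`, `resolve`, and the string results are placeholders for illustration, and a `null` message type models the case where the type could not be resolved.

```java
import java.util.logging.Logger;

public class StrictSchemaFallback {

    private static final Logger LOG = Logger.getLogger(StrictSchemaFallback.class.getName());

    // Hypothetical stand-in for the schema type set on the listener; NONE means "not specified".
    public enum RequestedSchema { NONE, JSON }

    // Returns a label for the schema that would be used. A null messageType models
    // a message type that could not be resolved.
    public static String resolve(RequestedSchema requested, Class<?> messageType) {
        if (messageType != null) {
            String kind = (requested == RequestedSchema.JSON) ? "JSON" : "DEFAULT";
            return kind + "<" + messageType.getSimpleName() + ">";
        }
        if (requested != RequestedSchema.NONE) {
            // Explicit request that cannot be honored: fail instead of silently falling back.
            throw new IllegalStateException(
                    "Schema type " + requested + " was requested but the message type could not be resolved");
        }
        // Nothing was requested explicitly: fall back, but make the fallback visible at WARNING level.
        LOG.warning("Message type could not be resolved; falling back to the byte schema");
        return "BYTES";
    }

    public static void main(String[] args) {
        System.out.println(resolve(RequestedSchema.JSON, String.class)); // JSON<String>
        System.out.println(resolve(RequestedSchema.NONE, null));         // BYTES
    }
}
```

The design choice is that only an unspecified schema type may degrade to bytes; an explicit request either resolves or fails fast.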
@onobc onobc added the status: waiting-for-triage An issue we've not yet triaged label Aug 10, 2024
@onobc onobc self-assigned this Aug 10, 2024
Collaborator

onobc commented Aug 18, 2024

Hi @kaifahm ,

Thanks for the detailed report w/ your code, analysis, and expectations - well done.

Let's chat about the last 2 expectations, which ultimately ask for more strict behavior when a user explicitly sets "JSON" as the schema type but it can't be resolved.

"If JSON Schema is explicitly specified in @PulsarListener or elsewhere, but SchemaResolver cannot resolve it correctly, an exception should be thrown instead of automatically using Byte Schema as a fallback."

"If no Schema Type is explicitly specified and SchemaResolver fails to resolve it correctly, a warning-level log should be output instead of a debug-level log."

These seem reasonable, and I could also envision adding a configuration option that would drive this behavior a bit more as well (e.g. never return byte[] etc.). However, we need to dig in a bit more on our side to understand the original motivation for the current behavior.

On the expectation "The parameter of the listener method should be able to use Object and use JSON Schema.": in your example, what schema does the topic being consumed from have? Also, what underlying object type do you expect the incoming msg to be?

If you could provide an example that uses the Pulsar Java client directly (you can use the auto-configured PulsarClient in your current app) to produce and consume a message, that would be super helpful, as I don't see a way to create a consumer that will return a typed object without specifying the target JSON class. The best I think we can do in this case is a GenericRecord or a HashMap of fields.

I look forward to your reply and sorry for the delay in my original response.

@onobc onobc added status: waiting-for-feedback We need additional information before we can continue and removed status: waiting-for-triage An issue we've not yet triaged labels Aug 19, 2024