Support for tombstones (null value in messages) does not work #7407

Open
fmiguelez opened this issue Jul 1, 2020 · 11 comments · Fixed by #7408
Labels
help wanted lifecycle/stale type/bug The PR fixed a bug or issue reported a bug

Comments

@fmiguelez
Contributor

Describe the bug

The fix provided by #7139 for bug #4803 does not work.

  • When trying to read a message with a null value, a NullPointerException is thrown in another part of the code.
        msg = consumer.receive(timeoutMillis, TimeUnit.MILLISECONDS);

	java.lang.NullPointerException
		at org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl.updateNumMsgsReceived(ConsumerStatsRecorderImpl.java:169)
		at org.apache.pulsar.client.impl.ConsumerImpl.messageProcessed(ConsumerImpl.java:1423)
		at org.apache.pulsar.client.impl.ConsumerImpl.internalReceive(ConsumerImpl.java:431)
		at org.apache.pulsar.client.impl.ConsumerBase.receive(ConsumerBase.java:175)
                ...
  • It should not be necessary to explicitly set a null value on the producer (key-only messages should work just fine). With implicit null values, the exception thrown is an EOFException (the same as before the fix was provided).
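
A minimal sketch of the kind of guard that avoids the NullPointerException in the first bullet, assuming the failure comes from reading the length of a null payload while receive statistics are updated. The names `ReceiveStats` and `record` are hypothetical; this is not Pulsar's actual `ConsumerStatsRecorderImpl`.

```java
import java.util.concurrent.atomic.LongAdder;

// Hypothetical stand-in for a consumer statistics recorder; not Pulsar's real class.
public class ReceiveStats {
    private final LongAdder numMsgsReceived = new LongAdder();
    private final LongAdder numBytesReceived = new LongAdder();

    // Records one received message. A tombstone has a null payload,
    // so it counts as a message that contributes zero bytes.
    public void record(byte[] payload) {
        numMsgsReceived.increment();
        numBytesReceived.add(payload == null ? 0 : payload.length);
    }

    public long messages() { return numMsgsReceived.sum(); }

    public long bytes() { return numBytesReceived.sum(); }

    public static void main(String[] args) {
        ReceiveStats stats = new ReceiveStats();
        stats.record(new byte[] {1, 2, 3}); // normal message
        stats.record(null);                 // tombstone: must not throw
        System.out.println(stats.messages() + " messages, " + stats.bytes() + " bytes");
    }
}
```

The point of the sketch is only that a tombstone should still be counted as a received message, while its size is treated as zero instead of being dereferenced.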

To Reproduce
I have created a test project to reproduce these issues (null values set implicitly and explicitly, with both schema and schemaless consumers): pulsar-tombstone-test

See its README.md for the steps to reproduce.

Expected behavior
Tombstones (null values in messages with or without a schema, but with a key) should be supported whether or not you specify a schema, and whether the null value is set explicitly or left implicit. All tests should pass in the example project.

Producer<byte[]> schemalessProducer = client.newProducer(Schema.BYTES).topic(TOPIC).messageRoutingMode(MessageRoutingMode.SinglePartition).create();

Consumer<byte[]> schemalessConsumer = client.newConsumer(Schema.BYTES).topic(TOPIC).subscriptionName("test")
				.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();

// Implicit tombstone without schema (only the key is set)
schemalessProducer.newMessage().key("1").send();

// Explicit tombstone (the one that is supposed to work)
schemalessProducer.newMessage().key("2").value(null).send();

// Keep the whole Message so that both key and value can be printed
Message<byte[]> implicitTombstone = schemalessConsumer.receive(2, TimeUnit.SECONDS);
Message<byte[]> explicitTombstone = schemalessConsumer.receive(2, TimeUnit.SECONDS);

System.out.println(String.format("Implicit tombstone: {key=%s, value=%s}", implicitTombstone.getKey(), implicitTombstone.getValue()));
System.out.println(String.format("Explicit tombstone: {key=%s, value=%s}", explicitTombstone.getKey(), explicitTombstone.getValue()));

// Same scenario with an Avro schema
Producer<DummyObject> schemaProducer = client.newProducer(Schema.AVRO(DummyObject.class)).topic(TOPIC).messageRoutingMode(MessageRoutingMode.SinglePartition).create();

Consumer<DummyObject> schemaConsumer = client.newConsumer(Schema.AVRO(DummyObject.class)).topic(TOPIC).subscriptionName("test")
				.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();

// Implicit tombstone with schema (only the key is set)
schemaProducer.newMessage().key("1").send();

// Explicit tombstone (the one that is supposed to work)
schemaProducer.newMessage().key("2").value(null).send();

// Keep the whole Message so that both key and value can be printed
Message<DummyObject> implicitTombstone = schemaConsumer.receive(2, TimeUnit.SECONDS);
Message<DummyObject> explicitTombstone = schemaConsumer.receive(2, TimeUnit.SECONDS);

System.out.println(String.format("Implicit tombstone: {key=%s, value=%s}", implicitTombstone.getKey(), implicitTombstone.getValue()));
System.out.println(String.format("Explicit tombstone: {key=%s, value=%s}", explicitTombstone.getKey(), explicitTombstone.getValue()));

Desktop:

  • Windows 10 with Docker Desktop to run Pulsar containers

@fmiguelez fmiguelez added the type/bug The PR fixed a bug or issue reported a bug label Jul 1, 2020
fmiguelez added a commit to fmiguelez/pulsar that referenced this issue Jul 1, 2020
Added check to prevent NPE when a tombstone (null value) is produced.

Documented in apache#7407
@jiazhai
Member

jiazhai commented Jul 2, 2020

Thanks @fmiguelez for opening the issue and the related PR. @gaoran10, please help review the PR.

sijie pushed a commit that referenced this issue Jul 6, 2020
Added check to prevent NPE when a tombstone (null value) is produced.

Fixes #7407
@fmiguelez
Contributor Author

I do not think that this ticket should be closed, as the pull request only solves one of the cases described by the tests.

@sijie sijie reopened this Jul 7, 2020
@sijie
Member

sijie commented Jul 7, 2020

It was closed automatically because of the "Fixes" keyword in the PR description. I have reopened it.

codelipenghui pushed a commit to streamnative/pulsar-archived that referenced this issue Jul 14, 2020
Added check to prevent NPE when a tombstone (null value) is produced.

Fixes apache#7407

(cherry picked from commit 90c2f4a)
wolfstudy pushed a commit that referenced this issue Jul 29, 2020
Added check to prevent NPE when a tombstone (null value) is produced.

Fixes #7407

(cherry picked from commit 90c2f4a)
huangdx0726 pushed a commit to huangdx0726/pulsar that referenced this issue Aug 24, 2020
Added check to prevent NPE when a tombstone (null value) is produced.

Fixes apache#7407
@gaoran10
Contributor

gaoran10 commented Sep 24, 2020

@fmiguelez Sorry for the late response.

  1. producer.newMessage().key("1").send()

Currently, if we send a message as above, the message payload is ByteBuffer.allocate(0). If the topic has an Avro schema, the decoder cannot decode an empty byte array, which causes the java.io.EOFException below.

org.apache.pulsar.client.api.SchemaSerializationException: java.io.EOFException

	at org.apache.pulsar.client.impl.schema.reader.AvroReader.read(AvroReader.java:77)
	at org.apache.pulsar.client.api.schema.SchemaReader.read(SchemaReader.java:36)
	at org.apache.pulsar.client.impl.schema.StructSchema.decode(StructSchema.java:107)
	at org.apache.pulsar.client.impl.MessageImpl.getValue(MessageImpl.java:301)
	at org.apache.pulsar.broker.transaction.TransactionProduceTest.test(TransactionProduceTest.java:639)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:124)
	at org.testng.internal.Invoker.invokeMethod(Invoker.java:583)
	at org.testng.internal.Invoker.invokeTestMethod(Invoker.java:719)
	at org.testng.internal.Invoker.invokeTestMethods(Invoker.java:989)
	at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:125)
	at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:109)
	at org.testng.TestRunner.privateRun(TestRunner.java:648)
	at org.testng.TestRunner.run(TestRunner.java:505)
	at org.testng.SuiteRunner.runTest(SuiteRunner.java:455)
	at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:450)
	at org.testng.SuiteRunner.privateRun(SuiteRunner.java:415)
	at org.testng.SuiteRunner.run(SuiteRunner.java:364)
	at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52)
	at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:84)
	at org.testng.TestNG.runSuitesSequentially(TestNG.java:1208)
	at org.testng.TestNG.runSuitesLocally(TestNG.java:1137)
	at org.testng.TestNG.runSuites(TestNG.java:1049)
	at org.testng.TestNG.run(TestNG.java:1017)
	at com.intellij.rt.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:66)
	at com.intellij.rt.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:110)
Caused by: java.io.EOFException
	at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:509)
	at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:149)
	at org.apache.avro.io.BinaryDecoder.readIndex(BinaryDecoder.java:460)
	at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:282)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:178)
	at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:290)
	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237)
	at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
	at org.apache.pulsar.client.impl.schema.reader.AvroReader.read(AvroReader.java:75)
	... 28 more

  2. producer.newMessage().key("1").value(null).send()

If we specify that the value is null, the message metadata records this through msgMetadataBuilder.setNullValue(true). The consumer client checks the nullValue flag before the schema decoder decodes the payload and, if the flag is true, returns a null value directly.

  3. Currently, the message value has three forms: a normal value, an empty byte array, and a null value. If a topic uses an Avro schema, the consumer cannot handle an empty byte array payload; maybe the consumer should return a null value for an empty byte array.
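
The three payload forms listed above could be handled on the consumer side roughly as follows. This is a sketch only: `TombstoneAwareDecoder`, `decode`, and the `nullValueFlag` parameter are hypothetical names standing in for the metadata flag and the schema decoder described in this comment, and treating an empty payload as null is the proposal from the last item, not confirmed Pulsar behavior.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical helper; it only illustrates the order of the checks.
public class TombstoneAwareDecoder {

    // Returns null for explicit tombstones (flag set) and for implicit
    // tombstones (empty payload); otherwise delegates to the schema decoder.
    public static <T> T decode(boolean nullValueFlag, byte[] payload,
                               Function<byte[], T> schemaDecoder) {
        if (nullValueFlag) {
            return null; // explicit tombstone: the producer called value(null)
        }
        if (payload == null || payload.length == 0) {
            return null; // implicit tombstone: only the key was set
        }
        return schemaDecoder.apply(payload); // normal value
    }

    public static void main(String[] args) {
        // A UTF-8 string decoder stands in for an Avro decoder here.
        Function<byte[], String> utf8 = bytes -> new String(bytes, StandardCharsets.UTF_8);

        System.out.println(decode(true, new byte[0], utf8));
        System.out.println(decode(false, new byte[0], utf8));
        System.out.println(decode(false, "hello".getBytes(StandardCharsets.UTF_8), utf8));
    }
}
```

One caveat with this design: for schemas where an empty byte array is a legitimate encoded value (for example raw bytes or an empty string), mapping it to null would lose information, so a rule like this would probably have to be limited to struct schemas such as Avro.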

@fmiguelez
Contributor Author

@sijie, @gaoran10 Any update on this?

@gaoran10
Contributor

@fmiguelez Is anything still blocking you on this issue? Could you provide more details?

@longtengz

longtengz commented Dec 6, 2021

I think Pulsar SQL can't deal with empty-payload messages because of the same issue described here. See my issue #13127.

@fmiguelez
Contributor Author

@gaoran10
I have taken another look at the current status of this issue and upgraded pulsar-tombstone-test to use Pulsar 2.8.4. The current status is:

  • ✔ Explicit tombstones (message value set to null on publication) are consumed without error
  • ❌ Implicit tombstones (only key is set on publication) throw an EOFException on consumption
  • ❌ Functions/sinks fail to receive a null as input on consumption throwing an exception instead (with both explicit and implicit tombstones)

The third use case is not covered by the provided sample test.

@gaoran10
Contributor

gaoran10 commented Feb 9, 2023

@fmiguelez Sorry for the late response, thanks for your tests.

❌ Implicit tombstones (only key is set on publication) throw an EOFException on consumption

Currently, if we don't specify a value when producing a message, the payload is an empty byte array, which causes an EOFException when consumers try to deserialize it. I'll handle this problem.

❌ Functions/sinks fail to receive a null as input on consumption throwing an exception instead (with both explicit and implicit tombstones)

Could you provide the error logs? Could this be caused by the PulsarRecord#getValue method?

@dao-jun
Member

dao-jun commented May 19, 2024

This issue looks to be fixed by #9046; closing.

@dao-jun dao-jun closed this as completed May 19, 2024
@longtengz

@dao-jun Please reopen this issue. Others clearly stated in their comments (posted after the PR you linked was merged) that some cases are still not fixed.

@dao-jun dao-jun reopened this Jun 24, 2024