From 3ba97288eb63d1ec9972b8736b6d460e5329975e Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Mon, 24 Feb 2020 12:24:58 +0800 Subject: [PATCH] [Reader] Should set either start message id or start message from roll back duration. (#6392) Currently, when constructing a reader, users can set both start message id and start time. This is strange and the behavior should be forbidden. (cherry picked from commit f862961cb84c0cc19dff29b8db5a54a6c578fbe4) --- .../apache/pulsar/client/impl/ReaderTest.java | 2 +- .../pulsar/client/impl/ReaderBuilderImpl.java | 10 ++++++-- .../pulsar/client/impl/BuildersTest.java | 25 +++++++++++++++++++ 3 files changed, 34 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java index 6418568a7d21d..e0a0fe28a78c1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java @@ -231,7 +231,7 @@ public void testReaderWithTimeLong() throws Exception { // (3) Create reader and set position 1 hour back so, it should only read messages which are 2 hours old which // published on step 2 - Reader reader = pulsarClient.newReader().topic(topic).startMessageId(MessageId.earliest) + Reader reader = pulsarClient.newReader().topic(topic) .startMessageFromRollbackDuration(2, TimeUnit.HOURS).create(); List receivedMessageIds = Lists.newArrayList(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java index c024a0ba7c7f6..921cf2dcbde1f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java @@ -78,9 +78,15 @@ public CompletableFuture> createAsync() { .failedFuture(new IllegalArgumentException("Topic name must be set on the reader builder")); } - if (conf.getStartMessageId() == null) { + if (conf.getStartMessageId() != null && conf.getStartMessageFromRollbackDurationInSec() > 0 || + conf.getStartMessageId() == null && conf.getStartMessageFromRollbackDurationInSec() <= 0) { return FutureUtil - .failedFuture(new IllegalArgumentException("Start message id must be set on the reader builder")); + .failedFuture(new IllegalArgumentException( + "Start message id or start message from roll back must be specified but they cannot be specified at the same time")); + } + + if (conf.getStartMessageFromRollbackDurationInSec() > 0) { + conf.setStartMessageId(MessageId.earliest); } return client.createReaderAsync(conf, schema); diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java index 2560d674a600f..4f7554e7bcbef 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java @@ -26,8 +26,12 @@ import java.lang.reflect.Field; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.TimeUnit; + import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.impl.conf.ReaderConfigurationData; import org.testng.annotations.Test; @@ -96,5 +100,26 @@ public void readerBuilderLoadConfTest() throws Exception { assertTrue(obj instanceof ReaderConfigurationData); assertEquals(((ReaderConfigurationData) obj).getTopicName(), topicName); assertEquals(((ReaderConfigurationData) obj).getStartMessageId(), messageId); + client.close(); + } + + @Test(expectedExceptions = {PulsarClientException.class}, expectedExceptionsMessageRegExp = ".* must be specified but they cannot be specified at the same time.*") + public void shouldNotSetTwoOptAtTheSameTime() throws Exception { + PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build(); + try (Reader reader = client.newReader().topic("abc").startMessageId(MessageId.earliest).startMessageFromRollbackDuration(10, TimeUnit.HOURS).create()) { + // no-op + } finally { + client.close(); + } + } + + @Test(expectedExceptions = {PulsarClientException.class}, expectedExceptionsMessageRegExp = ".* must be specified but they cannot be specified at the same time.*") + public void shouldSetOneStartOpt() throws Exception { + PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build(); + try (Reader reader = client.newReader().topic("abc").create()) { + // no-op + } finally { + client.close(); + } } }