From 27348ee15d3c6859e6c84f833dda65077d56fe64 Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Fri, 21 Feb 2020 16:31:22 +0800 Subject: [PATCH 1/2] one of reader start setting --- .../java/org/apache/pulsar/client/impl/ReaderTest.java | 2 +- .../apache/pulsar/client/impl/ReaderBuilderImpl.java | 10 ++++++++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java index e61089b81fd21..ba05147ac9f2f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java @@ -242,7 +242,7 @@ public void testReaderWithTimeLong() throws Exception { // (3) Create reader and set position 1 hour back so, it should only read messages which are 2 hours old which // published on step 2 - Reader reader = pulsarClient.newReader().topic(topic).startMessageId(MessageId.earliest) + Reader reader = pulsarClient.newReader().topic(topic) .startMessageFromRollbackDuration(2, TimeUnit.HOURS).create(); List receivedMessageIds = Lists.newArrayList(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java index 7901320cd427e..1c62a472254e4 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java @@ -82,9 +82,15 @@ public CompletableFuture> createAsync() { .failedFuture(new IllegalArgumentException("Topic name must be set on the reader builder")); } - if (conf.getStartMessageId() == null) { + if (conf.getStartMessageId() != null && conf.getStartMessageFromRollbackDurationInSec() > 0 || + conf.getStartMessageId() == null && conf.getStartMessageFromRollbackDurationInSec() <= 0) { return FutureUtil - .failedFuture(new IllegalArgumentException("Start message id must be set on the reader builder")); + .failedFuture(new IllegalArgumentException( + "Start message id or start message from roll back must be specified but they cannot be specified at the same time")); + } + + if (conf.getStartMessageFromRollbackDurationInSec() > 0) { + conf.setStartMessageId(MessageId.earliest); } return client.createReaderAsync(conf, schema); From a1cf29222d2c65a575c737f4e44daeac2745994b Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Sat, 22 Feb 2020 19:18:01 +0800 Subject: [PATCH 2/2] add test --- .../pulsar/client/impl/BuildersTest.java | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java index 2560d674a600f..4f7554e7bcbef 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java @@ -26,8 +26,12 @@ import java.lang.reflect.Field; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.TimeUnit; + import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.impl.conf.ReaderConfigurationData; import org.testng.annotations.Test; @@ -96,5 +100,26 @@ public void readerBuilderLoadConfTest() throws Exception { assertTrue(obj instanceof ReaderConfigurationData); assertEquals(((ReaderConfigurationData) obj).getTopicName(), topicName); assertEquals(((ReaderConfigurationData) obj).getStartMessageId(), messageId); + client.close(); + } + + @Test(expectedExceptions = {PulsarClientException.class}, expectedExceptionsMessageRegExp = ".* must be specified but they cannot be specified at the same time.*") + public void shouldNotSetTwoOptAtTheSameTime() throws Exception { + PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build(); + try (Reader reader = client.newReader().topic("abc").startMessageId(MessageId.earliest).startMessageFromRollbackDuration(10, TimeUnit.HOURS).create()) { + // no-op + } finally { + client.close(); + } + } + + @Test(expectedExceptions = {PulsarClientException.class}, expectedExceptionsMessageRegExp = ".* must be specified but they cannot be specified at the same time.*") + public void shouldSetOneStartOpt() throws Exception { + PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build(); + try (Reader reader = client.newReader().topic("abc").create()) { + // no-op + } finally { + client.close(); + } } }