From 3c9047bcd33dec47bda1ddf75779b838acec0315 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 15 Aug 2024 11:57:52 +0800 Subject: [PATCH] [fix][broker] Fix shadow topics cannot be consumed when the entry is not cached (#23147) For shadow topics, a `ReadOnlyLedgerHandle` is created to read messages from the source topic when the entry is not cached. However, it leverages the `readAsync` API that validates the `lastAddConfirmed` field (LAC). In `ReadOnlyLedgerHandle`, this field could never be updated, so `readAsync` could fail immediately. See `LedgerHandle#readAsync`: ```java if (lastEntry > lastAddConfirmed) { LOG.error("ReadAsync exception on ledgerId:{} firstEntry:{} lastEntry:{} lastAddConfirmed:{}", ledgerId, firstEntry, lastEntry, lastAddConfirmed); return FutureUtils.exception(new BKReadException()); } ``` This bug is not exposed because: 1. `PulsarMockReadHandle` does not maintain a LAC field. 2. The case for cache miss is never tested. Replace `readAsync` with `readUnconfirmedAsync` and compare the entry range with the `ManagedLedger#getLastConfirmedEntry`. The managed ledger already maintains a `lastConfirmedEntry` to limit the last entry. See `ManagedLedgerImpl#internalReadFromLedger`: ```java Position lastPosition = lastConfirmedEntry; if (ledger.getId() == lastPosition.getLedgerId()) { lastEntryInLedger = lastPosition.getEntryId(); ``` Add `ShadowTopicRealBkTest` to cover two code changes `RangeEntryCacheImpl#readFromStorage` and `EntryCache#asyncReadEntry`. Exceptionally, compare the entry range with the LAC of a ledger handle when it does not exist in the managed ledger. It's because `ReadOnlyManagedLedgerImpl` could read a ledger in another managed ledger. - [ ] `doc` - [ ] `doc-required` - [x] `doc-not-needed` - [ ] `doc-complete` PR in forked repository: https://github.com/BewareMyPower/pulsar/pull/33 (cherry picked from commit 15b88d250818bada5c1a94f5c54ef7806f88a500) (cherry picked from commit 14b3672c08838d006e32008bec8ed52d889b4859) --- .../mledger/impl/ManagedLedgerImpl.java | 2 + .../impl/cache/EntryCacheDisabled.java | 4 +- .../impl/cache/RangeEntryCacheImpl.java | 4 +- .../mledger/impl/cache/ReadEntryUtils.java | 54 +++++ .../mledger/impl/EntryCacheManagerTest.java | 7 +- .../mledger/impl/EntryCacheTest.java | 187 +++++++----------- .../mledger/impl/OffloadPrefixReadTest.java | 2 +- .../persistent/ShadowTopicRealBkTest.java | 109 ++++++++++ 8 files changed, 252 insertions(+), 117 deletions(-) create mode 100644 managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicRealBkTest.java diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 1ffa275c2ce642..a540828000e1c1 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -4100,6 +4100,8 @@ public static ManagedLedgerException createManagedLedgerException(int bkErrorCod public static ManagedLedgerException createManagedLedgerException(Throwable t) { if (t instanceof org.apache.bookkeeper.client.api.BKException) { return createManagedLedgerException(((org.apache.bookkeeper.client.api.BKException) t).getCode()); + } else if (t instanceof ManagedLedgerException) { + return (ManagedLedgerException) t; } else if (t instanceof CompletionException && !(t.getCause() instanceof CompletionException) /* check to avoid stackoverlflow */) { return createManagedLedgerException(t.getCause()); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java index d1050e0062826a..64595dfe47e66e 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java @@ -79,7 +79,7 @@ public void invalidateEntriesBeforeTimestamp(long timestamp) { @Override public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader, final AsyncCallbacks.ReadEntriesCallback callback, Object ctx) { - lh.readAsync(firstEntry, lastEntry).thenAcceptAsync( + ReadEntryUtils.readAsync(ml, lh, firstEntry, lastEntry).thenAcceptAsync( ledgerEntries -> { List entries = new ArrayList<>(); long totalSize = 0; @@ -107,7 +107,7 @@ public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boole @Override public void asyncReadEntry(ReadHandle lh, PositionImpl position, AsyncCallbacks.ReadEntryCallback callback, Object ctx) { - lh.readAsync(position.getEntryId(), position.getEntryId()).whenCompleteAsync( + ReadEntryUtils.readAsync(ml, lh, position.getEntryId(), position.getEntryId()).whenCompleteAsync( (ledgerEntries, exception) -> { if (exception != null) { ml.invalidateLedgerHandle(lh); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java index 27aec6f178e398..21eb62e5a8caa4 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java @@ -248,7 +248,7 @@ private void asyncReadEntry0(ReadHandle lh, PositionImpl position, final ReadEnt manager.mlFactoryMBean.recordCacheHit(cachedEntry.getLength()); callback.readEntryComplete(cachedEntry, ctx); } else { - lh.readAsync(position.getEntryId(), position.getEntryId()).thenAcceptAsync( + ReadEntryUtils.readAsync(ml, lh, position.getEntryId(), position.getEntryId()).thenAcceptAsync( ledgerEntries -> { try { Iterator iterator = ledgerEntries.iterator(); @@ -428,7 +428,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { CompletableFuture> readFromStorage(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry) { final int entriesToRead = (int) (lastEntry - firstEntry) + 1; - CompletableFuture> readResult = lh.readAsync(firstEntry, lastEntry) + CompletableFuture> readResult = ReadEntryUtils.readAsync(ml, lh, firstEntry, lastEntry) .thenApply( ledgerEntries -> { requireNonNull(ml.getName()); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java new file mode 100644 index 00000000000000..5cf5f053f0ce7f --- /dev/null +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.impl.cache; + +import java.util.concurrent.CompletableFuture; +import org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.bookkeeper.client.api.ReadHandle; +import org.apache.bookkeeper.mledger.ManagedLedger; +import org.apache.bookkeeper.mledger.ManagedLedgerException; + +class ReadEntryUtils { + + static CompletableFuture readAsync(ManagedLedger ml, ReadHandle handle, long firstEntry, + long lastEntry) { + if (ml.getOptionalLedgerInfo(handle.getId()).isEmpty()) { + // The read handle comes from another managed ledger, in this case, we can only compare the entry range with + // the LAC of that read handle. Specifically, it happens when this method is called by a + // ReadOnlyManagedLedgerImpl object. + return handle.readAsync(firstEntry, lastEntry); + } + // Compare the entry range with the lastConfirmedEntry maintained by the managed ledger because the entry cache + // of `ShadowManagedLedgerImpl` reads entries via `ReadOnlyLedgerHandle`, which never updates `lastAddConfirmed` + final var lastConfirmedEntry = ml.getLastConfirmedEntry(); + if (lastConfirmedEntry == null) { + return CompletableFuture.failedFuture(new ManagedLedgerException( + "LastConfirmedEntry is null when reading ledger " + handle.getId())); + } + if (handle.getId() > lastConfirmedEntry.getLedgerId()) { + return CompletableFuture.failedFuture(new ManagedLedgerException("LastConfirmedEntry is " + + lastConfirmedEntry + " when reading ledger " + handle.getId())); + } + if (handle.getId() == lastConfirmedEntry.getLedgerId() && lastEntry > lastConfirmedEntry.getEntryId()) { + return CompletableFuture.failedFuture(new ManagedLedgerException("LastConfirmedEntry is " + + lastConfirmedEntry + " when reading entry " + lastEntry)); + } + return handle.readUnconfirmedAsync(firstEntry, lastEntry); + } +} diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java index 1ab3198498ac36..de8c6f5d7d0759 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java @@ -28,6 +28,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -43,6 +44,7 @@ import org.apache.bookkeeper.mledger.impl.cache.EntryCache; import org.apache.bookkeeper.mledger.impl.cache.EntryCacheDisabled; import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager; +import org.apache.bookkeeper.mledger.proto.MLDataFormats; import org.apache.bookkeeper.test.MockedBookKeeperTestCase; import org.testng.Assert; import org.testng.annotations.Test; @@ -390,6 +392,9 @@ void entryCacheDisabledAsyncReadEntry() throws Exception { EntryCache entryCache = cacheManager.getEntryCache(ml1); final CountDownLatch counter = new CountDownLatch(1); + when(ml1.getLastConfirmedEntry()).thenReturn(new PositionImpl(1L, 1L)); + when(ml1.getOptionalLedgerInfo(lh.getId())).thenReturn(Optional.of(mock( + MLDataFormats.ManagedLedgerInfo.LedgerInfo.class))); entryCache.asyncReadEntry(lh, new PositionImpl(1L,1L), new AsyncCallbacks.ReadEntryCallback() { public void readEntryComplete(Entry entry, Object ctx) { Assert.assertNotEquals(entry, null); @@ -404,7 +409,7 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) { }, null); counter.await(); - verify(lh).readAsync(anyLong(), anyLong()); + verify(lh).readUnconfirmedAsync(anyLong(), anyLong()); } } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java index c8338798f271bd..f6f395f7773108 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java @@ -25,14 +25,15 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; - +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; import io.netty.buffer.Unpooled; - import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CountDownLatch; - +import java.util.concurrent.ExecutionException; +import java.util.function.Consumer; import lombok.Cleanup; import org.apache.bookkeeper.client.BKException.BKNoSuchLedgerExistsException; import org.apache.bookkeeper.client.api.LedgerEntries; @@ -45,8 +46,8 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.impl.cache.EntryCache; import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager; +import org.apache.bookkeeper.mledger.proto.MLDataFormats; import org.apache.bookkeeper.test.MockedBookKeeperTestCase; -import org.testng.Assert; import org.testng.annotations.Test; public class EntryCacheTest extends MockedBookKeeperTestCase { @@ -60,6 +61,8 @@ protected void setUpTestCase() throws Exception { when(ml.getExecutor()).thenReturn(executor); when(ml.getMbean()).thenReturn(new ManagedLedgerMBeanImpl(ml)); when(ml.getConfig()).thenReturn(new ManagedLedgerConfig()); + when(ml.getOptionalLedgerInfo(0L)).thenReturn(Optional.of(mock( + MLDataFormats.ManagedLedgerInfo.LedgerInfo.class))); } @Test(timeOut = 5000) @@ -76,22 +79,13 @@ public void testRead() throws Exception { entryCache.insert(EntryImpl.create(0, i, data)); } - final CountDownLatch counter = new CountDownLatch(1); - - entryCache.asyncReadEntry(lh, 0, 9, false, new ReadEntriesCallback() { - public void readEntriesComplete(List entries, Object ctx) { - assertEquals(entries.size(), 10); - entries.forEach(Entry::release); - counter.countDown(); - } - - public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { - Assert.fail("should not have failed"); - } - }, null); - counter.await(); + when(ml.getLastConfirmedEntry()).thenReturn(new PositionImpl(0, 9)); + final var entries = readEntry(entryCache, lh, 0, 9, false, null); + assertEquals(entries.size(), 10); + entries.forEach(Entry::release); // Verify no entries were read from bookkeeper + verify(lh, never()).readUnconfirmedAsync(anyLong(), anyLong()); verify(lh, never()).readAsync(anyLong(), anyLong()); } @@ -109,19 +103,9 @@ public void testReadMissingBefore() throws Exception { entryCache.insert(EntryImpl.create(0, i, data)); } - final CountDownLatch counter = new CountDownLatch(1); - - entryCache.asyncReadEntry(lh, 0, 9, false, new ReadEntriesCallback() { - public void readEntriesComplete(List entries, Object ctx) { - assertEquals(entries.size(), 10); - counter.countDown(); - } - - public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { - Assert.fail("should not have failed"); - } - }, null); - counter.await(); + when(ml.getLastConfirmedEntry()).thenReturn(new PositionImpl(0, 9)); + final var entries = readEntry(entryCache, lh, 0, 9, false, null); + assertEquals(entries.size(), 10); } @Test(timeOut = 5000) @@ -138,19 +122,9 @@ public void testReadMissingAfter() throws Exception { entryCache.insert(EntryImpl.create(0, i, data)); } - final CountDownLatch counter = new CountDownLatch(1); - - entryCache.asyncReadEntry(lh, 0, 9, false, new ReadEntriesCallback() { - public void readEntriesComplete(List entries, Object ctx) { - assertEquals(entries.size(), 10); - counter.countDown(); - } - - public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { - Assert.fail("should not have failed"); - } - }, null); - counter.await(); + when(ml.getLastConfirmedEntry()).thenReturn(new PositionImpl(0, 9)); + final var entries = readEntry(entryCache, lh, 0, 9, false, null); + assertEquals(entries.size(), 10); } @Test(timeOut = 5000) @@ -168,19 +142,9 @@ public void testReadMissingMiddle() throws Exception { entryCache.insert(EntryImpl.create(0, 8, data)); entryCache.insert(EntryImpl.create(0, 9, data)); - final CountDownLatch counter = new CountDownLatch(1); - - entryCache.asyncReadEntry(lh, 0, 9, false, new ReadEntriesCallback() { - public void readEntriesComplete(List entries, Object ctx) { - assertEquals(entries.size(), 10); - counter.countDown(); - } - - public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { - Assert.fail("should not have failed"); - } - }, null); - counter.await(); + when(ml.getLastConfirmedEntry()).thenReturn(new PositionImpl(0, 9)); + final var entries = readEntry(entryCache, lh, 0, 9, false, null); + assertEquals(entries.size(), 10); } @Test(timeOut = 5000) @@ -198,19 +162,9 @@ public void testReadMissingMultiple() throws Exception { entryCache.insert(EntryImpl.create(0, 5, data)); entryCache.insert(EntryImpl.create(0, 8, data)); - final CountDownLatch counter = new CountDownLatch(1); - - entryCache.asyncReadEntry(lh, 0, 9, false, new ReadEntriesCallback() { - public void readEntriesComplete(List entries, Object ctx) { - assertEquals(entries.size(), 10); - counter.countDown(); - } - - public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { - Assert.fail("should not have failed"); - } - }, null); - counter.await(); + when(ml.getLastConfirmedEntry()).thenReturn(new PositionImpl(0, 9)); + final var entries = readEntry(entryCache, lh, 0, 9, false, null); + assertEquals(entries.size(), 10); } @Test @@ -222,19 +176,25 @@ public void testCachedReadReturnsDifferentByteBuffer() throws Exception { @Cleanup(value = "clear") EntryCache entryCache = cacheManager.getEntryCache(ml); - CompletableFuture> cacheMissFutureEntries = new CompletableFuture<>(); - - entryCache.asyncReadEntry(lh, 0, 1, true, new ReadEntriesCallback() { - public void readEntriesComplete(List entries, Object ctx) { - cacheMissFutureEntries.complete(entries); - } - - public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { - cacheMissFutureEntries.completeExceptionally(exception); - } - }, null); - - List cacheMissEntries = cacheMissFutureEntries.get(); + readEntry(entryCache, lh, 0, 1, true, e -> { + assertTrue(e instanceof ManagedLedgerException); + assertTrue(e.getMessage().contains("LastConfirmedEntry is null when reading ledger 0")); + }); + + when(ml.getLastConfirmedEntry()).thenReturn(new PositionImpl(-1, -1)); + readEntry(entryCache, lh, 0, 1, true, e -> { + assertTrue(e instanceof ManagedLedgerException); + assertTrue(e.getMessage().contains("LastConfirmedEntry is -1:-1 when reading ledger 0")); + }); + + when(ml.getLastConfirmedEntry()).thenReturn(new PositionImpl(0, 0)); + readEntry(entryCache, lh, 0, 1, true, e -> { + assertTrue(e instanceof ManagedLedgerException); + assertTrue(e.getMessage().contains("LastConfirmedEntry is 0:0 when reading entry 1")); + }); + + when(ml.getLastConfirmedEntry()).thenReturn(new PositionImpl(0, 1)); + List cacheMissEntries = readEntry(entryCache, lh, 0, 1, true, null); // Ensure first entry is 0 and assertEquals(cacheMissEntries.size(), 2); assertEquals(cacheMissEntries.get(0).getEntryId(), 0); @@ -243,19 +203,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { // Move the reader index to simulate consumption cacheMissEntries.get(0).getDataBuffer().readerIndex(10); - CompletableFuture> cacheHitFutureEntries = new CompletableFuture<>(); - - entryCache.asyncReadEntry(lh, 0, 1, true, new ReadEntriesCallback() { - public void readEntriesComplete(List entries, Object ctx) { - cacheHitFutureEntries.complete(entries); - } - - public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { - cacheHitFutureEntries.completeExceptionally(exception); - } - }, null); - - List cacheHitEntries = cacheHitFutureEntries.get(); + List cacheHitEntries = readEntry(entryCache, lh, 0, 1, true, null); assertEquals(cacheHitEntries.get(0).getEntryId(), 0); assertEquals(cacheHitEntries.get(0).getDataBuffer().readerIndex(), 0); } @@ -269,7 +217,7 @@ public void testReadWithError() throws Exception { CompletableFuture future = new CompletableFuture<>(); future.completeExceptionally(new BKNoSuchLedgerExistsException()); return future; - }).when(lh).readAsync(anyLong(), anyLong()); + }).when(lh).readUnconfirmedAsync(anyLong(), anyLong()); EntryCacheManager cacheManager = factory.getEntryCacheManager(); @Cleanup(value = "clear") @@ -278,18 +226,9 @@ public void testReadWithError() throws Exception { byte[] data = new byte[10]; entryCache.insert(EntryImpl.create(0, 2, data)); - final CountDownLatch counter = new CountDownLatch(1); - - entryCache.asyncReadEntry(lh, 0, 9, false, new ReadEntriesCallback() { - public void readEntriesComplete(List entries, Object ctx) { - Assert.fail("should not complete"); - } - - public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { - counter.countDown(); - } - }, null); - counter.await(); + when(ml.getLastConfirmedEntry()).thenReturn(new PositionImpl(0, 9)); + readEntry(entryCache, lh, 0, 9, false, e -> + assertTrue(e instanceof ManagedLedgerException.LedgerNotExistException)); } static ReadHandle getLedgerHandle() { @@ -306,9 +245,35 @@ static ReadHandle getLedgerHandle() { LedgerEntries ledgerEntries = mock(LedgerEntries.class); doAnswer((invocation2) -> entries.iterator()).when(ledgerEntries).iterator(); return CompletableFuture.completedFuture(ledgerEntries); - }).when(lh).readAsync(anyLong(), anyLong()); + }).when(lh).readUnconfirmedAsync(anyLong(), anyLong()); return lh; } + private List readEntry(EntryCache entryCache, ReadHandle lh, long firstEntry, long lastEntry, + boolean shouldCacheEntry, Consumer assertion) + throws InterruptedException { + final var future = new CompletableFuture>(); + entryCache.asyncReadEntry(lh, firstEntry, lastEntry, shouldCacheEntry, new ReadEntriesCallback() { + @Override + public void readEntriesComplete(List entries, Object ctx) { + future.complete(entries); + } + + @Override + public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + future.completeExceptionally(exception); + } + }, null); + try { + final var entries = future.get(); + assertNull(assertion); + return entries; + } catch (ExecutionException e) { + if (assertion != null) { + assertion.accept(e.getCause()); + } + return List.of(); + } + } } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java index 00c50249b4ac24..59e815fb1b4a5c 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java @@ -322,7 +322,7 @@ public CompletableFuture readAsync(long firstEntry, long lastEntr @Override public CompletableFuture readUnconfirmedAsync(long firstEntry, long lastEntry) { - return unsupported(); + return readAsync(firstEntry, lastEntry); } @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicRealBkTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicRealBkTest.java new file mode 100644 index 00000000000000..9d810b06a7c7b6 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicRealBkTest.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.persistent; + +import com.google.common.collect.Lists; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import org.apache.bookkeeper.mledger.impl.ShadowManagedLedgerImpl; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.common.util.PortManager; +import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; +import org.awaitility.Awaitility; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class ShadowTopicRealBkTest { + + private static final String cluster = "test"; + private final int zkPort = PortManager.nextLockedFreePort(); + private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2, zkPort, PortManager::nextLockedFreePort); + private PulsarService pulsar; + private PulsarAdmin admin; + + @BeforeClass + public void setup() throws Exception { + bk.start(); + final var config = new ServiceConfiguration(); + config.setClusterName(cluster); + config.setAdvertisedAddress("localhost"); + config.setBrokerServicePort(Optional.of(0)); + config.setWebServicePort(Optional.of(0)); + config.setMetadataStoreUrl("zk:localhost:" + zkPort); + pulsar = new PulsarService(config); + pulsar.start(); + admin = pulsar.getAdminClient(); + admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()) + .brokerServiceUrl(pulsar.getBrokerServiceUrl()).build()); + admin.tenants().createTenant("public", TenantInfo.builder().allowedClusters(Set.of(cluster)).build()); + admin.namespaces().createNamespace("public/default"); + } + + @AfterClass(alwaysRun = true) + public void cleanup() throws Exception { + if (pulsar != null) { + pulsar.close(); + } + bk.stop(); + } + + @Test + public void testReadFromStorage() throws Exception { + final var sourceTopic = TopicName.get("test-read-from-source").toString(); + final var shadowTopic = sourceTopic + "-shadow"; + + admin.topics().createNonPartitionedTopic(sourceTopic); + admin.topics().createShadowTopic(shadowTopic, sourceTopic); + admin.topics().setShadowTopics(sourceTopic, Lists.newArrayList(shadowTopic)); + + Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(()->{ + final var sourcePersistentTopic = (PersistentTopic) pulsar.getBrokerService() + .getTopicIfExists(sourceTopic).get().orElseThrow(); + final var replicator = (ShadowReplicator) sourcePersistentTopic.getShadowReplicators().get(shadowTopic); + Assert.assertNotNull(replicator); + Assert.assertEquals(String.valueOf(replicator.getState()), "Started"); + }); + + final var client = pulsar.getClient(); + // When the message was sent, there is no cursor, so it will read from the cache + final var producer = client.newProducer().topic(sourceTopic).create(); + producer.send("message".getBytes()); + // 1. Verify RangeEntryCacheImpl#readFromStorage + final var consumer = client.newConsumer().topic(shadowTopic).subscriptionName("sub") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe(); + final var msg = consumer.receive(5, TimeUnit.SECONDS); + Assert.assertNotNull(msg); + Assert.assertEquals(msg.getValue(), "message".getBytes()); + + // 2. Verify EntryCache#asyncReadEntry + final var shadowManagedLedger = ((PersistentTopic) pulsar.getBrokerService().getTopicIfExists(shadowTopic).get() + .orElseThrow()).getManagedLedger(); + Assert.assertTrue(shadowManagedLedger instanceof ShadowManagedLedgerImpl); + shadowManagedLedger.getEarliestMessagePublishTimeInBacklog().get(3, TimeUnit.SECONDS); + } +}