Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Remove failed OpAddEntry from pendingAddEntries #23817

Merged
merged 1 commit into from
Jan 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ public void initiate() {
payloadProcessorHandle = ml.getManagedLedgerInterceptor()
.processPayloadBeforeLedgerWrite(this.getCtx(), duplicateBuffer);
} catch (Exception e) {
ml.pendingAddEntries.remove(this);
ReferenceCountUtil.safeRelease(duplicateBuffer);
log.error("[{}] Error processing payload before ledger write", ml.getName(), e);
this.failed(new ManagedLedgerException.ManagedLedgerInterceptException(e));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.bookkeeper.mledger.impl;

import static org.testng.Assert.assertEquals;
import static org.apache.pulsar.broker.intercept.MangedLedgerInterceptorImplTest.TestPayloadProcessor;
import static org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImplTest.TestPayloadProcessor;
import java.util.HashSet;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -28,13 +28,13 @@
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImpl;
import org.apache.pulsar.broker.intercept.MangedLedgerInterceptorImplTest;
import org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImplTest;
import org.apache.pulsar.common.intercept.ManagedLedgerPayloadProcessor;
import org.awaitility.Awaitility;
import org.testng.annotations.Test;

/***
* Differ to {@link MangedLedgerInterceptorImplTest}, this test can call {@link ManagedLedgerImpl}'s methods modified
* Differ to {@link ManagedLedgerInterceptorImplTest}, this test can call {@link ManagedLedgerImpl}'s methods modified
* by "default".
*/
@Slf4j
Expand Down Expand Up @@ -73,7 +73,7 @@ public void testCurrentLedgerSizeCorrectIfHasInterceptor() throws Exception {
switchLedgerManually(ledger);

// verify.
assertEquals(currentLedgerSize, MangedLedgerInterceptorImplTest.calculatePreciseSize(ledger));
assertEquals(currentLedgerSize, ManagedLedgerInterceptorImplTest.calculatePreciseSize(ledger));

// cleanup.
cursor.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.fail;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
Expand All @@ -33,6 +32,7 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
Expand All @@ -59,8 +59,8 @@
import org.testng.annotations.Test;

@Test(groups = "broker")
public class MangedLedgerInterceptorImplTest extends MockedBookKeeperTestCase {
private static final Logger log = LoggerFactory.getLogger(MangedLedgerInterceptorImplTest.class);
public class ManagedLedgerInterceptorImplTest extends MockedBookKeeperTestCase {
private static final Logger log = LoggerFactory.getLogger(ManagedLedgerInterceptorImplTest.class);

public static class TestPayloadProcessor implements ManagedLedgerPayloadProcessor {
@Override
Expand Down Expand Up @@ -446,26 +446,33 @@ public Processor inputProcessor() {
return new Processor() {
@Override
public ByteBuf process(Object contextObj, ByteBuf inputPayload) {
throw new RuntimeException(failureMsg);
Commands.skipBrokerEntryMetadataIfExist(inputPayload);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In branch-3.0 & branch-3.3, the test fails due to this line. Removing it makes the test pass.

java.lang.IndexOutOfBoundsException: readerIndex(0) + length(2) exceeds writerIndex(1): UnpooledDuplicatedByteBuf(ridx: 0, widx: 1, cap: 256, unwrapped: UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 1, cap: 256))
	at io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1442) ~[netty-buffer-4.1.116.Final.jar:4.1.116.Final]
	at io.netty.buffer.AbstractByteBuf.readShort(AbstractByteBuf.java:749) ~[netty-buffer-4.1.116.Final.jar:4.1.116.Final]
	at org.apache.pulsar.common.protocol.Commands.skipBrokerEntryMetadataIfExist(Commands.java:1706) ~[classes/:?]
	at org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImplTest$3$1.process(ManagedLedgerInterceptorImplTest.java:454) ~[test-classes/:?]
	at org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImpl.processPayload(ManagedLedgerInterceptorImpl.java:171) ~[classes/:?]
	at org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImpl.processPayloadBeforeLedgerWrite(ManagedLedgerInterceptorImpl.java:196) ~[classes/:?]
	at org.apache.bookkeeper.mledger.impl.OpAddEntry.initiate(OpAddEntry.java:143) ~[classes/:?]
	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.internalAsyncAddEntry(ManagedLedgerImpl.java:873) ~[classes/:?]
	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.lambda$asyncAddEntry$2(ManagedLedgerImpl.java:787) ~[classes/:?]
	at org.apache.bookkeeper.common.util.SingleThreadSafeScheduledExecutorService$SafeRunnable.run(SingleThreadSafeScheduledExecutorService.java:46) ~[bookkeeper-common-4.16.6.jar:4.16.6]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[?:?]
	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264) ~[?:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java) ~[?:?]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.116.Final.jar:4.1.116.Final]
	at java.lang.Thread.run(Thread.java:840) ~[?:?]

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gaoran10 I'll remove this line while backporting to branch-3.0 and branch-3.3

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, thanks.

if (inputPayload.readBoolean()) {
throw new RuntimeException(failureMsg);
}
return inputPayload;
}

@Override
public void release(ByteBuf processedPayload) {
// no-op
fail("the release method can't be reached");
}
};
}
})));

var ledger = factory.open("testManagedLedgerPayloadProcessorFailure", config);
var countDownLatch = new CountDownLatch(1);
int count = 10;
var countDownLatch = new CountDownLatch(count);
var successCount = new AtomicInteger(0);
var expectedException = new ArrayList<Exception>();
ledger.asyncAddEntry("test".getBytes(), 1, 1, new AsyncCallbacks.AddEntryCallback() {

var addEntryCallback = new AsyncCallbacks.AddEntryCallback() {
@Override
public void addComplete(Position position, ByteBuf entryData, Object ctx) {
entryData.release();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in the original test, calling entryData.release() seems to be invalid since there are warning logs:

2025-01-07T19:48:06,671 - WARN  - [test-OrderedScheduler-0-0:SingleThreadSafeScheduledExecutorService] - Unexpected throwable from task class org.apache.bookkeeper.mledger.impl.OpAddEntry: refCnt: 0, decrement: 1
io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
	at io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:83) ~[netty-common-4.1.116.Final.jar:4.1.116.Final]
	at io.netty.util.internal.ReferenceCountUpdater.release(ReferenceCountUpdater.java:148) ~[netty-common-4.1.116.Final.jar:4.1.116.Final]
	at io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:101) ~[netty-buffer-4.1.116.Final.jar:4.1.116.Final]
	at io.netty.util.ReferenceCountUtil.release(ReferenceCountUtil.java:90) ~[netty-common-4.1.116.Final.jar:4.1.116.Final]
	at org.apache.bookkeeper.mledger.impl.OpAddEntry.run(OpAddEntry.java:277) ~[classes/:?]
	at org.apache.bookkeeper.common.util.SingleThreadSafeScheduledExecutorService$SafeRunnable.run(SingleThreadSafeScheduledExecutorService.java:46) ~[bookkeeper-common-4.17.1.jar:4.17.1]
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572) ~[?:?]
	at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:317) ~[?:?]
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java) ~[?:?]
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[?:?]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.116.Final.jar:4.1.116.Final]
	at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]

Copy link
Contributor Author

@gaoran10 gaoran10 Jan 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, sorry, we don't need to release the ByteBuf here. I'll fix the test. Thanks.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

countDownLatch.countDown();
successCount.incrementAndGet();
}

@Override
Expand All @@ -474,10 +481,23 @@ public void addFailed(ManagedLedgerException exception, Object ctx) {
expectedException.add(exception);
countDownLatch.countDown();
}
}, null);
};

for (int i = 0; i < count; i++) {
if (i % 2 == 0) {
ledger.asyncAddEntry(Unpooled.buffer().writeBoolean(true), addEntryCallback, null);
} else {
ledger.asyncAddEntry(Unpooled.buffer().writeBoolean(false), addEntryCallback, null);
}
}

countDownLatch.await();
assertEquals(expectedException.size(), 1);
assertEquals(expectedException.get(0).getCause().getMessage(), failureMsg);
assertEquals(expectedException.size(), count / 2);
assertEquals(successCount.get(), count / 2);
for (Exception e : expectedException) {
assertEquals(e.getCause().getMessage(), failureMsg);
}
ledger.close();
}

}
Loading