[fix][broker] Remove failed OpAddEntry from pendingAddEntries #23817
Will this cause messages to go out of order?
No, because the failed OpAddEntry wasn't removed from the |
(cherry picked from commit 420f62e)
@gaoran10 After cherry-picking to branch-3.0 and branch-3.3, the test fails.
In branch-4.0, the changes applied without any problem. Can you please handle cherry-picking and backporting to branch-3.0 and branch-3.3?
@@ -446,26 +446,33 @@ public Processor inputProcessor() {
    return new Processor() {
        @Override
        public ByteBuf process(Object contextObj, ByteBuf inputPayload) {
            throw new RuntimeException(failureMsg);
            Commands.skipBrokerEntryMetadataIfExist(inputPayload);
In branch-3.0 & branch-3.3, the test fails due to this line. Removing it makes the test pass.
java.lang.IndexOutOfBoundsException: readerIndex(0) + length(2) exceeds writerIndex(1): UnpooledDuplicatedByteBuf(ridx: 0, widx: 1, cap: 256, unwrapped: UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 1, cap: 256))
at io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1442) ~[netty-buffer-4.1.116.Final.jar:4.1.116.Final]
at io.netty.buffer.AbstractByteBuf.readShort(AbstractByteBuf.java:749) ~[netty-buffer-4.1.116.Final.jar:4.1.116.Final]
at org.apache.pulsar.common.protocol.Commands.skipBrokerEntryMetadataIfExist(Commands.java:1706) ~[classes/:?]
at org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImplTest$3$1.process(ManagedLedgerInterceptorImplTest.java:454) ~[test-classes/:?]
at org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImpl.processPayload(ManagedLedgerInterceptorImpl.java:171) ~[classes/:?]
at org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImpl.processPayloadBeforeLedgerWrite(ManagedLedgerInterceptorImpl.java:196) ~[classes/:?]
at org.apache.bookkeeper.mledger.impl.OpAddEntry.initiate(OpAddEntry.java:143) ~[classes/:?]
at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.internalAsyncAddEntry(ManagedLedgerImpl.java:873) ~[classes/:?]
at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.lambda$asyncAddEntry$2(ManagedLedgerImpl.java:787) ~[classes/:?]
at org.apache.bookkeeper.common.util.SingleThreadSafeScheduledExecutorService$SafeRunnable.run(SingleThreadSafeScheduledExecutorService.java:46) ~[bookkeeper-common-4.16.6.jar:4.16.6]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[?:?]
at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264) ~[?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java) ~[?:?]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.116.Final.jar:4.1.116.Final]
at java.lang.Thread.run(Thread.java:840) ~[?:?]
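The trace shows a 2-byte read (`readShort`) attempted on a buffer that holds only 1 readable byte. As a rough illustration of that failure mode, here is a minimal sketch that uses `java.nio.ByteBuffer` as a stand-in for Netty's `ByteBuf`. The class name, method names, and marker value are assumptions made for this example and are not Pulsar's code. The thread resolves the backport by removing the call from the test, not by adding a guard like this one.

```java
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;

// Hypothetical sketch: java.nio.ByteBuffer stands in for Netty's ByteBuf.
class ShortReadSketch {
    // Illustrative 2-byte marker; the value is an assumption for this example.
    static final short MARKER = 0x0e02;

    // Reads two bytes unconditionally, so a 1-byte buffer throws
    // BufferUnderflowException before the position is moved.
    static boolean startsWithMarkerUnguarded(ByteBuffer buf) {
        int start = buf.position();
        boolean match = buf.getShort() == MARKER;
        buf.position(start); // restore the position after peeking
        return match;
    }

    // Checks the readable length first, then peeks with an absolute read
    // that leaves the position untouched.
    static boolean startsWithMarkerGuarded(ByteBuffer buf) {
        if (buf.remaining() < Short.BYTES) {
            return false;
        }
        return buf.getShort(buf.position()) == MARKER;
    }

    public static void main(String[] args) {
        ByteBuffer oneByte = ByteBuffer.wrap(new byte[] {1});
        try {
            startsWithMarkerUnguarded(oneByte);
        } catch (BufferUnderflowException e) {
            System.out.println("unguarded read failed on a 1-byte buffer");
        }
        System.out.println("guarded: " + startsWithMarkerGuarded(oneByte));
    }
}
```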
@gaoran10 I'll remove this line while backporting to branch-3.0 and branch-3.3
Done
Oh, thanks.
var expectedException = new ArrayList<Exception>();
ledger.asyncAddEntry("test".getBytes(), 1, 1, new AsyncCallbacks.AddEntryCallback() {

var addEntryCallback = new AsyncCallbacks.AddEntryCallback() {
    @Override
    public void addComplete(Position position, ByteBuf entryData, Object ctx) {
        entryData.release();
In the original test, calling entryData.release() seems to be invalid, since it produces warning logs:
2025-01-07T19:48:06,671 - WARN - [test-OrderedScheduler-0-0:SingleThreadSafeScheduledExecutorService] - Unexpected throwable from task class org.apache.bookkeeper.mledger.impl.OpAddEntry: refCnt: 0, decrement: 1
io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
at io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:83) ~[netty-common-4.1.116.Final.jar:4.1.116.Final]
at io.netty.util.internal.ReferenceCountUpdater.release(ReferenceCountUpdater.java:148) ~[netty-common-4.1.116.Final.jar:4.1.116.Final]
at io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:101) ~[netty-buffer-4.1.116.Final.jar:4.1.116.Final]
at io.netty.util.ReferenceCountUtil.release(ReferenceCountUtil.java:90) ~[netty-common-4.1.116.Final.jar:4.1.116.Final]
at org.apache.bookkeeper.mledger.impl.OpAddEntry.run(OpAddEntry.java:277) ~[classes/:?]
at org.apache.bookkeeper.common.util.SingleThreadSafeScheduledExecutorService$SafeRunnable.run(SingleThreadSafeScheduledExecutorService.java:46) ~[bookkeeper-common-4.17.1.jar:4.17.1]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572) ~[?:?]
at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:317) ~[?:?]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java) ~[?:?]
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.116.Final.jar:4.1.116.Final]
at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]
Oh, sorry, we don't need to release the ByteBuf here. I'll fix the test. Thanks.
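For readers unfamiliar with the `refCnt: 0, decrement: 1` warning: it appears when a reference-counted buffer is released after its count has already reached zero. The sketch below models that with a plain `AtomicInteger` rather than Netty, so every name in it is hypothetical. It assumes, as the log suggests, that the owner of the buffer releases it after the callback returns, so a release inside the callback makes the owner's release the second one.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stdlib-only model of reference counting; not Netty's implementation.
class RefCountSketch {
    static final class Buffer {
        private final AtomicInteger refCnt = new AtomicInteger(1);

        // Decrements the count, or throws if the count is already zero.
        void release() {
            int previous = refCnt.getAndUpdate(c -> c > 0 ? c - 1 : c);
            if (previous == 0) {
                throw new IllegalStateException("refCnt: 0, decrement: 1");
            }
        }

        int refCnt() {
            return refCnt.get();
        }
    }

    // Returns true when the owner's own release fails because the callback
    // already released the buffer.
    static boolean ownerReleaseFails(boolean callbackReleases) {
        Buffer buf = new Buffer();
        if (callbackReleases) {
            buf.release(); // the extra release done in the callback
        }
        try {
            buf.release(); // the owner's release after the callback returns
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("callback releases: owner fails = " + ownerReleaseFails(true));
        System.out.println("callback does not release: owner fails = " + ownerReleaseFails(false));
    }
}
```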
Motivation
The PR caught the exceptions thrown by the entry payload interceptor processor, but the failed OpAddEntry wasn't removed from pendingAddEntries, which causes the next write to fail.
Modifications
Remove failed OpAddEntry from pendingAddEntries.
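To illustrate why a leftover failed operation breaks the next write, here is a simplified model of a pending-add queue. It is a sketch under stated assumptions, not the ManagedLedger code: the class, the `Op` type, and the rule that a completing operation must be the head of the queue are all hypothetical simplifications chosen for this example.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.UnaryOperator;

// Hypothetical model of a pending-add queue; not the Pulsar implementation.
class PendingAddQueueSketch {
    static final class Op {
        final String payload;

        Op(String payload) {
            this.payload = payload;
        }
    }

    private final Queue<Op> pending = new ArrayDeque<>();
    private final boolean removeOnFailure;

    PendingAddQueueSketch(boolean removeOnFailure) {
        this.removeOnFailure = removeOnFailure;
    }

    // Returns true if the write completed, false if it failed.
    boolean add(String payload, UnaryOperator<String> processor) {
        Op op = new Op(payload);
        pending.add(op);
        try {
            processor.apply(payload); // stands in for the payload interceptor
        } catch (RuntimeException e) {
            if (removeOnFailure) {
                pending.remove(op); // the fix: drop the failed op from the queue
            }
            return false;
        }
        // Assumption: completion is only valid for the op at the head of the queue.
        Op head = pending.poll();
        return head == op;
    }

    // Fails one write in the processor, then reports whether the next write succeeds.
    static boolean nextWriteSucceedsAfterFailure(boolean removeOnFailure) {
        PendingAddQueueSketch queue = new PendingAddQueueSketch(removeOnFailure);
        queue.add("first", s -> {
            throw new RuntimeException("processor failure");
        });
        return queue.add("second", s -> s);
    }

    public static void main(String[] args) {
        System.out.println("without removal: " + nextWriteSucceedsAfterFailure(false));
        System.out.println("with removal: " + nextWriteSucceedsAfterFailure(true));
    }
}
```

In this model, the stale operation sits at the head of the queue, so the second write finds the wrong head and fails. Removing the failed operation lets the second write complete.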