Skip to content

Commit

Permalink
Avoid using same OpAddEntry between different ledger handles (apache#…
Browse files Browse the repository at this point in the history
…5942)

### Motivation

Avoid using same OpAddEntry between different ledger handles.

### Modifications

Add state for OpAddEntry, if op handled by new ledger handle, the op will set to CLOSED state, after the legacy callback happens will check the op state, only INITIATED can be processed.

When ledger rollover happens, pendingAddEntries will be processed. when process pendingAddEntries, will create a new OpAddEntry by the old OpAddEntry to avoid different ledger handles use same OpAddEntry.
  • Loading branch information
codelipenghui authored Jan 9, 2020
1 parent 5fc4a90 commit 7ec17b2
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -553,13 +553,12 @@ public void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback, Object ctx)

// Jump to specific thread to avoid contention from writers writing from different threads
executor.executeOrdered(name, safeRun(() -> {
pendingAddEntries.add(addOperation);

internalAsyncAddEntry(addOperation);
}));
}

private synchronized void internalAsyncAddEntry(OpAddEntry addOperation) {
pendingAddEntries.add(addOperation);
final State state = STATE_UPDATER.get(this);
if (state == State.Fenced) {
addOperation.failed(new ManagedLedgerFencedException());
Expand Down Expand Up @@ -1294,9 +1293,24 @@ public synchronized void updateLedgersIdsComplete(Stat stat) {
log.debug("[{}] Resending {} pending messages", name, pendingAddEntries.size());
}

// Avoid use same OpAddEntry between different ledger handle
int pendingSize = pendingAddEntries.size();
OpAddEntry existsOp;
do {
existsOp = pendingAddEntries.poll();
if (existsOp != null) {
// If op is used by another ledger handle, we need to close it and create a new one
if (existsOp.ledger != null) {
existsOp.close();
existsOp = OpAddEntry.create(existsOp.ml, existsOp.data, existsOp.callback, existsOp.ctx);
}
existsOp.setLedger(currentLedger);
pendingAddEntries.add(existsOp);
}
} while (existsOp != null && --pendingSize > 0);

// Process all the pending addEntry requests
for (OpAddEntry op : pendingAddEntries) {
op.setLedger(currentLedger);
++currentLedgerEntries;
currentLedgerSize += op.data.readableBytes();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,14 @@
*
*/
class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallback {
private ManagedLedgerImpl ml;
protected ManagedLedgerImpl ml;
LedgerHandle ledger;
private long entryId;

@SuppressWarnings("unused")
private volatile AddEntryCallback callback;
private static final AtomicReferenceFieldUpdater<OpAddEntry, AddEntryCallback> callbackUpdater =
AtomicReferenceFieldUpdater.newUpdater(OpAddEntry.class, AddEntryCallback.class, "callback");
protected volatile AddEntryCallback callback;
Object ctx;
volatile long addOpCount;
private static final AtomicLongFieldUpdater<OpAddEntry> ADD_OP_COUNT_UPDATER = AtomicLongFieldUpdater
Expand All @@ -60,8 +62,16 @@ class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallback {
ByteBuf data;
private int dataLength;

private static final AtomicReferenceFieldUpdater<OpAddEntry, AddEntryCallback> callbackUpdater =
AtomicReferenceFieldUpdater.newUpdater(OpAddEntry.class, AddEntryCallback.class, "callback");
private static final AtomicReferenceFieldUpdater<OpAddEntry, OpAddEntry.State> STATE_UPDATER = AtomicReferenceFieldUpdater
.newUpdater(OpAddEntry.class, OpAddEntry.State.class, "state");
volatile State state;

enum State {
OPEN,
INITIATED,
COMPLETED,
CLOSED
}

public static OpAddEntry create(ManagedLedgerImpl ml, ByteBuf data, AddEntryCallback callback, Object ctx) {
OpAddEntry op = RECYCLER.get();
Expand All @@ -75,6 +85,7 @@ public static OpAddEntry create(ManagedLedgerImpl ml, ByteBuf data, AddEntryCall
op.closeWhenDone = false;
op.entryId = -1;
op.startTime = System.nanoTime();
op.state = State.OPEN;
ml.mbean.addAddEntrySample(op.dataLength);
if (log.isDebugEnabled()) {
log.debug("Created new OpAddEntry {}", op);
Expand All @@ -91,12 +102,16 @@ public void setCloseWhenDone(boolean closeWhenDone) {
}

public void initiate() {
ByteBuf duplicateBuffer = data.retainedDuplicate();
if (STATE_UPDATER.compareAndSet(OpAddEntry.this, State.OPEN, State.INITIATED)) {
ByteBuf duplicateBuffer = data.retainedDuplicate();

// internally asyncAddEntry() will take the ownership of the buffer and release it at the end
addOpCount = ManagedLedgerImpl.ADD_OP_COUNT_UPDATER.incrementAndGet(ml);;
lastInitTime = System.nanoTime();
ledger.asyncAddEntry(duplicateBuffer, this, addOpCount);
// internally asyncAddEntry() will take the ownership of the buffer and release it at the end
addOpCount = ManagedLedgerImpl.ADD_OP_COUNT_UPDATER.incrementAndGet(ml);
lastInitTime = System.nanoTime();
ledger.asyncAddEntry(duplicateBuffer, this, addOpCount);
} else {
log.warn("[{}] initiate with unexpected state {}, expect OPEN state.", ml.getName(), state);
}
}

public void failed(ManagedLedgerException e) {
Expand All @@ -110,6 +125,13 @@ public void failed(ManagedLedgerException e) {

@Override
public void addComplete(int rc, final LedgerHandle lh, long entryId, Object ctx) {

if (!STATE_UPDATER.compareAndSet(OpAddEntry.this, State.INITIATED, State.COMPLETED)) {
log.warn("[{}] The add op is terminal legacy callback for entry {}-{} adding.", ml.getName(), lh.getId(), entryId);
OpAddEntry.this.recycle();
return;
}

if (ledger.getId() != lh.getId()) {
log.warn("[{}] ledgerId {} doesn't match with acked ledgerId {}", ml.getName(), ledger.getId(), lh.getId());
}
Expand Down Expand Up @@ -216,6 +238,7 @@ private boolean checkAndCompleteOp(Object ctx) {

void handleAddTimeoutFailure(final LedgerHandle ledger, Object ctx) {
if (checkAndCompleteOp(ctx)) {
this.close();
this.handleAddFailure(ledger);
}
}
Expand All @@ -237,6 +260,14 @@ void handleAddFailure(final LedgerHandle ledger) {
ml.ledgerClosed(ledger);
}));
}

void close() {
STATE_UPDATER.set(OpAddEntry.this, State.CLOSED);
}

public State getState() {
return state;
}

private final Handle<OpAddEntry> recyclerHandle;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,15 @@
import com.google.common.collect.Sets;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;

import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.charset.Charset;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -112,6 +114,7 @@
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;

public class ManagedLedgerTest extends MockedBookKeeperTestCase {
Expand Down Expand Up @@ -2489,6 +2492,41 @@ public void addFailed(ManagedLedgerException exception, Object ctx) {
setFieldValue(ManagedLedgerImpl.class, ledger, "currentLedger", null);
}

@Test
public void avoidUseSameOpAddEntryBetweenDifferentLedger() throws Exception {
ManagedLedgerFactoryConfig config = new ManagedLedgerFactoryConfig();
config.setMaxCacheSize(0);
ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl(bkc, zkc, config);
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my_test_ledger");

List<OpAddEntry> oldOps = new ArrayList<>();
for (int i = 0; i < 10; i++) {
OpAddEntry op = OpAddEntry.create(ledger, ByteBufAllocator.DEFAULT.buffer(128), null, null);
if (i > 4) {
op.setLedger(mock(LedgerHandle.class));
}
oldOps.add(op);
ledger.pendingAddEntries.add(op);
}

ledger.updateLedgersIdsComplete(mock(Stat.class));
for (int i = 0; i < 10; i++) {
OpAddEntry oldOp = oldOps.get(i);
if (i > 4) {
Assert.assertEquals(oldOp.getState(), OpAddEntry.State.CLOSED);
} else {
Assert.assertEquals(oldOp.getState(), OpAddEntry.State.INITIATED);
}
OpAddEntry newOp = ledger.pendingAddEntries.poll();
Assert.assertEquals(newOp.getState(), OpAddEntry.State.INITIATED);
if (i > 4) {
Assert.assertNotSame(oldOp, newOp);
} else {
Assert.assertSame(oldOp, newOp);
}
}
}

private void setFieldValue(Class clazz, Object classObj, String fieldName, Object fieldValue) throws Exception {
Field field = clazz.getDeclaredField(fieldName);
field.setAccessible(true);
Expand Down

0 comments on commit 7ec17b2

Please sign in to comment.