Skip to content

Commit 2b2fda6

Browse files
committed
GEODE-6195 putIfAbsent may get a returned value caused by the same operation due to retry
Moving the check for a retried putIfAbsent to be under the RegionEntry lock. This allows the operation to propagate throughout the cluster and allows the client to receive a valid version tag, if concurrency checks are enabled.
1 parent 1631f86 commit 2b2fda6

File tree

8 files changed

+172
-210
lines changed

8 files changed

+172
-210
lines changed
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
3+
* agreements. See the NOTICE file distributed with this work for additional information regarding
4+
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
5+
* "License"); you may not use this file except in compliance with the License. You may obtain a
6+
* copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software distributed under the License
11+
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12+
* or implied. See the License for the specific language governing permissions and limitations under
13+
* the License.
14+
*/
15+
package org.apache.geode.cache;
16+
17+
import static org.apache.geode.internal.Assert.assertTrue;
18+
import static org.junit.Assert.assertEquals;
19+
import static org.junit.Assert.assertFalse;
20+
21+
import org.junit.After;
22+
import org.junit.Before;
23+
import org.junit.Rule;
24+
import org.junit.Test;
25+
import org.junit.rules.ExpectedException;
26+
27+
import org.apache.geode.cache.query.CacheUtils;
28+
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
29+
import org.apache.geode.internal.cache.EventID;
30+
import org.apache.geode.internal.cache.EventIDHolder;
31+
import org.apache.geode.internal.cache.LocalRegion;
32+
import org.apache.geode.internal.cache.VMCachedDeserializable;
33+
import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
34+
35+
public class RetryPutIfAbsentIntegrationTest {
36+
37+
Cache cache;
38+
39+
@Rule
40+
public ExpectedException expectedException = ExpectedException.none();
41+
42+
@Test
43+
public void duplicatePutIfAbsentIsAccepted() {
44+
final String key = "mykey";
45+
final String value = "myvalue";
46+
47+
LocalRegion myregion =
48+
(LocalRegion) CacheUtils.getCache().createRegionFactory(RegionShortcut.REPLICATE)
49+
.setConcurrencyChecksEnabled(true).create("myregion");
50+
51+
ClientProxyMembershipID id =
52+
new ClientProxyMembershipID(new InternalDistributedMember("localhost", 1));
53+
EventIDHolder clientEvent = new EventIDHolder(new EventID(new byte[] {1, 2, 3, 4, 5}, 1, 1));
54+
clientEvent.setRegion(myregion);
55+
byte[] valueBytes = new VMCachedDeserializable("myvalue", 7).getSerializedValue();
56+
System.out.println("first putIfAbsent");
57+
Object oldValue =
58+
myregion.basicBridgePutIfAbsent("mykey", valueBytes, true, null, id, true, clientEvent);
59+
assertEquals(null, oldValue);
60+
assertTrue(myregion.containsKey(key));
61+
62+
myregion.getEventTracker().clear();
63+
64+
clientEvent = new EventIDHolder(new EventID(new byte[] {1, 2, 3, 4, 5}, 1, 1));
65+
clientEvent.setRegion(myregion);
66+
clientEvent.setPossibleDuplicate(true);
67+
clientEvent.setOperation(Operation.PUT_IF_ABSENT);
68+
assertFalse(myregion.getEventTracker().hasSeenEvent(clientEvent));
69+
70+
System.out.println("second putIfAbsent");
71+
oldValue =
72+
myregion.basicBridgePutIfAbsent("mykey", valueBytes, true, null, id, true, clientEvent);
73+
assertEquals(null, oldValue);
74+
}
75+
76+
@Before
77+
public void setUp() throws Exception {
78+
CacheUtils.startCache();
79+
cache = CacheUtils.getCache();
80+
}
81+
82+
@After
83+
public void tearDown() throws Exception {
84+
CacheUtils.closeCache();
85+
}
86+
87+
}

geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java

Lines changed: 3 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -3130,44 +3130,8 @@ void serverPut(EntryEventImpl event, boolean requireOldValue, Object expectedOld
31303130
@VisibleForTesting
31313131
void checkPutIfAbsentResult(EntryEventImpl event, Object value, Object result) {
31323132
if (result != null) {
3133-
// we may see a non null result possibly due to retry
3134-
if (event.hasRetried() && putIfAbsentResultHasSameValue(true, value, result)) {
3135-
if (logger.isDebugEnabled()) {
3136-
logger.debug("retried putIfAbsent and result is the value to be put,"
3137-
+ " treat as a successful putIfAbsent");
3138-
}
3139-
} else {
3140-
// customers don't see this exception
3141-
throw new EntryNotFoundException("entry existed for putIfAbsent");
3142-
}
3143-
}
3144-
}
3145-
3146-
@VisibleForTesting
3147-
boolean putIfAbsentResultHasSameValue(boolean isClient, Object valueToBePut, Object result) {
3148-
if (Token.isInvalid(result) || result == null) {
3149-
return valueToBePut == null;
3133+
throw new EntryNotFoundException("entry existed for putIfAbsent");
31503134
}
3151-
3152-
boolean isCompressedOffHeap =
3153-
isClient ? false : getAttributes().getOffHeap() && getAttributes().getCompressor() != null;
3154-
return ValueComparisonHelper.checkEquals(valueToBePut, result, isCompressedOffHeap, getCache());
3155-
}
3156-
3157-
@VisibleForTesting
3158-
boolean bridgePutIfAbsentResultHasSameValue(byte[] valueToBePut, boolean isValueToBePutObject,
3159-
Object result) {
3160-
if (Token.isInvalid(result) || result == null) {
3161-
return valueToBePut == null;
3162-
}
3163-
3164-
boolean isCompressedOffHeap =
3165-
getAttributes().getOffHeap() && getAttributes().getCompressor() != null;
3166-
if (isValueToBePutObject) {
3167-
return ValueComparisonHelper.checkEquals(EntryEventImpl.deserialize(valueToBePut), result,
3168-
isCompressedOffHeap, getCache());
3169-
}
3170-
return ValueComparisonHelper.checkEquals(valueToBePut, result, isCompressedOffHeap, getCache());
31713135
}
31723136

31733137
/**
@@ -10626,15 +10590,7 @@ public Object putIfAbsent(Object key, Object value, Object callbackArgument) {
1062610590
final boolean ifOld = false;
1062710591
final boolean requireOldValue = true;
1062810592
if (!basicPut(event, ifNew, ifOld, oldValue, requireOldValue)) {
10629-
Object result = event.getOldValue();
10630-
if (event.isPossibleDuplicate() && putIfAbsentResultHasSameValue(false, value, result)) {
10631-
if (logger.isDebugEnabled()) {
10632-
logger.debug("possible duplicate putIfAbsent event and result is the value to be put,"
10633-
+ " treat this as a successful putIfAbsent");
10634-
}
10635-
return null;
10636-
}
10637-
return result;
10593+
return event.getOldValue();
1063810594
} else {
1063910595
if (!getDataView().isDeferredStats()) {
1064010596
getCachePerfStats().endPut(startPut, false);
@@ -10835,6 +10791,7 @@ public Object basicBridgePutIfAbsent(final Object key, Object value, boolean isO
1083510791

1083610792
// if this is a replayed operation we may already have a version tag
1083710793
event.setVersionTag(clientEvent.getVersionTag());
10794+
event.setPossibleDuplicate(clientEvent.isPossibleDuplicate());
1083810795

1083910796
// Set the new value to the input byte[] if it isn't null
1084010797
if (value != null) {
@@ -10873,19 +10830,6 @@ public Object basicBridgePutIfAbsent(final Object key, Object value, boolean isO
1087310830
clientEvent.setVersionTag(event.getVersionTag());
1087410831
clientEvent.isConcurrencyConflict(event.isConcurrencyConflict());
1087510832
} else {
10876-
if (value != null) {
10877-
assert (value instanceof byte[]);
10878-
}
10879-
if (event.isPossibleDuplicate()
10880-
&& bridgePutIfAbsentResultHasSameValue((byte[]) value, isObject, oldValue)) {
10881-
// result is possibly due to the retry
10882-
if (logger.isDebugEnabled()) {
10883-
logger.debug("retried putIfAbsent and got oldValue as the value to be put,"
10884-
+ " treat this as a successful putIfAbsent");
10885-
}
10886-
return null;
10887-
}
10888-
1088910833
if (oldValue == null) {
1089010834
// fix for 42189, putIfAbsent on server can return null if the
1089110835
// operation was not performed (oldValue in cache was null).

geode-core/src/main/java/org/apache/geode/internal/cache/event/DistributedEventTracker.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,13 @@ public void stop() {
127127
}
128128
}
129129

130+
@Override
131+
public void clear() {
132+
recordedEvents.clear();
133+
recordedBulkOps.clear();
134+
recordedBulkOpVersionTags.clear();
135+
}
136+
130137
@Override
131138
public Map<ThreadIdentifier, EventSequenceNumberHolder> getState() {
132139
Map<ThreadIdentifier, EventSequenceNumberHolder> result = new HashMap<>(recordedEvents.size());

geode-core/src/main/java/org/apache/geode/internal/cache/event/EventTracker.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,4 +133,9 @@ void recordState(InternalDistributedMember provider,
133133
*/
134134
boolean isInitialImageProvider(DistributedMember mbr);
135135

136+
/**
137+
* clear the tracker
138+
*/
139+
void clear();
140+
136141
}

geode-core/src/main/java/org/apache/geode/internal/cache/event/NonDistributedEventTracker.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,11 @@ public void stop() {
4949

5050
}
5151

52+
@Override
53+
public void clear() {
54+
// nothing to clear
55+
}
56+
5257
@Override
5358
public Map<ThreadIdentifier, EventSequenceNumberHolder> getState() {
5459
return null;

geode-core/src/main/java/org/apache/geode/internal/cache/map/RegionMapPut.java

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
import java.util.Set;
2020

21+
import org.apache.logging.log4j.Logger;
22+
2123
import org.apache.geode.cache.CacheWriter;
2224
import org.apache.geode.cache.DiskAccessException;
2325
import org.apache.geode.cache.Operation;
@@ -27,22 +29,27 @@
2729
import org.apache.geode.internal.cache.RegionClearedException;
2830
import org.apache.geode.internal.cache.RegionEntry;
2931
import org.apache.geode.internal.cache.Token;
32+
import org.apache.geode.internal.cache.ValueComparisonHelper;
3033
import org.apache.geode.internal.cache.entries.AbstractRegionEntry;
3134
import org.apache.geode.internal.cache.versions.ConcurrentCacheModificationException;
3235
import org.apache.geode.internal.cache.versions.VersionTag;
3336
import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
37+
import org.apache.geode.internal.logging.LogService;
3438
import org.apache.geode.internal.offheap.OffHeapHelper;
3539
import org.apache.geode.internal.offheap.ReferenceCountHelper;
3640
import org.apache.geode.internal.offheap.annotations.Released;
3741
import org.apache.geode.internal.offheap.annotations.Unretained;
3842
import org.apache.geode.internal.sequencelog.EntryLogger;
3943

4044
public class RegionMapPut extends AbstractRegionMapPut {
45+
protected static final Logger logger = LogService.getLogger();
46+
4147
private final CacheModificationLock cacheModificationLock;
4248
private final EntryEventSerialization entryEventSerialization;
4349
private final boolean ifNew;
4450
private final boolean ifOld;
4551
private final boolean overwriteDestroyed;
52+
private boolean overwritePutIfAbsent;
4653
private final boolean requireOldValue;
4754
private final boolean retrieveOldValueForDelta;
4855
private final boolean replaceOnClient;
@@ -104,6 +111,10 @@ boolean isReplaceOnClient() {
104111
return replaceOnClient;
105112
}
106113

114+
boolean isOverwritePutIfAbsent() {
115+
return overwritePutIfAbsent;
116+
}
117+
107118
boolean isCacheWrite() {
108119
return cacheWrite;
109120
}
@@ -214,6 +225,9 @@ private void setOldValueEvenIfFaultedOut() {
214225
protected void unsetOldValueForDelta() {
215226
OffHeapHelper.release(getOldValueForDelta());
216227
setOldValueForDelta(null);
228+
if (isOverwritePutIfAbsent()) {
229+
getEvent().setOldValue(null);
230+
}
217231
}
218232

219233
@Override
@@ -396,12 +410,41 @@ private boolean checkUninitializedRegionPreconditions() {
396410
private boolean checkCreatePreconditions() {
397411
if (isIfNew()) {
398412
if (!getRegionEntry().isDestroyedOrRemoved()) {
413+
// retain the version stamp of the existing entry for use in processing failures
414+
EntryEventImpl event = getEvent();
415+
if (getOwner().getConcurrencyChecksEnabled() &&
416+
event.getOperation() == Operation.PUT_IF_ABSENT &&
417+
!event.hasValidVersionTag() &&
418+
event.isPossibleDuplicate()) {
419+
Object retainedValue = getRegionEntry().getValueRetain(getOwner());
420+
try {
421+
if (ValueComparisonHelper.checkEquals(retainedValue,
422+
getEvent().getRawNewValue(),
423+
isCompressedOffHeap(event), getOwner().getCache())) {
424+
if (logger.isDebugEnabled()) {
425+
logger.debug("retried putIfAbsent found same value already in cache "
426+
+ "- allowing the operation. entry={}; event={}", getRegionEntry(),
427+
getEvent());
428+
}
429+
this.overwritePutIfAbsent = true;
430+
return true;
431+
}
432+
} finally {
433+
OffHeapHelper.release(retainedValue);
434+
}
435+
}
399436
return false;
400437
}
401438
}
402439
return true;
403440
}
404441

442+
443+
private boolean isCompressedOffHeap(EntryEventImpl event) {
444+
return event.getRegion().getAttributes().getOffHeap()
445+
&& event.getRegion().getAttributes().getCompressor() != null;
446+
}
447+
405448
private boolean checkExpectedOldValuePrecondition() {
406449
// replace is propagated to server, so no need to check
407450
// satisfiesOldValue on client
@@ -411,7 +454,7 @@ private boolean checkExpectedOldValuePrecondition() {
411454
// We already called setOldValueInEvent so the event will have the old value.
412455
@Unretained
413456
Object v = event.getRawOldValue();
414-
// Note that v will be null instead of INVALID because setOldValue
457+
// Note that v will be null instead of INVALID because setOldValue`
415458
// converts INVALID to null.
416459
// But checkExpectedOldValue handle this and says INVALID equals null.
417460
if (!AbstractRegionEntry.checkExpectedOldValue(getExpectedOldValue(), v, event.getRegion())) {

0 commit comments

Comments
 (0)