Skip to content

Commit e0c29b1

Browse files
committed
GEODE-6195 putIfAbsent may get a returned value caused by the same operation due to retry
Moving the check for a retried putIfAbsent to be under the RegionEntry lock. This allows the operation to propagate throughout the cluster and allows the client to receive a valid version tag, if concurrency checks are enabled. (cherry-picked from 2b2fda6)
1 parent 6b05cae commit e0c29b1

File tree

8 files changed

+173
-209
lines changed

8 files changed

+173
-209
lines changed
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
3+
* agreements. See the NOTICE file distributed with this work for additional information regarding
4+
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
5+
* "License"); you may not use this file except in compliance with the License. You may obtain a
6+
* copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software distributed under the License
11+
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12+
* or implied. See the License for the specific language governing permissions and limitations under
13+
* the License.
14+
*/
15+
package org.apache.geode.cache;
16+
17+
import static org.apache.geode.internal.Assert.assertTrue;
18+
import static org.junit.Assert.assertEquals;
19+
import static org.junit.Assert.assertFalse;
20+
21+
import org.junit.After;
22+
import org.junit.Before;
23+
import org.junit.Rule;
24+
import org.junit.Test;
25+
import org.junit.rules.ExpectedException;
26+
27+
import org.apache.geode.cache.query.CacheUtils;
28+
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
29+
import org.apache.geode.internal.cache.EventID;
30+
import org.apache.geode.internal.cache.EventIDHolder;
31+
import org.apache.geode.internal.cache.LocalRegion;
32+
import org.apache.geode.internal.cache.VMCachedDeserializable;
33+
import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
34+
35+
public class RetryPutIfAbsentIntegrationTest {
36+
37+
Cache cache;
38+
39+
@Rule
40+
public ExpectedException expectedException = ExpectedException.none();
41+
42+
@Test
43+
public void duplicatePutIfAbsentIsAccepted() {
44+
final String key = "mykey";
45+
final String value = "myvalue";
46+
47+
LocalRegion myregion =
48+
(LocalRegion) CacheUtils.getCache().createRegionFactory(RegionShortcut.REPLICATE)
49+
.setConcurrencyChecksEnabled(true).create("myregion");
50+
51+
ClientProxyMembershipID id =
52+
new ClientProxyMembershipID(new InternalDistributedMember("localhost", 1));
53+
EventIDHolder clientEvent = new EventIDHolder(new EventID(new byte[] {1, 2, 3, 4, 5}, 1, 1));
54+
clientEvent.setRegion(myregion);
55+
byte[] valueBytes = new VMCachedDeserializable("myvalue", 7).getSerializedValue();
56+
System.out.println("first putIfAbsent");
57+
Object oldValue =
58+
myregion.basicBridgePutIfAbsent("mykey", valueBytes, true, null, id, true, clientEvent);
59+
assertEquals(null, oldValue);
60+
assertTrue(myregion.containsKey(key));
61+
62+
myregion.getEventTracker().clear();
63+
64+
clientEvent = new EventIDHolder(new EventID(new byte[] {1, 2, 3, 4, 5}, 1, 1));
65+
clientEvent.setRegion(myregion);
66+
clientEvent.setPossibleDuplicate(true);
67+
clientEvent.setOperation(Operation.PUT_IF_ABSENT);
68+
assertFalse(myregion.getEventTracker().hasSeenEvent(clientEvent));
69+
70+
System.out.println("second putIfAbsent");
71+
oldValue =
72+
myregion.basicBridgePutIfAbsent("mykey", valueBytes, true, null, id, true, clientEvent);
73+
assertEquals(null, oldValue);
74+
}
75+
76+
@Before
77+
public void setUp() throws Exception {
78+
CacheUtils.startCache();
79+
cache = CacheUtils.getCache();
80+
}
81+
82+
@After
83+
public void tearDown() throws Exception {
84+
CacheUtils.closeCache();
85+
}
86+
87+
}

geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java

Lines changed: 4 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -3088,42 +3088,8 @@ void serverPut(EntryEventImpl event, boolean requireOldValue, Object expectedOld
30883088

30893089
void checkPutIfAbsentResult(EntryEventImpl event, Object value, Object result) {
30903090
if (result != null) {
3091-
// we may see a non null result possibly due to retry
3092-
if (event.hasRetried() && putIfAbsentResultHasSameValue(true, value, result)) {
3093-
if (logger.isDebugEnabled()) {
3094-
logger.debug("retried putIfAbsent and result is the value to be put,"
3095-
+ " treat as a successful putIfAbsent");
3096-
}
3097-
} else {
3098-
// customers don't see this exception
3099-
throw new EntryNotFoundException("entry existed for putIfAbsent");
3100-
}
3101-
}
3102-
}
3103-
3104-
boolean putIfAbsentResultHasSameValue(boolean isClient, Object valueToBePut, Object result) {
3105-
if (Token.isInvalid(result) || result == null) {
3106-
return valueToBePut == null;
3107-
}
3108-
3109-
boolean isCompressedOffHeap =
3110-
isClient ? false : getAttributes().getOffHeap() && getAttributes().getCompressor() != null;
3111-
return ValueComparisonHelper.checkEquals(valueToBePut, result, isCompressedOffHeap, getCache());
3112-
}
3113-
3114-
boolean bridgePutIfAbsentResultHasSameValue(byte[] valueToBePut, boolean isValueToBePutObject,
3115-
Object result) {
3116-
if (Token.isInvalid(result) || result == null) {
3117-
return valueToBePut == null;
3118-
}
3119-
3120-
boolean isCompressedOffHeap =
3121-
getAttributes().getOffHeap() && getAttributes().getCompressor() != null;
3122-
if (isValueToBePutObject) {
3123-
return ValueComparisonHelper.checkEquals(EntryEventImpl.deserialize(valueToBePut), result,
3124-
isCompressedOffHeap, getCache());
3091+
throw new EntryNotFoundException("entry existed for putIfAbsent");
31253092
}
3126-
return ValueComparisonHelper.checkEquals(valueToBePut, result, isCompressedOffHeap, getCache());
31273093
}
31283094

31293095
/**
@@ -10923,22 +10889,14 @@ public Object putIfAbsent(Object key, Object value, Object callbackArgument) {
1092310889

1092410890
try {
1092510891
if (generateEventID()) {
10926-
event.setNewEventId(this.cache.getDistributedSystem());
10892+
event.setNewEventId(cache.getDistributedSystem());
1092710893
}
1092810894
final Object oldValue = null;
1092910895
final boolean ifNew = true;
1093010896
final boolean ifOld = false;
1093110897
final boolean requireOldValue = true;
1093210898
if (!basicPut(event, ifNew, ifOld, oldValue, requireOldValue)) {
10933-
Object result = event.getOldValue();
10934-
if (event.isPossibleDuplicate() && putIfAbsentResultHasSameValue(false, value, result)) {
10935-
if (logger.isDebugEnabled()) {
10936-
logger.debug("possible duplicate putIfAbsent event and result is the value to be put,"
10937-
+ " treat this as a successful putIfAbsent");
10938-
}
10939-
return null;
10940-
}
10941-
return result;
10899+
return event.getOldValue();
1094210900
} else {
1094310901
if (!getDataView().isDeferredStats()) {
1094410902
getCachePerfStats().endPut(startPut, false);
@@ -11138,6 +11096,7 @@ public Object basicBridgePutIfAbsent(final Object key, Object value, boolean isO
1113811096

1113911097
// if this is a replayed operation we may already have a version tag
1114011098
event.setVersionTag(clientEvent.getVersionTag());
11099+
event.setPossibleDuplicate(clientEvent.isPossibleDuplicate());
1114111100

1114211101
// Set the new value to the input byte[] if it isn't null
1114311102
if (value != null) {
@@ -11176,19 +11135,6 @@ public Object basicBridgePutIfAbsent(final Object key, Object value, boolean isO
1117611135
clientEvent.setVersionTag(event.getVersionTag());
1117711136
clientEvent.isConcurrencyConflict(event.isConcurrencyConflict());
1117811137
} else {
11179-
if (value != null) {
11180-
assert (value instanceof byte[]);
11181-
}
11182-
if (event.isPossibleDuplicate()
11183-
&& bridgePutIfAbsentResultHasSameValue((byte[]) value, isObject, oldValue)) {
11184-
// result is possibly due to the retry
11185-
if (logger.isDebugEnabled()) {
11186-
logger.debug("retried putIfAbsent and got oldValue as the value to be put,"
11187-
+ " treat this as a successful putIfAbsent");
11188-
}
11189-
return null;
11190-
}
11191-
1119211138
if (oldValue == null) {
1119311139
// fix for 42189, putIfAbsent on server can return null if the
1119411140
// operation was not performed (oldValue in cache was null).

geode-core/src/main/java/org/apache/geode/internal/cache/event/DistributedEventTracker.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,13 @@ public void stop() {
127127
}
128128
}
129129

130+
@Override
131+
public void clear() {
132+
recordedEvents.clear();
133+
recordedBulkOps.clear();
134+
recordedBulkOpVersionTags.clear();
135+
}
136+
130137
@Override
131138
public Map<ThreadIdentifier, EventSequenceNumberHolder> getState() {
132139
Map<ThreadIdentifier, EventSequenceNumberHolder> result = new HashMap<>(recordedEvents.size());

geode-core/src/main/java/org/apache/geode/internal/cache/event/EventTracker.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,4 +133,9 @@ void recordState(InternalDistributedMember provider,
133133
*/
134134
boolean isInitialImageProvider(DistributedMember mbr);
135135

136+
/**
137+
* clear the tracker
138+
*/
139+
void clear();
140+
136141
}

geode-core/src/main/java/org/apache/geode/internal/cache/event/NonDistributedEventTracker.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,11 @@ public void stop() {
4949

5050
}
5151

52+
@Override
53+
public void clear() {
54+
// nothing to clear
55+
}
56+
5257
@Override
5358
public Map<ThreadIdentifier, EventSequenceNumberHolder> getState() {
5459
return null;

geode-core/src/main/java/org/apache/geode/internal/cache/map/RegionMapPut.java

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
import java.util.Set;
2020

21+
import org.apache.logging.log4j.Logger;
22+
2123
import org.apache.geode.cache.CacheWriter;
2224
import org.apache.geode.cache.DiskAccessException;
2325
import org.apache.geode.cache.Operation;
@@ -27,22 +29,27 @@
2729
import org.apache.geode.internal.cache.RegionClearedException;
2830
import org.apache.geode.internal.cache.RegionEntry;
2931
import org.apache.geode.internal.cache.Token;
32+
import org.apache.geode.internal.cache.ValueComparisonHelper;
3033
import org.apache.geode.internal.cache.entries.AbstractRegionEntry;
3134
import org.apache.geode.internal.cache.versions.ConcurrentCacheModificationException;
3235
import org.apache.geode.internal.cache.versions.VersionTag;
3336
import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
37+
import org.apache.geode.internal.logging.LogService;
3438
import org.apache.geode.internal.offheap.OffHeapHelper;
3539
import org.apache.geode.internal.offheap.ReferenceCountHelper;
3640
import org.apache.geode.internal.offheap.annotations.Released;
3741
import org.apache.geode.internal.offheap.annotations.Unretained;
3842
import org.apache.geode.internal.sequencelog.EntryLogger;
3943

4044
public class RegionMapPut extends AbstractRegionMapPut {
45+
protected static final Logger logger = LogService.getLogger();
46+
4147
private final CacheModificationLock cacheModificationLock;
4248
private final EntryEventSerialization entryEventSerialization;
4349
private final boolean ifNew;
4450
private final boolean ifOld;
4551
private final boolean overwriteDestroyed;
52+
private boolean overwritePutIfAbsent;
4653
private final boolean requireOldValue;
4754
private final boolean retrieveOldValueForDelta;
4855
private final boolean replaceOnClient;
@@ -104,6 +111,10 @@ boolean isReplaceOnClient() {
104111
return replaceOnClient;
105112
}
106113

114+
boolean isOverwritePutIfAbsent() {
115+
return overwritePutIfAbsent;
116+
}
117+
107118
boolean isCacheWrite() {
108119
return cacheWrite;
109120
}
@@ -214,6 +225,9 @@ private void setOldValueEvenIfFaultedOut() {
214225
protected void unsetOldValueForDelta() {
215226
OffHeapHelper.release(getOldValueForDelta());
216227
setOldValueForDelta(null);
228+
if (isOverwritePutIfAbsent()) {
229+
getEvent().setOldValue(null);
230+
}
217231
}
218232

219233
@Override
@@ -396,12 +410,41 @@ private boolean checkUninitializedRegionPreconditions() {
396410
private boolean checkCreatePreconditions() {
397411
if (isIfNew()) {
398412
if (!getRegionEntry().isDestroyedOrRemoved()) {
413+
// retain the version stamp of the existing entry for use in processing failures
414+
EntryEventImpl event = getEvent();
415+
if (getOwner().getConcurrencyChecksEnabled() &&
416+
event.getOperation() == Operation.PUT_IF_ABSENT &&
417+
!event.hasValidVersionTag() &&
418+
event.isPossibleDuplicate()) {
419+
Object retainedValue = getRegionEntry().getValueRetain(getOwner());
420+
try {
421+
if (ValueComparisonHelper.checkEquals(retainedValue,
422+
getEvent().getRawNewValue(),
423+
isCompressedOffHeap(event), getOwner().getCache())) {
424+
if (logger.isDebugEnabled()) {
425+
logger.debug("retried putIfAbsent found same value already in cache "
426+
+ "- allowing the operation. entry={}; event={}", getRegionEntry(),
427+
getEvent());
428+
}
429+
this.overwritePutIfAbsent = true;
430+
return true;
431+
}
432+
} finally {
433+
OffHeapHelper.release(retainedValue);
434+
}
435+
}
399436
return false;
400437
}
401438
}
402439
return true;
403440
}
404441

442+
443+
private boolean isCompressedOffHeap(EntryEventImpl event) {
444+
return event.getRegion().getAttributes().getOffHeap()
445+
&& event.getRegion().getAttributes().getCompressor() != null;
446+
}
447+
405448
private boolean checkExpectedOldValuePrecondition() {
406449
// replace is propagated to server, so no need to check
407450
// satisfiesOldValue on client
@@ -411,7 +454,7 @@ private boolean checkExpectedOldValuePrecondition() {
411454
// We already called setOldValueInEvent so the event will have the old value.
412455
@Unretained
413456
Object v = event.getRawOldValue();
414-
// Note that v will be null instead of INVALID because setOldValue
457+
// Note that v will be null instead of INVALID because setOldValue`
415458
// converts INVALID to null.
416459
// But checkExpectedOldValue handle this and says INVALID equals null.
417460
if (!AbstractRegionEntry.checkExpectedOldValue(getExpectedOldValue(), v, event.getRegion())) {

0 commit comments

Comments
 (0)