Skip to content

Commit

Permalink
GEODE-9204: thread hung waiting for response (#6426)
Browse files Browse the repository at this point in the history
Co-authored-by: Bruce Schuchardt <[email protected]>
  • Loading branch information
kamilla1201 and bschuchardt authored Sep 20, 2021
1 parent b07f320 commit 19f55ad
Show file tree
Hide file tree
Showing 6 changed files with 172 additions and 19 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.distributed.internal;

import java.io.NotSerializableException;
import java.io.Serializable;
import java.util.Properties;

import org.assertj.core.api.Assertions;
import org.junit.Rule;
import org.junit.Test;

import org.apache.geode.InternalGemFireException;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.distributed.ConfigurationProperties;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.rules.CacheRule;
import org.apache.geode.test.dunit.rules.DistributedRule;

public class BadCacheLoaderDUnitTest implements Serializable {
public static final String TEST_REGION = "testRegion";
public static final String TEST_KEY = "testKey";
public static final String TEST_VALUE = "testValue";

static class NotSerializableTestException extends RuntimeException {
Object unserializableField = new Object();
}

@Rule
public DistributedRule distributedRule = new DistributedRule(2);

@Rule
public CacheRule cacheRule = CacheRule.builder().build();

/**
* Ensure that a cache loader throwing an exception that is not serializable is handled
* correctly
*/
@Test
public void testNonSerializableObjectReturnedByCacheLoader() throws Exception {
final VM cacheLoaderVM = VM.getVM(0);
final VM fetchingVM = VM.getVM(1);

final Properties properties = new Properties();
properties.put(ConfigurationProperties.SERIALIZABLE_OBJECT_FILTER,
NotSerializableTestException.class.getName());

cacheLoaderVM.invoke("create a region with a bad cache loader",
() -> createRegionWithBadCacheLoader(properties));

Assertions.assertThatThrownBy(() -> fetchingVM.invoke("fetch something from the cache",
() -> fetchValueCausingCacheLoad(properties)))
.hasCauseInstanceOf(InternalGemFireException.class)
.hasRootCauseInstanceOf(NotSerializableException.class)
.hasRootCauseMessage("java.lang.Object");
}

private void fetchValueCausingCacheLoad(Properties properties) {
final Cache cache = cacheRule.getOrCreateCache(properties);
final Region<String, Object> testRegion =
cache.<String, Object>createRegionFactory(RegionShortcut.PARTITION)
.setCacheLoader(helper -> new Object())
.create(TEST_REGION);
testRegion.getAttributesMutator().setCacheLoader(helper -> "should not be invoked");
testRegion.get(TEST_KEY);
}

private void createRegionWithBadCacheLoader(Properties properties) {
final Cache cache = cacheRule.getOrCreateCache(properties);
final Region<String, Object> testRegion =
cache.<String, Object>createRegionFactory(RegionShortcut.PARTITION)
.setCacheLoader(helper -> {
throw new NotSerializableTestException();
})
.create(TEST_REGION);
testRegion.put(TEST_KEY, TEST_VALUE);
testRegion.destroy(TEST_KEY);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,8 @@ fromData,52
toData,48

org/apache/geode/distributed/internal/ReplyMessage,2
fromData,102
toData,133
fromData,129
toData,277

org/apache/geode/distributed/internal/SerialAckedMessage,2
fromData,28
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,16 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.NotSerializableException;

import org.apache.logging.log4j.Logger;

import org.apache.geode.CancelException;
import org.apache.geode.DataSerializer;
import org.apache.geode.InvalidDeltaException;
import org.apache.geode.cache.EntryNotFoundException;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.HeapDataOutputStream;
import org.apache.geode.internal.cache.versions.ConcurrentCacheModificationException;
import org.apache.geode.internal.serialization.DeserializationContext;
import org.apache.geode.internal.serialization.SerializationContext;
Expand Down Expand Up @@ -226,9 +227,9 @@ public ReplyException getException() {
if (this.returnValueIsException) {
ReplyException exception = (ReplyException) this.returnValue;
if (exception != null) {
InternalDistributedMember sendr = getSender();
if (sendr != null) {
exception.setSenderIfNull(sendr);
InternalDistributedMember sender = getSender();
if (sender != null) {
exception.setSenderIfNull(sender);
}
}
return exception;
Expand Down Expand Up @@ -273,6 +274,19 @@ public void toData(DataOutput out,
SerializationContext context) throws IOException {
super.toData(out, context);

final HeapDataOutputStream hdos =
new HeapDataOutputStream(context.getSerializationVersion());
boolean failedSerialization = false;
if (this.returnValueIsException || this.returnValue != null) {
try {
context.getSerializer().writeObject(this.returnValue, hdos);
} catch (NotSerializableException e) {
logger.warn("Unable to serialize a reply to " + getRecipientsDescription(), e);
failedSerialization = true;
this.returnValue = new ReplyException(e);
this.returnValueIsException = true;
}
}
byte status = 0;
if (this.ignored) {
status |= IGNORED_FLAG;
Expand All @@ -296,8 +310,13 @@ public void toData(DataOutput out,
out.writeInt(processorId);
}
if (this.returnValueIsException || this.returnValue != null) {
DataSerializer.writeObject(this.returnValue, out);
if (failedSerialization) {
context.getSerializer().writeObject(this.returnValue, out);
} else {
hdos.sendTo(out);
}
}
hdos.close();
}

@Override
Expand All @@ -311,10 +330,11 @@ public void fromData(DataInput in,
this.processorId = in.readInt();
}
if (testFlag(status, EXCEPTION_FLAG)) {
this.returnValue = DataSerializer.readObject(in);
this.returnValue = context.getDeserializer().readObject(in);
this.returnValueIsException = true;
} else if (testFlag(status, OBJECT_FLAG)) {
this.returnValue = DataSerializer.readObject(in);
this.returnValue = context.getDeserializer().readObject(in);
this.returnValueIsException = (returnValue instanceof ReplyException);
}
this.internal = testFlag(status, INTERNAL_FLAG);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package org.apache.geode.distributed.internal;

import static org.assertj.core.api.Assertions.assertThat;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.NotSerializableException;

import org.junit.Test;

import org.apache.geode.internal.HeapDataOutputStream;
import org.apache.geode.internal.InternalDataSerializer;

public class ReplyMessageTest {
@Test
public void replyMessageCanSerializeWithNonSerializableValue() throws Exception {
final ReplyMessage replyMessage = new ReplyMessage();
replyMessage.setReturnValue(new Object());
final HeapDataOutputStream heapDataOutputStream = new HeapDataOutputStream(1000);
InternalDataSerializer.getDSFIDSerializer().invokeToData(replyMessage, heapDataOutputStream);
final byte[] bytes = heapDataOutputStream.toByteArray();
final DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(bytes));
final ReplyMessage newReplyMessage = new ReplyMessage();
InternalDataSerializer.getDSFIDSerializer().invokeFromData(newReplyMessage, dataInputStream);
assertThat(newReplyMessage.getException()).hasCauseInstanceOf(
NotSerializableException.class);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
*/
package org.apache.geode.internal.cache.partitioned;

import static org.mockito.Mockito.mock;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
Expand All @@ -26,7 +24,6 @@
import org.apache.geode.internal.cache.EntryEventImpl.OldValueImporter;
import org.apache.geode.internal.cache.OldValueImporterTestBase;
import org.apache.geode.internal.cache.partitioned.PutMessage.PutReplyMessage;
import org.apache.geode.internal.serialization.DeserializationContext;

public class PutPutReplyMessageJUnitTest extends OldValueImporterTestBase {

Expand All @@ -48,7 +45,8 @@ protected void toData(OldValueImporter ovi, HeapDataOutputStream hdos) throws IO
@Override
protected void fromData(OldValueImporter ovi, byte[] bytes)
throws IOException, ClassNotFoundException {
((PutReplyMessage) ovi).fromData(new DataInputStream(new ByteArrayInputStream(bytes)), mock(
DeserializationContext.class));
final DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(bytes));
((PutReplyMessage) ovi).fromData(dataInputStream,
InternalDataSerializer.createDeserializationContext(dataInputStream));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
*/
package org.apache.geode.internal.cache.tx;

import static org.mockito.Mockito.mock;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
Expand All @@ -26,7 +24,6 @@
import org.apache.geode.internal.cache.EntryEventImpl.OldValueImporter;
import org.apache.geode.internal.cache.OldValueImporterTestBase;
import org.apache.geode.internal.cache.tx.RemotePutMessage.PutReplyMessage;
import org.apache.geode.internal.serialization.DeserializationContext;

public class RemotePutReplyMessageJUnitTest extends OldValueImporterTestBase {

Expand All @@ -48,7 +45,8 @@ protected void toData(OldValueImporter ovi, HeapDataOutputStream hdos) throws IO
@Override
protected void fromData(OldValueImporter ovi, byte[] bytes)
throws IOException, ClassNotFoundException {
((PutReplyMessage) ovi).fromData(new DataInputStream(new ByteArrayInputStream(bytes)), mock(
DeserializationContext.class));
final DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(bytes));
((PutReplyMessage) ovi).fromData(dataInputStream,
InternalDataSerializer.createDeserializationContext(dataInputStream));
}
}

0 comments on commit 19f55ad

Please sign in to comment.