Skip to content
This repository was archived by the owner on Oct 22, 2025. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import org.springframework.aop.framework.ProxyFactory;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.data.gemfire.function.annotation.OnMember;
import org.springframework.data.gemfire.function.annotation.OnServer;
import org.springframework.data.gemfire.support.AbstractFactoryBeanSupport;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
Expand Down Expand Up @@ -113,7 +115,14 @@ protected Object invokeFunction(Method method, Object[] args) {
.getMethodMetadata(method)
.getFunctionId();

return getGemfireFunctionOperations().execute(functionId, args);
return methodExecutedOnSingleMember(method)
? getGemfireFunctionOperations().executeAndExtract(functionId, args)
: getGemfireFunctionOperations().execute(functionId, args);
}

protected boolean methodExecutedOnSingleMember(Method method) {
return method.getDeclaringClass().isAnnotationPresent(OnServer.class)
|| method.getDeclaringClass().isAnnotationPresent(OnMember.class);
}

protected Object resolveResult(MethodInvocation invocation, Object result) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,49 +16,40 @@
*/
package org.springframework.data.gemfire.function.execution.onservers;

import static org.assertj.core.api.Assertions.assertThat;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;

import org.apache.geode.StatisticDescriptor;
import org.apache.geode.Statistics;
import org.apache.geode.StatisticsType;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.execute.Function;
import org.apache.geode.cache.execute.FunctionContext;
import org.apache.geode.cache.execute.FunctionService;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.internal.statistics.StatisticsManager;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.data.gemfire.config.annotation.CacheServerApplication;
import org.springframework.data.gemfire.config.annotation.ClientCacheApplication;
import org.springframework.data.gemfire.function.annotation.GemfireFunction;
import org.springframework.data.gemfire.function.config.EnableGemfireFunctionExecutions;
import org.springframework.data.gemfire.function.config.EnableGemfireFunctions;
import org.springframework.data.gemfire.function.sample.AllServersAdminFunctions;
import org.springframework.data.gemfire.function.sample.Metric;
import org.springframework.data.gemfire.function.sample.SingleServerAdminFunctions;
import org.springframework.data.gemfire.process.ProcessWrapper;
import org.springframework.data.gemfire.test.support.ClientServerIntegrationTestsSupport;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.transaction.annotation.EnableTransactionManagement;

import java.util.ArrayList;
import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;

/**
* @author Patrick Johnson
*/
@SuppressWarnings("unused")
@RunWith(SpringRunner.class)
@ContextConfiguration(classes = FunctionsReturnResultsFromAllServersIntegrationTests.GeodeClientConfiguration.class)
@Ignore
@ContextConfiguration(classes = FunctionsReturnResultsFromAllServersIntegrationTests.TestConfiguration.class)
public class FunctionsReturnResultsFromAllServersIntegrationTests extends ClientServerIntegrationTestsSupport {

private static final int PORT_1 = 40407;
private static final int PORT_2 = 40403;
private static final int NUMBER_OF_METRICS = 10;

private static ProcessWrapper gemfireServer1;
private static ProcessWrapper gemfireServer2;
Expand All @@ -72,15 +63,21 @@ public class FunctionsReturnResultsFromAllServersIntegrationTests extends Client
@BeforeClass
public static void startGemFireServer() throws Exception {

gemfireServer1 = run(MetricsFunctionServerProcess.class,
String.format("-D%s=%d", GEMFIRE_CACHE_SERVER_PORT_PROPERTY, PORT_1));
final int port1 = findAvailablePort();

waitForServerToStart(DEFAULT_HOSTNAME, PORT_1);
gemfireServer1 = run(MetricsFunctionServerConfiguration.class,
String.format("-D%s=%d", GEMFIRE_CACHE_SERVER_PORT_PROPERTY, port1));

gemfireServer2 = run(MetricsFunctionServerProcess.class,
String.format("-D%s=%d", GEMFIRE_CACHE_SERVER_PORT_PROPERTY, PORT_2));
waitForServerToStart(DEFAULT_HOSTNAME, port1);

waitForServerToStart(DEFAULT_HOSTNAME, PORT_2);
final int port2 = findAvailablePort();

gemfireServer2 = run(MetricsFunctionServerConfiguration.class,
String.format("-D%s=%d", GEMFIRE_CACHE_SERVER_PORT_PROPERTY, port2));

waitForServerToStart(DEFAULT_HOSTNAME, port2);

System.setProperty(GEMFIRE_POOL_SERVERS_PROPERTY, String.format("%s[%d],%s[%d]", DEFAULT_HOSTNAME, port1, DEFAULT_HOSTNAME, port2));
}

@AfterClass
Expand All @@ -93,81 +90,42 @@ public static void stopGemFireServer() {
public void executeFunctionOnAllServers() {
List<List<Metric>> metrics = allServersAdminFunctions.getAllMetrics();
assertThat(metrics.size()).isEqualTo(2);
assertThat(metrics.get(0).size()).isEqualTo(NUMBER_OF_METRICS);
assertThat(metrics.get(1).size()).isEqualTo(NUMBER_OF_METRICS);
}

@Test
public void executeFunctionOnSingleServer() {
List<Metric> metrics = singleServerAdminFunctions.getAllMetrics();
assertThat(metrics.size()).isEqualTo(672);
assertThat(metrics.size()).isEqualTo(NUMBER_OF_METRICS);
}

@ClientCacheApplication(servers = {
@ClientCacheApplication.Server(port = PORT_1),
@ClientCacheApplication.Server(port = PORT_2)}
)
@ClientCacheApplication
@EnableGemfireFunctionExecutions(basePackageClasses = AllServersAdminFunctions.class)
static class GeodeClientConfiguration { }

static class MetricsFunctionServerProcess {

private static final int DEFAULT_CACHE_SERVER_PORT = 40404;

private static final String CACHE_SERVER_PORT_PROPERTY = "spring.data.gemfire.cache.server.port";
private static final String GEMFIRE_NAME = "MetricsServer" + getCacheServerPort();

public static void main(String[] args) throws Exception {
registerFunctions(startCacheServer(newGemFireCache()));
}

private static Cache newGemFireCache() {

return new CacheFactory()
.set("name", GEMFIRE_NAME)
.create();
}

private static Cache startCacheServer(Cache gemfireCache) throws IOException {
CacheServer cacheServer = gemfireCache.addCacheServer();
cacheServer.setPort(getCacheServerPort());
cacheServer.start();
return gemfireCache;
}
@EnableTransactionManagement
static class TestConfiguration {
}

private static int getCacheServerPort() {
return Integer.getInteger(CACHE_SERVER_PORT_PROPERTY, DEFAULT_CACHE_SERVER_PORT);
}
@CacheServerApplication
@EnableGemfireFunctions
public static class MetricsFunctionServerConfiguration {

private static Cache registerFunctions(Cache gemfireCache) {
public static void main(String[] args) {

FunctionService.registerFunction(new GetAllMetricsFunction());
AnnotationConfigApplicationContext applicationContext =
new AnnotationConfigApplicationContext(MetricsFunctionServerConfiguration.class);

return gemfireCache;
applicationContext.registerShutdownHook();
}
}

static class GetAllMetricsFunction implements Function<List<Metric>> {

private final InternalDistributedSystem system =
(InternalDistributedSystem) CacheFactory.getAnyInstance().getDistributedSystem();

@Override
public void execute(FunctionContext context) {
@GemfireFunction(id = "GetAllMetricsFunction", hasResult = true)
public List<Metric> getMetrics() {
List<Metric> allMetrics = new ArrayList<>();
StatisticsManager statisticsManager = system.getStatisticsManager();
for (Statistics statistics : statisticsManager.getStatsList()) {
StatisticsType statisticsType = statistics.getType();
for (StatisticDescriptor descriptor : statisticsType.getStatistics()) {
String statName = descriptor.getName();
Metric metric = new Metric(statName, statistics.get(statName), statisticsType.getName(), statistics.getTextId());
allMetrics.add(metric);
}
for (int i = 0; i < NUMBER_OF_METRICS; i++) {
Metric metric = new Metric("statName" + i, i, "statCat" + i, "statType" + i);
allMetrics.add(metric);
}
context.getResultSender().lastResult(allMetrics);
}

@Override
public String getId() {
return getClass().getSimpleName();
return allMetrics;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*
*/
package org.springframework.data.gemfire.function.execution.onservers;
package org.springframework.data.gemfire.function.sample;

import org.springframework.data.gemfire.function.annotation.FunctionId;
import org.springframework.data.gemfire.function.annotation.OnServers;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*
*/
package org.springframework.data.gemfire.function.execution.onservers;
package org.springframework.data.gemfire.function.sample;

import java.io.Serializable;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*
*/
package org.springframework.data.gemfire.function.execution.onservers;
package org.springframework.data.gemfire.function.sample;

import org.springframework.data.gemfire.function.annotation.FunctionId;
import org.springframework.data.gemfire.function.annotation.OnServer;
Expand Down