Skip to content
This repository was archived by the owner on Oct 22, 2025. It is now read-only.

Commit 5c5fc5f

Browse files
author
Patrick Johnson
committed
DATAGEODE-295 - Functions return results from all servers.
1 parent 638eb97 commit 5c5fc5f

File tree

6 files changed

+291
-7
lines changed

6 files changed

+291
-7
lines changed

spring-data-geode/src/main/java/org/springframework/data/gemfire/function/execution/AbstractFunctionExecution.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
*
3535
* @author David Turanski
3636
* @author John Blum
37+
* @author Patrick Johnson
3738
*/
3839
@SuppressWarnings("unused")
3940
abstract class AbstractFunctionExecution {
@@ -137,7 +138,11 @@ <T> Iterable<T> execute(Boolean returnResult) {
137138
}
138139
}
139140
else {
140-
results = (Iterable<T>) resultCollector.getResult();
141+
if(resultCollector.getResult() instanceof Iterable) {
142+
results = (Iterable<T>) resultCollector.getResult();
143+
} else {
144+
results = (Iterable<T>) Collections.singleton(resultCollector.getResult());
145+
}
141146
}
142147

143148
return replaceSingletonNullCollectionWithEmptyList(results);

spring-data-geode/src/main/java/org/springframework/data/gemfire/function/execution/GemfireFunctionProxyFactoryBean.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,5 @@
11
/*
2-
<<<<<<< Updated upstream
32
* Copyright 2002-2020 the original author or authors.
4-
=======
5-
* Copyright 2002-2020 the original author or authors.
6-
>>>>>>> Stashed changes
73
*
84
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
95
* the License. You may obtain a copy of the License at
@@ -27,6 +23,7 @@
2723
import org.springframework.aop.support.AopUtils;
2824
import org.springframework.beans.factory.BeanClassLoaderAware;
2925
import org.springframework.beans.factory.FactoryBean;
26+
import org.springframework.data.gemfire.function.annotation.OnServers;
3027
import org.springframework.util.Assert;
3128
import org.springframework.util.ClassUtils;
3229

@@ -35,6 +32,7 @@
3532
*
3633
* @author David Turanski
3734
* @author John Blum
35+
* @author Patrick Johnson
3836
* @see java.lang.reflect.Method
3937
* @see org.aopalliance.intercept.MethodInterceptor
4038
* @see org.springframework.beans.factory.BeanClassLoaderAware
@@ -98,8 +96,13 @@ public Object invoke(MethodInvocation invocation) throws Throwable {
9896

9997
protected Object invokeFunction(Method method, Object[] args) {
10098

101-
return this.gemfireFunctionOperations
102-
.executeAndExtract(this.methodMetadata.getMethodMetadata(method).getFunctionId(), args);
99+
if(method.getDeclaringClass().isAnnotationPresent(OnServers.class)) {
100+
return this.gemfireFunctionOperations
101+
.execute(this.methodMetadata.getMethodMetadata(method).getFunctionId(), args);
102+
} else {
103+
return this.gemfireFunctionOperations
104+
.executeAndExtract(this.methodMetadata.getMethodMetadata(method).getFunctionId(), args);
105+
}
103106
}
104107

105108
@Override
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright 2016-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
package org.springframework.data.gemfire.function.execution.onservers;
18+
19+
import org.springframework.data.gemfire.function.annotation.FunctionId;
20+
import org.springframework.data.gemfire.function.annotation.OnServers;
21+
22+
import java.util.List;
23+
24+
/**
25+
* @author Patrick Johnson
26+
*/
27+
@OnServers
28+
public interface AllServersAdminFunctions {
29+
30+
@FunctionId("GetAllMetricsFunction")
31+
List<List<Metric>> getAllMetrics();
32+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
/*
2+
* Copyright 2016-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
package org.springframework.data.gemfire.function.execution.onservers;
18+
19+
import org.apache.geode.StatisticDescriptor;
20+
import org.apache.geode.Statistics;
21+
import org.apache.geode.StatisticsType;
22+
import org.apache.geode.cache.Cache;
23+
import org.apache.geode.cache.CacheFactory;
24+
import org.apache.geode.cache.execute.Function;
25+
import org.apache.geode.cache.execute.FunctionContext;
26+
import org.apache.geode.cache.execute.FunctionService;
27+
import org.apache.geode.cache.server.CacheServer;
28+
import org.apache.geode.distributed.internal.InternalDistributedSystem;
29+
import org.apache.geode.internal.statistics.StatisticsManager;
30+
import org.junit.AfterClass;
31+
import org.junit.BeforeClass;
32+
import org.junit.Test;
33+
import org.junit.runner.RunWith;
34+
import org.springframework.beans.factory.annotation.Autowired;
35+
import org.springframework.context.annotation.Configuration;
36+
import org.springframework.data.gemfire.config.annotation.ClientCacheApplication;
37+
import org.springframework.data.gemfire.function.config.EnableGemfireFunctionExecutions;
38+
import org.springframework.data.gemfire.process.ProcessWrapper;
39+
import org.springframework.data.gemfire.test.support.ClientServerIntegrationTestsSupport;
40+
import org.springframework.data.gemfire.transaction.config.EnableGemfireCacheTransactions;
41+
import org.springframework.test.context.ContextConfiguration;
42+
import org.springframework.test.context.junit4.SpringRunner;
43+
44+
import java.io.IOException;
45+
import java.util.ArrayList;
46+
import java.util.List;
47+
48+
import static org.assertj.core.api.Assertions.assertThat;
49+
50+
/**
51+
* @author Patrick Johnson
52+
*/
53+
@SuppressWarnings("unused")
54+
@RunWith(SpringRunner.class)
55+
@ContextConfiguration(classes = FunctionsReturnResultsFromAllServersIntegrationTests.Config.class)
56+
public class FunctionsReturnResultsFromAllServersIntegrationTests extends ClientServerIntegrationTestsSupport {
57+
58+
private static final int PORT_1 = 40407;
59+
private static final int PORT_2 = 40403;
60+
61+
private static ProcessWrapper gemfireServer1;
62+
private static ProcessWrapper gemfireServer2;
63+
64+
@Autowired
65+
private AllServersAdminFunctions allServersAdminFunctions;
66+
67+
@Autowired
68+
private SingleServerAdminFunctions singleServerAdminFunctions;
69+
70+
@BeforeClass
71+
public static void startGemFireServer() throws Exception {
72+
73+
gemfireServer1 = run(MetricsFunctionServerProcess.class,
74+
String.format("-D%s=%d", GEMFIRE_CACHE_SERVER_PORT_PROPERTY, PORT_1));
75+
76+
waitForServerToStart(DEFAULT_HOSTNAME, PORT_1);
77+
78+
gemfireServer2 = run(MetricsFunctionServerProcess.class,
79+
String.format("-D%s=%d", GEMFIRE_CACHE_SERVER_PORT_PROPERTY, PORT_2));
80+
81+
waitForServerToStart(DEFAULT_HOSTNAME, PORT_2);
82+
}
83+
84+
@ClientCacheApplication(servers = {@ClientCacheApplication.Server(port = PORT_1), @ClientCacheApplication.Server(port = PORT_2)})
85+
@Configuration
86+
@EnableGemfireFunctionExecutions(basePackageClasses = AllServersAdminFunctions.class)
87+
@EnableGemfireCacheTransactions
88+
static class Config { }
89+
90+
@AfterClass
91+
public static void stopGemFireServer() {
92+
stop(gemfireServer1);
93+
stop(gemfireServer2);
94+
}
95+
96+
@Test
97+
public void executeFunctionOnAllServers() {
98+
List<List<Metric>> metrics = allServersAdminFunctions.getAllMetrics();
99+
assertThat(metrics.size()).isEqualTo(2);
100+
}
101+
102+
@Test
103+
public void executeFunctionOnSingleServer() {
104+
List<Metric> metrics = singleServerAdminFunctions.getAllMetrics();
105+
assertThat(metrics.size()).isEqualTo(672);
106+
}
107+
108+
static class MetricsFunctionServerProcess {
109+
110+
private static final int DEFAULT_CACHE_SERVER_PORT = 40404;
111+
112+
private static final String CACHE_SERVER_PORT_PROPERTY = "spring.data.gemfire.cache.server.port";
113+
private static final String GEMFIRE_LOG_LEVEL = "error";
114+
private static final String GEMFIRE_NAME = "MetricsServer" + getCacheServerPort();
115+
116+
public static void main(String[] args) throws Exception {
117+
registerFunctions(startCacheServer(newGemFireCache()));
118+
}
119+
120+
private static Cache newGemFireCache() {
121+
122+
return new CacheFactory()
123+
.set("name", GEMFIRE_NAME)
124+
.set("log-level", GEMFIRE_LOG_LEVEL)
125+
.create();
126+
}
127+
128+
private static Cache startCacheServer(Cache gemfireCache) throws IOException {
129+
CacheServer cacheServer = gemfireCache.addCacheServer();
130+
cacheServer.setPort(getCacheServerPort());
131+
cacheServer.start();
132+
return gemfireCache;
133+
}
134+
135+
private static int getCacheServerPort() {
136+
return Integer.getInteger(CACHE_SERVER_PORT_PROPERTY, DEFAULT_CACHE_SERVER_PORT);
137+
}
138+
139+
private static Cache registerFunctions(Cache gemfireCache) {
140+
141+
FunctionService.registerFunction(new GetAllMetricsFunction());
142+
143+
return gemfireCache;
144+
}
145+
}
146+
147+
static class GetAllMetricsFunction implements Function<List<Metric>> {
148+
149+
private final InternalDistributedSystem system =
150+
(InternalDistributedSystem) CacheFactory.getAnyInstance().getDistributedSystem();
151+
152+
@Override
153+
public void execute(FunctionContext context) {
154+
List<Metric> allMetrics = new ArrayList<>();
155+
StatisticsManager statisticsManager = system.getStatisticsManager();
156+
for (Statistics statistics : statisticsManager.getStatsList()) {
157+
StatisticsType statisticsType = statistics.getType();
158+
for (StatisticDescriptor descriptor : statisticsType.getStatistics()) {
159+
String statName = descriptor.getName();
160+
Metric metric = new Metric(statName, statistics.get(statName), statisticsType.getName(), statistics.getTextId());
161+
allMetrics.add(metric);
162+
}
163+
}
164+
context.getResultSender().lastResult(allMetrics);
165+
}
166+
167+
@Override
168+
public String getId() {
169+
return getClass().getSimpleName();
170+
}
171+
}
172+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Copyright 2016-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
package org.springframework.data.gemfire.function.execution.onservers;
18+
19+
import java.io.Serializable;
20+
21+
/**
22+
* @author Patrick Johnson
23+
*/
24+
public class Metric implements Serializable {
25+
26+
private String name;
27+
28+
private Number value;
29+
30+
private String category;
31+
32+
private String type;
33+
34+
public Metric(String name, Number value, String category, String type) {
35+
this.name = name;
36+
this.value = value;
37+
this.category = category;
38+
this.type = type;
39+
}
40+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright 2016-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
package org.springframework.data.gemfire.function.execution.onservers;
18+
19+
import org.springframework.data.gemfire.function.annotation.FunctionId;
20+
import org.springframework.data.gemfire.function.annotation.OnServer;
21+
22+
import java.util.List;
23+
24+
/**
25+
* @author Patrick Johnson
26+
*/
27+
@OnServer
28+
public interface SingleServerAdminFunctions {
29+
30+
@FunctionId("GetAllMetricsFunction")
31+
List<Metric> getAllMetrics();
32+
}

0 commit comments

Comments
 (0)