Skip to content

Commit 5b612df

Browse files
feat: parallelize requests by default (#102)
1 parent 3466f5a commit 5b612df

File tree

11 files changed

+144
-13
lines changed

11 files changed

+144
-13
lines changed

helm/templates/serviceconfig.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ data:
1717
graphql.urlPath = {{ .Values.serviceConfig.urlPath }}
1818
graphql.corsEnabled = {{ .Values.serviceConfig.corsEnabled }}
1919
graphql.timeout = {{ .Values.serviceConfig.timeoutDuration }}
20+
21+
threads.io.max = {{ .Values.serviceConfig.threads.io }}
22+
threads.request.max = {{ .Values.serviceConfig.threads.request }}
2023
2124
attribute.service = {
2225
host = {{ .Values.serviceConfig.attributeService.host }}

helm/values.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,9 @@ serviceConfig:
4646
corsEnabled: true
4747
defaultTenantId: ""
4848
timeoutDuration: 30s
49+
threads:
50+
io: 10
51+
request: 10
4952
attributeService:
5053
host: attribute-service
5154
port: 9012

hypertrace-core-graphql-context/build.gradle.kts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@ dependencies {
1212
implementation(project(":hypertrace-core-graphql-spi"))
1313
implementation("com.google.guava:guava")
1414

15+
annotationProcessor("org.projectlombok:lombok")
16+
compileOnly("org.projectlombok:lombok")
17+
1518
testImplementation("org.junit.jupiter:junit-jupiter")
1619
testImplementation("org.mockito:mockito-core")
1720
testImplementation("org.mockito:mockito-junit-jupiter")
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package org.hypertrace.core.graphql.context;
2+
3+
import static java.util.concurrent.TimeUnit.MILLISECONDS;
4+
5+
import com.google.common.util.concurrent.ThreadFactoryBuilder;
6+
import com.google.inject.Injector;
7+
import graphql.schema.DataFetcher;
8+
import graphql.schema.DataFetchingEnvironment;
9+
import java.util.concurrent.CompletableFuture;
10+
import java.util.concurrent.ExecutorService;
11+
import java.util.concurrent.Executors;
12+
import javax.inject.Inject;
13+
import javax.inject.Singleton;
14+
import lombok.AllArgsConstructor;
15+
import org.hypertrace.core.graphql.spi.config.GraphQlServiceConfig;
16+
17+
@Singleton
18+
class AsyncDataFetcherFactory {
19+
20+
private final Injector injector;
21+
private final GraphQlServiceConfig config;
22+
private final ExecutorService requestExecutor;
23+
24+
@Inject
25+
AsyncDataFetcherFactory(Injector injector, GraphQlServiceConfig config) {
26+
this.injector = injector;
27+
this.config = config;
28+
this.requestExecutor =
29+
Executors.newFixedThreadPool(
30+
config.getMaxRequestThreads(),
31+
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("request-handler-%d").build());
32+
}
33+
34+
<T> DataFetcher<CompletableFuture<T>> buildDataFetcher(
35+
Class<? extends DataFetcher<CompletableFuture<T>>> dataFetcherClass) {
36+
return new AsyncForwardingDataFetcher<>(
37+
this.injector.getInstance(dataFetcherClass), requestExecutor, config);
38+
}
39+
40+
@AllArgsConstructor
41+
private static class AsyncForwardingDataFetcher<T> implements DataFetcher<CompletableFuture<T>> {
42+
private final DataFetcher<CompletableFuture<T>> delegate;
43+
private final ExecutorService executorService;
44+
private final GraphQlServiceConfig config;
45+
46+
@Override
47+
public CompletableFuture<T> get(DataFetchingEnvironment dataFetchingEnvironment)
48+
throws Exception {
49+
// Really all we're doing here is changing the thread that the future is run on by default
50+
return CompletableFuture.supplyAsync(
51+
() -> {
52+
try {
53+
return delegate
54+
.get(dataFetchingEnvironment)
55+
.get(config.getGraphQlTimeout().toMillis(), MILLISECONDS);
56+
} catch (Exception e) {
57+
throw new RuntimeException(e);
58+
}
59+
},
60+
executorService);
61+
}
62+
}
63+
}

hypertrace-core-graphql-context/src/main/java/org/hypertrace/core/graphql/context/DefaultGraphQlRequestContextBuilder.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package org.hypertrace.core.graphql.context;
22

33
import com.google.common.collect.Streams;
4-
import com.google.inject.Injector;
54
import graphql.kickstart.servlet.context.DefaultGraphQLServletContext;
65
import graphql.kickstart.servlet.context.DefaultGraphQLServletContextBuilder;
76
import graphql.kickstart.servlet.context.GraphQLServletContext;
@@ -10,6 +9,7 @@
109
import java.util.Map;
1110
import java.util.Optional;
1211
import java.util.Set;
12+
import java.util.concurrent.CompletableFuture;
1313
import java.util.stream.Collectors;
1414
import javax.annotation.Nonnull;
1515
import javax.inject.Inject;
@@ -27,12 +27,13 @@ class DefaultGraphQlRequestContextBuilder extends DefaultGraphQLServletContextBu
2727
static final Set<String> TRACING_CONTEXT_HEADER_KEY_PREFIXES =
2828
Set.of("X-B3-", "traceparent", "tracestate");
2929

30-
private final Injector injector;
3130
private final GraphQlServiceConfig serviceConfig;
31+
private final AsyncDataFetcherFactory dataFetcherFactory;
3232

3333
@Inject
34-
DefaultGraphQlRequestContextBuilder(Injector injector, GraphQlServiceConfig serviceConfig) {
35-
this.injector = injector;
34+
DefaultGraphQlRequestContextBuilder(
35+
AsyncDataFetcherFactory dataFetcherFactory, GraphQlServiceConfig serviceConfig) {
36+
this.dataFetcherFactory = dataFetcherFactory;
3637
this.serviceConfig = serviceConfig;
3738
}
3839

@@ -63,8 +64,10 @@ public Optional<DataLoaderRegistry> getDataLoaderRegistry() {
6364
}
6465

6566
@Override
66-
public <T extends DataFetcher<?>> T constructDataFetcher(Class<T> dataFetcherClass) {
67-
return DefaultGraphQlRequestContextBuilder.this.injector.getInstance(dataFetcherClass);
67+
public <T> DataFetcher<CompletableFuture<T>> constructDataFetcher(
68+
Class<? extends DataFetcher<CompletableFuture<T>>> dataFetcherClass) {
69+
return DefaultGraphQlRequestContextBuilder.this.dataFetcherFactory.buildDataFetcher(
70+
dataFetcherClass);
6871
}
6972

7073
@Override

hypertrace-core-graphql-context/src/main/java/org/hypertrace/core/graphql/context/GraphQlRequestContext.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import graphql.schema.DataFetcher;
55
import java.util.Map;
66
import java.util.Optional;
7+
import java.util.concurrent.CompletableFuture;
78
import javax.annotation.Nonnull;
89

910
public interface GraphQlRequestContext extends GraphQLContext {
@@ -12,7 +13,8 @@ public interface GraphQlRequestContext extends GraphQLContext {
1213
* A tool to create data fetchers via injection container due to limitations in the framework. For
1314
* normal injectable instantiation, do not use this method.
1415
*/
15-
<T extends DataFetcher<?>> T constructDataFetcher(Class<T> dataFetcherClass);
16+
<T> DataFetcher<CompletableFuture<T>> constructDataFetcher(
17+
Class<? extends DataFetcher<CompletableFuture<T>>> dataFetcherClass);
1618

1719
Optional<String> getAuthorizationHeader();
1820

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package org.hypertrace.core.graphql.context;
2+
3+
import static org.junit.jupiter.api.Assertions.*;
4+
import static org.mockito.Mockito.when;
5+
6+
import com.google.inject.Guice;
7+
import graphql.schema.DataFetcher;
8+
import graphql.schema.DataFetchingEnvironment;
9+
import java.util.concurrent.CompletableFuture;
10+
import org.hypertrace.core.graphql.spi.config.GraphQlServiceConfig;
11+
import org.junit.jupiter.api.Test;
12+
import org.junit.jupiter.api.extension.ExtendWith;
13+
import org.mockito.Mock;
14+
import org.mockito.junit.jupiter.MockitoExtension;
15+
16+
@ExtendWith(MockitoExtension.class)
17+
class AsyncDataFetcherFactoryTest {
18+
@Mock GraphQlServiceConfig graphQlServiceConfig;
19+
@Mock DataFetchingEnvironment dataFetchingEnvironment;
20+
21+
@Test
22+
void canBuildAsyncDataFetcher() throws Exception {
23+
when(graphQlServiceConfig.getMaxRequestThreads()).thenReturn(1);
24+
DataFetcher<CompletableFuture<Thread>> fetcher =
25+
new AsyncDataFetcherFactory(Guice.createInjector(), graphQlServiceConfig)
26+
.buildDataFetcher(ThreadEchoingDataFetcher.class);
27+
28+
Thread fetcherThread = fetcher.get(dataFetchingEnvironment).get();
29+
30+
assertNotEquals(Thread.currentThread(), fetcherThread);
31+
assertNotNull(fetcherThread);
32+
}
33+
34+
private static class ThreadEchoingDataFetcher implements DataFetcher<CompletableFuture<Thread>> {
35+
@Override
36+
public CompletableFuture<Thread> get(DataFetchingEnvironment environment) {
37+
return CompletableFuture.completedFuture(Thread.currentThread());
38+
}
39+
}
40+
}

hypertrace-core-graphql-context/src/test/java/org/hypertrace/core/graphql/context/DefaultGraphQlRequestContextBuilderTest.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,12 @@
1414
import static org.mockito.Mockito.verify;
1515
import static org.mockito.Mockito.when;
1616

17-
import com.google.inject.Injector;
1817
import graphql.schema.DataFetcher;
1918
import java.util.Collections;
2019
import java.util.List;
2120
import java.util.Map;
2221
import java.util.Optional;
22+
import java.util.concurrent.CompletableFuture;
2323
import javax.servlet.http.HttpServletRequest;
2424
import javax.servlet.http.HttpServletResponse;
2525
import org.hypertrace.core.graphql.spi.config.GraphQlServiceConfig;
@@ -32,7 +32,7 @@
3232
@ExtendWith(MockitoExtension.class)
3333
class DefaultGraphQlRequestContextBuilderTest {
3434

35-
@Mock Injector mockInjector;
35+
@Mock AsyncDataFetcherFactory mockDataFetcherFactory;
3636
@Mock HttpServletRequest mockRequest;
3737
@Mock HttpServletResponse mockResponse;
3838
@Mock GraphQlServiceConfig mockServiceConfig;
@@ -43,7 +43,8 @@ class DefaultGraphQlRequestContextBuilderTest {
4343
@BeforeEach
4444
void beforeEach() {
4545
this.contextBuilder =
46-
new DefaultGraphQlRequestContextBuilder(this.mockInjector, this.mockServiceConfig);
46+
new DefaultGraphQlRequestContextBuilder(
47+
this.mockDataFetcherFactory, this.mockServiceConfig);
4748
this.requestContext = this.contextBuilder.build(this.mockRequest, this.mockResponse);
4849
}
4950

@@ -70,9 +71,9 @@ void delegatesDataLoaderRegistry() {
7071
}
7172

7273
@Test
73-
void canConstructDataFetcher() {
74-
this.requestContext.constructDataFetcher(DataFetcher.class);
75-
verify(this.mockInjector).getInstance(DataFetcher.class);
74+
void canDelegateDataFetcherConstruction() {
75+
this.requestContext.constructDataFetcher(TestDataFetcher.class);
76+
verify(this.mockDataFetcherFactory).buildDataFetcher(TestDataFetcher.class);
7677
}
7778

7879
@Test
@@ -135,4 +136,6 @@ void returnsLowerCasedTracingHeadersIfAnyMatches() {
135136
"x-b3-parent-trace-id value"),
136137
this.requestContext.getTracingContextHeaders());
137138
}
139+
140+
private interface TestDataFetcher extends DataFetcher<CompletableFuture<String>> {}
138141
}

hypertrace-core-graphql-service/src/main/java/org/hypertrace/core/graphql/service/DefaultGraphQlServiceConfig.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ class DefaultGraphQlServiceConfig implements GraphQlServiceConfig {
2020
private static final String DEFAULT_TENANT_ID = "defaultTenantId";
2121

2222
private static final String MAX_IO_THREADS_PROPERTY = "threads.io.max";
23+
private static final String MAX_REQUEST_THREADS_PROPERTY = "threads.request.max";
2324

2425
private static final String ATTRIBUTE_SERVICE_HOST_PROPERTY = "attribute.service.host";
2526
private static final String ATTRIBUTE_SERVICE_PORT_PROPERTY = "attribute.service.port";
@@ -36,6 +37,7 @@ class DefaultGraphQlServiceConfig implements GraphQlServiceConfig {
3637
private final Duration graphQlTimeout;
3738
private final Optional<String> defaultTenantId;
3839
private final int maxIoThreads;
40+
private final int maxRequestThreads;
3941
private final String attributeServiceHost;
4042
private final int attributeServicePort;
4143
private final Duration attributeServiceTimeout;
@@ -51,6 +53,7 @@ class DefaultGraphQlServiceConfig implements GraphQlServiceConfig {
5153
this.graphQlTimeout = untypedConfig.getDuration(GRAPHQL_TIMEOUT);
5254
this.defaultTenantId = optionallyGet(() -> untypedConfig.getString(DEFAULT_TENANT_ID));
5355
this.maxIoThreads = untypedConfig.getInt(MAX_IO_THREADS_PROPERTY);
56+
this.maxRequestThreads = untypedConfig.getInt(MAX_REQUEST_THREADS_PROPERTY);
5457

5558
this.attributeServiceHost = untypedConfig.getString(ATTRIBUTE_SERVICE_HOST_PROPERTY);
5659
this.attributeServicePort = untypedConfig.getInt(ATTRIBUTE_SERVICE_PORT_PROPERTY);
@@ -98,6 +101,11 @@ public int getMaxIoThreads() {
98101
return maxIoThreads;
99102
}
100103

104+
@Override
105+
public int getMaxRequestThreads() {
106+
return maxRequestThreads;
107+
}
108+
101109
@Override
102110
public String getAttributeServiceHost() {
103111
return this.attributeServiceHost;

hypertrace-core-graphql-service/src/main/resources/configs/common/application.conf

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ graphql.corsEnabled = true
88
graphql.timeout = 30s
99

1010
threads.io.max = 10
11+
threads.request.max = 10
1112

1213
attribute.service = {
1314
host = localhost

0 commit comments

Comments
 (0)