feat(server): Add http support for internal resource manager communication #26635
tdcmeehan merged 1 commit into prestodb:master
Reviewer's Guide
This PR adds HTTP-based internal communication support for the Resource Manager alongside the existing Thrift protocol. It introduces configuration to select the communication protocol, binds a new HTTP client and REST endpoints, and updates the heartbeat sender to dispatch requests over HTTP or Thrift transparently.
Sequence diagram for sending a node heartbeat with protocol selection
sequenceDiagram
participant Coordinator
participant ResourceManagerClusterStatusSender
participant ResourceManagerClient
participant HttpResourceManagerClient
participant ResourceManagerResource
Coordinator->>ResourceManagerClusterStatusSender: sendNodeHeartbeat()
alt CommunicationProtocol = HTTP
ResourceManagerClusterStatusSender->>HttpResourceManagerClient: nodeHeartbeat(NodeStatus)
HttpResourceManagerClient->>ResourceManagerResource: HTTP PUT /v1/resourcemanager/nodeHeartbeat
ResourceManagerResource->>ResourceManagerClusterStateProvider: registerNodeHeartbeat(NodeStatus)
else CommunicationProtocol = Thrift
ResourceManagerClusterStatusSender->>ResourceManagerClient: nodeHeartbeat(NodeStatus)
ResourceManagerClient->>ResourceManagerClusterStateProvider: registerNodeHeartbeat(NodeStatus)
end
Class diagram for ResourceManagerClusterStatusSender and related clients
classDiagram
class ResourceManagerClusterStatusSender {
- DriftClient thriftResourceManagerClient
- HttpClient httpClient
- InternalNodeManager internalNodeManager
- ResourceGroupManager resourceGroupManager
- Supplier statusSupplier
- ScheduledExecutorService executor
- Duration queryHeartbeatInterval
- InternalCommunicationConfig.CommunicationProtocol communicationProtocol
- Map queries
- Map httpClientCache
- PeriodicTaskExecutor nodeHeartbeatSender
- Optional resourceRuntimeHeartbeatSender
+ sendNodeHeartbeat()
+ sendQueryHeartbeat(...)
+ sendResourceGroupRuntimeHeartbeat()
- getResourceManagers()
- getOrCreateHttpClient(HostAddress)
}
class ResourceManagerClient {
<<interface>>
+ queryHeartbeat(...)
+ nodeHeartbeat(...)
+ resourceGroupRuntimeHeartbeat(...)
+ getResourceGroupInfo(...)
+ getMemoryPoolInfo()
+ getRunningTaskCount()
}
class HttpResourceManagerClient {
+ queryHeartbeat(...)
+ nodeHeartbeat(...)
+ resourceGroupRuntimeHeartbeat(...)
+ getResourceGroupInfo(...)
+ getMemoryPoolInfo()
+ getRunningTaskCount()
}
ResourceManagerClusterStatusSender --> ResourceManagerClient
ResourceManagerClusterStatusSender --> HttpResourceManagerClient
HttpResourceManagerClient ..|> ResourceManagerClient
Class diagram for ResourceManagerConfig protocol selection
classDiagram
class ResourceManagerConfig {
- boolean httpServerEnabled
- InternalCommunicationConfig.CommunicationProtocol communicationProtocol
+ setHttpServerEnabled(boolean)
+ getHttpServerEnabled()
+ setCommunicationProtocol(CommunicationProtocol)
+ getCommunicationProtocol()
}
class InternalCommunicationConfig {
}
class CommunicationProtocol {
<<enumeration>>
THRIFT
HTTP
}
ResourceManagerConfig --> InternalCommunicationConfig
InternalCommunicationConfig --> CommunicationProtocol
Hey there - I've reviewed your changes - here's some feedback:
- TestResourceManagerConfig uses resource-manager.http-enabled but the actual @Config annotation is resource-manager.http-server-enabled, so update the test or the annotation to use matching property names.
- TestResourceManagerClusterStatusSender is passing null for the HttpClient parameter, causing NPEs—either supply a mock HttpClient in the test setup or default the protocol to THRIFT for that test.
- ResourceManagerClusterStatusSender has repeated loops for HTTP vs THRIFT; consider pulling the per-host send logic into a shared method or strategy to avoid duplication and make future protocol additions easier.
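One way to address the third point, sketched under assumptions: route every heartbeat through a single per-host loop and pick the transport once from the configured protocol. `WireProtocol`, `HeartbeatDispatcher`, and the `(host, payload)` transport shape are hypothetical names for illustration, not code from this PR.

```java
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical stand-in for InternalCommunicationConfig.CommunicationProtocol.
enum WireProtocol { THRIFT, HTTP }

// Hypothetical helper: one send loop shared by all heartbeat types.
final class HeartbeatDispatcher<T>
{
    private final BiConsumer<String, T> transport;

    HeartbeatDispatcher(WireProtocol protocol, Map<WireProtocol, BiConsumer<String, T>> transports)
    {
        // Resolve the transport once, at construction time.
        this.transport = transports.get(protocol);
        if (this.transport == null) {
            throw new IllegalArgumentException("No transport registered for " + protocol);
        }
    }

    // Sends the payload to every resource manager host using the selected transport.
    void sendToAll(List<String> hosts, T payload)
    {
        for (String host : hosts) {
            transport.accept(host, payload);
        }
    }
}
```

With this shape, adding a protocol would mean registering one more transport rather than adding another branch to each send method.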
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location> `presto-main/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerClusterStatusSender.java:202` </location>
<code_context>
+
+ public void sendResourceGroupRuntimeHeartbeat()
+ {
+ List resourceGroupRuntimeInfos = resourceGroupManager.getResourceGroupRuntimeInfos();
+
+ if (communicationProtocol == InternalCommunicationConfig.CommunicationProtocol.HTTP) {
</code_context>
<issue_to_address>
**nitpick:** Raw type used for resourceGroupRuntimeInfos; should be parameterized.
Specify the generic type for 'resourceGroupRuntimeInfos' to improve type safety and code clarity.
</issue_to_address>
### Comment 2
<location> `presto-main/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerClusterStatusSender.java:223` </location>
<code_context>
private List<HostAddress> getResourceManagers()
{
return internalNodeManager.getResourceManagers().stream()
- .filter(node -> node.getThriftPort().isPresent())
+ .filter(node -> communicationProtocol == InternalCommunicationConfig.CommunicationProtocol.HTTP ||
</code_context>
<issue_to_address>
**question (bug_risk):** Filtering logic for resource managers may allow nodes without Thrift port in HTTP mode.
Verify that all nodes included in HTTP mode are valid HTTP resource managers to prevent connection attempts to unsupported nodes.
</issue_to_address>
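A rough sketch of the stricter filter this question points at, where a node is kept only if it exposes an endpoint for the configured protocol. `RmNode`, its fields, and `ResourceManagerFilter` are hypothetical stand-ins; the real node type and its accessors may differ.

```java
import java.util.List;
import java.util.OptionalInt;
import java.util.stream.Collectors;

// Hypothetical stand-in for the node type; the field names are assumptions.
final class RmNode
{
    final String host;
    final OptionalInt thriftPort;
    final OptionalInt httpPort;

    RmNode(String host, OptionalInt thriftPort, OptionalInt httpPort)
    {
        this.host = host;
        this.thriftPort = thriftPort;
        this.httpPort = httpPort;
    }
}

final class ResourceManagerFilter
{
    enum Protocol { THRIFT, HTTP }

    // Keeps only the nodes that expose an endpoint for the configured protocol.
    static List<RmNode> reachable(List<RmNode> nodes, Protocol protocol)
    {
        return nodes.stream()
                .filter(node -> protocol == Protocol.HTTP
                        ? node.httpPort.isPresent()
                        : node.thriftPort.isPresent())
                .collect(Collectors.toList());
    }
}
```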
### Comment 3
<location> `presto-main-base/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerConfig.java:46-48` </location>
<code_context>
private boolean resourceGroupServiceCacheEnabled;
private Duration resourceGroupServiceCacheExpireInterval = new Duration(10, SECONDS);
- private boolean heartbeatHttpEnabled;
+ private boolean httpServerEnabled;
+
+ private InternalCommunicationConfig.CommunicationProtocol communicationProtocol = InternalCommunicationConfig.CommunicationProtocol.THRIFT;
private Duration resourceGroupServiceCacheRefreshInterval = new Duration(1, SECONDS);
</code_context>
<issue_to_address>
**suggestion:** Both httpServerEnabled and communicationProtocol are present; possible configuration ambiguity.
Clarify how these settings interact, especially when httpServerEnabled is false but communicationProtocol is set to HTTP, to prevent misconfiguration.
Suggested implementation:
```java
/**
* If true, enables the HTTP server for resource manager communication.
* If false, communicationProtocol must not be set to HTTP.
*/
private boolean httpServerEnabled;
/**
* Protocol used for internal communication. If set to HTTP, httpServerEnabled must be true.
*/
private InternalCommunicationConfig.CommunicationProtocol communicationProtocol = InternalCommunicationConfig.CommunicationProtocol.THRIFT;
```
```java
private Duration resourceGroupServiceCacheRefreshInterval = new Duration(1, SECONDS);
/**
* Validates the configuration to prevent ambiguous or invalid settings.
* Throws IllegalArgumentException if httpServerEnabled is false but communicationProtocol is set to HTTP.
*/
@jakarta.annotation.PostConstruct
public void validateConfiguration() {
if (!httpServerEnabled &&
communicationProtocol == InternalCommunicationConfig.CommunicationProtocol.HTTP) {
throw new IllegalArgumentException(
"Invalid configuration: communicationProtocol is set to HTTP but httpServerEnabled is false. " +
"Either enable httpServerEnabled or set communicationProtocol to a non-HTTP value."
);
}
}
```
</issue_to_address>
### Comment 4
<location> `presto-main/src/main/java/com/facebook/presto/server/ResourceManagerResource.java:61` </location>
<code_context>
+ }
+
+ @Override
+ public void nodeHeartbeat(NodeStatus nodeStatus)
+ {
+ URI uri = buildUri("/v1/resourcemanager/nodeHeartbeat");
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Node heartbeat and other endpoints use executor.execute/submit; consider error handling for async tasks.
Exceptions from background tasks may be missed. Add error handling or logging to capture failures from clusterStateProvider methods.
</issue_to_address>
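A minimal sketch of the kind of error handling this comment asks for, assuming the background work is a plain `Runnable`. `LoggedAsync` and the `onError` callback are hypothetical; in the resource, the callback would presumably log through the class logger.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

// Hypothetical helper, not part of the PR.
final class LoggedAsync
{
    // Runs the task on the executor and hands any failure to onError instead of dropping it.
    static CompletableFuture<Void> run(Executor executor, Runnable task, Consumer<Throwable> onError)
    {
        return CompletableFuture.runAsync(task, executor)
                .whenComplete((ignored, failure) -> {
                    if (failure != null) {
                        // runAsync reports the task's exception wrapped in CompletionException.
                        Throwable cause = (failure instanceof CompletionException && failure.getCause() != null)
                                ? failure.getCause()
                                : failure;
                        onError.accept(cause);
                    }
                });
    }
}
```

The returned future still completes exceptionally, so callers that care about the outcome can observe it as well.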
### Comment 5
<location> `presto-main-base/src/test/java/com/facebook/presto/resourcemanager/TestResourceManagerConfig.java:80-81` </location>
<code_context>
.put("resource-manager.running-task-count-fetch-interval", "1m")
.put("resource-manager.resource-group-runtimeinfo-timeout", "4s")
- .put("resource-manager.heartbeat-http-enabled", "true")
+ .put("resource-manager.http-enabled", "true")
+ .put("resource-manager.communication-protocol", "HTTP")
.build();
</code_context>
<issue_to_address>
**issue (testing):** Test for explicit property mappings should use the correct property name.
The test should use "resource-manager.http-server-enabled" to match the implementation and properly validate the mapping.
</issue_to_address>
### Comment 6
<location> `presto-main/src/test/java/com/facebook/presto/resourcemanager/TestResourceManagerClusterStatusSender.java:85-86` </location>
<code_context>
sender = new ResourceManagerClusterStatusSender(
(addressSelectionContext, headers) -> resourceManagerClient,
+ null,
nodeManager,
() -> NODE_STATUS,
</code_context>
<issue_to_address>
**suggestion (testing):** TestResourceManagerClusterStatusSender does not test HTTP protocol path.
Please add tests for the HTTP protocol path, including error scenarios and edge cases, to ensure full coverage of the new logic.
Suggested implementation:
```java
sender = new ResourceManagerClusterStatusSender(
(addressSelectionContext, headers) -> resourceManagerClient,
null,
nodeManager,
() -> NODE_STATUS,
newSingleThreadScheduledExecutor(),
);
// --- HTTP protocol path tests ---
@Test
public void testHttpProtocolSuccess() throws Exception {
ResourceManagerClient httpClient = mock(ResourceManagerClient.class);
when(httpClient.sendClusterStatus(any(), any())).thenReturn(/* mock successful response */);
ResourceManagerClusterStatusSender httpSender = new ResourceManagerClusterStatusSender(
(addressSelectionContext, headers) -> httpClient,
ResourceManagerClusterStatusSender.Protocol.HTTP,
nodeManager,
() -> NODE_STATUS,
newSingleThreadScheduledExecutor()
);
// Call the method and assert success
httpSender.sendClusterStatus();
// Add assertions to verify correct behavior
}
@Test
public void testHttpProtocolErrorResponse() throws Exception {
ResourceManagerClient httpClient = mock(ResourceManagerClient.class);
when(httpClient.sendClusterStatus(any(), any())).thenThrow(new IOException("HTTP error"));
ResourceManagerClusterStatusSender httpSender = new ResourceManagerClusterStatusSender(
(addressSelectionContext, headers) -> httpClient,
ResourceManagerClusterStatusSender.Protocol.HTTP,
nodeManager,
() -> NODE_STATUS,
newSingleThreadScheduledExecutor()
);
// Call the method and assert error handling
assertThrows(IOException.class, httpSender::sendClusterStatus);
}
@Test
public void testHttpProtocolTimeout() throws Exception {
ResourceManagerClient httpClient = mock(ResourceManagerClient.class);
when(httpClient.sendClusterStatus(any(), any())).thenAnswer(invocation -> {
Thread.sleep(2000); // Simulate timeout
return null;
});
ResourceManagerClusterStatusSender httpSender = new ResourceManagerClusterStatusSender(
(addressSelectionContext, headers) -> httpClient,
ResourceManagerClusterStatusSender.Protocol.HTTP,
nodeManager,
() -> NODE_STATUS,
newSingleThreadScheduledExecutor()
);
// Call the method and assert timeout handling
// (Assume sender has timeout logic, otherwise this test can be adjusted)
}
@Test
public void testHttpProtocolInvalidResponse() throws Exception {
ResourceManagerClient httpClient = mock(ResourceManagerClient.class);
when(httpClient.sendClusterStatus(any(), any())).thenReturn(null); // Simulate invalid response
ResourceManagerClusterStatusSender httpSender = new ResourceManagerClusterStatusSender(
(addressSelectionContext, headers) -> httpClient,
ResourceManagerClusterStatusSender.Protocol.HTTP,
nodeManager,
() -> NODE_STATUS,
newSingleThreadScheduledExecutor()
);
// Call the method and assert handling of invalid response
// Add assertions as appropriate
}
```
- You may need to adjust the mock responses and assertions to match the actual implementation details of `ResourceManagerClient` and `ResourceManagerClusterStatusSender`.
- Ensure that `ResourceManagerClusterStatusSender.Protocol.HTTP` is a valid enum value; if not, use the correct value for HTTP protocol.
- If the sender's error/timeout handling is more complex, add more detailed assertions and possibly use Awaitility or similar for async tests.
- Import necessary classes: `org.junit.jupiter.api.Test`, `org.mockito.Mockito`, etc.
</issue_to_address>
### Comment 7
<location> `presto-main/src/test/java/com/facebook/presto/resourcemanager/TestResourceManagerClusterStatusSender.java:86-87` </location>
<code_context>
sender = new ResourceManagerClusterStatusSender(
(addressSelectionContext, headers) -> resourceManagerClient,
+ null,
nodeManager,
() -> NODE_STATUS,
newSingleThreadScheduledExecutor(),
</code_context>
<issue_to_address>
**suggestion (testing):** No tests for error handling in HTTP client usage.
Please add tests that simulate HTTP failures to ensure error handling and logging work as intended.
Suggested implementation:
```java
sender = new ResourceManagerClusterStatusSender(
(addressSelectionContext, headers) -> resourceManagerClient,
null,
nodeManager,
() -> NODE_STATUS,
newSingleThreadScheduledExecutor(),
```
```java
@Test
public void testSendClusterStatusHandlesHttpClientFailure() throws Exception
{
// Arrange: mock ResourceManagerClient to throw an exception
ResourceManagerClient failingClient = mock(ResourceManagerClient.class);
when(failingClient.sendClusterStatus(any(), any())).thenThrow(new RuntimeException("HTTP failure"));
ResourceManagerClusterStatusSender sender = new ResourceManagerClusterStatusSender(
(addressSelectionContext, headers) -> failingClient,
null,
nodeManager,
() -> NODE_STATUS,
newSingleThreadScheduledExecutor()
);
// Act & Assert: error should be handled gracefully
try {
sender.sendClusterStatus();
// If no exception is thrown, error handling is working as expected
} catch (Exception e) {
fail("Exception should be handled within sendClusterStatus");
}
// Optionally, verify logging (if using a logging framework with test hooks)
// For example, using a log capturing library to assert error was logged
}
```
</issue_to_address>
steveburnett left a comment
Thanks for the doc! Just a couple of suggestions, looks good overall.
steveburnett left a comment
LGTM! (docs)
Pull branch, local doc build. Looks good, thanks!
| The protocol used for communication with the resource manager. This | ||
| can be set to ``THRIFT`` or ``HTTP``. | ||
|
|
||
| * ``resource-manager.http-server-enabled``: |
Could we use the value of internal-communication.resource-manager-communication-protocol, and if it's HTTP, then HTTP server is enabled?
The issue is that the C++ workers use the HTTP protocol regardless of whether the coordinator communicates over Thrift, so you may want to keep the Thrift implementation and keep the HTTP server on. We could have internal-communication.resource-manager-communication-protocol override resource-manager.http-server-enabled, so that if it's set to HTTP and http-server-enabled is false, the server turns on anyway.
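The override proposed in this reply could look roughly like the following. This is a sketch of the idea only; `HttpServerSetting` and its method are hypothetical names, and the PR may resolve the two settings differently.

```java
// Hypothetical helper illustrating the proposed override; not code from the PR.
final class HttpServerSetting
{
    enum Protocol { THRIFT, HTTP }

    // The HTTP server runs if it is explicitly enabled, or implicitly when the
    // resource manager communication protocol is HTTP.
    static boolean effectiveHttpServerEnabled(boolean httpServerEnabled, Protocol protocol)
    {
        return httpServerEnabled || protocol == Protocol.HTTP;
    }
}
```

Under this rule, Thrift with the flag on keeps the HTTP server available for C++ workers, and HTTP with the flag off still starts the server.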
|
|
||
| try { | ||
| client.nodeHeartbeat(nodeStatus); | ||
| fail("Expected PrestoException"); |
Use idiomatic TestNG assertThrows
| client.nodeHeartbeat(createTestNodeStatus("node1")); | ||
| } | ||
|
|
||
| private static BasicQueryInfo createTestQueryInfo(String queryId, QueryState state) |
I think we can actually test this completely end to end using the actual client and server. Look at how TestHttpRemoteTask uses JaxrsTestingHttpProcessor, I think we can do something similar:
I added the jaxrs testing, but I don't think it's possible to test the async GET endpoints with it. I've accessed the ResourceManagerClusterStateProvider directly & tested the endpoints locally instead.
I rewrote the tests using a regular HTTP server & a regular HTTP client to make sure the GET endpoints are tested too.
| getResourceManagers().forEach(hostAndPort -> | ||
| resourceManagerClient.get(Optional.of(hostAndPort.toString())).queryHeartbeat(nodeIdentifier, basicQueryInfo, sequenceId)); | ||
|
|
||
| if (communicationProtocol == InternalCommunicationConfig.CommunicationProtocol.HTTP) { |
| if (communicationProtocol == InternalCommunicationConfig.CommunicationProtocol.HTTP) { | |
| if (communicationProtocol == HTTP) { |
| getResourceManagers().forEach(hostAndPort -> | ||
| resourceManagerClient.get(Optional.of(hostAndPort.toString())).nodeHeartbeat(statusSupplier.get())); | ||
| NodeStatus nodeStatus = statusSupplier.get(); | ||
| if (communicationProtocol == InternalCommunicationConfig.CommunicationProtocol.HTTP) { |
| if (communicationProtocol == InternalCommunicationConfig.CommunicationProtocol.HTTP) { | |
| if (communicationProtocol == HTTP) { |
| { | ||
| List<ResourceGroupRuntimeInfo> resourceGroupRuntimeInfos = resourceGroupManager.getResourceGroupRuntimeInfos(); | ||
|
|
||
| if (communicationProtocol == InternalCommunicationConfig.CommunicationProtocol.HTTP) { |
| if (communicationProtocol == InternalCommunicationConfig.CommunicationProtocol.HTTP) { | |
| if (communicationProtocol == HTTP) { |
| { | ||
| log.error(exception, "Resource manager %s request to %s failed", operationName, request.getUri()); | ||
| throw new PrestoException( | ||
| GENERIC_INTERNAL_ERROR, |
We'll want to add a dedicated error code.
Added a new RESOURCE_MANAGER_EXCEPTION
| import static jakarta.ws.rs.core.MediaType.APPLICATION_JSON; | ||
| import static java.util.Objects.requireNonNull; | ||
|
|
||
| @Path("/v1/resourcemanager") |
I want to think about how to make this more RESTful. These endpoints are RPC style like Thrift, and for HTTP we follow REST conventions.
I reworked the endpoints, let me know what you think
| import java.util.Map; | ||
| import java.util.Optional; | ||
|
|
||
| public interface HttpResourceManagerClient |
Can we unify this with ResourceManagerClient?
The issue is that ResourceManagerClient is already bound to a Drift client, so trying to bind it to HttpResourceManagerClient causes issues. Also, the HTTP client requires the URL to be passed when calling a method, while the Drift client has a different instance for each URL and calls .get(URL) to call methods.
Then let's rename the existing ResourceManagerClient to something that indicates it's Drift related and mark it as deprecated.
| @Path("/v1/resource-manager") | ||
| public class ResourceManagerResource | ||
| { | ||
| Logger log = Logger.get(ResourceGroupRuntimeInfo.class); |
| Logger log = Logger.get(ResourceGroupRuntimeInfo.class); | |
| private static final Logger LOG = Logger.get(ResourceManagerResource.class); |
| throw new PrestoException(GENERIC_INTERNAL_ERROR, ie); | ||
| } | ||
| catch (Exception e) { | ||
| throw new PrestoException(GENERIC_INTERNAL_ERROR, e); |
Can we try something along the lines of this?
catch (ExecutionException e) {
    throwIfInstanceOf(e.getCause(), PrestoException.class);
    throwIfUnchecked(e.getCause());
    throw new PrestoException(GENERIC_INTERNAL_ERROR, e);
}
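The suggestion above relies on Guava's throwIfInstanceOf and throwIfUnchecked. The same unwrapping idea using only the JDK, as a rough sketch: `FutureUnwrap` and `DomainException` are hypothetical, with `DomainException` standing in for PrestoException.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical helper, not code from the PR.
final class FutureUnwrap
{
    // Hypothetical stand-in for a domain exception such as PrestoException.
    static final class DomainException extends RuntimeException
    {
        DomainException(String message, Throwable cause)
        {
            super(message, cause);
        }
    }

    // Returns the future's value, rethrowing unchecked causes as-is and wrapping checked ones.
    static <T> T getUnwrapped(Future<T> future)
    {
        try {
            return future.get();
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag before reporting the failure.
            Thread.currentThread().interrupt();
            throw new DomainException("interrupted", e);
        }
        catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause; // also covers DomainException
            }
            if (cause instanceof Error) {
                throw (Error) cause;
            }
            throw new DomainException("request failed", e);
        }
    }
}
```

The point of the pattern is that a domain exception thrown by the background task reaches the caller unchanged instead of being re-wrapped as a generic internal error.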
steveburnett left a comment
Just one nit of formatting.
Hi @infvg, just checking in to see if you’re still planning to continue work on this PR.
@Dilli-Babu-Godari yes, I'm continuing this
steveburnett left a comment
Thanks for the doc. Looks good, just a couple of nits.
CC: @amitkdutta -- this changes the HTTP endpoint for worker -> coordinator resource manager heartbeats, do you guys need this to go out in two PRs?
steveburnett left a comment
LGTM! (docs)
Pull updated branch, new local doc build, marked all my open comments as Resolved (I think, let me know if I missed anything). Looks good, thank you!
| request.setMethod(proxygen::HTTPMethod::PUT); | ||
| request.setURL("/v1/heartbeat"); | ||
| request.setMethod(proxygen::HTTPMethod::PATCH); | ||
| request.setURL("/v1/resource-manager/node/" + status.nodeId); |
This is the change on the worker side. PeriodicHeartbeatManager is used for multi-coordinator setups. It's instantiated in presto/presto-native-execution/presto_cpp/main/PrestoServer.cpp, lines 728 to 737 in 3020af7.
In Meta's deployment configs, heartbeat-frequency-ms is not set, so this is not actively used. CC: @gggrace14 @kewang1024
Nope. Let's merge this one as is. Thanks @tdcmeehan
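For comparison with the worker-side diff, a JVM client could build the new-style request (PATCH to /v1/resource-manager/node/{nodeId} instead of PUT to /v1/heartbeat) roughly as follows. Sketch only: the class name, base URI, and JSON body are hypothetical, and nodeId is assumed to be URL-safe.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper, not code from the PR.
final class NodeHeartbeatRequests
{
    // Builds a PATCH request to /v1/resource-manager/node/{nodeId}.
    // baseUri and jsonBody are supplied by the caller; nodeId is assumed URL-safe.
    static HttpRequest nodeHeartbeat(URI baseUri, String nodeId, String jsonBody)
    {
        return HttpRequest.newBuilder()
                .uri(baseUri.resolve("/v1/resource-manager/node/" + nodeId))
                .header("Content-Type", "application/json")
                .method("PATCH", HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }
}
```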
…communication (prestodb#26635)" This reverts commit b0b83a4.
…manager communication (prestodb#26635)"" This reverts commit fcbd39f.
…communication (prestodb#26635)" This reverts commit b0b83a4.
Description
This PR adds an HTTP server for the resource manager and allows communication to and from the resource manager over HTTP as opposed to Thrift.
Motivation and Context
This will help Presto use the resource manager in cases where using the Thrift protocol is not feasible.
Impact
Previously, there was a server on the resource manager located at /v1/heartbeat for C++ workers to send their heartbeat. This PR revamps that server and modifies the endpoint.
There was also a config, resource-manager.heartbeat-http-enabled, to enable that HTTP server. This has become resource-manager.http-server-enabled.
Test Plan
UTs, local testing.