diff --git a/presto-docs/src/main/sphinx/admin/properties.rst b/presto-docs/src/main/sphinx/admin/properties.rst index 150130a9b39fb..57c6401bc18d6 100644 --- a/presto-docs/src/main/sphinx/admin/properties.rst +++ b/presto-docs/src/main/sphinx/admin/properties.rst @@ -1509,3 +1509,36 @@ Controls behavior when a materialized view is stale and no per-view staleness co Valid values are ``FAIL`` (throw an error) or ``USE_VIEW_QUERY`` (query base tables instead). The corresponding session property is :ref:`admin/properties-session:\`\`materialized_view_stale_read_behavior\`\``. + +Resource Manager Properties +--------------------------- + +``resource-manager-enabled`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +* **Type:** ``boolean`` +* **Default value:** ``false`` + +Set to true when a resource manager exists in the cluster. + +``resource-manager.http-server-enabled`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +* **Type:** ``boolean`` +* **Default value:** ``false`` + +Controls whether the resource manager's REST server is turned on. When enabled, +nodes can communicate with the resource manager using HTTP/S. + +``internal-communication.resource-manager-communication-protocol`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +* **Type:** ``string`` +* **Allowed values:** ``THRIFT``, ``HTTP`` +* **Default value:** ``THRIFT`` + +Controls whether the node communicates with the resource manager using Thrift +or HTTP/S. HTTPS is supported using the same internal communication HTTPS +configs. + +To enable SSL/TLS, see :doc:`/security/internal-communication`. 
\ No newline at end of file diff --git a/presto-docs/src/main/sphinx/installation/deployment.rst b/presto-docs/src/main/sphinx/installation/deployment.rst index 1478b8ca2d98e..870005a5b82b7 100644 --- a/presto-docs/src/main/sphinx/installation/deployment.rst +++ b/presto-docs/src/main/sphinx/installation/deployment.rst @@ -161,6 +161,8 @@ At least one resource manager is needed for a cluster, and more can be added to discovery-server.enabled=true discovery.uri=http://example.net:8080 (Point to resource manager host/vip) thrift.server.ssl.enabled=true + resource-manager.http-server-enabled=false + internal-communication.resource-manager-communication-protocol=THRIFT * ``Coordinator`` @@ -175,6 +177,7 @@ A cluster can have a pool of coordinators. Each coordinator will run a subset of query.max-memory-per-node=1GB discovery.uri=http://example.net:8080 (Point to resource manager host/vip) resource-manager-enabled=true + internal-communication.resource-manager-communication-protocol=THRIFT * ``Worker`` @@ -188,6 +191,7 @@ A cluster can have a pool of workers. They send their heartbeats to the resource query.max-memory-per-node=1GB discovery.uri=http://example.net:8080 (Point to resource manager host/vip) resource-manager-enabled=true + internal-communication.resource-manager-communication-protocol=THRIFT These properties require some explanation: @@ -231,6 +235,15 @@ These properties require some explanation: the host and port of the Presto coordinator. This URI must not end in a slash. +* ``internal-communication.resource-manager-communication-protocol``: + The protocol used for communication with the resource manager. This + can be set to ``THRIFT`` or ``HTTP``. + +* ``resource-manager.http-server-enabled``: + Whether to enable the resource manager HTTP server or not. If + ``internal-communication.resource-manager-communication-protocol=HTTP``, this + must be set to ``true``. 
+ The following flags can help one tune the disaggregated coordinator cluster’s resource groups to the desired consistency: * ``concurrency-threshold-to-enable-resource-group-refresh (default: 1.0)`` diff --git a/presto-docs/src/main/sphinx/overview/concepts.rst b/presto-docs/src/main/sphinx/overview/concepts.rst index 828da6829dc71..e92f552eb7734 100644 --- a/presto-docs/src/main/sphinx/overview/concepts.rst +++ b/presto-docs/src/main/sphinx/overview/concepts.rst @@ -38,7 +38,7 @@ data from all coordinators and workers and constructs a global view of the clust A Presto installation with a disaggregated coordinator needs a resource manager. Clusters support multiple resource managers, each acting as a primary. -Coordinators and workers communicate with resource managers using a thrift API. +Coordinators and workers can communicate with resource managers using either a Thrift or HTTP API. Coordinator ^^^^^^^^^^^ diff --git a/presto-internal-communication/src/main/java/com/facebook/presto/server/InternalCommunicationConfig.java b/presto-internal-communication/src/main/java/com/facebook/presto/server/InternalCommunicationConfig.java index 47812dc99f384..0f8f518af7815 100644 --- a/presto-internal-communication/src/main/java/com/facebook/presto/server/InternalCommunicationConfig.java +++ b/presto-internal-communication/src/main/java/com/facebook/presto/server/InternalCommunicationConfig.java @@ -47,6 +47,7 @@ public class InternalCommunicationConfig private DataSize maxTaskUpdateSize = new DataSize(16, MEGABYTE); private CommunicationProtocol taskCommunicationProtocol = CommunicationProtocol.HTTP; private CommunicationProtocol serverInfoCommunicationProtocol = CommunicationProtocol.HTTP; + private CommunicationProtocol resourceManagerCommunicationProtocol = CommunicationProtocol.THRIFT; private boolean memoizeDeadNodesEnabled; private String sharedSecret; private long nodeStatsRefreshIntervalMillis = 1_000; @@ -352,6 +353,19 @@ public InternalCommunicationConfig 
setInternalJwtEnabled(boolean internalJwtEnab return this; } + @Config("internal-communication.resource-manager-communication-protocol") + @ConfigDescription("Protocol for internal communication with resource managers.") + public InternalCommunicationConfig setResourceManagerCommunicationProtocol(CommunicationProtocol resourceManagerCommunicationProtocol) + { + this.resourceManagerCommunicationProtocol = resourceManagerCommunicationProtocol; + return this; + } + + public CommunicationProtocol getResourceManagerCommunicationProtocol() + { + return resourceManagerCommunicationProtocol; + } + @AssertTrue(message = "When internal JWT(internal-communication.jwt.enabled) authentication is enabled, a shared secret(internal-communication.shared-secret) is required") public boolean isRequiredSharedSecretSet() { diff --git a/presto-internal-communication/src/test/java/com/facebook/presto/server/TestInternalCommunicationConfig.java b/presto-internal-communication/src/test/java/com/facebook/presto/server/TestInternalCommunicationConfig.java index de88e6b3fc877..25eb1270beefa 100644 --- a/presto-internal-communication/src/test/java/com/facebook/presto/server/TestInternalCommunicationConfig.java +++ b/presto-internal-communication/src/test/java/com/facebook/presto/server/TestInternalCommunicationConfig.java @@ -53,6 +53,7 @@ public void testDefaults() .setTaskUpdateRequestThriftSerdeEnabled(false) .setTaskInfoResponseThriftSerdeEnabled(false) .setInternalJwtEnabled(false) + .setResourceManagerCommunicationProtocol(CommunicationProtocol.THRIFT) .setNodeStatsRefreshIntervalMillis(1_000) .setNodeDiscoveryPollingIntervalMillis(5_000)); } @@ -84,6 +85,7 @@ public void testExplicitPropertyMappings() .put("experimental.internal-communication.task-info-response-thrift-serde-enabled", "true") .put("internal-communication.node-stats-refresh-interval-millis", "2000") .put("internal-communication.node-discovery-polling-interval-millis", "3000") + 
.put("internal-communication.resource-manager-communication-protocol", "HTTP") .build(); InternalCommunicationConfig expected = new InternalCommunicationConfig() @@ -106,6 +108,7 @@ public void testExplicitPropertyMappings() .setMemoizeDeadNodesEnabled(true) .setSharedSecret("secret") .setInternalJwtEnabled(true) + .setResourceManagerCommunicationProtocol(CommunicationProtocol.HTTP) .setTaskUpdateRequestThriftSerdeEnabled(true) .setTaskInfoResponseThriftSerdeEnabled(true) .setNodeStatsRefreshIntervalMillis(2000) diff --git a/presto-main-base/src/main/java/com/facebook/presto/execution/resourceGroups/ResourceGroupRuntimeInfo.java b/presto-main-base/src/main/java/com/facebook/presto/execution/resourceGroups/ResourceGroupRuntimeInfo.java index b4b7956034833..63f4aff35a280 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/execution/resourceGroups/ResourceGroupRuntimeInfo.java +++ b/presto-main-base/src/main/java/com/facebook/presto/execution/resourceGroups/ResourceGroupRuntimeInfo.java @@ -17,6 +17,8 @@ import com.facebook.drift.annotations.ThriftField; import com.facebook.drift.annotations.ThriftStruct; import com.facebook.presto.spi.resourceGroups.ResourceGroupId; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; import java.util.Optional; @@ -35,7 +37,14 @@ public class ResourceGroupRuntimeInfo private final Optional resourceGroupConfigSpec; @ThriftConstructor - public ResourceGroupRuntimeInfo(ResourceGroupId resourceGroupId, long memoryUsageBytes, int queuedQueries, int descendantQueuedQueries, int runningQueries, int descendantRunningQueries, Optional resourceGroupConfigSpec) + @JsonCreator + public ResourceGroupRuntimeInfo(@JsonProperty("resourceGroupId") ResourceGroupId resourceGroupId, + @JsonProperty("memoryUsageBytes") long memoryUsageBytes, + @JsonProperty("queuedQueries") int queuedQueries, + @JsonProperty("descendantQueuedQueries") int descendantQueuedQueries, + 
@JsonProperty("runningQueries") int runningQueries, + @JsonProperty("descendantRunningQueries") int descendantRunningQueries, + @JsonProperty("resourceGroupConfigSpec") Optional resourceGroupConfigSpec) { this.resourceGroupId = requireNonNull(resourceGroupId, "resourceGroupId is null"); this.memoryUsageBytes = memoryUsageBytes; @@ -52,42 +61,49 @@ public static Builder builder(ResourceGroupId resourceGroupId) } @ThriftField(1) + @JsonProperty public ResourceGroupId getResourceGroupId() { return resourceGroupId; } @ThriftField(2) + @JsonProperty public long getMemoryUsageBytes() { return memoryUsageBytes; } @ThriftField(3) + @JsonProperty public int getQueuedQueries() { return queuedQueries; } @ThriftField(4) + @JsonProperty public int getDescendantQueuedQueries() { return descendantQueuedQueries; } @ThriftField(5) + @JsonProperty public int getRunningQueries() { return runningQueries; } @ThriftField(6) + @JsonProperty public int getDescendantRunningQueries() { return descendantRunningQueries; } @ThriftField(7) + @JsonProperty public Optional getResourceGroupConfigSpec() { return resourceGroupConfigSpec; diff --git a/presto-main-base/src/main/java/com/facebook/presto/resourcemanager/ClusterMemoryManagerService.java b/presto-main-base/src/main/java/com/facebook/presto/resourcemanager/ClusterMemoryManagerService.java index 51eb8a89941b5..e2291644ad35e 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/resourcemanager/ClusterMemoryManagerService.java +++ b/presto-main-base/src/main/java/com/facebook/presto/resourcemanager/ClusterMemoryManagerService.java @@ -15,6 +15,7 @@ import com.facebook.drift.client.DriftClient; import com.facebook.presto.memory.NodeMemoryConfig; +import com.facebook.presto.server.InternalCommunicationConfig; import com.facebook.presto.spi.memory.ClusterMemoryPoolInfo; import com.facebook.presto.spi.memory.MemoryPoolId; import com.facebook.presto.spi.memory.MemoryPoolInfo; @@ -25,11 +26,14 @@ import jakarta.inject.Inject; import 
java.util.Map; +import java.util.Optional; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import static com.facebook.presto.memory.LocalMemoryManager.GENERAL_POOL; import static com.facebook.presto.memory.LocalMemoryManager.RESERVED_POOL; +import static com.facebook.presto.server.InternalCommunicationConfig.CommunicationProtocol.THRIFT; import static java.util.Objects.requireNonNull; public class ClusterMemoryManagerService @@ -39,19 +43,24 @@ public class ClusterMemoryManagerService 0, 0); - private final DriftClient resourceManagerClient; + private final DriftClient resourceManagerClient; + private final ResourceManagerClient httpResourceManagerClient; private final ScheduledExecutorService executorService; private final AtomicReference> memoryPools; private final long memoryPoolFetchIntervalMillis; private final boolean isReservedPoolEnabled; private final PeriodicTaskExecutor memoryPoolUpdater; + private final InternalCommunicationConfig internalCommunicationConfig; + private final AtomicBoolean inFlight = new AtomicBoolean(false); @Inject public ClusterMemoryManagerService( - @ForResourceManager DriftClient resourceManagerClient, + @ForResourceManager DriftClient resourceManagerClient, + ResourceManagerClient httpResourceManagerClient, @ForResourceManager ScheduledExecutorService executorService, ResourceManagerConfig resourceManagerConfig, - NodeMemoryConfig nodeMemoryConfig) + NodeMemoryConfig nodeMemoryConfig, + InternalCommunicationConfig internalCommunicationConfig) { this.resourceManagerClient = requireNonNull(resourceManagerClient, "resourceManagerClient is null"); this.executorService = requireNonNull(executorService, "executorService is null"); @@ -64,7 +73,19 @@ public ClusterMemoryManagerService( defaultPoolBuilder.put(RESERVED_POOL, EMPTY_MEMORY_POOL); } this.memoryPools = new AtomicReference<>(defaultPoolBuilder.build()); - 
this.memoryPoolUpdater = new PeriodicTaskExecutor(memoryPoolFetchIntervalMillis, executorService, () -> memoryPools.set(updateMemoryPoolInfo())); + this.memoryPoolUpdater = new PeriodicTaskExecutor(memoryPoolFetchIntervalMillis, executorService, () -> { + if (!inFlight.compareAndSet(false, true)) { + return; + } + try { + memoryPools.set(updateMemoryPoolInfo()); + } + finally { + inFlight.set(false); + } + }); + this.httpResourceManagerClient = requireNonNull(httpResourceManagerClient, "httpResourceManagerClient is null"); + this.internalCommunicationConfig = requireNonNull(internalCommunicationConfig, "internalCommunicationConfig is null"); } @PostConstruct @@ -86,7 +107,13 @@ public Map getMemoryPoolInfo() private Map updateMemoryPoolInfo() { - Map memoryPoolInfos = resourceManagerClient.get().getMemoryPoolInfo(); + Map memoryPoolInfos; + if (internalCommunicationConfig.getResourceManagerCommunicationProtocol() == THRIFT) { + memoryPoolInfos = resourceManagerClient.get().getMemoryPoolInfo(); + } + else { + memoryPoolInfos = httpResourceManagerClient.getMemoryPoolInfo(Optional.empty()); + } memoryPoolInfos.putIfAbsent(GENERAL_POOL, EMPTY_MEMORY_POOL); if (isReservedPoolEnabled) { memoryPoolInfos.putIfAbsent(RESERVED_POOL, EMPTY_MEMORY_POOL); diff --git a/presto-main-base/src/main/java/com/facebook/presto/resourcemanager/ClusterQueryTrackerService.java b/presto-main-base/src/main/java/com/facebook/presto/resourcemanager/ClusterQueryTrackerService.java index 8e8fa9d4396b5..b40d389b7e83a 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/resourcemanager/ClusterQueryTrackerService.java +++ b/presto-main-base/src/main/java/com/facebook/presto/resourcemanager/ClusterQueryTrackerService.java @@ -14,35 +14,46 @@ package com.facebook.presto.resourcemanager; import com.facebook.drift.client.DriftClient; +import com.facebook.presto.server.InternalCommunicationConfig; import com.facebook.presto.util.PeriodicTaskExecutor; import jakarta.annotation.PostConstruct; 
import jakarta.annotation.PreDestroy; import jakarta.inject.Inject; +import java.util.Optional; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import static com.facebook.presto.server.InternalCommunicationConfig.CommunicationProtocol.THRIFT; import static java.util.Objects.requireNonNull; public class ClusterQueryTrackerService { - private final DriftClient resourceManagerClient; + private final DriftClient resourceManagerClient; private final ScheduledExecutorService executorService; private final long runningTaskCountFetchIntervalMillis; private AtomicInteger runningTaskCount; private final PeriodicTaskExecutor runningTaskCountUpdater; + private final ResourceManagerClient httpResourceManagerClient; + private final InternalCommunicationConfig internalCommunicationConfig; + private final AtomicBoolean inFlight = new AtomicBoolean(false); @Inject public ClusterQueryTrackerService( - @ForResourceManager DriftClient resourceManagerClient, + @ForResourceManager DriftClient resourceManagerClient, + ResourceManagerClient httpResourceManagerClient, @ForResourceManager ScheduledExecutorService executorService, - ResourceManagerConfig resourceManagerConfig) + ResourceManagerConfig resourceManagerConfig, + InternalCommunicationConfig internalCommunicationConfig) { this.resourceManagerClient = requireNonNull(resourceManagerClient, "resourceManagerClient is null"); this.executorService = requireNonNull(executorService, "executorService is null"); this.runningTaskCountFetchIntervalMillis = requireNonNull(resourceManagerConfig, "resourceManagerConfig is null").getRunningTaskCountFetchInterval().toMillis(); this.runningTaskCount = new AtomicInteger(0); this.runningTaskCountUpdater = new PeriodicTaskExecutor(runningTaskCountFetchIntervalMillis, executorService, () -> updateRunningTaskCount()); + this.httpResourceManagerClient = requireNonNull(httpResourceManagerClient, 
"httpResourceManagerClient is null"); + this.internalCommunicationConfig = requireNonNull(internalCommunicationConfig, "internalCommunicationConfig is null"); } @PostConstruct @@ -64,6 +75,19 @@ public int getRunningTaskCount() private void updateRunningTaskCount() { - this.runningTaskCount.set(resourceManagerClient.get().getRunningTaskCount()); + if (internalCommunicationConfig.getResourceManagerCommunicationProtocol() == THRIFT) { + this.runningTaskCount.set(resourceManagerClient.get().getRunningTaskCount()); + } + else { + if (!inFlight.compareAndSet(false, true)) { + return; + } + try { + this.runningTaskCount.set(httpResourceManagerClient.getRunningTaskCount(Optional.empty())); + } + finally { + inFlight.set(false); + } + } } } diff --git a/presto-main-base/src/main/java/com/facebook/presto/resourcemanager/NoOpHttpResourceManagerClient.java b/presto-main-base/src/main/java/com/facebook/presto/resourcemanager/NoOpHttpResourceManagerClient.java new file mode 100644 index 0000000000000..92933f781536d --- /dev/null +++ b/presto-main-base/src/main/java/com/facebook/presto/resourcemanager/NoOpHttpResourceManagerClient.java @@ -0,0 +1,59 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.resourcemanager; + +import com.facebook.presto.execution.resourceGroups.ResourceGroupRuntimeInfo; +import com.facebook.presto.server.BasicQueryInfo; +import com.facebook.presto.server.NodeStatus; +import com.facebook.presto.spi.memory.ClusterMemoryPoolInfo; +import com.facebook.presto.spi.memory.MemoryPoolId; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +import java.net.URI; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +/** {@link ResourceManagerClient} stub: heartbeat calls do nothing and every query method returns an empty or zero result. */ public class NoOpHttpResourceManagerClient + implements ResourceManagerClient +{ + /** Discards the query heartbeat; none of the arguments are read. */ @Override + public void queryHeartbeat(Optional target, String internalNode, BasicQueryInfo basicQueryInfo, long sequenceId) {} + + /** Always returns an empty immutable list; never actually throws the declared exception. */ @Override + public List getResourceGroupInfo(Optional target, String excludingNode) throws ResourceManagerInconsistentException + { + return ImmutableList.of(); + } + + /** Discards the node heartbeat; none of the arguments are read. */ @Override + public void nodeHeartbeat(Optional target, NodeStatus nodeStatus) {} + + /** Always returns an empty immutable map. NOTE(review): ClusterMemoryManagerService.updateMemoryPoolInfo calls putIfAbsent on the map returned by the HTTP client, which an ImmutableMap rejects with UnsupportedOperationException; confirm this stub is never reached on that path, or return a mutable map instead. */ @Override + public Map getMemoryPoolInfo(Optional target) + { + return ImmutableMap.of(); + } + + /** Discards the resource group runtime heartbeat; none of the arguments are read. */ @Override + public void resourceGroupRuntimeHeartbeat(Optional target, String node, List resourceGroupRuntimeInfo) + {} + + /** Always reports zero running tasks. */ @Override + public int getRunningTaskCount(Optional target) + { + return 0; + } +} diff --git a/presto-main-base/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerClient.java b/presto-main-base/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerClient.java index e0ada85b8187a..583d93d4d629e 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerClient.java +++ b/presto-main-base/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerClient.java @@ -13,36 +13,29 @@ */ package com.facebook.presto.resourcemanager; -import com.facebook.drift.annotations.ThriftMethod; -import com.facebook.drift.annotations.ThriftService; import 
com.facebook.presto.execution.resourceGroups.ResourceGroupRuntimeInfo; import com.facebook.presto.server.BasicQueryInfo; import com.facebook.presto.server.NodeStatus; import com.facebook.presto.spi.memory.ClusterMemoryPoolInfo; import com.facebook.presto.spi.memory.MemoryPoolId; +import java.net.URI; import java.util.List; import java.util.Map; +import java.util.Optional; -@ThriftService("PrestoResourceManager") public interface ResourceManagerClient { - @ThriftMethod - void queryHeartbeat(String internalNode, BasicQueryInfo basicQueryInfo, long sequenceId); + void queryHeartbeat(Optional target, String internalNode, BasicQueryInfo basicQueryInfo, long sequenceId); - @ThriftMethod - List getResourceGroupInfo(String excludingNode) + List getResourceGroupInfo(Optional target, String excludingNode) throws ResourceManagerInconsistentException; - @ThriftMethod - void nodeHeartbeat(NodeStatus nodeStatus); + void nodeHeartbeat(Optional target, NodeStatus nodeStatus); - @ThriftMethod - Map getMemoryPoolInfo(); + Map getMemoryPoolInfo(Optional target); - @ThriftMethod - void resourceGroupRuntimeHeartbeat(String node, List resourceGroupRuntimeInfo); + void resourceGroupRuntimeHeartbeat(Optional target, String node, List resourceGroupRuntimeInfo); - @ThriftMethod - int getRunningTaskCount(); + int getRunningTaskCount(Optional target); } diff --git a/presto-main-base/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerClusterStateProvider.java b/presto-main-base/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerClusterStateProvider.java index 828b1a2f71b05..ac5fd916a55f3 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerClusterStateProvider.java +++ b/presto-main-base/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerClusterStateProvider.java @@ -28,6 +28,7 @@ import com.facebook.presto.spi.memory.ClusterMemoryPoolInfo; import com.facebook.presto.spi.memory.MemoryPoolId; import 
com.facebook.presto.spi.resourceGroups.ResourceGroupId; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -585,4 +586,10 @@ private static boolean isQueryCompleted(Query query) { return query.getBasicQueryInfo().getState().isDone(); } + + @VisibleForTesting + protected Map getResourceGroupStates() + { + return resourceGroupStates; + } } diff --git a/presto-main-base/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerConfig.java b/presto-main-base/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerConfig.java index b8e5793873a95..17afdd4216228 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerConfig.java +++ b/presto-main-base/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerConfig.java @@ -42,7 +42,8 @@ public class ResourceManagerConfig private Duration memoryPoolFetchInterval = new Duration(1, SECONDS); private boolean resourceGroupServiceCacheEnabled; private Duration resourceGroupServiceCacheExpireInterval = new Duration(10, SECONDS); - private boolean heartbeatHttpEnabled; + private boolean httpServerEnabled; + private Duration resourceGroupServiceCacheRefreshInterval = new Duration(1, SECONDS); private Duration runningTaskCountFetchInterval = new Duration(1, SECONDS); @@ -279,15 +280,16 @@ public ResourceManagerConfig setRunningTaskCountFetchInterval(Duration runningTa return this; } - public boolean getHeartbeatHttpEnabled() + @Config("resource-manager.http-server-enabled") + @ConfigDescription("Enable HTTP REST endpoints on the resource manager for internal communication.") + public ResourceManagerConfig setHttpServerEnabled(boolean httpEnabled) { - return heartbeatHttpEnabled; + this.httpServerEnabled = httpEnabled; + return this; } - @Config("resource-manager.heartbeat-http-enabled") - public ResourceManagerConfig 
setHeartbeatHttpEnabled(boolean heartbeatHttpEnabled) + public boolean getHttpServerEnabled() { - this.heartbeatHttpEnabled = heartbeatHttpEnabled; - return this; + return httpServerEnabled; } } diff --git a/presto-main-base/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerException.java b/presto-main-base/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerException.java new file mode 100644 index 0000000000000..b42de0113d0a5 --- /dev/null +++ b/presto-main-base/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerException.java @@ -0,0 +1,37 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.resourcemanager; + +import com.facebook.presto.spi.PrestoException; + +import static com.facebook.presto.spi.StandardErrorCode.RESOURCE_MANAGER_ERROR; + +/** {@link PrestoException} subclass whose constructors always pass the RESOURCE_MANAGER_ERROR error code to the superclass. */ public class ResourceManagerException + extends PrestoException +{ + /** Creates the exception from a message only. */ public ResourceManagerException(String message) + { + super(RESOURCE_MANAGER_ERROR, message); + } + + /** Creates the exception from a message and the underlying cause. */ public ResourceManagerException(String message, Throwable throwable) + { + super(RESOURCE_MANAGER_ERROR, message, throwable); + } + + /** Creates the exception from the underlying cause only. */ public ResourceManagerException(Throwable throwable) + { + super(RESOURCE_MANAGER_ERROR, throwable); + } +} diff --git a/presto-main-base/src/main/java/com/facebook/presto/resourcemanager/thrift/ResourceManagerClient.java b/presto-main-base/src/main/java/com/facebook/presto/resourcemanager/thrift/ResourceManagerClient.java new file mode 100644 index 0000000000000..9e42173364a3a --- /dev/null +++ b/presto-main-base/src/main/java/com/facebook/presto/resourcemanager/thrift/ResourceManagerClient.java @@ -0,0 +1,50 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.resourcemanager.thrift; + +import com.facebook.drift.annotations.ThriftMethod; +import com.facebook.drift.annotations.ThriftService; +import com.facebook.presto.execution.resourceGroups.ResourceGroupRuntimeInfo; +import com.facebook.presto.resourcemanager.ResourceManagerInconsistentException; +import com.facebook.presto.server.BasicQueryInfo; +import com.facebook.presto.server.NodeStatus; +import com.facebook.presto.spi.memory.ClusterMemoryPoolInfo; +import com.facebook.presto.spi.memory.MemoryPoolId; + +import java.util.List; +import java.util.Map; + +/** Thrift service interface declared under the service name "PrestoResourceManager"; every method is exposed as a Thrift method. NOTE(review): the interface is marked Deprecated without naming a replacement; presumably com.facebook.presto.resourcemanager.ResourceManagerClient supersedes it. Confirm and add an explanatory deprecation tag. */ @ThriftService("PrestoResourceManager") +@Deprecated +public interface ResourceManagerClient +{ + /** Sends a query heartbeat carrying the reporting node, the query info and a sequence id. */ @ThriftMethod + void queryHeartbeat(String internalNode, BasicQueryInfo basicQueryInfo, long sequenceId); + + /** Fetches resource group information; presumably excludes entries reported by excludingNode (verify against the server implementation). */ @ThriftMethod + List getResourceGroupInfo(String excludingNode) + throws ResourceManagerInconsistentException; + + /** Sends a node heartbeat carrying the given node status. */ @ThriftMethod + void nodeHeartbeat(NodeStatus nodeStatus); + + /** Fetches the memory pool information map. */ @ThriftMethod + Map getMemoryPoolInfo(); + + /** Sends the resource group runtime information reported by the given node. */ @ThriftMethod + void resourceGroupRuntimeHeartbeat(String node, List resourceGroupRuntimeInfo); + + /** Fetches the running task count. */ @ThriftMethod + int getRunningTaskCount(); +} diff --git a/presto-main-base/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerServer.java b/presto-main-base/src/main/java/com/facebook/presto/resourcemanager/thrift/ResourceManagerServer.java similarity index 93% rename from presto-main-base/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerServer.java rename to presto-main-base/src/main/java/com/facebook/presto/resourcemanager/thrift/ResourceManagerServer.java index e08702ad1ca69..819995b6b8c6e 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerServer.java +++ b/presto-main-base/src/main/java/com/facebook/presto/resourcemanager/thrift/ResourceManagerServer.java @@ -11,12 +11,15 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package com.facebook.presto.resourcemanager; +package com.facebook.presto.resourcemanager.thrift; import com.facebook.drift.annotations.ThriftException; import com.facebook.drift.annotations.ThriftMethod; import com.facebook.drift.annotations.ThriftService; import com.facebook.presto.execution.resourceGroups.ResourceGroupRuntimeInfo; +import com.facebook.presto.resourcemanager.ForResourceManager; +import com.facebook.presto.resourcemanager.ResourceManagerClusterStateProvider; +import com.facebook.presto.resourcemanager.ResourceManagerInconsistentException; import com.facebook.presto.server.BasicQueryInfo; import com.facebook.presto.server.NodeStatus; import com.facebook.presto.spi.memory.ClusterMemoryPoolInfo; diff --git a/presto-main-base/src/main/java/com/facebook/presto/util/RetryRunner.java b/presto-main-base/src/main/java/com/facebook/presto/util/RetryRunner.java new file mode 100644 index 0000000000000..fa9f2240f1ffb --- /dev/null +++ b/presto-main-base/src/main/java/com/facebook/presto/util/RetryRunner.java @@ -0,0 +1,175 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.util; + +import com.facebook.airlift.units.Duration; +import com.facebook.presto.spi.PrestoException; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.io.UncheckedIOException; +import java.net.SocketTimeoutException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.function.Supplier; + +import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; +import static com.google.common.base.Throwables.throwIfInstanceOf; +import static com.google.common.base.Throwables.throwIfUnchecked; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.MINUTES; +import static java.util.concurrent.TimeUnit.NANOSECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; + +/** Runs a supplier on a scheduled executor and retries it with exponential backoff when it fails with an I/O error. Only calls flagged as idempotent are ever retried. The calling thread blocks until the final outcome is known. */ public final class RetryRunner +{ + private final ScheduledExecutorService executor; + private final int maxRetries; + private final Duration minBackoff; + private final Duration maxBackoff; + private final double scale; + private final Duration maxRetryTime; + + /** Uses the defaults: at most 5 retries, backoff starting at 100 ms and doubling up to 30 s, and a 1 minute overall retry budget. */ public RetryRunner(ScheduledExecutorService executor) + { + this(executor, 5, new Duration(100, MILLISECONDS), new Duration(30, SECONDS), 2.0, + new Duration(1, MINUTES)); + } + + /** @param executor runs every attempt and schedules the delayed retries; @param maxRetries maximum number of failed attempts that are followed by a retry; @param minBackoff delay before the first retry; @param maxBackoff upper bound for a single delay; @param scale multiplier applied to the delay after each failure; @param maxRetryTime overall time budget measured from the start of runWithRetry. */ public RetryRunner( + ScheduledExecutorService executor, + int maxRetries, + Duration minBackoff, + Duration maxBackoff, + double scale, + Duration maxRetryTime) + { + this.executor = executor; + this.maxRetries = maxRetries; + this.minBackoff = minBackoff; + this.maxBackoff = maxBackoff; + this.scale = scale; + this.maxRetryTime = maxRetryTime; + } + + /** Executes the attempt on the executor and blocks until it succeeds, fails with a non-retryable error, or exhausts the retry count or time budget. Failures surface as PrestoException or as the original unchecked exception; checked causes are wrapped in a GENERIC_INTERNAL_ERROR PrestoException. NOTE(review): because this blocks in result.get() while the attempt runs on the executor, calling it from a thread of that same executor presumably deadlocks when the pool has a single thread; confirm callers use a different thread. */ public T runWithRetry(Supplier attempt, boolean idempotent) + { + CompletableFuture result = new CompletableFuture<>(); + long startNanos = System.nanoTime(); + long maxRetryNanos = maxRetryTime.roundTo(NANOSECONDS); + + /* One State instance drives all attempts of this call; it is re-scheduled after every retryable failure. */ class State + implements Runnable + { + private int 
failures; + + @Override + public void run() + { + if (result.isDone()) { + return; + } + + /* A rejected submission (for example after executor shutdown) must complete the future; otherwise a retry hop would be swallowed by the executor and the caller would block forever. */ try { + executor.submit(() -> { + if (result.isDone()) { + return; + } + try { + result.complete(attempt.get()); + } + catch (Throwable t) { + if (!shouldRetry(t, idempotent)) { + result.completeExceptionally(t); + return; + } + if (failures >= maxRetries) { + result.completeExceptionally(t); + return; + } + + long elapsed = System.nanoTime() - startNanos; + if (elapsed >= maxRetryNanos) { + result.completeExceptionally(t); + return; + } + + failures++; + + /* Never sleep past the overall retry budget. */ Duration backoff = backoffDelay(failures); + long remainingNanos = maxRetryNanos - elapsed; + long delayMs = Math.min(backoff.toMillis(), NANOSECONDS.toMillis(remainingNanos)); + + try { + executor.schedule(this, delayMs, MILLISECONDS); + } + catch (RuntimeException rejected) { + /* The retry could not be scheduled: report the attempt failure and keep the scheduling error attached to it. */ t.addSuppressed(rejected); + result.completeExceptionally(t); + } + } + }); + } + catch (RuntimeException rejected) { + result.completeExceptionally(rejected); + } + } + + /* Delay before the given retry: minBackoff multiplied by scale once per earlier failure, capped at maxBackoff. */ private Duration backoffDelay(int failureCount) + { + double scaled = minBackoff.toMillis() * Math.pow(scale, failureCount - 1); + long delayMs = (long) Math.min(scaled, maxBackoff.toMillis()); + return new Duration(delayMs, MILLISECONDS); + } + } + + new State().run(); + + try { + return result.get(); + } + catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new PrestoException(GENERIC_INTERNAL_ERROR, ie); + } + catch (ExecutionException e) { + throwIfInstanceOf(e.getCause(), PrestoException.class); + throwIfUnchecked(e.getCause()); + throw new PrestoException(GENERIC_INTERNAL_ERROR, e); + } + } + + /** Decides whether a failed attempt may be repeated: only idempotent calls that failed with an IOException, optionally wrapped in UncheckedIOException, are retried. A socket timeout is retried, while any other InterruptedIOException and any InterruptedException are not. */ private static boolean shouldRetry(Throwable t, boolean idempotent) + { + if (t instanceof InterruptedException) { + Thread.currentThread().interrupt(); + return false; + } + + if (t instanceof UncheckedIOException && t.getCause() != null) { + t = t.getCause(); + } + + /* SocketTimeoutException extends InterruptedIOException, so it has to be checked first. */ if (idempotent && t instanceof SocketTimeoutException) { + return true; + } + + if (t instanceof InterruptedIOException) { + return false; + } + + return idempotent && t instanceof IOException; + } +} diff --git 
a/presto-main-base/src/test/java/com/facebook/presto/execution/TestQueryTrackerHighTaskCountKill.java b/presto-main-base/src/test/java/com/facebook/presto/execution/TestQueryTrackerHighTaskCountKill.java index d6510c0d76bf3..b056f7879d881 100644 --- a/presto-main-base/src/test/java/com/facebook/presto/execution/TestQueryTrackerHighTaskCountKill.java +++ b/presto-main-base/src/test/java/com/facebook/presto/execution/TestQueryTrackerHighTaskCountKill.java @@ -14,10 +14,12 @@ package com.facebook.presto.execution; import com.facebook.presto.resourcemanager.ClusterQueryTrackerService; -import com.facebook.presto.resourcemanager.ResourceManagerClient; +import com.facebook.presto.resourcemanager.NoOpHttpResourceManagerClient; import com.facebook.presto.resourcemanager.ResourceManagerConfig; import com.facebook.presto.resourcemanager.TestingClusterQueryTrackerService; import com.facebook.presto.resourcemanager.TestingResourceManagerClient; +import com.facebook.presto.resourcemanager.thrift.ResourceManagerClient; +import com.facebook.presto.server.InternalCommunicationConfig; import com.facebook.presto.spi.PrestoException; import org.testng.annotations.Test; @@ -79,7 +81,7 @@ public void testLargeQueryKilledDueToTaskCount_withClusterQueryTracker() .setMaxTotalRunningTaskCountToKillQuery(200); ScheduledExecutorService scheduledExecutorService = newSingleThreadScheduledExecutor(); ResourceManagerClient resourceManagerClient = new TestingResourceManagerClient(); - ClusterQueryTrackerService clusterQueryTrackerService = new TestingClusterQueryTrackerService((addressSelectionContext, headers) -> resourceManagerClient, newSingleThreadScheduledExecutor(), new ResourceManagerConfig(), 201); + ClusterQueryTrackerService clusterQueryTrackerService = new TestingClusterQueryTrackerService((addressSelectionContext, headers) -> resourceManagerClient, new NoOpHttpResourceManagerClient(), newSingleThreadScheduledExecutor(), new ResourceManagerConfig(), new InternalCommunicationConfig(), 
201); try { QueryTracker queryTracker = new QueryTracker<>(config, scheduledExecutorService, Optional.of(clusterQueryTrackerService)); MockQueryExecution smallQuery = MockQueryExecution.withRunningTaskCount(50); diff --git a/presto-main-base/src/test/java/com/facebook/presto/resourcemanager/TestResourceManagerClusterStateProvider.java b/presto-main-base/src/test/java/com/facebook/presto/resourcemanager/TestResourceManagerClusterStateProvider.java index 4afefed16b9a2..7b521bb848b8a 100644 --- a/presto-main-base/src/test/java/com/facebook/presto/resourcemanager/TestResourceManagerClusterStateProvider.java +++ b/presto-main-base/src/test/java/com/facebook/presto/resourcemanager/TestResourceManagerClusterStateProvider.java @@ -724,7 +724,7 @@ private void assertMemoryPoolMap(ResourceManagerClusterStateProvider provider, i assertEquals(clusterMemoryPoolInfo.getLargestMemoryQuery().map(QueryId::getId), largestMemoryQuery); } - private static BasicQueryInfo createQueryInfo(String queryId, QueryState state) + public static BasicQueryInfo createQueryInfo(String queryId, QueryState state) { return createQueryInfo(queryId, state, "global", GENERAL_POOL); } diff --git a/presto-main-base/src/test/java/com/facebook/presto/resourcemanager/TestResourceManagerConfig.java b/presto-main-base/src/test/java/com/facebook/presto/resourcemanager/TestResourceManagerConfig.java index 48c47a24731b6..81d917127e326 100644 --- a/presto-main-base/src/test/java/com/facebook/presto/resourcemanager/TestResourceManagerConfig.java +++ b/presto-main-base/src/test/java/com/facebook/presto/resourcemanager/TestResourceManagerConfig.java @@ -50,7 +50,7 @@ public void testDefaults() .setResourceGroupRuntimeHeartbeatInterval(new Duration(1, SECONDS)) .setRunningTaskCountFetchInterval(new Duration(1, SECONDS)) .setResourceGroupRuntimeInfoTimeout(new Duration(30, SECONDS)) - .setHeartbeatHttpEnabled(false)); + .setHttpServerEnabled(false)); } @Test @@ -75,7 +75,7 @@ public void testExplicitPropertyMappings() 
.put("resource-manager.resource-group-runtimeinfo-heartbeat-interval", "6m") .put("resource-manager.running-task-count-fetch-interval", "1m") .put("resource-manager.resource-group-runtimeinfo-timeout", "4s") - .put("resource-manager.heartbeat-http-enabled", "true") + .put("resource-manager.http-server-enabled", "true") .build(); ResourceManagerConfig expected = new ResourceManagerConfig() @@ -97,7 +97,7 @@ public void testExplicitPropertyMappings() .setResourceGroupRuntimeHeartbeatInterval(new Duration(6, MINUTES)) .setResourceGroupRuntimeInfoTimeout(new Duration(4, SECONDS)) .setRunningTaskCountFetchInterval(new Duration(1, MINUTES)) - .setHeartbeatHttpEnabled(true); + .setHttpServerEnabled(true); assertFullMapping(properties, expected); } diff --git a/presto-main-base/src/test/java/com/facebook/presto/resourcemanager/TestingClusterQueryTrackerService.java b/presto-main-base/src/test/java/com/facebook/presto/resourcemanager/TestingClusterQueryTrackerService.java index a58bf8b29bd9f..4ad70b19d5d1f 100644 --- a/presto-main-base/src/test/java/com/facebook/presto/resourcemanager/TestingClusterQueryTrackerService.java +++ b/presto-main-base/src/test/java/com/facebook/presto/resourcemanager/TestingClusterQueryTrackerService.java @@ -14,18 +14,19 @@ package com.facebook.presto.resourcemanager; import com.facebook.drift.client.DriftClient; +import com.facebook.presto.server.InternalCommunicationConfig; import java.util.concurrent.ScheduledExecutorService; public class TestingClusterQueryTrackerService extends ClusterQueryTrackerService { - DriftClient resourceManagerClient; + DriftClient resourceManagerClient; int runningTaskCount; - public TestingClusterQueryTrackerService(DriftClient resourceManagerClient, ScheduledExecutorService executorService, ResourceManagerConfig resourceManagerConfig, int runningTaskCount) + public TestingClusterQueryTrackerService(DriftClient resourceManagerClient, ResourceManagerClient httpResourceManagerClient, ScheduledExecutorService 
executorService, ResourceManagerConfig resourceManagerConfig, InternalCommunicationConfig internalCommunicationConfig, int runningTaskCount) { - super(resourceManagerClient, executorService, resourceManagerConfig); + super(resourceManagerClient, httpResourceManagerClient, executorService, resourceManagerConfig, internalCommunicationConfig); this.runningTaskCount = runningTaskCount; } diff --git a/presto-main-base/src/test/java/com/facebook/presto/resourcemanager/TestingResourceManagerClient.java b/presto-main-base/src/test/java/com/facebook/presto/resourcemanager/TestingResourceManagerClient.java index 282a499a6dbbd..d21b9bf758156 100644 --- a/presto-main-base/src/test/java/com/facebook/presto/resourcemanager/TestingResourceManagerClient.java +++ b/presto-main-base/src/test/java/com/facebook/presto/resourcemanager/TestingResourceManagerClient.java @@ -14,6 +14,7 @@ package com.facebook.presto.resourcemanager; import com.facebook.presto.execution.resourceGroups.ResourceGroupRuntimeInfo; +import com.facebook.presto.resourcemanager.thrift.ResourceManagerClient; import com.facebook.presto.server.BasicQueryInfo; import com.facebook.presto.server.NodeStatus; import com.facebook.presto.spi.memory.ClusterMemoryPoolInfo; diff --git a/presto-main/etc/config.properties b/presto-main/etc/config.properties index 0ee18a1e4df0f..133fc590d2279 100644 --- a/presto-main/etc/config.properties +++ b/presto-main/etc/config.properties @@ -6,9 +6,11 @@ # # sample nodeId to provide consistency across test runs -node.id=ffffffff-ffff-ffff-ffff-ffffffffffff +node.id=ffffffff-ffff-ffff-ffff-ffffffffffrm node.environment=test http-server.http.port=8080 +coordinator=false +resource-manager=true discovery-server.enabled=true discovery.uri=http://localhost:8080 @@ -18,11 +20,6 @@ exchange.http-client.max-connections-per-server=1000 exchange.http-client.connect-timeout=1m exchange.http-client.idle-timeout=1m -scheduler.http-client.max-connections=1000 
-scheduler.http-client.max-connections-per-server=1000 -scheduler.http-client.connect-timeout=1m -scheduler.http-client.idle-timeout=1m - query.client.timeout=5m query.min-expire-age=30m @@ -56,3 +53,5 @@ plugin.bundles=\ presto.version=testversion node-scheduler.include-coordinator=true +resource-manager-enabled=true +internal-communication.resource-manager-communication-protocol=HTTP diff --git a/presto-main/src/main/java/com/facebook/presto/resourcemanager/HttpResourceManagerClient.java b/presto-main/src/main/java/com/facebook/presto/resourcemanager/HttpResourceManagerClient.java new file mode 100644 index 0000000000000..f3717c91f8d65 --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/resourcemanager/HttpResourceManagerClient.java @@ -0,0 +1,212 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.resourcemanager; + +import com.facebook.airlift.http.client.HttpClient; +import com.facebook.airlift.http.client.HttpUriBuilder; +import com.facebook.airlift.http.client.Request; +import com.facebook.airlift.json.JsonCodec; +import com.facebook.airlift.log.Logger; +import com.facebook.presto.execution.resourceGroups.ResourceGroupRuntimeInfo; +import com.facebook.presto.metadata.InternalNode; +import com.facebook.presto.metadata.InternalNodeManager; +import com.facebook.presto.server.BasicQueryInfo; +import com.facebook.presto.server.NodeStatus; +import com.facebook.presto.spi.memory.ClusterMemoryPoolInfo; +import com.facebook.presto.spi.memory.MemoryPoolId; +import com.facebook.presto.util.RetryRunner; +import jakarta.inject.Inject; + +import java.net.URI; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadLocalRandom; + +import static com.facebook.airlift.http.client.HttpUriBuilder.uriBuilderFrom; +import static com.facebook.airlift.http.client.JsonResponseHandler.createJsonResponseHandler; +import static com.facebook.airlift.http.client.Request.Builder.prepareGet; +import static com.facebook.airlift.http.client.StaticBodyGenerator.createStaticBodyGenerator; +import static com.facebook.airlift.http.client.StatusResponseHandler.createStatusResponseHandler; +import static com.google.common.collect.ImmutableList.toImmutableList; +import static jakarta.ws.rs.core.MediaType.APPLICATION_JSON; +import static java.util.Objects.requireNonNull; + +/** + * HTTP-based implementation of ResourceManagerClient. + * Communicates with Resource Manager via HTTP REST endpoints. 
+ */ +public class HttpResourceManagerClient + implements ResourceManagerClient +{ + private static final Logger log = Logger.get(HttpResourceManagerClient.class); + + private final HttpClient httpClient; + private final JsonCodec nodeStatusCodec; + private final JsonCodec basicQueryInfoCodec; + private final JsonCodec> resourceGroupRuntimeInfoListCodec; + private final JsonCodec> memoryPoolInfoCodec; + private final JsonCodec integerCodec; + private final InternalNodeManager internalNodeManager; + private final RetryRunner retryRunner; + + @Inject + public HttpResourceManagerClient( + @ForResourceManager HttpClient httpClient, + @ForResourceManager ScheduledExecutorService executor, + JsonCodec nodeStatusCodec, + JsonCodec basicQueryInfoCodec, + JsonCodec> resourceGroupRuntimeInfoListCodec, + JsonCodec> memoryPoolInfoCodec, + InternalNodeManager internalNodeManager) + { + this.httpClient = requireNonNull(httpClient, "httpClient is null"); + this.nodeStatusCodec = requireNonNull(nodeStatusCodec, "nodeStatusCodec is null"); + this.basicQueryInfoCodec = requireNonNull(basicQueryInfoCodec, "basicQueryInfoCodec is null"); + this.resourceGroupRuntimeInfoListCodec = requireNonNull(resourceGroupRuntimeInfoListCodec, "resourceGroupRuntimeInfoListCodec is null"); + this.memoryPoolInfoCodec = requireNonNull(memoryPoolInfoCodec, "memoryPoolInfoCodec is null"); + this.integerCodec = JsonCodec.jsonCodec(Integer.class); + this.internalNodeManager = requireNonNull(internalNodeManager, "internalNodeManager is null"); + this.retryRunner = new RetryRunner(executor); /* default retry settings; attempts are submitted to the injected executor */ + } + + @Override + public void queryHeartbeat(Optional target, String internalNode, BasicQueryInfo basicQueryInfo, long sequenceId) /* PATCHes the query info as JSON to the resource manager, passing sequenceId in the Sequence-Id header. */ + { + URI uri = buildUri("/v1/resource-manager/nodes/" + internalNode + "/queries/" + basicQueryInfo.getQueryId().toString(), target); + + Request request = new Request.Builder() + .setMethod("PATCH") + .setUri(uri) + .setHeader("Content-Type", APPLICATION_JSON) + .setHeader("Sequence-Id",
String.valueOf(sequenceId)) + .setBodyGenerator(createStaticBodyGenerator(basicQueryInfoCodec.toJsonBytes(basicQueryInfo))) + .build(); + + retryRunner.runWithRetry(() -> httpClient.execute(request, createStatusResponseHandler()), true); /* NOTE(review): the returned status is discarded, so a non-2xx reply does not appear to be treated as a failure - confirm */ + } + + @Override + public List getResourceGroupInfo(Optional target, String excludingNode) /* GETs resource group runtime info, passing excludingNode as a query parameter; this call is not routed through retryRunner. */ + throws ResourceManagerInconsistentException + { + URI uri = buildUri( + "/v1/resource-manager/resource-groups", + target, + "excludingNode", + excludingNode); + + Request request = prepareGet() + .setUri(uri) + .setHeader("Accept", APPLICATION_JSON) + .build(); + + try { + List result = + httpClient.execute( + request, + createJsonResponseHandler(resourceGroupRuntimeInfoListCodec)); + return result; + } + catch (Exception e) { /* logged here and rethrown unchanged to the caller */ + log.error(e); + throw e; + } + } + + @Override + public void nodeHeartbeat(Optional target, NodeStatus nodeStatus) /* PATCHes this node's status as JSON. NOTE(review): the path segment is singular (node) here but plural (nodes) in queryHeartbeat and resourceGroupRuntimeHeartbeat - confirm against the server resource. */ + { + URI uri = buildUri("/v1/resource-manager/node/" + nodeStatus.getNodeId(), target); + + Request request = new Request.Builder() + .setMethod("PATCH") + .setUri(uri) + .setHeader("Content-Type", APPLICATION_JSON) + .setBodyGenerator(createStaticBodyGenerator(nodeStatusCodec.toJsonBytes(nodeStatus))) + .build(); + + retryRunner.runWithRetry(() -> httpClient.execute(request, createStatusResponseHandler()), true); + } + + @Override + public Map getMemoryPoolInfo(Optional target) /* GETs the memory pool info map, retrying through retryRunner. */ + { + URI uri = buildUri("/v1/resource-manager/memory-pools", target); + + Request request = prepareGet() + .setUri(uri) + .setHeader("Accept", APPLICATION_JSON) + .build(); + + return retryRunner.runWithRetry(() -> httpClient.execute(request, createJsonResponseHandler(memoryPoolInfoCodec)), true); + } + + @Override + public void resourceGroupRuntimeHeartbeat(Optional target, String nodeId, List resourceGroupRuntimeInfo) /* PATCHes the resource group runtime info list of nodeId as JSON. */ + { + URI uri = buildUri("/v1/resource-manager/nodes/" + nodeId + "/resource-groups", target); + + Request request = new Request.Builder() + .setMethod("PATCH") + .setUri(uri) + .setHeader("Content-Type",
APPLICATION_JSON) + .setBodyGenerator(createStaticBodyGenerator(resourceGroupRuntimeInfoListCodec.toJsonBytes(resourceGroupRuntimeInfo))) + .build(); + + retryRunner.runWithRetry(() -> httpClient.execute(request, createStatusResponseHandler()), true); + } + + @Override + public int getRunningTaskCount(Optional target) /* GETs the task count with state=running; this call is not routed through retryRunner. */ + { + URI uri = buildUri("/v1/resource-manager/tasks/count", target, "state", "running"); + + Request request = prepareGet() + .setUri(uri) + .setHeader("Accept", APPLICATION_JSON) + .build(); + + return httpClient.execute(request, createJsonResponseHandler(integerCodec)); + } + + private URI buildUri(String path, Optional target, String... parameters) /* Appends path to target when present, otherwise to a randomly chosen resource manager; parameters are alternating query-parameter names and values. */ + { + URI uri = target.isPresent() ? target.get() : getResourceManagerURI(); + + HttpUriBuilder uriBuilder = uriBuilderFrom(uri) + .appendPath(path); + if (parameters.length % 2 != 0) { + throw new IllegalArgumentException("Parameters must be in key/value pairs"); + } + + for (int i = 0; i < parameters.length; i += 2) { + uriBuilder.addParameter(parameters[i], parameters[i + 1]); + } + return uriBuilder.build(); + } + + private URI getResourceManagerURI() /* Picks the internal URI of one known resource manager at random; throws ResourceManagerException when none is known. */ + { + List resourceManagers = internalNodeManager.getResourceManagers().stream() + .map(InternalNode::getInternalUri) + .collect(toImmutableList()); + + if (resourceManagers.isEmpty()) { + throw new ResourceManagerException("No resource manager found"); + } + return resourceManagers.get(ThreadLocalRandom.current().nextInt(resourceManagers.size())); + } +} diff --git a/presto-main/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerClusterStatusSender.java b/presto-main/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerClusterStatusSender.java index bed088a412cf5..752bb4c3c8e39 100644 --- a/presto-main/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerClusterStatusSender.java +++ b/presto-main/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerClusterStatusSender.java @@ -13,12 +13,16 @@ */
package com.facebook.presto.resourcemanager; +import com.facebook.airlift.log.Logger; import com.facebook.airlift.units.Duration; import com.facebook.drift.client.DriftClient; import com.facebook.presto.execution.ManagedQueryExecution; import com.facebook.presto.execution.resourceGroups.ResourceGroupManager; +import com.facebook.presto.execution.resourceGroups.ResourceGroupRuntimeInfo; +import com.facebook.presto.metadata.InternalNode; import com.facebook.presto.metadata.InternalNodeManager; import com.facebook.presto.server.BasicQueryInfo; +import com.facebook.presto.server.InternalCommunicationConfig; import com.facebook.presto.server.NodeStatus; import com.facebook.presto.server.ServerConfig; import com.facebook.presto.server.StatusResource; @@ -29,62 +33,80 @@ import jakarta.annotation.PreDestroy; import jakarta.inject.Inject; +import java.net.URI; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; +import static com.facebook.presto.server.InternalCommunicationConfig.CommunicationProtocol.HTTP; +import static com.facebook.presto.server.InternalCommunicationConfig.CommunicationProtocol.THRIFT; import static com.google.common.collect.ImmutableList.toImmutableList; import static java.util.Objects.requireNonNull; public class ResourceManagerClusterStatusSender implements ClusterStatusSender { - private final DriftClient resourceManagerClient; + private static final Logger log = Logger.get(ResourceManagerClusterStatusSender.class); + + private final DriftClient thriftResourceManagerClient; + private final ResourceManagerClient httpClient; private final InternalNodeManager internalNodeManager; private final ResourceGroupManager resourceGroupManager; private final Supplier 
statusSupplier; private final ScheduledExecutorService executor; private final Duration queryHeartbeatInterval; + private final InternalCommunicationConfig.CommunicationProtocol communicationProtocol; private final Map queries = new ConcurrentHashMap<>(); private final PeriodicTaskExecutor nodeHeartbeatSender; private final Optional resourceRuntimeHeartbeatSender; + ConcurrentMap nodeHeartbeatInFlight = new ConcurrentHashMap<>(); + ConcurrentMap resourceGroupHeartbeatInFlight = new ConcurrentHashMap<>(); + @Inject public ResourceManagerClusterStatusSender( - @ForResourceManager DriftClient resourceManagerClient, + @ForResourceManager DriftClient thriftResourceManagerClientProvider, + ResourceManagerClient httpClientProvider, InternalNodeManager internalNodeManager, StatusResource statusResource, @ForResourceManager ScheduledExecutorService executor, ResourceManagerConfig resourceManagerConfig, ServerConfig serverConfig, + InternalCommunicationConfig internalCommunicationConfig, ResourceGroupManager resourceGroupManager) { this( - resourceManagerClient, + thriftResourceManagerClientProvider, + httpClientProvider, internalNodeManager, requireNonNull(statusResource, "statusResource is null")::getStatus, executor, resourceManagerConfig, serverConfig, + internalCommunicationConfig, resourceGroupManager); } public ResourceManagerClusterStatusSender( - DriftClient resourceManagerClient, + DriftClient thriftResourceManagerClient, + ResourceManagerClient httpClient, InternalNodeManager internalNodeManager, Supplier statusResource, ScheduledExecutorService executor, ResourceManagerConfig resourceManagerConfig, ServerConfig serverConfig, + InternalCommunicationConfig internalCommunicationConfig, ResourceGroupManager resourceGroupManager) { - this.resourceManagerClient = requireNonNull(resourceManagerClient, "resourceManagerService is null"); + this.communicationProtocol = internalCommunicationConfig.getResourceManagerCommunicationProtocol(); this.internalNodeManager = 
requireNonNull(internalNodeManager, "internalNodeManager is null"); this.statusSupplier = requireNonNull(statusResource, "statusResource is null"); this.executor = requireNonNull(executor, "executor is null"); @@ -93,6 +115,8 @@ public ResourceManagerClusterStatusSender( this.resourceRuntimeHeartbeatSender = serverConfig.isCoordinator() ? Optional.of( new PeriodicTaskExecutor(resourceManagerConfig.getResourceGroupRuntimeHeartbeatInterval().toMillis(), executor, this::sendResourceGroupRuntimeHeartbeat)) : Optional.empty(); this.resourceGroupManager = requireNonNull(resourceGroupManager, "resourceGroupManager is null"); + this.thriftResourceManagerClient = (communicationProtocol == THRIFT) ? requireNonNull(thriftResourceManagerClient, "thriftResourceManagerClient is null") : thriftResourceManagerClient; + this.httpClient = (communicationProtocol == HTTP) ? requireNonNull(httpClient, "httpClient is null") : httpClient; } @PostConstruct @@ -122,10 +146,12 @@ public void registerQuery(ManagedQueryExecution queryExecution) QueryId queryId = queryExecution.getBasicQueryInfo().getQueryId(); queries.computeIfAbsent(queryId, unused -> { AtomicLong sequenceId = new AtomicLong(); + ConcurrentMap inFlight = new ConcurrentHashMap<>(); + PeriodicTaskExecutor taskExecutor = new PeriodicTaskExecutor( queryHeartbeatInterval.toMillis(), executor, - () -> sendQueryHeartbeat(queryExecution, sequenceId.incrementAndGet())); + () -> sendQueryHeartbeat(queryExecution, sequenceId.incrementAndGet(), inFlight)); taskExecutor.start(); return taskExecutor; }); @@ -140,21 +166,90 @@ public void registerQuery(ManagedQueryExecution queryExecution) }); } - private void sendQueryHeartbeat(ManagedQueryExecution queryExecution, long sequenceId) + private void sendQueryHeartbeat(ManagedQueryExecution queryExecution, long sequenceId, + ConcurrentMap inFlightMap) { BasicQueryInfo basicQueryInfo = queryExecution.getBasicQueryInfo(); String nodeIdentifier = 
internalNodeManager.getCurrentNode().getNodeIdentifier(); - getResourceManagers().forEach(hostAndPort -> - resourceManagerClient.get(Optional.of(hostAndPort.toString())).queryHeartbeat(nodeIdentifier, basicQueryInfo, sequenceId)); + + if (communicationProtocol == HTTP) { + getHttpResourceManagers().forEach(uri -> { + AtomicBoolean inFlight = inFlightMap.computeIfAbsent(uri, ignored -> new AtomicBoolean(false)); /* one flag per resource manager URI, scoped to this query */ + if (!inFlight.compareAndSet(false, true)) { /* a heartbeat to this resource manager is still in progress, skip this round */ + return; + } + try { + httpClient.queryHeartbeat(Optional.of(uri), nodeIdentifier, basicQueryInfo, sequenceId); + } + catch (Exception e) { /* log and continue so one failing resource manager does not stop heartbeats to the others */ + log.error(e, "Failed to send query heartbeat to resource manager at %s for query %s", + uri, basicQueryInfo.getQueryId()); + } + finally { + inFlight.set(false); + } + }); + } + else { + getThriftResourceManagers().forEach(hostAndPort -> + thriftResourceManagerClient.get(Optional.of(hostAndPort.toString())).queryHeartbeat(nodeIdentifier, basicQueryInfo, sequenceId)); + } } private void sendNodeHeartbeat() { - getResourceManagers().forEach(hostAndPort -> - resourceManagerClient.get(Optional.of(hostAndPort.toString())).nodeHeartbeat(statusSupplier.get())); + NodeStatus nodeStatus = statusSupplier.get(); /* fetched once and sent to every resource manager */ + if (communicationProtocol == HTTP) { + getHttpResourceManagers().forEach(uri -> { + AtomicBoolean inFlight = nodeHeartbeatInFlight.computeIfAbsent(uri, ignored -> new AtomicBoolean(false)); + if (!inFlight.compareAndSet(false, true)) { /* previous node heartbeat to this resource manager has not finished, skip */ + return; + } + try { + httpClient.nodeHeartbeat(Optional.of(uri), nodeStatus); + } + catch (Exception e) { /* log and continue with the remaining resource managers */ + log.error(e, "Failed to send node heartbeat to resource manager at %s", uri); + } + finally { + inFlight.set(false); + } + }); + } + else { + getThriftResourceManagers().forEach(hostAndPort -> + thriftResourceManagerClient.get(Optional.of(hostAndPort.toString())).nodeHeartbeat(nodeStatus)); + } + } + + public void sendResourceGroupRuntimeHeartbeat() + { + List resourceGroupRuntimeInfos =
resourceGroupManager.getResourceGroupRuntimeInfos(); + + if (communicationProtocol == HTTP) { + getHttpResourceManagers().forEach(uri -> { + AtomicBoolean inFlight = resourceGroupHeartbeatInFlight.computeIfAbsent(uri, ignored -> new AtomicBoolean(false)); /* tracked separately from node heartbeats so the two heartbeat types never suppress each other */ + if (!inFlight.compareAndSet(false, true)) { /* previous resource group heartbeat to this resource manager has not finished, skip */ + return; + } + try { + httpClient.resourceGroupRuntimeHeartbeat(Optional.of(uri), internalNodeManager.getCurrentNode().getNodeIdentifier(), resourceGroupRuntimeInfos); + } + catch (Exception e) { /* log and continue with the remaining resource managers */ + log.error(e, "Failed to send resource group runtime heartbeat to resource manager at %s", uri); + } + finally { + inFlight.set(false); + } + }); + } + else { + getThriftResourceManagers().forEach(hostAndPort -> + thriftResourceManagerClient.get(Optional.of(hostAndPort.toString())).resourceGroupRuntimeHeartbeat(internalNodeManager.getCurrentNode().getNodeIdentifier(), resourceGroupRuntimeInfos)); + } } - private List getResourceManagers() + private List getThriftResourceManagers() { return internalNodeManager.getResourceManagers().stream() .filter(node -> node.getThriftPort().isPresent()) @@ -165,10 +260,10 @@ private List getResourceManagers() .collect(toImmutableList()); } - public void sendResourceGroupRuntimeHeartbeat() + private List getHttpResourceManagers() /* internal URIs of all known resource managers */ { - List resourceGroupRuntimeInfos = resourceGroupManager.getResourceGroupRuntimeInfos(); - getResourceManagers().forEach(hostAndPort -> - resourceManagerClient.get(Optional.of(hostAndPort.toString())).resourceGroupRuntimeHeartbeat(internalNodeManager.getCurrentNode().getNodeIdentifier(), resourceGroupRuntimeInfos)); + return internalNodeManager.getResourceManagers().stream() + .map(InternalNode::getInternalUri) + .collect(toImmutableList()); } } diff --git a/presto-main-base/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerResourceGroupService.java b/presto-main/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerResourceGroupService.java similarity index 74% rename from
presto-main-base/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerResourceGroupService.java rename to presto-main/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerResourceGroupService.java index cb50153fb958f..72492eba683d8 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerResourceGroupService.java +++ b/presto-main/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerResourceGroupService.java @@ -18,15 +18,19 @@ import com.facebook.presto.execution.resourceGroups.ResourceGroupRuntimeInfo; import com.facebook.presto.metadata.InternalNode; import com.facebook.presto.metadata.InternalNodeManager; +import com.facebook.presto.server.InternalCommunicationConfig; +import com.facebook.presto.server.InternalCommunicationConfig.CommunicationProtocol; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import jakarta.inject.Inject; import java.util.List; +import java.util.Optional; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.function.Function; +import static com.facebook.presto.server.InternalCommunicationConfig.CommunicationProtocol.THRIFT; import static com.google.common.cache.CacheLoader.asyncReloading; import static java.util.Objects.requireNonNull; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -34,22 +38,28 @@ public class ResourceManagerResourceGroupService implements ResourceGroupService { - private final DriftClient resourceManagerClient; + private final DriftClient resourceManagerClient; private final InternalNodeManager internalNodeManager; + private final ResourceManagerClient httpResourceManagerClient; + private final CommunicationProtocol communicationMode; private final Function> cache; private final Executor executor = Executors.newCachedThreadPool(); private final Boolean resourceGroupServiceCacheEnable; @Inject public ResourceManagerResourceGroupService( - 
@ForResourceManager DriftClient resourceManagerClient, + @ForResourceManager DriftClient resourceManagerClient, + ResourceManagerClient httpResourceManagerClient, ResourceManagerConfig resourceManagerConfig, + InternalCommunicationConfig internalCommunicationConfig, InternalNodeManager internalNodeManager) { - this.resourceManagerClient = requireNonNull(resourceManagerClient, "resourceManagerService is null"); + this.resourceManagerClient = requireNonNull(resourceManagerClient, "resourceManagerDriftClient is null"); + this.httpResourceManagerClient = requireNonNull(httpResourceManagerClient, "resourceManagerHttpClient is null"); this.internalNodeManager = requireNonNull(internalNodeManager, "internalNodeManager is null"); Duration cacheExpireDuration = requireNonNull(resourceManagerConfig, "resourceManagerConfig is null").getResourceGroupServiceCacheExpireInterval(); Duration cacheRefreshDuration = resourceManagerConfig.getResourceGroupServiceCacheRefreshInterval(); + this.communicationMode = internalCommunicationConfig.getResourceManagerCommunicationProtocol(); resourceGroupServiceCacheEnable = resourceManagerConfig.getResourceGroupServiceCacheEnabled(); if (resourceGroupServiceCacheEnable) { this.cache = CacheBuilder.newBuilder() @@ -81,6 +91,7 @@ public List getResourceGroupInfo() private List getResourceGroupInfos(InternalNode internalNode) throws ResourceManagerInconsistentException { - return resourceManagerClient.get().getResourceGroupInfo(internalNode.getNodeIdentifier()); + return communicationMode == THRIFT ? 
resourceManagerClient.get().getResourceGroupInfo(internalNode.getNodeIdentifier()) + : httpResourceManagerClient.getResourceGroupInfo(Optional.empty(), internalNode.getNodeIdentifier()); } } diff --git a/presto-main/src/main/java/com/facebook/presto/server/CatalogServerModule.java b/presto-main/src/main/java/com/facebook/presto/server/CatalogServerModule.java index 3149443421ec5..d21506723f522 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/CatalogServerModule.java +++ b/presto-main/src/main/java/com/facebook/presto/server/CatalogServerModule.java @@ -22,6 +22,7 @@ import com.facebook.presto.execution.resourceGroups.ResourceGroupManager; import com.facebook.presto.failureDetector.FailureDetectorModule; import com.facebook.presto.metadata.CatalogManager; +import com.facebook.presto.resourcemanager.ForResourceManager; import com.facebook.presto.transaction.ForTransactionManager; import com.facebook.presto.transaction.InMemoryTransactionManager; import com.facebook.presto.transaction.TransactionManager; @@ -37,6 +38,7 @@ import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed; import static com.facebook.airlift.configuration.ConditionalModule.installModuleIf; import static com.facebook.airlift.discovery.client.DiscoveryBinder.discoveryBinder; +import static com.facebook.airlift.http.client.HttpClientBinder.httpClientBinder; import static com.facebook.drift.server.guice.DriftServerBinder.driftServerBinder; import static java.util.concurrent.Executors.newCachedThreadPool; import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; @@ -67,6 +69,8 @@ protected void setup(Binder binder) driftServerBinder(binder).bindService(CatalogServer.class); binder.bind(NodeResourceStatusProvider.class).toInstance(() -> true); + + httpClientBinder(binder).bindHttpClient("resourceManager", ForResourceManager.class); } @Provides diff --git a/presto-main/src/main/java/com/facebook/presto/server/CoordinatorModule.java 
b/presto-main/src/main/java/com/facebook/presto/server/CoordinatorModule.java index 8109e34287b36..add78041b42ec 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/CoordinatorModule.java +++ b/presto-main/src/main/java/com/facebook/presto/server/CoordinatorModule.java @@ -57,6 +57,7 @@ import com.facebook.presto.execution.TaskManagerConfig; import com.facebook.presto.execution.resourceGroups.InternalResourceGroupManager; import com.facebook.presto.execution.resourceGroups.ResourceGroupManager; +import com.facebook.presto.execution.resourceGroups.ResourceGroupRuntimeInfo; import com.facebook.presto.execution.scheduler.AdaptivePhasedExecutionPolicy; import com.facebook.presto.execution.scheduler.AllAtOnceExecutionPolicy; import com.facebook.presto.execution.scheduler.ExecutionPolicy; @@ -90,7 +91,9 @@ import com.facebook.presto.server.remotetask.ReactorNettyHttpClient; import com.facebook.presto.server.remotetask.ReactorNettyHttpClientConfig; import com.facebook.presto.server.remotetask.RemoteTaskStats; +import com.facebook.presto.spi.memory.ClusterMemoryPoolInfo; import com.facebook.presto.spi.memory.ClusterMemoryPoolManager; +import com.facebook.presto.spi.memory.MemoryPoolId; import com.facebook.presto.spi.security.SelectedRole; import com.facebook.presto.sql.analyzer.FeaturesConfig; import com.facebook.presto.sql.analyzer.QueryExplainer; @@ -185,6 +188,10 @@ protected void setup(Binder binder) jsonCodecBinder(binder).bindJsonCodec(TaskInfo.class); jsonCodecBinder(binder).bindJsonCodec(QueryResults.class); jsonCodecBinder(binder).bindJsonCodec(SelectedRole.class); + jsonCodecBinder(binder).bindJsonCodec(NodeStatus.class); + jsonCodecBinder(binder).bindJsonCodec(BasicQueryInfo.class); + jsonCodecBinder(binder).bindListJsonCodec(ResourceGroupRuntimeInfo.class); + jsonCodecBinder(binder).bindMapJsonCodec(MemoryPoolId.class, ClusterMemoryPoolInfo.class); jaxrsBinder(binder).bind(QueuedStatementResource.class); 
jaxrsBinder(binder).bind(ExecutingStatementResource.class); binder.bind(StatementHttpExecutionMBean.class).in(Scopes.SINGLETON); @@ -342,11 +349,11 @@ protected void setup(Binder binder) binder.bind(NodeResourceStatusProvider.class).to(NodeResourceStatus.class).in(Scopes.SINGLETON); newOptionalBinder(binder, ResourceManagerProxy.class); + httpClientBinder(binder).bindHttpClient("resourceManager", ForResourceManager.class); install(installModuleIf( ServerConfig.class, ServerConfig::isResourceManagerEnabled, rmBinder -> { - httpClientBinder(rmBinder).bindHttpClient("resourceManager", ForResourceManager.class); rmBinder.bind(ResourceManagerProxy.class).in(Scopes.SINGLETON); })); diff --git a/presto-main/src/main/java/com/facebook/presto/server/ResourceManagerHeartbeatResource.java b/presto-main/src/main/java/com/facebook/presto/server/ResourceManagerHeartbeatResource.java deleted file mode 100644 index 660429d1dc606..0000000000000 --- a/presto-main/src/main/java/com/facebook/presto/server/ResourceManagerHeartbeatResource.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.facebook.presto.server; - -import com.facebook.presto.resourcemanager.ForResourceManager; -import com.facebook.presto.resourcemanager.ResourceManagerClusterStateProvider; -import com.google.common.util.concurrent.ListeningExecutorService; -import jakarta.inject.Inject; -import jakarta.ws.rs.Consumes; -import jakarta.ws.rs.PUT; -import jakarta.ws.rs.Path; - -import static jakarta.ws.rs.core.MediaType.APPLICATION_JSON; -import static java.util.Objects.requireNonNull; - -@Path("/v1/heartbeat") -public class ResourceManagerHeartbeatResource -{ - private final ResourceManagerClusterStateProvider clusterStateProvider; - private final ListeningExecutorService executor; - - @Inject - public ResourceManagerHeartbeatResource( - ResourceManagerClusterStateProvider clusterStateProvider, - @ForResourceManager ListeningExecutorService executor) - { - this.clusterStateProvider = requireNonNull(clusterStateProvider, "clusterStateProvider is null"); - this.executor = executor; - } - - /** - * Registers a node heartbeat with the resource manager. 
- */ - @PUT - @Consumes(APPLICATION_JSON) - public void nodeHeartbeat(NodeStatus nodeStatus) - { - executor.execute(() -> clusterStateProvider.registerNodeHeartbeat(nodeStatus)); - } -} diff --git a/presto-main/src/main/java/com/facebook/presto/server/ResourceManagerModule.java b/presto-main/src/main/java/com/facebook/presto/server/ResourceManagerModule.java index 6109c99d70113..1db7473f168a3 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/ResourceManagerModule.java +++ b/presto-main/src/main/java/com/facebook/presto/server/ResourceManagerModule.java @@ -23,6 +23,7 @@ import com.facebook.presto.execution.QueryManager; import com.facebook.presto.execution.resourceGroups.NoOpResourceGroupManager; import com.facebook.presto.execution.resourceGroups.ResourceGroupManager; +import com.facebook.presto.execution.resourceGroups.ResourceGroupRuntimeInfo; import com.facebook.presto.failureDetector.FailureDetectorModule; import com.facebook.presto.resourcemanager.DistributedClusterStatsResource; import com.facebook.presto.resourcemanager.DistributedQueryInfoResource; @@ -34,7 +35,9 @@ import com.facebook.presto.resourcemanager.RatisServer; import com.facebook.presto.resourcemanager.ResourceManagerClusterStateProvider; import com.facebook.presto.resourcemanager.ResourceManagerProxy; -import com.facebook.presto.resourcemanager.ResourceManagerServer; +import com.facebook.presto.resourcemanager.thrift.ResourceManagerServer; +import com.facebook.presto.spi.memory.ClusterMemoryPoolInfo; +import com.facebook.presto.spi.memory.MemoryPoolId; import com.facebook.presto.transaction.NoOpTransactionManager; import com.facebook.presto.transaction.TransactionManager; import com.google.inject.Binder; @@ -91,6 +94,10 @@ protected void setup(Binder binder) jsonCodecBinder(binder).bindListJsonCodec(QueryStateInfo.class); jsonCodecBinder(binder).bindJsonCodec(ResourceGroupInfo.class); jsonCodecBinder(binder).bindListJsonCodec(ResourceGroupInfo.class); + 
jsonCodecBinder(binder).bindJsonCodec(NodeStatus.class); + jsonCodecBinder(binder).bindJsonCodec(BasicQueryInfo.class); + jsonCodecBinder(binder).bindListJsonCodec(ResourceGroupRuntimeInfo.class); + jsonCodecBinder(binder).bindMapJsonCodec(MemoryPoolId.class, ClusterMemoryPoolInfo.class); binder.bind(TransactionManager.class).to(NoOpTransactionManager.class);
diff --git a/presto-main/src/main/java/com/facebook/presto/server/ResourceManagerResource.java b/presto-main/src/main/java/com/facebook/presto/server/ResourceManagerResource.java
new file mode 100644
index 0000000000000..b61bd5c50760d
--- /dev/null
+++ b/presto-main/src/main/java/com/facebook/presto/server/ResourceManagerResource.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.server;
+
+import com.facebook.airlift.log.Logger;
+import com.facebook.presto.execution.resourceGroups.ResourceGroupRuntimeInfo;
+import com.facebook.presto.resourcemanager.ForResourceManager;
+import com.facebook.presto.resourcemanager.ResourceManagerClusterStateProvider;
+import com.facebook.presto.resourcemanager.ResourceManagerException;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.Consumes;
+import jakarta.ws.rs.GET;
+import jakarta.ws.rs.HeaderParam;
+import jakarta.ws.rs.PATCH;
+import jakarta.ws.rs.Path;
+import jakarta.ws.rs.PathParam;
+import jakarta.ws.rs.Produces;
+import jakarta.ws.rs.QueryParam;
+import jakarta.ws.rs.WebApplicationException;
+import jakarta.ws.rs.container.AsyncResponse;
+import jakarta.ws.rs.container.Suspended;
+import jakarta.ws.rs.core.Response;
+
+import java.util.List;
+
+import static jakarta.ws.rs.core.MediaType.APPLICATION_JSON;
+import static jakarta.ws.rs.core.Response.Status.BAD_REQUEST;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * JAX-RS resource that exposes {@link ResourceManagerClusterStateProvider} operations over HTTP
+ * under {@code /v1/resource-manager}.
+ * <p>
+ * Every handler hands its work to the injected executor; the GET handlers complete their
+ * {@link AsyncResponse} from that executor.
+ */
+@Path("/v1/resource-manager")
+public class ResourceManagerResource
+{
+    private static final Logger LOG = Logger.get(ResourceManagerResource.class);
+    private final ResourceManagerClusterStateProvider clusterStateProvider;
+    private final ListeningExecutorService executor;
+
+    @Inject
+    public ResourceManagerResource(
+            ResourceManagerClusterStateProvider clusterStateProvider,
+            @ForResourceManager ListeningExecutorService executor)
+    {
+        this.clusterStateProvider = requireNonNull(clusterStateProvider, "clusterStateProvider is null");
+        this.executor = requireNonNull(executor, "executor is null");
+    }
+
+    /**
+     * Registers a node heartbeat with the resource manager.
+     * <p>
+     * Responds with 400 (Bad Request) when the {@code nodeId} in the path does not match the node id
+     * carried in the request body.
+     */
+    @PATCH
+    @Consumes(APPLICATION_JSON)
+    @Path("/node/{nodeId}")
+    public void patchNodeHeartbeat(@PathParam("nodeId") String nodeId, NodeStatus nodeStatus)
+    {
+        if (!nodeId.equals(nodeStatus.getNodeId())) {
+            throw new WebApplicationException(
+                    Response.status(BAD_REQUEST)
+                            .type(APPLICATION_JSON)
+                            .entity(ImmutableMap.of(
+                                    "error", "nodeId mismatch",
+                                    "pathNodeId", nodeId,
+                                    "bodyNodeId", nodeStatus.getNodeId()))
+                            .build());
+        }
+        executor.execute(() -> clusterStateProvider.registerNodeHeartbeat(nodeStatus));
+    }
+
+    /**
+     * This method registers a heartbeat to the resource manager. A query heartbeat is used for the following purposes:
+     *
+     * 1) Inform resource managers about current resource group utilization.
+     * 2) Inform resource managers about current running queries.
+     * 3) Inform resource managers about coordinator status and health.
+     * <p>
+     * Responds with 400 (Bad Request) when the {@code queryId} in the path does not match the query id
+     * carried in the request body.
+     */
+    @PATCH
+    @Consumes(APPLICATION_JSON)
+    @Path("/nodes/{nodeId}/queries/{queryId}")
+    public void patchQueryHeartbeat(
+            @PathParam("nodeId") String nodeId,
+            @PathParam("queryId") String queryId,
+            @HeaderParam("Sequence-Id") long sequenceId,
+            BasicQueryInfo basicQueryInfo)
+    {
+        String bodyQueryId = basicQueryInfo.getQueryId().toString();
+        if (!queryId.equals(bodyQueryId)) {
+            // Report the query ids that were actually compared (not the node id)
+            throw new WebApplicationException(
+                    Response.status(BAD_REQUEST)
+                            .type(APPLICATION_JSON)
+                            .entity(ImmutableMap.of(
+                                    "error", "queryId mismatch",
+                                    "pathQueryId", queryId,
+                                    "bodyQueryId", bodyQueryId))
+                            .build());
+        }
+        executor.execute(() -> clusterStateProvider.registerQueryHeartbeat(nodeId, basicQueryInfo, sequenceId));
+    }
+
+    /**
+     * Returns the resource group information across all clusters except for {@code excludingNode}, which is excluded
+     * to prevent redundancy with local resource group information.
+     */
+    @GET
+    @Produces(APPLICATION_JSON)
+    @Path("/resource-groups")
+    public void getResourceGroupInfo(@QueryParam("excludingNode") String excludingNode, @Suspended AsyncResponse async)
+    {
+        executor.execute(() -> {
+            try {
+                async.resume(clusterStateProvider.getClusterResourceGroups(excludingNode));
+            }
+            catch (Throwable t) {
+                async.resume(serverError(t));
+            }
+        });
+    }
+
+    /**
+     * Returns the cluster memory pool information reported by the cluster state provider.
+     */
+    @GET
+    @Produces(APPLICATION_JSON)
+    @Path("/memory-pools")
+    public void getMemoryPoolInfo(@Suspended AsyncResponse async)
+    {
+        executor.execute(() -> {
+            try {
+                async.resume(clusterStateProvider.getClusterMemoryPoolInfo());
+            }
+            catch (Throwable t) {
+                async.resume(serverError(t));
+            }
+        });
+    }
+
+    /**
+     * Registers the resource group runtime information sent for the node identified by {@code nodeId}.
+     */
+    @PATCH
+    @Consumes(APPLICATION_JSON)
+    @Path("/nodes/{nodeId}/resource-groups")
+    public void putResourceGroupRuntimeInfo(@PathParam("nodeId") String node, List<ResourceGroupRuntimeInfo> resourceGroupRuntimeInfos)
+    {
+        executor.execute(() -> clusterStateProvider.registerResourceGroupRuntimeHeartbeat(node, resourceGroupRuntimeInfos));
+    }
+
+    /**
+     * Returns the number of tasks in the requested {@code state}. Only {@code running} is supported;
+     * any other (or missing) value produces a server error response.
+     */
+    @GET
+    @Produces(APPLICATION_JSON)
+    @Path("/tasks/count")
+    public void getTaskCount(@QueryParam("state") String state, @Suspended AsyncResponse async)
+    {
+        executor.execute(() -> {
+            try {
+                // Constant-first comparison: a missing "state" query parameter arrives as null
+                if ("running".equals(state)) {
+                    async.resume(clusterStateProvider.getRunningTaskCount());
+                }
+                else {
+                    throw new ResourceManagerException("Invalid task count state requested");
+                }
+            }
+            catch (Throwable t) {
+                async.resume(serverError(t));
+            }
+        });
+    }
+
+    /**
+     * Builds the 500 response used to resume a suspended request whose handler failed.
+     * {@link ImmutableMap} rejects null values and {@link Throwable#getMessage()} may be null,
+     * so fall back to the throwable's string form instead of failing before the response is resumed.
+     */
+    private static Response serverError(Throwable t)
+    {
+        LOG.error(t, "Resource manager request failed");
+        String message = t.getMessage();
+        return Response.serverError()
+                .entity(ImmutableMap.of("error", message != null ? message : t.toString()))
+                .type(APPLICATION_JSON)
+                .build();
+    }
+}
diff --git a/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java b/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java index f54a113c0a917..628af8ec3b258 100644 ---
a/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java +++ b/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java @@ -147,6 +147,7 @@ import com.facebook.presto.resourcemanager.ClusterQueryTrackerService; import com.facebook.presto.resourcemanager.ClusterStatusSender; import com.facebook.presto.resourcemanager.ForResourceManager; +import com.facebook.presto.resourcemanager.HttpResourceManagerClient; import com.facebook.presto.resourcemanager.NoopResourceGroupService; import com.facebook.presto.resourcemanager.RaftConfig; import com.facebook.presto.resourcemanager.RandomResourceManagerAddressSelector; @@ -474,7 +475,7 @@ else if (serverConfig.isCoordinator()) { binder.bind(RandomResourceManagerAddressSelector.class).in(Scopes.SINGLETON); driftClientBinder(binder) - .bindDriftClient(ResourceManagerClient.class, ForResourceManager.class) + .bindDriftClient(com.facebook.presto.resourcemanager.thrift.ResourceManagerClient.class, ForResourceManager.class) .withAddressSelector((addressSelectorBinder, annotation, prefix) -> addressSelectorBinder.bind(AddressSelector.class).annotatedWith(annotation).to(RandomResourceManagerAddressSelector.class)) .withExceptionClassifier(throwable -> {
@@ -500,12 +501,15 @@ else if (serverConfig.isCoordinator()) {
                     @Override
                     public void configure(Binder moduleBinder)
                     {
+                        // Bind via moduleBinder like the rest of configure(), not the captured outer binder
+                        moduleBinder.bind(ResourceManagerClient.class).to(HttpResourceManagerClient.class).in(Scopes.SINGLETON);
                         configBinder(moduleBinder).bindConfig(ResourceManagerConfig.class);
-                        // HTTP endpoint for some of ResourceManagerServer methods.
ResourceManagerConfig resourceManagerConfig = buildConfigObject(ResourceManagerConfig.class); - if (resourceManagerConfig.getHeartbeatHttpEnabled()) { - jaxrsBinder(moduleBinder).bind(ResourceManagerHeartbeatResource.class); + + if (serverConfig.isResourceManager() && resourceManagerConfig.getHttpServerEnabled()) { + jaxrsBinder(moduleBinder).bind(ResourceManagerResource.class); } + moduleBinder.bind(ClusterStatusSender.class).to(ResourceManagerClusterStatusSender.class).in(Scopes.SINGLETON); if (serverConfig.isCoordinator()) { moduleBinder.bind(ClusterMemoryManagerService.class).in(Scopes.SINGLETON); diff --git a/presto-main/src/main/java/com/facebook/presto/server/WorkerModule.java b/presto-main/src/main/java/com/facebook/presto/server/WorkerModule.java index 8b007302291a1..dc34a964d25f6 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/WorkerModule.java +++ b/presto-main/src/main/java/com/facebook/presto/server/WorkerModule.java @@ -16,10 +16,14 @@ import com.facebook.presto.execution.QueryManager; import com.facebook.presto.execution.resourceGroups.NoOpResourceGroupManager; import com.facebook.presto.execution.resourceGroups.ResourceGroupManager; +import com.facebook.presto.execution.resourceGroups.ResourceGroupRuntimeInfo; import com.facebook.presto.failureDetector.FailureDetector; import com.facebook.presto.failureDetector.NoOpFailureDetector; import com.facebook.presto.memory.HighMemoryTaskKiller; import com.facebook.presto.memory.LowMemoryMonitor; +import com.facebook.presto.resourcemanager.ForResourceManager; +import com.facebook.presto.spi.memory.ClusterMemoryPoolInfo; +import com.facebook.presto.spi.memory.MemoryPoolId; import com.facebook.presto.transaction.NoOpTransactionManager; import com.facebook.presto.transaction.TransactionManager; import com.google.inject.Binder; @@ -28,6 +32,8 @@ import com.google.inject.Scopes; import jakarta.inject.Singleton; +import static com.facebook.airlift.http.client.HttpClientBinder.httpClientBinder; 
+import static com.facebook.airlift.json.JsonCodecBinder.jsonCodecBinder; import static com.google.common.reflect.Reflection.newProxy; public class WorkerModule @@ -57,9 +63,16 @@ public void configure(Binder binder) return true; })); + jsonCodecBinder(binder).bindJsonCodec(NodeStatus.class); + jsonCodecBinder(binder).bindJsonCodec(BasicQueryInfo.class); + jsonCodecBinder(binder).bindListJsonCodec(ResourceGroupRuntimeInfo.class); + jsonCodecBinder(binder).bindMapJsonCodec(MemoryPoolId.class, ClusterMemoryPoolInfo.class); + binder.bind(LowMemoryMonitor.class).in(Scopes.SINGLETON); binder.bind(HighMemoryTaskKiller.class).in(Scopes.SINGLETON); + + httpClientBinder(binder).bindHttpClient("resourceManager", ForResourceManager.class); } @Provides
diff --git a/presto-main/src/test/java/com/facebook/presto/resourcemanager/TestHttpResourceManagerClient.java b/presto-main/src/test/java/com/facebook/presto/resourcemanager/TestHttpResourceManagerClient.java
new file mode 100644
index 0000000000000..ad0e0c41dae8f
--- /dev/null
+++ b/presto-main/src/test/java/com/facebook/presto/resourcemanager/TestHttpResourceManagerClient.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.resourcemanager;
+
+import com.facebook.airlift.bootstrap.Bootstrap;
+import com.facebook.airlift.event.client.EventClient;
+import com.facebook.airlift.event.client.NullEventClient;
+import com.facebook.airlift.http.server.testing.TestingHttpServer;
+import com.facebook.airlift.http.server.testing.TestingHttpServerModule;
+import com.facebook.airlift.jaxrs.JaxrsModule;
+import com.facebook.airlift.json.JsonModule;
+import com.facebook.airlift.node.NodeConfig;
+import com.facebook.airlift.node.NodeInfo;
+import com.facebook.airlift.units.DataSize;
+import com.facebook.airlift.units.Duration;
+import com.facebook.presto.SystemSessionProperties;
+import com.facebook.presto.client.NodeVersion;
+import com.facebook.presto.execution.QueryState;
+import com.facebook.presto.execution.resourceGroups.ResourceGroupRuntimeInfo;
+import com.facebook.presto.memory.MemoryInfo;
+import com.facebook.presto.metadata.InMemoryNodeManager;
+import com.facebook.presto.metadata.InternalNode;
+import com.facebook.presto.metadata.InternalNodeManager;
+import com.facebook.presto.metadata.SessionPropertyManager;
+import com.facebook.presto.server.BasicQueryInfo;
+import com.facebook.presto.server.BasicQueryStats;
+import com.facebook.presto.server.NodeStatus;
+import com.facebook.presto.server.ResourceManagerResource;
+import com.facebook.presto.spi.ConnectorId;
+import com.facebook.presto.spi.QueryId;
+import com.facebook.presto.spi.memory.ClusterMemoryPoolInfo;
+import com.facebook.presto.spi.memory.MemoryPoolId;
+import com.facebook.presto.spi.resourceGroups.ResourceGroupId;
+import com.facebook.presto.spiller.NodeSpillConfig;
+import com.facebook.presto.sql.analyzer.JavaFeaturesConfig;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.inject.Binder;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import com.google.inject.Provides;
+import com.google.inject.Scopes;
+import jakarta.inject.Singleton;
+import org.joda.time.DateTime;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalDouble;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static com.facebook.airlift.configuration.ConfigBinder.configBinder;
+import static com.facebook.airlift.http.client.HttpClientBinder.httpClientBinder;
+import static com.facebook.airlift.jaxrs.JaxrsBinder.jaxrsBinder;
+import static com.facebook.airlift.json.JsonCodecBinder.jsonCodecBinder;
+import static com.facebook.airlift.units.DataSize.Unit.MEGABYTE;
+import static com.facebook.presto.SessionTestUtils.TEST_SESSION;
+import static com.facebook.presto.execution.QueryState.RUNNING;
+import static com.facebook.presto.memory.LocalMemoryManager.GENERAL_POOL;
+import static com.facebook.presto.metadata.SessionPropertyManager.createTestingSessionPropertyManager;
+import static com.facebook.presto.operator.BlockedReason.WAITING_FOR_MEMORY;
+import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator;
+import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.testng.Assert.assertEquals;
+
+/**
+ * Exercises {@link HttpResourceManagerClient} against a {@link ResourceManagerResource}
+ * served by a {@link TestingHttpServer} that is started and stopped around every test method.
+ */
+@Test(singleThreaded = true)
+public class TestHttpResourceManagerClient
+{
+    private HttpResourceManagerClient testHttpClient;
+    private ResourceManagerClusterStateProvider clusterStateProvider;
+    private ListeningExecutorService executor;
+    private ScheduledExecutorService scheduler;
+    private TestingHttpServer server;
+
+    @BeforeMethod
+    public void setUp() throws Exception
+    {
+        executor = listeningDecorator(newDirectExecutorService());
+        scheduler = newSingleThreadScheduledExecutor();
+        Bootstrap app = new Bootstrap(
+                new JsonModule(),
+                new JaxrsModule(),
+                // NOTE(review): this fixed port must match the "rm" node URI registered below,
+                // so the test presumably needs port 8080 to be free on the build host.
+                new TestingHttpServerModule(8080),
+                new Module() {
+                    @Override
+                    public void configure(Binder binder)
+                    {
+                        binder.bind(EventClient.class).to(NullEventClient.class);
+                        jsonCodecBinder(binder).bindJsonCodec(NodeStatus.class);
+                        jsonCodecBinder(binder).bindJsonCodec(BasicQueryInfo.class);
+                        jsonCodecBinder(binder).bindListJsonCodec(ResourceGroupRuntimeInfo.class);
+                        jsonCodecBinder(binder).bindMapJsonCodec(MemoryPoolId.class, ClusterMemoryPoolInfo.class);
+                        binder.bind(ResourceManagerResource.class).in(Scopes.SINGLETON);
+                        binder.bind(ResourceManagerClient.class).to(HttpResourceManagerClient.class).in(Scopes.SINGLETON);
+                        jaxrsBinder(binder).bind(ResourceManagerResource.class);
+                        configBinder(binder).bindConfigDefaults(NodeConfig.class, config -> {
+                            config.setEnvironment("test");
+                        });
+                        configBinder(binder).bindConfigDefaults(ResourceManagerConfig.class, config -> {
+                            config.setNodeHeartbeatInterval(new Duration(10, SECONDS));
+                            config.setResourceGroupRuntimeHeartbeatInterval(new Duration(10, SECONDS));
+                            config.setResourceGroupRuntimeInfoTimeout(new Duration(10, SECONDS));
+                        });
+                        configBinder(binder).bindConfig(NodeConfig.class);
+                        binder.bind(NodeInfo.class).in(Scopes.SINGLETON);
+                        httpClientBinder(binder).bindHttpClient("resourceManager", ForResourceManager.class);
+                    }
+
+                    @Provides
+                    @Singleton
+                    InternalNodeManager providesInternalNodeManager()
+                    {
+                        InMemoryNodeManager manager = new InMemoryNodeManager();
+                        manager.addNode(new ConnectorId("temp"), new InternalNode("rm", URI.create("http://127.0.0.1:8080/"), new NodeVersion("1"), false, true, false, false));
+                        manager.addNode(new ConnectorId("one"), new InternalNode("coordinator", URI.create("http://fake.invalid/"), new NodeVersion("1"), true, false, false, false));
+                        return manager;
+                    }
+
+                    @Provides
+                    @Singleton
+                    SessionPropertyManager provideSessionPropertyManager()
+                    {
+                        return createTestingSessionPropertyManager(
+                                new SystemSessionProperties().getSessionProperties(),
+                                new JavaFeaturesConfig(),
+                                new NodeSpillConfig());
+                    }
+
+                    @Provides
+                    @Singleton
+                    @ForResourceManager
+                    ListeningExecutorService provideListeningExecutorService()
+                    {
+                        return executor;
+                    }
+
+                    @Provides
+                    @Singleton
+                    ResourceManagerClusterStateProvider provideClusterStateProvider(
+                            InternalNodeManager nodeManager,
+                            SessionPropertyManager sessionPropertyManager)
+                    {
+                        return new ResourceManagerClusterStateProvider(
+                                nodeManager,
+                                sessionPropertyManager,
+                                10,
+                                Duration.valueOf("4s"),
+                                Duration.valueOf("8s"),
+                                Duration.valueOf("5s"),
+                                Duration.valueOf("0s"),
+                                Duration.valueOf("4s"),
+                                true,
+                                scheduler);
+                    }
+
+                    @Provides
+                    @Singleton
+                    @ForResourceManager
+                    ScheduledExecutorService provideScheduledExecutorService()
+                    {
+                        return scheduler;
+                    }
+                });
+
+        Injector injector = app
+                .initialize();
+
+        server = injector.getInstance(TestingHttpServer.class);
+        server.start();
+
+        testHttpClient = (HttpResourceManagerClient) injector.getInstance(ResourceManagerClient.class);
+        clusterStateProvider = injector.getInstance(ResourceManagerClusterStateProvider.class);
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() throws Exception
+    {
+        try {
+            // setUp may have failed before the server was created
+            if (server != null) {
+                server.stop();
+            }
+        }
+        finally {
+            executor.shutdownNow();
+            scheduler.shutdownNow();
+        }
+    }
+
+    @Test
+    public void testQueryHeartbeat()
+    {
+        testHttpClient.queryHeartbeat(Optional.empty(), "coordinator", createTestQueryInfo("temp", RUNNING), 1);
+        assertEquals(clusterStateProvider.getClusterQueries().size(), 1);
+    }
+
+    @Test
+    public void testNodeHeartbeat()
+    {
+        testHttpClient.nodeHeartbeat(Optional.empty(), createTestNodeStatus("node"));
+        assertEquals(clusterStateProvider.getWorkerMemoryInfo().size(), 1);
+    }
+
+    @Test
+    public void testResourceGroupRuntime()
+    {
+        List<ResourceGroupRuntimeInfo> runtimeInfo = ImmutableList.of(
+                ResourceGroupRuntimeInfo.builder(new ResourceGroupId("test-group"))
+                        .addRunningQueries(2)
+                        .build());
+        testHttpClient.nodeHeartbeat(Optional.empty(), createTestNodeStatus("temp"));
+        testHttpClient.resourceGroupRuntimeHeartbeat(Optional.empty(), "temp", runtimeInfo);
+        Map<?, ?> map = clusterStateProvider.getResourceGroupStates();
+        assertEquals(map.size(), 1);
+    }
+
+    @Test
+    public void testGetResourceGroupInfo()
+    {
+        testHttpClient.nodeHeartbeat(Optional.empty(), createTestNodeStatus("coordinator"));
+        testHttpClient.queryHeartbeat(Optional.empty(), "coordinator", createTestQueryInfo("temp", RUNNING), 1);
+
+        List<ResourceGroupRuntimeInfo> runtimeInfo = ImmutableList.of(
+                ResourceGroupRuntimeInfo.builder(new ResourceGroupId("test-group"))
+                        .addRunningQueries(2)
+                        .build());
+        testHttpClient.resourceGroupRuntimeHeartbeat(Optional.empty(), "coordinator", runtimeInfo);
+
+        List<ResourceGroupRuntimeInfo> result = testHttpClient.getResourceGroupInfo(Optional.empty(), "temp");
+        assertEquals(result.size(), 1);
+        assertEquals(result.get(0).getResourceGroupId(), new ResourceGroupId("test-group"));
+    }
+
+    @Test
+    public void testGetMemoryPools()
+    {
+        Map<MemoryPoolId, ClusterMemoryPoolInfo> result = testHttpClient.getMemoryPoolInfo(Optional.empty());
+        assertEquals(result.size(), 2);
+    }
+
+    @Test
+    public void testGetRunningTaskCount()
+    {
+        testHttpClient.queryHeartbeat(Optional.empty(), "coordinator", createTestQueryInfo("temp", RUNNING), 1);
+        int result = testHttpClient.getRunningTaskCount(Optional.empty());
+        assertEquals(result, 1);
+    }
+
+    private static BasicQueryInfo createTestQueryInfo(String queryId, QueryState state)
+    {
+        return new BasicQueryInfo(
+                new QueryId(queryId),
+                TEST_SESSION.toSessionRepresentation(),
+                Optional.of(new ResourceGroupId("test-group")),
+                state,
+                GENERAL_POOL,
+                true,
+                URI.create("http://localhost"),
+                "",
+                new BasicQueryStats(
+                        DateTime.now().getMillis(),
+                        DateTime.now().getMillis(),
+                        new Duration(1, SECONDS),
+                        new Duration(1, SECONDS),
+                        new Duration(1, SECONDS),
+                        new Duration(1, SECONDS),
+                        new Duration(1, SECONDS),
+                        1, 1, 1, 1, 1, 100, 1, 1, 1, 100, 1, 1, 1, 100,
+                        new DataSize(1, MEGABYTE),
+                        1, 1, 1,
+                        new DataSize(1, MEGABYTE),
+                        new DataSize(1, MEGABYTE),
+                        new DataSize(1, MEGABYTE),
+                        new DataSize(1, MEGABYTE),
+                        new DataSize(1, MEGABYTE),
+                        new DataSize(1, MEGABYTE),
+                        new Duration(1, SECONDS),
+                        new Duration(1, SECONDS),
+                        true,
+                        ImmutableSet.of(WAITING_FOR_MEMORY),
+                        new DataSize(1, MEGABYTE),
+                        OptionalDouble.of(1.0)),
+                null,
+                Optional.empty(),
+                ImmutableList.of(),
+                Optional.empty());
+    }
+
+    private static NodeStatus createTestNodeStatus(String nodeId)
+    {
+        return new NodeStatus(nodeId,
+                new NodeVersion("1"),
+                "test",
+                true,
+                Duration.valueOf("1ms"),
+                "loc",
+                "loc",
+                new MemoryInfo(new DataSize(1, MEGABYTE), ImmutableMap.of()),
+                1,
+                1,
+                1,
+                1,
+                1,
+                1);
+    }
+}
diff --git a/presto-main/src/test/java/com/facebook/presto/resourcemanager/TestResourceManagerClusterStatusSender.java b/presto-main/src/test/java/com/facebook/presto/resourcemanager/TestResourceManagerClusterStatusSender.java index d0ba55a09151d..7487efeee6206 100644 --- a/presto-main/src/test/java/com/facebook/presto/resourcemanager/TestResourceManagerClusterStatusSender.java +++ b/presto-main/src/test/java/com/facebook/presto/resourcemanager/TestResourceManagerClusterStatusSender.java @@ -21,6 +21,7 @@ import com.facebook.presto.memory.MemoryInfo; import com.facebook.presto.metadata.InMemoryNodeManager; import com.facebook.presto.metadata.InternalNode; +import com.facebook.presto.server.InternalCommunicationConfig; import com.facebook.presto.server.NodeStatus; import com.facebook.presto.server.ServerConfig; import com.facebook.presto.spi.ConnectorId; @@ -83,6 +84,7 @@ public void setup() sender = new ResourceManagerClusterStatusSender( (addressSelectionContext, headers) -> resourceManagerClient, + null, nodeManager, () -> NODE_STATUS, newSingleThreadScheduledExecutor(), @@ -90,6 +92,7 @@ public void setup() .setNodeHeartbeatInterval(new Duration(HEARTBEAT_INTERVAL, MILLISECONDS)) .setQueryHeartbeatInterval(new Duration(HEARTBEAT_INTERVAL,
MILLISECONDS)), new ServerConfig().setCoordinator(false), + new InternalCommunicationConfig(), new NoOpResourceGroupManager()); } diff --git a/presto-main-base/src/test/java/com/facebook/presto/resourcemanager/TestResourceManagerResourceGroupService.java b/presto-main/src/test/java/com/facebook/presto/resourcemanager/TestResourceManagerResourceGroupService.java similarity index 87% rename from presto-main-base/src/test/java/com/facebook/presto/resourcemanager/TestResourceManagerResourceGroupService.java rename to presto-main/src/test/java/com/facebook/presto/resourcemanager/TestResourceManagerResourceGroupService.java index d668f6a221c96..0e70cb79bc726 100644 --- a/presto-main-base/src/test/java/com/facebook/presto/resourcemanager/TestResourceManagerResourceGroupService.java +++ b/presto-main/src/test/java/com/facebook/presto/resourcemanager/TestResourceManagerResourceGroupService.java @@ -15,6 +15,7 @@ import com.facebook.presto.execution.resourceGroups.ResourceGroupRuntimeInfo; import com.facebook.presto.metadata.InMemoryNodeManager; +import com.facebook.presto.server.InternalCommunicationConfig; import com.facebook.presto.spi.resourceGroups.ResourceGroupId; import com.google.common.collect.ImmutableList; import org.testng.annotations.Test; @@ -35,8 +36,10 @@ public void testGetResourceGroupInfo() { TestingResourceManagerClient resourceManagerClient = new TestingResourceManagerClient(); InMemoryNodeManager nodeManager = new InMemoryNodeManager(); + InternalCommunicationConfig internalCommunicationConfig = new InternalCommunicationConfig(); ResourceManagerConfig resourceManagerConfig = new ResourceManagerConfig(); - ResourceManagerResourceGroupService service = new ResourceManagerResourceGroupService((addressSelectionContext, headers) -> resourceManagerClient, resourceManagerConfig, nodeManager); + NoOpHttpResourceManagerClient client = new NoOpHttpResourceManagerClient(); + ResourceManagerResourceGroupService service = new 
ResourceManagerResourceGroupService((addressSelectionContext, headers) -> resourceManagerClient, client, resourceManagerConfig, internalCommunicationConfig, nodeManager); List resourceGroupInfos = service.getResourceGroupInfo(); assertNotNull(resourceGroupInfos); assertTrue(resourceGroupInfos.isEmpty()); diff --git a/presto-main/src/test/java/com/facebook/presto/resourcemanager/TestRetryFunctionality.java b/presto-main/src/test/java/com/facebook/presto/resourcemanager/TestRetryFunctionality.java new file mode 100644 index 0000000000000..9c1eb2c73bdbf --- /dev/null +++ b/presto-main/src/test/java/com/facebook/presto/resourcemanager/TestRetryFunctionality.java @@ -0,0 +1,233 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.resourcemanager; + +import com.facebook.airlift.http.client.HttpClient; +import com.facebook.airlift.http.client.Request; +import com.facebook.airlift.http.client.RequestStats; +import com.facebook.airlift.http.client.ResponseHandler; +import com.facebook.airlift.json.JsonCodec; +import com.facebook.airlift.units.DataSize; +import com.facebook.airlift.units.Duration; +import com.facebook.presto.client.NodeVersion; +import com.facebook.presto.execution.resourceGroups.ResourceGroupRuntimeInfo; +import com.facebook.presto.memory.MemoryInfo; +import com.facebook.presto.metadata.InMemoryNodeManager; +import com.facebook.presto.metadata.InternalNode; +import com.facebook.presto.metadata.InternalNodeManager; +import com.facebook.presto.server.BasicQueryInfo; +import com.facebook.presto.server.NodeStatus; +import com.facebook.presto.spi.ConnectorId; +import com.facebook.presto.spi.memory.ClusterMemoryPoolInfo; +import com.facebook.presto.spi.memory.MemoryPoolId; +import com.google.common.collect.ImmutableMap; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.net.SocketTimeoutException; +import java.net.URI; +import java.util.Optional; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicInteger; + +import static com.facebook.airlift.units.DataSize.Unit.MEGABYTE; +import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; + +@Test(singleThreaded = true) +public class TestRetryFunctionality +{ + private ScheduledExecutorService scheduler; + private HttpResourceManagerClient client; + private FailingHttpClient failingHttpClient; + private InternalNodeManager nodeManager; + + @BeforeMethod + public void setUp() + { + scheduler = 
newSingleThreadScheduledExecutor(); + failingHttpClient = new FailingHttpClient(); + + InMemoryNodeManager manager = new InMemoryNodeManager(); + manager.addNode(new ConnectorId("test"), + new InternalNode("rm", URI.create("http://localhost:8080/"), new NodeVersion("1"), false, true, false, false)); + nodeManager = manager; + + client = new HttpResourceManagerClient( + failingHttpClient, + scheduler, + JsonCodec.jsonCodec(NodeStatus.class), + JsonCodec.jsonCodec(BasicQueryInfo.class), + JsonCodec.listJsonCodec(ResourceGroupRuntimeInfo.class), + JsonCodec.mapJsonCodec(MemoryPoolId.class, ClusterMemoryPoolInfo.class), + nodeManager); + } + + @AfterMethod(alwaysRun = true) + public void tearDown() + { + scheduler.shutdownNow(); + } + + @Test + public void testRetryOnIOException() + { + failingHttpClient.setFailuresBeforeSuccess(2); + failingHttpClient.setExceptionToThrow(new IOException("Simulated network error")); + + NodeStatus nodeStatus = createTestNodeStatus("test-node"); + client.nodeHeartbeat(Optional.empty(), nodeStatus); + + assertEquals(failingHttpClient.getAttemptCount(), 3, "Expected 3 attempts (2 failures + 1 success)"); + } + + @Test + public void testRetryOnSocketTimeout() + { + failingHttpClient.setFailuresBeforeSuccess(1); + failingHttpClient.setExceptionToThrow(new SocketTimeoutException("Connection timeout")); + + NodeStatus nodeStatus = createTestNodeStatus("test-node"); + client.nodeHeartbeat(Optional.empty(), nodeStatus); + + assertEquals(failingHttpClient.getAttemptCount(), 2, "Expected 2 attempts (1 timeout + 1 success)"); + } + + @Test + public void testMaxRetriesExceeded() + { + failingHttpClient.setFailuresBeforeSuccess(10); + failingHttpClient.setExceptionToThrow(new IOException("Persistent network error")); + + NodeStatus nodeStatus = createTestNodeStatus("test-node"); + + try { + client.nodeHeartbeat(Optional.empty(), nodeStatus); + fail("Expected ResourceManagerException to be thrown"); + } + catch (Exception e) { + e.printStackTrace(); 
+ assertTrue(failingHttpClient.getAttemptCount() >= 6, "Expected at least 6 attempts, got: " + failingHttpClient.getAttemptCount()); + } + } + + @Test + public void testNoRetryOnNonIOException() + { + failingHttpClient.setFailuresBeforeSuccess(10); + failingHttpClient.setExceptionToThrow(new IllegalArgumentException("Invalid argument")); + + NodeStatus nodeStatus = createTestNodeStatus("test-node"); + try { + client.nodeHeartbeat(Optional.empty(), nodeStatus); + fail("Expected exception to be thrown"); + } + catch (Exception e) { + assertEquals(failingHttpClient.getAttemptCount(), 1, "Should not retry on non-IO exceptions"); + } + } + + @Test + public void testImmediateSuccess() + { + failingHttpClient.setFailuresBeforeSuccess(0); + + NodeStatus nodeStatus = createTestNodeStatus("test-node"); + client.nodeHeartbeat(Optional.empty(), nodeStatus); + + assertEquals(failingHttpClient.getAttemptCount(), 1, "Expected 1 attempt (immediate success)"); + } + + private static NodeStatus createTestNodeStatus(String nodeId) + { + return new NodeStatus( + nodeId, + new NodeVersion("1"), + "test", + true, + Duration.valueOf("1ms"), + "loc", + "loc", + new MemoryInfo(new DataSize(1, MEGABYTE), ImmutableMap.of()), + 1, 1, 1, 1, 1, 1); + } + + private static class FailingHttpClient + implements HttpClient + { + private final AtomicInteger attemptCount = new AtomicInteger(0); + private volatile int failuresBeforeSuccess; + private volatile Exception exceptionToThrow = new IOException("Default error"); + + public void setFailuresBeforeSuccess(int failures) + { + this.failuresBeforeSuccess = failures; + this.attemptCount.set(0); + } + + public void setExceptionToThrow(Exception exception) + { + this.exceptionToThrow = exception; + } + + public int getAttemptCount() + { + return attemptCount.get(); + } + + @Override + public T execute(Request request, ResponseHandler responseHandler) + throws E + { + int attempt = attemptCount.incrementAndGet(); + + if (attempt <= 
failuresBeforeSuccess) { + throw (E) exceptionToThrow; + } + + return null; + } + + @Override + public HttpClient.HttpResponseFuture executeAsync(Request request, ResponseHandler responseHandler) + { + return null; + } + + @Override + public RequestStats getStats() + { + return null; + } + + @Override + public long getMaxContentLength() + { + return 0; + } + + @Override + public void close() {} + + @Override + public boolean isClosed() + { + return false; + } + } +} diff --git a/presto-native-execution/presto_cpp/main/PeriodicHeartbeatManager.cpp b/presto-native-execution/presto_cpp/main/PeriodicHeartbeatManager.cpp index b6406222c9407..cc0c02aeb6e5a 100644 --- a/presto-native-execution/presto_cpp/main/PeriodicHeartbeatManager.cpp +++ b/presto-native-execution/presto_cpp/main/PeriodicHeartbeatManager.cpp @@ -33,11 +33,12 @@ PeriodicHeartbeatManager::PeriodicHeartbeatManager( std::tuple PeriodicHeartbeatManager::httpRequest() { nlohmann::json j; - to_json(j, nodeStatusFetcher_()); + auto status = nodeStatusFetcher_(); + to_json(j, status); std::string body = j.dump(); proxygen::HTTPMessage request; - request.setMethod(proxygen::HTTPMethod::PUT); - request.setURL("/v1/heartbeat"); + request.setMethod(proxygen::HTTPMethod::PATCH); + request.setURL("/v1/resource-manager/node/" + status.nodeId); request.getHeaders().set( proxygen::HTTP_HEADER_HOST, fmt::format("{}:{}", address_, port_)); request.getHeaders().set( diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/StandardErrorCode.java b/presto-spi/src/main/java/com/facebook/presto/spi/StandardErrorCode.java index eba1a0a0c91c5..81f10b1ef6924 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/StandardErrorCode.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/StandardErrorCode.java @@ -154,6 +154,7 @@ public enum StandardErrorCode DUPLICATE_FUNCTION_ERROR(0x0002_0016, INTERNAL_ERROR), MEMORY_ARBITRATION_FAILURE(0x0002_0017, INSUFFICIENT_RESOURCES), 
AUTHENTICATOR_NOT_APPLICABLE(0x0002_0018, INTERNAL_ERROR), + RESOURCE_MANAGER_ERROR(0x0002_0019, INTERNAL_ERROR), /**/; // Error code range 0x0003 is reserved for Presto-on-Spark