33 changes: 33 additions & 0 deletions presto-docs/src/main/sphinx/admin/properties.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1509,3 +1509,36 @@ Controls behavior when a materialized view is stale and no per-view staleness co
Valid values are ``FAIL`` (throw an error) or ``USE_VIEW_QUERY`` (query base tables instead).

The corresponding session property is :ref:`admin/properties-session:\`\`materialized_view_stale_read_behavior\`\``.

Resource Manager Properties
---------------------------

``resource-manager-enabled``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``boolean``
* **Default value:** ``false``

Set to ``true`` when a resource manager exists in the cluster.

``resource-manager.http-server-enabled``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``boolean``
* **Default value:** ``true``

Controls whether the resource manager's REST server is enabled. When enabled,
nodes can communicate with the resource manager over HTTP or HTTPS.

``internal-communication.resource-manager-communication-protocol``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``string``
* **Allowed values:** ``THRIFT``, ``HTTP``
* **Default value:** ``THRIFT``

Controls whether the node communicates with the resource manager using Thrift
or HTTP/S. HTTPS is supported using the same internal communication HTTPS
configs.

To enable SSL/TLS, see :doc:`/security/internal-communication`.
13 changes: 13 additions & 0 deletions presto-docs/src/main/sphinx/installation/deployment.rst
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ At least one resource manager is needed for a cluster, and more can be added to
discovery-server.enabled=true
discovery.uri=http://example.net:8080 (Point to resource manager host/vip)
thrift.server.ssl.enabled=true
resource-manager.http-server-enabled=false
internal-communication.resource-manager-communication-protocol=THRIFT

* ``Coordinator``

Expand All @@ -175,6 +177,7 @@ A cluster can have a pool of coordinators. Each coordinator will run a subset of
query.max-memory-per-node=1GB
discovery.uri=http://example.net:8080 (Point to resource manager host/vip)
resource-manager-enabled=true
internal-communication.resource-manager-communication-protocol=THRIFT

* ``Worker``

Expand All @@ -188,6 +191,7 @@ A cluster can have a pool of workers. They send their heartbeats to the resource
query.max-memory-per-node=1GB
discovery.uri=http://example.net:8080 (Point to resource manager host/vip)
resource-manager-enabled=true
internal-communication.resource-manager-communication-protocol=THRIFT

These properties require some explanation:

Expand Down Expand Up @@ -231,6 +235,15 @@ These properties require some explanation:
the host and port of the Presto coordinator. This URI must not end
in a slash.

* ``internal-communication.resource-manager-communication-protocol``:
The protocol used for communication with the resource manager. This
can be set to ``THRIFT`` or ``HTTP``.

* ``resource-manager.http-server-enabled``:
Contributor

Could we use the value of internal-communication.resource-manager-communication-protocol, so that if it's HTTP, the HTTP server is enabled?

Contributor Author

The issue is that the C++ workers use the HTTP protocol regardless of whether the coordinator communicates over Thrift. So you may want to keep the Thrift implementation and keep the HTTP server on. We could have internal-communication.resource-manager-communication-protocol override resource-manager.http-server-enabled, so that if the protocol is set to HTTP and http-server-enabled is set to false, the server turns on anyway.

Whether to enable the resource manager HTTP server or not. If
``internal-communication.resource-manager-communication-protocol=HTTP``, this
must be set to ``true``.
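
The constraint between the two properties can be sketched as a small consistency check. This is only an illustration under stated assumptions, not code from this PR: the class `ResourceManagerProtocolCheck`, its `Protocol` enum, and the method `isHttpServerSettingValid` are hypothetical names, and whether Presto enforces the rule at startup is not stated here.

```java
// Hypothetical sketch; none of these names exist in the Presto codebase.
final class ResourceManagerProtocolCheck
{
    // Mirrors the two allowed values of
    // internal-communication.resource-manager-communication-protocol.
    enum Protocol { THRIFT, HTTP }

    private ResourceManagerProtocolCheck() {}

    // Returns true when the pair of settings follows the documented rule:
    // if the protocol is HTTP, resource-manager.http-server-enabled must be true.
    // THRIFT places no requirement on the HTTP server setting.
    static boolean isHttpServerSettingValid(Protocol protocol, boolean httpServerEnabled)
    {
        return protocol != Protocol.HTTP || httpServerEnabled;
    }

    public static void main(String[] args)
    {
        System.out.println(isHttpServerSettingValid(Protocol.THRIFT, false)); // prints true
        System.out.println(isHttpServerSettingValid(Protocol.THRIFT, true));  // prints true
        System.out.println(isHttpServerSettingValid(Protocol.HTTP, false));   // prints false
    }
}
```

Keeping ``THRIFT`` valid with the HTTP server on matches the case raised in review, where C++ workers use HTTP even when the coordinator uses Thrift.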

The following flags can help one tune the disaggregated coordinator cluster’s resource groups to the desired consistency:

* ``concurrency-threshold-to-enable-resource-group-refresh (default: 1.0)``
Expand Down
2 changes: 1 addition & 1 deletion presto-docs/src/main/sphinx/overview/concepts.rst
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ data from all coordinators and workers and constructs a global view of the clust
A Presto installation with a disaggregated coordinator needs a resource manager.
Clusters support multiple resource managers, each acting as a primary.

Coordinators and workers communicate with resource managers using a thrift API.
Coordinators and workers can communicate with resource managers using either a Thrift or HTTP API.

Coordinator
^^^^^^^^^^^
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class InternalCommunicationConfig
private DataSize maxTaskUpdateSize = new DataSize(16, MEGABYTE);
private CommunicationProtocol taskCommunicationProtocol = CommunicationProtocol.HTTP;
private CommunicationProtocol serverInfoCommunicationProtocol = CommunicationProtocol.HTTP;
private CommunicationProtocol resourceManagerCommunicationProtocol = CommunicationProtocol.THRIFT;
private boolean memoizeDeadNodesEnabled;
private String sharedSecret;
private long nodeStatsRefreshIntervalMillis = 1_000;
Expand Down Expand Up @@ -352,6 +353,19 @@ public InternalCommunicationConfig setInternalJwtEnabled(boolean internalJwtEnab
return this;
}

@Config("internal-communication.resource-manager-communication-protocol")
@ConfigDescription("Protocol for internal communication with resource managers.")
public InternalCommunicationConfig setResourceManagerCommunicationProtocol(CommunicationProtocol resourceManagerCommunicationProtocol)
{
this.resourceManagerCommunicationProtocol = resourceManagerCommunicationProtocol;
return this;
}

public CommunicationProtocol getResourceManagerCommunicationProtocol()
{
return resourceManagerCommunicationProtocol;
}

@AssertTrue(message = "When internal JWT(internal-communication.jwt.enabled) authentication is enabled, a shared secret(internal-communication.shared-secret) is required")
public boolean isRequiredSharedSecretSet()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public void testDefaults()
.setTaskUpdateRequestThriftSerdeEnabled(false)
.setTaskInfoResponseThriftSerdeEnabled(false)
.setInternalJwtEnabled(false)
.setResourceManagerCommunicationProtocol(CommunicationProtocol.THRIFT)
.setNodeStatsRefreshIntervalMillis(1_000)
.setNodeDiscoveryPollingIntervalMillis(5_000));
}
Expand Down Expand Up @@ -84,6 +85,7 @@ public void testExplicitPropertyMappings()
.put("experimental.internal-communication.task-info-response-thrift-serde-enabled", "true")
.put("internal-communication.node-stats-refresh-interval-millis", "2000")
.put("internal-communication.node-discovery-polling-interval-millis", "3000")
.put("internal-communication.resource-manager-communication-protocol", "HTTP")
.build();

InternalCommunicationConfig expected = new InternalCommunicationConfig()
Expand All @@ -106,6 +108,7 @@ public void testExplicitPropertyMappings()
.setMemoizeDeadNodesEnabled(true)
.setSharedSecret("secret")
.setInternalJwtEnabled(true)
.setResourceManagerCommunicationProtocol(CommunicationProtocol.HTTP)
.setTaskUpdateRequestThriftSerdeEnabled(true)
.setTaskInfoResponseThriftSerdeEnabled(true)
.setNodeStatsRefreshIntervalMillis(2000)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import com.facebook.drift.annotations.ThriftField;
import com.facebook.drift.annotations.ThriftStruct;
import com.facebook.presto.spi.resourceGroups.ResourceGroupId;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.Optional;

Expand All @@ -35,7 +37,14 @@ public class ResourceGroupRuntimeInfo
private final Optional<ResourceGroupSpecInfo> resourceGroupConfigSpec;

@ThriftConstructor
public ResourceGroupRuntimeInfo(ResourceGroupId resourceGroupId, long memoryUsageBytes, int queuedQueries, int descendantQueuedQueries, int runningQueries, int descendantRunningQueries, Optional<ResourceGroupSpecInfo> resourceGroupConfigSpec)
@JsonCreator
public ResourceGroupRuntimeInfo(@JsonProperty("resourceGroupId") ResourceGroupId resourceGroupId,
@JsonProperty("memoryUsageBytes") long memoryUsageBytes,
@JsonProperty("queuedQueries") int queuedQueries,
@JsonProperty("descendantQueuedQueries") int descendantQueuedQueries,
@JsonProperty("runningQueries") int runningQueries,
@JsonProperty("descendantRunningQueries") int descendantRunningQueries,
@JsonProperty("resourceGroupConfigSpec") Optional<ResourceGroupSpecInfo> resourceGroupConfigSpec)
{
this.resourceGroupId = requireNonNull(resourceGroupId, "resourceGroupId is null");
this.memoryUsageBytes = memoryUsageBytes;
Expand All @@ -52,42 +61,49 @@ public static Builder builder(ResourceGroupId resourceGroupId)
}

@ThriftField(1)
@JsonProperty
public ResourceGroupId getResourceGroupId()
{
return resourceGroupId;
}

@ThriftField(2)
@JsonProperty
public long getMemoryUsageBytes()
{
return memoryUsageBytes;
}

@ThriftField(3)
@JsonProperty
public int getQueuedQueries()
{
return queuedQueries;
}

@ThriftField(4)
@JsonProperty
public int getDescendantQueuedQueries()
{
return descendantQueuedQueries;
}

@ThriftField(5)
@JsonProperty
public int getRunningQueries()
{
return runningQueries;
}

@ThriftField(6)
@JsonProperty
public int getDescendantRunningQueries()
{
return descendantRunningQueries;
}

@ThriftField(7)
@JsonProperty
public Optional<ResourceGroupSpecInfo> getResourceGroupConfigSpec()
{
return resourceGroupConfigSpec;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.facebook.drift.client.DriftClient;
import com.facebook.presto.memory.NodeMemoryConfig;
import com.facebook.presto.server.InternalCommunicationConfig;
import com.facebook.presto.spi.memory.ClusterMemoryPoolInfo;
import com.facebook.presto.spi.memory.MemoryPoolId;
import com.facebook.presto.spi.memory.MemoryPoolInfo;
Expand All @@ -25,11 +26,14 @@
import jakarta.inject.Inject;

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import static com.facebook.presto.memory.LocalMemoryManager.GENERAL_POOL;
import static com.facebook.presto.memory.LocalMemoryManager.RESERVED_POOL;
import static com.facebook.presto.server.InternalCommunicationConfig.CommunicationProtocol.THRIFT;
import static java.util.Objects.requireNonNull;

public class ClusterMemoryManagerService
Expand All @@ -39,19 +43,24 @@ public class ClusterMemoryManagerService
0,
0);

private final DriftClient<ResourceManagerClient> resourceManagerClient;
private final DriftClient<com.facebook.presto.resourcemanager.thrift.ResourceManagerClient> resourceManagerClient;
private final ResourceManagerClient httpResourceManagerClient;
private final ScheduledExecutorService executorService;
private final AtomicReference<Map<MemoryPoolId, ClusterMemoryPoolInfo>> memoryPools;
private final long memoryPoolFetchIntervalMillis;
private final boolean isReservedPoolEnabled;
private final PeriodicTaskExecutor memoryPoolUpdater;
private final InternalCommunicationConfig internalCommunicationConfig;
private final AtomicBoolean inFlight = new AtomicBoolean(false);

@Inject
public ClusterMemoryManagerService(
@ForResourceManager DriftClient<ResourceManagerClient> resourceManagerClient,
@ForResourceManager DriftClient<com.facebook.presto.resourcemanager.thrift.ResourceManagerClient> resourceManagerClient,
ResourceManagerClient httpResourceManagerClient,
@ForResourceManager ScheduledExecutorService executorService,
ResourceManagerConfig resourceManagerConfig,
NodeMemoryConfig nodeMemoryConfig)
NodeMemoryConfig nodeMemoryConfig,
InternalCommunicationConfig internalCommunicationConfig)
{
this.resourceManagerClient = requireNonNull(resourceManagerClient, "resourceManagerClient is null");
this.executorService = requireNonNull(executorService, "executorService is null");
Expand All @@ -64,7 +73,19 @@ public ClusterMemoryManagerService(
defaultPoolBuilder.put(RESERVED_POOL, EMPTY_MEMORY_POOL);
}
this.memoryPools = new AtomicReference<>(defaultPoolBuilder.build());
this.memoryPoolUpdater = new PeriodicTaskExecutor(memoryPoolFetchIntervalMillis, executorService, () -> memoryPools.set(updateMemoryPoolInfo()));
this.memoryPoolUpdater = new PeriodicTaskExecutor(memoryPoolFetchIntervalMillis, executorService, () -> {
if (!inFlight.compareAndSet(false, true)) {
return;
}
try {
memoryPools.set(updateMemoryPoolInfo());
}
finally {
inFlight.set(false);
}
});
this.httpResourceManagerClient = requireNonNull(httpResourceManagerClient, "httpResourceManagerClient is null");
this.internalCommunicationConfig = requireNonNull(internalCommunicationConfig, "internalCommunicationConfig is null");
}

@PostConstruct
Expand All @@ -86,7 +107,13 @@ public Map<MemoryPoolId, ClusterMemoryPoolInfo> getMemoryPoolInfo()

private Map<MemoryPoolId, ClusterMemoryPoolInfo> updateMemoryPoolInfo()
{
Map<MemoryPoolId, ClusterMemoryPoolInfo> memoryPoolInfos = resourceManagerClient.get().getMemoryPoolInfo();
Map<MemoryPoolId, ClusterMemoryPoolInfo> memoryPoolInfos;
if (internalCommunicationConfig.getResourceManagerCommunicationProtocol() == THRIFT) {
memoryPoolInfos = resourceManagerClient.get().getMemoryPoolInfo();
}
else {
memoryPoolInfos = httpResourceManagerClient.getMemoryPoolInfo(Optional.empty());
}
memoryPoolInfos.putIfAbsent(GENERAL_POOL, EMPTY_MEMORY_POOL);
if (isReservedPoolEnabled) {
memoryPoolInfos.putIfAbsent(RESERVED_POOL, EMPTY_MEMORY_POOL);
Expand Down
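
The protocol switch in `updateMemoryPoolInfo()` above can be reduced to the following standalone sketch. It is a simplified illustration, assuming the two clients are replaced by plain suppliers. `ProtocolDispatchSketch`, its `Protocol` enum, and `fetch()` are hypothetical names, not part of the change.

```java
import java.util.Objects;
import java.util.function.IntSupplier;

// Hypothetical sketch; the real services inject a Drift client and an HTTP client.
final class ProtocolDispatchSketch
{
    enum Protocol { THRIFT, HTTP }

    private final Protocol protocol;
    private final IntSupplier thriftCall;
    private final IntSupplier httpCall;

    ProtocolDispatchSketch(Protocol protocol, IntSupplier thriftCall, IntSupplier httpCall)
    {
        this.protocol = Objects.requireNonNull(protocol, "protocol is null");
        this.thriftCall = Objects.requireNonNull(thriftCall, "thriftCall is null");
        this.httpCall = Objects.requireNonNull(httpCall, "httpCall is null");
    }

    // Chooses the client from the configured protocol on every call,
    // in the same way the services in this diff branch on THRIFT.
    int fetch()
    {
        if (protocol == Protocol.THRIFT) {
            return thriftCall.getAsInt();
        }
        return httpCall.getAsInt();
    }

    public static void main(String[] args)
    {
        ProtocolDispatchSketch viaHttp = new ProtocolDispatchSketch(Protocol.HTTP, () -> 1, () -> 2);
        System.out.println(viaHttp.fetch()); // prints 2
    }
}
```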
Original file line number Diff line number Diff line change
Expand Up @@ -14,35 +14,46 @@
package com.facebook.presto.resourcemanager;

import com.facebook.drift.client.DriftClient;
import com.facebook.presto.server.InternalCommunicationConfig;
import com.facebook.presto.util.PeriodicTaskExecutor;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Inject;

import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static com.facebook.presto.server.InternalCommunicationConfig.CommunicationProtocol.THRIFT;
import static java.util.Objects.requireNonNull;

public class ClusterQueryTrackerService
{
private final DriftClient<ResourceManagerClient> resourceManagerClient;
private final DriftClient<com.facebook.presto.resourcemanager.thrift.ResourceManagerClient> resourceManagerClient;
private final ScheduledExecutorService executorService;
private final long runningTaskCountFetchIntervalMillis;
private AtomicInteger runningTaskCount;
private final PeriodicTaskExecutor runningTaskCountUpdater;
private final ResourceManagerClient httpResourceManagerClient;
private final InternalCommunicationConfig internalCommunicationConfig;
private final AtomicBoolean inFlight = new AtomicBoolean(false);

@Inject
public ClusterQueryTrackerService(
@ForResourceManager DriftClient<ResourceManagerClient> resourceManagerClient,
@ForResourceManager DriftClient<com.facebook.presto.resourcemanager.thrift.ResourceManagerClient> resourceManagerClient,
ResourceManagerClient httpResourceManagerClient,
@ForResourceManager ScheduledExecutorService executorService,
ResourceManagerConfig resourceManagerConfig)
ResourceManagerConfig resourceManagerConfig,
InternalCommunicationConfig internalCommunicationConfig)
{
this.resourceManagerClient = requireNonNull(resourceManagerClient, "resourceManagerClient is null");
this.executorService = requireNonNull(executorService, "executorService is null");
this.runningTaskCountFetchIntervalMillis = requireNonNull(resourceManagerConfig, "resourceManagerConfig is null").getRunningTaskCountFetchInterval().toMillis();
this.runningTaskCount = new AtomicInteger(0);
this.runningTaskCountUpdater = new PeriodicTaskExecutor(runningTaskCountFetchIntervalMillis, executorService, () -> updateRunningTaskCount());
this.httpResourceManagerClient = requireNonNull(httpResourceManagerClient, "httpResourceManagerClient is null");
this.internalCommunicationConfig = requireNonNull(internalCommunicationConfig, "internalCommunicationConfig is null");
}

@PostConstruct
Expand All @@ -64,6 +75,19 @@ public int getRunningTaskCount()

private void updateRunningTaskCount()
{
this.runningTaskCount.set(resourceManagerClient.get().getRunningTaskCount());
if (internalCommunicationConfig.getResourceManagerCommunicationProtocol() == THRIFT) {
this.runningTaskCount.set(resourceManagerClient.get().getRunningTaskCount());
}
else {
if (!inFlight.compareAndSet(false, true)) {
return;
}
try {
this.runningTaskCount.set(httpResourceManagerClient.getRunningTaskCount(Optional.empty()));
}
finally {
inFlight.set(false);
}
}
}
}
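
The `inFlight` guard used in both services can be isolated as a small sketch. It assumes the intent is to skip a periodic run while the previous one is still in progress. `InFlightGuardSketch`, `runIfIdle`, and `completedRuns` are hypothetical names introduced for illustration.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the compareAndSet guard; not code from this PR.
final class InFlightGuardSketch
{
    private final AtomicBoolean inFlight = new AtomicBoolean(false);
    private final AtomicInteger completedRuns = new AtomicInteger();

    // Runs the task unless an earlier invocation has not finished yet.
    // Returns true if the task ran to completion, false if it was skipped.
    boolean runIfIdle(Runnable task)
    {
        if (!inFlight.compareAndSet(false, true)) {
            return false;
        }
        try {
            task.run();
            completedRuns.incrementAndGet();
            return true;
        }
        finally {
            // Always release the flag, even if the task throws.
            inFlight.set(false);
        }
    }

    int completedRuns()
    {
        return completedRuns.get();
    }
}
```

Resetting the flag in `finally` means a failed request does not block later refreshes.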