Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public class InternalResourceGroup
private final ResourceGroupId id;
private final BiConsumer<InternalResourceGroup, Boolean> jmxExportListener;
private final Executor executor;
private final boolean staticResourceGroup;
Comment thread
cemcayiroglu marked this conversation as resolved.
Outdated

// Configuration
// =============
Expand Down Expand Up @@ -140,7 +141,12 @@ public class InternalResourceGroup
@GuardedBy("root")
private final CounterStat timeBetweenStartsSec = new CounterStat();

protected InternalResourceGroup(Optional<InternalResourceGroup> parent, String name, BiConsumer<InternalResourceGroup, Boolean> jmxExportListener, Executor executor)
protected InternalResourceGroup(
Optional<InternalResourceGroup> parent,
String name,
BiConsumer<InternalResourceGroup, Boolean> jmxExportListener,
Executor executor,
boolean staticResourceGroup)
{
this.parent = requireNonNull(parent, "parent is null");
this.jmxExportListener = requireNonNull(jmxExportListener, "jmxExportListener is null");
Expand All @@ -154,9 +160,10 @@ protected InternalResourceGroup(Optional<InternalResourceGroup> parent, String n
id = new ResourceGroupId(name);
root = this;
}
this.staticResourceGroup = staticResourceGroup;
}

public ResourceGroupInfo getFullInfo()
public ResourceGroupInfo getResourceGroupInfo(boolean includeQueryInfo, boolean summarizeSubgroups, boolean includeStaticSubgroupsOnly)
{
synchronized (root) {
return new ResourceGroupInfo(
Expand All @@ -174,9 +181,10 @@ public ResourceGroupInfo getFullInfo()
eligibleSubGroups.size(),
subGroups.values().stream()
.filter(group -> group.getRunningQueries() + group.getQueuedQueries() > 0)
.map(InternalResourceGroup::getSummaryInfo)
.filter(group -> !includeStaticSubgroupsOnly || group.isStaticResourceGroup())
.map(group -> summarizeSubgroups ? group.getSummaryInfo() : group.getResourceGroupInfo(includeQueryInfo, false, includeStaticSubgroupsOnly))
.collect(toImmutableList()),
getAggregatedRunningQueriesInfo());
includeQueryInfo ? getAggregatedRunningQueriesInfo() : null);
}
}

Expand Down Expand Up @@ -225,6 +233,11 @@ private ResourceGroupInfo getSummaryInfo()
}
}

boolean isStaticResourceGroup()
{
return staticResourceGroup;
}

private ResourceGroupState getState()
{
synchronized (root) {
Expand Down Expand Up @@ -565,15 +578,22 @@ public void setJmxExport(boolean export)
jmxExportListener.accept(this, export);
}

public InternalResourceGroup getOrCreateSubGroup(String name)
public InternalResourceGroup getOrCreateSubGroup(String name, boolean staticSegment)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think this should have been named as staticSubGroup

{
requireNonNull(name, "name is null");
synchronized (root) {
checkArgument(runningQueries.isEmpty() && queuedQueries.isEmpty(), "Cannot add sub group to %s while queries are running", id);
if (subGroups.containsKey(name)) {
return subGroups.get(name);
}
InternalResourceGroup subGroup = new InternalResourceGroup(Optional.of(this), name, jmxExportListener, executor);
// parent segments size equals to subgroup segment index
int subGroupSegmentIndex = id.getSegments().size();
InternalResourceGroup subGroup = new InternalResourceGroup(
Optional.of(this),
name,
jmxExportListener,
executor,
staticResourceGroup && staticSegment);
// Sub group must use query priority to ensure ordering
if (schedulingPolicy == QUERY_PRIORITY) {
subGroup.setSchedulingPolicy(QUERY_PRIORITY);
Expand Down Expand Up @@ -941,9 +961,12 @@ public static final class RootInternalResourceGroup
{
private AtomicBoolean taskLimitExceeded = new AtomicBoolean();

public RootInternalResourceGroup(String name, BiConsumer<InternalResourceGroup, Boolean> jmxExportListener, Executor executor)
public RootInternalResourceGroup(
String name,
BiConsumer<InternalResourceGroup, Boolean> jmxExportListener,
Executor executor)
{
super(Optional.empty(), name, jmxExportListener, executor);
super(Optional.empty(), name, jmxExportListener, executor, true);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this true? Sorry I don't have much background about resource group.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess the root resource group is always static

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes!

}

public synchronized void processQueuedQueries()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalInt;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
Expand Down Expand Up @@ -103,10 +104,10 @@ public InternalResourceGroupManager(
}

@Override
public ResourceGroupInfo getResourceGroupInfo(ResourceGroupId id)
public ResourceGroupInfo getResourceGroupInfo(ResourceGroupId id, boolean includeQueryInfo, boolean summarizeSubgroups, boolean includeStaticSubgroupsOnly)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

summarizeSubgroups should be summarizeSubGroups to make it true camel case.

{
checkArgument(groups.containsKey(id), "Group %s does not exist", id);
return groups.get(id).getFullInfo();
return groups.get(id).getResourceGroupInfo(includeQueryInfo, summarizeSubgroups, includeStaticSubgroupsOnly);
}

@Override
Expand Down Expand Up @@ -239,7 +240,9 @@ private synchronized void createGroupIfNecessary(SelectionContext<C> context, Ex
createGroupIfNecessary(new SelectionContext<>(id.getParent().get(), context.getContext()), executor);
InternalResourceGroup parent = groups.get(id.getParent().get());
requireNonNull(parent, "parent is null");
group = parent.getOrCreateSubGroup(id.getLastSegment());
// parent segments size equals to subgroup segment index
int subGroupSegmentIndex = parent.getId().getSegments().size();
group = parent.getOrCreateSubGroup(id.getLastSegment(), !context.getFirstDynamicSegmentPosition().equals(OptionalInt.of(subGroupSegmentIndex)));
}
else {
RootInternalResourceGroup root = new RootInternalResourceGroup(id.getSegments().get(0), this::exportGroup, executor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public void submit(Statement statement, ManagedQueryExecution queryExecution, Se
}

@Override
public ResourceGroupInfo getResourceGroupInfo(ResourceGroupId id)
public ResourceGroupInfo getResourceGroupInfo(ResourceGroupId id, boolean includeQueryInfo, boolean summarizeSubgroups, boolean includeStaticSubgroupsOnly)
{
throw new UnsupportedOperationException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public interface ResourceGroupManager<C>

SelectionContext<C> selectGroup(SelectionCriteria criteria);

ResourceGroupInfo getResourceGroupInfo(ResourceGroupId id);
ResourceGroupInfo getResourceGroupInfo(ResourceGroupId id, boolean includeQueryInfo, boolean summarizeSubgroups, boolean includeStaticSubgroupsOnly);

List<ResourceGroupInfo> getPathToRoot(ResourceGroupId id);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.facebook.presto.spi.resourceGroups.ResourceGroupId;
import com.facebook.presto.spi.resourceGroups.ResourceGroupState;
import com.facebook.presto.spi.resourceGroups.SchedulingPolicy;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.airlift.units.DataSize;

Expand Down Expand Up @@ -51,26 +52,22 @@ public class ResourceGroupInfo
private final List<ResourceGroupInfo> subGroups;
private final List<QueryStateInfo> runningQueries;

@JsonCreator
Comment thread
cemcayiroglu marked this conversation as resolved.
Outdated
public ResourceGroupInfo(
ResourceGroupId id,
ResourceGroupState state,

SchedulingPolicy schedulingPolicy,
int schedulingWeight,

DataSize softMemoryLimit,
int softConcurrencyLimit,
int hardConcurrencyLimit,
int maxQueuedQueries,

DataSize memoryUsage,
int numQueuedQueries,
int numRunningQueries,
int numEligibleSubGroups,

List<ResourceGroupInfo> subGroups,

List<QueryStateInfo> runningQueries)
@JsonProperty("id") ResourceGroupId id,
@JsonProperty("state") ResourceGroupState state,
@JsonProperty("schedulingPolicy") SchedulingPolicy schedulingPolicy,
@JsonProperty("schedulingWeight") int schedulingWeight,
@JsonProperty("softMemoryLimit") DataSize softMemoryLimit,
@JsonProperty("softConcurrencyLimit") int softConcurrencyLimit,
@JsonProperty("hardConcurrencyLimit") int hardConcurrencyLimit,
@JsonProperty("maxQueuedQueries") int maxQueuedQueries,
@JsonProperty("memoryUsage") DataSize memoryUsage,
@JsonProperty("numQueuedQueries") int numQueuedQueries,
@JsonProperty("numRunningQueries") int numRunningQueries,
@JsonProperty("numEligibleSubGroups") int numEligibleSubGroups,
@JsonProperty("subGroups") List<ResourceGroupInfo> subGroups,
@JsonProperty("runningQueries") List<QueryStateInfo> runningQueries)
{
this.id = requireNonNull(id, "id is null");
this.state = requireNonNull(state, "state is null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@
import com.facebook.presto.spi.resourceGroups.ResourceGroupId;

import javax.inject.Inject;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.Encoded;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.MediaType;

Expand Down Expand Up @@ -51,15 +53,22 @@ public ResourceGroupStateInfoResource(ResourceGroupManager<?> resourceGroupManag
@Produces(MediaType.APPLICATION_JSON)
@Encoded
@Path("{resourceGroupId: .+}")
public ResourceGroupInfo getQueryStateInfos(@PathParam("resourceGroupId") String resourceGroupIdString)
public ResourceGroupInfo getQueryStateInfos(
@PathParam("resourceGroupId") String resourceGroupIdString,
@QueryParam("includeQueryInfo") @DefaultValue("true") boolean includeQueryInfo,
@QueryParam("summarizeSubgroups") @DefaultValue("true") boolean summarizeSubgroups,
@QueryParam("includeStaticSubgroupsOnly") @DefaultValue("false") boolean includeStaticSubgroupsOnly)
{
if (!isNullOrEmpty(resourceGroupIdString)) {
try {
return resourceGroupManager.getResourceGroupInfo(
new ResourceGroupId(
Arrays.stream(resourceGroupIdString.split("/"))
.map(ResourceGroupStateInfoResource::urlDecode)
.collect(toImmutableList())));
.collect(toImmutableList())),
includeQueryInfo,
summarizeSubgroups,
includeStaticSubgroupsOnly);
}
catch (NoSuchElementException e) {
throw new WebApplicationException(NOT_FOUND);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public void setup()
root.setHardConcurrencyLimit(queries);
InternalResourceGroup group = root;
for (int i = 0; i < children; i++) {
group = root.getOrCreateSubGroup(String.valueOf(i));
group = root.getOrCreateSubGroup(String.valueOf(i), true);
group.setSoftMemoryLimit(new DataSize(1, MEGABYTE));
group.setMaxQueuedQueries(queries);
group.setHardConcurrencyLimit(queries);
Expand Down
Loading