From 65dc33b445fbf58e86eabd5a8c278c795cc66cc9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Osipiuk?= Date: Tue, 10 Oct 2023 15:13:43 +0200 Subject: [PATCH 1/2] Use Table in TaskDescriptorStorage --- .../scheduler/TaskDescriptorStorage.java | 77 ++++--------------- 1 file changed, 13 insertions(+), 64 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/TaskDescriptorStorage.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/TaskDescriptorStorage.java index d00036326ac7..ef544fb151d9 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/TaskDescriptorStorage.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/TaskDescriptorStorage.java @@ -14,7 +14,9 @@ package io.trino.execution.scheduler; import com.google.common.base.VerifyException; +import com.google.common.collect.HashBasedTable; import com.google.common.collect.Multimap; +import com.google.common.collect.Table; import com.google.common.math.Stats; import com.google.errorprone.annotations.concurrent.GuardedBy; import com.google.inject.Inject; @@ -36,11 +38,9 @@ import java.util.List; import java.util.Map; import java.util.NoSuchElementException; -import java.util.Objects; import java.util.Optional; import java.util.Set; -import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableMap.toImmutableMap; @@ -197,25 +197,25 @@ public synchronized long getReservedBytes() @NotThreadSafe private class TaskDescriptors { - private final Map descriptors = new HashMap<>(); + private final Table descriptors = HashBasedTable.create(); + private long reservedBytes; private RuntimeException failure; public void put(StageId stageId, int partitionId, TaskDescriptor descriptor) { throwIfFailed(); - TaskDescriptorKey key = new TaskDescriptorKey(stageId, partitionId); - 
checkState(descriptors.putIfAbsent(key, descriptor) == null, "task descriptor is already present for key %s ", key); + checkState(!descriptors.contains(stageId, partitionId), "task descriptor is already present for key %s/%s ", stageId, partitionId); + descriptors.put(stageId, partitionId, descriptor); reservedBytes += descriptor.getRetainedSizeInBytes(); } public TaskDescriptor get(StageId stageId, int partitionId) { throwIfFailed(); - TaskDescriptorKey key = new TaskDescriptorKey(stageId, partitionId); - TaskDescriptor descriptor = descriptors.get(key); + TaskDescriptor descriptor = descriptors.get(stageId, partitionId); if (descriptor == null) { - throw new NoSuchElementException(format("descriptor not found for key %s", key)); + throw new NoSuchElementException(format("descriptor not found for key %s/%s", stageId, partitionId)); } return descriptor; } @@ -223,10 +223,9 @@ public TaskDescriptor get(StageId stageId, int partitionId) public void remove(StageId stageId, int partitionId) { throwIfFailed(); - TaskDescriptorKey key = new TaskDescriptorKey(stageId, partitionId); - TaskDescriptor descriptor = descriptors.remove(key); + TaskDescriptor descriptor = descriptors.remove(stageId, partitionId); if (descriptor == null) { - throw new NoSuchElementException(format("descriptor not found for key %s", key)); + throw new NoSuchElementException(format("descriptor not found for key %s/%s", stageId, partitionId)); } reservedBytes -= descriptor.getRetainedSizeInBytes(); } @@ -238,10 +237,10 @@ public long getReservedBytes() private String getDebugInfo() { - Multimap descriptorsByStageId = descriptors.entrySet().stream() + Multimap descriptorsByStageId = descriptors.cellSet().stream() .collect(toImmutableSetMultimap( - entry -> entry.getKey().getStageId(), - Map.Entry::getValue)); + Table.Cell::getRowKey, + Table.Cell::getValue)); Map debugInfoByStageId = descriptorsByStageId.asMap().entrySet().stream() .collect(toImmutableMap( @@ -300,54 +299,4 @@ private void 
throwIfFailed() } } } - - private static class TaskDescriptorKey - { - private final StageId stageId; - private final int partitionId; - - private TaskDescriptorKey(StageId stageId, int partitionId) - { - this.stageId = requireNonNull(stageId, "stageId is null"); - this.partitionId = partitionId; - } - - public StageId getStageId() - { - return stageId; - } - - public int getPartitionId() - { - return partitionId; - } - - @Override - public boolean equals(Object o) - { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - TaskDescriptorKey key = (TaskDescriptorKey) o; - return partitionId == key.partitionId && Objects.equals(stageId, key.stageId); - } - - @Override - public int hashCode() - { - return Objects.hash(stageId, partitionId); - } - - @Override - public String toString() - { - return toStringHelper(this) - .add("stageId", stageId) - .add("partitionId", partitionId) - .toString(); - } - } } From 15b9058198ab881dd536a26ec99ebf5fe2e7f949 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Osipiuk?= Date: Tue, 10 Oct 2023 15:02:53 +0200 Subject: [PATCH 2/2] Expose more stats for TaskDescriptorStorage --- .../scheduler/TaskDescriptorStorage.java | 190 +++++++++++++++++- 1 file changed, 186 insertions(+), 4 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/TaskDescriptorStorage.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/TaskDescriptorStorage.java index ef544fb151d9..10a17253030e 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/TaskDescriptorStorage.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/TaskDescriptorStorage.java @@ -13,10 +13,13 @@ */ package io.trino.execution.scheduler; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Suppliers; import com.google.common.base.VerifyException; import com.google.common.collect.HashBasedTable; import 
com.google.common.collect.Multimap; import com.google.common.collect.Table; +import com.google.common.math.Quantiles; import com.google.common.math.Stats; import com.google.errorprone.annotations.concurrent.GuardedBy; import com.google.inject.Inject; @@ -31,6 +34,7 @@ import io.trino.spi.TrinoException; import io.trino.sql.planner.plan.PlanNodeId; import org.weakref.jmx.Managed; +import org.weakref.jmx.Nested; import java.util.Collection; import java.util.Comparator; @@ -40,12 +44,18 @@ import java.util.NoSuchElementException; import java.util.Optional; import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; +import java.util.stream.Stream; import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Verify.verify; +import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableMap.toImmutableMap; import static com.google.common.collect.ImmutableSet.toImmutableSet; import static com.google.common.collect.ImmutableSetMultimap.toImmutableSetMultimap; +import static com.google.common.math.Quantiles.percentiles; import static io.airlift.units.DataSize.succinctBytes; import static io.trino.spi.StandardErrorCode.EXCEEDED_TASK_DESCRIPTOR_STORAGE_CAPACITY; import static java.lang.String.format; @@ -57,6 +67,7 @@ public class TaskDescriptorStorage private final long maxMemoryInBytes; private final JsonCodec splitJsonCodec; + private final StorageStats storageStats; @GuardedBy("this") private final Map storages = new HashMap<>(); @@ -75,6 +86,7 @@ public TaskDescriptorStorage(DataSize maxMemory, JsonCodec splitJsonCodec { this.maxMemoryInBytes = maxMemory.toBytes(); this.splitJsonCodec = requireNonNull(splitJsonCodec, "splitJsonCodec is null"); + this.storageStats = new StorageStats(Suppliers.memoizeWithExpiration(this::computeStats, 1, TimeUnit.SECONDS)); } /** @@ -188,18 +200,84 @@ private 
synchronized void updateMemoryReservation(long delta) } } - @Managed - public synchronized long getReservedBytes() + @VisibleForTesting + synchronized long getReservedBytes() { return reservedBytes; } + @Managed + @Nested + public StorageStats getStats() + { + // This should not contain materialized values. GuiceMBeanExporter calls it only once during application startup + // and then only @Managed methods all called on that instance. + return storageStats; + } + + private synchronized StorageStatsValue computeStats() + { + int queriesCount = storages.size(); + long stagesCount = storages.values().stream().mapToLong(TaskDescriptors::getStagesCount).sum(); + + Quantiles.ScaleAndIndexes percentiles = percentiles().indexes(50, 90, 95); + + long queryReservedBytesP50 = 0; + long queryReservedBytesP90 = 0; + long queryReservedBytesP95 = 0; + long queryReservedBytesAvg = 0; + long stageReservedBytesP50 = 0; + long stageReservedBytesP90 = 0; + long stageReservedBytesP95 = 0; + long stageReservedBytesAvg = 0; + + if (queriesCount > 0) { // we cannot compute percentiles for empty set + + Map queryReservedBytesPercentiles = percentiles.compute( + storages.values().stream() + .map(TaskDescriptors::getReservedBytes) + .collect(toImmutableList())); + + queryReservedBytesP50 = queryReservedBytesPercentiles.get(50).longValue(); + queryReservedBytesP90 = queryReservedBytesPercentiles.get(90).longValue(); + queryReservedBytesP95 = queryReservedBytesPercentiles.get(95).longValue(); + queryReservedBytesAvg = reservedBytes / queriesCount; + + List storagesReservedBytes = storages.values().stream() + .flatMap(TaskDescriptors::getStagesReservedBytes) + .collect(toImmutableList()); + + if (!storagesReservedBytes.isEmpty()) { + Map stagesReservedBytesPercentiles = percentiles.compute( + storagesReservedBytes); + stageReservedBytesP50 = stagesReservedBytesPercentiles.get(50).longValue(); + stageReservedBytesP90 = stagesReservedBytesPercentiles.get(90).longValue(); + stageReservedBytesP95 = 
stagesReservedBytesPercentiles.get(95).longValue(); + stageReservedBytesAvg = reservedBytes / stagesCount; + } + } + + return new StorageStatsValue( + queriesCount, + stagesCount, + reservedBytes, + queryReservedBytesAvg, + queryReservedBytesP50, + queryReservedBytesP90, + queryReservedBytesP95, + stageReservedBytesAvg, + stageReservedBytesP50, + stageReservedBytesP90, + stageReservedBytesP95); + } + @NotThreadSafe private class TaskDescriptors { private final Table descriptors = HashBasedTable.create(); private long reservedBytes; + private final Map stagesReservedBytes = new HashMap<>(); private RuntimeException failure; public void put(StageId stageId, int partitionId, TaskDescriptor descriptor) @@ -207,7 +285,9 @@ public void put(StageId stageId, int partitionId, TaskDescriptor descriptor) throwIfFailed(); checkState(!descriptors.contains(stageId, partitionId), "task descriptor is already present for key %s/%s ", stageId, partitionId); descriptors.put(stageId, partitionId, descriptor); - reservedBytes += descriptor.getRetainedSizeInBytes(); + long descriptorRetainedBytes = descriptor.getRetainedSizeInBytes(); + reservedBytes += descriptorRetainedBytes; + stagesReservedBytes.computeIfAbsent(stageId, ignored -> new AtomicLong()).addAndGet(descriptorRetainedBytes); } public TaskDescriptor get(StageId stageId, int partitionId) @@ -227,7 +307,9 @@ public void remove(StageId stageId, int partitionId) if (descriptor == null) { throw new NoSuchElementException(format("descriptor not found for key %s/%s", stageId, partitionId)); } - reservedBytes -= descriptor.getRetainedSizeInBytes(); + long descriptorRetainedBytes = descriptor.getRetainedSizeInBytes(); + reservedBytes -= descriptorRetainedBytes; + requireNonNull(stagesReservedBytes.get(stageId), () -> format("no entry for stage %s", stageId)).addAndGet(-descriptorRetainedBytes); } public long getReservedBytes() @@ -298,5 +380,105 @@ private void throwIfFailed() throw failure; } } + + public int getStagesCount() + { + 
return descriptors.rowMap().size(); + } + + public Stream getStagesReservedBytes() + { + return stagesReservedBytes.values().stream() + .map(AtomicLong::get); + } + } + + private record StorageStatsValue( + long queriesCount, + long stagesCount, + long reservedBytes, + long queryReservedBytesAvg, + long queryReservedBytesP50, + long queryReservedBytesP90, + long queryReservedBytesP95, + long stageReservedBytesAvg, + long stageReservedBytesP50, + long stageReservedBytesP90, + long stageReservedBytesP95) {} + + public static class StorageStats + { + private final Supplier statsSupplier; + + StorageStats(Supplier statsSupplier) + { + this.statsSupplier = requireNonNull(statsSupplier, "statsSupplier is null"); + } + + @Managed + public long getQueriesCount() + { + return statsSupplier.get().queriesCount(); + } + + @Managed + public long getStagesCount() + { + return statsSupplier.get().stagesCount(); + } + + @Managed + public long getReservedBytes() + { + return statsSupplier.get().reservedBytes(); + } + + @Managed + public long getQueryReservedBytesAvg() + { + return statsSupplier.get().queryReservedBytesAvg(); + } + + @Managed + public long getQueryReservedBytesP50() + { + return statsSupplier.get().queryReservedBytesP50(); + } + + @Managed + public long getQueryReservedBytesP90() + { + return statsSupplier.get().queryReservedBytesP90(); + } + + @Managed + public long getQueryReservedBytesP95() + { + return statsSupplier.get().queryReservedBytesP95(); + } + + @Managed + public long getStageReservedBytesAvg() + { + return statsSupplier.get().stageReservedBytesAvg(); + } + + @Managed + public long getStageReservedBytesP50() + { + return statsSupplier.get().stageReservedBytesP50(); + } + + @Managed + public long getStageReservedBytesP90() + { + return statsSupplier.get().stageReservedBytesP90(); + } + + @Managed + public long getStageReservedBytesP95() + { + return statsSupplier.get().stageReservedBytesP95(); + } } }