diff --git a/presto-main/src/main/java/com/facebook/presto/memory/QueryContext.java b/presto-main/src/main/java/com/facebook/presto/memory/QueryContext.java index 39910c5dccd82..e01b87e92f852 100644 --- a/presto-main/src/main/java/com/facebook/presto/memory/QueryContext.java +++ b/presto-main/src/main/java/com/facebook/presto/memory/QueryContext.java @@ -393,6 +393,11 @@ public synchronized long getPeakNodeTotalMemory() return peakNodeTotalMemory; } + public synchronized void setPeakNodeTotalMemory(long peakNodeTotalMemoryInBytes) + { + this.peakNodeTotalMemory = peakNodeTotalMemoryInBytes; + } + private static class QueryMemoryReservationHandler implements MemoryReservationHandler { @@ -473,7 +478,7 @@ private void enforceRevocableMemoryLimit(long allocated, long delta, long maxMem } @GuardedBy("this") - private String getAdditionalFailureInfo(long allocated, long delta) + public String getAdditionalFailureInfo(long allocated, long delta) { Map queryAllocations = memoryPool.getTaggedMemoryAllocations(queryId); diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkConfig.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkConfig.java index 165e52efcd33d..4821affaf0791 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkConfig.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkConfig.java @@ -17,6 +17,8 @@ import com.facebook.airlift.configuration.ConfigDescription; import io.airlift.units.DataSize; +import javax.validation.constraints.DecimalMax; +import javax.validation.constraints.DecimalMin; import javax.validation.constraints.NotNull; import static io.airlift.units.DataSize.Unit.GIGABYTE; @@ -37,6 +39,7 @@ public class PrestoSparkConfig private DataSize sparkBroadcastJoinMaxMemoryOverride; private boolean smileSerializationEnabled = true; private int splitAssignmentBatchSize = 1_000_000; + private double memoryRevokingThreshold; public boolean isSparkPartitionCountAutoTuneEnabled() { @@ -191,4 +194,19 @@ public PrestoSparkConfig setSplitAssignmentBatchSize(int splitAssignmentBatchSiz this.splitAssignmentBatchSize = splitAssignmentBatchSize; return this; } + + @DecimalMin("0.0") + @DecimalMax("1.0") + public double getMemoryRevokingThreshold() + { + return memoryRevokingThreshold; + } + + @Config("spark.memory-revoking-threshold") + @ConfigDescription("Revoke memory when memory pool is filled over threshold") + public PrestoSparkConfig setMemoryRevokingThreshold(double memoryRevokingThreshold) + { + this.memoryRevokingThreshold = memoryRevokingThreshold; + return this; + } } diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkModule.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkModule.java index 06e94487eb110..585e3a8b974d2 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkModule.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkModule.java @@ -116,7 +116,6 @@ import com.facebook.presto.spi.relation.DomainTranslator; import com.facebook.presto.spi.relation.PredicateCompiler; import com.facebook.presto.spi.relation.VariableReferenceExpression; -import com.facebook.presto.spiller.FileSingleStreamSpillerFactory; import com.facebook.presto.spiller.GenericPartitioningSpillerFactory; import com.facebook.presto.spiller.GenericSpillerFactory; import com.facebook.presto.spiller.NodeSpillConfig; @@ -124,6 +123,7 @@ import com.facebook.presto.spiller.SingleStreamSpillerFactory; import com.facebook.presto.spiller.SpillerFactory; import com.facebook.presto.spiller.SpillerStats; +import com.facebook.presto.spiller.TempStorageSingleStreamSpillerFactory; import com.facebook.presto.split.PageSinkManager; import com.facebook.presto.split.PageSinkProvider; import com.facebook.presto.split.PageSourceManager; @@ -379,7 +379,7 @@ protected void setup(Binder binder) // spill binder.bind(SpillerFactory.class).to(GenericSpillerFactory.class).in(Scopes.SINGLETON); - binder.bind(SingleStreamSpillerFactory.class).to(FileSingleStreamSpillerFactory.class).in(Scopes.SINGLETON); + binder.bind(SingleStreamSpillerFactory.class).to(TempStorageSingleStreamSpillerFactory.class).in(Scopes.SINGLETON); binder.bind(PartitioningSpillerFactory.class).to(GenericPartitioningSpillerFactory.class).in(Scopes.SINGLETON); binder.bind(SpillerStats.class).in(Scopes.SINGLETON); diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkSessionProperties.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkSessionProperties.java index c868e4c1def9a..5c2716a649f8c 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkSessionProperties.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkSessionProperties.java @@ -24,6 +24,7 @@ import static com.facebook.presto.spi.session.PropertyMetadata.booleanProperty; import static com.facebook.presto.spi.session.PropertyMetadata.dataSizeProperty; +import static com.facebook.presto.spi.session.PropertyMetadata.doubleProperty; import static com.facebook.presto.spi.session.PropertyMetadata.integerProperty; public class PrestoSparkSessionProperties @@ -38,6 +39,7 @@ public class PrestoSparkSessionProperties public static final String STORAGE_BASED_BROADCAST_JOIN_WRITE_BUFFER_SIZE = "storage_based_broadcast_join_write_buffer_size"; public static final String SPARK_BROADCAST_JOIN_MAX_MEMORY_OVERRIDE = "spark_broadcast_join_max_memory_override"; public static final String SPARK_SPLIT_ASSIGNMENT_BATCH_SIZE = "spark_split_assignment_batch_size"; + public static final String SPARK_MEMORY_REVOKING_THRESHOLD = "spark_memory_revoking_threshold"; private final List> sessionProperties; @@ -94,6 +96,11 @@ public PrestoSparkSessionProperties(PrestoSparkConfig prestoSparkConfig) SPARK_SPLIT_ASSIGNMENT_BATCH_SIZE, "Number of splits are processed in a single iteration", prestoSparkConfig.getSplitAssignmentBatchSize(), + false), + doubleProperty( + SPARK_MEMORY_REVOKING_THRESHOLD, + "Revoke memory when memory pool is filled over threshold", + prestoSparkConfig.getMemoryRevokingThreshold(), false)); } @@ -151,4 +158,9 @@ public static int getSplitAssignmentBatchSize(Session session) { return session.getSystemProperty(SPARK_SPLIT_ASSIGNMENT_BATCH_SIZE, Integer.class); } + + public static double getMemoryRevokingThreshold(Session session) + { + return session.getSystemProperty(SPARK_MEMORY_REVOKING_THRESHOLD, Double.class); + } } diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkTaskExecutorFactory.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkTaskExecutorFactory.java index 087da7b8e5b56..4fd187897ce96 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkTaskExecutorFactory.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkTaskExecutorFactory.java @@ -38,8 +38,10 @@ import com.facebook.presto.memory.MemoryPool; import com.facebook.presto.memory.NodeMemoryConfig; import com.facebook.presto.memory.QueryContext; +import com.facebook.presto.memory.VoidTraversingQueryContextVisitor; import com.facebook.presto.metadata.FunctionAndTypeManager; import com.facebook.presto.metadata.SessionPropertyManager; +import com.facebook.presto.operator.OperatorContext; import com.facebook.presto.operator.OutputFactory; import com.facebook.presto.operator.TaskContext; import com.facebook.presto.operator.TaskStats; @@ -109,16 +111,20 @@ import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.zip.CRC32; +import static com.facebook.presto.ExceededMemoryLimitException.exceededLocalTotalMemoryLimit; import static com.facebook.presto.SystemSessionProperties.getHashPartitionCount; import static com.facebook.presto.SystemSessionProperties.getQueryMaxBroadcastMemory; import static com.facebook.presto.SystemSessionProperties.getQueryMaxMemoryPerNode; import static com.facebook.presto.SystemSessionProperties.getQueryMaxTotalMemoryPerNode; +import static com.facebook.presto.SystemSessionProperties.isSpillEnabled; import static com.facebook.presto.execution.TaskState.FAILED; import static com.facebook.presto.execution.TaskStatus.STARTING_VERSION; import static com.facebook.presto.execution.buffer.BufferState.FINISHED; import static com.facebook.presto.metadata.MetadataUpdates.DEFAULT_METADATA_UPDATES; +import static com.facebook.presto.spark.PrestoSparkSessionProperties.getMemoryRevokingThreshold; import static com.facebook.presto.spark.PrestoSparkSessionProperties.getShuffleOutputTargetAverageRowSize; import static com.facebook.presto.spark.PrestoSparkSessionProperties.getSparkBroadcastJoinMaxMemoryOverride; import static com.facebook.presto.spark.PrestoSparkSessionProperties.getStorageBasedBroadcastJoinWriteBufferSize; @@ -134,7 +140,9 @@ import static com.google.common.base.Throwables.propagateIfPossible; import static com.google.common.collect.Iterables.getFirst; import static io.airlift.units.DataSize.Unit.BYTE; +import static io.airlift.units.DataSize.succinctBytes; import static java.lang.Math.min; +import static java.lang.String.format; import static java.util.Objects.requireNonNull; import static java.util.UUID.randomUUID; @@ -176,6 +184,8 @@ public class PrestoSparkTaskExecutorFactory private final PrestoSparkBroadcastTableCacheManager prestoSparkBroadcastTableCacheManager; private final String storageBasedBroadcastJoinStorage; + private AtomicBoolean memoryRevokePending = new AtomicBoolean(); + @Inject public PrestoSparkTaskExecutorFactory( SessionPropertyManager sessionPropertyManager, @@ -362,6 +372,7 @@ public IPrestoSparkTaskExecutor doCreate( } MemoryPool memoryPool = new MemoryPool(new MemoryPoolId("spark-executor-memory-pool"), maxTotalMemory); + SpillSpaceTracker spillSpaceTracker = new SpillSpaceTracker(maxSpillMemory); QueryContext queryContext = new QueryContext( @@ -387,6 +398,42 @@ public IPrestoSparkTaskExecutor doCreate( allocationTrackingEnabled, false); + final double memoryRevokingThreshold = getMemoryRevokingThreshold(session); + if (isSpillEnabled(session)) { + memoryPool.addListener((pool, queryId, totalMemoryReservationBytes) -> { + if (totalMemoryReservationBytes > queryContext.getPeakNodeTotalMemory()) { + queryContext.setPeakNodeTotalMemory(totalMemoryReservationBytes); + } + if (totalMemoryReservationBytes > maxTotalMemory.toBytes()) { + throw exceededLocalTotalMemoryLimit( + maxTotalMemory, + queryContext.getAdditionalFailureInfo(totalMemoryReservationBytes, 0) + + format("Total reserved memory: %s, Total revocable memory: %s", + succinctBytes(pool.getQueryMemoryReservation(queryId)), + succinctBytes(pool.getQueryRevocableMemoryReservation(queryId)))); + } + if (totalMemoryReservationBytes > pool.getMaxBytes() * memoryRevokingThreshold && memoryRevokePending.compareAndSet(false, true)) { + memoryUpdateExecutor.execute(() -> { + try { + taskContext.accept(new VoidTraversingQueryContextVisitor() + { + @Override + public Void visitOperatorContext(OperatorContext operatorContext, Void nothing) + { + operatorContext.requestMemoryRevoking(); + return null; + } + }, null); + memoryRevokePending.set(false); + } + catch (Exception e) { + log.error(e, "Error requesting memory revoking"); + } + }); + } + }); + } + ImmutableMap.Builder> shuffleInputs = ImmutableMap.builder(); ImmutableMap.Builder>> pageInputs = ImmutableMap.builder(); ImmutableMap.Builder> broadcastInputs = ImmutableMap.builder(); @@ -547,7 +594,7 @@ private List getTaskSources(Iterator configProperties = ImmutableMap.builder(); + configProperties.put("experimental.spill-enabled", "true"); + configProperties.put("experimental.join-spill-enabled", "true"); + configProperties.put("experimental.temp-storage-buffer-size", "1MB"); + configProperties.put("spark.memory-revoking-threshold", "0.0"); + configProperties.put("experimental.spiller-spill-path", Paths.get(System.getProperty("java.io.tmpdir"), "presto", "spills").toString()); + return createHivePrestoSparkQueryRunner(getTables(), configProperties.build()); + } +} diff --git a/presto-tests/src/main/java/com/facebook/presto/tests/AbstractTestQueries.java b/presto-tests/src/main/java/com/facebook/presto/tests/AbstractTestQueries.java index c151f3a2a3d4c..a58b684c4d9ad 100644 --- a/presto-tests/src/main/java/com/facebook/presto/tests/AbstractTestQueries.java +++ b/presto-tests/src/main/java/com/facebook/presto/tests/AbstractTestQueries.java @@ -160,7 +160,7 @@ public void testSelectLargeInterval() assertEquals(result.getMaterializedRows().get(0).getField(0), new SqlIntervalYearMonth(Short.MAX_VALUE, 0)); } - @Test + @Test(enabled = false) public void testEmptyJoins() { Session sessionWithEmptyJoin = Session.builder(getSession())