Reduce memory utilization on the Driver during the commit phase #16120

arhimondr merged 3 commits into prestodb:master
Conversation
This is all in addition to #16036, which should significantly reduce memory utilization on the Driver, as the statistics pages no longer have to be buffered in the TableFinishOperator for the Presto on Spark use case (thanks @viczhang861 for optimizing it!).
viczhang861 left a comment:

Try to make the title "Release inmemory input pages incrementally" better; "inmemory" input is only used for PrestoSparkTaskInputs.
Resolved review threads (outdated):
- presto-hive/src/main/java/com/facebook/presto/hive/CreateEmptyPartitionProcedure.java
- presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java
- presto-hive/src/main/java/com/facebook/presto/hive/HivePageSink.java (two threads)
- presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java
- presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkQueryExecutionFactory.java
Force-pushed from a089538 to 07a07d0.
extra "the" after "compress"
Force-pushed from d3c8757 to 5af3c26.
To decrease memory pressure, pages from the in-memory input can be released as soon as they are read by the Spark source operator.
Force-pushed from 5af3c26 to faa30da.
    try (ByteArrayOutputStream output = new ByteArrayOutputStream();
            ZstdOutputStreamNoFinalizer zstdOutput = new ZstdOutputStreamNoFinalizer(output)) {
        codec.writeBytes(zstdOutput, instance);
        zstdOutput.close();
        output.close();
        return output.toByteArray();
    }
    catch (IOException e) {
        throw new UncheckedIOException(e);
    }
}
Suggested change:

    try (ByteArrayOutputStream output = new ByteArrayOutputStream()) {
        try (ZstdOutputStreamNoFinalizer zstdOutput = new ZstdOutputStreamNoFinalizer(output)) {
            codec.writeBytes(zstdOutput, instance);
        }
        return output.toByteArray();
    }
    catch (IOException e) {
        throw new UncheckedIOException(e);
    }
}
In theory it should be the same. Java guarantees to close all resources in reverse order:
import java.io.Closeable;

public class Main
{
    private static class Closeable1
            implements Closeable
    {
        @Override
        public void close()
        {
            System.out.println("Close Closeable1");
        }
    }

    private static class Closeable2
            implements Closeable
    {
        @Override
        public void close()
        {
            System.out.println("Close Closeable2");
        }
    }

    public static void main(String[] args)
    {
        try (Closeable1 closeable1 = new Closeable1(); Closeable2 closeable2 = new Closeable2()) {
            System.out.println("Body");
        }
    }
}
Prints
Body
Close Closeable2
Close Closeable1
The comment was more around not calling close() explicitly, since the try-with-resources does that.
Oh, sorry. I misunderstood. Let me create a patch.
Actually, I'm not sure if it is correct to call return output.toByteArray() before the ByteArrayOutputStream is closed. The implementation allows it, but I wonder if that's what is expected?
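For what it's worth, the JDK Javadoc states that closing a ByteArrayOutputStream has no effect and that its methods can still be called afterwards, so reading the buffer before (or after) close() is well defined. A minimal standalone check:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;

public class CloseOrderCheck
{
    public static void main(String[] args)
            throws IOException
    {
        ByteArrayOutputStream output = new ByteArrayOutputStream();
        output.write(new byte[] {1, 2, 3});
        // Per the Javadoc, close() is a no-op for ByteArrayOutputStream,
        // so the buffer remains readable after the stream is closed.
        output.close();
        System.out.println(Arrays.toString(output.toByteArray())); // prints [1, 2, 3]
    }
}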
    return scala.reflect.ClassTag$.MODULE$.apply(clazz);
}

public static <T> Iterator<T> getNullifyingIterator(List<T> list)
I guess you can't just call remove() on the iterator because that's a code change in Spark?
In theory, remove() on the iterator of an ArrayList is an O(N) operation (because it has to shift the "tail"). In practice I don't think it is going to be an issue; this is just to be on the safer side, in case there's a query that produces a list with enough pages for that complexity to become a problem.
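A minimal sketch of what such a nullifying iterator could look like (a hypothetical implementation; the actual code in this PR may differ, and the Iterators holder class is just for illustration): it clears each list slot after handing out the element, so consumed pages become eligible for garbage collection in O(1) per element, without the tail shifting that Iterator.remove() would do on an ArrayList.

import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public final class Iterators
{
    private Iterators() {}

    public static <T> Iterator<T> getNullifyingIterator(List<T> list)
    {
        return new Iterator<T>()
        {
            private int position;

            @Override
            public boolean hasNext()
            {
                return position < list.size();
            }

            @Override
            public T next()
            {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                T element = list.get(position);
                // Drop the reference so the consumed page can be garbage
                // collected; unlike ArrayList.remove(), nothing is shifted.
                list.set(position, null);
                position++;
                return element;
            }
        };
    }
}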
In Presto each writer produces a PartitionUpdate object that contains file names and other meta information for the files being written. This information is then collected on the Driver to perform the final commit (do file renames). In some cases this meta information can be quite large. This patch tries to optimize several things:
- PartitionUpdate memory footprint on the Driver, by serializing to SMILE instead of JSON and applying ZSTD compression
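A rough sketch of that serialize-and-compress path, pieced together from the snippets discussed above. The Codec abstraction with writeBytes appears in the review thread; its readBytes counterpart and the CompressedCodecs holder class are assumptions here, while ZstdOutputStreamNoFinalizer and ZstdInputStreamNoFinalizer are zstd-jni classes.

import com.github.luben.zstd.ZstdInputStreamNoFinalizer;
import com.github.luben.zstd.ZstdOutputStreamNoFinalizer;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;

public final class CompressedCodecs
{
    // Minimal stand-in for the codec abstraction used in the snippet above;
    // the real interface in the codebase may differ.
    public interface Codec<T>
    {
        void writeBytes(OutputStream output, T instance) throws IOException;

        T readBytes(InputStream input) throws IOException;
    }

    private CompressedCodecs() {}

    // Serialize (e.g., to SMILE) and ZSTD-compress in a single streaming pass,
    // so no intermediate uncompressed byte array is materialized on the Driver.
    public static <T> byte[] serializeZstdCompressed(Codec<T> codec, T instance)
    {
        try (ByteArrayOutputStream output = new ByteArrayOutputStream()) {
            try (ZstdOutputStreamNoFinalizer zstdOutput = new ZstdOutputStreamNoFinalizer(output)) {
                codec.writeBytes(zstdOutput, instance);
            }
            return output.toByteArray();
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // The assumed inverse: decompress with ZSTD, then deserialize.
    public static <T> T deserializeZstdCompressed(Codec<T> codec, byte[] bytes)
    {
        try (ZstdInputStreamNoFinalizer zstdInput = new ZstdInputStreamNoFinalizer(new ByteArrayInputStream(bytes))) {
            return codec.readBytes(zstdInput);
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}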