
Conversation

@parthchandra
Contributor

Jira

This PR addresses PARQUET-2149: Implement async IO for Parquet file reader.

Tests

This PR adds the following unit tests:
  • AsyncMultiBufferInputStream.*
  • TestMultipleWriteRead.testReadWriteAsync
  • TestColumnChunkPageWriteStore.testAsync

The PR was also tested by changing the default configuration to make all reads async and then ensuring that all unit tests pass.

@parthchandra
Contributor Author

Anyone know why the CI checks are failing with a SocketTimeout exception, and what to do to address this?

@dbtsai
Member

dbtsai commented May 17, 2022

@parthchandra do you have performance benchmark? Thanks

@parthchandra
Contributor Author

I have some numbers from an internal benchmark using Spark. I didn't see any benchmarks in the Parquet codebase that I could reuse.

Here are the numbers from my own benchmark -

  • 10 runs, each run reads all columns from store_sales (the largest table) in the TPC-DS (100G) dataset
    spark.sql("select * from store_sales")
  • Sync reader with default 8MB buffer size, Async reader with 1MB buffer size (achieves better pipelining)
  • Run on Macbook Pro, reading from S3. Spark has 6 cores.
  • All times in seconds
| Run | Async | Sync | Async (w/o outliers) | Sync (w/o outliers) |
|---|---|---|---|---|
| 1 | 84 | 102 | - | - |
| 2 | 90 | 366 | 90 | 366 |
| 3 | 78 | 156 | - | 156 |
| 4 | 84 | 128 | 84 | - |
| 5 | 108 | 402 | - | - |
| 6 | 90 | 432 | 90 | - |
| 7 | 84 | 378 | 84 | 378 |
| 8 | 108 | 324 | - | 324 |
| 9 | 90 | 318 | 90 | 318 |
| 10 | 90 | 282 | 90 | 282 |
| Average | 90.6 | 288.8 | 88 | 304 |
| Median | 90 | 321 | 90 | 321 |
| StdDev | 9.98 | 119. | | |

After removing the two highest and two lowest runs for each case, and taking the median value:

Async: 90 sec
Sync: 321 sec

@shangxinli
Contributor

Great effort! Will have a look after the build succeeds.

@theosib-amazon
Contributor

@parthchandra Would you mind having a look at my I/O performance optimization plan for ParquetMR? I think we should coordinate, since we have some ideas that might overlap what we touch.
https://docs.google.com/document/d/1fBGpF_LgtfaeHnPD5CFEIpA2Ga_lTITmFdFIcO9Af-g/edit?usp=sharing

@dbtsai
Member

dbtsai commented May 18, 2022

cc @rdblue @gszadovszky @ggershinsky

@parthchandra
Contributor Author

> Great effort! Will have a look after the build succeeds.
@shangxinli I have no idea how to get the failed CI to pass. These failures appear to be in unrelated areas caused by some infra issues. Is there a way to trigger a rerun?

@parthchandra
Contributor Author

> @parthchandra Would you mind having a look at my I/O performance optimization plan for ParquetMR? I think we should coordinate, since we have some ideas that might overlap what we touch.
> https://docs.google.com/document/d/1fBGpF_LgtfaeHnPD5CFEIpA2Ga_lTITmFdFIcO9Af-g/edit?usp=sharing

@theosib-amazon I read your document and went through #960. It looks like, for the most part, #960 and this PR complement each other. The overlap I see is in the changes to MultiBufferInputStream, where you have added the readFully and skipFully APIs. The bulk of my changes for async IO are in a class derived from MultiBufferInputStream, and the heart of the changes depends on overriding MultiBufferInputStream.nextBuffer. In MultiBufferInputStream.nextBuffer the assumption is that all the buffers have already been read into. In AsyncMultiBufferInputStream.nextBuffer this assumption is removed, and the call blocks only if the next required buffer has not been read into.
Now, skipFully and readFully are potentially blocking calls because both call nextBuffer repeatedly if necessary. To gain maximum pipelining, you want to make calls to skipFully and readFully such that you never block for too long (or at all) in the call. You get this if you are skipping or reading less than the number of bytes in a single buffer. This is generally the case, as decompression and decoding happen at the page level, and a page is smaller than a single buffer. However, for your optimizations, you should be aware of this behaviour.
From what I see, I don't think there will be a conflict. I'll pull in your PR and give it a deeper look.
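The blocking behaviour described above can be sketched with plain JDK primitives. The class below is an illustrative, self-contained stand-in (not the PR's actual code): read tasks are queued up front, and `nextBuffer()` blocks only if the buffer it needs has not been read into yet, while buffers further ahead may still be in flight.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

// Illustrative sketch of the async prefetch idea behind AsyncMultiBufferInputStream.
class AsyncBufferSketch {
  private final List<Future<ByteBuffer>> readFutures = new ArrayList<>();
  private int readIndex = 0;

  AsyncBufferSketch(ExecutorService pool, byte[] source, int bufSize) {
    // Schedule one "read" task per buffer; a real reader would pull from a
    // SeekableInputStream instead of slicing an in-memory array.
    for (int off = 0; off < source.length; off += bufSize) {
      final int start = off;
      final int len = Math.min(bufSize, source.length - start);
      readFutures.add(pool.submit(() -> ByteBuffer.wrap(source, start, len)));
    }
  }

  // Blocks only until the next buffer's read task completes.
  ByteBuffer nextBuffer() {
    if (readIndex >= readFutures.size()) {
      return null; // no more buffers
    }
    try {
      return readFutures.get(readIndex++).get();
    } catch (Exception e) {
      throw new RuntimeException(e); // reader-thread failures surface here
    }
  }
}
```

A caller that skips or reads less than one buffer's worth at a time rarely reaches an unfinished future, which is the pipelining property described above.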

@theosib-amazon
Contributor

@parthchandra One thing that confuses me a bit is that these buffers have only ByteBuffer inside them. There's no actual I/O, so it's not possible to block. Do you have subclasses that provide some sort of access to real I/O?

/**
* Close the page reader. By default it is no-op.
*/
default void close() throws IOException {}

Should we add Closeable as an implemented interface?

Contributor Author

Sure.

Cool. A lot of times I check whether something is an instance of Closeable and then call close.

Comment on lines +124 to +116
checkState();
// hack: parent constructor can call this method before this class is fully initialized.
// Just return without doing anything.
if (readFutures == null) {
return false;
}

Because checkState has synchronization, would it be safe to move the checkState before this somehow or add some kind of less expensive boolean check that we can set to true immediately after the super() call?

Contributor Author

Glad you made me look. The synchronization was needed because I was saving any exception that might have occurred in the reader thread. However, that exception will be captured by the Future object and thrown as an ExecutionException when Future.get is called in line 135.
So we don't really need to save the exception and the synchronization around it can also be removed.
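For illustration, this property of Future can be shown with plain java.util.concurrent (the helper method below is hypothetical, not code from the PR): an exception thrown inside a submitted task is captured by its Future and re-thrown from get() as an ExecutionException whose cause is the original exception.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Demonstrates that a worker's exception travels through the Future itself,
// so no extra synchronized field is needed to hand it across threads.
class FutureExceptionDemo {
  static String causeMessageOf(Callable<Void> task) throws InterruptedException {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    try {
      pool.submit(task).get();
      return null; // task completed normally
    } catch (ExecutionException e) {
      return e.getCause().getMessage(); // original worker-thread exception
    } finally {
      pool.shutdown();
    }
  }
}
```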

@parthchandra
Contributor Author

> @parthchandra One thing that confuses me a bit is that these buffers have only ByteBuffer inside them. There's no actual I/O, so it's not possible to block. Do you have subclasses that provide some sort of access to real I/O?

Good point. MultiBufferInputStream is constructed using buffers that have been filled already. AsyncMultiBufferInputStream takes an input stream as a parameter in the constructor and performs the IO itself. In ByteBufferInputStream I added

  public static ByteBufferInputStream wrapAsync(ExecutorService threadPool, SeekableInputStream fileInputStream,
    List<ByteBuffer> buffers) {
    return new AsyncMultiBufferInputStream(threadPool, fileInputStream, buffers);
  }

@parthchandra
Contributor Author

@theosib-amazon I applied my PR on top of your PR, ran thru some tests using Spark, and hit no issues. (All unit tests passed as well).


@kazuyukitanimura kazuyukitanimura left a comment

Thank you @parthchandra A few cosmetic comments

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Iterator;

I can't seem to find where Iterator is used... Should we remove it if that is the case?

Contributor Author

Yeah. This and a bunch of other imports are not used. Removing them all.

import java.util.Queue;

import java.util.concurrent.LinkedBlockingDeque;
import org.apache.parquet.ParquetRuntimeException;

Also I am not sure if ParquetRuntimeException is used...

Contributor Author

Removed

Comment on lines 40 to 48
int fetchIndex = 0;
int readIndex = 0;
ExecutorService threadPool;
LinkedBlockingQueue<Future<Void>> readFutures;
boolean closed = false;

LongAdder totalTimeBlocked = new LongAdder();
LongAdder totalCountBlocked = new LongAdder();
LongAccumulator maxTimeBlocked = new LongAccumulator(Long::max, 0L);

Does it make sense to change these to private if they are not accessed from other places?

Contributor Author

Done

import java.util.Map;
import java.util.Optional;
import java.util.PrimitiveIterator;
import java.util.Queue;

I think Queue is no longer used with this change. Should we remove the import?

@steveloughran
Contributor

  1. Whose S3 client was used for testing here? If the S3A one, which Hadoop release?
  2. The Azure ABFS and GCS connectors do async prefetching of the next block, but they simply assume that code will read sequentially; if there is another seek/readFully to a new location, those prefetches are abandoned. There is work in S3A to do prefetching with caching, so as to reduce the penalty of backwards seeks. https://issues.apache.org/jira/browse/HADOOP-18028

Hadoop is adding a vectored IO API intended for libraries like ORC and Parquet, where the application provides an unordered list of ranges and a ByteBuffer supplier, and gets back a list of futures to wait for. The base implementation simply reads using the readFully API. S3A (and later ABFS) will do fully async retrieval itself, using the HTTP connection pool.
https://issues.apache.org/jira/browse/HADOOP-18103

Both vectored IO and S3A prefetching will ship this summer in Hadoop 3.4.0. I don't see this change conflicting with them, though they may obsolete a lot of it.

Have you benchmarked this change with the ABFS or Google GCS connectors to see what difference it makes there?

Contributor

@steveloughran steveloughran left a comment

Minor comments. I would recommend looking at https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/filesystem/fsdatainputstream.html and lifting the tests in hadoop-common to make sure they work with this stream, because, correct or not, the semantics there are what applications often expect.

LOG.debug("ASYNC Stream: READ - {}", timeSpent / 1000.0);
long putStart = System.nanoTime();
long putCompleted = System.nanoTime();
LOG.debug("ASYNC Stream: FS READ (output) BLOCKED - {}",
Contributor

how does this work?

Contributor Author

It doesn't. Sorry, this got left behind after a cleanup.

}

@Override
public int read(ByteBuffer out) {
Contributor

are you 100% confident that all uses of read() in parquet are in a single thread? it is not unknown for apps to read across threads, even though the java APIs say "don't"

Contributor Author

Pretty sure it's always in a single thread. The stream is read from in only one place in ParquetFileReader, and the original code was always single threaded. The async + parallel column reader creates a new instance for every column reader, and no column reader crosses a thread boundary.

Contributor

good to know.

if (iterator.hasNext()) {
current = iterator.next();
} else {
throw new EOFException("End of streams");
Contributor

InputStream.read mandates return -1 on eof

Contributor Author

Yes, it does.
I based this on the fact that in most cases this class encapsulates a MultiBufferInputStream, and that class throws an EOFException instead of returning -1; I was afraid that not throwing the exception would cause something to fail because the calling code would not be checking for -1.
Let me try to change this to -1 and see.
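For reference, the java.io.InputStream contract being discussed: read() signals end of stream by returning -1 (on every subsequent call too) rather than throwing. A minimal JDK illustration, with a hypothetical helper:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

// InputStream.read() returns -1 at end of stream instead of throwing;
// callers are expected to loop until they see -1.
class EofContractDemo {
  // Returns {bytes read, last byte value, result of a read past EOF}.
  static int[] drain(InputStream in) throws IOException {
    int count = 0, last = -1, b;
    while ((b = in.read()) != -1) {
      last = b;
      count++;
    }
    return new int[] { count, last, in.read() }; // past EOF: still -1, no exception
  }
}
```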

@parthchandra
Contributor Author

@steveloughran thank you very much for taking the time to review and provide feedback!

> 1. Whose S3 client was used for testing here? If the S3A one, which Hadoop release?

I was working with s3a -
Spark 3.2.1
Hadoop (Hadoop-aws) 3.3.2
AWS SDK 1.11.655

> 2. The Azure ABFS and GCS connectors do async prefetching of the next block, but they simply assume that code will read sequentially; if there is another seek/readFully to a new location, those prefetches are abandoned. There is work in S3A to do prefetching with caching, so as to reduce the penalty of backwards seeks. https://issues.apache.org/jira/browse/HADOOP-18028

I haven't worked with ABFS or GCS. If the connectors do async prefetching, that would be great: the time the Parquet reader has to block in the file system API would drop substantially. In that case, we could turn the async reader on/off and rerun the benchmark to compare. From past experience with MapR-FS, which had very aggressive read-ahead in its HDFS client, I would still expect better Parquet speeds. Turning the prefetch off when a seek occurs is usual behaviour, but we may see no benefit from the connector in that case. So a combination of async reader and async connector might end up being a great solution (maybe at slightly greater CPU utilization). We would still have to benchmark to see the real effect.
The async version in this PR takes care of the sequential-read requirement by a) opening a new stream for each column and ensuring every column is read sequentially (footers are read using a separate stream; except for the footer, no other stream ever seeks to a new location), and b) predetermining the amount of data to be read, so no read-ahead is ever discarded.

> Hadoop is adding a vectored IO API intended for libraries like ORC and Parquet, where the application provides an unordered list of ranges and a ByteBuffer supplier, and gets back a list of futures to wait for. The base implementation simply reads using the readFully API. S3A (and later ABFS) will do fully async retrieval itself, using the HTTP connection pool. https://issues.apache.org/jira/browse/HADOOP-18103

> Both vectored IO and S3A prefetching will ship this summer in Hadoop 3.4.0. I don't see this change conflicting with them, though they may obsolete a lot of it.

Yes, I became aware of this recently. I'm discussing integration of these efforts in a separate channel. At the moment I see no conflict, but have yet to determine how much of this async work would need to be changed. I suspect we may be able to eliminate or vastly simplify AsyncMultiBufferInputStream.

> Have you benchmarked this change with the ABFS or Google GCS connectors to see what difference it makes there?

No I have not. Would love help from anyone in the community with access to these. I only have access to S3.

@steveloughran
Contributor

> I was working with s3a
> Spark 3.2.1
> Hadoop (hadoop-aws) 3.3.2
> AWS SDK 1.11.655

Thanks, that means you are current with all shipping improvements. The main extra one is to use openFile(), passing in the length and requesting random IO. This guarantees ranged GET requests and cuts the initial HEAD probe for the existence/size of the file.

> have you benchmarked this change with abfs or google gcs connectors to see what difference it makes there?
>
> No I have not. Would love help from anyone in the community with access to these. I only have access to S3.

That I have. FWIW, with the right tuning of ABFS prefetch (4 threads, 128 MB blocks) I can get full FTTH link rate from a remote store: 700 Mbit/s. That's to the base station; once you add WiFi, the bottlenecks move.

*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
Contributor

@theosib-amazon theosib-amazon May 24, 2022

It looks like you're running into the same bug in IntelliJ as I am, where it makes whitespace changes without authorization. Would you mind commenting on this bug report that I filed?
https://youtrack.jetbrains.com/issue/IDEA-293197/IntelliJ-makes-unauthorized-changes-to-whitespace-in-comments-wo

Contributor Author

ack

@theosib-amazon
Contributor

Is byte (and arrays and buffers of bytes) the only datatype you support? My PR is optimizing code paths that pull ints, longs, and other sizes out of the data buffers. Are those not necessary for any of the situations where you're using an async buffer?

@parthchandra
Contributor Author

> Thanks, that means you are current with all shipping improvements. The main extra one is to use openFile(), passing in the length and requesting random IO. This guarantees ranged GET requests and cuts the initial HEAD probe for the existence/size of the file.

By openFile() do you mean FileSystem.openFileWithOptions(Path, OpenFileParameters)?
While looking, I realized that Parquet builds with a much older version of Hadoop.

> have you benchmarked this change with abfs or google gcs connectors to see what difference it makes there?
>
> No I have not. Would love help from anyone in the community with access to these. I only have access to S3.

> That I have. FWIW, with the right tuning of ABFS prefetch (4 threads, 128 MB blocks) I can get full FTTH link rate from a remote store: 700 Mbit/s. That's to the base station; once you add WiFi, the bottlenecks move.

Wow! That is nearly as fast as a local HDD. At that point the bottlenecks in Parquet begin to move towards decompression and decoding, but IO remains the slowest link in the chain. One thing we get with my PR is that ParquetFileReader had built-in assumptions that all data must be read before downstream processing can proceed. Some of my changes remove these assumptions and ensure that downstream processing does not block until an entire column is read, so we get efficient pipelining.
What does the 128 MB block mean? Is this the amount prefetched for a stream? The read API does not block until the entire block is filled, I presume.
With my PR, parquet IO is reading 8MB at a time (default) and downstream is processing 1MB at a time (default) and several such streams (one per column) are in progress at the same time. Hopefully, this read pattern would work with the prefetch.

@parthchandra
Contributor Author

> Is byte (and arrays and buffers of bytes) the only datatype you support? My PR is optimizing code paths that pull ints, longs, and other sizes out of the data buffers. Are those not necessary for any of the situations where you're using an async buffer?
The input stream API is generally unaware of the datatypes of its contents, so those are the only APIs I use. The other reason is that ParquetFileReader returns Pages, which basically contain metadata and ByteBuffers of compressed data. The decompression and decoding into types come much later, in a downstream thread.
For your PR, I don't think AsyncMultiBufferInputStream is ever going to be in play in the paths you're optimizing. But just in case it is, your type-aware methods will work as is, because AsyncMultiBufferInputStream is derived from MultiBufferInputStream and will inherit those methods.

@steveloughran
Contributor

> At this point the bottlenecks in parquet begin to move towards decompression and decoding but IO remains the slowest link in the chain.

Latency is the killer; in an HTTP request you want to read enough but not discard data or break an HTTP connection if the client suddenly does a seek() or readFully() somewhere else. File listings, existence checks, etc.

> One thing we get with my PR is that the ParquetFileReader had assumptions built in that all data must be read before downstream can proceed. Some of my changes are related to removing these assumptions and ensuring that downstream processing does not block until an entire column is read so we get efficient pipelining.

That'd be great. Now, if you could also handle requesting different columns in parallel and processing them out of order...

> What does the 128 MB block mean? Is this the amount prefetched for a stream? The read API does not block until the entire block is filled, I presume.

This was the ABFS client set to do four GET requests of 128 MB each. This would be awful for column stores, where smaller ranges are often requested/processed before another seek is made, but quite often Parquet does more back-to-back reads than just one read/readFully request.

> With my PR, parquet IO is reading 8MB at a time (default) and downstream is processing 1MB at a time (default) and several such streams (one per column) are in progress at the same time. Hopefully, this read pattern would work with the prefetch.

be good to think about vectored IO.

and yes, updating parquet dependencies would be good, hadoop 3.3.0 should be the baseline.

just sketched out my thoughts on this. I've played with some of this in my own branch. I think the next step would be for me to look at the benchmark code to make it targetable elsewhere.

https://docs.google.com/document/d/1y9oOSYbI6fFt547zcQJ0BD8VgvJWdyHBveaiCHzk79k/

@theosib-amazon
Contributor

This is interesting, because when I did profiling of Trino, I found that although I/O (from S3, over the network no less) was significant, even more time was spent in compute. Maybe you're getting improved performance because you're increasing parallelism between I/O and compute.

@parthchandra
Contributor Author

> Latency is the killer; in an HTTP request you want read enough but not discard data or break an http connection if the client suddenly does a seek() or readFully() somewhere else. file listings, existence checks etc.
>
> That'd be great. now, if you could also handle requesting different columns in parallel and processing them out of order.

I do. The Parquet file reader API that reads row groups in sync mode reads all columns in sequence. In async mode, it fires off a task for every column, blocking only to read the first page of each column before returning. This part also uses a different thread pool from the IO tasks, so that IO tasks never wait for lack of available threads in the thread pool.
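The two-pool arrangement can be sketched with plain JDK executors (class and method names here are illustrative, not the PR's): IO work and processing work go to separate pools, so a burst of processing tasks can never occupy the threads that IO submissions need.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Sketch of separate IO and processing pools: each kind of task has its own
// executor, so processing work cannot starve IO submissions of threads.
class TwoPoolSketch {
  private final ExecutorService ioPool = Executors.newFixedThreadPool(2);
  private final ExecutorService processPool = Executors.newFixedThreadPool(2);

  // "Read" a column's bytes on the IO pool, then "decode" them on the
  // processing pool; the returned future completes when decoding is done.
  Future<Integer> readAndProcess(int columnWidth) {
    Future<byte[]> read = ioPool.submit(() -> new byte[columnWidth]); // stand-in for file IO
    return processPool.submit(() -> read.get().length);              // stand-in for decoding
  }

  void shutdown() {
    ioPool.shutdown();
    processPool.shutdown();
  }
}
```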

> be good to think about vectored IO.

I think I know how to integrate this PR with the vectored IO, but this is only after a cursory look.

> and yes, updating parquet dependencies would be good, hadoop 3.3.0 should be the baseline.

Who can drive this (presumably) non-trivial change? I myself have no karma points :(

> just sketched out my thoughts on this. I've played with some of this in my own branch. I think the next step would be for me to look at the benchmark code to make it targetable elsewhere.
>
> https://docs.google.com/document/d/1y9oOSYbI6fFt547zcQJ0BD8VgvJWdyHBveaiCHzk79k/

This is great. I now have much more context of where you are coming from (and going to) !

@theosib-amazon
Contributor

> Is byte (and arrays and buffers of bytes) the only datatype you support? My PR is optimizing code paths that pull ints, longs, and other sizes out of the data buffers. Are those not necessary for any of the situations where you're using an async buffer?
>
> The input stream API is generally unaware of the datatypes of its contents and so those are the only APIs I use. The other reason is that the ParquetFileReader returns Pages which basically contain metadata and ByteBuffers of compressed data. The decompression and decoding into types comes much later in a downstream thread.
> For your PR, I don't think AsyncMultiBufferInputStream is ever going to be in play in the paths you're optimizing. But just in case it is, your type-aware methods will work as is because AsyncMultiBufferInputStream is derived from MultiBufferInputStream and will inherit those methods.

I'm still learning Parquet's structure. So it sounds to me like these buffer input streams are used twice: once to get data and decompress it, and then once again to decode it into data structures. Is that correct? So it sounds like you're optimizing one layer of processing, and I'm optimizing the next layer up, and it's kind of a coincidence that we're touching some of the same classes, just because code reuse has been possible here.

@parthchandra
Contributor Author

BTW, adding more tests for the InputStream implementations.

@parthchandra
Contributor Author

parthchandra commented May 24, 2022

> So it sounds like you're optimizing one layer of processing, and I'm optimizing the next layer up, and it's kind of a coincidence that we're touching some of the same classes just because code reuse has been possible here.

Yeah, kind of cool :)

@parthchandra
Contributor Author

> This is interesting, because when I did profiling of Trino, I found that although I/O (from S3, over the network no less) was significant, even more time was spent in compute. Maybe you're getting improved performance because you're increasing parallelism between I/O and compute.

It may be because I was using Spark's vectorized Parquet decoding, which is an order of magnitude faster than the Parquet library's row-by-row decoding (see the Spark benchmarks). If Trino is not doing vectorized decoding (I took a very quick look and I don't think it is), I would suggest you look into that next. All the cool kids are doing it.

@sunchao
Member

sunchao commented May 25, 2022

> and yes, updating parquet dependencies would be good, hadoop 3.3.0 should be the baseline.

+1 on upgrading to 3.3.0, although currently parquet is using 2.10.1 as a provided dependency and we need to make sure it continues to work with hadoop 2.x

> It may be because I was using Spark's vectorized parquet decoding, which is an order of magnitude faster than the parquet library's row-by-row decoding (see Spark benchmarks). If Trino is not doing vectorized decoding (I took a very quick look and I don't think it is), I would suggest you can look into that next. All the cool kids are doing it.

Presto already has a batch reader but seems the feature is not in Trino yet. The batch reader did help a lot to reduce the CPU load. See the slides.

@parthchandra
Contributor Author

@wgtmac Thank you!

readResult.get(1, TimeUnit.MILLISECONDS);
}
} catch (Exception e) {
// Do nothing
Contributor

@shangxinli shangxinli Dec 3, 2022

Do you think adding a log makes sense?

Contributor Author

It's not really useful to log anything here because there is nothing to be done. Some of the calls in the try block throw exceptions and we catch them to prevent them from propagating. We are shutting down and cleaning up the futures so it doesn't matter if there was an exception or not.

@Override
public boolean nextBuffer() {
checkState();
// hack: parent constructor can call this method before this class is fully initialized.
Contributor

I see the comment with 'hack'. What is the proper implementation?

Contributor Author

The fix would be to change the behavior of the parent class. Basically, the parent class implicitly assumes that there will be no derived classes and calls a private method, nextBuffer, in the constructor. I extended the class and made nextBuffer protected, so it is no longer correct to call it in the constructor.
Changing the parent class behavior broke too many unit tests. I could have fixed the unit tests as well, but wasn't sure whether downstream projects might be depending on the behavior.
In the end, it seemed that handling this in the derived class was the safer approach.
It's really a workaround to make things easier to implement. Perhaps I should not have called it a hack.

@shangxinli
Contributor

@wgtmac
Member

wgtmac commented Feb 1, 2023

> @kazuyukitanimura @steveloughran @kbendick @ggershinsky @wgtmac @theosib-amazon Do you still have comments?

It looks good to me. Please feel free to merge as you see fit. @shangxinli

@whcdjj

whcdjj commented Feb 24, 2023

Hi, I am very interested in this optimization and just have some questions from testing it in a cluster with 4 nodes/96 cores using Spark 3.1. Unfortunately, I see little improvement.
I am confused about whether it is necessary to keep spark.sql.parquet.enableVectorizedReader = false in Spark when testing with Spark 3.1, and about how I can set the Parquet buffer size. Sincerely asking for advice @parthchandra

@parthchandra
Contributor Author

> Hi, I am very interested in this optimization and just have some questions when testing in a cluster with 4 nodes/96 cores using Spark 3.1. Unfortunately, I see little improvement.

You're likely to see improvement in cases where file i/o is the bottleneck. Most TPC-DS queries are join heavy and you will see little improvement there. You might do better with TPC-H.

> I am confused about whether it is necessary to keep spark.sql.parquet.enableVectorizedReader = false in Spark when testing with Spark 3.1, and how I can set the Parquet buffer size.

It's probably best to keep the parquet (read) buffer size untouched.

You should keep spark.sql.parquet.enableVectorizedReader = true irrespective of this. This feature improves I/O speed of reading raw data. The Spark vectorized reader kicks in after data is read from storage and converts the raw data into Spark's internal columnar representation and is faster than the row based version.

@whcdjj

whcdjj commented Feb 25, 2023

My test is spark.sql("select * from store_sales order by ss_customer_sk limit 10"); store_sales is a 1 TB TPC-DS table.
The Parquet IO and processing thread pools are hardcoded like this in ParquetReadOptions.java:

  public static class Builder {
    protected ExecutorService ioThreadPool = Executors.newFixedThreadPool(4);
    protected ExecutorService processThreadPool = Executors.newFixedThreadPool(4);
  }

I also ran the following test with the local filesystem, using the 100 GB TPC-DS store_sales table, and I see a degradation with the async IO feature.

  test("parquet reader") {
    val sc = SparkSession.builder().master("local[4]").getOrCreate()
    val df = sc.read.parquet("file:///xxx/store_sales")
    df.createOrReplaceTempView("table")
    val start = System.currentTimeMillis()
    sc.sql("select * from table order by ss_customer_sk limit 10").show()
    val end = System.currentTimeMillis()
    System.out.println("time: " + (end - start))
  }

without this feature -> time: 7240 ms
with this feature -> time: 19923 ms

Threads are as expected.
[screenshot: thread dump showing the parquet-io and parquet-process threads]

Where did I go wrong, and can you show me the correct way to use this feature?
@parthchandra

@parthchandra
Copy link
Contributor Author

Looks correct to me. A couple of questions: are you running this on a cluster or on a local system? Also, is the data on SSDs? If you are on a single machine, there might not be enough CPU and the async threads may be contending for time with the processing threads. We'll need some profiling info to get a better diagnosis. Also, with SSDs, reads from the file system are so fast that the async feature might show very little improvement.
You could turn on the debug logging level for FilePageReader and AsyncMultiBufferInputStream.
We can continue this in a thread outside this PR.

@whcdjj
Copy link

whcdjj commented Feb 27, 2023

Looks correct to me. A couple of questions: are you running this on a cluster or on a local system? Also, is the data on SSDs? If you are on a single machine, there might not be enough CPU and the async threads may be contending for time with the processing threads. We'll need some profiling info to get a better diagnosis. Also, with SSDs, reads from the file system are so fast that the async feature might show very little improvement. You could turn on the debug logging level for FilePageReader and AsyncMultiBufferInputStream. We can continue this in a thread outside this PR.

OK, I will test with the debug logging for more details in the coming days.

@parthchandra
Copy link
Contributor Author

Also try a query like

  select
        SUM(length(IFNULL(ss_sold_date_sk, ' '))),
        SUM(length(IFNULL(ss_sold_time_sk, ' '))),
        SUM(length(IFNULL(ss_item_sk, ' '))),
        SUM(length(IFNULL(ss_customer_sk, ' '))),
        SUM(length(IFNULL(ss_cdemo_sk, ' '))),
        SUM(length(IFNULL(ss_hdemo_sk, ' '))),
        SUM(length(IFNULL(ss_addr_sk, ' '))),
        SUM(length(IFNULL(ss_store_sk, ' '))),
        SUM(length(IFNULL(ss_promo_sk, ' '))),
        SUM(length(IFNULL(ss_ticket_number, ' '))),
        SUM(ss_quantity),
        SUM(ss_wholesale_Cost),
        SUM(ss_list_price ),
        SUM(ss_sales_price),
        SUM(ss_ext_discount_amt),
        SUM(ss_ext_sales_price),
        SUM(ss_ext_wholesale_cost),
        SUM(ss_ext_list_price),
        SUM(ss_ext_tax),
        SUM(ss_coupon_amt),
        SUM(ss_net_paid),
        SUM(ss_net_paid_inc_tax),
        SUM(ss_net_profit)
from store_sales

which avoids the expensive sort

@hazelnutsgz
Copy link

hazelnutsgz commented Mar 23, 2023

Sorry to interrupt, just wondering if this PR can work with S3AsyncClient by any chance? @parthchandra

Thanks!

@parthchandra
Copy link
Contributor Author

Sorry to interrupt, just wondering if this PR can work with S3AsyncClient by any chance? @parthchandra

Thanks!

This PR uses the HDFS FileSystem interface to S3 (s3a) when the URL scheme is s3a. I don't think the s3a implementation uses S3AsyncClient at all.

@wgtmac
Copy link
Member

wgtmac commented Mar 24, 2023

@parthchandra Do you have time to resolve the conflicts? I think it would be nice to be included in the next release.

@parthchandra
Copy link
Contributor Author

@parthchandra Do you have time to resolve the conflicts? I think it would be nice to be included in the next release.

Done

@steveloughran
Copy link
Contributor

@hazelnutsgz hadoop 3.3.5 supports vector IO on an s3 stream: async parallel fetch of blocks, which also works on the local fs (gcs and abfs are TODO items). We see significant performance increases there. There's a PR for it, though as it is 3.3.5+ only, it won't be merged into the ASF parquet branches unless they move to 3.3.5, or we finish a shim library to offer (serialized) support for the API on older releases.

@parthchandra
Copy link
Contributor Author

FWIW the hadoop 3.3.5 vector io changes might make this PR redundant.

@steveloughran
Copy link
Contributor

FWIW the hadoop 3.3.5 vector io changes might make this PR redundant.
On those stores which do it well (s3a, native filesystem), perhaps; until gcs and abfs add vector IO they'll still benefit from this, as will others.

FWIW I'll be at Berlin Buzzwords in June and one of my two talks will be on this... if anyone from the parquet dev team is around, it'd be great to have you in the session.

@Fokko
Copy link
Contributor

Fokko commented Jun 6, 2023

@steveloughran 🤚🏻 Looking forward to your talk!

Copy link
Contributor

@steveloughran steveloughran left a comment


finally sat down to scan through this; I think I'd need to look some more at this to understand it and compare with vector IO.

);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
Copy link
Contributor


It would be nice if an UncheckedIOException were always raised here and elsewhere.
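One way to follow this suggestion, sketched with plain JDK types (the helper name and exact wrapping are assumptions, not the PR's code): preserve the interrupt flag, wrap the interruption in an InterruptedIOException, and rethrow as UncheckedIOException so callers see a single unchecked type:

```java
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.UncheckedIOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class Waits {
    // Wait for an async read, converting both failure modes to UncheckedIOException.
    static <T> T awaitIO(Future<T> f) {
        try {
            return f.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt status visible
            InterruptedIOException ioe = new InterruptedIOException("interrupted waiting for IO");
            ioe.initCause(e); // preserve the caught exception as the cause
            throw new UncheckedIOException(ioe);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            IOException io = (cause instanceof IOException) ? (IOException) cause : new IOException(cause);
            throw new UncheckedIOException(io);
        }
    }
}
```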

ParquetFileReader.setAsyncIOThreadPool(parquetIOThreadPool, false);
ParquetFileReader.setAsyncProcessThreadPool(parquetProcessThreadPool, false );
try {
conf.set("parquet.read.async.io.enabled", Boolean.toString(true));
Copy link
Contributor


or use setBoolean()

public static String PARQUET_READ_PARALLELISM = "parquet.metadata.read.parallelism";

private final ParquetMetadataConverter converter;
// Thread pool to read column chunk data from disk. Applications should call setAsyncIOThreadPool
Copy link
Contributor


Could we have javadocs here so IDEs can show popups with their details?

private final ParquetMetadataConverter converter;
// Thread pool to read column chunk data from disk. Applications should call setAsyncIOThreadPool
// to initialize this with their own implementations.
public static ExecutorService ioThreadPool;
Copy link
Contributor


should these be public?

boolean useColumnIndexFilter,
boolean usePageChecksumVerification,
boolean useBloomFilter,
boolean enableAsyncIOReader,
Copy link
Contributor


Life would be a lot simpler if this code moved to a parameter object containing all the args: adding new options would be straightforward. We did this for the S3ClientFactory to stop external implementations having incompatible signatures when new options went in.
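A hedged sketch of what such a parameter object could look like (the class and method names are illustrative, not the actual parquet-mr API): the boolean flags move into an immutable object with a builder, so adding a new option no longer changes method signatures:

```java
// Illustrative parameter object replacing a long list of boolean arguments.
public class ReadFeatures {
    private final boolean columnIndexFilter;
    private final boolean pageChecksumVerification;
    private final boolean bloomFilter;
    private final boolean asyncIOReader;

    private ReadFeatures(Builder b) {
        this.columnIndexFilter = b.columnIndexFilter;
        this.pageChecksumVerification = b.pageChecksumVerification;
        this.bloomFilter = b.bloomFilter;
        this.asyncIOReader = b.asyncIOReader;
    }

    public boolean useColumnIndexFilter() { return columnIndexFilter; }
    public boolean usePageChecksumVerification() { return pageChecksumVerification; }
    public boolean useBloomFilter() { return bloomFilter; }
    public boolean useAsyncIOReader() { return asyncIOReader; }

    public static class Builder {
        private boolean columnIndexFilter;
        private boolean pageChecksumVerification;
        private boolean bloomFilter;
        private boolean asyncIOReader;

        public Builder withColumnIndexFilter(boolean v) { columnIndexFilter = v; return this; }
        public Builder withPageChecksumVerification(boolean v) { pageChecksumVerification = v; return this; }
        public Builder withBloomFilter(boolean v) { bloomFilter = v; return this; }
        public Builder withAsyncIOReader(boolean v) { asyncIOReader = v; return this; }
        public ReadFeatures build() { return new ReadFeatures(this); }
    }
}
```

New flags then become new builder methods, and existing callers keep compiling unchanged.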

try {
compressedPage = compressedPages.take().orElse(null);
} catch (InterruptedException e) {
throw new RuntimeException("Error reading parquet page data.") ;
Copy link
Contributor


nit: include the caught exception as the cause

break;
case DATA_PAGE:
DataPageHeader dataHeaderV1 = pageHeader.getData_page_header();
pageBytes = chunk.readAsBytesInput(compressedPageSize);
Copy link
Contributor


I've had to do the pool pair in other code to avoid this. What we also do (and I forget where...) is collect statistics on the time spent waiting for an executor. This helps identify when pool size is limiting performance.
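That statistics idea can be sketched as a thin wrapper that timestamps a task at submission and records how long it sat in the queue before a pool thread picked it up (class and method names here are illustrative, not from any of the codebases mentioned):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

public class QueueWaitExecutor {
    private final ExecutorService delegate;
    private final AtomicLong totalWaitNanos = new AtomicLong();
    private final AtomicLong tasks = new AtomicLong();

    public QueueWaitExecutor(ExecutorService delegate) { this.delegate = delegate; }

    public Future<?> submit(Runnable task) {
        final long enqueued = System.nanoTime();
        return delegate.submit(() -> {
            // Time between submission and start of execution: queue wait.
            totalWaitNanos.addAndGet(System.nanoTime() - enqueued);
            tasks.incrementAndGet();
            task.run();
        });
    }

    // Average wait per task; a persistently rising value suggests the pool is undersized.
    public long meanWaitNanos() {
        long n = tasks.get();
        return n == 0 ? 0 : totalWaitNanos.get() / n;
    }

    public void shutdown() { delegate.shutdown(); }
}
```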

@parthchandra
Copy link
Contributor Author

Thanks for the comments @steveloughran. Not sure if we are still thinking of merging this. I'm not sure how well this will play with vector IO.
I'll spend some time on this in the next few days.

@steveloughran
Copy link
Contributor

Thanks. I'm really looking at how to maximise parquet cloud read performance right now. If you have traces, flame graphs etc. that'd be good, especially for queries with different types and selectivity.

@parthchandra
Copy link
Contributor Author

I'll be able to spend some time on this this week (I bet it will not rebase on top of latest without a ton of conflicts).
Re: flame graphs etc., I don't have any with async IO, but from past efforts I see too much noise from the execution engine. Mostly what you see in the flame graph is that time is spent waiting for and copying data into read buffers. The benefits from parallelism and from threads not waiting for input are not so obvious. I'll give it another shot and share.

@steveloughran
Copy link
Contributor

Mostly what you see in the flame graph is that time is spent waiting for and copying data into read buffers.

on heap or off-heap buffers?

Recent profiling of Hive Tez has shown that because it closes all fs instances in the worker process after sub-query execution, the time to instantiate an S3A filesystem and negotiate TLS connections becomes significant.
apache/hadoop#6892 aims to cut startup/shutdown time in half provided nothing calls rename(), but really they should either stop deleting the instances, or cache them until the next sub-query comes in and, if it's from the same user, recycle them, else delete them asynchronously.

@parthchandra
Copy link
Contributor Author

Mostly what you see in the flame graph is that time is spent waiting for and copying data into read buffers.

on heap or off-heap buffers?

On heap.

@parthchandra
Copy link
Contributor Author

@steveloughran I rebased this, so now we can read using either vector IO or the async reader (but not both). I haven't been able to find time to compare them.

@steveloughran
Copy link
Contributor

I think it'd be great to have a benchmark which could target remote stores; the current one targets the local filesystem, where latencies are lower and the differences less tangible (though vector IO is fast with SSDs, as you can do parallel reads directly into buffers).
