50 changes: 17 additions & 33 deletions common/src/main/java/org/apache/comet/parquet/ReadOptions.java
@@ -26,37 +26,15 @@
import org.apache.spark.SparkEnv;
import org.apache.spark.launcher.SparkLauncher;

import org.apache.comet.CometConf;

/**
* Comet specific Parquet related read options.
*
* <p>TODO: merge this with {@link org.apache.parquet.HadoopReadOptions} once PARQUET-2203 is done.
*/
public class ReadOptions {
private static final Logger LOG = LoggerFactory.getLogger(ReadOptions.class);
public static final String COMET_PARQUET_PARALLEL_IO_ENABLED =
"comet.parquet.read.parallel.io.enabled";
public static final boolean COMET_PARQUET_PARALLEL_IO_ENABLED_DEFAULT = true;

public static final String COMET_PARQUET_PARALLEL_IO_THREADS =
"comet.parquet.read.parallel.io.thread-pool.size";
public static final int COMET_PARQUET_PARALLEL_IO_THREADS_DEFAULT = 32;

public static final String COMET_IO_MERGE_RANGES = "comet.parquet.read.io.mergeRanges";
private static final boolean COMET_IO_MERGE_RANGES_DEFAULT = true;

public static final String COMET_IO_MERGE_RANGES_DELTA =
"comet.parquet.read.io.mergeRanges.delta";
private static final int COMET_IO_MERGE_RANGES_DELTA_DEFAULT = 1 << 23; // 8 MB

// In the parallel reader, if the read ranges submitted are skewed in sizes, this
// option will cause the reader to break up larger read ranges into smaller ranges
// to reduce the skew. This will result in a slightly larger number of connections
// opened to the file system but may give improved performance.
// The option is off by default.
public static final String COMET_IO_ADJUST_READRANGE_SKEW =
"comet.parquet.read.io.adjust.readRange.skew";

private static final boolean COMET_IO_ADJUST_READRANGE_SKEW_DEFAULT = false;

// Max number of concurrent tasks we expect. Used to autoconfigure S3 client connections
public static final int S3A_MAX_EXPECTED_PARALLELISM = 32;
@@ -112,10 +90,6 @@ public static Builder builder(Configuration conf) {
return new Builder(conf);
}

public static Builder builder() {
return builder(new Configuration());
}

public static class Builder {
private final Configuration conf;

@@ -173,14 +147,24 @@ public Builder(Configuration conf) {
this.conf = conf;
this.parallelIOEnabled =
conf.getBoolean(
Member: This is reading from the Hadoop conf. If I set the new configs on my Spark context, how would they get propagated to the Hadoop conf?

Contributor Author: Spark SQL copies configs that are not from Spark into the Hadoop config when the SQL context is created. There are other settings that also use this (e.g. COMET_USE_LAZY_MATERIALIZATION, COMET_SCAN_PREFETCH_ENABLED).
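A minimal sketch of that propagation path, mirroring the assertion in the test added later in this PR. The local session and the example value are illustrative assumptions, and the snippet is placed in the org.apache.spark.sql package only because sessionState is package-private there:

```scala
package org.apache.spark.sql

// Sketch only: shows a Comet config set on the Spark side becoming visible in
// the Hadoop Configuration that CometScanExec hands to the Parquet reader.
object ConfigPropagationSketch extends App {
  val spark = SparkSession.builder().master("local[1]").getOrCreate()

  // Set the new Comet option on the Spark (SQL) side only.
  spark.conf.set("spark.comet.parquet.read.io.mergeRanges.delta", "1024")

  // newHadoopConfWithOptions copies session configs into the Hadoop
  // Configuration, where ReadOptions later reads them back via
  // conf.getInt(CometConf.COMET_IO_MERGE_RANGES_DELTA().key(), ...).
  val hadoopConf = spark.sessionState.newHadoopConfWithOptions(Map.empty)
  assert(hadoopConf.get("spark.comet.parquet.read.io.mergeRanges.delta") == "1024")

  spark.stop()
}
```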

COMET_PARQUET_PARALLEL_IO_ENABLED, COMET_PARQUET_PARALLEL_IO_ENABLED_DEFAULT);
CometConf.COMET_PARQUET_PARALLEL_IO_ENABLED().key(),
(Boolean) CometConf.COMET_PARQUET_PARALLEL_IO_ENABLED().defaultValue().get());
this.parallelIOThreadPoolSize =
conf.getInt(COMET_PARQUET_PARALLEL_IO_THREADS, COMET_PARQUET_PARALLEL_IO_THREADS_DEFAULT);
this.ioMergeRanges = conf.getBoolean(COMET_IO_MERGE_RANGES, COMET_IO_MERGE_RANGES_DEFAULT);
conf.getInt(
CometConf.COMET_PARQUET_PARALLEL_IO_THREADS().key(),
(Integer) CometConf.COMET_PARQUET_PARALLEL_IO_THREADS().defaultValue().get());
this.ioMergeRanges =
conf.getBoolean(
CometConf.COMET_IO_MERGE_RANGES().key(),
(boolean) CometConf.COMET_IO_MERGE_RANGES().defaultValue().get());
this.ioMergeRangesDelta =
conf.getInt(COMET_IO_MERGE_RANGES_DELTA, COMET_IO_MERGE_RANGES_DELTA_DEFAULT);
conf.getInt(
CometConf.COMET_IO_MERGE_RANGES_DELTA().key(),
(Integer) CometConf.COMET_IO_MERGE_RANGES_DELTA().defaultValue().get());
this.adjustReadRangeSkew =
conf.getBoolean(COMET_IO_ADJUST_READRANGE_SKEW, COMET_IO_ADJUST_READRANGE_SKEW_DEFAULT);
conf.getBoolean(
CometConf.COMET_IO_ADJUST_READRANGE_SKEW().key(),
(Boolean) CometConf.COMET_IO_ADJUST_READRANGE_SKEW().defaultValue().get());
// override some S3 defaults
setS3Config();
}
42 changes: 42 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -74,6 +74,48 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(true)

val COMET_PARQUET_PARALLEL_IO_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.parquet.read.parallel.io.enabled")
.doc(
"Whether to enable Comet's parallel reader for Parquet files. The parallel reader reads " +
"ranges of consecutive data in a file in parallel. It is faster for large files and " +
"row groups but uses more resources. The parallel reader is enabled by default.")
.booleanConf
.createWithDefault(true)

val COMET_PARQUET_PARALLEL_IO_THREADS: ConfigEntry[Int] =
conf("spark.comet.parquet.read.parallel.io.thread-pool.size")
.doc("The maximum number of parallel threads the parallel reader will use in a single " +
"executor. For executors configured with a smaller number of cores, use a smaller number.")
.intConf
.createWithDefault(16)

val COMET_IO_MERGE_RANGES: ConfigEntry[Boolean] =
conf("spark.comet.parquet.read.io.mergeRanges")
.doc(
"When enabled the parallel reader will try to merge ranges of data that are separated " +
"by less than 'comet.parquet.read.io.mergeRanges.delta' bytes. Longer continuous reads " +
"are faster on cloud storage. The default behavior is to merge consecutive ranges.")
.booleanConf
.createWithDefault(true)

val COMET_IO_MERGE_RANGES_DELTA: ConfigEntry[Int] =
conf("spark.comet.parquet.read.io.mergeRanges.delta")
.doc(
"The delta in bytes between consecutive read ranges below which the parallel reader " +
"will try to merge the ranges. The default is 8MB.")
.intConf
.createWithDefault(1 << 23) // 8 MB

val COMET_IO_ADJUST_READRANGE_SKEW: ConfigEntry[Boolean] =
conf("spark.comet.parquet.read.io.adjust.readRange.skew")
.doc("In the parallel reader, if the read ranges submitted are skewed in sizes, this " +
"option will cause the reader to break up larger read ranges into smaller ranges to " +
"reduce the skew. This will result in a slightly larger number of connections opened to " +
"the file system but may give improved performance. The option is off by default.")
.booleanConf
.createWithDefault(false)

val COMET_CONVERT_FROM_PARQUET_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.convert.parquet.enabled")
.doc(
@@ -67,6 +67,8 @@
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Types;

import org.apache.comet.CometConf;

import static org.apache.parquet.column.Encoding.*;
import static org.apache.parquet.format.converter.ParquetMetadataConverter.MAX_STATS_SIZE;
import static org.junit.Assert.*;
@@ -615,12 +617,12 @@ public void testColumnIndexReadWrite() throws Exception {
@Test
public void testWriteReadMergeScanRange() throws Throwable {
Configuration conf = new Configuration();
conf.set(ReadOptions.COMET_IO_MERGE_RANGES, Boolean.toString(true));
conf.set(CometConf.COMET_IO_MERGE_RANGES().key(), Boolean.toString(true));
Member: This test is still setting the config values in a Hadoop Configuration, not in the Spark config. Would it make sense to update the test?

Contributor Author: There is no Spark context in this test. I've added a new test with the configuration set through the Spark config.

// Set the merge range delta so small that ranges do not get merged
conf.set(ReadOptions.COMET_IO_MERGE_RANGES_DELTA, Integer.toString(1024));
conf.set(CometConf.COMET_IO_MERGE_RANGES_DELTA().key(), Integer.toString(1024));
testReadWrite(conf, 2, 1024);
// Set the merge range delta so large that all ranges get merged
conf.set(ReadOptions.COMET_IO_MERGE_RANGES_DELTA, Integer.toString(1024 * 1024));
conf.set(CometConf.COMET_IO_MERGE_RANGES_DELTA().key(), Integer.toString(1024 * 1024));
testReadWrite(conf, 2, 1024);
}

5 changes: 5 additions & 0 deletions docs/source/user-guide/configs.md
@@ -66,6 +66,11 @@ Comet provides the following configuration settings.
| spark.comet.memory.overhead.min | Minimum amount of additional memory to be allocated per executor process for Comet, in MiB. | 402653184b |
| spark.comet.nativeLoadRequired | Whether to require Comet native library to load successfully when Comet is enabled. If not, Comet will silently fallback to Spark when it fails to load the native lib. Otherwise, an error will be thrown and the Spark job will be aborted. | false |
| spark.comet.parquet.enable.directBuffer | Whether to use Java direct byte buffer when reading Parquet. By default, this is false | false |
| spark.comet.parquet.read.io.adjust.readRange.skew | In the parallel reader, if the read ranges submitted are skewed in sizes, this option will cause the reader to break up larger read ranges into smaller ranges to reduce the skew. This will result in a slightly larger number of connections opened to the file system but may give improved performance. The option is off by default. | false |
| spark.comet.parquet.read.io.mergeRanges | When enabled the parallel reader will try to merge ranges of data that are separated by less than 'comet.parquet.read.io.mergeRanges.delta' bytes. Longer continuous reads are faster on cloud storage. The default behavior is to merge consecutive ranges. | true |
| spark.comet.parquet.read.io.mergeRanges.delta | The delta in bytes between consecutive read ranges below which the parallel reader will try to merge the ranges. The default is 8MB. | 8388608 |
| spark.comet.parquet.read.parallel.io.enabled | Whether to enable Comet's parallel reader for Parquet files. The parallel reader reads ranges of consecutive data in a file in parallel. It is faster for large files and row groups but uses more resources. The parallel reader is enabled by default. | true |
| spark.comet.parquet.read.parallel.io.thread-pool.size | The maximum number of parallel threads the parallel reader will use in a single executor. For executors configured with a smaller number of cores, use a smaller number. | 16 |
| spark.comet.regexp.allowIncompatible | Comet is not currently fully compatible with Spark for all regular expressions. Set this config to true to allow them anyway using Rust's regular expression engine. See compatibility guide for more information. | false |
| spark.comet.scan.enabled | Whether to enable native scans. When this is turned on, Spark will use Comet to read supported data sources (currently only Parquet is supported natively). Note that to enable native vectorized execution, both this config and 'spark.comet.exec.enabled' need to be enabled. By default, this config is true. | true |
| spark.comet.scan.preFetch.enabled | Whether to enable pre-fetching feature of CometScan. By default is disabled. | false |
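For reference, a minimal sketch (not part of this PR) of setting the documented options when building a SparkSession; the application name, master, and values are illustrative only:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative values; the defaults are those listed in the table above.
val spark = SparkSession
  .builder()
  .appName("comet-parallel-io-example")
  .master("local[*]")
  .config("spark.comet.parquet.read.parallel.io.enabled", "true")
  .config("spark.comet.parquet.read.parallel.io.thread-pool.size", "8")
  .config("spark.comet.parquet.read.io.mergeRanges", "true")
  .config("spark.comet.parquet.read.io.mergeRanges.delta", (1 << 23).toString) // 8 MB
  .config("spark.comet.parquet.read.io.adjust.readRange.skew", "false")
  .getOrCreate()
```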
@@ -1280,6 +1280,93 @@ abstract class ParquetReadSuite extends CometTestBase {
}
}
}

test("test merge scan range") {
def makeRawParquetFile(path: Path, n: Int): Seq[Option[Int]] = {
val dictionaryPageSize = 1024
val pageRowCount = 500
val schemaStr =
"""
|message root {
| optional int32 _1(INT_16);
| optional int32 _2;
| optional int64 _3;
|}
""".stripMargin

val schema = MessageTypeParser.parseMessageType(schemaStr)
val writer = createParquetWriter(
schema,
path,
dictionaryEnabled = true,
dictionaryPageSize = dictionaryPageSize,
pageRowCountLimit = pageRowCount)

val rand = scala.util.Random
val expected = (0 until n).map { i =>
// use a single value for the first page, to make sure dictionary encoding kicks in
val value = if (i < pageRowCount) i % 8 else i
if (rand.nextBoolean()) None
else Some(value)
}

expected.foreach { opt =>
val record = new SimpleGroup(schema)
opt match {
case Some(i) =>
record.add(0, i.toShort)
record.add(1, i)
record.add(2, i.toLong)
case _ =>
}
writer.write(record)
}

writer.close()
expected
}

Seq(16, 128).foreach { batchSize =>
Seq(1024, 1024 * 1024).foreach { mergeRangeDelta =>
{
withSQLConf(
CometConf.COMET_BATCH_SIZE.key -> batchSize.toString,
CometConf.COMET_IO_MERGE_RANGES.key -> "true",
CometConf.COMET_IO_MERGE_RANGES_DELTA.key -> mergeRangeDelta.toString) {
Member: Is it possible to test that this config is actually making it into ReadOptions? If I comment out all of the code in ReadOptions that reads these configs, the test still passes. Perhaps we just need a specific test to show that setting the config on a Spark context causes a change to the ReadOptions?

Member: I added some debug logging and I do see that it is working correctly, but it would be good to have a test to confirm (and prevent regressions):

test is setting COMET_IO_MERGE_RANGES_DELTA = 1048576
ReadOptions ioMergeRangesDelta = 1048576
test is setting COMET_IO_MERGE_RANGES_DELTA = 1024
ReadOptions ioMergeRangesDelta = 1024

Contributor Author: I added an additional check for the config. The configuration passed in to ReadOptions is not accessible, but I tried to simulate the next best thing. See if that makes sense.

Member: Thanks, Parth. LGTM. Looks like you need to run make format to fix some import ordering.

Contributor Author: Thanks @andygrove. Fixed style.

withTempDir { dir =>
val path = new Path(dir.toURI.toString, "part-r-0.parquet")
val expected = makeRawParquetFile(path, 10000)
val schema = StructType(
Seq(StructField("_1", ShortType, true), StructField("_3", LongType, true)))
readParquetFile(path.toString, Some(schema)) { df =>
{
// CometScanExec calls sessionState.newHadoopConfWithOptions which copies
// the sqlConf and some additional options to the hadoopConf and then
// uses the result to create the inputRDD (https://github.com/apache/datafusion-comet/blob/3783faaa01078a35bee93b299368f8c72869198d/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala#L181).
// We don't have access to the created hadoop Conf, but can confirm that the
// result does contain the correct configuration
assert(
df.sparkSession.sessionState
.newHadoopConfWithOptions(Map.empty)
.get(CometConf.COMET_IO_MERGE_RANGES_DELTA.key)
.equals(mergeRangeDelta.toString))
checkAnswer(
df,
expected.map {
case None =>
Row(null, null)
case Some(i) =>
Row(i.toShort, i.toLong)
})
}
}
}
}
}
}
}
}

def testScanner(cometEnabled: String, scanner: String, v1: Option[String] = None): Unit = {
withSQLConf(
CometConf.COMET_ENABLED.key -> cometEnabled,