
Conversation

@comphead
Contributor

Which issue does this PR close?

Related #1360.

Rationale for this change

The native engine's behavior often depends on external Spark job parameters (HDFS configuration, INT96 handling, etc.), but native code has no access to the Spark configuration.

What changes are included in this PR?

Extends ExecutionContext to carry Spark configuration parameters passed from the JVM.
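
A minimal JVM-side sketch of what that means, using hypothetical names (the real ExecutionContext lives in native code; this only illustrates the shape of the data handed across):

import org.apache.spark.SparkEnv

// Hypothetical container mirroring the extra fields handed to the native ExecutionContext.
// Names are illustrative only; they are not the PR's actual types.
case class NativeExecutionParams(
    debug: Boolean,
    explain: Boolean,
    sparkConfig: Map[String, String])

def buildNativeExecutionParams(debug: Boolean, explain: Boolean): NativeExecutionParams =
  NativeExecutionParams(debug, explain, SparkEnv.get.conf.getAll.toMap)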

How are these changes tested?

@comphead
Contributor Author

Some parts were rolled back in #1101

  debug = COMET_DEBUG_ENABLED.get(),
- explain = COMET_EXPLAIN_NATIVE_ENABLED.get())
+ explain = COMET_EXPLAIN_NATIVE_ENABLED.get(),
+ sparkConfig = SparkEnv.get.conf.getAll.toMap)
Contributor


Doesn't toMap return a Scala Map (and not a Java HashMap)? Does that translate correctly to a JMap?
Also, SparkConf.getAll returns all Spark confs. Should we filter down to Comet confs and only the Spark confs of interest?
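
A minimal sketch of what such a filter-and-convert step could look like on the JVM side (the prefix list and helper name are assumptions, not code from this PR):

import java.util.{Map => JMap}
import scala.jdk.CollectionConverters._ // Scala 2.13; use scala.collection.JavaConverters on 2.12
import org.apache.spark.SparkEnv

// Hypothetical helper: keep only Comet confs and Hadoop-related Spark confs,
// then convert the Scala Map into a java.util.Map so it crosses JNI as a JMap.
def nativeRelevantConf(): JMap[String, String] = {
  val relevantPrefixes = Seq("spark.comet.", "spark.hadoop.")
  SparkEnv.get.conf.getAll.toMap
    .filter { case (key, _) => relevantPrefixes.exists(key.startsWith) }
    .asJava
}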

Contributor Author


The comments are valid; this is already done in my local branch, I just haven't pushed it yet.

Contributor


I commented because I thought we were planning to get this in the 0.8.0 release :)

  task_attempt_id: jlong,
  debug_native: jboolean,
  explain_native: jboolean,
+ spark_conf: JObject,
Member


An alternate approach could be to encode the Spark config in protobuf format and pass those bytes into native code, rather than having native code make calls back to the JVM to iterate over the map.
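
A rough sketch of that approach on the JVM side, assuming a hypothetical generated protobuf message SparkConfigMap with a map<string, string> entries field (not part of Comet's actual proto definitions; shown only to illustrate the idea):

import scala.jdk.CollectionConverters._
import org.apache.spark.SparkEnv
// Hypothetical class generated from a proto definition such as:
//   message SparkConfigMap { map<string, string> entries = 1; }
import org.apache.comet.serde.SparkConfigMap

// Serialize the Spark conf once on the JVM side and hand the bytes to native code,
// avoiding JNI callbacks to iterate over the map.
def serializeSparkConf(): Array[Byte] =
  SparkConfigMap
    .newBuilder()
    .putAllEntries(SparkEnv.get.conf.getAll.toMap.asJava) // protobuf-java generates putAllEntries for a map field named "entries"
    .build()
    .toByteArray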

@comphead
Contributor Author

While testing I just realized that Apache Spark already sends defaultFS (with its scheme) into the logical plan, and from there to Comet.

So this test works, and Spark sends /tmp/2 to the native side prefixed as hdfs://namenode:9000/tmp/2

  test("Test V1 parquet scan uses native_datafusion with HDFS") {
    withSQLConf(
      CometConf.COMET_ENABLED.key -> "true",
      CometConf.COMET_EXEC_ENABLED.key -> "true",
      CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION,
      SQLConf.USE_V1_SOURCE_LIST.key -> "parquet",
      "fs.defaultFS" -> "hdfs://namenode:9000",
      "dfs.client.use.datanode.hostname" -> "true") {
      val df = spark.read.parquet("/tmp/2")
      df.show(false)
      df.explain("extended")
    }
  }

However, when running spark-shell, the setting has to be passed differently: --conf spark.hadoop.fs.defaultFS=hdfs://namenode:9000
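
For reference, the same setting can also be applied programmatically when building the session; Spark copies spark.hadoop.* keys into the Hadoop Configuration (values taken from the example above, app name is hypothetical):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("comet-hdfs-example") // hypothetical app name
  .config("spark.hadoop.fs.defaultFS", "hdfs://namenode:9000")
  .config("spark.hadoop.dfs.client.use.datanode.hostname", "true")
  .getOrCreate()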

@comphead
Contributor Author

@parthchandra @andygrove I'm planning to close this PR since Spark transparently sends the conf to the native side; however, the DataFusion-style Spark conf code in this PR is kept for the future in case we still need it.

@comphead
Contributor Author

Code in 3e93995
