Spark: Support read with settings (#367)
* allow read with settings

* make settings optional

* update doc

* update doc to describe that the settings default value is None

---------

Co-authored-by: Hua Shi <[email protected]>
harryshi10 and huashi-st authored Dec 9, 2024
1 parent 35088ca commit 7a06a13
Showing 7 changed files with 37 additions and 3 deletions.
1 change: 1 addition & 0 deletions docs/configurations/02_sql_configurations.md
@@ -20,6 +20,7 @@ spark.clickhouse.ignoreUnsupportedTransform|false|ClickHouse supports using comp
 spark.clickhouse.read.compression.codec|lz4|The codec used to decompress data for reading. Supported codecs: none, lz4.|0.5.0
 spark.clickhouse.read.distributed.convertLocal|true|When reading Distributed table, read local table instead of itself. If `true`, ignore `spark.clickhouse.read.distributed.useClusterNodes`.|0.1.0
 spark.clickhouse.read.fixedStringAs|binary|Read ClickHouse FixedString type as the specified Spark data type. Supported types: binary, string|0.8.0
+spark.clickhouse.read.settings|None|Settings when read from ClickHouse. e.g. `final=1, max_execution_time=5`|0.9.0
 spark.clickhouse.read.format|json|Serialize format for reading. Supported formats: json, binary|0.6.0
 spark.clickhouse.read.runtimeFilter.enabled|false|Enable runtime filter for reading.|0.8.0
 spark.clickhouse.read.splitByPartitionId|true|If `true`, construct input partition filter by virtual column `_partition_id`, instead of partition value. There are known bugs to assemble SQL predication by partition value. This feature requires ClickHouse Server v21.6+|0.4.0
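
For context, this is how a job would turn the new knob; a minimal sketch, assuming a locally registered `clickhouse` catalog. The catalog class, host, and table names below are illustrative assumptions, not part of this commit:

```scala
import org.apache.spark.sql.SparkSession

object ReadWithSettingsExample extends App {
  val spark = SparkSession.builder()
    .master("local[*]")
    // Assumed catalog wiring; class name and host are illustrative only.
    .config("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog")
    .config("spark.sql.catalog.clickhouse.host", "127.0.0.1")
    // The conf added by this commit: ClickHouse-side settings appended to
    // every read query. `final=1` forces FINAL semantics, and
    // `max_execution_time=5` caps server-side execution at 5 seconds.
    .config("spark.clickhouse.read.settings", "final=1, max_execution_time=5")
    .getOrCreate()

  // `db.events` is a placeholder table.
  spark.sql("SELECT * FROM clickhouse.db.events LIMIT 10").show()
}
```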
2 changes: 2 additions & 0 deletions …/ClickHouseReader.scala
@@ -36,6 +36,7 @@ abstract class ClickHouseReader[Record](
 
   val readDistributedUseClusterNodes: Boolean = conf.getConf(READ_DISTRIBUTED_USE_CLUSTER_NODES)
   val readDistributedConvertLocal: Boolean = conf.getConf(READ_DISTRIBUTED_CONVERT_LOCAL)
+  private val readSettings: Option[String] = conf.getConf(READ_SETTINGS)
 
   val database: String = part.table.database
   val table: String = part.table.name
@@ -60,6 +61,7 @@
        |WHERE (${part.partFilterExpr}) AND (${scanJob.filtersExpr})
        |${scanJob.groupByClause.getOrElse("")}
        |${scanJob.limit.map(n => s"LIMIT $n").getOrElse("")}
+       |${readSettings.map(settings => s"SETTINGS $settings").getOrElse("")}
        |""".stripMargin
   }
 
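
To see what the change does to the generated query, here is a standalone sketch of the assembly logic above, with placeholder table, filter, and limit values; only the `SETTINGS` handling is taken from this commit:

```scala
object ScanQuerySketch extends App {
  // Placeholder stand-ins for part.partFilterExpr, scanJob.filtersExpr, and
  // scanJob.limit; only the readSettings handling mirrors the reader above.
  val readSettings: Option[String] = Some("final=1, max_execution_time=5")

  val scanQuery: String =
    s"""SELECT `id`, `value`
       |FROM `db`.`events`
       |WHERE (`_partition_id` = '202412') AND (`value` > 0)
       |LIMIT 10
       |${readSettings.map(settings => s"SETTINGS $settings").getOrElse("")}
       |""".stripMargin

  // With readSettings = None, map(...).getOrElse("") collapses to an empty
  // line and the SETTINGS clause is simply omitted.
  println(scanQuery)
}
```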
11 changes: 10 additions & 1 deletion …/ClickHouseSQLConf.scala
@@ -14,7 +14,7 @@
 
 package org.apache.spark.sql.clickhouse
 
-import org.apache.spark.internal.config.ConfigEntry
+import org.apache.spark.internal.config.{ConfigEntry, OptionalConfigEntry}
 import org.apache.spark.sql.internal.SQLConf._
 import com.clickhouse.spark.exception.ClickHouseErrCode._
 
@@ -209,4 +209,13 @@ object ClickHouseSQLConf {
       .stringConf
       .transform(_.toLowerCase)
       .createWithDefault("binary")
+
+  val READ_SETTINGS: OptionalConfigEntry[String] =
+    buildConf("spark.clickhouse.read.settings")
+      .doc("Settings when read from ClickHouse. e.g. `final=1, max_execution_time=5`")
+      .version("0.9.0")
+      .stringConf
+      .transform(_.toLowerCase)
+      .createOptional
+
 }
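
Since the entry is created with `createOptional`, it has no default value; a rough sketch of the resulting runtime behavior, reusing the `spark` session from the first example:

```scala
// READ_SETTINGS is an OptionalConfigEntry, so an unset key resolves to
// None and the reader emits no SETTINGS clause at all.
spark.conf.unset("spark.clickhouse.read.settings")

// When set, the value is lowercased by .transform(_.toLowerCase) and then
// appended verbatim after SETTINGS in every read query.
spark.conf.set("spark.clickhouse.read.settings", "FINAL=1") // used as "final=1"
```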
2 changes: 2 additions & 0 deletions …/ClickHouseReader.scala
@@ -36,6 +36,7 @@ abstract class ClickHouseReader[Record](
 
   val readDistributedUseClusterNodes: Boolean = conf.getConf(READ_DISTRIBUTED_USE_CLUSTER_NODES)
   val readDistributedConvertLocal: Boolean = conf.getConf(READ_DISTRIBUTED_CONVERT_LOCAL)
+  private val readSettings: Option[String] = conf.getConf(READ_SETTINGS)
 
   val database: String = part.table.database
   val table: String = part.table.name
@@ -60,6 +61,7 @@
        |WHERE (${part.partFilterExpr}) AND (${scanJob.filtersExpr})
        |${scanJob.groupByClause.getOrElse("")}
        |${scanJob.limit.map(n => s"LIMIT $n").getOrElse("")}
+       |${readSettings.map(settings => s"SETTINGS $settings").getOrElse("")}
        |""".stripMargin
   }
 
11 changes: 10 additions & 1 deletion …/ClickHouseSQLConf.scala
@@ -14,7 +14,7 @@
 
 package org.apache.spark.sql.clickhouse
 
-import org.apache.spark.internal.config.ConfigEntry
+import org.apache.spark.internal.config.{ConfigEntry, OptionalConfigEntry}
 import org.apache.spark.sql.internal.SQLConf._
 import com.clickhouse.spark.exception.ClickHouseErrCode._
 
@@ -209,4 +209,13 @@ object ClickHouseSQLConf {
       .stringConf
       .transform(_.toLowerCase)
       .createWithDefault("binary")
+
+  val READ_SETTINGS: OptionalConfigEntry[String] =
+    buildConf("spark.clickhouse.read.settings")
+      .doc("Settings when read from ClickHouse. e.g. `final=1, max_execution_time=5`")
+      .version("0.9.0")
+      .stringConf
+      .transform(_.toLowerCase)
+      .createOptional
+
 }
2 changes: 2 additions & 0 deletions …/ClickHouseReader.scala
@@ -36,6 +36,7 @@ abstract class ClickHouseReader[Record](
 
   val readDistributedUseClusterNodes: Boolean = conf.getConf(READ_DISTRIBUTED_USE_CLUSTER_NODES)
   val readDistributedConvertLocal: Boolean = conf.getConf(READ_DISTRIBUTED_CONVERT_LOCAL)
+  private val readSettings: Option[String] = conf.getConf(READ_SETTINGS)
 
   val database: String = part.table.database
   val table: String = part.table.name
@@ -60,6 +61,7 @@
        |WHERE (${part.partFilterExpr}) AND (${scanJob.filtersExpr})
        |${scanJob.groupByClause.getOrElse("")}
        |${scanJob.limit.map(n => s"LIMIT $n").getOrElse("")}
+       |${readSettings.map(settings => s"SETTINGS $settings").getOrElse("")}
        |""".stripMargin
   }
 
11 changes: 10 additions & 1 deletion …/ClickHouseSQLConf.scala
@@ -14,7 +14,7 @@
 
 package org.apache.spark.sql.clickhouse
 
-import org.apache.spark.internal.config.ConfigEntry
+import org.apache.spark.internal.config.{ConfigEntry, OptionalConfigEntry}
 import org.apache.spark.sql.internal.SQLConf._
 import com.clickhouse.spark.exception.ClickHouseErrCode._
 
@@ -209,4 +209,13 @@ object ClickHouseSQLConf {
       .stringConf
       .transform(_.toLowerCase)
       .createWithDefault("binary")
+
+  val READ_SETTINGS: OptionalConfigEntry[String] =
+    buildConf("spark.clickhouse.read.settings")
+      .doc("Settings when read from ClickHouse. e.g. `final=1, max_execution_time=5`")
+      .version("0.9.0")
+      .stringConf
+      .transform(_.toLowerCase)
+      .createOptional
+
 }
