Skip to content
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -805,6 +805,15 @@ object QueryExecutionErrors {
""".stripMargin.replaceAll("\n", " "))
}

def foundDuplicateFieldInFieldIdLookupModeError(
requiredId: Int, matchedFields: String): Throwable = {
new RuntimeException(
s"""
|Found duplicate field(s) "$requiredId": $matchedFields
|in id mapping mode
""".stripMargin.replaceAll("\n", " "))
}

def failedToMergeIncompatibleSchemasError(
left: StructType, right: StructType, e: Throwable): Throwable = {
new SparkException(s"Failed to merge incompatible schemas $left and $right", e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -934,6 +934,33 @@ object SQLConf {
.intConf
.createWithDefault(4096)

val PARQUET_FIELD_ID_WRITE_ENABLED =
buildConf("spark.sql.parquet.fieldId.write.enabled")
.doc("Field ID is a native field of the Parquet schema spec. When enabled," +
" Parquet writers will populate the field Id" +
Comment thread
jackierwzhang marked this conversation as resolved.
Outdated
" metadata (if present) in the Spark schema to the Parquet schema.")
.version("3.3.0")
.booleanConf
Comment thread
jackierwzhang marked this conversation as resolved.
.createWithDefault(true)
Comment thread
jackierwzhang marked this conversation as resolved.

val PARQUET_FIELD_ID_READ_ENABLED =
buildConf("spark.sql.parquet.fieldId.read.enabled")
.doc("Field ID is a native field of the Parquet schema spec. When enabled, Parquet readers" +
" will use field IDs (if present) in the requested Spark schema to look up Parquet" +
" fields instead of using column names")
.version("3.3.0")
.booleanConf
.createWithDefault(false)

val IGNORE_MISSING_PARQUET_FIELD_ID =
buildConf("spark.sql.parquet.fieldId.ignoreMissing")
Comment thread
jackierwzhang marked this conversation as resolved.
Outdated
.doc("When the Parquet file doesn't have any field IDs but the" +
" Spark read schema is using field IDs to read, we will silently return nulls" +
" when this flag is enabled, or error otherwise.")
.version("3.3.0")
.booleanConf
.createWithDefault(false)

val ORC_COMPRESSION = buildConf("spark.sql.orc.compression.codec")
.doc("Sets the compression codec used when writing ORC files. If either `compression` or " +
"`orc.compress` is specified in the table-specific options/properties, the precedence " +
Expand Down Expand Up @@ -4251,6 +4278,12 @@ class SQLConf extends Serializable with Logging {

def inferDictAsStruct: Boolean = getConf(SQLConf.INFER_NESTED_DICT_AS_STRUCT)

def parquetFieldIdReadEnabled: Boolean = getConf(SQLConf.PARQUET_FIELD_ID_READ_ENABLED)

def parquetFieldIdWriteEnabled: Boolean = getConf(SQLConf.PARQUET_FIELD_ID_WRITE_ENABLED)

def ignoreMissingParquetFieldId: Boolean = getConf(SQLConf.IGNORE_MISSING_PARQUET_FIELD_ID)

Comment thread
jackierwzhang marked this conversation as resolved.
def useV1Command: Boolean = getConf(SQLConf.LEGACY_USE_V1_COMMAND)

/** ********************** SQLConf functionality methods ************ */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ class ParquetFileFormat
SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key,
sparkSession.sessionState.conf.parquetOutputTimestampType.toString)

conf.set(
SQLConf.PARQUET_FIELD_ID_WRITE_ENABLED.key,
sparkSession.sessionState.conf.parquetFieldIdWriteEnabled.toString)

// Sets compression scheme
conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodecClassName)

Expand Down Expand Up @@ -354,6 +358,7 @@ class ParquetFileFormat
}
} else {
logDebug(s"Falling back to parquet-mr")

Comment thread
jackierwzhang marked this conversation as resolved.
Outdated
// ParquetRecordReader returns InternalRow
val readSupport = new ParquetReadSupport(
convertTz,
Expand Down
Loading