Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,21 @@ import org.apache.spark.unsafe.types.UTF8String
*/
class OrcDeserializer(
dataSchema: StructType,
requiredSchema: StructType,
requestedColIds: Array[Int]) {

private val resultRow = new SpecificInternalRow(requiredSchema.map(_.dataType))
resultSchema: StructType,
requestedColIds: Array[Int],
partitionValues: InternalRow) {

// Make a resultRow and initialize the partition column values once.
private val resultRow = new SpecificInternalRow(resultSchema.map(_.dataType))
private var i = 0
private val start = resultSchema.length - partitionValues.numFields
while (i < partitionValues.numFields) {
resultRow.update(start + i, partitionValues.get(i, resultSchema(start + i).dataType))
i += 1
}

private val fieldWriters: Array[WritableComparable[_] => Unit] = {
requiredSchema.zipWithIndex
resultSchema.zipWithIndex
// The value of missing columns are always null, do not need writers.
.filterNot { case (_, index) => requestedColIds(index) == -1 }
.map { case (f, index) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ class OrcFileFormat
true
}

override def buildReader(
override def buildReaderWithPartitionValues(

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, @cloud-fan . During the previous ORC PR, we left this behind.

sparkSession: SparkSession,
dataSchema: StructType,
partitionSchema: StructType,
Expand All @@ -138,6 +138,8 @@ class OrcFileFormat
}
}

val resultSchema = StructType(requiredSchema.fields ++ partitionSchema.fields)

val broadcastedConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
val isCaseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
Expand Down Expand Up @@ -167,8 +169,10 @@ class OrcFileFormat
val iter = new RecordReaderIterator[OrcStruct](orcRecordReader)
Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))

val unsafeProjection = UnsafeProjection.create(requiredSchema)
val deserializer = new OrcDeserializer(dataSchema, requiredSchema, requestedColIds)
val colIds = requestedColIds ++ List.fill(partitionSchema.length)(-1).toArray[Int]
val unsafeProjection = UnsafeProjection.create(resultSchema)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we follow parquet and just join the data row and partition row, and do a final unsafe projection? It's much easier and there is no performance difference.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Parquet Vectorization work like the following.

      // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
      if (parquetReader.isInstanceOf[VectorizedParquetRecordReader] &&
          enableVectorizedReader) {
        iter.asInstanceOf[Iterator[InternalRow]]

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I see. you meant non-vectorized path. Sorry, I was confused since I focused too much on vectorized path. I'll do.

val deserializer =
new OrcDeserializer(dataSchema, resultSchema, colIds, file.partitionValues)
iter.map(value => unsafeProjection(deserializer.deserialize(value)))
}
}
Expand Down