@@ -83,19 +83,35 @@ private[sql] object CatalystConverter {
protected[parquet] def createConverter(
field: FieldType,
fieldIndex: Int,
parent: CatalystConverter): Converter = {
parent: CatalystConverter,
fromProtobuf: Boolean = false): Converter = {
val fieldType: DataType = field.dataType
fieldType match {
case udt: UserDefinedType[_] => {
createConverter(field.copy(dataType = udt.sqlType), fieldIndex, parent, fromProtobuf)
}
// For native JVM types we use a converter with native arrays
case ArrayType(elementType: NativeType, false) => {
new CatalystNativeArrayConverter(elementType, fieldIndex, parent)
if (fromProtobuf) {
new CatalystProtobufNativeArrayConverter(field.name, elementType, fieldIndex, parent)
} else {
new CatalystNativeArrayConverter(elementType, fieldIndex, parent)
}
}
// This is for other types of arrays, including those with nested fields
case ArrayType(elementType: DataType, false) => {
new CatalystArrayConverter(elementType, fieldIndex, parent)
if (fromProtobuf) {
elementType match {
case StructType(fields: Array[StructField]) => {
new CatalystProtobufStructArrayConverter(fields, fieldIndex, parent)
}
case _ => throw new RuntimeException(
s"unable to convert protobuf array element type ${elementType.toString} in CatalystConverter")
}
} else {
new CatalystArrayConverter(elementType, fieldIndex, parent)
}
}
case ArrayType(elementType: DataType, true) => {
new CatalystArrayContainsNullConverter(elementType, fieldIndex, parent)
@@ -156,12 +172,13 @@ private[sql] object CatalystConverter {

protected[parquet] def createRootConverter(
parquetSchema: MessageType,
attributes: Seq[Attribute]): CatalystConverter = {
attributes: Seq[Attribute],
fromProtobuf: Boolean = false): CatalystConverter = {
// For non-nested types we use the optimized Row converter
if (attributes.forall(a => ParquetTypesConverter.isPrimitiveType(a.dataType))) {
new CatalystPrimitiveRowConverter(attributes.toArray)
} else {
new CatalystGroupConverter(attributes.toArray)
new CatalystGroupConverter(attributes.toArray, fromProtobuf)
}
}
}
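
As a quick orientation to the new parameter: a minimal usage sketch, assuming `parquetSchema: MessageType` and `attributes: Seq[Attribute]` are in scope, as they are in RowRecordMaterializer below. Flat schemas still take the optimized CatalystPrimitiveRowConverter path, so the flag only changes behavior once nesting forces a CatalystGroupConverter.

// Sketch only: threading the flag from a read path into the root converter.
val rootConverter: CatalystConverter =
  CatalystConverter.createRootConverter(parquetSchema, attributes, fromProtobuf = true)
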
@@ -279,27 +296,29 @@ private[parquet] class CatalystGroupConverter(
protected[parquet] val index: Int,
protected[parquet] val parent: CatalystConverter,
protected[parquet] var current: ArrayBuffer[Any],
protected[parquet] var buffer: ArrayBuffer[Row])
protected[parquet] var buffer: ArrayBuffer[Row],
protected[parquet] val fromProtobuf: Boolean = false)
extends CatalystConverter {

def this(schema: Array[FieldType], index: Int, parent: CatalystConverter) =
def this(schema: Array[FieldType], index: Int, parent: CatalystConverter, fromProtobuf: Boolean = false) =
this(
schema,
index,
parent,
current = null,
buffer = new ArrayBuffer[Row](
CatalystArrayConverter.INITIAL_ARRAY_SIZE))
CatalystArrayConverter.INITIAL_ARRAY_SIZE),
fromProtobuf)

/**
* This constructor is used for the root converter only!
*/
def this(attributes: Array[Attribute]) =
this(attributes.map(a => new FieldType(a.name, a.dataType, a.nullable)), 0, null)
def this(attributes: Array[Attribute], fromProtobuf: Boolean = false) =
this(attributes.map(a => new FieldType(a.name, a.dataType, a.nullable)), 0, null, fromProtobuf)

protected [parquet] val converters: Array[Converter] =
schema.zipWithIndex.map {
case (field, idx) => CatalystConverter.createConverter(field, idx, this)
case (field, idx) => CatalystConverter.createConverter(field, idx, this, fromProtobuf)
}.toArray

override val size = schema.size
@@ -746,6 +765,8 @@ private[parquet] class CatalystNativeArrayConverter(
}
}

/**
* A `parquet.io.api.GroupConverter` that converts a single-element groups that
* match the characteristics of an array contains null (see
@@ -825,6 +846,82 @@ private[parquet] class CatalystArrayContainsNullConverter(
}
}

private[parquet] class CatalystProtobufNativeArrayConverter(
    val name: String,
    val elementType: NativeType,
    val fieldIndex: Int,
    val parent: CatalystConverter,
    var capacity: Int = CatalystArrayConverter.INITIAL_ARRAY_SIZE)
  extends PrimitiveConverter {

  // Re-bind NativeType to the element's JVM type, as in CatalystNativeArrayConverter.
  type NativeType = elementType.JvmType

  private var buffer: Array[NativeType] = elementType.classTag.newArray(capacity)

  private var elements: Int = 0

  // parquet-protobuf writes repeated scalars as bare repeated fields, so no
  // enclosing group ever calls end() to signal that the array is finished.
  // Instead, republish the accumulated prefix to the parent after every element;
  // the last publish before the record closes carries the complete array.
  private def addValue(value: NativeType): Unit = {
    checkGrowBuffer()
    buffer(elements) = value
    elements += 1
    parent.updateField(
      fieldIndex,
      buffer.slice(0, elements).toSeq)
  }

  override def addBinary(value: Binary): Unit = addValue(value.getBytes.asInstanceOf[NativeType])

  override def addBoolean(value: Boolean): Unit = addValue(value.asInstanceOf[NativeType])

  override def addDouble(value: Double): Unit = addValue(value.asInstanceOf[NativeType])

  override def addFloat(value: Float): Unit = addValue(value.asInstanceOf[NativeType])

  override def addInt(value: Int): Unit = addValue(value.asInstanceOf[NativeType])

  override def addLong(value: Long): Unit = addValue(value.asInstanceOf[NativeType])

  // Double the backing array when full, as in CatalystNativeArrayConverter.
  private def checkGrowBuffer(): Unit = {
    if (elements >= capacity) {
      val newCapacity = 2 * capacity
      val tmp: Array[NativeType] = elementType.classTag.newArray(newCapacity)
      Array.copy(buffer, 0, tmp, 0, capacity)
      buffer = tmp
      capacity = newCapacity
    }
  }
}
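
The special-casing exists because parquet-protobuf encodes a repeated scalar as a bare repeated primitive field, whereas Spark's own writer wraps arrays in a LIST-annotated group. A sketch of the two shapes, illustrative only (the message and field names are made up; `array` is the element name Spark's LIST encoding uses):

import parquet.schema.MessageTypeParser

// Shape handled by CatalystNativeArrayConverter (Spark's LIST encoding):
val listShape = MessageTypeParser.parseMessageType(
  "message m { optional group values (LIST) { repeated int32 array; } }")

// Shape handled by CatalystProtobufNativeArrayConverter,
// as parquet-protobuf emits for `repeated int32 values`:
val protoShape = MessageTypeParser.parseMessageType(
  "message m { repeated int32 values; }")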

private[parquet] class CatalystProtobufStructArrayConverter(
    fields: Array[FieldType],
    myFieldIndex: Int,
    parent: CatalystConverter)
  extends CatalystGroupConverter(fields, myFieldIndex, parent, fromProtobuf = true) {

  val rowBuffer: ArrayBuffer[GenericRow] =
    new ArrayBuffer[GenericRow](CatalystArrayConverter.INITIAL_ARRAY_SIZE)
  var currentRow: Array[Any] = new Array[Any](fields.length)
  var elements = 0

  override protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit = {
    currentRow(fieldIndex) = value
  }

  // Each end() closes one element of the bare repeated group: append the finished
  // row, reset, and republish the accumulated prefix to the parent, mirroring
  // CatalystProtobufNativeArrayConverter above.
  override def end(): Unit = {
    rowBuffer += new GenericRow(currentRow)
    currentRow = new Array[Any](fields.length)
    elements += 1
    parent.updateField(myFieldIndex, rowBuffer.slice(0, elements))
  }

  override protected[parquet] def clearBuffer(): Unit = {
    super.clearBuffer()
    rowBuffer.clear()
  }
}
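
Since bare repeated fields never yield an end-of-array callback, both protobuf converters share one accumulate-and-republish idiom: each element overwrites the parent's field with the prefix seen so far, and the last write before the record closes carries the complete array. A standalone sketch of the idiom, with hypothetical names and no Parquet dependencies:

import scala.collection.mutable.ArrayBuffer

class RepublishingSink[A](publish: Seq[A] => Unit) {
  private val seen = new ArrayBuffer[A]
  def add(a: A): Unit = {
    seen += a
    publish(seen.toSeq) // the final publish holds the full array
  }
}
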
/**
* This converter is for multi-element groups of primitive or complex types
* that have repetition level optional or required (so struct fields).
@@ -923,3 +1020,5 @@ private[parquet] class CatalystMapConverter(
override protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit =
throw new UnsupportedOperationException
}
@@ -17,6 +17,7 @@

package org.apache.spark.sql.parquet

import java.util
import java.util.{HashMap => JHashMap}

import org.apache.hadoop.conf.Configuration
@@ -39,8 +40,8 @@ import org.apache.spark.sql.types._
private[parquet] class RowRecordMaterializer(root: CatalystConverter)
extends RecordMaterializer[Row] {

def this(parquetSchema: MessageType, attributes: Seq[Attribute]) =
this(CatalystConverter.createRootConverter(parquetSchema, attributes))
def this(parquetSchema: MessageType, attributes: Seq[Attribute], fromProtobuf: Boolean = false) =
this(CatalystConverter.createRootConverter(parquetSchema, attributes, fromProtobuf))

override def getCurrentRecord: Row = root.getCurrentRecord

@@ -87,7 +88,8 @@ private[parquet] class RowReadSupport extends ReadSupport[Row] with Logging {
parquetSchema, false, true)
}
log.debug(s"list of attributes that will be read: $schema")
new RowRecordMaterializer(parquetSchema, schema)
val isProtobuf =
"true".equals(readContext.getReadSupportMetadata.get(RowReadSupport.FROM_PROTOBUF))
new RowRecordMaterializer(parquetSchema, schema, fromProtobuf = isProtobuf)
}

override def init(
@@ -99,14 +101,18 @@ private[parquet] class RowReadSupport extends ReadSupport[Row] with Logging {
val requestedAttributes = RowReadSupport.getRequestedSchema(configuration)

if (requestedAttributes != null) {
val keySet: util.Set[String] = keyValueMetaData.keySet()
// If the parquet file is thrift derived, there is a good chance that
// it will have the thrift class in metadata.
val isThriftDerived = keyValueMetaData.keySet().contains("thrift.class")
val isThriftDerived = keySet.contains("thrift.class")
// Likewise, parquet-protobuf records the writer's message class in metadata.
val isProto = keySet.contains("parquet.proto.class")
metadata.put(RowReadSupport.FROM_PROTOBUF, isProto.toString)
parquetSchema = ParquetTypesConverter
.convertFromAttributes(requestedAttributes, isThriftDerived)
.convertFromAttributes(requestedAttributes, isThriftDerived, isProto)
val requestedSchemaString: String = ParquetTypesConverter.convertToString(requestedAttributes)
metadata.put(
RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
ParquetTypesConverter.convertToString(requestedAttributes))
requestedSchemaString)
}

val origAttributesStr: String = configuration.get(RowWriteSupport.SPARK_ROW_SCHEMA)
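
For reference, the detection above keys off footer metadata the writers themselves leave behind. A minimal, self-contained sketch of that logic against a hypothetical metadata map (the class-name value is invented; the keys are the ones init checks):

import java.util.{HashMap => JHashMap}

val keyValueMetaData = new JHashMap[String, String]
keyValueMetaData.put("parquet.proto.class", "com.example.Person") // hypothetical value

val keySet = keyValueMetaData.keySet()
val isThriftDerived = keySet.contains("thrift.class") // false: no thrift key present
val isProto = keySet.contains("parquet.proto.class") // true: protobuf-derived file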
@@ -121,6 +127,7 @@ private[parquet] object RowReadSupport {
private[parquet] object RowReadSupport {
val SPARK_ROW_REQUESTED_SCHEMA = "org.apache.spark.sql.parquet.row.requested_schema"
val SPARK_METADATA_KEY = "org.apache.spark.sql.parquet.row.metadata"
val FROM_PROTOBUF = "org.apache.spark.sql.parquet.row.protobuf"

private def getRequestedSchema(configuration: Configuration): Seq[Attribute] = {
val schemaString = configuration.get(RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA)