@@ -25,7 +25,7 @@ import org.apache.hadoop.hive.ql.exec.Utilities
import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable}
import org.apache.hadoop.hive.ql.plan.{PlanUtils, TableDesc}
import org.apache.hadoop.hive.serde2.Deserializer
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector
import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorConverters, StructObjectInspector}
import org.apache.hadoop.hive.serde2.objectinspector.primitive._
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf}
@@ -188,9 +188,13 @@ class HadoopTableReader(
val hconf = broadcastedHiveConf.value.value
val deserializer = localDeserializer.newInstance()
deserializer.initialize(hconf, partProps)
// get the table deserializer
val tableSerDe = tableDesc.getDeserializerClass.newInstance()
tableSerDe.initialize(hconf, tableDesc.getProperties)

// fill the non partition key attributes
HadoopTableReader.fillObject(iter, deserializer, nonPartitionKeyAttrs, mutableRow)
HadoopTableReader.fillObject(iter, deserializer, nonPartitionKeyAttrs,
mutableRow, Some(tableSerDe))
}
}.toSeq

@@ -264,15 +268,31 @@ private[hive] object HadoopTableReader extends HiveInspectors {
* @param nonPartitionKeyAttrs Attributes that should be filled together with their corresponding
* positions in the output schema
* @param mutableRow A reusable `MutableRow` that should be filled
* @param convertdeserializer An optional table-level `Deserializer`; when its object inspector
*        differs from that of `deserializer`, deserialized rows are converted to the table schema
* @return An `Iterator[Row]` transformed from `iterator`
*/
def fillObject(
iterator: Iterator[Writable],
deserializer: Deserializer,
nonPartitionKeyAttrs: Seq[(Attribute, Int)],
mutableRow: MutableRow): Iterator[Row] = {
mutableRow: MutableRow,
convertdeserializer: Option[Deserializer] = None): Iterator[Row] = {
Contributor

Instead of passing the deserializer, how about taking the converter as the argument? By the way, I think Hive provides an IdentityConverter, which means we could make the parameter an `ObjectInspectorConverters.Converter` rather than wrapping it in an Option.
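A rough, self-contained sketch of the shape this suggestion describes, not code from this PR; the object and method names below are invented for illustration, and it assumes Hive's serde2 classes are on the classpath:

```scala
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter

object ConverterParamSketch {
  // The converter parameter defaults to an IdentityConverter, so callers that
  // need no schema conversion simply omit the argument: there is no Option to
  // unwrap and no per-call pattern match.
  def fillAll(
      rows: Iterator[AnyRef],
      converter: Converter = new ObjectInspectorConverters.IdentityConverter): Iterator[AnyRef] =
    rows.map(row => converter.convert(row))

  def main(args: Array[String]): Unit = {
    // With the default converter the rows pass through unchanged.
    println(fillAll(Iterator[AnyRef]("a", "b")).toList) // List(a, b)
  }
}
```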

Contributor Author

But the `val soi` also needs the convert deserializer when the schema doesn't match.

Contributor

OK, you're right, forget about my comment above. :)

Contributor

Change the convertdeserializer to outputStructObjectInspector?

Contributor

Variable names should be in camel case: convertdeserializer => convertDeserializer? Or change it to a better name?


val soi = convertdeserializer match {
  case Some(convert) =>
    // Convert only when the partition serde's object inspector differs from the
    // table serde's, i.e. the partition schema no longer matches the table schema.
    if (deserializer.getObjectInspector.equals(convert.getObjectInspector)) {
      deserializer.getObjectInspector().asInstanceOf[StructObjectInspector]
    } else {
      HiveShim.getConvertedOI(
        deserializer.getObjectInspector(),
        convert.getObjectInspector()).asInstanceOf[StructObjectInspector]
    }
  case None =>
    deserializer.getObjectInspector().asInstanceOf[StructObjectInspector]
}

val soi = deserializer.getObjectInspector().asInstanceOf[StructObjectInspector]
val (fieldRefs, fieldOrdinals) = nonPartitionKeyAttrs.map { case (attr, ordinal) =>
soi.getStructFieldRef(attr.name) -> ordinal
}.unzip
@@ -315,9 +335,23 @@ private[hive] object HadoopTableReader extends HiveInspectors {
}
}

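// Converter that adapts rows produced by the partition-level deserializer to the
// struct object inspector `soi` chosen above (an identity conversion when the
// two inspectors already match).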
val partTblObjectInspectorConverter = ObjectInspectorConverters.getConverter(
deserializer.getObjectInspector, soi)

// Map each tuple to a row object
iterator.map { value =>
val raw = deserializer.deserialize(value)
val raw = convertdeserializer match {
Contributor

In general we'd better not do the pattern matching within the iterator; we can do it like this:

xx match {
  case xxx => iterator.map { ... }
  case yyy => iterator.map { ... }
}

For this case, as I showed above, if we passed the converter directly into `fillObject`, I don't think we need the pattern match here.
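A minimal, Hive-free sketch of that suggestion, with made-up names, showing the branch resolved once before the per-row loop:

```scala
object HoistMatchSketch {
  // Resolve the Option a single time, outside the hot per-row loop, instead of
  // pattern matching inside iterator.map on every record.
  def mapRows[A](rows: Iterator[A], convert: Option[A => A]): Iterator[A] =
    convert match {
      case Some(f) => rows.map(f) // converting path, chosen once
      case None    => rows        // identity path, no per-row work
    }

  def main(args: Array[String]): Unit = {
    println(mapRows(Iterator(1, 2, 3), Some((x: Int) => x * 10)).toList) // List(10, 20, 30)
    println(mapRows(Iterator(1, 2, 3), None).toList)                     // List(1, 2, 3)
  }
}
```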

Contributor Author

Thanks for the reminder.

case Some(convert) =>
  if (deserializer.getObjectInspector.equals(convert.getObjectInspector)) {
    deserializer.deserialize(value)
  } else {
    // If the partition schema does not match the table schema, convert the
    // deserialized row to match the table's object inspector.
    partTblObjectInspectorConverter.convert(deserializer.deserialize(value))
  }
case None =>
  deserializer.deserialize(value)
}
var i = 0
while (i < fieldRefs.length) {
val fieldValue = soi.getStructFieldData(raw, fieldRefs(i))
@@ -172,4 +172,19 @@ class InsertIntoHiveTableSuite extends QueryTest {

sql("DROP TABLE hiveTableWithStructValue")
}

test("SPARK-5498:partition schema does not match table schema"){
val testData = TestHive.sparkContext.parallelize(
(1 to 10).map(i => TestData(i, i.toString)))
testData.registerTempTable("testData")
val tmpDir = Files.createTempDir()
sql(s"CREATE TABLE table_with_partition(key int,value string) PARTITIONED by (ds string) location '${tmpDir.toURI.toString}' ")
sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='1') SELECT key,value FROM testData")
sql("ALTER TABLE table_with_partition CHANGE COLUMN key key BIGINT")
Contributor

I just checked the Hive documentation. It says:

The CASCADE|RESTRICT clause is available in Hive 0.15.0. ALTER TABLE CHANGE COLUMN with CASCADE command changes the columns of a table's metadata, and cascades the same change to all the partition metadata. RESTRICT is the default, limiting column change only to table metadata.

I guess that in Hive 0.13.1, when the table schema is changed via ALTER TABLE, only the table metadata will be updated. Can you double-check that the above query works on Hive 0.13.1?
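For reference, a hedged illustration of the CASCADE form the quoted documentation describes, written in this suite's sql(...) style; it is hypothetical here since it requires a Hive version that ships the clause:

```scala
// Hypothetical, for Hive versions with CASCADE support only: push the column
// change down to every existing partition's metadata as well, so the partition
// schema stays in sync with the table schema.
sql("ALTER TABLE table_with_partition CHANGE COLUMN key key BIGINT CASCADE")
// RESTRICT is the default and updates only the table metadata, which is exactly
// the partition/table mismatch this PR's converter path has to handle.
```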

Contributor Author

I checked this query on Hive 0.11 and Hive 0.12 and it is OK; I will check it on Hive 0.13.1 later.

checkAnswer(sql("select key,value from table_with_partition where ds='1' "),
testData.toSchemaRDD.collect.toSeq
)
sql("DROP TABLE table_with_partition")

}
}
@@ -34,7 +34,7 @@ import org.apache.hadoop.hive.ql.plan.{CreateTableDesc, FileSinkDesc, TableDesc}
import org.apache.hadoop.hive.ql.processors._
import org.apache.hadoop.hive.ql.stats.StatsSetupConst
import org.apache.hadoop.hive.serde2.{ColumnProjectionUtils, Deserializer, io => hiveIo}
import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, PrimitiveObjectInspector}
import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorConverters, ObjectInspector, PrimitiveObjectInspector}
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory
import org.apache.hadoop.hive.serde2.objectinspector.primitive.{HiveDecimalObjectInspector, PrimitiveObjectInspectorFactory}
import org.apache.hadoop.hive.serde2.typeinfo.{TypeInfo, TypeInfoFactory}
@@ -187,7 +187,7 @@ private[hive] object HiveShim {

def getStatsSetupConstRawDataSize = StatsSetupConst.RAW_DATA_SIZE

def createDefaultDBIfNeeded(context: HiveContext) = { }
def createDefaultDBIfNeeded(context: HiveContext) = {}
Contributor

This change is unneeded and will cause conflicts with other PRs in flight.


def getCommandProcessor(cmd: Array[String], conf: HiveConf) = {
CommandProcessorFactory.get(cmd(0), conf)
@@ -208,7 +208,7 @@

def getDataLocationPath(p: Partition) = p.getPartitionPath

def getAllPartitionsOf(client: Hive, tbl: Table) = client.getAllPartitionsForPruner(tbl)
def getAllPartitionsOf(client: Hive, tbl: Table) = client.getAllPartitionsForPruner(tbl)

def compatibilityBlackList = Seq(
"decimal_.*",
@@ -242,6 +242,11 @@ private[hive] object HiveShim {
}
}

// make getConvertedOI compatible between 0.12.0 and 0.13.1
Contributor

The comment seems unnecessary, as all of the methods in this file exist for shim purposes.

def getConvertedOI(inputOI: ObjectInspector, outputOI: ObjectInspector): ObjectInspector = {
Contributor

And use the ObjectInspectorConverters.getConverter instead?

ObjectInspectorConverters.getConvertedOI(inputOI, outputOI, new java.lang.Boolean(true))
Contributor

Just `true`?

}

def prepareWritable(w: Writable): Writable = {
w
}
@@ -37,7 +37,7 @@ import org.apache.hadoop.hive.ql.plan.{CreateTableDesc, FileSinkDesc, TableDesc}
import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory
import org.apache.hadoop.hive.serde2.typeinfo.{TypeInfo, DecimalTypeInfo, TypeInfoFactory}
import org.apache.hadoop.hive.serde2.objectinspector.primitive.{HiveDecimalObjectInspector, PrimitiveObjectInspectorFactory}
import org.apache.hadoop.hive.serde2.objectinspector.{PrimitiveObjectInspector, ObjectInspector}
import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorConverters, PrimitiveObjectInspector, ObjectInspector}
import org.apache.hadoop.hive.serde2.{Deserializer, ColumnProjectionUtils}
import org.apache.hadoop.hive.serde2.{io => hiveIo}
import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable
@@ -397,7 +397,12 @@ private[hive] object HiveShim {
Decimal(hdoi.getPrimitiveJavaObject(data).bigDecimalValue(), hdoi.precision(), hdoi.scale())
}
}


// make getConvertedOI compatible between 0.12.0 and 0.13.1
Contributor

Nit: Same here.

def getConvertedOI(inputOI: ObjectInspector, outputOI: ObjectInspector): ObjectInspector = {
ObjectInspectorConverters.getConvertedOI(inputOI, outputOI)
}
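To make the conversion path concrete, here is a small, hedged sketch (not part of this PR) of how getConvertedOI and ObjectInspectorConverters.getConverter cooperate when a partition was written with an older schema. It assumes Hive 0.13.1's serde2 classes are on the classpath and uses made-up names:

```scala
import scala.collection.JavaConverters._
import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectInspectorConverters, ObjectInspectorFactory, StructObjectInspector}
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory

object ConvertedOISketch {
  def main(args: Array[String]): Unit = {
    // Partition data was written when `key` was an int ...
    val partOI = ObjectInspectorFactory.getStandardStructObjectInspector(
      Seq("key", "value").asJava,
      Seq[ObjectInspector](
        PrimitiveObjectInspectorFactory.javaIntObjectInspector,
        PrimitiveObjectInspectorFactory.javaStringObjectInspector).asJava)

    // ... but the table schema now declares `key` as bigint.
    val tableOI = ObjectInspectorFactory.getStandardStructObjectInspector(
      Seq("key", "value").asJava,
      Seq[ObjectInspector](
        PrimitiveObjectInspectorFactory.javaLongObjectInspector,
        PrimitiveObjectInspectorFactory.javaStringObjectInspector).asJava)

    // getConvertedOI yields the inspector the reader should use for field
    // lookups, and getConverter produces the per-row conversion to apply.
    val soi = ObjectInspectorConverters
      .getConvertedOI(partOI, tableOI)
      .asInstanceOf[StructObjectInspector]
    val converter = ObjectInspectorConverters.getConverter(partOI, soi)

    val raw = Seq[AnyRef](Integer.valueOf(1), "a").asJava // a row as the partition serde yields it
    val converted = converter.convert(raw)
    // The int key is now readable through the bigint-typed table inspector.
    println(soi.getStructFieldData(converted, soi.getStructFieldRef("key"))) // 1
  }
}
```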

/*
* Bug introduced in hive-0.13. AvroGenericRecordWritable has a member recordReaderID that
* is needed to initialize before serialization.