Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import javax.xml.stream.events._
import javax.xml.transform.stream.StreamSource
import javax.xml.validation.Schema

import scala.annotation.tailrec
import scala.collection.mutable.ArrayBuffer
import scala.jdk.CollectionConverters._
import scala.util.Try
Expand All @@ -35,7 +36,21 @@ import org.apache.spark.SparkUpgradeException
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.ExprUtils
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, BadRecordException, CaseInsensitiveMap, DateFormatter, DropMalformedMode, FailureSafeParser, GenericArrayData, MapData, ParseMode, PartialResultArrayException, PartialResultException, PermissiveMode, TimestampFormatter}
import org.apache.spark.sql.catalyst.util.{
ArrayBasedMapData,
BadRecordException,
CaseInsensitiveMap,
DateFormatter,
DropMalformedMode,
FailureSafeParser,
GenericArrayData,
MapData,
ParseMode,
PartialResultArrayException,
PartialResultException,
PermissiveMode,
TimestampFormatter
}
import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
import org.apache.spark.sql.catalyst.xml.StaxXmlParser.convertStream
import org.apache.spark.sql.errors.QueryExecutionErrors
Expand Down Expand Up @@ -69,6 +84,7 @@ class StaxXmlParser(

private val decimalParser = ExprUtils.getDecimalParser(options.locale)

private val caseSensitive = SQLConf.get.caseSensitiveAnalysis

/**
* Parses a single XML string and turns it into either one resulting row or no row (if the
Expand All @@ -85,7 +101,7 @@ class StaxXmlParser(
}

private def getFieldNameToIndex(schema: StructType): Map[String, Int] = {
if (SQLConf.get.caseSensitiveAnalysis) {
if (caseSensitive) {
schema.map(_.name).zipWithIndex.toMap
} else {
CaseInsensitiveMap(schema.map(_.name).zipWithIndex.toMap)
Expand Down Expand Up @@ -201,27 +217,30 @@ class StaxXmlParser(
case (_: EndElement, _: DataType) => null
case (c: Characters, ArrayType(st, _)) =>
// For `ArrayType`, it needs to return the type of element. The values are merged later.
parser.next
convertTo(c.getData, st)
case (c: Characters, st: StructType) =>
// If a value tag is present, this can be an attribute-only element whose values is in that
// value tag field. Or, it can be a mixed-type element with both some character elements
// and other complex structure. Character elements are ignored.
val attributesOnly = st.fields.forall { f =>
f.name == options.valueTag || f.name.startsWith(options.attributePrefix)
}
if (attributesOnly) {
// If everything else is an attribute column, there's no complex structure.
// Just return the value of the character element, or null if we don't have a value tag
st.find(_.name == options.valueTag).map(
valueTag => convertTo(c.getData, valueTag.dataType)).orNull
} else {
// Otherwise, ignore this character element, and continue parsing the following complex
// structure
parser.next
parser.peek match {
case _: EndElement => null // no struct here at all; done
case _ => convertObject(parser, st)
}
parser.next
parser.peek match {
case _: EndElement =>
// It couldn't be an array of value tags
// as the opening tag is immediately followed by a closing tag.
if (c.isWhiteSpace) {
return null
}
val indexOpt = getFieldNameToIndex(st).get(options.valueTag)
indexOpt match {
case Some(index) =>
convertTo(c.getData, st.fields(index).dataType)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be returning a Row? If yes, please make sure that there is a test scenario covering this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

convertTo converts primitive values based on its data type. It will not return a Row

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, I get that. It looks like the assumption is that convertField will either return a Row or a singleton valueTag with just value.

case None => null
}
case _ =>
val row = convertObject(parser, st)
Comment on lines +237 to +238
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this handle values separated by comment or cdata? If so, we don't need case _: EndElement above.

<ROW>
  <a> 1 <!--this is a comment--> 2 </a>
</ROW>

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for bringing this up! I added some test cases for comments. We still need this branch asconvertObject cannot handle value tag.

if (!c.isWhiteSpace) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to document the behavior of whitespaces for valueTag. Also, the following scenarios, which contain whitespaces with quotes:

<ROW><a>" "</a></ROW>
<ROW><b>" "<c>1</c></b></ROW>
<ROW><d><e attr=" "></e></d></ROW>

addOrUpdate(row.toSeq(st).toArray, st, options.valueTag, c.getData, addToTail = false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why addToTail is false here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is because in this case, we encounter the interspersed value first and then the nested objects. We want to make sure that the value tag appears before the nested objects

} else {
row
}
}
case (_: Characters, _: StringType) =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is parser.next not required here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need to move the next event. currentStructureAsString will move the parser pointer.

convertTo(StaxXmlParserUtils.currentStructureAsString(parser), StringType)
Expand All @@ -237,6 +256,7 @@ class StaxXmlParser(
case _ => convertField(parser, dataType, attributes)
}
case (c: Characters, dt: DataType) =>
parser.next
convertTo(c.getData, dt)
case (e: XMLEvent, dt: DataType) =>
throw new IllegalArgumentException(
Expand All @@ -262,7 +282,12 @@ class StaxXmlParser(
case e: StartElement =>
kvPairs +=
(UTF8String.fromString(StaxXmlParserUtils.getName(e.asStartElement.getName, options)) ->
convertField(parser, valueType))
convertField(parser, valueType))
case c: Characters if !c.isWhiteSpace =>
// Create a value tag field for it
kvPairs +=
// TODO: We don't support an array value tags in map yet.
(UTF8String.fromString(options.valueTag) -> convertTo(c.getData, valueType))
case _: EndElement =>
shouldStop = StaxXmlParserUtils.checkEndElement(parser)
case _ => // do nothing
Expand Down Expand Up @@ -343,8 +368,9 @@ class StaxXmlParser(
val row = new Array[Any](schema.length)
val nameToIndex = getFieldNameToIndex(schema)
// If there are attributes, then we process them first.
convertAttributes(rootAttributes, schema).toSeq.foreach { case (f, v) =>
nameToIndex.get(f).foreach { row(_) = v }
convertAttributes(rootAttributes, schema).toSeq.foreach {
case (f, v) =>
nameToIndex.get(f).foreach { row(_) = v }
}

val wildcardColName = options.wildcardColName
Expand Down Expand Up @@ -405,15 +431,11 @@ class StaxXmlParser(
badRecordException = badRecordException.orElse(Some(e))
}

case c: Characters if !c.isWhiteSpace && isRootAttributesOnly =>
nameToIndex.get(options.valueTag) match {
case Some(index) =>
row(index) = convertTo(c.getData, schema(index).dataType)
case None => // do nothing
}
case c: Characters if !c.isWhiteSpace =>
addOrUpdate(row, schema, options.valueTag, c.getData)

case _: EndElement =>
shouldStop = StaxXmlParserUtils.checkEndElement(parser)
shouldStop = parseAndCheckEndElement(row, schema, parser)

case _ => // do nothing
}
Expand Down Expand Up @@ -576,6 +598,54 @@ class StaxXmlParser(
castTo(data, FloatType).asInstanceOf[Float]
}
}

@tailrec
private def parseAndCheckEndElement(
row: Array[Any],
schema: StructType,
parser: XMLEventReader): Boolean = {
parser.peek match {
case _: EndElement | _: EndDocument => true
case _: StartElement => false
case c: Characters if !c.isWhiteSpace =>
parser.nextEvent()
addOrUpdate(row, schema, options.valueTag, c.getData)
parseAndCheckEndElement(row, schema, parser)
case _ =>
parser.nextEvent()
parseAndCheckEndElement(row, schema, parser)
}
}

private def addOrUpdate(
row: Array[Any],
schema: StructType,
name: String,
data: String,
addToTail: Boolean = true): InternalRow = {
schema.getFieldIndex(name) match {
case Some(index) =>
schema(index).dataType match {
case ArrayType(elementType, _) =>
val value = convertTo(data, elementType)
val result = if (row(index) == null) {
ArrayBuffer(value)
} else {
val genericArrayData = row(index).asInstanceOf[GenericArrayData]
if (addToTail) {
genericArrayData.toArray(elementType) :+ value
} else {
value +: genericArrayData.toArray(elementType)
}
}
row(index) = new GenericArrayData(result)
case dataType =>
row(index) = convertTo(data, dataType)
}
case None => // do nothing
}
InternalRow.fromSeq(row.toIndexedSeq)
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,6 @@ class XmlInferSchema(options: XmlOptions, caseSensitive: Boolean)
}
}

@tailrec
private def inferField(parser: XMLEventReader): DataType = {
parser.peek match {
case _: EndElement => NullType
Expand All @@ -182,18 +181,25 @@ class XmlInferSchema(options: XmlOptions, caseSensitive: Boolean)
case _ => inferField(parser)
}
case c: Characters if !c.isWhiteSpace =>
// This could be the characters of a character-only element, or could have mixed
// characters and other complex structure
val characterType = inferFrom(c.getData)
parser.nextEvent()
parser.peek match {
case _: StartElement =>
// Some more elements follow; so ignore the characters.
// Use the schema of the rest
inferObject(parser).asInstanceOf[StructType]
// Some more elements follow;
// This is a mix of values and other elements
val innerType = inferObject(parser).asInstanceOf[StructType]
addOrUpdateValueTagType(innerType, characterType)
case _ =>
// That's all, just the character-only body; use that as the type
characterType
val fieldType = inferField(parser)
fieldType match {
case st: StructType => addOrUpdateValueTagType(st, characterType)
case _: NullType => characterType
case _: DataType =>
// The field type couldn't be an array type
new StructType()
.add(options.valueTag, addOrUpdateType(Some(characterType), fieldType))

}
}
case e: XMLEvent =>
throw new IllegalArgumentException(s"Failed to parse data with unexpected event $e")
Expand Down Expand Up @@ -229,17 +235,19 @@ class XmlInferSchema(options: XmlOptions, caseSensitive: Boolean)
val nameToDataType =
collection.mutable.TreeMap.empty[String, DataType](caseSensitivityOrdering)

def addOrUpdateType(fieldName: String, newType: DataType): Unit = {
val oldTypeOpt = nameToDataType.get(fieldName)
oldTypeOpt match {
// If the field name exists in the map,
// merge the type and infer the combined field as an array type if necessary
case Some(oldType) if !oldType.isInstanceOf[ArrayType] =>
nameToDataType.update(fieldName, ArrayType(compatibleType(oldType, newType)))
case Some(oldType) =>
nameToDataType.update(fieldName, compatibleType(oldType, newType))
case None =>
nameToDataType.put(fieldName, newType)
@tailrec
def inferAndCheckEndElement(parser: XMLEventReader): Boolean = {
parser.peek match {
case _: EndElement | _: EndDocument => true
case _: StartElement => false
case c: Characters if !c.isWhiteSpace =>
val characterType = inferFrom(c.getData)
parser.nextEvent()
addOrUpdateType(nameToDataType, options.valueTag, characterType)
inferAndCheckEndElement(parser)
case _ =>
parser.nextEvent()
inferAndCheckEndElement(parser)
}
}

Expand All @@ -248,7 +256,7 @@ class XmlInferSchema(options: XmlOptions, caseSensitive: Boolean)
StaxXmlParserUtils.convertAttributesToValuesMap(rootAttributes, options)
rootValuesMap.foreach {
case (f, v) =>
addOrUpdateType(f, inferFrom(v))
addOrUpdateType(nameToDataType, f, inferFrom(v))
}
var shouldStop = false
while (!shouldStop) {
Expand Down Expand Up @@ -281,29 +289,19 @@ class XmlInferSchema(options: XmlOptions, caseSensitive: Boolean)
}
// Add the field and datatypes so that we can check if this is ArrayType.
val field = StaxXmlParserUtils.getName(e.asStartElement.getName, options)
addOrUpdateType(field, inferredType)
addOrUpdateType(nameToDataType, field, inferredType)

case c: Characters if !c.isWhiteSpace =>
// This can be an attribute-only object
val valueTagType = inferFrom(c.getData)
addOrUpdateType(options.valueTag, valueTagType)
addOrUpdateType(nameToDataType, options.valueTag, valueTagType)

case _: EndElement =>
shouldStop = StaxXmlParserUtils.checkEndElement(parser)
shouldStop = inferAndCheckEndElement(parser)

case _ => // do nothing
}
}
// A structure object is an attribute-only element
// if it only consists of attributes and valueTags.
// If not, we will remove the valueTag field from the schema
val attributesOnly = nameToDataType.forall {
case (fieldName, _) =>
fieldName == options.valueTag || fieldName.startsWith(options.attributePrefix)
}
if (!attributesOnly) {
nameToDataType -= options.valueTag
}

// Note: other code relies on this sorting for correctness, so don't remove it!
StructType(nameToDataType.map{
Expand Down Expand Up @@ -534,4 +532,75 @@ class XmlInferSchema(options: XmlOptions, caseSensitive: Boolean)
}
}
}

/**
* This helper function merges the data type of value tags and inner elements.
* It could only be structure data. Consider the following case,
* <a>
* value1
* <b>1</b>
* value2
* </a>
* Input: ''a struct<b int, _VALUE string>'' and ''_VALUE string''
* Return: ''a struct<b int, _VALUE array<string>>''
* @param objectType inner elements' type
* @param valueTagType value tag's type
*/
private[xml] def addOrUpdateValueTagType(
objectType: DataType,
valueTagType: DataType): DataType = {
(objectType, valueTagType) match {
case (st: StructType, _) =>
val valueTagIndexOpt = st.getFieldIndex(options.valueTag)

valueTagIndexOpt match {
// If the field name exists in the inner elements,
// merge the type and infer the combined field as an array type if necessary
case Some(index) if !st(index).dataType.isInstanceOf[ArrayType] =>
updateStructField(
st,
index,
ArrayType(compatibleType(st(index).dataType, valueTagType)))
case Some(index) =>
updateStructField(st, index, compatibleType(st(index).dataType, valueTagType))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't st(index).dataType will be of ArrayType?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this branch handles this case of array type. If it's an array, we will merge the element types.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets add this test case scenario where Array<LongType> is updated to Array<DoubleType>:

<ROW>
  <a>
    1
    <b>2</b>
    3
    <b>4</b>
    5.0
  </a>
</ROW>

case None =>
st.add(options.valueTag, valueTagType)
}
case _ =>
throw new IllegalStateException(
"illegal state when merging value tags types in schema inference"
)
}
}

private def updateStructField(
structType: StructType,
index: Int,
newType: DataType): StructType = {
val newFields: Array[StructField] =
structType.fields.updated(index, structType.fields(index).copy(dataType = newType))
StructType(newFields)
}

private def addOrUpdateType(
nameToDataType: collection.mutable.TreeMap[String, DataType],
fieldName: String,
newType: DataType): Unit = {
val oldTypeOpt = nameToDataType.get(fieldName)
val mergedType = addOrUpdateType(oldTypeOpt, newType)
nameToDataType.put(fieldName, mergedType)
}

private def addOrUpdateType(oldTypeOpt: Option[DataType], newType: DataType): DataType = {
oldTypeOpt match {
// If the field name already exists,
// merge the type and infer the combined field as an array type if necessary
case Some(oldType) if !oldType.isInstanceOf[ArrayType] && !newType.isInstanceOf[NullType] =>
ArrayType(compatibleType(oldType, newType))
case Some(oldType) =>
compatibleType(oldType, newType)
case None =>
newType
}
}
}
Loading