Skip to content
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import javax.xml.stream.events._
import javax.xml.transform.stream.StreamSource
import javax.xml.validation.Schema

import scala.annotation.tailrec
import scala.collection.mutable.ArrayBuffer
import scala.jdk.CollectionConverters._
import scala.util.Try
Expand All @@ -35,7 +36,21 @@ import org.apache.spark.SparkUpgradeException
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.ExprUtils
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, BadRecordException, CaseInsensitiveMap, DateFormatter, DropMalformedMode, FailureSafeParser, GenericArrayData, MapData, ParseMode, PartialResultArrayException, PartialResultException, PermissiveMode, TimestampFormatter}
import org.apache.spark.sql.catalyst.util.{
ArrayBasedMapData,
BadRecordException,
CaseInsensitiveMap,
DateFormatter,
DropMalformedMode,
FailureSafeParser,
GenericArrayData,
MapData,
ParseMode,
PartialResultArrayException,
PartialResultException,
PermissiveMode,
TimestampFormatter
}
import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
import org.apache.spark.sql.catalyst.xml.StaxXmlParser.convertStream
import org.apache.spark.sql.errors.QueryExecutionErrors
Expand All @@ -62,6 +77,7 @@ class StaxXmlParser(

private val decimalParser = ExprUtils.getDecimalParser(options.locale)

private val caseSensitive = SQLConf.get.caseSensitiveAnalysis

/**
* Parses a single XML string and turns it into either one resulting row or no row (if the
Expand All @@ -78,7 +94,7 @@ class StaxXmlParser(
}

private def getFieldNameToIndex(schema: StructType): Map[String, Int] = {
if (SQLConf.get.caseSensitiveAnalysis) {
if (caseSensitive) {
schema.map(_.name).zipWithIndex.toMap
} else {
CaseInsensitiveMap(schema.map(_.name).zipWithIndex.toMap)
Expand Down Expand Up @@ -194,27 +210,30 @@ class StaxXmlParser(
case (_: EndElement, _: DataType) => null
case (c: Characters, ArrayType(st, _)) =>
// For `ArrayType`, it needs to return the type of element. The values are merged later.
parser.next
convertTo(c.getData, st)
case (c: Characters, st: StructType) =>
// If a value tag is present, this can be an attribute-only element whose values is in that
// value tag field. Or, it can be a mixed-type element with both some character elements
// and other complex structure. Character elements are ignored.
val attributesOnly = st.fields.forall { f =>
f.name == options.valueTag || f.name.startsWith(options.attributePrefix)
}
if (attributesOnly) {
// If everything else is an attribute column, there's no complex structure.
// Just return the value of the character element, or null if we don't have a value tag
st.find(_.name == options.valueTag).map(
valueTag => convertTo(c.getData, valueTag.dataType)).orNull
} else {
// Otherwise, ignore this character element, and continue parsing the following complex
// structure
parser.next
parser.peek match {
case _: EndElement => null // no struct here at all; done
case _ => convertObject(parser, st)
}
parser.next
parser.peek match {
case _: EndElement =>
// It couldn't be an array of value tags
// as the opening tag is immediately followed by a closing tag.
if (isEmptyString(c)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets not allow any whitespace values for valueTag.

Suggested change
if (isEmptyString(c)) {
if (!c.isWhiteSpace) {

return null
}
val indexOpt = getFieldNameToIndex(st).get(options.valueTag)
indexOpt match {
case Some(index) =>
convertTo(c.getData, st.fields(index).dataType)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be returning a Row? If yes, please make sure that there is a test scenario covering this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

convertTo converts primitive values based on its data type. It will not return a Row

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, I get that. It looks like the assumption is that convertField will either return a Row or a singleton valueTag with just value.

case None => null
}
case _ =>
val row = convertObject(parser, st)
Comment on lines +237 to +238
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this handle values separated by comment or cdata? If so, we don't need case _: EndElement above.

<ROW>
  <a> 1 <!--this is a comment--> 2 </a>
</ROW>

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for bringing this up! I added some test cases for comments. We still need this branch asconvertObject cannot handle value tag.

if (!isEmptyString(c)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (!isEmptyString(c)) {
if (!c.isWhiteSpace) {

addOrUpdate(row.toSeq(st).toArray, st, options.valueTag, c.getData, addToTail = false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why addToTail is false here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is because in this case, we encounter the interspersed value first and then the nested objects. We want to make sure that the value tag appears before the nested objects

} else {
row
}
}
case (_: Characters, _: StringType) =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is parser.next not required here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need to move the next event. currentStructureAsString will move the parser pointer.

convertTo(StaxXmlParserUtils.currentStructureAsString(parser), StringType)
Expand All @@ -230,6 +249,7 @@ class StaxXmlParser(
case _ => convertField(parser, dataType, attributes)
}
case (c: Characters, dt: DataType) =>
parser.next
convertTo(c.getData, dt)
case (e: XMLEvent, dt: DataType) =>
throw new IllegalArgumentException(
Expand All @@ -255,7 +275,12 @@ class StaxXmlParser(
case e: StartElement =>
kvPairs +=
(UTF8String.fromString(StaxXmlParserUtils.getName(e.asStartElement.getName, options)) ->
convertField(parser, valueType))
convertField(parser, valueType))
case c: Characters if !isEmptyString(c) =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
case c: Characters if !isEmptyString(c) =>
case c: Characters if !c.isWhiteSpace =>

// Create a value tag field for it
kvPairs +=
// TODO: We don't support an array value tags in map yet.
(UTF8String.fromString(options.valueTag) -> convertTo(c.getData, valueType))
case _: EndElement =>
shouldStop = StaxXmlParserUtils.checkEndElement(parser)
case _ => // do nothing
Expand Down Expand Up @@ -336,8 +361,9 @@ class StaxXmlParser(
val row = new Array[Any](schema.length)
val nameToIndex = getFieldNameToIndex(schema)
// If there are attributes, then we process them first.
convertAttributes(rootAttributes, schema).toSeq.foreach { case (f, v) =>
nameToIndex.get(f).foreach { row(_) = v }
convertAttributes(rootAttributes, schema).toSeq.foreach {
case (f, v) =>
nameToIndex.get(f).foreach { row(_) = v }
}

val wildcardColName = options.wildcardColName
Expand Down Expand Up @@ -398,15 +424,11 @@ class StaxXmlParser(
badRecordException = badRecordException.orElse(Some(e))
}

case c: Characters if !c.isWhiteSpace && isRootAttributesOnly =>
nameToIndex.get(options.valueTag) match {
case Some(index) =>
row(index) = convertTo(c.getData, schema(index).dataType)
case None => // do nothing
}
case c: Characters if !isEmptyString(c) =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
case c: Characters if !isEmptyString(c) =>
case c: Characters if !c.isWhiteSpace =>

addOrUpdate(row, schema, options.valueTag, c.getData)

case _: EndElement =>
shouldStop = StaxXmlParserUtils.checkEndElement(parser)
shouldStop = parseAndCheckEndElement(row, schema, parser)

case _ => // do nothing
}
Expand Down Expand Up @@ -567,6 +589,61 @@ class StaxXmlParser(
castTo(data, FloatType).asInstanceOf[Float]
}
}
private[xml] def isEmptyString(c: Characters): Boolean = {
if (options.ignoreSurroundingSpaces) {
c.getData.trim.isEmpty
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is same as c.isWhiteSpace

} else {
c.isWhiteSpace
}
}

@tailrec
private def parseAndCheckEndElement(
row: Array[Any],
schema: StructType,
parser: XMLEventReader): Boolean = {
parser.peek match {
case _: EndElement | _: EndDocument => true
case _: StartElement => false
case c: Characters if !isEmptyString(c) =>
parser.nextEvent()
addOrUpdate(row, schema, options.valueTag, c.getData)
parseAndCheckEndElement(row, schema, parser)
case _ =>
parser.nextEvent()
parseAndCheckEndElement(row, schema, parser)
}
}

private def addOrUpdate(
row: Array[Any],
schema: StructType,
name: String,
string: String,
addToTail: Boolean = true): InternalRow = {
schema.getFieldIndex(name) match {
case Some(index) =>
schema(index).dataType match {
case ArrayType(elementType, _) =>
val value = convertTo(string, elementType)
val result = if (row(index) == null) {
ArrayBuffer(value)
} else {
val genericArrayData = row(index).asInstanceOf[GenericArrayData]
if (addToTail) {
genericArrayData.toArray(elementType) :+ value
} else {
value +: genericArrayData.toArray(elementType)
}
}
row(index) = new GenericArrayData(result)
case dataType =>
row(index) = convertTo(string, dataType)
}
case None => // do nothing
}
InternalRow.fromSeq(row.toIndexedSeq)
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import javax.xml.namespace.QName
import javax.xml.stream.{EventFilter, XMLEventReader, XMLInputFactory, XMLStreamConstants}
import javax.xml.stream.events._

import scala.annotation.tailrec
import scala.jdk.CollectionConverters._

object StaxXmlParserUtils {
Expand Down Expand Up @@ -70,7 +69,6 @@ object StaxXmlParserUtils {
/**
* Checks if current event points the EndElement.
*/
@tailrec
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why was this removed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! Adding it back

def checkEndElement(parser: XMLEventReader): Boolean = {
parser.peek match {
case _: EndElement | _: EndDocument => true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ class XmlInferSchema(options: XmlOptions, caseSensitive: Boolean)
parser.peek match {
case _: EndElement => NullType
case _: StartElement => inferObject(parser)
case c: Characters if c.isWhiteSpace =>
case c: Characters if isEmptyString(c) =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
case c: Characters if isEmptyString(c) =>
case c: Characters if c.isWhiteSpace =>

// When `Characters` is found, we need to look further to decide
// if this is really data or space between other elements.
val data = c.getData
Expand All @@ -171,16 +171,18 @@ class XmlInferSchema(options: XmlOptions, caseSensitive: Boolean)
case _: EndElement => StringType
case _ => inferField(parser)
}
case c: Characters if !c.isWhiteSpace =>
// what about new line character
case c: Characters if !isEmptyString(c) =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this case, can't we return inferObject(parser)?
In inferObject(parser), the case for StructType can be updated to "unnest" StructType with just valueTag.
Without this, there is lot of code duplication logic for valueTag.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMHO inferObject can't do this. This branch handles both primitive types and nested objects. If we return inferObject(parser), the primitive types will be inferred as a structFields of valueTag

// This could be the characters of a character-only element, or could have mixed
// characters and other complex structure
val characterType = inferFrom(c.getData)
parser.nextEvent()
parser.peek match {
case _: StartElement =>
// Some more elements follow; so ignore the characters.
// Use the schema of the rest
inferObject(parser).asInstanceOf[StructType]
// Some more elements follow;
// This is a mix of values and other elements
val innerType = inferObject(parser).asInstanceOf[StructType]
addOrUpdateValueTagType(innerType, characterType)
case _ =>
// That's all, just the character-only body; use that as the type
characterType
Expand Down Expand Up @@ -233,6 +235,22 @@ class XmlInferSchema(options: XmlOptions, caseSensitive: Boolean)
}
}

@tailrec
def inferAndCheckEndElement(parser: XMLEventReader): Boolean = {
parser.peek match {
case _: EndElement | _: EndDocument => true
case _: StartElement => false
case c: Characters if !isEmptyString(c) =>
val characterType = inferFrom(c.getData)
parser.nextEvent()
addOrUpdateType(options.valueTag, characterType)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a test case for this scenario?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A valueTag that locates after a closing tag in the inner element and before the closing tag in the outer element will cover this scenario.

<a>
    value2
    <b>1</b>
    value3
</a>

We covered this case in the most our test cases

inferAndCheckEndElement(parser)
case _ =>
parser.nextEvent()
inferAndCheckEndElement(parser)
}
}

// If there are attributes, then we should process them first.
val rootValuesMap =
StaxXmlParserUtils.convertAttributesToValuesMap(rootAttributes, options)
Expand Down Expand Up @@ -273,27 +291,17 @@ class XmlInferSchema(options: XmlOptions, caseSensitive: Boolean)
val field = StaxXmlParserUtils.getName(e.asStartElement.getName, options)
addOrUpdateType(field, inferredType)

case c: Characters if !c.isWhiteSpace =>
case c: Characters if !isEmptyString(c) =>
// This can be an attribute-only object
val valueTagType = inferFrom(c.getData)
addOrUpdateType(options.valueTag, valueTagType)

case _: EndElement =>
shouldStop = StaxXmlParserUtils.checkEndElement(parser)
shouldStop = inferAndCheckEndElement(parser)

case _ => // do nothing
}
}
// A structure object is an attribute-only element
// if it only consists of attributes and valueTags.
// If not, we will remove the valueTag field from the schema
val attributesOnly = nameToDataType.forall {
case (fieldName, _) =>
fieldName == options.valueTag || fieldName.startsWith(options.attributePrefix)
}
if (!attributesOnly) {
nameToDataType -= options.valueTag
}

// Note: other code relies on this sorting for correctness, so don't remove it!
StructType(nameToDataType.map{
Expand Down Expand Up @@ -505,4 +513,57 @@ class XmlInferSchema(options: XmlOptions, caseSensitive: Boolean)
}
}
}

/**
* This helper function merges the data type of value tags and inner elements.
* It could only be structure data. Consider the following case,
* <a>
* value1
* <b>1</b>
* value2
* </a>
* Input: ''a struct<b int, _VALUE string>'' and ''_VALUE string''
* Return: ''a struct<b int, _VALUE array<string>>''
* @param objectType inner elements' type
* @param valueTagType value tag's type
*/
private[xml] def addOrUpdateValueTagType(
objectType: DataType,
valueTagType: DataType): DataType = {
(objectType, valueTagType) match {
case (st: StructType, _) =>
// TODO(shujing): case sensitive?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

while the case for valueTag is unlikely to change, its better to add case sensitivity logic to it to make it consistent with other fields. Can be a separate PR. Not a high prio.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for answering this question! I create a Jira ticket for it

val valueTagIndexOpt = st.getFieldIndex(options.valueTag)

valueTagIndexOpt match {
// If the field name exists in the inner elements,
// merge the type and infer the combined field as an array type if necessary
case Some(index) if !st(index).dataType.isInstanceOf[ArrayType] =>
updateStructField(
st,
index,
ArrayType(compatibleType(st(index).dataType, valueTagType)))
case Some(index) =>
updateStructField(st, index, compatibleType(st(index).dataType, valueTagType))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't st(index).dataType will be of ArrayType?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this branch handles this case of array type. If it's an array, we will merge the element types.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets add this test case scenario where Array<LongType> is updated to Array<DoubleType>:

<ROW>
  <a>
    1
    <b>2</b>
    3
    <b>4</b>
    5.0
  </a>
</ROW>

case None =>
st.add(options.valueTag, valueTagType)
}
case _ =>
throw new IllegalStateException(
"illegal state when merging value tags types in schema inference"
)
}
}

private[xml] def isEmptyString(c: Characters): Boolean = c.getData.trim.isEmpty

private def updateStructField(
structType: StructType,
index: Int,
newType: DataType): StructType = {
val newFields: Array[StructField] =
structType.fields.updated(index, structType.fields(index).copy(dataType = newType))
StructType(newFields)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ class XmlOptions(
val nullValue = parameters.getOrElse(NULL_VALUE, XmlOptions.DEFAULT_NULL_VALUE)
val columnNameOfCorruptRecord =
parameters.getOrElse(COLUMN_NAME_OF_CORRUPT_RECORD, defaultColumnNameOfCorruptRecord)
val ignoreSurroundingSpaces = getBool(IGNORE_SURROUNDING_SPACES, false)
val ignoreSurroundingSpaces = getBool(IGNORE_SURROUNDING_SPACES, true)
val parseMode = ParseMode.fromString(parameters.getOrElse(MODE, PermissiveMode.name))
val inferSchema = getBool(INFER_SCHEMA, true)
val rowValidationXSDPath = parameters.get(ROW_VALIDATION_XSD_PATH).orNull
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?xml version="1.0"?>
<ROWSET>
<ROW>
value1
<array>
value2
<b>1</b>
value3
</array>
<array>
value4
<b>2</b>
value5
<c>3</c>
value6
</array>
</ROW>
</ROWSET>
Loading