
Commit ca1805b

Removes duplicated partition discovery code in new Parquet

1 parent: f18dec2

2 files changed, 2 insertions(+), 176 deletions(-)

core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala (1 addition, 1 deletion)

@@ -298,7 +298,7 @@ class SparkHadoopUtil extends Logging {
         logDebug(text + " matched " + HADOOP_CONF_PATTERN)
         val key = matched.substring(13, matched.length() - 1) // remove ${hadoopconf- .. }
         val eval = Option[String](hadoopConf.get(key))
-            .map { value =>
+          .map { value =>
             logDebug("Substituted " + matched + " with " + value)
             text.replace(matched, value)
           }

(The removed and added `.map` lines are textually identical, so this hunk is an indentation-only change.)
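A note on the hunk above: the magic number 13 in `matched.substring(13, matched.length() - 1)` is the length of the `${hadoopconf-` prefix, and the final dropped character is the closing `}`. A minimal, self-contained sketch of that key extraction; the token value below is a made-up example, not taken from the commit:

object HadoopConfKeySketch extends App {
  // Hypothetical example token; in Spark the real values come from
  // matching HADOOP_CONF_PATTERN against a configuration string.
  val matched = "${hadoopconf-fs.defaultFS}"

  // "${hadoopconf-" is exactly 13 characters long and the trailing
  // character is '}', which is why the diff uses substring(13, length - 1).
  val key = matched.substring(13, matched.length() - 1)

  println(key) // fs.defaultFS
}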

sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala (1 addition, 175 deletions)

@@ -303,7 +303,7 @@ private[sql] case class ParquetRelation2(
 
     if (partitionDirs.nonEmpty) {
       // Parses names and values of partition columns, and infer their data types.
-      ParquetRelation2.parsePartitions(partitionDirs, defaultPartitionName)
+      PartitioningUtils.parsePartitions(partitionDirs, defaultPartitionName)
     } else {
       // No partition directories found, makes an empty specification
       PartitionSpec(StructType(Seq.empty[StructField]), Seq.empty[Partition])
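
The `parsePartitions` call above now delegates to `PartitioningUtils`; the logic it replaces is the block this commit deletes from `ParquetRelation2` in the next hunk. To illustrate what that logic computes, here is a self-contained sketch of the same leaf-to-root walk over `name=value` path segments, using plain Scala values instead of Hadoop `Path`s and Catalyst `Literal`s; all names in it are illustrative, not Spark API, and the type-inference chain is abbreviated (the real code also tries Float and BigDecimal):

import scala.collection.mutable.ArrayBuffer
import scala.util.Try

object PartitionParsingSketch extends App {
  // Simplified value inference: try Int, then Long, then Double,
  // then fall back to String; the default partition name maps to null.
  def inferValue(raw: String, defaultPartitionName: String): Any =
    if (raw == defaultPartitionName) null
    else Try(raw.toInt)
      .orElse(Try(raw.toLong))
      .orElse(Try(raw.toDouble))
      .getOrElse(raw)

  // Splits a "name=value" path segment; a segment without '=' ends the walk.
  def parseColumn(segment: String, default: String): Option[(String, Any)] = {
    val i = segment.indexOf('=')
    if (i == -1) None
    else Some(segment.take(i) -> inferValue(segment.drop(i + 1), default))
  }

  // Walks path segments from leaf to root, collecting partition columns
  // until the first segment that is not of the form "name=value".
  def parsePartition(path: String, default: String): Seq[(String, Any)] = {
    val columns = ArrayBuffer.empty[(String, Any)]
    var segments = path.split('/').filter(_.nonEmpty).toList.reverse
    var finished = segments.isEmpty
    while (!finished) {
      val maybeColumn = parseColumn(segments.head, default)
      maybeColumn.foreach(columns += _)
      segments = segments.tail
      finished = maybeColumn.isEmpty || segments.isEmpty
    }
    columns.reverse.toSeq
  }

  println(parsePartition("/path/to/partition/a=42/b=hello/c=3.14", "__HIVE_DEFAULT_PARTITION__"))
  // List((a,42), (b,hello), (c,3.14))
}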
@@ -837,178 +837,4 @@ private[sql] object ParquetRelation2 extends Logging {
       .filter(_.nullable)
     StructType(parquetSchema ++ missingFields)
   }
-
-
-  // TODO Data source implementations shouldn't touch Catalyst types (`Literal`).
-  // However, we are already using Catalyst expressions for partition pruning and predicate
-  // push-down here...
-  private[parquet] case class PartitionValues(columnNames: Seq[String], literals: Seq[Literal]) {
-    require(columnNames.size == literals.size)
-  }
-
-  /**
-   * Given a group of qualified paths, tries to parse them and returns a partition specification.
-   * For example, given:
-   * {{{
-   *   hdfs://<host>:<port>/path/to/partition/a=1/b=hello/c=3.14
-   *   hdfs://<host>:<port>/path/to/partition/a=2/b=world/c=6.28
-   * }}}
-   * it returns:
-   * {{{
-   *   PartitionSpec(
-   *     partitionColumns = StructType(
-   *       StructField(name = "a", dataType = IntegerType, nullable = true),
-   *       StructField(name = "b", dataType = StringType, nullable = true),
-   *       StructField(name = "c", dataType = DoubleType, nullable = true)),
-   *     partitions = Seq(
-   *       Partition(
-   *         values = Row(1, "hello", 3.14),
-   *         path = "hdfs://<host>:<port>/path/to/partition/a=1/b=hello/c=3.14"),
-   *       Partition(
-   *         values = Row(2, "world", 6.28),
-   *         path = "hdfs://<host>:<port>/path/to/partition/a=2/b=world/c=6.28")))
-   * }}}
-   */
-  private[parquet] def parsePartitions(
-      paths: Seq[Path],
-      defaultPartitionName: String): PartitionSpec = {
-    val partitionValues = resolvePartitions(paths.map(parsePartition(_, defaultPartitionName)))
-    val fields = {
-      val (PartitionValues(columnNames, literals)) = partitionValues.head
-      columnNames.zip(literals).map { case (name, Literal(_, dataType)) =>
-        StructField(name, dataType, nullable = true)
-      }
-    }
-
-    val partitions = partitionValues.zip(paths).map {
-      case (PartitionValues(_, literals), path) =>
-        Partition(Row(literals.map(_.value): _*), path.toString)
-    }
-
-    PartitionSpec(StructType(fields), partitions)
-  }
-
-  /**
-   * Parses a single partition, returns column names and values of each partition column. For
-   * example, given:
-   * {{{
-   *   path = hdfs://<host>:<port>/path/to/partition/a=42/b=hello/c=3.14
-   * }}}
-   * it returns:
-   * {{{
-   *   PartitionValues(
-   *     Seq("a", "b", "c"),
-   *     Seq(
-   *       Literal.create(42, IntegerType),
-   *       Literal.create("hello", StringType),
-   *       Literal.create(3.14, FloatType)))
-   * }}}
-   */
-  private[parquet] def parsePartition(
-      path: Path,
-      defaultPartitionName: String): PartitionValues = {
-    val columns = ArrayBuffer.empty[(String, Literal)]
-    // Old Hadoop versions don't have `Path.isRoot`
-    var finished = path.getParent == null
-    var chopped = path
-
-    while (!finished) {
-      val maybeColumn = parsePartitionColumn(chopped.getName, defaultPartitionName)
-      maybeColumn.foreach(columns += _)
-      chopped = chopped.getParent
-      finished = maybeColumn.isEmpty || chopped.getParent == null
-    }
-
-    val (columnNames, values) = columns.reverse.unzip
-    PartitionValues(columnNames, values)
-  }
-
-  private def parsePartitionColumn(
-      columnSpec: String,
-      defaultPartitionName: String): Option[(String, Literal)] = {
-    val equalSignIndex = columnSpec.indexOf('=')
-    if (equalSignIndex == -1) {
-      None
-    } else {
-      val columnName = columnSpec.take(equalSignIndex)
-      assert(columnName.nonEmpty, s"Empty partition column name in '$columnSpec'")
-
-      val rawColumnValue = columnSpec.drop(equalSignIndex + 1)
-      assert(rawColumnValue.nonEmpty, s"Empty partition column value in '$columnSpec'")
-
-      val literal = inferPartitionColumnValue(rawColumnValue, defaultPartitionName)
-      Some(columnName -> literal)
-    }
-  }
-
-  /**
-   * Resolves possible type conflicts between partitions by up-casting "lower" types. The up-
-   * casting order is:
-   * {{{
-   *   NullType ->
-   *   IntegerType -> LongType ->
-   *   FloatType -> DoubleType -> DecimalType.Unlimited ->
-   *   StringType
-   * }}}
-   */
-  private[parquet] def resolvePartitions(values: Seq[PartitionValues]): Seq[PartitionValues] = {
-    // Column names of all partitions must match
-    val distinctPartitionsColNames = values.map(_.columnNames).distinct
-    assert(distinctPartitionsColNames.size == 1, {
-      val list = distinctPartitionsColNames.mkString("\t", "\n", "")
-      s"Conflicting partition column names detected:\n$list"
-    })
-
-    // Resolves possible type conflicts for each column
-    val columnCount = values.head.columnNames.size
-    val resolvedValues = (0 until columnCount).map { i =>
-      resolveTypeConflicts(values.map(_.literals(i)))
-    }
-
-    // Fills resolved literals back to each partition
-    values.zipWithIndex.map { case (d, index) =>
-      d.copy(literals = resolvedValues.map(_(index)))
-    }
-  }
-
-  /**
-   * Converts a string to a `Literal` with automatic type inference. Currently only supports
-   * [[IntegerType]], [[LongType]], [[FloatType]], [[DoubleType]], [[DecimalType.Unlimited]], and
-   * [[StringType]].
-   */
-  private[parquet] def inferPartitionColumnValue(
-      raw: String,
-      defaultPartitionName: String): Literal = {
-    // First tries integral types
-    Try(Literal.create(Integer.parseInt(raw), IntegerType))
-      .orElse(Try(Literal.create(JLong.parseLong(raw), LongType)))
-      // Then falls back to fractional types
-      .orElse(Try(Literal.create(JFloat.parseFloat(raw), FloatType)))
-      .orElse(Try(Literal.create(JDouble.parseDouble(raw), DoubleType)))
-      .orElse(Try(Literal.create(new JBigDecimal(raw), DecimalType.Unlimited)))
-      // Then falls back to string
-      .getOrElse {
-        if (raw == defaultPartitionName) Literal.create(null, NullType)
-        else Literal.create(raw, StringType)
-      }
-  }
-
-  private val upCastingOrder: Seq[DataType] =
-    Seq(NullType, IntegerType, LongType, FloatType, DoubleType, DecimalType.Unlimited, StringType)
-
-  /**
-   * Given a collection of [[Literal]]s, resolves possible type conflicts by up-casting "lower"
-   * types.
-   */
-  private def resolveTypeConflicts(literals: Seq[Literal]): Seq[Literal] = {
-    val desiredType = {
-      val topType = literals.map(_.dataType).maxBy(upCastingOrder.indexOf(_))
-      // Falls back to string if all values of this column are null or empty string
-      if (topType == NullType) StringType else topType
-    }
-
-    literals.map { case l @ Literal(_, dataType) =>
-      Literal.create(Cast(l, desiredType).eval(), desiredType)
-    }
-  }
 }
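
The deleted `resolvePartitions` / `resolveTypeConflicts` pair handles the case where the same column parses to different types in different partitions (say, `a=1` in one directory and `a=2.5` in another) by up-casting along the order given in the scaladoc above. A small sketch of that selection rule over plain type names; these strings are illustrative stand-ins for Catalyst's `DataType`s, and the actual literal casting step is elided:

object UpCastSketch extends App {
  // Up-casting order from the deleted code, lowest to highest.
  val upCastingOrder: Seq[String] =
    Seq("NullType", "IntegerType", "LongType", "FloatType",
        "DoubleType", "DecimalType.Unlimited", "StringType")

  // Picks the highest type observed for one column across partitions;
  // a column that is null in every partition falls back to StringType.
  def resolve(columnTypes: Seq[String]): String = {
    val topType = columnTypes.maxBy(upCastingOrder.indexOf(_))
    if (topType == "NullType") "StringType" else topType
  }

  println(resolve(Seq("IntegerType", "DoubleType"))) // DoubleType
  println(resolve(Seq("NullType", "NullType")))      // StringType
}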
