@@ -42,7 +42,7 @@ import parquet.schema.MessageType
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Row}
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.{LeafNode, SparkPlan, UnaryNode}
 import org.apache.spark.{Logging, SerializableWritable, TaskContext}
 
@@ -59,28 +59,38 @@ case class ParquetTableScan(
   // The resolution of Parquet attributes is case sensitive, so we resolve the original attributes
   // by exprId. note: output cannot be transient, see
   // https://issues.apache.org/jira/browse/SPARK-1367
-  val output = attributes.map { a =>
-    relation.output
-      .find(o => o.exprId == a.exprId)
-      .getOrElse(sys.error(s"Invalid parquet attribute $a in ${relation.output.mkString(",")}"))
-  }
+  val normalOutput =
+    attributes
+      .filterNot(a => relation.partitioningAttributes.map(_.exprId).contains(a.exprId))
+      .flatMap(a => relation.output.find(o => o.exprId == a.exprId))
+
+  val partOutput =
+    attributes.flatMap(a => relation.partitioningAttributes.find(o => o.exprId == a.exprId))
+
+  def output = partOutput ++ normalOutput
+
+  assert(normalOutput.size + partOutput.size == attributes.size,
+    s"$normalOutput + $partOutput != $attributes, ${relation.output}")
 
   override def execute(): RDD[Row] = {
     val sc = sqlContext.sparkContext
     val job = new Job(sc.hadoopConfiguration)
     ParquetInputFormat.setReadSupportClass(job, classOf[RowReadSupport])
 
     val conf: Configuration = ContextUtil.getConfiguration(job)
-    val qualifiedPath = {
-      val path = new Path(relation.path)
-      path.getFileSystem(conf).makeQualified(path)
+
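+    // relation.path may hold several comma-separated paths; qualify each one and
+    // register it as an input path for the job.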
+    relation.path.split(",").foreach { curPath =>
+      val qualifiedPath = {
+        val path = new Path(curPath)
+        path.getFileSystem(conf).makeQualified(path)
+      }
+      NewFileInputFormat.addInputPath(job, qualifiedPath)
     }
-    NewFileInputFormat.addInputPath(job, qualifiedPath)
 
     // Store both requested and original schema in `Configuration`
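+    // Only the columns physically stored in the Parquet files are requested here;
+    // partitioning columns are reconstructed from the directory names at read time.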
     conf.set(
       RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
-      ParquetTypesConverter.convertToString(output))
+      ParquetTypesConverter.convertToString(normalOutput))
     conf.set(
       RowWriteSupport.SPARK_ROW_SCHEMA,
       ParquetTypesConverter.convertToString(relation.output))
@@ -96,13 +106,41 @@ case class ParquetTableScan(
       ParquetFilters.serializeFilterExpressions(columnPruningPred, conf)
     }
 
-    sc.newAPIHadoopRDD(
-      conf,
-      classOf[FilteringParquetRowInputFormat],
-      classOf[Void],
-      classOf[Row])
-      .map(_._2)
-      .filter(_ != null) // Parquet's record filters may produce null values
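+    // Construct the NewHadoopRDD directly so that mapPartitionsWithInputSplit can be
+    // used below to inspect the input split that backs each partition.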
+    val baseRDD =
+      new org.apache.spark.rdd.NewHadoopRDD(
+        sc,
+        classOf[FilteringParquetRowInputFormat],
+        classOf[Void],
+        classOf[Row],
+        conf)
+
+    if (partOutput.nonEmpty) {
+      baseRDD.mapPartitionsWithInputSplit { case (split, iter) =>
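+        // Recover the partition column values from the key=value directory names
+        // in this split's path.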
+        val partValue = "([^=]+)=([^=]+)".r
+        val partValues =
+          split.asInstanceOf[parquet.hadoop.ParquetInputSplit]
+            .getPath
+            .toString
+            .split("/")
+            .flatMap {
+              case partValue(key, value) => Some(key -> value)
+              case _ => None
+            }.toMap
+
+        val partitionRowValues =
+          partOutput.map(a => Cast(Literal(partValues(a.name)), a.dataType).eval(EmptyRow))
+
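+        // Prepend the partition values (constant for this split) to every row read
+        // from the Parquet file.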
+        new Iterator[Row] {
+          private[this] val joinedRow = new JoinedRow(Row(partitionRowValues:_*), null)
+
+          def hasNext = iter.hasNext
+
+          def next() = joinedRow.withRight(iter.next()._2)
+        }
+      }
+    } else {
+      baseRDD.map(_._2)
+    }.filter(_ != null) // Parquet's record filters may produce null values
   }
 
   /**