From 6f19db753a4cba36f92bd225eb63b60080158054 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Fri, 18 Aug 2017 13:53:00 +0900 Subject: [PATCH 1/3] Modify DataSourceScanExec to use concrete ColumnVector type. --- .../spark/sql/execution/ColumnarBatchScan.scala | 12 ++++++++---- .../spark/sql/execution/DataSourceScanExec.scala | 7 +++++++ .../spark/sql/execution/datasources/FileFormat.scala | 12 ++++++++++++ .../datasources/parquet/ParquetFileFormat.scala | 10 ++++++++++ 4 files changed, 37 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala index 74a47da2deef2..81aa12cf9b7f3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala @@ -33,6 +33,8 @@ private[sql] trait ColumnarBatchScan extends CodegenSupport { val inMemoryTableScan: InMemoryTableScanExec = null + def vectorTypes: Option[Seq[String]] = None + override lazy val metrics = Map( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time")) @@ -83,13 +85,15 @@ private[sql] trait ColumnarBatchScan extends CodegenSupport { val batch = ctx.freshName("batch") ctx.addMutableState(columnarBatchClz, batch, s"$batch = null;") - val columnVectorClz = "org.apache.spark.sql.execution.vectorized.ColumnVector" val idx = ctx.freshName("batchIdx") ctx.addMutableState("int", idx, s"$idx = 0;") val colVars = output.indices.map(i => ctx.freshName("colInstance" + i)) - val columnAssigns = colVars.zipWithIndex.map { case (name, i) => - ctx.addMutableState(columnVectorClz, name, s"$name = null;") - s"$name = $batch.column($i);" + val columnVectorClzs = vectorTypes.getOrElse( + Seq.fill(colVars.size)("org.apache.spark.sql.execution.vectorized.ColumnVector")) + val columnAssigns = colVars.zip(columnVectorClzs).zipWithIndex.map { + case ((name, columnVectorClz), i) => + ctx.addMutableState(columnVectorClz, name, s"$name = null;") + s"$name = ($columnVectorClz) $batch.column($i);" } val nextBatch = ctx.freshName("nextBatch") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 588c937a13e45..47b040b3269d0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -174,6 +174,13 @@ case class FileSourceScanExec( false } + override def vectorTypes: Option[Seq[String]] = + relation.fileFormat.vectorTypes( + sparkSession = relation.sparkSession, + dataSchema = relation.dataSchema, + partitionSchema = relation.partitionSchema, + requiredSchema = requiredSchema) + @transient private lazy val selectedPartitions: Seq[PartitionDirectory] = { val optimizerMetadataTimeNs = relation.location.metadataOpsTimeNs.getOrElse(0L) val startTime = System.nanoTime() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala index dacf462953520..531f253849e56 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala @@ -64,6 +64,18 @@ trait FileFormat { false } + /** + * Returns concrete column vector class names for each column to be used in a columnar batch + * if this format supports returning columnar batch. + */ + def vectorTypes( + sparkSession: SparkSession, + dataSchema: StructType, + partitionSchema: StructType, + requiredSchema: StructType): Option[Seq[String]] = { + None + } + /** * Returns whether a file with `path` could be splitted or not. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 64eea26a9f98e..eb7fef2021778 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -47,6 +47,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ @@ -272,6 +273,15 @@ class ParquetFileFormat schema.forall(_.dataType.isInstanceOf[AtomicType]) } + override def vectorTypes( + sparkSession: SparkSession, + dataSchema: StructType, + partitionSchema: StructType, + requiredSchema: StructType): Option[Seq[String]] = { + Option(Seq.fill(requiredSchema.fields.length + partitionSchema.fields.length)( + classOf[OnHeapColumnVector].getName)) + } + override def isSplitable( sparkSession: SparkSession, options: Map[String, String], From 9effea9379313b0aac1f392ca11ce0f678bb1e0c Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Tue, 22 Aug 2017 12:55:26 +0900 Subject: [PATCH 2/3] Address a comment. --- .../apache/spark/sql/execution/DataSourceScanExec.scala | 7 +++---- .../spark/sql/execution/datasources/FileFormat.scala | 7 +++---- .../execution/datasources/parquet/ParquetFileFormat.scala | 7 +++---- 3 files changed, 9 insertions(+), 12 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 47b040b3269d0..c8dc0d889a2cf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -176,10 +176,9 @@ case class FileSourceScanExec( override def vectorTypes: Option[Seq[String]] = relation.fileFormat.vectorTypes( - sparkSession = relation.sparkSession, - dataSchema = relation.dataSchema, - partitionSchema = relation.partitionSchema, - requiredSchema = requiredSchema) + sparkSession = Option(relation.sparkSession), + requiredSchema = requiredSchema, + partitionSchema = relation.partitionSchema) @transient private lazy val selectedPartitions: Seq[PartitionDirectory] = { val optimizerMetadataTimeNs = relation.location.metadataOpsTimeNs.getOrElse(0L) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala index 531f253849e56..db44f132910cb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala @@ -69,10 +69,9 @@ trait FileFormat { * if this format supports returning columnar batch. */ def vectorTypes( - sparkSession: SparkSession, - dataSchema: StructType, - partitionSchema: StructType, - requiredSchema: StructType): Option[Seq[String]] = { + sparkSession: Option[SparkSession], + requiredSchema: StructType, + partitionSchema: StructType): Option[Seq[String]] = { None } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index eb7fef2021778..5071c60a692a6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -274,10 +274,9 @@ class ParquetFileFormat } override def vectorTypes( - sparkSession: SparkSession, - dataSchema: StructType, - partitionSchema: StructType, - requiredSchema: StructType): Option[Seq[String]] = { + sparkSession: Option[SparkSession], + requiredSchema: StructType, + partitionSchema: StructType): Option[Seq[String]] = { Option(Seq.fill(requiredSchema.fields.length + partitionSchema.fields.length)( classOf[OnHeapColumnVector].getName)) } From fbcf95c3f04cdf3d3437271be5db68eb2f647471 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Tue, 29 Aug 2017 18:02:28 +0900 Subject: [PATCH 3/3] Address comments. --- .../org/apache/spark/sql/execution/ColumnarBatchScan.scala | 4 ++-- .../org/apache/spark/sql/execution/DataSourceScanExec.scala | 1 - .../apache/spark/sql/execution/datasources/FileFormat.scala | 1 - .../sql/execution/datasources/parquet/ParquetFileFormat.scala | 1 - 4 files changed, 2 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala index 81aa12cf9b7f3..1afe83ea3539e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala @@ -81,7 +81,7 @@ private[sql] trait ColumnarBatchScan extends CodegenSupport { val scanTimeTotalNs = ctx.freshName("scanTime") ctx.addMutableState("long", scanTimeTotalNs, s"$scanTimeTotalNs = 0;") - val columnarBatchClz = "org.apache.spark.sql.execution.vectorized.ColumnarBatch" + val columnarBatchClz = classOf[ColumnarBatch].getName val batch = ctx.freshName("batch") ctx.addMutableState(columnarBatchClz, batch, s"$batch = null;") @@ -89,7 +89,7 @@ private[sql] trait ColumnarBatchScan extends CodegenSupport { ctx.addMutableState("int", idx, s"$idx = 0;") val colVars = output.indices.map(i => ctx.freshName("colInstance" + i)) val columnVectorClzs = vectorTypes.getOrElse( - Seq.fill(colVars.size)("org.apache.spark.sql.execution.vectorized.ColumnVector")) + Seq.fill(colVars.size)(classOf[ColumnVector].getName)) val columnAssigns = colVars.zip(columnVectorClzs).zipWithIndex.map { case ((name, columnVectorClz), i) => ctx.addMutableState(columnVectorClz, name, s"$name = null;") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index c8dc0d889a2cf..77e6dbf636476 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -176,7 +176,6 @@ case class FileSourceScanExec( override def vectorTypes: Option[Seq[String]] = relation.fileFormat.vectorTypes( - sparkSession = Option(relation.sparkSession), requiredSchema = requiredSchema, partitionSchema = relation.partitionSchema) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala index db44f132910cb..e5a7aee64a4f4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala @@ -69,7 +69,6 @@ trait FileFormat { * if this format supports returning columnar batch. */ def vectorTypes( - sparkSession: Option[SparkSession], requiredSchema: StructType, partitionSchema: StructType): Option[Seq[String]] = { None diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 5071c60a692a6..e1e740500205a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -274,7 +274,6 @@ class ParquetFileFormat } override def vectorTypes( - sparkSession: Option[SparkSession], requiredSchema: StructType, partitionSchema: StructType): Option[Seq[String]] = { Option(Seq.fill(requiredSchema.fields.length + partitionSchema.fields.length)(