Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch
import org.apache.spark.sql.catalyst.csv._
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodegenFallback, ExprCode}
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryErrorsBase}
import org.apache.spark.sql.internal.SQLConf
Expand Down Expand Up @@ -245,8 +245,7 @@ case class StructsToCsv(
options: Map[String, String],
child: Expression,
timeZoneId: Option[String] = None)
extends UnaryExpression with TimeZoneAwareExpression with CodegenFallback with ExpectsInputTypes
with NullIntolerant {
extends UnaryExpression with TimeZoneAwareExpression with ExpectsInputTypes with NullIntolerant {
override def nullable: Boolean = true

def this(options: Map[String, String], child: Expression) = this(options, child, None)
Expand Down Expand Up @@ -293,4 +292,10 @@ case class StructsToCsv(

override protected def withNewChildInternal(newChild: Expression): StructsToCsv =
copy(child = newChild)

override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
val structsToCsv = ctx.addReferenceObj("structsToCsv", this)
nullSafeCodeGen(ctx, ev,
eval => s"${ev.value} = (UTF8String) $structsToCsv.converter().apply($eval);")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -246,4 +246,11 @@ class CsvExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with P
CsvToStructs(schema, Map.empty, Literal.create("1 day")),
InternalRow(new CalendarInterval(0, 1, 0)))
}

test("StructsToCsv should not generate codes beyond 64KB") {
val range = Range.inclusive(1, 5000)
val struct = CreateStruct.create(range.map(Literal.apply))
val expected = range.mkString(",")
checkEvaluation(StructsToCsv(Map.empty, struct), expected)
}
}
82 changes: 41 additions & 41 deletions sql/core/benchmarks/CSVBenchmark-jdk11-results.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,69 +2,69 @@
Benchmark to measure CSV read/write performance
================================================================================================

OpenJDK 64-Bit Server VM 11.0.19+7 on Linux 5.15.0-1037-azure
OpenJDK 64-Bit Server VM 11.0.19+7 on Linux 5.15.0-1040-azure
Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
Parsing quoted values: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
One quoted string 38218 38618 520 0.0 764362.7 1.0X
One quoted string 43871 44151 336 0.0 877415.7 1.0X

OpenJDK 64-Bit Server VM 11.0.19+7 on Linux 5.15.0-1037-azure
OpenJDK 64-Bit Server VM 11.0.19+7 on Linux 5.15.0-1040-azure
Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
Wide rows with 1000 columns: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Select 1000 columns 97679 98487 1143 0.0 97678.6 1.0X
Select 100 columns 39193 39339 193 0.0 39193.1 2.5X
Select one column 32781 33041 265 0.0 32780.7 3.0X
count() 7154 7228 86 0.1 7153.5 13.7X
Select 100 columns, one bad input field 53968 54158 165 0.0 53967.9 1.8X
Select 100 columns, corrupt record field 59730 60100 484 0.0 59730.2 1.6X
Select 1000 columns 115001 115810 1382 0.0 115001.2 1.0X
Select 100 columns 45575 45646 84 0.0 45575.5 2.5X
Select one column 38701 38744 67 0.0 38700.7 3.0X
count() 8544 8556 12 0.1 8544.0 13.5X
Select 100 columns, one bad input field 67789 67841 79 0.0 67788.5 1.7X
Select 100 columns, corrupt record field 74026 74050 26 0.0 74026.4 1.6X

OpenJDK 64-Bit Server VM 11.0.19+7 on Linux 5.15.0-1037-azure
OpenJDK 64-Bit Server VM 11.0.19+7 on Linux 5.15.0-1040-azure
Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
Count a dataset with 10 columns: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Select 10 columns + count() 15305 15627 282 0.7 1530.5 1.0X
Select 1 column + count() 13688 13777 106 0.7 1368.8 1.1X
count() 3189 3214 39 3.1 318.9 4.8X
Select 10 columns + count() 16855 16980 179 0.6 1685.5 1.0X
Select 1 column + count() 11053 11075 29 0.9 1105.3 1.5X
count() 3646 3664 17 2.7 364.6 4.6X

OpenJDK 64-Bit Server VM 11.0.19+7 on Linux 5.15.0-1037-azure
OpenJDK 64-Bit Server VM 11.0.19+7 on Linux 5.15.0-1040-azure
Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
Write dates and timestamps: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Create a dataset of timestamps 1630 1641 9 6.1 163.0 1.0X
to_csv(timestamp) 11606 11665 76 0.9 1160.6 0.1X
write timestamps to files 10636 10742 121 0.9 1063.6 0.2X
Create a dataset of dates 1854 1879 25 5.4 185.4 0.9X
to_csv(date) 7522 7563 37 1.3 752.2 0.2X
write dates to files 6435 6526 85 1.6 643.5 0.3X
Create a dataset of timestamps 1864 1904 35 5.4 186.4 1.0X
to_csv(timestamp) 12050 12258 279 0.8 1205.0 0.2X
write timestamps to files 12564 12586 22 0.8 1256.4 0.1X
Create a dataset of dates 2093 2106 20 4.8 209.3 0.9X
to_csv(date) 7216 7236 33 1.4 721.6 0.3X
write dates to files 7300 7382 71 1.4 730.0 0.3X

OpenJDK 64-Bit Server VM 11.0.19+7 on Linux 5.15.0-1037-azure
OpenJDK 64-Bit Server VM 11.0.19+7 on Linux 5.15.0-1040-azure
Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
Read dates and timestamps: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
-----------------------------------------------------------------------------------------------------------------------------------------------------
read timestamp text from files 2245 2310 57 4.5 224.5 1.0X
read timestamps from files 27283 27875 513 0.4 2728.3 0.1X
infer timestamps from files 55465 56311 859 0.2 5546.5 0.0X
read date text from files 2054 2088 38 4.9 205.4 1.1X
read date from files 15957 16190 202 0.6 1595.7 0.1X
infer date from files 33163 33319 135 0.3 3316.3 0.1X
timestamp strings 2518 2594 71 4.0 251.8 0.9X
parse timestamps from Dataset[String] 30168 30266 87 0.3 3016.8 0.1X
infer timestamps from Dataset[String] 58608 59332 728 0.2 5860.8 0.0X
date strings 2803 2847 44 3.6 280.3 0.8X
parse dates from Dataset[String] 17613 17877 421 0.6 1761.3 0.1X
from_csv(timestamp) 27736 28241 482 0.4 2773.6 0.1X
from_csv(date) 16415 16816 367 0.6 1641.5 0.1X
infer error timestamps from Dataset[String] with default format 18335 18494 138 0.5 1833.5 0.1X
infer error timestamps from Dataset[String] with user-provided format 18327 18598 422 0.5 1832.7 0.1X
infer error timestamps from Dataset[String] with legacy format 18713 18907 267 0.5 1871.3 0.1X
read timestamp text from files 2432 2458 40 4.1 243.2 1.0X
read timestamps from files 31897 31950 79 0.3 3189.7 0.1X
infer timestamps from files 65093 65196 90 0.2 6509.3 0.0X
read date text from files 2201 2211 15 4.5 220.1 1.1X
read date from files 16138 18869 NaN 0.6 1613.8 0.2X
infer date from files 33633 33742 126 0.3 3363.3 0.1X
timestamp strings 2909 2930 34 3.4 290.9 0.8X
parse timestamps from Dataset[String] 34951 34984 39 0.3 3495.1 0.1X
infer timestamps from Dataset[String] 68347 68448 92 0.1 6834.7 0.0X
date strings 3234 3256 24 3.1 323.4 0.8X
parse dates from Dataset[String] 18591 18657 96 0.5 1859.1 0.1X
from_csv(timestamp) 32386 32476 78 0.3 3238.6 0.1X
from_csv(date) 17333 17402 67 0.6 1733.3 0.1X
infer error timestamps from Dataset[String] with default format 21486 21565 68 0.5 2148.6 0.1X
infer error timestamps from Dataset[String] with user-provided format 21683 21697 16 0.5 2168.3 0.1X
infer error timestamps from Dataset[String] with legacy format 21327 21379 85 0.5 2132.7 0.1X

OpenJDK 64-Bit Server VM 11.0.19+7 on Linux 5.15.0-1037-azure
OpenJDK 64-Bit Server VM 11.0.19+7 on Linux 5.15.0-1040-azure
Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
Filters pushdown: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
w/o filters 19420 19520 87 0.0 194201.0 1.0X
pushdown disabled 19196 19507 409 0.0 191958.0 1.0X
w/ filters 1380 1402 19 0.1 13796.9 14.1X
w/o filters 22031 22075 46 0.0 220305.7 1.0X
pushdown disabled 21935 21958 21 0.0 219353.1 1.0X
w/ filters 1466 1481 15 0.1 14662.5 15.0X


Loading