-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-12871][SQL] Support to specify the option for compression codec. #10805
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 7 commits
06774ad
5e1611d
d154f02
5b57fc2
e7ebddd
f4ffbf6
adb9eb2
432da5d
4388fe5
6400b76
56316a8
c9217be
0245eea
cd9f742
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.datasources.csv | |
|
|
||
| import java.nio.charset.Charset | ||
|
|
||
| import org.apache.hadoop.io.compress._ | ||
|
|
||
| import org.apache.spark.Logging | ||
|
|
||
| private[sql] case class CSVParameters(@transient parameters: Map[String, String]) extends Logging { | ||
|
|
@@ -35,7 +37,7 @@ private[sql] case class CSVParameters(@transient parameters: Map[String, String] | |
|
|
||
| private def getBool(paramName: String, default: Boolean = false): Boolean = { | ||
| val param = parameters.getOrElse(paramName, default.toString) | ||
| if (param.toLowerCase() == "true") { | ||
| if (param.toLowerCase == "true") { | ||
| true | ||
| } else if (param.toLowerCase == "false") { | ||
| false | ||
|
|
@@ -44,6 +46,13 @@ private[sql] case class CSVParameters(@transient parameters: Map[String, String] | |
| } | ||
| } | ||
|
|
||
| // Available compression codec list | ||
| val shortCompressionCodecNames = Map( | ||
| "bzip2" -> classOf[BZip2Codec].getName, | ||
| "gzip" -> classOf[GzipCodec].getName, | ||
| "lz4" -> classOf[Lz4Codec].getName, | ||
| "snappy" -> classOf[SnappyCodec].getName) | ||
|
|
||
| val delimiter = CSVTypeCast.toChar( | ||
| parameters.getOrElse("sep", parameters.getOrElse("delimiter", ","))) | ||
| val parseMode = parameters.getOrElse("mode", "PERMISSIVE") | ||
|
|
@@ -73,6 +82,12 @@ private[sql] case class CSVParameters(@transient parameters: Map[String, String] | |
|
|
||
| val nullValue = parameters.getOrElse("nullValue", "") | ||
|
|
||
| val compressionCodec: String = { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Maybe we should do some data validation here, i.e. throwing exceptions if the class is not found. |
||
| val maybeCodecName = | ||
| Option(parameters.getOrElse("compression", parameters.getOrElse("codec", null))) | ||
| maybeCodecName.map(_.toLowerCase).map(shortCompressionCodecNames).orNull | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This logic is becoming confusing with so many levels of nesting; we should rewrite it (even with more lines of code) to make it more readable. |
||
| } | ||
|
|
||
| val maxColumns = 20480 | ||
|
|
||
| val maxCharsPerColumn = 100000 | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,6 +24,7 @@ import scala.util.control.NonFatal | |
| import com.google.common.base.Objects | ||
| import org.apache.hadoop.fs.{FileStatus, Path} | ||
| import org.apache.hadoop.io.{LongWritable, NullWritable, Text} | ||
| import org.apache.hadoop.io.SequenceFile.CompressionType | ||
| import org.apache.hadoop.mapred.TextInputFormat | ||
| import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} | ||
| import org.apache.hadoop.mapreduce.RecordWriter | ||
|
|
@@ -99,6 +100,15 @@ private[csv] class CSVRelation( | |
| } | ||
|
|
||
| override def prepareJobForWrite(job: Job): OutputWriterFactory = { | ||
| val conf = job.getConfiguration | ||
| Option(params.compressionCodec).foreach { codec => | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. As suggested above, I'd make "compressionCodec" itself an Option, rather than nullable. |
||
| conf.set("mapreduce.output.fileoutputformat.compress", "true") | ||
| conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString) | ||
| conf.set("mapreduce.output.fileoutputformat.compress.codec", codec) | ||
| conf.set("mapreduce.map.output.compress", "true") | ||
| conf.set("mapreduce.map.output.compress.codec", codec) | ||
| } | ||
|
|
||
| new CSVOutputWriterFactory(params) | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment.
The reason will be displayed to describe this comment to others. Learn more.
This should go into the object rather than into the case class.