-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-12871][SQL] Support to specify the option for compression codec. #10805
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 10 commits
06774ad
5e1611d
d154f02
5b57fc2
e7ebddd
f4ffbf6
adb9eb2
432da5d
4388fe5
6400b76
56316a8
c9217be
0245eea
cd9f742
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,7 +19,10 @@ package org.apache.spark.sql.execution.datasources.csv | |
|
|
||
| import java.nio.charset.Charset | ||
|
|
||
| import org.apache.hadoop.io.compress._ | ||
|
|
||
| import org.apache.spark.Logging | ||
| import org.apache.spark.util.Utils | ||
|
|
||
| private[sql] case class CSVParameters(@transient parameters: Map[String, String]) extends Logging { | ||
|
|
||
|
|
@@ -35,7 +38,7 @@ private[sql] case class CSVParameters(@transient parameters: Map[String, String] | |
|
|
||
| private def getBool(paramName: String, default: Boolean = false): Boolean = { | ||
| val param = parameters.getOrElse(paramName, default.toString) | ||
| if (param.toLowerCase() == "true") { | ||
| if (param.toLowerCase == "true") { | ||
| true | ||
| } else if (param.toLowerCase == "false") { | ||
| false | ||
|
|
@@ -73,6 +76,14 @@ private[sql] case class CSVParameters(@transient parameters: Map[String, String] | |
|
|
||
| val nullValue = parameters.getOrElse("nullValue", "") | ||
|
|
||
| val compressionCodecName = | ||
| parameters.getOrElse("compression", parameters.getOrElse("codec", null)) | ||
| val compressionCodec = if (compressionCodecName != null) { | ||
| CSVCompressionCodecs.getCodecClassName(compressionCodecName) | ||
| } else { | ||
| null | ||
| } | ||
|
|
||
| val maxColumns = 20480 | ||
|
|
||
| val maxCharsPerColumn = 100000 | ||
|
|
@@ -85,7 +96,6 @@ private[sql] case class CSVParameters(@transient parameters: Map[String, String] | |
| } | ||
|
|
||
| private[csv] object ParseModes { | ||
|
|
||
| val PERMISSIVE_MODE = "PERMISSIVE" | ||
| val DROP_MALFORMED_MODE = "DROPMALFORMED" | ||
| val FAIL_FAST_MODE = "FAILFAST" | ||
|
|
@@ -107,3 +117,28 @@ private[csv] object ParseModes { | |
| true // We default to permissive is the mode string is not valid | ||
| } | ||
| } | ||
|
|
||
| private[csv] object CSVCompressionCodecs { | ||
| val shortCompressionCodecNames = Map( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. private? |
||
| "bzip2" -> classOf[BZip2Codec].getName, | ||
| "gzip" -> classOf[GzipCodec].getName, | ||
| "lz4" -> classOf[Lz4Codec].getName, | ||
| "snappy" -> classOf[SnappyCodec].getName) | ||
|
|
||
| /** | ||
| * Return the full version of the given codec class. | ||
| * If it is already a class name, just return it. | ||
| */ | ||
| def getCodecClassName(name: String): String = { | ||
| val codecName = shortCompressionCodecNames.getOrElse(name.toLowerCase, name) | ||
| val codecClassName = try { | ||
| // Validate the codec name | ||
| Utils.classForName(codecName) | ||
| Some(codecName) | ||
| } catch { | ||
| case e: ClassNotFoundException => None | ||
| } | ||
| codecClassName.getOrElse(throw new IllegalArgumentException(s"Codec [$codecName] " + | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should just put this throw inside the catch block and not bother with the Option stuff |
||
| s"is not available. Available codecs are ${shortCompressionCodecNames.keys.mkString(",")}.")) | ||
|
||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,6 +24,7 @@ import scala.util.control.NonFatal | |
| import com.google.common.base.Objects | ||
| import org.apache.hadoop.fs.{FileStatus, Path} | ||
| import org.apache.hadoop.io.{LongWritable, NullWritable, Text} | ||
| import org.apache.hadoop.io.SequenceFile.CompressionType | ||
| import org.apache.hadoop.mapred.TextInputFormat | ||
| import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} | ||
| import org.apache.hadoop.mapreduce.RecordWriter | ||
|
|
@@ -99,6 +100,15 @@ private[csv] class CSVRelation( | |
| } | ||
|
|
||
| override def prepareJobForWrite(job: Job): OutputWriterFactory = { | ||
| val conf = job.getConfiguration | ||
| Option(params.compressionCodec).foreach { codec => | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. as suggested above, I'd make "compressionCodec" itself an Option, rather than nullable. |
||
| conf.set("mapreduce.output.fileoutputformat.compress", "true") | ||
| conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString) | ||
| conf.set("mapreduce.output.fileoutputformat.compress.codec", codec) | ||
| conf.set("mapreduce.map.output.compress", "true") | ||
| conf.set("mapreduce.map.output.compress.codec", codec) | ||
| } | ||
|
|
||
| new CSVOutputWriterFactory(params) | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
don't expose this variable. e.g. you can do