Skip to content
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion R/WINDOWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,6 @@ To run the SparkR unit tests on Windows, the following steps are required —ass

```
R -e "install.packages('testthat', repos='http://cran.us.r-project.org')"
.\bin\spark-submit2.cmd --conf spark.hadoop.fs.default.name="file:///" R\pkg\tests\run-all.R
.\bin\spark-submit2.cmd --conf spark.hadoop.fs.defaultFS="file:///" R\pkg\tests\run-all.R
```

2 changes: 1 addition & 1 deletion R/run-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ FAILED=0
LOGFILE=$FWDIR/unit-tests.out
rm -f $LOGFILE

SPARK_TESTING=1 $FWDIR/../bin/spark-submit --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" --conf spark.hadoop.fs.default.name="file:///" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE
SPARK_TESTING=1 $FWDIR/../bin/spark-submit --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" --conf spark.hadoop.fs.defaultFS="file:///" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE
FAILED=$((PIPESTATUS[0]||$FAILED))

NUM_TEST_WARNING="$(grep -c -e 'Warnings ----------------' $LOGFILE)"
Expand Down
2 changes: 1 addition & 1 deletion appveyor.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ build_script:
- cmd: mvn -DskipTests -Psparkr -Phive -Phive-thriftserver package

test_script:
- cmd: .\bin\spark-submit2.cmd --conf spark.hadoop.fs.default.name="file:///" R\pkg\tests\run-all.R
- cmd: .\bin\spark-submit2.cmd --conf spark.hadoop.fs.defaultFS="file:///" R\pkg\tests\run-all.R

notifications:
- provider: Email
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,11 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
val taskAttemptId = new TaskAttemptID(taskId, 0)

// Set up the configuration object
jobContext.getConfiguration.set("mapred.job.id", jobId.toString)
jobContext.getConfiguration.set("mapred.tip.id", taskAttemptId.getTaskID.toString)
jobContext.getConfiguration.set("mapred.task.id", taskAttemptId.toString)
jobContext.getConfiguration.setBoolean("mapred.task.is.map", true)
jobContext.getConfiguration.setInt("mapred.task.partition", 0)
jobContext.getConfiguration.set("mapreduce.job.id", jobId.toString)
jobContext.getConfiguration.set("mapreduce.task.id", taskAttemptId.getTaskID.toString)
jobContext.getConfiguration.set("mapreduce.task.attempt.id", taskAttemptId.toString)
jobContext.getConfiguration.setBoolean("mapreduce.task.ismap", true)
jobContext.getConfiguration.setInt("mapreduce.task.partition", 0)

val taskAttemptContext = new TaskAttemptContextImpl(jobContext.getConfiguration, taskAttemptId)
committer = setupCommitter(taskAttemptContext)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ object SparkHadoopMapReduceWriter extends Logging {
val committer = FileCommitProtocol.instantiate(
className = classOf[HadoopMapReduceCommitProtocol].getName,
jobId = stageId.toString,
outputPath = conf.value.get("mapred.output.dir"),
outputPath = conf.value.get("mapreduce.output.fileoutputformat.outputdir"),
isAppend = false).asInstanceOf[HadoopMapReduceCommitProtocol]
committer.setupJob(jobContext)

Expand Down
10 changes: 5 additions & 5 deletions core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -365,11 +365,11 @@ private[spark] object HadoopRDD extends Logging {
val jobID = new JobID(jobTrackerId, jobId)
val taId = new TaskAttemptID(new TaskID(jobID, TaskType.MAP, splitId), attemptId)

conf.set("mapred.tip.id", taId.getTaskID.toString)
conf.set("mapred.task.id", taId.toString)
conf.setBoolean("mapred.task.is.map", true)
conf.setInt("mapred.task.partition", splitId)
conf.set("mapred.job.id", jobID.toString)
conf.set("mapreduce.task.id", taId.getTaskID.toString)
conf.set("mapreduce.task.attempt.id", taId.toString)
conf.setBoolean("mapreduce.task.ismap", true)
conf.setInt("mapreduce.task.partition", splitId)
conf.set("mapreduce.job.id", jobID.toString)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -998,7 +998,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
job.setOutputValueClass(valueClass)
job.setOutputFormatClass(outputFormatClass)
val jobConfiguration = job.getConfiguration
jobConfiguration.set("mapred.output.dir", path)
jobConfiguration.set("mapreduce.output.fileoutputformat.outputdir", path)
saveAsNewAPIHadoopDataset(jobConfiguration)
}

Expand Down Expand Up @@ -1039,10 +1039,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
conf.setOutputFormat(outputFormatClass)
for (c <- codec) {
hadoopConf.setCompressMapOutput(true)
hadoopConf.set("mapred.output.compress", "true")
hadoopConf.set("mapreduce.output.fileoutputformat.compress", "true")
hadoopConf.setMapOutputCompressorClass(c)
hadoopConf.set("mapred.output.compression.codec", c.getCanonicalName)
hadoopConf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
hadoopConf.set("mapreduce.output.fileoutputformat.compress.codec", c.getCanonicalName)
hadoopConf.set("mapreduce.output.fileoutputformat.compress.type",
CompressionType.BLOCK.toString)
}

// Use configured output committer if already set
Expand Down
5 changes: 3 additions & 2 deletions core/src/test/scala/org/apache/spark/FileSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
job.setOutputKeyClass(classOf[String])
job.setOutputValueClass(classOf[String])
job.set("mapred.output.format.class", classOf[TextOutputFormat[String, String]].getName)
job.set("mapred.output.dir", tempDir.getPath + "/outputDataset_old")
job.set("mapreduce.output.fileoutputformat.outputdir", tempDir.getPath + "/outputDataset_old")
randomRDD.saveAsHadoopDataset(job)
assert(new File(tempDir.getPath + "/outputDataset_old/part-00000").exists() === true)
}
Expand All @@ -517,7 +517,8 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
job.setOutputValueClass(classOf[String])
job.setOutputFormatClass(classOf[NewTextOutputFormat[String, String]])
val jobConfig = job.getConfiguration
jobConfig.set("mapred.output.dir", tempDir.getPath + "/outputDataset_new")
jobConfig.set("mapreduce.output.fileoutputformat.outputdir",
tempDir.getPath + "/outputDataset_new")
randomRDD.saveAsNewAPIHadoopDataset(jobConfig)
assert(new File(tempDir.getPath + "/outputDataset_new/part-r-00000").exists() === true)
}
Expand Down
4 changes: 2 additions & 2 deletions docs/hardware-provisioning.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ possible**. We recommend the following:
* If at all possible, run Spark on the same nodes as HDFS. The simplest way is to set up a Spark
[standalone mode cluster](spark-standalone.html) on the same nodes, and configure Spark and
Hadoop's memory and CPU usage to avoid interference (for Hadoop, the relevant options are
`mapred.child.java.opts` for the per-task memory and `mapred.tasktracker.map.tasks.maximum`
and `mapred.tasktracker.reduce.tasks.maximum` for number of tasks). Alternatively, you can run
`mapred.child.java.opts` for the per-task memory and `mapreduce.tasktracker.map.tasks.maximum`
and `mapreduce.tasktracker.reduce.tasks.maximum` for number of tasks). Alternatively, you can run
Hadoop and Spark on a common cluster manager like [Mesos](running-on-mesos.html) or
[Hadoop YARN](running-on-yarn.html).

Expand Down
47 changes: 24 additions & 23 deletions python/pyspark/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -1347,7 +1347,7 @@ def test_oldhadoop(self):
self.assertEqual(ints, ei)

hellopath = os.path.join(SPARK_HOME, "python/test_support/hello/hello.txt")
oldconf = {"mapred.input.dir": hellopath}
oldconf = {"mapreduce.input.fileinputformat.inputdir": hellopath}
hello = self.sc.hadoopRDD("org.apache.hadoop.mapred.TextInputFormat",
"org.apache.hadoop.io.LongWritable",
"org.apache.hadoop.io.Text",
Expand All @@ -1366,7 +1366,7 @@ def test_newhadoop(self):
self.assertEqual(ints, ei)

hellopath = os.path.join(SPARK_HOME, "python/test_support/hello/hello.txt")
newconf = {"mapred.input.dir": hellopath}
newconf = {"mapreduce.input.fileinputformat.inputdir": hellopath}
hello = self.sc.newAPIHadoopRDD("org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
"org.apache.hadoop.io.LongWritable",
"org.apache.hadoop.io.Text",
Expand Down Expand Up @@ -1515,12 +1515,12 @@ def test_oldhadoop(self):

conf = {
"mapred.output.format.class": "org.apache.hadoop.mapred.SequenceFileOutputFormat",

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what this key was supposed to be before; maybe mapreduce.outputformat.class? but it can be mapreduce.job.outputformat.class now?

@wangyum wangyum Feb 27, 2017

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@srowen
mapred.output.format.class maps to the old API and mapreduce.job.outputformat.class maps to the new API. See:
https://github.com/wangyum/spark/blob/97734c5af3df4e6525e8015459af16ab193dfc24/python/pyspark/tests.py#L1664-L1679

"mapred.output.key.class": "org.apache.hadoop.io.IntWritable",
"mapred.output.value.class": "org.apache.hadoop.io.MapWritable",
"mapred.output.dir": basepath + "/olddataset/"
"mapreduce.job.output.key.class": "org.apache.hadoop.io.IntWritable",
"mapreduce.job.output.value.class": "org.apache.hadoop.io.MapWritable",
"mapreduce.output.fileoutputformat.outputdir": basepath + "/olddataset/"
}
self.sc.parallelize(dict_data).saveAsHadoopDataset(conf)
input_conf = {"mapred.input.dir": basepath + "/olddataset/"}
input_conf = {"mapreduce.input.fileinputformat.inputdir": basepath + "/olddataset/"}
result = self.sc.hadoopRDD(
"org.apache.hadoop.mapred.SequenceFileInputFormat",
"org.apache.hadoop.io.IntWritable",
Expand All @@ -1547,14 +1547,14 @@ def test_newhadoop(self):
self.assertEqual(result, data)

conf = {
"mapreduce.outputformat.class":
"mapreduce.job.outputformat.class":
"org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
"mapred.output.key.class": "org.apache.hadoop.io.IntWritable",
"mapred.output.value.class": "org.apache.hadoop.io.Text",
"mapred.output.dir": basepath + "/newdataset/"
"mapreduce.job.output.key.class": "org.apache.hadoop.io.IntWritable",
"mapreduce.job.output.value.class": "org.apache.hadoop.io.Text",
"mapreduce.output.fileoutputformat.outputdir": basepath + "/newdataset/"
}
self.sc.parallelize(data).saveAsNewAPIHadoopDataset(conf)
input_conf = {"mapred.input.dir": basepath + "/newdataset/"}
input_conf = {"mapreduce.input.fileinputformat.inputdir": basepath + "/newdataset/"}
new_dataset = sorted(self.sc.newAPIHadoopRDD(
"org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
"org.apache.hadoop.io.IntWritable",
Expand Down Expand Up @@ -1584,16 +1584,16 @@ def test_newhadoop_with_array(self):
self.assertEqual(result, array_data)

conf = {
"mapreduce.outputformat.class":
"mapreduce.job.outputformat.class":
"org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
"mapred.output.key.class": "org.apache.hadoop.io.IntWritable",
"mapred.output.value.class": "org.apache.spark.api.python.DoubleArrayWritable",
"mapred.output.dir": basepath + "/newdataset/"
"mapreduce.job.output.key.class": "org.apache.hadoop.io.IntWritable",
"mapreduce.job.output.value.class": "org.apache.spark.api.python.DoubleArrayWritable",
"mapreduce.output.fileoutputformat.outputdir": basepath + "/newdataset/"
}
self.sc.parallelize(array_data).saveAsNewAPIHadoopDataset(
conf,
valueConverter="org.apache.spark.api.python.DoubleArrayToWritableConverter")
input_conf = {"mapred.input.dir": basepath + "/newdataset/"}
input_conf = {"mapreduce.input.fileinputformat.inputdir": basepath + "/newdataset/"}
new_dataset = sorted(self.sc.newAPIHadoopRDD(
"org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
"org.apache.hadoop.io.IntWritable",
Expand Down Expand Up @@ -1663,18 +1663,19 @@ def test_reserialization(self):

conf4 = {
"mapred.output.format.class": "org.apache.hadoop.mapred.SequenceFileOutputFormat",
"mapred.output.key.class": "org.apache.hadoop.io.IntWritable",
"mapred.output.value.class": "org.apache.hadoop.io.IntWritable",
"mapred.output.dir": basepath + "/reserialize/dataset"}
"mapreduce.job.output.key.class": "org.apache.hadoop.io.IntWritable",
"mapreduce.job.output.value.class": "org.apache.hadoop.io.IntWritable",
"mapreduce.output.fileoutputformat.outputdir": basepath + "/reserialize/dataset"}
rdd.saveAsHadoopDataset(conf4)
result4 = sorted(self.sc.sequenceFile(basepath + "/reserialize/dataset").collect())
self.assertEqual(result4, data)

conf5 = {"mapreduce.outputformat.class":
conf5 = {"mapreduce.job.outputformat.class":
"org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
"mapred.output.key.class": "org.apache.hadoop.io.IntWritable",
"mapred.output.value.class": "org.apache.hadoop.io.IntWritable",
"mapred.output.dir": basepath + "/reserialize/newdataset"}
"mapreduce.job.output.key.class": "org.apache.hadoop.io.IntWritable",
"mapreduce.job.output.value.class": "org.apache.hadoop.io.IntWritable",
"mapreduce.output.fileoutputformat.outputdir": basepath + "/reserialize/newdataset"
}
rdd.saveAsNewAPIHadoopDataset(conf5)
result5 = sorted(self.sc.sequenceFile(basepath + "/reserialize/newdataset").collect())
self.assertEqual(result5, data)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,8 +269,8 @@ case class LoadDataCommand(
} else {
// Follow Hive's behavior:
// If no scheme or authority is provided with non-local inpath,
// we will use hadoop configuration "fs.default.name".
val defaultFSConf = sparkSession.sessionState.newHadoopConf().get("fs.default.name")
// we will use hadoop configuration "fs.defaultFS".
val defaultFSConf = sparkSession.sessionState.newHadoopConf().get("fs.defaultFS")
val defaultFS = if (defaultFSConf == null) {
new URI("")
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,11 +177,11 @@ object FileFormatWriter extends Logging {
val taskAttemptContext: TaskAttemptContext = {
// Set up the configuration object
val hadoopConf = description.serializableHadoopConf.value
hadoopConf.set("mapred.job.id", jobId.toString)
hadoopConf.set("mapred.tip.id", taskAttemptId.getTaskID.toString)
hadoopConf.set("mapred.task.id", taskAttemptId.toString)
hadoopConf.setBoolean("mapred.task.is.map", true)
hadoopConf.setInt("mapred.task.partition", 0)
hadoopConf.set("mapreduce.job.id", jobId.toString)
hadoopConf.set("mapreduce.task.id", taskAttemptId.getTaskID.toString)
hadoopConf.set("mapreduce.task.attempt.id", taskAttemptId.toString)
hadoopConf.setBoolean("mapreduce.task.ismap", true)
hadoopConf.setInt("mapreduce.task.partition", 0)

new TaskAttemptContextImpl(hadoopConf, taskAttemptId)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,9 @@ class HiveWindowFunctionQuerySuite extends HiveComparisonTest with BeforeAndAfte
// This is used to generate golden files.
sql("set hive.plan.serialization.format=kryo")
// Explicitly set fs to local fs.
sql(s"set fs.default.name=file://$testTempDir/")
sql(s"set fs.defaultFS=file://$testTempDir/")
// Ask Hive to run jobs in-process as a single map and reduce task.
sql("set mapred.job.tracker=local")
sql("set mapreduce.jobtracker.address=local")
}

override def afterAll() {
Expand Down Expand Up @@ -764,9 +764,9 @@ class HiveWindowFunctionQueryFileSuite
// This is used to generate golden files.
// sql("set hive.plan.serialization.format=kryo")
// Explicitly set fs to local fs.
// sql(s"set fs.default.name=file://$testTempDir/")
// sql(s"set fs.defaultFS=file://$testTempDir/")
// Ask Hive to run jobs in-process as a single map and reduce task.
// sql("set mapred.job.tracker=local")
// sql("set mapreduce.jobtracker.address=local")
}

override def afterAll() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,16 @@ class HadoopTableReader(
hadoopConf: Configuration)
extends TableReader with Logging {

// Hadoop honors "mapred.map.tasks" as hint, but will ignore when mapred.job.tracker is "local".
// https://hadoop.apache.org/docs/r1.0.4/mapred-default.html
// Hadoop honors "mapreduce.job.maps" as hint,
// but will ignore when mapreduce.jobtracker.address is "local".
// https://hadoop.apache.org/docs/r2.6.5/hadoop-mapreduce-client/hadoop-mapreduce-client-core/
// mapred-default.xml
//
// In order to keep consistency with Hive, we will let it be 0 in local mode also.
private val _minSplitsPerRDD = if (sparkSession.sparkContext.isLocal) {
0 // will be split based on block by default.
} else {
math.max(hadoopConf.getInt("mapred.map.tasks", 1),
math.max(hadoopConf.getInt("mapreduce.job.maps", 1),
sparkSession.sparkContext.defaultMinPartitions)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,13 +228,16 @@ case class InsertIntoHiveTable(
val isCompressed = hadoopConf.get("hive.exec.compress.output", "false").toBoolean

if (isCompressed) {
// Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec",
// and "mapred.output.compression.type" have no impact on ORC because it uses table properties
// to store compression information.
hadoopConf.set("mapred.output.compress", "true")
// Please note that isCompressed, "mapreduce.output.fileoutputformat.compress",
// "mapreduce.output.fileoutputformat.compress.codec", and
// "mapreduce.output.fileoutputformat.compress.type"
// have no impact on ORC because it uses table properties to store compression information.
hadoopConf.set("mapreduce.output.fileoutputformat.compress", "true")
fileSinkConf.setCompressed(true)
fileSinkConf.setCompressCodec(hadoopConf.get("mapred.output.compression.codec"))
fileSinkConf.setCompressType(hadoopConf.get("mapred.output.compression.type"))
fileSinkConf.setCompressCodec(hadoopConf
.get("mapreduce.output.fileoutputformat.compress.codec"))
fileSinkConf.setCompressType(hadoopConf
.get("mapreduce.output.fileoutputformat.compress.type"))
}

val numDynamicPartitions = partition.values.count(_.isEmpty)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ private[hive] class TestHiveSparkSession(
foreach { udfName => FunctionRegistry.unregisterTemporaryUDF(udfName) }

// Some tests corrupt this value on purpose, which breaks the RESET call below.
sessionState.conf.setConfString("fs.default.name", new File(".").toURI.toString)
sessionState.conf.setConfString("fs.defaultFS", new File(".").toURI.toString)
// It is important that we RESET first as broken hooks that might have been set could break
// other sql exec here.
sessionState.metadataHive.runSqlHive("RESET")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ set hive.auto.convert.join = true;

CREATE TABLE dest1(c1 INT, c2 STRING) STORED AS TEXTFILE;

set mapred.job.tracker=localhost:58;
set mapreduce.jobtracker.address=localhost:58;
set hive.exec.mode.local.auto=true;

explain
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ set hive.enforce.sorting = true;
set hive.exec.reducers.max = 1;
set hive.merge.mapfiles = true;
set hive.merge.mapredfiles = true;
set mapred.reduce.tasks = 2;
set mapreduce.job.reduces = 2;

-- Tests that when a multi insert inserts into a bucketed table and a table which is not bucketed
-- the bucketed table is not merged and the table which is not bucketed is
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
set hive.enforce.bucketing = true;
set hive.exec.mode.local.auto=false;
set mapred.reduce.tasks = 10;
set mapreduce.job.reduces = 10;

-- This test sets number of mapred tasks to 10 for a database with 50 buckets,
-- and uses a post-hook to confirm that 10 tasks were created
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
set mapred.min.split.size = 64;
set mapreduce.input.fileinputformat.split.minsize = 64;

CREATE TABLE T1(name STRING) STORED AS TEXTFILE;

Expand Down
Loading