2 changes: 1 addition & 1 deletion R/WINDOWS.md
@@ -38,6 +38,6 @@ To run the SparkR unit tests on Windows, the following steps are required —ass

```
R -e "install.packages('testthat', repos='http://cran.us.r-project.org')"
.\bin\spark-submit2.cmd --conf spark.hadoop.fs.default.name="file:///" R\pkg\tests\run-all.R
.\bin\spark-submit2.cmd --conf spark.hadoop.fs.defaultFS="file:///" R\pkg\tests\run-all.R
```

2 changes: 1 addition & 1 deletion R/run-tests.sh
@@ -23,7 +23,7 @@ FAILED=0
LOGFILE=$FWDIR/unit-tests.out
rm -f $LOGFILE

SPARK_TESTING=1 $FWDIR/../bin/spark-submit --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" --conf spark.hadoop.fs.default.name="file:///" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE
SPARK_TESTING=1 $FWDIR/../bin/spark-submit --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" --conf spark.hadoop.fs.defaultFS="file:///" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE
FAILED=$((PIPESTATUS[0]||$FAILED))

NUM_TEST_WARNING="$(grep -c -e 'Warnings ----------------' $LOGFILE)"
2 changes: 1 addition & 1 deletion appveyor.yml
@@ -46,7 +46,7 @@ build_script:
- cmd: mvn -DskipTests -Psparkr -Phive -Phive-thriftserver package

test_script:
- cmd: .\bin\spark-submit2.cmd --conf spark.hadoop.fs.default.name="file:///" R\pkg\tests\run-all.R
- cmd: .\bin\spark-submit2.cmd --conf spark.hadoop.fs.defaultFS="file:///" R\pkg\tests\run-all.R

notifications:
- provider: Email
@@ -113,11 +113,11 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
val taskAttemptId = new TaskAttemptID(taskId, 0)

// Set up the configuration object
jobContext.getConfiguration.set("mapred.job.id", jobId.toString)
jobContext.getConfiguration.set("mapred.tip.id", taskAttemptId.getTaskID.toString)
jobContext.getConfiguration.set("mapred.task.id", taskAttemptId.toString)
jobContext.getConfiguration.setBoolean("mapred.task.is.map", true)
jobContext.getConfiguration.setInt("mapred.task.partition", 0)
jobContext.getConfiguration.set("mapreduce.job.id", jobId.toString)
jobContext.getConfiguration.set("mapreduce.task.id", taskAttemptId.getTaskID.toString)
jobContext.getConfiguration.set("mapreduce.task.attempt.id", taskAttemptId.toString)
jobContext.getConfiguration.setBoolean("mapreduce.task.ismap", true)
jobContext.getConfiguration.setInt("mapreduce.task.partition", 0)

val taskAttemptContext = new TaskAttemptContextImpl(jobContext.getConfiguration, taskAttemptId)
committer = setupCommitter(taskAttemptContext)
@@ -79,7 +79,7 @@ object SparkHadoopMapReduceWriter extends Logging {
val committer = FileCommitProtocol.instantiate(
className = classOf[HadoopMapReduceCommitProtocol].getName,
jobId = stageId.toString,
outputPath = conf.value.get("mapred.output.dir"),
outputPath = conf.value.get("mapreduce.output.fileoutputformat.outputdir"),
isAppend = false).asInstanceOf[HadoopMapReduceCommitProtocol]
committer.setupJob(jobContext)

10 changes: 5 additions & 5 deletions core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -365,11 +365,11 @@ private[spark] object HadoopRDD extends Logging {
val jobID = new JobID(jobTrackerId, jobId)
val taId = new TaskAttemptID(new TaskID(jobID, TaskType.MAP, splitId), attemptId)

conf.set("mapred.tip.id", taId.getTaskID.toString)
conf.set("mapred.task.id", taId.toString)
conf.setBoolean("mapred.task.is.map", true)
conf.setInt("mapred.task.partition", splitId)
conf.set("mapred.job.id", jobID.toString)
conf.set("mapreduce.task.id", taId.getTaskID.toString)
conf.set("mapreduce.task.attempt.id", taId.toString)
conf.setBoolean("mapreduce.task.ismap", true)
conf.setInt("mapreduce.task.partition", splitId)
conf.set("mapreduce.job.id", jobID.toString)
}

/**
@@ -998,7 +998,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
job.setOutputValueClass(valueClass)
job.setOutputFormatClass(outputFormatClass)
val jobConfiguration = job.getConfiguration
jobConfiguration.set("mapred.output.dir", path)
jobConfiguration.set("mapreduce.output.fileoutputformat.outputdir", path)
saveAsNewAPIHadoopDataset(jobConfiguration)
}

@@ -1039,10 +1039,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
conf.setOutputFormat(outputFormatClass)
for (c <- codec) {
hadoopConf.setCompressMapOutput(true)
hadoopConf.set("mapred.output.compress", "true")
hadoopConf.set("mapreduce.output.fileoutputformat.compress", "true")
hadoopConf.setMapOutputCompressorClass(c)
hadoopConf.set("mapred.output.compression.codec", c.getCanonicalName)
hadoopConf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
hadoopConf.set("mapreduce.output.fileoutputformat.compress.codec", c.getCanonicalName)
hadoopConf.set("mapreduce.output.fileoutputformat.compress.type",
CompressionType.BLOCK.toString)
}

// Use configured output committer if already set
5 changes: 3 additions & 2 deletions core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -503,7 +503,7 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
job.setOutputKeyClass(classOf[String])
job.setOutputValueClass(classOf[String])
job.set("mapred.output.format.class", classOf[TextOutputFormat[String, String]].getName)
job.set("mapred.output.dir", tempDir.getPath + "/outputDataset_old")
job.set("mapreduce.output.fileoutputformat.outputdir", tempDir.getPath + "/outputDataset_old")
randomRDD.saveAsHadoopDataset(job)
assert(new File(tempDir.getPath + "/outputDataset_old/part-00000").exists() === true)
}
@@ -517,7 +517,8 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
job.setOutputValueClass(classOf[String])
job.setOutputFormatClass(classOf[NewTextOutputFormat[String, String]])
val jobConfig = job.getConfiguration
jobConfig.set("mapred.output.dir", tempDir.getPath + "/outputDataset_new")
jobConfig.set("mapreduce.output.fileoutputformat.outputdir",
tempDir.getPath + "/outputDataset_new")
randomRDD.saveAsNewAPIHadoopDataset(jobConfig)
assert(new File(tempDir.getPath + "/outputDataset_new/part-r-00000").exists() === true)
}
4 changes: 2 additions & 2 deletions docs/hardware-provisioning.md
@@ -15,8 +15,8 @@ possible**. We recommend the following:
* If at all possible, run Spark on the same nodes as HDFS. The simplest way is to set up a Spark
[standalone mode cluster](spark-standalone.html) on the same nodes, and configure Spark and
Hadoop's memory and CPU usage to avoid interference (for Hadoop, the relevant options are
`mapred.child.java.opts` for the per-task memory and `mapred.tasktracker.map.tasks.maximum`
and `mapred.tasktracker.reduce.tasks.maximum` for number of tasks). Alternatively, you can run
`mapred.child.java.opts` for the per-task memory and `mapreduce.tasktracker.map.tasks.maximum`
and `mapreduce.tasktracker.reduce.tasks.maximum` for number of tasks). Alternatively, you can run
Hadoop and Spark on a common cluster manager like [Mesos](running-on-mesos.html) or
[Hadoop YARN](running-on-yarn.html).

47 changes: 24 additions & 23 deletions python/pyspark/tests.py
@@ -1347,7 +1347,7 @@ def test_oldhadoop(self):
self.assertEqual(ints, ei)

hellopath = os.path.join(SPARK_HOME, "python/test_support/hello/hello.txt")
oldconf = {"mapred.input.dir": hellopath}
oldconf = {"mapreduce.input.fileinputformat.inputdir": hellopath}
hello = self.sc.hadoopRDD("org.apache.hadoop.mapred.TextInputFormat",
"org.apache.hadoop.io.LongWritable",
"org.apache.hadoop.io.Text",
@@ -1366,7 +1366,7 @@ def test_newhadoop(self):
self.assertEqual(ints, ei)

hellopath = os.path.join(SPARK_HOME, "python/test_support/hello/hello.txt")
newconf = {"mapred.input.dir": hellopath}
newconf = {"mapreduce.input.fileinputformat.inputdir": hellopath}
hello = self.sc.newAPIHadoopRDD("org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
"org.apache.hadoop.io.LongWritable",
"org.apache.hadoop.io.Text",
@@ -1515,12 +1515,12 @@ def test_oldhadoop(self):

conf = {
"mapred.output.format.class": "org.apache.hadoop.mapred.SequenceFileOutputFormat",

Review comment from srowen (Member): I'm not sure what this key was supposed to be before; maybe `mapreduce.outputformat.class`? But it can be `mapreduce.job.outputformat.class` now.

Reply from wangyum (Member, Author), Feb 27, 2017: @srowen `mapred.output.format.class` maps to the old API and `mapreduce.job.outputformat.class` maps to the new API. See:
https://github.com/wangyum/spark/blob/97734c5af3df4e6525e8015459af16ab193dfc24/python/pyspark/tests.py#L1664-L1679
(a minimal sketch of this mapping follows after this hunk)

"mapred.output.key.class": "org.apache.hadoop.io.IntWritable",
"mapred.output.value.class": "org.apache.hadoop.io.MapWritable",
"mapred.output.dir": basepath + "/olddataset/"
"mapreduce.job.output.key.class": "org.apache.hadoop.io.IntWritable",
"mapreduce.job.output.value.class": "org.apache.hadoop.io.MapWritable",
"mapreduce.output.fileoutputformat.outputdir": basepath + "/olddataset/"
}
self.sc.parallelize(dict_data).saveAsHadoopDataset(conf)
input_conf = {"mapred.input.dir": basepath + "/olddataset/"}
input_conf = {"mapreduce.input.fileinputformat.inputdir": basepath + "/olddataset/"}
result = self.sc.hadoopRDD(
"org.apache.hadoop.mapred.SequenceFileInputFormat",
"org.apache.hadoop.io.IntWritable",
@@ -1547,14 +1547,14 @@ def test_newhadoop(self):
self.assertEqual(result, data)

conf = {
"mapreduce.outputformat.class":
"mapreduce.job.outputformat.class":
"org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
"mapred.output.key.class": "org.apache.hadoop.io.IntWritable",
"mapred.output.value.class": "org.apache.hadoop.io.Text",
"mapred.output.dir": basepath + "/newdataset/"
"mapreduce.job.output.key.class": "org.apache.hadoop.io.IntWritable",
"mapreduce.job.output.value.class": "org.apache.hadoop.io.Text",
"mapreduce.output.fileoutputformat.outputdir": basepath + "/newdataset/"
}
self.sc.parallelize(data).saveAsNewAPIHadoopDataset(conf)
input_conf = {"mapred.input.dir": basepath + "/newdataset/"}
input_conf = {"mapreduce.input.fileinputformat.inputdir": basepath + "/newdataset/"}
new_dataset = sorted(self.sc.newAPIHadoopRDD(
"org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
"org.apache.hadoop.io.IntWritable",
@@ -1584,16 +1584,16 @@ def test_newhadoop_with_array(self):
self.assertEqual(result, array_data)

conf = {
"mapreduce.outputformat.class":
"mapreduce.job.outputformat.class":
"org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
"mapred.output.key.class": "org.apache.hadoop.io.IntWritable",
"mapred.output.value.class": "org.apache.spark.api.python.DoubleArrayWritable",
"mapred.output.dir": basepath + "/newdataset/"
"mapreduce.job.output.key.class": "org.apache.hadoop.io.IntWritable",
"mapreduce.job.output.value.class": "org.apache.spark.api.python.DoubleArrayWritable",
"mapreduce.output.fileoutputformat.outputdir": basepath + "/newdataset/"
}
self.sc.parallelize(array_data).saveAsNewAPIHadoopDataset(
conf,
valueConverter="org.apache.spark.api.python.DoubleArrayToWritableConverter")
input_conf = {"mapred.input.dir": basepath + "/newdataset/"}
input_conf = {"mapreduce.input.fileinputformat.inputdir": basepath + "/newdataset/"}
new_dataset = sorted(self.sc.newAPIHadoopRDD(
"org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
"org.apache.hadoop.io.IntWritable",
@@ -1663,18 +1663,19 @@ def test_reserialization(self):

conf4 = {
"mapred.output.format.class": "org.apache.hadoop.mapred.SequenceFileOutputFormat",
"mapred.output.key.class": "org.apache.hadoop.io.IntWritable",
"mapred.output.value.class": "org.apache.hadoop.io.IntWritable",
"mapred.output.dir": basepath + "/reserialize/dataset"}
"mapreduce.job.output.key.class": "org.apache.hadoop.io.IntWritable",
"mapreduce.job.output.value.class": "org.apache.hadoop.io.IntWritable",
"mapreduce.output.fileoutputformat.outputdir": basepath + "/reserialize/dataset"}
rdd.saveAsHadoopDataset(conf4)
result4 = sorted(self.sc.sequenceFile(basepath + "/reserialize/dataset").collect())
self.assertEqual(result4, data)

conf5 = {"mapreduce.outputformat.class":
conf5 = {"mapreduce.job.outputformat.class":
"org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
"mapred.output.key.class": "org.apache.hadoop.io.IntWritable",
"mapred.output.value.class": "org.apache.hadoop.io.IntWritable",
"mapred.output.dir": basepath + "/reserialize/newdataset"}
"mapreduce.job.output.key.class": "org.apache.hadoop.io.IntWritable",
"mapreduce.job.output.value.class": "org.apache.hadoop.io.IntWritable",
"mapreduce.output.fileoutputformat.outputdir": basepath + "/reserialize/newdataset"
}
rdd.saveAsNewAPIHadoopDataset(conf5)
result5 = sorted(self.sc.sequenceFile(basepath + "/reserialize/newdataset").collect())
self.assertEqual(result5, data)
@@ -269,8 +269,8 @@ case class LoadDataCommand(
} else {
// Follow Hive's behavior:
// If no schema or authority is provided with non-local inpath,
// we will use hadoop configuration "fs.default.name".
val defaultFSConf = sparkSession.sessionState.newHadoopConf().get("fs.default.name")
// we will use hadoop configuration "fs.defaultFS".
val defaultFSConf = sparkSession.sessionState.newHadoopConf().get("fs.defaultFS")
val defaultFS = if (defaultFSConf == null) {
new URI("")
} else {
@@ -177,11 +177,11 @@ object FileFormatWriter extends Logging {
val taskAttemptContext: TaskAttemptContext = {
// Set up the configuration object
val hadoopConf = description.serializableHadoopConf.value
hadoopConf.set("mapred.job.id", jobId.toString)
hadoopConf.set("mapred.tip.id", taskAttemptId.getTaskID.toString)
hadoopConf.set("mapred.task.id", taskAttemptId.toString)
hadoopConf.setBoolean("mapred.task.is.map", true)
hadoopConf.setInt("mapred.task.partition", 0)
hadoopConf.set("mapreduce.job.id", jobId.toString)
hadoopConf.set("mapreduce.task.id", taskAttemptId.getTaskID.toString)
hadoopConf.set("mapreduce.task.attempt.id", taskAttemptId.toString)
hadoopConf.setBoolean("mapreduce.task.ismap", true)
hadoopConf.setInt("mapreduce.task.partition", 0)

new TaskAttemptContextImpl(hadoopConf, taskAttemptId)
}
@@ -95,9 +95,9 @@ class HiveWindowFunctionQuerySuite extends HiveComparisonTest with BeforeAndAfte
// This is used to generate golden files.
sql("set hive.plan.serialization.format=kryo")
// Explicitly set fs to local fs.
sql(s"set fs.default.name=file://$testTempDir/")
sql(s"set fs.defaultFS=file://$testTempDir/")
// Ask Hive to run jobs in-process as a single map and reduce task.
sql("set mapred.job.tracker=local")
sql("set mapreduce.jobtracker.address=local")
}

override def afterAll() {
@@ -764,9 +764,9 @@ class HiveWindowFunctionQueryFileSuite
// This is used to generate golden files.
// sql("set hive.plan.serialization.format=kryo")
// Explicitly set fs to local fs.
// sql(s"set fs.default.name=file://$testTempDir/")
// sql(s"set fs.defaultFS=file://$testTempDir/")
// Ask Hive to run jobs in-process as a single map and reduce task.
// sql("set mapred.job.tracker=local")
// sql("set mapreduce.jobtracker.address=local")
}

override def afterAll() {
@@ -66,14 +66,16 @@ class HadoopTableReader(
hadoopConf: Configuration)
extends TableReader with Logging {

// Hadoop honors "mapred.map.tasks" as hint, but will ignore when mapred.job.tracker is "local".
// https://hadoop.apache.org/docs/r1.0.4/mapred-default.html
// Hadoop honors "mapreduce.job.maps" as a hint,
// but will ignore it when mapreduce.jobtracker.address is "local".
// https://hadoop.apache.org/docs/r2.6.5/hadoop-mapreduce-client/hadoop-mapreduce-client-core/
// mapred-default.xml
//
// In order to keep consistency with Hive, we will let it be 0 in local mode also.
private val _minSplitsPerRDD = if (sparkSession.sparkContext.isLocal) {
0 // will be split based on blocks by default.
} else {
math.max(hadoopConf.getInt("mapred.map.tasks", 1),
math.max(hadoopConf.getInt("mapreduce.job.maps", 1),
sparkSession.sparkContext.defaultMinPartitions)
}

@@ -228,13 +228,16 @@ case class InsertIntoHiveTable(
val isCompressed = hadoopConf.get("hive.exec.compress.output", "false").toBoolean

if (isCompressed) {
// Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec",
// and "mapred.output.compression.type" have no impact on ORC because it uses table properties
// to store compression information.
hadoopConf.set("mapred.output.compress", "true")
// Please note that isCompressed, "mapreduce.output.fileoutputformat.compress",
// "mapreduce.output.fileoutputformat.compress.codec", and
// "mapreduce.output.fileoutputformat.compress.type"
// have no impact on ORC because it uses table properties to store compression information.
hadoopConf.set("mapreduce.output.fileoutputformat.compress", "true")
fileSinkConf.setCompressed(true)
fileSinkConf.setCompressCodec(hadoopConf.get("mapred.output.compression.codec"))
fileSinkConf.setCompressType(hadoopConf.get("mapred.output.compression.type"))
fileSinkConf.setCompressCodec(hadoopConf
.get("mapreduce.output.fileoutputformat.compress.codec"))
fileSinkConf.setCompressType(hadoopConf
.get("mapreduce.output.fileoutputformat.compress.type"))
}

val numDynamicPartitions = partition.values.count(_.isEmpty)
@@ -439,7 +439,7 @@ private[hive] class TestHiveSparkSession(
foreach { udfName => FunctionRegistry.unregisterTemporaryUDF(udfName) }

// Some tests corrupt this value on purpose, which breaks the RESET call below.
sessionState.conf.setConfString("fs.default.name", new File(".").toURI.toString)
sessionState.conf.setConfString("fs.defaultFS", new File(".").toURI.toString)
// It is important that we RESET first as broken hooks that might have been set could break
// other sql exec here.
sessionState.metadataHive.runSqlHive("RESET")
@@ -5,7 +5,7 @@ set hive.auto.convert.join = true;

CREATE TABLE dest1(c1 INT, c2 STRING) STORED AS TEXTFILE;

set mapred.job.tracker=localhost:58;
set mapreduce.jobtracker.address=localhost:58;
set hive.exec.mode.local.auto=true;

explain
@@ -4,7 +4,7 @@ set hive.enforce.sorting = true;
set hive.exec.reducers.max = 1;
set hive.merge.mapfiles = true;
set hive.merge.mapredfiles = true;
set mapred.reduce.tasks = 2;
set mapreduce.job.reduces = 2;

-- Tests that when a multi insert inserts into a bucketed table and a table which is not bucketed
-- the bucketed table is not merged and the table which is not bucketed is
@@ -1,6 +1,6 @@
set hive.enforce.bucketing = true;
set hive.exec.mode.local.auto=false;
set mapred.reduce.tasks = 10;
set mapreduce.job.reduces = 10;

-- This test sets number of mapred tasks to 10 for a database with 50 buckets,
-- and uses a post-hook to confirm that 10 tasks were created
@@ -1,5 +1,5 @@
set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
set mapred.min.split.size = 64;
set mapreduce.input.fileinputformat.split.minsize = 64;

CREATE TABLE T1(name STRING) STORED AS TEXTFILE;
