
Commit 5aa7a2b

commit protocol support old hadoop api
1 parent: b837bf9

File tree

7 files changed, +457 −337 lines

core/src/main/scala/org/apache/spark/internal/io/HadoopMapRedCommitProtocol.scala

Lines changed: 36 additions & 0 deletions
@@ -0,0 +1,36 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.internal.io

import org.apache.hadoop.mapred._
import org.apache.hadoop.mapreduce.{TaskAttemptContext => NewTaskAttemptContext}

/**
 * A [[FileCommitProtocol]] implementation backed by an underlying Hadoop OutputCommitter
 * (from the old mapred API).
 *
 * Unlike Hadoop's OutputCommitter, this implementation is serializable.
 */
class HadoopMapRedCommitProtocol(jobId: String, path: String)
  extends HadoopMapReduceCommitProtocol(jobId, path) {

  override def setupCommitter(context: NewTaskAttemptContext): OutputCommitter = {
    val config = context.getConfiguration.asInstanceOf[JobConf]
    config.getOutputCommitter
  }
}
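
For context, here is a minimal usage sketch (not part of this commit): Spark's write paths construct commit protocols reflectively through FileCommitProtocol.instantiate, which is what the (jobId, path) constructor above serves. The object name, job id, and output path below are illustrative.

package org.apache.spark.internal.io

// Hypothetical example, not part of this commit.
object HadoopMapRedCommitProtocolExample {
  def main(args: Array[String]): Unit = {
    // Instantiate the protocol reflectively, the way Spark's write paths do;
    // this resolves the (jobId: String, path: String) constructor shown above.
    val protocol: FileCommitProtocol = FileCommitProtocol.instantiate(
      classOf[HadoopMapRedCommitProtocol].getName,
      "job_20170801_0001",           // illustrative job id
      "hdfs:///tmp/example-output")  // illustrative output path
    println(s"Instantiated committer: ${protocol.getClass.getSimpleName}")
  }
}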
core/src/main/scala/org/apache/spark/internal/io/HadoopWriteConfigUtil.scala

Lines changed: 79 additions & 0 deletions
@@ -0,0 +1,79 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.internal.io

import scala.reflect.ClassTag

import org.apache.hadoop.mapreduce._

import org.apache.spark.SparkConf

/**
 * Interface for creating the output format/committer/writer used when saving an RDD through a
 * Hadoop OutputFormat (from either the old mapred API or the new mapreduce API).
 *
 * Notes:
 * 1. Implementations should throw [[IllegalArgumentException]] when the wrong Hadoop API is
 *    referenced;
 * 2. Implementations must be serializable, as the instance instantiated on the driver
 *    will be used for tasks on executors;
 * 3. Implementations should have a constructor with exactly one argument:
 *    (conf: SerializableConfiguration) or (conf: SerializableJobConf).
 */
abstract class HadoopWriteConfigUtil[K, V: ClassTag] extends Serializable {

  // --------------------------------------------------------------------------
  // Create JobContext/TaskAttemptContext
  // --------------------------------------------------------------------------

  def createJobContext(jobTrackerId: String, jobId: Int): JobContext

  def createTaskAttemptContext(
      jobTrackerId: String,
      jobId: Int,
      splitId: Int,
      taskAttemptId: Int): TaskAttemptContext

  // --------------------------------------------------------------------------
  // Create committer
  // --------------------------------------------------------------------------

  def createCommitter(jobId: Int): HadoopMapReduceCommitProtocol

  // --------------------------------------------------------------------------
  // Create writer
  // --------------------------------------------------------------------------

  def initWriter(taskContext: TaskAttemptContext, splitId: Int): Unit

  def write(pair: (K, V)): Unit

  def closeWriter(taskContext: TaskAttemptContext): Unit

  // --------------------------------------------------------------------------
  // Create OutputFormat
  // --------------------------------------------------------------------------

  def initOutputFormat(jobContext: JobContext): Unit

  // --------------------------------------------------------------------------
  // Verify hadoop config
  // --------------------------------------------------------------------------

  def assertConf(jobContext: JobContext, conf: SparkConf): Unit
}
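
To make the contract above concrete, here is a hypothetical sketch of a subclass targeting the new mapreduce API; the commit's actual implementations may differ. Per note 3 it takes a single SerializableConfiguration (Spark's serializable wrapper around a Hadoop Configuration), and it assumes the job's output directory is available under the standard Hadoop key mapreduce.output.fileoutputformat.outputdir.

package org.apache.spark.internal.io

import scala.reflect.ClassTag

import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

import org.apache.spark.SparkConf
import org.apache.spark.util.SerializableConfiguration

// Hypothetical example, not part of this commit.
class ExampleMapReduceWriteConfigUtil[K, V: ClassTag](conf: SerializableConfiguration)
  extends HadoopWriteConfigUtil[K, V] {

  private var outputFormat: Class[_ <: OutputFormat[K, V]] = null
  private var writer: RecordWriter[K, V] = null

  // A TaskAttemptContextImpl also implements JobContext, so it can serve as
  // the job-level context.
  override def createJobContext(jobTrackerId: String, jobId: Int): JobContext = {
    val jobAttemptId = new TaskAttemptID(jobTrackerId, jobId, TaskType.MAP, 0, 0)
    new TaskAttemptContextImpl(conf.value, jobAttemptId)
  }

  override def createTaskAttemptContext(
      jobTrackerId: String,
      jobId: Int,
      splitId: Int,
      taskAttemptId: Int): TaskAttemptContext = {
    val attemptId =
      new TaskAttemptID(jobTrackerId, jobId, TaskType.REDUCE, splitId, taskAttemptId)
    new TaskAttemptContextImpl(conf.value, attemptId)
  }

  override def createCommitter(jobId: Int): HadoopMapReduceCommitProtocol = {
    // Standard Hadoop key for the job's output directory (an assumption here).
    val path = conf.value.get("mapreduce.output.fileoutputformat.outputdir")
    new HadoopMapReduceCommitProtocol(jobId.toString, path)
  }

  override def initOutputFormat(jobContext: JobContext): Unit = {
    if (outputFormat == null) {
      outputFormat = jobContext.getOutputFormatClass
        .asInstanceOf[Class[_ <: OutputFormat[K, V]]]
    }
  }

  override def initWriter(taskContext: TaskAttemptContext, splitId: Int): Unit = {
    val format = outputFormat.getDeclaredConstructor().newInstance()
    writer = format.getRecordWriter(taskContext)
  }

  override def write(pair: (K, V)): Unit = writer.write(pair._1, pair._2)

  override def closeWriter(taskContext: TaskAttemptContext): Unit = {
    if (writer != null) {
      writer.close(taskContext)
      writer = null
    }
  }

  override def assertConf(jobContext: JobContext, conf: SparkConf): Unit = {
    // A real implementation would validate the output format, key, and value
    // classes from the Hadoop configuration here; elided in this sketch.
  }
}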

core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala

Lines changed: 0 additions & 181 deletions
This file was deleted.
