Commit 353606c

Code clean up
1 parent e3d4349 commit 353606c

7 files changed: +15 −33 lines changed

core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala

Lines changed: 1 addition & 12 deletions
@@ -145,18 +145,7 @@ class HadoopMapReduceCommitProtocol(
   }
 
   override def setupJob(jobContext: JobContext): Unit = {
-    // Setup IDs
-    val jobId = SparkHadoopWriterUtils.createJobID(new Date, 0)
-    val taskId = new TaskID(jobId, TaskType.MAP, 0)
-    val taskAttemptId = new TaskAttemptID(taskId, 0)
-
-    // Set up the configuration object
-    jobContext.getConfiguration.set("mapreduce.job.id", jobId.toString)
-    jobContext.getConfiguration.set("mapreduce.task.id", taskAttemptId.getTaskID.toString)
-    jobContext.getConfiguration.set("mapreduce.task.attempt.id", taskAttemptId.toString)
-    jobContext.getConfiguration.setBoolean("mapreduce.task.ismap", true)
-    jobContext.getConfiguration.setInt("mapreduce.task.partition", 0)
-
+    val taskAttemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
     val taskAttemptContext = new TaskAttemptContextImpl(jobContext.getConfiguration, taskAttemptId)
     committer = setupCommitter(taskAttemptContext)
     committer.setupJob(jobContext)
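The rewritten `setupJob` only needs some `TaskAttemptContext` in order to obtain and set up the output committer, so a placeholder attempt ID built from an empty `JobID` suffices and the per-task `mapreduce.*` configuration keys no longer need to be set here. A minimal sketch of that placeholder ID, assuming the Hadoop `mapreduce` client API is on the classpath:

```scala
import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}

// Placeholder attempt ID: empty JobID, map task 0, attempt 0. It only exists to build a
// TaskAttemptContextImpl for committer setup; it does not identify any real task attempt.
val taskAttemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
println(taskAttemptId)  // prints something like attempt__0000_m_000000_0
```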

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java

Lines changed: 1 addition & 5 deletions
@@ -47,11 +47,7 @@ public class OrcColumnVector extends org.apache.spark.sql.vectorized.ColumnVecto
   OrcColumnVector(DataType type, ColumnVector vector) {
     super(type);
 
-    if (type instanceof TimestampType) {
-      isTimestamp = true;
-    } else {
-      isTimestamp = false;
-    }
+    isTimestamp = type instanceof TimestampType;
 
     baseData = vector;
     if (vector instanceof LongColumnVector) {

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java

Lines changed: 2 additions & 2 deletions
@@ -571,7 +571,7 @@ private int readIntLittleEndian() throws IOException {
     int ch3 = in.read();
     int ch2 = in.read();
     int ch1 = in.read();
-    return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
+    return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4));
   }
 
   /**
@@ -592,7 +592,7 @@ private int readIntLittleEndianPaddedOnBitWidth() throws IOException {
         int ch3 = in.read();
         int ch2 = in.read();
         int ch1 = in.read();
-        return (ch1 << 16) + (ch2 << 8) + (ch3 << 0);
+        return (ch1 << 16) + (ch2 << 8) + (ch3);
       }
       case 4: {
         return readIntLittleEndian();
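Both hunks drop a shift by zero bits, which is the identity and contributes nothing to the assembled value. A small self-contained sketch (not the Parquet reader itself) showing that the little-endian assembly is unchanged without the `<< 0` term:

```scala
// Assemble a 32-bit int from four little-endian bytes; `ch4 << 0` equals `ch4`,
// so the final term can be written without the shift.
def readIntLittleEndian(bytes: Array[Byte]): Int = {
  val ch4 = bytes(0) & 0xFF
  val ch3 = bytes(1) & 0xFF
  val ch2 = bytes(2) & 0xFF
  val ch1 = bytes(3) & 0xFF
  (ch1 << 24) + (ch2 << 16) + (ch3 << 8) + ch4
}

// 0x04030201 stored little-endian: least significant byte first.
assert(readIntLittleEndian(Array(0x01, 0x02, 0x03, 0x04).map(_.toByte)) == 0x04030201)
```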

sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala

Lines changed: 1 addition & 1 deletion
@@ -188,7 +188,7 @@ private[sql] object SQLUtils extends Logging {
     dataType match {
       case 's' =>
         // Read StructType for DataFrame
-        val fields = SerDe.readList(dis, jvmObjectTracker = null).asInstanceOf[Array[Object]]
+        val fields = SerDe.readList(dis, jvmObjectTracker = null)
         Row.fromSeq(fields)
       case _ => null
     }

sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala

Lines changed: 4 additions & 6 deletions
@@ -410,12 +410,10 @@ object ViewHelper {
     }
 
     // Detect cyclic references from subqueries.
-    plan.expressions.foreach { expr =>
-      expr match {
-        case s: SubqueryExpression =>
-          checkCyclicViewReference(s.plan, path, viewIdent)
-        case _ => // Do nothing.
-      }
+    plan.expressions.foreach {
+      case s: SubqueryExpression =>
+        checkCyclicViewReference(s.plan, path, viewIdent)
+      case _ => // Do nothing.
     }
   }
 }
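This cleanup relies on Scala accepting a pattern-matching anonymous function directly where a one-argument function is expected, which makes the `expr => expr match { ... }` wrapper redundant. A minimal standalone sketch of the same pattern (the `Expr` types here are made up for illustration, not Spark's):

```scala
sealed trait Expr
case class Subquery(name: String) extends Expr
case class Literal(value: Int) extends Expr

val expressions: Seq[Expr] = Seq(Subquery("v1"), Literal(1), Subquery("v2"))

// Before: an explicit match inside the lambda.
expressions.foreach { expr =>
  expr match {
    case Subquery(n) => println(s"checking $n")
    case _ => // Do nothing.
  }
}

// After: the block of cases is itself the function passed to foreach.
expressions.foreach {
  case Subquery(n) => println(s"checking $n")
  case _ => // Do nothing.
}
```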

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala

Lines changed: 5 additions & 6 deletions
@@ -244,18 +244,17 @@ object FileFormatWriter extends Logging {
       iterator: Iterator[InternalRow]): WriteTaskResult = {
 
     val jobId = SparkHadoopWriterUtils.createJobID(new Date, sparkStageId)
-    val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
-    val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber)
 
     // Set up the attempt context required to use in the output committer.
     val taskAttemptContext: TaskAttemptContext = {
+      val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
+      val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber)
       // Set up the configuration object
       val hadoopConf = description.serializableHadoopConf.value
       hadoopConf.set("mapreduce.job.id", jobId.toString)
       hadoopConf.set("mapreduce.task.id", taskAttemptId.getTaskID.toString)
       hadoopConf.set("mapreduce.task.attempt.id", taskAttemptId.toString)
       hadoopConf.setBoolean("mapreduce.task.ismap", true)
-      hadoopConf.setInt("mapreduce.task.partition", 0)
 
       new TaskAttemptContextImpl(hadoopConf, taskAttemptId)
     }
@@ -378,7 +377,7 @@ object FileFormatWriter extends Logging {
         dataSchema = description.dataColumns.toStructType,
         context = taskAttemptContext)
 
-      statsTrackers.map(_.newFile(currentPath))
+      statsTrackers.foreach(_.newFile(currentPath))
     }
 
     override def execute(iter: Iterator[InternalRow]): ExecutedWriteSummary = {
@@ -429,10 +428,10 @@ object FileFormatWriter extends Logging {
       committer: FileCommitProtocol) extends ExecuteWriteTask {
 
     /** Flag saying whether or not the data to be written out is partitioned. */
-    val isPartitioned = desc.partitionColumns.nonEmpty
+    private val isPartitioned = desc.partitionColumns.nonEmpty
 
     /** Flag saying whether or not the data to be written out is bucketed. */
-    val isBucketed = desc.bucketIdExpression.isDefined
+    private val isBucketed = desc.bucketIdExpression.isDefined
 
     assert(isPartitioned || isBucketed,
       s"""DynamicPartitionWriteTask should be used for writing out data that's either

sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala

Lines changed: 1 addition & 1 deletion
@@ -170,7 +170,7 @@ class SQLAppStatusListener(
       .filter { case (id, _) => metricIds.contains(id) }
       .groupBy(_._1)
       .map { case (id, values) =>
-        id -> SQLMetrics.stringValue(metricTypes(id), values.map(_._2).toSeq)
+        id -> SQLMetrics.stringValue(metricTypes(id), values.map(_._2))
       }
 
     // Check the execution again for whether the aggregated metrics data has been calculated.
