From 5dd9984c783854ebdcb115cad6edcb6ca4613c4d Mon Sep 17 00:00:00 2001
From: Steve Loughran
Date: Tue, 10 Nov 2020 16:09:07 +0000
Subject: [PATCH 1/3] [SPARK-33402][CORE] SparkHadoopWriter to set unique job ID in "spark.sql.sources.writeJobUUID"

Change-Id: I60d2a7e18601648b07258df9269f1af9c0a670b3
---
 .../org/apache/spark/internal/io/SparkHadoopWriter.scala | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala
index 6d174b5e0f81..37b470802067 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala
@@ -18,7 +18,7 @@ package org.apache.spark.internal.io
 
 import java.text.NumberFormat
-import java.util.{Date, Locale}
+import java.util.{Date, Locale, UUID}
 
 import scala.reflect.ClassTag
 
@@ -70,6 +70,11 @@ object SparkHadoopWriter extends Logging {
     // Assert the output format/key/value class is set in JobConf.
     config.assertConf(jobContext, rdd.conf)
 
+    // Set a unique UUID in the job configuration, so that committers
+    // get an ID guaranteed to be unique.
+    jobContext.getConfiguration.set("spark.sql.sources.writeJobUUID",
+      UUID.randomUUID.toString)
+
     val committer = config.createCommitter(commitJobId)
     committer.setupJob(jobContext)
 

From 72331d779090329ae74db429ab3724c0656a598a Mon Sep 17 00:00:00 2001
From: Steve Loughran
Date: Wed, 11 Nov 2020 15:03:22 +0000
Subject: [PATCH 2/3] [SPARK-33402][CORE] Jobs launched in the same second have duplicate JobIDs

* The generated JobID combines a pair of random long numbers to ensure
  the probability of a collision is near-zero.
* With tests of uniqueness, round trips and negative job ID rejection.

Change-Id: I7572f7ee358de3f3abe61f2a60d92d5a09e64c2f
---
 .../internal/io/SparkHadoopWriterUtils.scala  |  32 +++++-
 .../io/SparkHadoopWriterUtilsSuite.scala      | 102 ++++++++++++++++++
 2 files changed, 130 insertions(+), 4 deletions(-)
 create mode 100644 core/src/test/scala/org/apache/spark/internal/io/SparkHadoopWriterUtilsSuite.scala

diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriterUtils.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriterUtils.scala
index de828a6d6156..63ae5358eb87 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriterUtils.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriterUtils.scala
@@ -17,10 +17,9 @@
 
 package org.apache.spark.internal.io
 
-import java.text.SimpleDateFormat
-import java.util.{Date, Locale}
+import java.util.Date
 
-import scala.util.DynamicVariable
+import scala.util.{DynamicVariable, Random}
 
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapred.{JobConf, JobID}
@@ -37,14 +36,39 @@ private[spark] object SparkHadoopWriterUtils {
 
   private val RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES = 256
 
+  private val RAND = new Random()
+
+  /**
+   * Create a job ID.
+   *
+   * @param time (current) time
+   * @param id job number
+   * @return a job ID
+   */
   def createJobID(time: Date, id: Int): JobID = {
+    if (id < 0) {
+      throw new IllegalArgumentException("Job number is negative")
+    }
     val jobtrackerID = createJobTrackerID(time)
     new JobID(jobtrackerID, id)
   }
 
+  /**
+   * Generate an ID for a job tracker.
+   * @param time (current) time
+   * @return a string for a job ID
+   */
   def createJobTrackerID(time: Date): String = {
-    new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(time)
+    var l1 = RAND.nextLong()
+    if (l1 < 0) {
+      l1 = -l1
+    }
+    var l2 = RAND.nextLong()
+    if (l2 < 0) {
+      l2 = -l2
+    }
+    // pad l1 with leading zeros so the ID is always at least four digits long
+    f"$l1%04d$l2"
   }
 
   def createPathFromString(path: String, conf: JobConf): Path = {
diff --git a/core/src/test/scala/org/apache/spark/internal/io/SparkHadoopWriterUtilsSuite.scala b/core/src/test/scala/org/apache/spark/internal/io/SparkHadoopWriterUtilsSuite.scala
new file mode 100644
index 000000000000..33b58ec9e666
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/internal/io/SparkHadoopWriterUtilsSuite.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.io
+
+import java.util.Date
+
+import org.apache.hadoop.mapreduce.JobID
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.internal.io.SparkHadoopWriterUtils.createJobID
+
+/**
+ * Unit tests for functions in SparkHadoopWriterUtils.
+ */
+class SparkHadoopWriterUtilsSuite extends SparkFunSuite {
+
+  /**
+   * Core test of JobID generation:
+   * they are created,
+   * the job number is passed through to the job ID,
+   * and they round trip to string and back
+   * (which implies that the full string matches the regexp
+   * in the JobID class).
+   */
+  test("JobID Generation") {
+    val jobNumber = 1010
+    val j1 = createJobID(new Date(), jobNumber)
+    assert(jobNumber == j1.getId,
+      s"Job number mismatch in $j1")
+
+    val jobStr = j1.toString
+    // the string value begins with job_
+    assert(jobStr.startsWith("job_"),
+      s"wrong prefix of $jobStr")
+    // and the hadoop code can parse it
+    val j2 = roundTrip(j1)
+    assert(j1.getId == j2.getId, "Job ID mismatch")
+    assert(j1.getJtIdentifier == j2.getJtIdentifier, "Job identifier mismatch")
+  }
+
+  /**
+   * This is the problem which surfaces in situations where committers expect
+   * job IDs to be unique: if the timestamp is (exclusively) used,
+   * then jobs launched in the same second conflict in the directories created.
+   */
+  test("JobIDs generated at same time are different") {
+    val now = new Date()
+    val j1 = createJobID(now, 1)
+    val j2 = createJobID(now, 1)
+    assert(j1.toString != j2.toString)
+  }
+
+  /**
+   * There's nothing explicitly in the Hadoop classes to stop
+   * job numbers being negative.
+   * There are some big assumptions in the FileOutputCommitter about attempt
+   * IDs being positive during any recovery operations; for safety, the
+   * job number is validated.
+   */
+  test("JobIDs with negative job number") {
+    intercept[IllegalArgumentException] {
+      createJobID(new Date(), -1)
+    }
+  }
+
+  /**
+   * If someone ever does reinstate use of timestamps,
+   * make sure that the case of timestamp == 0 is handled.
+   */
+  test("JobIDs on Epoch are different") {
+    val j1 = createJobID(new Date(0), 0)
+    val j2 = createJobID(new Date(0), 0)
+    assert(j1.toString != j2.toString)
+  }
+
+  /**
+   * Do a round trip as a string and back again.
+   * This uses the JobID parser.
+   * @param jobID job ID
+   * @return the parsed job ID
+   */
+  private def roundTrip(jobID: JobID): JobID = {
+    val parsedJobId = JobID.forName(jobID.toString)
+    assert(jobID == parsedJobId, "Round trip was inconsistent")
+    parsedJobId
+  }
+}

From 115036067697d978ddddc48adba78521f1b6d327 Mon Sep 17 00:00:00 2001
From: Steve Loughran
Date: Wed, 11 Nov 2020 15:41:31 +0000
Subject: [PATCH 3/3] [SPARK-33402] Use the date as prefix; a single random long adds 63 bits of entropy at the end

Change-Id: Id449ce9eee9dbc1a4b2b78740406e3d5850ee6c9
---
 .../spark/internal/io/SparkHadoopWriterUtils.scala | 11 ++++-------
 1 file changed, 4 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriterUtils.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriterUtils.scala
index 63ae5358eb87..657842c620f3 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriterUtils.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriterUtils.scala
@@ -17,7 +17,8 @@
 
 package org.apache.spark.internal.io
 
-import java.util.Date
+import java.text.SimpleDateFormat
+import java.util.{Date, Locale}
 
 import scala.util.{DynamicVariable, Random}
 
@@ -59,16 +60,12 @@ object SparkHadoopWriterUtils {
    * @return a string for a job ID
    */
   def createJobTrackerID(time: Date): String = {
+    val base = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(time)
     var l1 = RAND.nextLong()
     if (l1 < 0) {
       l1 = -l1
     }
-    var l2 = RAND.nextLong()
-    if (l2 < 0) {
-      l2 = -l2
-    }
-    // pad l1 with leading zeros so the ID is always at least four digits long
-    f"$l1%04d$l2"
+    base + l1
   }
 
   def createPathFromString(path: String, conf: JobConf): Path = {
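
For reference, a minimal, self-contained sketch of the ID scheme the series converges on: a "yyyyMMddHHmmss" prefix plus a non-negative random long, wrapped into a Hadoop JobID. This is not part of the patches; the object and helper names (JobIDSketch, newUniqueJobID) are hypothetical, and it assumes only hadoop-mapreduce-client-core on the classpath. It masks the sign bit rather than negating, since Random.nextLong() can return Long.MinValue, whose negation is still negative; the patch's "if (l1 < 0) l1 = -l1" does not cover that one-in-2^64 case.

import java.text.SimpleDateFormat
import java.util.{Date, Locale}

import scala.util.Random

import org.apache.hadoop.mapreduce.JobID

object JobIDSketch {

  private val rand = new Random()

  /**
   * Hypothetical helper: build a JobID whose tracker ID is unique
   * even for jobs launched in the same second.
   */
  def newUniqueJobID(time: Date, jobNumber: Int): JobID = {
    require(jobNumber >= 0, "Job number is negative")
    val base = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(time)
    // Mask the sign bit instead of negating: -Long.MinValue is still
    // Long.MinValue, so negation alone can leave a negative suffix.
    val suffix = rand.nextLong() & Long.MaxValue
    new JobID(base + suffix, jobNumber)
  }

  def main(args: Array[String]): Unit = {
    val now = new Date()
    val j1 = newUniqueJobID(now, 1)
    val j2 = newUniqueJobID(now, 1)
    // Same second and same job number, yet the IDs differ.
    println(s"$j1 / $j2 / unique=${j1.toString != j2.toString}")
    // Round trip through the Hadoop parser, as the test suite does.
    assert(JobID.forName(j1.toString) == j1)
  }
}

Run with any build that puts hadoop-mapreduce-client-core on the classpath: the two printed IDs share the date prefix but differ in the random suffix, which is the uniqueness property the committers rely on.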