
Conversation

@steveloughran
Contributor

@steveloughran steveloughran commented Nov 10, 2020

What changes were proposed in this pull request?

  1. Applies the SQL changes in SPARK-33230 to SparkHadoopWriter, so that rdd.saveAsNewAPIHadoopDataset passes in a unique job UUID in spark.sql.sources.writeJobUUID.
  2. SparkHadoopWriterUtils.createJobTrackerID generates a JobID by appending a random long number to the supplied timestamp, so the probability of a collision is near-zero (see the sketch after this list).
  3. Adds tests of uniqueness, round trips and negative jobID rejection.
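
A minimal sketch of the approach in item 2, for illustration only (the method name, date format and masking here are assumptions, not the PR's exact code):

```scala
import java.text.SimpleDateFormat
import java.util.{Date, Locale}
import scala.util.Random

// Format the timestamp as before, then append a non-negative random long so
// that two jobs started in the same second still get distinct tracker IDs.
// The result stays digits-only, which the downstream JobID handling expects.
def createUniqueJobTrackerID(time: Date): String = {
  val base = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(time)
  val suffix = Random.nextLong() & Long.MaxValue // mask keeps the value >= 0
  s"$base$suffix"
}
```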

Why are the changes needed?

Without this, if more than one job is started in the same second and the committer expects application attempt IDs to be unique, each job is at risk of clashing with the others.

With the fix,

  • those committers which use the ID set in spark.sql.sources.writeJobUUID as a priority ID will pick that up instead and so be unique.
  • committers which use the Hadoop JobID for unique paths and filenames will get the randomly generated jobID. Assuming all clocks in a cluster are in sync, the probability of two jobs launched in the same second colliding has dropped from 1 to 1/(2^63).

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Unit tests

There's a new test suite, SparkHadoopWriterUtilsSuite, which creates job IDs, verifies they are unique even for the same timestamp, and checks that they can be marshalled to a string and parsed back by the Hadoop code, which contains some (brittle) assumptions about the format of job IDs.
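
A hedged sketch of that round-trip check (illustrative names and values, not the actual suite's code):

```scala
import org.apache.hadoop.mapreduce.JobID
import scala.util.Random

// Build a digits-only jobtracker ID, wrap it in a Hadoop JobID, marshal it to
// a string and parse it back; the parsed fields must match the originals.
val trackerId = "20201110120000" + (Random.nextLong() & Long.MaxValue)
val jobId = new JobID(trackerId, 1)
val parsed = JobID.forName(jobId.toString)
assert(parsed.getJtIdentifier == trackerId, s"tracker ID lost in $parsed")
assert(parsed.getId == 1, s"job number lost in $parsed")
```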

Functional Integration Tests

  1. Hadoop trunk built with HADOOP-17318, published to the local Maven repository.
  2. Spark built with hadoop.version=3.4.0-SNAPSHOT to pick up these JARs.
  3. Spark + object store integration tests at https://github.com/hortonworks-spark/cloud-integration built against that local Spark version.
  4. Executed against AWS London.

The tests were run with fs.s3a.committer.require.uuid=true, so the S3A committers fail fast if they don't get a job ID passed down. This showed that rdd.saveAsNewAPIHadoopDataset wasn't setting the UUID option; it was using only the current Date value for an app attempt, which is not guaranteed to be unique.
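
For reference, a hedged example (not part of the PR) of how that S3A option can be passed from a Spark application via the standard spark.hadoop prefix:

```scala
import org.apache.spark.sql.SparkSession

// Assumption: fs.s3a.committer.require.uuid is the HADOOP-17318 option; the
// spark.hadoop. prefix copies it into the Hadoop configuration of the job.
val spark = SparkSession.builder()
  .appName("s3a-committer-uuid-check")
  .config("spark.hadoop.fs.s3a.committer.require.uuid", "true")
  .getOrCreate()
```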

With the change applied to spark, the relevant tests work, therefore the committers are getting unique job IDs.

…sql.sources.writeJobUUID"

Change-Id: I60d2a7e18601648b07258df9269f1af9c0a670b3
@steveloughran steveloughran changed the title [SPARK-33302][CORE] SparkHadoopWriter to set unique job ID in "spark.sql.sources.writeJobUUID" [SPARK-33402][CORE] SparkHadoopWriter to set unique job ID in "spark.sql.sources.writeJobUUID" Nov 10, 2020
@github-actions github-actions bot added the CORE label Nov 10, 2020
@SparkQA

SparkQA commented Nov 10, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35487/

@SparkQA

SparkQA commented Nov 10, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35487/

@steveloughran
Contributor Author

FYI @dongjoon-hyun @rdblue, this is the RDD version of the SQL-side patch. Downstream tests can now flag the absence/presence of this property, since I've set the committer to fail in job setup if it isn't found.

@SparkQA

SparkQA commented Nov 10, 2020

Test build #130881 has finished for PR 30319 at commit 5dd9984.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@steveloughran
Contributor Author

For full due diligence, one test suite did fail as Hive couldn't bring up an S3A connector instance in the isolated classloader. https://issues.apache.org/jira/browse/HADOOP-17372

@dongjoon-hyun
Member

Thanks, @steveloughran .
cc @sunchao , too

@steveloughran
Contributor Author

I'm thinking it may be best to not only do this but also add better randomness when creating the timestamp for the artificial app attempt ID. Today, Date() is used, hence the conflict if more than one job is started in the same second. The staging committer is most vulnerable to this, but if someone uses FileOutputCommitter with the same destination dir and overwrite enabled, the same conflict occurs.

I'm making sure the S3A committers pick up this UUID everywhere (staging already does for the cluster FS, but not for the local task attempt dir). What I'm not going to go near is the classic FileOutputCommitter, for the following reason: fear.

I don't want to go anywhere near that committer as it is way too critical, and it contains deep assumptions that application attempt IDs are sequential; Hadoop MR uses that for recoverability on restarted job attempts. Spark doesn't worry about failed drivers, so it doesn't need that sequential naming.

* The generated JobID combines a pair of random long numbers to ensure
  the probability of a collision is near-zero.
* with tests of uniqueness, round trips and negative jobID rejection.

Change-Id: I7572f7ee358de3f3abe61f2a60d92d5a09e64c2f
@dongjoon-hyun
Member

Spark built with hadoop.version=3.4.0-SNAPSHOT to pick up these JARs. For reasons I don't understand, Spark master wouldn't build with that Hadoop version, but an internal Spark 2.x branch was happy.

For the above, we were blocked at #30135 ([SPARK-29250][BUILD] Upgrade to Hadoop 3.3.0) due to apache/hadoop#2411 (HADOOP-17324. Don't relocate org.bouncycastle in shaded client jars), and it was resolved yesterday.

l2 = -l2
}
// use leading zeros to ensure the id is always at least four digits long
f"$l1%04d$l2"
Member

@dongjoon-hyun dongjoon-hyun Nov 11, 2020


Although this is a random string, this looks like a big change to me. Isn't it enough to add a random number at the end of the existing string?

Contributor Author


I did think about doing something here retaining date and time. If you want a random number at the end, happy to oblige. What's key is that it must be the digits 0-9 only.

Contributor Author


Updated the patch. Timestamp + single random long reduces the probability of collision by a factor of 2^63.

entropy at the end

Change-Id: Id449ce9eee9dbc1a4b2b78740406e3d5850ee6c9
@steveloughran
Contributor Author

steveloughran commented Nov 11, 2020

For the above, we were blocked at #30135 ([SPARK-29250][BUILD] Upgrade to Hadoop 3.3.0) due to apache/hadoop#2411 (HADOOP-17324. Don't relocate org.bouncycastle in shaded client jars), and it was resolved yesterday.

I am happy someone else is also trying to build trunk branches of multiple things together. It spreads the joy.

@SparkQA

SparkQA commented Nov 11, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35547/

@SparkQA

SparkQA commented Nov 11, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35547/

@SparkQA

SparkQA commented Nov 11, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35550/

@SparkQA

SparkQA commented Nov 11, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35550/

@SparkQA

SparkQA commented Nov 11, 2020

Test build #130942 has finished for PR 30319 at commit 72331d7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 11, 2020

Test build #130944 has finished for PR 30319 at commit 1150360.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@steveloughran steveloughran changed the title [SPARK-33402][CORE] SparkHadoopWriter to set unique job ID in "spark.sql.sources.writeJobUUID" [SPARK-33402][CORE] Jobs launched in same second have duplicate MapReduce JobIDs Nov 11, 2020
Member

@dongjoon-hyun dongjoon-hyun left a comment


+1, LGTM. Thank you @steveloughran .
Merged to master/3.0.

dongjoon-hyun pushed a commit that referenced this pull request Nov 11, 2020
…duce JobIDs


Closes #30319 from steveloughran/BUG/SPARK-33402-jobuuid.

Authored-by: Steve Loughran <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 318a173)
Signed-off-by: Dongjoon Hyun <[email protected]>
@steveloughran
Contributor Author

Thanks!

@steveloughran steveloughran deleted the BUG/SPARK-33402-jobuuid branch November 12, 2020 10:27
catalinii pushed a commit to lyft/spark that referenced this pull request Feb 11, 2021
…duce JobIDs


Closes apache#30319 from steveloughran/BUG/SPARK-33402-jobuuid.

Authored-by: Steve Loughran <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
catalinii pushed a commit to lyft/spark that referenced this pull request Feb 11, 2021
…duce JobIDs


Closes apache#30319 from steveloughran/BUG/SPARK-33402-jobuuid.

Authored-by: Steve Loughran <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>

// propagate the description UUID into the jobs, so that committers
// get an ID guaranteed to be unique.
jobContext.getConfiguration.set("spark.sql.sources.writeJobUUID",
Contributor


I'm wondering why this is needed. Can't the commit protocol just get a unique job ID from its constructor parameters?

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala#L32-L34
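
For context, a hedged sketch of the constructor path the question refers to (assuming FileCommitProtocol.instantiate's usual (className, jobId, outputPath) signature; the values below are illustrative, not code from this PR):

```scala
import java.util.UUID
import org.apache.spark.internal.io.{FileCommitProtocol, HadoopMapReduceCommitProtocol}

// Sketch only: the commit protocol already receives a jobId string when it is
// instantiated, which is what the question above is pointing at.
val uniqueJobId = UUID.randomUUID().toString
val committer: FileCommitProtocol = FileCommitProtocol.instantiate(
  classOf[HadoopMapReduceCommitProtocol].getName, // committer implementation
  uniqueJobId,                                    // jobId constructor parameter
  "s3a://bucket/output")                          // output path
```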
