Skip to content
Closed
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -417,8 +417,11 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
}
}

private def sparkContextInitialized(sc: SparkContext) = {
private def sparkContextInitialized(sc: SparkContext) = synchronized {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some other code in this class uses synchronization on this, so I think it would be better to synchronize on sparkContextPromise in this case.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Used sparkContextPromise as lock.

// Notify runDriver function that SparkContext is available
sparkContextPromise.success(sc)
// Pause the user class thread in order to make proper initialisation in runDriver function
wait()
}

private def registerAM(
Expand Down Expand Up @@ -497,6 +500,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
// if the user app did not create a SparkContext.
throw new IllegalStateException("User did not initialize spark context!")
}
// After initialisation notify user class thread to continue

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: rest of the code uses American spelling ("initialization"), so this should be consistent.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed and switched to US spell checker.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should remove this comment and add one to the resumeDriver method.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved.

synchronized { notify() }

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since you have to do this in two places, I'd create a method (e.g. resumeDriver) close to where sparkContextInitialized is declared, so that it's easier to find the context of why this is needed.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved into resumeDriver function right below sparkContextInitialized.

userClassThread.join()
} catch {
case e: SparkException if e.getCause().isInstanceOf[TimeoutException] =>
Expand All @@ -506,6 +511,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
finish(FinalApplicationStatus.FAILED,
ApplicationMaster.EXIT_SC_NOT_INITED,
"Timed out waiting for SparkContext.")
} finally {
synchronized { notify() }
}
}

Expand Down