[SPARK-22126][ML][WIP] Fix model-specific optimization support for ML tuning #19350
Changes to `Estimator.scala`:

```diff
@@ -18,10 +18,13 @@
 package org.apache.spark.ml

 import scala.annotation.varargs
+import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.duration.Duration

 import org.apache.spark.annotation.{DeveloperApi, Since}
 import org.apache.spark.ml.param.{ParamMap, ParamPair}
 import org.apache.spark.sql.Dataset
+import org.apache.spark.util.ThreadUtils

 /**
  * :: DeveloperApi ::
@@ -82,5 +85,32 @@ abstract class Estimator[M <: Model[M]] extends PipelineStage {
     paramMaps.map(fit(dataset, _))
   }

+  @Since("2.3.0")
+  def fit(
+      dataset: Dataset[_],
+      paramMaps: Array[ParamMap],
+      unpersistDatasetAfterFitting: Boolean,
+      executionContext: ExecutionContext,
+      modelCallback: (Model[_], ParamMap, Int) => Unit): Unit = {
+    // Fit models in a Future for training in parallel
+    val modelFutures = paramMaps.map { paramMap =>
+      Future[Model[_]] {
+        fit(dataset, paramMap).asInstanceOf[Model[_]]
```
|
@MLnick (Contributor) commented on this line:
How will this work in a pipeline? If the […] But if we have a stage with model-specific optimization (let's say, for argument's sake, a […]) So that pushing the parallel fit into […]

Author:
@MLnick Oh, the design is still under discussion on JIRA and will be changed, I think. I should mark this WIP. Thanks!

Author:
@MLnick I discussed with @jkbradley and @MrBago offline and here is the newest proposal […]
The diff continues:

```diff
+      }(executionContext)
+    }
+
+    if (unpersistDatasetAfterFitting) {
+      // Unpersist training data only when all models have trained
+      Future.sequence[Model[_], Iterable](modelFutures)(implicitly, executionContext)
+        .onComplete { _ => dataset.unpersist() }(executionContext)
+    }
+
+    val modelCallbackFutures = modelFutures.zipWithIndex.map {
+      case (modelFuture, paramMapIndex) =>
+        modelFuture.map { model =>
+          modelCallback(model, paramMaps(paramMapIndex), paramMapIndex)
+        }(executionContext)
+    }
+    modelCallbackFutures.map(ThreadUtils.awaitResult(_, Duration.Inf))
+  }
+
   override def copy(extra: ParamMap): Estimator[M]
 }
```
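The Future-based flow in this diff can be exercised outside Spark. Below is a minimal, self-contained Scala sketch of the same parallel-fit-with-callback pattern; `SimpleModel`, `fitOne`, and the plain `Seq[Double]` dataset are hypothetical stand-ins for Spark's `Model`, the single-`ParamMap` `fit`, and the `Dataset`, not anything from the PR:

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object ParallelFitSketch {
  // Hypothetical stand-ins for Spark ML's ParamMap and Model.
  type ParamMap = Map[String, Double]
  final case class SimpleModel(regParam: Double)

  // Stand-in for the single-ParamMap fit() the real overload delegates to.
  def fitOne(data: Seq[Double], paramMap: ParamMap): SimpleModel =
    SimpleModel(paramMap("regParam"))

  // Mirrors the proposed fit(dataset, paramMaps, ..., modelCallback) overload:
  // one Future per ParamMap, and a callback invoked as each model finishes.
  def fit(
      data: Seq[Double],
      paramMaps: Array[ParamMap],
      executionContext: ExecutionContext,
      modelCallback: (SimpleModel, ParamMap, Int) => Unit): Unit = {
    val modelFutures = paramMaps.map { paramMap =>
      Future { fitOne(data, paramMap) }(executionContext)
    }
    val callbackFutures = modelFutures.zipWithIndex.map {
      case (modelFuture, i) =>
        modelFuture.map { model =>
          modelCallback(model, paramMaps(i), i)
        }(executionContext)
    }
    // Block until every callback has run, as the PR does via ThreadUtils.awaitResult.
    callbackFutures.foreach(Await.result(_, Duration.Inf))
  }

  def main(args: Array[String]): Unit = {
    val pool = Executors.newFixedThreadPool(2)
    val ec = ExecutionContext.fromExecutorService(pool)
    val grid = Array(Map("regParam" -> 0.1), Map("regParam" -> 0.01))
    fit(Seq(1.0, 2.0, 3.0), grid, ec, (model, _, i) => println(s"model $i: $model"))
    pool.shutdown()
  }
}
```

Note that, as in the PR, the callback may fire on an executor thread, so a real caller (e.g. a cross-validation loop collecting metrics) needs the callback body to be thread-safe.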
Author:
I will add docs for this interface later, but it can be reviewed first. Currently this interface looks a little ugly.
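For context on the "model-specific optimization" in the title (and in @MLnick's pipeline question): the point of giving estimators a multi-`ParamMap` `fit` is that a concrete estimator can override it to share expensive work across the whole grid instead of training each point from scratch. The sketch below illustrates that idea only; every name in it is hypothetical and none comes from the PR:

```scala
// Hypothetical illustration of model-specific optimization over a param grid.
object SharedWorkSketch {
  type ParamMap = Map[String, Double]
  final case class SimpleModel(regParam: Double, features: Seq[Double])

  // Stand-in for expensive work that does not depend on regParam
  // (e.g. feature summaries), so it can be computed once for the grid.
  def precompute(data: Seq[Double]): Seq[Double] = data.map(_ * 2.0)

  // A naive implementation would call precompute() once per ParamMap;
  // the override-style fitAll does it once and reuses the result.
  def fitAll(data: Seq[Double], paramMaps: Array[ParamMap]): Array[SimpleModel] = {
    val shared = precompute(data) // done once, not once per grid point
    paramMaps.map(pm => SimpleModel(pm("regParam"), shared))
  }
}
```

This is also why the pipeline question matters: once one stage fits many models at a time, the surrounding tuning code can no longer treat each `ParamMap` as an independent single-model `fit` call.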