Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 0 additions & 16 deletions clusters/clusters_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,22 +434,6 @@ type Cluster struct {
ClusterMounts []MountInfo `json:"cluster_mount_infos,omitempty" tf:"alias:cluster_mount_info"`
}

// TODO: Remove this once all the resources using clusters are migrated to Go SDK.
// They would then be using Validate(cluster compute.CreateCluster) defined in resource_cluster.go that is a duplicate of this method but uses Go SDK.
func (cluster Cluster) Validate() error {
Comment thread
alexott marked this conversation as resolved.
// TODO: rewrite with CustomizeDiff
if cluster.NumWorkers > 0 || cluster.Autoscale != nil {
return nil
}
profile := cluster.SparkConf["spark.databricks.cluster.profile"]
master := cluster.SparkConf["spark.master"]
resourceClass := cluster.CustomTags["ResourceClass"]
if profile == "singleNode" && strings.HasPrefix(master, "local") && resourceClass == "SingleNode" {
return nil
}
return errors.New(numWorkerErr)
}

// TODO: Remove this once all the resources using clusters are migrated to Go SDK.
// They would then be using ModifyRequestOnInstancePool(cluster *compute.CreateCluster) defined in resource_cluster.go that is a duplicate of this method but uses Go SDK.
// ModifyRequestOnInstancePool helps remove all request fields that should not be submitted when instance pool is selected.
Expand Down
18 changes: 7 additions & 11 deletions jobs/jobs_api_go_sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,12 +156,8 @@ func (c controlRunStateLifecycleManagerGoSdk) OnUpdate(ctx context.Context) erro
return StopActiveRun(jobID, c.d.Timeout(schema.TimeoutUpdate), w, ctx)
}

func updateAndValidateJobClusterSpec(clusterSpec *compute.ClusterSpec, d *schema.ResourceData) error {
err := clusters.Validate(*clusterSpec)
if err != nil {
return err
}
err = clusters.ModifyRequestOnInstancePool(clusterSpec)
func updateJobClusterSpec(clusterSpec *compute.ClusterSpec, d *schema.ResourceData) error {
err := clusters.ModifyRequestOnInstancePool(clusterSpec)
if err != nil {
return err
}
Expand All @@ -178,21 +174,21 @@ func updateAndValidateJobClusterSpec(clusterSpec *compute.ClusterSpec, d *schema

func prepareJobSettingsForUpdateGoSdk(d *schema.ResourceData, js *JobSettingsResource) error {
if js.NewCluster != nil {
err := updateAndValidateJobClusterSpec(js.NewCluster, d)
err := updateJobClusterSpec(js.NewCluster, d)
if err != nil {
return err
}
}
for _, task := range js.Tasks {
if task.NewCluster != nil {
err := updateAndValidateJobClusterSpec(task.NewCluster, d)
err := updateJobClusterSpec(task.NewCluster, d)
if err != nil {
return err
}
}
}
for i := range js.JobClusters {
err := updateAndValidateJobClusterSpec(&js.JobClusters[i].NewCluster, d)
err := updateJobClusterSpec(&js.JobClusters[i].NewCluster, d)
if err != nil {
return err
}
Expand All @@ -205,14 +201,14 @@ func prepareJobSettingsForCreateGoSdk(d *schema.ResourceData, jc *JobCreateStruc
// Before the go-sdk migration, the field `num_workers` was required, so we always sent it.
for _, task := range jc.Tasks {
if task.NewCluster != nil {
err := updateAndValidateJobClusterSpec(task.NewCluster, d)
err := updateJobClusterSpec(task.NewCluster, d)
if err != nil {
return err
}
}
}
for i := range jc.JobClusters {
err := updateAndValidateJobClusterSpec(&jc.JobClusters[i].NewCluster, d)
err := updateJobClusterSpec(&jc.JobClusters[i].NewCluster, d)
if err != nil {
return err
}
Expand Down
13 changes: 0 additions & 13 deletions jobs/resource_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -1068,19 +1068,6 @@ func ResourceJob() common.Resource {
return fmt.Errorf("`control_run_state` must be specified only with `max_concurrent_runs = 1`")
}
}
for _, task := range js.Tasks {
if task.NewCluster == nil {
continue
}
if err := clusters.Validate(*task.NewCluster); err != nil {
return fmt.Errorf("task %s invalid: %w", task.TaskKey, err)
}
}
if js.NewCluster != nil {
if err := clusters.Validate(*js.NewCluster); err != nil {
return fmt.Errorf("invalid job cluster: %w", err)
}
}
return nil
},
Create: func(ctx context.Context, d *schema.ResourceData, c *common.DatabricksClient) error {
Expand Down
104 changes: 21 additions & 83 deletions jobs/resource_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -823,6 +823,14 @@ func TestResourceJobCreate_JobClusters(t *testing.T) {
NotebookPath: "/Stuff",
},
},
{
TaskKey: "c",
NewCluster: &clusters.Cluster{
SparkVersion: "d",
NodeTypeID: "e",
NumWorkers: 0,
},
},
},
MaxConcurrentRuns: 1,
JobClusters: []JobCluster{
Expand All @@ -839,7 +847,7 @@ func TestResourceJobCreate_JobClusters(t *testing.T) {
NewCluster: &clusters.Cluster{
SparkVersion: "x",
NodeTypeID: "y",
NumWorkers: 9,
NumWorkers: 0,
},
},
},
Expand Down Expand Up @@ -883,7 +891,7 @@ func TestResourceJobCreate_JobClusters(t *testing.T) {
job_cluster {
job_cluster_key = "k"
new_cluster {
num_workers = 9
num_workers = 0
spark_version = "x"
node_type_id = "y"
}
Expand All @@ -910,7 +918,17 @@ func TestResourceJobCreate_JobClusters(t *testing.T) {
notebook_task {
notebook_path = "/Stuff"
}
}`,
}

task {
task_key = "c"
new_cluster {
spark_version = "d"
node_type_id = "e"
num_workers = 0
}
}
`,
}.Apply(t)
assert.NoError(t, err)
assert.Equal(t, "17", d.Id())
Expand Down Expand Up @@ -2031,48 +2049,6 @@ func TestResourceJobCreateFromGitSourceWithoutProviderFail(t *testing.T) {
}.ExpectError(t, "git source is not empty but Git Provider is not specified and cannot be guessed by url &{GitBranch: GitCommit: GitProvider: GitSnapshot:<nil> GitTag:0.4.8 GitUrl:https://custom.git.hosting.com/databricks/terraform-provider-databricks JobSource:<nil> ForceSendFields:[]}")
}

func TestResourceJobCreateSingleNode_Fail(t *testing.T) {
_, err := qa.ResourceFixture{
Create: true,
Resource: ResourceJob(),
HCL: `new_cluster {
num_workers = 0
spark_version = "7.3.x-scala2.12"
node_type_id = "Standard_DS3_v2"
}
max_concurrent_runs = 1
max_retries = 3
min_retry_interval_millis = 5000
name = "Featurizer"
retry_on_timeout = true

spark_jar_task {
main_class_name = "com.labs.BarMain"
}
library {
jar = "dbfs://aa/bb/cc.jar"
}
library {
jar = "dbfs://ff/gg/hh.jar"
}`,
}.Apply(t)
assert.ErrorContains(t, err, `num_workers may be 0 only for single-node clusters. To create a single node
cluster please include the following configuration in your cluster configuration:

spark_conf = {
"spark.databricks.cluster.profile" : "singleNode"
"spark.master" : "local[*]"
}

custom_tags = {
"ResourceClass" = "SingleNode"
}

Please note that the Databricks Terraform provider cannot detect if the above configuration
is defined in a policy used by the cluster. Please define this in the cluster configuration
itself to create a single node cluster.`)
}

func TestResourceJobRead(t *testing.T) {
d, err := qa.ResourceFixture{
Fixtures: []qa.HTTPFixture{
Expand Down Expand Up @@ -2938,44 +2914,6 @@ func TestResourceJobDelete(t *testing.T) {
assert.Equal(t, "789", d.Id())
}

func TestResourceJobUpdate_FailNumWorkersZero(t *testing.T) {
_, err := qa.ResourceFixture{
ID: "789",
Update: true,
Resource: ResourceJob(),
HCL: `new_cluster {
num_workers = 0
spark_version = "7.3.x-scala2.12"
node_type_id = "Standard_DS3_v2"
}
max_concurrent_runs = 1
max_retries = 3
min_retry_interval_millis = 5000
name = "Featurizer New"
retry_on_timeout = true

spark_jar_task {
main_class_name = "com.labs.BarMain"
parameters = ["--cleanup", "full"]
}`,
}.Apply(t)
assert.ErrorContains(t, err, `num_workers may be 0 only for single-node clusters. To create a single node
cluster please include the following configuration in your cluster configuration:

spark_conf = {
"spark.databricks.cluster.profile" : "singleNode"
"spark.master" : "local[*]"
}

custom_tags = {
"ResourceClass" = "SingleNode"
}

Please note that the Databricks Terraform provider cannot detect if the above configuration
is defined in a policy used by the cluster. Please define this in the cluster configuration
itself to create a single node cluster.`)
}

func TestJobsAPIList(t *testing.T) {
qa.HTTPFixturesApply(t, []qa.HTTPFixture{
{
Expand Down