Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 0 additions & 16 deletions clusters/clusters_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,22 +434,6 @@ type Cluster struct {
ClusterMounts []MountInfo `json:"cluster_mount_infos,omitempty" tf:"alias:cluster_mount_info"`
}

// TODO: Remove this once all the resources using clusters are migrated to Go SDK.
// They would then be using Validate(cluster compute.CreateCluster) defined in resource_cluster.go that is a duplicate of this method but uses Go SDK.
func (cluster Cluster) Validate() error {

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function was not being used anywhere.

// TODO: rewrite with CustomizeDiff
if cluster.NumWorkers > 0 || cluster.Autoscale != nil {
return nil
}
profile := cluster.SparkConf["spark.databricks.cluster.profile"]
master := cluster.SparkConf["spark.master"]
resourceClass := cluster.CustomTags["ResourceClass"]
if profile == "singleNode" && strings.HasPrefix(master, "local") && resourceClass == "SingleNode" {
return nil
}
return errors.New(numWorkerErr)
}

// TODO: Remove this once all the resources using clusters are migrated to Go SDK.
// They would then be using ModifyRequestOnInstancePool(cluster *compute.CreateCluster) defined in resource_cluster.go that is a duplicate of this method but uses Go SDK.
// ModifyRequestOnInstancePool helps remove all request fields that should not be submitted when instance pool is selected.
Expand Down
29 changes: 24 additions & 5 deletions clusters/resource_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,19 @@ func ZoneDiffSuppress(k, old, new string, d *schema.ResourceData) bool {
return false
}

// This method is a duplicate of Validate() in clusters/clusters_api.go that uses Go SDK.
// Long term, Validate() in clusters_api.go will be removed once all the resources using clusters are migrated to Go SDK.
func Validate(cluster any) error {
// The clusters API does not provide a great way to create a single node cluster.
// This function validates that a user has their cluster configured correctly IF
// they were trying to create a single node cluster. It does this by:
//
// 1. Asserting the correct cluster tags and spark conf are set when num_workers is 0
// and autoscaling is not enabled.
// 2. Skip the validation if a policy is configured on the cluster. This is done to allow
// users to configure spark_conf, custom_tags, num_workers, etc. in the policy itself.
//
// TODO: Once the clusters resource is migrated to the TF plugin framework, we should
// make this a warning instead of an error.
func ValidateIfSingleNode(cluster any) error {
var hasPolicyConfigured bool
var profile, master, resourceClass string
switch c := cluster.(type) {
case compute.CreateCluster:
Expand All @@ -142,23 +152,32 @@ func Validate(cluster any) error {
profile = c.SparkConf["spark.databricks.cluster.profile"]
master = c.SparkConf["spark.master"]
resourceClass = c.CustomTags["ResourceClass"]
hasPolicyConfigured = c.PolicyId != ""
case compute.EditCluster:
if c.NumWorkers > 0 || c.Autoscale != nil {
return nil
}
profile = c.SparkConf["spark.databricks.cluster.profile"]
master = c.SparkConf["spark.master"]
resourceClass = c.CustomTags["ResourceClass"]
hasPolicyConfigured = c.PolicyId != ""
case compute.ClusterSpec:
if c.NumWorkers > 0 || c.Autoscale != nil {
return nil
}
profile = c.SparkConf["spark.databricks.cluster.profile"]
master = c.SparkConf["spark.master"]
resourceClass = c.CustomTags["ResourceClass"]
hasPolicyConfigured = c.PolicyId != ""
default:
return fmt.Errorf(unsupportedExceptCreateEditClusterSpecErr, cluster, "", "", "")
}
// If a cluster has a policy configured then we skip validation regarding whether
// the single node cluster configuration is valid or not. This is done to allow
// users to configure spark_conf, custom_tags, num_workers, etc. in the policy itself.
if hasPolicyConfigured {
return nil
}
if profile == "singleNode" && strings.HasPrefix(master, "local") && resourceClass == "SingleNode" {
return nil
}
Expand Down Expand Up @@ -445,7 +464,7 @@ func resourceClusterCreate(ctx context.Context, d *schema.ResourceData, c *commo
clusters := w.Clusters
var createClusterRequest compute.CreateCluster
common.DataToStructPointer(d, clusterSchema, &createClusterRequest)
if err := Validate(createClusterRequest); err != nil {
if err := ValidateIfSingleNode(createClusterRequest); err != nil {
return err
}
if err = ModifyRequestOnInstancePool(&createClusterRequest); err != nil {
Expand Down Expand Up @@ -595,7 +614,7 @@ func resourceClusterUpdate(ctx context.Context, d *schema.ResourceData, c *commo

if hasClusterConfigChanged(d) {
log.Printf("[DEBUG] Cluster state has changed!")
if err := Validate(cluster); err != nil {
if err := ValidateIfSingleNode(cluster); err != nil {
return err
}
if err = ModifyRequestOnInstancePool(&cluster); err != nil {
Expand Down
141 changes: 141 additions & 0 deletions clusters/resource_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1863,6 +1863,77 @@ func TestResourceClusterCreate_SingleNodeFail(t *testing.T) {
assert.EqualError(t, err, numWorkerErr)
}

func TestResourceClusterCreate_SingleNodeWithPolicy(t *testing.T) {
d, err := qa.ResourceFixture{
Fixtures: []qa.HTTPFixture{
{
Method: "POST",
Resource: "/api/2.1/clusters/create",
ExpectedRequest: compute.CreateCluster{
NumWorkers: 0,
ClusterName: "Single Node Cluster",
SparkVersion: "7.3.x-scala12",
NodeTypeId: "Standard_F4s",
AutoterminationMinutes: 120,
ForceSendFields: []string{"NumWorkers"},
PolicyId: "policy-123",
},
Response: compute.ClusterDetails{
ClusterId: "abc",
State: compute.StateRunning,
},
},
{
Method: "POST",
Resource: "/api/2.1/clusters/events",
ExpectedRequest: compute.GetEvents{
ClusterId: "abc",
Limit: 1,
Order: compute.GetEventsOrderDesc,
EventTypes: []compute.EventType{compute.EventTypePinned, compute.EventTypeUnpinned},
},
Response: compute.GetEventsResponse{
Events: []compute.ClusterEvent{},
TotalCount: 0,
},
},
{
Method: "GET",
ReuseRequest: true,
Resource: "/api/2.1/clusters/get?cluster_id=abc",
Response: compute.ClusterDetails{
ClusterId: "abc",
ClusterName: "Single Node Cluster",
SparkVersion: "7.3.x-scala12",
NodeTypeId: "Standard_F4s",
AutoterminationMinutes: 120,
State: compute.StateRunning,
PolicyId: "policy-123",
},
},
{
Method: "GET",
Resource: "/api/2.0/libraries/cluster-status?cluster_id=abc",
Response: compute.ClusterLibraryStatuses{
LibraryStatuses: []compute.LibraryFullStatus{},
},
},
},
Create: true,
Resource: ResourceCluster(),
State: map[string]any{
"autotermination_minutes": 120,
"cluster_name": "Single Node Cluster",
"spark_version": "7.3.x-scala12",
"node_type_id": "Standard_F4s",
"is_pinned": false,
"policy_id": "policy-123",
},
}.Apply(t)
assert.NoError(t, err)
assert.Equal(t, 0, d.Get("num_workers"))
}

func TestResourceClusterCreate_NegativeNumWorkers(t *testing.T) {
_, err := qa.ResourceFixture{
Create: true,
Expand Down Expand Up @@ -1902,6 +1973,76 @@ func TestResourceClusterUpdate_FailNumWorkersZero(t *testing.T) {
assert.EqualError(t, err, numWorkerErr)
}

func TestResourceClusterUpdate_NumWorkersZeroWithPolicy(t *testing.T) {
_, err := qa.ResourceFixture{
Fixtures: []qa.HTTPFixture{
{
Method: "GET",
Resource: "/api/2.1/clusters/get?cluster_id=abc",
ReuseRequest: true,
Response: compute.ClusterDetails{
ClusterId: "abc",
NumWorkers: 0,
ClusterName: "Shared Autoscaling",
SparkVersion: "7.1-scala12",
NodeTypeId: "i3.xlarge",
AutoterminationMinutes: 15,
State: compute.StateTerminated,
PolicyId: "policy-123",
},
},
{
Method: "POST",
Resource: "/api/2.1/clusters/events",
ExpectedRequest: compute.GetEvents{
ClusterId: "abc",
Limit: 1,
Order: compute.GetEventsOrderDesc,
EventTypes: []compute.EventType{compute.EventTypePinned, compute.EventTypeUnpinned},
},
Response: compute.GetEventsResponse{
Events: []compute.ClusterEvent{},
TotalCount: 0,
},
},
{
Method: "POST",
Resource: "/api/2.1/clusters/edit",
ExpectedRequest: compute.ClusterDetails{
AutoterminationMinutes: 15,
ClusterId: "abc",
NumWorkers: 0,
ClusterName: "Shared Autoscaling",
SparkVersion: "7.1-scala12",
NodeTypeId: "i3.xlarge",
PolicyId: "policy-123",
ForceSendFields: []string{"NumWorkers"},
},
},
},
ID: "abc",
Update: true,
Resource: ResourceCluster(),
InstanceState: map[string]string{
"autotermination_minutes": "15",
"cluster_name": "Shared Autoscaling",
"spark_version": "7.1-scala12",
"node_type_id": "i3.xlarge",
"num_workers": "100",
"policy_id": "policy-123",
},
State: map[string]any{
"autotermination_minutes": 15,
"cluster_name": "Shared Autoscaling",
"spark_version": "7.1-scala12",
"node_type_id": "i3.xlarge",
"num_workers": 0,
"policy_id": "policy-123",
},
}.Apply(t)
assert.NoError(t, err)
}

func TestModifyClusterRequestAws(t *testing.T) {
c := compute.CreateCluster{
InstancePoolId: "a",
Expand Down
2 changes: 1 addition & 1 deletion jobs/jobs_api_go_sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (c controlRunStateLifecycleManagerGoSdk) OnUpdate(ctx context.Context) erro
}

func updateAndValidateJobClusterSpec(clusterSpec *compute.ClusterSpec, d *schema.ResourceData) error {
err := clusters.Validate(*clusterSpec)
err := clusters.ValidateIfSingleNode(*clusterSpec)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions jobs/resource_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -1072,12 +1072,12 @@ func ResourceJob() common.Resource {
if task.NewCluster == nil {
continue
}
if err := clusters.Validate(*task.NewCluster); err != nil {
if err := clusters.ValidateIfSingleNode(*task.NewCluster); err != nil {
return fmt.Errorf("task %s invalid: %w", task.TaskKey, err)
}
}
if js.NewCluster != nil {
if err := clusters.Validate(*js.NewCluster); err != nil {
if err := clusters.ValidateIfSingleNode(*js.NewCluster); err != nil {
return fmt.Errorf("invalid job cluster: %w", err)
}
}
Expand Down
102 changes: 102 additions & 0 deletions jobs/resource_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2073,6 +2073,55 @@ is defined in a policy used by the cluster. Please define this in the cluster co
itself to create a single node cluster.`)
}

func TestResourceJobCreate_SingleNodeJobClustersWithPolicy(t *testing.T) {
d, err := qa.ResourceFixture{
Fixtures: []qa.HTTPFixture{
{
Method: "POST",
Resource: "/api/2.0/jobs/create",
ExpectedRequest: JobSettings{
Name: "single node cluster",
MaxConcurrentRuns: 1,
Libraries: []compute.Library{
{
Jar: "dbfs://ff/gg/hh.jar",
},
},
NewCluster: &clusters.Cluster{
NumWorkers: 0,
PolicyID: "policy-123",
SparkVersion: "7.3.x-scala2.12",
},
},
Response: Job{
JobID: 17,
},
},
{
Method: "GET",
Resource: "/api/2.0/jobs/get?job_id=17",
Response: Job{
Settings: &JobSettings{},
},
},
},
Create: true,
Resource: ResourceJob(),
HCL: `
name = "single node cluster"
new_cluster {
spark_version = "7.3.x-scala2.12"
policy_id = "policy-123"
}
max_concurrent_runs = 1
library {
jar = "dbfs://ff/gg/hh.jar"
}`,
}.Apply(t)
assert.NoError(t, err)
assert.Equal(t, "17", d.Id())
}

func TestResourceJobRead(t *testing.T) {
d, err := qa.ResourceFixture{
Fixtures: []qa.HTTPFixture{
Expand Down Expand Up @@ -2976,6 +3025,59 @@ is defined in a policy used by the cluster. Please define this in the cluster co
itself to create a single node cluster.`)
}

func TestResourceJobUpdate_SingleNodeJobClustersWithPolicy(t *testing.T) {
d, err := qa.ResourceFixture{
ID: "17",
Fixtures: []qa.HTTPFixture{
{
Method: "POST",
Resource: "/api/2.0/jobs/reset",
ExpectedRequest: UpdateJobRequest{
JobID: 17,
NewSettings: &JobSettings{
Name: "single node cluster",
MaxConcurrentRuns: 1,
Libraries: []compute.Library{
{
Jar: "dbfs://ff/gg/hh.jar",
},
},
NewCluster: &clusters.Cluster{
NumWorkers: 0,
PolicyID: "policy-123",
SparkVersion: "7.3.x-scala2.12",
},
},
},
Response: Job{
JobID: 17,
},
},
{
Method: "GET",
Resource: "/api/2.0/jobs/get?job_id=17",
Response: Job{
Settings: &JobSettings{},
},
},
},
Update: true,
Resource: ResourceJob(),
HCL: `
name = "single node cluster"
new_cluster {
spark_version = "7.3.x-scala2.12"
policy_id = "policy-123"
}
max_concurrent_runs = 1
library {
jar = "dbfs://ff/gg/hh.jar"
}`,
}.Apply(t)
assert.NoError(t, err)
assert.Equal(t, "17", d.Id())
}

func TestJobsAPIList(t *testing.T) {
qa.HTTPFixturesApply(t, []qa.HTTPFixture{
{
Expand Down