Skip to content
87 changes: 61 additions & 26 deletions bundle/phases/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,55 +19,90 @@ import (
"github.com/databricks/cli/bundle/scripts"
"github.com/databricks/cli/libs/cmdio"
terraformlib "github.com/databricks/cli/libs/terraform"
tfjson "github.com/hashicorp/terraform-json"
)

func approvalForUcSchemaDelete(ctx context.Context, b *bundle.Bundle) (bool, error) {
tf := b.Terraform
if tf == nil {
return false, fmt.Errorf("terraform not initialized")
}

// read plan file
plan, err := tf.ShowPlanFile(ctx, b.Plan.Path)
if err != nil {
return false, err
}

actions := make([]terraformlib.Action, 0)
for _, rc := range plan.ResourceChanges {
// We only care about destructive actions on UC schema resources.
if rc.Type != "databricks_schema" {
func parseTerraformActions(changes []*tfjson.ResourceChange, toInclude func(typ string, actions tfjson.Actions) bool) []terraformlib.Action {
res := make([]terraformlib.Action, 0)
for _, rc := range changes {
if !toInclude(rc.Type, rc.Change.Actions) {
continue
}

var actionType terraformlib.ActionType

switch {
case rc.Change.Actions.Delete():
actionType = terraformlib.ActionTypeDelete
case rc.Change.Actions.Replace():
actionType = terraformlib.ActionTypeRecreate
default:
// We don't need a prompt for non-destructive actions like creating
// or updating a schema.
// No use case for other action types yet.
continue
}

actions = append(actions, terraformlib.Action{
res = append(res, terraformlib.Action{
Action: actionType,
ResourceType: rc.Type,
ResourceName: rc.Name,
})
}

// No restricted actions planned. No need for approval.
if len(actions) == 0 {
return res
}

func approvalForDeploy(ctx context.Context, b *bundle.Bundle) (bool, error) {
tf := b.Terraform
if tf == nil {
return false, fmt.Errorf("terraform not initialized")
}

// read plan file
plan, err := tf.ShowPlanFile(ctx, b.Plan.Path)
if err != nil {
return false, err
}

schemaActions := parseTerraformActions(plan.ResourceChanges, func(typ string, actions tfjson.Actions) bool {
// Filter in only UC schema resources.
if typ != "databricks_schema" {
return false
}

// We only display prompts for destructive actions like deleting or
// recreating a schema.
return actions.Delete() || actions.Replace()
})

dltActions := parseTerraformActions(plan.ResourceChanges, func(typ string, actions tfjson.Actions) bool {
// Filter in only DLT pipeline resources.
if typ != "databricks_pipeline" {
return false
}

// Recreating DLT pipeline leads to metadata loss and for a transient period
// the underling tables will be unavailable.
return actions.Replace()
Comment thread
shreyas-goenka marked this conversation as resolved.
Outdated
})

// We don't need to display any prompts in this case.
if len(dltActions) == 0 && len(schemaActions) == 0 {
return true, nil
}

cmdio.LogString(ctx, "The following UC schemas will be deleted or recreated. Any underlying data may be lost:")
for _, action := range actions {
cmdio.Log(ctx, action)
// One or more UC schema resources will be deleted or recreated.
if len(schemaActions) != 0 {
cmdio.LogString(ctx, "The following UC schemas will be deleted or recreated. Any underlying data may be lost:")
for _, action := range schemaActions {
cmdio.Log(ctx, action)
}
}

// One or more DLT pipelines is being recreated.
if len(dltActions) != 0 {
cmdio.LogString(ctx, "The following DLT pipelines will be recreated. Underlying tables will be unavailable for a transient period until the newly recreated pipelines are run once successfully. History of previous pipeline update runs will be lost because of recreation:")
Comment thread
shreyas-goenka marked this conversation as resolved.
Outdated
for _, action := range dltActions {
cmdio.Log(ctx, action)
}
}

if b.AutoApprove {
Expand Down Expand Up @@ -126,7 +161,7 @@ func Deploy() bundle.Mutator {
terraform.CheckRunningResource(),
terraform.Plan(terraform.PlanGoal("deploy")),
bundle.If(
approvalForUcSchemaDelete,
approvalForDeploy,
deployCore,
bundle.LogString("Deployment cancelled!"),
),
Expand Down
67 changes: 67 additions & 0 deletions bundle/phases/deploy_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package phases

import (
"testing"

terraformlib "github.com/databricks/cli/libs/terraform"
tfjson "github.com/hashicorp/terraform-json"
"github.com/stretchr/testify/assert"
)

func TestParseTerraformActions(t *testing.T) {
changes := []*tfjson.ResourceChange{
{
Type: "databricks_pipeline",
Change: &tfjson.Change{
Actions: tfjson.Actions{tfjson.ActionCreate},
},
Name: "create pipeline",
},
{
Type: "databricks_pipeline",
Change: &tfjson.Change{
Actions: tfjson.Actions{tfjson.ActionDelete},
},
Name: "delete pipeline",
},
{
Type: "databricks_pipeline",
Change: &tfjson.Change{
Actions: tfjson.Actions{tfjson.ActionDelete, tfjson.ActionCreate},
},
Name: "recreate pipeline",
},
{
Type: "databricks_whatever",
Change: &tfjson.Change{
Actions: tfjson.Actions{tfjson.ActionDelete, tfjson.ActionCreate},
},
Name: "recreate whatever",
},
}

res := parseTerraformActions(changes, func(typ string, actions tfjson.Actions) bool {
if typ != "databricks_pipeline" {
return false
}

if actions.Delete() || actions.Replace() {
return true
}

return false
})

assert.Equal(t, []terraformlib.Action{
{
Action: terraformlib.ActionTypeDelete,
ResourceType: "databricks_pipeline",
ResourceName: "delete pipeline",
},
{
Action: terraformlib.ActionTypeRecreate,
ResourceType: "databricks_pipeline",
ResourceName: "recreate pipeline",
},
}, res)
}
6 changes: 6 additions & 0 deletions cmd/root/progress_logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ func (f *progressLoggerFlag) resolveModeDefault(format flags.ProgressLogFormat)
}

func (f *progressLoggerFlag) initializeContext(ctx context.Context) (context.Context, error) {
// No need to initialize the logger if it's already set in the context. This
// happens in unit tests where the logger is setup as a fixture.
if _, ok := cmdio.FromContext(ctx); ok {
return ctx, nil
}

if f.log.level.String() != "disabled" && f.log.file.String() == "stderr" &&
f.ProgressLogFormat == flags.ModeInplace {
return nil, fmt.Errorf("inplace progress logging cannot be used when log-file is stderr")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"properties": {
"unique_id": {
"type": "string",
"description": "Unique ID for the schema and pipeline names"
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
bundle:
name: "bundle-playground"

variables:
catalog:
description: The catalog the DLT pipeline should use.
default: main


resources:
pipelines:
foo:
name: test-pipeline-{{.unique_id}}
libraries:
- notebook:
path: ./nb.sql
development: true
catalog: ${var.catalog}

include:
- "*.yml"

targets:
development:
default: true
2 changes: 2 additions & 0 deletions internal/bundle/bundles/recreate_pipeline/template/nb.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Databricks notebook source
select 1
38 changes: 37 additions & 1 deletion internal/bundle/deploy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,43 @@ func TestAccBundleDeployUcSchemaFailsWithoutAutoApprove(t *testing.T) {
t.Setenv("BUNDLE_ROOT", bundleRoot)
t.Setenv("TERM", "dumb")
c := internal.NewCobraTestRunnerWithContext(t, ctx, "bundle", "deploy", "--force-lock")
stdout, _, err := c.Run()
stdout, stderr, err := c.Run()

assert.EqualError(t, err, root.ErrAlreadyPrinted.Error())
assert.Contains(t, stderr.String(), "The following UC schemas will be deleted or recreated. Any underlying data may be lost:\n delete schema bar")
assert.Contains(t, stdout.String(), "the deployment requires destructive actions, but current console does not support prompting. Please specify --auto-approve if you would like to skip prompts and proceed")
}

func TestAccBundlePipelineRecreateWithoutAutoApprove(t *testing.T) {
ctx, wt := acc.UcWorkspaceTest(t)
w := wt.W
uniqueId := uuid.New().String()

bundleRoot, err := initTestTemplate(t, ctx, "recreate_pipeline", map[string]any{
"unique_id": uniqueId,
})
require.NoError(t, err)

err = deployBundle(t, ctx, bundleRoot)
require.NoError(t, err)

t.Cleanup(func() {
destroyBundle(t, ctx, bundleRoot)
})

// Assert the pipeline is created
pipelineName := "test-pipeline-" + uniqueId
pipeline, err := w.Pipelines.GetByName(ctx, pipelineName)
require.NoError(t, err)
require.Equal(t, pipelineName, pipeline.Name)

// Redeploy the bundle, pointing the DLT pipeline to a different UC catalog.
t.Setenv("BUNDLE_ROOT", bundleRoot)
t.Setenv("TERM", "dumb")
c := internal.NewCobraTestRunnerWithContext(t, ctx, "bundle", "deploy", "--force-lock", "--var=\"catalog=whatever\"")
stdout, stderr, err := c.Run()

assert.EqualError(t, err, root.ErrAlreadyPrinted.Error())
assert.Contains(t, stderr.String(), "The following DLT pipelines will be recreated. Underlying tables will be unavailable for a transient period, until the newly recreated pipeline is run once successfully. History of previous pipeline update runs will be lost as a result of the recreation:\n recreate pipeline foo")
assert.Contains(t, stdout.String(), "the deployment requires destructive actions, but current console does not support prompting. Please specify --auto-approve if you would like to skip prompts and proceed")
}