Skip to content

[1/n] Add should persist column to artf table #1390

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jun 6, 2023
Merged
2 changes: 1 addition & 1 deletion scripts/install_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
ui_directory = join(os.environ["HOME"], ".aqueduct", "ui")

# Make sure to update this if there is any schema change we want to include in the upgrade.
SCHEMA_VERSION = "27"
SCHEMA_VERSION = "28"


def execute_command(args, cwd=None):
Expand Down
7 changes: 7 additions & 0 deletions src/golang/cmd/migrator/migrator/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
_000025 "github.com/aqueducthq/aqueduct/cmd/migrator/versions/000025_add_storage_migration_table"
_000026 "github.com/aqueducthq/aqueduct/cmd/migrator/versions/000026_drop_integration_validated_column"
_000027 "github.com/aqueducthq/aqueduct/cmd/migrator/versions/000027_rename_integrations_table"
_000028 "github.com/aqueducthq/aqueduct/cmd/migrator/versions/000028_add_artifact_should_persist_column"
"github.com/aqueducthq/aqueduct/lib/database"
)

Expand Down Expand Up @@ -198,4 +199,10 @@ func init() {
downPostgres: _000027.DownPostgres,
name: "rename integration table to resource",
}

registeredMigrations[28] = &migration{
upPostgres: _000028.UpPostgres, upSqlite: _000028.UpSqlite,
downPostgres: _000028.DownPostgres,
name: "add should_persist column to artifact table",
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package _000028_add_artifact_should_persist_column

// downPostgresScript is the Postgres rollback SQL for migration 000028.
// It drops the should_persist column from the artifact table; the IF EXISTS
// clause makes the statement a no-op when the column is already absent.
const downPostgresScript = `
ALTER TABLE artifact DROP COLUMN IF EXISTS should_persist;
`
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package _000028_add_artifact_should_persist_column

import (
"context"

"github.com/aqueducthq/aqueduct/lib/database"
)

// UpPostgres applies migration 000028 on a Postgres database by executing
// upPostgresScript through db.Execute. It returns whatever error db.Execute
// reports.
func UpPostgres(ctx context.Context, db database.Database) error {
return db.Execute(ctx, upPostgresScript)
}

// UpSqlite applies migration 000028 on a SQLite database by executing
// upSqliteScript through db.Execute. It returns whatever error db.Execute
// reports.
func UpSqlite(ctx context.Context, db database.Database) error {
return db.Execute(ctx, upSqliteScript)
}

// DownPostgres rolls back migration 000028 on a Postgres database by
// executing downPostgresScript through db.Execute. It returns whatever error
// db.Execute reports.
func DownPostgres(ctx context.Context, db database.Database) error {
return db.Execute(ctx, downPostgresScript)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package _000028_add_artifact_should_persist_column

// upPostgresScript is the Postgres forward SQL for migration 000028.
// It adds a should_persist BOOL column to the artifact table, declared
// NOT NULL with a default of TRUE.
const upPostgresScript = `
ALTER TABLE artifact
ADD COLUMN should_persist BOOL DEFAULT TRUE NOT NULL;
`
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package _000028_add_artifact_should_persist_column

// upSqliteScript is the SQLite forward SQL for migration 000028.
// It adds a should_persist BOOL column to the artifact table, declared
// NOT NULL with a default of TRUE.
// NOTE(review): the TRUE keyword presumably requires a reasonably recent
// SQLite build — confirm against the SQLite version bundled by the driver.
const upSqliteScript = `
ALTER TABLE artifact
ADD COLUMN should_persist BOOL DEFAULT TRUE NOT NULL;
`
19 changes: 11 additions & 8 deletions src/golang/lib/models/artifact.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,20 @@ const (
ArtifactTable = "artifact"

// Artifact column names
ArtifactID = "id"
ArtifactName = "name"
ArtifactDescription = "description"
ArtifactType = "type"
ArtifactID = "id"
ArtifactName = "name"
ArtifactDescription = "description"
ArtifactType = "type"
ArtifactShouldPersist = "should_persist"
)

// An Artifact maps to the artifact table.
type Artifact struct {
ID uuid.UUID `db:"id" json:"id"`
Name string `db:"name" json:"name"`
Description string `db:"description" json:"description"`
Type shared.ArtifactType `db:"type" json:"type"`
ID uuid.UUID `db:"id" json:"id"`
Name string `db:"name" json:"name"`
Description string `db:"description" json:"description"`
Type shared.ArtifactType `db:"type" json:"type"`
ShouldPersist bool `db:"should_persist" json:"should_persist"`
}

// ArtifactCols returns a comma-separated string of all Artifact columns.
Expand All @@ -37,6 +39,7 @@ func AllArtifactCols() []string {
ArtifactName,
ArtifactDescription,
ArtifactType,
ArtifactShouldPersist,
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/golang/lib/models/schema_version.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ const (
// This is the source of truth for the required schema version
// for both the server and executor. This value MUST be updated
// when a new schema change is added.
CurrentSchemaVersion = 27
CurrentSchemaVersion = 28

SchemaVersionTable = "schema_version"

Expand Down
11 changes: 6 additions & 5 deletions src/golang/lib/models/views/artifact_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ const (
)

type ArtifactNode struct {
ID uuid.UUID `db:"id" json:"id"`
DagID uuid.UUID `db:"dag_id" json:"dag_id"`
Name string `db:"name" json:"name"`
Description string `db:"description" json:"description"`
Type shared.ArtifactType `db:"type" json:"type"`
ID uuid.UUID `db:"id" json:"id"`
DagID uuid.UUID `db:"dag_id" json:"dag_id"`
Name string `db:"name" json:"name"`
Description string `db:"description" json:"description"`
Type shared.ArtifactType `db:"type" json:"type"`
ShouldPersist bool `db:"should_persist" json:"should_persist"`

Input uuid.UUID `db:"input" json:"input"`
Outputs shared.NullableIndexedList[uuid.UUID] `db:"outputs" json:"outputs"`
Expand Down
37 changes: 20 additions & 17 deletions src/golang/lib/models/views/merged_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,28 @@ import (
)

const (
OperatorWithArtifactNodeView = "merged_node"
OperatorWithArtifactNodeID = "id"
OperatorWithArtifactNodeDagID = "dag_id"
OperatorWithArtifactNodeArtifactID = "artifact_id"
OperatorWithArtifactNodeName = "name"
OperatorWithArtifactNodeDescription = "description"
OperatorWithArtifactNodeSpec = "spec"
OperatorWithArtifactNodeType = "type"
OperatorWithArtifactNodeInputs = "inputs"
OperatorWithArtifactNodeOutputs = "outputs"
OperatorWithArtifactNodeView = "merged_node"
OperatorWithArtifactNodeID = "id"
OperatorWithArtifactNodeDagID = "dag_id"
OperatorWithArtifactNodeArtifactID = "artifact_id"
OperatorWithArtifactNodeName = "name"
OperatorWithArtifactNodeDescription = "description"
OperatorWithArtifactNodeSpec = "spec"
OperatorWithArtifactNodeType = "type"
OperatorWithArtifactNodeInputs = "inputs"
OperatorWithArtifactNodeOutputs = "outputs"
OperatorWithArtifactNodeShouldPersist = "should_persist"
)

type OperatorWithArtifactNode struct {
ID uuid.UUID `db:"id" json:"id"`
DagID uuid.UUID `db:"dag_id" json:"dag_id"`
ArtifactID uuid.UUID `db:"artifact_id" json:"artifact_id"`
Name string `db:"name" json:"name"`
Description string `db:"description" json:"description"`
Spec operator.Spec `db:"spec" json:"spec"`
Type shared.ArtifactType `db:"type" json:"type"`
ID uuid.UUID `db:"id" json:"id"`
DagID uuid.UUID `db:"dag_id" json:"dag_id"`
ArtifactID uuid.UUID `db:"artifact_id" json:"artifact_id"`
Name string `db:"name" json:"name"`
Description string `db:"description" json:"description"`
Spec operator.Spec `db:"spec" json:"spec"`
Type shared.ArtifactType `db:"type" json:"type"`
ShouldPersist bool `db:"should_persist" json:"should_persist"`

Inputs shared.NullableIndexedList[uuid.UUID] `db:"inputs" json:"inputs"`
Outputs shared.NullableIndexedList[uuid.UUID] `db:"outputs" json:"outputs"`
Expand Down Expand Up @@ -62,5 +64,6 @@ func allOperatorWithArtifactNodeCols() []string {
OperatorWithArtifactNodeType,
OperatorWithArtifactNodeInputs,
OperatorWithArtifactNodeOutputs,
OperatorWithArtifactNodeShouldPersist,
}
}
32 changes: 22 additions & 10 deletions src/golang/lib/repos/sqlite/artifact.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const artifactNodeViewSubQuery = `
workflow_dag.id AS dag_id,
artifact.name AS name,
artifact.description AS description,
artifact.should_persist AS should_persist,
artifact.type as type,
CAST( json_group_array( -- Group to_ids and idx into one array
json_object(
Expand All @@ -43,6 +44,7 @@ const artifactNodeViewSubQuery = `
workflow_dag.id AS dag_id,
artifact.name AS name,
artifact.description AS description,
artifact.should_persist AS should_persist,
artifact.type as type,
workflow_dag_edge.from_id AS input
FROM
Expand All @@ -56,6 +58,7 @@ const artifactNodeViewSubQuery = `
artf_with_input.dag_id AS dag_id,
artf_with_input.name AS name,
artf_with_input.description AS description,
artf_with_input.should_persist AS should_persist,
artf_with_input.type AS type,
artf_with_outputs.outputs AS outputs,
artf_with_input.input AS input
Expand Down Expand Up @@ -209,11 +212,12 @@ func (*artifactReader) GetMetricsByUpstreamArtifactBatch(

type artifactWithUpstreamID struct {
// copy of artifact
ID uuid.UUID `db:"id"`
Name string `db:"name"`
Description string `db:"description"`
Type shared.ArtifactType `db:"type"`
UpstreamID uuid.UUID `db:"upstream_id"`
ID uuid.UUID `db:"id"`
Name string `db:"name"`
Description string `db:"description"`
Type shared.ArtifactType `db:"type"`
ShouldPersist bool `db:"should_persist"`
UpstreamID uuid.UUID `db:"upstream_id"`
}

var queryRows []artifactWithUpstreamID
Expand All @@ -225,10 +229,11 @@ func (*artifactReader) GetMetricsByUpstreamArtifactBatch(
results := make(map[uuid.UUID][]models.Artifact, len(queryRows))
for _, queryRow := range queryRows {
results[queryRow.UpstreamID] = append(results[queryRow.UpstreamID], models.Artifact{
ID: queryRow.ID,
Name: queryRow.Name,
Description: queryRow.Description,
Type: queryRow.Type,
ID: queryRow.ID,
Name: queryRow.Name,
Description: queryRow.Description,
Type: queryRow.Type,
ShouldPersist: queryRow.ShouldPersist,
})
}

Expand Down Expand Up @@ -264,6 +269,7 @@ func (*artifactWriter) Create(
models.ArtifactName,
models.ArtifactDescription,
models.ArtifactType,
models.ArtifactShouldPersist,
}
query := DB.PrepareInsertWithReturnAllStmt(models.ArtifactTable, cols, models.ArtifactCols())

Expand All @@ -272,7 +278,13 @@ func (*artifactWriter) Create(
return nil, err
}

args := []interface{}{ID, name, description, artifactType}
args := []interface{}{
ID,
name,
description,
artifactType,
true, // should_persist
}
return getArtifact(ctx, DB, query, args...)
}

Expand Down
1 change: 1 addition & 0 deletions src/golang/lib/repos/sqlite/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ var mergedNodeViewSubQuery = fmt.Sprintf(`
operator_node.inputs AS inputs,
artifact_node.id AS artifact_id,
artifact_node.type AS type,
artifact_node.should_persist AS should_persist,
artifact_node.outputs AS outputs
FROM
operator_node LEFT JOIN
Expand Down
7 changes: 4 additions & 3 deletions src/golang/lib/repos/tests/artifact.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,10 @@ func (ts *TestSuite) TestArtifact_Create() {
artifactType := randArtifactType()

expectedArtifact := &models.Artifact{
Name: name,
Description: description,
Type: artifactType,
Name: name,
Description: description,
Type: artifactType,
ShouldPersist: true,
}

actualArtifact, err := ts.artifact.Create(ts.ctx, name, description, artifactType, ts.DB)
Expand Down
2 changes: 1 addition & 1 deletion src/python/bin/aqueduct
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import yaml
from packaging.version import parse as parse_version
from tqdm import tqdm

SCHEMA_VERSION = "27"
SCHEMA_VERSION = "28"
CHUNK_SIZE = 4096

# Connector Package Version Bounds
Expand Down