diff --git a/src/golang/cmd/server/handler/create_table.go b/src/golang/cmd/server/handler/create_table.go index 6c14a631f..313569165 100644 --- a/src/golang/cmd/server/handler/create_table.go +++ b/src/golang/cmd/server/handler/create_table.go @@ -9,13 +9,13 @@ import ( "github.com/aqueducthq/aqueduct/cmd/server/routes" "github.com/aqueducthq/aqueduct/lib/collections/integration" + "github.com/aqueducthq/aqueduct/lib/collections/operator/connector" "github.com/aqueducthq/aqueduct/lib/collections/shared" aq_context "github.com/aqueducthq/aqueduct/lib/context" "github.com/aqueducthq/aqueduct/lib/database" "github.com/aqueducthq/aqueduct/lib/job" "github.com/aqueducthq/aqueduct/lib/storage" "github.com/aqueducthq/aqueduct/lib/vault" - "github.com/aqueducthq/aqueduct/lib/workflow/operator/connector" "github.com/aqueducthq/aqueduct/lib/workflow/operator/connector/auth" "github.com/aqueducthq/aqueduct/lib/workflow/utils" "github.com/dropbox/godropbox/errors" diff --git a/src/golang/cmd/server/handler/discover.go b/src/golang/cmd/server/handler/discover.go index 18b45e017..18261ad03 100644 --- a/src/golang/cmd/server/handler/discover.go +++ b/src/golang/cmd/server/handler/discover.go @@ -9,12 +9,12 @@ import ( "github.com/aqueducthq/aqueduct/cmd/server/queries" "github.com/aqueducthq/aqueduct/cmd/server/routes" "github.com/aqueducthq/aqueduct/lib/collections/integration" + "github.com/aqueducthq/aqueduct/lib/collections/operator/connector" "github.com/aqueducthq/aqueduct/lib/collections/shared" aq_context "github.com/aqueducthq/aqueduct/lib/context" "github.com/aqueducthq/aqueduct/lib/database" "github.com/aqueducthq/aqueduct/lib/job" "github.com/aqueducthq/aqueduct/lib/vault" - "github.com/aqueducthq/aqueduct/lib/workflow/operator/connector" "github.com/aqueducthq/aqueduct/lib/workflow/operator/connector/auth" workflow_utils "github.com/aqueducthq/aqueduct/lib/workflow/utils" "github.com/dropbox/godropbox/errors" diff --git a/src/golang/cmd/server/handler/get_artifact_versions.go b/src/golang/cmd/server/handler/get_artifact_versions.go index 2a4446376..f3cc7a9e5 100644 --- a/src/golang/cmd/server/handler/get_artifact_versions.go +++ b/src/golang/cmd/server/handler/get_artifact_versions.go @@ -5,10 +5,10 @@ import ( "net/http" "github.com/aqueducthq/aqueduct/cmd/server/queries" + "github.com/aqueducthq/aqueduct/lib/collections/operator/connector" "github.com/aqueducthq/aqueduct/lib/collections/shared" aq_context "github.com/aqueducthq/aqueduct/lib/context" "github.com/aqueducthq/aqueduct/lib/database" - "github.com/aqueducthq/aqueduct/lib/workflow/operator/connector" "github.com/dropbox/godropbox/errors" "github.com/google/uuid" ) diff --git a/src/golang/cmd/server/handler/get_workflow.go b/src/golang/cmd/server/handler/get_workflow.go index aa1111585..16a8abdd4 100644 --- a/src/golang/cmd/server/handler/get_workflow.go +++ b/src/golang/cmd/server/handler/get_workflow.go @@ -40,7 +40,7 @@ type getWorkflowArgs struct { type getWorkflowResponse struct { // a map of workflow dags keyed by their IDs - WorkflowDags map[uuid.UUID]*workflow_dag.WorkflowDag `json:"workflow_dags"` + WorkflowDags map[uuid.UUID]*workflow_dag.DBWorkflowDag `json:"workflow_dags"` // a list of dag results. Each result's `workflow_dag_id` field correspond to the WorkflowDagResults []workflowDagResult `json:"workflow_dag_results"` // a list of auth0Ids associated with workflow watchers @@ -116,7 +116,7 @@ func (h *GetWorkflowHandler) Perform(ctx context.Context, interfaceArgs interfac return emptyResp, http.StatusInternalServerError, errors.Wrap(err, "Unexpected error occurred when retrieving workflow.") } - workflowDags := make(map[uuid.UUID]*workflow_dag.WorkflowDag, len(dbWorkflowDags)) + workflowDags := make(map[uuid.UUID]*workflow_dag.DBWorkflowDag, len(dbWorkflowDags)) for _, dbWorkflowDag := range dbWorkflowDags { constructedDag, err := workflow_utils.ReadWorkflowDagFromDatabase( ctx, diff --git a/src/golang/cmd/server/handler/preview.go b/src/golang/cmd/server/handler/preview.go index 132eb6d92..97c9393f9 100644 --- a/src/golang/cmd/server/handler/preview.go +++ b/src/golang/cmd/server/handler/preview.go @@ -226,7 +226,7 @@ func deserializeArtifactResponses( ctx context.Context, workflowStoragePaths *utils.WorkflowStoragePaths, storageConfig *shared.StorageConfig, - dagArtifacts map[uuid.UUID]artifact.Artifact, + dagArtifacts map[uuid.UUID]artifact.DBArtifact, artifactsToSkipFetch map[uuid.UUID]bool, ) (map[uuid.UUID]previewArtifactResponse, error) { responses := make(map[uuid.UUID]previewArtifactResponse, len(workflowStoragePaths.ArtifactPaths)) diff --git a/src/golang/cmd/server/handler/preview_table.go b/src/golang/cmd/server/handler/preview_table.go index 65a145856..39963b965 100644 --- a/src/golang/cmd/server/handler/preview_table.go +++ b/src/golang/cmd/server/handler/preview_table.go @@ -8,13 +8,13 @@ import ( "github.com/aqueducthq/aqueduct/cmd/server/routes" "github.com/aqueducthq/aqueduct/lib/collections/integration" + "github.com/aqueducthq/aqueduct/lib/collections/operator/connector" "github.com/aqueducthq/aqueduct/lib/collections/shared" aq_context "github.com/aqueducthq/aqueduct/lib/context" "github.com/aqueducthq/aqueduct/lib/database" "github.com/aqueducthq/aqueduct/lib/job" "github.com/aqueducthq/aqueduct/lib/storage" "github.com/aqueducthq/aqueduct/lib/vault" - "github.com/aqueducthq/aqueduct/lib/workflow/operator/connector" "github.com/aqueducthq/aqueduct/lib/workflow/scheduler" workflow_utils "github.com/aqueducthq/aqueduct/lib/workflow/utils" "github.com/dropbox/godropbox/errors" diff --git a/src/golang/cmd/server/handler/register_workflow.go b/src/golang/cmd/server/handler/register_workflow.go index 58e1e382c..f451e8ba0 100644 --- a/src/golang/cmd/server/handler/register_workflow.go +++ b/src/golang/cmd/server/handler/register_workflow.go @@ -62,7 +62,7 @@ type RegisterWorkflowHandler struct { type registerWorkflowArgs struct { *aq_context.AqContext - workflowDag *workflow_dag.WorkflowDag + workflowDag *workflow_dag.DBWorkflowDag operatorIdToFileContents map[uuid.UUID][]byte // Whether this is a registering a new workflow or updating an existing one. diff --git a/src/golang/cmd/server/request/dag.go b/src/golang/cmd/server/request/dag.go index 80a99a152..ceac008b2 100644 --- a/src/golang/cmd/server/request/dag.go +++ b/src/golang/cmd/server/request/dag.go @@ -7,10 +7,10 @@ import ( "github.com/aqueducthq/aqueduct/cmd/server/routes" "github.com/aqueducthq/aqueduct/lib/collections/operator" + "github.com/aqueducthq/aqueduct/lib/collections/operator/function" "github.com/aqueducthq/aqueduct/lib/collections/shared" "github.com/aqueducthq/aqueduct/lib/collections/workflow_dag" "github.com/aqueducthq/aqueduct/lib/workflow/operator/connector/github" - "github.com/aqueducthq/aqueduct/lib/workflow/operator/function" "github.com/dropbox/godropbox/errors" "github.com/google/uuid" ) @@ -18,7 +18,7 @@ import ( const dagKey = "dag" type DagSummary struct { - Dag *workflow_dag.WorkflowDag + Dag *workflow_dag.DBWorkflowDag // Extract the operator contents from the request body FileContentsByOperatorUUID map[uuid.UUID][]byte @@ -40,7 +40,7 @@ func ParseDagSummaryFromRequest( return nil, http.StatusBadRequest, errors.Wrap(err, "Serialized dag object not available") } - var workflowDag workflow_dag.WorkflowDag + var workflowDag workflow_dag.DBWorkflowDag err = json.Unmarshal(serializedDAGBytes, &workflowDag) if err != nil { return nil, http.StatusBadRequest, errors.Wrap(err, "Invalid dag specification.") @@ -81,7 +81,7 @@ func ParseDagSummaryFromRequest( // For github contents, retrieve zipball for files and update string contents like sql queries. func extractOperatorContentsFromRequest( r *http.Request, - op *operator.Operator, + op *operator.DBOperator, ghClient github.Client, ) ([]byte, int, error) { if op.Spec.IsExtract() { diff --git a/src/golang/lib/collections/artifact/artifact.go b/src/golang/lib/collections/artifact/artifact.go index 83a3a67c8..41718ab5d 100644 --- a/src/golang/lib/collections/artifact/artifact.go +++ b/src/golang/lib/collections/artifact/artifact.go @@ -7,7 +7,7 @@ import ( "github.com/google/uuid" ) -type Artifact struct { +type DBArtifact struct { Id uuid.UUID `db:"id" json:"id"` Name string `db:"name" json:"name"` Description string `db:"description" json:"description"` @@ -16,13 +16,13 @@ type Artifact struct { type Reader interface { Exists(ctx context.Context, id uuid.UUID, db database.Database) (bool, error) - GetArtifact(ctx context.Context, id uuid.UUID, db database.Database) (*Artifact, error) - GetArtifacts(ctx context.Context, ids []uuid.UUID, db database.Database) ([]Artifact, error) + GetArtifact(ctx context.Context, id uuid.UUID, db database.Database) (*DBArtifact, error) + GetArtifacts(ctx context.Context, ids []uuid.UUID, db database.Database) ([]DBArtifact, error) GetArtifactsByWorkflowDagId( ctx context.Context, workflowDagId uuid.UUID, db database.Database, - ) ([]Artifact, error) + ) ([]DBArtifact, error) ValidateArtifactOwnership( ctx context.Context, organizationId string, @@ -38,13 +38,13 @@ type Writer interface { description string, spec *Spec, db database.Database, - ) (*Artifact, error) + ) (*DBArtifact, error) UpdateArtifact( ctx context.Context, id uuid.UUID, changes map[string]interface{}, db database.Database, - ) (*Artifact, error) + ) (*DBArtifact, error) DeleteArtifact(ctx context.Context, id uuid.UUID, db database.Database) error DeleteArtifacts(ctx context.Context, ids []uuid.UUID, db database.Database) error } diff --git a/src/golang/lib/collections/artifact/artifact_test.go b/src/golang/lib/collections/artifact/artifact_test.go index 346d27349..9c97c94c5 100644 --- a/src/golang/lib/collections/artifact/artifact_test.go +++ b/src/golang/lib/collections/artifact/artifact_test.go @@ -2,10 +2,10 @@ package artifact_test import ( "encoding/json" + "github.com/aqueducthq/aqueduct/lib/collections/artifact/table" "testing" "github.com/aqueducthq/aqueduct/lib/collections/artifact" - "github.com/aqueducthq/aqueduct/lib/workflow/artifact/table" "github.com/google/uuid" "github.com/stretchr/testify/require" ) @@ -13,7 +13,7 @@ import ( func TestSerializingAndDeserializingArtifact(t *testing.T) { id := uuid.New() - atf := artifact.Artifact{ + atf := artifact.DBArtifact{ Id: id, Name: "test", Spec: *artifact.NewSpecFromTable( @@ -24,7 +24,7 @@ func TestSerializingAndDeserializingArtifact(t *testing.T) { rawAtf, err := json.Marshal(atf) require.Nil(t, err) - var reconstructedAtf artifact.Artifact + var reconstructedAtf artifact.DBArtifact err = json.Unmarshal(rawAtf, &reconstructedAtf) require.Nil(t, err) require.True(t, reconstructedAtf.Spec.IsTable()) diff --git a/src/golang/lib/workflow/artifact/boolean/bool.go b/src/golang/lib/collections/artifact/boolean/bool.go similarity index 100% rename from src/golang/lib/workflow/artifact/boolean/bool.go rename to src/golang/lib/collections/artifact/boolean/bool.go diff --git a/src/golang/lib/workflow/artifact/float/float.go b/src/golang/lib/collections/artifact/float/float.go similarity index 100% rename from src/golang/lib/workflow/artifact/float/float.go rename to src/golang/lib/collections/artifact/float/float.go diff --git a/src/golang/lib/workflow/artifact/jsonable/json.go b/src/golang/lib/collections/artifact/jsonable/json.go similarity index 100% rename from src/golang/lib/workflow/artifact/jsonable/json.go rename to src/golang/lib/collections/artifact/jsonable/json.go diff --git a/src/golang/lib/collections/artifact/sqlite.go b/src/golang/lib/collections/artifact/sqlite.go index 77aeae799..7ad8f6c5e 100644 --- a/src/golang/lib/collections/artifact/sqlite.go +++ b/src/golang/lib/collections/artifact/sqlite.go @@ -29,7 +29,7 @@ func (w *sqliteWriterImpl) CreateArtifact( description string, spec *Spec, db database.Database, -) (*Artifact, error) { +) (*DBArtifact, error) { insertColumns := []string{IdColumn, NameColumn, DescriptionColumn, SpecColumn} insertArtifactStmt := db.PrepareInsertWithReturnAllStmt(tableName, insertColumns, allColumns()) @@ -40,7 +40,7 @@ func (w *sqliteWriterImpl) CreateArtifact( args := []interface{}{id, name, description, spec} - var artifact Artifact + var artifact DBArtifact err = db.Query(ctx, &artifact, insertArtifactStmt, args...) return &artifact, err } diff --git a/src/golang/lib/collections/artifact/standard.go b/src/golang/lib/collections/artifact/standard.go index 9000c0de5..a98c5b0c8 100644 --- a/src/golang/lib/collections/artifact/standard.go +++ b/src/golang/lib/collections/artifact/standard.go @@ -22,13 +22,13 @@ func (w *standardWriterImpl) CreateArtifact( description string, spec *Spec, db database.Database, -) (*Artifact, error) { +) (*DBArtifact, error) { insertColumns := []string{NameColumn, DescriptionColumn, SpecColumn} insertArtifactStmt := db.PrepareInsertWithReturnAllStmt(tableName, insertColumns, allColumns()) args := []interface{}{name, description, spec} - var artifact Artifact + var artifact DBArtifact err := db.Query(ctx, &artifact, insertArtifactStmt, args...) return &artifact, err } @@ -41,7 +41,7 @@ func (r *standardReaderImpl) GetArtifact( ctx context.Context, id uuid.UUID, db database.Database, -) (*Artifact, error) { +) (*DBArtifact, error) { artifacts, err := r.GetArtifacts(ctx, []uuid.UUID{id}, db) if err != nil { return nil, err @@ -58,7 +58,7 @@ func (r *standardReaderImpl) GetArtifacts( ctx context.Context, ids []uuid.UUID, db database.Database, -) ([]Artifact, error) { +) ([]DBArtifact, error) { if len(ids) == 0 { return nil, errors.New("Provided empty IDs list.") } @@ -71,7 +71,7 @@ func (r *standardReaderImpl) GetArtifacts( args := stmt_preparers.CastIdsListToInterfaceList(ids) - var artifacts []Artifact + var artifacts []DBArtifact err := db.Query(ctx, &artifacts, getArtifactsQuery, args...) return artifacts, err } @@ -80,7 +80,7 @@ func (r *standardReaderImpl) GetArtifactsByWorkflowDagId( ctx context.Context, workflowDagId uuid.UUID, db database.Database, -) ([]Artifact, error) { +) ([]DBArtifact, error) { getArtifactsByWorkflowDagIdQuery := fmt.Sprintf( `SELECT %s FROM artifact WHERE id IN (SELECT from_id FROM workflow_dag_edge WHERE workflow_dag_id = $1 AND type = '%s' @@ -91,7 +91,7 @@ func (r *standardReaderImpl) GetArtifactsByWorkflowDagId( workflow_dag_edge.OperatorToArtifactType, ) - var artifacts []Artifact + var artifacts []DBArtifact err := db.Query(ctx, &artifacts, getArtifactsByWorkflowDagIdQuery, workflowDagId) return artifacts, err } @@ -101,8 +101,8 @@ func (w *standardWriterImpl) UpdateArtifact( id uuid.UUID, changes map[string]interface{}, db database.Database, -) (*Artifact, error) { - var artifact Artifact +) (*DBArtifact, error) { + var artifact DBArtifact err := utils.UpdateRecordToDest(ctx, &artifact, changes, tableName, IdColumn, id, allColumns(), db) return &artifact, err } diff --git a/src/golang/lib/workflow/artifact/table/table.go b/src/golang/lib/collections/artifact/table/table.go similarity index 100% rename from src/golang/lib/workflow/artifact/table/table.go rename to src/golang/lib/collections/artifact/table/table.go diff --git a/src/golang/lib/collections/artifact/types.go b/src/golang/lib/collections/artifact/types.go index 7f2f55139..d474cc6fb 100644 --- a/src/golang/lib/collections/artifact/types.go +++ b/src/golang/lib/collections/artifact/types.go @@ -4,11 +4,11 @@ import ( "database/sql/driver" "encoding/json" + "github.com/aqueducthq/aqueduct/lib/collections/artifact/boolean" + "github.com/aqueducthq/aqueduct/lib/collections/artifact/float" + "github.com/aqueducthq/aqueduct/lib/collections/artifact/jsonable" + "github.com/aqueducthq/aqueduct/lib/collections/artifact/table" "github.com/aqueducthq/aqueduct/lib/collections/utils" - "github.com/aqueducthq/aqueduct/lib/workflow/artifact/boolean" - "github.com/aqueducthq/aqueduct/lib/workflow/artifact/float" - "github.com/aqueducthq/aqueduct/lib/workflow/artifact/jsonable" - "github.com/aqueducthq/aqueduct/lib/workflow/artifact/table" "github.com/dropbox/godropbox/errors" ) diff --git a/src/golang/lib/workflow/operator/check/check.go b/src/golang/lib/collections/operator/check/check.go similarity index 77% rename from src/golang/lib/workflow/operator/check/check.go rename to src/golang/lib/collections/operator/check/check.go index 9d63b5704..d9a7b9f29 100644 --- a/src/golang/lib/workflow/operator/check/check.go +++ b/src/golang/lib/collections/operator/check/check.go @@ -1,7 +1,7 @@ package check import ( - "github.com/aqueducthq/aqueduct/lib/workflow/operator/function" + "github.com/aqueducthq/aqueduct/lib/collections/operator/function" ) type Level string diff --git a/src/golang/lib/workflow/operator/connector/extract.go b/src/golang/lib/collections/operator/connector/extract.go similarity index 100% rename from src/golang/lib/workflow/operator/connector/extract.go rename to src/golang/lib/collections/operator/connector/extract.go diff --git a/src/golang/lib/workflow/operator/connector/extract_params.go b/src/golang/lib/collections/operator/connector/extract_params.go similarity index 96% rename from src/golang/lib/workflow/operator/connector/extract_params.go rename to src/golang/lib/collections/operator/connector/extract_params.go index 5fbf9e556..266feab3b 100644 --- a/src/golang/lib/workflow/operator/connector/extract_params.go +++ b/src/golang/lib/collections/operator/connector/extract_params.go @@ -1,6 +1,8 @@ package connector -import gh_types "github.com/aqueducthq/aqueduct/lib/workflow/operator/connector/github/types" +import ( + gh_types "github.com/aqueducthq/aqueduct/lib/collections/operator/connector/github" +) type ExtractParams interface { isExtractParams() diff --git a/src/golang/lib/workflow/operator/connector/extract_test.go b/src/golang/lib/collections/operator/connector/extract_test.go similarity index 100% rename from src/golang/lib/workflow/operator/connector/extract_test.go rename to src/golang/lib/collections/operator/connector/extract_test.go diff --git a/src/golang/lib/workflow/operator/connector/github/types/types.go b/src/golang/lib/collections/operator/connector/github/types.go similarity index 99% rename from src/golang/lib/workflow/operator/connector/github/types/types.go rename to src/golang/lib/collections/operator/connector/github/types.go index 355932b19..dcc00f9c5 100644 --- a/src/golang/lib/workflow/operator/connector/github/types/types.go +++ b/src/golang/lib/collections/operator/connector/github/types.go @@ -1,4 +1,4 @@ -package types +package github import ( "fmt" diff --git a/src/golang/lib/workflow/operator/connector/load.go b/src/golang/lib/collections/operator/connector/load.go similarity index 100% rename from src/golang/lib/workflow/operator/connector/load.go rename to src/golang/lib/collections/operator/connector/load.go diff --git a/src/golang/lib/workflow/operator/connector/load_params.go b/src/golang/lib/collections/operator/connector/load_params.go similarity index 100% rename from src/golang/lib/workflow/operator/connector/load_params.go rename to src/golang/lib/collections/operator/connector/load_params.go diff --git a/src/golang/lib/workflow/operator/connector/load_test.go b/src/golang/lib/collections/operator/connector/load_test.go similarity index 100% rename from src/golang/lib/workflow/operator/connector/load_test.go rename to src/golang/lib/collections/operator/connector/load_test.go diff --git a/src/golang/lib/workflow/operator/function/function.go b/src/golang/lib/collections/operator/function/function.go similarity index 91% rename from src/golang/lib/workflow/operator/function/function.go rename to src/golang/lib/collections/operator/function/function.go index 6d6240914..89b285409 100644 --- a/src/golang/lib/workflow/operator/function/function.go +++ b/src/golang/lib/collections/operator/function/function.go @@ -1,7 +1,7 @@ package function import ( - github "github.com/aqueducthq/aqueduct/lib/workflow/operator/connector/github/types" + "github.com/aqueducthq/aqueduct/lib/collections/operator/connector/github" ) type Function struct { diff --git a/src/golang/lib/workflow/operator/metric/metric.go b/src/golang/lib/collections/operator/metric/metric.go similarity index 58% rename from src/golang/lib/workflow/operator/metric/metric.go rename to src/golang/lib/collections/operator/metric/metric.go index 7b563110b..b7cb90448 100644 --- a/src/golang/lib/workflow/operator/metric/metric.go +++ b/src/golang/lib/collections/operator/metric/metric.go @@ -1,7 +1,7 @@ package metric import ( - "github.com/aqueducthq/aqueduct/lib/workflow/operator/function" + "github.com/aqueducthq/aqueduct/lib/collections/operator/function" ) type Metric struct { diff --git a/src/golang/lib/collections/operator/operator.go b/src/golang/lib/collections/operator/operator.go index 02635935f..46a9f7bf2 100644 --- a/src/golang/lib/collections/operator/operator.go +++ b/src/golang/lib/collections/operator/operator.go @@ -7,7 +7,7 @@ import ( "github.com/google/uuid" ) -type Operator struct { +type DBOperator struct { Id uuid.UUID `db:"id" json:"id"` Name string `db:"name" json:"name"` Description string `db:"description" json:"description"` @@ -20,13 +20,13 @@ type Operator struct { type Reader interface { Exists(ctx context.Context, id uuid.UUID, db database.Database) (bool, error) - GetOperator(ctx context.Context, id uuid.UUID, db database.Database) (*Operator, error) - GetOperators(ctx context.Context, ids []uuid.UUID, db database.Database) ([]Operator, error) + GetOperator(ctx context.Context, id uuid.UUID, db database.Database) (*DBOperator, error) + GetOperators(ctx context.Context, ids []uuid.UUID, db database.Database) ([]DBOperator, error) GetOperatorsByWorkflowDagId( ctx context.Context, workflowDagId uuid.UUID, db database.Database, - ) ([]Operator, error) + ) ([]DBOperator, error) ValidateOperatorOwnership( ctx context.Context, organizationId string, @@ -42,13 +42,13 @@ type Writer interface { description string, spec *Spec, db database.Database, - ) (*Operator, error) + ) (*DBOperator, error) UpdateOperator( ctx context.Context, id uuid.UUID, changes map[string]interface{}, db database.Database, - ) (*Operator, error) + ) (*DBOperator, error) DeleteOperator(ctx context.Context, id uuid.UUID, db database.Database) error DeleteOperators(ctx context.Context, ids []uuid.UUID, db database.Database) error } diff --git a/src/golang/lib/collections/operator/operator_test.go b/src/golang/lib/collections/operator/operator_test.go index 01b62263c..f43e8b3ce 100644 --- a/src/golang/lib/collections/operator/operator_test.go +++ b/src/golang/lib/collections/operator/operator_test.go @@ -2,10 +2,10 @@ package operator_test import ( "encoding/json" + "github.com/aqueducthq/aqueduct/lib/collections/operator/function" "testing" "github.com/aqueducthq/aqueduct/lib/collections/operator" - "github.com/aqueducthq/aqueduct/lib/workflow/operator/function" "github.com/google/uuid" "github.com/stretchr/testify/require" ) @@ -13,7 +13,7 @@ import ( func TestSerializingAndDeserializingOperator(t *testing.T) { id := uuid.New() - op := operator.Operator{ + op := operator.DBOperator{ Id: id, Name: "test", Spec: *operator.NewSpecFromFunction( @@ -28,7 +28,7 @@ func TestSerializingAndDeserializingOperator(t *testing.T) { rawOp, err := json.Marshal(op) require.Nil(t, err) - var reconstructedOp operator.Operator + var reconstructedOp operator.DBOperator err = json.Unmarshal(rawOp, &reconstructedOp) require.Nil(t, err) require.True(t, reconstructedOp.Spec.IsFunction()) diff --git a/src/golang/lib/workflow/operator/param/param.go b/src/golang/lib/collections/operator/param/param.go similarity index 100% rename from src/golang/lib/workflow/operator/param/param.go rename to src/golang/lib/collections/operator/param/param.go diff --git a/src/golang/lib/collections/operator/sqlite.go b/src/golang/lib/collections/operator/sqlite.go index ad55509a2..c9743908e 100644 --- a/src/golang/lib/collections/operator/sqlite.go +++ b/src/golang/lib/collections/operator/sqlite.go @@ -29,7 +29,7 @@ func (w *sqliteWriterImpl) CreateOperator( description string, spec *Spec, db database.Database, -) (*Operator, error) { +) (*DBOperator, error) { insertColumns := []string{IdColumn, NameColumn, DescriptionColumn, SpecColumn} insertOperatorStmt := db.PrepareInsertWithReturnAllStmt(tableName, insertColumns, allColumns()) @@ -40,7 +40,7 @@ func (w *sqliteWriterImpl) CreateOperator( args := []interface{}{id, name, description, spec} - var operator Operator + var operator DBOperator err = db.Query(ctx, &operator, insertOperatorStmt, args...) return &operator, err } diff --git a/src/golang/lib/collections/operator/standard.go b/src/golang/lib/collections/operator/standard.go index e36e2b404..b4d615c4d 100644 --- a/src/golang/lib/collections/operator/standard.go +++ b/src/golang/lib/collections/operator/standard.go @@ -22,13 +22,13 @@ func (w *standardWriterImpl) CreateOperator( description string, spec *Spec, db database.Database, -) (*Operator, error) { +) (*DBOperator, error) { insertColumns := []string{NameColumn, DescriptionColumn, SpecColumn} insertOperatorStmt := db.PrepareInsertWithReturnAllStmt(tableName, insertColumns, allColumns()) args := []interface{}{name, description, spec} - var operator Operator + var operator DBOperator err := db.Query(ctx, &operator, insertOperatorStmt, args...) return &operator, err } @@ -45,7 +45,7 @@ func (r *standardReaderImpl) GetOperator( ctx context.Context, id uuid.UUID, db database.Database, -) (*Operator, error) { +) (*DBOperator, error) { operators, err := r.GetOperators(ctx, []uuid.UUID{id}, db) if err != nil { return nil, err @@ -62,7 +62,7 @@ func (r *standardReaderImpl) GetOperators( ctx context.Context, ids []uuid.UUID, db database.Database, -) ([]Operator, error) { +) ([]DBOperator, error) { if len(ids) == 0 { return nil, errors.New("Provided empty IDs list.") } @@ -75,7 +75,7 @@ func (r *standardReaderImpl) GetOperators( args := stmt_preparers.CastIdsListToInterfaceList(ids) - var operators []Operator + var operators []DBOperator err := db.Query(ctx, &operators, getOperatorsQuery, args...) return operators, err } @@ -84,7 +84,7 @@ func (r *standardReaderImpl) GetOperatorsByWorkflowDagId( ctx context.Context, workflowDagId uuid.UUID, db database.Database, -) ([]Operator, error) { +) ([]DBOperator, error) { getOperatorsByWorkflowDagIdQuery := fmt.Sprintf( `SELECT %s FROM operator WHERE id IN (SELECT from_id FROM workflow_dag_edge WHERE workflow_dag_id = $1 AND type = '%s' @@ -95,7 +95,7 @@ func (r *standardReaderImpl) GetOperatorsByWorkflowDagId( workflow_dag_edge.ArtifactToOperatorType, ) - var operators []Operator + var operators []DBOperator err := db.Query(ctx, &operators, getOperatorsByWorkflowDagIdQuery, workflowDagId) return operators, err } @@ -105,8 +105,8 @@ func (w *standardWriterImpl) UpdateOperator( id uuid.UUID, changes map[string]interface{}, db database.Database, -) (*Operator, error) { - var operator Operator +) (*DBOperator, error) { + var operator DBOperator err := utils.UpdateRecordToDest(ctx, &operator, changes, tableName, IdColumn, id, allColumns(), db) return &operator, err } diff --git a/src/golang/lib/workflow/operator/system_metric/system_metric.go b/src/golang/lib/collections/operator/system_metric/system_metric.go similarity index 100% rename from src/golang/lib/workflow/operator/system_metric/system_metric.go rename to src/golang/lib/collections/operator/system_metric/system_metric.go diff --git a/src/golang/lib/collections/operator/types.go b/src/golang/lib/collections/operator/types.go index 4f34c18bd..79373d39c 100644 --- a/src/golang/lib/collections/operator/types.go +++ b/src/golang/lib/collections/operator/types.go @@ -5,13 +5,13 @@ import ( "database/sql/driver" "encoding/json" + "github.com/aqueducthq/aqueduct/lib/collections/operator/check" + "github.com/aqueducthq/aqueduct/lib/collections/operator/connector" + "github.com/aqueducthq/aqueduct/lib/collections/operator/function" + "github.com/aqueducthq/aqueduct/lib/collections/operator/metric" + "github.com/aqueducthq/aqueduct/lib/collections/operator/param" + "github.com/aqueducthq/aqueduct/lib/collections/operator/system_metric" "github.com/aqueducthq/aqueduct/lib/collections/utils" - "github.com/aqueducthq/aqueduct/lib/workflow/operator/check" - "github.com/aqueducthq/aqueduct/lib/workflow/operator/connector" - "github.com/aqueducthq/aqueduct/lib/workflow/operator/function" - "github.com/aqueducthq/aqueduct/lib/workflow/operator/metric" - "github.com/aqueducthq/aqueduct/lib/workflow/operator/param" - "github.com/aqueducthq/aqueduct/lib/workflow/operator/system_metric" "github.com/dropbox/godropbox/errors" ) diff --git a/src/golang/lib/collections/tests/operator_test.go b/src/golang/lib/collections/tests/operator_test.go index 9e1e7a7cf..1cc3fc661 100644 --- a/src/golang/lib/collections/tests/operator_test.go +++ b/src/golang/lib/collections/tests/operator_test.go @@ -2,17 +2,17 @@ package tests import ( "context" + "github.com/aqueducthq/aqueduct/lib/collections/operator/connector" + "github.com/aqueducthq/aqueduct/lib/collections/operator/function" "testing" "github.com/aqueducthq/aqueduct/lib/collections/integration" "github.com/aqueducthq/aqueduct/lib/collections/operator" - "github.com/aqueducthq/aqueduct/lib/workflow/operator/connector" - "github.com/aqueducthq/aqueduct/lib/workflow/operator/function" "github.com/google/uuid" "github.com/stretchr/testify/require" ) -func seedOperator(t *testing.T, count int) []operator.Operator { +func seedOperator(t *testing.T, count int) []operator.DBOperator { specs := make([]operator.Spec, 0, count) for i := 0; i < count; i++ { @@ -31,10 +31,10 @@ func seedOperator(t *testing.T, count int) []operator.Operator { // seedOperatorWithSpecs populates the operator table with count operators // using the specs provided. -func seedOperatorWithSpecs(t *testing.T, count int, specs []operator.Spec) []operator.Operator { +func seedOperatorWithSpecs(t *testing.T, count int, specs []operator.Spec) []operator.DBOperator { require.Len(t, specs, count) - operators := make([]operator.Operator, 0, count) + operators := make([]operator.DBOperator, 0, count) for i := 0; i < count; i++ { testOperator, err := writers.operatorWriter.CreateOperator( @@ -59,7 +59,7 @@ func TestCreateOperator(t *testing.T) { integrations := seedIntegration(t, 1) - expectedOperator := &operator.Operator{ + expectedOperator := &operator.DBOperator{ Name: "test-operator", Description: "testing op", Spec: *operator.NewSpecFromExtract(connector.Extract{ diff --git a/src/golang/lib/collections/tests/server_custom_test.go b/src/golang/lib/collections/tests/server_custom_test.go index 7597ded17..95f5fbc25 100644 --- a/src/golang/lib/collections/tests/server_custom_test.go +++ b/src/golang/lib/collections/tests/server_custom_test.go @@ -2,14 +2,14 @@ package tests import ( "context" + "github.com/aqueducthq/aqueduct/lib/collections/artifact/table" + "github.com/aqueducthq/aqueduct/lib/collections/operator/connector" "testing" "github.com/aqueducthq/aqueduct/cmd/server/queries" "github.com/aqueducthq/aqueduct/lib/collections/artifact" "github.com/aqueducthq/aqueduct/lib/collections/integration" "github.com/aqueducthq/aqueduct/lib/collections/operator" - "github.com/aqueducthq/aqueduct/lib/workflow/artifact/table" - "github.com/aqueducthq/aqueduct/lib/workflow/operator/connector" "github.com/google/uuid" "github.com/stretchr/testify/require" ) diff --git a/src/golang/lib/collections/tests/utils.go b/src/golang/lib/collections/tests/utils.go index 8196bddeb..e60ae604c 100644 --- a/src/golang/lib/collections/tests/utils.go +++ b/src/golang/lib/collections/tests/utils.go @@ -66,7 +66,7 @@ func randWorkflowIdsFromList(n int, workflows []workflow.Workflow) []uuid.UUID { // randWorkflowDagIdsFromList randomly polls workflowDags n times and returns // the workflowDagIds selected. -func randWorkflowDagIdsFromList(n int, workflowDags []workflow_dag.WorkflowDag) []uuid.UUID { +func randWorkflowDagIdsFromList(n int, workflowDags []workflow_dag.DBWorkflowDag) []uuid.UUID { workflowDagIds := make([]uuid.UUID, 0, len(workflowDags)) for _, workflowDagObj := range workflowDags { workflowDagIds = append(workflowDagIds, workflowDagObj.Id) diff --git a/src/golang/lib/collections/tests/workflow_dag_test.go b/src/golang/lib/collections/tests/workflow_dag_test.go index 636a3c779..f536c9734 100644 --- a/src/golang/lib/collections/tests/workflow_dag_test.go +++ b/src/golang/lib/collections/tests/workflow_dag_test.go @@ -10,7 +10,7 @@ import ( "github.com/stretchr/testify/require" ) -func seedWorkflowDag(t *testing.T, count int) []workflow_dag.WorkflowDag { +func seedWorkflowDag(t *testing.T, count int) []workflow_dag.DBWorkflowDag { numWorkflows := 2 workflows := seedWorkflow(t, numWorkflows) @@ -21,10 +21,10 @@ func seedWorkflowDag(t *testing.T, count int) []workflow_dag.WorkflowDag { // seedWorkflowDagWithWorkflows populates the workflow_dag table with count workflow dags where // workflow_id is set to the values provided in workflowIds. -func seedWorkflowDagWithWorkflows(t *testing.T, count int, workflowIds []uuid.UUID) []workflow_dag.WorkflowDag { +func seedWorkflowDagWithWorkflows(t *testing.T, count int, workflowIds []uuid.UUID) []workflow_dag.DBWorkflowDag { require.Len(t, workflowIds, count) - workflowDags := make([]workflow_dag.WorkflowDag, 0, count) + workflowDags := make([]workflow_dag.DBWorkflowDag, 0, count) for i := 0; i < count; i++ { testWorkflowDag, err := writers.workflowDagWriter.CreateWorkflowDag( @@ -49,7 +49,7 @@ func seedWorkflowDagWithWorkflows(t *testing.T, count int, workflowIds []uuid.UU return workflowDags } -func requireEqualWorkflowDags(t *testing.T, expected, actual []workflow_dag.WorkflowDag) { +func requireEqualWorkflowDags(t *testing.T, expected, actual []workflow_dag.DBWorkflowDag) { require.Equal(t, len(expected), len(actual)) for _, expectedWorkflowDag := range expected { @@ -65,7 +65,7 @@ func requireEqualWorkflowDags(t *testing.T, expected, actual []workflow_dag.Work } // idsFromWorkflowDags returns the ids from the workflow dags provided. -func idsFromWorkflowDags(workflowDags []workflow_dag.WorkflowDag) []uuid.UUID { +func idsFromWorkflowDags(workflowDags []workflow_dag.DBWorkflowDag) []uuid.UUID { ids := make([]uuid.UUID, 0, len(workflowDags)) for _, workflowDag := range workflowDags { ids = append(ids, workflowDag.Id) @@ -78,7 +78,7 @@ func TestCreateWorkflowDag(t *testing.T) { workflows := seedWorkflow(t, 1) - expectedWorkflowDag := &workflow_dag.WorkflowDag{ + expectedWorkflowDag := &workflow_dag.DBWorkflowDag{ WorkflowId: workflows[0].Id, StorageConfig: shared.StorageConfig{ Type: shared.S3StorageType, diff --git a/src/golang/lib/collections/workflow_dag/sqlite.go b/src/golang/lib/collections/workflow_dag/sqlite.go index df12f1a80..154f44a52 100644 --- a/src/golang/lib/collections/workflow_dag/sqlite.go +++ b/src/golang/lib/collections/workflow_dag/sqlite.go @@ -31,7 +31,7 @@ func (w *sqliteWriterImpl) CreateWorkflowDag( workflowId uuid.UUID, storageConfig *shared.StorageConfig, db database.Database, -) (*WorkflowDag, error) { +) (*DBWorkflowDag, error) { insertColumns := []string{IdColumn, WorkflowIdColumn, CreatedAtColumn, StorageConfigColumn} insertWorkflowDagStmt := db.PrepareInsertWithReturnAllStmt(tableName, insertColumns, allColumns()) @@ -42,7 +42,7 @@ func (w *sqliteWriterImpl) CreateWorkflowDag( args := []interface{}{id, workflowId, time.Now(), storageConfig} - var workflowDag WorkflowDag + var workflowDag DBWorkflowDag err = db.Query(ctx, &workflowDag, insertWorkflowDagStmt, args...) return &workflowDag, err } diff --git a/src/golang/lib/collections/workflow_dag/standard.go b/src/golang/lib/collections/workflow_dag/standard.go index 5dcb71160..ccb49410d 100644 --- a/src/golang/lib/collections/workflow_dag/standard.go +++ b/src/golang/lib/collections/workflow_dag/standard.go @@ -23,13 +23,13 @@ func (w *standardWriterImpl) CreateWorkflowDag( workflowId uuid.UUID, storageConfig *shared.StorageConfig, db database.Database, -) (*WorkflowDag, error) { +) (*DBWorkflowDag, error) { insertColumns := []string{WorkflowIdColumn, CreatedAtColumn, StorageConfigColumn} insertWorkflowDagStmt := db.PrepareInsertWithReturnAllStmt(tableName, insertColumns, allColumns()) args := []interface{}{workflowId, time.Now(), storageConfig} - var workflowDag WorkflowDag + var workflowDag DBWorkflowDag err := db.Query(ctx, &workflowDag, insertWorkflowDagStmt, args...) return &workflowDag, err } @@ -38,7 +38,7 @@ func (r *standardReaderImpl) GetWorkflowDag( ctx context.Context, id uuid.UUID, db database.Database, -) (*WorkflowDag, error) { +) (*DBWorkflowDag, error) { workflowDags, err := r.GetWorkflowDags(ctx, []uuid.UUID{id}, db) if err != nil { return nil, err @@ -55,7 +55,7 @@ func (r *standardReaderImpl) GetWorkflowDags( ctx context.Context, ids []uuid.UUID, db database.Database, -) ([]WorkflowDag, error) { +) ([]DBWorkflowDag, error) { if len(ids) == 0 { return nil, errors.New("Provided empty IDs list.") } @@ -68,7 +68,7 @@ func (r *standardReaderImpl) GetWorkflowDags( args := stmt_preparers.CastIdsListToInterfaceList(ids) - var workflowDags []WorkflowDag + var workflowDags []DBWorkflowDag err := db.Query(ctx, &workflowDags, getWorkflowDagsQuery, args...) return workflowDags, err } @@ -77,13 +77,13 @@ func (r *standardReaderImpl) GetWorkflowDagsByWorkflowId( ctx context.Context, workflowId uuid.UUID, db database.Database, -) ([]WorkflowDag, error) { +) ([]DBWorkflowDag, error) { query := fmt.Sprintf( "SELECT %s FROM workflow_dag WHERE workflow_id = $1;", allColumns(), ) - var workflowDags []WorkflowDag + var workflowDags []DBWorkflowDag err := db.Query(ctx, &workflowDags, query, workflowId) return workflowDags, err } @@ -92,13 +92,13 @@ func (r *standardReaderImpl) GetLatestWorkflowDag( ctx context.Context, workflowId uuid.UUID, db database.Database, -) (*WorkflowDag, error) { +) (*DBWorkflowDag, error) { getLatestWorkflowDagQuery := fmt.Sprintf( "SELECT %s FROM workflow_dag WHERE workflow_id = $1 ORDER BY created_at DESC LIMIT 1;", allColumns(), ) - var workflowDag WorkflowDag + var workflowDag DBWorkflowDag err := db.Query(ctx, &workflowDag, getLatestWorkflowDagQuery, workflowId) return &workflowDag, err } @@ -107,7 +107,7 @@ func (r *standardReaderImpl) GetWorkflowDagByWorkflowDagResultId( ctx context.Context, workflowDagResultId uuid.UUID, db database.Database, -) (*WorkflowDag, error) { +) (*DBWorkflowDag, error) { query := fmt.Sprintf(` SELECT %s FROM workflow_dag, workflow_dag_result WHERE workflow_dag.id = workflow_dag_result.workflow_dag_id @@ -115,7 +115,7 @@ func (r *standardReaderImpl) GetWorkflowDagByWorkflowDagResultId( allColumnsWithPrefix(), ) - var workflowDag WorkflowDag + var workflowDag DBWorkflowDag err := db.Query(ctx, &workflowDag, query, workflowDagResultId) return &workflowDag, err } @@ -124,7 +124,7 @@ func (r *standardReaderImpl) GetWorkflowDagsByOperatorId( ctx context.Context, operatorId uuid.UUID, db database.Database, -) ([]WorkflowDag, error) { +) ([]DBWorkflowDag, error) { query := fmt.Sprintf(` SELECT DISTINCT %s FROM workflow_dag, workflow_dag_edge WHERE workflow_dag_edge.workflow_dag_id = workflow_dag.id AND @@ -132,7 +132,7 @@ func (r *standardReaderImpl) GetWorkflowDagsByOperatorId( (workflow_dag_edge.type = '%s' AND workflow_dag_edge.to_id = $1));`, allColumnsWithPrefix(), workflow_dag_edge.OperatorToArtifactType, workflow_dag_edge.ArtifactToOperatorType) - var workflowDags []WorkflowDag + var workflowDags []DBWorkflowDag err := db.Query(ctx, &workflowDags, query, operatorId) return workflowDags, err } @@ -142,8 +142,8 @@ func (w *standardWriterImpl) UpdateWorkflowDag( id uuid.UUID, changes map[string]interface{}, db database.Database, -) (*WorkflowDag, error) { - var workflowDag WorkflowDag +) (*DBWorkflowDag, error) { + var workflowDag DBWorkflowDag err := utils.UpdateRecordToDest(ctx, &workflowDag, changes, tableName, IdColumn, id, allColumns(), db) return &workflowDag, err } diff --git a/src/golang/lib/collections/workflow_dag/workflow_dag.go b/src/golang/lib/collections/workflow_dag/workflow_dag.go index baf02868c..7b6b9dbb9 100644 --- a/src/golang/lib/collections/workflow_dag/workflow_dag.go +++ b/src/golang/lib/collections/workflow_dag/workflow_dag.go @@ -12,19 +12,19 @@ import ( "github.com/google/uuid" ) -type WorkflowDag struct { +type DBWorkflowDag struct { Id uuid.UUID `db:"id" json:"id"` WorkflowId uuid.UUID `db:"workflow_id" json:"workflow_id"` CreatedAt time.Time `db:"created_at" json:"created_at"` StorageConfig shared.StorageConfig `db:"storage_config" json:"storage_config"` /* Field not stored in DB */ - Metadata *workflow.Workflow `json:"metadata"` - Operators map[uuid.UUID]operator.Operator `json:"operators,omitempty"` - Artifacts map[uuid.UUID]artifact.Artifact `json:"artifacts,omitempty"` + Metadata *workflow.Workflow `json:"metadata"` + Operators map[uuid.UUID]operator.DBOperator `json:"operators,omitempty"` + Artifacts map[uuid.UUID]artifact.DBArtifact `json:"artifacts,omitempty"` } -func (dag *WorkflowDag) GetOperatorByName(name string) *operator.Operator { +func (dag *DBWorkflowDag) GetOperatorByName(name string) *operator.DBOperator { for _, op := range dag.Operators { if op.Name == name { return &op @@ -34,24 +34,24 @@ func (dag *WorkflowDag) GetOperatorByName(name string) *operator.Operator { } type Reader interface { - GetWorkflowDag(ctx context.Context, id uuid.UUID, db database.Database) (*WorkflowDag, error) - GetWorkflowDags(ctx context.Context, ids []uuid.UUID, db database.Database) ([]WorkflowDag, error) - GetLatestWorkflowDag(ctx context.Context, workflowId uuid.UUID, db database.Database) (*WorkflowDag, error) + GetWorkflowDag(ctx context.Context, id uuid.UUID, db database.Database) (*DBWorkflowDag, error) + GetWorkflowDags(ctx context.Context, ids []uuid.UUID, db database.Database) ([]DBWorkflowDag, error) + GetLatestWorkflowDag(ctx context.Context, workflowId uuid.UUID, db database.Database) (*DBWorkflowDag, error) GetWorkflowDagsByWorkflowId( ctx context.Context, workflowId uuid.UUID, db database.Database, - ) ([]WorkflowDag, error) + ) ([]DBWorkflowDag, error) GetWorkflowDagByWorkflowDagResultId( ctx context.Context, workflowDagResultId uuid.UUID, db database.Database, - ) (*WorkflowDag, error) + ) (*DBWorkflowDag, error) GetWorkflowDagsByOperatorId( ctx context.Context, operatorId uuid.UUID, db database.Database, - ) ([]WorkflowDag, error) + ) ([]DBWorkflowDag, error) } type Writer interface { @@ -60,13 +60,13 @@ type Writer interface { workflowId uuid.UUID, storageConfig *shared.StorageConfig, db database.Database, - ) (*WorkflowDag, error) + ) (*DBWorkflowDag, error) UpdateWorkflowDag( ctx context.Context, id uuid.UUID, changes map[string]interface{}, db database.Database, - ) (*WorkflowDag, error) + ) (*DBWorkflowDag, error) DeleteWorkflowDag(ctx context.Context, id uuid.UUID, db database.Database) error DeleteWorkflowDags(ctx context.Context, ids []uuid.UUID, db database.Database) error } diff --git a/src/golang/lib/job/spec.go b/src/golang/lib/job/spec.go index 66ee795a7..f44e48a9d 100644 --- a/src/golang/lib/job/spec.go +++ b/src/golang/lib/job/spec.go @@ -8,10 +8,10 @@ import ( "github.com/aqueducthq/aqueduct/lib/collections/artifact" "github.com/aqueducthq/aqueduct/lib/collections/integration" + "github.com/aqueducthq/aqueduct/lib/collections/operator/connector" "github.com/aqueducthq/aqueduct/lib/collections/shared" "github.com/aqueducthq/aqueduct/lib/database" "github.com/aqueducthq/aqueduct/lib/vault" - "github.com/aqueducthq/aqueduct/lib/workflow/operator/connector" "github.com/aqueducthq/aqueduct/lib/workflow/operator/connector/auth" "github.com/aqueducthq/aqueduct/lib/workflow/operator/connector/github" "github.com/dropbox/godropbox/errors" diff --git a/src/golang/lib/workflow/dag/validation.go b/src/golang/lib/workflow/dag/validation.go index 91a1e72e3..31fe45ee9 100644 --- a/src/golang/lib/workflow/dag/validation.go +++ b/src/golang/lib/workflow/dag/validation.go @@ -28,7 +28,7 @@ var ( ) func Validate( - dag *workflow_dag.WorkflowDag, + dag *workflow_dag.DBWorkflowDag, ) error { if len(dag.Operators) == 0 { return ErrNoOperator @@ -84,7 +84,7 @@ func Validate( func ValidateDagOperatorIntegrationOwnership( ctx context.Context, - operators map[uuid.UUID]operator.Operator, + operators map[uuid.UUID]operator.DBOperator, organizationId string, integrationReader integration.Reader, db database.Database, @@ -116,7 +116,7 @@ func ValidateDagOperatorIntegrationOwnership( return true, nil } -func checkUnexecutableOperator(dag *workflow_dag.WorkflowDag) error { +func checkUnexecutableOperator(dag *workflow_dag.DBWorkflowDag) error { numOperators := len(dag.Operators) operatorsExecuted := make(map[uuid.UUID]bool, numOperators) for operatorId := range dag.Operators { diff --git a/src/golang/lib/workflow/dag/validation_test.go b/src/golang/lib/workflow/dag/validation_test.go index 031522c20..22e0d1354 100644 --- a/src/golang/lib/workflow/dag/validation_test.go +++ b/src/golang/lib/workflow/dag/validation_test.go @@ -18,48 +18,48 @@ import ( // ^ // | // extract_1 -> artifact_1 -- -func generateBasicDag(t *testing.T) *workflow_dag.WorkflowDag { - artifactZero := artifact.Artifact{ +func generateBasicDag(t *testing.T) *workflow_dag.DBWorkflowDag { + artifactZero := artifact.DBArtifact{ Id: uuid.New(), } - artifactOne := artifact.Artifact{ + artifactOne := artifact.DBArtifact{ Id: uuid.New(), } - artifactTwo := artifact.Artifact{ + artifactTwo := artifact.DBArtifact{ Id: uuid.New(), } - extractZero := operator.Operator{ + extractZero := operator.DBOperator{ Id: uuid.New(), Outputs: []uuid.UUID{artifactZero.Id}, } - extractOne := operator.Operator{ + extractOne := operator.DBOperator{ Id: uuid.New(), Outputs: []uuid.UUID{artifactOne.Id}, } - functionZero := operator.Operator{ + functionZero := operator.DBOperator{ Id: uuid.New(), Inputs: []uuid.UUID{artifactZero.Id, artifactOne.Id}, Outputs: []uuid.UUID{artifactTwo.Id}, } - loadZero := operator.Operator{ + loadZero := operator.DBOperator{ Id: uuid.New(), Inputs: []uuid.UUID{artifactTwo.Id}, } - return &workflow_dag.WorkflowDag{ - Operators: map[uuid.UUID]operator.Operator{ + return &workflow_dag.DBWorkflowDag{ + Operators: map[uuid.UUID]operator.DBOperator{ extractZero.Id: extractZero, extractOne.Id: extractOne, functionZero.Id: functionZero, loadZero.Id: loadZero, }, - Artifacts: map[uuid.UUID]artifact.Artifact{ + Artifacts: map[uuid.UUID]artifact.DBArtifact{ artifactZero.Id: artifactZero, artifactOne.Id: artifactOne, artifactTwo.Id: artifactTwo, @@ -76,49 +76,49 @@ func generateBasicDag(t *testing.T) *workflow_dag.WorkflowDag { // | |-> extract_0 // cyclic // | // extract_1 -> artifact_1 -- -func generateCyclicDag(t *testing.T) *workflow_dag.WorkflowDag { - artifactZero := artifact.Artifact{ +func generateCyclicDag(t *testing.T) *workflow_dag.DBWorkflowDag { + artifactZero := artifact.DBArtifact{ Id: uuid.New(), } - artifactOne := artifact.Artifact{ + artifactOne := artifact.DBArtifact{ Id: uuid.New(), } - artifactTwo := artifact.Artifact{ + artifactTwo := artifact.DBArtifact{ Id: uuid.New(), } - extractZero := operator.Operator{ + extractZero := operator.DBOperator{ Id: uuid.New(), Inputs: []uuid.UUID{artifactTwo.Id}, Outputs: []uuid.UUID{artifactZero.Id}, } - extractOne := operator.Operator{ + extractOne := operator.DBOperator{ Id: uuid.New(), Outputs: []uuid.UUID{artifactOne.Id}, } - functionZero := operator.Operator{ + functionZero := operator.DBOperator{ Id: uuid.New(), Inputs: []uuid.UUID{artifactZero.Id, artifactOne.Id}, Outputs: []uuid.UUID{artifactTwo.Id}, } - loadZero := operator.Operator{ + loadZero := operator.DBOperator{ Id: uuid.New(), Inputs: []uuid.UUID{artifactTwo.Id}, } - return &workflow_dag.WorkflowDag{ - Operators: map[uuid.UUID]operator.Operator{ + return &workflow_dag.DBWorkflowDag{ + Operators: map[uuid.UUID]operator.DBOperator{ extractZero.Id: extractZero, extractOne.Id: extractOne, functionZero.Id: functionZero, loadZero.Id: loadZero, }, - Artifacts: map[uuid.UUID]artifact.Artifact{ + Artifacts: map[uuid.UUID]artifact.DBArtifact{ artifactZero.Id: artifactZero, artifactOne.Id: artifactOne, artifactTwo.Id: artifactTwo, @@ -128,68 +128,68 @@ func generateCyclicDag(t *testing.T) *workflow_dag.WorkflowDag { // This manually creates a DAG with an operator whose dependency is never going to be met: // artifact_0 -> validation_0 -func generateUnexecutableOperatorDag(t *testing.T) *workflow_dag.WorkflowDag { +func generateUnexecutableOperatorDag(t *testing.T) *workflow_dag.DBWorkflowDag { validationOpId := uuid.New() artifactId := uuid.New() - artifactObject := artifact.Artifact{ + artifactObject := artifact.DBArtifact{ Id: artifactId, } - validationOperator := operator.Operator{ + validationOperator := operator.DBOperator{ Id: validationOpId, Inputs: []uuid.UUID{artifactObject.Id}, } - return &workflow_dag.WorkflowDag{ - Operators: map[uuid.UUID]operator.Operator{validationOpId: validationOperator}, - Artifacts: map[uuid.UUID]artifact.Artifact{artifactId: artifactObject}, + return &workflow_dag.DBWorkflowDag{ + Operators: map[uuid.UUID]operator.DBOperator{validationOpId: validationOperator}, + Artifacts: map[uuid.UUID]artifact.DBArtifact{artifactId: artifactObject}, } } // This manually creates a DAG with no operator. -func generateEmptyDag(t *testing.T) *workflow_dag.WorkflowDag { - return &workflow_dag.WorkflowDag{ - Operators: map[uuid.UUID]operator.Operator{}, - Artifacts: map[uuid.UUID]artifact.Artifact{}, +func generateEmptyDag(t *testing.T) *workflow_dag.DBWorkflowDag { + return &workflow_dag.DBWorkflowDag{ + Operators: map[uuid.UUID]operator.DBOperator{}, + Artifacts: map[uuid.UUID]artifact.DBArtifact{}, } } // This manually creates a DAG with an unreachable artifract: // operator_0 -> artifact_0, artifact_1 -func generateUnreachableArtifactDag(t *testing.T) *workflow_dag.WorkflowDag { - artifactZero := artifact.Artifact{ +func generateUnreachableArtifactDag(t *testing.T) *workflow_dag.DBWorkflowDag { + artifactZero := artifact.DBArtifact{ Id: uuid.New(), } - artifactOne := artifact.Artifact{ + artifactOne := artifact.DBArtifact{ Id: uuid.New(), } - operatorZero := operator.Operator{ + operatorZero := operator.DBOperator{ Id: uuid.New(), Outputs: []uuid.UUID{artifactZero.Id}, } - return &workflow_dag.WorkflowDag{ - Operators: map[uuid.UUID]operator.Operator{operatorZero.Id: operatorZero}, - Artifacts: map[uuid.UUID]artifact.Artifact{artifactZero.Id: artifactZero, artifactOne.Id: artifactOne}, + return &workflow_dag.DBWorkflowDag{ + Operators: map[uuid.UUID]operator.DBOperator{operatorZero.Id: operatorZero}, + Artifacts: map[uuid.UUID]artifact.DBArtifact{artifactZero.Id: artifactZero, artifactOne.Id: artifactOne}, } } // This manually creates a DAG with an edge that contains an undefined artifact: // operator_0 -> artifact_0, artifact_0 not included in `dags.Artifacts` -func generateUndefinedArtifactDag(t *testing.T) *workflow_dag.WorkflowDag { +func generateUndefinedArtifactDag(t *testing.T) *workflow_dag.DBWorkflowDag { artifactId := uuid.New() - operatorZero := operator.Operator{ + operatorZero := operator.DBOperator{ Id: uuid.New(), Outputs: []uuid.UUID{artifactId}, } - return &workflow_dag.WorkflowDag{ - Operators: map[uuid.UUID]operator.Operator{operatorZero.Id: operatorZero}, - Artifacts: map[uuid.UUID]artifact.Artifact{}, + return &workflow_dag.DBWorkflowDag{ + Operators: map[uuid.UUID]operator.DBOperator{operatorZero.Id: operatorZero}, + Artifacts: map[uuid.UUID]artifact.DBArtifact{}, } } diff --git a/src/golang/lib/workflow/operator/connector/github/client.go b/src/golang/lib/workflow/operator/connector/github/client.go index 8c04a8c0e..a7f223ed4 100644 --- a/src/golang/lib/workflow/operator/connector/github/client.go +++ b/src/golang/lib/workflow/operator/connector/github/client.go @@ -3,8 +3,8 @@ package github import ( "context" - "github.com/aqueducthq/aqueduct/lib/workflow/operator/connector" - "github.com/aqueducthq/aqueduct/lib/workflow/operator/function" + "github.com/aqueducthq/aqueduct/lib/collections/operator/connector" + "github.com/aqueducthq/aqueduct/lib/collections/operator/function" ) type Client interface { diff --git a/src/golang/lib/workflow/operator/connector/github/unimplemented.go b/src/golang/lib/workflow/operator/connector/github/unimplemented.go index 58cdf210b..5d3060a5a 100644 --- a/src/golang/lib/workflow/operator/connector/github/unimplemented.go +++ b/src/golang/lib/workflow/operator/connector/github/unimplemented.go @@ -3,8 +3,8 @@ package github import ( "context" - "github.com/aqueducthq/aqueduct/lib/workflow/operator/connector" - "github.com/aqueducthq/aqueduct/lib/workflow/operator/function" + "github.com/aqueducthq/aqueduct/lib/collections/operator/connector" + "github.com/aqueducthq/aqueduct/lib/collections/operator/function" "github.com/dropbox/godropbox/errors" "github.com/google/uuid" ) diff --git a/src/golang/lib/workflow/operator/connector/github/utils.go b/src/golang/lib/workflow/operator/connector/github/utils.go index 8f931dcd7..2d81c4f52 100644 --- a/src/golang/lib/workflow/operator/connector/github/utils.go +++ b/src/golang/lib/workflow/operator/connector/github/utils.go @@ -4,10 +4,10 @@ import ( "context" "github.com/aqueducthq/aqueduct/lib/collections/operator" + "github.com/aqueducthq/aqueduct/lib/collections/operator/connector" + "github.com/aqueducthq/aqueduct/lib/collections/operator/function" "github.com/aqueducthq/aqueduct/lib/collections/shared" "github.com/aqueducthq/aqueduct/lib/storage" - "github.com/aqueducthq/aqueduct/lib/workflow/operator/connector" - "github.com/aqueducthq/aqueduct/lib/workflow/operator/function" "github.com/dropbox/godropbox/errors" "github.com/google/uuid" ) diff --git a/src/golang/lib/workflow/operator/upload.go b/src/golang/lib/workflow/operator/upload.go index dc3b22883..af7d25d83 100644 --- a/src/golang/lib/workflow/operator/upload.go +++ b/src/golang/lib/workflow/operator/upload.go @@ -15,7 +15,7 @@ import ( // It updates the relevant operator spec with the storage path. It returns an error, if any. func UploadOperatorFiles( ctx context.Context, - dag *workflow_dag.WorkflowDag, + dag *workflow_dag.DBWorkflowDag, operatorIdToFileContents map[uuid.UUID][]byte, ) ([]string, error) { paths := make([]string, 0, len(operatorIdToFileContents)) diff --git a/src/golang/lib/workflow/orchestrator/orchestrator.go b/src/golang/lib/workflow/orchestrator/orchestrator.go index 63a2210de..ff1cd4d75 100644 --- a/src/golang/lib/workflow/orchestrator/orchestrator.go +++ b/src/golang/lib/workflow/orchestrator/orchestrator.go @@ -37,7 +37,7 @@ var ( ) func initializeOrchestration( - operators map[uuid.UUID]operator.Operator, + operators map[uuid.UUID]operator.DBOperator, ready map[uuid.UUID]bool, operatorDependencies map[uuid.UUID]map[uuid.UUID]bool, artifactToDownstreamOperatorIds map[uuid.UUID][]uuid.UUID, @@ -75,7 +75,7 @@ func initializeOrchestration( // any internal system error occurred during the execution. func updateCompletedOp( ctx context.Context, - operators map[uuid.UUID]operator.Operator, + operators map[uuid.UUID]operator.DBOperator, ready map[uuid.UUID]bool, active map[uuid.UUID]bool, operatorDependencies map[uuid.UUID]map[uuid.UUID]bool, @@ -165,8 +165,8 @@ func updateCompletedOp( func scheduleOperators( ctx context.Context, - operators map[uuid.UUID]operator.Operator, - artifacts map[uuid.UUID]artifact.Artifact, + operators map[uuid.UUID]operator.DBOperator, + artifacts map[uuid.UUID]artifact.DBArtifact, ready map[uuid.UUID]bool, active map[uuid.UUID]bool, operatorIdToJobId map[uuid.UUID]string, @@ -188,7 +188,7 @@ func scheduleOperators( return ErrInvalidOpId } - inputArtifacts := make([]artifact.Artifact, 0, len(op.Inputs)) + inputArtifacts := make([]artifact.DBArtifact, 0, len(op.Inputs)) inputContentPaths := make([]string, 0, len(op.Inputs)) inputMetadataPaths := make([]string, 0, len(op.Inputs)) for _, inputArtifactId := range op.Inputs { @@ -202,7 +202,7 @@ func scheduleOperators( inputMetadataPaths = append(inputMetadataPaths, artifactMetadataPaths[inputArtifact.Id]) } - outputArtifacts := make([]artifact.Artifact, 0, len(op.Outputs)) + outputArtifacts := make([]artifact.DBArtifact, 0, len(op.Outputs)) outputContentPaths := make([]string, 0, len(op.Outputs)) outputMetadataPaths := make([]string, 0, len(op.Outputs)) for _, outputArtifactId := range op.Outputs { @@ -281,7 +281,7 @@ func waitForActiveOperators( func Preview( ctx context.Context, - dag *workflow_dag.WorkflowDag, + dag *workflow_dag.DBWorkflowDag, workflowStoragePaths *utils.WorkflowStoragePaths, pollIntervalMillisec time.Duration, jobManager job.JobManager, @@ -307,7 +307,7 @@ func Preview( func Execute( ctx context.Context, - dag *workflow_dag.WorkflowDag, + dag *workflow_dag.DBWorkflowDag, workflowStoragePaths *utils.WorkflowStoragePaths, pollIntervalMillisec time.Duration, workflowReader workflow.Reader, @@ -340,7 +340,7 @@ func Execute( func orchestrate( ctx context.Context, - dag *workflow_dag.WorkflowDag, + dag *workflow_dag.DBWorkflowDag, workflowStoragePaths *utils.WorkflowStoragePaths, pollIntervalMillisec time.Duration, workflowReader workflow.Reader, diff --git a/src/golang/lib/workflow/scheduler/extract.go b/src/golang/lib/workflow/scheduler/extract.go index 0cf502bbd..476fa4620 100644 --- a/src/golang/lib/workflow/scheduler/extract.go +++ b/src/golang/lib/workflow/scheduler/extract.go @@ -4,10 +4,10 @@ import ( "context" "fmt" + "github.com/aqueducthq/aqueduct/lib/collections/operator/connector" "github.com/aqueducthq/aqueduct/lib/collections/shared" "github.com/aqueducthq/aqueduct/lib/job" "github.com/aqueducthq/aqueduct/lib/vault" - "github.com/aqueducthq/aqueduct/lib/workflow/operator/connector" "github.com/aqueducthq/aqueduct/lib/workflow/operator/connector/auth" "github.com/dropbox/godropbox/errors" "github.com/google/uuid" diff --git a/src/golang/lib/workflow/scheduler/function.go b/src/golang/lib/workflow/scheduler/function.go index c8d14acc1..35881be68 100644 --- a/src/golang/lib/workflow/scheduler/function.go +++ b/src/golang/lib/workflow/scheduler/function.go @@ -5,9 +5,9 @@ import ( "fmt" "github.com/aqueducthq/aqueduct/lib/collections/artifact" + "github.com/aqueducthq/aqueduct/lib/collections/operator/function" "github.com/aqueducthq/aqueduct/lib/collections/shared" "github.com/aqueducthq/aqueduct/lib/job" - "github.com/aqueducthq/aqueduct/lib/workflow/operator/function" "github.com/dropbox/godropbox/errors" "github.com/google/uuid" ) diff --git a/src/golang/lib/workflow/scheduler/load.go b/src/golang/lib/workflow/scheduler/load.go index 549dbbefb..2434f6e52 100644 --- a/src/golang/lib/workflow/scheduler/load.go +++ b/src/golang/lib/workflow/scheduler/load.go @@ -4,10 +4,10 @@ import ( "context" "fmt" + "github.com/aqueducthq/aqueduct/lib/collections/operator/connector" "github.com/aqueducthq/aqueduct/lib/collections/shared" "github.com/aqueducthq/aqueduct/lib/job" "github.com/aqueducthq/aqueduct/lib/vault" - "github.com/aqueducthq/aqueduct/lib/workflow/operator/connector" "github.com/aqueducthq/aqueduct/lib/workflow/operator/connector/auth" "github.com/dropbox/godropbox/errors" "github.com/google/uuid" diff --git a/src/golang/lib/workflow/scheduler/param.go b/src/golang/lib/workflow/scheduler/param.go index dda30c4ad..6322ed221 100644 --- a/src/golang/lib/workflow/scheduler/param.go +++ b/src/golang/lib/workflow/scheduler/param.go @@ -4,9 +4,9 @@ import ( "context" "fmt" + "github.com/aqueducthq/aqueduct/lib/collections/operator/param" "github.com/aqueducthq/aqueduct/lib/collections/shared" "github.com/aqueducthq/aqueduct/lib/job" - "github.com/aqueducthq/aqueduct/lib/workflow/operator/param" "github.com/dropbox/godropbox/errors" "github.com/google/uuid" ) diff --git a/src/golang/lib/workflow/scheduler/schedule.go b/src/golang/lib/workflow/scheduler/schedule.go index 401ea33d0..1d98060d6 100644 --- a/src/golang/lib/workflow/scheduler/schedule.go +++ b/src/golang/lib/workflow/scheduler/schedule.go @@ -47,9 +47,9 @@ var ( // func ScheduleOperator( ctx context.Context, - op operator.Operator, - inputArtifacts []artifact.Artifact, - outputArtifacts []artifact.Artifact, + op operator.DBOperator, + inputArtifacts []artifact.DBArtifact, + outputArtifacts []artifact.DBArtifact, metadataPath string, inputContentPaths []string, inputMetadataPaths []string, diff --git a/src/golang/lib/workflow/scheduler/system_metric.go b/src/golang/lib/workflow/scheduler/system_metric.go index 548dbd2ad..89a4c6617 100644 --- a/src/golang/lib/workflow/scheduler/system_metric.go +++ b/src/golang/lib/workflow/scheduler/system_metric.go @@ -4,9 +4,9 @@ import ( "context" "fmt" + "github.com/aqueducthq/aqueduct/lib/collections/operator/system_metric" "github.com/aqueducthq/aqueduct/lib/collections/shared" "github.com/aqueducthq/aqueduct/lib/job" - "github.com/aqueducthq/aqueduct/lib/workflow/operator/system_metric" "github.com/dropbox/godropbox/errors" "github.com/google/uuid" ) diff --git a/src/golang/lib/workflow/utils/utils.go b/src/golang/lib/workflow/utils/utils.go index d9313a71d..7f148b474 100644 --- a/src/golang/lib/workflow/utils/utils.go +++ b/src/golang/lib/workflow/utils/utils.go @@ -29,7 +29,7 @@ type WorkflowStoragePaths struct { ArtifactMetadataPaths map[uuid.UUID]string } -func GenerateWorkflowStoragePaths(dag *workflow_dag.WorkflowDag) *WorkflowStoragePaths { +func GenerateWorkflowStoragePaths(dag *workflow_dag.DBWorkflowDag) *WorkflowStoragePaths { workflowStoragePaths := WorkflowStoragePaths{ OperatorMetadataPaths: make(map[uuid.UUID]string), ArtifactPaths: make(map[uuid.UUID]string), @@ -105,7 +105,7 @@ func ReadFromStorage(ctx context.Context, storageConfig *shared.StorageConfig, p func WriteWorkflowDagToDatabase( ctx context.Context, - dag *workflow_dag.WorkflowDag, + dag *workflow_dag.DBWorkflowDag, workflowReader workflow.Reader, workflowWriter workflow.Writer, workflowDagWriter workflow_dag.Writer, @@ -241,7 +241,7 @@ func ReadWorkflowDagFromDatabase( artifactReader artifact.Reader, workflowDagEdgeReader workflow_dag_edge.Reader, db database.Database, -) (*workflow_dag.WorkflowDag, error) { +) (*workflow_dag.DBWorkflowDag, error) { workflowDag, err := workflowDagReader.GetWorkflowDag(ctx, workflowDagId, db) if err != nil { return nil, errors.Wrap(err, "Unable to read workflow dag from the database.") @@ -254,8 +254,8 @@ func ReadWorkflowDagFromDatabase( workflowDag.Metadata = dbWorkflow - workflowDag.Operators = make(map[uuid.UUID]operator.Operator) - workflowDag.Artifacts = make(map[uuid.UUID]artifact.Artifact) + workflowDag.Operators = make(map[uuid.UUID]operator.DBOperator) + workflowDag.Artifacts = make(map[uuid.UUID]artifact.DBArtifact) // Populate nodes for operators and artifacts. operators, err := operatorReader.GetOperatorsByWorkflowDagId(ctx, workflowDag.Id, db) @@ -324,7 +324,7 @@ func ReadLatestWorkflowDagFromDatabase( artifactReader artifact.Reader, workflowDagEdgeReader workflow_dag_edge.Reader, db database.Database, -) (*workflow_dag.WorkflowDag, error) { +) (*workflow_dag.DBWorkflowDag, error) { workflowDag, err := workflowDagReader.GetLatestWorkflowDag(ctx, workflowId, db) if err != nil { return nil, errors.Wrap(err, "Unable to read the latest workflow dag from the database.") @@ -351,7 +351,7 @@ func ReadLatestWorkflowDagFromDatabase( func UpdateWorkflowDagToLatest( ctx context.Context, githubClient github.Client, - workflowDag *workflow_dag.WorkflowDag, + workflowDag *workflow_dag.DBWorkflowDag, workflowReader workflow.Reader, workflowWriter workflow.Writer, workflowDagReader workflow_dag.Reader, @@ -363,8 +363,8 @@ func UpdateWorkflowDagToLatest( artifactReader artifact.Reader, artifactWriter artifact.Writer, db database.Database, -) (*workflow_dag.WorkflowDag, error) { - operatorsToReplace := make([]operator.Operator, 0, len(workflowDag.Operators)) +) (*workflow_dag.DBWorkflowDag, error) { + operatorsToReplace := make([]operator.DBOperator, 0, len(workflowDag.Operators)) for _, op := range workflowDag.Operators { opUpdated, err := github.PullOperator( ctx, @@ -459,7 +459,7 @@ func UpdateWorkflowDagResultMetadata( // It logs any error that occurs during these steps. func UpdateOperatorAndArtifactResults( ctx context.Context, - operator *operator.Operator, + operator *operator.DBOperator, storageConfig *shared.StorageConfig, operatorState *shared.ExecutionState, artifactMetadataPaths map[uuid.UUID]string, @@ -512,7 +512,7 @@ func UpdateOperatorAndArtifactResults( func updateOperatorAndArtifactResults( ctx context.Context, - operator *operator.Operator, + operator *operator.DBOperator, operatorState *shared.ExecutionState, artifactStatuses map[uuid.UUID]shared.ExecutionStatus, artifactResultsMetadata map[uuid.UUID]*artifact_result.Metadata,