Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Performs Code Structure/Naming changes for Orchestration Refactor #207

Merged
merged 7 commits into from
Jul 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/golang/cmd/server/handler/create_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ import (

"github.com/aqueducthq/aqueduct/cmd/server/routes"
"github.com/aqueducthq/aqueduct/lib/collections/integration"
"github.com/aqueducthq/aqueduct/lib/collections/operator/connector"
"github.com/aqueducthq/aqueduct/lib/collections/shared"
aq_context "github.com/aqueducthq/aqueduct/lib/context"
"github.com/aqueducthq/aqueduct/lib/database"
"github.com/aqueducthq/aqueduct/lib/job"
"github.com/aqueducthq/aqueduct/lib/storage"
"github.com/aqueducthq/aqueduct/lib/vault"
"github.com/aqueducthq/aqueduct/lib/workflow/operator/connector"
"github.com/aqueducthq/aqueduct/lib/workflow/operator/connector/auth"
"github.com/aqueducthq/aqueduct/lib/workflow/utils"
"github.com/dropbox/godropbox/errors"
Expand Down
2 changes: 1 addition & 1 deletion src/golang/cmd/server/handler/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ import (
"github.com/aqueducthq/aqueduct/cmd/server/queries"
"github.com/aqueducthq/aqueduct/cmd/server/routes"
"github.com/aqueducthq/aqueduct/lib/collections/integration"
"github.com/aqueducthq/aqueduct/lib/collections/operator/connector"
"github.com/aqueducthq/aqueduct/lib/collections/shared"
aq_context "github.com/aqueducthq/aqueduct/lib/context"
"github.com/aqueducthq/aqueduct/lib/database"
"github.com/aqueducthq/aqueduct/lib/job"
"github.com/aqueducthq/aqueduct/lib/vault"
"github.com/aqueducthq/aqueduct/lib/workflow/operator/connector"
"github.com/aqueducthq/aqueduct/lib/workflow/operator/connector/auth"
workflow_utils "github.com/aqueducthq/aqueduct/lib/workflow/utils"
"github.com/dropbox/godropbox/errors"
Expand Down
2 changes: 1 addition & 1 deletion src/golang/cmd/server/handler/get_artifact_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import (
"net/http"

"github.com/aqueducthq/aqueduct/cmd/server/queries"
"github.com/aqueducthq/aqueduct/lib/collections/operator/connector"
"github.com/aqueducthq/aqueduct/lib/collections/shared"
aq_context "github.com/aqueducthq/aqueduct/lib/context"
"github.com/aqueducthq/aqueduct/lib/database"
"github.com/aqueducthq/aqueduct/lib/workflow/operator/connector"
"github.com/dropbox/godropbox/errors"
"github.com/google/uuid"
)
Expand Down
4 changes: 2 additions & 2 deletions src/golang/cmd/server/handler/get_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type getWorkflowArgs struct {

type getWorkflowResponse struct {
// a map of workflow dags keyed by their IDs
WorkflowDags map[uuid.UUID]*workflow_dag.WorkflowDag `json:"workflow_dags"`
WorkflowDags map[uuid.UUID]*workflow_dag.DBWorkflowDag `json:"workflow_dags"`
// a list of dag results. Each result's `workflow_dag_id` field correspond to the
WorkflowDagResults []workflowDagResult `json:"workflow_dag_results"`
// a list of auth0Ids associated with workflow watchers
Expand Down Expand Up @@ -116,7 +116,7 @@ func (h *GetWorkflowHandler) Perform(ctx context.Context, interfaceArgs interfac
return emptyResp, http.StatusInternalServerError, errors.Wrap(err, "Unexpected error occurred when retrieving workflow.")
}

workflowDags := make(map[uuid.UUID]*workflow_dag.WorkflowDag, len(dbWorkflowDags))
workflowDags := make(map[uuid.UUID]*workflow_dag.DBWorkflowDag, len(dbWorkflowDags))
for _, dbWorkflowDag := range dbWorkflowDags {
constructedDag, err := workflow_utils.ReadWorkflowDagFromDatabase(
ctx,
Expand Down
2 changes: 1 addition & 1 deletion src/golang/cmd/server/handler/preview.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func deserializeArtifactResponses(
ctx context.Context,
workflowStoragePaths *utils.WorkflowStoragePaths,
storageConfig *shared.StorageConfig,
dagArtifacts map[uuid.UUID]artifact.Artifact,
dagArtifacts map[uuid.UUID]artifact.DBArtifact,
artifactsToSkipFetch map[uuid.UUID]bool,
) (map[uuid.UUID]previewArtifactResponse, error) {
responses := make(map[uuid.UUID]previewArtifactResponse, len(workflowStoragePaths.ArtifactPaths))
Expand Down
2 changes: 1 addition & 1 deletion src/golang/cmd/server/handler/preview_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ import (

"github.com/aqueducthq/aqueduct/cmd/server/routes"
"github.com/aqueducthq/aqueduct/lib/collections/integration"
"github.com/aqueducthq/aqueduct/lib/collections/operator/connector"
"github.com/aqueducthq/aqueduct/lib/collections/shared"
aq_context "github.com/aqueducthq/aqueduct/lib/context"
"github.com/aqueducthq/aqueduct/lib/database"
"github.com/aqueducthq/aqueduct/lib/job"
"github.com/aqueducthq/aqueduct/lib/storage"
"github.com/aqueducthq/aqueduct/lib/vault"
"github.com/aqueducthq/aqueduct/lib/workflow/operator/connector"
"github.com/aqueducthq/aqueduct/lib/workflow/scheduler"
workflow_utils "github.com/aqueducthq/aqueduct/lib/workflow/utils"
"github.com/dropbox/godropbox/errors"
Expand Down
2 changes: 1 addition & 1 deletion src/golang/cmd/server/handler/register_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type RegisterWorkflowHandler struct {

type registerWorkflowArgs struct {
*aq_context.AqContext
workflowDag *workflow_dag.WorkflowDag
workflowDag *workflow_dag.DBWorkflowDag
operatorIdToFileContents map[uuid.UUID][]byte

// Whether this is a registering a new workflow or updating an existing one.
Expand Down
8 changes: 4 additions & 4 deletions src/golang/cmd/server/request/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,18 @@ import (

"github.com/aqueducthq/aqueduct/cmd/server/routes"
"github.com/aqueducthq/aqueduct/lib/collections/operator"
"github.com/aqueducthq/aqueduct/lib/collections/operator/function"
"github.com/aqueducthq/aqueduct/lib/collections/shared"
"github.com/aqueducthq/aqueduct/lib/collections/workflow_dag"
"github.com/aqueducthq/aqueduct/lib/workflow/operator/connector/github"
"github.com/aqueducthq/aqueduct/lib/workflow/operator/function"
"github.com/dropbox/godropbox/errors"
"github.com/google/uuid"
)

const dagKey = "dag"

type DagSummary struct {
Dag *workflow_dag.WorkflowDag
Dag *workflow_dag.DBWorkflowDag

// Extract the operator contents from the request body
FileContentsByOperatorUUID map[uuid.UUID][]byte
Expand All @@ -40,7 +40,7 @@ func ParseDagSummaryFromRequest(
return nil, http.StatusBadRequest, errors.Wrap(err, "Serialized dag object not available")
}

var workflowDag workflow_dag.WorkflowDag
var workflowDag workflow_dag.DBWorkflowDag
err = json.Unmarshal(serializedDAGBytes, &workflowDag)
if err != nil {
return nil, http.StatusBadRequest, errors.Wrap(err, "Invalid dag specification.")
Expand Down Expand Up @@ -81,7 +81,7 @@ func ParseDagSummaryFromRequest(
// For github contents, retrieve zipball for files and update string contents like sql queries.
func extractOperatorContentsFromRequest(
r *http.Request,
op *operator.Operator,
op *operator.DBOperator,
ghClient github.Client,
) ([]byte, int, error) {
if op.Spec.IsExtract() {
Expand Down
12 changes: 6 additions & 6 deletions src/golang/lib/collections/artifact/artifact.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/google/uuid"
)

type Artifact struct {
type DBArtifact struct {
Id uuid.UUID `db:"id" json:"id"`
Name string `db:"name" json:"name"`
Description string `db:"description" json:"description"`
Expand All @@ -16,13 +16,13 @@ type Artifact struct {

type Reader interface {
Exists(ctx context.Context, id uuid.UUID, db database.Database) (bool, error)
GetArtifact(ctx context.Context, id uuid.UUID, db database.Database) (*Artifact, error)
GetArtifacts(ctx context.Context, ids []uuid.UUID, db database.Database) ([]Artifact, error)
GetArtifact(ctx context.Context, id uuid.UUID, db database.Database) (*DBArtifact, error)
GetArtifacts(ctx context.Context, ids []uuid.UUID, db database.Database) ([]DBArtifact, error)
GetArtifactsByWorkflowDagId(
ctx context.Context,
workflowDagId uuid.UUID,
db database.Database,
) ([]Artifact, error)
) ([]DBArtifact, error)
ValidateArtifactOwnership(
ctx context.Context,
organizationId string,
Expand All @@ -38,13 +38,13 @@ type Writer interface {
description string,
spec *Spec,
db database.Database,
) (*Artifact, error)
) (*DBArtifact, error)
UpdateArtifact(
ctx context.Context,
id uuid.UUID,
changes map[string]interface{},
db database.Database,
) (*Artifact, error)
) (*DBArtifact, error)
DeleteArtifact(ctx context.Context, id uuid.UUID, db database.Database) error
DeleteArtifacts(ctx context.Context, ids []uuid.UUID, db database.Database) error
}
Expand Down
6 changes: 3 additions & 3 deletions src/golang/lib/collections/artifact/artifact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,18 @@ package artifact_test

import (
"encoding/json"
"github.com/aqueducthq/aqueduct/lib/collections/artifact/table"
"testing"

"github.com/aqueducthq/aqueduct/lib/collections/artifact"
"github.com/aqueducthq/aqueduct/lib/workflow/artifact/table"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
)

func TestSerializingAndDeserializingArtifact(t *testing.T) {
id := uuid.New()

atf := artifact.Artifact{
atf := artifact.DBArtifact{
Id: id,
Name: "test",
Spec: *artifact.NewSpecFromTable(
Expand All @@ -24,7 +24,7 @@ func TestSerializingAndDeserializingArtifact(t *testing.T) {
rawAtf, err := json.Marshal(atf)
require.Nil(t, err)

var reconstructedAtf artifact.Artifact
var reconstructedAtf artifact.DBArtifact
err = json.Unmarshal(rawAtf, &reconstructedAtf)
require.Nil(t, err)
require.True(t, reconstructedAtf.Spec.IsTable())
Expand Down
4 changes: 2 additions & 2 deletions src/golang/lib/collections/artifact/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (w *sqliteWriterImpl) CreateArtifact(
description string,
spec *Spec,
db database.Database,
) (*Artifact, error) {
) (*DBArtifact, error) {
insertColumns := []string{IdColumn, NameColumn, DescriptionColumn, SpecColumn}
insertArtifactStmt := db.PrepareInsertWithReturnAllStmt(tableName, insertColumns, allColumns())

Expand All @@ -40,7 +40,7 @@ func (w *sqliteWriterImpl) CreateArtifact(

args := []interface{}{id, name, description, spec}

var artifact Artifact
var artifact DBArtifact
err = db.Query(ctx, &artifact, insertArtifactStmt, args...)
return &artifact, err
}
18 changes: 9 additions & 9 deletions src/golang/lib/collections/artifact/standard.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ func (w *standardWriterImpl) CreateArtifact(
description string,
spec *Spec,
db database.Database,
) (*Artifact, error) {
) (*DBArtifact, error) {
insertColumns := []string{NameColumn, DescriptionColumn, SpecColumn}
insertArtifactStmt := db.PrepareInsertWithReturnAllStmt(tableName, insertColumns, allColumns())

args := []interface{}{name, description, spec}

var artifact Artifact
var artifact DBArtifact
err := db.Query(ctx, &artifact, insertArtifactStmt, args...)
return &artifact, err
}
Expand All @@ -41,7 +41,7 @@ func (r *standardReaderImpl) GetArtifact(
ctx context.Context,
id uuid.UUID,
db database.Database,
) (*Artifact, error) {
) (*DBArtifact, error) {
artifacts, err := r.GetArtifacts(ctx, []uuid.UUID{id}, db)
if err != nil {
return nil, err
Expand All @@ -58,7 +58,7 @@ func (r *standardReaderImpl) GetArtifacts(
ctx context.Context,
ids []uuid.UUID,
db database.Database,
) ([]Artifact, error) {
) ([]DBArtifact, error) {
if len(ids) == 0 {
return nil, errors.New("Provided empty IDs list.")
}
Expand All @@ -71,7 +71,7 @@ func (r *standardReaderImpl) GetArtifacts(

args := stmt_preparers.CastIdsListToInterfaceList(ids)

var artifacts []Artifact
var artifacts []DBArtifact
err := db.Query(ctx, &artifacts, getArtifactsQuery, args...)
return artifacts, err
}
Expand All @@ -80,7 +80,7 @@ func (r *standardReaderImpl) GetArtifactsByWorkflowDagId(
ctx context.Context,
workflowDagId uuid.UUID,
db database.Database,
) ([]Artifact, error) {
) ([]DBArtifact, error) {
getArtifactsByWorkflowDagIdQuery := fmt.Sprintf(
`SELECT %s FROM artifact WHERE id IN
(SELECT from_id FROM workflow_dag_edge WHERE workflow_dag_id = $1 AND type = '%s'
Expand All @@ -91,7 +91,7 @@ func (r *standardReaderImpl) GetArtifactsByWorkflowDagId(
workflow_dag_edge.OperatorToArtifactType,
)

var artifacts []Artifact
var artifacts []DBArtifact
err := db.Query(ctx, &artifacts, getArtifactsByWorkflowDagIdQuery, workflowDagId)
return artifacts, err
}
Expand All @@ -101,8 +101,8 @@ func (w *standardWriterImpl) UpdateArtifact(
id uuid.UUID,
changes map[string]interface{},
db database.Database,
) (*Artifact, error) {
var artifact Artifact
) (*DBArtifact, error) {
var artifact DBArtifact
err := utils.UpdateRecordToDest(ctx, &artifact, changes, tableName, IdColumn, id, allColumns(), db)
return &artifact, err
}
Expand Down
8 changes: 4 additions & 4 deletions src/golang/lib/collections/artifact/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ import (
"database/sql/driver"
"encoding/json"

"github.com/aqueducthq/aqueduct/lib/collections/artifact/boolean"
"github.com/aqueducthq/aqueduct/lib/collections/artifact/float"
"github.com/aqueducthq/aqueduct/lib/collections/artifact/jsonable"
"github.com/aqueducthq/aqueduct/lib/collections/artifact/table"
"github.com/aqueducthq/aqueduct/lib/collections/utils"
"github.com/aqueducthq/aqueduct/lib/workflow/artifact/boolean"
"github.com/aqueducthq/aqueduct/lib/workflow/artifact/float"
"github.com/aqueducthq/aqueduct/lib/workflow/artifact/jsonable"
"github.com/aqueducthq/aqueduct/lib/workflow/artifact/table"
"github.com/dropbox/godropbox/errors"
)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package check

import (
"github.com/aqueducthq/aqueduct/lib/workflow/operator/function"
"github.com/aqueducthq/aqueduct/lib/collections/operator/function"
)

type Level string
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package connector

import gh_types "github.com/aqueducthq/aqueduct/lib/workflow/operator/connector/github/types"
import (
gh_types "github.com/aqueducthq/aqueduct/lib/collections/operator/connector/github"
)

type ExtractParams interface {
isExtractParams()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package types
package github

import (
"fmt"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package function

import (
github "github.com/aqueducthq/aqueduct/lib/workflow/operator/connector/github/types"
"github.com/aqueducthq/aqueduct/lib/collections/operator/connector/github"
)

type Function struct {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package metric

import (
"github.com/aqueducthq/aqueduct/lib/workflow/operator/function"
"github.com/aqueducthq/aqueduct/lib/collections/operator/function"
)

type Metric struct {
Expand Down
12 changes: 6 additions & 6 deletions src/golang/lib/collections/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/google/uuid"
)

type Operator struct {
type DBOperator struct {
Id uuid.UUID `db:"id" json:"id"`
Name string `db:"name" json:"name"`
Description string `db:"description" json:"description"`
Expand All @@ -20,13 +20,13 @@ type Operator struct {

type Reader interface {
Exists(ctx context.Context, id uuid.UUID, db database.Database) (bool, error)
GetOperator(ctx context.Context, id uuid.UUID, db database.Database) (*Operator, error)
GetOperators(ctx context.Context, ids []uuid.UUID, db database.Database) ([]Operator, error)
GetOperator(ctx context.Context, id uuid.UUID, db database.Database) (*DBOperator, error)
GetOperators(ctx context.Context, ids []uuid.UUID, db database.Database) ([]DBOperator, error)
GetOperatorsByWorkflowDagId(
ctx context.Context,
workflowDagId uuid.UUID,
db database.Database,
) ([]Operator, error)
) ([]DBOperator, error)
ValidateOperatorOwnership(
ctx context.Context,
organizationId string,
Expand All @@ -42,13 +42,13 @@ type Writer interface {
description string,
spec *Spec,
db database.Database,
) (*Operator, error)
) (*DBOperator, error)
UpdateOperator(
ctx context.Context,
id uuid.UUID,
changes map[string]interface{},
db database.Database,
) (*Operator, error)
) (*DBOperator, error)
DeleteOperator(ctx context.Context, id uuid.UUID, db database.Database) error
DeleteOperators(ctx context.Context, ids []uuid.UUID, db database.Database) error
}
Expand Down
Loading