Skip to content

Commit

Permalink
Add appropriate workflow count to our connected notification cards an…
Browse files Browse the repository at this point in the history
…d details page (#1302)
  • Loading branch information
kenxu95 authored May 11, 2023
1 parent 7c75ad6 commit 86269ec
Show file tree
Hide file tree
Showing 14 changed files with 255 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
// serialized `listOperatorsForIntegrationResponse`
//
// `listOperatorsForIntegration` lists all operators associated with
// the given integraion. Together we provide the following information for
// the given integration. Together we provide the following information for
// each associated operator:
// `workflow_id`: the workflow associated with this operator
// `workflow_dag_id`: the workflow dag associated with this operator
Expand Down Expand Up @@ -100,6 +100,9 @@ func (h *ListOperatorsForIntegrationHandler) Perform(ctx context.Context, interf
if err != nil {
return nil, http.StatusInternalServerError, errors.Wrap(err, "Unable to retrieve operators.")
}
if len(operators) == 0 {
return listOperatorsForIntegrationResponse{OperatorWithIds: []listOperatorsForIntegrationItem{}}, http.StatusOK, nil
}

operatorIDs := make([]uuid.UUID, 0, len(operators))
operatorByIDs := make(map[uuid.UUID]models.Operator, len(operators))
Expand Down
3 changes: 3 additions & 0 deletions src/golang/cmd/server/handler/v2/integration_operators_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ func (h *IntegrationOperatorsGetHandler) Perform(ctx context.Context, interfaceA
if err != nil {
return nil, http.StatusInternalServerError, errors.Wrap(err, "Unable to retrieve operators.")
}
if len(operators) == 0 {
return []*response.Operator{}, http.StatusOK, nil
}

operatorIDs := slices.Map(operators, func(op models.Operator) uuid.UUID {
return op.ID
Expand Down
6 changes: 4 additions & 2 deletions src/golang/cmd/server/handler/v2/integration_workflows_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@ type IntegrationWorkflowsGetHandler struct {

Database database.Database
IntegrationRepo repos.Integration
OperatorRepo repos.Operator
WorkflowRepo repos.Workflow
DAGRepo repos.DAG
DAGResultRepo repos.DAGResult
OperatorRepo repos.Operator
}

func (*IntegrationWorkflowsGetHandler) Name() string {
Expand Down Expand Up @@ -69,7 +71,7 @@ func (h *IntegrationWorkflowsGetHandler) Perform(ctx context.Context, interfaceA
}

workflowAndDagIDs, err := fetchWorkflowAndDagIDsForIntegration(
ctx, args.OrgID, integration, h.IntegrationRepo, h.OperatorRepo, h.DAGResultRepo, h.Database,
ctx, args.OrgID, integration, h.IntegrationRepo, h.WorkflowRepo, h.OperatorRepo, h.DAGRepo, h.DAGResultRepo, h.Database,
)
if err != nil {
return nil, http.StatusInternalServerError, errors.Wrapf(err, "Unable to find workflows for integration %s", args.integrationID)
Expand Down
139 changes: 85 additions & 54 deletions src/golang/cmd/server/handler/v2/integrations_workflows_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/aqueducthq/aqueduct/lib/errors"
"github.com/aqueducthq/aqueduct/lib/functional/slices"
"github.com/aqueducthq/aqueduct/lib/models"
"github.com/aqueducthq/aqueduct/lib/models/shared"
"github.com/aqueducthq/aqueduct/lib/repos"
"github.com/aqueducthq/aqueduct/lib/response"
"github.com/aqueducthq/aqueduct/lib/workflow/operator"
Expand Down Expand Up @@ -37,8 +38,10 @@ type IntegrationsWorkflowsGetHandler struct {

Database database.Database
IntegrationRepo repos.Integration
OperatorRepo repos.Operator
WorkflowRepo repos.Workflow
DAGRepo repos.DAG
DAGResultRepo repos.DAGResult
OperatorRepo repos.Operator
}

func (*IntegrationsWorkflowsGetHandler) Name() string {
Expand Down Expand Up @@ -71,7 +74,9 @@ func (h *IntegrationsWorkflowsGetHandler) Perform(ctx context.Context, interface

response := make(map[uuid.UUID][]*response.WorkflowAndDagIDs, len(integrations))
for _, integration := range integrations {
workflowAndDagIDs, err := fetchWorkflowAndDagIDsForIntegration(ctx, args.OrgID, &integration, h.IntegrationRepo, h.OperatorRepo, h.DAGResultRepo, h.Database)
workflowAndDagIDs, err := fetchWorkflowAndDagIDsForIntegration(
ctx,
args.OrgID, &integration, h.IntegrationRepo, h.WorkflowRepo, h.OperatorRepo, h.DAGRepo, h.DAGResultRepo, h.Database)
if err != nil {
return nil, http.StatusInternalServerError, errors.Wrapf(err, "Unable to find workflows for integration %s", integration.ID)
}
Expand All @@ -88,73 +93,99 @@ func fetchWorkflowAndDagIDsForIntegration(
orgID string,
integration *models.Integration,
integrationRepo repos.Integration,
workflowRepo repos.Workflow,
operatorRepo repos.Operator,
dagRepo repos.DAG,
dagResultRepo repos.DAGResult,
db database.Database,
) ([]*response.WorkflowAndDagIDs, error) {
operators, err := operator.GetOperatorsOnIntegration(
ctx,
orgID,
integration,
integrationRepo,
operatorRepo,
db,
)
if err != nil {
return nil, errors.Wrap(err, "Unable to retrieve operators.")
}

// Now, using the operators using this integration, we can infer all the workflows
// that also use this integration.
operatorIDs := slices.Map(operators, func(op models.Operator) uuid.UUID {
return op.ID
})
// For performance reasons, we split out the workflows fetching for notifications, since for these
// resources, you can fetch the workflow IDs that use them directly, instead of having to go through
// operators.
if shared.IsNotificationResource(integration.Service) {
workflowIDs, err := operator.GetWorkflowIDsUsingNotification(ctx, integration, workflowRepo, db)
if err != nil {
return nil, err
}

operatorRelations, err := operatorRepo.GetRelationBatch(ctx, operatorIDs, db)
if err != nil {
return nil, errors.Wrap(err, "Unable to retrieve operator ID information.")
}
workflowIDToLatestDagID, err := dagRepo.GetLatestIDByWorkflowBatch(ctx, workflowIDs, db)
if err != nil {
return nil, err
}

// This map is derived directly from the operators.
workflowIDToDagIDs := make(map[uuid.UUID][]uuid.UUID, len(operatorRelations))
for _, operatorRelation := range operatorRelations {
workflowIDToDagIDs[operatorRelation.WorkflowID] = append(
workflowIDToDagIDs[operatorRelation.WorkflowID],
operatorRelation.DagID,
workflowAndDagIDs := make([]*response.WorkflowAndDagIDs, 0, len(workflowIDToLatestDagID))
for workflowID, dagID := range workflowIDToLatestDagID {
workflowAndDagIDs = append(workflowAndDagIDs, &response.WorkflowAndDagIDs{
WorkflowID: workflowID,
DagID: dagID,
})
}
return workflowAndDagIDs, nil

} else {
operators, err := operator.GetOperatorsOnIntegration(
ctx,
orgID,
integration,
integrationRepo,
operatorRepo,
db,
)
}
if err != nil {
return nil, errors.Wrap(err, "Unable to retrieve operators.")
}

// Now, using the operators using this integration, we can infer all the workflows
// that also use this integration.
operatorIDs := slices.Map(operators, func(op models.Operator) uuid.UUID {
return op.ID
})

// For each workflow, fetch the latest dag ID. We can use this latest dag ID to filter out any
// workflows had historically used this resource, but no longer do in their latest run.
workflowAndDagIDs := make([]*response.WorkflowAndDagIDs, 0, len(workflowIDToDagIDs))
for workflowID, dagIDs := range workflowIDToDagIDs {
dbDAGResults, err := dagResultRepo.GetByWorkflow(ctx, workflowID, "created_at", 1, true, db)
operatorRelations, err := operatorRepo.GetRelationBatch(ctx, operatorIDs, db)
if err != nil {
return nil, err
return nil, errors.Wrap(err, "Unable to retrieve operator ID information.")
}

// Skip any workflows that have been defined but have not run yet.
if len(dbDAGResults) == 1 {
latestDagID := dbDAGResults[0].DagID
// This map is derived directly from the operators.
workflowIDToDagIDs := make(map[uuid.UUID][]uuid.UUID, len(operatorRelations))
for _, operatorRelation := range operatorRelations {
workflowIDToDagIDs[operatorRelation.WorkflowID] = append(
workflowIDToDagIDs[operatorRelation.WorkflowID],
operatorRelation.DagID,
)
}

found := false
for _, dagID := range dagIDs {
if dagID == latestDagID {
found = true
break
}
// For each workflow, fetch the latest dag ID. We can use this latest dag ID to filter out any
// workflows had historically used this resource, but no longer do in their latest run.
workflowAndDagIDs := make([]*response.WorkflowAndDagIDs, 0, len(workflowIDToDagIDs))
for workflowID, dagIDs := range workflowIDToDagIDs {
dbDAGResults, err := dagResultRepo.GetByWorkflow(ctx, workflowID, "created_at", 1, true, db)
if err != nil {
return nil, err
}

// If the latest dag does not have any of the resource operator's on it, that means
// that the workflow no longer uses this resource.
if found {
workflowAndDagIDs = append(workflowAndDagIDs, &response.WorkflowAndDagIDs{
WorkflowID: workflowID,
DagID: latestDagID,
})
// Skip any workflows that have been defined but have not run yet.
if len(dbDAGResults) == 1 {
latestDagID := dbDAGResults[0].DagID

found := false
for _, dagID := range dagIDs {
if dagID == latestDagID {
found = true
break
}
}

// If the latest dag does not have any of the resource operator's on it, that means
// that the workflow no longer uses this resource.
if found {
workflowAndDagIDs = append(workflowAndDagIDs, &response.WorkflowAndDagIDs{
WorkflowID: workflowID,
DagID: latestDagID,
})
}
}
}
return workflowAndDagIDs, nil
}

return workflowAndDagIDs, nil
}
8 changes: 6 additions & 2 deletions src/golang/cmd/server/server/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,18 @@ func (s *AqServer) Handlers() map[string]handler.Handler {
routes.IntegrationsWorkflowsRoute: &v2.IntegrationsWorkflowsGetHandler{
Database: s.Database,
IntegrationRepo: s.IntegrationRepo,
OperatorRepo: s.OperatorRepo,
WorkflowRepo: s.WorkflowRepo,
DAGRepo: s.DAGRepo,
DAGResultRepo: s.DAGResultRepo,
OperatorRepo: s.OperatorRepo,
},
routes.IntegrationWorkflowsRoute: &v2.IntegrationWorkflowsGetHandler{
Database: s.Database,
IntegrationRepo: s.IntegrationRepo,
OperatorRepo: s.OperatorRepo,
WorkflowRepo: s.WorkflowRepo,
DAGRepo: s.DAGRepo,
DAGResultRepo: s.DAGResultRepo,
OperatorRepo: s.OperatorRepo,
},

// V1 Handlers
Expand Down
33 changes: 33 additions & 0 deletions src/golang/lib/lib_utils/notification.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package lib_utils

import (
"github.com/aqueducthq/aqueduct/lib/errors"
"github.com/aqueducthq/aqueduct/lib/models"
"github.com/aqueducthq/aqueduct/lib/models/shared"
)

const (
notificationEnabledKey = "enabled"
notificationLevelKey = "level"
)

// Returning a nil level means that `disabled` == true.
func ExtractNotificationLevel(integrationObject *models.Integration) (*shared.NotificationLevel, error) {
enabledStr, ok := integrationObject.Config[notificationEnabledKey]
if !ok {
return nil, errors.Newf("Notification %s is missing 'enabled' key.", integrationObject.Name)
}
if enabledStr == "false" {
return nil, nil
}

levelStr, ok := integrationObject.Config[notificationLevelKey]
if !ok {
return nil, errors.Newf("Notification %s is enabled but missing 'level' key.", integrationObject.Name)
}
level, err := shared.StrToNotificationLevel(levelStr)
if err != nil {
return nil, err
}
return &level, nil
}
13 changes: 13 additions & 0 deletions src/golang/lib/models/shared/notification.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"database/sql/driver"

"github.com/aqueducthq/aqueduct/lib/models/utils"
"github.com/dropbox/godropbox/errors"
"github.com/google/uuid"
)

Expand All @@ -13,10 +14,22 @@ const (
SuccessNotificationLevel NotificationLevel = "success"
WarningNotificationLevel NotificationLevel = "warning"
ErrorNotificationLevel NotificationLevel = "error"

// This is only for Aqueduct bell notifications.
InfoNotificationLevel NotificationLevel = "info"
NeutralNotificationLevel NotificationLevel = "neutral"
)

func StrToNotificationLevel(levelStr string) (NotificationLevel, error) {
level := NotificationLevel(levelStr)
switch level {
case SuccessNotificationLevel, WarningNotificationLevel, ErrorNotificationLevel, InfoNotificationLevel, NeutralNotificationLevel:
return level, nil
default:
return "", errors.Newf("Unknown notification level: %v", level)
}
}

type NotificationStatus string

const (
Expand Down
4 changes: 4 additions & 0 deletions src/golang/lib/models/shared/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,10 @@ func IsComputeIntegration(service Service) bool {
return ok
}

func IsNotificationResource(service Service) bool {
return service == Email || service == Slack
}

// IsUserOnlyIntegration returns whether the specified service is only accessible by the user.
func IsUserOnlyIntegration(svc Service) bool {
userSpecific := []Service{GoogleSheets, Github}
Expand Down
Loading

0 comments on commit 86269ec

Please sign in to comment.