Skip to content

Commit

Permalink
Merge pull request #88 from atlanhq/APP-5188
Browse files Browse the repository at this point in the history
APP-5188 : Manage Workflows and Workflows Schedules | Support for Snowflake Miner with Abstract Package
  • Loading branch information
0xquark authored Feb 18, 2025
2 parents ea0eba2 + b3eebb9 commit 04e8e32
Show file tree
Hide file tree
Showing 9 changed files with 1,905 additions and 11 deletions.
98 changes: 98 additions & 0 deletions atlan/assets/abstract_package.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package assets

import (
"encoding/json"
"time"

"github.com/atlanhq/atlan-go/atlan/model/structs"
)

// AbstractPackage represents a base package
type AbstractPackage struct {
Parameters []structs.NameValuePair
CredentialsBody map[string]interface{}
PackageName string
PackagePrefix string
}

// NewAbstractPackage initializes an abstract package
func NewAbstractPackage(packageName, packagePrefix string) *AbstractPackage {
return &AbstractPackage{
PackageName: packageName,
PackagePrefix: packagePrefix,
Parameters: []structs.NameValuePair{},
CredentialsBody: map[string]interface{}{},
}
}

func (p *AbstractPackage) ToWorkflow() *structs.Workflow {
metadata := p.GetMetadata()

spec := structs.WorkflowSpec{
Entrypoint: structs.StringPtr("main"),
Templates: []structs.WorkflowTemplate{
{
Name: "main",
DAG: structs.WorkflowDAG{
Tasks: []structs.WorkflowTask{
{
Name: "run",
Arguments: structs.WorkflowParameters{
Parameters: p.Parameters,
},
TemplateRef: structs.WorkflowTemplateRef{
Name: p.PackagePrefix,
Template: "main",
ClusterScope: true,
},
},
},
},
},
},
WorkflowMetadata: metadata,
}

var payload []structs.PackageParameter
if len(p.CredentialsBody) > 0 {
credJSON, _ := json.Marshal(p.CredentialsBody)
payload = append(payload, structs.PackageParameter{
Parameter: "credentialGuid",
Type: "credential",
Body: credJSON,
})
}

return &structs.Workflow{
Metadata: metadata,
Spec: &spec,
Payload: payload,
}
}

// GetMetadata should be implemented by subclasses
func (p *AbstractPackage) GetMetadata() *structs.WorkflowMetadata {
// Default (empty) metadata implementation, to be overridden by child structs
return &structs.WorkflowMetadata{}
}

// AbstractMiner represents a base miner package
type AbstractMiner struct {
*AbstractPackage
Epoch int64
}

// NewAbstractMiner initializes an abstract miner
func NewAbstractMiner(connectionQualifiedName, packageName, packagePrefix string) *AbstractMiner {
epoch := time.Now().Unix()
packageInstance := NewAbstractPackage(packageName, packagePrefix)
packageInstance.Parameters = append(packageInstance.Parameters, structs.NameValuePair{
Name: "connection-qualified-name",
Value: connectionQualifiedName,
})

return &AbstractMiner{
AbstractPackage: packageInstance,
Epoch: epoch,
}
}
127 changes: 127 additions & 0 deletions atlan/assets/asset.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,33 @@ type PurposeFields struct {
PURPOSE_CLASSIFICATIONS *KeywordField
}

type WorkflowFields struct {
AssetFields
WORKFLOW_TEMPLATE_GUID *KeywordField
WORKFLOW_TYPE *KeywordField
WORKFLOW_CONFIG *TextField
WORKFLOW_STATUS *KeywordField
WORKFLOW_RUN_EXPIRES_IN *TextField
WORKFLOW_CREATED_BY *KeywordField
WORKFLOW_UPDATED_BY *KeywordField
WORKFLOW_DELETED_AT *NumericField
}

// WorkflowRunFields represents the fields for a workflow run.
type WorkflowRunFields struct {
AssetFields
WORKFLOW_RUN_WORKFLOW_GUID *KeywordField
WORKFLOW_RUN_TYPE *KeywordField
WORKFLOW_RUN_ON_ASSET_GUID *KeywordField
WORKFLOW_RUN_COMMENT *TextField
WORKFLOW_RUN_CONFIG *TextField
WORKFLOW_RUN_STATUS *KeywordField
WORKFLOW_RUN_EXPIRES_AT *NumericField
WORKFLOW_RUN_CREATED_BY *KeywordField
WORKFLOW_RUN_UPDATED_BY *KeywordField
WORKFLOW_RUN_DELETED_AT *NumericField
}

// NewSearchTable returns a new AtlasTable object for Searching
func NewSearchTable() *AtlasTableFields {
return &AtlasTableFields{
Expand Down Expand Up @@ -1022,6 +1049,106 @@ func NewPurposeFields() *PurposeFields {
}
}

// NewWorkflowFields initializes and returns a WorkflowFields struct.
func NewWorkflowFields() *WorkflowFields {
return &WorkflowFields{
AssetFields: AssetFields{
AttributesFields: AttributesFields{
TYPENAME: NewKeywordTextField("typeName", "__typeName.keyword", "__typeName"),
GUID: NewKeywordField("guid", "__guid"),
CREATED_BY: NewKeywordField("createdBy", "__createdBy"),
UPDATED_BY: NewKeywordField("updatedBy", "__modifiedBy"),
STATUS: NewKeywordField("status", "__state"),
ATLAN_TAGS: NewKeywordTextField("classificationNames", "__traitNames", "__classificationsText"),
PROPOGATED_ATLAN_TAGS: NewKeywordTextField("classificationNames", "__propagatedTraitNames", "__classificationsText"),
ASSIGNED_TERMS: NewKeywordTextField("meanings", "__meanings", "__meaningsText"),
SUPERTYPE_NAMES: NewKeywordTextField("typeName", "__superTypeNames.keyword", "__superTypeNames"),
CREATE_TIME: NewNumericField("createTime", "__timestamp"),
UPDATE_TIME: NewNumericField("updateTime", "__modificationTimestamp"),
QUALIFIED_NAME: NewKeywordTextField("qualifiedName", "qualifiedName", "qualifiedName.text"),
},
NAME: NewKeywordTextStemmedField("name", "name.keyword", "name", "name"),
DISPLAY_NAME: NewKeywordTextField("displayName", "displayName.keyword", "displayName"),
DESCRIPTION: NewKeywordTextField("description", "description", "description.text"),
USER_DESCRIPTION: NewKeywordTextField("userDescription", "userDescription", "userDescription.text"),
TENET_ID: NewKeywordField("tenetId", "tenetId"),
CERTIFICATE_STATUS: NewKeywordTextField("certificateStatus", "certificateStatus", "certificateStatus.text"),
CERTIFICATE_STATUS_MESSAGE: NewKeywordField("certificateStatusMessage", "certificateStatusMessage"),
CERTIFICATE_UPDATED_BY: NewNumericField("certificateUpdatedBy", "certificateUpdatedBy"),
ANNOUNCEMENT_TITLE: NewKeywordField("announcementTitle", "announcementTitle"),
ANNOUNCEMENT_MESSAGE: NewKeywordTextField("announcementMessage", "announcementMessage", "announcementMessage.text"),
ANNOUNCEMENT_TYPE: NewKeywordField("announcementType", "announcementType"),
ANNOUNCEMENT_UPDATED_AT: NewNumericField("announcementUpdatedAt", "announcementUpdatedAt"),
ANNOUNCEMENT_UPDATED_BY: NewKeywordField("announcementUpdatedBy", "announcementUpdatedBy"),
OWNER_USERS: NewKeywordTextField("ownerUsers", "ownerUsers", "ownerUsers.text"),
ADMIN_USERS: NewKeywordField("adminUsers", "adminUsers"),
VIEWER_USERS: NewKeywordField("viewerUsers", "viewerUsers"),
VIEWER_GROUPS: NewKeywordField("viewerGroups", "viewerGroups"),
CONNECTOR_NAME: NewKeywordTextField("connectorName", "connectorName", "connectorName.text"),
CONNECTION_QUALIFIED_NAME: NewKeywordTextField("connectionQualifiedName", "connectionQualifiedName", "connectionQualifiedName.text"),
},
WORKFLOW_TEMPLATE_GUID: NewKeywordField("workflowTemplateGuid", "workflowTemplateGuid"),
WORKFLOW_TYPE: NewKeywordField("workflowType", "workflowType"),
WORKFLOW_CONFIG: NewTextField("workflowConfig", "workflowConfig"),
WORKFLOW_STATUS: NewKeywordField("workflowStatus", "workflowStatus"),
WORKFLOW_RUN_EXPIRES_IN: NewTextField("workflowRunExpiresIn", "workflowRunExpiresIn"),
WORKFLOW_CREATED_BY: NewKeywordField("workflowCreatedBy", "workflowCreatedBy"),
WORKFLOW_UPDATED_BY: NewKeywordField("workflowUpdatedBy", "workflowUpdatedBy"),
WORKFLOW_DELETED_AT: NewNumericField("workflowDeletedAt", "workflowDeletedAt"),
}
}

// NewWorkflowRunFields initializes and returns a WorkflowRunFields struct.
func NewWorkflowRunFields() *WorkflowRunFields {
return &WorkflowRunFields{
AssetFields: AssetFields{
AttributesFields: AttributesFields{
TYPENAME: NewKeywordTextField("typeName", "__typeName.keyword", "__typeName"),
GUID: NewKeywordField("guid", "__guid"),
CREATED_BY: NewKeywordField("createdBy", "__createdBy"),
UPDATED_BY: NewKeywordField("updatedBy", "__modifiedBy"),
STATUS: NewKeywordField("status", "__state"),
ATLAN_TAGS: NewKeywordTextField("classificationNames", "__traitNames", "__classificationsText"),
PROPOGATED_ATLAN_TAGS: NewKeywordTextField("classificationNames", "__propagatedTraitNames", "__classificationsText"),
ASSIGNED_TERMS: NewKeywordTextField("meanings", "__meanings", "__meaningsText"),
SUPERTYPE_NAMES: NewKeywordTextField("typeName", "__superTypeNames.keyword", "__superTypeNames"),
CREATE_TIME: NewNumericField("createTime", "__timestamp"),
UPDATE_TIME: NewNumericField("updateTime", "__modificationTimestamp"),
QUALIFIED_NAME: NewKeywordTextField("qualifiedName", "qualifiedName", "qualifiedName.text"),
},
NAME: NewKeywordTextStemmedField("name", "name.keyword", "name", "name"),
DISPLAY_NAME: NewKeywordTextField("displayName", "displayName.keyword", "displayName"),
DESCRIPTION: NewKeywordTextField("description", "description", "description.text"),
USER_DESCRIPTION: NewKeywordTextField("userDescription", "userDescription", "userDescription.text"),
TENET_ID: NewKeywordField("tenetId", "tenetId"),
CERTIFICATE_STATUS: NewKeywordTextField("certificateStatus", "certificateStatus", "certificateStatus.text"),
CERTIFICATE_STATUS_MESSAGE: NewKeywordField("certificateStatusMessage", "certificateStatusMessage"),
CERTIFICATE_UPDATED_BY: NewNumericField("certificateUpdatedBy", "certificateUpdatedBy"),
ANNOUNCEMENT_TITLE: NewKeywordField("announcementTitle", "announcementTitle"),
ANNOUNCEMENT_MESSAGE: NewKeywordTextField("announcementMessage", "announcementMessage", "announcementMessage.text"),
ANNOUNCEMENT_TYPE: NewKeywordField("announcementType", "announcementType"),
ANNOUNCEMENT_UPDATED_AT: NewNumericField("announcementUpdatedAt", "announcementUpdatedAt"),
ANNOUNCEMENT_UPDATED_BY: NewKeywordField("announcementUpdatedBy", "announcementUpdatedBy"),
OWNER_USERS: NewKeywordTextField("ownerUsers", "ownerUsers", "ownerUsers.text"),
ADMIN_USERS: NewKeywordField("adminUsers", "adminUsers"),
VIEWER_USERS: NewKeywordField("viewerUsers", "viewerUsers"),
VIEWER_GROUPS: NewKeywordField("viewerGroups", "viewerGroups"),
CONNECTOR_NAME: NewKeywordTextField("connectorName", "connectorName", "connectorName.text"),
CONNECTION_QUALIFIED_NAME: NewKeywordTextField("connectionQualifiedName", "connectionQualifiedName", "connectionQualifiedName.text"),
},
WORKFLOW_RUN_WORKFLOW_GUID: NewKeywordField("workflowRunWorkflowGuid", "workflowRunWorkflowGuid"),
WORKFLOW_RUN_TYPE: NewKeywordField("workflowRunType", "workflowRunType"),
WORKFLOW_RUN_ON_ASSET_GUID: NewKeywordField("workflowRunOnAssetGuid", "workflowRunOnAssetGuid"),
WORKFLOW_RUN_COMMENT: NewTextField("workflowRunComment", "workflowRunComment"),
WORKFLOW_RUN_CONFIG: NewTextField("workflowRunConfig", "workflowRunConfig"),
WORKFLOW_RUN_STATUS: NewKeywordField("workflowRunStatus", "workflowRunStatus"),
WORKFLOW_RUN_EXPIRES_AT: NewNumericField("workflowRunExpiresAt", "workflowRunExpiresAt"),
WORKFLOW_RUN_CREATED_BY: NewKeywordField("workflowRunCreatedBy", "workflowRunCreatedBy"),
WORKFLOW_RUN_UPDATED_BY: NewKeywordField("workflowRunUpdatedBy", "workflowRunUpdatedBy"),
WORKFLOW_RUN_DELETED_AT: NewNumericField("workflowRunDeletedAt", "workflowRunDeletedAt"),
}
}

// Methods on assets

// GetbyGuid retrieves an asset by guid
Expand Down
19 changes: 10 additions & 9 deletions atlan/assets/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,16 @@ import (

// AtlanClient defines the Atlan API client structure.
type AtlanClient struct {
Session *http.Client
host string
ApiKey string
requestParams map[string]interface{}
logger logger.Logger
RoleClient *RoleClient
GroupClient *GroupClient
UserClient *UserClient
TokenClient *TokenClient
Session *http.Client
host string
ApiKey string
requestParams map[string]interface{}
logger logger.Logger
RoleClient *RoleClient
GroupClient *GroupClient
UserClient *UserClient
TokenClient *TokenClient
WorkflowClient *WorkflowClient
SearchAssets
}

Expand Down
108 changes: 108 additions & 0 deletions atlan/assets/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,17 @@ const (

// Tokens API
TOKENS_API = "apikeys"

// Workflows API
WORKFLOW_API = "workflows"
WORKFLOW_INDEX_API = "workflows/indexsearch"
WORKFLOW_INDEX_RUN_API = "runs/indexsearch"
SCHEDULE_QUERY_WORKFLOWS_SEARCH_API = "runs/cron/scheduleQueriesBetweenDuration"
SCHEDULE_QUERY_WORKFLOWS_MISSED_API = "runs/cron/missedScheduleQueriesBetweenDuration"
WORKFLOW_OWNER_RERUN_API = "workflows/triggerAsOwner"
WORKFLOW_RERUN_API = "workflows/submit"
WORKFLOW_RUN_API = "workflows?submit=true"
WORKFLOW_SCHEDULE_RUN = "runs"
)

// API defines the structure of an API call.
Expand Down Expand Up @@ -319,6 +330,103 @@ var (
Status: http.StatusOK,
Endpoint: HeraclesEndpoint,
}

// Workflows

SCHEDULE_QUERY_WORKFLOWS_SEARCH = API{
Path: SCHEDULE_QUERY_WORKFLOWS_SEARCH_API,
Method: http.MethodGet,
Status: http.StatusOK,
Endpoint: HeraclesEndpoint,
}

SCHEDULE_QUERY_WORKFLOWS_MISSED = API{
Path: SCHEDULE_QUERY_WORKFLOWS_MISSED_API,
Method: http.MethodGet,
Status: http.StatusOK,
Endpoint: HeraclesEndpoint,
}

WORKFLOW_INDEX_SEARCH = API{
Path: WORKFLOW_INDEX_API,
Method: http.MethodPost,
Status: http.StatusOK,
Endpoint: HeraclesEndpoint,
}

WORKFLOW_INDEX_RUN_SEARCH = API{
Path: WORKFLOW_INDEX_RUN_API,
Method: http.MethodPost,
Status: http.StatusOK,
Endpoint: HeraclesEndpoint,
}

// triggers a workflow using the current user's credentials

WORKFLOW_RERUN = API{
Path: WORKFLOW_RUN_API,
Method: http.MethodPost,
Status: http.StatusOK,
Endpoint: HeraclesEndpoint,
}

// triggers a workflow using the workflow owner's credentials

WORKFLOW_OWNER_RERUN = API{
Path: WORKFLOW_OWNER_RERUN_API,
Method: http.MethodPost,
Status: http.StatusOK,
Endpoint: HeraclesEndpoint,
}

WORKFLOW_UPDATE = API{
Path: WORKFLOW_API + "/%s",
Method: http.MethodPost,
Status: http.StatusOK,
Endpoint: HeraclesEndpoint,
}

WORKFLOW_ARCHIVE = API{
Path: WORKFLOW_API + "/%s/archive",
Method: http.MethodPost,
Status: http.StatusOK,
Endpoint: HeraclesEndpoint,
}

GET_ALL_SCHEDULE_RUNS = API{
Path: WORKFLOW_SCHEDULE_RUN + "/cron",
Method: http.MethodGet,
Status: http.StatusOK,
Endpoint: HeraclesEndpoint,
}

GET_SCHEDULE_RUN = API{
Path: WORKFLOW_SCHEDULE_RUN + "/cron/%s",
Method: http.MethodGet,
Status: http.StatusOK,
Endpoint: HeraclesEndpoint,
}

STOP_WORKFLOW_RUN = API{
Path: WORKFLOW_SCHEDULE_RUN + "/%s/stop",
Method: http.MethodPost,
Status: http.StatusOK,
Endpoint: HeraclesEndpoint,
}

WORKFLOW_CHANGE_OWNER = API{
Path: WORKFLOW_API + "/%s/changeownership",
Method: http.MethodPost,
Status: http.StatusOK,
Endpoint: HeraclesEndpoint,
}

WORKFLOW_RUN = API{
Path: WORKFLOW_RUN_API,
Method: http.MethodPost,
Status: http.StatusOK,
Endpoint: HeraclesEndpoint,
}
)

// Constants for the Atlas search DSL
Expand Down
Loading

0 comments on commit 04e8e32

Please sign in to comment.