Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ linters:
- shadow
gocyclo:
min-complexity: 15
revive:
rules:
- name: var-naming
disabled: true
- name: exported
disabled: true

exclusions:
paths:
Expand All @@ -44,4 +50,4 @@ run:

issues:
max-issues-per-linter: 0
max-same-issues: 0
max-same-issues: 0
99 changes: 87 additions & 12 deletions docs/workflow_sdk.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,20 +52,49 @@ workflowInput := map[string]interface{}{}
workflowRun, err := executor.ExecuteWorkflow(&model.StartWorkflowRequest{Name: wf.GetName(), Version: &version, Input: workflowInput}, "")
//workfowRun is a struct that contains the output of the workflow execution
type WorkflowRun struct {
CorrelationId string `json:"correlationId,omitempty"`
CreateTime int64 `json:"createTime,omitempty"`
CreatedBy string `json:"createdBy,omitempty"`
Input map[string]interface{} `json:"input,omitempty"`
Output map[string]interface{} `json:"output,omitempty"`
Priority int32 `json:"priority,omitempty"`
RequestId string `json:"requestId,omitempty"`
Status string `json:"status,omitempty"`
Tasks []Task `json:"tasks,omitempty"`
UpdateTime int64 `json:"updateTime,omitempty"`
Variables map[string]interface{} `json:"variables,omitempty"`
WorkflowId string `json:"workflowId,omitempty"`
CorrelationId string `json:"correlationId,omitempty"`
CreateTime int64 `json:"createTime,omitempty"`
CreatedBy string `json:"createdBy,omitempty"`
Input map[string]interface{} `json:"input,omitempty"`
Output map[string]interface{} `json:"output,omitempty"`
Priority int32 `json:"priority,omitempty"`
RequestId string `json:"requestId,omitempty"`
ResponseType ReturnStrategy `json:"responseType,omitempty"`
Status WorkflowStatus `json:"status,omitempty"`
TargetWorkflowId string `json:"targetWorkflowId,omitempty"`
TargetWorkflowStatus WorkflowStatus `json:"targetWorkflowStatus,omitempty"`
Tasks []Task `json:"tasks,omitempty"`
UpdateTime int64 `json:"updateTime,omitempty"`
Variables map[string]interface{} `json:"variables,omitempty"`
WorkflowId string `json:"workflowId,omitempty"`
}
```

#### WorkflowRun Helper Methods

`WorkflowRun` provides helper methods to check workflow status and manage tasks:

**Status Check Methods:**
```go
// Check workflow status
workflowRun.IsRunning() // Returns true if status is RUNNING
workflowRun.IsPaused() // Returns true if status is PAUSED
workflowRun.IsFailed() // Returns true if status is FAILED
workflowRun.IsCompleted() // Returns true if status is COMPLETED
workflowRun.IsTimedOut() // Returns true if status is TIMED_OUT
workflowRun.IsTerminated() // Returns true if status is TERMINATED
```

**Task Management Methods:**
```go
// Get tasks by status
failedTasks := workflowRun.GetFailedTasks() // Returns tasks with FAILED or FAILED_WITH_TERMINAL_ERROR status
completedTasks := workflowRun.GetCompletedTasks() // Returns tasks with COMPLETED status
tasks := workflowRun.GetTasksByStatus(model.FailedTask, model.CompletedTask) // Returns tasks with specified statuses

// Get specific task
task := workflowRun.GetTaskByReferenceName("task_ref_name") // Returns task by reference name, nil if not found
```
**Note:** Synchronous workflow execution is useful for workflows that complete in few seconds at max. For longer running workflows use `StartWorkflow` and use the Id of the workflow to monitor the output.

#### Using struct instance as workflow input
Expand All @@ -82,7 +111,53 @@ workflowId, err := executor.StartWorkflow(&model.StartWorkflowRequest{
Address: []string{"street", "city", "zip"},
},
})

// Get workflow execution details
workflow, err := executor.GetWorkflowWithContext(context.Background(), workflowId, true)
if err != nil {
// Handle error
}
```

**Monitor Workflow Execution:**
```go
// Start monitoring workflow execution
executionChannel, err := executor.MonitorExecution(workflowId)
if err != nil {
// Handle error
}

// Wait for completion
workflow, err := executor.WaitForWorkflowCompletionUntilTimeout(executionChannel, 60*time.Second)
```

#### Workflow Helper Methods

`Workflow` provides additional helper methods for status and task management:

**Status Check Methods:**
```go
workflow.IsRunning() // Returns true if status is RUNNING
workflow.IsPaused() // Returns true if status is PAUSED
workflow.IsFailed() // Returns true if status is FAILED
workflow.IsCompleted() // Returns true if status is COMPLETED
workflow.IsTimedOut() // Returns true if status is TIMED_OUT
workflow.IsTerminated() // Returns true if status is TERMINATED
```

**Task Management Methods:**
```go
// Get tasks by status
inProgressTasks := workflow.GetInProgressTasks() // Returns tasks with IN_PROGRESS status
failedTasks := workflow.GetFailedTasks() // Returns tasks with FAILED or FAILED_WITH_TERMINAL_ERROR status
completedTasks := workflow.GetCompletedTasks() // Returns tasks with COMPLETED status
scheduledTasks := workflow.GetScheduledTasks() // Returns tasks with SCHEDULED status
tasks := workflow.GetTasksByStatus(model.FailedTask, model.CompletedTask) // Returns tasks with specified statuses

// Get specific task
task := workflow.GetTaskByReferenceName("task_ref_name") // Returns task by reference name, nil if not found
```

### Workflow Management APIs
Take a look at the [API Docs](https://pkg.go.dev/github.com/conductor-sdk/conductor-go/sdk/workflow/executor) fore more details on how to start, pause, resume, terminate, search and get workflow execution status.

Expand Down
19 changes: 0 additions & 19 deletions sdk/model/model_run.go

This file was deleted.

4 changes: 4 additions & 0 deletions sdk/model/task_result_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,7 @@ const (
CompletedTask TaskResultStatus = "COMPLETED"
ScheduledTask TaskResultStatus = "SCHEDULED"
)

func (t TaskResultStatus) String() string {
return string(t)
}
84 changes: 84 additions & 0 deletions sdk/model/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,87 @@ type Workflow struct {
WorkflowName string `json:"workflowName,omitempty"`
WorkflowVersion int32 `json:"workflowVersion,omitempty"`
}

// IsRunning returns true if the workflow is currently running.
// Status is RUNNING
func (w *Workflow) IsRunning() bool {
return w.Status == RunningWorkflow
}

// IsPaused returns true if the workflow is currently paused.
// Status is PAUSED
func (w *Workflow) IsPaused() bool {
return w.Status == PausedWorkflow
}

// IsFailed returns true if the workflow has failed.
// Status is FAILED
func (w *Workflow) IsFailed() bool {
return w.Status == FailedWorkflow
}

// IsCompleted returns true if the workflow has completed successfully.
// Status is COMPLETED
func (w *Workflow) IsCompleted() bool {
return w.Status == CompletedWorkflow
}

// IsTimedOut returns true if the workflow has timed out.
// Status is TIMED_OUT
func (w *Workflow) IsTimedOut() bool {
return w.Status == TimedOutWorkflow
}

// IsTerminated returns true if the workflow has been terminated.
// Status is TERMINATED
func (w *Workflow) IsTerminated() bool {
return w.Status == TerminatedWorkflow
}

// GetInProgressTasks returns all tasks that are currently in progress.
// Status is IN_PROGRESS
func (w *Workflow) GetInProgressTasks() []Task {
return w.GetTasksByStatus(InProgressTask)
}

// GetFailedTasks returns all tasks that have failed.
// Status is FAILED or FAILED_WITH_TERMINAL_ERROR
func (w *Workflow) GetFailedTasks() []Task {
return w.GetTasksByStatus(FailedTask, FailedWithTerminalErrorTask)
}

// GetCompletedTasks returns all tasks that have completed successfully.
// Status is COMPLETED
func (w *Workflow) GetCompletedTasks() []Task {
return w.GetTasksByStatus(CompletedTask)
}

// GetScheduledTasks returns all tasks that are scheduled but not yet started.
// Status is SCHEDULED
func (w *Workflow) GetScheduledTasks() []Task {
return w.GetTasksByStatus(ScheduledTask)
}

// GetTasksByStatus returns all tasks with the specified status(es).
func (w *Workflow) GetTasksByStatus(statuses ...TaskResultStatus) []Task {
var filteredTasks []Task
for _, task := range w.Tasks {
for _, status := range statuses {
if task.Status == status {
filteredTasks = append(filteredTasks, task)
break
}
}
}
return filteredTasks
}

// GetTaskByReferenceName returns the task with the specified reference name.
func (w *Workflow) GetTaskByReferenceName(referenceTaskName string) *Task {
for _, task := range w.Tasks {
if task.ReferenceTaskName == referenceTaskName {
return &task
}
}
return nil
}
91 changes: 91 additions & 0 deletions sdk/model/workflow_run.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package model

type WorkflowRun struct {
CorrelationId string `json:"correlationId,omitempty"`
CreateTime int64 `json:"createTime,omitempty"`
CreatedBy string `json:"createdBy,omitempty"`
Input map[string]interface{} `json:"input,omitempty"`
Output map[string]interface{} `json:"output,omitempty"`
Priority int32 `json:"priority,omitempty"`
RequestId string `json:"requestId,omitempty"`
ResponseType ReturnStrategy `json:"responseType,omitempty"`
Status WorkflowStatus `json:"status,omitempty"`
TargetWorkflowId string `json:"targetWorkflowId,omitempty"`
TargetWorkflowStatus WorkflowStatus `json:"targetWorkflowStatus,omitempty"`
Tasks []Task `json:"tasks,omitempty"`
UpdateTime int64 `json:"updateTime,omitempty"`
Variables map[string]interface{} `json:"variables,omitempty"`
WorkflowId string `json:"workflowId,omitempty"`
}

// IsRunning returns true if the workflow is currently running.
// Status is RUNNING
func (w *WorkflowRun) IsRunning() bool {
return w.Status == RunningWorkflow
}

// IsPaused returns true if the workflow is currently paused.
// Status is PAUSED
func (w *WorkflowRun) IsPaused() bool {
return w.Status == PausedWorkflow
}

// IsFailed returns true if the workflow has failed.
// Status is FAILED
func (w *WorkflowRun) IsFailed() bool {
return w.Status == FailedWorkflow
}

// IsCompleted returns true if the workflow has completed successfully.
// Status is COMPLETED
func (w *WorkflowRun) IsCompleted() bool {
return w.Status == CompletedWorkflow
}

// IsTimedOut returns true if the workflow has timed out.
// Status is TIMED_OUT
func (w *WorkflowRun) IsTimedOut() bool {
return w.Status == TimedOutWorkflow
}

// IsTerminated returns true if the workflow has been terminated.
// Status is TERMINATED
func (w *WorkflowRun) IsTerminated() bool {
return w.Status == TerminatedWorkflow
}

// GetFailedTasks returns all tasks that have failed.
// Status is FAILED or FAILED_WITH_TERMINAL_ERROR
func (w *WorkflowRun) GetFailedTasks() []Task {
return w.GetTasksByStatus(FailedTask, FailedWithTerminalErrorTask)
}

// GetCompletedTasks returns all tasks that have completed successfully.
// Status is COMPLETED
func (w *WorkflowRun) GetCompletedTasks() []Task {
return w.GetTasksByStatus(CompletedTask)
}

// GetTasksByStatus returns all tasks with the specified status(es).
func (w *WorkflowRun) GetTasksByStatus(statuses ...TaskResultStatus) []Task {
var filteredTasks []Task
for _, task := range w.Tasks {
for _, status := range statuses {
if task.Status == status {
filteredTasks = append(filteredTasks, task)
break
}
}
}
return filteredTasks
}

// GetTaskByReferenceName returns the task with the specified reference name.
func (w *WorkflowRun) GetTaskByReferenceName(referenceTaskName string) *Task {
for _, task := range w.Tasks {
if task.ReferenceTaskName == referenceTaskName {
return &task
}
}
return nil
}
Loading