Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -197,10 +197,12 @@ public class DemoWorkflowActivity implements WorkflowActivity {

<!--go-->

### Define workflow activities

Define each workflow activity you'd like your workflow to perform. The Activity input can be unmarshalled from the context with `ctx.GetInput`. Activities should be defined as taking a `ctx workflow.ActivityContext` parameter and returning an interface and error.

```go
func TestActivity(ctx workflow.ActivityContext) (any, error) {
func BusinessActivity(ctx workflow.ActivityContext) (any, error) {
var input int
if err := ctx.GetInput(&input); err != nil {
return "", err
Expand All @@ -211,6 +213,87 @@ func TestActivity(ctx workflow.ActivityContext) (any, error) {
}
```

### Define the workflow

Define your workflow function with the parameter `ctx *workflow.WorkflowContext` and return any and error. Invoke your defined activities from within your workflow.

```go
func BusinessWorkflow(ctx *workflow.WorkflowContext) (any, error) {
var input int
if err := ctx.GetInput(&input); err != nil {
return nil, err
}
var output string
if err := ctx.CallActivity(BusinessActivity, workflow.ActivityInput(input)).Await(&output); err != nil {
return nil, err
}
if err := ctx.WaitForExternalEvent("businessEvent", time.Second*60).Await(&output); err != nil {
return nil, err
}

if err := ctx.CreateTimer(time.Second).Await(nil); err != nil {
return nil, nil
}
return output, nil
}
```

### Register workflows and activities

Before your application can execute workflows, you must register both the workflow orchestrator and its activities with a workflow registry. This ensures Dapr knows which functions to call when executing your workflow.

```go
func main() {
// Create a workflow registry
r := workflow.NewRegistry()

// Register the workflow orchestrator
if err := r.AddWorkflow(BusinessWorkflow); err != nil {
log.Fatal(err)
}
fmt.Println("BusinessWorkflow registered")

// Register the workflow activities
if err := r.AddActivity(BusinessActivity); err != nil {
log.Fatal(err)
}
fmt.Println("BusinessActivity registered")

// Create workflow client and start worker
wclient, err := client.NewWorkflowClient()
if err != nil {
log.Fatal(err)
}
fmt.Println("Worker initialized")

ctx, cancel := context.WithCancel(context.Background())
if err = wclient.StartWorker(ctx, r); err != nil {
log.Fatal(err)
}
fmt.Println("runner started")

// Your application logic continues here...
// Example: Start a workflow
instanceID, err := wclient.ScheduleWorkflow(ctx, "BusinessWorkflow", workflow.WithInput(1))
if err != nil {
log.Fatalf("failed to start workflow: %v", err)
}
fmt.Printf("workflow started with id: %v\n", instanceID)

// Stop workflow worker when done
cancel()
fmt.Println("workflow worker successfully shutdown")
}
```

**Key points about registration:**
- Use `workflow.NewRegistry()` to create a workflow registry
- Use `r.AddWorkflow()` to register workflow functions
- Use `r.AddActivity()` to register activity functions
- Use `client.NewWorkflowClient()` to create a workflow client
- Call `wclient.StartWorker()` to begin processing workflows
- Use `wclient.ScheduleWorkflow` to schedule a named instance of a workflow

[See the Go SDK workflow activity example in context.](https://github.com/dapr/go-sdk/tree/main/examples/workflow/README.md)

{{% /tab %}}
Expand Down Expand Up @@ -383,16 +466,16 @@ public class DemoWorkflowWorker {
Define your workflow function with the parameter `ctx *workflow.WorkflowContext` and return any and error. Invoke your defined activities from within your workflow.

```go
func TestWorkflow(ctx *workflow.WorkflowContext) (any, error) {
func BusinessWorkflow(ctx *workflow.WorkflowContext) (any, error) {
var input int
if err := ctx.GetInput(&input); err != nil {
return nil, err
}
var output string
if err := ctx.CallActivity(TestActivity, workflow.ActivityInput(input)).Await(&output); err != nil {
if err := ctx.CallActivity(BusinessActivity, workflow.ActivityInput(input)).Await(&output); err != nil {
return nil, err
}
if err := ctx.WaitForExternalEvent("testEvent", time.Second*60).Await(&output); err != nil {
if err := ctx.WaitForExternalEvent("businessEvent", time.Second*60).Await(&output); err != nil {
return nil, err
}

Expand Down Expand Up @@ -864,7 +947,7 @@ public class DemoWorkflow extends Workflow {
[As in the following example](https://github.com/dapr/go-sdk/tree/main/examples/workflow/README.md), a hello-world application using the Go SDK and Dapr Workflow would include:

- A Go package called `client` to receive the Go SDK client capabilities.
- The `TestWorkflow` method
- The `BusinessWorkflow` method
- Creating the workflow with input and output.
- API calls. In the example below, these calls start and call the workflow activities.

Expand All @@ -889,15 +972,15 @@ var failActivityTries = 0
func main() {
r := workflow.NewRegistry()

if err := r.AddWorkflow(TestWorkflow); err != nil {
if err := r.AddWorkflow(BusinessWorkflow); err != nil {
log.Fatal(err)
}
fmt.Println("TestWorkflow registered")
fmt.Println("BusinessWorkflow registered")

if err := r.AddActivity(TestActivity); err != nil {
if err := r.AddActivity(BusinessActivity); err != nil {
log.Fatal(err)
}
fmt.Println("TestActivity registered")
fmt.Println("BusinessActivity registered")

if err := r.AddActivity(FailActivity); err != nil {
log.Fatal(err)
Expand All @@ -921,7 +1004,7 @@ func main() {
// "start". This is useful for increasing the throughput of creating
// workflows.
// workflow.WithStartTime(time.Now())
instanceID, err := wclient.ScheduleWorkflow(ctx, "TestWorkflow", workflow.WithInstanceID("a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9"), workflow.WithInput(1))
instanceID, err := wclient.ScheduleWorkflow(ctx, "BusinessWorkflow", workflow.WithInstanceID("a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9"), workflow.WithInput(1))
if err != nil {
log.Fatalf("failed to start workflow: %v", err)
}
Expand Down Expand Up @@ -963,9 +1046,8 @@ func main() {

fmt.Printf("stage: %d\n", stage)

// Raise Event Test

err = wclient.RaiseEvent(ctx, instanceID, "testEvent", workflow.WithEventPayload("testData"))
// Raise Event
err = wclient.RaiseEvent(ctx, instanceID, "businessEvent", workflow.WithEventPayload("testData"))
if err != nil {
fmt.Printf("failed to raise event: %v", err)
}
Expand Down Expand Up @@ -1008,7 +1090,7 @@ func main() {
fmt.Printf("stage: %d\n", stage)

// Terminate workflow test
id, err := wclient.ScheduleWorkflow(ctx, "TestWorkflow", workflow.WithInstanceID("a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9"), workflow.WithInput(1))
id, err := wclient.ScheduleWorkflow(ctx, "BusinessWorkflow", workflow.WithInstanceID("a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9"), workflow.WithInput(1))
if err != nil {
log.Fatalf("failed to start workflow: %v", err)
}
Expand Down Expand Up @@ -1037,22 +1119,22 @@ func main() {
fmt.Println("workflow worker successfully shutdown")
}

func TestWorkflow(ctx *workflow.WorkflowContext) (any, error) {
func BusinessWorkflow(ctx *workflow.WorkflowContext) (any, error) {
var input int
if err := ctx.GetInput(&input); err != nil {
return nil, err
}
var output string
if err := ctx.CallActivity(TestActivity, workflow.WithActivityInput(input)).Await(&output); err != nil {
if err := ctx.CallActivity(BusinessActivity, task.WithActivityInput(input)).Await(&output); err != nil {
return nil, err
}

err := ctx.WaitForExternalEvent("testEvent", time.Second*60).Await(&output)
err := ctx.WaitForSingleEvent("businessEvent", time.Second*60).Await(&output)
if err != nil {
return nil, err
}

if err := ctx.CallActivity(TestActivity, workflow.WithActivityInput(input)).Await(&output); err != nil {
if err := ctx.CallActivity(BusinessActivity, task.WithActivityInput(input)).Await(&output); err != nil {
return nil, err
}

Expand All @@ -1068,7 +1150,7 @@ func TestWorkflow(ctx *workflow.WorkflowContext) (any, error) {
return output, nil
}

func TestActivity(ctx workflow.ActivityContext) (any, error) {
func BusinessActivity(ctx task.ActivityContext) (any, error) {
var input int
if err := ctx.GetInput(&input); err != nil {
return "", err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,65 @@ Some scenarios where this is useful include:
- Implementation of a workflow spans different programming languages based on team expertise or existing codebases.
- Different team boundaries or microservice ownership.

<img src="/images/workflow-overview/workflow-multi-app-complex.png" width=800 alt="Diagram showing multi-application complex workflow">

The diagram below shows an example scenario of a complex workflow that orchestrates across multiple applications that are written in different languages. Each applications' main steps and activities are:

• **App1: Main Workflow Service** - Top-level orchestrator that coordinates the entire ML pipeline
- Starts the process
- Calls data processing activities on App2
- Calls ML training child workflow on App3
- Calls model deployment on App4
- Ends the complete workflow
- **Language: Java**

• **App2: Data Processing Pipeline** - **GPU activities** only
- Data Ingesting Activity (GPU-accelerated)
- Feature Engineering Activity (GPU-accelerated)
- Returns completion signal to Main Workflow
- **Language: Go**

• **App3: ML Training Child Workflow** - Contains a child workflow and activities
- Child workflow orchestrates:
- Data Processing Activity
- Model Training Activity (GPU-intensive)
- Model Validation Activity
- Triggered by App2's activities completing
- Returns completion signal to Main Workflow
- **Language: Java**

• **App4: Model Serving Service** - **Beefy GPU app** with activities only
- Model Loading Activity (GPU memory intensive)
- Inference Setup Activity (GPU-accelerated inference)
- Triggered by App3's workflow completing
- Returns completion signal to Main Workflow
- **Language: Go**

## Multi-application workflows

Like all building blocks in Dapr, workflow execution routing is based on the [App ID of the hosting Dapr application]({{% ref "security-concept.md#application-identity" %}}).
Workflow execution routing is based on the [App ID of the hosting Dapr application]({{% ref "security-concept.md#application-identity" %}}).
By default, the full workflow execution is hosted on the app ID that started the workflow. This workflow can be executed across any replicas of that app ID, not just the single replica which scheduled the workflow.


It is possible to execute activities or child workflows on different app IDs by specifying the target app ID parameter, inside the workflow execution code.
Upon execution, the target app ID will execute the activity or child workflow, and return the result to the parent workflow of the originating app ID.
It is possible to execute activities and child workflows on different app IDs by specifying the target app ID parameter, inside the workflow execution code.
Upon execution, the target app ID executes the activity or child workflow, and returns the result to the parent workflow of the originating app ID.

The entire Workflow execution may be distributed across multiple app IDs with no limit, with each activity or child workflow specifying the target app ID.
The final history of the workflow will be saved by the app ID that hosts the very parent (or can consider it the root) workflow.

{{% alert title="Restrictions" color="primary" %}}
Like other building blocks and resources in Dapr, workflows are scoped to a single namespace.
Like other API building blocks and resources in Dapr, workflows are scoped to a single namespace.
This means that all app IDs involved in a multi-application workflow must be in the same namespace.
Similarly, all app IDs must use the same actor state store.
Finally, the target app ID must have the activity or child workflow defined, otherwise the parent workflow will retry indefinitely.
Similarly, all app IDs must use the same workflow (or actor) state store.
Finally, the target app ID must have the activity or child workflow defined and registered, otherwise the parent workflow retries indefinitely.
{{% /alert %}}

{{% alert title="Important Limitations" color="warning" %}}
- **SDKs supporting multi-application workflows** - Multi-application workflows are used via the SDKs. Currently Java (activities calling) and Go (both activities and child workflows calling) SDKs are supported. The SDKs (Python, .NET, JavaScript) are planned for future releases.
**SDKs supporting multi-application workflows** - Multi-application workflows are used via the SDKs.
Currently the following are supported:
- **Java** (**only** activity calls)
- **Go** (**both** activities and child workflows calls)
- The Python, .NET, JavaScript SDKs support are planned for future releases
{{% /alert %}}

## Error handling
Expand All @@ -63,7 +100,7 @@ The following example shows how to execute the activity `ActivityA` on the targe
{{% tab "Go" %}}

```go
func TestWorkflow(ctx *workflow.WorkflowContext) (any, error) {
func BusinessWorkflow(ctx *workflow.WorkflowContext) (any, error) {
var output string
err := ctx.CallActivity("ActivityA",
workflow.WithActivityInput("my-input"),
Expand All @@ -83,12 +120,12 @@ func TestWorkflow(ctx *workflow.WorkflowContext) (any, error) {
{{% tab "Java" %}}

```java
public class CrossAppWorkflow implements Workflow {
public class BusinessWorkflow implements Workflow {
@Override
public WorkflowStub create() {
return ctx -> {
String output = ctx.callActivity(
"ActivityA",
ActivityA.class.getName(),
"my-input",
new WorkflowTaskOptions("App2"), // Here we set the target app ID which will execute this activity.
String.class
Expand All @@ -115,7 +152,7 @@ The following example shows how to execute the child workflow `Workflow2` on the
{{% tab "Go" %}}

```go
func TestWorkflow(ctx *workflow.WorkflowContext) (any, error) {
func BusinessWorkflow(ctx *workflow.WorkflowContext) (any, error) {
var output string
err := ctx.CallChildWorkflow("Workflow2",
workflow.WithChildWorkflowInput("my-input"),
Expand Down
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading