Skip to content

Commit 9ed6b6d

Browse files
authored
Merge branch 'v1.16' into feat-cross-app-wf-docs
Signed-off-by: Albert Callarisa <[email protected]>
2 parents 5988cd5 + 20ae93a commit 9ed6b6d

File tree

26 files changed

+1090
-464
lines changed

26 files changed

+1090
-464
lines changed

daprdocs/content/en/developing-applications/building-blocks/pubsub/pubsub-overview.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,12 @@ Even if the message fails to deliver, or your application crashes, Dapr attempts
120120

121121
All Dapr pub/sub components support the at-least-once guarantee.
122122

123+
### Subscription startup reliability
124+
125+
Dapr automatically retries failed subscription startups to improve reliability during deployment scenarios. This ensures your pub/sub applications remain resilient even when facing temporary connectivity or permission issues.
126+
127+
When Dapr encounters errors starting subscriptions, it shows an error message in the logs and continues to try to start the subscription.
128+
123129
### Consumer groups and competing consumers pattern
124130

125131
Dapr handles the burden of dealing with consumer groups and the competing consumers pattern. In the competing consumers pattern, multiple application instances using a single consumer group compete for the message. Dapr enforces the competing consumer pattern when replicas use the same `app-id` without explicit consumer group overrides.

daprdocs/content/en/developing-applications/building-blocks/workflow/howto-author-workflow.md

Lines changed: 36 additions & 207 deletions
Original file line numberDiff line numberDiff line change
@@ -867,7 +867,8 @@ public class DemoWorkflow extends Workflow {
867867
- The `TestWorkflow` method
868868
- Creating the workflow with input and output.
869869
- API calls. In the example below, these calls start and call the workflow activities.
870-
870+
871+
871872
```go
872873
package main
873874

@@ -877,8 +878,11 @@ import (
877878
"log"
878879
"time"
879880

880-
"github.com/dapr/go-sdk/client"
881-
"github.com/dapr/go-sdk/workflow"
881+
"github.com/dapr/durabletask-go/api"
882+
"github.com/dapr/durabletask-go/backend"
883+
"github.com/dapr/durabletask-go/client"
884+
"github.com/dapr/durabletask-go/task"
885+
dapr "github.com/dapr/go-sdk/client"
882886
)
883887

884888
var stage = 0
@@ -888,110 +892,68 @@ const (
888892
)
889893

890894
func main() {
891-
w, err := workflow.NewWorker()
892-
if err != nil {
893-
log.Fatal(err)
894-
}
895+
registry := task.NewTaskRegistry()
895896

896-
fmt.Println("Worker initialized")
897-
898-
if err := w.RegisterWorkflow(TestWorkflow); err != nil {
897+
if err := registry.AddOrchestrator(TestWorkflow); err != nil {
899898
log.Fatal(err)
900899
}
901900
fmt.Println("TestWorkflow registered")
902901

903-
if err := w.RegisterActivity(TestActivity); err != nil {
902+
if err := registry.AddActivity(TestActivity); err != nil {
904903
log.Fatal(err)
905904
}
906905
fmt.Println("TestActivity registered")
907906

908-
// Start workflow runner
909-
if err := w.Start(); err != nil {
910-
log.Fatal(err)
907+
daprClient, err := dapr.NewClient()
908+
if err != nil {
909+
log.Fatalf("failed to create Dapr client: %v", err)
911910
}
912-
fmt.Println("runner started")
913911

914-
daprClient, err := client.NewClient()
915-
if err != nil {
916-
log.Fatalf("failed to intialise client: %v", err)
912+
client := client.NewTaskHubGrpcClient(daprClient.GrpcClientConn(), backend.DefaultLogger())
913+
if err := client.StartWorkItemListener(context.TODO(), registry); err != nil {
914+
log.Fatalf("failed to start work item listener: %v", err)
917915
}
918-
defer daprClient.Close()
916+
917+
fmt.Println("runner started")
918+
919919
ctx := context.Background()
920920

921921
// Start workflow test
922-
respStart, err := daprClient.StartWorkflow(ctx, &client.StartWorkflowRequest{
923-
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
924-
WorkflowComponent: workflowComponent,
925-
WorkflowName: "TestWorkflow",
926-
Options: nil,
927-
Input: 1,
928-
SendRawInput: false,
929-
})
922+
id, err := client.ScheduleNewOrchestration(ctx, "TestWorkflow", api.WithInput(1))
930923
if err != nil {
931924
log.Fatalf("failed to start workflow: %v", err)
932925
}
933-
fmt.Printf("workflow started with id: %v\n", respStart.InstanceID)
926+
fmt.Printf("workflow started with id: %v\n", id)
934927

935928
// Pause workflow test
936-
err = daprClient.PauseWorkflow(ctx, &client.PauseWorkflowRequest{
937-
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
938-
WorkflowComponent: workflowComponent,
939-
})
940-
929+
err = client.PurgeOrchestrationState(ctx, id)
941930
if err != nil {
942931
log.Fatalf("failed to pause workflow: %v", err)
943932
}
944933

945-
respGet, err := daprClient.GetWorkflow(ctx, &client.GetWorkflowRequest{
946-
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
947-
WorkflowComponent: workflowComponent,
948-
})
934+
respGet, err := client.FetchOrchestrationMetadata(ctx, id)
949935
if err != nil {
950936
log.Fatalf("failed to get workflow: %v", err)
951937
}
952-
953-
if respGet.RuntimeStatus != workflow.StatusSuspended.String() {
954-
log.Fatalf("workflow not paused: %v", respGet.RuntimeStatus)
955-
}
956-
957-
fmt.Printf("workflow paused\n")
938+
fmt.Printf("workflow paused: %s\n", respGet.RuntimeStatus)
958939

959940
// Resume workflow test
960-
err = daprClient.ResumeWorkflow(ctx, &client.ResumeWorkflowRequest{
961-
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
962-
WorkflowComponent: workflowComponent,
963-
})
964-
941+
err = client.ResumeOrchestration(ctx, id, "")
965942
if err != nil {
966943
log.Fatalf("failed to resume workflow: %v", err)
967944
}
945+
fmt.Printf("workflow running: %s\n", respGet.RuntimeStatus)
968946

969-
respGet, err = daprClient.GetWorkflow(ctx, &client.GetWorkflowRequest{
970-
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
971-
WorkflowComponent: workflowComponent,
972-
})
947+
respGet, err = client.FetchOrchestrationMetadata(ctx, id)
973948
if err != nil {
974949
log.Fatalf("failed to get workflow: %v", err)
975950
}
976-
977-
if respGet.RuntimeStatus != workflow.StatusRunning.String() {
978-
log.Fatalf("workflow not running")
979-
}
980-
981-
fmt.Println("workflow resumed")
951+
fmt.Printf("workflow resumed: %s\n", respGet.RuntimeStatus)
982952

983953
fmt.Printf("stage: %d\n", stage)
984954

985955
// Raise Event Test
986-
987-
err = daprClient.RaiseEventWorkflow(ctx, &client.RaiseEventWorkflowRequest{
988-
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
989-
WorkflowComponent: workflowComponent,
990-
EventName: "testEvent",
991-
EventData: "testData",
992-
SendRawData: false,
993-
})
994-
956+
err = client.RaiseEvent(ctx, id, "testEvent", api.WithEventPayload("testData"))
995957
if err != nil {
996958
fmt.Printf("failed to raise event: %v", err)
997959
}
@@ -1002,177 +964,44 @@ func main() {
1002964

1003965
fmt.Printf("stage: %d\n", stage)
1004966

1005-
respGet, err = daprClient.GetWorkflow(ctx, &client.GetWorkflowRequest{
1006-
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
1007-
WorkflowComponent: workflowComponent,
1008-
})
967+
respGet, err = client.FetchOrchestrationMetadata(ctx, id)
1009968
if err != nil {
1010969
log.Fatalf("failed to get workflow: %v", err)
1011970
}
1012971

1013972
fmt.Printf("workflow status: %v\n", respGet.RuntimeStatus)
1014973

1015974
// Purge workflow test
1016-
err = daprClient.PurgeWorkflow(ctx, &client.PurgeWorkflowRequest{
1017-
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
1018-
WorkflowComponent: workflowComponent,
1019-
})
975+
err = client.PurgeOrchestrationState(ctx, id)
1020976
if err != nil {
1021977
log.Fatalf("failed to purge workflow: %v", err)
1022978
}
1023-
1024-
respGet, err = daprClient.GetWorkflow(ctx, &client.GetWorkflowRequest{
1025-
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
1026-
WorkflowComponent: workflowComponent,
1027-
})
1028-
if err != nil && respGet != nil {
1029-
log.Fatal("failed to purge workflow")
1030-
}
1031-
1032979
fmt.Println("workflow purged")
1033-
1034-
fmt.Printf("stage: %d\n", stage)
1035-
1036-
// Terminate workflow test
1037-
respStart, err = daprClient.StartWorkflow(ctx, &client.StartWorkflowRequest{
1038-
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
1039-
WorkflowComponent: workflowComponent,
1040-
WorkflowName: "TestWorkflow",
1041-
Options: nil,
1042-
Input: 1,
1043-
SendRawInput: false,
1044-
})
1045-
if err != nil {
1046-
log.Fatalf("failed to start workflow: %v", err)
1047-
}
1048-
1049-
fmt.Printf("workflow started with id: %s\n", respStart.InstanceID)
1050-
1051-
err = daprClient.TerminateWorkflow(ctx, &client.TerminateWorkflowRequest{
1052-
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
1053-
WorkflowComponent: workflowComponent,
1054-
})
1055-
if err != nil {
1056-
log.Fatalf("failed to terminate workflow: %v", err)
1057-
}
1058-
1059-
respGet, err = daprClient.GetWorkflow(ctx, &client.GetWorkflowRequest{
1060-
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
1061-
WorkflowComponent: workflowComponent,
1062-
})
1063-
if err != nil {
1064-
log.Fatalf("failed to get workflow: %v", err)
1065-
}
1066-
if respGet.RuntimeStatus != workflow.StatusTerminated.String() {
1067-
log.Fatal("failed to terminate workflow")
1068-
}
1069-
1070-
fmt.Println("workflow terminated")
1071-
1072-
err = daprClient.PurgeWorkflow(ctx, &client.PurgeWorkflowRequest{
1073-
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
1074-
WorkflowComponent: workflowComponent,
1075-
})
1076-
1077-
respGet, err = daprClient.GetWorkflow(ctx, &client.GetWorkflowRequest{
1078-
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
1079-
WorkflowComponent: workflowComponent,
1080-
})
1081-
if err == nil || respGet != nil {
1082-
log.Fatalf("failed to purge workflow: %v", err)
1083-
}
1084-
1085-
fmt.Println("workflow purged")
1086-
1087-
stage = 0
1088-
fmt.Println("workflow client test")
1089-
1090-
wfClient, err := workflow.NewClient()
1091-
if err != nil {
1092-
log.Fatalf("[wfclient] faield to initialize: %v", err)
1093-
}
1094-
1095-
id, err := wfClient.ScheduleNewWorkflow(ctx, "TestWorkflow", workflow.WithInstanceID("a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9"), workflow.WithInput(1))
1096-
if err != nil {
1097-
log.Fatalf("[wfclient] failed to start workflow: %v", err)
1098-
}
1099-
1100-
fmt.Printf("[wfclient] started workflow with id: %s\n", id)
1101-
1102-
metadata, err := wfClient.FetchWorkflowMetadata(ctx, id)
1103-
if err != nil {
1104-
log.Fatalf("[wfclient] failed to get worfklow: %v", err)
1105-
}
1106-
1107-
fmt.Printf("[wfclient] workflow status: %v\n", metadata.RuntimeStatus.String())
1108-
1109-
if stage != 1 {
1110-
log.Fatalf("Workflow assertion failed while validating the wfclient. Stage 1 expected, current: %d", stage)
1111-
}
1112-
1113-
fmt.Printf("[wfclient] stage: %d\n", stage)
1114-
1115-
// raise event
1116-
1117-
if err := wfClient.RaiseEvent(ctx, id, "testEvent", workflow.WithEventPayload("testData")); err != nil {
1118-
log.Fatalf("[wfclient] failed to raise event: %v", err)
1119-
}
1120-
1121-
fmt.Println("[wfclient] event raised")
1122-
1123-
// Sleep to allow the workflow to advance
1124-
time.Sleep(time.Second)
1125-
1126-
if stage != 2 {
1127-
log.Fatalf("Workflow assertion failed while validating the wfclient. Stage 2 expected, current: %d", stage)
1128-
}
1129-
1130-
fmt.Printf("[wfclient] stage: %d\n", stage)
1131-
1132-
// stop workflow
1133-
if err := wfClient.TerminateWorkflow(ctx, id); err != nil {
1134-
log.Fatalf("[wfclient] failed to terminate workflow: %v", err)
1135-
}
1136-
1137-
fmt.Println("[wfclient] workflow terminated")
1138-
1139-
if err := wfClient.PurgeWorkflow(ctx, id); err != nil {
1140-
log.Fatalf("[wfclient] failed to purge workflow: %v", err)
1141-
}
1142-
1143-
fmt.Println("[wfclient] workflow purged")
1144-
1145-
// stop workflow runtime
1146-
if err := w.Shutdown(); err != nil {
1147-
log.Fatalf("failed to shutdown runtime: %v", err)
1148-
}
1149-
1150-
fmt.Println("workflow worker successfully shutdown")
1151980
}
1152981

1153-
func TestWorkflow(ctx *workflow.WorkflowContext) (any, error) {
982+
func TestWorkflow(ctx *task.OrchestrationContext) (any, error) {
1154983
var input int
1155984
if err := ctx.GetInput(&input); err != nil {
1156985
return nil, err
1157986
}
1158987
var output string
1159-
if err := ctx.CallActivity(TestActivity, workflow.ActivityInput(input)).Await(&output); err != nil {
988+
if err := ctx.CallActivity(TestActivity, task.WithActivityInput(input)).Await(&output); err != nil {
1160989
return nil, err
1161990
}
1162991

1163-
err := ctx.WaitForExternalEvent("testEvent", time.Second*60).Await(&output)
992+
err := ctx.WaitForSingleEvent("testEvent", time.Second*60).Await(&output)
1164993
if err != nil {
1165994
return nil, err
1166995
}
1167996

1168-
if err := ctx.CallActivity(TestActivity, workflow.ActivityInput(input)).Await(&output); err != nil {
997+
if err := ctx.CallActivity(TestActivity, task.WithActivityInput(input)).Await(&output); err != nil {
1169998
return nil, err
1170999
}
11711000

11721001
return output, nil
11731002
}
11741003

1175-
func TestActivity(ctx workflow.ActivityContext) (any, error) {
1004+
func TestActivity(ctx task.ActivityContext) (any, error) {
11761005
var input int
11771006
if err := ctx.GetInput(&input); err != nil {
11781007
return "", err

0 commit comments

Comments
 (0)