Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
c2a41d9
Initial work for workflows DotNET SDK
RyanLettieri Jan 3, 2023
8cd0f0e
Addressing review comments from authoring portion of workflow SDK
RyanLettieri Jan 12, 2023
b4498cc
Merge branch 'master' into master
RyanLettieri Jan 13, 2023
94cea4d
Moving workflows test into E2E directory
RyanLettieri Jan 17, 2023
ba0d6b8
Adding back in workflow options
RyanLettieri Jan 17, 2023
9d2db44
Adding health check and updating itests for workflows
RyanLettieri Jan 17, 2023
3fa3012
Updating go version
RyanLettieri Jan 17, 2023
211224d
Adding in grpc endpoint for workflows test
RyanLettieri Jan 17, 2023
01f257c
Using http endpoint in workflows test
RyanLettieri Jan 17, 2023
6688012
Using http endpoint in workflows test
RyanLettieri Jan 17, 2023
68e1450
Updating itest for testing with local dapr
RyanLettieri Jan 17, 2023
f005729
Updating ref used for testing
RyanLettieri Jan 17, 2023
3ba809f
Addressing more review comments and adding in serialization for input
RyanLettieri Jan 18, 2023
51e4349
Addressing more review comments for workflow management SDK
RyanLettieri Jan 19, 2023
aaf7dcc
Using a record for getworkflow and using UTC time
RyanLettieri Jan 20, 2023
a704c02
Merge branch 'master' into master
RyanLettieri Jan 24, 2023
e49ef1b
Formatting cleanup
RyanLettieri Jan 25, 2023
cff1801
adding in a sleep command to wait for engine to start
RyanLettieri Jan 25, 2023
dc309c3
Merge branch 'master' into master
RyanLettieri Jan 25, 2023
e086a4a
Updating workflows SDK now that built in engine is present
RyanLettieri Jan 25, 2023
bd8f2ea
Testing new changes to workflow engine
RyanLettieri Jan 26, 2023
770f96e
Updating comment
RyanLettieri Jan 26, 2023
989d7c4
Merging in master
RyanLettieri Jan 26, 2023
fba845f
Updating checks for workflows E2E test
RyanLettieri Jan 26, 2023
f713701
adding sleep to wf activity so terminate can be called
RyanLettieri Jan 26, 2023
55856e2
Removing temp changes to itests file
RyanLettieri Jan 27, 2023
b737575
Adding in commit ref from dapr dapr
RyanLettieri Jan 27, 2023
43b2035
Adding in commit ref from dapr dapr
RyanLettieri Jan 27, 2023
aedece7
Blank commit to restart git run
RyanLettieri Jan 27, 2023
654b5e4
Addressing review comments
RyanLettieri Jan 27, 2023
49a0c94
Adding in more verbose error logging
RyanLettieri Jan 27, 2023
09e6bbf
Fixing string interpolation in workflows test
RyanLettieri Jan 30, 2023
938e349
Fixing string interpolation
RyanLettieri Jan 30, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .github/workflows/itests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,15 @@ jobs:
install-version: '7.0.x'
env:
NUPKG_OUTDIR: bin/Release/nugets
GOVER: 1.17
GOVER: 1.19
GOOS: linux
GOARCH: amd64
GOPROXY: https://proxy.golang.org
DAPR_CLI_VER: 1.8.0
DAPR_RUNTIME_VER: 1.8.0
DAPR_CLI_VER: 1.9.1
DAPR_RUNTIME_VER: 1.9.5
DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/3dacfb672d55f1436c249057aaebbe597e1066f3/install/install.sh
DAPR_CLI_REF: ''
DAPR_REF: ''
DAPR_REF: '82e097134cdc7494c5c3be71bf80f7904d6db9ea'
steps:
- name: Set up Dapr CLI
run: wget -q ${{ env.DAPR_INSTALL_URL }} -O - | /bin/bash -s ${{ env.DAPR_CLI_VER }}
Expand Down
47 changes: 47 additions & 0 deletions src/Dapr.Client/DaprClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -975,6 +975,53 @@ public abstract Task<UnlockResponse> Unlock(
string lockOwner,
CancellationToken cancellationToken = default);

/// <summary>
/// Attempt to start the given workflow with response indicating success.
/// </summary>
/// <param name="instanceId">Identifier of the specific run.</param>
/// <param name="workflowComponent">The component to interface with.</param>
/// <param name="workflowName">Name of the workflow to run.</param>
/// <param name="workflowOptions">The list of options that are potentially needed to start a workflow.</param>
/// <param name="input">The input input for the given workflow.</param>
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like we've had this conversation, so forgive me. But is there a reason we went with Object instead of just a Generic? Is it just because we don't have a good means of using the generic so we get nothing from the type checking side of the world?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, there's no benefit to using a generic here. We don't really care what the type of the input is. We just need to be able to serialize it to JSON.

/// <param name="cancellationToken">A <see cref="CancellationToken" /> that can be used to cancel the operation.</param>
/// <returns>A <see cref="Task"/> containing a <see cref="WorkflowReference"/></returns>
[Obsolete("This API is currently not stable as it is in the Alpha stage. This attribute will be removed once it is stable.")]
public abstract Task<WorkflowReference> StartWorkflowAsync(
string instanceId,
string workflowComponent,
string workflowName,
Object input,
IReadOnlyDictionary<string, string> workflowOptions = default,
CancellationToken cancellationToken = default);

/// <summary>
/// Attempt to get information about the given workflow.
/// </summary>
/// <param name="instanceId">The unique ID of the target workflow instance.</param>
/// <param name="workflowComponent">The component to interface with.</param>
/// <param name="workflowName">Name of the workflow to run.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken" /> that can be used to cancel the operation.</param>
/// <returns>A <see cref="Task"/> containing a <see cref="GetWorkflowResponse"/></returns>
[Obsolete("This API is currently not stable as it is in the Alpha stage. This attribute will be removed once it is stable.")]
public abstract Task<GetWorkflowResponse> GetWorkflowAsync(
string instanceId,
string workflowComponent,
string workflowName,
CancellationToken cancellationToken = default);

/// <summary>
/// Attempt to get terminate the given workflow.
/// </summary>
/// <param name="instanceId">The unique ID of the target workflow instance.</param>
/// <param name="workflowComponent">The component to interface with.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken" /> that can be used to cancel the operation.</param>
/// <returns>A <see cref="Task" /> that will complete when the terminate operation has been scheduled. If the wrapped value is true the operation suceeded.</returns>
[Obsolete("This API is currently not stable as it is in the Alpha stage. This attribute will be removed once it is stable.")]
public abstract Task TerminateWorkflowAsync(
string instanceId,
string workflowComponent,
CancellationToken cancellationToken = default);

/// <inheritdoc />
public void Dispose()
{
Expand Down
114 changes: 114 additions & 0 deletions src/Dapr.Client/DaprClientGrpc.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1462,6 +1462,120 @@ public async override Task<UnlockResponse> Unlock(

#endregion


#region Workflow API
/// <inheritdoc/>
[Obsolete]
public async override Task<WorkflowReference> StartWorkflowAsync(
string instanceId,
string workflowComponent,
string workflowName,
Object input,
IReadOnlyDictionary<string, string> workflowOptions = default,
CancellationToken cancellationToken = default)
{
ArgumentVerifier.ThrowIfNullOrEmpty(instanceId, nameof(instanceId));
ArgumentVerifier.ThrowIfNullOrEmpty(workflowComponent, nameof(workflowComponent));
ArgumentVerifier.ThrowIfNullOrEmpty(workflowName, nameof(workflowName));
ArgumentVerifier.ThrowIfNull(input, nameof(input));

// Serialize json data. Converts input object to bytes and then bytestring inside the request.
byte[] jsonUtf8Bytes = JsonSerializer.SerializeToUtf8Bytes(input);

var request = new Autogenerated.StartWorkflowRequest()
{
InstanceId = instanceId,
WorkflowComponent = workflowComponent,
WorkflowName = workflowName,
Input = ByteString.CopyFrom(jsonUtf8Bytes),
};

if (workflowOptions?.Count > 0)
{
foreach (var item in workflowOptions)
Comment on lines +1493 to +1495
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need this if here?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you need it for the null check.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is here to check against the possibility of workflowOptions being Null.

{
request.Options[item.Key] = item.Value;
}
}

try
{
var options = CreateCallOptions(headers: null, cancellationToken);
var response = await client.StartWorkflowAlpha1Async(request, options);
return new WorkflowReference(response.InstanceId);

}
catch (RpcException ex)
{
throw new DaprException("Start Workflow operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex);
}
}

/// <inheritdoc/>
[Obsolete]
public async override Task<GetWorkflowResponse> GetWorkflowAsync(
string instanceId,
string workflowComponent,
string workflowName,
CancellationToken cancellationToken = default)
{
ArgumentVerifier.ThrowIfNullOrEmpty(instanceId, nameof(instanceId));
ArgumentVerifier.ThrowIfNullOrEmpty(workflowComponent, nameof(workflowComponent));

var request = new Autogenerated.GetWorkflowRequest()
{
InstanceId = instanceId,
WorkflowComponent = workflowComponent,
WorkflowType = workflowName //TODO: Change 'WorkflowType' to 'WorkflowName' once changes go through dapr/dapr
};

try
{
var options = CreateCallOptions(headers: null, cancellationToken);
var response = await client.GetWorkflowAlpha1Async(request, options);
var dateTimeValue = new DateTime(response.StartTime, DateTimeKind.Utc);
return new GetWorkflowResponse(response.InstanceId, dateTimeValue, response.Metadata);
}
catch (RpcException ex)
{
throw new DaprException("Get workflow operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex);
}

}


/// <inheritdoc/>
[Obsolete]
public async override Task TerminateWorkflowAsync(
string instanceId,
string workflowComponent,
CancellationToken cancellationToken = default)
{
ArgumentVerifier.ThrowIfNullOrEmpty(instanceId, nameof(instanceId));
ArgumentVerifier.ThrowIfNullOrEmpty(workflowComponent, nameof(workflowComponent));

var request = new Autogenerated.TerminateWorkflowRequest()
{
InstanceId = instanceId,
WorkflowComponent = workflowComponent
};

var options = CreateCallOptions(headers: null, cancellationToken);

try
{
await client.TerminateWorkflowAlpha1Async(request, options);
}
catch (RpcException ex)
{
throw new DaprException("Terminate workflow operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex);
}

}

#endregion


#region Dapr Sidecar Methods

/// <inheritdoc/>
Expand Down
26 changes: 26 additions & 0 deletions src/Dapr.Client/GetWorkflowResponse.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// ------------------------------------------------------------------------
// Copyright 2021 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------

using System;
using System.Collections.Generic;

namespace Dapr.Client
{
/// <summary>
/// Initializes a new <see cref="GetWorkflowResponse" />.
/// </summary>
/// <param name="instanceId">The instance ID assocated with this response.</param>
/// <param name="startTime">The time at which the workflow started executing.</param>
/// <param name="metadata">The response metadata.</param>
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: the whitespace needs to be fixed here too.

public record GetWorkflowResponse(string instanceId, DateTime startTime, IReadOnlyDictionary<string, string> metadata);
}
71 changes: 66 additions & 5 deletions src/Dapr.Client/Protos/dapr/proto/dapr/v1/dapr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package dapr.proto.runtime.v1;

import "google/protobuf/any.proto";
import "google/protobuf/empty.proto";
import "google/protobuf/timestamp.proto";
import "dapr/proto/common/v1/common.proto";

option csharp_namespace = "Dapr.Client.Autogen.Grpc.v1";
Expand Down Expand Up @@ -110,6 +111,15 @@ service Dapr {
// Sets value in extended metadata of the sidecar
rpc SetMetadata (SetMetadataRequest) returns (google.protobuf.Empty) {}

// Start Workflow
rpc StartWorkflowAlpha1 (StartWorkflowRequest) returns (WorkflowReference) {}

// Get Workflow details
rpc GetWorkflowAlpha1 (GetWorkflowRequest) returns (GetWorkflowResponse) {}

// Terminate Workflow
rpc TerminateWorkflowAlpha1 (TerminateWorkflowRequest) returns (TerminateWorkflowResponse) {}

// Shutdown the sidecar
rpc Shutdown (google.protobuf.Empty) returns (google.protobuf.Empty) {}
}
Expand Down Expand Up @@ -345,10 +355,10 @@ message InvokeBindingRequest {
bytes data = 2;

// The metadata passing to output binding components
//
//
// Common metadata property:
// - ttlInSeconds : the time to live in seconds for the message.
// If set in the binding definition will cause all messages to
// - ttlInSeconds : the time to live in seconds for the message.
// If set in the binding definition will cause all messages to
// have a default time to live. The message ttl overrides any value
// in the binding definition.
map<string, string> metadata = 3;
Expand Down Expand Up @@ -411,7 +421,7 @@ message TransactionalStateOperation {
// The type of operation to be executed
string operationType = 1;

// State values to be operated on
// State values to be operated on
common.v1.StateItem request = 2;
}

Expand Down Expand Up @@ -504,6 +514,7 @@ message InvokeActorRequest {
string actor_id = 2;
string method = 3;
bytes data = 4;
map<string, string> metadata = 5;
}

// InvokeActorResponse is the method that returns an actor invocation response.
Expand All @@ -517,6 +528,7 @@ message GetMetadataResponse {
repeated ActiveActorsCount active_actors_count = 2;
repeated RegisteredComponents registered_components = 3;
map<string, string> extended_metadata = 4;
repeated PubsubSubscription subscriptions = 5;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like there are several unrelated pubsub changes in this PR?

}

message ActiveActorsCount {
Expand All @@ -531,6 +543,23 @@ message RegisteredComponents {
repeated string capabilities = 4;
}

message PubsubSubscription {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yash-nisar - Does this impact your changes in #1009?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@halspang Nope, I don't think we're calling any auto generated code for bulk subscribe but I'll update the proto anyway just to be consistent with what we have in dapr/dapr.

string pubsub_name = 1;
string topic = 2;
map<string,string> metadata = 3;
PubsubSubscriptionRules rules = 4;
string dead_letter_topic = 5;
}

message PubsubSubscriptionRules {
repeated PubsubSubscriptionRule rules = 1;
}

message PubsubSubscriptionRule {
string match = 1;
string path = 2;
}

message SetMetadataRequest {
string key = 1;
string value = 2;
Expand Down Expand Up @@ -644,4 +673,36 @@ message UnlockResponse {
}

Status status = 1;
}
}

message WorkflowReference {
string instance_id = 1;
}

message GetWorkflowRequest {
string instance_id = 1;
string workflow_type = 2;
string workflow_component = 3;
}

message GetWorkflowResponse {
string instance_id = 1;
int64 start_time = 2;
map<string, string> metadata = 3;
}

message StartWorkflowRequest {
string instance_id = 1;
string workflow_component = 2;
string workflow_name = 3;
map<string, string> options = 4;
bytes input = 5;
}

message TerminateWorkflowRequest {
string instance_id = 1;
string workflow_component = 2;
}

message TerminateWorkflowResponse {
}
40 changes: 40 additions & 0 deletions src/Dapr.Client/WorkflowReference.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// ------------------------------------------------------------------------
// Copyright 2021 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------

using System;
using System.Collections.Generic;

namespace Dapr.Client
{
/// <summary>
/// Represents the response from invoking a workflow.
/// </summary>
public sealed class WorkflowReference
{
/// <summary>
/// Initializes a new <see cref="WorkflowReference" />.`
/// </summary>
/// <param name="instanceId">The instance ID assocated with this response.</param>
public WorkflowReference(string instanceId)
{
ArgumentVerifier.ThrowIfNull(instanceId, nameof(instanceId));
this.InstanceId = instanceId;
}

/// <summary>
/// The instance ID assocated with this workflow.
/// </summary>
public string InstanceId { set; get; }

}
}
2 changes: 1 addition & 1 deletion src/Dapr.Workflow/Dapr.Workflow.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

<!-- NuGet configuration -->
<PropertyGroup>
<TargetFramework>net6</TargetFramework>
<TargetFrameworks>netcoreapp3.1;net6;net7</TargetFrameworks>
<Nullable>enable</Nullable>
<PackageId>Dapr.Workflow</PackageId>
<Title>Dapr Workflow Authoring SDK</Title>
Expand Down
Loading