Skip to content

Commit 531b412

Browse files
authored
Merge branch 'master' into workflow-example-async
Signed-off-by: Whit Waldo <[email protected]>
2 parents 957076d + 6e2841a commit 531b412

20 files changed

+966
-0
lines changed

.github/workflows/itests.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ jobs:
1818
name: run integration tests
1919
runs-on: ubuntu-latest
2020
strategy:
21+
fail-fast: false
2122
matrix:
2223
dotnet-version: ['6.0', '7.0', '8.0']
2324
include:

.github/workflows/sdk_build.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ jobs:
4141
name: Test .NET ${{ matrix.dotnet-version }}
4242
runs-on: ubuntu-latest
4343
strategy:
44+
fail-fast: false
4445
matrix:
4546
dotnet-version: ['6.0', '7.0', '8.0']
4647
include:

all.sln

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,13 @@ EndProject
120120
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Dapr.Common.Test", "test\Dapr.Common.Test\Dapr.Common.Test.csproj", "{CDB47863-BEBD-4841-A807-46D868962521}"
121121
EndProject
122122
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "WorkflowAsyncOperations", "examples\Workflow\WorkflowAsyncOperations\WorkflowAsyncOperations.csproj", "{00359961-0C50-4BB1-A794-8B06DE991639}"
123+
EndProject
124+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Dapr.Messaging.Test", "test\Dapr.Messaging.Test\Dapr.Messaging.Test.csproj", "{4E04EB35-7FD2-4FDB-B09A-F75CE24053B9}"
125+
EndProject
126+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Dapr.Messaging", "src\Dapr.Messaging\Dapr.Messaging.csproj", "{0EAE36A1-B578-4F13-A113-7A477ECA1BDA}"
127+
EndProject
128+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "StreamingSubscriptionExample", "examples\Client\PublishSubscribe\StreamingSubscriptionExample\StreamingSubscriptionExample.csproj", "{290D1278-F613-4DF3-9DF5-F37E38CDC363}"
129+
EndProject
123130
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Dapr.Jobs", "src\Dapr.Jobs\Dapr.Jobs.csproj", "{C8BB6A85-A7EA-40C0-893D-F36F317829B3}"
124131
EndProject
125132
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Dapr.Jobs.Test", "test\Dapr.Jobs.Test\Dapr.Jobs.Test.csproj", "{BF9828E9-5597-4D42-AA6E-6E6C12214204}"
@@ -316,6 +323,18 @@ Global
316323
{00359961-0C50-4BB1-A794-8B06DE991639}.Debug|Any CPU.Build.0 = Debug|Any CPU
317324
{00359961-0C50-4BB1-A794-8B06DE991639}.Release|Any CPU.ActiveCfg = Release|Any CPU
318325
{00359961-0C50-4BB1-A794-8B06DE991639}.Release|Any CPU.Build.0 = Release|Any CPU
326+
{4E04EB35-7FD2-4FDB-B09A-F75CE24053B9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
327+
{4E04EB35-7FD2-4FDB-B09A-F75CE24053B9}.Debug|Any CPU.Build.0 = Debug|Any CPU
328+
{4E04EB35-7FD2-4FDB-B09A-F75CE24053B9}.Release|Any CPU.ActiveCfg = Release|Any CPU
329+
{4E04EB35-7FD2-4FDB-B09A-F75CE24053B9}.Release|Any CPU.Build.0 = Release|Any CPU
330+
{0EAE36A1-B578-4F13-A113-7A477ECA1BDA}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
331+
{0EAE36A1-B578-4F13-A113-7A477ECA1BDA}.Debug|Any CPU.Build.0 = Debug|Any CPU
332+
{0EAE36A1-B578-4F13-A113-7A477ECA1BDA}.Release|Any CPU.ActiveCfg = Release|Any CPU
333+
{0EAE36A1-B578-4F13-A113-7A477ECA1BDA}.Release|Any CPU.Build.0 = Release|Any CPU
334+
{290D1278-F613-4DF3-9DF5-F37E38CDC363}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
335+
{290D1278-F613-4DF3-9DF5-F37E38CDC363}.Debug|Any CPU.Build.0 = Debug|Any CPU
336+
{290D1278-F613-4DF3-9DF5-F37E38CDC363}.Release|Any CPU.ActiveCfg = Release|Any CPU
337+
{290D1278-F613-4DF3-9DF5-F37E38CDC363}.Release|Any CPU.Build.0 = Release|Any CP
319338
{C8BB6A85-A7EA-40C0-893D-F36F317829B3}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
320339
{C8BB6A85-A7EA-40C0-893D-F36F317829B3}.Debug|Any CPU.Build.0 = Debug|Any CPU
321340
{C8BB6A85-A7EA-40C0-893D-F36F317829B3}.Release|Any CPU.ActiveCfg = Release|Any CPU
@@ -385,6 +404,9 @@ Global
385404
{B445B19C-A925-4873-8CB7-8317898B6970} = {27C5D71D-0721-4221-9286-B94AB07B58CF}
386405
{CDB47863-BEBD-4841-A807-46D868962521} = {DD020B34-460F-455F-8D17-CF4A949F100B}
387406
{00359961-0C50-4BB1-A794-8B06DE991639} = {BF3ED6BF-ADF3-4D25-8E89-02FB8D945CA9}
407+
{4E04EB35-7FD2-4FDB-B09A-F75CE24053B9} = {DD020B34-460F-455F-8D17-CF4A949F100B}
408+
{0EAE36A1-B578-4F13-A113-7A477ECA1BDA} = {27C5D71D-0721-4221-9286-B94AB07B58CF}
409+
{290D1278-F613-4DF3-9DF5-F37E38CDC363} = {0EF6EA64-D7C3-420D-9890-EAE8D54A57E6}
388410
{C8BB6A85-A7EA-40C0-893D-F36F317829B3} = {27C5D71D-0721-4221-9286-B94AB07B58CF}
389411
{BF9828E9-5597-4D42-AA6E-6E6C12214204} = {DD020B34-460F-455F-8D17-CF4A949F100B}
390412
{D9697361-232F-465D-A136-4561E0E88488} = {D687DDC4-66C5-4667-9E3A-FD8B78ECAA78}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
using System.Text;
2+
using Dapr.Messaging.PublishSubscribe;
3+
using Dapr.Messaging.PublishSubscribe.Extensions;
4+
5+
var builder = WebApplication.CreateBuilder(args);
6+
builder.Services.AddDaprPubSubClient();
7+
var app = builder.Build();
8+
9+
//Process each message returned from the subscription
10+
Task<TopicResponseAction> HandleMessageAsync(TopicMessage message, CancellationToken cancellationToken = default)
11+
{
12+
try
13+
{
14+
//Do something with the message
15+
Console.WriteLine(Encoding.UTF8.GetString(message.Data.Span));
16+
return Task.FromResult(TopicResponseAction.Success);
17+
}
18+
catch
19+
{
20+
return Task.FromResult(TopicResponseAction.Retry);
21+
}
22+
}
23+
24+
var messagingClient = app.Services.GetRequiredService<DaprPublishSubscribeClient>();
25+
26+
//Create a dynamic streaming subscription and subscribe with a timeout of 30 seconds and 10 seconds for message handling
27+
var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30));
28+
var subscription = await messagingClient.SubscribeAsync("pubsub", "myTopic",
29+
new DaprSubscriptionOptions(new MessageHandlingPolicy(TimeSpan.FromSeconds(10), TopicResponseAction.Retry)),
30+
HandleMessageAsync, cancellationTokenSource.Token);
31+
32+
await Task.Delay(TimeSpan.FromMinutes(1));
33+
34+
//When you're done with the subscription, simply dispose of it
35+
await subscription.DisposeAsync();
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
<Project Sdk="Microsoft.NET.Sdk.Web">
2+
3+
<PropertyGroup>
4+
<OutputType>Exe</OutputType>
5+
<TargetFramework>net6.0</TargetFramework>
6+
<ImplicitUsings>enable</ImplicitUsings>
7+
<Nullable>enable</Nullable>
8+
</PropertyGroup>
9+
10+
<ItemGroup>
11+
<ProjectReference Include="..\..\..\..\src\Dapr.Messaging\Dapr.Messaging.csproj" />
12+
</ItemGroup>
13+
14+
</Project>
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<Description>This package contains the reference assemblies for developing messaging services using Dapr.</Description>
5+
<ImplicitUsings>enable</ImplicitUsings>
6+
<Nullable>enable</Nullable>
7+
<PackageId>Dapr.Messaging</PackageId>
8+
<Title>Dapr Messaging SDK</Title>
9+
<Description>Dapr Messaging SDK for building applications that utilize messaging components.</Description>
10+
<VersionSuffix>alpha</VersionSuffix>
11+
</PropertyGroup>
12+
13+
<ItemGroup>
14+
<PackageReference Include="Microsoft.Extensions.DependencyInjection" />
15+
</ItemGroup>
16+
17+
<ItemGroup>
18+
<ProjectReference Include="..\Dapr.Common\Dapr.Common.csproj" />
19+
<ProjectReference Include="..\Dapr.Protos\Dapr.Protos.csproj" />
20+
</ItemGroup>
21+
22+
</Project>
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
// ------------------------------------------------------------------------
2+
// Copyright 2024 The Dapr Authors
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
// Unless required by applicable law or agreed to in writing, software
8+
// distributed under the License is distributed on an "AS IS" BASIS,
9+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
// See the License for the specific language governing permissions and
11+
// limitations under the License.
12+
// ------------------------------------------------------------------------
13+
14+
namespace Dapr.Messaging.PublishSubscribe;
15+
16+
/// <summary>
17+
/// The base implementation of a Dapr pub/sub client.
18+
/// </summary>
19+
public abstract class DaprPublishSubscribeClient
20+
{
21+
/// <summary>
22+
/// Dynamically subscribes to a Publish/Subscribe component and topic.
23+
/// </summary>
24+
/// <param name="pubSubName">The name of the Publish/Subscribe component.</param>
25+
/// <param name="topicName">The name of the topic to subscribe to.</param>
26+
/// <param name="options">Configuration options.</param>
27+
/// <param name="messageHandler">The delegate reflecting the action to take upon messages received by the subscription.</param>
28+
/// <param name="cancellationToken">Cancellation token.</param>
29+
/// <returns></returns>
30+
public abstract Task<IAsyncDisposable> SubscribeAsync(string pubSubName, string topicName, DaprSubscriptionOptions options, TopicMessageHandler messageHandler, CancellationToken cancellationToken = default);
31+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
// ------------------------------------------------------------------------
2+
// Copyright 2024 The Dapr Authors
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
// Unless required by applicable law or agreed to in writing, software
8+
// distributed under the License is distributed on an "AS IS" BASIS,
9+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
// See the License for the specific language governing permissions and
11+
// limitations under the License.
12+
// ------------------------------------------------------------------------
13+
14+
using Dapr.Common;
15+
using Microsoft.Extensions.Configuration;
16+
using Autogenerated = Dapr.Client.Autogen.Grpc.v1;
17+
18+
namespace Dapr.Messaging.PublishSubscribe;
19+
20+
/// <summary>
21+
/// Builds a <see cref="DaprPublishSubscribeClient"/>.
22+
/// </summary>
23+
public sealed class DaprPublishSubscribeClientBuilder : DaprGenericClientBuilder<DaprPublishSubscribeClient>
24+
{
25+
/// <summary>
26+
/// Used to initialize a new instance of the <see cref="DaprPublishSubscribeClientBuilder"/>.
27+
/// </summary>
28+
/// <param name="configuration">An optional instance of <see cref="IConfiguration"/>.</param>
29+
public DaprPublishSubscribeClientBuilder(IConfiguration? configuration = null) : base(configuration)
30+
{
31+
}
32+
33+
/// <summary>
34+
/// Builds the client instance from the properties of the builder.
35+
/// </summary>
36+
/// <returns>The Dapr client instance.</returns>
37+
/// <summary>
38+
/// Builds the client instance from the properties of the builder.
39+
/// </summary>
40+
public override DaprPublishSubscribeClient Build()
41+
{
42+
var daprClientDependencies = BuildDaprClientDependencies();
43+
var client = new Autogenerated.Dapr.DaprClient(daprClientDependencies.channel);
44+
45+
return new DaprPublishSubscribeGrpcClient(client);
46+
}
47+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
// ------------------------------------------------------------------------
2+
// Copyright 2024 The Dapr Authors
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
// Unless required by applicable law or agreed to in writing, software
8+
// distributed under the License is distributed on an "AS IS" BASIS,
9+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
// See the License for the specific language governing permissions and
11+
// limitations under the License.
12+
// ------------------------------------------------------------------------
13+
14+
using P = Dapr.Client.Autogen.Grpc.v1.Dapr;
15+
16+
namespace Dapr.Messaging.PublishSubscribe;
17+
18+
/// <summary>
19+
/// A client for interacting with the Dapr endpoints.
20+
/// </summary>
21+
internal sealed class DaprPublishSubscribeGrpcClient : DaprPublishSubscribeClient
22+
{
23+
private readonly P.DaprClient daprClient;
24+
25+
/// <summary>
26+
/// Creates a new instance of a <see cref="DaprPublishSubscribeGrpcClient"/>
27+
/// </summary>
28+
public DaprPublishSubscribeGrpcClient(P.DaprClient client)
29+
{
30+
daprClient = client;
31+
}
32+
33+
/// <summary>
34+
/// Dynamically subscribes to a Publish/Subscribe component and topic.
35+
/// </summary>
36+
/// <param name="pubSubName">The name of the Publish/Subscribe component.</param>
37+
/// <param name="topicName">The name of the topic to subscribe to.</param>
38+
/// <param name="options">Configuration options.</param>
39+
/// <param name="messageHandler">The delegate reflecting the action to take upon messages received by the subscription.</param>
40+
/// <param name="cancellationToken">Cancellation token.</param>
41+
/// <returns></returns>
42+
public override async Task<IAsyncDisposable> SubscribeAsync(string pubSubName, string topicName, DaprSubscriptionOptions options, TopicMessageHandler messageHandler, CancellationToken cancellationToken = default)
43+
{
44+
var receiver = new PublishSubscribeReceiver(pubSubName, topicName, options, messageHandler, daprClient);
45+
await receiver.SubscribeAsync(cancellationToken);
46+
return receiver;
47+
}
48+
}
49+
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
// ------------------------------------------------------------------------
2+
// Copyright 2024 The Dapr Authors
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
// Unless required by applicable law or agreed to in writing, software
8+
// distributed under the License is distributed on an "AS IS" BASIS,
9+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
// See the License for the specific language governing permissions and
11+
// limitations under the License.
12+
// ------------------------------------------------------------------------
13+
14+
namespace Dapr.Messaging.PublishSubscribe;
15+
16+
/// <summary>
17+
/// Options used to configure the dynamic Dapr subscription.
18+
/// </summary>
19+
/// <param name="MessageHandlingPolicy">Describes the policy to take on messages that have not been acknowledged within the timeout period.</param>
20+
public sealed record DaprSubscriptionOptions(MessageHandlingPolicy MessageHandlingPolicy)
21+
{
22+
/// <summary>
23+
/// Subscription metadata.
24+
/// </summary>
25+
public IReadOnlyDictionary<string, string> Metadata { get; init; } = new Dictionary<string, string>();
26+
27+
/// <summary>
28+
/// The optional name of the dead-letter topic to send unprocessed messages to.
29+
/// </summary>
30+
public string? DeadLetterTopic { get; init; }
31+
32+
/// <summary>
33+
/// If populated, this reflects the maximum number of messages that can be queued for processing on the replica. By default,
34+
/// no maximum boundary is enforced.
35+
/// </summary>
36+
public int? MaximumQueuedMessages { get; init; }
37+
38+
/// <summary>
39+
/// The maximum amount of time to take to dispose of acknowledgement messages after the cancellation token has
40+
/// been signaled.
41+
/// </summary>
42+
public TimeSpan MaximumCleanupTimeout { get; init; } = TimeSpan.FromSeconds(30);
43+
}
44+

0 commit comments

Comments
 (0)