diff --git a/examples/AspNetCore/ControllerSample/Controllers/SampleController.cs b/examples/AspNetCore/ControllerSample/Controllers/SampleController.cs index 512d70164..f5d40d382 100644 --- a/examples/AspNetCore/ControllerSample/Controllers/SampleController.cs +++ b/examples/AspNetCore/ControllerSample/Controllers/SampleController.cs @@ -14,6 +14,8 @@ namespace ControllerSample.Controllers { using System; + using System.Text; + using System.Text.Json; using System.Threading.Tasks; using Dapr; using Dapr.Client; @@ -156,6 +158,36 @@ public async Task> WithdrawV2(TransactionV2 transaction, [ return state.Value; } + /// + /// Method for depositing to account as specified in transaction via a raw message. + /// + /// Transaction info. + /// State client to interact with Dapr runtime. + /// A representing the result of the asynchronous operation. + /// "pubsub", the first parameter into the Topic attribute, is name of the default pub/sub configured by the Dapr CLI. + [Topic("pubsub", "rawDeposit", true)] + [HttpPost("rawDeposit")] + public async Task> RawDeposit([FromBody] JsonDocument rawTransaction, [FromServices] DaprClient daprClient) + { + var transactionString = rawTransaction.RootElement.GetProperty("data_base64").GetString(); + logger.LogInformation($"Enter deposit: {transactionString} - {Encoding.UTF8.GetString(Convert.FromBase64String(transactionString))}"); + var transactionJson = JsonSerializer.Deserialize(Convert.FromBase64String(transactionString)); + var transaction = JsonSerializer.Deserialize(transactionJson.RootElement.GetProperty("data").GetRawText()); + var state = await daprClient.GetStateEntryAsync(StoreName, transaction.Id); + state.Value ??= new Account() { Id = transaction.Id, }; + logger.LogInformation("Id is {0}, the amount to be deposited is {1}", transaction.Id, transaction.Amount); + + if (transaction.Amount < 0m) + { + return BadRequest(new { statusCode = 400, message = "bad request" }); + } + + state.Value.Balance += transaction.Amount; + logger.LogInformation("Balance is {0}", state.Value.Balance); + await state.SaveAsync(); + return state.Value; + } + /// /// Method for returning a BadRequest result which will cause Dapr sidecar to throw an RpcException [HttpPost("throwException")] diff --git a/examples/AspNetCore/ControllerSample/Properties/launchSettings.json b/examples/AspNetCore/ControllerSample/Properties/launchSettings.json index 26dd4495b..a761a55e6 100644 --- a/examples/AspNetCore/ControllerSample/Properties/launchSettings.json +++ b/examples/AspNetCore/ControllerSample/Properties/launchSettings.json @@ -1,12 +1,4 @@ { - "iisSettings": { - "windowsAuthentication": false, - "anonymousAuthentication": true, - "iisExpress": { - "applicationUrl": "http://127.0.0.1:5000/", - "sslPort": 0 - } - }, "profiles": { "IIS Express": { "commandName": "IISExpress", @@ -19,7 +11,6 @@ }, "ControllerSample": { "commandName": "Project", - "launchBrowser": true, "environmentVariables": { "ASPNETCORE_ENVIRONMENT": "Development", "CUSTOM_PUBSUB": "custom-pubsub", @@ -27,5 +18,13 @@ }, "applicationUrl": "http://localhost:5000/" } + }, + "iisSettings": { + "windowsAuthentication": false, + "anonymousAuthentication": true, + "iisExpress": { + "applicationUrl": "http://localhost:54197/", + "sslPort": 44385 + } } } \ No newline at end of file diff --git a/examples/AspNetCore/ControllerSample/Startup.cs b/examples/AspNetCore/ControllerSample/Startup.cs index d6e9cc222..11b81d8b3 100644 --- a/examples/AspNetCore/ControllerSample/Startup.cs +++ b/examples/AspNetCore/ControllerSample/Startup.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -13,7 +13,6 @@ namespace ControllerSample { - using System.Text.Json; using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Hosting; using Microsoft.Extensions.Configuration; diff --git a/examples/Client/PublishSubscribe/Example.cs b/examples/Client/PublishSubscribe/Example.cs index d2c07bfeb..ffc168c0e 100644 --- a/examples/Client/PublishSubscribe/Example.cs +++ b/examples/Client/PublishSubscribe/Example.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -16,8 +16,10 @@ namespace Samples.Client { - public abstract class Example + public abstract class Example { + protected static readonly string pubsubName = "pubsub"; + public abstract string DisplayName { get; } public abstract Task RunAsync(CancellationToken cancellationToken); diff --git a/examples/Client/PublishSubscribe/Program.cs b/examples/Client/PublishSubscribe/Program.cs index 9b4ed9d90..3700535a5 100644 --- a/examples/Client/PublishSubscribe/Program.cs +++ b/examples/Client/PublishSubscribe/Program.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -22,6 +22,7 @@ class Program private static readonly Example[] Examples = new Example[] { new PublishEventExample(), + new PublishBytesExample(), }; static async Task Main(string[] args) diff --git a/examples/Client/PublishSubscribe/PublishBytesExample.cs b/examples/Client/PublishSubscribe/PublishBytesExample.cs new file mode 100644 index 000000000..8b2d3e509 --- /dev/null +++ b/examples/Client/PublishSubscribe/PublishBytesExample.cs @@ -0,0 +1,26 @@ +using System; +using System.Collections.Generic; +using System.Net.Mime; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; +using Dapr.Client; + +namespace Samples.Client +{ + public class PublishBytesExample : Example + { + public override string DisplayName => "Publish Bytes"; + + public async override Task RunAsync(CancellationToken cancellationToken) + { + using var client = new DaprClientBuilder().Build(); + + var transaction = new { Id = "17", Amount = 30m }; + var content = JsonSerializer.SerializeToUtf8Bytes(transaction); + + await client.PublishByteEventAsync(pubsubName, "deposit", content.AsMemory(), MediaTypeNames.Application.Json, new Dictionary { }, cancellationToken); + Console.WriteLine("Published deposit event!"); + } + } +} diff --git a/examples/Client/PublishSubscribe/PublishEventExample.cs b/examples/Client/PublishSubscribe/PublishEventExample.cs index 5215a7eea..2153639e5 100644 --- a/examples/Client/PublishSubscribe/PublishEventExample.cs +++ b/examples/Client/PublishSubscribe/PublishEventExample.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -20,8 +20,6 @@ namespace Samples.Client { public class PublishEventExample : Example { - private static readonly string pubsubName = "pubsub"; - public override string DisplayName => "Publishing Events"; public override async Task RunAsync(CancellationToken cancellationToken) @@ -32,11 +30,5 @@ public override async Task RunAsync(CancellationToken cancellationToken) await client.PublishEventAsync(pubsubName, "deposit", eventData, cancellationToken); Console.WriteLine("Published deposit event!"); } - - private class Widget - { - public string? Size { get; set; } - public string? Color { get; set; } - } } } diff --git a/src/Dapr.Client/DaprClient.cs b/src/Dapr.Client/DaprClient.cs index 31d397545..60df4255c 100644 --- a/src/Dapr.Client/DaprClient.cs +++ b/src/Dapr.Client/DaprClient.cs @@ -210,6 +210,24 @@ public abstract Task PublishEventAsync( Dictionary metadata, CancellationToken cancellationToken = default); + /// + /// Publishes an event to the specified topic. + /// + /// The name of the pubsub component to use. + /// The name of the topic the request should be published to. + /// The raw byte payload to inlcude in the message. + /// The content type of the given bytes, defaults to application/json. + /// A collection of metadata key-value pairs that will be provided to the pubsub. The valid metadata keys and values are determined by the type of binding used. + /// A that can be used to cancel the operation. + /// A that will complete when the operation has completed. + public abstract Task PublishByteEventAsync( + string pubsubName, + string topicName, + ReadOnlyMemory data, + string dataContentType = Constants.ContentTypeApplicationJson, + Dictionary metadata = default, + CancellationToken cancellationToken = default); + /// /// Invokes an output binding. /// @@ -308,7 +326,7 @@ public HttpRequestMessage CreateInvokeMethodRequest(string appId, stri /// The data that will be JSON serialized and provided as the request body. /// An for use with SendInvokeMethodRequestAsync. public abstract HttpRequestMessage CreateInvokeMethodRequest(HttpMethod httpMethod, string appId, string methodName, TRequest data); - + /// /// Perform health-check of Dapr sidecar. Return 'true' if sidecar is healthy. Otherwise 'false'. /// CheckHealthAsync handle and will return 'false' if error will occur on transport level @@ -340,7 +358,7 @@ public HttpRequestMessage CreateInvokeMethodRequest(string appId, stri /// A that can be used to cancel the operation. /// A that will return when the operation has completed. public abstract Task ShutdownSidecarAsync(CancellationToken cancellationToken = default); - + /// /// Calls the sidecar's metadata endpoint which returns information including: /// @@ -679,7 +697,7 @@ public abstract Task InvokeMethodGrpcAsync( /// A that can be used to cancel the operation. /// A that will return the list of values when the operation has completed. public abstract Task> GetBulkStateAsync(string storeName, IReadOnlyList keys, int? parallelism, IReadOnlyDictionary metadata = default, CancellationToken cancellationToken = default); - + /// /// Saves a list of to the Dapr state store. /// diff --git a/src/Dapr.Client/DaprClientGrpc.cs b/src/Dapr.Client/DaprClientGrpc.cs index e292f0726..50f4f799a 100644 --- a/src/Dapr.Client/DaprClientGrpc.cs +++ b/src/Dapr.Client/DaprClientGrpc.cs @@ -17,15 +17,15 @@ namespace Dapr.Client using System.Collections.Generic; using System.Linq; using System.Net.Http; + using System.Net.Http.Json; using System.Text.Json; using System.Threading; using System.Threading.Tasks; - using Grpc.Core; using Google.Protobuf; - using Autogenerated = Dapr.Client.Autogen.Grpc.v1; - using System.Net.Http.Json; using Google.Protobuf.WellKnownTypes; + using Grpc.Core; using Grpc.Net.Client; + using Autogenerated = Dapr.Client.Autogen.Grpc.v1; /// /// A client for interacting with the Dapr endpoints. @@ -122,6 +122,19 @@ public override Task PublishEventAsync( return MakePublishRequest(pubsubName, topicName, null, metadata, null, cancellationToken); } + public override Task PublishByteEventAsync( + string pubsubName, + string topicName, + ReadOnlyMemory data, + string dataContentType = Constants.ContentTypeApplicationJson, + Dictionary metadata = default, + CancellationToken cancellationToken = default) + { + ArgumentVerifier.ThrowIfNullOrEmpty(pubsubName, nameof(pubsubName)); + ArgumentVerifier.ThrowIfNullOrEmpty(topicName, nameof(topicName)); + return MakePublishRequest(pubsubName, topicName, ByteString.CopyFrom(data.Span), metadata, dataContentType, cancellationToken); + } + private async Task MakePublishRequest( string pubsubName, string topicName, @@ -140,7 +153,7 @@ private async Task MakePublishRequest( { envelope.Data = content; envelope.DataContentType = dataContentType ?? Constants.ContentTypeApplicationJson; - } + } if (metadata != null) { @@ -607,7 +620,7 @@ public override async Task GetStateAsync( throw new DaprException("State operation failed: the state payload could not be deserialized. See InnerException for details.", ex); } } - + /// public override async Task SaveBulkStateAsync(string storeName, IReadOnlyList> items, CancellationToken cancellationToken = default) { @@ -647,7 +660,7 @@ public override async Task SaveBulkStateAsync(string storeName, IReadOnl { stateItem.Options = ToAutoGeneratedStateOptions(item.StateOptions); } - + if (item.Value != null) { stateItem.Value = TypeConverters.ToJsonByteString(item.Value, this.jsonSerializerOptions); diff --git a/test/Dapr.Client.Test/PublishEventApiTest.cs b/test/Dapr.Client.Test/PublishEventApiTest.cs index b5a3be815..d8caf63d1 100644 --- a/test/Dapr.Client.Test/PublishEventApiTest.cs +++ b/test/Dapr.Client.Test/PublishEventApiTest.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -21,7 +21,6 @@ namespace Dapr.Client.Test using Dapr.Client.Autogen.Grpc.v1; using FluentAssertions; using Grpc.Core; - using Grpc.Net.Client; using Moq; using Xunit; @@ -223,13 +222,39 @@ public async Task PublishEventAsync_WrapsRpcException() .Setup(m => m.PublishEventAsync(It.IsAny(), It.IsAny())) .Throws(rpcException); - var ex = await Assert.ThrowsAsync(async () => + var ex = await Assert.ThrowsAsync(async () => { await client.DaprClient.PublishEventAsync("test", "test"); }); Assert.Same(rpcException, ex.InnerException); } + [Fact] + public async Task PublishEventAsync_CanPublishWithRawData() + { + await using var client = TestClient.CreateForDaprClient(); + + var publishData = new PublishData() { PublishObjectParameter = "testparam" }; + var publishBytes = JsonSerializer.SerializeToUtf8Bytes(publishData); + var request = await client.CaptureGrpcRequestAsync(async daprClient => + { + await daprClient.PublishByteEventAsync(TestPubsubName, "test", publishBytes.AsMemory()); + }); + + request.Dismiss(); + + var envelope = await request.GetRequestEnvelopeAsync(); + var jsonFromRequest = envelope.Data.ToStringUtf8(); + + envelope.DataContentType.Should().Be("application/json"); + envelope.PubsubName.Should().Be(TestPubsubName); + envelope.Topic.Should().Be("test"); + jsonFromRequest.Should().Be(JsonSerializer.Serialize(publishData)); + // The default serializer forces camel case, so this should be different from our serialization above. + jsonFromRequest.Should().NotBe(JsonSerializer.Serialize(publishBytes, client.InnerClient.JsonSerializerOptions)); + envelope.Metadata.Count.Should().Be(0); + } + private class PublishData { public string PublishObjectParameter { get; set; }