Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
namespace ControllerSample.Controllers
{
using System;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
using Dapr;
using Dapr.Client;
Expand Down Expand Up @@ -156,6 +158,36 @@ public async Task<ActionResult<Account>> WithdrawV2(TransactionV2 transaction, [
return state.Value;
}

/// <summary>
/// Method for depositing to account as specified in transaction via a raw message.
/// </summary>
/// <param name="transaction">Transaction info.</param>
/// <param name="daprClient">State client to interact with Dapr runtime.</param>
/// <returns>A <see cref="Task{TResult}"/> representing the result of the asynchronous operation.</returns>
/// "pubsub", the first parameter into the Topic attribute, is name of the default pub/sub configured by the Dapr CLI.
[Topic("pubsub", "rawDeposit", true)]
[HttpPost("rawDeposit")]
public async Task<ActionResult<Account>> RawDeposit([FromBody] JsonDocument rawTransaction, [FromServices] DaprClient daprClient)
{
var transactionString = rawTransaction.RootElement.GetProperty("data_base64").GetString();
logger.LogInformation($"Enter deposit: {transactionString} - {Encoding.UTF8.GetString(Convert.FromBase64String(transactionString))}");
var transactionJson = JsonSerializer.Deserialize<JsonDocument>(Convert.FromBase64String(transactionString));
var transaction = JsonSerializer.Deserialize<Transaction>(transactionJson.RootElement.GetProperty("data").GetRawText());
var state = await daprClient.GetStateEntryAsync<Account>(StoreName, transaction.Id);
state.Value ??= new Account() { Id = transaction.Id, };
logger.LogInformation("Id is {0}, the amount to be deposited is {1}", transaction.Id, transaction.Amount);

if (transaction.Amount < 0m)
{
return BadRequest(new { statusCode = 400, message = "bad request" });
}

state.Value.Balance += transaction.Amount;
logger.LogInformation("Balance is {0}", state.Value.Balance);
await state.SaveAsync();
return state.Value;
}

/// <summary>
/// Method for returning a BadRequest result which will cause Dapr sidecar to throw an RpcException
[HttpPost("throwException")]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,4 @@
{
"iisSettings": {
"windowsAuthentication": false,
"anonymousAuthentication": true,
"iisExpress": {
"applicationUrl": "http://127.0.0.1:5000/",
"sslPort": 0
}
},
"profiles": {
"IIS Express": {
"commandName": "IISExpress",
Expand All @@ -19,13 +11,20 @@
},
"ControllerSample": {
"commandName": "Project",
"launchBrowser": true,
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development",
"CUSTOM_PUBSUB": "custom-pubsub",
"CUSTOM_TOPIC": "custom-topic"
},
"applicationUrl": "http://localhost:5000/"
}
},
"iisSettings": {
"windowsAuthentication": false,
"anonymousAuthentication": true,
"iisExpress": {
"applicationUrl": "http://localhost:54197/",
"sslPort": 44385
}
}
}
3 changes: 1 addition & 2 deletions examples/AspNetCore/ControllerSample/Startup.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// ------------------------------------------------------------------------
// ------------------------------------------------------------------------
// Copyright 2021 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -13,7 +13,6 @@

namespace ControllerSample
{
using System.Text.Json;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
Expand Down
6 changes: 4 additions & 2 deletions examples/Client/PublishSubscribe/Example.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// ------------------------------------------------------------------------
// ------------------------------------------------------------------------
// Copyright 2021 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -16,8 +16,10 @@

namespace Samples.Client
{
public abstract class Example
public abstract class Example
{
protected static readonly string pubsubName = "pubsub";

public abstract string DisplayName { get; }

public abstract Task RunAsync(CancellationToken cancellationToken);
Expand Down
3 changes: 2 additions & 1 deletion examples/Client/PublishSubscribe/Program.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// ------------------------------------------------------------------------
// ------------------------------------------------------------------------
// Copyright 2021 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -22,6 +22,7 @@ class Program
private static readonly Example[] Examples = new Example[]
{
new PublishEventExample(),
new PublishBytesExample(),
};

static async Task<int> Main(string[] args)
Expand Down
26 changes: 26 additions & 0 deletions examples/Client/PublishSubscribe/PublishBytesExample.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
using System;
using System.Collections.Generic;
using System.Net.Mime;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Dapr.Client;

namespace Samples.Client
{
public class PublishBytesExample : Example
{
public override string DisplayName => "Publish Bytes";

public async override Task RunAsync(CancellationToken cancellationToken)
{
using var client = new DaprClientBuilder().Build();

var transaction = new { Id = "17", Amount = 30m };
var content = JsonSerializer.SerializeToUtf8Bytes(transaction);

await client.PublishByteEventAsync(pubsubName, "deposit", content.AsMemory(), MediaTypeNames.Application.Json, new Dictionary<string, string> { }, cancellationToken);
Console.WriteLine("Published deposit event!");
}
}
}
10 changes: 1 addition & 9 deletions examples/Client/PublishSubscribe/PublishEventExample.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// ------------------------------------------------------------------------
// ------------------------------------------------------------------------
// Copyright 2021 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -20,8 +20,6 @@ namespace Samples.Client
{
public class PublishEventExample : Example
{
private static readonly string pubsubName = "pubsub";

public override string DisplayName => "Publishing Events";

public override async Task RunAsync(CancellationToken cancellationToken)
Expand All @@ -32,11 +30,5 @@ public override async Task RunAsync(CancellationToken cancellationToken)
await client.PublishEventAsync(pubsubName, "deposit", eventData, cancellationToken);
Console.WriteLine("Published deposit event!");
}

private class Widget
{
public string? Size { get; set; }
public string? Color { get; set; }
}
}
}
24 changes: 21 additions & 3 deletions src/Dapr.Client/DaprClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,24 @@ public abstract Task PublishEventAsync(
Dictionary<string, string> metadata,
CancellationToken cancellationToken = default);

/// <summary>
/// Publishes an event to the specified topic.
/// </summary>
/// <param name="pubsubName">The name of the pubsub component to use.</param>
/// <param name="topicName">The name of the topic the request should be published to.</param>
/// <param name="data">The raw byte payload to inlcude in the message.</param>
/// <param name="dataContentType">The content type of the given bytes, defaults to application/json.</param>
/// <param name="metadata">A collection of metadata key-value pairs that will be provided to the pubsub. The valid metadata keys and values are determined by the type of binding used.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken" /> that can be used to cancel the operation.</param>
/// <returns>A <see cref="Task" /> that will complete when the operation has completed.</returns>
public abstract Task PublishByteEventAsync(
string pubsubName,
string topicName,
ReadOnlyMemory<byte> data,
string dataContentType = Constants.ContentTypeApplicationJson,
Dictionary<string, string> metadata = default,
CancellationToken cancellationToken = default);

/// <summary>
/// Invokes an output binding.
/// </summary>
Expand Down Expand Up @@ -308,7 +326,7 @@ public HttpRequestMessage CreateInvokeMethodRequest<TRequest>(string appId, stri
/// <param name="data">The data that will be JSON serialized and provided as the request body.</param>
/// <returns>An <see cref="HttpRequestMessage" /> for use with <c>SendInvokeMethodRequestAsync</c>.</returns>
public abstract HttpRequestMessage CreateInvokeMethodRequest<TRequest>(HttpMethod httpMethod, string appId, string methodName, TRequest data);

/// <summary>
/// Perform health-check of Dapr sidecar. Return 'true' if sidecar is healthy. Otherwise 'false'.
/// CheckHealthAsync handle <see cref="HttpRequestException"/> and will return 'false' if error will occur on transport level
Expand Down Expand Up @@ -340,7 +358,7 @@ public HttpRequestMessage CreateInvokeMethodRequest<TRequest>(string appId, stri
/// <param name="cancellationToken">A <see cref="CancellationToken" /> that can be used to cancel the operation.</param>
/// <returns>A <see cref="Task" /> that will return when the operation has completed.</returns>
public abstract Task ShutdownSidecarAsync(CancellationToken cancellationToken = default);

/// <summary>
/// Calls the sidecar's metadata endpoint which returns information including:
/// <list type="bullet">
Expand Down Expand Up @@ -679,7 +697,7 @@ public abstract Task<TResponse> InvokeMethodGrpcAsync<TRequest, TResponse>(
/// <param name="cancellationToken">A <see cref="CancellationToken" /> that can be used to cancel the operation.</param>
/// <returns>A <see cref="Task{IReadOnlyList}" /> that will return the list of values when the operation has completed.</returns>
public abstract Task<IReadOnlyList<BulkStateItem>> GetBulkStateAsync(string storeName, IReadOnlyList<string> keys, int? parallelism, IReadOnlyDictionary<string, string> metadata = default, CancellationToken cancellationToken = default);

/// <summary>
/// Saves a list of <paramref name="items" /> to the Dapr state store.
/// </summary>
Expand Down
25 changes: 19 additions & 6 deletions src/Dapr.Client/DaprClientGrpc.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ namespace Dapr.Client
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Net.Http.Json;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;
using Google.Protobuf;
using Autogenerated = Dapr.Client.Autogen.Grpc.v1;
using System.Net.Http.Json;
using Google.Protobuf.WellKnownTypes;
using Grpc.Core;
using Grpc.Net.Client;
using Autogenerated = Dapr.Client.Autogen.Grpc.v1;

/// <summary>
/// A client for interacting with the Dapr endpoints.
Expand Down Expand Up @@ -122,6 +122,19 @@ public override Task PublishEventAsync(
return MakePublishRequest(pubsubName, topicName, null, metadata, null, cancellationToken);
}

public override Task PublishByteEventAsync(
string pubsubName,
string topicName,
ReadOnlyMemory<byte> data,
string dataContentType = Constants.ContentTypeApplicationJson,
Dictionary<string, string> metadata = default,
CancellationToken cancellationToken = default)
{
ArgumentVerifier.ThrowIfNullOrEmpty(pubsubName, nameof(pubsubName));
ArgumentVerifier.ThrowIfNullOrEmpty(topicName, nameof(topicName));
return MakePublishRequest(pubsubName, topicName, ByteString.CopyFrom(data.Span), metadata, dataContentType, cancellationToken);
}

private async Task MakePublishRequest(
string pubsubName,
string topicName,
Expand All @@ -140,7 +153,7 @@ private async Task MakePublishRequest(
{
envelope.Data = content;
envelope.DataContentType = dataContentType ?? Constants.ContentTypeApplicationJson;
}
}

if (metadata != null)
{
Expand Down Expand Up @@ -607,7 +620,7 @@ public override async Task<TValue> GetStateAsync<TValue>(
throw new DaprException("State operation failed: the state payload could not be deserialized. See InnerException for details.", ex);
}
}

/// <inheritdoc />
public override async Task SaveBulkStateAsync<TValue>(string storeName, IReadOnlyList<SaveStateItem<TValue>> items, CancellationToken cancellationToken = default)
{
Expand Down Expand Up @@ -647,7 +660,7 @@ public override async Task SaveBulkStateAsync<TValue>(string storeName, IReadOnl
{
stateItem.Options = ToAutoGeneratedStateOptions(item.StateOptions);
}

if (item.Value != null)
{
stateItem.Value = TypeConverters.ToJsonByteString(item.Value, this.jsonSerializerOptions);
Expand Down
31 changes: 28 additions & 3 deletions test/Dapr.Client.Test/PublishEventApiTest.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// ------------------------------------------------------------------------
// ------------------------------------------------------------------------
// Copyright 2021 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -21,7 +21,6 @@ namespace Dapr.Client.Test
using Dapr.Client.Autogen.Grpc.v1;
using FluentAssertions;
using Grpc.Core;
using Grpc.Net.Client;
using Moq;
using Xunit;

Expand Down Expand Up @@ -223,13 +222,39 @@ public async Task PublishEventAsync_WrapsRpcException()
.Setup(m => m.PublishEventAsync(It.IsAny<Autogen.Grpc.v1.PublishEventRequest>(), It.IsAny<CallOptions>()))
.Throws(rpcException);

var ex = await Assert.ThrowsAsync<DaprException>(async () =>
var ex = await Assert.ThrowsAsync<DaprException>(async () =>
{
await client.DaprClient.PublishEventAsync("test", "test");
});
Assert.Same(rpcException, ex.InnerException);
}

[Fact]
public async Task PublishEventAsync_CanPublishWithRawData()
{
await using var client = TestClient.CreateForDaprClient();

var publishData = new PublishData() { PublishObjectParameter = "testparam" };
var publishBytes = JsonSerializer.SerializeToUtf8Bytes(publishData);
var request = await client.CaptureGrpcRequestAsync(async daprClient =>
{
await daprClient.PublishByteEventAsync(TestPubsubName, "test", publishBytes.AsMemory());
});

request.Dismiss();

var envelope = await request.GetRequestEnvelopeAsync<PublishEventRequest>();
var jsonFromRequest = envelope.Data.ToStringUtf8();

envelope.DataContentType.Should().Be("application/json");
envelope.PubsubName.Should().Be(TestPubsubName);
envelope.Topic.Should().Be("test");
jsonFromRequest.Should().Be(JsonSerializer.Serialize(publishData));
// The default serializer forces camel case, so this should be different from our serialization above.
jsonFromRequest.Should().NotBe(JsonSerializer.Serialize(publishBytes, client.InnerClient.JsonSerializerOptions));
envelope.Metadata.Count.Should().Be(0);
}

private class PublishData
{
public string PublishObjectParameter { get; set; }
Expand Down