diff --git a/extensions/Worker.Extensions.EventHubs/release_notes.md b/extensions/Worker.Extensions.EventHubs/release_notes.md index 087ebc6d0..99426b460 100644 --- a/extensions/Worker.Extensions.EventHubs/release_notes.md +++ b/extensions/Worker.Extensions.EventHubs/release_notes.md @@ -4,6 +4,6 @@ - My change description (#PR/#issue) --> -### Microsoft.Azure.Functions.Worker.Extensions.EventHubs +### Microsoft.Azure.Functions.Worker.Extensions.EventHubs 5.4.0-preview1 -- Add `BindingCapabilities` attribute to EventHubTrigger to express function-level retry capabilities. (#1457) \ No newline at end of file +- Add support for binding to `EventData`(#1609) diff --git a/extensions/Worker.Extensions.EventHubs/src/Constants.cs b/extensions/Worker.Extensions.EventHubs/src/Constants.cs new file mode 100644 index 000000000..070066865 --- /dev/null +++ b/extensions/Worker.Extensions.EventHubs/src/Constants.cs @@ -0,0 +1,12 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +namespace Microsoft.Azure.Functions.Worker.Extensions.EventHubs +{ + internal static class Constants + { + internal const string BinaryContentType = "application/octet-stream"; + + internal const string BindingSource = "AzureEventHubsEventData"; + } +} \ No newline at end of file diff --git a/extensions/Worker.Extensions.EventHubs/src/EventDataConverter.cs b/extensions/Worker.Extensions.EventHubs/src/EventDataConverter.cs new file mode 100644 index 000000000..71cb3e704 --- /dev/null +++ b/extensions/Worker.Extensions.EventHubs/src/EventDataConverter.cs @@ -0,0 +1,58 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Linq; +using System.Threading.Tasks; +using Azure.Core.Amqp; +using Microsoft.Azure.Functions.Worker.Converters; +using Microsoft.Azure.Functions.Worker.Core; +using Microsoft.Azure.Functions.Worker.Extensions.Abstractions; +using Azure.Messaging.EventHubs; +using Microsoft.Azure.Functions.Worker.Extensions.EventHubs; + +namespace Microsoft.Azure.Functions.Worker +{ + [SupportsDeferredBinding] + [SupportedConverterType(typeof(EventData))] + [SupportedConverterType(typeof(EventData[]))] + internal class EventDataConverter : IInputConverter + { + public ValueTask ConvertAsync(ConverterContext context) + { + try + { + ConversionResult result = context?.Source switch + { + ModelBindingData binding => ConversionResult.Success(ConvertToEventData(binding)), + // Only array collections are currently supported, which matches the behavior of the in-proc extension. + CollectionModelBindingData collection => ConversionResult.Success(collection.ModelBindingDataArray + .Select(ConvertToEventData).ToArray()), + _ => ConversionResult.Unhandled() + }; + return new ValueTask(result); + } + catch (Exception exception) + { + return new ValueTask(ConversionResult.Failed(exception)); + } + } + + private EventData ConvertToEventData(ModelBindingData binding) + { + if (binding?.Source is not Constants.BindingSource) + { + throw new InvalidOperationException( + $"Unexpected binding source. Only '{Constants.BindingSource}' is supported."); + } + + if (binding.ContentType != Constants.BinaryContentType) + { + throw new InvalidOperationException( + $"Unexpected content-type. Only '{Constants.BinaryContentType}' is supported."); + } + + return new EventData(AmqpAnnotatedMessage.FromBytes(binding.Content)); + } + } +} \ No newline at end of file diff --git a/extensions/Worker.Extensions.EventHubs/src/EventHubTriggerAttribute.cs b/extensions/Worker.Extensions.EventHubs/src/EventHubTriggerAttribute.cs index 9c8eff632..e957cdd99 100644 --- a/extensions/Worker.Extensions.EventHubs/src/EventHubTriggerAttribute.cs +++ b/extensions/Worker.Extensions.EventHubs/src/EventHubTriggerAttribute.cs @@ -1,7 +1,8 @@ // Copyright (c) .NET Foundation. All rights reserved. // Licensed under the MIT License. See License.txt in the project root for license information. -using Microsoft.Azure.Functions.Worker.Extensions.Abstractions; +using Microsoft.Azure.Functions.Worker.Converters; +using Microsoft.Azure.Functions.Worker.Extensions.Abstractions; namespace Microsoft.Azure.Functions.Worker { @@ -9,6 +10,8 @@ namespace Microsoft.Azure.Functions.Worker /// Attribute used to mark a function that should be triggered by Event Hubs messages. /// [BindingCapabilities(KnownBindingCapabilities.FunctionLevelRetry)] + [AllowConverterFallback(true)] + [InputConverter(typeof(EventDataConverter))] public sealed class EventHubTriggerAttribute : TriggerBindingAttribute, ISupportCardinality { // Batch by default diff --git a/extensions/Worker.Extensions.EventHubs/src/Properties/AssemblyInfo.cs b/extensions/Worker.Extensions.EventHubs/src/Properties/AssemblyInfo.cs index b29c59370..e090a8273 100644 --- a/extensions/Worker.Extensions.EventHubs/src/Properties/AssemblyInfo.cs +++ b/extensions/Worker.Extensions.EventHubs/src/Properties/AssemblyInfo.cs @@ -1,6 +1,8 @@ // Copyright (c) .NET Foundation. All rights reserved. // Licensed under the MIT License. See License.txt in the project root for license information. +using System.Runtime.CompilerServices; using Microsoft.Azure.Functions.Worker.Extensions.Abstractions; -[assembly: ExtensionInformation("Microsoft.Azure.WebJobs.Extensions.EventHubs", "5.3.0")] +[assembly: ExtensionInformation("Microsoft.Azure.WebJobs.Extensions.EventHubs", "5.4.0")] +[assembly: InternalsVisibleTo("Microsoft.Azure.Functions.WorkerExtension.Tests, PublicKey=00240000048000009400000006020000002400005253413100040000010001005148be37ac1d9f58bd40a2e472c9d380d635b6048278f7d47480b08c928858f0f7fe17a6e4ce98da0e7a7f0b8c308aecd9e9b02d7e9680a5b5b75ac7773cec096fbbc64aebd429e77cb5f89a569a79b28e9c76426783f624b6b70327eb37341eb498a2c3918af97c4860db6cdca4732787150841e395a29cfacb959c1fd971c1")] diff --git a/extensions/Worker.Extensions.EventHubs/src/Worker.Extensions.EventHubs.csproj b/extensions/Worker.Extensions.EventHubs/src/Worker.Extensions.EventHubs.csproj index e41060b28..61973e733 100644 --- a/extensions/Worker.Extensions.EventHubs/src/Worker.Extensions.EventHubs.csproj +++ b/extensions/Worker.Extensions.EventHubs/src/Worker.Extensions.EventHubs.csproj @@ -6,14 +6,21 @@ Azure Event Hubs extensions for .NET isolated functions - 5.3.0 + 5.4.0 + -preview1 + + + + + + \ No newline at end of file diff --git a/extensions/Worker.Extensions.ServiceBus/release_notes.md b/extensions/Worker.Extensions.ServiceBus/release_notes.md index aeba67dd4..083c3f78b 100644 --- a/extensions/Worker.Extensions.ServiceBus/release_notes.md +++ b/extensions/Worker.Extensions.ServiceBus/release_notes.md @@ -4,6 +4,7 @@ - My change description (#PR/#issue) --> -### Microsoft.Azure.Functions.Worker.Extensions.ServiceBus +### Microsoft.Azure.Functions.Worker.Extensions.ServiceBus 5.10.0-preview3 -- +- Make Constants class internal (#1609) +- Return ConversionResult.Failed when the wrong content-type or binding source is used (#1609) diff --git a/extensions/Worker.Extensions.ServiceBus/src/Constants.cs b/extensions/Worker.Extensions.ServiceBus/src/Constants.cs index 6912e94f6..8c63d4124 100644 --- a/extensions/Worker.Extensions.ServiceBus/src/Constants.cs +++ b/extensions/Worker.Extensions.ServiceBus/src/Constants.cs @@ -1,11 +1,12 @@ // Copyright (c) .NET Foundation. All rights reserved. // Licensed under the MIT License. See License.txt in the project root for license information. - namespace Microsoft.Azure.Functions.Worker.Extensions.ServiceBus { - public static class Constants + internal static class Constants { internal const string BinaryContentType = "application/octet-stream"; + + internal const string BindingSource = "AzureServiceBusReceivedMessage"; } } \ No newline at end of file diff --git a/extensions/Worker.Extensions.ServiceBus/src/ServiceBusExtensionStartup.cs b/extensions/Worker.Extensions.ServiceBus/src/ServiceBusExtensionStartup.cs deleted file mode 100644 index eab4334b3..000000000 --- a/extensions/Worker.Extensions.ServiceBus/src/ServiceBusExtensionStartup.cs +++ /dev/null @@ -1,25 +0,0 @@ -// Copyright (c) .NET Foundation. All rights reserved. -// Licensed under the MIT License. See License.txt in the project root for license information. - -using System; -using Microsoft.Azure.Functions.Worker; -using Microsoft.Azure.Functions.Worker.Core; -using Microsoft.Extensions.Azure; - -[assembly: WorkerExtensionStartup(typeof(ServiceBusExtensionStartup))] - -namespace Microsoft.Azure.Functions.Worker -{ - public class ServiceBusExtensionStartup : WorkerExtensionStartup - { - public override void Configure(IFunctionsWorkerApplicationBuilder applicationBuilder) - { - if (applicationBuilder == null) - { - throw new ArgumentNullException(nameof(applicationBuilder)); - } - - applicationBuilder.Services.AddAzureClientsCore(); // Adds AzureComponentFactory - } - } -} \ No newline at end of file diff --git a/extensions/Worker.Extensions.ServiceBus/src/ServiceBusReceivedMessageConverter.cs b/extensions/Worker.Extensions.ServiceBus/src/ServiceBusReceivedMessageConverter.cs index 19e257aa5..00fae0e5c 100644 --- a/extensions/Worker.Extensions.ServiceBus/src/ServiceBusReceivedMessageConverter.cs +++ b/extensions/Worker.Extensions.ServiceBus/src/ServiceBusReceivedMessageConverter.cs @@ -13,7 +13,6 @@ namespace Microsoft.Azure.Functions.Worker { - [SupportsDeferredBinding] [SupportedConverterType(typeof(ServiceBusReceivedMessage))] [SupportedConverterType(typeof(ServiceBusReceivedMessage[]))] @@ -21,21 +20,31 @@ internal class ServiceBusReceivedMessageConverter : IInputConverter { public ValueTask ConvertAsync(ConverterContext context) { - ConversionResult result = context?.Source switch + try + { + ConversionResult result = context?.Source switch + { + ModelBindingData binding => ConversionResult.Success(ConvertToServiceBusReceivedMessage(binding)), + // Only array collections are currently supported, which matches the behavior of the in-proc extension. + CollectionModelBindingData collection => ConversionResult.Success(collection.ModelBindingDataArray + .Select(ConvertToServiceBusReceivedMessage).ToArray()), + _ => ConversionResult.Unhandled() + }; + return new ValueTask(result); + } + catch (Exception exception) { - ModelBindingData binding => ConversionResult.Success(ConvertToServiceBusReceivedMessage(binding)), - // Only array collections are currently supported, which matches the behavior of the in-proc extension. - CollectionModelBindingData collection => ConversionResult.Success(collection.ModelBindingDataArray - .Select(ConvertToServiceBusReceivedMessage).ToArray()), - _ => ConversionResult.Unhandled() - }; - return new ValueTask(result); + return new ValueTask(ConversionResult.Failed(exception)); + } } private ServiceBusReceivedMessage ConvertToServiceBusReceivedMessage(ModelBindingData binding) { - // The lock token is a 16 byte GUID - const int lockTokenLength = 16; + if (binding?.Source is not Constants.BindingSource) + { + throw new InvalidOperationException( + $"Unexpected binding source. Only '{Constants.BindingSource}' is supported."); + } if (binding.ContentType != Constants.BinaryContentType) { @@ -43,6 +52,9 @@ private ServiceBusReceivedMessage ConvertToServiceBusReceivedMessage(ModelBindin $"Unexpected content-type. Only '{Constants.BinaryContentType}' is supported."); } + // The lock token is a 16 byte GUID + const int lockTokenLength = 16; + ReadOnlyMemory bytes = binding.Content.ToMemory(); ReadOnlyMemory lockTokenBytes = bytes.Slice(0, lockTokenLength); ReadOnlyMemory messageBytes = bytes.Slice(lockTokenLength, bytes.Length - lockTokenLength); diff --git a/extensions/Worker.Extensions.ServiceBus/src/Worker.Extensions.ServiceBus.csproj b/extensions/Worker.Extensions.ServiceBus/src/Worker.Extensions.ServiceBus.csproj index 968ba9f47..e0d879ce6 100644 --- a/extensions/Worker.Extensions.ServiceBus/src/Worker.Extensions.ServiceBus.csproj +++ b/extensions/Worker.Extensions.ServiceBus/src/Worker.Extensions.ServiceBus.csproj @@ -7,7 +7,7 @@ 5.10.0 - -preview2 + -preview3 false diff --git a/samples/WorkerBindingSamples/EventHubs/EventDataSamples.cs b/samples/WorkerBindingSamples/EventHubs/EventDataSamples.cs new file mode 100644 index 000000000..f6820aaaf --- /dev/null +++ b/samples/WorkerBindingSamples/EventHubs/EventDataSamples.cs @@ -0,0 +1,68 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using Azure.Messaging.EventHubs; +using Microsoft.Azure.Functions.Worker; +using Microsoft.Extensions.Logging; + +namespace SampleApp +{ + /// + /// Samples demonstrating binding to the type. + /// + public class EventDataSamples + { + private readonly ILogger _logger; + + public EventDataSamples(ILogger logger) + { + _logger = logger; + } + + /// + /// This function demonstrates binding to a single . Note that when doing so, you must also set the + /// property to false as the default value of this property is + /// true. + /// + [Function(nameof(EventDataFunctions))] + public void EventDataFunctions( + [EventHubTrigger("queue", Connection = "EventHubConnection", IsBatched = false)] EventData @event) + { + _logger.LogInformation("Event Body: {body}", @event.Body); + _logger.LogInformation("Event Content-Type: {contentType}", @event.ContentType); + } + + /// + /// This function demonstrates binding to an array of . + /// + [Function(nameof(EventDataBatchFunction))] + public void EventDataBatchFunction( + [EventHubTrigger("queue", Connection = "EventHubConnection")] EventData[] events) + { + foreach (EventData @event in events) + { + _logger.LogInformation("Event Body: {body}", @event.Body); + _logger.LogInformation("Event Content-Type: {contentType}", @event.ContentType); + } + } + + /// + /// This functions demonstrates that it is possible to bind to both the and any of the supported binding contract + /// properties at the same time. If attempting this, the must be the first parameter. There is not + /// much benefit to doing this as all of the binding contract properties are available as properties on the . + /// + [Function(nameof(EventDataWithStringPropertiesFunction))] + public void EventDataWithStringPropertiesFunction( + [EventHubTrigger("queue", Connection = "EventHubConnection")] + EventData @event, string contentType, long offset) + { + // The ContentType property and the contentType parameter are the same. + _logger.LogInformation("Event Content-Type: {contentType}", @event.ContentType); + _logger.LogInformation("Event Content-Type: {contentType}", contentType); + + // Similarly the Offset property and the offset parameter are the same. + _logger.LogInformation("Event offset: {offset}", @event.Offset); + _logger.LogInformation("Event offset: {offset}", offset); + } + } +} \ No newline at end of file diff --git a/samples/WorkerBindingSamples/WorkerBindingSamples.csproj b/samples/WorkerBindingSamples/WorkerBindingSamples.csproj index b804f5429..1f5a2653a 100644 --- a/samples/WorkerBindingSamples/WorkerBindingSamples.csproj +++ b/samples/WorkerBindingSamples/WorkerBindingSamples.csproj @@ -16,6 +16,7 @@ + diff --git a/test/FunctionMetadataGeneratorTests/FunctionMetadataGeneratorTests.cs b/test/FunctionMetadataGeneratorTests/FunctionMetadataGeneratorTests.cs index c0ea88c28..2178bc8cc 100644 --- a/test/FunctionMetadataGeneratorTests/FunctionMetadataGeneratorTests.cs +++ b/test/FunctionMetadataGeneratorTests/FunctionMetadataGeneratorTests.cs @@ -11,6 +11,7 @@ using System.Reflection; using System.Threading.Tasks; using Azure.Data.Tables; +using Azure.Messaging.EventHubs; using Azure.Messaging.ServiceBus; using Azure.Storage.Blobs; using Microsoft.Azure.Functions.Tests; @@ -971,6 +972,59 @@ void ValidateServiceBusBatchTrigger(ExpandoObject b) } } + [Fact] + public void EventHubs_SDKTypeBindings() + { + var generator = new FunctionMetadataGenerator(); + var module = ModuleDefinition.ReadModule(_thisAssembly.Location); + var typeDef = TestUtility.GetTypeDefinition(typeof(SDKTypeBindings_EventHubs)); + var functions = generator.GenerateFunctionMetadata(typeDef); + var extensions = generator.Extensions; + + Assert.Equal(2, functions.Count()); + + AssertDictionary(extensions, new Dictionary + { + { "Microsoft.Azure.WebJobs.Extensions.EventHubs", "5.4.0" }, + }); + + var eventHubTriggerFunction = functions.Single(p => p.Name == nameof(SDKTypeBindings_EventHubs.EventHubTriggerFunction)); + + ValidateFunction(eventHubTriggerFunction, nameof(SDKTypeBindings_EventHubs.EventHubTriggerFunction), GetEntryPoint(nameof(SDKTypeBindings_EventHubs), nameof(SDKTypeBindings_EventHubs.EventHubTriggerFunction)), + ValidateEventHubTrigger); + + var eventHubBatchTriggerFunction = functions.Single(p => p.Name == nameof(SDKTypeBindings_EventHubs.EventHubBatchTriggerFunction)); + + ValidateFunction(eventHubBatchTriggerFunction, nameof(SDKTypeBindings_EventHubs.EventHubBatchTriggerFunction), GetEntryPoint(nameof(SDKTypeBindings_EventHubs), nameof(SDKTypeBindings_EventHubs.EventHubBatchTriggerFunction)), + ValidateEventHubBatchTrigger); + + void ValidateEventHubTrigger(ExpandoObject b) + { + AssertExpandoObject(b, new Dictionary + { + { "Name", "event" }, + { "Type", "eventHubTrigger" }, + { "Direction", "In" }, + { "eventHubName", "hub" }, + { "Cardinality", "One" }, + { "Properties", new Dictionary( ) { { "SupportsDeferredBinding" , "True"} } } + }); + } + + void ValidateEventHubBatchTrigger(ExpandoObject b) + { + AssertExpandoObject(b, new Dictionary + { + { "Name", "events" }, + { "Type", "eventHubTrigger" }, + { "Direction", "In" }, + { "eventHubName", "hub" }, + { "Cardinality", "Many" }, + { "Properties", new Dictionary( ) { { "SupportsDeferredBinding" , "True"} } } + }); + } + } + private class EventHubNotBatched { [Function("EventHubTrigger")] @@ -1213,6 +1267,23 @@ public static void ServiceBusBatchTriggerFunction( } } + private class SDKTypeBindings_EventHubs + { + [Function(nameof(EventHubTriggerFunction))] + public static void EventHubTriggerFunction( + [EventHubTrigger("hub", IsBatched = false)] EventData @event) + { + throw new NotImplementedException(); + } + + [Function(nameof(EventHubBatchTriggerFunction))] + public static void EventHubBatchTriggerFunction( + [EventHubTrigger("hub")] EventData[] events) + { + throw new NotImplementedException(); + } + } + private class ExternalType_Return { public const string FunctionName = "BasicHttpWithExternalTypeReturn"; diff --git a/test/WorkerExtensionTests/EventHubs/EventDataConverterTests.cs b/test/WorkerExtensionTests/EventHubs/EventDataConverterTests.cs new file mode 100644 index 000000000..00b2ecc13 --- /dev/null +++ b/test/WorkerExtensionTests/EventHubs/EventDataConverterTests.cs @@ -0,0 +1,191 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Threading.Tasks; +using Azure.Messaging.EventHubs; +using Google.Protobuf; +using Microsoft.Azure.Functions.Worker; +using Microsoft.Azure.Functions.Worker.Converters; +using Microsoft.Azure.Functions.Worker.Extensions.EventHubs; +using Microsoft.Azure.Functions.Worker.Grpc.Messages; +using Microsoft.Azure.Functions.Worker.Tests.Converters; +using Xunit; + +namespace Microsoft.Azure.Functions.WorkerExtension.Tests +{ + public class EventDataConverterTests + { + [Fact] + public async Task ConvertAsync_ReturnsSuccess() + { + var eventData = CreateEventData(); + + var data = new GrpcModelBindingData(new ModelBindingData() + { + Version = "1.0", + Source = "AzureEventHubsEventData", + Content = ByteString.CopyFrom(ConvertEventDataToBinaryData(eventData)), + ContentType = Constants.BinaryContentType + }); + var context = new TestConverterContext(typeof(string), data); + var converter = new EventDataConverter(); + var result = await converter.ConvertAsync(context); + + Assert.Equal(ConversionStatus.Succeeded, result.Status); + var output = result.Value as EventData; + Assert.NotNull(output); + AssertEventData(output); + } + + [Fact] + public async Task ConvertAsync_Batch_ReturnsSuccess() + { + var message = CreateEventData(); + + var data = new ModelBindingData + { + Version = "1.0", + Source = "AzureEventHubsEventData", + Content = ByteString.CopyFrom(ConvertEventDataToBinaryData(message)), + ContentType = Constants.BinaryContentType + }; + + var array = new CollectionModelBindingData(); + array.ModelBindingData.Add(data); + array.ModelBindingData.Add(data); + + var context = new TestConverterContext(typeof(string), new GrpcCollectionModelBindingData(array)); + var converter = new EventDataConverter(); + var result = await converter.ConvertAsync(context); + + Assert.Equal(ConversionStatus.Succeeded, result.Status); + var output = result.Value as EventData[]; + Assert.NotNull(output); + Assert.Equal(2, output.Length); + AssertEventData(output[0]); + AssertEventData(output[1]); + } + + [Fact] + public async Task ConvertAsync_ReturnsFailure_WrongContentType() + { + var eventData = CreateEventData(); + + var data = new GrpcModelBindingData(new ModelBindingData() + { + Version = "1.0", + Source = Constants.BindingSource, + Content = ByteString.CopyFrom(ConvertEventDataToBinaryData(eventData)), + ContentType = "application/json" + }); + var context = new TestConverterContext(typeof(string), data); + var converter = new EventDataConverter(); + var result = await converter.ConvertAsync(context); + + Assert.Equal(ConversionStatus.Failed, result.Status); + var output = result.Value as EventData; + Assert.Null(output); + Assert.IsType(result.Error); + } + + [Fact] + public async Task ConvertAsync_Batch_ReturnsFailure_WrongContentType() + { + var message = CreateEventData(); + + var data = new ModelBindingData + { + Version = "1.0", + Source = Constants.BindingSource, + Content = ByteString.CopyFrom(ConvertEventDataToBinaryData(message)), + ContentType = "application/json" + }; + + var array = new CollectionModelBindingData(); + array.ModelBindingData.Add(data); + array.ModelBindingData.Add(data); + + var context = new TestConverterContext(typeof(string), new GrpcCollectionModelBindingData(array)); + var converter = new EventDataConverter(); + var result = await converter.ConvertAsync(context); + + Assert.Equal(ConversionStatus.Failed, result.Status); + var output = result.Value as EventData[]; + Assert.Null(output); + Assert.IsType(result.Error); + } + + [Fact] + public async Task ConvertAsync_ReturnsFailure_WrongSource() + { + var eventData = CreateEventData(); + + var data = new GrpcModelBindingData(new ModelBindingData() + { + Version = "1.0", + Source = "some-other-source", + Content = ByteString.CopyFrom(ConvertEventDataToBinaryData(eventData)), + ContentType = Constants.BinaryContentType + }); + var context = new TestConverterContext(typeof(string), data); + var converter = new EventDataConverter(); + var result = await converter.ConvertAsync(context); + + Assert.Equal(ConversionStatus.Failed, result.Status); + var output = result.Value as EventData; + Assert.Null(output); + Assert.IsType(result.Error); + } + + [Fact] + public async Task ConvertAsync_Batch_ReturnsFailure_WrongSource() + { + var message = CreateEventData(); + + var data = new ModelBindingData + { + Version = "1.0", + Source = "some-other-source", + Content = ByteString.CopyFrom(ConvertEventDataToBinaryData(message)), + ContentType = Constants.BinaryContentType + }; + + var array = new CollectionModelBindingData(); + array.ModelBindingData.Add(data); + array.ModelBindingData.Add(data); + + var context = new TestConverterContext(typeof(string), new GrpcCollectionModelBindingData(array)); + var converter = new EventDataConverter(); + var result = await converter.ConvertAsync(context); + + Assert.Equal(ConversionStatus.Failed, result.Status); + var output = result.Value as EventData[]; + Assert.Null(output); + Assert.IsType(result.Error); + } + + private static void AssertEventData(EventData output) + { + Assert.Equal("body", output.EventBody.ToString()); + Assert.Equal("messageId", output.MessageId); + Assert.Equal("correlationId", output.CorrelationId); + Assert.Equal("contentType", output.ContentType); + } + + private static EventData CreateEventData() + { + return new EventData("body") + { + ContentType = "contentType", + CorrelationId = "correlationId", + MessageId = "messageId", + }; + } + + private static BinaryData ConvertEventDataToBinaryData(EventData @event) + { + return @event.GetRawAmqpMessage().ToBytes(); + } + } +} \ No newline at end of file diff --git a/test/WorkerExtensionTests/ServiceBus/ServiceBusReceivedMessageConverterTests.cs b/test/WorkerExtensionTests/ServiceBus/ServiceBusReceivedMessageConverterTests.cs index 77275aceb..23455a4a7 100644 --- a/test/WorkerExtensionTests/ServiceBus/ServiceBusReceivedMessageConverterTests.cs +++ b/test/WorkerExtensionTests/ServiceBus/ServiceBusReceivedMessageConverterTests.cs @@ -69,6 +69,108 @@ public async Task ConvertAsync_Batch_ReturnsSuccess() AssertReceivedMessage(output[1], lockToken); } + [Fact] + public async Task ConvertAsync_ReturnsFailure_WrongContentType() + { + var lockToken = Guid.NewGuid(); + var message = CreateReceivedMessage(lockToken); + + var data = new GrpcModelBindingData(new ModelBindingData() + { + Version = "1.0", + Source = Constants.BindingSource, + Content = ByteString.CopyFrom(ConvertReceivedMessageToBinaryData(message)), + ContentType = "application/json" + }); + var context = new TestConverterContext(typeof(string), data); + var converter = new ServiceBusReceivedMessageConverter(); + var result = await converter.ConvertAsync(context); + + Assert.Equal(ConversionStatus.Failed, result.Status); + var output = result.Value as ServiceBusReceivedMessage; + Assert.Null(output); + Assert.IsType(result.Error); + } + + [Fact] + public async Task ConvertAsync_Batch_ReturnsFailure_WrongContentType() + { + var lockToken = Guid.NewGuid(); + var message = CreateReceivedMessage(lockToken); + + var data = new ModelBindingData + { + Version = "1.0", + Source = Constants.BindingSource, + Content = ByteString.CopyFrom(ConvertReceivedMessageToBinaryData(message)), + ContentType = "application/json" + }; + + var array = new CollectionModelBindingData(); + array.ModelBindingData.Add(data); + array.ModelBindingData.Add(data); + + var context = new TestConverterContext(typeof(string), new GrpcCollectionModelBindingData(array)); + var converter = new ServiceBusReceivedMessageConverter(); + var result = await converter.ConvertAsync(context); + + Assert.Equal(ConversionStatus.Failed, result.Status); + var output = result.Value as ServiceBusReceivedMessage[]; + Assert.Null(output); + Assert.IsType(result.Error); + } + + [Fact] + public async Task ConvertAsync_ReturnsFailure_WrongSource() + { + var lockToken = Guid.NewGuid(); + var message = CreateReceivedMessage(lockToken); + + var data = new GrpcModelBindingData(new ModelBindingData() + { + Version = "1.0", + Source = "some-other-source", + Content = ByteString.CopyFrom(ConvertReceivedMessageToBinaryData(message)), + ContentType = Constants.BinaryContentType + }); + var context = new TestConverterContext(typeof(string), data); + var converter = new ServiceBusReceivedMessageConverter(); + var result = await converter.ConvertAsync(context); + + Assert.Equal(ConversionStatus.Failed, result.Status); + var output = result.Value as ServiceBusReceivedMessage; + Assert.Null(output); + Assert.IsType(result.Error); + } + + [Fact] + public async Task ConvertAsync_Batch_ReturnsFailure_WrongSource() + { + var lockToken = Guid.NewGuid(); + var message = CreateReceivedMessage(lockToken); + + var data = new ModelBindingData + { + Version = "1.0", + Source = "some-other-source", + Content = ByteString.CopyFrom(ConvertReceivedMessageToBinaryData(message)), + ContentType = Constants.BinaryContentType + }; + + var array = new CollectionModelBindingData(); + array.ModelBindingData.Add(data); + array.ModelBindingData.Add(data); + + var context = new TestConverterContext(typeof(string), new GrpcCollectionModelBindingData(array)); + var converter = new ServiceBusReceivedMessageConverter(); + var result = await converter.ConvertAsync(context); + + Assert.Equal(ConversionStatus.Failed, result.Status); + var output = result.Value as ServiceBusReceivedMessage[]; + Assert.Null(output); + Assert.IsType(result.Error); + } + private static void AssertReceivedMessage(ServiceBusReceivedMessage output, Guid lockToken) { Assert.Equal("body", output.Body.ToString()); diff --git a/test/WorkerExtensionTests/WorkerExtensionTests.csproj b/test/WorkerExtensionTests/WorkerExtensionTests.csproj index 25339322c..b38882afb 100644 --- a/test/WorkerExtensionTests/WorkerExtensionTests.csproj +++ b/test/WorkerExtensionTests/WorkerExtensionTests.csproj @@ -24,6 +24,7 @@ +