Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,14 @@ Please follow the [tutorial](https://docs.microsoft.com/azure/azure-functions/fu

## Examples

### Functions that uses Event Grid binding
### Functions that uses Event Grid output binding

```C# Snippet:EventGridBindingFunction
public static class EventGridBindingFunction
If you are using the EventGrid schema for your topic, you can output EventGridEvents.

```C# Snippet:EventGridEventBindingFunction
public static class EventGridEventBindingFunction
{
[FunctionName("EventGridBindingFunction")]
[FunctionName("EventGridEventBindingFunction")]
public static async Task<IActionResult> RunAsync(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", "post", Route = null)] HttpRequest req,
[EventGrid(TopicEndpointUri = "EventGridEndpoint", TopicKeySetting = "EventGridKey")] IAsyncCollector<EventGridEvent> eventCollector)
Expand All @@ -60,12 +62,31 @@ public static class EventGridBindingFunction
}
```

If you are using the CloudEvent schema for your topic, you can output CloudEvents.
```C# Snippet:CloudEventBindingFunction
public static class CloudEventBindingFunction
{
[FunctionName("CloudEventBindingFunction")]
public static async Task<IActionResult> RunAsync(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", "post", Route = null)] HttpRequest req,
[EventGrid(TopicEndpointUri = "EventGridEndpoint", TopicKeySetting = "EventGridKey")] IAsyncCollector<CloudEvent> eventCollector)
{
CloudEvent e = new CloudEvent("IncomingRequest", "IncomingRequest", await req.ReadAsStringAsync());
await eventCollector.AddAsync(e);
return new OkResult();
}
}
```

You can also output a string or JObject and the extension will attempt to parse into the correct strongly typed event.

### Functions that uses Event Grid trigger
You can also create a function that will be executed whenever an event is delivered to your topic. Depending on the schema you have selected for your Azure Function event subscription, you can bind to either `EventGridEvent` or `CloudEvent`:

```C# Snippet:EventGridTriggerFunction
public static class EventGridTriggerFunction
```C# Snippet:EventGridEventTriggerFunction
public static class EventGridEventTriggerFunction
{
[FunctionName("EventGridTriggerFunction")]
[FunctionName("EventGridEventTriggerFunction")]
public static void Run(
ILogger logger,
[EventGridTrigger] EventGridEvent e)
Expand All @@ -75,6 +96,20 @@ public static class EventGridTriggerFunction
}
```

And if your subscription is configured with the CloudEvent schema:
```C# Snippet:CloudEventTriggerFunction
public static class CloudEventTriggerFunction
{
[FunctionName("CloudEventTriggerFunction")]
public static void Run(
ILogger logger,
[EventGridTrigger] CloudEvent e)
{
logger.LogInformation("Event received {type} {subject}", e.Type, e.Subject);
}
}
```

## Troubleshooting

Please refer to [Monitor Azure Functions](https://docs.microsoft.com/azure/azure-functions/functions-monitoring) for troubleshooting guidance.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using System.Threading.Tasks;
using System.Web;
using Azure;
using Azure.Messaging;
using Azure.Messaging.EventGrid;
using Microsoft.Azure.WebJobs.Description;
using Microsoft.Azure.WebJobs.Host.Bindings;
Expand All @@ -31,12 +32,12 @@ internal class EventGridExtensionConfigProvider : IExtensionConfigProvider,
{
private ILogger _logger;
private readonly ILoggerFactory _loggerFactory;
private readonly Func<EventGridAttribute, IAsyncCollector<EventGridEvent>> _converter;
private readonly Func<EventGridAttribute, IAsyncCollector<object>> _converter;
private readonly HttpRequestProcessor _httpRequestProcessor;

// for end to end testing
internal EventGridExtensionConfigProvider(
Func<EventGridAttribute, IAsyncCollector<EventGridEvent>> converter,
Func<EventGridAttribute, IAsyncCollector<object>> converter,
HttpRequestProcessor httpRequestProcessor,
ILoggerFactory loggerFactory)
{
Expand Down Expand Up @@ -72,11 +73,13 @@ public void Initialize(ExtensionConfigContext context)
// also take benefit of identity converter
context
.AddBindingRule<EventGridTriggerAttribute>() // following converters are for EventGridTriggerAttribute only
.AddConverter<JToken, string>((jtoken) => jtoken.ToString(Formatting.Indented))
.AddConverter<JToken, string[]>((jarray) => jarray.Select(ar => ar.ToString(Formatting.Indented)).ToArray())
.AddConverter<JToken, DirectInvokeString>((jtoken) => new DirectInvokeString(null))
.AddConverter<JToken, EventGridEvent>((jobject) => EventGridEvent.Parse(new BinaryData(jobject.ToString()))) // surface the type to function runtime
.AddConverter<JToken, EventGridEvent[]>((jobject) => EventGridEvent.ParseMany(new BinaryData(jobject.ToString()))) // surface the type to function runtime
.AddConverter<JToken, string>(jtoken => jtoken.ToString(Formatting.Indented))
.AddConverter<JToken, string[]>(jarray => jarray.Select(ar => ar.ToString(Formatting.Indented)).ToArray())
.AddConverter<JToken, DirectInvokeString>(jtoken => new DirectInvokeString(null))
.AddConverter<JToken, EventGridEvent>(jobject => EventGridEvent.Parse(new BinaryData(jobject.ToString()))) // surface the type to function runtime
.AddConverter<JToken, EventGridEvent[]>(jobject => EventGridEvent.ParseMany(new BinaryData(jobject.ToString())))
.AddConverter<JToken, CloudEvent>(jobject => CloudEvent.Parse(new BinaryData(jobject.ToString())))
.AddConverter<JToken, CloudEvent[]>(jobject => CloudEvent.ParseMany(new BinaryData(jobject.ToString())))
.AddOpenConverter<JToken, OpenType.Poco>(typeof(JTokenToPocoConverter<>))
.AddOpenConverter<JToken, OpenType.Poco[]>(typeof(JTokenToPocoConverter<>))
.BindToTrigger<JToken>(new EventGridTriggerAttributeBindingProvider(this));
Expand All @@ -85,8 +88,30 @@ public void Initialize(ExtensionConfigContext context)
var rule = context
.AddBindingRule<EventGridAttribute>()
//TODO - add binding for BinaryData?
.AddConverter<string, EventGridEvent>((str) => EventGridEvent.Parse(new BinaryData(str)))
.AddConverter<JObject, EventGridEvent>((jobject) => EventGridEvent.Parse(new BinaryData(jobject.ToString())));
.AddConverter<string, object>(str =>
{
// first attempt to parse as EventGridEvent, then fallback to CloudEvent
try
{
return EventGridEvent.Parse(new BinaryData(str));
}
catch (ArgumentException)
{
return CloudEvent.Parse(new BinaryData(str));
}
})
.AddConverter<JObject, object>(jobject =>
{
try
{
return EventGridEvent.Parse(new BinaryData(jobject.ToString()));
}
catch (ArgumentException)
{
return CloudEvent.Parse(new BinaryData(jobject.ToString()));
}
});

rule.BindToCollector(_converter);
rule.AddValidator((a, t) =>
{
Expand Down Expand Up @@ -181,4 +206,4 @@ public T Convert(JToken input)
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,30 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging;
using Azure.Messaging.EventGrid;
using Newtonsoft.Json.Linq;

namespace Microsoft.Azure.WebJobs.Extensions.EventGrid
{
internal sealed class EventGridAsyncCollector : IAsyncCollector<EventGridEvent>
internal sealed class EventGridAsyncCollector : IAsyncCollector<object>
{
// use EventGridPublisherClient for mocking test
private readonly EventGridPublisherClient _client;
private readonly object _syncroot = new object();

private IList<EventGridEvent> _eventsToSend = new List<EventGridEvent>();
private IList<object> _eventsToSend = new List<object>();

public EventGridAsyncCollector(EventGridPublisherClient client)
{
_client = client;
}

public Task AddAsync(EventGridEvent item, CancellationToken cancellationToken = default(CancellationToken))
public Task AddAsync(object item, CancellationToken cancellationToken = default(CancellationToken))
{
lock (_syncroot)
{
// Don't let FlushAsyc take place while we're doing this
// Don't let FlushAsync take place while we're doing this
_eventsToSend.Add(item);
}

Expand All @@ -35,8 +38,8 @@ public EventGridAsyncCollector(EventGridPublisherClient client)

public async Task FlushAsync(CancellationToken cancellationToken = default(CancellationToken))
{
IList<EventGridEvent> events;
var newEventList = new List<EventGridEvent>();
IList<object> events;
var newEventList = new List<object>();
lock (_syncroot)
{
// swap the events to send out with a new list; locking so 'AddAsync' doesn't take place while we do this
Expand All @@ -46,7 +49,92 @@ public EventGridAsyncCollector(EventGridPublisherClient client)

if (events.Any())
{
await _client.SendEventsAsync(events, cancellationToken).ConfigureAwait(false);
// determine the schema by inspecting the first event (a topic can only support a single schema)
var firstEvent = events.First();
if (firstEvent is string str)
{
bool isEventGridEvent = false;
try
{
var ev = EventGridEvent.Parse(new BinaryData(str));
isEventGridEvent = true;
}
catch (ArgumentException)
{
}

if (isEventGridEvent)
{
List<EventGridEvent> egEvents = new();
foreach (string evt in events)
{
egEvents.Add(EventGridEvent.Parse(new BinaryData(evt)));
}

await _client.SendEventsAsync(egEvents, cancellationToken).ConfigureAwait(false);
}
else
{
List<CloudEvent> cloudEvents = new();
foreach (string evt in events)
{
cloudEvents.Add(CloudEvent.Parse(new BinaryData(evt)));
}

await _client.SendEventsAsync(cloudEvents, cancellationToken).ConfigureAwait(false);
}
}
else if (firstEvent is JObject jObject)
{
bool isEventGridEvent = false;
try
{
var ev = EventGridEvent.Parse(new BinaryData(jObject.ToString()));
isEventGridEvent = true;
}
catch (ArgumentException)
{
}

if (isEventGridEvent)
{
List<EventGridEvent> egEvents = new();
foreach (JObject evt in events)
{
egEvents.Add(EventGridEvent.Parse(new BinaryData(evt.ToString())));
}

await _client.SendEventsAsync(egEvents, cancellationToken).ConfigureAwait(false);
}
else
{
List<CloudEvent> cloudEvents = new();
foreach (JObject evt in events)
{
cloudEvents.Add(CloudEvent.Parse(new BinaryData(evt.ToString())));
}

await _client.SendEventsAsync(cloudEvents, cancellationToken).ConfigureAwait(false);
}
}
else if (firstEvent is EventGridEvent)
{
List<EventGridEvent> egEvents = new();
foreach (object evt in events)
{
egEvents.Add((EventGridEvent) evt);
}
await _client.SendEventsAsync(egEvents, cancellationToken).ConfigureAwait(false);
}
else
{
List<CloudEvent> cloudEvents = new();
foreach (object evt in events)
{
cloudEvents.Add((CloudEvent) evt);
}
await _client.SendEventsAsync(cloudEvents, cancellationToken).ConfigureAwait(false);
}
}
}
}
Expand Down
Loading