diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ff5b00c --- /dev/null +++ b/.gitignore @@ -0,0 +1,264 @@ +## Ignore Visual Studio temporary files, build results, and +## files generated by popular Visual Studio add-ons. + +# Azure Functions localsettings file +local.settings.json + +# User-specific files +*.suo +*.user +*.userosscache +*.sln.docstates + +# User-specific files (MonoDevelop/Xamarin Studio) +*.userprefs + +# Build results +[Dd]ebug/ +[Dd]ebugPublic/ +[Rr]elease/ +[Rr]eleases/ +x64/ +x86/ +bld/ +[Bb]in/ +[Oo]bj/ +[Ll]og/ + +# Visual Studio 2015 cache/options directory +.vs/ +# Uncomment if you have tasks that create the project's static files in wwwroot +#wwwroot/ + +# MSTest test Results +[Tt]est[Rr]esult*/ +[Bb]uild[Ll]og.* + +# NUNIT +*.VisualState.xml +TestResult.xml + +# Build Results of an ATL Project +[Dd]ebugPS/ +[Rr]eleasePS/ +dlldata.c + +# DNX +project.lock.json +project.fragment.lock.json +artifacts/ + +*_i.c +*_p.c +*_i.h +*.ilk +*.meta +*.obj +*.pch +*.pdb +*.pgc +*.pgd +*.rsp +*.sbr +*.tlb +*.tli +*.tlh +*.tmp +*.tmp_proj +*.log +*.vspscc +*.vssscc +.builds +*.pidb +*.svclog +*.scc + +# Chutzpah Test files +_Chutzpah* + +# Visual C++ cache files +ipch/ +*.aps +*.ncb +*.opendb +*.opensdf +*.sdf +*.cachefile +*.VC.db +*.VC.VC.opendb + +# Visual Studio profiler +*.psess +*.vsp +*.vspx +*.sap + +# TFS 2012 Local Workspace +$tf/ + +# Guidance Automation Toolkit +*.gpState + +# ReSharper is a .NET coding add-in +_ReSharper*/ +*.[Rr]e[Ss]harper +*.DotSettings.user + +# JustCode is a .NET coding add-in +.JustCode + +# TeamCity is a build add-in +_TeamCity* + +# DotCover is a Code Coverage Tool +*.dotCover + +# NCrunch +_NCrunch_* +.*crunch*.local.xml +nCrunchTemp_* + +# MightyMoose +*.mm.* +AutoTest.Net/ + +# Web workbench (sass) +.sass-cache/ + +# Installshield output folder +[Ee]xpress/ + +# DocProject is a documentation generator add-in +DocProject/buildhelp/ +DocProject/Help/*.HxT +DocProject/Help/*.HxC +DocProject/Help/*.hhc +DocProject/Help/*.hhk +DocProject/Help/*.hhp +DocProject/Help/Html2 +DocProject/Help/html + +# Click-Once directory +publish/ + +# Publish Web Output +*.[Pp]ublish.xml +*.azurePubxml +# TODO: Comment the next line if you want to checkin your web deploy settings +# but database connection strings (with potential passwords) will be unencrypted +#*.pubxml +*.publishproj + +# Microsoft Azure Web App publish settings. Comment the next line if you want to +# checkin your Azure Web App publish settings, but sensitive information contained +# in these scripts will be unencrypted +PublishScripts/ + +# NuGet Packages +*.nupkg +# The packages folder can be ignored because of Package Restore +**/packages/* +# except build/, which is used as an MSBuild target. +!**/packages/build/ +# Uncomment if necessary however generally it will be regenerated when needed +#!**/packages/repositories.config +# NuGet v3's project.json files produces more ignoreable files +*.nuget.props +*.nuget.targets + +# Microsoft Azure Build Output +csx/ +*.build.csdef + +# Microsoft Azure Emulator +ecf/ +rcf/ + +# Windows Store app package directories and files +AppPackages/ +BundleArtifacts/ +Package.StoreAssociation.xml +_pkginfo.txt + +# Visual Studio cache files +# files ending in .cache can be ignored +*.[Cc]ache +# but keep track of directories ending in .cache +!*.[Cc]ache/ + +# Others +ClientBin/ +~$* +*~ +*.dbmdl +*.dbproj.schemaview +*.jfm +*.pfx +*.publishsettings +node_modules/ +orleans.codegen.cs + +# Since there are multiple workflows, uncomment next line to ignore bower_components +# (https://github.com/github/gitignore/pull/1529#issuecomment-104372622) +#bower_components/ + +# RIA/Silverlight projects +Generated_Code/ + +# Backup & report files from converting an old project file +# to a newer Visual Studio version. Backup files are not needed, +# because we have git ;-) +_UpgradeReport_Files/ +Backup*/ +UpgradeLog*.XML +UpgradeLog*.htm + +# SQL Server files +*.mdf +*.ldf + +# Business Intelligence projects +*.rdl.data +*.bim.layout +*.bim_*.settings + +# Microsoft Fakes +FakesAssemblies/ + +# GhostDoc plugin setting file +*.GhostDoc.xml + +# Node.js Tools for Visual Studio +.ntvs_analysis.dat + +# Visual Studio 6 build log +*.plg + +# Visual Studio 6 workspace options file +*.opt + +# Visual Studio LightSwitch build output +**/*.HTMLClient/GeneratedArtifacts +**/*.DesktopClient/GeneratedArtifacts +**/*.DesktopClient/ModelManifest.xml +**/*.Server/GeneratedArtifacts +**/*.Server/ModelManifest.xml +_Pvt_Extensions + +# Paket dependency manager +.paket/paket.exe +paket-files/ + +# FAKE - F# Make +.fake/ + +# JetBrains Rider +.idea/ +*.sln.iml + +# CodeRush +.cr/ + +# Python Tools for Visual Studio (PTVS) +__pycache__/ +*.pyc \ No newline at end of file diff --git a/.vscode/extensions.json b/.vscode/extensions.json new file mode 100644 index 0000000..de991f4 --- /dev/null +++ b/.vscode/extensions.json @@ -0,0 +1,6 @@ +{ + "recommendations": [ + "ms-azuretools.vscode-azurefunctions", + "ms-dotnettools.csharp" + ] +} diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..894cbe6 --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,11 @@ +{ + "version": "0.2.0", + "configurations": [ + { + "name": "Attach to .NET Functions", + "type": "coreclr", + "request": "attach", + "processId": "${command:azureFunctions.pickProcess}" + } + ] +} \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..4d0e007 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,8 @@ +{ + "azureFunctions.deploySubpath": "bin/Release/netcoreapp3.1/publish", + "azureFunctions.projectLanguage": "C#", + "azureFunctions.projectRuntime": "~3", + "debug.internalConsoleOptions": "neverOpen", + "azureFunctions.preDeployTask": "publish", + "cmake.configureOnOpen": false +} \ No newline at end of file diff --git a/.vscode/tasks.json b/.vscode/tasks.json new file mode 100644 index 0000000..88083f5 --- /dev/null +++ b/.vscode/tasks.json @@ -0,0 +1,69 @@ +{ + "version": "2.0.0", + "tasks": [ + { + "label": "clean", + "command": "dotnet", + "args": [ + "clean", + "/property:GenerateFullPaths=true", + "/consoleloggerparameters:NoSummary" + ], + "type": "process", + "problemMatcher": "$msCompile" + }, + { + "label": "build", + "command": "dotnet", + "args": [ + "build", + "/property:GenerateFullPaths=true", + "/consoleloggerparameters:NoSummary" + ], + "type": "process", + "dependsOn": "clean", + "group": { + "kind": "build", + "isDefault": true + }, + "problemMatcher": "$msCompile" + }, + { + "label": "clean release", + "command": "dotnet", + "args": [ + "clean", + "--configuration", + "Release", + "/property:GenerateFullPaths=true", + "/consoleloggerparameters:NoSummary" + ], + "type": "process", + "problemMatcher": "$msCompile" + }, + { + "label": "publish", + "command": "dotnet", + "args": [ + "publish", + "--configuration", + "Release", + "/property:GenerateFullPaths=true", + "/consoleloggerparameters:NoSummary" + ], + "type": "process", + "dependsOn": "clean release", + "problemMatcher": "$msCompile" + }, + { + "type": "func", + "dependsOn": "build", + "options": { + "cwd": "${workspaceFolder}/bin/Debug/netcoreapp3.1" + }, + "command": "host start", + "isBackground": true, + "problemMatcher": "$func-dotnet-watch" + } + ] +} \ No newline at end of file diff --git a/OrchestrationLease.cs b/OrchestrationLease.cs new file mode 100644 index 0000000..36d518a --- /dev/null +++ b/OrchestrationLease.cs @@ -0,0 +1,20 @@ +using System; +using System.Collections.Generic; +using Newtonsoft.Json; + +namespace Ragc.Etl +{ + public class OrchestrationLease + { + [JsonProperty(PropertyName = "id")] + public string Id {get; set; } + public DateTime StartTime {get; set;} + public DateTime EndTime {get; set;} + public bool Succeeded {get; set;} + public bool Locked {get; set;} + public string PreferredLocations {get; set;} //TODO: CHange by worker identification + + [JsonProperty(PropertyName = "_etag")] + public string ETag {get; set;} + } +} \ No newline at end of file diff --git a/Orchestrator.cs b/Orchestrator.cs new file mode 100644 index 0000000..9737943 --- /dev/null +++ b/Orchestrator.cs @@ -0,0 +1,234 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Net.Http; +using System.Net.Http.Headers; +using System.Threading.Tasks; +using Microsoft.Azure.WebJobs; +using Microsoft.Azure.WebJobs.Extensions.DurableTask; +using Microsoft.Extensions.Logging; +using Microsoft.Azure.Cosmos; +using Microsoft.Azure.Documents.Client; +using Newtonsoft.Json; + +namespace Ragc.Etl +{ + public static class Orchestrator + { + private const string ExternalEndpointUrl = "https://testpyfunc-weu.azurewebsites.net/api/list"; + private const string UrlParams = "?code=TNaBHahpBoa4scaJbRcx3YHk/7NVLu6gEYaUuZhViJeQ6zN9MN0tKg=="; + + private static CosmosClient _cosmosClient = new CosmosClient(Environment.GetEnvironmentVariable("CosmosDbConnStr")); + private static Database _db = _cosmosClient.GetDatabase("documents"); + + /// Orchestration Entry Point (done with an scheduled function) + /// Considerations: We can have the orchestration being executed every, let say 5 mins. + // Improvements: every execution verify if it is needed to run (check the last excecution and verify if it was succeded, etc...) + [FunctionName("Orchestration_Run")] + public static async Task RunExtraction( + [TimerTrigger("0 */5 * * * *")] TimerInfo myTimer, ILogger log, + [DurableClient] IDurableOrchestrationClient starter) + { + if (myTimer.IsPastDue) + { + log.LogInformation("Timer is running late!"); + } + log.LogInformation($"C# Timer trigger function executed at: {DateTime.Now}"); + + // Function input comes from the request content. + string instanceId = await starter.StartNewAsync("Orchestrator", null); + + log.LogInformation($"Started orchestration with ID = '{instanceId}'."); + } + + [FunctionName("Orchestrator")] + public static async Task RunOrchestrator( + [OrchestrationTrigger] IDurableOrchestrationContext context, ILogger log) + { + try + { + //TODO: Move locks to use entity function? + + if (await context.CallActivityAsync("GetOrchestrationLease", null)) + { + // Retrieve Data + var items = await context.CallActivityAsync>("ExtractDocuments", null); + + // SaveItems + await context.CallActivityAsync("SaveDocuments", items); + + //Release Lock + await context.CallActivityAsync("ReleaseOrchestrationLease", true); + } + else{ + log.LogInformation("Orchestration lease already in place. Skipping execution."); + } + + } + catch (Exception ex) + { + //Exception handling & compensation if needed + log.LogError("Orchestration execution failed: " + ex.Message); + await context.CallActivityAsync("ReleaseOrchestrationLease", false); + throw; + } + } + [FunctionName("GetOrchestrationLease")] + public static async Task GetOrchestrationLeaseAsync([ActivityTrigger] string name, ILogger log) + { + Container container = await _db.CreateContainerIfNotExistsAsync("orchestrationLease", "/id"); + + OrchestrationLease olease = null; + + try + { + olease = await container.ReadItemAsync("OrchestrationLeaseId", new PartitionKey("OrchestrationLeaseId")); + } + catch (CosmosException ex) when (ex.StatusCode == System.Net.HttpStatusCode.NotFound) + { + log.LogInformation("OrchestrationLease does not exists. The first orchestration lease item will be created."); + olease = new OrchestrationLease(); + olease.Id = "OrchestrationLeaseId"; + } + + ItemRequestOptions options = null; + options = new ItemRequestOptions(); + options.IfMatchEtag = olease.ETag; + + if (!olease.Locked) + { + olease.Locked = true; + olease.PreferredLocations = Environment.GetEnvironmentVariable("PreferredLocations"); + olease.StartTime = DateTime.Now; + + try + { + await container.UpsertItemAsync(olease, new PartitionKey("OrchestrationLeaseId"), options); + return true; + } + catch (CosmosException ex) when (ex.StatusCode == System.Net.HttpStatusCode.PreconditionFailed) + { + log.LogInformation($"Unable to set the lock (other process have updated the lease). This execution will be skipped. ETag precondition failed: {ex.ToString()}"); + return false; + } + catch(Exception ex){ + log.LogError("Error adquiring the orchestration lock:" + ex.Message); + throw; + } + } + else + { + log.LogInformation($"Lock alrady adquired by other process. This execution will skip."); + return false; + } + } + + [FunctionName("ReleaseOrchestrationLease")] + public static async Task ReleaseOrchestrationLeaseAsync([ActivityTrigger] bool succeded, ILogger log) + { + Container container = await _db.CreateContainerIfNotExistsAsync("orchestrationLease", "/id"); + OrchestrationLease olease = await container.ReadItemAsync("OrchestrationLeaseId", new PartitionKey("OrchestrationLeaseId")); + + ItemRequestOptions options = null; + + options = new ItemRequestOptions(); + options.IfMatchEtag = olease.ETag; + + if (olease.Locked) + { + olease.Locked = false; + olease.EndTime = DateTime.Now; + olease.Succeeded = succeded; + try + { + await container.UpsertItemAsync(olease, new PartitionKey("OrchestrationLeaseId"), options); + } + catch (Exception ex) + { + log.LogError($"Error releasing the lock: {ex.Message}"); + throw; + } + } + else{ + log.LogWarning("Lock is not adquired. Not releasing."); + } + } + + + [FunctionName("ExtractDocuments")] + public static async Task> ExtractAsync([ActivityTrigger] string name, ILogger log) + { + try + { + //Call The HTTP EndPoint to retrieve JSon data + HttpClient client = new HttpClient(); + client.BaseAddress = new Uri(ExternalEndpointUrl); + client.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json")); + + HttpResponseMessage response = await client.GetAsync(UrlParams); + if (response.IsSuccessStatusCode) + { + var dataItems = await response.Content.ReadAsAsync>(); + + return dataItems; + } + else + { + log.LogError($"Unable to retrieve data from { ExternalEndpointUrl }. Status Code: {response.StatusCode}."); + return null; + } + } + catch (Exception ex) + { + log.LogError($"Exception in ExtractAndStore: {ex.Message}"); + return null; + } + } + + [FunctionName("SaveDocuments")] + public static async Task SaveDocumentsAsync([ActivityTrigger] IEnumerable itemsIn, + [CosmosDB(databaseName: "documents", collectionName: "extracted", ConnectionStringSetting = "CosmosDbConnStr", CreateIfNotExists = true, + PreferredLocations="%PreferredLocations%", UseMultipleWriteLocations=true )]IAsyncCollector itemsOut, + ILogger log) + { + log.LogInformation($"Saving items..."); + int itemsCount = 0; + foreach (SampleItem item in itemsIn) + { + log.LogInformation($"Document Name={item.Name}"); + item.Id = Guid.NewGuid(); + await itemsOut.AddAsync(item); + itemsCount++; + } + log.LogInformation($"Saved {itemsCount} items"); + } + + [FunctionName("TransformDocument")] + public static async Task TransformDocumentAsync( + [CosmosDBTrigger(databaseName: "documents", collectionName: "extracted", ConnectionStringSetting = "CosmosDbConnStr", + LeaseCollectionName = "transformLease", + CreateLeaseCollectionIfNotExists=true, + PreferredLocations="%PreferredLocations%", UseMultipleWriteLocations=true )]IReadOnlyList documents, + [CosmosDB(databaseName: "documents", collectionName: "transformed", ConnectionStringSetting = "CosmosDbConnStr", CreateIfNotExists = true, + PreferredLocations="%PreferredLocations%", UseMultipleWriteLocations=true )]IAsyncCollector itemsOut, + ILogger log) + { + log.LogInformation($"Transforming documents..."); + int itemsCount = 0; + Guid batchGuid = Guid.NewGuid(); + foreach (var document in documents) + { + itemsCount++; + + SampleItem item = JsonConvert.DeserializeObject(document.ToString()); + + log.LogInformation($"Transforming item {item.Id}. Setting "); + item.UpdateLocation = Environment.GetEnvironmentVariable("PreferredLocations"); + item.TransformData = $"Data added from transform function [{itemsCount}]"; + item.TransformBatch = batchGuid; + await itemsOut.AddAsync(item); + } + log.LogInformation($"Transformed {itemsCount} items"); + } + } +} \ No newline at end of file diff --git a/SampleItem.cs b/SampleItem.cs new file mode 100644 index 0000000..e8e6c7a --- /dev/null +++ b/SampleItem.cs @@ -0,0 +1,20 @@ +using System; +using System.Collections.Generic; +using Newtonsoft.Json; + +namespace Ragc.Etl +{ + public class SampleItem + { + [JsonProperty(PropertyName = "id")] + public Guid Id {get; set;} + public string Date {get; set;} + public string Desc {get; set;} + public string Done {get; set;} + public string Name {get; set;} + public string Pr {get; set;} + public string TransformData {get; set;} + public Guid TransformBatch {get; set;} + public string UpdateLocation {get; set;} + } +} \ No newline at end of file diff --git a/host.json b/host.json new file mode 100644 index 0000000..4b77f70 --- /dev/null +++ b/host.json @@ -0,0 +1,20 @@ +{ + "version": "2.0", + "logging": { + "applicationInsights": { + "samplingExcludedTypes": "Request", + "samplingSettings": { + "isEnabled": true + } + } + }, + "extensions": { + "cosmosDB": { + "connectionMode": "Gateway", + "protocol": "Https", + "leaseOptions": { + "leasePrefix": "durableFuncPrefix" + } + } + } +} \ No newline at end of file diff --git a/ragc-etl-azfunc.csproj b/ragc-etl-azfunc.csproj new file mode 100644 index 0000000..ebe7801 --- /dev/null +++ b/ragc-etl-azfunc.csproj @@ -0,0 +1,22 @@ + + + netcoreapp3.1 + v3 + ragc_etl_azfunc + + + + + + + + + + PreserveNewest + + + PreserveNewest + Never + + +