diff --git a/tools/pipeline-witness/Azure.Sdk.Tools.PipelineWitness/BlobUploadProcessor.cs b/tools/pipeline-witness/Azure.Sdk.Tools.PipelineWitness/BlobUploadProcessor.cs index 7862f197908..b2e1ace599f 100644 --- a/tools/pipeline-witness/Azure.Sdk.Tools.PipelineWitness/BlobUploadProcessor.cs +++ b/tools/pipeline-witness/Azure.Sdk.Tools.PipelineWitness/BlobUploadProcessor.cs @@ -52,7 +52,6 @@ public class BlobUploadProcessor private readonly BlobContainerClient testRunsContainerClient; private readonly BlobContainerClient buildDefinitionsContainerClient; private readonly BlobContainerClient buildFailuresContainerClient; - private readonly QueueClient queueClient; private readonly IOptions options; private readonly Dictionary cachedDefinitionRevisions = new(); private readonly IFailureAnalyzer failureAnalyzer; @@ -61,7 +60,6 @@ public BlobUploadProcessor( ILogger logger, BuildLogProvider logProvider, BlobServiceClient blobServiceClient, - QueueServiceClient queueServiceClient, BuildHttpClient buildClient, TestResultsHttpClient testResultsClient, IOptions options, @@ -72,11 +70,6 @@ public BlobUploadProcessor( throw new ArgumentNullException(nameof(blobServiceClient)); } - if (queueServiceClient == null) - { - throw new ArgumentNullException(nameof(queueServiceClient)); - } - this.logger = logger ?? throw new ArgumentNullException(nameof(logger)); this.options = options ?? throw new ArgumentNullException(nameof(options)); this.logProvider = logProvider ?? throw new ArgumentNullException(nameof(logProvider)); @@ -89,8 +82,6 @@ public BlobUploadProcessor( this.buildFailuresContainerClient = blobServiceClient.GetBlobContainerClient(BuildFailuresContainerName); this.testRunsContainerClient = blobServiceClient.GetBlobContainerClient(TestRunsContainerName); this.buildDefinitionsContainerClient = blobServiceClient.GetBlobContainerClient(BuildDefinitionsContainerName); - this.queueClient = queueServiceClient.GetQueueClient(this.options.Value.BuildLogBundlesQueueName); - this.queueClient.CreateIfNotExists(); this.failureAnalyzer = failureAnalyzer; } @@ -178,15 +169,12 @@ public async Task UploadBuildBlobsAsync(string account, Guid projectId, int buil logger.LogWarning("No logs available for build {Project}: {BuildId}", build.Project.Name, build.Id); return; } - - var bundles = BuildLogBundles(account, build, timeline, logs); - // We no longer process log bundles on separate messages. - // During zero downtime upgrade phase, process all the bundles sequentially but allow for processing message in the log bundle queue - // After the upgrade phase, this should be rewritten to remove bundling but keep the log -> timeline record association - foreach(var bundle in bundles) + var buildLogInfos = GetBuildLogInfos(account, build, timeline, logs); + + foreach (var log in buildLogInfos) { - await ProcessBuildLogBundleAsync(bundle); + await UploadLogLinesBlobAsync(account, build, log); } } @@ -246,14 +234,6 @@ private async Task UploadBuildFailureBlobAsync(string account, Build build, Time } } - public async Task ProcessBuildLogBundleAsync(BuildLogBundle buildLogBundle) - { - foreach (var log in buildLogBundle.TimelineLogs) - { - await UploadLogLinesBlobAsync(buildLogBundle, log); - } - } - public async Task UploadBuildDefinitionBlobsAsync(string account, string projectName) { var definitions = await buildClient.GetFullDefinitionsAsync2(project: projectName); @@ -361,35 +341,14 @@ private async Task UploadBuildDefinitionBlobAsync(string account, BuildDefinitio } } - private List BuildLogBundles(string account, Build build, Timeline timeline, List logs) + private List GetBuildLogInfos(string account, Build build, Timeline timeline, List logs) { - BuildLogBundle CreateBundle() => new BuildLogBundle - { - Account = account, - BuildId = build.Id, - ProjectId = build.Project.Id, - ProjectName = build.Project.Name, - QueueTime = build.QueueTime.Value, - StartTime = build.StartTime.Value, - FinishTime = build.FinishTime.Value, - DefinitionId = build.Definition.Id, - DefinitionName = build.Definition.Name, - DefinitionPath = build.Definition.Path - }; - - BuildLogBundle currentBundle; - var logBundles = new List(); - logBundles.Add(currentBundle = CreateBundle()); - var logsById = logs.ToDictionary(l => l.Id); + var buildLogInfos = new List(); + foreach (var log in logs) { - if(currentBundle.TimelineLogs.Count >= this.options.Value.BuildLogBundleSize) - { - logBundles.Add(currentBundle = CreateBundle()); - } - var logRecords = timeline.Records.Where(x => x.Log?.Id == log.Id).ToArray(); if(logRecords.Length > 1) @@ -419,7 +378,7 @@ private List BuildLogBundles(string account, Build build, Timeli } } - currentBundle.TimelineLogs.Add(new BuildLogInfo + buildLogInfos.Add(new BuildLogInfo { LogId = log.Id, LineCount = log.LineCount, @@ -430,7 +389,7 @@ private List BuildLogBundles(string account, Build build, Timeli }); } - return logBundles; + return buildLogInfos; } private async Task UploadBuildBlobAsync(string account, Build build) @@ -599,29 +558,29 @@ private async Task UploadTimelineBlobAsync(string account, Build build, Timeline } } - private async Task UploadLogLinesBlobAsync(BuildLogBundle build, BuildLogInfo log) + private async Task UploadLogLinesBlobAsync(string account, Build build, BuildLogInfo log) { try { // we don't use FinishTime in the logs blob path to prevent duplicating logs when processing retries. // i.e. logs with a given buildid/logid are immutable and retries only add new logs. - var blobPath = $"{build.ProjectName}/{build.QueueTime:yyyy/MM/dd}/{build.BuildId}-{log.LogId}.jsonl"; + var blobPath = $"{build.Project.Name}/{build.QueueTime:yyyy/MM/dd}/{build.Id}-{log.LogId}.jsonl"; var blobClient = this.buildLogLinesContainerClient.GetBlobClient(blobPath); if (await blobClient.ExistsAsync()) { - this.logger.LogInformation("Skipping existing log for build {BuildId}, record {RecordId}, log {LogId}", build.BuildId, log.RecordId, log.LogId); + this.logger.LogInformation("Skipping existing log for build {BuildId}, record {RecordId}, log {LogId}", build.Id, log.RecordId, log.LogId); return; } - this.logger.LogInformation("Processing log for build {BuildId}, record {RecordId}, log {LogId}", build.BuildId, log.RecordId, log.LogId); + this.logger.LogInformation("Processing log for build {BuildId}, record {RecordId}, log {LogId}", build.Id, log.RecordId, log.LogId); var lineNumber = 0; var characterCount = 0; // Over an open read stream and an open write stream, one line at a time, read, process, and write to // blob storage - using (var logStream = await this.logProvider.GetLogStreamAsync(build.ProjectName, build.BuildId, log.LogId)) + using (var logStream = await this.logProvider.GetLogStreamAsync(build.Project.Name, build.Id, log.LogId)) using (var logReader = new StreamReader(logStream)) using (var blobStream = await blobClient.OpenWriteAsync(overwrite: true, new BlobOpenWriteOptions())) using (var blobWriter = new StreamWriter(blobStream)) @@ -657,13 +616,13 @@ private async Task UploadLogLinesBlobAsync(BuildLogBundle build, BuildLogInfo lo await blobWriter.WriteLineAsync(JsonConvert.SerializeObject(new { - OrganizationName = build.Account, - ProjectId = build.ProjectId, - ProjectName = build.ProjectName, - BuildDefinitionId = build.DefinitionId, - BuildDefinitionPath = build.DefinitionPath, - BuildDefinitionName = build.DefinitionName, - BuildId = build.BuildId, + OrganizationName = account, + ProjectId = build.Project.Id, + ProjectName = build.Project.Name, + BuildDefinitionId = build.Definition.Id, + BuildDefinitionPath = build.Definition.Path, + BuildDefinitionName = build.Definition.Name, + BuildId = build.Id, LogId = log.LogId, LineNumber = lineNumber, Length = message.Length, @@ -674,15 +633,15 @@ await blobWriter.WriteLineAsync(JsonConvert.SerializeObject(new } } - logger.LogInformation("Processed {CharacterCount} characters and {LineCount} lines for build {BuildId}, record {RecordId}, log {LogId}", characterCount, lineNumber, build.BuildId, log.RecordId, log.LogId); + logger.LogInformation("Processed {CharacterCount} characters and {LineCount} lines for build {BuildId}, record {RecordId}, log {LogId}", characterCount, lineNumber, build.Id, log.RecordId, log.LogId); } catch (RequestFailedException ex) when (ex.Status == (int)HttpStatusCode.Conflict) { - this.logger.LogInformation("Ignoring existing blob exception for build {BuildId}, record {RecordId}, log {LogId}", build.BuildId, log.RecordId, log.LogId); + this.logger.LogInformation("Ignoring existing blob exception for build {BuildId}, record {RecordId}, log {LogId}", build.Id, log.RecordId, log.LogId); } catch (Exception ex) { - this.logger.LogError(ex, "Error processing build {BuildId}, record {RecordId}, log {LogId}", build.BuildId, log.RecordId, log.LogId); + this.logger.LogError(ex, "Error processing build {BuildId}, record {RecordId}, log {LogId}", build.Id, log.RecordId, log.LogId); throw; } } diff --git a/tools/pipeline-witness/Azure.Sdk.Tools.PipelineWitness/PipelineWitnessSettings.cs b/tools/pipeline-witness/Azure.Sdk.Tools.PipelineWitness/PipelineWitnessSettings.cs index 26396ecd127..e54f2c81f04 100644 --- a/tools/pipeline-witness/Azure.Sdk.Tools.PipelineWitness/PipelineWitnessSettings.cs +++ b/tools/pipeline-witness/Azure.Sdk.Tools.PipelineWitness/PipelineWitnessSettings.cs @@ -6,27 +6,25 @@ namespace Azure.Sdk.Tools.PipelineWitness { public class PipelineWitnessSettings { + /// + /// Gets or sets the uri of the key vault to use + /// public string KeyVaultUri { get; set; } - public string QueueStorageAccountUri { get; set; } - - - public string BlobStorageAccountUri { get; set; } - /// - /// Gets or sets the name of the build complete queue + /// Gets or sets uri of the storage account use for queue processing /// - public string BuildCompleteQueueName { get; set; } + public string QueueStorageAccountUri { get; set; } /// - /// Gets or sets the name of the build log bundles queue + /// Gets or sets uri of the blob storage account use for blob export /// - public string BuildLogBundlesQueueName { get; set; } + public string BlobStorageAccountUri { get; set; } /// - /// Gets or sets the number of build logs to add to each log bundle message + /// Gets or sets the name of the build complete queue /// - public int BuildLogBundleSize { get; set; } = 50; + public string BuildCompleteQueueName { get; set; } /// /// Gets or sets the amount of time a message should be invisible in the queue while being processed @@ -63,11 +61,6 @@ public class PipelineWitnessSettings /// public TimeSpan BuildDefinitionLoopPeriod { get; set; } = TimeSpan.FromMinutes(5); - /// - /// Gets or sets the number of concurrent log bundle queue workers to register - /// - public int BuildLogBundlesWorkerCount { get; set; } = 1; - /// /// Gets or sets the number of concurrent build complete queue workers to register /// diff --git a/tools/pipeline-witness/Azure.Sdk.Tools.PipelineWitness/Services/BuildLogBundleQueueWorker.cs b/tools/pipeline-witness/Azure.Sdk.Tools.PipelineWitness/Services/BuildLogBundleQueueWorker.cs deleted file mode 100644 index 29e7cb366ce..00000000000 --- a/tools/pipeline-witness/Azure.Sdk.Tools.PipelineWitness/Services/BuildLogBundleQueueWorker.cs +++ /dev/null @@ -1,66 +0,0 @@ -using System; -using System.Threading; -using System.Threading.Tasks; -using Azure.Storage.Queues; -using Azure.Storage.Queues.Models; -using Microsoft.ApplicationInsights; -using Microsoft.ApplicationInsights.DataContracts; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; -using Newtonsoft.Json; - -namespace Azure.Sdk.Tools.PipelineWitness.Services -{ - internal class BuildLogBundleQueueWorker : QueueWorkerBackgroundService - { - private readonly ILogger logger; - private readonly TelemetryClient telemetryClient; - private readonly BlobUploadProcessor runProcessor; - - public BuildLogBundleQueueWorker( - ILogger logger, - BlobUploadProcessor runProcessor, - QueueServiceClient queueServiceClient, - TelemetryClient telemetryClient, - IOptionsMonitor options) - : base( - logger, - telemetryClient, - queueServiceClient, - options?.CurrentValue?.BuildLogBundlesQueueName, - options) - { - this.logger = logger; - this.runProcessor = runProcessor; - this.telemetryClient = telemetryClient; - } - - internal override async Task ProcessMessageAsync(QueueMessage message, CancellationToken cancellationToken) - { - logger.LogInformation("Processing build log bundle message."); - - if (message.InsertedOn.HasValue) - { - telemetryClient.TrackMetric(new MetricTelemetry - { - Name = "AzurePipelinesBuildLogBundle MessageLatencyMs", - Sum = DateTimeOffset.Now.Subtract(message.InsertedOn.Value).TotalMilliseconds, - }); - } - - BuildLogBundle buildLogBundle; - try - { - buildLogBundle = JsonConvert.DeserializeObject(message.MessageText); - } - catch (Exception ex) - { - logger.LogError(ex, "Failed to deserialize message body. Body: {MessageBody}", message.MessageText); - throw; - } - - // TODO: Add cancellation token propatagion - await runProcessor.ProcessBuildLogBundleAsync(buildLogBundle); - } - } -} diff --git a/tools/pipeline-witness/Azure.Sdk.Tools.PipelineWitness/Startup.cs b/tools/pipeline-witness/Azure.Sdk.Tools.PipelineWitness/Startup.cs index 8a150ec0f06..f7c65bd1686 100644 --- a/tools/pipeline-witness/Azure.Sdk.Tools.PipelineWitness/Startup.cs +++ b/tools/pipeline-witness/Azure.Sdk.Tools.PipelineWitness/Startup.cs @@ -79,7 +79,6 @@ public static void Configure(WebApplicationBuilder builder) builder.Services.Configure(settingsSection); builder.Services.AddHostedService(settings.BuildCompleteWorkerCount); - builder.Services.AddHostedService(settings.BuildLogBundlesWorkerCount); builder.Services.AddHostedService(); } diff --git a/tools/pipeline-witness/Azure.Sdk.Tools.PipelineWitness/appsettings.json b/tools/pipeline-witness/Azure.Sdk.Tools.PipelineWitness/appsettings.json index 74ba073bc35..4516629b8f8 100644 --- a/tools/pipeline-witness/Azure.Sdk.Tools.PipelineWitness/appsettings.json +++ b/tools/pipeline-witness/Azure.Sdk.Tools.PipelineWitness/appsettings.json @@ -15,9 +15,6 @@ "BlobStorageAccountUri": "https://azsdkengsyspipelinelogs.blob.core.windows.net", "BuildCompleteQueueName": "azurepipelines-build-completed", "BuildCompleteWorkerCount": 5, - "BuildLogBundlesQueueName": "azurepipelines-build-log-bundle", - "BuildLogBundlesWorkerCount": 5, - "BuildLogBundleSize": 50, "MessageLeasePeriod": "00:03:00", "MessageErrorSleepPeriod": "00:00:10", "MaxDequeueCount": 5,