Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,30 @@ public async Task UploadBuildBlobsAsync(string account, Guid projectId, int buil
await UploadBuildBlobAsync(account, build);
}

public async Task<string[]> GetBuildBlobNamesAsync(string projectName, DateTimeOffset minTime, DateTimeOffset maxTime, CancellationToken cancellationToken)
{
DateTimeOffset minDay = minTime.ToUniversalTime().Date;
DateTimeOffset maxDay = maxTime.ToUniversalTime().Date;

DateTimeOffset[] days = Enumerable.Range(0, (int)(maxDay - minDay).TotalDays + 1)
.Select(offset => minDay.AddDays(offset))
.ToArray();

List<string> blobNames = [];

foreach (DateTimeOffset day in days)
{
string blobPrefix = $"{projectName}/{day:yyyy/MM/dd}/";

await foreach (BlobItem blob in this.buildsContainerClient.GetBlobsAsync(prefix: blobPrefix, cancellationToken: cancellationToken))
{
blobNames.Add(blob.Name);
}
}

return [.. blobNames];
}

public string GetBuildBlobName(Build build)
{
long changeTime = ((DateTimeOffset)build.LastChangedDate).ToUnixTimeSeconds();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Azure.Sdk.Tools.PipelineWitness.Configuration;
using Azure.Sdk.Tools.PipelineWitness.Services;
using Azure.Sdk.Tools.PipelineWitness.Services.WorkTokens;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Microsoft.TeamFoundation.Build.WebApi;
using Microsoft.VisualStudio.Services.WebApi;

namespace Azure.Sdk.Tools.PipelineWitness.AzurePipelines
{
public class MissingAzurePipelineRunsWorker : PeriodicLockingBackgroundService
{
private readonly ILogger<MissingAzurePipelineRunsWorker> logger;
private readonly AzurePipelinesProcessor runProcessor;
private readonly BuildCompleteQueue buildCompleteQueue;
private readonly IOptions<PipelineWitnessSettings> options;
private readonly EnhancedBuildHttpClient buildClient;

public MissingAzurePipelineRunsWorker(
ILogger<MissingAzurePipelineRunsWorker> logger,
AzurePipelinesProcessor runProcessor,
IAsyncLockProvider asyncLockProvider,
VssConnection vssConnection,
BuildCompleteQueue buildCompleteQueue,
IOptions<PipelineWitnessSettings> options)
: base(
logger,
asyncLockProvider,
options.Value.MissingPipelineRunsWorker)
{
this.logger = logger;
this.runProcessor = runProcessor;
this.buildCompleteQueue = buildCompleteQueue;
this.options = options;

ArgumentNullException.ThrowIfNull(vssConnection);

this.buildClient = vssConnection.GetClient<EnhancedBuildHttpClient>();
}

protected override async Task ProcessAsync(CancellationToken cancellationToken)
{
var settings = this.options.Value;

// search for builds that completed within this window
var buildMinTime = DateTimeOffset.UtcNow.Subtract(settings.MissingPipelineRunsWorker.LookbackPeriod);
var buildMaxTime = DateTimeOffset.UtcNow.Subtract(TimeSpan.FromHours(1));

foreach (string project in settings.Projects)
{
var knownBlobs = await this.runProcessor.GetBuildBlobNamesAsync(project, buildMinTime, buildMaxTime, cancellationToken);

string continuationToken = null;
do
{
var completedBuilds = await this.buildClient.GetBuildsAsync2(
project,
minFinishTime: buildMinTime.DateTime,
maxFinishTime: buildMaxTime.DateTime,
statusFilter: BuildStatus.Completed,
continuationToken: continuationToken,
cancellationToken: cancellationToken);

var skipCount = 0;
var enqueueCount = 0;
foreach (var build in completedBuilds)
{
var blobName = this.runProcessor.GetBuildBlobName(build);

if (knownBlobs.Contains(blobName, StringComparer.InvariantCultureIgnoreCase))
{
skipCount++;
continue;
}

var queueMessage = new BuildCompleteQueueMessage
{
Account = settings.Account,
ProjectId = build.Project.Id,
BuildId = build.Id
};

this.logger.LogInformation("Enqueuing missing build {Project} {BuildId} for processing", build.Project.Name, build.Id);
await this.buildCompleteQueue.EnqueueMessageAsync(queueMessage);
enqueueCount++;
}

this.logger.LogInformation("Enqueued {EnqueueCount} missing builds, skipped {SkipCount} existing builds in project {Project}", enqueueCount, skipCount, project);

continuationToken = completedBuilds.ContinuationToken;
} while(!string.IsNullOrEmpty(continuationToken));
}
}

protected override Task ProcessExceptionAsync(Exception ex)
{
this.logger.LogError(ex, "Error processing missing builds");
return Task.CompletedTask;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ public class PeriodicProcessSettings
/// </summary>
public TimeSpan CooldownPeriod { get; set; }

/// <summary>
/// Gets or sets the amount of history to process in each iteration
/// </summary>
public TimeSpan LookbackPeriod { get; set; }

/// <summary>
/// Gets or sets the name of the distributed lock
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,16 @@ public class PipelineWitnessSettings
/// </summary>
public PeriodicProcessSettings BuildDefinitionWorker { get; set; }

/// <summary>
/// Gets or sets the loops settins for the Missing Azure Pipline Runs worker
/// </summary>
public PeriodicProcessSettings MissingPipelineRunsWorker { get; set; }

/// <summary>
/// Gets or sets the loops settins for the Missing GitHub Actions worker
/// </summary>
public PeriodicProcessSettings MissingGitHubActionsWorker { get; set; }

/// <summary>
/// Gets or sets the artifact name used by the pipeline owners extraction build
/// </summary>
Expand All @@ -109,5 +119,15 @@ public class PipelineWitnessSettings
/// Gets or sets the container to use for async locks
/// </summary>
public string CosmosAsyncLockContainer { get; set; }

/// <summary>
/// Gets or sets the list of monitored GitHub repositories (Overrides GitHubRepositoriesSource)
/// </summary>
public string[] GitHubRepositories { get; set; }

/// <summary>
/// Gets or sets the url for a list of monitored GitHub repositories
/// </summary>
public string GitHubRepositoriesSource { get; set; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
using System;
using System.Net.Http;
using System.Net.Http.Json;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

namespace Azure.Sdk.Tools.PipelineWitness.Configuration;

public class PostConfigureSettings : IPostConfigureOptions<PipelineWitnessSettings>
{
private readonly ILogger logger;

public PostConfigureSettings(ILogger<PostConfigureSettings> logger)
{
this.logger = logger;
}

public void PostConfigure(string name, PipelineWitnessSettings options)
{
if (options.GitHubRepositories == null || options.GitHubRepositories.Length == 0)
{
options.GitHubRepositories = [];

if (string.IsNullOrEmpty(options.GitHubRepositoriesSource))
{
this.logger.LogWarning("No GitHubRepositories or GitHubRepositoriesSource configured");
return;
}

try
{
this.logger.LogInformation("Replacing settings property GitHubRepositories with values from {Source}", options.GitHubRepositoriesSource);
using var client = new HttpClient();

options.GitHubRepositories = client.GetFromJsonAsync<string[]>(options.GitHubRepositoriesSource)
.ConfigureAwait(true)
.GetAwaiter()
.GetResult();
}
catch (Exception ex)
{
this.logger.LogError(ex, "Error loading repository list from source");
return;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,16 +83,27 @@ private async Task<IActionResult> ProcessWorkflowRunEventAsync()

if (action == "completed")
{
var queueMessage = new RunCompleteQueueMessage
{
Owner = eventMessage.GetProperty("repository").GetProperty("owner").GetProperty("login").GetString(),
Repository = eventMessage.GetProperty("repository").GetProperty("name").GetString(),
RunId = eventMessage.GetProperty("workflow_run").GetProperty("id").GetInt64(),
};

this.logger.LogInformation("Enqueuing GitHubRunCompleteMessage for {Owner}/{Repository} run {RunId}", queueMessage.Owner, queueMessage.Repository, queueMessage.RunId);
string owner = eventMessage.GetProperty("repository").GetProperty("owner").GetProperty("login").GetString();
string repository = eventMessage.GetProperty("repository").GetProperty("name").GetString();
long runId = eventMessage.GetProperty("workflow_run").GetProperty("id").GetInt64();

await this.queueClient.SendMessageAsync(JsonSerializer.Serialize(queueMessage));
if (this.settings.GitHubRepositories.Contains($"{owner}/{repository}", StringComparer.InvariantCultureIgnoreCase))
{
this.logger.LogInformation("Enqueuing GitHubRunCompleteMessage for {Owner}/{Repository} run {RunId}", owner, repository, runId);

var queueMessage = new RunCompleteQueueMessage
{
Owner = owner,
Repository = repository,
RunId = runId,
};

await this.queueClient.SendMessageAsync(JsonSerializer.Serialize(queueMessage));
}
else
{
this.logger.LogInformation("Skipping message for unknown repostory {Owner}/{Repository}", owner, repository);
}
}

return Ok();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,32 @@ public async Task ProcessAsync(string owner, string repository, long runId)
}
}

public async Task<string[]> GetRunBlobNamesAsync(string repository, DateTimeOffset minTime, DateTimeOffset maxTime, CancellationToken cancellationToken)
{
DateTimeOffset minDay = minTime.ToUniversalTime().Date;
DateTimeOffset maxDay = maxTime.ToUniversalTime().Date;

DateTimeOffset[] days = Enumerable.Range(0, (int)(maxDay - minDay).TotalDays + 1)
.Select(offset => minDay.AddDays(offset))
.ToArray();

List<string> blobNames = [];

foreach (DateTimeOffset day in days)
{
string blobPrefix = $"{repository}/{day:yyyy/MM/dd}/".ToLower();

AsyncPageable<BlobItem> blobs = this.runsContainerClient.GetBlobsAsync(prefix: blobPrefix, cancellationToken: cancellationToken);

await foreach (BlobItem blob in blobs)
{
blobNames.Add(blob.Name);
}
}

return blobNames.ToArray();
}

public string GetRunBlobName(WorkflowRun run)
{
string repository = run.Repository.FullName;
Expand Down
Loading