Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions sdk/eventhub/Azure.Messaging.EventHubs/stress/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,9 @@ description: Stress tests for Event Hubs for .NET

dependencies:
- name: stress-test-addons
version: 0.1.20
version: 0.2.0
repository: https://stresstestcharts.blob.core.windows.net/helm/

annotations:
stressTest: 'true'
namespace: 'net'
dockerbuilddir: '../../../..'
dockerfile: './Dockerfile'
6 changes: 3 additions & 3 deletions sdk/eventhub/Azure.Messaging.EventHubs/stress/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM mcr.microsoft.com/dotnet/sdk:6.0-cbl-mariner1.0 AS build-env
FROM mcr.microsoft.com/dotnet/sdk:7.0 AS build-env

# Copy in engineering system needed to build
COPY ./eng/ /app/eng/
Expand All @@ -23,9 +23,9 @@ RUN dotnet build './sdk/eventhub/Azure.Messaging.EventHubs/stress/src/' --config
# Copy in the dll files to be ready to run
FROM build-env as publish
WORKDIR /app
COPY --from=build-env /app/artifacts/bin/Azure.Messaging.EventHubs.Stress/Release/net6.0/ /app/artifacts/bin/Azure.Messaging.EventHubs.Stress/Release/net6.0/
COPY --from=build-env /app/artifacts/bin/Azure.Messaging.EventHubs.Stress/Release/net7.0/ /app/artifacts/bin/Azure.Messaging.EventHubs.Stress/Release/net7.0/

WORKDIR /app/artifacts/bin/Azure.Messaging.EventHubs.Stress/Release/net6.0
WORKDIR /app/artifacts/bin/Azure.Messaging.EventHubs.Stress/Release/net7.0

# The default is running just the "EventProducerTest"
ENTRYPOINT ["dotnet Azure.Messaging.EventHubs.Stress.dll", "--test"]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
scenarios:
- scenarioName: Consumer
imageBuildDir: ../../../..
Scenario: consumer
image: Dockerfile
imageTag: stresspgs7b6dif73rup6.azurecr.io/mredding/eventhub-net-stress/dockerfile:mredding
- scenarioName: EventProd
imageBuildDir: ../../../..
Scenario: eventprod
image: Dockerfile
imageTag: stresspgs7b6dif73rup6.azurecr.io/mredding/eventhub-net-stress/dockerfile:mredding
- scenarioName: BuffProd
imageBuildDir: ../../../..
Scenario: buffprod
image: Dockerfile
imageTag: stresspgs7b6dif73rup6.azurecr.io/mredding/eventhub-net-stress/dockerfile:mredding
- scenarioName: BurstBuffProd
imageBuildDir: ../../../..
Scenario: burstbuffprod
image: Dockerfile
imageTag: stresspgs7b6dif73rup6.azurecr.io/mredding/eventhub-net-stress/dockerfile:mredding
- scenarioName: Processor
imageBuildDir: ../../../..
Scenario: processor
image: Dockerfile
imageTag: stresspgs7b6dif73rup6.azurecr.io/mredding/eventhub-net-stress/dockerfile:mredding

Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
matrix:
scenarios:
consumer:
image: Dockerfile
imageBuildDir: "../../../.."
scenarioName: "Consumer"
eventprod:
image: Dockerfile
imageBuildDir: "../../../.."
scenarioName: "EventProd"
buffprod:
image: Dockerfile
imageBuildDir: "../../../.."
scenarioName: "BuffProd"
burstbuffprod:
image: Dockerfile
imageBuildDir: "../../../.."
scenarioName: "BurstBuffProd"
processor:
image: Dockerfile
imageBuildDir: "../../../.."
scenarioName: "Processor"
45 changes: 24 additions & 21 deletions sdk/eventhub/Azure.Messaging.EventHubs/stress/src/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,13 @@ private static async Task RunOptions(Options opts)
// test scenario runs are run in parallel.

var testScenarioTasks = new List<Task>();
var testsToRun = opts.All ? Enum.GetValues(typeof(TestScenario)) : new TestScenario[]{StringToTestScenario(opts.Test)};
var testsToRun = opts.All ? Enum.GetValues(typeof(TestScenarioName)) : new TestScenarioName[]{StringToTestScenario(opts.Test)};

var testParameters = new TestParameters();
testParameters.EventHubsConnectionString = eventHubsConnectionString;
var runAllRoles = !int.TryParse(opts.Role, out var roleIndex);
testParameters.JobIndex = roleIndex;
testParameters.RunAllRoles = runAllRoles;

var cancellationSource = new CancellationTokenSource();
var runDuration = TimeSpan.FromHours(testParameters.DurationInHours);
Expand All @@ -74,7 +77,7 @@ private static async Task RunOptions(Options opts)

try
{
foreach (TestScenario testScenario in testsToRun)
foreach (TestScenarioName testScenario in testsToRun)
{
var testName = testScenario.ToString();
metrics.Client.Context.GlobalProperties["TestName"] = testName;
Expand All @@ -86,31 +89,31 @@ private static async Task RunOptions(Options opts)

switch (testScenario)
{
case TestScenario.BufferedProducerTest:
case TestScenarioName.BufferedProducerTest:
environment.TryGetValue(EnvironmentVariables.EventHubBufferedProducerTest, out eventHubName);
testParameters.EventHub = PromptForResources("Event Hub", testName, eventHubName, opts.Interactive);

var bufferedProducerTest = new BufferedProducerTest(testParameters, metrics, opts.Role);
var bufferedProducerTest = new BufferedProducerTest(testParameters, metrics);
testScenarioTasks.Add(bufferedProducerTest.RunTestAsync(cancellationSource.Token));
break;

case TestScenario.BurstBufferedProducerTest:
case TestScenarioName.BurstBufferedProducerTest:
environment.TryGetValue(EnvironmentVariables.EventHubBurstBufferedProducerTest, out eventHubName);
testParameters.EventHub = PromptForResources("Event Hub", testName, eventHubName, opts.Interactive);

var burstBufferedProducerTest = new BurstBufferedProducerTest(testParameters, metrics, opts.Role);
var burstBufferedProducerTest = new BurstBufferedProducerTest(testParameters, metrics);
testScenarioTasks.Add(burstBufferedProducerTest.RunTestAsync(cancellationSource.Token));
break;

case TestScenario.EventProducerTest:
case TestScenarioName.EventProducerTest:
environment.TryGetValue(EnvironmentVariables.EventHubEventProducerTest, out eventHubName);
testParameters.EventHub = PromptForResources("Event Hub", testName, eventHubName, opts.Interactive);

var eventProducerTest = new EventProducerTest(testParameters, metrics, opts.Role);
var eventProducerTest = new EventProducerTest(testParameters, metrics);
testScenarioTasks.Add(eventProducerTest.RunTestAsync(cancellationSource.Token));
break;

case TestScenario.ProcessorTest:
case TestScenarioName.ProcessorTest:
// Get the Event Hub name for this test
environment.TryGetValue(EnvironmentVariables.EventHubProcessorTest, out eventHubName);
testParameters.EventHub = PromptForResources("Event Hub", testName, eventHubName, opts.Interactive);
Expand All @@ -123,15 +126,15 @@ private static async Task RunOptions(Options opts)
environment.TryGetValue(EnvironmentVariables.StorageAccountProcessorTest, out storageConnectionString);
testParameters.StorageConnectionString = PromptForResources("Storage Account Connection String", testName, storageConnectionString, opts.Interactive);

var processorTest = new ProcessorTest(testParameters, metrics, opts.Role);
var processorTest = new ProcessorTest(testParameters, metrics);
testScenarioTasks.Add(processorTest.RunTestAsync(cancellationSource.Token));
break;

case TestScenario.ConsumerTest:
case TestScenarioName.ConsumerTest:
environment.TryGetValue(EnvironmentVariables.EventHubBurstBufferedProducerTest, out eventHubName);
testParameters.EventHub = PromptForResources("Event Hub", testName, eventHubName, opts.Interactive);

var consumerTest = new ConsumerTest(testParameters, metrics, opts.Role);
var consumerTest = new ConsumerTest(testParameters, metrics);
testScenarioTasks.Add(consumerTest.RunTestAsync(cancellationSource.Token));
break;
}
Expand Down Expand Up @@ -175,20 +178,20 @@ private static async Task RunOptions(Options opts)
}

/// <summary>
/// Converts a string into a <see cref="TestScenario"/> value.
/// Converts a string into a <see cref="TestScenarioName"/> value.
/// </summary>
///
/// <param name="testScenario">The string to convert to a <see cref="TestScenario"/>.</param>
/// <param name="testScenario">The string to convert to a <see cref="TestScenarioName"/>.</param>
///
/// <returns>The <see cref="TestScenario"/> of the string input.</returns>
/// <returns>The <see cref="TestScenarioName"/> of the string input.</returns>
///
public static TestScenario StringToTestScenario(string testScenario) => testScenario switch
public static TestScenarioName StringToTestScenario(string testScenario) => testScenario switch
{
"BufferedProducerTest" or "BuffProd" => TestScenario.BufferedProducerTest,
"BurstBufferedProducerTest" or "BurstBuffProd" => TestScenario.BurstBufferedProducerTest,
"EventProducerTest" or "EventProd" => TestScenario.EventProducerTest,
"ProcessorTest" or "Processor" => TestScenario.ProcessorTest,
"ConsumerTest" or "Consumer" => TestScenario.ConsumerTest,
"BufferedProducerTest" or "BuffProd" => TestScenarioName.BufferedProducerTest,
"BurstBufferedProducerTest" or "BurstBuffProd" => TestScenarioName.BurstBufferedProducerTest,
"EventProducerTest" or "EventProd" => TestScenarioName.EventProducerTest,
"ProcessorTest" or "Processor" => TestScenarioName.ProcessorTest,
"ConsumerTest" or "Consumer" => TestScenarioName.ConsumerTest,
_ => throw new ArgumentNullException(),
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,13 @@ namespace Azure.Messaging.EventHubs.Stress;
/// The test scenario responsible for running all of the roles needed for the Buffered Producer test scenario.
/// <summary/>
///
public class BufferedProducerTest
public class BufferedProducerTest : TestScenario
{
/// <summary>The <see cref="TestParameters"/> used to configure this test scenario.</summary>
private readonly TestParameters _testParameters;

/// <summary>The index used to determine which role should be run if this is a distributed test run.</summary>
private readonly string _jobIndex;

/// <summary> The <see cref="Metrics"/> instance used to send metrics to application insights.</summary>
private Metrics _metrics;
/// <summary> The name of this test.</summary>
public override string Name { get; } = "BufferedProducerTest";

/// <summary> The array of <see cref="Role"/>s needed to run this test scenario.</summary>
private static Role[] _roles = {Role.BufferedPublisher, Role.BufferedPublisher};
public override Role[] Roles { get; } = {Role.BufferedPublisher, Role.BufferedPublisher};

/// <summary>
/// Initializes a new <see cref="BufferedProducerTest"/> instance.
Expand All @@ -35,60 +29,7 @@ public class BufferedProducerTest
/// <param name="jobIndex">An optional index used to determine which role should be run if this is a distributed run.</param>
///
public BufferedProducerTest(TestParameters testParameters,
Metrics metrics,
string jobIndex = default)
{
_testParameters = testParameters;
_jobIndex = jobIndex;
_metrics = metrics;
_metrics.Client.Context.GlobalProperties["TestRunID"] = $"net-buff-prod-{Guid.NewGuid().ToString()}";
}

/// <summary>
/// Runs all of the roles required for this instance of the Buffered Producer test scenario.
/// </summary>
///
/// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
///
public async Task RunTestAsync(CancellationToken cancellationToken)
Metrics metrics) : base(testParameters, metrics, $"net-buff-prod-{Guid.NewGuid().ToString()}")
{
var runAllRoles = !int.TryParse(_jobIndex, out var roleIndex);
var testRunTasks = new List<Task>();

if (runAllRoles)
{
foreach (Role role in _roles)
{
testRunTasks.Add(RunRoleAsync(role, cancellationToken));
}
}
else
{
testRunTasks.Add(RunRoleAsync(_roles[roleIndex], cancellationToken));
}

await Task.WhenAll(testRunTasks).ConfigureAwait(false);
}

/// <summary>
/// Creates a role instance and runs that role.
/// </summary>
///
/// <param name="role">The <see cref="Role"/> to run.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
///
private Task RunRoleAsync(Role role,
CancellationToken cancellationToken)
{
switch (role)
{
case Role.BufferedPublisher:
var publisherConfiguration = new BufferedPublisherConfiguration();
var publisher = new BufferedPublisher(_testParameters, publisherConfiguration, _metrics);
return Task.Run(() => publisher.RunAsync(cancellationToken));

default:
throw new NotSupportedException($"Running role { role.ToString() } is not supported by this test scenario.");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,14 @@ namespace Azure.Messaging.EventHubs.Stress;
/// The test scenario responsible for running all of the roles needed for the burst Buffered Producer test scenario.
/// <summary/>
///
public class BurstBufferedProducerTest
public class BurstBufferedProducerTest : TestScenario
{
/// <summary>The <see cref="TestParameters"/> used to configure this test scenario.</summary>
private readonly TestParameters _testParameters;

/// <summary>The index used to determine which role should be run if this is a distributed test run.</summary>
private readonly string _jobIndex;

/// <summary> The <see cref="Metrics"/> instance used to send metrics to application insights.</summary>
private Metrics _metrics;
/// <summary> The name of this test.</summary>
public override string Name { get; } = "BurstBufferedProducerTest";

/// <summary> The set of <see cref="Role"/>s needed to run this test scenario.</summary>

private static Role[] _roles = {Role.BufferedPublisher};
public override Role[] Roles { get; } = {Role.BufferedPublisher};

/// <summary>
/// Initializes a new <see cref="BurstBufferedProducerTest"/> instance.
Expand All @@ -36,60 +30,7 @@ public class BurstBufferedProducerTest
/// <param name="jobIndex">An optional index used to determine which role should be run if this is a distributed run.</param>
///
public BurstBufferedProducerTest(TestParameters testParameters,
Metrics metrics,
string jobIndex = default)
{
_testParameters = testParameters;
_jobIndex = jobIndex;
_metrics = metrics;
_metrics.Client.Context.GlobalProperties["TestRunID"] = $"net-burst-buff-{Guid.NewGuid().ToString()}";
}

/// <summary>
/// Runs all of the roles required for this instance of the burst Buffered Producer test scenario.
/// </summary>
///
/// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
///
public async Task RunTestAsync(CancellationToken cancellationToken)
Metrics metrics) : base(testParameters, metrics, $"net-burst-buff-{Guid.NewGuid().ToString()}")
{
var runAllRoles = !int.TryParse(_jobIndex, out var roleIndex);
var testRunTasks = new List<Task>();

if (runAllRoles)
{
foreach (Role role in _roles)
{
testRunTasks.Add(RunRoleAsync(role, cancellationToken));
}
}
else
{
testRunTasks.Add(RunRoleAsync(_roles[roleIndex], cancellationToken));
}

await Task.WhenAll(testRunTasks).ConfigureAwait(false);
}

/// <summary>
/// Creates a role instance and runs that role.
/// </summary>
///
/// <param name="role">The <see cref="Role"/> to run.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
///
private Task RunRoleAsync(Role role,
CancellationToken cancellationToken)
{
switch (role)
{
case Role.BufferedPublisher:
var publisherConfiguration = new BufferedPublisherConfiguration();
var publisher = new BufferedPublisher(_testParameters, publisherConfiguration, _metrics);
return Task.Run(() => publisher.RunAsync(cancellationToken));

default:
throw new NotSupportedException($"Running role { role.ToString() } is not supported by this test scenario.");
}
}
}
Loading