Skip to content
30 changes: 26 additions & 4 deletions src/Dapr.Jobs/DaprJobsGrpcClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ public override async Task ScheduleJobAsync(string jobName, DaprJobSchedule sche
DateTimeOffset? ttl = null, bool overwrite = false, IJobFailurePolicyOptions? failurePolicyOptions = null,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(jobName, nameof(jobName));
ArgumentNullException.ThrowIfNull(schedule, nameof(schedule));
ArgumentNullException.ThrowIfNull(jobName);
ArgumentNullException.ThrowIfNull(schedule);

var job = new Autogenerated.Job { Name = jobName };

Expand Down Expand Up @@ -202,13 +202,35 @@ internal static DaprJobDetails DeserializeJobResponse(Autogenerated.GetJobRespon
var schedule = DateTime.TryParse(response.Job.DueTime, out var dueTime)
? DaprJobSchedule.FromDateTime(dueTime)
: new DaprJobSchedule(response.Job.Schedule);


IFailurePolicyResponse? failurePolicyResponse = null;
if (response.Job.FailurePolicy?.PolicyCase is not null)
{
switch (response.Job.FailurePolicy.PolicyCase)
{
case Autogenerated.JobFailurePolicy.PolicyOneofCase.Drop:
failurePolicyResponse = new ConfiguredDropFailurePolicy();
break;
case Autogenerated.JobFailurePolicy.PolicyOneofCase.Constant:
failurePolicyResponse = new ConfiguredConstantFailurePolicy(
response.Job.FailurePolicy.Constant.HasMaxRetries,
(int)response.Job.FailurePolicy.Constant.MaxRetries,
response.Job.FailurePolicy.Constant.Interval.ToTimeSpan());
break;
case Autogenerated.JobFailurePolicy.PolicyOneofCase.None:
default:
failurePolicyResponse = null;
break;
}
}

return new DaprJobDetails(schedule)
{
DueTime = !string.IsNullOrWhiteSpace(response.Job.DueTime) ? DateTime.Parse(response.Job.DueTime) : null,
Ttl = !string.IsNullOrWhiteSpace(response.Job.Ttl) ? DateTime.Parse(response.Job.Ttl) : null,
RepeatCount = (int?)response.Job.Repeats ?? 0,
Payload = response.Job.Data?.ToByteArray() ?? null
Payload = response.Job.Data?.Value?.ToByteArray() ?? null,
FailurePolicy = failurePolicyResponse
};
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// ------------------------------------------------------------------------
// Copyright 2025 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------

namespace Dapr.Jobs.Models.Responses;

/// <summary>
/// Reflects the currently configured policy and values communicated by the Dapr runtime.
/// </summary>
/// <param name="HasMaxRetries">Whether the maximum retries specified by the policy have been reached.</param>
/// <param name="MaxRetries">The maximum number of retries allowed by the configured policy.</param>
/// <param name="Duration">The duration of the retry interval.</param>
public sealed record ConfiguredConstantFailurePolicy(
bool HasMaxRetries,
int? MaxRetries,
TimeSpan Duration) : IFailurePolicyResponse
{
/// <inheritdoc />
public JobFailurePolicy Type => JobFailurePolicy.Constant;
}
23 changes: 23 additions & 0 deletions src/Dapr.Jobs/Models/Responses/ConfiguredDropFailurePolicy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// ------------------------------------------------------------------------
// Copyright 2025 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------

namespace Dapr.Jobs.Models.Responses;

/// <summary>
/// Reflects the currently configured drop failure policy communicated by the Dapr runtime.
/// </summary>
public sealed record ConfiguredDropFailurePolicy : IFailurePolicyResponse
{
/// <inheritdoc />
public JobFailurePolicy Type => JobFailurePolicy.Drop;
}
2 changes: 1 addition & 1 deletion src/Dapr.Jobs/Models/Responses/DaprJobDetails.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,5 +52,5 @@ public sealed record DaprJobDetails(DaprJobSchedule Schedule)
/// <summary>
/// Defines the characteristics of the policy to apply when a job fails to trigger.
/// </summary>
public JobFailurePolicy? FailurePolicy { get; init; }
public IFailurePolicyResponse? FailurePolicy { get; init; }
}
25 changes: 25 additions & 0 deletions src/Dapr.Jobs/Models/Responses/IFailurePolicyResponse.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// ------------------------------------------------------------------------
// Copyright 2025 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------

namespace Dapr.Jobs.Models.Responses;

/// <summary>
/// Reflects a standard failure policy response.
/// </summary>
public interface IFailurePolicyResponse
{
/// <summary>
/// The type of policy represented by this type.
/// </summary>
JobFailurePolicy Type { get; }
}
145 changes: 145 additions & 0 deletions test/Dapr.IntegrationTest.Jobs/JobFailurePolicyTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// ------------------------------------------------------------------------
// Copyright 2025 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------

using Dapr.Jobs;
using Dapr.Jobs.Extensions;
using Dapr.Jobs.Models;
using Dapr.Jobs.Models.Responses;
using Dapr.TestContainers.Common;
using Dapr.TestContainers.Common.Options;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

namespace Dapr.E2E.Test.Jobs;

public sealed class JobFailurePolicyTests
{
[Fact]
public async Task ShouldScheduleJobWithDropFailurePolicy()
{
var options = new DaprRuntimeOptions();
var componentsDir = TestDirectoryManager.CreateTestDirectory("jobs-component");
var jobName = $"drop-policy-job-{Guid.NewGuid():N}";

var invocationTcs = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);

var harness = new DaprHarnessBuilder(options).BuildJobs(componentsDir);
await using var testApp = await DaprHarnessBuilder.ForHarness(harness)
.ConfigureServices(builder =>
{
builder.Services.AddDaprJobsClient(configure: (sp, clientBuilder) =>
{
var config = sp.GetRequiredService<IConfiguration>();
var grpcEndpoint = config["DAPR_GRPC_ENDPOINT"];
var httpEndpoint = config["DAPR_HTTP_ENDPOINT"];

if (!string.IsNullOrEmpty(grpcEndpoint))
clientBuilder.UseGrpcEndpoint(grpcEndpoint);
if (!string.IsNullOrEmpty(httpEndpoint))
clientBuilder.UseHttpEndpoint(httpEndpoint);
});
})
.ConfigureApp(app =>
{
app.MapDaprScheduledJobHandler((string incomingJobName, ReadOnlyMemory<byte> _,
ILogger<JobFailurePolicyTests>? logger, CancellationToken _) =>
{
logger?.LogInformation("Received job with drop failure policy {Job}", incomingJobName);
invocationTcs.TrySetResult(incomingJobName);
});
})
.BuildAndStartAsync();

using var scope = testApp.CreateScope();
var daprJobsClient = scope.ServiceProvider.GetRequiredService<DaprJobsClient>();

var dropPolicy = new JobFailurePolicyDropOptions();

await daprJobsClient.ScheduleJobAsync(jobName, DaprJobSchedule.FromDuration(TimeSpan.FromSeconds(2)),
failurePolicyOptions: dropPolicy, repeats: 1, overwrite: true);

var received = await invocationTcs.Task.WaitAsync(TimeSpan.FromSeconds(30));
Assert.Equal(jobName, received);

var ex = await Assert.ThrowsAnyAsync<DaprException>(() => daprJobsClient.GetJobAsync(jobName));
Assert.NotNull(ex.InnerException);
Assert.Contains("job not found", ex.InnerException.Message);
}

[Fact]
public async Task ShouldScheduleJobWithConstantFailurePolicy()
{
var options = new DaprRuntimeOptions();
var componentsDir = TestDirectoryManager.CreateTestDirectory("jobs-component");
var jobName = $"constant-policy-job-{Guid.NewGuid():N}";

var invocationTcs = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);

var harness = new DaprHarnessBuilder(options).BuildJobs(componentsDir);
await using var testApp = await DaprHarnessBuilder.ForHarness(harness)
.ConfigureServices(builder =>
{
builder.Services.AddDaprJobsClient(configure: (sp, clientBuilder) =>
{
var config = sp.GetRequiredService<IConfiguration>();
var grpcEndpoint = config["DAPR_GRPC_ENDPOINT"];
var httpEndpoint = config["DAPR_HTTP_ENDPOINT"];

if (!string.IsNullOrEmpty(grpcEndpoint))
clientBuilder.UseGrpcEndpoint(grpcEndpoint);
if (!string.IsNullOrEmpty(httpEndpoint))
clientBuilder.UseHttpEndpoint(httpEndpoint);
});
})
.ConfigureApp(app =>
{
app.MapDaprScheduledJobHandler((string incomingJobName, ReadOnlyMemory<byte> _,
ILogger<JobFailurePolicyTests>? logger, CancellationToken _) =>
{
logger?.LogInformation("Received job with constant failure policy {Job}", incomingJobName);
invocationTcs.TrySetResult(incomingJobName);
});
})
.BuildAndStartAsync();

using var scope = testApp.CreateScope();
var daprJobsClient = scope.ServiceProvider.GetRequiredService<DaprJobsClient>();

const int maxRetries = 3;
var constantPolicy = new JobFailurePolicyConstantOptions(TimeSpan.FromSeconds(5))
{
MaxRetries = maxRetries
};

await daprJobsClient.ScheduleJobAsync(jobName, DaprJobSchedule.FromDuration(TimeSpan.FromSeconds(2)),
failurePolicyOptions: constantPolicy, repeats: 10, overwrite: true);

var received = await invocationTcs.Task.WaitAsync(TimeSpan.FromSeconds(30));
Assert.Equal(jobName, received);

var jobDetails = await daprJobsClient.GetJobAsync(jobName);
Assert.NotNull(jobDetails.FailurePolicy);
Assert.Equal(JobFailurePolicy.Constant, jobDetails.FailurePolicy.Type);
if (jobDetails.FailurePolicy is ConfiguredConstantFailurePolicy failurePolicy)
{
Assert.True(failurePolicy.HasMaxRetries);
Assert.Equal(maxRetries, failurePolicy.MaxRetries);
Assert.Equal(TimeSpan.FromSeconds(5), failurePolicy.Duration);
}
else
{
Assert.Fail();
}
}
}
Loading