diff --git a/src/Dapr.Jobs/DaprJobsGrpcClient.cs b/src/Dapr.Jobs/DaprJobsGrpcClient.cs index 8f0e33fea..2b5683d79 100644 --- a/src/Dapr.Jobs/DaprJobsGrpcClient.cs +++ b/src/Dapr.Jobs/DaprJobsGrpcClient.cs @@ -45,8 +45,8 @@ public override async Task ScheduleJobAsync(string jobName, DaprJobSchedule sche DateTimeOffset? ttl = null, bool overwrite = false, IJobFailurePolicyOptions? failurePolicyOptions = null, CancellationToken cancellationToken = default) { - ArgumentNullException.ThrowIfNull(jobName, nameof(jobName)); - ArgumentNullException.ThrowIfNull(schedule, nameof(schedule)); + ArgumentNullException.ThrowIfNull(jobName); + ArgumentNullException.ThrowIfNull(schedule); var job = new Autogenerated.Job { Name = jobName }; @@ -202,13 +202,35 @@ internal static DaprJobDetails DeserializeJobResponse(Autogenerated.GetJobRespon var schedule = DateTime.TryParse(response.Job.DueTime, out var dueTime) ? DaprJobSchedule.FromDateTime(dueTime) : new DaprJobSchedule(response.Job.Schedule); - + + IFailurePolicyResponse? failurePolicyResponse = null; + if (response.Job.FailurePolicy?.PolicyCase is not null) + { + switch (response.Job.FailurePolicy.PolicyCase) + { + case Autogenerated.JobFailurePolicy.PolicyOneofCase.Drop: + failurePolicyResponse = new ConfiguredDropFailurePolicy(); + break; + case Autogenerated.JobFailurePolicy.PolicyOneofCase.Constant: + failurePolicyResponse = new ConfiguredConstantFailurePolicy( + response.Job.FailurePolicy.Constant.HasMaxRetries, + (int)response.Job.FailurePolicy.Constant.MaxRetries, + response.Job.FailurePolicy.Constant.Interval.ToTimeSpan()); + break; + case Autogenerated.JobFailurePolicy.PolicyOneofCase.None: + default: + failurePolicyResponse = null; + break; + } + } + return new DaprJobDetails(schedule) { DueTime = !string.IsNullOrWhiteSpace(response.Job.DueTime) ? DateTime.Parse(response.Job.DueTime) : null, Ttl = !string.IsNullOrWhiteSpace(response.Job.Ttl) ? DateTime.Parse(response.Job.Ttl) : null, RepeatCount = (int?)response.Job.Repeats ?? 0, - Payload = response.Job.Data?.ToByteArray() ?? null + Payload = response.Job.Data?.Value?.ToByteArray() ?? null, + FailurePolicy = failurePolicyResponse }; } diff --git a/src/Dapr.Jobs/Models/Responses/ConfiguredConstantFailurePolicy.cs b/src/Dapr.Jobs/Models/Responses/ConfiguredConstantFailurePolicy.cs new file mode 100644 index 000000000..8bd1722c5 --- /dev/null +++ b/src/Dapr.Jobs/Models/Responses/ConfiguredConstantFailurePolicy.cs @@ -0,0 +1,29 @@ +// ------------------------------------------------------------------------ +// Copyright 2025 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +namespace Dapr.Jobs.Models.Responses; + +/// +/// Reflects the currently configured policy and values communicated by the Dapr runtime. +/// +/// Whether the maximum retries specified by the policy have been reached. +/// The maximum number of retries allowed by the configured policy. +/// The duration of the retry interval. +public sealed record ConfiguredConstantFailurePolicy( + bool HasMaxRetries, + int? MaxRetries, + TimeSpan Duration) : IFailurePolicyResponse +{ + /// + public JobFailurePolicy Type => JobFailurePolicy.Constant; +} diff --git a/src/Dapr.Jobs/Models/Responses/ConfiguredDropFailurePolicy.cs b/src/Dapr.Jobs/Models/Responses/ConfiguredDropFailurePolicy.cs new file mode 100644 index 000000000..8a32ea1ee --- /dev/null +++ b/src/Dapr.Jobs/Models/Responses/ConfiguredDropFailurePolicy.cs @@ -0,0 +1,23 @@ +// ------------------------------------------------------------------------ +// Copyright 2025 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +namespace Dapr.Jobs.Models.Responses; + +/// +/// Reflects the currently configured drop failure policy communicated by the Dapr runtime. +/// +public sealed record ConfiguredDropFailurePolicy : IFailurePolicyResponse +{ + /// + public JobFailurePolicy Type => JobFailurePolicy.Drop; +} diff --git a/src/Dapr.Jobs/Models/Responses/DaprJobDetails.cs b/src/Dapr.Jobs/Models/Responses/DaprJobDetails.cs index 77911c7b6..fc0aebb5f 100644 --- a/src/Dapr.Jobs/Models/Responses/DaprJobDetails.cs +++ b/src/Dapr.Jobs/Models/Responses/DaprJobDetails.cs @@ -52,5 +52,5 @@ public sealed record DaprJobDetails(DaprJobSchedule Schedule) /// /// Defines the characteristics of the policy to apply when a job fails to trigger. /// - public JobFailurePolicy? FailurePolicy { get; init; } + public IFailurePolicyResponse? FailurePolicy { get; init; } } diff --git a/src/Dapr.Jobs/Models/Responses/IFailurePolicyResponse.cs b/src/Dapr.Jobs/Models/Responses/IFailurePolicyResponse.cs new file mode 100644 index 000000000..8f718b349 --- /dev/null +++ b/src/Dapr.Jobs/Models/Responses/IFailurePolicyResponse.cs @@ -0,0 +1,25 @@ +// ------------------------------------------------------------------------ +// Copyright 2025 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +namespace Dapr.Jobs.Models.Responses; + +/// +/// Reflects a standard failure policy response. +/// +public interface IFailurePolicyResponse +{ + /// + /// The type of policy represented by this type. + /// + JobFailurePolicy Type { get; } +} diff --git a/test/Dapr.IntegrationTest.Jobs/JobFailurePolicyTests.cs b/test/Dapr.IntegrationTest.Jobs/JobFailurePolicyTests.cs new file mode 100644 index 000000000..b63e3a06b --- /dev/null +++ b/test/Dapr.IntegrationTest.Jobs/JobFailurePolicyTests.cs @@ -0,0 +1,145 @@ +// ------------------------------------------------------------------------ +// Copyright 2025 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using Dapr.Jobs; +using Dapr.Jobs.Extensions; +using Dapr.Jobs.Models; +using Dapr.Jobs.Models.Responses; +using Dapr.TestContainers.Common; +using Dapr.TestContainers.Common.Options; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; + +namespace Dapr.E2E.Test.Jobs; + +public sealed class JobFailurePolicyTests +{ + [Fact] + public async Task ShouldScheduleJobWithDropFailurePolicy() + { + var options = new DaprRuntimeOptions(); + var componentsDir = TestDirectoryManager.CreateTestDirectory("jobs-component"); + var jobName = $"drop-policy-job-{Guid.NewGuid():N}"; + + var invocationTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + var harness = new DaprHarnessBuilder(options).BuildJobs(componentsDir); + await using var testApp = await DaprHarnessBuilder.ForHarness(harness) + .ConfigureServices(builder => + { + builder.Services.AddDaprJobsClient(configure: (sp, clientBuilder) => + { + var config = sp.GetRequiredService(); + var grpcEndpoint = config["DAPR_GRPC_ENDPOINT"]; + var httpEndpoint = config["DAPR_HTTP_ENDPOINT"]; + + if (!string.IsNullOrEmpty(grpcEndpoint)) + clientBuilder.UseGrpcEndpoint(grpcEndpoint); + if (!string.IsNullOrEmpty(httpEndpoint)) + clientBuilder.UseHttpEndpoint(httpEndpoint); + }); + }) + .ConfigureApp(app => + { + app.MapDaprScheduledJobHandler((string incomingJobName, ReadOnlyMemory _, + ILogger? logger, CancellationToken _) => + { + logger?.LogInformation("Received job with drop failure policy {Job}", incomingJobName); + invocationTcs.TrySetResult(incomingJobName); + }); + }) + .BuildAndStartAsync(); + + using var scope = testApp.CreateScope(); + var daprJobsClient = scope.ServiceProvider.GetRequiredService(); + + var dropPolicy = new JobFailurePolicyDropOptions(); + + await daprJobsClient.ScheduleJobAsync(jobName, DaprJobSchedule.FromDuration(TimeSpan.FromSeconds(2)), + failurePolicyOptions: dropPolicy, repeats: 1, overwrite: true); + + var received = await invocationTcs.Task.WaitAsync(TimeSpan.FromSeconds(30)); + Assert.Equal(jobName, received); + + var ex = await Assert.ThrowsAnyAsync(() => daprJobsClient.GetJobAsync(jobName)); + Assert.NotNull(ex.InnerException); + Assert.Contains("job not found", ex.InnerException.Message); + } + + [Fact] + public async Task ShouldScheduleJobWithConstantFailurePolicy() + { + var options = new DaprRuntimeOptions(); + var componentsDir = TestDirectoryManager.CreateTestDirectory("jobs-component"); + var jobName = $"constant-policy-job-{Guid.NewGuid():N}"; + + var invocationTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + var harness = new DaprHarnessBuilder(options).BuildJobs(componentsDir); + await using var testApp = await DaprHarnessBuilder.ForHarness(harness) + .ConfigureServices(builder => + { + builder.Services.AddDaprJobsClient(configure: (sp, clientBuilder) => + { + var config = sp.GetRequiredService(); + var grpcEndpoint = config["DAPR_GRPC_ENDPOINT"]; + var httpEndpoint = config["DAPR_HTTP_ENDPOINT"]; + + if (!string.IsNullOrEmpty(grpcEndpoint)) + clientBuilder.UseGrpcEndpoint(grpcEndpoint); + if (!string.IsNullOrEmpty(httpEndpoint)) + clientBuilder.UseHttpEndpoint(httpEndpoint); + }); + }) + .ConfigureApp(app => + { + app.MapDaprScheduledJobHandler((string incomingJobName, ReadOnlyMemory _, + ILogger? logger, CancellationToken _) => + { + logger?.LogInformation("Received job with constant failure policy {Job}", incomingJobName); + invocationTcs.TrySetResult(incomingJobName); + }); + }) + .BuildAndStartAsync(); + + using var scope = testApp.CreateScope(); + var daprJobsClient = scope.ServiceProvider.GetRequiredService(); + + const int maxRetries = 3; + var constantPolicy = new JobFailurePolicyConstantOptions(TimeSpan.FromSeconds(5)) + { + MaxRetries = maxRetries + }; + + await daprJobsClient.ScheduleJobAsync(jobName, DaprJobSchedule.FromDuration(TimeSpan.FromSeconds(2)), + failurePolicyOptions: constantPolicy, repeats: 10, overwrite: true); + + var received = await invocationTcs.Task.WaitAsync(TimeSpan.FromSeconds(30)); + Assert.Equal(jobName, received); + + var jobDetails = await daprJobsClient.GetJobAsync(jobName); + Assert.NotNull(jobDetails.FailurePolicy); + Assert.Equal(JobFailurePolicy.Constant, jobDetails.FailurePolicy.Type); + if (jobDetails.FailurePolicy is ConfiguredConstantFailurePolicy failurePolicy) + { + Assert.True(failurePolicy.HasMaxRetries); + Assert.Equal(maxRetries, failurePolicy.MaxRetries); + Assert.Equal(TimeSpan.FromSeconds(5), failurePolicy.Duration); + } + else + { + Assert.Fail(); + } + } +} diff --git a/test/Dapr.IntegrationTest.Jobs/JobManagementTests.cs b/test/Dapr.IntegrationTest.Jobs/JobManagementTests.cs new file mode 100644 index 000000000..583343da4 --- /dev/null +++ b/test/Dapr.IntegrationTest.Jobs/JobManagementTests.cs @@ -0,0 +1,197 @@ +// ------------------------------------------------------------------------ +// Copyright 2025 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using System.Text; +using Dapr.Jobs; +using Dapr.Jobs.Extensions; +using Dapr.Jobs.Models; +using Dapr.TestContainers.Common; +using Dapr.TestContainers.Common.Options; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; + +namespace Dapr.E2E.Test.Jobs; + +public sealed class JobManagementTests +{ + [Fact] + public async Task ShouldGetJobDetails() + { + var options = new DaprRuntimeOptions(); + var componentsDir = TestDirectoryManager.CreateTestDirectory("jobs-component"); + var jobName = $"get-job-{Guid.NewGuid():N}"; + + var harness = new DaprHarnessBuilder(options).BuildJobs(componentsDir); + await using var testApp = await DaprHarnessBuilder.ForHarness(harness) + .ConfigureServices(builder => + { + builder.Services.AddDaprJobsClient(configure: (sp, clientBuilder) => + { + var config = sp.GetRequiredService(); + var grpcEndpoint = config["DAPR_GRPC_ENDPOINT"]; + var httpEndpoint = config["DAPR_HTTP_ENDPOINT"]; + + if (!string.IsNullOrEmpty(grpcEndpoint)) + clientBuilder.UseGrpcEndpoint(grpcEndpoint); + if (!string.IsNullOrEmpty(httpEndpoint)) + clientBuilder.UseHttpEndpoint(httpEndpoint); + }); + }) + .BuildAndStartAsync(); + + using var scope = testApp.CreateScope(); + var daprJobsClient = scope.ServiceProvider.GetRequiredService(); + + var payload = "Test Payload"u8.ToArray(); + var schedule = DaprJobSchedule.FromDuration(TimeSpan.FromSeconds(10)); + var startingFrom = DateTimeOffset.UtcNow.AddMinutes(1); + await daprJobsClient.ScheduleJobAsync(jobName, schedule, payload, startingFrom: startingFrom, + repeats: 10, overwrite: true); + var expected = startingFrom.ToLocalTime(); + + var jobDetails = await daprJobsClient.GetJobAsync(jobName); + + Assert.NotNull(jobDetails); + Assert.Equal(expected.ToString("O"), jobDetails.Schedule.ExpressionValue); + Assert.Equal(10, jobDetails.RepeatCount); + Assert.NotNull(jobDetails.Payload); + Assert.Equal(Encoding.UTF8.GetString(payload), Encoding.UTF8.GetString(jobDetails.Payload)); + } + + [Fact] + public async Task ShouldDeleteJob() + { + var options = new DaprRuntimeOptions(); + var componentsDir = TestDirectoryManager.CreateTestDirectory("jobs-component"); + var jobName = $"delete-job-{Guid.NewGuid():N}"; + + var harness = new DaprHarnessBuilder(options).BuildJobs(componentsDir); + await using var testApp = await DaprHarnessBuilder.ForHarness(harness) + .ConfigureServices(builder => + { + builder.Services.AddDaprJobsClient(configure: (sp, clientBuilder) => + { + var config = sp.GetRequiredService(); + var grpcEndpoint = config["DAPR_GRPC_ENDPOINT"]; + var httpEndpoint = config["DAPR_HTTP_ENDPOINT"]; + + if (!string.IsNullOrEmpty(grpcEndpoint)) + clientBuilder.UseGrpcEndpoint(grpcEndpoint); + if (!string.IsNullOrEmpty(httpEndpoint)) + clientBuilder.UseHttpEndpoint(httpEndpoint); + }); + }) + .BuildAndStartAsync(); + + using var scope = testApp.CreateScope(); + var daprJobsClient = scope.ServiceProvider.GetRequiredService(); + + await daprJobsClient.ScheduleJobAsync(jobName, DaprJobSchedule.FromDuration(TimeSpan.FromHours(1)), + overwrite: true); + + var jobDetails = await daprJobsClient.GetJobAsync(jobName); + Assert.NotNull(jobDetails); + + await daprJobsClient.DeleteJobAsync(jobName); + + await Assert.ThrowsAsync(async () => + await daprJobsClient.GetJobAsync(jobName)); + } + + [Fact] + public async Task ShouldOverwriteExistingJob() + { + var options = new DaprRuntimeOptions(); + var componentsDir = TestDirectoryManager.CreateTestDirectory("jobs-component"); + var jobName = $"overwrite-job-{Guid.NewGuid():N}"; + + var harness = new DaprHarnessBuilder(options).BuildJobs(componentsDir); + await using var testApp = await DaprHarnessBuilder.ForHarness(harness) + .ConfigureServices(builder => + { + builder.Services.AddDaprJobsClient(configure: (sp, clientBuilder) => + { + var config = sp.GetRequiredService(); + var grpcEndpoint = config["DAPR_GRPC_ENDPOINT"]; + var httpEndpoint = config["DAPR_HTTP_ENDPOINT"]; + + if (!string.IsNullOrEmpty(grpcEndpoint)) + clientBuilder.UseGrpcEndpoint(grpcEndpoint); + if (!string.IsNullOrEmpty(httpEndpoint)) + clientBuilder.UseHttpEndpoint(httpEndpoint); + }); + }) + .BuildAndStartAsync(); + + using var scope = testApp.CreateScope(); + var daprJobsClient = scope.ServiceProvider.GetRequiredService(); + + var originalPayload = "Original"u8.ToArray(); + await daprJobsClient.ScheduleJobAsync(jobName, DaprJobSchedule.FromDuration(TimeSpan.FromHours(1)), + originalPayload, repeats: 5, overwrite: true); + + var originalDetails = await daprJobsClient.GetJobAsync(jobName); + Assert.Equal(5, originalDetails.RepeatCount); + + var newPayload = "Updated"u8.ToArray(); + await daprJobsClient.ScheduleJobAsync(jobName, DaprJobSchedule.FromDuration(TimeSpan.FromMinutes(30)), + newPayload, repeats: 10, overwrite: true); + + var updatedDetails = await daprJobsClient.GetJobAsync(jobName); + Assert.Equal(10, updatedDetails.RepeatCount); + Assert.Equal(Encoding.UTF8.GetString(newPayload), Encoding.UTF8.GetString(updatedDetails.Payload!)); + + await daprJobsClient.DeleteJobAsync(jobName); + } + + [Fact] + public async Task ShouldScheduleJobWithTTL() + { + var options = new DaprRuntimeOptions(); + var componentsDir = TestDirectoryManager.CreateTestDirectory("jobs-component"); + var jobName = $"ttl-job-{Guid.NewGuid():N}"; + + var harness = new DaprHarnessBuilder(options).BuildJobs(componentsDir); + await using var testApp = await DaprHarnessBuilder.ForHarness(harness) + .ConfigureServices(builder => + { + builder.Services.AddDaprJobsClient(configure: (sp, clientBuilder) => + { + var config = sp.GetRequiredService(); + var grpcEndpoint = config["DAPR_GRPC_ENDPOINT"]; + var httpEndpoint = config["DAPR_HTTP_ENDPOINT"]; + + if (!string.IsNullOrEmpty(grpcEndpoint)) + clientBuilder.UseGrpcEndpoint(grpcEndpoint); + if (!string.IsNullOrEmpty(httpEndpoint)) + clientBuilder.UseHttpEndpoint(httpEndpoint); + }); + }) + .BuildAndStartAsync(); + + using var scope = testApp.CreateScope(); + var daprJobsClient = scope.ServiceProvider.GetRequiredService(); + + var startTime = DateTimeOffset.UtcNow.AddSeconds(2); + var ttl = DateTimeOffset.UtcNow.AddMinutes(5); + + await daprJobsClient.ScheduleJobAsync(jobName, DaprJobSchedule.FromDuration(TimeSpan.FromSeconds(1)), + startingFrom: startTime, ttl: ttl, repeats: 100, overwrite: true); + + var jobDetails = await daprJobsClient.GetJobAsync(jobName); + Assert.NotNull(jobDetails); + Assert.NotNull(jobDetails.Ttl); + + await daprJobsClient.DeleteJobAsync(jobName); + } +} diff --git a/test/Dapr.IntegrationTest.Jobs/JobPayloadTests.cs b/test/Dapr.IntegrationTest.Jobs/JobPayloadTests.cs new file mode 100644 index 000000000..963dd68ab --- /dev/null +++ b/test/Dapr.IntegrationTest.Jobs/JobPayloadTests.cs @@ -0,0 +1,225 @@ +// ------------------------------------------------------------------------ +// Copyright 2025 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using System.Text; +using System.Text.Json; +using Dapr.Jobs; +using Dapr.Jobs.Extensions; +using Dapr.Jobs.Models; +using Dapr.TestContainers.Common; +using Dapr.TestContainers.Common.Options; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; + +namespace Dapr.E2E.Test.Jobs; + +public sealed class JobPayloadTests +{ + [Fact] + public async Task ShouldHandleEmptyPayload() + { + var options = new DaprRuntimeOptions(); + var componentsDir = TestDirectoryManager.CreateTestDirectory("jobs-component"); + var jobName = $"empty-payload-job-{Guid.NewGuid():N}"; + + var invocationTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + var harness = new DaprHarnessBuilder(options).BuildJobs(componentsDir); + await using var testApp = await DaprHarnessBuilder.ForHarness(harness) + .ConfigureServices(builder => + { + builder.Services.AddDaprJobsClient(configure: (sp, clientBuilder) => + { + var config = sp.GetRequiredService(); + var grpcEndpoint = config["DAPR_GRPC_ENDPOINT"]; + var httpEndpoint = config["DAPR_HTTP_ENDPOINT"]; + + if (!string.IsNullOrEmpty(grpcEndpoint)) + clientBuilder.UseGrpcEndpoint(grpcEndpoint); + if (!string.IsNullOrEmpty(httpEndpoint)) + clientBuilder.UseHttpEndpoint(httpEndpoint); + }); + }) + .ConfigureApp(app => + { + app.MapDaprScheduledJobHandler((string incomingJobName, ReadOnlyMemory payload, + ILogger? logger, CancellationToken _) => + { + logger?.LogInformation("Received job with empty payload {Job}", incomingJobName); + invocationTcs.TrySetResult(payload.IsEmpty); + }); + }) + .BuildAndStartAsync(); + + using var scope = testApp.CreateScope(); + var daprJobsClient = scope.ServiceProvider.GetRequiredService(); + + await daprJobsClient.ScheduleJobAsync(jobName, DaprJobSchedule.FromDuration(TimeSpan.FromSeconds(2)), + payload: null, repeats: 1, overwrite: true); + + var isEmpty = await invocationTcs.Task.WaitAsync(TimeSpan.FromSeconds(30)); + Assert.True(isEmpty); + } + + [Fact] + public async Task ShouldHandleJsonPayload() + { + var options = new DaprRuntimeOptions(); + var componentsDir = TestDirectoryManager.CreateTestDirectory("jobs-component"); + var jobName = $"json-payload-job-{Guid.NewGuid():N}"; + + var invocationTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + var harness = new DaprHarnessBuilder(options).BuildJobs(componentsDir); + await using var testApp = await DaprHarnessBuilder.ForHarness(harness) + .ConfigureServices(builder => + { + builder.Services.AddDaprJobsClient(configure: (sp, clientBuilder) => + { + var config = sp.GetRequiredService(); + var grpcEndpoint = config["DAPR_GRPC_ENDPOINT"]; + var httpEndpoint = config["DAPR_HTTP_ENDPOINT"]; + + if (!string.IsNullOrEmpty(grpcEndpoint)) + clientBuilder.UseGrpcEndpoint(grpcEndpoint); + if (!string.IsNullOrEmpty(httpEndpoint)) + clientBuilder.UseHttpEndpoint(httpEndpoint); + }); + }) + .ConfigureApp(app => + { + app.MapDaprScheduledJobHandler((string incomingJobName, ReadOnlyMemory payload, + ILogger? logger, CancellationToken _) => + { + logger?.LogInformation("Received job with JSON payload {Job}", incomingJobName); + var payloadStr = Encoding.UTF8.GetString(payload.Span); + var deserializedPayload = JsonSerializer.Deserialize(payloadStr); + invocationTcs.TrySetResult(deserializedPayload!); + }); + }) + .BuildAndStartAsync(); + + using var scope = testApp.CreateScope(); + var daprJobsClient = scope.ServiceProvider.GetRequiredService(); + + var testPayload = new TestPayload("Test Message", 42, DateTimeOffset.UtcNow); + var jsonPayload = JsonSerializer.SerializeToUtf8Bytes(testPayload); + + await daprJobsClient.ScheduleJobAsync(jobName, DaprJobSchedule.FromDuration(TimeSpan.FromSeconds(2)), + jsonPayload, repeats: 1, overwrite: true); + + var received = await invocationTcs.Task.WaitAsync(TimeSpan.FromSeconds(30)); + Assert.Equal(testPayload.Message, received.Message); + Assert.Equal(testPayload.Value, received.Value); + } + + [Fact] + public async Task ShouldHandleLargePayload() + { + var options = new DaprRuntimeOptions(); + var componentsDir = TestDirectoryManager.CreateTestDirectory("jobs-component"); + var jobName = $"large-payload-job-{Guid.NewGuid():N}"; + + var invocationTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + var harness = new DaprHarnessBuilder(options).BuildJobs(componentsDir); + await using var testApp = await DaprHarnessBuilder.ForHarness(harness) + .ConfigureServices(builder => + { + builder.Services.AddDaprJobsClient(configure: (sp, clientBuilder) => + { + var config = sp.GetRequiredService(); + var grpcEndpoint = config["DAPR_GRPC_ENDPOINT"]; + var httpEndpoint = config["DAPR_HTTP_ENDPOINT"]; + + if (!string.IsNullOrEmpty(grpcEndpoint)) + clientBuilder.UseGrpcEndpoint(grpcEndpoint); + if (!string.IsNullOrEmpty(httpEndpoint)) + clientBuilder.UseHttpEndpoint(httpEndpoint); + }); + }) + .ConfigureApp(app => + { + app.MapDaprScheduledJobHandler((string incomingJobName, ReadOnlyMemory payload, + ILogger? logger, CancellationToken _) => + { + logger?.LogInformation("Received job with large payload {Job}", incomingJobName); + invocationTcs.TrySetResult(payload.Length); + }); + }) + .BuildAndStartAsync(); + + using var scope = testApp.CreateScope(); + var daprJobsClient = scope.ServiceProvider.GetRequiredService(); + + var largePayload = new byte[10000]; + new Random().NextBytes(largePayload); + + await daprJobsClient.ScheduleJobAsync(jobName, DaprJobSchedule.FromDuration(TimeSpan.FromSeconds(2)), + largePayload, repeats: 1, overwrite: true); + + var receivedSize = await invocationTcs.Task.WaitAsync(TimeSpan.FromSeconds(30)); + Assert.Equal(largePayload.Length, receivedSize); + } + + [Fact] + public async Task ShouldHandleBinaryPayload() + { + var options = new DaprRuntimeOptions(); + var componentsDir = TestDirectoryManager.CreateTestDirectory("jobs-component"); + var jobName = $"binary-payload-job-{Guid.NewGuid():N}"; + + var invocationTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + var harness = new DaprHarnessBuilder(options).BuildJobs(componentsDir); + await using var testApp = await DaprHarnessBuilder.ForHarness(harness) + .ConfigureServices(builder => + { + builder.Services.AddDaprJobsClient(configure: (sp, clientBuilder) => + { + var config = sp.GetRequiredService(); + var grpcEndpoint = config["DAPR_GRPC_ENDPOINT"]; + var httpEndpoint = config["DAPR_HTTP_ENDPOINT"]; + + if (!string.IsNullOrEmpty(grpcEndpoint)) + clientBuilder.UseGrpcEndpoint(grpcEndpoint); + if (!string.IsNullOrEmpty(httpEndpoint)) + clientBuilder.UseHttpEndpoint(httpEndpoint); + }); + }) + .ConfigureApp(app => + { + app.MapDaprScheduledJobHandler((string incomingJobName, ReadOnlyMemory payload, + ILogger? logger, CancellationToken _) => + { + logger?.LogInformation("Received job with binary payload {Job}", incomingJobName); + invocationTcs.TrySetResult(payload.ToArray()); + }); + }) + .BuildAndStartAsync(); + + using var scope = testApp.CreateScope(); + var daprJobsClient = scope.ServiceProvider.GetRequiredService(); + + var binaryPayload = new byte[] { 0x00, 0xFF, 0x42, 0xAB, 0xCD, 0xEF }; + + await daprJobsClient.ScheduleJobAsync(jobName, DaprJobSchedule.FromDuration(TimeSpan.FromSeconds(2)), + binaryPayload, repeats: 1, overwrite: true); + + var received = await invocationTcs.Task.WaitAsync(TimeSpan.FromSeconds(30)); + Assert.Equal(binaryPayload, received); + } + + private record TestPayload(string Message, int Value, DateTimeOffset Timestamp); +} diff --git a/test/Dapr.IntegrationTest.Jobs/JobSchedulingTests.cs b/test/Dapr.IntegrationTest.Jobs/JobSchedulingTests.cs new file mode 100644 index 000000000..931b0adf9 --- /dev/null +++ b/test/Dapr.IntegrationTest.Jobs/JobSchedulingTests.cs @@ -0,0 +1,267 @@ +// ------------------------------------------------------------------------ +// Copyright 2025 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using Dapr.Jobs; +using Dapr.Jobs.Extensions; +using Dapr.Jobs.Models; +using Dapr.TestContainers.Common; +using Dapr.TestContainers.Common.Options; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; + +namespace Dapr.E2E.Test.Jobs; + +public sealed class JobSchedulingTests +{ + [Fact] + public async Task ShouldScheduleJobWithCronExpression() + { + var options = new DaprRuntimeOptions(); + var componentsDir = TestDirectoryManager.CreateTestDirectory("jobs-component"); + var jobName = $"cron-job-{Guid.NewGuid():N}"; + + var invocationTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + var harness = new DaprHarnessBuilder(options).BuildJobs(componentsDir); + await using var testApp = await DaprHarnessBuilder.ForHarness(harness) + .ConfigureServices(builder => + { + builder.Services.AddDaprJobsClient(configure: (sp, clientBuilder) => + { + var config = sp.GetRequiredService(); + var grpcEndpoint = config["DAPR_GRPC_ENDPOINT"]; + var httpEndpoint = config["DAPR_HTTP_ENDPOINT"]; + + if (!string.IsNullOrEmpty(grpcEndpoint)) + clientBuilder.UseGrpcEndpoint(grpcEndpoint); + if (!string.IsNullOrEmpty(httpEndpoint)) + clientBuilder.UseHttpEndpoint(httpEndpoint); + }); + }) + .ConfigureApp(app => + { + app.MapDaprScheduledJobHandler((string incomingJobName, ReadOnlyMemory _, + ILogger? logger, CancellationToken _) => + { + logger?.LogInformation("Received cron job {Job}", incomingJobName); + invocationTcs.TrySetResult(incomingJobName); + }); + }) + .BuildAndStartAsync(); + + using var scope = testApp.CreateScope(); + var daprJobsClient = scope.ServiceProvider.GetRequiredService(); + + var cronSchedule = new CronExpressionBuilder() + .Every(EveryCronPeriod.Second, 15); + + await daprJobsClient.ScheduleJobAsync(jobName, DaprJobSchedule.FromCronExpression(cronSchedule), + repeats: 1, overwrite: true); + + var received = await invocationTcs.Task.WaitAsync(TimeSpan.FromSeconds(30)); + Assert.Equal(jobName, received); + } + + // Too long of a test + // [Fact] + // public async Task ShouldScheduleJobWithPrefixedPeriod() + // { + // var options = new DaprRuntimeOptions(); + // var componentsDir = TestDirectoryManager.CreateTestDirectory("jobs-component"); + // var jobName = $"hourly-job-{Guid.NewGuid():N}"; + // + // var invocationTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + // + // var harness = new DaprHarnessBuilder(options).BuildJobs(componentsDir); + // await using var testApp = await DaprHarnessBuilder.ForHarness(harness) + // .ConfigureServices(builder => + // { + // builder.Services.AddDaprJobsClient(configure: (sp, clientBuilder) => + // { + // var config = sp.GetRequiredService(); + // var grpcEndpoint = config["DAPR_GRPC_ENDPOINT"]; + // var httpEndpoint = config["DAPR_HTTP_ENDPOINT"]; + // + // if (!string.IsNullOrEmpty(grpcEndpoint)) + // clientBuilder.UseGrpcEndpoint(grpcEndpoint); + // if (!string.IsNullOrEmpty(httpEndpoint)) + // clientBuilder.UseHttpEndpoint(httpEndpoint); + // }); + // }) + // .ConfigureApp(app => + // { + // app.MapDaprScheduledJobHandler((string incomingJobName, ReadOnlyMemory payload, + // ILogger? logger, CancellationToken _) => + // { + // logger?.LogInformation("Received prefixed period job {Job}", incomingJobName); + // invocationTcs.TrySetResult(incomingJobName); + // }); + // }) + // .BuildAndStartAsync(); + // + // using var scope = testApp.CreateScope(); + // var daprJobsClient = scope.ServiceProvider.GetRequiredService(); + // + // await daprJobsClient.ScheduleJobAsync(jobName, DaprJobSchedule.Hourly, + // repeats: 1, overwrite: true); + // + // var received = await invocationTcs.Task.WaitAsync(TimeSpan.FromSeconds(30)); + // Assert.Equal(jobName, received); + // } + + [Fact] + public async Task ShouldScheduleJobWithDateTime() + { + var options = new DaprRuntimeOptions(); + var componentsDir = TestDirectoryManager.CreateTestDirectory("jobs-component"); + var jobName = $"datetime-job-{Guid.NewGuid():N}"; + + var invocationTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + var harness = new DaprHarnessBuilder(options).BuildJobs(componentsDir); + await using var testApp = await DaprHarnessBuilder.ForHarness(harness) + .ConfigureServices(builder => + { + builder.Services.AddDaprJobsClient(configure: (sp, clientBuilder) => + { + var config = sp.GetRequiredService(); + var grpcEndpoint = config["DAPR_GRPC_ENDPOINT"]; + var httpEndpoint = config["DAPR_HTTP_ENDPOINT"]; + + if (!string.IsNullOrEmpty(grpcEndpoint)) + clientBuilder.UseGrpcEndpoint(grpcEndpoint); + if (!string.IsNullOrEmpty(httpEndpoint)) + clientBuilder.UseHttpEndpoint(httpEndpoint); + }); + }) + .ConfigureApp(app => + { + app.MapDaprScheduledJobHandler((string incomingJobName, ReadOnlyMemory _, + ILogger? logger, CancellationToken _) => + { + logger?.LogInformation("Received datetime job {Job}", incomingJobName); + invocationTcs.TrySetResult(incomingJobName); + }); + }) + .BuildAndStartAsync(); + + using var scope = testApp.CreateScope(); + var daprJobsClient = scope.ServiceProvider.GetRequiredService(); + + var scheduledTime = DateTimeOffset.UtcNow.AddSeconds(5); + await daprJobsClient.ScheduleJobAsync(jobName, DaprJobSchedule.FromDateTime(scheduledTime), + overwrite: true); + + var received = await invocationTcs.Task.WaitAsync(TimeSpan.FromSeconds(30)); + Assert.Equal(jobName, received); + } + + [Fact] + public async Task ShouldScheduleJobWithStartingFrom() + { + var options = new DaprRuntimeOptions(); + var componentsDir = TestDirectoryManager.CreateTestDirectory("jobs-component"); + var jobName = $"startingfrom-job-{Guid.NewGuid():N}"; + + var invocationTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + var harness = new DaprHarnessBuilder(options).BuildJobs(componentsDir); + await using var testApp = await DaprHarnessBuilder.ForHarness(harness) + .ConfigureServices(builder => + { + builder.Services.AddDaprJobsClient(configure: (sp, clientBuilder) => + { + var config = sp.GetRequiredService(); + var grpcEndpoint = config["DAPR_GRPC_ENDPOINT"]; + var httpEndpoint = config["DAPR_HTTP_ENDPOINT"]; + + if (!string.IsNullOrEmpty(grpcEndpoint)) + clientBuilder.UseGrpcEndpoint(grpcEndpoint); + if (!string.IsNullOrEmpty(httpEndpoint)) + clientBuilder.UseHttpEndpoint(httpEndpoint); + }); + }) + .ConfigureApp(app => + { + app.MapDaprScheduledJobHandler((string incomingJobName, ReadOnlyMemory _, + ILogger? logger, CancellationToken _) => + { + logger?.LogInformation("Received job with startingFrom {Job}", incomingJobName); + invocationTcs.TrySetResult(incomingJobName); + }); + }) + .BuildAndStartAsync(); + + using var scope = testApp.CreateScope(); + var daprJobsClient = scope.ServiceProvider.GetRequiredService(); + + var startTime = DateTimeOffset.UtcNow.AddSeconds(5); + await daprJobsClient.ScheduleJobAsync(jobName, DaprJobSchedule.FromDuration(TimeSpan.FromSeconds(1)), + startingFrom: startTime, repeats: 1, overwrite: true); + + var received = await invocationTcs.Task.WaitAsync(TimeSpan.FromSeconds(30)); + Assert.Equal(jobName, received); + } + + [Fact] + public async Task ShouldScheduleMultipleRepeatingJob() + { + var options = new DaprRuntimeOptions(); + var componentsDir = TestDirectoryManager.CreateTestDirectory("jobs-component"); + var jobName = $"repeating-job-{Guid.NewGuid():N}"; + + var receivedCount = 0; + var invocationTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + var harness = new DaprHarnessBuilder(options).BuildJobs(componentsDir); + await using var testApp = await DaprHarnessBuilder.ForHarness(harness) + .ConfigureServices(builder => + { + builder.Services.AddDaprJobsClient(configure: (sp, clientBuilder) => + { + var config = sp.GetRequiredService(); + var grpcEndpoint = config["DAPR_GRPC_ENDPOINT"]; + var httpEndpoint = config["DAPR_HTTP_ENDPOINT"]; + + if (!string.IsNullOrEmpty(grpcEndpoint)) + clientBuilder.UseGrpcEndpoint(grpcEndpoint); + if (!string.IsNullOrEmpty(httpEndpoint)) + clientBuilder.UseHttpEndpoint(httpEndpoint); + }); + }) + .ConfigureApp(app => + { + app.MapDaprScheduledJobHandler((string incomingJobName, ReadOnlyMemory _, + ILogger? logger, CancellationToken _) => + { + logger?.LogInformation("Received repeating job {Job} iteration {Count}", incomingJobName, receivedCount + 1); + var count = Interlocked.Increment(ref receivedCount); + if (count == 3) + { + invocationTcs.TrySetResult(count); + } + }); + }) + .BuildAndStartAsync(); + + using var scope = testApp.CreateScope(); + var daprJobsClient = scope.ServiceProvider.GetRequiredService(); + + await daprJobsClient.ScheduleJobAsync(jobName, DaprJobSchedule.FromDuration(TimeSpan.FromSeconds(5)), + repeats: 3, overwrite: true); + + var finalCount = await invocationTcs.Task.WaitAsync(TimeSpan.FromSeconds(30)); + Assert.Equal(3, finalCount); + } +}