Skip to content

Commit

Permalink
Fix job execution retrieval by id for MongoDB
Browse files Browse the repository at this point in the history
Resolves #4722

Signed-off-by: Mahmoud Ben Hassine <[email protected]>
  • Loading branch information
hpoettker authored and fmbenhassine committed Dec 6, 2024
1 parent ba2202a commit 82d9c15
Show file tree
Hide file tree
Showing 7 changed files with 396 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package org.springframework.batch.core.repository.dao;

import java.util.Collection;
import java.util.Map;

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
Expand Down Expand Up @@ -46,8 +45,9 @@ public MongoExecutionContextDao(MongoOperations mongoOperations) {

@Override
public ExecutionContext getExecutionContext(JobExecution jobExecution) {
org.springframework.batch.core.repository.persistence.JobExecution execution = this.mongoOperations.findById(
jobExecution.getId(), org.springframework.batch.core.repository.persistence.JobExecution.class,
Query query = query(where("jobExecutionId").is(jobExecution.getId()));
org.springframework.batch.core.repository.persistence.JobExecution execution = this.mongoOperations.findOne(
query, org.springframework.batch.core.repository.persistence.JobExecution.class,
JOB_EXECUTIONS_COLLECTION_NAME);
if (execution == null) {
return new ExecutionContext();
Expand All @@ -57,8 +57,9 @@ public ExecutionContext getExecutionContext(JobExecution jobExecution) {

@Override
public ExecutionContext getExecutionContext(StepExecution stepExecution) {
org.springframework.batch.core.repository.persistence.StepExecution execution = this.mongoOperations.findById(
stepExecution.getId(), org.springframework.batch.core.repository.persistence.StepExecution.class,
Query query = query(where("stepExecutionId").is(stepExecution.getId()));
org.springframework.batch.core.repository.persistence.StepExecution execution = this.mongoOperations.findOne(
query, org.springframework.batch.core.repository.persistence.StepExecution.class,
STEP_EXECUTIONS_COLLECTION_NAME);
if (execution == null) {
return new ExecutionContext();
Expand All @@ -69,7 +70,7 @@ public ExecutionContext getExecutionContext(StepExecution stepExecution) {
@Override
public void saveExecutionContext(JobExecution jobExecution) {
ExecutionContext executionContext = jobExecution.getExecutionContext();
Query query = query(where("_id").is(jobExecution.getId()));
Query query = query(where("jobExecutionId").is(jobExecution.getId()));

Update update = Update.update("executionContext",
new org.springframework.batch.core.repository.persistence.ExecutionContext(executionContext.toMap(),
Expand All @@ -82,7 +83,7 @@ public void saveExecutionContext(JobExecution jobExecution) {
@Override
public void saveExecutionContext(StepExecution stepExecution) {
ExecutionContext executionContext = stepExecution.getExecutionContext();
Query query = query(where("_id").is(stepExecution.getId()));
Query query = query(where("stepExecutionId").is(stepExecution.getId()));

Update update = Update.update("executionContext",
new org.springframework.batch.core.repository.persistence.ExecutionContext(executionContext.toMap(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,15 +126,17 @@ public Set<JobExecution> findRunningJobExecutions(String jobName) {

@Override
public JobExecution getJobExecution(Long executionId) {
org.springframework.batch.core.repository.persistence.JobExecution jobExecution = this.mongoOperations.findById(
executionId, org.springframework.batch.core.repository.persistence.JobExecution.class,
Query jobExecutionQuery = query(where("jobExecutionId").is(executionId));
org.springframework.batch.core.repository.persistence.JobExecution jobExecution = this.mongoOperations.findOne(
jobExecutionQuery, org.springframework.batch.core.repository.persistence.JobExecution.class,
JOB_EXECUTIONS_COLLECTION_NAME);
if (jobExecution == null) {
return null;
}
org.springframework.batch.core.repository.persistence.JobInstance jobInstance = this.mongoOperations.findById(
jobExecution.getJobInstanceId(),
org.springframework.batch.core.repository.persistence.JobInstance.class, JOB_INSTANCES_COLLECTION_NAME);
Query jobInstanceQuery = query(where("jobInstanceId").is(jobExecution.getJobInstanceId()));
org.springframework.batch.core.repository.persistence.JobInstance jobInstance = this.mongoOperations.findOne(
jobInstanceQuery, org.springframework.batch.core.repository.persistence.JobInstance.class,
JOB_INSTANCES_COLLECTION_NAME);
return this.jobExecutionConverter.toJobExecution(jobExecution,
this.jobInstanceConverter.toJobInstance(jobInstance));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,9 @@ public void updateStepExecution(StepExecution stepExecution) {

@Override
public StepExecution getStepExecution(JobExecution jobExecution, Long stepExecutionId) {
Query query = query(where("stepExecutionId").is(stepExecutionId));
org.springframework.batch.core.repository.persistence.StepExecution stepExecution = this.mongoOperations
.findById(stepExecutionId, org.springframework.batch.core.repository.persistence.StepExecution.class,
.findOne(query, org.springframework.batch.core.repository.persistence.StepExecution.class,
STEP_EXECUTIONS_COLLECTION_NAME);
return stepExecution != null ? this.stepExecutionConverter.toStepExecution(stepExecution, jobExecution) : null;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.core.repository.support;

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.explore.support.MongoJobExplorerFactoryBean;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.mongodb.MongoDatabaseFactory;
import org.springframework.data.mongodb.MongoTransactionManager;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.SimpleMongoClientDatabaseFactory;
import org.springframework.data.mongodb.core.convert.MappingMongoConverter;

/**
* @author Mahmoud Ben Hassine
*/
@Configuration
@EnableBatchProcessing
class MongoDBIntegrationTestConfiguration {

@Bean
public JobRepository jobRepository(MongoTemplate mongoTemplate, MongoTransactionManager transactionManager)
throws Exception {
MongoJobRepositoryFactoryBean jobRepositoryFactoryBean = new MongoJobRepositoryFactoryBean();
jobRepositoryFactoryBean.setMongoOperations(mongoTemplate);
jobRepositoryFactoryBean.setTransactionManager(transactionManager);
jobRepositoryFactoryBean.afterPropertiesSet();
return jobRepositoryFactoryBean.getObject();
}

@Bean
public JobExplorer jobExplorer(MongoTemplate mongoTemplate, MongoTransactionManager transactionManager)
throws Exception {
MongoJobExplorerFactoryBean jobExplorerFactoryBean = new MongoJobExplorerFactoryBean();
jobExplorerFactoryBean.setMongoOperations(mongoTemplate);
jobExplorerFactoryBean.setTransactionManager(transactionManager);
jobExplorerFactoryBean.afterPropertiesSet();
return jobExplorerFactoryBean.getObject();
}

@Bean
public MongoDatabaseFactory mongoDatabaseFactory(@Value("${mongo.connectionString}") String connectionString) {
MongoClient mongoClient = MongoClients.create(connectionString);
return new SimpleMongoClientDatabaseFactory(mongoClient, "test");
}

@Bean
public MongoTemplate mongoTemplate(MongoDatabaseFactory mongoDatabaseFactory) {
MongoTemplate template = new MongoTemplate(mongoDatabaseFactory);
MappingMongoConverter converter = (MappingMongoConverter) template.getConverter();
converter.setMapKeyDotReplacement(".");
return template;
}

@Bean
public MongoTransactionManager transactionManager(MongoDatabaseFactory mongoDatabaseFactory) {
MongoTransactionManager mongoTransactionManager = new MongoTransactionManager();
mongoTransactionManager.setDatabaseFactory(mongoDatabaseFactory);
mongoTransactionManager.afterPropertiesSet();
return mongoTransactionManager;
}

@Bean
public Job job(JobRepository jobRepository, MongoTransactionManager transactionManager) {
return new JobBuilder("job", jobRepository)
.start(new StepBuilder("step1", jobRepository)
.tasklet((contribution, chunkContext) -> RepeatStatus.FINISHED, transactionManager)
.build())
.next(new StepBuilder("step2", jobRepository)
.tasklet((contribution, chunkContext) -> RepeatStatus.FINISHED, transactionManager)
.build())
.build();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.core.repository.support;

import java.time.LocalDateTime;
import java.util.Map;

import org.bson.Document;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
import org.testcontainers.containers.MongoDBContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;

/**
* @author Henning Pöttker
*/
@Testcontainers(disabledWithoutDocker = true)
@SpringJUnitConfig(MongoDBIntegrationTestConfiguration.class)
public class MongoDBJobExplorerIntegrationTests {

private static final DockerImageName MONGODB_IMAGE = DockerImageName.parse("mongo:8.0.1");

@Container
public static MongoDBContainer mongodb = new MongoDBContainer(MONGODB_IMAGE);

@DynamicPropertySource
static void setMongoDbConnectionString(DynamicPropertyRegistry registry) {
registry.add("mongo.connectionString", mongodb::getConnectionString);
}

@BeforeAll
static void setUp(@Autowired MongoTemplate mongoTemplate) {
mongoTemplate.createCollection("BATCH_JOB_INSTANCE");
mongoTemplate.createCollection("BATCH_JOB_EXECUTION");
mongoTemplate.createCollection("BATCH_STEP_EXECUTION");
mongoTemplate.createCollection("BATCH_SEQUENCES");
mongoTemplate.getCollection("BATCH_SEQUENCES")
.insertOne(new Document(Map.of("_id", "BATCH_JOB_INSTANCE_SEQ", "count", 0L)));
mongoTemplate.getCollection("BATCH_SEQUENCES")
.insertOne(new Document(Map.of("_id", "BATCH_JOB_EXECUTION_SEQ", "count", 0L)));
mongoTemplate.getCollection("BATCH_SEQUENCES")
.insertOne(new Document(Map.of("_id", "BATCH_STEP_EXECUTION_SEQ", "count", 0L)));
}

@Test
void testGetJobExecutionById(@Autowired JobLauncher jobLauncher, @Autowired Job job,
@Autowired JobExplorer jobExplorer) throws Exception {
// given
JobParameters jobParameters = new JobParametersBuilder().addString("name", "testGetJobExecutionById")
.addLocalDateTime("runtime", LocalDateTime.now())
.toJobParameters();
JobExecution jobExecution = jobLauncher.run(job, jobParameters);

// when
JobExecution actual = jobExplorer.getJobExecution(jobExecution.getId());

// then
assertNotNull(actual);
assertNotNull(actual.getJobInstance());
assertEquals(jobExecution.getJobId(), actual.getJobId());
assertFalse(actual.getExecutionContext().isEmpty());
}

@Test
void testGetStepExecutionByIds(@Autowired JobLauncher jobLauncher, @Autowired Job job,
@Autowired JobExplorer jobExplorer) throws Exception {
// given
JobParameters jobParameters = new JobParametersBuilder().addString("name", "testGetStepExecutionByIds")
.addLocalDateTime("runtime", LocalDateTime.now())
.toJobParameters();
JobExecution jobExecution = jobLauncher.run(job, jobParameters);
StepExecution stepExecution = jobExecution.getStepExecutions().stream().findFirst().orElseThrow();

// when
StepExecution actual = jobExplorer.getStepExecution(jobExecution.getId(), stepExecution.getId());

// then
assertNotNull(actual);
assertEquals(stepExecution.getId(), actual.getId());
assertFalse(actual.getExecutionContext().isEmpty());
}

}
Loading

0 comments on commit 82d9c15

Please sign in to comment.