Skip to content

Commit

Permalink
Add initial support for MongoDB as job repository
Browse files Browse the repository at this point in the history
Resolves #877
  • Loading branch information
fmbenhassine committed Oct 10, 2024
1 parent 6cc718a commit 846648b
Show file tree
Hide file tree
Showing 23 changed files with 1,842 additions and 1 deletion.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@
<jakarta.validation-api.version>3.1.0</jakarta.validation-api.version>
<jakarta.persistence-api.version>3.1.0</jakarta.persistence-api.version>
<neo4j-ogm-core.version>4.0.11</neo4j-ogm-core.version>
<mongodb-driver-sync.version>5.2.0</mongodb-driver-sync.version>
<mongodb-driver-sync.version>5.1.4</mongodb-driver-sync.version>
<junit-jupiter.version>5.11.1</junit-jupiter.version>

<!-- provided dependencies -->
Expand Down
28 changes: 28 additions & 0 deletions spring-batch-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,28 @@
<version>${aspectj.version}</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb</artifactId>
<version>${spring-data-mongodb.version}</version>
<optional>true</optional>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-sync</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-sync</artifactId>
<version>${mongodb-driver-sync.version}</version>
<optional>true</optional>
</dependency>

<!-- test dependencies -->
<dependency>
Expand Down Expand Up @@ -128,6 +150,12 @@
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mongodb</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mariadb.jdbc</groupId>
<artifactId>mariadb-java-client</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.core.explore.support;

import org.springframework.batch.core.repository.dao.ExecutionContextDao;
import org.springframework.batch.core.repository.dao.JobExecutionDao;
import org.springframework.batch.core.repository.dao.JobInstanceDao;
import org.springframework.batch.core.repository.dao.StepExecutionDao;
import org.springframework.batch.core.repository.dao.MongoExecutionContextDao;
import org.springframework.batch.core.repository.dao.MongoJobExecutionDao;
import org.springframework.batch.core.repository.dao.MongoJobInstanceDao;
import org.springframework.batch.core.repository.dao.MongoStepExecutionDao;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.data.mongodb.core.MongoOperations;
import org.springframework.util.Assert;

/**
* @author Mahmoud Ben Hassine
* @since 5.2.0
*/
public class MongoJobExplorerFactoryBean extends AbstractJobExplorerFactoryBean implements InitializingBean {

private MongoOperations mongoOperations;

public void setMongoOperations(MongoOperations mongoOperations) {
this.mongoOperations = mongoOperations;
}

@Override
protected JobInstanceDao createJobInstanceDao() {
return new MongoJobInstanceDao(this.mongoOperations);
}

@Override
protected JobExecutionDao createJobExecutionDao() {
return new MongoJobExecutionDao(this.mongoOperations);
}

@Override
protected StepExecutionDao createStepExecutionDao() {
return new MongoStepExecutionDao(this.mongoOperations);
}

@Override
protected ExecutionContextDao createExecutionContextDao() {
return new MongoExecutionContextDao(this.mongoOperations);
}

@Override
public void afterPropertiesSet() throws Exception {
super.afterPropertiesSet();
Assert.notNull(this.mongoOperations, "MongoOperations must not be null.");
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.core.repository.dao;

import java.util.Collection;
import java.util.Map;

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.data.mongodb.core.MongoOperations;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;

import static org.springframework.data.mongodb.core.query.Criteria.where;
import static org.springframework.data.mongodb.core.query.Query.query;

/**
* @author Mahmoud Ben Hassine
* @since 5.2.0
*/
public class MongoExecutionContextDao implements ExecutionContextDao {

private static final String STEP_EXECUTIONS_COLLECTION_NAME = "BATCH_STEP_EXECUTION";

private static final String JOB_EXECUTIONS_COLLECTION_NAME = "BATCH_JOB_EXECUTION";

private final MongoOperations mongoOperations;

public MongoExecutionContextDao(MongoOperations mongoOperations) {
this.mongoOperations = mongoOperations;
}

@Override
public ExecutionContext getExecutionContext(JobExecution jobExecution) {
org.springframework.batch.core.repository.persistence.JobExecution execution = this.mongoOperations.findById(
jobExecution.getId(), org.springframework.batch.core.repository.persistence.JobExecution.class,
JOB_EXECUTIONS_COLLECTION_NAME);
if (execution == null) {
return new ExecutionContext();
}
return new ExecutionContext(execution.getExecutionContext().map());
}

@Override
public ExecutionContext getExecutionContext(StepExecution stepExecution) {
org.springframework.batch.core.repository.persistence.StepExecution execution = this.mongoOperations.findById(
stepExecution.getId(), org.springframework.batch.core.repository.persistence.StepExecution.class,
STEP_EXECUTIONS_COLLECTION_NAME);
if (execution == null) {
return new ExecutionContext();
}
return new ExecutionContext(execution.getExecutionContext().map());
}

@Override
public void saveExecutionContext(JobExecution jobExecution) {
ExecutionContext executionContext = jobExecution.getExecutionContext();
Query query = query(where("_id").is(jobExecution.getId()));

Update update = Update.update("executionContext",
new org.springframework.batch.core.repository.persistence.ExecutionContext(executionContext.toMap(),
executionContext.isDirty()));
this.mongoOperations.updateFirst(query, update,
org.springframework.batch.core.repository.persistence.JobExecution.class,
JOB_EXECUTIONS_COLLECTION_NAME);
}

@Override
public void saveExecutionContext(StepExecution stepExecution) {
ExecutionContext executionContext = stepExecution.getExecutionContext();
Query query = query(where("_id").is(stepExecution.getId()));

Update update = Update.update("executionContext",
new org.springframework.batch.core.repository.persistence.ExecutionContext(executionContext.toMap(),
executionContext.isDirty()));
this.mongoOperations.updateFirst(query, update,
org.springframework.batch.core.repository.persistence.StepExecution.class,
STEP_EXECUTIONS_COLLECTION_NAME);

}

@Override
public void saveExecutionContexts(Collection<StepExecution> stepExecutions) {
for (StepExecution stepExecution : stepExecutions) {
saveExecutionContext(stepExecution);
}
}

@Override
public void updateExecutionContext(JobExecution jobExecution) {
saveExecutionContext(jobExecution);
}

@Override
public void updateExecutionContext(StepExecution stepExecution) {
saveExecutionContext(stepExecution);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.core.repository.dao;

import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobInstance;
import org.springframework.batch.core.repository.persistence.converter.JobExecutionConverter;
import org.springframework.batch.core.repository.persistence.converter.JobInstanceConverter;
import org.springframework.data.domain.Sort;
import org.springframework.data.mongodb.core.MongoOperations;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.jdbc.support.incrementer.DataFieldMaxValueIncrementer;

import static org.springframework.data.mongodb.core.query.Criteria.where;
import static org.springframework.data.mongodb.core.query.Query.query;

/**
* @author Mahmoud Ben Hassine
* @since 5.2.0
*/
public class MongoJobExecutionDao implements JobExecutionDao {

private static final String JOB_EXECUTIONS_COLLECTION_NAME = "BATCH_JOB_EXECUTION";

private static final String JOB_EXECUTIONS_SEQUENCE_NAME = "BATCH_JOB_EXECUTION_SEQ";

private static final String JOB_INSTANCES_COLLECTION_NAME = "BATCH_JOB_INSTANCE";

private final MongoOperations mongoOperations;

private final JobExecutionConverter jobExecutionConverter = new JobExecutionConverter();

private final JobInstanceConverter jobInstanceConverter = new JobInstanceConverter();

private DataFieldMaxValueIncrementer jobExecutionIncrementer;

public MongoJobExecutionDao(MongoOperations mongoOperations) {
this.mongoOperations = mongoOperations;
this.jobExecutionIncrementer = new MongoSequenceIncrementer(mongoOperations, JOB_EXECUTIONS_SEQUENCE_NAME);
}

public void setJobExecutionIncrementer(DataFieldMaxValueIncrementer jobExecutionIncrementer) {
this.jobExecutionIncrementer = jobExecutionIncrementer;
}

@Override
public void saveJobExecution(JobExecution jobExecution) {
org.springframework.batch.core.repository.persistence.JobExecution jobExecutionToSave = this.jobExecutionConverter
.fromJobExecution(jobExecution);
long jobExecutionId = this.jobExecutionIncrementer.nextLongValue();
jobExecutionToSave.setJobExecutionId(jobExecutionId);
this.mongoOperations.insert(jobExecutionToSave, JOB_EXECUTIONS_COLLECTION_NAME);
jobExecution.setId(jobExecutionId);
}

@Override
public void updateJobExecution(JobExecution jobExecution) {
Query query = query(where("jobExecutionId").is(jobExecution.getId()));
org.springframework.batch.core.repository.persistence.JobExecution jobExecutionToUpdate = this.jobExecutionConverter
.fromJobExecution(jobExecution);
this.mongoOperations.findAndReplace(query, jobExecutionToUpdate, JOB_EXECUTIONS_COLLECTION_NAME);
}

@Override
public List<JobExecution> findJobExecutions(JobInstance jobInstance) {
Query query = query(where("jobInstanceId").is(jobInstance.getId()));
List<org.springframework.batch.core.repository.persistence.JobExecution> jobExecutions = this.mongoOperations
.find(query, org.springframework.batch.core.repository.persistence.JobExecution.class,
JOB_EXECUTIONS_COLLECTION_NAME);
return jobExecutions.stream()
.map(jobExecution -> this.jobExecutionConverter.toJobExecution(jobExecution, jobInstance))
.toList();
}

@Override
public JobExecution getLastJobExecution(JobInstance jobInstance) {
Query query = query(where("jobInstanceId").is(jobInstance.getId()));
Sort.Order sortOrder = Sort.Order.desc("jobExecutionId");
org.springframework.batch.core.repository.persistence.JobExecution jobExecution = this.mongoOperations.findOne(
query.with(Sort.by(sortOrder)),
org.springframework.batch.core.repository.persistence.JobExecution.class,
JOB_EXECUTIONS_COLLECTION_NAME);
return jobExecution != null ? this.jobExecutionConverter.toJobExecution(jobExecution, jobInstance) : null;
}

@Override
public Set<JobExecution> findRunningJobExecutions(String jobName) {
Query query = query(where("jobName").is(jobName));
List<JobInstance> jobInstances = this.mongoOperations
.find(query, org.springframework.batch.core.repository.persistence.JobInstance.class,
JOB_INSTANCES_COLLECTION_NAME)
.stream()
.map(this.jobInstanceConverter::toJobInstance)
.toList();
Set<JobExecution> runningJobExecutions = new HashSet<>();
for (JobInstance jobInstance : jobInstances) {
query = query(
where("jobInstanceId").is(jobInstance.getId()).and("status").in("STARTING", "STARTED", "STOPPING"));
this.mongoOperations
.find(query, org.springframework.batch.core.repository.persistence.JobExecution.class,
JOB_EXECUTIONS_COLLECTION_NAME)
.stream()
.map(jobExecution -> this.jobExecutionConverter.toJobExecution(jobExecution, jobInstance))
.forEach(runningJobExecutions::add);
}
return runningJobExecutions;
}

@Override
public JobExecution getJobExecution(Long executionId) {
org.springframework.batch.core.repository.persistence.JobExecution jobExecution = this.mongoOperations.findById(
executionId, org.springframework.batch.core.repository.persistence.JobExecution.class,
JOB_EXECUTIONS_COLLECTION_NAME);
if (jobExecution == null) {
return null;
}
org.springframework.batch.core.repository.persistence.JobInstance jobInstance = this.mongoOperations.findById(
jobExecution.getJobInstanceId(),
org.springframework.batch.core.repository.persistence.JobInstance.class, JOB_INSTANCES_COLLECTION_NAME);
return this.jobExecutionConverter.toJobExecution(jobExecution,
this.jobInstanceConverter.toJobInstance(jobInstance));
}

@Override
public void synchronizeStatus(JobExecution jobExecution) {
Query query = query(where("jobExecutionId").is(jobExecution.getId()));
Update update = Update.update("status", jobExecution.getStatus());
// TODO the contract mentions to update the version as well. Double check if this
// is needed as the version is not used in the tests following the call sites of
// synchronizeStatus
this.mongoOperations.updateFirst(query, update,
org.springframework.batch.core.repository.persistence.JobExecution.class,
JOB_EXECUTIONS_COLLECTION_NAME);
}

}
Loading

0 comments on commit 846648b

Please sign in to comment.