diff --git a/README.md b/README.md index 2560721..7bcea0c 100644 --- a/README.md +++ b/README.md @@ -7,8 +7,6 @@ Please refer to the [Enabling experimental features](#enabling-experimental-feat The currently available experimental features are the following: -* [MongoDB job repository](#mongodb-job-repository) -* [Composite item reader](#composite-item-reader) * [New chunk-oriented step implementation](#new-chunk-oriented-step-implementation) **Important note:** The versioning in this repository follows the [semantic versioning specification](https://semver.org/#spec-item-4). @@ -57,7 +55,7 @@ To import experimental features in your project, you need to add the following d org.springframework.batch spring-batch-experimental - 0.3.0 + 0.4.0 ``` @@ -69,94 +67,6 @@ To build the project and install it in your local Maven repository, use the foll $>./mvnw clean install ``` -# MongoDB job repository - -*Original issue:* https://github.com/spring-projects/spring-batch/issues/877 - -This feature introduces new implementations of `JobRepository` and `JobExplorer` for MongoDB. - -To test this feature, first import the `spring-batch-experimental` jar as described in the [Enabling experimental features](#enabling-experimental-features) section. - -Then, add the following dependencies as well: - -```xml - - - org.springframework.data - spring-data-mongodb - 4.2.0 - - - org.mongodb - mongodb-driver-sync - 4.11.1 - - -``` - -After that, you need to create the collections and sequences required by the MongoDB `JobRepository` implementation in your MongoDB server instance. -Similar to the DDL scripts provided for relational databases, the MongoShell scripts for MongoDB are provided in [schema-drop-mongodb.js](src/main/resources/org/springframework/batch/experimental/core/schema-drop-mongodb.js) and [schema-mongodb.js](src/main/resources/org/springframework/batch/experimental/core/schema-mongodb.js). - -Finally, you can define the MongoDB-based `JobRepository` and use it in your Spring Batch application as a regular `JobRepository`: - -```java -@Bean -public JobRepository jobRepository(MongoTemplate mongoTemplate, MongoTransactionManager transactionManager) throws Exception { - MongoJobRepositoryFactoryBean jobRepositoryFactoryBean = new MongoJobRepositoryFactoryBean(); - jobRepositoryFactoryBean.setMongoOperations(mongoTemplate); - jobRepositoryFactoryBean.setTransactionManager(transactionManager); - jobRepositoryFactoryBean.afterPropertiesSet(); - return jobRepositoryFactoryBean.getObject(); -} -``` - -The implementation requires a [MongoTemplate](https://docs.spring.io/spring-data/mongodb/docs/current/reference/html/#mongo-template) to interact with MongoDB and a [MongoTransactionManager](https://docs.spring.io/spring-data/mongodb/docs/current/reference/html/#mongo.transactions.tx-manager) to drive Spring Batch transactions. -Those can be defined as Spring beans in the application context as described in Spring Data MongoDB documentation. - -You can find a complete example in the [MongoDBJobRepositoryIntegrationTests](./src/test/java/org/springframework/batch/experimental/core/repository/support/MongoDBJobRepositoryIntegrationTests.java) file. - -# Composite item reader - -*Original issue:* https://github.com/spring-projects/spring-batch/issues/757 - -This feature introduces a composite `ItemReader` implementation. Similar to the `CompositeItemProcessor` and `CompositeItemWriter`, the idea is to delegate reading to a list of item readers in order. -This is useful when there is a requirement to read data having the same format from different sources (files, databases, etc). Here is an example: - -```java -record Person(int id, String name) {} - -@Bean -public FlatFileItemReader fileItemReader() { - return new FlatFileItemReaderBuilder() - .name("fileItemReader") - .resource(new ClassPathResource("persons.csv")) - .delimited() - .names("id", "name") - .targetType(Person.class) - .build(); -} - -@Bean -public JdbcCursorItemReader databaseItemReader() { - String sql = "select * from persons"; - return new JdbcCursorItemReaderBuilder() - .name("databaseItemReader") - .dataSource(dataSource()) - .sql(sql) - .rowMapper(new DataClassRowMapper<>(Person.class)) - .build(); -} - -@Bean -public CompositeItemReader itemReader() { - return new CompositeItemReader<>(Arrays.asList(fileItemReader(), databaseItemReader())); -} -``` - -This snippet configures a `CompositeItemReader` with two delegates to read the same data from a flat file and a database table. - -You can find a complete example in the [CompositeItemReaderIntegrationTests](./src/test/java/org/springframework/batch/experimental/item/support/CompositeItemReaderIntegrationTests.java) file. - # New chunk-oriented step implementation *Original issue:* https://github.com/spring-projects/spring-batch/issues/3950 diff --git a/src/main/java/org/springframework/batch/experimental/core/explore/support/MongoJobExplorerFactoryBean.java b/src/main/java/org/springframework/batch/experimental/core/explore/support/MongoJobExplorerFactoryBean.java deleted file mode 100644 index e4c3996..0000000 --- a/src/main/java/org/springframework/batch/experimental/core/explore/support/MongoJobExplorerFactoryBean.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Copyright 2023 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.springframework.batch.experimental.core.explore.support; - -import org.springframework.batch.core.explore.support.AbstractJobExplorerFactoryBean; -import org.springframework.batch.core.repository.dao.ExecutionContextDao; -import org.springframework.batch.core.repository.dao.JobExecutionDao; -import org.springframework.batch.core.repository.dao.JobInstanceDao; -import org.springframework.batch.core.repository.dao.StepExecutionDao; -import org.springframework.batch.experimental.core.repository.dao.MongoExecutionContextDao; -import org.springframework.batch.experimental.core.repository.dao.MongoJobExecutionDao; -import org.springframework.batch.experimental.core.repository.dao.MongoJobInstanceDao; -import org.springframework.batch.experimental.core.repository.dao.MongoStepExecutionDao; -import org.springframework.beans.factory.InitializingBean; -import org.springframework.data.mongodb.core.MongoOperations; -import org.springframework.util.Assert; - -/** - * @author Mahmoud Ben Hassine - */ -public class MongoJobExplorerFactoryBean extends AbstractJobExplorerFactoryBean implements InitializingBean { - - private MongoOperations mongoOperations; - - public void setMongoOperations(MongoOperations mongoOperations) { - this.mongoOperations = mongoOperations; - } - - @Override - protected JobInstanceDao createJobInstanceDao() { - return new MongoJobInstanceDao(this.mongoOperations); - } - - @Override - protected JobExecutionDao createJobExecutionDao() { - return new MongoJobExecutionDao(this.mongoOperations); - } - - @Override - protected StepExecutionDao createStepExecutionDao() { - return new MongoStepExecutionDao(this.mongoOperations); - } - - @Override - protected ExecutionContextDao createExecutionContextDao() { - return new MongoExecutionContextDao(this.mongoOperations); - } - - @Override - public void afterPropertiesSet() throws Exception { - super.afterPropertiesSet(); - Assert.notNull(this.mongoOperations, "MongoOperations must not be null."); - } - -} diff --git a/src/main/java/org/springframework/batch/experimental/core/repository/dao/MongoExecutionContextDao.java b/src/main/java/org/springframework/batch/experimental/core/repository/dao/MongoExecutionContextDao.java deleted file mode 100644 index 71d222a..0000000 --- a/src/main/java/org/springframework/batch/experimental/core/repository/dao/MongoExecutionContextDao.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Copyright 2023 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.springframework.batch.experimental.core.repository.dao; - -import java.util.Collection; -import java.util.Map; - -import org.springframework.batch.core.JobExecution; -import org.springframework.batch.core.StepExecution; -import org.springframework.batch.core.repository.dao.ExecutionContextDao; -import org.springframework.batch.experimental.core.repository.persistence.converter.JobExecutionConverter; -import org.springframework.batch.item.ExecutionContext; -import org.springframework.data.mongodb.core.MongoOperations; -import org.springframework.data.mongodb.core.query.Query; -import org.springframework.data.mongodb.core.query.Update; - -import static org.springframework.data.mongodb.core.query.Criteria.where; -import static org.springframework.data.mongodb.core.query.Query.query; - -/** - * @author Mahmoud Ben Hassine - */ -public class MongoExecutionContextDao implements ExecutionContextDao { - - private static final String STEP_EXECUTIONS_COLLECTION_NAME = "BATCH_STEP_EXECUTION"; - - private static final String JOB_EXECUTIONS_COLLECTION_NAME = "BATCH_JOB_EXECUTION"; - - private final JobExecutionConverter jobExecutionConverter = new JobExecutionConverter(); - - private final MongoOperations mongoOperations; - - public MongoExecutionContextDao(MongoOperations mongoOperations) { - this.mongoOperations = mongoOperations; - } - - @Override - public ExecutionContext getExecutionContext(JobExecution jobExecution) { - org.springframework.batch.experimental.core.repository.persistence.JobExecution execution = this.mongoOperations.findById( - jobExecution.getId(), org.springframework.batch.experimental.core.repository.persistence.JobExecution.class, - JOB_EXECUTIONS_COLLECTION_NAME); - if (execution == null) { - return new ExecutionContext(); - } - return new ExecutionContext(execution.getExecutionContext().map()); - } - - @Override - public ExecutionContext getExecutionContext(StepExecution stepExecution) { - org.springframework.batch.experimental.core.repository.persistence.StepExecution execution = this.mongoOperations.findById( - stepExecution.getId(), org.springframework.batch.experimental.core.repository.persistence.StepExecution.class, - STEP_EXECUTIONS_COLLECTION_NAME); - if (execution == null) { - return new ExecutionContext(); - } - return new ExecutionContext(execution.getExecutionContext().map()); - } - - @Override - public void saveExecutionContext(JobExecution jobExecution) { - ExecutionContext executionContext = jobExecution.getExecutionContext(); - Query query = query(where("_id").is(jobExecution.getId())); - - // TODO use ExecutionContext#toMap introduced in v5.1 - Map map = Map.ofEntries(executionContext.entrySet().toArray(new Map.Entry[0])); - - Update update = Update.update("executionContext", - new org.springframework.batch.experimental.core.repository.persistence.ExecutionContext(map, executionContext.isDirty())); - this.mongoOperations.updateFirst(query, update, - org.springframework.batch.experimental.core.repository.persistence.JobExecution.class, - JOB_EXECUTIONS_COLLECTION_NAME); - } - - @Override - public void saveExecutionContext(StepExecution stepExecution) { - ExecutionContext executionContext = stepExecution.getExecutionContext(); - Query query = query(where("_id").is(stepExecution.getId())); - - // TODO use ExecutionContext#toMap introduced in v5.1 - Map map = Map.ofEntries(executionContext.entrySet().toArray(new Map.Entry[0])); - - Update update = Update.update("executionContext", - new org.springframework.batch.experimental.core.repository.persistence.ExecutionContext(map, executionContext.isDirty())); - this.mongoOperations.updateFirst(query, update, - org.springframework.batch.experimental.core.repository.persistence.StepExecution.class, - STEP_EXECUTIONS_COLLECTION_NAME); - - } - - @Override - public void saveExecutionContexts(Collection stepExecutions) { - for (StepExecution stepExecution : stepExecutions) { - saveExecutionContext(stepExecution); - } - } - - @Override - public void updateExecutionContext(JobExecution jobExecution) { - saveExecutionContext(jobExecution); - } - - @Override - public void updateExecutionContext(StepExecution stepExecution) { - saveExecutionContext(stepExecution); - } - -} diff --git a/src/main/java/org/springframework/batch/experimental/core/repository/dao/MongoJobExecutionDao.java b/src/main/java/org/springframework/batch/experimental/core/repository/dao/MongoJobExecutionDao.java deleted file mode 100644 index bab45e4..0000000 --- a/src/main/java/org/springframework/batch/experimental/core/repository/dao/MongoJobExecutionDao.java +++ /dev/null @@ -1,152 +0,0 @@ -/* - * Copyright 2023 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.springframework.batch.experimental.core.repository.dao; - -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -import org.springframework.batch.core.JobExecution; -import org.springframework.batch.core.JobInstance; -import org.springframework.batch.core.repository.dao.JobExecutionDao; -import org.springframework.batch.experimental.core.repository.persistence.converter.JobExecutionConverter; -import org.springframework.batch.experimental.core.repository.persistence.converter.JobInstanceConverter; -import org.springframework.data.domain.Sort; -import org.springframework.data.mongodb.core.MongoOperations; -import org.springframework.data.mongodb.core.query.Query; -import org.springframework.data.mongodb.core.query.Update; -import org.springframework.jdbc.support.incrementer.DataFieldMaxValueIncrementer; - -import static org.springframework.data.mongodb.core.query.Criteria.where; -import static org.springframework.data.mongodb.core.query.Query.query; - -/** - * @author Mahmoud Ben Hassine - */ -public class MongoJobExecutionDao implements JobExecutionDao { - - private static final String JOB_EXECUTIONS_COLLECTION_NAME = "BATCH_JOB_EXECUTION"; - private static final String JOB_EXECUTIONS_SEQUENCE_NAME = "BATCH_JOB_EXECUTION_SEQ"; - private static final String JOB_INSTANCES_COLLECTION_NAME = "BATCH_JOB_INSTANCE"; - - private final MongoOperations mongoOperations; - - private DataFieldMaxValueIncrementer jobExecutionIncrementer; - - private final JobExecutionConverter jobExecutionConverter = new JobExecutionConverter(); - - private final JobInstanceConverter jobInstanceConverter = new JobInstanceConverter(); - - public MongoJobExecutionDao(MongoOperations mongoOperations) { - this.mongoOperations = mongoOperations; - this.jobExecutionIncrementer = new MongoSequenceIncrementer(mongoOperations, JOB_EXECUTIONS_SEQUENCE_NAME); - } - - public void setJobExecutionIncrementer(DataFieldMaxValueIncrementer jobExecutionIncrementer) { - this.jobExecutionIncrementer = jobExecutionIncrementer; - } - - @Override - public void saveJobExecution(JobExecution jobExecution) { - org.springframework.batch.experimental.core.repository.persistence.JobExecution jobExecutionToSave = this.jobExecutionConverter - .fromJobExecution(jobExecution); - long jobExecutionId = this.jobExecutionIncrementer.nextLongValue(); - jobExecutionToSave.setJobExecutionId(jobExecutionId); - this.mongoOperations.insert(jobExecutionToSave, JOB_EXECUTIONS_COLLECTION_NAME); - jobExecution.setId(jobExecutionId); - } - - @Override - public void updateJobExecution(JobExecution jobExecution) { - Query query = query(where("jobExecutionId").is(jobExecution.getId())); - org.springframework.batch.experimental.core.repository.persistence.JobExecution jobExecutionToUpdate = this.jobExecutionConverter - .fromJobExecution(jobExecution); - this.mongoOperations.findAndReplace(query, jobExecutionToUpdate, JOB_EXECUTIONS_COLLECTION_NAME); - } - - @Override - public List findJobExecutions(JobInstance jobInstance) { - Query query = query(where("jobInstanceId").is(jobInstance.getId())); - List jobExecutions = this.mongoOperations - .find(query, org.springframework.batch.experimental.core.repository.persistence.JobExecution.class, - JOB_EXECUTIONS_COLLECTION_NAME); - return jobExecutions.stream() - .map(jobExecution -> this.jobExecutionConverter.toJobExecution(jobExecution, jobInstance)) - .toList(); - } - - @Override - public JobExecution getLastJobExecution(JobInstance jobInstance) { - Query query = query(where("jobInstanceId").is(jobInstance.getId())); - Sort.Order sortOrder = Sort.Order.desc("jobExecutionId"); - org.springframework.batch.experimental.core.repository.persistence.JobExecution jobExecution = this.mongoOperations.findOne( - query.with(Sort.by(sortOrder)), - org.springframework.batch.experimental.core.repository.persistence.JobExecution.class, - JOB_EXECUTIONS_COLLECTION_NAME); - return jobExecution != null ? this.jobExecutionConverter.toJobExecution(jobExecution, jobInstance) : null; - } - - @Override - public Set findRunningJobExecutions(String jobName) { - Query query = query(where("jobName").is(jobName)); - List jobInstances = this.mongoOperations - .find(query, org.springframework.batch.experimental.core.repository.persistence.JobInstance.class, - JOB_INSTANCES_COLLECTION_NAME) - .stream() - .map(this.jobInstanceConverter::toJobInstance) - .toList(); - Set runningJobExecutions = new HashSet<>(); - for (JobInstance jobInstance : jobInstances) { - query = query( - where("jobInstanceId").is(jobInstance.getId()).and("status").in("STARTING", "STARTED", "STOPPING")); - this.mongoOperations - .find(query, org.springframework.batch.experimental.core.repository.persistence.JobExecution.class, - JOB_EXECUTIONS_COLLECTION_NAME) - .stream() - .map(jobExecution -> this.jobExecutionConverter.toJobExecution(jobExecution, jobInstance)) - .forEach(runningJobExecutions::add); - } - return runningJobExecutions; - } - - @Override - public JobExecution getJobExecution(Long executionId) { - org.springframework.batch.experimental.core.repository.persistence.JobExecution jobExecution = this.mongoOperations.findById( - executionId, org.springframework.batch.experimental.core.repository.persistence.JobExecution.class, - JOB_EXECUTIONS_COLLECTION_NAME); - if (jobExecution == null) { - return null; - } - org.springframework.batch.experimental.core.repository.persistence.JobInstance jobInstance = this.mongoOperations.findById( - jobExecution.getJobInstanceId(), - org.springframework.batch.experimental.core.repository.persistence.JobInstance.class, JOB_INSTANCES_COLLECTION_NAME); - return this.jobExecutionConverter.toJobExecution(jobExecution, - this.jobInstanceConverter.toJobInstance(jobInstance)); - } - - @Override - public void synchronizeStatus(JobExecution jobExecution) { - Query query = query(where("jobExecutionId").is(jobExecution.getId())); - Update update = Update.update("status", jobExecution.getStatus()); - // TODO the contract mentions to update the version as well. Double check if this - // is needed as the version is not used in the tests following the call sites of - // synchronizeStatus - this.mongoOperations.updateFirst(query, update, - org.springframework.batch.experimental.core.repository.persistence.JobExecution.class, - JOB_EXECUTIONS_COLLECTION_NAME); - } - -} diff --git a/src/main/java/org/springframework/batch/experimental/core/repository/dao/MongoJobInstanceDao.java b/src/main/java/org/springframework/batch/experimental/core/repository/dao/MongoJobInstanceDao.java deleted file mode 100644 index ffa3ff3..0000000 --- a/src/main/java/org/springframework/batch/experimental/core/repository/dao/MongoJobInstanceDao.java +++ /dev/null @@ -1,169 +0,0 @@ -/* - * Copyright 2023 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.springframework.batch.experimental.core.repository.dao; - -import java.util.List; - -import org.springframework.batch.core.DefaultJobKeyGenerator; -import org.springframework.batch.core.JobExecution; -import org.springframework.batch.core.JobInstance; -import org.springframework.batch.core.JobKeyGenerator; -import org.springframework.batch.core.JobParameters; -import org.springframework.batch.core.launch.NoSuchJobException; -import org.springframework.batch.core.repository.dao.JobInstanceDao; -import org.springframework.batch.experimental.core.repository.persistence.converter.JobInstanceConverter; -import org.springframework.data.domain.Example; -import org.springframework.data.domain.Sort; -import org.springframework.data.mongodb.core.MongoOperations; -import org.springframework.data.mongodb.core.query.Query; -import org.springframework.jdbc.support.incrementer.DataFieldMaxValueIncrementer; -import org.springframework.util.Assert; - -import static org.springframework.data.mongodb.core.query.Criteria.where; -import static org.springframework.data.mongodb.core.query.Query.query; - -/** - * @author Mahmoud Ben Hassine - */ -public class MongoJobInstanceDao implements JobInstanceDao { - - private static final String COLLECTION_NAME = "BATCH_JOB_INSTANCE"; - private static final String SEQUENCE_NAME = "BATCH_JOB_INSTANCE_SEQ"; - - private final MongoOperations mongoOperations; - - private JobKeyGenerator jobKeyGenerator = new DefaultJobKeyGenerator(); - private DataFieldMaxValueIncrementer jobInstanceIncrementer; - - private final JobInstanceConverter jobInstanceConverter = new JobInstanceConverter(); - - public MongoJobInstanceDao(MongoOperations mongoOperations) { - Assert.notNull(mongoOperations, "mongoOperations must not be null."); - this.mongoOperations = mongoOperations; - this.jobInstanceIncrementer = new MongoSequenceIncrementer(mongoOperations, SEQUENCE_NAME); - } - - public void setJobInstanceIncrementer(DataFieldMaxValueIncrementer jobInstanceIncrementer) { - this.jobInstanceIncrementer = jobInstanceIncrementer; - } - - @Override - public JobInstance createJobInstance(String jobName, JobParameters jobParameters) { - Assert.notNull(jobName, "Job name must not be null."); - Assert.notNull(jobParameters, "JobParameters must not be null."); - - Assert.state(getJobInstance(jobName, jobParameters) == null, "JobInstance must not already exist"); - - org.springframework.batch.experimental.core.repository.persistence.JobInstance jobInstanceToSave = new org.springframework.batch.experimental.core.repository.persistence.JobInstance(); - jobInstanceToSave.setJobName(jobName); - String key = this.jobKeyGenerator.generateKey(jobParameters); - jobInstanceToSave.setJobKey(key); - long instanceId = jobInstanceIncrementer.nextLongValue(); - jobInstanceToSave.setJobInstanceId(instanceId); - this.mongoOperations.insert(jobInstanceToSave, COLLECTION_NAME); - - JobInstance jobInstance = new JobInstance(instanceId, jobName); - jobInstance.incrementVersion(); // TODO is this needed? - return jobInstance; - } - - @Override - public JobInstance getJobInstance(String jobName, JobParameters jobParameters) { - String key = this.jobKeyGenerator.generateKey(jobParameters); - Query query = query(where("jobName").is(jobName).and("jobKey").is(key)); - org.springframework.batch.experimental.core.repository.persistence.JobInstance jobInstance = this.mongoOperations - .findOne(query, org.springframework.batch.experimental.core.repository.persistence.JobInstance.class, COLLECTION_NAME); - return jobInstance != null ? this.jobInstanceConverter.toJobInstance(jobInstance) : null; - } - - @Override - public JobInstance getJobInstance(Long instanceId) { - Query query = query(where("jobInstanceId").is(instanceId)); - org.springframework.batch.experimental.core.repository.persistence.JobInstance jobInstance = this.mongoOperations - .findOne(query, org.springframework.batch.experimental.core.repository.persistence.JobInstance.class, COLLECTION_NAME); - return jobInstance != null ? this.jobInstanceConverter.toJobInstance(jobInstance) : null; - } - - @Override - public JobInstance getJobInstance(JobExecution jobExecution) { - return getJobInstance(jobExecution.getJobId()); - } - - @Override - public List getJobInstances(String jobName, int start, int count) { - Query query = query(where("jobName").is(jobName)); - Sort.Order sortOrder = Sort.Order.desc("jobInstanceId"); - List jobInstances = this.mongoOperations - .find(query.with(Sort.by(sortOrder)), - org.springframework.batch.experimental.core.repository.persistence.JobInstance.class, COLLECTION_NAME) - .stream() - .toList(); - return jobInstances.subList(start, jobInstances.size()) - .stream() - .map(this.jobInstanceConverter::toJobInstance) - .limit(count) - .toList(); - } - - @Override - public JobInstance getLastJobInstance(String jobName) { - Query query = query(where("jobName").is(jobName)); - Sort.Order sortOrder = Sort.Order.desc("jobInstanceId"); - org.springframework.batch.experimental.core.repository.persistence.JobInstance jobInstance = this.mongoOperations.findOne( - query.with(Sort.by(sortOrder)), org.springframework.batch.experimental.core.repository.persistence.JobInstance.class, - COLLECTION_NAME); - return jobInstance != null ? this.jobInstanceConverter.toJobInstance(jobInstance) : null; - } - - @Override - public List getJobNames() { - return this.mongoOperations - .findAll(org.springframework.batch.experimental.core.repository.persistence.JobInstance.class, COLLECTION_NAME) - .stream() - .map(org.springframework.batch.experimental.core.repository.persistence.JobInstance::getJobName) - .toList(); - } - - @Override - public List findJobInstancesByName(String jobName, int start, int count) { - Query query = query(where("jobName").alike(Example.of(jobName))); - Sort.Order sortOrder = Sort.Order.desc("jobInstanceId"); - List jobInstances = this.mongoOperations - .find(query.with(Sort.by(sortOrder)), - org.springframework.batch.experimental.core.repository.persistence.JobInstance.class, COLLECTION_NAME) - .stream() - .toList(); - return jobInstances.subList(start, jobInstances.size()) - .stream() - .map(this.jobInstanceConverter::toJobInstance) - .limit(count) - .toList(); - } - - @Override - public long getJobInstanceCount(String jobName) throws NoSuchJobException { - if (!getJobNames().contains(jobName)) { - throw new NoSuchJobException("Job not found " + jobName); - } - Query query = query(where("jobName").is(jobName)); - return this.mongoOperations.count(query, COLLECTION_NAME); - } - - public void setJobKeyGenerator(JobKeyGenerator jobKeyGenerator) { - this.jobKeyGenerator = jobKeyGenerator; - } - -} diff --git a/src/main/java/org/springframework/batch/experimental/core/repository/dao/MongoSequenceIncrementer.java b/src/main/java/org/springframework/batch/experimental/core/repository/dao/MongoSequenceIncrementer.java deleted file mode 100644 index 3510413..0000000 --- a/src/main/java/org/springframework/batch/experimental/core/repository/dao/MongoSequenceIncrementer.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Copyright 2023 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.springframework.batch.experimental.core.repository.dao; - -import org.springframework.dao.DataAccessException; -import org.springframework.data.mongodb.core.MongoOperations; -import org.springframework.data.mongodb.core.query.Query; -import org.springframework.data.mongodb.core.query.Update; -import org.springframework.jdbc.support.incrementer.DataFieldMaxValueIncrementer; - -import static org.springframework.data.mongodb.core.query.Criteria.where; -import static org.springframework.data.mongodb.core.query.Query.query; - -// Based on https://www.mongodb.com/blog/post/generating-globally-unique-identifiers-for-use-with-mongodb -// Section: Use a single counter document to generate unique identifiers one at a time - -/** - * @author Mahmoud Ben Hassine - */ -public class MongoSequenceIncrementer implements DataFieldMaxValueIncrementer { - - private final MongoOperations mongoTemplate; - - private final String sequenceName; - - public MongoSequenceIncrementer(MongoOperations mongoTemplate, String sequenceName) { - this.mongoTemplate = mongoTemplate; - this.sequenceName = sequenceName; - } - - @Override - public long nextLongValue() throws DataAccessException { - // TODO optimize - MongoSequence sequence = mongoTemplate.findOne(new Query(), MongoSequence.class, sequenceName); - Query query = query(where("_id").is(sequence.getId())); - Update update = new Update().inc("count", 1); - // The following does not return the modified document - mongoTemplate.findAndModify(query, update, MongoSequence.class, sequenceName); - return mongoTemplate.findOne(new Query(), MongoSequence.class, sequenceName).getCount(); - } - - @Override - public int nextIntValue() throws DataAccessException { - throw new UnsupportedOperationException(); - } - - @Override - public String nextStringValue() throws DataAccessException { - throw new UnsupportedOperationException(); - } - - public static final class MongoSequence { - - private String id; - - private long count; - - public String getId() { - return id; - } - - public void setId(String id) { - this.id = id; - } - - public long getCount() { - return count; - } - - public void setCount(long count) { - this.count = count; - } - - @Override - public String toString() { - return "MongoSequence{" + - "id='" + id + '\'' + - ", count=" + count + - '}'; - } - } -} diff --git a/src/main/java/org/springframework/batch/experimental/core/repository/dao/MongoStepExecutionDao.java b/src/main/java/org/springframework/batch/experimental/core/repository/dao/MongoStepExecutionDao.java deleted file mode 100644 index 25eeaf8..0000000 --- a/src/main/java/org/springframework/batch/experimental/core/repository/dao/MongoStepExecutionDao.java +++ /dev/null @@ -1,161 +0,0 @@ -/* - * Copyright 2023 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.springframework.batch.experimental.core.repository.dao; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Comparator; -import java.util.List; -import java.util.Optional; - -import org.springframework.batch.core.JobExecution; -import org.springframework.batch.core.JobInstance; -import org.springframework.batch.core.StepExecution; -import org.springframework.batch.core.repository.dao.StepExecutionDao; -import org.springframework.batch.experimental.core.repository.persistence.converter.JobExecutionConverter; -import org.springframework.batch.experimental.core.repository.persistence.converter.StepExecutionConverter; -import org.springframework.data.mongodb.core.MongoOperations; -import org.springframework.data.mongodb.core.query.Query; -import org.springframework.jdbc.support.incrementer.DataFieldMaxValueIncrementer; - -import static org.springframework.data.mongodb.core.query.Criteria.where; -import static org.springframework.data.mongodb.core.query.Query.query; - -/** - * @author Mahmoud Ben Hassine - */ -public class MongoStepExecutionDao implements StepExecutionDao { - - private static final String STEP_EXECUTIONS_COLLECTION_NAME = "BATCH_STEP_EXECUTION"; - private static final String STEP_EXECUTIONS_SEQUENCE_NAME = "BATCH_STEP_EXECUTION_SEQ"; - - private static final String JOB_EXECUTIONS_COLLECTION_NAME = "BATCH_JOB_EXECUTION"; - - private final StepExecutionConverter stepExecutionConverter = new StepExecutionConverter(); - - private final JobExecutionConverter jobExecutionConverter = new JobExecutionConverter(); - - private final MongoOperations mongoOperations; - - private DataFieldMaxValueIncrementer stepExecutionIncrementer; - - public MongoStepExecutionDao(MongoOperations mongoOperations) { - this.mongoOperations = mongoOperations; - this.stepExecutionIncrementer = new MongoSequenceIncrementer(mongoOperations, STEP_EXECUTIONS_SEQUENCE_NAME); - } - - public void setStepExecutionIncrementer(DataFieldMaxValueIncrementer stepExecutionIncrementer) { - this.stepExecutionIncrementer = stepExecutionIncrementer; - } - - @Override - public void saveStepExecution(StepExecution stepExecution) { - org.springframework.batch.experimental.core.repository.persistence.StepExecution stepExecutionToSave = this.stepExecutionConverter - .fromStepExecution(stepExecution); - long stepExecutionId = this.stepExecutionIncrementer.nextLongValue(); - stepExecutionToSave.setStepExecutionId(stepExecutionId); - this.mongoOperations.insert(stepExecutionToSave, STEP_EXECUTIONS_COLLECTION_NAME); - stepExecution.setId(stepExecutionId); - } - - @Override - public void saveStepExecutions(Collection stepExecutions) { - for (StepExecution stepExecution : stepExecutions) { - saveStepExecution(stepExecution); - } - } - - @Override - public void updateStepExecution(StepExecution stepExecution) { - Query query = query(where("stepExecutionId").is(stepExecution.getId())); - org.springframework.batch.experimental.core.repository.persistence.StepExecution stepExecutionToUpdate = this.stepExecutionConverter - .fromStepExecution(stepExecution); - this.mongoOperations.findAndReplace(query, stepExecutionToUpdate, STEP_EXECUTIONS_COLLECTION_NAME); - } - - @Override - public StepExecution getStepExecution(JobExecution jobExecution, Long stepExecutionId) { - org.springframework.batch.experimental.core.repository.persistence.StepExecution stepExecution = this.mongoOperations - .findById(stepExecutionId, org.springframework.batch.experimental.core.repository.persistence.StepExecution.class, - STEP_EXECUTIONS_COLLECTION_NAME); - return stepExecution != null ? this.stepExecutionConverter.toStepExecution(stepExecution, jobExecution) : null; - } - - @Override - public StepExecution getLastStepExecution(JobInstance jobInstance, String stepName) { - // TODO optimize the query - // get all step executions - List stepExecutions = new ArrayList<>(); - Query query = query(where("jobInstanceId").is(jobInstance.getId())); - List jobExecutions = this.mongoOperations - .find(query, org.springframework.batch.experimental.core.repository.persistence.JobExecution.class, - JOB_EXECUTIONS_COLLECTION_NAME); - for (org.springframework.batch.experimental.core.repository.persistence.JobExecution jobExecution : jobExecutions) { - stepExecutions.addAll(jobExecution.getStepExecutions()); - } - // sort step executions by creation date then id (see contract) and return the - // first one - Optional lastStepExecution = stepExecutions - .stream() - .min(Comparator - .comparing(org.springframework.batch.experimental.core.repository.persistence.StepExecution::getCreateTime) - .thenComparing(org.springframework.batch.experimental.core.repository.persistence.StepExecution::getId)); - if (lastStepExecution.isPresent()) { - org.springframework.batch.experimental.core.repository.persistence.StepExecution stepExecution = lastStepExecution.get(); - JobExecution jobExecution = this.jobExecutionConverter.toJobExecution(jobExecutions.stream() - .filter(execution -> execution.getJobExecutionId().equals(stepExecution.getJobExecutionId())) - .findFirst() - .get(), jobInstance); - return this.stepExecutionConverter.toStepExecution(stepExecution, jobExecution); - } - else { - return null; - } - } - - @Override - public void addStepExecutions(JobExecution jobExecution) { - Query query = query(where("jobExecutionId").is(jobExecution.getId())); - List stepExecutions = this.mongoOperations - .find(query, org.springframework.batch.experimental.core.repository.persistence.StepExecution.class, - STEP_EXECUTIONS_COLLECTION_NAME) - .stream() - .map(stepExecution -> this.stepExecutionConverter.toStepExecution(stepExecution, jobExecution)) - .toList(); - jobExecution.addStepExecutions(stepExecutions); - } - - @Override - public long countStepExecutions(JobInstance jobInstance, String stepName) { - long count = 0; - // TODO optimize the count query - Query query = query(where("jobInstanceId").is(jobInstance.getId())); - List jobExecutions = this.mongoOperations - .find(query, org.springframework.batch.experimental.core.repository.persistence.JobExecution.class, - JOB_EXECUTIONS_COLLECTION_NAME); - for (org.springframework.batch.experimental.core.repository.persistence.JobExecution jobExecution : jobExecutions) { - List stepExecutions = jobExecution - .getStepExecutions(); - for (org.springframework.batch.experimental.core.repository.persistence.StepExecution stepExecution : stepExecutions) { - if (stepExecution.getName().equals(stepName)) { - count++; - } - } - } - return count; - } - -} diff --git a/src/main/java/org/springframework/batch/experimental/core/repository/persistence/ExecutionContext.java b/src/main/java/org/springframework/batch/experimental/core/repository/persistence/ExecutionContext.java deleted file mode 100644 index 283cc2f..0000000 --- a/src/main/java/org/springframework/batch/experimental/core/repository/persistence/ExecutionContext.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Copyright 2023 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.springframework.batch.experimental.core.repository.persistence; - -import java.util.Map; - -/** - * @author Mahmoud Ben Hassine - */ -public record ExecutionContext(Map map, boolean dirty) { -} diff --git a/src/main/java/org/springframework/batch/experimental/core/repository/persistence/ExitStatus.java b/src/main/java/org/springframework/batch/experimental/core/repository/persistence/ExitStatus.java deleted file mode 100644 index 984655e..0000000 --- a/src/main/java/org/springframework/batch/experimental/core/repository/persistence/ExitStatus.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Copyright 2023 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.springframework.batch.experimental.core.repository.persistence; - -/** - * @author Mahmoud Ben Hassine - */ -public record ExitStatus(String exitCode, String exitDescription) { -} diff --git a/src/main/java/org/springframework/batch/experimental/core/repository/persistence/JobExecution.java b/src/main/java/org/springframework/batch/experimental/core/repository/persistence/JobExecution.java deleted file mode 100644 index 64656bb..0000000 --- a/src/main/java/org/springframework/batch/experimental/core/repository/persistence/JobExecution.java +++ /dev/null @@ -1,167 +0,0 @@ -/* - * Copyright 2023 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.springframework.batch.experimental.core.repository.persistence; - -import java.time.LocalDateTime; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.springframework.batch.core.BatchStatus; - -/** - * @author Mahmoud Ben Hassine - */ -public class JobExecution { - - private String id; - - private Long jobExecutionId; - - private Long jobInstanceId; - - private Map> jobParameters = new HashMap<>(); - - private List stepExecutions = new ArrayList<>(); - - private BatchStatus status; - - private LocalDateTime startTime; - - private LocalDateTime createTime; - - private LocalDateTime endTime; - - private LocalDateTime lastUpdated; - - private ExitStatus exitStatus; - - private ExecutionContext executionContext; - - public JobExecution() { - } - - public String getId() { - return id; - } - - public Long getJobInstanceId() { - return jobInstanceId; - } - - public void setJobInstanceId(Long jobInstanceId) { - this.jobInstanceId = jobInstanceId; - } - - public Long getJobExecutionId() { - return jobExecutionId; - } - - public void setJobExecutionId(Long jobExecutionId) { - this.jobExecutionId = jobExecutionId; - } - - public Map> getJobParameters() { - return jobParameters; - } - - public void setJobParameters(Map> jobParameters) { - this.jobParameters = jobParameters; - } - - public List getStepExecutions() { - return stepExecutions; - } - - public void setStepExecutions(List stepExecutions) { - this.stepExecutions = stepExecutions; - } - - public BatchStatus getStatus() { - return status; - } - - public void setStatus(BatchStatus status) { - this.status = status; - } - - public LocalDateTime getStartTime() { - return startTime; - } - - public void setStartTime(LocalDateTime startTime) { - this.startTime = startTime; - } - - public LocalDateTime getCreateTime() { - return createTime; - } - - public void setCreateTime(LocalDateTime createTime) { - this.createTime = createTime; - } - - public LocalDateTime getEndTime() { - return endTime; - } - - public void setEndTime(LocalDateTime endTime) { - this.endTime = endTime; - } - - public LocalDateTime getLastUpdated() { - return lastUpdated; - } - - public void setLastUpdated(LocalDateTime lastUpdated) { - this.lastUpdated = lastUpdated; - } - - public ExitStatus getExitStatus() { - return exitStatus; - } - - public void setExitStatus(ExitStatus exitStatus) { - this.exitStatus = exitStatus; - } - - public ExecutionContext getExecutionContext() { - return executionContext; - } - - public void setExecutionContext(ExecutionContext executionContext) { - this.executionContext = executionContext; - } - - @Override - public String toString() { - return "JobExecution{" + - "id='" + id + '\'' + - ", jobExecutionId=" + jobExecutionId + - ", jobInstanceId=" + jobInstanceId + - ", jobParameters=" + jobParameters + - ", stepExecutions=" + stepExecutions + - ", status=" + status + - ", startTime=" + startTime + - ", createTime=" + createTime + - ", endTime=" + endTime + - ", lastUpdated=" + lastUpdated + - ", exitStatus=" + exitStatus + - ", executionContext=" + executionContext + - '}'; - } -} diff --git a/src/main/java/org/springframework/batch/experimental/core/repository/persistence/JobInstance.java b/src/main/java/org/springframework/batch/experimental/core/repository/persistence/JobInstance.java deleted file mode 100644 index 400dd46..0000000 --- a/src/main/java/org/springframework/batch/experimental/core/repository/persistence/JobInstance.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Copyright 2023 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.springframework.batch.experimental.core.repository.persistence; - -/** - * @author Mahmoud Ben Hassine - */ -public class JobInstance { - - private String id; - - private Long jobInstanceId; - - private String jobName; - - private String jobKey; - - public JobInstance() { - } - - public String getId() { - return id; - } - - public Long getJobInstanceId() { - return jobInstanceId; - } - - public void setJobInstanceId(Long jobInstanceId) { - this.jobInstanceId = jobInstanceId; - } - - public String getJobName() { - return jobName; - } - - public void setJobName(String jobName) { - this.jobName = jobName; - } - - public String getJobKey() { - return jobKey; - } - - public void setJobKey(String jobKey) { - this.jobKey = jobKey; - } - - @Override - public String toString() { - return "JobInstance{" + - "id='" + id + '\'' + - ", jobInstanceId=" + jobInstanceId + - ", jobName='" + jobName + '\'' + - ", jobKey='" + jobKey + '\'' + - '}'; - } -} diff --git a/src/main/java/org/springframework/batch/experimental/core/repository/persistence/JobParameter.java b/src/main/java/org/springframework/batch/experimental/core/repository/persistence/JobParameter.java deleted file mode 100644 index 0e8d506..0000000 --- a/src/main/java/org/springframework/batch/experimental/core/repository/persistence/JobParameter.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Copyright 2023 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.springframework.batch.experimental.core.repository.persistence; - -/** - * @author Mahmoud Ben Hassine - */ -public record JobParameter(T value, String type, boolean identifying) { -} diff --git a/src/main/java/org/springframework/batch/experimental/core/repository/persistence/StepExecution.java b/src/main/java/org/springframework/batch/experimental/core/repository/persistence/StepExecution.java deleted file mode 100644 index a323ffd..0000000 --- a/src/main/java/org/springframework/batch/experimental/core/repository/persistence/StepExecution.java +++ /dev/null @@ -1,251 +0,0 @@ -/* - * Copyright 2023 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.springframework.batch.experimental.core.repository.persistence; - -import java.time.LocalDateTime; - -import org.springframework.batch.core.BatchStatus; - -/** - * @author Mahmoud Ben Hassine - */ -public class StepExecution { - - private String id; - - private Long stepExecutionId; - - private Long jobExecutionId; - - private String name; - - private BatchStatus status; - - private long readCount; - - private long writeCount; - - private long commitCount; - - private long rollbackCount; - - private long readSkipCount; - - private long processSkipCount; - - private long writeSkipCount; - - private long filterCount; - - private LocalDateTime startTime; - - private LocalDateTime createTime; - - private LocalDateTime endTime; - - private LocalDateTime lastUpdated; - - private ExecutionContext executionContext; - - private ExitStatus exitStatus; - - private boolean terminateOnly; - - public StepExecution() { - } - - public String getId() { - return id; - } - - public Long getStepExecutionId() { - return stepExecutionId; - } - - public void setStepExecutionId(Long stepExecutionId) { - this.stepExecutionId = stepExecutionId; - } - - public Long getJobExecutionId() { - return jobExecutionId; - } - - public void setJobExecutionId(Long jobExecutionId) { - this.jobExecutionId = jobExecutionId; - } - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - - public BatchStatus getStatus() { - return status; - } - - public void setStatus(BatchStatus status) { - this.status = status; - } - - public long getReadCount() { - return readCount; - } - - public void setReadCount(long readCount) { - this.readCount = readCount; - } - - public long getWriteCount() { - return writeCount; - } - - public void setWriteCount(long writeCount) { - this.writeCount = writeCount; - } - - public long getCommitCount() { - return commitCount; - } - - public void setCommitCount(long commitCount) { - this.commitCount = commitCount; - } - - public long getRollbackCount() { - return rollbackCount; - } - - public void setRollbackCount(long rollbackCount) { - this.rollbackCount = rollbackCount; - } - - public long getReadSkipCount() { - return readSkipCount; - } - - public void setReadSkipCount(long readSkipCount) { - this.readSkipCount = readSkipCount; - } - - public long getProcessSkipCount() { - return processSkipCount; - } - - public void setProcessSkipCount(long processSkipCount) { - this.processSkipCount = processSkipCount; - } - - public long getWriteSkipCount() { - return writeSkipCount; - } - - public void setWriteSkipCount(long writeSkipCount) { - this.writeSkipCount = writeSkipCount; - } - - public long getFilterCount() { - return filterCount; - } - - public void setFilterCount(long filterCount) { - this.filterCount = filterCount; - } - - public LocalDateTime getStartTime() { - return startTime; - } - - public void setStartTime(LocalDateTime startTime) { - this.startTime = startTime; - } - - public LocalDateTime getCreateTime() { - return createTime; - } - - public void setCreateTime(LocalDateTime createTime) { - this.createTime = createTime; - } - - public LocalDateTime getEndTime() { - return endTime; - } - - public void setEndTime(LocalDateTime endTime) { - this.endTime = endTime; - } - - public LocalDateTime getLastUpdated() { - return lastUpdated; - } - - public void setLastUpdated(LocalDateTime lastUpdated) { - this.lastUpdated = lastUpdated; - } - - public ExecutionContext getExecutionContext() { - return executionContext; - } - - public void setExecutionContext(ExecutionContext executionContext) { - this.executionContext = executionContext; - } - - public ExitStatus getExitStatus() { - return exitStatus; - } - - public void setExitStatus(ExitStatus exitStatus) { - this.exitStatus = exitStatus; - } - - public boolean isTerminateOnly() { - return terminateOnly; - } - - public void setTerminateOnly(boolean terminateOnly) { - this.terminateOnly = terminateOnly; - } - - @Override - public String toString() { - return "StepExecution{" + - "id='" + id + '\'' + - ", stepExecutionId=" + stepExecutionId + - ", jobExecutionId='" + jobExecutionId + '\'' + - ", name='" + name + '\'' + - ", status=" + status + - ", readCount=" + readCount + - ", writeCount=" + writeCount + - ", commitCount=" + commitCount + - ", rollbackCount=" + rollbackCount + - ", readSkipCount=" + readSkipCount + - ", processSkipCount=" + processSkipCount + - ", writeSkipCount=" + writeSkipCount + - ", filterCount=" + filterCount + - ", startTime=" + startTime + - ", createTime=" + createTime + - ", endTime=" + endTime + - ", lastUpdated=" + lastUpdated + - ", executionContext=" + executionContext + - ", exitStatus=" + exitStatus + - ", terminateOnly=" + terminateOnly + - '}'; - } -} diff --git a/src/main/java/org/springframework/batch/experimental/core/repository/persistence/converter/JobExecutionConverter.java b/src/main/java/org/springframework/batch/experimental/core/repository/persistence/converter/JobExecutionConverter.java deleted file mode 100644 index 4a3eeed..0000000 --- a/src/main/java/org/springframework/batch/experimental/core/repository/persistence/converter/JobExecutionConverter.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Copyright 2023 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.springframework.batch.experimental.core.repository.persistence.converter; - -import java.util.HashMap; -import java.util.Map; - -import org.springframework.batch.core.JobInstance; -import org.springframework.batch.core.JobParameters; -import org.springframework.batch.experimental.core.repository.persistence.ExecutionContext; -import org.springframework.batch.experimental.core.repository.persistence.ExitStatus; -import org.springframework.batch.experimental.core.repository.persistence.JobExecution; -import org.springframework.batch.experimental.core.repository.persistence.JobParameter; - -/** - * @author Mahmoud Ben Hassine - */ -public class JobExecutionConverter { - - private final JobParameterConverter jobParameterConverter = new JobParameterConverter(); - - private final StepExecutionConverter stepExecutionConverter = new StepExecutionConverter(); - - public org.springframework.batch.core.JobExecution toJobExecution(JobExecution source, JobInstance jobInstance) { - Map> parameterMap = new HashMap<>(); - source.getJobParameters() - .forEach((key, value) -> parameterMap.put(key, this.jobParameterConverter.toJobParameter(value))); - org.springframework.batch.core.JobExecution jobExecution = new org.springframework.batch.core.JobExecution( - jobInstance, source.getJobExecutionId(), new JobParameters(parameterMap)); - jobExecution.addStepExecutions(source.getStepExecutions() - .stream() - .map(stepExecution -> this.stepExecutionConverter.toStepExecution(stepExecution, jobExecution)) - .toList()); - jobExecution.setStatus(source.getStatus()); - jobExecution.setStartTime(source.getStartTime()); - jobExecution.setCreateTime(source.getCreateTime()); - jobExecution.setEndTime(source.getEndTime()); - jobExecution.setLastUpdated(source.getLastUpdated()); - jobExecution.setExitStatus(new org.springframework.batch.core.ExitStatus(source.getExitStatus().exitCode(), - source.getExitStatus().exitDescription())); - jobExecution.setExecutionContext( - new org.springframework.batch.item.ExecutionContext(source.getExecutionContext().map())); - return jobExecution; - } - - public JobExecution fromJobExecution(org.springframework.batch.core.JobExecution source) { - JobExecution jobExecution = new JobExecution(); - jobExecution.setJobExecutionId(source.getId()); - jobExecution.setJobInstanceId(source.getJobInstance().getInstanceId()); - Map> parameterMap = new HashMap<>(); - source.getJobParameters() - .getParameters() - .forEach((key, value) -> parameterMap.put(key, this.jobParameterConverter.fromJobParameter(value))); - jobExecution.setJobParameters(parameterMap); - jobExecution.setStepExecutions( - source.getStepExecutions().stream().map(this.stepExecutionConverter::fromStepExecution).toList()); - jobExecution.setStatus(source.getStatus()); - jobExecution.setStartTime(source.getStartTime()); - jobExecution.setCreateTime(source.getCreateTime()); - jobExecution.setEndTime(source.getEndTime()); - jobExecution.setLastUpdated(source.getLastUpdated()); - jobExecution.setExitStatus( - new ExitStatus(source.getExitStatus().getExitCode(), source.getExitStatus().getExitDescription())); - // TODO use ExecutionContext#toMap introduced in v5.1 - Map map = Map.ofEntries(source.getExecutionContext().entrySet().toArray(new Map.Entry[0])); - jobExecution.setExecutionContext(new ExecutionContext(map, source.getExecutionContext().isDirty())); - return jobExecution; - } - -} diff --git a/src/main/java/org/springframework/batch/experimental/core/repository/persistence/converter/JobInstanceConverter.java b/src/main/java/org/springframework/batch/experimental/core/repository/persistence/converter/JobInstanceConverter.java deleted file mode 100644 index 032b3e5..0000000 --- a/src/main/java/org/springframework/batch/experimental/core/repository/persistence/converter/JobInstanceConverter.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright 2023 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.springframework.batch.experimental.core.repository.persistence.converter; - -import org.springframework.batch.experimental.core.repository.persistence.JobInstance; - -/** - * @author Mahmoud Ben Hassine - */ -public class JobInstanceConverter { - - public org.springframework.batch.core.JobInstance toJobInstance(JobInstance source) { - return new org.springframework.batch.core.JobInstance(source.getJobInstanceId(), source.getJobName()); - } - - public JobInstance fromJobInstance(org.springframework.batch.core.JobInstance source) { - JobInstance jobInstance = new JobInstance(); - jobInstance.setJobName(source.getJobName()); - jobInstance.setJobInstanceId(source.getInstanceId()); - return jobInstance; - } - -} diff --git a/src/main/java/org/springframework/batch/experimental/core/repository/persistence/converter/JobParameterConverter.java b/src/main/java/org/springframework/batch/experimental/core/repository/persistence/converter/JobParameterConverter.java deleted file mode 100644 index 82337c2..0000000 --- a/src/main/java/org/springframework/batch/experimental/core/repository/persistence/converter/JobParameterConverter.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright 2023 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.springframework.batch.experimental.core.repository.persistence.converter; - -import org.springframework.batch.experimental.core.repository.persistence.JobParameter; - -/** - * @author Mahmoud Ben Hassine - */ -public class JobParameterConverter { - - public org.springframework.batch.core.JobParameter toJobParameter(JobParameter source) { - try { - return new org.springframework.batch.core.JobParameter<>(source.value(), - (Class) Class.forName(source.type()), source.identifying()); - } - catch (ClassNotFoundException e) { - throw new RuntimeException(e); - } - } - - public JobParameter fromJobParameter(org.springframework.batch.core.JobParameter source) { - return new JobParameter<>(source.getValue(), source.getType().getName(), source.isIdentifying()); - } - -} diff --git a/src/main/java/org/springframework/batch/experimental/core/repository/persistence/converter/StepExecutionConverter.java b/src/main/java/org/springframework/batch/experimental/core/repository/persistence/converter/StepExecutionConverter.java deleted file mode 100644 index 3b3475c..0000000 --- a/src/main/java/org/springframework/batch/experimental/core/repository/persistence/converter/StepExecutionConverter.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Copyright 2023 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.springframework.batch.experimental.core.repository.persistence.converter; - -import java.util.Map; - -import org.springframework.batch.core.JobExecution; -import org.springframework.batch.experimental.core.repository.persistence.ExecutionContext; -import org.springframework.batch.experimental.core.repository.persistence.ExitStatus; -import org.springframework.batch.experimental.core.repository.persistence.StepExecution; - -/** - * @author Mahmoud Ben Hassine - */ -public class StepExecutionConverter { - - public org.springframework.batch.core.StepExecution toStepExecution(StepExecution source, - JobExecution jobExecution) { - org.springframework.batch.core.StepExecution stepExecution = new org.springframework.batch.core.StepExecution( - source.getName(), jobExecution, source.getStepExecutionId()); - stepExecution.setStatus(source.getStatus()); - stepExecution.setReadCount(source.getReadCount()); - stepExecution.setWriteCount(source.getWriteCount()); - stepExecution.setCommitCount(source.getCommitCount()); - stepExecution.setRollbackCount(source.getRollbackCount()); - stepExecution.setReadSkipCount(source.getReadSkipCount()); - stepExecution.setProcessSkipCount(source.getProcessSkipCount()); - stepExecution.setWriteSkipCount(source.getWriteSkipCount()); - stepExecution.setFilterCount(source.getFilterCount()); - stepExecution.setStartTime(source.getStartTime()); - stepExecution.setCreateTime(source.getCreateTime()); - stepExecution.setEndTime(source.getEndTime()); - stepExecution.setLastUpdated(source.getLastUpdated()); - stepExecution.setExitStatus(new org.springframework.batch.core.ExitStatus(source.getExitStatus().exitCode(), - source.getExitStatus().exitDescription())); - stepExecution.setExecutionContext( - new org.springframework.batch.item.ExecutionContext(source.getExecutionContext().map())); - if (source.isTerminateOnly()) { - stepExecution.setTerminateOnly(); - } - return stepExecution; - } - - public StepExecution fromStepExecution(org.springframework.batch.core.StepExecution source) { - StepExecution stepExecution = new StepExecution(); - stepExecution.setStepExecutionId(source.getId()); - stepExecution.setJobExecutionId(source.getJobExecutionId()); - stepExecution.setName(source.getStepName()); - stepExecution.setJobExecutionId(source.getJobExecutionId()); - stepExecution.setStatus(source.getStatus()); - stepExecution.setReadCount(source.getReadCount()); - stepExecution.setWriteCount(source.getWriteCount()); - stepExecution.setCommitCount(source.getCommitCount()); - stepExecution.setRollbackCount(source.getRollbackCount()); - stepExecution.setReadSkipCount(source.getReadSkipCount()); - stepExecution.setProcessSkipCount(source.getProcessSkipCount()); - stepExecution.setWriteSkipCount(source.getWriteSkipCount()); - stepExecution.setFilterCount(source.getFilterCount()); - stepExecution.setStartTime(source.getStartTime()); - stepExecution.setCreateTime(source.getCreateTime()); - stepExecution.setEndTime(source.getEndTime()); - stepExecution.setLastUpdated(source.getLastUpdated()); - stepExecution.setExitStatus( - new ExitStatus(source.getExitStatus().getExitCode(), source.getExitStatus().getExitDescription())); - // TODO use ExecutionContext#toMap introduced in v5.1 - Map map = Map.ofEntries(source.getExecutionContext().entrySet().toArray(new Map.Entry[0])); - stepExecution.setExecutionContext(new ExecutionContext(map, source.getExecutionContext().isDirty())); - stepExecution.setTerminateOnly(source.isTerminateOnly()); - return stepExecution; - } - -} diff --git a/src/main/java/org/springframework/batch/experimental/core/repository/persistence/package-info.java b/src/main/java/org/springframework/batch/experimental/core/repository/persistence/package-info.java deleted file mode 100644 index d9f05f2..0000000 --- a/src/main/java/org/springframework/batch/experimental/core/repository/persistence/package-info.java +++ /dev/null @@ -1,4 +0,0 @@ -/** - * This package contains the classes of the persistence model. - */ -package org.springframework.batch.experimental.core.repository.persistence; diff --git a/src/main/java/org/springframework/batch/experimental/core/repository/support/MongoJobRepositoryFactoryBean.java b/src/main/java/org/springframework/batch/experimental/core/repository/support/MongoJobRepositoryFactoryBean.java deleted file mode 100644 index 14a90d3..0000000 --- a/src/main/java/org/springframework/batch/experimental/core/repository/support/MongoJobRepositoryFactoryBean.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright 2023 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.springframework.batch.experimental.core.repository.support; - -import org.springframework.batch.core.repository.dao.ExecutionContextDao; -import org.springframework.batch.core.repository.dao.JobExecutionDao; -import org.springframework.batch.core.repository.dao.JobInstanceDao; -import org.springframework.batch.core.repository.dao.StepExecutionDao; -import org.springframework.batch.core.repository.support.AbstractJobRepositoryFactoryBean; -import org.springframework.batch.experimental.core.repository.dao.MongoExecutionContextDao; -import org.springframework.batch.experimental.core.repository.dao.MongoJobExecutionDao; -import org.springframework.batch.experimental.core.repository.dao.MongoJobInstanceDao; -import org.springframework.batch.experimental.core.repository.dao.MongoStepExecutionDao; -import org.springframework.beans.factory.InitializingBean; -import org.springframework.data.mongodb.core.MongoOperations; -import org.springframework.util.Assert; - -public class MongoJobRepositoryFactoryBean extends AbstractJobRepositoryFactoryBean implements InitializingBean { - - private MongoOperations mongoOperations; - - public void setMongoOperations(MongoOperations mongoOperations) { - this.mongoOperations = mongoOperations; - } - - @Override - protected JobInstanceDao createJobInstanceDao() { - return new MongoJobInstanceDao(this.mongoOperations); - } - - @Override - protected JobExecutionDao createJobExecutionDao() { - return new MongoJobExecutionDao(this.mongoOperations); - } - - @Override - protected StepExecutionDao createStepExecutionDao() { - return new MongoStepExecutionDao(this.mongoOperations); - } - - @Override - protected ExecutionContextDao createExecutionContextDao() { - return new MongoExecutionContextDao(this.mongoOperations); - } - - @Override - public void afterPropertiesSet() throws Exception { - super.afterPropertiesSet(); - Assert.notNull(this.mongoOperations, "MongoOperations must not be null."); - } - -} diff --git a/src/main/java/org/springframework/batch/experimental/item/support/CompositeItemReader.java b/src/main/java/org/springframework/batch/experimental/item/support/CompositeItemReader.java deleted file mode 100644 index cd9d1bd..0000000 --- a/src/main/java/org/springframework/batch/experimental/item/support/CompositeItemReader.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Copyright 2023 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.springframework.batch.experimental.item.support; - -import java.util.Iterator; -import java.util.List; - -import org.springframework.batch.item.ExecutionContext; -import org.springframework.batch.item.ItemStreamException; -import org.springframework.batch.item.ItemStreamReader; - -/** - * Composite reader that delegates reading to a list of {@link ItemStreamReader}s. - * This implementation is not thread-safe and not restartable. - * - * @author Mahmoud Ben Hassine - * @param type of objects to read - */ -public class CompositeItemReader implements ItemStreamReader { - - private final List> delegates; - - private final Iterator> delegatesIterator; - - private ItemStreamReader currentDelegate; - - public CompositeItemReader(List> delegates) { - this.delegates = delegates; - this.delegatesIterator = this.delegates.iterator(); - this.currentDelegate = this.delegatesIterator.hasNext() ? this.delegatesIterator.next() : null; - } - - @Override - public void open(ExecutionContext executionContext) throws ItemStreamException { - for (ItemStreamReader delegate : delegates) { - delegate.open(executionContext); - } - } - - @Override - public T read() throws Exception { - if (this.currentDelegate == null) { - return null; - } - T item = currentDelegate.read(); - if (item == null) { - currentDelegate = this.delegatesIterator.hasNext() ? this.delegatesIterator.next() : null; - return read(); - } - return item; - } - - @Override - public void close() throws ItemStreamException { - for (ItemStreamReader delegate : delegates) { - delegate.close(); - } - } -} \ No newline at end of file diff --git a/src/main/resources/org/springframework/batch/experimental/core/schema-drop-mongodb.js b/src/main/resources/org/springframework/batch/experimental/core/schema-drop-mongodb.js deleted file mode 100644 index 1230dab..0000000 --- a/src/main/resources/org/springframework/batch/experimental/core/schema-drop-mongodb.js +++ /dev/null @@ -1,7 +0,0 @@ -// to execute in MongoShell and update database name `db.` as needed -db.getCollection("BATCH_JOB_INSTANCE").drop(); -db.getCollection("BATCH_JOB_EXECUTION").drop(); -db.getCollection("BATCH_STEP_EXECUTION").drop(); -db.getCollection("BATCH_JOB_INSTANCE_SEQ").drop(); -db.getCollection("BATCH_JOB_EXECUTION_SEQ").drop(); -db.getCollection("BATCH_STEP_EXECUTION_SEQ").drop(); diff --git a/src/main/resources/org/springframework/batch/experimental/core/schema-mongodb.js b/src/main/resources/org/springframework/batch/experimental/core/schema-mongodb.js deleted file mode 100644 index 599b00e..0000000 --- a/src/main/resources/org/springframework/batch/experimental/core/schema-mongodb.js +++ /dev/null @@ -1,10 +0,0 @@ -// to execute in MongoShell and update database name `db.` as needed -db.createCollection("BATCH_JOB_INSTANCE"); -db.createCollection("BATCH_JOB_EXECUTION"); -db.createCollection("BATCH_STEP_EXECUTION"); -db.createCollection("BATCH_JOB_INSTANCE_SEQ"); -db.createCollection("BATCH_JOB_EXECUTION_SEQ"); -db.createCollection("BATCH_STEP_EXECUTION_SEQ"); -db.getCollection("BATCH_JOB_INSTANCE_SEQ").insertOne({count : 0}); -db.getCollection("BATCH_JOB_EXECUTION_SEQ").insertOne({count : 0}); -db.getCollection("BATCH_STEP_EXECUTION_SEQ").insertOne({count : 0}); diff --git a/src/test/java/org/springframework/batch/experimental/core/repository/support/MongoDBJobRepositoryIntegrationTests.java b/src/test/java/org/springframework/batch/experimental/core/repository/support/MongoDBJobRepositoryIntegrationTests.java deleted file mode 100644 index c8fc1f0..0000000 --- a/src/test/java/org/springframework/batch/experimental/core/repository/support/MongoDBJobRepositoryIntegrationTests.java +++ /dev/null @@ -1,179 +0,0 @@ -/* - * Copyright 2023 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.springframework.batch.experimental.core.repository.support; - -import java.time.LocalDateTime; - -import com.mongodb.client.MongoClient; -import com.mongodb.client.MongoClients; -import com.mongodb.client.MongoCollection; -import org.bson.Document; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.testcontainers.containers.MongoDBContainer; -import org.testcontainers.junit.jupiter.Container; -import org.testcontainers.junit.jupiter.Testcontainers; -import org.testcontainers.utility.DockerImageName; - -import org.springframework.batch.core.ExitStatus; -import org.springframework.batch.core.Job; -import org.springframework.batch.core.JobExecution; -import org.springframework.batch.core.JobParameters; -import org.springframework.batch.core.JobParametersBuilder; -import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; -import org.springframework.batch.core.explore.JobExplorer; -import org.springframework.batch.core.job.builder.JobBuilder; -import org.springframework.batch.core.launch.JobLauncher; -import org.springframework.batch.core.repository.JobRepository; -import org.springframework.batch.core.step.builder.StepBuilder; -import org.springframework.batch.experimental.core.explore.support.MongoJobExplorerFactoryBean; -import org.springframework.batch.repeat.RepeatStatus; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.data.mongodb.MongoDatabaseFactory; -import org.springframework.data.mongodb.MongoTransactionManager; -import org.springframework.data.mongodb.core.MongoTemplate; -import org.springframework.data.mongodb.core.SimpleMongoClientDatabaseFactory; -import org.springframework.data.mongodb.core.convert.MappingMongoConverter; -import org.springframework.test.context.ContextConfiguration; -import org.springframework.test.context.junit.jupiter.SpringExtension; - -/** - * @author Mahmoud Ben Hassine - */ -@Testcontainers(disabledWithoutDocker = true) -@ExtendWith(SpringExtension.class) -@ContextConfiguration -public class MongoDBJobRepositoryIntegrationTests { - - private static final DockerImageName MONGODB_IMAGE = DockerImageName.parse("mongo:5.0.21"); - - @Container - public static MongoDBContainer mongodb = new MongoDBContainer(MONGODB_IMAGE); - - @Autowired - private MongoTemplate mongoTemplate; - - @BeforeEach - public void setUp() { - mongoTemplate.createCollection("BATCH_JOB_INSTANCE"); - mongoTemplate.createCollection("BATCH_JOB_EXECUTION"); - mongoTemplate.createCollection("BATCH_STEP_EXECUTION"); - mongoTemplate.createCollection("BATCH_JOB_INSTANCE_SEQ"); - mongoTemplate.createCollection("BATCH_JOB_EXECUTION_SEQ"); - mongoTemplate.createCollection("BATCH_STEP_EXECUTION_SEQ"); - mongoTemplate.getCollection("BATCH_JOB_INSTANCE_SEQ").insertOne(new Document("count", 0)); - mongoTemplate.getCollection("BATCH_JOB_EXECUTION_SEQ").insertOne(new Document("count", 0)); - mongoTemplate.getCollection("BATCH_STEP_EXECUTION_SEQ").insertOne(new Document("count", 0)); - } - - @Test - void testJobExecution(@Autowired JobLauncher jobLauncher, @Autowired Job job) throws Exception { - // given - JobParameters jobParameters = new JobParametersBuilder() - .addString("name", "foo") - .addLocalDateTime("runtime", LocalDateTime.now()) - .toJobParameters(); - - // when - JobExecution jobExecution = jobLauncher.run(job, jobParameters); - - // then - Assertions.assertNotNull(jobExecution); - Assertions.assertEquals(ExitStatus.COMPLETED, jobExecution.getExitStatus()); - - MongoCollection jobInstancesCollection = mongoTemplate.getCollection("BATCH_JOB_INSTANCE"); - MongoCollection jobExecutionsCollection = mongoTemplate.getCollection("BATCH_JOB_EXECUTION"); - MongoCollection stepExecutionsCollection = mongoTemplate.getCollection("BATCH_STEP_EXECUTION"); - - Assertions.assertEquals(1, jobInstancesCollection.countDocuments()); - Assertions.assertEquals(1, jobExecutionsCollection.countDocuments()); - Assertions.assertEquals(2, stepExecutionsCollection.countDocuments()); - - // dump results for inspection - for (Document document : jobInstancesCollection.find()) { - System.out.println("job instance = " + document.toJson()); - } - for (Document document : jobExecutionsCollection.find()) { - System.out.println("job execution = " + document.toJson()); - } - for (Document document : stepExecutionsCollection.find()) { - System.out.println("step execution = " + document.toJson()); - } - } - - @Configuration - @EnableBatchProcessing - static class TestConfiguration { - - @Bean - public JobRepository jobRepository(MongoTemplate mongoTemplate, MongoTransactionManager transactionManager) throws Exception { - MongoJobRepositoryFactoryBean jobRepositoryFactoryBean = new MongoJobRepositoryFactoryBean(); - jobRepositoryFactoryBean.setMongoOperations(mongoTemplate); - jobRepositoryFactoryBean.setTransactionManager(transactionManager); - jobRepositoryFactoryBean.afterPropertiesSet(); - return jobRepositoryFactoryBean.getObject(); - } - - @Bean - public JobExplorer jobExplorer(MongoTemplate mongoTemplate, MongoTransactionManager transactionManager) throws Exception { - MongoJobExplorerFactoryBean jobExplorerFactoryBean = new MongoJobExplorerFactoryBean(); - jobExplorerFactoryBean.setMongoOperations(mongoTemplate); - jobExplorerFactoryBean.setTransactionManager(transactionManager); - jobExplorerFactoryBean.afterPropertiesSet(); - return jobExplorerFactoryBean.getObject(); - } - - @Bean - public MongoDatabaseFactory mongoDatabaseFactory() { - MongoClient mongoClient = MongoClients.create(mongodb.getConnectionString()); - return new SimpleMongoClientDatabaseFactory(mongoClient, "test"); - } - - @Bean - public MongoTemplate mongoTemplate(MongoDatabaseFactory mongoDatabaseFactory) { - MongoTemplate template = new MongoTemplate(mongoDatabaseFactory); - MappingMongoConverter converter = (MappingMongoConverter) template.getConverter(); - converter.setMapKeyDotReplacement("."); - return template; - } - - @Bean - public MongoTransactionManager transactionManager(MongoDatabaseFactory mongoDatabaseFactory) { - MongoTransactionManager mongoTransactionManager = new MongoTransactionManager(); - mongoTransactionManager.setDbFactory(mongoDatabaseFactory); - mongoTransactionManager.afterPropertiesSet(); - return mongoTransactionManager; - } - - @Bean - public Job job(JobRepository jobRepository, MongoTransactionManager transactionManager) { - return new JobBuilder("job", jobRepository) - .start(new StepBuilder("step1", jobRepository) - .tasklet((contribution, chunkContext) -> RepeatStatus.FINISHED, transactionManager) - .build()) - .next(new StepBuilder("step2", jobRepository) - .tasklet((contribution, chunkContext) -> RepeatStatus.FINISHED, transactionManager) - .build()) - .build(); - } - - } - -} diff --git a/src/test/java/org/springframework/batch/experimental/item/support/CompositeItemReaderIntegrationTests.java b/src/test/java/org/springframework/batch/experimental/item/support/CompositeItemReaderIntegrationTests.java deleted file mode 100644 index f1a1ad7..0000000 --- a/src/test/java/org/springframework/batch/experimental/item/support/CompositeItemReaderIntegrationTests.java +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Copyright 2023 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.springframework.batch.experimental.item.support; - -import java.util.Arrays; - -import javax.sql.DataSource; - -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - -import org.springframework.batch.core.ExitStatus; -import org.springframework.batch.core.Job; -import org.springframework.batch.core.JobExecution; -import org.springframework.batch.core.JobParameters; -import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; -import org.springframework.batch.core.job.builder.JobBuilder; -import org.springframework.batch.core.launch.JobLauncher; -import org.springframework.batch.core.repository.JobRepository; -import org.springframework.batch.core.step.builder.StepBuilder; -import org.springframework.batch.item.database.JdbcBatchItemWriter; -import org.springframework.batch.item.database.JdbcCursorItemReader; -import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder; -import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder; -import org.springframework.batch.item.file.FlatFileItemReader; -import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder; -import org.springframework.context.ApplicationContext; -import org.springframework.context.annotation.AnnotationConfigApplicationContext; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.core.io.ClassPathResource; -import org.springframework.jdbc.core.DataClassRowMapper; -import org.springframework.jdbc.core.JdbcTemplate; -import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder; -import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType; -import org.springframework.jdbc.support.JdbcTransactionManager; -import org.springframework.test.jdbc.JdbcTestUtils; - -public class CompositeItemReaderIntegrationTests { - - record Person(int id, String name) { - } - - @Test - void testCompositeItemReader() throws Exception { - // given - ApplicationContext context = new AnnotationConfigApplicationContext(JobConfiguration.class); - JobLauncher jobLauncher = context.getBean(JobLauncher.class); - Job job = context.getBean(Job.class); - - // when - JobExecution jobExecution = jobLauncher.run(job, new JobParameters()); - - // then - Assertions.assertEquals(ExitStatus.COMPLETED, jobExecution.getExitStatus()); - JdbcTemplate jdbcTemplate = new JdbcTemplate(context.getBean(DataSource.class)); - int personsCount = JdbcTestUtils.countRowsInTable(jdbcTemplate, "person_target"); - Assertions.assertEquals(6, personsCount); - } - - @Configuration - @EnableBatchProcessing - static class JobConfiguration { - - @Bean - public FlatFileItemReader itemReader1() { - return new FlatFileItemReaderBuilder() - .name("personItemReader1") - .resource(new ClassPathResource("persons1.csv")) - .delimited() - .names("id", "name") - .targetType(Person.class) - .build(); - } - - @Bean - public FlatFileItemReader itemReader2() { - return new FlatFileItemReaderBuilder() - .name("personItemReader2") - .resource(new ClassPathResource("persons2.csv")) - .delimited() - .names("id", "name") - .targetType(Person.class) - .build(); - } - - @Bean - public JdbcCursorItemReader itemReader3() { - String sql = "select * from person_source"; - return new JdbcCursorItemReaderBuilder() - .name("personItemReader3") - .dataSource(dataSource()) - .sql(sql) - .rowMapper(new DataClassRowMapper<>(Person.class)) - .build(); - } - - @Bean - public CompositeItemReader itemReader() { - return new CompositeItemReader<>(Arrays.asList(itemReader1(), itemReader2(), itemReader3())); - } - - @Bean - public JdbcBatchItemWriter itemWriter() { - String sql = "insert into person_target (id, name) values (:id, :name)"; - return new JdbcBatchItemWriterBuilder() - .dataSource(dataSource()) - .sql(sql) - .beanMapped() - .build(); - } - - @Bean - public Job job(JobRepository jobRepository, JdbcTransactionManager transactionManager) { - return new JobBuilder("job", jobRepository) - .start(new StepBuilder("step", jobRepository) - .chunk(5, transactionManager) - .reader(itemReader()) - .writer(itemWriter()) - .build()) - .build(); - } - - @Bean - public DataSource dataSource() { - return new EmbeddedDatabaseBuilder() - .setType(EmbeddedDatabaseType.H2) - .addScript("/org/springframework/batch/core/schema-drop-h2.sql") - .addScript("/org/springframework/batch/core/schema-h2.sql") - .addScript("schema.sql") - .addScript("data.sql") - .build(); - } - - @Bean - public JdbcTransactionManager transactionManager(DataSource dataSource) { - return new JdbcTransactionManager(dataSource); - } - - } -} diff --git a/src/test/resources/data.sql b/src/test/resources/data.sql deleted file mode 100644 index 6b99ba0..0000000 --- a/src/test/resources/data.sql +++ /dev/null @@ -1,2 +0,0 @@ -insert into person_source values (5, 'baz1'); -insert into person_source values (6, 'baz2'); \ No newline at end of file diff --git a/src/test/resources/persons2.csv b/src/test/resources/persons2.csv deleted file mode 100644 index e5a88e3..0000000 --- a/src/test/resources/persons2.csv +++ /dev/null @@ -1,2 +0,0 @@ -3,bar1 -4,bar2 \ No newline at end of file