From cf1f34b4f9c7f3e405aa573b7f3ac167c0dca375 Mon Sep 17 00:00:00 2001 From: Henning Poettker Date: Thu, 21 Sep 2023 21:22:24 +0200 Subject: [PATCH] Use more `SynchronizedItemReader` in tests --- ...tTolerantStepFactoryBeanRollbackTests.java | 47 +++++-------------- .../step/tasklet/AsyncTaskletStepTests.java | 33 +++++-------- 2 files changed, 24 insertions(+), 56 deletions(-) diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/FaultTolerantStepFactoryBeanRollbackTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/FaultTolerantStepFactoryBeanRollbackTests.java index ea08e13537..66060e11b6 100644 --- a/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/FaultTolerantStepFactoryBeanRollbackTests.java +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/FaultTolerantStepFactoryBeanRollbackTests.java @@ -17,9 +17,7 @@ import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; @@ -43,6 +41,7 @@ import org.springframework.batch.core.step.factory.FaultTolerantStepFactoryBean; import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.support.ListItemReader; +import org.springframework.batch.item.support.SynchronizedItemReader; import org.springframework.batch.support.transaction.ResourcelessTransactionManager; import org.springframework.batch.support.transaction.TransactionAwareProxyFactory; import org.springframework.core.task.SimpleAsyncTaskExecutor; @@ -79,7 +78,6 @@ class FaultTolerantStepFactoryBeanRollbackTests { private JobRepository repository; - @SuppressWarnings("unchecked") @BeforeEach void setUp() throws Exception { reader = new SkipReaderStub<>(); @@ -103,7 +101,7 @@ void setUp() throws Exception { factory.setSkipLimit(2); - factory.setSkippableExceptionClasses(getExceptionMap(Exception.class)); + factory.setSkippableExceptionClasses(Map.of(Exception.class, true)); EmbeddedDatabase embeddedDatabase = new EmbeddedDatabaseBuilder() .addScript("/org/springframework/batch/core/schema-drop-hsqldb.sql") @@ -177,7 +175,6 @@ void testReaderDefaultNoRollbackOnCheckedException() throws Exception { /** * Scenario: Exception in reader that should not cause rollback */ - @SuppressWarnings("unchecked") @Test void testReaderAttributesOverrideSkippableNoRollback() throws Exception { reader.setFailures("2", "3"); @@ -185,9 +182,9 @@ void testReaderAttributesOverrideSkippableNoRollback() throws Exception { reader.setExceptionType(SkippableException.class); // No skips by default - factory.setSkippableExceptionClasses(getExceptionMap(RuntimeException.class)); + factory.setSkippableExceptionClasses(Map.of(RuntimeException.class, true)); // But this one is explicit in the tx-attrs so it should be skipped - factory.setNoRollbackExceptionClasses(getExceptionList(SkippableException.class)); + factory.setNoRollbackExceptionClasses(List.of(SkippableException.class)); Step step = factory.getObject(); @@ -249,11 +246,8 @@ void testNoRollbackInProcessorWhenSkipExceeded() throws Throwable { processor.clear(); factory.setItemProcessor(processor); - List> exceptions = Arrays.asList(Exception.class); - factory.setNoRollbackExceptionClasses(exceptions); - @SuppressWarnings("unchecked") - Map, Boolean> skippable = getExceptionMap(Exception.class); - factory.setSkippableExceptionClasses(skippable); + factory.setNoRollbackExceptionClasses(List.of(Exception.class)); + factory.setSkippableExceptionClasses(Map.of(Exception.class, true)); processor.setFailures("2"); @@ -279,7 +273,7 @@ void testProcessSkipWithNoRollbackForCheckedException() throws Exception { processor.setFailures("4"); processor.setExceptionType(SkippableException.class); - factory.setNoRollbackExceptionClasses(getExceptionList(SkippableException.class)); + factory.setNoRollbackExceptionClasses(List.of(SkippableException.class)); Step step = factory.getObject(); @@ -359,7 +353,7 @@ void testWriterNoRollbackOnRuntimeException() throws Exception { writer.setFailures("2", "3"); writer.setExceptionType(SkippableRuntimeException.class); - factory.setNoRollbackExceptionClasses(getExceptionList(SkippableRuntimeException.class)); + factory.setNoRollbackExceptionClasses(List.of(SkippableRuntimeException.class)); Step step = factory.getObject(); @@ -380,7 +374,7 @@ void testWriterNoRollbackOnCheckedException() throws Exception { writer.setFailures("2", "3"); writer.setExceptionType(SkippableException.class); - factory.setNoRollbackExceptionClasses(getExceptionList(SkippableException.class)); + factory.setNoRollbackExceptionClasses(List.of(SkippableException.class)); Step step = factory.getObject(); @@ -517,12 +511,7 @@ void testSkipInWriterTransactionalReader() throws Exception { @Test void testMultithreadedSkipInWriter() throws Exception { - factory.setItemReader(new ItemReader<>() { - @Override - public synchronized String read() throws Exception { - return reader.read(); - } - }); + factory.setItemReader(new SynchronizedItemReader<>(reader)); writer.setFailures("1", "2", "3", "4", "5"); factory.setCommitInterval(3); factory.setSkipLimit(10); @@ -575,23 +564,9 @@ void testMultipleSkipsInWriterNonTransactionalProcessor() throws Exception { assertEquals("[1, 2, 3, 4, 5]", processor.getProcessed().toString()); } - @SuppressWarnings("unchecked") - private Collection> getExceptionList(Class arg) { - return Arrays.>asList(arg); - } - - @SuppressWarnings("unchecked") - private Map, Boolean> getExceptionMap(Class... args) { - Map, Boolean> map = new HashMap<>(); - for (Class arg : args) { - map.put(arg, true); - } - return map; - } - static class ExceptionThrowingChunkListener implements ChunkListener { - private int phase = -1; + private final int phase; public ExceptionThrowingChunkListener(int throwPhase) { this.phase = throwPhase; diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/step/tasklet/AsyncTaskletStepTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/step/tasklet/AsyncTaskletStepTests.java index c96f852634..5062972d08 100644 --- a/spring-batch-core/src/test/java/org/springframework/batch/core/step/tasklet/AsyncTaskletStepTests.java +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/step/tasklet/AsyncTaskletStepTests.java @@ -35,16 +35,16 @@ import org.springframework.batch.core.step.JobRepositorySupport; import org.springframework.batch.item.ExecutionContext; import org.springframework.batch.item.ItemProcessor; -import org.springframework.batch.item.ItemStreamSupport; +import org.springframework.batch.item.ItemStream; import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.support.ListItemReader; import org.springframework.batch.item.support.PassThroughItemProcessor; +import org.springframework.batch.item.support.SynchronizedItemReader; import org.springframework.batch.repeat.policy.SimpleCompletionPolicy; import org.springframework.batch.repeat.support.RepeatTemplate; import org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate; import org.springframework.batch.support.transaction.ResourcelessTransactionManager; import org.springframework.core.task.SimpleAsyncTaskExecutor; -import org.springframework.lang.Nullable; import org.springframework.util.StringUtils; class AsyncTaskletStepTests { @@ -83,8 +83,8 @@ private void setUp() { RepeatTemplate chunkTemplate = new RepeatTemplate(); chunkTemplate.setCompletionPolicy(new SimpleCompletionPolicy(2)); - step.setTasklet(new TestingChunkOrientedTasklet<>(new ListItemReader<>(items), itemProcessor, itemWriter, - chunkTemplate)); + step.setTasklet(new TestingChunkOrientedTasklet<>(new SynchronizedItemReader<>(new ListItemReader<>(items)), + itemProcessor, itemWriter, chunkTemplate)); jobRepository = new JobRepositorySupport(); step.setJobRepository(jobRepository); @@ -96,12 +96,11 @@ private void setUp() { template.setTaskExecutor(taskExecutor); step.setStepOperations(template); - step.registerStream(new ItemStreamSupport() { + step.registerStream(new ItemStream() { private int count = 0; @Override public void update(ExecutionContext executionContext) { - super.update(executionContext); executionContext.putInt("counter", count++); } }); @@ -125,10 +124,8 @@ void testStepExecutionUpdates() throws Exception { step.execute(stepExecution); assertEquals(BatchStatus.COMPLETED, stepExecution.getStatus()); - // assertEquals(25, stepExecution.getReadCount()); - // assertEquals(25, processed.size()); - assertTrue(stepExecution.getReadCount() >= 25); - assertTrue(processed.size() >= 25); + assertEquals(25, stepExecution.getReadCount()); + assertEquals(25, processed.size()); // Check commit count didn't spin out of control waiting for other // threads to finish... @@ -170,17 +167,13 @@ void testStepExecutionFailsWithProcessor() throws Exception { throttleLimit = 1; concurrencyLimit = 1; items = Arrays.asList("one", "barf", "three", "four"); - itemProcessor = new ItemProcessor<>() { - @Nullable - @Override - public String process(String item) throws Exception { - logger.info("Item: " + item); - processed.add(item); - if (item.equals("barf")) { - throw new RuntimeException("Planned processor error"); - } - return item; + itemProcessor = item -> { + logger.info("Item: " + item); + processed.add(item); + if (item.equals("barf")) { + throw new RuntimeException("Planned processor error"); } + return item; }; setUp();