using faultTolerant and processor as a Function #3880

Closed
influence160 opened this issue Apr 8, 2021 · 1 comment
Labels
status: duplicate Issues that are duplicates of other issues

@influence160

Hello,

I have this step, which worked fine:

    @Bean
    public Step myDBMigrationStep() {
        SimpleStepBuilder builder = stepBuilderFactory.get("myDBMigrationStep")
                .allowStartIfComplete(ALLOW_START_IF_COMPLETED)
                .<Party, Intermediaire>chunk(CHUNK_SIZE)
                .reader(partyItemReader)
                .listener(logItemReadListener)
                .processor((Function<? super Party, ? extends Intermediaire>) (party) -> IntermediaireMapper.mapToIntermediaire(party))
                .listener(logItemProcessListener)
                .writer(intermediaireItemWriter)
                .listener(logItemWriterListener)
                .faultTolerant()
                .skipLimit(4723) // the expected number of NullPasswordException occurrences is 4723
                .skip(NullPasswordException.class);

        builder.listener(logChunkListener)
                .listener(logStepExecutionListener);
        return builder.build();
    }

But after hitting the first NullPasswordException, I restarted the job and got a strange exception:

2021-04-08 10:06:56.865 TRACE 37688 --- [SimpleAsyncTaskExecutor-1] f.d.o.m.job.step.LogItemReadListener     : After Read [Party@503e94ef id = '78403']
2021-04-08 10:06:56.865 TRACE 37688 --- [SimpleAsyncTaskExecutor-1] f.d.o.m.job.step.LogItemReadListener     : Before Read.
2021-04-08 10:06:56.865 TRACE 37688 --- [SimpleAsyncTaskExecutor-1] f.d.o.m.job.step.LogItemReadListener     : After Read [Party@2cebcbe2 id = '78406']
2021-04-08 10:06:56.865 TRACE 37688 --- [SimpleAsyncTaskExecutor-1] f.d.o.m.job.step.LogItemReadListener     : Before Read.
2021-04-08 10:06:56.865 TRACE 37688 --- [SimpleAsyncTaskExecutor-1] f.d.o.m.job.step.LogItemReadListener     : After Read [Party@33704a19 id = '78407']
2021-04-08 10:06:56.865 TRACE 37688 --- [SimpleAsyncTaskExecutor-1] f.d.o.m.job.step.LogItemReadListener     : Before Read.
2021-04-08 10:06:56.865 TRACE 37688 --- [SimpleAsyncTaskExecutor-1] f.d.o.m.job.step.LogItemReadListener     : After Read [Party@352aded6 id = '78408']
2021-04-08 10:06:56.897 DEBUG 37688 --- [SimpleAsyncTaskExecutor-1] f.d.o.m.job.step.LogItemWriterListener   : After items write 100 class my.hidden.pakage.v3.model.Party from [Party@4bf7b9e0 id = '78278'] to [Party@352aded6 id = '78408']
2021-04-08 10:06:56.912  INFO 37688 --- [SimpleAsyncTaskExecutor-1] f.d.o.m.job.step.LogItemWriterListener   : On items write Error100 class my.hidden.pakage.v3.model.Party from [Party@4bf7b9e0 id = '78278'] to [Party@352aded6 id = '78408']
org.springframework.retry.ExhaustedRetryException: Retry exhausted after last attempt in recovery path, but exception is not skippable.; nested exception is java.lang.ClassCastException: class my.hidden.pakage.v3.model.Party cannot be cast to class my.hidden.pakage.v4.model.Intermediaire (my.hidden.pakage.v3.model.Party and my.hidden.pakage.v4.model.Intermediaire are in unnamed module of loader org.springframework.boot.devtools.restart.classloader.RestartClassLoader @67897b34)
	at org.springframework.batch.core.step.item.FaultTolerantChunkProcessor$5.recover(FaultTolerantChunkProcessor.java:429) ~[spring-batch-core-4.3.1.jar:4.3.1]
	at org.springframework.retry.support.RetryTemplate.handleRetryExhausted(RetryTemplate.java:539) ~[spring-retry-1.3.1.jar:na]
	at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:387) ~[spring-retry-1.3.1.jar:na]
	at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:255) ~[spring-retry-1.3.1.jar:na]
	at org.springframework.batch.core.step.item.BatchRetryTemplate.execute(BatchRetryTemplate.java:217) ~[spring-batch-core-4.3.1.jar:4.3.1]
	at org.springframework.batch.core.step.item.FaultTolerantChunkProcessor.write(FaultTolerantChunkProcessor.java:444) ~[spring-batch-core-4.3.1.jar:4.3.1]
	at org.springframework.batch.core.step.item.SimpleChunkProcessor.process(SimpleChunkProcessor.java:217) ~[spring-batch-core-4.3.1.jar:4.3.1]
	at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:77) ~[spring-batch-core-4.3.1.jar:4.3.1]
	at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:407) ~[spring-batch-core-4.3.1.jar:4.3.1]
	at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331) ~[spring-batch-core-4.3.1.jar:4.3.1]
	at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140) ~[spring-tx-5.3.3.jar:5.3.3]
	at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:273) ~[spring-batch-core-4.3.1.jar:4.3.1]
	at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82) ~[spring-batch-core-4.3.1.jar:4.3.1]
	at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375) ~[spring-batch-infrastructure-4.3.1.jar:4.3.1]
	at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215) ~[spring-batch-infrastructure-4.3.1.jar:4.3.1]
	at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145) ~[spring-batch-infrastructure-4.3.1.jar:4.3.1]
	at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:258) ~[spring-batch-core-4.3.1.jar:4.3.1]
	at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:208) ~[spring-batch-core-4.3.1.jar:4.3.1]
	at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148) ~[spring-batch-core-4.3.1.jar:4.3.1]
	at org.springframework.batch.core.job.flow.JobFlowExecutor.executeStep(JobFlowExecutor.java:68) ~[spring-batch-core-4.3.1.jar:4.3.1]
	at org.springframework.batch.core.job.flow.support.state.StepState.handle(StepState.java:68) ~[spring-batch-core-4.3.1.jar:4.3.1]
	at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:169) ~[spring-batch-core-4.3.1.jar:4.3.1]
	at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:144) ~[spring-batch-core-4.3.1.jar:4.3.1]
	at org.springframework.batch.core.job.flow.FlowJob.doExecute(FlowJob.java:137) ~[spring-batch-core-4.3.1.jar:4.3.1]
	at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:320) ~[spring-batch-core-4.3.1.jar:4.3.1]
	at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:147) ~[spring-batch-core-4.3.1.jar:4.3.1]
	at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: java.lang.ClassCastException: class my.hidden.pakage.v3.model.Party cannot be cast to class my.hidden.pakage.v4.model.Intermediaire (my.hidden.pakage.v3.model.Party and my.hidden.pakage.v4.model.Intermediaire are in unnamed module of loader org.springframework.boot.devtools.restart.classloader.RestartClassLoader @67897b34)
	at my.hidden.pakage.job.step.portefeuille.IntermediaireItemWriter.write(IntermediaireItemWriter.java:33) ~[classes/:na]
	at org.springframework.batch.core.step.item.SimpleChunkProcessor.writeItems(SimpleChunkProcessor.java:193) ~[spring-batch-core-4.3.1.jar:4.3.1]
	at org.springframework.batch.core.step.item.SimpleChunkProcessor.doWrite(SimpleChunkProcessor.java:159) ~[spring-batch-core-4.3.1.jar:4.3.1]
	at org.springframework.batch.core.step.item.FaultTolerantChunkProcessor$3.doWithRetry(FaultTolerantChunkProcessor.java:348) ~[spring-batch-core-4.3.1.jar:4.3.1]
	at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329) ~[spring-retry-1.3.1.jar:na]
	... 24 common frames omitted

As you can see in the code, I have listeners that write a log entry before and after item reading, processing, and writing:

                .listener(logItemReadListener)
                .listener(logItemProcessListener)
                .listener(logItemWriterListener)

You can see that the exception happens in the writer, which receives an instance of Party instead of an Intermediaire.
The log also does not show "Before Process" and "After Process" between "After Read" and "Before Write".
It looks as if the processor was ignored.
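
A minimal sketch of why an ignored processor would surface as a ClassCastException in the writer rather than earlier. It assumes that, with no processor in effect, items are handed to the writer unchanged through an unchecked generic cast. All types here (PassThroughDemo, processChunk, write) are hypothetical stand-ins, not Spring Batch classes:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Stand-in types for illustration only; none of this is Spring Batch code.
class PassThroughDemo {

    static class Party {}

    static class Intermediaire {
        final String name = "intermediaire";
    }

    // Hypothetical chunk processing: apply the processor when there is one,
    // otherwise pass each item through with an unchecked cast.
    @SuppressWarnings("unchecked")
    static <I, O> List<O> processChunk(List<I> items, Function<I, O> processor) {
        List<O> outputs = new ArrayList<>();
        for (I item : items) {
            // The cast to O is erased at runtime, so it cannot fail here.
            O output = (processor == null) ? (O) item : processor.apply(item);
            outputs.add(output);
        }
        return outputs;
    }

    // Hypothetical writer: the typed loop variable forces the real cast.
    static String write(List<Intermediaire> items) {
        try {
            List<String> names = new ArrayList<>();
            for (Intermediaire item : items) {
                names.add(item.name);
            }
            return "written " + names.size();
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        List<Party> parties = List.of(new Party(), new Party());
        List<Intermediaire> mapped = processChunk(parties, party -> new Intermediaire());
        List<Intermediaire> unmapped = processChunk(parties, null);
        System.out.println(write(mapped));
        System.out.println(write(unmapped));
    }
}
```

With the processor applied, the writer succeeds; with a null processor, the Party instances reach the writer untouched and the cast fails only there, which matches the shape of the stack trace above.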

Note that this same code was working well before I added faultTolerant:

                .faultTolerant()
                .skipLimit(4723)//number of expected NullPasswordException is 4723
                .skip(NullPasswordException.class);

Without these lines the code worked, and when I got an exception I could fix it and restart from the last stopping point, because my ItemReader is based on AbstractItemCountingItemStreamItemReader.
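
For context, a rough sketch of the count-based restart idea that such a reader relies on: the number of items read is saved at each checkpoint and used to skip ahead on restart. The class CountingReader, the key name "read.count", and the plain Map standing in for the execution context are all assumptions made for illustration, not the Spring Batch implementation:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class CountingReaderDemo {

    // Hypothetical context key; a plain Map stands in for the execution context.
    static final String READ_COUNT = "read.count";

    // Hypothetical restartable reader over an in-memory list.
    static class CountingReader<T> {
        private final List<T> source;
        private int count;

        CountingReader(List<T> source) {
            this.source = source;
        }

        // On (re)start, resume after the items counted in a previous run.
        void open(Map<String, Integer> context) {
            count = context.getOrDefault(READ_COUNT, 0);
        }

        // Returns the next item, or null when the source is exhausted.
        T read() {
            return count < source.size() ? source.get(count++) : null;
        }

        // Called at each checkpoint to persist the current position.
        void update(Map<String, Integer> context) {
            context.put(READ_COUNT, count);
        }
    }

    static List<String> itemsReadAfterRestart() {
        List<String> source = List.of("a", "b", "c", "d");
        Map<String, Integer> context = new HashMap<>();

        // First run: read two items, checkpoint, then imagine a failure.
        CountingReader<String> firstRun = new CountingReader<>(source);
        firstRun.open(context);
        firstRun.read();
        firstRun.read();
        firstRun.update(context);

        // Second run: a fresh reader resumes from the saved count.
        CountingReader<String> secondRun = new CountingReader<>(source);
        secondRun.open(context);
        List<String> remaining = new ArrayList<>();
        for (String item = secondRun.read(); item != null; item = secondRun.read()) {
            remaining.add(item);
        }
        return remaining;
    }

    public static void main(String[] args) {
        System.out.println(itemsReadAfterRestart());
    }
}
```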

Now, with this code (still with faultTolerant), I have only changed the type of the processor from Function to ItemProcessor, and the code works:

    @Bean
    public Step myDBMigrationStep() {
        SimpleStepBuilder builder = stepBuilderFactory.get("myDBMigrationStep")
                .allowStartIfComplete(ALLOW_START_IF_COMPLETED)
                .<Party, Intermediaire>chunk(CHUNK_SIZE)
                .reader(partyItemReader)
                .listener(logItemReadListener)
                .processor((ItemProcessor<? super Party, ? extends Intermediaire>) (party) -> IntermediaireMapper.mapToIntermediaire(party))
                .listener(logItemProcessListener)
                .writer(intermediaireItemWriter)
                .listener(logItemWriterListener)
                .faultTolerant()
                .skipLimit(4723) // the expected number of NullPasswordException occurrences is 4723
                .skip(NullPasswordException.class);

        builder.listener(logChunkListener)
                .listener(logStepExecutionListener);
        return builder.build();
    }

It seems that faultTolerant does not work with a Function processor.
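
To illustrate why the cast on the lambda matters: the two step definitions above differ only in which processor(...) overload the cast selects. The sketch below uses a hypothetical Builder and a stand-in ItemProcessor interface (neither is the Spring Batch type) to show that the cast alone decides which overload runs:

```java
import java.util.function.Function;

class OverloadDemo {

    // Stand-in for an item processor interface; not the Spring Batch type.
    @FunctionalInterface
    interface ItemProcessor<I, O> {
        O process(I item) throws Exception;
    }

    // Hypothetical builder with two overloads that records which one was called.
    static class Builder<I, O> {
        String chosen = "none";

        Builder<I, O> processor(ItemProcessor<? super I, ? extends O> processor) {
            chosen = "ItemProcessor";
            return this;
        }

        Builder<I, O> processor(Function<? super I, ? extends O> function) {
            chosen = "Function";
            return this;
        }
    }

    // Same lambda body, cast to Function: the Function overload is selected.
    static String viaFunction() {
        return new Builder<Integer, String>()
                .processor((Function<Integer, String>) i -> "foo" + i)
                .chosen;
    }

    // Same lambda body, cast to ItemProcessor: the other overload is selected.
    static String viaItemProcessor() {
        return new Builder<Integer, String>()
                .processor((ItemProcessor<Integer, String>) i -> "foo" + i)
                .chosen;
    }

    public static void main(String[] args) {
        System.out.println(viaFunction());
        System.out.println(viaItemProcessor());
    }
}
```

Because the lambda body is identical in both cases, any difference in behavior comes from how the builder handles each overload internally, not from the mapping logic itself.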

@fmbenhassine
Contributor

Thank you for opening this issue.

"Note that this same code was working well before I added faultTolerant:"

I confirm this issue happens with v4.3.2. The following sample passes with a non fault-tolerant step but fails with a ClassCastException when using a fault-tolerant step:

import java.util.Arrays;
import java.util.function.Function;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class GH3880 {
	
	@Bean
	public Step step(StepBuilderFactory stepBuilderFactory) {
		return stepBuilderFactory.get("step")
				.<Integer, Person>chunk(2)
				.reader(new ListItemReader<>(Arrays.asList(1, 2, 3, 4)))
				.processor((Function<Integer, Person>) integer -> new Person("foo" + integer))
				.writer(items -> items.forEach(System.out::println))
//				.faultTolerant()
//				.skip(Exception.class)
//				.skipLimit(3)
				.build();
	}

	@Bean
	public Job job(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) {
		return jobBuilderFactory.get("job")
				.start(step(stepBuilderFactory))
				.build();
	}

	public static void main(String[] args) throws Exception {
		ApplicationContext context = new AnnotationConfigApplicationContext(GH3880.class);
		JobLauncher jobLauncher = context.getBean(JobLauncher.class);
		Job job = context.getBean(Job.class);
		jobLauncher.run(job, new JobParameters());
	}
	
	static class Person {
		private final String name;

		public Person(String name) {
			this.name = name;
		}

		@Override
		public String toString() {
			return name;
		}
	}

}

This is actually a duplicate of #3749 and the change suggested there fixes the issue. I will close this as duplicate and plan #3749 for v4.3.3. Thanks again for reporting this issue 👍
