Skip to content

Commit

Permalink
Update missing information about error handling in ChunkListener
Browse files Browse the repository at this point in the history
Resolves #4384
  • Loading branch information
injae-kim authored and fmbenhassine committed Apr 30, 2024
1 parent 89db774 commit 74c0b27
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2006-2022 the original author or authors.
* Copyright 2006-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,11 +20,15 @@
/**
* Listener interface for the lifecycle of a chunk. A chunk can be thought of as a
* collection of items that are committed together.
* <p>
* {@link ChunkListener} shouldn't throw exceptions and expect continued processing, they
* must be handled in the implementation or the step will terminate.
*
* @author Lucas Ward
* @author Michael Minella
* @author Mahmoud Ben Hassine
* @author Parikshit Dutta
* @author Injae Kim
*/
public interface ChunkListener extends StepListener {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2006-2023 the original author or authors.
* Copyright 2006-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -689,10 +689,6 @@ private void addNonRetryableExceptionIfMissing(Class<? extends Throwable>... cls
/**
* ChunkListener that wraps exceptions thrown from the ChunkListener in
* {@link FatalStepExecutionException} to force termination of StepExecution
* <p>
* ChunkListeners shoulnd't throw exceptions and expect continued processing, they
* must be handled in the implementation or the step will terminate
*
*/
private static class TerminateOnExceptionChunkListenerDelegate implements ChunkListener {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2006-2023 the original author or authors.
* Copyright 2006-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -328,6 +328,87 @@ public void afterChunkError(ChunkContext context) {
assertTrue(writeListener.trail.startsWith("1234"), "Listener order not as expected: " + writeListener.trail);
}

@Test
void testChunkListenersThrowException() throws Exception {
String[] items = new String[] { "1", "2", "3", "4", "5", "6", "7" };
int commitInterval = 3;

SimpleStepFactoryBean<String, String> factory = getStepFactory(items);
class AssertingWriteListener extends StepListenerSupport<Object, Object> {

String trail = "";

@Override
public void beforeWrite(Chunk<?> chunk) {
trail = trail + "2";
}

@Override
public void afterWrite(Chunk<?> items) {
trail = trail + "3";
}

}
class CountingChunkListener implements ChunkListener {

int beforeCount = 0;

int afterCount = 0;

int failedCount = 0;

private final AssertingWriteListener writeListener;

public CountingChunkListener(AssertingWriteListener writeListener) {
super();
this.writeListener = writeListener;
}

@Override
public void afterChunk(ChunkContext context) {
writeListener.trail = writeListener.trail + "4";
afterCount++;
throw new RuntimeException("Step will be terminated when ChunkListener throws exceptions.");
}

@Override
public void beforeChunk(ChunkContext context) {
writeListener.trail = writeListener.trail + "1";
beforeCount++;
throw new RuntimeException("Step will be terminated when ChunkListener throws exceptions.");
}

@Override
public void afterChunkError(ChunkContext context) {
writeListener.trail = writeListener.trail + "5";
failedCount++;
throw new RuntimeException("Step will be terminated when ChunkListener throws exceptions.");
}

}
AssertingWriteListener writeListener = new AssertingWriteListener();
CountingChunkListener chunkListener = new CountingChunkListener(writeListener);
factory.setListeners(new StepListener[] { chunkListener, writeListener });
factory.setCommitInterval(commitInterval);

AbstractStep step = (AbstractStep) factory.getObject();

job.setSteps(Collections.singletonList((Step) step));

JobExecution jobExecution = repository.createJobExecution(job.getName(), new JobParameters());
job.execute(jobExecution);

assertEquals(BatchStatus.FAILED, jobExecution.getStatus());
assertEquals("1", reader.read());
assertEquals(0, written.size());

assertEquals(0, chunkListener.afterCount);
assertEquals(1, chunkListener.beforeCount);
assertEquals(1, chunkListener.failedCount);
assertEquals("15", writeListener.trail);
assertTrue(writeListener.trail.startsWith("15"), "Listener order not as expected: " + writeListener.trail);
}

/*
* Commit interval specified is not allowed to be zero or negative.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ You can apply a `ChunkListener` when there is no chunk declaration. The `Tasklet
responsible for calling the `ChunkListener`, so it applies to a non-item-oriented tasklet
as well (it is called before and after the tasklet).

A `ChunkListener` is not designed to throw checked exceptions. Errors must be handled in the
implementation or the step will terminate.

[[itemReadListener]]
== `ItemReadListener`

Expand Down

0 comments on commit 74c0b27

Please sign in to comment.