Skip to content

Commit

Permalink
Fix output chunk end property in ChunkProcessor implementations
Browse files Browse the repository at this point in the history
Resolves #4560
  • Loading branch information
myanar7 authored and fmbenhassine committed Mar 14, 2024
1 parent f56a21c commit b3cc665
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2006-2023 the original author or authors.
* Copyright 2006-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -306,7 +306,9 @@ else if (shouldSkip(itemProcessSkipPolicy, e, contribution.getStepSkipCount()))
break;
}
}

if (inputs.isEnd()) {
outputs.setEnd();
}
return outputs;

}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2006-2022 the original author or authors.
* Copyright 2006-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -340,6 +340,9 @@ protected Chunk<O> transform(StepContribution contribution, Chunk<I> inputs) thr
iterator.remove();
}
}
if (inputs.isEnd()) {
outputs.setEnd();
}
return outputs;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2008-2023 the original author or authors.
* Copyright 2008-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.util.ArrayList;
Expand Down Expand Up @@ -97,6 +98,16 @@ public String process(String item) throws Exception {
assertEquals(1, contribution.getFilterCount());
}

@Test
void testTransformChunkEnd() throws Exception {
Chunk<String> inputs = new Chunk<>(Arrays.asList("1", "2"));
inputs.setEnd();
processor.initializeUserData(inputs);
Chunk<String> outputs = processor.transform(contribution, inputs);
assertEquals(Arrays.asList("1", "2"), outputs.getItems());
assertTrue(outputs.isEnd());
}

@Test
void testFilterCountOnSkip() throws Exception {
processor.setProcessSkipPolicy(new AlwaysSkipItemSkipPolicy());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2008-2023 the original author or authors.
* Copyright 2008-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,7 @@
package org.springframework.batch.core.step.item;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -76,4 +77,15 @@ void testProcess() throws Exception {
assertEquals(2, contribution.getWriteCount());
}

@Test
void testTransform() throws Exception {
Chunk<String> inputs = new Chunk<>();
inputs.add("foo");
inputs.add("bar");
inputs.setEnd();
Chunk<String> outputs = processor.transform(contribution, inputs);
assertEquals(Arrays.asList("foo", "bar"), outputs.getItems());
assertTrue(outputs.isEnd());
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2006-2023 the original author or authors.
* Copyright 2006-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -154,6 +154,13 @@ public int size() {

/**
* Flag to indicate if the source data is exhausted.
*
* <p>
* Note: This may return false if the last chunk has the same number of items as the
* configured commit interval. Consequently, in such cases,there will be a last empty
* chunk that won't be processed. It is recommended to consider this behavior when
* utilizing this method.
* </p>
* @return true if there is no more data to process
*/
public boolean isEnd() {
Expand Down

0 comments on commit b3cc665

Please sign in to comment.