Skip to content

Commit

Permalink
Add blocking queue item reader and writer
Browse files Browse the repository at this point in the history
Resolves #2350
Resolves #2044
  • Loading branch information
fmbenhassine committed Oct 11, 2024
1 parent a293f3e commit 7c871e5
Show file tree
Hide file tree
Showing 8 changed files with 444 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.item.queue;

import org.springframework.batch.item.ItemReader;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/**
* This is an {@link ItemReader} that reads items from a {@link BlockingQueue}. It stops
* reading (ie returns {@code null}) if no items are available in the queue after a
* configurable timeout.
*
* @param <T> type of items to read.
* @author Mahmoud Ben Hassine
* @since 5.2.0
*/
public class BlockingQueueItemReader<T> implements ItemReader<T> {

private final BlockingQueue<T> queue;

private long timeout = 1L;

private TimeUnit timeUnit = TimeUnit.SECONDS;

/**
* Create a new {@link BlockingQueueItemReader}.
* @param queue the queue to read items from
*/
public BlockingQueueItemReader(BlockingQueue<T> queue) {
this.queue = queue;
}

/**
* Set the reading timeout and time unit. Defaults to 1 second.
* @param timeout the timeout after which the reader stops reading
* @param timeUnit the unit of the timeout
*/
public void setTimeout(long timeout, TimeUnit timeUnit) {
this.timeout = timeout;
this.timeUnit = timeUnit;
}

@Override
public T read() throws Exception {
return this.queue.poll(this.timeout, this.timeUnit);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.item.queue;

import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.ItemWriter;

import java.util.concurrent.BlockingQueue;

/**
* This is an {@link ItemWriter} that writes items to a {@link BlockingQueue}.
*
* @param <T> type of items to write
* @since 5.2.0
* @author Mahmoud Ben Hassine
*/
public class BlockingQueueItemWriter<T> implements ItemWriter<T> {

private final BlockingQueue<T> queue;

/**
* Create a new {@link BlockingQueueItemWriter}.
* @param queue the queue to write items to
*/
public BlockingQueueItemWriter(BlockingQueue<T> queue) {
this.queue = queue;
}

@Override
public void write(Chunk<? extends T> items) throws Exception {
for (T item : items) {
this.queue.put(item);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.item.queue.builder;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

import org.springframework.batch.item.queue.BlockingQueueItemReader;
import org.springframework.util.Assert;

/**
* Builder for {@link BlockingQueueItemReader}.
*
* @param <T> type of items to read
* @since 5.2.0
* @author Mahmoud Ben Hassine
*/
public class BlockingQueueItemReaderBuilder<T> {

private BlockingQueue<T> queue;

private long timeout = 1L;

private TimeUnit timeUnit = TimeUnit.SECONDS;

/**
* Set the queue to read items from.
* @param queue the queue to read items from.
* @return this instance of the builder
*/
public BlockingQueueItemReaderBuilder<T> queue(BlockingQueue<T> queue) {
this.queue = queue;
return this;
}

/**
* Set the reading timeout. Defaults to 1 second.
* @param timeout the reading timeout.
* @return this instance of the builder
*/
public BlockingQueueItemReaderBuilder<T> timeout(long timeout, TimeUnit timeUnit) {
this.timeout = timeout;
this.timeUnit = timeUnit;
return this;
}

/**
* Create a configured {@link BlockingQueueItemReader}.
* @return a configured {@link BlockingQueueItemReader}.
*/
public BlockingQueueItemReader<T> build() {
Assert.state(this.queue != null, "The blocking queue is required.");
BlockingQueueItemReader<T> blockingQueueItemReader = new BlockingQueueItemReader<>(this.queue);
blockingQueueItemReader.setTimeout(this.timeout, this.timeUnit);
return blockingQueueItemReader;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.item.queue.builder;

import java.util.concurrent.BlockingQueue;

import org.springframework.batch.item.queue.BlockingQueueItemWriter;
import org.springframework.util.Assert;

/**
* Builder for a {@link BlockingQueueItemWriter}.
*
* @param <T> type of items to write
* @since 5.2.0
* @author Mahmoud Ben Hassine
*/
public class BlockingQueueItemWriterBuilder<T> {

private BlockingQueue<T> queue;

/**
* Create a new {@link BlockingQueueItemWriterBuilder}
* @param queue the queue to write items to
* @return this instance of the builder
*/
public BlockingQueueItemWriterBuilder<T> queue(BlockingQueue<T> queue) {
this.queue = queue;
return this;
}

/**
* Create a configured {@link BlockingQueueItemWriter}.
* @return a configured {@link BlockingQueueItemWriter}.
*/
public BlockingQueueItemWriter<T> build() {
Assert.state(this.queue != null, "The blocking queue is required.");
return new BlockingQueueItemWriter<>(this.queue);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.item.queue;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import org.springframework.batch.item.queue.builder.BlockingQueueItemReaderBuilder;

/**
* Test class for {@link BlockingQueueItemReader}.
*
* @author Mahmoud Ben Hassine
*/
class BlockingQueueItemReaderTests {

@Test
void testRead() throws Exception {
// given
BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
queue.put("foo");
BlockingQueueItemReader<String> reader = new BlockingQueueItemReaderBuilder<String>().queue(queue)
.timeout(10, TimeUnit.MILLISECONDS)
.build();

// when & then
Assertions.assertEquals("foo", reader.read());
Assertions.assertNull(reader.read());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.item.queue;

import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

import org.junit.jupiter.api.Test;

import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.queue.builder.BlockingQueueItemWriterBuilder;

import static org.junit.jupiter.api.Assertions.assertTrue;

/**
* Test class for {@link BlockingQueueItemWriter}.
*
* @author Mahmoud Ben Hassine
*/
class BlockingQueueItemWriterTests {

@Test
void testWrite() throws Exception {
// given
BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
BlockingQueueItemWriter<String> writer = new BlockingQueueItemWriterBuilder<String>().queue(queue).build();

// when
writer.write(Chunk.of("foo", "bar"));

// then
assertTrue(queue.containsAll(List.of("foo", "bar")));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.item.queue.builder;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

import org.junit.jupiter.api.Test;

import org.springframework.batch.item.queue.BlockingQueueItemReader;
import org.springframework.test.util.ReflectionTestUtils;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;

/**
* Test class for {@link BlockingQueueItemReaderBuilder}.
*
* @author Mahmoud Ben Hassine
*/
class BlockingQueueItemReaderBuilderTests {

@Test
void testMandatoryQueue() {
assertThrows(IllegalStateException.class, () -> new BlockingQueueItemReaderBuilder<String>().build());
}

@Test
void testBuildReader() {
// given
BlockingQueue<String> queue = new ArrayBlockingQueue<>(5);

// when
BlockingQueueItemReader<String> reader = new BlockingQueueItemReaderBuilder<String>().queue(queue).build();

// then
assertNotNull(reader);
assertEquals(queue, ReflectionTestUtils.getField(reader, "queue"));
}

}
Loading

0 comments on commit 7c871e5

Please sign in to comment.