diff --git a/sdk/core/azure-core/src/main/java/com/azure/core/util/IterableStream.java b/sdk/core/azure-core/src/main/java/com/azure/core/util/IterableStream.java index e87b868199a4..5d431f16209d 100644 --- a/sdk/core/azure-core/src/main/java/com/azure/core/util/IterableStream.java +++ b/sdk/core/azure-core/src/main/java/com/azure/core/util/IterableStream.java @@ -3,10 +3,14 @@ package com.azure.core.util; +import com.azure.core.util.logging.ClientLogger; import reactor.core.publisher.Flux; +import java.util.Collections; import java.util.Iterator; +import java.util.Objects; import java.util.stream.Stream; +import java.util.stream.StreamSupport; /** * This class provides utility to iterate over values using standard 'for-each' style loops, or to convert them into a @@ -28,15 +32,30 @@ * @see Iterable */ public class IterableStream implements Iterable { + private final ClientLogger logger = new ClientLogger(IterableStream.class); private final Flux flux; + private final Iterable iterable; /** - * Creates instance with the given {@link Flux}. + * Creates an instance with the given {@link Flux}. * * @param flux Flux of items to iterate over. + * @throws NullPointerException if {@code flux} is {@code null}. */ public IterableStream(Flux flux) { - this.flux = flux; + this.flux = Objects.requireNonNull(flux, "'flux' cannot be null."); + this.iterable = null; + } + + /** + * Creates an instance with the given {@link Iterable}. + * + * @param iterable Collection of items to iterate over. + * @throws NullPointerException if {@code iterable} is {@code null}. + */ + public IterableStream(Iterable iterable) { + this.iterable = Objects.requireNonNull(iterable, "'iterable' cannot be null."); + this.flux = null; } /** @@ -46,7 +65,14 @@ public IterableStream(Flux flux) { * @return {@link Stream} of value {@code T}. */ public Stream stream() { - return flux.toStream(); + if (flux != null) { + return flux.toStream(); + } else if (iterable != null) { + return StreamSupport.stream(iterable.spliterator(), false); + } else { + logger.warning("IterableStream was not initialized with Iterable or Flux, returning empty stream."); + return Stream.empty(); + } } /** @@ -57,7 +83,13 @@ public Stream stream() { */ @Override public Iterator iterator() { - return flux.toIterable().iterator(); + if (flux != null) { + return flux.toIterable().iterator(); + } else if (iterable != null) { + return iterable.iterator(); + } else { + logger.warning("IterableStream was not initialized with Iterable or Flux, returning empty iterator."); + return Collections.emptyIterator(); + } } - } diff --git a/sdk/core/azure-core/src/test/java/com/azure/core/util/IterableStreamTests.java b/sdk/core/azure-core/src/test/java/com/azure/core/util/IterableStreamTests.java new file mode 100644 index 000000000000..5dbff7591459 --- /dev/null +++ b/sdk/core/azure-core/src/test/java/com/azure/core/util/IterableStreamTests.java @@ -0,0 +1,254 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.util; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; +import reactor.core.scheduler.Schedulers; + +import java.util.HashSet; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Tests for {@link IterableStream}. + */ +public class IterableStreamTests { + @Test + public void requiresFlux() { + Assertions.assertThrows(NullPointerException.class, () -> new IterableStream<>((Flux) null)); + } + + @Test + public void requiresIterable() { + Assertions.assertThrows(NullPointerException.class, () -> new IterableStream<>((Iterable) null)); + } + + /** + * Tests that we can stream using a Flux. Subscribing on single, to ensure we don't hit an IllegalStateException. + */ + @Test + public void streamFlux() { + // Arrange + final Set expected = new HashSet<>(); + expected.add("Something"); + expected.add("Foo"); + expected.add("Bar"); + + final Flux flux = Flux.create(sink -> { + for (String c : expected) { + sink.next(c); + } + sink.complete(); + }).subscribeOn(Schedulers.parallel()); + final IterableStream iterableStream = new IterableStream<>(flux); + + // Act + final Set actual = iterableStream.stream().collect(Collectors.toSet()); + + // Assert + assertSets(expected, actual); + } + + /** + * Tests that we can stream over the Flux multiple times. Subscribing on single, to ensure we don't hit an + * IllegalStateException. + */ + @Test + public void streamFluxMultipleTimes() { + // Arrange + final Set expected = new HashSet<>(); + expected.add("Something"); + expected.add("Foo"); + expected.add("Bar"); + + final Flux flux = Flux.create(sink -> { + for (String c : expected) { + sink.next(c); + } + sink.complete(); + }).subscribeOn(Schedulers.single()); + final IterableStream iterableStream = new IterableStream<>(flux); + + // Act + final Set actual = iterableStream.stream().collect(Collectors.toSet()); + final Set actual2 = iterableStream.stream().collect(Collectors.toSet()); + + // Assert + assertSets(expected, actual); + assertSets(expected, actual2); + } + + /** + * Tests that we can iterate over the Flux. Subscribing on a parallel scheduler to see if it throws an + * IllegalStateException. + */ + @Test + public void iteratorFlux() { + // Arrange + final Set expected = new HashSet<>(); + expected.add("Something"); + expected.add("Foo"); + expected.add("Bar"); + + final Flux flux = Flux.create(sink -> { + for (String c : expected) { + sink.next(c); + } + sink.complete(); + }).subscribeOn(Schedulers.parallel()); + final IterableStream iterableStream = new IterableStream<>(flux); + + // Act & Assert + int counter = 0; + for (String actual : iterableStream) { + Assertions.assertTrue(expected.contains(actual)); + counter++; + } + + Assertions.assertEquals(expected.size(), counter); + } + + /** + * Tests that we can iterate over the Flux multiple times. Subscribing on a parallel scheduler to see if it throws + * an IllegalStateException. + */ + @Test + public void iteratorFluxMultipleTimes() { + // Arrange + final Set expected = new HashSet<>(); + expected.add("Something"); + expected.add("Foo"); + expected.add("Bar"); + + final Flux flux = Flux.create(sink -> { + for (String c : expected) { + sink.next(c); + } + sink.complete(); + }).subscribeOn(Schedulers.parallel()); + final IterableStream iterableStream = new IterableStream<>(flux); + + // Act & Assert + int counter = 0; + for (String actual : iterableStream) { + Assertions.assertTrue(expected.contains(actual)); + counter++; + } + + Assertions.assertEquals(expected.size(), counter); + + int counter2 = 0; + for (String actual : iterableStream) { + Assertions.assertTrue(expected.contains(actual)); + counter2++; + } + + Assertions.assertEquals(expected.size(), counter2); + } + + /** + * Tests that we can stream using an Iterable. + */ + @Test + public void streamIterable() { + // Arrange + final Set expected = new HashSet<>(); + expected.add("Something"); + expected.add("Foo"); + expected.add("Bar"); + + final IterableStream iterableStream = new IterableStream<>(expected); + + // Act + final Set actual = iterableStream.stream().collect(Collectors.toSet()); + + // Assert + assertSets(expected, actual); + } + + /** + * Tests that we can stream over the Iterable multiple times. + */ + @Test + public void streamIterableMultipleTimes() { + // Arrange + final Set expected = new HashSet<>(); + expected.add("Something"); + expected.add("Foo"); + expected.add("Bar"); + + final IterableStream iterableStream = new IterableStream<>(expected); + + // Act + final Set actual = iterableStream.stream().collect(Collectors.toSet()); + final Set actual2 = iterableStream.stream().collect(Collectors.toSet()); + + // Assert + assertSets(expected, actual); + assertSets(expected, actual2); + } + + /** + * Tests that we can iterate over the Iterable. + */ + @Test + public void iteratorIterable() { + // Arrange + final Set expected = new HashSet<>(); + expected.add("Something"); + expected.add("Foo"); + expected.add("Bar"); + + final IterableStream iterableStream = new IterableStream<>(expected); + + // Act & Assert + int counter = 0; + for (String actual : iterableStream) { + Assertions.assertTrue(expected.contains(actual)); + counter++; + } + + Assertions.assertEquals(expected.size(), counter); + } + + /** + * Tests that we can iterate over the Iterable multiple times. + */ + @Test + public void iteratorIterableMultipleTimes() { + // Arrange + final Set expected = new HashSet<>(); + expected.add("Something"); + expected.add("Foo"); + expected.add("Bar"); + + final IterableStream iterableStream = new IterableStream<>(expected); + + // Act & Assert + int counter = 0; + for (String actual : iterableStream) { + Assertions.assertTrue(expected.contains(actual)); + counter++; + } + + Assertions.assertEquals(expected.size(), counter); + + int counter2 = 0; + for (String actual : iterableStream) { + Assertions.assertTrue(expected.contains(actual)); + counter2++; + } + + Assertions.assertEquals(expected.size(), counter2); + } + + private static void assertSets(Set expected, Set actual) { + Assertions.assertEquals(expected.size(), actual.size()); + for (String str : expected) { + Assertions.assertTrue(actual.contains(str)); + } + } +}