Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,14 @@

package com.azure.core.util;

import com.azure.core.util.logging.ClientLogger;
import reactor.core.publisher.Flux;

import java.util.Collections;
import java.util.Iterator;
import java.util.Objects;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/**
* This class provides utility to iterate over values using standard 'for-each' style loops, or to convert them into a
Expand All @@ -28,15 +32,30 @@
* @see Iterable
*/
public class IterableStream<T> implements Iterable<T> {
private final ClientLogger logger = new ClientLogger(IterableStream.class);
private final Flux<T> flux;
private final Iterable<T> iterable;

/**
* Creates instance with the given {@link Flux}.
* Creates an instance with the given {@link Flux}.
*
* @param flux Flux of items to iterate over.
* @throws NullPointerException if {@code flux} is {@code null}.
*/
public IterableStream(Flux<T> flux) {
this.flux = flux;
this.flux = Objects.requireNonNull(flux, "'flux' cannot be null.");
this.iterable = null;
}

/**
* Creates an instance with the given {@link Iterable}.
*
* @param iterable Collection of items to iterate over.
* @throws NullPointerException if {@code iterable} is {@code null}.
*/
public IterableStream(Iterable<T> iterable) {
this.iterable = Objects.requireNonNull(iterable, "'iterable' cannot be null.");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of having flux and iterable fields in this class, if an instance of IterableStream is created using an iterable, can this just be converted to this.flux = Flux.fromIterable(Objects.requireNonNull(iterable, "'iterable' cannot be null."));. Simplifies code in other methods too where you don't have to check if flux is null or iterable is null.

Copy link
Member Author

@conniey conniey Nov 25, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately this will result in that illegal state exception again because we are moving from sync to async world then back again.

It was what the code was doing before this overload.

this.flux = null;
}

/**
Expand All @@ -46,7 +65,14 @@ public IterableStream(Flux<T> flux) {
* @return {@link Stream} of value {@code T}.
*/
public Stream<T> stream() {
return flux.toStream();
if (flux != null) {
return flux.toStream();
} else if (iterable != null) {
return StreamSupport.stream(iterable.spliterator(), false);
} else {
logger.warning("IterableStream was not initialized with Iterable or Flux, returning empty stream.");
return Stream.empty();
}
}

/**
Expand All @@ -57,7 +83,13 @@ public Stream<T> stream() {
*/
@Override
public Iterator<T> iterator() {
return flux.toIterable().iterator();
if (flux != null) {
return flux.toIterable().iterator();
} else if (iterable != null) {
return iterable.iterator();
} else {
logger.warning("IterableStream was not initialized with Iterable or Flux, returning empty iterator.");
return Collections.emptyIterator();
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.util;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Tests for {@link IterableStream}.
*/
public class IterableStreamTests {
@Test
public void requiresFlux() {
Assertions.assertThrows(NullPointerException.class, () -> new IterableStream<>((Flux<String>) null));
}

@Test
public void requiresIterable() {
Assertions.assertThrows(NullPointerException.class, () -> new IterableStream<>((Iterable<String>) null));
}

/**
* Tests that we can stream using a Flux. Subscribing on single, to ensure we don't hit an IllegalStateException.
*/
@Test
public void streamFlux() {
// Arrange
final Set<String> expected = new HashSet<>();
expected.add("Something");
expected.add("Foo");
expected.add("Bar");

final Flux<String> flux = Flux.<String>create(sink -> {
for (String c : expected) {
sink.next(c);
}
sink.complete();
}).subscribeOn(Schedulers.parallel());
final IterableStream<String> iterableStream = new IterableStream<>(flux);

// Act
final Set<String> actual = iterableStream.stream().collect(Collectors.toSet());

// Assert
assertSets(expected, actual);
}

/**
* Tests that we can stream over the Flux multiple times. Subscribing on single, to ensure we don't hit an
* IllegalStateException.
*/
@Test
public void streamFluxMultipleTimes() {
// Arrange
final Set<String> expected = new HashSet<>();
expected.add("Something");
expected.add("Foo");
expected.add("Bar");

final Flux<String> flux = Flux.<String>create(sink -> {
for (String c : expected) {
sink.next(c);
}
sink.complete();
}).subscribeOn(Schedulers.single());
final IterableStream<String> iterableStream = new IterableStream<>(flux);

// Act
final Set<String> actual = iterableStream.stream().collect(Collectors.toSet());
final Set<String> actual2 = iterableStream.stream().collect(Collectors.toSet());

// Assert
assertSets(expected, actual);
assertSets(expected, actual2);
}

/**
* Tests that we can iterate over the Flux. Subscribing on a parallel scheduler to see if it throws an
* IllegalStateException.
*/
@Test
public void iteratorFlux() {
// Arrange
final Set<String> expected = new HashSet<>();
expected.add("Something");
expected.add("Foo");
expected.add("Bar");

final Flux<String> flux = Flux.<String>create(sink -> {
for (String c : expected) {
sink.next(c);
}
sink.complete();
}).subscribeOn(Schedulers.parallel());
final IterableStream<String> iterableStream = new IterableStream<>(flux);

// Act & Assert
int counter = 0;
for (String actual : iterableStream) {
Assertions.assertTrue(expected.contains(actual));
counter++;
}

Assertions.assertEquals(expected.size(), counter);
}

/**
* Tests that we can iterate over the Flux multiple times. Subscribing on a parallel scheduler to see if it throws
* an IllegalStateException.
*/
@Test
public void iteratorFluxMultipleTimes() {
// Arrange
final Set<String> expected = new HashSet<>();
expected.add("Something");
expected.add("Foo");
expected.add("Bar");

final Flux<String> flux = Flux.<String>create(sink -> {
for (String c : expected) {
sink.next(c);
}
sink.complete();
}).subscribeOn(Schedulers.parallel());
final IterableStream<String> iterableStream = new IterableStream<>(flux);

// Act & Assert
int counter = 0;
for (String actual : iterableStream) {
Assertions.assertTrue(expected.contains(actual));
counter++;
}

Assertions.assertEquals(expected.size(), counter);

int counter2 = 0;
for (String actual : iterableStream) {
Assertions.assertTrue(expected.contains(actual));
counter2++;
}

Assertions.assertEquals(expected.size(), counter2);
}

/**
* Tests that we can stream using an Iterable.
*/
@Test
public void streamIterable() {
// Arrange
final Set<String> expected = new HashSet<>();
expected.add("Something");
expected.add("Foo");
expected.add("Bar");

final IterableStream<String> iterableStream = new IterableStream<>(expected);

// Act
final Set<String> actual = iterableStream.stream().collect(Collectors.toSet());

// Assert
assertSets(expected, actual);
}

/**
* Tests that we can stream over the Iterable multiple times.
*/
@Test
public void streamIterableMultipleTimes() {
// Arrange
final Set<String> expected = new HashSet<>();
expected.add("Something");
expected.add("Foo");
expected.add("Bar");

final IterableStream<String> iterableStream = new IterableStream<>(expected);

// Act
final Set<String> actual = iterableStream.stream().collect(Collectors.toSet());
final Set<String> actual2 = iterableStream.stream().collect(Collectors.toSet());

// Assert
assertSets(expected, actual);
assertSets(expected, actual2);
}

/**
* Tests that we can iterate over the Iterable.
*/
@Test
public void iteratorIterable() {
// Arrange
final Set<String> expected = new HashSet<>();
expected.add("Something");
expected.add("Foo");
expected.add("Bar");

final IterableStream<String> iterableStream = new IterableStream<>(expected);

// Act & Assert
int counter = 0;
for (String actual : iterableStream) {
Assertions.assertTrue(expected.contains(actual));
counter++;
}

Assertions.assertEquals(expected.size(), counter);
}

/**
* Tests that we can iterate over the Iterable multiple times.
*/
@Test
public void iteratorIterableMultipleTimes() {
// Arrange
final Set<String> expected = new HashSet<>();
expected.add("Something");
expected.add("Foo");
expected.add("Bar");

final IterableStream<String> iterableStream = new IterableStream<>(expected);

// Act & Assert
int counter = 0;
for (String actual : iterableStream) {
Assertions.assertTrue(expected.contains(actual));
counter++;
}

Assertions.assertEquals(expected.size(), counter);

int counter2 = 0;
for (String actual : iterableStream) {
Assertions.assertTrue(expected.contains(actual));
counter2++;
}

Assertions.assertEquals(expected.size(), counter2);
}

private static void assertSets(Set<String> expected, Set<String> actual) {
Assertions.assertEquals(expected.size(), actual.size());
for (String str : expected) {
Assertions.assertTrue(actual.contains(str));
}
}
}