diff --git a/pom.xml b/pom.xml
index 37a1340..95f7ab3 100755
--- a/pom.xml
+++ b/pom.xml
@@ -29,7 +29,7 @@
0.4.0
oracle-r2dbc
- Oracle R2DBC Driver implementing version 0.9.0 of the R2DBC SPI for Oracle Database.
+ Oracle R2DBC Driver implementing version 1.0.0 of the R2DBC SPI for Oracle Database.
https://github.com/oracle/oracle-r2dbc
@@ -65,8 +65,8 @@
11
- 21.3.0.0
- 0.9.0.RELEASE
+ 21.5.0.0
+ 1.0.0.RELEASE
3.3.0.RELEASE
1.0.3
5.7.0
diff --git a/src/main/java/oracle/r2dbc/impl/OracleResultImpl.java b/src/main/java/oracle/r2dbc/impl/OracleResultImpl.java
index fb2dcd0..bd8ebf8 100644
--- a/src/main/java/oracle/r2dbc/impl/OracleResultImpl.java
+++ b/src/main/java/oracle/r2dbc/impl/OracleResultImpl.java
@@ -196,9 +196,8 @@ public Publisher flatMap(
*
*/
@Override
- public Publisher getRowsUpdated() {
- return publishSegments(UpdateCount.class,
- updateCount -> Math.toIntExact(updateCount.value()));
+ public Publisher getRowsUpdated() {
+ return publishSegments(UpdateCount.class, UpdateCount::value);
}
/**
diff --git a/src/main/java/oracle/r2dbc/impl/OracleStatementImpl.java b/src/main/java/oracle/r2dbc/impl/OracleStatementImpl.java
index bbd6f9a..5839cd6 100755
--- a/src/main/java/oracle/r2dbc/impl/OracleStatementImpl.java
+++ b/src/main/java/oracle/r2dbc/impl/OracleStatementImpl.java
@@ -1457,7 +1457,7 @@ private JdbcBatch(
*/
@Override
protected Publisher bind() {
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({"unchecked","rawtypes"})
Publisher[] bindPublishers = new Publisher[batchSize];
for (int i = 0; i < batchSize; i++) {
bindPublishers[i] = Flux.concat(
diff --git a/src/main/java/oracle/r2dbc/impl/ReadablesMetadata.java b/src/main/java/oracle/r2dbc/impl/ReadablesMetadata.java
index eeb3192..aa69e3e 100755
--- a/src/main/java/oracle/r2dbc/impl/ReadablesMetadata.java
+++ b/src/main/java/oracle/r2dbc/impl/ReadablesMetadata.java
@@ -237,22 +237,6 @@ public List extends ColumnMetadata> getColumnMetadatas() {
public boolean contains(String columnName) {
return getColumnIndex(columnName) != -1;
}
-
- /**
- * {@inheritDoc}
- *
- * Implements the R2DBC SPI method by returning a view of the column metadata
- * objects, with each list entry mapped to {@link ColumnMetadata#getName()}.
- * As specified by the SPI method documentation, the returned collection is
- * unmodifiable, imposes the same column ordering as the query result, and
- * supports case insensitive look ups.
- *
- */
- @Override
- public Collection getColumnNames() {
- throw new UnsupportedOperationException(
- "This method is deprecated for removal");
- }
}
static final class OutParametersMetadataImpl
diff --git a/src/test/java/oracle/r2dbc/impl/OracleConnectionImplTest.java b/src/test/java/oracle/r2dbc/impl/OracleConnectionImplTest.java
index 6949992..ea9a61c 100644
--- a/src/test/java/oracle/r2dbc/impl/OracleConnectionImplTest.java
+++ b/src/test/java/oracle/r2dbc/impl/OracleConnectionImplTest.java
@@ -952,7 +952,7 @@ public void testSetAutoCommit() {
"Unexpected value returned by isAutoCommit() before subscribing to"
+ " setAutoCommit(true) publisher");
awaitMany(
- List.of(1, 1),
+ List.of(1L, 1L),
Flux.from(sessionA.createBatch()
.add("INSERT INTO testSetAutoCommit VALUES ('C')")
.add("INSERT INTO testSetAutoCommit VALUES ('C')")
diff --git a/src/test/java/oracle/r2dbc/impl/OracleResultImplTest.java b/src/test/java/oracle/r2dbc/impl/OracleResultImplTest.java
index 7b8b7db..d2e84dc 100644
--- a/src/test/java/oracle/r2dbc/impl/OracleResultImplTest.java
+++ b/src/test/java/oracle/r2dbc/impl/OracleResultImplTest.java
@@ -89,9 +89,9 @@ public void testGetRowsUpdated() {
.toIterable()
.iterator();
Result insertResult0 = insertResults.next();
- Publisher insertCountPublisher0 =
+ Publisher insertCountPublisher0 =
insertResult0.getRowsUpdated();
- awaitOne(1, insertCountPublisher0);
+ awaitOne(1L, insertCountPublisher0);
// Expect IllegalStateException from multiple Result consumptions.
assertThrows(IllegalStateException.class,
@@ -100,12 +100,12 @@ public void testGetRowsUpdated() {
() -> insertResult0.map((row, metadata) -> "unexpected"));
// Expect update count publisher to support multiple subscribers
- awaitOne(1, insertCountPublisher0);
+ awaitOne(1L, insertCountPublisher0);
Result insertResult1 = insertResults.next();
- Publisher insertCountPublisher1 =
+ Publisher insertCountPublisher1 =
insertResult1.getRowsUpdated();
- awaitOne(1, insertCountPublisher1);
+ awaitOne(1L, insertCountPublisher1);
// Expect IllegalStateException from multiple Result consumptions.
assertThrows(IllegalStateException.class,
@@ -114,16 +114,16 @@ public void testGetRowsUpdated() {
() -> insertResult1.map((row, metadata) -> "unexpected"));
// Expect update count publisher to support multiple subscribers
- awaitOne(1, insertCountPublisher1);
+ awaitOne(1L, insertCountPublisher1);
// Expect an update count of zero from UPDATE of zero rows
consumeOne(connection.createStatement(
"UPDATE testGetRowsUpdated SET y = 99 WHERE x = 99")
.execute(),
noUpdateResult -> {
- Publisher noUpdateCountPublisher =
+ Publisher noUpdateCountPublisher =
noUpdateResult.getRowsUpdated();
- awaitOne(0, noUpdateCountPublisher);
+ awaitOne(0L, noUpdateCountPublisher);
// Expect IllegalStateException from multiple Result consumptions.
assertThrows(IllegalStateException.class,
@@ -131,7 +131,7 @@ public void testGetRowsUpdated() {
assertThrows(IllegalStateException.class, noUpdateResult::getRowsUpdated);
// Expect update count publisher to support multiple subscribers
- awaitOne(0, noUpdateCountPublisher);
+ awaitOne(0L, noUpdateCountPublisher);
});
// Expect update count of 2 from UPDATE of 2 rows
@@ -139,8 +139,8 @@ public void testGetRowsUpdated() {
"UPDATE testGetRowsUpdated SET y = 2 WHERE x = 0")
.execute(),
updateResult -> {
- Publisher updateCountPublisher = updateResult.getRowsUpdated();
- awaitOne(2, updateCountPublisher);
+ Publisher updateCountPublisher = updateResult.getRowsUpdated();
+ awaitOne(2L, updateCountPublisher);
// Expect IllegalStateException from multiple Result consumptions.
assertThrows(IllegalStateException.class,
@@ -148,7 +148,7 @@ public void testGetRowsUpdated() {
assertThrows(IllegalStateException.class, updateResult::getRowsUpdated);
// Expect update count publisher to support multiple subscribers
- awaitOne(2, updateCountPublisher);
+ awaitOne(2L, updateCountPublisher);
});
// Expect no update count from SELECT
@@ -156,11 +156,11 @@ public void testGetRowsUpdated() {
"SELECT x,y FROM testGetRowsUpdated")
.execute())
.flatMapMany(selectResult -> {
- Publisher selectCountPublisher =
+ Publisher selectCountPublisher =
selectResult.getRowsUpdated();
// Expect update count publisher to support multiple subscribers
- Publisher result = Flux.concat(
+ Publisher result = Flux.concat(
Mono.from(selectCountPublisher).cache(),
Mono.from(selectCountPublisher).cache());
@@ -178,8 +178,8 @@ public void testGetRowsUpdated() {
.bind("x", 0)
.execute(),
deleteResult -> {
- Publisher deleteCountPublisher = deleteResult.getRowsUpdated();
- awaitOne(2, deleteCountPublisher);
+ Publisher deleteCountPublisher = deleteResult.getRowsUpdated();
+ awaitOne(2L, deleteCountPublisher);
// Expect IllegalStateException from multiple Result consumptions.
assertThrows(IllegalStateException.class,
@@ -187,7 +187,7 @@ public void testGetRowsUpdated() {
assertThrows(IllegalStateException.class, deleteResult::getRowsUpdated);
// Expect update count publisher to support multiple subscribers
- awaitOne(2, deleteCountPublisher);
+ awaitOne(2L, deleteCountPublisher);
});
}
finally {
@@ -473,7 +473,7 @@ public void testFilter() {
// UpdateCount segment to be published by getRowsUpdated
AtomicReference unfilteredUpdateCount =
new AtomicReference<>(null);
- awaitOne(1, Flux.from(connection.createStatement(
+ awaitOne(1L, Flux.from(connection.createStatement(
"INSERT INTO testFilter VALUES (1)")
.execute())
.map(result ->
@@ -529,7 +529,7 @@ public void testFilter() {
.execute())
.block(sqlTimeout());
Result filteredResult = unfilteredResult.filter(segment -> false);
- Publisher filteredUpdateCounts = filteredResult.getRowsUpdated();
+ Publisher filteredUpdateCounts = filteredResult.getRowsUpdated();
assertThrows(
IllegalStateException.class, unfilteredResult::getRowsUpdated);
assertThrows(
@@ -545,13 +545,13 @@ public void testFilter() {
.block(sqlTimeout());
Result filteredResult2 = unfilteredResult2.filter(segment ->
fail("Unexpected invocation"));
- Publisher unfilteredUpdateCounts =
+ Publisher unfilteredUpdateCounts =
unfilteredResult2.getRowsUpdated();
assertThrows(
IllegalStateException.class, filteredResult2::getRowsUpdated);
assertThrows(
IllegalStateException.class, unfilteredResult2::getRowsUpdated);
- awaitOne(1, unfilteredUpdateCounts);
+ awaitOne(1L, unfilteredUpdateCounts);
// Execute an INSERT that fails, and filter Message type segments.
// Expect the Result to not emit {@code onError} when consumed.
diff --git a/src/test/java/oracle/r2dbc/impl/OracleStatementImplTest.java b/src/test/java/oracle/r2dbc/impl/OracleStatementImplTest.java
index 1ae85cd..48ca4b3 100644
--- a/src/test/java/oracle/r2dbc/impl/OracleStatementImplTest.java
+++ b/src/test/java/oracle/r2dbc/impl/OracleStatementImplTest.java
@@ -22,6 +22,8 @@
package oracle.r2dbc.impl;
import io.r2dbc.spi.Connection;
+import io.r2dbc.spi.ConnectionFactories;
+import io.r2dbc.spi.ConnectionFactoryOptions;
import io.r2dbc.spi.Parameter;
import io.r2dbc.spi.Parameters;
import io.r2dbc.spi.R2dbcException;
@@ -32,6 +34,8 @@
import io.r2dbc.spi.Result.UpdateCount;
import io.r2dbc.spi.Statement;
import io.r2dbc.spi.Type;
+import oracle.r2dbc.OracleR2dbcOptions;
+import oracle.r2dbc.test.DatabaseConfig;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
@@ -43,16 +47,28 @@
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
+import static java.lang.String.format;
import static java.util.Arrays.asList;
import static oracle.r2dbc.test.DatabaseConfig.connectTimeout;
+import static oracle.r2dbc.test.DatabaseConfig.host;
import static oracle.r2dbc.test.DatabaseConfig.newConnection;
+import static oracle.r2dbc.test.DatabaseConfig.password;
+import static oracle.r2dbc.test.DatabaseConfig.port;
+import static oracle.r2dbc.test.DatabaseConfig.serviceName;
import static oracle.r2dbc.test.DatabaseConfig.sharedConnection;
+import static oracle.r2dbc.test.DatabaseConfig.sqlTimeout;
+import static oracle.r2dbc.test.DatabaseConfig.user;
import static oracle.r2dbc.util.Awaits.awaitError;
import static oracle.r2dbc.util.Awaits.awaitExecution;
import static oracle.r2dbc.util.Awaits.awaitMany;
@@ -811,7 +827,7 @@ public void testAdd() {
// Expect the statement to execute with previously added binds, and
// then emit an error if binds are missing in the final set of binds.
- List> signals =
+ List> signals =
awaitOne(Flux.from(connection.createStatement(
"INSERT INTO testAdd VALUES (:x, :y)")
.bind("x", 0).bind("y", 1).add()
@@ -911,7 +927,7 @@ public void testExecute() {
selectStatement::execute);
// Expect update to execute when a subscriber subscribes
- awaitOne(1,
+ awaitOne(1L,
Flux.from(updatePublisher)
.flatMap(result -> result.getRowsUpdated()));
awaitQuery(
@@ -1767,9 +1783,9 @@ public void testNoOutImplicitResult() {
IntStream.range(0, 100)
.forEach(i -> insert.bind(0, i).add());
insert.bind(0, 100);
- awaitOne(101, Flux.from(insert.execute())
+ awaitOne(101L, Flux.from(insert.execute())
.flatMap(Result::getRowsUpdated)
- .reduce(0, (total, updateCount) -> total + updateCount));
+ .reduce(0L, (total, updateCount) -> total + updateCount));
// Create a procedure that returns a cursor
awaitExecution(connection.createStatement(
@@ -1844,8 +1860,8 @@ public void testNoOutImplicitResult() {
.collectList()));
// Expect Implicit Results to have no update counts
- AtomicInteger count = new AtomicInteger(-9);
- awaitMany(asList(-9, -10),
+ AtomicLong count = new AtomicLong(-9);
+ awaitMany(asList(-9L, -10L),
Flux.from(connection.createStatement("BEGIN countDown; END;")
.execute())
.concatMap(result ->
@@ -1878,9 +1894,9 @@ public void testOutAndImplicitResult() {
IntStream.range(0, 100)
.forEach(i -> insert.bind(0, i).add());
insert.bind(0, 100);
- awaitOne(101, Flux.from(insert.execute())
+ awaitOne(101L, Flux.from(insert.execute())
.flatMap(Result::getRowsUpdated)
- .reduce(0, (total, updateCount) -> total + updateCount));
+ .reduce(0L, (total, updateCount) -> total + updateCount));
// Create a procedure that returns a cursor
awaitExecution(connection.createStatement(
@@ -1961,8 +1977,8 @@ public void testOutAndImplicitResult() {
.collectList()));
// Expect Implicit Results to have no update counts
- AtomicInteger count = new AtomicInteger(-8);
- awaitMany(asList(-8, -9, -10),
+ AtomicLong count = new AtomicLong(-8);
+ awaitMany(asList(-8L, -9L, -10L),
Flux.from(connection.createStatement("BEGIN countDown(?); END;")
.bind(0, Parameters.out(R2dbcType.VARCHAR))
.execute())
@@ -2033,119 +2049,98 @@ else if (index == 1) {
}
/**
- * Verifies that concurrent statement execution does not cause threads
- * to block.
+ * Verifies that concurrent statement execution on a single
+ * connection does not cause threads to block when there are many threads
+ * available.
*/
@Test
- public void testConcurrentExecute() {
- Connection connection = awaitOne(sharedConnection());
+ public void testConcurrentExecuteManyThreads() throws InterruptedException {
+ ExecutorService executorService = Executors.newFixedThreadPool(4);
try {
-
- // Create many statements and execute them in parallel. "Many" should
- // be enough to exhaust the common ForkJoinPool if any thread gets blocked
- Publisher[] publishers =
- new Publisher[ForkJoinPool.getCommonPoolParallelism() * 4];
-
- for (int i = 0; i < publishers.length; i++) {
- Flux flux = Flux.from(connection.createStatement(
- "SELECT " + i + " FROM sys.dual")
- .execute())
- .flatMap(result ->
- result.map(row -> row.get(0, Integer.class)))
- .cache();
-
- flux.subscribe();
- publishers[i] = flux;
+ Connection connection = awaitOne(connect(executorService));
+ try {
+ verifyConcurrentExecute(connection);
+ }
+ finally {
+ tryAwaitNone(connection.close());
}
-
- awaitMany(
- IntStream.range(0, publishers.length)
- .boxed()
- .collect(Collectors.toList()),
- Flux.concat(publishers));
}
finally {
- tryAwaitNone(connection.close());
+ executorService.shutdown();
+ executorService.awaitTermination(
+ sqlTimeout().toSeconds(), TimeUnit.SECONDS);
}
}
/**
- * Verifies that concurrent statement execution and row fetching does not
- * cause threads to block.
+ * Verifies that concurrent statement execution on a single
+ * connection does not cause threads to block when there is just one thread
+ * available.
*/
@Test
- public void testConcurrentFetch() {
- Connection connection = awaitOne(sharedConnection());
+ public void testConcurrentExecuteSingleThread() throws InterruptedException {
+ ExecutorService executorService = Executors.newSingleThreadExecutor();
try {
-
- awaitExecution(connection.createStatement(
- "CREATE TABLE testConcurrentFetch (value NUMBER)"));
-
- // Create many statements and execute them in parallel. "Many" should
- // be enough to exhaust the common ForkJoinPool if any thread gets blocked
- Publisher[] publishers =
- new Publisher[ForkJoinPool.getCommonPoolParallelism() * 4];
-
- for (int i = 0; i < publishers.length; i++) {
-
- // Each publisher batch inserts a range of 100 values
- Statement statement = connection.createStatement(
- "INSERT INTO testConcurrentFetch VALUES (?)");
- int start = i * 100;
- statement.bind(0, start);
- IntStream.range(start + 1, start + 100)
- .forEach(value -> {
- statement.add().bind(0, value);
- });
-
- Mono mono = Flux.from(statement.execute())
- .flatMap(Result::getRowsUpdated)
- .collect(Collectors.summingInt(Integer::intValue))
- .cache();
-
- // Execute in parallel, and retain the result for verification later
- mono.subscribe();
- publishers[i] = mono;
+ Connection connection = awaitOne(connect(executorService));
+ try {
+ verifyConcurrentExecute(connection);
}
-
- // Expect each publisher to emit an update count of 100
- awaitMany(
- IntStream.range(0, publishers.length)
- .map(i -> 100)
- .boxed()
- .collect(Collectors.toList()),
- Flux.merge(publishers));
-
- // Create publishers that fetch rows in parallel
- Publisher>[] fetchPublishers =
- new Publisher[publishers.length];
-
- for (int i = 0; i < publishers.length; i++) {
- Mono> mono = Flux.from(connection.createStatement(
- "SELECT value FROM testConcurrentFetch ORDER BY value")
- .execute())
- .flatMap(result ->
- result.map(row -> row.get(0, Integer.class)))
- .collect(Collectors.toList())
- .cache();
-
- // Execute in parallel, and retain the result for verification later
- mono.subscribe();
- fetchPublishers[i] = mono;
+ finally {
+ tryAwaitNone(connection.close());
}
+ }
+ finally {
+ executorService.shutdown();
+ executorService.awaitTermination(
+ sqlTimeout().toSeconds(), TimeUnit.SECONDS);
+ }
+ }
- // Expect each fetch publisher to get the same result
- List expected = IntStream.range(0, publishers.length * 100)
- .boxed()
- .collect(Collectors.toList());
+ /**
+ * Verifies that concurrent statement execution and row fetching on a single
+ * connection does not cause threads to block when there is just one thread
+ * available.
+ */
+ @Test
+ public void testConcurrentFetchSingleThread() throws InterruptedException {
+ ExecutorService executorService = Executors.newSingleThreadExecutor();
+ try {
+ Connection connection = awaitOne(connect(executorService));
+ try {
+ verifyConcurrentFetch(connection);
+ }
+ finally {
+ tryAwaitNone(connection.close());
+ }
+ }
+ finally {
+ executorService.shutdown();
+ executorService.awaitTermination(
+ sqlTimeout().toSeconds(), TimeUnit.SECONDS);
+ }
+ }
- for (Publisher> publisher : fetchPublishers)
- awaitOne(expected, publisher);
+ /**
+ * Verifies that concurrent statement execution and row fetching on a single
+ * connection does not cause threads to block when there are many threads
+ * available.
+ */
+ @Test
+ public void testConcurrentFetchManyThreads() throws InterruptedException {
+ ExecutorService executorService = Executors.newFixedThreadPool(4);
+ try {
+ Connection connection = awaitOne(connect(executorService));
+ try {
+ verifyConcurrentFetch(connection);
+ }
+ finally {
+ tryAwaitNone(connection.close());
+ }
}
finally {
- tryAwaitExecution(connection.createStatement(
- "DROP TABLE testConcurrentFetch"));
- tryAwaitNone(connection.close());
+ executorService.shutdown();
+ executorService.awaitTermination(
+ sqlTimeout().toSeconds(), TimeUnit.SECONDS);
}
}
@@ -2162,6 +2157,7 @@ public void testUsingWhenCancel() {
"CREATE TABLE testUsingWhenCancel (value NUMBER)"));
// Use more threads than what the FJP has available
+ @SuppressWarnings({"unchecked","rawtypes"})
Publisher[] publishers =
new Publisher[ForkJoinPool.getCommonPoolParallelism() * 4];
@@ -2275,4 +2271,132 @@ public Object getValue() {
return value;
}
}
+
+ /**
+ * Connect to the database configured by {@link DatabaseConfig}, with a
+ * the connection configured to use a given {@code executor} for async
+ * callbacks.
+ * @param executor Executor for async callbacks
+ * @return Connection that uses the {@code executor}
+ */
+ private static Publisher extends Connection> connect(Executor executor) {
+ return ConnectionFactories.get(
+ ConnectionFactoryOptions.parse(format(
+ "r2dbc:oracle://%s:%d/%s", host(), port(), serviceName()))
+ .mutate()
+ .option(
+ ConnectionFactoryOptions.USER, user())
+ .option(
+ ConnectionFactoryOptions.PASSWORD, password())
+ .option(
+ OracleR2dbcOptions.EXECUTOR, executor)
+ .build())
+ .create();
+ }
+
+ /**
+ * Verifies concurrent statement execution the given {@code connection}
+ * @param connection Connection to verify
+ */
+ private void verifyConcurrentExecute(Connection connection) {
+
+ // Create many statements and execute them in parallel.
+ @SuppressWarnings({"unchecked","rawtypes"})
+ Publisher[] publishers = new Publisher[8];
+
+ for (int i = 0; i < publishers.length; i++) {
+ Flux flux = Flux.from(connection.createStatement(
+ "SELECT " + i + " FROM sys.dual")
+ .execute())
+ .flatMap(result ->
+ result.map(row -> row.get(0, Integer.class)))
+ .cache();
+
+ flux.subscribe();
+ publishers[i] = flux;
+ }
+
+ awaitMany(
+ IntStream.range(0, publishers.length)
+ .boxed()
+ .collect(Collectors.toList()),
+ Flux.concat(publishers));
+ }
+
+ /**
+ * Verifies concurrent row fetching with the given {@code connection}
+ * @param connection Connection to verify
+ */
+ private void verifyConcurrentFetch(Connection connection) {
+ try {
+ awaitExecution(connection.createStatement(
+ "CREATE TABLE testConcurrentFetch (value NUMBER)"));
+
+ // Create many statements and execute them in parallel.
+ @SuppressWarnings({"unchecked","rawtypes"})
+ Publisher[] publishers = new Publisher[8];
+
+ for (int i = 0; i < publishers.length; i++) {
+
+ Statement statement = connection.createStatement(
+ "INSERT INTO testConcurrentFetch VALUES (?)");
+
+ // Each publisher batch inserts a range of 10 values
+ int start = i * 10;
+ statement.bind(0, start);
+ IntStream.range(start + 1, start + 10)
+ .forEach(value -> {
+ statement.add().bind(0, value);
+ });
+
+ Mono mono = Flux.from(statement.execute())
+ .flatMap(Result::getRowsUpdated)
+ .collect(Collectors.summingLong(Long::longValue))
+ .cache();
+
+ // Execute in parallel, and retain the result for verification later
+ mono.subscribe();
+ publishers[i] = mono;
+ }
+
+ // Expect each publisher to emit an update count of 100
+ awaitMany(
+ Stream.generate(() -> 10L)
+ .limit(publishers.length)
+ .collect(Collectors.toList()),
+ Flux.merge(publishers));
+
+ // Create publishers that fetch rows in parallel
+ @SuppressWarnings({"unchecked","rawtypes"})
+ Publisher>[] fetchPublishers =
+ new Publisher[publishers.length];
+
+ for (int i = 0; i < fetchPublishers.length; i++) {
+ Mono> mono = Flux.from(connection.createStatement(
+ "SELECT value FROM testConcurrentFetch ORDER BY value")
+ .execute())
+ .flatMap(result ->
+ result.map(row -> row.get(0, Integer.class)))
+ .sort()
+ .collect(Collectors.toList())
+ .cache();
+
+ // Execute in parallel, and retain the result for verification later
+ mono.subscribe();
+ fetchPublishers[i] = mono;
+ }
+
+ // Expect each fetch publisher to get the same result
+ List expected = IntStream.range(0, publishers.length * 10)
+ .boxed()
+ .collect(Collectors.toList());
+
+ for (Publisher> publisher : fetchPublishers)
+ awaitOne(expected, publisher);
+ }
+ finally {
+ tryAwaitExecution(connection.createStatement(
+ "DROP TABLE testConcurrentFetch"));
+ }
+ }
}
diff --git a/src/test/java/oracle/r2dbc/test/OracleTestKit.java b/src/test/java/oracle/r2dbc/test/OracleTestKit.java
index 058e913..c526fff 100755
--- a/src/test/java/oracle/r2dbc/test/OracleTestKit.java
+++ b/src/test/java/oracle/r2dbc/test/OracleTestKit.java
@@ -213,9 +213,9 @@ private Object extractColumn(String name, Row row) {
*
*/
@Override
- public Mono extractRowsUpdated(Result result) {
+ public Mono extractRowsUpdated(Result result) {
return Flux.from(result.getRowsUpdated())
- .reduce(0, (total, updateCount) -> total + updateCount);
+ .reduce(0L, (total, updateCount) -> total + updateCount);
}
@Override
diff --git a/src/test/java/oracle/r2dbc/util/Awaits.java b/src/test/java/oracle/r2dbc/util/Awaits.java
index b985b07..81b4976 100644
--- a/src/test/java/oracle/r2dbc/util/Awaits.java
+++ b/src/test/java/oracle/r2dbc/util/Awaits.java
@@ -269,6 +269,7 @@ public static void awaitUpdate(
expectedCounts,
Flux.from(statement.execute())
.flatMap(result -> Flux.from(result.getRowsUpdated()))
+ .map(Math::toIntExact)
.collectList()
.block(sqlTimeout()),
"Unexpected update counts");