Skip to content
2 changes: 1 addition & 1 deletion eng/versioning/external_dependencies.txt
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ cosmos-spark_3-2_org.apache.spark:spark-hive_2.12;3.2.0
cosmos-spark_3-3_org.apache.spark:spark-hive_2.12;3.3.0
cosmos-spark_3-4_org.apache.spark:spark-hive_2.12;3.4.0
cosmos-spark_3-5_org.apache.spark:spark-hive_2.12;3.5.0
cosmos_org.scala-lang:scala-library;2.12.10
cosmos_org.scala-lang:scala-library;2.12.19
cosmos_org.scala-lang.modules:scala-java8-compat_2.12;0.8.0
cosmos_io.projectreactor:reactor-scala-extensions_2.12;0.8.0
cosmos_commons-io:commons-io;2.4
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
<maven.build.timestamp.format>MM-dd-HH-mm-ss</maven.build.timestamp.format>
<jacoco.min.branchcoverage>0.17</jacoco.min.branchcoverage>
<jacoco.min.linecoverage>0.18</jacoco.min.linecoverage>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<jacoco.skip>true</jacoco.skip>
<shadingPrefix>azure_cosmos_spark_sample</shadingPrefix>
<legal>
Expand Down Expand Up @@ -84,7 +84,7 @@
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.12.10</version> <!-- {x-version-update;cosmos_org.scala-lang:scala-library;external_dependency} -->
<version>2.12.19</version> <!-- {x-version-update;cosmos_org.scala-lang:scala-library;external_dependency} -->
<scope>provided</scope>
</dependency>
<dependency>
Expand Down Expand Up @@ -143,6 +143,55 @@
</exclusions>
</dependency>

<dependency>
<groupId>com.globalmentor</groupId>
<artifactId>hadoop-bare-naked-local-fs</artifactId>
<version>0.1.0</version> <!-- {x-version-update;cosmos_com.globalmentor:hadoop-bare-naked-local-fs;external_dependency} -->
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>4.8.1</version> <!-- {x-version-update;cosmos_org.mockito:mockito-core;external_dependency} -->
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<version>3.4.36</version> <!-- {x-version-update;io.projectreactor:reactor-test;external_dependency} -->
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>3.22.0</version> <!-- {x-version-update;org.assertj:assertj-core;external_dependency} -->
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_2.12</artifactId>
<version>3.2.2</version> <!-- {x-version-update;cosmos_org.scalatest:scalatest_2.12;external_dependency} -->
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest-flatspec_2.12</artifactId>
<version>3.2.3</version> <!-- {x-version-update;cosmos_org.scalatest:scalatest-flatspec_2.12;external_dependency} -->
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalactic</groupId>
<artifactId>scalactic_2.12</artifactId>
<version>3.2.3</version> <!-- {x-version-update;cosmos_org.scalactic:scalactic_2.12;external_dependency} -->
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalamock</groupId>
<artifactId>scalamock_2.12</artifactId>
<version>5.0.0</version> <!-- {x-version-update;cosmos_org.scalamock:scalamock_2.12;external_dependency} -->
<scope>test</scope>
</dependency>

<!-- Added this provided dependency to include necessary annotations used by "reactor-core".
Without this dependency, javadoc throws a warning as it cannot find enum When.MAYBE
which is used in @Nullable annotation in reactor core classes.
Expand Down Expand Up @@ -230,7 +279,7 @@
<include>org.slf4j:slf4j-api:[1.7.36]</include> <!-- {x-include-update;org.slf4j:slf4j-api;external_dependency} -->
<include>org.apache.spark:spark-sql_2.12:[3.5.0]</include> <!-- {x-include-update;cosmos-spark_3-5_org.apache.spark:spark-sql_2.12;external_dependency} -->
<include>commons-io:commons-io:[2.4]</include> <!-- {x-include-update;cosmos_commons-io:commons-io;external_dependency} -->
<include>org.scala-lang:scala-library:[2.12.10]</include> <!-- {x-include-update;cosmos_org.scala-lang:scala-library;external_dependency} -->
<include>org.scala-lang:scala-library:[2.12.19]</include> <!-- {x-include-update;cosmos_org.scala-lang:scala-library;external_dependency} -->
<include>org.scala-lang.modules:scala-java8-compat_2.12:[0.8.0]</include> <!-- {x-include-update;cosmos_org.scala-lang.modules:scala-java8-compat_2.12;external_dependency} -->
<include>io.projectreactor:reactor-scala-extensions_2.12:[0.8.0]</include> <!-- {x-include-update;cosmos_io.projectreactor:reactor-scala-extensions_2.12;external_dependency} -->
<include>org.scalatest:scalatest_2.12:[3.2.2]</include> <!-- {x-include-update;cosmos_org.scalatest:scalatest_2.12;external_dependency} -->
Expand Down Expand Up @@ -325,9 +374,9 @@
<artifactId>scala-maven-plugin</artifactId>
<version>4.8.1</version> <!-- {x-version-update;cosmos_net.alchim31.maven:scala-maven-plugin;external_dependency} -->
<configuration>
<source>11</source>
<target>11</target>
<scalaVersion>2.12.10</scalaVersion>
<source>1.8</source>
<target>1.8</target>
<scalaVersion>2.12.19</scalaVersion>
</configuration>
<executions>
<execution>
Expand Down Expand Up @@ -738,21 +787,21 @@
<execution>
<id>default-compile</id>
<configuration>
<release>11</release>
<release>8</release>
</configuration>
</execution>
<!-- Here the 'base-compile' execution section of java-lts profile defined in parent pom.client.xml is
overridden. In parent pom, this execution entry enforces java8 release compatibility. The Spark
connectors for Spark 3.0 and above not available in Java8, hence here in this pom we override that
release compact to 11.
connectors for Spark 3.0 and above are available for Java 8, hence here in this pom we override that
release compatibility to 1.8.
-->
<execution>
<id>base-compile</id>
<goals>
<goal>compile</goal>
</goals>
<configuration combine.self="override">
<release>11</release>
<release>8</release>
</configuration>
</execution>
<execution>
Expand All @@ -761,8 +810,8 @@
<goal>testCompile</goal>
</goals>
<configuration>
<release>11</release>
<testRelease>11</testRelease>
<release>8</release>
<testRelease>8</testRelease>
</configuration>
</execution>
<execution>
Expand All @@ -771,7 +820,7 @@
<goal>testCompile</goal>
</goals>
<configuration combine.self="override">
<testRelease>11</testRelease>
<testRelease>8</testRelease>
</configuration>
</execution>
</executions>
Expand Down
29 changes: 14 additions & 15 deletions sdk/cosmos/azure-cosmos-spark_3_2-12/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
<maven.build.timestamp.format>MM-dd-HH-mm-ss</maven.build.timestamp.format>
<jacoco.min.branchcoverage>0.17</jacoco.min.branchcoverage>
<jacoco.min.linecoverage>0.18</jacoco.min.linecoverage>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<jacoco.skip>true</jacoco.skip>
<shadingPrefix>azure_cosmos_spark</shadingPrefix>
<shadingPrefixNetty>azurecosmosspark</shadingPrefixNetty>
Expand All @@ -47,12 +47,11 @@
</developer>
</developers>


<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.12.10</version> <!-- {x-version-update;cosmos_org.scala-lang:scala-library;external_dependency} -->
<version>2.12.19</version> <!-- {x-version-update;cosmos_org.scala-lang:scala-library;external_dependency} -->
<scope>provided</scope>
</dependency>
<dependency>
Expand Down Expand Up @@ -228,7 +227,7 @@
<include>org.apache.spark:spark-sql_2.12:[3.4.0]</include> <!-- {x-include-update;cosmos-spark_3-4_org.apache.spark:spark-sql_2.12;external_dependency} -->
<include>org.apache.spark:spark-sql_2.12:[3.5.0]</include> <!-- {x-include-update;cosmos-spark_3-5_org.apache.spark:spark-sql_2.12;external_dependency} -->
<include>commons-io:commons-io:[2.4]</include> <!-- {x-include-update;cosmos_commons-io:commons-io;external_dependency} -->
<include>org.scala-lang:scala-library:[2.12.10]</include> <!-- {x-include-update;cosmos_org.scala-lang:scala-library;external_dependency} -->
<include>org.scala-lang:scala-library:[2.12.19]</include> <!-- {x-include-update;cosmos_org.scala-lang:scala-library;external_dependency} -->
<include>org.scala-lang.modules:scala-java8-compat_2.12:[0.8.0]</include> <!-- {x-include-update;cosmos_org.scala-lang.modules:scala-java8-compat_2.12;external_dependency} -->
<include>io.projectreactor:reactor-scala-extensions_2.12:[0.8.0]</include> <!-- {x-include-update;cosmos_io.projectreactor:reactor-scala-extensions_2.12;external_dependency} -->
<include>org.scalatest:scalatest_2.12:[3.2.2]</include> <!-- {x-include-update;cosmos_org.scalatest:scalatest_2.12;external_dependency} -->
Expand Down Expand Up @@ -326,9 +325,9 @@
<artifactId>scala-maven-plugin</artifactId>
<version>4.8.1</version> <!-- {x-version-update;cosmos_net.alchim31.maven:scala-maven-plugin;external_dependency} -->
<configuration>
<source>11</source>
<target>11</target>
<scalaVersion>2.12.10</scalaVersion>
<source>1.8</source>
<target>1.8</target>
<scalaVersion>2.12.19</scalaVersion>
</configuration>
<executions>
<execution>
Expand Down Expand Up @@ -768,21 +767,21 @@
<execution>
<id>default-compile</id>
<configuration>
<release>11</release>
<release>8</release>
</configuration>
</execution>
<!-- Here the 'base-compile' execution section of java-lts profile defined in parent pom.client.xml is
overridden. In parent pom, this execution entry enforces java8 release compatibility. The Spark
connectors for Spark 3.0 and above not available in Java8, hence here in this pom we override that
release compact to 11.
connectors for Spark 3.0 and above are available for Java 8, hence here in this pom we override that
release compatibility to 1.8.
-->
<execution>
<id>base-compile</id>
<goals>
<goal>compile</goal>
</goals>
<configuration combine.self="override">
<release>11</release>
<release>8</release>
</configuration>
</execution>
<execution>
Expand All @@ -791,8 +790,8 @@
<goal>testCompile</goal>
</goals>
<configuration>
<release>11</release>
<testRelease>11</testRelease>
<release>8</release>
<testRelease>8</testRelease>
</configuration>
</execution>
<execution>
Expand All @@ -801,7 +800,7 @@
<goal>testCompile</goal>
</goals>
<configuration combine.self="override">
<testRelease>11</testRelease>
<testRelease>8</testRelease>
</configuration>
</execution>
</executions>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkContext, TaskContext}
import reactor.core.publisher.Mono
import reactor.core.scheduler.{Scheduler, Schedulers}

import java.time.{Duration, Instant}
import java.util.ConcurrentModificationException
Expand Down Expand Up @@ -52,6 +53,15 @@ private[spark] object CosmosClientCache extends BasicLoggingTrait {
this.cleanupIntervalInSeconds,
TimeUnit.SECONDS)

private val AAD_AUTH_BOUNDED_ELASTIC_THREAD_NAME = "cosmos-client-cache-auth-bounded-elastic"
private val TTL_FOR_SCHEDULER_WORKER_IN_SECONDS = 60 // same as BoundedElasticScheduler.DEFAULT_TTL_SECONDS

private val aadAuthBoundedElastic: Scheduler = Schedulers.newBoundedElastic(
Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE,
Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
AAD_AUTH_BOUNDED_ELASTIC_THREAD_NAME,
TTL_FOR_SCHEDULER_WORKER_IN_SECONDS, true)

def apply(cosmosClientConfiguration: CosmosClientConfiguration,
cosmosClientStateHandle: Option[CosmosClientMetadataCachesSnapshot],
calledFrom: String): CosmosClientCacheItem = {
Expand Down Expand Up @@ -615,10 +625,14 @@ private[spark] object CosmosClientCache extends BasicLoggingTrait {

private[this] class CosmosAccessTokenCredential(val tokenProvider: (List[String]) =>CosmosAccessToken) extends TokenCredential {
override def getToken(tokenRequestContext: TokenRequestContext): Mono[AccessToken] = {
val token = tokenProvider
.apply(tokenRequestContext.getScopes.asScala.toList)
val returnValue: Mono[AccessToken] = Mono.fromCallable(() => {
val token = tokenProvider
.apply(tokenRequestContext.getScopes.asScala.toList)

new AccessToken(token.token, token.Offset)
})

Mono.just(new AccessToken(token.token, token.Offset))
returnValue.publishOn(aadAuthBoundedElastic)
}
}
}
Expand Down