Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions connector/kinesis-asl/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,13 @@
<version>${aws.kinesis.producer.version}</version>
<scope>test</scope>
</dependency>
<!-- This dependency is necessary for JDK 9 and above -->
<dependency>
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
<version>2.3.1</version>
Copy link
Member

@pan3793 pan3793 Oct 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

root pom.xml defines this dep with <version>2.2.11</version> in dependencyManagement, does that version work? if so, please remove the version overriding here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pan3793 Thank you, good catch. Fixed it.

<scope>test</scope>
</dependency>
<!-- manage this up explicitly to match Spark; com.amazonaws:aws-java-sdk-pom specifies
2.6.7 but says we can manage it up -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
import com.amazonaws.services.dynamodbv2.document.DynamoDB
import com.amazonaws.services.kinesis.{AmazonKinesis, AmazonKinesisClient}
import com.amazonaws.services.kinesis.model._
import com.amazonaws.waiters.WaiterParameters

import org.apache.spark.internal.Logging

Expand Down Expand Up @@ -60,6 +61,8 @@ private[kinesis] class KinesisTestUtils(streamShardCount: Int = 2) extends Loggi
client
}

private lazy val streamExistsWaiter = kinesisClient.waiters().streamExists()

private lazy val dynamoDB = {
val dynamoDBClient = new AmazonDynamoDBClient(new DefaultAWSCredentialsProviderChain())
dynamoDBClient.setRegion(RegionUtils.getRegion(regionName))
Expand Down Expand Up @@ -183,18 +186,9 @@ private[kinesis] class KinesisTestUtils(streamShardCount: Int = 2) extends Loggi
}

private def waitForStreamToBeActive(streamNameToWaitFor: String): Unit = {
val startTimeNs = System.nanoTime()
while (System.nanoTime() - startTimeNs < TimeUnit.SECONDS.toNanos(createStreamTimeoutSeconds)) {
Thread.sleep(TimeUnit.SECONDS.toMillis(describeStreamPollTimeSeconds))
describeStream(streamNameToWaitFor).foreach { description =>
val streamStatus = description.getStreamStatus()
logDebug(s"\t- current state: $streamStatus\n")
if ("ACTIVE".equals(streamStatus)) {
return
}
}
}
require(false, s"Stream $streamName never became active")
val describeStreamRequest = new DescribeStreamRequest()
.withStreamName(streamNameToWaitFor)
streamExistsWaiter.run(new WaiterParameters(describeStreamRequest))
}
}

Expand Down