31 changes: 31 additions & 0 deletions connector/kinesis-asl-assembly/pom.xml
@@ -54,6 +54,17 @@
<artifactId>jackson-databind</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${connect.guava.version}</version>
<scope>compile</scope>
Member
Hmm, I'm not sure whether this is okay or not, @junyuc25.

Contributor Author
Could you elaborate a bit more on why this is not OK? I think this pattern is also seen in other modules, e.g. https://github.com/apache/spark/blob/master/connector/connect/server/pom.xml#L159-L164

Contributor Author
Could you share more about your concern here, @dongjoon-hyun? Thanks!

Member
Sorry for being late.

  1. Is this only for Kinesis or for all AWS SDK v2?
  2. Instead of the following, can we use the latest 33.0.0-jre like [SPARK-46768][BUILD] Upgrade Guava used by the connect module to 33.0-jre #44795?
<aws.kinesis.client.guava.version>32.1.1-jre</aws.kinesis.client.guava.version>

Contributor Author
  1. This change is only for the Kinesis connector modules.
  2. Yes, I changed it to use the same Guava version as the one used in the other modules.

Contributor
@LuciferYang LuciferYang Dec 21, 2023
I'm glad to see that Kinesis can use the same protobuf-java as other modules, but is this feasible?

Contributor Author
Hi @LuciferYang, I saw similar patterns in other modules as well: https://github.com/apache/spark/blob/v3.5.0/connector/connect/server/pom.xml#L173-L175. I believe this should work as long as we shade it.

Member
For the record, I removed this shaded protobuf-java completely in Apache Spark 4.0.0 independently.

Contributor Author
Thanks @dongjoon-hyun for letting me know. Updated the PR to use the same protobuf-java version as specified in <protobuf.version>. I had to change the scope to "compile" to fix ClassNotFoundExceptions.

</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.core</groupId>
<artifactId>jersey-client</artifactId>
@@ -158,6 +169,26 @@
<include>*:*</include>
</includes>
</artifactSet>
<relocations>
<relocation>
<pattern>com.google.common</pattern>
<shadedPattern>${spark.shade.packageName}.kinesis.guava</shadedPattern>
<includes>
<include>com.google.common.**</include>
</includes>
</relocation>
<relocation>
<pattern>com.google.protobuf</pattern>
<shadedPattern>${spark.shade.packageName}.kinesis.protobuf</shadedPattern>
<includes>
<include>com.google.protobuf.**</include>
</includes>
</relocation>
<relocation>
<pattern>software.amazon.awssdk</pattern>
<shadedPattern>${spark.shade.packageName}.software.amazon.awssdk</shadedPattern>
</relocation>
</relocations>
<filters>
<filter>
<artifact>*:*</artifact>
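As a quick illustration of what these relocations buy us: assuming spark.shade.packageName resolves to org.sparkproject (its value in Spark's parent pom), a minimal probe run with only the assembly jar on the classpath might look like the sketch below; the object name is made up for illustration.

```scala
// Minimal sketch, assuming spark.shade.packageName = org.sparkproject.
// With only the assembly jar on the classpath, the relocated Guava/protobuf
// classes should resolve, while the original coordinates should be absent.
object RelocationProbe {
  private def present(className: String): Boolean =
    try { Class.forName(className); true }
    catch { case _: ClassNotFoundException => false }

  def main(args: Array[String]): Unit = {
    println(present("org.sparkproject.kinesis.guava.base.Preconditions")) // expected: true
    println(present("org.sparkproject.kinesis.protobuf.Message"))         // expected: true
    println(present("com.google.common.base.Preconditions"))              // expected: false
  }
}
```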
43 changes: 39 additions & 4 deletions connector/kinesis-asl/pom.xml
@@ -54,14 +54,38 @@
<scope>test</scope>
</dependency>
<dependency>
-<groupId>com.amazonaws</groupId>
+<groupId>software.amazon.kinesis</groupId>
<artifactId>amazon-kinesis-client</artifactId>
Member
@pan3793 pan3793 Dec 8, 2023
Could you please clarify the relationship between software.amazon.kinesis:amazon-kinesis-client and software.amazon.awssdk:kinesis?

They don't seem to share a version, and the latter is hygienic (no third-party dependencies other than software.amazon.awssdk:*).

Handling Jackson/Guava/Protobuf dependency conflicts is always painful; is it possible to provide a similarly hygienic artifact for the former, to make it easier for downstream projects to consume?

Contributor Author

Hi @pan3793, KCL (software.amazon.kinesis:amazon-kinesis-client) is built on top of the Kinesis Data Streams API (software.amazon.awssdk:kinesis), and provides additional functionality such as load balancing and error recovery. According to this doc, it is generally recommended to use KCL over the Kinesis Data Streams API: https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-sdk.html

Perhaps we could raise an issue in the KCL repo and ask if they can release a "clean" version of the library with all the third-party dependencies shaded. But currently I'm not aware of such a hygienic version of KCL, so I guess we probably have to live with this issue if we use KCL.
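To make that layering concrete, here is a hedged sketch (KCL 2.x class names from software.amazon.kinesis; the no-op processor is only for illustration) of how the KCL Scheduler is wired directly from the v2 SDK clients:

```scala
import java.util.UUID

import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient
import software.amazon.kinesis.common.ConfigsBuilder
import software.amazon.kinesis.coordinator.Scheduler
import software.amazon.kinesis.lifecycle.events._
import software.amazon.kinesis.processor.{ShardRecordProcessor, ShardRecordProcessorFactory}

object KclOnSdkV2Sketch {
  // A no-op record processor, just enough to make the wiring compile.
  private class NoOpProcessor extends ShardRecordProcessor {
    override def initialize(input: InitializationInput): Unit = ()
    override def processRecords(input: ProcessRecordsInput): Unit =
      input.records().forEach(r => println(s"got record ${r.sequenceNumber()}"))
    override def leaseLost(input: LeaseLostInput): Unit = ()
    override def shardEnded(input: ShardEndedInput): Unit = ()
    override def shutdownRequested(input: ShutdownRequestedInput): Unit = ()
  }

  def main(args: Array[String]): Unit = {
    val region = Region.US_WEST_2
    // KCL consumes the plain v2 SDK clients, which is why it pulls in
    // software.amazon.awssdk:kinesis rather than shipping its own transport.
    val kinesis = KinesisAsyncClient.builder().region(region).build()
    val dynamo = DynamoDbAsyncClient.builder().region(region).build()
    val cloudWatch = CloudWatchAsyncClient.builder().region(region).build()
    val configs = new ConfigsBuilder("myStream", "myApp", kinesis, dynamo,
      cloudWatch, UUID.randomUUID().toString,
      new ShardRecordProcessorFactory {
        override def shardRecordProcessor(): ShardRecordProcessor = new NoOpProcessor
      })
    val scheduler = new Scheduler(
      configs.checkpointConfig(), configs.coordinatorConfig(),
      configs.leaseManagementConfig(), configs.lifecycleConfig(),
      configs.metricsConfig(), configs.processorConfig(), configs.retrievalConfig())
    new Thread(scheduler).start() // Scheduler implements Runnable
  }
}
```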

Member

> Perhaps we could raise an issue in the KCL repo and ask if they can release a "clean" version of the library with all the third-party dependencies shaded.

Would you like to drive this? It will benefit all downstream projects.

Contributor Author

Hi @pan3793, I submitted a ticket to the KCL repo: awslabs/amazon-kinesis-client#1245. Let's see what response we get. On the other hand, it seems to me that this is a follow-up task rather than a blocker for this PR? Please correct me if I am wrong.

<version>${aws.kinesis.client.version}</version>
<exclusions>
<exclusion>
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
</exclusion>
<exclusion>
<groupId>com.kjetland</groupId>
<artifactId>mbknor-jackson-jsonschema_2.12</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- The Kinesis-client-library depends on glue-schema-registry which depends on
mbknor-jackson-jsonschema_2.12. As Spark 4.0 dropped support for Scala 2.12, we have to
explicitly import mbknor-jackson-jsonschema_2.13 here until glue-schema-registry is updated
to depend on mbknor-jackson-jsonschema_2.13 -->
<dependency>
-<groupId>com.amazonaws</groupId>
-<artifactId>aws-java-sdk-sts</artifactId>
-<version>${aws.java.sdk.version}</version>
+<groupId>com.kjetland</groupId>
+<artifactId>mbknor-jackson-jsonschema_2.13</artifactId>
+<version>${mbknor.jsonschema.version}</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sts</artifactId>
<version>${aws.java.sdk.v2.version}</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>apache-client</artifactId>
<version>${aws.java.sdk.v2.version}</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
@@ -76,6 +100,12 @@
<artifactId>jackson-dataformat-cbor</artifactId>
<version>${fasterxml.jackson.version}</version>
</dependency>
<dependency>
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
<version>${jaxb.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
@@ -90,6 +120,11 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-tags_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${connect.guava.version}</version>
</dependency>

<!--
This spark-tags test-dep is needed even though it isn't used in this module, otherwise testing-cmds that exclude
@@ -16,6 +16,7 @@
*/
package org.apache.spark.examples.streaming;

import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
@@ -38,8 +39,10 @@
import scala.Tuple2;
import scala.reflect.ClassTag$;

-import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
-import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
+import software.amazon.awssdk.http.apache.ApacheHttpClient;
+import software.amazon.awssdk.services.kinesis.KinesisClient;
+import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;

/**
* Consumes messages from an Amazon Kinesis stream and does wordcount.
@@ -66,7 +69,7 @@
* There is a companion helper class called KinesisWordProducerASL which puts dummy data
* onto the Kinesis stream.
*
-* This code uses the DefaultAWSCredentialsProviderChain to find credentials
+* This code uses the DefaultCredentialsProvider to find credentials
* in the following order:
* Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
* Java System Properties - aws.accessKeyId and aws.secretKey
@@ -106,11 +109,18 @@ public static void main(String[] args) throws Exception {
String endpointUrl = args[2];

// Create a Kinesis client in order to determine the number of shards for the given stream
-AmazonKinesisClient kinesisClient =
-    new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain());
-kinesisClient.setEndpoint(endpointUrl);
+KinesisClient kinesisClient =
+    KinesisClient.builder()
+        .credentialsProvider(DefaultCredentialsProvider.create())
+        .endpointOverride(URI.create(endpointUrl))
+        .httpClientBuilder(ApacheHttpClient.builder())
+        .build();

+DescribeStreamRequest describeStreamRequest = DescribeStreamRequest.builder()
+    .streamName(streamName)
+    .build();
 int numShards =
-    kinesisClient.describeStream(streamName).getStreamDescription().getShards().size();
+    kinesisClient.describeStream(describeStreamRequest).streamDescription().shards().size();


// In this example, we're going to create 1 Kinesis Receiver/input DStream for each shard.
@@ -16,7 +16,7 @@
*/
package org.apache.spark.streaming.kinesis;

-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+import software.amazon.kinesis.common.InitialPositionInStream;

import java.io.Serializable;
import java.util.Date;
@@ -19,16 +19,16 @@ package org.apache.spark.examples.streaming

import scala.jdk.CollectionConverters._

-import com.amazonaws.regions.RegionUtils
-import com.amazonaws.services.kinesis.AmazonKinesis
+import software.amazon.awssdk.regions.servicemetadata.KinesisServiceMetadata

private[streaming] object KinesisExampleUtils {
def getRegionNameByEndpoint(endpoint: String): String = {
val uri = new java.net.URI(endpoint)
-RegionUtils.getRegionsForService(AmazonKinesis.ENDPOINT_PREFIX)
+val kinesisServiceMetadata = new KinesisServiceMetadata()
+kinesisServiceMetadata.regions
   .asScala
-  .find(_.getAvailableEndpoints.asScala.toSeq.contains(uri.getHost))
-  .map(_.getName)
+  .find(r => kinesisServiceMetadata.endpointFor(r).toString.equals(uri.getHost))
Contributor

How about r => kinesisServiceMetadata.endpointFor(r).equals(uri)

Contributor Author

It looks like the URI objects would be different if we made this change, and comparing the two different URIs causes a test failure.

Stacktrace:

 Cause: java.lang.IllegalArgumentException: Could not resolve region for endpoint: https://kinesis.us-west-2.amazonaws.com
  at org.apache.spark.streaming.kinesis.KinesisTestUtils$.$anonfun$getRegionNameByEndpoint$3(KinesisTestUtils.scala:237)
  at scala.Option.getOrElse(Option.scala:201)
  at org.apache.spark.streaming.kinesis.KinesisTestUtils$.getRegionNameByEndpoint(KinesisTestUtils.scala:237)
  at org.apache.spark.streaming.kinesis.KinesisStreamTests.<init>(KinesisStreamSuite.scala:51)
  at org.apache.spark.streaming.kinesis.WithoutAggregationKinesisStreamSuite.<init>(KinesisStreamSuite.scala:434)

So to avoid this failure, I would prefer to keep this line: kinesisServiceMetadata.endpointFor(r).toString.equals(uri.getHost)

+  .map(_.id)
.getOrElse(
throw new IllegalArgumentException(s"Could not resolve region for endpoint: $endpoint"))
}
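For reference, a small sketch of the mismatch discussed in the thread above, assuming the v2 ServiceMetadata returns a scheme-less endpoint URI while the configured endpoint carries an https:// scheme:

```scala
import java.net.URI

import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.regions.servicemetadata.KinesisServiceMetadata

// Hedged illustration: whole-URI equality fails because only the configured
// endpoint has a scheme, while comparing against the host portion matches.
object EndpointCompareSketch {
  def main(args: Array[String]): Unit = {
    val configured = new URI("https://kinesis.us-west-2.amazonaws.com")
    val resolved = new KinesisServiceMetadata().endpointFor(Region.US_WEST_2)
    println(resolved)                                // kinesis.us-west-2.amazonaws.com (assumed)
    println(resolved.equals(configured))             // expected: false
    println(resolved.toString == configured.getHost) // expected: true
  }
}
```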
@@ -18,15 +18,18 @@
// scalastyle:off println
package org.apache.spark.examples.streaming

import java.net.URI
import java.nio.ByteBuffer

import scala.util.Random

-import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
-import com.amazonaws.services.kinesis.AmazonKinesisClient
-import com.amazonaws.services.kinesis.model.PutRecordRequest
 import org.apache.logging.log4j.Level
 import org.apache.logging.log4j.core.config.Configurator
+import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider
+import software.amazon.awssdk.core.SdkBytes
+import software.amazon.awssdk.http.apache.ApacheHttpClient
+import software.amazon.awssdk.services.kinesis.KinesisClient
+import software.amazon.awssdk.services.kinesis.model.{DescribeStreamRequest, PutRecordRequest}

import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
@@ -101,13 +104,22 @@ object KinesisWordCountASL extends Logging {

// Determine the number of shards from the stream using the low-level Kinesis Client
// from the AWS Java SDK.
-val credentials = new DefaultAWSCredentialsProviderChain().getCredentials()
-require(credentials != null,
+val credentialsProvider = DefaultCredentialsProvider.create
+require(credentialsProvider.resolveCredentials() != null,
   "No AWS credentials found. Please specify credentials using one of the methods specified " +
-    "in http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html")
-val kinesisClient = new AmazonKinesisClient(credentials)
-kinesisClient.setEndpoint(endpointUrl)
-val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards().size
+    "in https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html")
+val kinesisClient = KinesisClient.builder()
+  .credentialsProvider(credentialsProvider)
+  .endpointOverride(URI.create(endpointUrl))
+  .httpClientBuilder(ApacheHttpClient.builder())
+  .build()
+val describeStreamRequest = DescribeStreamRequest.builder()
+  .streamName(streamName)
+  .build()
+val numShards = kinesisClient.describeStream(describeStreamRequest)
+  .streamDescription
+  .shards
+  .size


// In this example, we're going to create 1 Kinesis Receiver/input DStream for each shard.
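One caveat worth noting, not raised in the review: describeStream pages its shard list, so the single-call count above is only reliable for streams whose shards fit in one page. A hedged sketch that follows hasMoreShards/exclusiveStartShardId:

```scala
import scala.annotation.tailrec
import scala.jdk.CollectionConverters._

import software.amazon.awssdk.services.kinesis.KinesisClient
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest

object ShardCountSketch {
  // Count shards across DescribeStream pages; each page resumes from the
  // last shard id seen on the previous one.
  def countShards(client: KinesisClient, streamName: String): Int = {
    @tailrec
    def loop(startAfter: Option[String], acc: Int): Int = {
      val builder = DescribeStreamRequest.builder().streamName(streamName)
      startAfter.foreach(builder.exclusiveStartShardId)
      val desc = client.describeStream(builder.build()).streamDescription()
      val shards = desc.shards().asScala.toSeq
      if (desc.hasMoreShards) loop(shards.lastOption.map(_.shardId()), acc + shards.size)
      else acc + shards.size
    }
    loop(None, 0)
  }
}
```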
@@ -221,8 +233,11 @@ object KinesisWordProducerASL {
val totals = scala.collection.mutable.Map[String, Int]()

// Create the low-level Kinesis Client from the AWS Java SDK.
-val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
-kinesisClient.setEndpoint(endpoint)
+val kinesisClient = KinesisClient.builder()
+  .credentialsProvider(DefaultCredentialsProvider.create())
+  .endpointOverride(URI.create(endpoint))
+  .httpClientBuilder(ApacheHttpClient.builder())
+  .build()

println(s"Putting records onto stream $stream and endpoint $endpoint at a rate of" +
s" $recordsPerSecond records per second and $wordsPerRecord words per record")
@@ -247,12 +262,14 @@
val partitionKey = s"partitionKey-$recordNum"

// Create a PutRecordRequest with an Array[Byte] version of the data
-val putRecordRequest = new PutRecordRequest().withStreamName(stream)
-  .withPartitionKey(partitionKey)
-  .withData(ByteBuffer.wrap(data.getBytes()))
+val putRecordRequest = PutRecordRequest.builder()
+  .streamName(stream)
+  .partitionKey(partitionKey)
+  .data(SdkBytes.fromByteBuffer(ByteBuffer.wrap(data.getBytes())))
+  .build()

-// Put the record onto the stream and capture the PutRecordResult
-val putRecordResult = kinesisClient.putRecord(putRecordRequest)
+// Put the record onto the stream (the v2 response is not used here)
+kinesisClient.putRecord(putRecordRequest)
}

// Sleep for a second
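If the response were still wanted here (the PR drops it), the v2 accessors would presumably be used along these lines; putAndLog is a hypothetical helper:

```scala
import software.amazon.awssdk.services.kinesis.KinesisClient
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest

object PutRecordWithResult {
  // Hedged sketch: capture the v2 PutRecordResponse instead of discarding it.
  def putAndLog(client: KinesisClient, request: PutRecordRequest): Unit = {
    val response = client.putRecord(request)
    println(s"wrote ${response.sequenceNumber()} to shard ${response.shardId()}")
  }
}
```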