Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions connector/kinesis-asl-assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,10 @@
<include>com.google.protobuf.**</include>
</includes>
</relocation>
<relocation>
<pattern>com.google.common</pattern>
<shadedPattern>${spark.shade.packageName}.kinesis.guava</shadedPattern>
</relocation>
</relocations>
<filters>
<filter>
Expand Down
26 changes: 21 additions & 5 deletions connector/kinesis-asl/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,25 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<groupId>software.amazon.kinesis</groupId>
<artifactId>amazon-kinesis-client</artifactId>
<version>${aws.kinesis.client.version}</version>
<version>${aws.kinesis.client.v2.version}</version>
<exclusions>
<exclusion>
<groupId>com.amazonaws</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sts</artifactId>
<version>${aws.java.sdk.version}</version>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sts</artifactId>
<version>${aws.java.sdk.v2.version}</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>apache-client</artifactId>
<version>${aws.java.sdk.v2.version}</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
Expand Down Expand Up @@ -90,6 +101,11 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-tags_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>31.1-jre</version>
</dependency>

<!--
This spark-tags test-dep is needed even though it isn't used in this module, otherwise testing-cmds that exclude
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.spark.examples.streaming;

import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -38,8 +39,10 @@
import scala.Tuple2;
import scala.reflect.ClassTag$;

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;

/**
* Consumes messages from a Amazon Kinesis streams and does wordcount.
Expand All @@ -66,7 +69,7 @@
* There is a companion helper class called KinesisWordProducerASL which puts dummy data
* onto the Kinesis stream.
*
* This code uses the DefaultAWSCredentialsProviderChain to find credentials
* This code uses the DefaultCredentialsProvider to find credentials
* in the following order:
* Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
* Java System Properties - aws.accessKeyId and aws.secretKey
Expand Down Expand Up @@ -106,11 +109,18 @@ public static void main(String[] args) throws Exception {
String endpointUrl = args[2];

// Create a Kinesis client in order to determine the number of shards for the given stream
AmazonKinesisClient kinesisClient =
new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain());
kinesisClient.setEndpoint(endpointUrl);
KinesisClient kinesisClient =
KinesisClient.builder()
.credentialsProvider(DefaultCredentialsProvider.create())
.endpointOverride(URI.create(endpointUrl))
.httpClientBuilder(ApacheHttpClient.builder())
.build();

DescribeStreamRequest describeStreamRequest = DescribeStreamRequest.builder()
.streamName(streamName)
.build();
int numShards =
kinesisClient.describeStream(streamName).getStreamDescription().getShards().size();
kinesisClient.describeStream(describeStreamRequest).streamDescription().shards().size();


// In this example, we're going to create 1 Kinesis Receiver/input DStream for each shard.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
*/
package org.apache.spark.streaming.kinesis;

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;

import software.amazon.kinesis.common.InitialPositionInStream;

import java.io.Serializable;
import java.util.Date;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@ package org.apache.spark.examples.streaming

import scala.collection.JavaConverters._

import com.amazonaws.regions.RegionUtils
import com.amazonaws.services.kinesis.AmazonKinesis
import software.amazon.awssdk.regions.servicemetadata.KinesisServiceMetadata

private[streaming] object KinesisExampleUtils {
def getRegionNameByEndpoint(endpoint: String): String = {
val uri = new java.net.URI(endpoint)
RegionUtils.getRegionsForService(AmazonKinesis.ENDPOINT_PREFIX)
val kinesisServiceMetadata = new KinesisServiceMetadata()
kinesisServiceMetadata.regions()
.asScala
.find(_.getAvailableEndpoints.asScala.toSeq.contains(uri.getHost))
.map(_.getName)
.find(r => kinesisServiceMetadata.endpointFor(r).toString.equals(uri.getHost))
.map(_.id())
.getOrElse(
throw new IllegalArgumentException(s"Could not resolve region for endpoint: $endpoint"))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,18 @@
// scalastyle:off println
package org.apache.spark.examples.streaming

import java.net.URI
import java.nio.ByteBuffer

import scala.util.Random

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.model.PutRecordRequest
import org.apache.logging.log4j.Level
import org.apache.logging.log4j.core.config.Configurator
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider
import software.amazon.awssdk.core.SdkBytes
import software.amazon.awssdk.http.apache.ApacheHttpClient
import software.amazon.awssdk.services.kinesis.KinesisClient
import software.amazon.awssdk.services.kinesis.model.{DescribeStreamRequest, PutRecordRequest}

import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
Expand Down Expand Up @@ -101,13 +104,18 @@ object KinesisWordCountASL extends Logging {

// Determine the number of shards from the stream using the low-level Kinesis Client
// from the AWS Java SDK.
val credentials = new DefaultAWSCredentialsProviderChain().getCredentials()
require(credentials != null,
"No AWS credentials found. Please specify credentials using one of the methods specified " +
"in http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html")
val kinesisClient = new AmazonKinesisClient(credentials)
kinesisClient.setEndpoint(endpointUrl)
val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards().size
val kinesisClient = KinesisClient.builder()
.credentialsProvider(DefaultCredentialsProvider.create())
.endpointOverride(URI.create(endpointUrl))
.httpClientBuilder(ApacheHttpClient.builder())
.build()
val describeStreamRequest = DescribeStreamRequest.builder()
.streamName(streamName)
.build()
val numShards = kinesisClient.describeStream(describeStreamRequest)
.streamDescription()
.shards()
.size


// In this example, we're going to create 1 Kinesis Receiver/input DStream for each shard.
Expand Down Expand Up @@ -221,8 +229,11 @@ object KinesisWordProducerASL {
val totals = scala.collection.mutable.Map[String, Int]()

// Create the low-level Kinesis Client from the AWS Java SDK.
val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
kinesisClient.setEndpoint(endpoint)
val kinesisClient = KinesisClient.builder()
.credentialsProvider(DefaultCredentialsProvider.create())
.endpointOverride(URI.create(endpoint))
.httpClientBuilder(ApacheHttpClient.builder())
.build()

println(s"Putting records onto stream $stream and endpoint $endpoint at a rate of" +
s" $recordsPerSecond records per second and $wordsPerRecord words per record")
Expand All @@ -247,12 +258,14 @@ object KinesisWordProducerASL {
val partitionKey = s"partitionKey-$recordNum"

// Create a PutRecordRequest with an Array[Byte] version of the data
val putRecordRequest = new PutRecordRequest().withStreamName(stream)
.withPartitionKey(partitionKey)
.withData(ByteBuffer.wrap(data.getBytes()))
val putRecordRequest = PutRecordRequest.builder()
.streamName(stream)
.partitionKey(partitionKey)
.data(SdkBytes.fromByteBuffer(ByteBuffer.wrap(data.getBytes())))
.build()

// Put the record onto the stream and capture the PutRecordResult
val putRecordResult = kinesisClient.putRecord(putRecordRequest)
kinesisClient.putRecord(putRecordRequest)
}

// Sleep for a second
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,20 @@

package org.apache.spark.streaming.kinesis

import java.net.URI
import java.util
import java.util.concurrent.TimeUnit

import scala.collection.JavaConverters._
import scala.reflect.ClassTag
import scala.util.control.NonFatal

import com.amazonaws.auth.AWSCredentials
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord
import com.amazonaws.services.kinesis.model._
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
import software.amazon.awssdk.http.apache.ApacheHttpClient
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.kinesis.KinesisClient
import software.amazon.awssdk.services.kinesis.model.{GetRecordsRequest, GetRecordsResponse, GetShardIteratorRequest, GetShardIteratorResponse, ProvisionedThroughputExceededException, ShardIteratorType}
import software.amazon.kinesis.retrieval.{AggregatorUtil, KinesisClientRecord}

import org.apache.spark._
import org.apache.spark.internal.Logging
Expand Down Expand Up @@ -83,7 +87,7 @@ class KinesisBackedBlockRDD[T: ClassTag](
@transient private val _blockIds: Array[BlockId],
@transient val arrayOfseqNumberRanges: Array[SequenceNumberRanges],
@transient private val isBlockIdValid: Array[Boolean] = Array.empty,
val messageHandler: Record => T = KinesisInputDStream.defaultMessageHandler _,
val messageHandler: KinesisClientRecord => T = KinesisInputDStream.defaultMessageHandler _,
val kinesisCreds: SparkAWSCredentials = DefaultCredentials,
val kinesisReadConfigs: KinesisReadConfigurations = KinesisReadConfigurations()
) extends BlockRDD[T](sc, _blockIds) {
Expand Down Expand Up @@ -111,9 +115,9 @@ class KinesisBackedBlockRDD[T: ClassTag](
}

def getBlockFromKinesis(): Iterator[T] = {
val credentials = kinesisCreds.provider.getCredentials
val credentialsProvider = kinesisCreds.provider
partition.seqNumberRanges.ranges.iterator.flatMap { range =>
new KinesisSequenceRangeIterator(credentials, endpointUrl, regionName,
new KinesisSequenceRangeIterator(credentialsProvider, endpointUrl, regionName,
range, kinesisReadConfigs).map(messageHandler)
}
}
Expand All @@ -133,26 +137,30 @@ class KinesisBackedBlockRDD[T: ClassTag](
*/
private[kinesis]
class KinesisSequenceRangeIterator(
credentials: AWSCredentials,
credentialsProvider: AwsCredentialsProvider,
endpointUrl: String,
regionId: String,
range: SequenceNumberRange,
kinesisReadConfigs: KinesisReadConfigurations) extends NextIterator[Record] with Logging {

private val client = new AmazonKinesisClient(credentials)
kinesisReadConfigs: KinesisReadConfigurations)
extends NextIterator[KinesisClientRecord] with Logging {

private val client = KinesisClient.builder()
.credentialsProvider(credentialsProvider)
.region(Region.of(regionId))
.endpointOverride(URI.create(endpointUrl))
.httpClientBuilder(ApacheHttpClient.builder())
.build()
private val streamName = range.streamName
private val shardId = range.shardId
// AWS limits to maximum of 10k records per get call
private val maxGetRecordsLimit = 10000

private var toSeqNumberReceived = false
private var lastSeqNumber: String = null
private var internalIterator: Iterator[Record] = null

client.setEndpoint(endpointUrl)
private var internalIterator: Iterator[KinesisClientRecord] = null

override protected def getNext(): Record = {
var nextRecord: Record = null
override protected def getNext(): KinesisClientRecord = {
var nextRecord: KinesisClientRecord = null
if (toSeqNumberReceived) {
finished = true
} else {
Expand Down Expand Up @@ -182,11 +190,11 @@ class KinesisSequenceRangeIterator(

// Get the record, copy the data into a byte array and remember its sequence number
nextRecord = internalIterator.next()
lastSeqNumber = nextRecord.getSequenceNumber()
lastSeqNumber = nextRecord.sequenceNumber()

// If the this record's sequence number matches the stopping sequence number, then make sure
// the iterator is marked finished next time getNext() is called
if (nextRecord.getSequenceNumber == range.toSeqNumber) {
if (nextRecord.sequenceNumber == range.toSeqNumber) {
toSeqNumberReceived = true
}
}
Expand All @@ -195,7 +203,7 @@ class KinesisSequenceRangeIterator(
}

override protected def close(): Unit = {
client.shutdown()
client.close()
}

/**
Expand All @@ -204,7 +212,7 @@ class KinesisSequenceRangeIterator(
private def getRecords(
iteratorType: ShardIteratorType,
seqNum: String,
recordCount: Int): Iterator[Record] = {
recordCount: Int): Iterator[KinesisClientRecord] = {
val shardIterator = getKinesisIterator(iteratorType, seqNum)
val result = getRecordsAndNextKinesisIterator(shardIterator, recordCount)
result._1
Expand All @@ -216,19 +224,22 @@ class KinesisSequenceRangeIterator(
*/
private def getRecordsAndNextKinesisIterator(
shardIterator: String,
recordCount: Int): (Iterator[Record], String) = {
val getRecordsRequest = new GetRecordsRequest
getRecordsRequest.setRequestCredentials(credentials)
getRecordsRequest.setShardIterator(shardIterator)
getRecordsRequest.setLimit(Math.min(recordCount, this.maxGetRecordsLimit))
val getRecordsResult = retryOrTimeout[GetRecordsResult](
recordCount: Int): (Iterator[KinesisClientRecord], String) = {
val getRecordsRequest = GetRecordsRequest.builder()
.shardIterator(shardIterator)
.limit(Math.min(recordCount, this.maxGetRecordsLimit))
.build()
val getRecordsResponse = retryOrTimeout[GetRecordsResponse](
s"getting records using shard iterator") {
client.getRecords(getRecordsRequest)
}
val aggregatorUtil = new AggregatorUtil()
val records = new util.ArrayList[KinesisClientRecord]()
getRecordsResponse.records().forEach(r => records.add(KinesisClientRecord.fromRecord(r)))
// De-aggregate records, if KPL was used in producing the records. The KCL automatically
// handles de-aggregation during regular operation. This code path is used during recovery
val recordIterator = UserRecord.deaggregate(getRecordsResult.getRecords)
(recordIterator.iterator().asScala, getRecordsResult.getNextShardIterator)
val recordIterator = aggregatorUtil.deaggregate(records)
(recordIterator.iterator.asScala, getRecordsResponse.nextShardIterator)
}

/**
Expand All @@ -238,17 +249,18 @@ class KinesisSequenceRangeIterator(
private def getKinesisIterator(
iteratorType: ShardIteratorType,
sequenceNumber: String): String = {
val getShardIteratorRequest = new GetShardIteratorRequest
getShardIteratorRequest.setRequestCredentials(credentials)
getShardIteratorRequest.setStreamName(streamName)
getShardIteratorRequest.setShardId(shardId)
getShardIteratorRequest.setShardIteratorType(iteratorType.toString)
getShardIteratorRequest.setStartingSequenceNumber(sequenceNumber)
val getShardIteratorResult = retryOrTimeout[GetShardIteratorResult](
val getShardIteratorRequest = GetShardIteratorRequest.builder()
.streamName(streamName)
.shardId(shardId)
.shardIteratorType(iteratorType)
.startingSequenceNumber(sequenceNumber)
.build()

val getShardIteratorResponse = retryOrTimeout[GetShardIteratorResponse](
s"getting shard iterator from sequence number $sequenceNumber") {
client.getShardIterator(getShardIteratorRequest)
}
getShardIteratorResult.getShardIterator
getShardIteratorResponse.shardIterator
}

/** Helper method to retry Kinesis API request with exponential backoff and timeouts */
Expand Down
Loading