Skip to content

Commit 0e68330

Browse files
yashs360brkyvz
authored andcommitted
[SPARK-20168][DSTREAM] Add changes to use kinesis fetches from specific timestamp
## What changes were proposed in this pull request? Kinesis client can resume from a specified timestamp while creating a stream. We should have option to pass a timestamp in config to allow kinesis to resume from the given timestamp. The patch introduces a new `KinesisInitialPositionInStream` that takes the `InitialPositionInStream` with the `timestamp` information that can be used to resume kinesis fetches from the provided timestamp. ## How was this patch tested? Unit Tests cc : budde brkyvz Author: Yash Sharma <[email protected]> Closes #18029 from yssharma/ysharma/kcl_resume.
1 parent be03d3a commit 0e68330

File tree

8 files changed

+264
-49
lines changed

8 files changed

+264
-49
lines changed
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.streaming.kinesis;
18+
19+
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
20+
21+
import java.io.Serializable;
22+
import java.util.Date;
23+
24+
/**
25+
* A java wrapper for exposing [[InitialPositionInStream]]
26+
* to the corresponding Kinesis readers.
27+
*/
28+
interface KinesisInitialPosition {
29+
InitialPositionInStream getPosition();
30+
}
31+
32+
public class KinesisInitialPositions {
33+
public static class Latest implements KinesisInitialPosition, Serializable {
34+
public Latest() {}
35+
36+
@Override
37+
public InitialPositionInStream getPosition() {
38+
return InitialPositionInStream.LATEST;
39+
}
40+
}
41+
42+
public static class TrimHorizon implements KinesisInitialPosition, Serializable {
43+
public TrimHorizon() {}
44+
45+
@Override
46+
public InitialPositionInStream getPosition() {
47+
return InitialPositionInStream.TRIM_HORIZON;
48+
}
49+
}
50+
51+
public static class AtTimestamp implements KinesisInitialPosition, Serializable {
52+
private Date timestamp;
53+
54+
public AtTimestamp(Date timestamp) {
55+
this.timestamp = timestamp;
56+
}
57+
58+
@Override
59+
public InitialPositionInStream getPosition() {
60+
return InitialPositionInStream.AT_TIMESTAMP;
61+
}
62+
63+
public Date getTimestamp() {
64+
return timestamp;
65+
}
66+
}
67+
68+
69+
/**
70+
* Returns instance of [[KinesisInitialPosition]] based on the passed [[InitialPositionInStream]].
71+
* This method is used in KinesisUtils for translating the InitialPositionInStream
72+
* to InitialPosition. This function would be removed when we deprecate the KinesisUtils.
73+
*
74+
* @return [[InitialPosition]]
75+
*/
76+
public static KinesisInitialPosition fromKinesisInitialPosition(
77+
InitialPositionInStream initialPositionInStream) throws UnsupportedOperationException {
78+
if (initialPositionInStream == InitialPositionInStream.LATEST) {
79+
return new Latest();
80+
} else if (initialPositionInStream == InitialPositionInStream.TRIM_HORIZON) {
81+
return new TrimHorizon();
82+
} else {
83+
// InitialPositionInStream.AT_TIMESTAMP is not supported.
84+
// Use InitialPosition.atTimestamp(timestamp) instead.
85+
throw new UnsupportedOperationException(
86+
"Only InitialPositionInStream.LATEST and InitialPositionInStream.TRIM_HORIZON " +
87+
"supported in initialPositionInStream(). Please use the initialPosition() from " +
88+
"builder API in KinesisInputDStream for using InitialPositionInStream.AT_TIMESTAMP");
89+
}
90+
}
91+
}

external/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import scala.util.Random
2424

2525
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
2626
import com.amazonaws.services.kinesis.AmazonKinesisClient
27-
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
2827
import com.amazonaws.services.kinesis.model.PutRecordRequest
2928
import org.apache.log4j.{Level, Logger}
3029

@@ -33,9 +32,9 @@ import org.apache.spark.internal.Logging
3332
import org.apache.spark.storage.StorageLevel
3433
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
3534
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions
35+
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest
3636
import org.apache.spark.streaming.kinesis.KinesisInputDStream
3737

38-
3938
/**
4039
* Consumes messages from a Amazon Kinesis streams and does wordcount.
4140
*
@@ -139,7 +138,7 @@ object KinesisWordCountASL extends Logging {
139138
.streamName(streamName)
140139
.endpointUrl(endpointUrl)
141140
.regionName(regionName)
142-
.initialPositionInStream(InitialPositionInStream.LATEST)
141+
.initialPosition(new Latest())
143142
.checkpointAppName(appName)
144143
.checkpointInterval(kinesisCheckpointInterval)
145144
.storageLevel(StorageLevel.MEMORY_AND_DISK_2)

external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import org.apache.spark.storage.{BlockId, StorageLevel}
2828
import org.apache.spark.streaming.{Duration, StreamingContext, Time}
2929
import org.apache.spark.streaming.api.java.JavaStreamingContext
3030
import org.apache.spark.streaming.dstream.ReceiverInputDStream
31+
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest
3132
import org.apache.spark.streaming.receiver.Receiver
3233
import org.apache.spark.streaming.scheduler.ReceivedBlockInfo
3334

@@ -36,7 +37,7 @@ private[kinesis] class KinesisInputDStream[T: ClassTag](
3637
val streamName: String,
3738
val endpointUrl: String,
3839
val regionName: String,
39-
val initialPositionInStream: InitialPositionInStream,
40+
val initialPosition: KinesisInitialPosition,
4041
val checkpointAppName: String,
4142
val checkpointInterval: Duration,
4243
val _storageLevel: StorageLevel,
@@ -77,7 +78,7 @@ private[kinesis] class KinesisInputDStream[T: ClassTag](
7778
}
7879

7980
override def getReceiver(): Receiver[T] = {
80-
new KinesisReceiver(streamName, endpointUrl, regionName, initialPositionInStream,
81+
new KinesisReceiver(streamName, endpointUrl, regionName, initialPosition,
8182
checkpointAppName, checkpointInterval, _storageLevel, messageHandler,
8283
kinesisCreds, dynamoDBCreds, cloudWatchCreds)
8384
}
@@ -100,7 +101,7 @@ object KinesisInputDStream {
100101
// Params with defaults
101102
private var endpointUrl: Option[String] = None
102103
private var regionName: Option[String] = None
103-
private var initialPositionInStream: Option[InitialPositionInStream] = None
104+
private var initialPosition: Option[KinesisInitialPosition] = None
104105
private var checkpointInterval: Option[Duration] = None
105106
private var storageLevel: Option[StorageLevel] = None
106107
private var kinesisCredsProvider: Option[SparkAWSCredentials] = None
@@ -180,16 +181,32 @@ object KinesisInputDStream {
180181
this
181182
}
182183

184+
/**
185+
* Sets the initial position data is read from in the Kinesis stream. Defaults to
186+
* [[KinesisInitialPositions.Latest]] if no custom value is specified.
187+
*
188+
* @param initialPosition [[KinesisInitialPosition]] value specifying where Spark Streaming
189+
* will start reading records in the Kinesis stream from
190+
* @return Reference to this [[KinesisInputDStream.Builder]]
191+
*/
192+
def initialPosition(initialPosition: KinesisInitialPosition): Builder = {
193+
this.initialPosition = Option(initialPosition)
194+
this
195+
}
196+
183197
/**
184198
* Sets the initial position data is read from in the Kinesis stream. Defaults to
185199
* [[InitialPositionInStream.LATEST]] if no custom value is specified.
200+
* This function would be removed when we deprecate the KinesisUtils.
186201
*
187202
* @param initialPosition InitialPositionInStream value specifying where Spark Streaming
188203
* will start reading records in the Kinesis stream from
189204
* @return Reference to this [[KinesisInputDStream.Builder]]
190205
*/
206+
@deprecated("use initialPosition(initialPosition: KinesisInitialPosition)", "2.3.0")
191207
def initialPositionInStream(initialPosition: InitialPositionInStream): Builder = {
192-
initialPositionInStream = Option(initialPosition)
208+
this.initialPosition = Option(
209+
KinesisInitialPositions.fromKinesisInitialPosition(initialPosition))
193210
this
194211
}
195212

@@ -266,7 +283,7 @@ object KinesisInputDStream {
266283
getRequiredParam(streamName, "streamName"),
267284
endpointUrl.getOrElse(DEFAULT_KINESIS_ENDPOINT_URL),
268285
regionName.getOrElse(DEFAULT_KINESIS_REGION_NAME),
269-
initialPositionInStream.getOrElse(DEFAULT_INITIAL_POSITION_IN_STREAM),
286+
initialPosition.getOrElse(DEFAULT_INITIAL_POSITION),
270287
getRequiredParam(checkpointAppName, "checkpointAppName"),
271288
checkpointInterval.getOrElse(ssc.graph.batchDuration),
272289
storageLevel.getOrElse(DEFAULT_STORAGE_LEVEL),
@@ -293,7 +310,6 @@ object KinesisInputDStream {
293310
* Creates a [[KinesisInputDStream.Builder]] for constructing [[KinesisInputDStream]] instances.
294311
*
295312
* @since 2.2.0
296-
*
297313
* @return [[KinesisInputDStream.Builder]] instance
298314
*/
299315
def builder: Builder = new Builder
@@ -309,7 +325,6 @@ object KinesisInputDStream {
309325
private[kinesis] val DEFAULT_KINESIS_ENDPOINT_URL: String =
310326
"https://kinesis.us-east-1.amazonaws.com"
311327
private[kinesis] val DEFAULT_KINESIS_REGION_NAME: String = "us-east-1"
312-
private[kinesis] val DEFAULT_INITIAL_POSITION_IN_STREAM: InitialPositionInStream =
313-
InitialPositionInStream.LATEST
328+
private[kinesis] val DEFAULT_INITIAL_POSITION: KinesisInitialPosition = new Latest()
314329
private[kinesis] val DEFAULT_STORAGE_LEVEL: StorageLevel = StorageLevel.MEMORY_AND_DISK_2
315330
}

external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala

Lines changed: 29 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,13 @@ import scala.collection.mutable
2424
import scala.util.control.NonFatal
2525

2626
import com.amazonaws.services.kinesis.clientlibrary.interfaces.{IRecordProcessor, IRecordProcessorCheckpointer, IRecordProcessorFactory}
27-
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{InitialPositionInStream, KinesisClientLibConfiguration, Worker}
27+
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{KinesisClientLibConfiguration, Worker}
2828
import com.amazonaws.services.kinesis.model.Record
2929

3030
import org.apache.spark.internal.Logging
3131
import org.apache.spark.storage.{StorageLevel, StreamBlockId}
3232
import org.apache.spark.streaming.Duration
33+
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.AtTimestamp
3334
import org.apache.spark.streaming.receiver.{BlockGenerator, BlockGeneratorListener, Receiver}
3435
import org.apache.spark.util.Utils
3536

@@ -56,12 +57,13 @@ import org.apache.spark.util.Utils
5657
* @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
5758
* @param regionName Region name used by the Kinesis Client Library for
5859
* DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
59-
* @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the
60-
* worker's initial starting position in the stream.
61-
* The values are either the beginning of the stream
62-
* per Kinesis' limit of 24 hours
63-
* (InitialPositionInStream.TRIM_HORIZON) or
64-
* the tip of the stream (InitialPositionInStream.LATEST).
60+
* @param initialPosition Instance of [[KinesisInitialPosition]]
61+
* In the absence of Kinesis checkpoint info, this is the
62+
* worker's initial starting position in the stream.
63+
* The values are either the beginning of the stream
64+
* per Kinesis' limit of 24 hours
65+
* ([[KinesisInitialPositions.TrimHorizon]]) or
66+
* the tip of the stream ([[KinesisInitialPositions.Latest]]).
6567
* @param checkpointAppName Kinesis application name. Kinesis Apps are mapped to Kinesis Streams
6668
* by the Kinesis Client Library. If you change the App name or Stream name,
6769
* the KCL will throw errors. This usually requires deleting the backing
@@ -83,7 +85,7 @@ private[kinesis] class KinesisReceiver[T](
8385
val streamName: String,
8486
endpointUrl: String,
8587
regionName: String,
86-
initialPositionInStream: InitialPositionInStream,
88+
initialPosition: KinesisInitialPosition,
8789
checkpointAppName: String,
8890
checkpointInterval: Duration,
8991
storageLevel: StorageLevel,
@@ -148,18 +150,29 @@ private[kinesis] class KinesisReceiver[T](
148150

149151
kinesisCheckpointer = new KinesisCheckpointer(receiver, checkpointInterval, workerId)
150152
val kinesisProvider = kinesisCreds.provider
151-
val kinesisClientLibConfiguration = new KinesisClientLibConfiguration(
152-
checkpointAppName,
153-
streamName,
154-
kinesisProvider,
155-
dynamoDBCreds.map(_.provider).getOrElse(kinesisProvider),
156-
cloudWatchCreds.map(_.provider).getOrElse(kinesisProvider),
157-
workerId)
153+
154+
val kinesisClientLibConfiguration = {
155+
val baseClientLibConfiguration = new KinesisClientLibConfiguration(
156+
checkpointAppName,
157+
streamName,
158+
kinesisProvider,
159+
dynamoDBCreds.map(_.provider).getOrElse(kinesisProvider),
160+
cloudWatchCreds.map(_.provider).getOrElse(kinesisProvider),
161+
workerId)
158162
.withKinesisEndpoint(endpointUrl)
159-
.withInitialPositionInStream(initialPositionInStream)
163+
.withInitialPositionInStream(initialPosition.getPosition)
160164
.withTaskBackoffTimeMillis(500)
161165
.withRegionName(regionName)
162166

167+
// Update the Kinesis client lib config with timestamp
168+
// if InitialPositionInStream.AT_TIMESTAMP is passed
169+
initialPosition match {
170+
case ts: AtTimestamp =>
171+
baseClientLibConfiguration.withTimestampAtInitialPositionInStream(ts.getTimestamp)
172+
case _ => baseClientLibConfiguration
173+
}
174+
}
175+
163176
/*
164177
* RecordProcessorFactory creates impls of IRecordProcessor.
165178
* IRecordProcessor adapts the KCL to our Spark KinesisReceiver via the

external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,8 @@ object KinesisUtils {
7373
// Setting scope to override receiver stream's scope of "receiver stream"
7474
ssc.withNamedScope("kinesis stream") {
7575
new KinesisInputDStream[T](ssc, streamName, endpointUrl, validateRegion(regionName),
76-
initialPositionInStream, kinesisAppName, checkpointInterval, storageLevel,
76+
KinesisInitialPositions.fromKinesisInitialPosition(initialPositionInStream),
77+
kinesisAppName, checkpointInterval, storageLevel,
7778
cleanedHandler, DefaultCredentials, None, None)
7879
}
7980
}
@@ -129,7 +130,8 @@ object KinesisUtils {
129130
awsAccessKeyId = awsAccessKeyId,
130131
awsSecretKey = awsSecretKey)
131132
new KinesisInputDStream[T](ssc, streamName, endpointUrl, validateRegion(regionName),
132-
initialPositionInStream, kinesisAppName, checkpointInterval, storageLevel,
133+
KinesisInitialPositions.fromKinesisInitialPosition(initialPositionInStream),
134+
kinesisAppName, checkpointInterval, storageLevel,
133135
cleanedHandler, kinesisCredsProvider, None, None)
134136
}
135137
}
@@ -198,7 +200,8 @@ object KinesisUtils {
198200
awsAccessKeyId = awsAccessKeyId,
199201
awsSecretKey = awsSecretKey))
200202
new KinesisInputDStream[T](ssc, streamName, endpointUrl, validateRegion(regionName),
201-
initialPositionInStream, kinesisAppName, checkpointInterval, storageLevel,
203+
KinesisInitialPositions.fromKinesisInitialPosition(initialPositionInStream),
204+
kinesisAppName, checkpointInterval, storageLevel,
202205
cleanedHandler, kinesisCredsProvider, None, None)
203206
}
204207
}
@@ -243,7 +246,8 @@ object KinesisUtils {
243246
// Setting scope to override receiver stream's scope of "receiver stream"
244247
ssc.withNamedScope("kinesis stream") {
245248
new KinesisInputDStream[Array[Byte]](ssc, streamName, endpointUrl, validateRegion(regionName),
246-
initialPositionInStream, kinesisAppName, checkpointInterval, storageLevel,
249+
KinesisInitialPositions.fromKinesisInitialPosition(initialPositionInStream),
250+
kinesisAppName, checkpointInterval, storageLevel,
247251
KinesisInputDStream.defaultMessageHandler, DefaultCredentials, None, None)
248252
}
249253
}
@@ -293,7 +297,8 @@ object KinesisUtils {
293297
awsAccessKeyId = awsAccessKeyId,
294298
awsSecretKey = awsSecretKey)
295299
new KinesisInputDStream[Array[Byte]](ssc, streamName, endpointUrl, validateRegion(regionName),
296-
initialPositionInStream, kinesisAppName, checkpointInterval, storageLevel,
300+
KinesisInitialPositions.fromKinesisInitialPosition(initialPositionInStream),
301+
kinesisAppName, checkpointInterval, storageLevel,
297302
KinesisInputDStream.defaultMessageHandler, kinesisCredsProvider, None, None)
298303
}
299304
}

0 commit comments

Comments
 (0)