From 0b79c39a3cb8db0447a7dab0e92857d0d5c79665 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 25 Dec 2015 15:19:02 -0800 Subject: [PATCH 1/5] Fix compiler warnings due to @transient annotations. --- .../spark/streaming/kinesis/KinesisBackedBlockRDD.scala | 6 +++--- .../spark/streaming/kinesis/KinesisInputDStream.scala | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala index 691c1790b207f..b0a3cfc652d17 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala @@ -70,12 +70,12 @@ class KinesisBackedBlockRDDPartition( */ private[kinesis] class KinesisBackedBlockRDD[T: ClassTag]( - @transient sc: SparkContext, + sc: SparkContext, val regionName: String, val endpointUrl: String, - @transient blockIds: Array[BlockId], + @transient private val blockIds: Array[BlockId], @transient val arrayOfseqNumberRanges: Array[SequenceNumberRanges], - @transient isBlockIdValid: Array[Boolean] = Array.empty, + @transient private isBlockIdValid: Array[Boolean] = Array.empty, val retryTimeoutMs: Int = 10000, val messageHandler: Record => T = KinesisUtils.defaultMessageHandler _, val awsCredentialsOption: Option[SerializableAWSCredentials] = None diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala index 72ab6357a53b0..3321c7527edb4 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala @@ -30,7 +30,7 @@ import org.apache.spark.streaming.scheduler.ReceivedBlockInfo import org.apache.spark.streaming.{Duration, StreamingContext, Time} private[kinesis] class KinesisInputDStream[T: ClassTag]( - @transient _ssc: StreamingContext, + _ssc: StreamingContext, streamName: String, endpointUrl: String, regionName: String, From 70e7032a2fac27e806c3249c84981cf4bc99ff90 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 25 Dec 2015 17:43:33 -0800 Subject: [PATCH 2/5] Update KinesisBackedBlockRDD.scala --- .../apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala index b0a3cfc652d17..24e39fc658de6 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala @@ -75,7 +75,7 @@ class KinesisBackedBlockRDD[T: ClassTag]( val endpointUrl: String, @transient private val blockIds: Array[BlockId], @transient val arrayOfseqNumberRanges: Array[SequenceNumberRanges], - @transient private isBlockIdValid: Array[Boolean] = Array.empty, + @transient private val isBlockIdValid: Array[Boolean] = Array.empty, val retryTimeoutMs: Int = 10000, val messageHandler: Record => T = KinesisUtils.defaultMessageHandler _, val awsCredentialsOption: Option[SerializableAWSCredentials] = None From afe11f225e60d054a4f72681fba53cf220ecadeb Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 25 Dec 2015 19:14:28 -0800 Subject: [PATCH 3/5] Update KinesisBackedBlockRDD.scala --- .../apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala index 24e39fc658de6..5a1d60c2e06de 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala @@ -73,7 +73,7 @@ class KinesisBackedBlockRDD[T: ClassTag]( sc: SparkContext, val regionName: String, val endpointUrl: String, - @transient private val blockIds: Array[BlockId], + @transient val blockIds: Array[BlockId], @transient val arrayOfseqNumberRanges: Array[SequenceNumberRanges], @transient private val isBlockIdValid: Array[Boolean] = Array.empty, val retryTimeoutMs: Int = 10000, From 0a3f671883923b67d872a025092e07b3d7ca358c Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 25 Dec 2015 19:37:08 -0800 Subject: [PATCH 4/5] Update KinesisBackedBlockRDD.scala --- .../apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala index 5a1d60c2e06de..1bb07f1599df0 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala @@ -73,7 +73,7 @@ class KinesisBackedBlockRDD[T: ClassTag]( sc: SparkContext, val regionName: String, val endpointUrl: String, - @transient val blockIds: Array[BlockId], + blockIds: Array[BlockId], @transient val arrayOfseqNumberRanges: Array[SequenceNumberRanges], @transient private val isBlockIdValid: Array[Boolean] = Array.empty, val retryTimeoutMs: Int = 10000, From 422ef494b56f9ac4c770311743fb2a01a9d19ae1 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 28 Dec 2015 11:49:10 -0800 Subject: [PATCH 5/5] Rename blockIds to _blockIds --- .../streaming/kinesis/KinesisBackedBlockRDD.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala index 1bb07f1599df0..3996f168e69ee 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala @@ -73,23 +73,23 @@ class KinesisBackedBlockRDD[T: ClassTag]( sc: SparkContext, val regionName: String, val endpointUrl: String, - blockIds: Array[BlockId], + @transient private val _blockIds: Array[BlockId], @transient val arrayOfseqNumberRanges: Array[SequenceNumberRanges], @transient private val isBlockIdValid: Array[Boolean] = Array.empty, val retryTimeoutMs: Int = 10000, val messageHandler: Record => T = KinesisUtils.defaultMessageHandler _, val awsCredentialsOption: Option[SerializableAWSCredentials] = None - ) extends BlockRDD[T](sc, blockIds) { + ) extends BlockRDD[T](sc, _blockIds) { - require(blockIds.length == arrayOfseqNumberRanges.length, + require(_blockIds.length == arrayOfseqNumberRanges.length, "Number of blockIds is not equal to the number of sequence number ranges") override def isValid(): Boolean = true override def getPartitions: Array[Partition] = { - Array.tabulate(blockIds.length) { i => + Array.tabulate(_blockIds.length) { i => val isValid = if (isBlockIdValid.length == 0) true else isBlockIdValid(i) - new KinesisBackedBlockRDDPartition(i, blockIds(i), isValid, arrayOfseqNumberRanges(i)) + new KinesisBackedBlockRDDPartition(i, _blockIds(i), isValid, arrayOfseqNumberRanges(i)) } }