Description
Using Spark 2.3, Scala 2.11, and Redis 5.0. I have tried both the Maven artifact referenced in the README.md on the spark-redis master branch and a version built locally from the master branch. When I run my Spark Streaming test, it parses data from my Redis container but repeatedly logs the exception below. Eventually the exception output stops and the application visually appears to be running, but it no longer processes any input sent into my Redis container:
```
. . .
18/09/11 16:18:11 ERROR scheduler.ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Error receiving data - java.lang.IndexOutOfBoundsException: Index: 1
    at java.util.Collections$EmptyList.get(Collections.java:4454)
    at com.redislabs.provider.redis.streaming.RedisReceiver$MessageHandler.run(RedisInputDStream.scala:56)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
. . .
```
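The frame at `RedisInputDStream.scala:56` suggests the receiver's `MessageHandler` indexes into the reply it gets back from Redis, and that reply is sometimes an empty list. This is only my reading of the trace, not the actual spark-redis code, but here is a minimal standalone sketch of the suspected failure mode using the Jedis `blpop` call I assume the receiver wraps (host, port, key, and timeout are taken from or illustrative of my setup):

```scala
import redis.clients.jedis.Jedis

object BlpopRepro {
  def main(args: Array[String]): Unit = {
    // Host/port as in my report above.
    val jedis = new Jedis("server.tech.local", 6379)

    // blpop normally returns a [key, value] pair when an element is
    // popped; on some code paths the reply can come back empty or null
    // instead (e.g. nothing was popped).
    val reply = jedis.blpop(1, "UrlList") // java.util.List[String]

    // Reading index 1 without a size check would reproduce the
    // exception in the trace whenever the reply is an empty list:
    //   reply.get(1)  =>  java.lang.IndexOutOfBoundsException: Index: 1

    // A defensive read checks the reply first:
    val value =
      if (reply != null && reply.size() > 1) Some(reply.get(1)) else None
    println(value)

    jedis.close()
  }
}
```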
My pom.xml contains:
```xml
<!-- activate only one of the following depending on mvn vs local -->
<!-- from the maven repo -->
<dependency>
    <groupId>com.redislabs</groupId>
    <artifactId>spark-redis</artifactId>
    <version>2.3.0</version>
</dependency>
<!-- built and published to local mvn repo; note I changed the name of my local artifact -->
<dependency>
    <groupId>com.redislabs</groupId>
    <artifactId>spark-connector</artifactId>
    <version>2.3.0</version>
</dependency>
```
. . .
Project source:
```scala
package test.streaming

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import com.redislabs.provider.redis._
import org.postgresql._

object streaming {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("redis-streaming")
      .set("redis.host", "server.tech.local")
      .set("redis.port", "6379")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(5))
    val spark = SparkSession.builder.config(sparkConf).getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    try {
      val redisStream = ssc.createRedisStreamWithoutListname(
        Array("UrlList"), storageLevel = StorageLevel.MEMORY_AND_DISK_2)
      redisStream.print
      ssc.start()
      ssc.awaitTermination()
    } catch {
      // Note: this only swallows the exception on the driver; the
      // receiver failure in the log happens on a separate thread anyway.
      case iob: java.lang.IndexOutOfBoundsException => ()
    }
  }
}
```
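As a debugging data point, the same list can also be consumed with explicit list names. If I read the spark-redis README correctly (treat the exact signature as an assumption on my part), `createRedisStream` yields `(listName, value)` pairs and exercises a slightly different receiver code path; dropped into the `try` block above in place of the failing call:

```scala
// Alternative consumption path that keeps list names in the stream.
// Signature assumed from the spark-redis README; same key and
// storage level as the failing call above.
val namedStream = ssc.createRedisStream(
  Array("UrlList"), storageLevel = StorageLevel.MEMORY_AND_DISK_2)
namedStream.print() // each element is a (listName, value) tuple
```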
build-and-run.sh
```bash
#!/bin/bash
mvn package
spark2-submit --master local[4] --class test.streaming.streaming ./target/uber-streaming-1.0-SNAPSHOT.jar

#spark2-submit --class test.streaming.streaming \
#  --master yarn \
#  --num-executors 2 \
#  --executor-memory 1024m \
#  --executor-cores 2 \
#  --driver-memory 10000m \
#  --driver-cores 1 \
#  ./target/uber-streaming-1.0-SNAPSHOT.jar
```