Skip to content

Commit 3cb3b21

Browse files
committed
[STREAMING] Add Redis Pub/sub streaming support
1 parent 558962a commit 3cb3b21

File tree

12 files changed

+500
-6
lines changed

12 files changed

+500
-6
lines changed

dev/audit-release/audit_release.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,8 @@ def get_url(url):
105105
"spark-core", "spark-bagel", "spark-mllib", "spark-streaming", "spark-repl",
106106
"spark-graphx", "spark-streaming-flume", "spark-streaming-kafka",
107107
"spark-streaming-mqtt", "spark-streaming-twitter", "spark-streaming-zeromq",
108-
"spark-catalyst", "spark-sql", "spark-hive", "spark-streaming-kinesis-asl"
108+
"spark-catalyst", "spark-sql", "spark-hive", "spark-streaming-kinesis-asl",
109+
"spark-streaming-redis"
109110
]
110111
modules = map(lambda m: "%s_%s" % (m, SCALA_BINARY_VERSION), modules)
111112

docs/streaming-programming-guide.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,7 @@ some of the common ones are as follows.
316316
<tr><td> Twitter </td><td> spark-streaming-twitter_{{site.SCALA_BINARY_VERSION}} </td></tr>
317317
<tr><td> ZeroMQ </td><td> spark-streaming-zeromq_{{site.SCALA_BINARY_VERSION}} </td></tr>
318318
<tr><td> MQTT </td><td> spark-streaming-mqtt_{{site.SCALA_BINARY_VERSION}} </td></tr>
319+
<tr><td> Redis </td><td> spark-streaming-redis_{{site.SCALA_BINARY_VERSION}} </td></tr>
319320
<tr><td></td><td></td></tr>
320321
</table>
321322

@@ -1562,6 +1563,7 @@ package and renamed for better clarity.
15621563
[TwitterUtils](api/scala/index.html#org.apache.spark.streaming.twitter.TwitterUtils$),
15631564
[ZeroMQUtils](api/scala/index.html#org.apache.spark.streaming.zeromq.ZeroMQUtils$), and
15641565
[MQTTUtils](api/scala/index.html#org.apache.spark.streaming.mqtt.MQTTUtils$)
1566+
[RedisUtils](api/scala/index.html#org.apache.spark.streaming.redis.RedisUtils$)
15651567
- Java docs
15661568
* [JavaStreamingContext](api/java/index.html?org/apache/spark/streaming/api/java/JavaStreamingContext.html),
15671569
[JavaDStream](api/java/index.html?org/apache/spark/streaming/api/java/JavaDStream.html) and
@@ -1572,6 +1574,7 @@ package and renamed for better clarity.
15721574
[TwitterUtils](api/java/index.html?org/apache/spark/streaming/twitter/TwitterUtils.html),
15731575
[ZeroMQUtils](api/java/index.html?org/apache/spark/streaming/zeromq/ZeroMQUtils.html), and
15741576
[MQTTUtils](api/java/index.html?org/apache/spark/streaming/mqtt/MQTTUtils.html)
1577+
[RedisUtils](api/java/index.html?org/apache/spark/streaming/redis/RedisUtils.html)
15751578

15761579
* More examples in [Scala]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming)
15771580
and [Java]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/examples/streaming)

external/redis/pom.xml

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
~ Licensed to the Apache Software Foundation (ASF) under one or more
4+
~ contributor license agreements. See the NOTICE file distributed with
5+
~ this work for additional information regarding copyright ownership.
6+
~ The ASF licenses this file to You under the Apache License, Version 2.0
7+
~ (the "License"); you may not use this file except in compliance with
8+
~ the License. You may obtain a copy of the License at
9+
~
10+
~ http://www.apache.org/licenses/LICENSE-2.0
11+
~
12+
~ Unless required by applicable law or agreed to in writing, software
13+
~ distributed under the License is distributed on an "AS IS" BASIS,
14+
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
~ See the License for the specific language governing permissions and
16+
~ limitations under the License.
17+
-->
18+
19+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
20+
<modelVersion>4.0.0</modelVersion>
21+
<parent>
22+
<groupId>org.apache.spark</groupId>
23+
<artifactId>spark-parent</artifactId>
24+
<version>1.0.2</version>
25+
<relativePath>../../pom.xml</relativePath>
26+
</parent>
27+
28+
<groupId>org.apache.spark</groupId>
29+
<artifactId>spark-streaming-redis_2.10</artifactId>
30+
<properties>
31+
<sbt.project.name>streaming-redis</sbt.project.name>
32+
</properties>
33+
<packaging>jar</packaging>
34+
<name>Spark Project External Redis</name>
35+
<url>http://spark.apache.org/</url>
36+
37+
<repositories>
38+
<repository>
39+
<id>rediscala</id>
40+
<url>https://raw.github.com/etaty/rediscala-mvn/master/releases/</url>
41+
</repository>
42+
</repositories>
43+
44+
<dependencies>
45+
<dependency>
46+
<groupId>org.apache.spark</groupId>
47+
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
48+
<version>${project.version}</version>
49+
</dependency>
50+
<dependency>
51+
<groupId>org.apache.spark</groupId>
52+
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
53+
<version>${project.version}</version>
54+
<type>test-jar</type>
55+
<scope>test</scope>
56+
</dependency>
57+
<dependency>
58+
<groupId>com.etaty.rediscala</groupId>
59+
<artifactId>rediscala_${scala.binary.version}</artifactId>
60+
<version>1.3</version>
61+
</dependency>
62+
<dependency>
63+
<groupId>${akka.group}</groupId>
64+
<artifactId>akka-zeromq_${scala.binary.version}</artifactId>
65+
<version>${akka.version}</version>
66+
</dependency>
67+
<dependency>
68+
<groupId>org.scalatest</groupId>
69+
<artifactId>scalatest_${scala.binary.version}</artifactId>
70+
<scope>test</scope>
71+
</dependency>
72+
<dependency>
73+
<groupId>org.scalacheck</groupId>
74+
<artifactId>scalacheck_${scala.binary.version}</artifactId>
75+
<scope>test</scope>
76+
</dependency>
77+
<dependency>
78+
<groupId>junit</groupId>
79+
<artifactId>junit</artifactId>
80+
<scope>test</scope>
81+
</dependency>
82+
<dependency>
83+
<groupId>com.novocode</groupId>
84+
<artifactId>junit-interface</artifactId>
85+
<scope>test</scope>
86+
</dependency>
87+
</dependencies>
88+
<build>
89+
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
90+
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
91+
<plugins>
92+
<plugin>
93+
<groupId>org.scalatest</groupId>
94+
<artifactId>scalatest-maven-plugin</artifactId>
95+
</plugin>
96+
</plugins>
97+
</build>
98+
</project>
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.streaming.redis
19+
20+
import scala.collection.Map
21+
import scala.collection.mutable.HashMap
22+
import scala.collection.JavaConversions._
23+
import scala.reflect.ClassTag
24+
25+
import java.util.Properties
26+
import java.util.concurrent.Executors
27+
import java.io.IOException
28+
import java.net.URI
29+
30+
import redis.RedisPubSub
31+
import redis.api.pubsub._
32+
import scala.concurrent.Await
33+
import scala.concurrent.duration._
34+
import scala.concurrent.ExecutionContext.Implicits.global
35+
36+
import org.apache.spark.Logging
37+
import org.apache.spark.storage.StorageLevel
38+
import org.apache.spark.streaming.StreamingContext
39+
import org.apache.spark.streaming.dstream._
40+
import org.apache.spark.streaming.receiver.Receiver
41+
42+
/**
43+
* Input stream that subscribe messages from a Redis server.
44+
* Uses com.livestream as Redis client https://github.com/Livestream/scredis
45+
* @param redisUrl Url of remote redis server (e.g. redis://host:port/db)
46+
* @param channel channel name to subscribe to
47+
* @param storageLevel RDD storage level.
48+
*/
49+
50+
private[streaming]
51+
class RedisInputDStream(
52+
@transient ssc_ : StreamingContext,
53+
redisUrl: String,
54+
channels: Seq[String],
55+
patterns: Seq[String],
56+
storageLevel: StorageLevel
57+
) extends ReceiverInputDStream[String](ssc_) with Logging {
58+
59+
def getReceiver(): Receiver[String] = {
60+
new RedisReceiver(redisUrl, channels, patterns, storageLevel)
61+
}
62+
}
63+
64+
private[streaming]
65+
class RedisReceiver(
66+
redisUrl: String,
67+
channels: Seq[String],
68+
patterns: Seq[String],
69+
storageLevel: StorageLevel
70+
) extends Receiver[String](storageLevel) {
71+
72+
def onStop() {
73+
// redis.quit()
74+
}
75+
76+
def onStart() {
77+
implicit val akkaSystem = akka.actor.ActorSystem()
78+
79+
// This will fail if given a bad URL
80+
val parsedUrl = new java.net.URI(redisUrl)
81+
82+
// Initializing Redis Client specifying redisUrl
83+
val redisPubSub = RedisPubSub(
84+
host = parsedUrl.getHost,
85+
port = parsedUrl.getPort,
86+
channels = channels,
87+
patterns = patterns,
88+
onMessage = (m: Message) => {
89+
// info(s"got message: ${m.data}")
90+
store(m.data)
91+
}
92+
)
93+
94+
}
95+
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.streaming.redis
19+
20+
import org.apache.spark.storage.StorageLevel
21+
import org.apache.spark.streaming.StreamingContext
22+
import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext, JavaDStream}
23+
import scala.reflect.ClassTag
24+
import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
25+
26+
object RedisUtils {
27+
/**
28+
* Create an input stream that receives messages from a Redis Pub/Sub channel.
29+
* @param ssc StreamingContext object
30+
* @param redisUrl Url of remote Redis server
31+
* @param channel Channel to subscribe to
32+
* @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2.
33+
*/
34+
def createStream(
35+
ssc: StreamingContext,
36+
redisUrl: String,
37+
channels: Seq[String],
38+
patterns: Seq[String],
39+
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
40+
): ReceiverInputDStream[String] = {
41+
new RedisInputDStream(ssc, redisUrl, channels, patterns, storageLevel)
42+
}
43+
44+
/**
45+
* Create an input stream that receives messages from a Redis Pub/Sub channel.
46+
* Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
47+
* @param jssc JavaStreamingContext object
48+
* @param redisUrl Url of remote Redis server
49+
* @param channel Channel name to subscribe to
50+
*/
51+
def createStream(
52+
jssc: JavaStreamingContext,
53+
redisUrl: String,
54+
channels: Seq[String],
55+
patterns: Seq[String]
56+
): JavaReceiverInputDStream[String] = {
57+
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
58+
createStream(jssc.ssc, redisUrl, channels, patterns)
59+
}
60+
61+
/**
62+
* Create an input stream that receives messages from a Redis Pub/Sub channel.
63+
* @param jssc JavaStreamingContext object
64+
* @param redisUrl Url of remote Redis server
65+
* @param channel Channel to subscribe to
66+
* @param storageLevel RDD storage level.
67+
*/
68+
def createStream(
69+
jssc: JavaStreamingContext,
70+
redisUrl: String,
71+
channels: Seq[String],
72+
patterns: Seq[String],
73+
storageLevel: StorageLevel
74+
): JavaReceiverInputDStream[String] = {
75+
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
76+
createStream(jssc.ssc, redisUrl, channels, patterns, storageLevel)
77+
}
78+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
/**
19+
* Redis receiver for Spark Streaming.
20+
*/
21+
package org.apache.spark.streaming.redis;
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.streaming
19+
20+
/**
21+
* Redis receiver for Spark Streaming.
22+
*/
23+
package object redis

0 commit comments

Comments
 (0)