Skip to content
This repository has been archived by the owner on Feb 8, 2019. It is now read-only.

fix #106 Gearpump Redis Integration #11

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gearpump.streaming.example.redis

import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.cluster.embedded.EmbeddedCluster
import org.apache.gearpump.cluster.main.ArgumentsParser
import org.apache.gearpump.streaming.redis.{RedisSink, RedisSource}
import org.apache.gearpump.streaming.sink.DataSinkProcessor
import org.apache.gearpump.streaming.source.DataSourceProcessor
import org.apache.gearpump.streaming.task.{Task, TaskContext}
import org.apache.gearpump.streaming.{Processor, StreamApplication}
import org.apache.gearpump.util.Graph._
import org.apache.gearpump.util.{AkkaApp, Graph}

class UpperProcessor(taskContext: TaskContext, conf: UserConfig)
extends Task(taskContext, conf) {

import taskContext.output

override def onNext(message: Message): Unit = {
Option(message.msg) match {
case Some(msg) =>
val upper = msg.asInstanceOf[String].toUpperCase
LOG.info(s"to Upper $upper")
output(new Message(upper, message.timestamp))
case None => LOG.warn("Empty String")
}
}
}

object RedisSourceSinkExample extends AkkaApp with ArgumentsParser {
override def main(akkaConf: Config, args: Array[String]): Unit = {
val cluster = new EmbeddedCluster(akkaConf: Config)
cluster.start()

val context = cluster.newClientContext
implicit val actorSystem = context.system

val source = DataSourceProcessor(new RedisSource(channel = "channel.in"), 1)
val upper = Processor[UpperProcessor](1)
val sink = DataSinkProcessor(new RedisSink(channel = "channel.out"), 1)
val dag = source ~> upper ~> sink
val app = StreamApplication("RedisSourceSink", Graph(dag), UserConfig.empty)

context.submit(app)
Thread.sleep(600 * 1000)
context.close()
cluster.stop()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gearpump.streaming.example.redis

import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.cluster.embedded.EmbeddedCluster
import org.apache.gearpump.cluster.main.ArgumentsParser
import org.apache.gearpump.streaming.redis.{RedisSource, RedisStorage}
import org.apache.gearpump.streaming.sink.DataSinkProcessor
import org.apache.gearpump.streaming.source.DataSourceProcessor
import org.apache.gearpump.streaming.task.{Task, TaskContext}
import org.apache.gearpump.streaming.{Processor, StreamApplication}
import org.apache.gearpump.util.Graph._
import org.apache.gearpump.util.{AkkaApp, Graph}

class UpperProcessor(taskContext: TaskContext, conf: UserConfig)
extends Task(taskContext, conf) {

import taskContext.output

override def onNext(message: Message): Unit = {
Option(message.msg) match {
case Some(msg) =>
val upper = msg.asInstanceOf[String].toUpperCase
LOG.info(s"to Upper $upper")
output(new Message(upper, message.timestamp))
case None => LOG.warn("Empty String")
}
}
}

object RedisSourceStorageExample extends AkkaApp with ArgumentsParser {
override def main(akkaConf: Config, args: Array[String]): Unit = {
val cluster = new EmbeddedCluster(akkaConf: Config)
cluster.start()

val context = cluster.newClientContext
implicit val actorSystem = context.system

val source = DataSourceProcessor(new RedisSource(channel = "channel.in"), 1)
val upper = Processor[UpperProcessor](1)
val sink = DataSinkProcessor(new RedisStorage(), 1)
val dag = source ~> upper ~> sink
val app = StreamApplication("RedisSourceStorage", Graph(dag), UserConfig.empty)

context.submit(app)
Thread.sleep(600 * 1000)
context.close()
cluster.stop()
}
}
37 changes: 37 additions & 0 deletions experiments/redis/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
### Gearpump Redis

***

[Redis](http://redis.io/) is an in-memory data structure store, used as database, cache and message broker. Redis is the most popular key-value database.

Usage :

```
val source = DataSourceProcessor(new RedisSource(channel = "channel.in"), 1)
val sink = DataSinkProcessor(new RedisSink(channel = "channel.out"), 1)
```

`RedisSource` used as a **DataSource** reading from Redis channel .`RedisSink` is a **DataSink** writing message to Redis channel .

```
val sink = DataSinkProcessor(new RedisStorage(), 1)
```

`RedisStorage` is a **DataSink** write message as a key-value pair in Redis . **Processor** could send Message to RedisStorage running the command :

1. `PublishMessage` : Send message to a Redis Channel .

2. `SetMessage` : Set the value for a key .

3. `LPushMessage` : Set the value at the head of a list .

4. `RPushMessage` : Set the value at the tail of a list .

5. `HSetMessage` : Set the value as a field of a hash .

6. `SAddMessage` : Set the value to a set .

7. `ZAddMessage` : Set the value to a sorted set .

8. `GEOAdd` : Set the value to the HyperLogLog data structure .

Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gearpump.streaming.redis

import java.nio.charset.Charset

object RedisMessage {

private def toBytes(string: String,
charset: Charset = Charset.forName("UTF8")
): Array[Byte] = string.getBytes(charset)

/**
* Send message to a Redis Channel
*
* @param message
*/
case class PublishMessage(message: Array[Byte]) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have we include all message types ? If not, which subset has been chosen and why ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The messages are implemented in storm-redis I try to keep they are the same .

BTW I will add some message both in gearpump and storm .

def this(message: String) = this(toBytes(message))
}

/**
* Set the value for a key
*
* @param key
* @param value
*/
case class SetMessage(key: Array[Byte], value: Array[Byte]) {
def this(key: String, value: String) = this(toBytes(key), toBytes(value))
}

/**
* Set the value at the head of a list
*
* @param key
* @param value
*/
case class LPushMessage(key: Array[Byte], value: Array[Byte]) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's unclear how this message type can be used. LPushMessage looks the same as RPushMessage

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LPushMessage and RPushMessage are difference message to control redis client add element at list's head or tail

def this(key: String, value: String) = this(toBytes(key), toBytes(value))
}

/**
* Set the value at the tail of a list
*
* @param key
* @param value
*/
case class RPushMessage(key: Array[Byte], value: Array[Byte]) {
def this(key: String, value: String) = this(toBytes(key), toBytes(value))
}

/**
* Set the value as a field of a hash
*
* @param key
* @param field
* @param value
*/
case class HSetMessage(key: Array[Byte], field: Array[Byte], value: Array[Byte]) {
def this(key: String, field: String, value: String) = this(toBytes(key), toBytes(field), toBytes(value))
}

/**
* Set the value to a set
*
* @param key
* @param value
*/
case class SAddMessage(key: Array[Byte], value: Array[Byte]) {
def this(key: String, value: String) = this(toBytes(key), toBytes(value))
}

/**
* Set the value to a sorted set
*
* @param key
* @param score
* @param value
*/
case class ZAddMessage(key: Array[Byte], score: Double, value: Array[Byte]) {
def this(key: String, score: Double, value: String) = this(toBytes(key), score, toBytes(value))
}

/**
* Set the value to the HyperLogLog data structure
*
* @param key
* @param value
*/
case class PFAdd(key: Array[Byte], value: Array[Byte]) {
def this(key: String, value: String) = this(toBytes(key), toBytes(value))
}

/**
* Set the geospatial information to the specified key
*
* @param key
* @param longitude
* @param latitude
* @param value
*/
case class GEOAdd(key: Array[Byte], longitude: Double, latitude: Double, value: Array[Byte]) {
def this(key: String, longitude: Double, latitude: Double, value: String) =
this(toBytes(key), longitude, latitude, toBytes(value))
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gearpump.streaming.redis

import io.gearpump.google.common.base.Strings
import org.apache.gearpump.Message
import org.apache.gearpump.streaming.redis.RedisMessage.PublishMessage
import org.apache.gearpump.streaming.sink.DataSink
import org.apache.gearpump.streaming.task.TaskContext
import org.apache.gearpump.util.LogUtil
import redis.clients.jedis.Jedis
import redis.clients.jedis.Protocol.{DEFAULT_HOST, DEFAULT_PORT, DEFAULT_TIMEOUT}

/**
* Publish messages to a Redis Pub/Sub Channel
*
* @param host
* @param port
* @param timeout
* @param password
* @param channel
*/
class RedisSink(host: String = DEFAULT_HOST,
port: Int = DEFAULT_PORT,
timeout: Int = DEFAULT_TIMEOUT,
password: String = "",
channel: Array[Byte]) extends DataSink {

private val LOG = LogUtil.getLogger(getClass)
@transient private lazy val client = new Jedis(host, port, timeout)

def this(channel: String) = this(channel = channel.getBytes())

override def open(context: TaskContext): Unit = {
if (!Strings.isNullOrEmpty(password)) {
client.auth(password)
}
}

override def write(message: Message): Unit = {
val msg = message.msg
msg match {
case publish: PublishMessage => client.publish(channel, publish.message)
case _ => LOG.error("Error Message ")
}
}

override def close(): Unit = {
client.close()
}
}
Loading