Merged

26 commits
33cd2cf
Extend Jedis pipeline to return key
ntviet18 Oct 25, 2018
80cac05
Cleanup k/v hgetall test
ntviet18 Oct 25, 2018
f6f278d
Clarify k/v builder
ntviet18 Oct 25, 2018
6a62e94
Implement k/v hmget
ntviet18 Oct 25, 2018
bce5228
Implement table key value extractor
ntviet18 Oct 26, 2018
0d05625
Read key column from Redis keys
ntviet18 Oct 26, 2018
3467849
Add table key extractor with keys and empty patterns
ntviet18 Oct 26, 2018
56586e4
Exclude user defined key from hash dataframe values
ntviet18 Oct 28, 2018
73501f9
Fix cannot load user defined keys
ntviet18 Oct 28, 2018
f46ac35
Replace implicit conversion with key order zipping
ntviet18 Oct 28, 2018
c103a0a
Simplify dataframe row decoding
ntviet18 Oct 28, 2018
c788085
Remove redundant type parameters
ntviet18 Oct 28, 2018
58ad507
Extract dataframe key parameters
ntviet18 Oct 28, 2018
5885961
Document key column storage
ntviet18 Oct 28, 2018
5b0cfc0
Document key column storage limitations
ntviet18 Oct 28, 2018
33695e2
Remove redundant fields from dataframe documentation
ntviet18 Oct 28, 2018
9766345
Generalize Redis prefix key pattern
ntviet18 Oct 30, 2018
f9b2fa5
Test extract key column with prefix pattern
ntviet18 Oct 30, 2018
b7f28eb
Use explicit type casts to avoid type erasure warnings
ntviet18 Oct 30, 2018
5cf3ab8
Document Redis row encoding
ntviet18 Oct 30, 2018
4c3c512
Document Redis row decoding
ntviet18 Oct 30, 2018
d19ebbb
Decouple schema inference from row decoding
ntviet18 Oct 30, 2018
e019d94
Cache filtered schema while scanning rows
ntviet18 Oct 30, 2018
2e16f7d
Prune unreachable code while persisting dataframe rows
ntviet18 Oct 30, 2018
9328b32
Document read _id from hashes with Redis key
ntviet18 Oct 30, 2018
0316628
Loan scan rows connection
ntviet18 Oct 30, 2018
73 changes: 71 additions & 2 deletions doc/dataframe.md
@@ -93,6 +93,40 @@ The keys in Redis:
2) "person:Peter"
```
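For reference, keys of this shape are produced by a write that names the key column. A minimal sketch, assuming a simple `Person(name, age)` case class (the full write example is collapsed above in this diff):

```scala
// Sketch only: persist a dataframe using `name` as the Redis key column,
// yielding keys "person:John" and "person:Peter".
case class Person(name: String, age: Int)

val df = spark.createDataFrame(Seq(Person("John", 30), Person("Peter", 45)))
df.write
  .format("org.apache.spark.sql.redis")
  .option("table", "person")
  .option("key.column", "name")
  .save()
```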

The keys will not be persisted in the Redis hashes:
Contributor: I think we should also update the 'Reading Redis Hashes' and 'DataFrame options' sections


```bash
127.0.0.1:6379> hgetall person:John
1) "age"
2) "30"
```

To load the keys back, you also need to specify
the key column parameter when reading:

```scala
val df = spark.read
.format("org.apache.spark.sql.redis")
.option("table", "person")
.option("key.column", "name")
.load()
```

Otherwise, a field named `_id` of type `String` will be populated:

```bash
root
|-- _id: string (nullable = true)
|-- age: integer (nullable = false)

+-----+---+
| _id|age|
+-----+---+
| John| 30|
|Peter| 45|
+-----+---+
```
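For comparison, the `_id` output above comes from a read that omits `key.column`. A minimal sketch against the same `person` table:

```scala
// Sketch only: without key.column, the extracted Redis key suffix is
// exposed as a String field named `_id`.
val df = spark.read
  .format("org.apache.spark.sql.redis")
  .option("table", "person")
  .load()
df.printSchema()
df.show()
```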

### Save Modes

Spark-redis supports all DataFrame [SaveMode](https://spark.apache.org/docs/latest/sql-programming-guide.html#save-modes)'s: `Append`,
@@ -213,7 +247,7 @@ root
+-----+---+
| John| 30|
|Peter| 45|
+-----+---+
```

To read with a Spark SQL:
@@ -262,8 +296,42 @@ The output is:
root
|-- name: string (nullable = true)
|-- age: string (nullable = true)
|-- _id: string (nullable = true)
```

Note: if your schema has a field named `_id`, or the schema was inferred, the
Redis key will be stored in that field. Spark-Redis will also try to
extract the key value based on your pattern (you can also change the name
of the key column; please refer to [Specifying Redis key](#specifying-redis-key)):
- if the pattern ends with `*` and it is the only wildcard, the
trailing value will be extracted (see the read sketch after this list), e.g.
```scala
df.show()
```
```bash
+-----+---+-----+
| name|age| _id|
+-----+---+-----+
| John| 30| John|
|Peter| 45|Peter|
+-----+---+-----+
```
- otherwise, the full Redis key will be kept as is, e.g.
```scala
val df = // code omitted...
.option("keys.pattern", "p*:*")
.load()
df.show()
```
```bash
+-----+---+------------+
| name|age| _id|
+-----+---+------------+
| John| 30| person:John|
|Peter| 45|person:Peter|
+-----+---+------------+
```
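The first case above corresponds to a read with a single-wildcard prefix pattern. A sketch, assuming keys of the form `person:*` and schema inference:

```scala
// Sketch only: one trailing wildcard, so just the suffix after
// "person:" ("John", "Peter") is extracted into the _id column.
val df = spark.read
  .format("org.apache.spark.sql.redis")
  .option("keys.pattern", "person:*")
  .option("infer.schema", "true")
  .load()
df.show()
```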

## DataFrame options

| Name | Description | Type | Default |
@@ -279,4 +347,5 @@ root

## Known limitations

- Nested DataFrame fields are not currently supported with Hash model. Consider making DataFrame schema flat or using Binary persistence model.
- Key column deserialization relies on the key pattern prefix, e.g. `keysPattern:*`, `tableName:$key`
src/main/scala/com/redislabs/provider/redis/util/ConnectionUtils.scala
@@ -1,15 +1,13 @@
package com.redislabs.provider.redis.util

import com.redislabs.provider.redis.RedisEndpoint
import redis.clients.jedis.Jedis

/**
* @author The Viet Nguyen
*/
object ConnectionUtils {

def withConnection[A](endpoint: RedisEndpoint)(body: Jedis => A): A = {
val conn = endpoint.connect()
def withConnection[A](conn: Jedis)(body: Jedis => A): A = {
val res = body(conn)
conn.close()
res
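A usage sketch of the revised loan-style helper: the caller now owns connection creation, and `withConnection` closes the connection after the body runs (the calling code below is illustrative):

```scala
import scala.collection.JavaConverters._

import com.redislabs.provider.redis.util.ConnectionUtils.withConnection
import redis.clients.jedis.Jedis

// Illustrative caller: pass an open Jedis connection; withConnection
// evaluates the body, closes the connection, and returns the result.
val keys: Set[String] = withConnection(new Jedis("localhost", 6379)) { conn =>
  conn.keys("person:*").asScala.toSet
}
```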
src/main/scala/org/apache/spark/sql/redis/BinaryRedisPersistence.scala
@@ -25,13 +25,14 @@ class BinaryRedisPersistence extends RedisPersistence[Array[Byte]] {
override def load(pipeline: Pipeline, key: String, requiredColumns: Seq[String]): Unit =
pipeline.get(key.getBytes(UTF_8))

override def encodeRow(value: Row): Array[Byte] = {
override def encodeRow(keyName: String, value: Row): Array[Byte] = {
val fields = value.schema.fields.map(_.name)
val valuesArray = fields.map(f => value.getAs[Any](f))
SerializationUtils.serialize(valuesArray)
}

override def decodeRow(value: Array[Byte], schema: => StructType, inferSchema: Boolean): Row = {
override def decodeRow(keyMap: (String, String), value: Array[Byte], schema: StructType,
requiredColumns: Seq[String]): Row = {
val valuesArray: Array[Any] = SerializationUtils.deserialize(value)
new GenericRowWithSchema(valuesArray, schema)
}
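Outside the `Pipeline` plumbing, the binary model's row codec amounts to Java serialization of the field values in schema order. A standalone sketch of the round trip:

```scala
import org.apache.commons.lang3.SerializationUtils

// Field values in schema order, as encodeRow collects them.
val valuesArray: Array[Any] = Array("John", 30)
val bytes: Array[Byte] = SerializationUtils.serialize(valuesArray)

// decodeRow deserializes the same array and rebuilds the row against
// the provided schema.
val restored: Array[Any] = SerializationUtils.deserialize(bytes)
assert(restored.sameElements(valuesArray))
```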
src/main/scala/org/apache/spark/sql/redis/HashRedisPersistence.scala
@@ -1,6 +1,7 @@
package org.apache.spark.sql.redis

import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float => JFloat, Long => JLong, Short => JShort}
import java.util.{List => JList}

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
@@ -12,47 +13,44 @@ import scala.collection.JavaConverters._
/**
* @author The Viet Nguyen
*/
class HashRedisPersistence extends RedisPersistence[Map[String, String]] {
class HashRedisPersistence extends RedisPersistence[Any] {

override def save(pipeline: Pipeline, key: String, value: Map[String, String], ttl: Int): Unit = {
val javaValue = value.asJava
override def save(pipeline: Pipeline, key: String, value: Any, ttl: Int): Unit = {
val javaValue = value.asInstanceOf[Map[String, String]].asJava
pipeline.hmset(key, javaValue)
if (ttl > 0) {
pipeline.expire(key, ttl)
}
}

override def load(pipeline: Pipeline, key: String, requiredColumns: Seq[String]): Unit = {
if (requiredColumns.isEmpty) {
pipeline.hgetAll(key)
} else {
pipeline.hmget(key, requiredColumns: _*)
}
pipeline.hmget(key, requiredColumns: _*)
}

override def encodeRow(value: Row): Map[String, String] = {
override def encodeRow(keyName: String, value: Row): Map[String, String] = {
val fields = value.schema.fields.map(_.name)
val kvMap = value.getValuesMap[Any](fields)
kvMap
.filter { case (k, v) =>
.filter { case (_, v) =>
// don't store null values
v != null
}
.filter { case (k, _) =>
// don't store key values
k != keyName
}
.map { case (k, v) =>
k -> String.valueOf(v)
}
}

override def decodeRow(value: Map[String, String], schema: => StructType,
inferSchema: Boolean): Row = {
val actualSchema = if (!inferSchema) schema else {
val fields = value.keys
.map(StructField(_, StringType))
.toArray
StructType(fields)
}
val fieldsValue = parseFields(value, actualSchema)
new GenericRowWithSchema(fieldsValue, actualSchema)
override def decodeRow(keyMap: (String, String), value: Any, schema: StructType,
requiredColumns: Seq[String]): Row = {
val scalaValue = value.asInstanceOf[JList[String]].asScala
val values = requiredColumns.zip(scalaValue)
val results = values :+ keyMap
val fieldsValue = parseFields(results.toMap, schema)
new GenericRowWithSchema(fieldsValue, schema)
}

private def parseFields(value: Map[String, String], schema: StructType): Array[Any] =
23 changes: 20 additions & 3 deletions src/main/scala/org/apache/spark/sql/redis/RedisPersistence.scala
@@ -13,9 +13,26 @@ trait RedisPersistence[T] extends Serializable {

def load(pipeline: Pipeline, key: String, requiredColumns: Seq[String]): Unit

def encodeRow(value: Row): T

def decodeRow(value: T, schema: => StructType, inferSchema: Boolean): Row
/**
* Encode dataframe row before storing it in Redis.
*
* @param keyName field name that should be encoded in a special way, e.g. in Redis keys.
* @param value row to encode.
* @return encoded row
*/
def encodeRow(keyName: String, value: Row): T

/**
* Decode dataframe row stored in Redis.
*
* @param keyMap name/value of the key column extracted from the Redis key
* @param value encoded row
* @param schema row schema
* @param requiredColumns required columns to decode
* @return decoded row
*/
def decodeRow(keyMap: (String, String), value: T, schema: StructType,
requiredColumns: Seq[String]): Row
}

object RedisPersistence {
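To make the new contract concrete, here is a sketch of an encode/decode round trip through the hash model. The `HashRedisPersistence` calls mirror this PR; the driver code and data are illustrative:

```scala
import java.util.Arrays

import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.redis.HashRedisPersistence
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("name", StringType),
  StructField("age", IntegerType)))
val row = new GenericRowWithSchema(Array("John", 30), schema)

val persistence = new HashRedisPersistence

// encodeRow drops the key column, so only "age" is stored in the hash.
val hash = persistence.encodeRow("name", row) // Map("age" -> "30")

// Simulate the HMGET reply for requiredColumns = Seq("age"); decodeRow
// re-attaches the key column value extracted from the Redis key.
val reply = Arrays.asList("30")
val decoded = persistence.decodeRow("name" -> "John", reply, schema, Seq("age"))
```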