Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: redis sink using depot #193

Merged
merged 12 commits into from
Nov 9, 2022
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ dependencies {
implementation 'com.google.cloud:google-cloud-storage:1.114.0'
implementation 'com.google.cloud:google-cloud-bigquery:1.115.0'
implementation 'org.apache.logging.log4j:log4j-core:2.17.1'
implementation group: 'io.odpf', name: 'depot', version: '0.2.1'
implementation group: 'io.odpf', name: 'depot', version: '0.3.0'
implementation group: 'com.networknt', name: 'json-schema-validator', version: '1.0.59' exclude group: 'org.slf4j'

testImplementation group: 'junit', name: 'junit', version: '4.11'
Expand Down
85 changes: 13 additions & 72 deletions docs/docs/sinks/redis-sink.md
Original file line number Diff line number Diff line change
@@ -1,80 +1,21 @@
# Redis
# Redis Sink

A Redis sink Firehose \(`SINK_TYPE`=`redis`\) requires the following variables to be set along with Generic ones
Redis Sink is implemented in Firehose using the Redis sink connector implementation in ODPF Depot. You can check out ODPF Depot Github repository [here](https://github.com/odpf/depot).

### `SINK_REDIS_URLS`
### Data Types
Redis sink can be created in 3 different modes based on the value of [`SINK_REDIS_DATA_TYPE`](https://github.com/odpf/depot/blob/main/docs/reference/configuration/redis.md#sink_redis_data_type): HashSet, KeyValue or List
- `Hashset`: For each message, an entry of the format `key : field : value` is generated and pushed to Redis. Field and value are generated on the basis of the config [`SINK_REDIS_HASHSET_FIELD_TO_COLUMN_MAPPING`](https://github.com/odpf/depot/blob/main/docs/reference/configuration/redis.md#sink_redis_hashset_field_to_column_mapping)
- `List`: For each message entry of the format `key : value` is generated and pushed to Redis. Value is fetched for the Proto field name provided in the config [`SINK_REDIS_LIST_DATA_FIELD_NAME`](https://github.com/odpf/depot/blob/main/docs/reference/configuration/redis.md#sink_redis_list_data_field_name)
- `KeyValue`: For each message entry of the format `key : value` is generated and pushed to Redis. Value is fetched for the proto field name provided in the config [`SINK_REDIS_KEY_VALUE_DATA_FIELD_NAME`](https://github.com/odpf/depot/blob/main/docs/reference/configuration/redis.md#sink_redis_key_value_data_field_name)

REDIS instance hostname/IP address followed by its port.
The `key` is picked up from a field in the message itself.

- Example value: `localhos:6379,localhost:6380`
- Type: `required`
Limitation: Depot Redis sink only supports Key-Value, HashSet and List entries as of now.

### `SINK_REDIS_DATA_TYPE`
### Configuration

To select whether you want to push your data as a HashSet or as a List.
For Redis sink in Firehose we need to set first (`SINK_TYPE`=`redis`). There are some generic configs which are common across different sink types which need to be set which are mentioned in [generic.md](../advance/generic.md). Redis sink specific configs are mentioned in ODPF Depot repository. You can check out the Redis Sink configs [here](https://github.com/odpf/depot/blob/main/docs/reference/configuration/redis.md)

- Example value: `Hashset`
- Type: `required`
- Default value: `List`

### `SINK_REDIS_KEY_TEMPLATE`

The string that will act as the key for each Redis entry. This key can be configured as per the requirement, a constant or can extract value from each message and use that as the Redis key.

- Example value: `Service\_%%s,1`

This will take the value with index 1 from proto and create the Redis keys as per the template\

- Type: `required`

### `INPUT_SCHEMA_PROTO_TO_COLUMN_MAPPING`

This is the field that decides what all data will be stored in the HashSet for each message.

- Example value: `{"6":"customer_id", "2":"order_num"}`
- Type: `required (For Hashset)`

### `SINK_REDIS_LIST_DATA_PROTO_INDEX`

This field decides what all data will be stored in the List for each message.

- Example value: `6`

This will get the value of the field with index 6 in your proto and push that to the Redis list with the corresponding keyTemplate\

- Type: `required (For List)`

### `SINK_REDIS_KEY_VALUE_DATA_PROTO_INDEX`

This field decides what data will be stored in the value part of key-value pair

- Example value: `6`

This will get the value of the field with index 6 in your proto and push that to the Redis as value with the corresponding keyTemplate\

- Type: `required (For KeyValue)`

### `SINK_REDIS_TTL_TYPE`

- Example value: `DURATION`
- Type: `optional`
- Default value: `DISABLE`
- Choice of Redis TTL type.It can be:\
- `DURATION`: After which the Key will be expired and removed from Redis \(UNIT- seconds\)\
- `EXACT_TIME`: Precise UNIX timestamp after which the Key will be expired

### `SINK_REDIS_TTL_VALUE`

Redis TTL value in Unix Timestamp for `EXACT_TIME` TTL type, In Seconds for `DURATION` TTL type.

- Example value: `100000`
- Type: `optional`
- Default value: `0`

### `SINK_REDIS_DEPLOYMENT_TYPE`

The Redis deployment you are using. At present, we support `Standalone` and `Cluster` types.

- Example value: `Standalone`
- Type: `required`
- Default value: `Standalone`
### Deployment Types
Redis sink, as of now, supports two different Deployment Types `Standalone` and `Cluster`. This can be configured in the Depot environment variable `SINK_REDIS_DEPLOYMENT_TYPE`.
43 changes: 0 additions & 43 deletions src/main/java/io/odpf/firehose/config/RedisSinkConfig.java

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

19 changes: 16 additions & 3 deletions src/main/java/io/odpf/firehose/sink/SinkFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

import io.odpf.depot.bigquery.BigQuerySink;
import io.odpf.depot.bigquery.BigQuerySinkFactory;
import io.odpf.depot.redis.RedisSink;
import io.odpf.depot.redis.RedisSinkFactory;
import io.odpf.depot.config.RedisSinkConfig;
import io.odpf.depot.config.BigQuerySinkConfig;
import io.odpf.depot.log.LogSink;
import io.odpf.depot.log.LogSinkFactory;
Expand All @@ -20,10 +23,10 @@
import io.odpf.firehose.sink.jdbc.JdbcSinkFactory;
import io.odpf.firehose.sink.mongodb.MongoSinkFactory;
import io.odpf.firehose.sink.prometheus.PromSinkFactory;
import io.odpf.firehose.sink.redis.RedisSinkFactory;
import io.odpf.stencil.client.StencilClient;
import org.aeonbits.owner.ConfigFactory;

import java.io.IOException;
import java.util.Map;

public class SinkFactory {
Expand All @@ -34,6 +37,7 @@ public class SinkFactory {
private final OffsetManager offsetManager;
private BigQuerySinkFactory bigQuerySinkFactory;
private LogSinkFactory logSinkFactory;
private RedisSinkFactory redisSinkFactory;
private final Map<String, String> config;

public SinkFactory(KafkaConsumerConfig kafkaConsumerConfig,
Expand All @@ -57,7 +61,6 @@ public void init() {
case HTTP:
case INFLUXDB:
case ELASTICSEARCH:
case REDIS:
case GRPC:
case PROMETHEUS:
case BLOB:
Expand All @@ -67,6 +70,16 @@ public void init() {
logSinkFactory = new LogSinkFactory(config, statsDReporter);
logSinkFactory.init();
return;
case REDIS:
redisSinkFactory = new RedisSinkFactory(
ConfigFactory.create(RedisSinkConfig.class, config),
statsDReporter);
try {
redisSinkFactory.init();
} catch (IOException e) {
throw new IllegalArgumentException("Exception occurred while creating sink", e);
sumitaich1998 marked this conversation as resolved.
Show resolved Hide resolved
}
return;
case BIGQUERY:
BigquerySinkUtils.addMetadataColumns(config);
bigQuerySinkFactory = new BigQuerySinkFactory(
Expand Down Expand Up @@ -95,7 +108,7 @@ public Sink getSink() {
case ELASTICSEARCH:
return EsSinkFactory.create(config, statsDReporter, stencilClient);
case REDIS:
return RedisSinkFactory.create(config, statsDReporter, stencilClient);
return new GenericOdpfSink(new FirehoseInstrumentation(statsDReporter, RedisSink.class), sinkType.name(), redisSinkFactory.create());
case GRPC:
return GrpcSinkFactory.create(config, statsDReporter, stencilClient);
case PROMETHEUS:
Expand Down
57 changes: 0 additions & 57 deletions src/main/java/io/odpf/firehose/sink/redis/RedisSink.java

This file was deleted.

51 changes: 0 additions & 51 deletions src/main/java/io/odpf/firehose/sink/redis/RedisSinkFactory.java

This file was deleted.

Loading