Skip to content

Commit

Permalink
Fixes gearpump#2 Extend tap config helper to handle zookeeper & kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
kkasravi committed Nov 6, 2015
1 parent e6f7d7e commit 9cbe9c6
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 14 deletions.
52 changes: 50 additions & 2 deletions src/main/java/io/gearpump/tap/TapConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,43 @@
* },
* "name": "service instance name"
* }
* ]
* ],
* "kafka": [
* {
* "credentials": {
* "uri": "10.10.10.201:9092,10.10.10.50:9092,10.10.10.210:9092"
* },
* "label": "kafka",
* "name": "gearpump_kafka",
* "plan": "shared",
* "tags": [
* "kafka",
* "distributed",
* "real-time",
* "messaging"
* ]
* }
* ],
* "zookeeper": [
* {
* "credentials": {
* "kerberos": {
* "kdc": "",
* "krealm": ""
* },
* "zk.cluster": "10.10.10.201:2181,10.10.10.50:2181,10.10.10.210:2181",
* "zk.node": "/org/intel/zookeeperbroker/metadata/34ca13ec-18e4-4598-b2a2-c6ce981861b0"
* },
* "label": "zookeeper",
* "name": "gearpump_zookeeper",
* "plan": "shared",
* "tags": []
* }
* ]
* }
* }
*
* "hbase", "hdfs" keys represent srvice types. There can by many instances within given type.
* "hbase", "hdfs" keys represent service types. There can by many instances within given type.
* The instances of the same type are distinguished by service name ("name" key)
*/
public interface TapConfig {
Expand All @@ -80,6 +112,22 @@ public interface TapConfig {
*/
Configuration getHDFSConfig(String serviceId);

/**
* get a Kafka Configuration by kafka instance Id
*
* @param serviceId
* @return
*/
Configuration getKafkaConfig(String serviceId);

/**
* get a Zookeeper Configuration by zookeeper instance Id
*
* @param serviceId
* @return
*/
Configuration getZookeeperConfig(String serviceId);

/**
* Get a namespace provisioned for given HBase service by TAP.
*
Expand Down
71 changes: 59 additions & 12 deletions src/main/java/io/gearpump/tap/TapJsonConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,23 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

public class TapJsonConfig<T extends Map> implements TapConfig {

private static final String TAP_CONFIG_ROOT = "VCAP_SERVICES";
private static final String TAP_CONFIG_HBASE_TYPE = "hbase";
private static final String TAP_CONFIG_HDFS_TYPE = "hdfs";
private static final String TAP_CONFIG_KAFKA_TYPE = "kafka";
private static final String TAP_CONFIG_ZOOKEEPER_TYPE = "zookeeper";
private static final String TAP_CONFIG_INSTANCE_ID_KEY = "name";
private static final String TAP_CONFIG_NAMESPACE_KEY = "hbase.namespace";
private static final String HADOOP_CONFIG_KEY_VALUE = "HADOOP_CONFIG_KEY";
private static final String KAFKA_URI = "uri";
private static final String KAFKA_BROKERS = "brokers";
private static final String ZOOKEEPER_CLUSTER = "zk.cluster";
private static final String ZOOKEEPER_SERVERS = "zookeepers";
private static final String CREDENTIALS_KEY_VALUE = "credentials";

private final T tapConfig;
Expand Down Expand Up @@ -68,14 +76,37 @@ private T serviceNodeForInstance(ArrayList<T> serviceNodes, String instanceName)
return result;
}

private static <T extends Map> Configuration createConfiguration(T hbaseService) {
private static <T extends Map> Configuration createConfiguration(T service, String type) {
Configuration result = null;

if (hbaseService != null) {
result = new Configuration();
Map credentials = (Map) hbaseService.get(CREDENTIALS_KEY_VALUE);
Map<String, String> values = (Map<String, String>) credentials.get(TapJsonConfig.HADOOP_CONFIG_KEY_VALUE);
values.forEach(result::set);
if (service != null) {
result = new Configuration();
Map credentials;
Map<String, String> values;
switch(type) {
case TAP_CONFIG_HBASE_TYPE:
credentials = (Map) service.get(CREDENTIALS_KEY_VALUE);
values = (Map<String, String>) credentials.get(TapJsonConfig.HADOOP_CONFIG_KEY_VALUE);
values.forEach(result::set);
break;
case TAP_CONFIG_HDFS_TYPE:
credentials = (Map) service.get(CREDENTIALS_KEY_VALUE);
values = (Map<String, String>) credentials.get(TapJsonConfig.HADOOP_CONFIG_KEY_VALUE);
values.forEach(result::set);
break;
case TAP_CONFIG_KAFKA_TYPE:
credentials = (Map) service.get(CREDENTIALS_KEY_VALUE);
values = new HashMap<String, String>();
values.put(TapJsonConfig.KAFKA_BROKERS, credentials.get(TapJsonConfig.KAFKA_URI).toString());
values.forEach(result::set);
break;
case TAP_CONFIG_ZOOKEEPER_TYPE:
credentials = (Map) service.get(CREDENTIALS_KEY_VALUE);
values = new HashMap<String, String>();
values.put(TapJsonConfig.ZOOKEEPER_SERVERS, credentials.get(TapJsonConfig.ZOOKEEPER_CLUSTER).toString());
values.forEach(result::set);
break;
}
}

return result;
Expand All @@ -91,17 +122,33 @@ public String getHBaseNamespace(String instanceName) {

@Override
public Configuration getHBase(String instanceName) {
ArrayList<T> hbases = getServiceNodes(TAP_CONFIG_HBASE_TYPE);
T serviceNode = serviceNodeForInstance(hbases, instanceName);
ArrayList<T> hbase = getServiceNodes(TAP_CONFIG_HBASE_TYPE);
T serviceNode = serviceNodeForInstance(hbase, instanceName);

return createConfiguration(serviceNode);
return createConfiguration(serviceNode, TAP_CONFIG_HBASE_TYPE);
}

@Override
public Configuration getHDFSConfig(String instanceName) {
ArrayList<T> hbases = getServiceNodes(TAP_CONFIG_HBASE_TYPE);
T serviceNode = serviceNodeForInstance(hbases, instanceName);
ArrayList<T> hdfs = getServiceNodes(TAP_CONFIG_HDFS_TYPE);
T serviceNode = serviceNodeForInstance(hdfs, instanceName);

return createConfiguration(serviceNode, TAP_CONFIG_HDFS_TYPE);
}

@Override
public Configuration getKafkaConfig(String instanceName) {
ArrayList<T> kafka = getServiceNodes(TAP_CONFIG_KAFKA_TYPE);
T serviceNode = serviceNodeForInstance(kafka, instanceName);

return createConfiguration(serviceNode, TAP_CONFIG_KAFKA_TYPE);
}

@Override
public Configuration getZookeeperConfig(String instanceName) {
ArrayList<T> zookeeper = getServiceNodes(TAP_CONFIG_ZOOKEEPER_TYPE);
T serviceNode = serviceNodeForInstance(zookeeper, instanceName);

return createConfiguration(serviceNode);
return createConfiguration(serviceNode, TAP_CONFIG_ZOOKEEPER_TYPE);
}
}

0 comments on commit 9cbe9c6

Please sign in to comment.