From 10b508524c1ce1be0c9063497a01ea31576cca18 Mon Sep 17 00:00:00 2001
From: ssp <ssp@alternativaplatform.com>
Date: Sat, 29 Dec 2018 13:39:18 +0500
Subject: [PATCH] Add pattern option for custom transformations from kafka
 topic names to hive tables names #155

---
 .../connect/storage/hive/HiveConfig.java      | 19 +++++++++++++++++
 .../connect/storage/hive/HiveMetaStore.java   | 21 ++++++++++++++++++-
 2 files changed, 39 insertions(+), 1 deletion(-)

diff --git a/hive/src/main/java/io/confluent/connect/storage/hive/HiveConfig.java b/hive/src/main/java/io/confluent/connect/storage/hive/HiveConfig.java
index a839e1b34..f5232026d 100644
--- a/hive/src/main/java/io/confluent/connect/storage/hive/HiveConfig.java
+++ b/hive/src/main/java/io/confluent/connect/storage/hive/HiveConfig.java
@@ -60,6 +60,13 @@ public class HiveConfig extends AbstractConfig implements ComposableConfig {
   public static final String HIVE_DATABASE_DEFAULT = "default";
   public static final String HIVE_DATABASE_DISPLAY = "Hive database";
 
+  public static final String HIVE_TABLE_PATTERN_CONFIG = "hive.table.pattern";
+  public static final String HIVE_TABLE_PATTERN_DOC =
+      "Regular expression for transformation from Kafka topic name to Hive table name. "
+      + "If regular expression doesn't match, Kafka topic name will be used as Hive table name.";
+  public static final String HIVE_TABLE_PATTERN_DEFAULT = "";
+  public static final String HIVE_TABLE_PATTERN_DISPLAY = "";
+
   // Schema group
   public static final String SCHEMA_COMPATIBILITY_CONFIG = "schema.compatibility";
   public static final String SCHEMA_COMPATIBILITY_DOC =
@@ -153,6 +160,18 @@ public class HiveConfig extends AbstractConfig implements ComposableConfig {
           HIVE_DATABASE_DISPLAY,
           hiveIntegrationDependentsRecommender
       );
+
+      CONFIG_DEF.define(
+              HIVE_TABLE_PATTERN_CONFIG,
+          Type.STRING,
+              HIVE_TABLE_PATTERN_DEFAULT,
+          Importance.LOW,
+              HIVE_TABLE_PATTERN_DOC,
+          group,
+          ++orderInGroup,
+          Width.SHORT,
+              HIVE_TABLE_PATTERN_DISPLAY
+      );
     }
 
     {
diff --git a/hive/src/main/java/io/confluent/connect/storage/hive/HiveMetaStore.java b/hive/src/main/java/io/confluent/connect/storage/hive/HiveMetaStore.java
index 635602164..e502e6658 100644
--- a/hive/src/main/java/io/confluent/connect/storage/hive/HiveMetaStore.java
+++ b/hive/src/main/java/io/confluent/connect/storage/hive/HiveMetaStore.java
@@ -37,6 +37,8 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import io.confluent.connect.storage.errors.HiveMetaStoreException;
 
@@ -44,6 +46,7 @@ public class HiveMetaStore {
 
   private static final Logger log = LoggerFactory.getLogger(HiveMetaStore.class);
   protected final IMetaStoreClient client;
+  private Pattern hiveTableMatcher;
 
   public HiveMetaStore(AbstractConfig connectorConfig) {
     this(new Configuration(), connectorConfig);
@@ -78,6 +81,10 @@ public HiveMetaStore(Configuration conf, AbstractConfig connectorConfig)
     } catch (IOException | MetaException e) {
       throw new HiveMetaStoreException(e);
     }
+
+    String hiveTablePattern = connectorConfig
+        .getString(HiveConfig.HIVE_TABLE_PATTERN_CONFIG);
+    hiveTableMatcher = Pattern.compile(hiveTablePattern);
   }
 
   private interface ClientAction<R> {
@@ -364,6 +371,18 @@ public List<String> call() throws TException {
   }
 
   public String tableNameConverter(String table) {
-    return table == null ? table : table.replaceAll("[.-]", "_");
+    if (table == null) {
+      return null;
+    }
+
+    String converterTable;
+    Matcher tableMatcher = hiveTableMatcher.matcher(table);
+    if (tableMatcher.find() && tableMatcher.groupCount() == 1) {
+      converterTable = tableMatcher.group(1);
+    } else {
+      converterTable = table;
+    }
+
+    return converterTable.replaceAll("[.-]", "_");
   }
 }