Skip to content

Commit

Permalink
Add pattern option for custom transformations from kafka topic names …
Browse files Browse the repository at this point in the history
…to hive tables names #155
  • Loading branch information
ssp committed Dec 29, 2018
1 parent 40aaa1f commit 10b5085
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,13 @@ public class HiveConfig extends AbstractConfig implements ComposableConfig {
public static final String HIVE_DATABASE_DEFAULT = "default";
public static final String HIVE_DATABASE_DISPLAY = "Hive database";

public static final String HIVE_TABLE_PATTERN_CONFIG = "hive.table.pattern";
public static final String HIVE_TABLE_PATTERN_DOC =
"Regular expression for transformation from Kafka topic name to Hive table name. "
+ "If regular expression doesn't match, Kafka topic name will be used as Hive table name.";
public static final String HIVE_TABLE_PATTERN_DEFAULT = "";
public static final String HIVE_TABLE_PATTERN_DISPLAY = "";

// Schema group
public static final String SCHEMA_COMPATIBILITY_CONFIG = "schema.compatibility";
public static final String SCHEMA_COMPATIBILITY_DOC =
Expand Down Expand Up @@ -153,6 +160,18 @@ public class HiveConfig extends AbstractConfig implements ComposableConfig {
HIVE_DATABASE_DISPLAY,
hiveIntegrationDependentsRecommender
);

CONFIG_DEF.define(
HIVE_TABLE_PATTERN_CONFIG,
Type.STRING,
HIVE_TABLE_PATTERN_DEFAULT,
Importance.LOW,
HIVE_TABLE_PATTERN_DOC,
group,
++orderInGroup,
Width.SHORT,
HIVE_TABLE_PATTERN_DISPLAY
);
}

{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,16 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import io.confluent.connect.storage.errors.HiveMetaStoreException;

public class HiveMetaStore {

private static final Logger log = LoggerFactory.getLogger(HiveMetaStore.class);
protected final IMetaStoreClient client;
private Pattern hiveTableMatcher;

public HiveMetaStore(AbstractConfig connectorConfig) {
this(new Configuration(), connectorConfig);
Expand Down Expand Up @@ -78,6 +81,10 @@ public HiveMetaStore(Configuration conf, AbstractConfig connectorConfig)
} catch (IOException | MetaException e) {
throw new HiveMetaStoreException(e);
}

String hiveTablePattern = connectorConfig
.getString(HiveConfig.HIVE_TABLE_PATTERN_CONFIG);
hiveTableMatcher = Pattern.compile(hiveTablePattern);
}

private interface ClientAction<R> {
Expand Down Expand Up @@ -364,6 +371,18 @@ public List<String> call() throws TException {
}

public String tableNameConverter(String table) {
return table == null ? table : table.replaceAll("[.-]", "_");
if (table == null) {
return null;
}

String converterTable;
Matcher tableMatcher = hiveTableMatcher.matcher(table);
if (tableMatcher.find() && tableMatcher.groupCount() == 1) {
converterTable = tableMatcher.group(1);
} else {
converterTable = table;
}

return converterTable.replaceAll("[.-]", "_");
}
}

0 comments on commit 10b5085

Please sign in to comment.