
Commit cdf844e
Add pattern option for custom transformations from Kafka topic names to Hive table names #155

ssp committed Jan 29, 2019
1 parent b84b2af commit cdf844e
Showing 3 changed files with 104 additions and 1 deletion.
HiveConfig.java
@@ -60,6 +60,13 @@ public class HiveConfig extends AbstractConfig implements ComposableConfig {
public static final String HIVE_DATABASE_DEFAULT = "default";
public static final String HIVE_DATABASE_DISPLAY = "Hive database";

public static final String HIVE_TABLE_PATTERN_CONFIG = "hive.table.pattern";
public static final String HIVE_TABLE_PATTERN_DOC =
"Regular expression used to derive the Hive table name from the Kafka topic name. "
+ "The first capturing group becomes the table name; if the expression does not match "
+ "the topic name, conversion fails with a HiveMetaStoreException.";
public static final String HIVE_TABLE_PATTERN_DEFAULT = "(.*)";
public static final String HIVE_TABLE_PATTERN_DISPLAY = "Hive table pattern";

// Schema group
public static final String SCHEMA_COMPATIBILITY_CONFIG = "schema.compatibility";
public static final String SCHEMA_COMPATIBILITY_DOC =
@@ -153,6 +160,18 @@ public class HiveConfig extends AbstractConfig implements ComposableConfig {
HIVE_DATABASE_DISPLAY,
hiveIntegrationDependentsRecommender
);

CONFIG_DEF.define(
HIVE_TABLE_PATTERN_CONFIG,
Type.STRING,
HIVE_TABLE_PATTERN_DEFAULT,
Importance.LOW,
HIVE_TABLE_PATTERN_DOC,
group,
++orderInGroup,
Width.SHORT,
HIVE_TABLE_PATTERN_DISPLAY
);
}

{
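For orientation, a minimal usage sketch (not part of the commit): building a HiveConfig with the new hive.table.pattern option set. The sample pattern and the class name HiveTablePatternSketch are invented for illustration.

    import java.util.HashMap;
    import java.util.Map;

    import io.confluent.connect.storage.hive.HiveConfig;

    public class HiveTablePatternSketch {
      public static void main(String[] args) {
        // Hypothetical override: keep only the part of the topic name after the
        // last '.' as the Hive table name; the default "(.*)" keeps the full name.
        Map<String, String> props = new HashMap<>();
        props.put(HiveConfig.HIVE_TABLE_PATTERN_CONFIG, "^.*\\.(.*)$");
        HiveConfig config = new HiveConfig(props);
        System.out.println(config.getString(HiveConfig.HIVE_TABLE_PATTERN_CONFIG));
      }
    }

In a connector properties file the same setting would typically appear as hive.table.pattern=^.*\.(.*)$.
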
HiveMetaStore.java
@@ -37,13 +37,16 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import io.confluent.connect.storage.errors.HiveMetaStoreException;

public class HiveMetaStore {

private static final Logger log = LoggerFactory.getLogger(HiveMetaStore.class);
protected final IMetaStoreClient client;
private Pattern hiveTableMatcher;

public HiveMetaStore(AbstractConfig connectorConfig) {
this(new Configuration(), connectorConfig);
@@ -78,6 +81,10 @@ public HiveMetaStore(Configuration conf, AbstractConfig connectorConfig)
} catch (IOException | MetaException e) {
throw new HiveMetaStoreException(e);
}

String hiveTablePattern = connectorConfig
.getString(HiveConfig.HIVE_TABLE_PATTERN_CONFIG);
hiveTableMatcher = Pattern.compile(hiveTablePattern);
}

private interface ClientAction<R> {
@@ -364,6 +371,21 @@ public List<String> call() throws TException {
}

public String tableNameConverter(String table) {
-  return table == null ? table : table.replaceAll("[.-]", "_");
+  if (table == null) {
+    return null;
+  }
+
+  String converterTable;
+  Matcher tableMatcher = hiveTableMatcher.matcher(table);
+  if (tableMatcher.find() && tableMatcher.groupCount() == 1) {
+    converterTable = tableMatcher.group(1);
+  } else {
+    throw new HiveMetaStoreException(
+        "Cannot convert hive table name. Pattern: '" + hiveTableMatcher.pattern()
+            + "' source table: " + table
+    );
+  }
+
+  return converterTable.replaceAll("[.-]", "_");
}
}
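
As a self-contained illustration of the matching logic in tableNameConverter (a sketch, not code from the commit; the pattern and topic name are made-up examples):

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class TableNamePatternDemo {
      public static void main(String[] args) {
        // Mirrors the steps above: match the topic name, take the first capturing
        // group, then replace '.' and '-' with '_'.
        Pattern hiveTableMatcher = Pattern.compile("^.*\\.(.*)$");
        Matcher tableMatcher = hiveTableMatcher.matcher("mydb.page-views");
        if (tableMatcher.find() && tableMatcher.groupCount() == 1) {
          System.out.println(tableMatcher.group(1).replaceAll("[.-]", "_")); // prints page_views
        } else {
          // The connector throws HiveMetaStoreException here instead of printing.
          System.out.println("pattern did not match");
        }
      }
    }
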
HiveMetaStoreTableNameConverterFactoryTest.java (new file)
@@ -0,0 +1,62 @@
package io.confluent.connect.storage.hive;

import io.confluent.connect.storage.errors.HiveMetaStoreException;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;

import static io.confluent.connect.storage.hive.HiveConfig.HIVE_TABLE_PATTERN_CONFIG;
import static io.confluent.connect.storage.hive.HiveConfig.HIVE_TABLE_PATTERN_DEFAULT;
import static org.junit.Assert.assertEquals;

@RunWith(Parameterized.class)
public class HiveMetaStoreTableNameConverterFactoryTest {

@Rule
public ExpectedException thrown = ExpectedException.none();

@Parameterized.Parameter(0)
public String hivePatternConfig;

@Parameterized.Parameter(1)
public String sourceTableName;

@Parameterized.Parameter(2)
public String convertedTableName;

@Parameterized.Parameter(3)
public Class<Throwable> exception;

@Parameterized.Parameters(name = "pattern='{0}' sourceTableName='{1}' convertedTableName='{2}'")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][]{
{HIVE_TABLE_PATTERN_DEFAULT, "topic_name", "topic_name", null},
{HIVE_TABLE_PATTERN_DEFAULT, "topic-name", "topic_name", null},
{HIVE_TABLE_PATTERN_DEFAULT, "topic-name", "topic_name", null},
{"^.*\\.(.*)$", "topic.name", "name", null},
{"^.*\\.(.*)$", "topic-name", null, HiveMetaStoreException.class}
});
}

@Test
public void test() {
HiveConfig hiveConfig = new HiveConfig(new HashMap<String, String>() {{
put(HIVE_TABLE_PATTERN_CONFIG, hivePatternConfig);
}});

HiveMetaStore hiveMetaStore = new HiveMetaStore(hiveConfig);

if (exception != null) {
thrown.expect(exception);
}

assertEquals(convertedTableName, hiveMetaStore.tableNameConverter(sourceTableName));
}
}
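
The last test row exercises the failure path. A standalone sketch of the same scenario (illustrative only; like the test, it assumes a HiveMetaStore can be constructed, i.e. a reachable or embedded metastore):

    import java.util.HashMap;

    import io.confluent.connect.storage.errors.HiveMetaStoreException;
    import io.confluent.connect.storage.hive.HiveConfig;
    import io.confluent.connect.storage.hive.HiveMetaStore;

    public class PatternMismatchSketch {
      public static void main(String[] args) {
        HiveConfig config = new HiveConfig(new HashMap<String, String>() {{
          put(HiveConfig.HIVE_TABLE_PATTERN_CONFIG, "^.*\\.(.*)$");
        }});
        HiveMetaStore metaStore = new HiveMetaStore(config);
        try {
          // "topic-name" contains no '.', so the configured pattern does not match
          // and tableNameConverter throws rather than falling back to the raw name.
          metaStore.tableNameConverter("topic-name");
        } catch (HiveMetaStoreException e) {
          System.out.println("Conversion failed: " + e.getMessage());
        }
      }
    }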
