Merge pull request #164 from IG-Group/ISSUE-136
Issue 136 - Support topics with dots in Hive.
ewencp authored Jan 17, 2017
2 parents e26514e + 0f754b8 commit 413a169
Showing 6 changed files with 73 additions and 14 deletions.
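
In short: Hive table identifiers cannot contain dots (Hive reserves the dot as the database.table separator), so a topic such as topic.with.dots could not be used with Hive integration. This commit adds a tableNameConverter to HiveMetaStore that replaces each dot with an underscore (topic.with.dots becomes topic_with_dots), routes every metastore call and table construction through it, and adds an integration test for the dotted-topic case.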
@@ -60,7 +60,7 @@ public void alterSchema(String database, String tableName, Schema schema) throws
 
   private Table constructAvroTable(String database, String tableName, Schema schema, Partitioner partitioner)
       throws HiveMetaStoreException {
-    Table table = new Table(database, tableName);
+    Table table = newTable(database, tableName);
     table.setTableType(TableType.EXTERNAL_TABLE);
     table.getParameters().put("EXTERNAL", "TRUE");
     String tablePath = FileUtils.hiveDirectoryName(url, topicsDir, tableName);
28 changes: 16 additions & 12 deletions src/main/java/io/confluent/connect/hdfs/hive/HiveMetaStore.java
@@ -94,7 +94,7 @@ public Void call() throws TException {
         // purposely don't check if the partition already exists because
         // getPartition(db, table, path) will throw an exception to indicate the
         // partition doesn't exist also. this way, it's only one call.
-        client.appendPartition(database, tableName, path);
+        client.appendPartition(database, tableNameConverter(tableName), path);
         return null;
       }
     };
@@ -104,7 +104,7 @@ public Void call() throws TException {
     } catch (AlreadyExistsException e) {
       // this is okay
     } catch (InvalidObjectException e) {
-      throw new HiveMetaStoreException("Invalid partition for " + database + "." + tableName + ": " + path, e);
+      throw new HiveMetaStoreException("Invalid partition for " + database + "." + tableNameConverter(tableName) + ": " + path, e);
     } catch (MetaException e) {
       throw new HiveMetaStoreException("Hive MetaStore exception", e);
     } catch (TException e) {
@@ -116,7 +116,7 @@ public void dropPartition(final String database, final String tableName, final S
     ClientAction<Void> dropPartition = new ClientAction<Void>() {
       @Override
       public Void call() throws TException {
-        client.dropPartition(database, tableName, path, false);
+        client.dropPartition(database, tableNameConverter(tableName), path, false);
         return null;
       }
     };
@@ -126,7 +126,7 @@ public Void call() throws TException {
     } catch (NoSuchObjectException e) {
       // this is okay
     } catch (InvalidObjectException e) {
-      throw new HiveMetaStoreException("Invalid partition for " + database + "." + tableName + ": " + path, e);
+      throw new HiveMetaStoreException("Invalid partition for " + database + "." + tableNameConverter(tableName) + ": " + path, e);
     } catch (MetaException e) {
       throw new HiveMetaStoreException("Hive MetaStore exception", e);
     } catch (TException e) {
@@ -192,7 +192,7 @@ public Void call() throws TException {
     try {
       doAction(create);
     } catch (NoSuchObjectException e) {
-      throw new HiveMetaStoreException("Hive table not found: " + table.getDbName() + "." + table.getTableName());
+      throw new HiveMetaStoreException("Hive table not found: " + table.getDbName() + "." + tableNameConverter(table.getTableName()));
     } catch (AlreadyExistsException e) {
       // this is okay
       log.warn("Hive table already exists: {}.{}", table.getDbName(), table.getTableName());
@@ -209,7 +209,7 @@ public void alterTable(final Table table) throws HiveMetaStoreException {
     ClientAction<Void> alter = new ClientAction<Void>() {
       @Override
       public Void call() throws TException {
-        client.alter_table(table.getDbName(), table.getTableName(), table.getTTable());
+        client.alter_table(table.getDbName(), tableNameConverter(table.getTableName()), table.getTTable());
         return null;
       }
     };
@@ -233,7 +233,7 @@ public void dropTable(final String database, final String tableName) {
     ClientAction<Void> drop = new ClientAction<Void>() {
       @Override
       public Void call() throws TException {
-        client.dropTable(database, tableName, false, true);
+        client.dropTable(database, tableNameConverter(tableName), false, true);
         return null;
       }
     };
@@ -253,7 +253,7 @@ public boolean tableExists(final String database, final String tableName) throws
     ClientAction<Boolean> exists = new ClientAction<Boolean>() {
       @Override
       public Boolean call() throws TException {
-        return client.tableExists(database, tableName);
+        return client.tableExists(database, tableNameConverter(tableName));
       }
     };
     try {
@@ -271,23 +271,23 @@ public Table getTable(final String database, final String tableName) throws Hive
     ClientAction<Table> getTable = new ClientAction<Table>() {
       @Override
       public Table call() throws TException {
-        return new Table(client.getTable(database, tableName));
+        return new Table(client.getTable(database, tableNameConverter(tableName)));
       }
     };
 
     Table table;
     try {
       table = doAction(getTable);
     } catch (NoSuchObjectException e) {
-      throw new HiveMetaStoreException("Hive table not found: " + database + "." + tableName);
+      throw new HiveMetaStoreException("Hive table not found: " + database + "." + tableNameConverter(tableName));
     } catch (MetaException e) {
       throw new HiveMetaStoreException("Hive table lookup exception", e);
     } catch (TException e) {
       throw new HiveMetaStoreException("Exception communicating with the Hive MetaStore", e);
     }
 
     if (table == null) {
-      throw new HiveMetaStoreException("Could not find info for table: " + tableName);
+      throw new HiveMetaStoreException("Could not find info for table: " + tableNameConverter(tableName));
     }
     return table;
   }
@@ -296,7 +296,7 @@ public List<String> listPartitions(final String database, final String tableName
     ClientAction<List<String>> listPartitions = new ClientAction<List<String>>() {
       @Override
       public List<String> call() throws TException {
-        List<Partition> partitions = client.listPartitions(database, tableName, max);
+        List<Partition> partitions = client.listPartitions(database, tableNameConverter(tableName), max);
         List<String> paths = new ArrayList<>();
         for (Partition partition : partitions) {
           paths.add(partition.getSd().getLocation());
@@ -354,4 +354,8 @@ public List<String> call() throws TException {
       throw new HiveMetaStoreException("Exception communicating with the Hive MetaStore", e);
     }
   }
+
+  public String tableNameConverter(String table){
+    return table == null ? table : table.replaceAll("\\.", "_");
+  }
 }
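
The new tableNameConverter method above is the heart of the change: every metastore call now passes table names through it, so callers can keep using raw topic names. A minimal, self-contained sketch of its contract (the class name and the main driver here are illustrative, not part of the commit):

    public class TableNameConverterSketch {
      // Same logic as HiveMetaStore.tableNameConverter in this commit:
      // replace every dot with an underscore; pass null through unchanged.
      static String tableNameConverter(String table) {
        return table == null ? table : table.replaceAll("\\.", "_");
      }

      public static void main(String[] args) {
        System.out.println(tableNameConverter("topic.with.dots")); // topic_with_dots
        System.out.println(tableNameConverter("plain_topic"));     // no dots, unchanged
        System.out.println(tableNameConverter(null));              // null passes through
      }
    }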
5 changes: 5 additions & 0 deletions src/main/java/io/confluent/connect/hdfs/hive/HiveUtil.java
@@ -14,6 +14,7 @@
 
 package io.confluent.connect.hdfs.hive;
 
+import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.kafka.connect.data.Schema;
 
 import io.confluent.connect.avro.AvroData;
@@ -38,4 +39,8 @@ public HiveUtil(HdfsSinkConnectorConfig connectorConfig, AvroData avroData, Hive
   public abstract void createTable(String database, String tableName, Schema schema, Partitioner partitioner);
 
   public abstract void alterSchema(String database, String tableName, Schema schema);
+
+  public Table newTable(String database, String table){
+    return new Table(database, hiveMetaStore.tableNameConverter(table));
+  }
 }
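
The newTable helper gives HiveUtil subclasses (the Avro and Parquet implementations changed below) a single place to construct a Table with the sanitized name. Note that only the Hive table name is converted: the HDFS path built with FileUtils.hiveDirectoryName still takes the raw topic name, so directories on HDFS keep their dots.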
@@ -55,7 +55,7 @@ public void alterSchema(String database, String tableName, Schema schema) {
   }
 
   private Table constructParquetTable(String database, String tableName, Schema schema, Partitioner partitioner) throws HiveMetaStoreException {
-    Table table = new Table(database, tableName);
+    Table table = newTable(database, tableName);
     table.setTableType(TableType.EXTERNAL_TABLE);
     table.getParameters().put("EXTERNAL", "TRUE");
     String tablePath = FileUtils.hiveDirectoryName(url, topicsDir, tableName);
@@ -44,12 +44,14 @@ public class HdfsSinkConnectorTestBase {
 
   protected MockSinkTaskContext context;
   protected static final String TOPIC = "topic";
+  protected static final String TOPIC_WITH_DOTS = "topic.with.dots";
   protected static final int PARTITION = 12;
   protected static final int PARTITION2 = 13;
   protected static final int PARTITION3 = 14;
   protected static final TopicPartition TOPIC_PARTITION = new TopicPartition(TOPIC, PARTITION);
   protected static final TopicPartition TOPIC_PARTITION2 = new TopicPartition(TOPIC, PARTITION2);
   protected static final TopicPartition TOPIC_PARTITION3 = new TopicPartition(TOPIC, PARTITION3);
+  protected static final TopicPartition TOPIC_WITH_DOTS_PARTITION = new TopicPartition(TOPIC_WITH_DOTS, PARTITION);
   protected static Set<TopicPartition> assignment;
 
 
@@ -147,6 +147,54 @@ public void testHiveIntegrationAvro() throws Exception {
     assertEquals(expectedPartitions, partitions);
   }
 
+  @Test
+  public void testHiveIntegrationTopicWithDotsAvro() throws Exception {
+    assignment.add(TOPIC_WITH_DOTS_PARTITION);
+
+    Map<String, String> props = createProps();
+    props.put(HdfsSinkConnectorConfig.HIVE_INTEGRATION_CONFIG, "true");
+    HdfsSinkConnectorConfig config = new HdfsSinkConnectorConfig(props);
+
+    DataWriter hdfsWriter = new DataWriter(config, context, avroData);
+    hdfsWriter.recover(TOPIC_WITH_DOTS_PARTITION);
+
+    String key = "key";
+    Schema schema = createSchema();
+    Struct record = createRecord(schema);
+
+    Collection<SinkRecord> sinkRecords = new ArrayList<>();
+    for (long offset = 0; offset < 7; offset++) {
+      SinkRecord sinkRecord =
+          new SinkRecord(TOPIC_WITH_DOTS, PARTITION, Schema.STRING_SCHEMA, key, schema, record, offset);
+
+      sinkRecords.add(sinkRecord);
+    }
+
+    hdfsWriter.write(sinkRecords);
+    hdfsWriter.close(assignment);
+    hdfsWriter.stop();
+
+    Table table = hiveMetaStore.getTable(hiveDatabase, TOPIC_WITH_DOTS);
+    List<String> expectedColumnNames = new ArrayList<>();
+    for (Field field: schema.fields()) {
+      expectedColumnNames.add(field.name());
+    }
+
+    List<String> actualColumnNames = new ArrayList<>();
+    for (FieldSchema column: table.getSd().getCols()) {
+      actualColumnNames.add(column.getName());
+    }
+    assertEquals(expectedColumnNames, actualColumnNames);
+
+    List<String> expectedPartitions = new ArrayList<>();
+    String directory = TOPIC_WITH_DOTS + "/" + "partition=" + String.valueOf(PARTITION);
+    expectedPartitions.add(FileUtils.directoryName(url, topicsDir, directory));
+
+    List<String> partitions = hiveMetaStore.listPartitions(hiveDatabase, TOPIC_WITH_DOTS, (short)-1);
+
+    assertEquals(expectedPartitions, partitions);
+  }
+
   @Test
   public void testHiveIntegrationFieldPartitionerAvro() throws Exception {
     Map<String, String> props = createProps();
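The new test exercises the full path: records for topic.with.dots are written through the DataWriter, then the table is looked up with hiveMetaStore.getTable(hiveDatabase, TOPIC_WITH_DOTS) using the dotted name, relying on the metastore wrapper to convert it internally. The expected partition directory, by contrast, is built from the raw topic name, confirming that HDFS paths keep their dots while only the Hive table name is underscored.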
