Merge pull request #1 from Eneco/b_hive_and_ha
Catch HiveExceptions and pass HDFS conf
johnhofman authored Dec 2, 2016
2 parents 8da9d61 + aadcc71 commit 6db48d2
Showing 3 changed files with 31 additions and 12 deletions.
10 changes: 8 additions & 2 deletions src/main/java/io/confluent/connect/hdfs/DataWriter.java
@@ -280,6 +280,9 @@ public void open(Collection<TopicPartition> partitions) {
       // assigned topics while we try to recover offsets and rewind.
       recover(tp);
     }
+    if(hiveIntegration) {
+      syncWithHive();
+    }
   }
 
   public void close(Collection<TopicPartition> partitions) {
@@ -293,9 +296,12 @@ public void close(Collection<TopicPartition> partitions) {
     // valid. For now, we prefer the simpler solution that may result in a bit of wasted effort.
     for (TopicPartition tp: assignment) {
       try {
-        topicPartitionWriters.get(tp).close();
+        TopicPartitionWriter tpw = topicPartitionWriters.get(tp);
+        if(tpw != null) {
+          tpw.close();
+        }
       } catch (ConnectException e) {
-        log.error("Error closing writer for {}. Error: {]", tp, e.getMessage());
+        log.error("Error closing writer for {}. Error: {}", tp, e.getMessage());
       } finally {
         topicPartitionWriters.remove(tp);
       }
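The null guard added above matters because close() can run for a partition whose writer was never registered, for example if open() failed partway through assignment or a rebalance arrived before recovery finished. A minimal standalone sketch of the same defensive pattern, using a hypothetical DemoWriter and a plain String key in place of the connector's real types:

import java.util.HashMap;
import java.util.Map;

public class CloseGuardSketch {
  // Hypothetical stand-in for the connector's writer type.
  static class DemoWriter {
    void close() {
      System.out.println("writer closed");
    }
  }

  public static void main(String[] args) {
    // Keyed by a topic-partition name for simplicity; the real map is keyed by TopicPartition.
    Map<String, DemoWriter> writers = new HashMap<>();
    String tp = "logs-0"; // assigned, but its writer was never created

    try {
      DemoWriter w = writers.get(tp); // returns null: no writer was registered
      if (w != null) {                // the guard prevents a NullPointerException here
        w.close();
      }
    } finally {
      writers.remove(tp);             // the assignment is cleared either way
    }
  }
}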
31 changes: 22 additions & 9 deletions src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java
@@ -585,36 +585,49 @@ private void setRetryTimeout(long timeoutMs) {
     context.timeout(timeoutMs);
   }
 
+  private abstract class HiveCall implements Callable<Void> {
+    abstract void hiveCall() throws HiveMetaStoreException;
+
+    @Override
+    public Void call() throws Exception {
+      try {
+        hiveCall();
+      } catch (HiveMetaStoreException e) {
+        log.error("HiveMetaStoreException", e);
+      }
+      return null;
+    }
+  }
+
   private void createHiveTable() {
-    Future<Void> future = executorService.submit(new Callable<Void>() {
+    Future<Void> future = executorService.submit(new HiveCall() {
       @Override
-      public Void call() throws HiveMetaStoreException {
+      public void hiveCall() throws HiveMetaStoreException {
         hive.createTable(hiveDatabase, tp.topic(), currentSchema, partitioner);
-        return null;
       }
     });
     hiveUpdateFutures.add(future);
   }
 
   private void alterHiveSchema() {
-    Future<Void> future = executorService.submit(new Callable<Void>() {
+    Future<Void> future = executorService.submit(new HiveCall() {
       @Override
-      public Void call() throws HiveMetaStoreException {
+      public void hiveCall() throws HiveMetaStoreException {
         hive.alterSchema(hiveDatabase, tp.topic(), currentSchema);
-        return null;
       }
     });
     hiveUpdateFutures.add(future);
   }
 
   private void addHivePartition(final String location) {
-    Future<Void> future = executorService.submit(new Callable<Void>() {
+    Future<Void> future = executorService.submit(new HiveCall() {
       @Override
-      public Void call() throws Exception {
+      public void hiveCall() throws HiveMetaStoreException {
         hiveMetaStore.addPartition(hiveDatabase, tp.topic(), location);
-        return null;
       }
     });
     hiveUpdateFutures.add(future);
   }
+
+
 }
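HiveCall is a small template method: each Hive operation overrides hiveCall(), while the shared call() catches and logs HiveMetaStoreException, so a failing metastore call completes its Future normally instead of surfacing later from Future.get(). A self-contained sketch of the same pattern, with a made-up MetaStoreDownException standing in for Hive's exception type:

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class HiveCallSketch {
  // Stand-in for HiveMetaStoreException.
  static class MetaStoreDownException extends Exception {
    MetaStoreDownException(String msg) { super(msg); }
  }

  // Template: subclasses supply the operation; the wrapper owns the error handling.
  static abstract class SafeHiveCall implements Callable<Void> {
    abstract void hiveCall() throws MetaStoreDownException;

    @Override
    public Void call() {
      try {
        hiveCall();
      } catch (MetaStoreDownException e) {
        // Log and swallow: the failure stays out of the Future's result.
        System.err.println("Hive call failed: " + e.getMessage());
      }
      return null;
    }
  }

  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    Future<Void> f = pool.submit(new SafeHiveCall() {
      @Override
      void hiveCall() throws MetaStoreDownException {
        throw new MetaStoreDownException("metastore unreachable");
      }
    });
    f.get(); // completes normally: the exception was handled inside call()
    pool.shutdown();
  }
}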
@@ -49,7 +49,7 @@ public RecordWriter<SinkRecord> getRecordWriter(
 
     Path path = new Path(fileName);
     final ParquetWriter<GenericRecord> writer =
-        new AvroParquetWriter<>(path, avroSchema, compressionCodecName, blockSize, pageSize);
+        new AvroParquetWriter<>(path, avroSchema, compressionCodecName, blockSize, pageSize, true, conf);
 
     return new RecordWriter<SinkRecord>() {
       @Override
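The "pass HDFS conf" half of the commit message is this one-line change: the Parquet writer is now built with the connector's Hadoop Configuration rather than the default one it would otherwise create, so client-side settings such as an HA nameservice are honored (the extra true argument appears to be the enable-dictionary flag in this parquet-avro version). A sketch of the kind of HA properties such a configuration can carry; the property keys are standard HDFS HA client settings, while the mycluster nameservice and hostnames are illustrative:

import org.apache.hadoop.conf.Configuration;

public class HdfsHaConfSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // Logical nameservice instead of a single namenode host (all values illustrative).
    conf.set("fs.defaultFS", "hdfs://mycluster");
    conf.set("dfs.nameservices", "mycluster");
    conf.set("dfs.ha.namenodes.mycluster", "nn1,nn2");
    conf.set("dfs.namenode.rpc-address.mycluster.nn1", "namenode1.example.com:8020");
    conf.set("dfs.namenode.rpc-address.mycluster.nn2", "namenode2.example.com:8020");
    conf.set("dfs.client.failover.proxy.provider.mycluster",
        "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");

    // A writer constructed with this conf can resolve hdfs://mycluster/... paths;
    // one built on a fresh Configuration() only sees whatever sits on its own classpath.
    System.out.println(conf.get("fs.defaultFS"));
  }
}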
