Commit b2b1c61

Fix test failures and use new APIs
Ishiihara committed Apr 18, 2016
1 parent e34d224 commit b2b1c61
Showing 9 changed files with 75 additions and 46 deletions.
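In short, this commit moves the HDFS connector onto the renamed SinkTask lifecycle API: DataWriter.onPartitionsAssigned(partitions) becomes open(partitions), onPartitionsRevoked(partitions) becomes close(partitions), and the old no-argument close() becomes stop(). HdfsSinkTask now overrides open/close instead of the onPartitions* callbacks, captures context.assignment() at the top of start() so the error path can call hdfsWriter.close(assignment) before hdfsWriter.stop(), and the tests switch to the same close-then-stop sequence. The sketch below is not part of the commit; it is a minimal illustration of how these lifecycle hooks fit together, assuming the org.apache.kafka.connect.sink.SinkTask base class that the renames track (class name and version string are placeholders).

// Illustrative sketch only; not from this commit. Assumes the
// org.apache.kafka.connect.sink API; class name and version are placeholders.
import java.util.Collection;
import java.util.Map;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public class LifecycleSketchSinkTask extends SinkTask {

  @Override
  public String version() {
    return "sketch";
  }

  @Override
  public void start(Map<String, String> props) {
    // Create task-wide resources once (configuration, writer factory, ...).
  }

  @Override
  public void open(Collection<TopicPartition> partitions) {
    // Replaces onPartitionsAssigned: allocate one writer per assigned partition.
  }

  @Override
  public void put(Collection<SinkRecord> records) {
    // Hand each record to the writer for its topic-partition.
  }

  @Override
  public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
    // No buffered state in this sketch; a real task would commit/flush here.
  }

  @Override
  public void close(Collection<TopicPartition> partitions) {
    // Replaces onPartitionsRevoked: release per-partition writers before a rebalance.
  }

  @Override
  public void stop() {
    // Replaces the old close(): tear down task-wide resources on shutdown.
  }
}

open() and close() bracket each rebalance, while start() and stop() bracket the life of the task, which is why the per-partition writers are created in open() and the executor shutdown in DataWriter lives in stop().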
8 changes: 4 additions & 4 deletions src/main/java/io/confluent/connect/hdfs/DataWriter.java
@@ -269,7 +269,7 @@ public void syncWithHive() throws ConnectException {
}
}

- public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+ public void open(Collection<TopicPartition> partitions) {
assignment = new HashSet<>(partitions);
for (TopicPartition tp: assignment) {
TopicPartitionWriter topicPartitionWriter = new TopicPartitionWriter(
@@ -282,12 +282,12 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
}
}

- public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+ public void close(Collection<TopicPartition> partitions) {
// Close any writers we have. We may get assigned the same partitions and end up duplicating
// some effort since we'll have to reprocess those messages. It may be possible to hold on to
// the TopicPartitionWriter and continue to use the temp file, but this can get significantly
// more complex due to potential failures and network partitions. For example, we may get
- // this onPartitionsRevoked, then miss a few generations of group membership, during which
+ // this close, then miss a few generations of group membership, during which
// data may have continued to be processed and we'd have to restart from the recovery stage,
// make sure we apply the WAL, and only reuse the temp file if the starting offset is still
// valid. For now, we prefer the simpler solution that may result in a bit of wasted effort.
@@ -302,7 +302,7 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
}
}

- public void close() {
+ public void stop() {
if (executorService != null) {
boolean terminated = false;
try {
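The stop() hunk above is cut off right after the executor-service check. For orientation, here is a rough, hypothetical sketch of the usual shutdown-and-await pattern such a method tends to follow; the timeout value, interrupt handling, and use of shutdownNow() are assumptions for illustration, not code taken from this repository.

// Hypothetical sketch of a graceful executor shutdown; the timeout and the
// handling of InterruptedException are assumptions, not code from this repo.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

final class ExecutorShutdownSketch {
  static void shutdown(ExecutorService executorService) {
    if (executorService != null) {
      boolean terminated = false;
      executorService.shutdown();                 // stop accepting new work
      try {
        // wait a bounded amount of time for in-flight tasks to finish
        terminated = executorService.awaitTermination(30, TimeUnit.SECONDS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();       // preserve interrupt status
      }
      if (!terminated) {
        executorService.shutdownNow();            // force-cancel stragglers
      }
    }
  }
}

Calling shutdown() before awaitTermination() is what actually stops new work from being accepted; awaitTermination() on its own only waits.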
15 changes: 8 additions & 7 deletions src/main/java/io/confluent/connect/hdfs/HdfsSinkTask.java
Original file line number Diff line number Diff line change
@@ -48,6 +48,7 @@ public String version() {

@Override
public void start(Map<String, String> props) {
+ Set<TopicPartition> assignment = context.assignment();
try {
HdfsSinkConnectorConfig connectorConfig = new HdfsSinkConnectorConfig(props);
boolean hiveIntegration = connectorConfig.getBoolean(HdfsSinkConnectorConfig.HIVE_INTEGRATION_CONFIG);
@@ -61,7 +62,6 @@ public void start(Map<String, String> props) {
int schemaCacheSize = connectorConfig.getInt(HdfsSinkConnectorConfig.SCHEMA_CACHE_SIZE_CONFIG);
avroData = new AvroData(schemaCacheSize);
hdfsWriter = new DataWriter(connectorConfig, context, avroData);
- Set<TopicPartition> assignment = context.assignment();
recover(assignment);
if (hiveIntegration) {
syncWithHive();
@@ -72,15 +72,16 @@ public void start(Map<String, String> props) {
log.info("Couldn't start HdfsSinkConnector:", e);
log.info("Shutting down HdfsSinkConnector.");
if (hdfsWriter != null) {
- hdfsWriter.close();
+ hdfsWriter.close(assignment);
+ hdfsWriter.stop();
}
}
}

@Override
public void stop() throws ConnectException {
if (hdfsWriter != null) {
- hdfsWriter.close();
+ hdfsWriter.stop();
}
}

@@ -99,13 +100,13 @@ public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
}

@Override
- public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
- hdfsWriter.onPartitionsAssigned(partitions);
+ public void open(Collection<TopicPartition> partitions) {
+ hdfsWriter.open(partitions);
}

@Override
- public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
- hdfsWriter.onPartitionsRevoked(partitions);
+ public void close(Collection<TopicPartition> partitions) {
+ hdfsWriter.close(partitions);
}

private void recover(Set<TopicPartition> assignment) {
@@ -87,7 +87,8 @@ public void testCommitFailure() throws Exception {
content = data.get(logFile);
assertEquals(6, content.size());

- hdfsWriter.close();
+ hdfsWriter.close(assignment);
+ hdfsWriter.stop();
}

@Test
@@ -162,7 +163,8 @@ public void testWriterFailureMultiPartitions() throws Exception {
}

hdfsWriter.write(new ArrayList<SinkRecord>());
- hdfsWriter.close();
+ hdfsWriter.close(assignment);
+ hdfsWriter.stop();
}

@Test
@@ -224,6 +226,7 @@ public void testWriterFailure() throws Exception {
}

hdfsWriter.write(new ArrayList<SinkRecord>());
- hdfsWriter.close();
+ hdfsWriter.close(assignment);
+ hdfsWriter.stop();
}
}
@@ -141,9 +141,10 @@ private void prepareData(String topic, int partition) throws Exception {
new SinkRecord(topic, partition, Schema.STRING_SCHEMA, key, schema, record, offset);
sinkRecords.add(sinkRecord);
}
- hdfsWriter.write(sinkRecords);

- hdfsWriter.close();
+ hdfsWriter.write(sinkRecords);
+ hdfsWriter.close(assignment);
+ hdfsWriter.stop();
}

private DataWriter createWriter(SinkTaskContext context, AvroData avroData){
@@ -26,7 +26,6 @@

import java.util.ArrayList;
import java.util.Collection;
- import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@@ -71,7 +70,8 @@ public void testWriteRecord() throws Exception {
sinkRecords.add(sinkRecord);
}
hdfsWriter.write(sinkRecords);
- hdfsWriter.close();
+ hdfsWriter.close(assignment);
+ hdfsWriter.stop();

String encodedPartition = "partition=" + String.valueOf(PARTITION);
String directory = partitioner.generatePartitionedPath(TOPIC, encodedPartition);
@@ -140,7 +140,8 @@ public void testRecovery() throws Exception {
new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, schema, record, 50 + i));

hdfsWriter.write(sinkRecords);
- hdfsWriter.close();
+ hdfsWriter.close(assignment);
+ hdfsWriter.stop();

committedFiles.add(FileUtils.committedFileName(url, topicsDir, directory, TOPIC_PARTITION,
50, 52, extension, ZERO_PAD_FMT));
@@ -172,7 +173,8 @@ public void testWriteRecordMultiplePartitions() throws Exception {
}
}
hdfsWriter.write(sinkRecords);
- hdfsWriter.close();
+ hdfsWriter.close(assignment);
+ hdfsWriter.stop();

// Last file (offset 6) doesn't satisfy size requirement and gets discarded on close
long[] validOffsets = {-1, 2, 5};
@@ -221,7 +223,8 @@ public void testGetPreviousOffsets() throws Exception {
long previousOffset = committedOffsets.get(TOPIC_PARTITION);
assertEquals(previousOffset, 6L);

- hdfsWriter.close();
+ hdfsWriter.close(assignment);
+ hdfsWriter.stop();
}

@Test
@@ -242,7 +245,8 @@ public void testWriteRecordNonZeroInitailOffset() throws Exception {
}

hdfsWriter.write(sinkRecords);
- hdfsWriter.close();
+ hdfsWriter.close(assignment);
+ hdfsWriter.stop();

String directory = partitioner.generatePartitionedPath(TOPIC, "partition=" + String.valueOf(PARTITION));

@@ -290,9 +294,10 @@ public void testRebalance() throws Exception {
Set<TopicPartition> newAssignment = new HashSet<>();
newAssignment.add(TOPIC_PARTITION);
newAssignment.add(TOPIC_PARTITION3);
- hdfsWriter.onPartitionsRevoked(assignment);
+ hdfsWriter.close(assignment);

assignment = newAssignment;
- hdfsWriter.onPartitionsAssigned(newAssignment);
+ hdfsWriter.open(newAssignment);

assertEquals(null, hdfsWriter.getBucketWriter(TOPIC_PARTITION2));
assertNotNull(hdfsWriter.getBucketWriter(TOPIC_PARTITION));
@@ -326,7 +331,8 @@ public void testRebalance() throws Exception {
}

hdfsWriter.write(sinkRecords);
- hdfsWriter.close();
+ hdfsWriter.close(assignment);
+ hdfsWriter.stop();

// Last file (offset 9) doesn't satisfy size requirement and gets discarded on close
long[] validOffsetsTopicPartition1 = {5, 8};
@@ -386,7 +392,8 @@ public void testProjectBackWard() throws Exception {
sinkRecords.add(new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, schema, record, 1L));

hdfsWriter.write(sinkRecords);
- hdfsWriter.close();
+ hdfsWriter.close(assignment);
+ hdfsWriter.stop();

String DIRECTORY = TOPIC + "/" + "partition=" + String.valueOf(PARTITION);
Path path = new Path(FileUtils.committedFileName(url, topicsDir, DIRECTORY, TOPIC_PARTITION,
@@ -408,7 +415,8 @@ public void testProjectBackWard() throws Exception {
newRecord, 3L));

hdfsWriter.write(sinkRecords);
- hdfsWriter.close();
+ hdfsWriter.close(assignment);
+ hdfsWriter.stop();

path = new Path(FileUtils.committedFileName(url, topicsDir, DIRECTORY, TOPIC_PARTITION, 2L,
3L, extension, ZERO_PAD_FMT));
@@ -442,7 +450,8 @@ public void testProjectNone() throws Exception {
sinkRecords.add(new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, schema, record, 2L));

hdfsWriter.write(sinkRecords);
- hdfsWriter.close();
+ hdfsWriter.close(assignment);
+ hdfsWriter.stop();

String DIRECTORY = TOPIC + "/" + "partition=" + String.valueOf(PARTITION);
Path path = new Path(FileUtils.committedFileName(url, topicsDir, DIRECTORY, TOPIC_PARTITION,
@@ -467,7 +476,8 @@ public void testProjectNone() throws Exception {
sinkRecords.add(new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, newSchema, newRecord, 4L));

hdfsWriter.write(sinkRecords);
- hdfsWriter.close();
+ hdfsWriter.close(assignment);
+ hdfsWriter.stop();

path = new Path(FileUtils.committedFileName(url, topicsDir, DIRECTORY, TOPIC_PARTITION, 3L,
4L, extension, ZERO_PAD_FMT));
@@ -502,7 +512,8 @@ public void testProjectForward() throws Exception {
sinkRecords.add(new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, schema, record, 2L));

hdfsWriter.write(sinkRecords);
- hdfsWriter.close();
+ hdfsWriter.close(assignment);
+ hdfsWriter.stop();

String DIRECTORY = TOPIC + "/" + "partition=" + String.valueOf(PARTITION);
Path path = new Path(FileUtils.committedFileName(url, topicsDir, DIRECTORY, TOPIC_PARTITION,
@@ -526,7 +537,8 @@ public void testProjectForward() throws Exception {
sinkRecords.add(new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, newSchema, newRecord, 4L));

hdfsWriter.write(sinkRecords);
- hdfsWriter.close();
+ hdfsWriter.close(assignment);
+ hdfsWriter.stop();

path = new Path(FileUtils.committedFileName(url, topicsDir, DIRECTORY, TOPIC_PARTITION, 3L,
4L, extension, ZERO_PAD_FMT));
@@ -575,8 +587,8 @@ public void testProjectNoVersion() throws Exception {
} catch (RuntimeException e) {
// expected
}

- hdfsWriter.close();
+ hdfsWriter.close(assignment);
+ hdfsWriter.stop();
}
}

@@ -67,7 +67,8 @@ public void testSyncWithHiveAvro() throws Exception {
}

hdfsWriter.write(sinkRecords);
- hdfsWriter.close();
+ hdfsWriter.close(assignment);
+ hdfsWriter.stop();

Map<String, String> props = createProps();
props.put(HdfsSinkConnectorConfig.HIVE_INTEGRATION_CONFIG, "true");
@@ -96,7 +97,8 @@ public void testSyncWithHiveAvro() throws Exception {

assertEquals(expectedPartitions, partitions);

- hdfsWriter.close();
+ hdfsWriter.close(assignment);
+ hdfsWriter.stop();
}

@Test
@@ -119,8 +121,10 @@ public void testHiveIntegrationAvro() throws Exception {

sinkRecords.add(sinkRecord);
}

hdfsWriter.write(sinkRecords);
- hdfsWriter.close();
+ hdfsWriter.close(assignment);
+ hdfsWriter.stop();

Table table = hiveMetaStore.getTable(hiveDatabase, TOPIC);
List<String> expectedColumnNames = new ArrayList<>();
@@ -169,7 +173,8 @@ public void testHiveIntegrationFieldPartitionerAvro() throws Exception {
}

hdfsWriter.write(sinkRecords);
- hdfsWriter.close();
+ hdfsWriter.close(assignment);
+ hdfsWriter.stop();

Table table = hiveMetaStore.getTable(hiveDatabase, TOPIC);

@@ -244,7 +249,8 @@ public void testHiveIntegrationTimeBasedPartitionerAvro() throws Exception {
}

hdfsWriter.write(sinkRecords);
- hdfsWriter.close();
+ hdfsWriter.close(assignment);
+ hdfsWriter.stop();

Table table = hiveMetaStore.getTable(hiveDatabase, TOPIC);

@@ -64,7 +64,8 @@ public void testWriteRecord() throws Exception {
sinkRecords.add(sinkRecord);
}
hdfsWriter.write(sinkRecords);
- hdfsWriter.close();
+ hdfsWriter.close(assignment);
+ hdfsWriter.stop();

String encodedPartition = "partition=" + String.valueOf(PARTITION);
String directory = partitioner.generatePartitionedPath(TOPIC, encodedPartition);
@@ -70,7 +70,8 @@ public void testSyncWithHiveParquet() throws Exception {
}

hdfsWriter.write(sinkRecords);
- hdfsWriter.close();
+ hdfsWriter.close(assignment);
+ hdfsWriter.stop();

props = createProps();
props.put(HdfsSinkConnectorConfig.HIVE_INTEGRATION_CONFIG, "true");
@@ -99,7 +100,8 @@ public void testSyncWithHiveParquet() throws Exception {

assertEquals(expectedPartitions, partitions);

- hdfsWriter.close();
+ hdfsWriter.close(assignment);
+ hdfsWriter.stop();
}

@Test
@@ -123,7 +125,8 @@ public void testHiveIntegrationParquet() throws Exception {
sinkRecords.add(sinkRecord);
}
hdfsWriter.write(sinkRecords);
- hdfsWriter.close();
+ hdfsWriter.close(assignment);
+ hdfsWriter.stop();

Table table = hiveMetaStore.getTable(hiveDatabase, TOPIC);
List<String> expectedColumnNames = new ArrayList<>();
@@ -172,7 +175,8 @@ public void testHiveIntegrationFieldPartitionerParquet() throws Exception {
}

hdfsWriter.write(sinkRecords);
- hdfsWriter.close();
+ hdfsWriter.close(assignment);
+ hdfsWriter.stop();

Table table = hiveMetaStore.getTable(hiveDatabase, TOPIC);

@@ -247,7 +251,8 @@ public void testHiveIntegrationTimeBasedPartitionerParquet() throws Exception {
}

hdfsWriter.write(sinkRecords);
- hdfsWriter.close();
+ hdfsWriter.close(assignment);
+ hdfsWriter.stop();

Table table = hiveMetaStore.getTable(hiveDatabase, TOPIC);
