@@ -172,33 +172,35 @@ public KafkaOffsetGen(TypedProperties props) {
   public OffsetRange[] getNextOffsetRanges(Option<String> lastCheckpointStr, long sourceLimit) {
 
     // Obtain current metadata for the topic
-    KafkaConsumer consumer = new KafkaConsumer(kafkaParams);
-    List<PartitionInfo> partitionInfoList;
-    partitionInfoList = consumer.partitionsFor(topicName);
-    Set<TopicPartition> topicPartitions = partitionInfoList.stream()
-        .map(x -> new TopicPartition(x.topic(), x.partition())).collect(Collectors.toSet());
-
-    // Determine the offset ranges to read from
     Map<TopicPartition, Long> fromOffsets;
-    if (lastCheckpointStr.isPresent()) {
-      fromOffsets = checkupValidOffsets(consumer, lastCheckpointStr, topicPartitions);
-    } else {
-      KafkaResetOffsetStrategies autoResetValue = KafkaResetOffsetStrategies
-          .valueOf(props.getString("auto.offset.reset", Config.DEFAULT_AUTO_RESET_OFFSET.toString()).toUpperCase());
-      switch (autoResetValue) {
-        case EARLIEST:
-          fromOffsets = consumer.beginningOffsets(topicPartitions);
-          break;
-        case LATEST:
-          fromOffsets = consumer.endOffsets(topicPartitions);
-          break;
-        default:
-          throw new HoodieNotSupportedException("Auto reset value must be one of 'smallest' or 'largest' ");
+    Map<TopicPartition, Long> toOffsets;
+    try (KafkaConsumer consumer = new KafkaConsumer(kafkaParams)) {
+      List<PartitionInfo> partitionInfoList;
+      partitionInfoList = consumer.partitionsFor(topicName);
+      Set<TopicPartition> topicPartitions = partitionInfoList.stream()
+          .map(x -> new TopicPartition(x.topic(), x.partition())).collect(Collectors.toSet());
+
+      // Determine the offset ranges to read from
+      if (lastCheckpointStr.isPresent()) {
+        fromOffsets = checkupValidOffsets(consumer, lastCheckpointStr, topicPartitions);
+      } else {
+        KafkaResetOffsetStrategies autoResetValue = KafkaResetOffsetStrategies
+            .valueOf(props.getString("auto.offset.reset", Config.DEFAULT_AUTO_RESET_OFFSET.toString()).toUpperCase());
+        switch (autoResetValue) {
+          case EARLIEST:
+            fromOffsets = consumer.beginningOffsets(topicPartitions);
+            break;
+          case LATEST:
+            fromOffsets = consumer.endOffsets(topicPartitions);
+            break;
+          default:
+            throw new HoodieNotSupportedException("Auto reset value must be one of 'smallest' or 'largest' ");
+        }
       }
-    }
 
-    // Obtain the latest offsets.
-    Map<TopicPartition, Long> toOffsets = consumer.endOffsets(topicPartitions);
+      // Obtain the latest offsets.
+      toOffsets = consumer.endOffsets(topicPartitions);
+    }
 
     // Come up with final set of OffsetRanges to read (account for new partitions, limit number of events)
     long maxEventsToReadFromKafka = props.getLong(Config.MAX_EVENTS_FROM_KAFKA_SOURCE_PROP,
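For readers outside the Hudi codebase, the sketch below illustrates the pattern the new code adopts: create the KafkaConsumer in a try-with-resources block so it is closed on every exit path, including when partitionsFor or an offsets lookup throws, whereas the old code left the consumer open. This is a minimal standalone example under assumed settings, not the Hudi class itself; the class name, topic, and bootstrap servers are placeholders.

import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class OffsetRangeSketch {

  public static void main(String[] args) {
    // Placeholder connection settings; adjust for a real cluster.
    Properties kafkaParams = new Properties();
    kafkaParams.put("bootstrap.servers", "localhost:9092");
    kafkaParams.put("key.deserializer", StringDeserializer.class.getName());
    kafkaParams.put("value.deserializer", StringDeserializer.class.getName());
    String topicName = "example-topic";

    Map<TopicPartition, Long> fromOffsets;
    Map<TopicPartition, Long> toOffsets;
    // try-with-resources: consumer.close() runs even if a lookup below throws.
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaParams)) {
      List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topicName);
      Set<TopicPartition> topicPartitions = partitionInfoList.stream()
          .map(x -> new TopicPartition(x.topic(), x.partition()))
          .collect(Collectors.toSet());

      // Earliest and latest offsets per partition, standing in for fromOffsets/toOffsets above.
      fromOffsets = consumer.beginningOffsets(topicPartitions);
      toOffsets = consumer.endOffsets(topicPartitions);
    }

    // Both maps remain usable here; only the consumer has been closed.
    for (TopicPartition tp : fromOffsets.keySet()) {
      System.out.println(tp + ": " + fromOffsets.get(tp) + " -> " + toOffsets.get(tp));
    }
  }
}

KafkaConsumer implements Closeable, so try-with-resources is sufficient to release its network connections and buffers; declaring fromOffsets and toOffsets before the try, as the new code does, keeps the maps in scope for the offset-range computation that follows the block.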