Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,16 @@ private[kafka010] class KafkaOffsetReader(
partitions.asScala.toSet
}

def fetchOffsetsByTime(times: Map[TopicPartition, Long]):
Map[TopicPartition, Option[Long]] = runUninterruptibly {
assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])

consumer.offsetsForTimes(times.map{case (k, v) => k -> long2Long(v)}.asJava)
.asScala.map{case (k, v) =>
k -> (if (v != null) Some(Long2long(v.offset())) else None)
}.toMap
}

/**
* Resolves the specific offsets based on Kafka seek positions.
* This method resolves offset value -1 to the latest and -2 to the
Expand Down Expand Up @@ -394,6 +404,8 @@ private[kafka010] class KafkaOffsetReader(
}

private[kafka010] object KafkaOffsetReader {
// offsets are not instances of Optional, we need special state for None
val EMPTY_OFFSET: Long = -100L
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I follow why this is needed. Normally, if fetchOffsetsByTime sometimes needs to return offsets and sometimes needs to return None, it should just return Option[Offset] values.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Completely agree with you (this is what the comment is trying to express). I tried to go with least invasive changes, but since you've pointed this out, I can change the offset map everywhere from:
Map[TopicPartition, Long] to Map[TopicPartition, Option[Long]]

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it needs to be an Option[Long] everywhere. Just until we decide what value we want to pass to Kafka.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to Option[Long] where it was necessary


def kafkaSchema: StructType = StructType(Seq(
StructField("key", BinaryType),
Expand Down
Loading