
Conversation

@tomasbartalos commented Feb 8, 2019

What changes were proposed in this pull request?

This patch introduces timestamp pushdown to the Kafka relation to significantly speed up queries filtered by timestamp.
Before this PR, queries filtered by timestamp were executed as full-topic scans and the filtering was handled in Spark memory. For production topics with a few days of history, such queries couldn't complete in a reasonable time and clients had to rely on manual offset computation.
Moreover, offset filters have to be specified during DataFrame initialization (or table creation when using Thrift) via the data source's startingOffsets/endingOffsets options, which makes changing offsets inflexible.
Timestamp pushdown is a flexible solution that reuses existing data sources/tables and makes it possible to create dynamic queries or even live views.
As an example, the following SQL commands create a live view displaying the last 10 minutes of data stored in Kafka:

CREATE TABLE kafka_source USING kafka OPTIONS (kafka.bootstrap.servers '$urls', subscribe '$topic');
CREATE or replace VIEW kafka_10_min as
    select value, timestamp from kafka_source
    where timestamp > cast(from_unixtime(unix_timestamp() - 10 * 60, "yyyy-MM-dd HH:mm:ss") as TIMESTAMP);

This PR deals with batch SQL queries; it doesn't handle streaming queries.

Technical notes

Technically, KafkaRelation's parent was changed from TableScan to PrunedFilteredScan, which allows the relation to receive filter conditions and a required-columns projection.
Operating only on the required columns should result in lower memory pressure and slightly better performance.
Filtering leverages the KafkaConsumer.offsetsForTimes() method to compute offset ranges for the filtered timestamps. The resulting offset ranges are merged with any existing offset ranges specified as data source options.
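
For illustration, a minimal sketch of how a pushed-down lower bound on timestamp can be translated into per-partition starting offsets with offsetsForTimes. This is not the patch's actual code; the helper name and the Option-based result type are assumptions:

  import scala.collection.JavaConverters._
  import org.apache.kafka.clients.consumer.KafkaConsumer
  import org.apache.kafka.common.TopicPartition

  // Translate a pushed-down "timestamp >= ts" filter into per-partition starting
  // offsets. Kafka returns null for partitions that have no record at or after ts;
  // those partitions are surfaced as None and must be handled by the caller.
  def startingOffsetsForTimestamp(
      consumer: KafkaConsumer[Array[Byte], Array[Byte]],
      partitions: Seq[TopicPartition],
      ts: Long): Map[TopicPartition, Option[Long]] = {
    val request = partitions.map(tp => tp -> java.lang.Long.valueOf(ts)).toMap.asJava
    val response = consumer.offsetsForTimes(request).asScala
    partitions.map { tp =>
      tp -> response.get(tp).flatMap(Option(_)).map(_.offset())
    }.toMap
  }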

There is another PR, #23747, related to timestamp filtering on Kafka tables; it handles timestamps as DataFrame/table options supplied during DataFrame creation.
In practice the two PRs complement each other and should be able to work together.

Restrictions

There is one technical restriction when using the equals operator: equals will not find an element if it has the latest timestamp in its partition.
Example with timestamp in milliseconds (ts):

Topic with partitions:
partition 0 : [ (k1, v1, ts: 1001), (k4, v4, ts: 1002), (k7, v7, ts: 1004) ]
partition 1 : [ (k2, v2, ts: 1001), (k5, v5, ts: 1002), (k8, v8, ts: 1004) ]
partition 2 : [ (k3, v3, ts: 1001), (k6, v6, ts: 1002), (k9, v9, ts: 1004) ]

-- OK, result [(k4, v4, ts: 1002), (k5, v5, ts: 1002), (k6, v6, ts: 1002)]
select * from kafka_table where timestamp = 1002
-- Not OK, result is empty
select * from kafka_table where timestamp = 1004
-- OK, result [(k1, v1, ts: 1001), (k2, v2, ts: 1001), (k3, v3, ts: 1001)]
select * from kafka_table where timestamp = 1001

This situation is exercised in the unit test "timestamp pushdown on unevenly distributed partitions".
In a real-world scenario with live traffic it is extremely unlikely to run into this case; however, it's still worth documenting.
Queries with an OR condition (timestamp > a or timestamp < b) will not use pushdown; however, they will return correct results.
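
For illustration only: one plausible way this restriction arises, assuming the equality filter is translated into the half-open offset range defined by offsetsForTimes(ts) and offsetsForTimes(ts + 1). The helper below is hypothetical, not the patch's actual code:

  // Hypothetical per-partition translation of "timestamp = ts" into an offset range.
  // offsetsFor stands in for KafkaConsumer.offsetsForTimes: it returns the earliest
  // offset whose timestamp is >= the given value, or None when no such record exists.
  def equalsRange(offsetsFor: Long => Option[Long], ts: Long): (Long, Long) = {
    val start = offsetsFor(ts)     // first record with timestamp >= ts
    val end = offsetsFor(ts + 1)   // first record with timestamp > ts (exclusive end)
    (start, end) match {
      case (Some(s), Some(e)) => (s, e) // normal case: [s, e) covers timestamp == ts
      case _ => (0L, 0L)                // ts is the latest timestamp in the partition:
                                        // the end lookup returns None, the range
                                        // collapses to empty, and the record is missed
    }
  }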

How was this patch tested?

  • Manual tests with queries to the Thrift JDBC/ODBC server connected to Kafka
  • Unit tests in KafkaRelationSuite class

@tomasbartalos tomasbartalos changed the title [WIP][SPARK-26841][SQL] [WIP][SPARK-26841][SQL] Kafka timestamp pushdown Feb 8, 2019
@HeartSaVioR
Contributor

Looks like it's orthogonal to #23747 - this patch handles how to deal with offset options and filters, and #23747 will be incorporated into the offset options, so it may not need special care even in this patch.

@tomasbartalos tomasbartalos force-pushed the kafka-timestamp-pushdown branch from cc192cf to 5bfdc9a on February 19, 2019 10:46
@tomasbartalos tomasbartalos force-pushed the kafka-timestamp-pushdown branch from 2d62004 to 21e2701 on February 20, 2019 11:44
@tomasbartalos tomasbartalos changed the title [WIP][SPARK-26841][SQL] Kafka timestamp pushdown [SPARK-26841][SQL] Kafka timestamp pushdown Feb 20, 2019
@tomasbartalos
Author

Hello guys, can anyone approve running the test build so I can see whether the new unit tests pass?

@arunmahadevan
Contributor

arunmahadevan commented Mar 15, 2019

IMO, we could adopt the more generic solution. If it's just timestamp-based filtering of the start and end offsets, the timestamp offset approach proposed in #23747 looks straightforward. However, if there are more cases that filter pushdown can handle, we should go with that. I am assuming the filter condition can appear anywhere in the query and get pushed down to filter the rows; if so, this looks more generic and avoids having to add extra options to the Kafka source.

I am not sure providing two different options for timestamp-based filtering is necessary. If we support both, the user can provide different values via the options and the filter, and it gets very confusing.

@jose-torres
Contributor

It looks like the most recent commit doesn't compile. I can approve the test build once it does.

@jose-torres (Contributor) left a comment

This may change after comments are addressed, but I'm not sure the added complexity here is worth the benefit beyond just being able to specify a timestamp range in the source. Things that require deep knowledge of Kafka to understand are hard for Spark committers to maintain.


private[kafka010] object KafkaOffsetReader {
// offsets are not instances of Optional, we need special state for None
val EMPTY_OFFSET: Long = -100L
Contributor

I'm not sure I follow why this is needed. Normally, if fetchOffsetsByTime sometimes needs to return offsets and sometimes needs to return None, it should just return Option[Offset] values.

Author

Completely agree with you (this is what the comment is trying to express). I tried to go with the least invasive changes, but since you've pointed this out, I can change the offset map everywhere from:
Map[TopicPartition, Long] to Map[TopicPartition, Option[Long]]

Contributor

I don't think it needs to be an Option[Long] everywhere. Just until we decide what value we want to pass to Kafka.

Author

Changed to Option[Long] where it was necessary

filters: Array[Filter]): Map[TopicPartition, Long] = {

val offsetsByLimit = getPartitionOffsetsByRangeLimit(kafkaReader, endingOffsets)
getEndingPartitionOffsetsByFilter(kafkaReader, offsetsByLimit, filters)
Contributor

I'm not sure the naming quite makes sense to me here. It's weird to have to apply multiple levels of "getPartitionOffsets" before obtaining the actual partition offsets that should be used.

@tomasbartalos (Author) Mar 18, 2019

The method name getPartitionOffsetsByRangeLimit is probably confusing.

We can only push offsets down to Kafka, so what we need to do is merge the offsets specified as DS options (startingOffsets, endingOffsets) with the offsets obtained from the timestamp filter.

Example: the DS options give an offset range of 100 - 200
startingOffsets '{"topic" : {"0" : 100}}', endingOffsets '{"topic" : {"0" : 200}}'
but the timestamp pushdown where timestamp > x and timestamp < y gives an offset range of, let's say, 150 - 300.

The merged result is the highest of the starting offsets to the lowest of the ending offsets = 150 - 200.

I think getEndingPartitionOffsetsByFilter does what it says, but it's worth renaming getPartitionOffsetsByRangeLimit. What do you think would be a good name?
Maybe getPartitionOffsetsFromDSOption?
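
A minimal sketch of the merge rule just described (per partition: take the highest of the starting offsets and the lowest of the ending offsets); the names here are illustrative, not the patch's actual API:

  import org.apache.kafka.common.TopicPartition

  // Intersect the offset range coming from DS options (startingOffsets/endingOffsets)
  // with the range derived from the pushed-down timestamp filter.
  def mergeRanges(
      optionRanges: Map[TopicPartition, (Long, Long)],
      filterRanges: Map[TopicPartition, (Long, Long)]): Map[TopicPartition, (Long, Long)] = {
    optionRanges.map { case (tp, (optStart, optEnd)) =>
      val (fltStart, fltEnd) = filterRanges.getOrElse(tp, (optStart, optEnd))
      // highest of the starting offsets, lowest of the ending offsets
      tp -> (math.max(optStart, fltStart), math.min(optEnd, fltEnd))
    }
  }

  // Example from above: options give 100 - 200, the timestamp filter gives 150 - 300,
  // and the merged range is 150 - 200.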

Contributor

When I see "getPartitionOffsetsByA" and "getPartitionOffsetsByB", I normally expect those to be two independent strategies A and B for getting the final offsets. If they have to be chained together in some specific order, I don't think these are appropriate method names; they should be renamed or inlined.

Author

The code dealing with offset calculation was extracted into a separate class. Thus most of the methods could become single-argument ones and the dependency between calculation strategies could be emphasized.

throw new IllegalStateException(s"$tp doesn't have a from offset")
}
var untilOffset = untilPartitionOffsets(tp)
untilOffset = if (areOffsetsInLine(fromOffset, untilOffset)) untilOffset else fromOffset
Contributor

This doesn't seem safe. We should avoid generating nonsensical ranges in the first place, rather than generating them and then silently clamping them down.

@tomasbartalos (Author) Mar 18, 2019

The nonsensical ranges originate from wrong (contradictory) user queries (example: timestamp > 10 and timestamp < 10). The question is how we want to react to this kind of query.

  1. If we don't handle them, the user will see an error:
    You either provided an invalid fromOffset, or the Kafka topic has been damaged
  2. If we do handle them, the user will get an empty result set and no error.

I'm more a fan of option 2, since this is how most DBs would react, but if you disagree I can delete the handling.
Maybe the method name areOffsetsInLine could be improved?

If I remove the line
untilOffset = if (areOffsetsInLine(fromOffset, untilOffset)) untilOffset else fromOffset
then 2 unit tests fail:

  • timestamp pushdown with contradictory condition - a query like timestamp > 10 and timestamp < 10
  • timestamp pushdown out of offset range - this is for cases where the DS options specify an offset range and the timestamp filter is valid but falls outside that range.
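
A sketch of the clamping behaviour of option 2 described above; the method name and the log message are illustrative (the actual patch logs a warning through Spark's logging rather than println):

  // Clamp a per-partition range whose end falls before its start (e.g. produced by a
  // contradictory filter such as "timestamp > 10 and timestamp < 10") to an empty
  // range instead of failing later with an "invalid fromOffset" error.
  def clampRange(partition: String, fromOffset: Long, untilOffset: Long): (Long, Long) = {
    if (untilOffset >= fromOffset) {
      (fromOffset, untilOffset)
    } else {
      println(s"WARN: offset range for $partition clamped to empty set: " +
        s"requested [$fromOffset, $untilOffset)")
      (fromOffset, fromOffset)
    }
  }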

Contributor

I suppose option 2 really is the only good choice here. But let's add a warning log for this case, saying what the original range was and what user predicates made us clamp it to an empty set.

Author

Added a warning log message. This message will be printed for each partition that was clamped to an empty set.

sqlContext.internalCreateDataFrame(rdd.setName("kafka"), schemaProjected).rdd
}

def invalidateEmptyOffsets(
Contributor

This method needs docs - I don't really understand what it's doing.

Author

First I'll elaborate on why we need the empty offset. When we request a mapping of timestamp to Kafka offset, Kafka may return null for some partitions. This means that the specific partition doesn't contain any record whose timestamp is equal to or greater than the given timestamp. I need to handle this situation and transform null into something meaningful (currently a constant, which will be changed to None as you've proposed).

The above situation may happen for the calculated startingOffsets or endingOffsets. As a result, we have to invalidate every partition whose starting or ending offset is empty - set its offset range to (0, 0).

Should I just add a comment to the method?
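
A minimal sketch of the invalidation rule just described, assuming offsets are modelled as Option[Long]; the names are illustrative, not the patch's actual code:

  import org.apache.kafka.common.TopicPartition

  // If either the computed starting or ending offset of a partition is empty (Kafka
  // returned null from offsetsForTimes), the partition cannot match the filter, so
  // its range is collapsed to the empty range (0, 0).
  def invalidateEmptyOffsets(
      starting: Map[TopicPartition, Option[Long]],
      ending: Map[TopicPartition, Option[Long]]): Map[TopicPartition, (Long, Long)] = {
    starting.keySet.map { tp =>
      (starting(tp), ending.getOrElse(tp, None)) match {
        case (Some(s), Some(e)) => tp -> (s, e)
        case _                  => tp -> (0L, 0L)
      }
    }.toMap
  }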

Author

Added docs

}
}

private def isLimitSpecified(offset: Long): Boolean = {
Contributor

I don't see the connection between the name and implementation here.

Author

Yep, it may seem confusing. The thing is that a Kafka offset can carry a special flag: LATEST = -1, EARLIEST = -2. The method checks whether the offset is bound to a specific position rather than unbound (latest, earliest). Honestly, I've renamed this about 3 times and I'm still not satisfied, but I can't find anything meaningful.
Maybe isNotLatestOrEarliest would be better?
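
For reference, a minimal sketch of the check under discussion, using the -1/-2 sentinel values mentioned above; the constant names here are illustrative:

  // Kafka-style sentinel offsets: -1 = latest, -2 = earliest.
  val LatestOffset: Long = -1L
  val EarliestOffset: Long = -2L

  // True when the offset is a concrete position rather than the latest/earliest flag.
  def isNotLatestOrEarliest(offset: Long): Boolean =
    offset != LatestOffset && offset != EarliestOffset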

Author

changed method name

cr.offset,
DateTimeUtils.fromJavaTimestamp(new java.sql.Timestamp(cr.timestamp)),
cr.timestampType.id)
val columns = requiredColumns.map{KafkaRelation.columnToValueExtractor(_)(cr)}
Contributor

This is on the fast path. I don't think we should add 2 lambda invocations and a HashMap lookup to every row computation. I'd prefer just having the test harness create its own extractor if it needs one.

Author

I'm not sure I understand what you mean by

I'd prefer just having the test harness create its own extractor if it needs one

Could you please elaborate more on that?

The relation type was changed from TableScan to PrunedFilteredScan, so I need to return only the required columns (not all columns as before), which means extracting only specific fields from the ConsumerRecord.

I was able to come up with an alternative solution:

  class ConsumerRecordInspector(cr: ConsumerRecord[Array[Byte], Array[Byte]]) {
    def getValues(requiredColumns: List[String]): Seq[Any] = {
      requiredColumns match {
        case "key"::rest => cr.key +: getValues(rest)
        case "value"::rest => cr.value +: getValues(rest)
        case "topic"::rest => UTF8String.fromString(cr.topic) +: getValues(rest)
        case "partition"::rest => cr.partition +: getValues(rest)
        case "offset"::rest => cr.offset +: getValues(rest)
        case "timestamp"::rest => DateTimeUtils.
          fromJavaTimestamp(new java.sql.Timestamp(cr.timestamp)) +: getValues(rest)
        case "timestampType"::rest => cr.timestampType.id +: getValues(rest)
        case Nil => Seq.empty
      }
    }
  }

Author

Lambda invocation replaced with recursion.
Single-core (i7) performance test on my laptop for 1 billion records:

  • recursion = 2m 38s
  • lambdas = 3m 47s

@tomasbartalos
Author

IMO, we could adopt the more generic solution. If it's just timestamp-based filtering of the start and end offsets, the timestamp offset approach proposed in #23747 looks straightforward. However, if there are more cases that filter pushdown can handle, we should go with that. I am assuming the filter condition can appear anywhere in the query and get pushed down to filter the rows; if so, this looks more generic and avoids having to add extra options to the Kafka source.

I am not sure providing two different options for timestamp-based filtering is necessary. If we support both, the user can provide different values via the options and the filter, and it gets very confusing.

@arunmahadevan, IMHO, I would divide this into 2 use cases:

Our specific use case (from which this PR emerged) is to see the last 30 minutes of Kafka. With timestamp pushdown and a dynamic view:

CREATE or replace VIEW kafka_30_min as
    select value, timestamp from kafka_source
    where timestamp > cast(from_unixtime(unix_timestamp() - 30 * 60, "yyyy-MM-dd HH:mm:ss") as TIMESTAMP);

I can repeatedly query the same view and always get the up-to-date last 30 minutes of Kafka.

@tomasbartalos
Author

This may change after comments are addressed, but I'm not sure the added complexity here is worth the benefit beyond just being able to specify a timestamp range in the source. Things that require deep knowledge of Kafka to understand are hard for Spark committers to maintain.

@jose-torres, I wish there were an easier way to implement it. In the end it's just about the ability to push down the timestamp filter and, implementation-wise, about mapping timestamps to Kafka offsets. I know it's implemented in Hive 4 (maybe also in 3 - I didn't check that one), but I'd love to have it in Spark.
I'll try to address your concerns and polish the solution.

@tomasbartalos
Author

It looks like the most recent commit doesn't compile. I can approve the test build once it does.

The branch should compile now.

EMPTY_OFFSET variable replaced with the None pattern
Renamed methods dealing with partition offset calculation
Emphasized dependencies between methods for offset calculation
Added a warning log when offsets are clamped down to an empty set
Added a comment for the method invalidating empty offsets
Changed extraction of required fields from lambdas to recursion
Refactored offset calculation code into separate classes
@AmplabJenkins

Can one of the admins verify this patch?

@tomasbartalos
Author

Hello, can anyone review this patch? @jose-torres did an initial review and all of his comments have been addressed.

@HeartSaVioR
Contributor

HeartSaVioR commented Oct 1, 2019

I could help review this, but as I'm not a committer you may still want attention from committers on this PR.

First of all, you may want to rebase onto the latest master, since there are now some utils to leverage here. For example, KafkaOffsetReader.fetchSpecificTimestampBasedOffsets is available for getting offsets by timestamp, and KafkaTestUtils now supports sending messages with timestamps. I guess it would remove a bunch of code lines here, making it easier for reviewers to review.

Next, it feels like the "Restriction" described above is a "correctness" issue rather than a restriction. Actually, I don't see why it produces such a result, as it doesn't match the javadoc of the Kafka API.

https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#offsetsForTimes-java.util.Map-

Look up the offsets for the given partitions by timestamp. The returned offset for each partition is the earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition.

Here it explicitly describes the "equal" case.

@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Jan 10, 2020
@HeartSaVioR
Contributor

As I commented in #25911, this is the ideal way to deal with use cases for Kafka batch queries (especially interactive queries). Unlike other sources, in most cases we can't leverage partition pruning with Kafka, and timestamp is the only field indexed by Kafka. Offset is definitely indexed as well, but would its value have any meaning in business logic?

This patch adds lots of lines (which we tend to call "complexity", whereas I'm not sure I agree), but it can now be reduced since #23747 added similar functionality, so it's only a matter of the pushdown. I'd take this over if we agree about the value.

@tomasbartalos
Author

I'm happy to make any changes, and the proposals in @HeartSaVioR's comments sound reasonable; however, I see no interest from Spark committers in pushing this forward. @jose-torres started a review and I tried to address all of his comments, but a long time has passed without any response. I think we should first receive confirmation from the Spark community that they agree on the benefit / "added complexity" ratio and are willing to spend some time on this.

@github-actions github-actions bot closed this Jan 11, 2020