Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,16 @@ public KafkaFilteringResult getKafkaFilterResult(
if (offsetTimestampRanged.isPresent()) {
try (KafkaConsumer<byte[], byte[]> kafkaConsumer = consumerFactory.create(session)) {
Optional<Range> finalOffsetTimestampRanged = offsetTimestampRanged;
partitionBeginOffsets = overridePartitionBeginOffsets(partitionBeginOffsets,
partition -> findOffsetsForTimestampGreaterOrEqual(kafkaConsumer, partition, finalOffsetTimestampRanged.get().getBegin()));
// filter negative value to avoid java.lang.IllegalArgumentException when using KafkaConsumer offsetsForTimes
if (offsetTimestampRanged.get().getBegin() > INVALID_KAFKA_RANGE_INDEX) {
Comment thread
ebyhr marked this conversation as resolved.
Outdated
partitionBeginOffsets = overridePartitionBeginOffsets(partitionBeginOffsets,
partition -> findOffsetsForTimestampGreaterOrEqual(kafkaConsumer, partition, finalOffsetTimestampRanged.get().getBegin()));
}
if (isTimestampUpperBoundPushdownEnabled(session, kafkaTableHandle.getTopicName())) {
partitionEndOffsets = overridePartitionEndOffsets(partitionEndOffsets,
partition -> findOffsetsForTimestampGreaterOrEqual(kafkaConsumer, partition, finalOffsetTimestampRanged.get().getEnd()));
if (offsetTimestampRanged.get().getEnd() > INVALID_KAFKA_RANGE_INDEX) {
partitionEndOffsets = overridePartitionEndOffsets(partitionEndOffsets,
partition -> findOffsetsForTimestampGreaterOrEqual(kafkaConsumer, partition, finalOffsetTimestampRanged.get().getEnd()));
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import static io.trino.testing.assertions.Assert.assertEventually;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertEquals;

@Test(singleThreaded = true)
Expand Down Expand Up @@ -101,70 +100,62 @@ public void testPartitionPushDown()
public void testOffsetPushDown()
{
createMessages(topicNameOffset);
assertProcessedInputPossitions(format("SELECT count(*) FROM default.%s WHERE _partition_offset between 2 and 10", topicNameOffset), 18);
assertProcessedInputPossitions(format("SELECT count(*) FROM default.%s WHERE _partition_offset > 2 and _partition_offset < 10", topicNameOffset), 14);
assertProcessedInputPossitions(format("SELECT count(*) FROM default.%s WHERE _partition_offset = 3", topicNameOffset), 2);
}

private void assertProcessedInputPossitions(String sql, long expectedProcessedInputPositions)
{
DistributedQueryRunner queryRunner = getDistributedQueryRunner();
assertEventually(() -> {
MaterializedResultWithQueryId queryResult = queryRunner.executeWithQueryId(getSession(), sql);
assertEquals(getQueryInfo(queryRunner, queryResult).getQueryStats().getProcessedInputPositions(), expectedProcessedInputPositions);
});
assertProcessedInputPositions(format("SELECT count(*) FROM default.%s WHERE _partition_offset between 2 and 10", topicNameOffset), 18);
assertProcessedInputPositions(format("SELECT count(*) FROM default.%s WHERE _partition_offset > 2 and _partition_offset < 10", topicNameOffset), 14);
assertProcessedInputPositions(format("SELECT count(*) FROM default.%s WHERE _partition_offset = 3", topicNameOffset), 2);
}

@Test
public void testTimestampCreateTimeModePushDown()
throws Exception
{
RecordMessage recordMessage = createTimestampTestMessages(topicNameCreateTime);
DistributedQueryRunner queryRunner = getDistributedQueryRunner();
// ">= startTime" insure including index 2, "< endTime" insure excluding index 4;
String sql = format(
"SELECT count(*) FROM default.%s WHERE _timestamp >= timestamp '%s' and _timestamp < timestamp '%s'",
topicNameCreateTime,
recordMessage.getStartTime(),
recordMessage.getEndTime());

// timestamp_upper_bound_force_push_down_enabled default as false.
assertEventually(() -> {
MaterializedResultWithQueryId queryResult = queryRunner.executeWithQueryId(getSession(), sql);
assertThat(getQueryInfo(queryRunner, queryResult).getQueryStats().getProcessedInputPositions())
.isEqualTo(998);
});
assertProcessedInputPositions(format("SELECT count(*) FROM default.%s WHERE _timestamp < timestamp '%s'", topicNameCreateTime, recordMessage.getEndTime()), 1000);
assertProcessedInputPositions(format("SELECT count(*) FROM default.%s WHERE _timestamp <= timestamp '%s'", topicNameCreateTime, recordMessage.getEndTime()), 1000);
assertProcessedInputPositions(format("SELECT count(*) FROM default.%s WHERE _timestamp > timestamp '%s'", topicNameCreateTime, recordMessage.getStartTime()), 997);
assertProcessedInputPositions(format("SELECT count(*) FROM default.%s WHERE _timestamp >= timestamp '%s'", topicNameCreateTime, recordMessage.getStartTime()), 998);
assertProcessedInputPositions(format("SELECT count(*) FROM default.%s WHERE _timestamp between timestamp '%s' and timestamp '%s'", topicNameCreateTime, recordMessage.getStartTime(), recordMessage.getEndTime()), 998);
assertProcessedInputPositions(format("SELECT count(*) FROM default.%s WHERE _timestamp >= timestamp '%s' and _timestamp < timestamp '%s'", topicNameCreateTime, recordMessage.getStartTime(), recordMessage.getEndTime()), 998);
assertProcessedInputPositions(format("SELECT count(*) FROM default.%s WHERE _timestamp = timestamp '%s'", topicNameCreateTime, recordMessage.getStartTime()), 998);

// timestamp_upper_bound_force_push_down_enabled set as true.
assertEventually(() -> {
// timestamp_upper_bound_force_push_down_enabled set as true.
Session sessionWithUpperBoundPushDownEnabled = Session.builder(getSession())
.setSystemProperty("kafka.timestamp_upper_bound_force_push_down_enabled", "true")
.build();

MaterializedResultWithQueryId queryResult = queryRunner.executeWithQueryId(sessionWithUpperBoundPushDownEnabled, sql);
assertThat(getQueryInfo(queryRunner, queryResult).getQueryStats().getProcessedInputPositions())
.isEqualTo(2);
});
Session sessionWithUpperBoundPushDownEnabled = Session.builder(getSession())
.setSystemProperty("kafka.timestamp_upper_bound_force_push_down_enabled", "true")
.build();
assertProcessedInputPositions(format("SELECT count(*) FROM default.%s WHERE _timestamp < timestamp '%s'", topicNameCreateTime, recordMessage.getEndTime()), 4, sessionWithUpperBoundPushDownEnabled);
assertProcessedInputPositions(format("SELECT count(*) FROM default.%s WHERE _timestamp <= timestamp '%s'", topicNameCreateTime, recordMessage.getEndTime()), 4, sessionWithUpperBoundPushDownEnabled);
assertProcessedInputPositions(format("SELECT count(*) FROM default.%s WHERE _timestamp > timestamp '%s'", topicNameCreateTime, recordMessage.getStartTime()), 997, sessionWithUpperBoundPushDownEnabled);
assertProcessedInputPositions(format("SELECT count(*) FROM default.%s WHERE _timestamp >= timestamp '%s'", topicNameCreateTime, recordMessage.getStartTime()), 998, sessionWithUpperBoundPushDownEnabled);
assertProcessedInputPositions(format("SELECT count(*) FROM default.%s WHERE _timestamp between timestamp '%s' and timestamp '%s'", topicNameCreateTime, recordMessage.getStartTime(), recordMessage.getEndTime()), 2, sessionWithUpperBoundPushDownEnabled);
assertProcessedInputPositions(format("SELECT count(*) FROM default.%s WHERE _timestamp >= timestamp '%s' and _timestamp < timestamp '%s'", topicNameCreateTime, recordMessage.getStartTime(), recordMessage.getEndTime()), 2, sessionWithUpperBoundPushDownEnabled);
assertProcessedInputPositions(format("SELECT count(*) FROM default.%s WHERE _timestamp = timestamp '%s'", topicNameCreateTime, recordMessage.getStartTime()), 0, sessionWithUpperBoundPushDownEnabled);
}

@Test
public void testTimestampLogAppendModePushDown()
throws Exception
{
RecordMessage recordMessage = createTimestampTestMessages(topicNameLogAppend);
DistributedQueryRunner queryRunner = getDistributedQueryRunner();
// ">= startTime" insure including index 2, "< endTime" insure excluding index 4;
String sql = format(
"SELECT count(*) FROM default.%s WHERE _timestamp >= timestamp '%s' and _timestamp < timestamp '%s'",
topicNameLogAppend,
recordMessage.getStartTime(),
recordMessage.getEndTime());
assertProcessedInputPositions(format("SELECT count(*) FROM default.%s WHERE _timestamp < timestamp '%s'", topicNameLogAppend, recordMessage.getEndTime()), 4);
assertProcessedInputPositions(format("SELECT count(*) FROM default.%s WHERE _timestamp <= timestamp '%s'", topicNameLogAppend, recordMessage.getEndTime()), 4);
assertProcessedInputPositions(format("SELECT count(*) FROM default.%s WHERE _timestamp > timestamp '%s'", topicNameLogAppend, recordMessage.getStartTime()), 997);
assertProcessedInputPositions(format("SELECT count(*) FROM default.%s WHERE _timestamp >= timestamp '%s'", topicNameLogAppend, recordMessage.getStartTime()), 998);
assertProcessedInputPositions(format("SELECT count(*) FROM default.%s WHERE _timestamp between timestamp '%s' and timestamp '%s'", topicNameLogAppend, recordMessage.getStartTime(), recordMessage.getEndTime()), 2);
assertProcessedInputPositions(format("SELECT count(*) FROM default.%s WHERE _timestamp >= timestamp '%s' and _timestamp < timestamp '%s'", topicNameLogAppend, recordMessage.getStartTime(), recordMessage.getEndTime()), 2);
assertProcessedInputPositions(format("SELECT count(*) FROM default.%s WHERE _timestamp = timestamp '%s'", topicNameLogAppend, recordMessage.getStartTime()), 0);
}

private void assertProcessedInputPositions(String sql, long expectedProcessedInputPositions)
{
assertProcessedInputPositions(sql, expectedProcessedInputPositions, getSession());
}

private void assertProcessedInputPositions(String sql, long expectedProcessedInputPositions, Session session)
{
DistributedQueryRunner queryRunner = getDistributedQueryRunner();
assertEventually(() -> {
MaterializedResultWithQueryId queryResult = queryRunner.executeWithQueryId(getSession(), sql);
assertThat(getQueryInfo(queryRunner, queryResult).getQueryStats().getProcessedInputPositions())
.isEqualTo(2);
MaterializedResultWithQueryId queryResult = queryRunner.executeWithQueryId(session, sql);
assertEquals(getQueryInfo(queryRunner, queryResult).getQueryStats().getProcessedInputPositions(), expectedProcessedInputPositions);
Comment thread
ebyhr marked this conversation as resolved.
Outdated
});
}

Expand Down