Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,13 @@

import javax.inject.Inject;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.List;
import java.util.stream.Collectors;

import static com.facebook.airlift.http.client.HttpUriBuilder.uriBuilderFrom;
import static com.facebook.airlift.http.client.JsonResponseHandler.createJsonResponseHandler;
Expand All @@ -48,6 +51,7 @@
import static com.google.common.net.HttpHeaders.CONTENT_TYPE;
import static com.google.common.net.MediaType.JSON_UTF_8;
import static java.lang.String.format;
import static java.net.HttpURLConnection.HTTP_OK;
import static java.util.Objects.requireNonNull;

public class DruidClient
Expand Down Expand Up @@ -175,6 +179,10 @@ public InputStream handleException(Request request, Exception exception)
public InputStream handle(Request request, com.facebook.airlift.http.client.Response response)
{
try {
if (response.getStatusCode() != HTTP_OK) {
String result = new BufferedReader(new InputStreamReader(response.getInputStream())).lines().collect(Collectors.joining("\n"));
throw new PrestoException(DRUID_BROKER_RESULT_ERROR, result);
}
if (APPLICATION_JSON.equals(response.getHeader(CONTENT_TYPE))) {
return response.getInputStream();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import static com.facebook.presto.druid.DruidErrorCode.DRUID_PUSHDOWN_UNSUPPORTED_EXPRESSION;
import static com.facebook.presto.druid.DruidErrorCode.DRUID_QUERY_GENERATOR_FAILURE;
import static com.facebook.presto.druid.DruidPushdownUtils.DRUID_COUNT_DISTINCT_FUNCTION_NAME;
import static com.google.common.base.MoreObjects.toStringHelper;
Expand Down Expand Up @@ -104,7 +105,10 @@ private DruidQueryGeneratorContext(

public DruidQueryGeneratorContext withFilter(String filter)
{
checkState(!hasFilter(), "Druid doesn't support filters at multiple levels");
if (hasAggregation()) {
throw new PrestoException(DRUID_PUSHDOWN_UNSUPPORTED_EXPRESSION, "Druid does not support filter on top of AggregationNode.");
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think better not through PrestoException here. Could we do:
checkState(!hasAggregation && !hasFilter, ...)

Copy link
Copy Markdown
Contributor Author

@weidongduan37 weidongduan37 Aug 6, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Throw PrestoException here is intended here I think. checkState will make query failed directly. if query are like:
select a , sum(b) from druid_table where c = 'xxx' group by 1 having sum(b) > 0;
Throw PrestoException will help make the filter sum(b) > 0(on top of aggregation node) executed in presto(not in druid). The reason not push down such filter right now is because we could not make sure all filters on top of aggregation node could be pushed down. Following code will catch this exception and make queries work(DruidQueryGenerator.java):

    public Optional<DruidQueryGeneratorResult> generate(PlanNode plan, ConnectorSession session)
    {
        try {
            DruidQueryGeneratorContext context = requireNonNull(plan.accept(
                    new DruidQueryPlanVisitor(session),
                    new DruidQueryGeneratorContext()),
                    "Resulting context is null");
            return Optional.of(new DruidQueryGeneratorResult(context.toQuery(), context));
        }
        catch (PrestoException e) {
            log.debug(e, "Possibly benign error when pushing plan into scan node %s", plan);
            return Optional.empty();
        }
    }

Copy link
Copy Markdown
Contributor Author

@weidongduan37 weidongduan37 Aug 6, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After check #14952 changes, I think we are trying to solve different cases. His code focuses on time with timezone. I think our code is compatible.
By the way, the 1st and 3rd example he gave could be pushed down by original code. His change may be helpful for the 2nd example.
First:
select * from druid.wikipedia where __time > timestamp '2016-06-26 18:00:00.000'
Second:
select * from druid.wikipedia where __time > timestamp '2016-06-26 19:00:00.000 UTC';
Third:
select * from druid.wikipedia where __time > CAST('2016-06-26 18:00:00.000' as TIMESTAMP);

And the pushed down dql will be as following by original code:
first:
select col1,col2,.. from druid.wikipedia where __time > '2016-06-26 18:00:00.000'
third:
select col1, col2,.. from druid.wikipedia where __time > '2016-06-26 18:00:00.000'

}
checkState(!hasFilter(), "Druid doesn't support filters at multiple levels under AggregationNode");
return new DruidQueryGeneratorContext(
selections,
from,
Expand Down