Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 12 additions & 25 deletions presto-docs/src/main/sphinx/connector/druid.rst
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,18 @@ Configuration Properties

The following configuration properties are available:

=================================================== ============================================================
Property Name Description
=================================================== ============================================================
``druid.coordinator-url`` Druid coordinator url.
``druid.broker-url`` Druid broker url.
``druid.schema-name`` Druid schema name.
``druid.compute-pushdown-enabled`` Whether to pushdown all query processing to Druid.
``druid.case-insensitive-name-matching`` Match dataset and table names case-insensitively.
``druid.case-insensitive-name-matching.cache-ttl`` Duration for which remote dataset and table names will be
cached. Set to ``0ms`` to disable the cache
=================================================== ============================================================
================================== ===================================================
Property Name Description
================================== ===================================================
``druid.coordinator-url``          Druid coordinator URL.
``druid.broker-url``               Druid broker URL.
``druid.schema-name`` Druid schema name.
``druid.compute-pushdown-enabled`` Whether to push down all query processing to Druid.
``case-sensitive-name-matching`` Enable case-sensitive identifier support for schema,
table, and column names for the connector. When disabled,
names are matched case-insensitively using lowercase
normalization. Default is ``false``.
================================== ===================================================

``druid.coordinator-url``
^^^^^^^^^^^^^^^^^^^^^^^^^
Expand All @@ -68,20 +69,6 @@ Whether to pushdown all query processing to Druid.

The default is ``false``.

``druid.case-insensitive-name-matching``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Match dataset and table names case-insensitively.

The default is ``false``.

``druid.case-insensitive-name-matching.cache-ttl``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Duration for which remote dataset and table names will be cached. Set to ``0ms`` to disable the cache.

The default is ``1m``.

Data Types
----------

Expand Down
5 changes: 5 additions & 0 deletions presto-druid/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,11 @@
<artifactId>guava</artifactId>
</dependency>

<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</dependency>

<dependency>
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,35 +18,24 @@
import com.facebook.airlift.http.client.Request;
import com.facebook.airlift.http.client.ResponseHandler;
import com.facebook.airlift.json.JsonCodec;
import com.facebook.airlift.units.Duration;
import com.facebook.presto.druid.ingestion.DruidIngestTask;
import com.facebook.presto.druid.metadata.DruidColumnInfo;
import com.facebook.presto.druid.metadata.DruidSegmentIdWrapper;
import com.facebook.presto.druid.metadata.DruidSegmentInfo;
import com.facebook.presto.druid.metadata.DruidTableInfo;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.SchemaTableName;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.CharMatcher;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import jakarta.annotation.Nullable;
import jakarta.inject.Inject;

import javax.inject.Inject;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import static com.facebook.airlift.http.client.HttpUriBuilder.uriBuilderFrom;
Expand All @@ -56,19 +45,14 @@
import static com.facebook.airlift.http.client.StaticBodyGenerator.createStaticBodyGenerator;
import static com.facebook.airlift.json.JsonCodec.jsonCodec;
import static com.facebook.airlift.json.JsonCodec.listJsonCodec;
import static com.facebook.presto.druid.DruidErrorCode.DRUID_AMBIGUOUS_OBJECT_NAME;
import static com.facebook.presto.druid.DruidErrorCode.DRUID_BROKER_RESULT_ERROR;
import static com.facebook.presto.druid.DruidResultFormat.OBJECT;
import static com.facebook.presto.druid.DruidResultFormat.OBJECT_LINES;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Iterables.getOnlyElement;
import static com.google.common.net.HttpHeaders.CONTENT_TYPE;
import static java.lang.String.format;
import static java.net.HttpURLConnection.HTTP_OK;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

public class DruidClient
{
Expand All @@ -89,8 +73,6 @@ public class DruidClient
private final URI druidCoordinator;
private final URI druidBroker;
private final String druidSchema;
protected final boolean caseInsensitiveNameMatching;
private final Cache<SchemaTableName, Optional<RemoteTableObject>> remoteTables;

@Inject
public DruidClient(DruidConfig config, @ForDruidClient HttpClient httpClient)
Expand All @@ -100,44 +82,6 @@ public DruidClient(DruidConfig config, @ForDruidClient HttpClient httpClient)
this.druidCoordinator = URI.create(config.getDruidCoordinatorUrl());
this.druidBroker = URI.create(config.getDruidBrokerUrl());
this.druidSchema = config.getDruidSchema();
this.caseInsensitiveNameMatching = config.isCaseInsensitiveNameMatching();

Duration caseInsensitiveNameMatchingCacheTtl = requireNonNull(config.getCaseInsensitiveNameMatchingCacheTtl(), "caseInsensitiveNameMatchingCacheTtl is null");
CacheBuilder<Object, Object> remoteTableNamesCacheBuilder = CacheBuilder.newBuilder()
.expireAfterWrite(caseInsensitiveNameMatchingCacheTtl.toMillis(), MILLISECONDS);
this.remoteTables = remoteTableNamesCacheBuilder.build();
}

Optional<RemoteTableObject> toRemoteTable(SchemaTableName schemaTableName)
{
requireNonNull(schemaTableName, "schemaTableName is null");
verify(CharMatcher.forPredicate(Character::isUpperCase).matchesNoneOf(schemaTableName.getTableName()), "Expected table name from internal metadata to be lowercase: %s", schemaTableName);
if (!caseInsensitiveNameMatching) {
return Optional.of(RemoteTableObject.of(schemaTableName.getTableName()));
}

@Nullable Optional<RemoteTableObject> remoteTable = remoteTables.getIfPresent(schemaTableName);
if (remoteTable != null) {
return remoteTable;
}

// Cache miss, reload the cache
Map<SchemaTableName, Optional<RemoteTableObject>> mapping = new HashMap<>();
for (String table : getTables()) {
SchemaTableName cacheKey = new SchemaTableName(getSchema(), table);
mapping.merge(
cacheKey,
Optional.of(RemoteTableObject.of(table)),
(currentValue, collision) -> currentValue.map(current -> current.registerCollision(collision.get().getOnlyRemoteTableName())));
remoteTables.put(cacheKey, mapping.get(cacheKey));
}

// explicitly cache if the requested table doesn't exist
if (!mapping.containsKey(schemaTableName)) {
remoteTables.put(schemaTableName, Optional.empty());
}

return mapping.containsKey(schemaTableName) ? mapping.get(schemaTableName) : Optional.empty();
}

public URI getDruidBroker()
Expand Down Expand Up @@ -303,46 +247,4 @@ public String toJson()
return JsonCodec.jsonCodec(DruidRequestBody.class).toJson(this);
}
}

static final class RemoteTableObject
{
private final Set<String> remoteTableNames;

private RemoteTableObject(Set<String> remoteTableNames)
{
this.remoteTableNames = ImmutableSet.copyOf(remoteTableNames);
}

public static RemoteTableObject of(String remoteName)
{
return new RemoteTableObject(ImmutableSet.of(remoteName));
}

public RemoteTableObject registerCollision(String ambiguousName)
{
return new RemoteTableObject(ImmutableSet.<String>builderWithExpectedSize(remoteTableNames.size() + 1)
.addAll(remoteTableNames)
.add(ambiguousName)
.build());
}

public String getAnyRemoteTableName()
{
return Collections.min(remoteTableNames);
}

public String getOnlyRemoteTableName()
{
if (!isAmbiguous()) {
return getOnlyElement(remoteTableNames);
}

throw new PrestoException(DRUID_AMBIGUOUS_OBJECT_NAME, "Found ambiguous names in Druid when looking up '" + getAnyRemoteTableName().toLowerCase(ENGLISH) + "': " + remoteTableNames);
}

public boolean isAmbiguous()
{
return remoteTableNames.size() > 1;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,19 @@

import com.facebook.airlift.configuration.Config;
import com.facebook.airlift.configuration.ConfigDescription;
import com.facebook.airlift.units.Duration;
import com.facebook.airlift.units.MinDuration;
import com.facebook.airlift.configuration.LegacyConfig;
import com.google.common.base.Splitter;
import com.google.common.base.StandardSystemProperty;
import com.google.common.collect.ImmutableList;
import jakarta.annotation.Nullable;
import jakarta.validation.constraints.NotNull;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import javax.annotation.Nullable;

import java.util.List;
import java.util.Map;

import static java.util.concurrent.TimeUnit.MINUTES;

public class DruidConfig
{
private String coordinatorUrl;
Expand All @@ -41,8 +39,7 @@ public class DruidConfig
private String basicAuthenticationUsername;
private String basicAuthenticationPassword;
private String ingestionStoragePath = StandardSystemProperty.JAVA_IO_TMPDIR.value();
private boolean caseInsensitiveNameMatching;
private Duration caseInsensitiveNameMatchingCacheTtl = new Duration(1, MINUTES);
private boolean caseSensitiveNameMatchingEnabled;

public enum DruidAuthenticationType
{
Expand Down Expand Up @@ -200,29 +197,18 @@ public DruidConfig setIngestionStoragePath(String ingestionStoragePath)
return this;
}

public boolean isCaseInsensitiveNameMatching()
{
return caseInsensitiveNameMatching;
}

@Config("druid.case-insensitive-name-matching")
public DruidConfig setCaseInsensitiveNameMatching(boolean caseInsensitiveNameMatching)
{
this.caseInsensitiveNameMatching = caseInsensitiveNameMatching;
return this;
}

@NotNull
@MinDuration("0ms")
public Duration getCaseInsensitiveNameMatchingCacheTtl()
public boolean isCaseSensitiveNameMatchingEnabled()
{
return caseInsensitiveNameMatchingCacheTtl;
return caseSensitiveNameMatchingEnabled;
}

@Config("druid.case-insensitive-name-matching.cache-ttl")
public DruidConfig setCaseInsensitiveNameMatchingCacheTtl(Duration caseInsensitiveNameMatchingCacheTtl)
@LegacyConfig("case-insensitive-name-matching")
@Config("case-sensitive-name-matching")
@ConfigDescription("Enable case-sensitive matching of schema, table and column names across the connector. " +
"When disabled, names are matched case-insensitively using lowercase normalization.")
public DruidConfig setCaseSensitiveNameMatchingEnabled(boolean caseSensitiveNameMatchingEnabled)
{
this.caseInsensitiveNameMatchingCacheTtl = caseInsensitiveNameMatchingCacheTtl;
this.caseSensitiveNameMatchingEnabled = caseSensitiveNameMatchingEnabled;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ public enum DruidErrorCode
DRUID_UNSUPPORTED_TYPE_ERROR(3, EXTERNAL),
DRUID_PUSHDOWN_UNSUPPORTED_EXPRESSION(4, EXTERNAL),
DRUID_QUERY_GENERATOR_FAILURE(5, EXTERNAL),
DRUID_BROKER_RESULT_ERROR(6, EXTERNAL),
DRUID_AMBIGUOUS_OBJECT_NAME(7, EXTERNAL);
DRUID_BROKER_RESULT_ERROR(6, EXTERNAL);

private final ErrorCode errorCode;

Expand Down
Loading
Loading