diff --git a/presto-docs/src/main/sphinx/connector/druid.rst b/presto-docs/src/main/sphinx/connector/druid.rst index 2609826141625..4bbc4a74eca97 100644 --- a/presto-docs/src/main/sphinx/connector/druid.rst +++ b/presto-docs/src/main/sphinx/connector/druid.rst @@ -32,14 +32,17 @@ Configuration Properties The following configuration properties are available: -================================== =================================================== -Property Name Description -================================== =================================================== -``druid.coordinator-url`` Druid coordinator url. -``druid.broker-url`` Druid broker url. -``druid.schema-name`` Druid schema name. -``druid.compute-pushdown-enabled`` Whether to pushdown all query processing to Druid. -================================== =================================================== +=================================================== ============================================================ +Property Name Description +=================================================== ============================================================ +``druid.coordinator-url`` Druid coordinator url. +``druid.broker-url`` Druid broker url. +``druid.schema-name`` Druid schema name. +``druid.compute-pushdown-enabled`` Whether to pushdown all query processing to Druid. +``druid.case-insensitive-name-matching`` Match dataset and table names case-insensitively +``druid.case-insensitive-name-matching.cache-ttl`` Duration for which remote dataset and table names will be + cached. Set to ``0ms`` to disable the cache +==================================================== ============================================================ ``druid.coordinator-url`` ^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -65,6 +68,20 @@ Whether to pushdown all query processing to Druid. the default is ``false``. +``druid.case-insensitive-name-matching`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Match dataset and table names case-insensitively. 
+ +The default is ``false``. + +``druid.case-insensitive-name-matching.cache-ttl`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Duration for which remote dataset and table names will be cached. Set to ``0ms`` to disable the cache. + +The default is ``1m``. + Data Types ---------- diff --git a/presto-druid/src/main/java/com/facebook/presto/druid/DruidClient.java b/presto-druid/src/main/java/com/facebook/presto/druid/DruidClient.java index 31578a0d40d1e..ee35e7159ae7e 100644 --- a/presto-druid/src/main/java/com/facebook/presto/druid/DruidClient.java +++ b/presto-druid/src/main/java/com/facebook/presto/druid/DruidClient.java @@ -24,9 +24,16 @@ import com.facebook.presto.druid.metadata.DruidSegmentInfo; import com.facebook.presto.druid.metadata.DruidTableInfo; import com.facebook.presto.spi.PrestoException; +import com.facebook.presto.spi.SchemaTableName; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.CharMatcher; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import io.airlift.units.Duration; +import org.checkerframework.checker.nullness.qual.Nullable; import javax.inject.Inject; @@ -35,7 +42,12 @@ import java.io.InputStream; import java.io.InputStreamReader; import java.net.URI; +import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; import java.util.stream.Collectors; import static com.facebook.airlift.http.client.HttpUriBuilder.uriBuilderFrom; @@ -45,15 +57,20 @@ import static com.facebook.airlift.http.client.StaticBodyGenerator.createStaticBodyGenerator; import static com.facebook.airlift.json.JsonCodec.jsonCodec; import static com.facebook.airlift.json.JsonCodec.listJsonCodec; +import static 
com.facebook.presto.druid.DruidErrorCode.DRUID_AMBIGUOUS_OBJECT_NAME; import static com.facebook.presto.druid.DruidErrorCode.DRUID_BROKER_RESULT_ERROR; import static com.facebook.presto.druid.DruidResultFormat.OBJECT; import static com.facebook.presto.druid.DruidResultFormat.OBJECT_LINES; +import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.Iterables.getOnlyElement; import static com.google.common.net.HttpHeaders.CONTENT_TYPE; import static com.google.common.net.MediaType.JSON_UTF_8; import static java.lang.String.format; import static java.net.HttpURLConnection.HTTP_OK; +import static java.util.Locale.ENGLISH; import static java.util.Objects.requireNonNull; +import static java.util.concurrent.TimeUnit.MILLISECONDS; public class DruidClient { @@ -74,6 +91,8 @@ public class DruidClient private final URI druidCoordinator; private final URI druidBroker; private final String druidSchema; + protected final boolean caseInsensitiveNameMatching; + private final Cache> remoteTables; @Inject public DruidClient(DruidConfig config, @ForDruidClient HttpClient httpClient) @@ -83,6 +102,44 @@ public DruidClient(DruidConfig config, @ForDruidClient HttpClient httpClient) this.druidCoordinator = URI.create(config.getDruidCoordinatorUrl()); this.druidBroker = URI.create(config.getDruidBrokerUrl()); this.druidSchema = config.getDruidSchema(); + this.caseInsensitiveNameMatching = config.isCaseInsensitiveNameMatching(); + + Duration caseInsensitiveNameMatchingCacheTtl = requireNonNull(config.getCaseInsensitiveNameMatchingCacheTtl(), "caseInsensitiveNameMatchingCacheTtl is null"); + CacheBuilder remoteTableNamesCacheBuilder = CacheBuilder.newBuilder() + .expireAfterWrite(caseInsensitiveNameMatchingCacheTtl.toMillis(), MILLISECONDS); + this.remoteTables = remoteTableNamesCacheBuilder.build(); + } + + Optional toRemoteTable(SchemaTableName schemaTableName) + { + 
requireNonNull(schemaTableName, "schemaTableName is null"); + verify(CharMatcher.forPredicate(Character::isUpperCase).matchesNoneOf(schemaTableName.getTableName()), "Expected table name from internal metadata to be lowercase: %s", schemaTableName); + if (!caseInsensitiveNameMatching) { + return Optional.of(RemoteTableObject.of(schemaTableName.getTableName())); + } + + @Nullable Optional remoteTable = remoteTables.getIfPresent(schemaTableName); + if (remoteTable != null) { + return remoteTable; + } + + // Cache miss, reload the cache + Map> mapping = new HashMap<>(); + for (String table : getTables()) { + SchemaTableName cacheKey = new SchemaTableName(getSchema(), table); + mapping.merge( + cacheKey, + Optional.of(RemoteTableObject.of(table)), + (currentValue, collision) -> currentValue.map(current -> current.registerCollision(collision.get().getOnlyRemoteTableName()))); + remoteTables.put(cacheKey, mapping.get(cacheKey)); + } + + // explicitly cache if the requested table doesn't exist + if (!mapping.containsKey(schemaTableName)) { + remoteTables.put(schemaTableName, Optional.empty()); + } + + return mapping.containsKey(schemaTableName) ? 
mapping.get(schemaTableName) : Optional.empty(); } public URI getDruidBroker() @@ -248,4 +305,46 @@ public String toJson() return JsonCodec.jsonCodec(DruidRequestBody.class).toJson(this); } } + + static final class RemoteTableObject + { + private final Set remoteTableNames; + + private RemoteTableObject(Set remoteTableNames) + { + this.remoteTableNames = ImmutableSet.copyOf(remoteTableNames); + } + + public static RemoteTableObject of(String remoteName) + { + return new RemoteTableObject(ImmutableSet.of(remoteName)); + } + + public RemoteTableObject registerCollision(String ambiguousName) + { + return new RemoteTableObject(ImmutableSet.builderWithExpectedSize(remoteTableNames.size() + 1) + .addAll(remoteTableNames) + .add(ambiguousName) + .build()); + } + + public String getAnyRemoteTableName() + { + return Collections.min(remoteTableNames); + } + + public String getOnlyRemoteTableName() + { + if (!isAmbiguous()) { + return getOnlyElement(remoteTableNames); + } + + throw new PrestoException(DRUID_AMBIGUOUS_OBJECT_NAME, "Found ambiguous names in Druid when looking up '" + getAnyRemoteTableName().toLowerCase(ENGLISH) + "': " + remoteTableNames); + } + + public boolean isAmbiguous() + { + return remoteTableNames.size() > 1; + } + } } diff --git a/presto-druid/src/main/java/com/facebook/presto/druid/DruidConfig.java b/presto-druid/src/main/java/com/facebook/presto/druid/DruidConfig.java index 2c423510f9403..a5b6a49d5785e 100644 --- a/presto-druid/src/main/java/com/facebook/presto/druid/DruidConfig.java +++ b/presto-druid/src/main/java/com/facebook/presto/druid/DruidConfig.java @@ -18,6 +18,8 @@ import com.google.common.base.Splitter; import com.google.common.base.StandardSystemProperty; import com.google.common.collect.ImmutableList; +import io.airlift.units.Duration; +import io.airlift.units.MinDuration; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -27,6 +29,8 @@ import java.util.List; import java.util.Map; +import static 
java.util.concurrent.TimeUnit.MINUTES; + public class DruidConfig { private String coordinatorUrl; @@ -38,6 +42,8 @@ public class DruidConfig private String basicAuthenticationUsername; private String basicAuthenticationPassword; private String ingestionStoragePath = StandardSystemProperty.JAVA_IO_TMPDIR.value(); + private boolean caseInsensitiveNameMatching; + private Duration caseInsensitiveNameMatchingCacheTtl = new Duration(1, MINUTES); public enum DruidAuthenticationType { @@ -194,4 +200,30 @@ public DruidConfig setIngestionStoragePath(String ingestionStoragePath) this.ingestionStoragePath = ingestionStoragePath; return this; } + + public boolean isCaseInsensitiveNameMatching() + { + return caseInsensitiveNameMatching; + } + + @Config("druid.case-insensitive-name-matching") + public DruidConfig setCaseInsensitiveNameMatching(boolean caseInsensitiveNameMatching) + { + this.caseInsensitiveNameMatching = caseInsensitiveNameMatching; + return this; + } + + @NotNull + @MinDuration("0ms") + public Duration getCaseInsensitiveNameMatchingCacheTtl() + { + return caseInsensitiveNameMatchingCacheTtl; + } + + @Config("druid.case-insensitive-name-matching.cache-ttl") + public DruidConfig setCaseInsensitiveNameMatchingCacheTtl(Duration caseInsensitiveNameMatchingCacheTtl) + { + this.caseInsensitiveNameMatchingCacheTtl = caseInsensitiveNameMatchingCacheTtl; + return this; + } } diff --git a/presto-druid/src/main/java/com/facebook/presto/druid/DruidErrorCode.java b/presto-druid/src/main/java/com/facebook/presto/druid/DruidErrorCode.java index e776790899d3c..b45ebbd88daee 100644 --- a/presto-druid/src/main/java/com/facebook/presto/druid/DruidErrorCode.java +++ b/presto-druid/src/main/java/com/facebook/presto/druid/DruidErrorCode.java @@ -28,7 +28,8 @@ public enum DruidErrorCode DRUID_UNSUPPORTED_TYPE_ERROR(3, EXTERNAL), DRUID_PUSHDOWN_UNSUPPORTED_EXPRESSION(4, EXTERNAL), DRUID_QUERY_GENERATOR_FAILURE(5, EXTERNAL), - DRUID_BROKER_RESULT_ERROR(6, EXTERNAL); + 
DRUID_BROKER_RESULT_ERROR(6, EXTERNAL), + DRUID_AMBIGUOUS_OBJECT_NAME(7, EXTERNAL); private final ErrorCode errorCode; diff --git a/presto-druid/src/main/java/com/facebook/presto/druid/DruidMetadata.java b/presto-druid/src/main/java/com/facebook/presto/druid/DruidMetadata.java index 8af9d3f18760b..734d9142bdbd8 100644 --- a/presto-druid/src/main/java/com/facebook/presto/druid/DruidMetadata.java +++ b/presto-druid/src/main/java/com/facebook/presto/druid/DruidMetadata.java @@ -13,6 +13,8 @@ */ package com.facebook.presto.druid; +import com.facebook.airlift.log.Logger; +import com.facebook.presto.druid.DruidClient.RemoteTableObject; import com.facebook.presto.druid.ingestion.DruidIngestionTableHandle; import com.facebook.presto.druid.metadata.DruidColumnInfo; import com.facebook.presto.druid.metadata.DruidColumnType; @@ -46,7 +48,6 @@ import java.util.Set; import java.util.stream.Collectors; -import static com.facebook.presto.druid.DruidTableHandle.fromSchemaTableName; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableMap.toImmutableMap; import static java.util.Objects.requireNonNull; @@ -54,6 +55,8 @@ public class DruidMetadata implements ConnectorMetadata { + private static final Logger log = Logger.get(DruidMetadata.class); + private final DruidClient druidClient; @Inject @@ -69,11 +72,15 @@ public List listSchemaNames(ConnectorSession session) } @Override - public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName) + public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTableName schemaTableName) { + String remoteTableName = druidClient.toRemoteTable(schemaTableName) + .map(RemoteTableObject::getOnlyRemoteTableName) + .orElse(schemaTableName.getTableName()); + return druidClient.getTables().stream() - .filter(name -> name.equals(tableName.getTableName())) - .map(name -> fromSchemaTableName(tableName)) + .filter(name -> 
name.equals(remoteTableName)) + .map(name -> new DruidTableHandle(druidClient.getSchema(), remoteTableName, Optional.empty())) .findFirst() .orElse(null); } @@ -106,9 +113,22 @@ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, Connect @Override public List listTables(ConnectorSession session, Optional schemaName) { - return druidClient.getTables().stream() - .map(tableName -> new SchemaTableName(druidClient.getSchema(), tableName)) - .collect(toImmutableList()); + ImmutableList.Builder tableNames = ImmutableList.builder(); + for (String table : druidClient.getTables()) { + // Ignore ambiguous tables + boolean isAmbiguous = druidClient.toRemoteTable(new SchemaTableName(druidClient.getSchema(), table)) + .filter(RemoteTableObject::isAmbiguous) + .isPresent(); + + if (!isAmbiguous) { + tableNames.add(new SchemaTableName(druidClient.getSchema(), table)); + } + else { + log.debug("Filtered out [%s.%s] from list of tables due to ambiguous name", druidClient.getSchema(), table); + } + } + + return tableNames.build(); } @Override @@ -125,7 +145,7 @@ public Map> listTableColumns(ConnectorSess requireNonNull(prefix, "prefix is null"); ImmutableMap.Builder> columns = ImmutableMap.builder(); for (SchemaTableName tableName : listTables(session, prefix)) { - ConnectorTableMetadata tableMetadata = getTableMetadata(session, fromSchemaTableName(tableName)); + ConnectorTableMetadata tableMetadata = getTableMetadata(session, getTableHandle(session, tableName)); if (tableMetadata != null) { columns.put(tableName, tableMetadata.getColumns()); } @@ -173,7 +193,7 @@ public Optional finishCreateTable(ConnectorSession sess private List listTables(ConnectorSession session, SchemaTablePrefix prefix) { if (prefix.getTableName() == null) { - return listTables(session, prefix.getSchemaName()); + return listTables(session, Optional.of(prefix.getSchemaName())); } return ImmutableList.of(prefix.toSchemaTableName()); } diff --git 
a/presto-druid/src/test/java/com/facebook/presto/druid/DruidQueryRunner.java b/presto-druid/src/test/java/com/facebook/presto/druid/DruidQueryRunner.java index 1793da7cbbd4d..35a05f00c5a6a 100644 --- a/presto-druid/src/test/java/com/facebook/presto/druid/DruidQueryRunner.java +++ b/presto-druid/src/test/java/com/facebook/presto/druid/DruidQueryRunner.java @@ -18,6 +18,7 @@ import com.facebook.presto.tests.DistributedQueryRunner; import com.google.common.collect.ImmutableMap; +import java.util.HashMap; import java.util.Map; import static com.facebook.presto.testing.TestingSession.testSessionBuilder; @@ -35,17 +36,17 @@ private DruidQueryRunner() {} private static String broker = "http://localhost:8082"; private static String coordinator = "http://localhost:8081"; - public static DistributedQueryRunner createDruidQueryRunner() + public static DistributedQueryRunner createDruidQueryRunner(Map connectorProperties) throws Exception { DistributedQueryRunner queryRunner = DistributedQueryRunner.builder(createSession()).build(); try { queryRunner.installPlugin(new DruidPlugin()); - Map properties = ImmutableMap.builder() - .put("druid.coordinator-url", coordinator) - .put("druid.broker-url", broker) - .build(); - queryRunner.createCatalog(DEFAULT_CATALOG, "druid", properties); + connectorProperties = new HashMap<>(ImmutableMap.copyOf(connectorProperties)); + connectorProperties.putIfAbsent("druid.coordinator-url", coordinator); + connectorProperties.putIfAbsent("druid.broker-url", broker); + + queryRunner.createCatalog(DEFAULT_CATALOG, "druid", connectorProperties); return queryRunner; } catch (Exception e) { @@ -66,7 +67,7 @@ public static Session createSession() public static void main(String[] args) throws Exception { - DistributedQueryRunner queryRunner = createDruidQueryRunner(); + DistributedQueryRunner queryRunner = createDruidQueryRunner(ImmutableMap.of()); log.info(format("Presto server started: %s", queryRunner.getCoordinator().getBaseUrl())); } } diff --git 
a/presto-druid/src/test/java/com/facebook/presto/druid/TestDruidConfig.java b/presto-druid/src/test/java/com/facebook/presto/druid/TestDruidConfig.java index 70ac7a69db2be..5b5b90043ab32 100644 --- a/presto-druid/src/test/java/com/facebook/presto/druid/TestDruidConfig.java +++ b/presto-druid/src/test/java/com/facebook/presto/druid/TestDruidConfig.java @@ -17,6 +17,7 @@ import com.google.common.base.StandardSystemProperty; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import io.airlift.units.Duration; import org.testng.annotations.Test; import java.util.Map; @@ -25,6 +26,8 @@ import static com.facebook.airlift.configuration.testing.ConfigAssertions.recordDefaults; import static com.facebook.presto.druid.DruidConfig.DruidAuthenticationType.BASIC; import static com.facebook.presto.druid.DruidConfig.DruidAuthenticationType.NONE; +import static java.util.concurrent.TimeUnit.MINUTES; +import static java.util.concurrent.TimeUnit.SECONDS; public class TestDruidConfig { @@ -40,7 +43,9 @@ public void testDefaults() .setDruidAuthenticationType(NONE) .setBasicAuthenticationUsername(null) .setBasicAuthenticationPassword(null) - .setIngestionStoragePath(StandardSystemProperty.JAVA_IO_TMPDIR.value())); + .setIngestionStoragePath(StandardSystemProperty.JAVA_IO_TMPDIR.value()) + .setCaseInsensitiveNameMatching(false) + .setCaseInsensitiveNameMatchingCacheTtl(new Duration(1, MINUTES))); } @Test @@ -56,6 +61,8 @@ public void testExplicitPropertyMappings() .put("druid.basic.authentication.username", "http_basic_username") .put("druid.basic.authentication.password", "http_basic_password") .put("druid.ingestion.storage.path", "hdfs://foo/bar/") + .put("druid.case-insensitive-name-matching", "true") + .put("druid.case-insensitive-name-matching.cache-ttl", "1s") .build(); DruidConfig expected = new DruidConfig() @@ -67,7 +74,9 @@ public void testExplicitPropertyMappings() .setDruidAuthenticationType(BASIC) 
.setBasicAuthenticationUsername("http_basic_username") .setBasicAuthenticationPassword("http_basic_password") - .setIngestionStoragePath("hdfs://foo/bar/"); + .setIngestionStoragePath("hdfs://foo/bar/") + .setCaseInsensitiveNameMatching(true) + .setCaseInsensitiveNameMatchingCacheTtl(new Duration(1, SECONDS)); ConfigAssertions.assertFullMapping(properties, expected); }