Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 25 additions & 8 deletions presto-docs/src/main/sphinx/connector/druid.rst
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,17 @@ Configuration Properties

The following configuration properties are available:

================================== ===================================================
Property Name Description
================================== ===================================================
``druid.coordinator-url`` Druid coordinator url.
``druid.broker-url`` Druid broker url.
``druid.schema-name`` Druid schema name.
``druid.compute-pushdown-enabled`` Whether to pushdown all query processing to Druid.
================================== ===================================================
=================================================== ============================================================
Property Name Description
=================================================== ============================================================
``druid.coordinator-url`` Druid coordinator url.
``druid.broker-url`` Druid broker url.
``druid.schema-name`` Druid schema name.
``druid.compute-pushdown-enabled`` Whether to pushdown all query processing to Druid.
``druid.case-insensitive-name-matching``            Match dataset and table names case-insensitively.
``druid.case-insensitive-name-matching.cache-ttl``  Duration for which remote dataset and table names will be
                                                    cached. Set to ``0ms`` to disable the cache.
=================================================== ============================================================

``druid.coordinator-url``
^^^^^^^^^^^^^^^^^^^^^^^^^
Expand All @@ -65,6 +68,20 @@ Whether to pushdown all query processing to Druid.

the default is ``false``.

``druid.case-insensitive-name-matching``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Match dataset and table names case-insensitively.

The default is ``false``.

``druid.case-insensitive-name-matching.cache-ttl``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Duration for which remote dataset and table names will be cached. Set to ``0ms`` to disable the cache.

The default is ``1m``.

Data Types
----------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,16 @@
import com.facebook.presto.druid.metadata.DruidSegmentInfo;
import com.facebook.presto.druid.metadata.DruidTableInfo;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.SchemaTableName;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.CharMatcher;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.airlift.units.Duration;
import org.checkerframework.checker.nullness.qual.Nullable;

import javax.inject.Inject;

Expand All @@ -35,7 +42,12 @@
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import static com.facebook.airlift.http.client.HttpUriBuilder.uriBuilderFrom;
Expand All @@ -45,15 +57,20 @@
import static com.facebook.airlift.http.client.StaticBodyGenerator.createStaticBodyGenerator;
import static com.facebook.airlift.json.JsonCodec.jsonCodec;
import static com.facebook.airlift.json.JsonCodec.listJsonCodec;
import static com.facebook.presto.druid.DruidErrorCode.DRUID_AMBIGUOUS_OBJECT_NAME;
import static com.facebook.presto.druid.DruidErrorCode.DRUID_BROKER_RESULT_ERROR;
import static com.facebook.presto.druid.DruidResultFormat.OBJECT;
import static com.facebook.presto.druid.DruidResultFormat.OBJECT_LINES;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Iterables.getOnlyElement;
import static com.google.common.net.HttpHeaders.CONTENT_TYPE;
import static com.google.common.net.MediaType.JSON_UTF_8;
import static java.lang.String.format;
import static java.net.HttpURLConnection.HTTP_OK;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

public class DruidClient
{
Expand All @@ -74,6 +91,8 @@ public class DruidClient
private final URI druidCoordinator;
private final URI druidBroker;
private final String druidSchema;
protected final boolean caseInsensitiveNameMatching;
private final Cache<SchemaTableName, Optional<RemoteTableObject>> remoteTables;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume the cache you're using is thread-safe, correct me if I'm wrong.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@beinan Yes, that's right the cache is thread-safe.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool, thanks for the clarification.


@Inject
public DruidClient(DruidConfig config, @ForDruidClient HttpClient httpClient)
Expand All @@ -83,6 +102,44 @@ public DruidClient(DruidConfig config, @ForDruidClient HttpClient httpClient)
this.druidCoordinator = URI.create(config.getDruidCoordinatorUrl());
this.druidBroker = URI.create(config.getDruidBrokerUrl());
this.druidSchema = config.getDruidSchema();
this.caseInsensitiveNameMatching = config.isCaseInsensitiveNameMatching();

Duration caseInsensitiveNameMatchingCacheTtl = requireNonNull(config.getCaseInsensitiveNameMatchingCacheTtl(), "caseInsensitiveNameMatchingCacheTtl is null");
CacheBuilder<Object, Object> remoteTableNamesCacheBuilder = CacheBuilder.newBuilder()
.expireAfterWrite(caseInsensitiveNameMatchingCacheTtl.toMillis(), MILLISECONDS);
this.remoteTables = remoteTableNamesCacheBuilder.build();
}

Optional<RemoteTableObject> toRemoteTable(SchemaTableName schemaTableName)
{
requireNonNull(schemaTableName, "schemaTableName is null");
verify(CharMatcher.forPredicate(Character::isUpperCase).matchesNoneOf(schemaTableName.getTableName()), "Expected table name from internal metadata to be lowercase: %s", schemaTableName);
if (!caseInsensitiveNameMatching) {
return Optional.of(RemoteTableObject.of(schemaTableName.getTableName()));
}

@Nullable Optional<RemoteTableObject> remoteTable = remoteTables.getIfPresent(schemaTableName);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmmm, can we use cache's get(K key,
Callable<? extends V> valueLoader) to avoid returning a null? it's something like if cached, return; otherwise create, cache and return

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@beinan In the current design, to create a new cache entry it involves looping through all the tables present in Druid. If I make use of cacheLoader to create, cache and return, it will result in looping through all the tables every time a new table is accessed as cache will be updated for only the table which is accessed. Whereas in my current implementation, if a cache reload takes place, I load all the tables into the cache which are present in Druid at the instant which means cache will only be reloaded again when a table is accessed which did not exist when the cache was refreshed the last time.

Please let me know your views on the same and correct me in case I misunderstood something.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it, I'm convinced.

if (remoteTable != null) {
return remoteTable;
}

// Cache miss, reload the cache
Map<SchemaTableName, Optional<RemoteTableObject>> mapping = new HashMap<>();
for (String table : getTables()) {
SchemaTableName cacheKey = new SchemaTableName(getSchema(), table);
mapping.merge(
cacheKey,
Optional.of(RemoteTableObject.of(table)),
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the benefit of using an Optional here? can we just put the remoteTableObject as the value directly?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am using Optional.empty() to mark a table which doesn't exist in Druid.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, it make sense. Thank you for the clarification!

(currentValue, collision) -> currentValue.map(current -> current.registerCollision(collision.get().getOnlyRemoteTableName())));
remoteTables.put(cacheKey, mapping.get(cacheKey));
}

// explicitly cache if the requested table doesn't exist
if (!mapping.containsKey(schemaTableName)) {
remoteTables.put(schemaTableName, Optional.empty());
}

return mapping.containsKey(schemaTableName) ? mapping.get(schemaTableName) : Optional.empty();
}

public URI getDruidBroker()
Expand Down Expand Up @@ -248,4 +305,46 @@ public String toJson()
return JsonCodec.jsonCodec(DruidRequestBody.class).toJson(this);
}
}

static final class RemoteTableObject
{
private final Set<String> remoteTableNames;

private RemoteTableObject(Set<String> remoteTableNames)
{
this.remoteTableNames = ImmutableSet.copyOf(remoteTableNames);
}

public static RemoteTableObject of(String remoteName)
{
return new RemoteTableObject(ImmutableSet.of(remoteName));
}

public RemoteTableObject registerCollision(String ambiguousName)
{
return new RemoteTableObject(ImmutableSet.<String>builderWithExpectedSize(remoteTableNames.size() + 1)
.addAll(remoteTableNames)
.add(ambiguousName)
.build());
}

public String getAnyRemoteTableName()
{
return Collections.min(remoteTableNames);
}

public String getOnlyRemoteTableName()
{
if (!isAmbiguous()) {
return getOnlyElement(remoteTableNames);
}

throw new PrestoException(DRUID_AMBIGUOUS_OBJECT_NAME, "Found ambiguous names in Druid when looking up '" + getAnyRemoteTableName().toLowerCase(ENGLISH) + "': " + remoteTableNames);
}

public boolean isAmbiguous()
{
return remoteTableNames.size() > 1;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import com.google.common.base.Splitter;
import com.google.common.base.StandardSystemProperty;
import com.google.common.collect.ImmutableList;
import io.airlift.units.Duration;
import io.airlift.units.MinDuration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

Expand All @@ -27,6 +29,8 @@
import java.util.List;
import java.util.Map;

import static java.util.concurrent.TimeUnit.MINUTES;

public class DruidConfig
{
private String coordinatorUrl;
Expand All @@ -38,6 +42,8 @@ public class DruidConfig
private String basicAuthenticationUsername;
private String basicAuthenticationPassword;
private String ingestionStoragePath = StandardSystemProperty.JAVA_IO_TMPDIR.value();
private boolean caseInsensitiveNameMatching;
private Duration caseInsensitiveNameMatchingCacheTtl = new Duration(1, MINUTES);

public enum DruidAuthenticationType
{
Expand Down Expand Up @@ -194,4 +200,30 @@ public DruidConfig setIngestionStoragePath(String ingestionStoragePath)
this.ingestionStoragePath = ingestionStoragePath;
return this;
}

// Whether dataset and table names are matched against Druid case-insensitively.
// Defaults to false (field initializer); bound to "druid.case-insensitive-name-matching".
public boolean isCaseInsensitiveNameMatching()
{
return caseInsensitiveNameMatching;
}

@Config("druid.case-insensitive-name-matching")
public DruidConfig setCaseInsensitiveNameMatching(boolean caseInsensitiveNameMatching)
{
this.caseInsensitiveNameMatching = caseInsensitiveNameMatching;
// returns this for fluent chaining, matching the other setters in this class
return this;
}

// TTL for the cache of remote (Druid) dataset/table names used by case-insensitive
// matching. Defaults to 1 minute (field initializer); "0ms" disables caching.
// Must be non-null and non-negative (@MinDuration("0ms")).
@NotNull
@MinDuration("0ms")
public Duration getCaseInsensitiveNameMatchingCacheTtl()
{
return caseInsensitiveNameMatchingCacheTtl;
}

@Config("druid.case-insensitive-name-matching.cache-ttl")
public DruidConfig setCaseInsensitiveNameMatchingCacheTtl(Duration caseInsensitiveNameMatchingCacheTtl)
{
this.caseInsensitiveNameMatchingCacheTtl = caseInsensitiveNameMatchingCacheTtl;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ public enum DruidErrorCode
DRUID_UNSUPPORTED_TYPE_ERROR(3, EXTERNAL),
DRUID_PUSHDOWN_UNSUPPORTED_EXPRESSION(4, EXTERNAL),
DRUID_QUERY_GENERATOR_FAILURE(5, EXTERNAL),
DRUID_BROKER_RESULT_ERROR(6, EXTERNAL);
DRUID_BROKER_RESULT_ERROR(6, EXTERNAL),
DRUID_AMBIGUOUS_OBJECT_NAME(7, EXTERNAL);

private final ErrorCode errorCode;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
*/
package com.facebook.presto.druid;

import com.facebook.airlift.log.Logger;
import com.facebook.presto.druid.DruidClient.RemoteTableObject;
import com.facebook.presto.druid.ingestion.DruidIngestionTableHandle;
import com.facebook.presto.druid.metadata.DruidColumnInfo;
import com.facebook.presto.druid.metadata.DruidColumnType;
Expand Down Expand Up @@ -46,14 +48,15 @@
import java.util.Set;
import java.util.stream.Collectors;

import static com.facebook.presto.druid.DruidTableHandle.fromSchemaTableName;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static java.util.Objects.requireNonNull;

public class DruidMetadata
implements ConnectorMetadata
{
private static final Logger log = Logger.get(DruidMetadata.class);

private final DruidClient druidClient;

@Inject
Expand All @@ -69,11 +72,15 @@ public List<String> listSchemaNames(ConnectorSession session)
}

@Override
// NOTE(review): this span is diff residue from the PR view — the first signature line and the
// first filter/map pair are the pre-change variants; the second of each is the current code.
public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName)
public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTableName schemaTableName)
{
// Resolve the lowercase name to the remote Druid name (which may differ in case);
// fall back to the requested name so the stream lookup below yields null for unknown tables.
String remoteTableName = druidClient.toRemoteTable(schemaTableName)
.map(RemoteTableObject::getOnlyRemoteTableName)
.orElse(schemaTableName.getTableName());

return druidClient.getTables().stream()
.filter(name -> name.equals(tableName.getTableName()))
.map(name -> fromSchemaTableName(tableName))
.filter(name -> name.equals(remoteTableName))
.map(name -> new DruidTableHandle(druidClient.getSchema(), remoteTableName, Optional.empty()))
.findFirst()
.orElse(null);
}
Expand Down Expand Up @@ -106,9 +113,22 @@ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, Connect
@Override
public List<SchemaTableName> listTables(ConnectorSession session, Optional<String> schemaName)
{
    // NOTE(review): schemaName is not consulted here — Druid exposes a single schema
    // via druidClient.getSchema(); confirm this is intentional.
    ImmutableList.Builder<SchemaTableName> visibleTables = ImmutableList.builder();
    for (String table : druidClient.getTables()) {
        SchemaTableName candidate = new SchemaTableName(druidClient.getSchema(), table);
        // Tables whose lowercase name maps to several remote Druid tables are unresolvable
        // by name, so they are hidden from listings rather than reported.
        boolean ambiguous = druidClient.toRemoteTable(candidate)
                .filter(RemoteTableObject::isAmbiguous)
                .isPresent();
        if (ambiguous) {
            log.debug("Filtered out [%s.%s] from list of tables due to ambiguous name", druidClient.getSchema(), table);
            continue;
        }
        visibleTables.add(candidate);
    }

    return visibleTables.build();
}

@Override
Expand All @@ -125,7 +145,7 @@ public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSess
requireNonNull(prefix, "prefix is null");
ImmutableMap.Builder<SchemaTableName, List<ColumnMetadata>> columns = ImmutableMap.builder();
for (SchemaTableName tableName : listTables(session, prefix)) {
ConnectorTableMetadata tableMetadata = getTableMetadata(session, fromSchemaTableName(tableName));
ConnectorTableMetadata tableMetadata = getTableMetadata(session, getTableHandle(session, tableName));
if (tableMetadata != null) {
columns.put(tableName, tableMetadata.getColumns());
}
Expand Down Expand Up @@ -173,7 +193,7 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession sess
// Expands a table prefix into concrete table names; delegates to listTables(session, schema)
// when the prefix carries no table name.
private List<SchemaTableName> listTables(ConnectorSession session, SchemaTablePrefix prefix)
{
if (prefix.getTableName() == null) {
// NOTE(review): the next two lines are pre-/post-change variants from the diff view; only the
// Optional.of(...) call is the current code. Optional.of throws NPE if getSchemaName() is
// null — presumably a prefix without a table name always has a schema; verify against callers.
return listTables(session, prefix.getSchemaName());
return listTables(session, Optional.of(prefix.getSchemaName()));
}
return ImmutableList.of(prefix.toSchemaTableName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.facebook.presto.tests.DistributedQueryRunner;
import com.google.common.collect.ImmutableMap;

import java.util.HashMap;
import java.util.Map;

import static com.facebook.presto.testing.TestingSession.testSessionBuilder;
Expand All @@ -35,17 +36,17 @@ private DruidQueryRunner() {}
private static String broker = "http://localhost:8082";
private static String coordinator = "http://localhost:8081";

public static DistributedQueryRunner createDruidQueryRunner()
public static DistributedQueryRunner createDruidQueryRunner(Map<String, String> connectorProperties)
throws Exception
{
DistributedQueryRunner queryRunner = DistributedQueryRunner.builder(createSession()).build();
try {
queryRunner.installPlugin(new DruidPlugin());
Map<String, String> properties = ImmutableMap.<String, String>builder()
.put("druid.coordinator-url", coordinator)
.put("druid.broker-url", broker)
.build();
queryRunner.createCatalog(DEFAULT_CATALOG, "druid", properties);
connectorProperties = new HashMap<>(ImmutableMap.copyOf(connectorProperties));
connectorProperties.putIfAbsent("druid.coordinator-url", coordinator);
connectorProperties.putIfAbsent("druid.broker-url", broker);

queryRunner.createCatalog(DEFAULT_CATALOG, "druid", connectorProperties);
return queryRunner;
}
catch (Exception e) {
Expand All @@ -66,7 +67,7 @@ public static Session createSession()
// Starts a standalone Druid-backed query runner for local debugging and logs its URL.
public static void main(String[] args)
throws Exception
{
// NOTE(review): the next two lines are pre-/post-change variants captured by the diff view;
// only the second (passing ImmutableMap.of()) is the current code.
DistributedQueryRunner queryRunner = createDruidQueryRunner();
DistributedQueryRunner queryRunner = createDruidQueryRunner(ImmutableMap.of());
log.info(format("Presto server started: %s", queryRunner.getCoordinator().getBaseUrl()));
}
}
Loading