Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@
import static com.google.common.base.MoreObjects.toStringHelper;
import static java.util.Objects.requireNonNull;

final class RemoteTableNameCacheKey
public final class RemoteTableNameCacheKey
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please create a GitHub issue here mentioning the reason why you did it this way and what needs to be fixed before we can do it the "correct" way.

Add a TODO comment here referring to that issue so that we can eventually clean up the code instead of changing the API in non-needed ways.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same TODO over the now public constructor.

{
private final JdbcIdentity identity;
private final String schema;

RemoteTableNameCacheKey(JdbcIdentity identity, String schema)
public RemoteTableNameCacheKey(JdbcIdentity identity, String schema)
{
this.identity = requireNonNull(identity, "identity is null");
this.schema = requireNonNull(schema, "schema is null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,22 @@
*/
package io.trino.plugin.druid;

import com.google.common.base.CharMatcher;
import com.google.common.collect.ImmutableList;
import io.trino.plugin.jdbc.BaseJdbcClient;
import io.trino.plugin.jdbc.BaseJdbcConfig;
import io.trino.plugin.jdbc.ColumnMapping;
import io.trino.plugin.jdbc.ConnectionFactory;
import io.trino.plugin.jdbc.JdbcColumnHandle;
import io.trino.plugin.jdbc.JdbcIdentity;
import io.trino.plugin.jdbc.JdbcNamedRelationHandle;
import io.trino.plugin.jdbc.JdbcOutputTableHandle;
import io.trino.plugin.jdbc.JdbcSplit;
import io.trino.plugin.jdbc.JdbcTableHandle;
import io.trino.plugin.jdbc.JdbcTypeHandle;
import io.trino.plugin.jdbc.PreparedQuery;
import io.trino.plugin.jdbc.RemoteTableName;
import io.trino.plugin.jdbc.RemoteTableNameCacheKey;
import io.trino.plugin.jdbc.WriteFunction;
import io.trino.plugin.jdbc.WriteMapping;
import io.trino.spi.TrinoException;
Expand All @@ -51,12 +54,15 @@
import java.util.function.BiFunction;
import java.util.stream.Collectors;

import static com.google.common.base.MoreObjects.firstNonNull;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.Iterables.getOnlyElement;
import static io.trino.plugin.jdbc.JdbcErrorCode.JDBC_ERROR;
import static io.trino.plugin.jdbc.StandardColumnMappings.defaultVarcharColumnMapping;
import static io.trino.plugin.jdbc.StandardColumnMappings.varcharColumnMapping;
import static io.trino.spi.type.VarcharType.createUnboundedVarcharType;
import static java.util.Objects.requireNonNull;

public class DruidJdbcClient
extends BaseJdbcClient
Expand Down Expand Up @@ -85,9 +91,10 @@ protected Collection<String> listSchemas(Connection connection)
public Optional<JdbcTableHandle> getTableHandle(ConnectorSession session, SchemaTableName schemaTableName)
{
try (Connection connection = connectionFactory.openConnection(session)) {
String jdbcSchemaName = schemaTableName.getSchemaName();
String jdbcTableName = schemaTableName.getTableName();
try (ResultSet resultSet = getTables(connection, Optional.of(jdbcSchemaName), Optional.of(jdbcTableName))) {
JdbcIdentity identity = JdbcIdentity.from(session);
String remoteSchema = toRemoteSchemaName(identity, connection, schemaTableName.getSchemaName());
String remoteTable = toRemoteTableName(identity, connection, remoteSchema, schemaTableName.getTableName());
try (ResultSet resultSet = getTables(connection, Optional.of(remoteSchema), Optional.of(remoteTable))) {
List<JdbcTableHandle> tableHandles = new ArrayList<>();
while (resultSet.next()) {
tableHandles.add(new JdbcTableHandle(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
tableHandles.add(new JdbcTableHandle(
tableHandles.add(new JdbcTableHandle(
schemaTableName,
getRemoteTable(resultSet));
/**
 * Builds the remote table identity from a {@code DatabaseMetaData.getTables} row.
 * The catalog is always Druid's single fixed catalog; the schema column may be
 * absent in the metadata, hence the nullable wrapping.
 */
private static RemoteTableName getRemoteTable(ResultSet resultSet)
{
    Optional<String> remoteSchema = Optional.ofNullable(resultSet.getString("TABLE_SCHEM"));
    String remoteTable = resultSet.getString("TABLE_NAME");
    return new RemoteTableName(Optional.of(DRUID_CATALOG), remoteSchema, remoteTable);
}

Expand All @@ -99,14 +106,15 @@ public Optional<JdbcTableHandle> getTableHandle(ConnectorSession session, Schema
if (tableHandles.isEmpty()) {
return Optional.empty();
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: revert whitespace changes.

return Optional.of(
Comment thread
dheerajkulakarni marked this conversation as resolved.
getOnlyElement(
tableHandles
.stream()
.filter(
jdbcTableHandle ->
Objects.equals(jdbcTableHandle.getSchemaName(), schemaTableName.getSchemaName())
&& Objects.equals(jdbcTableHandle.getTableName(), schemaTableName.getTableName()))
Objects.equals(jdbcTableHandle.getSchemaName(), remoteSchema)
&& Objects.equals(jdbcTableHandle.getTableName(), remoteTable))
.collect(Collectors.toList())));
}
}
Expand Down Expand Up @@ -136,6 +144,68 @@ protected ResultSet getTables(Connection connection, Optional<String> schemaName
null);
}

@Override
protected String toRemoteSchemaName(JdbcIdentity identity, Connection connection, String schemaName)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a TODO comment here and toRemoteTableName.
Mention what has been changed from the original method in the comment so that it's easier to make sure code is equivalent when this override can be removed.

{
requireNonNull(schemaName, "schemaName is null");
verify(CharMatcher.forPredicate(Character::isUpperCase).matchesNoneOf(schemaName), "Expected schema name from internal metadata to be lowercase: %s", schemaName);

if (caseInsensitiveNameMatching) {
try {
Map<String, String> mapping = remoteSchemaNames.getIfPresent(identity);
if (mapping != null && !mapping.containsKey(schemaName)) {
// This might be a schema that has just been created. Force reload.
mapping = null;
}
if (mapping == null) {
mapping = listSchemasByLowerCase(connection);
remoteSchemaNames.put(identity, mapping);
}
String remoteSchema = mapping.get(schemaName);
if (remoteSchema != null) {
return remoteSchema;
}
}
catch (RuntimeException e) {
throw new TrinoException(JDBC_ERROR, "Failed to find remote schema name: " + firstNonNull(e.getMessage(), e), e);
}
}

return schemaName;
}

// TODO: presumably a copy of BaseJdbcClient#toRemoteTableName (see the matching override of
// toRemoteSchemaName above) — confirm equivalence and remove this override once the base
// implementation can be reused directly.
@Override
protected String toRemoteTableName(JdbcIdentity identity, Connection connection, String remoteSchema, String tableName)
{
requireNonNull(remoteSchema, "remoteSchema is null");
requireNonNull(tableName, "tableName is null");
// Internal metadata always hands us lower-cased identifiers; anything else is a bug upstream.
verify(CharMatcher.forPredicate(Character::isUpperCase).matchesNoneOf(tableName), "Expected table name from internal metadata to be lowercase: %s", tableName);

if (caseInsensitiveNameMatching) {
try {
// Lower-case-to-remote-name mappings are cached per (identity, schema) pair.
RemoteTableNameCacheKey cacheKey = new RemoteTableNameCacheKey(identity, remoteSchema);
Map<String, String> mapping = remoteTableNames.getIfPresent(cacheKey);
if (mapping != null && !mapping.containsKey(tableName)) {
// This might be a table that has just been created. Force reload.
mapping = null;
}
if (mapping == null) {
mapping = listTablesByLowerCase(connection, remoteSchema);
remoteTableNames.put(cacheKey, mapping);
}
String remoteTable = mapping.get(tableName);
if (remoteTable != null) {
return remoteTable;
}
}
catch (RuntimeException e) {
throw new TrinoException(JDBC_ERROR, "Failed to find remote table name: " + firstNonNull(e.getMessage(), e), e);
}
}

// No mapping found (or matching disabled): fall back to the name as given.
return tableName;
}

@Override
public Optional<ColumnMapping> toColumnMapping(ConnectorSession session, Connection connection, JdbcTypeHandle typeHandle)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ public class DruidQueryRunner
{
private DruidQueryRunner() {}

public static DistributedQueryRunner createDruidQueryRunnerTpch(TestingDruidServer testingDruidServer, Map<String, String> extraProperties)
public static DistributedQueryRunner createDruidQueryRunnerTpch(
TestingDruidServer testingDruidServer,
Map<String, String> extraProperties,
Map<String, String> connectorProperties)
throws Exception
{
DistributedQueryRunner queryRunner = null;
Expand All @@ -50,7 +53,7 @@ public static DistributedQueryRunner createDruidQueryRunnerTpch(TestingDruidServ
queryRunner.installPlugin(new TpchPlugin());
queryRunner.createCatalog("tpch", "tpch");

Map<String, String> connectorProperties = new HashMap<>();
connectorProperties = new HashMap<>(ImmutableMap.copyOf(connectorProperties));
connectorProperties.putIfAbsent("connection-url", testingDruidServer.getJdbcUrl());
queryRunner.installPlugin(new DruidJdbcPlugin());
queryRunner.createCatalog("druid", "druid", connectorProperties);
Expand All @@ -62,6 +65,15 @@ public static DistributedQueryRunner createDruidQueryRunnerTpch(TestingDruidServ
}
}

public static void copyAndIngestTpchData(MaterializedResult rows, TestingDruidServer testingDruidServer,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like if the method name was more clear but can't think of anything better.

We are basically taking an existing index task, changing the destination name and ingesting under the new name. Something like a CREATE TABLE AS SELECT * FROM file.

String sourceDatasource, String targetDatasource)
throws IOException, InterruptedException
{
String tsvFileLocation = format("%s/%s.tsv", testingDruidServer.getHostWorkingDirectory(), targetDatasource);
writeDataAsTsv(rows, tsvFileLocation);
testingDruidServer.ingestData(targetDatasource, getIngestionSpecFileName(sourceDatasource), tsvFileLocation);
}

public static void copyAndIngestTpchData(MaterializedResult rows, TestingDruidServer testingDruidServer, String druidDatasource)
throws IOException, InterruptedException
{
Expand Down Expand Up @@ -109,7 +121,7 @@ public static void main(String[] args)

DistributedQueryRunner queryRunner = createDruidQueryRunnerTpch(
new TestingDruidServer(),
ImmutableMap.of("http-server.http.port", "8080"));
ImmutableMap.of("http-server.http.port", "8080"), ImmutableMap.of());
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: break arg on newline since existing ones are on new lines.


Logger log = Logger.get(DruidQueryRunner.class);
log.info("======== SERVER STARTED ========");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.druid;

import com.google.common.collect.ImmutableMap;
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.DistributedQueryRunner;
import io.trino.testing.MaterializedResult;
import io.trino.testing.QueryRunner;
import io.trino.testing.assertions.Assert;
import org.testng.annotations.Test;

import java.io.IOException;

import static io.trino.plugin.druid.BaseDruidIntegrationSmokeTest.SELECT_FROM_ORDERS;
import static io.trino.plugin.druid.BaseDruidIntegrationSmokeTest.SELECT_FROM_REGION;
import static io.trino.plugin.druid.DruidQueryRunner.copyAndIngestTpchData;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

@Test(singleThreaded = true)
public class TestDruidCaseInsensitiveMatch
extends AbstractTestQueryFramework
{
private TestingDruidServer druidServer;

@Override
protected QueryRunner createQueryRunner()
throws Exception
{
druidServer = new TestingDruidServer();
closeAfterClass(() -> {
druidServer.close();
druidServer = null;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea but I don't think it matters in tests.

});
DistributedQueryRunner queryRunner = DruidQueryRunner.createDruidQueryRunnerTpch(
druidServer, ImmutableMap.of(), ImmutableMap.of("case-insensitive-name-matching", "true"));
copyAndIngestTpchData(queryRunner.execute(SELECT_FROM_ORDERS + " LIMIT 10"), this.druidServer, "orders", "CamelCase");
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit and very pedantic: Even though CamelCase happens to be mixed case we are testing behaviour with mixed case names.

Suggested change
copyAndIngestTpchData(queryRunner.execute(SELECT_FROM_ORDERS + " LIMIT 10"), this.druidServer, "orders", "CamelCase");
copyAndIngestTpchData(queryRunner.execute(SELECT_FROM_ORDERS + " LIMIT 10"), this.druidServer, "orders", "MiXeD_CaSe");

return queryRunner;
}

@Test
public void testNonLowerCaseTableName()
{
MaterializedResult expectedColumns = MaterializedResult.resultBuilder(getQueryRunner().getDefaultSession(), VARCHAR, VARCHAR, VARCHAR, VARCHAR)
.row("__time", "timestamp(3)", "", "")
.row("clerk", "varchar", "", "") // String columns are reported only as varchar
.row("comment", "varchar", "", "")
.row("custkey", "bigint", "", "") // Long columns are reported as bigint
.row("orderdate", "varchar", "", "")
.row("orderkey", "bigint", "", "")
.row("orderpriority", "varchar", "", "")
.row("orderstatus", "varchar", "", "")
.row("shippriority", "bigint", "", "") // Druid doesn't support int type
.row("totalprice", "double", "", "")
.build();
MaterializedResult actualColumns = computeActual("DESCRIBE " + "CamelCase");
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for string concat here.

Assert.assertEquals(actualColumns, expectedColumns);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: static import.

MaterializedResult materializedRows = computeActual("SELECT * FROM druid.druid.CAMELCASE");
Assert.assertEquals(materializedRows.getRowCount(), 10);
MaterializedResult materializedRows1 = computeActual("SELECT * FROM druid.CamelCase");
MaterializedResult materializedRows2 = computeActual("SELECT * FROM druid.camelcase");
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can be simplified using assertQuery("SELECT COUNT(1) FROM druid.druid.camelcase", "VALUES 10")

assertThat(materializedRows.equals(materializedRows1));
assertThat(materializedRows.equals(materializedRows2));
}

@Test
public void testTableNameClash()
throws IOException, InterruptedException
{
try {
//ingesting data with already existing table name in lowercase which should fail
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd expect ingestion to work since Druid is case-sensitive. But querying such tables should fail from Trino.

copyAndIngestTpchData(getQueryRunner().execute(SELECT_FROM_REGION + " LIMIT 10"), this.druidServer, "region", "camelcase");
}
catch (AssertionError e) {
Assert.assertEquals(e.getMessage(), "Datasource camelcase not loaded expected [true] but found [false]");
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you share the entire stack trace?

This exception itself can also happen if something fails on Druid end rather than our side.

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ protected QueryRunner createQueryRunner()
throws Exception
{
this.druidServer = new TestingDruidServer();
QueryRunner runner = DruidQueryRunner.createDruidQueryRunnerTpch(druidServer, ImmutableMap.of());
QueryRunner runner = DruidQueryRunner.createDruidQueryRunnerTpch(druidServer, ImmutableMap.of(), ImmutableMap.of());
copyAndIngestTpchData(runner.execute(SELECT_FROM_ORDERS), this.druidServer, ORDERS.getTableName());
copyAndIngestTpchData(runner.execute(SELECT_FROM_LINEITEM), this.druidServer, LINE_ITEM.getTableName());
copyAndIngestTpchData(runner.execute(SELECT_FROM_NATION), this.druidServer, NATION.getTableName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ protected QueryRunner createQueryRunner()
throws Exception
{
this.druidServer = new TestingDruidServer(LATEST_DRUID_DOCKER_IMAGE);
QueryRunner runner = DruidQueryRunner.createDruidQueryRunnerTpch(druidServer, ImmutableMap.of());
QueryRunner runner = DruidQueryRunner.createDruidQueryRunnerTpch(druidServer, ImmutableMap.of(), ImmutableMap.of());
copyAndIngestTpchData(runner.execute(SELECT_FROM_ORDERS), this.druidServer, ORDERS.getTableName());
copyAndIngestTpchData(runner.execute(SELECT_FROM_LINEITEM), this.druidServer, LINE_ITEM.getTableName());
copyAndIngestTpchData(runner.execute(SELECT_FROM_NATION), this.druidServer, NATION.getTableName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ void ingestData(String datasource, String indexTaskFile, String dataFilePath)
middleManager.withCopyFileToContainer(forHostPath(dataFilePath),
getMiddleManagerContainerPathForDataFile(dataFilePath));
String indexTask = Resources.toString(getResource(indexTaskFile), Charset.defaultCharset());

indexTask = getReplacedIndexTask(datasource, indexTask);
Request.Builder requestBuilder = new Request.Builder();
requestBuilder.addHeader("content-type", "application/json;charset=utf-8")
.url("http://localhost:" + getCoordinatorOverlordPort() + "/druid/indexer/v1/task")
Expand All @@ -234,6 +234,13 @@ void ingestData(String datasource, String indexTaskFile, String dataFilePath)
}
}

private String getReplacedIndexTask(String targetDataSource, String indexTask)
{
indexTask = indexTask.replaceAll("dataSource\":.*,", "dataSource\": \"" + targetDataSource + "\",");
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Works until it breaks.
Can we read the indexTask as a JsonNode and replace the two nodes we are interested in and return the modified JsonNode back as a string?

indexTask = indexTask.replaceAll("filter\":.*", "filter\": \"" + targetDataSource + ".tsv\"");
return indexTask;
}

private boolean checkDatasourceAvailable(String datasource)
throws IOException, InterruptedException
{
Expand Down