Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions presto-base-jdbc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
<artifactId>presto-spi</artifactId>
</dependency>

<!-- Presto Expressions -->
<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-expressions</artifactId>
Expand Down Expand Up @@ -172,6 +173,12 @@
<artifactId>presto-tests</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-parser</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,12 @@ public void destroy()
connectionFactory.close();
}

@Override
public String getIdentifierQuote()
{
return identifierQuote;
}

@Override
public final Set<String> getSchemaNames(JdbcIdentity identity)
{
Expand Down Expand Up @@ -265,7 +271,7 @@ public ConnectorSplitSource getSplits(JdbcIdentity identity, JdbcTableLayoutHand
tableHandle.getSchemaName(),
tableHandle.getTableName(),
layoutHandle.getTupleDomain(),
Optional.empty());
layoutHandle.getAdditionalPredicate());
return new FixedSplitSource(ImmutableList.of(jdbcSplit));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ default boolean schemaExists(JdbcIdentity identity, String schema)
return getSchemaNames(identity).contains(schema);
}

String getIdentifierQuote();

Set<String> getSchemaNames(JdbcIdentity identity);

List<SchemaTableName> getTableNames(JdbcIdentity identity, Optional<String> schema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,20 @@

import com.facebook.airlift.bootstrap.LifeCycleManager;
import com.facebook.airlift.log.Logger;
import com.facebook.presto.plugin.jdbc.optimization.JdbcPlanOptimizerProvider;
import com.facebook.presto.spi.connector.Connector;
import com.facebook.presto.spi.connector.ConnectorAccessControl;
import com.facebook.presto.spi.connector.ConnectorCapabilities;
import com.facebook.presto.spi.connector.ConnectorMetadata;
import com.facebook.presto.spi.connector.ConnectorPageSinkProvider;
import com.facebook.presto.spi.connector.ConnectorPlanOptimizerProvider;
import com.facebook.presto.spi.connector.ConnectorRecordSetProvider;
import com.facebook.presto.spi.connector.ConnectorSplitManager;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import com.facebook.presto.spi.function.FunctionMetadataManager;
import com.facebook.presto.spi.function.StandardFunctionResolution;
import com.facebook.presto.spi.procedure.Procedure;
import com.facebook.presto.spi.relation.RowExpressionService;
import com.facebook.presto.spi.transaction.IsolationLevel;
import com.google.common.collect.ImmutableSet;

Expand Down Expand Up @@ -55,6 +60,10 @@ public class JdbcConnector
private final Set<Procedure> procedures;

private final ConcurrentMap<ConnectorTransactionHandle, JdbcMetadata> transactions = new ConcurrentHashMap<>();
private final FunctionMetadataManager functionManager;
private final StandardFunctionResolution functionResolution;
private final RowExpressionService rowExpressionService;
private final JdbcClient jdbcClient;

@Inject
public JdbcConnector(
Expand All @@ -64,7 +73,11 @@ public JdbcConnector(
JdbcRecordSetProvider jdbcRecordSetProvider,
JdbcPageSinkProvider jdbcPageSinkProvider,
Optional<ConnectorAccessControl> accessControl,
Set<Procedure> procedures)
Set<Procedure> procedures,
FunctionMetadataManager functionManager,
StandardFunctionResolution functionResolution,
RowExpressionService rowExpressionService,
JdbcClient jdbcClient)
{
this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null");
this.jdbcMetadataFactory = requireNonNull(jdbcMetadataFactory, "jdbcMetadataFactory is null");
Expand All @@ -73,6 +86,21 @@ public JdbcConnector(
this.jdbcPageSinkProvider = requireNonNull(jdbcPageSinkProvider, "jdbcPageSinkProvider is null");
this.accessControl = requireNonNull(accessControl, "accessControl is null");
this.procedures = ImmutableSet.copyOf(requireNonNull(procedures, "procedures is null"));
this.functionManager = requireNonNull(functionManager, "functionManager is null");
this.functionResolution = requireNonNull(functionResolution, "functionResolution is null");
this.rowExpressionService = requireNonNull(rowExpressionService, "rowExpressionService is null");
this.jdbcClient = requireNonNull(jdbcClient, "jdbcClient is null");
}

@Override
public ConnectorPlanOptimizerProvider getConnectorPlanOptimizerProvider()
{
return new JdbcPlanOptimizerProvider(
jdbcClient,
functionManager,
functionResolution,
rowExpressionService.getDeterminismEvaluator(),
rowExpressionService.getExpressionOptimizer());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
import com.facebook.presto.spi.connector.Connector;
import com.facebook.presto.spi.connector.ConnectorContext;
import com.facebook.presto.spi.connector.ConnectorFactory;
import com.facebook.presto.spi.function.FunctionMetadataManager;
import com.facebook.presto.spi.function.StandardFunctionResolution;
import com.facebook.presto.spi.relation.RowExpressionService;
import com.google.inject.Injector;
import com.google.inject.Module;

Expand Down Expand Up @@ -62,7 +65,14 @@ public Connector create(String catalogName, Map<String, String> requiredConfig,
requireNonNull(requiredConfig, "requiredConfig is null");

try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
Bootstrap app = new Bootstrap(new JdbcModule(catalogName), module);
Bootstrap app = new Bootstrap(
binder -> {
binder.bind(FunctionMetadataManager.class).toInstance(context.getFunctionMetadataManager());
binder.bind(StandardFunctionResolution.class).toInstance(context.getStandardFunctionResolution());
binder.bind(RowExpressionService.class).toInstance(context.getRowExpressionService());
},
new JdbcModule(catalogName),
module);

Injector injector = app
.strictConfig()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public JdbcTableHandle getTableHandle(ConnectorSession session, SchemaTableName
public List<ConnectorTableLayoutResult> getTableLayouts(ConnectorSession session, ConnectorTableHandle table, Constraint<ColumnHandle> constraint, Optional<Set<ColumnHandle>> desiredColumns)
{
JdbcTableHandle tableHandle = (JdbcTableHandle) table;
ConnectorTableLayout layout = new ConnectorTableLayout(new JdbcTableLayoutHandle(tableHandle, constraint.getSummary()));
ConnectorTableLayout layout = new ConnectorTableLayout(new JdbcTableLayoutHandle(tableHandle, constraint.getSummary(), Optional.empty()));
return ImmutableList.of(new ConnectorTableLayoutResult(layout, constraint.getSummary()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package com.facebook.presto.plugin.jdbc;

import com.facebook.presto.plugin.jdbc.optimization.JdbcExpression;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.HostAddress;
Expand All @@ -36,7 +37,7 @@ public class JdbcSplit
private final String schemaName;
private final String tableName;
private final TupleDomain<ColumnHandle> tupleDomain;
private final Optional<String> additionalPredicate;
private final Optional<JdbcExpression> additionalPredicate;

@JsonCreator
public JdbcSplit(
Expand All @@ -45,7 +46,7 @@ public JdbcSplit(
@JsonProperty("schemaName") @Nullable String schemaName,
@JsonProperty("tableName") String tableName,
@JsonProperty("tupleDomain") TupleDomain<ColumnHandle> tupleDomain,
@JsonProperty("additionalProperty") Optional<String> additionalPredicate)
@JsonProperty("additionalProperty") Optional<JdbcExpression> additionalPredicate)
{
this.connectorId = requireNonNull(connectorId, "connector id is null");
this.catalogName = catalogName;
Expand Down Expand Up @@ -88,7 +89,7 @@ public TupleDomain<ColumnHandle> getTupleDomain()
}

@JsonProperty
public Optional<String> getAdditionalPredicate()
public Optional<JdbcExpression> getAdditionalPredicate()
{
return additionalPredicate;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@
*/
package com.facebook.presto.plugin.jdbc;

import com.facebook.presto.plugin.jdbc.optimization.JdbcExpression;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorTableLayoutHandle;
import com.facebook.presto.spi.predicate.TupleDomain;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.Objects;
import java.util.Optional;

import static java.util.Objects.requireNonNull;

Expand All @@ -28,14 +30,23 @@ public class JdbcTableLayoutHandle
{
private final JdbcTableHandle table;
private final TupleDomain<ColumnHandle> tupleDomain;
private final Optional<JdbcExpression> additionalPredicate;

@JsonCreator
public JdbcTableLayoutHandle(
@JsonProperty("table") JdbcTableHandle table,
@JsonProperty("tupleDomain") TupleDomain<ColumnHandle> domain)
@JsonProperty("tupleDomain") TupleDomain<ColumnHandle> domain,
@JsonProperty("additionalPredicate") Optional<JdbcExpression> additionalPredicate)
{
this.table = requireNonNull(table, "table is null");
this.tupleDomain = requireNonNull(domain, "tupleDomain is null");
this.additionalPredicate = additionalPredicate;
}

@JsonProperty
public Optional<JdbcExpression> getAdditionalPredicate()
{
return additionalPredicate;
}

@JsonProperty
Expand All @@ -61,13 +72,14 @@ public boolean equals(Object o)
}
JdbcTableLayoutHandle that = (JdbcTableLayoutHandle) o;
return Objects.equals(table, that.table) &&
Objects.equals(tupleDomain, that.tupleDomain);
Objects.equals(tupleDomain, that.tupleDomain) &&
Objects.equals(additionalPredicate, that.additionalPredicate);
}

@Override
public int hashCode()
{
return Objects.hash(table, tupleDomain);
return Objects.hash(table, tupleDomain, additionalPredicate);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package com.facebook.presto.plugin.jdbc;

import com.facebook.presto.plugin.jdbc.optimization.JdbcExpression;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.predicate.Domain;
import com.facebook.presto.spi.predicate.Range;
Expand Down Expand Up @@ -102,7 +103,7 @@ public PreparedStatement buildSql(
String table,
List<JdbcColumnHandle> columns,
TupleDomain<ColumnHandle> tupleDomain,
Optional<String> additionalPredicate)
Optional<JdbcExpression> additionalPredicate)
throws SQLException
{
StringBuilder sql = new StringBuilder();
Expand Down Expand Up @@ -133,8 +134,11 @@ public PreparedStatement buildSql(
if (additionalPredicate.isPresent()) {
clauses = ImmutableList.<String>builder()
.addAll(clauses)
.add(additionalPredicate.get())
.add(additionalPredicate.get().getExpression())
.build();
accumulator.addAll(additionalPredicate.get().getBoundConstantValues().stream()
.map(constantExpression -> new TypeAndValue(constantExpression.getType(), constantExpression.getValue()))
.collect(ImmutableList.toImmutableList()));
}
if (!clauses.isEmpty()) {
sql.append(" WHERE ")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
*/
package com.facebook.presto.plugin.jdbc.optimization;

import com.facebook.presto.expressions.LogicalRowExpressions;
import com.facebook.presto.expressions.translator.TranslatedExpression;
import com.facebook.presto.plugin.jdbc.JdbcTableHandle;
import com.facebook.presto.plugin.jdbc.JdbcTableLayoutHandle;
import com.facebook.presto.spi.ConnectorPlanOptimizer;
Expand All @@ -28,24 +30,47 @@
import com.facebook.presto.spi.plan.TableScanNode;
import com.facebook.presto.spi.relation.DeterminismEvaluator;
import com.facebook.presto.spi.relation.ExpressionOptimizer;
import com.facebook.presto.spi.relation.RowExpression;
import com.google.common.collect.ImmutableList;

import java.util.Optional;
import java.util.Set;

import static com.facebook.presto.expressions.translator.FunctionTranslator.buildFunctionTranslator;
import static com.facebook.presto.expressions.translator.RowExpressionTreeTranslator.translateWith;
import static com.facebook.presto.spi.relation.ExpressionOptimizer.Level.OPTIMIZED;
import static java.util.Objects.requireNonNull;

public class JdbcComputePushdown
implements ConnectorPlanOptimizer
{
private final ExpressionOptimizer expressionOptimizer;
private final JdbcFilterToSqlTranslator jdbcFilterToSqlTranslator;
private final LogicalRowExpressions logicalRowExpressions;

public JdbcComputePushdown(
FunctionMetadataManager functionMetadataManager,
StandardFunctionResolution functionResolution,
DeterminismEvaluator determinismEvaluator,
ExpressionOptimizer expressionOptimizer)
ExpressionOptimizer expressionOptimizer,
String identifierQuote,
Set<Class<?>> functionTranslators)
{
this.expressionOptimizer = expressionOptimizer;
requireNonNull(functionMetadataManager, "functionMetadataManager is null");
requireNonNull(identifierQuote, "identifierQuote is null");
requireNonNull(functionTranslators, "functionTranslators is null");
requireNonNull(determinismEvaluator, "determinismEvaluator is null");
requireNonNull(functionResolution, "functionResolution is null");

this.expressionOptimizer = requireNonNull(expressionOptimizer, "expressionOptimizer is null");
this.jdbcFilterToSqlTranslator = new JdbcFilterToSqlTranslator(
functionMetadataManager,
buildFunctionTranslator(functionTranslators),
identifierQuote);
this.logicalRowExpressions = new LogicalRowExpressions(
determinismEvaluator,
functionResolution,
functionMetadataManager);
}

@Override
Expand Down Expand Up @@ -100,18 +125,23 @@ public PlanNode visitFilter(FilterNode node, Void context)
TableHandle oldTableHandle = oldTableScanNode.getTable();
JdbcTableHandle oldConnectorTable = (JdbcTableHandle) oldTableHandle.getConnectorHandle();

// TODO: remove dependency on oldTableLayoutHandle, currently it needs oldTableLayoutHandle to get predicate
if (!oldTableHandle.getLayout().isPresent()) {
RowExpression predicate = expressionOptimizer.optimize(node.getPredicate(), OPTIMIZED, session);
predicate = logicalRowExpressions.convertToConjunctiveNormalForm(predicate);
TranslatedExpression<JdbcExpression> jdbcExpression = translateWith(
predicate,
jdbcFilterToSqlTranslator,
oldTableScanNode.getAssignments());

// TODO if jdbcExpression is not present, walk through translated subtree to find out which parts can be pushed down
if (!oldTableHandle.getLayout().isPresent() || !jdbcExpression.getTranslated().isPresent()) {
return node;
}

// TODO: FilterRowExpression is currently mocked, needs to be implemented

JdbcTableLayoutHandle oldTableLayoutHandle = (JdbcTableLayoutHandle) oldTableHandle.getLayout().get();
// TODO: add pushdownResult to new TableLayoutHandle
JdbcTableLayoutHandle newTableLayoutHandle = new JdbcTableLayoutHandle(
oldConnectorTable,
oldTableLayoutHandle.getTupleDomain());
oldTableLayoutHandle.getTupleDomain(),
jdbcExpression.getTranslated());

TableHandle tableHandle = new TableHandle(
oldTableHandle.getConnectorId(),
Expand Down
Loading