Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ private void setUpTableFromTpch(String tableName)
@Test
public void testEmptyTable()
{
String tableName = "test_stats_table_empty_" + randomTableSuffix();
String tableName = "test_empty_" + randomTableSuffix();
computeActual(format("CREATE TABLE %s AS SELECT orderkey, custkey, orderpriority, comment FROM tpch.tiny.orders WHERE false", tableName));
try {
gatherStats(tableName);
Expand Down
5 changes: 5 additions & 0 deletions plugin/trino-oracle/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>org.jdbi</groupId>
<artifactId>jdbi3-core</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>log-manager</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import io.trino.plugin.jdbc.JdbcColumnHandle;
import io.trino.plugin.jdbc.JdbcExpression;
import io.trino.plugin.jdbc.JdbcJoinCondition;
import io.trino.plugin.jdbc.JdbcSortItem;
import io.trino.plugin.jdbc.JdbcStatisticsConfig;
import io.trino.plugin.jdbc.JdbcTableHandle;
import io.trino.plugin.jdbc.JdbcTypeHandle;
import io.trino.plugin.jdbc.LongReadFunction;
Expand Down Expand Up @@ -56,13 +58,19 @@
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.JoinCondition;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.statistics.ColumnStatistics;
import io.trino.spi.statistics.Estimate;
import io.trino.spi.statistics.TableStatistics;
import io.trino.spi.type.CharType;
import io.trino.spi.type.DecimalType;
import io.trino.spi.type.Decimals;
import io.trino.spi.type.Type;
import io.trino.spi.type.VarcharType;
import oracle.jdbc.OraclePreparedStatement;
import oracle.jdbc.OracleTypes;
import org.jdbi.v3.core.Handle;
import org.jdbi.v3.core.Jdbi;

import javax.inject.Inject;

Expand All @@ -85,7 +93,10 @@
import java.util.Set;
import java.util.function.BiFunction;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Throwables.throwIfInstanceOf;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static io.airlift.slice.Slices.utf8Slice;
import static io.airlift.slice.Slices.wrappedBuffer;
import static io.trino.plugin.jdbc.JdbcErrorCode.JDBC_ERROR;
Expand Down Expand Up @@ -142,6 +153,7 @@
import static java.lang.String.format;
import static java.util.Locale.ENGLISH;
import static java.util.concurrent.TimeUnit.DAYS;
import static java.util.function.Function.identity;

public class OracleClient
extends BaseJdbcClient
Expand Down Expand Up @@ -198,20 +210,23 @@ public class OracleClient
.buildOrThrow();

private final boolean synonymsEnabled;
private final boolean statisticsEnabled;
private final ConnectorExpressionRewriter<String> connectorExpressionRewriter;
private final AggregateFunctionRewriter<JdbcExpression, String> aggregateFunctionRewriter;

@Inject
public OracleClient(
BaseJdbcConfig config,
OracleConfig oracleConfig,
JdbcStatisticsConfig statisticsConfig,
ConnectionFactory connectionFactory,
QueryBuilder queryBuilder,
IdentifierMapping identifierMapping)
{
super(config, "\"", connectionFactory, queryBuilder, identifierMapping);

this.synonymsEnabled = oracleConfig.isSynonymsEnabled();
this.statisticsEnabled = requireNonNull(statisticsConfig, "statisticsConfig is null").isEnabled();

this.connectorExpressionRewriter = JdbcConnectorExpressionRewriterBuilder.newBuilder()
.addStandardRules(this::quoted)
Expand Down Expand Up @@ -470,12 +485,109 @@ public boolean isLimitGuaranteed(ConnectorSession session)
return true;
}

/**
 * Reports whether TopN can be pushed down for the given table and sort order.
 *
 * This implementation answers {@code true} unconditionally; none of the arguments are inspected.
 */
@Override
public boolean supportsTopN(ConnectorSession session, JdbcTableHandle handle, List<JdbcSortItem> sortOrder)
{
return true;
}

/**
 * Supplies the function used to render a TopN clause.
 *
 * @return the SQL-standard TopN function, configured with this client's identifier quoting
 */
@Override
protected Optional<TopNFunction> topNFunction()
{
    // NOTE: The syntax used here is supported since Oracle 12c (older releases are not supported by Oracle)
    TopNFunction sqlStandardTopN = TopNFunction.sqlStandard(this::quoted);
    return Optional.of(sqlStandardTopN);
}

/**
 * Reports whether a pushed-down TopN is guaranteed.
 *
 * This implementation answers {@code true} unconditionally; the session is not inspected.
 */
@Override
public boolean isTopNGuaranteed(ConnectorSession session)
{
return true;
}

/**
 * Decides whether a join condition is supported.
 *
 * Every operator is accepted except {@code IS_DISTINCT_FROM}; the session is not inspected.
 *
 * @param joinCondition the join condition whose operator is examined
 * @return {@code false} only when the operator is {@code IS_DISTINCT_FROM}
 */
@Override
protected boolean isSupportedJoinCondition(ConnectorSession session, JdbcJoinCondition joinCondition)
{
    boolean isDistinctFrom = joinCondition.getOperator() == JoinCondition.Operator.IS_DISTINCT_FROM;
    return !isDistinctFrom;
}

/**
 * Returns statistics for the given table handle.
 *
 * Empty statistics are returned when statistics are disabled or when the handle is not a named relation.
 * The {@code tupleDomain} argument is not used by this implementation.
 *
 * @throws TrinoException a {@code TrinoException} raised while reading is rethrown as-is; any other
 *         {@code SQLException} or {@code RuntimeException} is wrapped in a {@code JDBC_ERROR} TrinoException
 */
@Override
public TableStatistics getTableStatistics(ConnectorSession session, JdbcTableHandle handle, TupleDomain<ColumnHandle> tupleDomain)
{
    // Short-circuit keeps the original evaluation order: the handle is only consulted when statistics are enabled
    boolean statisticsAvailable = statisticsEnabled && handle.isNamedRelation();
    if (!statisticsAvailable) {
        return TableStatistics.empty();
    }

    try {
        return readTableStatistics(session, handle);
    }
    catch (SQLException | RuntimeException failure) {
        // Let TrinoException pass through untouched; everything else is reported as a JDBC error
        throwIfInstanceOf(failure, TrinoException.class);
        throw new TrinoException(JDBC_ERROR, "Failed fetching statistics for table: " + handle, failure);
    }
}

/**
 * Reads table-level and column-level statistics for a named relation from Oracle's dictionary views.
 *
 * Returns empty statistics when no row count is available, and row-count-only statistics when the
 * row count is zero. Otherwise, column statistics are attached for every column of the table for
 * which the dictionary returned a row.
 *
 * @param session the session used to open the connection and to list the table's columns
 * @param table the table handle; must be a named relation
 * @return the statistics assembled from the dictionary
 * @throws SQLException if opening or closing the connection fails
 * @throws IllegalArgumentException if {@code table} is not a named relation
 */
private TableStatistics readTableStatistics(ConnectorSession session, JdbcTableHandle table)
        throws SQLException
{
    checkArgument(table.isNamedRelation(), "Relation is not a table: %s", table);

    try (Connection connection = connectionFactory.openConnection(session);
            Handle handle = Jdbi.open(connection)) {
        StatisticsDao statisticsDao = new StatisticsDao(handle);

        Long rowCount = statisticsDao.getRowCount(table.getSchemaName(), table.getTableName());
        if (rowCount == null) {
            return TableStatistics.empty();
        }

        TableStatistics.Builder tableStatistics = TableStatistics.builder();
        tableStatistics.setRowCount(Estimate.of(rowCount));

        if (rowCount == 0) {
            // Nothing meaningful can be derived per column; also avoids dividing by zero below
            return tableStatistics.build();
        }

        Map<String, ColumnStatisticsResult> columnStatistics = statisticsDao.getColumnStatistics(table.getSchemaName(), table.getTableName()).stream()
                .collect(toImmutableMap(ColumnStatisticsResult::getColumnName, identity()));

        for (JdbcColumnHandle column : this.getColumns(session, table)) {
            ColumnStatisticsResult result = columnStatistics.get(column.getColumnName());
            if (result == null) {
                continue;
            }
            tableStatistics.setColumnStatistics(column, toOracleColumnStatistics(result, rowCount));
        }

        return tableStatistics.build();
    }
}

/**
 * Converts one row of dictionary column statistics into Trino column statistics.
 *
 * @param result the raw per-column values read from the dictionary
 * @param rowCount the table row count; callers must pass a non-zero value
 */
private static ColumnStatistics toOracleColumnStatistics(ColumnStatisticsResult result, long rowCount)
{
    return ColumnStatistics.builder()
            .setNullsFraction(result.getNullsCount()
                    // The row count and the nulls count are read by two separate dictionary queries, so the
                    // nulls count may exceed the row count. Cap the fraction at 1.0 so that an inconsistent
                    // pair degrades to "all nulls" instead of producing an out-of-range fraction.
                    .map(nullsCount -> Estimate.of(Math.min(1.0, 1.0 * nullsCount / rowCount)))
                    .orElseGet(Estimate::unknown))
            .setDistinctValuesCount(result.getDistinctValuesCount()
                    .map(Estimate::of)
                    .orElseGet(Estimate::unknown))
            .setDataSize(result.getAverageColumnLength()
                    /*
                     * ALL_TAB_COLUMNS.AVG_COL_LEN is hard to interpret precisely:
                     * - it can be `0` for all-null column
                     * - it can be `len+1` for varchar column filled with constant of length `len`, as if each row contained a is-null byte or length
                     * - it can be `len/2+1` for varchar column half-filled with constant (or random) of length `len`, as if each row contained a is-null byte or length
                     * - it can be `2` for varchar column with single non-null value of length 10, as if ... (?)
                     * - it looks storage size does not directly depend on `IS NULL` column attribute
                     *
                     * Since the interpretation of the value is not obvious, we do not deduce is-null bytes. They will be accounted for second time in
                     * `PlanNodeStatsEstimate.getOutputSizeForSymbol`, but this is the safer thing to do.
                     */
                    .map(averageColumnLength -> Estimate.of(1.0 * averageColumnLength * rowCount))
                    .orElseGet(Estimate::unknown))
            .build();
}

public static LongWriteFunction trinoDateToOracleDateWriteFunction()
{
return new LongWriteFunction()
Expand Down Expand Up @@ -683,4 +795,76 @@ public void setColumnComment(ConnectorSession session, JdbcTableHandle handle, J
varcharLiteral(comment.orElse("")));
execute(session, sql);
}

private static class StatisticsDao
{
private final Handle handle;

public StatisticsDao(Handle handle)
{
this.handle = requireNonNull(handle, "handle is null");
}

Long getRowCount(String schema, String tableName)
{
return handle.createQuery("SELECT NUM_ROWS FROM ALL_TAB_STATISTICS WHERE OWNER = :schema AND TABLE_NAME = :table_name and PARTITION_NAME IS NULL")
.bind("schema", schema)
.bind("table_name", tableName)
.mapTo(Long.class)
.findFirst()
.orElse(null);
}

List<ColumnStatisticsResult> getColumnStatistics(String schema, String tableName)
{
// [SEP-3425] we are not using ALL_TAB_COL_STATISTICS, here because we observed queries which took multiple minutes when obtaining statistics for partitioned tables.
Comment thread
hashhar marked this conversation as resolved.
Outdated
// It adds slight risk, because the statistics-related columns in ALL_TAB_COLUMNS are marked as deprecated and present only for backward
// compatibility with Oracle 7 (see: https://docs.oracle.com/cd/B14117_01/server.101/b10755/statviews_1180.htm)
return handle.createQuery("SELECT COLUMN_NAME, NUM_NULLS, NUM_DISTINCT, AVG_COL_LEN FROM ALL_TAB_COLUMNS WHERE OWNER = :schema AND TABLE_NAME = :table_name")
.bind("schema", schema)
.bind("table_name", tableName)
.map((rs, ctx) -> new ColumnStatisticsResult(
requireNonNull(rs.getString("COLUMN_NAME"), "COLUMN_NAME is null"),
Optional.ofNullable(rs.getObject("NUM_NULLS", Long.class)),
Optional.ofNullable(rs.getObject("NUM_DISTINCT", Long.class)),
Optional.ofNullable(rs.getObject("AVG_COL_LEN", Long.class))))
.list();
}
}

/**
 * Immutable holder for the raw statistics of a single column as read from the dictionary.
 * Each statistic is optional; an empty value means no value was available for that column.
 */
private static class ColumnStatisticsResult
{
    private final String columnName;
    private final Optional<Long> nullsCount;
    private final Optional<Long> distinctValuesCount;
    private final Optional<Long> averageColumnLength;

    /**
     * @throws NullPointerException if any argument is null (pass {@link Optional#empty()} for a missing statistic)
     */
    ColumnStatisticsResult(String columnName, Optional<Long> nullsCount, Optional<Long> distinctValuesCount, Optional<Long> averageColumnLength)
    {
        // Fail at construction rather than later, when a getter result is dereferenced
        this.columnName = requireNonNull(columnName, "columnName is null");
        this.nullsCount = requireNonNull(nullsCount, "nullsCount is null");
        this.distinctValuesCount = requireNonNull(distinctValuesCount, "distinctValuesCount is null");
        this.averageColumnLength = requireNonNull(averageColumnLength, "averageColumnLength is null");
    }

    /** Returns the column name these statistics belong to. */
    String getColumnName()
    {
        return columnName;
    }

    /** Returns the number of null values in the column, if known. */
    Optional<Long> getNullsCount()
    {
        return nullsCount;
    }

    /** Returns the number of distinct values in the column, if known. */
    Optional<Long> getDistinctValuesCount()
    {
        return distinctValuesCount;
    }

    /** Returns the average column length, if known. */
    Optional<Long> getAverageColumnLength()
    {
        return averageColumnLength;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,17 @@

import com.google.inject.Binder;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.Scopes;
import com.google.inject.Singleton;
import io.airlift.configuration.AbstractConfigurationAwareModule;
import io.trino.plugin.jdbc.BaseJdbcConfig;
import io.trino.plugin.jdbc.ConnectionFactory;
import io.trino.plugin.jdbc.DriverConnectionFactory;
import io.trino.plugin.jdbc.ForBaseJdbc;
import io.trino.plugin.jdbc.JdbcClient;
import io.trino.plugin.jdbc.JdbcJoinPushdownSupportModule;
import io.trino.plugin.jdbc.JdbcStatisticsConfig;
import io.trino.plugin.jdbc.MaxDomainCompactionThreshold;
import io.trino.plugin.jdbc.RetryingConnectionFactory;
import io.trino.plugin.jdbc.credential.CredentialProvider;
Expand All @@ -42,16 +44,18 @@
import static io.trino.plugin.oracle.OracleClient.ORACLE_MAX_LIST_EXPRESSIONS;

public class OracleClientModule
implements Module
extends AbstractConfigurationAwareModule
{
@Override
public void configure(Binder binder)
protected void setup(Binder binder)
{
binder.bind(JdbcClient.class).annotatedWith(ForBaseJdbc.class).to(OracleClient.class).in(Scopes.SINGLETON);
bindSessionPropertiesProvider(binder, OracleSessionProperties.class);
configBinder(binder).bindConfig(OracleConfig.class);
configBinder(binder).bindConfig(JdbcStatisticsConfig.class);
newOptionalBinder(binder, Key.get(int.class, MaxDomainCompactionThreshold.class)).setBinding().toInstance(ORACLE_MAX_LIST_EXPRESSIONS);
newSetBinder(binder, ConnectorTableFunction.class).addBinding().toProvider(Query.class).in(Scopes.SINGLETON);
install(new JdbcJoinPushdownSupportModule());
}

@Provides
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,6 @@ public abstract class BaseOracleConnectorTest
protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
{
switch (connectorBehavior) {
case SUPPORTS_TOPN_PUSHDOWN:
return false;

case SUPPORTS_AGGREGATION_PUSHDOWN:
case SUPPORTS_AGGREGATION_PUSHDOWN_STDDEV:
case SUPPORTS_AGGREGATION_PUSHDOWN_VARIANCE:
Expand Down
Loading