Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4058,8 +4058,8 @@ private Optional<SystemTable> getRawSystemTable(ConnectorSession session, Schema
@Override
public boolean allowSplittingReadIntoMultipleSubQueries(ConnectorSession session, ConnectorTableHandle tableHandle)
{
// delta lake supports only a columnar (parquet) storage format
return true;
// don't split the read into sub-queries when tableHandle is a SystemTableHandle; Delta Lake supports only a columnar (Parquet) storage format
return tableHandle instanceof DeltaLakeTableHandle;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3984,7 +3984,12 @@ private static Optional<CatalogSchemaTableName> redirectTableToHudi(Optional<Str
@Override
public boolean allowSplittingReadIntoMultipleSubQueries(ConnectorSession session, ConnectorTableHandle tableHandle)
{
SchemaTableName tableName = ((HiveTableHandle) tableHandle).getSchemaTableName();
// don't split the read into sub-queries when tableHandle is a SystemTableHandle
if (!(tableHandle instanceof HiveTableHandle hiveTableHandle)) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would check for the SystemTableHandle in the io.trino.metadata.MetadataManager#allowSplittingReadIntoMultipleSubQueries. This avoids copying this check to multiple places.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Filtering for SystemTableHandle in MetadataManager would indeed work for this specific case, but it's not clear to me that we won't encounter any more cases where tableHandle is different from HiveTableHandle.
It seems safer to just specifically test for connector specific table handle, even if there is code repetition.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can't pass any table handle to any connector. Every connector expects only its own TableHandle class. The casts like that are all over the HiveMetadata. The CatalogHandle actually has io.trino.spi.connector.CatalogHandle#type that in this case is set to io.trino.spi.connector.CatalogHandle.CatalogHandleType#SYSTEM.

return false;
}

SchemaTableName tableName = hiveTableHandle.getSchemaTableName();

Table table = metastore.getTable(tableName.getSchemaName(), tableName.getTableName())
.orElseThrow(() -> new TableNotFoundException(tableName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@

import io.trino.testing.AbstractTestAggregations;
import io.trino.testing.QueryRunner;
import org.intellij.lang.annotations.Language;
import org.junit.jupiter.api.Test;

import static io.trino.testing.TestingNames.randomNameSuffix;

public class TestHiveDistributedAggregations
extends AbstractTestAggregations
Expand All @@ -27,4 +31,20 @@ protected QueryRunner createQueryRunner()
.setInitialTables(REQUIRED_TPCH_TABLES)
.build();
}

@Test
public void testDistinctAggregationWithSystemTable()
{
// Create a partitioned table so its $partitions system table has rows to aggregate over.
String tableName = "test_dist_aggr_" + randomNameSuffix();
@Language("SQL") String createTable = """
CREATE TABLE %s
WITH (
partitioned_by = ARRAY[ 'regionkey', 'nationkey' ]
) AS (SELECT name, comment, regionkey, nationkey FROM nation)
""".formatted(tableName);

// nation has 25 rows
assertUpdate(getSession(), createTable, 25);

// Regression test: a distinct aggregation over a system table ($partitions) passes a
// non-HiveTableHandle into allowSplittingReadIntoMultipleSubQueries; the query must
// still succeed rather than fail on the cast — presumably added with that fix; verify against the PR.
assertQuerySucceeds("SELECT count(distinct regionkey), count(distinct nationkey) FROM \"%s$partitions\"".formatted(tableName));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -290,8 +290,8 @@ public void validateScan(ConnectorSession session, ConnectorTableHandle handle)
@Override
public boolean allowSplittingReadIntoMultipleSubQueries(ConnectorSession session, ConnectorTableHandle tableHandle)
{
// hudi supports only a columnar (parquet) storage format
return true;
// don't split the read into sub-queries when tableHandle is a SystemTableHandle; Hudi supports only a columnar (Parquet) storage format
return tableHandle instanceof HudiTableHandle;
}

HiveMetastore getMetastore()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3312,8 +3312,11 @@ public Optional<CatalogSchemaTableName> redirectTable(ConnectorSession session,
@Override
public boolean allowSplittingReadIntoMultipleSubQueries(ConnectorSession session, ConnectorTableHandle connectorTableHandle)
{
IcebergTableHandle tableHandle = (IcebergTableHandle) connectorTableHandle;
IcebergFileFormat storageFormat = getFileFormat(tableHandle.getStorageProperties());
// don't split the read into sub-queries when connectorTableHandle is a SystemTableHandle
if (!(connectorTableHandle instanceof IcebergTableHandle icebergTableHandle)) {
return false;
}
IcebergFileFormat storageFormat = getFileFormat(icebergTableHandle.getStorageProperties());

return storageFormat == IcebergFileFormat.ORC || storageFormat == IcebergFileFormat.PARQUET;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -640,7 +640,8 @@ public Optional<SampleApplicationResult<ConnectorTableHandle>> applySample(Conne
@Override
public boolean allowSplittingReadIntoMultipleSubQueries(ConnectorSession session, ConnectorTableHandle tableHandle)
{
return true;
// don't split the read into sub-queries when tableHandle is a SystemTableHandle
return tableHandle instanceof MemoryTableHandle;
}

@Override
Expand Down