From cb0bc08f1922a3a9dc5df2d6d3b5357a5c82a3fb Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Thu, 9 Jun 2022 16:29:46 +0200 Subject: [PATCH 1/3] Fix formatting --- ...TestConnectorPushdownRulesWithIceberg.java | 72 ++++++++++++++++--- 1 file changed, 64 insertions(+), 8 deletions(-) diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/optimizer/TestConnectorPushdownRulesWithIceberg.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/optimizer/TestConnectorPushdownRulesWithIceberg.java index 22f7dae0155c..9f2db55caafb 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/optimizer/TestConnectorPushdownRulesWithIceberg.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/optimizer/TestConnectorPushdownRulesWithIceberg.java @@ -165,8 +165,22 @@ public void testProjectionPushdown() BIGINT, Optional.empty()); - IcebergTableHandle icebergTable = new IcebergTableHandle(SCHEMA_NAME, tableName, DATA, Optional.of(1L), "", "", 1, - TupleDomain.all(), TupleDomain.all(), ImmutableSet.of(), Optional.empty(), "", ImmutableMap.of(), NO_RETRIES, ImmutableList.of()); + IcebergTableHandle icebergTable = new IcebergTableHandle( + SCHEMA_NAME, + tableName, + DATA, + Optional.of(1L), + "", + "", + 1, + TupleDomain.all(), + TupleDomain.all(), + ImmutableSet.of(), + Optional.empty(), + "", + ImmutableMap.of(), + NO_RETRIES, + ImmutableList.of()); TableHandle table = new TableHandle(new CatalogName(ICEBERG_CATALOG_NAME), icebergTable, new HiveTransactionHandle(false)); IcebergColumnHandle fullColumn = partialColumn.getBaseColumn(); @@ -230,8 +244,22 @@ public void testPredicatePushdown() PushPredicateIntoTableScan pushPredicateIntoTableScan = new PushPredicateIntoTableScan(tester().getPlannerContext(), tester().getTypeAnalyzer()); - IcebergTableHandle icebergTable = new IcebergTableHandle(SCHEMA_NAME, tableName, DATA, Optional.of(1L), "", "", 1, - TupleDomain.all(), TupleDomain.all(), ImmutableSet.of(), 
Optional.empty(), "", ImmutableMap.of(), NO_RETRIES, ImmutableList.of()); + IcebergTableHandle icebergTable = new IcebergTableHandle( + SCHEMA_NAME, + tableName, + DATA, + Optional.of(1L), + "", + "", + 1, + TupleDomain.all(), + TupleDomain.all(), + ImmutableSet.of(), + Optional.empty(), + "", + ImmutableMap.of(), + NO_RETRIES, + ImmutableList.of()); TableHandle table = new TableHandle(new CatalogName(ICEBERG_CATALOG_NAME), icebergTable, new HiveTransactionHandle(false)); IcebergColumnHandle column = new IcebergColumnHandle(primitiveColumnIdentity(1, "a"), INTEGER, ImmutableList.of(), INTEGER, Optional.empty()); @@ -263,8 +291,22 @@ public void testColumnPruningProjectionPushdown() PruneTableScanColumns pruneTableScanColumns = new PruneTableScanColumns(tester().getMetadata()); - IcebergTableHandle icebergTable = new IcebergTableHandle(SCHEMA_NAME, tableName, DATA, Optional.empty(), "", "", 1, - TupleDomain.all(), TupleDomain.all(), ImmutableSet.of(), Optional.empty(), "", ImmutableMap.of(), NO_RETRIES, ImmutableList.of()); + IcebergTableHandle icebergTable = new IcebergTableHandle( + SCHEMA_NAME, + tableName, + DATA, + Optional.empty(), + "", + "", + 1, + TupleDomain.all(), + TupleDomain.all(), + ImmutableSet.of(), + Optional.empty(), + "", + ImmutableMap.of(), + NO_RETRIES, + ImmutableList.of()); TableHandle table = new TableHandle(new CatalogName(ICEBERG_CATALOG_NAME), icebergTable, new HiveTransactionHandle(false)); IcebergColumnHandle columnA = new IcebergColumnHandle(primitiveColumnIdentity(0, "a"), INTEGER, ImmutableList.of(), INTEGER, Optional.empty()); @@ -307,8 +349,22 @@ public void testPushdownWithDuplicateExpressions() tester().getTypeAnalyzer(), new ScalarStatsCalculator(tester().getPlannerContext(), tester().getTypeAnalyzer())); - IcebergTableHandle icebergTable = new IcebergTableHandle(SCHEMA_NAME, tableName, DATA, Optional.of(1L), "", "", 1, - TupleDomain.all(), TupleDomain.all(), ImmutableSet.of(), Optional.empty(), "", ImmutableMap.of(), 
NO_RETRIES, ImmutableList.of()); + IcebergTableHandle icebergTable = new IcebergTableHandle( + SCHEMA_NAME, + tableName, + DATA, + Optional.of(1L), + "", + "", + 1, + TupleDomain.all(), + TupleDomain.all(), + ImmutableSet.of(), + Optional.empty(), + "", + ImmutableMap.of(), + NO_RETRIES, + ImmutableList.of()); TableHandle table = new TableHandle(new CatalogName(ICEBERG_CATALOG_NAME), icebergTable, new HiveTransactionHandle(false)); IcebergColumnHandle bigintColumn = new IcebergColumnHandle(primitiveColumnIdentity(1, "just_bigint"), BIGINT, ImmutableList.of(), BIGINT, Optional.empty()); From 6fe3ccd1c155ab7319ace6f7e1910089aae446c9 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Thu, 9 Jun 2022 16:21:19 +0200 Subject: [PATCH 2/3] Invoke main IcebergTableHandle constructor Don't explicitly invoke the `IcebergTableHandle` constructor that is meant only for deserialization. This fixes a potential loss of information in `IcebergMetadata.applyFilter`, which now passes the handle's `recordScannedFiles` and `maxScannedFileSize` values through to the new handle (the loss is believed to have had no practical effect). To prevent future mistakes, the deserialization constructor is changed to a static factory method. 
--- .../io/trino/plugin/iceberg/IcebergMetadata.java | 11 ++++++++--- .../trino/plugin/iceberg/IcebergTableHandle.java | 4 ++-- .../TestIcebergNodeLocalDynamicSplitPruning.java | 4 +++- .../plugin/iceberg/TestIcebergSplitSource.java | 4 +++- .../TestConnectorPushdownRulesWithIceberg.java | 16 ++++++++++++---- 5 files changed, 28 insertions(+), 11 deletions(-) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index 77048ae3d89d..b355aa18a833 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -325,7 +325,9 @@ public IcebergTableHandle getTableHandle( table.location(), table.properties(), NO_RETRIES, - ImmutableList.of()); + ImmutableList.of(), + false, + Optional.empty()); } @Override @@ -1716,7 +1718,8 @@ public Optional> applyFilter(C } return Optional.of(new ConstraintApplicationResult<>( - new IcebergTableHandle(table.getSchemaName(), + new IcebergTableHandle( + table.getSchemaName(), table.getTableName(), table.getTableType(), table.getSnapshotId(), @@ -1730,7 +1733,9 @@ public Optional> applyFilter(C table.getTableLocation(), table.getStorageProperties(), table.getRetryMode(), - table.getUpdatedColumns()), + table.getUpdatedColumns(), + table.isRecordScannedFiles(), + table.getMaxScannedFileSize()), remainingConstraint.transformKeys(ColumnHandle.class::cast), false)); } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableHandle.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableHandle.java index e2076e529a2d..a0e822199ce1 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableHandle.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableHandle.java @@ -65,7 +65,7 @@ public class IcebergTableHandle 
private final Optional maxScannedFileSize; @JsonCreator - public IcebergTableHandle( + public static IcebergTableHandle fromJsonForDeserializationOnly( @JsonProperty("schemaName") String schemaName, @JsonProperty("tableName") String tableName, @JsonProperty("tableType") TableType tableType, @@ -82,7 +82,7 @@ public IcebergTableHandle( @JsonProperty("retryMode") RetryMode retryMode, @JsonProperty("updatedColumns") List updatedColumns) { - this( + return new IcebergTableHandle( schemaName, tableName, tableType, diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java index 266215721217..33132685e408 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java @@ -182,7 +182,9 @@ private static ConnectorPageSource createTestingPageSource(HiveTransactionHandle outputFile.getParentFile().getAbsolutePath(), ImmutableMap.of(), RetryMode.NO_RETRIES, - ImmutableList.of()), + ImmutableList.of(), + false, + Optional.empty()), transaction); FileFormatDataSourceStats stats = new FileFormatDataSourceStats(); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java index d26c488e8a41..49d066633a3e 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java @@ -137,7 +137,9 @@ public void testIncompleteDynamicFilterTimeout() nationTable.location(), nationTable.properties(), NO_RETRIES, - ImmutableList.of()); + ImmutableList.of(), + false, + Optional.empty()); IcebergSplitSource splitSource = new 
IcebergSplitSource( tableHandle, diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/optimizer/TestConnectorPushdownRulesWithIceberg.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/optimizer/TestConnectorPushdownRulesWithIceberg.java index 9f2db55caafb..4976d3aec498 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/optimizer/TestConnectorPushdownRulesWithIceberg.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/optimizer/TestConnectorPushdownRulesWithIceberg.java @@ -180,7 +180,9 @@ public void testProjectionPushdown() "", ImmutableMap.of(), NO_RETRIES, - ImmutableList.of()); + ImmutableList.of(), + false, + Optional.empty()); TableHandle table = new TableHandle(new CatalogName(ICEBERG_CATALOG_NAME), icebergTable, new HiveTransactionHandle(false)); IcebergColumnHandle fullColumn = partialColumn.getBaseColumn(); @@ -259,7 +261,9 @@ public void testPredicatePushdown() "", ImmutableMap.of(), NO_RETRIES, - ImmutableList.of()); + ImmutableList.of(), + false, + Optional.empty()); TableHandle table = new TableHandle(new CatalogName(ICEBERG_CATALOG_NAME), icebergTable, new HiveTransactionHandle(false)); IcebergColumnHandle column = new IcebergColumnHandle(primitiveColumnIdentity(1, "a"), INTEGER, ImmutableList.of(), INTEGER, Optional.empty()); @@ -306,7 +310,9 @@ public void testColumnPruningProjectionPushdown() "", ImmutableMap.of(), NO_RETRIES, - ImmutableList.of()); + ImmutableList.of(), + false, + Optional.empty()); TableHandle table = new TableHandle(new CatalogName(ICEBERG_CATALOG_NAME), icebergTable, new HiveTransactionHandle(false)); IcebergColumnHandle columnA = new IcebergColumnHandle(primitiveColumnIdentity(0, "a"), INTEGER, ImmutableList.of(), INTEGER, Optional.empty()); @@ -364,7 +370,9 @@ public void testPushdownWithDuplicateExpressions() "", ImmutableMap.of(), NO_RETRIES, - ImmutableList.of()); + ImmutableList.of(), + false, + Optional.empty()); TableHandle table = new 
TableHandle(new CatalogName(ICEBERG_CATALOG_NAME), icebergTable, new HiveTransactionHandle(false)); IcebergColumnHandle bigintColumn = new IcebergColumnHandle(primitiveColumnIdentity(1, "just_bigint"), BIGINT, ImmutableList.of(), BIGINT, Optional.empty()); From 8b1a88d3874fefd917622a432a639b5f1cb03d34 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Thu, 9 Jun 2022 16:33:38 +0200 Subject: [PATCH 3/3] Close IcebergSplitSource in test --- .../iceberg/TestIcebergSplitSource.java | 30 +++++++++---------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java index 49d066633a3e..0c4a383148ce 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java @@ -141,7 +141,7 @@ public void testIncompleteDynamicFilterTimeout() false, Optional.empty()); - IcebergSplitSource splitSource = new IcebergSplitSource( + try (IcebergSplitSource splitSource = new IcebergSplitSource( tableHandle, nationTable.newScan(), Optional.empty(), @@ -188,21 +188,21 @@ public TupleDomain getCurrentPredicate() alwaysTrue(), new TestingTypeManager(), false, - new IcebergConfig().getMinimumAssignedSplitWeight()); - - ImmutableList.Builder splits = ImmutableList.builder(); - while (!splitSource.isFinished()) { - splitSource.getNextBatch(null, 100).get() - .getSplits() - .stream() - .map(IcebergSplit.class::cast) - .forEach(splits::add); + new IcebergConfig().getMinimumAssignedSplitWeight())) { + ImmutableList.Builder splits = ImmutableList.builder(); + while (!splitSource.isFinished()) { + splitSource.getNextBatch(null, 100).get() + .getSplits() + .stream() + .map(IcebergSplit.class::cast) + .forEach(splits::add); + } + assertThat(splits.build().size()).isGreaterThan(0); + 
assertTrue(splitSource.isFinished()); + assertThat(System.currentTimeMillis() - startMillis) + .as("IcebergSplitSource failed to wait for dynamicFilteringWaitTimeout") + .isGreaterThanOrEqualTo(2000); } - assertThat(splits.build().size()).isGreaterThan(0); - assertTrue(splitSource.isFinished()); - assertThat(System.currentTimeMillis() - startMillis) - .as("IcebergSplitSource failed to wait for dynamicFilteringWaitTimeout") - .isGreaterThanOrEqualTo(2000); } @Test