diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index 86ba0a78d266..e5186220a41a 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -508,7 +508,7 @@ public Optional finishInsert(ConnectorSession session, icebergTable.schema().findType(field.sourceId()))) .toArray(Type[]::new); - AppendFiles appendFiles = transaction.newFastAppend(); + AppendFiles appendFiles = transaction.newAppend(); for (CommitTaskData task : commitTasks) { DataFiles.Builder builder = DataFiles.builder(icebergTable.spec()) .withPath(task.getPath()) diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMergeAppend.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMergeAppend.java new file mode 100644 index 000000000000..d81681d264c6 --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMergeAppend.java @@ -0,0 +1,87 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg; + +import com.google.common.collect.ImmutableSet; +import io.trino.plugin.hive.HdfsConfig; +import io.trino.plugin.hive.HdfsConfiguration; +import io.trino.plugin.hive.HdfsConfigurationInitializer; +import io.trino.plugin.hive.HdfsEnvironment; +import io.trino.plugin.hive.HiveHdfsConfiguration; +import io.trino.plugin.hive.NodeVersion; +import io.trino.plugin.hive.authentication.NoHdfsAuthentication; +import io.trino.plugin.hive.metastore.HiveMetastore; +import io.trino.plugin.hive.metastore.MetastoreConfig; +import io.trino.plugin.hive.metastore.file.FileHiveMetastore; +import io.trino.plugin.hive.metastore.file.FileHiveMetastoreConfig; +import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider; +import io.trino.plugin.iceberg.catalog.file.FileMetastoreTableOperationsProvider; +import io.trino.spi.connector.SchemaTableName; +import io.trino.testing.AbstractTestQueryFramework; +import io.trino.testing.DistributedQueryRunner; +import io.trino.testing.MaterializedResult; +import io.trino.testing.QueryRunner; +import io.trino.testing.TestingConnectorSession; +import org.apache.iceberg.Table; +import org.testng.annotations.Test; + +import java.io.File; + +import static org.testng.Assert.assertEquals; + +public class TestIcebergMergeAppend + extends AbstractTestQueryFramework +{ + private HiveMetastore metastore; + private HdfsEnvironment hdfsEnvironment; + private IcebergTableOperationsProvider tableOperationsProvider; + + @Override + protected QueryRunner createQueryRunner() throws Exception + { + DistributedQueryRunner queryRunner = IcebergQueryRunner.createIcebergQueryRunner(); + HdfsConfig hdfsConfig = new HdfsConfig(); + HdfsConfiguration hdfsConfiguration = new HiveHdfsConfiguration(new HdfsConfigurationInitializer(hdfsConfig), ImmutableSet.of()); + hdfsEnvironment = new HdfsEnvironment(hdfsConfiguration, hdfsConfig, new NoHdfsAuthentication()); + + File baseDir = queryRunner.getCoordinator().getBaseDataDir().resolve("iceberg_data").toFile(); + metastore = new FileHiveMetastore( + new NodeVersion("testversion"), + hdfsEnvironment, + new MetastoreConfig(), + new FileHiveMetastoreConfig() + .setCatalogDirectory(baseDir.toURI().toString()) + .setMetastoreUser("test")); + tableOperationsProvider = new FileMetastoreTableOperationsProvider(new HdfsFileIoProvider(hdfsEnvironment)); + + return queryRunner; + } + + @Test + public void testInsertWithAppend() + { + assertUpdate("CREATE TABLE table_to_insert (_bigint BIGINT, _varchar VARCHAR)"); + Table table = IcebergUtil.loadIcebergTable(metastore, tableOperationsProvider, TestingConnectorSession.SESSION, + new SchemaTableName("tpch", "table_to_insert")); + table.updateProperties() + .set("commit.manifest.min-count-to-merge", "2") + .commit(); + assertUpdate("INSERT INTO table_to_insert VALUES (1, 'a'), (2, 'b'), (3, 'c')", 3); + MaterializedResult result = computeActual("select * from \"table_to_insert$manifests\""); + assertEquals(result.getRowCount(), 1); + assertUpdate("INSERT INTO table_to_insert VALUES (4, 'd')", 1); + result = computeActual("select * from \"table_to_insert$manifests\""); + assertEquals(result.getRowCount(), 1); + } +}