diff --git a/docs/src/main/sphinx/connector/hudi.rst b/docs/src/main/sphinx/connector/hudi.rst index 1de9ed6f7a31..50c5aa7db281 100644 --- a/docs/src/main/sphinx/connector/hudi.rst +++ b/docs/src/main/sphinx/connector/hudi.rst @@ -220,6 +220,46 @@ The output of the query has the following columns: - ``varchar`` - Current state of the instant +Rolling back to a previous version +----------------------------------- + +Hudi allows us to store multiple versions of data in the table over time while +providing ``snapshot isolation``. The number of versions is configurable and depends +on how much tradeoff one prefers for space used by those older versions. Hence, as +one can imagine, Hudi has the capability to access the state of a table +at a certain point/instant in time. This could be done by allowing read mechanisms +from a Hudi table that go back and read the version of the data that was written at a +particular point/instant in time. + +For example, you could find the version IDs for the ``customer_orders`` table +by running the following query:: + + SELECT timestamp + FROM example.testdb."customer_orders$timeline" + ORDER BY timestamp DESC + +Time travel queries +^^^^^^^^^^^^^^^^^^^ + +The connector offers the ability to query historical data. +This allows you to query the table as it was when a previous snapshot +of the table was taken, even if the data has since been modified or deleted. + +The historical data of the table can be retrieved by specifying the +snapshot identifier corresponding to the version of the table that +needs to be retrieved:: + + SELECT * + FROM example.testdb.customer_orders FOR VERSION AS OF 8954597067493422955 + +A different approach to retrieving historical data is to specify +a point in time in the past, such as a day or week ago. 
The latest snapshot +of the table taken before or at the specified timestamp in the query is +internally used for providing the previous state of the table:: + + SELECT * + FROM example.testdb.customer_orders FOR TIMESTAMP AS OF TIMESTAMP '2022-03-23 09:59:29.803 Europe/Vienna' + File formats ------------ diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiMetadata.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiMetadata.java index e6226712a3a1..9232502de089 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiMetadata.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiMetadata.java @@ -30,6 +30,7 @@ import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.ConnectorTableHandle; import io.trino.spi.connector.ConnectorTableMetadata; +import io.trino.spi.connector.ConnectorTableVersion; import io.trino.spi.connector.Constraint; import io.trino.spi.connector.ConstraintApplicationResult; import io.trino.spi.connector.SchemaTableName; @@ -61,7 +62,9 @@ import static io.trino.plugin.hudi.HudiSessionProperties.getColumnsToHide; import static io.trino.plugin.hudi.HudiTableProperties.LOCATION_PROPERTY; import static io.trino.plugin.hudi.HudiTableProperties.PARTITIONED_BY_PROPERTY; +import static io.trino.plugin.hudi.HudiUtil.getHoodieInstantTime; import static io.trino.spi.StandardErrorCode.UNSUPPORTED_TABLE_TYPE; +import static io.trino.spi.connector.PointerType.TARGET_ID; import static io.trino.spi.connector.SchemaTableName.schemaTableName; import static java.lang.String.format; import static java.util.Collections.singletonList; @@ -98,6 +101,16 @@ public List listSchemaNames(ConnectorSession session) @Override public HudiTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName) + { + return getTableHandle(session, tableName, Optional.empty(), Optional.empty()); + } + + @Override + public HudiTableHandle getTableHandle( + ConnectorSession session, + 
SchemaTableName tableName, + Optional startVersion, + Optional endVersion) { if (isHiveSystemSchema(tableName.getSchemaName())) { return null; @@ -109,11 +122,21 @@ public HudiTableHandle getTableHandle(ConnectorSession session, SchemaTableName if (!isHudiTable(session, table.get())) { throw new TrinoException(UNSUPPORTED_TABLE_TYPE, format("Not a Hudi table: %s", tableName)); } + if (endVersion.isPresent() && endVersion.get().getPointerType() == TARGET_ID) { + startVersion = endVersion; + } + List dataColumns = hiveColumnHandles(table.get(), typeManager, NANOSECONDS).stream() + .collect(toImmutableList()); + return new HudiTableHandle( tableName.getSchemaName(), tableName.getTableName(), table.get().getStorage().getLocation(), HoodieTableType.COPY_ON_WRITE, + ImmutableList.of(), + dataColumns, + getHoodieInstantTime(startVersion), + getHoodieInstantTime(endVersion), TupleDomain.all(), TupleDomain.all()); } diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiTableHandle.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiTableHandle.java index 7b092288fc68..d4a578deef88 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiTableHandle.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiTableHandle.java @@ -15,12 +15,16 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; import io.trino.plugin.hive.HiveColumnHandle; import io.trino.spi.connector.ConnectorTableHandle; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.predicate.TupleDomain; import org.apache.hudi.common.model.HoodieTableType; +import java.util.List; +import java.util.Optional; + import static io.trino.spi.connector.SchemaTableName.schemaTableName; import static java.util.Objects.requireNonNull; @@ -31,6 +35,13 @@ public class HudiTableHandle private final String tableName; private final String basePath; private final 
HoodieTableType tableType; + + private final List dataColumns; + + private final List partitionColumns; + private final Optional startVersion; + private final Optional endVersion; + private final TupleDomain partitionPredicates; private final TupleDomain regularPredicates; @@ -40,6 +51,10 @@ public HudiTableHandle( @JsonProperty("tableName") String tableName, @JsonProperty("basePath") String basePath, @JsonProperty("tableType") HoodieTableType tableType, + @JsonProperty("partitionColumns") List partitionColumns, + @JsonProperty("dataColumns") List dataColumns, + @JsonProperty("startVersion") Optional startVersion, + @JsonProperty("endVersion") Optional endVersion, @JsonProperty("partitionPredicates") TupleDomain partitionPredicates, @JsonProperty("regularPredicates") TupleDomain regularPredicates) { @@ -47,6 +62,10 @@ public HudiTableHandle( this.tableName = requireNonNull(tableName, "tableName is null"); this.basePath = requireNonNull(basePath, "basePath is null"); this.tableType = requireNonNull(tableType, "tableType is null"); + this.partitionColumns = ImmutableList.copyOf(requireNonNull(partitionColumns, "partitionColumns is null")); + this.dataColumns = ImmutableList.copyOf(requireNonNull(dataColumns, "dataColumns is null")); + this.startVersion = requireNonNull(startVersion, "startVersion is null"); + this.endVersion = requireNonNull(endVersion, "endVersion is null"); this.partitionPredicates = requireNonNull(partitionPredicates, "partitionPredicates is null"); this.regularPredicates = requireNonNull(regularPredicates, "regularPredicates is null"); } @@ -87,11 +106,36 @@ public TupleDomain getRegularPredicates() return regularPredicates; } + @JsonProperty + public Optional getStartVersion() + { + return startVersion; + } + + @JsonProperty + public Optional getEndVersion() + { + return endVersion; + } + + @JsonProperty public SchemaTableName getSchemaTableName() { return schemaTableName(schemaName, tableName); } + @JsonProperty + public List getDataColumns() + 
{ + return dataColumns; + } + + @JsonProperty + public List getPartitionColumns() + { + return partitionColumns; + } + HudiTableHandle applyPredicates( TupleDomain partitionTupleDomain, TupleDomain regularTupleDomain) @@ -101,6 +145,10 @@ HudiTableHandle applyPredicates( tableName, basePath, tableType, + partitionColumns, + dataColumns, + startVersion, + endVersion, partitionPredicates.intersect(partitionTupleDomain), regularPredicates.intersect(regularTupleDomain)); } diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiUtil.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiUtil.java index 4976e7e3e539..e908cf2d606e 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiUtil.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiUtil.java @@ -22,10 +22,13 @@ import io.trino.plugin.hive.metastore.Column; import io.trino.spi.TrinoException; import io.trino.spi.connector.ColumnHandle; +import io.trino.spi.connector.ConnectorTableVersion; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.predicate.Domain; import io.trino.spi.predicate.NullableValue; import io.trino.spi.predicate.TupleDomain; +import io.trino.spi.type.LongTimestampWithTimeZone; +import io.trino.spi.type.TimestampWithTimeZoneType; import io.trino.spi.type.Type; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -38,18 +41,28 @@ import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils; import java.io.IOException; +import java.sql.Timestamp; +import java.text.SimpleDateFormat; +import java.time.ZoneOffset; import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.TimeZone; import static io.trino.plugin.hive.HiveErrorCode.HIVE_INVALID_METADATA; import static io.trino.plugin.hive.util.HiveUtil.checkCondition; import static io.trino.plugin.hive.util.HiveUtil.parsePartitionValue; import static io.trino.plugin.hudi.HudiErrorCode.HUDI_CANNOT_OPEN_SPLIT; import 
static io.trino.plugin.hudi.HudiErrorCode.HUDI_UNSUPPORTED_FILE_FORMAT; +import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; +import static io.trino.spi.type.BigintType.BIGINT; +import static io.trino.spi.type.DateTimeEncoding.unpackMillisUtc; import static java.util.stream.Collectors.toList; public final class HudiUtil { + public static final String DEFAULT_INSTANT_TIME_FORMAT = "yyyyMMddHHmmssSSS"; + private HudiUtil() {} public static boolean isHudiParquetInputFormat(InputFormat inputFormat) @@ -171,4 +184,38 @@ public static FileStatus getFileStatus(HoodieBaseFile baseFile) throw new TrinoException(HUDI_CANNOT_OPEN_SPLIT, "Error getting file status of " + baseFile.getPath(), e); } } + + public static Optional getHoodieInstantTime(Optional optionalVersion) + { + if (optionalVersion.isPresent()) { + ConnectorTableVersion version = optionalVersion.get(); + SimpleDateFormat formatter = new SimpleDateFormat(DEFAULT_INSTANT_TIME_FORMAT); + formatter.setTimeZone(TimeZone.getTimeZone(ZoneOffset.UTC)); + io.trino.spi.type.Type versionType = version.getVersionType(); + switch (version.getPointerType()) { + case TEMPORAL: + long epochMillis; + if (versionType instanceof TimestampWithTimeZoneType) { + epochMillis = ((TimestampWithTimeZoneType) versionType).isShort() + ? 
unpackMillisUtc((long) version.getVersion()) + : ((LongTimestampWithTimeZone) version.getVersion()).getEpochMillis(); + } + else { + throw new TrinoException(NOT_SUPPORTED, "Unsupported type for temporal table version: " + versionType.getDisplayName()); + } + return Optional.of(formatter.format(new Timestamp(epochMillis))); + case TARGET_ID: + if (versionType != BIGINT) { + throw new TrinoException(NOT_SUPPORTED, "Unsupported type for table version: " + versionType.getDisplayName()); + } + String instantTime = String.valueOf(version.getVersion()); + + return Optional.of(instantTime); + } + throw new TrinoException(NOT_SUPPORTED, "Version pointer type is not supported: " + version.getPointerType()); + } + else { + return Optional.empty(); + } + } } diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/query/HudiReadOptimizedDirectoryLister.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/query/HudiReadOptimizedDirectoryLister.java index 92aad499ce63..e9b93f6bc954 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/query/HudiReadOptimizedDirectoryLister.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/query/HudiReadOptimizedDirectoryLister.java @@ -31,6 +31,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.view.FileSystemViewManager; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; +import org.apache.hudi.common.util.Option; import java.util.Collections; import java.util.List; @@ -101,6 +102,15 @@ public List getPartitionsToScan() @Override public List listStatus(HudiPartitionInfo partitionInfo) { + if (tableHandle.getEndVersion().isPresent()) { + return fileSystemView.getAllFileGroups(partitionInfo.getRelativePartitionPath()) + .map(fileGroup -> fileGroup.getLatestFileSliceBeforeOrOn(tableHandle.getEndVersion().get())) + .filter(Option::isPresent) + .map(fileSliceOption -> fileSliceOption.get().getBaseFile()) + .filter(Option::isPresent) + 
.map(baseFileOption -> getFileStatus(baseFileOption.get())) + .collect(toImmutableList()); + } return fileSystemView.getLatestBaseFiles(partitionInfo.getRelativePartitionPath()) .map(baseFile -> getFileStatus(baseFile)) .collect(toImmutableList()); diff --git a/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/BaseHudiConnectorTest.java b/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/BaseHudiConnectorTest.java index c25a2f8ee100..09eae68142a2 100644 --- a/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/BaseHudiConnectorTest.java +++ b/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/BaseHudiConnectorTest.java @@ -102,4 +102,14 @@ protected static String columnsToHide() columns.add(TpchHudiTablesInitializer.FIELD_UUID); return String.join(",", columns); } + + @Override + protected void verifyVersionedQueryFailurePermissible(Exception e) + { + assertThat(e) + .hasMessageMatching("Version pointer type is not supported: .*|" + + "Unsupported type for temporal table version: .*|" + + "Unsupported type for table version: .*|" + + "No version history table tpch.nation at or before .*"); + } } diff --git a/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/TestHudiSmokeTest.java b/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/TestHudiSmokeTest.java index f00491943ffe..6485ca983310 100644 --- a/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/TestHudiSmokeTest.java +++ b/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/TestHudiSmokeTest.java @@ -17,6 +17,8 @@ import io.trino.plugin.hudi.testing.ResourceHudiTablesInitializer; import io.trino.testing.AbstractTestQueryFramework; import io.trino.testing.QueryRunner; +import org.intellij.lang.annotations.Language; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import java.nio.file.Files; @@ -165,6 +167,31 @@ public void testPartitionColumn() assertQueryFails("SELECT \"$partition\" FROM " + HUDI_NON_PART_COW, ".* Column '\\$partition' cannot be resolved"); } + 
@Test(dataProvider = "timeTravelQueryDataProvider") + public void testTimeTravelQuery(@Language("SQL") String sql, String expected) + { + assertQuery(sql, expected); + } + + @DataProvider(name = "timeTravelQueryDataProvider") + public Object[][] timeTravelQueryDataProvider() + { + return new Object[][] { + {"SELECT count(_hoodie_commit_time) FROM STOCK_TICKS_COW FOR TIMESTAMP AS OF TIMESTAMP '2021-12-16 07:14:53.74700 Z'", "VALUES 99"}, + {"SELECT count(1) FROM STOCK_TICKS_COW FOR TIMESTAMP AS OF TIMESTAMP '2021-12-16 07:14:53.74700 Z'", "VALUES 99"}, + {"SELECT count(*) FROM STOCK_TICKS_COW FOR TIMESTAMP AS OF TIMESTAMP '2021-12-16 07:14:53.74700 Z'", "VALUES 99"}, + {"SELECT count(_hoodie_commit_time) FROM STOCK_TICKS_COW FOR VERSION AS OF 20211216071453747", "VALUES 99"}, + {"SELECT count(1) FROM STOCK_TICKS_COW FOR VERSION AS OF 20211216071453747", "VALUES 99"}, + {"SELECT count(*) FROM STOCK_TICKS_COW FOR VERSION AS OF 20211216071453747", "VALUES 99"}, + {"SELECT count(_hoodie_commit_time) FROM STOCK_TICKS_COW FOR VERSION AS OF 20211216009999999", "VALUES 0"}, + {"SELECT count(1) FROM STOCK_TICKS_COW FOR VERSION AS OF 20211216009999999", "VALUES 0"}, + {"SELECT count(*) FROM STOCK_TICKS_COW FOR VERSION AS OF 20211216009999999", "VALUES 0"}, + {"SELECT count(_hoodie_commit_time) FROM STOCK_TICKS_COW FOR TIMESTAMP AS OF TIMESTAMP '2021-12-15 16:00:00.000000000 Z'", "VALUES 0"}, + {"SELECT count(1) FROM STOCK_TICKS_COW FOR TIMESTAMP AS OF TIMESTAMP '2021-12-15 16:00:00.000000000 Z'", "VALUES 0"}, + {"SELECT count(*) FROM STOCK_TICKS_COW FOR TIMESTAMP AS OF TIMESTAMP '2021-12-15 16:00:00.000000000 Z'", "VALUES 0"}, + }; + } + private static Path toPath(String path) { // Remove leading 'file:' because $path column returns 'file:/path-to-file' in case of local file system diff --git a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvSinglenodeHudi.java 
b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvSinglenodeHudi.java index ad020c483caa..41ee08d439c6 100644 --- a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvSinglenodeHudi.java +++ b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvSinglenodeHudi.java @@ -39,6 +39,7 @@ import static io.trino.tests.product.launcher.docker.ContainerUtil.forSelectedPorts; import static io.trino.tests.product.launcher.env.EnvironmentContainers.HADOOP; import static io.trino.tests.product.launcher.env.EnvironmentContainers.TESTS; +import static io.trino.tests.product.launcher.env.common.Hadoop.CONTAINER_HADOOP_INIT_D; import static io.trino.tests.product.launcher.env.common.Minio.MINIO_CONTAINER_NAME; import static io.trino.tests.product.launcher.env.common.Standard.CONTAINER_TRINO_ETC; import static java.util.Objects.requireNonNull; @@ -81,12 +82,20 @@ public EnvSinglenodeHudi( @Override public void extendEnvironment(Environment.Builder builder) { + String dockerImageName = "ghcr.io/trinodb/testing/hdp3.1-hive:" + hadoopImagesVersion; // Using hdp3.1 so we are using Hive metastore with version close to versions of hive-*.jars Spark uses - builder.configureContainer(HADOOP, container -> container.setDockerImageName("ghcr.io/trinodb/testing/hdp3.1-hive:" + hadoopImagesVersion)); + builder.configureContainer(HADOOP, container -> { + container.setDockerImageName(dockerImageName); + container.withCopyFileToContainer( + forHostPath(configDir.getPath("apply-hive-config-for-hudi.sh")), + CONTAINER_HADOOP_INIT_D + "/apply-hive-config-for-hudi.sh"); + }); + builder.addConnector( "hive", forHostPath(configDir.getPath("hive.properties")), CONTAINER_TRINO_ETC + "/catalog/hive.properties"); + builder.addConnector( "hudi", forHostPath(configDir.getPath("hudi.properties")), diff --git 
a/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-hudi/apply-hive-config-for-hudi.sh b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-hudi/apply-hive-config-for-hudi.sh new file mode 100755 index 000000000000..6df422e311de --- /dev/null +++ b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-hudi/apply-hive-config-for-hudi.sh @@ -0,0 +1,5 @@ +#!/bin/bash +set -exuo pipefail + +echo "Applying hive-site configuration overrides for Spark" +apply-site-xml-override /etc/hive/conf/hive-site.xml "/docker/presto-product-tests/conf/environment/singlenode-hudi/hive-site-overrides.xml" diff --git a/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-hudi/hive-site-overrides.xml b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-hudi/hive-site-overrides.xml new file mode 100644 index 000000000000..b1411b3ab8a9 --- /dev/null +++ b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-hudi/hive-site-overrides.xml @@ -0,0 +1,9 @@ + + + + + hive.metastore.disallow.incompatible.col.type.changes + false + + + diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/hudi/TestHudiSparkCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/hudi/TestHudiSparkCompatibility.java index 5f0650ff9b51..f5c4c138e1b0 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/hudi/TestHudiSparkCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/hudi/TestHudiSparkCompatibility.java @@ -20,7 +20,11 @@ import org.assertj.core.api.Assertions; import org.testng.annotations.Test; +import java.text.SimpleDateFormat; +import 
java.time.ZoneOffset; +import java.util.Date; import java.util.List; +import java.util.TimeZone; import static io.trino.tempto.assertions.QueryAssert.Row.row; import static io.trino.tempto.assertions.QueryAssert.assertQueryFailure; @@ -38,6 +42,7 @@ public class TestHudiSparkCompatibility { private static final String COW_TABLE_TYPE = "cow"; private static final String MOR_TABLE_TYPE = "mor"; + private static final String DEFAULT_INSTANT_TIME_FORMAT = "yyyyMMddHHmmssSSS"; private String bucketName; @@ -320,6 +325,48 @@ public void testTimelineTableRedirect() } } + @Test(groups = {HUDI, PROFILE_SPECIFIC_TESTS}) + public void testTimeTravelQuery() + { + String tableName = "test_hudi_cow_select_session_props" + randomNameSuffix(); + SimpleDateFormat formatter = new SimpleDateFormat(DEFAULT_INSTANT_TIME_FORMAT); + formatter.setTimeZone(TimeZone.getTimeZone(ZoneOffset.UTC)); + String before = formatter.format(new Date()); + onHudi().executeQuery("SET hoodie.schema.on.read.enable=true"); + createNonPartitionedTable(tableName, COW_TABLE_TYPE); + String after = formatter.format(new Date()); + + try { + assertThat(onTrino().executeQuery("SELECT id, name FROM hudi.default." + tableName + " FOR VERSION AS OF " + before)).hasNoRows(); + assertThat(onTrino().executeQuery("SELECT id, name FROM hudi.default." + tableName + " FOR VERSION AS OF " + after)) + .containsOnly(ImmutableList.of( + row(1, "a1"), + row(2, "a2"))); + + onHudi().executeQuery("ALTER TABLE default." + tableName + " ADD COLUMNS (new_col string)"); + onHudi().executeQuery("INSERT INTO default." + tableName + " VALUES (3, 'a3', 20, 1000, 'a3_commit')"); + String afterAddColumn = formatter.format(new Date()); + assertThat(onTrino().executeQuery("SELECT id, name, new_col FROM hudi.default." + tableName + " FOR VERSION AS OF " + afterAddColumn)) + .containsOnly(ImmutableList.of( + row(1, "a1", null), + row(2, "a2", null), + row(3, "a3", "a3_commit"))); + + onHudi().executeQuery("ALTER TABLE default." 
+ tableName + " DROP COLUMNS (new_col)"); + onHudi().executeQuery("INSERT INTO default." + tableName + " VALUES (4, 'a4', 20, 1000)"); + String afterDropColumn = formatter.format(new Date()); + assertThat(onTrino().executeQuery("SELECT id, name FROM hudi.default." + tableName + " FOR VERSION AS OF " + afterDropColumn)) + .containsOnly(ImmutableList.of( + row(1, "a1"), + row(2, "a2"), + row(3, "a3"), + row(4, "a4"))); + } + finally { + onHudi().executeQuery("DROP TABLE default." + tableName); + } + } + private void createNonPartitionedTable(String tableName, String tableType) { onHudi().executeQuery(format(