Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions docs/src/main/sphinx/connector/hudi.rst
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,46 @@ The output of the query has the following columns:
- ``varchar``
- Current state of the instant

Rolling back to a previous version
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mosabua Could you please review this docs?

-----------------------------------

Hudi stores multiple versions of data in a table over time while
providing ``snapshot isolation``. The number of versions retained is configurable
and depends on how much space one is willing to trade for keeping those older
versions. As a result, Hudi can expose the state of a table at a certain
point/instant in time. This is done by allowing read mechanisms on a Hudi table
that go back and read the version of the data that was written at a
particular point/instant in time.

For example, you could find the version IDs for the ``customer_orders`` table
by running the following query::

SELECT timestamp
FROM example.testdb."customer_orders$timeline"
ORDER BY timestamp DESC

Time travel queries
^^^^^^^^^^^^^^^^^^^

The connector offers the ability to query historical data.
This allows you to query the table as it was when a previous snapshot
of the table was taken, even if the data has since been modified or deleted.

The historical data of the table can be retrieved by specifying the
snapshot identifier corresponding to the version of the table that
needs to be retrieved::

SELECT *
FROM example.testdb.customer_orders FOR VERSION AS OF 8954597067493422955

A different approach of retrieving historical data is to specify
a point in time in the past, such as a day or week ago. The latest snapshot
of the table taken before or at the specified timestamp in the query is
internally used for providing the previous state of the table::

SELECT *
FROM example.testdb.customer_orders FOR TIMESTAMP AS OF TIMESTAMP '2022-03-23 09:59:29.803 Europe/Vienna'
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this PR contain a test case for FOR TIMESTAMP AS OF TIMESTAMP?


File formats
------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.ConnectorTableVersion;
import io.trino.spi.connector.Constraint;
import io.trino.spi.connector.ConstraintApplicationResult;
import io.trino.spi.connector.SchemaTableName;
Expand Down Expand Up @@ -61,7 +62,9 @@
import static io.trino.plugin.hudi.HudiSessionProperties.getColumnsToHide;
import static io.trino.plugin.hudi.HudiTableProperties.LOCATION_PROPERTY;
import static io.trino.plugin.hudi.HudiTableProperties.PARTITIONED_BY_PROPERTY;
import static io.trino.plugin.hudi.HudiUtil.getHoodieInstantTime;
import static io.trino.spi.StandardErrorCode.UNSUPPORTED_TABLE_TYPE;
import static io.trino.spi.connector.PointerType.TARGET_ID;
import static io.trino.spi.connector.SchemaTableName.schemaTableName;
import static java.lang.String.format;
import static java.util.Collections.singletonList;
Expand Down Expand Up @@ -98,6 +101,16 @@ public List<String> listSchemaNames(ConnectorSession session)

@Override
public HudiTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName)
{
    // No table version requested (no time travel): delegate to the versioned
    // overload with empty start/end versions.
    return getTableHandle(session, tableName, Optional.empty(), Optional.empty());
}

@Override
public HudiTableHandle getTableHandle(
ConnectorSession session,
SchemaTableName tableName,
Optional<ConnectorTableVersion> startVersion,
Optional<ConnectorTableVersion> endVersion)
{
if (isHiveSystemSchema(tableName.getSchemaName())) {
return null;
Expand All @@ -109,11 +122,21 @@ public HudiTableHandle getTableHandle(ConnectorSession session, SchemaTableName
if (!isHudiTable(session, table.get())) {
throw new TrinoException(UNSUPPORTED_TABLE_TYPE, format("Not a Hudi table: %s", tableName));
}
if (endVersion.isPresent() && endVersion.get().getPointerType() == TARGET_ID) {
startVersion = endVersion;
}
List<HiveColumnHandle> dataColumns = hiveColumnHandles(table.get(), typeManager, NANOSECONDS).stream()
.collect(toImmutableList());

return new HudiTableHandle(
tableName.getSchemaName(),
tableName.getTableName(),
table.get().getStorage().getLocation(),
HoodieTableType.COPY_ON_WRITE,
ImmutableList.of(),
dataColumns,
getHoodieInstantTime(startVersion),
getHoodieInstantTime(endVersion),
TupleDomain.all(),
TupleDomain.all());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,16 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import io.trino.plugin.hive.HiveColumnHandle;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.predicate.TupleDomain;
import org.apache.hudi.common.model.HoodieTableType;

import java.util.List;
import java.util.Optional;

import static io.trino.spi.connector.SchemaTableName.schemaTableName;
import static java.util.Objects.requireNonNull;

Expand All @@ -31,6 +35,13 @@ public class HudiTableHandle
private final String tableName;
private final String basePath;
private final HoodieTableType tableType;

private final List<HiveColumnHandle> dataColumns;

private final List<HiveColumnHandle> partitionColumns;
private final Optional<String> startVersion;
private final Optional<String> endVersion;

private final TupleDomain<HiveColumnHandle> partitionPredicates;
private final TupleDomain<HiveColumnHandle> regularPredicates;

Expand All @@ -40,13 +51,21 @@ public HudiTableHandle(
@JsonProperty("tableName") String tableName,
@JsonProperty("basePath") String basePath,
@JsonProperty("tableType") HoodieTableType tableType,
@JsonProperty("partitionColumns") List<HiveColumnHandle> partitionColumns,
@JsonProperty("dataColumns") List<HiveColumnHandle> dataColumns,
@JsonProperty("startVersion") Optional<String> startVersion,
@JsonProperty("endVersion") Optional<String> endVersion,
@JsonProperty("partitionPredicates") TupleDomain<HiveColumnHandle> partitionPredicates,
@JsonProperty("regularPredicates") TupleDomain<HiveColumnHandle> regularPredicates)
{
this.schemaName = requireNonNull(schemaName, "schemaName is null");
this.tableName = requireNonNull(tableName, "tableName is null");
this.basePath = requireNonNull(basePath, "basePath is null");
this.tableType = requireNonNull(tableType, "tableType is null");
this.partitionColumns = ImmutableList.copyOf(requireNonNull(partitionColumns, "partitionColumns is null"));
this.dataColumns = ImmutableList.copyOf(requireNonNull(dataColumns, "dataColumns is null"));
this.startVersion = requireNonNull(startVersion, "startVersion is null");
this.endVersion = requireNonNull(endVersion, "endVersion is null");
this.partitionPredicates = requireNonNull(partitionPredicates, "partitionPredicates is null");
this.regularPredicates = requireNonNull(regularPredicates, "regularPredicates is null");
}
Expand Down Expand Up @@ -87,11 +106,36 @@ public TupleDomain<HiveColumnHandle> getRegularPredicates()
return regularPredicates;
}

@JsonProperty
public Optional<String> getStartVersion()
{
    // Hudi instant time marking the start of the queried version range,
    // empty when the query does not use time travel
    return startVersion;
}

@JsonProperty
public Optional<String> getEndVersion()
{
    // Hudi instant time marking the end of the queried version range
    // (FOR VERSION/TIMESTAMP AS OF), empty when reading the latest snapshot
    return endVersion;
}

@JsonProperty
public SchemaTableName getSchemaTableName()
{
    // Derived property: combines the stored schema and table names
    return schemaTableName(schemaName, tableName);
}

@JsonProperty
public List<HiveColumnHandle> getDataColumns()
{
    // Immutable list of the table's data columns (copied in the constructor)
    return dataColumns;
}

@JsonProperty
public List<HiveColumnHandle> getPartitionColumns()
{
    // Immutable list of the table's partition columns (copied in the constructor)
    return partitionColumns;
}

HudiTableHandle applyPredicates(
TupleDomain<HiveColumnHandle> partitionTupleDomain,
TupleDomain<HiveColumnHandle> regularTupleDomain)
Expand All @@ -101,6 +145,10 @@ HudiTableHandle applyPredicates(
tableName,
basePath,
tableType,
partitionColumns,
dataColumns,
startVersion,
endVersion,
partitionPredicates.intersect(partitionTupleDomain),
regularPredicates.intersect(regularTupleDomain));
}
Expand Down
47 changes: 47 additions & 0 deletions plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@
import io.trino.plugin.hive.metastore.Column;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorTableVersion;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.NullableValue;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.type.LongTimestampWithTimeZone;
import io.trino.spi.type.TimestampWithTimeZoneType;
import io.trino.spi.type.Type;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
Expand All @@ -38,18 +41,28 @@
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;

import java.io.IOException;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TimeZone;

import static io.trino.plugin.hive.HiveErrorCode.HIVE_INVALID_METADATA;
import static io.trino.plugin.hive.util.HiveUtil.checkCondition;
import static io.trino.plugin.hive.util.HiveUtil.parsePartitionValue;
import static io.trino.plugin.hudi.HudiErrorCode.HUDI_CANNOT_OPEN_SPLIT;
import static io.trino.plugin.hudi.HudiErrorCode.HUDI_UNSUPPORTED_FILE_FORMAT;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.DateTimeEncoding.unpackMillisUtc;
import static java.util.stream.Collectors.toList;

public final class HudiUtil
{
public static final String DEFAULT_INSTANT_TIME_FORMAT = "yyyyMMddHHmmssSSS";

private HudiUtil() {}

public static boolean isHudiParquetInputFormat(InputFormat<?, ?> inputFormat)
Expand Down Expand Up @@ -171,4 +184,38 @@ public static FileStatus getFileStatus(HoodieBaseFile baseFile)
throw new TrinoException(HUDI_CANNOT_OPEN_SPLIT, "Error getting file status of " + baseFile.getPath(), e);
}
}

/**
 * Translates a Trino table version (FOR VERSION AS OF / FOR TIMESTAMP AS OF)
 * into a Hudi instant time string in {@code DEFAULT_INSTANT_TIME_FORMAT} (UTC).
 *
 * @param optionalVersion the version pointer from the query, if any
 * @return the Hudi instant time, or {@code Optional.empty()} when no version was given
 * @throws TrinoException with {@code NOT_SUPPORTED} for unsupported version types
 *         or pointer types
 */
public static Optional<String> getHoodieInstantTime(Optional<ConnectorTableVersion> optionalVersion)
{
    if (optionalVersion.isEmpty()) {
        return Optional.empty();
    }
    ConnectorTableVersion version = optionalVersion.get();
    io.trino.spi.type.Type versionType = version.getVersionType();
    switch (version.getPointerType()) {
        case TEMPORAL:
            if (!(versionType instanceof TimestampWithTimeZoneType)) {
                throw new TrinoException(NOT_SUPPORTED, "Unsupported type for temporal table version: " + versionType.getDisplayName());
            }
            // Short timestamps pack millis into a long; long timestamps carry them in an object
            long epochMillis = ((TimestampWithTimeZoneType) versionType).isShort()
                    ? unpackMillisUtc((long) version.getVersion())
                    : ((LongTimestampWithTimeZone) version.getVersion()).getEpochMillis();
            // DateTimeFormatter is thread-safe, unlike the legacy SimpleDateFormat;
            // only built on the temporal path, where it is actually needed
            DateTimeFormatter formatter = DateTimeFormatter.ofPattern(DEFAULT_INSTANT_TIME_FORMAT)
                    .withZone(ZoneOffset.UTC);
            return Optional.of(formatter.format(Instant.ofEpochMilli(epochMillis)));
        case TARGET_ID:
            if (versionType != BIGINT) {
                throw new TrinoException(NOT_SUPPORTED, "Unsupported type for table version: " + versionType.getDisplayName());
            }
            // The version id is used directly as the Hudi instant time
            return Optional.of(String.valueOf(version.getVersion()));
    }
    throw new TrinoException(NOT_SUPPORTED, "Version pointer type is not supported: " + version.getPointerType());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;

import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -101,6 +102,15 @@ public List<HudiPartitionInfo> getPartitionsToScan()
@Override
public List<FileStatus> listStatus(HudiPartitionInfo partitionInfo)
{
    // Time travel path: when an end version (Hudi instant time) is set on the
    // table handle, pick per file group the latest file slice committed at or
    // before that instant, keeping only slices that have a base file.
    if (tableHandle.getEndVersion().isPresent()) {
        return fileSystemView.getAllFileGroups(partitionInfo.getRelativePartitionPath())
                .map(fileGroup -> fileGroup.getLatestFileSliceBeforeOrOn(tableHandle.getEndVersion().get()))
                .filter(Option::isPresent)
                .map(fileSliceOption -> fileSliceOption.get().getBaseFile())
                .filter(Option::isPresent)
                .map(baseFileOption -> getFileStatus(baseFileOption.get()))
                .collect(toImmutableList());
    }
    // Default path: latest base files of the partition (current table state)
    return fileSystemView.getLatestBaseFiles(partitionInfo.getRelativePartitionPath())
            .map(baseFile -> getFileStatus(baseFile))
            .collect(toImmutableList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,4 +102,14 @@ protected static String columnsToHide()
columns.add(TpchHudiTablesInitializer.FIELD_UUID);
return String.join(",", columns);
}

@Override
Comment thread
albericgenius marked this conversation as resolved.
Outdated
protected void verifyVersionedQueryFailurePermissible(Exception e)
{
    // Versioned (time travel) queries against Hudi may legitimately fail for
    // any of these reasons; any other failure message is unexpected.
    String permissiblePattern = String.join("|",
            "Version pointer type is not supported: .*",
            "Unsupported type for temporal table version: .*",
            "Unsupported type for table version: .*",
            "No version history table tpch.nation at or before .*");
    assertThat(e).hasMessageMatching(permissiblePattern);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import io.trino.plugin.hudi.testing.ResourceHudiTablesInitializer;
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.QueryRunner;
import org.intellij.lang.annotations.Language;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import java.nio.file.Files;
Expand Down Expand Up @@ -165,6 +167,31 @@ public void testPartitionColumn()
assertQueryFails("SELECT \"$partition\" FROM " + HUDI_NON_PART_COW, ".* Column '\\$partition' cannot be resolved");
}

@Test(dataProvider = "timeTravelQueryDataProvider")
public void testTimeTravelQuery(@Language("SQL") String sql, String expected)
{
    // Runs each time travel query (FOR VERSION/TIMESTAMP AS OF) from the data
    // provider and checks it returns the expected row count
    assertQuery(sql, expected);
}

@DataProvider(name = "timeTravelQueryDataProvider")
public Object[][] timeTravelQueryDataProvider()
{
    // Each entry: {time travel SQL query, expected result}.
    // The first six queries target a timestamp/version at or after the commit,
    // so the committed rows (99) are visible; the last six target a version or
    // timestamp before any commit, so no rows are visible.
    return new Object[][] {
            {"SELECT count(_hoodie_commit_time) FROM STOCK_TICKS_COW FOR TIMESTAMP AS OF TIMESTAMP '2021-12-16 07:14:53.74700 Z'", "VALUES 99"},
            {"SELECT count(1) FROM STOCK_TICKS_COW FOR TIMESTAMP AS OF TIMESTAMP '2021-12-16 07:14:53.74700 Z'", "VALUES 99"},
            {"SELECT count(*) FROM STOCK_TICKS_COW FOR TIMESTAMP AS OF TIMESTAMP '2021-12-16 07:14:53.74700 Z'", "VALUES 99"},
            {"SELECT count(_hoodie_commit_time) FROM STOCK_TICKS_COW FOR VERSION AS OF 20211216071453747", "VALUES 99"},
            {"SELECT count(1) FROM STOCK_TICKS_COW FOR VERSION AS OF 20211216071453747", "VALUES 99"},
            {"SELECT count(*) FROM STOCK_TICKS_COW FOR VERSION AS OF 20211216071453747", "VALUES 99"},
            {"SELECT count(_hoodie_commit_time) FROM STOCK_TICKS_COW FOR VERSION AS OF 20211216009999999", "VALUES 0"},
            {"SELECT count(1) FROM STOCK_TICKS_COW FOR VERSION AS OF 20211216009999999", "VALUES 0"},
            {"SELECT count(*) FROM STOCK_TICKS_COW FOR VERSION AS OF 20211216009999999", "VALUES 0"},
            {"SELECT count(_hoodie_commit_time) FROM STOCK_TICKS_COW FOR TIMESTAMP AS OF TIMESTAMP '2021-12-15 16:00:00.000000000 Z'", "VALUES 0"},
            {"SELECT count(1) FROM STOCK_TICKS_COW FOR TIMESTAMP AS OF TIMESTAMP '2021-12-15 16:00:00.000000000 Z'", "VALUES 0"},
            {"SELECT count(*) FROM STOCK_TICKS_COW FOR TIMESTAMP AS OF TIMESTAMP '2021-12-15 16:00:00.000000000 Z'", "VALUES 0"},
    };
}

private static Path toPath(String path)
{
// Remove leading 'file:' because $path column returns 'file:/path-to-file' in case of local file system
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import static io.trino.tests.product.launcher.docker.ContainerUtil.forSelectedPorts;
import static io.trino.tests.product.launcher.env.EnvironmentContainers.HADOOP;
import static io.trino.tests.product.launcher.env.EnvironmentContainers.TESTS;
import static io.trino.tests.product.launcher.env.common.Hadoop.CONTAINER_HADOOP_INIT_D;
import static io.trino.tests.product.launcher.env.common.Minio.MINIO_CONTAINER_NAME;
import static io.trino.tests.product.launcher.env.common.Standard.CONTAINER_TRINO_ETC;
import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -81,12 +82,20 @@ public EnvSinglenodeHudi(
@Override
public void extendEnvironment(Environment.Builder builder)
{
String dockerImageName = "ghcr.io/trinodb/testing/hdp3.1-hive:" + hadoopImagesVersion;
// Using hdp3.1 so we are using Hive metastore with version close to versions of hive-*.jars Spark uses
builder.configureContainer(HADOOP, container -> container.setDockerImageName("ghcr.io/trinodb/testing/hdp3.1-hive:" + hadoopImagesVersion));
builder.configureContainer(HADOOP, container -> {
container.setDockerImageName(dockerImageName);
container.withCopyFileToContainer(
forHostPath(configDir.getPath("apply-hive-config-for-hudi.sh")),
CONTAINER_HADOOP_INIT_D + "/apply-hive-config-for-hudi.sh");
});

builder.addConnector(
"hive",
forHostPath(configDir.getPath("hive.properties")),
CONTAINER_TRINO_ETC + "/catalog/hive.properties");

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Revert unrelated change.

builder.addConnector(
"hudi",
forHostPath(configDir.getPath("hudi.properties")),
Expand Down
Loading