Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class TableMetadata
private final Map<String, String> parameters;

private final Optional<HiveStorageFormat> storageFormat;
private final Optional<StorageFormat> originalStorageFormat;
private final Optional<HiveBucketProperty> bucketProperty;
private final Map<String, String> serdeParameters;

Expand All @@ -66,6 +67,7 @@ public TableMetadata(
@JsonProperty("partitionColumns") List<Column> partitionColumns,
@JsonProperty("parameters") Map<String, String> parameters,
@JsonProperty("storageFormat") Optional<HiveStorageFormat> storageFormat,
@JsonProperty("originalStorageFormat") Optional<StorageFormat> originalStorageFormat,
@JsonProperty("bucketProperty") Optional<HiveBucketProperty> bucketProperty,
@JsonProperty("serdeParameters") Map<String, String> serdeParameters,
@JsonProperty("externalLocation") Optional<String> externalLocation,
Expand All @@ -81,6 +83,7 @@ public TableMetadata(
this.parameters = ImmutableMap.copyOf(requireNonNull(parameters, "parameters is null"));

this.storageFormat = requireNonNull(storageFormat, "storageFormat is null");
this.originalStorageFormat = requireNonNull(originalStorageFormat, "originalStorageFormat is null");
this.bucketProperty = requireNonNull(bucketProperty, "bucketProperty is null");
this.serdeParameters = requireNonNull(serdeParameters, "serdeParameters is null");
this.externalLocation = requireNonNull(externalLocation, "externalLocation is null");
Expand Down Expand Up @@ -110,6 +113,12 @@ public TableMetadata(String currentVersion, Table table)
storageFormat = Arrays.stream(HiveStorageFormat.values())
.filter(format -> tableFormat.equals(StorageFormat.fromHiveStorageFormat(format)))
.findFirst();
if (storageFormat.isPresent()) {
originalStorageFormat = Optional.empty();
}
else {
originalStorageFormat = Optional.of(tableFormat);
}
bucketProperty = table.getStorage().getBucketProperty();
serdeParameters = table.getStorage().getSerdeParameters();

Expand Down Expand Up @@ -182,6 +191,12 @@ public Optional<HiveStorageFormat> getStorageFormat()
return storageFormat;
}

// The storage format exactly as read from the metastore when it does not map to any
// known HiveStorageFormat (e.g. a Hudi-specific input format); empty when
// storageFormat is present. Preserved so toTable() can round-trip the original format.
@JsonProperty
public Optional<StorageFormat> getOriginalStorageFormat()
{
return originalStorageFormat;
}

@JsonProperty
public Optional<HiveBucketProperty> getBucketProperty()
{
Expand Down Expand Up @@ -228,6 +243,7 @@ public TableMetadata withDataColumns(String currentVersion, List<Column> dataCol
partitionColumns,
parameters,
storageFormat,
originalStorageFormat,
bucketProperty,
serdeParameters,
externalLocation,
Expand All @@ -246,6 +262,7 @@ public TableMetadata withParameters(String currentVersion, Map<String, String> p
partitionColumns,
parameters,
storageFormat,
originalStorageFormat,
bucketProperty,
serdeParameters,
externalLocation,
Expand All @@ -264,6 +281,7 @@ public TableMetadata withColumnStatistics(String currentVersion, Map<String, Hiv
partitionColumns,
parameters,
storageFormat,
originalStorageFormat,
bucketProperty,
serdeParameters,
externalLocation,
Expand All @@ -281,7 +299,9 @@ public Table toTable(String databaseName, String tableName, String location)
tableType,
Storage.builder()
.setLocation(externalLocation.or(() -> Optional.ofNullable(parameters.get(LOCATION_PROPERTY))).orElse(location))
.setStorageFormat(storageFormat.map(StorageFormat::fromHiveStorageFormat).orElse(VIEW_STORAGE_FORMAT))
.setStorageFormat(storageFormat.map(StorageFormat::fromHiveStorageFormat)
.or(() -> originalStorageFormat)
.orElse(VIEW_STORAGE_FORMAT))
.setBucketProperty(bucketProperty)
.setSerdeParameters(serdeParameters)
.build(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ public final class HiveUtil
public static final String ICEBERG_TABLE_TYPE_VALUE = "iceberg";

// Input format class names are listed below as Strings because the hudi-hadoop-mr dependency is not available in the context of the trino-hive plugin
private static final String HUDI_PARQUET_INPUT_FORMAT = "org.apache.hudi.hadoop.HoodieParquetInputFormat";
public static final String HUDI_PARQUET_INPUT_FORMAT = "org.apache.hudi.hadoop.HoodieParquetInputFormat";
private static final String HUDI_PARQUET_REALTIME_INPUT_FORMAT = "org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat";
private static final String HUDI_INPUT_FORMAT = "com.uber.hoodie.hadoop.HoodieInputFormat";
private static final String HUDI_REALTIME_INPUT_FORMAT = "com.uber.hoodie.hadoop.realtime.HoodieRealtimeInputFormat";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.hive.metastore.file;

import com.google.common.collect.ImmutableMap;
import io.trino.plugin.hive.NodeVersion;
import io.trino.plugin.hive.metastore.Column;
import io.trino.plugin.hive.metastore.Database;
import io.trino.plugin.hive.metastore.HiveMetastoreConfig;
import io.trino.plugin.hive.metastore.StorageFormat;
import io.trino.plugin.hive.metastore.Table;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Optional;

import static com.google.common.io.MoreFiles.deleteRecursively;
import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT;
import static io.trino.plugin.hive.HiveType.HIVE_INT;
import static io.trino.plugin.hive.metastore.PrincipalPrivileges.NO_PRIVILEGES;
import static io.trino.plugin.hive.util.HiveUtil.HUDI_PARQUET_INPUT_FORMAT;
import static io.trino.spi.security.PrincipalType.USER;
import static io.trino.testing.TestingNames.randomNameSuffix;
import static java.nio.file.Files.createTempDirectory;
import static org.assertj.core.api.Assertions.assertThat;

/**
 * Verifies that {@code FileHiveMetastore} round-trips table storage metadata.
 * In particular, a Hudi-specific input format that does not map to any
 * {@code HiveStorageFormat} must be preserved when a table is written and read back.
 */
public class TestFileHiveMetastore
{
    private Path tmpDir;
    private FileHiveMetastore metastore;

    @BeforeClass
    public void setUp()
            throws IOException
    {
        tmpDir = createTempDirectory(getClass().getSimpleName());

        metastore = new FileHiveMetastore(
                new NodeVersion("testversion"),
                HDFS_ENVIRONMENT,
                new HiveMetastoreConfig().isHideDeltaLakeTables(),
                new FileHiveMetastoreConfig()
                        .setCatalogDirectory(tmpDir.toString())
                        // location checks would reject the dummy file:///dev/null location used in tests
                        .setDisableLocationChecks(true));

        metastore.createDatabase(Database.builder()
                .setDatabaseName("default")
                .setOwnerName(Optional.of("test"))
                .setOwnerType(Optional.of(USER))
                .build());
    }

    @AfterClass(alwaysRun = true)
    public void tearDown()
            throws IOException
    {
        deleteRecursively(tmpDir, ALLOW_INSECURE);
        metastore = null;
        tmpDir = null;
    }

    @Test
    public void testPreserveHudiInputFormat()
    {
        // Hudi tables use the standard Parquet serde and output format, but a
        // Hudi-specific input format that has no HiveStorageFormat equivalent
        StorageFormat storageFormat = StorageFormat.create(
                ParquetHiveSerDe.class.getName(),
                HUDI_PARQUET_INPUT_FORMAT,
                MapredParquetOutputFormat.class.getName());

        Table table = Table.builder()
                .setDatabaseName("default")
                .setTableName("some_table_name" + randomNameSuffix())
                .setTableType(TableType.EXTERNAL_TABLE.name())
                .setOwner(Optional.of("public"))
                .addDataColumn(new Column("foo", HIVE_INT, Optional.empty()))
                .setParameters(ImmutableMap.of("serialization.format", "1", "EXTERNAL", "TRUE"))
                .withStorage(storageBuilder -> storageBuilder
                        .setStorageFormat(storageFormat)
                        .setLocation("file:///dev/null"))
                .build();

        metastore.createTable(table, NO_PRIVILEGES);

        // Reading the table back must yield an identical storage descriptor,
        // including the Hudi input format that has no HiveStorageFormat mapping
        Table saved = metastore.getTable(table.getDatabaseName(), table.getTableName()).orElseThrow();

        assertThat(saved.getStorage())
                .isEqualTo(table.getStorage());

        metastore.dropTable(table.getDatabaseName(), table.getTableName(), false);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,15 @@
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static io.trino.plugin.hive.HiveMetadata.TABLE_COMMENT;
import static io.trino.plugin.hive.HiveTimestampPrecision.NANOSECONDS;
import static io.trino.plugin.hive.TableType.EXTERNAL_TABLE;
import static io.trino.plugin.hive.TableType.MANAGED_TABLE;
import static io.trino.plugin.hive.util.HiveUtil.columnMetadataGetter;
import static io.trino.plugin.hive.util.HiveUtil.hiveColumnHandles;
import static io.trino.plugin.hive.util.HiveUtil.isHiveSystemSchema;
import static io.trino.plugin.hive.util.HiveUtil.isHudiTable;
import static io.trino.plugin.hudi.HudiErrorCode.HUDI_BAD_DATA;
import static io.trino.plugin.hudi.HudiSessionProperties.getColumnsToHide;
import static io.trino.plugin.hudi.HudiTableProperties.LOCATION_PROPERTY;
import static io.trino.plugin.hudi.HudiTableProperties.PARTITIONED_BY_PROPERTY;
import static io.trino.plugin.hudi.HudiUtil.hudiMetadataExists;
import static io.trino.plugin.hudi.model.HudiTableType.COPY_ON_WRITE;
import static io.trino.spi.StandardErrorCode.UNSUPPORTED_TABLE_TYPE;
import static io.trino.spi.connector.SchemaTableName.schemaTableName;
Expand Down Expand Up @@ -100,9 +101,14 @@ public HudiTableHandle getTableHandle(ConnectorSession session, SchemaTableName
if (table.isEmpty()) {
return null;
}
if (!isHudiTable(session, table.get())) {
if (!isHudiTable(table.get())) {
throw new TrinoException(UNSUPPORTED_TABLE_TYPE, format("Not a Hudi table: %s", tableName));
}
Location location = Location.of(table.get().getStorage().getLocation());
if (!hudiMetadataExists(fileSystemFactory.create(session), location)) {
throw new TrinoException(HUDI_BAD_DATA, "Location of table %s does not contain Hudi table metadata: %s".formatted(tableName, location));
}

return new HudiTableHandle(
tableName.getSchemaName(),
tableName.getTableName(),
Expand All @@ -112,19 +118,6 @@ public HudiTableHandle getTableHandle(ConnectorSession session, SchemaTableName
TupleDomain.all());
}

private boolean isHudiTable(ConnectorSession session, Table table)
{
if (!MANAGED_TABLE.name().equals(table.getTableType()) && !EXTERNAL_TABLE.name().equals(table.getTableType())) {
// Views are not Hudi tables
return false;
}
if (table.getStorage().getOptionalLocation().isEmpty() || table.getStorage().getOptionalLocation().get().isEmpty()) {
// No location or empty location cannot be a valid Hudi table
return false;
}
return HudiUtil.isHudiTable(fileSystemFactory.create(session), Location.of(table.getStorage().getLocation()));
}

@Override
public Optional<SystemTable> getSystemTable(ConnectorSession session, SchemaTableName tableName)
{
Expand All @@ -143,14 +136,18 @@ private Optional<SystemTable> getRawSystemTable(SchemaTableName tableName, Conne
if (tableOptional.isEmpty()) {
return Optional.empty();
}
switch (name.getTableType()) {
case DATA:
break;
case TIMELINE:
SchemaTableName systemTableName = new SchemaTableName(tableName.getSchemaName(), name.getTableNameWithType());
return Optional.of(new TimelineTable(fileSystemFactory.create(session), systemTableName, tableOptional.get()));
if (!isHudiTable(tableOptional.get())) {
return Optional.empty();
}
return Optional.empty();
return switch (name.getTableType()) {
case DATA ->
// TODO (https://github.com/trinodb/trino/issues/17973) remove DATA table type
Optional.empty();
case TIMELINE -> {
SchemaTableName systemTableName = new SchemaTableName(tableName.getSchemaName(), name.getTableNameWithType());
yield Optional.of(new TimelineTable(fileSystemFactory.create(session), systemTableName, tableOptional.get()));
}
};
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,20 +74,17 @@ private static String getFileExtension(String fullName)
return dotIndex == -1 ? "" : fileName.substring(dotIndex);
}

public static boolean isHudiTable(TrinoFileSystem trinoFileSystem, Location baseLocation)
public static boolean hudiMetadataExists(TrinoFileSystem trinoFileSystem, Location baseLocation)
{
try {
Location metaLocation = baseLocation.appendPath(METAFOLDER_NAME);
FileIterator iterator = trinoFileSystem.listFiles(metaLocation);
// If there is at least one file in the .hoodie directory, it's a valid Hudi table
if (!iterator.hasNext()) {
return false;
}
return iterator.hasNext();
}
catch (IOException e) {
throw new TrinoException(HUDI_FILESYSTEM_ERROR, "Failed to check for Hudi table at location: " + baseLocation, e);
}
return true;
}

public static boolean partitionMatchesPredicates(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import java.util.stream.Stream;

import static com.google.common.base.Preconditions.checkArgument;
import static io.trino.plugin.hudi.HudiUtil.isHudiTable;
import static io.trino.plugin.hudi.HudiUtil.hudiMetadataExists;
import static java.util.Objects.requireNonNull;

public class HudiTableMetaClient
Expand All @@ -61,7 +61,7 @@ protected HudiTableMetaClient(
{
this.metaPath = requireNonNull(basePath, "basePath is null").appendPath(METAFOLDER_NAME);
this.fileSystem = requireNonNull(fileSystem, "fileSystem is null");
checkArgument(isHudiTable(fileSystem, basePath), "Could not check if %s is a valid table", basePath);
checkArgument(hudiMetadataExists(fileSystem, basePath), "Could not check if %s is a valid table", basePath);
this.basePath = basePath;

this.tableConfig = new HudiTableConfig(fileSystem, metaPath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import com.google.common.collect.ImmutableMap;
import io.airlift.log.Logger;
import io.trino.hdfs.HdfsEnvironment;
import io.trino.plugin.hive.HiveStorageFormat;
import io.trino.plugin.hive.HiveType;
import io.trino.plugin.hive.PartitionStatistics;
import io.trino.plugin.hive.metastore.Column;
Expand All @@ -29,6 +28,8 @@
import io.trino.plugin.hive.metastore.Table;
import io.trino.testing.QueryRunner;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;

import java.io.File;
import java.io.IOException;
Expand All @@ -50,6 +51,7 @@
import static io.trino.plugin.hive.HiveType.HIVE_INT;
import static io.trino.plugin.hive.HiveType.HIVE_LONG;
import static io.trino.plugin.hive.HiveType.HIVE_STRING;
import static io.trino.plugin.hive.util.HiveUtil.HUDI_PARQUET_INPUT_FORMAT;

public class ResourceHudiTablesInitializer
implements HudiTablesInitializer
Expand Down Expand Up @@ -91,7 +93,10 @@ private void createTable(
List<Column> partitionColumns,
Map<String, String> partitions)
{
StorageFormat storageFormat = StorageFormat.fromHiveStorageFormat(HiveStorageFormat.PARQUET);
StorageFormat storageFormat = StorageFormat.create(
ParquetHiveSerDe.class.getName(),
HUDI_PARQUET_INPUT_FORMAT,
MapredParquetOutputFormat.class.getName());

Table table = Table.builder()
.setDatabaseName(schemaName)
Expand Down
Loading