diff --git a/plugin/trino-hudi/pom.xml b/plugin/trino-hudi/pom.xml index 850104555cfe..4084ab27374f 100644 --- a/plugin/trino-hudi/pom.xml +++ b/plugin/trino-hudi/pom.xml @@ -49,11 +49,6 @@ trino-plugin-toolkit - - io.trino.hadoop - hadoop-apache - - io.trino.hive hive-apache @@ -94,6 +89,11 @@ units + + com.fasterxml.jackson.core + jackson-databind + + com.google.code.findbugs jsr305 @@ -110,6 +110,12 @@ guice + + com.linkedin.calcite + calcite-core + shaded + + javax.annotation javax.annotation-api @@ -131,91 +137,8 @@ - org.apache.hudi - hudi-common - ${dep.hudi.version} - - - org.apache.hbase - hbase-server - - - org.apache.hbase - hbase-client - - - org.osgi - org.osgi.core - - - org.apache.orc - orc-core - - - com.fasterxml.jackson.core - jackson-annotations - - - com.fasterxml.jackson.core - jackson-databind - - - org.apache.httpcomponents - httpclient - - - org.apache.httpcomponents - fluent-hc - - - org.rocksdb - rocksdbjni - - - com.esotericsoftware - kryo-shaded - - - org.apache.hadoop - hadoop-client - - - org.apache.hadoop - hadoop-hdfs - - - org.apache.httpcomponents - httpcore - - - org.apache.hive - hive-exec - - - org.apache.hive - hive-jdbc - - - com.github.ben-manes.caffeine - caffeine - - - org.lz4 - lz4-java - - - - - - org.apache.hudi - hudi-hadoop-mr - ${dep.hudi.version} - - - * - * - - + org.apache.avro + avro @@ -231,14 +154,14 @@ - io.airlift - log-manager + io.trino.hadoop + hadoop-apache runtime - org.apache.avro - avro + io.airlift + log-manager runtime @@ -424,6 +347,19 @@ + + org.apache.hudi + hudi-common + ${dep.hudi.version} + test + + + * + * + + + + org.apache.hudi hudi-java-client diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiErrorCode.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiErrorCode.java index e2eca0c4e61e..68e50edba22f 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiErrorCode.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiErrorCode.java 
@@ -30,7 +30,8 @@ public enum HudiErrorCode HUDI_UNSUPPORTED_FILE_FORMAT(5, EXTERNAL), HUDI_CURSOR_ERROR(6, EXTERNAL), HUDI_FILESYSTEM_ERROR(7, EXTERNAL), - HUDI_PARTITION_NOT_FOUND(8, EXTERNAL); + HUDI_PARTITION_NOT_FOUND(8, EXTERNAL), + HUDI_UNSUPPORTED_TABLE_TYPE(9, EXTERNAL); private final ErrorCode errorCode; diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiFileStatus.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiFileStatus.java index ded7389110cf..56d585db8772 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiFileStatus.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiFileStatus.java @@ -13,14 +13,14 @@ */ package io.trino.plugin.hudi; -import org.apache.hadoop.fs.Path; +import io.trino.filesystem.Location; import static java.util.Objects.requireNonNull; -public record HudiFileStatus(Path path, boolean isDirectory, long length, long modificationTime, long blockSize) +public record HudiFileStatus(Location location, boolean isDirectory, long length, long modificationTime, long blockSize) { public HudiFileStatus { - requireNonNull(path, "path is null"); + requireNonNull(location, "location is null"); } } diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiMetadata.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiMetadata.java index 32f953d10aa7..43520a67157b 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiMetadata.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiMetadata.java @@ -16,11 +16,8 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import io.airlift.log.Logger; -import io.trino.filesystem.FileIterator; import io.trino.filesystem.Location; -import io.trino.filesystem.TrinoFileSystem; import io.trino.filesystem.TrinoFileSystemFactory; -import io.trino.hdfs.HdfsEnvironment; import io.trino.plugin.base.classloader.ClassLoaderSafeSystemTable; import 
io.trino.plugin.hive.HiveColumnHandle; import io.trino.plugin.hive.metastore.Column; @@ -43,7 +40,6 @@ import io.trino.spi.predicate.TupleDomain; import io.trino.spi.type.TypeManager; -import java.io.IOException; import java.util.Collection; import java.util.Collections; import java.util.Iterator; @@ -60,18 +56,17 @@ import static io.trino.plugin.hive.util.HiveUtil.columnMetadataGetter; import static io.trino.plugin.hive.util.HiveUtil.hiveColumnHandles; import static io.trino.plugin.hive.util.HiveUtil.isHiveSystemSchema; -import static io.trino.plugin.hudi.HudiErrorCode.HUDI_FILESYSTEM_ERROR; import static io.trino.plugin.hudi.HudiSessionProperties.getColumnsToHide; import static io.trino.plugin.hudi.HudiTableProperties.LOCATION_PROPERTY; import static io.trino.plugin.hudi.HudiTableProperties.PARTITIONED_BY_PROPERTY; -import static io.trino.plugin.hudi.model.HoodieTableType.COPY_ON_WRITE; +import static io.trino.plugin.hudi.HudiUtil.isHudiTable; +import static io.trino.plugin.hudi.model.HudiTableType.COPY_ON_WRITE; import static io.trino.spi.StandardErrorCode.UNSUPPORTED_TABLE_TYPE; import static io.trino.spi.connector.SchemaTableName.schemaTableName; import static java.lang.String.format; import static java.util.Collections.singletonList; import static java.util.Objects.requireNonNull; import static java.util.function.Function.identity; -import static org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME; public class HudiMetadata implements ConnectorMetadata @@ -79,14 +74,12 @@ public class HudiMetadata public static final Logger log = Logger.get(HudiMetadata.class); private final HiveMetastore metastore; - private final HdfsEnvironment hdfsEnvironment; private final TrinoFileSystemFactory fileSystemFactory; private final TypeManager typeManager; - public HudiMetadata(HiveMetastore metastore, HdfsEnvironment hdfsEnvironment, TrinoFileSystemFactory fileSystemFactory, TypeManager typeManager) + public HudiMetadata(HiveMetastore metastore, 
TrinoFileSystemFactory fileSystemFactory, TypeManager typeManager) { this.metastore = requireNonNull(metastore, "metastore is null"); - this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null"); this.typeManager = requireNonNull(typeManager, "typeManager is null"); } @@ -109,7 +102,7 @@ public HudiTableHandle getTableHandle(ConnectorSession session, SchemaTableName if (table.isEmpty()) { return null; } - if (!isHudiTable(session, table.get())) { + if (!isHudiTable(fileSystemFactory.create(session), Location.of(table.get().getStorage().getLocation()))) { throw new TrinoException(UNSUPPORTED_TABLE_TYPE, format("Not a Hudi table: %s", tableName)); } return new HudiTableHandle( @@ -124,11 +117,11 @@ public HudiTableHandle getTableHandle(ConnectorSession session, SchemaTableName @Override public Optional getSystemTable(ConnectorSession session, SchemaTableName tableName) { - return getRawSystemTable(tableName) + return getRawSystemTable(tableName, session) .map(systemTable -> new ClassLoaderSafeSystemTable(systemTable, getClass().getClassLoader())); } - private Optional getRawSystemTable(SchemaTableName tableName) + private Optional getRawSystemTable(SchemaTableName tableName, ConnectorSession session) { HudiTableName name = HudiTableName.from(tableName.getTableName()); if (name.getTableType() == TableType.DATA) { @@ -144,7 +137,7 @@ private Optional getRawSystemTable(SchemaTableName tableName) break; case TIMELINE: SchemaTableName systemTableName = new SchemaTableName(tableName.getSchemaName(), name.getTableNameWithType()); - return Optional.of(new TimelineTable(hdfsEnvironment, systemTableName, tableOptional.get())); + return Optional.of(new TimelineTable(fileSystemFactory.create(session), systemTableName, tableOptional.get())); } return Optional.empty(); } @@ -227,31 +220,6 @@ HiveMetastore getMetastore() return metastore; } - private boolean 
isHudiTable(ConnectorSession session, Table table) - { - String basePath = table.getStorage().getLocation(); - try { - Location baseLocation = Location.of(basePath); - Location metaLocation = baseLocation.appendPath(METAFOLDER_NAME); - - TrinoFileSystem trinoFileSystem = fileSystemFactory.create(session); - FileIterator iterator = trinoFileSystem.listFiles(metaLocation); - // If there is at least one file in the .hoodie directory, it's a valid Hudi table - if (!iterator.hasNext()) { - log.warn("Could not find Hudi table at path '%s'.", basePath); - return false; - } - } - catch (IllegalArgumentException e) { - log.warn("Could not find Hudi table at path '%s'. Error: %s", basePath, e.getMessage()); - return false; - } - catch (IOException e) { - throw new TrinoException(HUDI_FILESYSTEM_ERROR, format("Could not check if %s is a valid table", basePath), e); - } - return true; - } - private Optional getTableColumnMetadata(ConnectorSession session, SchemaTableName table) { try { diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiMetadataFactory.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiMetadataFactory.java index 6ec7ec71564e..7f855394a4c9 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiMetadataFactory.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiMetadataFactory.java @@ -14,7 +14,6 @@ package io.trino.plugin.hudi; import io.trino.filesystem.TrinoFileSystemFactory; -import io.trino.hdfs.HdfsEnvironment; import io.trino.plugin.hive.metastore.HiveMetastore; import io.trino.plugin.hive.metastore.HiveMetastoreFactory; import io.trino.spi.security.ConnectorIdentity; @@ -29,15 +28,13 @@ public class HudiMetadataFactory { private final HiveMetastoreFactory metastoreFactory; - private final HdfsEnvironment hdfsEnvironment; private final TrinoFileSystemFactory fileSystemFactory; private final TypeManager typeManager; @Inject - public HudiMetadataFactory(HiveMetastoreFactory metastoreFactory, 
HdfsEnvironment hdfsEnvironment, TrinoFileSystemFactory fileSystemFactory, TypeManager typeManager) + public HudiMetadataFactory(HiveMetastoreFactory metastoreFactory, TrinoFileSystemFactory fileSystemFactory, TypeManager typeManager) { this.metastoreFactory = requireNonNull(metastoreFactory, "metastore is null"); - this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null"); this.typeManager = requireNonNull(typeManager, "typeManager is null"); } @@ -45,6 +42,6 @@ public HudiMetadataFactory(HiveMetastoreFactory metastoreFactory, HdfsEnvironmen public HudiMetadata create(ConnectorIdentity identity) { HiveMetastore metastore = metastoreFactory.createMetastore(Optional.of(identity)); - return new HudiMetadata(metastore, hdfsEnvironment, fileSystemFactory, typeManager); + return new HudiMetadata(metastore, fileSystemFactory, typeManager); } } diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java index 43cd48d70f07..beab6f5bdd54 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java @@ -31,7 +31,7 @@ import io.trino.plugin.hive.ReaderColumns; import io.trino.plugin.hive.parquet.ParquetReaderConfig; import io.trino.plugin.hive.parquet.TrinoParquetDataSource; -import io.trino.plugin.hudi.model.HoodieFileFormat; +import io.trino.plugin.hudi.model.HudiFileFormat; import io.trino.spi.TrinoException; import io.trino.spi.block.Block; import io.trino.spi.connector.ColumnHandle; @@ -146,9 +146,9 @@ public ConnectorPageSource createPageSource( DynamicFilter dynamicFilter) { HudiSplit split = (HudiSplit) connectorSplit; - String path = split.getPath(); - HoodieFileFormat hudiFileFormat = getHudiFileFormat(path); - if 
(!HoodieFileFormat.PARQUET.equals(hudiFileFormat)) { + String path = split.getLocation(); + HudiFileFormat hudiFileFormat = getHudiFileFormat(path); + if (!HudiFileFormat.PARQUET.equals(hudiFileFormat)) { throw new TrinoException(HUDI_UNSUPPORTED_FILE_FORMAT, format("File format %s not supported", hudiFileFormat)); } @@ -185,7 +185,7 @@ private static ConnectorPageSource createPageSource( { ParquetDataSource dataSource = null; boolean useColumnNames = shouldUseParquetColumnNames(session); - String path = hudiSplit.getPath(); + String path = hudiSplit.getLocation(); long start = hudiSplit.getStart(); long length = hudiSplit.getLength(); try { diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplit.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplit.java index 0081e35fa7d1..5c752221e500 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplit.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplit.java @@ -33,7 +33,7 @@ public class HudiSplit implements ConnectorSplit { - private final String path; + private final String location; private final long start; private final long length; private final long fileSize; @@ -45,7 +45,7 @@ public class HudiSplit @JsonCreator public HudiSplit( - @JsonProperty("path") String path, + @JsonProperty("location") String location, @JsonProperty("start") long start, @JsonProperty("length") long length, @JsonProperty("fileSize") long fileSize, @@ -59,7 +59,7 @@ public HudiSplit( checkArgument(length >= 0, "length must be positive"); checkArgument(start + length <= fileSize, "fileSize must be at least start + length"); - this.path = requireNonNull(path, "path is null"); + this.location = requireNonNull(location, "location is null"); this.start = start; this.length = length; this.fileSize = fileSize; @@ -87,7 +87,7 @@ public List getAddresses() public Object getInfo() { return ImmutableMap.builder() - .put("path", path) + .put("location", location) .put("start", 
start) .put("length", length) .put("fileSize", fileSize) @@ -103,9 +103,9 @@ public SplitWeight getSplitWeight() } @JsonProperty - public String getPath() + public String getLocation() { - return path; + return location; } @JsonProperty @@ -148,7 +148,7 @@ public List getPartitionKeys() public String toString() { return toStringHelper(this) - .addValue(path) + .addValue(location) .addValue(start) .addValue(length) .addValue(fileSize) diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitManager.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitManager.java index 3b1006e2c3e4..c9a25c3db15f 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitManager.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitManager.java @@ -13,7 +13,7 @@ */ package io.trino.plugin.hudi; -import io.trino.hdfs.HdfsEnvironment; +import io.trino.filesystem.TrinoFileSystemFactory; import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorSplitSource; import io.trino.plugin.hive.HiveColumnHandle; import io.trino.plugin.hive.HiveTransactionHandle; @@ -46,7 +46,7 @@ public class HudiSplitManager { private final HudiTransactionManager transactionManager; private final BiFunction metastoreProvider; - private final HdfsEnvironment hdfsEnvironment; + private final TrinoFileSystemFactory fileSystemFactory; private final ExecutorService executor; private final int maxSplitsPerSecond; private final int maxOutstandingSplits; @@ -55,14 +55,14 @@ public class HudiSplitManager public HudiSplitManager( HudiTransactionManager transactionManager, BiFunction metastoreProvider, - HdfsEnvironment hdfsEnvironment, @ForHudiSplitManager ExecutorService executor, + TrinoFileSystemFactory fileSystemFactory, HudiConfig hudiConfig) { this.transactionManager = requireNonNull(transactionManager, "transactionManager is null"); this.metastoreProvider = requireNonNull(metastoreProvider, "metastoreProvider is null"); - this.hdfsEnvironment 
= requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); this.executor = requireNonNull(executor, "executor is null"); + this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null"); this.maxSplitsPerSecond = requireNonNull(hudiConfig, "hudiConfig is null").getMaxSplitsPerSecond(); this.maxOutstandingSplits = hudiConfig.getMaxOutstandingSplits(); } @@ -95,7 +95,7 @@ public ConnectorSplitSource getSplits( metastore, table, hudiTableHandle, - hdfsEnvironment, + fileSystemFactory, partitionColumnHandles, executor, maxSplitsPerSecond, diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitSource.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitSource.java index 6b367071c984..edeb4c2f0403 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitSource.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitSource.java @@ -15,8 +15,7 @@ import com.google.common.util.concurrent.Futures; import io.airlift.units.DataSize; -import io.trino.hdfs.HdfsContext; -import io.trino.hdfs.HdfsEnvironment; +import io.trino.filesystem.TrinoFileSystemFactory; import io.trino.plugin.hive.HiveColumnHandle; import io.trino.plugin.hive.metastore.HiveMetastore; import io.trino.plugin.hive.metastore.Table; @@ -27,15 +26,11 @@ import io.trino.plugin.hudi.split.HudiBackgroundSplitLoader; import io.trino.plugin.hudi.split.HudiSplitWeightProvider; import io.trino.plugin.hudi.split.SizeBasedSplitWeightProvider; +import io.trino.plugin.hudi.table.HudiTableMetaClient; import io.trino.spi.TrinoException; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.ConnectorSplit; import io.trino.spi.connector.ConnectorSplitSource; -import org.apache.hadoop.fs.Path; -import org.apache.hudi.common.config.HoodieMetadataConfig; -import org.apache.hudi.common.engine.HoodieEngineContext; -import org.apache.hudi.common.engine.HoodieLocalEngineContext; -import 
org.apache.hudi.common.table.HoodieTableMetaClient; import java.util.List; import java.util.Map; @@ -47,7 +42,6 @@ import static io.airlift.concurrent.MoreFutures.toCompletableFuture; import static io.trino.plugin.hudi.HudiSessionProperties.getMinimumAssignedSplitWeight; import static io.trino.plugin.hudi.HudiSessionProperties.getStandardSplitWeightSize; -import static io.trino.plugin.hudi.HudiSessionProperties.isHudiMetadataEnabled; import static io.trino.plugin.hudi.HudiSessionProperties.isSizeBasedSplitWeightsEnabled; import static io.trino.plugin.hudi.HudiUtil.buildTableMetaClient; import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; @@ -64,24 +58,17 @@ public HudiSplitSource( HiveMetastore metastore, Table table, HudiTableHandle tableHandle, - HdfsEnvironment hdfsEnvironment, + TrinoFileSystemFactory fileSystemFactory, Map partitionColumnHandleMap, ExecutorService executor, int maxSplitsPerSecond, int maxOutstandingSplits) { - boolean metadataEnabled = isHudiMetadataEnabled(session); - HoodieTableMetaClient metaClient = buildTableMetaClient(hdfsEnvironment, session, tableHandle.getBasePath()); - HoodieEngineContext engineContext = new HoodieLocalEngineContext(hdfsEnvironment.getConfiguration(new HdfsContext(session), new Path(tableHandle.getBasePath()))); - HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder() - .enable(metadataEnabled) - .build(); + HudiTableMetaClient metaClient = buildTableMetaClient(fileSystemFactory.create(session), tableHandle.getBasePath()); List partitionColumnHandles = table.getPartitionColumns().stream() .map(column -> partitionColumnHandleMap.get(column.getName())).collect(toList()); HudiDirectoryLister hudiDirectoryLister = new HudiReadOptimizedDirectoryLister( - metadataConfig, - engineContext, tableHandle, metaClient, metastore, diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiTableHandle.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiTableHandle.java index 
54f6dc38ae6c..0da9f2d897a7 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiTableHandle.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiTableHandle.java @@ -16,7 +16,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import io.trino.plugin.hive.HiveColumnHandle; -import io.trino.plugin.hudi.model.HoodieTableType; +import io.trino.plugin.hudi.model.HudiTableType; import io.trino.spi.connector.ConnectorTableHandle; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.predicate.TupleDomain; @@ -30,7 +30,7 @@ public class HudiTableHandle private final String schemaName; private final String tableName; private final String basePath; - private final HoodieTableType tableType; + private final HudiTableType tableType; private final TupleDomain partitionPredicates; private final TupleDomain regularPredicates; @@ -39,7 +39,7 @@ public HudiTableHandle( @JsonProperty("schemaName") String schemaName, @JsonProperty("tableName") String tableName, @JsonProperty("basePath") String basePath, - @JsonProperty("tableType") HoodieTableType tableType, + @JsonProperty("tableType") HudiTableType tableType, @JsonProperty("partitionPredicates") TupleDomain partitionPredicates, @JsonProperty("regularPredicates") TupleDomain regularPredicates) { @@ -70,7 +70,7 @@ public String getBasePath() } @JsonProperty - public HoodieTableType getTableType() + public HudiTableType getTableType() { return tableType; } diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiUtil.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiUtil.java index 5a94618a2b4b..88ef46c2704c 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiUtil.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiUtil.java @@ -15,53 +15,54 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import 
io.trino.filesystem.FileIterator; import io.trino.filesystem.Location; -import io.trino.hdfs.HdfsContext; -import io.trino.hdfs.HdfsEnvironment; +import io.trino.filesystem.TrinoFileSystem; import io.trino.plugin.hive.HiveColumnHandle; import io.trino.plugin.hive.HivePartition; import io.trino.plugin.hive.HivePartitionKey; import io.trino.plugin.hive.HivePartitionManager; import io.trino.plugin.hive.metastore.Column; -import io.trino.plugin.hudi.model.HoodieFileFormat; +import io.trino.plugin.hudi.model.HudiFileFormat; +import io.trino.plugin.hudi.table.HudiTableMetaClient; import io.trino.spi.TrinoException; import io.trino.spi.connector.ColumnHandle; -import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.predicate.Domain; import io.trino.spi.predicate.NullableValue; import io.trino.spi.predicate.TupleDomain; import io.trino.spi.type.Type; -import org.apache.hadoop.fs.Path; -import org.apache.hudi.common.table.HoodieTableMetaClient; +import java.io.IOException; import java.util.List; import java.util.Map; import static io.trino.plugin.hive.HiveErrorCode.HIVE_INVALID_METADATA; import static io.trino.plugin.hive.util.HiveUtil.checkCondition; import static io.trino.plugin.hive.util.HiveUtil.parsePartitionValue; +import static io.trino.plugin.hudi.HudiErrorCode.HUDI_FILESYSTEM_ERROR; import static io.trino.plugin.hudi.HudiErrorCode.HUDI_UNSUPPORTED_FILE_FORMAT; +import static io.trino.plugin.hudi.table.HudiTableMetaClient.METAFOLDER_NAME; import static java.util.stream.Collectors.toList; public final class HudiUtil { private HudiUtil() {} - public static HoodieFileFormat getHudiFileFormat(String path) + public static HudiFileFormat getHudiFileFormat(String path) { String extension = getFileExtension(path); - if (extension.equals(HoodieFileFormat.PARQUET.getFileExtension())) { - return HoodieFileFormat.PARQUET; + if (extension.equals(HudiFileFormat.PARQUET.getFileExtension())) { + return 
HudiFileFormat.PARQUET; } - if (extension.equals(HoodieFileFormat.HOODIE_LOG.getFileExtension())) { - return HoodieFileFormat.HOODIE_LOG; + if (extension.equals(HudiFileFormat.HOODIE_LOG.getFileExtension())) { + return HudiFileFormat.HOODIE_LOG; } - if (extension.equals(HoodieFileFormat.ORC.getFileExtension())) { - return HoodieFileFormat.ORC; + if (extension.equals(HudiFileFormat.ORC.getFileExtension())) { + return HudiFileFormat.ORC; } - if (extension.equals(HoodieFileFormat.HFILE.getFileExtension())) { - return HoodieFileFormat.HFILE; + if (extension.equals(HudiFileFormat.HFILE.getFileExtension())) { + return HudiFileFormat.HFILE; } throw new TrinoException(HUDI_UNSUPPORTED_FILE_FORMAT, "Hoodie InputFormat not implemented for base file of type " + extension); } @@ -73,6 +74,22 @@ private static String getFileExtension(String fullName) return dotIndex == -1 ? "" : fileName.substring(dotIndex); } + public static boolean isHudiTable(TrinoFileSystem trinoFileSystem, Location baseLocation) + { + try { + Location metaLocation = baseLocation.appendPath(METAFOLDER_NAME); + FileIterator iterator = trinoFileSystem.listFiles(metaLocation); + // If there is at least one file in the .hoodie directory, it's a valid Hudi table + if (!iterator.hasNext()) { + return false; + } + } + catch (IOException e) { + throw new TrinoException(HUDI_FILESYSTEM_ERROR, "Failed to check for Hudi table at location: " + baseLocation, e); + } + return true; + } + public static boolean partitionMatchesPredicates( SchemaTableName tableName, String hivePartitionName, @@ -149,11 +166,13 @@ public static List buildPartitionKeys(List keys, List< return partitionKeys.build(); } - public static HoodieTableMetaClient buildTableMetaClient(HdfsEnvironment hdfsEnvironment, ConnectorSession session, String basePath) + public static HudiTableMetaClient buildTableMetaClient( + TrinoFileSystem fileSystem, + String basePath) { - HoodieTableMetaClient client = 
HoodieTableMetaClient.builder().setConf(hdfsEnvironment.getConfiguration(new HdfsContext(session), new Path(basePath))).setBasePath(basePath).build(); - // Do not load the bootstrap index, will not read bootstrap base data or a mapping index defined - client.getTableConfig().setValue("hoodie.bootstrap.index.enable", "false"); - return client; + return HudiTableMetaClient.builder() + .setTrinoFileSystem(fileSystem) + .setBasePath(Location.of(basePath)) + .build(); } } diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/TimelineTable.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/TimelineTable.java index 42a005a0a476..da6a56e1ff0b 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/TimelineTable.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/TimelineTable.java @@ -14,8 +14,10 @@ package io.trino.plugin.hudi; import com.google.common.collect.ImmutableList; -import io.trino.hdfs.HdfsEnvironment; +import io.trino.filesystem.TrinoFileSystem; import io.trino.plugin.hive.metastore.Table; +import io.trino.plugin.hudi.model.HudiInstant; +import io.trino.plugin.hudi.table.HudiTableMetaClient; import io.trino.spi.connector.ColumnMetadata; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.ConnectorTableMetadata; @@ -26,8 +28,6 @@ import io.trino.spi.connector.SystemTable; import io.trino.spi.predicate.TupleDomain; import io.trino.spi.type.Type; -import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.timeline.HoodieInstant; import java.util.ArrayList; import java.util.List; @@ -43,10 +43,10 @@ public class TimelineTable { private final ConnectorTableMetadata tableMetadata; private final List types; - private final HdfsEnvironment hdfsEnvironment; + private final TrinoFileSystem fileSystem; private final String location; - public TimelineTable(HdfsEnvironment hdfsEnvironment, SchemaTableName tableName, Table hudiTable) + public 
TimelineTable(TrinoFileSystem fileSystem, SchemaTableName tableName, Table hudiTable) { this.tableMetadata = new ConnectorTableMetadata(requireNonNull(tableName, "tableName is null"), ImmutableList.builder() @@ -55,7 +55,7 @@ public TimelineTable(HdfsEnvironment hdfsEnvironment, SchemaTableName tableName, .add(new ColumnMetadata("state", VARCHAR)) .build()); this.types = tableMetadata.getColumns().stream().map(ColumnMetadata::getType).collect(toImmutableList()); - this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); + this.fileSystem = requireNonNull(fileSystem, "fileSystem is null"); this.location = requireNonNull(hudiTable.getStorage().getLocation(), "location is null"); } @@ -74,12 +74,12 @@ public ConnectorTableMetadata getTableMetadata() @Override public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain constraint) { - HoodieTableMetaClient metaClient = buildTableMetaClient(hdfsEnvironment, session, location); + HudiTableMetaClient metaClient = buildTableMetaClient(fileSystem, location); Iterable> records = () -> metaClient.getCommitsTimeline().getInstants().map(this::getRecord).iterator(); return new InMemoryRecordSet(types, records).cursor(); } - private List getRecord(HoodieInstant hudiInstant) + private List getRecord(HudiInstant hudiInstant) { List columns = new ArrayList<>(); columns.add(hudiInstant.getTimestamp()); diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/compaction/CompactionOperation.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/compaction/CompactionOperation.java new file mode 100644 index 000000000000..c73511f5063f --- /dev/null +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/compaction/CompactionOperation.java @@ -0,0 +1,98 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hudi.compaction; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.trino.filesystem.Location; +import io.trino.plugin.hudi.files.HudiFileGroupId; + +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static com.google.common.base.MoreObjects.toStringHelper; +import static io.trino.plugin.hudi.files.FSUtils.getCommitTime; +import static java.util.Objects.requireNonNull; + +public class CompactionOperation +{ + private String baseInstantTime; + private Optional dataFileCommitTime; + private List deltaFileNames; + private Optional dataFileName; + private HudiFileGroupId id; + private Map metrics; + private Optional bootstrapFilePath; + + public CompactionOperation( + String baseInstantTime, + Optional dataFileCommitTime, + List deltaFileNames, + Optional dataFileName, + HudiFileGroupId id, + Map metrics, + Optional bootstrapFilePath) + { + this.baseInstantTime = requireNonNull(baseInstantTime, "baseInstantTime is null"); + this.dataFileCommitTime = requireNonNull(dataFileCommitTime, "dataFileCommitTime is null"); + this.deltaFileNames = requireNonNull(deltaFileNames, "deltaFileNames is null"); + this.dataFileName = requireNonNull(dataFileName, "dataFileName is null"); + this.id = requireNonNull(id, "id is null"); + this.metrics = requireNonNull(metrics, "metrics is null"); + this.bootstrapFilePath = requireNonNull(bootstrapFilePath, "bootstrapFilePath is null"); + } + + public String getFileId() + { + return id.getFileId(); + } + 
+ public String getPartitionPath() + { + return id.getPartitionPath(); + } + + public HudiFileGroupId getFileGroupId() + { + return id; + } + + public static CompactionOperation convertFromAvroRecordInstance(HudiCompactionOperation operation) + { + Optional dataFileName = Optional.ofNullable(operation.getDataFilePath()); + return new CompactionOperation( + operation.getBaseInstantTime(), + dataFileName.map(path -> getCommitTime(Location.of(path).fileName())), + ImmutableList.copyOf(operation.getDeltaFilePaths()), + dataFileName, + new HudiFileGroupId(operation.getPartitionPath(), operation.getFileId()), + operation.getMetrics() == null ? ImmutableMap.of() : ImmutableMap.copyOf(operation.getMetrics()), + Optional.ofNullable(operation.getBootstrapFilePath())); + } + + @Override + public String toString() + { + return toStringHelper(this) + .add("baseInstantTime", baseInstantTime) + .add("dataFileCommitTime", dataFileCommitTime) + .add("deltaFileNames", deltaFileNames) + .add("dataFileName", dataFileName) + .add("id", id) + .add("metrics", metrics) + .add("bootstrapFilePath", bootstrapFilePath) + .toString(); + } +} diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/compaction/HudiCompactionOperation.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/compaction/HudiCompactionOperation.java new file mode 100644 index 000000000000..e744e9d80972 --- /dev/null +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/compaction/HudiCompactionOperation.java @@ -0,0 +1,235 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hudi.compaction; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.avro.Schema; +import org.apache.avro.Schema.Parser; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificRecord; +import org.apache.avro.specific.SpecificRecordBase; + +import java.util.List; +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +public class HudiCompactionOperation + extends SpecificRecordBase + implements SpecificRecord +{ + private static final Schema SCHEMA = new Parser().parse("{\"type\":\"record\",\"name\":\"HoodieCompactionOperation\",\"namespace\":\"org.apache.hudi.avro.model\",\"fields\":[{\"name\":\"baseInstantTime\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}]},{\"name\":\"deltaFilePaths\",\"type\":[\"null\",{\"type\":\"array\",\"items\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}],\"default\":null},{\"name\":\"dataFilePath\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"fileId\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}]},{\"name\":\"partitionPath\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"metrics\",\"type\":[\"null\",{\"type\":\"map\",\"values\":\"double\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"bootstrapFilePath\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null}]}"); + private static final SpecificData MODEL = new SpecificData(); + + private String baseInstantTime; + private List deltaFilePaths; + private String dataFilePath; + private String fileId; + private String partitionPath; + private Map metrics; + private String bootstrapFilePath; + + public 
HudiCompactionOperation() {} + + public HudiCompactionOperation( + String baseInstantTime, + List deltaFilePaths, + String dataFilePath, + String fileId, + String partitionPath, + Map metrics, + String bootstrapFilePath) + { + this.baseInstantTime = requireNonNull(baseInstantTime, "baseInstantTime is null"); + this.deltaFilePaths = requireNonNull(deltaFilePaths, "deltaFilePaths is null"); + this.dataFilePath = requireNonNull(dataFilePath, "dataFilePath is null"); + this.fileId = requireNonNull(fileId, "fileId is null"); + this.partitionPath = requireNonNull(partitionPath, "partitionPath is null"); + this.metrics = requireNonNull(metrics, "metrics is null"); + this.bootstrapFilePath = requireNonNull(bootstrapFilePath, "bootstrapFilePath is null"); + } + + @Override + public SpecificData getSpecificData() + { + return MODEL; + } + + @Override + public Schema getSchema() + { + return SCHEMA; + } + + // Used by DatumWriter. Applications should not call. + @Override + public Object get(int field) + { + return switch (field) { + case 0: + yield baseInstantTime; + case 1: + yield deltaFilePaths; + case 2: + yield dataFilePath; + case 3: + yield fileId; + case 4: + yield partitionPath; + case 5: + yield metrics; + case 6: + yield bootstrapFilePath; + default: + throw new IndexOutOfBoundsException("Invalid index: " + field); + }; + } + + // Used by DatumReader. Applications should not call. + @Override + @SuppressWarnings(value = "unchecked") + public void put(int field, Object value) + { + switch (field) { + case 0: + baseInstantTime = value != null ? value.toString() : null; + break; + case 1: + deltaFilePaths = (List) value; + break; + case 2: + dataFilePath = value != null ? value.toString() : null; + break; + case 3: + fileId = value != null ? value.toString() : null; + break; + case 4: + partitionPath = value != null ? value.toString() : null; + break; + case 5: + metrics = (Map) value; + break; + case 6: + bootstrapFilePath = value != null ? 
value.toString() : null; + break; + default: + throw new IndexOutOfBoundsException("Invalid index: " + field); + } + } + + public String getBaseInstantTime() + { + return baseInstantTime; + } + + public List getDeltaFilePaths() + { + return deltaFilePaths; + } + + public String getDataFilePath() + { + return dataFilePath; + } + + public String getFileId() + { + return fileId; + } + + public String getPartitionPath() + { + return partitionPath; + } + + public Map getMetrics() + { + return metrics; + } + + public String getBootstrapFilePath() + { + return bootstrapFilePath; + } + + public static HudiCompactionOperation.Builder newBuilder() + { + return new HudiCompactionOperation.Builder(); + } + + public static class Builder + { + private String baseInstantTime; + private List deltaFilePaths; + private String dataFilePath; + private String fileId; + private String partitionPath; + private Map metrics; + private String bootstrapFilePath; + + private Builder() + { + } + + public HudiCompactionOperation.Builder setBaseInstantTime(String baseInstantTime) + { + this.baseInstantTime = baseInstantTime; + return this; + } + + public HudiCompactionOperation.Builder setDeltaFilePaths(List deltaFilePaths) + { + this.deltaFilePaths = ImmutableList.copyOf(deltaFilePaths); + return this; + } + + public HudiCompactionOperation.Builder setDataFilePath(String dataFilePath) + { + this.dataFilePath = dataFilePath; + return this; + } + + public HudiCompactionOperation.Builder setFileId(String fileId) + { + this.fileId = fileId; + return this; + } + + public HudiCompactionOperation.Builder setPartitionPath(String partitionPath) + { + this.partitionPath = partitionPath; + return this; + } + + public HudiCompactionOperation.Builder setMetrics(Map metrics) + { + this.metrics = ImmutableMap.copyOf(metrics); + return this; + } + + @SuppressWarnings("unchecked") + public HudiCompactionOperation build() + { + return new HudiCompactionOperation( + baseInstantTime, + deltaFilePaths, + 
dataFilePath, + fileId, + partitionPath, + metrics, + bootstrapFilePath); + } + } +} diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/compaction/HudiCompactionPlan.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/compaction/HudiCompactionPlan.java new file mode 100644 index 000000000000..3d39a2210ea8 --- /dev/null +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/compaction/HudiCompactionPlan.java @@ -0,0 +1,109 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.hudi.compaction; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.avro.Schema; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificRecord; +import org.apache.avro.specific.SpecificRecordBase; + +import java.util.List; +import java.util.Map; + +public class HudiCompactionPlan + extends SpecificRecordBase + implements SpecificRecord +{ + private static final Schema SCHEMA = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"HoodieCompactionPlan\",\"namespace\":\"org.apache.hudi.avro.model\",\"fields\":[{\"name\":\"operations\",\"type\":[\"null\",{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"HoodieCompactionOperation\",\"fields\":[{\"name\":\"baseInstantTime\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}]},{\"name\":\"deltaFilePaths\",\"type\":[\"null\",{\"type\":\"array\",\"items\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}],\"default\":null},{\"name\":\"dataFilePath\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"fileId\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}]},{\"name\":\"partitionPath\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"metrics\",\"type\":[\"null\",{\"type\":\"map\",\"values\":\"double\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"bootstrapFilePath\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null}]}}],\"default\":null},{\"name\":\"extraMetadata\",\"type\":[\"null\",{\"type\":\"map\",\"values\":{\"type\":\"string\",\"avro.java.string\":\"String\"},\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"version\",\"type\":[\"int\",\"null\"],\"default\":1}]}"); + + private static final SpecificData MODEL = new SpecificData(); + + private 
List operations; + private Map extraMetadata; + private Integer version; + + public HudiCompactionPlan() {} + + public HudiCompactionPlan(List operations, Map extraMetadata, Integer version) + { + this.operations = ImmutableList.copyOf(operations); + this.extraMetadata = ImmutableMap.copyOf(extraMetadata); + this.version = version; + } + + @Override + public SpecificData getSpecificData() + { + return MODEL; + } + + @Override + public Schema getSchema() + { + return SCHEMA; + } + + public List getOperations() + { + return operations; + } + + public Map getExtraMetadata() + { + return extraMetadata; + } + + public Integer getVersion() + { + return version; + } + + // Used by DatumWriter. Applications should not call. + @Override + public Object get(int field) + { + return switch (field) { + case 0: + yield operations; + case 1: + yield extraMetadata; + case 2: + yield version; + default: + throw new IndexOutOfBoundsException("Invalid index: " + field); + }; + } + + // Used by DatumReader. Applications should not call. + @Override + @SuppressWarnings(value = "unchecked") + public void put(int field, Object value) + { + switch (field) { + case 0: + operations = ImmutableList.copyOf((List) value); + break; + case 1: + extraMetadata = ImmutableMap.copyOf((Map) value); + break; + case 2: + version = (Integer) value; + break; + default: + throw new IndexOutOfBoundsException("Invalid index: " + field); + } + } +} diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/config/HudiTableConfig.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/config/HudiTableConfig.java new file mode 100644 index 000000000000..7cbafae2c449 --- /dev/null +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/config/HudiTableConfig.java @@ -0,0 +1,105 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hudi.config; + +import io.trino.filesystem.FileIterator; +import io.trino.filesystem.Location; +import io.trino.filesystem.TrinoFileSystem; +import io.trino.filesystem.TrinoInputFile; +import io.trino.filesystem.TrinoInputStream; +import io.trino.plugin.hudi.model.HudiFileFormat; +import io.trino.plugin.hudi.model.HudiTableType; +import io.trino.plugin.hudi.timeline.TimelineLayoutVersion; +import io.trino.spi.TrinoException; + +import java.io.IOException; +import java.util.Optional; +import java.util.Properties; + +import static com.google.common.base.Preconditions.checkArgument; +import static io.trino.plugin.hudi.HudiErrorCode.HUDI_FILESYSTEM_ERROR; +import static java.lang.String.format; + +public class HudiTableConfig +{ + public static final String HOODIE_PROPERTIES_FILE = "hoodie.properties"; + public static final String HOODIE_PROPERTIES_FILE_BACKUP = "hoodie.properties.backup"; + public static final String HOODIE_TABLE_NAME_KEY = "hoodie.table.name"; + public static final String HOODIE_TABLE_TYPE_KEY = "hoodie.table.type"; + public static final String HOODIE_TABLE_BASE_FILE_FORMAT = "hoodie.table.base.file.format"; + public static final String HOODIE_TIMELINE_LAYOUT_VERSION_KEY = "hoodie.timeline.layout.version"; + private final Properties properties; + + public HudiTableConfig(TrinoFileSystem fs, Location metaPath) + { + this.properties = new Properties(); + Location propertyPath = metaPath.appendPath(HOODIE_PROPERTIES_FILE); + try { + TrinoInputFile inputFile = fs.newInputFile(propertyPath); + try 
(TrinoInputStream inputStream = inputFile.newStream()) { + properties.load(inputStream); + } + } + catch (IOException e) { + if (!tryLoadingBackupPropertyFile(fs, metaPath)) { + throw new TrinoException(HUDI_FILESYSTEM_ERROR, format("Could not load Hoodie properties from %s", propertyPath)); + } + } + checkArgument(properties.containsKey(HOODIE_TABLE_NAME_KEY) && properties.containsKey(HOODIE_TABLE_TYPE_KEY), + "hoodie.properties file seems invalid. Please check for left over `.updated` files if any, " + + "manually copy it to hoodie.properties and retry"); + } + + private boolean tryLoadingBackupPropertyFile(TrinoFileSystem fs, Location metaPath) + { + Location backupPath = metaPath.appendPath(HOODIE_PROPERTIES_FILE_BACKUP); + try { + FileIterator fileIterator = fs.listFiles(metaPath); + while (fileIterator.hasNext()) { + if (fileIterator.next().location().equals(backupPath)) { + // try the backup. this way no query ever fails if update fails midway. + TrinoInputFile inputFile = fs.newInputFile(backupPath); + try (TrinoInputStream inputStream = inputFile.newStream()) { + properties.load(inputStream); + } + return true; + } + } + } + catch (IOException e) { + throw new TrinoException(HUDI_FILESYSTEM_ERROR, "Failed to load Hudi properties from file: " + backupPath, e); + } + return false; + } + + public HudiTableType getTableType() + { + return HudiTableType.valueOf(properties.getProperty(HOODIE_TABLE_TYPE_KEY)); + } + + public HudiFileFormat getBaseFileFormat() + { + if (properties.containsKey(HOODIE_TABLE_BASE_FILE_FORMAT)) { + return HudiFileFormat.valueOf(properties.getProperty(HOODIE_TABLE_BASE_FILE_FORMAT)); + } + return HudiFileFormat.PARQUET; + } + + public Optional getTimelineLayoutVersion() + { + return Optional.ofNullable(properties.getProperty(HOODIE_TIMELINE_LAYOUT_VERSION_KEY)) + .map(Integer::parseInt) + .map(TimelineLayoutVersion::new); + } +} diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/files/FSUtils.java 
b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/files/FSUtils.java new file mode 100644 index 000000000000..6d6422a86da7 --- /dev/null +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/files/FSUtils.java @@ -0,0 +1,90 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hudi.files; + +import io.trino.filesystem.Location; +import io.trino.spi.TrinoException; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static com.google.common.base.Strings.isNullOrEmpty; +import static io.trino.plugin.hudi.HudiErrorCode.HUDI_BAD_DATA; +import static io.trino.plugin.hudi.model.HudiFileFormat.HOODIE_LOG; + +public final class FSUtils +{ + private FSUtils() + { + } + + public static final Pattern LOG_FILE_PATTERN = + Pattern.compile("\\.(.*)_(.*)\\.(.*)\\.([0-9]*)(_(([0-9]*)-([0-9]*)-([0-9]*)))?"); + + public static String getFileIdFromLogPath(Location location) + { + Matcher matcher = LOG_FILE_PATTERN.matcher(location.fileName()); + if (!matcher.find()) { + throw new TrinoException(HUDI_BAD_DATA, "Invalid LogFile " + location); + } + return matcher.group(1); + } + + public static String getBaseCommitTimeFromLogPath(Location location) + { + Matcher matcher = LOG_FILE_PATTERN.matcher(location.fileName()); + if (!matcher.find()) { + throw new TrinoException(HUDI_BAD_DATA, "Invalid LogFile " + location); + } + return matcher.group(2); + } + + public static boolean isLogFile(String fileName) + { + Matcher 
matcher = LOG_FILE_PATTERN.matcher(fileName); + return matcher.find() && fileName.contains(HOODIE_LOG.getFileExtension()); + } + + public static int getFileVersionFromLog(Location logLocation) + { + Matcher matcher = LOG_FILE_PATTERN.matcher(logLocation.fileName()); + if (!matcher.find()) { + throw new TrinoException(HUDI_BAD_DATA, "Invalid location " + logLocation); + } + return Integer.parseInt(matcher.group(4)); + } + + public static String getWriteTokenFromLogPath(Location location) + { + Matcher matcher = LOG_FILE_PATTERN.matcher(location.fileName()); + if (!matcher.find()) { + throw new TrinoException(HUDI_BAD_DATA, "Invalid location " + location); + } + return matcher.group(6); + } + + public static String getCommitTime(String fullFileName) + { + Matcher matcher = LOG_FILE_PATTERN.matcher(fullFileName); + if (matcher.find() && fullFileName.contains(HOODIE_LOG.getFileExtension())) { + return fullFileName.split("_")[1].split("\\.")[0]; + } + return fullFileName.split("_")[2].split("\\.")[0]; + } + + public static Location getPartitionLocation(Location baseLocation, String partitionPath) + { + return isNullOrEmpty(partitionPath) ? baseLocation : baseLocation.appendPath(partitionPath); + } +} diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/files/FileSlice.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/files/FileSlice.java new file mode 100644 index 000000000000..a9d93624465d --- /dev/null +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/files/FileSlice.java @@ -0,0 +1,71 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hudi.files; + +import java.util.Optional; +import java.util.TreeSet; + +import static com.google.common.base.MoreObjects.toStringHelper; +import static java.util.Objects.requireNonNull; + +public class FileSlice +{ + private final String baseInstantTime; + + private Optional baseFile; + + private final TreeSet logFiles; + + public FileSlice(String baseInstantTime) + { + this.baseInstantTime = requireNonNull(baseInstantTime, "baseInstantTime is null"); + this.baseFile = Optional.empty(); + this.logFiles = new TreeSet<>(HudiLogFile.getReverseLogFileComparator()); + } + + public void setBaseFile(HudiBaseFile baseFile) + { + this.baseFile = Optional.ofNullable(baseFile); + } + + public void addLogFile(HudiLogFile logFile) + { + this.logFiles.add(logFile); + } + + public String getBaseInstantTime() + { + return baseInstantTime; + } + + public Optional getBaseFile() + { + return baseFile; + } + + public boolean isEmpty() + { + return (baseFile == null) && (logFiles.isEmpty()); + } + + @Override + public String toString() + { + return toStringHelper(this) + .add("baseInstantTime", baseInstantTime) + .add("baseFile", baseFile) + .add("logFiles", logFiles) + .toString(); + } +} diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/files/HudiBaseFile.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/files/HudiBaseFile.java new file mode 100644 index 000000000000..c9c651d2a5b6 --- /dev/null +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/files/HudiBaseFile.java @@ -0,0 +1,109 @@ +/* + * Licensed 
under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hudi.files; + +import io.trino.filesystem.FileEntry; +import io.trino.filesystem.Location; + +import java.util.Objects; + +import static io.trino.plugin.hudi.files.FSUtils.isLogFile; +import static java.util.Objects.requireNonNull; + +public class HudiBaseFile +{ + private transient FileEntry fileEntry; + private final String fullPath; + private final String fileName; + private long fileLen; + + public HudiBaseFile(FileEntry fileEntry) + { + this(fileEntry, + fileEntry.location().path(), + fileEntry.location().fileName(), + fileEntry.length()); + } + + private HudiBaseFile(FileEntry fileEntry, String fullPath, String fileName, long fileLen) + { + this.fileEntry = requireNonNull(fileEntry, "fileEntry is null"); + this.fullPath = requireNonNull(fullPath, "fullPath is null"); + this.fileLen = fileLen; + this.fileName = requireNonNull(fileName, "fileName is null"); + } + + public String getPath() + { + return fullPath; + } + + public Location getFullPath() + { + if (fileEntry != null) { + return fileEntry.location(); + } + + return Location.of(fullPath); + } + + public String getFileName() + { + return fileName; + } + + public FileEntry getFileEntry() + { + return fileEntry; + } + + public String getFileId() + { + return getFileName().split("_")[0]; + } + + public String getCommitTime() + { + String fileName = getFileName(); + if (isLogFile(fileName)) { + return fileName.split("_")[1].split("\\.")[0]; 
+ } + return fileName.split("_")[2].split("\\.")[0]; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + HudiBaseFile dataFile = (HudiBaseFile) o; + return Objects.equals(fullPath, dataFile.fullPath); + } + + @Override + public int hashCode() + { + return Objects.hash(fullPath); + } + + @Override + public String toString() + { + return "BaseFile{fullPath=" + fullPath + ", fileLen=" + fileLen + '}'; + } +} diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/files/HudiFileGroup.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/files/HudiFileGroup.java new file mode 100644 index 000000000000..7679db5f5a3b --- /dev/null +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/files/HudiFileGroup.java @@ -0,0 +1,127 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.hudi.files; + +import io.trino.plugin.hudi.model.HudiInstant; +import io.trino.plugin.hudi.timeline.HudiTimeline; + +import java.util.Comparator; +import java.util.Optional; +import java.util.TreeMap; +import java.util.stream.Stream; + +import static com.google.common.base.MoreObjects.toStringHelper; +import static io.trino.plugin.hudi.timeline.HudiTimeline.LESSER_THAN_OR_EQUALS; +import static io.trino.plugin.hudi.timeline.HudiTimeline.compareTimestamps; +import static java.util.Objects.requireNonNull; + +public class HudiFileGroup +{ + public static Comparator getReverseCommitTimeComparator() + { + return Comparator.reverseOrder(); + } + + private final HudiFileGroupId fileGroupId; + + private final TreeMap fileSlices; + + private final HudiTimeline timeline; + + private final Optional lastInstant; + + public HudiFileGroup(String partitionPath, String id, HudiTimeline timeline) + { + this(new HudiFileGroupId(partitionPath, id), timeline); + } + + public HudiFileGroup(HudiFileGroupId fileGroupId, HudiTimeline timeline) + { + this.fileGroupId = requireNonNull(fileGroupId, "fileGroupId is null"); + this.fileSlices = new TreeMap<>(HudiFileGroup.getReverseCommitTimeComparator()); + this.lastInstant = timeline.lastInstant(); + this.timeline = timeline; + } + + public void addNewFileSliceAtInstant(String baseInstantTime) + { + if (!fileSlices.containsKey(baseInstantTime)) { + fileSlices.put(baseInstantTime, new FileSlice(baseInstantTime)); + } + } + + public void addBaseFile(HudiBaseFile dataFile) + { + if (!fileSlices.containsKey(dataFile.getCommitTime())) { + fileSlices.put(dataFile.getCommitTime(), new FileSlice(dataFile.getCommitTime())); + } + fileSlices.get(dataFile.getCommitTime()).setBaseFile(dataFile); + } + + public void addLogFile(HudiLogFile logFile) + { + if (!fileSlices.containsKey(logFile.getBaseCommitTime())) { + fileSlices.put(logFile.getBaseCommitTime(), new FileSlice(logFile.getBaseCommitTime())); + } + 
fileSlices.get(logFile.getBaseCommitTime()).addLogFile(logFile); + } + + public String getPartitionPath() + { + return fileGroupId.getPartitionPath(); + } + + public HudiFileGroupId getFileGroupId() + { + return fileGroupId; + } + + private boolean isFileSliceCommitted(FileSlice slice) + { + if (!compareTimestamps(slice.getBaseInstantTime(), LESSER_THAN_OR_EQUALS, lastInstant.get().getTimestamp())) { + return false; + } + + return timeline.containsOrBeforeTimelineStarts(slice.getBaseInstantTime()); + } + + public Stream getAllFileSlices() + { + if (!timeline.empty()) { + return fileSlices.values().stream().filter(this::isFileSliceCommitted); + } + return Stream.empty(); + } + + public Stream getAllBaseFiles() + { + return getAllFileSlices().filter(slice -> slice.getBaseFile().isPresent()).map(slice -> slice.getBaseFile().get()); + } + + @Override + public String toString() + { + return toStringHelper(this) + .add("fileGroupId", fileGroupId) + .add("fileSlices", fileSlices) + .add("timeline", timeline) + .add("lastInstant", lastInstant) + .toString(); + } + + public HudiTimeline getTimeline() + { + return timeline; + } +} diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/files/HudiFileGroupId.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/files/HudiFileGroupId.java new file mode 100644 index 000000000000..2470b3093fd3 --- /dev/null +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/files/HudiFileGroupId.java @@ -0,0 +1,77 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hudi.files; + +import java.util.Objects; + +import static java.util.Objects.requireNonNull; + +public class HudiFileGroupId + implements Comparable +{ + private final String partitionPath; + + private final String fileId; + + public HudiFileGroupId(String partitionPath, String fileId) + { + this.partitionPath = requireNonNull(partitionPath, "partitionPath is null"); + this.fileId = requireNonNull(fileId, "partitionPath is null"); + } + + public String getPartitionPath() + { + return partitionPath; + } + + public String getFileId() + { + return fileId; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + HudiFileGroupId that = (HudiFileGroupId) o; + return Objects.equals(partitionPath, that.partitionPath) && Objects.equals(fileId, that.fileId); + } + + @Override + public int hashCode() + { + return Objects.hash(partitionPath, fileId); + } + + @Override + public String toString() + { + return "HoodieFileGroupId{partitionPath='" + partitionPath + '\'' + ", fileId='" + fileId + '\'' + '}'; + } + + @Override + public int compareTo(HudiFileGroupId o) + { + int ret = partitionPath.compareTo(o.partitionPath); + if (ret == 0) { + ret = fileId.compareTo(o.fileId); + } + return ret; + } +} diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/files/HudiLogFile.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/files/HudiLogFile.java new file mode 100644 index 000000000000..dde08427b23b --- /dev/null +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/files/HudiLogFile.java @@ -0,0 +1,129 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hudi.files; + +import io.trino.filesystem.FileEntry; +import io.trino.filesystem.Location; + +import java.util.Comparator; +import java.util.Objects; + +import static io.trino.plugin.hudi.files.FSUtils.getBaseCommitTimeFromLogPath; +import static io.trino.plugin.hudi.files.FSUtils.getFileIdFromLogPath; +import static io.trino.plugin.hudi.files.FSUtils.getFileVersionFromLog; +import static io.trino.plugin.hudi.files.FSUtils.getWriteTokenFromLogPath; + +public class HudiLogFile +{ + private static final Comparator LOG_FILE_COMPARATOR_REVERSED = new HudiLogFile.LogFileComparator().reversed(); + + private final String pathStr; + private final long fileLen; + + public HudiLogFile(FileEntry fileStatus) + { + this.pathStr = fileStatus.location().toString(); + this.fileLen = fileStatus.length(); + } + + public String getFileId() + { + return getFileIdFromLogPath(getPath()); + } + + public String getBaseCommitTime() + { + return getBaseCommitTimeFromLogPath(getPath()); + } + + public int getLogVersion() + { + return getFileVersionFromLog(getPath()); + } + + public String getLogWriteToken() + { + return getWriteTokenFromLogPath(getPath()); + } + + public Location getPath() + { + return Location.of(pathStr); + } + + public static Comparator getReverseLogFileComparator() + { + return LOG_FILE_COMPARATOR_REVERSED; + } + + public static class LogFileComparator + implements Comparator + { + private transient Comparator writeTokenComparator; + + private Comparator getWriteTokenComparator() + { + if (null == writeTokenComparator) { + 
// writeTokenComparator is not serializable. Hence, lazy loading + writeTokenComparator = Comparator.nullsFirst(Comparator.naturalOrder()); + } + return writeTokenComparator; + } + + @Override + public int compare(HudiLogFile o1, HudiLogFile o2) + { + String baseInstantTime1 = o1.getBaseCommitTime(); + String baseInstantTime2 = o2.getBaseCommitTime(); + + if (baseInstantTime1.equals(baseInstantTime2)) { + if (o1.getLogVersion() == o2.getLogVersion()) { + // Compare by write token when base-commit and log-version is same + return getWriteTokenComparator().compare(o1.getLogWriteToken(), o2.getLogWriteToken()); + } + + // compare by log-version when base-commit is same + return Integer.compare(o1.getLogVersion(), o2.getLogVersion()); + } + + // compare by base-commits + return baseInstantTime1.compareTo(baseInstantTime2); + } + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + HudiLogFile that = (HudiLogFile) o; + return Objects.equals(pathStr, that.pathStr); + } + + @Override + public int hashCode() + { + return Objects.hash(pathStr); + } + + @Override + public String toString() + { + return "HoodieLogFile{pathStr='" + pathStr + '\'' + ", fileLen=" + fileLen + '}'; + } +} diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/model/HoodieFileFormat.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/model/HudiFileFormat.java similarity index 93% rename from plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/model/HoodieFileFormat.java rename to plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/model/HudiFileFormat.java index 34f5c6b96a21..02bed7c56ef8 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/model/HoodieFileFormat.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/model/HudiFileFormat.java @@ -15,7 +15,7 @@ import static java.util.Objects.requireNonNull; -public enum HoodieFileFormat +public 
enum HudiFileFormat { PARQUET(".parquet"), HOODIE_LOG(".log"), @@ -24,7 +24,7 @@ public enum HoodieFileFormat private final String extension; - HoodieFileFormat(String extension) + HudiFileFormat(String extension) { this.extension = requireNonNull(extension, "extension is null"); } diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/model/HudiInstant.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/model/HudiInstant.java new file mode 100644 index 000000000000..5def3a0f9482 --- /dev/null +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/model/HudiInstant.java @@ -0,0 +1,223 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.hudi.model; + +import com.google.common.collect.ImmutableMap; +import io.trino.filesystem.FileEntry; +import io.trino.plugin.hudi.timeline.HudiTimeline; + +import java.util.Comparator; +import java.util.Map; +import java.util.Objects; + +import static io.trino.plugin.hudi.timeline.HudiTimeline.INFLIGHT_EXTENSION; +import static io.trino.plugin.hudi.timeline.HudiTimeline.REQUESTED_EXTENSION; +import static java.util.Objects.requireNonNull; + +public class HudiInstant + implements Comparable +{ + public enum State + { + // Requested State (valid state for Compaction) + REQUESTED, + // Inflight instant + INFLIGHT, + // Committed instant + COMPLETED, + // Invalid instant + NIL + } + + private static final Map COMPARABLE_ACTIONS = + ImmutableMap.of(HudiTimeline.COMPACTION_ACTION, HudiTimeline.COMMIT_ACTION); + + private static final Comparator ACTION_COMPARATOR = + Comparator.comparing(instant -> getComparableAction(instant.getAction())); + + private static final Comparator COMPARATOR = Comparator.comparing(HudiInstant::getTimestamp) + .thenComparing(ACTION_COMPARATOR).thenComparing(HudiInstant::getState); + + public static String getComparableAction(String action) + { + return COMPARABLE_ACTIONS.getOrDefault(action, action); + } + + public static String getTimelineFileExtension(String fileName) + { + requireNonNull(fileName); + int dotIndex = fileName.indexOf('.'); + return dotIndex == -1 ? "" : fileName.substring(dotIndex); + } + + private HudiInstant.State state = HudiInstant.State.COMPLETED; + private String action; + private String timestamp; + + /** + * Load the instant from the meta FileStatus. + */ + public HudiInstant(FileEntry fileEntry) + { + // First read the instant timestamp. 
[==>20170101193025<==].commit + String fileName = fileEntry.location().fileName(); + String fileExtension = getTimelineFileExtension(fileName); + timestamp = fileName.replace(fileExtension, ""); + + // Next read the action for this marker + action = fileExtension.replaceFirst(".", ""); + if (action.equals("inflight")) { + // This is to support backwards compatibility on how in-flight commit files were written + // General rule is inflight extension is ..inflight, but for commit it is .inflight + action = "commit"; + state = HudiInstant.State.INFLIGHT; + } + else if (action.contains(INFLIGHT_EXTENSION)) { + state = HudiInstant.State.INFLIGHT; + action = action.replace(INFLIGHT_EXTENSION, ""); + } + else if (action.contains(REQUESTED_EXTENSION)) { + state = HudiInstant.State.REQUESTED; + action = action.replace(REQUESTED_EXTENSION, ""); + } + } + + public HudiInstant(HudiInstant.State state, String action, String timestamp) + { + this.state = state; + this.action = action; + this.timestamp = timestamp; + } + + public boolean isCompleted() + { + return state == HudiInstant.State.COMPLETED; + } + + public boolean isInflight() + { + return state == HudiInstant.State.INFLIGHT; + } + + public boolean isRequested() + { + return state == HudiInstant.State.REQUESTED; + } + + public String getAction() + { + return action; + } + + public String getTimestamp() + { + return timestamp; + } + + public String getFileName() + { + if (HudiTimeline.COMMIT_ACTION.equals(action)) { + return isInflight() ? HudiTimeline.makeInflightCommitFileName(timestamp) + : isRequested() ? HudiTimeline.makeRequestedCommitFileName(timestamp) + : HudiTimeline.makeCommitFileName(timestamp); + } + else if (HudiTimeline.CLEAN_ACTION.equals(action)) { + return isInflight() ? HudiTimeline.makeInflightCleanerFileName(timestamp) + : isRequested() ? 
HudiTimeline.makeRequestedCleanerFileName(timestamp) + : HudiTimeline.makeCleanerFileName(timestamp); + } + else if (HudiTimeline.ROLLBACK_ACTION.equals(action)) { + return isInflight() ? HudiTimeline.makeInflightRollbackFileName(timestamp) + : isRequested() ? HudiTimeline.makeRequestedRollbackFileName(timestamp) + : HudiTimeline.makeRollbackFileName(timestamp); + } + else if (HudiTimeline.SAVEPOINT_ACTION.equals(action)) { + return isInflight() ? HudiTimeline.makeInflightSavePointFileName(timestamp) + : HudiTimeline.makeSavePointFileName(timestamp); + } + else if (HudiTimeline.DELTA_COMMIT_ACTION.equals(action)) { + return isInflight() ? HudiTimeline.makeInflightDeltaFileName(timestamp) + : isRequested() ? HudiTimeline.makeRequestedDeltaFileName(timestamp) + : HudiTimeline.makeDeltaFileName(timestamp); + } + else if (HudiTimeline.COMPACTION_ACTION.equals(action)) { + if (isInflight()) { + return HudiTimeline.makeInflightCompactionFileName(timestamp); + } + else if (isRequested()) { + return HudiTimeline.makeRequestedCompactionFileName(timestamp); + } + else { + return HudiTimeline.makeCommitFileName(timestamp); + } + } + else if (HudiTimeline.RESTORE_ACTION.equals(action)) { + return isInflight() ? HudiTimeline.makeInflightRestoreFileName(timestamp) + : isRequested() ? HudiTimeline.makeRequestedRestoreFileName(timestamp) + : HudiTimeline.makeRestoreFileName(timestamp); + } + else if (HudiTimeline.REPLACE_COMMIT_ACTION.equals(action)) { + return isInflight() ? HudiTimeline.makeInflightReplaceFileName(timestamp) + : isRequested() ? HudiTimeline.makeRequestedReplaceFileName(timestamp) + : HudiTimeline.makeReplaceFileName(timestamp); + } + else if (HudiTimeline.INDEXING_ACTION.equals(action)) { + return isInflight() ? HudiTimeline.makeInflightIndexFileName(timestamp) + : isRequested() ? 
HudiTimeline.makeRequestedIndexFileName(timestamp) + : HudiTimeline.makeIndexCommitFileName(timestamp); + } + else if (HudiTimeline.SCHEMA_COMMIT_ACTION.equals(action)) { + return isInflight() ? HudiTimeline.makeInflightSchemaFileName(timestamp) + : isRequested() ? HudiTimeline.makeRequestSchemaFileName(timestamp) + : HudiTimeline.makeSchemaFileName(timestamp); + } + throw new IllegalArgumentException("Cannot get file name for unknown action " + action); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + HudiInstant that = (HudiInstant) o; + return state == that.state && Objects.equals(action, that.action) && Objects.equals(timestamp, that.timestamp); + } + + public HudiInstant.State getState() + { + return state; + } + + @Override + public int hashCode() + { + return Objects.hash(state, action, timestamp); + } + + @Override + public int compareTo(HudiInstant o) + { + return COMPARATOR.compare(this, o); + } + + @Override + public String toString() + { + return "[" + ((isInflight() || isRequested()) ? "==>" : "") + timestamp + "__" + action + "__" + state + "]"; + } +} diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/model/HudiReplaceCommitMetadata.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/model/HudiReplaceCommitMetadata.java new file mode 100644 index 000000000000..60212459625c --- /dev/null +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/model/HudiReplaceCommitMetadata.java @@ -0,0 +1,89 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hudi.model; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; + +import static com.google.common.base.MoreObjects.toStringHelper; + +@JsonIgnoreProperties(ignoreUnknown = true) +public class HudiReplaceCommitMetadata +{ + private Map> partitionToReplaceFileIds; + private Boolean compacted; + + // for ser/deser + public HudiReplaceCommitMetadata() + { + partitionToReplaceFileIds = ImmutableMap.of(); + compacted = false; + } + + public Map> getPartitionToReplaceFileIds() + { + return partitionToReplaceFileIds; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + HudiReplaceCommitMetadata that = (HudiReplaceCommitMetadata) o; + + return compacted.equals(that.compacted); + } + + @Override + public int hashCode() + { + return compacted.hashCode(); + } + + public static T fromBytes(byte[] bytes, ObjectMapper objectMapper, Class clazz) + throws IOException + { + try { + String jsonStr = new String(bytes, StandardCharsets.UTF_8); + if (jsonStr == null || jsonStr.isEmpty()) { + return clazz.getConstructor().newInstance(); + } + return objectMapper.readValue(jsonStr, clazz); + } + catch (Exception e) { + throw new IOException("unable to read commit metadata", e); + } + } + + @Override 
+ public String toString() + { + return toStringHelper(this) + .add("partitionToReplaceFileIds", partitionToReplaceFileIds) + .add("compacted", compacted) + .toString(); + } +} diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/model/HoodieTableType.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/model/HudiTableType.java similarity index 97% rename from plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/model/HoodieTableType.java rename to plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/model/HudiTableType.java index 9608e6726b14..da93f80d95b0 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/model/HoodieTableType.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/model/HudiTableType.java @@ -22,7 +22,7 @@ *
  • MERGE_ON_READ - Speeds up upserts, by delaying merge until enough work piles up. * */ -public enum HoodieTableType +public enum HudiTableType { COPY_ON_WRITE, MERGE_ON_READ diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/partition/HiveHudiPartitionInfo.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/partition/HiveHudiPartitionInfo.java index d624bb360777..7ae74b3b5023 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/partition/HiveHudiPartitionInfo.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/partition/HiveHudiPartitionInfo.java @@ -114,17 +114,13 @@ public void loadPartitionInfo(Optional partition) this.hivePartitionKeys = buildPartitionKeys(partitionColumns, partition.get().getValues()); } - /* - * Given a base partition and a partition path, return relative path of partition path to the base path. - * This is equivalent to org.apache.hudi.common.fs.FSUtils#getRelativePartitionPath - */ private static String getRelativePartitionPath(Location baseLocation, Location fullPartitionLocation) { String basePath = baseLocation.path(); String fullPartitionPath = fullPartitionLocation.path(); if (!fullPartitionPath.startsWith(basePath)) { - throw new IllegalArgumentException("Partition path does not belong to base-path"); + throw new IllegalArgumentException("Partition location does not belong to base-location"); } String baseLocationParent = baseLocation.parentDirectory().path(); diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/query/HudiReadOptimizedDirectoryLister.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/query/HudiReadOptimizedDirectoryLister.java index 9d60f54462d7..431e2bec2bc3 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/query/HudiReadOptimizedDirectoryLister.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/query/HudiReadOptimizedDirectoryLister.java @@ -21,28 +21,21 @@ import io.trino.plugin.hive.metastore.Table; import 
io.trino.plugin.hudi.HudiFileStatus; import io.trino.plugin.hudi.HudiTableHandle; +import io.trino.plugin.hudi.files.HudiBaseFile; import io.trino.plugin.hudi.partition.HiveHudiPartitionInfo; import io.trino.plugin.hudi.partition.HudiPartitionInfo; -import io.trino.spi.TrinoException; +import io.trino.plugin.hudi.table.HudiTableFileSystemView; +import io.trino.plugin.hudi.table.HudiTableMetaClient; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.connector.TableNotFoundException; import io.trino.spi.predicate.TupleDomain; -import org.apache.hudi.common.config.HoodieMetadataConfig; -import org.apache.hudi.common.engine.HoodieEngineContext; -import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.view.FileSystemViewManager; -import org.apache.hudi.common.table.view.HoodieTableFileSystemView; -import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.stream.Collectors; import static com.google.common.collect.ImmutableList.toImmutableList; -import static io.trino.plugin.hudi.HudiErrorCode.HUDI_CANNOT_OPEN_SPLIT; -import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getFileStatus; public class HudiReadOptimizedDirectoryLister implements HudiDirectoryLister @@ -52,17 +45,15 @@ public class HudiReadOptimizedDirectoryLister private final Table hiveTable; private final SchemaTableName tableName; private final List partitionColumnHandles; - private final HoodieTableFileSystemView fileSystemView; + private final HudiTableFileSystemView fileSystemView; private final TupleDomain partitionKeysFilter; private final List partitionColumns; private List hivePartitionNames; public HudiReadOptimizedDirectoryLister( - HoodieMetadataConfig metadataConfig, - HoodieEngineContext engineContext, HudiTableHandle tableHandle, - HoodieTableMetaClient metaClient, + HudiTableMetaClient metaClient, HiveMetastore hiveMetastore, Table 
hiveTable, List partitionColumnHandles) @@ -72,7 +63,7 @@ public HudiReadOptimizedDirectoryLister( this.hiveMetastore = hiveMetastore; this.hiveTable = hiveTable; this.partitionColumnHandles = partitionColumnHandles; - this.fileSystemView = FileSystemViewManager.createInMemoryFileSystemView(engineContext, metaClient, metadataConfig); + this.fileSystemView = new HudiTableFileSystemView(metaClient, metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants()); this.partitionKeysFilter = MetastoreUtil.computePartitionKeyFilter(partitionColumnHandles, tableHandle.getPartitionPredicates()); this.partitionColumns = hiveTable.getPartitionColumns(); } @@ -94,31 +85,24 @@ public List getPartitionsToScan() tableHandle.getPartitionPredicates(), hiveTable, hiveMetastore)) - .collect(Collectors.toList()); + .collect(toImmutableList()); return allPartitionInfoList.stream() .filter(partitionInfo -> partitionInfo.getHivePartitionKeys().isEmpty() || partitionInfo.doesMatchPredicates()) - .collect(Collectors.toList()); + .collect(toImmutableList()); } @Override public List listStatus(HudiPartitionInfo partitionInfo) { return fileSystemView.getLatestBaseFiles(partitionInfo.getRelativePartitionPath()) - .map(baseFile -> { - try { - return getFileStatus(baseFile); - } - catch (IOException e) { - throw new TrinoException(HUDI_CANNOT_OPEN_SPLIT, "Error getting file status of " + baseFile.getPath(), e); - } - }) - .map(status -> new HudiFileStatus( - status.getPath(), + .map(HudiBaseFile::getFileEntry) + .map(fileEntry -> new HudiFileStatus( + fileEntry.location(), false, - status.getLen(), - status.getModificationTime(), - status.getBlockSize())) + fileEntry.length(), + fileEntry.lastModified().toEpochMilli(), + fileEntry.blocks().map(listOfBlocks -> (!listOfBlocks.isEmpty()) ? 
listOfBlocks.get(0).length() : 0).orElse(0L))) .collect(toImmutableList()); } @@ -127,7 +111,7 @@ private List getPartitionNamesFromHiveMetastore(TupleDomain part return hiveMetastore.getPartitionNamesByFilter( tableName.getSchemaName(), tableName.getTableName(), - partitionColumns.stream().map(Column::getName).collect(Collectors.toList()), + partitionColumns.stream().map(Column::getName).collect(toImmutableList()), partitionKeysFilter).orElseThrow(() -> new TableNotFoundException(tableHandle.getSchemaTableName())); } diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/split/HudiSplitFactory.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/split/HudiSplitFactory.java index a885a09eb2de..4cd6d54c4688 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/split/HudiSplitFactory.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/split/HudiSplitFactory.java @@ -19,7 +19,6 @@ import io.trino.plugin.hudi.HudiSplit; import io.trino.plugin.hudi.HudiTableHandle; import io.trino.spi.TrinoException; -import org.apache.hudi.hadoop.PathWithBootstrapFileStatus; import java.util.List; @@ -45,14 +44,14 @@ public HudiSplitFactory( public List createSplits(List partitionKeys, HudiFileStatus fileStatus) { if (fileStatus.isDirectory()) { - throw new TrinoException(HUDI_FILESYSTEM_ERROR, format("Not a valid path: %s", fileStatus.path())); + throw new TrinoException(HUDI_FILESYSTEM_ERROR, format("Not a valid location: %s", fileStatus.location())); } long fileSize = fileStatus.length(); - if (fileSize == 0 || fileStatus.path() instanceof PathWithBootstrapFileStatus) { + if (fileSize == 0) { return ImmutableList.of(new HudiSplit( - fileStatus.path().toString(), + fileStatus.location().toString(), 0, fileSize, fileSize, @@ -69,7 +68,7 @@ public List createSplits(List partitionKeys, HudiFi long bytesRemaining = fileSize; while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) { splits.add(new HudiSplit( - fileStatus.path().toString(), + 
fileStatus.location().toString(), fileSize - bytesRemaining, splitSize, fileSize, @@ -82,7 +81,7 @@ public List createSplits(List partitionKeys, HudiFi } if (bytesRemaining > 0) { splits.add(new HudiSplit( - fileStatus.path().toString(), + fileStatus.location().toString(), fileSize - bytesRemaining, bytesRemaining, fileSize, diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/table/HudiTableFileSystemView.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/table/HudiTableFileSystemView.java new file mode 100644 index 000000000000..84ac73e4e9c4 --- /dev/null +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/table/HudiTableFileSystemView.java @@ -0,0 +1,485 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.hudi.table; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import io.airlift.json.ObjectMapperProvider; +import io.airlift.log.Logger; +import io.trino.filesystem.FileEntry; +import io.trino.filesystem.FileIterator; +import io.trino.filesystem.Location; +import io.trino.plugin.hudi.compaction.CompactionOperation; +import io.trino.plugin.hudi.compaction.HudiCompactionOperation; +import io.trino.plugin.hudi.compaction.HudiCompactionPlan; +import io.trino.plugin.hudi.files.HudiBaseFile; +import io.trino.plugin.hudi.files.HudiFileGroup; +import io.trino.plugin.hudi.files.HudiFileGroupId; +import io.trino.plugin.hudi.files.HudiLogFile; +import io.trino.plugin.hudi.model.HudiFileFormat; +import io.trino.plugin.hudi.model.HudiInstant; +import io.trino.plugin.hudi.model.HudiReplaceCommitMetadata; +import io.trino.plugin.hudi.timeline.HudiTimeline; +import io.trino.spi.TrinoException; +import org.apache.avro.file.DataFileReader; +import org.apache.avro.file.FileReader; +import org.apache.avro.file.SeekableByteArrayInput; +import org.apache.avro.io.DatumReader; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificRecordBase; +import org.apache.commons.lang3.tuple.ImmutablePair; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Stream; + +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.ImmutableMap.toImmutableMap; +import static 
io.trino.plugin.hudi.HudiErrorCode.HUDI_BAD_DATA; +import static io.trino.plugin.hudi.files.FSUtils.LOG_FILE_PATTERN; +import static io.trino.plugin.hudi.files.FSUtils.getPartitionLocation; +import static java.util.stream.Collectors.groupingBy; + +public class HudiTableFileSystemView +{ + private static final Logger LOG = Logger.get(HudiTableFileSystemView.class); + private static final Integer VERSION_2 = 2; + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapperProvider().get(); + // Locks to control concurrency. Sync operations use write-lock blocking all fetch operations. + // For the common-case, we allow concurrent read of single or multiple partitions + private final ReentrantReadWriteLock globalLock = new ReentrantReadWriteLock(); + private final ReentrantReadWriteLock.ReadLock readLock = globalLock.readLock(); + // Used to concurrently load and populate partition views + private final ConcurrentHashMap addedPartitions = new ConcurrentHashMap<>(4096); + + private boolean closed; + + private Map> partitionToFileGroupsMap; + private HudiTableMetaClient metaClient; + + private Map> fgIdToPendingCompaction; + + private HudiTimeline visibleCommitsAndCompactionTimeline; + + private Map fgIdToReplaceInstants; + + public HudiTableFileSystemView(HudiTableMetaClient metaClient, HudiTimeline visibleActiveTimeline) + { + partitionToFileGroupsMap = new ConcurrentHashMap<>(); + this.metaClient = metaClient; + this.visibleCommitsAndCompactionTimeline = visibleActiveTimeline.getWriteTimeline(); + resetFileGroupsReplaced(visibleCommitsAndCompactionTimeline); + resetPendingCompactionOperations(getAllPendingCompactionOperations(metaClient) + .values().stream() + .map(pair -> ImmutablePair.of(pair.getKey(), CompactionOperation.convertFromAvroRecordInstance(pair.getValue())))); + } + + private static Map> getAllPendingCompactionOperations( + HudiTableMetaClient metaClient) + { + List> pendingCompactionPlanWithInstants = + getAllPendingCompactionPlans(metaClient); 
+ + Map> fgIdToPendingCompactionWithInstantMap = new HashMap<>(); + pendingCompactionPlanWithInstants.stream() + .flatMap(instantPlanPair -> getPendingCompactionOperations(instantPlanPair.getKey(), instantPlanPair.getValue())) + .forEach(pair -> { + if (fgIdToPendingCompactionWithInstantMap.containsKey(pair.getKey())) { + HudiCompactionOperation operation = pair.getValue().getValue(); + HudiCompactionOperation anotherOperation = fgIdToPendingCompactionWithInstantMap.get(pair.getKey()).getValue(); + + if (!operation.equals(anotherOperation)) { + String msg = "Hudi File Id (" + pair.getKey() + ") has more than 1 pending compactions. Instants: " + + pair.getValue() + ", " + fgIdToPendingCompactionWithInstantMap.get(pair.getKey()); + throw new IllegalStateException(msg); + } + } + fgIdToPendingCompactionWithInstantMap.put(pair.getKey(), pair.getValue()); + }); + return fgIdToPendingCompactionWithInstantMap; + } + + private static List> getAllPendingCompactionPlans( + HudiTableMetaClient metaClient) + { + List pendingCompactionInstants = + metaClient.getActiveTimeline() + .filterPendingCompactionTimeline() + .getInstants() + .collect(toImmutableList()); + return pendingCompactionInstants.stream() + .map(instant -> { + try { + return ImmutablePair.of(instant, getCompactionPlan(metaClient, instant.getTimestamp())); + } + catch (IOException e) { + throw new TrinoException(HUDI_BAD_DATA, e); + } + }) + .collect(toImmutableList()); + } + + private static HudiCompactionPlan getCompactionPlan(HudiTableMetaClient metaClient, String compactionInstant) + throws IOException + { + HudiCompactionPlan compactionPlan = deserializeAvroMetadata( + metaClient + .getActiveTimeline() + .readCompactionPlanAsBytes(HudiTimeline.getCompactionRequestedInstant(compactionInstant)).get(), + HudiCompactionPlan.class); + return upgradeToLatest(compactionPlan, compactionPlan.getVersion()); + } + + private static HudiCompactionPlan upgradeToLatest(HudiCompactionPlan metadata, int metadataVersion) + { 
+ if (metadataVersion == VERSION_2) { + return metadata; + } + checkState(metadataVersion == 1, "Lowest supported metadata version is 1"); + List v2CompactionOperationList = new ArrayList<>(); + if (null != metadata.getOperations()) { + v2CompactionOperationList = metadata.getOperations().stream() + .map(compactionOperation -> + HudiCompactionOperation.newBuilder() + .setBaseInstantTime(compactionOperation.getBaseInstantTime()) + .setFileId(compactionOperation.getFileId()) + .setPartitionPath(compactionOperation.getPartitionPath()) + .setMetrics(compactionOperation.getMetrics()) + .setDataFilePath(compactionOperation.getDataFilePath() == null ? null : Location.of(compactionOperation.getDataFilePath()).fileName()) + .setDeltaFilePaths(compactionOperation.getDeltaFilePaths().stream().map(filePath -> Location.of(filePath).fileName()).collect(toImmutableList())) + .build()) + .collect(toImmutableList()); + } + return new HudiCompactionPlan(v2CompactionOperationList, metadata.getExtraMetadata(), VERSION_2); + } + + private static T deserializeAvroMetadata(byte[] bytes, Class clazz) + throws IOException + { + DatumReader reader = new SpecificDatumReader<>(clazz); + FileReader fileReader = DataFileReader.openReader(new SeekableByteArrayInput(bytes), reader); + checkState(fileReader.hasNext(), "Could not deserialize metadata of type " + clazz); + return fileReader.next(); + } + + private static Stream>> getPendingCompactionOperations( + HudiInstant instant, HudiCompactionPlan compactionPlan) + { + List ops = compactionPlan.getOperations(); + if (null != ops) { + return ops.stream() + .map(op -> + ImmutablePair.of( + new HudiFileGroupId(op.getPartitionPath(), op.getFileId()), + ImmutablePair.of(instant.getTimestamp(), op))); + } + return Stream.empty(); + } + + private void resetPendingCompactionOperations(Stream> operations) + { + this.fgIdToPendingCompaction = operations + .map(entry -> ImmutablePair.of( + entry.getValue().getFileGroupId(), + 
ImmutablePair.of(entry.getKey(), entry.getValue()))) + .collect(toImmutableMap(ImmutablePair::getKey, ImmutablePair::getValue)); + } + + private void resetFileGroupsReplaced(HudiTimeline timeline) + { + // for each REPLACE instant, get map of (partitionPath -> deleteFileGroup) + HudiTimeline replacedTimeline = timeline.getCompletedReplaceTimeline(); + Map replacedFileGroups = replacedTimeline.getInstants() + .flatMap(instant -> { + try { + HudiReplaceCommitMetadata replaceMetadata = HudiReplaceCommitMetadata.fromBytes( + metaClient.getActiveTimeline().getInstantDetails(instant).get(), + OBJECT_MAPPER, + HudiReplaceCommitMetadata.class); + + // get replace instant mapping for each partition, fileId + return replaceMetadata.getPartitionToReplaceFileIds().entrySet().stream().flatMap(entry -> entry.getValue().stream().map(e -> + new AbstractMap.SimpleEntry<>(new HudiFileGroupId(entry.getKey(), e), instant))); + } + catch (IOException e) { + throw new TrinoException(HUDI_BAD_DATA, "error reading commit metadata for " + instant); + } + }) + .collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue)); + fgIdToReplaceInstants = new ConcurrentHashMap<>(replacedFileGroups); + } + + public final Stream getLatestBaseFiles(String partitionStr) + { + try { + readLock.lock(); + String partitionPath = formatPartitionKey(partitionStr); + ensurePartitionLoadedCorrectly(partitionPath); + return fetchLatestBaseFiles(partitionPath) + .filter(hudiBaseFile -> !isFileGroupReplaced(partitionPath, hudiBaseFile.getFileId())); + } + finally { + readLock.unlock(); + } + } + + private boolean isFileGroupReplaced(String partitionPath, String fileId) + { + return isFileGroupReplaced(new HudiFileGroupId(partitionPath, fileId)); + } + + private String formatPartitionKey(String partitionStr) + { + return partitionStr.endsWith("/") ? 
partitionStr.substring(0, partitionStr.length() - 1) : partitionStr; + } + + private void ensurePartitionLoadedCorrectly(String partition) + { + checkState(!isClosed(), "View is already closed"); + + addedPartitions.computeIfAbsent(partition, (partitionPathStr) -> { + long beginTs = System.currentTimeMillis(); + if (!isPartitionAvailableInStore(partitionPathStr)) { + // Not loaded yet + try { + LOG.info("Building file system view for partition (" + partitionPathStr + ")"); + + Location partitionLocation = getPartitionLocation(metaClient.getBasePath(), partitionPathStr); + FileIterator partitionFiles = listPartition(partitionLocation); + List groups = addFilesToView(partitionFiles); + + if (groups.isEmpty()) { + storePartitionView(partitionPathStr, new ArrayList<>()); + } + } + catch (IOException e) { + throw new TrinoException(HUDI_BAD_DATA, "Failed to list base files in partition " + partitionPathStr, e); + } + } + else { + LOG.debug("View already built for Partition :" + partitionPathStr + ", FOUND is "); + } + long endTs = System.currentTimeMillis(); + LOG.debug("Time to load partition (" + partitionPathStr + ") =" + (endTs - beginTs)); + return true; + }); + } + + protected boolean isPartitionAvailableInStore(String partitionPath) + { + return partitionToFileGroupsMap.containsKey(partitionPath); + } + + private FileIterator listPartition(Location partitionLocation) + throws IOException + { + FileIterator fileIterator = metaClient.getFileSystem().listFiles(partitionLocation); + if (fileIterator.hasNext()) { + return fileIterator; + } + try (OutputStream ignored = metaClient.getFileSystem().newOutputFile(partitionLocation).create()) { + return FileIterator.empty(); + } + } + + public List addFilesToView(FileIterator partitionFiles) + throws IOException + { + List fileGroups = buildFileGroups(partitionFiles, visibleCommitsAndCompactionTimeline, true); + // Group by partition for efficient updates for both InMemory and DiskBased structures. 
+ fileGroups.stream() + .collect(groupingBy(HudiFileGroup::getPartitionPath)) + .forEach((partition, value) -> { + if (!isPartitionAvailableInStore(partition)) { + storePartitionView(partition, value); + } + }); + return fileGroups; + } + + private List buildFileGroups( + FileIterator partitionFiles, + HudiTimeline timeline, + boolean addPendingCompactionFileSlice) + throws IOException + { + List hoodieBaseFiles = new ArrayList<>(); + List hudiLogFiles = new ArrayList<>(); + String baseHoodieFileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension(); + while (partitionFiles.hasNext()) { + FileEntry fileEntry = partitionFiles.next(); + if (fileEntry.location().path().contains(baseHoodieFileExtension)) { + hoodieBaseFiles.add(new HudiBaseFile(fileEntry)); + } + String fileName = fileEntry.location().fileName(); + if (LOG_FILE_PATTERN.matcher(fileName).matches() && fileName.contains(HudiFileFormat.HOODIE_LOG.getFileExtension())) { + hudiLogFiles.add(new HudiLogFile(fileEntry)); + } + } + return buildFileGroups(hoodieBaseFiles.stream(), hudiLogFiles.stream(), timeline, addPendingCompactionFileSlice); + } + + private List buildFileGroups( + Stream baseFileStream, + Stream logFileStream, + HudiTimeline timeline, + boolean addPendingCompactionFileSlice) + { + Map, List> baseFiles = baseFileStream + .collect(groupingBy(baseFile -> { + String partitionPathStr = getPartitionPathFor(baseFile); + return ImmutablePair.of(partitionPathStr, baseFile.getFileId()); + })); + + Map, List> logFiles = logFileStream + .collect(groupingBy((logFile) -> { + String partitionPathStr = getRelativePartitionPath(metaClient.getBasePath(), logFile.getPath().parentDirectory()); + return ImmutablePair.of(partitionPathStr, logFile.getFileId()); + })); + + Set> fileIdSet = new HashSet<>(baseFiles.keySet()); + fileIdSet.addAll(logFiles.keySet()); + + List fileGroups = new ArrayList<>(); + fileIdSet.forEach(pair -> { + String fileId = pair.getValue(); + String partitionPath = 
pair.getKey(); + HudiFileGroup group = new HudiFileGroup(partitionPath, fileId, timeline); + if (baseFiles.containsKey(pair)) { + baseFiles.get(pair).forEach(group::addBaseFile); + } + if (logFiles.containsKey(pair)) { + logFiles.get(pair).forEach(group::addLogFile); + } + + if (addPendingCompactionFileSlice) { + Optional> pendingCompaction = + getPendingCompactionOperationWithInstant(group.getFileGroupId()); + // If there is no delta-commit after compaction request, this step would ensure a new file-slice appears + // so that any new ingestion uses the correct base-instant + pendingCompaction.ifPresent(stringCompactionOperationImmutablePair -> + group.addNewFileSliceAtInstant(stringCompactionOperationImmutablePair.getKey())); + } + fileGroups.add(group); + }); + + return fileGroups; + } + + private String getPartitionPathFor(HudiBaseFile baseFile) + { + return getRelativePartitionPath(metaClient.getBasePath(), baseFile.getFullPath().parentDirectory()); + } + + private String getRelativePartitionPath(Location basePath, Location fullPartitionPath) + { + String fullPartitionPathStr = fullPartitionPath.path(); + + if (!fullPartitionPathStr.startsWith(basePath.path())) { + throw new IllegalArgumentException("Partition location does not belong to base-location"); + } + + int partitionStartIndex = fullPartitionPath.path().indexOf(basePath.fileName(), basePath.parentDirectory().path().length()); + // Partition-Path could be empty for non-partitioned tables + if (partitionStartIndex + basePath.fileName().length() == fullPartitionPathStr.length()) { + return ""; + } + return fullPartitionPathStr.substring(partitionStartIndex + basePath.fileName().length() + 1); + } + + protected Optional> getPendingCompactionOperationWithInstant(HudiFileGroupId fgId) + { + return Optional.ofNullable(fgIdToPendingCompaction.get(fgId)); + } + + private void storePartitionView(String partitionPath, List fileGroups) + { + LOG.debug("Adding file-groups for partition :" + partitionPath + ", 
#FileGroups=" + fileGroups.size()); + List newList = ImmutableList.copyOf(fileGroups); + partitionToFileGroupsMap.put(partitionPath, newList); + } + + private Stream fetchLatestBaseFiles(final String partitionPath) + { + return fetchAllStoredFileGroups(partitionPath) + .filter(filGroup -> !isFileGroupReplaced(filGroup.getFileGroupId())) + .map(filGroup -> ImmutablePair.of(filGroup.getFileGroupId(), getLatestBaseFile(filGroup))) + .filter(pair -> pair.getValue().isPresent()) + .map(pair -> pair.getValue().get()); + } + + private Stream fetchAllStoredFileGroups(String partition) + { + final List fileGroups = ImmutableList.copyOf(partitionToFileGroupsMap.get(partition)); + return fileGroups.stream(); + } + + private boolean isFileGroupReplaced(HudiFileGroupId fileGroup) + { + return Optional.ofNullable(fgIdToReplaceInstants.get(fileGroup)).isPresent(); + } + + protected Optional getLatestBaseFile(HudiFileGroup fileGroup) + { + return fileGroup.getAllBaseFiles() + .filter(hudiBaseFile -> !isBaseFileDueToPendingCompaction(hudiBaseFile) && !isBaseFileDueToPendingClustering(hudiBaseFile)) + .findFirst(); + } + + private boolean isBaseFileDueToPendingCompaction(HudiBaseFile baseFile) + { + final String partitionPath = getPartitionPathFor(baseFile); + + Optional> compactionWithInstantTime = + getPendingCompactionOperationWithInstant(new HudiFileGroupId(partitionPath, baseFile.getFileId())); + return (compactionWithInstantTime.isPresent()) && (null != compactionWithInstantTime.get().getKey()) + && baseFile.getCommitTime().equals(compactionWithInstantTime.get().getKey()); + } + + private boolean isBaseFileDueToPendingClustering(HudiBaseFile baseFile) + { + List pendingReplaceInstants = metaClient.getActiveTimeline() + .filterPendingReplaceTimeline() + .getInstants() + .map(HudiInstant::getTimestamp) + .collect(toImmutableList()); + + return !pendingReplaceInstants.isEmpty() && pendingReplaceInstants.contains(baseFile.getCommitTime()); + } + + public boolean isClosed() + { + 
return closed; + } + + public void close() + { + this.fgIdToPendingCompaction = null; + this.partitionToFileGroupsMap = null; + this.fgIdToReplaceInstants = null; + closed = true; + } +} diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/table/HudiTableMetaClient.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/table/HudiTableMetaClient.java new file mode 100644 index 000000000000..40d2db6ea83e --- /dev/null +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/table/HudiTableMetaClient.java @@ -0,0 +1,207 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.hudi.table; + +import io.trino.filesystem.FileEntry; +import io.trino.filesystem.FileIterator; +import io.trino.filesystem.Location; +import io.trino.filesystem.TrinoFileSystem; +import io.trino.plugin.hudi.config.HudiTableConfig; +import io.trino.plugin.hudi.model.HudiInstant; +import io.trino.plugin.hudi.model.HudiTableType; +import io.trino.plugin.hudi.timeline.HudiActiveTimeline; +import io.trino.plugin.hudi.timeline.HudiTimeline; +import io.trino.plugin.hudi.timeline.TimelineLayout; +import io.trino.plugin.hudi.timeline.TimelineLayoutVersion; +import io.trino.spi.TrinoException; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.function.Predicate; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static com.google.common.base.Preconditions.checkArgument; +import static io.trino.plugin.hudi.HudiErrorCode.HUDI_UNSUPPORTED_TABLE_TYPE; +import static io.trino.plugin.hudi.HudiUtil.isHudiTable; +import static java.lang.String.format; +import static java.util.Objects.requireNonNull; + +public class HudiTableMetaClient +{ + public static final String METAFOLDER_NAME = ".hoodie"; + public static final String SEPARATOR = "/"; + public static final String AUXILIARYFOLDER_NAME = METAFOLDER_NAME + SEPARATOR + ".aux"; + public static final String SCHEMA_FOLDER_NAME = ".schema"; + + private final Location metaPath; + private final Location basePath; + private HudiTableType tableType; + private TimelineLayoutVersion timelineLayoutVersion; + private HudiTableConfig tableConfig; + private HudiActiveTimeline activeTimeline; + private TrinoFileSystem fileSystem; + + protected HudiTableMetaClient( + TrinoFileSystem fileSystem, + Location basePath, + Optional layoutVersion) + { + this.metaPath = requireNonNull(basePath, "basePath is null").appendPath(METAFOLDER_NAME); + this.fileSystem = requireNonNull(fileSystem, 
"fileSystem is null"); + checkArgument(isHudiTable(fileSystem, basePath), "Could not check if %s is a valid table", basePath); + this.basePath = basePath; + + this.tableConfig = new HudiTableConfig(fileSystem, metaPath); + this.tableType = tableConfig.getTableType(); + // TODO: Migrate Timeline objects + Optional tableConfigVersion = tableConfig.getTimelineLayoutVersion(); + if (layoutVersion.isPresent() && tableConfigVersion.isPresent()) { + // Ensure layout version passed in config is not lower than the one seen in hoodie.properties + checkArgument(layoutVersion.get().compareTo(tableConfigVersion.get()) >= 0, + "Layout Version defined in hoodie properties has higher version (%s) than the one passed in config (%s)", + tableConfigVersion.get(), + layoutVersion.get()); + } + this.timelineLayoutVersion = layoutVersion.orElseGet(() -> tableConfig.getTimelineLayoutVersion().orElseThrow()); + } + + public HudiTableConfig getTableConfig() + { + return tableConfig; + } + + public HudiTableType getTableType() + { + return tableType; + } + + public HudiTimeline getCommitsTimeline() + { + return switch (this.getTableType()) { + case COPY_ON_WRITE -> getActiveTimeline().getCommitTimeline(); + case MERGE_ON_READ -> + // We need to include the parquet files written out in delta commits + // Include commit action to be able to start doing a MOR over a COW table - no + // migration required + getActiveTimeline().getCommitsTimeline(); + default -> throw new TrinoException(HUDI_UNSUPPORTED_TABLE_TYPE, format("Unsupported table type : %s", this.getTableType())); + }; + } + + public synchronized HudiActiveTimeline getActiveTimeline() + { + if (activeTimeline == null) { + activeTimeline = new HudiActiveTimeline(this); + } + return activeTimeline; + } + + public TimelineLayoutVersion getTimelineLayoutVersion() + { + return timelineLayoutVersion; + } + + public Location getBasePath() + { + return basePath; + } + + public Location getMetaPath() + { + return metaPath; + } + + public 
TrinoFileSystem getFileSystem() + { + return fileSystem; + } + + public String getMetaAuxiliaryPath() + { + return basePath + SEPARATOR + AUXILIARYFOLDER_NAME; + } + + public String getSchemaFolderName() + { + return metaPath.appendPath(SCHEMA_FOLDER_NAME).path(); + } + + private static HudiTableMetaClient newMetaClient( + TrinoFileSystem fileSystem, + Location basePath) + { + return new HudiTableMetaClient(fileSystem, basePath, Optional.of(TimelineLayoutVersion.CURRENT_LAYOUT_VERSION)); + } + + public List scanHoodieInstantsFromFileSystem(Set includedExtensions, + boolean applyLayoutVersionFilters) + throws IOException + { + Stream instantStream = scanFiles(location -> { + String extension = HudiInstant.getTimelineFileExtension(location.fileName()); + return includedExtensions.contains(extension); + }).stream().map(HudiInstant::new); + + if (applyLayoutVersionFilters) { + instantStream = TimelineLayout.getLayout(getTimelineLayoutVersion()).filterHoodieInstants(instantStream); + } + return instantStream.sorted().collect(Collectors.toList()); + } + + private List scanFiles(Predicate pathPredicate) + throws IOException + { + FileIterator fileIterator = fileSystem.listFiles(metaPath); + List result = new ArrayList<>(); + while (fileIterator.hasNext()) { + FileEntry fileEntry = fileIterator.next(); + if (pathPredicate.test(fileEntry.location())) { + result.add(fileEntry); + } + } + return result; + } + + public static Builder builder() + { + return new Builder(); + } + + public static class Builder + { + private TrinoFileSystem fileSystem; + private Location basePath; + + public Builder setTrinoFileSystem(TrinoFileSystem fileSystem) + { + this.fileSystem = fileSystem; + return this; + } + + public Builder setBasePath(Location basePath) + { + this.basePath = basePath; + return this; + } + + public HudiTableMetaClient build() + { + return newMetaClient(fileSystem, basePath); + } + } +} diff --git 
a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/timeline/HudiActiveTimeline.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/timeline/HudiActiveTimeline.java new file mode 100644 index 000000000000..387d16332475 --- /dev/null +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/timeline/HudiActiveTimeline.java @@ -0,0 +1,118 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hudi.timeline; + +import com.google.common.collect.ImmutableSet; +import io.trino.filesystem.Location; +import io.trino.filesystem.TrinoInputStream; +import io.trino.plugin.hudi.model.HudiInstant; +import io.trino.plugin.hudi.table.HudiTableMetaClient; +import io.trino.spi.TrinoException; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Optional; +import java.util.Set; + +import static io.trino.plugin.hudi.HudiErrorCode.HUDI_BAD_DATA; + +public class HudiActiveTimeline + extends HudiDefaultTimeline +{ + private static final Set VALID_EXTENSIONS_IN_ACTIVE_TIMELINE = ImmutableSet.of( + COMMIT_EXTENSION, INFLIGHT_COMMIT_EXTENSION, REQUESTED_COMMIT_EXTENSION, + DELTA_COMMIT_EXTENSION, INFLIGHT_DELTA_COMMIT_EXTENSION, REQUESTED_DELTA_COMMIT_EXTENSION, + SAVEPOINT_EXTENSION, INFLIGHT_SAVEPOINT_EXTENSION, + CLEAN_EXTENSION, REQUESTED_CLEAN_EXTENSION, INFLIGHT_CLEAN_EXTENSION, + INFLIGHT_COMPACTION_EXTENSION, REQUESTED_COMPACTION_EXTENSION, + 
REQUESTED_RESTORE_EXTENSION, INFLIGHT_RESTORE_EXTENSION, RESTORE_EXTENSION, + ROLLBACK_EXTENSION, REQUESTED_ROLLBACK_EXTENSION, INFLIGHT_ROLLBACK_EXTENSION, + REQUESTED_REPLACE_COMMIT_EXTENSION, INFLIGHT_REPLACE_COMMIT_EXTENSION, REPLACE_COMMIT_EXTENSION, + REQUESTED_INDEX_COMMIT_EXTENSION, INFLIGHT_INDEX_COMMIT_EXTENSION, INDEX_COMMIT_EXTENSION, + REQUESTED_SAVE_SCHEMA_ACTION_EXTENSION, INFLIGHT_SAVE_SCHEMA_ACTION_EXTENSION, SAVE_SCHEMA_ACTION_EXTENSION); + + private HudiTableMetaClient metaClient; + + public HudiActiveTimeline(HudiTableMetaClient metaClient) + { + // Filter all the files in the metapath and include only the extensions passed and + // convert them into HudiInstant + try { + this.setInstants(metaClient.scanHoodieInstantsFromFileSystem(VALID_EXTENSIONS_IN_ACTIVE_TIMELINE, true)); + } + catch (IOException e) { + throw new TrinoException(HUDI_BAD_DATA, "Failed to scan metadata", e); + } + this.metaClient = metaClient; + this.details = this::getInstantDetails; + } + + @Deprecated + public HudiActiveTimeline() + { + } + + @Override + public Optional getInstantDetails(HudiInstant instant) + { + Location detailLocation = getInstantFileNamePath(instant.getFileName()); + return readDataFromPath(detailLocation); + } + + //----------------------------------------------------------------- + // BEGIN - COMPACTION RELATED META-DATA MANAGEMENT. + //----------------------------------------------------------------- + + public Optional readCompactionPlanAsBytes(HudiInstant instant) + { + // Reading from auxiliary location first. In future release, we will clean up compaction management + // to only write to timeline and skip auxiliary and this code will be able to handle it. + return readDataFromPath(Location.of(metaClient.getMetaAuxiliaryPath()).appendPath(instant.getFileName())); + } + + private Location getInstantFileNamePath(String fileName) + { + return Location.of(fileName.contains(SCHEMA_COMMIT_ACTION) ? 
metaClient.getSchemaFolderName() : metaClient.getMetaPath().path()).appendPath(fileName); + } + + private Optional readDataFromPath(Location detailPath) + { + try (TrinoInputStream inputStream = metaClient.getFileSystem().newInputFile(detailPath).newStream()) { + return Optional.of(readAsByteArray(inputStream)); + } + catch (IOException e) { + throw new TrinoException(HUDI_BAD_DATA, "Could not read commit details from " + detailPath, e); + } + } + + private static byte[] readAsByteArray(InputStream input) + throws IOException + { + ByteArrayOutputStream bos = new ByteArrayOutputStream(128); + copy(input, bos); + return bos.toByteArray(); + } + + private static void copy(InputStream inputStream, OutputStream outputStream) + throws IOException + { + byte[] buffer = new byte[1024]; + int len; + while ((len = inputStream.read(buffer)) != -1) { + outputStream.write(buffer, 0, len); + } + } +} diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/timeline/HudiDefaultTimeline.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/timeline/HudiDefaultTimeline.java new file mode 100644 index 000000000000..09381a8a7f79 --- /dev/null +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/timeline/HudiDefaultTimeline.java @@ -0,0 +1,192 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.hudi.timeline; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import io.trino.plugin.hudi.model.HudiInstant; + +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static com.google.common.collect.ImmutableSet.toImmutableSet; +import static io.trino.plugin.hudi.timeline.HudiTimeline.compareTimestamps; + +public class HudiDefaultTimeline + implements HudiTimeline +{ + private List instants; + protected transient Function> details; + + public HudiDefaultTimeline(Stream instants, Function> details) + { + this.details = details; + setInstants(instants.collect(Collectors.toList())); + } + + public void setInstants(List instants) + { + this.instants = ImmutableList.copyOf(instants); + } + + public HudiDefaultTimeline() + { + } + + @Override + public HudiTimeline filterCompletedInstants() + { + return new HudiDefaultTimeline(instants.stream().filter(HudiInstant::isCompleted), details); + } + + @Override + public HudiDefaultTimeline getWriteTimeline() + { + Set validActions = ImmutableSet.of(COMMIT_ACTION, DELTA_COMMIT_ACTION, COMPACTION_ACTION, REPLACE_COMMIT_ACTION); + return new HudiDefaultTimeline( + instants.stream().filter(s -> validActions.contains(s.getAction())), + details); + } + + @Override + public HudiTimeline getCompletedReplaceTimeline() + { + return new HudiDefaultTimeline( + instants.stream() + .filter(s -> s.getAction().equals(REPLACE_COMMIT_ACTION)) + .filter(HudiInstant::isCompleted), + details); + } + + @Override + public HudiTimeline filterPendingReplaceTimeline() + { + return new HudiDefaultTimeline( + instants.stream().filter(s -> s.getAction().equals(HudiTimeline.REPLACE_COMMIT_ACTION) && !s.isCompleted()), + details); + } + + @Override + public HudiTimeline filterPendingCompactionTimeline() + { + return new HudiDefaultTimeline( + 
instants.stream().filter(s -> s.getAction().equals(HudiTimeline.COMPACTION_ACTION) && !s.isCompleted()), + details); + } + + public HudiTimeline getCommitsTimeline() + { + return getTimelineOfActions(ImmutableSet.of(COMMIT_ACTION, DELTA_COMMIT_ACTION, REPLACE_COMMIT_ACTION)); + } + + public HudiTimeline getCommitTimeline() + { + return getTimelineOfActions(ImmutableSet.of(COMMIT_ACTION, REPLACE_COMMIT_ACTION)); + } + + public HudiTimeline getTimelineOfActions(Set actions) + { + return new HudiDefaultTimeline( + getInstants().filter(s -> actions.contains(s.getAction())), + this::getInstantDetails); + } + + @Override + public boolean empty() + { + return instants.stream().findFirst().isEmpty(); + } + + @Override + public int countInstants() + { + return instants.size(); + } + + @Override + public Optional firstInstant() + { + return instants.stream().findFirst(); + } + + @Override + public Optional nthInstant(int n) + { + if (empty() || n >= countInstants()) { + return Optional.empty(); + } + return Optional.of(instants.get(n)); + } + + @Override + public Optional lastInstant() + { + return empty() ? 
Optional.empty() : nthInstant(countInstants() - 1); + } + + @Override + public boolean containsOrBeforeTimelineStarts(String instant) + { + return instants.stream().anyMatch(s -> s.getTimestamp().equals(instant)) || isBeforeTimelineStarts(instant); + } + + @Override + public Stream getInstants() + { + return instants.stream(); + } + + @Override + public boolean isBeforeTimelineStarts(String instant) + { + Optional firstNonSavepointCommit = getFirstNonSavepointCommit(); + return firstNonSavepointCommit.isPresent() + && compareTimestamps(instant, LESSER_THAN, firstNonSavepointCommit.get().getTimestamp()); + } + + @Override + public Optional getFirstNonSavepointCommit() + { + Optional firstCommit = firstInstant(); + Set savepointTimestamps = instants.stream() + .filter(entry -> entry.getAction().equals(SAVEPOINT_ACTION)) + .map(HudiInstant::getTimestamp) + .collect(toImmutableSet()); + Optional firstNonSavepointCommit = firstCommit; + if (!savepointTimestamps.isEmpty()) { + // There are chances that there could be holes in the timeline due to archival and savepoint interplay. + // So, the first non-savepoint commit is considered as beginning of the active timeline. 
+ firstNonSavepointCommit = instants.stream() + .filter(entry -> !savepointTimestamps.contains(entry.getTimestamp())) + .findFirst(); + } + return firstNonSavepointCommit; + } + + @Override + public Optional getInstantDetails(HudiInstant instant) + { + return details.apply(instant); + } + + @Override + public String toString() + { + return this.getClass().getName() + ": " + instants.stream().map(Object::toString).collect(Collectors.joining(",")); + } +} diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/timeline/HudiTimeline.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/timeline/HudiTimeline.java new file mode 100644 index 000000000000..153ba99f03e1 --- /dev/null +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/timeline/HudiTimeline.java @@ -0,0 +1,255 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.hudi.timeline; + +import io.trino.plugin.hudi.model.HudiInstant; +import io.trino.plugin.hudi.model.HudiInstant.State; + +import java.util.Optional; +import java.util.function.BiPredicate; +import java.util.stream.Stream; + +import static java.lang.String.join; + +public interface HudiTimeline +{ + String COMMIT_ACTION = "commit"; + String DELTA_COMMIT_ACTION = "deltacommit"; + String CLEAN_ACTION = "clean"; + String ROLLBACK_ACTION = "rollback"; + String SAVEPOINT_ACTION = "savepoint"; + String REPLACE_COMMIT_ACTION = "replacecommit"; + String INFLIGHT_EXTENSION = ".inflight"; + // With Async Compaction, compaction instant can be in 3 states : + // (compaction-requested), (compaction-inflight), (completed) + String COMPACTION_ACTION = "compaction"; + String REQUESTED_EXTENSION = ".requested"; + String RESTORE_ACTION = "restore"; + String INDEXING_ACTION = "indexing"; + // only for schema save + String SCHEMA_COMMIT_ACTION = "schemacommit"; + String COMMIT_EXTENSION = "." + COMMIT_ACTION; + String DELTA_COMMIT_EXTENSION = "." + DELTA_COMMIT_ACTION; + String CLEAN_EXTENSION = "." + CLEAN_ACTION; + String ROLLBACK_EXTENSION = "." + ROLLBACK_ACTION; + String SAVEPOINT_EXTENSION = "." + SAVEPOINT_ACTION; + // this is to preserve backwards compatibility on commit in-flight filenames + String INFLIGHT_COMMIT_EXTENSION = INFLIGHT_EXTENSION; + String REQUESTED_COMMIT_EXTENSION = "." + COMMIT_ACTION + REQUESTED_EXTENSION; + String REQUESTED_DELTA_COMMIT_EXTENSION = "." + DELTA_COMMIT_ACTION + REQUESTED_EXTENSION; + String INFLIGHT_DELTA_COMMIT_EXTENSION = "." + DELTA_COMMIT_ACTION + INFLIGHT_EXTENSION; + String INFLIGHT_CLEAN_EXTENSION = "." + CLEAN_ACTION + INFLIGHT_EXTENSION; + String REQUESTED_CLEAN_EXTENSION = "." + CLEAN_ACTION + REQUESTED_EXTENSION; + String INFLIGHT_ROLLBACK_EXTENSION = "." + ROLLBACK_ACTION + INFLIGHT_EXTENSION; + String REQUESTED_ROLLBACK_EXTENSION = "." 
+ ROLLBACK_ACTION + REQUESTED_EXTENSION; + String INFLIGHT_SAVEPOINT_EXTENSION = "." + SAVEPOINT_ACTION + INFLIGHT_EXTENSION; + String REQUESTED_COMPACTION_SUFFIX = join("", COMPACTION_ACTION, REQUESTED_EXTENSION); + String REQUESTED_COMPACTION_EXTENSION = join(".", REQUESTED_COMPACTION_SUFFIX); + String INFLIGHT_COMPACTION_EXTENSION = join(".", COMPACTION_ACTION, INFLIGHT_EXTENSION); + String REQUESTED_RESTORE_EXTENSION = "." + RESTORE_ACTION + REQUESTED_EXTENSION; + String INFLIGHT_RESTORE_EXTENSION = "." + RESTORE_ACTION + INFLIGHT_EXTENSION; + String RESTORE_EXTENSION = "." + RESTORE_ACTION; + String INFLIGHT_REPLACE_COMMIT_EXTENSION = "." + REPLACE_COMMIT_ACTION + INFLIGHT_EXTENSION; + String REQUESTED_REPLACE_COMMIT_EXTENSION = "." + REPLACE_COMMIT_ACTION + REQUESTED_EXTENSION; + String REPLACE_COMMIT_EXTENSION = "." + REPLACE_COMMIT_ACTION; + String INFLIGHT_INDEX_COMMIT_EXTENSION = "." + INDEXING_ACTION + INFLIGHT_EXTENSION; + String REQUESTED_INDEX_COMMIT_EXTENSION = "." + INDEXING_ACTION + REQUESTED_EXTENSION; + String INDEX_COMMIT_EXTENSION = "." + INDEXING_ACTION; + String SAVE_SCHEMA_ACTION_EXTENSION = "." + SCHEMA_COMMIT_ACTION; + String INFLIGHT_SAVE_SCHEMA_ACTION_EXTENSION = "." + SCHEMA_COMMIT_ACTION + INFLIGHT_EXTENSION; + String REQUESTED_SAVE_SCHEMA_ACTION_EXTENSION = "." 
+ SCHEMA_COMMIT_ACTION + REQUESTED_EXTENSION; + + HudiTimeline filterCompletedInstants(); + + HudiTimeline getWriteTimeline(); + + HudiTimeline getCompletedReplaceTimeline(); + + HudiTimeline filterPendingCompactionTimeline(); + + HudiTimeline filterPendingReplaceTimeline(); + + boolean empty(); + + int countInstants(); + + Optional firstInstant(); + + Optional nthInstant(int n); + + Optional lastInstant(); + + boolean containsOrBeforeTimelineStarts(String ts); + + Stream getInstants(); + + boolean isBeforeTimelineStarts(String ts); + + Optional getFirstNonSavepointCommit(); + + Optional getInstantDetails(HudiInstant instant); + + BiPredicate LESSER_THAN_OR_EQUALS = (commit1, commit2) -> commit1.compareTo(commit2) <= 0; + BiPredicate LESSER_THAN = (commit1, commit2) -> commit1.compareTo(commit2) < 0; + + static boolean compareTimestamps(String commit1, BiPredicate predicateToApply, String commit2) + { + return predicateToApply.test(commit1, commit2); + } + + static HudiInstant getCompactionRequestedInstant(final String timestamp) + { + return new HudiInstant(State.REQUESTED, COMPACTION_ACTION, timestamp); + } + + static String makeCommitFileName(String instantTime) + { + return join("", instantTime, HudiTimeline.COMMIT_EXTENSION); + } + + static String makeInflightCommitFileName(String instantTime) + { + return join("", instantTime, HudiTimeline.INFLIGHT_COMMIT_EXTENSION); + } + + static String makeRequestedCommitFileName(String instantTime) + { + return join("", instantTime, HudiTimeline.REQUESTED_COMMIT_EXTENSION); + } + + static String makeCleanerFileName(String instant) + { + return join("", instant, HudiTimeline.CLEAN_EXTENSION); + } + + static String makeRequestedCleanerFileName(String instant) + { + return join("", instant, HudiTimeline.REQUESTED_CLEAN_EXTENSION); + } + + static String makeInflightCleanerFileName(String instant) + { + return join("", instant, HudiTimeline.INFLIGHT_CLEAN_EXTENSION); + } + + static String makeRollbackFileName(String instant) + 
{ + return join("", instant, HudiTimeline.ROLLBACK_EXTENSION); + } + + static String makeRequestedRollbackFileName(String instant) + { + return join("", instant, HudiTimeline.REQUESTED_ROLLBACK_EXTENSION); + } + + static String makeRequestedRestoreFileName(String instant) + { + return join("", instant, HudiTimeline.REQUESTED_RESTORE_EXTENSION); + } + + static String makeInflightRollbackFileName(String instant) + { + return join("", instant, HudiTimeline.INFLIGHT_ROLLBACK_EXTENSION); + } + + static String makeInflightSavePointFileName(String instantTime) + { + return join("", instantTime, HudiTimeline.INFLIGHT_SAVEPOINT_EXTENSION); + } + + static String makeSavePointFileName(String instantTime) + { + return join("", instantTime, HudiTimeline.SAVEPOINT_EXTENSION); + } + + static String makeInflightDeltaFileName(String instantTime) + { + return join("", instantTime, HudiTimeline.INFLIGHT_DELTA_COMMIT_EXTENSION); + } + + static String makeRequestedDeltaFileName(String instantTime) + { + return join("", instantTime, HudiTimeline.REQUESTED_DELTA_COMMIT_EXTENSION); + } + + static String makeInflightCompactionFileName(String instantTime) + { + return join("", instantTime, HudiTimeline.INFLIGHT_COMPACTION_EXTENSION); + } + + static String makeRequestedCompactionFileName(String instantTime) + { + return join("", instantTime, HudiTimeline.REQUESTED_COMPACTION_EXTENSION); + } + + static String makeRestoreFileName(String instant) + { + return join("", instant, HudiTimeline.RESTORE_EXTENSION); + } + + static String makeInflightRestoreFileName(String instant) + { + return join("", instant, HudiTimeline.INFLIGHT_RESTORE_EXTENSION); + } + + static String makeReplaceFileName(String instant) + { + return join("", instant, HudiTimeline.REPLACE_COMMIT_EXTENSION); + } + + static String makeInflightReplaceFileName(String instant) + { + return join("", instant, HudiTimeline.INFLIGHT_REPLACE_COMMIT_EXTENSION); + } + + static String makeRequestedReplaceFileName(String instant) + { + return 
join("", instant, HudiTimeline.REQUESTED_REPLACE_COMMIT_EXTENSION); + } + + static String makeDeltaFileName(String instantTime) + { + return instantTime + HudiTimeline.DELTA_COMMIT_EXTENSION; + } + + static String makeIndexCommitFileName(String instant) + { + return join("", instant, HudiTimeline.INDEX_COMMIT_EXTENSION); + } + + static String makeInflightIndexFileName(String instant) + { + return join("", instant, HudiTimeline.INFLIGHT_INDEX_COMMIT_EXTENSION); + } + + static String makeRequestedIndexFileName(String instant) + { + return join("", instant, HudiTimeline.REQUESTED_INDEX_COMMIT_EXTENSION); + } + + static String makeSchemaFileName(String instantTime) + { + return join("", instantTime, HudiTimeline.SAVE_SCHEMA_ACTION_EXTENSION); + } + + static String makeInflightSchemaFileName(String instantTime) + { + return join("", instantTime, HudiTimeline.INFLIGHT_SAVE_SCHEMA_ACTION_EXTENSION); + } + + static String makeRequestSchemaFileName(String instantTime) + { + return join("", instantTime, HudiTimeline.REQUESTED_SAVE_SCHEMA_ACTION_EXTENSION); + } +} diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/timeline/TimelineLayout.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/timeline/TimelineLayout.java new file mode 100644 index 000000000000..5120e773902f --- /dev/null +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/timeline/TimelineLayout.java @@ -0,0 +1,70 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.hudi.timeline; + +import io.trino.plugin.hudi.model.HudiInstant; +import org.apache.commons.lang3.tuple.ImmutablePair; + +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public abstract class TimelineLayout +{ + private static final Map LAYOUT_MAP = new HashMap<>(); + + static { + LAYOUT_MAP.put(new TimelineLayoutVersion(TimelineLayoutVersion.VERSION_0), new TimelineLayout.TimelineLayoutV0()); + LAYOUT_MAP.put(new TimelineLayoutVersion(TimelineLayoutVersion.VERSION_1), new TimelineLayout.TimelineLayoutV1()); + } + + public static TimelineLayout getLayout(TimelineLayoutVersion version) + { + return LAYOUT_MAP.get(version); + } + + public abstract Stream filterHoodieInstants(Stream instantStream); + + private static class TimelineLayoutV0 + extends TimelineLayout + { + @Override + public Stream filterHoodieInstants(Stream instantStream) + { + return instantStream; + } + } + + private static class TimelineLayoutV1 + extends TimelineLayout + { + @Override + public Stream filterHoodieInstants(Stream instantStream) + { + return instantStream.collect(Collectors.groupingBy(instant -> ImmutablePair.of(instant.getTimestamp(), + HudiInstant.getComparableAction(instant.getAction())))) + .values() + .stream() + .map(hoodieInstants -> + hoodieInstants.stream().reduce((x, y) -> { + // Pick the one with the highest state + if (x.getState().compareTo(y.getState()) >= 0) { + return x; + } + return y; + }).get()); + } + } +} diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/timeline/TimelineLayoutVersion.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/timeline/TimelineLayoutVersion.java new file mode 100644 index 000000000000..a6e95dd87c49 --- /dev/null +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/timeline/TimelineLayoutVersion.java @@ -0,0 +1,73 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use 
this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hudi.timeline; + +import java.util.Objects; + +import static com.google.common.base.Preconditions.checkArgument; + +public class TimelineLayoutVersion + implements Comparable +{ + public static final Integer VERSION_0 = 0; // pre 0.5.1 version format + public static final Integer VERSION_1 = 1; // current version with no renames + + private static final Integer CURRENT_VERSION = VERSION_1; + public static final TimelineLayoutVersion CURRENT_LAYOUT_VERSION = new TimelineLayoutVersion(CURRENT_VERSION); + + private final Integer version; + + public TimelineLayoutVersion(Integer version) + { + checkArgument(version <= CURRENT_VERSION); + checkArgument(version >= VERSION_0); + this.version = version; + } + + public Integer getVersion() + { + return version; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TimelineLayoutVersion that = (TimelineLayoutVersion) o; + return Objects.equals(version, that.version); + } + + @Override + public int hashCode() + { + return Objects.hash(version); + } + + @Override + public int compareTo(TimelineLayoutVersion o) + { + return Integer.compare(version, o.version); + } + + @Override + public String toString() + { + return String.valueOf(version); + } +} diff --git a/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/TestHudiSmokeTest.java b/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/TestHudiSmokeTest.java index 
f00491943ffe..0c2058118bde 100644 --- a/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/TestHudiSmokeTest.java +++ b/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/TestHudiSmokeTest.java @@ -167,7 +167,7 @@ public void testPartitionColumn() private static Path toPath(String path) { - // Remove leading 'file:' because $path column returns 'file:/path-to-file' in case of local file system + // Remove leading 'file:' because path column returns 'file:/path-to-file' in case of local file system return Path.of(path.replaceFirst("^file:", "")); } }