diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiErrorCode.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiErrorCode.java
index 4c29f9886cc6..70bf49e7354e 100644
--- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiErrorCode.java
+++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiErrorCode.java
@@ -28,7 +28,8 @@ public enum HudiErrorCode
     // HUDI_MISSING_DATA(3, EXTERNAL) is deprecated
     HUDI_CANNOT_OPEN_SPLIT(4, EXTERNAL),
     HUDI_UNSUPPORTED_FILE_FORMAT(5, EXTERNAL),
-    HUDI_CURSOR_ERROR(6, EXTERNAL);
+    HUDI_CURSOR_ERROR(6, EXTERNAL),
+    HUDI_FILESYSTEM_ERROR(7, EXTERNAL);
 
     private final ErrorCode errorCode;
 
diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiMetadata.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiMetadata.java
index e6226712a3a1..78c483713a0a 100644
--- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiMetadata.java
+++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiMetadata.java
@@ -39,10 +39,10 @@
 import io.trino.spi.connector.TableNotFoundException;
 import io.trino.spi.predicate.TupleDomain;
 import io.trino.spi.type.TypeManager;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hudi.common.model.HoodieTableType;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
@@ -58,6 +58,7 @@
 import static io.trino.plugin.hive.util.HiveUtil.columnMetadataGetter;
 import static io.trino.plugin.hive.util.HiveUtil.hiveColumnHandles;
 import static io.trino.plugin.hive.util.HiveUtil.isHiveSystemSchema;
+import static io.trino.plugin.hudi.HudiErrorCode.HUDI_FILESYSTEM_ERROR;
 import static io.trino.plugin.hudi.HudiSessionProperties.getColumnsToHide;
 import static io.trino.plugin.hudi.HudiTableProperties.LOCATION_PROPERTY;
 import static io.trino.plugin.hudi.HudiTableProperties.PARTITIONED_BY_PROPERTY;
@@ -70,7 +71,6 @@
 import static org.apache.hudi.common.fs.FSUtils.getFs;
 import static org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME;
 import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
-import static org.apache.hudi.exception.TableNotFoundException.checkTableValidity;
 
 public class HudiMetadata
         implements ConnectorMetadata
@@ -121,11 +121,11 @@ public HudiTableHandle getTableHandle(ConnectorSession session, SchemaTableName
     @Override
     public Optional<SystemTable> getSystemTable(ConnectorSession session, SchemaTableName tableName)
     {
-        return getRawSystemTable(session, tableName)
+        return getRawSystemTable(tableName)
                 .map(systemTable -> new ClassLoaderSafeSystemTable(systemTable, getClass().getClassLoader()));
     }
 
-    private Optional<SystemTable> getRawSystemTable(ConnectorSession session, SchemaTableName tableName)
+    private Optional<SystemTable> getRawSystemTable(SchemaTableName tableName)
     {
         HudiTableName name = HudiTableName.from(tableName.getTableName());
         if (name.getTableType() == TableType.DATA) {
@@ -141,8 +141,7 @@ private Optional<SystemTable> getRawSystemTable(ConnectorSession session, Schema
                 break;
             case TIMELINE:
                 SchemaTableName systemTableName = new SchemaTableName(tableName.getSchemaName(), name.getTableNameWithType());
-                Configuration configuration = hdfsEnvironment.getConfiguration(new HdfsContext(session), new Path(tableOptional.get().getStorage().getLocation()));
-                return Optional.of(new TimelineTable(configuration, systemTableName, tableOptional.get()));
+                return Optional.of(new TimelineTable(hdfsEnvironment, systemTableName, tableOptional.get()));
         }
         return Optional.empty();
     }
@@ -228,14 +227,19 @@ HiveMetastore getMetastore()
     private boolean isHudiTable(ConnectorSession session, Table table)
     {
         String basePath = table.getStorage().getLocation();
-        Configuration configuration = hdfsEnvironment.getConfiguration(new HdfsContext(session), new Path(basePath));
         try {
-            checkTableValidity(getFs(basePath, configuration), new Path(basePath), new Path(basePath, METAFOLDER_NAME));
+            if (!getFs(basePath, hdfsEnvironment.getConfiguration(new HdfsContext(session), new Path(basePath))).getFileStatus(new Path(basePath, METAFOLDER_NAME)).isDirectory()) {
+                log.warn("Could not find Hudi table at path '%s'.", basePath);
+                return false;
+            }
         }
-        catch (org.apache.hudi.exception.TableNotFoundException e) {
-            log.warn("Could not find Hudi table at path '%s'", basePath);
+        catch (IllegalArgumentException e) {
+            log.warn("Could not find Hudi table at path '%s'. Error: %s", basePath, e.getMessage());
            return false;
         }
+        catch (IOException e) {
+            throw new TrinoException(HUDI_FILESYSTEM_ERROR, format("Could not check if %s is a valid table", basePath), e);
+        }
         return true;
     }
 
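The isHudiTable hunk above swaps Hudi's checkTableValidity helper for a direct check that the table's .hoodie metafolder exists and is a directory. Below is a minimal, self-contained sketch of that check against plain Hadoop FileSystem APIs; the class name, local base path, and main method are illustrative only and not part of the patch.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;

public class HudiTableCheckSketch
{
    private static final String METAFOLDER_NAME = ".hoodie";

    // Returns true only when <basePath>/.hoodie exists and is a directory.
    // getFileStatus throws FileNotFoundException (an IOException) when the
    // metafolder is absent, which the patch maps to HUDI_FILESYSTEM_ERROR.
    public static boolean looksLikeHudiTable(String basePath, Configuration configuration)
            throws IOException
    {
        Path metaPath = new Path(basePath, METAFOLDER_NAME);
        FileSystem fileSystem = metaPath.getFileSystem(configuration);
        return fileSystem.getFileStatus(metaPath).isDirectory();
    }

    public static void main(String[] args)
            throws IOException
    {
        // Illustrative local path; point this at a real Hudi table to get "true".
        System.out.println(looksLikeHudiTable("/tmp/hudi_table", new Configuration()));
    }
}
```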
diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSource.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSource.java
index 45db0165c471..9c2edec73309 100644
--- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSource.java
+++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSource.java
@@ -18,7 +18,6 @@
 import io.trino.spi.block.Block;
 import io.trino.spi.block.RunLengthEncodedBlock;
 import io.trino.spi.connector.ConnectorPageSource;
-import org.apache.hadoop.fs.Path;
 
 import java.io.IOException;
 import java.util.List;
@@ -51,7 +50,7 @@ public HudiPageSource(
             List<HiveColumnHandle> columnHandles,
             Map<String, Block> partitionBlocks,
             ConnectorPageSource dataPageSource,
-            Path path,
+            String path,
             long fileSize,
             long fileModifiedTime)
     {
@@ -76,7 +75,7 @@ else if (column.getName().equals(PARTITION_COLUMN_NAME)) {
                 delegateIndexes[outputIndex] = -1;
             }
             else if (column.getName().equals(PATH_COLUMN_NAME)) {
-                prefilledBlocks[outputIndex] = nativeValueToBlock(PATH_TYPE, utf8Slice(path.toString()));
+                prefilledBlocks[outputIndex] = nativeValueToBlock(PATH_TYPE, utf8Slice(path));
                 delegateIndexes[outputIndex] = -1;
             }
             else if (column.getName().equals(FILE_SIZE_COLUMN_NAME)) {
diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java
index a1dcf5ebddaf..236b41855d8b 100644
--- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java
+++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java
@@ -43,7 +43,6 @@
 import io.trino.spi.predicate.TupleDomain;
 import io.trino.spi.type.Decimals;
 import io.trino.spi.type.TypeSignature;
-import org.apache.hadoop.fs.Path;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
@@ -146,8 +145,8 @@ public ConnectorPageSource createPageSource(
             DynamicFilter dynamicFilter)
     {
         HudiSplit split = (HudiSplit) connectorSplit;
-        Path path = new Path(split.getPath());
-        HoodieFileFormat hudiFileFormat = getHudiFileFormat(path.toString());
+        String path = split.getPath();
+        HoodieFileFormat hudiFileFormat = getHudiFileFormat(path);
         if (!HoodieFileFormat.PARQUET.equals(hudiFileFormat)) {
             throw new TrinoException(HUDI_UNSUPPORTED_FILE_FORMAT, format("File format %s not supported", hudiFileFormat));
         }
@@ -161,7 +160,7 @@ public ConnectorPageSource createPageSource(
                 .filter(columnHandle -> !columnHandle.isPartitionKey() && !columnHandle.isHidden())
                 .collect(Collectors.toList());
         TrinoFileSystem fileSystem = fileSystemFactory.create(session);
-        TrinoInputFile inputFile = fileSystem.newInputFile(path.toString(), split.getFileSize());
+        TrinoInputFile inputFile = fileSystem.newInputFile(path, split.getFileSize());
         ConnectorPageSource dataPageSource = createPageSource(session, regularColumns, split, inputFile, dataSourceStats, options, timeZone);
 
         return new HudiPageSource(
@@ -185,7 +184,7 @@ private static ConnectorPageSource createPageSource(
     {
         ParquetDataSource dataSource = null;
         boolean useColumnNames = shouldUseParquetColumnNames(session);
-        Path path = new Path(hudiSplit.getPath());
+        String path = hudiSplit.getPath();
         long start = hudiSplit.getStart();
         long length = hudiSplit.getLength();
         try {
diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitManager.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitManager.java
index 34afd7e042f3..3b1006e2c3e4 100644
--- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitManager.java
+++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitManager.java
@@ -13,7 +13,6 @@
  */
 package io.trino.plugin.hudi;
 
-import io.trino.hdfs.HdfsContext;
 import io.trino.hdfs.HdfsEnvironment;
 import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorSplitSource;
 import io.trino.plugin.hive.HiveColumnHandle;
@@ -29,7 +28,6 @@
 import io.trino.spi.connector.DynamicFilter;
 import io.trino.spi.connector.TableNotFoundException;
 import io.trino.spi.security.ConnectorIdentity;
-import org.apache.hadoop.fs.Path;
 
 import javax.annotation.PreDestroy;
 import javax.inject.Inject;
@@ -97,7 +95,7 @@ public ConnectorSplitSource getSplits(
                 metastore,
                 table,
                 hudiTableHandle,
-                hdfsEnvironment.getConfiguration(new HdfsContext(session), new Path(table.getStorage().getLocation())),
+                hdfsEnvironment,
                 partitionColumnHandles,
                 executor,
                 maxSplitsPerSecond,
diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitSource.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitSource.java
index c2666a9287cd..6b367071c984 100644
--- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitSource.java
+++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitSource.java
@@ -15,6 +15,8 @@
 
 import com.google.common.util.concurrent.Futures;
 import io.airlift.units.DataSize;
+import io.trino.hdfs.HdfsContext;
+import io.trino.hdfs.HdfsEnvironment;
 import io.trino.plugin.hive.HiveColumnHandle;
 import io.trino.plugin.hive.metastore.HiveMetastore;
 import io.trino.plugin.hive.metastore.Table;
@@ -29,7 +31,7 @@
 import io.trino.spi.connector.ConnectorSession;
 import io.trino.spi.connector.ConnectorSplit;
 import io.trino.spi.connector.ConnectorSplitSource;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.engine.HoodieLocalEngineContext;
@@ -62,15 +64,15 @@ public HudiSplitSource(
             HiveMetastore metastore,
             Table table,
             HudiTableHandle tableHandle,
-            Configuration configuration,
+            HdfsEnvironment hdfsEnvironment,
             Map<String, HiveColumnHandle> partitionColumnHandleMap,
             ExecutorService executor,
             int maxSplitsPerSecond,
             int maxOutstandingSplits)
     {
         boolean metadataEnabled = isHudiMetadataEnabled(session);
-        HoodieTableMetaClient metaClient = buildTableMetaClient(configuration, tableHandle.getBasePath());
-        HoodieEngineContext engineContext = new HoodieLocalEngineContext(configuration);
+        HoodieTableMetaClient metaClient = buildTableMetaClient(hdfsEnvironment, session, tableHandle.getBasePath());
+        HoodieEngineContext engineContext = new HoodieLocalEngineContext(hdfsEnvironment.getConfiguration(new HdfsContext(session), new Path(tableHandle.getBasePath())));
         HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
                 .enable(metadataEnabled)
                 .build();
diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiUtil.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiUtil.java
index 6d3962b65530..5f3ff7ab575d 100644
--- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiUtil.java
+++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiUtil.java
@@ -15,6 +15,8 @@
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import io.trino.hdfs.HdfsContext;
+import io.trino.hdfs.HdfsEnvironment;
 import io.trino.plugin.hive.HiveColumnHandle;
 import io.trino.plugin.hive.HivePartition;
 import io.trino.plugin.hive.HivePartitionKey;
@@ -22,12 +24,13 @@
 import io.trino.plugin.hive.metastore.Column;
 import io.trino.spi.TrinoException;
 import io.trino.spi.connector.ColumnHandle;
+import io.trino.spi.connector.ConnectorSession;
 import io.trino.spi.connector.SchemaTableName;
 import io.trino.spi.predicate.Domain;
 import io.trino.spi.predicate.NullableValue;
 import io.trino.spi.predicate.TupleDomain;
 import io.trino.spi.type.Type;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieFileFormat;
@@ -149,9 +152,9 @@ public static List<HivePartitionKey> buildPartitionKeys(List<Column> keys, List
         return partitionKeys.build();
     }
 
-    public static HoodieTableMetaClient buildTableMetaClient(Configuration configuration, String basePath)
+    public static HoodieTableMetaClient buildTableMetaClient(HdfsEnvironment hdfsEnvironment, ConnectorSession session, String basePath)
     {
-        HoodieTableMetaClient client = HoodieTableMetaClient.builder().setConf(configuration).setBasePath(basePath).build();
+        HoodieTableMetaClient client = HoodieTableMetaClient.builder().setConf(hdfsEnvironment.getConfiguration(new HdfsContext(session), new Path(basePath))).setBasePath(basePath).build();
         // Do not load the bootstrap index, will not read bootstrap base data or a mapping index defined
         client.getTableConfig().setValue("hoodie.bootstrap.index.enable", "false");
         return client;
diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/TimelineTable.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/TimelineTable.java
index 5d819c32bdbe..42a005a0a476 100644
--- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/TimelineTable.java
+++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/TimelineTable.java
@@ -14,6 +14,7 @@
 package io.trino.plugin.hudi;
 
 import com.google.common.collect.ImmutableList;
+import io.trino.hdfs.HdfsEnvironment;
 import io.trino.plugin.hive.metastore.Table;
 import io.trino.spi.connector.ColumnMetadata;
 import io.trino.spi.connector.ConnectorSession;
@@ -25,7 +26,6 @@
 import io.trino.spi.connector.SystemTable;
 import io.trino.spi.predicate.TupleDomain;
 import io.trino.spi.type.Type;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 
@@ -43,10 +43,10 @@ public class TimelineTable
 {
     private final ConnectorTableMetadata tableMetadata;
     private final List<Type> types;
-    private final Configuration configuration;
+    private final HdfsEnvironment hdfsEnvironment;
     private final String location;
 
-    public TimelineTable(Configuration configuration, SchemaTableName tableName, Table hudiTable)
+    public TimelineTable(HdfsEnvironment hdfsEnvironment, SchemaTableName tableName, Table hudiTable)
     {
         this.tableMetadata = new ConnectorTableMetadata(requireNonNull(tableName, "tableName is null"),
                 ImmutableList.<ColumnMetadata>builder()
@@ -55,7 +55,7 @@ public TimelineTable(Configuration configuration, SchemaTableName tableName, Tab
                         .add(new ColumnMetadata("state", VARCHAR))
                         .build());
         this.types = tableMetadata.getColumns().stream().map(ColumnMetadata::getType).collect(toImmutableList());
-        this.configuration = requireNonNull(configuration, "configuration is null");
+        this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
         this.location = requireNonNull(hudiTable.getStorage().getLocation(), "location is null");
     }
 
@@ -74,7 +74,7 @@ public ConnectorTableMetadata getTableMetadata()
     @Override
     public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain<Integer> constraint)
     {
-        HoodieTableMetaClient metaClient = buildTableMetaClient(configuration, location);
+        HoodieTableMetaClient metaClient = buildTableMetaClient(hdfsEnvironment, session, location);
         Iterable<List<Object>> records = () -> metaClient.getCommitsTimeline().getInstants().map(this::getRecord).iterator();
         return new InMemoryRecordSet(types, records).cursor();
     }
diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/partition/HiveHudiPartitionInfo.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/partition/HiveHudiPartitionInfo.java
index ef410530af03..afa482d00b62 100644
--- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/partition/HiveHudiPartitionInfo.java
+++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/partition/HiveHudiPartitionInfo.java
@@ -13,6 +13,7 @@
  */
 package io.trino.plugin.hudi.partition;
 
+import io.trino.filesystem.Location;
 import io.trino.plugin.hive.HiveColumnHandle;
 import io.trino.plugin.hive.HivePartitionKey;
 import io.trino.plugin.hive.metastore.Column;
@@ -21,8 +22,6 @@
 import io.trino.plugin.hive.metastore.Table;
 import io.trino.plugin.hive.util.HiveUtil;
 import io.trino.spi.predicate.TupleDomain;
-import org.apache.hadoop.fs.Path;
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.exception.HoodieIOException;
 
 import java.util.Collections;
@@ -108,12 +107,35 @@ public void loadPartitionInfo(Optional<Partition> partition)
         if (partition.isEmpty()) {
             throw new HoodieIOException(format("Cannot find partition in Hive Metastore: %s", hivePartitionName));
         }
-        this.relativePartitionPath = FSUtils.getRelativePartitionPath(
-                new Path(table.getStorage().getLocation()),
-                new Path(partition.get().getStorage().getLocation()));
+        this.relativePartitionPath = getRelativePartitionPath(
+                Location.parse(table.getStorage().getLocation()),
+                Location.parse(partition.get().getStorage().getLocation()));
         this.hivePartitionKeys = buildPartitionKeys(partitionColumns, partition.get().getValues());
     }
 
+    /*
+     * Given a base partition and a partition path, return relative path of partition path to the base path.
+     * This is equivalent to org.apache.hudi.common.fs.FSUtils#getRelativePartitionPath
+     */
+    private static String getRelativePartitionPath(Location baseLocation, Location fullPartitionLocation)
+    {
+        String basePath = baseLocation.path();
+        String fullPartitionPath = fullPartitionLocation.path();
+
+        if (!fullPartitionPath.startsWith(basePath)) {
+            throw new IllegalArgumentException("Partition path does not belong to base-path");
+        }
+
+        String baseLocationParent = baseLocation.parentDirectory().path();
+        String baseLocationName = baseLocation.fileName();
+        int partitionStartIndex = fullPartitionPath.indexOf(
+                baseLocationName,
+                baseLocationParent == null ? 0 : baseLocationParent.length());
+        // Partition-Path could be empty for non-partitioned tables
+        boolean isNonPartitionedTable = partitionStartIndex + baseLocationName.length() == fullPartitionPath.length();
+        return isNonPartitionedTable ? "" : fullPartitionPath.substring(partitionStartIndex + baseLocationName.length() + 1);
+    }
+
     @Override
     public String toString()
     {
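The new getRelativePartitionPath helper above re-implements FSUtils#getRelativePartitionPath on top of io.trino.filesystem.Location. The sketch below mirrors the same computation for the common case using plain strings, ignoring the parent-directory/indexOf handling the real helper uses; the class name and sample locations are illustrative only.

```java
public class RelativePartitionPathSketch
{
    // Simplified equivalent of the helper: strip the base path (plus the joining
    // slash) from the partition path; an empty result means a non-partitioned table.
    static String relativePartitionPath(String basePath, String fullPartitionPath)
    {
        if (!fullPartitionPath.startsWith(basePath)) {
            throw new IllegalArgumentException("Partition path does not belong to base-path");
        }
        if (fullPartitionPath.length() == basePath.length()) {
            return "";
        }
        return fullPartitionPath.substring(basePath.length() + 1);
    }

    public static void main(String[] args)
    {
        // Prints "ds=2023-01-01"
        System.out.println(relativePartitionPath("s3://bucket/warehouse/orders", "s3://bucket/warehouse/orders/ds=2023-01-01"));
        // Prints an empty line: the partition location equals the table location
        System.out.println(relativePartitionPath("s3://bucket/warehouse/orders", "s3://bucket/warehouse/orders"));
    }
}
```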
diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/split/HudiSplitFactory.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/split/HudiSplitFactory.java
index fece58e54d4f..94067323a344 100644
--- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/split/HudiSplitFactory.java
+++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/split/HudiSplitFactory.java
@@ -19,7 +19,6 @@
 import io.trino.plugin.hudi.HudiSplit;
 import io.trino.plugin.hudi.HudiTableHandle;
 import io.trino.spi.TrinoException;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hudi.hadoop.PathWithBootstrapFileStatus;
 
@@ -75,15 +74,14 @@ private List<FileSplit> createSplits(HudiFileStatus fileStatus)
             throw new IOException("Not a file: " + fileStatus.path());
         }
 
-        Path path = fileStatus.path();
         long length = fileStatus.length();
 
         if (length == 0) {
-            return ImmutableList.of(new FileSplit(path, 0, 0, new String[0]));
+            return ImmutableList.of(new FileSplit(fileStatus.path(), 0, 0, new String[0]));
         }
 
-        if (!isSplitable(path)) {
-            return ImmutableList.of(new FileSplit(path, 0, length, (String[]) null));
+        if (fileStatus.path() instanceof PathWithBootstrapFileStatus) {
+            return ImmutableList.of(new FileSplit(fileStatus.path(), 0, length, (String[]) null));
         }
 
         ImmutableList.Builder<FileSplit> splits = ImmutableList.builder();
@@ -91,17 +89,12 @@ private List<FileSplit> createSplits(HudiFileStatus fileStatus)
         long bytesRemaining = length;
         while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
-            splits.add(new FileSplit(path, length - bytesRemaining, splitSize, (String[]) null));
+            splits.add(new FileSplit(fileStatus.path(), length - bytesRemaining, splitSize, (String[]) null));
             bytesRemaining -= splitSize;
         }
 
         if (bytesRemaining != 0) {
-            splits.add(new FileSplit(path, length - bytesRemaining, bytesRemaining, (String[]) null));
+            splits.add(new FileSplit(fileStatus.path(), length - bytesRemaining, bytesRemaining, (String[]) null));
         }
         return splits.build();
     }
-
-    private static boolean isSplitable(Path filename)
-    {
-        return !(filename instanceof PathWithBootstrapFileStatus);
-    }
 }
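createSplits above cuts each data file into splitSize-byte pieces until the remainder is within the slop factor of a single split, then emits the remainder as the last piece. The sketch below reproduces that arithmetic in isolation; the SPLIT_SLOP value of 1.1, the class name, and the Piece record are assumptions for illustration, not taken from the patch.

```java
import java.util.ArrayList;
import java.util.List;

public class SplitSlopSketch
{
    // Assumed slop factor: a trailing chunk up to 10% larger than splitSize is not split further.
    private static final double SPLIT_SLOP = 1.1;

    record Piece(long offset, long length) {}

    static List<Piece> computePieces(long fileLength, long splitSize)
    {
        List<Piece> pieces = new ArrayList<>();
        long bytesRemaining = fileLength;
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            pieces.add(new Piece(fileLength - bytesRemaining, splitSize));
            bytesRemaining -= splitSize;
        }
        if (bytesRemaining != 0) {
            pieces.add(new Piece(fileLength - bytesRemaining, bytesRemaining));
        }
        return pieces;
    }

    public static void main(String[] args)
    {
        // A 300 MiB file with a 128 MiB split size yields pieces of 128, 128, and 44 MiB.
        computePieces(300L << 20, 128L << 20).forEach(System.out::println);
    }
}
```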
diff --git a/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/HudiQueryRunner.java b/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/HudiQueryRunner.java
index 4efc5bdfe7fd..ccf2846d06b9 100644
--- a/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/HudiQueryRunner.java
+++ b/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/HudiQueryRunner.java
@@ -30,7 +30,7 @@
 import java.util.Map;
 import java.util.Optional;
 
-import static io.trino.hadoop.ConfigurationInstantiator.newEmptyConfiguration;
+import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT;
 import static io.trino.plugin.hive.metastore.file.FileHiveMetastore.createTestingFileHiveMetastore;
 import static io.trino.testing.TestingSession.testSessionBuilder;
 
@@ -72,7 +72,7 @@ public static DistributedQueryRunner createHudiQueryRunner(
         queryRunner.createCatalog("hudi", "hudi", connectorProperties);
 
         String dataDir = coordinatorBaseDir.resolve("data").toString();
-        dataLoader.initializeTables(queryRunner, metastore, SCHEMA_NAME, dataDir, newEmptyConfiguration());
+        dataLoader.initializeTables(queryRunner, metastore, SCHEMA_NAME, dataDir, HDFS_ENVIRONMENT);
 
         return queryRunner;
     }
diff --git a/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/S3HudiQueryRunner.java b/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/S3HudiQueryRunner.java
index f9cb8d117318..62b576d84630 100644
--- a/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/S3HudiQueryRunner.java
+++ b/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/S3HudiQueryRunner.java
@@ -21,7 +21,6 @@
 import io.trino.hdfs.DynamicHdfsConfiguration;
 import io.trino.hdfs.HdfsConfig;
 import io.trino.hdfs.HdfsConfigurationInitializer;
-import io.trino.hdfs.HdfsContext;
 import io.trino.hdfs.HdfsEnvironment;
 import io.trino.hdfs.authentication.NoHdfsAuthentication;
 import io.trino.plugin.hive.SchemaAlreadyExistsException;
@@ -36,15 +35,12 @@
 import io.trino.spi.security.PrincipalType;
 import io.trino.testing.DistributedQueryRunner;
 import io.trino.tpch.TpchTable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 
 import java.util.Map;
 import java.util.Optional;
 
 import static io.trino.plugin.hive.HiveTestUtils.SOCKS_PROXY;
 import static io.trino.plugin.hive.TestingThriftHiveMetastoreBuilder.testingThriftHiveMetastoreBuilder;
-import static io.trino.testing.TestingConnectorSession.SESSION;
 import static io.trino.testing.TestingSession.testSessionBuilder;
 import static io.trino.testing.containers.Minio.MINIO_ACCESS_KEY;
 import static io.trino.testing.containers.Minio.MINIO_SECRET_KEY;
@@ -53,7 +49,6 @@
 public final class S3HudiQueryRunner
 {
     private static final String TPCH_SCHEMA = "tpch";
-    private static final HdfsContext CONTEXT = new HdfsContext(SESSION);
 
     private S3HudiQueryRunner() {}
 
@@ -66,7 +61,6 @@ public static DistributedQueryRunner create(
     {
         String basePath = "s3a://" + hiveMinioDataLake.getBucketName() + "/" + TPCH_SCHEMA;
         HdfsEnvironment hdfsEnvironment = getHdfsEnvironment(hiveMinioDataLake);
-        Configuration configuration = hdfsEnvironment.getConfiguration(CONTEXT, new Path(basePath));
 
         HiveMetastore metastore = new BridgingHiveMetastore(
                 testingThriftHiveMetastoreBuilder()
@@ -100,7 +94,7 @@ public static DistributedQueryRunner create(
                         .putAll(connectorProperties)
                         .buildOrThrow());
 
-        dataLoader.initializeTables(queryRunner, metastore, TPCH_SCHEMA, basePath, configuration);
+        dataLoader.initializeTables(queryRunner, metastore, TPCH_SCHEMA, basePath, hdfsEnvironment);
 
         return queryRunner;
     }
diff --git a/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/testing/HudiTablesInitializer.java b/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/testing/HudiTablesInitializer.java
index 2dfac921ecf7..770ed52f0406 100644
--- a/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/testing/HudiTablesInitializer.java
+++ b/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/testing/HudiTablesInitializer.java
@@ -13,9 +13,9 @@
  */
 package io.trino.plugin.hudi.testing;
 
+import io.trino.hdfs.HdfsEnvironment;
 import io.trino.plugin.hive.metastore.HiveMetastore;
 import io.trino.testing.QueryRunner;
-import org.apache.hadoop.conf.Configuration;
 
 public interface HudiTablesInitializer
 {
@@ -24,6 +24,6 @@ void initializeTables(
             HiveMetastore metastore,
             String schemaName,
             String dataDir,
-            Configuration conf)
+            HdfsEnvironment hdfsEnvironment)
             throws Exception;
 }
diff --git a/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/testing/ResourceHudiTablesInitializer.java b/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/testing/ResourceHudiTablesInitializer.java
index 8d1cda74e287..2e8de4515e97 100644
--- a/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/testing/ResourceHudiTablesInitializer.java
+++ b/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/testing/ResourceHudiTablesInitializer.java
@@ -16,6 +16,7 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import io.airlift.log.Logger;
+import io.trino.hdfs.HdfsEnvironment;
 import io.trino.plugin.hive.HiveStorageFormat;
 import io.trino.plugin.hive.HiveType;
 import io.trino.plugin.hive.PartitionStatistics;
@@ -27,7 +28,6 @@
 import io.trino.plugin.hive.metastore.StorageFormat;
 import io.trino.plugin.hive.metastore.Table;
 import io.trino.testing.QueryRunner;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hudi.common.model.HoodieTableType;
 
@@ -65,7 +65,7 @@ public void initializeTables(
             HiveMetastore metastore,
             String schemaName,
             String dataDir,
-            Configuration conf)
+            HdfsEnvironment environment)
             throws Exception
     {
         Path basePath = Path.of(dataDir);
diff --git a/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/testing/TpchHudiTablesInitializer.java b/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/testing/TpchHudiTablesInitializer.java
index 2a6c5f44b224..484042c0bfd5 100644
--- a/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/testing/TpchHudiTablesInitializer.java
+++ b/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/testing/TpchHudiTablesInitializer.java
@@ -16,6 +16,8 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import io.airlift.log.Logger;
+import io.trino.hdfs.HdfsContext;
+import io.trino.hdfs.HdfsEnvironment;
 import io.trino.plugin.hive.HiveStorageFormat;
 import io.trino.plugin.hive.HiveType;
 import io.trino.plugin.hive.metastore.Column;
@@ -35,6 +37,7 @@
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.client.HoodieJavaWriteClient;
 import org.apache.hudi.client.common.HoodieJavaEngineContext;
 import org.apache.hudi.common.bootstrap.index.NoOpBootstrapIndex;
@@ -72,6 +75,7 @@
 import static io.trino.plugin.hive.HiveType.HIVE_LONG;
 import static io.trino.plugin.hive.HiveType.HIVE_STRING;
 import static io.trino.plugin.hive.metastore.PrincipalPrivileges.NO_PRIVILEGES;
+import static io.trino.testing.TestingConnectorSession.SESSION;
 import static java.lang.String.format;
 import static java.util.Collections.unmodifiableList;
 import static java.util.Objects.requireNonNull;
@@ -91,6 +95,7 @@ public class TpchHudiTablesInitializer
             new Column("_hoodie_record_key", HIVE_STRING, Optional.empty()),
             new Column("_hoodie_partition_path", HIVE_STRING, Optional.empty()),
             new Column("_hoodie_file_name", HIVE_STRING, Optional.empty()));
+    private static final HdfsContext CONTEXT = new HdfsContext(SESSION);
 
     private final HoodieTableType tableType;
     private final List<TpchTable<?>> tpchTables;
@@ -107,12 +112,12 @@ public void initializeTables(
             HiveMetastore metastore,
             String schemaName,
             String dataDir,
-            Configuration conf)
+            HdfsEnvironment hdfsEnvironment)
     {
         queryRunner.installPlugin(new TpchPlugin());
         queryRunner.createCatalog(TPCH_TINY.getCatalogName(), "tpch", ImmutableMap.of());
 
         for (TpchTable<?> table : tpchTables) {
-            load(table, queryRunner, metastore, schemaName, dataDir, conf);
+            load(table, queryRunner, metastore, schemaName, dataDir, hdfsEnvironment);
         }
     }
 
@@ -122,9 +127,9 @@ private void load(
             HiveMetastore metastore,
             String schemaName,
             String basePath,
-            Configuration conf)
+            HdfsEnvironment hdfsEnvironment)
     {
-        try (HoodieJavaWriteClient<HoodieAvroPayload> writeClient = createWriteClient(tpchTables, basePath, conf)) {
+        try (HoodieJavaWriteClient<HoodieAvroPayload> writeClient = createWriteClient(tpchTables, basePath, hdfsEnvironment)) {
             RecordConverter recordConverter = createRecordConverter(tpchTables);
             @Language("SQL") String sql = generateScanSql(TPCH_TINY, tpchTables);
 
@@ -180,11 +185,12 @@ private Table createMetastoreTable(String schemaName, TpchTable<?> table, String
                 .build();
     }
 
-    private HoodieJavaWriteClient<HoodieAvroPayload> createWriteClient(TpchTable<?> table, String basePath, Configuration conf)
+    private HoodieJavaWriteClient<HoodieAvroPayload> createWriteClient(TpchTable<?> table, String basePath, HdfsEnvironment hdfsEnvironment)
     {
         String tableName = table.getTableName();
         String tablePath = getTablePath(table, basePath);
         Schema schema = createAvroSchema(table);
+        Configuration conf = hdfsEnvironment.getConfiguration(CONTEXT, new Path(tablePath));
 
         try {
             HoodieTableMetaClient.withPropertyBuilder()