From 2807e74121016922d3788dc8effb84aa9cceec1e Mon Sep 17 00:00:00 2001 From: yx-keith Date: Wed, 26 Apr 2023 21:04:33 +0800 Subject: [PATCH 1/4] support kerberos in hudi connector --- .../java/io/trino/plugin/hudi/HudiErrorCode.java | 3 ++- .../java/io/trino/plugin/hudi/HudiMetadata.java | 15 +++++++++++---- .../io/trino/plugin/hudi/HudiSplitManager.java | 3 ++- .../io/trino/plugin/hudi/HudiSplitSource.java | 7 +++++-- 4 files changed, 20 insertions(+), 8 deletions(-) diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiErrorCode.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiErrorCode.java index 4d5686823665..f6d62c469c9f 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiErrorCode.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiErrorCode.java @@ -28,7 +28,8 @@ public enum HudiErrorCode HUDI_MISSING_DATA(3, EXTERNAL), HUDI_CANNOT_OPEN_SPLIT(4, EXTERNAL), HUDI_UNSUPPORTED_FILE_FORMAT(5, EXTERNAL), - HUDI_CURSOR_ERROR(6, EXTERNAL); + HUDI_CURSOR_ERROR(6, EXTERNAL), + HUDI_FILESYSTEM_ERROR(7, EXTERNAL); private final ErrorCode errorCode; diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiMetadata.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiMetadata.java index bd2d83fa762b..a58099150951 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiMetadata.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiMetadata.java @@ -37,10 +37,11 @@ import io.trino.spi.connector.TableNotFoundException; import io.trino.spi.predicate.TupleDomain; import io.trino.spi.type.TypeManager; -import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hudi.common.model.HoodieTableType; +import java.io.IOException; import java.util.Collection; import java.util.Collections; import java.util.Iterator; @@ -56,6 +57,7 @@ import static 
io.trino.plugin.hive.util.HiveUtil.columnMetadataGetter; import static io.trino.plugin.hive.util.HiveUtil.hiveColumnHandles; import static io.trino.plugin.hive.util.HiveUtil.isHiveSystemSchema; +import static io.trino.plugin.hudi.HudiErrorCode.HUDI_FILESYSTEM_ERROR; import static io.trino.plugin.hudi.HudiErrorCode.HUDI_UNKNOWN_TABLE_TYPE; import static io.trino.plugin.hudi.HudiSessionProperties.getColumnsToHide; import static io.trino.plugin.hudi.HudiTableProperties.LOCATION_PROPERTY; @@ -65,7 +67,6 @@ import static java.util.Collections.singletonList; import static java.util.Objects.requireNonNull; import static java.util.function.Function.identity; -import static org.apache.hudi.common.fs.FSUtils.getFs; import static org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME; import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty; import static org.apache.hudi.exception.TableNotFoundException.checkTableValidity; @@ -197,9 +198,15 @@ HiveMetastore getMetastore() private boolean isHudiTable(ConnectorSession session, Table table) { String basePath = table.getStorage().getLocation(); - Configuration configuration = hdfsEnvironment.getConfiguration(new HdfsContext(session), new Path(basePath)); + FileSystem fileSystem; try { - checkTableValidity(getFs(basePath, configuration), new Path(basePath), new Path(basePath, METAFOLDER_NAME)); + fileSystem = hdfsEnvironment.getFileSystem(new HdfsContext(session), new Path(basePath)); + } catch (IOException e) { + throw new TrinoException(HUDI_FILESYSTEM_ERROR, "Failed getting FileSystem: " + basePath, e); + } + + try { + checkTableValidity(fileSystem, new Path(basePath), new Path(basePath, METAFOLDER_NAME)); } catch (org.apache.hudi.exception.TableNotFoundException e) { log.warn("Could not find Hudi table at path '%s'", basePath); diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitManager.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitManager.java index 
34afd7e042f3..e2544e0b6472 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitManager.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitManager.java @@ -101,7 +101,8 @@ public ConnectorSplitSource getSplits( partitionColumnHandles, executor, maxSplitsPerSecond, - maxOutstandingSplits); + maxOutstandingSplits, + hdfsEnvironment); return new ClassLoaderSafeConnectorSplitSource(splitSource, HudiSplitManager.class.getClassLoader()); } } diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitSource.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitSource.java index 2726bbb741d0..dad0a2a7bb0f 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitSource.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitSource.java @@ -15,6 +15,7 @@ import com.google.common.util.concurrent.Futures; import io.airlift.units.DataSize; +import io.trino.hdfs.HdfsEnvironment; import io.trino.plugin.hive.HiveColumnHandle; import io.trino.plugin.hive.metastore.HiveMetastore; import io.trino.plugin.hive.metastore.Table; @@ -65,10 +66,12 @@ public HudiSplitSource( Map partitionColumnHandleMap, ExecutorService executor, int maxSplitsPerSecond, - int maxOutstandingSplits) + int maxOutstandingSplits, + HdfsEnvironment hdfsEnvironment) { boolean metadataEnabled = isHudiMetadataEnabled(session); - HoodieTableMetaClient metaClient = buildTableMetaClient(configuration, tableHandle.getBasePath()); + HoodieTableMetaClient metaClient = + hdfsEnvironment.doAs(session.getIdentity(), () -> buildTableMetaClient(configuration, tableHandle.getBasePath())); HoodieEngineContext engineContext = new HoodieLocalEngineContext(configuration); HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder() .enable(metadataEnabled) From 921ceb86ced1b2c7d0d63ad331ab070ed7e1c6a0 Mon Sep 17 00:00:00 2001 From: yx-keith Date: Fri, 28 Jul 2023 19:02:51 +0800 Subject: [PATCH 2/4] async and 
concurrently to process hudi table partitions in batches and create splits --- .../thrift/BridgingHiveMetastore.java | 7 +- .../plugin/hudi/ForHudiPartitionLoader.java | 34 ++++++ .../trino/plugin/hudi/ForHudiSplitLoader.java | 34 ++++++ .../java/io/trino/plugin/hudi/HudiConfig.java | 35 +++++- .../java/io/trino/plugin/hudi/HudiModule.java | 20 ++++ .../plugin/hudi/HudiSessionProperties.java | 22 ++++ .../trino/plugin/hudi/HudiSplitManager.java | 9 ++ .../io/trino/plugin/hudi/HudiSplitSource.java | 86 +++++++++++++- .../hudi/partition/HiveHudiPartitionInfo.java | 9 ++ .../hudi/partition/HudiPartitionInfo.java | 2 + .../partition/HudiPartitionInfoLoader.java | 106 +++++------------ .../hudi/query/HudiDirectoryLister.java | 2 +- .../HudiReadOptimizedDirectoryLister.java | 42 ++----- .../hudi/split/HudiBackgroundSplitLoader.java | 109 ++++++++++++------ 14 files changed, 358 insertions(+), 159 deletions(-) create mode 100644 plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/ForHudiPartitionLoader.java create mode 100644 plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/ForHudiSplitLoader.java diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/BridgingHiveMetastore.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/BridgingHiveMetastore.java index 4a9402b894b4..a8d806496f37 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/BridgingHiveMetastore.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/BridgingHiveMetastore.java @@ -348,12 +348,13 @@ public Map> getPartitionsByNames(Table table, List, Partition> partitionValuesToPartitionMap = delegate.getPartitionsByNames(table.getDatabaseName(), table.getTableName(), partitionNames).stream() .map(partition -> fromMetastoreApiPartition(table, partition)) .collect(Collectors.toMap(Partition::getValues, identity())); - ImmutableMap.Builder> resultBuilder = ImmutableMap.builder(); + + 
ImmutableMap.Builder> partitionNameToPartitionMap = ImmutableMap.builder(); for (Map.Entry> entry : partitionNameToPartitionValuesMap.entrySet()) { Partition partition = partitionValuesToPartitionMap.get(entry.getValue()); - resultBuilder.put(entry.getKey(), Optional.ofNullable(partition)); + partitionNameToPartitionMap.put(entry.getKey(), Optional.ofNullable(partition)); } - return resultBuilder.buildOrThrow(); + return partitionNameToPartitionMap.buildOrThrow(); } private static Partition fromMetastoreApiPartition(Table table, org.apache.hadoop.hive.metastore.api.Partition partition) diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/ForHudiPartitionLoader.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/ForHudiPartitionLoader.java new file mode 100644 index 000000000000..74dbca779aa5 --- /dev/null +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/ForHudiPartitionLoader.java @@ -0,0 +1,34 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.hudi; + +import javax.inject.Qualifier; +import java.lang.annotation.Retention; +import java.lang.annotation.Target; + +import static java.lang.annotation.ElementType.FIELD; +import static java.lang.annotation.ElementType.PARAMETER; +import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.RetentionPolicy.RUNTIME; + +/** + * @author yaoxiao + * @version 1.0 + * @date 2023/7/17 14:26 + */ + +@Retention(RUNTIME) +@Target({FIELD, PARAMETER, METHOD}) +@Qualifier +public @interface ForHudiPartitionLoader {} diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/ForHudiSplitLoader.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/ForHudiSplitLoader.java new file mode 100644 index 000000000000..4ef5fda5a15b --- /dev/null +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/ForHudiSplitLoader.java @@ -0,0 +1,34 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.hudi; + +import javax.inject.Qualifier; +import java.lang.annotation.Retention; +import java.lang.annotation.Target; + +import static java.lang.annotation.ElementType.FIELD; +import static java.lang.annotation.ElementType.PARAMETER; +import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.RetentionPolicy.RUNTIME; + +/** + * @author yaoxiao + * @version 1.0 + * @date 2023/7/17 14:26 + */ + +@Retention(RUNTIME) +@Target({FIELD, PARAMETER, METHOD}) +@Qualifier +public @interface ForHudiSplitLoader {} diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConfig.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConfig.java index ea323818bdf6..a9d2754df1ee 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConfig.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConfig.java @@ -39,13 +39,16 @@ public class HudiConfig private List columnsToHide = ImmutableList.of(); private boolean metadataEnabled; private boolean shouldUseParquetColumnNames = true; - private int minPartitionBatchSize = 10; - private int maxPartitionBatchSize = 100; + private int minPartitionBatchSize = 100; + private int maxPartitionBatchSize = 500; private boolean sizeBasedSplitWeightsEnabled = true; private DataSize standardSplitWeightSize = DataSize.of(128, MEGABYTE); private double minimumAssignedSplitWeight = 0.05; private int maxSplitsPerSecond = Integer.MAX_VALUE; private int maxOutstandingSplits = 1000; + private int partitionLoaderParallelism = 5; + private int splitLoaderParallelism = 15; + public List getColumnsToHide() { @@ -191,4 +194,32 @@ public HudiConfig setMaxOutstandingSplits(int maxOutstandingSplits) this.maxOutstandingSplits = maxOutstandingSplits; return this; } + + @Min(1) + public int getPartitionLoaderParallelism() + { + return partitionLoaderParallelism; + } + + @Config("hudi.partition-loader-parallelism") + @ConfigDescription("hudi partition loader 
parallelism.") + public HudiConfig setPartitionLoaderParallelism(int partitionLoaderParallism) + { + this.partitionLoaderParallelism = partitionLoaderParallism; + return this; + } + + @Min(1) + public int getSplitLoaderParallelism() + { + return splitLoaderParallelism; + } + + @Config("hudi.split-loader-parallelism") + @ConfigDescription("hudi split loader parallelism.") + public HudiConfig setSplitLoaderParallelism(int splitLoaderParallelism) + { + this.splitLoaderParallelism = splitLoaderParallelism; + return this; + } } diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiModule.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiModule.java index 52bd1f7028c2..d5a2130d0549 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiModule.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiModule.java @@ -35,12 +35,15 @@ import javax.inject.Singleton; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; import java.util.function.BiFunction; import static com.google.inject.multibindings.Multibinder.newSetBinder; import static io.airlift.concurrent.Threads.daemonThreadsNamed; import static io.airlift.configuration.ConfigBinder.configBinder; import static java.util.concurrent.Executors.newCachedThreadPool; +import static java.util.concurrent.Executors.newScheduledThreadPool; +import static java.util.concurrent.Executors.newFixedThreadPool; import static org.weakref.jmx.guice.ExportBinder.newExporter; public class HudiModule @@ -79,6 +82,23 @@ public ExecutorService createExecutorService() return newCachedThreadPool(daemonThreadsNamed("hudi-split-manager-%d")); } + @ForHudiPartitionLoader + @Singleton + @Provides + public ScheduledExecutorService createPartitionLoaderExecutorService(HudiConfig hudiConfig) + { + return newScheduledThreadPool(hudiConfig.getPartitionLoaderParallelism(), daemonThreadsNamed("hudi-partition-loader-%d")); + + } + + @ForHudiSplitLoader + 
@Singleton + @Provides + public ExecutorService createSplitLoaderExecutorService(HudiConfig hudiConfig) + { + return newFixedThreadPool(hudiConfig.getSplitLoaderParallelism(), daemonThreadsNamed("hudi-split-loader-%d")); + } + @Singleton @Provides public BiFunction createHiveMetastoreGetter(HudiTransactionManager transactionManager) diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSessionProperties.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSessionProperties.java index 92a1d90a6d73..045c3057e150 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSessionProperties.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSessionProperties.java @@ -49,6 +49,8 @@ public class HudiSessionProperties private static final String SIZE_BASED_SPLIT_WEIGHTS_ENABLED = "size_based_split_weights_enabled"; private static final String STANDARD_SPLIT_WEIGHT_SIZE = "standard_split_weight_size"; private static final String MINIMUM_ASSIGNED_SPLIT_WEIGHT = "minimum_assigned_split_weight"; + private static final String PARTITION_INFO_LOADER_PARALLELISM = "partition_info_loader_parallelism"; + private static final String SPLIT_LOADER_PARALLELISM = "split_loader_parallelism"; private final List> sessionProperties; @@ -111,6 +113,16 @@ public HudiSessionProperties(HudiConfig hudiConfig, ParquetReaderConfig parquetR throw new TrinoException(INVALID_SESSION_PROPERTY, format("%s must be > 0 and <= 1.0: %s", MINIMUM_ASSIGNED_SPLIT_WEIGHT, value)); } }, + false), + integerProperty( + PARTITION_INFO_LOADER_PARALLELISM, + "hudi partition info generator parallelism.", + hudiConfig.getPartitionLoaderParallelism(), + false), + integerProperty( + SPLIT_LOADER_PARALLELISM, + "hudi split generator parallelism.", + hudiConfig.getSplitLoaderParallelism(), false)); } @@ -165,4 +177,14 @@ public static double getMinimumAssignedSplitWeight(ConnectorSession session) { return session.getProperty(MINIMUM_ASSIGNED_SPLIT_WEIGHT, 
Double.class); } + + public static int getPartitionInfoLoaderParallelism(ConnectorSession session) + { + return session.getProperty(PARTITION_INFO_LOADER_PARALLELISM, Integer.class); + } + + public static int getSplitLoaderParallelism(ConnectorSession session) + { + return session.getProperty(SPLIT_LOADER_PARALLELISM, Integer.class); + } } diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitManager.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitManager.java index e2544e0b6472..f7c993732be3 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitManager.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitManager.java @@ -36,6 +36,7 @@ import java.util.Map; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; import java.util.function.BiFunction; import static com.google.common.collect.ImmutableMap.toImmutableMap; @@ -50,6 +51,8 @@ public class HudiSplitManager private final BiFunction metastoreProvider; private final HdfsEnvironment hdfsEnvironment; private final ExecutorService executor; + private final ScheduledExecutorService partitionLoaderExecutor; + private final ExecutorService splitLoaderExecutor; private final int maxSplitsPerSecond; private final int maxOutstandingSplits; @@ -59,12 +62,16 @@ public HudiSplitManager( BiFunction metastoreProvider, HdfsEnvironment hdfsEnvironment, @ForHudiSplitManager ExecutorService executor, + @ForHudiPartitionLoader ScheduledExecutorService partitionLoaderExecutor, + @ForHudiSplitLoader ExecutorService splitLoaderExecutor, HudiConfig hudiConfig) { this.transactionManager = requireNonNull(transactionManager, "transactionManager is null"); this.metastoreProvider = requireNonNull(metastoreProvider, "metastoreProvider is null"); this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); this.executor = requireNonNull(executor, "executor is null"); + 
this.partitionLoaderExecutor = requireNonNull(partitionLoaderExecutor, "partitionLoaderExecutor is null"); + this.splitLoaderExecutor = requireNonNull(splitLoaderExecutor, "splitLoaderExecutor is null"); this.maxSplitsPerSecond = requireNonNull(hudiConfig, "hudiConfig is null").getMaxSplitsPerSecond(); this.maxOutstandingSplits = hudiConfig.getMaxOutstandingSplits(); } @@ -100,6 +107,8 @@ public ConnectorSplitSource getSplits( hdfsEnvironment.getConfiguration(new HdfsContext(session), new Path(table.getStorage().getLocation())), partitionColumnHandles, executor, + partitionLoaderExecutor, + splitLoaderExecutor, maxSplitsPerSecond, maxOutstandingSplits, hdfsEnvironment); diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitSource.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitSource.java index dad0a2a7bb0f..197e76d287cb 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitSource.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitSource.java @@ -17,10 +17,13 @@ import io.airlift.units.DataSize; import io.trino.hdfs.HdfsEnvironment; import io.trino.plugin.hive.HiveColumnHandle; +import io.trino.plugin.hive.metastore.Column; import io.trino.plugin.hive.metastore.HiveMetastore; +import io.trino.plugin.hive.metastore.MetastoreUtil; import io.trino.plugin.hive.metastore.Table; import io.trino.plugin.hive.util.AsyncQueue; import io.trino.plugin.hive.util.ThrottledAsyncQueue; +import io.trino.plugin.hudi.partition.HudiPartitionInfo; import io.trino.plugin.hudi.query.HudiDirectoryLister; import io.trino.plugin.hudi.query.HudiReadOptimizedDirectoryLister; import io.trino.plugin.hudi.split.HudiBackgroundSplitLoader; @@ -30,6 +33,9 @@ import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.ConnectorSplit; import io.trino.spi.connector.ConnectorSplitSource; +import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.connector.TableNotFoundException; +import 
io.trino.spi.predicate.TupleDomain; import org.apache.hadoop.conf.Configuration; import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.engine.HoodieEngineContext; @@ -38,16 +44,25 @@ import java.util.List; import java.util.Map; +import java.util.Deque; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static io.airlift.concurrent.MoreFutures.toCompletableFuture; +import static io.trino.plugin.hudi.HudiSessionProperties.getMaxPartitionBatchSize; +import static io.trino.plugin.hudi.HudiSessionProperties.getMinPartitionBatchSize; import static io.trino.plugin.hudi.HudiSessionProperties.getMinimumAssignedSplitWeight; +import static io.trino.plugin.hudi.HudiSessionProperties.isSizeBasedSplitWeightsEnabled; import static io.trino.plugin.hudi.HudiSessionProperties.getStandardSplitWeightSize; import static io.trino.plugin.hudi.HudiSessionProperties.isHudiMetadataEnabled; -import static io.trino.plugin.hudi.HudiSessionProperties.isSizeBasedSplitWeightsEnabled; import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; import static java.util.stream.Collectors.toList; @@ -56,6 +71,10 @@ public class HudiSplitSource { private final AsyncQueue queue; private final AtomicReference trinoException = new AtomicReference<>(); + private List hivePartitionNames; + private final int minPartitionBatchSize; + private final int maxPartitionBatchSize; + private int currentBatchSize = -1; public HudiSplitSource( ConnectorSession session, @@ -65,6 +84,8 @@ public HudiSplitSource( Configuration configuration, Map partitionColumnHandleMap, 
ExecutorService executor, + ScheduledExecutorService partitionLoaderExecutor, + ExecutorService splitLoaderExecutor, int maxSplitsPerSecond, int maxOutstandingSplits, HdfsEnvironment hdfsEnvironment) @@ -76,9 +97,11 @@ public HudiSplitSource( HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder() .enable(metadataEnabled) .build(); - List partitionColumnHandles = table.getPartitionColumns().stream() + List partitionColumns = table.getPartitionColumns(); + List partitionColumnHandles = partitionColumns.stream() .map(column -> partitionColumnHandleMap.get(column.getName())).collect(toList()); - + this.minPartitionBatchSize = getMinPartitionBatchSize(session); + this.maxPartitionBatchSize = getMaxPartitionBatchSize(session); HudiDirectoryLister hudiDirectoryLister = new HudiReadOptimizedDirectoryLister( metadataConfig, engineContext, @@ -86,16 +109,27 @@ public HudiSplitSource( metaClient, metastore, table, - partitionColumnHandles); + partitionColumnHandles, + partitionColumns); this.queue = new ThrottledAsyncQueue<>(maxSplitsPerSecond, maxOutstandingSplits, executor); + Deque> partitionNamesQueue = getPartitionNamesQueue(metastore, tableHandle, partitionColumns, partitionColumnHandles); + Deque partitionInfoQueue = new ConcurrentLinkedDeque<>(); HudiBackgroundSplitLoader splitLoader = new HudiBackgroundSplitLoader( session, tableHandle, hudiDirectoryLister, queue, - executor, + splitLoaderExecutor, createSplitWeightProvider(session), + partitionNamesQueue, + partitionInfoQueue, + partitionLoaderExecutor, + throwable -> { + trinoException.compareAndSet(null, new TrinoException(GENERIC_INTERNAL_ERROR, + "Failed to generator partitions info for " + table.getTableName(), throwable)); + queue.finish(); + }, throwable -> { trinoException.compareAndSet(null, new TrinoException(GENERIC_INTERNAL_ERROR, "Failed to generate splits for " + table.getTableName(), throwable)); @@ -104,6 +138,48 @@ public HudiSplitSource( splitLoader.start(); } + private Deque> 
getPartitionNamesQueue(HiveMetastore hiveMetastore, HudiTableHandle tableHandle, List partitionColumns, List partitionColumnHandles) + { + TupleDomain partitionKeysFilter = MetastoreUtil.computePartitionKeyFilter(partitionColumnHandles, tableHandle.getPartitionPredicates()); + if (hivePartitionNames == null) { + hivePartitionNames = partitionColumns.isEmpty() + ? Collections.singletonList("") + : getPartitionNamesFromHiveMetastore(hiveMetastore, partitionColumns, tableHandle, partitionKeysFilter); + } + Deque> partitionNamesQueue = new ConcurrentLinkedDeque<>(); + Iterator iterator = hivePartitionNames.iterator(); + while(iterator.hasNext()) { + List paritionNamesBatch = new ArrayList<>(); + int batchSize = updateBatchSize(); + while (iterator.hasNext() && batchSize > 0) { + paritionNamesBatch.add(iterator.next()); + batchSize--; + } + partitionNamesQueue.offer(paritionNamesBatch); + } + return partitionNamesQueue; + } + + private List getPartitionNamesFromHiveMetastore(HiveMetastore hiveMetastore, List partitionColumns, HudiTableHandle tableHandle, TupleDomain partitionKeysFilter) + { + SchemaTableName tableName = tableHandle.getSchemaTableName(); + return hiveMetastore.getPartitionNamesByFilter( + tableName.getSchemaName(), + tableName.getTableName(), + partitionColumns.stream().map(Column::getName).collect(Collectors.toList()), + partitionKeysFilter).orElseThrow(() -> new TableNotFoundException(tableHandle.getSchemaTableName())); + } + + private int updateBatchSize() + { + if (currentBatchSize <= 0) { + currentBatchSize = minPartitionBatchSize; + } else { + currentBatchSize = Math.min(currentBatchSize * 2, maxPartitionBatchSize); + } + return currentBatchSize; + } + @Override public CompletableFuture getNextBatch(int maxSize) { diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/partition/HiveHudiPartitionInfo.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/partition/HiveHudiPartitionInfo.java index cf3596092da6..7d88fb8e312a 100644 --- 
a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/partition/HiveHudiPartitionInfo.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/partition/HiveHudiPartitionInfo.java @@ -96,6 +96,15 @@ public List getHivePartitionKeys() return hivePartitionKeys; } + @Override + public List getHivePartitionKeysFromPartition(Optional partition) + { + if(hivePartitionKeys == null) { + loadPartitionInfo(partition); + } + return hivePartitionKeys; + } + @Override public boolean doesMatchPredicates() { diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/partition/HudiPartitionInfo.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/partition/HudiPartitionInfo.java index df71e9642735..8d17bcfcb3ec 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/partition/HudiPartitionInfo.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/partition/HudiPartitionInfo.java @@ -30,6 +30,8 @@ public interface HudiPartitionInfo List getHivePartitionKeys(); + List getHivePartitionKeysFromPartition(Optional partition); + boolean doesMatchPredicates(); String getComparingKey(); diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/partition/HudiPartitionInfoLoader.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/partition/HudiPartitionInfoLoader.java index bf501a777d3d..9b5bcbe656c6 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/partition/HudiPartitionInfoLoader.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/partition/HudiPartitionInfoLoader.java @@ -13,110 +13,56 @@ */ package io.trino.plugin.hudi.partition; -import io.trino.plugin.hive.metastore.Partition; import io.trino.plugin.hudi.query.HudiDirectoryLister; -import io.trino.spi.connector.ConnectorSession; -import org.apache.hudi.exception.HoodieIOException; -import java.util.ArrayList; import java.util.Comparator; import java.util.Deque; -import java.util.Iterator; import java.util.List; -import java.util.Map; -import 
java.util.Optional; -import java.util.concurrent.ConcurrentLinkedDeque; import java.util.stream.Collectors; -import static io.trino.plugin.hudi.HudiSessionProperties.getMaxPartitionBatchSize; -import static io.trino.plugin.hudi.HudiSessionProperties.getMinPartitionBatchSize; - public class HudiPartitionInfoLoader implements Runnable { private final HudiDirectoryLister hudiDirectoryLister; - private final int minPartitionBatchSize; - private final int maxPartitionBatchSize; - private final Deque partitionQueue; - private int currentBatchSize; + private final Deque partitionInfoQueue; + private final Deque> partitionNamesQueue; + private final Deque partitionLoadStatusQueue; public HudiPartitionInfoLoader( - ConnectorSession session, - HudiDirectoryLister hudiDirectoryLister) + Deque> partitionNamesQueues, + HudiDirectoryLister hudiDirectoryLister, + Deque partitionInfoQueue, + Deque partitionLoadStatusQueue) { + this.partitionNamesQueue = partitionNamesQueues; this.hudiDirectoryLister = hudiDirectoryLister; - this.partitionQueue = new ConcurrentLinkedDeque<>(); - this.minPartitionBatchSize = getMinPartitionBatchSize(session); - this.maxPartitionBatchSize = getMaxPartitionBatchSize(session); - this.currentBatchSize = -1; + this.partitionInfoQueue = partitionInfoQueue; + this.partitionLoadStatusQueue = partitionLoadStatusQueue; } @Override public void run() { - List hudiPartitionInfoList = hudiDirectoryLister.getPartitionsToScan().stream() - .sorted(Comparator.comparing(HudiPartitionInfo::getComparingKey)) - .collect(Collectors.toList()); - - // empty partitioned table - if (hudiPartitionInfoList.isEmpty()) { - return; - } - - // non-partitioned table - if (hudiPartitionInfoList.size() == 1 && hudiPartitionInfoList.get(0).getHivePartitionName().isEmpty()) { - partitionQueue.addAll(hudiPartitionInfoList); - return; - } - - boolean shouldUseHiveMetastore = hudiPartitionInfoList.get(0) instanceof HiveHudiPartitionInfo; - Iterator iterator = 
hudiPartitionInfoList.iterator(); - while (iterator.hasNext()) { - int batchSize = updateBatchSize(); - List partitionInfoBatch = new ArrayList<>(); - while (iterator.hasNext() && batchSize > 0) { - partitionInfoBatch.add(iterator.next()); - batchSize--; - } + while (!partitionNamesQueue.isEmpty()) { + List partitionNames = partitionNamesQueue.poll(); + if (partitionNames != null) { + List hudiPartitionInfoList = hudiDirectoryLister.getPartitionsToScan(partitionNames).stream() + .sorted(Comparator.comparing(HudiPartitionInfo::getComparingKey)) + .collect(Collectors.toList()); - if (!partitionInfoBatch.isEmpty()) { - if (shouldUseHiveMetastore) { - Map> partitions = hudiDirectoryLister.getPartitions(partitionInfoBatch.stream() - .map(HudiPartitionInfo::getHivePartitionName) - .collect(Collectors.toList())); - for (HudiPartitionInfo partitionInfo : partitionInfoBatch) { - String hivePartitionName = partitionInfo.getHivePartitionName(); - if (!partitions.containsKey(hivePartitionName)) { - throw new HoodieIOException("Partition does not exist: " + hivePartitionName); - } - partitionInfo.loadPartitionInfo(partitions.get(hivePartitionName)); - partitionQueue.add(partitionInfo); - } + // empty partitioned table + if (hudiPartitionInfoList.isEmpty()) { + return; } - else { - for (HudiPartitionInfo partitionInfo : partitionInfoBatch) { - partitionInfo.getHivePartitionKeys(); - partitionQueue.add(partitionInfo); - } + + // non-partitioned table + if (hudiPartitionInfoList.size() == 1 && hudiPartitionInfoList.get(0).getHivePartitionName().isEmpty()) { + partitionInfoQueue.addAll(hudiPartitionInfoList); + return; } + partitionInfoQueue.addAll(hudiPartitionInfoList); } } - } - - public Deque getPartitionQueue() - { - return partitionQueue; - } - - private int updateBatchSize() - { - if (currentBatchSize <= 0) { - currentBatchSize = minPartitionBatchSize; - } - else { - currentBatchSize *= 2; - currentBatchSize = Math.min(currentBatchSize, maxPartitionBatchSize); - } - return 
currentBatchSize; + partitionLoadStatusQueue.poll(); } } diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/query/HudiDirectoryLister.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/query/HudiDirectoryLister.java index 401e0f35e844..f88367f73c85 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/query/HudiDirectoryLister.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/query/HudiDirectoryLister.java @@ -25,7 +25,7 @@ public interface HudiDirectoryLister extends Closeable { - List getPartitionsToScan(); + List getPartitionsToScan(List partitionNames); List listStatus(HudiPartitionInfo partitionInfo); diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/query/HudiReadOptimizedDirectoryLister.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/query/HudiReadOptimizedDirectoryLister.java index 92aad499ce63..af602740e9b5 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/query/HudiReadOptimizedDirectoryLister.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/query/HudiReadOptimizedDirectoryLister.java @@ -16,15 +16,11 @@ import io.trino.plugin.hive.HiveColumnHandle; import io.trino.plugin.hive.metastore.Column; import io.trino.plugin.hive.metastore.HiveMetastore; -import io.trino.plugin.hive.metastore.MetastoreUtil; import io.trino.plugin.hive.metastore.Partition; import io.trino.plugin.hive.metastore.Table; import io.trino.plugin.hudi.HudiTableHandle; import io.trino.plugin.hudi.partition.HiveHudiPartitionInfo; import io.trino.plugin.hudi.partition.HudiPartitionInfo; -import io.trino.spi.connector.SchemaTableName; -import io.trino.spi.connector.TableNotFoundException; -import io.trino.spi.predicate.TupleDomain; import org.apache.hadoop.fs.FileStatus; import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.engine.HoodieEngineContext; @@ -32,7 +28,6 @@ import org.apache.hudi.common.table.view.FileSystemViewManager; import 
org.apache.hudi.common.table.view.HoodieTableFileSystemView; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; @@ -47,14 +42,10 @@ public class HudiReadOptimizedDirectoryLister private final HudiTableHandle tableHandle; private final HiveMetastore hiveMetastore; private final Table hiveTable; - private final SchemaTableName tableName; private final List partitionColumnHandles; private final HoodieTableFileSystemView fileSystemView; - private final TupleDomain partitionKeysFilter; private final List partitionColumns; - private List hivePartitionNames; - public HudiReadOptimizedDirectoryLister( HoodieMetadataConfig metadataConfig, HoodieEngineContext engineContext, @@ -62,28 +53,23 @@ public HudiReadOptimizedDirectoryLister( HoodieTableMetaClient metaClient, HiveMetastore hiveMetastore, Table hiveTable, - List partitionColumnHandles) + List partitionColumnHandles, + List partitionColumns) { this.tableHandle = tableHandle; - this.tableName = tableHandle.getSchemaTableName(); this.hiveMetastore = hiveMetastore; this.hiveTable = hiveTable; this.partitionColumnHandles = partitionColumnHandles; this.fileSystemView = FileSystemViewManager.createInMemoryFileSystemView(engineContext, metaClient, metadataConfig); - this.partitionKeysFilter = MetastoreUtil.computePartitionKeyFilter(partitionColumnHandles, tableHandle.getPartitionPredicates()); - this.partitionColumns = hiveTable.getPartitionColumns(); + this.partitionColumns = partitionColumns; } @Override - public List getPartitionsToScan() + public List getPartitionsToScan(List partitionNames) { - if (hivePartitionNames == null) { - hivePartitionNames = partitionColumns.isEmpty() - ? 
Collections.singletonList("") - : getPartitionNamesFromHiveMetastore(partitionKeysFilter); - } + Map> partitionNameToPartitionMap = getPartitions(partitionNames); - List allPartitionInfoList = hivePartitionNames.stream() + List allPartitionInfoList = partitionNames.stream() .map(hivePartitionName -> new HiveHudiPartitionInfo( hivePartitionName, partitionColumns, @@ -94,8 +80,13 @@ public List getPartitionsToScan() .collect(Collectors.toList()); return allPartitionInfoList.stream() - .filter(partitionInfo -> partitionInfo.getHivePartitionKeys().isEmpty() || partitionInfo.doesMatchPredicates()) + .filter(partitionInfo -> { + String partitionName = partitionInfo.getHivePartitionName(); + Optional optionalPartition = partitionNameToPartitionMap.get(partitionName); + return partitionInfo.getHivePartitionKeysFromPartition(optionalPartition).isEmpty() || partitionInfo.doesMatchPredicates(); + }) .collect(Collectors.toList()); + } @Override @@ -106,15 +97,6 @@ public List listStatus(HudiPartitionInfo partitionInfo) .collect(toImmutableList()); } - private List getPartitionNamesFromHiveMetastore(TupleDomain partitionKeysFilter) - { - return hiveMetastore.getPartitionNamesByFilter( - tableName.getSchemaName(), - tableName.getTableName(), - partitionColumns.stream().map(Column::getName).collect(Collectors.toList()), - partitionKeysFilter).orElseThrow(() -> new TableNotFoundException(tableHandle.getSchemaTableName())); - } - @Override public Map> getPartitions(List partitionNames) { diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/split/HudiBackgroundSplitLoader.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/split/HudiBackgroundSplitLoader.java index b9ca3cbe60d2..c2e569c71561 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/split/HudiBackgroundSplitLoader.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/split/HudiBackgroundSplitLoader.java @@ -27,78 +27,111 @@ import io.trino.spi.connector.ConnectorSplit; import 
org.apache.hadoop.fs.FileStatus; -import java.util.Collection; +import java.util.ArrayList; +import java.util.Deque; import java.util.List; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.function.Consumer; -import java.util.stream.Collectors; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; +import static io.trino.plugin.hudi.HudiSessionProperties.getPartitionInfoLoaderParallelism; +import static io.trino.plugin.hudi.HudiSessionProperties.getSplitLoaderParallelism; import static java.util.Objects.requireNonNull; public class HudiBackgroundSplitLoader { - private final ConnectorSession session; private final HudiDirectoryLister hudiDirectoryLister; private final AsyncQueue asyncQueue; - private final ExecutorService executor; - private final Consumer errorListener; + private final ExecutorService splitLoaderExecutor; + private final Consumer splitErrorListener; + private final Consumer partitionErrorListener; private final HudiSplitFactory hudiSplitFactory; + private final Deque> partitionNamesQueue; + private final Deque partitionInfoQueue; + private final ScheduledExecutorService partitionInfoLoaderExecutor; + private final Deque partitionInfoLoaderStatusQueue; + private final int partitionInfoLoaderNumThreads; + private final int splitLoaderNumThreads; + public HudiBackgroundSplitLoader( ConnectorSession session, HudiTableHandle tableHandle, HudiDirectoryLister hudiDirectoryLister, AsyncQueue asyncQueue, - ExecutorService executor, + ExecutorService splitLoaderExecutor, HudiSplitWeightProvider hudiSplitWeightProvider, - Consumer errorListener) + Deque> partitionNamesQueue, + Deque partitionInfoQueue, + ScheduledExecutorService partitionInfoLoaderExecutor, + Consumer partitionErrorListener, + Consumer splitErrorListener) { - this.session = requireNonNull(session, "session is null"); this.hudiDirectoryLister = 
requireNonNull(hudiDirectoryLister, "hudiDirectoryLister is null"); this.asyncQueue = requireNonNull(asyncQueue, "asyncQueue is null"); - this.executor = requireNonNull(executor, "executor is null"); - this.errorListener = requireNonNull(errorListener, "errorListener is null"); + this.splitLoaderExecutor = requireNonNull(splitLoaderExecutor, "splitLoaderExecutor is null"); + this.partitionInfoLoaderExecutor = requireNonNull(partitionInfoLoaderExecutor, "partitionInfoLoaderExecutor is null"); + this.splitErrorListener = requireNonNull(splitErrorListener, "splitErrorListener is null"); + this.partitionErrorListener = requireNonNull(partitionErrorListener, "partitionErrorListener is null"); this.hudiSplitFactory = new HudiSplitFactory(tableHandle, hudiSplitWeightProvider); + this.partitionNamesQueue = requireNonNull(partitionNamesQueue, "partitionNamesQueue is null"); + this.partitionInfoQueue = requireNonNull(partitionInfoQueue, "partitionInfoDeque is null"); + this.partitionInfoLoaderStatusQueue = new ConcurrentLinkedDeque<>(); + this.partitionInfoLoaderNumThreads = getPartitionInfoLoaderParallelism(session); + this.splitLoaderNumThreads = getSplitLoaderParallelism(session); } + public void start() { - ListenableFuture> partitionsFuture = Futures.submit(this::loadPartitions, executor); - hookErrorListener(partitionsFuture); - - ListenableFuture splitFutures = Futures.transform( - partitionsFuture, - partitions -> { - List> futures = partitions.stream() - .map(partition -> Futures.submit(() -> loadSplits(partition), executor)) - .peek(this::hookErrorListener) - .collect(Collectors.toList()); - Futures.whenAllComplete(futures).run(asyncQueue::finish, directExecutor()); - return null; - }, - directExecutor()); - hookErrorListener(splitFutures); + List> splitFutures = new ArrayList<>(); + for (int i = 0; i < partitionInfoLoaderNumThreads; i++) { + partitionInfoLoaderStatusQueue.offer(true); + HudiPartitionInfoLoader partitionInfoLoader = new 
HudiPartitionInfoLoader(partitionNamesQueue, hudiDirectoryLister, partitionInfoQueue, partitionInfoLoaderStatusQueue); + hookPartitionErrorListener(Futures.submit(partitionInfoLoader, partitionInfoLoaderExecutor)); + } + + for (int i = 0; i < splitLoaderNumThreads; i++) { + ListenableFuture splitsFuture = Futures.submit(this::loadSplits, splitLoaderExecutor); + splitFutures.add(splitsFuture); + hookSplitErrorListener(splitsFuture); + } + + Futures.whenAllComplete(splitFutures).run(asyncQueue::finish, directExecutor()); } - private Collection loadPartitions() + + private void loadSplits() { - HudiPartitionInfoLoader partitionInfoLoader = new HudiPartitionInfoLoader(session, hudiDirectoryLister); - partitionInfoLoader.run(); - return partitionInfoLoader.getPartitionQueue(); + while (!partitionInfoLoaderStatusQueue.isEmpty() || !partitionInfoQueue.isEmpty()) { + HudiPartitionInfo partition = partitionInfoQueue.poll(); + if (partition != null) { + List partitionKeys = partition.getHivePartitionKeys(); + List partitionFiles = hudiDirectoryLister.listStatus(partition); + partitionFiles.stream() + .flatMap(fileStatus -> hudiSplitFactory.createSplits(partitionKeys, fileStatus)) + .map(asyncQueue::offer) + .forEachOrdered(MoreFutures::getFutureValue); + } + } } - private void loadSplits(HudiPartitionInfo partition) + private void hookPartitionErrorListener(ListenableFuture future) { - List partitionKeys = partition.getHivePartitionKeys(); - List partitionFiles = hudiDirectoryLister.listStatus(partition); - partitionFiles.stream() - .flatMap(fileStatus -> hudiSplitFactory.createSplits(partitionKeys, fileStatus)) - .map(asyncQueue::offer) - .forEachOrdered(MoreFutures::getFutureValue); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(T result) {} + + @Override + public void onFailure(Throwable t) { + partitionErrorListener.accept(t); + } + }, directExecutor()); } - private void hookErrorListener(ListenableFuture future) + private void 
hookSplitErrorListener(ListenableFuture future) { Futures.addCallback(future, new FutureCallback() { @@ -108,7 +141,7 @@ public void onSuccess(T result) {} @Override public void onFailure(Throwable t) { - errorListener.accept(t); + splitErrorListener.accept(t); } }, directExecutor()); } From 72ef8516d30ff941b32109841b018cad960ce224 Mon Sep 17 00:00:00 2001 From: yx-keith Date: Mon, 28 Aug 2023 17:42:37 +0800 Subject: [PATCH 3/4] hudi connector partition value bug fix --- .../src/main/java/io/trino/plugin/hudi/HudiUtil.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiUtil.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiUtil.java index 51d61aa132e8..c1a65ad11e8f 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiUtil.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiUtil.java @@ -40,6 +40,7 @@ import java.util.Map; import static io.trino.plugin.hive.HiveErrorCode.HIVE_INVALID_METADATA; +import static io.trino.plugin.hive.HivePartitionKey.HIVE_DEFAULT_DYNAMIC_PARTITION; import static io.trino.plugin.hive.util.HiveUtil.checkCondition; import static io.trino.plugin.hive.util.HiveUtil.parsePartitionValue; import static io.trino.plugin.hudi.HudiErrorCode.HUDI_CANNOT_OPEN_SPLIT; @@ -147,7 +148,9 @@ public static List buildPartitionKeys(List keys, List< for (int i = 0; i < keys.size(); i++) { String name = keys.get(i).getName(); String value = values.get(i); - partitionKeys.add(new HivePartitionKey(name, value)); + if (!value.equals(HIVE_DEFAULT_DYNAMIC_PARTITION)) { + partitionKeys.add(new HivePartitionKey(name, value)); + } } return partitionKeys.build(); } From 0486cae19457d0124a910dfd3e8c43c586704aaf Mon Sep 17 00:00:00 2001 From: yx-keith Date: Wed, 30 Aug 2023 17:13:57 +0800 Subject: [PATCH 4/4] optimize thread pool in load hudi partition and splits --- .../java/io/trino/plugin/hudi/HudiConfig.java | 2 +-
.../java/io/trino/plugin/hudi/HudiModule.java | 20 ------------------- .../trino/plugin/hudi/HudiSplitManager.java | 9 --------- .../io/trino/plugin/hudi/HudiSplitSource.java | 15 ++++---------- .../partition/HudiPartitionInfoLoader.java | 3 +++ .../hudi/split/HudiBackgroundSplitLoader.java | 13 +++++------- 6 files changed, 13 insertions(+), 49 deletions(-) diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConfig.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConfig.java index a9d2754df1ee..7454c7df6a3e 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConfig.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConfig.java @@ -47,7 +47,7 @@ public class HudiConfig private int maxSplitsPerSecond = Integer.MAX_VALUE; private int maxOutstandingSplits = 1000; private int partitionLoaderParallelism = 5; - private int splitLoaderParallelism = 15; + private int splitLoaderParallelism = 5; public List getColumnsToHide() diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiModule.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiModule.java index d5a2130d0549..52bd1f7028c2 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiModule.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiModule.java @@ -35,15 +35,12 @@ import javax.inject.Singleton; import java.util.concurrent.ExecutorService; -import java.util.concurrent.ScheduledExecutorService; import java.util.function.BiFunction; import static com.google.inject.multibindings.Multibinder.newSetBinder; import static io.airlift.concurrent.Threads.daemonThreadsNamed; import static io.airlift.configuration.ConfigBinder.configBinder; import static java.util.concurrent.Executors.newCachedThreadPool; -import static java.util.concurrent.Executors.newScheduledThreadPool; -import static java.util.concurrent.Executors.newFixedThreadPool; import static org.weakref.jmx.guice.ExportBinder.newExporter; 
public class HudiModule @@ -82,23 +79,6 @@ public ExecutorService createExecutorService() return newCachedThreadPool(daemonThreadsNamed("hudi-split-manager-%d")); } - @ForHudiPartitionLoader - @Singleton - @Provides - public ScheduledExecutorService createPartitionLoaderExecutorService(HudiConfig hudiConfig) - { - return newScheduledThreadPool(hudiConfig.getPartitionLoaderParallelism(), daemonThreadsNamed("hudi-partition-loader-%d")); - - } - - @ForHudiSplitLoader - @Singleton - @Provides - public ExecutorService createSplitLoaderExecutorService(HudiConfig hudiConfig) - { - return newFixedThreadPool(hudiConfig.getSplitLoaderParallelism(), daemonThreadsNamed("hudi-split-loader-%d")); - } - @Singleton @Provides public BiFunction createHiveMetastoreGetter(HudiTransactionManager transactionManager) diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitManager.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitManager.java index f7c993732be3..e2544e0b6472 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitManager.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitManager.java @@ -36,7 +36,6 @@ import java.util.Map; import java.util.concurrent.ExecutorService; -import java.util.concurrent.ScheduledExecutorService; import java.util.function.BiFunction; import static com.google.common.collect.ImmutableMap.toImmutableMap; @@ -51,8 +50,6 @@ public class HudiSplitManager private final BiFunction metastoreProvider; private final HdfsEnvironment hdfsEnvironment; private final ExecutorService executor; - private final ScheduledExecutorService partitionLoaderExecutor; - private final ExecutorService splitLoaderExecutor; private final int maxSplitsPerSecond; private final int maxOutstandingSplits; @@ -62,16 +59,12 @@ public HudiSplitManager( BiFunction metastoreProvider, HdfsEnvironment hdfsEnvironment, @ForHudiSplitManager ExecutorService executor, - @ForHudiPartitionLoader 
ScheduledExecutorService partitionLoaderExecutor, - @ForHudiSplitLoader ExecutorService splitLoaderExecutor, HudiConfig hudiConfig) { this.transactionManager = requireNonNull(transactionManager, "transactionManager is null"); this.metastoreProvider = requireNonNull(metastoreProvider, "metastoreProvider is null"); this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); this.executor = requireNonNull(executor, "executor is null"); - this.partitionLoaderExecutor = requireNonNull(partitionLoaderExecutor, "partitionLoaderExecutor is null"); - this.splitLoaderExecutor = requireNonNull(splitLoaderExecutor, "splitLoaderExecutor is null"); this.maxSplitsPerSecond = requireNonNull(hudiConfig, "hudiConfig is null").getMaxSplitsPerSecond(); this.maxOutstandingSplits = hudiConfig.getMaxOutstandingSplits(); } @@ -107,8 +100,6 @@ public ConnectorSplitSource getSplits( hdfsEnvironment.getConfiguration(new HdfsContext(session), new Path(table.getStorage().getLocation())), partitionColumnHandles, executor, - partitionLoaderExecutor, - splitLoaderExecutor, maxSplitsPerSecond, maxOutstandingSplits, hdfsEnvironment); diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitSource.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitSource.java index 197e76d287cb..c7cd841bf218 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitSource.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitSource.java @@ -14,6 +14,7 @@ package io.trino.plugin.hudi; import com.google.common.util.concurrent.Futures; +import io.airlift.concurrent.BoundedExecutor; import io.airlift.units.DataSize; import io.trino.hdfs.HdfsEnvironment; import io.trino.plugin.hive.HiveColumnHandle; @@ -51,18 +52,12 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.concurrent.ScheduledExecutorService; import 
java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static io.airlift.concurrent.MoreFutures.toCompletableFuture; -import static io.trino.plugin.hudi.HudiSessionProperties.getMaxPartitionBatchSize; -import static io.trino.plugin.hudi.HudiSessionProperties.getMinPartitionBatchSize; -import static io.trino.plugin.hudi.HudiSessionProperties.getMinimumAssignedSplitWeight; -import static io.trino.plugin.hudi.HudiSessionProperties.isSizeBasedSplitWeightsEnabled; -import static io.trino.plugin.hudi.HudiSessionProperties.getStandardSplitWeightSize; -import static io.trino.plugin.hudi.HudiSessionProperties.isHudiMetadataEnabled; +import static io.trino.plugin.hudi.HudiSessionProperties.*; import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; import static java.util.stream.Collectors.toList; @@ -84,8 +79,6 @@ public HudiSplitSource( Configuration configuration, Map partitionColumnHandleMap, ExecutorService executor, - ScheduledExecutorService partitionLoaderExecutor, - ExecutorService splitLoaderExecutor, int maxSplitsPerSecond, int maxOutstandingSplits, HdfsEnvironment hdfsEnvironment) @@ -120,11 +113,11 @@ public HudiSplitSource( tableHandle, hudiDirectoryLister, queue, - splitLoaderExecutor, + new BoundedExecutor(executor, getSplitLoaderParallelism(session)), createSplitWeightProvider(session), partitionNamesQueue, partitionInfoQueue, - partitionLoaderExecutor, + new BoundedExecutor(executor, getPartitionInfoLoaderParallelism(session)), throwable -> { trinoException.compareAndSet(null, new TrinoException(GENERIC_INTERNAL_ERROR, "Failed to generator partitions info for " + table.getTableName(), throwable)); diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/partition/HudiPartitionInfoLoader.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/partition/HudiPartitionInfoLoader.java index 9b5bcbe656c6..09897b71e98f 100644 --- 
a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/partition/HudiPartitionInfoLoader.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/partition/HudiPartitionInfoLoader.java @@ -43,6 +43,7 @@ public HudiPartitionInfoLoader( @Override public void run() { + partitionLoadStatusQueue.offer(true); while (!partitionNamesQueue.isEmpty()) { List partitionNames = partitionNamesQueue.poll(); if (partitionNames != null) { @@ -52,12 +53,14 @@ public void run() // empty partitioned table if (hudiPartitionInfoList.isEmpty()) { + partitionLoadStatusQueue.poll(); return; } // non-partitioned table if (hudiPartitionInfoList.size() == 1 && hudiPartitionInfoList.get(0).getHivePartitionName().isEmpty()) { partitionInfoQueue.addAll(hudiPartitionInfoList); + partitionLoadStatusQueue.poll(); return; } partitionInfoQueue.addAll(hudiPartitionInfoList); diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/split/HudiBackgroundSplitLoader.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/split/HudiBackgroundSplitLoader.java index c2e569c71561..f3761d146adc 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/split/HudiBackgroundSplitLoader.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/split/HudiBackgroundSplitLoader.java @@ -30,8 +30,7 @@ import java.util.ArrayList; import java.util.Deque; import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.Executor; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.function.Consumer; @@ -44,28 +43,27 @@ public class HudiBackgroundSplitLoader { private final HudiDirectoryLister hudiDirectoryLister; private final AsyncQueue asyncQueue; - private final ExecutorService splitLoaderExecutor; + private final Executor splitLoaderExecutor; private final Consumer splitErrorListener; private final Consumer partitionErrorListener; private final HudiSplitFactory hudiSplitFactory; 
private final Deque> partitionNamesQueue; private final Deque partitionInfoQueue; - private final ScheduledExecutorService partitionInfoLoaderExecutor; + private final Executor partitionInfoLoaderExecutor; private final Deque partitionInfoLoaderStatusQueue; private final int partitionInfoLoaderNumThreads; private final int splitLoaderNumThreads; - public HudiBackgroundSplitLoader( ConnectorSession session, HudiTableHandle tableHandle, HudiDirectoryLister hudiDirectoryLister, AsyncQueue asyncQueue, - ExecutorService splitLoaderExecutor, + Executor splitLoaderExecutor, HudiSplitWeightProvider hudiSplitWeightProvider, Deque> partitionNamesQueue, Deque partitionInfoQueue, - ScheduledExecutorService partitionInfoLoaderExecutor, + Executor partitionInfoLoaderExecutor, Consumer partitionErrorListener, Consumer splitErrorListener) { @@ -88,7 +86,6 @@ public void start() { List> splitFutures = new ArrayList<>(); for (int i = 0; i < partitionInfoLoaderNumThreads; i++) { - partitionInfoLoaderStatusQueue.offer(true); HudiPartitionInfoLoader partitionInfoLoader = new HudiPartitionInfoLoader(partitionNamesQueue, hudiDirectoryLister, partitionInfoQueue, partitionInfoLoaderStatusQueue); hookPartitionErrorListener(Futures.submit(partitionInfoLoader, partitionInfoLoaderExecutor)); }