Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions docs/src/main/sphinx/connector/hudi.rst
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,13 @@ Additionally, following configuration properties can be set depending on the use
for structural data types. The equivalent catalog session property is
``parquet_optimized_nested_reader_enabled``.
- ``true``
* - ``hudi.min-partition-batch-size``
- Minimum number of partitions returned in a single batch.
- ``10``
* - ``hudi.max-partition-batch-size``
- Maximum number of partitions returned in a single batch.
- ``100``
* - ``hudi.split-generator-parallelism``
- Number of threads to generate splits from partitions.
- ``4``
* - ``hudi.split-loader-parallelism``
- Number of threads to run the background split loader.
A single background split loader is needed per query.
- ``4``
* - ``hudi.size-based-split-weights-enabled``
- Unlike uniform splitting, size-based splitting ensures that each batch of splits
has enough data to process. By default, it is enabled to improve performance.
Expand All @@ -95,6 +96,10 @@ Additionally, following configuration properties can be set depending on the use
* - ``hudi.max-outstanding-splits``
- Maximum outstanding splits in a batch enqueued for processing.
- ``1000``
* - ``hudi.per-transaction-metastore-cache-maximum-size``
- Maximum number of metastore data objects per transaction in
the Hive metastore cache.
- ``2000``


SQL support
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.trino.plugin.hudi;

import com.google.inject.BindingAnnotation;

import java.lang.annotation.Retention;
import java.lang.annotation.Target;

import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

/**
 * Guice binding annotation qualifying the {@code ScheduledExecutorService}
 * used for background Hudi split loading (bound in {@code HudiModule}'s
 * {@code createSplitLoaderExecutor} provider, sized by
 * {@code hudi.split-loader-parallelism}).
 */
@Retention(RUNTIME)
@Target({FIELD, PARAMETER, METHOD})
@BindingAnnotation
public @interface ForHudiSplitSource {}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@
import com.google.common.collect.ImmutableList;
import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;
import io.airlift.configuration.DefunctConfig;
import io.airlift.units.DataSize;

import javax.validation.constraints.DecimalMax;
import javax.validation.constraints.DecimalMin;
import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;

Expand All @@ -32,20 +32,25 @@
import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static java.util.Locale.ENGLISH;

@DefunctConfig({
"hudi.min-partition-batch-size",
"hudi.max-partition-batch-size",
})
public class HudiConfig
{
private static final Splitter COMMA_SPLITTER = Splitter.on(",").omitEmptyStrings().trimResults();

private List<String> columnsToHide = ImmutableList.of();
private boolean metadataEnabled;
private boolean shouldUseParquetColumnNames = true;
private int minPartitionBatchSize = 10;
private int maxPartitionBatchSize = 100;
private boolean sizeBasedSplitWeightsEnabled = true;
private DataSize standardSplitWeightSize = DataSize.of(128, MEGABYTE);
private double minimumAssignedSplitWeight = 0.05;
private int maxSplitsPerSecond = Integer.MAX_VALUE;
private int maxOutstandingSplits = 1000;
private int splitLoaderParallelism = 4;
private int splitGeneratorParallelism = 4;
private long perTransactionMetastoreCacheMaximumSize = 2000;

public List<String> getColumnsToHide()
{
Expand Down Expand Up @@ -90,36 +95,6 @@ public boolean getUseParquetColumnNames()
return this.shouldUseParquetColumnNames;
}

@Config("hudi.min-partition-batch-size")
@ConfigDescription("Minimum number of partitions returned in a single batch.")
public HudiConfig setMinPartitionBatchSize(int minPartitionBatchSize)
{
this.minPartitionBatchSize = minPartitionBatchSize;
return this;
}

@Min(1)
@Max(100)
public int getMinPartitionBatchSize()
{
return minPartitionBatchSize;
}

@Config("hudi.max-partition-batch-size")
@ConfigDescription("Maximum number of partitions returned in a single batch.")
public HudiConfig setMaxPartitionBatchSize(int maxPartitionBatchSize)
{
this.maxPartitionBatchSize = maxPartitionBatchSize;
return this;
}

@Min(1)
@Max(1000)
public int getMaxPartitionBatchSize()
{
return maxPartitionBatchSize;
}

@Config("hudi.size-based-split-weights-enabled")
@ConfigDescription("Unlike uniform splitting, size-based splitting ensures that each batch of splits has enough data to process. " +
"By default, it is enabled to improve performance.")
Expand Down Expand Up @@ -191,4 +166,45 @@ public HudiConfig setMaxOutstandingSplits(int maxOutstandingSplits)
this.maxOutstandingSplits = maxOutstandingSplits;
return this;
}

// Number of worker threads that turn partitions into splits; must be at least 1
// (field default is 4).
@Min(1)
public int getSplitGeneratorParallelism()
{
    return splitGeneratorParallelism;
}

@Config("hudi.split-generator-parallelism")
@ConfigDescription("Number of threads to generate splits from partitions.")
public HudiConfig setSplitGeneratorParallelism(int splitGeneratorParallelism)
{
    this.splitGeneratorParallelism = splitGeneratorParallelism;
    return this;
}

// Size of the background split-loader thread pool; one loader runs per query,
// so this bounds the number of queries loading splits concurrently
// (field default is 4).
@Min(1)
public int getSplitLoaderParallelism()
{
    return splitLoaderParallelism;
}

@Config("hudi.split-loader-parallelism")
@ConfigDescription("Number of threads to run background split loader. A single background split loader is needed per query.")
public HudiConfig setSplitLoaderParallelism(int splitLoaderParallelism)
{
    this.splitLoaderParallelism = splitLoaderParallelism;
    return this;
}

// Cap on the per-transaction Hive metastore memoization cache created in
// HudiMetadataFactory; must be at least 1 (field default is 2000).
@Min(1)
public long getPerTransactionMetastoreCacheMaximumSize()
{
    return perTransactionMetastoreCacheMaximumSize;
}

// Consistency fix: every other @Config setter in this class carries a
// @ConfigDescription (it feeds the generated docs); this one was missing it.
// Text mirrors the hudi.rst entry for this property.
@Config("hudi.per-transaction-metastore-cache-maximum-size")
@ConfigDescription("Maximum number of metastore data objects per transaction in the Hive metastore cache.")
public HudiConfig setPerTransactionMetastoreCacheMaximumSize(long perTransactionMetastoreCacheMaximumSize)
{
    this.perTransactionMetastoreCacheMaximumSize = perTransactionMetastoreCacheMaximumSize;
    return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,32 +15,38 @@

import com.google.inject.Inject;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.plugin.hive.metastore.HiveMetastore;
import io.trino.plugin.hive.metastore.HiveMetastoreFactory;
import io.trino.plugin.hive.metastore.cache.CachingHiveMetastore;
import io.trino.spi.security.ConnectorIdentity;
import io.trino.spi.type.TypeManager;

import java.util.Optional;

import static io.trino.plugin.hive.metastore.cache.CachingHiveMetastore.memoizeMetastore;
import static java.util.Objects.requireNonNull;

public class HudiMetadataFactory
{
private final HiveMetastoreFactory metastoreFactory;
private final TrinoFileSystemFactory fileSystemFactory;
private final TypeManager typeManager;
private final long perTransactionMetastoreCacheMaximumSize;

@Inject
public HudiMetadataFactory(HiveMetastoreFactory metastoreFactory, TrinoFileSystemFactory fileSystemFactory, TypeManager typeManager)
public HudiMetadataFactory(HiveMetastoreFactory metastoreFactory, TrinoFileSystemFactory fileSystemFactory, TypeManager typeManager, HudiConfig hudiConfig)
{
this.metastoreFactory = requireNonNull(metastoreFactory, "metastore is null");
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.perTransactionMetastoreCacheMaximumSize = hudiConfig.getPerTransactionMetastoreCacheMaximumSize();
}

public HudiMetadata create(ConnectorIdentity identity)
{
HiveMetastore metastore = metastoreFactory.createMetastore(Optional.of(identity));
return new HudiMetadata(metastore, fileSystemFactory, typeManager);
// create per-transaction cache over hive metastore interface
CachingHiveMetastore cachingHiveMetastore = memoizeMetastore(
metastoreFactory.createMetastore(Optional.of(identity)),
perTransactionMetastoreCacheMaximumSize);
return new HudiMetadata(cachingHiveMetastore, fileSystemFactory, typeManager);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,14 @@
import io.trino.spi.security.ConnectorIdentity;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.BiFunction;

import static com.google.inject.multibindings.Multibinder.newSetBinder;
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static io.airlift.configuration.ConfigBinder.configBinder;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static java.util.concurrent.Executors.newScheduledThreadPool;
import static org.weakref.jmx.guice.ExportBinder.newExporter;

public class HudiModule
Expand All @@ -64,22 +66,33 @@ public void configure(Binder binder)
configBinder(binder).bindConfig(ParquetReaderConfig.class);
configBinder(binder).bindConfig(ParquetWriterConfig.class);

binder.bind(HudiPartitionManager.class).in(Scopes.SINGLETON);
binder.bind(HudiMetadataFactory.class).in(Scopes.SINGLETON);

binder.bind(FileFormatDataSourceStats.class).in(Scopes.SINGLETON);
newExporter(binder).export(FileFormatDataSourceStats.class).withGeneratedName();
}

@ForHudiSplitManager
@Singleton
@Provides
@Singleton
@ForHudiSplitManager
public ExecutorService createExecutorService()
{
return newCachedThreadPool(daemonThreadsNamed("hudi-split-manager-%d"));
return newCachedThreadPool(daemonThreadsNamed("hudi-split-manager-%s"));
}

// Provides the shared scheduled executor behind @ForHudiSplitSource.
// Pool size comes from hudi.split-loader-parallelism; a single background
// split loader runs per query (see HudiConfig), so this bounds concurrent
// split-loading queries.
@Provides
@Singleton
@ForHudiSplitSource
public ScheduledExecutorService createSplitLoaderExecutor(HudiConfig hudiConfig)
{
    return newScheduledThreadPool(
            hudiConfig.getSplitLoaderParallelism(),
            daemonThreadsNamed("hudi-split-loader-%s"));
}

@Provides
@Singleton
public BiFunction<ConnectorIdentity, HiveTransactionHandle, HiveMetastore> createHiveMetastoreGetter(HudiTransactionManager transactionManager)
{
return (identity, transactionHandle) ->
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.hudi;

import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import io.trino.plugin.hive.HiveColumnHandle;
import io.trino.plugin.hive.metastore.Column;
import io.trino.plugin.hive.metastore.HiveMetastore;
import io.trino.plugin.hive.metastore.Table;
import io.trino.spi.connector.TableNotFoundException;
import io.trino.spi.type.TypeManager;

import java.util.List;
import java.util.stream.Collectors;

import static io.trino.plugin.hive.metastore.MetastoreUtil.computePartitionKeyFilter;
import static io.trino.plugin.hive.util.HiveUtil.getPartitionKeyColumnHandles;
import static java.util.Objects.requireNonNull;

/**
 * Resolves the set of partitions a Hudi table scan must read, by querying the
 * Hive metastore with a filter derived from the table handle's partition
 * predicates.
 */
public class HudiPartitionManager
{
    private final TypeManager typeManager;

    @Inject
    public HudiPartitionManager(TypeManager typeManager)
    {
        this.typeManager = requireNonNull(typeManager, "typeManager is null");
    }

    /**
     * Returns the metastore partition names matching the handle's partition
     * predicates. An unpartitioned table is represented as a single empty
     * partition name.
     *
     * @throws TableNotFoundException if the table (or its partition listing)
     *         is absent from the metastore
     */
    public List<String> getEffectivePartitions(HudiTableHandle tableHandle, HiveMetastore metastore)
    {
        Table hiveTable = metastore.getTable(tableHandle.getSchemaName(), tableHandle.getTableName())
                .orElseThrow(() -> new TableNotFoundException(tableHandle.getSchemaTableName()));

        List<Column> partitionFields = hiveTable.getPartitionColumns();
        if (partitionFields.isEmpty()) {
            // Unpartitioned: model the whole table as one unnamed partition
            return ImmutableList.of("");
        }

        List<HiveColumnHandle> partitionKeyHandles = getPartitionKeyColumnHandles(hiveTable, typeManager);
        List<String> partitionColumnNames = partitionFields.stream()
                .map(Column::getName)
                .collect(Collectors.toList());

        return metastore.getPartitionNamesByFilter(
                tableHandle.getSchemaName(),
                tableHandle.getTableName(),
                partitionColumnNames,
                computePartitionKeyFilter(partitionKeyHandles, tableHandle.getPartitionPredicates()))
                .orElseThrow(() -> new TableNotFoundException(tableHandle.getSchemaTableName()));
    }
}
Loading