@@ -28,7 +28,8 @@ public enum HudiErrorCode
// HUDI_MISSING_DATA(3, EXTERNAL) is deprecated
HUDI_CANNOT_OPEN_SPLIT(4, EXTERNAL),
HUDI_UNSUPPORTED_FILE_FORMAT(5, EXTERNAL),
HUDI_CURSOR_ERROR(6, EXTERNAL);
HUDI_CURSOR_ERROR(6, EXTERNAL),
HUDI_FILESYSTEM_ERROR(7, EXTERNAL);

private final ErrorCode errorCode;

@@ -39,10 +39,10 @@
import io.trino.spi.connector.TableNotFoundException;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.type.TypeManager;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.model.HoodieTableType;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
@@ -58,6 +58,7 @@
import static io.trino.plugin.hive.util.HiveUtil.columnMetadataGetter;
import static io.trino.plugin.hive.util.HiveUtil.hiveColumnHandles;
import static io.trino.plugin.hive.util.HiveUtil.isHiveSystemSchema;
import static io.trino.plugin.hudi.HudiErrorCode.HUDI_FILESYSTEM_ERROR;
import static io.trino.plugin.hudi.HudiSessionProperties.getColumnsToHide;
import static io.trino.plugin.hudi.HudiTableProperties.LOCATION_PROPERTY;
import static io.trino.plugin.hudi.HudiTableProperties.PARTITIONED_BY_PROPERTY;
@@ -70,7 +71,6 @@
import static org.apache.hudi.common.fs.FSUtils.getFs;
import static org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME;
import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
import static org.apache.hudi.exception.TableNotFoundException.checkTableValidity;

public class HudiMetadata
implements ConnectorMetadata
@@ -121,11 +121,11 @@ public HudiTableHandle getTableHandle(ConnectorSession session, SchemaTableName
@Override
public Optional<SystemTable> getSystemTable(ConnectorSession session, SchemaTableName tableName)
{
return getRawSystemTable(session, tableName)
return getRawSystemTable(tableName)
.map(systemTable -> new ClassLoaderSafeSystemTable(systemTable, getClass().getClassLoader()));
}

private Optional<SystemTable> getRawSystemTable(ConnectorSession session, SchemaTableName tableName)
private Optional<SystemTable> getRawSystemTable(SchemaTableName tableName)
{
HudiTableName name = HudiTableName.from(tableName.getTableName());
if (name.getTableType() == TableType.DATA) {
@@ -141,8 +141,7 @@ private Optional<SystemTable> getRawSystemTable(ConnectorSession session, Schema
break;
case TIMELINE:
SchemaTableName systemTableName = new SchemaTableName(tableName.getSchemaName(), name.getTableNameWithType());
Configuration configuration = hdfsEnvironment.getConfiguration(new HdfsContext(session), new Path(tableOptional.get().getStorage().getLocation()));
return Optional.of(new TimelineTable(configuration, systemTableName, tableOptional.get()));
return Optional.of(new TimelineTable(hdfsEnvironment, systemTableName, tableOptional.get()));
}
return Optional.empty();
}
@@ -228,14 +227,19 @@ HiveMetastore getMetastore()
private boolean isHudiTable(ConnectorSession session, Table table)
{
String basePath = table.getStorage().getLocation();
Configuration configuration = hdfsEnvironment.getConfiguration(new HdfsContext(session), new Path(basePath));
try {
checkTableValidity(getFs(basePath, configuration), new Path(basePath), new Path(basePath, METAFOLDER_NAME));
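// A valid Hudi table keeps its metadata in a METAFOLDER_NAME (".hoodie") directory directly under the table's base path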
if (!getFs(basePath, hdfsEnvironment.getConfiguration(new HdfsContext(session), new Path(basePath))).getFileStatus(new Path(basePath, METAFOLDER_NAME)).isDirectory()) {
log.warn("Could not find Hudi table at path '%s'.", basePath);
return false;
}
}
catch (org.apache.hudi.exception.TableNotFoundException e) {
log.warn("Could not find Hudi table at path '%s'", basePath);
catch (IllegalArgumentException e) {
log.warn("Could not find Hudi table at path '%s'. Error: %s", basePath, e.getMessage());
return false;
}
catch (IOException e) {
throw new TrinoException(HUDI_FILESYSTEM_ERROR, format("Could not check if %s is a valid table", basePath), e);
}
return true;
}

@@ -18,7 +18,6 @@
import io.trino.spi.block.Block;
import io.trino.spi.block.RunLengthEncodedBlock;
import io.trino.spi.connector.ConnectorPageSource;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.util.List;
@@ -51,7 +50,7 @@ public HudiPageSource(
List<HiveColumnHandle> columnHandles,
Map<String, Block> partitionBlocks,
ConnectorPageSource dataPageSource,
Path path,
String path,
long fileSize,
long fileModifiedTime)
{
@@ -76,7 +75,7 @@ else if (column.getName().equals(PARTITION_COLUMN_NAME)) {
delegateIndexes[outputIndex] = -1;
}
else if (column.getName().equals(PATH_COLUMN_NAME)) {
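// The synthetic $path column is prefilled from the split's file path rather than read from the data file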
prefilledBlocks[outputIndex] = nativeValueToBlock(PATH_TYPE, utf8Slice(path.toString()));
prefilledBlocks[outputIndex] = nativeValueToBlock(PATH_TYPE, utf8Slice(path));
delegateIndexes[outputIndex] = -1;
}
else if (column.getName().equals(FILE_SIZE_COLUMN_NAME)) {
@@ -43,7 +43,6 @@
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.type.Decimals;
import io.trino.spi.type.TypeSignature;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
@@ -146,8 +145,8 @@ public ConnectorPageSource createPageSource(
DynamicFilter dynamicFilter)
{
HudiSplit split = (HudiSplit) connectorSplit;
Path path = new Path(split.getPath());
HoodieFileFormat hudiFileFormat = getHudiFileFormat(path.toString());
String path = split.getPath();
HoodieFileFormat hudiFileFormat = getHudiFileFormat(path);
if (!HoodieFileFormat.PARQUET.equals(hudiFileFormat)) {
throw new TrinoException(HUDI_UNSUPPORTED_FILE_FORMAT, format("File format %s not supported", hudiFileFormat));
}
@@ -161,7 +160,7 @@
.filter(columnHandle -> !columnHandle.isPartitionKey() && !columnHandle.isHidden())
.collect(Collectors.toList());
TrinoFileSystem fileSystem = fileSystemFactory.create(session);
TrinoInputFile inputFile = fileSystem.newInputFile(path.toString(), split.getFileSize());
TrinoInputFile inputFile = fileSystem.newInputFile(path, split.getFileSize());
ConnectorPageSource dataPageSource = createPageSource(session, regularColumns, split, inputFile, dataSourceStats, options, timeZone);

return new HudiPageSource(
@@ -185,7 +184,7 @@ private static ConnectorPageSource createPageSource(
{
ParquetDataSource dataSource = null;
boolean useColumnNames = shouldUseParquetColumnNames(session);
Path path = new Path(hudiSplit.getPath());
String path = hudiSplit.getPath();
long start = hudiSplit.getStart();
long length = hudiSplit.getLength();
try {
@@ -13,7 +13,6 @@
*/
package io.trino.plugin.hudi;

import io.trino.hdfs.HdfsContext;
import io.trino.hdfs.HdfsEnvironment;
import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorSplitSource;
import io.trino.plugin.hive.HiveColumnHandle;
@@ -29,7 +28,6 @@
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.connector.TableNotFoundException;
import io.trino.spi.security.ConnectorIdentity;
import org.apache.hadoop.fs.Path;

import javax.annotation.PreDestroy;
import javax.inject.Inject;
@@ -97,7 +95,7 @@ public ConnectorSplitSource getSplits(
metastore,
table,
hudiTableHandle,
hdfsEnvironment.getConfiguration(new HdfsContext(session), new Path(table.getStorage().getLocation())),
hdfsEnvironment,
partitionColumnHandles,
executor,
maxSplitsPerSecond,
@@ -15,6 +15,8 @@

import com.google.common.util.concurrent.Futures;
import io.airlift.units.DataSize;
import io.trino.hdfs.HdfsContext;
import io.trino.hdfs.HdfsEnvironment;
import io.trino.plugin.hive.HiveColumnHandle;
import io.trino.plugin.hive.metastore.HiveMetastore;
import io.trino.plugin.hive.metastore.Table;
@@ -29,7 +31,7 @@
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorSplit;
import io.trino.spi.connector.ConnectorSplitSource;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
@@ -62,15 +64,15 @@ public HudiSplitSource(
HiveMetastore metastore,
Table table,
HudiTableHandle tableHandle,
Configuration configuration,
HdfsEnvironment hdfsEnvironment,
Map<String, HiveColumnHandle> partitionColumnHandleMap,
ExecutorService executor,
int maxSplitsPerSecond,
int maxOutstandingSplits)
{
boolean metadataEnabled = isHudiMetadataEnabled(session);
HoodieTableMetaClient metaClient = buildTableMetaClient(configuration, tableHandle.getBasePath());
HoodieEngineContext engineContext = new HoodieLocalEngineContext(configuration);
HoodieTableMetaClient metaClient = buildTableMetaClient(hdfsEnvironment, session, tableHandle.getBasePath());
HoodieEngineContext engineContext = new HoodieLocalEngineContext(hdfsEnvironment.getConfiguration(new HdfsContext(session), new Path(tableHandle.getBasePath())));
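// Whether file listing goes through the Hudi metadata table is controlled per session (see isHudiMetadataEnabled above)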
HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
.enable(metadataEnabled)
.build();
@@ -15,19 +15,22 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.trino.hdfs.HdfsContext;
import io.trino.hdfs.HdfsEnvironment;
import io.trino.plugin.hive.HiveColumnHandle;
import io.trino.plugin.hive.HivePartition;
import io.trino.plugin.hive.HivePartitionKey;
import io.trino.plugin.hive.HivePartitionManager;
import io.trino.plugin.hive.metastore.Column;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.NullableValue;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.type.Type;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieFileFormat;
@@ -149,9 +152,9 @@ public static List<HivePartitionKey> buildPartitionKeys(List<Column> keys, List<
return partitionKeys.build();
}

public static HoodieTableMetaClient buildTableMetaClient(Configuration configuration, String basePath)
public static HoodieTableMetaClient buildTableMetaClient(HdfsEnvironment hdfsEnvironment, ConnectorSession session, String basePath)
{
HoodieTableMetaClient client = HoodieTableMetaClient.builder().setConf(configuration).setBasePath(basePath).build();
HoodieTableMetaClient client = HoodieTableMetaClient.builder().setConf(hdfsEnvironment.getConfiguration(new HdfsContext(session), new Path(basePath))).setBasePath(basePath).build();
// Do not load the bootstrap index; neither the bootstrap base data nor the mapping index will be read
client.getTableConfig().setValue("hoodie.bootstrap.index.enable", "false");
return client;
@@ -14,6 +14,7 @@
package io.trino.plugin.hudi;

import com.google.common.collect.ImmutableList;
import io.trino.hdfs.HdfsEnvironment;
import io.trino.plugin.hive.metastore.Table;
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.connector.ConnectorSession;
@@ -25,7 +26,6 @@
import io.trino.spi.connector.SystemTable;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.type.Type;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;

@@ -43,10 +43,10 @@ public class TimelineTable
{
private final ConnectorTableMetadata tableMetadata;
private final List<Type> types;
private final Configuration configuration;
private final HdfsEnvironment hdfsEnvironment;
private final String location;

public TimelineTable(Configuration configuration, SchemaTableName tableName, Table hudiTable)
public TimelineTable(HdfsEnvironment hdfsEnvironment, SchemaTableName tableName, Table hudiTable)
{
this.tableMetadata = new ConnectorTableMetadata(requireNonNull(tableName, "tableName is null"),
ImmutableList.<ColumnMetadata>builder()
@@ -55,7 +55,7 @@ public TimelineTable(Configuration configuration, SchemaTableName tableName, Tab
.add(new ColumnMetadata("state", VARCHAR))
.build());
this.types = tableMetadata.getColumns().stream().map(ColumnMetadata::getType).collect(toImmutableList());
this.configuration = requireNonNull(configuration, "configuration is null");
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.location = requireNonNull(hudiTable.getStorage().getLocation(), "location is null");
}

@@ -74,7 +74,7 @@ public ConnectorTableMetadata getTableMetadata()
@Override
public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain<Integer> constraint)
{
HoodieTableMetaClient metaClient = buildTableMetaClient(configuration, location);
HoodieTableMetaClient metaClient = buildTableMetaClient(hdfsEnvironment, session, location);
Iterable<List<Object>> records = () -> metaClient.getCommitsTimeline().getInstants().map(this::getRecord).iterator();
return new InMemoryRecordSet(types, records).cursor();
}
@@ -13,6 +13,7 @@
*/
package io.trino.plugin.hudi.partition;

import io.trino.filesystem.Location;
import io.trino.plugin.hive.HiveColumnHandle;
import io.trino.plugin.hive.HivePartitionKey;
import io.trino.plugin.hive.metastore.Column;
@@ -21,8 +22,6 @@
import io.trino.plugin.hive.metastore.Table;
import io.trino.plugin.hive.util.HiveUtil;
import io.trino.spi.predicate.TupleDomain;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.exception.HoodieIOException;

import java.util.Collections;
@@ -108,12 +107,35 @@ public void loadPartitionInfo(Optional<Partition> partition)
if (partition.isEmpty()) {
throw new HoodieIOException(format("Cannot find partition in Hive Metastore: %s", hivePartitionName));
}
this.relativePartitionPath = FSUtils.getRelativePartitionPath(
new Path(table.getStorage().getLocation()),
new Path(partition.get().getStorage().getLocation()));
this.relativePartitionPath = getRelativePartitionPath(
Location.parse(table.getStorage().getLocation()),
Location.parse(partition.get().getStorage().getLocation()));
this.hivePartitionKeys = buildPartitionKeys(partitionColumns, partition.get().getValues());
}

/*
* Given a base path and a partition path, return the relative path of the partition path with respect to the base path.
* This is equivalent to org.apache.hudi.common.fs.FSUtils#getRelativePartitionPath
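* Example (illustrative locations): base "s3://bucket/warehouse/tbl" and partition "s3://bucket/warehouse/tbl/dt=2021-01-01" yield "dt=2021-01-01"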
*/
private static String getRelativePartitionPath(Location baseLocation, Location fullPartitionLocation)
{
String basePath = baseLocation.path();
String fullPartitionPath = fullPartitionLocation.path();

if (!fullPartitionPath.startsWith(basePath)) {
throw new IllegalArgumentException("Partition path does not belong to base-path");
}

String baseLocationParent = baseLocation.parentDirectory().path();
String baseLocationName = baseLocation.fileName();
int partitionStartIndex = fullPartitionPath.indexOf(
baseLocationName,
baseLocationParent == null ? 0 : baseLocationParent.length());
// Partition-Path could be empty for non-partitioned tables
boolean isNonPartitionedTable = partitionStartIndex + baseLocationName.length() == fullPartitionPath.length();
return isNonPartitionedTable ? "" : fullPartitionPath.substring(partitionStartIndex + baseLocationName.length() + 1);
}

@Override
public String toString()
{
@@ -19,7 +19,6 @@
import io.trino.plugin.hudi.HudiSplit;
import io.trino.plugin.hudi.HudiTableHandle;
import io.trino.spi.TrinoException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hudi.hadoop.PathWithBootstrapFileStatus;

@@ -75,33 +74,27 @@ private List<FileSplit> createSplits(HudiFileStatus fileStatus)
throw new IOException("Not a file: " + fileStatus.path());
}

Path path = fileStatus.path();
long length = fileStatus.length();

if (length == 0) {
return ImmutableList.of(new FileSplit(path, 0, 0, new String[0]));
return ImmutableList.of(new FileSplit(fileStatus.path(), 0, 0, new String[0]));
}

if (!isSplitable(path)) {
return ImmutableList.of(new FileSplit(path, 0, length, (String[]) null));
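// Bootstrap base files (PathWithBootstrapFileStatus) are always read as a single split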
if (fileStatus.path() instanceof PathWithBootstrapFileStatus) {
return ImmutableList.of(new FileSplit(fileStatus.path(), 0, length, (String[]) null));
}

ImmutableList.Builder<FileSplit> splits = ImmutableList.builder();
long splitSize = fileStatus.blockSize();

long bytesRemaining = length;
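// Carve off block-sized splits while more than SPLIT_SLOP blocks of data remain; the remainder becomes the final split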
while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
splits.add(new FileSplit(path, length - bytesRemaining, splitSize, (String[]) null));
splits.add(new FileSplit(fileStatus.path(), length - bytesRemaining, splitSize, (String[]) null));
bytesRemaining -= splitSize;
}
if (bytesRemaining != 0) {
splits.add(new FileSplit(path, length - bytesRemaining, bytesRemaining, (String[]) null));
splits.add(new FileSplit(fileStatus.path(), length - bytesRemaining, bytesRemaining, (String[]) null));
}
return splits.build();
}

private static boolean isSplitable(Path filename)
{
return !(filename instanceof PathWithBootstrapFileStatus);
}
}