Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions core/trino-server/src/main/provisio/trino.xml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@
</artifact>
</artifactSet>

<!-- Hudi plugin: the trino-hudi zip artifact is unpacked into plugin/hudi -->
<artifactSet to="plugin/hudi">
<artifact id="${project.groupId}:trino-hudi:zip:${project.version}">
<unpack />
</artifact>
</artifactSet>

<artifactSet to="plugin/iceberg">
<artifact id="${project.groupId}:trino-iceberg:zip:${project.version}">
<unpack />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,12 @@ public FSDataInputStream open(Path f, int bufferSize)
return new InputStreamWrapper(getRawFileSystem().open(f, bufferSize), this);
}

/**
 * Reports the scheme of the file system returned by {@code getRawFileSystem()},
 * so this wrapper answers with the same scheme as the file system it delegates to.
 *
 * @return the scheme reported by the raw file system
 */
@Override
public String getScheme()
{
    String rawScheme = getRawFileSystem().getScheme();
    return rawScheme;
}

@Override
public FSDataOutputStream append(Path f, int bufferSize, Progressable progress)
throws IOException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@
import static io.trino.plugin.hive.util.CompressionConfigUtil.configureCompression;
import static io.trino.plugin.hive.util.HiveBucketing.getHiveBucketHandle;
import static io.trino.plugin.hive.util.HiveBucketing.isSupportedBucketing;
import static io.trino.plugin.hive.util.HiveUtil.columnExtraInfo;
import static io.trino.plugin.hive.util.HiveUtil.columnMetadataGetter;
import static io.trino.plugin.hive.util.HiveUtil.getPartitionKeyColumnHandles;
import static io.trino.plugin.hive.util.HiveUtil.getRegularColumnHandles;
import static io.trino.plugin.hive.util.HiveUtil.hiveColumnHandles;
Expand Down Expand Up @@ -3536,40 +3536,6 @@ else if (type instanceof RowType) {
}
}

/**
 * Builds a function that converts a {@link HiveColumnHandle} of the given table into {@link ColumnMetadata}.
 * <p>
 * Column comments are collected once, up front. A comment equal to {@code "from deserializer"} is treated
 * as absent, and hidden columns never carry a comment.
 *
 * @param table the metastore table whose data and partition columns supply the comments
 * @return a converter from column handle to column metadata, including partition projection column properties
 * @throws TrinoException with {@code HIVE_INVALID_METADATA} when a column name occurs more than once
 *         across the partition and data columns
 */
private Function<HiveColumnHandle, ColumnMetadata> columnMetadataGetter(Table table)
{
    // Gather every column name (partition columns first) to detect duplicates in the table descriptor
    ImmutableList.Builder<String> nameBuilder = ImmutableList.builder();
    for (Column column : concat(table.getPartitionColumns(), table.getDataColumns())) {
        nameBuilder.add(column.getName());
    }
    List<String> names = nameBuilder.build();
    int distinctNames = Sets.newHashSet(names).size();
    if (distinctNames < names.size()) {
        throw new TrinoException(
                HIVE_INVALID_METADATA,
                format("Hive metadata for table %s is invalid: Table descriptor contains duplicate columns", table.getTableName()));
    }

    // Map each column name to its comment, dropping the "from deserializer" placeholder
    ImmutableMap.Builder<String, Optional<String>> comments = ImmutableMap.builder();
    for (Column column : concat(table.getDataColumns(), table.getPartitionColumns())) {
        Optional<String> comment = column.getComment()
                .filter(text -> !text.equals("from deserializer"));
        comments.put(column.getName(), comment);
    }
    Map<String, Optional<String>> commentsByName = comments.buildOrThrow();

    return column -> {
        boolean hidden = column.isHidden();
        Optional<String> comment = hidden ? Optional.empty() : commentsByName.get(column.getName());
        return ColumnMetadata.builder()
                .setName(column.getName())
                .setType(column.getType())
                .setComment(comment)
                .setExtraInfo(Optional.ofNullable(columnExtraInfo(column.isPartitionKey())))
                .setHidden(hidden)
                .setProperties(partitionProjectionService.getPartitionProjectionTrinoColumnProperties(table, column.getName()))
                .build();
    };
}

@Override
public void rollback()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,17 +211,7 @@ public static ReaderPageSource createPageSource(
FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
fileSchema = fileMetaData.getSchema();

Optional<MessageType> message = projectSufficientColumns(columns)
.map(projection -> projection.get().stream()
.map(HiveColumnHandle.class::cast)
.collect(toUnmodifiableList()))
.orElse(columns).stream()
.filter(column -> column.getColumnType() == REGULAR)
.map(column -> getColumnType(column, fileSchema, useColumnNames))
.filter(Optional::isPresent)
.map(Optional::get)
.map(type -> new MessageType(fileSchema.getName(), type))
.reduce(MessageType::union);
Optional<MessageType> message = getParquetMessageType(columns, useColumnNames, fileSchema);

requestedSchema = message.orElse(new MessageType(fileSchema.getName(), ImmutableList.of()));
messageColumn = getColumnIO(fileSchema, requestedSchema);
Expand Down Expand Up @@ -301,6 +291,22 @@ && predicateMatches(parquetPredicate, block, dataSource, descriptorsByPath, parq
}
}

/**
 * Derives the Parquet schema to request for the given columns.
 * <p>
 * When {@code projectSufficientColumns} yields a projection, its columns are used in place of
 * {@code columns}. Only {@code REGULAR} columns are considered. Each column whose type resolves
 * through {@code getColumnType} is wrapped in a {@link MessageType} named after the file schema,
 * and the results are merged with {@link MessageType#union}.
 *
 * @param columns the column handles being read
 * @param useColumnNames passed through to {@code getColumnType}
 * @param fileSchema the schema of the Parquet file
 * @return the merged schema, or empty when no column resolves to a Parquet type
 */
public static Optional<MessageType> getParquetMessageType(List<HiveColumnHandle> columns, boolean useColumnNames, MessageType fileSchema)
{
    List<HiveColumnHandle> columnsToRead = projectSufficientColumns(columns)
            .map(projection -> projection.get().stream()
                    .map(HiveColumnHandle.class::cast)
                    .collect(toUnmodifiableList()))
            .orElse(columns);

    return columnsToRead.stream()
            .filter(column -> column.getColumnType() == REGULAR)
            .map(column -> getColumnType(column, fileSchema, useColumnNames))
            .flatMap(Optional::stream)
            .map(type -> new MessageType(fileSchema.getName(), type))
            .reduce(MessageType::union);
}

public static Optional<org.apache.parquet.schema.Type> getParquetType(GroupType groupType, boolean useParquetColumnNames, HiveColumnHandle column)
{
if (useParquetColumnNames) {
Expand Down Expand Up @@ -341,7 +347,7 @@ public static Optional<org.apache.parquet.schema.Type> getColumnType(HiveColumnH
return Optional.of(new GroupType(baseType.getRepetition(), baseType.getName(), ImmutableList.of(type)));
}

private static Optional<ColumnIndexStore> getColumnIndexStore(
public static Optional<ColumnIndexStore> getColumnIndexStore(
ParquetDataSource dataSource,
BlockMetaData blockMetadata,
Map<List<String>, ColumnDescriptor> descriptorsByPath,
Expand Down Expand Up @@ -416,7 +422,7 @@ public static TupleDomain<ColumnDescriptor> getParquetTupleDomain(
return TupleDomain.withColumnDomains(predicate.buildOrThrow());
}

private static org.apache.parquet.schema.Type getParquetType(HiveColumnHandle column, MessageType messageType, boolean useParquetColumnNames)
public static org.apache.parquet.schema.Type getParquetType(HiveColumnHandle column, MessageType messageType, boolean useParquetColumnNames)
{
if (useParquetColumnNames) {
return getParquetTypeByName(column.getBaseColumnName(), messageType);
Expand All @@ -428,7 +434,7 @@ private static org.apache.parquet.schema.Type getParquetType(HiveColumnHandle co
return null;
}

private static List<ParquetReaderColumn> createParquetReaderColumns(List<HiveColumnHandle> baseColumns, MessageType fileSchema, MessageColumnIO messageColumn, boolean useColumnNames)
public static List<ParquetReaderColumn> createParquetReaderColumns(List<HiveColumnHandle> baseColumns, MessageType fileSchema, MessageColumnIO messageColumn, boolean useColumnNames)
{
for (HiveColumnHandle column : baseColumns) {
checkArgument(column == PARQUET_ROW_INDEX_COLUMN || column.getColumnType() == REGULAR, "column type must be REGULAR: %s", column);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,12 @@ private void closeSuper()
super.close();
}

/**
 * Returns the scheme component of this file system's {@code uri}.
 *
 * @return the value of {@code uri.getScheme()}
 */
@Override
public String getScheme()
{
    String scheme = uri.getScheme();
    return scheme;
}

@Override
public URI getUri()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
import com.google.common.base.Splitter;
import com.google.common.base.VerifyException;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import io.airlift.compress.lzo.LzoCodec;
import io.airlift.compress.lzo.LzopCodec;
import io.airlift.slice.Slice;
Expand All @@ -36,6 +38,7 @@
import io.trino.plugin.hive.metastore.Table;
import io.trino.spi.ErrorCodeSupplier;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.predicate.NullableValue;
import io.trino.spi.type.ArrayType;
import io.trino.spi.type.CharType;
Expand Down Expand Up @@ -96,12 +99,14 @@
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Properties;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static com.google.common.base.MoreObjects.firstNonNull;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Iterables.concat;
import static com.google.common.collect.Lists.newArrayList;
import static io.airlift.slice.Slices.utf8Slice;
import static io.trino.hdfs.ConfigurationUtils.copy;
Expand Down Expand Up @@ -1155,4 +1160,37 @@ public static boolean isSparkBucketedTable(Table table)
return table.getParameters().containsKey(SPARK_TABLE_PROVIDER_KEY)
&& table.getParameters().containsKey(SPARK_TABLE_BUCKET_NUMBER_KEY);
}

/**
 * Creates a function that turns a {@link HiveColumnHandle} of the given table into {@link ColumnMetadata}.
 * <p>
 * Comments are looked up by column name from the table's data and partition columns. A comment equal to
 * {@code "from deserializer"} is treated as absent, and hidden columns are always reported without a comment.
 *
 * @param table the metastore table that supplies column names and comments
 * @return a converter from column handle to column metadata
 * @throws TrinoException with {@code HIVE_INVALID_METADATA} when the partition and data columns
 *         together contain a repeated column name
 */
public static Function<HiveColumnHandle, ColumnMetadata> columnMetadataGetter(Table table)
{
    // Reject table descriptors that declare the same column name twice
    ImmutableList.Builder<String> nameBuilder = ImmutableList.builder();
    for (Column column : concat(table.getPartitionColumns(), table.getDataColumns())) {
        nameBuilder.add(column.getName());
    }
    List<String> names = nameBuilder.build();
    int distinctNames = Sets.newHashSet(names).size();
    if (distinctNames < names.size()) {
        throw new TrinoException(
                HIVE_INVALID_METADATA,
                format("Hive metadata for table %s is invalid: Table descriptor contains duplicate columns", table.getTableName()));
    }

    // Column name -> comment, with the "from deserializer" placeholder mapped to empty
    ImmutableMap.Builder<String, Optional<String>> comments = ImmutableMap.builder();
    for (Column column : concat(table.getDataColumns(), table.getPartitionColumns())) {
        Optional<String> comment = column.getComment()
                .filter(text -> !text.equals("from deserializer"));
        comments.put(column.getName(), comment);
    }
    Map<String, Optional<String>> commentsByName = comments.buildOrThrow();

    return column -> {
        boolean hidden = column.isHidden();
        Optional<String> comment = hidden ? Optional.empty() : commentsByName.get(column.getName());
        return ColumnMetadata.builder()
                .setName(column.getName())
                .setType(column.getType())
                .setComment(comment)
                .setExtraInfo(Optional.ofNullable(columnExtraInfo(column.isPartitionKey())))
                .setHidden(hidden)
                .build();
    };
}
}
Loading