-
Notifications
You must be signed in to change notification settings - Fork 3.6k
Support changing column types in Hive connector #15938
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
a8ed176
63f66ac
3c15ec3
6e081f1
45629b4
79d32e3
ec89b8e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -25,7 +25,9 @@ | |
| import io.trino.spi.block.Block; | ||
| import io.trino.spi.block.ByteArrayBlock; | ||
| import io.trino.spi.block.IntArrayBlock; | ||
| import io.trino.spi.block.LongArrayBlock; | ||
| import io.trino.spi.block.RunLengthEncodedBlock; | ||
| import io.trino.spi.block.ShortArrayBlock; | ||
| import io.trino.spi.type.Type; | ||
|
|
||
| import javax.annotation.Nullable; | ||
|
|
@@ -43,7 +45,9 @@ | |
| import static io.trino.orc.reader.ReaderUtils.minNonNullValueSize; | ||
| import static io.trino.orc.reader.ReaderUtils.verifyStreamType; | ||
| import static io.trino.orc.stream.MissingInputStreamSource.missingStreamSource; | ||
| import static io.trino.spi.type.BigintType.BIGINT; | ||
| import static io.trino.spi.type.IntegerType.INTEGER; | ||
| import static io.trino.spi.type.SmallintType.SMALLINT; | ||
| import static io.trino.spi.type.TinyintType.TINYINT; | ||
| import static java.util.Objects.requireNonNull; | ||
|
|
||
|
|
@@ -77,8 +81,7 @@ public ByteColumnReader(Type type, OrcColumn column, LocalMemoryContext memoryCo | |
| throws OrcCorruptionException | ||
| { | ||
| this.type = requireNonNull(type, "type is null"); | ||
| // Iceberg maps ORC tinyint type to integer | ||
| verifyStreamType(column, type, t -> t == TINYINT || t == INTEGER); | ||
| verifyStreamType(column, type, t -> t == TINYINT || t == SMALLINT || t == INTEGER || t == BIGINT); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I thought the purpose of "Convert ORC block within ColumnAdaptation" was to have readers simplified, so I was surprised to see this change here. I think I like the non-composed (without column adaptations) approach better though, so no changes requested here. |
||
|
|
||
| this.column = requireNonNull(column, "column is null"); | ||
| this.memoryContext = requireNonNull(memoryContext, "memoryContext is null"); | ||
|
|
@@ -153,9 +156,15 @@ private Block readNonNullBlock() | |
| if (type == TINYINT) { | ||
| return new ByteArrayBlock(nextBatchSize, Optional.empty(), values); | ||
| } | ||
| if (type == SMALLINT) { | ||
| return new ShortArrayBlock(nextBatchSize, Optional.empty(), convertToShortArray(values)); | ||
| } | ||
| if (type == INTEGER) { | ||
| return new IntArrayBlock(nextBatchSize, Optional.empty(), convertToIntArray(values)); | ||
| } | ||
| if (type == BIGINT) { | ||
| return new LongArrayBlock(nextBatchSize, Optional.empty(), convertToLongArray(values)); | ||
| } | ||
| throw new VerifyError("Unsupported type " + type); | ||
| } | ||
|
|
||
|
|
@@ -175,9 +184,15 @@ private Block readNullBlock(boolean[] isNull, int nonNullCount) | |
| if (type == TINYINT) { | ||
| return new ByteArrayBlock(nextBatchSize, Optional.of(isNull), result); | ||
| } | ||
| if (type == SMALLINT) { | ||
| return new ShortArrayBlock(nextBatchSize, Optional.of(isNull), convertToShortArray(result)); | ||
| } | ||
| if (type == INTEGER) { | ||
| return new IntArrayBlock(nextBatchSize, Optional.of(isNull), convertToIntArray(result)); | ||
| } | ||
| if (type == BIGINT) { | ||
| return new LongArrayBlock(nextBatchSize, Optional.of(isNull), convertToLongArray(result)); | ||
| } | ||
| throw new VerifyError("Unsupported type " + type); | ||
| } | ||
|
|
||
|
|
@@ -220,6 +235,15 @@ public void startRowGroup(InputStreamSources dataStreamSources) | |
| rowGroupOpen = false; | ||
| } | ||
|
|
||
| private static short[] convertToShortArray(byte[] bytes) | ||
| { | ||
| short[] values = new short[bytes.length]; | ||
| for (int i = 0; i < bytes.length; i++) { | ||
| values[i] = bytes[i]; | ||
| } | ||
| return values; | ||
| } | ||
|
|
||
| private static int[] convertToIntArray(byte[] bytes) | ||
| { | ||
| int[] values = new int[bytes.length]; | ||
|
|
@@ -229,6 +253,15 @@ private static int[] convertToIntArray(byte[] bytes) | |
| return values; | ||
| } | ||
|
|
||
| private static long[] convertToLongArray(byte[] bytes) | ||
|
raunaqmorarka marked this conversation as resolved.
Outdated
|
||
| { | ||
| long[] values = new long[bytes.length]; | ||
| for (int i = 0; i < bytes.length; i++) { | ||
| values[i] = bytes[i]; | ||
| } | ||
| return values; | ||
| } | ||
|
|
||
| @Override | ||
| public String toString() | ||
| { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -27,7 +27,6 @@ | |
| import static io.trino.orc.metadata.OrcType.OrcTypeKind.BINARY; | ||
| import static io.trino.orc.metadata.OrcType.OrcTypeKind.LONG; | ||
| import static io.trino.orc.reader.ReaderUtils.invalidStreamType; | ||
| import static io.trino.spi.type.IntegerType.INTEGER; | ||
| import static io.trino.spi.type.TimeType.TIME_MICROS; | ||
|
|
||
| public final class ColumnReaders | ||
|
|
@@ -65,9 +64,6 @@ public static ColumnReader createColumnReader( | |
| case BOOLEAN: | ||
| return new BooleanColumnReader(type, column, memoryContext.newLocalMemoryContext(ColumnReaders.class.getSimpleName())); | ||
| case BYTE: | ||
| if (type == INTEGER && !column.getAttributes().containsKey("iceberg.id")) { | ||
| throw invalidStreamType(column, type); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
If I am understanding the change correctly, it could be titled "Fix reading ORC files after column evolved from tinyint to integer". Am I reading this right?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry for my misleading commit tile. Updated it.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for changing the commit title. Is TestHiveTransactionalTable.java the only test coverage we have for this "Fix reading ORC files after column evolved from tinyint to integer" change? |
||
| } | ||
| return new ByteColumnReader(type, column, memoryContext.newLocalMemoryContext(ColumnReaders.class.getSimpleName())); | ||
| case SHORT: | ||
| case INT: | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,8 +22,10 @@ | |
| import com.google.common.collect.ImmutableMap; | ||
| import com.google.common.collect.ImmutableSet; | ||
| import com.google.common.collect.Iterables; | ||
| import com.google.common.collect.Lists; | ||
| import com.google.common.collect.Maps; | ||
| import com.google.common.collect.Sets; | ||
| import com.google.common.collect.Streams; | ||
| import io.airlift.json.JsonCodec; | ||
| import io.airlift.log.Logger; | ||
| import io.airlift.slice.Slice; | ||
|
|
@@ -119,11 +121,14 @@ | |
| import io.trino.spi.statistics.TableStatistics; | ||
| import io.trino.spi.statistics.TableStatisticsMetadata; | ||
| import io.trino.spi.type.ArrayType; | ||
| import io.trino.spi.type.CharType; | ||
| import io.trino.spi.type.DecimalType; | ||
| import io.trino.spi.type.MapType; | ||
| import io.trino.spi.type.RowType; | ||
| import io.trino.spi.type.TimestampType; | ||
| import io.trino.spi.type.Type; | ||
| import io.trino.spi.type.TypeManager; | ||
| import io.trino.spi.type.VarcharType; | ||
| import org.apache.avro.Schema; | ||
| import org.apache.avro.SchemaParseException; | ||
| import org.apache.hadoop.fs.FileSystem; | ||
|
|
@@ -167,6 +172,7 @@ | |
| import static com.google.common.collect.ImmutableSet.toImmutableSet; | ||
| import static com.google.common.collect.Iterables.concat; | ||
| import static com.google.common.collect.Iterables.getOnlyElement; | ||
| import static com.google.common.collect.MoreCollectors.toOptional; | ||
| import static io.trino.hdfs.ConfigurationUtils.toJobConf; | ||
| import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext; | ||
| import static io.trino.plugin.hive.HiveAnalyzeProperties.getColumnNames; | ||
|
|
@@ -326,6 +332,11 @@ | |
| import static io.trino.spi.predicate.TupleDomain.withColumnDomains; | ||
| import static io.trino.spi.statistics.TableStatisticType.ROW_COUNT; | ||
| import static io.trino.spi.type.BigintType.BIGINT; | ||
| import static io.trino.spi.type.DoubleType.DOUBLE; | ||
| import static io.trino.spi.type.IntegerType.INTEGER; | ||
| import static io.trino.spi.type.RealType.REAL; | ||
| import static io.trino.spi.type.SmallintType.SMALLINT; | ||
| import static io.trino.spi.type.TinyintType.TINYINT; | ||
| import static io.trino.spi.type.TypeUtils.isFloatingPointNaN; | ||
| import static io.trino.spi.type.VarcharType.createUnboundedVarcharType; | ||
| import static java.lang.Boolean.parseBoolean; | ||
|
|
@@ -1345,6 +1356,133 @@ public void dropColumn(ConnectorSession session, ConnectorTableHandle tableHandl | |
| metastore.dropColumn(hiveTableHandle.getSchemaName(), hiveTableHandle.getTableName(), columnHandle.getName()); | ||
| } | ||
|
|
||
| @Override | ||
| public void setColumnType(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle, Type type) | ||
| { | ||
| HiveTableHandle table = (HiveTableHandle) tableHandle; | ||
| failIfAvroSchemaIsSet(table); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we should fail for CSV files too (csv is all |
||
| HiveColumnHandle hiveColumn = (HiveColumnHandle) columnHandle; | ||
|
|
||
| HiveStorageFormat storageFormat = extractHiveStorageFormat(metastore.getTable(table.getSchemaName(), table.getTableName()) | ||
| .orElseThrow(() -> new TableNotFoundException(table.getSchemaTableName()))); | ||
| verifyStorageFormatForColumnTypeChange(storageFormat); | ||
| verifySupportedColumnTypeChange(hiveColumn.getType(), type); | ||
| if (table.getPartitionColumns().stream().anyMatch(partition -> partition.getName().equals(hiveColumn.getName()))) { | ||
| throw new TrinoException(NOT_SUPPORTED, "Changing partition column types is not supported"); | ||
| } | ||
| ImmutableList.Builder<Partition> partitionsBuilder = ImmutableList.builder(); | ||
| List<String> partitionsNames = metastore.getPartitionNames(table.getSchemaName(), table.getTableName()) | ||
| .orElseThrow(() -> new TableNotFoundException(table.getSchemaTableName())); | ||
| for (List<String> partitionsNamesBatch : Lists.partition(partitionsNames, 1000)) { | ||
| metastore.getPartitionsByNames(table.getSchemaName(), table.getTableName(), partitionsNamesBatch).values().stream() | ||
| .filter(Optional::isPresent).map(Optional::get) | ||
| .forEach(partition -> { | ||
| boolean skipUpdatePartition = false; | ||
| verifyStorageFormatForColumnTypeChange(extractHiveStorageFormat(partition.getStorage().getStorageFormat())); | ||
| ImmutableList.Builder<Column> columns = ImmutableList.builder(); | ||
| for (Column column : partition.getColumns()) { | ||
| if (column.getName().equals(hiveColumn.getName())) { | ||
| Type sourceType = column.getType().getType(typeManager, getTimestampPrecision(session)); | ||
| verifySupportedColumnTypeChange(sourceType, type); | ||
| columns.add(column); | ||
| skipUpdatePartition = sourceType.equals(type); | ||
| } | ||
| else { | ||
| columns.add(column); | ||
| } | ||
| } | ||
| if (!skipUpdatePartition) { | ||
| partitionsBuilder.add(Partition.builder(partition).setColumns(columns.build()).build()); | ||
| } | ||
| }); | ||
| } | ||
| List<Partition> partitions = partitionsBuilder.build(); | ||
| metastore.setColumnType(table.getSchemaName(), table.getTableName(), hiveColumn.getName(), toHiveType(type)); | ||
| if (!partitions.isEmpty()) { | ||
| // Hive changes the column definition in each partition unless the ALTER TABLE statement contains a partition condition | ||
| // Trino doesn't support specifying partitions in ALTER TABLE, so SET DATA TYPE updates all partitions | ||
| // https://cwiki.apache.org/confluence/display/hive/languagemanual+ddl#LanguageManualDDL-AlterPartition | ||
| metastore.alterPartitions(table.getSchemaName(), table.getTableName(), partitions, OptionalLong.empty()); | ||
|
Comment on lines
+1402
to
+1405
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am surprised. First, I wouldn't expect existing partitions to be updated at all. Second, it will make it impossible to update type in large tables. Let's talk more about this |
||
| } | ||
| } | ||
|
|
||
| private void verifyStorageFormatForColumnTypeChange(HiveStorageFormat storageFormat) | ||
| { | ||
| // TODO: Support other storage formats except for CSV and REGEX | ||
| // AVRO leads to query failure after converting varchar to char(20) | ||
| // RCBINARY leads to query failure and incorrect results | ||
| // RCTEXT returns incorrect results on row types | ||
| // SEQUENCEFILE returns incorrect results on row types | ||
| // JSON leads to query failure for NaN after changing real to double type | ||
| // TEXTFILE returns incorrect results on row types | ||
| if (storageFormat != HiveStorageFormat.ORC && storageFormat != HiveStorageFormat.PARQUET) { | ||
| throw new TrinoException(NOT_SUPPORTED, "Unsupported storage format for changing column type: " + storageFormat); | ||
|
Comment on lines
1418
to
1419
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is the check effective?
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. btw it looks like we should be able to lift the limitation for e.g. TEXTFILE easily. |
||
| } | ||
| } | ||
|
|
||
| private void verifySupportedColumnTypeChange(Type sourceType, Type targetType) | ||
| { | ||
| if (!canChangeColumnType(sourceType, targetType)) { | ||
| throw new TrinoException(NOT_SUPPORTED, "Cannot change type from %s to %s".formatted(sourceType, targetType)); | ||
| } | ||
| } | ||
|
|
||
| private static boolean canChangeColumnType(Type sourceType, Type targetType) | ||
| { | ||
| if (sourceType.equals(targetType)) { | ||
| return true; | ||
| } | ||
| if (sourceType == TINYINT) { | ||
| return targetType == SMALLINT || targetType == INTEGER || targetType == BIGINT; | ||
| } | ||
| if (sourceType == SMALLINT) { | ||
| return targetType == INTEGER || targetType == BIGINT; | ||
| } | ||
| if (sourceType == INTEGER) { | ||
| return targetType == BIGINT; | ||
| } | ||
| if (sourceType == REAL) { | ||
| return targetType == DOUBLE; | ||
| } | ||
| if (sourceType instanceof VarcharType || sourceType instanceof CharType) { | ||
| // Truncating characters is supported | ||
| return targetType instanceof VarcharType || targetType instanceof CharType; | ||
| } | ||
| if (sourceType instanceof DecimalType sourceDecimal && targetType instanceof DecimalType targetDecimal) { | ||
| // TODO: Support rescale in ORC DecimalColumnReader | ||
| return sourceDecimal.getScale() == targetDecimal.getScale() | ||
| && sourceDecimal.getPrecision() <= targetDecimal.getPrecision(); | ||
| } | ||
| if (sourceType instanceof ArrayType sourceArrayType && targetType instanceof ArrayType targetArrayType) { | ||
| return canChangeColumnType(sourceArrayType.getElementType(), targetArrayType.getElementType()); | ||
| } | ||
| if (sourceType instanceof RowType sourceRowType && targetType instanceof RowType targetRowType) { | ||
| List<RowType.Field> fields = Streams.concat(sourceRowType.getFields().stream(), targetRowType.getFields().stream()) | ||
| .distinct() | ||
| .collect(toImmutableList()); | ||
| for (RowType.Field field : fields) { | ||
| String fieldName = field.getName().orElseThrow(); | ||
| boolean allowedChange = findFieldByName(sourceRowType.getFields(), fieldName) | ||
| .flatMap(sourceField -> findFieldByName(targetRowType.getFields(), fieldName) | ||
| .map(targetField -> canChangeColumnType(sourceField.getType(), targetField.getType()))) | ||
| // Allow removal and addition of fields as the connector supports dropping and re-adding a column | ||
| .orElse(true); | ||
| if (!allowedChange) { | ||
| return false; | ||
| } | ||
| } | ||
| return true; | ||
| } | ||
| return false; | ||
| } | ||
|
|
||
| private static Optional<RowType.Field> findFieldByName(List<RowType.Field> fields, String fieldName) | ||
| { | ||
| return fields.stream() | ||
| .filter(field -> field.getName().orElseThrow().equals(fieldName)) | ||
| .collect(toOptional()); | ||
| } | ||
|
|
||
| @Override | ||
| public void setTableAuthorization(ConnectorSession session, SchemaTableName table, TrinoPrincipal principal) | ||
| { | ||
|
|
@@ -3463,7 +3601,11 @@ public List<GrantInfo> listTablePrivileges(ConnectorSession session, SchemaTable | |
|
|
||
| private static HiveStorageFormat extractHiveStorageFormat(Table table) | ||
| { | ||
| StorageFormat storageFormat = table.getStorage().getStorageFormat(); | ||
| return extractHiveStorageFormat(table.getStorage().getStorageFormat()); | ||
| } | ||
|
|
||
| private static HiveStorageFormat extractHiveStorageFormat(StorageFormat storageFormat) | ||
| { | ||
| String outputFormat = storageFormat.getOutputFormat(); | ||
| String serde = storageFormat.getSerde(); | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The commit message bears no rationale. In fact, what's the benefit of the change?