diff --git a/spark/src/main/java/org/apache/iceberg/spark/FixupTypes.java b/core/src/main/java/org/apache/iceberg/types/FixupTypes.java
similarity index 82%
rename from spark/src/main/java/org/apache/iceberg/spark/FixupTypes.java
rename to core/src/main/java/org/apache/iceberg/types/FixupTypes.java
index 3c3cc1ea67c2..b203ca131807 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/FixupTypes.java
+++ b/core/src/main/java/org/apache/iceberg/types/FixupTypes.java
@@ -17,32 +17,31 @@
  * under the License.
  */
 
-package org.apache.iceberg.spark;
+package org.apache.iceberg.types;
 
 import java.util.List;
 import java.util.function.Supplier;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.types.Type;
-import org.apache.iceberg.types.TypeUtil;
-import org.apache.iceberg.types.Types;
 
 /**
- * This is used to fix primitive types to match a table schema. Some types, like binary and fixed,
- * are converted to the same Spark type. Conversion back can produce only one, which may not be
- * correct. This uses a reference schema to override types that were lost in round-trip conversion.
+ * This is used to fix primitive types to match a table schema. This uses a reference schema to
+ * override types that were lost in round-trip conversion.
+ * <p>
+ * Subclasses choose which primitive types are overridden by implementing
+ * {@link #fixupPrimitive(Type.PrimitiveType, Type)}.
  */
-class FixupTypes extends TypeUtil.CustomOrderSchemaVisitor<Type> {
+public abstract class FixupTypes extends TypeUtil.CustomOrderSchemaVisitor<Type> {
   private final Schema referenceSchema;
   private Type sourceType;
 
-  static Schema fixup(Schema schema, Schema referenceSchema) {
-    return new Schema(TypeUtil.visit(schema,
-        new FixupTypes(referenceSchema)).asStructType().fields());
-  }
-
-  private FixupTypes(Schema referenceSchema) {
+  /**
+   * Creates a visitor that corrects types using the given reference schema.
+   *
+   * @param referenceSchema schema that supplies the replacement types
+   */
+  protected FixupTypes(Schema referenceSchema) {
     this.referenceSchema = referenceSchema;
     this.sourceType = referenceSchema.asStruct();
   }
@@ -156,20 +155,23 @@ public Type primitive(Type.PrimitiveType primitive) {
       return primitive; // already correct
     }
 
-    switch (primitive.typeId()) {
-      case STRING:
-        if (sourceType.typeId() == Type.TypeID.UUID) {
-          return sourceType;
-        }
-        break;
-      case BINARY:
-        if (sourceType.typeId() == Type.TypeID.FIXED) {
-          return sourceType;
-        }
-        break;
-      default:
+    if (fixupPrimitive(primitive, sourceType)) {
+      return sourceType;
     }
 
+    // nothing to fix up, let validation catch promotion errors
     return primitive;
   }
+
+  /**
+   * Decides whether a primitive type should be replaced by the type from the reference schema.
+   * <p>
+   * When this returns true, {@link #primitive(Type.PrimitiveType)} returns {@code source} instead
+   * of {@code type}; otherwise {@code type} is kept and validation is left to catch any mismatch.
+   *
+   * @param type a primitive type from the schema being fixed
+   * @param source the type tracked from the reference schema
+   * @return true if {@code type} should be replaced with {@code source}
+   */
+  protected abstract boolean fixupPrimitive(Type.PrimitiveType type, Type source);
 }
diff --git a/spark/src/main/java/org/apache/iceberg/spark/SparkFixupTypes.java b/spark/src/main/java/org/apache/iceberg/spark/SparkFixupTypes.java
new file mode 100644
index 000000000000..2d3ea4c81f00
--- /dev/null
+++ b/spark/src/main/java/org/apache/iceberg/spark/SparkFixupTypes.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.types.FixupTypes;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.TypeUtil;
+
+/**
+ * Some types, like binary and fixed, are converted to the same Spark type. Conversion back
+ * can produce only one, which may not be correct.
+ */
+class SparkFixupTypes extends FixupTypes {
+
+  private SparkFixupTypes(Schema referenceSchema) {
+    super(referenceSchema);
+  }
+
+  /**
+   * Builds a new schema from {@code schema}, replacing string with UUID and binary with fixed
+   * wherever the type tracked from {@code referenceSchema} is UUID or fixed.
+   */
+  static Schema fixup(Schema schema, Schema referenceSchema) {
+    return new Schema(TypeUtil.visit(schema,
+        new SparkFixupTypes(referenceSchema)).asStructType().fields());
+  }
+
+  @Override
+  protected boolean fixupPrimitive(Type.PrimitiveType type, Type source) {
+    switch (type.typeId()) {
+      case STRING:
+        if (source.typeId() == Type.TypeID.UUID) {
+          return true;
+        }
+        break;
+      case BINARY:
+        if (source.typeId() == Type.TypeID.FIXED) {
+          return true;
+        }
+        break;
+      default:
+    }
+    return false;
+  }
+}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/SparkSchemaUtil.java b/spark/src/main/java/org/apache/iceberg/spark/SparkSchemaUtil.java
index 3f6e41e3072b..484c407e0247 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/SparkSchemaUtil.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/SparkSchemaUtil.java
@@ -163,7 +163,7 @@ public static Schema convert(Schema baseSchema, StructType sparkType) {
     // reassign ids to match the base schema
     Schema schema = TypeUtil.reassignIds(new Schema(struct.fields()), baseSchema);
     // fix types that can't be represented in Spark (UUID and Fixed)
-    return FixupTypes.fixup(schema, baseSchema);
+    return SparkFixupTypes.fixup(schema, baseSchema);
   }
 
   /**