Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Binder;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.relocated.com.google.common.base.Splitter;
Expand Down Expand Up @@ -299,4 +302,17 @@ public static long estimateSize(StructType tableSchema, long totalRecords) {
}
return result;
}

public static void validateMetadataColumnReferences(Schema tableSchema, Schema readSchema) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I initially did this for Spark only. It may make sense to move it to DataTableScan in core.

List<String> conflictingColumnNames = readSchema.columns().stream()
.map(Types.NestedField::name)
.filter(name -> MetadataColumns.isMetadataColumn(name) && tableSchema.findField(name) != null)
.collect(Collectors.toList());

ValidationException.check(
conflictingColumnNames.isEmpty(),
"Table column names conflict with names reserved for Iceberg metadata columns: %s.\n" +
"Please, use ALTER TABLE statements to rename the conflicting table columns.",
conflictingColumnNames);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ abstract class SparkBatchScan implements Scan, Batch, SupportsReportStatistics {

SparkBatchScan(SparkSession spark, Table table, SparkReadConf readConf, boolean caseSensitive,
Schema expectedSchema, List<Expression> filters, CaseInsensitiveStringMap options) {

SparkSchemaUtil.validateMetadataColumnReferences(table.schema(), expectedSchema);

this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
this.table = table;
this.readConf = readConf;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.Schema;
Expand Down Expand Up @@ -169,6 +170,35 @@ public void testPartitionMetadataColumnWithUnknownTransforms() {
() -> sql("SELECT _partition FROM %s", TABLE_NAME));
}

@Test
public void testConflictingColumns() {
table.updateSchema()
.addColumn(MetadataColumns.SPEC_ID.name(), Types.IntegerType.get())
.addColumn(MetadataColumns.FILE_PATH.name(), Types.StringType.get())
.commit();
Comment on lines +175 to +178
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Eventually, would it make sense to fail here, in the updateSchema pathway?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe. I would like to avoid not allowing these names, but you're right that it would catch the problem earlier.


sql("INSERT INTO TABLE %s VALUES (1, 'a1', 'b1', -1, 'path/to/file')", TABLE_NAME);

assertEquals("Rows must match",
ImmutableList.of(row(1L, "a1")),
sql("SELECT id, category FROM %s", TABLE_NAME));

AssertHelpers.assertThrows("Should fail to query conflicting columns",
ValidationException.class, "column names conflict",
() -> sql("SELECT * FROM %s", TABLE_NAME));

table.refresh();

table.updateSchema()
.renameColumn(MetadataColumns.SPEC_ID.name(), "_renamed" + MetadataColumns.SPEC_ID.name())
.renameColumn(MetadataColumns.FILE_PATH.name(), "_renamed" + MetadataColumns.FILE_PATH.name())
.commit();

assertEquals("Rows must match",
ImmutableList.of(row(0, null, -1)),
sql("SELECT _spec_id, _partition, _renamed_spec_id FROM %s", TABLE_NAME));
}

private void createAndInitTable() throws IOException {
this.table = TestTables.create(temp.newFolder(), TABLE_NAME, SCHEMA, PartitionSpec.unpartitioned());

Expand Down