@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.spark.source;

import org.apache.spark.sql.connector.catalog.MetadataColumn;
import org.apache.spark.sql.types.DataType;

public class SparkMetadataColumn implements MetadataColumn {
@aokolnychyi (contributor, PR author) commented on Oct 26, 2021:
As far as I know, Spark does not offer a utility for creating metadata columns similarly to Expressions. That's why I had to implement it in Iceberg. We should probably move it to Spark.


  private final String name;
  private final DataType dataType;
  private final boolean isNullable;

  public SparkMetadataColumn(String name, DataType dataType, boolean isNullable) {
    this.name = name;
    this.dataType = dataType;
    this.isNullable = isNullable;
  }

  @Override
  public String name() {
    return name;
  }

  @Override
  public DataType dataType() {
    return dataType;
  }

  @Override
  public boolean isNullable() {
    return isNullable;
  }
}
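A quick usage sketch (not part of the diff): each Iceberg metadata field is exposed to Spark as one SparkMetadataColumn instance, exactly as the SparkTable change below does. It assumes the MetadataColumns, MetadataColumn, and DataTypes imports that already appear in this PR.

// Usage sketch: expose Iceberg's file path metadata field to Spark.
// MetadataColumns.FILE_PATH.name() resolves to "_file"; its value is never null.
MetadataColumn filePathColumn =
    new SparkMetadataColumn(MetadataColumns.FILE_PATH.name(), DataTypes.StringType, false);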
Original file line number Diff line number Diff line change
@@ -24,7 +24,9 @@
import java.util.Set;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Partitioning;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableScan;
@@ -45,7 +47,9 @@
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.SparkWriteOptions;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.catalog.MetadataColumn;
import org.apache.spark.sql.connector.catalog.SupportsDelete;
import org.apache.spark.sql.connector.catalog.SupportsMetadataColumns;
import org.apache.spark.sql.connector.catalog.SupportsRead;
import org.apache.spark.sql.connector.catalog.SupportsWrite;
import org.apache.spark.sql.connector.catalog.TableCapability;
@@ -56,6 +60,8 @@
import org.apache.spark.sql.connector.write.LogicalWriteInfo;
import org.apache.spark.sql.connector.write.WriteBuilder;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.slf4j.Logger;
@@ -69,7 +75,7 @@
import static org.apache.iceberg.TableProperties.UPDATE_MODE_DEFAULT;

public class SparkTable implements org.apache.spark.sql.connector.catalog.Table,
SupportsRead, SupportsWrite, SupportsDelete, SupportsMerge {
SupportsRead, SupportsWrite, SupportsDelete, SupportsMerge, SupportsMetadataColumns {

private static final Logger LOG = LoggerFactory.getLogger(SparkTable.class);

@@ -168,6 +174,17 @@ public Set<TableCapability> capabilities() {
return CAPABILITIES;
}

  @Override
  public MetadataColumn[] metadataColumns() {
    DataType sparkPartitionType = SparkSchemaUtil.convert(Partitioning.partitionType(table()));
    return new MetadataColumn[] {
        new SparkMetadataColumn(MetadataColumns.SPEC_ID.name(), DataTypes.IntegerType, false),
        new SparkMetadataColumn(MetadataColumns.PARTITION_COLUMN_NAME, sparkPartitionType, true),
Contributor (PR author) commented:
Only the partition column is nullable (e.g. unpartitioned tables).

Contributor commented:
I like that we can project the partition. I've been meaning to add a way to project the individual partition fields, but this is probably way easier.

        new SparkMetadataColumn(MetadataColumns.FILE_PATH.name(), DataTypes.StringType, false),
        new SparkMetadataColumn(MetadataColumns.ROW_POSITION.name(), DataTypes.LongType, false)
Contributor commented:
I am not too familiar with the metadata columns, but I see 5 currently defined in Iceberg's MetadataColumns. Is there a reason to omit _deleted here? And it probably doesn't matter, but should we keep the columns in the order defined in MetadataColumns (ordered by id from -1 to -5)?

Contributor (PR author) commented:
I can match the order in MetadataColumns for consistency.

I am not sure how useful the _deleted metadata column will be in Spark right now. I guess it will always be false?
@jackye1995 @chenjunjiedada @RussellSpitzer @rdblue, any thoughts?

Contributor commented:
_deleted can be added later. The purpose of that field is to allow us to merge deletes in actions.

    };
  }
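Once SupportsMetadataColumns is implemented, Spark 3.2 resolves these names directly in SQL, which is what the test changes below rely on. A minimal usage sketch, assuming a SparkSession named spark and an Iceberg table db.sample (both placeholders):

// Metadata columns are selected by name, next to or instead of data columns.
// Per the discussion above, _partition may be null (e.g. unpartitioned tables),
// which is why only that column is declared nullable.
spark.sql("SELECT _spec_id, _partition, _file, _pos FROM db.sample").show();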

@Override
public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
if (options.containsKey(SparkReadOptions.FILE_SCAN_TASK_SET_ID)) {
@@ -19,6 +19,9 @@

package org.apache.iceberg.spark.source;

import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkSessionCatalog;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.connector.catalog.Identifier;
@@ -30,14 +33,14 @@ public class TestSparkCatalog<T extends TableCatalog & SupportsNamespaces> extends SparkSessionCatalog<T> {

@Override
public Table loadTable(Identifier ident) throws NoSuchTableException {
String[] parts = ident.name().split("\\$", 2);
Contributor (PR author) commented:
I removed the ugly workaround we had earlier.

if (parts.length == 2) {
TestTables.TestTable table = TestTables.load(parts[0]);
String[] metadataColumns = parts[1].split(",");
return new SparkTestTable(table, metadataColumns, false);
} else {
TestTables.TestTable table = TestTables.load(ident.name());
return new SparkTestTable(table, null, false);
TableIdentifier tableIdentifier = Spark3Util.identifierToTableIdentifier(ident);
Namespace namespace = tableIdentifier.namespace();

TestTables.TestTable table = TestTables.load(tableIdentifier.toString());
if (table == null && namespace.equals(Namespace.of("default"))) {
table = TestTables.load(tableIdentifier.name());
}

return new SparkTable(table, false);
Contributor commented on lines +40 to +44:
If table is null but namespace isn't default, what will happen here?

I guess since this is for testing it's not as much of a concern, but should we throw NoSuchTableException anyways to help out test authors (or do I possibly have that completely confused)?

@aokolnychyi (contributor, PR author) replied on Oct 26, 2021:
Yeah, the way we use TestSparkCatalog is a little bit weird right now. I just made it work. I can throw an exception too.

}
}
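A sketch of the reviewer's suggestion above (not part of this PR): fail fast when the test table cannot be resolved, so test authors get a NoSuchTableException rather than a NullPointerException later. It assumes Spark's NoSuchTableException(Identifier) constructor and the same loadTable body as above:

// Hypothetical guard before returning: surface a missing test table immediately.
if (table == null) {
  throw new NoSuchTableException(ident);
}
return new SparkTable(table, false);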
@@ -117,8 +117,6 @@ public void dropTable() {
TestTables.clearTables();
}

// TODO: remove testing workarounds once we compile against Spark 3.2

@Test
public void testSpecAndPartitionMetadataColumns() {
// TODO: support metadata structs in vectorized ORC reads
@@ -156,7 +154,7 @@ public void testSpecAndPartitionMetadataColumns() {
row(3, row(null, 2))
);
assertEquals("Rows must match", expected,
sql("SELECT _spec_id, _partition FROM `%s$_spec_id,_partition` ORDER BY _spec_id", TABLE_NAME));
sql("SELECT _spec_id, _partition FROM %s ORDER BY _spec_id", TABLE_NAME));
}

@Test
@@ -168,7 +166,7 @@ public void testPartitionMetadataColumnWithUnknownTransforms() {

AssertHelpers.assertThrows("Should fail to query the partition metadata column",
ValidationException.class, "Cannot build table partition type, unknown transforms",
() -> sql("SELECT _partition FROM `%s$_partition`", TABLE_NAME));
() -> sql("SELECT _partition FROM %s", TABLE_NAME));
}

private void createAndInitTable() throws IOException {