Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
ffe2f19
metadataTableSchena->metadataTableSchema
felixYyu Nov 26, 2021
6aa419c
Merge branch 'apache:master' into master
felixYyu Dec 1, 2021
222867e
Merge branch 'apache:master' into master
felixYyu Dec 2, 2021
e625646
Merge branch 'apache:master' into master
felixYyu Dec 5, 2021
8db1919
Merge branch 'apache:master' into master
felixYyu Dec 6, 2021
dae5d5e
Merge branch 'apache:master' into master
felixYyu Dec 8, 2021
10eb9be
Merge branch 'apache:master' into master
felixYyu Dec 14, 2021
53c5e83
Merge branch 'apache:master' into master
felixYyu Dec 15, 2021
7fb9ea5
Merge branch 'apache:master' into master
felixYyu Dec 16, 2021
24c3320
Merge branch 'apache:master' into master
felixYyu Dec 20, 2021
ac7d28e
Merge branch 'apache:master' into master
felixYyu Dec 21, 2021
0942be4
Merge branch 'apache:master' into master
felixYyu Dec 27, 2021
8e4c56e
Merge branch 'apache:master' into master
felixYyu Dec 28, 2021
901fa0d
Merge branch 'apache:master' into master
felixYyu Dec 30, 2021
f455284
fix #3558 Support alter and show partition in V2 Catalog
felixYyu Jan 9, 2022
6261b1c
Merge branch 'apache:master' into master
felixYyu Jan 9, 2022
85c8581
fix #3558 Support alter and show partition in V2 Catalog
felixYyu Jan 9, 2022
da71f4f
fix #3558 Support alter and show partition in V2 Catalog
felixYyu Jan 9, 2022
f30f99a
fix #3558 Support alter and show partition in V2 Catalog
felixYyu Jan 9, 2022
ce85c3c
fix #3558 Support alter and show partition in V2 Catalog
felixYyu Jan 9, 2022
70048ae
fix #3558 Support alter and show partition in V2 Catalog
felixYyu Jan 9, 2022
e1f6570
fix #3558 Support alter and show partition in V2 Catalog
felixYyu Jan 10, 2022
885c3d1
fix #3558 Support alter and show partition in V2 Catalog
felixYyu Jan 10, 2022
a30dfdc
Merge branch 'apache:master' into master
felixYyu Jan 11, 2022
ca6dd41
Merge branch 'apache:master' into SupportsPartitionManagement
felixYyu Jan 12, 2022
2a6325b
Merge branch 'apache:master' into master
felixYyu Jan 12, 2022
a8a37f7
Merge branch 'apache:master' into master
felixYyu Jan 13, 2022
49dfe17
Merge branch 'SupportsPartitionManagement' into support_partition_man…
felixYyu Jan 13, 2022
3cebddb
Merge pull request #1 from felixYyu/support_partition_managent
felixYyu Jan 13, 2022
6281e75
updated with #3745
felixYyu Jan 13, 2022
17ba08c
updated with #3745
felixYyu Jan 13, 2022
3f0c4eb
updated with #3745
felixYyu Jan 13, 2022
dcd42e5
fix review bug
felixYyu Jan 26, 2022
615ee19
fix review bug
felixYyu Jan 26, 2022
dc319b0
Merge branch 'master' into SupportsPartitionManagement
felixYyu Jan 26, 2022
862863e
Update Spark3Util.java
felixYyu Jan 26, 2022
154ac4c
fix review bug
felixYyu Jan 26, 2022
d79be53
Merge remote-tracking branch 'origin/SupportsPartitionManagement' int…
felixYyu Jan 26, 2022
78ec544
fix review bug
felixYyu Jan 26, 2022
9f9ce84
fix review bug
felixYyu Jan 26, 2022
af4bae2
fix review bug
felixYyu Jan 27, 2022
3d835d1
fix null exception bug
felixYyu Jan 27, 2022
00075a1
fix review bug
felixYyu Feb 14, 2022
4190533
fix format error
felixYyu Feb 21, 2022
91ecae2
Merge branch 'master' into SupportsPartitionManagement
felixYyu Feb 25, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@
package org.apache.iceberg.spark.source;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Partitioning;
import org.apache.iceberg.Schema;
Expand All @@ -42,17 +44,25 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkFilters;
import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.SparkTableUtil;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
import org.apache.spark.sql.connector.catalog.MetadataColumn;
import org.apache.spark.sql.connector.catalog.SupportsDelete;
import org.apache.spark.sql.connector.catalog.SupportsMetadataColumns;
import org.apache.spark.sql.connector.catalog.SupportsPartitionManagement;
import org.apache.spark.sql.connector.catalog.SupportsRead;
import org.apache.spark.sql.connector.catalog.SupportsWrite;
import org.apache.spark.sql.connector.catalog.TableCapability;
Expand All @@ -72,7 +82,8 @@
import org.slf4j.LoggerFactory;

public class SparkTable implements org.apache.spark.sql.connector.catalog.Table,
SupportsRead, SupportsWrite, SupportsDelete, SupportsRowLevelOperations, SupportsMetadataColumns {
SupportsRead, SupportsWrite, SupportsDelete, SupportsRowLevelOperations, SupportsMetadataColumns,
SupportsPartitionManagement {

private static final Logger LOG = LoggerFactory.getLogger(SparkTable.class);

Expand Down Expand Up @@ -283,6 +294,65 @@ public void deleteWhere(Filter[] filters) {
}
}

@Override
public StructType partitionSchema() {
return (StructType) SparkSchemaUtil.convert(Partitioning.partitionType(table()));
}

@Override
public void createPartition(InternalRow ident, Map<String, String> properties) throws UnsupportedOperationException {
throw new UnsupportedOperationException("Cannot explicitly create partitions in Iceberg tables");
}

@Override
public boolean dropPartition(InternalRow ident) {
throw new UnsupportedOperationException("Cannot explicitly drop partitions in Iceberg tables");
}

@Override
public void replacePartitionMetadata(InternalRow ident, Map<String, String> properties)
throws UnsupportedOperationException {
throw new UnsupportedOperationException("Iceberg partitions do not support metadata");
}

@Override
public Map<String, String> loadPartitionMetadata(InternalRow ident) throws UnsupportedOperationException {
throw new UnsupportedOperationException("Iceberg partitions do not support metadata");
}

@Override
public InternalRow[] listPartitionIdentifiers(String[] names, InternalRow ident) {
// support show partitions
List<InternalRow> rows = Lists.newArrayList();
Dataset<Row> df = SparkTableUtil.loadMetadataTable(sparkSession(), icebergTable, MetadataTableType.PARTITIONS);
if (names.length > 0) {
StructType schema = partitionSchema();
df.collectAsList().forEach(row -> {
GenericRowWithSchema genericRow = (GenericRowWithSchema) row.apply(0);
boolean exits = true;
int index = 0;
while (index < names.length) {
DataType dataType = schema.apply(names[index]).dataType();
int fieldIndex = schema.fieldIndex(names[index]);
if (!genericRow.values()[fieldIndex].equals(ident.get(index, dataType))) {
exits = false;
break;
}
index += 1;
}
if (exits) {
rows.add(new GenericInternalRow(genericRow.values()));
}
});
} else {
df.collectAsList().forEach(row -> {
GenericRowWithSchema genericRow = (GenericRowWithSchema) row.apply(0);
rows.add(new GenericInternalRow(genericRow.values()));
});
}
return rows.toArray(new InternalRow[0]);
}

@Override
public String toString() {
return icebergTable.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.util.List;
import java.util.Map;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.spark.SparkCatalogTestBase;
import org.apache.iceberg.spark.source.SimpleRecord;
Expand Down Expand Up @@ -176,4 +177,27 @@ public void testViewsReturnRecentResults() {
ImmutableList.of(row(1L, "a"), row(1L, "a")),
sql("SELECT * FROM tmp"));
}

@Test
public void testAddPartition() {
// only check V2 command [IF NOT EXISTS] syntax
AssertHelpers.assertThrows("Cannot explicitly create partitions in Iceberg tables",
UnsupportedOperationException.class,
() -> sql("ALTER TABLE %s ADD IF NOT EXISTS PARTITION (id_trunc=2)", tableName));
}

@Test
public void testDropPartition() {
// only check V2 command [IF EXISTS] syntax
AssertHelpers.assertThrows("Cannot explicitly drop partitions in Iceberg tables",
UnsupportedOperationException.class,
() -> sql("ALTER TABLE %s DROP IF EXISTS PARTITION (id_trunc=0)", tableName));
}

@Test
public void testShowPartitions() {
Assert.assertEquals("Should have 2 rows", 2L, sql("SHOW PARTITIONS %s", tableName).size());
Assert.assertEquals("Should have 1 row", 1L,
sql("SHOW PARTITIONS %s PARTITION (id_trunc=0)", tableName).size());
}
}