From 20e95c4d3603e46f0687b65b868cd42342b3cffb Mon Sep 17 00:00:00 2001 From: Anton Okolnychyi Date: Thu, 12 May 2022 10:49:55 -0700 Subject: [PATCH 1/2] Spark 3.2: Avoid reflection to load metadata tables in SparkTableUtil --- .../org/apache/iceberg/spark/Spark3Util.java | 20 ------- .../apache/iceberg/spark/SparkTableUtil.java | 52 +++++-------------- 2 files changed, 12 insertions(+), 60 deletions(-) diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java index 3d0b6a6a8c72..f84e1ae100ee 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java @@ -28,8 +28,6 @@ import java.util.stream.Collectors; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; -import org.apache.iceberg.MetadataTableType; -import org.apache.iceberg.MetadataTableUtils; import org.apache.iceberg.NullOrder; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; @@ -60,8 +58,6 @@ import org.apache.iceberg.types.Types; import org.apache.iceberg.util.ByteBuffers; import org.apache.iceberg.util.Pair; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.catalyst.CatalystTypeConverters; import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; @@ -81,13 +77,11 @@ import org.apache.spark.sql.execution.datasources.FileStatusCache; import org.apache.spark.sql.execution.datasources.InMemoryFileIndex; import org.apache.spark.sql.execution.datasources.PartitionDirectory; -import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation; import org.apache.spark.sql.types.IntegerType; import org.apache.spark.sql.types.LongType; import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.util.CaseInsensitiveStringMap; import scala.Option; -import scala.Some; import scala.collection.JavaConverters; import scala.collection.immutable.Seq; @@ -617,20 +611,6 @@ private static String sqlString(org.apache.iceberg.expressions.Literal lit) { } } - /** - * Returns a metadata table as a Dataset based on the given Iceberg table. - * - * @param spark SparkSession where the Dataset will be created - * @param table an Iceberg table - * @param type the type of metadata table - * @return a Dataset that will read the metadata table - */ - private static Dataset loadMetadataTable(SparkSession spark, org.apache.iceberg.Table table, - MetadataTableType type) { - Table metadataTable = new SparkTable(MetadataTableUtils.createMetadataTableInstance(table, type), false); - return Dataset.ofRows(spark, DataSourceV2Relation.create(metadataTable, Some.empty(), Some.empty())); - } - /** * Returns an Iceberg Table by its name from a Spark V2 Catalog. If cache is enabled in {@link SparkCatalog}, * the {@link TableOperations} of the table may be stale, please refresh the table to get the latest one. diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java index e6c90cbccfd3..ed818c8ddc19 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java @@ -37,11 +37,11 @@ import org.apache.iceberg.ManifestFiles; import org.apache.iceberg.ManifestWriter; import org.apache.iceberg.MetadataTableType; +import org.apache.iceberg.MetadataTableUtils; import org.apache.iceberg.MetricsConfig; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; -import org.apache.iceberg.common.DynMethods; import org.apache.iceberg.data.TableMigrationUtil; import org.apache.iceberg.hadoop.HadoopFileIO; import org.apache.iceberg.hadoop.SerializableConfiguration; @@ -55,8 +55,10 @@ import org.apache.iceberg.relocated.com.google.common.base.Objects; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.spark.source.SparkTable; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.Tasks; import org.apache.spark.TaskContext; @@ -67,7 +69,6 @@ import org.apache.spark.api.java.function.MapPartitionsFunction; import org.apache.spark.sql.AnalysisException; import org.apache.spark.sql.Column; -import org.apache.spark.sql.DataFrameReader; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; @@ -83,6 +84,8 @@ import org.apache.spark.sql.catalyst.expressions.NamedExpression; import org.apache.spark.sql.catalyst.parser.ParseException; import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.Function2; @@ -599,46 +602,15 @@ private static void deleteManifests(FileIO io, List manifests) { .run(item -> io.deleteFile(item.path())); } - // Attempt to use Spark3 Catalog resolution if available on the path - private static final DynMethods.UnboundMethod LOAD_METADATA_TABLE = DynMethods.builder("loadMetadataTable") - .hiddenImpl("org.apache.iceberg.spark.Spark3Util", SparkSession.class, Table.class, MetadataTableType.class) - .orNoop() - .build(); - - public static Dataset loadCatalogMetadataTable(SparkSession spark, Table table, MetadataTableType type) { - Preconditions.checkArgument(!LOAD_METADATA_TABLE.isNoop(), "Cannot find Spark3Util class but Spark3 is in use"); - return LOAD_METADATA_TABLE.asStatic().invoke(spark, table, type); - } - public static Dataset loadMetadataTable(SparkSession spark, Table table, MetadataTableType type) { - if (spark.version().startsWith("3")) { - // construct the metadata table instance directly - Dataset catalogMetadataTable = loadCatalogMetadataTable(spark, table, type); - if (catalogMetadataTable != null) { - return catalogMetadataTable; - } - } - - String tableName = table.name(); - String tableLocation = table.location(); - - DataFrameReader dataFrameReader = spark.read().format("iceberg"); - if (tableName.contains("/")) { - // Hadoop Table or Metadata location passed, load without a catalog - return dataFrameReader.load(tableName + "#" + type); - } + return loadMetadataTable(spark, table, type, ImmutableMap.of()); + } - // Catalog based resolution failed, our catalog may be a non-DatasourceV2 Catalog - if (tableName.startsWith("hadoop.")) { - // Try loading by location as Hadoop table without Catalog - return dataFrameReader.load(tableLocation + "#" + type); - } else if (tableName.startsWith("hive")) { - // Try loading by name as a Hive table without Catalog - return dataFrameReader.load(tableName.replaceFirst("hive\\.", "") + "." + type); - } else { - throw new IllegalArgumentException(String.format( - "Cannot find the metadata table for %s of type %s", tableName, type)); - } + public static Dataset loadMetadataTable(SparkSession spark, Table table, MetadataTableType type, + Map extraOptions) { + SparkTable metadataTable = new SparkTable(MetadataTableUtils.createMetadataTableInstance(table, type), false); + CaseInsensitiveStringMap options = new CaseInsensitiveStringMap(extraOptions); + return Dataset.ofRows(spark, DataSourceV2Relation.create(metadataTable, Some.empty(), Some.empty(), options)); } /** From f1bd31fd07ecb9be6276dc5f2f64d12e0f562740 Mon Sep 17 00:00:00 2001 From: Anton Okolnychyi Date: Fri, 13 May 2022 12:03:25 -0700 Subject: [PATCH 2/2] Deprecate method --- .../java/org/apache/iceberg/spark/SparkTableUtil.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java index ed818c8ddc19..b2dbac381e73 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java @@ -602,6 +602,17 @@ private static void deleteManifests(FileIO io, List manifests) { .run(item -> io.deleteFile(item.path())); } + /** + * Loads a metadata table. + * + * @deprecated since 0.14.0, will be removed in 0.15.0; + * use {@link #loadMetadataTable(SparkSession, Table, MetadataTableType)}. + */ + @Deprecated + public static Dataset loadCatalogMetadataTable(SparkSession spark, Table table, MetadataTableType type) { + return loadMetadataTable(spark, table, type); + } + public static Dataset loadMetadataTable(SparkSession spark, Table table, MetadataTableType type) { return loadMetadataTable(spark, table, type, ImmutableMap.of()); }