Merged
@@ -28,8 +28,6 @@
 import java.util.stream.Collectors;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
-import org.apache.iceberg.MetadataTableType;
-import org.apache.iceberg.MetadataTableUtils;
 import org.apache.iceberg.NullOrder;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
@@ -60,8 +58,6 @@
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.ByteBuffers;
 import org.apache.iceberg.util.Pair;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.CatalystTypeConverters;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
@@ -81,13 +77,11 @@
 import org.apache.spark.sql.execution.datasources.FileStatusCache;
 import org.apache.spark.sql.execution.datasources.InMemoryFileIndex;
 import org.apache.spark.sql.execution.datasources.PartitionDirectory;
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation;
 import org.apache.spark.sql.types.IntegerType;
 import org.apache.spark.sql.types.LongType;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 import scala.Option;
-import scala.Some;
 import scala.collection.JavaConverters;
 import scala.collection.immutable.Seq;

@@ -617,20 +611,6 @@ private static String sqlString(org.apache.iceberg.expressions.Literal<?> lit) {
     }
   }

-  /**
-   * Returns a metadata table as a Dataset based on the given Iceberg table.
-   *
-   * @param spark SparkSession where the Dataset will be created
-   * @param table an Iceberg table
-   * @param type the type of metadata table
-   * @return a Dataset that will read the metadata table
-   */
-  private static Dataset<Row> loadMetadataTable(SparkSession spark, org.apache.iceberg.Table table,
-                                                MetadataTableType type) {
-    Table metadataTable = new SparkTable(MetadataTableUtils.createMetadataTableInstance(table, type), false);
-    return Dataset.ofRows(spark, DataSourceV2Relation.create(metadataTable, Some.empty(), Some.empty()));
-  }
-
   /**
    * Returns an Iceberg Table by its name from a Spark V2 Catalog. If cache is enabled in {@link SparkCatalog},
    * the {@link TableOperations} of the table may be stale, please refresh the table to get the latest one.
[Second file in the PR diff]

@@ -37,11 +37,11 @@
 import org.apache.iceberg.ManifestFiles;
 import org.apache.iceberg.ManifestWriter;
 import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.MetadataTableUtils;
 import org.apache.iceberg.MetricsConfig;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.common.DynMethods;
 import org.apache.iceberg.data.TableMigrationUtil;
 import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.hadoop.SerializableConfiguration;
@@ -55,8 +55,10 @@
 import org.apache.iceberg.relocated.com.google.common.base.Objects;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.source.SparkTable;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.Tasks;
 import org.apache.spark.TaskContext;
@@ -67,7 +69,6 @@
 import org.apache.spark.api.java.function.MapPartitionsFunction;
 import org.apache.spark.sql.AnalysisException;
 import org.apache.spark.sql.Column;
-import org.apache.spark.sql.DataFrameReader;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
@@ -83,6 +84,8 @@
 import org.apache.spark.sql.catalyst.expressions.NamedExpression;
 import org.apache.spark.sql.catalyst.parser.ParseException;
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Function2;
@@ -599,46 +602,26 @@ private static void deleteManifests(FileIO io, List<ManifestFile> manifests) {
         .run(item -> io.deleteFile(item.path()));
   }

-  // Attempt to use Spark3 Catalog resolution if available on the path
-  private static final DynMethods.UnboundMethod LOAD_METADATA_TABLE = DynMethods.builder("loadMetadataTable")
-      .hiddenImpl("org.apache.iceberg.spark.Spark3Util", SparkSession.class, Table.class, MetadataTableType.class)
-      .orNoop()
-      .build();

+  /**
+   * Loads a metadata table.
+   *
+   * @deprecated since 0.14.0, will be removed in 0.15.0;
+   *             use {@link #loadMetadataTable(SparkSession, Table, MetadataTableType)}.
+   */
+  @Deprecated
   public static Dataset<Row> loadCatalogMetadataTable(SparkSession spark, Table table, MetadataTableType type) {
[Review thread on loadCatalogMetadataTable]
Member: Just flagging that this removes a public method, in case anyone may be using it (though I feel there should not be).
Contributor (author): Good point, I missed it. I wonder why we made it public, though. I'd probably remove the method, but I don't mind deprecating it and delegating to the one below if folks think this may impact anyone.
Member: I think we should probably deprecate it for at least one release before dropping it.
Contributor (@flyrain, May 13, 2022): Yeah, this is a grey area: these methods actually are APIs, but they are not considered APIs. Per our community sync, only the public things in the api module are considered APIs. We need something (e.g. an annotation) to mark these methods as APIs. Otherwise, the discussion of which public methods should be deprecated and which shouldn't will keep going.
Contributor (author): Sounds good to me.
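To make the annotation idea above concrete, here is a minimal sketch of what such a marker could look like. It is purely hypothetical: the name PublicApi and its retention are invented for illustration, not taken from the Iceberg codebase.

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical marker: public members outside the api module that are still
// treated as stable API and therefore need a deprecation cycle before removal.
@Documented
@Retention(RetentionPolicy.CLASS)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface PublicApi {
}

A build-time check could then flag any removal of a @PublicApi member that skipped the deprecation step, ending the case-by-case debate described above.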

-    Preconditions.checkArgument(!LOAD_METADATA_TABLE.isNoop(), "Cannot find Spark3Util class but Spark3 is in use");
-    return LOAD_METADATA_TABLE.asStatic().invoke(spark, table, type);
+    return loadMetadataTable(spark, table, type);
   }

   public static Dataset<Row> loadMetadataTable(SparkSession spark, Table table, MetadataTableType type) {
-    if (spark.version().startsWith("3")) {
-      // construct the metadata table instance directly
-      Dataset<Row> catalogMetadataTable = loadCatalogMetadataTable(spark, table, type);
-      if (catalogMetadataTable != null) {
-        return catalogMetadataTable;
-      }
-    }
-
-    String tableName = table.name();
-    String tableLocation = table.location();
-
-    DataFrameReader dataFrameReader = spark.read().format("iceberg");
-    if (tableName.contains("/")) {
-      // Hadoop Table or Metadata location passed, load without a catalog
-      return dataFrameReader.load(tableName + "#" + type);
-    }
+    return loadMetadataTable(spark, table, type, ImmutableMap.of());
   }

-    // Catalog based resolution failed, our catalog may be a non-DatasourceV2 Catalog
-    if (tableName.startsWith("hadoop.")) {
-      // Try loading by location as Hadoop table without Catalog
-      return dataFrameReader.load(tableLocation + "#" + type);
-    } else if (tableName.startsWith("hive")) {
-      // Try loading by name as a Hive table without Catalog
-      return dataFrameReader.load(tableName.replaceFirst("hive\\.", "") + "." + type);
-    } else {
-      throw new IllegalArgumentException(String.format(
-          "Cannot find the metadata table for %s of type %s", tableName, type));
-    }
+  public static Dataset<Row> loadMetadataTable(SparkSession spark, Table table, MetadataTableType type,
+                                               Map<String, String> extraOptions) {
+    SparkTable metadataTable = new SparkTable(MetadataTableUtils.createMetadataTableInstance(table, type), false);
[Review thread on the new implementation]
Contributor (author): Constructing DataSourceV2Relation directly looks like the easiest and safest option. Let me know if I missed a use case where we still need the old code.
Contributor (author): We had this code in that private method in Spark3Util; I just moved it.
+    CaseInsensitiveStringMap options = new CaseInsensitiveStringMap(extraOptions);
+    return Dataset.ofRows(spark, DataSourceV2Relation.create(metadataTable, Some.empty(), Some.empty(), options));
   }
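For reference, a caller-side sketch of the new overload. The enclosing class name SparkTableUtil is an assumption (the extract never names the file), and "split-size" is one of Iceberg's Spark read options, used here only as an example of an extra option:

import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.spark.SparkTableUtil; // assumed location of the helpers
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

class MetadataTableUsageExample {
  // Load the FILES metadata table of an Iceberg table, forwarding a read
  // option through the new extraOptions parameter.
  static Dataset<Row> filesTable(SparkSession spark, Table table) {
    return SparkTableUtil.loadMetadataTable(
        spark, table, MetadataTableType.FILES,
        ImmutableMap.of("split-size", "134217728")); // 128 MB splits
  }
}

Passing options this way hands them to the scan through DataSourceV2Relation, which the old string-based dataFrameReader.load(...) paths could not do uniformly across Hive and Hadoop tables.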