diff --git a/iceberg/checkstyle/checkstyle.xml b/iceberg/checkstyle/checkstyle.xml
index a288af5de908..cdbe81056961 100644
--- a/iceberg/checkstyle/checkstyle.xml
+++ b/iceberg/checkstyle/checkstyle.xml
@@ -368,6 +368,11 @@
+
+
+
+
+
diff --git a/iceberg/iceberg-catalog/pom.xml b/iceberg/iceberg-catalog/pom.xml
index 3b41f8868705..d922b18af9bb 100644
--- a/iceberg/iceberg-catalog/pom.xml
+++ b/iceberg/iceberg-catalog/pom.xml
@@ -58,6 +58,10 @@
      <classifier>tests</classifier>
      <scope>test</scope>
    </dependency>
+    <dependency>
+      <groupId>org.assertj</groupId>
+      <artifactId>assertj-core</artifactId>
+    </dependency>
    <dependency>
      <groupId>org.junit.jupiter</groupId>
      <artifactId>junit-jupiter-api</artifactId>
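
The assertj-core test dependency added above backs the AssertJ-style exception assertions introduced in HiveTableTest further down. A minimal sketch of that pattern, with an illustrative exception type and message (not taken from the patch):

    import org.assertj.core.api.Assertions;

    class AssertJSketch {
      // Assert both the exception type and part of its message, instead of the
      // older @Test(expected = ...) style that only checks the type.
      void failsWithUsefulMessage() {
        Assertions.assertThatThrownBy(() -> {
              throw new IllegalStateException("metadata location changed concurrently");
            })
            .isInstanceOf(IllegalStateException.class)
            .hasMessageContaining("metadata location");
      }
    }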
diff --git a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveCatalog.java b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
index b97ff3daa8b3..d766695db04b 100644
--- a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
+++ b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
@@ -56,7 +56,9 @@
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.LocationUtil;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,12 +74,14 @@ public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespa
private FileIO fileIO;
private ClientPool clients;
private boolean listAllTables = false;
+ private Map<String, String> catalogProperties;
public HiveCatalog() {
}
@Override
public void initialize(String inputName, Map<String, String> properties) {
+ this.catalogProperties = ImmutableMap.copyOf(properties);
this.name = inputName;
if (conf == null) {
LOG.warn("No Hadoop Configuration was set, using the default environment Configuration");
@@ -89,7 +93,8 @@ public void initialize(String inputName, Map properties) {
}
if (properties.containsKey(CatalogProperties.WAREHOUSE_LOCATION)) {
- this.conf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, properties.get(CatalogProperties.WAREHOUSE_LOCATION));
+ this.conf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname,
+ LocationUtil.stripTrailingSlash(properties.get(CatalogProperties.WAREHOUSE_LOCATION)));
}
this.listAllTables = Boolean.parseBoolean(properties.getOrDefault(LIST_ALL_TABLES, LIST_ALL_TABLES_DEFAULT));
@@ -231,7 +236,11 @@ public void renameTable(TableIdentifier from, TableIdentifier originalTo) {
@Override
public org.apache.iceberg.Table registerTable(TableIdentifier identifier, String metadataFileLocation) {
- Preconditions.checkArgument(isValidIdentifier(identifier), "Invalid identifier: %s", identifier);
+ Preconditions.checkArgument(
+ identifier != null && isValidIdentifier(identifier), "Invalid identifier: %s", identifier);
+ Preconditions.checkArgument(
+ metadataFileLocation != null && !metadataFileLocation.isEmpty(),
+ "Cannot register an empty metadata file location as a table");
// Throw an exception if this table already exists in the catalog.
if (tableExists(identifier)) {
@@ -239,7 +248,7 @@ public org.apache.iceberg.Table registerTable(TableIdentifier identifier, String
}
TableOperations ops = newTableOps(identifier);
- InputFile metadataFile = fileIO.newInputFile(metadataFileLocation);
+ InputFile metadataFile = ops.io().newInputFile(metadataFileLocation);
TableMetadata metadata = TableMetadataParser.read(ops.io(), metadataFile);
ops.commit(null, metadata);
@@ -545,6 +554,11 @@ public Configuration getConf() {
return conf;
}
+ @Override
+ protected Map<String, String> properties() {
+ return catalogProperties == null ? ImmutableMap.of() : catalogProperties;
+ }
+
@VisibleForTesting
void setListAllTables(boolean listAllTables) {
this.listAllTables = listAllTables;
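
Taken together, the HiveCatalog changes above do three things: initialize() now keeps an immutable copy of the catalog properties (so table-default.* and table-override.* entries can flow into new tables via properties()), the configured warehouse location is normalized with LocationUtil.stripTrailingSlash, and registerTable() validates its arguments and reads the metadata file through the operations' own FileIO. A rough usage sketch under those assumptions; the property key and paths below are illustrative only:

    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.CatalogProperties;
    import org.apache.iceberg.catalog.TableIdentifier;
    import org.apache.iceberg.hive.HiveCatalog;
    import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

    class HiveCatalogSketch {
      // Illustrative only: the trailing slash on the warehouse path is stripped
      // before it reaches hive.metastore.warehouse.dir, and the table-default.*
      // entry is retained so it can be applied to newly created tables.
      static HiveCatalog newCatalog(Configuration conf) {
        HiveCatalog catalog = new HiveCatalog();
        catalog.setConf(conf);
        Map<String, String> props = ImmutableMap.of(
            CatalogProperties.WAREHOUSE_LOCATION, "s3://bucket/warehouse/",
            "table-default.write.format.default", "parquet");
        catalog.initialize("hive_catalog", props);
        return catalog;
      }

      // registerTable now rejects null identifiers and empty metadata locations
      // before loading the metadata file via the table operations' FileIO.
      static void register(HiveCatalog catalog, String metadataLocation) {
        catalog.registerTable(TableIdentifier.of("db", "tbl"), metadataLocation);
      }
    }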
diff --git a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
index 9fe617de8251..f44e1b9bfe02 100644
--- a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
+++ b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
@@ -37,10 +37,14 @@
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hive.iceberg.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.ClientPool;
+import org.apache.iceberg.PartitionSpecParser;
+import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.SortOrderParser;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.exceptions.AlreadyExistsException;
@@ -56,6 +60,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableBiMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.JsonUtil;
import org.apache.parquet.hadoop.ParquetOutputFormat;
import org.apache.thrift.TException;
import org.slf4j.Logger;
@@ -72,6 +77,12 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
private static final String HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES = "iceberg.hive.metadata-refresh-max-retries";
private static final int HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES_DEFAULT = 2;
+ // the max size is based on HMS backend database. For Hive versions below 2.3, the max table parameter size is 4000
+ // characters, see https://issues.apache.org/jira/browse/HIVE-12274
+ // set to 0 to not expose Iceberg metadata in HMS Table properties.
+ private static final String HIVE_TABLE_PROPERTY_MAX_SIZE = "iceberg.hive.table-property-max-size";
+ private static final long HIVE_TABLE_PROPERTY_MAX_SIZE_DEFAULT = 32672;
+
private static final BiMap<String, String> ICEBERG_TO_HMS_TRANSLATION = ImmutableBiMap.of(
// gc.enabled in Iceberg and external.table.purge in Hive are meant to do the same things but with different names
GC_ENABLED, "external.table.purge",
@@ -100,6 +111,7 @@ public static String translateToIcebergProp(String hmsProp) {
private final String database;
private final String tableName;
private final Configuration conf;
+ private final long maxHiveTablePropertySize;
private final int metadataRefreshMaxRetries;
private final FileIO fileIO;
private final ClientPool metaClients;
@@ -115,6 +127,7 @@ protected HiveTableOperations(Configuration conf, ClientPool metaClients, FileIO
this.tableName = table;
this.metadataRefreshMaxRetries =
conf.getInt(HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES, HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES_DEFAULT);
+ this.maxHiveTablePropertySize = conf.getLong(HIVE_TABLE_PROPERTY_MAX_SIZE, HIVE_TABLE_PROPERTY_MAX_SIZE_DEFAULT);
}
@Override
@@ -250,6 +263,8 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
} finally {
cleanupMetadataAndUnlock(commitStatus, newMetadataLocation, commitLock);
}
+
+ LOG.info("Committed to table {} with the new metadata location {}", fullName, newMetadataLocation);
}
@VisibleForTesting
@@ -341,9 +356,84 @@ private void setHmsTableParameters(String newMetadataLocation, Table tbl, TableM
parameters.put(StatsSetupConst.TOTAL_SIZE, summary.get(SnapshotSummary.TOTAL_FILE_SIZE_PROP));
}
+ setSnapshotStats(metadata, parameters);
+ setSchema(metadata, parameters);
+ setPartitionSpec(metadata, parameters);
+ setSortOrder(metadata, parameters);
+
tbl.setParameters(parameters);
}
+ @VisibleForTesting
+ void setSnapshotStats(TableMetadata metadata, Map<String, String> parameters) {
+ parameters.remove(TableProperties.CURRENT_SNAPSHOT_ID);
+ parameters.remove(TableProperties.CURRENT_SNAPSHOT_TIMESTAMP);
+ parameters.remove(TableProperties.CURRENT_SNAPSHOT_SUMMARY);
+
+ Snapshot currentSnapshot = metadata.currentSnapshot();
+ if (exposeInHmsProperties() && currentSnapshot != null) {
+ parameters.put(TableProperties.CURRENT_SNAPSHOT_ID, String.valueOf(currentSnapshot.snapshotId()));
+ parameters.put(TableProperties.CURRENT_SNAPSHOT_TIMESTAMP, String.valueOf(currentSnapshot.timestampMillis()));
+ setSnapshotSummary(parameters, currentSnapshot);
+ }
+
+ parameters.put(TableProperties.SNAPSHOT_COUNT, String.valueOf(metadata.snapshots().size()));
+ }
+
+ @VisibleForTesting
+ void setSnapshotSummary(Map<String, String> parameters, Snapshot currentSnapshot) {
+ try {
+ String summary = JsonUtil.mapper().writeValueAsString(currentSnapshot.summary());
+ if (summary.length() <= maxHiveTablePropertySize) {
+ parameters.put(TableProperties.CURRENT_SNAPSHOT_SUMMARY, summary);
+ } else {
+ LOG.warn("Not exposing the current snapshot({}) summary in HMS since it exceeds {} characters",
+ currentSnapshot.snapshotId(), maxHiveTablePropertySize);
+ }
+ } catch (JsonProcessingException e) {
+ LOG.warn("Failed to convert current snapshot({}) summary to a json string", currentSnapshot.snapshotId(), e);
+ }
+ }
+
+ @VisibleForTesting
+ void setSchema(TableMetadata metadata, Map<String, String> parameters) {
+ parameters.remove(TableProperties.CURRENT_SCHEMA);
+ if (exposeInHmsProperties() && metadata.schema() != null) {
+ String schema = SchemaParser.toJson(metadata.schema());
+ setField(parameters, TableProperties.CURRENT_SCHEMA, schema);
+ }
+ }
+
+ @VisibleForTesting
+ void setPartitionSpec(TableMetadata metadata, Map<String, String> parameters) {
+ parameters.remove(TableProperties.DEFAULT_PARTITION_SPEC);
+ if (exposeInHmsProperties() && metadata.spec() != null && metadata.spec().isPartitioned()) {
+ String spec = PartitionSpecParser.toJson(metadata.spec());
+ setField(parameters, TableProperties.DEFAULT_PARTITION_SPEC, spec);
+ }
+ }
+
+ @VisibleForTesting
+ void setSortOrder(TableMetadata metadata, Map<String, String> parameters) {
+ parameters.remove(TableProperties.DEFAULT_SORT_ORDER);
+ if (exposeInHmsProperties() && metadata.sortOrder() != null && metadata.sortOrder().isSorted()) {
+ String sortOrder = SortOrderParser.toJson(metadata.sortOrder());
+ setField(parameters, TableProperties.DEFAULT_SORT_ORDER, sortOrder);
+ }
+ }
+
+ private void setField(Map<String, String> parameters, String key, String value) {
+ if (value.length() <= maxHiveTablePropertySize) {
+ parameters.put(key, value);
+ } else {
+ LOG.warn("Not exposing {} in HMS since it exceeds {} characters", key, maxHiveTablePropertySize);
+ }
+ }
+
+ private boolean exposeInHmsProperties() {
+ return maxHiveTablePropertySize > 0;
+ }
+
private StorageDescriptor storageDescriptor(TableMetadata metadata, boolean hiveEngineEnabled) {
final StorageDescriptor storageDescriptor = new StorageDescriptor();
@@ -364,6 +454,7 @@ private StorageDescriptor storageDescriptor(TableMetadata metadata, boolean hive
return storageDescriptor;
}
+ @SuppressWarnings("ReverseDnsLookup")
@VisibleForTesting
HiveCommitLock createLock() throws UnknownHostException, TException, InterruptedException {
return new HiveCommitLock(conf, metaClients, catalogName, database, tableName);
@@ -377,8 +468,7 @@ private void cleanupMetadataAndUnlock(CommitStatus commitStatus, String metadata
io().deleteFile(metadataLocation);
}
} catch (RuntimeException e) {
- LOG.error("Fail to cleanup metadata file at {}", metadataLocation, e);
- throw e;
+ LOG.error("Failed to cleanup metadata file at {}", metadataLocation, e);
} finally {
doUnlock(lock);
}
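
With these HiveTableOperations changes, every successful commit mirrors the snapshot statistics, current schema, default partition spec and default sort order into the HMS table parameters, each value capped by iceberg.hive.table-property-max-size (default 32672 characters; 0 disables the feature). A hedged sketch of how an external tool might read those parameters back through the metastore client, assuming the TableProperties key constants used above:

    import java.util.Map;
    import org.apache.hadoop.hive.metastore.IMetaStoreClient;
    import org.apache.iceberg.TableProperties;

    class HmsMetadataSketch {
      // Illustrative only: after a commit, the Iceberg metadata that
      // HiveTableOperations mirrors into HMS can be read without loading the table.
      static void printIcebergInfo(IMetaStoreClient client, String db, String table) throws Exception {
        Map<String, String> params = client.getTable(db, table).getParameters();
        System.out.println("snapshot count:   " + params.get(TableProperties.SNAPSHOT_COUNT));
        System.out.println("current snapshot: " + params.get(TableProperties.CURRENT_SNAPSHOT_ID));
        System.out.println("schema (json):    " + params.get(TableProperties.CURRENT_SCHEMA));
        // Values longer than iceberg.hive.table-property-max-size are not written,
        // and setting that config to 0 suppresses all of these parameters.
      }
    }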
diff --git a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/HiveTableTest.java b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/HiveTableTest.java
index 777e7ea590aa..9654a24c49ec 100644
--- a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/HiveTableTest.java
+++ b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/HiveTableTest.java
@@ -21,12 +21,16 @@
import java.io.File;
import java.io.IOException;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
@@ -34,8 +38,10 @@
import org.apache.hive.iceberg.org.apache.avro.generic.GenericData;
import org.apache.hive.iceberg.org.apache.avro.generic.GenericRecordBuilder;
import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.BaseTable;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Files;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.ManifestFile;
@@ -49,13 +55,16 @@
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.hadoop.ConfigProperties;
+import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
import org.apache.thrift.TException;
+import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@@ -68,6 +77,8 @@
import static org.apache.iceberg.BaseMetastoreTableOperations.METADATA_LOCATION_PROP;
import static org.apache.iceberg.BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP;
import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP;
+import static org.apache.iceberg.TableMetadataParser.Codec;
+import static org.apache.iceberg.TableMetadataParser.getFileExtension;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
@@ -137,30 +148,7 @@ public void testDrop() {
public void testDropWithoutPurgeLeavesTableData() throws IOException {
Table table = catalog.loadTable(TABLE_IDENTIFIER);
- GenericRecordBuilder recordBuilder = new GenericRecordBuilder(AvroSchemaUtil.convert(schema, "test"));
- List<GenericData.Record> records = Lists.newArrayList(
- recordBuilder.set("id", 1L).build(),
- recordBuilder.set("id", 2L).build(),
- recordBuilder.set("id", 3L).build()
- );
-
- String fileLocation = table.location().replace("file:", "") + "/data/file.avro";
- try (FileAppender<GenericData.Record> writer = Avro.write(Files.localOutput(fileLocation))
- .schema(schema)
- .named("test")
- .build()) {
- for (GenericData.Record rec : records) {
- writer.add(rec);
- }
- }
-
- DataFile file = DataFiles.builder(table.spec())
- .withRecordCount(3)
- .withPath(fileLocation)
- .withFileSizeInBytes(Files.localInput(fileLocation).getLength())
- .build();
-
- table.newAppend().appendFile(file).commit();
+ String fileLocation = appendData(table, "file");
String manifestListLocation = table.currentSnapshot().manifestListLocation().replace("file:", "");
@@ -298,16 +286,18 @@ public void testColumnTypeChangeInMetastore() throws TException {
Assert.assertEquals("Schema should match expected", expectedSchema.asStruct(), icebergTable.schema().asStruct());
}
- @Test(expected = CommitFailedException.class)
+ @Test
public void testFailure() throws TException {
Table icebergTable = catalog.loadTable(TABLE_IDENTIFIER);
org.apache.hadoop.hive.metastore.api.Table table = metastoreClient.getTable(DB_NAME, TABLE_NAME);
String dummyLocation = "dummylocation";
table.getParameters().put(METADATA_LOCATION_PROP, dummyLocation);
metastoreClient.alter_table(DB_NAME, TABLE_NAME, table);
- icebergTable.updateSchema()
- .addColumn("data", Types.LongType.get())
- .commit();
+ Assertions.assertThatThrownBy(() -> icebergTable.updateSchema()
+ .addColumn("data", Types.LongType.get())
+ .commit())
+ .isInstanceOf(CommitFailedException.class)
+ .hasMessageContaining("is not same as the current table metadata location 'dummylocation'");
}
@Test
@@ -404,6 +394,83 @@ public void testRegisterTable() throws TException {
Assert.assertEquals(originalTable.getSd(), newTable.getSd());
}
+ @Test
+ public void testRegisterHadoopTableToHiveCatalog() throws IOException, TException {
+ // create a hadoop catalog
+ String tableLocation = tempFolder.newFolder().toString();
+ HadoopCatalog hadoopCatalog = new HadoopCatalog(new Configuration(), tableLocation);
+ // create table using hadoop catalog
+ TableIdentifier identifier = TableIdentifier.of(DB_NAME, "table1");
+ Table table = hadoopCatalog.createTable(identifier, schema, PartitionSpec.unpartitioned(), Maps.newHashMap());
+ // insert some data
+ String file1Location = appendData(table, "file1");
+ List<FileScanTask> tasks = Lists.newArrayList(table.newScan().planFiles());
+ Assert.assertEquals("Should scan 1 file", 1, tasks.size());
+ Assert.assertEquals(tasks.get(0).file().path(), file1Location);
+
+ // collect metadata file
+ List<String> metadataFiles =
+ Arrays.stream(new File(table.location() + "/metadata").listFiles())
+ .map(File::getAbsolutePath)
+ .filter(f -> f.endsWith(getFileExtension(Codec.NONE)))
+ .collect(Collectors.toList());
+ Assert.assertEquals(2, metadataFiles.size());
+
+ AssertHelpers.assertThrows(
+ "Hive metastore should not have this table", NoSuchObjectException.class,
+ "table not found",
+ () -> metastoreClient.getTable(DB_NAME, "table1"));
+ AssertHelpers.assertThrows(
+ "Hive catalog should fail to load the table", NoSuchTableException.class,
+ "Table does not exist:",
+ () -> catalog.loadTable(identifier));
+
+ // register the table to hive catalog using the latest metadata file
+ String latestMetadataFile = ((BaseTable) table).operations().current().metadataFileLocation();
+ catalog.registerTable(identifier, "file:" + latestMetadataFile);
+ Assert.assertNotNull(metastoreClient.getTable(DB_NAME, "table1"));
+
+ // load the table in hive catalog
+ table = catalog.loadTable(identifier);
+ Assert.assertNotNull(table);
+
+ // insert some data
+ String file2Location = appendData(table, "file2");
+ tasks = Lists.newArrayList(table.newScan().planFiles());
+ Assert.assertEquals("Should scan 2 files", 2, tasks.size());
+ Set<String> files = tasks.stream().map(task -> task.file().path().toString()).collect(Collectors.toSet());
+ Assert.assertTrue(files.contains(file1Location) && files.contains(file2Location));
+ }
+
+ private String appendData(Table table, String fileName) throws IOException {
+ GenericRecordBuilder recordBuilder = new GenericRecordBuilder(AvroSchemaUtil.convert(schema, "test"));
+ List<GenericData.Record> records = Lists.newArrayList(
+ recordBuilder.set("id", 1L).build(),
+ recordBuilder.set("id", 2L).build(),
+ recordBuilder.set("id", 3L).build()
+ );
+
+ String fileLocation = table.location().replace("file:", "") + "/data/" + fileName + ".avro";
+ try (FileAppender<GenericData.Record> writer = Avro.write(Files.localOutput(fileLocation))
+ .schema(schema)
+ .named("test")
+ .build()) {
+ for (GenericData.Record rec : records) {
+ writer.add(rec);
+ }
+ }
+
+ DataFile file = DataFiles.builder(table.spec())
+ .withRecordCount(3)
+ .withPath(fileLocation)
+ .withFileSizeInBytes(Files.localInput(fileLocation).getLength())
+ .build();
+
+ table.newAppend().appendFile(file).commit();
+
+ return fileLocation;
+ }
+
@Test
public void testRegisterExistingTable() throws TException {
org.apache.hadoop.hive.metastore.api.Table originalTable = metastoreClient.getTable(DB_NAME, TABLE_NAME);
diff --git a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java
index 6ba3a46a27a6..80891f6f489b 100644
--- a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java
+++ b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java
@@ -21,15 +21,29 @@
import java.util.List;
import java.util.Map;
+import java.util.UUID;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.CachingCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.SortOrderParser;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.Transaction;
+import org.apache.iceberg.UpdateSchema;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
@@ -42,6 +56,7 @@
import org.apache.iceberg.transforms.Transform;
import org.apache.iceberg.transforms.Transforms;
import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.JsonUtil;
import org.apache.thrift.TException;
import org.junit.Assert;
import org.junit.Rule;
@@ -51,7 +66,16 @@
import static org.apache.iceberg.NullOrder.NULLS_FIRST;
import static org.apache.iceberg.SortDirection.ASC;
+import static org.apache.iceberg.TableProperties.CURRENT_SCHEMA;
+import static org.apache.iceberg.TableProperties.CURRENT_SNAPSHOT_ID;
+import static org.apache.iceberg.TableProperties.CURRENT_SNAPSHOT_SUMMARY;
+import static org.apache.iceberg.TableProperties.CURRENT_SNAPSHOT_TIMESTAMP;
+import static org.apache.iceberg.TableProperties.DEFAULT_PARTITION_SPEC;
+import static org.apache.iceberg.TableProperties.DEFAULT_SORT_ORDER;
+import static org.apache.iceberg.expressions.Expressions.bucket;
import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class TestHiveCatalog extends HiveMetastoreTest {
private static ImmutableMap<String, String> meta = ImmutableMap.of(
@@ -220,7 +244,7 @@ public void testReplaceTxnBuilder() throws Exception {
}
@Test
- public void testCreateTableDefaultSortOrder() {
+ public void testCreateTableDefaultSortOrder() throws Exception {
Schema schema = new Schema(
required(1, "id", Types.IntegerType.get(), "unique ID"),
required(2, "data", Types.StringType.get())
@@ -234,13 +258,16 @@ public void testCreateTableDefaultSortOrder() {
Table table = catalog.createTable(tableIdent, schema, spec);
Assert.assertEquals("Order ID must match", 0, table.sortOrder().orderId());
Assert.assertTrue("Order must unsorted", table.sortOrder().isUnsorted());
+
+ Assert.assertFalse("Must not have default sort order in catalog",
+ hmsTableParameters().containsKey(DEFAULT_SORT_ORDER));
} finally {
catalog.dropTable(tableIdent);
}
}
@Test
- public void testCreateTableCustomSortOrder() {
+ public void testCreateTableCustomSortOrder() throws Exception {
Schema schema = new Schema(
required(1, "id", Types.IntegerType.get(), "unique ID"),
required(2, "data", Types.StringType.get())
@@ -265,6 +292,8 @@ public void testCreateTableCustomSortOrder() {
Assert.assertEquals("Null order must match ", NULLS_FIRST, sortOrder.fields().get(0).nullOrder());
Transform<?, ?> transform = Transforms.identity(Types.IntegerType.get());
Assert.assertEquals("Transform must match", transform, sortOrder.fields().get(0).transform());
+
+ Assert.assertEquals(SortOrderParser.toJson(table.sortOrder()), hmsTableParameters().get(DEFAULT_SORT_ORDER));
} finally {
catalog.dropTable(tableIdent);
}
@@ -457,15 +486,228 @@ public void testUUIDinTableProperties() throws Exception {
.withLocation(location)
.create();
- String tableName = tableIdentifier.name();
- org.apache.hadoop.hive.metastore.api.Table hmsTable =
- metastoreClient.getTable(tableIdentifier.namespace().level(0), tableName);
+ Assert.assertNotNull(hmsTableParameters().get(TableProperties.UUID));
+ } finally {
+ catalog.dropTable(tableIdentifier);
+ }
+ }
+
+ @Test
+ public void testSnapshotStatsTableProperties() throws Exception {
+ Schema schema = new Schema(
+ required(1, "id", Types.IntegerType.get(), "unique ID"),
+ required(2, "data", Types.StringType.get())
+ );
+ TableIdentifier tableIdentifier = TableIdentifier.of(DB_NAME, "tbl");
+ String location = temp.newFolder("tbl").toString();
+
+ try {
+ catalog.buildTable(tableIdentifier, schema)
+ .withLocation(location)
+ .create();
+
+ // check whether parameters are in expected state
+ Map<String, String> parameters = hmsTableParameters();
+ Assert.assertEquals("0", parameters.get(TableProperties.SNAPSHOT_COUNT));
+ Assert.assertNull(parameters.get(CURRENT_SNAPSHOT_SUMMARY));
+ Assert.assertNull(parameters.get(CURRENT_SNAPSHOT_ID));
+ Assert.assertNull(parameters.get(CURRENT_SNAPSHOT_TIMESTAMP));
+
+ // create a snapshot
+ Table icebergTable = catalog.loadTable(tableIdentifier);
+ String fileName = UUID.randomUUID().toString();
+ DataFile file = DataFiles.builder(icebergTable.spec())
+ .withPath(FileFormat.PARQUET.addExtension(fileName))
+ .withRecordCount(2)
+ .withFileSizeInBytes(0)
+ .build();
+ icebergTable.newFastAppend().appendFile(file).commit();
+
+ // check whether parameters are in expected state
+ parameters = hmsTableParameters();
+ Assert.assertEquals("1", parameters.get(TableProperties.SNAPSHOT_COUNT));
+ String summary = JsonUtil.mapper().writeValueAsString(icebergTable.currentSnapshot().summary());
+ Assert.assertEquals(summary, parameters.get(CURRENT_SNAPSHOT_SUMMARY));
+ long snapshotId = icebergTable.currentSnapshot().snapshotId();
+ Assert.assertEquals(String.valueOf(snapshotId), parameters.get(CURRENT_SNAPSHOT_ID));
+ Assert.assertEquals(String.valueOf(icebergTable.currentSnapshot().timestampMillis()),
+ parameters.get(CURRENT_SNAPSHOT_TIMESTAMP));
- // check parameters are in expected state
- Map<String, String> parameters = hmsTable.getParameters();
- Assert.assertNotNull(parameters.get(TableProperties.UUID));
} finally {
catalog.dropTable(tableIdentifier);
}
}
+
+ @Test
+ public void testSetSnapshotSummary() throws Exception {
+ Configuration conf = new Configuration();
+ conf.set("iceberg.hive.table-property-max-size", "4000");
+ HiveTableOperations ops = new HiveTableOperations(conf, null, null, catalog.name(), DB_NAME, "tbl");
+ Snapshot snapshot = mock(Snapshot.class);
+ Map<String, String> summary = Maps.newHashMap();
+ when(snapshot.summary()).thenReturn(summary);
+
+ // create a snapshot summary whose json string size is less than the limit
+ for (int i = 0; i < 100; i++) {
+ summary.put(String.valueOf(i), "value");
+ }
+ Assert.assertTrue(JsonUtil.mapper().writeValueAsString(summary).length() < 4000);
+ Map<String, String> parameters = Maps.newHashMap();
+ ops.setSnapshotSummary(parameters, snapshot);
+ Assert.assertEquals("The snapshot summary must be in parameters", 1, parameters.size());
+
+ // create a snapshot summary whose json string size exceeds the limit
+ for (int i = 0; i < 1000; i++) {
+ summary.put(String.valueOf(i), "value");
+ }
+ long summarySize = JsonUtil.mapper().writeValueAsString(summary).length();
+ // the limit has been updated to 4000 instead of the default value(32672)
+ Assert.assertTrue(summarySize > 4000 && summarySize < 32672);
+ parameters.remove(CURRENT_SNAPSHOT_SUMMARY);
+ ops.setSnapshotSummary(parameters, snapshot);
+ Assert.assertEquals("The snapshot summary must not be in parameters due to the size limit", 0, parameters.size());
+ }
+
+ @Test
+ public void testNotExposeTableProperties() {
+ Configuration conf = new Configuration();
+ conf.set("iceberg.hive.table-property-max-size", "0");
+ HiveTableOperations ops = new HiveTableOperations(conf, null, null, catalog.name(), DB_NAME, "tbl");
+ TableMetadata metadata = mock(TableMetadata.class);
+ Map<String, String> parameters = Maps.newHashMap();
+ parameters.put(CURRENT_SNAPSHOT_SUMMARY, "summary");
+ parameters.put(CURRENT_SNAPSHOT_ID, "snapshotId");
+ parameters.put(CURRENT_SNAPSHOT_TIMESTAMP, "timestamp");
+ parameters.put(CURRENT_SCHEMA, "schema");
+ parameters.put(DEFAULT_PARTITION_SPEC, "partitionSpec");
+ parameters.put(DEFAULT_SORT_ORDER, "sortOrder");
+
+ ops.setSnapshotStats(metadata, parameters);
+ Assert.assertNull(parameters.get(CURRENT_SNAPSHOT_SUMMARY));
+ Assert.assertNull(parameters.get(CURRENT_SNAPSHOT_ID));
+ Assert.assertNull(parameters.get(CURRENT_SNAPSHOT_TIMESTAMP));
+
+ ops.setSchema(metadata, parameters);
+ Assert.assertNull(parameters.get(CURRENT_SCHEMA));
+
+ ops.setPartitionSpec(metadata, parameters);
+ Assert.assertNull(parameters.get(DEFAULT_PARTITION_SPEC));
+
+ ops.setSortOrder(metadata, parameters);
+ Assert.assertNull(parameters.get(DEFAULT_SORT_ORDER));
+ }
+
+ @Test
+ public void testSetDefaultPartitionSpec() throws Exception {
+ Schema schema = new Schema(
+ required(1, "id", Types.IntegerType.get(), "unique ID"),
+ required(2, "data", Types.StringType.get())
+ );
+ TableIdentifier tableIdent = TableIdentifier.of(DB_NAME, "tbl");
+
+ try {
+ Table table = catalog.buildTable(tableIdent, schema).create();
+ Assert.assertFalse("Must not have default partition spec",
+ hmsTableParameters().containsKey(TableProperties.DEFAULT_PARTITION_SPEC));
+
+ table.updateSpec().addField(bucket("data", 16)).commit();
+ Assert.assertEquals(PartitionSpecParser.toJson(table.spec()),
+ hmsTableParameters().get(TableProperties.DEFAULT_PARTITION_SPEC));
+ } finally {
+ catalog.dropTable(tableIdent);
+ }
+ }
+
+ @Test
+ public void testSetCurrentSchema() throws Exception {
+ Schema schema = new Schema(
+ required(1, "id", Types.IntegerType.get(), "unique ID"),
+ required(2, "data", Types.StringType.get())
+ );
+ TableIdentifier tableIdent = TableIdentifier.of(DB_NAME, "tbl");
+
+ try {
+ Table table = catalog.buildTable(tableIdent, schema).create();
+
+ Assert.assertEquals(SchemaParser.toJson(table.schema()), hmsTableParameters().get(CURRENT_SCHEMA));
+
+ // add many new fields to make the schema json string exceed the limit
+ UpdateSchema updateSchema = table.updateSchema();
+ for (int i = 0; i < 600; i++) {
+ updateSchema.addColumn("new_col_" + i, Types.StringType.get());
+ }
+ updateSchema.commit();
+
+ Assert.assertTrue(SchemaParser.toJson(table.schema()).length() > 32672);
+ Assert.assertNull(hmsTableParameters().get(CURRENT_SCHEMA));
+ } finally {
+ catalog.dropTable(tableIdent);
+ }
+ }
+
+ private Map<String, String> hmsTableParameters() throws TException {
+ org.apache.hadoop.hive.metastore.api.Table hmsTable = metastoreClient.getTable(DB_NAME, "tbl");
+ return hmsTable.getParameters();
+ }
+
+ @Test
+ public void testConstructorWarehousePathWithEndSlash() {
+ HiveCatalog catalogWithSlash = new HiveCatalog();
+ String wareHousePath = "s3://bucket/db/tbl";
+
+ catalogWithSlash.initialize("hive_catalog", ImmutableMap.of(CatalogProperties.WAREHOUSE_LOCATION,
+ wareHousePath + "/"));
+ Assert.assertEquals("Should have trailing slash stripped", wareHousePath, catalogWithSlash.getConf().get(
+ HiveConf.ConfVars.METASTOREWAREHOUSE.varname));
+ }
+
+ @Test
+ public void testTablePropsDefinedAtCatalogLevel() {
+ Schema schema = new Schema(
+ required(1, "id", Types.IntegerType.get(), "unique ID")
+ );
+ TableIdentifier tableIdent = TableIdentifier.of(DB_NAME, "tbl");
+
+ ImmutableMap<String, String> catalogProps = ImmutableMap.of(
+ "table-default.key1", "catalog-default-key1",
+ "table-default.key2", "catalog-default-key2",
+ "table-default.key3", "catalog-default-key3",
+ "table-override.key3", "catalog-override-key3",
+ "table-override.key4", "catalog-override-key4");
+ Catalog hiveCatalog = CatalogUtil.loadCatalog(HiveCatalog.class.getName(),
+ CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE, catalogProps, hiveConf);
+
+ try {
+ Table table = hiveCatalog.buildTable(tableIdent, schema)
+ .withProperty("key2", "table-key2")
+ .withProperty("key3", "table-key3")
+ .withProperty("key5", "table-key5")
+ .create();
+
+ Assert.assertEquals(
+ "Table defaults set for the catalog must be added to the table properties.",
+ "catalog-default-key1",
+ table.properties().get("key1"));
+ Assert.assertEquals(
+ "Table property must override table default properties set at catalog level.",
+ "table-key2",
+ table.properties().get("key2"));
+ Assert.assertEquals(
+ "Table property override set at catalog level must override table default" +
+ " properties set at catalog level and table property specified.",
+ "catalog-override-key3",
+ table.properties().get("key3"));
+ Assert.assertEquals(
+ "Table override not in table props or defaults should be added to table properties",
+ "catalog-override-key4",
+ table.properties().get("key4"));
+ Assert.assertEquals(
+ "Table properties without any catalog level default or override should be added to table" +
+ " properties.",
+ "table-key5",
+ table.properties().get("key5"));
+ } finally {
+ hiveCatalog.dropTable(tableIdent);
+ }
+ }
}
diff --git a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java
index 293dd5010cd1..876fefc0ac72 100644
--- a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java
+++ b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java
@@ -44,10 +44,10 @@
import org.junit.BeforeClass;
import org.junit.Test;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
diff --git a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCommits.java b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCommits.java
index 1afe98d81b7e..00e0239fea61 100644
--- a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCommits.java
+++ b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCommits.java
@@ -36,8 +36,8 @@
import org.junit.Assert;
import org.junit.Test;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyBoolean;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;
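
Both test classes above switch their static imports from org.mockito.Matchers, which has been deprecated since Mockito 2 and dropped in newer releases, to org.mockito.Mockito, which exposes the same matchers through ArgumentMatchers. A small illustrative sketch (not from the patch) of the surviving pattern:

    import static org.mockito.Mockito.any;
    import static org.mockito.Mockito.eq;
    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.verify;

    import java.util.List;

    class MatcherSketch {
      // Mockito.any()/eq() delegate to ArgumentMatchers and replace the
      // deprecated org.mockito.Matchers imports removed above.
      @SuppressWarnings("unchecked")
      static void verifyAdd() {
        List<String> list = mock(List.class);
        list.add(0, "value");
        verify(list).add(eq(0), any(String.class));
      }
    }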
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
index b00017726b16..712e993ba58b 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
@@ -972,7 +972,7 @@ public void testIcebergAndHmsTableProperties() throws Exception {
Assert.assertEquals(expectedIcebergProperties, icebergTable.properties());
if (Catalogs.hiveCatalog(shell.getHiveConf(), tableProperties)) {
- Assert.assertEquals(11, hmsParams.size());
+ Assert.assertEquals(13, hmsParams.size());
Assert.assertEquals("initial_val", hmsParams.get("custom_property"));
Assert.assertEquals("TRUE", hmsParams.get("EXTERNAL"));
Assert.assertEquals("true", hmsParams.get(TableProperties.ENGINE_HIVE_ENABLED));
@@ -1009,7 +1009,7 @@ public void testIcebergAndHmsTableProperties() throws Exception {
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
if (Catalogs.hiveCatalog(shell.getHiveConf(), tableProperties)) {
- Assert.assertEquals(14, hmsParams.size()); // 2 newly-added properties + previous_metadata_location prop
+ Assert.assertEquals(16, hmsParams.size()); // 2 newly-added properties + previous_metadata_location prop
Assert.assertEquals("true", hmsParams.get("new_prop_1"));
Assert.assertEquals("false", hmsParams.get("new_prop_2"));
Assert.assertEquals("new_val", hmsParams.get("custom_property"));