Merged · 26 commits
dc20b7f
Port: Hive: Expose Snapshot Stats in HMS (apache#4456)
flyrain Apr 8, 2022
c27c4d3
Port: Hive: Expose default partition spec and sort order in HMS (apac…
flyrain Apr 26, 2022
f679290
Port: Hive: Log new metadata location in commit (apache#4681)
flyrain May 4, 2022
16ed540
Port: Core, AWS: Remove throw in finally block (apache#5222)
nastra Jul 12, 2022
f61ac71
Fix failing qtests - first one
zsmiskolczi Jan 4, 2023
a872434
Rebase based on the Iceberg 1.1.0 upgrade on master
zsmiskolczi Jan 4, 2023
179dacb
Fix whitespaces ++
zsmiskolczi Jan 5, 2023
fae44b0
Try to fix whitespaces N+1
zsmiskolczi Jan 5, 2023
6989620
Flink: Add SupportsRowPosition to Avro reader to fix position deletes…
openinx Nov 14, 2021
55a6590
Core: Deprecate functions in DeleteWriters (#5771)
Sep 28, 2022
14a32be
Core: Reuse PositionDelete (#5896)
nastra Oct 4, 2022
051cfa9
API: Provide better error message for invalid FileFormat enum (#5918)
nastra Oct 11, 2022
2c40600
Core: Use CharSequenceSet instead of Set<CharSequence> (#2712)
nastra Jun 22, 2021
d9e3b50
Hive: Hadoop Path fails on s3 endpoint (#5405)
Fokko Aug 4, 2022
8cbd477
Core: Support deleting tables without metadata files (#5510)
yabola Sep 1, 2022
e26b6f9
Port: Core: Strip trailing slashes in location consistently (#4585)
singhpk234 Apr 20, 2022
f4e0e4e
Port: Core: Allow controlling table properties through catalog config
SinghAsDev May 17, 2022
0ef7166
Port: Core: Use ImmutableMap for catalog properties
zsmiskolczi Jan 15, 2023
b27f36a
Port: Core: Implement BaseMetastoreCatalog.registerTable()
Mehul2500 Jul 22, 2022
065cc65
Fix failing qtest
zsmiskolczi Jan 18, 2023
5845317
Fix failing test
zsmiskolczi Jan 18, 2023
9fdb7bd
Fix qtest outputs
zsmiskolczi Feb 16, 2023
181a428
Fix failing qtests
zsmiskolczi Feb 20, 2023
7cc91e6
Fix failing qtests n+1
zsmiskolczi Feb 20, 2023
5d257f5
Failing qtest - remove trailing whitespaces
zsmiskolczi Feb 21, 2023
df80d40
Merge branch 'master' into HIVE-26808_Port_Iceberg_catalog_changes
InvisibleProgrammer Feb 21, 2023
38 changes: 38 additions & 0 deletions core/src/main/java/org/apache/iceberg/util/LocationUtil.java
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.util;

import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

public class LocationUtil {
private LocationUtil() {

}

public static String stripTrailingSlash(String path) {
Preconditions.checkArgument(path != null && path.length() > 0, "path must not be null or empty");

String result = path;
while (result.endsWith("/")) {
result = result.substring(0, result.length() - 1);
}
return result;
}
}
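
As a quick illustration of the new helper, here is a minimal AssertJ/JUnit sketch of stripTrailingSlash's behavior. The test class name is hypothetical and not part of this change; assertj-core and junit are available in test scope per the pom.xml change below.

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import org.apache.iceberg.util.LocationUtil;
import org.junit.Test;

public class TestLocationUtilSketch {

  @Test
  public void stripTrailingSlashRemovesAllTrailingSlashes() {
    // Single and repeated trailing slashes are removed.
    assertThat(LocationUtil.stripTrailingSlash("s3://bucket/warehouse/")).isEqualTo("s3://bucket/warehouse");
    assertThat(LocationUtil.stripTrailingSlash("s3://bucket/warehouse///")).isEqualTo("s3://bucket/warehouse");

    // Locations without a trailing slash are returned unchanged.
    assertThat(LocationUtil.stripTrailingSlash("hdfs://nn:8020/warehouse")).isEqualTo("hdfs://nn:8020/warehouse");
  }

  @Test
  public void stripTrailingSlashRejectsNullOrEmpty() {
    // The Preconditions check rejects null and empty input.
    assertThatThrownBy(() -> LocationUtil.stripTrailingSlash(""))
        .isInstanceOf(IllegalArgumentException.class)
        .hasMessageContaining("path must not be null or empty");
  }
}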
6 changes: 6 additions & 0 deletions iceberg/iceberg-catalog/pom.xml
@@ -42,6 +42,12 @@
<artifactId>hadoop-hdfs</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>${assertj.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
HiveCatalog.java
@@ -36,27 +36,27 @@
import org.apache.hadoop.hive.metastore.api.UnknownDBException;
import org.apache.iceberg.BaseMetastoreCatalog;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.ClientPool;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.LocationUtil;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,12 +72,14 @@ public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespa
private FileIO fileIO;
private ClientPool<IMetaStoreClient, TException> clients;
private boolean listAllTables = false;
private Map<String, String> catalogProperties;

public HiveCatalog() {
}

@Override
public void initialize(String inputName, Map<String, String> properties) {
this.catalogProperties = ImmutableMap.copyOf(properties);
this.name = inputName;
if (conf == null) {
LOG.warn("No Hadoop Configuration was set, using the default environment Configuration");
@@ -89,7 +91,8 @@ public void initialize(String inputName, Map<String, String> properties) {
}

if (properties.containsKey(CatalogProperties.WAREHOUSE_LOCATION)) {
this.conf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, properties.get(CatalogProperties.WAREHOUSE_LOCATION));
this.conf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname,
LocationUtil.stripTrailingSlash(properties.get(CatalogProperties.WAREHOUSE_LOCATION)));
}

this.listAllTables = Boolean.parseBoolean(properties.getOrDefault(LIST_ALL_TABLES, LIST_ALL_TABLES_DEFAULT));
@@ -152,11 +155,16 @@ public boolean dropTable(TableIdentifier identifier, boolean purge) {
String database = identifier.namespace().level(0);

TableOperations ops = newTableOps(identifier);
TableMetadata lastMetadata;
if (purge && ops.current() != null) {
lastMetadata = ops.current();
} else {
lastMetadata = null;
TableMetadata lastMetadata = null;
if (purge) {
try {
lastMetadata = ops.current();
} catch (NotFoundException e) {
LOG.warn(
"Failed to load table metadata for table: {}, continuing drop without purge",
identifier,
e);
}
}

try {
@@ -229,23 +237,6 @@ public void renameTable(TableIdentifier from, TableIdentifier originalTo) {
}
}

@Override
public org.apache.iceberg.Table registerTable(TableIdentifier identifier, String metadataFileLocation) {
Preconditions.checkArgument(isValidIdentifier(identifier), "Invalid identifier: %s", identifier);

// Throw an exception if this table already exists in the catalog.
if (tableExists(identifier)) {
throw new org.apache.iceberg.exceptions.AlreadyExistsException("Table already exists: %s", identifier);
}

TableOperations ops = newTableOps(identifier);
InputFile metadataFile = fileIO.newInputFile(metadataFileLocation);
TableMetadata metadata = TableMetadataParser.read(ops.io(), metadataFile);
ops.commit(null, metadata);

return new BaseTable(ops, identifier.toString());
}

@Override
public void createNamespace(Namespace namespace, Map<String, String> meta) {
Preconditions.checkArgument(
@@ -465,19 +456,17 @@ protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
throw new RuntimeException("Interrupted during commit", e);
}

// Otherwise stick to the {WAREHOUSE_DIR}/{DB_NAME}.db/{TABLE_NAME} path
String warehouseLocation = getWarehouseLocation();
return String.format(
"%s/%s.db/%s",
warehouseLocation,
tableIdentifier.namespace().levels()[0],
tableIdentifier.name());
// Otherwise, stick to the {WAREHOUSE_DIR}/{DB_NAME}.db/{TABLE_NAME} path
String databaseLocation = databaseLocation(tableIdentifier.namespace().levels()[0]);
return String.format("%s/%s", databaseLocation, tableIdentifier.name());
}

private String getWarehouseLocation() {
private String databaseLocation(String databaseName) {
String warehouseLocation = conf.get(HiveConf.ConfVars.METASTOREWAREHOUSE.varname);
Preconditions.checkNotNull(warehouseLocation, "Warehouse location is not set: hive.metastore.warehouse.dir=null");
return warehouseLocation;
Preconditions.checkNotNull(
warehouseLocation, "Warehouse location is not set: hive.metastore.warehouse.dir=null");
warehouseLocation = LocationUtil.stripTrailingSlash(warehouseLocation);
return String.format("%s/%s.db", warehouseLocation, databaseName);
}

private String getExternalWarehouseLocation() {
@@ -510,7 +499,7 @@ Database convertToDatabase(Namespace namespace, Map<String, String> meta) {

database.setName(namespace.level(0));
database.setLocationUri(new Path(getExternalWarehouseLocation(), namespace.level(0)).toString() + ".db");
database.setManagedLocationUri(new Path(getWarehouseLocation(), namespace.level(0)).toString() + ".db");
database.setManagedLocationUri(databaseLocation(namespace.level(0)));

meta.forEach((key, value) -> {
if (key.equals("comment")) {
@@ -545,6 +534,11 @@ public Configuration getConf() {
return conf;
}

@Override
protected Map<String, String> properties() {
return catalogProperties == null ? ImmutableMap.of() : catalogProperties;
}

@VisibleForTesting
void setListAllTables(boolean listAllTables) {
this.listAllTables = listAllTables;
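
To make the HiveCatalog changes above concrete, here is a minimal sketch of how initialize() now treats the warehouse location and catalog properties. The import path and the Configurable setConf call are assumed to match upstream Iceberg; the catalog name "sketch" and the bucket path are made up for illustration.

import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

public class HiveCatalogInitSketch {

  public static void main(String[] args) {
    Map<String, String> props = ImmutableMap.of(
        // Note the trailing slash on the configured warehouse location.
        CatalogProperties.WAREHOUSE_LOCATION, "s3a://bucket/warehouse/");

    HiveCatalog catalog = new HiveCatalog();
    catalog.setConf(new Configuration());
    catalog.initialize("sketch", props);

    // The trailing slash is stripped before the value reaches hive.metastore.warehouse.dir,
    // so default table locations come out as {warehouse}/{db}.db/{table} with no double slash.
    System.out.println(catalog.getConf().get("hive.metastore.warehouse.dir"));

    // initialize() also keeps an ImmutableMap copy of the incoming properties, which the new
    // properties() override returns instead of an empty map.
  }
}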
HiveTableOperations.java
@@ -37,10 +37,14 @@
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hive.iceberg.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.ClientPool;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.SortOrderParser;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.exceptions.AlreadyExistsException;
@@ -56,6 +60,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableBiMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.JsonUtil;
import org.apache.parquet.hadoop.ParquetOutputFormat;
import org.apache.thrift.TException;
import org.slf4j.Logger;
@@ -72,6 +77,12 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
private static final String HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES = "iceberg.hive.metadata-refresh-max-retries";
private static final int HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES_DEFAULT = 2;

// the max size is based on HMS backend database. For Hive versions below 2.3, the max table parameter size is 4000
// characters, see https://issues.apache.org/jira/browse/HIVE-12274
// set to 0 to not expose Iceberg metadata in HMS Table properties.
private static final String HIVE_TABLE_PROPERTY_MAX_SIZE = "iceberg.hive.table-property-max-size";
private static final long HIVE_TABLE_PROPERTY_MAX_SIZE_DEFAULT = 32672;

private static final BiMap<String, String> ICEBERG_TO_HMS_TRANSLATION = ImmutableBiMap.of(
// gc.enabled in Iceberg and external.table.purge in Hive are meant to do the same things but with different names
GC_ENABLED, "external.table.purge",
@@ -100,6 +111,7 @@ public static String translateToIcebergProp(String hmsProp) {
private final String database;
private final String tableName;
private final Configuration conf;
private final long maxHiveTablePropertySize;
private final int metadataRefreshMaxRetries;
private final FileIO fileIO;
private final ClientPool<IMetaStoreClient, TException> metaClients;
@@ -115,6 +127,7 @@ protected HiveTableOperations(Configuration conf, ClientPool metaClients, FileIO
this.tableName = table;
this.metadataRefreshMaxRetries =
conf.getInt(HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES, HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES_DEFAULT);
this.maxHiveTablePropertySize = conf.getLong(HIVE_TABLE_PROPERTY_MAX_SIZE, HIVE_TABLE_PROPERTY_MAX_SIZE_DEFAULT);
}

@Override
Expand Down Expand Up @@ -250,6 +263,8 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
} finally {
cleanupMetadataAndUnlock(commitStatus, newMetadataLocation, commitLock);
}

LOG.info("Committed to table {} with the new metadata location {}", fullName, newMetadataLocation);
}

@VisibleForTesting
Expand Down Expand Up @@ -341,9 +356,84 @@ private void setHmsTableParameters(String newMetadataLocation, Table tbl, TableM
parameters.put(StatsSetupConst.TOTAL_SIZE, summary.get(SnapshotSummary.TOTAL_FILE_SIZE_PROP));
}

setSnapshotStats(metadata, parameters);
setSchema(metadata, parameters);
setPartitionSpec(metadata, parameters);
setSortOrder(metadata, parameters);

tbl.setParameters(parameters);
}

@VisibleForTesting
void setSnapshotStats(TableMetadata metadata, Map<String, String> parameters) {
parameters.remove(TableProperties.CURRENT_SNAPSHOT_ID);
parameters.remove(TableProperties.CURRENT_SNAPSHOT_TIMESTAMP);
parameters.remove(TableProperties.CURRENT_SNAPSHOT_SUMMARY);

Snapshot currentSnapshot = metadata.currentSnapshot();
if (exposeInHmsProperties() && currentSnapshot != null) {
parameters.put(TableProperties.CURRENT_SNAPSHOT_ID, String.valueOf(currentSnapshot.snapshotId()));
parameters.put(TableProperties.CURRENT_SNAPSHOT_TIMESTAMP, String.valueOf(currentSnapshot.timestampMillis()));
setSnapshotSummary(parameters, currentSnapshot);
}

parameters.put(TableProperties.SNAPSHOT_COUNT, String.valueOf(metadata.snapshots().size()));
}

@VisibleForTesting
void setSnapshotSummary(Map<String, String> parameters, Snapshot currentSnapshot) {
try {
String summary = JsonUtil.mapper().writeValueAsString(currentSnapshot.summary());
if (summary.length() <= maxHiveTablePropertySize) {
parameters.put(TableProperties.CURRENT_SNAPSHOT_SUMMARY, summary);
} else {
LOG.warn("Not exposing the current snapshot({}) summary in HMS since it exceeds {} characters",
currentSnapshot.snapshotId(), maxHiveTablePropertySize);
}
} catch (JsonProcessingException e) {
LOG.warn("Failed to convert current snapshot({}) summary to a json string", currentSnapshot.snapshotId(), e);
}
}

@VisibleForTesting
void setSchema(TableMetadata metadata, Map<String, String> parameters) {
parameters.remove(TableProperties.CURRENT_SCHEMA);
if (exposeInHmsProperties() && metadata.schema() != null) {
String schema = SchemaParser.toJson(metadata.schema());
setField(parameters, TableProperties.CURRENT_SCHEMA, schema);
}
}

@VisibleForTesting
void setPartitionSpec(TableMetadata metadata, Map<String, String> parameters) {
parameters.remove(TableProperties.DEFAULT_PARTITION_SPEC);
if (exposeInHmsProperties() && metadata.spec() != null && metadata.spec().isPartitioned()) {
String spec = PartitionSpecParser.toJson(metadata.spec());
setField(parameters, TableProperties.DEFAULT_PARTITION_SPEC, spec);
}
}

@VisibleForTesting
void setSortOrder(TableMetadata metadata, Map<String, String> parameters) {
parameters.remove(TableProperties.DEFAULT_SORT_ORDER);
if (exposeInHmsProperties() && metadata.sortOrder() != null && metadata.sortOrder().isSorted()) {
String sortOrder = SortOrderParser.toJson(metadata.sortOrder());
setField(parameters, TableProperties.DEFAULT_SORT_ORDER, sortOrder);
}
}

private void setField(Map<String, String> parameters, String key, String value) {
if (value.length() <= maxHiveTablePropertySize) {
parameters.put(key, value);
} else {
LOG.warn("Not exposing {} in HMS since it exceeds {} characters", key, maxHiveTablePropertySize);
}
}

private boolean exposeInHmsProperties() {
return maxHiveTablePropertySize > 0;
}

private StorageDescriptor storageDescriptor(TableMetadata metadata, boolean hiveEngineEnabled) {

final StorageDescriptor storageDescriptor = new StorageDescriptor();
@@ -377,8 +467,7 @@ private void cleanupMetadataAndUnlock(CommitStatus commitStatus, String metadata
io().deleteFile(metadataLocation);
}
} catch (RuntimeException e) {
LOG.error("Fail to cleanup metadata file at {}", metadataLocation, e);
throw e;
LOG.error("Failed to cleanup metadata file at {}", metadataLocation, e);
} finally {
doUnlock(lock);
}
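
Finally, a minimal sketch of the new iceberg.hive.table-property-max-size knob introduced in HiveTableOperations. The Configuration-based setup is illustrative; the property key and default come from the constants shown in the diff above.

import org.apache.hadoop.conf.Configuration;

public class HmsPropertyExposureSketch {

  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // Default: 32672 characters. Snapshot stats, the current schema, the default partition
    // spec, and the default sort order are mirrored into the HMS table parameters; any
    // individual value whose JSON form exceeds the limit is skipped with a warning.
    conf.setLong("iceberg.hive.table-property-max-size", 32672L);

    // 0 disables exposeInHmsProperties(), so none of those values are written; per the diff
    // above, only the snapshot count is still recorded by setSnapshotStats().
    conf.setLong("iceberg.hive.table-property-max-size", 0L);
  }
}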