5 changes: 5 additions & 0 deletions iceberg/checkstyle/checkstyle.xml
@@ -368,6 +368,11 @@
<property name="format" value="(void setUp\(\))|(void setup\(\))|(void setupStatic\(\))|(void setUpStatic\(\))|(void beforeTest\(\))|(void teardown\(\))|(void tearDown\(\))|(void beforeStatic\(\))|(void afterStatic\(\))"/>
<property name="message" value="Test setup/teardown methods are called before(), beforeClass(), after(), afterClass(), but not setUp, teardown, etc."/>
</module>
<module name="RegexpSinglelineJava">
<property name="ignoreComments" value="true"/>
<property name="format" value="@Test\(.*expected.*\)"/>
<property name="message" value="Prefer using Assertions.assertThatThrownBy(...).isInstanceOf(...) instead."/>
</module>
<module name="RightCurly"> <!-- Java Style Guide: Nonempty blocks: K & R style -->
<property name="option" value="same"/>
<property name="tokens" value="LITERAL_TRY, LITERAL_CATCH, LITERAL_FINALLY, LITERAL_IF, LITERAL_ELSE, LITERAL_DO"/>
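For reference, the new rule steers tests away from JUnit 4's @Test(expected = ...) toward the AssertJ style named in the rule message, which the assertj-core dependency added below makes available. A minimal, self-contained sketch of the preferred pattern (the helper method and messages are illustrative, not taken from this patch):

import static org.assertj.core.api.Assertions.assertThatThrownBy;

import org.junit.Test;

public class TestPreferredExceptionAssertions {

  // Old style, now flagged by the checkstyle rule above:
  // @Test(expected = IllegalArgumentException.class)
  // public void testNegativeInput() { parsePositive("-1"); }

  // Preferred style: the assertion names the exact call that should throw,
  // the expected exception type, and optionally part of the message.
  @Test
  public void testNegativeInput() {
    assertThatThrownBy(() -> parsePositive("-1"))
        .isInstanceOf(IllegalArgumentException.class)
        .hasMessageContaining("must be positive");
  }

  private static int parsePositive(String value) {
    int parsed = Integer.parseInt(value);
    if (parsed <= 0) {
      throw new IllegalArgumentException("Value must be positive: " + value);
    }
    return parsed;
  }
}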
4 changes: 4 additions & 0 deletions iceberg/iceberg-catalog/pom.xml
@@ -58,6 +58,10 @@
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
HiveCatalog.java
@@ -56,7 +56,9 @@
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.LocationUtil;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,12 +74,14 @@ public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespa
private FileIO fileIO;
private ClientPool<IMetaStoreClient, TException> clients;
private boolean listAllTables = false;
private Map<String, String> catalogProperties;

public HiveCatalog() {
}

@Override
public void initialize(String inputName, Map<String, String> properties) {
this.catalogProperties = ImmutableMap.copyOf(properties);
this.name = inputName;
if (conf == null) {
LOG.warn("No Hadoop Configuration was set, using the default environment Configuration");
@@ -89,7 +93,8 @@ public void initialize(String inputName, Map<String, String> properties) {
}

if (properties.containsKey(CatalogProperties.WAREHOUSE_LOCATION)) {
this.conf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, properties.get(CatalogProperties.WAREHOUSE_LOCATION));
this.conf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname,
LocationUtil.stripTrailingSlash(properties.get(CatalogProperties.WAREHOUSE_LOCATION)));
}

this.listAllTables = Boolean.parseBoolean(properties.getOrDefault(LIST_ALL_TABLES, LIST_ALL_TABLES_DEFAULT));
@@ -231,15 +236,19 @@ public void renameTable(TableIdentifier from, TableIdentifier originalTo) {

@Override
public org.apache.iceberg.Table registerTable(TableIdentifier identifier, String metadataFileLocation) {
Preconditions.checkArgument(isValidIdentifier(identifier), "Invalid identifier: %s", identifier);
Preconditions.checkArgument(
identifier != null && isValidIdentifier(identifier), "Invalid identifier: %s", identifier);
Preconditions.checkArgument(
metadataFileLocation != null && !metadataFileLocation.isEmpty(),
"Cannot register an empty metadata file location as a table");

// Throw an exception if this table already exists in the catalog.
if (tableExists(identifier)) {
throw new org.apache.iceberg.exceptions.AlreadyExistsException("Table already exists: %s", identifier);
}

TableOperations ops = newTableOps(identifier);
InputFile metadataFile = fileIO.newInputFile(metadataFileLocation);
InputFile metadataFile = ops.io().newInputFile(metadataFileLocation);
TableMetadata metadata = TableMetadataParser.read(ops.io(), metadataFile);
ops.commit(null, metadata);

@@ -545,6 +554,11 @@ public Configuration getConf() {
return conf;
}

@Override
protected Map<String, String> properties() {
return catalogProperties == null ? ImmutableMap.of() : catalogProperties;
}

@VisibleForTesting
void setListAllTables(boolean listAllTables) {
this.listAllTables = listAllTables;
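Taken together, the HiveCatalog changes normalize the warehouse location, validate the registerTable arguments, and keep the initialization properties around for the new properties() override. A hedged usage sketch under those assumptions (the URI, names and paths are illustrative):

import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

public class HiveCatalogRegisterExample {

  public static void main(String[] args) {
    HiveCatalog catalog = new HiveCatalog();
    catalog.setConf(new Configuration());

    // The warehouse location now has its trailing slash stripped before it is
    // copied into the metastore warehouse setting, so both spellings behave the same.
    Map<String, String> props = ImmutableMap.of(
        CatalogProperties.URI, "thrift://metastore:9083",
        CatalogProperties.WAREHOUSE_LOCATION, "s3://bucket/warehouse/");
    catalog.initialize("hive", props);

    // registerTable now fails fast with IllegalArgumentException on a null
    // identifier or an empty metadata file location, instead of failing later.
    catalog.registerTable(
        TableIdentifier.of("db", "tbl"),
        "s3://bucket/warehouse/db/tbl/metadata/00001-example.metadata.json");
  }
}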
HiveTableOperations.java
@@ -37,10 +37,14 @@
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hive.iceberg.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.ClientPool;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.SortOrderParser;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.exceptions.AlreadyExistsException;
@@ -56,6 +60,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableBiMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.JsonUtil;
import org.apache.parquet.hadoop.ParquetOutputFormat;
import org.apache.thrift.TException;
import org.slf4j.Logger;
@@ -72,6 +77,12 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
private static final String HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES = "iceberg.hive.metadata-refresh-max-retries";
private static final int HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES_DEFAULT = 2;

// The max size is based on the HMS backend database. For Hive versions below 2.3, the max table parameter size is 4000
// characters, see https://issues.apache.org/jira/browse/HIVE-12274.
// Set to 0 to not expose Iceberg metadata in HMS table properties.
private static final String HIVE_TABLE_PROPERTY_MAX_SIZE = "iceberg.hive.table-property-max-size";
private static final long HIVE_TABLE_PROPERTY_MAX_SIZE_DEFAULT = 32672;

private static final BiMap<String, String> ICEBERG_TO_HMS_TRANSLATION = ImmutableBiMap.of(
// gc.enabled in Iceberg and external.table.purge in Hive are meant to do the same things but with different names
GC_ENABLED, "external.table.purge",
@@ -100,6 +111,7 @@ public static String translateToIcebergProp(String hmsProp) {
private final String database;
private final String tableName;
private final Configuration conf;
private final long maxHiveTablePropertySize;
private final int metadataRefreshMaxRetries;
private final FileIO fileIO;
private final ClientPool<IMetaStoreClient, TException> metaClients;
@@ -115,6 +127,7 @@ protected HiveTableOperations(Configuration conf, ClientPool metaClients, FileIO
this.tableName = table;
this.metadataRefreshMaxRetries =
conf.getInt(HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES, HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES_DEFAULT);
this.maxHiveTablePropertySize = conf.getLong(HIVE_TABLE_PROPERTY_MAX_SIZE, HIVE_TABLE_PROPERTY_MAX_SIZE_DEFAULT);
}

@Override
@@ -250,6 +263,8 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
} finally {
cleanupMetadataAndUnlock(commitStatus, newMetadataLocation, commitLock);
}

LOG.info("Committed to table {} with the new metadata location {}", fullName, newMetadataLocation);
}

@VisibleForTesting
@@ -341,9 +356,84 @@ private void setHmsTableParameters(String newMetadataLocation, Table tbl, TableM
parameters.put(StatsSetupConst.TOTAL_SIZE, summary.get(SnapshotSummary.TOTAL_FILE_SIZE_PROP));
}

setSnapshotStats(metadata, parameters);
setSchema(metadata, parameters);
setPartitionSpec(metadata, parameters);
setSortOrder(metadata, parameters);

tbl.setParameters(parameters);
}

@VisibleForTesting
void setSnapshotStats(TableMetadata metadata, Map<String, String> parameters) {
parameters.remove(TableProperties.CURRENT_SNAPSHOT_ID);
parameters.remove(TableProperties.CURRENT_SNAPSHOT_TIMESTAMP);
parameters.remove(TableProperties.CURRENT_SNAPSHOT_SUMMARY);

Snapshot currentSnapshot = metadata.currentSnapshot();
if (exposeInHmsProperties() && currentSnapshot != null) {
parameters.put(TableProperties.CURRENT_SNAPSHOT_ID, String.valueOf(currentSnapshot.snapshotId()));
parameters.put(TableProperties.CURRENT_SNAPSHOT_TIMESTAMP, String.valueOf(currentSnapshot.timestampMillis()));
setSnapshotSummary(parameters, currentSnapshot);
}

parameters.put(TableProperties.SNAPSHOT_COUNT, String.valueOf(metadata.snapshots().size()));
}

@VisibleForTesting
void setSnapshotSummary(Map<String, String> parameters, Snapshot currentSnapshot) {
try {
String summary = JsonUtil.mapper().writeValueAsString(currentSnapshot.summary());
if (summary.length() <= maxHiveTablePropertySize) {
parameters.put(TableProperties.CURRENT_SNAPSHOT_SUMMARY, summary);
} else {
LOG.warn("Not exposing the current snapshot({}) summary in HMS since it exceeds {} characters",
currentSnapshot.snapshotId(), maxHiveTablePropertySize);
}
} catch (JsonProcessingException e) {
LOG.warn("Failed to convert current snapshot({}) summary to a json string", currentSnapshot.snapshotId(), e);
}
}

@VisibleForTesting
void setSchema(TableMetadata metadata, Map<String, String> parameters) {
parameters.remove(TableProperties.CURRENT_SCHEMA);
if (exposeInHmsProperties() && metadata.schema() != null) {
String schema = SchemaParser.toJson(metadata.schema());
setField(parameters, TableProperties.CURRENT_SCHEMA, schema);
}
}

@VisibleForTesting
void setPartitionSpec(TableMetadata metadata, Map<String, String> parameters) {
parameters.remove(TableProperties.DEFAULT_PARTITION_SPEC);
if (exposeInHmsProperties() && metadata.spec() != null && metadata.spec().isPartitioned()) {
String spec = PartitionSpecParser.toJson(metadata.spec());
setField(parameters, TableProperties.DEFAULT_PARTITION_SPEC, spec);
}
}

@VisibleForTesting
void setSortOrder(TableMetadata metadata, Map<String, String> parameters) {
parameters.remove(TableProperties.DEFAULT_SORT_ORDER);
if (exposeInHmsProperties() && metadata.sortOrder() != null && metadata.sortOrder().isSorted()) {
String sortOrder = SortOrderParser.toJson(metadata.sortOrder());
setField(parameters, TableProperties.DEFAULT_SORT_ORDER, sortOrder);
}
}

private void setField(Map<String, String> parameters, String key, String value) {
if (value.length() <= maxHiveTablePropertySize) {
parameters.put(key, value);
} else {
LOG.warn("Not exposing {} in HMS since it exceeds {} characters", key, maxHiveTablePropertySize);
}
}

private boolean exposeInHmsProperties() {
return maxHiveTablePropertySize > 0;
}

private StorageDescriptor storageDescriptor(TableMetadata metadata, boolean hiveEngineEnabled) {

final StorageDescriptor storageDescriptor = new StorageDescriptor();
@@ -364,6 +454,7 @@ private StorageDescriptor storageDescriptor(TableMetadata metadata, boolean hive
return storageDescriptor;
}

@SuppressWarnings("ReverseDnsLookup")
@VisibleForTesting
HiveCommitLock createLock() throws UnknownHostException, TException, InterruptedException {
return new HiveCommitLock(conf, metaClients, catalogName, database, tableName);
@@ -377,8 +468,7 @@ private void cleanupMetadataAndUnlock(CommitStatus commitStatus, String metadata
io().deleteFile(metadataLocation);
}
} catch (RuntimeException e) {
LOG.error("Fail to cleanup metadata file at {}", metadataLocation, e);
throw e;
LOG.error("Failed to cleanup metadata file at {}", metadataLocation, e);
} finally {
doUnlock(lock);
}
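The net effect of the HiveTableOperations changes is that every successful commit now mirrors a bounded slice of Iceberg metadata (snapshot stats, schema, partition spec, sort order) into the HMS table parameters, gated by the iceberg.hive.table-property-max-size setting introduced above. A rough sketch of how an operator might tune or disable that exposure (the property key comes from this patch; the values are illustrative):

import org.apache.hadoop.conf.Configuration;

public class TablePropertyExposureConfig {

  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // Lower the cap for HMS backends limited to 4000-character parameters
    // (Hive below 2.3, see HIVE-12274); any serialized value longer than the
    // cap is skipped by setField()/setSnapshotSummary() with a warning.
    conf.setLong("iceberg.hive.table-property-max-size", 4000L);

    // Setting the cap to 0 makes exposeInHmsProperties() return false, so the
    // snapshot id/timestamp/summary, schema, partition spec, and sort order are
    // no longer written into the HMS table parameters at commit time.
    // conf.setLong("iceberg.hive.table-property-max-size", 0L);
  }
}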