Merged
32 changes: 20 additions & 12 deletions core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -41,40 +41,45 @@ private TableProperties() {
public static final String FORMAT_VERSION = "format-version";

/**
* Reserved table property for UUID.
* <p>
* This reserved property is used to store the UUID of the table.
* Reserved table property for table UUID.
*/
public static final String UUID = "uuid";

/**
* Reserved table property for the total number of snapshots.
* <p>
* This reserved property is used to store the total number of snapshots.
*/
public static final String SNAPSHOT_COUNT = "snapshot-count";

/**
* Reserved table property for current snapshot summary.
* <p>
* This reserved property is used to store the current snapshot summary.
*/
public static final String CURRENT_SNAPSHOT_SUMMARY = "current-snapshot-summary";

/**
* Reserved table property for current snapshot id.
* <p>
* This reserved property is used to store the current snapshot id.
*/
public static final String CURRENT_SNAPSHOT_ID = "current-snapshot-id";

/**
* Reserved table property for current snapshot timestamp.
* <p>
* This reserved property is used to store the current snapshot timestamp.
*/
public static final String CURRENT_SNAPSHOT_TIMESTAMP = "current-snapshot-timestamp-ms";

/**
* Reserved table property for the JSON representation of current schema.
*/
public static final String CURRENT_SCHEMA = "current-schema";

/**
* Reserved table property for the JSON representation of the current (default) partition spec.
*/
public static final String DEFAULT_PARTITION_SPEC = "default-partition-spec";
Contributor:

It might be helpful to clarify what is meant by default for both of these.

Each javadoc states that it’s for the “default” spec / sort order, but I’m still not entirely sure what that means.

Is it the current sort order / partition spec that would be used if the user doesn't override it for an individual query?

Contributor:

Maybe something like “JSON representation of the table’s current configured partition spec, which will be used if not overridden for individual writes”. Kind of wordy but something along those lines would be helpful for me if quickly looking through the JavaDocs etc. Will leave that decision to you though.

Contributor Author:

Yes, it is for the current partition spec and sort order. Makes sense to me. Will make the change.

Member:

I think you reverted too many? Should be 'current'.

Contributor Author:

I was trying to be consistent with the name in metadata.json, which is default-partition-spec. The same for sort order. I changed the comments though.

Member:

OK I see, yea I always find it confusing.

In the comment, maybe we can add that they are equivalent, otherwise the comment is even more confusing:

Reserved table property for the JSON representation of current (default) schema.

Contributor Author:

It is confusing. I like "current" more; I kept the original name just for consistency.


/**
* Reserved table property for the JSON representation of the current (default) sort order.
*/
public static final String DEFAULT_SORT_ORDER = "default-sort-order";

/**
* Reserved Iceberg table properties list.
* <p>
@@ -87,7 +92,10 @@ private TableProperties() {
SNAPSHOT_COUNT,
CURRENT_SNAPSHOT_ID,
CURRENT_SNAPSHOT_SUMMARY,
CURRENT_SNAPSHOT_TIMESTAMP
CURRENT_SNAPSHOT_TIMESTAMP,
CURRENT_SCHEMA,
DEFAULT_PARTITION_SPEC,
DEFAULT_SORT_ORDER
);

public static final String COMMIT_NUM_RETRIES = "commit.retry.num-retries";
@@ -52,8 +52,11 @@
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
Member @szehon-ho, Apr 25, 2022:

Two other suggestions for this class: can we add, in the comment of "HIVE_TABLE_PROPERTY_MAX_SIZE", one more sentence to let users know how to turn the feature off?

// set to 0 to not expose Iceberg metadata in HMS Table properties

And also, a precondition in the HiveTableOperations constructor to check that the value is non-negative.

Contributor Author:

Added the comment. Negative is fine, right?

Member:

I think it's probably better to disallow negative values, as they make little sense? But it's OK either way to me.

Contributor Author:

Would throwing an exception be too much in that case? Maybe just log a warning.
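The thread above settles on a constructor precondition and a "set to 0 to disable" convention. A minimal standalone sketch of that validation, assuming the property name and default from the PR ("iceberg.hive.table-property-max-size", default 32672) — this models the idea only and is not the actual HiveTableOperations code:

```java
import java.util.Map;

class PropertySizeConfig {
  // set to 0 to not expose Iceberg metadata in HMS table properties
  static final String HIVE_TABLE_PROPERTY_MAX_SIZE = "iceberg.hive.table-property-max-size";
  static final long HIVE_TABLE_PROPERTY_MAX_SIZE_DEFAULT = 32672L;

  final long maxHiveTablePropertySize;

  PropertySizeConfig(Map<String, String> conf) {
    long size = Long.parseLong(conf.getOrDefault(
        HIVE_TABLE_PROPERTY_MAX_SIZE, Long.toString(HIVE_TABLE_PROPERTY_MAX_SIZE_DEFAULT)));
    // precondition discussed in the review: reject negative values outright
    if (size < 0) {
      throw new IllegalArgumentException(
          HIVE_TABLE_PROPERTY_MAX_SIZE + " must be non-negative, got: " + size);
    }
    this.maxHiveTablePropertySize = size;
  }
}
```

With this check, a misconfigured negative value fails fast at catalog construction instead of silently suppressing all HMS metadata.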

import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.ClientPool;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.SortOrderParser;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.exceptions.AlreadyExistsException;
@@ -399,6 +402,9 @@ private void setHmsTableParameters(String newMetadataLocation, Table tbl, TableM
}

setSnapshotStats(metadata, parameters);
setSchema(metadata, parameters);
setPartitionSpec(metadata, parameters);
setSortOrder(metadata, parameters);

tbl.setParameters(parameters);
}
@@ -433,6 +439,38 @@ void setSnapshotSummary(Map<String, String> parameters, Snapshot currentSnapshot
}
}

private void setSchema(TableMetadata metadata, Map<String, String> parameters) {
parameters.remove(TableProperties.CURRENT_SCHEMA);
if (metadata.schema() != null) {
Member:

Let's have a common method and re-use it to set the various fields, like:

private void setField(Map<String, String> parameters, String key, String value) {
   parameters.remove(key);
   if (value.length() <= maxHiveTablePropertySize) {
      parameters.put(key, value);
   } else {
      LOG.warn("Not exposing {} in HMS since it exceeds {} characters", key, maxHiveTablePropertySize);
   }
}

Contributor Author:

Made the change.

String schema = SchemaParser.toJson(metadata.schema());
setField(parameters, TableProperties.CURRENT_SCHEMA, schema);
}
}

private void setPartitionSpec(TableMetadata metadata, Map<String, String> parameters) {
parameters.remove(TableProperties.DEFAULT_PARTITION_SPEC);
if (metadata.spec() != null && metadata.spec().isPartitioned()) {
Contributor:

[question] In the v1 spec, when a partition field is dropped it is replaced by a VoidTransform; if all the transforms are void we should consider the table un-partitioned (this may be beyond the scope of the present PR), but presently isPartitioned() will return true in this case. Do we want to store the partition spec in that scenario? Your thoughts?

This is based on ticket #3014 @RussellSpitzer filed a while back.

Contributor Author:

Thanks @singhpk234 for pointing this out. #3059 is trying to fix #3014, and it is almost ready to merge. It should be fine in that case.
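The VoidTransform question above boils down to: a spec should count as partitioned only if at least one field has a non-void transform. A simplified model of that check — this deliberately avoids the Iceberg API (transforms are represented here by their string names, e.g. "void", "bucket[16]") and only illustrates the logic #3059 addresses:

```java
import java.util.List;

class SpecCheck {
  // A spec is effectively partitioned only if at least one field has a
  // non-void transform; dropping a v1 partition field replaces its
  // transform with "void".
  static boolean isEffectivelyPartitioned(List<String> transformNames) {
    return transformNames.stream().anyMatch(t -> !"void".equals(t));
  }
}
```

Under this rule, a spec whose fields have all been dropped (all-void) would not be exposed as DEFAULT_PARTITION_SPEC in HMS.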

String spec = PartitionSpecParser.toJson(metadata.spec());
setField(parameters, TableProperties.DEFAULT_PARTITION_SPEC, spec);
}
}

private void setSortOrder(TableMetadata metadata, Map<String, String> parameters) {
parameters.remove(TableProperties.DEFAULT_SORT_ORDER);
if (metadata.sortOrder() != null && metadata.sortOrder().isSorted()) {
String sortOrder = SortOrderParser.toJson(metadata.sortOrder());
setField(parameters, TableProperties.DEFAULT_SORT_ORDER, sortOrder);
}
}

private void setField(Map<String, String> parameters, String key, String value) {
Member @szehon-ho, Apr 25, 2022:

One performance suggestion: if the user sets it to 0 (disabling this feature), we can skip the serialization for performance.

Maybe, easiest, we can add some boolean function like exposeInHmsProperties() that checks if the value is 0, and use it in all the methods? (open to better names)

    if (exposeInHmsProperties() && metadata.sortOrder() != null && metadata.sortOrder().isSorted()) {

Contributor Author:

Setting it to 0 can control some of them (snapshot summary, schema, partition spec, and sort order), but not all of them. I'd suggest creating another PR to make sure all are taken care of.

Member:

Makes sense, but could we at least take care of the ones in this PR (schema, partition spec, sort order)? We can have a follow-up for the other ones not touched by this PR.

Just didn't want to leave it in a state where we are wasting CPU cycles (JSON serialization) needlessly if the user turns off this feature, as this is done in the critical commit block, unlike the original serialization, which happens before. The other HMS table properties are also less CPU-intensive to me, as they are just reading a field.

Contributor Author:

Makes sense, made the change.
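Putting the two agreed-upon pieces together — the exposeInHmsProperties() guard and the size-limited setField() — a minimal self-contained sketch might look like the following. Names follow the PR, but this is a standalone model, not the actual HiveTableOperations implementation (logging is elided to a comment):

```java
import java.util.Map;

class HmsPropertySketch {
  final long maxHiveTablePropertySize;

  HmsPropertySketch(long maxSize) {
    this.maxHiveTablePropertySize = maxSize;
  }

  // Guard checked by setSchema/setPartitionSpec/setSortOrder before doing any
  // JSON serialization, so a disabled feature (max size 0) costs nothing.
  boolean exposeInHmsProperties() {
    return maxHiveTablePropertySize > 0;
  }

  // Size-limited put: values over the limit are skipped (the PR logs a warning).
  void setField(Map<String, String> parameters, String key, String value) {
    if (value.length() <= maxHiveTablePropertySize) {
      parameters.put(key, value);
    }
    // else: LOG.warn("Not exposing {} in HMS since it exceeds {} characters", ...)
  }
}
```

The guard matters because these setters run inside the commit path, so skipping serialization when the feature is off avoids needless work in the critical section.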

if (value.length() <= maxHiveTablePropertySize) {
parameters.put(key, value);
} else {
LOG.warn("Not exposing {} in HMS since it exceeds {} characters", key, maxHiveTablePropertySize);
}
Comment on lines +471 to +476
Contributor @singhpk234, Apr 24, 2022:

[nit] Should we make this:

if (summary.length() <= maxHiveTablePropertySize) {
  parameters.put(TableProperties.CURRENT_SNAPSHOT_SUMMARY, summary);
} else {
  LOG.warn("Not exposing the current snapshot({}) summary in HMS since it exceeds {} characters",
      currentSnapshot.snapshotId(), maxHiveTablePropertySize);
}

Also, use this setter here.

Contributor Author:

Was trying to do that, but the warn message needs the snapshot id here. That would require changing setField() as below, and changing all the other callers. It's like removing one duplication but adding a few complications. I'd suggest keeping it as is.

setField(Map<String, String> parameters, String key, String value, String warnMessage) 

}

private StorageDescriptor storageDescriptor(TableMetadata metadata, boolean hiveEngineEnabled) {

final StorageDescriptor storageDescriptor = new StorageDescriptor();
@@ -32,12 +32,16 @@
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.SortOrderParser;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.UpdateSchema;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
@@ -60,6 +64,8 @@

import static org.apache.iceberg.NullOrder.NULLS_FIRST;
import static org.apache.iceberg.SortDirection.ASC;
import static org.apache.iceberg.TableProperties.DEFAULT_SORT_ORDER;
import static org.apache.iceberg.expressions.Expressions.bucket;
import static org.apache.iceberg.types.Types.NestedField.required;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
@@ -232,7 +238,7 @@ public void testReplaceTxnBuilder() throws Exception {
}

@Test
public void testCreateTableDefaultSortOrder() {
public void testCreateTableDefaultSortOrder() throws Exception {
Schema schema = new Schema(
required(1, "id", Types.IntegerType.get(), "unique ID"),
required(2, "data", Types.StringType.get())
@@ -246,13 +252,16 @@ public void testCreateTableDefaultSortOrder() {
Table table = catalog.createTable(tableIdent, schema, spec);
Assert.assertEquals("Order ID must match", 0, table.sortOrder().orderId());
Assert.assertTrue("Order must unsorted", table.sortOrder().isUnsorted());

Assert.assertFalse("Must not have default sort order in catalog",
hmsTableParameters().containsKey(DEFAULT_SORT_ORDER));
} finally {
catalog.dropTable(tableIdent);
}
}

@Test
public void testCreateTableCustomSortOrder() {
public void testCreateTableCustomSortOrder() throws Exception {
Schema schema = new Schema(
required(1, "id", Types.IntegerType.get(), "unique ID"),
required(2, "data", Types.StringType.get())
@@ -277,6 +286,8 @@ public void testCreateTableCustomSortOrder() {
Assert.assertEquals("Null order must match ", NULLS_FIRST, sortOrder.fields().get(0).nullOrder());
Transform<?, ?> transform = Transforms.identity(Types.IntegerType.get());
Assert.assertEquals("Transform must match", transform, sortOrder.fields().get(0).transform());

Assert.assertEquals(SortOrderParser.toJson(table.sortOrder()), hmsTableParameters().get(DEFAULT_SORT_ORDER));
} finally {
catalog.dropTable(tableIdent);
}
@@ -469,13 +480,7 @@ public void testUUIDinTableProperties() throws Exception {
.withLocation(location)
.create();

String tableName = tableIdentifier.name();
org.apache.hadoop.hive.metastore.api.Table hmsTable =
metastoreClient.getTable(tableIdentifier.namespace().level(0), tableName);

// check parameters are in expected state
Map<String, String> parameters = hmsTable.getParameters();
Assert.assertNotNull(parameters.get(TableProperties.UUID));
Assert.assertNotNull(hmsTableParameters().get(TableProperties.UUID));
} finally {
catalog.dropTable(tableIdentifier);
}
@@ -495,12 +500,8 @@ public void testSnapshotStatsTableProperties() throws Exception {
.withLocation(location)
.create();

String tableName = tableIdentifier.name();
org.apache.hadoop.hive.metastore.api.Table hmsTable =
metastoreClient.getTable(tableIdentifier.namespace().level(0), tableName);

// check whether parameters are in expected state
Map<String, String> parameters = hmsTable.getParameters();
Map<String, String> parameters = hmsTableParameters();
Assert.assertEquals("0", parameters.get(TableProperties.SNAPSHOT_COUNT));
Assert.assertNull(parameters.get(TableProperties.CURRENT_SNAPSHOT_SUMMARY));
Assert.assertNull(parameters.get(TableProperties.CURRENT_SNAPSHOT_ID));
@@ -517,8 +518,7 @@ public void testSnapshotStatsTableProperties() throws Exception {
icebergTable.newFastAppend().appendFile(file).commit();

// check whether parameters are in expected state
hmsTable = metastoreClient.getTable(tableIdentifier.namespace().level(0), tableName);
parameters = hmsTable.getParameters();
parameters = hmsTableParameters();
Assert.assertEquals("1", parameters.get(TableProperties.SNAPSHOT_COUNT));
String summary = JsonUtil.mapper().writeValueAsString(icebergTable.currentSnapshot().summary());
Assert.assertEquals(summary, parameters.get(TableProperties.CURRENT_SNAPSHOT_SUMMARY));
@@ -562,6 +562,60 @@ public void testSetSnapshotSummary() throws Exception {
Assert.assertEquals("The snapshot summary must not be in parameters due to the size limit", 0, parameters.size());
}

@Test
public void testSetDefaultPartitionSpec() throws Exception {
Schema schema = new Schema(
required(1, "id", Types.IntegerType.get(), "unique ID"),
required(2, "data", Types.StringType.get())
);
TableIdentifier tableIdent = TableIdentifier.of(DB_NAME, "tbl");

try {
Table table = catalog.buildTable(tableIdent, schema).create();
Assert.assertFalse("Must not have default partition spec",
hmsTableParameters().containsKey(TableProperties.DEFAULT_PARTITION_SPEC));

table.updateSpec().addField(bucket("data", 16)).commit();
Assert.assertEquals(PartitionSpecParser.toJson(table.spec()),
hmsTableParameters().get(TableProperties.DEFAULT_PARTITION_SPEC));
} finally {
catalog.dropTable(tableIdent);
}
}

@Test
public void testSetCurrentSchema() throws Exception {
Schema schema = new Schema(
required(1, "id", Types.IntegerType.get(), "unique ID"),
required(2, "data", Types.StringType.get())
);
TableIdentifier tableIdent = TableIdentifier.of(DB_NAME, "tbl");

try {
Table table = catalog.buildTable(tableIdent, schema).create();

Assert.assertEquals(SchemaParser.toJson(table.schema()),
hmsTableParameters().get(TableProperties.CURRENT_SCHEMA));

// add many new fields to make the schema json string exceed the limit
UpdateSchema updateSchema = table.updateSchema();
for (int i = 0; i < 600; i++) {
updateSchema.addColumn("new_col_" + i, Types.StringType.get());
}
updateSchema.commit();

Assert.assertTrue(SchemaParser.toJson(table.schema()).length() > 32672);
Assert.assertNull(hmsTableParameters().get(TableProperties.CURRENT_SCHEMA));
} finally {
catalog.dropTable(tableIdent);
}
}

private Map<String, String> hmsTableParameters() throws TException {
org.apache.hadoop.hive.metastore.api.Table hmsTable = metastoreClient.getTable(DB_NAME, "tbl");
return hmsTable.getParameters();
}

@Test
public void testConstructorWarehousePathWithEndSlash() {
HiveCatalog catalogWithSlash = new HiveCatalog();
@@ -624,7 +624,7 @@ public void testIcebergAndHmsTableProperties() throws Exception {
Assert.assertEquals(expectedIcebergProperties, icebergTable.properties());

if (Catalogs.hiveCatalog(shell.getHiveConf(), tableProperties)) {
Assert.assertEquals(12, hmsParams.size());
Assert.assertEquals(13, hmsParams.size());
Assert.assertEquals("initial_val", hmsParams.get("custom_property"));
Assert.assertEquals("TRUE", hmsParams.get(InputFormatConfig.EXTERNAL_TABLE_PURGE));
Assert.assertEquals("TRUE", hmsParams.get("EXTERNAL"));
@@ -662,7 +662,7 @@ public void testIcebergAndHmsTableProperties() throws Exception {
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

if (Catalogs.hiveCatalog(shell.getHiveConf(), tableProperties)) {
Assert.assertEquals(15, hmsParams.size()); // 2 newly-added properties + previous_metadata_location prop
Assert.assertEquals(16, hmsParams.size()); // 2 newly-added properties + previous_metadata_location prop
Assert.assertEquals("true", hmsParams.get("new_prop_1"));
Assert.assertEquals("false", hmsParams.get("new_prop_2"));
Assert.assertEquals("new_val", hmsParams.get("custom_property"));