diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index a7e11e1a7b01..791871b810dc 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ -218,12 +218,15 @@ private Map summary(TableMetadata previous) { } } else { // if there was no previous snapshot, default the summary to start totals at 0 - previousSummary = ImmutableMap.of( - SnapshotSummary.TOTAL_RECORDS_PROP, "0", - SnapshotSummary.TOTAL_DATA_FILES_PROP, "0", - SnapshotSummary.TOTAL_DELETE_FILES_PROP, "0", - SnapshotSummary.TOTAL_POS_DELETES_PROP, "0", - SnapshotSummary.TOTAL_EQ_DELETES_PROP, "0"); + ImmutableMap.Builder summaryBuilder = ImmutableMap.builder(); + summaryBuilder + .put(SnapshotSummary.TOTAL_RECORDS_PROP, "0") + .put(SnapshotSummary.TOTAL_FILE_SIZE_PROP, "0") + .put(SnapshotSummary.TOTAL_DATA_FILES_PROP, "0") + .put(SnapshotSummary.TOTAL_DELETE_FILES_PROP, "0") + .put(SnapshotSummary.TOTAL_POS_DELETES_PROP, "0") + .put(SnapshotSummary.TOTAL_EQ_DELETES_PROP, "0"); + previousSummary = summaryBuilder.build(); } ImmutableMap.Builder builder = ImmutableMap.builder(); @@ -234,6 +237,9 @@ private Map summary(TableMetadata previous) { updateTotal( builder, previousSummary, SnapshotSummary.TOTAL_RECORDS_PROP, summary, SnapshotSummary.ADDED_RECORDS_PROP, SnapshotSummary.DELETED_RECORDS_PROP); + updateTotal( + builder, previousSummary, SnapshotSummary.TOTAL_FILE_SIZE_PROP, + summary, SnapshotSummary.ADDED_FILE_SIZE_PROP, SnapshotSummary.REMOVED_FILE_SIZE_PROP); updateTotal( builder, previousSummary, SnapshotSummary.TOTAL_DATA_FILES_PROP, summary, SnapshotSummary.ADDED_FILES_PROP, SnapshotSummary.DELETED_FILES_PROP); diff --git a/core/src/main/java/org/apache/iceberg/SnapshotSummary.java b/core/src/main/java/org/apache/iceberg/SnapshotSummary.java index 834fa0b95456..2f1a6e25e5c3 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotSummary.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotSummary.java @@ -38,6 +38,7 @@ public class SnapshotSummary { public static final String TOTAL_RECORDS_PROP = "total-records"; public static final String ADDED_FILE_SIZE_PROP = "added-files-size"; public static final String REMOVED_FILE_SIZE_PROP = "removed-files-size"; + public static final String TOTAL_FILE_SIZE_PROP = "total-files-size"; public static final String ADDED_POS_DELETES_PROP = "added-position-deletes"; public static final String REMOVED_POS_DELETES_PROP = "removed-position-deletes"; public static final String TOTAL_POS_DELETES_PROP = "total-position-deletes"; diff --git a/core/src/test/java/org/apache/iceberg/TestSnapshotSummary.java b/core/src/test/java/org/apache/iceberg/TestSnapshotSummary.java new file mode 100644 index 000000000000..f1b0fdb35b3a --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/TestSnapshotSummary.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg; + +import java.util.Map; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class TestSnapshotSummary extends TableTestBase { + public TestSnapshotSummary(int formatVersion) { + super(formatVersion); + } + + @Parameterized.Parameters(name = "formatVersion = {0}") + public static Object[] parameters() { + return new Object[] { 1, 2 }; + } + + @Test + public void testFileSizeSummary() { + Assert.assertEquals("Table should start empty", 0, listManifestFiles().size()); + + // fast append + table.newFastAppend() + .appendFile(FILE_A) + .commit(); + Map summary = table.currentSnapshot().summary(); + Assert.assertEquals("10", summary.get(SnapshotSummary.ADDED_FILE_SIZE_PROP)); + Assert.assertNull(summary.get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)); + Assert.assertEquals("10", summary.get(SnapshotSummary.TOTAL_FILE_SIZE_PROP)); + + // merge append + table.newAppend() + .appendFile(FILE_B) + .commit(); + summary = table.currentSnapshot().summary(); + Assert.assertEquals("10", summary.get(SnapshotSummary.ADDED_FILE_SIZE_PROP)); + Assert.assertNull(summary.get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)); + Assert.assertEquals("20", summary.get(SnapshotSummary.TOTAL_FILE_SIZE_PROP)); + + table.newOverwrite() + .deleteFile(FILE_A) + .deleteFile(FILE_B) + .addFile(FILE_C) + .addFile(FILE_D) + .addFile(FILE_D) + .commit(); + summary = table.currentSnapshot().summary(); + Assert.assertEquals("30", summary.get(SnapshotSummary.ADDED_FILE_SIZE_PROP)); + Assert.assertEquals("20", summary.get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)); + Assert.assertEquals("30", summary.get(SnapshotSummary.TOTAL_FILE_SIZE_PROP)); + + table.newDelete() + .deleteFile(FILE_C) + .deleteFile(FILE_D) + .commit(); + summary = table.currentSnapshot().summary(); + Assert.assertNull(summary.get(SnapshotSummary.ADDED_FILE_SIZE_PROP)); + Assert.assertEquals("20", summary.get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)); + Assert.assertEquals("10", summary.get(SnapshotSummary.TOTAL_FILE_SIZE_PROP)); + } +} diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java index 51a16f508778..2a820061064f 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java @@ -47,6 +47,8 @@ import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.iceberg.BaseMetastoreTableOperations; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.SnapshotSummary; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableProperties; import org.apache.iceberg.common.DynMethods; @@ -196,7 +198,10 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) { .collect(Collectors.toSet()); } - setHmsTableParameters(newMetadataLocation, tbl, metadata.properties(), removedProps, hiveEngineEnabled); + Map summary = Optional.ofNullable(metadata.currentSnapshot()) + .map(Snapshot::summary) + .orElseGet(ImmutableMap::of); + setHmsTableParameters(newMetadataLocation, tbl, metadata.properties(), removedProps, hiveEngineEnabled, summary); persistTable(tbl, updateHiveTable); threw = false; @@ -268,7 +273,8 @@ private Table newHmsTable() { } private void setHmsTableParameters(String newMetadataLocation, Table tbl, Map icebergTableProps, - Set obsoleteProps, boolean hiveEngineEnabled) { + Set obsoleteProps, boolean hiveEngineEnabled, + Map summary) { Map parameters = tbl.getParameters(); if (parameters == null) { @@ -296,6 +302,17 @@ private void setHmsTableParameters(String newMetadataLocation, Table tbl, Map IGNORED_PARAMS = - ImmutableSet.of("bucketing_version", StatsSetupConst.ROW_COUNT, - StatsSetupConst.RAW_DATA_SIZE, StatsSetupConst.TOTAL_SIZE, StatsSetupConst.NUM_FILES, "numFilesErasureCoded"); + private static final Set IGNORED_PARAMS = ImmutableSet.of("bucketing_version", "numFilesErasureCoded"); @Parameters(name = "catalog={0}") public static Collection parameters() { @@ -485,7 +485,7 @@ public void testCreateTableWithoutColumnComments() { } @Test - public void testIcebergAndHmsTableProperties() throws TException, InterruptedException { + public void testIcebergAndHmsTableProperties() throws Exception { TableIdentifier identifier = TableIdentifier.of("default", "customers"); shell.executeStatement(String.format("CREATE EXTERNAL TABLE default.customers " + @@ -576,14 +576,22 @@ public void testIcebergAndHmsTableProperties() throws TException, InterruptedExc .remove("custom_property") .remove("new_prop_1") .commit(); - hmsParams = shell.metastore().getTable("default", "customers").getParameters() - .entrySet().stream() - .filter(e -> !IGNORED_PARAMS.contains(e.getKey())) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + hmsParams = shell.metastore().getTable("default", "customers").getParameters(); Assert.assertFalse(hmsParams.containsKey("custom_property")); Assert.assertFalse(hmsParams.containsKey("new_prop_1")); Assert.assertTrue(hmsParams.containsKey("new_prop_2")); } + + // append some data and check whether HMS stats are aligned with snapshot summary + if (Catalogs.hiveCatalog(shell.getHiveConf())) { + List records = HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS; + testTables.appendIcebergTable(shell.getHiveConf(), icebergTable, FileFormat.PARQUET, null, records); + hmsParams = shell.metastore().getTable("default", "customers").getParameters(); + Map summary = icebergTable.currentSnapshot().summary(); + Assert.assertEquals(summary.get(SnapshotSummary.TOTAL_DATA_FILES_PROP), hmsParams.get(StatsSetupConst.NUM_FILES)); + Assert.assertEquals(summary.get(SnapshotSummary.TOTAL_RECORDS_PROP), hmsParams.get(StatsSetupConst.ROW_COUNT)); + Assert.assertEquals(summary.get(SnapshotSummary.TOTAL_FILE_SIZE_PROP), hmsParams.get(StatsSetupConst.TOTAL_SIZE)); + } } @Test