diff --git a/core/src/main/java/org/apache/iceberg/MetricsConfig.java b/core/src/main/java/org/apache/iceberg/MetricsConfig.java index 2ae01dad284a..3e7a55d0117b 100644 --- a/core/src/main/java/org/apache/iceberg/MetricsConfig.java +++ b/core/src/main/java/org/apache/iceberg/MetricsConfig.java @@ -29,6 +29,8 @@ import org.apache.iceberg.relocated.com.google.common.base.Joiner; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.SerializableMap; import org.apache.iceberg.util.SortOrderUtil; import org.slf4j.Logger; @@ -36,6 +38,8 @@ import static org.apache.iceberg.TableProperties.DEFAULT_WRITE_METRICS_MODE; import static org.apache.iceberg.TableProperties.DEFAULT_WRITE_METRICS_MODE_DEFAULT; +import static org.apache.iceberg.TableProperties.METRICS_MAX_INFERRED_COLUMN_DEFAULTS; +import static org.apache.iceberg.TableProperties.METRICS_MAX_INFERRED_COLUMN_DEFAULTS_DEFAULT; import static org.apache.iceberg.TableProperties.METRICS_MODE_COLUMN_CONF_PREFIX; @Immutable @@ -45,9 +49,8 @@ public final class MetricsConfig implements Serializable { private static final Joiner DOT = Joiner.on('.'); // Disable metrics by default for wide tables to prevent excessive metadata - private static final int MAX_COLUMNS = 32; - private static final MetricsConfig DEFAULT = new MetricsConfig(ImmutableMap.of(), - MetricsModes.fromString(DEFAULT_WRITE_METRICS_MODE_DEFAULT)); + private static final MetricsMode DEFAULT_MODE = MetricsModes.fromString(DEFAULT_WRITE_METRICS_MODE_DEFAULT); + private static final MetricsConfig DEFAULT = new MetricsConfig(ImmutableMap.of(), DEFAULT_MODE); private final Map columnModes; private final MetricsMode defaultMode; @@ -96,7 +99,7 @@ static Map updateProperties(Map props, List props) { - return from(props, null, DEFAULT_WRITE_METRICS_MODE_DEFAULT); + return from(props, null, null); } /** @@ -104,13 +107,7 @@ public static MetricsConfig fromProperties(Map props) { * @param table iceberg table */ public static MetricsConfig forTable(Table table) { - String defaultMode; - if (table.schema().columns().size() <= MAX_COLUMNS) { - defaultMode = DEFAULT_WRITE_METRICS_MODE_DEFAULT; - } else { - defaultMode = MetricsModes.None.get().toString(); - } - return from(table.properties(), table.sortOrder(), defaultMode); + return from(table.properties(), table.schema(), table.sortOrder()); } /** @@ -136,50 +133,55 @@ public static MetricsConfig forPositionDelete(Table table) { } /** - * Generate a MetricsConfig for all columns based on overrides, sortOrder, and defaultMode. + * Generate a MetricsConfig for all columns based on overrides, schema, and sort order. + * * @param props will be read for metrics overrides (write.metadata.metrics.column.*) and default * (write.metadata.metrics.default) + * @param schema table schema * @param order sort order columns, will be promoted to truncate(16) - * @param defaultMode default, if not set by user property * @return metrics configuration */ - private static MetricsConfig from(Map props, SortOrder order, String defaultMode) { + private static MetricsConfig from(Map props, Schema schema, SortOrder order) { + int maxInferredDefaultColumns = maxInferredColumnDefaults(props); Map columnModes = Maps.newHashMap(); // Handle user override of default mode - MetricsMode finalDefaultMode; - String defaultModeAsString = props.getOrDefault(DEFAULT_WRITE_METRICS_MODE, defaultMode); - try { - finalDefaultMode = MetricsModes.fromString(defaultModeAsString); - } catch (IllegalArgumentException err) { - // User override was invalid, log the error and use the default - LOG.warn("Ignoring invalid default metrics mode: {}", defaultModeAsString, err); - finalDefaultMode = MetricsModes.fromString(defaultMode); + MetricsMode defaultMode; + String configuredDefault = props.get(DEFAULT_WRITE_METRICS_MODE); + if (configuredDefault != null) { + // a user-configured default mode is applied for all columns + defaultMode = parseMode(configuredDefault, DEFAULT_MODE, "default"); + + } else if (schema == null || schema.columns().size() <= maxInferredDefaultColumns) { + // there are less than the inferred limit, so the default is used everywhere + defaultMode = DEFAULT_MODE; + + } else { + // an inferred default mode is applied to the first few columns, up to the limit + Schema subSchema = new Schema(schema.columns().subList(0, maxInferredDefaultColumns)); + for (Integer id : TypeUtil.getProjectedIds(subSchema)) { + columnModes.put(subSchema.findColumnName(id), DEFAULT_MODE); + } + + // all other columns don't use metrics + defaultMode = MetricsModes.None.get(); } // First set sorted column with sorted column default (can be overridden by user) - MetricsMode sortedColDefaultMode = sortedColumnDefaultMode(finalDefaultMode); + MetricsMode sortedColDefaultMode = sortedColumnDefaultMode(defaultMode); Set sortedCols = SortOrderUtil.orderPreservingSortedColumns(order); sortedCols.forEach(sc -> columnModes.put(sc, sortedColDefaultMode)); // Handle user overrides of defaults - MetricsMode finalDefaultModeVal = finalDefaultMode; - props.keySet().stream() - .filter(key -> key.startsWith(METRICS_MODE_COLUMN_CONF_PREFIX)) - .forEach(key -> { - String columnAlias = key.replaceFirst(METRICS_MODE_COLUMN_CONF_PREFIX, ""); - MetricsMode mode; - try { - mode = MetricsModes.fromString(props.get(key)); - } catch (IllegalArgumentException err) { - // Mode was invalid, log the error and use the default - LOG.warn("Ignoring invalid metrics mode for column {}: {}", columnAlias, props.get(key), err); - mode = finalDefaultModeVal; - } - columnModes.put(columnAlias, mode); - }); + for (String key : props.keySet()) { + if (key.startsWith(METRICS_MODE_COLUMN_CONF_PREFIX)) { + String columnAlias = key.replaceFirst(METRICS_MODE_COLUMN_CONF_PREFIX, ""); + MetricsMode mode = parseMode(props.get(key), defaultMode, "column " + columnAlias); + columnModes.put(columnAlias, mode); + } + } - return new MetricsConfig(columnModes, finalDefaultMode); + return new MetricsConfig(columnModes, defaultMode); } /** @@ -195,6 +197,29 @@ private static MetricsMode sortedColumnDefaultMode(MetricsMode defaultMode) { } } + private static int maxInferredColumnDefaults(Map properties) { + int maxInferredDefaultColumns = PropertyUtil.propertyAsInt(properties, + METRICS_MAX_INFERRED_COLUMN_DEFAULTS, METRICS_MAX_INFERRED_COLUMN_DEFAULTS_DEFAULT); + if (maxInferredDefaultColumns < 0) { + LOG.warn("Invalid value for {} (negative): {}, falling back to {}", + METRICS_MAX_INFERRED_COLUMN_DEFAULTS, maxInferredDefaultColumns, + METRICS_MAX_INFERRED_COLUMN_DEFAULTS_DEFAULT); + return METRICS_MAX_INFERRED_COLUMN_DEFAULTS_DEFAULT; + } else { + return maxInferredDefaultColumns; + } + } + + private static MetricsMode parseMode(String modeString, MetricsMode fallback, String context) { + try { + return MetricsModes.fromString(modeString); + } catch (IllegalArgumentException err) { + // User override was invalid, log the error and use the default + LOG.warn("Ignoring invalid metrics mode ({}): {}", context, modeString, err); + return fallback; + } + } + public void validateReferencedColumns(Schema schema) { for (String column : columnModes.keySet()) { ValidationException.check( diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java index e7b309f81d40..3cc520faa5a5 100644 --- a/core/src/main/java/org/apache/iceberg/TableProperties.java +++ b/core/src/main/java/org/apache/iceberg/TableProperties.java @@ -267,6 +267,10 @@ private TableProperties() { public static final String METADATA_DELETE_AFTER_COMMIT_ENABLED = "write.metadata.delete-after-commit.enabled"; public static final boolean METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT = false; + public static final String METRICS_MAX_INFERRED_COLUMN_DEFAULTS = + "write.metadata.metrics.max-inferred-column-defaults"; + public static final int METRICS_MAX_INFERRED_COLUMN_DEFAULTS_DEFAULT = 32; + public static final String METRICS_MODE_COLUMN_CONF_PREFIX = "write.metadata.metrics.column."; public static final String DEFAULT_WRITE_METRICS_MODE = "write.metadata.metrics.default"; public static final String DEFAULT_WRITE_METRICS_MODE_DEFAULT = "truncate(16)"; diff --git a/core/src/test/java/org/apache/iceberg/TestMetricsModes.java b/core/src/test/java/org/apache/iceberg/TestMetricsModes.java index 8660f81aafd5..939811067619 100644 --- a/core/src/test/java/org/apache/iceberg/TestMetricsModes.java +++ b/core/src/test/java/org/apache/iceberg/TestMetricsModes.java @@ -20,6 +20,7 @@ package org.apache.iceberg; import java.io.File; +import java.io.IOException; import java.util.Map; import org.apache.iceberg.MetricsModes.Counts; import org.apache.iceberg.MetricsModes.Full; @@ -160,4 +161,28 @@ public void testMetricsConfigSortedColsDefaultByInvalid() throws Exception { Assert.assertEquals("Original default applies as user entered invalid mode for sorted column", Counts.get(), config.columnMode("col2")); } + + @Test + public void testMetricsConfigInferredDefaultModeLimit() throws IOException { + Schema schema = new Schema( + required(1, "col1", Types.IntegerType.get()), + required(2, "col2", Types.IntegerType.get()), + required(3, "col3", Types.IntegerType.get()) + ); + + File tableDir = temp.newFolder(); + Assert.assertTrue(tableDir.delete()); + + Table table = TestTables.create( + tableDir, "test", schema, PartitionSpec.unpartitioned(), SortOrder.unsorted(), formatVersion); + + // only infer a default for the first two columns + table.updateProperties().set(TableProperties.METRICS_MAX_INFERRED_COLUMN_DEFAULTS, "2").commit(); + + MetricsConfig config = MetricsConfig.forTable(table); + + Assert.assertEquals("Should use default mode for col1", Truncate.withLength(16), config.columnMode("col1")); + Assert.assertEquals("Should use default mode for col2", Truncate.withLength(16), config.columnMode("col2")); + Assert.assertEquals("Should use None for col3", None.get(), config.columnMode("col3")); + } } diff --git a/data/src/test/java/org/apache/iceberg/io/TestWriterMetrics.java b/data/src/test/java/org/apache/iceberg/io/TestWriterMetrics.java index 86a9eaa0be22..c1575b46743b 100644 --- a/data/src/test/java/org/apache/iceberg/io/TestWriterMetrics.java +++ b/data/src/test/java/org/apache/iceberg/io/TestWriterMetrics.java @@ -237,12 +237,24 @@ public void testMaxColumns() throws IOException { dataWriter.close(); DataFile dataFile = dataWriter.toDataFile(); - // No field should have metrics - Assert.assertTrue("Should not have any lower bound metrics", dataFile.lowerBounds().isEmpty()); - Assert.assertTrue("Should not have any upper bound metrics", dataFile.upperBounds().isEmpty()); - Assert.assertTrue("Should not have any nan value metrics", dataFile.nanValueCounts().isEmpty()); - Assert.assertTrue("Should not have any null value metrics", dataFile.nullValueCounts().isEmpty()); - Assert.assertTrue("Should not have any value metrics", dataFile.valueCounts().isEmpty()); + // start at 1 because IDs were reassigned in the table + int id = 1; + for (; id <= 32; id += 1) { + Assert.assertNotNull("Should have lower bound metrics", dataFile.lowerBounds().get(id)); + Assert.assertNotNull("Should have upper bound metrics", dataFile.upperBounds().get(id)); + Assert.assertNull("Should not have nan value metrics (not floating point)", dataFile.nanValueCounts().get(id)); + Assert.assertNotNull("Should have null value metrics", dataFile.nullValueCounts().get(id)); + Assert.assertNotNull("Should have value metrics", dataFile.valueCounts().get(id)); + } + + // Remaining fields should not have metrics + for (; id <= numColumns; id += 1) { + Assert.assertNull("Should not have any lower bound metrics", dataFile.lowerBounds().get(id)); + Assert.assertNull("Should not have any upper bound metrics", dataFile.upperBounds().get(id)); + Assert.assertNull("Should not have any nan value metrics", dataFile.nanValueCounts().get(id)); + Assert.assertNull("Should not have any null value metrics", dataFile.nullValueCounts().get(id)); + Assert.assertNull("Should not have any value metrics", dataFile.valueCounts().get(id)); + } } @Test