Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 64 additions & 39 deletions core/src/main/java/org/apache/iceberg/MetricsConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,17 @@
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.SerializableMap;
import org.apache.iceberg.util.SortOrderUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.iceberg.TableProperties.DEFAULT_WRITE_METRICS_MODE;
import static org.apache.iceberg.TableProperties.DEFAULT_WRITE_METRICS_MODE_DEFAULT;
import static org.apache.iceberg.TableProperties.METRICS_MAX_INFERRED_COLUMN_DEFAULTS;
import static org.apache.iceberg.TableProperties.METRICS_MAX_INFERRED_COLUMN_DEFAULTS_DEFAULT;
import static org.apache.iceberg.TableProperties.METRICS_MODE_COLUMN_CONF_PREFIX;

@Immutable
Expand All @@ -45,9 +49,8 @@ public final class MetricsConfig implements Serializable {
private static final Joiner DOT = Joiner.on('.');

// Disable metrics by default for wide tables to prevent excessive metadata
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@danielcweeks Is this a left-over comment?

private static final int MAX_COLUMNS = 32;
private static final MetricsConfig DEFAULT = new MetricsConfig(ImmutableMap.of(),
MetricsModes.fromString(DEFAULT_WRITE_METRICS_MODE_DEFAULT));
private static final MetricsMode DEFAULT_MODE = MetricsModes.fromString(DEFAULT_WRITE_METRICS_MODE_DEFAULT);
private static final MetricsConfig DEFAULT = new MetricsConfig(ImmutableMap.of(), DEFAULT_MODE);

private final Map<String, MetricsMode> columnModes;
private final MetricsMode defaultMode;
Expand Down Expand Up @@ -96,21 +99,15 @@ static Map<String, String> updateProperties(Map<String, String> props, List<Stri
**/
@Deprecated
public static MetricsConfig fromProperties(Map<String, String> props) {
return from(props, null, DEFAULT_WRITE_METRICS_MODE_DEFAULT);
return from(props, null, null);
}

/**
* Creates a metrics config from a table.
* @param table iceberg table
*/
public static MetricsConfig forTable(Table table) {
String defaultMode;
if (table.schema().columns().size() <= MAX_COLUMNS) {
defaultMode = DEFAULT_WRITE_METRICS_MODE_DEFAULT;
} else {
defaultMode = MetricsModes.None.get().toString();
}
return from(table.properties(), table.sortOrder(), defaultMode);
return from(table.properties(), table.schema(), table.sortOrder());
}

/**
Expand All @@ -136,50 +133,55 @@ public static MetricsConfig forPositionDelete(Table table) {
}

/**
* Generate a MetricsConfig for all columns based on overrides, sortOrder, and defaultMode.
* Generate a MetricsConfig for all columns based on overrides, schema, and sort order.
*
* @param props will be read for metrics overrides (write.metadata.metrics.column.*) and default
* (write.metadata.metrics.default)
* @param schema table schema
* @param order sort order columns, will be promoted to truncate(16)
* @param defaultMode default, if not set by user property
* @return metrics configuration
*/
private static MetricsConfig from(Map<String, String> props, SortOrder order, String defaultMode) {
private static MetricsConfig from(Map<String, String> props, Schema schema, SortOrder order) {
int maxInferredDefaultColumns = maxInferredColumnDefaults(props);
Map<String, MetricsMode> columnModes = Maps.newHashMap();

// Handle user override of default mode
MetricsMode finalDefaultMode;
String defaultModeAsString = props.getOrDefault(DEFAULT_WRITE_METRICS_MODE, defaultMode);
try {
finalDefaultMode = MetricsModes.fromString(defaultModeAsString);
} catch (IllegalArgumentException err) {
// User override was invalid, log the error and use the default
LOG.warn("Ignoring invalid default metrics mode: {}", defaultModeAsString, err);
finalDefaultMode = MetricsModes.fromString(defaultMode);
MetricsMode defaultMode;
String configuredDefault = props.get(DEFAULT_WRITE_METRICS_MODE);
if (configuredDefault != null) {
// a user-configured default mode is applied for all columns
defaultMode = parseMode(configuredDefault, DEFAULT_MODE, "default");

} else if (schema == null || schema.columns().size() <= maxInferredDefaultColumns) {
// there are fewer columns than the inferred limit, so the default is used everywhere
defaultMode = DEFAULT_MODE;

} else {
// an inferred default mode is applied to the first few columns, up to the limit
Schema subSchema = new Schema(schema.columns().subList(0, maxInferredDefaultColumns));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Quick warning: subList would return a view on top of the original list and any subsequent changes would be reflected in both. This does not seem to cause issues here but I better mention.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a temporary schema so it should be fine.

Copy link
Contributor

@aokolnychyi aokolnychyi Jul 7, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if I have a highly nested schema? The number of stored metrics can be way more than 32 in that case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but this is the current behavior. We use the top-level columns for the current check.

for (Integer id : TypeUtil.getProjectedIds(subSchema)) {
columnModes.put(subSchema.findColumnName(id), DEFAULT_MODE);
}

// all other columns don't use metrics
defaultMode = MetricsModes.None.get();
}

// First set sorted column with sorted column default (can be overridden by user)
MetricsMode sortedColDefaultMode = sortedColumnDefaultMode(finalDefaultMode);
MetricsMode sortedColDefaultMode = sortedColumnDefaultMode(defaultMode);
Set<String> sortedCols = SortOrderUtil.orderPreservingSortedColumns(order);
sortedCols.forEach(sc -> columnModes.put(sc, sortedColDefaultMode));

// Handle user overrides of defaults
MetricsMode finalDefaultModeVal = finalDefaultMode;
props.keySet().stream()
.filter(key -> key.startsWith(METRICS_MODE_COLUMN_CONF_PREFIX))
.forEach(key -> {
String columnAlias = key.replaceFirst(METRICS_MODE_COLUMN_CONF_PREFIX, "");
MetricsMode mode;
try {
mode = MetricsModes.fromString(props.get(key));
} catch (IllegalArgumentException err) {
// Mode was invalid, log the error and use the default
LOG.warn("Ignoring invalid metrics mode for column {}: {}", columnAlias, props.get(key), err);
mode = finalDefaultModeVal;
}
columnModes.put(columnAlias, mode);
});
for (String key : props.keySet()) {
if (key.startsWith(METRICS_MODE_COLUMN_CONF_PREFIX)) {
String columnAlias = key.replaceFirst(METRICS_MODE_COLUMN_CONF_PREFIX, "");
MetricsMode mode = parseMode(props.get(key), defaultMode, "column " + columnAlias);
columnModes.put(columnAlias, mode);
}
}

return new MetricsConfig(columnModes, finalDefaultMode);
return new MetricsConfig(columnModes, defaultMode);
}

/**
Expand All @@ -195,6 +197,29 @@ private static MetricsMode sortedColumnDefaultMode(MetricsMode defaultMode) {
}
}

/**
 * Returns the maximum number of columns for which a metrics mode default is inferred.
 *
 * <p>Reads {@code write.metadata.metrics.max-inferred-column-defaults} from the table
 * properties. A negative configured value is treated as invalid: it is logged and the
 * built-in default is used instead.
 *
 * @param properties table properties to read the limit from
 * @return the configured non-negative limit, or the built-in default for invalid values
 */
private static int maxInferredColumnDefaults(Map<String, String> properties) {
  int configuredLimit = PropertyUtil.propertyAsInt(properties,
      METRICS_MAX_INFERRED_COLUMN_DEFAULTS, METRICS_MAX_INFERRED_COLUMN_DEFAULTS_DEFAULT);
  if (configuredLimit >= 0) {
    return configuredLimit;
  }

  // negative limits are invalid; warn and fall back to the built-in default
  LOG.warn("Invalid value for {} (negative): {}, falling back to {}",
      METRICS_MAX_INFERRED_COLUMN_DEFAULTS, configuredLimit,
      METRICS_MAX_INFERRED_COLUMN_DEFAULTS_DEFAULT);
  return METRICS_MAX_INFERRED_COLUMN_DEFAULTS_DEFAULT;
}

/**
 * Parses a metrics mode string, substituting a fallback for invalid values.
 *
 * <p>An unparseable mode is logged with the given context string rather than thrown,
 * so a bad table property does not fail the whole config load.
 *
 * @param modeString the mode string to parse
 * @param fallback mode returned when {@code modeString} is not a valid mode
 * @param context short description of where the value came from, used in the warning
 * @return the parsed mode, or {@code fallback} when parsing fails
 */
private static MetricsMode parseMode(String modeString, MetricsMode fallback, String context) {
  MetricsMode parsed;
  try {
    parsed = MetricsModes.fromString(modeString);
  } catch (IllegalArgumentException invalid) {
    // user-supplied value was invalid; log it and continue with the fallback
    LOG.warn("Ignoring invalid metrics mode ({}): {}", context, modeString, invalid);
    parsed = fallback;
  }
  return parsed;
}

public void validateReferencedColumns(Schema schema) {
for (String column : columnModes.keySet()) {
ValidationException.check(
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,10 @@ private TableProperties() {
public static final String METADATA_DELETE_AFTER_COMMIT_ENABLED = "write.metadata.delete-after-commit.enabled";
public static final boolean METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT = false;

public static final String METRICS_MAX_INFERRED_COLUMN_DEFAULTS =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: INFERRED doesn't really make sense to me in this context. We're not inferring, we're actually explicit here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I debated what to call this. The problem is that we are defaulting the default. You can explicitly set a default using write.metadata.metrics.default, and that will apply to all columns. If you don't set a default we infer one for you, but only for the first 32 columns (at least, after this PR). That's why I used "inferred" -- I thought it was better than max-defaulted-default-columns.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree with this table property, initially I had made one but it was taken out during the discussions. Indeed it's a bit of a confusing config, but I don't see any other great option.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The naming felt a little bit confusing to me too. After I read the explanation, it started to make more sense. However, I am still not sure inferred is the right word. Technically, we infer defaults for all columns (after this limit it just becomes none). To me, this is more about limiting the number of columns for which we persist metrics by default. Can the property name revolve around persist rather than infer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem with "persist" and similar is that it misses the distinction between an explicit default (when write.metadata.metrics.default is set) and an implicit default that comes from Iceberg. I think the right behavior is to preserve what we currently do, which is to use the explicit default for all columns. But that means that this property should obviously not apply to the explicit default. That's why I used "inferred default".

What about changing this to include unconfigured or missing? Something like missing-mode-limit?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's just keep it as-is then. I don't think it is a big deal.

"write.metadata.metrics.max-inferred-column-defaults";
public static final int METRICS_MAX_INFERRED_COLUMN_DEFAULTS_DEFAULT = 32;

public static final String METRICS_MODE_COLUMN_CONF_PREFIX = "write.metadata.metrics.column.";
public static final String DEFAULT_WRITE_METRICS_MODE = "write.metadata.metrics.default";
public static final String DEFAULT_WRITE_METRICS_MODE_DEFAULT = "truncate(16)";
Expand Down
25 changes: 25 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestMetricsModes.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.iceberg;

import java.io.File;
import java.io.IOException;
import java.util.Map;
import org.apache.iceberg.MetricsModes.Counts;
import org.apache.iceberg.MetricsModes.Full;
Expand Down Expand Up @@ -160,4 +161,28 @@ public void testMetricsConfigSortedColsDefaultByInvalid() throws Exception {
Assert.assertEquals("Original default applies as user entered invalid mode for sorted column",
Counts.get(), config.columnMode("col2"));
}

@Test
public void testMetricsConfigInferredDefaultModeLimit() throws IOException {
// three top-level columns: one more than the inference limit configured below
Schema schema = new Schema(
required(1, "col1", Types.IntegerType.get()),
required(2, "col2", Types.IntegerType.get()),
required(3, "col3", Types.IntegerType.get())
);

// delete the temp folder so the table location does not exist yet
// (presumably TestTables.create requires a fresh location — TODO confirm)
File tableDir = temp.newFolder();
Assert.assertTrue(tableDir.delete());

Table table = TestTables.create(
tableDir, "test", schema, PartitionSpec.unpartitioned(), SortOrder.unsorted(), formatVersion);

// only infer a default for the first two columns
table.updateProperties().set(TableProperties.METRICS_MAX_INFERRED_COLUMN_DEFAULTS, "2").commit();

MetricsConfig config = MetricsConfig.forTable(table);

// columns within the limit keep the built-in truncate(16) default; columns past it get None
Assert.assertEquals("Should use default mode for col1", Truncate.withLength(16), config.columnMode("col1"));
Assert.assertEquals("Should use default mode for col2", Truncate.withLength(16), config.columnMode("col2"));
Assert.assertEquals("Should use None for col3", None.get(), config.columnMode("col3"));
}
}
24 changes: 18 additions & 6 deletions data/src/test/java/org/apache/iceberg/io/TestWriterMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -237,12 +237,24 @@ public void testMaxColumns() throws IOException {
dataWriter.close();
DataFile dataFile = dataWriter.toDataFile();

// No field should have metrics
Assert.assertTrue("Should not have any lower bound metrics", dataFile.lowerBounds().isEmpty());
Assert.assertTrue("Should not have any upper bound metrics", dataFile.upperBounds().isEmpty());
Assert.assertTrue("Should not have any nan value metrics", dataFile.nanValueCounts().isEmpty());
Assert.assertTrue("Should not have any null value metrics", dataFile.nullValueCounts().isEmpty());
Assert.assertTrue("Should not have any value metrics", dataFile.valueCounts().isEmpty());
// start at 1 because IDs were reassigned in the table
int id = 1;
for (; id <= 32; id += 1) {
Assert.assertNotNull("Should have lower bound metrics", dataFile.lowerBounds().get(id));
Assert.assertNotNull("Should have upper bound metrics", dataFile.upperBounds().get(id));
Assert.assertNull("Should not have nan value metrics (not floating point)", dataFile.nanValueCounts().get(id));
Assert.assertNotNull("Should have null value metrics", dataFile.nullValueCounts().get(id));
Assert.assertNotNull("Should have value metrics", dataFile.valueCounts().get(id));
}

// Remaining fields should not have metrics
for (; id <= numColumns; id += 1) {
Assert.assertNull("Should not have any lower bound metrics", dataFile.lowerBounds().get(id));
Assert.assertNull("Should not have any upper bound metrics", dataFile.upperBounds().get(id));
Assert.assertNull("Should not have any nan value metrics", dataFile.nanValueCounts().get(id));
Assert.assertNull("Should not have any null value metrics", dataFile.nullValueCounts().get(id));
Assert.assertNull("Should not have any value metrics", dataFile.valueCounts().get(id));
}
}

@Test
Expand Down