diff --git a/core/src/main/java/org/apache/iceberg/MetricsConfig.java b/core/src/main/java/org/apache/iceberg/MetricsConfig.java
index 1b835c447909..2ae01dad284a 100644
--- a/core/src/main/java/org/apache/iceberg/MetricsConfig.java
+++ b/core/src/main/java/org/apache/iceberg/MetricsConfig.java
@@ -44,6 +44,8 @@ public final class MetricsConfig implements Serializable {
   private static final Logger LOG = LoggerFactory.getLogger(MetricsConfig.class);
   private static final Joiner DOT = Joiner.on('.');
+  // Disable metrics by default for wide tables to prevent excessive metadata
+  private static final int MAX_COLUMNS = 32;
   private static final MetricsConfig DEFAULT = new MetricsConfig(ImmutableMap.of(),
       MetricsModes.fromString(DEFAULT_WRITE_METRICS_MODE_DEFAULT));
 
@@ -94,7 +96,7 @@ static Map updateProperties(Map props, List props) {
-    return from(props, null);
+    return from(props, null, DEFAULT_WRITE_METRICS_MODE_DEFAULT);
   }
 
   /**
@@ -102,7 +104,13 @@ public static MetricsConfig fromProperties(Map props) {
    * @param table iceberg table
    */
   public static MetricsConfig forTable(Table table) {
-    return from(table.properties(), table.sortOrder());
+    String defaultMode;
+    if (table.schema().columns().size() <= MAX_COLUMNS) {
+      defaultMode = DEFAULT_WRITE_METRICS_MODE_DEFAULT;
+    } else {
+      defaultMode = MetricsModes.None.get().toString();
+    }
+    return from(table.properties(), table.sortOrder(), defaultMode);
   }
 
   /**
@@ -127,24 +135,35 @@ public static MetricsConfig forPositionDelete(Table table) {
     return new MetricsConfig(columnModes.build(), defaultMode);
   }
 
-  private static MetricsConfig from(Map<String, String> props, SortOrder order) {
+  /**
+   * Generate a MetricsConfig for all columns based on overrides, sortOrder, and defaultMode.
+   *
+   * @param props will be read for metrics overrides (write.metadata.metrics.column.*) and default
+   *              (write.metadata.metrics.default)
+   * @param order sort order columns, will be promoted to truncate(16)
+   * @param defaultMode default, if not set by user property
+   * @return metrics configuration
+   */
+  private static MetricsConfig from(Map<String, String> props, SortOrder order, String defaultMode) {
     Map<String, MetricsMode> columnModes = Maps.newHashMap();
-    MetricsMode defaultMode;
-    String defaultModeAsString = props.getOrDefault(DEFAULT_WRITE_METRICS_MODE, DEFAULT_WRITE_METRICS_MODE_DEFAULT);
+
+    // Handle user override of default mode
+    MetricsMode finalDefaultMode;
+    String defaultModeAsString = props.getOrDefault(DEFAULT_WRITE_METRICS_MODE, defaultMode);
     try {
-      defaultMode = MetricsModes.fromString(defaultModeAsString);
+      finalDefaultMode = MetricsModes.fromString(defaultModeAsString);
     } catch (IllegalArgumentException err) {
-      // Mode was invalid, log the error and use the default
+      // User override was invalid, log the error and use the default
       LOG.warn("Ignoring invalid default metrics mode: {}", defaultModeAsString, err);
-      defaultMode = MetricsModes.fromString(DEFAULT_WRITE_METRICS_MODE_DEFAULT);
+      finalDefaultMode = MetricsModes.fromString(defaultMode);
    }
 
     // First set sorted column with sorted column default (can be overridden by user)
-    MetricsMode sortedColDefaultMode = sortedColumnDefaultMode(defaultMode);
+    MetricsMode sortedColDefaultMode = sortedColumnDefaultMode(finalDefaultMode);
     Set<String> sortedCols = SortOrderUtil.orderPreservingSortedColumns(order);
     sortedCols.forEach(sc -> columnModes.put(sc, sortedColDefaultMode));
 
-    MetricsMode defaultModeFinal = defaultMode;
+    // Handle user overrides of defaults
+    MetricsMode finalDefaultModeVal = finalDefaultMode;
     props.keySet().stream()
         .filter(key -> key.startsWith(METRICS_MODE_COLUMN_CONF_PREFIX))
         .forEach(key -> {
@@ -155,12 +174,12 @@ private static MetricsConfig from(Map props, SortOrder order) {
           } catch (IllegalArgumentException err) {
             // Mode was invalid, log the error and use the default
             LOG.warn("Ignoring invalid metrics mode for column {}: {}", columnAlias, props.get(key), err);
-            mode = defaultModeFinal;
+            mode = finalDefaultModeVal;
           }
           columnModes.put(columnAlias, mode);
         });
 
-    return new MetricsConfig(columnModes, defaultMode);
+    return new MetricsConfig(columnModes, finalDefaultMode);
   }
 
   /**
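Reviewer note (not part of the patch): a minimal sketch of how the new default is expected to behave, assuming two pre-existing Iceberg Table handles — narrowTable with at most 32 columns and wideTable with more — and a column name "some_col"; those names are illustrative, not identifiers from this change. The calls themselves (MetricsConfig.forTable, MetricsConfig.columnMode, Table.updateProperties, TableProperties.DEFAULT_WRITE_METRICS_MODE) are existing Iceberg API.

// Reviewer sketch, not part of the patch. "narrowTable", "wideTable", and "some_col" are assumed.
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;

class MetricsDefaultSketch {
  static void show(Table narrowTable, Table wideTable) {
    // Narrow table (<= 32 columns): unchanged behavior, every column defaults to truncate(16).
    System.out.println(MetricsConfig.forTable(narrowTable).columnMode("some_col"));

    // Wide table (> 32 columns): the default now falls back to "none", so no bounds are kept.
    System.out.println(MetricsConfig.forTable(wideTable).columnMode("some_col"));

    // Explicitly setting write.metadata.metrics.default restores the old behavior.
    wideTable.updateProperties()
        .set(TableProperties.DEFAULT_WRITE_METRICS_MODE, "truncate(16)")
        .commit();
    System.out.println(MetricsConfig.forTable(wideTable).columnMode("some_col"));
  }
}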
diff --git a/data/src/test/java/org/apache/iceberg/io/TestWriterMetrics.java b/data/src/test/java/org/apache/iceberg/io/TestWriterMetrics.java
index f52e422cb533..1ad88a729134 100644
--- a/data/src/test/java/org/apache/iceberg/io/TestWriterMetrics.java
+++ b/data/src/test/java/org/apache/iceberg/io/TestWriterMetrics.java
@@ -23,6 +23,7 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.CharBuffer;
+import java.util.List;
 import java.util.Map;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
@@ -31,12 +32,14 @@
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TestTables;
 import org.apache.iceberg.deletes.PositionDelete;
 import org.apache.iceberg.deletes.PositionDeleteWriter;
 import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Conversions;
 import org.apache.iceberg.types.Types;
 import org.junit.After;
@@ -83,7 +86,6 @@ public abstract class TestWriterMetrics {
   protected FileFormat fileFormat;
   protected TestTables.TestTable table = null;
-  protected File metadataDir = null;
   private OutputFileFactory fileFactory = null;
 
   @Parameterized.Parameters(name = "FileFormat = {0}")
@@ -98,17 +100,17 @@ public TestWriterMetrics(FileFormat fileFormat) {
     this.fileFormat = fileFormat;
   }
 
-  protected abstract FileWriterFactory<T> newWriterFactory(Schema dataSchema);
+  protected abstract FileWriterFactory<T> newWriterFactory(Table sourceTable);
 
   protected abstract T toRow(Integer id, String data, boolean boolValue, Long longValue);
 
+  protected abstract T toGenericRow(int value, int repeated);
+
   @Before
   public void setupTable() throws Exception {
     File tableDir = temp.newFolder();
     tableDir.delete(); // created by table create
 
-    this.metadataDir = new File(tableDir, "metadata");
-
     this.table = TestTables.create(
         tableDir,
         "test",
@@ -129,7 +131,7 @@ public void after() {
   @Test
   public void verifySortedColMetric() throws Exception {
     T row = toRow(3, "3", true, 3L);
-    DataWriter<T> dataWriter = newWriterFactory(SCHEMA).newDataWriter(
+    DataWriter<T> dataWriter = newWriterFactory(table).newDataWriter(
         fileFactory.newOutputFile(),
         PartitionSpec.unpartitioned(),
         null
@@ -156,7 +158,7 @@ public void verifySortedColMetric() throws Exception {
   @Test
   public void testPositionDeleteMetrics() throws IOException {
-    FileWriterFactory<T> writerFactory = newWriterFactory(SCHEMA);
+    FileWriterFactory<T> writerFactory = newWriterFactory(table);
     EncryptedOutputFile outputFile = fileFactory.newOutputFile();
     PositionDeleteWriter<T> deleteWriter =
         writerFactory.newPositionDeleteWriter(outputFile, table.spec(), null);
@@ -202,4 +204,87 @@ public void testPositionDeleteMetrics() throws IOException {
     Assert.assertFalse(upperBounds.containsKey(4));
     Assert.assertEquals(3L, (long) Conversions.fromByteBuffer(Types.LongType.get(), upperBounds.get(5)));
   }
+
+  @Test
+  public void testMaxColumns() throws IOException {
+    File tableDir = temp.newFolder();
+    tableDir.delete(); // created by table create
+
+    int numColumns = 33;
+    List<Types.NestedField> fields = Lists.newArrayListWithCapacity(numColumns);
+    for (int i = 0; i < numColumns; i++) {
+      fields.add(required(i, "col" + i, Types.IntegerType.get()));
+    }
+    Schema maxColSchema = new Schema(fields);
+
+    Table maxColumnTable = TestTables.create(
+        tableDir,
+        "max_col_table",
+        maxColSchema,
+        PartitionSpec.unpartitioned(),
+        SortOrder.unsorted(),
+        FORMAT_V2);
+    OutputFileFactory maxColFactory = OutputFileFactory.builderFor(maxColumnTable, 1, 1)
+        .format(fileFormat).build();
+
+    T row = toGenericRow(1, numColumns);
+    DataWriter<T> dataWriter = newWriterFactory(maxColumnTable).newDataWriter(
+        maxColFactory.newOutputFile(),
+        PartitionSpec.unpartitioned(),
+        null
+    );
+    dataWriter.add(row);
+    dataWriter.close();
+    DataFile dataFile = dataWriter.toDataFile();
+
+    // No field should have metrics
+    Assert.assertTrue("Should not have any lower bound metrics", dataFile.lowerBounds().isEmpty());
+    Assert.assertTrue("Should not have any upper bound metrics", dataFile.upperBounds().isEmpty());
+    Assert.assertTrue("Should not have any nan value metrics", dataFile.nanValueCounts().isEmpty());
+    Assert.assertTrue("Should not have any null value metrics", dataFile.nullValueCounts().isEmpty());
+    Assert.assertTrue("Should not have any value metrics", dataFile.valueCounts().isEmpty());
+  }
+
+  @Test
+  public void testMaxColumnsWithDefaultOverride() throws IOException {
+    File tableDir = temp.newFolder();
+    tableDir.delete(); // created by table create
+
+    int numColumns = 33;
+    List<Types.NestedField> fields = Lists.newArrayListWithCapacity(numColumns);
+    for (int i = 0; i < numColumns; i++) {
+      fields.add(required(i, "col" + i, Types.IntegerType.get()));
+    }
+    Schema maxColSchema = new Schema(fields);
+
+    Table maxColumnTable = TestTables.create(
+        tableDir,
+        "max_col_table",
+        maxColSchema,
+        PartitionSpec.unpartitioned(),
+        SortOrder.unsorted(),
+        FORMAT_V2);
+    maxColumnTable.updateProperties().set(TableProperties.DEFAULT_WRITE_METRICS_MODE,
+        TableProperties.DEFAULT_WRITE_METRICS_MODE_DEFAULT).commit();
+    OutputFileFactory maxColFactory = OutputFileFactory.builderFor(maxColumnTable, 1, 1)
+        .format(fileFormat).build();
+
+    T row = toGenericRow(1, numColumns);
+    DataWriter<T> dataWriter = newWriterFactory(maxColumnTable).newDataWriter(
+        maxColFactory.newOutputFile(),
+        PartitionSpec.unpartitioned(),
+        null
+    );
+    dataWriter.add(row);
+    dataWriter.close();
+    DataFile dataFile = dataWriter.toDataFile();
+
+    // Every field should have metrics because the user set the default explicitly
+    Map<Integer, ByteBuffer> upperBounds = dataFile.upperBounds();
+    Map<Integer, ByteBuffer> lowerBounds = dataFile.lowerBounds();
+    for (int i = 0; i < numColumns; i++) {
+      Assert.assertEquals(1, (int) Conversions.fromByteBuffer(Types.IntegerType.get(), upperBounds.get(i)));
+      Assert.assertEquals(1, (int) Conversions.fromByteBuffer(Types.IntegerType.get(), lowerBounds.get(i)));
+    }
+  }
 }
diff --git a/flink/v1.12/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkWriterMetrics.java b/flink/v1.12/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkWriterMetrics.java
index 0e94476d82f4..aa31c1819d10 100644
--- a/flink/v1.12/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkWriterMetrics.java
+++ b/flink/v1.12/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkWriterMetrics.java
@@ -23,7 +23,7 @@
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.io.FileWriterFactory;
 import org.apache.iceberg.io.TestWriterMetrics;
 
@@ -34,12 +34,12 @@ public TestFlinkWriterMetrics(FileFormat fileFormat) {
   }
 
   @Override
-  protected FileWriterFactory<RowData> newWriterFactory(Schema dataSchema) {
-    return FlinkFileWriterFactory.builderFor(table)
-        .dataSchema(table.schema())
+  protected FileWriterFactory<RowData> newWriterFactory(Table sourceTable) {
+    return FlinkFileWriterFactory.builderFor(sourceTable)
+        .dataSchema(sourceTable.schema())
         .dataFileFormat(fileFormat)
         .deleteFileFormat(fileFormat)
-        .positionDeleteRowSchema(table.schema())
+        .positionDeleteRowSchema(sourceTable.schema())
         .build();
   }
 
@@ -49,4 +49,13 @@ protected RowData toRow(Integer id, String data, boolean boolValue, Long longValue) {
     GenericRowData row = GenericRowData.of(id, StringData.fromString(data), nested);
     return row;
   }
+
+  @Override
+  public RowData toGenericRow(int value, int repeated) {
+    GenericRowData row = new GenericRowData(repeated);
+    for (int i = 0; i < repeated; i++) {
+      row.setField(i, value);
+    }
+    return row;
+  }
 }
diff --git a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkWriterMetrics.java b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkWriterMetrics.java
index 0e94476d82f4..aa31c1819d10 100644
--- a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkWriterMetrics.java
+++ b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkWriterMetrics.java
@@ -23,7 +23,7 @@
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.io.FileWriterFactory;
 import org.apache.iceberg.io.TestWriterMetrics;
 
@@ -34,12 +34,12 @@ public TestFlinkWriterMetrics(FileFormat fileFormat) {
   }
 
   @Override
-  protected FileWriterFactory<RowData> newWriterFactory(Schema dataSchema) {
-    return FlinkFileWriterFactory.builderFor(table)
-        .dataSchema(table.schema())
+  protected FileWriterFactory<RowData> newWriterFactory(Table sourceTable) {
+    return FlinkFileWriterFactory.builderFor(sourceTable)
+        .dataSchema(sourceTable.schema())
         .dataFileFormat(fileFormat)
         .deleteFileFormat(fileFormat)
-        .positionDeleteRowSchema(table.schema())
+        .positionDeleteRowSchema(sourceTable.schema())
         .build();
   }
 
@@ -49,4 +49,13 @@ protected RowData toRow(Integer id, String data, boolean boolValue, Long longValue) {
     GenericRowData row = GenericRowData.of(id, StringData.fromString(data), nested);
     return row;
   }
+
+  @Override
+  public RowData toGenericRow(int value, int repeated) {
+    GenericRowData row = new GenericRowData(repeated);
+    for (int i = 0; i < repeated; i++) {
+      row.setField(i, value);
+    }
+    return row;
+  }
 }
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkWriterMetrics.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkWriterMetrics.java
index 0e94476d82f4..aa31c1819d10 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkWriterMetrics.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkWriterMetrics.java
@@ -23,7 +23,7 @@
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.io.FileWriterFactory;
 import org.apache.iceberg.io.TestWriterMetrics;
 
@@ -34,12 +34,12 @@ public TestFlinkWriterMetrics(FileFormat fileFormat) {
   }
 
   @Override
-  protected FileWriterFactory<RowData> newWriterFactory(Schema dataSchema) {
-    return FlinkFileWriterFactory.builderFor(table)
-        .dataSchema(table.schema())
+  protected FileWriterFactory<RowData> newWriterFactory(Table sourceTable) {
+    return FlinkFileWriterFactory.builderFor(sourceTable)
+        .dataSchema(sourceTable.schema())
         .dataFileFormat(fileFormat)
         .deleteFileFormat(fileFormat)
-        .positionDeleteRowSchema(table.schema())
+        .positionDeleteRowSchema(sourceTable.schema())
         .build();
   }
 
@@ -49,4 +49,13 @@ protected RowData toRow(Integer id, String data, boolean boolValue, Long longValue) {
     GenericRowData row = GenericRowData.of(id, StringData.fromString(data), nested);
     return row;
   }
+
+  @Override
+  public RowData toGenericRow(int value, int repeated) {
+    GenericRowData row = new GenericRowData(repeated);
+    for (int i = 0; i < repeated; i++) {
+      row.setField(i, value);
+    }
+    return row;
+  }
 }
diff --git a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkWriterMetrics.java b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkWriterMetrics.java
index 026111cf48ae..967f394faa74 100644
--- a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkWriterMetrics.java
+++ b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkWriterMetrics.java
@@ -20,7 +20,7 @@
 package org.apache.iceberg.spark.source;
 
 import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.io.FileWriterFactory;
 import org.apache.iceberg.io.TestWriterMetrics;
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -34,12 +34,12 @@ public TestSparkWriterMetrics(FileFormat fileFormat) {
   }
 
   @Override
-  protected FileWriterFactory<InternalRow> newWriterFactory(Schema dataSchema) {
-    return SparkFileWriterFactory.builderFor(table)
-        .dataSchema(table.schema())
+  protected FileWriterFactory<InternalRow> newWriterFactory(Table sourceTable) {
+    return SparkFileWriterFactory.builderFor(sourceTable)
+        .dataSchema(sourceTable.schema())
         .dataFileFormat(fileFormat)
         .deleteFileFormat(fileFormat)
-        .positionDeleteRowSchema(table.schema())
+        .positionDeleteRowSchema(sourceTable.schema())
         .build();
   }
 
@@ -56,4 +56,13 @@ protected InternalRow toRow(Integer id, String data, boolean boolValue, Long longValue) {
     row.update(2, nested);
     return row;
   }
+
+  @Override
+  protected InternalRow toGenericRow(int value, int repeated) {
+    InternalRow row = new GenericInternalRow(repeated);
+    for (int i = 0; i < repeated; i++) {
+      row.update(i, value);
+    }
+    return row;
+  }
 }
diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkWriterMetrics.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkWriterMetrics.java
index 026111cf48ae..967f394faa74 100644
--- a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkWriterMetrics.java
+++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkWriterMetrics.java
@@ -20,7 +20,7 @@
 package org.apache.iceberg.spark.source;
 
 import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.io.FileWriterFactory;
 import org.apache.iceberg.io.TestWriterMetrics;
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -34,12 +34,12 @@ public TestSparkWriterMetrics(FileFormat fileFormat) {
   }
 
   @Override
-  protected FileWriterFactory<InternalRow> newWriterFactory(Schema dataSchema) {
-    return SparkFileWriterFactory.builderFor(table)
-        .dataSchema(table.schema())
+  protected FileWriterFactory<InternalRow> newWriterFactory(Table sourceTable) {
+    return SparkFileWriterFactory.builderFor(sourceTable)
+        .dataSchema(sourceTable.schema())
         .dataFileFormat(fileFormat)
         .deleteFileFormat(fileFormat)
-        .positionDeleteRowSchema(table.schema())
+        .positionDeleteRowSchema(sourceTable.schema())
         .build();
   }
 
@@ -56,4 +56,13 @@ protected InternalRow toRow(Integer id, String data, boolean boolValue, Long longValue) {
     row.update(2, nested);
     return row;
   }
+
+  @Override
+  protected InternalRow toGenericRow(int value, int repeated) {
+    InternalRow row = new GenericInternalRow(repeated);
+    for (int i = 0; i < repeated; i++) {
+      row.update(i, value);
+    }
+    return row;
+  }
 }
diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkWriterMetrics.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkWriterMetrics.java
index 026111cf48ae..967f394faa74 100644
--- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkWriterMetrics.java
+++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkWriterMetrics.java
@@ -20,7 +20,7 @@
 package org.apache.iceberg.spark.source;
 
 import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.io.FileWriterFactory;
 import org.apache.iceberg.io.TestWriterMetrics;
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -34,12 +34,12 @@ public TestSparkWriterMetrics(FileFormat fileFormat) {
   }
 
   @Override
-  protected FileWriterFactory<InternalRow> newWriterFactory(Schema dataSchema) {
-    return SparkFileWriterFactory.builderFor(table)
-        .dataSchema(table.schema())
+  protected FileWriterFactory<InternalRow> newWriterFactory(Table sourceTable) {
+    return SparkFileWriterFactory.builderFor(sourceTable)
+        .dataSchema(sourceTable.schema())
         .dataFileFormat(fileFormat)
         .deleteFileFormat(fileFormat)
-        .positionDeleteRowSchema(table.schema())
+        .positionDeleteRowSchema(sourceTable.schema())
         .build();
   }
 
@@ -56,4 +56,13 @@ protected InternalRow toRow(Integer id, String data, boolean boolValue, Long longValue) {
     row.update(2, nested);
     return row;
   }
+
+  @Override
+  protected InternalRow toGenericRow(int value, int repeated) {
+    InternalRow row = new GenericInternalRow(repeated);
+    for (int i = 0; i < repeated; i++) {
+      row.update(i, value);
+    }
+    return row;
+  }
 }
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkWriterMetrics.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkWriterMetrics.java
index 026111cf48ae..967f394faa74 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkWriterMetrics.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkWriterMetrics.java
@@ -20,7 +20,7 @@
 package org.apache.iceberg.spark.source;
 
 import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.io.FileWriterFactory;
 import org.apache.iceberg.io.TestWriterMetrics;
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -34,12 +34,12 @@ public TestSparkWriterMetrics(FileFormat fileFormat) {
   }
 
   @Override
-  protected FileWriterFactory<InternalRow> newWriterFactory(Schema dataSchema) {
-    return SparkFileWriterFactory.builderFor(table)
-        .dataSchema(table.schema())
+  protected FileWriterFactory<InternalRow> newWriterFactory(Table sourceTable) {
+    return SparkFileWriterFactory.builderFor(sourceTable)
+        .dataSchema(sourceTable.schema())
         .dataFileFormat(fileFormat)
         .deleteFileFormat(fileFormat)
-        .positionDeleteRowSchema(table.schema())
+        .positionDeleteRowSchema(sourceTable.schema())
         .build();
   }
 
@@ -56,4 +56,13 @@ protected InternalRow toRow(Integer id, String data, boolean boolValue, Long longValue) {
     row.update(2, nested);
     return row;
   }
+
+  @Override
+  protected InternalRow toGenericRow(int value, int repeated) {
+    InternalRow row = new GenericInternalRow(repeated);
+    for (int i = 0; i < repeated; i++) {
+      row.update(i, value);
+    }
+    return row;
+  }
 }
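Reviewer note (not part of the patch): the new tests cover the table-wide default override; per-column overrides (write.metadata.metrics.column.*) also still apply on wide tables, because from() records them regardless of which default was chosen. A minimal sketch of that path, assuming an illustrative wide Table handle named wideTable with columns literally named "col0" and "col1"; the property prefix and the MetricsConfig calls are existing Iceberg API.

// Reviewer sketch, not part of the patch. "wideTable", "col0", and "col1" are assumed.
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.Table;

class ColumnOverrideSketch {
  static void keepMetricsForOneColumn(Table wideTable) {
    // Leave the wide-table default at "none", but keep bounds for a single hot filter column.
    wideTable.updateProperties()
        .set("write.metadata.metrics.column.col0", "truncate(16)")
        .commit();

    MetricsConfig config = MetricsConfig.forTable(wideTable);
    System.out.println(config.columnMode("col0")); // truncate(16) from the per-column override
    System.out.println(config.columnMode("col1")); // none, the wide-table default
  }
}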