Merged
43 changes: 31 additions & 12 deletions core/src/main/java/org/apache/iceberg/MetricsConfig.java
@@ -44,6 +44,8 @@ public final class MetricsConfig implements Serializable {
private static final Logger LOG = LoggerFactory.getLogger(MetricsConfig.class);
private static final Joiner DOT = Joiner.on('.');

+ // Disable metrics by default for wide tables to prevent excessive metadata
+ private static final int MAX_COLUMNS = 32;
private static final MetricsConfig DEFAULT = new MetricsConfig(ImmutableMap.of(),
MetricsModes.fromString(DEFAULT_WRITE_METRICS_MODE_DEFAULT));

@@ -94,15 +96,21 @@ static Map<String, String> updateProperties(Map<String, String> props, List<Stri
**/
@Deprecated
public static MetricsConfig fromProperties(Map<String, String> props) {
- return from(props, null);
+ return from(props, null, DEFAULT_WRITE_METRICS_MODE_DEFAULT);
}

/**
* Creates a metrics config from a table.
* @param table iceberg table
*/
public static MetricsConfig forTable(Table table) {
- return from(table.properties(), table.sortOrder());
+ String defaultMode;
+ if (table.schema().columns().size() <= MAX_COLUMNS) {
Contributor: Is there a way to override this behavior?

Member Author: Good point, I can make a table config; does that sound reasonable?

Member: "defaultMetricCollection : all" or something? Don't we already have that?

Also, this feels like it should probably be a catalog-level prop 🤷

Member Author (@szehon-ho, Feb 4, 2022): Was initially thinking something like 'write.metadata.metrics.max.columns'='100'? Yea, good point, maybe a catalog-level prop is cleaner.

Member Author: Took another look, and actually it's a bit messy to make this a catalog property in the current code (the Table is serialized and sent to writers, but the Catalog is not).

Was thinking it makes sense as a table property as well (i.e., if I know this table is worth optimizing, I can set the max columns higher). It may be a bit awkward to have a separate catalog just for this. And if a user really wants one global setting, this change is coming: #4011

Contributor: I think this is okay for now. Tables will still respect whatever is set as write.metadata.metrics.default, so this really just changes Iceberg's default in a reasonable way. It is also good to note that metrics for sort columns are automatically promoted to at least truncate(16), so it isn't as though we're losing all stats.
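To make the override concrete, a minimal sketch (not part of this diff) using the property named above; the table handle and the truncate(16) value are illustrative:

    // Opt a wide table back into metrics collection: an explicit
    // write.metadata.metrics.default always takes precedence over the
    // width-based default.
    table.updateProperties()
        .set(TableProperties.DEFAULT_WRITE_METRICS_MODE, "truncate(16)")
        .commit();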

+ defaultMode = DEFAULT_WRITE_METRICS_MODE_DEFAULT;
+ } else {
+ defaultMode = MetricsModes.None.get().toString();
+ }
+ return from(table.properties(), table.sortOrder(), defaultMode);
Contributor: Nit: missing newline after control flow block and before return.

}

/**
@@ -127,24 +135,35 @@ public static MetricsConfig forPositionDelete(Table table) {
return new MetricsConfig(columnModes.build(), defaultMode);
}

- private static MetricsConfig from(Map<String, String> props, SortOrder order) {
+ /**
+ * Generate a MetricsConfig for all columns based on overrides, sortOrder, and defaultMode.
+ * @param props will be read for metrics overrides (write.metadata.metrics.column.*) and default
+ * (write.metadata.metrics.default)
+ * @param order sort order columns, will be promoted to truncate(16)
+ * @param defaultMode default, if not set by user property
+ * @return metrics configuration
+ */
+ private static MetricsConfig from(Map<String, String> props, SortOrder order, String defaultMode) {
Contributor: Instead of renaming the variable that is used everywhere, I'd just rename the incoming argument. There would be fewer changes if this used defaultDefaultMode or something.
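A sketch of what that suggestion could look like: only the incoming argument is renamed, so the local variable and the body below keep their original names (body abbreviated):

    private static MetricsConfig from(Map<String, String> props, SortOrder order, String defaultDefaultMode) {
      Map<String, MetricsMode> columnModes = Maps.newHashMap();

      MetricsMode defaultMode;  // local keeps its original name
      String defaultModeAsString = props.getOrDefault(DEFAULT_WRITE_METRICS_MODE, defaultDefaultMode);
      try {
        defaultMode = MetricsModes.fromString(defaultModeAsString);
      } catch (IllegalArgumentException err) {
        LOG.warn("Ignoring invalid default metrics mode: {}", defaultModeAsString, err);
        defaultMode = MetricsModes.fromString(defaultDefaultMode);
      }
      // ... rest of the method unchanged
    }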

Map<String, MetricsMode> columnModes = Maps.newHashMap();
- MetricsMode defaultMode;
- String defaultModeAsString = props.getOrDefault(DEFAULT_WRITE_METRICS_MODE, DEFAULT_WRITE_METRICS_MODE_DEFAULT);
+
+ // Handle user override of default mode
+ MetricsMode finalDefaultMode;
+ String defaultModeAsString = props.getOrDefault(DEFAULT_WRITE_METRICS_MODE, defaultMode);
try {
- defaultMode = MetricsModes.fromString(defaultModeAsString);
+ finalDefaultMode = MetricsModes.fromString(defaultModeAsString);
} catch (IllegalArgumentException err) {
- // Mode was invalid, log the error and use the default
+ // User override was invalid, log the error and use the default
LOG.warn("Ignoring invalid default metrics mode: {}", defaultModeAsString, err);
- defaultMode = MetricsModes.fromString(DEFAULT_WRITE_METRICS_MODE_DEFAULT);
+ finalDefaultMode = MetricsModes.fromString(defaultMode);
}

// First set sorted column with sorted column default (can be overridden by user)
- MetricsMode sortedColDefaultMode = sortedColumnDefaultMode(defaultMode);
+ MetricsMode sortedColDefaultMode = sortedColumnDefaultMode(finalDefaultMode);
Set<String> sortedCols = SortOrderUtil.orderPreservingSortedColumns(order);
sortedCols.forEach(sc -> columnModes.put(sc, sortedColDefaultMode));

- MetricsMode defaultModeFinal = defaultMode;
+ // Handle user overrides of defaults
+ MetricsMode finalDefaultModeVal = finalDefaultMode;
props.keySet().stream()
.filter(key -> key.startsWith(METRICS_MODE_COLUMN_CONF_PREFIX))
.forEach(key -> {
@@ -155,12 +174,12 @@ private static MetricsConfig from(Map<String, String> props, SortOrder order) {
} catch (IllegalArgumentException err) {
// Mode was invalid, log the error and use the default
LOG.warn("Ignoring invalid metrics mode for column {}: {}", columnAlias, props.get(key), err);
- mode = defaultModeFinal;
+ mode = finalDefaultModeVal;
}
columnModes.put(columnAlias, mode);
});

- return new MetricsConfig(columnModes, defaultMode);
+ return new MetricsConfig(columnModes, finalDefaultMode);
}
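For context, a brief sketch (not part of this diff) of how the resolved config is consumed downstream; columnMode is the existing per-column lookup on MetricsConfig, while the table handle and column name here are illustrative:

    // A writer resolves the effective mode per column: an explicit
    // write.metadata.metrics.column.<name> property wins, then the user
    // default, then the width-based default chosen in forTable().
    MetricsConfig metricsConfig = MetricsConfig.forTable(table);
    MetricsMode mode = metricsConfig.columnMode("event_ts");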

/**
97 changes: 91 additions & 6 deletions data/src/test/java/org/apache/iceberg/io/TestWriterMetrics.java
@@ -23,6 +23,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
+ import java.util.List;
import java.util.Map;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
@@ -31,12 +32,14 @@
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
+ import org.apache.iceberg.Table;
+ import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TestTables;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.deletes.PositionDeleteWriter;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+ import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Types;
import org.junit.After;
@@ -83,7 +86,6 @@ public abstract class TestWriterMetrics<T> {

protected FileFormat fileFormat;
protected TestTables.TestTable table = null;
- protected File metadataDir = null;
private OutputFileFactory fileFactory = null;

@Parameterized.Parameters(name = "FileFormat = {0}")
@@ -98,17 +100,17 @@ public TestWriterMetrics(FileFormat fileFormat) {
this.fileFormat = fileFormat;
}

- protected abstract FileWriterFactory<T> newWriterFactory(Schema dataSchema);
+ protected abstract FileWriterFactory<T> newWriterFactory(Table sourceTable);

protected abstract T toRow(Integer id, String data, boolean boolValue, Long longValue);

+ protected abstract T toGenericRow(int value, int repeated);

@Before
public void setupTable() throws Exception {
File tableDir = temp.newFolder();
tableDir.delete(); // created by table create

- this.metadataDir = new File(tableDir, "metadata");

this.table = TestTables.create(
tableDir,
"test",
@@ -129,7 +131,7 @@ public void after() {
@Test
public void verifySortedColMetric() throws Exception {
T row = toRow(3, "3", true, 3L);
- DataWriter dataWriter = newWriterFactory(SCHEMA).newDataWriter(
+ DataWriter dataWriter = newWriterFactory(table).newDataWriter(
fileFactory.newOutputFile(),
PartitionSpec.unpartitioned(),
null
@@ -156,7 +158,7 @@

@Test
public void testPositionDeleteMetrics() throws IOException {
- FileWriterFactory<T> writerFactory = newWriterFactory(SCHEMA);
+ FileWriterFactory<T> writerFactory = newWriterFactory(table);
EncryptedOutputFile outputFile = fileFactory.newOutputFile();
PositionDeleteWriter<T> deleteWriter = writerFactory.newPositionDeleteWriter(outputFile, table.spec(), null);

@@ -202,4 +204,87 @@ public void testPositionDeleteMetrics() throws IOException {
Assert.assertFalse(upperBounds.containsKey(4));
Assert.assertEquals(3L, (long) Conversions.fromByteBuffer(Types.LongType.get(), upperBounds.get(5)));
}

+ @Test
+ public void testMaxColumns() throws IOException {
+ File tableDir = temp.newFolder();
+ tableDir.delete(); // created by table create
+
+ int numColumns = 33;
+ List<Types.NestedField> fields = Lists.newArrayListWithCapacity(numColumns);
+ for (int i = 0; i < numColumns; i++) {
+ fields.add(required(i, "col" + i, Types.IntegerType.get()));
+ }
+ Schema maxColSchema = new Schema(fields);
+
+ Table maxColumnTable = TestTables.create(
+ tableDir,
+ "max_col_table",
+ maxColSchema,
+ PartitionSpec.unpartitioned(),
+ SortOrder.unsorted(),
+ FORMAT_V2);
+ OutputFileFactory maxColFactory = OutputFileFactory.builderFor(maxColumnTable, 1, 1)
+ .format(fileFormat).build();
+
+ T row = toGenericRow(1, numColumns);
+ DataWriter dataWriter = newWriterFactory(maxColumnTable).newDataWriter(
+ maxColFactory.newOutputFile(),
+ PartitionSpec.unpartitioned(),
+ null
+ );
+ dataWriter.add(row);
+ dataWriter.close();
+ DataFile dataFile = dataWriter.toDataFile();
+
+ // No field should have metrics
+ Assert.assertTrue("Should not have any lower bound metrics", dataFile.lowerBounds().isEmpty());
+ Assert.assertTrue("Should not have any upper bound metrics", dataFile.upperBounds().isEmpty());
+ Assert.assertTrue("Should not have any nan value metrics", dataFile.nanValueCounts().isEmpty());
+ Assert.assertTrue("Should not have any null value metrics", dataFile.nullValueCounts().isEmpty());
+ Assert.assertTrue("Should not have any value metrics", dataFile.valueCounts().isEmpty());
+ }

+ @Test
+ public void testMaxColumnsWithDefaultOverride() throws IOException {
+ File tableDir = temp.newFolder();
+ tableDir.delete(); // created by table create
+
+ int numColumns = 33;
+ List<Types.NestedField> fields = Lists.newArrayListWithCapacity(numColumns);
+ for (int i = 0; i < numColumns; i++) {
+ fields.add(required(i, "col" + i, Types.IntegerType.get()));
+ }
+ Schema maxColSchema = new Schema(fields);
+
+ Table maxColumnTable = TestTables.create(
+ tableDir,
+ "max_col_table",
+ maxColSchema,
+ PartitionSpec.unpartitioned(),
+ SortOrder.unsorted(),
+ FORMAT_V2);
+ maxColumnTable.updateProperties().set(TableProperties.DEFAULT_WRITE_METRICS_MODE,
+ TableProperties.DEFAULT_WRITE_METRICS_MODE_DEFAULT).commit();
+ OutputFileFactory maxColFactory = OutputFileFactory.builderFor(maxColumnTable, 1, 1)
+ .format(fileFormat).build();
+
+ T row = toGenericRow(1, numColumns);
+ DataWriter dataWriter = newWriterFactory(maxColumnTable).newDataWriter(
+ maxColFactory.newOutputFile(),
+ PartitionSpec.unpartitioned(),
+ null
+ );
+ dataWriter.add(row);
+ dataWriter.close();
+ DataFile dataFile = dataWriter.toDataFile();
+
+ // Field should have metrics because the user set the default explicitly
+ Map<Integer, ByteBuffer> upperBounds = dataFile.upperBounds();
+ Map<Integer, ByteBuffer> lowerBounds = dataFile.lowerBounds();
+ for (int i = 0; i < numColumns; i++) {
+ Assert.assertEquals(1, (int) Conversions.fromByteBuffer(Types.IntegerType.get(), upperBounds.get(1)));
+ Assert.assertEquals(1, (int) Conversions.fromByteBuffer(Types.IntegerType.get(), lowerBounds.get(1)));
+ }
+ }
}
TestFlinkWriterMetrics.java (the same change appears in each supported Flink version's copy of this file)
@@ -23,7 +23,7 @@
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.iceberg.FileFormat;
- import org.apache.iceberg.Schema;
+ import org.apache.iceberg.Table;
import org.apache.iceberg.io.FileWriterFactory;
import org.apache.iceberg.io.TestWriterMetrics;

@@ -34,12 +34,12 @@ public TestFlinkWriterMetrics(FileFormat fileFormat) {
}

@Override
- protected FileWriterFactory<RowData> newWriterFactory(Schema dataSchema) {
- return FlinkFileWriterFactory.builderFor(table)
- .dataSchema(table.schema())
+ protected FileWriterFactory<RowData> newWriterFactory(Table sourceTable) {
+ return FlinkFileWriterFactory.builderFor(sourceTable)
+ .dataSchema(sourceTable.schema())
.dataFileFormat(fileFormat)
.deleteFileFormat(fileFormat)
- .positionDeleteRowSchema(table.schema())
+ .positionDeleteRowSchema(sourceTable.schema())
.build();
}

@@ -49,4 +49,13 @@ protected RowData toRow(Integer id, String data, boolean boolValue, Long longVal
GenericRowData row = GenericRowData.of(id, StringData.fromString(data), nested);
return row;
}

+ @Override
+ public RowData toGenericRow(int value, int repeated) {
+ GenericRowData row = new GenericRowData(repeated);
+ for (int i = 0; i < repeated; i++) {
+ row.setField(i, value);
+ }
+ return row;
+ }
}
TestFlinkWriterMetrics.java (second per-version copy, identical change)
@@ -23,7 +23,7 @@
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.iceberg.FileFormat;
- import org.apache.iceberg.Schema;
+ import org.apache.iceberg.Table;
import org.apache.iceberg.io.FileWriterFactory;
import org.apache.iceberg.io.TestWriterMetrics;

@@ -34,12 +34,12 @@ public TestFlinkWriterMetrics(FileFormat fileFormat) {
}

@Override
- protected FileWriterFactory<RowData> newWriterFactory(Schema dataSchema) {
- return FlinkFileWriterFactory.builderFor(table)
- .dataSchema(table.schema())
+ protected FileWriterFactory<RowData> newWriterFactory(Table sourceTable) {
+ return FlinkFileWriterFactory.builderFor(sourceTable)
+ .dataSchema(sourceTable.schema())
.dataFileFormat(fileFormat)
.deleteFileFormat(fileFormat)
- .positionDeleteRowSchema(table.schema())
+ .positionDeleteRowSchema(sourceTable.schema())
.build();
}

@@ -49,4 +49,13 @@ protected RowData toRow(Integer id, String data, boolean boolValue, Long longVal
GenericRowData row = GenericRowData.of(id, StringData.fromString(data), nested);
return row;
}

+ @Override
+ public RowData toGenericRow(int value, int repeated) {
+ GenericRowData row = new GenericRowData(repeated);
+ for (int i = 0; i < repeated; i++) {
+ row.setField(i, value);
+ }
+ return row;
+ }
}
TestFlinkWriterMetrics.java (third per-version copy, identical change)
@@ -23,7 +23,7 @@
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.iceberg.FileFormat;
- import org.apache.iceberg.Schema;
+ import org.apache.iceberg.Table;
import org.apache.iceberg.io.FileWriterFactory;
import org.apache.iceberg.io.TestWriterMetrics;

@@ -34,12 +34,12 @@ public TestFlinkWriterMetrics(FileFormat fileFormat) {
}

@Override
- protected FileWriterFactory<RowData> newWriterFactory(Schema dataSchema) {
- return FlinkFileWriterFactory.builderFor(table)
- .dataSchema(table.schema())
+ protected FileWriterFactory<RowData> newWriterFactory(Table sourceTable) {
+ return FlinkFileWriterFactory.builderFor(sourceTable)
+ .dataSchema(sourceTable.schema())
.dataFileFormat(fileFormat)
.deleteFileFormat(fileFormat)
- .positionDeleteRowSchema(table.schema())
+ .positionDeleteRowSchema(sourceTable.schema())
.build();
}

@@ -49,4 +49,13 @@ protected RowData toRow(Integer id, String data, boolean boolValue, Long longVal
GenericRowData row = GenericRowData.of(id, StringData.fromString(data), nested);
return row;
}

+ @Override
+ public RowData toGenericRow(int value, int repeated) {
+ GenericRowData row = new GenericRowData(repeated);
+ for (int i = 0; i < repeated; i++) {
+ row.setField(i, value);
+ }
+ return row;
+ }
}