diff --git a/core/src/main/java/org/apache/iceberg/MetricsConfig.java b/core/src/main/java/org/apache/iceberg/MetricsConfig.java
new file mode 100644
index 000000000000..864ec060495e
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/MetricsConfig.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import com.google.common.collect.Maps;
+import java.util.Map;
+import org.apache.iceberg.MetricsModes.MetricsMode;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_WRITE_METRICS_MODE;
+import static org.apache.iceberg.TableProperties.DEFAULT_WRITE_METRICS_MODE_DEFAULT;
+
+public class MetricsConfig {
+
+  private static final String COLUMN_CONF_PREFIX = "write.metadata.metrics.column.";
+
+  private Map<String, MetricsMode> columnModes = Maps.newHashMap();
+  private MetricsMode defaultMode;
+
+  private MetricsConfig() {}
+
+  public static MetricsConfig getDefault() {
+    MetricsConfig spec = new MetricsConfig();
+    spec.defaultMode = MetricsModes.fromString(DEFAULT_WRITE_METRICS_MODE_DEFAULT);
+    return spec;
+  }
+
+  public static MetricsConfig fromProperties(Map<String, String> props) {
+    MetricsConfig spec = new MetricsConfig();
+    props.keySet().stream()
+        .filter(key -> key.startsWith(COLUMN_CONF_PREFIX))
+        .forEach(key -> {
+          MetricsMode mode = MetricsModes.fromString(props.get(key));
+          String columnAlias = key.replaceFirst(COLUMN_CONF_PREFIX, "");
+          spec.columnModes.put(columnAlias, mode);
+        });
+    String defaultModeAsString = props.getOrDefault(DEFAULT_WRITE_METRICS_MODE, DEFAULT_WRITE_METRICS_MODE_DEFAULT);
+    spec.defaultMode = MetricsModes.fromString(defaultModeAsString);
+    return spec;
+  }
+
+  public MetricsMode columnMode(String columnAlias) {
+    return columnModes.getOrDefault(columnAlias, defaultMode);
+  }
+}
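For reviewers, a minimal usage sketch of the class above (not part of the patch): how per-column modes are resolved from table properties, using only the MetricsConfig and MetricsModes APIs introduced in this PR; the property values and column names are illustrative.

    // Hypothetical properties; "write.metadata.metrics.column." + a column name overrides the default.
    Map<String, String> props = Maps.newHashMap();
    props.put("write.metadata.metrics.default", "counts");
    props.put("write.metadata.metrics.column.id", "full");

    MetricsConfig metricsConfig = MetricsConfig.fromProperties(props);
    MetricsMode idMode = metricsConfig.columnMode("id");     // Full, from the per-column override
    MetricsMode dataMode = metricsConfig.columnMode("data"); // Counts, falls back to the table default
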
diff --git a/core/src/main/java/org/apache/iceberg/MetricsModes.java b/core/src/main/java/org/apache/iceberg/MetricsModes.java
new file mode 100644
index 000000000000..453d721ceeb0
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/MetricsModes.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import com.google.common.base.Preconditions;
+import java.util.Locale;
+import java.util.Objects;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * This class defines different metrics modes, which allow users to control the collection of
+ * value_counts, null_value_counts, lower_bounds, upper_bounds for different columns in metadata.
+ */
+public class MetricsModes {
+
+  private static final Pattern TRUNCATE = Pattern.compile("truncate\\((\\d+)\\)");
+
+  private MetricsModes() {}
+
+  public static MetricsMode fromString(String mode) {
+    if ("none".equalsIgnoreCase(mode)) {
+      return None.get();
+    } else if ("counts".equalsIgnoreCase(mode)) {
+      return Counts.get();
+    } else if ("full".equalsIgnoreCase(mode)) {
+      return Full.get();
+    }
+
+    Matcher truncateMatcher = TRUNCATE.matcher(mode.toLowerCase(Locale.ENGLISH));
+    if (truncateMatcher.matches()) {
+      int length = Integer.parseInt(truncateMatcher.group(1));
+      return Truncate.withLength(length);
+    }
+
+    throw new IllegalArgumentException("Invalid metrics mode: " + mode);
+  }
+
+  public interface MetricsMode {}
+
+  /**
+   * Under this mode, value_counts, null_value_counts, lower_bounds, upper_bounds are not persisted.
+   */
+  public static class None implements MetricsMode {
+    private static final None INSTANCE = new None();
+
+    public static None get() {
+      return INSTANCE;
+    }
+
+    @Override
+    public String toString() {
+      return "none";
+    }
+  }
+
+  /**
+   * Under this mode, only value_counts, null_value_counts are persisted.
+   */
+  public static class Counts implements MetricsMode {
+    private static final Counts INSTANCE = new Counts();
+
+    public static Counts get() {
+      return INSTANCE;
+    }
+
+    @Override
+    public String toString() {
+      return "counts";
+    }
+  }
+
+  /**
+   * Under this mode, value_counts, null_value_counts and truncated lower_bounds, upper_bounds are persisted.
+   */
+  public static class Truncate implements MetricsMode {
+    private final int length;
+
+    private Truncate(int length) {
+      this.length = length;
+    }
+
+    public static Truncate withLength(int length) {
+      Preconditions.checkArgument(length > 0, "Truncate length should be positive");
+      return new Truncate(length);
+    }
+
+    public int length() {
+      return length;
+    }
+
+    @Override
+    public String toString() {
+      return String.format("truncate(%d)", length);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null || getClass() != obj.getClass()) {
+        return false;
+      }
+      Truncate truncate = (Truncate) obj;
+      return length == truncate.length;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(length);
+    }
+  }
+
+  /**
+   * Under this mode, value_counts, null_value_counts and full lower_bounds, upper_bounds are persisted.
+   */
+  public static class Full implements MetricsMode {
+    private static final Full INSTANCE = new Full();
+
+    public static Full get() {
+      return INSTANCE;
+    }
+
+    @Override
+    public String toString() {
+      return "full";
+    }
+  }
+}
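A short sketch (outside the patch) of how the mode strings defined above are expected to parse, based only on MetricsModes.fromString as added in this file:

    MetricsModes.fromString("none");           // None singleton; disables counts and bounds
    MetricsModes.fromString("COUNTS");         // parsing is case-insensitive
    MetricsModes.Truncate truncate =
        (MetricsModes.Truncate) MetricsModes.fromString("truncate(16)");
    int length = truncate.length();            // 16, used to shorten lower/upper bounds
    MetricsModes.fromString("truncate(0)");    // throws IllegalArgumentException (length must be positive)
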
diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java
index 10892b97281f..ecb0f31d57ca 100644
--- a/core/src/main/java/org/apache/iceberg/TableProperties.java
+++ b/core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -89,6 +89,6 @@ private TableProperties() {}
   public static final String METADATA_COMPRESSION = "write.metadata.compression-codec";
   public static final String METADATA_COMPRESSION_DEFAULT = "none";
 
-  public static final String WRITE_METADATA_TRUNCATE_BYTES = "write.metadata.truncate-length";
-  public static final int WRITE_METADATA_TRUNCATE_BYTES_DEFAULT = 16;
+  public static final String DEFAULT_WRITE_METRICS_MODE = "write.metadata.metrics.default";
+  public static final String DEFAULT_WRITE_METRICS_MODE_DEFAULT = "truncate(16)";
 }
diff --git a/core/src/test/java/org/apache/iceberg/TestMetricsModes.java b/core/src/test/java/org/apache/iceberg/TestMetricsModes.java
new file mode 100644
index 000000000000..03b6bb792202
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestMetricsModes.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import org.apache.iceberg.MetricsModes.Counts;
+import org.apache.iceberg.MetricsModes.Full;
+import org.apache.iceberg.MetricsModes.None;
+import org.apache.iceberg.MetricsModes.Truncate;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+public class TestMetricsModes {
+
+  @Rule
+  public ExpectedException exceptionRule = ExpectedException.none();
+
+  @Test
+  public void testMetricsModeParsing() {
+    Assert.assertEquals(None.get(), MetricsModes.fromString("none"));
+    Assert.assertEquals(None.get(), MetricsModes.fromString("nOnE"));
+    Assert.assertEquals(Counts.get(), MetricsModes.fromString("counts"));
+    Assert.assertEquals(Counts.get(), MetricsModes.fromString("coUntS"));
+    Assert.assertEquals(Truncate.withLength(1), MetricsModes.fromString("truncate(1)"));
+    Assert.assertEquals(Truncate.withLength(10), MetricsModes.fromString("truNcAte(10)"));
+    Assert.assertEquals(Full.get(), MetricsModes.fromString("full"));
+    Assert.assertEquals(Full.get(), MetricsModes.fromString("FULL"));
+  }
+
+  @Test
+  public void testInvalidTruncationLength() {
+    exceptionRule.expect(IllegalArgumentException.class);
+    exceptionRule.expectMessage("length should be positive");
+    MetricsModes.fromString("truncate(0)");
+  }
+}
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index 2782eea1ec8a..b0da34020949 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -28,6 +28,7 @@ import java.util.function.Function;
 
 import com.google.common.collect.Sets;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.MetricsConfig;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SchemaParser;
 import org.apache.iceberg.Table;
@@ -63,9 +64,6 @@
 import static org.apache.iceberg.TableProperties.PARQUET_PAGE_SIZE_BYTES_DEFAULT;
 import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
 import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT;
-import static org.apache.iceberg.TableProperties.WRITE_METADATA_TRUNCATE_BYTES;
-import static org.apache.iceberg.TableProperties.WRITE_METADATA_TRUNCATE_BYTES_DEFAULT;
-
 
 public class Parquet {
   private Parquet() {
@@ -86,6 +84,7 @@ public static class WriteBuilder {
     private Map<String, String> metadata = Maps.newLinkedHashMap();
     private Map<String, String> config = Maps.newLinkedHashMap();
     private Function<MessageType, ParquetValueWriter<?>> createWriterFunc = null;
+    private MetricsConfig metricsConfig = MetricsConfig.getDefault();
 
     private WriteBuilder(OutputFile file) {
       this.file = file;
@@ -94,6 +93,7 @@ private WriteBuilder(OutputFile file) {
     public WriteBuilder forTable(Table table) {
       schema(table.schema());
       setAll(table.properties());
+      metricsConfig(MetricsConfig.fromProperties(table.properties()));
       return this;
     }
 
@@ -133,6 +133,11 @@ public WriteBuilder createWriterFunc(
       return this;
     }
 
+    public WriteBuilder metricsConfig(MetricsConfig newMetricsConfig) {
+      this.metricsConfig = newMetricsConfig;
+      return this;
+    }
+
    @SuppressWarnings("unchecked")
     private <T> WriteSupport<T> getWriteSupport(MessageType type) {
       if (writeSupport != null) {
@@ -168,9 +173,6 @@ public <D> FileAppender<D> build() throws IOException {
           PARQUET_PAGE_SIZE_BYTES, PARQUET_PAGE_SIZE_BYTES_DEFAULT));
       int dictionaryPageSize = Integer.parseInt(config.getOrDefault(
           PARQUET_DICT_SIZE_BYTES, PARQUET_DICT_SIZE_BYTES_DEFAULT));
-      int statsTruncateLength = Integer.parseInt(config.getOrDefault(
-          WRITE_METADATA_TRUNCATE_BYTES, String.valueOf(WRITE_METADATA_TRUNCATE_BYTES_DEFAULT)));
-
       WriterVersion writerVersion = WriterVersion.PARQUET_1_0;
 
@@ -198,8 +200,8 @@ public <D> FileAppender<D> build() throws IOException {
           .build();
 
       return new org.apache.iceberg.parquet.ParquetWriter<>(
-          conf, file, schema, rowGroupSize, statsTruncateLength, metadata,
-          createWriterFunc, codec(), parquetProperties);
+          conf, file, schema, rowGroupSize, metadata, createWriterFunc, codec(),
+          parquetProperties, metricsConfig);
     } else {
       return new ParquetWriteAdapter<>(new ParquetWriteBuilder<D>(ParquetIO.file(file))
           .withWriterVersion(writerVersion)
@@ -212,7 +214,8 @@ public <D> FileAppender<D> build() throws IOException {
           .withRowGroupSize(rowGroupSize)
           .withPageSize(pageSize)
           .withDictionaryPageSize(dictionaryPageSize)
-          .build(), statsTruncateLength);
+          .build(),
+          metricsConfig);
     }
   }
 }
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
index e1b4d39084b3..58e093dea761 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
@@ -31,8 +31,10 @@
 import java.util.Map;
 import java.util.Set;
 import org.apache.iceberg.Metrics;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.MetricsModes;
+import org.apache.iceberg.MetricsModes.MetricsMode;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.expressions.Literal;
 import org.apache.iceberg.io.InputFile;
@@ -60,18 +62,18 @@ private ParquetUtil() {
 
   // Access modifier is package-private, to only allow use from existing tests
   static Metrics fileMetrics(InputFile file) {
-    return fileMetrics(file, TableProperties.WRITE_METADATA_TRUNCATE_BYTES_DEFAULT);
+    return fileMetrics(file, MetricsConfig.getDefault());
   }
 
-  public static Metrics fileMetrics(InputFile file, int statsTruncateLength) {
+  public static Metrics fileMetrics(InputFile file, MetricsConfig metricsConfig) {
     try (ParquetFileReader reader = ParquetFileReader.open(ParquetIO.file(file))) {
-      return footerMetrics(reader.getFooter(), statsTruncateLength);
+      return footerMetrics(reader.getFooter(), metricsConfig);
     } catch (IOException e) {
       throw new RuntimeIOException(e, "Failed to read footer of file: %s", file);
     }
   }
 
-  public static Metrics footerMetrics(ParquetMetadata metadata, int statsTruncateLength) {
+  public static Metrics footerMetrics(ParquetMetadata metadata, MetricsConfig metricsConfig) {
     long rowCount = 0;
     Map<Integer, Long> columnSizes = Maps.newHashMap();
     Map<Integer, Long> valueCounts = Maps.newHashMap();
@@ -90,6 +92,12 @@ public static Metrics footerMetrics(ParquetMetadata metadata, int statsTruncateL
         ColumnPath path = column.getPath();
         int fieldId = fileSchema.aliasToId(path.toDotString());
         increment(columnSizes, fieldId, column.getTotalSize());
+
+        String columnName = fileSchema.findColumnName(fieldId);
+        MetricsMode metricsMode = metricsConfig.columnMode(columnName);
+        if (metricsMode == MetricsModes.None.get()) {
+          continue;
+        }
         increment(valueCounts, fieldId, column.getValueCount());
 
         Statistics stats = column.getStatistics();
@@ -98,15 +106,14 @@
         } else if (!stats.isEmpty()) {
           increment(nullValueCounts, fieldId, stats.getNumNulls());
 
-          Types.NestedField field = fileSchema.findField(fieldId);
-          if (field != null && stats.hasNonNullValue() && shouldStoreBounds(path, fileSchema)
-              && statsTruncateLength > 0) {
-            updateMin(lowerBounds, fieldId, field.type(),
-                fromParquetPrimitive(field.type(), column.getPrimitiveType(),
-                    stats.genericGetMin()), statsTruncateLength);
-            updateMax(upperBounds, fieldId, field.type(),
-                fromParquetPrimitive(field.type(), column.getPrimitiveType(),
-                    stats.genericGetMax()), statsTruncateLength);
+          if (metricsMode != MetricsModes.Counts.get()) {
+            Types.NestedField field = fileSchema.findField(fieldId);
+            if (field != null && stats.hasNonNullValue() && shouldStoreBounds(path, fileSchema)) {
+              Literal<?> min = fromParquetPrimitive(field.type(), column.getPrimitiveType(), stats.genericGetMin());
+              updateMin(lowerBounds, fieldId, field.type(), min, metricsMode);
+              Literal<?> max = fromParquetPrimitive(field.type(), column.getPrimitiveType(), stats.genericGetMax());
+              updateMax(upperBounds, fieldId, field.type(), max, metricsMode);
+            }
           }
         }
       }
@@ -165,38 +172,50 @@ private static void increment(Map<Integer, Long> columns, int fieldId, long amou
 
   @SuppressWarnings("unchecked")
   private static <T> void updateMin(Map<Integer, Literal<?>> lowerBounds, int id, Type type,
-                                    Literal<T> min, int truncateLength) {
+                                    Literal<T> min, MetricsMode metricsMode) {
     Literal<T> currentMin = (Literal<T>) lowerBounds.get(id);
     if (currentMin == null || min.comparator().compare(min.value(), currentMin.value()) < 0) {
-      switch (type.typeId()) {
-        case STRING:
-          lowerBounds.put(id, truncateStringMin((Literal<CharSequence>) min, truncateLength));
-          break;
-        case FIXED:
-        case BINARY:
-          lowerBounds.put(id, truncateBinaryMin((Literal<ByteBuffer>) min, truncateLength));
-          break;
-        default:
-          lowerBounds.put(id, min);
+      if (metricsMode == MetricsModes.Full.get()) {
+        lowerBounds.put(id, min);
+      } else {
+        MetricsModes.Truncate truncateMode = (MetricsModes.Truncate) metricsMode;
+        int truncateLength = truncateMode.length();
+        switch (type.typeId()) {
+          case STRING:
+            lowerBounds.put(id, truncateStringMin((Literal<CharSequence>) min, truncateLength));
+            break;
+          case FIXED:
+          case BINARY:
+            lowerBounds.put(id, truncateBinaryMin((Literal<ByteBuffer>) min, truncateLength));
+            break;
+          default:
+            lowerBounds.put(id, min);
+        }
       }
     }
   }
 
   @SuppressWarnings("unchecked")
   private static <T> void updateMax(Map<Integer, Literal<?>> upperBounds, int id, Type type,
-                                    Literal<T> max, int truncateLength) {
+                                    Literal<T> max, MetricsMode metricsMode) {
     Literal<T> currentMax = (Literal<T>) upperBounds.get(id);
     if (currentMax == null || max.comparator().compare(max.value(), currentMax.value()) > 0) {
-      switch (type.typeId()) {
-        case STRING:
-          upperBounds.put(id, truncateStringMax((Literal<CharSequence>) max, truncateLength));
-          break;
-        case FIXED:
-        case BINARY:
-          upperBounds.put(id, truncateBinaryMax((Literal<ByteBuffer>) max, truncateLength));
-          break;
-        default:
-          upperBounds.put(id, max);
+      if (metricsMode == MetricsModes.Full.get()) {
+        upperBounds.put(id, max);
+      } else {
+        MetricsModes.Truncate truncateMode = (MetricsModes.Truncate) metricsMode;
+        int truncateLength = truncateMode.length();
+        switch (type.typeId()) {
+          case STRING:
+            upperBounds.put(id, truncateStringMax((Literal<CharSequence>) max, truncateLength));
+            break;
+          case FIXED:
+          case BINARY:
+            upperBounds.put(id, truncateBinaryMax((Literal<ByteBuffer>) max, truncateLength));
+            break;
+          default:
+            upperBounds.put(id, max);
+        }
       }
     }
   }
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriteAdapter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriteAdapter.java
index 40bde82df6b5..463fb9a80497 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriteAdapter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriteAdapter.java
@@ -23,19 +23,20 @@
 
 import java.io.IOException;
 import java.util.List;
 import org.apache.iceberg.Metrics;
+import org.apache.iceberg.MetricsConfig;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 
 public class ParquetWriteAdapter<D> implements FileAppender<D> {
-  private ParquetWriter<D> writer = null;
-  private ParquetMetadata footer = null;
-  private int statsTruncateLength;
+  private ParquetWriter<D> writer;
+  private MetricsConfig metricsConfig;
+  private ParquetMetadata footer;
 
-  public ParquetWriteAdapter(ParquetWriter<D> writer, int statsTruncateLength) {
+  public ParquetWriteAdapter(ParquetWriter<D> writer, MetricsConfig metricsConfig) {
     this.writer = writer;
-    this.statsTruncateLength = statsTruncateLength;
+    this.metricsConfig = metricsConfig;
   }
 
@@ -50,7 +51,7 @@ public void add(D datum) {
   @Override
   public Metrics metrics() {
     Preconditions.checkState(footer != null, "Cannot produce metrics until closed");
-    return ParquetUtil.footerMetrics(footer, statsTruncateLength);
+    return ParquetUtil.footerMetrics(footer, metricsConfig);
   }
 
   @Override
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
index e80920e4059b..c0956a11684e 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
@@ -27,6 +27,7 @@ import java.util.function.Function;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.Metrics;
+import org.apache.iceberg.MetricsConfig;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.common.DynConstructors;
 import org.apache.iceberg.common.DynMethods;
@@ -68,29 +69,29 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
   private final MessageType parquetSchema;
   private final ParquetValueWriter<T> model;
   private final ParquetFileWriter writer;
+  private final MetricsConfig metricsConfig;
   private DynMethods.BoundMethod flushPageStoreToWriter;
   private ColumnWriteStore writeStore;
   private long nextRowGroupSize = 0;
   private long recordCount = 0;
   private long nextCheckRecordCount = 10;
-  private int statsTruncateLength;
 
   @SuppressWarnings("unchecked")
   ParquetWriter(Configuration conf, OutputFile output, Schema schema, long rowGroupSize,
-                int statsTruncateLength,
                 Map<String, String> metadata,
                 Function<MessageType, ParquetValueWriter<?>> createWriterFunc,
                 CompressionCodecName codec,
-                ParquetProperties properties) {
+                ParquetProperties properties,
+                MetricsConfig metricsConfig) {
     this.output = output;
     this.targetRowGroupSize = rowGroupSize;
     this.props = properties;
-    this.statsTruncateLength = statsTruncateLength;
     this.metadata = ImmutableMap.copyOf(metadata);
     this.compressor = new CodecFactory(conf, props.getPageSizeThreshold()).getCompressor(codec);
    this.parquetSchema = convert(schema, "table");
     this.model = (ParquetValueWriter<T>) createWriterFunc.apply(parquetSchema);
+    this.metricsConfig = metricsConfig;
 
     try {
       this.writer = new ParquetFileWriter(ParquetIO.file(output, conf), parquetSchema,
@@ -118,7 +119,7 @@ public void add(T value) {
 
   @Override
   public Metrics metrics() {
-    return ParquetUtil.footerMetrics(writer.getFooter(), statsTruncateLength);
+    return ParquetUtil.footerMetrics(writer.getFooter(), metricsConfig);
   }
 
   @Override
diff --git a/site/docs/configuration.md b/site/docs/configuration.md
index ccce10209c3a..3b54405889f9 100644
--- a/site/docs/configuration.md
+++ b/site/docs/configuration.md
@@ -23,7 +23,8 @@ Iceberg tables support table properties to configure table behavior, like the de
 | write.parquet.compression-codec | gzip | Parquet compression codec |
 | write.avro.compression-codec | gzip | Avro compression codec |
 | write.metadata.compression-codec | none | Metadata compression codec; none or gzip |
-
+| write.metadata.metrics.default | truncate(16) | Default metrics mode for all columns in the table; none, counts, truncate(length), or full |
+| write.metadata.metrics.column.col1 | (not set) | Metrics mode for column 'col1' to allow per-column tuning; none, counts, truncate(length), or full |
 
 ### Table behavior properties
 
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java b/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
index c149c2375bca..d0d50bfffcdc 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
@@ -39,6 +39,7 @@
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Metrics;
+import org.apache.iceberg.MetricsConfig;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.PendingUpdate;
 import org.apache.iceberg.ReplacePartitions;
@@ -251,12 +252,14 @@ private class SparkAppenderFactory implements AppenderFactory<InternalRow> {
     @Override
     public FileAppender<InternalRow> newAppender(OutputFile file, FileFormat fileFormat) {
       Schema schema = spec.schema();
+      MetricsConfig metricsConfig = MetricsConfig.fromProperties(properties);
       try {
         switch (fileFormat) {
           case PARQUET:
             return Parquet.write(file)
                 .createWriterFunc(msgType -> SparkParquetWriters.buildWriter(schema, msgType))
                 .setAll(properties)
+                .metricsConfig(metricsConfig)
                 .schema(schema)
                 .build();
 
diff --git a/spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala b/spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
index d7bb2ed9e02e..71f5625cca72 100644
--- a/spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
+++ b/spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
@@ -24,7 +24,7 @@ import java.nio.ByteBuffer
 import java.util
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{Path, PathFilter}
-import org.apache.iceberg.{DataFile, DataFiles, Metrics, PartitionSpec, TableProperties}
+import org.apache.iceberg.{DataFile, DataFiles, Metrics, MetricsConfig, PartitionSpec}
 import org.apache.iceberg.hadoop.HadoopInputFile
 import org.apache.iceberg.orc.OrcMetrics
 import org.apache.iceberg.parquet.ParquetUtil
@@ -232,14 +232,14 @@
   //noinspection ScalaDeprecation
   private def listParquetPartition(
       partitionPath: Map[String, String],
-      partitionUri: String): Seq[SparkDataFile] = {
+      partitionUri: String,
+      metricsSpec: MetricsConfig = MetricsConfig.getDefault): Seq[SparkDataFile] = {
     val conf = new Configuration()
     val partition = new Path(partitionUri)
     val fs = partition.getFileSystem(conf)
 
     fs.listStatus(partition, HiddenPathFilter).filter(_.isFile).map { stat =>
-      val metrics = ParquetUtil.footerMetrics(ParquetFileReader.readFooter(conf, stat),
-        TableProperties.WRITE_METADATA_TRUNCATE_BYTES_DEFAULT)
+      val metrics = ParquetUtil.footerMetrics(ParquetFileReader.readFooter(conf, stat), metricsSpec)
 
       SparkDataFile(
         stat.getPath.toString,
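To tie the documentation and writer changes together, a hedged end-to-end sketch (not part of the patch; the schema variable and column names are illustrative) of configuring metrics collection at table creation, mirroring what the Spark test below exercises:

    // Assumes an existing Iceberg Schema named 'schema' with columns 'id' and 'payload'.
    Map<String, String> props = Maps.newHashMap();
    props.put(TableProperties.DEFAULT_WRITE_METRICS_MODE, "counts");  // table-wide default
    props.put("write.metadata.metrics.column.id", "full");            // keep exact bounds for 'id'
    props.put("write.metadata.metrics.column.payload", "none");       // skip metrics for 'payload'
    Table table = new HadoopTables(new Configuration())
        .create(schema, PartitionSpec.unpartitioned(), props, "/tmp/metrics_table");
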
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestWriteMetricsConfig.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestWriteMetricsConfig.java
new file mode 100644
index 000000000000..1eaf341764e3
--- /dev/null
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestWriteMetricsConfig.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.spark.data.RandomData;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.ByteBuffers;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.spark.SparkSchemaUtil.convert;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+public class TestWriteMetricsConfig {
+
+  private static final Configuration CONF = new Configuration();
+  private static final Schema SIMPLE_SCHEMA = new Schema(
+      optional(1, "id", Types.IntegerType.get()),
+      optional(2, "data", Types.StringType.get())
+  );
+  private static final Schema COMPLEX_SCHEMA = new Schema(
+      required(1, "longCol", Types.IntegerType.get()),
+      optional(2, "strCol", Types.StringType.get()),
+      required(3, "record", Types.StructType.of(
+          required(4, "id", Types.IntegerType.get()),
+          required(5, "data", Types.StringType.get())
+      ))
+  );
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  private static SparkSession spark = null;
+  private static JavaSparkContext sc = null;
+
+  @BeforeClass
+  public static void startSpark() {
+    TestWriteMetricsConfig.spark = SparkSession.builder().master("local[2]").getOrCreate();
+    TestWriteMetricsConfig.sc = new JavaSparkContext(spark.sparkContext());
+  }
+
+  @AfterClass
+  public static void stopSpark() {
+    SparkSession currentSpark = TestWriteMetricsConfig.spark;
+    TestWriteMetricsConfig.spark = null;
+    TestWriteMetricsConfig.sc = null;
+    currentSpark.stop();
+  }
+
+  @Test
+  public void testFullMetricsCollectionForParquet() throws IOException {
+    String tableLocation = temp.newFolder("iceberg-table").toString();
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(TableProperties.DEFAULT_WRITE_METRICS_MODE, "full");
+    Table table = tables.create(SIMPLE_SCHEMA, spec, properties, tableLocation);
+
+    List<SimpleRecord> expectedRecords = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b"),
+        new SimpleRecord(3, "c")
+    );
+    Dataset<Row> df = spark.createDataFrame(expectedRecords, SimpleRecord.class);
+    df.select("id", "data")
+        .coalesce(1)
+        .write()
+        .format("iceberg")
+        .option("write-format", "parquet")
+        .mode("append")
+        .save(tableLocation);
+
+    for (FileScanTask task : table.newScan().includeColumnStats().planFiles()) {
+      DataFile file = task.file();
+      Assert.assertEquals(2, file.nullValueCounts().size());
+      Assert.assertEquals(2, file.valueCounts().size());
+      Assert.assertEquals(2, file.lowerBounds().size());
+      Assert.assertEquals(2, file.upperBounds().size());
+    }
+  }
+
+  @Test
+  public void testCountMetricsCollectionForParquet() throws IOException {
+    String tableLocation = temp.newFolder("iceberg-table").toString();
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(TableProperties.DEFAULT_WRITE_METRICS_MODE, "counts");
+    Table table = tables.create(SIMPLE_SCHEMA, spec, properties, tableLocation);
+
+    List<SimpleRecord> expectedRecords = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b"),
+        new SimpleRecord(3, "c")
+    );
+    Dataset<Row> df = spark.createDataFrame(expectedRecords, SimpleRecord.class);
+    df.select("id", "data")
+        .coalesce(1)
+        .write()
+        .format("iceberg")
+        .option("write-format", "parquet")
+        .mode("append")
+        .save(tableLocation);
+
+    for (FileScanTask task : table.newScan().includeColumnStats().planFiles()) {
+      DataFile file = task.file();
+      Assert.assertEquals(2, file.nullValueCounts().size());
+      Assert.assertEquals(2, file.valueCounts().size());
+      Assert.assertTrue(file.lowerBounds().isEmpty());
+      Assert.assertTrue(file.upperBounds().isEmpty());
+    }
+  }
+
+  @Test
+  public void testNoMetricsCollectionForParquet() throws IOException {
+    String tableLocation = temp.newFolder("iceberg-table").toString();
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(TableProperties.DEFAULT_WRITE_METRICS_MODE, "none");
+    Table table = tables.create(SIMPLE_SCHEMA, spec, properties, tableLocation);
+
+    List<SimpleRecord> expectedRecords = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b"),
+        new SimpleRecord(3, "c")
+    );
+    Dataset<Row> df = spark.createDataFrame(expectedRecords, SimpleRecord.class);
+    df.select("id", "data")
+        .coalesce(1)
+        .write()
+        .format("iceberg")
+        .option("write-format", "parquet")
+        .mode("append")
+        .save(tableLocation);
+
+    for (FileScanTask task : table.newScan().includeColumnStats().planFiles()) {
+      DataFile file = task.file();
+      Assert.assertTrue(file.nullValueCounts().isEmpty());
+      Assert.assertTrue(file.valueCounts().isEmpty());
+      Assert.assertTrue(file.lowerBounds().isEmpty());
+      Assert.assertTrue(file.upperBounds().isEmpty());
+    }
+  }
+
+  @Test
+  public void testCustomMetricCollectionForParquet() throws IOException {
+    String tableLocation = temp.newFolder("iceberg-table").toString();
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(TableProperties.DEFAULT_WRITE_METRICS_MODE, "counts");
+    properties.put("write.metadata.metrics.column.id", "full");
+    Table table = tables.create(SIMPLE_SCHEMA, spec, properties, tableLocation);
+
+    List<SimpleRecord> expectedRecords = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b"),
+        new SimpleRecord(3, "c")
+    );
+    Dataset<Row> df = spark.createDataFrame(expectedRecords, SimpleRecord.class);
+    df.select("id", "data")
+        .coalesce(1)
+        .write()
+        .format("iceberg")
+        .option("write-format", "parquet")
+        .mode("append")
+        .save(tableLocation);
+
+    Schema schema = table.schema();
+    Types.NestedField id = schema.findField("id");
+    for (FileScanTask task : table.newScan().includeColumnStats().planFiles()) {
+      DataFile file = task.file();
+      Assert.assertEquals(2, file.nullValueCounts().size());
+      Assert.assertEquals(2, file.valueCounts().size());
+      Assert.assertEquals(1, file.lowerBounds().size());
+      Assert.assertTrue(file.lowerBounds().containsKey(id.fieldId()));
+      Assert.assertEquals(1, file.upperBounds().size());
+      Assert.assertTrue(file.upperBounds().containsKey(id.fieldId()));
+    }
+  }
+
+  @Test
+  public void testCustomMetricCollectionForNestedParquet() throws IOException {
+    String tableLocation = temp.newFolder("iceberg-table").toString();
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.builderFor(COMPLEX_SCHEMA)
+        .identity("strCol")
+        .build();
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(TableProperties.DEFAULT_WRITE_METRICS_MODE, "none");
+    properties.put("write.metadata.metrics.column.longCol", "counts");
+    properties.put("write.metadata.metrics.column.record.id", "full");
+    properties.put("write.metadata.metrics.column.record.data", "truncate(2)");
+    Table table = tables.create(COMPLEX_SCHEMA, spec, properties, tableLocation);
+
+    Iterable<InternalRow> rows = RandomData.generateSpark(COMPLEX_SCHEMA, 10, 0);
+    JavaRDD<InternalRow> rdd = sc.parallelize(Lists.newArrayList(rows));
+    Dataset<Row> df = spark.internalCreateDataFrame(JavaRDD.toRDD(rdd), convert(COMPLEX_SCHEMA), false);
+
+    df.coalesce(1).write()
+        .format("iceberg")
+        .option("write-format", "parquet")
+        .mode("append")
+        .save(tableLocation);
+
+    Schema schema = table.schema();
+    Types.NestedField longCol = schema.findField("longCol");
+    Types.NestedField recordId = schema.findField("record.id");
+    Types.NestedField recordData = schema.findField("record.data");
+    for (FileScanTask task : table.newScan().includeColumnStats().planFiles()) {
+      DataFile file = task.file();
+
+      Map<Integer, Long> nullValueCounts = file.nullValueCounts();
+      Assert.assertEquals(3, nullValueCounts.size());
+      Assert.assertTrue(nullValueCounts.containsKey(longCol.fieldId()));
+      Assert.assertTrue(nullValueCounts.containsKey(recordId.fieldId()));
+      Assert.assertTrue(nullValueCounts.containsKey(recordData.fieldId()));
+
+      Map<Integer, Long> valueCounts = file.valueCounts();
+      Assert.assertEquals(3, valueCounts.size());
+      Assert.assertTrue(valueCounts.containsKey(longCol.fieldId()));
+      Assert.assertTrue(valueCounts.containsKey(recordId.fieldId()));
+      Assert.assertTrue(valueCounts.containsKey(recordData.fieldId()));
+
+      Map<Integer, ByteBuffer> lowerBounds = file.lowerBounds();
+      Assert.assertEquals(2, lowerBounds.size());
+      Assert.assertTrue(lowerBounds.containsKey(recordId.fieldId()));
+      ByteBuffer recordDataLowerBound = lowerBounds.get(recordData.fieldId());
+      Assert.assertEquals(2, ByteBuffers.toByteArray(recordDataLowerBound).length);
+
+      Map<Integer, ByteBuffer> upperBounds = file.upperBounds();
+      Assert.assertEquals(2, upperBounds.size());
+      Assert.assertTrue(upperBounds.containsKey(recordId.fieldId()));
+      ByteBuffer recordDataUpperBound = upperBounds.get(recordData.fieldId());
+      Assert.assertEquals(2, ByteBuffers.toByteArray(recordDataUpperBound).length);
+    }
+  }
+}