ParquetVariantUtil.java
@@ -57,7 +57,7 @@
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;

class ParquetVariantUtil {
public class ParquetVariantUtil {
private ParquetVariantUtil() {}

/**
@@ -212,7 +212,7 @@ static int scale(PrimitiveType primitive) {
* @param value a variant value
* @return a Parquet schema that can fully shred the value
*/
static Type toParquetSchema(VariantValue value) {
public static Type toParquetSchema(VariantValue value) {
return VariantVisitor.visit(value, new ParquetSchemaProducer());
}

ParquetWriter.java
@@ -51,7 +51,7 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
private final Map<String, String> metadata;
private final ParquetProperties props;
private final CodecFactory.BytesCompressor compressor;
private final MessageType parquetSchema;
private MessageType parquetSchema;
private final ParquetValueWriter<T> model;
private final MetricsConfig metricsConfig;
private final int columnIndexTruncateLength;
@@ -134,6 +134,30 @@ private void ensureWriterInitialized() {

@Override
public void add(T value) {
if (model instanceof WriterLazyInitializable) {
WriterLazyInitializable lazy = (WriterLazyInitializable) model;
if (lazy.needsInitialization()) {
model.write(0, value);
recordCount += 1;

if (!lazy.needsInitialization()) {
WriterLazyInitializable.InitializationResult result =
lazy.initialize(props, compressor, rowGroupOrdinal);
this.parquetSchema = result.getSchema();
this.pageStore = result.getPageStore();
this.writeStore = result.getWriteStore();
[Inline review comment from a contributor]
Seems the initial writeStore/pageStore from startRowGroup() aren't closed before being replaced here. Could this cause a memory leak?

// Re-initialize the file writer with the new schema
ensureWriterInitialized();

// Buffered rows were already written with endRecord() calls
// in the lazy writer's initialization, so we don't call endRecord() here
checkSize();
}
return;
}
}

recordCount += 1;
model.write(0, value);
writeStore.endRecord();
@@ -255,6 +279,21 @@ private void startRowGroup() {
public void close() throws IOException {
if (!closed) {
this.closed = true;

// Force initialization if the lazy writer still has buffered data
if (model instanceof WriterLazyInitializable) {
WriterLazyInitializable lazy = (WriterLazyInitializable) model;
if (lazy.needsInitialization()) {
WriterLazyInitializable.InitializationResult result =
lazy.initialize(props, compressor, rowGroupOrdinal);
this.parquetSchema = result.getSchema();
this.pageStore = result.getPageStore();
this.writeStore = result.getWriteStore();

ensureWriterInitialized();
}
}

flushRowGroup(true);
writeStore.close();
if (writer != null) {
WriterLazyInitializable.java
@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.parquet;

import org.apache.parquet.column.ColumnWriteStore;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.compression.CompressionCodecFactory;
import org.apache.parquet.hadoop.ColumnChunkPageWriteStore;
import org.apache.parquet.schema.MessageType;

/**
* Interface for ParquetValueWriters that need to defer initialization until they can analyze the
* data. This is useful for scenarios like variant shredding where the schema needs to be inferred
* from the actual data before creating the writer structures.
*
* <p>Writers implementing this interface can buffer initial rows and perform schema inference
* before committing to a final Parquet schema.
*/
public interface WriterLazyInitializable {
/**
* Result returned by the lazy initialization of a ParquetValueWriter, as required by
* ParquetWriter. Contains the finalized schema and write stores produced by schema inference or
* other initialization logic.
*/
class InitializationResult {
private final MessageType schema;
private final ColumnChunkPageWriteStore pageStore;
private final ColumnWriteStore writeStore;

public InitializationResult(
MessageType schema, ColumnChunkPageWriteStore pageStore, ColumnWriteStore writeStore) {
this.schema = schema;
this.pageStore = pageStore;
this.writeStore = writeStore;
}

public MessageType getSchema() {
return schema;
}

public ColumnChunkPageWriteStore getPageStore() {
return pageStore;
}

public ColumnWriteStore getWriteStore() {
return writeStore;
}
}

/**
* Checks if this writer still needs initialization. This will return true until the writer has
* buffered enough data to perform initialization (e.g., schema inference).
*
* @return true if initialization is still needed, false if already initialized
*/
boolean needsInitialization();

/**
* Performs initialization and returns the result containing the finalized schema and write
* stores. This method should be called once the writer has buffered enough data to infer the
* schema, or at close time if rows are still buffered.
*
* @param props Parquet properties needed for creating write stores
* @param compressor bytes compressor used to compress column chunk pages
* @param rowGroupOrdinal The ordinal number of the current row group
* @return InitializationResult containing the finalized schema and write stores
*/
InitializationResult initialize(
ParquetProperties props,
CompressionCodecFactory.BytesInputCompressor compressor,
int rowGroupOrdinal);
}
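To make the contract concrete, below is a minimal, hypothetical sketch of a lazily initialized writer that buffers rows until the shredded schema can be finalized. The class name BufferingLazyWriter and the buffer(), inferSchema(), newPageStore(), newWriteStore(), and replay() hooks are illustrative only and are not part of this change; the ParquetValueWriter side and the store construction (which would mirror ParquetWriter.startRowGroup()) are left abstract.

package org.apache.iceberg.parquet;

import java.util.ArrayList;
import java.util.List;
import org.apache.parquet.column.ColumnWriteStore;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.compression.CompressionCodecFactory;
import org.apache.parquet.hadoop.ColumnChunkPageWriteStore;
import org.apache.parquet.schema.MessageType;

// Hypothetical sketch, not part of this PR: buffers rows until enough data has
// been seen to finalize a shredded schema, then builds the real write stores.
abstract class BufferingLazyWriter<T> implements WriterLazyInitializable {
  private final int inferenceBufferSize;
  private final List<T> buffered = new ArrayList<>();
  private boolean initialized = false;

  protected BufferingLazyWriter(int inferenceBufferSize) {
    this.inferenceBufferSize = inferenceBufferSize;
  }

  // Called from the writer's write path (omitted here) for each row that
  // arrives while the schema is still being inferred.
  protected void buffer(T value) {
    buffered.add(value);
  }

  @Override
  public boolean needsInitialization() {
    // true while still collecting rows; flips to false once the buffer is full
    return !initialized && buffered.size() < inferenceBufferSize;
  }

  @Override
  public InitializationResult initialize(
      ParquetProperties props,
      CompressionCodecFactory.BytesInputCompressor compressor,
      int rowGroupOrdinal) {
    // 1. Infer the final (possibly shredded) schema from the buffered rows.
    MessageType schema = inferSchema(buffered);

    // 2. Build stores for that schema; construction mirrors
    //    ParquetWriter.startRowGroup() and is left abstract in this sketch.
    ColumnChunkPageWriteStore pageStore =
        newPageStore(schema, props, compressor, rowGroupOrdinal);
    ColumnWriteStore writeStore = newWriteStore(schema, props, pageStore);

    // 3. Replay the buffered rows into the new write store, ending each record,
    //    then hand everything back to ParquetWriter.
    replay(buffered, writeStore);
    buffered.clear();
    this.initialized = true;
    return new InitializationResult(schema, pageStore, writeStore);
  }

  // Illustrative hooks a concrete implementation would provide.
  protected abstract MessageType inferSchema(List<T> rows);

  protected abstract ColumnChunkPageWriteStore newPageStore(
      MessageType schema,
      ParquetProperties props,
      CompressionCodecFactory.BytesInputCompressor compressor,
      int rowGroupOrdinal);

  protected abstract ColumnWriteStore newWriteStore(
      MessageType schema, ParquetProperties props, ColumnChunkPageWriteStore pageStore);

  protected abstract void replay(List<T> rows, ColumnWriteStore writeStore);
}

ParquetWriter then adopts the returned schema and stores and re-runs ensureWriterInitialized(), as shown in the add() and close() changes above.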
SparkSQLProperties.java
@@ -106,4 +106,14 @@ private SparkSQLProperties() {}
// Controls whether to report available column statistics to Spark for query optimization.
public static final String REPORT_COLUMN_STATS = "spark.sql.iceberg.report-column-stats";
public static final boolean REPORT_COLUMN_STATS_DEFAULT = true;

// Controls whether to shred variant columns during write operations
public static final String SHRED_VARIANTS = "spark.sql.iceberg.shred-variants";
public static final boolean SHRED_VARIANTS_DEFAULT = true;

// Controls the buffer size for variant schema inference during writes
// This determines how many rows are buffered before the shredded schema is inferred
public static final String VARIANT_INFERENCE_BUFFER_SIZE =
"spark.sql.iceberg.variant.inference.buffer-size";
public static final int VARIANT_INFERENCE_BUFFER_SIZE_DEFAULT = 10;
}
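As a usage illustration only (the example class name and the local master are assumptions, not part of this change), these session-level properties could be set before writing:

import org.apache.spark.sql.SparkSession;

public class VariantShreddingConfExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();

    // Turn variant shredding off for writes in this session (default is true).
    spark.conf().set("spark.sql.iceberg.shred-variants", "false");

    // Buffer 100 rows before the shredded schema is inferred (default is 10).
    spark.conf().set("spark.sql.iceberg.variant.inference.buffer-size", "100");

    spark.stop();
  }
}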
SparkWriteConf.java
@@ -505,6 +505,7 @@ private Map<String, String> dataWriteProperties() {
if (parquetCompressionLevel != null) {
writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel);
}
writeProperties.put(SparkSQLProperties.SHRED_VARIANTS, String.valueOf(shredVariants()));
break;

case AVRO:
@@ -725,4 +726,13 @@ public DeleteGranularity deleteGranularity() {
.defaultValue(DeleteGranularity.FILE)
.parse();
}

public boolean shredVariants() {
return confParser
.booleanConf()
.option(SparkWriteOptions.SHRED_VARIANTS)
.sessionConf(SparkSQLProperties.SHRED_VARIANTS)
.defaultValue(SparkSQLProperties.SHRED_VARIANTS_DEFAULT)
.parse();
}
}
SparkWriteOptions.java
@@ -85,4 +85,7 @@ private SparkWriteOptions() {}

// Overrides the delete granularity
public static final String DELETE_GRANULARITY = "delete-granularity";

// Controls whether to shred variant columns during write operations
public static final String SHRED_VARIANTS = "shred-variants";
}
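For the per-write path, a hypothetical example (the class and table names are illustrative); per shredVariants() in SparkWriteConf above, the write option takes precedence over the session property, which in turn falls back to the default of true:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class VariantShreddingWriteOptionExample {
  public static void main(String[] args) throws Exception {
    SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();
    Dataset<Row> df = spark.table("db.events_staging"); // illustrative source table

    // Disable shredding for this write only; overrides spark.sql.iceberg.shred-variants.
    df.writeTo("db.events").option("shred-variants", "false").append();

    spark.stop();
  }
}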