diff --git a/core/src/main/java/org/apache/iceberg/GenericDataFile.java b/core/src/main/java/org/apache/iceberg/GenericDataFile.java
index e7070d7a8258..0d11067d33ce 100644
--- a/core/src/main/java/org/apache/iceberg/GenericDataFile.java
+++ b/core/src/main/java/org/apache/iceberg/GenericDataFile.java
@@ -26,7 +26,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.types.Types;
 
-class GenericDataFile extends BaseFile<DataFile> implements DataFile {
+public class GenericDataFile extends BaseFile<DataFile> implements DataFile {
   /**
    * Used by Avro reflection to instantiate this class when reading manifest files.
    */
diff --git a/core/src/main/java/org/apache/iceberg/PartitionData.java b/core/src/main/java/org/apache/iceberg/PartitionData.java
index 2b8c0db22110..3c3795345dba 100644
--- a/core/src/main/java/org/apache/iceberg/PartitionData.java
+++ b/core/src/main/java/org/apache/iceberg/PartitionData.java
@@ -36,7 +36,7 @@
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 
-class PartitionData
+public class PartitionData
     implements IndexedRecord, StructLike, SpecificData.SchemaConstructable, Serializable {
 
   static Schema partitionDataSchema(Types.StructType partitionType) {
diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java
index 56d921b918a6..c161ea765528 100644
--- a/core/src/main/java/org/apache/iceberg/TableProperties.java
+++ b/core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -188,4 +188,7 @@ private TableProperties() {
   public static final String MERGE_CARDINALITY_CHECK_ENABLED = "write.merge.cardinality-check.enabled";
   public static final boolean MERGE_CARDINALITY_CHECK_ENABLED_DEFAULT = true;
+
+  public static final String WRITE_AUTO_COMPACT_FILES = "write.auto-compact-files";
+  public static final boolean WRITE_AUTO_COMPACT_FILES_DEFAULT = false;
 }
diff --git a/flink/src/main/java/org/apache/iceberg/flink/actions/SyncRewriteDataFilesAction.java b/flink/src/main/java/org/apache/iceberg/flink/actions/SyncRewriteDataFilesAction.java
new file mode 100644
index 000000000000..e6b438a50d4a
--- /dev/null
+++ b/flink/src/main/java/org/apache/iceberg/flink/actions/SyncRewriteDataFilesAction.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.actions;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.actions.BaseRewriteDataFilesAction;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory;
+import org.apache.iceberg.flink.sink.TaskWriterFactory;
+import org.apache.iceberg.flink.source.RowDataIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.PropertyUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
+
+public class SyncRewriteDataFilesAction extends BaseRewriteDataFilesAction<SyncRewriteDataFilesAction> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SyncRewriteDataFilesAction.class);
+
+  private final Schema schema;
+  private final FileFormat format;
+  private final String nameMapping;
+  private final FileIO io;
+  private final boolean caseSensitive;
+  private final EncryptionManager encryptionManager;
+  private final TaskWriterFactory<RowData> taskWriterFactory;
+  private TaskWriter<RowData> writer;
+  private int subTaskId;
+  private int attemptId;
+  private Exception exception;
+
+  public SyncRewriteDataFilesAction(Table table, int subTaskId, int attemptId) {
+    super(table);
+    this.schema = table.schema();
+    this.caseSensitive = caseSensitive();
+    this.io = fileIO();
+    this.encryptionManager = encryptionManager();
+    this.nameMapping = PropertyUtil.propertyAsString(table.properties(), DEFAULT_NAME_MAPPING, null);
+
+    String formatString = PropertyUtil.propertyAsString(table.properties(), TableProperties.DEFAULT_FILE_FORMAT,
+        TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
+    this.format = FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
+    RowType flinkSchema = FlinkSchemaUtil.convert(table.schema());
+    this.taskWriterFactory = new RowDataTaskWriterFactory(
+        table.schema(),
+        flinkSchema,
+        table.spec(),
+        table.locationProvider(),
+        io,
+        encryptionManager,
+        Long.MAX_VALUE,
+        format,
+        table.properties(),
+        null);
+
+    // Initialize the task writer factory.
+    this.taskWriterFactory.initialize(subTaskId, attemptId);
+    this.subTaskId = subTaskId;
+    this.attemptId = attemptId;
+  }
+
+  @Override
+  protected FileIO fileIO() {
+    return table().io();
+  }
+
+  @Override
+  protected List<DataFile> rewriteDataForTasks(List<CombinedScanTask> combinedScanTask) {
+    synchronized (SyncRewriteDataFilesAction.class) {
+      List<DataFile> dataFiles = new ArrayList<>();
+      // Initialize the task writer.
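+      // One task writer is created per rewrite call; every row of the combined scan tasks
+      // is streamed through it and the data files it produces are returned so
+      // BaseRewriteDataFilesAction can commit the replacement. A failure aborts the writer
+      // and is surfaced to the caller through getException().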
+      this.writer = taskWriterFactory.create();
+      for (CombinedScanTask task : combinedScanTask) {
+        try (RowDataIterator iterator =
+            new RowDataIterator(task, io, encryptionManager, schema, schema, nameMapping, caseSensitive)) {
+          while (iterator.hasNext()) {
+            RowData rowData = iterator.next();
+            writer.write(rowData);
+          }
+          dataFiles.addAll(Lists.newArrayList(writer.dataFiles()));
+        } catch (Throwable originalThrowable) {
+          try {
+            LOG.error("Aborting commit for (subTaskId {}, attemptId {})", subTaskId, attemptId);
+            writer.abort();
+            LOG.error("Aborted commit for (subTaskId {}, attemptId {})", subTaskId, attemptId);
+          } catch (Throwable inner) {
+            if (originalThrowable != inner) {
+              originalThrowable.addSuppressed(inner);
+              LOG.warn("Suppressing exception in catch: {}", inner.getMessage(), inner);
+            }
+          }
+
+          if (originalThrowable instanceof Exception) {
+            this.exception = (Exception) originalThrowable;
+          } else {
+            this.exception = new RuntimeException(originalThrowable);
+          }
+        }
+      }
+      return dataFiles;
+    }
+  }
+
+  @Override
+  protected SyncRewriteDataFilesAction self() {
+    return this;
+  }
+
+  public Exception getException() {
+    return this.exception;
+  }
+}
diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java b/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
index ff9174a84399..67575f95d0df 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
@@ -22,9 +22,12 @@
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
+import java.util.Set;
 import java.util.SortedMap;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
@@ -39,13 +42,18 @@
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.table.runtime.typeutils.SortedMapTypeInfo;
 import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.PartitionData;
 import org.apache.iceberg.ReplacePartitions;
 import org.apache.iceberg.RowDelta;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.BaseRewriteDataFilesAction;
+import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.actions.SyncRewriteDataFilesAction;
 import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -54,11 +62,19 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Conversions;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.PropertyUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT;
+import static org.apache.iceberg.TableProperties.WRITE_AUTO_COMPACT_FILES;
+import static org.apache.iceberg.TableProperties.WRITE_AUTO_COMPACT_FILES_DEFAULT;
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
+
 class IcebergFilesCommitter extends AbstractStreamOperator<Void>
     implements OneInputStreamOperator<WriteResult, Void>, BoundedOneInput {
@@ -109,6 +125,8 @@ class IcebergFilesCommitter extends AbstractStreamOperator
   // All pending checkpoints states for this function.
   private static final ListStateDescriptor<SortedMap<Long, byte[]>> STATE_DESCRIPTOR = buildStateDescriptor();
   private transient ListState<SortedMap<Long, byte[]>> checkpointsState;
+  private transient SyncRewriteDataFilesAction action;
+  private transient BaseRewriteDataFilesAction<SyncRewriteDataFilesAction> rewriteDataFilesAction;
 
   IcebergFilesCommitter(TableLoader tableLoader, boolean replacePartitions) {
     this.tableLoader = tableLoader;
@@ -171,23 +189,62 @@ public void snapshotState(StateSnapshotContext context) throws Exception {
     jobIdState.clear();
     jobIdState.add(flinkJobId);
 
+    if (getAutoCompactFiles(table.properties())) {
+      boolean hasNewData = false;
+      Map<String, Set<Object>> partitions = new HashMap<>();
+      for (WriteResult ckpt : writeResultsOfCurrentCkpt) {
+        if (ckpt.dataFiles().length > 0 || ckpt.deleteFiles().length > 0) {
+          hasNewData = true;
+        }
+        Arrays.stream(ckpt.dataFiles()).forEach(e -> setPartitionData(e, partitions));
+        Arrays.stream(ckpt.deleteFiles()).forEach(e -> setPartitionData(e, partitions));
+      }
+      if (hasNewData) {
+        action = new SyncRewriteDataFilesAction(table,
+            getRuntimeContext().getIndexOfThisSubtask(),
+            getRuntimeContext().getAttemptNumber());
+        rewriteDataFilesAction = action
+            .targetSizeInBytes(getTargetFileSizeBytes(table.properties()))
+            .splitOpenFileCost(getSplitOpenFileCost(table.properties()));
+
+        for (Map.Entry<String, Set<Object>> p : partitions.entrySet()) {
+          for (Object pValue : p.getValue()) {
+            rewriteDataFilesAction
+                .filter(Expressions.equal(p.getKey(), pValue));
+          }
+        }
+      } else {
+        rewriteDataFilesAction = null;
+      }
+    }
+
     // Clear the local buffer for current checkpoint.
     writeResultsOfCurrentCkpt.clear();
   }
 
   @Override
   public void notifyCheckpointComplete(long checkpointId) throws Exception {
-    super.notifyCheckpointComplete(checkpointId);
-    // It's possible that we have the following events:
-    // 1. snapshotState(ckpId);
-    // 2. snapshotState(ckpId+1);
-    // 3. notifyCheckpointComplete(ckpId+1);
-    // 4. notifyCheckpointComplete(ckpId);
-    // For step#4, we don't need to commit iceberg table again because in step#3 we've committed all the files,
-    // Besides, we need to maintain the max-committed-checkpoint-id to be increasing.
-    if (checkpointId > maxCommittedCheckpointId) {
-      commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, checkpointId);
-      this.maxCommittedCheckpointId = checkpointId;
+    // Synchronize the checkpoint commit with the data file compaction.
+    synchronized (IcebergFilesCommitter.class) {
+      super.notifyCheckpointComplete(checkpointId);
+      // It's possible that we have the following events:
+      // 1. snapshotState(ckpId);
+      // 2. snapshotState(ckpId+1);
+      // 3. notifyCheckpointComplete(ckpId+1);
+      // 4. notifyCheckpointComplete(ckpId);
+      // For step#4, we don't need to commit iceberg table again because in step#3 we've committed all the files,
+      // Besides, we need to maintain the max-committed-checkpoint-id to be increasing.
+      if (checkpointId > maxCommittedCheckpointId) {
+        commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, checkpointId);
+        this.maxCommittedCheckpointId = checkpointId;
+      }
+
+      if (rewriteDataFilesAction != null) {
+        rewriteDataFilesAction.execute();
+        if (action.getException() != null) {
+          throw action.getException();
+        }
+      }
     }
   }
 
@@ -297,8 +354,9 @@ private void commitDeltaTxn(NavigableMap pendingResults, Stri
     }
   }
 
-  private void commitOperation(SnapshotUpdate<?> operation, int numDataFiles, int numDeleteFiles, String description,
-                               String newFlinkJobId, long checkpointId) {
+  private void commitOperation(
+      SnapshotUpdate<?> operation, int numDataFiles, int numDeleteFiles, String description,
+      String newFlinkJobId, long checkpointId) {
     LOG.info("Committing {} with {} data files and {} delete files to table {}", description, numDataFiles,
         numDeleteFiles, table);
     operation.set(MAX_COMMITTED_CHECKPOINT_ID, Long.toString(checkpointId));
@@ -376,4 +434,37 @@ static long getMaxCommittedCheckpointId(Table table, String flinkJobId) {
 
     return lastCommittedCheckpointId;
   }
+
+  private static void setPartitionData(ContentFile<?> file, Map<String, Set<Object>> partitions) {
+    for (int i = 0; i < file.partition().size(); i++) {
+      String partitionName = ((PartitionData) file.partition()).getPartitionType().fields().get(i).name();
+      Object partitionValue = Conversions.fromPartitionString(((PartitionData) file.partition()).getType(i),
+          ((PartitionData) file.partition()).get(i).toString());
+      if (!partitions.containsKey(partitionName)) {
+        partitions.put(partitionName, new HashSet<>());
+      }
+      partitions.get(partitionName).add(partitionValue);
+    }
+  }
+
+  private static long getTargetFileSizeBytes(Map<String, String> properties) {
+    return PropertyUtil.propertyAsLong(
+        properties,
+        WRITE_TARGET_FILE_SIZE_BYTES,
+        WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+  }
+
+  private static boolean getAutoCompactFiles(Map<String, String> properties) {
+    return PropertyUtil.propertyAsBoolean(
+        properties,
+        WRITE_AUTO_COMPACT_FILES,
+        WRITE_AUTO_COMPACT_FILES_DEFAULT);
+  }
+
+  private static long getSplitOpenFileCost(Map<String, String> properties) {
+    return PropertyUtil.propertyAsLong(
+        properties,
+        SPLIT_OPEN_FILE_COST,
+        SPLIT_OPEN_FILE_COST_DEFAULT);
+  }
 }
diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/RowDataIterator.java b/flink/src/main/java/org/apache/iceberg/flink/source/RowDataIterator.java
index 5a568144d1f7..b907c95edcdb 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/source/RowDataIterator.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/source/RowDataIterator.java
@@ -47,14 +47,14 @@
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.PartitionUtil;
 
-class RowDataIterator extends DataIterator<RowData> {
+public class RowDataIterator extends DataIterator<RowData> {
 
   private final Schema tableSchema;
   private final Schema projectedSchema;
   private final String nameMapping;
   private final boolean caseSensitive;
 
-  RowDataIterator(CombinedScanTask task, FileIO io, EncryptionManager encryption, Schema tableSchema,
+  public RowDataIterator(CombinedScanTask task, FileIO io, EncryptionManager encryption, Schema tableSchema,
                   Schema projectedSchema, String nameMapping, boolean caseSensitive) {
     super(task, io, encryption);
     this.tableSchema = tableSchema;
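
A minimal usage sketch, not part of the patch above: once a table sets the new write.auto-compact-files property, IcebergFilesCommitter builds a SyncRewriteDataFilesAction during snapshotState and runs it after each successful checkpoint commit in notifyCheckpointComplete. The HadoopTables catalog and the table location below are illustrative placeholders only; any catalog that returns a Table works the same way.

// Usage sketch: enable auto compaction on an existing Iceberg table.
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.hadoop.HadoopTables;

public class EnableAutoCompactFiles {
  public static void main(String[] args) {
    // Placeholder catalog and location; substitute your own table lookup.
    Table table = new HadoopTables(new Configuration())
        .load("hdfs://namenode:8020/warehouse/db/sample_table");
    table.updateProperties()
        .set(TableProperties.WRITE_AUTO_COMPACT_FILES, "true")                          // new property from this patch
        .set(TableProperties.WRITE_TARGET_FILE_SIZE_BYTES, String.valueOf(128L << 20))  // optional: target rewritten file size
        .commit();
  }
}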