2 changes: 1 addition & 1 deletion core/src/main/java/org/apache/iceberg/GenericDataFile.java
@@ -26,7 +26,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.types.Types;

class GenericDataFile extends BaseFile<DataFile> implements DataFile {
public class GenericDataFile extends BaseFile<DataFile> implements DataFile {
/**
* Used by Avro reflection to instantiate this class when reading manifest files.
*/
2 changes: 1 addition & 1 deletion core/src/main/java/org/apache/iceberg/PartitionData.java
@@ -36,7 +36,7 @@
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

class PartitionData
public class PartitionData
implements IndexedRecord, StructLike, SpecificData.SchemaConstructable, Serializable {

static Schema partitionDataSchema(Types.StructType partitionType) {
3 changes: 3 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -188,4 +188,7 @@ private TableProperties() {

public static final String MERGE_CARDINALITY_CHECK_ENABLED = "write.merge.cardinality-check.enabled";
public static final boolean MERGE_CARDINALITY_CHECK_ENABLED_DEFAULT = true;

public static final String WRITE_AUTO_COMPACT_FILES = "write.auto-compact-files";
public static final boolean WRITE_AUTO_COMPACT_FILES_DEFAULT = false;
}
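The new `write.auto-compact-files` property gates the post-commit compaction added below in the committer. For reference only, a minimal sketch (not part of this change) of how a job could enable it on an existing table through the standard Table API; `table` is assumed to be an already-loaded Iceberg table.

```java
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;

public class EnableAutoCompaction {
  // Turn on automatic small-file compaction for the given table.
  public static void enable(Table table) {
    table.updateProperties()
        .set(TableProperties.WRITE_AUTO_COMPACT_FILES, "true")
        .commit();
  }
}
```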
@@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.flink.actions;

import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.actions.BaseRewriteDataFilesAction;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory;
import org.apache.iceberg.flink.sink.TaskWriterFactory;
import org.apache.iceberg.flink.source.RowDataIterator;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.PropertyUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;

public class SyncRewriteDataFilesAction extends BaseRewriteDataFilesAction<SyncRewriteDataFilesAction> {

private static final Logger LOG = LoggerFactory.getLogger(SyncRewriteDataFilesAction.class);

private final Schema schema;
private final FileFormat format;
private final String nameMapping;
private final FileIO io;
private final boolean caseSensitive;
private final EncryptionManager encryptionManager;
private final TaskWriterFactory<RowData> taskWriterFactory;
private TaskWriter<RowData> writer;
private int subTaskId;
private int attemptId;
private Exception exception;

public SyncRewriteDataFilesAction(Table table, int subTaskId, int attemptId) {
super(table);
this.schema = table.schema();
this.caseSensitive = caseSensitive();
this.io = fileIO();
this.encryptionManager = encryptionManager();
this.nameMapping = PropertyUtil.propertyAsString(table.properties(), DEFAULT_NAME_MAPPING, null);

String formatString = PropertyUtil.propertyAsString(table.properties(), TableProperties.DEFAULT_FILE_FORMAT,
TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
this.format = FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
RowType flinkSchema = FlinkSchemaUtil.convert(table.schema());
this.taskWriterFactory = new RowDataTaskWriterFactory(
table.schema(),
flinkSchema,
table.spec(),
table.locationProvider(),
io,
encryptionManager,
Long.MAX_VALUE,
format,
table.properties(),
null);

// Initialize the task writer factory.
this.taskWriterFactory.initialize(subTaskId, attemptId);
this.subTaskId = subTaskId;
this.attemptId = attemptId;
}

@Override
protected FileIO fileIO() {
return table().io();
}

@Override
protected List<DataFile> rewriteDataForTasks(List<CombinedScanTask> combinedScanTask) {
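// Serialize rewrites across parallel committer subtasks in the same JVM with a class-level lock.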
synchronized (SyncRewriteDataFilesAction.class) {
List<DataFile> dataFiles = new ArrayList<>();
// Initialize the task writer.
this.writer = taskWriterFactory.create();
for (CombinedScanTask task : combinedScanTask) {
try (RowDataIterator iterator =
new RowDataIterator(task, io, encryptionManager, schema, schema, nameMapping, caseSensitive)) {
while (iterator.hasNext()) {
RowData rowData = iterator.next();
writer.write(rowData);
}
dataFiles.addAll(Lists.newArrayList(writer.dataFiles()));
} catch (Throwable originalThrowable) {
try {
LOG.error("Aborting commit for (subTaskId {}, attemptId {})", subTaskId, attemptId);
writer.abort();
LOG.error("Aborted commit for (subTaskId {}, attemptId {})", subTaskId, attemptId);
} catch (Throwable inner) {
if (originalThrowable != inner) {
originalThrowable.addSuppressed(inner);
LOG.warn("Suppressing exception in catch: {}", inner.getMessage(), inner);
}
}

if (originalThrowable instanceof Exception) {
this.exception = (Exception) originalThrowable;
} else {
this.exception = new RuntimeException(originalThrowable);
}
}
}
return dataFiles;
}
}

@Override
protected SyncRewriteDataFilesAction self() {
return this;
}

public Exception getException() {
return this.exception;
}
}
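To make the action's contract concrete, here is a minimal driver sketch showing the same call sequence the committer uses, but invoked directly. It is not part of this change; the subtask/attempt ids of 0, the size values, and the `dt` partition column are assumptions for illustration.

```java
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.flink.actions.SyncRewriteDataFilesAction;

public class CompactOnePartition {
  public static void compact(Table table) throws Exception {
    // subTaskId and attemptId only disambiguate the rewritten file names here.
    SyncRewriteDataFilesAction action = new SyncRewriteDataFilesAction(table, 0, 0);
    action.targetSizeInBytes(128L * 1024 * 1024)         // aim for ~128 MB output files
        .splitOpenFileCost(4L * 1024 * 1024)             // planning cost floor per open file
        .filter(Expressions.equal("dt", "2021-01-01"))   // limit the rewrite to one partition
        .execute();
    // Failures during the rewrite are captured rather than thrown, so surface them explicitly.
    if (action.getException() != null) {
      throw action.getException();
    }
  }
}
```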
@@ -22,9 +22,12 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.SortedMap;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
@@ -39,13 +42,18 @@
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.runtime.typeutils.SortedMapTypeInfo;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionData;
import org.apache.iceberg.ReplacePartitions;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.BaseRewriteDataFilesAction;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.actions.SyncRewriteDataFilesAction;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
Expand All @@ -54,11 +62,19 @@
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.PropertyUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT;
import static org.apache.iceberg.TableProperties.WRITE_AUTO_COMPACT_FILES;
import static org.apache.iceberg.TableProperties.WRITE_AUTO_COMPACT_FILES_DEFAULT;
import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;

class IcebergFilesCommitter extends AbstractStreamOperator<Void>
implements OneInputStreamOperator<WriteResult, Void>, BoundedOneInput {

@@ -109,6 +125,8 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
// All pending checkpoints states for this function.
private static final ListStateDescriptor<SortedMap<Long, byte[]>> STATE_DESCRIPTOR = buildStateDescriptor();
private transient ListState<SortedMap<Long, byte[]>> checkpointsState;
private transient SyncRewriteDataFilesAction action;
private transient BaseRewriteDataFilesAction<SyncRewriteDataFilesAction> rewriteDataFilesAction;

IcebergFilesCommitter(TableLoader tableLoader, boolean replacePartitions) {
this.tableLoader = tableLoader;
@@ -171,23 +189,62 @@ public void snapshotState(StateSnapshotContext context) throws Exception {
jobIdState.clear();
jobIdState.add(flinkJobId);

if (getAutoCompactFiles(table.properties())) {
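// Plan a rewrite for the partitions touched by this checkpoint; the rewrite itself
// runs in notifyCheckpointComplete, after the checkpoint's files have been committed.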
boolean hasNewData = false;
Map<String, Set<Object>> partitions = new HashMap<>();
for (WriteResult ckpt : writeResultsOfCurrentCkpt) {
if (ckpt.dataFiles().length > 0 || ckpt.deleteFiles().length > 0) {
hasNewData = true;
}
Arrays.stream(ckpt.dataFiles()).forEach(e -> setPartitionData(e, partitions));
Arrays.stream(ckpt.deleteFiles()).forEach(e -> setPartitionData(e, partitions));
}
if (hasNewData) {
action = new SyncRewriteDataFilesAction(table,
getRuntimeContext().getIndexOfThisSubtask(),
getRuntimeContext().getAttemptNumber());
rewriteDataFilesAction = action
.targetSizeInBytes(getTargetFileSizeBytes(table.properties()))
.splitOpenFileCost(getSplitOpenFileCost(table.properties()));

for (Map.Entry<String, Set<Object>> p : partitions.entrySet()) {
for (Object pValue : p.getValue()) {
rewriteDataFilesAction
.filter(Expressions.equal(p.getKey(), pValue));
}
}
} else {
rewriteDataFilesAction = null;
}
}

// Clear the local buffer for current checkpoint.
writeResultsOfCurrentCkpt.clear();
}

@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
super.notifyCheckpointComplete(checkpointId);
// It's possible that we have the following events:
// 1. snapshotState(ckpId);
// 2. snapshotState(ckpId+1);
// 3. notifyCheckpointComplete(ckpId+1);
// 4. notifyCheckpointComplete(ckpId);
// For step#4, we don't need to commit iceberg table again because in step#3 we've committed all the files,
// Besides, we need to maintain the max-committed-checkpoint-id to be increasing.
if (checkpointId > maxCommittedCheckpointId) {
commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, checkpointId);
this.maxCommittedCheckpointId = checkpointId;
// Serialize checkpoint commit and data file compaction so they cannot run concurrently.
synchronized (IcebergFilesCommitter.class) {
super.notifyCheckpointComplete(checkpointId);
// It's possible that we have the following events:
// 1. snapshotState(ckpId);
// 2. snapshotState(ckpId+1);
// 3. notifyCheckpointComplete(ckpId+1);
// 4. notifyCheckpointComplete(ckpId);
// For step#4, we don't need to commit iceberg table again because in step#3 we've committed all the files,
// Besides, we need to maintain the max-committed-checkpoint-id to be increasing.
if (checkpointId > maxCommittedCheckpointId) {
commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, checkpointId);
this.maxCommittedCheckpointId = checkpointId;
}

if (rewriteDataFilesAction != null) {
rewriteDataFilesAction.execute();
if (action.getException() != null) {
throw action.getException();
}
}
}
}

@@ -297,8 +354,9 @@ private void commitDeltaTxn(NavigableMap<Long, WriteResult> pendingResults, Stri
}
}

private void commitOperation(SnapshotUpdate<?> operation, int numDataFiles, int numDeleteFiles, String description,
String newFlinkJobId, long checkpointId) {
private void commitOperation(
SnapshotUpdate<?> operation, int numDataFiles, int numDeleteFiles, String description,
String newFlinkJobId, long checkpointId) {
LOG.info("Committing {} with {} data files and {} delete files to table {}", description, numDataFiles,
numDeleteFiles, table);
operation.set(MAX_COMMITTED_CHECKPOINT_ID, Long.toString(checkpointId));
@@ -376,4 +434,37 @@ static long getMaxCommittedCheckpointId(Table table, String flinkJobId) {

return lastCommittedCheckpointId;
}

private static void setPartitionData(ContentFile file, Map<String, Set<Object>> partitions) {
for (int i = 0; i < file.partition().size(); i++) {
String partitionName = ((PartitionData) file.partition()).getPartitionType().fields().get(i).name();
Object partitionValue = Conversions.fromPartitionString(((PartitionData) file.partition()).getType(i),
((PartitionData) file.partition()).get(i).toString());
if (!partitions.containsKey(partitionName)) {
partitions.put(partitionName, new HashSet<>());
}
partitions.get(partitionName).add(partitionValue);
}
}

private static long getTargetFileSizeBytes(Map<String, String> properties) {
return PropertyUtil.propertyAsLong(
properties,
WRITE_TARGET_FILE_SIZE_BYTES,
WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
}

private static boolean getAutoCompactFiles(Map<String, String> properties) {
return PropertyUtil.propertyAsBoolean(
properties,
WRITE_AUTO_COMPACT_FILES,
WRITE_AUTO_COMPACT_FILES_DEFAULT);
}

private static long getSplitOpenFileCost(Map<String, String> properties) {
return PropertyUtil.propertyAsLong(
properties,
SPLIT_OPEN_FILE_COST,
SPLIT_OPEN_FILE_COST_DEFAULT);
}
}
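For context on why `PartitionData` is widened to public in this change: the committer casts each committed file's `partition()` to `PartitionData` so it can read partition field names and typed values and turn them into equality filters for the rewrite. The sketch below restates `setPartitionData` as a standalone helper for readability; the `PartitionValues` class name is made up, and it assumes (as the committer does) that `partition()` returns a `PartitionData` instance.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.PartitionData;
import org.apache.iceberg.types.Conversions;

public class PartitionValues {
  // Collect partition column -> distinct values for one committed file.
  public static Map<String, Set<Object>> collect(ContentFile<?> file) {
    Map<String, Set<Object>> partitions = new HashMap<>();
    PartitionData partition = (PartitionData) file.partition();
    for (int i = 0; i < partition.size(); i++) {
      String name = partition.getPartitionType().fields().get(i).name();
      // Round-trip the value through its string form to get a typed object for an equality filter.
      Object value = Conversions.fromPartitionString(partition.getType(i), partition.get(i).toString());
      partitions.computeIfAbsent(name, k -> new HashSet<>()).add(value);
    }
    return partitions;
  }
}
```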
@@ -47,14 +47,14 @@
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.PartitionUtil;

class RowDataIterator extends DataIterator<RowData> {
public class RowDataIterator extends DataIterator<RowData> {

private final Schema tableSchema;
private final Schema projectedSchema;
private final String nameMapping;
private final boolean caseSensitive;

RowDataIterator(CombinedScanTask task, FileIO io, EncryptionManager encryption, Schema tableSchema,
public RowDataIterator(CombinedScanTask task, FileIO io, EncryptionManager encryption, Schema tableSchema,
Schema projectedSchema, String nameMapping, boolean caseSensitive) {
super(task, io, encryption);
this.tableSchema = tableSchema;
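With `RowDataIterator` and its constructor now public, the rewrite action (or any other caller) can read planned scan tasks directly. A small sketch under that assumption, mirroring its use in `SyncRewriteDataFilesAction`; the counting helper itself is made up for illustration.

```java
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.source.RowDataIterator;

public class ScanTaskReader {
  // Count the rows in one planned scan task, reading the full table schema.
  public static long countRows(Table table, CombinedScanTask task) throws Exception {
    Schema schema = table.schema();
    long rows = 0;
    try (RowDataIterator iterator = new RowDataIterator(
        task, table.io(), table.encryption(), schema, schema, null, true)) {
      while (iterator.hasNext()) {
        iterator.next();
        rows++;
      }
    }
    return rows;
  }
}
```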