Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions core/src/main/java/org/apache/iceberg/io/WriteResult.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,16 @@ private Builder() {
public Builder add(WriteResult result) {
addDataFiles(result.dataFiles);
addDeleteFiles(result.deleteFiles);
addReferencedDataFiles(result.referencedDataFiles);

return this;
}

public Builder addAll(Iterable<WriteResult> results) {
results.forEach(this::add);
return this;
}

public Builder addDataFiles(DataFile... files) {
Collections.addAll(dataFiles, files);
return this;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.flink.sink;

import java.util.List;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

class DeltaManifests {

private static final CharSequence[] EMPTY_REF_DATA_FILES = new CharSequence[0];

private final ManifestFile dataManifest;
private final ManifestFile deleteManifest;
private final CharSequence[] referencedDataFiles;

DeltaManifests(ManifestFile dataManifest, ManifestFile deleteManifest) {
this(dataManifest, deleteManifest, EMPTY_REF_DATA_FILES);
}

DeltaManifests(ManifestFile dataManifest, ManifestFile deleteManifest, CharSequence[] referencedDataFiles) {
Preconditions.checkNotNull(referencedDataFiles, "Referenced data files shouldn't be null.");

this.dataManifest = dataManifest;
this.deleteManifest = deleteManifest;
this.referencedDataFiles = referencedDataFiles;
}

ManifestFile dataManifest() {
return dataManifest;
}

ManifestFile deleteManifest() {
return deleteManifest;
}

CharSequence[] referencedDataFiles() {
return referencedDataFiles;
}

List<ManifestFile> manifests() {
List<ManifestFile> manifests = Lists.newArrayListWithCapacity(2);
if (dataManifest != null) {
manifests.add(dataManifest);
}

if (deleteManifest != null) {
manifests.add(deleteManifest);
}

return manifests;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.flink.sink;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

class DeltaManifestsSerializer implements SimpleVersionedSerializer<DeltaManifests> {
private static final int VERSION_1 = 1;
private static final int VERSION_2 = 2;
private static final byte[] EMPTY_BINARY = new byte[0];

static final DeltaManifestsSerializer INSTANCE = new DeltaManifestsSerializer();

@Override
public int getVersion() {
return VERSION_2;
}

@Override
public byte[] serialize(DeltaManifests deltaManifests) throws IOException {
Preconditions.checkNotNull(deltaManifests, "DeltaManifests to be serialized should not be null");

ByteArrayOutputStream binaryOut = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(binaryOut);

byte[] dataManifestBinary = EMPTY_BINARY;
if (deltaManifests.dataManifest() != null) {
dataManifestBinary = ManifestFiles.encode(deltaManifests.dataManifest());
}

out.writeInt(dataManifestBinary.length);
out.write(dataManifestBinary);

byte[] deleteManifestBinary = EMPTY_BINARY;
if (deltaManifests.deleteManifest() != null) {
deleteManifestBinary = ManifestFiles.encode(deltaManifests.deleteManifest());
}

out.writeInt(deleteManifestBinary.length);
out.write(deleteManifestBinary);

CharSequence[] referencedDataFiles = deltaManifests.referencedDataFiles();
out.writeInt(referencedDataFiles.length);
for (int i = 0; i < referencedDataFiles.length; i++) {
out.writeUTF(referencedDataFiles[i].toString());
}

return binaryOut.toByteArray();
}

@Override
public DeltaManifests deserialize(int version, byte[] serialized) throws IOException {
if (version == VERSION_1) {
return deserializeV1(serialized);
} else if (version == VERSION_2) {
return deserializeV2(serialized);
} else {
throw new RuntimeException("Unknown serialize version: " + version);
}
}

private DeltaManifests deserializeV1(byte[] serialized) throws IOException {
return new DeltaManifests(ManifestFiles.decode(serialized), null);
}

private DeltaManifests deserializeV2(byte[] serialized) throws IOException {
ManifestFile dataManifest = null;
ManifestFile deleteManifest = null;

ByteArrayInputStream binaryIn = new ByteArrayInputStream(serialized);
DataInputStream in = new DataInputStream(binaryIn);

int dataManifestSize = in.readInt();
if (dataManifestSize > 0) {
byte[] dataManifestBinary = new byte[dataManifestSize];
Preconditions.checkState(in.read(dataManifestBinary) == dataManifestSize);

dataManifest = ManifestFiles.decode(dataManifestBinary);
}

int deleteManifestSize = in.readInt();
if (deleteManifestSize > 0) {
byte[] deleteManifestBinary = new byte[deleteManifestSize];
Preconditions.checkState(in.read(deleteManifestBinary) == deleteManifestSize);

deleteManifest = ManifestFiles.decode(deleteManifestBinary);
}

int referenceDataFileNum = in.readInt();
CharSequence[] referencedDataFiles = new CharSequence[referenceDataFileNum];
for (int i = 0; i < referenceDataFileNum; i++) {
referencedDataFiles[i] = in.readUTF();
}

return new DeltaManifests(dataManifest, deleteManifest, referencedDataFiles);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@

import java.io.IOException;
import java.util.List;
import java.util.function.Supplier;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
Expand All @@ -32,18 +34,19 @@
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

class FlinkManifestUtil {
private static final int ICEBERG_FORMAT_VERSION = 2;
private static final int FORMAT_V2 = 2;
private static final Long DUMMY_SNAPSHOT_ID = 0L;

private FlinkManifestUtil() {
}

static ManifestFile writeDataFiles(OutputFile outputFile, PartitionSpec spec, List<DataFile> dataFiles)
throws IOException {
ManifestWriter<DataFile> writer = ManifestFiles.write(ICEBERG_FORMAT_VERSION, spec, outputFile, DUMMY_SNAPSHOT_ID);
ManifestWriter<DataFile> writer = ManifestFiles.write(FORMAT_V2, spec, outputFile, DUMMY_SNAPSHOT_ID);

try (ManifestWriter<DataFile> closeableWriter = writer) {
closeableWriter.addAll(dataFiles);
Expand All @@ -63,4 +66,54 @@ static ManifestOutputFileFactory createOutputFileFactory(Table table, String fli
TableOperations ops = ((HasTableOperations) table).operations();
return new ManifestOutputFileFactory(ops, table.io(), table.properties(), flinkJobId, subTaskId, attemptNumber);
}

static DeltaManifests writeCompletedFiles(WriteResult result,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just for my own education, referencedDataFiles from WriteResult doesn't seem to be used (except for unit test). What is it for? do we need to serialize it too?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should serialize it and add it to the commit. This is the set of files that is referenced by any positional delete, which identifies deleted rows by file and row position. The commit will validate that all of the files still exist in the table.

This isn't strictly needed for this use case because we know that the position deletes only refer to files that are created in this commit. Since the files are being added in the commit, it isn't possible for some other process to delete some of them from metadata. But it is still good to configure the commit properly in case this gets reused later.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the explanation, @rdblue . I think it's correct to validate the data files in RowDelta#commit. Will provide an extra unit test to address it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This unit test addressed the data files validation issue

Supplier<OutputFile> outputFileSupplier,
PartitionSpec spec) throws IOException {

ManifestFile dataManifest = null;
ManifestFile deleteManifest = null;

// Write the completed data files into a newly created data manifest file.
if (result.dataFiles() != null && result.dataFiles().length > 0) {
dataManifest = writeDataFiles(outputFileSupplier.get(), spec, Lists.newArrayList(result.dataFiles()));
}

// Write the completed delete files into a newly created delete manifest file.
if (result.deleteFiles() != null && result.deleteFiles().length > 0) {
OutputFile deleteManifestFile = outputFileSupplier.get();

ManifestWriter<DeleteFile> deleteManifestWriter = ManifestFiles.writeDeleteManifest(FORMAT_V2, spec,
deleteManifestFile, DUMMY_SNAPSHOT_ID);
try (ManifestWriter<DeleteFile> writer = deleteManifestWriter) {
for (DeleteFile deleteFile : result.deleteFiles()) {
writer.add(deleteFile);
}
}

deleteManifest = deleteManifestWriter.toManifestFile();
}

return new DeltaManifests(dataManifest, deleteManifest, result.referencedDataFiles());
}

static WriteResult readCompletedFiles(DeltaManifests deltaManifests, FileIO io) throws IOException {
WriteResult.Builder builder = WriteResult.builder();

// Read the completed data files from persisted data manifest file.
if (deltaManifests.dataManifest() != null) {
builder.addDataFiles(readDataFiles(deltaManifests.dataManifest(), io));
}

// Read the completed delete files from persisted delete manifests file.
if (deltaManifests.deleteManifest() != null) {
try (CloseableIterable<DeleteFile> deleteFiles = ManifestFiles
.readDeleteManifest(deltaManifests.deleteManifest(), io, null)) {
builder.addDeleteFiles(deleteFiles);
}
}

return builder.addReferencedDataFiles(deltaManifests.referencedDataFiles())
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.PropertyUtil;
Expand Down Expand Up @@ -189,7 +190,7 @@ public DataStreamSink<RowData> build() {
this.writeParallelism = writeParallelism == null ? rowDataInput.getParallelism() : writeParallelism;

DataStream<Void> returnStream = rowDataInput
.transform(ICEBERG_STREAM_WRITER_NAME, TypeInformation.of(DataFile.class), streamWriter)
.transform(ICEBERG_STREAM_WRITER_NAME, TypeInformation.of(WriteResult.class), streamWriter)
.setParallelism(writeParallelism)
.transform(ICEBERG_FILES_COMMITTER_NAME, Types.VOID, filesCommitter)
.setParallelism(1)
Expand Down
Loading