core/src/main/java/org/apache/iceberg/deletes/EqualityDeleteWriter.java
@@ -19,7 +19,6 @@

package org.apache.iceberg.deletes;

import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.iceberg.DeleteFile;
@@ -29,10 +28,12 @@
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.encryption.EncryptionKeyMetadata;
import org.apache.iceberg.io.DeleteWriteResult;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.FileWriter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

public class EqualityDeleteWriter<T> implements Closeable {
public class EqualityDeleteWriter<T> implements FileWriter<T, DeleteWriteResult> {
private final FileAppender<T> appender;
private final FileFormat format;
private final String location;
@@ -56,10 +57,17 @@ public EqualityDeleteWriter(FileAppender<T> appender, FileFormat format, String
this.equalityFieldIds = equalityFieldIds;
}

@Override
public void write(T row) throws IOException {
appender.add(row);
}

@Deprecated
Member: We don't expose the deleteAll and delete methods in the public iceberg-api module, so is it necessary to keep these deprecated APIs for at least one release? I'm thinking we could just remove them from this class.

Contributor Author: I'd be up for that. I didn't do it in this WIP PR to avoid touching more places. I think this is a low-level API that we can break.

Contributor: It's a low-level API, but it could be used by external projects like Hive, since this is the easiest way to correctly write Iceberg files. I think we should deprecate them, as Anton did here.

public void deleteAll(Iterable<T> rows) {
appender.addAll(rows);
}

@Deprecated
public void delete(T row) {
appender.add(row);
}
@@ -89,4 +97,9 @@ public DeleteFile toDeleteFile() {
Preconditions.checkState(deleteFile != null, "Cannot create delete file from unclosed writer");
return deleteFile;
}

@Override
public DeleteWriteResult result() {
return new DeleteWriteResult(toDeleteFile());
}
}
core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java
@@ -19,7 +19,6 @@

package org.apache.iceberg.deletes;

import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.iceberg.DeleteFile;
@@ -28,11 +27,13 @@
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.encryption.EncryptionKeyMetadata;
import org.apache.iceberg.io.DeleteWriteResult;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.FileWriter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.util.CharSequenceSet;

public class PositionDeleteWriter<T> implements Closeable {
public class PositionDeleteWriter<T> implements FileWriter<PositionDelete<T>, DeleteWriteResult> {
private final FileAppender<StructLike> appender;
private final FileFormat format;
private final String location;
@@ -55,15 +56,27 @@ public PositionDeleteWriter(FileAppender<StructLike> appender, FileFormat format
this.pathSet = CharSequenceSet.empty();
}

@Override
public void write(PositionDelete<T> positionDelete) throws IOException {
pathSet.add(positionDelete.path());
Member: Nit: I think it would be clearer to rename pathSet to referencedDataFiles. When I first looked at this variable I wondered: pathSet? Which path set? What is it used for? I didn't get it until I checked the referencedDataFiles() method.

Contributor Author: I agree. I'll do that rename when I split this into smaller PRs.

Contributor: +1 for the rename.

appender.add(positionDelete);
Contributor: This class makes an assumption similar to the ClusteredFileWriter classes. Should we name it OrderedPositionDeleteWriter instead?

Contributor Author: That would definitely be more descriptive. Do you think it would break anyone? This class has existed for a while and we reference it in multiple places, such as WriterFactory. I think we should either keep the original name and deprecate the old methods here and in EqualityDeleteWriter, or drop the old methods and rename as needed.

}

@Deprecated
public void delete(CharSequence path, long pos) {
delete(path, pos, null);
}

@Deprecated
public void delete(CharSequence path, long pos, T row) {
pathSet.add(path);
appender.add(delete.set(path, pos, row));
}

public long length() {
return appender.length();
}

@Override
public void close() throws IOException {
if (deleteFile == null) {
@@ -88,4 +101,9 @@ public DeleteFile toDeleteFile() {
Preconditions.checkState(deleteFile != null, "Cannot create delete file from unclosed writer");
return deleteFile;
}

@Override
public DeleteWriteResult result() {
return new DeleteWriteResult(toDeleteFile(), referencedDataFiles());
}
}
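
Position deletes migrate the same way: instead of the deprecated delete(path, pos) calls, callers fill a reusable PositionDelete and hand it to write. A hedged sketch, assuming an already-constructed PositionDeleteWriter (the range-delete helper is illustrative):

import java.io.IOException;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.deletes.PositionDeleteWriter;
import org.apache.iceberg.io.DeleteWriteResult;

class PositionDeleteMigrationSketch {
  // Deletes a contiguous range of row positions in one data file using the
  // PositionDelete-based write() introduced in this PR.
  static <T> DeleteWriteResult deleteRange(PositionDeleteWriter<T> writer, CharSequence path,
                                           long startPos, long endPos) throws IOException {
    PositionDelete<T> positionDelete = new PositionDelete<>();
    try {
      for (long pos = startPos; pos < endPos; pos++) {
        writer.write(positionDelete.set(path, pos, null)); // no deleted-row payload
      }
    } finally {
      writer.close();
    }
    return writer.result(); // carries the DeleteFile plus referencedDataFiles()
  }
}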
core/src/main/java/org/apache/iceberg/io/BaseDeltaWriter.java (new file: 97 additions, 0 deletions)
@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.io;

import java.io.IOException;
import java.util.List;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.CharSequenceSet;

abstract class BaseDeltaWriter<T> implements DeltaWriter<T> {

private final List<DataFile> dataFiles = Lists.newArrayList();
private final List<DeleteFile> deleteFiles = Lists.newArrayList();
private final CharSequenceSet referencedDataFiles = CharSequenceSet.empty();

private boolean closed = false;

@Override
public Result result() {
Preconditions.checkState(closed, "Cannot obtain result from unclosed task writer");
return new BaseDeltaTaskWriteResult(dataFiles, deleteFiles, referencedDataFiles);
}

@Override
public void close() throws IOException {
if (!closed) {
closeWriters();
this.closed = true;
}
}

protected abstract void closeWriters() throws IOException;
Contributor: This base class is really strange to me because the actual close is abstract, but the methods to close writers are in this implementation. I'd probably see if I could remove this class.


protected void closeDataWriter(PartitionAwareFileWriter<?, DataWriteResult> writer) throws IOException {
writer.close();

DataWriteResult result = writer.result();
dataFiles.addAll(result.dataFiles());
}

protected void closeDeleteWriter(PartitionAwareFileWriter<?, DeleteWriteResult> deleteWriter) throws IOException {
deleteWriter.close();

DeleteWriteResult result = deleteWriter.result();
deleteFiles.addAll(result.deleteFiles());
referencedDataFiles.addAll(result.referencedDataFiles());
}

protected static class BaseDeltaTaskWriteResult implements DeltaWriter.Result {

private final List<DataFile> dataFiles;
private final List<DeleteFile> deleteFiles;
private final CharSequenceSet referencedDataFiles;

public BaseDeltaTaskWriteResult(List<DataFile> dataFiles, List<DeleteFile> deleteFiles,
CharSequenceSet referencedDataFiles) {
this.dataFiles = dataFiles;
this.deleteFiles = deleteFiles;
this.referencedDataFiles = referencedDataFiles;
}

@Override
public List<DataFile> dataFiles() {
return dataFiles;
}

@Override
public List<DeleteFile> deleteFiles() {
return deleteFiles;
}

@Override
public CharSequenceSet referencedDataFiles() {
return referencedDataFiles;
}
}
}
core/src/main/java/org/apache/iceberg/io/CDCWriter.java (new file: 135 additions, 0 deletions)
@@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.io;

import java.io.IOException;
import java.util.Map;
import java.util.function.Function;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.util.StructLikeMap;
import org.apache.iceberg.util.StructProjection;

public class CDCWriter<T> extends BaseDeltaWriter<T> {
Contributor: This also needs some Javadoc to explain the context.


private final FanoutDataWriter<T> dataWriter;
private final PartitionAwareFileWriter<T, DeleteWriteResult> equalityDeleteWriter;
private final PartitionAwareFileWriter<PositionDelete<T>, DeleteWriteResult> positionDeleteWriter;
private final StructProjection keyProjection;
private final Map<StructLike, PartitionAwarePathOffset> insertedRows;
Contributor Author: One distinction compared to the existing CDC writer is that we track inserted rows across partitions. For example, if we add a record with ID=1 into partition year=2020 and then add another record with ID=1, we will write a position delete for ID=1 in year=2020.

Contributor Author: That's why the CDC writer is aware of partitions.
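
A self-contained sketch of that behavior, with stand-in types for the projected key and the tracked PartitionAwarePathOffset (the Integer key, PathOffset class, and file names are assumptions for illustration only):

import java.util.HashMap;
import java.util.Map;

class CrossPartitionTrackingSketch {
  // Stand-in for PartitionAwarePathOffset: where a key was last inserted.
  static class PathOffset {
    final String partition;
    final String path;
    final long position;

    PathOffset(String partition, String path, long position) {
      this.partition = partition;
      this.path = path;
      this.position = position;
    }

    @Override
    public String toString() {
      return partition + " " + path + ":" + position;
    }
  }

  public static void main(String[] args) {
    // Mirrors insertedRows: keyed by the equality key across all partitions.
    Map<Integer, PathOffset> insertedRows = new HashMap<>();

    // First insert of ID=1 into year=2020: no previous entry, no delete.
    PathOffset first = insertedRows.put(1, new PathOffset("year=2020", "file-a.parquet", 0));
    System.out.println("previous: " + first); // null

    // Second insert of ID=1, even into a different partition: the tracker
    // surfaces the year=2020 entry, so a position delete targets that file/row.
    PathOffset previous = insertedRows.put(1, new PathOffset("year=2021", "file-b.parquet", 0));
    System.out.println("position delete for: " + previous); // year=2020 file-a.parquet:0
  }
}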

private final PositionDelete<T> positionDelete;
private final Function<T, StructLike> toStructLike;

public CDCWriter(FanoutDataWriter<T> dataWriter,
PartitionAwareFileWriter<T, DeleteWriteResult> equalityDeleteWriter,
PartitionAwareFileWriter<PositionDelete<T>, DeleteWriteResult> positionDeleteWriter,
Contributor: I think this needs to be a FanoutSortedPositionDeleteWriter, because the position deletes could come in any order.

Schema schema, Schema deleteSchema, Function<T, StructLike> toStructLike) {
this.dataWriter = dataWriter;
this.equalityDeleteWriter = equalityDeleteWriter;
this.positionDeleteWriter = positionDeleteWriter;
this.positionDelete = new PositionDelete<>();
Contributor: Nit: use PositionDelete.create(). The constructor should probably be made private.

this.keyProjection = StructProjection.create(schema, deleteSchema);
this.insertedRows = StructLikeMap.create(deleteSchema.asStruct());
this.toStructLike = toStructLike;
}

@Override
public void insert(T row, PartitionSpec spec, StructLike partition) throws IOException {
CharSequence currentPath = dataWriter.currentPath(spec, partition);
Contributor: Should we merge the two calls into one method? For example:

FilePathPosition pathPosition = dataWriter.filePathPosition(spec, partition);

Contributor Author: Since this is something we have to do for every inserted row, I was trying to avoid allocating an extra object to reduce memory consumption.

long currentPosition = dataWriter.currentPosition(spec, partition);
PartitionAwarePathOffset offset = new PartitionAwarePathOffset(spec, partition, currentPath, currentPosition);

StructLike copiedKey = StructCopy.copy(keyProjection.wrap(toStructLike.apply(row)));

PartitionAwarePathOffset previous = insertedRows.put(copiedKey, offset);
if (previous != null) {
// TODO: attach the previous row if has a position delete row schema
positionDelete.set(previous.path(), previous.rowOffset(), null);
positionDeleteWriter.write(positionDelete, spec, partition);
}

dataWriter.write(row, spec, partition);
}

@Override
public void delete(T row, PartitionSpec spec, StructLike partition) throws IOException {
Contributor: I think we need to clearly document that this assumes the row to delete has the same schema as the rows that will be inserted. We could also have a directDelete method that passes just the equality delete columns (the key). That's worth considering if you want to split the interface for equality and position delete use cases.
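
A minimal sketch of the shape such a method could take, assuming the key is already projected to the equality delete schema (the interface name and the directDelete signature are illustrative, not part of this PR):

import java.io.IOException;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;

// Hypothetical companion to DeltaWriter<T>, sketching the reviewer's
// suggestion; none of this is PR code.
interface DirectEqualityDelete {
  // The key carries only the equality delete columns, already projected,
  // so callers do not need to build a full row in the insert schema.
  void directDelete(StructLike key, PartitionSpec spec, StructLike partition) throws IOException;
}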

StructLike key = keyProjection.wrap(toStructLike.apply(row));
PartitionAwarePathOffset previous = insertedRows.remove(key);
if (previous != null) {
// TODO: attach the previous row if has a position delete row schema
positionDelete.set(previous.path(), previous.rowOffset(), null);
positionDeleteWriter.write(positionDelete, previous.spec, previous.partition);
}

equalityDeleteWriter.write(row, spec, partition);
}

@Override
public void delete(CharSequence path, long pos, T row, PartitionSpec spec, StructLike partition) throws IOException {
throw new IllegalArgumentException(this.getClass().getName() + " does not implement explicit position delete");
}

@Override
protected void closeWriters() throws IOException {
if (dataWriter != null) {
closeDataWriter(dataWriter);
}

if (equalityDeleteWriter != null) {
closeDeleteWriter(equalityDeleteWriter);
}

if (positionDeleteWriter != null) {
closeDeleteWriter(positionDeleteWriter);
}
}

private static class PartitionAwarePathOffset {
Contributor: Could this extend PositionDelete so it could be passed directly into the delete writer?

private final PartitionSpec spec;
private final StructLike partition;
private final CharSequence path;
private final long rowOffset;

private PartitionAwarePathOffset(PartitionSpec spec, StructLike partition, CharSequence path, long rowOffset) {
this.spec = spec;
this.partition = partition;
this.path = path;
this.rowOffset = rowOffset;
}

public PartitionSpec spec() {
return spec;
}

public StructLike partition() {
return partition;
}

public CharSequence path() {
return path;
}

public long rowOffset() {
Contributor: This is called position nearly everywhere else. Why call it rowOffset here?

return rowOffset;
}
}
}