Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.apache.iceberg.deletes;

import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.iceberg.DeleteFile;
Expand All @@ -29,10 +28,12 @@
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.encryption.EncryptionKeyMetadata;
import org.apache.iceberg.io.DeleteWriteResult;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.FileWriter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

public class EqualityDeleteWriter<T> implements Closeable {
public class EqualityDeleteWriter<T> implements FileWriter<T, DeleteWriteResult> {
private final FileAppender<T> appender;
private final FileFormat format;
private final String location;
Expand All @@ -56,14 +57,32 @@ public EqualityDeleteWriter(FileAppender<T> appender, FileFormat format, String
this.equalityFieldIds = equalityFieldIds;
}

/**
 * Writes a single equality delete by adding the row to the underlying appender.
 *
 * @param row the equality delete record to append
 * @throws IOException declared by the {@link FileWriter} contract
 */
@Override
public void write(T row) throws IOException {
  this.appender.add(row);
}

/**
 * Appends all given rows as equality deletes with one bulk call on the underlying appender.
 *
 * @param rows the equality delete records to append
 * @deprecated since 0.13.0, will be removed in 0.14.0; use {@link #write(Iterable)} instead.
 */
@Deprecated
public void deleteAll(Iterable<T> rows) {
  this.appender.addAll(rows);
}

/**
 * Appends one row as an equality delete through the underlying appender.
 *
 * @param row the equality delete record to append
 * @deprecated since 0.13.0, will be removed in 0.14.0; use {@link #write(Object)} instead.
 */
@Deprecated
public void delete(T row) {
  this.appender.add(row);
}

/**
 * Returns the current length as reported by the underlying appender.
 *
 * @return the appender's length
 */
@Override
public long length() {
  return this.appender.length();
}
Expand All @@ -89,4 +108,9 @@ public DeleteFile toDeleteFile() {
Preconditions.checkState(deleteFile != null, "Cannot create delete file from unclosed writer");
return deleteFile;
}

/**
 * Wraps the delete file produced by this writer into a {@link DeleteWriteResult}.
 * <p>
 * The file is obtained through {@link #toDeleteFile()}, whose state check rejects the call with
 * "Cannot create delete file from unclosed writer" while no delete file is available.
 *
 * @return a result holding the single delete file of this writer
 */
@Override
public DeleteWriteResult result() {
  DeleteFile writtenFile = toDeleteFile();
  return new DeleteWriteResult(writtenFile);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.apache.iceberg.deletes;

import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.iceberg.DeleteFile;
Expand All @@ -28,19 +27,21 @@
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.encryption.EncryptionKeyMetadata;
import org.apache.iceberg.io.DeleteWriteResult;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.FileWriter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.util.CharSequenceSet;

public class PositionDeleteWriter<T> implements Closeable {
public class PositionDeleteWriter<T> implements FileWriter<PositionDelete<T>, DeleteWriteResult> {
private final FileAppender<StructLike> appender;
private final FileFormat format;
private final String location;
private final PartitionSpec spec;
private final StructLike partition;
private final ByteBuffer keyMetadata;
private final PositionDelete<T> delete;
private final CharSequenceSet pathSet;
private final CharSequenceSet referencedDataFiles;
private DeleteFile deleteFile = null;

public PositionDeleteWriter(FileAppender<StructLike> appender, FileFormat format, String location,
Expand All @@ -52,18 +53,41 @@ public PositionDeleteWriter(FileAppender<StructLike> appender, FileFormat format
this.partition = partition;
this.keyMetadata = keyMetadata != null ? keyMetadata.buffer() : null;
this.delete = PositionDelete.create();
this.pathSet = CharSequenceSet.empty();
this.referencedDataFiles = CharSequenceSet.empty();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@openinx @rdblue, it is the rename we discussed here.

}

/**
 * Writes one position delete: the path it points at is first recorded as a referenced data
 * file, then the delete itself is handed to the underlying appender.
 *
 * @param positionDelete the position delete to append
 * @throws IOException declared by the {@link FileWriter} contract
 */
@Override
public void write(PositionDelete<T> positionDelete) throws IOException {
  this.referencedDataFiles.add(positionDelete.path());
  this.appender.add(positionDelete);
}

/**
 * Writes a position delete that carries no deleted row, by forwarding to
 * {@link #delete(CharSequence, long, Object)} with a {@code null} row.
 *
 * @param path the path passed through as the delete's data file path
 * @param pos the position passed through as the delete's position
 * @deprecated since 0.13.0, will be removed in 0.14.0; use {@link #write(PositionDelete)} instead.
 */
@Deprecated
public void delete(CharSequence path, long pos) {
  final T noRow = null;
  delete(path, pos, noRow);
}

/**
* Writes a position delete and persists the deleted row.
* <p>
* The path is recorded in the writer's path set(s), then the writer-level {@code delete} holder is
* re-populated through {@code set(path, pos, row)} and the value returned by that call is appended.
*
* @param path the data file path of the delete
* @param pos the position of the delete
* @param row the deleted row; the two-argument overload passes {@code null} here
* @deprecated since 0.13.0, will be removed in 0.14.0; use {@link #write(PositionDelete)} instead.
*/
@Deprecated
public void delete(CharSequence path, long pos, T row) {
// NOTE(review): both pathSet and referencedDataFiles receive the path here; this looks like the
// old and the new name of one renamed field shown side by side -- confirm only one should remain.
pathSet.add(path);
referencedDataFiles.add(path);
// The same PositionDelete instance (created once in the constructor) is reused for every call.
appender.add(delete.set(path, pos, row));
}

/**
 * Returns the current length as reported by the underlying appender.
 *
 * @return the appender's length
 */
@Override
public long length() {
  return this.appender.length();
}

@Override
public void close() throws IOException {
if (deleteFile == null) {
Expand All @@ -81,11 +105,16 @@ public void close() throws IOException {
}

/**
* Returns the set of data file paths this writer collected while deletes were written.
*
* @return the collected paths
*/
public CharSequenceSet referencedDataFiles() {
// NOTE(review): two consecutive returns -- apparently the pre-rename and post-rename form of the
// same statement; only one can remain for this method to compile. Confirm which one is current.
return pathSet;
return referencedDataFiles;
}

/**
 * Returns the delete file held by this writer.
 * <p>
 * The call is rejected by a state check with the message
 * "Cannot create delete file from unclosed writer" while no delete file has been set.
 *
 * @return the delete file of this writer
 */
public DeleteFile toDeleteFile() {
  boolean hasDeleteFile = this.deleteFile != null;
  Preconditions.checkState(hasDeleteFile, "Cannot create delete file from unclosed writer");
  return this.deleteFile;
}

/**
 * Bundles the delete file of this writer together with the referenced data file paths into a
 * {@link DeleteWriteResult}.
 * <p>
 * The delete file is fetched first through {@link #toDeleteFile()}, so its state check applies
 * before the referenced paths are read.
 *
 * @return a result holding the delete file and the referenced data files
 */
@Override
public DeleteWriteResult result() {
  DeleteFile writtenFile = toDeleteFile();
  CharSequenceSet referencedPaths = referencedDataFiles();
  return new DeleteWriteResult(writtenFile, referencedPaths);
}
}
46 changes: 46 additions & 0 deletions core/src/main/java/org/apache/iceberg/io/DataWriteResult.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.io;

import java.util.Collections;
import java.util.List;
import org.apache.iceberg.DataFile;

/**
* A result of writing data files.
* <p>
* Note that objects of this class are NOT meant to be serialized. Task or delta writers will wrap
* these results into their own serializable results that can be sent back to query engines.
*/
public class DataWriteResult {
private final List<DataFile> dataFiles;

public DataWriteResult(DataFile dataFile) {
this.dataFiles = Collections.singletonList(dataFile);
}

public DataWriteResult(List<DataFile> dataFiles) {
this.dataFiles = dataFiles;
}

public List<DataFile> dataFiles() {
return dataFiles;
}
}
20 changes: 18 additions & 2 deletions core/src/main/java/org/apache/iceberg/io/DataWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.apache.iceberg.io;

import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.iceberg.DataFile;
Expand All @@ -31,7 +30,7 @@
import org.apache.iceberg.encryption.EncryptionKeyMetadata;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

public class DataWriter<T> implements Closeable {
public class DataWriter<T> implements FileWriter<T, DataWriteResult> {
private final FileAppender<T> appender;
private final FileFormat format;
private final String location;
Expand All @@ -57,10 +56,22 @@ public DataWriter(FileAppender<T> appender, FileFormat format, String location,
this.sortOrder = sortOrder;
}

/**
 * Writes a single data record by adding it to the underlying appender.
 *
 * @param row the data record to append
 */
@Override
public void write(T row) {
  this.appender.add(row);
}

/**
 * Appends one data record through the underlying appender.
 *
 * @param row the data record to append
 * @deprecated since 0.13.0, will be removed in 0.14.0; use {@link #write(Object)} instead.
 */
@Deprecated
public void add(T row) {
  this.appender.add(row);
}

/**
 * Returns the current length as reported by the underlying appender.
 *
 * @return the appender's length
 */
@Override
public long length() {
  return this.appender.length();
}
Expand All @@ -86,4 +97,9 @@ public DataFile toDataFile() {
Preconditions.checkState(dataFile != null, "Cannot create data file from unclosed writer");
return dataFile;
}

/**
 * Wraps the data file produced by this writer into a {@link DataWriteResult}.
 * <p>
 * The file is obtained through {@link #toDataFile()}, whose state check rejects the call with
 * "Cannot create data file from unclosed writer" while no data file is available.
 *
 * @return a result holding the single data file of this writer
 */
@Override
public DataWriteResult result() {
  DataFile writtenFile = toDataFile();
  return new DataWriteResult(writtenFile);
}
}
64 changes: 64 additions & 0 deletions core/src/main/java/org/apache/iceberg/io/DeleteWriteResult.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.io;

import java.util.Collections;
import java.util.List;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.util.CharSequenceSet;

/**
* A result of writing delete files.
* <p>
* Note that objects of this class are NOT meant to be serialized. Task or delta writers will wrap
* these results into their own serializable results that can be sent back to query engines.
*/
public class DeleteWriteResult {
private final List<DeleteFile> deleteFiles;
private final CharSequenceSet referencedDataFiles;

public DeleteWriteResult(DeleteFile deleteFile) {
this.deleteFiles = Collections.singletonList(deleteFile);
this.referencedDataFiles = CharSequenceSet.empty();
}

public DeleteWriteResult(DeleteFile deleteFile, CharSequenceSet referencedDataFiles) {
this.deleteFiles = Collections.singletonList(deleteFile);
this.referencedDataFiles = referencedDataFiles;
}

public DeleteWriteResult(List<DeleteFile> deleteFiles) {
this.deleteFiles = deleteFiles;
this.referencedDataFiles = CharSequenceSet.empty();
}

public DeleteWriteResult(List<DeleteFile> deleteFiles, CharSequenceSet referencedDataFiles) {
this.deleteFiles = deleteFiles;
this.referencedDataFiles = referencedDataFiles;
}

public List<DeleteFile> deleteFiles() {
return deleteFiles;
}

public CharSequenceSet referencedDataFiles() {
return referencedDataFiles;
}
}
74 changes: 74 additions & 0 deletions core/src/main/java/org/apache/iceberg/io/FileWriter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.io;

import java.io.Closeable;
import java.io.IOException;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;

/**
 * A writer that produces files of a single type (i.e. data or delete) for one spec/partition.
 * <p>
 * Unlike a {@link FileAppender}, which only appends records to a file, implementations of this
 * interface are expected to produce {@link DataFile} or {@link DeleteFile} objects carrying
 * Iceberg metadata. An implementation may wrap a {@link FileAppender} and add the extra
 * information (such as spec, partition, sort order ID) needed to build a {@link DataFile} or
 * {@link DeleteFile}.
 *
 * @param <T> the row type
 * @param <R> the result type
 */
public interface FileWriter<T, R> extends Closeable {

  /**
   * Writes one row to the predefined spec/partition.
   *
   * @param row a data or delete record
   * @throws IOException if the write fails
   */
  void write(T row) throws IOException;

  /**
   * Writes a sequence of rows to the predefined spec/partition.
   * <p>
   * The default implementation passes the rows to {@link #write(Object)} one at a time, in
   * iteration order; an exception thrown for one row ends the loop and propagates to the caller.
   *
   * @param rows data or delete records
   * @throws IOException if writing any of the rows fails
   */
  default void write(Iterable<T> rows) throws IOException {
    for (T current : rows) {
      write(current);
    }
  }

  /**
   * Returns the number of bytes written by this writer so far.
   *
   * @return the number of written bytes
   */
  long length();

  /**
   * Returns a result describing the written {@link DataFile}s or {@link DeleteFile}s.
   * <p>
   * The result is valid only after the writer has been closed.
   *
   * @return the file writer result
   */
  R result();
}
Loading