Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,6 @@ subprojects {
options.encoding = 'UTF-8'
}

ext {
jmhVersion = '1.21'
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Somehow, this did not seem to have any effect. I had to move it to the jmh block.

}

sourceCompatibility = '1.8'
targetCompatibility = '1.8'

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,17 @@
import org.apache.iceberg.StructLike;

public class PositionDelete<R> implements StructLike {
static <T> PositionDelete<T> create() {
public static <T> PositionDelete<T> create() {
return new PositionDelete<>();
}

private CharSequence path;
private long pos;
private R row;

private PositionDelete() {
}

public PositionDelete<R> set(CharSequence newPath, long newPos, R newRow) {
this.path = newPath;
this.pos = newPos;
Expand Down
73 changes: 73 additions & 0 deletions core/src/main/java/org/apache/iceberg/io/ClusteredDataWriter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.io;

import java.util.List;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

/**
 * A data file writer that can span multiple partition specs and partitions.
 * <p>
 * Incoming records must arrive clustered by partition spec and, within each spec, by partition.
 */
public class ClusteredDataWriter<T> extends ClusteredWriter<T, DataWriteResult> {

  private final FileWriterFactory<T> writerFactory;
  private final OutputFileFactory fileFactory;
  private final FileIO io;
  private final FileFormat fileFormat;
  private final long targetFileSizeInBytes;
  private final List<DataFile> dataFiles = Lists.newArrayList();

  public ClusteredDataWriter(FileWriterFactory<T> writerFactory, OutputFileFactory fileFactory,
                             FileIO io, FileFormat fileFormat, long targetFileSizeInBytes) {
    this.writerFactory = writerFactory;
    this.fileFactory = fileFactory;
    this.io = io;
    this.fileFormat = fileFormat;
    this.targetFileSizeInBytes = targetFileSizeInBytes;
  }

  @Override
  protected FileWriter<T, DataWriteResult> newWriter(PartitionSpec spec, StructLike partition) {
    // TODO: support ORC rolling writers; until then, ORC uses a single non-rolling writer
    if (fileFormat != FileFormat.ORC) {
      return new RollingDataWriter<>(writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition);
    }

    EncryptedOutputFile outputFile = newOutputFile(fileFactory, spec, partition);
    return writerFactory.newDataWriter(outputFile, spec, partition);
  }

  @Override
  protected void addResult(DataWriteResult result) {
    // accumulate files produced by each closed underlying writer
    dataFiles.addAll(result.dataFiles());
  }

  @Override
  protected DataWriteResult aggregatedResult() {
    return new DataWriteResult(dataFiles);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.io;

import java.util.List;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

/**
* An equality delete writer capable of writing to multiple specs and partitions that requires
* the incoming delete records to be properly clustered by partition spec and by partition within each spec.
*/
public class ClusteredEqualityDeleteWriter<T> extends ClusteredWriter<T, DeleteWriteResult> {

private final FileWriterFactory<T> writerFactory;
private final OutputFileFactory fileFactory;
private final FileIO io;
private final FileFormat fileFormat;
private final long targetFileSizeInBytes;
private final List<DeleteFile> deleteFiles;

public ClusteredEqualityDeleteWriter(FileWriterFactory<T> writerFactory, OutputFileFactory fileFactory,
FileIO io, FileFormat fileFormat, long targetFileSizeInBytes) {
this.writerFactory = writerFactory;
this.fileFactory = fileFactory;
this.io = io;
this.fileFormat = fileFormat;
this.targetFileSizeInBytes = targetFileSizeInBytes;
this.deleteFiles = Lists.newArrayList();
}

@Override
protected FileWriter<T, DeleteWriteResult> newWriter(PartitionSpec spec, StructLike partition) {
// TODO: support ORC rolling writers
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Filed a separate issue for this: #3169

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you!

if (fileFormat == FileFormat.ORC) {
EncryptedOutputFile outputFile = newOutputFile(fileFactory, spec, partition);
return writerFactory.newEqualityDeleteWriter(outputFile, spec, partition);
} else {
return new RollingEqualityDeleteWriter<>(writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition);
}
}

@Override
protected void addResult(DeleteWriteResult result) {
Preconditions.checkArgument(!result.referencesDataFiles(), "Equality deletes cannot reference data files");
deleteFiles.addAll(result.deleteFiles());
}

@Override
protected DeleteWriteResult aggregatedResult() {
return new DeleteWriteResult(deleteFiles);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.io;

import java.util.List;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.CharSequenceSet;

/**
 * A position delete writer that can span multiple partition specs and partitions.
 * <p>
 * Incoming delete records must arrive clustered by partition spec and, within each spec, by partition.
 */
public class ClusteredPositionDeleteWriter<T> extends ClusteredWriter<PositionDelete<T>, DeleteWriteResult> {

  private final FileWriterFactory<T> writerFactory;
  private final OutputFileFactory fileFactory;
  private final FileIO io;
  private final FileFormat fileFormat;
  private final long targetFileSizeInBytes;
  private final List<DeleteFile> deleteFiles = Lists.newArrayList();
  private final CharSequenceSet referencedDataFiles = CharSequenceSet.empty();

  public ClusteredPositionDeleteWriter(FileWriterFactory<T> writerFactory, OutputFileFactory fileFactory,
                                       FileIO io, FileFormat fileFormat, long targetFileSizeInBytes) {
    this.writerFactory = writerFactory;
    this.fileFactory = fileFactory;
    this.io = io;
    this.fileFormat = fileFormat;
    this.targetFileSizeInBytes = targetFileSizeInBytes;
  }

  @Override
  protected FileWriter<PositionDelete<T>, DeleteWriteResult> newWriter(PartitionSpec spec, StructLike partition) {
    // TODO: support ORC rolling writers; until then, ORC uses a single non-rolling writer
    if (fileFormat != FileFormat.ORC) {
      return new RollingPositionDeleteWriter<>(writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition);
    }

    EncryptedOutputFile outputFile = newOutputFile(fileFactory, spec, partition);
    return writerFactory.newPositionDeleteWriter(outputFile, spec, partition);
  }

  @Override
  protected void addResult(DeleteWriteResult result) {
    // track both the delete files and the data files they reference
    deleteFiles.addAll(result.deleteFiles());
    referencedDataFiles.addAll(result.referencedDataFiles());
  }

  @Override
  protected DeleteWriteResult aggregatedResult() {
    return new DeleteWriteResult(deleteFiles, referencedDataFiles);
  }
}
134 changes: 134 additions & 0 deletions core/src/main/java/org/apache/iceberg/io/ClusteredWriter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.io;

import java.io.IOException;
import java.util.Comparator;
import java.util.Set;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.types.Types.StructType;
import org.apache.iceberg.util.StructLikeSet;

/**
 * A writer capable of writing to multiple specs and partitions that requires the incoming records
 * to be clustered by partition spec and by partition within each spec.
 * <p>
 * As opposed to {@link FanoutWriter}, this writer keeps at most one file open to reduce
 * the memory consumption. Prefer using this writer whenever the incoming records can be clustered
 * by spec/partition.
 */
abstract class ClusteredWriter<T, R> implements PartitioningWriter<T, R> {

  private static final String NOT_CLUSTERED_ROWS_ERROR_MSG_TEMPLATE =
      "Incoming records violate the writer assumption that records are clustered by spec and " +
      "by partition within each spec. Either cluster the incoming records or switch to fanout writers.\n" +
      "Encountered records that belong to already closed files:\n";

  // IDs of specs whose records were fully written and whose files are already closed
  private final Set<Integer> completedSpecIds = Sets.newHashSet();

  // state for the spec/partition currently being written; reset whenever the spec changes
  private PartitionSpec currentSpec = null;
  private Comparator<StructLike> partitionComparator = null;
  private Set<StructLike> completedPartitions = null;
  private StructLike currentPartition = null;
  private FileWriter<T, R> currentWriter = null;

  private boolean closed = false;

  /** Creates a new underlying file writer for the given spec and partition. */
  protected abstract FileWriter<T, R> newWriter(PartitionSpec spec, StructLike partition);

  /** Accumulates the result of a closed underlying writer. */
  protected abstract void addResult(R result);

  /** Returns the combined result of all underlying writers; only called once this writer is closed. */
  protected abstract R aggregatedResult();

  @Override
  public void write(T row, PartitionSpec spec, StructLike partition) throws IOException {
    if (!spec.equals(currentSpec)) {
      // the spec changed: finish the current spec before starting the new one
      if (currentSpec != null) {
        closeCurrentWriter();
        completedSpecIds.add(currentSpec.specId());
        completedPartitions.clear();
      }

      if (completedSpecIds.contains(spec.specId())) {
        // records for this spec were already written and closed, so the input is not clustered
        String errorCtx = String.format("spec %s", spec);
        throw new IllegalStateException(NOT_CLUSTERED_ROWS_ERROR_MSG_TEMPLATE + errorCtx);
      }

      StructType partitionType = spec.partitionType();

      this.currentSpec = spec;
      this.partitionComparator = Comparators.forType(partitionType);
      this.completedPartitions = StructLikeSet.create(partitionType);
      // copy the partition key as the key object may be reused
      this.currentPartition = StructCopy.copy(partition);
      this.currentWriter = newWriter(currentSpec, currentPartition);

    } else if (partition != currentPartition && partitionComparator.compare(partition, currentPartition) != 0) {
      // NOTE: the reference check also guards the comparator against nulls: within the same spec,
      // partition is null only for unpartitioned specs, in which case currentPartition is null too,
      // so partition != currentPartition is false and the comparator is never invoked with null
      closeCurrentWriter();
      completedPartitions.add(currentPartition);

      if (completedPartitions.contains(partition)) {
        // this partition was already written and closed within the current spec
        String errorCtx = String.format("partition '%s' in spec %s", spec.partitionToPath(partition), spec);
        throw new IllegalStateException(NOT_CLUSTERED_ROWS_ERROR_MSG_TEMPLATE + errorCtx);
      }

      // copy the partition key as the key object may be reused
      this.currentPartition = StructCopy.copy(partition);
      this.currentWriter = newWriter(currentSpec, currentPartition);
    }

    currentWriter.write(row);
  }

  @Override
  public void close() throws IOException {
    if (!closed) {
      closeCurrentWriter();
      this.closed = true;
    }
  }

  private void closeCurrentWriter() throws IOException {
    if (currentWriter != null) {
      currentWriter.close();

      // collect the result before discarding the writer reference
      addResult(currentWriter.result());

      this.currentWriter = null;
    }
  }

  @Override
  public final R result() {
    Preconditions.checkState(closed, "Cannot get result from unclosed writer");
    return aggregatedResult();
  }

  /**
   * Returns a new encrypted output file for the given spec and partition,
   * using the unpartitioned path when the partition is null.
   */
  protected EncryptedOutputFile newOutputFile(OutputFileFactory fileFactory, PartitionSpec spec, StructLike partition) {
    Preconditions.checkArgument(spec.isUnpartitioned() || partition != null,
        "Partition must not be null when creating output file for partitioned spec");
    return partition == null ? fileFactory.newOutputFile() : fileFactory.newOutputFile(spec, partition);
  }
}
Loading