Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/src/main/java/org/apache/iceberg/SortOrder.java
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ public SortOrder build() {
}
}

static void checkCompatibility(SortOrder sortOrder, Schema schema) {
public static void checkCompatibility(SortOrder sortOrder, Schema schema) {
for (SortField field : sortOrder.fields) {
Type sourceType = schema.findType(field.sourceId());
ValidationException.check(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.iceberg.actions;

import java.util.Map;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.expressions.Expression;

Expand Down Expand Up @@ -68,7 +69,7 @@ public interface RewriteDataFiles extends SnapshotUpdate<RewriteDataFiles, Rewri
* independently and asynchronously.
**/
String MAX_CONCURRENT_FILE_GROUP_REWRITES = "max-concurrent-file-group-rewrites";
int MAX_CONCURRENT_FILE_GROUP_ACTIONS_DEFAULT = 1;
int MAX_CONCURRENT_FILE_GROUP_REWRITES_DEFAULT = 1;

/**
* The output file size that this rewrite strategy will attempt to generate when rewriting files. By default this
Expand All @@ -90,6 +91,23 @@ default RewriteDataFiles binPack() {
return this;
}

/**
* Choose SORT as a strategy for this rewrite operation using the table's sortOrder
* @return this for method chaining
*/
default RewriteDataFiles sort() {
throw new UnsupportedOperationException("SORT Rewrite Strategy not implemented for this framework");
}

/**
* Choose SORT as a strategy for this rewrite operations and manually specify the sortOrder to use
* @param sortOrder user definied sortOrder
* @return this for method chaining
*/
default RewriteDataFiles sort(SortOrder sortOrder) {
throw new UnsupportedOperationException("SORT Rewrite Strategy not implemented for this framework");
}

/**
* A user provided filter for determining which files will be considered by the rewrite strategy. This will be used
* in addition to whatever rules the rewrite strategy generates. For example this would be used for providing a
Expand All @@ -107,6 +125,14 @@ default RewriteDataFiles binPack() {
*/
interface Result {
Map<FileGroupInfo, FileGroupRewriteResult> resultMap();

default int addedDataFilesCount() {
return resultMap().values().stream().mapToInt(FileGroupRewriteResult::addedDataFilesCount).sum();
}

default int rewrittenDataFilesCount() {
return resultMap().values().stream().mapToInt(FileGroupRewriteResult::rewrittenDataFilesCount).sum();
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;

interface RewriteStrategy extends Serializable {
public interface RewriteStrategy extends Serializable {
/**
* Returns the name of this rewrite strategy
*/
Expand Down Expand Up @@ -71,8 +71,9 @@ interface RewriteStrategy extends Serializable {
* Method which will rewrite files based on this particular RewriteStrategy's algorithm.
* This will most likely be Action framework specific (Spark/Presto/Flink ....).
*
* @param groupID an identifier for this set of files
* @param filesToRewrite a group of files to be rewritten together
* @return a list of newly written files
*/
List<DataFile> rewriteFiles(List<FileScanTask> filesToRewrite);
Set<DataFile> rewriteFiles(String groupID, List<FileScanTask> filesToRewrite);
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
* more files than {@link MIN_INPUT_FILES} or would produce at least one file of
* {@link RewriteDataFiles#TARGET_FILE_SIZE_BYTES}.
*/
abstract class BinPackStrategy implements RewriteStrategy {
public abstract class BinPackStrategy implements RewriteStrategy {

/**
* The minimum number of files that need to be in a file group for it to be considered for
Expand Down Expand Up @@ -78,6 +78,7 @@ abstract class BinPackStrategy implements RewriteStrategy {
private long maxFileSize;
private long targetFileSize;
private long maxGroupSize;
private long specId;

@Override
public String name() {
Expand Down Expand Up @@ -118,6 +119,10 @@ public RewriteStrategy options(Map<String, String> options) {
MIN_INPUT_FILES,
MIN_INPUT_FILES_DEFAULT);

specId = PropertyUtil.propertyAsInt(options,
RewriteDataFiles.OUTPUT_PARTITION_SPEC_ID,
table().spec().specId());

validateOptions();
return this;
}
Expand Down Expand Up @@ -162,4 +167,12 @@ private void validateOptions() {
"Cannot set %s is less than 1. All values less than 1 have the same effect as 1. %d < 1",
MIN_INPUT_FILES, minInputFiles);
}

protected long targetFileSize() {
return this.targetFileSize;
}

protected long specId() {
return this.specId;
}
}
141 changes: 141 additions & 0 deletions core/src/main/java/org/apache/iceberg/actions/SortStrategy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.actions;

import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.util.PropertyUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A rewrite strategy for data files which aims to reorder data with data files to optimally lay them out
* in relation to a column. For example, if the Sort strategy is used on a set of files which is ordered
* by column x and original has files File A (x: 0 - 50), File B ( x: 10 - 40) and File C ( x: 30 - 60),
* this Strategy will attempt to rewrite those files into File A' (x: 0-20), File B' (x: 21 - 40),
* File C' (x: 41 - 60).
* <p>
* Currently the there is no clustering detection and we will rewrite all files if {@link SortStrategy#REWRITE_ALL}
* is true (default). If this property is disabled any files with the incorrect sort-order as well as any files
* that would be chosen by {@link BinPackStrategy} will be rewrite candidates.
* <p>
* In the future other algorithms for determining files to rewrite will be provided.
*/
public abstract class SortStrategy extends BinPackStrategy {
private static final Logger LOG = LoggerFactory.getLogger(SortStrategy.class);

/**
* Rewrites all files, regardless of their size. Defaults to false, rewriting only wrong sort-order and mis-sized
* files;
*/
public static final String REWRITE_ALL = "no-size-filter";
public static final boolean REWRITE_ALL_DEFAULT = false;


private static final Set<String> validOptions = ImmutableSet.of(
REWRITE_ALL
);

private boolean rewriteAll;
private SortOrder sortOrder;
private int sortOrderId = -1;

/**
* Sets the sort order to be used in this strategy when rewriting files
* @param order the order to use
* @return this for method chaining
*/
public SortStrategy sortOrder(SortOrder order) {
this.sortOrder = order;

// See if this order matches any of our known orders
Optional<Entry<Integer, SortOrder>> knownOrder = table().sortOrders().entrySet().stream()
.filter(entry -> entry.getValue().sameOrder(order))
.findFirst();
knownOrder.ifPresent(entry -> sortOrderId = entry.getKey());

return this;
}

protected SortOrder sortOrder() {
return sortOrder;
}

@Override
public String name() {
return "SORT";
}

@Override
public Set<String> validOptions() {
return ImmutableSet.<String>builder()
.addAll(super.validOptions())
.addAll(validOptions)
.build();
}

@Override
public RewriteStrategy options(Map<String, String> options) {
super.options(options); // Also checks validity of BinPack options

rewriteAll = PropertyUtil.propertyAsBoolean(options,
REWRITE_ALL,
REWRITE_ALL_DEFAULT);

if (sortOrder == null) {
sortOrder = table().sortOrder();
sortOrderId = sortOrder.orderId();
}

validateOptions();
return this;
}

@Override
public Iterable<FileScanTask> selectFilesToRewrite(Iterable<FileScanTask> dataFiles) {
if (rewriteAll) {
LOG.info("Sort Strategy for table {} set to rewrite all data files", table().name());
return dataFiles;
} else {
FluentIterable filesWithCorrectOrder =
FluentIterable.from(dataFiles).filter(file -> file.file().sortOrderId() == sortOrderId);

FluentIterable filesWithIncorrectOrder =
FluentIterable.from(dataFiles).filter(file -> file.file().sortOrderId() != sortOrderId);
return filesWithIncorrectOrder.append(super.selectFilesToRewrite(filesWithCorrectOrder));
}
}

private void validateOptions() {
Preconditions.checkArgument(!sortOrder.isUnsorted(),
"Can't use %s when there is no sort order, either define table %s's sort order or set sort" +
"order in the action",
name(), table().name());

SortOrder.checkCompatibility(sortOrder, table().schema());
}
}
14 changes: 14 additions & 0 deletions core/src/test/java/org/apache/iceberg/MockFileScanTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.apache.iceberg;

import org.mockito.Mockito;

public class MockFileScanTask extends BaseFileScanTask {

private final long length;
Expand All @@ -28,6 +30,18 @@ public MockFileScanTask(long length) {
this.length = length;
}

public MockFileScanTask(DataFile file) {
super(file, null, null, null, null);
this.length = file.fileSizeInBytes();
}

public static MockFileScanTask mockTask(long length, int sortOrderId) {
DataFile mockFile = Mockito.mock(DataFile.class);
Mockito.when(mockFile.fileSizeInBytes()).thenReturn(length);
Mockito.when(mockFile.sortOrderId()).thenReturn(sortOrderId);
return new MockFileScanTask(mockFile);
}

@Override
public long length() {
return length;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
Expand Down Expand Up @@ -59,7 +60,7 @@ public Table table() {
}

@Override
public List<DataFile> rewriteFiles(List<FileScanTask> filesToRewrite) {
public Set<DataFile> rewriteFiles(String groupID, List<FileScanTask> filesToRewrite) {
throw new UnsupportedOperationException();
}
}
Expand Down
Loading