@@ -13,8 +13,11 @@
  */
 package io.prestosql.plugin.hive;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterators;
+import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Streams;
 import com.google.common.io.CharStreams;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -32,7 +35,6 @@
 import io.prestosql.spi.PrestoException;
 import io.prestosql.spi.connector.ColumnHandle;
 import io.prestosql.spi.connector.ConnectorSession;
-import io.prestosql.spi.connector.SchemaTableName;
 import io.prestosql.spi.predicate.TupleDomain;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -58,11 +60,14 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
+import java.util.OptionalInt;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.Executor;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.IntPredicate;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
@@ -86,13 +91,21 @@
 import static io.prestosql.plugin.hive.util.HiveFileIterator.NestedDirectoryPolicy.IGNORED;
 import static io.prestosql.plugin.hive.util.HiveFileIterator.NestedDirectoryPolicy.RECURSE;
 import static io.prestosql.spi.StandardErrorCode.NOT_SUPPORTED;
+import static java.lang.Integer.parseInt;
+import static java.lang.Math.max;
 import static java.lang.String.format;
 import static java.util.Objects.requireNonNull;
 import static org.apache.hadoop.hive.common.FileUtils.HIDDEN_FILES_PATH_FILTER;
 
 public class BackgroundHiveSplitLoader
         implements HiveSplitLoader
 {
+    private static final Iterable<Pattern> BUCKET_PATTERNS = ImmutableList.of(
+            // Hive naming pattern per `org.apache.hadoop.hive.ql.exec.Utilities#getBucketIdFromFile()`
+            Pattern.compile("(0\\d+)_\\d+.*"),
+            // legacy Presto naming pattern (current version matches Hive)
+            Pattern.compile("\\d{8}_\\d{6}_\\d{5}_[a-z0-9]{5}_bucket-(\\d+)(?:[-_.].*)?"));
+
     private static final ListenableFuture<?> COMPLETED_FUTURE = immediateFuture(null);
 
     private final Table table;
@@ -413,10 +426,11 @@ private List<InternalHiveSplit> getBucketedSplits(Path path, FileSystem fileSyst
     {
         int readBucketCount = bucketSplitInfo.getReadBucketCount();
         int tableBucketCount = bucketSplitInfo.getTableBucketCount();
-        int partitionBucketCount = bucketConversion.isPresent() ? bucketConversion.get().getPartitionBucketCount() : tableBucketCount;
+        int partitionBucketCount = bucketConversion.map(BucketConversion::getPartitionBucketCount).orElse(tableBucketCount);
+        int bucketCount = max(readBucketCount, partitionBucketCount);
 
         // list all files in the partition
-        ArrayList<LocatedFileStatus> files = new ArrayList<>(partitionBucketCount);
+        List<LocatedFileStatus> files = new ArrayList<>(partitionBucketCount);
         try {
             Iterators.addAll(files, new HiveFileIterator(table, path, fileSystem, directoryLister, namenodeStats, FAIL));
         }
@@ -425,35 +439,54 @@ private List<InternalHiveSplit> getBucketedSplits(Path path, FileSystem fileSyst
             throw new PrestoException(
                     HIVE_INVALID_BUCKET_FILES,
                     format("Hive table '%s' is corrupt. Found sub-directory in bucket directory for partition: %s",
-                            new SchemaTableName(table.getDatabaseName(), table.getTableName()),
+                            table.getSchemaTableName(),
                             splitFactory.getPartitionName()));
         }
 
-        // verify we found one file per bucket
-        if (files.size() != partitionBucketCount) {
-            throw new PrestoException(
-                    HIVE_INVALID_BUCKET_FILES,
-                    format("Hive table '%s' is corrupt. The number of files in the directory (%s) does not match the declared bucket count (%s) for partition: %s",
-                            new SchemaTableName(table.getDatabaseName(), table.getTableName()),
-                            files.size(),
-                            partitionBucketCount,
-                            splitFactory.getPartitionName()));
-        }
+        // build mapping of file name to bucket
+        ListMultimap<Integer, LocatedFileStatus> bucketFiles = ArrayListMultimap.create();
+        for (LocatedFileStatus file : files) {
+            String fileName = file.getPath().getName();
+            OptionalInt bucket = getBucketNumber(fileName);
+            if (bucket.isPresent()) {
+                bucketFiles.put(bucket.getAsInt(), file);
+                continue;
+            }
+
+            // legacy mode requires exactly one file per bucket
+            if (files.size() != partitionBucketCount) {
+                throw new PrestoException(HIVE_INVALID_BUCKET_FILES, format(
+                        "Hive table '%s' is corrupt. File '%s' does not match the standard naming pattern, and the number " +
+                                "of files in the directory (%s) does not match the declared bucket count (%s) for partition: %s",
+                        table.getSchemaTableName(),
+                        fileName,
+                        files.size(),
+                        partitionBucketCount,
+                        splitFactory.getPartitionName()));
+            }
 
-        // Sort FileStatus objects (instead of, e.g., fileStatus.getPath().toString). This matches org.apache.hadoop.hive.ql.metadata.Table.getSortedPaths
-        files.sort(null);
+            // sort FileStatus objects per `org.apache.hadoop.hive.ql.metadata.Table#getSortedPaths()`
+            files.sort(null);
+
+            // use position in sorted list as the bucket number
+            bucketFiles.clear();
+            for (int i = 0; i < files.size(); i++) {
+                bucketFiles.put(i, files.get(i));
+            }
+            break;
+        }
 
         // convert files internal splits
         List<InternalHiveSplit> splitList = new ArrayList<>();
-        for (int bucketNumber = 0; bucketNumber < Math.max(readBucketCount, partitionBucketCount); bucketNumber++) {
+        for (int bucketNumber = 0; bucketNumber < bucketCount; bucketNumber++) {
             // Physical bucket #. This determine file name. It also determines the order of splits in the result.
             int partitionBucketNumber = bucketNumber % partitionBucketCount;
             // Logical bucket #. Each logical bucket corresponds to a "bucket" from engine's perspective.
             int readBucketNumber = bucketNumber % readBucketCount;
 
             boolean containsEligibleTableBucket = false;
             boolean containsIneligibleTableBucket = false;
-            for (int tableBucketNumber = bucketNumber % tableBucketCount; tableBucketNumber < tableBucketCount; tableBucketNumber += Math.max(readBucketCount, partitionBucketCount)) {
+            for (int tableBucketNumber = bucketNumber % tableBucketCount; tableBucketNumber < tableBucketCount; tableBucketNumber += bucketCount) {
                 // table bucket number: this is used for evaluating "$bucket" filters.
                 if (bucketSplitInfo.isTableBucketEnabled(tableBucketNumber)) {
                     containsEligibleTableBucket = true;
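
For intuition, the bucket arithmetic in this hunk can be exercised in isolation. A minimal sketch follows; the class name and the bucket counts are invented for illustration, and only the modulo/step logic mirrors the diff:

```java
// Standalone illustration of the bucket mapping in getBucketedSplits().
// Assumed values: a table declared with 8 buckets, a partition written with 4,
// and an effective read bucket count of 2.
public class BucketMappingSketch
{
    public static void main(String[] args)
    {
        int tableBucketCount = 8;
        int partitionBucketCount = 4;
        int readBucketCount = 2;
        int bucketCount = Math.max(readBucketCount, partitionBucketCount);

        for (int bucketNumber = 0; bucketNumber < bucketCount; bucketNumber++) {
            int partitionBucketNumber = bucketNumber % partitionBucketCount; // which file(s) to read
            int readBucketNumber = bucketNumber % readBucketCount;           // engine-visible bucket
            StringBuilder tableBuckets = new StringBuilder();
            for (int t = bucketNumber % tableBucketCount; t < tableBucketCount; t += bucketCount) {
                tableBuckets.append(t).append(' ');                          // "$bucket" values covered
            }
            System.out.printf("file #%d -> read bucket #%d, table buckets: %s%n",
                    partitionBucketNumber, readBucketNumber, tableBuckets);
        }
    }
}
```

With these counts the four iterations cover table buckets {0 4}, {1 5}, {2 6} and {3 7}, so each physical file serves two table buckets, and the `$bucket` eligibility check in the diff decides whether any of them survives the filter.
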
@@ -474,14 +507,27 @@ private List<InternalHiveSplit> getBucketedSplits(Path path, FileSystem fileSyst
                         "partition bucket count: " + partitionBucketCount + ", effective reading bucket count: " + readBucketCount + ")");
             }
             if (containsEligibleTableBucket) {
-                LocatedFileStatus file = files.get(partitionBucketNumber);
-                splitFactory.createInternalHiveSplit(file, readBucketNumber)
-                        .ifPresent(splitList::add);
+                for (LocatedFileStatus file : bucketFiles.get(partitionBucketNumber)) {
+                    splitFactory.createInternalHiveSplit(file, readBucketNumber)
+                            .ifPresent(splitList::add);
+                }
             }
         }
         return splitList;
     }
 
+    @VisibleForTesting
+    static OptionalInt getBucketNumber(String name)
+    {
+        for (Pattern pattern : BUCKET_PATTERNS) {
+            Matcher matcher = pattern.matcher(name);
+            if (matcher.matches()) {
+                return OptionalInt.of(parseInt(matcher.group(1)));
+            }
+        }
+        return OptionalInt.empty();
+    }
+
     private static List<Path> getTargetPathsFromSymlink(FileSystem fileSystem, Path symlinkDir)
     {
         try {
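
The two BUCKET_PATTERNS drive the new getBucketNumber() helper above. The following sanity-check sketch copies the regexes from the diff, while the demo class and the sample file names are invented:

```java
import java.util.OptionalInt;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class BucketNumberSketch
{
    // Regexes copied from BUCKET_PATTERNS in the diff.
    private static final Pattern[] BUCKET_PATTERNS = {
            Pattern.compile("(0\\d+)_\\d+.*"),
            Pattern.compile("\\d{8}_\\d{6}_\\d{5}_[a-z0-9]{5}_bucket-(\\d+)(?:[-_.].*)?"),
    };

    static OptionalInt getBucketNumber(String name)
    {
        for (Pattern pattern : BUCKET_PATTERNS) {
            Matcher matcher = pattern.matcher(name);
            if (matcher.matches()) {
                return OptionalInt.of(Integer.parseInt(matcher.group(1)));
            }
        }
        return OptionalInt.empty();
    }

    public static void main(String[] args)
    {
        System.out.println(getBucketNumber("000002_0"));        // Hive name -> OptionalInt[2]
        System.out.println(getBucketNumber("000002_0_copy_1")); // Hive copy file -> OptionalInt[2]
        System.out.println(getBucketNumber("20190101_123456_00001_ab1cd_bucket-00042")); // legacy Presto name -> OptionalInt[42]
        System.out.println(getBucketNumber("data.orc"));        // unrecognized -> OptionalInt.empty
    }
}
```

Files that match neither pattern push the whole partition onto the legacy one-file-per-bucket path in the hunk above, where position in the sorted listing becomes the bucket number.
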
@@ -74,15 +74,13 @@ private static SchemaTableName parseTableName(String tableName)
     public RemoteIterator<LocatedFileStatus> list(FileSystem fs, Table table, Path path)
             throws IOException
     {
-        SchemaTableName schemaTableName = new SchemaTableName(table.getDatabaseName(), table.getTableName());
-
         List<LocatedFileStatus> files = cache.getIfPresent(path);
         if (files != null) {
             return simpleRemoteIterator(files);
         }
         RemoteIterator<LocatedFileStatus> iterator = fs.listLocatedStatus(path);
 
-        if (!tableNames.contains(schemaTableName)) {
+        if (!tableNames.contains(table.getSchemaTableName())) {
             return iterator;
         }
         return cachingRemoteIterator(iterator, path);
14 changes: 14 additions & 0 deletions presto-hive/src/main/java/io/prestosql/plugin/hive/HiveConfig.java
@@ -97,6 +97,7 @@ public class HiveConfig
     private HiveCompressionCodec hiveCompressionCodec = HiveCompressionCodec.GZIP;
     private boolean respectTableFormat = true;
     private boolean immutablePartitions;
+    private boolean createEmptyBucketFiles = true;
     private int maxPartitionsPerWriter = 100;
     private int maxOpenSortFiles = 50;
     private int writeValidationThreads = 16;
@@ -609,6 +610,19 @@ public HiveConfig setImmutablePartitions(boolean immutablePartitions)
         return this;
     }
 
+    public boolean isCreateEmptyBucketFiles()
+    {
+        return createEmptyBucketFiles;
+    }
+
+    @Config("hive.create-empty-bucket-files")
+    @ConfigDescription("Create empty files for buckets that have no data")
+    public HiveConfig setCreateEmptyBucketFiles(boolean createEmptyBucketFiles)
+    {
+        this.createEmptyBucketFiles = createEmptyBucketFiles;
+        return this;
+    }
+
     @Min(1)
     public int getMaxPartitionsPerWriter()
     {
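
The new `hive.create-empty-bucket-files` property defaults to true, preserving the old behavior of writing a zero-row file for every empty bucket. Given the `@Config` annotation above, it should be settable as `hive.create-empty-bucket-files=false` in the Hive catalog properties file; a small illustrative sketch of the accessor pair:

```java
// Illustrative use of the new HiveConfig property; Airlift binds
// hive.create-empty-bucket-files from the catalog properties to this setter.
HiveConfig config = new HiveConfig()
        .setCreateEmptyBucketFiles(false); // same effect as hive.create-empty-bucket-files=false
assert !config.isCreateEmptyBucketFiles();
```
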
@@ -30,7 +30,6 @@ public HiveInsertTableHandle(
             @JsonProperty("schemaName") String schemaName,
             @JsonProperty("tableName") String tableName,
             @JsonProperty("inputColumns") List<HiveColumnHandle> inputColumns,
-            @JsonProperty("filePrefix") String filePrefix,
             @JsonProperty("pageSinkMetadata") HivePageSinkMetadata pageSinkMetadata,
             @JsonProperty("locationHandle") LocationHandle locationHandle,
             @JsonProperty("bucketProperty") Optional<HiveBucketProperty> bucketProperty,
@@ -41,7 +40,6 @@
                 schemaName,
                 tableName,
                 inputColumns,
-                filePrefix,
                 pageSinkMetadata,
                 locationHandle,
                 bucketProperty,