Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions presto-docs/src/main/sphinx/connector/hive.rst
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ Property Name Description

``hive.immutable-partitions`` Can new data be inserted into existing partitions? ``false``

``hive.create-empty-bucket-files`` Should empty files be created for buckets that have no data? ``false``

``hive.max-partitions-per-writers`` Maximum number of partitions per writer. 100

``hive.max-partitions-per-scan`` Maximum number of partitions for a single table scan. 100,000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
*/
package com.facebook.presto.hive.metastore;

import com.facebook.presto.spi.SchemaTableName;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -77,6 +79,12 @@ public String getTableName()
return tableName;
}

// Convenience view of this partition's owning table as a single value.
// Excluded from JSON: databaseName and tableName are already serialized
// as separate properties, so emitting this too would duplicate them.
@JsonIgnore
public SchemaTableName getSchemaTableName()
{
    SchemaTableName schemaTableName = new SchemaTableName(databaseName, tableName);
    return schemaTableName;
}

@JsonProperty
public List<String> getValues()
{
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
*/
package com.facebook.presto.hive.metastore;

import com.facebook.presto.spi.SchemaTableName;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -83,6 +85,12 @@ public String getTableName()
return tableName;
}

// Combines the database and table name into a SchemaTableName.
// Marked @JsonIgnore because both components are serialized individually;
// serializing the combined form as well would be redundant.
@JsonIgnore
public SchemaTableName getSchemaTableName()
{
    SchemaTableName schemaTableName = new SchemaTableName(databaseName, tableName);
    return schemaTableName;
}

@JsonProperty
public String getOwner()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public class HiveClientConfig
private HiveCompressionCodec orcCompressionCodec = HiveCompressionCodec.GZIP;
private boolean respectTableFormat = true;
private boolean immutablePartitions;
private boolean createEmptyBucketFiles;
private boolean insertOverwriteImmutablePartitions;
private boolean failFastOnInsertIntoImmutablePartitionsEnabled = true;
private int maxPartitionsPerWriter = 100;
Expand Down Expand Up @@ -584,6 +585,19 @@ public HiveClientConfig setImmutablePartitions(boolean immutablePartitions)
return this;
}

// Returns whether empty files should be written for buckets that received
// no rows, so every bucket of a bucketed table/partition has a file on disk.
// Defaults to false (field initializer above).
public boolean isCreateEmptyBucketFiles()
{
return createEmptyBucketFiles;
}

// Config-mapped setter for "hive.create-empty-bucket-files"; returns this
// for the fluent style the surrounding config setters use.
@Config("hive.create-empty-bucket-files")
@ConfigDescription("Create empty files for buckets that have no data")
public HiveClientConfig setCreateEmptyBucketFiles(boolean createEmptyBucketFiles)
{
this.createEmptyBucketFiles = createEmptyBucketFiles;
return this;
}

public boolean isFailFastOnInsertIntoImmutablePartitionsEnabled()
{
return failFastOnInsertIntoImmutablePartitionsEnabled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ public HiveInsertTableHandle(
@JsonProperty("schemaName") String schemaName,
@JsonProperty("tableName") String tableName,
@JsonProperty("inputColumns") List<HiveColumnHandle> inputColumns,
@JsonProperty("filePrefix") String filePrefix,
@JsonProperty("pageSinkMetadata") HivePageSinkMetadata pageSinkMetadata,
@JsonProperty("locationHandle") LocationHandle locationHandle,
@JsonProperty("bucketProperty") Optional<HiveBucketProperty> bucketProperty,
Expand All @@ -46,7 +45,6 @@ public HiveInsertTableHandle(
schemaName,
tableName,
inputColumns,
filePrefix,
pageSinkMetadata,
locationHandle,
bucketProperty,
Expand Down
176 changes: 73 additions & 103 deletions presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ public HiveOutputTableHandle(
@JsonProperty("schemaName") String schemaName,
@JsonProperty("tableName") String tableName,
@JsonProperty("inputColumns") List<HiveColumnHandle> inputColumns,
@JsonProperty("filePrefix") String filePrefix,
@JsonProperty("pageSinkMetadata") HivePageSinkMetadata pageSinkMetadata,
@JsonProperty("locationHandle") LocationHandle locationHandle,
@JsonProperty("tableStorageFormat") HiveStorageFormat tableStorageFormat,
Expand All @@ -58,7 +57,6 @@ public HiveOutputTableHandle(
schemaName,
tableName,
inputColumns,
filePrefix,
pageSinkMetadata,
locationHandle,
bucketProperty,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ private ConnectorPageSink createPageSink(HiveWritableTableHandle handle, boolean
sortedBy,
handle.getLocationHandle(),
locationService,
handle.getFilePrefix(),
session.getQueryId(),
// The scope of metastore cache is within a single HivePageSink object
// TODO: Extend metastore cache scope to the entire transaction
new HivePageSinkMetadataProvider(handle.getPageSinkMetadata(), memoizeMetastore(metastore, metastoreImpersonationEnabled, perTransactionMetastoreCacheMaximumSize), new MetastoreContext(session.getIdentity(), session.getQueryId(), session.getClientInfo(), session.getSource())),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public final class HiveSessionProperties
private static final String COMPRESSION_CODEC = "compression_codec";
private static final String ORC_COMPRESSION_CODEC = "orc_compression_codec";
public static final String RESPECT_TABLE_FORMAT = "respect_table_format";
private static final String CREATE_EMPTY_BUCKET_FILES = "create_empty_bucket_files";
private static final String PARQUET_USE_COLUMN_NAME = "parquet_use_column_names";
private static final String PARQUET_FAIL_WITH_CORRUPTED_STATISTICS = "parquet_fail_with_corrupted_statistics";
private static final String PARQUET_MAX_READ_BLOCK_SIZE = "parquet_max_read_block_size";
Expand Down Expand Up @@ -301,6 +302,11 @@ public HiveSessionProperties(HiveClientConfig hiveClientConfig, OrcFileWriterCon
"Write new partitions using table format rather than default storage format",
hiveClientConfig.isRespectTableFormat(),
false),
booleanProperty(
CREATE_EMPTY_BUCKET_FILES,
"Create empty files for buckets that have no data",
hiveClientConfig.isCreateEmptyBucketFiles(),
false),
booleanProperty(
PARQUET_USE_COLUMN_NAME,
"Experimental: Parquet: Access Parquet columns using names from the file",
Expand Down Expand Up @@ -712,6 +718,11 @@ public static boolean isRespectTableFormat(ConnectorSession session)
return session.getProperty(RESPECT_TABLE_FORMAT, Boolean.class);
}

// Reads the "create_empty_bucket_files" session property (seeded from
// HiveClientConfig.isCreateEmptyBucketFiles() in the property registration above).
public static boolean isCreateEmptyBucketFiles(ConnectorSession session)
{
    Boolean createEmptyBucketFiles = session.getProperty(CREATE_EMPTY_BUCKET_FILES, Boolean.class);
    return createEmptyBucketFiles;
}

public static boolean isUseParquetColumnNames(ConnectorSession session)
{
return session.getProperty(PARQUET_USE_COLUMN_NAME, Boolean.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,8 @@
import com.facebook.presto.hive.util.HudiRealtimeSplitConverter;
import com.facebook.presto.orc.metadata.OrcType;
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.ConnectorTableHandle;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.RecordCursor;
import com.facebook.presto.spi.SchemaTableName;
import com.github.luben.zstd.ZstdInputStreamNoFinalizer;
import com.github.luben.zstd.ZstdOutputStreamNoFinalizer;
import com.google.common.base.Joiner;
Expand Down Expand Up @@ -863,11 +861,6 @@ public static Slice charPartitionKey(String value, String name, Type columnType)
return partitionKey;
}

public static SchemaTableName schemaTableName(ConnectorTableHandle tableHandle)
{
return ((HiveTableHandle) tableHandle).getSchemaTableName();
}

public static List<HiveColumnHandle> hiveColumnHandles(Table table)
{
ImmutableList.Builder<HiveColumnHandle> columns = ImmutableList.builder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import com.facebook.presto.hive.metastore.HivePageSinkMetadata;
import com.facebook.presto.hive.metastore.SortingColumn;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.SchemaTableName;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;

Expand All @@ -30,9 +32,7 @@ public class HiveWritableTableHandle
private final String schemaName;
private final String tableName;
private final List<HiveColumnHandle> inputColumns;
private final String filePrefix;

private HivePageSinkMetadata pageSinkMetadata;
private final HivePageSinkMetadata pageSinkMetadata;
private final LocationHandle locationHandle;
private final Optional<HiveBucketProperty> bucketProperty;
private final List<SortingColumn> preferredOrderingColumns;
Expand All @@ -46,7 +46,6 @@ public HiveWritableTableHandle(
String schemaName,
String tableName,
List<HiveColumnHandle> inputColumns,
String filePrefix,
HivePageSinkMetadata pageSinkMetadata,
LocationHandle locationHandle,
Optional<HiveBucketProperty> bucketProperty,
Expand All @@ -60,7 +59,6 @@ public HiveWritableTableHandle(
this.schemaName = requireNonNull(schemaName, "schemaName is null");
this.tableName = requireNonNull(tableName, "tableName is null");
this.inputColumns = ImmutableList.copyOf(requireNonNull(inputColumns, "inputColumns is null"));
this.filePrefix = requireNonNull(filePrefix, "filePrefix is null");
this.pageSinkMetadata = requireNonNull(pageSinkMetadata, "pageSinkMetadata is null");
this.locationHandle = requireNonNull(locationHandle, "locationHandle is null");
this.bucketProperty = requireNonNull(bucketProperty, "bucketProperty is null");
Expand Down Expand Up @@ -88,16 +86,16 @@ public String getTableName()
return tableName;
}

@JsonProperty
public List<HiveColumnHandle> getInputColumns()
@JsonIgnore
public SchemaTableName getSchemaTableName()
{
return inputColumns;
return new SchemaTableName(schemaName, tableName);
}

@JsonProperty
public String getFilePrefix()
public List<HiveColumnHandle> getInputColumns()
{
return filePrefix;
return inputColumns;
}

@JsonProperty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ public static void checkTableIsWritable(Table table, boolean writesToNonManagedT
}

checkWritable(
new SchemaTableName(table.getDatabaseName(), table.getTableName()),
table.getSchemaTableName(),
Optional.empty(),
getProtectMode(table),
table.getParameters(),
Expand All @@ -318,7 +318,7 @@ public static void checkTableIsWritable(Table table, boolean writesToNonManagedT
public static void checkPartitionIsWritable(String partitionName, Partition partition)
{
checkWritable(
new SchemaTableName(partition.getDatabaseName(), partition.getTableName()),
partition.getSchemaTableName(),
Optional.of(partitionName),
getProtectMode(partition),
partition.getParameters(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@

import static com.facebook.presto.hive.HiveCompressionCodec.NONE;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_FILESYSTEM_ERROR;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_INVALID_BUCKET_FILES;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_INVALID_METADATA;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_PARTITION_READ_ONLY;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_PARTITION_SCHEMA_MISMATCH;
Expand Down Expand Up @@ -106,7 +105,11 @@ public class HiveWriterFactory
{
private static final int MAX_BUCKET_COUNT = 100_000;
private static final int BUCKET_NUMBER_PADDING = Integer.toString(MAX_BUCKET_COUNT - 1).length();
private static final Pattern BUCKET_FILE_NAME_PATTERN = Pattern.compile(".*_bucket-(\\d+)(\\..*)?");
private static final Iterable<Pattern> BUCKET_PATTERNS = ImmutableList.of(
// Hive naming pattern per `org.apache.hadoop.hive.ql.exec.Utilities#getBucketIdFromFile()`
Pattern.compile("(0\\d+)_\\d+.*"),
// legacy Presto naming pattern (current version matches Hive)
Pattern.compile("\\d{8}_\\d{6}_\\d{5}_[a-z0-9]{5}_bucket-(\\d+)(?:[-_.].*)?"));

private final Set<HiveFileWriterFactory> fileWriterFactories;
private final String schemaName;
Expand All @@ -124,7 +127,7 @@ public class HiveWriterFactory
private final Map<String, String> additionalTableParameters;
private final LocationHandle locationHandle;
private final LocationService locationService;
private final String filePrefix;
private final String queryId;

private final HivePageSinkMetadataProvider pageSinkMetadataProvider;
private final TypeManager typeManager;
Expand Down Expand Up @@ -163,7 +166,7 @@ public HiveWriterFactory(
List<SortingColumn> sortedBy,
LocationHandle locationHandle,
LocationService locationService,
String filePrefix,
String queryId,
HivePageSinkMetadataProvider pageSinkMetadataProvider,
TypeManager typeManager,
HdfsEnvironment hdfsEnvironment,
Expand Down Expand Up @@ -191,7 +194,7 @@ public HiveWriterFactory(
this.additionalTableParameters = ImmutableMap.copyOf(requireNonNull(additionalTableParameters, "additionalTableParameters is null"));
this.locationHandle = requireNonNull(locationHandle, "locationHandle is null");
this.locationService = requireNonNull(locationService, "locationService is null");
this.filePrefix = requireNonNull(filePrefix, "filePrefix is null");
this.queryId = requireNonNull(queryId, "queryId is null");

this.pageSinkMetadataProvider = requireNonNull(pageSinkMetadataProvider, "pageSinkMetadataProvider is null");

Expand Down Expand Up @@ -347,15 +350,15 @@ public HiveWriter createWriter(Page partitionColumns, int position, OptionalInt
String targetFileName;
if (bucketNumber.isPresent()) {
// Use the bucket number for file name when fileRenaming is enabled
targetFileName = isFileRenamingEnabled(session) ? String.valueOf(bucketNumber.getAsInt()) : computeBucketedFileName(filePrefix, bucketNumber.getAsInt()) + extension;
targetFileName = isFileRenamingEnabled(session) ? String.valueOf(bucketNumber.getAsInt()) : computeBucketedFileName(queryId, bucketNumber.getAsInt()) + extension;
}
else {
targetFileName = filePrefix + "_" + randomUUID() + extension;
targetFileName = queryId + "_" + randomUUID() + extension;
}

String writeFileName;
if (writeToTempFile) {
writeFileName = ".tmp.presto." + filePrefix + "_" + randomUUID() + extension;
writeFileName = ".tmp.presto." + queryId + "_" + randomUUID() + extension;
}
else {
writeFileName = targetFileName;
Expand Down Expand Up @@ -657,24 +660,26 @@ public LocationHandle getLocationHandle()
return locationHandle;
}

public static String computeBucketedFileName(String filePrefix, int bucket)
public static String computeBucketedFileName(String queryId, int bucket)
{
return filePrefix + "_bucket-" + Strings.padStart(Integer.toString(bucket), BUCKET_NUMBER_PADDING, '0');
String paddedBucket = Strings.padStart(Integer.toString(bucket), BUCKET_NUMBER_PADDING, '0');
return format("0%s_0_%s", paddedBucket, queryId);
}

public static int getBucketNumber(String fileName)
public static OptionalInt getBucketNumber(String fileName)
{
Matcher matcher = BUCKET_FILE_NAME_PATTERN.matcher(fileName);
if (matcher.matches()) {
return parseInt(matcher.group(1));
for (Pattern pattern : BUCKET_PATTERNS) {
Matcher matcher = pattern.matcher(fileName);
if (matcher.matches()) {
return OptionalInt.of(parseInt(matcher.group(1)));
}
}
// Numerical file name when "file_renaming_enabled" is true
else if (fileName.matches("\\d+")) {
return parseInt(fileName);
}
else {
throw new PrestoException(HIVE_INVALID_BUCKET_FILES, format("invalid hive bucket file name: %s", fileName));
if (fileName.matches("\\d+")) {
return OptionalInt.of(parseInt(fileName));
}

return OptionalInt.empty();
}

public static String getFileExtension(StorageFormat storageFormat, HiveCompressionCodec compressionCodec)
Expand Down
Loading