Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package com.facebook.presto.hive;

import com.facebook.airlift.json.JsonCodec;
import com.facebook.airlift.json.smile.SmileCodec;
import com.facebook.presto.hive.LocationService.WriteInfo;
import com.facebook.presto.hive.PartitionUpdate.UpdateMode;
import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
Expand All @@ -24,7 +25,6 @@
import com.facebook.presto.spi.procedure.Procedure;
import com.facebook.presto.spi.procedure.Procedure.Argument;
import com.google.common.collect.ImmutableList;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import org.apache.hadoop.hive.common.FileUtils;

Expand All @@ -39,6 +39,8 @@

import static com.facebook.presto.common.block.MethodHandleUtil.methodHandle;
import static com.facebook.presto.common.type.StandardTypes.VARCHAR;
import static com.facebook.presto.hive.HiveSessionProperties.isOptimizedPartitionUpdateSerializationEnabled;
import static com.facebook.presto.hive.HiveUtil.serializeZstdCompressed;
import static com.facebook.presto.spi.StandardErrorCode.ALREADY_EXISTS;
import static com.facebook.presto.spi.StandardErrorCode.INVALID_PROCEDURE_ARGUMENT;
import static com.google.common.collect.ImmutableList.toImmutableList;
Expand All @@ -60,14 +62,21 @@ public class CreateEmptyPartitionProcedure
private final ExtendedHiveMetastore metastore;
private final LocationService locationService;
private final JsonCodec<PartitionUpdate> partitionUpdateJsonCodec;
private final SmileCodec<PartitionUpdate> partitionUpdateSmileCodec;

@Inject
/**
 * Injected constructor.
 *
 * @param hiveMetadataFactory supplies a fresh transactional metadata instance per invocation
 * @param metastore metastore used to check for pre-existing partitions
 * @param locationService resolves write/target paths for the new partition
 * @param partitionUpdateCodec JSON codec for {@link PartitionUpdate} (default serialization)
 * @param partitionUpdateSmileCodec SMILE codec for {@link PartitionUpdate}, used when
 *        optimized (SMILE + ZSTD) serialization is enabled for the session
 */
public CreateEmptyPartitionProcedure(
        Supplier<TransactionalMetadata> hiveMetadataFactory,
        ExtendedHiveMetastore metastore,
        LocationService locationService,
        JsonCodec<PartitionUpdate> partitionUpdateCodec,
        SmileCodec<PartitionUpdate> partitionUpdateSmileCodec)
{
    this.hiveMetadataFactory = requireNonNull(hiveMetadataFactory, "hiveMetadataFactory is null");
    this.metastore = requireNonNull(metastore, "metastore is null");
    this.locationService = requireNonNull(locationService, "locationService is null");
    this.partitionUpdateJsonCodec = requireNonNull(partitionUpdateCodec, "partitionUpdateCodec is null");
    this.partitionUpdateSmileCodec = requireNonNull(partitionUpdateSmileCodec, "partitionUpdateSmileCodec is null");
}

@Override
Expand Down Expand Up @@ -115,23 +124,28 @@ private void doCreateEmptyPartition(ConnectorSession session, String schema, Str
String partitionName = FileUtils.makePartName(actualPartitionColumnNames, partitionStringValues);

WriteInfo writeInfo = locationService.getPartitionWriteInfo(hiveInsertTableHandle.getLocationHandle(), Optional.empty(), partitionName);
Slice serializedPartitionUpdate = Slices.wrappedBuffer(
partitionUpdateJsonCodec.toJsonBytes(
new PartitionUpdate(
partitionName,
UpdateMode.NEW,
writeInfo.getWritePath(),
writeInfo.getTargetPath(),
ImmutableList.of(),
0,
0,
0,
writeInfo.getWritePath().getName().matches("\\d+"))));
PartitionUpdate partitionUpdate = new PartitionUpdate(
partitionName,
UpdateMode.NEW,
writeInfo.getWritePath(),
writeInfo.getTargetPath(),
ImmutableList.of(),
0,
0,
0,
writeInfo.getWritePath().getName().matches("\\d+"));
byte[] serializedPartitionUpdate;
if (isOptimizedPartitionUpdateSerializationEnabled(session)) {
serializedPartitionUpdate = serializeZstdCompressed(partitionUpdateSmileCodec, partitionUpdate);
}
else {
serializedPartitionUpdate = partitionUpdateJsonCodec.toJsonBytes(partitionUpdate);
}

hiveMetadata.finishInsert(
session,
hiveInsertTableHandle,
ImmutableList.of(serializedPartitionUpdate),
ImmutableList.of(Slices.wrappedBuffer(serializedPartitionUpdate)),
ImmutableList.of());
hiveMetadata.commit();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,8 @@ public class HiveClientConfig
private boolean manifestVerificationEnabled;
private boolean undoMetastoreOperationsEnabled = true;

private boolean optimizedPartitionUpdateSerializationEnabled;

public int getMaxInitialSplits()
{
return maxInitialSplits;
Expand Down Expand Up @@ -1594,4 +1596,17 @@ public boolean isUndoMetastoreOperationsEnabled()
{
return undoMetastoreOperationsEnabled;
}

/**
 * Whether {@code PartitionUpdate} objects are serialized with the optimized
 * (SMILE + ZSTD) encoding instead of plain JSON.
 */
public boolean isOptimizedPartitionUpdateSerializationEnabled()
{
    return this.optimizedPartitionUpdateSerializationEnabled;
}

@Config("hive.experimental-optimized-partition-update-serialization-enabled")
Comment thread
arhimondr marked this conversation as resolved.
Outdated
@ConfigDescription("Serialize PartitionUpdate objects using binary SMILE encoding and compress with the ZSTD compression")
public HiveClientConfig setOptimizedPartitionUpdateSerializationEnabled(boolean optimizedPartitionUpdateSerializationEnabled)
{
this.optimizedPartitionUpdateSerializationEnabled = optimizedPartitionUpdateSerializationEnabled;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed;
import static com.facebook.airlift.configuration.ConfigBinder.configBinder;
import static com.facebook.airlift.json.JsonCodecBinder.jsonCodecBinder;
import static com.facebook.airlift.json.smile.SmileCodecBinder.smileCodecBinder;
import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator;
import static com.google.inject.multibindings.Multibinder.newSetBinder;
import static java.lang.Math.toIntExact;
Expand Down Expand Up @@ -156,6 +157,7 @@ public void configure(Binder binder)
binder.bind(ConnectorMetadataUpdaterProvider.class).to(HiveMetadataUpdaterProvider.class).in(Scopes.SINGLETON);

jsonCodecBinder(binder).bindJsonCodec(PartitionUpdate.class);
smileCodecBinder(binder).bindSmileCodec(PartitionUpdate.class);

binder.bind(FileFormatDataSourceStats.class).in(Scopes.SINGLETON);
newExporter(binder).export(FileFormatDataSourceStats.class).as(generatedNameOf(FileFormatDataSourceStats.class, connectorId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.facebook.airlift.bootstrap.LifeCycleManager;
import com.facebook.airlift.event.client.EventModule;
import com.facebook.airlift.json.JsonModule;
import com.facebook.airlift.json.smile.SmileModule;
import com.facebook.presto.cache.CachingModule;
import com.facebook.presto.common.block.BlockEncodingSerde;
import com.facebook.presto.common.type.TypeManager;
Expand Down Expand Up @@ -106,6 +107,7 @@ public Connector create(String catalogName, Map<String, String> config, Connecto
new EventModule(),
new MBeanModule(),
new JsonModule(),
new SmileModule(),
new HiveClientModule(catalogName),
new HiveS3Module(catalogName),
new HiveGcsModule(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package com.facebook.presto.hive;

import com.facebook.airlift.json.JsonCodec;
import com.facebook.airlift.json.smile.SmileCodec;
import com.facebook.presto.common.Subfield;
import com.facebook.presto.common.block.Block;
import com.facebook.presto.common.predicate.Domain;
Expand Down Expand Up @@ -212,6 +213,7 @@
import static com.facebook.presto.hive.HiveSessionProperties.isFileRenamingEnabled;
import static com.facebook.presto.hive.HiveSessionProperties.isOfflineDataDebugModeEnabled;
import static com.facebook.presto.hive.HiveSessionProperties.isOptimizedMismatchedBucketCount;
import static com.facebook.presto.hive.HiveSessionProperties.isOptimizedPartitionUpdateSerializationEnabled;
import static com.facebook.presto.hive.HiveSessionProperties.isPreferManifestsToListFiles;
import static com.facebook.presto.hive.HiveSessionProperties.isRespectTableFormat;
import static com.facebook.presto.hive.HiveSessionProperties.isShufflePartitionedColumnsForTableWriteEnabled;
Expand Down Expand Up @@ -262,6 +264,7 @@
import static com.facebook.presto.hive.HiveUtil.columnExtraInfo;
import static com.facebook.presto.hive.HiveUtil.decodeMaterializedViewData;
import static com.facebook.presto.hive.HiveUtil.decodeViewData;
import static com.facebook.presto.hive.HiveUtil.deserializeZstdCompressed;
import static com.facebook.presto.hive.HiveUtil.encodeMaterializedViewData;
import static com.facebook.presto.hive.HiveUtil.encodeViewData;
import static com.facebook.presto.hive.HiveUtil.getPartitionKeyColumnHandles;
Expand Down Expand Up @@ -384,6 +387,7 @@ public class HiveMetadata
private final FilterStatsCalculatorService filterStatsCalculatorService;
private final TableParameterCodec tableParameterCodec;
private final JsonCodec<PartitionUpdate> partitionUpdateCodec;
private final SmileCodec<PartitionUpdate> partitionUpdateSmileCodec;
private final boolean writesToNonManagedTablesEnabled;
private final boolean createsOfNonManagedTablesEnabled;
private final int maxPartitionBatchSize;
Expand Down Expand Up @@ -413,6 +417,7 @@ public HiveMetadata(
FilterStatsCalculatorService filterStatsCalculatorService,
TableParameterCodec tableParameterCodec,
JsonCodec<PartitionUpdate> partitionUpdateCodec,
SmileCodec<PartitionUpdate> partitionUpdateSmileCodec,
TypeTranslator typeTranslator,
String prestoVersion,
HiveStatisticsProvider hiveStatisticsProvider,
Expand All @@ -436,6 +441,7 @@ public HiveMetadata(
this.filterStatsCalculatorService = requireNonNull(filterStatsCalculatorService, "filterStatsCalculatorService is null");
this.tableParameterCodec = requireNonNull(tableParameterCodec, "tableParameterCodec is null");
this.partitionUpdateCodec = requireNonNull(partitionUpdateCodec, "partitionUpdateCodec is null");
this.partitionUpdateSmileCodec = requireNonNull(partitionUpdateSmileCodec, "partitionUpdateSmileCodec is null");
this.writesToNonManagedTablesEnabled = writesToNonManagedTablesEnabled;
this.createsOfNonManagedTablesEnabled = createsOfNonManagedTablesEnabled;
this.maxPartitionBatchSize = maxPartitionBatchSize;
Expand Down Expand Up @@ -1601,7 +1607,7 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession sess
{
HiveOutputTableHandle handle = (HiveOutputTableHandle) tableHandle;

List<PartitionUpdate> partitionUpdates = getPartitionUpdates(fragments);
List<PartitionUpdate> partitionUpdates = getPartitionUpdates(session, fragments);

Map<String, String> tableEncryptionParameters = ImmutableMap.of();
Map<String, String> partitionEncryptionParameters = ImmutableMap.of();
Expand Down Expand Up @@ -1921,7 +1927,7 @@ public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session,
{
HiveInsertTableHandle handle = (HiveInsertTableHandle) insertHandle;

List<PartitionUpdate> partitionUpdates = getPartitionUpdates(fragments);
List<PartitionUpdate> partitionUpdates = getPartitionUpdates(session, fragments);

HiveStorageFormat tableStorageFormat = handle.getTableStorageFormat();
partitionUpdates = PartitionUpdate.mergePartitionUpdates(partitionUpdates);
Expand Down Expand Up @@ -3219,7 +3225,7 @@ public CompletableFuture<Void> commitPageSinkAsync(ConnectorSession session, Con
handle.getTableName(),
handle.getLocationHandle().getTargetPath().toString(),
true,
getPartitionUpdates(fragments)));
getPartitionUpdates(session, fragments)));
}

@Override
Expand All @@ -3232,7 +3238,7 @@ public CompletableFuture<Void> commitPageSinkAsync(ConnectorSession session, Con
handle.getTableName(),
handle.getLocationHandle().getTargetPath().toString(),
false,
getPartitionUpdates(fragments)));
getPartitionUpdates(session, fragments)));
}

@Override
Expand Down Expand Up @@ -3272,12 +3278,22 @@ private static boolean hasAdminRole(Set<PrestoPrincipal> roles)
return roles.stream().anyMatch(principal -> principal.getName().equalsIgnoreCase(ADMIN_ROLE_NAME));
}

private List<PartitionUpdate> getPartitionUpdates(Collection<Slice> fragments)
public List<PartitionUpdate> getPartitionUpdates(ConnectorSession session, Collection<Slice> fragments)
{
return fragments.stream()
.map(Slice::getBytes)
.map(partitionUpdateCodec::fromJson)
.collect(toList());
boolean optimizedPartitionUpdateSerializationEnabled = isOptimizedPartitionUpdateSerializationEnabled(session);
ImmutableList.Builder<PartitionUpdate> result = ImmutableList.builder();
for (Slice fragment : fragments) {
byte[] bytes = fragment.getBytes();
PartitionUpdate partitionUpdate;
if (optimizedPartitionUpdateSerializationEnabled) {
partitionUpdate = deserializeZstdCompressed(partitionUpdateSmileCodec, bytes);
}
else {
partitionUpdate = partitionUpdateCodec.fromJson(bytes);
}
result.add(partitionUpdate);
}
return result.build();
}

private void verifyJvmTimeZone()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package com.facebook.presto.hive;

import com.facebook.airlift.json.JsonCodec;
import com.facebook.airlift.json.smile.SmileCodec;
import com.facebook.airlift.log.Logger;
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.hive.metastore.CachingHiveMetastore;
Expand Down Expand Up @@ -57,6 +58,7 @@ public class HiveMetadataFactory
private final FilterStatsCalculatorService filterStatsCalculatorService;
private final TableParameterCodec tableParameterCodec;
private final JsonCodec<PartitionUpdate> partitionUpdateCodec;
private final SmileCodec<PartitionUpdate> partitionUpdateSmileCodec;
private final ListeningExecutorService fileRenameExecutor;
private final TypeTranslator typeTranslator;
private final StagingFileCommitter stagingFileCommitter;
Expand All @@ -83,6 +85,7 @@ public HiveMetadataFactory(
FilterStatsCalculatorService filterStatsCalculatorService,
TableParameterCodec tableParameterCodec,
JsonCodec<PartitionUpdate> partitionUpdateCodec,
SmileCodec<PartitionUpdate> partitionUpdateSmileCodec,
TypeTranslator typeTranslator,
StagingFileCommitter stagingFileCommitter,
ZeroRowFileCreator zeroRowFileCreator,
Expand Down Expand Up @@ -113,6 +116,7 @@ public HiveMetadataFactory(
filterStatsCalculatorService,
tableParameterCodec,
partitionUpdateCodec,
partitionUpdateSmileCodec,
fileRenameExecutor,
typeTranslator,
stagingFileCommitter,
Expand Down Expand Up @@ -145,6 +149,7 @@ public HiveMetadataFactory(
FilterStatsCalculatorService filterStatsCalculatorService,
TableParameterCodec tableParameterCodec,
JsonCodec<PartitionUpdate> partitionUpdateCodec,
SmileCodec<PartitionUpdate> partitionUpdateSmileCodec,
ListeningExecutorService fileRenameExecutor,
TypeTranslator typeTranslator,
StagingFileCommitter stagingFileCommitter,
Expand Down Expand Up @@ -175,6 +180,7 @@ public HiveMetadataFactory(
this.filterStatsCalculatorService = requireNonNull(filterStatsCalculatorService, "filterStatsCalculatorService is null");
this.tableParameterCodec = requireNonNull(tableParameterCodec, "tableParameterCodec is null");
this.partitionUpdateCodec = requireNonNull(partitionUpdateCodec, "partitionUpdateCodec is null");
this.partitionUpdateSmileCodec = requireNonNull(partitionUpdateSmileCodec, "partitionUpdateSmileCodec is null");
this.fileRenameExecutor = requireNonNull(fileRenameExecutor, "fileRenameExecutor is null");
this.typeTranslator = requireNonNull(typeTranslator, "typeTranslator is null");
this.stagingFileCommitter = requireNonNull(stagingFileCommitter, "stagingFileCommitter is null");
Expand Down Expand Up @@ -220,6 +226,7 @@ public HiveMetadata get()
filterStatsCalculatorService,
tableParameterCodec,
partitionUpdateCodec,
partitionUpdateSmileCodec,
typeTranslator,
prestoVersion,
new MetastoreHiveStatisticsProvider(metastore),
Expand Down
Loading