diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/CreateEmptyPartitionProcedure.java b/presto-hive/src/main/java/com/facebook/presto/hive/CreateEmptyPartitionProcedure.java index 76ca382c3c09d..5b9f1fe362400 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/CreateEmptyPartitionProcedure.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/CreateEmptyPartitionProcedure.java @@ -14,6 +14,7 @@ package com.facebook.presto.hive; import com.facebook.airlift.json.JsonCodec; +import com.facebook.airlift.json.smile.SmileCodec; import com.facebook.presto.hive.LocationService.WriteInfo; import com.facebook.presto.hive.PartitionUpdate.UpdateMode; import com.facebook.presto.hive.metastore.ExtendedHiveMetastore; @@ -24,7 +25,6 @@ import com.facebook.presto.spi.procedure.Procedure; import com.facebook.presto.spi.procedure.Procedure.Argument; import com.google.common.collect.ImmutableList; -import io.airlift.slice.Slice; import io.airlift.slice.Slices; import org.apache.hadoop.hive.common.FileUtils; @@ -39,6 +39,8 @@ import static com.facebook.presto.common.block.MethodHandleUtil.methodHandle; import static com.facebook.presto.common.type.StandardTypes.VARCHAR; +import static com.facebook.presto.hive.HiveSessionProperties.isOptimizedPartitionUpdateSerializationEnabled; +import static com.facebook.presto.hive.HiveUtil.serializeZstdCompressed; import static com.facebook.presto.spi.StandardErrorCode.ALREADY_EXISTS; import static com.facebook.presto.spi.StandardErrorCode.INVALID_PROCEDURE_ARGUMENT; import static com.google.common.collect.ImmutableList.toImmutableList; @@ -60,14 +62,21 @@ public class CreateEmptyPartitionProcedure private final ExtendedHiveMetastore metastore; private final LocationService locationService; private final JsonCodec partitionUpdateJsonCodec; + private final SmileCodec partitionUpdateSmileCodec; @Inject - public CreateEmptyPartitionProcedure(Supplier hiveMetadataFactory, ExtendedHiveMetastore metastore, LocationService locationService, JsonCodec partitionUpdateCodec) + public CreateEmptyPartitionProcedure( + Supplier hiveMetadataFactory, + ExtendedHiveMetastore metastore, + LocationService locationService, + JsonCodec partitionUpdateCodec, + SmileCodec partitionUpdateSmileCodec) { this.hiveMetadataFactory = requireNonNull(hiveMetadataFactory, "hiveMetadataFactory is null"); this.metastore = requireNonNull(metastore, "metastore is null"); this.locationService = requireNonNull(locationService, "locationService is null"); this.partitionUpdateJsonCodec = requireNonNull(partitionUpdateCodec, "partitionUpdateCodec is null"); + this.partitionUpdateSmileCodec = requireNonNull(partitionUpdateSmileCodec, "partitionUpdateSmileCodec is null"); } @Override @@ -115,23 +124,28 @@ private void doCreateEmptyPartition(ConnectorSession session, String schema, Str String partitionName = FileUtils.makePartName(actualPartitionColumnNames, partitionStringValues); WriteInfo writeInfo = locationService.getPartitionWriteInfo(hiveInsertTableHandle.getLocationHandle(), Optional.empty(), partitionName); - Slice serializedPartitionUpdate = Slices.wrappedBuffer( - partitionUpdateJsonCodec.toJsonBytes( - new PartitionUpdate( - partitionName, - UpdateMode.NEW, - writeInfo.getWritePath(), - writeInfo.getTargetPath(), - ImmutableList.of(), - 0, - 0, - 0, - writeInfo.getWritePath().getName().matches("\\d+")))); + PartitionUpdate partitionUpdate = new PartitionUpdate( + partitionName, + UpdateMode.NEW, + writeInfo.getWritePath(), + writeInfo.getTargetPath(), + ImmutableList.of(), + 0, + 0, + 0, + writeInfo.getWritePath().getName().matches("\\d+")); + byte[] serializedPartitionUpdate; + if (isOptimizedPartitionUpdateSerializationEnabled(session)) { + serializedPartitionUpdate = serializeZstdCompressed(partitionUpdateSmileCodec, partitionUpdate); + } + else { + serializedPartitionUpdate = partitionUpdateJsonCodec.toJsonBytes(partitionUpdate); + } hiveMetadata.finishInsert( session, hiveInsertTableHandle, - ImmutableList.of(serializedPartitionUpdate), + ImmutableList.of(Slices.wrappedBuffer(serializedPartitionUpdate)), ImmutableList.of()); hiveMetadata.commit(); } diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java index b2380d267b8f8..ad7b4a63df2f4 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java @@ -188,6 +188,8 @@ public class HiveClientConfig private boolean manifestVerificationEnabled; private boolean undoMetastoreOperationsEnabled = true; + private boolean optimizedPartitionUpdateSerializationEnabled; + public int getMaxInitialSplits() { return maxInitialSplits; @@ -1594,4 +1596,17 @@ public boolean isUndoMetastoreOperationsEnabled() { return undoMetastoreOperationsEnabled; } + + public boolean isOptimizedPartitionUpdateSerializationEnabled() + { + return optimizedPartitionUpdateSerializationEnabled; + } + + @Config("hive.experimental-optimized-partition-update-serialization-enabled") + @ConfigDescription("Serialize PartitionUpdate objects using binary SMILE encoding and compress with the ZSTD compression") + public HiveClientConfig setOptimizedPartitionUpdateSerializationEnabled(boolean optimizedPartitionUpdateSerializationEnabled) + { + this.optimizedPartitionUpdateSerializationEnabled = optimizedPartitionUpdateSerializationEnabled; + return this; + } } diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientModule.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientModule.java index 4e460c19c1c14..d3bec46f1ab20 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientModule.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientModule.java @@ -80,6 +80,7 @@ import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed; import static com.facebook.airlift.configuration.ConfigBinder.configBinder; import static com.facebook.airlift.json.JsonCodecBinder.jsonCodecBinder; +import static com.facebook.airlift.json.smile.SmileCodecBinder.smileCodecBinder; import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator; import static com.google.inject.multibindings.Multibinder.newSetBinder; import static java.lang.Math.toIntExact; @@ -156,6 +157,7 @@ public void configure(Binder binder) binder.bind(ConnectorMetadataUpdaterProvider.class).to(HiveMetadataUpdaterProvider.class).in(Scopes.SINGLETON); jsonCodecBinder(binder).bindJsonCodec(PartitionUpdate.class); + smileCodecBinder(binder).bindSmileCodec(PartitionUpdate.class); binder.bind(FileFormatDataSourceStats.class).in(Scopes.SINGLETON); newExporter(binder).export(FileFormatDataSourceStats.class).as(generatedNameOf(FileFormatDataSourceStats.class, connectorId)); diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveConnectorFactory.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveConnectorFactory.java index 3c4970614b0f3..0051e467e8425 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveConnectorFactory.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveConnectorFactory.java @@ -17,6 +17,7 @@ import com.facebook.airlift.bootstrap.LifeCycleManager; import com.facebook.airlift.event.client.EventModule; import com.facebook.airlift.json.JsonModule; +import com.facebook.airlift.json.smile.SmileModule; import com.facebook.presto.cache.CachingModule; import com.facebook.presto.common.block.BlockEncodingSerde; import com.facebook.presto.common.type.TypeManager; @@ -106,6 +107,7 @@ public Connector create(String catalogName, Map config, Connecto new EventModule(), new MBeanModule(), new JsonModule(), + new SmileModule(), new HiveClientModule(catalogName), new HiveS3Module(catalogName), new HiveGcsModule(), diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java index aa050e5496703..144d4826bd8fc 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java @@ -14,6 +14,7 @@ package com.facebook.presto.hive; import com.facebook.airlift.json.JsonCodec; +import com.facebook.airlift.json.smile.SmileCodec; import com.facebook.presto.common.Subfield; import com.facebook.presto.common.block.Block; import com.facebook.presto.common.predicate.Domain; @@ -212,6 +213,7 @@ import static com.facebook.presto.hive.HiveSessionProperties.isFileRenamingEnabled; import static com.facebook.presto.hive.HiveSessionProperties.isOfflineDataDebugModeEnabled; import static com.facebook.presto.hive.HiveSessionProperties.isOptimizedMismatchedBucketCount; +import static com.facebook.presto.hive.HiveSessionProperties.isOptimizedPartitionUpdateSerializationEnabled; import static com.facebook.presto.hive.HiveSessionProperties.isPreferManifestsToListFiles; import static com.facebook.presto.hive.HiveSessionProperties.isRespectTableFormat; import static com.facebook.presto.hive.HiveSessionProperties.isShufflePartitionedColumnsForTableWriteEnabled; @@ -262,6 +264,7 @@ import static com.facebook.presto.hive.HiveUtil.columnExtraInfo; import static com.facebook.presto.hive.HiveUtil.decodeMaterializedViewData; import static com.facebook.presto.hive.HiveUtil.decodeViewData; +import static com.facebook.presto.hive.HiveUtil.deserializeZstdCompressed; import static com.facebook.presto.hive.HiveUtil.encodeMaterializedViewData; import static com.facebook.presto.hive.HiveUtil.encodeViewData; import static com.facebook.presto.hive.HiveUtil.getPartitionKeyColumnHandles; @@ -384,6 +387,7 @@ public class HiveMetadata private final FilterStatsCalculatorService filterStatsCalculatorService; private final TableParameterCodec tableParameterCodec; private final JsonCodec partitionUpdateCodec; + private final SmileCodec partitionUpdateSmileCodec; private final boolean writesToNonManagedTablesEnabled; private final boolean createsOfNonManagedTablesEnabled; private final int maxPartitionBatchSize; @@ -413,6 +417,7 @@ public HiveMetadata( FilterStatsCalculatorService filterStatsCalculatorService, TableParameterCodec tableParameterCodec, JsonCodec partitionUpdateCodec, + SmileCodec partitionUpdateSmileCodec, TypeTranslator typeTranslator, String prestoVersion, HiveStatisticsProvider hiveStatisticsProvider, @@ -436,6 +441,7 @@ public HiveMetadata( this.filterStatsCalculatorService = requireNonNull(filterStatsCalculatorService, "filterStatsCalculatorService is null"); this.tableParameterCodec = requireNonNull(tableParameterCodec, "tableParameterCodec is null"); this.partitionUpdateCodec = requireNonNull(partitionUpdateCodec, "partitionUpdateCodec is null"); + this.partitionUpdateSmileCodec = requireNonNull(partitionUpdateSmileCodec, "partitionUpdateSmileCodec is null"); this.writesToNonManagedTablesEnabled = writesToNonManagedTablesEnabled; this.createsOfNonManagedTablesEnabled = createsOfNonManagedTablesEnabled; this.maxPartitionBatchSize = maxPartitionBatchSize; @@ -1601,7 +1607,7 @@ public Optional finishCreateTable(ConnectorSession sess { HiveOutputTableHandle handle = (HiveOutputTableHandle) tableHandle; - List partitionUpdates = getPartitionUpdates(fragments); + List partitionUpdates = getPartitionUpdates(session, fragments); Map tableEncryptionParameters = ImmutableMap.of(); Map partitionEncryptionParameters = ImmutableMap.of(); @@ -1921,7 +1927,7 @@ public Optional finishInsert(ConnectorSession session, { HiveInsertTableHandle handle = (HiveInsertTableHandle) insertHandle; - List partitionUpdates = getPartitionUpdates(fragments); + List partitionUpdates = getPartitionUpdates(session, fragments); HiveStorageFormat tableStorageFormat = handle.getTableStorageFormat(); partitionUpdates = PartitionUpdate.mergePartitionUpdates(partitionUpdates); @@ -3219,7 +3225,7 @@ public CompletableFuture commitPageSinkAsync(ConnectorSession session, Con handle.getTableName(), handle.getLocationHandle().getTargetPath().toString(), true, - getPartitionUpdates(fragments))); + getPartitionUpdates(session, fragments))); } @Override @@ -3232,7 +3238,7 @@ public CompletableFuture commitPageSinkAsync(ConnectorSession session, Con handle.getTableName(), handle.getLocationHandle().getTargetPath().toString(), false, - getPartitionUpdates(fragments))); + getPartitionUpdates(session, fragments))); } @Override @@ -3272,12 +3278,22 @@ private static boolean hasAdminRole(Set roles) return roles.stream().anyMatch(principal -> principal.getName().equalsIgnoreCase(ADMIN_ROLE_NAME)); } - private List getPartitionUpdates(Collection fragments) + public List getPartitionUpdates(ConnectorSession session, Collection fragments) { - return fragments.stream() - .map(Slice::getBytes) - .map(partitionUpdateCodec::fromJson) - .collect(toList()); + boolean optimizedPartitionUpdateSerializationEnabled = isOptimizedPartitionUpdateSerializationEnabled(session); + ImmutableList.Builder result = ImmutableList.builder(); + for (Slice fragment : fragments) { + byte[] bytes = fragment.getBytes(); + PartitionUpdate partitionUpdate; + if (optimizedPartitionUpdateSerializationEnabled) { + partitionUpdate = deserializeZstdCompressed(partitionUpdateSmileCodec, bytes); + } + else { + partitionUpdate = partitionUpdateCodec.fromJson(bytes); + } + result.add(partitionUpdate); + } + return result.build(); } private void verifyJvmTimeZone() diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadataFactory.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadataFactory.java index c69d289df6e89..244a9dbebc143 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadataFactory.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadataFactory.java @@ -14,6 +14,7 @@ package com.facebook.presto.hive; import com.facebook.airlift.json.JsonCodec; +import com.facebook.airlift.json.smile.SmileCodec; import com.facebook.airlift.log.Logger; import com.facebook.presto.common.type.TypeManager; import com.facebook.presto.hive.metastore.CachingHiveMetastore; @@ -57,6 +58,7 @@ public class HiveMetadataFactory private final FilterStatsCalculatorService filterStatsCalculatorService; private final TableParameterCodec tableParameterCodec; private final JsonCodec partitionUpdateCodec; + private final SmileCodec partitionUpdateSmileCodec; private final ListeningExecutorService fileRenameExecutor; private final TypeTranslator typeTranslator; private final StagingFileCommitter stagingFileCommitter; @@ -83,6 +85,7 @@ public HiveMetadataFactory( FilterStatsCalculatorService filterStatsCalculatorService, TableParameterCodec tableParameterCodec, JsonCodec partitionUpdateCodec, + SmileCodec partitionUpdateSmileCodec, TypeTranslator typeTranslator, StagingFileCommitter stagingFileCommitter, ZeroRowFileCreator zeroRowFileCreator, @@ -113,6 +116,7 @@ public HiveMetadataFactory( filterStatsCalculatorService, tableParameterCodec, partitionUpdateCodec, + partitionUpdateSmileCodec, fileRenameExecutor, typeTranslator, stagingFileCommitter, @@ -145,6 +149,7 @@ public HiveMetadataFactory( FilterStatsCalculatorService filterStatsCalculatorService, TableParameterCodec tableParameterCodec, JsonCodec partitionUpdateCodec, + SmileCodec partitionUpdateSmileCodec, ListeningExecutorService fileRenameExecutor, TypeTranslator typeTranslator, StagingFileCommitter stagingFileCommitter, @@ -175,6 +180,7 @@ public HiveMetadataFactory( this.filterStatsCalculatorService = requireNonNull(filterStatsCalculatorService, "filterStatsCalculatorService is null"); this.tableParameterCodec = requireNonNull(tableParameterCodec, "tableParameterCodec is null"); this.partitionUpdateCodec = requireNonNull(partitionUpdateCodec, "partitionUpdateCodec is null"); + this.partitionUpdateSmileCodec = requireNonNull(partitionUpdateSmileCodec, "partitionUpdateSmileCodec is null"); this.fileRenameExecutor = requireNonNull(fileRenameExecutor, "fileRenameExecutor is null"); this.typeTranslator = requireNonNull(typeTranslator, "typeTranslator is null"); this.stagingFileCommitter = requireNonNull(stagingFileCommitter, "stagingFileCommitter is null"); @@ -220,6 +226,7 @@ public HiveMetadata get() filterStatsCalculatorService, tableParameterCodec, partitionUpdateCodec, + partitionUpdateSmileCodec, typeTranslator, prestoVersion, new MetastoreHiveStatisticsProvider(metastore), diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HivePageSink.java b/presto-hive/src/main/java/com/facebook/presto/hive/HivePageSink.java index 025f3c8ca2ca3..2e137a29fa9dd 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HivePageSink.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HivePageSink.java @@ -15,6 +15,7 @@ import com.facebook.airlift.concurrent.MoreFutures; import com.facebook.airlift.json.JsonCodec; +import com.facebook.airlift.json.smile.SmileCodec; import com.facebook.airlift.log.Logger; import com.facebook.presto.common.Page; import com.facebook.presto.common.block.Block; @@ -60,7 +61,10 @@ import static com.facebook.presto.hive.HiveErrorCode.HIVE_TOO_MANY_OPEN_PARTITIONS; import static com.facebook.presto.hive.HiveErrorCode.HIVE_WRITER_CLOSE_ERROR; import static com.facebook.presto.hive.HiveSessionProperties.isFileRenamingEnabled; +import static com.facebook.presto.hive.HiveSessionProperties.isOptimizedPartitionUpdateSerializationEnabled; +import static com.facebook.presto.hive.HiveUtil.serializeZstdCompressed; import static com.facebook.presto.hive.PartitionUpdate.FileWriteInfo; +import static com.facebook.presto.hive.PartitionUpdate.mergePartitionUpdates; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableList.toImmutableList; @@ -95,6 +99,7 @@ public class HivePageSink private final ListeningExecutorService writeVerificationExecutor; private final JsonCodec partitionUpdateCodec; + private final SmileCodec partitionUpdateSmileCodec; private final List writers = new ArrayList<>(); @@ -120,6 +125,7 @@ public HivePageSink( int maxOpenWriters, ListeningExecutorService writeVerificationExecutor, JsonCodec partitionUpdateCodec, + SmileCodec partitionUpdateSmileCodec, ConnectorSession session, HiveMetadataUpdater hiveMetadataUpdater) { @@ -136,6 +142,7 @@ public HivePageSink( this.maxOpenWriters = maxOpenWriters; this.writeVerificationExecutor = requireNonNull(writeVerificationExecutor, "writeVerificationExecutor is null"); this.partitionUpdateCodec = requireNonNull(partitionUpdateCodec, "partitionUpdateCodec is null"); + this.partitionUpdateSmileCodec = requireNonNull(partitionUpdateSmileCodec, "partitionUpdateSmileCodec is null"); requireNonNull(bucketProperty, "bucketProperty is null"); this.pagePartitioner = new HiveWriterPagePartitioner( @@ -224,17 +231,41 @@ public CompletableFuture> finish() private ListenableFuture> doFinish() { - ImmutableList.Builder partitionUpdates = ImmutableList.builder(); + ImmutableList.Builder partitionUpdatesBuilder = ImmutableList.builder(); List> verificationTasks = new ArrayList<>(); for (HiveWriter writer : writers) { writer.commit(); - PartitionUpdate partitionUpdate = writer.getPartitionUpdate(); - partitionUpdates.add(wrappedBuffer(partitionUpdateCodec.toJsonBytes(partitionUpdate))); + partitionUpdatesBuilder.add(writer.getPartitionUpdate()); writer.getVerificationTask() .map(Executors::callable) .ifPresent(verificationTasks::add); } - List result = partitionUpdates.build(); + + List partitionUpdates = partitionUpdatesBuilder.build(); + boolean optimizedPartitionUpdateSerializationEnabled = isOptimizedPartitionUpdateSerializationEnabled(session); + if (optimizedPartitionUpdateSerializationEnabled) { + // Merge multiple partition updates for a single partition into one. + // Multiple partition updates for a single partition are produced when writing into a bucketed table. + // Merged partition updates will contain multiple items in the fileWriteInfos list (one per bucket). + // This optimization should be enabled only together with the optimized serialization (compression + binary encoding). + // Since serialized fragments will be transmitted as Presto pages serializing a merged partition update to JSON without + // compression is unsafe, as it may cross the maximum page size limit. + partitionUpdates = mergePartitionUpdates(partitionUpdates); + } + + ImmutableList.Builder serializedPartitionUpdatesBuilder = ImmutableList.builder(); + for (PartitionUpdate partitionUpdate : partitionUpdates) { + byte[] serializedBytes; + if (optimizedPartitionUpdateSerializationEnabled) { + serializedBytes = serializeZstdCompressed(partitionUpdateSmileCodec, partitionUpdate); + } + else { + serializedBytes = partitionUpdateCodec.toBytes(partitionUpdate); + } + serializedPartitionUpdatesBuilder.add(wrappedBuffer(serializedBytes)); + } + + List serializedPartitionUpdates = serializedPartitionUpdatesBuilder.build(); writtenBytes = writers.stream() .mapToLong(HiveWriter::getWrittenBytes) @@ -258,14 +289,14 @@ private ListenableFuture> doFinish() } if (verificationTasks.isEmpty()) { - return Futures.immediateFuture(result); + return Futures.immediateFuture(serializedPartitionUpdates); } try { List> futures = writeVerificationExecutor.invokeAll(verificationTasks).stream() .map(future -> (ListenableFuture) future) .collect(toList()); - return Futures.transform(Futures.allAsList(futures), input -> result, directExecutor()); + return Futures.transform(Futures.allAsList(futures), input -> serializedPartitionUpdates, directExecutor()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HivePageSinkProvider.java b/presto-hive/src/main/java/com/facebook/presto/hive/HivePageSinkProvider.java index e14c95bb599b5..58d137cf2478e 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HivePageSinkProvider.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HivePageSinkProvider.java @@ -15,6 +15,7 @@ import com.facebook.airlift.event.client.EventClient; import com.facebook.airlift.json.JsonCodec; +import com.facebook.airlift.json.smile.SmileCodec; import com.facebook.presto.common.type.TypeManager; import com.facebook.presto.hive.metastore.ExtendedHiveMetastore; import com.facebook.presto.hive.metastore.HivePageSinkMetadataProvider; @@ -67,6 +68,7 @@ public class HivePageSinkProvider private final LocationService locationService; private final ListeningExecutorService writeVerificationExecutor; private final JsonCodec partitionUpdateCodec; + private final SmileCodec partitionUpdateSmileCodec; private final NodeManager nodeManager; private final EventClient eventClient; private final HiveSessionProperties hiveSessionProperties; @@ -87,6 +89,7 @@ public HivePageSinkProvider( MetastoreClientConfig metastoreClientConfig, LocationService locationService, JsonCodec partitionUpdateCodec, + SmileCodec partitionUpdateSmileCodec, NodeManager nodeManager, EventClient eventClient, HiveSessionProperties hiveSessionProperties, @@ -108,6 +111,7 @@ public HivePageSinkProvider( this.locationService = requireNonNull(locationService, "locationService is null"); this.writeVerificationExecutor = listeningDecorator(newFixedThreadPool(hiveClientConfig.getWriteValidationThreads(), daemonThreadsNamed("hive-write-validation-%s"))); this.partitionUpdateCodec = requireNonNull(partitionUpdateCodec, "partitionUpdateCodec is null"); + this.partitionUpdateSmileCodec = requireNonNull(partitionUpdateSmileCodec, "partitionUpdateSmileCodec is null"); this.nodeManager = requireNonNull(nodeManager, "nodeManager is null"); this.eventClient = requireNonNull(eventClient, "eventClient is null"); this.hiveSessionProperties = requireNonNull(hiveSessionProperties, "hiveSessionProperties is null"); @@ -193,6 +197,7 @@ private ConnectorPageSink createPageSink(HiveWritableTableHandle handle, boolean maxOpenPartitions, writeVerificationExecutor, partitionUpdateCodec, + partitionUpdateSmileCodec, session, hiveMetadataUpdater); } diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java index 6458469d4ffd9..cd5aa5238952f 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java @@ -117,6 +117,7 @@ public final class HiveSessionProperties public static final String PREFER_MANIFESTS_TO_LIST_FILES = "prefer_manifests_to_list_files"; public static final String MANIFEST_VERIFICATION_ENABLED = "manifest_verification_enabled"; public static final String NEW_PARTITION_USER_SUPPLIED_PARAMETER = "new_partition_user_supplied_parameter"; + public static final String OPTIMIZED_PARTITION_UPDATE_SERIALIZATION_ENABLED = "optimized_partition_update_serialization_enabled"; private final List> sessionProperties; @@ -551,6 +552,11 @@ public HiveSessionProperties(HiveClientConfig hiveClientConfig, OrcFileWriterCon NEW_PARTITION_USER_SUPPLIED_PARAMETER, "\"user_supplied\" parameter added to all newly created partitions", null, + true), + booleanProperty( + OPTIMIZED_PARTITION_UPDATE_SERIALIZATION_ENABLED, + "Serialize PartitionUpdate objects using binary SMILE encoding and compress with the ZSTD compression", + hiveClientConfig.isOptimizedPartitionUpdateSerializationEnabled(), true)); } @@ -966,4 +972,9 @@ public static Optional getNewPartitionUserSuppliedParameter(ConnectorSes { return Optional.ofNullable(session.getProperty(NEW_PARTITION_USER_SUPPLIED_PARAMETER, String.class)); } + + public static boolean isOptimizedPartitionUpdateSerializationEnabled(ConnectorSession session) + { + return session.getProperty(OPTIMIZED_PARTITION_UPDATE_SERIALIZATION_ENABLED, Boolean.class); + } } diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveUtil.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveUtil.java index d6f52f6913b47..d115a678ad86d 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveUtil.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveUtil.java @@ -13,6 +13,7 @@ */ package com.facebook.presto.hive; +import com.facebook.airlift.json.Codec; import com.facebook.presto.common.predicate.NullableValue; import com.facebook.presto.common.type.CharType; import com.facebook.presto.common.type.DecimalType; @@ -39,6 +40,8 @@ import com.facebook.presto.spi.PrestoException; import com.facebook.presto.spi.RecordCursor; import com.facebook.presto.spi.SchemaTableName; +import com.github.luben.zstd.ZstdInputStreamNoFinalizer; +import com.github.luben.zstd.ZstdOutputStreamNoFinalizer; import com.google.common.base.Joiner; import com.google.common.base.Predicate; import com.google.common.base.Splitter; @@ -86,7 +89,11 @@ import javax.annotation.Nullable; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; import java.lang.annotation.Annotation; import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; @@ -240,8 +247,8 @@ private HiveUtil() } schema.stringPropertyNames().stream() - .filter(schemaFilter) - .forEach(name -> jobConf.set(name, schema.getProperty(name))); + .filter(schemaFilter) + .forEach(name -> jobConf.set(name, schema.getProperty(name))); // add Airlift LZO and LZOP to head of codecs list so as to not override existing entries List codecs = newArrayList(Splitter.on(",").trimResults().omitEmptyStrings().split(jobConf.get("io.compression.codecs", ""))); @@ -361,9 +368,9 @@ public static boolean shouldUseRecordReaderFromInputFormat(Configuration configu { InputFormat inputFormat = HiveUtil.getInputFormat(configuration, storage.getStorageFormat().getInputFormat(), false); return Arrays.stream(inputFormat.getClass().getAnnotations()) - .map(Annotation::annotationType) - .map(Class::getSimpleName) - .anyMatch(name -> name.equals("UseRecordReaderFromInputFormat")); + .map(Annotation::annotationType) + .map(Class::getSimpleName) + .anyMatch(name -> name.equals("UseRecordReaderFromInputFormat")); } public static long parseHiveDate(String value) @@ -1169,4 +1176,29 @@ private static TypeSignature translateHiveUnsupportedTypeSignatureForTemporaryTa return typeSignature; } + + public static byte[] serializeZstdCompressed(Codec codec, T instance) + { + try (ByteArrayOutputStream output = new ByteArrayOutputStream(); + ZstdOutputStreamNoFinalizer zstdOutput = new ZstdOutputStreamNoFinalizer(output)) { + codec.writeBytes(zstdOutput, instance); + zstdOutput.close(); + output.close(); + return output.toByteArray(); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + public static T deserializeZstdCompressed(Codec codec, byte[] bytes) + { + try (InputStream input = new ByteArrayInputStream(bytes); + ZstdInputStreamNoFinalizer zstdInput = new ZstdInputStreamNoFinalizer(input)) { + return codec.readBytes(zstdInput); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } } diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveClient.java b/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveClient.java index 28e1fe8e5c3c9..d3ca012f7dc74 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveClient.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveClient.java @@ -966,6 +966,7 @@ protected final void setup(String databaseName, HiveClientConfig hiveClientConfi FILTER_STATS_CALCULATOR_SERVICE, new TableParameterCodec(), HiveTestUtils.PARTITION_UPDATE_CODEC, + HiveTestUtils.PARTITION_UPDATE_SMILE_CODEC, listeningDecorator(executor), new HiveTypeTranslator(), new HiveStagingFileCommitter(hdfsEnvironment, listeningDecorator(executor)), @@ -1005,6 +1006,7 @@ protected final void setup(String databaseName, HiveClientConfig hiveClientConfi getMetastoreClientConfig(), locationService, HiveTestUtils.PARTITION_UPDATE_CODEC, + HiveTestUtils.PARTITION_UPDATE_SMILE_CODEC, new TestingNodeManager("fake-environment"), new HiveEventClient(), new HiveSessionProperties(hiveClientConfig, new OrcFileWriterConfig(), new ParquetFileWriterConfig()), diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveFileSystem.java b/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveFileSystem.java index a8bd7fb11c46f..b38fceedab9e8 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveFileSystem.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveFileSystem.java @@ -14,7 +14,6 @@ package com.facebook.presto.hive; import com.facebook.airlift.concurrent.BoundedExecutor; -import com.facebook.airlift.json.JsonCodec; import com.facebook.airlift.stats.CounterStat; import com.facebook.presto.GroupByHashPageIndexerFactory; import com.facebook.presto.cache.CacheConfig; @@ -190,7 +189,6 @@ protected void setup(String host, int port, String databaseName, BiFunction partitionUpdateCodec = JsonCodec.jsonCodec(PartitionUpdate.class); metadataFactory = new HiveMetadataFactory( config, metastoreClientConfig, @@ -204,7 +202,8 @@ protected void setup(String host, int port, String databaseName, BiFunction PARTITION_UPDATE_CODEC = jsonCodec(PartitionUpdate.class); + public static final SmileCodec PARTITION_UPDATE_SMILE_CODEC = smileCodec(PartitionUpdate.class); public static final ConnectorSession SESSION = new TestingConnectorSession( new HiveSessionProperties(new HiveClientConfig(), new OrcFileWriterConfig(), new ParquetFileWriterConfig()).getSessionProperties()); diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveClientConfig.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveClientConfig.java index 08b3acb16420f..d4054e3230460 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveClientConfig.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveClientConfig.java @@ -150,7 +150,8 @@ public void testDefaults() .setFileRenamingEnabled(false) .setPreferManifestsToListFiles(false) .setManifestVerificationEnabled(false) - .setUndoMetastoreOperationsEnabled(true)); + .setUndoMetastoreOperationsEnabled(true) + .setOptimizedPartitionUpdateSerializationEnabled(false)); } @Test @@ -262,6 +263,7 @@ public void testExplicitPropertyMappings() .put("hive.prefer-manifests-to-list-files", "true") .put("hive.manifest-verification-enabled", "true") .put("hive.undo-metastore-operations-enabled", "false") + .put("hive.experimental-optimized-partition-update-serialization-enabled", "true") .build(); HiveClientConfig expected = new HiveClientConfig() @@ -369,7 +371,8 @@ public void testExplicitPropertyMappings() .setFileRenamingEnabled(true) .setPreferManifestsToListFiles(true) .setManifestVerificationEnabled(true) - .setUndoMetastoreOperationsEnabled(false); + .setUndoMetastoreOperationsEnabled(false) + .setOptimizedPartitionUpdateSerializationEnabled(true); ConfigAssertions.assertFullMapping(properties, expected); } diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveIntegrationSmokeTest.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveIntegrationSmokeTest.java index ce88233eb27eb..0b4ae0d17babc 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveIntegrationSmokeTest.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveIntegrationSmokeTest.java @@ -100,6 +100,7 @@ import static com.facebook.presto.hive.HiveQueryRunner.createMaterializeExchangesSession; import static com.facebook.presto.hive.HiveSessionProperties.FILE_RENAMING_ENABLED; import static com.facebook.presto.hive.HiveSessionProperties.MANIFEST_VERIFICATION_ENABLED; +import static com.facebook.presto.hive.HiveSessionProperties.OPTIMIZED_PARTITION_UPDATE_SERIALIZATION_ENABLED; import static com.facebook.presto.hive.HiveSessionProperties.PREFER_MANIFESTS_TO_LIST_FILES; import static com.facebook.presto.hive.HiveSessionProperties.PUSHDOWN_FILTER_ENABLED; import static com.facebook.presto.hive.HiveSessionProperties.RCFILE_OPTIMIZED_WRITER_ENABLED; @@ -795,10 +796,15 @@ public void testCreateTableNonSupportedVarcharColumn() public void testCreatePartitionedBucketedTableAsFewRows() { // go through all storage formats to make sure the empty buckets are correctly created - testWithAllStorageFormats(this::testCreatePartitionedBucketedTableAsFewRows); + testWithAllStorageFormats((session, format) -> testCreatePartitionedBucketedTableAsFewRows(session, format, false)); + // test with optimized PartitionUpdate serialization + testWithAllStorageFormats((session, format) -> testCreatePartitionedBucketedTableAsFewRows(session, format, true)); } - private void testCreatePartitionedBucketedTableAsFewRows(Session session, HiveStorageFormat storageFormat) + private void testCreatePartitionedBucketedTableAsFewRows( + Session session, + HiveStorageFormat storageFormat, + boolean optimizedPartitionUpdateSerializationEnabled) { String tableName = "test_create_partitioned_bucketed_table_as_few_rows"; @@ -821,7 +827,7 @@ private void testCreatePartitionedBucketedTableAsFewRows(Session session, HiveSt assertUpdate( // make sure that we will get one file per bucket regardless of writer count configured - getParallelWriteSession(), + getTableWriteTestingSession(optimizedPartitionUpdateSerializationEnabled), createTable, 3); @@ -839,10 +845,11 @@ private void testCreatePartitionedBucketedTableAsFewRows(Session session, HiveSt @Test public void testCreatePartitionedBucketedTableAs() { - testCreatePartitionedBucketedTableAs(HiveStorageFormat.RCBINARY); + testCreatePartitionedBucketedTableAs(HiveStorageFormat.RCBINARY, false); + testCreatePartitionedBucketedTableAs(HiveStorageFormat.RCBINARY, true); } - private void testCreatePartitionedBucketedTableAs(HiveStorageFormat storageFormat) + private void testCreatePartitionedBucketedTableAs(HiveStorageFormat storageFormat, boolean optimizedPartitionUpdateSerializationEnabled) { String tableName = "test_create_partitioned_bucketed_table_as"; @@ -860,7 +867,7 @@ private void testCreatePartitionedBucketedTableAs(HiveStorageFormat storageForma assertUpdate( // make sure that we will get one file per bucket regardless of writer count configured - getParallelWriteSession(), + getTableWriteTestingSession(optimizedPartitionUpdateSerializationEnabled), createTable, "SELECT count(*) from orders"); @@ -873,10 +880,11 @@ private void testCreatePartitionedBucketedTableAs(HiveStorageFormat storageForma @Test public void testCreatePartitionedBucketedTableAsWithUnionAll() { - testCreatePartitionedBucketedTableAsWithUnionAll(HiveStorageFormat.RCBINARY); + testCreatePartitionedBucketedTableAsWithUnionAll(HiveStorageFormat.RCBINARY, false); + testCreatePartitionedBucketedTableAsWithUnionAll(HiveStorageFormat.RCBINARY, true); } - private void testCreatePartitionedBucketedTableAsWithUnionAll(HiveStorageFormat storageFormat) + private void testCreatePartitionedBucketedTableAsWithUnionAll(HiveStorageFormat storageFormat, boolean optimizedPartitionUpdateSerializationEnabled) { String tableName = "test_create_partitioned_bucketed_table_as_with_union_all"; @@ -899,7 +907,7 @@ private void testCreatePartitionedBucketedTableAsWithUnionAll(HiveStorageFormat assertUpdate( // make sure that we will get one file per bucket regardless of writer count configured - getParallelWriteSession(), + getTableWriteTestingSession(optimizedPartitionUpdateSerializationEnabled), createTable, "SELECT count(*) from orders"); @@ -996,10 +1004,12 @@ public void testCreatePartitionedUnionAll() public void testInsertPartitionedBucketedTableFewRows() { // go through all storage formats to make sure the empty buckets are correctly created - testWithAllStorageFormats(this::testInsertPartitionedBucketedTableFewRows); + testWithAllStorageFormats((session, format) -> testInsertPartitionedBucketedTableFewRows(session, format, false)); + // test with optimized PartitionUpdate serialization + testWithAllStorageFormats((session, format) -> testInsertPartitionedBucketedTableFewRows(session, format, true)); } - private void testInsertPartitionedBucketedTableFewRows(Session session, HiveStorageFormat storageFormat) + private void testInsertPartitionedBucketedTableFewRows(Session session, HiveStorageFormat storageFormat, boolean optimizedPartitionUpdateSerializationEnabled) { String tableName = "test_insert_partitioned_bucketed_table_few_rows"; @@ -1016,7 +1026,7 @@ private void testInsertPartitionedBucketedTableFewRows(Session session, HiveStor assertUpdate( // make sure that we will get one file per bucket regardless of writer count configured - getParallelWriteSession(), + getTableWriteTestingSession(optimizedPartitionUpdateSerializationEnabled), "INSERT INTO " + tableName + " " + "VALUES " + " (VARCHAR 'a', VARCHAR 'b', VARCHAR 'c'), " + @@ -1081,6 +1091,12 @@ public void testCastNullToColumnTypes() @Test public void testCreateEmptyNonBucketedPartition() + { + testCreateEmptyNonBucketedPartition(false); + testCreateEmptyNonBucketedPartition(true); + } + + public void testCreateEmptyNonBucketedPartition(boolean optimizedPartitionUpdateSerializationEnabled) { String tableName = "test_insert_empty_partitioned_unbucketed_table"; assertUpdate("" + @@ -1094,7 +1110,11 @@ public void testCreateEmptyNonBucketedPartition() assertQuery(format("SELECT count(*) FROM \"%s$partitions\"", tableName), "SELECT 0"); // create an empty partition - assertUpdate(format("CALL system.create_empty_partition('%s', '%s', ARRAY['part'], ARRAY['%s'])", TPCH_SCHEMA, tableName, "empty")); + assertUpdate( + Session.builder(getSession()) + .setCatalogSessionProperty(catalog, OPTIMIZED_PARTITION_UPDATE_SERIALIZATION_ENABLED, optimizedPartitionUpdateSerializationEnabled + "") + .build(), + format("CALL system.create_empty_partition('%s', '%s', ARRAY['part'], ARRAY['%s'])", TPCH_SCHEMA, tableName, "empty")); assertQuery(format("SELECT count(*) FROM \"%s$partitions\"", tableName), "SELECT 1"); assertUpdate("DROP TABLE " + tableName); } @@ -1121,11 +1141,12 @@ public void testCreateEmptyUnpartitionedBucketedTable() public void testCreateEmptyBucketedPartition() { for (TestingHiveStorageFormat storageFormat : getAllTestingHiveStorageFormat()) { - testCreateEmptyBucketedPartition(storageFormat.getFormat()); + testCreateEmptyBucketedPartition(storageFormat.getFormat(), false); + testCreateEmptyBucketedPartition(storageFormat.getFormat(), true); } } - public void testCreateEmptyBucketedPartition(HiveStorageFormat storageFormat) + public void testCreateEmptyBucketedPartition(HiveStorageFormat storageFormat, boolean optimizedPartitionUpdateSerializationEnabled) { String tableName = "test_insert_empty_partitioned_bucketed_table"; createPartitionedBucketedTable(tableName, storageFormat); @@ -1133,7 +1154,11 @@ public void testCreateEmptyBucketedPartition(HiveStorageFormat storageFormat) List orderStatusList = ImmutableList.of("F", "O", "P"); for (int i = 0; i < orderStatusList.size(); i++) { String sql = format("CALL system.create_empty_partition('%s', '%s', ARRAY['orderstatus'], ARRAY['%s'])", TPCH_SCHEMA, tableName, orderStatusList.get(i)); - assertUpdate(sql); + assertUpdate( + Session.builder(getSession()) + .setCatalogSessionProperty(catalog, OPTIMIZED_PARTITION_UPDATE_SERIALIZATION_ENABLED, optimizedPartitionUpdateSerializationEnabled + "") + .build(), + sql); assertQuery( format("SELECT count(*) FROM \"%s$partitions\"", tableName), "SELECT " + (i + 1)); @@ -1148,10 +1173,11 @@ public void testCreateEmptyBucketedPartition(HiveStorageFormat storageFormat) @Test public void testInsertPartitionedBucketedTable() { - testInsertPartitionedBucketedTable(HiveStorageFormat.RCBINARY); + testInsertPartitionedBucketedTable(HiveStorageFormat.RCBINARY, false); + testInsertPartitionedBucketedTable(HiveStorageFormat.RCBINARY, true); } - private void testInsertPartitionedBucketedTable(HiveStorageFormat storageFormat) + private void testInsertPartitionedBucketedTable(HiveStorageFormat storageFormat, boolean optimizedPartitionUpdateSerializationEnabled) { String tableName = "test_insert_partitioned_bucketed_table"; createPartitionedBucketedTable(tableName, storageFormat); @@ -1161,7 +1187,7 @@ private void testInsertPartitionedBucketedTable(HiveStorageFormat storageFormat) String orderStatus = orderStatusList.get(i); assertUpdate( // make sure that we will get one file per bucket regardless of writer count configured - getParallelWriteSession(), + getTableWriteTestingSession(optimizedPartitionUpdateSerializationEnabled), format( "INSERT INTO " + tableName + " " + "SELECT custkey, custkey AS custkey2, comment, orderstatus " + @@ -1195,10 +1221,11 @@ private void createPartitionedBucketedTable(String tableName, HiveStorageFormat @Test public void testInsertPartitionedBucketedTableWithUnionAll() { - testInsertPartitionedBucketedTableWithUnionAll(HiveStorageFormat.RCBINARY); + testInsertPartitionedBucketedTableWithUnionAll(HiveStorageFormat.RCBINARY, false); + testInsertPartitionedBucketedTableWithUnionAll(HiveStorageFormat.RCBINARY, true); } - private void testInsertPartitionedBucketedTableWithUnionAll(HiveStorageFormat storageFormat) + private void testInsertPartitionedBucketedTableWithUnionAll(HiveStorageFormat storageFormat, boolean optimizedPartitionUpdateSerializationEnabled) { String tableName = "test_insert_partitioned_bucketed_table_with_union_all"; @@ -1219,7 +1246,7 @@ private void testInsertPartitionedBucketedTableWithUnionAll(HiveStorageFormat st String orderStatus = orderStatusList.get(i); assertUpdate( // make sure that we will get one file per bucket regardless of writer count configured - getParallelWriteSession(), + getTableWriteTestingSession(optimizedPartitionUpdateSerializationEnabled), format( "INSERT INTO " + tableName + " " + "SELECT custkey, custkey AS custkey2, comment, orderstatus " + @@ -5520,10 +5547,11 @@ protected String withRetentionDays(int days) return ""; } - private Session getParallelWriteSession() + private Session getTableWriteTestingSession(boolean optimizedPartitionUpdateSerializationEnabled) { return Session.builder(getSession()) .setSystemProperty("task_writer_count", "4") + .setCatalogSessionProperty(catalog, OPTIMIZED_PARTITION_UPDATE_SERIALIZATION_ENABLED, optimizedPartitionUpdateSerializationEnabled + "") .build(); } diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveMetadataFileFormatEncryptionSettings.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveMetadataFileFormatEncryptionSettings.java index 56957958c4318..6ff087787051e 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveMetadataFileFormatEncryptionSettings.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveMetadataFileFormatEncryptionSettings.java @@ -72,6 +72,7 @@ import static com.facebook.presto.hive.HiveTestUtils.HDFS_ENVIRONMENT; import static com.facebook.presto.hive.HiveTestUtils.HIVE_CLIENT_CONFIG; import static com.facebook.presto.hive.HiveTestUtils.PARTITION_UPDATE_CODEC; +import static com.facebook.presto.hive.HiveTestUtils.PARTITION_UPDATE_SMILE_CODEC; import static com.facebook.presto.hive.HiveTestUtils.ROW_EXPRESSION_SERVICE; import static com.facebook.presto.hive.PartitionUpdate.UpdateMode.APPEND; import static com.facebook.presto.hive.PartitionUpdate.UpdateMode.NEW; @@ -127,6 +128,7 @@ public void setup() FILTER_STATS_CALCULATOR_SERVICE, new TableParameterCodec(), PARTITION_UPDATE_CODEC, + PARTITION_UPDATE_SMILE_CODEC, listeningDecorator(executor), new HiveTypeTranslator(), new HiveStagingFileCommitter(HDFS_ENVIRONMENT, listeningDecorator(executor)), diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHivePageSink.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHivePageSink.java index b04f38295c824..5e12ab781a345 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHivePageSink.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHivePageSink.java @@ -13,7 +13,6 @@ */ package com.facebook.presto.hive; -import com.facebook.airlift.json.JsonCodec; import com.facebook.presto.GroupByHashPageIndexerFactory; import com.facebook.presto.common.Page; import com.facebook.presto.common.PageBuilder; @@ -302,7 +301,6 @@ private static ConnectorPageSink createPageSink(HiveTransactionHandle transactio "test", ImmutableMap.of(), Optional.empty()); - JsonCodec partitionUpdateCodec = JsonCodec.jsonCodec(PartitionUpdate.class); HdfsEnvironment hdfsEnvironment = createTestHdfsEnvironment(config, metastoreClientConfig); HivePageSinkProvider provider = new HivePageSinkProvider( getDefaultHiveFileWriterFactories(config, metastoreClientConfig), @@ -314,7 +312,8 @@ private static ConnectorPageSink createPageSink(HiveTransactionHandle transactio config, metastoreClientConfig, new HiveLocationService(hdfsEnvironment), - partitionUpdateCodec, + HiveTestUtils.PARTITION_UPDATE_CODEC, + HiveTestUtils.PARTITION_UPDATE_SMILE_CODEC, new TestingNodeManager("fake-environment"), new HiveEventClient(), new HiveSessionProperties(config, new OrcFileWriterConfig(), new ParquetFileWriterConfig()), diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveRecoverableExecution.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveRecoverableExecution.java index 59d6e8ae37f95..a999f2b0367a8 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveRecoverableExecution.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveRecoverableExecution.java @@ -52,6 +52,7 @@ import static com.facebook.presto.SystemSessionProperties.TASK_WRITER_COUNT; import static com.facebook.presto.hive.HiveQueryRunner.HIVE_CATALOG; import static com.facebook.presto.hive.HiveQueryRunner.TPCH_BUCKETED_SCHEMA; +import static com.facebook.presto.hive.HiveSessionProperties.OPTIMIZED_PARTITION_UPDATE_SERIALIZATION_ENABLED; import static com.facebook.presto.hive.HiveSessionProperties.VIRTUAL_BUCKET_COUNT; import static com.facebook.presto.spi.security.SelectedRole.Type.ALL; import static com.facebook.presto.spi.security.SelectedRole.Type.ROLE; @@ -136,19 +137,20 @@ public void shutdown() queryRunner = null; } - @DataProvider(name = "writerConcurrency") - public static Object[][] writerConcurrency() + @DataProvider(name = "testSettings") + public static Object[][] testSettings() { - return new Object[][] {{1}, {2}}; + return new Object[][] {{1, true}, {2, false}, {2, true}}; } - @Test(timeOut = TEST_TIMEOUT, dataProvider = "writerConcurrency", invocationCount = INVOCATION_COUNT) - public void testCreateBucketedTable(int writerConcurrency) + @Test(timeOut = TEST_TIMEOUT, dataProvider = "testSettings", invocationCount = INVOCATION_COUNT) + public void testCreateBucketedTable(int writerConcurrency, boolean optimizedPartitionUpdateSerializationEnabled) throws Exception { testRecoverableGroupedExecution( queryRunner, writerConcurrency, + optimizedPartitionUpdateSerializationEnabled, ImmutableList.of( "CREATE TABLE create_bucketed_table_1\n" + "WITH (bucket_count = 13, bucketed_by = ARRAY['key1']) AS\n" + @@ -182,13 +184,14 @@ public void testCreateBucketedTable(int writerConcurrency) "DROP TABLE IF EXISTS create_bucketed_table_failure")); } - @Test(timeOut = TEST_TIMEOUT, dataProvider = "writerConcurrency", invocationCount = INVOCATION_COUNT) - public void testInsertBucketedTable(int writerConcurrency) + @Test(timeOut = TEST_TIMEOUT, dataProvider = "testSettings", invocationCount = INVOCATION_COUNT) + public void testInsertBucketedTable(int writerConcurrency, boolean optimizedPartitionUpdateSerializationEnabled) throws Exception { testRecoverableGroupedExecution( queryRunner, writerConcurrency, + optimizedPartitionUpdateSerializationEnabled, ImmutableList.of( "CREATE TABLE insert_bucketed_table_1\n" + "WITH (bucket_count = 13, bucketed_by = ARRAY['key1']) AS\n" + @@ -226,13 +229,14 @@ public void testInsertBucketedTable(int writerConcurrency) "DROP TABLE IF EXISTS insert_bucketed_table_failure")); } - @Test(timeOut = TEST_TIMEOUT, dataProvider = "writerConcurrency", invocationCount = INVOCATION_COUNT) - public void testCreateUnbucketedTableWithGroupedExecution(int writerConcurrency) + @Test(timeOut = TEST_TIMEOUT, dataProvider = "testSettings", invocationCount = INVOCATION_COUNT) + public void testCreateUnbucketedTableWithGroupedExecution(int writerConcurrency, boolean optimizedPartitionUpdateSerializationEnabled) throws Exception { testRecoverableGroupedExecution( queryRunner, writerConcurrency, + optimizedPartitionUpdateSerializationEnabled, ImmutableList.of( "CREATE TABLE create_unbucketed_table_with_grouped_execution_1\n" + "WITH (bucket_count = 13, bucketed_by = ARRAY['key1']) AS\n" + @@ -266,13 +270,14 @@ public void testCreateUnbucketedTableWithGroupedExecution(int writerConcurrency) "DROP TABLE IF EXISTS create_unbucketed_table_with_grouped_execution_failure")); } - @Test(timeOut = TEST_TIMEOUT, dataProvider = "writerConcurrency", invocationCount = INVOCATION_COUNT) - public void testInsertUnbucketedTableWithGroupedExecution(int writerConcurrency) + @Test(timeOut = TEST_TIMEOUT, dataProvider = "testSettings", invocationCount = INVOCATION_COUNT) + public void testInsertUnbucketedTableWithGroupedExecution(int writerConcurrency, boolean optimizedPartitionUpdateSerializationEnabled) throws Exception { testRecoverableGroupedExecution( queryRunner, writerConcurrency, + optimizedPartitionUpdateSerializationEnabled, ImmutableList.of( "CREATE TABLE insert_unbucketed_table_with_grouped_execution_1\n" + "WITH (bucket_count = 13, bucketed_by = ARRAY['key1']) AS\n" + @@ -317,6 +322,7 @@ public void testCountOnUnbucketedTable() testRecoverableGroupedExecution( queryRunner, 4, + true, ImmutableList.of( "CREATE TABLE test_table AS\n" + "SELECT orderkey, comment\n" + @@ -335,6 +341,7 @@ public void testCountOnUnbucketedTable() private void testRecoverableGroupedExecution( DistributedQueryRunner queryRunner, int writerConcurrency, + boolean optimizedPartitionUpdateSerializationEnabled, List preQueries, @Language("SQL") String queryWithoutFailure, @Language("SQL") String queryWithFailure, @@ -344,7 +351,7 @@ private void testRecoverableGroupedExecution( { waitUntilAllNodesAreHealthy(queryRunner, new Duration(10, SECONDS)); - Session recoverableSession = createRecoverableSession(writerConcurrency); + Session recoverableSession = createRecoverableSession(writerConcurrency, optimizedPartitionUpdateSerializationEnabled); for (@Language("SQL") String postQuery : postQueries) { queryRunner.execute(recoverableSession, postQuery); } @@ -423,7 +430,7 @@ private static void cancelAllTasks(TestingPrestoServer server) server.getTaskManager().getAllTaskInfo().forEach(task -> server.getTaskManager().cancelTask(task.getTaskId())); } - private static Session createRecoverableSession(int writerConcurrency) + private static Session createRecoverableSession(int writerConcurrency, boolean optimizedPartitionUpdateSerializationEnabled) { Identity identity = new Identity( "hive", @@ -449,6 +456,7 @@ private static Session createRecoverableSession(int writerConcurrency) .setSystemProperty(HASH_PARTITION_COUNT, "11") .setSystemProperty(MAX_STAGE_RETRIES, "4") .setCatalogSessionProperty(HIVE_CATALOG, VIRTUAL_BUCKET_COUNT, "16") + .setCatalogSessionProperty(HIVE_CATALOG, OPTIMIZED_PARTITION_UPDATE_SERIALIZATION_ENABLED, optimizedPartitionUpdateSerializationEnabled + "") .setCatalog(HIVE_CATALOG) .setSchema(TPCH_BUCKETED_SCHEMA) .build(); diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveSplitManager.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveSplitManager.java index ed3edefa97d27..d3695ade02a98 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveSplitManager.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveSplitManager.java @@ -501,6 +501,7 @@ private void assertRedundantColumnDomains(Range predicateRange, PartitionStatist FILTER_STATS_CALCULATOR_SERVICE, new TableParameterCodec(), HiveTestUtils.PARTITION_UPDATE_CODEC, + HiveTestUtils.PARTITION_UPDATE_SMILE_CODEC, executor, new HiveTypeTranslator(), new HiveStagingFileCommitter(hdfsEnvironment, executor), diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkQueryExecutionFactory.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkQueryExecutionFactory.java index 9e0f8b598870e..606812dbfd9f6 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkQueryExecutionFactory.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkQueryExecutionFactory.java @@ -1057,17 +1057,46 @@ private List> doExecute(Su waitForActionsCompletionWithTimeout(inputFutures.values(), computeNextTimeout(queryCompletionDeadline), MILLISECONDS, waitTimeMetrics); - Map> inputs = inputFutures.entrySet().stream() - .collect(toImmutableMap( - Map.Entry::getKey, - entry -> getUnchecked(entry.getValue()).stream().map(Tuple2::_2).collect(toImmutableList()))); + ImmutableMap.Builder> inputs = ImmutableMap.builder(); + long totalNumberOfPagesReceived = 0; + long totalCompressedSizeInBytes = 0; + long totalUncompressedSizeInBytes = 0; + for (Map.Entry>>> inputFuture : inputFutures.entrySet()) { + // Use a mutable list to allow memory release on per page basis + List pages = new ArrayList<>(); + List> tuples = getUnchecked(inputFuture.getValue()); + long currentFragmentOutputCompressedSizeInBytes = 0; + long currentFragmentOutputUncompressedSizeInBytes = 0; + for (Tuple2 tuple : tuples) { + PrestoSparkSerializedPage page = tuple._2; + currentFragmentOutputCompressedSizeInBytes += page.getSize(); + currentFragmentOutputUncompressedSizeInBytes += page.getUncompressedSizeInBytes(); + pages.add(page); + } + log.info( + "Received %s pages from fragment %s. Compressed size: %s. Uncompressed size: %s.", + pages.size(), + inputFuture.getKey(), + DataSize.succinctBytes(currentFragmentOutputCompressedSizeInBytes), + DataSize.succinctBytes(currentFragmentOutputUncompressedSizeInBytes)); + totalNumberOfPagesReceived += pages.size(); + totalCompressedSizeInBytes += currentFragmentOutputCompressedSizeInBytes; + totalUncompressedSizeInBytes += currentFragmentOutputUncompressedSizeInBytes; + inputs.put(inputFuture.getKey(), pages); + } + + log.info( + "Received %s pages in total. Compressed size: %s. Uncompressed size: %s.", + totalNumberOfPagesReceived, + DataSize.succinctBytes(totalCompressedSizeInBytes), + DataSize.succinctBytes(totalUncompressedSizeInBytes)); IPrestoSparkTaskExecutor prestoSparkTaskExecutor = taskExecutorFactory.create( 0, 0, serializedTaskDescriptor, emptyScalaIterator(), - new PrestoSparkTaskInputs(ImmutableMap.of(), ImmutableMap.of(), inputs), + new PrestoSparkTaskInputs(ImmutableMap.of(), ImmutableMap.of(), inputs.build()), taskInfoCollector, shuffleStatsCollector, PrestoSparkSerializedPage.class); diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkTaskExecutorFactory.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkTaskExecutorFactory.java index efde884e5ed36..087da7b8e5b56 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkTaskExecutorFactory.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkTaskExecutorFactory.java @@ -124,6 +124,7 @@ import static com.facebook.presto.spark.PrestoSparkSessionProperties.getStorageBasedBroadcastJoinWriteBufferSize; import static com.facebook.presto.spark.classloader_interface.PrestoSparkShuffleStats.Operation.WRITE; import static com.facebook.presto.spark.util.PrestoSparkUtils.deserializeZstdCompressed; +import static com.facebook.presto.spark.util.PrestoSparkUtils.getNullifyingIterator; import static com.facebook.presto.spark.util.PrestoSparkUtils.serializeZstdCompressed; import static com.facebook.presto.spark.util.PrestoSparkUtils.toPrestoSparkSerializedPage; import static com.facebook.presto.sql.planner.SystemPartitioningHandle.FIXED_ARBITRARY_DISTRIBUTION; @@ -416,7 +417,8 @@ public IPrestoSparkTaskExecutor doCreate( } if (inMemoryInput != null) { - remoteSourcePageInputs.add(inMemoryInput.iterator()); + // for inmemory inputs pages can be released incrementally to save memory + remoteSourcePageInputs.add(getNullifyingIterator(inMemoryInput)); continue; } diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/util/PrestoSparkUtils.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/util/PrestoSparkUtils.java index 23be9f8665406..097628cb0b938 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/util/PrestoSparkUtils.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/util/PrestoSparkUtils.java @@ -24,6 +24,7 @@ import com.github.luben.zstd.Zstd; import com.github.luben.zstd.ZstdInputStreamNoFinalizer; import com.github.luben.zstd.ZstdOutputStreamNoFinalizer; +import com.google.common.collect.AbstractIterator; import com.google.common.util.concurrent.UncheckedExecutionException; import io.airlift.slice.Slice; import io.airlift.slice.Slices; @@ -38,6 +39,8 @@ import java.io.InputStream; import java.io.UncheckedIOException; import java.nio.ByteBuffer; +import java.util.Iterator; +import java.util.List; import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutionException; @@ -248,4 +251,24 @@ public static ClassTag classTag(Class clazz) { return scala.reflect.ClassTag$.MODULE$.apply(clazz); } + + public static Iterator getNullifyingIterator(List list) + { + return new AbstractIterator() + { + private int index; + + @Override + protected T computeNext() + { + if (index >= list.size()) { + return endOfData(); + } + T element = list.get(index); + list.set(index, null); + index++; + return element; + } + }; + } } diff --git a/presto-spark-base/src/test/java/com/facebook/presto/spark/PrestoSparkQueryRunner.java b/presto-spark-base/src/test/java/com/facebook/presto/spark/PrestoSparkQueryRunner.java index 32f425f36a1af..05cd7fa8d16b1 100644 --- a/presto-spark-base/src/test/java/com/facebook/presto/spark/PrestoSparkQueryRunner.java +++ b/presto-spark-base/src/test/java/com/facebook/presto/spark/PrestoSparkQueryRunner.java @@ -271,7 +271,8 @@ public PrestoSparkQueryRunner(String defaultCatalog, Map additio metastore.createDatabase(METASTORE_CONTEXT, createDatabaseMetastoreObject("hive_test")); pluginManager.installPlugin(new HivePlugin("hive", Optional.of(metastore))); - connectorManager.createConnection("hive", "hive", ImmutableMap.of()); + connectorManager.createConnection("hive", "hive", ImmutableMap.of( + "hive.experimental-optimized-partition-update-serialization-enabled", "true")); metadata.registerBuiltInFunctions(AbstractTestQueries.CUSTOM_FUNCTIONS); diff --git a/presto-spark-testing/src/test/java/com/facebook/presto/spark/testing/TestPrestoSparkLauncherIntegrationSmokeTest.java b/presto-spark-testing/src/test/java/com/facebook/presto/spark/testing/TestPrestoSparkLauncherIntegrationSmokeTest.java index 9b51226f418a9..df53ff8871a18 100644 --- a/presto-spark-testing/src/test/java/com/facebook/presto/spark/testing/TestPrestoSparkLauncherIntegrationSmokeTest.java +++ b/presto-spark-testing/src/test/java/com/facebook/presto/spark/testing/TestPrestoSparkLauncherIntegrationSmokeTest.java @@ -136,7 +136,8 @@ public void setUp() hiveConnectorFactory, ImmutableMap.of( "hive.metastore.uri", "thrift://127.0.0.1:9083", - "hive.time-zone", TIME_ZONE.getID())); + "hive.time-zone", TIME_ZONE.getID(), + "hive.experimental-optimized-partition-update-serialization-enabled", "true")); localQueryRunner.createCatalog("tpch", new TpchConnectorFactory(), ImmutableMap.of()); // it may take some time for the docker container to start ensureHiveIsRunning(localQueryRunner, new Duration(10, MINUTES));