From 17e13717e27c3c5050e882c45f52147b86f213dc Mon Sep 17 00:00:00 2001
From: David Phillips
Date: Tue, 11 Jul 2023 15:50:07 -0700
Subject: [PATCH] Use Hive native writers for creating empty bucket files

---
 .../hive/s3select/S3SelectTestHelper.java     |   2 +
 .../io/trino/plugin/hive/HiveMetadata.java    | 114 ++++++------------
 .../plugin/hive/HiveMetadataFactory.java      |   7 ++
 .../trino/plugin/hive/AbstractTestHive.java   |   1 +
 .../hive/AbstractTestHiveFileSystem.java      |   1 +
 .../plugin/hive/BaseHiveConnectorTest.java    |  16 +--
 6 files changed, 52 insertions(+), 89 deletions(-)

diff --git a/plugin/trino-hive-hadoop2/src/test/java/io/trino/plugin/hive/s3select/S3SelectTestHelper.java b/plugin/trino-hive-hadoop2/src/test/java/io/trino/plugin/hive/s3select/S3SelectTestHelper.java
index a6ef12f7c602..00a8d48dbe0f 100644
--- a/plugin/trino-hive-hadoop2/src/test/java/io/trino/plugin/hive/s3select/S3SelectTestHelper.java
+++ b/plugin/trino-hive-hadoop2/src/test/java/io/trino/plugin/hive/s3select/S3SelectTestHelper.java
@@ -75,6 +75,7 @@
 import static io.trino.plugin.hive.HiveFileSystemTestUtils.filterTable;
 import static io.trino.plugin.hive.HiveFileSystemTestUtils.getSplitsCount;
 import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_STATS;
+import static io.trino.plugin.hive.HiveTestUtils.getDefaultHiveFileWriterFactories;
 import static io.trino.plugin.hive.HiveTestUtils.getDefaultHivePageSourceFactories;
 import static io.trino.plugin.hive.HiveTestUtils.getDefaultHiveRecordCursorProviders;
 import static io.trino.plugin.hive.TestingThriftHiveMetastoreBuilder.testingThriftHiveMetastoreBuilder;
@@ -146,6 +147,7 @@ public S3SelectTestHelper(String host,
                 this.hiveConfig,
                 new HiveMetastoreConfig(),
                 HiveMetastoreFactory.ofInstance(metastoreClient),
+                getDefaultHiveFileWriterFactories(hiveConfig, hdfsEnvironment),
                 new HdfsFileSystemFactory(hdfsEnvironment, HDFS_FILE_SYSTEM_STATS),
                 hdfsEnvironment,
                 hivePartitionManager,
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java
index 9783b9f87fac..8e00abc6dc78 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java
@@ -32,7 +32,6 @@
 import io.trino.filesystem.Location;
 import io.trino.filesystem.TrinoFileSystem;
 import io.trino.filesystem.TrinoFileSystemFactory;
-import io.trino.filesystem.TrinoOutputFile;
 import io.trino.hdfs.HdfsContext;
 import io.trino.hdfs.HdfsEnvironment;
 import io.trino.plugin.base.CatalogName;
@@ -59,7 +58,6 @@
 import io.trino.plugin.hive.statistics.HiveStatisticsProvider;
 import io.trino.plugin.hive.util.HiveBucketing;
 import io.trino.plugin.hive.util.HiveUtil;
-import io.trino.plugin.hive.util.HiveWriteUtils;
 import io.trino.plugin.hive.util.SerdeConstants;
 import io.trino.spi.ErrorType;
 import io.trino.spi.Page;
@@ -128,8 +126,6 @@
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaParseException;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
-import org.apache.hadoop.mapred.JobConf;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -162,8 +158,7 @@
 import static com.google.common.collect.ImmutableSet.toImmutableSet;
 import static com.google.common.collect.Iterables.concat;
 import static com.google.common.collect.Iterables.getOnlyElement;
-import static io.trino.filesystem.hdfs.HadoopPaths.hadoopPath;
-import static io.trino.hdfs.ConfigurationUtils.toJobConf;
+import static com.google.common.reflect.Reflection.newProxy;
 import static io.trino.plugin.base.projection.ApplyProjectionUtil.extractSupportedProjectedColumns;
 import static io.trino.plugin.base.projection.ApplyProjectionUtil.replaceWithNewVariables;
 import static io.trino.plugin.hive.HiveAnalyzeProperties.getColumnNames;
@@ -176,7 +171,6 @@
 import static io.trino.plugin.hive.HiveColumnHandle.ColumnType.SYNTHESIZED;
 import static io.trino.plugin.hive.HiveColumnHandle.createBaseColumn;
 import static io.trino.plugin.hive.HiveColumnHandle.mergeRowIdColumnHandle;
-import static io.trino.plugin.hive.HiveCompressionCodecs.selectCompressionCodec;
 import static io.trino.plugin.hive.HiveErrorCode.HIVE_COLUMN_ORDER_MISMATCH;
 import static io.trino.plugin.hive.HiveErrorCode.HIVE_CONCURRENT_MODIFICATION_DETECTED;
 import static io.trino.plugin.hive.HiveErrorCode.HIVE_FILESYSTEM_ERROR;
@@ -185,7 +179,6 @@
 import static io.trino.plugin.hive.HiveErrorCode.HIVE_UNKNOWN_ERROR;
 import static io.trino.plugin.hive.HiveErrorCode.HIVE_UNSUPPORTED_FORMAT;
 import static io.trino.plugin.hive.HiveErrorCode.HIVE_VIEW_TRANSLATION_ERROR;
-import static io.trino.plugin.hive.HiveErrorCode.HIVE_WRITER_CLOSE_ERROR;
 import static io.trino.plugin.hive.HivePartitionManager.extractPartitionValues;
 import static io.trino.plugin.hive.HiveSessionProperties.NON_TRANSACTIONAL_OPTIMIZE_ENABLED;
 import static io.trino.plugin.hive.HiveSessionProperties.getDeltaLakeCatalogName;
@@ -283,11 +276,8 @@
 import static io.trino.plugin.hive.util.AcidTables.isFullAcidTable;
 import static io.trino.plugin.hive.util.AcidTables.isTransactionalTable;
 import static io.trino.plugin.hive.util.AcidTables.writeAcidVersionFile;
-import static io.trino.plugin.hive.util.CompressionConfigUtil.configureCompression;
 import static io.trino.plugin.hive.util.HiveBucketing.getHiveBucketHandle;
 import static io.trino.plugin.hive.util.HiveBucketing.isSupportedBucketing;
-import static io.trino.plugin.hive.util.HiveClassNames.HIVE_IGNORE_KEY_OUTPUT_FORMAT_CLASS;
-import static io.trino.plugin.hive.util.HiveClassNames.ORC_OUTPUT_FORMAT_CLASS;
 import static io.trino.plugin.hive.util.HiveUtil.columnMetadataGetter;
 import static io.trino.plugin.hive.util.HiveUtil.getPartitionKeyColumnHandles;
 import static io.trino.plugin.hive.util.HiveUtil.getRegularColumnHandles;
@@ -302,7 +292,6 @@
 import static io.trino.plugin.hive.util.HiveUtil.verifyPartitionTypeSupported;
 import static io.trino.plugin.hive.util.HiveWriteUtils.checkTableIsWritable;
 import static io.trino.plugin.hive.util.HiveWriteUtils.createPartitionValues;
-import static io.trino.plugin.hive.util.HiveWriteUtils.initializeSerializer;
 import static io.trino.plugin.hive.util.HiveWriteUtils.isFileCreatedByQuery;
 import static io.trino.plugin.hive.util.HiveWriteUtils.isWritableType;
 import static io.trino.plugin.hive.util.RetryDriver.retry;
@@ -375,6 +364,7 @@ public class HiveMetadata
     private final CatalogName catalogName;
     private final SemiTransactionalHiveMetastore metastore;
     private final boolean autoCommit;
+    private final Set<HiveFileWriterFactory> fileWriterFactories;
     private final TrinoFileSystemFactory fileSystemFactory;
     private final HdfsEnvironment hdfsEnvironment;
     private final HivePartitionManager partitionManager;
@@ -403,6 +393,7 @@ public HiveMetadata(
             CatalogName catalogName,
             SemiTransactionalHiveMetastore metastore,
             boolean autoCommit,
+            Set<HiveFileWriterFactory> fileWriterFactories,
             TrinoFileSystemFactory fileSystemFactory,
             HdfsEnvironment hdfsEnvironment,
             HivePartitionManager partitionManager,
@@ -430,6 +421,7 @@ public HiveMetadata(
         this.catalogName = requireNonNull(catalogName, "catalogName is null");
         this.metastore = requireNonNull(metastore, "metastore is null");
         this.autoCommit = autoCommit;
+        this.fileWriterFactories = ImmutableSet.copyOf(requireNonNull(fileWriterFactories, "fileWriterFactories is null"));
         this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
         this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
         this.partitionManager = requireNonNull(partitionManager, "partitionManager is null");
@@ -1748,12 +1740,13 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession sess
         partitionUpdates = PartitionUpdate.mergePartitionUpdates(partitionUpdates);
 
         if (handle.getBucketProperty().isPresent() && isCreateEmptyBucketFiles(session)) {
-            List<PartitionUpdate> partitionUpdatesForMissingBuckets = computePartitionUpdatesForMissingBuckets(session, handle, table, true, partitionUpdates);
+            List<PartitionUpdate> partitionUpdatesForMissingBuckets = computePartitionUpdatesForMissingBuckets(session, handle, true, partitionUpdates);
             // replace partitionUpdates before creating the empty files so that those files will be cleaned up if we end up rollback
             partitionUpdates = PartitionUpdate.mergePartitionUpdates(concat(partitionUpdates, partitionUpdatesForMissingBuckets));
             for (PartitionUpdate partitionUpdate : partitionUpdatesForMissingBuckets) {
                 Optional<Partition> partition = table.getPartitionColumns().isEmpty() ? Optional.empty() : Optional.of(buildPartitionObject(session, table, partitionUpdate));
-                createEmptyFiles(session, hadoopPath(partitionUpdate.getWritePath()), table, partition, partitionUpdate.getFileNames());
+                Location writePath = Location.of(partitionUpdate.getWritePath().toString());
+                createEmptyFiles(session, writePath, table, partition, partitionUpdate.getFileNames());
             }
             if (handle.isTransactional()) {
                 AcidTransaction transaction = handle.getTransaction();
@@ -1832,19 +1825,15 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession sess
     private List<PartitionUpdate> computePartitionUpdatesForMissingBuckets(
             ConnectorSession session,
             HiveWritableTableHandle handle,
-            Table table,
             boolean isCreateTable,
             List<PartitionUpdate> partitionUpdates)
     {
         ImmutableList.Builder<PartitionUpdate> partitionUpdatesForMissingBucketsBuilder = ImmutableList.builder();
-        HiveStorageFormat storageFormat = table.getPartitionColumns().isEmpty() ? handle.getTableStorageFormat() : handle.getPartitionStorageFormat();
         for (PartitionUpdate partitionUpdate : partitionUpdates) {
             int bucketCount = handle.getBucketProperty().get().getBucketCount();
 
             List<String> fileNamesForMissingBuckets = computeFileNamesForMissingBuckets(
                     session,
-                    storageFormat,
-                    hadoopPath(partitionUpdate.getTargetPath()),
                     bucketCount,
                     isCreateTable && handle.isTransactional(),
                     partitionUpdate);
@@ -1863,8 +1852,6 @@ private List<PartitionUpdate> computePartitionUpdatesForMissingBuckets(
 
     private List<String> computeFileNamesForMissingBuckets(
             ConnectorSession session,
-            HiveStorageFormat storageFormat,
-            Path targetPath,
             int bucketCount,
             boolean transactionalCreateTable,
             PartitionUpdate partitionUpdate)
@@ -1873,10 +1860,7 @@ private List<String> computeFileNamesForMissingBuckets(
             // fast path for common case
             return ImmutableList.of();
         }
-        HdfsContext hdfsContext = new HdfsContext(session);
-        JobConf conf = toJobConf(hdfsEnvironment.getConfiguration(hdfsContext, targetPath));
-        configureCompression(conf, selectCompressionCodec(session, storageFormat));
-        String fileExtension = HiveWriterFactory.getFileExtension(conf, fromHiveStorageFormat(storageFormat));
+
         Set<String> fileNames = ImmutableSet.copyOf(partitionUpdate.getFileNames());
         Set<Integer> bucketsWithFiles = fileNames.stream()
                 .map(HiveWriterFactory::getBucketFromFileName)
@@ -1887,21 +1871,16 @@
             if (bucketsWithFiles.contains(i)) {
                 continue;
             }
-            String fileName;
-            if (transactionalCreateTable) {
-                fileName = computeTransactionalBucketedFilename(i) + fileExtension;
-            }
-            else {
-                fileName = computeNonTransactionalBucketedFilename(session.getQueryId(), i) + fileExtension;
-            }
-            missingFileNamesBuilder.add(fileName);
+            missingFileNamesBuilder.add(transactionalCreateTable
+                    ? computeTransactionalBucketedFilename(i)
+                    : computeNonTransactionalBucketedFilename(session.getQueryId(), i));
         }
         List<String> missingFileNames = missingFileNamesBuilder.build();
         verify(fileNames.size() + missingFileNames.size() == bucketCount);
         return missingFileNames;
     }
 
-    private void createEmptyFiles(ConnectorSession session, Path path, Table table, Optional<Partition> partition, List<String> fileNames)
+    private void createEmptyFiles(ConnectorSession session, Location path, Table table, Optional<Partition> partition, List<String> fileNames)
     {
         Properties schema;
         StorageFormat format;
@@ -1914,49 +1893,35 @@ private void createEmptyFiles(ConnectorSession session, Path path, Table table,
             format = table.getStorage().getStorageFormat();
         }
 
-        HiveCompressionCodec compression = selectCompressionCodec(session, format);
-        if (format.getOutputFormat().equals(ORC_OUTPUT_FORMAT_CLASS) && (compression == HiveCompressionCodec.ZSTD)) {
-            compression = HiveCompressionCodec.GZIP; // ZSTD not supported by Hive ORC writer
-        }
-        JobConf conf = toJobConf(hdfsEnvironment.getConfiguration(new HdfsContext(session), path));
-        configureCompression(conf, compression);
-
-        // for simple line-oriented formats, just create an empty file directly
-        if (format.getOutputFormat().equals(HIVE_IGNORE_KEY_OUTPUT_FORMAT_CLASS)) {
-            TrinoFileSystem fileSystem = fileSystemFactory.create(session.getIdentity());
-            for (String fileName : fileNames) {
-                TrinoOutputFile trinoOutputFile = fileSystem.newOutputFile(Location.of(path.toString()).appendPath(fileName));
-                try {
-                    // create empty file
-                    trinoOutputFile.create().close();
-                }
-                catch (IOException e) {
-                    throw new TrinoException(HIVE_WRITER_CLOSE_ERROR, "Error write empty file to Hive", e);
-                }
-            }
-            return;
+        for (String fileName : fileNames) {
+            Location location = path.appendPath(fileName);
+            fileWriterFactories.stream()
+                    .map(factory -> factory.createFileWriter(
+                            location,
+                            ImmutableList.of(),
+                            format,
+                            HiveCompressionCodec.NONE,
+                            schema,
+                            nativeWriterAlwaysEnabled(session),
+                            OptionalInt.empty(),
+                            NO_ACID_TRANSACTION,
+                            false,
+                            WriterKind.INSERT))
+                    .flatMap(Optional::stream)
+                    .findFirst()
+                    .orElseThrow(() -> new TrinoException(HIVE_UNSUPPORTED_FORMAT, "Writing not supported for " + format))
+                    .commit();
         }
-
-        hdfsEnvironment.doAs(session.getIdentity(), () -> {
-            for (String fileName : fileNames) {
-                writeEmptyFile(session, new Path(path, fileName), conf, schema, format.getSerde(), format.getOutputFormat());
-            }
-        });
     }
 
-    private static void writeEmptyFile(ConnectorSession session, Path target, JobConf conf, Properties properties, String serde, String outputFormatName)
+    private static ConnectorSession nativeWriterAlwaysEnabled(ConnectorSession session)
    {
-        // Some serializers such as Avro set a property in the schema.
-        initializeSerializer(conf, properties, serde);
-
-        // The code below is not a try with resources because RecordWriter is not Closeable.
-        FileSinkOperator.RecordWriter recordWriter = HiveWriteUtils.createRecordWriter(target, conf, properties, outputFormatName, session);
-        try {
-            recordWriter.close(false);
-        }
-        catch (IOException e) {
-            throw new TrinoException(HIVE_WRITER_CLOSE_ERROR, "Error write empty file to Hive", e);
-        }
+        return newProxy(ConnectorSession.class, (proxy, method, args) -> {
+            if (method.getName().equals("getProperty") && ((String) args[0]).endsWith("_native_writer_enabled")) {
+                return true;
+            }
+            return method.invoke(session, args);
+        });
     }
 
     @Override
@@ -2143,7 +2108,7 @@ private Table finishChangingTable(AcidOperation acidOperation, String changeDesc
         }
 
         if (handle.getBucketProperty().isPresent() && isCreateEmptyBucketFiles(session)) {
-            List<PartitionUpdate> partitionUpdatesForMissingBuckets = computePartitionUpdatesForMissingBuckets(session, handle, table, false, partitionUpdates);
+            List<PartitionUpdate> partitionUpdatesForMissingBuckets = computePartitionUpdatesForMissingBuckets(session, handle, false, partitionUpdates);
             // replace partitionUpdates before creating the empty files so that those files will be cleaned up if we end up rollback
             partitionUpdates = PartitionUpdate.mergePartitionUpdates(concat(partitionUpdates, partitionUpdatesForMissingBuckets));
             for (PartitionUpdate partitionUpdate : partitionUpdatesForMissingBuckets) {
@@ -2160,7 +2125,8 @@ private Table finishChangingTable(AcidOperation acidOperation, String changeDesc
                             statistics,
                             handle.isRetriesEnabled());
                 }
-                createEmptyFiles(session, hadoopPath(partitionUpdate.getWritePath()), table, partition, partitionUpdate.getFileNames());
+                Location writePath = Location.of(partitionUpdate.getWritePath().toString());
+                createEmptyFiles(session, writePath, table, partition, partitionUpdate.getFileNames());
             }
         }
 
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadataFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadataFactory.java
index 4419261eb5e7..991fdd36310b 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadataFactory.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadataFactory.java
@@ -13,6 +13,7 @@
  */
 package io.trino.plugin.hive;
 
+import com.google.common.collect.ImmutableSet;
 import com.google.inject.Inject;
 import io.airlift.concurrent.BoundedExecutor;
 import io.airlift.json.JsonCodec;
@@ -56,6 +57,7 @@ public class HiveMetadataFactory
     private final boolean hideDeltaLakeTables;
     private final long perTransactionCacheMaximumSize;
     private final HiveMetastoreFactory metastoreFactory;
+    private final Set<HiveFileWriterFactory> fileWriterFactories;
     private final TrinoFileSystemFactory fileSystemFactory;
     private final HdfsEnvironment hdfsEnvironment;
     private final HivePartitionManager partitionManager;
@@ -86,6 +88,7 @@ public HiveMetadataFactory(
             HiveConfig hiveConfig,
             HiveMetastoreConfig hiveMetastoreConfig,
             HiveMetastoreFactory metastoreFactory,
+            Set<HiveFileWriterFactory> fileWriterFactories,
             TrinoFileSystemFactory fileSystemFactory,
             HdfsEnvironment hdfsEnvironment,
             HivePartitionManager partitionManager,
@@ -108,6 +111,7 @@ public HiveMetadataFactory(
         this(
                 catalogName,
                 metastoreFactory,
+                fileWriterFactories,
                 fileSystemFactory,
                 hdfsEnvironment,
                 partitionManager,
@@ -146,6 +150,7 @@ public HiveMetadataFactory(
     public HiveMetadataFactory(
             CatalogName catalogName,
             HiveMetastoreFactory metastoreFactory,
+            Set<HiveFileWriterFactory> fileWriterFactories,
             TrinoFileSystemFactory fileSystemFactory,
             HdfsEnvironment hdfsEnvironment,
             HivePartitionManager partitionManager,
@@ -192,6 +197,7 @@ public HiveMetadataFactory(
         this.perTransactionCacheMaximumSize = perTransactionCacheMaximumSize;
 
         this.metastoreFactory = requireNonNull(metastoreFactory, "metastoreFactory is null");
+        this.fileWriterFactories = ImmutableSet.copyOf(requireNonNull(fileWriterFactories, "fileWriterFactories is null"));
         this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
         this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
         this.partitionManager = requireNonNull(partitionManager, "partitionManager is null");
@@ -248,6 +254,7 @@ public TransactionalMetadata create(ConnectorIdentity identity, boolean autoComm
                 catalogName,
                 metastore,
                 autoCommit,
+                fileWriterFactories,
                 fileSystemFactory,
                 hdfsEnvironment,
                 partitionManager,
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java
index 7855fb66848d..c90eed86a59c 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java
@@ -834,6 +834,7 @@ protected final void setup(String databaseName, HiveConfig hiveConfig, HiveMetas
         metadataFactory = new HiveMetadataFactory(
                 new CatalogName("hive"),
                 HiveMetastoreFactory.ofInstance(metastoreClient),
+                getDefaultHiveFileWriterFactories(hiveConfig, hdfsEnvironment),
                 new HdfsFileSystemFactory(hdfsEnvironment, HDFS_FILE_SYSTEM_STATS),
                 hdfsEnvironment,
                 partitionManager,
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java
index 01d42a2ceeda..e22c4e290c3d 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java
@@ -213,6 +213,7 @@ protected void setup(String host, int port, String databaseName, boolean s3Selec
                 config,
                 new HiveMetastoreConfig(),
                 HiveMetastoreFactory.ofInstance(metastoreClient),
+                getDefaultHiveFileWriterFactories(config, hdfsEnvironment),
                 new HdfsFileSystemFactory(hdfsEnvironment, HDFS_FILE_SYSTEM_STATS),
                 hdfsEnvironment,
                 hivePartitionManager,
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java
index 978579d5f158..6a7f6e3989ed 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java
@@ -2135,21 +2135,7 @@ public void testEmptyBucketedTable()
 
     private void testEmptyBucketedTable(Session session, HiveStorageFormat storageFormat)
     {
-        for (HiveCompressionCodec compressionCodec : HiveCompressionCodec.values()) {
-            if ((storageFormat == HiveStorageFormat.AVRO) && (compressionCodec == HiveCompressionCodec.LZ4)) {
-                continue;
-            }
-            if ((storageFormat == HiveStorageFormat.PARQUET) && (compressionCodec == HiveCompressionCodec.LZ4)) {
-                // TODO (https://github.com/trinodb/trino/issues/9142) Support LZ4 compression with native Parquet writer
-                continue;
-            }
-            testEmptyBucketedTable(
-                    Session.builder(session)
-                            .setCatalogSessionProperty(session.getCatalog().orElseThrow(), "compression_codec", compressionCodec.name())
-                            .build(),
-                    storageFormat,
-                    true);
-        }
+        testEmptyBucketedTable(session, storageFormat, true);
         testEmptyBucketedTable(session, storageFormat, false);
     }
 