Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.prestosql.plugin.hive.orc.OrcPageSourceFactory;
import io.prestosql.plugin.hive.orc.OrcReaderConfig;
import io.prestosql.plugin.hive.orc.OrcWriterConfig;
import io.prestosql.plugin.hive.parquet.ParquetFileWriterFactory;
Comment thread
qqibrow marked this conversation as resolved.
Outdated
import io.prestosql.plugin.hive.parquet.ParquetPageSourceFactory;
import io.prestosql.plugin.hive.parquet.ParquetReaderConfig;
import io.prestosql.plugin.hive.parquet.ParquetWriterConfig;
Expand Down Expand Up @@ -122,6 +123,7 @@ public void configure(Binder binder)

configBinder(binder).bindConfig(ParquetReaderConfig.class);
configBinder(binder).bindConfig(ParquetWriterConfig.class);
fileWriterFactoryBinder.addBinding().to(ParquetFileWriterFactory.class).in(Scopes.SINGLETON);

jsonBinder(binder).addDeserializerBinding(Type.class).to(TypeDeserializer.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ public final class HiveSessionProperties
private static final String IGNORE_ABSENT_PARTITIONS = "ignore_absent_partitions";
private static final String QUERY_PARTITION_FILTER_REQUIRED = "query_partition_filter_required";
private static final String PROJECTION_PUSHDOWN_ENABLED = "projection_pushdown_enabled";
private static final String PARQUET_OPTIMIZED_WRITER_ENABLED = "parquet_optimized_writer_enabled";

private final List<PropertyMetadata<?>> sessionProperties;

Expand Down Expand Up @@ -363,6 +364,11 @@ public HiveSessionProperties(
PROJECTION_PUSHDOWN_ENABLED,
"Projection push down enabled for hive",
hiveConfig.isProjectionPushdownEnabled(),
false),
booleanProperty(
PARQUET_OPTIMIZED_WRITER_ENABLED,
"Experimental: Enable optimized writer",
parquetWriterConfig.isParquetOptimizedWriterEnabled(),
false));
}

Expand Down Expand Up @@ -624,4 +630,9 @@ public static boolean isProjectionPushdownEnabled(ConnectorSession session)
{
return session.getProperty(PROJECTION_PUSHDOWN_ENABLED, Boolean.class);
}

/**
 * Returns whether the experimental Presto-native Parquet writer is enabled for this
 * session (session property {@code parquet_optimized_writer_enabled}).
 */
public static boolean isParquetOptimizedWriterEnabled(ConnectorSession session)
{
return session.getProperty(PARQUET_OPTIMIZED_WRITER_ENABLED, Boolean.class);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.prestosql.plugin.hive.parquet;

import com.google.common.collect.ImmutableList;
import io.prestosql.parquet.writer.ParquetWriter;
import io.prestosql.parquet.writer.ParquetWriterOptions;
import io.prestosql.plugin.hive.FileWriter;
import io.prestosql.spi.Page;
import io.prestosql.spi.PrestoException;
import io.prestosql.spi.block.Block;
import io.prestosql.spi.block.BlockBuilder;
import io.prestosql.spi.block.RunLengthEncodedBlock;
import io.prestosql.spi.type.Type;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.openjdk.jol.info.ClassLayout;

import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.concurrent.Callable;

import static com.google.common.base.MoreObjects.toStringHelper;
import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_WRITER_CLOSE_ERROR;
import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_WRITER_DATA_ERROR;
import static java.util.Objects.requireNonNull;

public class ParquetFileWriter
implements FileWriter
{
private static final int INSTANCE_SIZE = ClassLayout.parseClass(ParquetFileWriter.class).instanceSize();

private final ParquetWriter parquetWriter;
private final Callable<Void> rollbackAction;
private final int[] fileInputColumnIndexes;
private final List<Block> nullBlocks;

public ParquetFileWriter(
OutputStream outputStream,
Callable<Void> rollbackAction,
List<String> columnNames,
List<Type> fileColumnTypes,
ParquetWriterOptions parquetWriterOptions,
int[] fileInputColumnIndexes,
CompressionCodecName compressionCodecName)
{
requireNonNull(outputStream, "outputStream is null");

this.parquetWriter = new ParquetWriter(
outputStream,
columnNames,
fileColumnTypes,
parquetWriterOptions,
compressionCodecName);

this.rollbackAction = requireNonNull(rollbackAction, "rollbackAction is null");
this.fileInputColumnIndexes = requireNonNull(fileInputColumnIndexes, "fileInputColumnIndexes is null");

ImmutableList.Builder<Block> nullBlocks = ImmutableList.builder();
for (Type fileColumnType : fileColumnTypes) {
BlockBuilder blockBuilder = fileColumnType.createBlockBuilder(null, 1, 0);
blockBuilder.appendNull();
nullBlocks.add(blockBuilder.build());
}
this.nullBlocks = nullBlocks.build();
}

@Override
public long getWrittenBytes()
{
return parquetWriter.getWrittenBytes();
}

@Override
public long getSystemMemoryUsage()
{
return INSTANCE_SIZE + parquetWriter.getRetainedBytes();
}

@Override
public void appendRows(Page dataPage)
{
Block[] blocks = new Block[fileInputColumnIndexes.length];
for (int i = 0; i < fileInputColumnIndexes.length; i++) {
int inputColumnIndex = fileInputColumnIndexes[i];
if (inputColumnIndex < 0) {
blocks[i] = new RunLengthEncodedBlock(nullBlocks.get(i), dataPage.getPositionCount());
}
else {
blocks[i] = dataPage.getBlock(inputColumnIndex);
}
}
Page page = new Page(dataPage.getPositionCount(), blocks);
try {
parquetWriter.write(page);
}
catch (IOException | UncheckedIOException e) {
throw new PrestoException(HIVE_WRITER_DATA_ERROR, e);
}
}

@Override
public void commit()
{
try {
parquetWriter.close();
}
catch (IOException | UncheckedIOException e) {
try {
rollbackAction.call();
}
catch (Exception ignored) {
// ignore
}
throw new PrestoException(HIVE_WRITER_CLOSE_ERROR, "Error committing write parquet to Hive", e);
}
}

@Override
public void rollback()
{
try {
try {
parquetWriter.close();
}
finally {
rollbackAction.call();
}
}
catch (Exception e) {
throw new PrestoException(HIVE_WRITER_CLOSE_ERROR, "Error rolling back write parquet to Hive", e);
}
}

@Override
public long getValidationCpuNanos()
{
return 0;
}
Comment thread
qqibrow marked this conversation as resolved.
Outdated

@Override
public String toString()
{
return toStringHelper(this)
.add("writer", parquetWriter)
.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.prestosql.plugin.hive.parquet;

import io.prestosql.parquet.writer.ParquetWriterOptions;
import io.prestosql.plugin.hive.FileWriter;
import io.prestosql.plugin.hive.HdfsEnvironment;
import io.prestosql.plugin.hive.HiveConfig;
import io.prestosql.plugin.hive.HiveFileWriterFactory;
import io.prestosql.plugin.hive.HiveSessionProperties;
Comment thread
qqibrow marked this conversation as resolved.
Outdated
import io.prestosql.plugin.hive.NodeVersion;
import io.prestosql.plugin.hive.metastore.StorageFormat;
import io.prestosql.spi.PrestoException;
import io.prestosql.spi.connector.ConnectorSession;
import io.prestosql.spi.type.Type;
import io.prestosql.spi.type.TypeManager;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.parquet.hadoop.ParquetOutputFormat;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.joda.time.DateTimeZone;

import javax.inject.Inject;

import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.Callable;

import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_WRITER_OPEN_ERROR;
import static io.prestosql.plugin.hive.util.HiveUtil.getColumnNames;
import static io.prestosql.plugin.hive.util.HiveUtil.getColumnTypes;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;

/**
 * Creates {@link ParquetFileWriter} instances for tables stored with the Hive
 * Parquet output format, but only when the experimental optimized writer has
 * been enabled for the session.
 */
public class ParquetFileWriterFactory
        implements HiveFileWriterFactory
{
    private final DateTimeZone hiveStorageTimeZone;
    private final HdfsEnvironment hdfsEnvironment;
    private final TypeManager typeManager;
    private final NodeVersion nodeVersion;

    @Inject
    public ParquetFileWriterFactory(
            HdfsEnvironment hdfsEnvironment,
            TypeManager typeManager,
            NodeVersion nodeVersion,
            HiveConfig hiveConfig)
    {
        this(
                hdfsEnvironment,
                typeManager,
                nodeVersion,
                requireNonNull(hiveConfig, "hiveConfig is null").getDateTimeZone());
    }

    public ParquetFileWriterFactory(
            HdfsEnvironment hdfsEnvironment,
            TypeManager typeManager,
            NodeVersion nodeVersion,
            DateTimeZone hiveStorageTimeZone)
    {
        this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
        this.typeManager = requireNonNull(typeManager, "typeManager is null");
        this.nodeVersion = requireNonNull(nodeVersion, "nodeVersion is null");
        this.hiveStorageTimeZone = requireNonNull(hiveStorageTimeZone, "hiveStorageTimeZone is null");
    }

    @Override
    public Optional<FileWriter> createFileWriter(
            Path path,
            List<String> inputColumnNames,
            StorageFormat storageFormat,
            Properties schema,
            JobConf conf,
            ConnectorSession session)
    {
        // Bail out unless the session opted into the native writer and the table
        // actually uses the Parquet output format
        if (!HiveSessionProperties.isParquetOptimizedWriterEnabled(session)
                || !MapredParquetOutputFormat.class.getName().equals(storageFormat.getOutputFormat())) {
            return Optional.empty();
        }

        CompressionCodecName compression = getCompression(conf);

        ParquetWriterOptions writerOptions = ParquetWriterOptions.builder()
                .setMaxPageSize(HiveSessionProperties.getParquetWriterPageSize(session))
                .setMaxBlockSize(HiveSessionProperties.getParquetWriterBlockSize(session))
                .build();

        // The file schema comes from the table properties, not from the input page layout
        List<String> columnNames = getColumnNames(schema);
        List<Type> columnTypes = getColumnTypes(schema).stream()
                .map(hiveType -> hiveType.getType(typeManager))
                .collect(toList());

        // For each file column, locate the matching input channel (-1 when absent)
        int[] inputIndexes = new int[columnNames.size()];
        for (int i = 0; i < inputIndexes.length; i++) {
            inputIndexes[i] = inputColumnNames.indexOf(columnNames.get(i));
        }

        try {
            FileSystem fileSystem = hdfsEnvironment.getFileSystem(session.getUser(), path, conf);

            Callable<Void> rollbackAction = () -> {
                fileSystem.delete(path, false);
                return null;
            };

            return Optional.of(new ParquetFileWriter(
                    fileSystem.create(path),
                    rollbackAction,
                    columnNames,
                    columnTypes,
                    writerOptions,
                    inputIndexes,
                    compression));
        }
        catch (IOException e) {
            throw new PrestoException(HIVE_WRITER_OPEN_ERROR, "Error creating Parquet file", e);
        }
    }

    /**
     * Reads the Parquet compression codec name from the job configuration,
     * defaulting to GZIP when none is set.
     */
    private static CompressionCodecName getCompression(JobConf configuration)
    {
        String codecName = configuration.get(ParquetOutputFormat.COMPRESSION);
        return (codecName == null) ? CompressionCodecName.GZIP : CompressionCodecName.valueOf(codecName);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,15 @@
package io.prestosql.plugin.hive.parquet;

import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;
import io.airlift.units.DataSize;
import io.prestosql.parquet.writer.ParquetWriterOptions;
import org.apache.parquet.hadoop.ParquetWriter;

public class ParquetWriterConfig
{
private boolean parquetOptimizedWriterEnabled;

private DataSize blockSize = DataSize.ofBytes(ParquetWriter.DEFAULT_BLOCK_SIZE);
private DataSize pageSize = DataSize.ofBytes(ParquetWriter.DEFAULT_PAGE_SIZE);

Expand All @@ -45,4 +49,25 @@ public ParquetWriterConfig setPageSize(DataSize pageSize)
this.pageSize = pageSize;
return this;
}

/**
 * Returns whether the experimental Presto-native Parquet writer is enabled by
 * default (config property {@code hive.parquet.optimized-writer.enabled}).
 */
public boolean isParquetOptimizedWriterEnabled()
{
return parquetOptimizedWriterEnabled;
}

// Airlift config binding for hive.parquet.optimized-writer.enabled
@Config("hive.parquet.optimized-writer.enabled")
@ConfigDescription("Enable optimized Parquet writer")
public ParquetWriterConfig setParquetOptimizedWriterEnabled(boolean parquetOptimizedWriterEnabled)
{
this.parquetOptimizedWriterEnabled = parquetOptimizedWriterEnabled;
// return this to support the fluent-setter convention airlift config classes use
return this;
}

/**
 * Builds a {@link ParquetWriterOptions} from the configured block and page sizes.
 */
public ParquetWriterOptions toParquetWriterOptions()
{
return ParquetWriterOptions.builder()
.setMaxBlockSize(getBlockSize())
.setMaxPageSize(getPageSize())
.build();
}
}
Loading