Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public final class HiveSessionProperties
private static final String ORC_TINY_STRIPE_THRESHOLD = "orc_tiny_stripe_threshold";
private static final String ORC_MAX_READ_BLOCK_SIZE = "orc_max_read_block_size";
private static final String ORC_LAZY_READ_SMALL_RANGES = "orc_lazy_read_small_ranges";
private static final String ORC_NESTED_LAZY_ENABLED = "orc_nested_lazy_enabled";
private static final String ORC_STRING_STATISTICS_LIMIT = "orc_string_statistics_limit";
private static final String ORC_OPTIMIZED_WRITER_VALIDATE = "orc_optimized_writer_validate";
private static final String ORC_OPTIMIZED_WRITER_VALIDATE_PERCENTAGE = "orc_optimized_writer_validate_percentage";
Expand Down Expand Up @@ -168,6 +169,11 @@ public HiveSessionProperties(
"Experimental: ORC: Read small file segments lazily",
orcReaderConfig.isLazyReadSmallRanges(),
false),
booleanProperty(
ORC_NESTED_LAZY_ENABLED,
"Experimental: ORC: Lazily read nested data",
orcReaderConfig.isNestedLazy(),
false),
dataSizeProperty(
ORC_STRING_STATISTICS_LIMIT,
"ORC: Maximum size of string statistics; drop if exceeding",
Expand Down Expand Up @@ -385,6 +391,11 @@ public static boolean getOrcLazyReadSmallRanges(ConnectorSession session)
return session.getProperty(ORC_LAZY_READ_SMALL_RANGES, Boolean.class);
}

/**
 * Reads the {@code orc_nested_lazy_enabled} session property, which is
 * registered as the experimental "lazily read nested data" ORC option.
 *
 * @param session the connector session to read the property from
 * @return the property's boolean value for this session
 */
public static boolean isOrcNestedLazy(ConnectorSession session)
{
    Boolean nestedLazyEnabled = session.getProperty(ORC_NESTED_LAZY_ENABLED, Boolean.class);
    return nestedLazyEnabled;
}

public static DataSize getOrcStringStatisticsLimit(ConnectorSession session)
{
return session.getProperty(ORC_STRING_STATISTICS_LIMIT, DataSize.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,26 @@
*/
package io.prestosql.plugin.hive.orc;

import com.google.common.collect.ImmutableList;
import io.prestosql.memory.context.AggregatedMemoryContext;
import io.prestosql.orc.OrcCorruptionException;
import io.prestosql.orc.OrcDataSource;
import io.prestosql.orc.OrcDataSourceId;
import io.prestosql.orc.OrcRecordReader;
import io.prestosql.plugin.hive.FileFormatDataSourceStats;
import io.prestosql.spi.Page;
import io.prestosql.spi.PrestoException;
import io.prestosql.spi.block.Block;
import io.prestosql.spi.block.LazyBlock;
import io.prestosql.spi.block.LazyBlockLoader;
import io.prestosql.spi.block.RunLengthEncodedBlock;
import io.prestosql.spi.connector.ConnectorPageSource;
import io.prestosql.spi.type.Type;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.List;

import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkState;
import static io.prestosql.orc.OrcReader.MAX_BATCH_SIZE;
import static com.google.common.base.Preconditions.checkArgument;
import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_BAD_DATA;
import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_CURSOR_ERROR;
import static java.lang.String.format;
Expand All @@ -43,12 +42,9 @@ public class OrcPageSource
implements ConnectorPageSource
{
private final OrcRecordReader recordReader;
private final List<ColumnAdaptation> columnAdaptations;
private final OrcDataSource orcDataSource;

private final Block[] constantBlocks;
private final int[] hiveColumnIndexes;

private int batchId;
private boolean closed;

private final AggregatedMemoryContext systemMemoryContext;
Expand All @@ -57,31 +53,15 @@ public class OrcPageSource

public OrcPageSource(
OrcRecordReader recordReader,
List<ColumnAdaptation> columnAdaptations,
OrcDataSource orcDataSource,
Map<Integer, Type> includedColumns,
AggregatedMemoryContext systemMemoryContext,
FileFormatDataSourceStats stats)
{
this.recordReader = requireNonNull(recordReader, "recordReader is null");
this.columnAdaptations = ImmutableList.copyOf(requireNonNull(columnAdaptations, "columnAdaptations is null"));
this.orcDataSource = requireNonNull(orcDataSource, "orcDataSource is null");

int size = requireNonNull(includedColumns, "includedColumns is null").size();

this.stats = requireNonNull(stats, "stats is null");

this.constantBlocks = new Block[size];
this.hiveColumnIndexes = new int[size];

int blockIndex = 0;
for (Map.Entry<Integer, Type> entry : includedColumns.entrySet()) {
hiveColumnIndexes[blockIndex] = entry.getKey();
if (!recordReader.isColumnPresent(hiveColumnIndexes[blockIndex])) {
Type type = entry.getValue();
constantBlocks[blockIndex] = RunLengthEncodedBlock.create(type, null, MAX_BATCH_SIZE);
}
blockIndex++;
}

this.systemMemoryContext = requireNonNull(systemMemoryContext, "systemMemoryContext is null");
}

Expand All @@ -106,37 +86,36 @@ public boolean isFinished()
@Override
public Page getNextPage()
{
Page page;
try {
batchId++;
int batchSize = recordReader.nextBatch();
if (batchSize <= 0) {
close();
return null;
}

Block[] blocks = new Block[hiveColumnIndexes.length];
for (int fieldId = 0; fieldId < blocks.length; fieldId++) {
if (constantBlocks[fieldId] != null) {
blocks[fieldId] = constantBlocks[fieldId].getRegion(0, batchSize);
}
else {
blocks[fieldId] = new LazyBlock(batchSize, new OrcBlockLoader(hiveColumnIndexes[fieldId]));
}
}
return new Page(batchSize, blocks);
page = recordReader.nextPage();
}
catch (PrestoException e) {
catch (IOException | RuntimeException e) {
closeWithSuppression(e);
throw e;
throw handleException(orcDataSource.getId(), e);
}
catch (OrcCorruptionException e) {
closeWithSuppression(e);
throw new PrestoException(HIVE_BAD_DATA, e);

if (page == null) {
close();
return null;
}
catch (IOException | RuntimeException e) {
closeWithSuppression(e);
throw new PrestoException(HIVE_CURSOR_ERROR, format("Failed to read ORC file: %s", orcDataSource.getId()), e);

Block[] blocks = new Block[columnAdaptations.size()];
for (int i = 0; i < columnAdaptations.size(); i++) {
blocks[i] = columnAdaptations.get(i).block(page);
}
return new Page(page.getPositionCount(), blocks);
}

/**
 * Converts a failure raised while reading an ORC data source into a
 * {@link PrestoException}.
 * <ul>
 * <li>A {@code PrestoException} is returned unchanged.</li>
 * <li>An {@code OrcCorruptionException} is wrapped with {@code HIVE_BAD_DATA}.</li>
 * <li>Anything else is wrapped with {@code HIVE_CURSOR_ERROR} and a message
 * naming the data source.</li>
 * </ul>
 *
 * @param dataSourceId identifier of the ORC data source, used only in the fallback message
 * @param exception the failure to translate
 * @return the exception the caller should throw
 */
static PrestoException handleException(OrcDataSourceId dataSourceId, Exception exception)
{
    if (exception instanceof PrestoException) {
        // Already the right type: hand it back as-is rather than re-wrapping.
        return (PrestoException) exception;
    }

    boolean corruptFile = exception instanceof OrcCorruptionException;
    if (!corruptFile) {
        String message = format("Failed to read ORC file: %s", dataSourceId);
        return new PrestoException(HIVE_CURSOR_ERROR, message, exception);
    }
    return new PrestoException(HIVE_BAD_DATA, exception);
}

@Override
Expand All @@ -161,7 +140,8 @@ public void close()
public String toString()
{
return toStringHelper(this)
.add("hiveColumnIndexes", hiveColumnIndexes)
.add("orcDataSource", orcDataSource.getId())
.add("columns", columnAdaptations)
.toString();
}

Expand All @@ -171,7 +151,7 @@ public long getSystemMemoryUsage()
return systemMemoryContext.getBytes();
}

protected void closeWithSuppression(Throwable throwable)
private void closeWithSuppression(Throwable throwable)
{
requireNonNull(throwable, "throwable is null");
try {
Expand All @@ -185,36 +165,73 @@ protected void closeWithSuppression(Throwable throwable)
}
}

private final class OrcBlockLoader
implements LazyBlockLoader<LazyBlock>
public interface ColumnAdaptation
{
private final int expectedBatchId = batchId;
private final int columnIndex;
private boolean loaded;
Block block(Page sourcePage);

public OrcBlockLoader(int columnIndex)
static ColumnAdaptation nullColumn(Type type)
{
this.columnIndex = columnIndex;
return new NullColumn(type);
}

/**
 * Creates an adaptation that takes its block from the source page at the
 * given channel index.
 *
 * @param index zero-based block index within the source page
 * @return a new {@code SourceColumn} adaptation for that index
 */
static ColumnAdaptation sourceColumn(int index)
{
    ColumnAdaptation adaptation = new SourceColumn(index);
    return adaptation;
}
}

private static class NullColumn
implements ColumnAdaptation
{
private final Type type;
private final Block nullBlock;

/**
 * Creates an adaptation for a column of the given type and pre-builds a
 * block containing one appended null of that type.
 *
 * @param type the column type; must not be null
 */
public NullColumn(Type type)
{
    this.type = requireNonNull(type, "type is null");
    // Built once here and kept in a field so it does not have to be rebuilt later.
    this.nullBlock = this.type.createBlockBuilder(null, 1, 0).appendNull().build();
}

@Override
public final void load(LazyBlock lazyBlock)
public Block block(Page sourcePage)
{
checkState(!loaded, "Already loaded");
checkState(batchId == expectedBatchId);
return new RunLengthEncodedBlock(nullBlock, sourcePage.getPositionCount());
}

try {
Block block = recordReader.readBlock(columnIndex);
lazyBlock.setBlock(block);
}
catch (OrcCorruptionException e) {
throw new PrestoException(HIVE_BAD_DATA, e);
}
catch (IOException | RuntimeException e) {
throw new PrestoException(HIVE_CURSOR_ERROR, format("Failed to read ORC file: %s", orcDataSource.getId()), e);
}
/** Renders this adaptation with its column type, for diagnostics. */
@Override
public String toString()
{
    return toStringHelper(this).add("type", type).toString();
}
}

private static class SourceColumn
implements ColumnAdaptation
{
private final int index;

/**
 * Creates an adaptation bound to one block index of the source page.
 *
 * @param index zero-based block index; must not be negative
 */
public SourceColumn(int index)
{
    boolean nonNegative = index >= 0;
    checkArgument(nonNegative, "index is negative");
    this.index = index;
}

loaded = true;
/**
 * Returns the block stored at this adaptation's index in the source page.
 *
 * @param sourcePage the page to take the block from
 * @return the block at the configured index
 */
@Override
public Block block(Page sourcePage)
{
    Block selected = sourcePage.getBlock(index);
    return selected;
}

/** Renders this adaptation with its source block index, for diagnostics. */
@Override
public String toString()
{
    return toStringHelper(this).add("index", index).toString();
}
}
}
Loading