Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 0 additions & 15 deletions plugin/trino-raptor-legacy/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,6 @@
<artifactId>trino-collect</artifactId>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-hadoop-toolkit</artifactId>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-memory-context</artifactId>
Expand All @@ -43,16 +38,6 @@
<artifactId>trino-plugin-toolkit</artifactId>
</dependency>

<dependency>
<groupId>io.trino.hadoop</groupId>
<artifactId>hadoop-apache</artifactId>
</dependency>

<dependency>
<groupId>io.trino.hive</groupId>
<artifactId>hive-apache</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>bootstrap</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,173 +13,151 @@
*/
package io.trino.plugin.raptor.legacy.storage;

import com.google.common.collect.Maps;
import io.airlift.log.Logger;
import io.trino.hive.orc.NullMemoryManager;
import io.trino.plugin.raptor.legacy.util.Closer;
import io.trino.plugin.raptor.legacy.util.SyncingFileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.OrcFile.WriterOptions;
import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
import org.apache.hadoop.hive.ql.io.orc.Reader;
import org.apache.hadoop.hive.ql.io.orc.RecordReader;
import org.apache.hadoop.hive.ql.io.orc.Writer;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import io.airlift.slice.Slice;
import io.trino.orc.FileOrcDataSource;
import io.trino.orc.OrcPredicate;
import io.trino.orc.OrcReader;
import io.trino.orc.OrcReaderOptions;
import io.trino.orc.OrcRecordReader;
import io.trino.orc.OrcWriter;
import io.trino.plugin.raptor.legacy.metadata.ColumnInfo;
import io.trino.spi.Page;
import io.trino.spi.block.Block;
import io.trino.spi.block.DictionaryBlock;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeManager;
import org.joda.time.DateTimeZone;

import java.io.File;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.util.BitSet;
import java.util.List;
import java.util.Map;

import static io.airlift.slice.SizeOf.SIZE_OF_BYTE;
import static io.airlift.slice.SizeOf.SIZE_OF_DOUBLE;
import static io.airlift.slice.SizeOf.SIZE_OF_LONG;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.airlift.units.Duration.nanosSince;
import static io.trino.hadoop.ConfigurationInstantiator.newEmptyConfiguration;
import static io.trino.plugin.raptor.legacy.util.Closer.closer;
import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext;
import static io.trino.orc.OrcReader.INITIAL_BATCH_SIZE;
import static io.trino.orc.OrcReader.createOrcReader;
import static io.trino.plugin.raptor.legacy.storage.OrcFileWriter.createOrcFileWriter;
import static io.trino.plugin.raptor.legacy.storage.RaptorStorageManager.getColumnInfo;
import static java.lang.Math.toIntExact;
import static org.apache.hadoop.hive.ql.io.orc.OrcFile.createReader;
import static org.apache.hadoop.hive.ql.io.orc.OrcFile.createWriter;
import static org.apache.hadoop.hive.ql.io.orc.OrcUtil.getFieldValue;

public final class OrcFileRewriter
{
private static final Logger log = Logger.get(OrcFileRewriter.class);
private static final Configuration CONFIGURATION = newEmptyConfiguration();

private OrcFileRewriter() {}

public static OrcFileInfo rewrite(File input, File output, BitSet rowsToDelete)
public static OrcFileInfo rewrite(TypeManager typeManager, File input, File output, BitSet rowsToDelete)
throws IOException
{
FileSystem fileSystem = new SyncingFileSystem(CONFIGURATION);
Reader reader = createReader(fileSystem, path(input));
OrcReaderOptions options = new OrcReaderOptions();
OrcReader reader = createOrcReader(new FileOrcDataSource(input, options), options)
.orElseThrow(() -> new IOException("File is empty: " + input));
return rewrite(typeManager, reader, output, rowsToDelete);
}

public static OrcFileInfo rewrite(TypeManager typeManager, OrcReader reader, File output, BitSet rowsToDelete)
throws IOException
{
long start = System.nanoTime();

List<ColumnInfo> columnInfo = getColumnInfo(typeManager, reader);

List<String> columnNames = columnInfo.stream()
.map(info -> String.valueOf(info.getColumnId()))
.collect(toImmutableList());

if (reader.getNumberOfRows() < rowsToDelete.length()) {
List<Type> columnTypes = columnInfo.stream()
.map(ColumnInfo::getType)
.collect(toImmutableList());

OrcRecordReader recordReader = reader.createRecordReader(
reader.getRootColumn().getNestedColumns(),
columnTypes,
OrcPredicate.TRUE,
DateTimeZone.UTC,
newSimpleAggregatedMemoryContext(),
INITIAL_BATCH_SIZE,
RaptorPageSource::handleException);

long fileRowCount = recordReader.getFileRowCount();
if (fileRowCount < rowsToDelete.length()) {
throw new IOException("File has fewer rows than deletion vector");
}
int deleteRowCount = rowsToDelete.cardinality();
if (reader.getNumberOfRows() == deleteRowCount) {
if (fileRowCount == deleteRowCount) {
return new OrcFileInfo(0, 0);
}
if (reader.getNumberOfRows() >= Integer.MAX_VALUE) {
if (fileRowCount >= Integer.MAX_VALUE) {
throw new IOException("File has too many rows");
}
int inputRowCount = toIntExact(reader.getNumberOfRows());

WriterOptions writerOptions = OrcFile.writerOptions(CONFIGURATION)
.memory(new NullMemoryManager())
.fileSystem(fileSystem)
.compress(reader.getCompression())
.inspector(reader.getObjectInspector());
Map<String, String> metadata = Maps.transformValues(reader.getFooter().getUserMetadata(), Slice::toStringUtf8);

long start = System.nanoTime();
try (Closer<RecordReader, IOException> recordReader = closer(reader.rows(), RecordReader::close);
Closer<Writer, IOException> writer = closer(createWriter(path(output), writerOptions), Writer::close)) {
if (reader.hasMetadataValue(OrcFileMetadata.KEY)) {
ByteBuffer orcFileMetadata = reader.getMetadataValue(OrcFileMetadata.KEY);
writer.get().addUserMetadata(OrcFileMetadata.KEY, orcFileMetadata);
}
OrcFileInfo fileInfo = rewrite(recordReader.get(), writer.get(), rowsToDelete, inputRowCount);
log.debug("Rewrote file %s in %s (input rows: %s, output rows: %s)", input.getName(), nanosSince(start), inputRowCount, inputRowCount - deleteRowCount);
return fileInfo;
OrcFileInfo fileInfo;
try (OrcWriter writer = createOrcFileWriter(output, columnNames, columnTypes, reader.getFooter().getTypes(), metadata)) {
fileInfo = rewrite(recordReader, writer, rowsToDelete);
}
log.debug("Rewrote file in %s (input rows: %s, output rows: %s)", nanosSince(start), fileRowCount, fileRowCount - deleteRowCount);
return fileInfo;
}

private static OrcFileInfo rewrite(RecordReader reader, Writer writer, BitSet rowsToDelete, int inputRowCount)
private static OrcFileInfo rewrite(OrcRecordReader reader, OrcWriter writer, BitSet rowsToDelete)
throws IOException
{
Object object = null;
int row = 0;
long rowCount = 0;
long uncompressedSize = 0;

row = rowsToDelete.nextClearBit(row);
if (row < inputRowCount) {
reader.seekToRow(row);
}

while (row < inputRowCount) {
while (true) {
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedIOException();
}

// seekToRow() is extremely expensive
if (reader.getRowNumber() < row) {
reader.next(object);
continue;
Page page = reader.nextPage();
if (page == null) {
break;
}

object = reader.next(object);
writer.addRow(object);
rowCount++;
uncompressedSize += uncompressedSize(object);
int start = toIntExact(reader.getFilePosition());
page = maskedPage(page, rowsToDelete, start);
writer.write(page);

row = rowsToDelete.nextClearBit(row + 1);
rowCount += page.getPositionCount();
uncompressedSize += page.getLogicalSizeInBytes();
}
return new OrcFileInfo(rowCount, uncompressedSize);
}

private static Path path(File input)
{
return new Path(input.toURI());
return new OrcFileInfo(rowCount, uncompressedSize);
}

private static int uncompressedSize(Object object)
throws IOException
private static Page maskedPage(Page page, BitSet rowsToDelete, int start)
{
if (object instanceof OrcStruct) {
OrcStruct struct = (OrcStruct) object;
int size = 0;
for (int i = 0; i < struct.getNumFields(); i++) {
size += uncompressedSize(getFieldValue(struct, i));
}
return size;
}
if ((object == null) || (object instanceof BooleanWritable)) {
return SIZE_OF_BYTE;
}
if (object instanceof LongWritable) {
return SIZE_OF_LONG;
int end = start + page.getPositionCount();
if (rowsToDelete.nextSetBit(start) >= end) {
return page;
}
if (object instanceof DoubleWritable) {
return SIZE_OF_DOUBLE;
if (rowsToDelete.nextClearBit(start) >= end) {
return page.copyPositions(new int[0], 0, 0);
}
if (object instanceof HiveDecimalWritable) {
return SIZE_OF_LONG;
}
if (object instanceof Text) {
return ((Text) object).getLength();
}
if (object instanceof BytesWritable) {
return ((BytesWritable) object).getLength();
}
if (object instanceof List<?>) {
int size = 0;
for (Object element : (Iterable<?>) object) {
size += uncompressedSize(element);

int[] ids = new int[page.getPositionCount()];
int size = 0;
for (int i = 0; i < ids.length; i++) {
if (!rowsToDelete.get(start + i)) {
ids[size] = i;
size++;
}
return size;
}
if (object instanceof Map<?, ?>) {
int size = 0;
for (Map.Entry<?, ?> entry : ((Map<?, ?>) object).entrySet()) {
size += uncompressedSize(entry.getKey());
size += uncompressedSize(entry.getValue());
}
return size;

Block[] maskedBlocks = new Block[page.getChannelCount()];
for (int i = 0; i < maskedBlocks.length; i++) {
maskedBlocks[i] = DictionaryBlock.create(size, page.getBlock(i), ids);
}
throw new IOException("Unhandled ORC object: " + object.getClass().getName());
return new Page(maskedBlocks);
}

public static class OrcFileInfo
Expand Down
Loading