@@ -16,6 +16,8 @@
 import com.facebook.airlift.json.JsonCodec;
 import com.facebook.presto.common.Page;
 import com.facebook.presto.common.block.Block;
+import com.facebook.presto.common.block.BlockBuilder;
+import com.facebook.presto.common.function.SqlFunctionProperties;
 import com.facebook.presto.common.type.BigintType;
 import com.facebook.presto.common.type.BooleanType;
 import com.facebook.presto.common.type.DateType;
@@ -46,8 +48,11 @@
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.io.LocationProvider;
-import org.apache.iceberg.transforms.Transform;
 
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -72,6 +77,8 @@
 import static java.util.UUID.randomUUID;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
 
 public class IcebergPageSink
         implements ConnectorPageSink
@@ -124,7 +131,9 @@ public IcebergPageSink(
         this.session = requireNonNull(session, "session is null");
         this.fileFormat = requireNonNull(fileFormat, "fileFormat is null");
         this.maxOpenWriters = maxOpenWriters;
-        this.pagePartitioner = new PagePartitioner(pageIndexerFactory, toPartitionColumns(inputColumns, partitionSpec));
+        this.pagePartitioner = new PagePartitioner(pageIndexerFactory,
+                toPartitionColumns(inputColumns, partitionSpec),
+                session);
     }
 
     @Override
@@ -281,13 +290,14 @@ private int[] getWriterIndexes(Page page)
         }
 
         // create missing writers
+        Page transformedPage = pagePartitioner.getTransformedPage();
         for (int position = 0; position < page.getPositionCount(); position++) {
             int writerIndex = writerIndexes[position];
             if (writers.get(writerIndex) != null) {
                 continue;
             }
 
-            Optional<PartitionData> partitionData = getPartitionData(pagePartitioner.getColumns(), page, position);
+            Optional<PartitionData> partitionData = getPartitionData(pagePartitioner.getColumns(), transformedPage, position);
             WriteContext writer = createWriter(partitionData);
 
             writers.set(writerIndex, writer);
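
A note on the ordering this hunk relies on: `getTransformedPage()` only returns the page cached by the most recent `partitionPage(page)` call, so the two must run in that order on the same page. A minimal sketch of the contract (the surrounding harness is hypothetical, not sink code):

```java
// Hypothetical harness; partitionPage/getTransformedPage come from this diff.
int[] writerIndexes = pagePartitioner.partitionPage(page);   // builds and caches the transformed page
Page transformedPage = pagePartitioner.getTransformedPage(); // valid only after partitionPage(page)
for (int position = 0; position < page.getPositionCount(); position++) {
    // writerIndexes[position] selects the writer; partition values for a
    // newly created writer are read from transformedPage at this position.
}
```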
@@ -315,7 +325,7 @@ private WriteContext createWriter(Optional<PartitionData> partitionData)
         return new WriteContext(writer, outputPath, partitionData);
     }
 
-    private Optional<PartitionData> getPartitionData(List<PartitionColumn> columns, Page page, int position)
+    private Optional<PartitionData> getPartitionData(List<PartitionColumn> columns, Page transformedPage, int position)
     {
         if (columns.isEmpty()) {
             return Optional.empty();
@@ -324,21 +334,13 @@ private Optional<PartitionData> getPartitionData(List<PartitionColumn> columns,
         Object[] values = new Object[columns.size()];
         for (int i = 0; i < columns.size(); i++) {
             PartitionColumn column = columns.get(i);
-            Block block = page.getBlock(column.getSourceChannel());
-            Type type = column.getSourceType();
-            org.apache.iceberg.types.Type icebergType = outputSchema.findType(column.getField().sourceId());
-            Object value = getIcebergValue(block, position, type);
-            values[i] = applyTransform(column.getField().transform(), icebergType, value);
+            Block block = transformedPage.getBlock(i);
+            Type type = column.getResultType();
+            values[i] = getIcebergValue(block, position, type);
         }
         return Optional.of(new PartitionData(values));
     }
 
-    @SuppressWarnings("unchecked")
-    private static Object applyTransform(Transform<?, ?> transform, org.apache.iceberg.types.Type icebergType, Object value)
-    {
-        return ((Transform<Object, Object>) transform).bind(icebergType).apply(value);
-    }
-
     public static Object getIcebergValue(Block block, int position, Type type)
     {
         if (block.isNull(position)) {
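
The simplification above works because block `i` of the transformed page already holds partition column `i`'s value in its result type, so no Iceberg `Transform` needs to be bound and applied per value here anymore. A standalone sketch of what such a transformed value looks like, assuming a hypothetical `month(ts)` partition spec (Iceberg's month transform counts months since the Unix epoch):

```java
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;

public class MonthTransformExample
{
    public static void main(String[] args)
    {
        // A month() partition column's result value is months since 1970-01.
        LocalDate epochMonth = LocalDate.of(1970, 1, 1);
        LocalDate value = LocalDate.of(2020, 3, 15);
        long monthsSinceEpoch = ChronoUnit.MONTHS.between(epochMonth, value.withDayOfMonth(1));
        System.out.println(monthsSinceEpoch); // 602, the value stored in the transformed block
    }
}
```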
@@ -379,6 +381,23 @@ public static Object getIcebergValue(Block block, int position, Type type)
         throw new UnsupportedOperationException("Type not supported as partition column: " + type.getDisplayName());
     }
 
+    private static Object adjustTimestampForPartitionTransform(SqlFunctionProperties functionProperties, Type type, Object value)
+    {
+        if (type instanceof TimestampType && functionProperties.isLegacyTimestamp()) {
+            long timestampValue = (long) value;
+            TimestampType timestampType = (TimestampType) type;
+            Instant instant = Instant.ofEpochSecond(timestampType.getPrecision().toSeconds(timestampValue),
+                    timestampType.getPrecision().toNanos(timestampValue % timestampType.getPrecision().convert(1, SECONDS)));
+            LocalDateTime localDateTime = instant
+                    .atZone(ZoneId.of(functionProperties.getTimeZoneKey().getId()))
+                    .toLocalDateTime();
+
+            return timestampType.getPrecision().convert(localDateTime.toEpochSecond(ZoneOffset.UTC), SECONDS) +
+                    timestampType.getPrecision().convert(localDateTime.getNano(), NANOSECONDS);
+        }
+        return value;
+    }
+
     private static List<PartitionColumn> toPartitionColumns(List<IcebergColumnHandle> handles, PartitionSpec partitionSpec)
     {
         Map<Integer, Integer> idChannels = new HashMap<>();
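
Concretely, the added method reinterprets a legacy timestamp (an instant rendered in the session zone) as its wall-clock time and re-encodes that wall-clock time as if it were UTC, which is the frame Iceberg's time-based transforms assume. A standalone worked example for MILLISECONDS precision (the zone and value are illustrative, not from this PR):

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

public class LegacyTimestampShiftExample
{
    public static void main(String[] args)
    {
        // 2020-01-01 23:30 in America/Los_Angeles, i.e. 2020-01-02T07:30:00Z
        long epochMillis = 1577950200000L;
        ZoneId sessionZone = ZoneId.of("America/Los_Angeles");

        // Take the session-zone wall-clock reading of the instant...
        LocalDateTime local = Instant.ofEpochMilli(epochMillis).atZone(sessionZone).toLocalDateTime();
        // ...and re-encode it as if that wall-clock time were UTC.
        long adjusted = local.toEpochSecond(ZoneOffset.UTC) * 1000 + local.getNano() / 1_000_000;

        System.out.printf("original=%d adjusted=%d shift=%dms%n", epochMillis, adjusted, adjusted - epochMillis);
        // original=1577950200000 adjusted=1577921400000 shift=-28800000ms (-8 hours)
    }
}
```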
@@ -430,26 +449,36 @@ private static class PagePartitioner
     {
         private final PageIndexer pageIndexer;
         private final List<PartitionColumn> columns;
+        private final ConnectorSession session;
+        private Page transformedPage;
 
-        public PagePartitioner(PageIndexerFactory pageIndexerFactory, List<PartitionColumn> columns)
+        public PagePartitioner(PageIndexerFactory pageIndexerFactory,
+                List<PartitionColumn> columns,
+                ConnectorSession session)
         {
             this.pageIndexer = pageIndexerFactory.createPageIndexer(columns.stream()
                     .map(PartitionColumn::getResultType)
                     .collect(toImmutableList()));
             this.columns = ImmutableList.copyOf(columns);
+            this.session = session;
         }
 
         public int[] partitionPage(Page page)
         {
             Block[] blocks = new Block[columns.size()];
             for (int i = 0; i < columns.size(); i++) {
                 PartitionColumn column = columns.get(i);
-                Block block = page.getBlock(column.getSourceChannel());
+                Block block = adjustBlockIfNecessary(column, page.getBlock(column.getSourceChannel()));
                 blocks[i] = column.getBlockTransform().apply(block);
             }
-            Page transformed = new Page(page.getPositionCount(), blocks);
+            this.transformedPage = new Page(page.getPositionCount(), blocks);
 
-            return pageIndexer.indexPage(transformed);
+            return pageIndexer.indexPage(transformedPage);
         }
 
+        public Page getTransformedPage()
+        {
+            return this.transformedPage;
+        }
+
         public int getMaxIndex()
@@ -461,6 +490,29 @@ public List<PartitionColumn> getColumns()
         {
             return columns;
         }
+
+        private Block adjustBlockIfNecessary(PartitionColumn column, Block block)
+        {
+            // adjust legacy timestamp values to be compatible with Iceberg's non-identity transform calculation
+            if (column.sourceType instanceof TimestampType && session.getSqlFunctionProperties().isLegacyTimestamp() && !column.getField().transform().isIdentity()) {
+                TimestampType timestampType = (TimestampType) column.sourceType;
+                BlockBuilder blockBuilder = timestampType.createBlockBuilder(null, block.getPositionCount());
+                for (int t = 0; t < block.getPositionCount(); t++) {
+                    if (block.isNull(t)) {
+                        blockBuilder.appendNull();
+                    }
+                    else {
+                        long adjustedTimestampValue = (long) adjustTimestampForPartitionTransform(
+                                session.getSqlFunctionProperties(),
+                                timestampType,
+                                timestampType.getLong(block, t));
+                        timestampType.writeLong(blockBuilder, adjustedTimestampValue);
+                    }
+                }
+                return blockBuilder.build();
+            }
+            return block;
+        }
     }
 
     private static class PartitionColumn
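
A note on the guard in `adjustBlockIfNecessary`: identity partitions store the original value unchanged, so only non-identity transforms (`day`, `month`, `hour`, and so on) need adjusted inputs, since they bucket the raw epoch value as if it were UTC. Continuing the illustrative values from the earlier sketch, a standalone check of how the shift changes the day partition:

```java
public class DayPartitionExample
{
    public static void main(String[] args)
    {
        long millisPerDay = 86_400_000L;
        long epochMillis = 1577950200000L;    // 2020-01-01 23:30 America/Los_Angeles = 2020-01-02T07:30Z
        long adjustedMillis = 1577921400000L; // same wall-clock time re-encoded as UTC
        System.out.println(epochMillis / millisPerDay);    // 18263 -> day partition 2020-01-02
        System.out.println(adjustedMillis / millisPerDay); // 18262 -> day partition 2020-01-01 (correct local day)
    }
}
```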