
Add Avro support to Iceberg Connector #4776

Closed
lxynov wants to merge 4 commits into trinodb:master from lxynov:iceberg-avro

Conversation


@lxynov lxynov commented Aug 11, 2020

Spec

https://iceberg.apache.org/spec/#avro

Values should be stored in Avro using the Avro types and logical type annotations in the table.

Iceberg struct, list, and map types identify nested types by ID. When writing data to Avro files, these IDs must be stored in the Avro schema to support ID-based column pruning.

Implementation

This PR uses the Iceberg Avro reader/writer to read and write Iceberg Avro files. The Iceberg classes involved include DataReader, DataWriter, and Avro. A new class, IcebergAvroDataConversion, converts between the Presto data representation and the Iceberg Avro data representation. The Presto representation refers to how data is laid out in Blocks; the Iceberg Avro representation refers to the objects produced and consumed by DataReader/DataWriter. The table below illustrates the conversion mapping.

| Presto Type | Iceberg Type | Presto Representation (in Blocks) | Iceberg Avro Representation (via DataReader/DataWriter) |
|---|---|---|---|
| BooleanType.BOOLEAN | BOOLEAN | boolean | boolean |
| IntegerType.INTEGER | INTEGER | long | int |
| BigintType.BIGINT | LONG | long | long |
| RealType.REAL | FLOAT | bit representation in int | float |
| DoubleType.DOUBLE | DOUBLE | bit representation in long | double |
| ShortDecimalType / LongDecimalType | DECIMAL | ShortDecimalType: unscaled value is stored in long; LongDecimalType: unscaled value is encoded and stored in Slice | BigDecimal |
| VarcharType.VARCHAR | STRING | Slice | String |
| VarbinaryType | BINARY, FIXED(L) | Slice | BINARY: ByteBuffer; FIXED(L): byte[] |
| DateType.DATE | DATE | days since epoch in long | LocalDate |
| TIME_MICROS | TIME | picos of day in long | LocalTime |
| TIMESTAMP_MICROS | TimestampType.withoutZone() | microseconds since epoch in long | LocalDateTime |
| TIMESTAMP_TZ_MICROS | TimestampType.withZone() | LongTimestampWithTimeZone | OffsetDateTime |
| ArrayType | LIST | Block | Collection |
| MapType | MAP | Block | Map |
| RowType | STRUCT | Block | Record |
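A few rows of this mapping can be sketched with JDK types alone. This is illustrative only: the real conversion in IcebergAvroDataConversion reads these values out of Presto Blocks, while here plain ints/longs stand in for the Block contents.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Self-contained sketch of three rows of the conversion table above.
public final class ConversionSketch
{
    // REAL: Presto stores the float's bit pattern in an int
    public static float realToFloat(int bits)
    {
        return Float.intBitsToFloat(bits);
    }

    // DATE: Presto stores days since the epoch in a long
    public static LocalDate dateToLocalDate(long epochDays)
    {
        return LocalDate.ofEpochDay(epochDays);
    }

    // TIMESTAMP_MICROS: Presto stores microseconds since the epoch in a long
    public static LocalDateTime timestampToLocalDateTime(long epochMicros)
    {
        long epochSeconds = Math.floorDiv(epochMicros, 1_000_000L);
        int nanos = (int) Math.floorMod(epochMicros, 1_000_000L) * 1000;
        return LocalDateTime.ofEpochSecond(epochSeconds, nanos, ZoneOffset.UTC);
    }
}
```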

Tests and TODOs

This PR doesn't add new tests, but it applies AbstractTestIcebergSmoke and TestSparkCompatibility to the Avro format.
There are two test failures.

  1. AbstractTestIcebergSmoke.testCreateNestedPartitionedTable. This test depends on apache/iceberg#1321 (Avro: Fix pruning columns when a logical-map array's value type is nested), which is not in apache-iceberg-0.9.1 but is in apache-iceberg-0.10.0-rc0.
  2. TestSparkCompatibility.testPrestoReadingSparkData. This test fails because of how TIMESTAMP objects are handled. I wasn't able to get it to pass, and I feel Presto's spec for TIMESTAMP is not really identical to Iceberg's.
    Presto's spec on TIMESTAMP: Instant in time that includes the date and time of day without a time zone with P digits of precision for the fraction of seconds. A precision of up to 12 (picoseconds) is supported. Values of this type are parsed and rendered in the session time zone.
    Iceberg's spec on TIMESTAMP: All time and timestamp values are stored with microsecond precision. Timestamps with time zone represent a point in time: values are stored as UTC and do not retain a source time zone (2017-11-16 17:10:34 PST is stored/retrieved as 2017-11-17 01:10:34 UTC and these values are considered identical). Timestamps without time zone represent a date and time of day regardless of zone: the time value is independent of zone adjustments (2017-11-16 17:10:34 is always retrieved as 2017-11-16 17:10:34). Timestamp values are stored as a long that encodes microseconds from the unix epoch.

    I feel this PR's implementation is correct and perhaps there's something wrong on the Spark side.
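The Iceberg timestamptz behavior quoted above (values are normalized to UTC and the source zone is dropped) can be reproduced with plain java.time types. This sketch is JDK-only and independent of the PR's code:

```java
import java.time.OffsetDateTime;
import java.time.ZoneOffset;

// Illustration of the Iceberg timestamptz semantics quoted above:
// 2017-11-16 17:10:34 -08:00 and 2017-11-17 01:10:34 UTC are the same instant.
public final class TimestamptzSketch
{
    public static OffsetDateTime normalizeToUtc(OffsetDateTime value)
    {
        // Iceberg stores the instant in UTC and does not retain the source zone
        return value.withOffsetSameInstant(ZoneOffset.UTC);
    }
}
```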

Closes #2298
Part of #1324


@phd3 phd3 left a comment


Added some comments (mostly minor), still reviewing.


OrcFileWriter --> IcebergAvroPageSource

}

private IcebergFileWriter createAvroWriter(
String schemaName,

The requirement of providing a "tableName" in the Avro.WriteBuilder#named() API feels a bit strange to me. However, I was also wondering if we could just pass hdfsContext as an argument here and use the table name from there.


Agreed. We already pass HdfsContext to createParquetWriter()


@Test
public void testHourTransform()
{

are changes in this file from a different commit?

import static io.prestosql.plugin.iceberg.util.IcebergAvroDataConversion.serializeToPrestoObject;
import static java.util.Objects.requireNonNull;

public class IcebergAvroPageSource

Is there a reason to not use RecordPageSource with a cursor implementation? The page building mechanism there is pretty similar.

IIRC using RecordPageSource with an extended record cursor had some performance advantage over using a ConnectorPageSource that is internally row-oriented, but don't remember the details. @dain is that still the case? If so, is the performance difference considerable?


That's correct, RecordPageSource has special handling in the engine. It has the advantage of not materializing entire column pages if there is filtering. Consider this:

SELECT x WHERE y > 5

With ConnectorPageSource, we materialize entire pages of x even for rows where the y predicate is false. Since we're not using lazy pages, we actually materialize x even if the predicate is false for the entire page (and thus we don't need x at all).

Note that I'm not saying we need to do it this way -- just something to consider.

.build();
}
catch (IOException e) {
throw new PrestoException(ICEBERG_WRITER_OPEN_ERROR, "Error creating Avro file", e);

nit: add file path in the error message for ease of debugging?

else {
unscaledValue = Decimals.decodeUnscaledValue(decimalType.getSlice(block, position));
}
return new BigDecimal(unscaledValue, decimalType.getScale());

Should we use new BigDecimal(unscaledValue, decimalType.getScale(), type.getPrecision())? Or maybe just Decimals#readBigDecimal?


Decimals.readBigDecimal() is the best way
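Decimals.readBigDecimal() is Trino's helper for this; a JDK-only sketch of what the decimal conversion amounts to (in the real code the unscaled value comes out of a Block: a long for short decimals, an encoded Slice for long decimals):

```java
import java.math.BigDecimal;
import java.math.BigInteger;

// JDK-only sketch of the short/long decimal conversion discussed above.
public final class DecimalSketch
{
    // Short decimal: unscaled value fits in a long
    public static BigDecimal fromShortDecimal(long unscaledValue, int scale)
    {
        return BigDecimal.valueOf(unscaledValue, scale);
    }

    // Long decimal: unscaled value as a BigInteger (decoded from a Slice in the real code)
    public static BigDecimal fromLongDecimal(BigInteger unscaledValue, int scale)
    {
        return new BigDecimal(unscaledValue, scale);
    }
}
```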

return type.getSlice(block, position).toStringUtf8();
}
if (type.equals(VARBINARY)) {
if (icebergType.typeId().equals(FIXED)) {

Could you elaborate on the reason for this special case?

if (type instanceof MapType) {
Type keyType = type.getTypeParameters().get(0);
Type valueType = type.getTypeParameters().get(1);
org.apache.iceberg.types.Type keyIcebergtype = icebergType.asMapType().keyType();

nit: camel case in keyIcebergType and valueIcebergType

List<Types.NestedField> icebergFields = icebergType.asStructType().fields();
BlockBuilder currentBuilder = builder.beginBlockEntry();
for (int i = 0; i < typeParameters.size(); i++) {
serializeToPrestoObject(typeParameters.get(i), icebergFields.get(1).type(), currentBuilder, record.get(i), timeZoneKey);

typo? icebergFields.get(i).type()

Map<?, ?> map = (Map<?, ?>) object;
Type keyType = ((MapType) type).getKeyType();
Type valueType = ((MapType) type).getValueType();
org.apache.iceberg.types.Type keyIcebergtype = icebergType.asMapType().keyType();

ditto: camelcase in the name


@phd3 phd3 left a comment


finished reviewing, it looks great, only a couple more comments.

Block rowBlock = block.getObject(position, Block.class);

List<Type> fieldTypes = type.getTypeParameters();
checkCondition(fieldTypes.size() == rowBlock.getPositionCount(), GENERIC_INTERNAL_ERROR, "Expected row value field count does not match type field count");

nit: instead of relying on a Hive module class, maybe throw PrestoException here directly?

}
return new BigDecimal(unscaledValue, decimalType.getScale());
}
if (type.equals(VARCHAR)) {

Does this cause bounded varchar types to throw an exception? We might want to use instanceof VarcharType, right?


Yep, need to use instanceof VarcharType here

}
return;
}
if (type.equals(VARCHAR)) {

same comment about supporting bounded varchar types

@lxynov lxynov mentioned this pull request Jan 9, 2021

rdsr commented Jan 20, 2021

@lxynov what's pending? Is there something we can help with?


lxynov commented Jan 20, 2021

@lxynov what's pending? Is there something we can help with?

@rdsr Let me rebase it to Trino master and also address @phd3 's comments so that you can help review


rdsr commented Jan 21, 2021

@lxynov what's pending? Is there something we can help with?

@rdsr Let me rebase it to Trino master and also address @phd3 's comments so that you can help review

Thanks @lxynov !

"(DATE '2015-05-15', 2, NULL, NULL, 4, 5), " +
"(DATE '2020-02-21', 2, NULL, NULL, 6, 7)";
}
if (!columnStatisticsCollected) {

I think these tests will become simpler if we make them similar to how we handle ORC. For example, instead of defining columnStatisticsCollected, adding new methods, and subclassing, could we not just do if (format == AVRO) and test appropriately?


lxynov commented Jan 26, 2021

@electrum @rdsr @phd3 Hey I'm thinking of dividing this PR into 3 parts:

  1. Test all file formats in TestSparkCompatibility #6699: clean up TestSparkCompatibility and test both ORC and Parquet in it.
  2. An independent PR that upgrades Iceberg dependency to 0.10.0. io.trino.plugin.iceberg.HiveTableOperations needs to be updated in that PR. It references org.apache.iceberg.hive.HiveTypeConverter which no longer exists in Iceberg 0.10.0. Furthermore, we need to figure out if more updates are needed.
  3. The rest of this PR that adds Avro integration.

Please LMK if you have comments.


rdsr commented Jan 26, 2021

@lxynov sounds good to me!

if (closed) {
return;
}
closed = true;

Should this line be moved after recordIterator.close()?
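The question above is about the usual idempotent-close pattern: setting the flag before closing the delegate guarantees the underlying close() is attempted at most once, even if it throws. A generic JDK-only sketch (the class and names are illustrative, not the PR's code):

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;

// Generic sketch of the idempotent close pattern discussed above.
public final class GuardedPageSource
        implements Closeable
{
    private final Closeable delegate;
    private boolean closed;

    public GuardedPageSource(Closeable delegate)
    {
        this.delegate = delegate;
    }

    public boolean isClosed()
    {
        return closed;
    }

    @Override
    public void close()
    {
        if (closed) {
            return;
        }
        // Flag first: a second call returns immediately even if delegate.close() throws
        closed = true;
        try {
            delegate.close();
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```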

import static org.apache.iceberg.util.DateTimeUtil.timestampFromMicros;
import static org.apache.iceberg.util.DateTimeUtil.timestamptzFromMicros;

public final class IcebergAvroDataConversion

Are there tests that cover this class?


@electrum electrum left a comment


I started reviewing this a while back and had a bunch of pending comments. I'll submit them now -- not sure if they are still relevant after the more recent changes.

ICEBERG_MISSING_DATA(5, EXTERNAL),
ICEBERG_CANNOT_OPEN_SPLIT(6, EXTERNAL),
ICEBERG_WRITER_OPEN_ERROR(7, EXTERNAL),
ICEBERG_FILESYSTEM_ERROR(8, EXTERNAL),

Did you mean to change the existing error codes?

{
ORC,
PARQUET,
AVRO

Nit: add trailing comma


@Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS})
public void testPrestoReadingSparkData()
@DataProvider(name = "storage_formats")

You can leave off name and have it default to the method name

String baseTableName = "test_spark_reads_presto_partitioned_table";
String prestoTableName = prestoTableName(baseTableName);
onPresto().executeQuery(format("CREATE TABLE %s (_string VARCHAR, _bigint BIGINT) WITH (partitioning = ARRAY['_string'])", prestoTableName));
onPresto().executeQuery(format("CREATE TABLE %s (_string VARCHAR, _bigint BIGINT) WITH (partitioning = ARRAY['_string'], format = '" + storageFormat + "')", prestoTableName));

Use the existing string formatting instead of concatenation

String baseTableName = "test_spark_reads_presto_partitioned_table";
String sparkTableName = sparkTableName(baseTableName);
onSpark().executeQuery(format("CREATE TABLE %s (_string STRING, _bigint BIGINT) USING ICEBERG PARTITIONED BY (_string)", sparkTableName));
onSpark().executeQuery(format("CREATE TABLE %s (_string STRING, _bigint BIGINT) USING ICEBERG PARTITIONED BY (_string)" +

Nit: missing space after last )

private List<Block> columnBlocks;
private List<Type> types;
private List<org.apache.iceberg.types.Type> icebergTypes;
private Schema icebergSchema;

final

public long getSystemMemoryUsage()
{
//TODO: try to add memory used by recordIterator
return INSTANCE_SIZE + pageBuilder.getRetainedSizeInBytes();

We could reset the PageBuilder at the end, then we don't need to calculate retained size for it.

@Override
public Page getNextPage()
{
if (closed) {

We could simplify this by removing the closed flag

if (!recordIterator.hasNext()) {
    return null;
}

The engine won't call getNextPage() after closing the page source.

This also allows removing the explicit close() below.
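The simplification suggested above boils down to driving the page source from the iterator alone. A JDK-only sketch of that loop shape (hypothetical names, plain Lists standing in for Trino Pages):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Sketch of the iterator-driven getNextPage() shape suggested above:
// return null when the iterator is exhausted; no separate `closed` flag,
// because the engine stops calling getNextPage() after closing the source.
public final class IteratorPageSource<T>
{
    private static final int MAX_PAGE_SIZE = 2;

    private final Iterator<T> recordIterator;

    public IteratorPageSource(Iterator<T> recordIterator)
    {
        this.recordIterator = recordIterator;
    }

    public List<T> getNextPage()
    {
        if (!recordIterator.hasNext()) {
            return null;
        }
        List<T> page = new ArrayList<>();
        while (recordIterator.hasNext() && page.size() < MAX_PAGE_SIZE) {
            page.add(recordIterator.next());
        }
        return page;
    }
}
```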

else {
unscaledValue = Decimals.decodeUnscaledValue(decimalType.getSlice(block, position));
}
return new BigDecimal(unscaledValue, decimalType.getScale());

Decimals.readBigDecimal() is the best way

}
return new BigDecimal(unscaledValue, decimalType.getScale());
}
if (type.equals(VARCHAR)) {

Yep, need to use instanceof VarcharType here

@roman-ambinder roman-ambinder mentioned this pull request Jul 7, 2021
11 tasks

caneGuy commented Jul 19, 2021

Any progress on this? @lxynov Thanks!


rdsr commented Jul 22, 2021

@lxynov is this patch now split into 3 parts? Or is it safe to use this existing patch? I wanted to backport this to our internal Trino repo


phd3 commented Jul 22, 2021

@caneGuy I don't think @lxynov is continuing work on this anymore. Feel free to pick it up if you'd like to.

@rdsr FWIW, w.r.t. part-2 in #4776 (comment), we've upgraded to 0.11.0.


findepi commented Aug 24, 2021

@jackye1995 is it correct you have picked this up?


findepi commented Apr 25, 2022

Superseded by @ebyhr in #12125

@findepi findepi closed this Apr 25, 2022

Development

Successfully merging this pull request may close these issues.

Allow querying Iceberg table by its location, without registering it in metastore

7 participants