51 changes: 51 additions & 0 deletions flink/src/main/java/org/apache/iceberg/flink/FlinkFixupTypes.java
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.flink;

import org.apache.iceberg.Schema;
import org.apache.iceberg.types.FixupTypes;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;

/**
* UUID and fixed are converted to the same Flink type. Converting back can produce only one of
* them, which may not be correct.
*/
class FlinkFixupTypes extends FixupTypes {

private FlinkFixupTypes(Schema referenceSchema) {
super(referenceSchema);
}

static Schema fixup(Schema schema, Schema referenceSchema) {
return new Schema(TypeUtil.visit(schema,
new FlinkFixupTypes(referenceSchema)).asStructType().fields());
}

@Override
protected boolean fixupPrimitive(Type.PrimitiveType type, Type source) {
if (type instanceof Types.FixedType) {
int length = ((Types.FixedType) type).length();
return source.typeId() == Type.TypeID.UUID && length == 16;
Collaborator: We have a discussion in #1302 for removing UUID. Is this a temporary solution?

Contributor Author: Yes.

}
return false;
}
}
23 changes: 23 additions & 0 deletions flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java
@@ -27,6 +27,7 @@
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;

/**
* Converter between Flink types and Iceberg type.
@@ -63,6 +64,28 @@ public static Schema convert(TableSchema schema) {
return new Schema(converted.asStructType().fields());
}

/**
* Convert a Flink {@link TableSchema} to a {@link Schema} based on the given schema.
* <p>
* This conversion does not assign new ids; it uses ids from the base schema.
* <p>
* Data types, field order, and nullability will match the Flink type. This conversion may return
* a schema that is not compatible with the base schema.
*
* @param baseSchema a Schema on which conversion is based
* @param flinkSchema a Flink TableSchema
* @return the equivalent Schema
* @throws IllegalArgumentException if the type cannot be converted or there are missing ids
*/
public static Schema convert(Schema baseSchema, TableSchema flinkSchema) {
// convert to a type with fresh ids
Types.StructType struct = convert(flinkSchema).asStruct();
// reassign ids to match the base schema
Schema schema = TypeUtil.reassignIds(new Schema(struct.fields()), baseSchema);
// fix types that can't be represented in Flink (UUID)
return FlinkFixupTypes.fixup(schema, baseSchema);
}
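For context, a usage sketch of this overload (not part of the PR): the Flink schema typically comes from DDL, and the base schema is the table's current schema. Field names and types below are illustrative only.

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

// Illustrative base schema: the Iceberg table's current schema with assigned ids.
Schema baseSchema = new Schema(
    Types.NestedField.required(1, "id", Types.LongType.get()),
    Types.NestedField.optional(2, "data", Types.StringType.get()));

// Illustrative Flink schema, e.g. derived from a CREATE TABLE statement.
TableSchema flinkSchema = TableSchema.builder()
    .field("id", DataTypes.BIGINT())
    .field("data", DataTypes.STRING())
    .build();

// Ids are taken from baseSchema; types Flink cannot represent (UUID) are fixed up afterwards.
Schema converted = FlinkSchemaUtil.convert(baseSchema, flinkSchema);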

/**
* Convert a {@link Schema} to a {@link RowType Flink type}.
*
@@ -39,18 +39,14 @@
public class FlinkOrcReader implements OrcRowReader<RowData> {
private final OrcValueReader<?> reader;

- private FlinkOrcReader(Schema iSchema, TypeDescription readSchema) {
+ public FlinkOrcReader(Schema iSchema, TypeDescription readSchema) {
this(iSchema, readSchema, ImmutableMap.of());
}

- private FlinkOrcReader(Schema iSchema, TypeDescription readSchema, Map<Integer, ?> idToConstant) {
+ public FlinkOrcReader(Schema iSchema, TypeDescription readSchema, Map<Integer, ?> idToConstant) {
this.reader = OrcSchemaWithTypeVisitor.visit(iSchema, readSchema, new ReadBuilder(idToConstant));
}

- public static OrcRowReader<RowData> buildReader(Schema schema, TypeDescription readSchema) {
Contributor: Was this not used anywhere?

Contributor Author: Because there are two constructors, we would need two static helpers; I think we can use the constructors directly.

Contributor Author: It is just used by testing code.

- return new FlinkOrcReader(schema, readSchema);
- }

@Override
public RowData read(VectorizedRowBatch batch, int row) {
return (RowData) reader.read(new StructColumnVector(batch.size, batch.cols), row);
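With the constructors public, the removed static buildReader helper is no longer needed; callers (including tests) can hand the constructor straight to the ORC read builder. A rough sketch, assuming Iceberg's ORC.read(...).createReaderFunc(...) builder API; schema and inputFile are placeholders, not from the PR.

import org.apache.flink.table.data.RowData;
import org.apache.iceberg.Schema;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.orc.ORC;

// schema: the projected Iceberg read schema; inputFile: an ORC data file.
CloseableIterable<RowData> rows = ORC.read(inputFile)
    .project(schema)
    .createReaderFunc(fileSchema -> new FlinkOrcReader(schema, fileSchema))
    .build();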
@@ -36,6 +36,7 @@
import org.apache.flink.table.data.TimestampData;
import org.apache.iceberg.orc.OrcValueReader;
import org.apache.iceberg.orc.OrcValueReaders;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
@@ -127,6 +128,11 @@ private static class Decimal18Reader implements OrcValueReader<DecimalData> {
@Override
public DecimalData nonNullRead(ColumnVector vector, int row) {
HiveDecimalWritable value = ((DecimalColumnVector) vector).vector[row];

// The Hive ORC writer may adjust the scale of decimal data.
Preconditions.checkArgument(value.precision() <= precision,
"Cannot read value as decimal(%s,%s), too large: %s", precision, scale, value);
Contributor: Does Flink require this?

Contributor Author: I think it is better to add this check to avoid potential precision-mismatch bugs.


return DecimalData.fromUnscaledLong(value.serialize64(scale), precision, scale);
}
}
@@ -143,6 +149,10 @@ private static class Decimal38Reader implements OrcValueReader<DecimalData> {
@Override
public DecimalData nonNullRead(ColumnVector vector, int row) {
BigDecimal value = ((DecimalColumnVector) vector).vector[row].getHiveDecimal().bigDecimalValue();

Preconditions.checkArgument(value.precision() <= precision,
"Cannot read value as decimal(%s,%s), too large: %s", precision, scale, value);

return DecimalData.fromBigDecimal(value, precision, scale);
}
}
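To make the intent of the guard concrete (this snippet is not from the PR and the numbers are made up): a value whose precision exceeds the declared decimal type is rejected up front instead of being silently mis-scaled.

import java.math.BigDecimal;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

// Declared column type: decimal(6, 2). A value such as 123456.789 has
// precision 9, so the reader fails fast rather than corrupting the value.
int precision = 6;
int scale = 2;
BigDecimal value = new BigDecimal("123456.789");
Preconditions.checkArgument(value.precision() <= precision,
    "Cannot read value as decimal(%s,%s), too large: %s", precision, scale, value);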
@@ -246,7 +256,7 @@ private static class StructReader extends OrcValueReaders.StructReader<RowData>

StructReader(List<OrcValueReader<?>> readers, Types.StructType struct, Map<Integer, ?> idToConstant) {
super(readers, struct, idToConstant);
- this.numFields = readers.size();
+ this.numFields = struct.fields().size();
Contributor Author (JingsongLi, Aug 24, 2020): This fixes the Flink ORC reader bug with partitioned reads.

Member: Nice catch. If the schema is a projected read schema, then numFields will be mismatched.

}

@Override
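The sizing matters because, with partition values supplied as constants through idToConstant, the list of file-backed readers can be shorter than the projected struct. A hypothetical illustration of the sizing choice (the field counts are invented, not taken from the PR):

import org.apache.flink.table.data.GenericRowData;

// Suppose the projected struct has 3 fields, one of which is a partition
// constant; only 2 readers are backed by actual file columns.
int fileBackedReaders = 2;   // what readers.size() would report in this scenario
int projectedFields = 3;     // what struct.fields().size() reports
// The output row needs a slot for every projected field, constants included.
GenericRowData row = new GenericRowData(projectedFields);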
@@ -52,7 +52,7 @@
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;

- class FlinkParquetReaders {
+ public class FlinkParquetReaders {
private FlinkParquetReaders() {
}

140 changes: 140 additions & 0 deletions flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java
@@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.flink.source;
Member: Here you put all the reader-related classes inside the source package; will we also need to put the writer-related classes into a sink package? I don't have a strong feeling about it; keeping it consistent is OK with me.

Contributor Author: I think we can.


import java.io.IOException;
import java.io.UncheckedIOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.Iterator;
import org.apache.avro.generic.GenericData;
import org.apache.avro.util.Utf8;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.encryption.EncryptedFiles;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ByteBuffers;
import org.apache.iceberg.util.DateTimeUtil;

/**
* Base class of Flink iterators.
*
* @param <T> is the Java class returned by this iterator whose objects contain one or more rows.
*/
abstract class DataIterator<T> implements CloseableIterator<T> {

private Iterator<FileScanTask> tasks;
private final FileIO io;
private final EncryptionManager encryption;

private CloseableIterator<T> currentIterator;

DataIterator(CombinedScanTask task, FileIO io, EncryptionManager encryption) {
this.tasks = task.files().iterator();
this.io = io;
this.encryption = encryption;
this.currentIterator = CloseableIterator.empty();
}

InputFile getInputFile(FileScanTask task) {
Preconditions.checkArgument(!task.isDataTask(), "Invalid task type");
return encryption.decrypt(EncryptedFiles.encryptedInput(
io.newInputFile(task.file().path().toString()),
task.file().keyMetadata()));
}

@Override
public boolean hasNext() {
updateCurrentIterator();
return currentIterator.hasNext();
}

@Override
public T next() {
updateCurrentIterator();
return currentIterator.next();
}

/**
* Updates the current iterator field to ensure that the current Iterator
* is not exhausted.
*/
private void updateCurrentIterator() {
try {
while (!currentIterator.hasNext() && tasks.hasNext()) {
currentIterator.close();
currentIterator = openTaskIterator(tasks.next());
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

abstract CloseableIterator<T> openTaskIterator(FileScanTask scanTask) throws IOException;

@Override
public void close() throws IOException {
// close the current iterator
currentIterator.close();
tasks = null;
}

static Object convertConstant(Type type, Object value) {
if (value == null) {
return null;
}

switch (type.typeId()) {
case DECIMAL: // DecimalData
Types.DecimalType decimal = (Types.DecimalType) type;
return DecimalData.fromBigDecimal((BigDecimal) value, decimal.precision(), decimal.scale());
case STRING: // StringData
if (value instanceof Utf8) {
Member: Is it possible that we will ever step into this if block? I saw PartitionData transforms the Utf8 to String.

Contributor Author: I think it is good to keep it, to be safe.

Utf8 utf8 = (Utf8) value;
return StringData.fromBytes(utf8.getBytes(), 0, utf8.getByteLength());
}
return StringData.fromString(value.toString());
case FIXED: // byte[]
if (value instanceof byte[]) {
return value;
} else if (value instanceof GenericData.Fixed) {
Member: Same question here: would it be possible that the value is a GenericData.Fixed or a ByteBuffer? At least the PartitionData will transform the ByteBuffer to byte[]?

Contributor Author: The PartitionData converts byte[] back to ByteBuffer (see `if (data[pos] instanceof byte[])`). I think it is good to keep it, to be safe.

return ((GenericData.Fixed) value).bytes();
}
return ByteBuffers.toByteArray((ByteBuffer) value);
case BINARY: // byte[]
return ByteBuffers.toByteArray((ByteBuffer) value);
case TIME: // int mills instead of long
return (int) ((Long) value / 1000);
case TIMESTAMP: // TimestampData
return TimestampData.fromLocalDateTime(DateTimeUtil.timestampFromMicros((Long) value));
default:
}
return value;
}
}
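A minimal, hypothetical subclass to show the contract rather than the PR's real reader: openTaskIterator opens one iterator per FileScanTask, and DataIterator chains those iterators across the whole CombinedScanTask. The class name and behavior are illustrative only; a real implementation would build an Avro/Parquet/ORC reader over getInputFile(scanTask).

import java.util.Collections;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileIO;

// Must live in the same package as DataIterator, since openTaskIterator is package-private.
class FilePathIterator extends DataIterator<String> {

  FilePathIterator(CombinedScanTask task, FileIO io, EncryptionManager encryption) {
    super(task, io, encryption);
  }

  @Override
  CloseableIterator<String> openTaskIterator(FileScanTask scanTask) {
    // This sketch just emits each data file's path; a real reader would
    // decode rows from getInputFile(scanTask).
    return CloseableIterator.withClose(
        Collections.singletonList(scanTask.file().path().toString()).iterator());
  }
}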