35 changes: 27 additions & 8 deletions flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
@@ -24,6 +24,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.table.api.TableSchema;
@@ -46,6 +47,7 @@
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.factories.TableFactory;
import org.apache.flink.util.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CachingCatalog;
@@ -119,7 +121,7 @@ private Namespace toNamespace(String database) {
return Namespace.of(namespace);
}

private TableIdentifier toIdentifier(ObjectPath path) {
public TableIdentifier toIdentifier(ObjectPath path) {
return TableIdentifier.of(toNamespace(path.getDatabaseName()), path.getObjectName());
}

@@ -278,14 +280,18 @@ public List<String> listTables(String databaseName) throws DatabaseNotExistException {

@Override
public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException {
try {
Table table = icebergCatalog.loadTable(toIdentifier(tablePath));
TableSchema tableSchema = FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(table.schema()));
Table table = getIcebergTable(tablePath);
TableSchema tableSchema = FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(table.schema()));

// NOTE: We cannot create an IcebergCatalogTable, because the Flink optimizer may use CatalogTableImpl to copy a new
// catalog table.
// Let's reload the table from the Iceberg catalog when creating source/sink operators.
return new CatalogTableImpl(tableSchema, table.properties(), null);
}

// NOTE: We cannot create an IcebergCatalogTable, because the Flink optimizer may use CatalogTableImpl to copy a new
// catalog table.
// Let's reload the table from the Iceberg catalog when creating source/sink operators.
return new CatalogTableImpl(tableSchema, table.properties(), null);
Table getIcebergTable(ObjectPath tablePath) throws TableNotExistException {
try {
return icebergCatalog.loadTable(toIdentifier(tablePath));
} catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
throw new TableNotExistException(getName(), tablePath, e);
}
Expand Down Expand Up @@ -335,6 +341,19 @@ public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean
throw new UnsupportedOperationException("Not support alterTable now.");
}

CatalogLoader getCatalogLoader() {
return catalogLoader;
}

Configuration getHadoopConf() {
return hadoopConf;
}

@Override
public Optional<TableFactory> getTableFactory() {
Member: Q: The javadoc says getTableFactory() is deprecated now. When is the right time for us to switch to getFactory()?

	 * @deprecated Use {@link #getFactory()} for the new factory stack. The new factory stack uses the
	 *             new table sources and sinks defined in FLIP-95 and a slightly different discovery mechanism.
	 */
	@Deprecated
	default Optional<TableFactory> getTableFactory() {

Contributor Author: Until the new API is really ready. In 1.11 and master, the FLIP-95 interfaces still lack many things. I am not sure about Flink 1.12; maybe 1.13 is the time.

return Optional.of(new FlinkTableFactory(this));
}

// ------------------------------ Unsupported methods ---------------------------------------------

@Override
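How these pieces fit together is easiest to see from the SQL side. Below is a minimal sketch, assuming an Iceberg catalog registered with the documented Flink catalog properties (the catalog name, warehouse path, and table name are illustrative and not part of this change): once the Iceberg catalog is the current catalog, getTable() returns a plain CatalogTableImpl, and the FlinkTableSource is only built later when the planner calls into the catalog's table factory.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class IcebergCatalogQuerySketch {
  public static void main(String[] args) {
    // Flink 1.11-style environment: blink planner, batch mode.
    TableEnvironment tEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());

    // Register an Iceberg catalog; the property keys follow the Iceberg Flink docs and are
    // illustrative here. Adjust catalog-type/warehouse for the actual deployment.
    tEnv.executeSql("CREATE CATALOG iceberg_catalog WITH ("
        + "'type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='file:///tmp/warehouse')");
    tEnv.useCatalog("iceberg_catalog");

    // Planning this query goes through FlinkCatalog.getTableFactory(), which hands the
    // re-loaded Iceberg table to FlinkTableFactory.createTableSource().
    tEnv.executeSql("SELECT * FROM db.sample").print();
  }
}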
51 changes: 51 additions & 0 deletions flink/src/main/java/org/apache/iceberg/flink/FlinkFixupTypes.java
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.flink;

import org.apache.iceberg.Schema;
import org.apache.iceberg.types.FixupTypes;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;

/**
* The uuid and fixed types are converted to the same Flink type. Converting back can produce only
* one of them, which may not be correct.
*/
class FlinkFixupTypes extends FixupTypes {

private FlinkFixupTypes(Schema referenceSchema) {
super(referenceSchema);
}

static Schema fixup(Schema schema, Schema referenceSchema) {
return new Schema(TypeUtil.visit(schema,
new FlinkFixupTypes(referenceSchema)).asStructType().fields());
}

@Override
protected boolean fixupPrimitive(Type.PrimitiveType type, Type source) {
if (type instanceof Types.FixedType) {
int length = ((Types.FixedType) type).length();
return source.typeId() == Type.TypeID.UUID && length == 16;
}
return false;
}
}
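A quick way to see why this fixup exists, as a hedged sketch assuming a single uuid column (it relies on uuid and fixed(16) mapping to the same Flink type, which is what the converter does and what fixupPrimitive() above checks for): converting the schema to Flink and back yields fixed(16), and fixup() restores uuid by consulting the base schema.

package org.apache.iceberg.flink;

import org.apache.flink.table.api.TableSchema;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;

class FlinkFixupTypesSketch {
  static Schema roundTrip() {
    // Base schema with one uuid column (illustrative).
    Schema base = new Schema(
        Types.NestedField.required(1, "id", Types.UUIDType.get()));

    // uuid and fixed(16) become the same Flink type...
    TableSchema flinkSchema = FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(base));

    // ...so the plain reverse conversion comes back as fixed(16), not uuid.
    Schema roundTripped = FlinkSchemaUtil.convert(flinkSchema);

    // Reassign ids from the base schema, then fix fixed(16) back to uuid.
    return FlinkFixupTypes.fixup(TypeUtil.reassignIds(roundTripped, base), base);
  }
}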
23 changes: 23 additions & 0 deletions flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java
@@ -27,6 +27,7 @@
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;

/**
* Converter between Flink types and Iceberg types.
@@ -63,6 +64,28 @@ public static Schema convert(TableSchema schema) {
return new Schema(converted.asStructType().fields());
}

/**
* Convert a Flink {@link TableSchema} to a {@link Schema} based on the given schema.
* <p>
* This conversion does not assign new ids; it uses ids from the base schema.
* <p>
* Data types, field order, and nullability will match the Flink type. This conversion may return
* a schema that is not compatible with the base schema.
*
* @param baseSchema a Schema on which conversion is based
* @param flinkSchema a Flink TableSchema
* @return the equivalent Schema
* @throws IllegalArgumentException if the type cannot be converted or there are missing ids
*/
public static Schema convert(Schema baseSchema, TableSchema flinkSchema) {
// convert to a type with fresh ids
Types.StructType struct = convert(flinkSchema).asStruct();
// reassign ids to match the base schema
Schema schema = TypeUtil.reassignIds(new Schema(struct.fields()), baseSchema);
// fix types that can't be represented in Flink (UUID)
return FlinkFixupTypes.fixup(schema, baseSchema);
}

/**
* Convert a {@link Schema} to a {@link RowType Flink type}.
*
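As a hedged usage sketch of the new overload (the catalog wiring, identifier, and column names are invented for illustration): when a schema declared in Flink DDL has to be lined up with an existing Iceberg table, converting against the table's schema keeps the existing field ids instead of assigning fresh ones, so the result can be compared field by field with table.schema().

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.FlinkSchemaUtil;

class ConvertWithBaseSchemaSketch {
  static Schema resolve(Catalog catalog) {
    // An existing Iceberg table (identifier is illustrative).
    Table table = catalog.loadTable(TableIdentifier.of("db", "sample"));

    // A schema as it might appear in a Flink CREATE TABLE statement.
    TableSchema declared = TableSchema.builder()
        .field("id", DataTypes.BIGINT())
        .field("data", DataTypes.STRING())
        .build();

    // Reuses ids from table.schema() rather than assigning new ones.
    return FlinkSchemaUtil.convert(table.schema(), declared);
  }
}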
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.flink;

import java.util.List;
import java.util.Map;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.TableSourceFactory;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.iceberg.flink.source.FlinkTableSource;

/**
* Flink Iceberg table factory to create table source and sink.
* Only works with the catalog; it cannot be loaded through the Java SPI (Service Provider Interface).
*/
class FlinkTableFactory implements TableSourceFactory<RowData> {
Contributor Author: Yes, should be in this class too.


private final FlinkCatalog catalog;

FlinkTableFactory(FlinkCatalog catalog) {
this.catalog = catalog;
}

@Override
public Map<String, String> requiredContext() {
throw new UnsupportedOperationException("Iceberg Table Factory can not be loaded from Java SPI");
}

@Override
public List<String> supportedProperties() {
throw new UnsupportedOperationException("Iceberg Table Factory can not be loaded from Java SPI");
}

@Override
public TableSource<RowData> createTableSource(Context context) {
ObjectPath objectPath = context.getObjectIdentifier().toObjectPath();
TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(context.getTable().getSchema());
try {
return new FlinkTableSource(
catalog.getIcebergTable(objectPath), createTableLoader(objectPath), catalog.getHadoopConf(), tableSchema,
context.getTable().getOptions());
} catch (TableNotExistException e) {
throw new ValidationException(String.format("Iceberg Table(%s) not exist.", objectPath), e);
}
}

private TableLoader createTableLoader(ObjectPath objectPath) {
return TableLoader.fromCatalog(catalog.getCatalogLoader(), catalog.toIdentifier(objectPath));
}
}
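The factory hands the source a TableLoader in addition to the already-loaded Table. A rough sketch of why, assuming the usual open()/loadTable() lifecycle on TableLoader (treat the exact call sequence and the ObjectPath as illustrative assumptions): the loader can be shipped with the job and re-opened on the cluster, which is also why getTable() above can safely return a plain CatalogTableImpl instead of holding on to a live Iceberg table.

package org.apache.iceberg.flink;

import org.apache.flink.table.catalog.ObjectPath;
import org.apache.iceberg.Table;

class TableLoaderSketch {
  static Table reload(FlinkCatalog catalog) throws Exception {
    // Same wiring as createTableLoader() in the factory above (path is illustrative).
    TableLoader loader = TableLoader.fromCatalog(
        catalog.getCatalogLoader(), catalog.toIdentifier(new ObjectPath("db", "sample")));

    // Re-open the catalog and load the table where the operator actually runs.
    loader.open();
    try {
      return loader.loadTable();
    } finally {
      loader.close();
    }
  }
}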
@@ -100,8 +100,8 @@ public LogicalType primitive(Type.PrimitiveType primitive) {
case DATE:
return new DateType();
case TIME:
// MICROS
return new TimeType(6);
// Flink only supports TimeType with the default precision for now.
return new TimeType();
case TIMESTAMP:
Types.TimestampType timestamp = (Types.TimestampType) primitive;
if (timestamp.shouldAdjustToUTC()) {
@@ -39,18 +39,14 @@
public class FlinkOrcReader implements OrcRowReader<RowData> {
private final OrcValueReader<?> reader;

private FlinkOrcReader(Schema iSchema, TypeDescription readSchema) {
public FlinkOrcReader(Schema iSchema, TypeDescription readSchema) {
this(iSchema, readSchema, ImmutableMap.of());
}

private FlinkOrcReader(Schema iSchema, TypeDescription readSchema, Map<Integer, ?> idToConstant) {
public FlinkOrcReader(Schema iSchema, TypeDescription readSchema, Map<Integer, ?> idToConstant) {
this.reader = OrcSchemaWithTypeVisitor.visit(iSchema, readSchema, new ReadBuilder(idToConstant));
}

public static OrcRowReader<RowData> buildReader(Schema schema, TypeDescription readSchema) {
return new FlinkOrcReader(schema, readSchema);
}

@Override
public RowData read(VectorizedRowBatch batch, int row) {
return (RowData) reader.read(new StructColumnVector(batch.size, batch.cols), row);
@@ -36,6 +36,7 @@
import org.apache.flink.table.data.TimestampData;
import org.apache.iceberg.orc.OrcValueReader;
import org.apache.iceberg.orc.OrcValueReaders;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
@@ -127,6 +128,11 @@ private static class Decimal18Reader implements OrcValueReader<DecimalData> {
@Override
public DecimalData nonNullRead(ColumnVector vector, int row) {
HiveDecimalWritable value = ((DecimalColumnVector) vector).vector[row];

// The Hive ORC writer may adjust the scale of decimal data.
Preconditions.checkArgument(value.precision() <= precision,
"Cannot read value as decimal(%s,%s), too large: %s", precision, scale, value);

return DecimalData.fromUnscaledLong(value.serialize64(scale), precision, scale);
}
}
@@ -143,6 +149,10 @@ private static class Decimal38Reader implements OrcValueReader<DecimalData> {
@Override
public DecimalData nonNullRead(ColumnVector vector, int row) {
BigDecimal value = ((DecimalColumnVector) vector).vector[row].getHiveDecimal().bigDecimalValue();

Preconditions.checkArgument(value.precision() <= precision,
"Cannot read value as decimal(%s,%s), too large: %s", precision, scale, value);

return DecimalData.fromBigDecimal(value, precision, scale);
}
}
@@ -246,7 +256,7 @@ private static class StructReader extends OrcValueReaders.StructReader<RowData>

StructReader(List<OrcValueReader<?>> readers, Types.StructType struct, Map<Integer, ?> idToConstant) {
super(readers, struct, idToConstant);
this.numFields = readers.size();
this.numFields = struct.fields().size();
}

@Override
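The new precision checks guard against Hive's ORC writer widening a decimal beyond what the Iceberg type declares. A tiny, hedged illustration of the guard in isolation (the literal value and the target precision/scale are made up):

import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

class DecimalGuardSketch {
  static void check() {
    // A value as it might come out of an ORC DecimalColumnVector (illustrative).
    HiveDecimalWritable value = new HiveDecimalWritable("12345.678");
    int precision = 7;
    int scale = 3;

    // Same guard as Decimal18Reader/Decimal38Reader above; 12345.678 has precision 8,
    // so this throws instead of silently narrowing the value.
    Preconditions.checkArgument(value.precision() <= precision,
        "Cannot read value as decimal(%s,%s), too large: %s", precision, scale, value);
  }
}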
@@ -52,7 +52,7 @@
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;

class FlinkParquetReaders {
public class FlinkParquetReaders {
private FlinkParquetReaders() {
}
