@@ -69,6 +69,7 @@
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -456,7 +457,7 @@ private static void validateFlinkTable(CatalogBaseTable table) {

TableSchema schema = table.getSchema();
schema.getTableColumns().forEach(column -> {
if (column.isGenerated()) {
if (!FlinkCompatibilityUtil.isPhysicalColumn(column)) {
throw new UnsupportedOperationException("Creating table with computed columns is not supported yet.");
}
});
@@ -19,11 +19,17 @@

package org.apache.iceberg.flink.actions;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.iceberg.Table;

public class Actions {

public static final Configuration CONFIG = new Configuration()
// disable classloader check as Avro may cache class/object in the serializers.
.set(CoreOptions.CHECK_LEAKED_CLASSLOADER, false);

private StreamExecutionEnvironment env;
private Table table;

@@ -37,7 +43,7 @@ public static Actions forTable(StreamExecutionEnvironment env, Table table) {
}

public static Actions forTable(Table table) {
return new Actions(StreamExecutionEnvironment.getExecutionEnvironment(), table);
return new Actions(StreamExecutionEnvironment.getExecutionEnvironment(CONFIG), table);
Reviewer comment: Thanks a lot for this.

}

public RewriteDataFilesAction rewriteDataFiles() {
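For illustration, a minimal sketch of what the new default amounts to when Actions builds its own local environment; the configuration and factory call mirror the change above, while the class and variable names below are made up for the example.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ClassloaderCheckExample {  // hypothetical class, illustration only
  public static void main(String[] args) {
    // Disable the leaked-classloader check before creating the local environment,
    // mirroring Actions.CONFIG above; Avro may cache classes in its serializers,
    // which would otherwise trip the check once a job finishes.
    Configuration conf = new Configuration()
        .set(CoreOptions.CHECK_LEAKED_CLASSLOADER, false);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    System.out.println("Default parallelism: " + env.getParallelism());
  }
}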
@@ -33,7 +33,6 @@
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.DataFormatConverters;
import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
@@ -45,6 +44,7 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -101,7 +101,7 @@ public static Builder forRow(DataStream<Row> input, TableSchema tableSchema) {
DataType[] fieldDataTypes = tableSchema.getFieldDataTypes();

DataFormatConverters.RowConverter rowConverter = new DataFormatConverters.RowConverter(fieldDataTypes);
return builderFor(input, rowConverter::toInternal, RowDataTypeInfo.of(rowType))
return builderFor(input, rowConverter::toInternal, FlinkCompatibilityUtil.toTypeInfo(rowType))
.tableSchema(tableSchema);
}

@@ -31,7 +31,6 @@
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
@@ -40,6 +39,7 @@
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.FlinkTableOptions;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

@@ -204,7 +204,7 @@ public DataStream<RowData> build() {
FlinkInputFormat format = buildFormat();

ScanContext context = contextBuilder.build();
TypeInformation<RowData> typeInfo = RowDataTypeInfo.of(FlinkSchemaUtil.convert(context.project()));
TypeInformation<RowData> typeInfo = FlinkCompatibilityUtil.toTypeInfo(FlinkSchemaUtil.convert(context.project()));

if (!context.isStreaming()) {
int parallelism = inferParallelism(format, context);
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.flink.util;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;

/**
* This is a small util class that tries to hide calls to Flink
* Internal or PublicEvolving interfaces, since Flink can change
* those APIs during minor version releases.
*/
public class FlinkCompatibilityUtil {

private FlinkCompatibilityUtil() {
}

public static TypeInformation<RowData> toTypeInfo(RowType rowType) {
return InternalTypeInfo.of(rowType);
}

public static boolean isPhysicalColumn(TableColumn column) {
return column.isPhysical();
}
}
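As a usage sketch (the schema below is a placeholder, not part of this change), a call site converts an Iceberg schema to a Flink RowType and then asks the new utility for the TypeInformation, instead of referencing the internal RowDataTypeInfo class directly:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.iceberg.Schema;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
import org.apache.iceberg.types.Types;

public class TypeInfoExample {  // hypothetical class, illustration only
  public static void main(String[] args) {
    Schema icebergSchema = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()));
    // Convert the Iceberg schema to a Flink RowType, then obtain the RowData
    // TypeInformation through the compatibility utility.
    RowType rowType = FlinkSchemaUtil.convert(icebergSchema);
    TypeInformation<RowData> typeInfo = FlinkCompatibilityUtil.toTypeInfo(rowType);
    System.out.println(typeInfo);
  }
}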
@@ -20,6 +20,7 @@
package org.apache.iceberg.flink;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.flink.util.ArrayUtils;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -28,6 +29,7 @@
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.junit.After;
@@ -119,6 +121,12 @@ protected String warehouseRoot() {
}
}

protected String getFullQualifiedTableName(String tableName) {
final List<String> levels = Lists.newArrayList(icebergNamespace.levels());
levels.add(tableName);
return Joiner.on('.').join(levels);
}

static String getURI(HiveConf conf) {
return conf.get(HiveConf.ConfVars.METASTOREURIS.varname);
}
31 changes: 15 additions & 16 deletions flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java
@@ -20,12 +20,12 @@
package org.apache.iceberg.flink;

import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.stream.IntStream;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -34,8 +34,17 @@
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.rules.TemporaryFolder;

public abstract class FlinkTestBase extends AbstractTestBase {
public abstract class FlinkTestBase extends TestBaseUtils {

@ClassRule
public static MiniClusterWithClientResource miniClusterResource =
MiniClusterResource.createWithClassloaderCheckDisabled();

@ClassRule
public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

private static TestHiveMetastore metastore = null;
protected static HiveConf hiveConf = null;
@@ -87,25 +96,15 @@ protected TableResult exec(String query, Object... args) {

protected List<Object[]> sql(String query, Object... args) {
TableResult tableResult = exec(query, args);

tableResult.getJobClient().ifPresent(c -> {
try {
c.getJobExecutionResult(Thread.currentThread().getContextClassLoader()).get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});

List<Object[]> results = Lists.newArrayList();
try (CloseableIterator<Row> iter = tableResult.collect()) {
List<Object[]> results = Lists.newArrayList();
while (iter.hasNext()) {
Row row = iter.next();
results.add(IntStream.range(0, row.getArity()).mapToObj(row::getField).toArray(Object[]::new));
}
return results;
} catch (Exception e) {
throw new RuntimeException(e);
throw new RuntimeException("Failed to collect table result", e);
}

return results;
}
}
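A short usage sketch of the reworked helper, assuming it is called from a FlinkTestBase subclass; the query, table name, and assertion are illustrative and rely on tableResult.collect() draining the job rather than waiting on the JobClient explicitly:

// Hypothetical usage inside a FlinkTestBase subclass (org.junit.Assert assumed on the classpath).
List<Object[]> rows = sql("SELECT * FROM %s", "default_catalog.default_database.t");
Assert.assertEquals("Should return one row", 1, rows.size());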
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.flink;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;

public class MiniClusterResource {

private static final int DEFAULT_TM_NUM = 1;
private static final int DEFAULT_PARALLELISM = 4;

public static final Configuration DISABLE_CLASSLOADER_CHECK_CONFIG = new Configuration()
// disable classloader check as Avro may cache class/object in the serializers.
.set(CoreOptions.CHECK_LEAKED_CLASSLOADER, false);

private MiniClusterResource() {

}

/**
* Starts a mini cluster with classloader.check-leaked-classloader=false,
* so that the unit tests are not broken by the class loader leak check.
* Some of our Iceberg integration tests assert results after the Flink jobs
* have finished, so they may touch a class loader that the Flink task
* managers have already closed; leaving the check enabled by default would
* fail those tests.
*/
public static MiniClusterWithClientResource createWithClassloaderCheckDisabled() {
return new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(DEFAULT_TM_NUM)
.setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
.setConfiguration(DISABLE_CLASSLOADER_CHECK_CONFIG)
.build());
}

}
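Finally, a sketch of how a test class is expected to wire this in, mirroring the FlinkTestBase change above; the test class name is hypothetical:

import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.iceberg.flink.MiniClusterResource;
import org.junit.ClassRule;

public class ExampleFlinkIT {  // hypothetical test class, illustration only
  // Shared mini cluster with classloader.check-leaked-classloader disabled,
  // started once per test class by the JUnit rule.
  @ClassRule
  public static final MiniClusterWithClientResource MINI_CLUSTER =
      MiniClusterResource.createWithClassloaderCheckDisabled();
}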