@@ -376,7 +376,7 @@ public List<CatalogPartitionSpec> listPartitionsByFilter(ObjectPath tablePath, L
@Override
public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec catalogPartitionSpec)
throws PartitionNotExistException, CatalogException {
-    return null;
+    throw new PartitionNotExistException(getName(), tablePath, catalogPartitionSpec);
}

@Override
@@ -409,7 +409,7 @@ public List<String> listFunctions(String databaseName) throws DatabaseNotExistEx

@Override
public CatalogFunction getFunction(ObjectPath functionPath) throws FunctionNotExistException, CatalogException {
-    return null;
+    throw new FunctionNotExistException(getName(), functionPath);
}

@Override
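Note on the change above: Flink's Catalog contract signals a missing object by throwing PartitionNotExistException / FunctionNotExistException, so returning null breaks callers that probe the catalog. A minimal caller-side sketch of why this matters for built-in function resolution (illustrative, not code from this PR):

// Flink's function lookup probes the current catalog first and treats
// FunctionNotExistException as "not found", falling back to built-in functions.
// With a null return instead, that fallback never happens and the lookup
// fails downstream on the null value.
CatalogFunction fn = null;
try {
  fn = catalog.getFunction(functionPath);
} catch (FunctionNotExistException e) {
  // expected for built-ins such as TO_DATE: resolution falls through
  // to Flink's built-in function catalog
}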
@@ -64,6 +64,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

+import static org.apache.hudi.utils.TestConfigurations.catalog;
import static org.apache.hudi.utils.TestConfigurations.sql;
import static org.apache.hudi.utils.TestData.assertRowsEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -1192,6 +1193,47 @@ void testParquetComplexNestedRowTypes(String operation) {
assertRowsEquals(result, expected);
}

+  @ParameterizedTest
+  @ValueSource(strings = {"insert", "upsert", "bulk_insert"})
+  void testBuiltinFunctionWithCatalog(String operation) {
+    TableEnvironment tableEnv = streamTableEnv;
+
+    String hudiCatalogDDL = catalog("hudi_" + operation)
+        .catalogPath(tempFile.getAbsolutePath())
+        .end();
+
+    tableEnv.executeSql(hudiCatalogDDL);
+    tableEnv.executeSql("use catalog " + ("hudi_" + operation));
+
+    String dbName = "hudi";
+    tableEnv.executeSql("create database " + dbName);
+    tableEnv.executeSql("use " + dbName);
+
+    String hoodieTableDDL = sql("t1")
+        .field("f_int int")
+        .field("f_date DATE")
+        .pkField("f_int")
+        .partitionField("f_int")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath() + "/" + dbName + "/" + operation)
+        .option(FlinkOptions.OPERATION, operation)
+        .end();
+    tableEnv.executeSql(hoodieTableDDL);
+
+    String insertSql = "insert into t1 values (1, TO_DATE('2022-02-02')), (2, DATE '2022-02-02')";
+    execInsertSql(tableEnv, insertSql);
+
+    List<Row> result = CollectionUtil.iterableToList(
+        () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+    final String expected = "["
+        + "+I[1, 2022-02-02], "
+        + "+I[2, 2022-02-02]]";
+    assertRowsEquals(result, expected);
+
+    List<Row> partitionResult = CollectionUtil.iterableToList(
+        () -> tableEnv.sqlQuery("select * from t1 where f_int = 1").execute().collect());
+    assertRowsEquals(partitionResult, "[+I[1, 2022-02-02]]");
+  }

// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
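For reference, the statements the new test issues, written out in full; the catalog path below is illustrative (the test uses a JUnit temp directory):

tableEnv.executeSql(
    "create catalog hudi_insert with (\n"
        + "  'type' = 'hudi',\n"
        + "  'catalog.path' = '/tmp/junit-hudi'\n"  // tempFile.getAbsolutePath() in the test
        + ")");
tableEnv.executeSql("use catalog hudi_insert");
tableEnv.executeSql("create database hudi");
tableEnv.executeSql("use hudi");
// TO_DATE('2022-02-02') exercises built-in function resolution while the
// Hudi catalog is current, which is what the getFunction fix above enables.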
@@ -64,12 +64,12 @@ private TestConfigurations() {
.map(RowType.RowField::asSummaryString).collect(Collectors.toList());

public static final DataType ROW_DATA_TYPE_WIDER = DataTypes.ROW(
DataTypes.FIELD("uuid", DataTypes.VARCHAR(20)),// record key
DataTypes.FIELD("name", DataTypes.VARCHAR(10)),
DataTypes.FIELD("age", DataTypes.INT()),
DataTypes.FIELD("salary", DataTypes.DOUBLE()),
DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3)), // precombine field
DataTypes.FIELD("partition", DataTypes.VARCHAR(10)))
DataTypes.FIELD("uuid", DataTypes.VARCHAR(20)),// record key
DataTypes.FIELD("name", DataTypes.VARCHAR(10)),
DataTypes.FIELD("age", DataTypes.INT()),
DataTypes.FIELD("salary", DataTypes.DOUBLE()),
DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3)), // precombine field
DataTypes.FIELD("partition", DataTypes.VARCHAR(10)))
.notNull();

public static final RowType ROW_TYPE_WIDER = (RowType) ROW_DATA_TYPE_WIDER.getLogicalType();
@@ -112,6 +112,15 @@ public static String getCreateHoodieTableDDL(
return builder.toString();
}

+  public static String getCreateHudiCatalogDDL(final String catalogName, final String catalogPath) {
+    StringBuilder builder = new StringBuilder();
+    builder.append("create catalog ").append(catalogName).append(" with (\n");
+    builder.append("  'type' = 'hudi',\n"
+        + "  'catalog.path' = '").append(catalogPath).append("'");
+    builder.append("\n)");
+    return builder.toString();
+  }

public static String getFileSourceDDL(String tableName) {
return getFileSourceDDL(tableName, "test_source.data");
}
@@ -222,6 +231,10 @@ public static Sql sql(String tableName) {
return new Sql(tableName);
}

+  public static Catalog catalog(String catalogName) {
+    return new Catalog(catalogName);
+  }

// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
@@ -285,4 +298,22 @@ public String end() {
this.withPartition, this.pkField, this.partitionField);
}
}

+  public static class Catalog {
+    private final String catalogName;
+    private String catalogPath = ".";
+
+    public Catalog(String catalogName) {
+      this.catalogName = catalogName;
+    }
+
+    public Catalog catalogPath(String catalogPath) {
+      this.catalogPath = catalogPath;
+      return this;
+    }
+
+    public String end() {
+      return TestConfigurations.getCreateHudiCatalogDDL(catalogName, catalogPath);
+    }
+  }
}
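A quick usage sketch of the new builder (names and path illustrative); end() delegates to getCreateHudiCatalogDDL:

String ddl = TestConfigurations.catalog("my_catalog")
    .catalogPath("/tmp/warehouse")  // defaults to "." when omitted
    .end();
// Produces:
// create catalog my_catalog with (
//   'type' = 'hudi',
//   'catalog.path' = '/tmp/warehouse'
// )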
@@ -53,7 +53,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
Configuration conf = (Configuration) helper.getOptions();
Path path = new Path(conf.getOptional(FlinkOptions.PATH).orElseThrow(() ->
new ValidationException("Option [path] should be not empty.")));
-    return new ContinuousFileSource(context.getCatalogTable().getSchema(), path, conf);
+    return new ContinuousFileSource(context.getCatalogTable().getResolvedSchema(), path, conf);
}

@Override
@@ -18,15 +18,15 @@

package org.apache.hudi.utils.source;

+import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
-import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.connector.source.DynamicTableSource;
@@ -59,12 +59,12 @@
*/
public class ContinuousFileSource implements ScanTableSource {

-  private final TableSchema tableSchema;
+  private final ResolvedSchema tableSchema;
private final Path path;
private final Configuration conf;

public ContinuousFileSource(
-      TableSchema tableSchema,
+      ResolvedSchema tableSchema,
Path path,
Configuration conf) {
this.tableSchema = tableSchema;
@@ -83,7 +83,7 @@ public boolean isBounded() {

@Override
public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
-    final RowType rowType = (RowType) tableSchema.toRowDataType().getLogicalType();
+    final RowType rowType = (RowType) tableSchema.toSourceRowDataType().getLogicalType();
JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
rowType,
InternalTypeInfo.of(rowType),
@@ -178,7 +178,7 @@ private void loadDataBuffer() {
}

@Override
-  public void notifyCheckpointComplete(long l) throws Exception {
+  public void notifyCheckpointComplete(long l) {
this.currentCP.incrementAndGet();
}
}
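The remaining changes migrate the test source off deprecated APIs: TableSchema gives way to ResolvedSchema (with toSourceRowDataType() replacing toRowDataType()), CheckpointListener moves from org.apache.flink.runtime.state to its newer home in org.apache.flink.api.common.state, and the notifyCheckpointComplete override drops a throws clause its body never needed. A minimal sketch of the migrated schema handling, assuming a planner-provided factory context:

// Deriving the physical row type from a ResolvedSchema, as the updated
// ContinuousFileSource does (context is a DynamicTableFactory.Context):
ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
RowType rowType = (RowType) schema.toSourceRowDataType().getLogicalType();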