diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java index 4978ed5e011c5..4485df6ecf7f1 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java @@ -68,6 +68,7 @@ import org.apache.flink.table.sources.TableSource; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.utils.TableConnectorUtils; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobConf; @@ -234,6 +235,18 @@ public TableSchema getTableSchema() { return schema; } + @Override + public String explainSource() { + final String filterString = filters.stream() + .map(Expression::asSummaryString) + .collect(Collectors.joining(",")); + return TableConnectorUtils.generateRuntimeName(getClass(), getTableSchema().getFieldNames()) + + (requiredPartitions == null ? "" : ", requiredPartition=" + requiredPartitions) + + (requiredPos == null ? "" : ", requiredPos=" + Arrays.toString(requiredPos)) + + (limit == -1 ? "" : ", limit=" + limit) + + (filters.size() == 0 ? "" : ", filters=" + filterString); + } + @Override public DataType getProducedDataType() { String[] schemaFieldNames = this.schema.getFieldNames(); diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java index cf99e8b224aa8..8b39ee74e04e3 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java @@ -171,9 +171,16 @@ void testWriteAndRead(ExecMode execMode) { execInsertSql(tableEnv, insertInto); - List rows = CollectionUtil.iterableToList( + List result1 = CollectionUtil.iterableToList( () -> tableEnv.sqlQuery("select * from t1").execute().collect()); - assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT); + assertRowsEquals(result1, TestData.DATA_SET_SOURCE_INSERT); + // apply filters + List result2 = CollectionUtil.iterableToList( + () -> tableEnv.sqlQuery("select * from t1 where uuid > 'id5'").execute().collect()); + assertRowsEquals(result2, "[" + + "id6,Emma,20,1970-01-01T00:00:06,par3, " + + "id7,Bob,44,1970-01-01T00:00:07,par4, " + + "id8,Han,56,1970-01-01T00:00:08,par4]"); } // -------------------------------------------------------------------------