From c3426f20d13b77011942705edecedda72ad72a9a Mon Sep 17 00:00:00 2001 From: dylanhz Date: Fri, 26 Dec 2025 20:43:00 +0800 Subject: [PATCH] [FLINK-38841][table] Enforce expected nullability in FamilyArgumentStrategy --- .../strategies/FamilyArgumentTypeStrategy.java | 5 ++++- .../types/inference/InputTypeStrategiesTest.java | 12 +++++++++++- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FamilyArgumentTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FamilyArgumentTypeStrategy.java index 0847036504078..e707606ff6f02 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FamilyArgumentTypeStrategy.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FamilyArgumentTypeStrategy.java @@ -86,13 +86,16 @@ public Optional inferArgumentType( if (Objects.equals(expectedNullability, Boolean.FALSE) && actualType.isNullable()) { return callContext.fail( throwOnFailure, - "Unsupported argument type. Expected nullable type of family '%s' but actual type was '%s'.", + "Unsupported argument type. Expected NOT NULL type of family '%s' but actual type was '%s'.", expectedFamily, actualType); } // type is part of the family if (actualType.getTypeRoot().getFamilies().contains(expectedFamily)) { + if (Objects.equals(expectedNullability, Boolean.TRUE) && !actualType.isNullable()) { + return Optional.of(actualDataType.nullable()); + } return Optional.of(actualDataType); } diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java index e6973c3e0d0a7..5ca7f45f80749 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java @@ -549,23 +549,33 @@ ANY, explicit(DataTypes.INT()) sequence( logical(LogicalTypeFamily.CHARACTER_STRING, true), logical(LogicalTypeFamily.EXACT_NUMERIC), + logical(LogicalTypeFamily.EXACT_NUMERIC, true), logical(LogicalTypeFamily.APPROXIMATE_NUMERIC), logical(LogicalTypeFamily.APPROXIMATE_NUMERIC), logical(LogicalTypeFamily.APPROXIMATE_NUMERIC, false))) .calledWithArgumentTypes( DataTypes.NULL(), DataTypes.TINYINT(), + DataTypes.SMALLINT().notNull(), DataTypes.INT(), DataTypes.BIGINT().notNull(), DataTypes.DECIMAL(10, 2).notNull()) .expectSignature( - "f(, , , , )") + "f(, , , , , )") .expectArgumentTypes( DataTypes.VARCHAR(1), DataTypes.TINYINT(), + DataTypes.SMALLINT(), DataTypes.DOUBLE(), // widening with preserved nullability DataTypes.DOUBLE().notNull(), // widening with preserved nullability DataTypes.DOUBLE().notNull()), + TestSpec.forStrategy( + "Logical type family with invalid nullability", + sequence(logical(LogicalTypeFamily.EXACT_NUMERIC, false))) + .calledWithArgumentTypes(DataTypes.INT()) + .expectSignature("f()") + .expectErrorMessage( + "Unsupported argument type. Expected NOT NULL type of family 'EXACT_NUMERIC' but actual type was 'INT'."), TestSpec.forStrategy( "Logical type family with invalid type", sequence(logical(LogicalTypeFamily.EXACT_NUMERIC)))