diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecWindowTableFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecWindowTableFunction.java index df20556178133..dc4d47d4a09df 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecWindowTableFunction.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecWindowTableFunction.java @@ -40,6 +40,7 @@ import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; import org.apache.flink.table.runtime.util.TimeWindowUtil; import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.utils.LogicalTypeChecks; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; @@ -110,8 +111,13 @@ private WindowTableFunctionOperatorBase createAlignedWindowTableFunctionOperator TimeWindowUtil.getShiftTimeZone( windowingStrategy.getTimeAttributeType(), TableConfigUtils.getLocalTimeZone(config)); + final int timestampPrecision = + LogicalTypeChecks.getPrecision(windowingStrategy.getTimeAttributeType()); return new AlignedWindowTableFunctionOperator( - windowAssigner, windowingStrategy.getTimeAttributeIndex(), shiftTimeZone); + windowAssigner, + windowingStrategy.getTimeAttributeIndex(), + timestampPrecision, + shiftTimeZone); } protected abstract Transformation translateWithUnalignedWindow( diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowTableFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowTableFunction.java index 28fd8c128ee90..f2bd3cf0a2e5d 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowTableFunction.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowTableFunction.java @@ -46,6 +46,7 @@ import org.apache.flink.table.runtime.typeutils.RowDataSerializer; import org.apache.flink.table.runtime.util.TimeWindowUtil; import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.utils.LogicalTypeChecks; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; @@ -152,12 +153,15 @@ private WindowTableFunctionOperatorBase createUnalignedWindowTableFunctionOperat TimeWindowUtil.getShiftTimeZone( windowingStrategy.getTimeAttributeType(), TableConfigUtils.getLocalTimeZone(config)); + final int timestampPrecision = + LogicalTypeChecks.getPrecision(windowingStrategy.getTimeAttributeType()); return new UnalignedWindowTableFunctionOperator( windowAssigner, windowAssigner.getWindowSerializer(new ExecutionConfig()), new RowDataSerializer(inputRowType), windowingStrategy.getTimeAttributeIndex(), + timestampPrecision, shiftTimeZone); } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/WindowTableFunctionEventTimeBatchRestoreTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/WindowTableFunctionEventTimeBatchRestoreTest.java index 9d3651f02e604..e0e3afaad6a1b 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/WindowTableFunctionEventTimeBatchRestoreTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/WindowTableFunctionEventTimeBatchRestoreTest.java @@ -42,6 +42,7 @@ public List programs() { WindowTableFunctionTestPrograms.WINDOW_TABLE_FUNCTION_TUMBLE_TVF, WindowTableFunctionTestPrograms.WINDOW_TABLE_FUNCTION_TUMBLE_TVF_AGG, WindowTableFunctionTestPrograms.WINDOW_TABLE_FUNCTION_TUMBLE_TVF_POSITIVE_OFFSET, - WindowTableFunctionTestPrograms.WINDOW_TABLE_FUNCTION_TUMBLE_TVF_NEGATIVE_OFFSET); + WindowTableFunctionTestPrograms.WINDOW_TABLE_FUNCTION_TUMBLE_TVF_NEGATIVE_OFFSET, + WindowTableFunctionTestPrograms.WINDOW_TABLE_FUNCTION_TUMBLE_TVF_UNION_ALL); } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/WindowTableFunctionTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/WindowTableFunctionTestPrograms.java index a4db730e7b5f7..f501054a70c37 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/WindowTableFunctionTestPrograms.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/WindowTableFunctionTestPrograms.java @@ -115,6 +115,49 @@ public class WindowTableFunctionTestPrograms { + " %s\n" + " GROUP BY window_start, window_end"; + static final String QUERY_TVF_UNION_ALL_VALUES = + "INSERT INTO sink_t SELECT\n" + + " * FROM (\n" + + " WITH values_table AS (\n" + + " SELECT cast('2024-01-01 10:00:00' AS TIMESTAMP_LTZ) AS event_time\n" + + " UNION ALL\n" + + " SELECT cast('2024-01-01 10:05:00' AS TIMESTAMP_LTZ) AS event_time\n" + + " UNION ALL\n" + + " SELECT cast('2024-01-01 10:10:00' AS TIMESTAMP_LTZ) AS event_time\n" + + " ) SELECT\n" + + " window_start,\n" + + " window_end\n" + + " FROM TABLE(\n" + + " HOP(\n" + + " TABLE values_table,\n" + + " DESCRIPTOR(event_time),\n" + + " INTERVAL '1' MINUTES,\n" + + " INTERVAL '2' MINUTES)\n" + + " ) GROUP BY\n" + + " window_start,\n" + + " window_end\n" + + ")"; + + public static final TableTestProgram WINDOW_TABLE_FUNCTION_TUMBLE_TVF_UNION_ALL = + TableTestProgram.of( + "window-table-function-tumble-tvf-union-all", + "validates window with BinaryRowData using non-compact timestamp precision") + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema( + "window_start TIMESTAMP(3)", "window_end TIMESTAMP(3)") + .consumedBeforeRestore( + "+I[2024-01-01T09:59, 2024-01-01T10:01]", + "+I[2024-01-01T10:00, 2024-01-01T10:02]", + "+I[2024-01-01T10:04, 2024-01-01T10:06]", + "+I[2024-01-01T10:05, 2024-01-01T10:07]", + "+I[2024-01-01T10:09, 2024-01-01T10:11]", + "+I[2024-01-01T10:10, 2024-01-01T10:12]") + .build()) + .setupConfig(TableConfigOptions.LOCAL_TIME_ZONE, "UTC") + .runSql(QUERY_TVF_UNION_ALL_VALUES) + .build(); + public static final TableTestProgram WINDOW_TABLE_FUNCTION_TUMBLE_TVF = TableTestProgram.of( "window-table-function-tumble-tvf", diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-window-table-function_1/window-table-function-tumble-tvf-union-all/plan/window-table-function-tumble-tvf-union-all.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-window-table-function_1/window-table-function-tumble-tvf-union-all/plan/window-table-function-tumble-tvf-union-all.json new file mode 100644 index 0000000000000..eece76f322355 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-window-table-function_1/window-table-function-tumble-tvf-union-all/plan/window-table-function-tumble-tvf-union-all.json @@ -0,0 +1,450 @@ +{ + "flinkVersion" : "2.1", + "nodes" : [ { + "id" : 1, + "type" : "batch-exec-values_1", + "tuples" : [ [ { + "kind" : "LITERAL", + "value" : 0, + "type" : "INT NOT NULL" + } ] ], + "outputType" : "ROW<`ZERO` INT NOT NULL>", + "description" : "Values(tuples=[[{ 0 }]], values=[ZERO])", + "inputProperties" : [ ] + }, { + "id" : 2, + "type" : "batch-exec-calc_1", + "projection" : [ { + "kind" : "LITERAL", + "value" : "2024-01-01 10:00:00", + "type" : "TIMESTAMP(6) WITH LOCAL TIME ZONE NOT NULL" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`event_time` TIMESTAMP(6) WITH LOCAL TIME ZONE NOT NULL>", + "description" : "Calc(select=[2024-01-01 10:00:00 AS event_time])" + }, { + "id" : 3, + "type" : "batch-exec-calc_1", + "projection" : [ { + "kind" : "LITERAL", + "value" : "2024-01-01 10:05:00", + "type" : "TIMESTAMP(6) WITH LOCAL TIME ZONE NOT NULL" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`event_time` TIMESTAMP(6) WITH LOCAL TIME ZONE NOT NULL>", + "description" : "Calc(select=[2024-01-01 10:05:00 AS event_time])" + }, { + "id" : 4, + "type" : "batch-exec-union_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + }, { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`event_time` TIMESTAMP(6) WITH LOCAL TIME ZONE NOT NULL>", + "description" : "Union(all=[true], union=[event_time])" + }, { + "id" : 5, + "type" : "batch-exec-calc_1", + "projection" : [ { + "kind" : "LITERAL", + "value" : "2024-01-01 10:10:00", + "type" : "TIMESTAMP(6) WITH LOCAL TIME ZONE NOT NULL" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`event_time` TIMESTAMP(6) WITH LOCAL TIME ZONE NOT NULL>", + "description" : "Calc(select=[2024-01-01 10:10:00 AS event_time])" + }, { + "id" : 6, + "type" : "batch-exec-union_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + }, { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`event_time` TIMESTAMP(6) WITH LOCAL TIME ZONE NOT NULL>", + "description" : "Union(all=[true], union=[event_time])" + }, { + "id" : 7, + "type" : "batch-exec-window-table-function_1", + "windowing" : { + "strategy" : "TimeAttribute", + "window" : { + "type" : "HoppingWindow", + "size" : "PT2M", + "slide" : "PT1M" + }, + "timeAttributeType" : "TIMESTAMP(6) WITH LOCAL TIME ZONE NOT NULL", + "timeAttributeIndex" : 0, + "isRowtime" : false + }, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`event_time` TIMESTAMP(6) WITH LOCAL TIME ZONE NOT NULL, `window_start` TIMESTAMP(3) NOT NULL, `window_end` TIMESTAMP(3) NOT NULL, `window_time` TIMESTAMP(6) WITH LOCAL TIME ZONE NOT NULL>", + "description" : "WindowTableFunction(window=[HOP(time_col=[event_time], size=[2 min], slide=[1 min])])" + }, { + "id" : 8, + "type" : "batch-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "TIMESTAMP(3) NOT NULL" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "TIMESTAMP(3) NOT NULL" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`window_start` TIMESTAMP(3) NOT NULL, `window_end` TIMESTAMP(3) NOT NULL>", + "description" : "Calc(select=[window_start, window_end])" + }, { + "id" : 9, + "type" : "batch-exec-sort_1", + "configuration" : { + "table.exec.resource.sort.memory" : "128 mb", + "table.exec.sort.async-merge-enabled" : "true", + "table.exec.sort.max-num-file-handles" : "128", + "table.exec.spill-compression.block-size" : "64 kb", + "table.exec.spill-compression.enabled" : "true" + }, + "sortSpec" : { + "fields" : [ { + "index" : 0, + "isAscending" : true, + "nullIsLast" : false + }, { + "index" : 1, + "isAscending" : true, + "nullIsLast" : false + } ] + }, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "END_INPUT", + "priority" : 0 + } ], + "outputType" : "ROW<`window_start` TIMESTAMP(3) NOT NULL, `window_end` TIMESTAMP(3) NOT NULL>", + "description" : "Sort(orderBy=[window_start ASC, window_end ASC])" + }, { + "id" : 15, + "type" : "batch-exec-exchange_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "KEEP_INPUT_AS_IS", + "inputDistribution" : { + "type" : "UNKNOWN" + }, + "isStrict" : true + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`window_start` TIMESTAMP(3) NOT NULL, `window_end` TIMESTAMP(3) NOT NULL>", + "description" : "Exchange(distribution=[forward])", + "requiredExchangeMode" : "UNDEFINED" + }, { + "id" : 10, + "type" : "batch-exec-sort-aggregate_1", + "grouping" : [ 0, 1 ], + "auxGrouping" : [ ], + "aggCalls" : [ ], + "aggInputRowType" : "ROW<`window_start` TIMESTAMP(3) NOT NULL, `window_end` TIMESTAMP(3) NOT NULL>", + "isMerge" : false, + "isFinal" : false, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`window_start` TIMESTAMP(3) NOT NULL, `window_end` TIMESTAMP(3) NOT NULL>", + "description" : "LocalSortAggregate(groupBy=[window_start, window_end], select=[window_start, window_end])" + }, { + "id" : 11, + "type" : "batch-exec-exchange_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "HASH", + "keys" : [ 0, 1 ] + }, + "damBehavior" : "BLOCKING", + "priority" : 0 + } ], + "outputType" : "ROW<`window_start` TIMESTAMP(3) NOT NULL, `window_end` TIMESTAMP(3) NOT NULL>", + "description" : "Exchange(distribution=[hash[window_start, window_end]])", + "requiredExchangeMode" : "UNDEFINED" + }, { + "id" : 12, + "type" : "batch-exec-sort_1", + "configuration" : { + "table.exec.resource.sort.memory" : "128 mb", + "table.exec.sort.async-merge-enabled" : "true", + "table.exec.sort.max-num-file-handles" : "128", + "table.exec.spill-compression.block-size" : "64 kb", + "table.exec.spill-compression.enabled" : "true" + }, + "sortSpec" : { + "fields" : [ { + "index" : 0, + "isAscending" : true, + "nullIsLast" : false + }, { + "index" : 1, + "isAscending" : true, + "nullIsLast" : false + } ] + }, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "END_INPUT", + "priority" : 0 + } ], + "outputType" : "ROW<`window_start` TIMESTAMP(3) NOT NULL, `window_end` TIMESTAMP(3) NOT NULL>", + "description" : "Sort(orderBy=[window_start ASC, window_end ASC])" + }, { + "id" : 16, + "type" : "batch-exec-exchange_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "KEEP_INPUT_AS_IS", + "inputDistribution" : { + "type" : "HASH", + "keys" : [ 0, 1 ] + }, + "isStrict" : true + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`window_start` TIMESTAMP(3) NOT NULL, `window_end` TIMESTAMP(3) NOT NULL>", + "description" : "Exchange(distribution=[forward])", + "requiredExchangeMode" : "UNDEFINED" + }, { + "id" : 13, + "type" : "batch-exec-sort-aggregate_1", + "grouping" : [ 0, 1 ], + "auxGrouping" : [ ], + "aggCalls" : [ ], + "aggInputRowType" : "ROW<`window_start` TIMESTAMP(3) NOT NULL, `window_end` TIMESTAMP(3) NOT NULL>", + "isMerge" : true, + "isFinal" : true, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "HASH", + "keys" : [ 0, 1 ] + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`window_start` TIMESTAMP(3) NOT NULL, `window_end` TIMESTAMP(3) NOT NULL>", + "description" : "SortAggregate(isMerge=[true], groupBy=[window_start, window_end], select=[window_start, window_end])" + }, { + "id" : 14, + "type" : "batch-exec-sink_1", + "configuration" : { + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.type-length-enforcer" : "IGNORE" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "window_start", + "dataType" : "TIMESTAMP(3)" + }, { + "name" : "window_end", + "dataType" : "TIMESTAMP(3)" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "BLOCKING", + "priority" : 0 + } ], + "outputType" : "ROW<`window_start` TIMESTAMP(3) NOT NULL, `window_end` TIMESTAMP(3) NOT NULL>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[window_start, window_end])" + } ], + "edges" : [ { + "source" : 1, + "target" : 2, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 1, + "target" : 3, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 2, + "target" : 4, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 3, + "target" : 4, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 1, + "target" : 5, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 4, + "target" : 6, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 5, + "target" : 6, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 6, + "target" : 7, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 7, + "target" : 8, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 8, + "target" : 9, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 9, + "target" : 15, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 15, + "target" : 10, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 10, + "target" : 11, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 11, + "target" : 12, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 12, + "target" : 16, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 16, + "target" : 13, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 13, + "target" : 14, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/AlignedWindowTableFunctionOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/AlignedWindowTableFunctionOperator.java index 87867d5f4b212..a69d49e7b7d99 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/AlignedWindowTableFunctionOperator.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/AlignedWindowTableFunctionOperator.java @@ -46,8 +46,9 @@ public class AlignedWindowTableFunctionOperator extends WindowTableFunctionOpera public AlignedWindowTableFunctionOperator( GroupWindowAssigner windowAssigner, int rowtimeIndex, + int timestampPrecision, ZoneId shiftTimeZone) { - super(windowAssigner, rowtimeIndex, shiftTimeZone); + super(windowAssigner, rowtimeIndex, timestampPrecision, shiftTimeZone); } @Override @@ -60,7 +61,7 @@ public void processElement(StreamRecord element) throws Exception { numNullRowTimeRecordsDropped.inc(); return; } - timestamp = inputRow.getTimestamp(rowtimeIndex, 3).getMillisecond(); + timestamp = inputRow.getTimestamp(rowtimeIndex, timestampPrecision).getMillisecond(); } else { timestamp = getProcessingTimeService().getCurrentProcessingTime(); } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperator.java index 02b30e8f603c9..fdaca5557718c 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperator.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperator.java @@ -120,8 +120,9 @@ public UnalignedWindowTableFunctionOperator( TypeSerializer windowSerializer, TypeSerializer inputSerializer, int rowtimeIndex, + int timestampPrecision, ZoneId shiftTimeZone) { - super(windowAssigner, rowtimeIndex, shiftTimeZone); + super(windowAssigner, rowtimeIndex, timestampPrecision, shiftTimeZone); this.trigger = createTrigger(windowAssigner); this.windowSerializer = checkNotNull(windowSerializer); this.inputSerializer = checkNotNull(inputSerializer); @@ -202,7 +203,7 @@ public void processElement(StreamRecord element) throws Exception { numNullRowTimeRecordsDropped.inc(); return; } - timestamp = inputRow.getTimestamp(rowtimeIndex, 3).getMillisecond(); + timestamp = inputRow.getTimestamp(rowtimeIndex, timestampPrecision).getMillisecond(); } else { timestamp = getProcessingTimeService().getCurrentProcessingTime(); } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperatorBase.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperatorBase.java index 0e78ad72bb389..e76ff4bdec1c2 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperatorBase.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperatorBase.java @@ -58,6 +58,8 @@ public abstract class WindowTableFunctionOperatorBase extends TableStreamOperato protected final GroupWindowAssigner windowAssigner; + protected final int timestampPrecision; + /** This is used for emitting elements with a given timestamp. */ private transient TimestampedCollector collector; @@ -73,10 +75,12 @@ public abstract class WindowTableFunctionOperatorBase extends TableStreamOperato public WindowTableFunctionOperatorBase( GroupWindowAssigner windowAssigner, int rowtimeIndex, + int timestampPrecision, ZoneId shiftTimeZone) { this.shiftTimeZone = shiftTimeZone; this.rowtimeIndex = rowtimeIndex; this.windowAssigner = windowAssigner; + this.timestampPrecision = timestampPrecision; checkArgument(!windowAssigner.isEventTime() || rowtimeIndex >= 0); } diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/AlignedWindowTableFunctionOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/AlignedWindowTableFunctionOperatorTest.java index 1f2238588dd06..e9317a84eebe8 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/AlignedWindowTableFunctionOperatorTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/AlignedWindowTableFunctionOperatorTest.java @@ -340,7 +340,7 @@ private OneInputStreamOperatorTestHarness createTestHarness( GroupWindowAssigner windowAssigner, ZoneId shiftTimeZone) throws Exception { AlignedWindowTableFunctionOperator operator = new AlignedWindowTableFunctionOperator( - windowAssigner, ROW_TIME_INDEX, shiftTimeZone); + windowAssigner, ROW_TIME_INDEX, DEFAULT_TIMESTAMP_PRECISION, shiftTimeZone); return new OneInputStreamOperatorTestHarness<>(operator, INPUT_ROW_SER); } } diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperatorTest.java index 05e54aaca2d3a..cafe6e284f683 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperatorTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperatorTest.java @@ -563,6 +563,7 @@ private UnalignedWindowTableFunctionOperator createOperator( windowAssigner.getWindowSerializer(new ExecutionConfig()), new RowDataSerializer(INPUT_ROW_TYPE), rowTimeIndex, + DEFAULT_TIMESTAMP_PRECISION, shiftTimeZone); } } diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperatorTestBase.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperatorTestBase.java index 06ede388ca9fc..0d015ac40e733 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperatorTestBase.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperatorTestBase.java @@ -67,6 +67,8 @@ protected long localMills(long epochMills) { protected static final RowDataSerializer INPUT_ROW_SER = new RowDataSerializer(INPUT_ROW_TYPE); protected static final int ROW_TIME_INDEX = 2; + protected static final int DEFAULT_TIMESTAMP_PRECISION = 3; + protected static final LogicalType[] OUTPUT_TYPES = new LogicalType[] { new VarCharType(Integer.MAX_VALUE),