diff --git a/docs/supported_ops.md b/docs/supported_ops.md
index 645162b647d..0d33620ecb3 100644
--- a/docs/supported_ops.md
+++ b/docs/supported_ops.md
@@ -15459,9 +15459,9 @@ are limited.
S
NS
NS
+PS
Support for arrays of arrays of floats/doubles requires spark.rapids.sql.hasNans to be set to false;
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, CALENDAR, MAP, UDT
NS
-NS
-PS
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, CALENDAR, ARRAY, MAP, UDT
+PS
Support for structs containing float/double array columns requires spark.rapids.sql.hasNans to be set to false;
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, CALENDAR, MAP, UDT
NS
@@ -15480,7 +15480,7 @@ are limited.
-PS
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, CALENDAR, ARRAY, MAP, UDT
+PS
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, CALENDAR, MAP, UDT
@@ -15502,9 +15502,9 @@ are limited.
S
NS
NS
+PS
Support for arrays of arrays of floats/doubles requires spark.rapids.sql.hasNans to be set to false;
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, CALENDAR, MAP, UDT
NS
-NS
-PS
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, CALENDAR, ARRAY, MAP, UDT
+PS
Support for structs containing float/double array columns requires spark.rapids.sql.hasNans to be set to false;
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, CALENDAR, MAP, UDT
NS
@@ -15523,7 +15523,7 @@ are limited.
-PS
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, CALENDAR, ARRAY, MAP, UDT
+PS
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, CALENDAR, MAP, UDT
@@ -15545,9 +15545,9 @@ are limited.
S
NS
NS
+PS
Support for arrays of arrays of floats/doubles requires spark.rapids.sql.hasNans to be set to false;
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, CALENDAR, MAP, UDT
NS
-NS
-PS
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, CALENDAR, ARRAY, MAP, UDT
+PS
Support for structs containing float/double array columns requires spark.rapids.sql.hasNans to be set to false;
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, CALENDAR, MAP, UDT
NS
@@ -15566,7 +15566,7 @@ are limited.
-PS
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, CALENDAR, ARRAY, MAP, UDT
+PS
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, CALENDAR, MAP, UDT
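
The PS entries above mean CollectSet's nested ARRAY and STRUCT inputs are only partially supported: arrays of arrays of floats/doubles, and structs containing float/double array columns, stay on the GPU only when spark.rapids.sql.hasNans is set to false. Purely as an illustration (not part of this patch), a session configured along these lines would exercise that path; the app name, sample data, and column names below are invented for the example.

```python
# Illustrative sketch only -- not part of this patch. Assumes the RAPIDS Accelerator
# jar is on the driver/executor classpath; names and sample data are invented.
from pyspark.sql import SparkSession
import pyspark.sql.functions as f

spark = (SparkSession.builder
         .appName("collect-set-nested-sketch")                    # hypothetical app name
         .config("spark.plugins", "com.nvidia.spark.SQLPlugin")   # enable the RAPIDS plugin
         .config("spark.rapids.sql.hasNans", "false")             # required by the PS notes above
         .getOrCreate())

# array<array<double>> values; duplicate rows per key should collapse to one set entry
df = spark.createDataFrame(
    [(1, [[1.0, 2.0]]), (1, [[1.0, 2.0]]), (2, [[3.5]])],
    "k int, v array<array<double>>")

df.groupBy("k").agg(f.collect_set("v").alias("vs")).show(truncate=False)
```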
diff --git a/integration_tests/src/main/python/data_gen.py b/integration_tests/src/main/python/data_gen.py
index e9057b77d6b..2e41927d14b 100644
--- a/integration_tests/src/main/python/data_gen.py
+++ b/integration_tests/src/main/python/data_gen.py
@@ -949,6 +949,10 @@ def gen_scalars_for_sql(data_gen, count, seed=0, force_no_nulls=False):
 # all of the basic types in a single struct
 all_basic_struct_gen = StructGen([['child'+str(ind), sub_gen] for ind, sub_gen in enumerate(all_basic_gens)])
 
+all_basic_struct_gen_no_nan = StructGen([['child'+str(ind), sub_gen] for ind, sub_gen in enumerate(all_basic_gens_no_nan)])
+
+struct_array_gen_no_nans = StructGen([['child'+str(ind), sub_gen] for ind, sub_gen in enumerate(single_level_array_gens_no_nan)])
+
 # Some struct gens, but not all because of nesting
 nonempty_struct_gens_sample = [all_basic_struct_gen,
                                StructGen([['child0', byte_gen], ['child1', all_basic_struct_gen]]),
diff --git a/integration_tests/src/main/python/hash_aggregate_test.py b/integration_tests/src/main/python/hash_aggregate_test.py
index 43948645574..09e4abd89a5 100644
--- a/integration_tests/src/main/python/hash_aggregate_test.py
+++ b/integration_tests/src/main/python/hash_aggregate_test.py
@@ -574,10 +574,23 @@ def test_hash_pivot_groupby_duplicates_fallback(data_gen):
     RepeatSeqGen(StructGen([
         ['c0', all_basic_struct_gen], ['c1', int_gen]]), length=15)]
 
+# data generators for collect_set on nested Struct[Array] types
+_repeat_agg_column_for_collect_set_op_nested = [
+    RepeatSeqGen(struct_array_gen_no_nans, length=15),
+    RepeatSeqGen(StructGen([
+        ['c0', struct_array_gen_no_nans], ['c1', int_gen]]), length=15),
+    RepeatSeqGen(ArrayGen(all_basic_struct_gen_no_nan), length=15)]
+
+_array_of_array_gen = [RepeatSeqGen(ArrayGen(sub_gen), length=15) for sub_gen in single_level_array_gens_no_nan]
+
 _gen_data_for_collect_set_op = [[
     ('a', RepeatSeqGen(LongGen(), length=20)),
     ('b', value_gen)] for value_gen in _repeat_agg_column_for_collect_set_op]
 
+_gen_data_for_collect_set_op_nested = [[
+    ('a', RepeatSeqGen(LongGen(), length=20)),
+    ('b', value_gen)] for value_gen in _repeat_agg_column_for_collect_set_op_nested + _array_of_array_gen]
+
 # very simple test for just a count on decimals 128 values until we can support more with them
 @ignore_order(local=True)
 @pytest.mark.parametrize('data_gen', [decimal_gen_128bit], ids=idfn)
@@ -622,9 +635,7 @@ def test_hash_groupby_collect_list(data_gen, use_obj_hash_agg):
         conf={'spark.sql.execution.useObjectHashAggregateExec': str(use_obj_hash_agg).lower(),
               'spark.sql.shuffle.partitions': '1'})
 
-@approximate_float
 @ignore_order(local=True)
-@incompat
 @pytest.mark.parametrize('data_gen', _full_gen_data_for_collect_op, ids=idfn)
 def test_hash_groupby_collect_set(data_gen):
     assert_gpu_and_cpu_are_equal_collect(
@@ -632,9 +643,7 @@ def test_hash_groupby_collect_set(data_gen):
         .groupby('a')
         .agg(f.sort_array(f.collect_set('b')), f.count('b')))
 
-@approximate_float
 @ignore_order(local=True)
-@incompat
 @pytest.mark.parametrize('data_gen', _gen_data_for_collect_set_op, ids=idfn)
 def test_hash_groupby_collect_set_on_nested_type(data_gen):
     assert_gpu_and_cpu_are_equal_collect(
@@ -642,9 +651,75 @@ def test_hash_groupby_collect_set_on_nested_type(data_gen):
         .groupby('a')
         .agg(f.sort_array(f.collect_set('b'))))
 
-@approximate_float
+
+# Note, using sort_array() on the CPU, because sort_array() does not yet
+# support sorting certain nested/arbitrary types on the GPU
+# See https://github.com/NVIDIA/spark-rapids/issues/3715
+# and https://github.com/rapidsai/cudf/issues/11222
+@ignore_order(local=True)
+@allow_non_gpu("ProjectExec", "SortArray")
+@pytest.mark.parametrize('data_gen', _gen_data_for_collect_set_op_nested, ids=idfn)
+def test_hash_groupby_collect_set_on_nested_array_type(data_gen):
+    conf = copy_and_update(_no_nans_float_conf, {
+        "spark.rapids.sql.castFloatToString.enabled": "true",
+        "spark.rapids.sql.castDecimalToString.enabled": "true",
+        "spark.rapids.sql.expression.SortArray": "false"
+    })
+
+    def do_it(spark):
+        df = gen_df(spark, data_gen, length=100)\
+            .groupby('a')\
+            .agg(f.collect_set('b').alias("collect_set"))
+        # pull out the rdd and schema and create a new dataframe to run SortArray
+        # to handle Spark 3.3.0+ optimization that moves SortArray from ProjectExec
+        # to ObjectHashAggregateExec
+        return spark.createDataFrame(df.rdd, schema=df.schema)\
+            .selectExpr("sort_array(collect_set)")
+
+    assert_gpu_and_cpu_are_equal_collect(do_it, conf=conf)
+
+
+@ignore_order(local=True)
+@pytest.mark.parametrize('data_gen', _full_gen_data_for_collect_op, ids=idfn)
+def test_hash_reduction_collect_set(data_gen):
+    assert_gpu_and_cpu_are_equal_collect(
+        lambda spark: gen_df(spark, data_gen, length=100)
+            .agg(f.sort_array(f.collect_set('b')), f.count('b')))
+
+@ignore_order(local=True)
+@pytest.mark.parametrize('data_gen', _gen_data_for_collect_set_op, ids=idfn)
+def test_hash_reduction_collect_set_on_nested_type(data_gen):
+    assert_gpu_and_cpu_are_equal_collect(
+        lambda spark: gen_df(spark, data_gen, length=100)
+            .agg(f.sort_array(f.collect_set('b'))))
+
+
+# Note, using sort_array() on the CPU, because sort_array() does not yet
+# support sorting certain nested/arbitrary types on the GPU
+# See https://github.com/NVIDIA/spark-rapids/issues/3715
+# and https://github.com/rapidsai/cudf/issues/11222
+@ignore_order(local=True)
+@allow_non_gpu("ProjectExec", "SortArray")
+@pytest.mark.parametrize('data_gen', _gen_data_for_collect_set_op_nested, ids=idfn)
+def test_hash_reduction_collect_set_on_nested_array_type(data_gen):
+    conf = copy_and_update(_no_nans_float_conf, {
+        "spark.rapids.sql.castFloatToString.enabled": "true",
+        "spark.rapids.sql.castDecimalToString.enabled": "true",
+        "spark.rapids.sql.expression.SortArray": "false"
+    })
+
+    def do_it(spark):
+        df = gen_df(spark, data_gen, length=100)\
+            .agg(f.collect_set('b').alias("collect_set"))
+        # pull out the rdd and schema and create a new dataframe to run SortArray
+        # to handle Spark 3.3.0+ optimization that moves SortArray from ProjectExec
+        # to ObjectHashAggregateExec
+        return spark.createDataFrame(df.rdd, schema=df.schema)\
+            .selectExpr("sort_array(collect_set)")
+
+    assert_gpu_and_cpu_are_equal_collect(do_it, conf=conf)
+
 @ignore_order(local=True)
-@incompat
 @pytest.mark.parametrize('data_gen', _full_gen_data_for_collect_op, ids=idfn)
 def test_hash_groupby_collect_with_single_distinct(data_gen):
     # test collect_ops with other distinct aggregations
@@ -656,9 +731,7 @@ def test_hash_groupby_collect_with_single_distinct(data_gen):
             f.countDistinct('c'),
             f.count('c')))
 
-@approximate_float
 @ignore_order(local=True)
-@incompat
 @pytest.mark.parametrize('data_gen', _gen_data_for_collect_op, ids=idfn)
 def test_hash_groupby_single_distinct_collect(data_gen):
     # test distinct collect
@@ -681,7 +754,6 @@ def test_hash_groupby_single_distinct_collect(data_gen):
         df_fun=lambda spark: gen_df(spark, data_gen, length=100),
         table_name="tbl",
         sql=sql)
 
-@approximate_float
 @ignore_order(local=True)
 @pytest.mark.parametrize('data_gen', _gen_data_for_collect_op, ids=idfn)
 def test_hash_groupby_collect_with_multi_distinct(data_gen):
diff --git a/integration_tests/src/main/python/window_function_test.py b/integration_tests/src/main/python/window_function_test.py
index 313e4f57b66..b18fa095c5c 100644
--- a/integration_tests/src/main/python/window_function_test.py
+++ b/integration_tests/src/main/python/window_function_test.py
@@ -139,6 +139,11 @@ FloatGen(no_nans=True, special_cases=[]),
                    DoubleGen(no_nans=True, special_cases=[]),
                    string_gen, boolean_gen, date_gen, timestamp_gen, null_gen]
 
+_no_nans_float_conf = {'spark.rapids.sql.variableFloatAgg.enabled': 'true',
+                       'spark.rapids.sql.hasNans': 'false',
+                       'spark.rapids.sql.castStringToFloat.enabled': 'true'
+                       }
+
 @ignore_order
 @pytest.mark.parametrize('data_gen', [decimal_gen_128bit], ids=idfn)
 def test_decimal128_count_window(data_gen):
@@ -920,6 +925,28 @@ def test_running_window_function_exec_for_all_aggs():
     ('c_fp_nan', RepeatSeqGen(FloatGen().with_special_case(math.nan, 200.0), length=5)),
     ]
 
+_gen_data_for_collect_set_nested = [
+    ('a', RepeatSeqGen(LongGen(), length=20)),
+    ('b', LongRangeGen()),
+    ('c_int', RepeatSeqGen(IntegerGen(), length=15)),
+    ('c_struct_array_1', RepeatSeqGen(struct_array_gen_no_nans, length=15)),
+    ('c_struct_array_2', RepeatSeqGen(StructGen([
+        ['c0', struct_array_gen_no_nans], ['c1', int_gen]]), length=14)),
+    ('c_array_struct', RepeatSeqGen(ArrayGen(all_basic_struct_gen_no_nan), length=15)),
+    ('c_array_array_bool', RepeatSeqGen(ArrayGen(ArrayGen(BooleanGen())), length=15)),
+    ('c_array_array_int', RepeatSeqGen(ArrayGen(ArrayGen(IntegerGen())), length=15)),
+    ('c_array_array_long', RepeatSeqGen(ArrayGen(ArrayGen(LongGen())), length=15)),
+    ('c_array_array_short', RepeatSeqGen(ArrayGen(ArrayGen(ShortGen())), length=15)),
+    ('c_array_array_date', RepeatSeqGen(ArrayGen(ArrayGen(DateGen())), length=15)),
+    ('c_array_array_timestamp', RepeatSeqGen(ArrayGen(ArrayGen(TimestampGen())), length=15)),
+    ('c_array_array_byte', RepeatSeqGen(ArrayGen(ArrayGen(ByteGen())), length=15)),
+    ('c_array_array_string', RepeatSeqGen(ArrayGen(ArrayGen(StringGen())), length=15)),
+    ('c_array_array_float', RepeatSeqGen(ArrayGen(ArrayGen(FloatGen(no_nans=True))), length=15)),
+    ('c_array_array_double', RepeatSeqGen(ArrayGen(ArrayGen(DoubleGen(no_nans=True))), length=15)),
+    ('c_array_array_decimal_32', RepeatSeqGen(ArrayGen(ArrayGen(DecimalGen(precision=8, scale=3))), length=15)),
+    ('c_array_array_decimal_64', RepeatSeqGen(ArrayGen(ArrayGen(decimal_gen_64bit)), length=15)),
+    ('c_array_array_decimal_128', RepeatSeqGen(ArrayGen(ArrayGen(decimal_gen_128bit)), length=15)),
+]
 
 # SortExec does not support array type, so sort the result locally.
 @ignore_order(local=True)
@@ -977,6 +1004,86 @@ def test_window_aggs_for_rows_collect_set():
         ) t
         ''')
 
+
+# Note, using sort_array() on the CPU, because sort_array() does not yet
+# support sorting certain nested/arbitrary types on the GPU
+# See https://github.com/NVIDIA/spark-rapids/issues/3715
+# and https://github.com/rapidsai/cudf/issues/11222
+@ignore_order(local=True)
+@allow_non_gpu("ProjectExec", "SortArray")
+def test_window_aggs_for_rows_collect_set_nested_array():
+    conf = copy_and_update(_no_nans_float_conf, {
+        "spark.rapids.sql.castFloatToString.enabled": "true",
+        "spark.rapids.sql.castDecimalToString.enabled": "true",
+        "spark.rapids.sql.expression.SortArray": "false"
+    })
+
+    def do_it(spark):
+        df = gen_df(spark, _gen_data_for_collect_set_nested, length=512)
+        df.createOrReplaceTempView("window_collect_table")
+        df = spark.sql(
+            """select a, b,
+            collect_set(c_struct_array_1) over
+                (partition by a order by b,c_int rows between CURRENT ROW and UNBOUNDED FOLLOWING) as cc_struct_array_1,
+            collect_set(c_struct_array_2) over
+                (partition by a order by b,c_int rows between CURRENT ROW and UNBOUNDED FOLLOWING) as cc_struct_array_2,
+            collect_set(c_array_struct) over
+                (partition by a order by b,c_int rows between CURRENT ROW and UNBOUNDED FOLLOWING) as cc_array_struct,
+            collect_set(c_array_array_bool) over
+                (partition by a order by b,c_int rows between CURRENT ROW and UNBOUNDED FOLLOWING) as cc_array_array_bool,
+            collect_set(c_array_array_int) over
+                (partition by a order by b,c_int rows between CURRENT ROW and UNBOUNDED FOLLOWING) as cc_array_array_int,
+            collect_set(c_array_array_long) over
+                (partition by a order by b,c_int rows between CURRENT ROW and UNBOUNDED FOLLOWING) as cc_array_array_long,
+            collect_set(c_array_array_short) over
+                (partition by a order by b,c_int rows between CURRENT ROW and UNBOUNDED FOLLOWING) as cc_array_array_short,
+            collect_set(c_array_array_date) over
+                (partition by a order by b,c_int rows between CURRENT ROW and UNBOUNDED FOLLOWING) as cc_array_array_date,
+            collect_set(c_array_array_timestamp) over
+                (partition by a order by b,c_int rows between CURRENT ROW and UNBOUNDED FOLLOWING) as cc_array_array_ts,
+            collect_set(c_array_array_byte) over
+                (partition by a order by b,c_int rows between CURRENT ROW and UNBOUNDED FOLLOWING) as cc_array_array_byte,
+            collect_set(c_array_array_string) over
+                (partition by a order by b,c_int rows between CURRENT ROW and UNBOUNDED FOLLOWING) as cc_array_array_str,
+            collect_set(c_array_array_float) over
+                (partition by a order by b,c_int rows between CURRENT ROW and UNBOUNDED FOLLOWING) as cc_array_array_float,
+            collect_set(c_array_array_double) over
+                (partition by a order by b,c_int rows between CURRENT ROW and UNBOUNDED FOLLOWING) as cc_array_array_double,
+            collect_set(c_array_array_decimal_32) over
+                (partition by a order by b,c_int rows between CURRENT ROW and UNBOUNDED FOLLOWING) as cc_array_array_decimal_32,
+            collect_set(c_array_array_decimal_64) over
+                (partition by a order by b,c_int rows between CURRENT ROW and UNBOUNDED FOLLOWING) as cc_array_array_decimal_64,
+            collect_set(c_array_array_decimal_128) over
+                (partition by a order by b,c_int rows between CURRENT ROW and UNBOUNDED FOLLOWING) as cc_array_array_decimal_128
+            from window_collect_table
+            """)
+        df = spark.createDataFrame(df.rdd, schema=df.schema)
+        # pull out the rdd and schema and create a new dataframe to run SortArray
+        # to handle Databricks 10.4+ optimization that moves SortArray from ProjectExec
+        # to ObjectHashAggregateExec
+        df.createOrReplaceTempView("window_collect_table_2")
+        return spark.sql("""select a, b,
+            sort_array(cc_struct_array_1),
+            sort_array(cc_struct_array_2),
+            sort_array(cc_array_struct),
+            sort_array(cc_array_array_bool),
+            sort_array(cc_array_array_int),
+            sort_array(cc_array_array_long),
+            sort_array(cc_array_array_short),
+            sort_array(cc_array_array_date),
+            sort_array(cc_array_array_ts),
+            sort_array(cc_array_array_byte),
+            sort_array(cc_array_array_str),
+            sort_array(cc_array_array_float),
+            sort_array(cc_array_array_double),
+            sort_array(cc_array_array_decimal_32),
+            sort_array(cc_array_array_decimal_64),
+            sort_array(cc_array_array_decimal_128)
+            from window_collect_table_2
+            """)
+    assert_gpu_and_cpu_are_equal_collect(do_it, conf=conf)
+
+
 # In a distributed setup the order of the partitions returned might be different, so we must ignore the order
 # but small batch sizes can make sort very slow, so do the final order by locally
 @ignore_order(local=True)
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
index a13e3f69cd2..9299d3de487 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
@@ -3332,13 +3332,38 @@ object GpuOverrides extends Logging {
       "Collect a set of unique elements, not supported in reduction",
       ExprChecks.fullAgg(
         TypeSig.ARRAY.nested(TypeSig.commonCudfTypes + TypeSig.DECIMAL_128 +
-          TypeSig.NULL + TypeSig.STRUCT),
+          TypeSig.NULL + TypeSig.STRUCT + TypeSig.ARRAY),
         TypeSig.ARRAY.nested(TypeSig.all),
         Seq(ParamCheck("input",
           (TypeSig.commonCudfTypes + TypeSig.DECIMAL_128 +
-            TypeSig.NULL + TypeSig.STRUCT).nested(),
+            TypeSig.NULL +
+            TypeSig.STRUCT.withPsNote(TypeEnum.STRUCT, "Support for structs containing " +
+              s"float/double array columns requires ${RapidsConf.HAS_NANS} to be set to false") +
+            TypeSig.ARRAY.withPsNote(TypeEnum.ARRAY, "Support for arrays of arrays of " +
+              s"floats/doubles requires ${RapidsConf.HAS_NANS} to be set to false")).nested(),
           TypeSig.all))),
       (c, conf, p, r) => new TypedImperativeAggExprMeta[CollectSet](c, conf, p, r) {
+
+        private def isNestedArrayType(dt: DataType): Boolean = {
+          dt match {
+            case StructType(fields) =>
+              fields.exists { field =>
+                field.dataType match {
+                  case sdt: StructType => isNestedArrayType(sdt)
+                  case _: ArrayType => true
+                  case _ => false
+                }
+              }
+            case ArrayType(et, _) => et.isInstanceOf[ArrayType] || et.isInstanceOf[StructType]
+            case _ => false
+          }
+        }
+
+        override def tagAggForGpu(): Unit = {
+          if (isNestedArrayType(c.child.dataType)) {
+            checkAndTagFloatNanAgg("CollectSet", c.child.dataType, conf, this)
+          }
+        }
         override def convertToGpu(childExprs: Seq[Expression]): GpuExpression =
           GpuCollectSet(childExprs.head, c.mutableAggBufferOffset, c.inputAggBufferOffset)
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala
index 5c7701f4833..94e76a3ffb6 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala
@@ -1691,7 +1691,8 @@ case class GpuCollectSet(
 
   override def windowAggregation(
       inputs: Seq[(ColumnVector, Int)]): RollingAggregationOnColumn =
-    RollingAggregation.collectSet().onColumn(inputs.head._2)
+    RollingAggregation.collectSet(NullPolicy.EXCLUDE, NullEquality.EQUAL,
+      NaNEquality.UNEQUAL).onColumn(inputs.head._2)
 }
 
 trait CpuToGpuAggregateBufferConverter {
diff --git a/tools/src/main/resources/supportedExprs.csv b/tools/src/main/resources/supportedExprs.csv
index 1ae14023b5d..caabbaa7883 100644
--- a/tools/src/main/resources/supportedExprs.csv
+++ b/tools/src/main/resources/supportedExprs.csv
@@ -581,11 +581,11 @@ CollectList,S,`collect_list`,None,reduction,input,S,S,S,S,S,S,S,S,PS,S,S,S,NS,NS
 CollectList,S,`collect_list`,None,reduction,result,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,PS,NA,NA,NA
 CollectList,S,`collect_list`,None,window,input,S,S,S,S,S,S,S,S,PS,S,S,S,NS,NS,PS,PS,PS,NS
 CollectList,S,`collect_list`,None,window,result,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,PS,NA,NA,NA
-CollectSet,S,`collect_set`,None,aggregation,input,S,S,S,S,S,S,S,S,PS,S,S,S,NS,NS,NS,NS,PS,NS
+CollectSet,S,`collect_set`,None,aggregation,input,S,S,S,S,S,S,S,S,PS,S,S,S,NS,NS,PS,NS,PS,NS
 CollectSet,S,`collect_set`,None,aggregation,result,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,PS,NA,NA,NA
-CollectSet,S,`collect_set`,None,reduction,input,S,S,S,S,S,S,S,S,PS,S,S,S,NS,NS,NS,NS,PS,NS
+CollectSet,S,`collect_set`,None,reduction,input,S,S,S,S,S,S,S,S,PS,S,S,S,NS,NS,PS,NS,PS,NS
 CollectSet,S,`collect_set`,None,reduction,result,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,PS,NA,NA,NA
-CollectSet,S,`collect_set`,None,window,input,S,S,S,S,S,S,S,S,PS,S,S,S,NS,NS,NS,NS,PS,NS
+CollectSet,S,`collect_set`,None,window,input,S,S,S,S,S,S,S,S,PS,S,S,S,NS,NS,PS,NS,PS,NS
 CollectSet,S,`collect_set`,None,window,result,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,PS,NA,NA,NA
 Count,S,`count`,None,aggregation,input,S,S,S,S,S,S,S,S,PS,S,S,S,S,S,PS,PS,PS,S
 Count,S,`count`,None,aggregation,result,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA
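
For completeness, here is a rough way to eyeball CPU/GPU parity for the new nested collect_set cases outside the pytest harness, in the spirit of assert_gpu_and_cpu_are_equal_collect. This is only a sketch and not part of the patch: it assumes an existing SparkSession named spark with the RAPIDS Accelerator on the classpath, and it relies on SortArray falling back to the CPU for nested arrays, just as the tests above allow.

```python
# Sketch only -- not part of this patch. Assumes `spark` is an existing SparkSession
# with the RAPIDS Accelerator on the classpath (see the earlier sketch).
import pyspark.sql.functions as f

def collect_set_sorted(gpu_enabled):
    # these RAPIDS configs can be toggled at runtime to compare the two paths
    spark.conf.set("spark.rapids.sql.enabled", str(gpu_enabled).lower())
    spark.conf.set("spark.rapids.sql.hasNans", "false")  # required for float/double arrays
    df = spark.createDataFrame(
        [(1, [[1.0, 2.0]]), (1, [[1.0, 2.0]]), (2, [[3.5]]), (2, None)],
        "k int, v array<array<double>>")
    # sort_array makes the set ordering deterministic; for nested arrays it will
    # run on the CPU, which is exactly what the tests above allow for
    return (df.groupBy("k")
              .agg(f.sort_array(f.collect_set("v")).alias("vs"))
              .orderBy("k")
              .collect())

assert collect_set_sorted(False) == collect_set_sorted(True)
```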