diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index fd7a7247fc8ad..bc807def9b5be 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -2462,6 +2462,13 @@ def monotonically_increasing_id() -> Column:
     This expression would return the following IDs:
     0, 1, 2, 8589934592 (1L << 33), 8589934593, 8589934594.
 
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        a column of monotonically increasing 64-bit integers.
+
+    Examples
+    --------
     >>> df0 = sc.parallelize(range(2), 2).mapPartitions(lambda x: [(1,), (2,), (3,)]).toDF(['col1'])
     >>> df0.select(monotonically_increasing_id().alias('id')).collect()
     [Row(id=0), Row(id=1), Row(id=2), Row(id=8589934592), Row(id=8589934593), Row(id=8589934594)]
@@ -2476,6 +2483,18 @@ def nanvl(col1: "ColumnOrName", col2: "ColumnOrName") -> Column:
 
     .. versionadded:: 1.6.0
 
+    Parameters
+    ----------
+    col1 : :class:`~pyspark.sql.Column` or str
+        first column to check.
+    col2 : :class:`~pyspark.sql.Column` or str
+        second column to return if the first is NaN.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        value from the first column, or the second if the first is NaN.
+
     Examples
     --------
     >>> df = spark.createDataFrame([(1.0, float('nan')), (float('nan'), 2.0)], ("a", "b"))
@@ -2493,19 +2512,29 @@ def percentile_approx(
     """Returns the approximate `percentile` of the numeric column `col` which is the smallest value
     in the ordered `col` values (sorted from least to greatest) such that no more than `percentage`
     of `col` values is less than the value or equal to that value.
-    The value of percentage must be between 0.0 and 1.0.
-
-    The accuracy parameter (default: 10000)
-    is a positive numeric literal which controls approximation accuracy at the cost of memory.
-    Higher value of accuracy yields better accuracy, 1.0/accuracy is the relative error
-    of the approximation.
-
-    When percentage is an array, each value of the percentage array must be between 0.0 and 1.0.
-    In this case, returns the approximate percentile array of column col
-    at the given percentage array.
 
     .. versionadded:: 3.1.0
 
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+        input column.
+    percentage : :class:`~pyspark.sql.Column`, float, list of floats or tuple of floats
+        percentage in decimal (must be between 0.0 and 1.0).
+        When percentage is an array, each value of the percentage array must be between 0.0 and 1.0.
+        In this case, returns the approximate percentile array of column col
+        at the given percentage array.
+    accuracy : :class:`~pyspark.sql.Column` or float
+        positive numeric literal (default: 10000) which controls approximation accuracy
+        at the cost of memory. Higher accuracy yields a better approximation;
+        1.0/accuracy is the relative error of the approximation.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        approximate `percentile` of the numeric column.
+
     Examples
     --------
     >>> key = (col("id") % 3).alias("key")
@@ -2559,6 +2588,16 @@ def rand(seed: Optional[int] = None) -> Column:
     -----
     The function is non-deterministic in general case.
 
+    Parameters
+    ----------
+    seed : int, optional
+        seed value for the random generator.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        random values.
+
     Examples
     --------
     >>> df.withColumn('rand', rand(seed=42) * 3).collect()
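
A minimal usage sketch for seeded `rand`, assuming an active SparkSession bound to `spark`; with a fixed seed and the same data layout the column is reproducible across runs, though the exact values depend on the Spark version:

    from pyspark.sql.functions import rand

    df = spark.range(3)
    # Same seed => same pseudo-random sequence for the same partitioning;
    # values are uniformly distributed in [0.0, 1.0).
    df.select("id", rand(seed=42).alias("r")).show()
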
@@ -2581,6 +2620,16 @@ def randn(seed: Optional[int] = None) -> Column:
     -----
     The function is non-deterministic in general case.
 
+    Parameters
+    ----------
+    seed : int, optional
+        seed value for the random generator.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        random values.
+
     Examples
     --------
     >>> df.withColumn('randn', randn(seed=42)).collect()
@@ -2600,6 +2649,18 @@ def round(col: "ColumnOrName", scale: int = 0) -> Column:
 
     .. versionadded:: 1.5.0
 
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+        input column to round.
+    scale : int, optional, default 0
+        scale value.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        rounded values.
+
     Examples
     --------
     >>> spark.createDataFrame([(2.5,)], ['a']).select(round('a', 0).alias('r')).collect()
@@ -2615,6 +2676,18 @@ def bround(col: "ColumnOrName", scale: int = 0) -> Column:
 
     .. versionadded:: 2.0.0
 
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+        input column to round.
+    scale : int, optional, default 0
+        scale value.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        rounded values.
+
     Examples
     --------
     >>> spark.createDataFrame([(2.5,)], ['a']).select(bround('a', 0).alias('r')).collect()
@@ -2640,6 +2713,18 @@ def shiftleft(col: "ColumnOrName", numBits: int) -> Column:
 
     .. versionadded:: 3.2.0
 
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+        input column of values to shift.
+    numBits : int
+        number of bits to shift.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        shifted values.
+
     Examples
     --------
     >>> spark.createDataFrame([(21,)], ['a']).select(shiftleft('a', 1).alias('r')).collect()
@@ -2665,6 +2750,18 @@ def shiftright(col: "ColumnOrName", numBits: int) -> Column:
 
     .. versionadded:: 3.2.0
 
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+        input column of values to shift.
+    numBits : int
+        number of bits to shift.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        shifted values.
+
     Examples
     --------
     >>> spark.createDataFrame([(42,)], ['a']).select(shiftright('a', 1).alias('r')).collect()
@@ -2690,6 +2787,18 @@ def shiftrightunsigned(col: "ColumnOrName", numBits: int) -> Column:
 
     .. versionadded:: 3.2.0
 
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+        input column of values to shift.
+    numBits : int
+        number of bits to shift.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        shifted values.
+
     Examples
     --------
     >>> df = spark.createDataFrame([(-42,)], ['a'])
@@ -2708,6 +2817,11 @@ def spark_partition_id() -> Column:
     -----
     This is non deterministic because it depends on data partitioning and task scheduling.
 
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        partition id the record belongs to.
+
     Examples
     --------
     >>> df.repartition(1).select(spark_partition_id().alias("pid")).collect()
@@ -2721,6 +2835,16 @@ def expr(str: str) -> Column:
 
     .. versionadded:: 1.5.0
 
+    Parameters
+    ----------
+    str : str
+        expression defined as a string.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        column representing the expression.
+
     Examples
     --------
     >>> df.select(expr("length(name)")).collect()
@@ -2751,6 +2875,11 @@ def struct(
     cols : list, set, str or :class:`~pyspark.sql.Column`
         column names or :class:`~pyspark.sql.Column`\\s to contain in the output struct.
 
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        a struct type column of given columns.
+
     Examples
     --------
     >>> df.select(struct('age', 'name').alias("struct")).collect()
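
A short companion sketch for `struct`, assuming a `df` with `age` and `name` columns as in the doctests; fields of the resulting struct column are reachable with dot notation:

    from pyspark.sql.functions import struct, col

    s = df.select(struct('age', 'name').alias('s'))
    # Access individual fields of the struct column by qualified name.
    s.select(col('s.age'), col('s.name')).show()
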
@@ -2766,10 +2895,20 @@ def struct(
 def greatest(*cols: "ColumnOrName") -> Column:
     """
     Returns the greatest value of the list of column names, skipping null values.
-    This function takes at least 2 parameters. It will return null iff all parameters are null.
+    This function takes at least 2 parameters. It will return null if all parameters are null.
 
     .. versionadded:: 1.5.0
 
+    Parameters
+    ----------
+    cols : :class:`~pyspark.sql.Column` or str
+        columns to check for the greatest value.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        greatest value.
+
     Examples
     --------
     >>> df = spark.createDataFrame([(1, 4, 3)], ['a', 'b', 'c'])
@@ -2784,7 +2923,7 @@ def greatest(*cols: "ColumnOrName") -> Column:
 def least(*cols: "ColumnOrName") -> Column:
     """
     Returns the least value of the list of column names, skipping null values.
-    This function takes at least 2 parameters. It will return null iff all parameters are null.
+    This function takes at least 2 parameters. It will return null if all parameters are null.
 
     .. versionadded:: 1.5.0
 
@@ -2793,6 +2932,11 @@ def least(*cols: "ColumnOrName") -> Column:
     cols : :class:`~pyspark.sql.Column` or str
         column names or columns to be compared
 
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        least value.
+
     Examples
     --------
     >>> df = spark.createDataFrame([(1, 4, 3)], ['a', 'b', 'c'])
@@ -2818,6 +2962,11 @@ def when(condition: Column, value: Any) -> Column:
     value :
         a literal value, or a :class:`~pyspark.sql.Column` expression.
 
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        column representing the when expression.
+
     Examples
     --------
     >>> df.select(when(df['age'] == 2, 3).otherwise(4).alias("age")).collect()
@@ -2851,6 +3000,18 @@ def log(arg1: Union["ColumnOrName", float], arg2: Optional["ColumnOrName"] = Non
 
     .. versionadded:: 1.5.0
 
+    Parameters
+    ----------
+    arg1 : :class:`~pyspark.sql.Column`, str or float
+        base number, or the actual number when `arg2` is omitted (in that case the base is `e`).
+    arg2 : :class:`~pyspark.sql.Column`, str or float
+        number to calculate the logarithm for.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        logarithm of the given value.
+
     Examples
     --------
     >>> df.select(log(10.0, df.age).alias('ten')).rdd.map(lambda l: str(l.ten)[:7]).collect()
@@ -2870,6 +3031,16 @@ def log2(col: "ColumnOrName") -> Column:
 
     .. versionadded:: 1.5.0
 
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+        a column to calculate the logarithm for.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        logarithm of the given value.
+
     Examples
     --------
     >>> spark.createDataFrame([(4,)], ['a']).select(log2('a').alias('log2')).collect()
@@ -2884,6 +3055,20 @@ def conv(col: "ColumnOrName", fromBase: int, toBase: int) -> Column:
 
     .. versionadded:: 1.5.0
 
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+        a column to convert base for.
+    fromBase : int
+        the base to convert from.
+    toBase : int
+        the base to convert to.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        value converted to the new base, as a string.
+
     Examples
     --------
     >>> df = spark.createDataFrame([("010101",)], ['n'])
@@ -2899,6 +3084,16 @@ def factorial(col: "ColumnOrName") -> Column:
 
     .. versionadded:: 1.5.0
 
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+        a column to calculate factorial for.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        factorial of the given value.
+
     Examples
     --------
     >>> df = spark.createDataFrame([(5,)], ['n'])
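
A brief sketch cross-checking `conv` against plain Python, assuming an active SparkSession bound to `spark`; note that `conv` returns its result as a string column:

    from pyspark.sql.functions import conv

    df = spark.createDataFrame([("010101",)], ['n'])
    # Binary "010101" is 21 in base 10; conv yields the string '21'.
    df.select(conv(df.n, 2, 10).alias('dec')).show()
    assert int("010101", 2) == 21  # the same conversion in plain Python
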
@@ -2925,10 +3120,65 @@ def lag(col: "ColumnOrName", offset: int = 1, default: Optional[Any] = None) ->
     ----------
     col : :class:`~pyspark.sql.Column` or str
         name of column or expression
-    offset : int, optional
+    offset : int, optional, default 1
         number of row to extend
     default : optional
         default value
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        value before current row based on `offset`.
+
+    Examples
+    --------
+    >>> from pyspark.sql import Window
+    >>> df = spark.createDataFrame([("a", 1),
+    ...                             ("a", 2),
+    ...                             ("a", 3),
+    ...                             ("b", 8),
+    ...                             ("b", 2)], ["c1", "c2"])
+    >>> df.show()
+    +---+---+
+    | c1| c2|
+    +---+---+
+    |  a|  1|
+    |  a|  2|
+    |  a|  3|
+    |  b|  8|
+    |  b|  2|
+    +---+---+
+    >>> w = Window.partitionBy("c1").orderBy("c2")
+    >>> df.withColumn("previous_value", lag("c2").over(w)).show()
+    +---+---+--------------+
+    | c1| c2|previous_value|
+    +---+---+--------------+
+    |  a|  1|          null|
+    |  a|  2|             1|
+    |  a|  3|             2|
+    |  b|  2|          null|
+    |  b|  8|             2|
+    +---+---+--------------+
+    >>> df.withColumn("previous_value", lag("c2", 1, 0).over(w)).show()
+    +---+---+--------------+
+    | c1| c2|previous_value|
+    +---+---+--------------+
+    |  a|  1|             0|
+    |  a|  2|             1|
+    |  a|  3|             2|
+    |  b|  2|             0|
+    |  b|  8|             2|
+    +---+---+--------------+
+    >>> df.withColumn("previous_value", lag("c2", 2, -1).over(w)).show()
+    +---+---+--------------+
+    | c1| c2|previous_value|
+    +---+---+--------------+
+    |  a|  1|            -1|
+    |  a|  2|            -1|
+    |  a|  3|             1|
+    |  b|  2|            -1|
+    |  b|  8|            -1|
+    +---+---+--------------+
     """
     return _invoke_function("lag", _to_java_column(col), offset, default)
@@ -2947,10 +3197,65 @@ def lead(col: "ColumnOrName", offset: int = 1, default: Optional[Any] = None) ->
     ----------
     col : :class:`~pyspark.sql.Column` or str
         name of column or expression
-    offset : int, optional
+    offset : int, optional, default 1
        number of row to extend
     default : optional
         default value
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        value after current row based on `offset`.
+
+    Examples
+    --------
+    >>> from pyspark.sql import Window
+    >>> df = spark.createDataFrame([("a", 1),
+    ...                             ("a", 2),
+    ...                             ("a", 3),
+    ...                             ("b", 8),
+    ...                             ("b", 2)], ["c1", "c2"])
+    >>> df.show()
+    +---+---+
+    | c1| c2|
+    +---+---+
+    |  a|  1|
+    |  a|  2|
+    |  a|  3|
+    |  b|  8|
+    |  b|  2|
+    +---+---+
+    >>> w = Window.partitionBy("c1").orderBy("c2")
+    >>> df.withColumn("next_value", lead("c2").over(w)).show()
+    +---+---+----------+
+    | c1| c2|next_value|
+    +---+---+----------+
+    |  a|  1|         2|
+    |  a|  2|         3|
+    |  a|  3|      null|
+    |  b|  2|         8|
+    |  b|  8|      null|
+    +---+---+----------+
+    >>> df.withColumn("next_value", lead("c2", 1, 0).over(w)).show()
+    +---+---+----------+
+    | c1| c2|next_value|
+    +---+---+----------+
+    |  a|  1|         2|
+    |  a|  2|         3|
+    |  a|  3|         0|
+    |  b|  2|         8|
+    |  b|  8|         0|
+    +---+---+----------+
+    >>> df.withColumn("next_value", lead("c2", 2, -1).over(w)).show()
+    +---+---+----------+
+    | c1| c2|next_value|
+    +---+---+----------+
+    |  a|  1|         3|
+    |  a|  2|        -1|
+    |  a|  3|        -1|
+    |  b|  2|        -1|
+    |  b|  8|        -1|
+    +---+---+----------+
     """
     return _invoke_function("lead", _to_java_column(col), offset, default)
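
Since `lag` and `lead` are mirror images over the same window, a combined sketch may help; it reuses the `df` and `w` defined in the doctests above:

    from pyspark.sql.functions import lag, lead

    # Previous and next value of c2 within each c1 partition, side by side.
    df.select(
        "c1", "c2",
        lag("c2", 1).over(w).alias("prev"),
        lead("c2", 1).over(w).alias("next"),
    ).show()
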
("b", 2)], ["c1", "c2"]) + >>> df.show() + +---+---+ + | c1| c2| + +---+---+ + | a| 1| + | a| 2| + | a| 3| + | b| 8| + | b| 2| + +---+---+ + >>> w = Window.partitionBy("c1").orderBy("c2") + >>> df.withColumn("nth_value", nth_value("c2", 1).over(w)).show() + +---+---+---------+ + | c1| c2|nth_value| + +---+---+---------+ + | a| 1| 1| + | a| 2| 1| + | a| 3| 1| + | b| 2| 2| + | b| 8| 2| + +---+---+---------+ + >>> df.withColumn("nth_value", nth_value("c2", 2).over(w)).show() + +---+---+---------+ + | c1| c2|nth_value| + +---+---+---------+ + | a| 1| null| + | a| 2| 2| + | a| 3| 2| + | b| 2| null| + | b| 8| 8| + +---+---+---------+ """ return _invoke_function("nth_value", _to_java_column(col), offset, ignoreNulls) @@ -2995,6 +3345,41 @@ def ntile(n: int) -> Column: ---------- n : int an integer + + Returns + ------- + :class:`~pyspark.sql.Column` + portioned group id. + + Examples + -------- + >>> from pyspark.sql import Window + >>> df = spark.createDataFrame([("a", 1), + ... ("a", 2), + ... ("a", 3), + ... ("b", 8), + ... ("b", 2)], ["c1", "c2"]) + >>> df.show() + +---+---+ + | c1| c2| + +---+---+ + | a| 1| + | a| 2| + | a| 3| + | b| 8| + | b| 2| + +---+---+ + >>> w = Window.partitionBy("c1").orderBy("c2") + >>> df.withColumn("ntile", ntile(2).over(w)).show() + +---+---+-----+ + | c1| c2|ntile| + +---+---+-----+ + | a| 1| 1| + | a| 2| 1| + | a| 3| 2| + | b| 2| 1| + | b| 8| 2| + +---+---+-----+ """ return _invoke_function("ntile", int(n)) @@ -3008,6 +3393,21 @@ def current_date() -> Column: All calls of current_date within the same query return the same value. .. versionadded:: 1.5.0 + + Returns + ------- + :class:`~pyspark.sql.Column` + current date. + + Examples + -------- + >>> df = spark.range(1) + >>> df.select(current_date()).show() # doctest: +SKIP + +--------------+ + |current_date()| + +--------------+ + | 2022-08-26| + +--------------+ """ return _invoke_function("current_date") @@ -3018,6 +3418,21 @@ def current_timestamp() -> Column: column. All calls of current_timestamp within the same query return the same value. .. versionadded:: 1.5.0 + + Returns + ------- + :class:`~pyspark.sql.Column` + current date and time. + + Examples + -------- + >>> df = spark.range(1) + >>> df.select(current_timestamp()).show(truncate=False) # doctest: +SKIP + +-----------------------+ + |current_timestamp() | + +-----------------------+ + |2022-08-26 21:23:22.716| + +-----------------------+ """ return _invoke_function("current_timestamp") @@ -3030,16 +3445,20 @@ def localtimestamp() -> Column: .. versionadded:: 3.4.0 + Returns + ------- + :class:`~pyspark.sql.Column` + current local date and time. + Examples -------- - >>> from pyspark.sql.functions import localtimestamp - >>> df = spark.range(0, 100) - >>> df.select(localtimestamp()).distinct().show() - +--------------------+ - | localtimestamp()| - +--------------------+ - |20...-...-... ...:...:...| - +--------------------+ + >>> df = spark.range(1) + >>> df.select(localtimestamp()).show(truncate=False) # doctest: +SKIP + +-----------------------+ + |localtimestamp() | + +-----------------------+ + |2022-08-26 21:28:34.639| + +-----------------------+ """ return _invoke_function("localtimestamp")