diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index fe114e07c886b..38baf9b99135d 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -2320,10 +2320,20 @@ def approx_count_distinct(col: "ColumnOrName", rsd: Optional[float] = None) -> C
         maximum relative standard deviation allowed (default = 0.05).
         For rsd < 0.01, it is more efficient to use :func:`count_distinct`
 
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        the column of computed results.
+
     Examples
     --------
-    >>> df.agg(approx_count_distinct(df.age).alias('distinct_ages')).collect()
-    [Row(distinct_ages=2)]
+    >>> df = spark.createDataFrame([1,2,2,3], "INT")
+    >>> df.agg(approx_count_distinct("value").alias('distinct_values')).show()
+    +---------------+
+    |distinct_values|
+    +---------------+
+    |              3|
+    +---------------+
     """
     if rsd is None:
         return _invoke_function_over_columns("approx_count_distinct", col)
@@ -2624,6 +2634,7 @@ def grouping(col: "ColumnOrName") -> Column:
     Examples
     --------
+    >>> df = spark.createDataFrame([("Alice", 2), ("Bob", 5)], ("name", "age"))
     >>> df.cube("name").agg(grouping("name"), sum("age")).orderBy("name").show()
     +-----+--------------+--------+
     | name|grouping(name)|sum(age)|
     +-----+--------------+--------+
@@ -2966,9 +2977,14 @@ def rand(seed: Optional[int] = None) -> Column:
 
     Examples
     --------
-    >>> df.withColumn('rand', rand(seed=42) * 3).collect()
-    [Row(age=2, name='Alice', rand=2.4052597283576684),
-     Row(age=5, name='Bob', rand=2.3913904055683974)]
+    >>> df = spark.range(2)
+    >>> df.withColumn('rand', rand(seed=42) * 3).show() # doctest: +SKIP
+    +---+------------------+
+    | id|              rand|
+    +---+------------------+
+    |  0|1.4385751892400076|
+    |  1|1.7082186019706387|
+    +---+------------------+
     """
     if seed is not None:
         return _invoke_function("rand", seed)
@@ -2998,9 +3014,14 @@ def randn(seed: Optional[int] = None) -> Column:
 
     Examples
     --------
-    >>> df.withColumn('randn', randn(seed=42)).collect()
-    [Row(age=2, name='Alice', randn=1.1027054481455365),
-     Row(age=5, name='Bob', randn=0.7400395449950132)]
+    >>> df = spark.range(2)
+    >>> df.withColumn('randn', randn(seed=42)).show() # doctest: +SKIP
+    +---+--------------------+
+    | id|               randn|
+    +---+--------------------+
+    |  0|-0.04167221574820542|
+    |  1| 0.15241403986452778|
+    +---+--------------------+
     """
     if seed is not None:
         return _invoke_function("randn", seed)
@@ -3190,6 +3211,7 @@ def spark_partition_id() -> Column:
 
     Examples
     --------
+    >>> df = spark.range(2)
     >>> df.repartition(1).select(spark_partition_id().alias("pid")).collect()
     [Row(pid=0), Row(pid=0)]
     """
@@ -3213,8 +3235,14 @@ def expr(str: str) -> Column:
 
     Examples
     --------
-    >>> df.select(expr("length(name)")).collect()
-    [Row(length(name)=5), Row(length(name)=3)]
+    >>> df = spark.createDataFrame([["Alice"], ["Bob"]], ["name"])
+    >>> df.select("name", expr("length(name)")).show()
+    +-----+------------+
+    | name|length(name)|
+    +-----+------------+
+    |Alice|           5|
+    |  Bob|           3|
+    +-----+------------+
     """
     return _invoke_function("expr", str)
 
@@ -3248,6 +3276,7 @@ def struct(
 
     Examples
     --------
+    >>> df = spark.createDataFrame([("Alice", 2), ("Bob", 5)], ("name", "age"))
     >>> df.select(struct('age', 'name').alias("struct")).collect()
     [Row(struct=Row(age=2, name='Alice')), Row(struct=Row(age=5, name='Bob'))]
     >>> df.select(struct([df.age, df.name]).alias("struct")).collect()
@@ -3335,11 +3364,24 @@ def when(condition: Column, value: Any) -> Column:
 
     Examples
     --------
-    >>> df.select(when(df['age'] == 2, 3).otherwise(4).alias("age")).collect()
-    [Row(age=3), Row(age=4)]
+    >>> df = spark.range(3)
+    >>> df.select(when(df['id'] == 2, 3).otherwise(4).alias("age")).show()
+    +---+
+    |age|
+    +---+
+    |  4|
+    |  4|
+    |  3|
+    +---+
 
-    >>> df.select(when(df.age == 2, df.age + 1).alias("age")).collect()
-    [Row(age=3), Row(age=None)]
+    >>> df.select(when(df.id == 2, df.id + 1).alias("age")).show()
+    +----+
+    | age|
+    +----+
+    |null|
+    |null|
+    |   3|
+    +----+
     """
     # Explicitly not using ColumnOrName type here to make reading condition less opaque
     if not isinstance(condition, Column):
@@ -3380,11 +3422,26 @@ def log(arg1: Union["ColumnOrName", float], arg2: Optional["ColumnOrName"] = Non
 
     Examples
     --------
-    >>> df.select(log(10.0, df.age).alias('ten')).rdd.map(lambda l: str(l.ten)[:7]).collect()
-    ['0.30102', '0.69897']
+    >>> df = spark.createDataFrame([10, 100, 1000], "INT")
+    >>> df.select(log(10.0, df.value).alias('ten')).show() # doctest: +SKIP
+    +---+
+    |ten|
+    +---+
+    |1.0|
+    |2.0|
+    |3.0|
+    +---+
 
-    >>> df.select(log(df.age).alias('e')).rdd.map(lambda l: str(l.e)[:7]).collect()
-    ['0.69314', '1.60943']
+    And the natural logarithm
+
+    >>> df.select(log(df.value)).show() # doctest: +SKIP
+    +-----------------+
+    |        ln(value)|
+    +-----------------+
+    |2.302585092994046|
+    |4.605170185988092|
+    |6.907755278982137|
+    +-----------------+
     """
     if arg2 is None:
         return _invoke_function_over_columns("log", cast("ColumnOrName", arg1))
@@ -3409,8 +3466,13 @@ def log2(col: "ColumnOrName") -> Column:
 
     Examples
     --------
-    >>> spark.createDataFrame([(4,)], ['a']).select(log2('a').alias('log2')).collect()
-    [Row(log2=2.0)]
+    >>> df = spark.createDataFrame([(4,)], ['a'])
+    >>> df.select(log2('a').alias('log2')).show()
+    +----+
+    |log2|
+    +----+
+    | 2.0|
+    +----+
     """
     return _invoke_function_over_columns("log2", col)
 
@@ -4976,11 +5038,14 @@ def sha2(col: "ColumnOrName", numBits: int) -> Column:
 
     Examples
     --------
-    >>> digests = df.select(sha2(df.name, 256).alias('s')).collect()
-    >>> digests[0]
-    Row(s='3bc51062973c458d5a6f2d8d64a023246354ad7e064b1e4e009ec8a0699a3043')
-    >>> digests[1]
-    Row(s='cd9fb1e148ccd8442e5aa74904cc73bf6fb54d1d54d333bd596aa9bb4bb4e961')
+    >>> df = spark.createDataFrame([["Alice"], ["Bob"]], ["name"])
+    >>> df.withColumn("sha2", sha2(df.name, 256)).show(truncate=False)
+    +-----+----------------------------------------------------------------+
+    |name |sha2                                                            |
+    +-----+----------------------------------------------------------------+
+    |Alice|3bc51062973c458d5a6f2d8d64a023246354ad7e064b1e4e009ec8a0699a3043|
+    |Bob  |cd9fb1e148ccd8442e5aa74904cc73bf6fb54d1d54d333bd596aa9bb4bb4e961|
+    +-----+----------------------------------------------------------------+
     """
     return _invoke_function("sha2", _to_java_column(col), numBits)
 
@@ -6076,7 +6141,8 @@ def bin(col: "ColumnOrName") -> Column:
 
     Examples
     --------
-    >>> df.select(bin(df.age).alias('c')).collect()
+    >>> df = spark.createDataFrame([2,5], "INT")
+    >>> df.select(bin(df.value).alias('c')).collect()
     [Row(c='10'), Row(c='101')]
     """
     return _invoke_function_over_columns("bin", col)
@@ -6268,6 +6334,7 @@ def create_map(
 
     Examples
     --------
+    >>> df = spark.createDataFrame([("Alice", 2), ("Bob", 5)], ("name", "age"))
     >>> df.select(create_map('name', 'age').alias("map")).collect()
     [Row(map={'Alice': 2}), Row(map={'Bob': 5})]
     >>> df.select(create_map([df.name, df.age]).alias("map")).collect()
@@ -6344,6 +6411,7 @@ def array(
 
     Examples
     --------
+    >>> df = spark.createDataFrame([("Alice", 2), ("Bob", 5)], ("name", "age"))
     >>> df.select(array('age', 'age').alias("arr")).collect()
     [Row(arr=[2, 2]), Row(arr=[5, 5])]
     >>> df.select(array([df.age, df.age]).alias("arr")).collect()
@@ -7289,6 +7357,11 @@ def size(col: "ColumnOrName") -> Column:
     col : :class:`~pyspark.sql.Column` or str
         name of column or expression
 
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        length of the array/map.
+
     Examples
     --------
     >>> df = spark.createDataFrame([([1, 2, 3],),([1],),([],)], ['data'])
@@ -7309,6 +7382,11 @@ def array_min(col: "ColumnOrName") -> Column:
     col : :class:`~pyspark.sql.Column` or str
         name of column or expression
 
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        minimum value of array.
+
     Examples
     --------
     >>> df = spark.createDataFrame([([2, 1, 3],), ([None, 10, -1],)], ['data'])
@@ -7329,6 +7407,11 @@ def array_max(col: "ColumnOrName") -> Column:
     col : :class:`~pyspark.sql.Column` or str
         name of column or expression
 
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        maximum value of an array.
+
     Examples
     --------
     >>> df = spark.createDataFrame([([2, 1, 3],), ([None, 10, -1],)], ['data'])
@@ -7352,6 +7435,13 @@ def sort_array(col: "ColumnOrName", asc: bool = True) -> Column:
     col : :class:`~pyspark.sql.Column` or str
         name of column or expression
     asc : bool, optional
+        whether to sort in ascending or descending order. If `asc` is True (default)
+        then ascending and if False then descending.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        sorted array.
 
     Examples
     --------
@@ -7386,6 +7476,11 @@ def array_sort(
         positive integer as the first element is less than, equal to, or greater than the second
         element. If the comparator function returns null, the function will fail and raise an error.
 
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        sorted array.
+
     Examples
     --------
     >>> df = spark.createDataFrame([([2, 1, None, 3],),([1],),([],)], ['data'])
@@ -7410,14 +7505,19 @@ def shuffle(col: "ColumnOrName") -> Column:
 
     .. versionadded:: 2.4.0
 
+    Notes
+    -----
+    The function is non-deterministic.
+
     Parameters
     ----------
     col : :class:`~pyspark.sql.Column` or str
         name of column or expression
 
-    Notes
-    -----
-    The function is non-deterministic.
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        an array of elements in random order.
 
     Examples
     --------
@@ -7439,6 +7539,11 @@ def reverse(col: "ColumnOrName") -> Column:
     col : :class:`~pyspark.sql.Column` or str
         name of column or expression
 
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        array of elements in reverse order.
+
     Examples
     --------
     >>> df = spark.createDataFrame([('Spark SQL',)], ['data'])
@@ -7464,11 +7569,28 @@ def flatten(col: "ColumnOrName") -> Column:
     col : :class:`~pyspark.sql.Column` or str
         name of column or expression
 
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        flattened array.
+
     Examples
     --------
     >>> df = spark.createDataFrame([([[1, 2, 3], [4, 5], [6]],), ([None, [4, 5]],)], ['data'])
-    >>> df.select(flatten(df.data).alias('r')).collect()
-    [Row(r=[1, 2, 3, 4, 5, 6]), Row(r=None)]
+    >>> df.show(truncate=False)
+    +------------------------+
+    |data                    |
+    +------------------------+
+    |[[1, 2, 3], [4, 5], [6]]|
+    |[null, [4, 5]]          |
+    +------------------------+
+    >>> df.select(flatten(df.data).alias('r')).show()
+    +------------------+
+    |                 r|
+    +------------------+
+    |[1, 2, 3, 4, 5, 6]|
+    |              null|
+    +------------------+
     """
     return _invoke_function_over_columns("flatten", col)
 
@@ -7477,6 +7599,8 @@ def map_contains_key(col: "ColumnOrName", value: Any) -> Column:
     """
     Returns true if the map contains the key.
 
+    .. versionadded:: 3.4.0
+
     Parameters
     ----------
     col : :class:`~pyspark.sql.Column` or str
@@ -7484,7 +7608,10 @@ def map_contains_key(col: "ColumnOrName", value: Any) -> Column:
     value :
         a literal value
 
-    .. versionadded:: 3.4.0
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        True if key is in the map and False otherwise.
 
     Examples
     --------
@@ -7517,6 +7644,11 @@ def map_keys(col: "ColumnOrName") -> Column:
     col : :class:`~pyspark.sql.Column` or str
         name of column or expression
 
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        keys of the map as an array.
+
     Examples
     --------
     >>> from pyspark.sql.functions import map_keys
@@ -7542,6 +7674,11 @@ def map_values(col: "ColumnOrName") -> Column:
     col : :class:`~pyspark.sql.Column` or str
         name of column or expression
 
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        values of the map as an array.
+
    Examples
     --------
     >>> from pyspark.sql.functions import map_values
@@ -7567,23 +7704,36 @@ def map_entries(col: "ColumnOrName") -> Column:
     col : :class:`~pyspark.sql.Column` or str
         name of column or expression
 
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        an array of key value pairs as a struct type.
+
     Examples
     --------
     >>> from pyspark.sql.functions import map_entries
     >>> df = spark.sql("SELECT map(1, 'a', 2, 'b') as data")
-    >>> df.select(map_entries("data").alias("entries")).show()
+    >>> df = df.select(map_entries("data").alias("entries"))
+    >>> df.show()
     +----------------+
     |         entries|
     +----------------+
     |[{1, a}, {2, b}]|
     +----------------+
+    >>> df.printSchema()
+    root
+     |-- entries: array (nullable = false)
+     |    |-- element: struct (containsNull = false)
+     |    |    |-- key: integer (nullable = false)
+     |    |    |-- value: string (nullable = false)
     """
     return _invoke_function_over_columns("map_entries", col)
 
 
 def map_from_entries(col: "ColumnOrName") -> Column:
     """
-    Collection function: Returns a map created from the given array of entries.
+    Collection function: Converts an array of entries (key value struct types) to a map
+    of values.
 
     .. versionadded:: 2.4.0
 
@@ -7592,6 +7742,11 @@ def map_from_entries(col: "ColumnOrName") -> Column:
     col : :class:`~pyspark.sql.Column` or str
         name of column or expression
 
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        a map created from the given array of entries.
+
     Examples
     --------
     >>> from pyspark.sql.functions import map_from_entries
@@ -7619,6 +7774,11 @@ def array_repeat(col: "ColumnOrName", count: Union["ColumnOrName", int]) -> Colu
     count : :class:`~pyspark.sql.Column` or str or int
         column name, column, or int containing the number of times to repeat the first argument
 
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        an array of repeated elements.
+
     Examples
     --------
     >>> df = spark.createDataFrame([('ab',)], ['data'])
@@ -7633,7 +7793,8 @@
 def arrays_zip(*cols: "ColumnOrName") -> Column:
     """
     Collection function: Returns a merged array of structs in which the N-th struct contains all
-    N-th values of input arrays.
+    N-th values of input arrays. If one of the arrays is shorter than the others then
+    the resulting struct values are `null` for the missing elements.
 
     .. versionadded:: 2.4.0
 
@@ -7642,12 +7803,29 @@
     cols : :class:`~pyspark.sql.Column` or str
         columns of arrays to be merged.
 
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        merged array of entries.
+
     Examples
     --------
     >>> from pyspark.sql.functions import arrays_zip
-    >>> df = spark.createDataFrame([(([1, 2, 3], [2, 3, 4]))], ['vals1', 'vals2'])
-    >>> df.select(arrays_zip(df.vals1, df.vals2).alias('zipped')).collect()
-    [Row(zipped=[Row(vals1=1, vals2=2), Row(vals1=2, vals2=3), Row(vals1=3, vals2=4)])]
+    >>> df = spark.createDataFrame([(([1, 2, 3], [2, 4, 6], [3, 6]))], ['vals1', 'vals2', 'vals3'])
+    >>> df = df.select(arrays_zip(df.vals1, df.vals2, df.vals3).alias('zipped'))
+    >>> df.show(truncate=False)
+    +------------------------------------+
+    |zipped                              |
+    +------------------------------------+
+    |[{1, 2, 3}, {2, 4, 6}, {3, 6, null}]|
+    +------------------------------------+
+    >>> df.printSchema()
+    root
+     |-- zipped: array (nullable = true)
+     |    |-- element: struct (containsNull = false)
+     |    |    |-- vals1: long (nullable = true)
+     |    |    |-- vals2: long (nullable = true)
+     |    |    |-- vals3: long (nullable = true)
     """
     return _invoke_function_over_seq_of_columns("arrays_zip", cols)
 
@@ -7674,6 +7852,11 @@ def map_concat(
     cols : :class:`~pyspark.sql.Column` or str
         column names or :class:`~pyspark.sql.Column`\\s
 
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        a map of merged entries from other maps.
+
     Examples
     --------
     >>> from pyspark.sql.functions import map_concat
@@ -7700,6 +7883,20 @@ def sequence(
 
     .. versionadded:: 2.4.0
 
+    Parameters
+    ----------
+    start : :class:`~pyspark.sql.Column` or str
+        starting value (inclusive)
+    stop : :class:`~pyspark.sql.Column` or str
+        last value (inclusive)
+    step : :class:`~pyspark.sql.Column` or str, optional
+        value to add to current to get next element (default is 1)
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        an array of sequence values
+
     Examples
     --------
     >>> df1 = spark.createDataFrame([(-2, 2)], ('C1', 'C2'))
@@ -7739,6 +7936,11 @@ def from_csv(
 
         .. # noqa
 
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        a column of parsed CSV values
+
     Examples
     --------
     >>> data = [("1,2,3",)]
@@ -7912,6 +8114,7 @@ def transform(
     Returns
     -------
     :class:`~pyspark.sql.Column`
+        a new array of transformed elements.
 
     Examples
     --------
@@ -7951,7 +8154,12 @@ def exists(col: "ColumnOrName", f: Callable[[Column], Column]) -> Column:
         :py:mod:`pyspark.sql.functions` and Scala ``UserDefinedFunctions``.
         Python ``UserDefinedFunctions`` are not supported
         (`SPARK-27052 <https://issues.apache.org/jira/browse/SPARK-27052>`__).
-    :return: a :class:`~pyspark.sql.Column`
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        True if "any" element of an array evaluates to True when passed as an argument to
+        the given function, and False otherwise.
 
     Examples
     --------
@@ -7987,6 +8195,8 @@ def forall(col: "ColumnOrName", f: Callable[[Column], Column]) -> Column:
     Returns
     -------
     :class:`~pyspark.sql.Column`
+        True if "all" elements of an array evaluate to True when passed as an argument to
+        the given function, and False otherwise.
 
     Examples
     --------
@@ -8045,6 +8255,8 @@ def filter(
     Returns
     -------
     :class:`~pyspark.sql.Column`
+        filtered array of elements where the given function evaluated to True
+        when passed as an argument.
 
     Examples
     --------
@@ -8100,6 +8312,7 @@ def aggregate(
     Returns
     -------
     :class:`~pyspark.sql.Column`
+        final value after the aggregate function is applied.
 
     Examples
     --------
@@ -8164,6 +8377,7 @@ def zip_with(
     Returns
     -------
     :class:`~pyspark.sql.Column`
+        array of calculated values derived by applying given function to each pair of arguments.
 
     Examples
     --------
@@ -8207,6 +8421,8 @@ def transform_keys(col: "ColumnOrName", f: Callable[[Column, Column], Column]) -
     Returns
     -------
     :class:`~pyspark.sql.Column`
+        a new map of entries where new keys were calculated by applying the given function to
+        each key value argument.
 
     Examples
     --------
@@ -8244,6 +8460,8 @@ def transform_values(col: "ColumnOrName", f: Callable[[Column, Column], Column])
     Returns
     -------
     :class:`~pyspark.sql.Column`
+        a new map of entries where new values were calculated by applying the given function to
+        each key value argument.
 
     Examples
     --------
@@ -8280,6 +8498,7 @@ def map_filter(col: "ColumnOrName", f: Callable[[Column, Column], Column]) -> Co
     Returns
     -------
     :class:`~pyspark.sql.Column`
+        filtered map.
 
     Examples
     --------
@@ -8322,6 +8541,8 @@ def map_zip_with(
     Returns
     -------
     :class:`~pyspark.sql.Column`
+        zipped map where entries are calculated by applying the given function to each
+        pair of arguments.
 
     Examples
     --------
@@ -8351,6 +8572,16 @@ def years(col: "ColumnOrName") -> Column:
 
     .. versionadded:: 3.1.0
 
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+        target date or timestamp column to work on.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        data partitioned by years.
+
     Examples
     --------
     >>> df.writeTo("catalog.db.table").partitionedBy(  # doctest: +SKIP
@@ -8374,6 +8605,16 @@ def months(col: "ColumnOrName") -> Column:
 
     .. versionadded:: 3.1.0
 
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+        target date or timestamp column to work on.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        data partitioned by months.
+
     Examples
     --------
     >>> df.writeTo("catalog.db.table").partitionedBy(
@@ -8397,6 +8638,16 @@ def days(col: "ColumnOrName") -> Column:
 
     .. versionadded:: 3.1.0
 
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+        target date or timestamp column to work on.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        data partitioned by days.
+
     Examples
     --------
     >>> df.writeTo("catalog.db.table").partitionedBy(  # doctest: +SKIP
@@ -8420,6 +8671,16 @@ def hours(col: "ColumnOrName") -> Column:
 
     .. versionadded:: 3.1.0
 
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+        target date or timestamp column to work on.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        data partitioned by hours.
+
     Examples
     --------
     >>> df.writeTo("catalog.db.table").partitionedBy(  # doctest: +SKIP
@@ -8449,6 +8710,16 @@ def bucket(numBuckets: Union[Column, int], col: "ColumnOrName") -> Column:
     ...     bucket(42, "ts")
     ... ).createOrReplace()
 
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+        target date or timestamp column to work on.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        data partitioned by the given columns.
+
     Notes
     -----
     This function can be used only in combination with
@@ -8482,6 +8753,11 @@ def call_udf(udfName: str, *cols: "ColumnOrName") -> Column:
     cols : :class:`~pyspark.sql.Column` or str
         column names or :class:`~pyspark.sql.Column`\\s to be used in the UDF
 
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        result of executed udf.
+
     Examples
     --------
     >>> from pyspark.sql.functions import call_udf, col
@@ -8647,7 +8923,7 @@ def udf(
 
 def _test() -> None:
     import doctest
-    from pyspark.sql import Row, SparkSession
+    from pyspark.sql import SparkSession
     import pyspark.sql.functions
 
     globs = pyspark.sql.functions.__dict__.copy()
@@ -8655,7 +8931,6 @@ def _test() -> None:
     sc = spark.sparkContext
     globs["sc"] = sc
     globs["spark"] = spark
-    globs["df"] = spark.createDataFrame([Row(age=2, name="Alice"), Row(age=5, name="Bob")])
     (failure_count, test_count) = doctest.testmod(
         pyspark.sql.functions,
         globs=globs,
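
A minimal sketch for exercising a couple of the updated, self-contained examples outside the doctest harness (assuming a local pyspark installation; the SparkSession setup below is illustrative and not part of the patch):

    # Each updated example now builds its own DataFrame instead of relying on the
    # shared "df" glob that this patch removes from _test().
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import approx_count_distinct, when

    spark = SparkSession.builder.master("local[1]").appName("docstring-check").getOrCreate()

    # approx_count_distinct example: four rows, three distinct values.
    df = spark.createDataFrame([1, 2, 2, 3], "INT")
    df.agg(approx_count_distinct("value").alias("distinct_values")).show()

    # when/otherwise example over spark.range(3): rows 0 and 1 fall through to 4.
    ids = spark.range(3)
    ids.select(when(ids["id"] == 2, 3).otherwise(4).alias("age")).show()

    spark.stop()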