diff --git a/python/pyspark/sql/connect/functions.py b/python/pyspark/sql/connect/functions.py
index 672199537ac2..dccb6d6e0c7c 100644
--- a/python/pyspark/sql/connect/functions.py
+++ b/python/pyspark/sql/connect/functions.py
@@ -4756,6 +4756,614 @@ def map_values(col: "ColumnOrName") -> Column:
 #     return _invoke_higher_order_function("MapZipWith", [col1, col2], [f])
 
 
+def posexplode(col: "ColumnOrName") -> Column:
+    """
+    Returns a new row for each element with position in the given array or map.
+    Uses the default column name `pos` for position, and `col` for elements in the
+    array and `key` and `value` for elements in the map unless specified otherwise.
+
+    .. versionadded:: 3.4.0
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+        target column to work on.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        one row per array item or map key value including positions as a separate column.
+
+    Examples
+    --------
+    >>> from pyspark.sql import Row
+    >>> eDF = spark.createDataFrame([Row(a=1, intlist=[1,2,3], mapfield={"a": "b"})])
+    >>> eDF.select(posexplode(eDF.intlist)).collect()
+    [Row(pos=0, col=1), Row(pos=1, col=2), Row(pos=2, col=3)]
+
+    >>> eDF.select(posexplode(eDF.mapfield)).show()
+    +---+---+-----+
+    |pos|key|value|
+    +---+---+-----+
+    |  0|  a|    b|
+    +---+---+-----+
+    """
+    return _invoke_function_over_columns("posexplode", col)
+
+
+def posexplode_outer(col: "ColumnOrName") -> Column:
+    """
+    Returns a new row for each element with position in the given array or map.
+    Unlike posexplode, if the array/map is null or empty then the row (null, null) is produced.
+    Uses the default column name `pos` for position, and `col` for elements in the
+    array and `key` and `value` for elements in the map unless specified otherwise.
+
+    .. versionadded:: 3.4.0
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+        target column to work on.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        one row per array item or map key value including positions as a separate column.
+
+    Examples
+    --------
+    >>> df = spark.createDataFrame(
+    ...     [(1, ["foo", "bar"], {"x": 1.0}), (2, [], {}), (3, None, None)],
+    ...     ("id", "an_array", "a_map")
+    ... )
+    >>> df.select("id", "an_array", posexplode_outer("a_map")).show()
+    +---+----------+----+----+-----+
+    | id|  an_array| pos| key|value|
+    +---+----------+----+----+-----+
+    |  1|[foo, bar]|   0|   x|  1.0|
+    |  2|        []|null|null| null|
+    |  3|      null|null|null| null|
+    +---+----------+----+----+-----+
+    >>> df.select("id", "a_map", posexplode_outer("an_array")).show()
+    +---+----------+----+----+
+    | id|     a_map| pos| col|
+    +---+----------+----+----+
+    |  1|{x -> 1.0}|   0| foo|
+    |  1|{x -> 1.0}|   1| bar|
+    |  2|        {}|null|null|
+    |  3|      null|null|null|
+    +---+----------+----+----+
+    """
+    return _invoke_function_over_columns("posexplode_outer", col)
+
+
+def reverse(col: "ColumnOrName") -> Column:
+    """
+    Collection function: returns a reversed string or an array with reverse order of elements.
+
+    .. versionadded:: 3.4.0
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+        name of column or expression
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        array of elements in reverse order.
+
+    Examples
+    --------
+    >>> df = spark.createDataFrame([('Spark SQL',)], ['data'])
+    >>> df.select(reverse(df.data).alias('s')).collect()
+    [Row(s='LQS krapS')]
+    >>> df = spark.createDataFrame([([2, 1, 3],) ,([1],) ,([],)], ['data'])
+    >>> df.select(reverse(df.data).alias('r')).collect()
+    [Row(r=[3, 1, 2]), Row(r=[1]), Row(r=[])]
+    """
+    return _invoke_function_over_columns("reverse", col)
+
+
+# TODO(SPARK-41493): Support options
+def schema_of_csv(csv: "ColumnOrName") -> Column:
+    """
+    Parses a CSV string and infers its schema in DDL format.
+
+    .. versionadded:: 3.4.0
+
+    Parameters
+    ----------
+    csv : :class:`~pyspark.sql.Column` or str
+        a CSV string or a foldable string column containing a CSV string.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        a string representation of a :class:`StructType` parsed from given CSV.
+
+    Examples
+    --------
+    >>> df = spark.range(1)
+    >>> df.select(schema_of_csv(lit('1,a')).alias("csv")).collect()
+    [Row(csv='STRUCT<_c0: INT, _c1: STRING>')]
+    >>> df.select(schema_of_csv('1,a').alias("csv")).collect()
+    [Row(csv='STRUCT<_c0: INT, _c1: STRING>')]
+    """
+
+    if isinstance(csv, Column):
+        _csv = csv
+    elif isinstance(csv, str):
+        _csv = lit(csv)
+    else:
+        raise TypeError(f"csv should be a Column or str, but got {type(csv).__name__}")
+
+    return _invoke_function("schema_of_csv", _csv)
+
+
+# TODO(SPARK-41494): Support options
+def schema_of_json(json: "ColumnOrName") -> Column:
+    """
+    Parses a JSON string and infers its schema in DDL format.
+
+    .. versionadded:: 3.4.0
+
+    Parameters
+    ----------
+    json : :class:`~pyspark.sql.Column` or str
+        a JSON string or a foldable string column containing a JSON string.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        a string representation of a :class:`StructType` parsed from given JSON.
+
+    Examples
+    --------
+    >>> df = spark.range(1)
+    >>> df.select(schema_of_json(lit('{"a": 0}')).alias("json")).collect()
+    [Row(json='STRUCT<a: BIGINT>')]
+    >>> df.select(schema_of_json('{"a": 0}').alias("json")).collect()
+    [Row(json='STRUCT<a: BIGINT>')]
+    """
+
+    if isinstance(json, Column):
+        _json = json
+    elif isinstance(json, str):
+        _json = lit(json)
+    else:
+        raise TypeError(f"json should be a Column or str, but got {type(json).__name__}")
+
+    return _invoke_function("schema_of_json", _json)
+
+
+def shuffle(col: "ColumnOrName") -> Column:
+    """
+    Collection function: Generates a random permutation of the given array.
+
+    .. versionadded:: 3.4.0
+
+    Notes
+    -----
+    The function is non-deterministic.
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+        name of column or expression
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        an array of elements in random order.
+
+    Examples
+    --------
+    >>> df = spark.createDataFrame([([1, 20, 3, 5],), ([1, 20, None, 3],)], ['data'])
+    >>> df.select(shuffle(df.data).alias('s')).collect()  # doctest: +SKIP
+    [Row(s=[3, 1, 5, 20]), Row(s=[20, None, 3, 1])]
+    """
+    return _invoke_function_over_columns("shuffle", col)
+
+
+def size(col: "ColumnOrName") -> Column:
+    """
+    Collection function: returns the length of the array or map stored in the column.
+
+    .. versionadded:: 3.4.0
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+        name of column or expression
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        length of the array/map.
+
+    Examples
+    --------
+    >>> df = spark.createDataFrame([([1, 2, 3],),([1],),([],)], ['data'])
+    >>> df.select(size(df.data)).collect()
+    [Row(size(data)=3), Row(size(data)=1), Row(size(data)=0)]
+    """
+    return _invoke_function_over_columns("size", col)
+
+
+def slice(
+    col: "ColumnOrName", start: Union["ColumnOrName", int], length: Union["ColumnOrName", int]
+) -> Column:
+    """
+    Collection function: returns an array containing all the elements in `col` from index `start`
+    (array indices start at 1, or from the end if `start` is negative) with the specified `length`.
+
+    .. versionadded:: 3.4.0
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+        column name or column containing the array to be sliced
+    start : :class:`~pyspark.sql.Column` or str or int
+        column name, column, or int containing the starting index
+    length : :class:`~pyspark.sql.Column` or str or int
+        column name, column, or int containing the length of the slice
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        a column of array type. Subset of array.
+
+    Examples
+    --------
+    >>> df = spark.createDataFrame([([1, 2, 3],), ([4, 5],)], ['x'])
+    >>> df.select(slice(df.x, 2, 2).alias("sliced")).collect()
+    [Row(sliced=[2, 3]), Row(sliced=[5])]
+    """
+    if isinstance(start, Column):
+        _start = start
+    elif isinstance(start, int):
+        _start = lit(start)
+    else:
+        raise TypeError(f"start should be a Column or int, but got {type(start).__name__}")
+
+    if isinstance(length, Column):
+        _length = length
+    elif isinstance(length, int):
+        _length = lit(length)
+    else:
+        raise TypeError(f"length should be a Column or int, but got {type(length).__name__}")
+
+    return _invoke_function("slice", _to_col(col), _start, _length)
+
+
+def sort_array(col: "ColumnOrName", asc: bool = True) -> Column:
+    """
+    Collection function: sorts the input array in ascending or descending order according
+    to the natural ordering of the array elements. Null elements will be placed at the beginning
+    of the returned array in ascending order or at the end of the returned array in descending
+    order.
+
+    .. versionadded:: 3.4.0
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+        name of column or expression
+    asc : bool, optional
+        whether to sort in ascending or descending order. If `asc` is True (default)
+        then ascending and if False then descending.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        sorted array.
+
+    Examples
+    --------
+    >>> df = spark.createDataFrame([([2, 1, None, 3],),([1],),([],)], ['data'])
+    >>> df.select(sort_array(df.data).alias('r')).collect()
+    [Row(r=[None, 1, 2, 3]), Row(r=[1]), Row(r=[])]
+    >>> df.select(sort_array(df.data, asc=False).alias('r')).collect()
+    [Row(r=[3, 2, 1, None]), Row(r=[1]), Row(r=[])]
+    """
+    return _invoke_function("sort_array", _to_col(col), lit(asc))
+
+
+def struct(
+    *cols: Union["ColumnOrName", List["ColumnOrName"], Tuple["ColumnOrName", ...]]
+) -> Column:
+    """Creates a new struct column.
+
+    .. versionadded:: 3.4.0
+
+    Parameters
+    ----------
+    cols : list, set, str or :class:`~pyspark.sql.Column`
+        column names or :class:`~pyspark.sql.Column`\\s to contain in the output struct.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        a struct type column of given columns.
+
+    Examples
+    --------
+    >>> df = spark.createDataFrame([("Alice", 2), ("Bob", 5)], ("name", "age"))
+    >>> df.select(struct('age', 'name').alias("struct")).collect()
+    [Row(struct=Row(age=2, name='Alice')), Row(struct=Row(age=5, name='Bob'))]
+    >>> df.select(struct([df.age, df.name]).alias("struct")).collect()
+    [Row(struct=Row(age=2, name='Alice')), Row(struct=Row(age=5, name='Bob'))]
+    """
+    if len(cols) == 1 and isinstance(cols[0], (list, set, tuple)):
+        cols = cols[0]  # type: ignore[assignment]
+    return _invoke_function_over_columns("struct", *cols)  # type: ignore[arg-type]
+
+
+# TODO(SPARK-41493): Support options
+def to_csv(col: "ColumnOrName") -> Column:
+    """
+    Converts a column containing a :class:`StructType` into a CSV string.
+    Throws an exception in the case of an unsupported type.
+
+    .. versionadded:: 3.4.0
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+        name of column containing a struct.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        a CSV string converted from given :class:`StructType`.
+
+    Examples
+    --------
+    >>> from pyspark.sql import Row
+    >>> data = [(1, Row(age=2, name='Alice'))]
+    >>> df = spark.createDataFrame(data, ("key", "value"))
+    >>> df.select(to_csv(df.value).alias("csv")).collect()
+    [Row(csv='2,Alice')]
+    """
+
+    return _invoke_function("to_csv", _to_col(col))
+
+
+# TODO(SPARK-41494): Support options
+def to_json(col: "ColumnOrName") -> Column:
+    """
+    Converts a column containing a :class:`StructType`, :class:`ArrayType` or a :class:`MapType`
+    into a JSON string. Throws an exception in the case of an unsupported type.
+
+    .. versionadded:: 3.4.0
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+        name of column containing a struct, an array or a map.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        JSON object as string column.
+
+    Examples
+    --------
+    >>> from pyspark.sql import Row
+    >>> from pyspark.sql.types import *
+    >>> data = [(1, Row(age=2, name='Alice'))]
+    >>> df = spark.createDataFrame(data, ("key", "value"))
+    >>> df.select(to_json(df.value).alias("json")).collect()
+    [Row(json='{"age":2,"name":"Alice"}')]
+    >>> data = [(1, [Row(age=2, name='Alice'), Row(age=3, name='Bob')])]
+    >>> df = spark.createDataFrame(data, ("key", "value"))
+    >>> df.select(to_json(df.value).alias("json")).collect()
+    [Row(json='[{"age":2,"name":"Alice"},{"age":3,"name":"Bob"}]')]
+    >>> data = [(1, {"name": "Alice"})]
+    >>> df = spark.createDataFrame(data, ("key", "value"))
+    >>> df.select(to_json(df.value).alias("json")).collect()
+    [Row(json='{"name":"Alice"}')]
+    >>> data = [(1, [{"name": "Alice"}, {"name": "Bob"}])]
+    >>> df = spark.createDataFrame(data, ("key", "value"))
+    >>> df.select(to_json(df.value).alias("json")).collect()
+    [Row(json='[{"name":"Alice"},{"name":"Bob"}]')]
+    >>> data = [(1, ["Alice", "Bob"])]
+    >>> df = spark.createDataFrame(data, ("key", "value"))
+    >>> df.select(to_json(df.value).alias("json")).collect()
+    [Row(json='["Alice","Bob"]')]
+    """
+
+    return _invoke_function("to_json", _to_col(col))
+
+
+# TODO(SPARK-41434): need to support LambdaFunction Expression first
+# def transform(
+#     col: "ColumnOrName",
+#     f: Union[Callable[[Column], Column], Callable[[Column, Column], Column]],
+# ) -> Column:
+#     """
+#     Returns an array of elements after applying a transformation to each element in
+#     the input array.
+#
+#     .. versionadded:: 3.1.0
+#
+#     Parameters
+#     ----------
+#     col : :class:`~pyspark.sql.Column` or str
+#         name of column or expression
+#     f : function
+#         a function that is applied to each element of the input array.
+#         Can take one of the following forms:
+#
+#         - Unary ``(x: Column) -> Column: ...``
+#         - Binary ``(x: Column, i: Column) -> Column...``, where the second argument is
+#             a 0-based index of the element.
+#
+#         and can use methods of :class:`~pyspark.sql.Column`, functions defined in
+#         :py:mod:`pyspark.sql.functions` and Scala ``UserDefinedFunctions``.
+#         Python ``UserDefinedFunctions`` are not supported
+#         (`SPARK-27052 <https://issues.apache.org/jira/browse/SPARK-27052>`__).
+#
+#     Returns
+#     -------
+#     :class:`~pyspark.sql.Column`
+#         a new array of transformed elements.
+#
+#     Examples
+#     --------
+#     >>> df = spark.createDataFrame([(1, [1, 2, 3, 4])], ("key", "values"))
+#     >>> df.select(transform("values", lambda x: x * 2).alias("doubled")).show()
+#     +------------+
+#     |     doubled|
+#     +------------+
+#     |[2, 4, 6, 8]|
+#     +------------+
+#
+#     >>> def alternate(x, i):
+#     ...     return when(i % 2 == 0, x).otherwise(-x)
+#     >>> df.select(transform("values", alternate).alias("alternated")).show()
+#     +--------------+
+#     |    alternated|
+#     +--------------+
+#     |[1, -2, 3, -4]|
+#     +--------------+
+#     """
+#     return _invoke_higher_order_function("ArrayTransform", [col], [f])
+
+
+# TODO(SPARK-41434): need to support LambdaFunction Expression first
+# def transform_keys(col: "ColumnOrName", f: Callable[[Column, Column], Column]) -> Column:
+#     """
+#     Applies a function to every key-value pair in a map and returns
+#     a map with the results of those applications as the new keys for the pairs.
+#
+#     .. versionadded:: 3.1.0
+#
+#     Parameters
+#     ----------
+#     col : :class:`~pyspark.sql.Column` or str
+#         name of column or expression
+#     f : function
+#         a binary function ``(k: Column, v: Column) -> Column...``
+#         Can use methods of :class:`~pyspark.sql.Column`, functions defined in
+#         :py:mod:`pyspark.sql.functions` and Scala ``UserDefinedFunctions``.
+#         Python ``UserDefinedFunctions`` are not supported
+#         (`SPARK-27052 <https://issues.apache.org/jira/browse/SPARK-27052>`__).
+#
+#     Returns
+#     -------
+#     :class:`~pyspark.sql.Column`
+#         a new map of entries where new keys were calculated by applying given function to
+#         each key value argument.
+#
+#     Examples
+#     --------
+#     >>> df = spark.createDataFrame([(1, {"foo": -2.0, "bar": 2.0})], ("id", "data"))
+#     >>> df.select(transform_keys(
+#     ...     "data", lambda k, _: upper(k)).alias("data_upper")
+#     ... ).show(truncate=False)
+#     +-------------------------+
+#     |data_upper               |
+#     +-------------------------+
+#     |{BAR -> 2.0, FOO -> -2.0}|
+#     +-------------------------+
+#     """
+#     return _invoke_higher_order_function("TransformKeys", [col], [f])
+
+
+# TODO(SPARK-41434): need to support LambdaFunction Expression first
+# def transform_values(col: "ColumnOrName", f: Callable[[Column, Column], Column]) -> Column:
+#     """
+#     Applies a function to every key-value pair in a map and returns
+#     a map with the results of those applications as the new values for the pairs.
+#
+#     .. versionadded:: 3.1.0
+#
+#     Parameters
+#     ----------
+#     col : :class:`~pyspark.sql.Column` or str
+#         name of column or expression
+#     f : function
+#         a binary function ``(k: Column, v: Column) -> Column...``
+#         Can use methods of :class:`~pyspark.sql.Column`, functions defined in
+#         :py:mod:`pyspark.sql.functions` and Scala ``UserDefinedFunctions``.
+#         Python ``UserDefinedFunctions`` are not supported
+#         (`SPARK-27052 <https://issues.apache.org/jira/browse/SPARK-27052>`__).
+#
+#     Returns
+#     -------
+#     :class:`~pyspark.sql.Column`
+#         a new map of entries where new values were calculated by applying given function to
+#         each key value argument.
+#
+#     Examples
+#     --------
+#     >>> df = spark.createDataFrame([(1, {"IT": 10.0, "SALES": 2.0, "OPS": 24.0})], ("id", "data"))
+#     >>> df.select(transform_values(
+#     ...     "data", lambda k, v: when(k.isin("IT", "OPS"), v + 10.0).otherwise(v)
+#     ... ).alias("new_data")).show(truncate=False)
+#     +---------------------------------------+
+#     |new_data                               |
+#     +---------------------------------------+
+#     |{OPS -> 34.0, IT -> 20.0, SALES -> 2.0}|
+#     +---------------------------------------+
+#     """
+#     return _invoke_higher_order_function("TransformValues", [col], [f])
+
+
+# TODO(SPARK-41434): need to support LambdaFunction Expression first
+# def zip_with(
+#     left: "ColumnOrName",
+#     right: "ColumnOrName",
+#     f: Callable[[Column, Column], Column],
+# ) -> Column:
+#     """
+#     Merge two given arrays, element-wise, into a single array using a function.
+#     If one array is shorter, nulls are appended at the end to match the length of the longer
+#     array, before applying the function.
+#
+#     .. versionadded:: 3.1.0
+#
+#     Parameters
+#     ----------
+#     left : :class:`~pyspark.sql.Column` or str
+#         name of the first column or expression
+#     right : :class:`~pyspark.sql.Column` or str
+#         name of the second column or expression
+#     f : function
+#         a binary function ``(x1: Column, x2: Column) -> Column...``
+#         Can use methods of :class:`~pyspark.sql.Column`, functions defined in
+#         :py:mod:`pyspark.sql.functions` and Scala ``UserDefinedFunctions``.
+#         Python ``UserDefinedFunctions`` are not supported
+#         (`SPARK-27052 <https://issues.apache.org/jira/browse/SPARK-27052>`__).
+#
+#     Returns
+#     -------
+#     :class:`~pyspark.sql.Column`
+#         array of calculated values derived by applying given function to each pair of arguments.
+#
+#     Examples
+#     --------
+#     >>> df = spark.createDataFrame([(1, [1, 3, 5, 8], [0, 2, 4, 6])], ("id", "xs", "ys"))
+#     >>> df.select(zip_with("xs", "ys", lambda x, y: x ** y).alias("powers")).show(truncate=False)
+#     +---------------------------+
+#     |powers                     |
+#     +---------------------------+
+#     |[1.0, 9.0, 625.0, 262144.0]|
+#     +---------------------------+
+#
+#     >>> df = spark.createDataFrame([(1, ["foo", "bar"], [1, 2, 3])], ("id", "xs", "ys"))
+#     >>> df.select(zip_with("xs", "ys", lambda x, y: concat_ws("_", x, y)).alias("xs_ys")).show()
+#     +-----------------+
+#     |            xs_ys|
+#     +-----------------+
+#     |[foo_1, bar_2, 3]|
+#     +-----------------+
+#     """
+#     return _invoke_higher_order_function("ZipWith", [left, right], [f])
+
+
 # String/Binary functions
diff --git a/python/pyspark/sql/tests/connect/test_connect_function.py b/python/pyspark/sql/tests/connect/test_connect_function.py
index 921cdb287f6e..ee5d2d49d9ab 100644
--- a/python/pyspark/sql/tests/connect/test_connect_function.py
+++ b/python/pyspark/sql/tests/connect/test_connect_function.py
@@ -461,6 +461,8 @@ def test_collection_functions(self):
             (CF.array_distinct, SF.array_distinct),
             (CF.array_max, SF.array_max),
             (CF.array_min, SF.array_min),
+            (CF.reverse, SF.reverse),
+            (CF.size, SF.size),
         ]:
             self.assert_eq(
                 cdf.select(cfunc("a"), cfunc(cdf.b)).toPandas(),
@@ -593,6 +595,31 @@ def test_collection_functions(self):
             sdf.select(SF.get(sdf.a, 1)).toPandas(),
         )
 
+        # test shuffle
+        # Cannot compare the values due to the random permutation
+        self.assertEqual(
+            cdf.select(CF.shuffle(cdf.a), CF.shuffle("b")).count(),
+            sdf.select(SF.shuffle(sdf.a), SF.shuffle("b")).count(),
+        )
+
+        # test slice
+        self.assert_eq(
+            cdf.select(CF.slice(cdf.a, 1, 2), CF.slice("c", 2, 3)).toPandas(),
+            sdf.select(SF.slice(sdf.a, 1, 2), SF.slice("c", 2, 3)).toPandas(),
+        )
+
+        # test sort_array
+        self.assert_eq(
+            cdf.select(CF.sort_array(cdf.a, True), CF.sort_array("c", False)).toPandas(),
+            sdf.select(SF.sort_array(sdf.a, True), SF.sort_array("c", False)).toPandas(),
+        )
+
+        # test struct
+        self.compare_by_show(
+            cdf.select(CF.struct(cdf.a, "d", "e", cdf.f)),
+            sdf.select(SF.struct(sdf.a, "d", "e", sdf.f)),
+        )
+
     def test_map_collection_functions(self):
         from pyspark.sql import functions as SF
         from pyspark.sql.connect import functions as CF
@@ -645,6 +672,12 @@ def test_map_collection_functions(self):
             sdf.select(SF.map_keys(sdf.a), SF.map_values("b")),
         )
 
+        # test size
+        self.assert_eq(
+            cdf.select(CF.size(cdf.a), CF.size("c")).toPandas(),
+            sdf.select(SF.size(sdf.a), SF.size("c")).toPandas(),
+        )
+
     def test_generator_functions(self):
         from pyspark.sql import functions as SF
         from pyspark.sql.connect import functions as CF
@@ -734,6 +767,44 @@ def test_generator_functions(self):
             .toPandas(),
         )
 
+        # test posexplode with arrays
+        self.assert_eq(
+            cdf.select(CF.posexplode(cdf.a), CF.col("b")).toPandas(),
+            sdf.select(SF.posexplode(sdf.a), SF.col("b")).toPandas(),
+        )
+        self.assert_eq(
+            cdf.select(CF.posexplode("a"), "b").toPandas(),
+            sdf.select(SF.posexplode("a"), "b").toPandas(),
+        )
+        # test posexplode with maps
+        self.assert_eq(
+            cdf.select(CF.posexplode(cdf.d), CF.col("c")).toPandas(),
+            sdf.select(SF.posexplode(sdf.d), SF.col("c")).toPandas(),
+        )
+        self.assert_eq(
+            cdf.select(CF.posexplode("d"), "c").toPandas(),
+            sdf.select(SF.posexplode("d"), "c").toPandas(),
+        )
+
+        # test posexplode_outer with arrays
+        self.assert_eq(
+            cdf.select(CF.posexplode_outer(cdf.a), CF.col("b")).toPandas(),
+            sdf.select(SF.posexplode_outer(sdf.a), SF.col("b")).toPandas(),
+        )
+        self.assert_eq(
+            cdf.select(CF.posexplode_outer("a"), "b").toPandas(),
+            sdf.select(SF.posexplode_outer("a"), "b").toPandas(),
+        )
+        # test posexplode_outer with maps
+        self.assert_eq(
+            cdf.select(CF.posexplode_outer(cdf.d), CF.col("c")).toPandas(),
+            sdf.select(SF.posexplode_outer(sdf.d), SF.col("c")).toPandas(),
+        )
+        self.assert_eq(
+            cdf.select(CF.posexplode_outer("d"), "c").toPandas(),
+            sdf.select(SF.posexplode_outer("d"), "c").toPandas(),
+        )
+
     def test_csv_functions(self):
         from pyspark.sql import functions as SF
         from pyspark.sql.connect import functions as CF
@@ -776,6 +847,18 @@ def test_csv_functions(self):
             ),
         )
 
+        # test schema_of_csv
+        self.assert_eq(
+            cdf.select(CF.schema_of_csv(CF.lit('{"a": 0}'))).toPandas(),
+            sdf.select(SF.schema_of_csv(SF.lit('{"a": 0}'))).toPandas(),
+        )
+
+        # test to_csv
+        self.compare_by_show(
+            cdf.select(CF.to_csv(CF.struct(CF.lit("a"), CF.lit("b")))),
+            sdf.select(SF.to_csv(SF.struct(SF.lit("a"), SF.lit("b")))),
+        )
+
     def test_json_functions(self):
         from pyspark.sql import functions as SF
         from pyspark.sql.connect import functions as CF
@@ -847,6 +930,18 @@ def test_json_functions(self):
             sdf.select(SF.json_tuple(sdf.c, "f1", "f2")).toPandas(),
         )
 
+        # test schema_of_json
+        self.assert_eq(
+            cdf.select(CF.schema_of_json(CF.lit('{"a": 0}'))).toPandas(),
+            sdf.select(SF.schema_of_json(SF.lit('{"a": 0}'))).toPandas(),
+        )
+
+        # test to_json
+        self.compare_by_show(
+            cdf.select(CF.to_json(CF.struct(CF.lit("a"), CF.lit("b")))),
+            sdf.select(SF.to_json(SF.struct(SF.lit("a"), SF.lit("b")))),
+        )
+
     def test_string_functions(self):
         from pyspark.sql import functions as SF
         from pyspark.sql.connect import functions as CF
@@ -875,6 +970,7 @@ def test_string_functions(self):
             (CF.ltrim, SF.ltrim),
             (CF.rtrim, SF.rtrim),
             (CF.trim, SF.trim),
+            (CF.reverse, SF.reverse),
         ]:
             self.assert_eq(
                 cdf.select(cfunc("a"), cfunc(cdf.b)).toPandas(),
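
For a quick end-to-end check beyond these unit tests, the new Connect functions can also be exercised from a remote session. The snippet below is a minimal, hypothetical usage sketch rather than part of the patch: it assumes Spark 3.4+ with a Spark Connect server already running, and the sc://localhost endpoint is a placeholder.

# Minimal usage sketch (illustrative only, not part of the patch).
# Assumes Spark 3.4+ and a reachable Spark Connect server; the
# sc://localhost endpoint below is a placeholder.
from pyspark.sql import SparkSession
from pyspark.sql.connect import functions as CF

spark = SparkSession.builder.remote("sc://localhost").getOrCreate()

df = spark.createDataFrame([([3, 1, 2],)], ["xs"])
df.select(
    CF.reverse(df.xs).alias("reversed"),    # [2, 1, 3]
    CF.size(df.xs).alias("n"),              # 3
    CF.slice(df.xs, 1, 2).alias("first2"),  # [3, 1]; the start index is 1-based
    CF.sort_array(df.xs).alias("sorted"),   # [1, 2, 3]
).show()

The tests above follow the same idea, comparing each Connect result against the pyspark.sql baseline via toPandas() or show(); shuffle is the one exception, where only row counts are compared because the permutation is random.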