-
I just discovered this library and I think it is really amazing! The only thing I'm missing is custom checks. I'm aware of the satisfies check that allows a custom SQL expression, but that is quite limiting for building custom checks. Ideally, I'd like the ability to write a function that can be used as a check and register it alongside the available built-in checks, similar to how it is done in pandera. Would this be a feature that could make it into the library? If so, I'd gladly contribute, but I have the feeling I'd need the support of the core devs for this. Let me know your thoughts!
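Just to make the idea concrete, this is roughly the registration pattern I have in mind. To be clear, the `register_check` decorator and the `has_no_nulls` function below are purely hypothetical and not part of cuallee's API; they are only a sketch of the kind of extension point I mean:

```python
# Hypothetical sketch only: none of these names exist in cuallee today.
from typing import Callable, Dict

import pyspark.sql.functions as F
from pyspark.sql import DataFrame

CUSTOM_CHECKS: Dict[str, Callable] = {}

def register_check(fn: Callable) -> Callable:
    """Register a user-defined check so a framework could reference it by name."""
    CUSTOM_CHECKS[fn.__name__] = fn
    return fn

@register_check
def has_no_nulls(df: DataFrame, column: str) -> int:
    """User-defined check: number of null values in a column (0 means pass)."""
    return df.filter(F.col(column).isNull()).count()

# A check suite could then resolve registered checks by name, e.g.:
# violations = CUSTOM_CHECKS["has_no_nulls"](df, "id")
```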
-
Hi @marrov, thanks for opening a discussion, it's great to have fresh new ideas!
-
So the background for this request is that I'm working on a metadata-driven data transformation framework for a few dozen users who develop data products in pyspark. These data products entail a range of workflows, from simple dashboarding to more complex ML solutions. For all of these, I am developing a generic data quality solution. Another key requirement, as I mentioned in my first post, is to have user-defined quality checks as functions. As a concrete example of the kind of check I have in mind:
```python
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("date_gaps").getOrCreate()

def has_gaps_daily(df, date_column):
    windowSpec = Window.orderBy(date_column)
    return (
        df.withColumn("day_diff", F.datediff(F.col(date_column), F.lag(F.col(date_column)).over(windowSpec)) - 1)
        .filter(F.col("day_diff") > 0)
        .count()
    )

data = [("2023-01-01",), ("2023-01-02",), ("2023-01-04",), ("2023-01-07",)]  # Note the gaps
columns = ["date"]
df = spark.createDataFrame(data, schema=columns).withColumn("date", F.to_date(F.col("date")))
gap_count = has_gaps_daily(df, "date")
print(f"Number of gaps: {gap_count}") As far as I can see, something like the above cannot be done with the builtin-checks. Then the SQL-syntax could be an option, but the advantage of this approach is that users can define a unit test for their custom logic so that they can be sure that it is working as expected. One could also think about more complex checks, like to ensure that the std of a dataset does not vary period by period by more than x%. This would be a way of incorporating data drift checks as quality metrics for something like a gold layer (i.e. aggregated level) data product. Similarly, complex outlier detection would be programatically checked this way with a custom function that could even use an external python library. This is clearly not possible with the current architecture. I hope this writeup clarifies the ask and the motivation behind it. Let me know if there are any more questions. If this sounds compelling I'd be happy to brainstorm some plugin/extension/registry code concepts. |
-
SQL injection, and code injection in general, is prevented in all frameworks by parsing the code into an abstract syntax tree (AST), which immediately throws an exception when something not recognized as part of the grammar is about to be executed. The institution references are just remarks on how the International Software Testing Qualifications Board (ISTQB) or the Data Management Body of Knowledge (DAMA-DMBOK) suggest structuring data quality. There is a large and well-structured body of knowledge on the theory behind testing and data management/quality overall, things that help reduce the chances of failure. Lastly, at this point I would suggest the following:
I am happy to continue the discussion over email or a call, so feel free to reach out.
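To illustrate the AST point above with plain Python (a generic sketch, not how cuallee or any particular framework implements it): the expression is parsed into a tree before execution, and anything outside an allowed grammar raises immediately:

```python
import ast

# Whitelist of expression node types, e.g. arithmetic and comparisons only.
ALLOWED_NODES = (
    ast.Expression, ast.BinOp, ast.Compare, ast.Name, ast.Load, ast.Constant,
    ast.Add, ast.Sub, ast.Mult, ast.Div,
    ast.Gt, ast.GtE, ast.Lt, ast.LtE, ast.Eq, ast.NotEq,
)

def validate_expression(expression: str) -> ast.Expression:
    """Parse an expression and raise if it uses anything outside the whitelist."""
    tree = ast.parse(expression, mode="eval")  # invalid syntax raises SyntaxError here
    for node in ast.walk(tree):
        if not isinstance(node, ALLOWED_NODES):
            raise ValueError(f"Disallowed syntax: {type(node).__name__}")
    return tree

validate_expression("quantity > 0")                          # accepted
validate_expression("__import__('os').system('rm -rf /')")   # raises ValueError: Disallowed syntax: Call
```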
-
@marrov `v0.11.1` now has an `is_custom` check. The following test case shows how to use it. In general, it is just a function that receives a dataframe, and by default it will use the last column of the dataframe to compute the evaluation summary:

```python
import pyspark.sql.functions as F
from cuallee import Check, CheckLevel

def test_positive(spark):
    df = spark.range(10)
    check = Check(CheckLevel.WARNING, "pytest")
    check.is_custom("id", lambda x: x.withColumn("test", F.col("id") >= 0))
    rs = check.validate(df)
    assert rs.first().status == "PASS"
    assert rs.first().violations == 0
    assert rs.first().pass_threshold == 1.0
```

At the moment it is only covered for pyspark; at a later stage we could see if it is as straightforward in the other dataframe libraries.
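Following the pattern in the test above, the custom function only has to append a boolean column, since the evaluation summary is computed on the last column of the returned dataframe. For example (the `amount` column name here is made up for illustration):

```python
import pyspark.sql.functions as F
from cuallee import Check, CheckLevel

check = Check(CheckLevel.WARNING, "custom_example")
# The lambda appends a boolean column; is_custom summarises pass/fail on that last column.
check.is_custom("amount", lambda df: df.withColumn("non_negative", F.col("amount") >= 0))
```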
-
This is quite amazing @canimus, thanks for the fast implementation! I had some time to play around with it today and it looks like what I was going for. I have a couple of questions, but first let me introduce an example:

```python
import inspect
import pyspark.sql.functions as F
import pyspark.sql.types as T
from cuallee import Check
from pyspark.sql import DataFrame, SparkSession
from toolz import curry

spark = SparkSession.builder.getOrCreate()

data = [("A", 1), ("B", -1), ("B", 0), ("C", 2)]
schema = T.StructType([T.StructField("id", T.StringType(), True), T.StructField("quantity", T.IntegerType(), True)])
orders = spark.createDataFrame(data, schema=schema)
orders.show()

check = Check(name="orders_checks")
check = check.add_rule("is_unique", "id", 1)
check = check.add_rule("is_greater_than", "quantity", 0, 0.5)

# Define and add a custom check
@curry
def mean_above_threshold(df: DataFrame, column_name: str, threshold: float) -> DataFrame:
    mean_value = df.select(F.mean(column_name).alias("mean")).collect()[0]["mean"]
    is_above_threshold = mean_value > threshold
    return df.withColumn("mean_above_threshold", F.lit(is_above_threshold))

col_name = "quantity"
check = check.add_rule("is_custom", col_name, mean_above_threshold(column_name=col_name, threshold=0), 1)

# Define a custom check function for data type validation
@curry
def is_correct_dtype(df: DataFrame, column_name: str, expected_dtype: T.DataType) -> DataFrame:
    actual_dtype = [field.dataType for field in df.schema.fields if field.name == column_name][0]
    is_dtype_correct = actual_dtype == expected_dtype
    return df.withColumn(f"{column_name}_is_dtype_correct", F.lit(is_dtype_correct))

check = check.add_rule("is_custom", "id", is_correct_dtype(column_name="id", expected_dtype=T.StringType()), 1)
check = check.add_rule("is_custom", "quantity", is_correct_dtype(column_name="quantity", expected_dtype=T.IntegerType()), 1)

# Run the checks
output = check.validate(orders)
output.show()

# Verbose alternative to `f(x)?`
func = check.rules[-1].value
print(f"{func.__name__}{inspect.signature(func)}") So from this example I have a couple of questions. Some could probably be a different issue, but let's at least pose them here:
+---+-------------------+-------------+-------+--------+---------------+-----+----+----------+---------+--------------+------+
| id| timestamp| check| level| column| rule|value|rows|violations|pass_rate|pass_threshold|status|
+---+-------------------+-------------+-------+--------+---------------+-----+----+----------+---------+--------------+------+
| 1|2024-07-03 09:54:29|orders_checks|WARNING| id| is_unique| N/A| 4| 1| 0.75| 1.0| FAIL|
| 2|2024-07-03 09:54:29|orders_checks|WARNING|quantity|is_greater_than| 0| 4| 2| 0.5| 0.5| PASS|
| 3|2024-07-03 09:54:29|orders_checks|WARNING|quantity| is_custom| f(x)| 4| 0| 1.0| 1.0| PASS|
| 4|2024-07-03 09:54:29|orders_checks|WARNING| id| is_custom| f(x)| 4| 0| 1.0| 1.0| PASS|
| 5|2024-07-03 09:54:29|orders_checks|WARNING|quantity| is_custom| f(x)| 4| 0| 1.0| 1.0| PASS|
+---+-------------------+-------------+-------+--------+---------------+-----+----+----------+---------+--------------+------+
```

As you can see, once I start adding a couple of custom functions it becomes very hard to track which logic belongs to which rule, because `value` is just `f(x)` for all of them. A verbose alternative would be to show the full function signature, as printed at the end of the example:

```
is_correct_dtype(df: pyspark.sql.dataframe.DataFrame = '__no__default__', *, column_name: str = 'quantity', expected_dtype: pyspark.sql.types.DataType = IntegerType()) -> pyspark.sql.dataframe.DataFrame
```

But maybe just the name, or at least the signature without the return types, would be enough. Ideally, there would maybe be something like an optional description column? I'm sure you will have an even better suggestion than these. My second question is whether it is possible to get output like the following:

```
+---+-------------------+-------------+-------+--------+---------------+-----+----+----------+---------+--------------+------+
| id| timestamp| check| level| column| rule|value|rows|violations|pass_rate|pass_threshold|status|
+---+-------------------+-------------+-------+--------+---------------+-----+----+----------+---------+--------------+------+
| 1|2024-07-03 09:54:29|orders_checks|WARNING| id| is_unique| N/A| 4| 1| 0.75| 1.0| FAIL|
| 2|2024-07-03 09:54:29|orders_checks|WARNING|quantity|is_greater_than| 0| 4| 2| 0.5| 0.5| PASS|
| 3|2024-07-03 09:54:29|orders_checks|WARNING|quantity| is_custom| f(x)| 4| 0| 1.0| 1.0| PASS|
| 4|2024-07-03 09:54:29|orders_checks|WARNING| id| is_custom| f(x)| 4| 0| 1.0| 1.0| PASS|
| 5|2024-07-03 09:54:29|orders_checks|WARNING|quantity| is_custom| f(x)| 4| 4| 0.0| 1.0| FAIL|
+---+-------------------+-------------+-------+--------+---------------+-----+----+----------+---------+--------------+------+
```

Am I missing something, or can this not be done in this framework?