Skip to content

aurelien-clu/silex

Repository files navigation

Silex

Add more 🔥 to Apache Spark!

CI Pypi License Python Apache Spark

Manager: Poetry Test: BDD Test: Doctest Code style: Black Imports: isort Linter: Flake8 try/except style: tryceratops Typing: MyPy Security: Bandit Git: Pre-commit Git: Conventional Version: Semantic

TLDR

Silex is a Data Engineering library to extend PySpark.

You don't need another class, just use PySpark as usual and you have new functions to your DataFrames!

pip install spark-silex
import silex
from pyspark.sql import DataFrame

# extends your DataFrame with silex functions!
# if for some reason you don't want to do that, check 'Without extending Dataframes' README section below
silex.extend_dataframes()

df: DataFrame = ...  # your regular Spark DataFrame
df: DataFrame = df.drop_col_if_na()  # new function! and still a regular Spark Dataframe!
# scroll for more information!

TODO

  • IDE auto-completions
  • show-off coverage
  • more documentation and examples
  • more functions

Available functions

# assertions => raises an Exception if not met /!\
def expect_column(self, col: str) -> DataFrame: ...
def expect_columns(self, cols: Union[str, List[str]]) -> DataFrame: ...

def expect_distinct_values_equal_set(self, cols: Union[str, List[str]], values: Collection[Any]) -> DataFrame: ...
def expect_distinct_values_in_set(self, cols: Union[str, List[str]], values: Collection[Any]) -> DataFrame: ...

def expect_min_value_between(self, cols: Union[str, List[str]], min: Any, max: Any) -> DataFrame: ...
def expect_avg_value_between(self, cols: Union[str, List[str]], min: Any, max: Any) -> DataFrame: ...
def expect_max_value_between(self, cols: Union[str, List[str]], min: Any, max: Any) -> DataFrame: ...

def expect_unique_id(self, cols: Union[str, List[str]]) -> DataFrame: ...

# boolean checks
def has_column(self, col: str) -> bool: ...
def has_columns(self, cols: Union[str, List[str]]) -> bool: ...

def has_distinct_values_equal_set(self, cols: Union[str, List[str]], values: Collection[Any]) -> bool: ...
def has_distinct_values_in_set(self, cols: Union[str, List[str]], values: Collection[Any]) -> bool: ...

def has_min_value_between(self, cols: Union[str, List[str]], min: Any, max: Any) -> bool: ...
def has_avg_value_between(self, cols: Union[str, List[str]], min: Any, max: Any) -> bool: ...
def has_max_value_between(self, cols: Union[str, List[str]], min: Any, max: Any) -> bool: ...

def has_unique_id(self, cols: Union[str, List[str]]) -> bool: ...

# dates
def with_date_column(self, col: str, fmt: str, new_col: Optional[str] = None) -> DataFrame: ...

# drop
def drop_col_if_na(self, max: int) -> DataFrame: ...
def drop_col_if_not_distinct(self, min: int) -> DataFrame: ...

# filters
def filter_on_range(self, col: str, from_: Any, to: Any, ...) -> DataFrame: ...

# joins
def join_closest_date(self, other: DataFrame, ...) -> DataFrame: ...

Getting started

Pre-requisites

  • Python 3.8 or above
  • Spark 3 or above

Installation

pip install spark-silex

Usage

examples/

By extending DataFrames! ⚡

import silex
from pyspark.sql import DataFrame, SparkSession

# extends your DataFrame with silex functions!
# if for some reason you don't want to do that, check next example
silex.extend_dataframes()

spark = SparkSession.builder.getOrCreate()

data = [
    (0, "01/29/2022", "a", 1.0),
    (1, "01/30/2022", "b", 2.0),
    (2, "01/31/2022", "c", 3.0),
]
df: DataFrame = spark.createDataFrame(data, schema=["id", "date_US", "text", "value"])
df.show()
# +---+----------+----+-----+
# | id|   date_US|text|value|
# +---+----------+----+-----+
# |  0|01/29/2022|   a|  1.0|
# |  1|01/30/2022|   b|  2.0|
# |  2|01/31/2022|   c|  3.0|
# +---+----------+----+-----+

df = df.with_date_column(col="date_US", new_col="date", fmt="MM/dd/yyyy")
df.show()
df.printSchema()
# +---+----------+----+-----+----------+
# | id|   date_US|text|value|      date|
# +---+----------+----+-----+----------+
# |  0|01/29/2022|   a|  1.0|2022-01-29|
# |  1|01/30/2022|   b|  2.0|2022-01-30|
# |  2|01/31/2022|   c|  3.0|2022-01-31|
# +---+----------+----+-----+----------+
#
# root
#  |-- id: long (nullable = true)
#  |-- date_US: string (nullable = true)
#  |-- text: string (nullable = true)
#  |-- value: double (nullable = true)
#  |-- date: date (nullable = true)

Without extending Dataframes 🌧️

from silex.fn.date import with_date_column

# same as before except you need to provide the DataFrame
df = with_date_column(df=df, col="date_US", new_col="date", fmt="MM/dd/yyyy")

Contributing

# install poetry and python 3.8, using pyenv for instance

cd silex
poetry env use path/to/python3.8  # e.g. ~/.pyenv/versions/3.8.12/bin/python
poetry shell
poetry install
pre-commit install

make help
# or open Makefile to learn about available commands for development