FlowerPower is a simple workflow framework based on two fantastic Python libraries:
- Hamilton: Creates DAGs from your pipeline functions
- APScheduler: Handles pipeline scheduling
- Pipeline Workflows: Create and execute complex DAG-based workflows
- Scheduling: Run pipelines at specific times or intervals
- Parameterization: Easily configure pipeline parameters
- Tracking: Monitor executions with Hamilton UI
- Flexible Configuration: Simple YAML-based setup
- Distributed Execution: Support for distributed environments
# Basic installation
pip install flowerpower
# With scheduling support
pip install "flowerpower[scheduler]"
# Additional components
pip install "flowerpower[scheduler,mqtt]" # MQTT broker
pip install "flowerpower[scheduler,redis]" # Redis broker
pip install "flowerpower[scheduler,mongodb]" # MongoDB store
pip install "flowerpower[scheduler,ray]" # Ray computing
pip install "flowerpower[scheduler,dask]" # Dask computing
Option 1: Command Line
flowerpower init new-project
cd new-project
Option 2: Python
from flowerpower import init
init("new-project")
This creates basic config files:
conf/project.yml
It is recommended to use the project manager uv to manage your project dependencies.
Installation
pip install uv
For more installation options, visit: https://docs.astral.sh/uv/getting-started/installation/
Project Initialization
uv init --app --no-readme --vcs git
Option 1: Command Line
flowerpower add my_flow
# or
flowerpower new my_flow
Option 2: Python
# Using PipelineManager
from flowerpower.pipeline import PipelineManager
pm = PipelineManager()
pm.new("my_flow")
# Or using the new function directly
from flowerpower.pipeline import new
new("my_flow")
This creates the new pipeline and configuration file:
pipelines/my_flow.py
conf/pipelines/my_flow.yml
- Add Pipeline Functions: Build your pipeline by adding the functions (nodes) that make up the DAG to pipelines/my_flow.py, following the Hamilton paradigm.
- Parameterize Functions: You can parameterize functions in two ways:
Method 1: Default Values
import pandas as pd

def add_int_col(
    df: pd.DataFrame,
    col_name: str = "foo",
    values: str = "bar",
) -> pd.DataFrame:
    # Add a column named `col_name` whose rows all hold `values`
    return df.assign(**{col_name: values})
Method 2: Configuration File
In conf/pipelines/my_flow.yml:
...
func:
  add_int_col:
    col_name: foo
    values: bar
...
Add the @parameterize decorator to the function in your pipeline file:
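import pandas as pd
from hamilton.function_modifiers import parameterize

# PARAMS is assumed to be defined in the pipeline file and to hold the
# `func` section loaded from conf/pipelines/my_flow.yml
@parameterize(**PARAMS.add_int_col)
def add_int_col(
    df: pd.DataFrame,
    col_name: str,
    values: str,
) -> pd.DataFrame:
    # Add a column named `col_name` whose rows all hold `values`
    return df.assign(**{col_name: values})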
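To make the Hamilton paradigm mentioned above more concrete: in Hamilton, each function defines a node named after the function, and each parameter name refers to an upstream node or an input. The sketch below imitates that idea with the standard library only. It is a toy resolver, not Hamilton's implementation, and the node names (`raw_values`, `doubled`, `total`) and the `resolve` helper are hypothetical.

```python
import inspect


def raw_values() -> list:
    """Source node: no parameters, so no upstream dependencies."""
    return [1, 2, 3]


def doubled(raw_values: list) -> list:
    """Depends on the node named `raw_values` through its parameter name."""
    return [v * 2 for v in raw_values]


def total(doubled: list) -> int:
    """Depends on the node named `doubled`."""
    return sum(doubled)


def resolve(name, nodes, cache=None):
    """Toy resolver: compute a node after computing the nodes its parameters name."""
    cache = {} if cache is None else cache
    if name not in cache:
        func = nodes[name]
        kwargs = {
            param: resolve(param, nodes, cache)
            for param in inspect.signature(func).parameters
        }
        cache[name] = func(**kwargs)
    return cache[name]


# Register the functions under their own names, as node names.
NODES = {f.__name__: f for f in (raw_values, doubled, total)}
result = resolve("total", NODES)
print(result)  # 12
```

Requesting `total` pulls in `doubled` and `raw_values` automatically, which is the role `final_vars` plays when a real pipeline is executed.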
You can configure the pipeline's inputs, final_vars, and other run parameters in the pipeline configuration file conf/pipelines/my_flow.yml, or pass them directly to the pipeline execution function.
...
run:
  inputs:
    data_path: path/to/data.csv
    fs_protocol: local
  final_vars: [add_int_col, final_df]
  # optional parameters
  with_tracker: false
  executor: threadpool # or processpool, ray, dask
...
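The relationship between the configuration file and call-time arguments can be pictured as a simple merge in which explicitly passed values win. The following is a minimal sketch under that assumption. `resolve_run_args` is a hypothetical helper, not part of FlowerPower, and the `run` section is written as a plain dict rather than loaded from YAML.

```python
def resolve_run_args(config_run: dict, **overrides) -> dict:
    """Merge the `run` section with call-time overrides.

    Assumption: an override set to None means "not given", so the
    configured value is kept.
    """
    merged = dict(config_run)
    merged.update({k: v for k, v in overrides.items() if v is not None})
    return merged


# The `run` section from the example configuration, as a plain dict.
config_run = {
    "inputs": {"data_path": "path/to/data.csv", "fs_protocol": "local"},
    "final_vars": ["add_int_col", "final_df"],
    "with_tracker": False,
    "executor": "threadpool",
}

args = resolve_run_args(config_run, final_vars=["final_df"], executor=None)
print(args["final_vars"])  # ['final_df']
print(args["executor"])    # threadpool
```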
There are three ways to execute a pipeline:
- Direct Execution
  - Runs in current process
  - No data store required
- Job Execution
  - Runs as APScheduler job
  - Returns job results
  - Requires data store and event broker
- Async Job Addition
  - Adds to APScheduler
  - Returns job ID
  - Results retrievable from data store
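The difference between the three modes listed above can be illustrated with the standard library alone. This is a toy analogy, not how FlowerPower or APScheduler is implemented: a thread pool stands in for the worker, a dict stands in for the data store, and all function names are hypothetical.

```python
import uuid
from concurrent.futures import ThreadPoolExecutor


def pipeline() -> str:
    """Stand-in for a pipeline run."""
    return "done"


worker = ThreadPoolExecutor(max_workers=1)  # stand-in for a scheduler worker
result_store = {}                           # stand-in for a data store


def direct_execution() -> str:
    """Run in the current process; nothing else is required."""
    return pipeline()


def job_execution() -> str:
    """Hand the run to the worker and block until the result is back."""
    return worker.submit(pipeline).result()


def async_job_addition() -> str:
    """Hand the run to the worker and return a job ID immediately."""
    job_id = str(uuid.uuid4())
    result_store[job_id] = worker.submit(pipeline)
    return job_id


direct = direct_execution()
waited = job_execution()
job_id = async_job_addition()
later = result_store[job_id].result()  # fetch the result later via the ID
worker.shutdown()
```

In all three cases the same pipeline runs. What changes is where it runs and whether the caller gets back the result or only a handle to it.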
# Note: add --inputs and --final-vars and other optional parameters if not specified in the config file
# Direct execution
flowerpower run my_flow
# Job execution
flowerpower run-job my_flow
# Add as scheduled job
flowerpower add-job my_flow
You can also use the --inputs and --final-vars flags to override the configuration file parameters, or to supply them if they are not specified in the configuration file.
flowerpower run my_flow \
  --inputs data_path=path/to/data.csv,fs_protocol=local \
  --final-vars final_df \
  --executor threadpool \
  --without-tracker
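The `--inputs` value above is a comma-separated list of `key=value` pairs. As a rough sketch of how such a string maps onto the `inputs` dictionary used in Python, consider the helper below. `parse_key_value_list` is hypothetical and only illustrates the format. It is not FlowerPower's own parser, and it does not handle values that contain commas.

```python
def parse_key_value_list(text: str) -> dict:
    """Turn 'a=1,b=2' into {'a': '1', 'b': '2'}.

    Splits on the first '=' only, so values may contain '='.
    Raises ValueError if an item has no '='.
    """
    pairs = (item.split("=", 1) for item in text.split(",") if item)
    return {key.strip(): value.strip() for key, value in pairs}


inputs = parse_key_value_list("data_path=path/to/data.csv,fs_protocol=local")
print(inputs)  # {'data_path': 'path/to/data.csv', 'fs_protocol': 'local'}
```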
from flowerpower.pipeline import Pipeline, run, run_job, add_job
# Using Pipeline class
p = Pipeline("my_flow")
# Note: add inputs, final_vars, and other optional arguments if not specified in the config file
result = p.run()
result = p.run_job()
job_id = p.add_job()
# Using functions
result = run("my_flow")
result = run_job("my_flow")
job_id = add_job("my_flow")
You can also use the inputs and final_vars arguments to override the configuration file parameters, or to supply them if they are not specified in the configuration file.
result = run(
    "my_flow",
    inputs={
        "data_path": "path/to/data.csv",
        "fs_protocol": "local",
    },
    final_vars=["final_df"],
    executor="threadpool",
    with_tracker=False,
)
# Run every 30 seconds
flowerpower schedule my_flow \
--type interval \
--interval-params seconds=30
# Run at specific date/time
flowerpower schedule my_flow \
--type date \
--date-params year=2022,month=1,day=1,hour=0,minute=0,second=0
# Run with cron parameters
flowerpower schedule my_flow \
--type cron \
--cron-params second=0,minute=0,hour=0,day=1,month=1,day_of_week=0
# Run with crontab expression
flowerpower schedule my_flow \
--type cron \
--crontab "0 0 1 1 0"
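The last two commands describe the same kind of schedule in two notations. A standard five-field crontab expression lists minute, hour, day of month, month, and day of week, in that order, and has no seconds field. The sketch below shows that mapping onto the named cron parameters. `crontab_to_params` is a hypothetical helper for illustration only.

```python
# Field order of a standard five-field crontab expression.
CRON_FIELDS = ("minute", "hour", "day", "month", "day_of_week")


def crontab_to_params(expr: str) -> dict:
    """Map a five-field crontab expression onto named cron parameters."""
    values = expr.split()
    if len(values) != len(CRON_FIELDS):
        raise ValueError(
            f"expected {len(CRON_FIELDS)} fields, got {len(values)}"
        )
    return dict(zip(CRON_FIELDS, values))


params = crontab_to_params("0 0 1 1 0")
print(params)
# {'minute': '0', 'hour': '0', 'day': '1', 'month': '1', 'day_of_week': '0'}
```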
from flowerpower.scheduler import schedule, Pipeline
# Using Pipeline class
p = Pipeline("my_flow")
p.schedule("interval", seconds=30)
# Using schedule function
schedule("my_flow", "interval", seconds=30)
Command Line
flowerpower start-worker
Python
# Using the SchedulerManager class
from flowerpower.scheduler import SchedulerManager
sm = SchedulerManager()
sm.start_worker()
# Using the start_worker function
from flowerpower.scheduler import start_worker
start_worker()
Configure your worker in conf/project.yml:
# PostgreSQL Configuration
data_store:
  type: postgres
  uri: postgresql+asyncpg://user:password@localhost:5432/flowerpower
# Redis Event Broker
event_broker:
  type: redis
  uri: redis://localhost:6379
  # Alternative configuration:
  # host: localhost
  # port: 6379
SQLite
data_store:
  type: sqlite
  uri: sqlite+aiosqlite:///flowerpower.db
MySQL
data_store:
  type: mysql
  uri: mysql+aiomysql://user:password@localhost:3306/flowerpower
MongoDB
data_store:
  type: mongodb
  uri: mongodb://localhost:27017/flowerpower
In-Memory
data_store:
  type: memory
MQTT
event_broker:
  type: mqtt
  host: localhost
  port: 1883
  username: edge # optional
  password: edge # optional
Redis
event_broker:
  type: redis
  uri: redis://localhost:6379
  # Alternative configuration:
  # host: localhost
  # port: 6379
In-Memory
event_broker:
  type: memory
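The `uri` values in the configurations above follow the usual `backend+driver://user:password@host:port/database` shape. As a quick way to sanity-check a URI before putting it into conf/project.yml, the sketch below splits one into its parts with the standard library. `describe_uri` is a hypothetical helper, and it only inspects the string. It does not connect to anything.

```python
from urllib.parse import urlsplit


def describe_uri(uri: str) -> dict:
    """Split a data store or event broker URI into its components."""
    parts = urlsplit(uri)
    # A scheme like "mysql+aiomysql" names the backend and the async driver.
    backend, _, driver = parts.scheme.partition("+")
    return {
        "backend": backend,
        "driver": driver or None,
        "host": parts.hostname,
        "port": parts.port,
        "database": parts.path.lstrip("/") or None,
    }


mysql = describe_uri("mysql+aiomysql://user:password@localhost:3306/flowerpower")
redis = describe_uri("redis://localhost:6379")
sqlite = describe_uri("sqlite+aiosqlite:///flowerpower.db")

print(mysql["driver"], mysql["port"])  # aiomysql 3306
print(redis["host"], redis["port"])    # localhost 6379
print(sqlite["database"])              # flowerpower.db
```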
# Install UI package
pip install "flowerpower[ui]"
# Start UI server
flowerpower hamilton-ui
Access the UI at: http://localhost:8241
# Clone Hamilton repository
git clone https://github.com/dagworks-inc/hamilton
cd hamilton/ui
# Start UI server
./run.sh
Access the UI at: http://localhost:8242
Configure tracking in conf/project.yml:
username: [email protected]
api_url: http://localhost:8241
ui_url: http://localhost:8242
api_key: optional_key
And specify the tracker parameter in the pipeline configuration conf/pipelines/my_flow.yml:
...
tracker:
  project_id: 1
  tags:
    environment: dev
    version: 1.0
  dag_name: my_flow_123
...
Download the docker-compose configuration:
curl -O https://raw.githubusercontent.com/legout/flowerpower/main/docker/docker-compose.yml
# MQTT Broker (EMQX)
docker-compose up mqtt -d
# Redis
docker-compose up redis -d
# MongoDB
docker-compose up mongodb -d
# PostgreSQL
docker-compose up postgres -d
Contributions are welcome! Please feel free to submit a Pull Request.
For support, please open an issue in the GitHub repository.