
Commit a2d08cc

Prefect: Implement suggestions by CodeRabbit
1 parent 64d5230 commit a2d08cc

File tree: 2 files changed (+32, −29 lines)


docs/integrate/prefect/index.md

Lines changed: 6 additions & 8 deletions
@@ -8,20 +8,18 @@
 ```
 
 :::{div} sd-text-muted
-Modern Workflow Orchestration.
+Orchestrate modern data workflows in Python.
 :::
 
 :::{rubric} About
 :::
 
-[Prefect] is a workflow orchestration framework for building resilient data
-pipelines in Python.
+Use [Prefect] to orchestrate resilient data pipelines in Python.
 
-Give your team the power to build reliable workflows without sacrificing
-development speed. Prefect Core combines the freedom of pure Python
-development with production-grade resilience, putting you in control of
-your data operations. Transform your code into scalable workflows that
-deliver consistent results.
+Build reliable workflows without sacrificing development speed. Prefect
+combines the freedom of pure Python with production-grade resilience,
+putting you in control of your data operations. Turn code into scalable
+workflows that deliver consistent results.
 
 :::{rubric} Learn
 :::

docs/integrate/prefect/tutorial.md

Lines changed: 26 additions & 21 deletions
@@ -3,38 +3,41 @@
 
 ## Introduction
 
-[Prefect](https://www.prefect.io/opensource/) is an open-source workflow automation and orchestration tool for data engineering, machine learning, and other data-related tasks. It allows you to define, schedule, and execute complex data workflows in a straightforward manner.
+[Prefect](https://www.prefect.io/opensource/) is an open-source workflow orchestration tool for data engineering, machine learning, and other data tasks. You define, schedule, and execute complex data workflows with straightforward Python code.
 
-Prefect workflows are defined using *Python code*. Each step in the workflow is represented as a "task," and tasks can be connected to create a directed acyclic graph (DAG). The workflow defines the sequence of task execution and can include conditional logic and branching. Furthermore, Prefect provides built-in scheduling features that set up cron-like schedules for the flow. You can also parameterize your flow, allowing a run of the same flow with different input values.
+You define Prefect workflows in Python. Each step is a “task”, and tasks form a directed acyclic graph (DAG). Flows can branch and include conditional logic. Prefect also provides built-in scheduling and flow parameters so you can run the same flow with different inputs (see the sketch below).
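As a minimal sketch of the concepts above (illustrative, not part of this commit): one task, one flow that calls it, and a flow parameter that can change per run.

```python
# Minimal sketch of Prefect concepts; all names here are invented.
from prefect import flow, task

@task
def greet(name: str) -> str:
    return f"Hello, {name}!"

@flow
def hello_flow(name: str = "CrateDB"):
    # Calling a task inside a flow runs it and returns its result.
    print(greet(name))

if __name__ == "__main__":
    hello_flow(name="Prefect")  # same flow, different input
```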
 
-This tutorial will explore how CrateDB and Prefect come together to streamline data ingestion, transformation, and loading (ETL) processes with a few lines of Python code.
+This tutorial shows how to combine CrateDB and Prefect to streamline ETL with a few lines of Python.
 
 ## Prerequisites
 
 Before we begin, ensure you have the following prerequisites installed on your system:
 
 * **Python 3.x**: Prefect is a Python-based workflow management system, so you'll need Python installed on your machine.
 * **CrateDB**: To work with CrateDB, create a new cluster in [CrateDB Cloud](https://console.cratedb.cloud/). You can choose the CRFREE tier cluster that does not require any payment information.
-* **Prefect**: Install Prefect using pip by running the following command in your terminal or command prompt: `pip install -U prefect`
+* **Prefect**: Install Prefect using pip: `pip install -U prefect`
+* **SQLAlchemy + CrateDB dialect**: `pip install -U sqlalchemy sqlalchemy-cratedb`
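As a quick, optional check that the Python packages above are installed and can reach your cluster, a sketch along these lines may help; the credentials, host, and `ssl=true` option are placeholders for your own CrateDB Cloud values.

```python
# Sanity-check sketch for the sqlalchemy + sqlalchemy-cratedb install.
# Placeholder credentials and host; substitute your CrateDB Cloud values.
import sqlalchemy as sa

engine = sa.create_engine("crate://admin:password@host:4200?ssl=true")
with engine.connect() as conn:
    print(conn.execute(sa.text("SELECT 1")).scalar())  # prints 1 on success
```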
 
-## Getting started with Perfect
+## Getting started with Prefect
 
-1. To get started with Prefect, you need to connect to Prefect’s API: the easiest way is to sign up for a free forever Cloud account at [https://app.prefect.cloud/](https://app.prefect.cloud/?deviceId=cfc80edd-a234-4911-a25e-ff0d6bb2c32a&deviceId=cfc80edd-a234-4911-a25e-ff0d6bb2c32a).
+1. To get started with Prefect, connect to Prefect’s API by signing up for a free Cloud account at [https://app.prefect.cloud/](https://app.prefect.cloud/).
 2. Once you create a new account, create a new workspace with a name of your choice.
 3. Run `prefect cloud login` to [log into Prefect Cloud](https://docs.prefect.io/cloud/users/api-keys) from the local environment.
 
 Now you are ready to build your first data workflows!
 
 ## Run your first ETL workflow with CrateDB
-We'll dive into the basics of Prefect by creating a simple workflow with tasks that fetch data from a source, perform basic transformations, and load it into CrateDB. For this example, we will use [the yellow taxi trip data](https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2021-01.csv.gz), which includes pickup time, geo-coordinates, number of passengers, and several other variables. The goal is to create a workflow that does a basic transformation on this data and inserts it into a CrateDB table named `trip_data`:
+
+This section walks you through a simple workflow that fetches data, applies a basic transformation, and loads it into CrateDB. It uses the [yellow taxi trip dataset](https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2021-01.csv.gz), which includes pickup time, geo-coordinates, passenger count, and other fields. The goal is to write transformed data to a CrateDB table named `trip_data`:
 
 ```python
 import pandas as pd
 from prefect import flow, task
-from crate import client
+from sqlalchemy import create_engine
 
 CSV_URL = "https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2021-01.csv.gz"
-URI = "crate://admin:password@host:5432"
+URI = "crate://admin:password@host:4200"
+engine = create_engine(URI)
 
 @task()
 def extract_data(url: str):
@@ -48,7 +51,7 @@ def transform_data(df):
 
 @task()
 def load_data(table_name, df):
-    df.to_sql(table_name,URI,if_exists="replace",index=False)
+    df.to_sql(table_name, engine, if_exists="replace", index=False)
 
 @flow(name="ETL workflow", log_prints=True)
 def main_flow():
@@ -60,25 +63,27 @@ if __name__ == '__main__':
     main_flow()
 ```
 
-1. We start defining the flow by importing the necessary modules, including `prefect` for working with workflows, `pandas` for data manipulation, and `crate` for interacting with CrateDB.
-2. Next, we specify the connection parameters for CrateDB and the URL for a file containing the dataset. You should modify these values according to your CrateDB Cloud setup.
-3. We define three tasks using the `@task` decorator: `extract_data(url)`, `transform_data(data)`, and `load_data(table_name, transformed_data)`. Each task represents a unit of work in the workflow:
-   1. The `read_data()` task loads the data from the CSV file to a `pandas` data frame.
-   2. The `transform_data(data)` task takes the data frame and returns the data frame with entries where the `passenger_count` value is different than 0.
-   3. The `load_data(transformed_data)` task connects to the CrateDB and loads data into the `trip_data` table.
-4. We define the workflow, name it “ETL workflow“, and specify the sequence of tasks: `extract_data()`, `transform_data(data)`, and `load_data(table_name, transformed_data)`.
-5. Finally, we execute the flow by calling `main_flow()`. This runs the workflow, and each task is executed in the order defined.
+1. Start by importing the necessary modules: `prefect` for workflows, `pandas` for data manipulation, and SQLAlchemy for the database connection.
+2. Specify the CrateDB connection URI and the dataset URL. Modify these values for your CrateDB Cloud setup.
+3. Define three tasks with the `@task` decorator: `extract_data(url)`, `transform_data(df)`, and `load_data(table_name, df)`:
+
+   1. `extract_data()` reads the CSV into a pandas DataFrame.
+   2. `transform_data(df)` filters out rows where `passenger_count` is 0.
+   3. `load_data(table_name, df)` writes the data to the `trip_data` table in CrateDB.
+
+4. Define the flow, name it “ETL workflow,” and order the tasks: `extract_data()`, `transform_data()`, then `load_data()`.
+5. Execute the flow by calling `main_flow()`. Prefect runs each task in order (see the scheduling sketch after this list).
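The flow above runs once per script invocation. For the built-in scheduling mentioned in the introduction, one option in recent Prefect releases is `Flow.serve`; the deployment name and cron expression below are illustrative assumptions, not part of this commit.

```python
# Scheduling sketch (assumes a recent Prefect release): serve the flow
# as a deployment that runs at the top of every hour.
if __name__ == "__main__":
    main_flow.serve(name="cratedb-etl-hourly", cron="0 * * * *")
```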
 
-When you run this Python script, the workflow will read the trip data from a `csv` file, transform it, and load it into the CrateDB table. You can see the state of the flow run in the *Flows Runs* tab in Prefect UI:
+When you run the script, the workflow reads the trip data from a CSV file, transforms it, and loads it into CrateDB. You can see the state of the run in the *Flow Runs* tab in the Prefect UI:
 
 ![Screenshot 2023-08-01 at 09.50.02|690x328](https://us1.discourse-cdn.com/flex020/uploads/crate/original/1X/ecd02359cf23b5048e084faa785c7ad795bb5e57.png)
 
-You can enrich the ETL pipeline with many advanced features available in Prefect such as parameterization, error handling, retries, and more. Finally, after the successful execution of the workflow, you can query the data in the CrateDB:
+You can enrich the pipeline with Prefect features such as parameters, error handling, and retries (a retry sketch follows the screenshot below). After a successful run, query the data in CrateDB, for example:
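A verification sketch from Python might look as follows, reusing the `engine` from the script above; the `REFRESH TABLE` statement makes freshly written rows visible right away, since CrateDB refreshes tables periodically.

```python
# Sketch: verify the load from Python, reusing `engine` from the script.
import pandas as pd
import sqlalchemy as sa

with engine.connect() as conn:
    conn.execute(sa.text("REFRESH TABLE trip_data"))  # make writes visible
print(pd.read_sql("SELECT COUNT(*) AS trips FROM trip_data", engine))
```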
 
 ![Screenshot 2023-08-01 at 09.49.20|690x340](https://us1.discourse-cdn.com/flex020/uploads/crate/original/1X/5582fcd2a677f78f8f7c6a1aa4b8e14f25dda2d1.png)
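As a sketch of those enrichment ideas, retries and flow parameters attach directly to the existing decorators; the retry count and delay below are example values, not part of this commit.

```python
# Sketch: retry the extraction task and parameterize the flow.
# Retry/delay values are examples only.
import pandas as pd
from prefect import flow, task

CSV_URL = "https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2021-01.csv.gz"

@task(retries=3, retry_delay_seconds=10)  # retry transient download errors
def extract_data(url: str) -> pd.DataFrame:
    return pd.read_csv(url)

@flow(name="ETL workflow", log_prints=True)
def main_flow(csv_url: str = CSV_URL):  # flow parameter, override per run
    df = extract_data(csv_url)
    print(f"Fetched {len(df)} rows")

if __name__ == "__main__":
    main_flow()
```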
 
 ## Wrap up
 
-Throughout this tutorial, you made a simple Prefect workflow, defined tasks, and orchestrated data transformations and loading into CrateDB. Both tools offer extensive feature sets that you can use to optimize and scale your data workflows further.
+In this tutorial, you created a simple Prefect workflow, defined tasks, and orchestrated data transformations and loading into CrateDB. Both tools offer extensive features that help you optimize and scale your data workflows.
 
 As you continue exploring, don’t forget to check out the {ref}`reference documentation <crate-reference:index>`. If you have further questions or would like to learn more about updates, features, and integrations, join the [CrateDB community](https://community.cratedb.com/). Happy data wrangling!
