
dlt plugin #820

Merged: 7 commits merged into main from extension/dlt-materializer on Apr 17, 2024

Conversation

@zilto (Collaborator) commented Apr 12, 2024

The dlt library provides many Sources and Destinations to extract (E) and load (L) data.

With Hamilton being a transform (T) tool, the goal is to build constructs for:

  1. ET: create Hamilton nodes from dlt Sources, providing a structured way for them to be upstream dependencies.
  2. TL: store/materialize Hamilton node results into dlt Destinations.
    These two features alone allow full flexibility (ETL, ELTL, etc.).

For a good user experience, and to more easily integrate "Hamilton within dlt", it is also valuable to have:
3. ETL: essentially a combination of 1. and 2.
4. ELT: no special integration is required here (dlt does EL, then Hamilton does T), but the transition could be streamlined. We can showcase that via pipeline.sql_client(); see the sketch below. This is where a dlt + Ibis integration would shine.
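To make scenario 4 concrete, here is a minimal, hypothetical ELT sketch (not code from this PR): dlt does EL into a local duckdb destination, the loaded table is read back through pipeline.sql_client(), and Hamilton runs the T layer. The resource, table and column names, and the avg_moves transform are invented for illustration.

```python
import dlt
import pandas as pd
from hamilton import ad_hoc_utils, driver


@dlt.resource(name="players_games")
def players_games():
    # stand-in data; a real pipeline would use a dlt verified source instead
    yield [{"player": "a", "moves": 40}, {"player": "b", "moves": 35}]


def avg_moves(games_df: pd.DataFrame) -> float:
    """Example Hamilton transform node."""
    return float(games_df["moves"].mean())


# EL: dlt extracts the resource and loads it into duckdb
pipeline = dlt.pipeline(pipeline_name="chess", destination="duckdb", dataset_name="chess_data")
pipeline.run(players_games())

# transition: read the loaded table back out via dlt's sql_client
with pipeline.sql_client() as client:
    rows = client.execute_sql("SELECT player, moves FROM players_games")
games_df = pd.DataFrame(rows, columns=["player", "moves"])

# T: Hamilton executes the transform layer over the loaded data
transforms = ad_hoc_utils.create_temporary_module(avg_moves)
dr = driver.Builder().with_modules(transforms).build()
result = dr.execute(["avg_moves"], inputs={"games_df": games_df})
```

This keeps the EL and T refresh cadences independent, and the same duckdb file could just as well be queried through Ibis instead of pandas.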

Changes

  • Added DltResourceLoader (1.)
  • Added DltDestinationSaver materializer (2.)
  • Scenario (3.) is covered by adding both Loaders and Savers; a rough usage sketch follows below.
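As a rough, hypothetical usage sketch of those two pieces through Hamilton's materialization API: the `dlt` keys on from_/to and the keyword arguments (resource=, pipeline=, table_name=) are assumptions about how the loader/saver get registered, not the API reviewed here.

```python
import dlt
from hamilton import ad_hoc_utils, driver
from hamilton.io.materialization import from_, to


@dlt.resource(name="raw_messages")
def raw_messages():
    yield [{"text": " Hello "}, {"text": "World"}]


def cleaned_messages(raw_messages: list) -> list:
    """Example Hamilton transform over the loaded resource."""
    return [{"text": r["text"].strip().lower()} for r in raw_messages]


pipeline = dlt.pipeline(pipeline_name="example", destination="duckdb", dataset_name="raw")
transforms = ad_hoc_utils.create_temporary_module(cleaned_messages)
dr = driver.Builder().with_modules(transforms).build()

dr.materialize(
    # 1. ET: a dlt Resource becomes the upstream `raw_messages` dependency
    from_.dlt(target="raw_messages", resource=raw_messages(), pipeline=pipeline),
    # 2. TL: a node result is stored into the dlt Destination behind `pipeline`
    to.dlt(
        id="messages_saver",
        dependencies=["cleaned_messages"],
        pipeline=pipeline,
        table_name="cleaned_messages",
    ),
)
```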

How I tested this

  • DataSaver is tested for the 3 basic types accepted by dlt: iterables of records, pandas DataFrame, and pyarrow Table.
  • DataLoader is tested with a mock Resource.
  • Given that dlt pipelines are stateful, errors may appear when using many Loaders/Savers in a single project.

Notes

  • There's no easy mapping from a dlt Source to a Hamilton construct. A Source contains many Resources that would map to Hamilton nodes; a Resource is therefore closer to a DataLoader, but the Source construct is still needed because it is responsible for authentication and more. A minimal example of that structure is sketched below.
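For readers less familiar with dlt, a minimal sketch of that structure (hypothetical names, not code from this PR): one Source groups several Resources and owns shared concerns like authentication.

```python
import dlt


@dlt.source
def slack_source(api_token: str = dlt.secrets.value):
    # the Source owns authentication/config shared by all of its Resources

    @dlt.resource(name="messages")
    def messages():
        # each Resource is what maps most naturally to a Hamilton node / DataLoader
        yield [{"channel": "general", "text": "hello"}]

    @dlt.resource(name="users")
    def users():
        yield [{"id": 1, "name": "alice"}]

    return messages, users
```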

Checklist

  • PR has an informative and human-readable title (this will be pulled into the release notes)
  • Changes are limited to a single goal (no scope creep)
  • Code passed the pre-commit check & code is left cleaner/nicer than when first encountered.
  • Any change in functionality is tested
  • New functions are documented (with a description, list of inputs, and expected output)
  • Placeholder code is flagged / future TODOs are captured in comments
  • Project documentation has been updated if adding/changing functionality.

(Several resolved inline review threads on hamilton/plugins/dlt_extensions.py and hamilton/plugins/h_dlt.py)
@elijahbenizzy (Collaborator) left a comment:

Looking good! Most curious about the parquet thing -- wonder if we can use duckdb:memory...

```python
try:
    import pyarrow as pa

    # pyarrow is optional; only register its types when it's installed
    DATAFRAME_TYPES.extend([pa.Table, pa.RecordBatch])
except ModuleNotFoundError:
    pass
```
Collaborator:

So this works, the problem is that the error message will be confusing... For now, however, I think we can just document it well.

@zilto (Collaborator, Author) replied Apr 15, 2024:

Why would we want to throw an error? You mean if a node is annotated with pa.Table, but pyarrow is not installed and therefore no materializer is registered?

Collaborator:

Yeah, exactly. It's just a confusing case, but let's not worry about it (maybe add it to the docs).


```python
# TODO use pyarrow directly to support different dataframe libraries
# ref: https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetDataset.html#pyarrow.parquet.ParquetDataset
df = pd.concat([pd.read_parquet(f) for f in partition_file_paths], ignore_index=True)
```
Collaborator:

Parquet locally versus duckdb? Will the next pipeline.drop() erase it?

@zilto (Collaborator, Author) replied:

The dlt implementation for in-memory duckdb is currently bugged (they're fixing it), but I found a workaround.

Using duckdb in-memory doesn't really provide additional value and only adds a dependency:

source -> extract (parquet) -> normalize (parquet) -> load (parquet) -> duckdb (memory) -> query db (memory) -> to pandas (memory)

At the end of the process, memory (duckdb, pandas) is freed and the dlt pipeline is cleaned up (extract, normalize, load).

My current implementation skips the duckdb steps:

source -> extract (parquet) -> normalize (parquet) -> read parquet partitions (memory) -> pandas (memory)
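For reference, the pyarrow TODO flagged in the hunk above could look roughly like this (a sketch assuming `partition_file_paths` is the list of local parquet files written by the load step; the paths shown are illustrative, not the reviewed code):

```python
import pyarrow.parquet as pq

# `partition_file_paths` comes from the dlt load package, as in the hunk above
partition_file_paths = ["load_package/part-0.parquet", "load_package/part-1.parquet"]

# read all parquet partitions as a single Arrow table, independent of any
# particular dataframe library; convert at the edge only if needed
table = pq.ParquetDataset(partition_file_paths).read()
df = table.to_pandas()
```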

@zilto (Collaborator, Author) commented Apr 15, 2024:

The related design decision is "how can I selectively reload dlt Sources?". For example, only load Slack messages once, then run Hamilton dataflows many times over that same data.

This would be an ELT use case (dlt -> Hamilton) where you want to refresh each independently. It probably makes more sense to have separate run_dlt.py and run_hamilton.py scripts.

The current Source materializer (from_), with everything in-memory, aims to enable ET; the user is responsible for loading the data, potentially with the Destination (to) materializer, which does TL.

@skrawcz (Collaborator) commented Apr 16, 2024:

Not sure I'm following, but just to mention: we can cache the result of this function if we don't want it to run more than once?

Collaborator:

So yeah, the worry is that it fills up disk space. But I think we can switch this up as needed. Seems reasonable.

@zilto marked this pull request as ready for review on April 15, 2024 at 20:24.
@zilto changed the title from "[WIP] dlt plugin" to "dlt plugin" on Apr 15, 2024.
@zilto added the i/o label on Apr 16, 2024.
@elijahbenizzy (Collaborator) left a review comment:

Let's ship, see where it goes from here!

@zilto merged commit d89b03e into main on Apr 17, 2024 (23 checks passed).
@zilto deleted the extension/dlt-materializer branch on April 17, 2024 at 18:48.