From 17a62134a2bd6350272255955f0f24bf0e7ce5e9 Mon Sep 17 00:00:00 2001 From: Alexander Kozlov Date: Tue, 26 Sep 2023 17:13:32 +0000 Subject: [PATCH 01/43] Merge branch 'add-kwargs-to-dbconn' into fix-asc-desc-merge-fix-join-of-aux-table From effbc5af69d1fefa998031bb2269415f69ad28d8 Mon Sep 17 00:00:00 2001 From: Alexander Kozlov Date: Fri, 6 Oct 2023 12:14:40 +0000 Subject: [PATCH 02/43] add executor_config to DatatableBatchTransform --- datapipe/step/batch_generate.py | 10 +++++++--- datapipe/step/batch_transform.py | 16 ++++++++++++++++ 2 files changed, 23 insertions(+), 3 deletions(-) diff --git a/datapipe/step/batch_generate.py b/datapipe/step/batch_generate.py index 37383c18..9e57934c 100644 --- a/datapipe/step/batch_generate.py +++ b/datapipe/step/batch_generate.py @@ -29,6 +29,7 @@ def do_batch_generate( output_dts: List[DataTable], run_config: Optional[RunConfig] = None, kwargs: Optional[Dict] = None, + delete_stale_by_process_ts: bool = True ) -> None: """ Создание новой таблицы из результатов запуска `proc_func`. @@ -78,9 +79,10 @@ def do_batch_generate( for k, dt_k in enumerate(output_dts): dt_k.store_chunk(chunk_dfs[k], run_config=run_config) - with tracer.start_as_current_span("delete stale rows"): - for k, dt_k in enumerate(output_dts): - dt_k.delete_stale_by_process_ts(now, run_config=run_config) + if delete_stale_by_process_ts: + with tracer.start_as_current_span("delete stale rows"): + for k, dt_k in enumerate(output_dts): + dt_k.delete_stale_by_process_ts(now, run_config=run_config) @dataclass @@ -89,6 +91,7 @@ class BatchGenerate(PipelineStep): outputs: List[str] kwargs: Optional[Dict] = None labels: Optional[Labels] = None + delete_stale_by_process_ts: bool = True def build_compute(self, ds: DataStore, catalog: Catalog) -> List[ComputeStep]: return [ @@ -102,6 +105,7 @@ def build_compute(self, ds: DataStore, catalog: Catalog) -> List[ComputeStep]: output_dts=output_dts, run_config=run_config, kwargs=kwargs, + delete_stale_by_process_ts=self.delete_stale_by_process_ts ), ), input_dts=[], diff --git a/datapipe/step/batch_transform.py b/datapipe/step/batch_transform.py index cf1f69f1..3225e61f 100644 --- a/datapipe/step/batch_transform.py +++ b/datapipe/step/batch_transform.py @@ -854,6 +854,10 @@ class DatatableBatchTransform(PipelineStep): transform_keys: Optional[List[str]] = None kwargs: Optional[Dict] = None labels: Optional[Labels] = None + executor_config: Optional[ExecutorConfig] = None + filters: Optional[Union[LabelDict, Callable[[], LabelDict]]] = None + order_by: Optional[List[str]] = None + order: Literal["asc", "desc"] = "asc" def build_compute(self, ds: DataStore, catalog: Catalog) -> List[ComputeStep]: input_dts = [catalog.get_datatable(ds, name) for name in self.inputs] @@ -870,6 +874,10 @@ def build_compute(self, ds: DataStore, catalog: Catalog) -> List[ComputeStep]: transform_keys=self.transform_keys, chunk_size=self.chunk_size, labels=self.labels, + executor_config=self.executor_config, + filters=self.filters, + order_by=self.order_by, + order=self.order ) ] @@ -886,6 +894,10 @@ def __init__( transform_keys: Optional[List[str]] = None, chunk_size: int = 1000, labels: Optional[Labels] = None, + executor_config: Optional[ExecutorConfig] = None, + filters: Optional[Union[LabelDict, Callable[[], LabelDict]]] = None, + order_by: Optional[List[str]] = None, + order: Literal["asc", "desc"] = "asc", ) -> None: super().__init__( ds=ds, @@ -895,6 +907,10 @@ def __init__( transform_keys=transform_keys, chunk_size=chunk_size, labels=labels, + 
executor_config=executor_config, + filters=filters, + order_by=order_by, + order=order ) self.func = func From 295eb899c0c1652aeb410ffe6d197ee22dcc7f33 Mon Sep 17 00:00:00 2001 From: Alexander Kozlov Date: Mon, 23 Oct 2023 13:01:18 +0000 Subject: [PATCH 03/43] refactor filters as list of LabelDict --- datapipe/run_config.py | 4 +-- datapipe/sql_util.py | 13 +++++++-- datapipe/step/batch_transform.py | 47 +++++++++++++++++++++----------- 3 files changed, 43 insertions(+), 21 deletions(-) diff --git a/datapipe/run_config.py b/datapipe/run_config.py index 4ee9b327..45eddc53 100644 --- a/datapipe/run_config.py +++ b/datapipe/run_config.py @@ -1,5 +1,5 @@ from dataclasses import dataclass, field -from typing import Any, Dict, Optional +from typing import Any, List, Dict, Optional LabelDict = Dict[str, Any] @@ -10,7 +10,7 @@ class RunConfig: # если не пуст, то во время запуска обрабатываются только те строки, # которые строго соответствуют фильтру # (в случае, если у таблицы есть идентификатор с совпадающим именем). - filters: LabelDict = field(default_factory=dict) + filters: List[LabelDict] = field(default_factory=list) labels: LabelDict = field(default_factory=dict) @classmethod diff --git a/datapipe/sql_util.py b/datapipe/sql_util.py index a326ef66..588e50a0 100644 --- a/datapipe/sql_util.py +++ b/datapipe/sql_util.py @@ -2,6 +2,7 @@ import pandas as pd from sqlalchemy import Column, Integer, String, Table, column, tuple_ +from sqlalchemy.sql.expression import or_ from datapipe.run_config import RunConfig from datapipe.types import IndexDF @@ -63,9 +64,15 @@ def sql_apply_runconfig_filter( run_config: Optional[RunConfig] = None, ) -> Any: if run_config is not None: - for k, v in run_config.filters.items(): - if k in primary_keys: - sql = sql.where(table.c[k] == v) + sql = sql.where( + or_( + *[ + table.c[k] == v + for filter in run_config.filters + for k, v in filter.items() if k in primary_keys + ] + ) + ) return sql diff --git a/datapipe/step/batch_transform.py b/datapipe/step/batch_transform.py index 3225e61f..93afdd67 100644 --- a/datapipe/step/batch_transform.py +++ b/datapipe/step/batch_transform.py @@ -273,7 +273,7 @@ def __init__( chunk_size: int = 1000, labels: Optional[Labels] = None, executor_config: Optional[ExecutorConfig] = None, - filters: Optional[Union[LabelDict, Callable[[], LabelDict]]] = None, + filters: Optional[Union[List[LabelDict], Callable[..., List[LabelDict]]]] = None, order_by: Optional[List[str]] = None, order: Literal["asc", "desc"] = "asc", ) -> None: @@ -487,23 +487,35 @@ def _make_agg_of_agg(ctes, agg_col): return (self.transform_keys, sql) def _apply_filters_to_run_config( - self, run_config: Optional[RunConfig] = None + self, + ds: DataStore, + run_config: Optional[RunConfig] = None ) -> Optional[RunConfig]: if self.filters is None: return run_config else: - if isinstance(self.filters, dict): + filters: List[LabelDict] + if isinstance(self.filters, list) and all([isinstance(x, dict) for x in self.filters]): filters = self.filters elif isinstance(self.filters, Callable): # type: ignore - filters = self.filters() + filters_func = cast(Callable[..., List[LabelDict]], self.filters) + parameters = inspect.signature(filters_func).parameters + kwargs = { + **({"ds": ds} if "ds" in parameters else {}), + } + filters = filters_func(**kwargs) + + if isinstance(self.filters, str): + dt = ds.get_table(self.filters) + df = dt.get_data() + filters = df[dt.primary_keys].to_dict(orient="records") if run_config is None: return RunConfig(filters=filters) else: run_config = 
copy.deepcopy(run_config) filters = copy.deepcopy(filters) - filters.update(run_config.filters) - run_config.filters = filters + run_config.filters += filters return run_config def get_changed_idx_count( @@ -511,7 +523,7 @@ def get_changed_idx_count( ds: DataStore, run_config: Optional[RunConfig] = None, ) -> int: - run_config = self._apply_filters_to_run_config(run_config) + run_config = self._apply_filters_to_run_config(ds, run_config) _, sql = self._build_changed_idx_sql(ds, run_config=run_config) with ds.meta_dbconn.con.begin() as con: @@ -537,7 +549,7 @@ def get_full_process_ids( - idx_size - количество индексов требующих обработки - idx_df - датафрейм без колонок с данными, только индексная колонка """ - run_config = self._apply_filters_to_run_config(run_config) + run_config = self._apply_filters_to_run_config(ds, run_config) chunk_size = chunk_size or self.chunk_size with tracer.start_as_current_span("compute ids to process"): @@ -560,7 +572,10 @@ def get_full_process_ids( extra_filters: LabelDict if run_config is not None: extra_filters = { - k: v for k, v in run_config.filters.items() if k not in join_keys + k: v + for filter in run_config.filters + for k, v in filter.items() + if k not in join_keys } else: extra_filters = {} @@ -581,7 +596,7 @@ def get_change_list_process_ids( change_list: ChangeList, run_config: Optional[RunConfig] = None, ) -> Tuple[int, Iterable[IndexDF]]: - run_config = self._apply_filters_to_run_config(run_config) + run_config = self._apply_filters_to_run_config(ds, run_config) with tracer.start_as_current_span("compute ids to process"): changes = [pd.DataFrame(columns=self.transform_keys)] @@ -623,7 +638,7 @@ def store_batch_result( process_ts: float, run_config: Optional[RunConfig] = None, ) -> ChangeList: - run_config = self._apply_filters_to_run_config(run_config) + run_config = self._apply_filters_to_run_config(ds, run_config) changes = ChangeList() @@ -670,7 +685,7 @@ def store_batch_err( process_ts: float, run_config: Optional[RunConfig] = None, ) -> None: - run_config = self._apply_filters_to_run_config(run_config) + run_config = self._apply_filters_to_run_config(ds, run_config) logger.error(f"Process batch failed: {str(e)}") ds.event_logger.log_exception( @@ -855,7 +870,7 @@ class DatatableBatchTransform(PipelineStep): kwargs: Optional[Dict] = None labels: Optional[Labels] = None executor_config: Optional[ExecutorConfig] = None - filters: Optional[Union[LabelDict, Callable[[], LabelDict]]] = None + filters: Optional[Union[List[LabelDict], Callable[..., List[LabelDict]]]] = None order_by: Optional[List[str]] = None order: Literal["asc", "desc"] = "asc" @@ -895,7 +910,7 @@ def __init__( chunk_size: int = 1000, labels: Optional[Labels] = None, executor_config: Optional[ExecutorConfig] = None, - filters: Optional[Union[LabelDict, Callable[[], LabelDict]]] = None, + filters: Optional[Union[List[LabelDict], Callable[..., List[LabelDict]]]] = None, order_by: Optional[List[str]] = None, order: Literal["asc", "desc"] = "asc", ) -> None: @@ -941,7 +956,7 @@ class BatchTransform(PipelineStep): transform_keys: Optional[List[str]] = None labels: Optional[Labels] = None executor_config: Optional[ExecutorConfig] = None - filters: Optional[Union[LabelDict, Callable[[], LabelDict]]] = None + filters: Optional[Union[List[LabelDict], Callable[..., List[LabelDict]]]] = None order_by: Optional[List[str]] = None order: Literal["asc", "desc"] = "asc" @@ -981,7 +996,7 @@ def __init__( chunk_size: int = 1000, labels: Optional[Labels] = None, executor_config: 
Optional[ExecutorConfig] = None, - filters: Optional[Union[LabelDict, Callable[[], LabelDict]]] = None, + filters: Optional[Union[List[LabelDict], Callable[..., List[LabelDict]]]] = None, order_by: Optional[List[str]] = None, order: Literal["asc", "desc"] = "asc", ) -> None: From 43ad6666284abbbe34d1f112854706ab9b1d597a Mon Sep 17 00:00:00 2001 From: Alexander Kozlov Date: Mon, 23 Oct 2023 13:10:30 +0000 Subject: [PATCH 04/43] added Filters to str as some table support --- datapipe/run_config.py | 3 +-- datapipe/step/batch_transform.py | 16 +++++++++------- datapipe/types.py | 6 +++++- 3 files changed, 15 insertions(+), 10 deletions(-) diff --git a/datapipe/run_config.py b/datapipe/run_config.py index 45eddc53..ee16063b 100644 --- a/datapipe/run_config.py +++ b/datapipe/run_config.py @@ -1,7 +1,6 @@ from dataclasses import dataclass, field from typing import Any, List, Dict, Optional - -LabelDict = Dict[str, Any] +from datapipe.types import LabelDict @dataclass diff --git a/datapipe/step/batch_transform.py b/datapipe/step/batch_transform.py index 93afdd67..707b1ae4 100644 --- a/datapipe/step/batch_transform.py +++ b/datapipe/step/batch_transform.py @@ -45,7 +45,7 @@ from datapipe.compute import Catalog, ComputeStep, PipelineStep from datapipe.datatable import DataStore, DataTable, MetaTable from datapipe.executor import Executor, ExecutorConfig, SingleThreadExecutor -from datapipe.run_config import LabelDict, RunConfig +from datapipe.run_config import RunConfig from datapipe.sql_util import ( sql_apply_filters_idx_to_subquery, sql_apply_runconfig_filter, @@ -53,6 +53,8 @@ from datapipe.store.database import DBConn from datapipe.types import ( ChangeList, + LabelDict, + Filters, DataDF, DataSchema, IndexDF, @@ -273,7 +275,7 @@ def __init__( chunk_size: int = 1000, labels: Optional[Labels] = None, executor_config: Optional[ExecutorConfig] = None, - filters: Optional[Union[List[LabelDict], Callable[..., List[LabelDict]]]] = None, + filters: Optional[Filters] = None, order_by: Optional[List[str]] = None, order: Literal["asc", "desc"] = "asc", ) -> None: @@ -508,7 +510,7 @@ def _apply_filters_to_run_config( if isinstance(self.filters, str): dt = ds.get_table(self.filters) df = dt.get_data() - filters = df[dt.primary_keys].to_dict(orient="records") + filters = cast(List[LabelDict], df[dt.primary_keys].to_dict(orient="records")) if run_config is None: return RunConfig(filters=filters) @@ -870,7 +872,7 @@ class DatatableBatchTransform(PipelineStep): kwargs: Optional[Dict] = None labels: Optional[Labels] = None executor_config: Optional[ExecutorConfig] = None - filters: Optional[Union[List[LabelDict], Callable[..., List[LabelDict]]]] = None + filters: Optional[Filters] = None order_by: Optional[List[str]] = None order: Literal["asc", "desc"] = "asc" @@ -910,7 +912,7 @@ def __init__( chunk_size: int = 1000, labels: Optional[Labels] = None, executor_config: Optional[ExecutorConfig] = None, - filters: Optional[Union[List[LabelDict], Callable[..., List[LabelDict]]]] = None, + filters: Optional[Filters] = None, order_by: Optional[List[str]] = None, order: Literal["asc", "desc"] = "asc", ) -> None: @@ -956,7 +958,7 @@ class BatchTransform(PipelineStep): transform_keys: Optional[List[str]] = None labels: Optional[Labels] = None executor_config: Optional[ExecutorConfig] = None - filters: Optional[Union[List[LabelDict], Callable[..., List[LabelDict]]]] = None + filters: Optional[Filters] = None order_by: Optional[List[str]] = None order: Literal["asc", "desc"] = "asc" @@ -996,7 +998,7 @@ def __init__( 
chunk_size: int = 1000, labels: Optional[Labels] = None, executor_config: Optional[ExecutorConfig] = None, - filters: Optional[Union[List[LabelDict], Callable[..., List[LabelDict]]]] = None, + filters: Optional[Filters] = None, order_by: Optional[List[str]] = None, order: Literal["asc", "desc"] = "asc", ) -> None: diff --git a/datapipe/types.py b/datapipe/types.py index 6fcaad9c..5371b6fa 100644 --- a/datapipe/types.py +++ b/datapipe/types.py @@ -2,7 +2,7 @@ import itertools from dataclasses import dataclass, field -from typing import Callable, Dict, List, NewType, Set, Tuple, TypeVar, Union, cast +from typing import Any, Callable, Dict, List, NewType, Set, Tuple, TypeVar, Union, cast import pandas as pd from sqlalchemy import Column @@ -26,6 +26,10 @@ TransformResult = Union[DataDF, List[DataDF], Tuple[DataDF, ...]] +LabelDict = Dict[str, Any] + +Filters = Union[str, List[LabelDict], Callable[..., List[LabelDict]]] + @dataclass class ChangeList: From 4064147df62b5971a0dbe5f7ea2c764463361ce1 Mon Sep 17 00:00:00 2001 From: Alexander Kozlov Date: Mon, 23 Oct 2023 14:22:12 +0000 Subject: [PATCH 05/43] added filters to cli --- datapipe/cli.py | 5 ++++- datapipe/step/batch_transform.py | 1 + 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/datapipe/cli.py b/datapipe/cli.py index 15c2c6b1..0439a281 100644 --- a/datapipe/cli.py +++ b/datapipe/cli.py @@ -5,6 +5,7 @@ from typing import Dict, List, Optional, cast import click +from datapipe.run_config import RunConfig import pandas as pd import rich from opentelemetry import trace @@ -309,6 +310,7 @@ def step( steps = filter_steps_by_labels_and_name(app, labels=labels_list, name_prefix=name) ctx.obj["steps"] = steps + ctx.obj["labels"] = labels_list def to_human_repr(step: ComputeStep, extra_args: Optional[Dict] = None) -> str: @@ -373,6 +375,7 @@ def list(ctx: click.Context, status: bool) -> None: # noqa def run(ctx: click.Context, loop: bool, loop_delay: int) -> None: # noqa app: DatapipeApp = ctx.obj["pipeline"] steps_to_run: List[ComputeStep] = ctx.obj["steps"] + run_config = RunConfig(labels={k: v for k, v in ctx.obj["labels"]}) executor: Executor = ctx.obj["executor"] @@ -381,7 +384,7 @@ def run(ctx: click.Context, loop: bool, loop_delay: int) -> None: # noqa while True: if len(steps_to_run) > 0: - run_steps(app.ds, steps_to_run, executor=executor) + run_steps(app.ds, steps_to_run, run_config=run_config, executor=executor) if not loop: break diff --git a/datapipe/step/batch_transform.py b/datapipe/step/batch_transform.py index 707b1ae4..a5a61ec6 100644 --- a/datapipe/step/batch_transform.py +++ b/datapipe/step/batch_transform.py @@ -504,6 +504,7 @@ def _apply_filters_to_run_config( parameters = inspect.signature(filters_func).parameters kwargs = { **({"ds": ds} if "ds" in parameters else {}), + **({"run_config": run_config} if "run_config" in parameters else {}) } filters = filters_func(**kwargs) From 7daaae0c516c337b154d589f500de99a79616e8a Mon Sep 17 00:00:00 2001 From: Alexander Kozlov Date: Mon, 23 Oct 2023 15:08:44 +0000 Subject: [PATCH 06/43] fix filter in transform tables --- datapipe/step/batch_transform.py | 1 + 1 file changed, 1 insertion(+) diff --git a/datapipe/step/batch_transform.py b/datapipe/step/batch_transform.py index a5a61ec6..fb1419ba 100644 --- a/datapipe/step/batch_transform.py +++ b/datapipe/step/batch_transform.py @@ -441,6 +441,7 @@ def _make_agg_of_agg(ctes, agg_col): ) out = sql_apply_filters_idx_to_subquery(out, self.transform_keys, filters_idx) + out = sql_apply_runconfig_filter(out, tr_tbl, 
self.transform_keys, run_config) out = out.cte(name="transform") From fdde3245779b22d3f44682a5100b64448eb55105 Mon Sep 17 00:00:00 2001 From: Alexander Kozlov Date: Mon, 23 Oct 2023 16:53:40 +0000 Subject: [PATCH 07/43] fix or_ -> and_ --- datapipe/sql_util.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datapipe/sql_util.py b/datapipe/sql_util.py index 588e50a0..f1299b47 100644 --- a/datapipe/sql_util.py +++ b/datapipe/sql_util.py @@ -2,7 +2,7 @@ import pandas as pd from sqlalchemy import Column, Integer, String, Table, column, tuple_ -from sqlalchemy.sql.expression import or_ +from sqlalchemy.sql.expression import and_ from datapipe.run_config import RunConfig from datapipe.types import IndexDF @@ -65,7 +65,7 @@ def sql_apply_runconfig_filter( ) -> Any: if run_config is not None: sql = sql.where( - or_( + and_( *[ table.c[k] == v for filter in run_config.filters From abf37250b18cde3e34da82acf862a7608bc28170 Mon Sep 17 00:00:00 2001 From: Alexander Kozlov Date: Mon, 23 Oct 2023 16:55:52 +0000 Subject: [PATCH 08/43] fix or_ -> and_ --- datapipe/sql_util.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/datapipe/sql_util.py b/datapipe/sql_util.py index f1299b47..078a3ee9 100644 --- a/datapipe/sql_util.py +++ b/datapipe/sql_util.py @@ -2,7 +2,7 @@ import pandas as pd from sqlalchemy import Column, Integer, String, Table, column, tuple_ -from sqlalchemy.sql.expression import and_ +from sqlalchemy.sql.expression import and_, or_ from datapipe.run_config import RunConfig from datapipe.types import IndexDF @@ -65,11 +65,15 @@ def sql_apply_runconfig_filter( ) -> Any: if run_config is not None: sql = sql.where( - and_( + or_( *[ - table.c[k] == v + and_( + *[ + table.c[k] == v + for k, v in filter.items() if k in primary_keys + ] + ) for filter in run_config.filters - for k, v in filter.items() if k in primary_keys ] ) ) From 0df0f2260aa417f3f3a00c99057589444c1bb755 Mon Sep 17 00:00:00 2001 From: Alexander Kozlov Date: Mon, 13 Nov 2023 15:33:30 +0000 Subject: [PATCH 09/43] * --- datapipe/step/batch_transform.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datapipe/step/batch_transform.py b/datapipe/step/batch_transform.py index fb1419ba..be875c0d 100644 --- a/datapipe/step/batch_transform.py +++ b/datapipe/step/batch_transform.py @@ -933,7 +933,7 @@ def __init__( ) self.func = func - self.kwargs = kwargs + self.kwargs = kwargs or {} def process_batch_dts( self, From e4db5eab66e0bea194c4bf2d87c498b8a60e928d Mon Sep 17 00:00:00 2001 From: Alexander Kozlov Date: Thu, 1 Feb 2024 16:05:01 +0000 Subject: [PATCH 10/43] fix delete_stale --- datapipe/step/batch_generate.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datapipe/step/batch_generate.py b/datapipe/step/batch_generate.py index 6b919f80..2e83b75f 100644 --- a/datapipe/step/batch_generate.py +++ b/datapipe/step/batch_generate.py @@ -105,7 +105,7 @@ def build_compute(self, ds: DataStore, catalog: Catalog) -> List[ComputeStep]: output_dts=output_dts, run_config=run_config, kwargs=kwargs, - delete_stale_by_process_ts=self.delete_stale_by_process_ts + delete_stale=self.delete_stale ), ), input_dts=[], From 9997d04f9cc560b720accd44e79fec1c75c00448 Mon Sep 17 00:00:00 2001 From: Alexander Kozlov Date: Mon, 1 Apr 2024 11:19:39 +0000 Subject: [PATCH 11/43] fix suffix problem --- datapipe/store/filedir.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datapipe/store/filedir.py b/datapipe/store/filedir.py index 6d36ac20..23ce3780 100644 --- 
a/datapipe/store/filedir.py +++ b/datapipe/store/filedir.py @@ -494,7 +494,7 @@ def read_rows_meta_pseudo_df( filepaths = [] for f in files: - m = re.match(self.filename_match_first_suffix, f.path) + m = re.match(self.filename_match_first_suffix, f"{self.protocol_str}{f.path}") if m is None: continue From 55849d06a94651cf8bc1e4d443d6896e7e574c33 Mon Sep 17 00:00:00 2001 From: Alexander Kozlov Date: Mon, 15 Jul 2024 16:08:58 +0000 Subject: [PATCH 12/43] fix bug when reading multiply suffixes --- datapipe/store/filedir.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/datapipe/store/filedir.py b/datapipe/store/filedir.py index 23ce3780..b4c1fa83 100644 --- a/datapipe/store/filedir.py +++ b/datapipe/store/filedir.py @@ -241,7 +241,7 @@ def __init__( self.attrnames = _pattern_to_attrnames(filename_pattern) self.filename_glob = [_pattern_to_glob(pat) for pat in self.filename_patterns] self.filename_match = _pattern_to_match(filename_pattern_for_match) - self.filename_match_first_suffix = _pattern_to_match(self.filename_patterns[0]) + self.filename_match_suffixes = [_pattern_to_match(pattern) for pattern in self.filename_patterns] # Any * and ** pattern check if "*" in path: @@ -494,8 +494,10 @@ def read_rows_meta_pseudo_df( filepaths = [] for f in files: - m = re.match(self.filename_match_first_suffix, f"{self.protocol_str}{f.path}") - + for filemath_match_suffix in self.filename_match_suffixes: + m = re.match(filemath_match_suffix, f"{self.protocol_str}{f.path}") + if m is not None: + break if m is None: continue From 208ddf8b84843d2a9b6253e46b76bd60b3fa7531 Mon Sep 17 00:00:00 2001 From: Alexander Kozlov Date: Mon, 15 Jul 2024 16:15:25 +0000 Subject: [PATCH 13/43] fix2 --- datapipe/store/filedir.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/datapipe/store/filedir.py b/datapipe/store/filedir.py index b4c1fa83..7efef646 100644 --- a/datapipe/store/filedir.py +++ b/datapipe/store/filedir.py @@ -492,6 +492,7 @@ def read_rows_meta_pseudo_df( ids: Dict[str, List[str]] = {attrname: [] for attrname in self.attrnames} ukeys = [] filepaths = [] + looked_keys = set() for f in files: for filemath_match_suffix in self.filename_match_suffixes: @@ -501,8 +502,12 @@ def read_rows_meta_pseudo_df( if m is None: continue - for attrname in self.attrnames: - ids[attrname].append(m.group(attrname)) + keys_values = tuple(m.group(attrname) for attrname in self.attrnames) + if keys_values in looked_keys: + continue + + for attrname, key_value in zip(self.attrnames, keys_values): + ids[attrname].append(key_value) ukeys.append(files.fs.ukey(f.path)) filepaths.append(f"{self.protocol_str}{f.path}") From b5790617b824ed9c9718d3e20ca405fcf124d873 Mon Sep 17 00:00:00 2001 From: Alexander Kozlov Date: Mon, 12 Aug 2024 13:43:48 +0000 Subject: [PATCH 14/43] * --- datapipe/step/batch_generate.py | 1 - 1 file changed, 1 deletion(-) diff --git a/datapipe/step/batch_generate.py b/datapipe/step/batch_generate.py index 8a19ea59..74b3adad 100644 --- a/datapipe/step/batch_generate.py +++ b/datapipe/step/batch_generate.py @@ -106,7 +106,6 @@ def build_compute(self, ds: DataStore, catalog: Catalog) -> List[ComputeStep]: run_config=run_config, delete_stale=self.delete_stale, kwargs=kwargs, - delete_stale=self.delete_stale ), ), input_dts=[], From 3dc9de4311abaec2802f114560da492d4b31545c Mon Sep 17 00:00:00 2001 From: Alexander Kozlov Date: Wed, 14 Aug 2024 11:40:37 +0000 Subject: [PATCH 15/43] WIPg --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/pyproject.toml b/pyproject.toml index 99f8b4e1..ef422de1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "datapipe-core" -version = "0.14.0-alpha.2" +version = "0.14.0" description = "`datapipe` is a realtime incremental ETL library for Python application" readme = "README.md" repository = "https://github.com/epoch8/datapipe" From 526b1abfc38eb2ffbdd48f89887a49e8af52b0b9 Mon Sep 17 00:00:00 2001 From: Alexander Kozlov Date: Wed, 14 Aug 2024 12:13:29 +0000 Subject: [PATCH 16/43] fix typing --- datapipe/step/datatable_transform.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/datapipe/step/datatable_transform.py b/datapipe/step/datatable_transform.py index d40400a8..e8dd8091 100644 --- a/datapipe/step/datatable_transform.py +++ b/datapipe/step/datatable_transform.py @@ -23,8 +23,7 @@ def __call__( input_dts: List[DataTable], output_dts: List[DataTable], run_config: Optional[RunConfig], - # Возможно, лучше передавать как переменную, а не ** - **kwargs, + kwargs: Optional[Dict[str, Any]] = None, ) -> None: ... From 50c8f9f654ab5164b39b3649feaffbdaa8d417d1 Mon Sep 17 00:00:00 2001 From: Alexander Kozlov Date: Fri, 16 Aug 2024 15:10:49 +0000 Subject: [PATCH 17/43] mypy fixs + add IndexDF support --- datapipe/step/batch_transform.py | 11 +++++++---- datapipe/store/filedir.py | 4 ++-- datapipe/types.py | 3 +-- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/datapipe/step/batch_transform.py b/datapipe/step/batch_transform.py index 7801c423..a04d2f7a 100644 --- a/datapipe/step/batch_transform.py +++ b/datapipe/step/batch_transform.py @@ -545,15 +545,18 @@ def _apply_filters_to_run_config( if isinstance(self.filters, list) and all([isinstance(x, dict) for x in self.filters]): filters = self.filters elif isinstance(self.filters, Callable): # type: ignore - filters_func = cast(Callable[..., List[LabelDict]], self.filters) + filters_func = cast(Callable[..., Union[List[LabelDict], IndexDF]], self.filters) parameters = inspect.signature(filters_func).parameters kwargs = { **({"ds": ds} if "ds" in parameters else {}), **({"run_config": run_config} if "run_config" in parameters else {}) } - filters = filters_func(**kwargs) - - if isinstance(self.filters, str): + filters_res = filters_func(**kwargs) + if isinstance(filters_res, pd.DataFrame): + filters = cast(List[LabelDict], filters_res.to_dict(orient="records")) + else: + filters = filters_res + elif isinstance(self.filters, str): dt = ds.get_table(self.filters) df = dt.get_data() filters = cast(List[LabelDict], df[dt.primary_keys].to_dict(orient="records")) diff --git a/datapipe/store/filedir.py b/datapipe/store/filedir.py index f43413a2..e1bec6b6 100644 --- a/datapipe/store/filedir.py +++ b/datapipe/store/filedir.py @@ -5,7 +5,7 @@ import re from abc import ABC from pathlib import Path -from typing import IO, Any, Dict, Iterator, List, Optional, Union, cast +from typing import IO, Any, Dict, Iterator, List, Optional, Union, cast, Set import fsspec import numpy as np @@ -492,7 +492,7 @@ def read_rows_meta_pseudo_df( ids: Dict[str, List[str]] = {attrname: [] for attrname in self.attrnames} ukeys = [] filepaths = [] - looked_keys = set() + looked_keys: Set[Any] = set() for f in files: for filemath_match_suffix in self.filename_match_suffixes: diff --git a/datapipe/types.py b/datapipe/types.py index 53f4c589..b3b75b8d 100644 --- a/datapipe/types.py +++ b/datapipe/types.py @@ -44,8 +44,7 @@ TransformResult = Union[DataDF, List[DataDF], Tuple[DataDF, ...]] LabelDict = Dict[str, Any] 
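
As a reading aid for this hunk: the widened `Filters` union defined just below can be handed to `BatchTransform` / `BatchTransformStep` in several shapes, all of which are normalized into a list of key/value dicts before filtering. A minimal sketch of the accepted forms, not taken from the patch itself; the `item_id` key, the `"items_to_process"` table name, and the function name are hypothetical placeholders mirroring the forms exercised by the tests added later in this series:

    from typing import List, Optional, cast

    import pandas as pd

    from datapipe.datatable import DataStore
    from datapipe.run_config import RunConfig
    from datapipe.types import IndexDF, LabelDict

    # 1. A static list of key/value dicts, one dict per index value to keep.
    filters_as_list: List[LabelDict] = [{"item_id": 0}, {"item_id": 1}]

    # 2. An IndexDF (a pandas DataFrame holding only index columns).
    filters_as_index_df = cast(IndexDF, pd.DataFrame({"item_id": [0, 1]}))

    # 3. The name of a catalog table; its primary-key rows become the filter records.
    filters_as_table_name = "items_to_process"

    # 4. A callable; `ds` and `run_config` are injected when declared as parameters.
    def filters_as_callable(
        ds: DataStore, run_config: Optional[RunConfig]
    ) -> List[LabelDict]:
        return [{"item_id": 0}, {"item_id": 1}]

    # Any of these values can then be passed as
    # BatchTransform(..., filters=<one of the above>).
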
- -Filters = Union[str, List[LabelDict], Callable[..., List[LabelDict]]] +Filters = Union[str, IndexDF, List[LabelDict], Callable[..., List[LabelDict]], Callable[..., IndexDF]] try: from sqlalchemy.orm import DeclarativeBase From 0008bf9d8ece03b746040c4acdee666ff5e50610 Mon Sep 17 00:00:00 2001 From: Alexander Kozlov Date: Fri, 16 Aug 2024 17:08:31 +0000 Subject: [PATCH 18/43] add tests, part 1 --- datapipe/step/batch_transform.py | 31 +++++---- datapipe/types.py | 4 +- tests/test_complex_pipeline.py | 91 +++++++++++++++++++++++- tests/test_core_steps2.py | 115 ++++++++++++++++++++++++++++++- 4 files changed, 223 insertions(+), 18 deletions(-) diff --git a/datapipe/step/batch_transform.py b/datapipe/step/batch_transform.py index a04d2f7a..5c728d92 100644 --- a/datapipe/step/batch_transform.py +++ b/datapipe/step/batch_transform.py @@ -542,7 +542,15 @@ def _apply_filters_to_run_config( return run_config else: filters: List[LabelDict] - if isinstance(self.filters, list) and all([isinstance(x, dict) for x in self.filters]): + if isinstance(self.filters, str): + dt = ds.get_table(self.filters) + df = dt.get_data() + filters = cast(List[LabelDict], df[dt.primary_keys].to_dict(orient="records")) + elif isinstance(self.filters, DataTable): + filters = cast(List[LabelDict], self.filters.get_data().to_dict(orient="records")) + elif isinstance(self.filters, pd.DataFrame): + filters = cast(List[LabelDict], self.filters.to_dict(orient="records")) + elif isinstance(self.filters, list) and all([isinstance(x, dict) for x in self.filters]): filters = self.filters elif isinstance(self.filters, Callable): # type: ignore filters_func = cast(Callable[..., Union[List[LabelDict], IndexDF]], self.filters) @@ -556,10 +564,6 @@ def _apply_filters_to_run_config( filters = cast(List[LabelDict], filters_res.to_dict(orient="records")) else: filters = filters_res - elif isinstance(self.filters, str): - dt = ds.get_table(self.filters) - df = dt.get_data() - filters = cast(List[LabelDict], df[dt.primary_keys].to_dict(orient="records")) if run_config is None: return RunConfig(filters=filters) @@ -627,24 +631,25 @@ def get_full_process_ids( ) # Список ключей из фильтров, которые нужно добавить в результат - extra_filters: LabelDict + extra_filters: Optional[List[str, Dict]] = None if run_config is not None: - extra_filters = { + extra_filters = [{ k: v - for filter in run_config.filters for k, v in filter.items() if k not in join_keys - } - else: - extra_filters = {} + } for filter in run_config.filters] def alter_res_df(): with ds.meta_dbconn.con.begin() as con: for df in pd.read_sql_query(u1, con=con, chunksize=chunk_size): df = df[self.transform_keys] - for k, v in extra_filters.items(): - df[k] = v + if extra_filters is not None: + df__extra_filters = pd.DataFrame(extra_filters) + if set(df__extra_filters.columns).intersection(df.columns): + df = pd.merge(df, df__extra_filters) + else: + df = pd.merge(df, df__extra_filters, how="cross") yield df diff --git a/datapipe/types.py b/datapipe/types.py index b3b75b8d..b73fa2f4 100644 --- a/datapipe/types.py +++ b/datapipe/types.py @@ -9,7 +9,6 @@ Dict, List, NewType, - Optional, Set, Tuple, Type, @@ -17,7 +16,6 @@ Union, cast, ) - import pandas as pd from sqlalchemy import Column @@ -44,7 +42,7 @@ TransformResult = Union[DataDF, List[DataDF], Tuple[DataDF, ...]] LabelDict = Dict[str, Any] -Filters = Union[str, IndexDF, List[LabelDict], Callable[..., List[LabelDict]], Callable[..., IndexDF]] +Filters = Union[str, "DataTable", IndexDF, List[LabelDict], Callable[..., 
List[LabelDict]], Callable[..., IndexDF]] try: from sqlalchemy.orm import DeclarativeBase diff --git a/tests/test_complex_pipeline.py b/tests/test_complex_pipeline.py index e0ea30ab..8cffc548 100644 --- a/tests/test_complex_pipeline.py +++ b/tests/test_complex_pipeline.py @@ -4,11 +4,12 @@ from datapipe.compute import Catalog, Pipeline, Table, build_compute, run_steps from datapipe.datatable import DataStore +from datapipe.step.batch_generate import BatchGenerate from datapipe.step.batch_transform import BatchTransform from datapipe.store.database import TableStoreDB from datapipe.types import IndexDF -from .util import assert_datatable_equal +from .util import assert_datatable_equal, assert_df_equal TEST__ITEM = pd.DataFrame( { @@ -287,3 +288,91 @@ def train( assert len( ds.get_table("pipeline__is_trained_on__frozen_dataset").get_data() ) == len(TEST__FROZEN_DATASET) * len(TEST__TRAIN_CONFIG) + + +def test_complex_transform_with_filters(dbconn): + ds = DataStore(dbconn, create_meta_table=True) + catalog = Catalog({ + "tbl_image": Table( + store=TableStoreDB( + dbconn, + "tbl_image", + [ + Column("image_id", Integer, primary_key=True), + ], + True + ) + ), + "tbl_prediction": Table( + store=TableStoreDB( + dbconn, + "tbl_prediction", + [ + Column("image_id", Integer, primary_key=True), + Column("model_id", Integer, primary_key=True), + ], + True + ) + ), + "tbl_output": Table( + store=TableStoreDB( + dbconn, + "tbl_output", + [ + Column("model_id", Integer, primary_key=True), + Column("count", Integer), + ], + True + ) + ) + }) + + def gen_tbl(df): + yield df + + test_df__image = pd.DataFrame( + {"image_id": [0, 1, 2, 3]} + ) + test_df__prediction = pd.DataFrame({ + "image_id": [0, 1, 2, 3, 0, 1, 2, 3, 0, 1, 2, 3], + "model_id": [0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1] + }) + + def count_func(df__image: pd.DataFrame, df__prediction: pd.DataFrame): + df__image = pd.merge(df__image, df__prediction, on=["image_id"]) + print(f"{df__image=}") + print(f"{df__prediction=}") + df__output = df__image.groupby("model_id").agg(len).reset_index().rename(columns={"image_id": "count"}) + print(f"{df__output=}") + return df__output + + pipeline = Pipeline( + [ + BatchGenerate( + func=gen_tbl, + outputs=["tbl_image"], + kwargs=dict(df=test_df__image), + ), + BatchGenerate( + func=gen_tbl, + outputs=["tbl_prediction"], + kwargs=dict(df=test_df__prediction), + ), + BatchTransform( + func=count_func, + inputs=["tbl_image", "tbl_prediction"], + outputs=["tbl_output"], + transform_keys=["model_id"], + chunk_size=6, + # filters=[{"image_id": 0}, {"image_id": 1}, {"image_id": 2}] + ), + ] + ) + steps = build_compute(ds, catalog, pipeline) + run_steps(ds, steps) + print(ds.get_table("tbl_output").get_data()) + # assert_df_equal( + # ds.get_table("tbl_output").get_data(), + # count_func(test_df__image, test_df__prediction), + # index_cols=["model_id"] + # ) diff --git a/tests/test_core_steps2.py b/tests/test_core_steps2.py index d2bdcaca..40106462 100644 --- a/tests/test_core_steps2.py +++ b/tests/test_core_steps2.py @@ -4,6 +4,7 @@ # import pytest import time +from typing import Optional, cast import pandas as pd from sqlalchemy import Column, String @@ -14,7 +15,7 @@ from datapipe.step.batch_generate import do_batch_generate from datapipe.step.batch_transform import BatchTransformStep from datapipe.store.database import MetaKey, TableStoreDB -from datapipe.types import ChangeList, IndexDF +from datapipe.types import ChangeList, Filters, IndexDF from .util import assert_datatable_equal, assert_df_equal @@ -387,3 
+388,115 @@ def update_df(products: pd.DataFrame, items: pd.DataFrame): items2_df = merged_df[["item_id", "pipeline_id", "product_id", "a"]] assert_df_equal(items2.get_data(), items2_df, index_cols=["item_id", "pipeline_id"]) + + +PRODUCTS_DF = pd.DataFrame( + { + "product_id": list(range(2)), + "pipeline_id": list(range(2)), + "b": range(10, 12), + } +) + +ITEMS_DF = pd.DataFrame( + { + "item_id": list(range(5)) * 2, + "pipeline_id": list(range(2)) * 5, + "product_id": list(range(2)) * 5, + "a": range(10), + } +) + + +def batch_transform_with_filters(dbconn, filters: Filters, ds: Optional[DataStore] = None): + if ds is None: + ds = DataStore(dbconn, create_meta_table=True) + + products = ds.create_table( + "products", + table_store=TableStoreDB(dbconn, "products_data", PRODUCTS_SCHEMA, True), + ) + + items = ds.create_table( + "items", table_store=TableStoreDB(dbconn, "items_data", ITEMS_SCHEMA, True) + ) + + items2 = ds.create_table( + "items2", table_store=TableStoreDB(dbconn, "items2_data", ITEMS_SCHEMA, True) + ) + + products.store_chunk(PRODUCTS_DF, now=0) + items.store_chunk(ITEMS_DF, now=0) + + def update_df(products: pd.DataFrame, items: pd.DataFrame): + merged_df = pd.merge(items, products, on=["product_id", "pipeline_id"]) + merged_df["a"] = merged_df.apply(lambda x: x["a"] + x["b"], axis=1) + + return merged_df[["item_id", "pipeline_id", "product_id", "a"]] + + step = BatchTransformStep( + ds=ds, + name="test", + func=update_df, + input_dts=[products, items], + output_dts=[items2], + filters=filters + ) + + step.run_full(ds) + + merged_df = pd.merge(ITEMS_DF, PRODUCTS_DF, on=["product_id", "pipeline_id"]) + merged_df["a"] = merged_df.apply(lambda x: x["a"] + x["b"], axis=1) + + items2_df = merged_df[["item_id", "pipeline_id", "product_id", "a"]] + items2_df = items2_df[items2_df["item_id"].isin([0, 1, 2])] + + assert_df_equal(items2.get_data(), items2_df, index_cols=["item_id", "pipeline_id"]) + + +def test_batch_transform_with_filters_as_str(dbconn): + ds = DataStore(dbconn, create_meta_table=True) + filters_data = pd.DataFrame([{"item_id": 0}, {"item_id": 1}, {"item_id": 2}]) + filters = ds.create_table( + "filters_data", table_store=TableStoreDB( + dbconn, "filters_data", [Column("item_id", Integer, primary_key=True)], True + ) + ) + filters.store_chunk(filters_data, now=0) + batch_transform_with_filters(dbconn, filters="filters_data", ds=ds) + + +def test_batch_transform_with_filters_as_datatable(dbconn): + ds = DataStore(dbconn, create_meta_table=True) + filters_data = pd.DataFrame([{"item_id": 0}, {"item_id": 1}, {"item_id": 2}]) + filters = ds.create_table( + "filters_data", table_store=TableStoreDB( + dbconn, "filters_data", [Column("item_id", Integer, primary_key=True)], True + ) + ) + filters.store_chunk(filters_data, now=0) + batch_transform_with_filters(dbconn, filters=filters) + + +def test_batch_transform_with_filters_as_IndexDF(dbconn): + batch_transform_with_filters( + dbconn, filters=cast(IndexDF, pd.DataFrame([{"item_id": 0}, {"item_id": 1}, {"item_id": 2}])) + ) + + +def test_batch_transform_with_filters_as_list_of_dict(dbconn): + batch_transform_with_filters(dbconn, filters=[{"item_id": 0}, {"item_id": 1}, {"item_id": 2}]) + + +def test_batch_transform_with_filters_as_callable_IndexDF(dbconn): + def callable(ds: DataStore, run_config: Optional[RunConfig]): + return cast(IndexDF, pd.DataFrame([{"item_id": 0}, {"item_id": 1}, {"item_id": 2}])) + + batch_transform_with_filters(dbconn, filters=callable) + + +def 
test_batch_transform_with_filters_as_callable_list_of_dict(dbconn): + def callable(ds: DataStore, run_config: Optional[RunConfig]): + return [{"item_id": 0}, {"item_id": 1}, {"item_id": 2}] + + batch_transform_with_filters(dbconn, filters=callable) From 181cf4073cab9f4173caa9f93f56a93d941aec3d Mon Sep 17 00:00:00 2001 From: Alexander Kozlov Date: Mon, 19 Aug 2024 12:54:16 +0000 Subject: [PATCH 19/43] fix tests --- datapipe/step/batch_transform.py | 2 -- datapipe/types.py | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/datapipe/step/batch_transform.py b/datapipe/step/batch_transform.py index 5c728d92..a6c89e42 100644 --- a/datapipe/step/batch_transform.py +++ b/datapipe/step/batch_transform.py @@ -546,8 +546,6 @@ def _apply_filters_to_run_config( dt = ds.get_table(self.filters) df = dt.get_data() filters = cast(List[LabelDict], df[dt.primary_keys].to_dict(orient="records")) - elif isinstance(self.filters, DataTable): - filters = cast(List[LabelDict], self.filters.get_data().to_dict(orient="records")) elif isinstance(self.filters, pd.DataFrame): filters = cast(List[LabelDict], self.filters.to_dict(orient="records")) elif isinstance(self.filters, list) and all([isinstance(x, dict) for x in self.filters]): diff --git a/datapipe/types.py b/datapipe/types.py index b73fa2f4..848a8d71 100644 --- a/datapipe/types.py +++ b/datapipe/types.py @@ -42,7 +42,7 @@ TransformResult = Union[DataDF, List[DataDF], Tuple[DataDF, ...]] LabelDict = Dict[str, Any] -Filters = Union[str, "DataTable", IndexDF, List[LabelDict], Callable[..., List[LabelDict]], Callable[..., IndexDF]] +Filters = Union[str, IndexDF, List[LabelDict], Callable[..., List[LabelDict]], Callable[..., IndexDF]] try: from sqlalchemy.orm import DeclarativeBase From a775700ec47ccb19804f541a458b2358db37bff3 Mon Sep 17 00:00:00 2001 From: Alexander Kozlov Date: Mon, 19 Aug 2024 13:47:51 +0000 Subject: [PATCH 20/43] fix mypyg --- .vscode/settings.json | 4 +++- Makefile | 2 ++ datapipe/step/batch_transform.py | 2 +- 3 files changed, 6 insertions(+), 2 deletions(-) create mode 100644 Makefile diff --git a/.vscode/settings.json b/.vscode/settings.json index 1c74547d..75d01431 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -6,7 +6,7 @@ "python.testing.unittestEnabled": false, "python.testing.pytestEnabled": true, "python.linting.pylintEnabled": false, - "python.linting.flake8Enabled": false, + "python.linting.flake8Enabled": true, "python.formatting.provider": "none", "python.analysis.extraPaths": [ "${workspaceFolder}" @@ -43,4 +43,6 @@ "githubPullRequests.ignoredPullRequestBranches": [ "master" ], + "python.linting.pycodestyleEnabled": false, + "python.linting.enabled": true, } \ No newline at end of file diff --git a/Makefile b/Makefile new file mode 100644 index 00000000..00780f6a --- /dev/null +++ b/Makefile @@ -0,0 +1,2 @@ +mypy: + mypy -p datapipe --ignore-missing-imports --follow-imports=silent --namespace-packages \ No newline at end of file diff --git a/datapipe/step/batch_transform.py b/datapipe/step/batch_transform.py index a6c89e42..e8f72fe8 100644 --- a/datapipe/step/batch_transform.py +++ b/datapipe/step/batch_transform.py @@ -629,7 +629,7 @@ def get_full_process_ids( ) # Список ключей из фильтров, которые нужно добавить в результат - extra_filters: Optional[List[str, Dict]] = None + extra_filters: Optional[List[Dict[str, Any]]] = None if run_config is not None: extra_filters = [{ k: v From fab83ca53afacedf528fa52ee1e689050bc3d722 Mon Sep 17 00:00:00 2001 From: Alexander Kozlov Date: Mon, 19 Aug 2024 
15:09:51 +0000 Subject: [PATCH 21/43] sql filters change --- datapipe/datatable.py | 4 ++-- datapipe/sql_util.py | 33 +++++++++++++++++--------------- datapipe/step/batch_transform.py | 4 ++-- datapipe/store/database.py | 2 +- 4 files changed, 23 insertions(+), 20 deletions(-) diff --git a/datapipe/datatable.py b/datapipe/datatable.py index d7a19c7a..e3313e82 100644 --- a/datapipe/datatable.py +++ b/datapipe/datatable.py @@ -407,7 +407,7 @@ def get_stale_idx( ) sql = sql_apply_runconfig_filter( - sql, self.sql_table, self.primary_keys, run_config + sql, self.primary_keys, run_config ) with self.dbconn.con.begin() as con: @@ -607,7 +607,7 @@ def get_agg_cte( sql = sql.group_by(*key_cols) sql = sql_apply_filters_idx_to_subquery(sql, keys, filters_idx) - sql = sql_apply_runconfig_filter(sql, tbl, self.primary_keys, run_config) + sql = sql_apply_runconfig_filter(sql, self.primary_keys, run_config) return (keys, sql.cte(name=f"{tbl.name}__update")) diff --git a/datapipe/sql_util.py b/datapipe/sql_util.py index 078a3ee9..4c82a789 100644 --- a/datapipe/sql_util.py +++ b/datapipe/sql_util.py @@ -1,3 +1,4 @@ +from collections import defaultdict from typing import Any, Dict, List, Optional import pandas as pd @@ -5,7 +6,7 @@ from sqlalchemy.sql.expression import and_, or_ from datapipe.run_config import RunConfig -from datapipe.types import IndexDF +from datapipe.types import IndexDF, LabelDict def sql_apply_filters_idx_to_subquery( @@ -59,24 +60,26 @@ def sql_apply_idx_filter_to_table( def sql_apply_runconfig_filter( sql: Any, - table: Table, - primary_keys: List[str], + keys: List[str], run_config: Optional[RunConfig] = None, ) -> Any: if run_config is not None: - sql = sql.where( - or_( - *[ - and_( - *[ - table.c[k] == v - for k, v in filter.items() if k in primary_keys - ] - ) - for filter in run_config.filters - ] + applicable_filter_keys_to_records: Dict[Any, List[LabelDict]] = defaultdict(list) + for record in run_config.filters: + applicable_filter_keys = [k for k in keys if k in record] + if len(applicable_filter_keys) > 0: + applicable_filter_keys_to_records[tuple(applicable_filter_keys)].append(record) + for applicable_filter_keys, records in applicable_filter_keys_to_records.items(): + sql = sql.where( + or_(*[ + tuple_(*[column(i) for i in applicable_filter_keys]).in_( + [ + tuple_(*[r[k] for k in applicable_filter_keys]) + for r in records + ]) + ] + ) ) - ) return sql diff --git a/datapipe/step/batch_transform.py b/datapipe/step/batch_transform.py index e8f72fe8..a0ed06f6 100644 --- a/datapipe/step/batch_transform.py +++ b/datapipe/step/batch_transform.py @@ -284,7 +284,7 @@ def mark_all_rows_unprocessed( ) sql = sql_apply_runconfig_filter( - update_sql, self.sql_table, self.primary_keys, run_config + update_sql, self.primary_keys, run_config ) # execute @@ -473,7 +473,7 @@ def _make_agg_of_agg(ctes, agg_col): ) out = sql_apply_filters_idx_to_subquery(out, self.transform_keys, filters_idx) - out = sql_apply_runconfig_filter(out, tr_tbl, self.transform_keys, run_config) + out = sql_apply_runconfig_filter(out, self.transform_keys, run_config) out = out.cte(name="transform") diff --git a/datapipe/store/database.py b/datapipe/store/database.py index cac2c7a1..8941289f 100644 --- a/datapipe/store/database.py +++ b/datapipe/store/database.py @@ -303,7 +303,7 @@ def read_rows_meta_pseudo_df( sql = select(*self.data_table.c) sql = sql_apply_runconfig_filter( - sql, self.data_table, self.primary_keys, run_config + sql, self.primary_keys, run_config ) with 
self.dbconn.con.execution_options(stream_results=True).begin() as con: From a458dced0040c43c8a273e35c2fb09feab542305 Mon Sep 17 00:00:00 2001 From: Alexander Kozlov Date: Mon, 19 Aug 2024 15:26:46 +0000 Subject: [PATCH 22/43] fix tests --- datapipe/step/batch_transform.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datapipe/step/batch_transform.py b/datapipe/step/batch_transform.py index a0ed06f6..002fb2e0 100644 --- a/datapipe/step/batch_transform.py +++ b/datapipe/step/batch_transform.py @@ -642,7 +642,7 @@ def alter_res_df(): for df in pd.read_sql_query(u1, con=con, chunksize=chunk_size): df = df[self.transform_keys] - if extra_filters is not None: + if extra_filters is not None and len(extra_filters) > 0: df__extra_filters = pd.DataFrame(extra_filters) if set(df__extra_filters.columns).intersection(df.columns): df = pd.merge(df, df__extra_filters) From 9ec0675a4ba97443869137de421894bf59f0fcf9 Mon Sep 17 00:00:00 2001 From: Alexander Kozlov Date: Mon, 19 Aug 2024 15:27:47 +0000 Subject: [PATCH 23/43] fix test --- tests/test_core_steps2.py | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/tests/test_core_steps2.py b/tests/test_core_steps2.py index 40106462..86ee2a90 100644 --- a/tests/test_core_steps2.py +++ b/tests/test_core_steps2.py @@ -466,18 +466,6 @@ def test_batch_transform_with_filters_as_str(dbconn): batch_transform_with_filters(dbconn, filters="filters_data", ds=ds) -def test_batch_transform_with_filters_as_datatable(dbconn): - ds = DataStore(dbconn, create_meta_table=True) - filters_data = pd.DataFrame([{"item_id": 0}, {"item_id": 1}, {"item_id": 2}]) - filters = ds.create_table( - "filters_data", table_store=TableStoreDB( - dbconn, "filters_data", [Column("item_id", Integer, primary_key=True)], True - ) - ) - filters.store_chunk(filters_data, now=0) - batch_transform_with_filters(dbconn, filters=filters) - - def test_batch_transform_with_filters_as_IndexDF(dbconn): batch_transform_with_filters( dbconn, filters=cast(IndexDF, pd.DataFrame([{"item_id": 0}, {"item_id": 1}, {"item_id": 2}])) From 220e3442941017609806f0a712b0a08544bb5cd2 Mon Sep 17 00:00:00 2001 From: Alexander Kozlov Date: Mon, 19 Aug 2024 15:36:19 +0000 Subject: [PATCH 24/43] fix tests --- tests/test_core_steps2.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/test_core_steps2.py b/tests/test_core_steps2.py index 86ee2a90..ba770f0a 100644 --- a/tests/test_core_steps2.py +++ b/tests/test_core_steps2.py @@ -139,7 +139,7 @@ def test_batch_transform_with_filter(dbconn): step.run_full( ds, run_config=RunConfig( - filters={"pipeline_id": 0}, + filters=[{"pipeline_id": 0}], ), ) @@ -169,7 +169,7 @@ def test_batch_transform_with_filter_not_in_transform_index(dbconn): step.run_full( ds, - run_config=RunConfig(filters={"pipeline_id": 0}), + run_config=RunConfig(filters=[{"pipeline_id": 0}]), ) assert_datatable_equal(tbl2, TEST_DF1_2.query("pipeline_id == 0")[["item_id", "a"]]) @@ -298,7 +298,7 @@ def gen_func(): func=gen_func, ds=ds, output_dts=[tbl], - run_config=RunConfig(filters={"pipeline_id": 0}), + run_config=RunConfig(filters=[{"pipeline_id": 0}]), ) assert_datatable_equal( From c4b97fb359ad2c0007b0906995c79f1235c2b157 Mon Sep 17 00:00:00 2001 From: Alexander Kozlov Date: Mon, 19 Aug 2024 15:44:32 +0000 Subject: [PATCH 25/43] fix filedir --- datapipe/store/filedir.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datapipe/store/filedir.py b/datapipe/store/filedir.py index e1bec6b6..8216deb4 100644 --- a/datapipe/store/filedir.py 
+++ b/datapipe/store/filedir.py @@ -493,7 +493,6 @@ def read_rows_meta_pseudo_df( ukeys = [] filepaths = [] looked_keys: Set[Any] = set() - for f in files: for filemath_match_suffix in self.filename_match_suffixes: m = re.match(filemath_match_suffix, f"{self.protocol_str}{f.path}") @@ -505,6 +504,7 @@ def read_rows_meta_pseudo_df( keys_values = tuple(m.group(attrname) for attrname in self.attrnames) if keys_values in looked_keys: continue + looked_keys.add(keys_values) for attrname, key_value in zip(self.attrnames, keys_values): ids[attrname].append(key_value) From 18d3f93bc33913c1fceec479a004fb23f9de33ce Mon Sep 17 00:00:00 2001 From: Alexander Kozlov Date: Mon, 19 Aug 2024 15:45:37 +0000 Subject: [PATCH 26/43] revert changes --- .vscode/settings.json | 4 +--- pyproject.toml | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index 75d01431..1c74547d 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -6,7 +6,7 @@ "python.testing.unittestEnabled": false, "python.testing.pytestEnabled": true, "python.linting.pylintEnabled": false, - "python.linting.flake8Enabled": true, + "python.linting.flake8Enabled": false, "python.formatting.provider": "none", "python.analysis.extraPaths": [ "${workspaceFolder}" @@ -43,6 +43,4 @@ "githubPullRequests.ignoredPullRequestBranches": [ "master" ], - "python.linting.pycodestyleEnabled": false, - "python.linting.enabled": true, } \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index ef422de1..99f8b4e1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "datapipe-core" -version = "0.14.0" +version = "0.14.0-alpha.2" description = "`datapipe` is a realtime incremental ETL library for Python application" readme = "README.md" repository = "https://github.com/epoch8/datapipe" From ecc68fde0232b8c6edf46b8cf2e1a9f23439ab78 Mon Sep 17 00:00:00 2001 From: Alexander Kozlov Date: Mon, 19 Aug 2024 17:37:32 +0000 Subject: [PATCH 27/43] refactoring filters --- CHANGELOG.md | 2 + Makefile | 2 - datapipe/run_config.py | 15 ++++ datapipe/step/batch_transform.py | 78 +++++++++---------- docs/source/concepts.md | 14 +++- .../filters-as-function/.gitignore | 1 + .../filters-as-function/app.py | 67 ++++++++++++++++ .../filters-as-function/input.jsonline | 9 +++ .../simple-example/.gitignore | 1 + .../simple-example/app.py | 61 +++++++++++++++ .../simple-example/input.jsonline | 9 +++ examples/model_inference/app.py | 4 +- examples/model_inference/playground.ipynb | 76 ------------------ tests/test_core_steps2.py | 3 +- 14 files changed, 215 insertions(+), 127 deletions(-) delete mode 100644 Makefile create mode 100644 examples/batch_transform_with_filters/filters-as-function/.gitignore create mode 100644 examples/batch_transform_with_filters/filters-as-function/app.py create mode 100644 examples/batch_transform_with_filters/filters-as-function/input.jsonline create mode 100644 examples/batch_transform_with_filters/simple-example/.gitignore create mode 100644 examples/batch_transform_with_filters/simple-example/app.py create mode 100644 examples/batch_transform_with_filters/simple-example/input.jsonline delete mode 100644 examples/model_inference/playground.ipynb diff --git a/CHANGELOG.md b/CHANGELOG.md index 48bbb2a5..ed99d5a0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,8 @@ Changes: See "Migration from v0.13 to v0.14" for more details +* `BatchTransform` has new argument `filters`. 
It's using to filter processing transform indexes using only that indexes that as indicated in `filters`. See `docs/concepts.md` for more details. + # 0.13.14 * Fix [#334](https://github.com/epoch8/datapipe/issues/334) diff --git a/Makefile b/Makefile deleted file mode 100644 index 00780f6a..00000000 --- a/Makefile +++ /dev/null @@ -1,2 +0,0 @@ -mypy: - mypy -p datapipe --ignore-missing-imports --follow-imports=silent --namespace-packages \ No newline at end of file diff --git a/datapipe/run_config.py b/datapipe/run_config.py index ee16063b..8c94b7fe 100644 --- a/datapipe/run_config.py +++ b/datapipe/run_config.py @@ -1,5 +1,7 @@ from dataclasses import dataclass, field from typing import Any, List, Dict, Optional + +import pandas as pd from datapipe.types import LabelDict @@ -21,3 +23,16 @@ def add_labels(cls, rc: Optional["RunConfig"], labels: LabelDict) -> "RunConfig" ) else: return RunConfig(labels=labels) + + @classmethod + def add_filters(cls, rc: Optional["RunConfig"], filters: List[LabelDict]) -> "RunConfig": + if rc is not None: + return RunConfig( + filters=list( + pd.DataFrame(filters) + .drop_duplicates() + .apply(lambda row : row.dropna().to_dict(), axis=1) + ), + ) + else: + return RunConfig(filters=filters) \ No newline at end of file diff --git a/datapipe/step/batch_transform.py b/datapipe/step/batch_transform.py index 002fb2e0..db857ec7 100644 --- a/datapipe/step/batch_transform.py +++ b/datapipe/step/batch_transform.py @@ -533,43 +533,37 @@ def _make_agg_of_agg(ctes, agg_col): ) return (self.transform_keys, sql) - def _apply_filters_to_run_config( + def _get_filters( self, ds: DataStore, run_config: Optional[RunConfig] = None - ) -> Optional[RunConfig]: + ) -> List[LabelDict]: if self.filters is None: - return run_config - else: - filters: List[LabelDict] - if isinstance(self.filters, str): - dt = ds.get_table(self.filters) - df = dt.get_data() - filters = cast(List[LabelDict], df[dt.primary_keys].to_dict(orient="records")) - elif isinstance(self.filters, pd.DataFrame): - filters = cast(List[LabelDict], self.filters.to_dict(orient="records")) - elif isinstance(self.filters, list) and all([isinstance(x, dict) for x in self.filters]): - filters = self.filters - elif isinstance(self.filters, Callable): # type: ignore - filters_func = cast(Callable[..., Union[List[LabelDict], IndexDF]], self.filters) - parameters = inspect.signature(filters_func).parameters - kwargs = { - **({"ds": ds} if "ds" in parameters else {}), - **({"run_config": run_config} if "run_config" in parameters else {}) - } - filters_res = filters_func(**kwargs) - if isinstance(filters_res, pd.DataFrame): - filters = cast(List[LabelDict], filters_res.to_dict(orient="records")) - else: - filters = filters_res - - if run_config is None: - return RunConfig(filters=filters) + return [] + + filters: List[LabelDict] + if isinstance(self.filters, str): + dt = ds.get_table(self.filters) + df = dt.get_data() + filters = cast(List[LabelDict], df[dt.primary_keys].to_dict(orient="records")) + elif isinstance(self.filters, pd.DataFrame): + filters = cast(List[LabelDict], self.filters.to_dict(orient="records")) + elif isinstance(self.filters, list) and all([isinstance(x, dict) for x in self.filters]): + filters = self.filters + elif isinstance(self.filters, Callable): # type: ignore + filters_func = cast(Callable[..., Union[List[LabelDict], IndexDF]], self.filters) + parameters = inspect.signature(filters_func).parameters + kwargs = { + **({"ds": ds} if "ds" in parameters else {}), + **({"run_config": run_config} if 
"run_config" in parameters else {}) + } + filters_res = filters_func(**kwargs) + if isinstance(filters_res, pd.DataFrame): + filters = cast(List[LabelDict], filters_res.to_dict(orient="records")) else: - run_config = copy.deepcopy(run_config) - filters = copy.deepcopy(filters) - run_config.filters += filters - return run_config + filters = filters_res + + return filters def get_status(self, ds: DataStore) -> StepStatus: return StepStatus( @@ -583,7 +577,7 @@ def get_changed_idx_count( ds: DataStore, run_config: Optional[RunConfig] = None, ) -> int: - run_config = self._apply_filters_to_run_config(ds, run_config) + filters = self._get_filters(ds, run_config) _, sql = self._build_changed_idx_sql(ds, run_config=run_config) with ds.meta_dbconn.con.begin() as con: @@ -609,7 +603,6 @@ def get_full_process_ids( - idx_size - количество индексов требующих обработки - idx_df - датафрейм без колонок с данными, только индексная колонка """ - run_config = self._apply_filters_to_run_config(ds, run_config) chunk_size = chunk_size or self.chunk_size with tracer.start_as_current_span("compute ids to process"): @@ -659,7 +652,6 @@ def get_change_list_process_ids( change_list: ChangeList, run_config: Optional[RunConfig] = None, ) -> Tuple[int, Iterable[IndexDF]]: - run_config = self._apply_filters_to_run_config(ds, run_config) with tracer.start_as_current_span("compute ids to process"): changes = [pd.DataFrame(columns=self.transform_keys)] @@ -702,8 +694,6 @@ def store_batch_result( process_ts: float, run_config: Optional[RunConfig] = None, ) -> ChangeList: - run_config = self._apply_filters_to_run_config(ds, run_config) - changes = ChangeList() if output_dfs is not None: @@ -749,8 +739,6 @@ def store_batch_err( process_ts: float, run_config: Optional[RunConfig] = None, ) -> None: - run_config = self._apply_filters_to_run_config(ds, run_config) - idx_records = idx.to_dict(orient="records") logger.error( @@ -853,8 +841,10 @@ def run_full( logger.info(f"Running: {self.name}") run_config = RunConfig.add_labels(run_config, {"step_name": self.name}) + filters = self._get_filters(ds, run_config) + run_config_with_filters = RunConfig.add_filters(run_config, filters) - (idx_count, idx_gen) = self.get_full_process_ids(ds=ds, run_config=run_config) + (idx_count, idx_gen) = self.get_full_process_ids(ds=ds, run_config=run_config_with_filters) logger.info(f"Batches to process {idx_count}") @@ -867,7 +857,7 @@ def run_full( idx_count=idx_count, idx_gen=idx_gen, process_fn=self.process_batch, - run_config=run_config, + run_config=run_config_with_filters, executor_config=self.executor_config, ) @@ -884,9 +874,11 @@ def run_changelist( executor = SingleThreadExecutor() run_config = RunConfig.add_labels(run_config, {"step_name": self.name}) + filters = self._get_filters(ds, run_config) + run_config_with_filters = RunConfig.add_filters(run_config, filters) (idx_count, idx_gen) = self.get_change_list_process_ids( - ds, change_list, run_config + ds, change_list, run_config_with_filters ) logger.info(f"Batches to process {idx_count}") @@ -902,7 +894,7 @@ def run_changelist( idx_count=idx_count, idx_gen=idx_gen, process_fn=self.process_batch, - run_config=run_config, + run_config=run_config_with_filters, executor_config=self.executor_config, ) diff --git a/docs/source/concepts.md b/docs/source/concepts.md index 59b66aab..c9d125d6 100644 --- a/docs/source/concepts.md +++ b/docs/source/concepts.md @@ -50,9 +50,17 @@ обработки. 
Имеет magic injection: -- Если у функции `func` есть аргумент `ds`, то туда передатся используемый `DataStore`. -- Если у функции `func` есть аргумент `run_config`, то туда передатся используемый текущий `RunConfig`. -- Если у функции `func` есть аргумент `idx`, то туда передатся используемый `IndexDF` -- текущие индексы обработки. +* Если у функции `func` есть аргумент `ds`, то туда передатся используемый `DataStore`. +* Если у функции `func` есть аргумент `run_config`, то туда передатся используемый текущий `RunConfig`. +* Если у функции `func` есть аргумент `idx`, то туда передатся используемый `IndexDF` -- текущие индексы обработки. + +Имеет поддержку фильтров `filters`, аналогичный по свойству в `RunConfig`. При использовании фильтров при вычислении текущих индексов обработки берутся только те значения, которые записаны в `filters`. Индексы, не попавшие в фильтрацию, не считаются обработанными и они могут быть обработаны, убрав фильтрацию. +Параметр принимает одно из значений: +* список ключи:значения, например `[{"idx": "0"}, {"idx": "1"}]` +* `IndexDF` (датафрейм с индексами) +* вызываемую функцию, которая на выходе возвращает либо список ключи:значения, либо `IndexDF`. Для этой функции есть также поддержка magic injection на аргументы `ds` и `run_config`. С помощью динамической функции и `run_config` можно реализовать довольно сложную логику фильтрации. + +Примеры запуска трансформации с фильтров можно найти в `examples/batch_transform_with_filters` ## `BatchGenerate` diff --git a/examples/batch_transform_with_filters/filters-as-function/.gitignore b/examples/batch_transform_with_filters/filters-as-function/.gitignore new file mode 100644 index 00000000..74d0d3a3 --- /dev/null +++ b/examples/batch_transform_with_filters/filters-as-function/.gitignore @@ -0,0 +1 @@ +output.jsonline diff --git a/examples/batch_transform_with_filters/filters-as-function/app.py b/examples/batch_transform_with_filters/filters-as-function/app.py new file mode 100644 index 00000000..9ec7a3fc --- /dev/null +++ b/examples/batch_transform_with_filters/filters-as-function/app.py @@ -0,0 +1,67 @@ +import pandas as pd +import sqlalchemy as sa + +from datapipe.compute import Catalog, DatapipeApp, Pipeline, Table +from datapipe.datatable import DataStore +from datapipe.run_config import RunConfig +from datapipe.step.batch_transform import BatchTransform +from datapipe.step.update_external_table import UpdateExternalTable +from datapipe.store.database import DBConn +from datapipe.store.pandas import TableStoreJsonLine + +dbconn = DBConn("sqlite+pysqlite3:///db.sqlite") +ds = DataStore(dbconn) + + +def filter_cases(ds: DataStore, run_config: RunConfig): + label = run_config.labels.get("stage", None) + if label == "label1": + return pd.DataFrame({"input_id": [1, 3, 4, 6, 9]}) + elif label == "label2": + return pd.DataFrame({"input_id": [2, 6, 9]}) + else: + return pd.DataFrame({"input_id": [6, 9]}) + + +def apply_transformation(input_df: pd.DataFrame) -> pd.DataFrame: + input_df["text"] = "Yay! I have been transformed." 
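+    # Only rows selected by filter_cases() (which reads the "stage" label from the
+    # active RunConfig) are passed to this function; all other rows are not processed.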
+ return input_df + + +input_tbl = Table( + name="input", + store=TableStoreJsonLine( + filename="input.jsonline", + primary_schema=[ + sa.Column("input_id", sa.Integer, primary_key=True), + ], + ), +) + +output_tbl = Table( + name="output", + store=TableStoreJsonLine( + filename="output.jsonline", + primary_schema=[ + sa.Column("input_id", sa.Integer, primary_key=True), + ], + ), +) + + +pipeline = Pipeline( + [ + UpdateExternalTable( + output=input_tbl, + ), + BatchTransform( + apply_transformation, + inputs=[input_tbl], + outputs=[output_tbl], + labels=[("stage", "label1"), ("stage", "label2")], + filters=filter_cases + ), + ] +) + +app = DatapipeApp(ds, Catalog({}), pipeline) diff --git a/examples/batch_transform_with_filters/filters-as-function/input.jsonline b/examples/batch_transform_with_filters/filters-as-function/input.jsonline new file mode 100644 index 00000000..7108cc6a --- /dev/null +++ b/examples/batch_transform_with_filters/filters-as-function/input.jsonline @@ -0,0 +1,9 @@ +{"input_id": 1, "text": "I need to be transformed when labels=stage=label1."} +{"input_id": 2, "text": "I need to be transformed when labels=stage=label2."} +{"input_id": 3, "text": "I need to be transformed when labels=stage=label1."} +{"input_id": 4, "text": "I need to be transformed when labels=stage=label1."} +{"input_id": 5, "text": "I need to be ignored."} +{"input_id": 6, "text": "I need to be transformed anytime."} +{"input_id": 7, "text": "I need to be ignored."} +{"input_id": 8, "text": "I need to be ignored."} +{"input_id": 9, "text": "I need to be transformed anytime."} diff --git a/examples/batch_transform_with_filters/simple-example/.gitignore b/examples/batch_transform_with_filters/simple-example/.gitignore new file mode 100644 index 00000000..74d0d3a3 --- /dev/null +++ b/examples/batch_transform_with_filters/simple-example/.gitignore @@ -0,0 +1 @@ +output.jsonline diff --git a/examples/batch_transform_with_filters/simple-example/app.py b/examples/batch_transform_with_filters/simple-example/app.py new file mode 100644 index 00000000..6dda3ae0 --- /dev/null +++ b/examples/batch_transform_with_filters/simple-example/app.py @@ -0,0 +1,61 @@ +import pandas as pd +import sqlalchemy as sa + +from datapipe.compute import Catalog, DatapipeApp, Pipeline, Table +from datapipe.datatable import DataStore +from datapipe.step.batch_transform import BatchTransform +from datapipe.step.update_external_table import UpdateExternalTable +from datapipe.store.database import DBConn +from datapipe.store.pandas import TableStoreJsonLine + +dbconn = DBConn("sqlite+pysqlite3:///db.sqlite") +ds = DataStore(dbconn) + + +def apply_transformation(input_df: pd.DataFrame) -> pd.DataFrame: + input_df["text"] = "Yay! I have been transformed." 
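+    # With the static `filters` list declared in the pipeline below, only rows with
+    # input_id 1, 3, 4, 6 and 9 reach this function; the remaining rows are skipped.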
+ return input_df + + +input_tbl = Table( + name="input", + store=TableStoreJsonLine( + filename="input.jsonline", + primary_schema=[ + sa.Column("input_id", sa.Integer, primary_key=True), + ], + ), +) + +output_tbl = Table( + name="output", + store=TableStoreJsonLine( + filename="output.jsonline", + primary_schema=[ + sa.Column("input_id", sa.Integer, primary_key=True), + ], + ), +) + + +pipeline = Pipeline( + [ + UpdateExternalTable( + output=input_tbl, + ), + BatchTransform( + apply_transformation, + inputs=[input_tbl], + outputs=[output_tbl], + filters=[ + {"input_id": 1}, + {"input_id": 3}, + {"input_id": 4}, + {"input_id": 6}, + {"input_id": 9} + ] + ), + ] +) + +app = DatapipeApp(ds, Catalog({}), pipeline) diff --git a/examples/batch_transform_with_filters/simple-example/input.jsonline b/examples/batch_transform_with_filters/simple-example/input.jsonline new file mode 100644 index 00000000..77185c50 --- /dev/null +++ b/examples/batch_transform_with_filters/simple-example/input.jsonline @@ -0,0 +1,9 @@ +{"input_id": 1, "text": "I need to be transformed."} +{"input_id": 2, "text": "I need to be ignored."} +{"input_id": 3, "text": "I need to be transformed."} +{"input_id": 4, "text": "I need to be transformed."} +{"input_id": 5, "text": "I need to be ignored."} +{"input_id": 6, "text": "I need to be transformed."} +{"input_id": 7, "text": "I need to be ignored."} +{"input_id": 8, "text": "I need to be ignored."} +{"input_id": 9, "text": "I need to be transformed"} diff --git a/examples/model_inference/app.py b/examples/model_inference/app.py index 26b03c3b..237ba1ac 100644 --- a/examples/model_inference/app.py +++ b/examples/model_inference/app.py @@ -35,8 +35,8 @@ def apply_model(input_df: pd.DataFrame, model_df: pd.DataFrame) -> pd.DataFrame: store=TableStoreJsonLine( filename="input.jsonline", primary_schema=[ - sa.Column("pipeline_id", sa.String, primary_key=True), sa.Column("input_id", sa.Integer, primary_key=True), + sa.Column("pipeline_id", sa.String, primary_key=True) ], ), ) @@ -57,8 +57,8 @@ def apply_model(input_df: pd.DataFrame, model_df: pd.DataFrame) -> pd.DataFrame: store=TableStoreJsonLine( filename="output.jsonline", primary_schema=[ - sa.Column("pipeline_id", sa.String, primary_key=True), sa.Column("input_id", sa.Integer, primary_key=True), + sa.Column("pipeline_id", sa.String, primary_key=True), sa.Column("model_id", sa.String, primary_key=True), ], ), diff --git a/examples/model_inference/playground.ipynb b/examples/model_inference/playground.ipynb deleted file mode 100644 index 0196f2e5..00000000 --- a/examples/model_inference/playground.ipynb +++ /dev/null @@ -1,76 +0,0 @@ -{ - "cells": [ - { - "cell_type": "code", - "execution_count": 1, - "metadata": {}, - "outputs": [], - "source": [ - "import app\n", - "step = app.app.steps[-1]" - ] - }, - { - "cell_type": "code", - "execution_count": 2, - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "WITH input_meta__update_ts AS \n", - "(SELECT input_id, max(input_meta.update_ts) AS update_ts \n", - "FROM input_meta GROUP BY input_id), \n", - "models_meta__update_ts AS \n", - "(SELECT model_id, max(models_meta.update_ts) AS update_ts \n", - "FROM models_meta GROUP BY model_id), \n", - "all__update_ts AS \n", - "(SELECT input_meta__update_ts.input_id AS input_id, models_meta__update_ts.model_id AS model_id, max(input_meta__update_ts.update_ts, models_meta__update_ts.update_ts) AS update_ts \n", - "FROM input_meta__update_ts FULL OUTER JOIN models_meta__update_ts ON :param_1), 
\n", - "transform AS \n", - "(SELECT input_id, model_id, apply_functions_634cbbc660_meta.process_ts AS process_ts \n", - "FROM apply_functions_634cbbc660_meta \n", - "WHERE apply_functions_634cbbc660_meta.is_success = true GROUP BY input_id, model_id)\n", - " SELECT coalesce(all__update_ts.input_id, transform.input_id) AS input_id, coalesce(all__update_ts.model_id, transform.model_id) AS model_id \n", - "FROM all__update_ts FULL OUTER JOIN transform ON all__update_ts.input_id = transform.input_id AND all__update_ts.model_id = transform.model_id \n", - "WHERE transform.process_ts < all__update_ts.update_ts OR all__update_ts.update_ts IS NULL OR transform.process_ts IS NULL\n" - ] - } - ], - "source": [ - "_, sql = step._build_changed_idx_sql(ds=app.app.ds)\n", - "print(str(sql))" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [] - } - ], - "metadata": { - "kernelspec": { - "display_name": "venv", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.10.6" - }, - "orig_nbformat": 4 - }, - "nbformat": 4, - "nbformat_minor": 2 -} diff --git a/tests/test_core_steps2.py b/tests/test_core_steps2.py index ba770f0a..ae153048 100644 --- a/tests/test_core_steps2.py +++ b/tests/test_core_steps2.py @@ -428,7 +428,8 @@ def batch_transform_with_filters(dbconn, filters: Filters, ds: Optional[DataStor products.store_chunk(PRODUCTS_DF, now=0) items.store_chunk(ITEMS_DF, now=0) - def update_df(products: pd.DataFrame, items: pd.DataFrame): + def update_df(products: pd.DataFrame, items: pd.DataFrame, run_config: RunConfig): + assert len(run_config.filters) == 3 merged_df = pd.merge(items, products, on=["product_id", "pipeline_id"]) merged_df["a"] = merged_df.apply(lambda x: x["a"] + x["b"], axis=1) From 0838d0b37c9c1d0741af407f39a91be6ec0eaaaa Mon Sep 17 00:00:00 2001 From: Alexander Kozlov Date: Mon, 19 Aug 2024 17:53:35 +0000 Subject: [PATCH 28/43] rename function --- datapipe/datatable.py | 6 +++--- datapipe/sql_util.py | 2 +- datapipe/step/batch_transform.py | 19 ++++++++++++++----- datapipe/store/database.py | 4 ++-- 4 files changed, 20 insertions(+), 11 deletions(-) diff --git a/datapipe/datatable.py b/datapipe/datatable.py index e3313e82..696061ae 100644 --- a/datapipe/datatable.py +++ b/datapipe/datatable.py @@ -17,7 +17,7 @@ from datapipe.sql_util import ( sql_apply_filters_idx_to_subquery, sql_apply_idx_filter_to_table, - sql_apply_runconfig_filter, + sql_apply_runconfig_filters, ) from datapipe.store.database import DBConn, MetaKey from datapipe.store.table_store import TableStore @@ -406,7 +406,7 @@ def get_stale_idx( ) ) - sql = sql_apply_runconfig_filter( + sql = sql_apply_runconfig_filters( sql, self.primary_keys, run_config ) @@ -607,7 +607,7 @@ def get_agg_cte( sql = sql.group_by(*key_cols) sql = sql_apply_filters_idx_to_subquery(sql, keys, filters_idx) - sql = sql_apply_runconfig_filter(sql, self.primary_keys, run_config) + sql = sql_apply_runconfig_filters(sql, self.primary_keys, run_config) return (keys, sql.cte(name=f"{tbl.name}__update")) diff --git a/datapipe/sql_util.py b/datapipe/sql_util.py index 4c82a789..2def491f 100644 --- a/datapipe/sql_util.py +++ b/datapipe/sql_util.py @@ -58,7 +58,7 @@ def sql_apply_idx_filter_to_table( return sql -def sql_apply_runconfig_filter( +def 
sql_apply_runconfig_filters( sql: Any, keys: List[str], run_config: Optional[RunConfig] = None, diff --git a/datapipe/step/batch_transform.py b/datapipe/step/batch_transform.py index db857ec7..57b48575 100644 --- a/datapipe/step/batch_transform.py +++ b/datapipe/step/batch_transform.py @@ -48,7 +48,7 @@ from datapipe.run_config import RunConfig from datapipe.sql_util import ( sql_apply_filters_idx_to_subquery, - sql_apply_runconfig_filter, + sql_apply_runconfig_filters, ) from datapipe.store.database import DBConn from datapipe.types import ( @@ -283,7 +283,7 @@ def mark_all_rows_unprocessed( .where(self.sql_table.c.is_success == True) ) - sql = sql_apply_runconfig_filter( + sql = sql_apply_runconfig_filters( update_sql, self.primary_keys, run_config ) @@ -473,7 +473,7 @@ def _make_agg_of_agg(ctes, agg_col): ) out = sql_apply_filters_idx_to_subquery(out, self.transform_keys, filters_idx) - out = sql_apply_runconfig_filter(out, self.transform_keys, run_config) + out = sql_apply_runconfig_filters(out, self.transform_keys, run_config) out = out.cte(name="transform") @@ -560,8 +560,18 @@ def _get_filters( filters_res = filters_func(**kwargs) if isinstance(filters_res, pd.DataFrame): filters = cast(List[LabelDict], filters_res.to_dict(orient="records")) - else: + elif isinstance(filters_res, list) and all([isinstance(x, dict) for x in filters_res]): filters = filters_res + else: + raise TypeError( + "Function filters must return pd.Dataframe or list of key:values." + f" Returned type: {type(filters_res)}" + ) + else: + raise TypeError( + "Argument filters must be pd.Dataframe, list of key:values or function." + f" Got type: {type(self.filters)}" + ) return filters @@ -577,7 +587,6 @@ def get_changed_idx_count( ds: DataStore, run_config: Optional[RunConfig] = None, ) -> int: - filters = self._get_filters(ds, run_config) _, sql = self._build_changed_idx_sql(ds, run_config=run_config) with ds.meta_dbconn.con.begin() as con: diff --git a/datapipe/store/database.py b/datapipe/store/database.py index 8941289f..69ec416b 100644 --- a/datapipe/store/database.py +++ b/datapipe/store/database.py @@ -13,7 +13,7 @@ from sqlalchemy.sql.expression import delete, select from datapipe.run_config import RunConfig -from datapipe.sql_util import sql_apply_idx_filter_to_table, sql_apply_runconfig_filter +from datapipe.sql_util import sql_apply_idx_filter_to_table, sql_apply_runconfig_filters from datapipe.store.table_store import TableStore from datapipe.types import DataDF, DataSchema, IndexDF, MetaSchema, OrmTable, TAnyDF @@ -302,7 +302,7 @@ def read_rows_meta_pseudo_df( ) -> Iterator[DataDF]: sql = select(*self.data_table.c) - sql = sql_apply_runconfig_filter( + sql = sql_apply_runconfig_filters( sql, self.primary_keys, run_config ) From d2045add5645ec53b1b8fe156568d7537084d8aa Mon Sep 17 00:00:00 2001 From: Alexander Kozlov Date: Mon, 19 Aug 2024 18:07:06 +0000 Subject: [PATCH 29/43] fix tests --- datapipe/run_config.py | 2 +- datapipe/step/batch_transform.py | 1 + tests/test_core_steps2.py | 4 ++-- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/datapipe/run_config.py b/datapipe/run_config.py index 8c94b7fe..6998f973 100644 --- a/datapipe/run_config.py +++ b/datapipe/run_config.py @@ -29,7 +29,7 @@ def add_filters(cls, rc: Optional["RunConfig"], filters: List[LabelDict]) -> "Ru if rc is not None: return RunConfig( filters=list( - pd.DataFrame(filters) + pd.concat([pd.DataFrame(rc.filters), pd.DataFrame(filters)], ignore_index=True) .drop_duplicates() .apply(lambda row : row.dropna().to_dict(), 
axis=1) ), diff --git a/datapipe/step/batch_transform.py b/datapipe/step/batch_transform.py index 57b48575..84ab08f2 100644 --- a/datapipe/step/batch_transform.py +++ b/datapipe/step/batch_transform.py @@ -852,6 +852,7 @@ def run_full( run_config = RunConfig.add_labels(run_config, {"step_name": self.name}) filters = self._get_filters(ds, run_config) run_config_with_filters = RunConfig.add_filters(run_config, filters) + print(f"{run_config_with_filters.filters=}") (idx_count, idx_gen) = self.get_full_process_ids(ds=ds, run_config=run_config_with_filters) diff --git a/tests/test_core_steps2.py b/tests/test_core_steps2.py index ae153048..5c952efb 100644 --- a/tests/test_core_steps2.py +++ b/tests/test_core_steps2.py @@ -116,7 +116,7 @@ def test_batch_transform(dbconn): assert all(meta_df["process_ts"] == process_ts) -def test_batch_transform_with_filter(dbconn): +def test_batch_transform_with_filter_in_run_config(dbconn): ds = DataStore(dbconn, create_meta_table=True) tbl1 = ds.create_table( @@ -146,7 +146,7 @@ def test_batch_transform_with_filter(dbconn): assert_datatable_equal(tbl2, TEST_DF1_1.query("pipeline_id == 0")) -def test_batch_transform_with_filter_not_in_transform_index(dbconn): +def test_batch_transform_with_filter_in_run_config_not_in_transform_index(dbconn): ds = DataStore(dbconn, create_meta_table=True) tbl1 = ds.create_table( From 7310152f3fe9a248da7de09de6d25aa8ccfb4f73 Mon Sep 17 00:00:00 2001 From: Alexander Kozlov Date: Mon, 19 Aug 2024 18:07:28 +0000 Subject: [PATCH 30/43] rm print --- datapipe/step/batch_transform.py | 1 - 1 file changed, 1 deletion(-) diff --git a/datapipe/step/batch_transform.py b/datapipe/step/batch_transform.py index 84ab08f2..57b48575 100644 --- a/datapipe/step/batch_transform.py +++ b/datapipe/step/batch_transform.py @@ -852,7 +852,6 @@ def run_full( run_config = RunConfig.add_labels(run_config, {"step_name": self.name}) filters = self._get_filters(ds, run_config) run_config_with_filters = RunConfig.add_filters(run_config, filters) - print(f"{run_config_with_filters.filters=}") (idx_count, idx_gen) = self.get_full_process_ids(ds=ds, run_config=run_config_with_filters) From 267ca06fc76dceb06eb44bc97ddf7bc2b4df67cb Mon Sep 17 00:00:00 2001 From: Alexander Kozlov Date: Mon, 19 Aug 2024 20:32:04 +0000 Subject: [PATCH 31/43] fix tests --- datapipe/datatable.py | 2 +- datapipe/sql_util.py | 18 +------- datapipe/step/batch_transform.py | 9 ++-- tests/test_complex_pipeline.py | 70 ++++++++++++++++++++++++-------- 4 files changed, 60 insertions(+), 39 deletions(-) diff --git a/datapipe/datatable.py b/datapipe/datatable.py index 696061ae..5f4bf484 100644 --- a/datapipe/datatable.py +++ b/datapipe/datatable.py @@ -607,7 +607,7 @@ def get_agg_cte( sql = sql.group_by(*key_cols) sql = sql_apply_filters_idx_to_subquery(sql, keys, filters_idx) - sql = sql_apply_runconfig_filters(sql, self.primary_keys, run_config) + sql = sql_apply_runconfig_filters(sql, keys, run_config) return (keys, sql.cte(name=f"{tbl.name}__update")) diff --git a/datapipe/sql_util.py b/datapipe/sql_util.py index 2def491f..8fee41fa 100644 --- a/datapipe/sql_util.py +++ b/datapipe/sql_util.py @@ -64,22 +64,8 @@ def sql_apply_runconfig_filters( run_config: Optional[RunConfig] = None, ) -> Any: if run_config is not None: - applicable_filter_keys_to_records: Dict[Any, List[LabelDict]] = defaultdict(list) - for record in run_config.filters: - applicable_filter_keys = [k for k in keys if k in record] - if len(applicable_filter_keys) > 0: - 
applicable_filter_keys_to_records[tuple(applicable_filter_keys)].append(record) - for applicable_filter_keys, records in applicable_filter_keys_to_records.items(): - sql = sql.where( - or_(*[ - tuple_(*[column(i) for i in applicable_filter_keys]).in_( - [ - tuple_(*[r[k] for k in applicable_filter_keys]) - for r in records - ]) - ] - ) - ) + filters_idx = pd.DataFrame(run_config.filters) + sql = sql_apply_filters_idx_to_subquery(sql, keys, filters_idx) return sql diff --git a/datapipe/step/batch_transform.py b/datapipe/step/batch_transform.py index 57b48575..fd48bf02 100644 --- a/datapipe/step/batch_transform.py +++ b/datapipe/step/batch_transform.py @@ -46,10 +46,7 @@ from datapipe.datatable import DataStore, DataTable, MetaTable from datapipe.executor import Executor, ExecutorConfig, SingleThreadExecutor from datapipe.run_config import RunConfig -from datapipe.sql_util import ( - sql_apply_filters_idx_to_subquery, - sql_apply_runconfig_filters, -) +from datapipe.sql_util import sql_apply_filters_idx_to_subquery, sql_apply_runconfig_filters from datapipe.store.database import DBConn from datapipe.types import ( ChangeList, @@ -573,6 +570,9 @@ def _get_filters( f" Got type: {type(self.filters)}" ) + keys = set([key for keys in filters for key in keys]) + if not all(len(filter) == len(keys) for filter in filters): + raise ValueError("Size of keys in each filters must have same length") return filters def get_status(self, ds: DataStore) -> StepStatus: @@ -643,7 +643,6 @@ def alter_res_df(): with ds.meta_dbconn.con.begin() as con: for df in pd.read_sql_query(u1, con=con, chunksize=chunk_size): df = df[self.transform_keys] - if extra_filters is not None and len(extra_filters) > 0: df__extra_filters = pd.DataFrame(extra_filters) if set(df__extra_filters.columns).intersection(df.columns): diff --git a/tests/test_complex_pipeline.py b/tests/test_complex_pipeline.py index 8cffc548..0102fdc6 100644 --- a/tests/test_complex_pipeline.py +++ b/tests/test_complex_pipeline.py @@ -303,6 +303,17 @@ def test_complex_transform_with_filters(dbconn): True ) ), + "tbl_subset__has__image": Table( + store=TableStoreDB( + dbconn, + "tbl_subset__has__image", + [ + Column("image_id", Integer, primary_key=True), + Column("subset_id", Integer, primary_key=True), + ], + True + ) + ), "tbl_prediction": Table( store=TableStoreDB( dbconn, @@ -319,6 +330,7 @@ def test_complex_transform_with_filters(dbconn): dbconn, "tbl_output", [ + Column("subset_id", Integer, primary_key=True), Column("model_id", Integer, primary_key=True), Column("count", Integer), ], @@ -330,20 +342,26 @@ def test_complex_transform_with_filters(dbconn): def gen_tbl(df): yield df - test_df__image = pd.DataFrame( - {"image_id": [0, 1, 2, 3]} - ) + test_df__image = pd.DataFrame({ + "image_id": [0, 1, 2, 3] + }) + test_df__subset__has__image = pd.DataFrame({ + "image_id": [0, 1, 2, 3], + "subset_id": [0, 0, 1, 1] + }) test_df__prediction = pd.DataFrame({ "image_id": [0, 1, 2, 3, 0, 1, 2, 3, 0, 1, 2, 3], - "model_id": [0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1] + "model_id": [0, 0, 0, 0, 1, 1, 1, 1, 2, 2, 2, 2] }) - def count_func(df__image: pd.DataFrame, df__prediction: pd.DataFrame): + def count_func( + df__image: pd.DataFrame, + df__subset__has__image: pd.DataFrame, + df__prediction: pd.DataFrame, + ): + df__image = pd.merge(df__image, df__subset__has__image, on=["image_id"]) df__image = pd.merge(df__image, df__prediction, on=["image_id"]) - print(f"{df__image=}") - print(f"{df__prediction=}") - df__output = 
df__image.groupby("model_id").agg(len).reset_index().rename(columns={"image_id": "count"}) - print(f"{df__output=}") + df__output = df__image.groupby(["subset_id", "model_id"]).agg(len).reset_index().rename(columns={"image_id": "count"}) return df__output pipeline = Pipeline( @@ -353,6 +371,11 @@ def count_func(df__image: pd.DataFrame, df__prediction: pd.DataFrame): outputs=["tbl_image"], kwargs=dict(df=test_df__image), ), + BatchGenerate( + func=gen_tbl, + outputs=["tbl_subset__has__image"], + kwargs=dict(df=test_df__subset__has__image), + ), BatchGenerate( func=gen_tbl, outputs=["tbl_prediction"], @@ -360,19 +383,32 @@ def count_func(df__image: pd.DataFrame, df__prediction: pd.DataFrame): ), BatchTransform( func=count_func, - inputs=["tbl_image", "tbl_prediction"], + inputs=["tbl_image", "tbl_subset__has__image", "tbl_prediction"], outputs=["tbl_output"], - transform_keys=["model_id"], + transform_keys=["subset_id", "model_id"], chunk_size=6, - # filters=[{"image_id": 0}, {"image_id": 1}, {"image_id": 2}] + filters=[ + {"subset_id": 0, "model_id": 1}, + {"subset_id": 0, "model_id": 2}, + ] ), ] ) steps = build_compute(ds, catalog, pipeline) run_steps(ds, steps) + test__df_output = count_func( + df__image=test_df__image, + df__subset__has__image=test_df__subset__has__image[ + test_df__subset__has__image["subset_id"] == 0 + ], + df__prediction=test_df__prediction[ + test_df__prediction["model_id"].isin([1, 2]) + ] + ) + # print(f"{test__df_output=}") print(ds.get_table("tbl_output").get_data()) - # assert_df_equal( - # ds.get_table("tbl_output").get_data(), - # count_func(test_df__image, test_df__prediction), - # index_cols=["model_id"] - # ) + assert_df_equal( + ds.get_table("tbl_output").get_data(), + test__df_output, + index_cols=["model_id"] + ) From a61aedb7f01ed27d11f9ac8b8c2f119633600896 Mon Sep 17 00:00:00 2001 From: Alexander Kozlov Date: Mon, 19 Aug 2024 20:37:49 +0000 Subject: [PATCH 32/43] add tests examples --- .github/workflows/test_examples.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/test_examples.yaml b/.github/workflows/test_examples.yaml index 5fc3d59b..b3049f1a 100644 --- a/.github/workflows/test_examples.yaml +++ b/.github/workflows/test_examples.yaml @@ -25,6 +25,8 @@ jobs: - many_to_zero - model_inference - one_to_many_pipeline + - batch_transform_with_filters/simple-example + - batch_transform_with_filters/filters-as-function executor: - SingleThreadExecutor - RayExecutor From 38ba3a8fc19016b9ff8b68c9e8bb33c7229f98ff Mon Sep 17 00:00:00 2001 From: Alexander Kozlov Date: Mon, 19 Aug 2024 20:38:45 +0000 Subject: [PATCH 33/43] fix tests --- tests/test_table_store.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_table_store.py b/tests/test_table_store.py index c0a0d84c..2929b938 100644 --- a/tests/test_table_store.py +++ b/tests/test_table_store.py @@ -472,7 +472,7 @@ def test_read_rows_meta_pseudo_df_with_runconfig(store: TableStore, test_df: pd. 
assert_ts_contains(store, test_df) # TODO проверять, что runconfig реально влияет на результирующие данные - pseudo_df_iter = store.read_rows_meta_pseudo_df(run_config=RunConfig(filters={"a": 1})) + pseudo_df_iter = store.read_rows_meta_pseudo_df(run_config=RunConfig(filters=[{"a": 1}])) assert isinstance(pseudo_df_iter, Iterable) for pseudo_df in pseudo_df_iter: assert isinstance(pseudo_df, DataDF) From d1f6a99fe7f54f710290b0fe5925b718df7ba508 Mon Sep 17 00:00:00 2001 From: Alexander Kozlov Date: Tue, 20 Aug 2024 17:28:09 +0000 Subject: [PATCH 34/43] add test_complex_transform_with_filters2 --- tests/test_complex_pipeline.py | 119 +++++++++++++++++++++++++++++---- 1 file changed, 105 insertions(+), 14 deletions(-) diff --git a/tests/test_complex_pipeline.py b/tests/test_complex_pipeline.py index 0102fdc6..db07916d 100644 --- a/tests/test_complex_pipeline.py +++ b/tests/test_complex_pipeline.py @@ -343,15 +343,15 @@ def gen_tbl(df): yield df test_df__image = pd.DataFrame({ - "image_id": [0, 1, 2, 3] + "image_id": range(1000) }) test_df__subset__has__image = pd.DataFrame({ - "image_id": [0, 1, 2, 3], - "subset_id": [0, 0, 1, 1] + "image_id": range(1000), + "subset_id": [0 for _ in range(200)] + [1 for _ in range(200)] + [2 for _ in range(600)] }) test_df__prediction = pd.DataFrame({ - "image_id": [0, 1, 2, 3, 0, 1, 2, 3, 0, 1, 2, 3], - "model_id": [0, 0, 0, 0, 1, 1, 1, 1, 2, 2, 2, 2] + "image_id": list(range(1000)) + list(range(1000)), + "model_id": [0] * 1000 + [1] * 1000 }) def count_func( @@ -386,10 +386,10 @@ def count_func( inputs=["tbl_image", "tbl_subset__has__image", "tbl_prediction"], outputs=["tbl_output"], transform_keys=["subset_id", "model_id"], - chunk_size=6, + chunk_size=100, filters=[ - {"subset_id": 0, "model_id": 1}, - {"subset_id": 0, "model_id": 2}, + {"subset_id": 0}, + {"subset_id": 1}, ] ), ] @@ -399,16 +399,107 @@ def count_func( test__df_output = count_func( df__image=test_df__image, df__subset__has__image=test_df__subset__has__image[ - test_df__subset__has__image["subset_id"] == 0 + test_df__subset__has__image["subset_id"].isin([0, 1]) ], - df__prediction=test_df__prediction[ - test_df__prediction["model_id"].isin([1, 2]) - ] + df__prediction=test_df__prediction ) - # print(f"{test__df_output=}") - print(ds.get_table("tbl_output").get_data()) assert_df_equal( ds.get_table("tbl_output").get_data(), test__df_output, index_cols=["model_id"] ) + + +def test_complex_transform_with_filters2(dbconn): + ds = DataStore(dbconn, create_meta_table=True) + catalog = Catalog({ + "tbl_image": Table( + store=TableStoreDB( + dbconn, + "tbl_image", + [ + Column("image_id", Integer, primary_key=True), + ], + True + ) + ), + "tbl_model": Table( + store=TableStoreDB( + dbconn, + "tbl_model", + [ + Column("model_id", Integer, primary_key=True), + ], + True + ) + ), + "tbl_prediction": Table( + store=TableStoreDB( + dbconn, + "tbl_prediction", + [ + Column("image_id", Integer, primary_key=True), + Column("model_id", Integer, primary_key=True), + ], + True + ) + ) + }) + + def gen_tbl(df): + yield df + + test_df__image = pd.DataFrame({ + "image_id": range(1000) + }) + test_df__model = pd.DataFrame({ + "model_id": [0, 1, 2, 3, 4] + }) + + def filters_images(): + return [{"image_id": i} for i in range(500)] + + + def make_prediction( + df__image: pd.DataFrame, + df__model: pd.DataFrame, + ): + df__prediction = pd.merge(df__image, df__model, how="cross") + return df__prediction + + pipeline = Pipeline( + [ + BatchGenerate( + func=gen_tbl, + outputs=["tbl_image"], + 
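+                # kwargs are forwarded to gen_tbl, so the generator simply yields test_df__image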
kwargs=dict(df=test_df__image), + ), + BatchGenerate( + func=gen_tbl, + outputs=["tbl_model"], + kwargs=dict(df=test_df__model), + ), + BatchTransform( + func=make_prediction, + inputs=["tbl_image", "tbl_model"], + outputs=["tbl_prediction"], + transform_keys=["image_id", "model_id"], + chunk_size=100, + filters=filters_images + ), + ] + ) + steps = build_compute(ds, catalog, pipeline) + run_steps(ds, steps) + test__df_output = make_prediction( + df__image=test_df__image[ + test_df__image["image_id"].isin([r["image_id"] for r in filters_images()]) + ], + # df__image=test_df__image, + df__model=test_df__model + ) + assert_df_equal( + ds.get_table("tbl_prediction").get_data(), + test__df_output, + index_cols=["model_id"] + ) From 586afc0dc38ae8a539c74714d7c168725e4138b1 Mon Sep 17 00:00:00 2001 From: Alexander Kozlov Date: Tue, 20 Aug 2024 17:42:46 +0000 Subject: [PATCH 35/43] fix tests --- datapipe/step/batch_transform.py | 2 -- tests/test_complex_pipeline.py | 7 +++---- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/datapipe/step/batch_transform.py b/datapipe/step/batch_transform.py index fd48bf02..c194cbe9 100644 --- a/datapipe/step/batch_transform.py +++ b/datapipe/step/batch_transform.py @@ -647,8 +647,6 @@ def alter_res_df(): df__extra_filters = pd.DataFrame(extra_filters) if set(df__extra_filters.columns).intersection(df.columns): df = pd.merge(df, df__extra_filters) - else: - df = pd.merge(df, df__extra_filters, how="cross") yield df diff --git a/tests/test_complex_pipeline.py b/tests/test_complex_pipeline.py index db07916d..2b1a54ea 100644 --- a/tests/test_complex_pipeline.py +++ b/tests/test_complex_pipeline.py @@ -450,14 +450,14 @@ def gen_tbl(df): yield df test_df__image = pd.DataFrame({ - "image_id": range(1000) + "image_id": range(100) }) test_df__model = pd.DataFrame({ "model_id": [0, 1, 2, 3, 4] }) def filters_images(): - return [{"image_id": i} for i in range(500)] + return [{"image_id": i} for i in range(50)] def make_prediction( @@ -484,7 +484,7 @@ def make_prediction( inputs=["tbl_image", "tbl_model"], outputs=["tbl_prediction"], transform_keys=["image_id", "model_id"], - chunk_size=100, + chunk_size=1000, filters=filters_images ), ] @@ -495,7 +495,6 @@ def make_prediction( df__image=test_df__image[ test_df__image["image_id"].isin([r["image_id"] for r in filters_images()]) ], - # df__image=test_df__image, df__model=test_df__model ) assert_df_equal( From 33b482d741a95982c2656e0c38f0aae5afe6eb3a Mon Sep 17 00:00:00 2001 From: Alexander Kozlov Date: Tue, 20 Aug 2024 18:06:51 +0000 Subject: [PATCH 36/43] fix tests, added new ValueError --- datapipe/step/batch_transform.py | 4 +- tests/test_core_steps2.py | 120 +++++++++++++------------------ 2 files changed, 54 insertions(+), 70 deletions(-) diff --git a/datapipe/step/batch_transform.py b/datapipe/step/batch_transform.py index c194cbe9..7c9a38c2 100644 --- a/datapipe/step/batch_transform.py +++ b/datapipe/step/batch_transform.py @@ -572,7 +572,9 @@ def _get_filters( keys = set([key for keys in filters for key in keys]) if not all(len(filter) == len(keys) for filter in filters): - raise ValueError("Size of keys in each filters must have same length") + raise ValueError("Size of keys from filters must have same length") + if not all([key in self.transform_keys for key in keys]): + raise ValueError(f"Keys from filters must be in transform_keys={self.transform_keys}.") return filters def get_status(self, ds: DataStore) -> StepStatus: diff --git a/tests/test_core_steps2.py b/tests/test_core_steps2.py index 
5c952efb..a6b5001c 100644 --- a/tests/test_core_steps2.py +++ b/tests/test_core_steps2.py @@ -146,35 +146,6 @@ def test_batch_transform_with_filter_in_run_config(dbconn): assert_datatable_equal(tbl2, TEST_DF1_1.query("pipeline_id == 0")) -def test_batch_transform_with_filter_in_run_config_not_in_transform_index(dbconn): - ds = DataStore(dbconn, create_meta_table=True) - - tbl1 = ds.create_table( - "tbl1", table_store=TableStoreDB(dbconn, "tbl1_data", TEST_SCHEMA1, True) - ) - - tbl2 = ds.create_table( - "tbl2", table_store=TableStoreDB(dbconn, "tbl2_data", TEST_SCHEMA2, True) - ) - - tbl1.store_chunk(TEST_DF1_2, now=0) - - step = BatchTransformStep( - ds=ds, - name="test", - func=lambda df: df[["item_id", "a"]], - input_dts=[tbl1], - output_dts=[tbl2], - ) - - step.run_full( - ds, - run_config=RunConfig(filters=[{"pipeline_id": 0}]), - ) - - assert_datatable_equal(tbl2, TEST_DF1_2.query("pipeline_id == 0")[["item_id", "a"]]) - - def test_batch_transform_with_dt_on_input_and_output(dbconn): ds = DataStore(dbconn, create_meta_table=True) @@ -390,69 +361,80 @@ def update_df(products: pd.DataFrame, items: pd.DataFrame): assert_df_equal(items2.get_data(), items2_df, index_cols=["item_id", "pipeline_id"]) -PRODUCTS_DF = pd.DataFrame( - { - "product_id": list(range(2)), - "pipeline_id": list(range(2)), - "b": range(10, 12), - } -) - -ITEMS_DF = pd.DataFrame( - { - "item_id": list(range(5)) * 2, - "pipeline_id": list(range(2)) * 5, - "product_id": list(range(2)) * 5, - "a": range(10), - } -) - - def batch_transform_with_filters(dbconn, filters: Filters, ds: Optional[DataStore] = None): if ds is None: ds = DataStore(dbconn, create_meta_table=True) - products = ds.create_table( - "products", - table_store=TableStoreDB(dbconn, "products_data", PRODUCTS_SCHEMA, True), + item = ds.create_table( + "item", + table_store=TableStoreDB( + dbconn, + "item", + [Column("item_id", Integer, primary_key=True)], + True + ), ) - items = ds.create_table( - "items", table_store=TableStoreDB(dbconn, "items_data", ITEMS_SCHEMA, True) + inner_item = ds.create_table( + "inner_item", table_store=TableStoreDB( + dbconn, + "inner_item", + [ + Column("item_id", Integer, primary_key=True), + Column("inner_item_id", Integer, primary_key=True) + ], + True + ) ) - items2 = ds.create_table( - "items2", table_store=TableStoreDB(dbconn, "items2_data", ITEMS_SCHEMA, True) + output = ds.create_table( + "output", table_store=TableStoreDB( + dbconn, + "output", + [ + Column("item_id", Integer, primary_key=True), + Column("inner_item_id", Integer, primary_key=True) + ], + True + ) ) - products.store_chunk(PRODUCTS_DF, now=0) - items.store_chunk(ITEMS_DF, now=0) + test_df__item = pd.DataFrame( + { + "item_id": list(range(10)), + } + ) - def update_df(products: pd.DataFrame, items: pd.DataFrame, run_config: RunConfig): - assert len(run_config.filters) == 3 - merged_df = pd.merge(items, products, on=["product_id", "pipeline_id"]) - merged_df["a"] = merged_df.apply(lambda x: x["a"] + x["b"], axis=1) + test_df__inner_item = pd.DataFrame( + { + "item_id": list(range(10)) * 10, + "inner_item_id": list(range(100)), + } + ) + item.store_chunk(test_df__item, now=0) + inner_item.store_chunk(test_df__inner_item, now=0) - return merged_df[["item_id", "pipeline_id", "product_id", "a"]] + def update_df(df__item: pd.DataFrame, df__inner_item: pd.DataFrame): + merged_df = pd.merge(df__item, df__inner_item, on=["item_id"]) + return merged_df step = BatchTransformStep( ds=ds, name="test", func=update_df, - input_dts=[products, items], - 
output_dts=[items2], + input_dts=[item, inner_item], + output_dts=[output], filters=filters ) step.run_full(ds) - merged_df = pd.merge(ITEMS_DF, PRODUCTS_DF, on=["product_id", "pipeline_id"]) - merged_df["a"] = merged_df.apply(lambda x: x["a"] + x["b"], axis=1) - - items2_df = merged_df[["item_id", "pipeline_id", "product_id", "a"]] - items2_df = items2_df[items2_df["item_id"].isin([0, 1, 2])] + test_df__output = update_df( + df__item=test_df__item[test_df__item["item_id"].isin([0, 1, 2])], + df__inner_item=test_df__inner_item[test_df__inner_item["item_id"].isin([0, 1, 2])] + ) - assert_df_equal(items2.get_data(), items2_df, index_cols=["item_id", "pipeline_id"]) + assert_df_equal(output.get_data(), test_df__output, index_cols=["item_id", "inner_item_id"]) def test_batch_transform_with_filters_as_str(dbconn): From bf69bf815dd81c83bdb22623daf612d38f3270c0 Mon Sep 17 00:00:00 2001 From: Alexander Kozlov Date: Tue, 20 Aug 2024 18:39:14 +0000 Subject: [PATCH 37/43] add new tests --- datapipe/step/batch_transform.py | 14 -------------- tests/test_complex_pipeline.py | 15 ++++++++++++--- 2 files changed, 12 insertions(+), 17 deletions(-) diff --git a/datapipe/step/batch_transform.py b/datapipe/step/batch_transform.py index 7c9a38c2..2614589b 100644 --- a/datapipe/step/batch_transform.py +++ b/datapipe/step/batch_transform.py @@ -632,24 +632,10 @@ def get_full_process_ids( order=self.order, ) - # Список ключей из фильтров, которые нужно добавить в результат - extra_filters: Optional[List[Dict[str, Any]]] = None - if run_config is not None: - extra_filters = [{ - k: v - for k, v in filter.items() - if k not in join_keys - } for filter in run_config.filters] - def alter_res_df(): with ds.meta_dbconn.con.begin() as con: for df in pd.read_sql_query(u1, con=con, chunksize=chunk_size): df = df[self.transform_keys] - if extra_filters is not None and len(extra_filters) > 0: - df__extra_filters = pd.DataFrame(extra_filters) - if set(df__extra_filters.columns).intersection(df.columns): - df = pd.merge(df, df__extra_filters) - yield df return math.ceil(idx_count / chunk_size), alter_res_df() diff --git a/tests/test_complex_pipeline.py b/tests/test_complex_pipeline.py index 2b1a54ea..215e9d19 100644 --- a/tests/test_complex_pipeline.py +++ b/tests/test_complex_pipeline.py @@ -1,4 +1,5 @@ import pandas as pd +import pytest from sqlalchemy import Column from sqlalchemy.sql.sqltypes import Integer, String @@ -410,7 +411,7 @@ def count_func( ) -def test_complex_transform_with_filters2(dbconn): +def complex_transform_with_filters2_by_N(dbconn, N): ds = DataStore(dbconn, create_meta_table=True) catalog = Catalog({ "tbl_image": Table( @@ -450,14 +451,14 @@ def gen_tbl(df): yield df test_df__image = pd.DataFrame({ - "image_id": range(100) + "image_id": range(N) }) test_df__model = pd.DataFrame({ "model_id": [0, 1, 2, 3, 4] }) def filters_images(): - return [{"image_id": i} for i in range(50)] + return [{"image_id": i} for i in range(N // 2)] def make_prediction( @@ -502,3 +503,11 @@ def make_prediction( test__df_output, index_cols=["model_id"] ) + +def test_complex_transform_with_filters2_N100(dbconn): + complex_transform_with_filters2_by_N(dbconn, N=100) + + +@pytest.mark.skip(reason="big filters not supported yet") +def test_complex_transform_with_filters2_N10000(dbconn): + complex_transform_with_filters2_by_N(dbconn, N=10000) \ No newline at end of file From 5cb433e37204217c27594c3e181f90d18c2b1021 Mon Sep 17 00:00:00 2001 From: Andrey Tatarinov Date: Thu, 22 Aug 2024 10:23:24 +0400 Subject: [PATCH 38/43] * --- 
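Note: step-level `filters` are merged into the active `RunConfig` via `RunConfig.add_filters` before the process ids are computed, so duplicate filter records are collapsed and each record keeps only the keys it was declared with. A small illustrative sketch (assuming this branch of datapipe is installed; values shown are examples only):

    from datapipe.run_config import RunConfig

    rc = RunConfig(filters=[{"pipeline_id": 0}])
    combined = RunConfig.add_filters(rc, [{"input_id": 1}, {"input_id": 1}])
    # the duplicate {"input_id": 1} record is dropped; the result contains one
    # record per distinct filter, each restricted to its own keys
    print(combined.filters)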
docs/source/SUMMARY.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/docs/source/SUMMARY.md b/docs/source/SUMMARY.md index 8ec0adb7..26a56dd5 100644 --- a/docs/source/SUMMARY.md +++ b/docs/source/SUMMARY.md @@ -4,6 +4,10 @@ - [Introduction](./introduction.md) +# Concepts + +- [Concepts](./concepts.md) + # Command Line Interface - [Command Line Interface](./cli.md) From 17cffbb5267a7b0287219ca6d87066723f522c19 Mon Sep 17 00:00:00 2001 From: Alexander Kozlov Date: Thu, 5 Sep 2024 17:17:26 +0000 Subject: [PATCH 39/43] fix tests --- tests/test_core_steps2.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_core_steps2.py b/tests/test_core_steps2.py index e2640cab..fe507dff 100644 --- a/tests/test_core_steps2.py +++ b/tests/test_core_steps2.py @@ -458,7 +458,7 @@ def update_df(df__item: pd.DataFrame, df__inner_item: pd.DataFrame): ds=ds, name="test", func=update_df, - input_dts=[item, inner_item], + input_dts=[ComputeInput(dt=item, join_type="full"), ComputeInput(dt=inner_item, join_type="full")], output_dts=[output], filters=filters ) From d95b2afe6791c6c1f642107a4fbddcf3b0e25a9c Mon Sep 17 00:00:00 2001 From: Alexander Kozlov Date: Thu, 5 Sep 2024 17:46:46 +0000 Subject: [PATCH 40/43] fix mypy --- datapipe/meta/sql_meta.py | 8 ++++---- datapipe/sql_util.py | 7 ++++--- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/datapipe/meta/sql_meta.py b/datapipe/meta/sql_meta.py index 9513c8d4..4ed8ff06 100644 --- a/datapipe/meta/sql_meta.py +++ b/datapipe/meta/sql_meta.py @@ -389,8 +389,8 @@ def get_stale_idx( ) ) - sql = sql_apply_runconfig_filters_to_subquery( - sql, self.primary_keys, run_config + sql = sql_apply_runconfig_filters( + sql, self.sql_table, self.primary_keys, run_config ) with self.dbconn.con.begin() as con: @@ -649,8 +649,8 @@ def mark_all_rows_unprocessed( .where(self.sql_table.c.is_success == True) ) - sql = sql_apply_runconfig_filters_to_subquery( - update_sql, self.primary_keys, run_config + sql = sql_apply_runconfig_filters( + update_sql, self.sql_table, self.primary_keys, run_config ) # execute diff --git a/datapipe/sql_util.py b/datapipe/sql_util.py index 8b99c84e..ae9c92b4 100644 --- a/datapipe/sql_util.py +++ b/datapipe/sql_util.py @@ -1,5 +1,5 @@ from collections import defaultdict -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, cast import pandas as pd from sqlalchemy import Column, Integer, String, Table, column, tuple_ @@ -43,8 +43,9 @@ def sql_apply_runconfig_filters( ) -> Any: if run_config is not None: filters_idx = pd.DataFrame(run_config.filters) - keys = [key for key in table.c if key in keys] - sql = sql_apply_idx_filter_to_table(sql, table, keys, filters_idx) + primary_keys = [key for key in keys if key in table.c] + if len(filters_idx) > 0 and len(keys) > 0: + sql = sql_apply_idx_filter_to_table(sql, table, primary_keys, cast(IndexDF, filters_idx)) return sql From 97dd2d2ae634931f3074e1ef7bf332be5dd4eea7 Mon Sep 17 00:00:00 2001 From: Alexander Kozlov Date: Fri, 6 Sep 2024 11:28:23 +0000 Subject: [PATCH 41/43] fix tests --- Makefile | 3 +++ black.toml | 2 ++ datapipe/sql_util.py | 4 ++-- datapipe/step/batch_transform.py | 10 ++++++++++ tests/test_core_steps2.py | 2 +- 5 files changed, 18 insertions(+), 3 deletions(-) create mode 100644 black.toml diff --git a/Makefile b/Makefile index 34f5cda0..9f31f38a 100644 --- a/Makefile +++ b/Makefile @@ -1,3 +1,6 @@ black: autoflake -r --in-place --remove-all-unused-imports steps/ *.py brandlink_utils/ black --verbose --config 
black.toml steps/ alembic *.py brandlink_utils/ + +mypy: + mypy -p datapipe --ignore-missing-imports --follow-imports=silent --namespace-packages \ No newline at end of file diff --git a/black.toml b/black.toml new file mode 100644 index 00000000..55ec8d78 --- /dev/null +++ b/black.toml @@ -0,0 +1,2 @@ +[tool.black] +line-length = 120 diff --git a/datapipe/sql_util.py b/datapipe/sql_util.py index ae9c92b4..26ffd806 100644 --- a/datapipe/sql_util.py +++ b/datapipe/sql_util.py @@ -43,8 +43,8 @@ def sql_apply_runconfig_filters( ) -> Any: if run_config is not None: filters_idx = pd.DataFrame(run_config.filters) - primary_keys = [key for key in keys if key in table.c] - if len(filters_idx) > 0 and len(keys) > 0: + primary_keys = [key for key in keys if key in table.c and key in filters_idx.columns] + if len(filters_idx) > 0 and len(primary_keys) > 0: sql = sql_apply_idx_filter_to_table(sql, table, primary_keys, cast(IndexDF, filters_idx)) return sql diff --git a/datapipe/step/batch_transform.py b/datapipe/step/batch_transform.py index 918e7ec6..9c3beb70 100644 --- a/datapipe/step/batch_transform.py +++ b/datapipe/step/batch_transform.py @@ -275,10 +275,20 @@ def get_full_process_ids( order=self.order, # type: ignore # pylance is stupid ) + if run_config is not None: + extra_filters = pd.DataFrame(run_config.filters) + else: + extra_filters = None + def alter_res_df(): with ds.meta_dbconn.con.begin() as con: for df in pd.read_sql_query(u1, con=con, chunksize=chunk_size): df = df[self.transform_keys] + if extra_filters is not None: + if len(set(df.columns).intersection(extra_filters.columns)) > 0: + df = pd.merge(df, extra_filters) + else: + df = pd.merge(df, extra_filters, how="cross") yield df return math.ceil(idx_count / chunk_size), alter_res_df() diff --git a/tests/test_core_steps2.py b/tests/test_core_steps2.py index fe507dff..604d8145 100644 --- a/tests/test_core_steps2.py +++ b/tests/test_core_steps2.py @@ -170,7 +170,7 @@ def test_batch_transform_with_filter_not_in_transform_index(dbconn): step.run_full( ds, - run_config=RunConfig(filters={"pipeline_id": 0}), + run_config=RunConfig(filters=[{"pipeline_id": 0}]), ) assert_datatable_equal(tbl2, TEST_DF1_2.query("pipeline_id == 0")[["item_id", "a"]]) From b1de4b84086acfcc8d8f5f0ec56d45a4e4c3a965 Mon Sep 17 00:00:00 2001 From: Alexander Kozlov Date: Fri, 6 Sep 2024 11:38:39 +0000 Subject: [PATCH 42/43] * --- datapipe/step/batch_transform.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datapipe/step/batch_transform.py b/datapipe/step/batch_transform.py index 9c3beb70..1db7d3e6 100644 --- a/datapipe/step/batch_transform.py +++ b/datapipe/step/batch_transform.py @@ -284,7 +284,7 @@ def alter_res_df(): with ds.meta_dbconn.con.begin() as con: for df in pd.read_sql_query(u1, con=con, chunksize=chunk_size): df = df[self.transform_keys] - if extra_filters is not None: + if extra_filters is not None and len(extra_filters) > 0: if len(set(df.columns).intersection(extra_filters.columns)) > 0: df = pd.merge(df, extra_filters) else: From 805586060ce9f1496b3412ec2ec48f36173568c0 Mon Sep 17 00:00:00 2001 From: Alexander Kozlov Date: Fri, 6 Sep 2024 11:51:13 +0000 Subject: [PATCH 43/43] removed Keys from filters must be in transform_keys error --- datapipe/step/batch_transform.py | 2 -- tests/test_complex_pipeline.py | 3 +-- tests/test_core_steps2.py | 31 ++++++++++++++++++++++++++++++- 3 files changed, 31 insertions(+), 5 deletions(-) diff --git a/datapipe/step/batch_transform.py b/datapipe/step/batch_transform.py index 
1db7d3e6..88b3e82b 100644 --- a/datapipe/step/batch_transform.py +++ b/datapipe/step/batch_transform.py @@ -207,8 +207,6 @@ def _get_filters( keys = set([key for keys in filters for key in keys]) if not all(len(filter) == len(keys) for filter in filters): raise ValueError("Size of keys from filters must have same length") - if not all([key in self.transform_keys for key in keys]): - raise ValueError(f"Keys from filters must be in transform_keys={self.transform_keys}.") return filters def get_status(self, ds: DataStore) -> StepStatus: diff --git a/tests/test_complex_pipeline.py b/tests/test_complex_pipeline.py index 7e3321c2..f9dc079b 100644 --- a/tests/test_complex_pipeline.py +++ b/tests/test_complex_pipeline.py @@ -458,11 +458,10 @@ def gen_tbl(df): test_df__model = pd.DataFrame({ "model_id": [0, 1, 2, 3, 4] }) - + def filters_images(): return [{"image_id": i} for i in range(N // 2)] - def make_prediction( df__image: pd.DataFrame, df__model: pd.DataFrame, diff --git a/tests/test_core_steps2.py b/tests/test_core_steps2.py index 604d8145..90237339 100644 --- a/tests/test_core_steps2.py +++ b/tests/test_core_steps2.py @@ -147,7 +147,7 @@ def test_batch_transform_with_filter_in_run_config(dbconn): assert_datatable_equal(tbl2, TEST_DF1_1.query("pipeline_id == 0")) -def test_batch_transform_with_filter_not_in_transform_index(dbconn): +def test_batch_transform_with_filter_in_run_config_not_in_transform_index(dbconn): ds = DataStore(dbconn, create_meta_table=True) tbl1 = ds.create_table( @@ -176,6 +176,35 @@ def test_batch_transform_with_filter_not_in_transform_index(dbconn): assert_datatable_equal(tbl2, TEST_DF1_2.query("pipeline_id == 0")[["item_id", "a"]]) +def test_batch_transform_with_filter_not_in_transform_index(dbconn): + ds = DataStore(dbconn, create_meta_table=True) + + tbl1 = ds.create_table( + "tbl1", table_store=TableStoreDB(dbconn, "tbl1_data", TEST_SCHEMA1, True) + ) + + tbl2 = ds.create_table( + "tbl2", table_store=TableStoreDB(dbconn, "tbl2_data", TEST_SCHEMA2, True) + ) + + tbl1.store_chunk(TEST_DF1_2, now=0) + + step = BatchTransformStep( + ds=ds, + name="test", + func=lambda df: df[["item_id", "a"]], + input_dts=[ComputeInput(dt=tbl1, join_type="full")], + output_dts=[tbl2], + filters=[{"pipeline_id": 0}] + ) + + step.run_full( + ds, + ) + + assert_datatable_equal(tbl2, TEST_DF1_2.query("pipeline_id == 0")[["item_id", "a"]]) + + def test_batch_transform_with_dt_on_input_and_output(dbconn): ds = DataStore(dbconn, create_meta_table=True)