Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(HttpComponentsResolver): added stream slices to HttpComponentsResolver #175

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -3007,6 +3007,7 @@ definitions:
interpolation_context:
- config
- components_values
- stream_slice
- stream_template_config
examples:
- ["data"]
Expand All @@ -3023,10 +3024,13 @@ definitions:
- config
- stream_template_config
- components_values
- stream_slice
examples:
- "{{ components_values['updates'] }}"
- "{{ components_values['MetaData']['LastUpdatedTime'] }}"
- "{{ config['segment_id'] }}"
- "{{ stream_slice['parent_id'] }}"
- "{{ stream_slice['extra_fields']['name'] }}"
value_type:
title: Value Type
description: The expected data type of the value. If omitted, the type will be inferred from the value provided.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2353,7 +2353,7 @@ def create_http_components_resolver(
config=config,
name="",
primary_key=None,
stream_slicer=combined_slicers,
stream_slicer=stream_slicer if stream_slicer else combined_slicers,
transformations=[],
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,19 +88,25 @@ def resolve_components(
"""
kwargs = {"stream_template_config": stream_template_config}

for components_values in self.retriever.read_records({}):
updated_config = deepcopy(stream_template_config)
kwargs["components_values"] = components_values # type: ignore[assignment] # component_values will always be of type Mapping[str, Any]
stream_slices = self.retriever.stream_slices() if self.retriever.stream_slicer else [{}]
lazebnyi marked this conversation as resolved.
Show resolved Hide resolved

for resolved_component in self._resolved_components:
valid_types = (
(resolved_component.value_type,) if resolved_component.value_type else None
)
value = resolved_component.value.eval(
self.config, valid_types=valid_types, **kwargs
)
for stream_slice in stream_slices:
for components_values in self.retriever.read_records({}, stream_slice):
lazebnyi marked this conversation as resolved.
Show resolved Hide resolved
updated_config = deepcopy(stream_template_config)
kwargs["components_values"] = components_values # type: ignore[assignment] # component_values will always be of type Mapping[str, Any]
kwargs["stream_slice"] = stream_slice

for resolved_component in self._resolved_components:
valid_types = (
(resolved_component.value_type,) if resolved_component.value_type else None
)
value = resolved_component.value.eval(
self.config, valid_types=valid_types, **kwargs
)

path = [path.eval(self.config, **kwargs) for path in resolved_component.field_path]
dpath.set(updated_config, path, value)
path = [
path.eval(self.config, **kwargs) for path in resolved_component.field_path
]
dpath.set(updated_config, path, value)

yield updated_config
yield updated_config
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ def test_http_components_resolver(
):
mock_retriever = MagicMock()
mock_retriever.read_records.return_value = retriever_data
mock_retriever.stream_slices.return_value = [{}]
lazebnyi marked this conversation as resolved.
Show resolved Hide resolved
config = {}

resolver = HttpComponentsResolver(
Expand Down
Loading