Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
courtneyholcomb committed Apr 24, 2024
1 parent c9a4c19 commit ab57be6
Show file tree
Hide file tree
Showing 6 changed files with 229 additions and 34 deletions.
12 changes: 7 additions & 5 deletions metricflow/dataflow/builder/node_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,11 +219,6 @@ def _find_joinable_candidate_nodes_that_can_satisfy_linkable_specs(
# then produce the linkable spec. See comments further below for more details.

for entity_spec_in_right_node in entity_specs_in_right_node:
# If an entity has links, what that means and whether it can be used is unclear at the moment,
# so skip it.
if len(entity_spec_in_right_node.entity_links) > 0:
continue

entity_instance_in_right_node = None
for instance in data_set_in_right_node.instance_set.entity_instances:
if instance.spec == entity_spec_in_right_node:
Expand Down Expand Up @@ -253,6 +248,13 @@ def _find_joinable_candidate_nodes_that_can_satisfy_linkable_specs(

if entity_instance_in_left_node is None:
# The right node can have a superset of entities.
if isinstance(right_node, ComputeMetricsNode):
print("exited here")
print(
"left node entities:",
[inst.spec.reference for inst in left_node_instance_set.entity_instances],
)
print("entity_spec_in_right_node:", entity_spec_in_right_node.reference)
continue

assert len(entity_instance_in_left_node.defined_from) == 1
Expand Down
36 changes: 19 additions & 17 deletions metricflow/plan_conversion/dataflow_to_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
CreateSqlColumnReferencesForInstances,
FilterElements,
FilterLinkableInstancesWithLeadingLink,
InstanceSetTransform,
RemoveMeasures,
RemoveMetrics,
UpdateMeasureFillNullsWith,
Expand Down Expand Up @@ -608,8 +609,7 @@ def visit_compute_metrics_node(self, node: ComputeMetricsNode) -> SqlDataSet:

# Add select columns that would compute the metrics to the select columns.
metric_select_columns = []
metric_instances = []
group_by_metric_instances = []
metric_instances: List[MetricInstance] = []
for metric_spec in node.metric_specs:
metric = self._metric_lookup.get_metric(metric_spec.reference)

Expand Down Expand Up @@ -723,23 +723,25 @@ def visit_compute_metrics_node(self, node: ComputeMetricsNode) -> SqlDataSet:
spec=metric_spec,
)
)
group_by_metric_instances.append(
GroupByMetricInstance(
associated_columns=(output_column_association,),
defined_from=MetricModelReference(metric_name=metric_spec.element_name),
spec=GroupByMetricSpec(
element_name=metric_spec.element_name,
entity_links=(),
metric_subquery_entity_links=(), # TODO
),
)

transform_func: InstanceSetTransform = AddMetrics(metric_instances)
if node.for_group_by_source_node:
assert (
len(metric_instances) == 1 and len(output_instance_set.entity_instances) == 1
), "Group by metrics currently only support exactly one metric grouped by exactly one entity."
metric_instance = metric_instances[0]
entity_instance = output_instance_set.entity_instances[0]
group_by_metric_instance = GroupByMetricInstance(
associated_columns=metric_instance.associated_columns,
defined_from=metric_instance.defined_from,
spec=GroupByMetricSpec(
element_name=metric_spec.element_name,
entity_links=(), # check this
metric_subquery_entity_links=entity_instance.spec.entity_links,
),
)
transform_func = AddGroupByMetrics([group_by_metric_instance])

transform_func = (
AddGroupByMetrics(group_by_metric_instances)
if node.for_group_by_source_node
else AddMetrics(metric_instances)
)
output_instance_set = output_instance_set.transform(transform_func)

combined_select_column_set = non_metric_select_column_set.merge(
Expand Down
1 change: 1 addition & 0 deletions metricflow/plan_conversion/instance_converters.py
Original file line number Diff line number Diff line change
Expand Up @@ -745,6 +745,7 @@ def transform(self, instance_set: InstanceSet) -> InstanceSet: # noqa: D102
)


# TODO: should this be singular, allowing only one group by metric?
class AddGroupByMetrics(InstanceSetTransform[InstanceSet]):
"""Adds the given metric instances to the instance set."""

Expand Down
6 changes: 3 additions & 3 deletions metricflow/plan_conversion/node_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from metricflow.dataflow.dataflow_plan import (
BaseOutput,
)
from metricflow.dataflow.nodes.compute_metrics import ComputeMetricsNode
from metricflow.dataflow.nodes.constrain_time import ConstrainTimeRangeNode
from metricflow.dataflow.nodes.filter_elements import FilterElementsNode
from metricflow.dataflow.nodes.join_to_base import JoinDescription, JoinToBaseOutputNode
Expand Down Expand Up @@ -130,9 +131,6 @@ def _node_contains_entity(
if entity_spec_in_first_node.reference != entity_reference:
continue

if len(entity_spec_in_first_node.entity_links) > 0:
continue

assert (
len(entity_instance_in_first_node.defined_from) == 1
), "Multiple items in defined_from not yet supported"
Expand Down Expand Up @@ -216,6 +214,8 @@ def _get_candidates_nodes_for_multi_hop(
right_instance_set=data_set_of_second_node_that_can_be_joined.instance_set,
on_entity_reference=entity_reference_to_join_first_and_second_nodes,
):
if isinstance(second_node_that_could_be_joined, ComputeMetricsNode):
print("exited multi hop here")
continue

# filter measures out of joinable_node
Expand Down
1 change: 1 addition & 0 deletions metricflow/specs/spec_set_transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,6 @@ def transform(self, spec_set: InstanceSpecSet) -> Set[str]: # noqa: D102
.union({x.element_name for x in spec_set.dimension_specs})
.union({x.element_name for x in spec_set.time_dimension_specs})
.union({x.element_name for x in spec_set.entity_specs})
.union({x.element_name for x in spec_set.metric_specs})
.union({x.element_name for x in spec_set.group_by_metric_specs})
)
207 changes: 198 additions & 9 deletions tests/query_rendering/test_metric_filter_rendering.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from __future__ import annotations

import string

import pytest
from _pytest.fixtures import FixtureRequest
from dbt_semantic_interfaces.implementations.filters.where_filter import PydanticWhereFilter
Expand Down Expand Up @@ -283,25 +285,211 @@ def test_multi_hop_with_explicit_entity_link(
)


@pytest.mark.sql_engine_snapshot
def test_multi_hop_without_explicit_entity_link(
# TODO - need a different example because this one won't work due to ambiguous join path
# @pytest.mark.sql_engine_snapshot
# def test_multi_hop_without_explicit_entity_link(
# request: FixtureRequest,
# mf_test_configuration: MetricFlowTestConfiguration,
# dataflow_plan_builder: DataflowPlanBuilder,
# sql_client: SqlClient,
# dataflow_to_sql_converter: DataflowToSqlQueryPlanConverter,
# query_parser: MetricFlowQueryParser,
# ) -> None:
# """Tests a metric filter that requires multiple join hops and does not state the entity link.

# Should return the same SQL as if the entity link was stated (group by resolution determines the entity link).
# """
# query_spec = query_parser.parse_and_validate_query(
# metric_names=("listings",),
# where_constraint=PydanticWhereFilter(
# where_sql_template="{{ Metric('instant_bookings', ['company']) }} > 2",
# ),
# )
# dataflow_plan = dataflow_plan_builder.build_plan(query_spec)

# convert_and_check(
# request=request,
# mf_test_configuration=mf_test_configuration,
# dataflow_to_sql_converter=dataflow_to_sql_converter,
# sql_client=sql_client,
# node=dataflow_plan.sink_output_nodes[0].parent_node,
# )


# Remove from group by options: cumulative & time offset metrics (temporary) - TODO
# Fix join path resolution for these. MORE IMPORTANT!
# The group by shows you the path to get from the query-level measure to the entity, not the filter-level metric to the entity. That's confusing. Is that how it should be??


def test_all_available_single_hop_metric_filters(
request: FixtureRequest,
mf_test_configuration: MetricFlowTestConfiguration,
dataflow_plan_builder: DataflowPlanBuilder,
sql_client: SqlClient,
dataflow_to_sql_converter: DataflowToSqlQueryPlanConverter,
query_parser: MetricFlowQueryParser,
) -> None:
"""Tests a metric filter that requires multiple join hops and does not state the entity link.
"""Checks that all allowed metric filters do not error."""
# TODO: get filter options from linkable spec resolver instead of hard coding
filter_strs = [
# "listing__active_listings", # why doesn't this work?
"listing__approximate_continuous_booking_value_p99",
"listing__approximate_discrete_booking_value_p99",
"listing__average_booking_value",
# "listing__average_instant_booking_value",
"listing__bookers",
"listing__booking_fees",
# "listing__booking_fees_last_week_per_booker_this_week", # offset metrics require metric_time
"listing__booking_fees_per_booker",
# "listing__booking_fees_since_start_of_month",
"listing__booking_payments",
"listing__booking_value",
# "listing__booking_value_for_non_null_listing_id",
# "listing__booking_value_p99",
# "listing__booking_value_per_view",
# "listing__booking_value_sub_instant",
# "listing__booking_value_sub_instant_add_10",
"listing__bookings",
# "listing__bookings_5_day_lag",
# "listing__bookings_at_start_of_month",
"listing__bookings_fill_nulls_with_0",
"listing__bookings_fill_nulls_with_0_without_time_spine",
# "listing__bookings_growth_2_weeks",
# "listing__bookings_growth_2_weeks_fill_nulls_with_0",
# "listing__bookings_growth_2_weeks_fill_nulls_with_0_for_non_offset",
# "listing__bookings_growth_since_start_of_month",
"listing__bookings_join_to_time_spine",
# "listing__bookings_month_start_compared_to_1_month_prior",
# "listing__bookings_offset_once",
# "listing__bookings_offset_twice",
"listing__bookings_per_booker",
"listing__bookings_per_dollar",
"listing__bookings_per_listing",
# "listing__bookings_per_lux_listing_derived",
"listing__bookings_per_view",
# "listing__every_2_days_bookers_2_days_ago",
# "listing__every_two_days_bookers",
# "listing__every_two_days_bookers_fill_nulls_with_0",
# "listing__instant_booking_fraction_of_max_value",
# "listing__instant_booking_value",
# "listing__instant_booking_value_ratio",
"listing__instant_bookings",
# "listing__instant_lux_booking_value_rate",
"listing__instant_plus_non_referred_bookings_pct",
"listing__largest_listing",
"listing__listings",
# "listing__lux_booking_fraction_of_max_value",
# "listing__lux_booking_value_rate_expr",
# "listing__lux_listings",
# "listing__max_booking_value",
# "listing__median_booking_value",
# "listing__min_booking_value",
"listing__nested_fill_nulls_without_time_spine",
"listing__non_referred_bookings_pct",
"listing__referred_bookings",
"listing__smallest_listing",
"listing__twice_bookings_fill_nulls_with_0_without_time_spine",
"listing__views",
"listing__views_times_booking_value",
# "user__active_listings",
# "user__approximate_continuous_booking_value_p99",
# "user__approximate_discrete_booking_value_p99",
# "user__average_booking_value",
# "user__average_instant_booking_value",
# "user__bookers",
# "user__booking_fees",
# "user__booking_fees_last_week_per_booker_this_week",
# "user__booking_fees_per_booker",
# "user__booking_fees_since_start_of_month",
# "user__booking_payments",
# "user__booking_value",
# "user__booking_value_for_non_null_listing_id",
# "user__booking_value_p99",
# "user__booking_value_per_view",
# "user__booking_value_sub_instant",
# "user__booking_value_sub_instant_add_10",
# "user__bookings",
# "user__bookings_5_day_lag",
# "user__bookings_at_start_of_month",
# "user__bookings_fill_nulls_with_0",
# "user__bookings_fill_nulls_with_0_without_time_spine",
# "user__bookings_growth_2_weeks",
# "user__bookings_growth_2_weeks_fill_nulls_with_0",
# "user__bookings_growth_2_weeks_fill_nulls_with_0_for_non_offset",
# "user__bookings_growth_since_start_of_month",
# "user__bookings_join_to_time_spine",
# "user__bookings_month_start_compared_to_1_month_prior",
# "user__bookings_offset_once",
# "user__bookings_offset_twice",
# "user__bookings_per_booker",
# "user__bookings_per_dollar",
# "user__bookings_per_listing",
# "user__bookings_per_lux_listing_derived",
# "user__bookings_per_view",
# "user__every_2_days_bookers_2_days_ago",
# "user__every_two_days_bookers",
# "user__every_two_days_bookers_fill_nulls_with_0",
"user__identity_verifications",
# "user__instant_booking_fraction_of_max_value",
# "user__instant_booking_value",
# "user__instant_booking_value_ratio",
# "user__instant_bookings",
# "user__instant_lux_booking_value_rate",
# "user__instant_plus_non_referred_bookings_pct",
"user__largest_listing",
"user__listings",
# "user__lux_booking_fraction_of_max_value",
# "user__lux_booking_value_rate_expr",
# "user__lux_listings",
# "user__max_booking_value",
# "user__median_booking_value",
# "user__min_booking_value",
# "user__nested_fill_nulls_without_time_spine",
# "user__non_referred_bookings_pct",
# "user__referred_bookings",
# "user__regional_starting_balance_ratios",
"user__revenue",
"user__revenue_all_time",
"user__revenue_mtd",
"user__smallest_listing",
# "user__total_account_balance_first_day",
"user__trailing_2_months_revenue",
"user__trailing_2_months_revenue_sub_10",
# "user__twice_bookings_fill_nulls_with_0_without_time_spine",
"user__views",
# "user__views_times_booking_value",
"user__visit_buy_conversion_rate",
"user__visit_buy_conversion_rate_7days",
"user__visit_buy_conversion_rate_7days_fill_nulls_with_0",
"user__visit_buy_conversion_rate_by_session",
"user__visit_buy_conversions",
]
for filter_str in filter_strs:
entity_name, metric_name = filter_str.split("__")
query_spec = query_parser.parse_and_validate_query(
metric_names=("listings",),
where_constraint=PydanticWhereFilter(
where_sql_template=string.Template("{{ Metric('$metric_name', ['$entity_name']) }} > 2").substitute(
metric_name=metric_name, entity_name=entity_name
),
),
)
dataflow_plan_builder.build_plan(query_spec)

Should return the same SQL as if the entity link was stated (group by resolution determines the entity link).
"""

@pytest.mark.sql_engine_snapshot
def test_testy_test(
request: FixtureRequest,
mf_test_configuration: MetricFlowTestConfiguration,
dataflow_plan_builder: DataflowPlanBuilder,
sql_client: SqlClient,
dataflow_to_sql_converter: DataflowToSqlQueryPlanConverter,
query_parser: MetricFlowQueryParser,
) -> None:
query_spec = query_parser.parse_and_validate_query(
metric_names=("listings",),
where_constraint=PydanticWhereFilter(
where_sql_template="{{ Metric('instant_bookings', ['company']) }} > 2",
),
metric_names=("instant_bookings",), group_by_names=("user__company",)
)
assert 0, query_spec.entity_specs
dataflow_plan = dataflow_plan_builder.build_plan(query_spec)

convert_and_check(
Expand All @@ -311,3 +499,4 @@ def test_multi_hop_without_explicit_entity_link(
sql_client=sql_client,
node=dataflow_plan.sink_output_nodes[0].parent_node,
)
# Join path used: listing__user__company

0 comments on commit ab57be6

Please sign in to comment.