'get_aggregation_unit' action and 'aggregation_unit' params for all spatial aggregates #5278

Merged
merged 46 commits into from
Aug 12, 2022
Changes from 34 commits
Commits
8537429
OneOfQuerySchema
jc-harrison Jun 15, 2022
1a980cf
Remove hack for making query_kind read-only
jc-harrison Jun 15, 2022
9f40488
Try adding read-only aggregation unit to SpatialAggregateSchema
jc-harrison Jun 15, 2022
8cccc16
Don't create new scopes from read-only aggregation unit
jc-harrison Jun 15, 2022
67d9670
Fix get_available_queries action
jc-harrison Jun 16, 2022
90bac9b
Re-approve API schema
jc-harrison Jun 16, 2022
16ea188
Helper function to load query
jc-harrison Jun 20, 2022
db52326
Add "get_aggregation_unit" action handler
jc-harrison Jun 20, 2022
43bacdb
Update "unknown action" test
jc-harrison Jun 20, 2022
91c6ce2
Add some tests for get_aggregation_unit action
jc-harrison Jul 1, 2022
8820019
Fix #4816
jc-harrison Jul 4, 2022
c18d822
Add regression test for #4816
jc-harrison Jul 4, 2022
4d4c9eb
Add tests for all spatial aggregate query kinds
jc-harrison Jul 4, 2022
cd03113
Add aggregation_unit property to active_at_reference_location_counts
jc-harrison Jul 4, 2022
74a4ed7
add aggregation_unit property to aggregate_network_objects
jc-harrison Jul 4, 2022
c662c92
Add aggregation_unit property to flows
jc-harrison Jul 4, 2022
11ed189
Make aggregation_unit attribute of DFSTotalMetricAmountExposed consis…
jc-harrison Jul 4, 2022
4e3f383
Don't silently treat badly-formed aggregation units as not-aggregatio…
jc-harrison Jul 5, 2022
c59924f
Fix default direction param
jc-harrison Jul 5, 2022
82cb141
Add aggregation_unit property to joined_spatial_aggregate
jc-harrison Jul 5, 2022
1543fba
Add aggregation_unit property to labelled_flows
jc-harrison Jul 5, 2022
84ee9fd
Add aggregation_unit property to labelled_spatial_aggregate
jc-harrison Jul 5, 2022
f348afb
Add aggregation_unit property to unique_visitor_counts
jc-harrison Jul 5, 2022
790ffb4
Add aggregation_unit property to unmoving_counts
jc-harrison Jul 5, 2022
3960186
Add aggregation_unit property to unmoving_at_reference_location_counts
jc-harrison Jul 5, 2022
4763ccb
Ensure all required fields are marked as such
jc-harrison Jul 6, 2022
f8edee4
Skip schema validation on field errors
jc-harrison Jul 6, 2022
b3c7354
Fix Direction field
jc-harrison Jul 6, 2022
4092a35
Use real db connection for action handler test
jc-harrison Jul 6, 2022
112db6d
Fix LocationIntroversion
jc-harrison Jul 6, 2022
97c4847
Add missing query field in test params
jc-harrison Jul 6, 2022
4a8edee
Integrate nocturnal_events spec change into flowclient
jc-harrison Jul 6, 2022
0220f09
Re-approve API spec
jc-harrison Jul 6, 2022
f1abe58
Add docs section on exposing a new query kind
jc-harrison Jul 7, 2022
47d161e
Fix docs
jc-harrison Jul 7, 2022
ff419e1
Update flowmachine/flowmachine/core/server/query_schemas/aggregation_…
jc-harrison Jul 7, 2022
f2dba5d
Merge branch 'master' into explicit-aggregation-unit
jc-harrison Jul 7, 2022
a2a6f8e
More tests
jc-harrison Jul 8, 2022
b2bc665
Approve query IDs for modified query specs
jc-harrison Jul 8, 2022
bc60f17
Docstrings
jc-harrison Jul 8, 2022
96794da
Merge remote-tracking branch 'origin/master' into explicit-aggregatio…
jc-harrison Jul 8, 2022
b06785e
Update CHANGELOG.md
jc-harrison Jul 8, 2022
0560a96
Merge branch 'master' into explicit-aggregation-unit
jc-harrison Jul 11, 2022
d8faf2c
Merge branch 'master' into explicit-aggregation-unit
jc-harrison Aug 9, 2022
d3af691
Quiet pip install to avert threading issue
jc-harrison Aug 12, 2022
64a9ff7
Merge branch 'master' into explicit-aggregation-unit
jc-harrison Aug 12, 2022
73 changes: 37 additions & 36 deletions docs/mkdocs.yml
@@ -26,49 +26,49 @@ nav:
- index.md
- install.md
- Analysts:
- analyst/index.md
- FlowClient:
- analyst/flowclient/example_usage.ipynb
- api-docs-flowclient
- Worked examples:
- analyst/worked_examples/index.md
- analyst/worked_examples/flows-above-normal.ipynb
- analyst/worked_examples/commuting-patterns.ipynb
- analyst/worked_examples/cell-towers-per-region.ipynb
- analyst/worked_examples/unique-subscriber-counts.ipynb
- analyst/worked_examples/joined-spatial-aggregate.ipynb
- analyst/autoflow.md
- Advanced usage:
- analyst/advanced_usage/index.md
- Worked examples:
- analyst/advanced_usage/worked_examples/mobile-data-usage.ipynb
- analyst/index.md
- FlowClient:
- analyst/flowclient/example_usage.ipynb
- api-docs-flowclient
- Worked examples:
- analyst/worked_examples/index.md
- analyst/worked_examples/flows-above-normal.ipynb
- analyst/worked_examples/commuting-patterns.ipynb
- analyst/worked_examples/cell-towers-per-region.ipynb
- analyst/worked_examples/unique-subscriber-counts.ipynb
- analyst/worked_examples/joined-spatial-aggregate.ipynb
- analyst/autoflow.md
- Advanced usage:
- analyst/advanced_usage/index.md
- Worked examples:
- analyst/advanced_usage/worked_examples/mobile-data-usage.ipynb
- Developers:
- developer/index.md
- developer/dev_environment_setup.md
- api-docs-flowmachine
- FlowAPI specification: developer/api-spec.html
- Architectural Decision Records (ADR):
- developer/adr/README.md
- developer/adr/0001-pipenv-for-package-and-dependency-management.md
- developer/adr/0002-pytest-for-testing.md
- developer/adr/0003-http-api.md
- developer/adr/0004-http-framework.md
- developer/adr/0005-IPC-methods.md
- developer/adr/0006-JWTs.md
- developer/adr/0007-mapbox-for-worked-examples.md
- developer/adr/0008-jupyter-notebooks-for-autoflow.md
- developer/adr/0009-asciidoctor-pdf-for-notebook-conversion.md
- developer/adr/0010-prefect-for-autoflow.md
- developer/adr/README.md
- developer/adr/0001-pipenv-for-package-and-dependency-management.md
- developer/adr/0002-pytest-for-testing.md
- developer/adr/0003-http-api.md
- developer/adr/0004-http-framework.md
- developer/adr/0005-IPC-methods.md
- developer/adr/0006-JWTs.md
- developer/adr/0007-mapbox-for-worked-examples.md
- developer/adr/0008-jupyter-notebooks-for-autoflow.md
- developer/adr/0009-asciidoctor-pdf-for-notebook-conversion.md
- developer/adr/0010-prefect-for-autoflow.md
- System Administrators:
- administrator/index.md
- administrator/deployment.md
- System management:
- administrator/management/cache.md
- ETL:
- administrator/management/etl/etl.md
- api-docs-flowetl
- administrator/management/logging.md
- administrator/management/users.md
- administrator/index.md
- administrator/deployment.md
- System management:
- administrator/management/cache.md
- ETL:
- administrator/management/etl/etl.md
- api-docs-flowetl
- administrator/management/logging.md
- administrator/management/users.md
- license.md
- changelog.md

@@ -82,6 +82,7 @@ theme:
language: en
features:
- navigation.tabs
- content.tabs.link
palette:
accent: "#095798"
primary: "#2977B8"
201 changes: 201 additions & 0 deletions docs/source/developer/index.md
@@ -84,6 +84,207 @@ FlowMachine is a Python toolkit for the analysis of CDR data. It is essentially

Documentation for FlowMachine can be found [here](../flowmachine/flowmachine/). A worked example of using FlowMachine for analysis is provided [here](../analyst/advanced_usage/worked_examples/mobile-data-usage/).

### Exposing a new query kind

The FlowMachine server is responsible for defining the queries that users can run via FlowAPI. Translation from parameters provided in calls to the `run` API endpoint to the underlying FlowMachine query objects is handled by [marshmallow](https://marshmallow.readthedocs.io) schemas defined in [flowmachine.core.server.query_schemas](../flowmachine/flowmachine/core/server/query_schemas/).

In this section we assume that a FlowMachine query class `MyQuery` (derived from [`flowmachine.core.query.Query`](../flowmachine/flowmachine/core/query/#class-query)) is already defined, and describe the steps required to expose this query class so that queries of this kind can be run via the API.

The information below refers to the following categories of query:
- **aggregate**: A query whose results are aggregated over groups of subscribers, so that no individual-level information is revealed. Example: [histogram_aggregate](../flowmachine/flowmachine/core/server/query_schemas/histogram_aggregate/)
- **spatial aggregate**: An aggregate query that returns a result per location (e.g. a count of subscribers per administrative region). Example: [spatial_aggregate](../flowmachine/flowmachine/core/server/query_schemas/spatial_aggregate/)
- **individual-level query**: A query whose result consists of a value per subscriber. Example: [event_count](../flowmachine/flowmachine/core/server/query_schemas/event_count/)
- **reference location**: An individual-level query that assigns a single location to each subscriber. Example: [daily_location](../flowmachine/flowmachine/core/server/query_schemas/daily_location/)

#### 1. Define an "exposed query" class

In a new file 'flowmachine/core/server/query_schemas/my_query.py', define a new class `MyQueryExposed`. This class is responsible for constructing the appropriate `MyQuery` object from parameter values supplied in an API call.

There are two options here: if users should be able to select a random sample of the rows from this query result, the exposed query class should inherit from [`BaseExposedQueryWithSampling`](../flowmachine/flowmachine/core/server/query_schemas/base_query_with_sampling/#class-baseexposedquerywithsampling) (this is usually appropriate for _individual-level_ queries). If it does not make sense to allow random sampling of the query result (as is usually the case for _aggregate_ queries), the exposed query class should inherit from [`BaseExposedQuery`](../flowmachine/flowmachine/core/server/query_schemas/base_exposed_query/#class-baseexposedquery).
Member

We might consider changing the names of these really.

Member Author

Yes, could do. What would you suggest?

Member

I was wondering about BaseIndividualQuery and BaseAggregateQuery, but I'm not sure it makes sense given that it may not be universally the case that these are the appropriate bases.

Member Author

Hmm, yes, I don't know whether it will always be the case that we'd want to allow sampling for all non-aggregates and never for aggregates.

I wonder whether we'd be better-off rolling the sampling stuff into BaseExposedQuery (i.e. apply sampling if there's a sampling parameter, otherwise don't), and turning BaseQueryWithSamplingSchema into a mixin - then we don't have to worry about having two different bases.


=== "Without sampling"

```python
from flowmachine.core.server.query_schemas.base_exposed_query import BaseExposedQuery

# `MyQuery` is assumed to be imported from the module where it is defined.

class MyQueryExposed(BaseExposedQuery):
    query_kind = "my_query"  # (1)

    def __init__(  # (2)
        self,
        *,
        start_date,
        end_date,
        sub_query,
        other_param,
    ):
        self.start_date = start_date  # (3)
        self.end_date = end_date
        self.sub_query = sub_query
        self.other_param = other_param

    @property
    def aggregation_unit(self):  # (4)
        return self.sub_query.aggregation_unit

    @property
    def _flowmachine_query_obj(self):  # (5)
        return MyQuery(
            start=self.start_date,  # (6)
            stop=self.end_date,
            sub_query=self.sub_query._flowmachine_query_obj,  # (7)
            other_param=self.other_param,
            non_exposed_param="default_value",  # (8)
        )
```

1. `query_kind` class attribute is required, and must be different from the `query_kind` of all other exposed query classes.
2. The `__init__` method should take as arguments all parameters of `MyQuery` that will be exposed via the API.
3. All input parameters must be set as attributes on `self` so that the object can be serialised correctly.
4. If `MyQuery` is a _spatial aggregate_ or a _reference location_, but does not have an explicit `aggregation_unit` parameter (e.g. because the aggregation unit is determined by a nested sub-query), you must define an `aggregation_unit` property or attribute so that other queries (and the `get_aggregation_unit` server action) can identify the aggregation unit associated with this query.
5. Define a `_flowmachine_query_obj` property that returns the underlying `MyQuery` FlowMachine query object.
6. The exposed parameters do not need to have names that match the corresponding parameters of the underlying `MyQuery` object.
7. If a parameter is a nested sub-query, you will need to access its `_flowmachine_query_obj` property so that the `MyQuery` constructor receives a FlowMachine query object and not the _exposed_ query object.
8. It is not necessary for all parameters of the underlying `MyQuery` object to be exposed as parameters of `MyQueryExposed`.
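
Annotation (4) can be illustrated with a minimal, self-contained sketch. The classes below are hypothetical stand-ins rather than FlowMachine classes, the `"admin3"` value is only an example, and `get_aggregation_unit` here is a plain function that mimics the idea of the server action: it reads the `aggregation_unit` attribute of an exposed query object, which may itself be delegated to a nested sub-query.

```python
class DailyLocationStandIn:
    """Hypothetical stand-in for an exposed reference-location query."""

    def __init__(self, *, date, aggregation_unit):
        self.date = date
        # Explicit parameter: stored directly as an attribute.
        self.aggregation_unit = aggregation_unit


class MyQueryStandIn:
    """Hypothetical stand-in for an exposed query with no aggregation_unit parameter."""

    def __init__(self, *, sub_query):
        self.sub_query = sub_query

    @property
    def aggregation_unit(self):
        # Delegate to the nested sub-query, as described in annotation (4).
        return self.sub_query.aggregation_unit


def get_aggregation_unit(exposed_query):
    """Return the aggregation unit of a query, or raise if it has none."""
    try:
        return exposed_query.aggregation_unit
    except AttributeError:
        raise ValueError("Query does not have an aggregation unit") from None


location = DailyLocationStandIn(date="2016-01-01", aggregation_unit="admin3")
query = MyQueryStandIn(sub_query=location)
unit = get_aggregation_unit(query)
```

Without the `aggregation_unit` property on `MyQueryStandIn`, the lookup would fail even though the nested sub-query knows its aggregation unit.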

=== "With sampling"

```python
from flowmachine.core.server.query_schemas.base_query_with_sampling import BaseExposedQueryWithSampling

# `MyQuery` is assumed to be imported from the module where it is defined.

class MyQueryExposed(BaseExposedQueryWithSampling):
    query_kind = "my_query"  # (1)

    def __init__(  # (2)
        self,
        *,
        start_date,
        end_date,
        sub_query,
        other_param,
        sampling=None,  # (9)
    ):
        self.start_date = start_date  # (3)
        self.end_date = end_date
        self.sub_query = sub_query
        self.other_param = other_param
        self.sampling = sampling  # (9)

    @property
    def aggregation_unit(self):  # (4)
        return self.sub_query.aggregation_unit

    @property
    def _unsampled_query_obj(self):  # (5)
        return MyQuery(
            start=self.start_date,  # (6)
            stop=self.end_date,
            sub_query=self.sub_query._flowmachine_query_obj,  # (7)
            other_param=self.other_param,
            non_exposed_param="default_value",  # (8)
        )
```

1. `query_kind` class attribute is required, and must be different from the `query_kind` of all other exposed query classes.
2. The `__init__` method should take as arguments all parameters of `MyQuery` that will be exposed via the API.
3. All input parameters must be set as attributes on `self` so that the object can be serialised correctly.
4. If `MyQuery` is a _spatial aggregate_ or a _reference location_, but does not have an explicit `aggregation_unit` parameter (e.g. because the aggregation unit is determined by a nested sub-query), you must define an `aggregation_unit` property or attribute so that other queries (and the `get_aggregation_unit` server action) can identify the aggregation unit associated with this query.
5. Define a `_unsampled_query_obj` property that returns the underlying `MyQuery` FlowMachine query object. **Note:** When inheriting from `BaseExposedQueryWithSampling`, this property should be named `_unsampled_query_obj` - the `_flowmachine_query_obj` property will return this query wrapped in an appropriate "random sample" query.
6. The exposed parameters do not need to have names that match the corresponding parameters of the underlying `MyQuery` object.
7. If a parameter is a nested sub-query, you will need to access its `_flowmachine_query_obj` property so that the `MyQuery` constructor receives a FlowMachine query object and not the _exposed_ query object.
8. It is not necessary for all parameters of the underlying `MyQuery` object to be exposed as parameters of `MyQueryExposed`.
9. When inheriting from `BaseExposedQueryWithSampling`, it is important to also accept the `sampling` argument here.
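
The relationship between `_unsampled_query_obj` and `_flowmachine_query_obj` described in annotation (5) can be sketched roughly as follows. This assumes that the base class wraps the unsampled query only when a `sampling` value was supplied. All classes here, and the `random_sample` signature, are simplified stand-ins for illustration and not the real FlowMachine implementation.

```python
class QueryStandIn:
    """Hypothetical stand-in for a FlowMachine query object."""

    def __init__(self, name):
        self.name = name

    def random_sample(self, **sampling_params):
        # Return a new object representing a random sample of this query.
        return SampledQueryStandIn(self, sampling_params)


class SampledQueryStandIn:
    """Hypothetical stand-in for a 'random sample' query wrapping another query."""

    def __init__(self, parent, sampling_params):
        self.parent = parent
        self.sampling_params = sampling_params


class ExposedWithSamplingStandIn:
    """Hypothetical base class mimicking the behaviour described above."""

    sampling = None

    @property
    def _flowmachine_query_obj(self):
        query = self._unsampled_query_obj
        if self.sampling is None:
            return query  # No sampling requested: use the query as-is.
        return query.random_sample(**self.sampling)


class MyQueryExposedStandIn(ExposedWithSamplingStandIn):
    def __init__(self, *, sampling=None):
        self.sampling = sampling

    @property
    def _unsampled_query_obj(self):
        return QueryStandIn("my_query")


unsampled = MyQueryExposedStandIn()._flowmachine_query_obj
sampled = MyQueryExposedStandIn(sampling={"size": 10})._flowmachine_query_obj
```

Because the subclass only defines `_unsampled_query_obj`, the sampling logic lives in one place and every query kind that inherits from the base class gets it without extra code.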


#### 2. Define a "query schema" class

In the same file as `MyQueryExposed`, define a new class `MyQuerySchema`. This is a [marshmallow](https://marshmallow.readthedocs.io) schema, responsible for validation and deserialisation of parameter values supplied in an API call.

As before, there are two options, depending on whether or not random sampling should be enabled for this query kind. If `MyQueryExposed` inherits from `BaseExposedQueryWithSampling` then `MyQuerySchema` should inherit from [`BaseQueryWithSamplingSchema`](../flowmachine/flowmachine/core/server/query_schemas/base_query_with_sampling/#class-basequerywithsamplingschema). Otherwise, `MyQuerySchema` should inherit from [`BaseSchema`](../flowmachine/flowmachine/core/server/query_schemas/base_schema/#class-baseschema).

=== "Without sampling"

```python
from marshmallow import fields, validate
from flowmachine.core.server.query_schemas.field_mixins import StartAndEndField
from flowmachine.core.server.query_schemas.base_schema import BaseSchema
from flowmachine.core.server.query_schemas.aggregation_unit import AggregationUnitKind

class MyQuerySchema(
    StartAndEndField,  # (1)
    BaseSchema,
):
    __model__ = MyQueryExposed  # (2)

    query_kind = fields.String(  # (3)
        validate=validate.OneOf([__model__.query_kind]),
        required=True,
    )
    sub_query = fields.Nested(SomeOtherQuerySchema, required=True)  # (4)
    other_param = fields.Integer(
        validate=validate.Range(0, 10),  # (5)
        required=False,
        load_default=0,  # (6)
    )
    # Only relevant for spatial aggregates:
    aggregation_unit = AggregationUnitKind(dump_only=True)  # (7)
```

1. The `StartAndEndField` mixin adds `start_date` and `end_date` fields. There are other mixins available for adding commonly-used fields, e.g. `HoursField` and `AggregationUnitMixin`.
2. Set `MyQueryExposed` as the `__model__` class attribute so that `MyQuerySchema` will deserialise parameters to an instance of `MyQueryExposed`.
3. `query_kind` field must be defined here. This field will not be passed on to `MyQueryExposed.__init__()`.
4. Sub-queries can be accepted as parameters by specifying the appropriate query schema in a marshmallow `Nested` field.
5. The fields specified here should provide all necessary validation of parameter values.
6. If you wish to set a default parameter value to be used if no value is supplied by the user, it is better to specify this here than in `MyQueryExposed.__init__()` so that the default value will be stated in the API spec.
7. If `MyQuery` is a _spatial aggregate_ but does not have an explicit `aggregation_unit` parameter (e.g. because the aggregation unit is determined by a nested sub-query), add a _dump-only_ 'aggregation_unit' field. This enables FlowAPI to identify this query kind as a spatial aggregate, without exposing a redundant 'aggregation_unit' input parameter.

=== "With sampling"

```python
from marshmallow import fields, validate
from flowmachine.core.server.query_schemas.field_mixins import StartAndEndField
from flowmachine.core.server.query_schemas.base_query_with_sampling import BaseQueryWithSamplingSchema
from flowmachine.core.server.query_schemas.aggregation_unit import AggregationUnitKind

class MyQuerySchema(
    StartAndEndField,  # (1)
    BaseQueryWithSamplingSchema,  # (8)
):
    __model__ = MyQueryExposed  # (2)

    query_kind = fields.String(  # (3)
        validate=validate.OneOf([__model__.query_kind]),
        required=True,
    )
    sub_query = fields.Nested(SomeOtherQuerySchema, required=True)  # (4)
    other_param = fields.Integer(
        validate=validate.Range(0, 10),  # (5)
        required=False,
        load_default=0,  # (6)
    )
    # Only relevant for spatial aggregates:
    aggregation_unit = AggregationUnitKind(dump_only=True)  # (7)
```

1. The `StartAndEndField` mixin adds `start_date` and `end_date` fields. There are other mixins available for adding commonly-used fields, e.g. `HoursField` and `AggregationUnitMixin`.
2. Set `MyQueryExposed` as the `__model__` class attribute so that `MyQuerySchema` will deserialise parameters to an instance of `MyQueryExposed`.
3. `query_kind` field must be defined here. This field will not be passed on to `MyQueryExposed.__init__()`.
4. Sub-queries can be accepted as parameters by specifying the appropriate query schema in a marshmallow `Nested` field.
5. The fields specified here should provide all necessary validation of parameter values.
6. If you wish to set a default parameter value to be used if no value is supplied by the user, it is better to specify this here than in `MyQueryExposed.__init__()` so that the default value will be stated in the API spec.
7. If `MyQuery` is a _spatial aggregate_ but does not have an explicit `aggregation_unit` parameter (e.g. because the aggregation unit is determined by a nested sub-query), add a _dump-only_ 'aggregation_unit' field. This enables FlowAPI to identify this query kind as a spatial aggregate, without exposing a redundant 'aggregation_unit' input parameter.
8. `BaseQueryWithSamplingSchema` adds a `sampling` field.

#### 3. Expose the query

If `MyQuery` is an _aggregate_, it can be exposed as a top-level query (meaning that API users will be able to directly run and get the results of `MyQuery` queries). In this case, add `MyQuerySchema` to [`FlowmachineQuerySchema.query_schemas`](../flowmachine/flowmachine/core/server/query_schemas/flowmachine_query/#class-flowmachinequeryschema).
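
To make the role of `query_kind` in a top-level query concrete, here is a simplified, standard-library-only sketch of dispatching on that field. It is not how `FlowmachineQuerySchema` is implemented (the real schemas are marshmallow classes); `QUERY_LOADERS`, `load_query` and `MyQueryExposedStandIn` are hypothetical names used only for illustration.

```python
class MyQueryExposedStandIn:
    """Hypothetical stand-in for an exposed query class."""

    query_kind = "my_query"

    def __init__(self, *, start_date, end_date, other_param):
        self.start_date = start_date
        self.end_date = end_date
        self.other_param = other_param


# Registry mapping each query kind to the class that handles it.
QUERY_LOADERS = {MyQueryExposedStandIn.query_kind: MyQueryExposedStandIn}


def load_query(params):
    """Build an exposed query object from a dict of API parameters."""
    params = dict(params)  # Copy so the caller's dict is not modified.
    try:
        kind = params.pop("query_kind")
    except KeyError:
        raise ValueError("Missing required parameter 'query_kind'") from None
    try:
        loader = QUERY_LOADERS[kind]
    except KeyError:
        raise ValueError(f"Unknown query kind: {kind!r}") from None
    # query_kind selects the class but is not passed on to __init__.
    return loader(**params)


query = load_query(
    {
        "query_kind": "my_query",
        "start_date": "2016-01-01",
        "end_date": "2016-01-02",
        "other_param": 3,
    }
)
```

A query kind that has not been registered is rejected before any query object is constructed, which is the practical effect of leaving a schema out of the top-level list.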

!!! warning

If the query is an _aggregate_ and will be exposed as a top-level query, it is essential that the underlying FlowMachine query defined in the exposed query's `_flowmachine_query_obj` property is _redacted_ - i.e. all rows in the query result corresponding to 15 or fewer individuals are removed from the output. This protects individuals' privacy through k-anonymity.
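
As an illustration of the redaction rule only, the function below drops rows whose count is 15 or fewer from a list of dictionaries. In FlowMachine the redaction is applied within the query itself; the row layout, the `value` column name and the `redact` helper are assumptions made for this sketch.

```python
REDACTION_THRESHOLD = 15  # Rows representing this many individuals or fewer are removed.


def redact(rows, count_column="value", threshold=REDACTION_THRESHOLD):
    """Keep only the rows whose count is strictly greater than the threshold."""
    return [row for row in rows if row[count_column] > threshold]


rows = [
    {"pcod": "A", "value": 120},
    {"pcod": "B", "value": 15},  # Exactly 15: removed.
    {"pcod": "C", "value": 16},  # Just above the threshold: kept.
]
redacted = redact(rows)
```

Note that the comparison is strict, so a row with exactly 15 individuals is removed.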

If `MyQuery` is an _individual-level_ query, it should **not** be exposed directly as a top-level query. In this case, `MyQuerySchema` should be added as a nested sub-query parameter of the appropriate other query schemas. For example, if `MyQuery` is a _reference location_, add `MyQuerySchema` to [`ReferenceLocationSchema.query_schemas`](../flowmachine/flowmachine/core/server/query_schemas/reference_location/#class-referencelocationschema) so that it will be accepted as a parameter to query kinds such as `spatial_aggregate` and `flows`.


<a name="flowdb">

10 changes: 0 additions & 10 deletions flowapi/flowapi/api_spec.py
@@ -32,16 +32,6 @@ async def get_spec(socket: Socket, request_id: str) -> APISpec:
# Get the reply.
reply = await socket.recv_json()
flowmachine_query_schemas = reply["payload"]["query_schemas"]
# Need to mark query_kind as a required field
# this is a workaround because the marshmallow-oneOf plugin strips
# the query_kind off, which means it can't be required from the marshmallow
# side without raising an error
for schema, schema_dict in flowmachine_query_schemas.items():
try:
if "query_kind" in schema_dict["properties"]:
schema_dict["required"].append("query_kind")
except KeyError:
pass # Doesn't have any properties
spec = APISpec(
title="FlowAPI",
version=__version__,
3 changes: 3 additions & 0 deletions flowapi/flowapi/permissions.py
@@ -42,6 +42,9 @@ def enum_paths(
"enum" in tree.keys()
and len(tree["enum"]) > 1
and new_path[-1] in argument_names_to_extract
and not (
("readOnly" in tree.keys()) and tree["readOnly"]
) # Workaround - read-only aggregation units will be required for new permissions, but should not be taken into account for current permissions
):
yield (new_path, f"{{{new_path[-1]}}}")
elif "items" in tree.keys():
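
The effect of the added `readOnly` check can be illustrated with a much-simplified sketch. This is not the real `enum_paths` function (which handles more cases, such as `items`); `collect_enum_paths` and the example spec are hypothetical, and only show that an enum marked as read-only is skipped when collecting paths.

```python
def collect_enum_paths(tree, names_to_extract, path=()):
    """Yield paths to multi-valued enum nodes, skipping read-only ones."""
    for key, value in tree.items():
        if not isinstance(value, dict):
            continue
        new_path = path + (key,)
        if (
            "enum" in value
            and len(value["enum"]) > 1
            and key in names_to_extract
            and not value.get("readOnly", False)  # Ignore read-only fields.
        ):
            yield new_path
        else:
            yield from collect_enum_paths(value, names_to_extract, new_path)


spec = {
    "spatial_aggregate": {
        "aggregation_unit": {"enum": ["admin1", "admin2"]},
    },
    "flows": {
        # Read-only: reported by the API, but not an input parameter.
        "aggregation_unit": {"enum": ["admin1", "admin2"], "readOnly": True},
    },
}
paths = list(collect_enum_paths(spec, {"aggregation_unit"}))
```

Only the writable `aggregation_unit` under `spatial_aggregate` is collected; the read-only one under `flows` is ignored.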