Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

- Added `parent_ids` internal field to collections to support multi-catalog hierarchies. Collections can now belong to multiple catalogs, with parent catalog IDs stored in this field for efficient querying and management. [#554](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/554)


### Changed

- Have opensearch datetime, geometry and collections fields defined as constant strings [#553](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/553)

### Fixed

- Fix unawaited coroutine in `stac_fastapi.core.core`. [#551](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/551)
Expand Down
137 changes: 107 additions & 30 deletions stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,20 @@ def __attrs_post_init__(self):

aggregation_mapping: Dict[str, Dict[str, Any]] = AGGREGATION_MAPPING

# constants for field names
# they are used in multiple methods
# and could be overwritten in subclasses used with alternate opensearch mappings.
PROPERTIES_DATETIME_FIELD = "properties.datetime"
PROPERTIES_START_DATETIME_FIELD = "properties.start_datetime"
PROPERTIES_END_DATETIME_FIELD = "properties.end_datetime"
COLLECTION_FIELD = "collection"
GEOMETRY_FIELD = "geometry"

@staticmethod
def __nested_field__(field: str):
"""Convert opensearch field to nested field format."""
return field.replace(".", "__")

"""CORE LOGIC"""

async def get_all_collections(
Expand Down Expand Up @@ -436,7 +450,10 @@ def apply_ids_filter(search: Search, item_ids: List[str]):
@staticmethod
def apply_collections_filter(search: Search, collection_ids: List[str]):
"""Database logic to search a list of STAC collection ids."""
return search.filter("terms", collection=collection_ids)
collection_nested_field = DatabaseLogic.__nested_field__(
DatabaseLogic.COLLECTION_FIELD
)
return search.filter("terms", **{collection_nested_field: collection_ids})

@staticmethod
def apply_datetime_filter(
Expand All @@ -461,6 +478,16 @@ def apply_datetime_filter(
if not datetime_search:
return search, datetime_search

nested_datetime_field = DatabaseLogic.__nested_field__(
DatabaseLogic.PROPERTIES_DATETIME_FIELD
)
nested_start_datetime_field = DatabaseLogic.__nested_field__(
DatabaseLogic.PROPERTIES_START_DATETIME_FIELD
)
nested_end_datetime_field = DatabaseLogic.__nested_field__(
DatabaseLogic.PROPERTIES_END_DATETIME_FIELD
)

if USE_DATETIME:
if "eq" in datetime_search:
# For exact matches, include:
Expand All @@ -470,28 +497,42 @@ def apply_datetime_filter(
Q(
"bool",
filter=[
Q("exists", field="properties.datetime"),
Q("exists", field=DatabaseLogic.PROPERTIES_DATETIME_FIELD),
Q(
"term",
**{"properties__datetime": datetime_search["eq"]},
**{nested_datetime_field: datetime_search["eq"]},
),
],
),
Q(
"bool",
must_not=[Q("exists", field="properties.datetime")],
must_not=[
Q("exists", field=DatabaseLogic.PROPERTIES_DATETIME_FIELD)
],
filter=[
Q("exists", field="properties.start_datetime"),
Q("exists", field="properties.end_datetime"),
Q(
"exists",
field=DatabaseLogic.PROPERTIES_START_DATETIME_FIELD,
),
Q(
"exists",
field=DatabaseLogic.PROPERTIES_END_DATETIME_FIELD,
),
Q(
"range",
properties__start_datetime={
"lte": datetime_search["eq"]
**{
nested_start_datetime_field: {
"lte": datetime_search["eq"]
}
},
),
Q(
"range",
properties__end_datetime={"gte": datetime_search["eq"]},
**{
nested_end_datetime_field: {
"gte": datetime_search["eq"]
}
},
),
],
),
Expand All @@ -504,32 +545,46 @@ def apply_datetime_filter(
Q(
"bool",
filter=[
Q("exists", field="properties.datetime"),
Q("exists", field=DatabaseLogic.PROPERTIES_DATETIME_FIELD),
Q(
"range",
properties__datetime={
"gte": datetime_search["gte"],
"lte": datetime_search["lte"],
**{
nested_datetime_field: {
"gte": datetime_search["gte"],
"lte": datetime_search["lte"],
}
},
),
],
),
Q(
"bool",
must_not=[Q("exists", field="properties.datetime")],
must_not=[
Q("exists", field=DatabaseLogic.PROPERTIES_DATETIME_FIELD)
],
filter=[
Q("exists", field="properties.start_datetime"),
Q("exists", field="properties.end_datetime"),
Q(
"exists",
field=DatabaseLogic.PROPERTIES_START_DATETIME_FIELD,
),
Q(
"exists",
field=DatabaseLogic.PROPERTIES_END_DATETIME_FIELD,
),
Q(
"range",
properties__start_datetime={
"lte": datetime_search["lte"]
**{
nested_start_datetime_field: {
"lte": datetime_search["lte"]
}
},
),
Q(
"range",
properties__end_datetime={
"gte": datetime_search["gte"]
**{
nested_end_datetime_field: {
"gte": datetime_search["gte"]
}
},
),
],
Expand All @@ -545,31 +600,53 @@ def apply_datetime_filter(
filter_query = Q(
"bool",
filter=[
Q("exists", field="properties.start_datetime"),
Q("exists", field="properties.end_datetime"),
Q(
"exists",
field=DatabaseLogic.PROPERTIES_START_DATETIME_FIELD,
),
Q("exists", field=DatabaseLogic.PROPERTIES_END_DATETIME_FIELD),
Q(
"range",
properties__start_datetime={"lte": datetime_search["eq"]},
**{
nested_start_datetime_field: {
"lte": datetime_search["eq"]
}
},
),
Q(
"range",
properties__end_datetime={"gte": datetime_search["eq"]},
**{
nested_end_datetime_field: {
"gte": datetime_search["eq"]
}
},
),
],
)
else:
filter_query = Q(
"bool",
filter=[
Q("exists", field="properties.start_datetime"),
Q("exists", field="properties.end_datetime"),
Q(
"exists",
field=DatabaseLogic.PROPERTIES_START_DATETIME_FIELD,
),
Q("exists", field=DatabaseLogic.PROPERTIES_END_DATETIME_FIELD),
Q(
"range",
properties__start_datetime={"lte": datetime_search["lte"]},
**{
nested_start_datetime_field: {
"lte": datetime_search["lte"]
}
},
),
Q(
"range",
properties__end_datetime={"gte": datetime_search["gte"]},
**{
nested_end_datetime_field: {
"gte": datetime_search["gte"]
}
},
),
],
)
Expand All @@ -594,7 +671,7 @@ def apply_bbox_filter(search: Search, bbox: List):
Q(
{
"geo_shape": {
"geometry": {
DatabaseLogic.GEOMETRY_FIELD: {
"shape": {
"type": "polygon",
"coordinates": bbox2polygon(*bbox),
Expand Down Expand Up @@ -1708,7 +1785,7 @@ def bulk_sync(
kwargs = kwargs or {}

# Resolve the `refresh` parameter
refresh = kwargs.get("refresh", self.async_settings.database_refresh)
refresh = kwargs.get("refresh", self.sync_settings.database_refresh)
refresh = validate_refresh(refresh)

# Log the bulk insert attempt
Expand Down
Loading