Skip to content

Commit

Permalink
refactor(sdk/patch): improve patch implementation internals (datahub-…
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored Jan 2, 2025
1 parent bdc34b7 commit f396d8d
Show file tree
Hide file tree
Showing 21 changed files with 535 additions and 992 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/airflow-plugin.yml
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ jobs:
token: ${{ secrets.CODECOV_TOKEN }}
directory: ./build/coverage-reports/
fail_ci_if_error: false
flags: airflow,airflow-${{ matrix.extra_pip_extras }}
name: pytest-airflow-${{ matrix.python-version }}-${{ matrix.extra_pip_requirements }}
flags: airflow-${{ matrix.python-version }}-${{ matrix.extra_pip_extras }}
name: pytest-airflow
verbose: true

event-file:
Expand Down
9 changes: 3 additions & 6 deletions .github/workflows/metadata-ingestion.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,6 @@ jobs:
"testIntegrationBatch1",
"testIntegrationBatch2",
]
include:
- python-version: "3.8"
- python-version: "3.11"
fail-fast: false
steps:
- name: Free up disk space
Expand Down Expand Up @@ -92,14 +89,14 @@ jobs:
**/junit.*.xml
!**/binary/**
- name: Upload coverage to Codecov
if: ${{ always() && matrix.python-version == '3.10' }}
if: ${{ always() }}
uses: codecov/codecov-action@v5
with:
token: ${{ secrets.CODECOV_TOKEN }}
directory: ./build/coverage-reports/
fail_ci_if_error: false
flags: pytest-${{ matrix.command }}
name: pytest-${{ matrix.python-version }}-${{ matrix.command }}
flags: ingestion-${{ matrix.python-version }}-${{ matrix.command }}
name: pytest-ingestion
verbose: true

event-file:
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/prefect-plugin.yml
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ jobs:
token: ${{ secrets.CODECOV_TOKEN }}
directory: ./build/coverage-reports/
fail_ci_if_error: false
flags: prefect,prefect-${{ matrix.python-version }}
name: pytest-prefect-${{ matrix.python-version }}
flags: prefect-${{ matrix.python-version }}
name: pytest-prefect
verbose: true

event-file:
Expand Down
6 changes: 3 additions & 3 deletions metadata-ingestion/src/datahub/emitter/mce_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import typing_inspect
from avrogen.dict_wrapper import DictWrapper
from typing_extensions import assert_never

from datahub.emitter.enum_helpers import get_enum_options
from datahub.metadata.schema_classes import (
Expand Down Expand Up @@ -269,9 +270,8 @@ def make_owner_urn(owner: str, owner_type: OwnerType) -> str:
return make_user_urn(owner)
elif owner_type == OwnerType.GROUP:
return make_group_urn(owner)
# This should pretty much never happen.
# TODO: With Python 3.11, we can use typing.assert_never() here.
return f"urn:li:{owner_type.value}:{owner}"
else:
assert_never(owner_type)


def make_ownership_type_urn(type: str) -> str:
Expand Down
48 changes: 36 additions & 12 deletions metadata-ingestion/src/datahub/emitter/mcp_patch_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,19 @@
import time
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Sequence, Union
from typing import (
Any,
Dict,
List,
Literal,
Optional,
Protocol,
Tuple,
Union,
runtime_checkable,
)

from typing_extensions import LiteralString

from datahub.emitter.aspect import JSON_PATCH_CONTENT_TYPE
from datahub.emitter.serialization_helper import pre_json_transform
Expand All @@ -19,25 +31,36 @@
from datahub.utilities.urns.urn import guess_entity_type


@runtime_checkable
class SupportsToObj(Protocol):
    """Structural type for any object exposing a ``to_obj()`` method.

    Marked ``runtime_checkable`` so that ``isinstance(x, SupportsToObj)`` can
    be used to detect such objects at runtime.
    """

    def to_obj(self) -> Any:
        """Return the object form of this value (type left open as ``Any``)."""


def _recursive_to_obj(obj: Any) -> Any:
if isinstance(obj, list):
return [_recursive_to_obj(v) for v in obj]
elif hasattr(obj, "to_obj"):
elif isinstance(obj, SupportsToObj):
return obj.to_obj()
else:
return obj


# A patch path expressed as a tuple of unescaped segments (literal strings or
# Urns); `_Patch.to_obj` quotes each segment and joins them with "/".
PatchPath = Tuple[Union[LiteralString, Urn], ...]
# The patch operations that are supported; move, copy and test are not.
PatchOp = Literal["add", "remove", "replace"]


@dataclass
class _Patch:
op: str # one of ['add', 'remove', 'replace']; we don't support move, copy or test
path: str
class _Patch(SupportsToObj):
op: PatchOp
path: PatchPath
value: Any

def to_obj(self) -> Dict:
quoted_path = "/" + "/".join(MetadataPatchProposal.quote(p) for p in self.path)
return {
"op": self.op,
"path": self.path,
"path": quoted_path,
"value": _recursive_to_obj(self.value),
}

Expand All @@ -63,15 +86,16 @@ def __init__(

# Json Patch quoting based on https://jsonpatch.com/#json-pointer
@classmethod
def quote(cls, value: str) -> str:
return value.replace("~", "~0").replace("/", "~1")
def quote(cls, value: Union[str, Urn]) -> str:
return str(value).replace("~", "~0").replace("/", "~1")

def _add_patch(
self, aspect_name: str, op: str, path: Union[str, Sequence[str]], value: Any
self,
aspect_name: str,
op: PatchOp,
path: PatchPath,
value: Any,
) -> None:
if not isinstance(path, str):
path = "/" + "/".join(self.quote(p) for p in path)

# TODO: Validate that aspectName is a valid aspect for this entityType
self.patches[aspect_name].append(_Patch(op, path, value))

Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
from abc import abstractmethod
from typing import Dict, Optional, Tuple

from typing_extensions import Self

from datahub.emitter.mcp_patch_builder import MetadataPatchProposal, PatchPath


class HasCustomPropertiesPatch(MetadataPatchProposal):
    """Patch-builder mixin providing custom-property operations.

    Concrete builders implement `_custom_properties_location` to declare which
    aspect holds the custom properties and the path to them inside that aspect.
    Every public method here queues a patch relative to that location and
    returns the builder so calls can be chained.
    """

    @classmethod
    @abstractmethod
    def _custom_properties_location(cls) -> Tuple[str, PatchPath]:
        """Return where custom properties live for this builder.

        Returns:
            An ``(aspect_name, path)`` pair: the name of the aspect to patch
            and the path of the custom-properties container within it.
        """
        ...

    def add_custom_property(self, key: str, value: str) -> Self:
        """Add a custom property to the entity.

        Args:
            key: The key of the custom property.
            value: The value of the custom property.

        Returns:
            The patch builder instance.
        """
        aspect_name, path = self._custom_properties_location()
        self._add_patch(
            aspect_name,
            "add",
            # The property key becomes the final path segment.
            path=(*path, key),
            value=value,
        )
        return self

    def add_custom_properties(
        self, custom_properties: Optional[Dict[str, str]] = None
    ) -> Self:
        """Add several custom properties, one "add" patch per entry.

        Args:
            custom_properties: Mapping of property keys to values. ``None``
                is accepted and queues nothing.

        Returns:
            The patch builder instance.
        """
        if custom_properties is not None:
            for key, value in custom_properties.items():
                self.add_custom_property(key, value)
        return self

    def remove_custom_property(self, key: str) -> Self:
        """Remove a custom property from the entity.

        Args:
            key: The key of the custom property to remove.

        Returns:
            The patch builder instance.
        """
        aspect_name, path = self._custom_properties_location()
        self._add_patch(
            aspect_name,
            "remove",
            path=(*path, key),
            # A remove op carries no meaningful value; an empty dict is sent.
            value={},
        )
        return self

    def set_custom_properties(self, custom_properties: Dict[str, str]) -> Self:
        """Sets the custom properties of the entity.

        This method replaces all existing custom properties with the given
        dictionary. It does so by queuing a single "add" patch at the
        custom-properties path itself rather than at an individual key.

        Args:
            custom_properties: A dictionary containing the custom properties
                to be set.

        Returns:
            The patch builder instance.
        """
        aspect_name, path = self._custom_properties_location()
        self._add_patch(
            aspect_name,
            "add",
            path=path,
            value=custom_properties,
        )
        return self
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
from typing import List, Optional

from typing_extensions import Self

from datahub.emitter.mcp_patch_builder import MetadataPatchProposal
from datahub.metadata.schema_classes import (
OwnerClass,
OwnershipClass,
OwnershipTypeClass,
)


class HasOwnershipPatch(MetadataPatchProposal):
    """Patch-builder mixin that queues patches against the ownership aspect."""

    def add_owner(self, owner: OwnerClass) -> Self:
        """Queue an "add" patch for a single owner.

        The patch path is made of the owner identifier followed by the
        stringified ownership type.

        Args:
            owner: The Owner object to add.

        Returns:
            The patch builder instance.
        """
        owner_path = ("owners", owner.owner, str(owner.type))
        self._add_patch(
            OwnershipClass.ASPECT_NAME, "add", path=owner_path, value=owner
        )
        return self

    def remove_owner(
        self, owner: str, owner_type: Optional[OwnershipTypeClass] = None
    ) -> Self:
        """Queue a "remove" patch for an owner.

        When owner_type is omitted, the path stops at the owner, so the owner
        is removed regardless of ownership type.

        Args:
            owner: The owner to remove.
            owner_type: The ownership type of the owner (optional).

        Returns:
            The patch builder instance.
        """
        # Only a truthy ownership type contributes a trailing path segment.
        type_suffix = (str(owner_type),) if owner_type else ()
        self._add_patch(
            OwnershipClass.ASPECT_NAME,
            "remove",
            path=("owners", owner, *type_suffix),
            value=owner,
        )
        return self

    def set_owners(self, owners: List[OwnerClass]) -> Self:
        """Set the owners of the entity.

        This effectively replaces all existing owners with the new list: a
        single "add" patch is queued at the top-level "owners" path, so it is
        not an incremental patch.

        Args:
            owners: The list of owners to set.

        Returns:
            The patch builder instance.
        """
        owners_root = ("owners",)
        self._add_patch(
            OwnershipClass.ASPECT_NAME, "add", path=owners_root, value=owners
        )
        return self
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
from typing import List, Union

from typing_extensions import Self

from datahub.emitter.mcp_patch_builder import MetadataPatchProposal
from datahub.metadata.schema_classes import (
StructuredPropertiesClass,
StructuredPropertyValueAssignmentClass,
)
from datahub.utilities.urns.structured_properties_urn import (
make_structured_property_urn,
)


class HasStructuredPropertiesPatch(MetadataPatchProposal):
    """Patch-builder mixin that queues patches for structured properties."""

    def set_structured_property(
        self, key: str, value: Union[str, float, List[Union[str, float]]]
    ) -> Self:
        """Add or update a structured property.

        Implemented as a "remove" patch followed by an "add" patch for the
        same property.

        Args:
            key: the name of the property (either bare or urn form)
            value: the value of the property (for multi-valued properties,
                this can be a list)

        Returns:
            The patch builder instance.
        """
        self.remove_structured_property(key)
        self.add_structured_property(key, value)
        return self

    def remove_structured_property(self, key: str) -> Self:
        """Remove a structured property.

        Args:
            key: the name of the property (either bare or urn form)

        Returns:
            The patch builder instance.
        """
        property_urn = make_structured_property_urn(key)
        self._add_patch(
            StructuredPropertiesClass.ASPECT_NAME,
            "remove",
            path=("properties", property_urn),
            value={},
        )
        return self

    def add_structured_property(
        self, key: str, value: Union[str, float, List[Union[str, float]]]
    ) -> Self:
        """Add a structured property.

        Args:
            key: the name of the property (either bare or urn form)
            value: the value of the property (for multi-valued properties,
                this value will be appended to the list)

        Returns:
            The patch builder instance.
        """
        property_urn = make_structured_property_urn(key)
        # A scalar is wrapped so the assignment always carries a list.
        values = value if isinstance(value, list) else [value]
        assignment = StructuredPropertyValueAssignmentClass(
            propertyUrn=property_urn,
            values=values,
        )
        self._add_patch(
            StructuredPropertiesClass.ASPECT_NAME,
            "add",
            path=("properties", property_urn),
            value=assignment,
        )
        return self
Loading

0 comments on commit f396d8d

Please sign in to comment.