improvement(tools): many new tools, redesigned existing tools, better token management #55
Conversation
mayurinehate left a comment:
Mostly looks good. Minor suggestions for the sanity of the OSS repo.
CHANGELOG.md (outdated):
> #### New Files
> - `_token_estimator.py`: Token counting utilities
> - `gql/query_entity.gql`: Specialized query for QueryEntity type
>
> ### Changed
>
> - **Complete rewrite of `mcp_server.py`** (2,513 lines vs 662 in previous version)
> - **GraphQL API migration**: `scrollAcrossEntities` → `searchAcrossEntities`
>   - Replaced `scrollId` parameter with `start` for pagination
>   - Added `viewUrn` and `sortInput` parameters
> - **Updated `gql/search.gql`**: Modern search API with pagination
> - **Updated `gql/entity_details.gql`**: Reformatted `#[CLOUD]` markers for better maintainability
I think we should omit the New and Changed files sections. I don't think they add much value; plus, this is the most basic thing git gives right out of the box.
CHANGELOG.md (outdated):
> #### Tests Synced from Integrations Service
> **Added**: Comprehensive test suite synced from internal integrations service (12 test files in `tests/mcp/`):
> - Entity retrieval and queries
> - Column lineage extraction
> - Filter conversion logic
> - Schema field operations
> - Tag processing
> - Lineage path calculations
> - Helper function tests
>
> **Not Synced**: `test_mcp_telemetry.py` and `test_mcp_server.py` (service-specific)
>
> **Note**: Tests use `datahub_integrations` imports to remain identical to source. These are reference implementations; adaptation is needed to run in OSS.
>
> #### GraphQL Query Changes
> - `scrollAcrossEntities` → `searchAcrossEntities`
> - `scrollId` parameter removed (use `start` instead)
> - New required parameter: `start` (integer offset)
> - New optional parameters: `viewUrn`, `sortInput`
>
> **Migration**: See examples in README or documentation.
>
> ### Documentation
>
> - Added module docstrings explaining repository sync requirements
> - Inline comments documenting the importance of relative imports for cross-repo compatibility
>
> ### Compatibility
>
> - **DataHub OSS**: Full compatibility (cloud-specific fields automatically disabled)
> - **DataHub Cloud**: Enhanced features when cloud fields available
> - **GMS Versions**: Adaptive compatibility with version detection
> - **MCP Protocol**: Compatible with MCP 2.0+ (uses FastMCP 2.10.5)
>
> ### Internal
>
> - Synced from internal DataHub integrations service (commit: 7077a9ce72)
> - Service-specific files (`router.py`, `mcp_telemetry.py`) intentionally not included in open source version
>
> ---
Let's remove everything from the `#### Tests Synced from Integrations Service` section onwards, up to the old release section `## [0.3.11]` and earlier. It's mostly repetition and needless.
```python
# Log configuration on startup
if not DISABLE_DEFAULT_VIEW:
    logger.info("Default view application ENABLED (cache TTL: 5 minutes)")
```
This change is not captured in the changelog.
src/mcp_server_datahub/mcp_server.py (outdated):
```
- queryCountLast30DaysFeature: Number of queries in last 30 days
- rowCountFeature: Table row count
- sizeInBytesFeature: Table size in bytes
- writeCountLast30DaysFeature: Number of writes/updates in last 30 days
```
I suspect these are cloud-only fields. What is the behavior if non-existent fields are passed as sort input?
```python
break

# If in OSS repo, create datahub_integrations compatibility shim
if using_oss:
```
I'd suggest creating an OSS-compatible shim in the cloud-only repo rather than adding a cloud-compatible shim in the OSS repo.

Or a package-rename copy script would also do. This will also help make lint happy.

i.e., copy this file to the cloud, right? I will do this.

Yes, and do a reverse shim.

I agree with this. Let's keep any cloud logic in the cloud-only repo to avoid future problems.
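The "package rename copy script" idea from this thread could look something like the sketch below: copy a package tree and rewrite imports so the copy is self-consistent. The function name, package names, and regex-based rewrite are all assumptions for illustration; the real script would need to match the repos' actual layouts.

```python
# Hedged sketch of a package-rename copy script: copy src_dir to dst_dir,
# rewriting references to old_pkg (e.g. "datahub_integrations") into
# new_pkg (e.g. "mcp_server_datahub") so lint sees consistent imports.
import re
import shutil
from pathlib import Path


def copy_with_renamed_imports(src_dir: str, dst_dir: str,
                              old_pkg: str, new_pkg: str) -> int:
    """Copy a package, rewriting old_pkg -> new_pkg in all .py files.

    Returns the number of files whose contents were rewritten.
    """
    src, dst = Path(src_dir), Path(dst_dir)
    shutil.copytree(src, dst, dirs_exist_ok=True)
    # Word boundaries avoid clobbering names that merely contain old_pkg.
    pattern = re.compile(rf"\b{re.escape(old_pkg)}\b")
    rewritten = 0
    for py_file in dst.rglob("*.py"):
        text = py_file.read_text()
        new_text = pattern.sub(new_pkg, text)
        if new_text != text:
            py_file.write_text(new_text)
            rewritten += 1
    return rewritten
```

Run in the cloud repo's sync job, this would keep the OSS package name canonical and make the cloud copy the derived artifact, as suggested above.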
mayurinehate left a comment:
Please fix the failing lints.
> ## Questions or Issues?
>
> - Open an issue: https://github.com/acryldata/mcp-server-datahub/issues
Do we really want to commit this file? What is the expectation going forward, that this file will be constantly updated on each change? Why not just keep release notes as the place to communicate what we've shipped?

What are the release notes? Where is that?
```graphql
#[CLOUD] }
#[CLOUD] }
#[CLOUD] }
statsSummary { #[CLOUD]
```
These are all valid fields for OSS servers? Just double-checking.
```graphql
}
}
}
sqlAssertion {
```
Just to confirm, all of these data models are available in OSS as well, right?

I don't see failures.
```python
    Truncate a SQL query if it exceeds the maximum length.
    """
    return truncate_with_ellipsis(
        query, QUERY_LENGTH_HARD_LIMIT, suffix="... [truncated]"
```
Just need to be careful about this for the SQL query generation flow. As Anna pointed out, we were seeing a good bit of truncation.

We should discuss the truncation issue. I think the issue Anna was seeing was related to an incorrectly passed (or preserved) parameter for the chat type: the code was truncated due to the Slack limitation even though it was run from the web UI.
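For context, a `truncate_with_ellipsis`-style helper like the one called in the diff above typically looks like this minimal sketch. The real implementation in the PR may differ; the signature here is inferred from the call site.

```python
# Hedged sketch of a truncation helper matching the call
# truncate_with_ellipsis(query, QUERY_LENGTH_HARD_LIMIT, suffix="... [truncated]").
def truncate_with_ellipsis(text: str, max_length: int,
                           suffix: str = "...") -> str:
    """Return text unchanged if it fits, else cut it and append suffix.

    The suffix counts against max_length, so the result never exceeds it.
    """
    if len(text) <= max_length:
        return text
    return text[: max_length - len(suffix)] + suffix
```

The subtle point behind the truncation discussion is that the hard limit is applied regardless of where the request came from, which is why a Slack-sized limit leaking into web UI requests would show up as unexpected truncation.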
```python
def _enable_newer_gms_fields(query: str) -> str:
    """
```
This is interesting. I would definitely not expect to see these methods in the mcp_server.py file directly. Doesn't this feel like a good candidate to extract into a unit-tested utility file?

In the past (until this merge) we were copying only mcp_server between OSS and the fork, hence everything was in mcp_server. Now, since we are copying the entire folder, we can refactor and split mcp_server.
```python
def _enable_cloud_fields(query: str) -> str:
    return query.replace("#[CLOUD]", "")
```
Interesting approach. I had no idea that those gql comments were actually used at runtime to filter out specific fields.
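The marker technique works because `#` starts a comment in GraphQL, so a `#[CLOUD]`-prefixed line is inert until the marker is stripped. The enable path is quoted in the diff above; the disable helper below is an assumption about the counterpart, dropping marker-bearing lines entirely so stray trailing markers (like `statsSummary { #[CLOUD]`) don't leave unbalanced braces.

```python
# enable: strip the marker text so the hidden fields become live GraphQL.
def enable_cloud_fields(query: str) -> str:
    return query.replace("#[CLOUD]", "")


# disable (assumed counterpart, not shown in the diff): remove every line
# that carries the marker, whether it appears at line start or line end.
def disable_cloud_fields(query: str) -> str:
    return "\n".join(
        line for line in query.splitlines() if "#[CLOUD]" not in line
    )
```

The appeal of this design is that a single `.gql` file stays valid GraphQL in its disabled state while still carrying the cloud-only superset.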
```python
try:
    # Only DataHub Cloud has a frontend base url.
    # Cloud instances typically run newer GMS versions with additional fields.
```
Is there really NO BETTER WAY to detect whether we have a Cloud instance than to do this? This feels like a fragile check. What if tomorrow we add frontend_base_url to Open Source?

Are there any env vars with acryl in the name that we could use to detect cloud vs. open source?
```python
    query = _enable_newer_gms_fields(query)
    newer_gms_enabled_for_this_query = True
else:
    query = _disable_newer_gms_fields(query)
```
What is a "newer" field vs. an "older" field? Aren't new fields eventually "old" at some point? When does that happen?

This whole thing feels a bit odd to me. The fact that we have all this query manipulation directly lumped inside a method called execute_graphql just feels overwhelming. Perhaps you should break this out into a separate function for easier reading, testing, and maintainability:

```
execute_graphql: ...
    # 1. Resolve the final graphql query to execute
    final_query = _resolve_query(query)  # This is where the complex server-side multiplexing occurs.
```
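The suggested `_resolve_query` extraction could be sketched as below. The `#[CLOUD]` marker handling mirrors the helpers visible in the diff; the `#[NEWER_GMS]` marker name and the overall wiring are assumptions, since the newer-GMS helpers' bodies aren't shown.

```python
# Hedged sketch of a single query resolver, pulling the marker handling
# out of execute_graphql so it can be unit tested in isolation.
def resolve_query(query: str, is_cloud: bool, newer_gms: bool) -> str:
    """Return the final GraphQL text tailored to this server's capabilities."""
    if is_cloud:
        # Strip markers so cloud-only fields become live GraphQL.
        query = query.replace("#[CLOUD]", "")
    else:
        # Drop marker-bearing lines entirely for OSS servers.
        query = "\n".join(
            line for line in query.splitlines() if "#[CLOUD]" not in line
        )
    if newer_gms:
        # Marker name assumed; the diff only shows the function names.
        query = query.replace("#[NEWER_GMS]", "")
    return query
```

With this in place, `execute_graphql` reduces to `result = client.execute(resolve_query(query, is_cloud, newer_gms))` plus error handling, which is far easier to read and test.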
```python
# Retry with newer GMS fields disabled - process both tags again
try:
    fallback_query = original_query
```
I really dislike how all of this complexity is just directly lumped into the execute_graphql method. Why not extract a method that is named to convey exactly what is happening, e.g. retry_simplified_graphql_query or something?
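The extraction suggested here might look like the following sketch. The `execute` and `simplify` callables are injected because the PR's real call sites aren't visible in the diff; the function name comes straight from the reviewer's suggestion.

```python
# Hedged sketch of retry logic extracted from execute_graphql into a
# self-describing function: try the full query, and on failure retry
# once with the newer GMS fields stripped out.
from typing import Any, Callable


def retry_simplified_graphql_query(
    query: str,
    execute: Callable[[str], dict[str, Any]],
    simplify: Callable[[str], str],
) -> dict[str, Any]:
    """Run query; on failure, retry once with a simplified variant."""
    try:
        return execute(query)
    except Exception:
        # Older GMS rejected a newer field; fall back to the stripped query.
        return execute(simplify(query))
```

Besides readability, this shape makes the fallback path trivially unit-testable with a fake `execute` that fails on the first call.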
```python
def clean_get_entity_response(raw_response: dict) -> dict:
    def _sort_fields_by_priority(fields: List[dict]) -> Iterator[dict]:
```
Another obvious utility method that likely should not live directly inside mcp_server.py. This file is already getting quite long and difficult to read at 650+ lines.

Correction: this file is already at 1,400+ lines!

Correction: this file is already at 2,000+ lines!
```diff
 @mcp.tool(description="Get an entity by its DataHub URN.")
 @async_background
-def get_entity(urn: str) -> dict:
+def get_entities(urns: List[str] | str) -> List[dict] | dict:
```
Nice, glad to see we are adding this.
```python
    f"This can happen if the entity has no aspects ingested yet, or if there's a permissions issue."
)

inject_urls_for_urns(client._graph, result, [""])
```
We might need to be careful about this method, especially if it depends on frontend_base_url from the client to build the URL.
```python
def list_schema_fields(
    urn: str,
    keywords: Optional[List[str] | str] = None,
    limit: int = 100,
```
Nitpick: if we are using start/count for search pagination, why not also call these variables "start" and "count" instead of "limit" and "offset"?
```python
    "description": "User's email",  # matches both
    "tags": ["PII"]  # matches neither
}
Score = 4 (email in fieldPath + email in desc + user in fieldPath + user in desc)
```
Just curious: it looks like we have multiple functions in here that compute scores to help sort schema fields. Is there any simple way to unify them? I know that this method takes the keywords as well, but just thinking aloud.
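The scoring scheme described in the quoted docstring (one point per keyword hit per attribute) can be reconstructed as the sketch below. The exact weighting and attribute set in the PR aren't shown, so treat this as an illustrative reading of the `Score = 4` example, not the real implementation.

```python
# Hedged sketch of keyword scoring for schema fields: count one point for
# each case-insensitive (keyword, attribute) substring match across the
# field's path and description.
def score_field(field: dict, keywords: list[str]) -> int:
    """Count keyword hits across fieldPath and description."""
    haystacks = [
        field.get("fieldPath", "").lower(),
        field.get("description", "").lower(),
    ]
    return sum(
        1
        for kw in keywords
        for text in haystacks
        if kw.lower() in text
    )
```

A single parameterized scorer like this, with the attribute list as an argument, would be one way to unify the multiple scoring functions the comment mentions.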
```python
}

def _convert_custom_filter_format(filters_obj: Any) -> Any:
```
Another obvious utility function that doesn't belong in the MCP server file.
```python
return filters_obj

def _search_implementation(
```
Crazy that the method that implements search, our most popular MCP tool, can only be found by scrolling to line 1293 of this file.
src/mcp_server_datahub/mcp_server.py (outdated):
```python
register_search_tools(mcp)

# Register get_lineage tool
mcp.tool(name="get_lineage", description=get_lineage.__doc__)(
```
On line 2501 we actually see what tools are exposed :) This, to me, is just not acceptable from a code quality point of view.
jjoyce0510 left a comment:
I find the MCP server file very difficult to review.
I would expect that file to contain basic entry points into the tools that are callable for the MCP server, with delegation to logic defined elsewhere (managers, service classes, helper/utility classes). Currently there is poor separation of concerns and everything is lumped into one 2k+ line file; I don't really see where this can go from here.
Breaking this apart into more modular components would allow us to more systematically unit test each of the logic-heavy functions for doing things like truncating query responses, remapping queries to remove fields, etc.
As a reader, I want to see progressive disclosure of logical complexity, not everything thrown at me in a vertical pile at once. This is not necessarily blocking the PR immediately, but I think it is in our interest to invest in cleaning up the structure of this codebase.
```diff
 async def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> _R:
-    return await asyncer.asyncify(fn)(*args, **kwargs)
+    try:
+        return await asyncer.asyncify(fn)(*args, **kwargs)
```
We should eventually move to async functions for tool calls so we don't need to wrap the methods in an asyncer. It will help us scale better in the future.
```python
logger.info(f"Registering MCP tools (is_oss={is_oss})")

# Choose sorting documentation based on deployment type
if not is_oss:
```
Do we want to expose this in the open source repo?
nwadams left a comment:
LGTM from me. I think long term we'll want to refactor mcp_server.py and the tools into separate classes.
Force-pushed from b778b8d to 7f3fb34.
Sync from the hosted version. I added CHANGELOG.md to keep track of changes.