Skip to content

feat(source-hubspot): Migrate deals stream to Low Code #59127

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: master
Choose a base branch
from

Conversation

tolik0
Copy link
Contributor

@tolik0 tolik0 commented Apr 28, 2025

What

Migrate deals stream to low code.

Resolves: https://github.com/airbytehq/airbyte-internal-issues/issues/12485

How

Review guide

User Impact

Can this PR be safely reverted and rolled back?

  • YES 💚
  • NO ❌

Copy link

vercel bot commented Apr 28, 2025

The latest updates on your projects. Learn more about Vercel for Git ↗︎

Name Status Preview Comments Updated (UTC)
airbyte-docs ✅ Ready (Inspect) Visit Preview 💬 Add feedback May 2, 2025 4:17am

@tolik0
Copy link
Contributor Author

tolik0 commented Apr 30, 2025

/format-fix

Format-fix job started... Check job output.

✅ Changes applied successfully. (d6a5dd5)

@tolik0 tolik0 marked this pull request as ready for review April 30, 2025 15:26
@tolik0 tolik0 requested a review from a team as a code owner April 30, 2025 15:26
@@ -78,6 +78,7 @@
scopes = {
"email_subscriptions": {"content"},
"marketing_emails": {"content"},
"deals": {"contacts", "crm.objects.deals.read"},
Copy link
Contributor

@aldogonzalez8 aldogonzalez8 Apr 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now Deals class is 🪓, I think you can remove it from the import section.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

parameters: Mapping[str, Any] = {}

access_token = config["credentials"]["access_token"]
authenticator = BearerAuthenticator(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I forgot to add SelectiveAuthenticator here, I will update the PR.

Copy link
Contributor

@brianjlai brianjlai left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is really really good work so far! I'm super hyped for seeing how such a complex stream can actually make use of so many low-code concepts we already have like selective streams, grouping, etc We have so many specialized features in the CDK its cool to see that we did end up needing them to migrate hubspot

type: DpathExtractor
field_path: []
request_body_json:
limit: 10
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be 100 right? In the existing CRMSearchStream._process_search we hardcode this to 100

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

pagination_strategy:
type: CustomPaginationStrategy
class_name: source_hubspot.components.HubspotCRMSearchPaginationStrategy
page_size: 10
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar question, why is page_size 10?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Forgot to change back after testing

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

@@ -67,3 +100,183 @@ def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:

def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
return stream_state.get(self.cursor_field) == ""


class HubspotAssociationsTransformation(RecordTransformation):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Can you add a comment about why we need the custom component. just to mention that we flatten the associations stored in the record and why DpathFlatten fields isn't enough

Also, can rename this HubspotFlattenAssociationsTransformation, just for readability from the manifest and specifying what the purpose of the transformation is

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

yield from records_by_pk.values()


def build_associations_retriever(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of writing a method build_associations_retriever, would it be possible to instead have this all defined as an associations_retriever in the manifest.yaml? And then in the above HubspotAssociationsExtractor, we also allow for it to take in another field: associations_retriever: SimpleRetriever.

I'm not strictly opposed to how you have it, but it might be nice to lean more on defining things in manifest when we can instead of this custom flow to invoke the constructors in our custom code. wdyt?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm also not a fan of low-code in Python) However, how would we safely inject the body with the IDs retrieved by the extractor?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From extract_records() can we take the identifier and insert them into _slice under extra_fields or wherever and then from the HttpRequester:

request_options_provider=InterpolatedRequestOptionsProvider(
    request_body_json={
        "inputs": "{{ [{"id": id} for id in stream_partition.extra_fields['identifiers'] ] }}",
    },
    config=config,
    parameters=parameters,
)

Something like that? I think it would be nice to avoid having to instantiate a new retriever/requester we call extract_records() which could be frequent based on the number of pages we read

slices = assoc_retriever.stream_slices()

for _slice in slices:
logger.info(f"Reading {_slice} associations of {self.entity_primary_key}")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i know this was previously in our hubspot python code, but we probably don't need this clogging up our logs

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to debug log

@tolik0
Copy link
Contributor Author

tolik0 commented May 1, 2025

/format-fix

Format-fix job started... Check job output.

✅ Changes applied successfully. (43d594b)

Copy link
Contributor

@brianjlai brianjlai left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed that we might not be generating an accurate catalog for our various Hubspot streams related to entity. We probably need to solve this before we move forward

Basically for streams like deals, companies, contacts, etc, we actually don't rely on a static schema but rather on a dynamic schema based on the customer-specific properties for their unique implementation. A schema will contain a map of properties fields like hs_is_in_first_deal_stage and it's flattened properties_hs_is_in_first_deal_stage. And to make things a little more complicated, we also need:

  • Some of the schema fields will be static
  • We dynamically get the properties fields from the properties endpoint
  • We remap the Hubspot types back to types the Airbyte protocol understands.

I think this had originally gone unnoticed because our deals stream already had quite a few of these extra properties key/values, but after inspecting them the schemas between the low-code migration here and latest master do not match up.

So I don't think we can't use the InlineSchemaLoader like we have been for our non-entity streams. I'm gonna try spiking out if the DynamicSchemaLoader has all the features we need to generate

yield from records_by_pk.values()


def build_associations_retriever(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From extract_records() can we take the identifier and insert them into _slice under extra_fields or wherever and then from the HttpRequester:

request_options_provider=InterpolatedRequestOptionsProvider(
    request_body_json={
        "inputs": "{{ [{"id": id} for id in stream_partition.extra_fields['identifiers'] ] }}",
    },
    config=config,
    parameters=parameters,
)

Something like that? I think it would be nice to avoid having to instantiate a new retriever/requester we call extract_records() which could be frequent based on the number of pages we read

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants