TDL-19319 Make replication key as automatic inclusion #195

Merged
7 changes: 7 additions & 0 deletions .circleci/config.yml
@@ -47,6 +47,13 @@ jobs:
command: |
source /usr/local/share/virtualenvs/tap-hubspot/bin/activate
nosetests tap_hubspot/tests
pip install coverage
nosetests --with-coverage --cover-erase --cover-package=tap_hubspot --cover-html-dir=htmlcov tap_hubspot/tests/unittests
coverage html
- store_test_results:
path: test_output/report.xml
- store_artifacts:
path: htmlcov
- run:
name: 'JSON Validator'
command: |
2 changes: 1 addition & 1 deletion README.md
@@ -23,7 +23,7 @@ This tap:

## Configuration

This tap requires a `config.json` which specifies details regarding [OAuth 2.0](https://developers.hubspot.com/docs/methods/oauth2/oauth2-overview) authentication, a cutoff date for syncing historical data, and an optional flag which controls collection of anonymous usage metrics. See [config.sample.json](config.sample.json) for an example. You may specify an API key instead of OAuth parameters for development purposes, as detailed below.
This tap requires a `config.json` which specifies details regarding [OAuth 2.0](https://developers.hubspot.com/docs/methods/oauth2/oauth2-overview) authentication, a cutoff date for syncing historical data, an optional `request_timeout` parameter controlling how long a request waits for a response, and an optional flag which controls collection of anonymous usage metrics. See [config.sample.json](config.sample.json) for an example. You may specify an API key instead of OAuth parameters for development purposes, as detailed below.

To run `tap-hubspot` with the configuration file, use this command:

1 change: 1 addition & 0 deletions config.sample.json
@@ -4,5 +4,6 @@
"client_secret": "my_secret",
"refresh_token": "my_token",
"start_date": "2017-01-01T00:00:00Z",
"request_timeout": 300,
"disable_collection": false
}
56 changes: 45 additions & 11 deletions tap_hubspot/__init__.py
@@ -22,6 +22,7 @@
LOGGER = singer.get_logger()
SESSION = requests.Session()

REQUEST_TIMEOUT = 300
class InvalidAuthException(Exception):
pass

@@ -228,7 +229,7 @@ def acquire_access_token_from_refresh_token():
}


resp = requests.post(BASE_URL + "/oauth/v1/token", data=payload)
resp = requests.post(BASE_URL + "/oauth/v1/token", data=payload, timeout=get_request_timeout())
if resp.status_code == 403:
raise InvalidAuthException(resp.content)

@@ -288,6 +289,8 @@ def get_params_and_headers(params):
return params, headers


# Backoff for the Timeout error is already included in "requests.exceptions.RequestException"
# as it is a parent class of the "Timeout" error
@backoff.on_exception(backoff.constant,
(requests.exceptions.RequestException,
requests.exceptions.HTTPError),
@@ -303,7 +306,7 @@ def request(url, params=None):
req = requests.Request('GET', url, params=params, headers=headers).prepare()
LOGGER.info("GET %s", req.url)
with metrics.http_request_timer(parse_source_from_url(url)) as timer:
resp = SESSION.send(req)
resp = SESSION.send(req, timeout=get_request_timeout())
timer.tags[metrics.Tag.http_status_code] = resp.status_code
if resp.status_code == 403:
raise SourceUnavailableException(resp.content)
@@ -331,6 +334,8 @@ def lift_properties_and_versions(record):
record['properties_versions'] += versions
return record

# Backoff for the Timeout error is already included in "requests.exceptions.RequestException"
# as it is a parent class of the "Timeout" error
@backoff.on_exception(backoff.constant,
(requests.exceptions.RequestException,
requests.exceptions.HTTPError),
@@ -349,6 +354,7 @@ def post_search_endpoint(url, data, params=None):
url=url,
json=data,
params=params,
timeout=get_request_timeout(),
headers=headers
)

@@ -433,7 +439,7 @@ def gen_request(STATE, tap_stream_id, url, params, path, more_key, offset_keys,
singer.write_state(STATE)


def _sync_contact_vids(catalog, vids, schema, bumble_bee):
def _sync_contact_vids(catalog, vids, schema, bumble_bee, bookmark_values, bookmark_key):
if len(vids) == 0:
return

@@ -442,6 +448,8 @@ def _sync_contact_vids(catalog, vids, schema, bumble_bee):
mdata = metadata.to_map(catalog.get('metadata'))

for record in data.values():
# Explicitly add the bookmark field "versionTimestamp" and its value in the record.
record[bookmark_key] = bookmark_values.get(record.get("vid"))
record = bumble_bee.transform(lift_properties_and_versions(record), schema, mdata)
singer.write_record("contacts", record, catalog.get('stream_alias'), time_extracted=time_extracted)

@@ -465,6 +473,8 @@ def sync_contacts(STATE, ctx):
url = get_url("contacts_all")

vids = []
# Dict to store replication key value for each contact record
bookmark_values = {}
with Transformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as bumble_bee:
for row in gen_request(STATE, 'contacts', url, default_contact_params, 'contacts', 'has-more', ['vid-offset'], ['vidOffset']):
modified_time = None
@@ -476,15 +486,18 @@

if not modified_time or modified_time >= start:
vids.append(row['vid'])
# Add the replication key value to the `bookmark_values` dict.
# Here, the key is the vid (primary key) and the value is the replication key value.
bookmark_values[row['vid']] = utils.strftime(modified_time)

if modified_time and modified_time >= max_bk_value:
max_bk_value = modified_time

if len(vids) == 100:
_sync_contact_vids(catalog, vids, schema, bumble_bee)
_sync_contact_vids(catalog, vids, schema, bumble_bee, bookmark_values, bookmark_key)
vids = []

_sync_contact_vids(catalog, vids, schema, bumble_bee)
_sync_contact_vids(catalog, vids, schema, bumble_bee, bookmark_values, bookmark_key)

STATE = singer.write_bookmark(STATE, 'contacts', bookmark_key, utils.strftime(max_bk_value))
singer.write_state(STATE)
@@ -586,14 +599,24 @@ def sync_companies(STATE, ctx):
def has_selected_custom_field(mdata):
top_level_custom_props = [x for x in mdata if len(x) == 2 and 'property_' in x[1]]
for prop in top_level_custom_props:
if mdata.get(prop, {}).get('selected') == True:
# Return 'True' if the custom field is automatic.
if (mdata.get(prop, {}).get('selected') == True) or (mdata.get(prop, {}).get('inclusion') == "automatic"):
Contributor Author: Return True if the custom field is automatic.

Contributor: @prijendev can you add this statement as the code comment?

Contributor Author: Added the comment in the code.

return True
return False

def sync_deals(STATE, ctx):
catalog = ctx.get_catalog_from_id(singer.get_currently_syncing(STATE))
mdata = metadata.to_map(catalog.get('metadata'))
bookmark_key = 'hs_lastmodifieddate'
bookmark_key = 'property_hs_lastmodifieddate'
# The bookmark field ('hs_lastmodifieddate') available in the record is different from
# the tap's bookmark key ('property_hs_lastmodifieddate').
# `hs_lastmodifieddate` is only available nested inside the properties field.
# As `hs_lastmodifieddate` is not available at the top level, it cannot be marked as automatic inclusion.
# The tap includes all nested fields of the properties field as custom fields in the schema by appending
# the prefix `property_` to each field.
# That's why bookmark_key is `property_hs_lastmodifieddate`, so that we can mark it as automatic inclusion.

bookmark_field_in_record = 'hs_lastmodifieddate'
start = utils.strptime_with_tz(get_start(STATE, "deals", bookmark_key))
max_bk_value = start
LOGGER.info("sync_deals from %s", start)
@@ -635,9 +658,9 @@ def sync_deals(STATE, ctx):
for row in gen_request(STATE, 'deals', url, params, 'deals', "hasMore", ["offset"], ["offset"], v3_fields=v3_fields):
row_properties = row['properties']
modified_time = None
if bookmark_key in row_properties:
if bookmark_field_in_record in row_properties:
# Hubspot returns timestamps in millis
timestamp_millis = row_properties[bookmark_key]['timestamp'] / 1000.0
timestamp_millis = row_properties[bookmark_field_in_record]['timestamp'] / 1000.0
modified_time = datetime.datetime.fromtimestamp(timestamp_millis, datetime.timezone.utc)
elif 'createdate' in row_properties:
# Hubspot returns timestamps in millis
@@ -941,16 +964,16 @@ class Stream(object):
# Do these first as they are incremental
Stream('subscription_changes', sync_subscription_changes, ['timestamp', 'portalId', 'recipient'], 'startTimestamp', 'INCREMENTAL'),
Stream('email_events', sync_email_events, ['id'], 'startTimestamp', 'INCREMENTAL'),
Stream('contacts', sync_contacts, ["vid"], 'versionTimestamp', 'INCREMENTAL'),
Stream('deals', sync_deals, ["dealId"], 'property_hs_lastmodifieddate', 'INCREMENTAL'),

# Do these last as they are full table
Stream('forms', sync_forms, ['guid'], 'updatedAt', 'FULL_TABLE'),
Stream('workflows', sync_workflows, ['id'], 'updatedAt', 'FULL_TABLE'),
Stream('owners', sync_owners, ["ownerId"], 'updatedAt', 'FULL_TABLE'),
Stream('campaigns', sync_campaigns, ["id"], None, 'FULL_TABLE'),
Stream('contact_lists', sync_contact_lists, ["listId"], 'updatedAt', 'FULL_TABLE'),
Stream('contacts', sync_contacts, ["vid"], 'versionTimestamp', 'FULL_TABLE'),
Stream('companies', sync_companies, ["companyId"], 'hs_lastmodifieddate', 'FULL_TABLE'),
Stream('deals', sync_deals, ["dealId"], 'hs_lastmodifieddate', 'FULL_TABLE'),
Stream('deal_pipelines', sync_deal_pipelines, ['pipelineId'], None, 'FULL_TABLE'),
Stream('engagements', sync_engagements, ["engagement_id"], 'lastUpdated', 'FULL_TABLE')
]
@@ -1081,6 +1104,17 @@ def do_discover():
LOGGER.info('Loading schemas')
json.dump(discover_schemas(), sys.stdout, indent=4)

def get_request_timeout():
# Get `request_timeout` value from config.
config_request_timeout = CONFIG.get('request_timeout')
# If request_timeout from the config is not 0, "0", or "", use it.
if config_request_timeout and float(config_request_timeout):
request_timeout = float(config_request_timeout)
else:
# If the value is 0, "0", "", or not passed, default to 300 seconds.
request_timeout = REQUEST_TIMEOUT
return request_timeout

def main_impl():
args = utils.parse_args(
["redirect_uri",
4 changes: 4 additions & 0 deletions tap_hubspot/schemas/contacts.json
@@ -4,6 +4,10 @@
"vid": {
"type": ["null", "integer"]
},
"versionTimestamp": {
"type": ["null", "string"],
"format": "date-time"
},
"canonical-vid": {
"type": ["null", "integer"]
},
121 changes: 121 additions & 0 deletions tap_hubspot/tests/unittests/test_request_timeout.py
@@ -0,0 +1,121 @@
import unittest
import requests
from unittest import mock
import tap_hubspot
class TestRequestTimeoutValue(unittest.TestCase):

def test_integer_request_timeout_in_config(self):
"""
Verify that if request_timeout is provided in the config (integer value) then it is used
"""
tap_hubspot.CONFIG.update({"request_timeout": 100}) # integer timeout in config

request_timeout = tap_hubspot.get_request_timeout()

self.assertEqual(request_timeout, 100.0) # Verify timeout value

def test_float_request_timeout_in_config(self):
"""
Verify that if request_timeout is provided in the config (float value) then it is used
"""
tap_hubspot.CONFIG.update({"request_timeout": 100.5}) # float timeout in config

request_timeout = tap_hubspot.get_request_timeout()

self.assertEqual(request_timeout, 100.5) # Verify timeout value

def test_string_request_timeout_in_config(self):
"""
Verify that if request_timeout is provided in the config (string value) then it is used
"""
tap_hubspot.CONFIG.update({"request_timeout": "100"}) # string format timeout in config

request_timeout = tap_hubspot.get_request_timeout()

self.assertEqual(request_timeout, 100.0) # Verify timeout value

def test_empty_string_request_timeout_in_config(self):
"""
Verify that if request_timeout is provided in the config as an empty string then the default value is used
"""
tap_hubspot.CONFIG.update({"request_timeout": ""}) # empty string in config

request_timeout = tap_hubspot.get_request_timeout()

self.assertEqual(request_timeout, 300) # Verify timeout value

def test_zero_request_timeout_in_config(self):
"""
Verify that if request_timeout is provided in the config as zero then the default value is used
"""
tap_hubspot.CONFIG.update({"request_timeout": 0}) # zero value in config

request_timeout = tap_hubspot.get_request_timeout()

self.assertEqual(request_timeout, 300) # Verify timeout value

def test_zero_string_request_timeout_in_config(self):
"""
Verify that if request_timeout is provided in the config as zero in string format then the default value is used
"""
tap_hubspot.CONFIG.update({"request_timeout": '0'}) # zero value in config

request_timeout = tap_hubspot.get_request_timeout()

self.assertEqual(request_timeout, 300) # Verify timeout value

def test_no_request_timeout_in_config(self):
"""
Verify that if request_timeout is not provided in the config then the default value is used
"""
tap_hubspot.CONFIG = {}
request_timeout = tap_hubspot.get_request_timeout()

self.assertEqual(request_timeout, 300) # Verify timeout value


@mock.patch("time.sleep")
class TestRequestTimeoutBackoff(unittest.TestCase):

@mock.patch('requests.Session.send', side_effect = requests.exceptions.Timeout)
@mock.patch("requests.Request.prepare")
@mock.patch('tap_hubspot.get_params_and_headers', return_value = ({}, {}))
def test_request_timeout_backoff(self, mocked_get, mocked_prepare, mocked_send, mocked_sleep):
"""
Verify that the request function backs off only 5 times on a Timeout exception.
"""
try:
tap_hubspot.request('dummy_url', {})
except Exception:
pass

# Verify that Session.send is called 5 times
self.assertEqual(mocked_send.call_count, 5)

@mock.patch('tap_hubspot.get_params_and_headers', return_value = ({}, {}))
@mock.patch('requests.post', side_effect = requests.exceptions.Timeout)
def test_request_timeout_backoff_for_post_search_endpoint(self, mocked_post, mocked_get, mocked_sleep):
"""
Verify that the post_search_endpoint function backs off only 5 times on a Timeout exception.
"""
try:
tap_hubspot.post_search_endpoint('dummy_url', {})
except Exception:
pass

# Verify that requests.post is called 5 times
self.assertEqual(mocked_post.call_count, 5)

@mock.patch('requests.post', side_effect = requests.exceptions.Timeout)
def test_request_timeout_backoff_for_acquire_access_token_from_refresh_token(self, mocked_post, mocked_sleep):
"""
Verify that the request backs off only 5 times instead of 25 times on a Timeout exception thrown from the `acquire_access_token_from_refresh_token` method.
Here, the get_params_and_headers method is called from the request method, and acquire_access_token_from_refresh_token is called from the get_params_and_headers method.
"""
try:
tap_hubspot.post_search_endpoint('dummy_url', {})
except Exception:
pass

# Verify that requests.post is called 5 times
self.assertEqual(mocked_post.call_count, 5)
2 changes: 1 addition & 1 deletion tests/base.py
@@ -105,7 +105,7 @@ def expected_metadata(self): # DOCS_BUG https://stitchdata.atlassian.net/browse
"deals": {
self.PRIMARY_KEYS: {"dealId"},
self.REPLICATION_METHOD: self.INCREMENTAL,
self.REPLICATION_KEYS: {"hs_lastmodifieddate"},
self.REPLICATION_KEYS: {"property_hs_lastmodifieddate"},
Contributor: Should this include the property_ prefix from the denesting method? None of the other replication keys have this. And I think it will break every customer's connection that is replicating this stream.

Contributor Author: Currently, hs_lastmodifieddate is the bookmark key for the deals stream. hs_lastmodifieddate is available in the properties field at the nested level.
For example, the response structure:

{
    "dealId": 1,
    "properties": {
        "hs_lastmodifieddate": 1653590080602
    }
}

As hs_lastmodifieddate is not available at the top level, it cannot be marked as automatic inclusion, and we cannot even select that particular field. Also, the tap includes all nested fields of the properties field as custom fields in the schema by appending the prefix property_ to each field.

For example, the schema of the deals stream looks like the below:

{
    "dealId": {"type": ["null", "integer"]},
    "properties": {"type": "object", "properties": {
            "hs_lastmodifieddate": {},
            "hs_likelihood_to_close": {}
        }
    },
    "property_hs_lastmodifieddate": {},
    "property_hs_likelihood_to_close": {}
}

That's why we are just marking property_hs_lastmodifieddate as automatic inclusion. So, I am not sure in which case it would break a customer's connection, because we have not removed or updated any fields.
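
A minimal Python sketch of the denesting described above, assuming a simplified record shape (the tap's actual implementation is lift_properties_and_versions, which also tracks property versions):

def lift_properties(record):
    # Copy each nested property to a top-level "property_<name>" field,
    # mirroring the denesting described above (simplified illustration).
    for name, value in record.get("properties", {}).items():
        record["property_" + name] = value
    return record

deal = {"dealId": 1, "properties": {"hs_lastmodifieddate": 1653590080602}}
lift_properties(deal)["property_hs_lastmodifieddate"]  # 1653590080602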

Contributor: I believe this nested structure is true of the other streams as well, but companies for example denests the key and does not prefix property_ onto the key. I think it will break downstream for customers because the value of the replication key is changing.

Contributor Author: @kspeer825 We have changed the bookmark key just for the deals stream. This change will not affect any other streams.
If a bookmark (hs_lastmodifieddate) is already available for the deals stream, then the tap will use that value for the first sync. (#196)
After completing the sync, the tap will write an updated bookmark under the new bookmark key (i.e. property_hs_lastmodifieddate).
We haven't removed the existing bookmark either, so we can revert back if needed.
After sync, the bookmark will look like the below:

  "bookmarks": {
        "stream_id_1": {
            "old_bookmark": "2022-04-08T00:00:00",
            "current_bookmark": "2022-04-15T00:00:00"
        }
    }
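
A rough sketch of the fallback behavior described above; the helper name and state access are illustrative assumptions, not the tap's actual API:

def get_deals_start(state, start_date):
    # Prefer the new bookmark key, fall back to the legacy key on the first
    # sync after the upgrade, and finally to the configured start date.
    # (Illustrative only; not the tap's actual helper.)
    deals_bookmarks = state.get("bookmarks", {}).get("deals", {})
    return (deals_bookmarks.get("property_hs_lastmodifieddate")
            or deals_bookmarks.get("hs_lastmodifieddate")
            or start_date)

state = {"bookmarks": {"deals": {"hs_lastmodifieddate": "2022-04-08T00:00:00"}}}
get_deals_start(state, "2017-01-01T00:00:00Z")  # "2022-04-08T00:00:00"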

We have incorporated this change to fix the customer's issue. So, @KrisPersonal, shall we plan an alpha release first for this particular customer?

self.OBEYS_START_DATE: True
},
"email_events": {
3 changes: 2 additions & 1 deletion tests/client.py
@@ -454,7 +454,8 @@ def get_deals(self):

response = self.get(v1_url, params=v1_params)
records.extend([record for record in response['deals']
if record['properties'][replication_key]['timestamp'] >= self.start_date])
# Here, the replication key of the deals stream is derived from the "hs_lastmodifieddate" field.
if record['properties']["hs_lastmodifieddate"]['timestamp'] >= self.start_date])
has_more = response['hasMore']
v1_params['offset'] = response['offset']

12 changes: 11 additions & 1 deletion tests/test_hubspot_all_fields.py
@@ -19,6 +19,16 @@ def get_matching_actual_record_by_pk(expected_primary_key_dict, actual_records):
can_save = True
return ret_records

FIELDS_ADDED_BY_TAP = {
# In the 'contacts' stream, 'versionTimestamp' is not available in the response of the second call.
# In the 1st call, the tap retrieves records of all contacts and collects vids (contact ids) from those records.
# These records contain the versionTimestamp field.
# In the 2nd call, the vids collected from the 1st call are used to retrieve the whole contact record.
# Here, the records collected for detailed contact information do not contain the versionTimestamp field.
# So, we add the versionTimestamp field (fetched from the 1st call's records) explicitly to the record of the 2nd call.
"contacts": { "versionTimestamp" }
}

KNOWN_EXTRA_FIELDS = {
'deals': {
# BUG_TDL-14993 | https://jira.talendforge.org/browse/TDL-14993
@@ -228,7 +238,7 @@ def test_run(self):
continue # skip this expected record if it isn't replicated
actual_record = matching_actual_records_by_pk[0]

expected_keys = set(expected_record.keys())
expected_keys = set(expected_record.keys()).union(FIELDS_ADDED_BY_TAP.get(stream, {}))
actual_keys = set(actual_record.keys())

# NB: KNOWN_MISSING_FIELDS is a dictionary of streams to aggregated missing fields.
2 changes: 1 addition & 1 deletion tests/test_hubspot_automatic_fields.py
@@ -69,7 +69,7 @@ def test_run(self):
expected_keys = self.expected_automatic_fields().get(stream)

# BUG_TDL-9939 https://jira.talendforge.org/browse/TDL-9939 Replication keys are not included as an automatic field for these streams
if stream in {'companies', 'deals', 'contacts', 'subscription_changes', 'email_events'}:
if stream in {'companies', 'subscription_changes', 'email_events'}:
Contributor: Can we make this change for the other streams with replication keys that are missing automatic inclusion?

Contributor Author: @kspeer825 Yes, the replication key is not automatic for the companies, subscription_changes, and email_events streams.

1. The companies stream has the same replication key issue (link) as the deals stream. So, can we create a card to resolve this issue and work on that?
2. The subscription_changes and email_events streams retrieve records in date windows. These streams save the start date of the last window as the bookmark value in the startTimestamp field.
The email_events stream has a created field in the API response and subscription_changes has a timestamp field in the response that we may use as the replication key for the respective stream. But we are not sure whether the filter parameters (Start timestamp, End timestamp) follow these keys, as we can't find any doc stating the same, so we asked a question about this in the community (link).
Possible solutions:
1. We can add a new startTimestamp field to the record explicitly and mark that field as automatic to keep the current behavior as is.
2. Update the bookmark keys for email_events and subscription_changes to created and timestamp respectively if we get a positive reply from support.

Contributor:
1. Sounds good.
2. Are you able to verify the usage of these keys by exploratory testing? If you have a record before the StartTimestamp, another record after the EndTimestamp, and another record between those two, I think you'd answer that question. I suppose you'd actually need two more records at either of those fence posts to see if the API treats either inclusively.

Contributor Author (prijendev, Jul 11, 2022): As per the discussion with @cosimon, we concluded to make the change in the replication key just for the deals stream and the companies stream, as this problem affects only these 2 streams. We are not making any changes to subscription_changes and email_events as of now.

# replication keys not in the expected_keys
remove_keys = self.expected_metadata()[stream].get(self.REPLICATION_KEYS)
expected_keys = expected_keys.difference(remove_keys)