Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Missed Updates Over Multiple Syncs #98

Merged
merged 3 commits into from
Aug 23, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 45 additions & 24 deletions tap_hubspot/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,24 @@ def get_start(state, tap_stream_id, bookmark_key):
return CONFIG['start_date']
return current_bookmark

def has_bookmark(state, tap_stream_id, bookmark_key):
return singer.get_bookmark(state, tap_stream_id, bookmark_key) is not None

def get_previous_time_window(state, tap_stream_id):
return singer.get_bookmark(state, tap_stream_id, "last_sync_duration")

def write_stream_duration(state, tap_stream_id, start, end):
duration = (end - start).total_seconds()
return singer.write_bookmark(state, tap_stream_id, "last_sync_duration", duration)
def get_current_sync_start(state, tap_stream_id):
current_sync_start_value = singer.get_bookmark(state, tap_stream_id, "current_sync_start")
if current_sync_start_value is None:
return current_sync_start_value
return utils.strptime_to_utc(current_sync_start_value)

def write_current_sync_start(state, tap_stream_id, start):
value = start
if start is not None:
value = utils.strftime(start)
return singer.write_bookmark(state, tap_stream_id, "current_sync_start", value)

def clean_state(state):
""" Clear deprecated keys out of state. """
for stream, bookmark_map in state.get("bookmarks", {}).items():
if "last_sync_duration" in bookmark_map:
LOGGER.info("{} - Removing last_sync_duration from state.".format(stream))
state["bookmarks"][stream].pop("last_sync_duration", None)

def get_url(endpoint, **kwargs):
if endpoint not in ENDPOINTS:
Expand Down Expand Up @@ -407,6 +416,16 @@ def sync_companies(STATE, ctx):
schema = load_schema('companies')
singer.write_schema("companies", schema, ["companyId"], [bookmark_key], catalog.get('stream_alias'))

# Because this stream doesn't query by `lastUpdated`, it cycles
# through the data set every time. The issue with this is that there
# is a race condition by which records may be updated between the
# start of this table's sync and the end, causing some updates to not
# be captured, in order to combat this, we must store the current
# sync's start in the state and not move the bookmark past this value.
current_sync_start = get_current_sync_start(STATE, "companies") or utils.now()
STATE = write_current_sync_start(STATE, "companies", current_sync_start)
singer.write_state(STATE)

url = get_url("companies_all")
max_bk_value = start
if CONTACTS_BY_COMPANY in ctx.selected_stream_ids:
Expand Down Expand Up @@ -436,7 +455,10 @@ def sync_companies(STATE, ctx):
if CONTACTS_BY_COMPANY in ctx.selected_stream_ids:
STATE = _sync_contacts_by_company(STATE, record['companyId'])

STATE = singer.write_bookmark(STATE, 'companies', bookmark_key, utils.strftime(max_bk_value))
# Don't bookmark past the start of this sync to account for updated records during the sync.
new_bookmark = min(max_bk_value, current_sync_start)
STATE = singer.write_bookmark(STATE, 'companies', bookmark_key, utils.strftime(new_bookmark))
STATE = write_current_sync_start(STATE, 'companies', None)
singer.write_state(STATE)
return STATE

Expand Down Expand Up @@ -712,17 +734,12 @@ def sync_engagements(STATE, ctx):
# through the data set every time. The issue with this is that there
# is a race condition by which records may be updated between the
# start of this table's sync and the end, causing some updates to not
# be captured, in order to combat this, we must save a lookback window
# that handles the duration of time that this stream was last syncing,
# and look back by that amount on the next sync
last_sync_duration = get_previous_time_window(STATE, "engagements")
current_sync_start = utils.now()
if has_bookmark(STATE, "engagements", bookmark_key) and \
last_sync_duration is not None:
LOGGER.info(("Last sync of engagements lasted {} seconds. Adjusting bookmark by this "
"amount to account for race conditions with record updates.").format(last_sync_duration))
start = utils.strptime_to_utc(start) - datetime.timedelta(seconds=last_sync_duration)
start = utils.strftime(start)
# be captured, in order to combat this, we must store the current
# sync's start in the state and not move the bookmark past this value.
current_sync_start = get_current_sync_start(STATE, "engagements") or utils.now()
STATE = write_current_sync_start(STATE, "engagements", current_sync_start)
singer.write_state(STATE)

max_bk_value = start
LOGGER.info("sync_engagements from %s", start)

Expand All @@ -747,9 +764,10 @@ def sync_engagements(STATE, ctx):
if record['engagement'][bookmark_key] >= max_bk_value:
max_bk_value = record['engagement'][bookmark_key]

STATE = singer.write_bookmark(STATE, 'engagements', bookmark_key, max_bk_value)
# Write duration for next sync's lookback window
STATE = write_stream_duration(STATE, 'engagements', current_sync_start, utils.now())
# Don't bookmark past the start of this sync to account for updated records during the sync.
new_bookmark = min(max_bk_value, current_sync_start)
STATE = singer.write_bookmark(STATE, 'engagements', bookmark_key, new_bookmark)
STATE = write_current_sync_start(STATE, 'engagements', None)
singer.write_state(STATE)
return STATE

Expand Down Expand Up @@ -818,6 +836,9 @@ def get_selected_streams(remaining_streams, annotated_schema):
return selected_streams

def do_sync(STATE, catalogs):
# Clear out keys that are no longer used
clean_state(STATE)

ctx = Context(catalogs)
validate_dependencies(ctx)

Expand Down