
Conversation

@hzding621 commented Aug 2, 2024

Summary

This PR removes topic (both EventSource.topic and EntitySource.mutationTopic) from the semantic_hash calculation used during offline join backfill.

Detailed Logic

  • load the old semantic_hash from hive:
    • read semantic_hash from the hive table metadata
    • check if the exclude_topic flag exists - this determines which version of the semantic_hash logic to use
  • compute the current semantic_hash of the user config:
    • if exclude_topic exists, we compute the current semantic_hash without topic.
    • if exclude_topic does not exist, we compute the current semantic_hash with topic.
  • calculate the diff between the old semantic_hash and the current semantic_hash
  • for diff'd tables, we run archiving:
    • [manual archive] if exclude_topic did not exist in hive, we stop the job (throw an exception) and let the user decide to archive manually
    • [auto archive] if exclude_topic did exist, we perform auto archive as usual
  • when everything is done, the new semantic_hash that we write to hive always uses the new logic (i.e. without topic) - see the sketch below
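
To make the flow concrete, here is a minimal, self-contained sketch of the flag-gated migration logic. JoinConf and the archive/write steps are hypothetical stand-ins rather than the actual SemanticHashUtils implementation; only semanticHash(excludeTopic = ...) mirrors the signature introduced in this PR.

```scala
// Minimal sketch of the flag-gated flow above (hypothetical stand-ins,
// not the actual SemanticHashUtils.scala implementation).
object SemanticHashMigrationSketch {

  // hypothetical config type exposing both versions of the hash logic;
  // semanticHash(excludeTopic = ...) mirrors the signature in this PR
  trait JoinConf {
    def semanticHash(excludeTopic: Boolean): Map[String, String]
  }

  def checkAndMigrate(joinConf: JoinConf,
                      oldHash: Map[String, String],
                      excludeTopicFlagSet: Boolean): Map[String, String] = {
    // compute the current hash with the same version of the logic that produced the old one
    val currentHash = joinConf.semanticHash(excludeTopic = excludeTopicFlagSet)

    // diff old vs current, per table
    val diffed = (oldHash.keySet ++ currentHash.keySet)
      .filter(k => oldHash.get(k) != currentHash.get(k))

    if (diffed.nonEmpty) {
      if (excludeTopicFlagSet) {
        // auto archive as usual
        println(s"auto-archiving: ${diffed.mkString(", ")}")
      } else {
        // manual archive: stop the job and let the user decide
        throw new RuntimeException(
          s"semantic_hash changed for ${diffed.mkString(", ")}; archive manually and rerun")
      }
    }

    // whichever path we took, the hash written back to hive always uses
    // the new logic (i.e. without topic); the exclude_topic flag is set alongside it
    joinConf.semanticHash(excludeTopic = true)
  }
}
```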

Migration Plan

  1. release the change
  2. let airflow run for a few days with the updated JAR version to migrate all semantic_hash values for production join confs
    • for production join confs without active airflow runs, manually synchronize the semantic_hash
  3. update the global kafka topic logic (mapping)
  4. for each team, create a codemod PR to update all configs
    • at this point, the semantic_hash should already have been migrated; if not, the user might run into the manual archive exception, which is fine
    • some users may have lingering local configs, which might bring the old topic format back to master; add a CI check in the config repo to prevent that (a sketch follows this list)
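
For the CI check in step 4, a minimal sketch written as a ScalaTest suite. The config directory and the OldTopicPattern regex are hypothetical placeholders for wherever the configs live and whatever the legacy topic format actually looks like.

```scala
// Hypothetical CI check: fail if any config on master still uses the old topic format.
import java.nio.file.{Files, Paths}
import scala.jdk.CollectionConverters._
import org.scalatest.funsuite.AnyFunSuite

class NoLegacyTopicFormatSpec extends AnyFunSuite {
  // hypothetical pattern for the old kafka topic format
  private val OldTopicPattern = """topic\s*=\s*legacy://""".r

  test("no config in the repo still uses the old kafka topic format") {
    // "production/joins" is a placeholder for the config repo's join conf directory
    val offenders = Files.walk(Paths.get("production/joins")).iterator().asScala
      .filter(p => Files.isRegularFile(p))
      .filter(p => OldTopicPattern.findFirstIn(new String(Files.readAllBytes(p))).nonEmpty)
      .map(_.toString)
      .toList
    assert(offenders.isEmpty, s"configs still on the old topic format: $offenders")
  }
}
```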

Why / Goal

  1. topic should NOT be part of semantic_hash because it never affects offline behavior
  2. this will unblock the future migration of the kafka topic format, without changing semantic_hash or triggering a wave of backfills.

Test Plan

  • Added Unit Tests
  • Covered by existing CI
  • Integration tested

Integration Test Plan

✅ test case 1 (also in UT; invariant sketched after this case):

  1. create a test join config using old kafka format
  2. run the join job using prod JAR
  3. run the join job again using dev JAR
    • we expect no diffs or archives to happen
    • we expect the semantic_hash to be migrated and the flag set in hive
  4. run the join job yet again using dev JAR
    • we expect no diffs or archives.
  5. update the test join config with new kafka topic format
  6. rerun the join job using dev JAR
    • we expect no archive to happen
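
The UT counterpart of this case reduces to a single invariant, sketched script-style below. FakeJoin is a hypothetical stand-in for a real join conf; semanticHash(excludeTopic = ...) mirrors this PR's signature.

```scala
// Sketch of the invariant behind test case 1 (not the PR's actual unit test).
// Only `topic` differs between the two configs below.
case class FakeJoin(topic: String, selects: Seq[String]) {
  def semanticHash(excludeTopic: Boolean): Int =
    if (excludeTopic) selects.hashCode else (topic, selects).hashCode
}

val oldFormat = FakeJoin("legacy://events", Seq("user_id", "ts"))
val newFormat = FakeJoin("kafka://events", Seq("user_id", "ts"))

// with excludeTopic = true, changing only the topic must not change the hash
assert(oldFormat.semanticHash(excludeTopic = true) == newFormat.semanticHash(excludeTopic = true))
// under the old logic the same edit would diff, which is what used to trigger archives
assert(oldFormat.semanticHash(excludeTopic = false) != newFormat.semanticHash(excludeTopic = false))
```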

✅ test case 2:

  1. create a test join config using old kafka format
  2. run the join job using prod JAR
  3. update the test join config with new kafka topic format (note that the hash migration is not done)
  4. rerun the join job using dev JAR
    • we expect the job to fail with the manual archive error
  5. rerun the join job using dev JAR with the unset flag
    • we expect the job to succeed, with the hash migrated and the flag set

✅ test case 3 (also in UT):

  1. create a test join config using old kafka format
  2. run the join job using prod JAR
  3. update the test join config to introduce some real semantic changes
  4. rerun the join job using dev JAR
    • we expect correct diffs produced
    • we expect the job to fail with the manual archive error
  5. manually drop the tables
  6. rerun the join job using dev JAR after the manual drop
    • we expect the job to succeed, with the hash migrated and the flag set
  7. update the test join config to use new kafka topic format
  8. rerun the join job using dev JAR
    • we expect no archive to happen

✅ test case 4:

  1. create a test join config using old kafka format
  2. run the join job using prod JAR
  3. run the join job again using dev JAR
    • we expect the semantic_hash to be migrated and the flag set in hive
  4. update the test join config to introduce some real semantic changes
    • we expect correct diffs produced
    • we expect auto archive to take place

Reviewers

@donghanz @yuli-han @pengyu-hou

  } else {
    false
  }
}
@hzding621 (Author):

note: moved to SemanticHashUtils.scala

@hzding621 force-pushed the haozhen--topic-semantic-hash branch from 63348b8 to ad90dae (August 2, 2024 06:46)
@yuli-han commented Aug 2, 2024

Hi @hzding621, my understanding is: we want to provide a way for users to update topic without triggering the archive. Could you clarify the new workflow when users want to update the topic without triggering an archive? Do they need to set the exclude_topic flag in the hive table and update the topic at the same time? Also, when we load the old semantic hash, how do we avoid a diff with the new hash when the topic is updated and the exclude_topic flag is set? I am confused by this line - should we perform an archive or not here?
[auto archive] if exclude_topic did exist, we perform auto archive as usual

val TopicInvalidSuffix = "_invalid"
val lineTab = "\n "
// hive table property keys: the semantic hash map, and the flag marking
// that the hash was computed with topic excluded
val SemanticHashKey = "semantic_hash"
val SemanticHashExcludeTopicKey = "semantic_hash_exclude_topic"
Collaborator:

Can we make the boolean flag a map so that we can reuse this for future semantic hash changes, like excluding the backfill start date?

@hzding621 (Author) replied:

Great suggestion. Will do

@hzding621 force-pushed the haozhen--topic-semantic-hash branch from ad90dae to 783b032 (August 2, 2024 22:42)
@yuli-han left a comment:

👍

// before:
confTableProps ++ Map(Constants.SemanticHashKey -> gson.toJson(joinConf.semanticHash.asJava))

// after:
confTableProps ++ Map(
  Constants.SemanticHashKey -> gson.toJson(joinConf.semanticHash(excludeTopic = true).asJava),
  Constants.SemanticHashOptionsKey -> gson.toJson(
@donghanz (Contributor):

nit: with SemanticHashOptionsKey, we might not need to pass something like excludeTopic to .semanticHash(). All arguments can be packaged into SemanticHashOptionsKey.

@hzding621 (Author) replied:

@donghanz I think we will still keep SemanticHashKey and SemanticHashOptionsKey as two separate properties in hive tables.

  • SemanticHashKey: will store the semantic hashes (a map from string to hash string)
  • SemanticHashOptionsKey: will store a few flags (a map from string to boolean)

I pass excludeTopic to .semanticHash() only because excludeTopic determines which logic is used to compute the hash; the flag itself is not stored in the hash.

Lmk if that makes sense
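
For illustration, here is a hypothetical example of how the two properties might look in the hive table metadata. The key semantics come from this PR; the key strings, table names, and hash values are made up.

```scala
// Hypothetical contents of the two table properties described above.
val confTableProps: Map[String, String] = Map(
  // SemanticHashKey: a map from table name to hash string, serialized as JSON
  "semantic_hash" -> """{"db.join_table": "a1b2c3", "db.my_group_by": "d4e5f6"}""",
  // SemanticHashOptionsKey: a map of flags, so future hash-logic changes
  // (e.g. excluding the backfill start date) can add entries without new properties
  "semantic_hash_options" -> """{"exclude_topic": "true"}"""
)
```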

@hzding621 merged commit f624b01 into main on Aug 5, 2024
@hzding621 deleted the haozhen--topic-semantic-hash branch (August 5, 2024 20:55)