[GOBBLIN-1851] Unit tests for MysqlMultiActiveLeaseArbiter with Single Participant#3715
Conversation
...rvice/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceConfiguration.java
Outdated
Show resolved
Hide resolved
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
Outdated
Show resolved
Hide resolved
| + ServiceConfigKeys.MAX_FLOW_EXECUTION_ID_LENGTH + ") NOT NULL, flow_action varchar(100) NOT NULL, " | ||
| + "event_timestamp TIMESTAMP, " | ||
| + "lease_acquisition_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP," | ||
| + "lease_acquisition_timestamp TIMESTAMP NULL DEFAULT '1970-01-02 00:00:00', " |
There was a problem hiding this comment.
this choice of default seems questionable... what advantage does it have over just using null?
There was a problem hiding this comment.
I was actually able to get this working again with CURRENT_TIMESTAMP by ensuring this value is always set manually to previous value when updating other columns in the row
| + "TIMESTAMPDIFF(microsecond, event_timestamp, CURRENT_TIMESTAMP) / 1000 <= epsilon as isWithinEpsilon, CASE " | ||
| + "WHEN CURRENT_TIMESTAMP < DATE_ADD(lease_acquisition_timestamp, INTERVAL linger*1000 MICROSECOND) then 1 " | ||
| + "WHEN CURRENT_TIMESTAMP >= DATE_ADD(lease_acquisition_timestamp, INTERVAL linger*1000 MICROSECOND) then 2 " | ||
| + "ELSE 3 END as leaseValidityStatus, linger, CURRENT_TIMESTAMP FROM %s, %s " + WHERE_CLAUSE_TO_MATCH_KEY; |
There was a problem hiding this comment.
I don't know enough about sql's evaluation rules to be certain whether the multiple invocations of CURRENT_TIMESTAMP all resolve to a common evaluation or would be invoked multiple times (and if so, whether in a defined order or not). do you know?
if this leads to multiple evaluations with no ordering guarantee, would all logic remain internally consistent and provide an atomic view across multiple fields?
if not, tge alternative would be to evaluate CURRENT_TIMESTAMP only once in an inner (nested) query.
There was a problem hiding this comment.
In MySQL, multiple invocations of the CURRENT_TIMESTAMP function within a single query will all resolve to the same timestamp value. This is because the function is evaluated only once per query execution and the same value is used for all occurrences of the function within the query. -> https://dev.mysql.com/doc/refman/8.0/en/date-and-time-functions.html
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
Outdated
Show resolved
Hide resolved
| // CASE 1: If no existing row for this flow action, then go ahead and insert | ||
| if (!resultSet.next()) { | ||
| if (getResult == null) { | ||
| log.debug("CASE 1: no existing row for this flow action, then go ahead and insert"); |
There was a problem hiding this comment.
may be worth writing only in log.debug, not also as a comment just above. [update: I see other cases lack logging. I'd imagine tracing more helpful to debugging than a source comment.]
also, I'm forgetting on the numbering... did each case receive a number somewhere previously, which you're referencing here (e.g. in a javadoc comment)?
There was a problem hiding this comment.
Let me move these all to debug level logs instead of comments. I am numbering these cases here explicitly actually and referencing these numbers in the unit tests. They are not defined elsewhere.
| int dbLinger = resultSet.getInt(3); | ||
| // Fetch values in row after attempted insert | ||
| String formattedSelectAfterInsertStatement = | ||
| String.format(SELECT_AFTER_INSERT_STATEMENT, this.leaseArbiterTableName, this.constantsTableName); |
There was a problem hiding this comment.
wondering: is it truly necessary to bind the table name every time this query is made? e.g. couldn't it be done once when the instance is constructed? or might the table names ever change throughout the instance's lifetime?
There was a problem hiding this comment.
The table name shouldn't change over the instance's lifetime since it would need to be re-initialized through the config. For this particular statement I defined it globally and also did so for the other SELECT statement that is re-used. The CREATE statements aren't re-used so it doesn't make sense to make them class variables.
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
Outdated
Show resolved
Hide resolved
| private int dbLinger; | ||
| private Timestamp dbCurrentTimestamp; | ||
|
|
||
| GetEventInfoResult(ResultSet resultSet) { |
There was a problem hiding this comment.
nit, but rather than defining a variant constructor, this would be better as a factory method:
public static fromResultSet(ResultSet rs);
There was a problem hiding this comment.
good idea, I created two factory methods for GetEventInfoResult and SelectInfoResult (naming perhaps isn't the best but can be addressed in later PR)
| protected Optional<MultiActiveLeaseArbiter> multiActiveLeaseArbiter; | ||
| protected SchedulerService schedulerService; | ||
| protected DagActionStore dagActionStore; | ||
| protected Optional<DagActionStore> dagActionStore; |
There was a problem hiding this comment.
great! I strongly prefer this to null ;)
even so, since the two must work together, the ultimate abstraction would be a Strategy interface w/ capability to persist the flow action and handle the trigger event. in that approach, this ctor would pass these two optionals to a factory to get an appropriate instance of the strategy. when both present, the factory would give a strategy using them both, but when they're both missing, a "null strategy" would be returned. the factory would throw if only one were given.
There was a problem hiding this comment.
That's a good idea! I can think about adding that interface in the future. By null strategy do you mean doing nothing? I agree with throwing an error if only one is given.
There was a problem hiding this comment.
yes, "Null Pattern", in essence, codes each operation as a no-op
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
Outdated
Show resolved
Hide resolved
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
Outdated
Show resolved
Hide resolved
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
Outdated
Show resolved
Hide resolved
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
Outdated
Show resolved
Hide resolved
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
Outdated
Show resolved
Hide resolved
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
Outdated
Show resolved
Hide resolved
| Optional<Integer> count = withPreparedStatement(String.format(GET_ROW_COUNT_STATEMENT, this.constantsTableName), getStatement -> { | ||
| ResultSet resultSet = getStatement.executeQuery(); | ||
| if (resultSet.next()) { | ||
| return resultSet.getInt(1); | ||
| return Optional.of(resultSet.getInt(1)); | ||
| } | ||
| return -1; | ||
| return Optional.absent(); | ||
| }, true); | ||
|
|
||
| // Only insert epsilon and linger values from config if this table does not contain pre-existing values. | ||
| if (count == 0) { | ||
| if (count.isPresent() && count.get() == 0) { |
There was a problem hiding this comment.
with multiple participants, use of "check, then set" is error-prone. e.g. two or more rows could be inserted.
the algo requires there to only ever be exactly one. as this is essential, add a primary key field that should only ever have one value (== 1). every INSERT should hard-code that PK value. to seamlessly handle both first-time init, or instead update when the config settings have changed, utilize INSERT... ON DUPLICATE KEY UPDATE. see: https://stackoverflow.com/a/1361368
every participant would then "upsert" upon start up, where the most recent value wins, clobbering the former. even so, due to all participants having uniform config, in general, no actual overwriting happens with most updates.
There was a problem hiding this comment.
Ah good catch, I updated the statement to a simpler one INSERT ... WHERE NOT EXISTS (SELECT 1 FROM table) to insert if there are no rows in table.
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #3715 +/- ##
============================================
+ Coverage 46.82% 46.94% +0.11%
- Complexity 10804 10818 +14
============================================
Files 2141 2143 +2
Lines 84429 84591 +162
Branches 9383 9401 +18
============================================
+ Hits 39537 39713 +176
+ Misses 41294 41254 -40
- Partials 3598 3624 +26 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
phet
left a comment
There was a problem hiding this comment.
also, what are you thoughts on the potential race-condition I raised about the constants table init?
| } catch (SQLException e) { | ||
| throw e; |
There was a problem hiding this comment.
-
if this is all you want, it's the implicit behavior, which you need not write explicitly.
-
even so, elsewhere, we wrap
SQLExceptionin anIOException. do we want that here too... or is there already a higher layer wrapping around this invocation where that will happen for us?
There was a problem hiding this comment.
I meant to wrap in IOException as we do that in other places, updating to wrap it.
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
Outdated
Show resolved
Hide resolved
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
Outdated
Show resolved
Hide resolved
Addressed above, forgot to make the change in previous commit. |
| private static final String INSERT_IN_CONSTANTS_TABLE_STATEMENT = "INSERT INTO %s (epsilon, linger) SELECT ?, ? " | ||
| + "WHERE NOT EXISTS (SELECT 1 FROM %s)"; |
There was a problem hiding this comment.
don't we need to overwrite in case the pre-existing values are out-of-date? that's why I suggested INSERT INTO... ON DUPLICATE KEY UPDATE
There was a problem hiding this comment.
Fixed, had a misunderstanding about whether or not we want to update the values upon each redeploy if new config values are provided.
| // Only insert epsilon and linger values from config if this table does not contain a pre-existing values already. | ||
| private static final String INSERT_IN_CONSTANTS_TABLE_STATEMENT = "INSERT INTO %s (epsilon, linger) SELECT ?, ? " | ||
| + "WHERE NOT EXISTS (SELECT 1 FROM %s)"; | ||
| private static final String INSERT_IN_CONSTANTS_TABLE_STATEMENT = "INSERT INTO %s (primary_key, epsilon, linger) " |
There was a problem hiding this comment.
nit: I might call UPSERT_CONSTANTS_TABLE_STATEMENT
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
Show resolved
Hide resolved
| protected SelectInfoResult createSelectInfoResult(ResultSet resultSet) throws IOException { | ||
| try { | ||
| if (!resultSet.next()) { | ||
| log.error("Expected num rows and lease_acquisition_timestamp returned from query but received nothing"); |
There was a problem hiding this comment.
I'm confused here, so if there's no item after in the result set we log and error but still try to parse the current result set results?
There was a problem hiding this comment.
Oh good catch, I want this code path to terminate so instead will through an IO Error.
| Assert.assertTrue( | ||
| killObtainedStatus.getLeaseAcquisitionTimestamp() >= killObtainedStatus.getEventTimestamp()); | ||
|
|
||
| // Tests CASE 2 of acquire lease for a flow action event that already has a valid lease for the same event in db |
There was a problem hiding this comment.
Does it make sense for us to have separate cases instead of combining these all into one super test? In case implementation details change in the future it will be easier to triage/isolate for certain cases.
There was a problem hiding this comment.
I initially tried to make it separate tests but it was easier to write this way since a lot of them related to one another. For now my priority is to get this tested and merged. I'll add a TODO to refactor it in the future.
There was a problem hiding this comment.
You can make tests have dependencies on each other so that they run synchronously, but I guess not urgent if you are still working on improving this
* upstream/master: Fix bug with total count watermark whitelist (apache#3724) [GOBBLIN-1858] Fix logs relating to multi-active lease arbiter (apache#3720) [GOBBLIN-1838] Introduce total count based completion watermark (apache#3701) Correct num of failures (apache#3722) [GOBBLIN- 1856] Add flow trigger handler leasing metrics (apache#3717) [GOBBLIN-1857] Add override flag to force generate a job execution id based on gobbl… (apache#3719) [GOBBLIN-1855] Metadata writer tests do not work in isolation after upgrading to Iceberg 1.2.0 (apache#3718) Remove unused ORC writer code (apache#3710) [GOBBLIN-1853] Reduce # of Hive calls during schema related updates (apache#3716) [GOBBLIN-1851] Unit tests for MysqlMultiActiveLeaseArbiter with Single Participant (apache#3715) [GOBBLIN-1848] Add tags to dagmanager metrics for extensibility (apache#3712) [GOBBLIN-1849] Add Flow Group & Name to Job Config for Job Scheduler (apache#3713) [GOBBLIN-1841] Move disabling of current live instances to the GobblinClusterManager startup (apache#3708) [GOBBLIN-1840] Helix Job scheduler should not try to replace running workflow if within configured time (apache#3704) [GOBBLIN-1847] Exceptions in the JobLauncher should try to delete the existing workflow if it is launched (apache#3711) [GOBBLIN-1842] Add timers to GobblinMCEWriter (apache#3703) [GOBBLIN-1844] Ignore workflows marked for deletion when calculating container count (apache#3709) [GOBBLIN-1846] Validate Multi-active Scheduler with Logs (apache#3707) [GOBBLIN-1845] Changes parallelstream to stream in DatasetsFinderFilteringDecorator to avoid classloader issues in spark (apache#3706) [GOBBLIN-1843] Utility for detecting non optional unions should convert dataset urn to hive compatible format (apache#3705) [GOBBLIN-1837] Implement multi-active, non blocking for leader host (apache#3700) [GOBBLIN-1835]Upgrade Iceberg Version from 0.11.1 to 1.2.0 (apache#3697) Update CHANGELOG to reflect changes in 0.17.0 Reserving 0.18.0 version for next release [GOBBLIN-1836] Ensuring Task Reliability: Handling Job Cancellation and Graceful Exits for Error-Free Completion (apache#3699) [GOBBLIN-1805] Check watermark for the most recent hour for quiet topics (apache#3698) [GOBBLIN-1825]Hive retention job should fail if deleting underlying files fail (apache#3687) [GOBBLIN-1823] Improving Container Calculation and Allocation Methodology (apache#3692) [GOBBLIN-1830] Improving Container Transition Tracking in Streaming Data Ingestion (apache#3693) [GOBBLIN-1833]Emit Completeness watermark information in snapshotCommitEvent (apache#3696)
Dear Gobblin maintainers,
Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
JIRA
Description
Here are some details about my PR, including screenshots (if applicable):
Tests all cases of trying to acquire a lease for a flow action event with one participant involved and makes corresponding fixes in the
MultiActiveLeaseArbiter.One key change this PR includes is to remove usage the participant's local
event_timestampin the database to identify the particular flow_action event. We swap it out for the database utilize the CURRENT_TIMESTAMP of the database to insert or keep track of our event. This is to avoid any discrepancies between local time and database time for future comparisons.Large number of fixes relate to MySQL-specific bugs for example NULL values not being permitted by default, syntax errors in creation, inserting into a table with a create statement, etc...
Tests
Tests the following cases
Commits