-
Notifications
You must be signed in to change notification settings - Fork 749
[GOBBLIN-1923] Add retention for lease arbiter table #3792
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
65abea8
2933333
ba04633
ad4eaae
a23bc7b
ab2b0de
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -30,6 +30,8 @@ | |
| import java.util.Calendar; | ||
| import java.util.Optional; | ||
| import java.util.TimeZone; | ||
| import java.util.concurrent.ScheduledThreadPoolExecutor; | ||
| import java.util.concurrent.TimeUnit; | ||
| import javax.sql.DataSource; | ||
| import lombok.Data; | ||
| import lombok.extern.slf4j.Slf4j; | ||
|
|
@@ -85,15 +87,16 @@ protected interface CheckedFunction<T, R> { | |
| protected final DataSource dataSource; | ||
| private final String leaseArbiterTableName; | ||
| private final String constantsTableName; | ||
| private final int epsilon; | ||
| private final int linger; | ||
| private final int epsilonMillis; | ||
| private final int lingerMillis; | ||
| private final long retentionPeriodMillis; | ||
| private String thisTableRetentionStatement; | ||
| private String thisTableGetInfoStatement; | ||
| private String thisTableGetInfoStatementForReminder; | ||
| private String thisTableSelectAfterInsertStatement; | ||
| private String thisTableAcquireLeaseIfMatchingAllStatement; | ||
| private String thisTableAcquireLeaseIfFinishedStatement; | ||
|
|
||
| // TODO: define retention on this table | ||
| /* | ||
| Notes: | ||
| - Set `event_timestamp` default value to turn off timestamp auto-updates for row modifications which alters this col | ||
|
|
@@ -110,9 +113,13 @@ protected interface CheckedFunction<T, R> { | |
| private static final String CREATE_LEASE_ARBITER_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s (" | ||
| + "flow_group varchar(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, flow_name varchar(" | ||
| + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, " + " flow_action varchar(100) NOT NULL, " | ||
| + "event_timestamp TIMESTAMP(3) DEFAULT CURRENT_TIMESTAMP(3), " | ||
| + "lease_acquisition_timestamp TIMESTAMP(3) NULL DEFAULT NULL, " | ||
| + "event_timestamp TIMESTAMP NOT NULL, " | ||
| + "lease_acquisition_timestamp TIMESTAMP NULL, " | ||
| + "PRIMARY KEY (flow_group,flow_name,flow_action))"; | ||
| // Deletes rows older than retention time period regardless of lease status as they should all be invalid or completed | ||
| // since retention >> linger | ||
| private static final String LEASE_ARBITER_TABLE_RETENTION_STATEMENT = "DELETE FROM %s WHERE event_timestamp < " | ||
| + "DATE_SUB(CURRENT_TIMESTAMP(3), INTERVAL ? * 1000 MICROSECOND)"; | ||
| private static final String CREATE_CONSTANTS_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s " | ||
| + "(primary_key INT, epsilon INT, linger INT, PRIMARY KEY (primary_key))"; | ||
| // Only insert epsilon and linger values from config if this table does not contain a pre-existing values already. | ||
|
|
@@ -163,7 +170,8 @@ protected interface CheckedFunction<T, R> { | |
| // Complete lease acquisition if values have not changed since lease was acquired | ||
| protected static final String CONDITIONALLY_COMPLETE_LEASE_STATEMENT = "UPDATE %s SET " | ||
| + "lease_acquisition_timestamp = NULL " + WHERE_CLAUSE_TO_MATCH_ROW; | ||
| protected static final Calendar UTC_CAL = Calendar.getInstance(TimeZone.getTimeZone("UTC")); | ||
| private static final ThreadLocal<Calendar> UTC_CAL = | ||
| ThreadLocal.withInitial(() -> Calendar.getInstance(TimeZone.getTimeZone("UTC"))); | ||
|
|
||
| @Inject | ||
| public MysqlMultiActiveLeaseArbiter(Config config) throws IOException { | ||
|
|
@@ -178,10 +186,13 @@ public MysqlMultiActiveLeaseArbiter(Config config) throws IOException { | |
| ConfigurationKeys.DEFAULT_SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE); | ||
| this.constantsTableName = ConfigUtils.getString(config, ConfigurationKeys.MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE_KEY, | ||
| ConfigurationKeys.DEFAULT_MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE); | ||
| this.epsilon = ConfigUtils.getInt(config, ConfigurationKeys.SCHEDULER_EVENT_EPSILON_MILLIS_KEY, | ||
| this.epsilonMillis = ConfigUtils.getInt(config, ConfigurationKeys.SCHEDULER_EVENT_EPSILON_MILLIS_KEY, | ||
| ConfigurationKeys.DEFAULT_SCHEDULER_EVENT_EPSILON_MILLIS); | ||
| this.linger = ConfigUtils.getInt(config, ConfigurationKeys.SCHEDULER_EVENT_LINGER_MILLIS_KEY, | ||
| this.lingerMillis = ConfigUtils.getInt(config, ConfigurationKeys.SCHEDULER_EVENT_LINGER_MILLIS_KEY, | ||
| ConfigurationKeys.DEFAULT_SCHEDULER_EVENT_LINGER_MILLIS); | ||
| this.retentionPeriodMillis = ConfigUtils.getLong(config, ConfigurationKeys.SCHEDULER_LEASE_DETERMINATION_TABLE_RETENTION_PERIOD_MILLIS_KEY, | ||
| ConfigurationKeys.DEFAULT_SCHEDULER_LEASE_DETERMINATION_TABLE_RETENTION_PERIOD_MILLIS); | ||
| this.thisTableRetentionStatement = String.format(LEASE_ARBITER_TABLE_RETENTION_STATEMENT, this.leaseArbiterTableName); | ||
| this.thisTableGetInfoStatement = String.format(GET_EVENT_INFO_STATEMENT, this.leaseArbiterTableName, | ||
| this.constantsTableName); | ||
| this.thisTableGetInfoStatementForReminder = String.format(GET_EVENT_INFO_STATEMENT_FOR_REMINDER, | ||
|
|
@@ -203,7 +214,7 @@ public MysqlMultiActiveLeaseArbiter(Config config) throws IOException { | |
| throw new IOException("Table creation failure for " + leaseArbiterTableName, e); | ||
| } | ||
| initializeConstantsTable(); | ||
|
|
||
| runRetentionOnArbitrationTable(); | ||
| log.info("MysqlMultiActiveLeaseArbiter initialized"); | ||
| } | ||
|
|
||
|
|
@@ -215,12 +226,41 @@ private void initializeConstantsTable() throws IOException { | |
| String insertConstantsStatement = String.format(UPSERT_CONSTANTS_TABLE_STATEMENT, this.constantsTableName); | ||
| withPreparedStatement(insertConstantsStatement, insertStatement -> { | ||
| int i = 0; | ||
| insertStatement.setInt(++i, epsilon); | ||
| insertStatement.setInt(++i, linger); | ||
| insertStatement.setInt(++i, epsilonMillis); | ||
| insertStatement.setInt(++i, lingerMillis); | ||
|
Comment on lines
-218
to
+232
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nice! |
||
| return insertStatement.executeUpdate(); | ||
| }, true); | ||
| } | ||
|
|
||
| /** | ||
| * Periodically deletes all rows in the table with event_timestamp older than the retention period defined by config. | ||
| * // TODO: create a utility to run a SQL commend in a STPE using interval T | ||
| */ | ||
| private void runRetentionOnArbitrationTable() { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Small nit but can we add the code for starting a thread within this method? Due to how it's implemented as an infinite loop we would generally always want this function to be run asynchronously
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. seems worth abstracting into a utility along the lines of "run this arbitrary SQL in a STPE using interval T" you decide right now whether to do or merely "TODO" ;) |
||
| ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); | ||
| Runnable retentionTask = () -> { | ||
| try { | ||
| Thread.sleep(10000); | ||
|
umustafi marked this conversation as resolved.
|
||
| withPreparedStatement(thisTableRetentionStatement, | ||
| retentionStatement -> { | ||
| retentionStatement.setLong(1, retentionPeriodMillis); | ||
| int numRowsDeleted = retentionStatement.executeUpdate(); | ||
| if (numRowsDeleted != 0) { | ||
| log.info("Multi-active lease arbiter retention thread deleted {} rows from the lease arbiter table", | ||
| numRowsDeleted); | ||
| } | ||
| return numRowsDeleted; | ||
| }, true); | ||
| } catch (InterruptedException | IOException e) { | ||
| log.error("Failing to run retention on lease arbiter table. Unbounded growth can lead to database slowness and " | ||
| + "affect our system performance. Examine exception: ", e); | ||
| } | ||
| }; | ||
|
|
||
| // Run retention thread every 4 hours (6 times a day) | ||
| executor.scheduleAtFixedRate(retentionTask, 0, 4, TimeUnit.HOURS); | ||
| } | ||
|
|
||
| @Override | ||
| public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, long eventTimeMillis, | ||
| boolean isReminderEvent) throws IOException { | ||
|
|
@@ -340,12 +380,12 @@ protected Optional<GetEventInfoResult> getExistingEventInfo(DagActionStore.DagAc | |
| protected GetEventInfoResult createGetInfoResult(ResultSet resultSet) throws IOException { | ||
| try { | ||
| // Extract values from result set | ||
| Timestamp dbEventTimestamp = resultSet.getTimestamp("utc_event_timestamp", UTC_CAL); | ||
| Timestamp dbLeaseAcquisitionTimestamp = resultSet.getTimestamp("utc_lease_acquisition_timestamp", UTC_CAL); | ||
| Timestamp dbEventTimestamp = resultSet.getTimestamp("utc_event_timestamp", UTC_CAL.get()); | ||
| Timestamp dbLeaseAcquisitionTimestamp = resultSet.getTimestamp("utc_lease_acquisition_timestamp", UTC_CAL.get()); | ||
| boolean withinEpsilon = resultSet.getBoolean("is_within_epsilon"); | ||
| int leaseValidityStatus = resultSet.getInt("lease_validity_status"); | ||
| int dbLinger = resultSet.getInt("linger"); | ||
| Timestamp dbCurrentTimestamp = resultSet.getTimestamp("utc_current_timestamp", UTC_CAL); | ||
| Timestamp dbCurrentTimestamp = resultSet.getTimestamp("utc_current_timestamp", UTC_CAL.get()); | ||
| return new GetEventInfoResult(dbEventTimestamp, dbLeaseAcquisitionTimestamp, withinEpsilon, leaseValidityStatus, | ||
| dbLinger, dbCurrentTimestamp); | ||
| } catch (SQLException e) { | ||
|
|
@@ -423,14 +463,14 @@ protected static SelectInfoResult createSelectInfoResult(ResultSet resultSet) th | |
| throw new IOException("Expected resultSet containing row information for the lease that was attempted but " | ||
| + "received nothing."); | ||
| } | ||
| if (resultSet.getTimestamp("utc_event_timestamp", UTC_CAL) == null) { | ||
| if (resultSet.getTimestamp("utc_event_timestamp", UTC_CAL.get()) == null) { | ||
| throw new IOException("event_timestamp should never be null (it is always set to current timestamp)"); | ||
| } | ||
| long eventTimeMillis = resultSet.getTimestamp("utc_event_timestamp", UTC_CAL).getTime(); | ||
| long eventTimeMillis = resultSet.getTimestamp("utc_event_timestamp", UTC_CAL.get()).getTime(); | ||
| // Lease acquisition timestamp is null if another participant has completed the lease | ||
| Optional<Long> leaseAcquisitionTimeMillis = | ||
| resultSet.getTimestamp("utc_lease_acquisition_timestamp", UTC_CAL) == null ? Optional.empty() : | ||
| Optional.of(resultSet.getTimestamp("utc_lease_acquisition_timestamp", UTC_CAL).getTime()); | ||
| resultSet.getTimestamp("utc_lease_acquisition_timestamp", UTC_CAL.get()) == null ? Optional.empty() : | ||
| Optional.of(resultSet.getTimestamp("utc_lease_acquisition_timestamp", UTC_CAL.get()).getTime()); | ||
| int dbLinger = resultSet.getInt("linger"); | ||
| return new SelectInfoResult(eventTimeMillis, leaseAcquisitionTimeMillis, dbLinger); | ||
| } catch (SQLException e) { | ||
|
|
@@ -526,10 +566,10 @@ protected static void completeUpdatePreparedStatement(PreparedStatement statemen | |
| statement.setString(++i, flowAction.getFlowActionType().toString()); | ||
| // Values that may be needed depending on the insert statement | ||
| if (needEventTimeCheck) { | ||
| statement.setTimestamp(++i, originalEventTimestamp, UTC_CAL); | ||
| statement.setTimestamp(++i, originalEventTimestamp, UTC_CAL.get()); | ||
| } | ||
| if (needLeaseAcquisitionTimeCheck) { | ||
| statement.setTimestamp(++i, originalLeaseAcquisitionTimestamp, UTC_CAL); | ||
| statement.setTimestamp(++i, originalLeaseAcquisitionTimestamp, UTC_CAL.get()); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -546,8 +586,8 @@ public boolean recordLeaseSuccess(LeaseObtainedStatus status) | |
| updateStatement.setString(++i, flowGroup); | ||
| updateStatement.setString(++i, flowName); | ||
| updateStatement.setString(++i, flowActionType.toString()); | ||
| updateStatement.setTimestamp(++i, new Timestamp(status.getEventTimestamp()), UTC_CAL); | ||
| updateStatement.setTimestamp(++i, new Timestamp(status.getLeaseAcquisitionTimestamp()), UTC_CAL); | ||
| updateStatement.setTimestamp(++i, new Timestamp(status.getEventTimestamp()), UTC_CAL.get()); | ||
| updateStatement.setTimestamp(++i, new Timestamp(status.getLeaseAcquisitionTimestamp()), UTC_CAL.get()); | ||
| int numRowsUpdated = updateStatement.executeUpdate(); | ||
| if (numRowsUpdated == 0) { | ||
| log.info("Multi-active lease arbiter lease attempt: [{}, eventTimestamp: {}] - FAILED to complete because " | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as far as migrating this schema... will it require manual intervention to either
droporalter table?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes I believe we'll have to drop that table. I will do that before deploying.