-
Notifications
You must be signed in to change notification settings - Fork 749
[GOBBLIN-1942] Create MySQL util class for re-usable methods and setup MysqlDagActio… #3812
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
1e7abaf
d7bcbd9
a412898
d174487
2518c6e
150ca0b
e6ec812
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,7 +19,6 @@ | |
|
|
||
| import com.google.inject.Inject; | ||
| import com.typesafe.config.Config; | ||
| import com.zaxxer.hikari.HikariDataSource; | ||
| import java.io.IOException; | ||
| import java.sql.Connection; | ||
| import java.sql.PreparedStatement; | ||
|
|
@@ -30,7 +29,6 @@ | |
| import java.util.Calendar; | ||
| import java.util.Optional; | ||
| import java.util.TimeZone; | ||
| import java.util.concurrent.ScheduledThreadPoolExecutor; | ||
| import java.util.concurrent.TimeUnit; | ||
| import javax.sql.DataSource; | ||
| import lombok.Data; | ||
|
|
@@ -40,8 +38,7 @@ | |
| import org.apache.gobblin.metastore.MysqlDataSourceFactory; | ||
| import org.apache.gobblin.service.ServiceConfigKeys; | ||
| import org.apache.gobblin.util.ConfigUtils; | ||
|
|
||
| import static org.apache.gobblin.runtime.api.DagActionStore.DagAction.*; | ||
| import org.apache.gobblin.util.MySQLStoreUtils; | ||
|
|
||
|
|
||
| /** | ||
|
|
@@ -80,13 +77,9 @@ | |
| */ | ||
| @Slf4j | ||
| public class MysqlMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter { | ||
| /** `j.u.Function` variant for an operation that may @throw IOException or SQLException: preserves method signature checked exceptions */ | ||
| @FunctionalInterface | ||
| protected interface CheckedFunction<T, R> { | ||
| R apply(T t) throws IOException, SQLException; | ||
| } | ||
|
|
||
| protected final DataSource dataSource; | ||
| private final MySQLStoreUtils mySQLStoreUtils; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. pretty unintuitive name for an instance (generally 'utils' suggests a grab-bag of
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. how about |
||
| private final String leaseArbiterTableName; | ||
| private final String constantsTableName; | ||
| private final int epsilonMillis; | ||
|
|
@@ -121,7 +114,7 @@ protected interface CheckedFunction<T, R> { | |
| // Deletes rows older than retention time period regardless of lease status as they should all be invalid or completed | ||
| // since retention >> linger | ||
| private static final String LEASE_ARBITER_TABLE_RETENTION_STATEMENT = "DELETE FROM %s WHERE event_timestamp < " | ||
| + "DATE_SUB(CURRENT_TIMESTAMP(3), INTERVAL ? * 1000 MICROSECOND)"; | ||
| + "DATE_SUB(CURRENT_TIMESTAMP(3), INTERVAL %s * 1000 MICROSECOND)"; | ||
| private static final String CREATE_CONSTANTS_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s " | ||
| + "(primary_key INT, epsilon INT, linger INT, PRIMARY KEY (primary_key))"; | ||
| // Only insert epsilon and linger values from config if this table does not contain a pre-existing values already. | ||
|
|
@@ -196,7 +189,8 @@ public MysqlMultiActiveLeaseArbiter(Config config) throws IOException { | |
| ConfigurationKeys.DEFAULT_SCHEDULER_EVENT_LINGER_MILLIS); | ||
| this.retentionPeriodMillis = ConfigUtils.getLong(config, ConfigurationKeys.SCHEDULER_LEASE_DETERMINATION_TABLE_RETENTION_PERIOD_MILLIS_KEY, | ||
| ConfigurationKeys.DEFAULT_SCHEDULER_LEASE_DETERMINATION_TABLE_RETENTION_PERIOD_MILLIS); | ||
| this.thisTableRetentionStatement = String.format(LEASE_ARBITER_TABLE_RETENTION_STATEMENT, this.leaseArbiterTableName); | ||
| this.thisTableRetentionStatement = String.format(LEASE_ARBITER_TABLE_RETENTION_STATEMENT, this.leaseArbiterTableName, | ||
| retentionPeriodMillis); | ||
| this.thisTableGetInfoStatement = String.format(GET_EVENT_INFO_STATEMENT, this.leaseArbiterTableName, | ||
| this.constantsTableName); | ||
| this.thisTableGetInfoStatementForReminder = String.format(GET_EVENT_INFO_STATEMENT_FOR_REMINDER, | ||
|
|
@@ -208,6 +202,7 @@ public MysqlMultiActiveLeaseArbiter(Config config) throws IOException { | |
| this.thisTableAcquireLeaseIfFinishedStatement = | ||
| String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT, this.leaseArbiterTableName); | ||
| this.dataSource = MysqlDataSourceFactory.get(config, SharedResourcesBrokerFactory.getImplicitBroker()); | ||
| this.mySQLStoreUtils = new MySQLStoreUtils(this.dataSource, log); | ||
| String createArbiterStatement = String.format( | ||
| CREATE_LEASE_ARBITER_TABLE_STATEMENT, leaseArbiterTableName); | ||
| try (Connection connection = dataSource.getConnection(); | ||
|
|
@@ -218,52 +213,28 @@ public MysqlMultiActiveLeaseArbiter(Config config) throws IOException { | |
| throw new IOException("Table creation failure for " + leaseArbiterTableName, e); | ||
| } | ||
| initializeConstantsTable(); | ||
| runRetentionOnArbitrationTable(); | ||
|
|
||
| // Periodically deletes all rows in the table with event_timestamp older than the retention period defined by config. | ||
| mySQLStoreUtils.repeatSqlCommandExecutionAtInterval(thisTableRetentionStatement, 4, TimeUnit.HOURS); | ||
|
|
||
| log.info("MysqlMultiActiveLeaseArbiter initialized"); | ||
| } | ||
|
|
||
| // Initialize Constants table if needed and insert row into it if one does not exist | ||
| private void initializeConstantsTable() throws IOException { | ||
| String createConstantsStatement = String.format(CREATE_CONSTANTS_TABLE_STATEMENT, this.constantsTableName); | ||
| withPreparedStatement(createConstantsStatement, createStatement -> createStatement.executeUpdate(), true); | ||
| mySQLStoreUtils.withPreparedStatement(createConstantsStatement, createStatement -> createStatement.executeUpdate(), | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. seeing this use--which is quite reasonable IMO--makes me wonder whether this ought to be a common base class. what are the args for vs. against that approach? are you concerned about multiple inheritance, since some mysql store classes need another different base class?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, I avoided this approach because almost all of our mysql store classes extend another base class which is meant to have different store implementations (although we typically use mysql). |
||
| true); | ||
|
|
||
| String insertConstantsStatement = String.format(UPSERT_CONSTANTS_TABLE_STATEMENT, this.constantsTableName); | ||
| withPreparedStatement(insertConstantsStatement, insertStatement -> { | ||
| mySQLStoreUtils.withPreparedStatement(insertConstantsStatement, insertStatement -> { | ||
| int i = 0; | ||
| insertStatement.setInt(++i, epsilonMillis); | ||
| insertStatement.setInt(++i, lingerMillis); | ||
| return insertStatement.executeUpdate(); | ||
| }, true); | ||
| } | ||
|
|
||
| /** | ||
| * Periodically deletes all rows in the table with event_timestamp older than the retention period defined by config. | ||
| * // TODO: create a utility to run a SQL commend in a STPE using interval T | ||
| */ | ||
| private void runRetentionOnArbitrationTable() { | ||
| ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); | ||
| Runnable retentionTask = () -> { | ||
| try { | ||
| withPreparedStatement(thisTableRetentionStatement, | ||
| retentionStatement -> { | ||
| retentionStatement.setLong(1, retentionPeriodMillis); | ||
| int numRowsDeleted = retentionStatement.executeUpdate(); | ||
| if (numRowsDeleted != 0) { | ||
| log.info("Multi-active lease arbiter retention thread deleted {} rows from the lease arbiter table", | ||
| numRowsDeleted); | ||
| } | ||
| return numRowsDeleted; | ||
| }, true); | ||
| } catch (IOException e) { | ||
| log.error("Failing to run retention on lease arbiter table. Unbounded growth can lead to database slowness and " | ||
| + "affect our system performance. Examine exception: ", e); | ||
| } | ||
| }; | ||
|
|
||
| // Run retention thread every 4 hours (6 times a day) | ||
| executor.scheduleAtFixedRate(retentionTask, 0, 4, TimeUnit.HOURS); | ||
| } | ||
|
|
||
| @Override | ||
| public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, long eventTimeMillis, | ||
| boolean isReminderEvent) throws IOException { | ||
|
|
@@ -370,7 +341,7 @@ else if (leaseValidityStatus == 2) { | |
| */ | ||
| protected Optional<GetEventInfoResult> getExistingEventInfo(DagActionStore.DagAction flowAction, | ||
| boolean isReminderEvent, long eventTimeMillis) throws IOException { | ||
| return withPreparedStatement(isReminderEvent ? thisTableGetInfoStatementForReminder : thisTableGetInfoStatement, | ||
| return mySQLStoreUtils.withPreparedStatement(isReminderEvent ? thisTableGetInfoStatementForReminder : thisTableGetInfoStatement, | ||
| getInfoStatement -> { | ||
| int i = 0; | ||
| if (isReminderEvent) { | ||
|
|
@@ -425,7 +396,7 @@ protected GetEventInfoResult createGetInfoResult(ResultSet resultSet) throws IOE | |
| protected int attemptLeaseIfNewRow(DagActionStore.DagAction flowAction) throws IOException { | ||
| String formattedAcquireLeaseNewRowStatement = | ||
| String.format(ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT, this.leaseArbiterTableName); | ||
| return withPreparedStatement(formattedAcquireLeaseNewRowStatement, | ||
| return mySQLStoreUtils.withPreparedStatement(formattedAcquireLeaseNewRowStatement, | ||
| insertStatement -> { | ||
| completeInsertPreparedStatement(insertStatement, flowAction); | ||
| try { | ||
|
|
@@ -447,7 +418,7 @@ protected int attemptLeaseIfNewRow(DagActionStore.DagAction flowAction) throws I | |
| protected int attemptLeaseIfExistingRow(String acquireLeaseStatement, DagActionStore.DagAction flowAction, | ||
| boolean needEventTimeCheck, boolean needLeaseAcquisition, Timestamp dbEventTimestamp, | ||
| Timestamp dbLeaseAcquisitionTimestamp) throws IOException { | ||
| return withPreparedStatement(acquireLeaseStatement, | ||
| return mySQLStoreUtils.withPreparedStatement(acquireLeaseStatement, | ||
| insertStatement -> { | ||
| completeUpdatePreparedStatement(insertStatement, flowAction, needEventTimeCheck, needLeaseAcquisition, | ||
| dbEventTimestamp, dbLeaseAcquisitionTimestamp); | ||
|
|
@@ -460,7 +431,7 @@ protected int attemptLeaseIfExistingRow(String acquireLeaseStatement, DagActionS | |
| * was successful or not. | ||
| */ | ||
| protected SelectInfoResult getRowInfo(DagActionStore.DagAction flowAction) throws IOException { | ||
| return withPreparedStatement(thisTableSelectAfterInsertStatement, | ||
| return mySQLStoreUtils.withPreparedStatement(thisTableSelectAfterInsertStatement, | ||
| selectStatement -> { | ||
| completeWhereClauseMatchingKeyPreparedStatement(selectStatement, flowAction); | ||
| ResultSet resultSet = selectStatement.executeQuery(); | ||
|
|
@@ -596,7 +567,7 @@ public boolean recordLeaseSuccess(LeaseObtainedStatus status) | |
| String flowGroup = flowAction.getFlowGroup(); | ||
| String flowName = flowAction.getFlowName(); | ||
| DagActionStore.FlowActionType flowActionType = flowAction.getFlowActionType(); | ||
| return withPreparedStatement(String.format(CONDITIONALLY_COMPLETE_LEASE_STATEMENT, leaseArbiterTableName), | ||
| return mySQLStoreUtils.withPreparedStatement(String.format(CONDITIONALLY_COMPLETE_LEASE_STATEMENT, leaseArbiterTableName), | ||
| updateStatement -> { | ||
| int i = 0; | ||
| updateStatement.setString(++i, flowGroup); | ||
|
|
@@ -621,25 +592,6 @@ public boolean recordLeaseSuccess(LeaseObtainedStatus status) | |
| }, true); | ||
| } | ||
|
|
||
| /** Abstracts recurring pattern around resource management and exception re-mapping. */ | ||
| protected <T> T withPreparedStatement(String sql, CheckedFunction<PreparedStatement, T> f, boolean shouldCommit) | ||
| throws IOException { | ||
| try (Connection connection = this.dataSource.getConnection(); | ||
| PreparedStatement statement = connection.prepareStatement(sql)) { | ||
| T result = f.apply(statement); | ||
| if (shouldCommit) { | ||
| connection.commit(); | ||
| } | ||
| statement.close(); | ||
| return result; | ||
| } catch (SQLException e) { | ||
| log.warn("Received SQL exception that can result from invalid connection. Checking if validation query is set {} " | ||
| + "Exception is {}", ((HikariDataSource) this.dataSource).getConnectionTestQuery(), e); | ||
| throw new IOException(e); | ||
| } | ||
| } | ||
|
|
||
|
|
||
| /** | ||
| * DTO for arbiter's current lease state for a FlowActionEvent | ||
| */ | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.