Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ public class ConfigurationKeys {
public static final int DEFAULT_LOAD_SPEC_BATCH_SIZE = 500;
public static final String SKIP_SCHEDULING_FLOWS_AFTER_NUM_DAYS = "skip.scheduling.flows.after.num.days";
public static final int DEFAULT_NUM_DAYS_TO_SKIP_AFTER = 365;
// Mysql Dag Action Store configuration
public static final String MYSQL_DAG_ACTION_STORE_PREFIX = "MysqlDagActionStore.";
public static final String MYSQL_DAG_ACTION_STORE_TABLE_RETENTION_PERIOD_SECONDS_KEY = MYSQL_DAG_ACTION_STORE_PREFIX + "retentionPeriodSeconds";
public static final long DEFAULT_MYSQL_DAG_ACTION_STORE_TABLE_RETENTION_PERIOD_SEC_KEY = 3 * 24 * 60 * 60; // (3 days in seconds)
// Scheduler lease determination store configuration
public static final String MYSQL_LEASE_ARBITER_PREFIX = "MysqlMultiActiveLeaseArbiter";
public static final String MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE_KEY = MYSQL_LEASE_ARBITER_PREFIX + ".constantsTable";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import com.google.inject.Inject;
import com.typesafe.config.Config;
import com.zaxxer.hikari.HikariDataSource;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
Expand All @@ -30,7 +29,6 @@
import java.util.Calendar;
import java.util.Optional;
import java.util.TimeZone;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.sql.DataSource;
import lombok.Data;
Expand All @@ -40,8 +38,7 @@
import org.apache.gobblin.metastore.MysqlDataSourceFactory;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.util.ConfigUtils;

import static org.apache.gobblin.runtime.api.DagActionStore.DagAction.*;
import org.apache.gobblin.util.DBStatementExecutor;


/**
Expand Down Expand Up @@ -80,13 +77,9 @@
*/
@Slf4j
public class MysqlMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
/** `j.u.Function` variant for an operation that may @throw IOException or SQLException: preserves method signature checked exceptions */
@FunctionalInterface
protected interface CheckedFunction<T, R> {
R apply(T t) throws IOException, SQLException;
}

protected final DataSource dataSource;
private final DBStatementExecutor dbStatementExecutor;
private final String leaseArbiterTableName;
private final String constantsTableName;
private final int epsilonMillis;
Expand Down Expand Up @@ -121,7 +114,7 @@ protected interface CheckedFunction<T, R> {
// Deletes rows older than retention time period regardless of lease status as they should all be invalid or completed
// since retention >> linger
private static final String LEASE_ARBITER_TABLE_RETENTION_STATEMENT = "DELETE FROM %s WHERE event_timestamp < "
+ "DATE_SUB(CURRENT_TIMESTAMP(3), INTERVAL ? * 1000 MICROSECOND)";
+ "DATE_SUB(CURRENT_TIMESTAMP(3), INTERVAL %s * 1000 MICROSECOND)";
private static final String CREATE_CONSTANTS_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s "
+ "(primary_key INT, epsilon INT, linger INT, PRIMARY KEY (primary_key))";
// Only insert epsilon and linger values from config if this table does not contain a pre-existing values already.
Expand Down Expand Up @@ -196,7 +189,8 @@ public MysqlMultiActiveLeaseArbiter(Config config) throws IOException {
ConfigurationKeys.DEFAULT_SCHEDULER_EVENT_LINGER_MILLIS);
this.retentionPeriodMillis = ConfigUtils.getLong(config, ConfigurationKeys.SCHEDULER_LEASE_DETERMINATION_TABLE_RETENTION_PERIOD_MILLIS_KEY,
ConfigurationKeys.DEFAULT_SCHEDULER_LEASE_DETERMINATION_TABLE_RETENTION_PERIOD_MILLIS);
this.thisTableRetentionStatement = String.format(LEASE_ARBITER_TABLE_RETENTION_STATEMENT, this.leaseArbiterTableName);
this.thisTableRetentionStatement = String.format(LEASE_ARBITER_TABLE_RETENTION_STATEMENT, this.leaseArbiterTableName,
retentionPeriodMillis);
this.thisTableGetInfoStatement = String.format(GET_EVENT_INFO_STATEMENT, this.leaseArbiterTableName,
this.constantsTableName);
this.thisTableGetInfoStatementForReminder = String.format(GET_EVENT_INFO_STATEMENT_FOR_REMINDER,
Expand All @@ -208,6 +202,7 @@ public MysqlMultiActiveLeaseArbiter(Config config) throws IOException {
this.thisTableAcquireLeaseIfFinishedStatement =
String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT, this.leaseArbiterTableName);
this.dataSource = MysqlDataSourceFactory.get(config, SharedResourcesBrokerFactory.getImplicitBroker());
this.dbStatementExecutor = new DBStatementExecutor(this.dataSource, log);
String createArbiterStatement = String.format(
CREATE_LEASE_ARBITER_TABLE_STATEMENT, leaseArbiterTableName);
try (Connection connection = dataSource.getConnection();
Expand All @@ -218,52 +213,28 @@ public MysqlMultiActiveLeaseArbiter(Config config) throws IOException {
throw new IOException("Table creation failure for " + leaseArbiterTableName, e);
}
initializeConstantsTable();
runRetentionOnArbitrationTable();

// Periodically deletes all rows in the table with event_timestamp older than the retention period defined by config.
dbStatementExecutor.repeatSqlCommandExecutionAtInterval(thisTableRetentionStatement, 4, TimeUnit.HOURS);

log.info("MysqlMultiActiveLeaseArbiter initialized");
}

// Initialize Constants table if needed and insert row into it if one does not exist
private void initializeConstantsTable() throws IOException {
String createConstantsStatement = String.format(CREATE_CONSTANTS_TABLE_STATEMENT, this.constantsTableName);
withPreparedStatement(createConstantsStatement, createStatement -> createStatement.executeUpdate(), true);
dbStatementExecutor.withPreparedStatement(createConstantsStatement, createStatement -> createStatement.executeUpdate(),
true);

String insertConstantsStatement = String.format(UPSERT_CONSTANTS_TABLE_STATEMENT, this.constantsTableName);
withPreparedStatement(insertConstantsStatement, insertStatement -> {
dbStatementExecutor.withPreparedStatement(insertConstantsStatement, insertStatement -> {
int i = 0;
insertStatement.setInt(++i, epsilonMillis);
insertStatement.setInt(++i, lingerMillis);
return insertStatement.executeUpdate();
}, true);
}

/**
* Periodically deletes all rows in the table with event_timestamp older than the retention period defined by config.
* // TODO: create a utility to run a SQL commend in a STPE using interval T
*/
private void runRetentionOnArbitrationTable() {
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
Runnable retentionTask = () -> {
try {
withPreparedStatement(thisTableRetentionStatement,
retentionStatement -> {
retentionStatement.setLong(1, retentionPeriodMillis);
int numRowsDeleted = retentionStatement.executeUpdate();
if (numRowsDeleted != 0) {
log.info("Multi-active lease arbiter retention thread deleted {} rows from the lease arbiter table",
numRowsDeleted);
}
return numRowsDeleted;
}, true);
} catch (IOException e) {
log.error("Failing to run retention on lease arbiter table. Unbounded growth can lead to database slowness and "
+ "affect our system performance. Examine exception: ", e);
}
};

// Run retention thread every 4 hours (6 times a day)
executor.scheduleAtFixedRate(retentionTask, 0, 4, TimeUnit.HOURS);
}

@Override
public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, long eventTimeMillis,
boolean isReminderEvent) throws IOException {
Expand Down Expand Up @@ -370,7 +341,7 @@ else if (leaseValidityStatus == 2) {
*/
protected Optional<GetEventInfoResult> getExistingEventInfo(DagActionStore.DagAction flowAction,
boolean isReminderEvent, long eventTimeMillis) throws IOException {
return withPreparedStatement(isReminderEvent ? thisTableGetInfoStatementForReminder : thisTableGetInfoStatement,
return dbStatementExecutor.withPreparedStatement(isReminderEvent ? thisTableGetInfoStatementForReminder : thisTableGetInfoStatement,
getInfoStatement -> {
int i = 0;
if (isReminderEvent) {
Expand Down Expand Up @@ -425,7 +396,7 @@ protected GetEventInfoResult createGetInfoResult(ResultSet resultSet) throws IOE
protected int attemptLeaseIfNewRow(DagActionStore.DagAction flowAction) throws IOException {
String formattedAcquireLeaseNewRowStatement =
String.format(ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT, this.leaseArbiterTableName);
return withPreparedStatement(formattedAcquireLeaseNewRowStatement,
return dbStatementExecutor.withPreparedStatement(formattedAcquireLeaseNewRowStatement,
insertStatement -> {
completeInsertPreparedStatement(insertStatement, flowAction);
try {
Expand All @@ -447,7 +418,7 @@ protected int attemptLeaseIfNewRow(DagActionStore.DagAction flowAction) throws I
protected int attemptLeaseIfExistingRow(String acquireLeaseStatement, DagActionStore.DagAction flowAction,
boolean needEventTimeCheck, boolean needLeaseAcquisition, Timestamp dbEventTimestamp,
Timestamp dbLeaseAcquisitionTimestamp) throws IOException {
return withPreparedStatement(acquireLeaseStatement,
return dbStatementExecutor.withPreparedStatement(acquireLeaseStatement,
insertStatement -> {
completeUpdatePreparedStatement(insertStatement, flowAction, needEventTimeCheck, needLeaseAcquisition,
dbEventTimestamp, dbLeaseAcquisitionTimestamp);
Expand All @@ -460,7 +431,7 @@ protected int attemptLeaseIfExistingRow(String acquireLeaseStatement, DagActionS
* was successful or not.
*/
protected SelectInfoResult getRowInfo(DagActionStore.DagAction flowAction) throws IOException {
return withPreparedStatement(thisTableSelectAfterInsertStatement,
return dbStatementExecutor.withPreparedStatement(thisTableSelectAfterInsertStatement,
selectStatement -> {
completeWhereClauseMatchingKeyPreparedStatement(selectStatement, flowAction);
ResultSet resultSet = selectStatement.executeQuery();
Expand Down Expand Up @@ -596,7 +567,7 @@ public boolean recordLeaseSuccess(LeaseObtainedStatus status)
String flowGroup = flowAction.getFlowGroup();
String flowName = flowAction.getFlowName();
DagActionStore.FlowActionType flowActionType = flowAction.getFlowActionType();
return withPreparedStatement(String.format(CONDITIONALLY_COMPLETE_LEASE_STATEMENT, leaseArbiterTableName),
return dbStatementExecutor.withPreparedStatement(String.format(CONDITIONALLY_COMPLETE_LEASE_STATEMENT, leaseArbiterTableName),
updateStatement -> {
int i = 0;
updateStatement.setString(++i, flowGroup);
Expand All @@ -621,25 +592,6 @@ public boolean recordLeaseSuccess(LeaseObtainedStatus status)
}, true);
}

/** Abstracts recurring pattern around resource management and exception re-mapping. */
protected <T> T withPreparedStatement(String sql, CheckedFunction<PreparedStatement, T> f, boolean shouldCommit)
throws IOException {
try (Connection connection = this.dataSource.getConnection();
PreparedStatement statement = connection.prepareStatement(sql)) {
T result = f.apply(statement);
if (shouldCommit) {
connection.commit();
}
statement.close();
return result;
} catch (SQLException e) {
log.warn("Received SQL exception that can result from invalid connection. Checking if validation query is set {} "
+ "Exception is {}", ((HikariDataSource) this.dataSource).getConnectionTestQuery(), e);
throw new IOException(e);
}
}


/**
* DTO for arbiter's current lease state for a FlowActionEvent
*/
Expand Down
Loading