Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,9 @@ public class ConfigurationKeys {
// Scheduler lease determination store configuration
public static final String MYSQL_LEASE_ARBITER_PREFIX = "MysqlMultiActiveLeaseArbiter";
public static final String MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE_KEY = MYSQL_LEASE_ARBITER_PREFIX + ".constantsTable";
public static final String DEFAULT_MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE = MYSQL_LEASE_ARBITER_PREFIX + ".gobblin_multi_active_scheduler_constants_store";
public static final String DEFAULT_MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE = "gobblin_multi_active_scheduler_constants_store";
public static final String SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY = MYSQL_LEASE_ARBITER_PREFIX + ".schedulerLeaseArbiterTable";
public static final String DEFAULT_SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE = MYSQL_LEASE_ARBITER_PREFIX + ".gobblin_scheduler_lease_determination_store";
public static final String DEFAULT_SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE = "gobblin_scheduler_lease_determination_store";
public static final String SCHEDULER_EVENT_TO_REVISIT_TIMESTAMP_MILLIS_KEY = "eventToRevisitTimestampMillis";
public static final String SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY = "triggerEventTimestampMillis";
public static final String SCHEDULER_EVENT_EPSILON_MILLIS_KEY = MYSQL_LEASE_ARBITER_PREFIX + ".epsilonMillis";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ public void scheduleJob(Properties jobProps, JobListener jobListener, Map<String
// Schedule the Quartz job with a trigger built from the job configuration
Trigger trigger = createTriggerForJob(job.getKey(), jobProps);
this.scheduler.getScheduler().scheduleJob(job, trigger);
LOG.info(String.format("Scheduled job %s. Next run: %s.", job.getKey(), trigger.getNextFireTime()));
logNewlyScheduledJob(job, trigger);
} catch (SchedulerException se) {
LOG.error("Failed to schedule job " + jobName, se);
throw new JobException("Failed to schedule job " + jobName, se);
Expand All @@ -407,6 +407,10 @@ public void scheduleJob(Properties jobProps, JobListener jobListener, Map<String
this.scheduledJobs.put(jobName, job.getKey());
}

protected void logNewlyScheduledJob(JobDetail job, Trigger trigger) {
LOG.info(String.format("Scheduled job %s. Next run: %s.", job.getKey(), trigger.getNextFireTime()));
}

/**
* Unschedule and delete a job.
*
Expand Down Expand Up @@ -606,13 +610,6 @@ public void executeImpl(JobExecutionContext context)
JobScheduler jobScheduler = (JobScheduler) dataMap.get(JOB_SCHEDULER_KEY);
Properties jobProps = (Properties) dataMap.get(PROPERTIES_KEY);
JobListener jobListener = (JobListener) dataMap.get(JOB_LISTENER_KEY);
// Obtain trigger timestamp from trigger to pass to jobProps
Trigger trigger = context.getTrigger();
// THIS current event has already fired if this method is called, so it now exists in <previousFireTime>
long triggerTimestampMillis = trigger.getPreviousFireTime().getTime();
jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY,
String.valueOf(triggerTimestampMillis));

try {
jobScheduler.runJob(jobProps, jobListener);
} catch (Throwable t) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,17 @@
import java.util.TimeZone;

import org.apache.commons.lang.StringUtils;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.helix.HelixManager;
import org.quartz.CronExpression;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.InterruptableJob;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.UnableToInterruptJobException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -96,6 +99,7 @@
*/
@Alpha
@Singleton
@Slf4j
public class GobblinServiceJobScheduler extends JobScheduler implements SpecCatalogListener {

// Scheduler related configuration
Expand Down Expand Up @@ -442,6 +446,19 @@ public synchronized void scheduleJob(Properties jobProps, JobListener jobListene
}
}

@Override
protected void logNewlyScheduledJob(JobDetail job, Trigger trigger) {
Properties jobProps = (Properties) job.getJobDataMap().get(PROPERTIES_KEY);
log.info(jobSchedulerTracePrefixBuilder(jobProps) + "nextTriggerTime: {} - Job newly scheduled",
trigger.getNextFireTime());
}

protected static String jobSchedulerTracePrefixBuilder(Properties jobProps) {
return String.format("Scheduler trigger tracing: [flowName: %s flowGroup: %s] - ",
jobProps.getProperty(ConfigurationKeys.FLOW_NAME_KEY, "<<no flow name>>"),
jobProps.getProperty(ConfigurationKeys.FLOW_GROUP_KEY, "<<no flow group>>"));
}

@Override
public void runJob(Properties jobProps, JobListener jobListener) throws JobException {
try {
Expand Down Expand Up @@ -576,6 +593,13 @@ private void unscheduleSpec(URI specURI, String specVersion) throws JobException
this.scheduledFlowSpecs.remove(specURI.toString());
this.lastUpdatedTimeForFlowSpec.remove(specURI.toString());
unscheduleJob(specURI.toString());
try {
FlowSpec spec = (FlowSpec) this.flowCatalog.get().getSpecs(specURI);
Properties properties = spec.getConfigAsProperties();
_log.info(jobSchedulerTracePrefixBuilder(properties) + "Unscheduled Spec");
} catch (SpecNotFoundException e) {
_log.warn("Unable to retrieve spec for URI {}", specURI);
}
} else {
throw new JobException(String.format(
"Spec with URI: %s was not found in cache. May be it was cleaned, if not please clean it manually",
Expand Down Expand Up @@ -666,13 +690,22 @@ public static class GobblinServiceJob extends BaseGobblinJob implements Interrup

@Override
public void executeImpl(JobExecutionContext context) throws JobExecutionException {
_log.info("Starting FlowSpec " + context.getJobDetail().getKey());
JobDetail jobDetail = context.getJobDetail();
_log.info("Starting FlowSpec " + jobDetail.getKey());

JobDataMap dataMap = context.getJobDetail().getJobDataMap();
JobDataMap dataMap = jobDetail.getJobDataMap();
JobScheduler jobScheduler = (JobScheduler) dataMap.get(JOB_SCHEDULER_KEY);
Properties jobProps = (Properties) dataMap.get(PROPERTIES_KEY);
JobListener jobListener = (JobListener) dataMap.get(JOB_LISTENER_KEY);

// Obtain trigger timestamp from trigger to pass to jobProps
Trigger trigger = context.getTrigger();
// THIS current event has already fired if this method is called, so it now exists in <previousFireTime>
long triggerTimestampMillis = trigger.getPreviousFireTime().getTime();
jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY,
String.valueOf(triggerTimestampMillis));
_log.info(jobSchedulerTracePrefixBuilder(jobProps) + "triggerTime: {} nextTriggerTime: {} - Job triggered by "
+ "scheduler", triggerTimestampMillis, trigger.getNextFireTime().getTime());
try {
jobScheduler.runJob(jobProps, jobListener);
} catch (Throwable t) {
Expand Down