diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java index b155e8089bb..2d60fd5c83d 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java @@ -98,9 +98,9 @@ public class ConfigurationKeys { // Scheduler lease determination store configuration public static final String MYSQL_LEASE_ARBITER_PREFIX = "MysqlMultiActiveLeaseArbiter"; public static final String MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE_KEY = MYSQL_LEASE_ARBITER_PREFIX + ".constantsTable"; - public static final String DEFAULT_MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE = MYSQL_LEASE_ARBITER_PREFIX + ".gobblin_multi_active_scheduler_constants_store"; + public static final String DEFAULT_MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE = "gobblin_multi_active_scheduler_constants_store"; public static final String SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY = MYSQL_LEASE_ARBITER_PREFIX + ".schedulerLeaseArbiterTable"; - public static final String DEFAULT_SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE = MYSQL_LEASE_ARBITER_PREFIX + ".gobblin_scheduler_lease_determination_store"; + public static final String DEFAULT_SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE = "gobblin_scheduler_lease_determination_store"; public static final String SCHEDULER_EVENT_TO_REVISIT_TIMESTAMP_MILLIS_KEY = "eventToRevisitTimestampMillis"; public static final String SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY = "triggerEventTimestampMillis"; public static final String SCHEDULER_EVENT_EPSILON_MILLIS_KEY = MYSQL_LEASE_ARBITER_PREFIX + ".epsilonMillis"; diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java b/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java index 56b1ac8c045..c49de3c6aec 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java @@ -398,7 +398,7 @@ public void scheduleJob(Properties jobProps, JobListener jobListener, Map - long triggerTimestampMillis = trigger.getPreviousFireTime().getTime(); - jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY, - String.valueOf(triggerTimestampMillis)); - try { jobScheduler.runJob(jobProps, jobListener); } catch (Throwable t) { diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java index b62a869ba96..5e0ac398840 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java @@ -30,14 +30,17 @@ import java.util.TimeZone; import org.apache.commons.lang.StringUtils; +import org.apache.gobblin.runtime.api.SpecNotFoundException; import org.apache.helix.HelixManager; import org.quartz.CronExpression; import org.quartz.DisallowConcurrentExecution; import org.quartz.InterruptableJob; import org.quartz.JobDataMap; +import org.quartz.JobDetail; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; import org.quartz.SchedulerException; +import org.quartz.Trigger; import org.quartz.UnableToInterruptJobException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -96,6 +99,7 @@ */ @Alpha @Singleton +@Slf4j public class GobblinServiceJobScheduler extends JobScheduler implements SpecCatalogListener { // Scheduler related configuration @@ -442,6 +446,19 @@ public synchronized void scheduleJob(Properties jobProps, JobListener jobListene } } + @Override + protected void logNewlyScheduledJob(JobDetail job, Trigger trigger) { + Properties jobProps = (Properties) job.getJobDataMap().get(PROPERTIES_KEY); + log.info(jobSchedulerTracePrefixBuilder(jobProps) + "nextTriggerTime: {} - Job newly scheduled", + trigger.getNextFireTime()); + } + + protected static String jobSchedulerTracePrefixBuilder(Properties jobProps) { + return String.format("Scheduler trigger tracing: [flowName: %s flowGroup: %s] - ", + jobProps.getProperty(ConfigurationKeys.FLOW_NAME_KEY, "<>"), + jobProps.getProperty(ConfigurationKeys.FLOW_GROUP_KEY, "<>")); + } + @Override public void runJob(Properties jobProps, JobListener jobListener) throws JobException { try { @@ -576,6 +593,13 @@ private void unscheduleSpec(URI specURI, String specVersion) throws JobException this.scheduledFlowSpecs.remove(specURI.toString()); this.lastUpdatedTimeForFlowSpec.remove(specURI.toString()); unscheduleJob(specURI.toString()); + try { + FlowSpec spec = (FlowSpec) this.flowCatalog.get().getSpecs(specURI); + Properties properties = spec.getConfigAsProperties(); + _log.info(jobSchedulerTracePrefixBuilder(properties) + "Unscheduled Spec"); + } catch (SpecNotFoundException e) { + _log.warn("Unable to retrieve spec for URI {}", specURI); + } } else { throw new JobException(String.format( "Spec with URI: %s was not found in cache. May be it was cleaned, if not please clean it manually", @@ -666,13 +690,22 @@ public static class GobblinServiceJob extends BaseGobblinJob implements Interrup @Override public void executeImpl(JobExecutionContext context) throws JobExecutionException { - _log.info("Starting FlowSpec " + context.getJobDetail().getKey()); + JobDetail jobDetail = context.getJobDetail(); + _log.info("Starting FlowSpec " + jobDetail.getKey()); - JobDataMap dataMap = context.getJobDetail().getJobDataMap(); + JobDataMap dataMap = jobDetail.getJobDataMap(); JobScheduler jobScheduler = (JobScheduler) dataMap.get(JOB_SCHEDULER_KEY); Properties jobProps = (Properties) dataMap.get(PROPERTIES_KEY); JobListener jobListener = (JobListener) dataMap.get(JOB_LISTENER_KEY); + // Obtain trigger timestamp from trigger to pass to jobProps + Trigger trigger = context.getTrigger(); + // THIS current event has already fired if this method is called, so it now exists in + long triggerTimestampMillis = trigger.getPreviousFireTime().getTime(); + jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY, + String.valueOf(triggerTimestampMillis)); + _log.info(jobSchedulerTracePrefixBuilder(jobProps) + "triggerTime: {} nextTriggerTime: {} - Job triggered by " + + "scheduler", triggerTimestampMillis, trigger.getNextFireTime().getTime()); try { jobScheduler.runJob(jobProps, jobListener); } catch (Throwable t) {