From 6b229fe92d512980f2ba788a3c7b9a25666361ea Mon Sep 17 00:00:00 2001 From: Urmi Mustafi Date: Fri, 18 Aug 2023 18:28:24 -0700 Subject: [PATCH 1/4] ensure all logged timestamps are in utc --- .../scheduler/GobblinServiceJobScheduler.java | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java index a9db2c9bde7..4972eb3bc5d 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java @@ -458,7 +458,7 @@ public synchronized void scheduleJob(Properties jobProps, JobListener jobListene protected void logNewlyScheduledJob(JobDetail job, Trigger trigger) { Properties jobProps = (Properties) job.getJobDataMap().get(PROPERTIES_KEY); log.info(jobSchedulerTracePrefixBuilder(jobProps) + "nextTriggerTime: {} - Job newly scheduled", - trigger.getNextFireTime().getTime()); + getMillisecondsSinceEpochInUTC(trigger.getNextFireTime())); } protected static String jobSchedulerTracePrefixBuilder(Properties jobProps) { @@ -467,6 +467,18 @@ protected static String jobSchedulerTracePrefixBuilder(Properties jobProps) { jobProps.getProperty(ConfigurationKeys.FLOW_GROUP_KEY, "<>")); } + /** + * Takes a given Date object and converts the timezone to UTC before returning the number of millseconds since epoch + * @param date + */ + public static long getMillisecondsSinceEpochInUTC(Date date) { + // Create a Calendar object and set it to the given Date + Calendar calendar = Calendar.getInstance(); + calendar.setTime(date); + calendar.setTimeZone(TimeZone.getTimeZone("UTC")); + return calendar.getTimeInMillis(); + } + @Override public void runJob(Properties jobProps, JobListener jobListener) throws JobException { try { @@ -716,11 +728,11 @@ public void executeImpl(JobExecutionContext context) throws JobExecutionExceptio // Obtain trigger timestamp from trigger to pass to jobProps Trigger trigger = context.getTrigger(); // THIS current event has already fired if this method is called, so it now exists in - long triggerTimestampMillis = trigger.getPreviousFireTime().getTime(); + long triggerTimestampMillis = getMillisecondsSinceEpochInUTC(trigger.getPreviousFireTime()); jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY, String.valueOf(triggerTimestampMillis)); _log.info(jobSchedulerTracePrefixBuilder(jobProps) + "triggerTime: {} nextTriggerTime: {} - Job triggered by " - + "scheduler", triggerTimestampMillis, trigger.getNextFireTime().getTime()); + + "scheduler", triggerTimestampMillis, getMillisecondsSinceEpochInUTC(trigger.getNextFireTime())); try { jobScheduler.runJob(jobProps, jobListener); } catch (Throwable t) { From 5088c0f4cb67b73c106046f96ff64b6eefd5a63b Mon Sep 17 00:00:00 2001 From: Urmi Mustafi Date: Mon, 21 Aug 2023 10:33:17 -0700 Subject: [PATCH 2/4] use java.time class --- .../scheduler/GobblinServiceJobScheduler.java | 72 +++++++++---------- 1 file changed, 35 insertions(+), 37 deletions(-) diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java index 4972eb3bc5d..fc810090249 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java @@ -17,9 +17,19 @@ package org.apache.gobblin.service.modules.scheduler; +import com.codahale.metrics.MetricFilter; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import com.google.common.collect.Maps; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; import java.io.IOException; import java.net.URI; import java.text.ParseException; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; import java.util.Calendar; import java.util.Collection; import java.util.Date; @@ -28,37 +38,12 @@ import java.util.Map; import java.util.Properties; import java.util.TimeZone; - -import org.apache.commons.lang.StringUtils; -import org.apache.gobblin.runtime.api.SpecNotFoundException; -import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton; -import org.apache.helix.HelixManager; -import org.quartz.CronExpression; -import org.quartz.DisallowConcurrentExecution; -import org.quartz.InterruptableJob; -import org.quartz.JobDataMap; -import org.quartz.JobDetail; -import org.quartz.JobExecutionContext; -import org.quartz.JobExecutionException; -import org.quartz.SchedulerException; -import org.quartz.Trigger; -import org.quartz.UnableToInterruptJobException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.codahale.metrics.MetricFilter; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Optional; -import com.google.common.collect.Maps; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; - import javax.inject.Inject; import javax.inject.Named; import javax.inject.Singleton; import lombok.Getter; import lombok.extern.slf4j.Slf4j; - +import org.apache.commons.lang.StringUtils; import org.apache.gobblin.annotation.Alpha; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.instrumented.Instrumented; @@ -70,6 +55,7 @@ import org.apache.gobblin.runtime.api.FlowSpec; import org.apache.gobblin.runtime.api.Spec; import org.apache.gobblin.runtime.api.SpecCatalogListener; +import org.apache.gobblin.runtime.api.SpecNotFoundException; import org.apache.gobblin.runtime.listeners.JobListener; import org.apache.gobblin.runtime.metrics.RuntimeMetrics; import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse; @@ -82,15 +68,29 @@ import org.apache.gobblin.service.ServiceConfigKeys; import org.apache.gobblin.service.modules.flowgraph.Dag; import org.apache.gobblin.service.modules.orchestration.DagManager; -import org.apache.gobblin.service.modules.orchestration.Orchestrator; import org.apache.gobblin.service.modules.orchestration.FlowTriggerHandler; +import org.apache.gobblin.service.modules.orchestration.Orchestrator; import org.apache.gobblin.service.modules.orchestration.UserQuotaManager; import org.apache.gobblin.service.modules.spec.JobExecutionPlan; +import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton; import org.apache.gobblin.service.monitoring.FlowStatusGenerator; import org.apache.gobblin.util.ConfigUtils; import org.apache.gobblin.util.PropertiesUtils; +import org.apache.helix.HelixManager; +import org.quartz.CronExpression; +import org.quartz.DisallowConcurrentExecution; +import org.quartz.InterruptableJob; +import org.quartz.JobDataMap; +import org.quartz.JobDetail; +import org.quartz.JobExecutionContext; +import org.quartz.JobExecutionException; +import org.quartz.SchedulerException; +import org.quartz.Trigger; +import org.quartz.UnableToInterruptJobException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import static org.apache.gobblin.service.ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX; +import static org.apache.gobblin.service.ServiceConfigKeys.*; /** @@ -458,7 +458,7 @@ public synchronized void scheduleJob(Properties jobProps, JobListener jobListene protected void logNewlyScheduledJob(JobDetail job, Trigger trigger) { Properties jobProps = (Properties) job.getJobDataMap().get(PROPERTIES_KEY); log.info(jobSchedulerTracePrefixBuilder(jobProps) + "nextTriggerTime: {} - Job newly scheduled", - getMillisecondsSinceEpochInUTC(trigger.getNextFireTime())); + asUTCEpochMillis(trigger.getNextFireTime())); } protected static String jobSchedulerTracePrefixBuilder(Properties jobProps) { @@ -471,12 +471,10 @@ protected static String jobSchedulerTracePrefixBuilder(Properties jobProps) { * Takes a given Date object and converts the timezone to UTC before returning the number of millseconds since epoch * @param date */ - public static long getMillisecondsSinceEpochInUTC(Date date) { - // Create a Calendar object and set it to the given Date - Calendar calendar = Calendar.getInstance(); - calendar.setTime(date); - calendar.setTimeZone(TimeZone.getTimeZone("UTC")); - return calendar.getTimeInMillis(); + public static long asUTCEpochMillis(Date date) { + return ZonedDateTime.of( + LocalDateTime.ofInstant(date.toInstant(), ZoneId.systemDefault()), + ZoneOffset.UTC).toInstant().toEpochMilli(); } @Override @@ -728,11 +726,11 @@ public void executeImpl(JobExecutionContext context) throws JobExecutionExceptio // Obtain trigger timestamp from trigger to pass to jobProps Trigger trigger = context.getTrigger(); // THIS current event has already fired if this method is called, so it now exists in - long triggerTimestampMillis = getMillisecondsSinceEpochInUTC(trigger.getPreviousFireTime()); + long triggerTimestampMillis = asUTCEpochMillis(trigger.getPreviousFireTime()); jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY, String.valueOf(triggerTimestampMillis)); _log.info(jobSchedulerTracePrefixBuilder(jobProps) + "triggerTime: {} nextTriggerTime: {} - Job triggered by " - + "scheduler", triggerTimestampMillis, getMillisecondsSinceEpochInUTC(trigger.getNextFireTime())); + + "scheduler", triggerTimestampMillis, asUTCEpochMillis(trigger.getNextFireTime())); try { jobScheduler.runJob(jobProps, jobListener); } catch (Throwable t) { From 5bdf3eb428551b3cd4ae41f27db584ad47c2a109 Mon Sep 17 00:00:00 2001 From: Urmi Mustafi Date: Mon, 21 Aug 2023 13:23:11 -0700 Subject: [PATCH 3/4] fix logging --- .../service/modules/orchestration/FlowTriggerHandler.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java index 669b8eccb79..f4758ca3aaf 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java @@ -113,7 +113,7 @@ public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction flo MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus = (MultiActiveLeaseArbiter.LeaseObtainedStatus) leaseAttemptStatus; this.leaseObtainedCount.inc(); if (persistFlowAction(leaseObtainedStatus)) { - log.info("Successfully persisted lease: [%s, eventTimestamp: %s] ", leaseObtainedStatus.getFlowAction(), + log.info("Successfully persisted lease: [{}, eventTimestamp: {}] ", leaseObtainedStatus.getFlowAction(), leaseObtainedStatus.getEventTimestamp()); return; } @@ -130,7 +130,7 @@ public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction flo return; } else if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.NoLongerLeasingStatus) { this.noLongerLeasingStatusCount.inc(); - log.debug("Received type of leaseAttemptStatus: [%s, eventTimestamp: %s] ", leaseAttemptStatus.getClass().getName(), + log.debug("Received type of leaseAttemptStatus: [{}, eventTimestamp: {}] ", leaseAttemptStatus.getClass().getName(), eventTimeMillis); return; } From 84c006cd08126eb1582470ac7369c1dd7aa1e2c6 Mon Sep 17 00:00:00 2001 From: Urmi Mustafi Date: Tue, 22 Aug 2023 14:23:38 -0700 Subject: [PATCH 4/4] remove .* import --- .../service/modules/scheduler/GobblinServiceJobScheduler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java index fc810090249..7df02f3d5fd 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java @@ -90,7 +90,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.gobblin.service.ServiceConfigKeys.*; +import static org.apache.gobblin.service.ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX; /**