Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GOBBLIN-1885] Ensure All Logged Timestamps are in UTC #3747

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction flo
MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus = (MultiActiveLeaseArbiter.LeaseObtainedStatus) leaseAttemptStatus;
this.leaseObtainedCount.inc();
if (persistFlowAction(leaseObtainedStatus)) {
log.info("Successfully persisted lease: [%s, eventTimestamp: %s] ", leaseObtainedStatus.getFlowAction(),
log.info("Successfully persisted lease: [{}, eventTimestamp: {}] ", leaseObtainedStatus.getFlowAction(),
leaseObtainedStatus.getEventTimestamp());
return;
}
Expand All @@ -130,7 +130,7 @@ public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction flo
return;
} else if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.NoLongerLeasingStatus) {
this.noLongerLeasingStatusCount.inc();
log.debug("Received type of leaseAttemptStatus: [%s, eventTimestamp: %s] ", leaseAttemptStatus.getClass().getName(),
log.debug("Received type of leaseAttemptStatus: [{}, eventTimestamp: {}] ", leaseAttemptStatus.getClass().getName(),
eventTimeMillis);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,19 @@

package org.apache.gobblin.service.modules.scheduler;

import com.codahale.metrics.MetricFilter;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.io.IOException;
import java.net.URI;
import java.text.ParseException;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.Calendar;
import java.util.Collection;
import java.util.Date;
Expand All @@ -28,37 +38,12 @@
import java.util.Map;
import java.util.Properties;
import java.util.TimeZone;

import org.apache.commons.lang.StringUtils;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton;
import org.apache.helix.HelixManager;
import org.quartz.CronExpression;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.InterruptableJob;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.UnableToInterruptJobException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.codahale.metrics.MetricFilter;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import org.apache.commons.lang.StringUtils;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.instrumented.Instrumented;
Expand All @@ -70,6 +55,7 @@
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecCatalogListener;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.listeners.JobListener;
import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
Expand All @@ -82,13 +68,27 @@
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
import org.apache.gobblin.service.modules.orchestration.FlowTriggerHandler;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton;
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.PropertiesUtils;
import org.apache.helix.HelixManager;
import org.quartz.CronExpression;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.InterruptableJob;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.UnableToInterruptJobException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.gobblin.service.ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX;

Expand Down Expand Up @@ -458,7 +458,7 @@ public synchronized void scheduleJob(Properties jobProps, JobListener jobListene
protected void logNewlyScheduledJob(JobDetail job, Trigger trigger) {
Properties jobProps = (Properties) job.getJobDataMap().get(PROPERTIES_KEY);
log.info(jobSchedulerTracePrefixBuilder(jobProps) + "nextTriggerTime: {} - Job newly scheduled",
trigger.getNextFireTime().getTime());
asUTCEpochMillis(trigger.getNextFireTime()));
}

protected static String jobSchedulerTracePrefixBuilder(Properties jobProps) {
Expand All @@ -467,6 +467,16 @@ protected static String jobSchedulerTracePrefixBuilder(Properties jobProps) {
jobProps.getProperty(ConfigurationKeys.FLOW_GROUP_KEY, "<<no flow group>>"));
}

/**
* Takes a given Date object and converts the timezone to UTC before returning the number of millseconds since epoch
* @param date
*/
public static long asUTCEpochMillis(Date date) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if public for more than just testability, it may belong in a utility class rather than here

return ZonedDateTime.of(
LocalDateTime.ofInstant(date.toInstant(), ZoneId.systemDefault()),
ZoneOffset.UTC).toInstant().toEpochMilli();
}

@Override
public void runJob(Properties jobProps, JobListener jobListener) throws JobException {
try {
Expand Down Expand Up @@ -716,11 +726,11 @@ public void executeImpl(JobExecutionContext context) throws JobExecutionExceptio
// Obtain trigger timestamp from trigger to pass to jobProps
Trigger trigger = context.getTrigger();
// THIS current event has already fired if this method is called, so it now exists in <previousFireTime>
long triggerTimestampMillis = trigger.getPreviousFireTime().getTime();
long triggerTimestampMillis = asUTCEpochMillis(trigger.getPreviousFireTime());
jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY,
String.valueOf(triggerTimestampMillis));
_log.info(jobSchedulerTracePrefixBuilder(jobProps) + "triggerTime: {} nextTriggerTime: {} - Job triggered by "
+ "scheduler", triggerTimestampMillis, trigger.getNextFireTime().getTime());
+ "scheduler", triggerTimestampMillis, asUTCEpochMillis(trigger.getNextFireTime()));
try {
jobScheduler.runJob(jobProps, jobListener);
} catch (Throwable t) {
Expand Down
Loading