Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ public class TimelineHealth {
* Timline health status.
*
* RUNNING - Service is up and running
* READER_CONNECTION_FAULURE - isConnectionAlive() of reader implementation
* CONNECTION_FAULURE - isConnectionAlive() of reader / writer implementation
* reported an error
*/
public enum TimelineHealthStatus {
RUNNING,
READER_CONNECTION_FAILURE
CONNECTION_FAILURE
}

private TimelineHealthStatus healthStatus;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public TimelineHealth getHealthStatus() {
"");
} else {
return new TimelineHealth(
TimelineHealth.TimelineHealthStatus.READER_CONNECTION_FAILURE,
TimelineHealth.TimelineHealthStatus.CONNECTION_FAILURE,
"Timeline store reader not initialized.");
}
}
Expand All @@ -131,4 +131,4 @@ private Set<TimelineEntity> applyFilters(TimelineEntityFilters filters,
}
return timelineEntities;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
import org.apache.hadoop.yarn.api.records.timeline.TimelineHealth;
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
import org.apache.hadoop.yarn.server.timelineservice.documentstore.lib.DocumentStoreVendor;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineAggregationTrack;
Expand Down Expand Up @@ -151,6 +152,11 @@ public TimelineWriteResponse write(TimelineCollectorContext context,
return null;
}

@Override
public TimelineHealth getHealthStatus() {
  // The document store writer keeps no persistent connection state here,
  // so health is reported unconditionally as RUNNING with an empty
  // diagnostic message.
  final TimelineHealth health =
      new TimelineHealth(TimelineHealth.TimelineHealthStatus.RUNNING, "");
  return health;
}

private void appendSubAppUserIfExists(TimelineCollectorContext context,
String subApplicationUser) {
String userId = context.getUserId();
Expand Down Expand Up @@ -282,4 +288,4 @@ public TimelineWriteResponse aggregate(TimelineEntity data,
@Override
public void flush() {
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public TimelineHealth getHealthStatus() {
"");
} catch (IOException e){
return new TimelineHealth(
TimelineHealth.TimelineHealthStatus.READER_CONNECTION_FAILURE,
TimelineHealth.TimelineHealthStatus.CONNECTION_FAILURE,
"HBase connection is down");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.timeline.TimelineHealth;
import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.SubApplicationEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineDomain;
Expand Down Expand Up @@ -604,6 +605,19 @@ public TimelineWriteResponse aggregate(TimelineEntity data,
return null;
}

@Override
public TimelineHealth getHealthStatus() {
  // Probe the backing HBase storage; any IOException from the monitor is
  // treated as a connection failure.
  try {
    storageMonitor.checkStorageIsUp();
    return new TimelineHealth(TimelineHealth.TimelineHealthStatus.RUNNING,
        "");
  } catch (IOException e) {
    // Include the underlying cause in the diagnostic, consistent with the
    // FileSystem writer implementation, instead of swallowing it.
    return new TimelineHealth(
        TimelineHealth.TimelineHealthStatus.CONNECTION_FAILURE,
        "HBase connection is down: " + e.getMessage());
  }
}

/*
* (non-Javadoc)
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.api.records.timeline.TimelineHealth;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
Expand Down Expand Up @@ -71,6 +72,9 @@ public abstract class TimelineCollector extends CompositeService {

private volatile boolean isStopped = false;

private int maxWriteRetries;
private long writeRetryInterval;

public TimelineCollector(String name) {
super(name);
}
Expand All @@ -86,6 +90,13 @@ protected void serviceInit(Configuration conf) throws Exception {
new ArrayBlockingQueue<>(capacity));
pool.setRejectedExecutionHandler(
new ThreadPoolExecutor.DiscardOldestPolicy());

maxWriteRetries =
conf.getInt(YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES);
writeRetryInterval = conf.getLong(
YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS);
}

@Override
Expand Down Expand Up @@ -153,18 +164,54 @@ public TimelineWriteResponse putEntities(TimelineEntities entities,
UserGroupInformation callerUgi) throws IOException {
LOG.debug("putEntities(entities={}, callerUgi={})", entities, callerUgi);

TimelineWriteResponse response;
// synchronize on the writer object so that no other threads can
// flush the writer buffer concurrently and swallow any exception
// caused by the timeline enitites that are being put here.
synchronized (writer) {
response = writeTimelineEntities(entities, callerUgi);
flushBufferedTimelineEntities();
TimelineWriteResponse response = null;
try {
boolean isStorageUp = checkRetryWithSleep();
if (isStorageUp) {
// synchronize on the writer object so that no other threads can
// flush the writer buffer concurrently and swallow any exception
// caused by the timeline enitites that are being put here.
synchronized (writer) {
response = writeTimelineEntities(entities, callerUgi);
flushBufferedTimelineEntities();
}
} else {
String msg = String.format("Failed to putEntities(" +
"entities=%s, callerUgi=%s) as Timeline Storage is Down",
entities, callerUgi);
throw new IOException(msg);
}
} catch (InterruptedException ex) {
String msg = String.format("Interrupted while retrying to putEntities(" +
"entities=%s, callerUgi=%s)", entities, callerUgi);
throw new IOException(msg);
}

return response;
}


/**
 * Poll the writer's health until it reports RUNNING or the retry budget is
 * exhausted, sleeping {@code writeRetryInterval} ms between probes.
 *
 * The storage is always probed at least once, even when
 * {@code maxWriteRetries} is 0, and no sleep is performed after the final
 * probe (the original code skipped the probe entirely for a zero retry
 * budget and slept once more after the last failed check).
 *
 * @return true if the writer reported RUNNING within the retry budget,
 *         false otherwise.
 * @throws InterruptedException if interrupted while sleeping; the thread's
 *         interrupt status is restored before rethrowing.
 */
private boolean checkRetryWithSleep() throws InterruptedException {
  for (int attempt = 0; attempt <= maxWriteRetries; attempt++) {
    TimelineHealth timelineHealth = writer.getHealthStatus();
    if (TimelineHealth.TimelineHealthStatus.RUNNING.equals(
        timelineHealth.getHealthStatus())) {
      return true;
    }
    // Only sleep if another probe will follow.
    if (attempt < maxWriteRetries) {
      try {
        Thread.sleep(writeRetryInterval);
      } catch (InterruptedException ex) {
        // Restore interrupt status for callers before propagating.
        Thread.currentThread().interrupt();
        throw ex;
      }
    }
  }
  return false;
}


/**
* Add or update an domain. If the domain already exists, only the owner
* and the admin can update it.
Expand All @@ -179,11 +226,25 @@ public TimelineWriteResponse putDomain(TimelineDomain domain,
UserGroupInformation callerUgi) throws IOException {
LOG.debug("putDomain(domain={}, callerUgi={})", domain, callerUgi);

TimelineWriteResponse response;
synchronized (writer) {
final TimelineCollectorContext context = getTimelineEntityContext();
response = writer.write(context, domain);
flushBufferedTimelineEntities();
TimelineWriteResponse response = null;
try {
boolean isStorageUp = checkRetryWithSleep();
if (isStorageUp) {
synchronized (writer) {
final TimelineCollectorContext context = getTimelineEntityContext();
response = writer.write(context, domain);
flushBufferedTimelineEntities();
}
} else {
String msg = String.format("Failed to putDomain(" +
"domain=%s, callerUgi=%s) as Timeline Storage is Down",
domain, callerUgi);
throw new IOException(msg);
}
} catch (InterruptedException ex) {
String msg = String.format("Interrupted while retrying to putDomain(" +
"domain=%s, callerUgi=%s)", domain, callerUgi);
throw new IOException(msg);
}

return response;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ public TimelineHealth getHealthStatus() {
fs.exists(rootPath);
} catch (IOException e) {
return new TimelineHealth(
TimelineHealth.TimelineHealthStatus.READER_CONNECTION_FAILURE,
TimelineHealth.TimelineHealthStatus.CONNECTION_FAILURE,
e.getMessage()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.timeline.TimelineHealth;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
Expand Down Expand Up @@ -194,6 +195,20 @@ public void flush() throws IOException {
// no op
}

@Override
public TimelineHealth getHealthStatus() {
  // Reachability probe against the backing file system; the boolean result
  // of exists() is deliberately ignored — only an IOException signals a
  // connection problem.
  TimelineHealth health;
  try {
    fs.exists(rootPath);
    health = new TimelineHealth(
        TimelineHealth.TimelineHealthStatus.RUNNING, "");
  } catch (IOException e) {
    health = new TimelineHealth(
        TimelineHealth.TimelineHealthStatus.CONNECTION_FAILURE,
        e.getMessage()
    );
  }
  return health;
}

private void mkdirs(Path... paths) throws IOException, InterruptedException {
for (Path path: paths) {
if (!existsWithRetries(path)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.timeline.TimelineHealth;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
Expand Down Expand Up @@ -77,4 +78,10 @@ public TimelineWriteResponse aggregate(TimelineEntity data,
public void flush() throws IOException {
LOG.debug("NoOpTimelineWriter is configured. Ignoring flush call");
}

@Override
public TimelineHealth getHealthStatus() {
  // No-op writer persists nothing, so it is always healthy; the message
  // tells operators why writes are being discarded.
  final TimelineHealth alwaysRunning = new TimelineHealth(
      TimelineHealth.TimelineHealthStatus.RUNNING,
      "NoOpTimelineWriter is configured. ");
  return alwaysRunning;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.timeline.TimelineHealth;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
Expand Down Expand Up @@ -95,4 +96,13 @@ TimelineWriteResponse aggregate(TimelineEntity data,
* entities to the backend storage.
*/
void flush() throws IOException;

/**
 * Check if the writer connection to the backend storage is working
 * properly.
 *
 * @return a {@link TimelineHealth} whose status is RUNNING when the writer
 *         connection works as expected, or CONNECTION_FAILURE (with a
 *         diagnostic message) otherwise.
 */
TimelineHealth getHealthStatus();


}
Loading