Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 104 additions & 0 deletions tez-api/src/main/java/org/apache/tez/common/CachedEntity.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.common;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.MonotonicClock;

/**
 * A thread-safe container for a single cacheable value with an expiration time.
 * It supports a custom {@link Clock} to control the elapsed time calculation.
 *
 * <p>The value and the time at which it was stored are kept together in one immutable
 * holder behind a single {@link AtomicReference}. Readers therefore never observe a value
 * paired with the timestamp of a different update, and an expired entry can only be cleared
 * if it is still the entry that was found to be expired.
 *
 * @param <T> the data object type.
 */
public class CachedEntity<T> {

  /**
   * Immutable pairing of a cached value with the clock reading taken when it was stored.
   * @param <V> the data object type.
   */
  private static final class Entry<V> {
    private final V data;
    private final long timeStamp;

    Entry(V data, long timeStamp) {
      this.data = data;
      this.timeStamp = timeStamp;
    }
  }

  /** Current entry; {@code null} means nothing is cached (never set, cleared or invalidated). */
  private final AtomicReference<Entry<T>> entryRef;
  private final Clock cacheClock;
  private final long expiryDurationMS;

  /**
   * Creates an empty (expired) cache container.
   * @param expiryTimeUnit the unit of {@code expiryLength}.
   * @param expiryLength how long a value stays valid after it has been set.
   * @param clock the clock used to timestamp updates and to measure elapsed time.
   */
  public CachedEntity(TimeUnit expiryTimeUnit, long expiryLength, Clock clock) {
    entryRef = new AtomicReference<>(null);
    cacheClock = clock;
    expiryDurationMS = TimeUnit.MILLISECONDS.convert(expiryLength, expiryTimeUnit);
  }

  /**
   * Creates an empty (expired) cache container backed by a {@link MonotonicClock}.
   * @param expiryTimeUnit the unit of {@code expiryLength}.
   * @param expiryLength how long a value stays valid after it has been set.
   */
  public CachedEntity(TimeUnit expiryTimeUnit, long expiryLength) {
    this(expiryTimeUnit, expiryLength, new MonotonicClock());
  }

  /**
   *
   * @return true if no value is currently cached, or the elapsed time since the last update is
   * greater than {@link #expiryDurationMS}
   */
  public boolean isExpired() {
    return hasExpired(entryRef.get());
  }

  /**
   * Returns the cached data, dropping the reference to it when it has expired.
   * @return cached data if the entry is still valid. Null, if nothing is cached or the entry
   * has expired.
   */
  public T getValue() {
    Entry<T> current = entryRef.get();
    while (current != null) {
      if (!hasExpired(current)) {
        return current.data;
      }
      // Clear only the exact entry that was found expired; a failed CAS means another thread
      // changed the cache in the meantime, so the new state has to be re-evaluated.
      if (entryRef.compareAndSet(current, null)) {
        return null;
      }
      current = entryRef.get();
    }
    return null;
  }

  /**
   * Stores the new data together with the current clock time in one atomic step.
   * @param newEntry the data to cache.
   */
  public void setValue(T newEntry) {
    entryRef.set(new Entry<>(newEntry, cacheClock.getTime()));
  }

  /**
   * Enforces the expiration of the cached entry.
   */
  public void enforceExpiration() {
    entryRef.set(null);
  }

  /**
   * Checks a snapshot of the cache against the expiry window.
   * @param entry the snapshot to check; may be null.
   * @return true if the snapshot is null or older than {@link #expiryDurationMS}.
   */
  private boolean hasExpired(Entry<T> entry) {
    return entry == null
        || (cacheClock.getTime() - entry.timeStamp) > expiryDurationMS;
  }
}
12 changes: 12 additions & 0 deletions tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -1975,6 +1975,18 @@ static Set<String> getPropertySet() {
TEZ_PREFIX + "test.minicluster.app.wait.on.shutdown.secs";
public static final long TEZ_TEST_MINI_CLUSTER_APP_WAIT_ON_SHUTDOWN_SECS_DEFAULT = 30;

/**
* Long value
* Status Cache timeout window in seconds for the DAGClient.
*/
@Private
@ConfigurationScope(Scope.CLIENT)
@ConfigurationProperty(type="long")
public static final String TEZ_CLIENT_DAG_STATUS_CACHE_TIMEOUT_SECS = TEZ_PREFIX
+ "client.dag.status.cache.timeout-secs";
// Default timeout is 60 seconds.
public static final long TEZ_CLIENT_DAG_STATUS_CACHE_TIMEOUT_SECS_DEFAULT = 60;

/**
* Long value
* Time to wait (in milliseconds) for the yarn app's diagnostics to become available
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.tez.common.CachedEntity;
import org.apache.tez.common.Preconditions;

import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
Expand Down Expand Up @@ -58,13 +60,15 @@ public class DAGClientImpl extends DAGClient {
private final String dagId;
private final TezConfiguration conf;
private final FrameworkClient frameworkClient;

/**
* Container to cache the last {@link DAGStatus}.
*/
private final CachedEntity<DAGStatus> cachedDAGStatusRef;
@VisibleForTesting
protected DAGClientInternal realClient;
private boolean dagCompleted = false;
private volatile boolean dagCompleted = false;
@VisibleForTesting
protected boolean isATSEnabled = false;
private DAGStatus cachedDagStatus = null;
Map<String, VertexStatus> cachedVertexStatus = new HashMap<String, VertexStatus>();

private static final long SLEEP_FOR_COMPLETION = 500;
Expand Down Expand Up @@ -110,6 +114,28 @@ public DAGClientImpl(ApplicationId appId, String dagId, TezConfiguration conf,
this.diagnoticsWaitTimeout = conf.getLong(
TezConfiguration.TEZ_CLIENT_DIAGNOSTICS_WAIT_TIMEOUT_MS,
TezConfiguration.TEZ_CLIENT_DIAGNOSTICS_WAIT_TIMEOUT_MS_DEFAULT);
cachedDAGStatusRef = initCacheDAGRefFromConf(conf);
}

/**
 * Builds the {@link CachedEntity} used to hold the most recent {@link DAGStatus}.
 * The expiry window, in seconds, is read from
 * {@link TezConfiguration#TEZ_CLIENT_DAG_STATUS_CACHE_TIMEOUT_SECS}; a non-positive
 * configured value is replaced by the default.
 * @param tezConf TEZ configuration parameters.
 * @return a caching entry to hold the {@link DAGStatus}.
 */
protected CachedEntity<DAGStatus> initCacheDAGRefFromConf(TezConfiguration tezConf) {
  final long defaultTimeoutSecs =
      TezConfiguration.TEZ_CLIENT_DAG_STATUS_CACHE_TIMEOUT_SECS_DEFAULT;
  long timeoutSecs = tezConf.getLong(
      TezConfiguration.TEZ_CLIENT_DAG_STATUS_CACHE_TIMEOUT_SECS, defaultTimeoutSecs);
  if (timeoutSecs < 1) {
    // zero or negative windows are rejected in favour of the default
    LOG.error("DAG Status cache timeout interval should be positive. Enforcing default value.");
    timeoutSecs = defaultTimeoutSecs;
  }
  return new CachedEntity<>(TimeUnit.SECONDS, timeoutSecs);
}

/**
 * Exposes the container that caches the last {@link DAGStatus}.
 * @return the {@link CachedEntity} held by this client.
 */
protected CachedEntity<DAGStatus> getCachedDAGStatusRef() {
  return this.cachedDAGStatusRef;
}

@Override
Expand All @@ -133,13 +159,11 @@ public DAGStatus getDAGStatus(@Nullable Set<StatusGetOpts> statusOptions,
}

long startTime = System.currentTimeMillis();
boolean refreshStatus;
DAGStatus dagStatus;
if(cachedDagStatus != null) {
dagStatus = cachedDagStatus;
refreshStatus = true;
} else {
// For the first lookup only. After this cachedDagStatus should be populated.

DAGStatus dagStatus = cachedDAGStatusRef.getValue();
boolean refreshStatus = true;
if (dagStatus == null) {
// the first lookup only or when the cachedDAG has expired
dagStatus = getDAGStatus(statusOptions);
refreshStatus = false;
}
Expand Down Expand Up @@ -221,13 +245,14 @@ protected DAGStatus getDAGStatusInternal(@Nullable Set<StatusGetOpts> statusOpti
final DAGStatus dagStatus = getDAGStatusViaAM(statusOptions, timeout);

if (!dagCompleted) {
if (dagStatus != null) {
cachedDagStatus = dagStatus;
if (dagStatus != null) { // update the cached DAGStatus
cachedDAGStatusRef.setValue(dagStatus);
return dagStatus;
}
if (cachedDagStatus != null) {
DAGStatus cachedDAG = cachedDAGStatusRef.getValue();
if (cachedDAG != null) {
// could not get from AM (not reachable/ was killed). return cached status.
return cachedDagStatus;
return cachedDAG;
Copy link
Contributor

@abstractdog abstractdog Nov 14, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Am I right to assume this is the code path where the original issue happened? Could you please clarify how we can get stuck here indefinitely?
I mean, we can only hit this part if getDAGStatusViaAM returns null but dagCompleted is not true, so when we hit this again and again in getDAGStatusViaAM :

    } catch (TezException e) {
      // can be either due to a n/w issue of due to AM completed.
    } catch (IOException e) {
      // can be either due to a n/w issue of due to AM completed.
    }

also getApplicationReportInternal keeps returning null in checkAndSetDagCompletionStatus

was it the case for you?
if so, does it make sense to put at least debug level log messages to the silent catch branches?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the feedback.

Yes, considering the implementation of TezJob.run()-Line222 in Pig:

Pig polls on the DAGStatus inside an infinite loop:

        while (true) {
            try {
                dagStatus = dagClient.getDAGStatus(null);
            } catch (Exception e) {
                log.info("Cannot retrieve DAG status", e);
                break;
            }
           if (dagStatus.isCompleted()) {
               // do something
               // break;
           }
           sleep(1000);
       }  

Let's assume the following scenario on Tez Side:

  • Pig first iteration calls getDAGStatusViaAM() which successfully pulls the DAGStatus and updates the cachedDAGStatus to running.
  • Pig sleeps 1000
  • second call from Pig calls getDAGStatusViaAM() which encounters TezException or IOException. The call would return the last cachedDAGStatus (which is running), instead of null.
  • Since the status is running, the Pig-thread sleeps
  • This will keep going as long as the getDAGStatusViaAM() fails, and the last valid DAGStatus is still cached.

The problem in this corner case is that the Pig client will keep looping indefinitely as long as it does not receive a null or dagClient.getDAGStatus(null) does not throw an exception.
From a client perspective, it is better to fail early in order to recover faster.

}
}

Expand All @@ -253,8 +278,11 @@ protected DAGStatus getDAGStatusInternal(@Nullable Set<StatusGetOpts> statusOpti

// dag completed and Timeline service is either not enabled or does not have completion status
// return cached status if completion info is present.
if (dagCompleted && cachedDagStatus != null && cachedDagStatus.isCompleted()) {
return cachedDagStatus;
if (dagCompleted) {
DAGStatus cachedDag = cachedDAGStatusRef.getValue();
if (cachedDag != null && cachedDag.isCompleted()) {
return cachedDag;
}
}

// everything else fails rely on RM.
Expand Down Expand Up @@ -377,9 +405,11 @@ private DAGStatus getDAGStatusViaAM(@Nullable Set<StatusGetOpts> statusOptions,
LOG.info("DAG is no longer running - application not found by YARN", e);
dagCompleted = true;
} catch (TezException e) {
// can be either due to a n/w issue of due to AM completed.
// can be either due to a n/w issue or due to AM completed.
LOG.info("Cannot retrieve DAG Status due to TezException: {}", e.getMessage());
} catch (IOException e) {
// can be either due to a n/w issue of due to AM completed.
// can be either due to a n/w issue or due to AM completed.
LOG.info("Cannot retrieve DAG Status due to IOException: {}", e.getMessage());
}

if (dagStatus == null && !dagCompleted) {
Expand Down
Loading