Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -1828,6 +1828,18 @@ public TezConfiguration(boolean loadDefaults) {
TEZ_PREFIX + "dag.recovery.enabled";
public static final boolean DAG_RECOVERY_ENABLED_DEFAULT = true;


/**
* Boolean value. When set to true, this enables the AM to fail when DAG recovery is enabled
* and the restarted app master did not find anything to recover.
* Defaults to false.
* Expert level setting.
*/
@ConfigurationScope(Scope.AM)
@ConfigurationProperty(type="boolean")
public static final String TEZ_AM_FAILURE_ON_MISSING_RECOVERY_DATA =
TEZ_AM_PREFIX + "failure.on.missing.recovery.data";
public static final boolean TEZ_AM_FAILURE_ON_MISSING_RECOVERY_DATA_DEFAULT = false;

/**
* Int value. Size in bytes for the IO buffer size while processing the recovery file.
* Expert level setting.
Expand Down
26 changes: 22 additions & 4 deletions tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
Original file line number Diff line number Diff line change
Expand Up @@ -1916,10 +1916,7 @@ private DAGRecoveryData recoverDAG() throws IOException, TezException {
LOG.info("Recovering data from previous attempts"
+ ", currentAttemptId=" + this.appAttemptID.getAttemptId());
this.state = DAGAppMasterState.RECOVERING;
RecoveryParser recoveryParser = new RecoveryParser(
this, recoveryFS, recoveryDataDir, appAttemptID.getAttemptId());
DAGRecoveryData recoveredDAGData = recoveryParser.parseRecoveryData();
return recoveredDAGData;
return parseDAGFromRecoveryData();
}
} finally {
hadoopShim.clearHadoopCallerContext();
Expand All @@ -1928,6 +1925,27 @@ private DAGRecoveryData recoverDAG() throws IOException, TezException {
return null;
}

/**
 * Parses the recovery data for the current attempt and optionally treats missing data as an error.
 *
 * <p>Per TEZ-4474 (https://issues.apache.org/jira/browse/TEZ-4474), the parsed recovery data can be
 * null when the AM shut down prematurely during the first attempt due to some FATAL error: the
 * recovery stream is not closed and no data is flushed to the file system. In later attempts the
 * parser then returns null instead of failing the DAG. When
 * {@code TezConfiguration.TEZ_AM_FAILURE_ON_MISSING_RECOVERY_DATA} is enabled, that situation is
 * reported as an {@link IOException}; the caller is expected to catch it and fail the DAG.
 *
 * @return the parsed recovery data, or null when nothing was parsed and the failure flag is off
 * @throws IOException if parsing fails, or if nothing was parsed and the failure flag is on
 */
private DAGRecoveryData parseDAGFromRecoveryData() throws IOException {
  DAGRecoveryData parsedData =
      new RecoveryParser(this, recoveryFS, recoveryDataDir, appAttemptID.getAttemptId())
          .parseRecoveryData();

  // Something was recovered: hand it back without consulting the configuration.
  if (parsedData != null) {
    return parsedData;
  }

  // The flag is only read once we know there is nothing to recover.
  boolean failOnMissingData = amConf.getBoolean(
      TezConfiguration.TEZ_AM_FAILURE_ON_MISSING_RECOVERY_DATA,
      TezConfiguration.TEZ_AM_FAILURE_ON_MISSING_RECOVERY_DATA_DEFAULT);
  if (failOnMissingData) {
    throw new IOException(String.format("Found nothing to recover in currentAttemptId=%s from recovery data dir=%s",
        this.appAttemptID.getAttemptId(), this.recoveryDataDir));
  }
  return null;
}

@Override
public void serviceStart() throws Exception {
//start all the components
Expand Down
156 changes: 128 additions & 28 deletions tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,54 +14,32 @@

package org.apache.tez.dag.app;

import org.apache.hadoop.yarn.util.MonotonicClock;
import org.apache.tez.dag.app.dag.DAGState;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.records.TezVertexID;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import org.apache.tez.common.Preconditions;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.util.MonotonicClock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.tez.client.TezApiVersionInfo;
import org.apache.tez.common.Preconditions;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezUtils;
import org.apache.tez.common.security.JobTokenIdentifier;
Expand All @@ -78,13 +56,44 @@
import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourcesProto;
import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto;
import org.apache.tez.dag.api.records.DAGProtos.TezUserPayloadProto;
import org.apache.tez.dag.app.dag.DAGState;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.impl.DAGImpl;
import org.apache.tez.dag.app.rm.TaskSchedulerManager;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezVertexID;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;

import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class TestDAGAppMaster {

Expand Down Expand Up @@ -332,6 +341,97 @@ public void testParseAllPluginsCustomAndYarnSpecified() throws IOException {
assertEquals(TC_NAME + CLASS_SUFFIX, tcDescriptors.get(1).getClassName());
}

/**
 * Starts an AM for attempt 2 with DAG recovery and TEZ_AM_FAILURE_ON_MISSING_RECOVERY_DATA both
 * enabled, backed by a mocked recovery file system that reports every path as missing, and
 * expects the AM to shut down in ERROR state.
 */
@Test(timeout = 60000)
public void testShutdownTezAMWithMissingRecoveryAndFailureOnMissingData() throws Exception {
  TezConfiguration tezConf = new TezConfiguration();
  tezConf.setBoolean(TezConfiguration.TEZ_AM_CREDENTIALS_MERGE, true);
  tezConf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
  tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, TEST_DIR.toString());
  tezConf.setBoolean(TezConfiguration.TEZ_AM_FAILURE_ON_MISSING_RECOVERY_DATA, true);
  tezConf.setBoolean(TezConfiguration.DAG_RECOVERY_ENABLED, true);
  // Deliberately huge session timeout: with the failure flag on, the AM is expected to shut
  // down on its own well before the timeout flow could kick in.
  tezConf.setInt(TezConfiguration.TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS, 1000000000);

  ApplicationId applicationId = ApplicationId.newInstance(1, 1);
  ApplicationAttemptId secondAttemptId = ApplicationAttemptId.newInstance(applicationId, 2);

  // Every existence check against the recovery file system answers "not there".
  FileSystem recoveryFsMock = mock(FileSystem.class);
  when(recoveryFsMock.exists(any())).thenReturn(false);

  DAGAppMasterForTest appMaster = new DAGAppMasterForTest(secondAttemptId, true);
  appMaster.init(tezConf);

  // Swap the mock in via reflection after init and before start.
  Field recoveryFsField = DAGAppMasterForTest.class.getSuperclass().getDeclaredField("recoveryFS");
  recoveryFsField.setAccessible(true);
  recoveryFsField.set(appMaster, recoveryFsMock);

  appMaster.start();

  // Exactly two existence checks are expected: the fatal-error marker first, then the summary.
  ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
  verify(recoveryFsMock, times(2)).exists(pathCaptor.capture());
  List<Path> checkedPaths = pathCaptor.getAllValues();
  assertTrue(checkedPaths.get(0).toString().contains("/recovery/1/RecoveryFatalErrorOccurred"));
  assertTrue(checkedPaths.get(1).toString().contains("/recovery/1/summary"));

  verify(appMaster.mockScheduler).setShouldUnregisterFlag();
  verify(appMaster.mockShutdown).shutdown();

  // With TEZ_AM_FAILURE_ON_MISSING_RECOVERY_DATA set, missing recovery data on an
  // attempt > 1 must leave the AM in ERROR state.
  assertEquals(DAGAppMasterState.ERROR, appMaster.getState());
}

@Test
public void testShutdownTezAMWithMissingRecoveryAndNoFailureOnMissingData() throws Exception {

TezConfiguration conf = new TezConfiguration();
conf.setBoolean(TezConfiguration.TEZ_AM_CREDENTIALS_MERGE, true);
conf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
conf.set(TezConfiguration.TEZ_AM_STAGING_DIR, TEST_DIR.toString());
conf.setBoolean(TezConfiguration.TEZ_AM_FAILURE_ON_MISSING_RECOVERY_DATA, false);
conf.setBoolean(TezConfiguration.DAG_RECOVERY_ENABLED, true);
conf.setInt(TezConfiguration.TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS, 1);
ApplicationId appId = ApplicationId.newInstance(1, 1);
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 2);

FileSystem mockFs = mock(FileSystem.class);
when(mockFs.exists(any())).thenReturn(false);

DAGAppMasterForTest dam = new DAGAppMasterForTest(attemptId, true);

dam.init(conf);
Field field = DAGAppMasterForTest.class.getSuperclass().getDeclaredField("recoveryFS");
field.setAccessible(true);
field.set(dam, mockFs);

dam.start();
// Waiting for session timeout interval to kick in, which is set to 1 s
Thread.sleep(2000);

ArgumentCaptor<Path> captor = ArgumentCaptor.forClass(Path.class);
// This ensures that recovery data file system was called for getting summary files, and it will return false
verify(mockFs, times(2)).exists(captor.capture());

Assert.assertTrue(captor.getAllValues().get(1).toString().contains("/recovery/1/summary"));
Assert.assertTrue(captor.getAllValues().get(0).toString().contains("/recovery/1/RecoveryFatalErrorOccurred"));

verify(dam.mockScheduler).setShouldUnregisterFlag();
verify(dam.mockShutdown).shutdown();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we're testing the new feature here, I'm missing something here:

  1. it's not clear at first sight which part of the spy fs causes this expected behavior (returning with ERROR)
  2. is there a chance to extend this method to reflect what happens in case of TEZ_AM_FAILURE_ON_MISSING_RECOVERY_DATA=false?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@abstractdog , enhanced the test case

  1. For the spy, I am now checking the exact number of invocations, and I am capturing the argument of each invocation to confirm that the calls happened only during the recovery flow, specifically during the summary file fetch
  2. I created a separate test case to capture that scenario also when TEZ_AM_FAILURE_ON_MISSING_RECOVERY_DATA is false

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks @mudit-97 , just a few questions:

  1. I'm not 100% sure, but do we really need a spy here? As far as I know, we use spies when we really need an instance created by us instead of a mocked one; would you consider trying a mock here, which might be much simpler?
  2. if we end up using a spy here, please fix the method arguments' name...I know, this might look nitpicking, but even if we don't use them now, it's always a code smell to have args like "boolean b, int i, short i1, long l"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@abstractdog , converted it to mock and removed spy, please check

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

very cool, thanks, one more comment :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@abstractdog done that change also, please check


/*
* Since the TEZ_AM_FAILURE_ON_MISSING_RECOVERY_DATA config is set to false,
* the AM will end in SUCCEEDED state if recovery data was missing and the session timeout got triggered for attempts > 1
*/
assertEquals(DAGAppMasterState.SUCCEEDED, dam.getState());
}

private void verifyDescAndMap(List<NamedEntityDescriptor> descriptors, BiMap<String, Integer> map,
int numExpected, boolean verifyPayload,
String... expectedNames) throws
Expand Down