
CDAP-21027 : upgrading hadoop version to 3.3.6 #15648

Merged: 1 commit, Jul 25, 2024
6 changes: 0 additions & 6 deletions cdap-app-fabric-tests/pom.xml
@@ -95,12 +95,6 @@ the License.
<artifactId>jcl-over-slf4j</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.cdap.cdap</groupId>
<artifactId>cdap-hbase-compat-1.0</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
cdap-app-fabric-tests/src/test/java/io/cdap/cdap/internal/app/runtime/batch/DynamicPartitionerWithAvroTest.java
@@ -78,12 +78,12 @@

@Test
public void testMultiWriter() throws Exception {
runDynamicPartitionerMR(ORDERED_RECORDS, true, true);
runDynamicPartitionerMr(ORDERED_RECORDS, true, true);
}

@Test
public void testSingleWriter() throws Exception {
runDynamicPartitionerMR(ORDERED_RECORDS, false, true);
runDynamicPartitionerMr(ORDERED_RECORDS, false, true);
}

@Test
@@ -95,24 +95,24 @@
createRecord("john", 84125));
// the input data is not ordered by output partition and it is limited to a single writer,
// so we expect this job to fail
runDynamicPartitionerMR(records, false, false);
runDynamicPartitionerMr(records, false, false);
}

@Test
public void testPartitionAppend() throws Exception {
runDynamicPartitionerMR(ORDERED_RECORDS, true, true,
runDynamicPartitionerMr(ORDERED_RECORDS, true, true,
DynamicPartitioner.PartitionWriteOption.CREATE_OR_APPEND, true);
}

@Test
public void testPartitionAppendWhenNotConfigured() throws Exception {
// partition will exist beforehand, but the append option is not configured; hence the job is expected to fail
runDynamicPartitionerMR(ORDERED_RECORDS, true, true, DynamicPartitioner.PartitionWriteOption.CREATE, false);
runDynamicPartitionerMr(ORDERED_RECORDS, true, true, DynamicPartitioner.PartitionWriteOption.CREATE, false);
}

@Test
public void testPartitionOverwrite() throws Exception {
runDynamicPartitionerMR(ORDERED_RECORDS, true, true,
runDynamicPartitionerMr(ORDERED_RECORDS, true, true,
DynamicPartitioner.PartitionWriteOption.CREATE_OR_OVERWRITE, true);
}

@@ -124,19 +124,19 @@
partitionOutput.addPartition();
}

private void runDynamicPartitionerMR(final List<? extends GenericRecord> records,
private void runDynamicPartitionerMr(final List<? extends GenericRecord> records,
boolean allowConcurrentWriters,
boolean expectedStatus) throws Exception {
runDynamicPartitionerMR(records, allowConcurrentWriters, false, null, expectedStatus);
runDynamicPartitionerMr(records, allowConcurrentWriters, false, null, expectedStatus);
}

private void runDynamicPartitionerMR(final List<? extends GenericRecord> records,
private void runDynamicPartitionerMr(final List<? extends GenericRecord> records,
boolean allowConcurrentWriters,
final boolean precreatePartitions,
@Nullable final DynamicPartitioner.PartitionWriteOption partitionWriteOption,
boolean expectedStatus) throws Exception {
ApplicationWithPrograms app = deployApp(AppWithMapReduceUsingAvroDynamicPartitioner.class);

[Checkstyle warning on line 138 in cdap-app-fabric-tests/src/test/java/io/cdap/cdap/internal/app/runtime/batch/DynamicPartitionerWithAvroTest.java: VariableDeclarationUsageDistanceCheck: Distance between variable 'app' declaration and its first usage is 7, but allowed 3. Consider making that variable final if you still need to store its value in advance (before method calls that might have side effects on the original value).]

final long now = System.currentTimeMillis();
final Multimap<PartitionKey, GenericRecord> keyToRecordsMap = groupByPartitionKey(records, now);

@@ -172,6 +172,10 @@
// run the partition writer m/r with this output partition time
Map<String, String> arguments = new HashMap<>();
arguments.put(OUTPUT_PARTITION_KEY, Long.toString(now));
// The test case and the output committer were written for Hadoop 2, which used the older file output committer
// algorithm (version 1). After upgrading to Hadoop 3, the default algorithm is version 2, so we set it manually
// back to the previous algorithm version to keep the test valid.
arguments.put("system.mapreduce.mapreduce.fileoutputcommitter.algorithm.version", "1");
Review comment (Member): Add a comment here explaining why this property is set. We also need to document this in case a customer needs to overwrite this.

Reply (Contributor Author): Done. Regarding documentation, I can add it to the release notes, but this change only applies to MapReduce programs, which are already deprecated. I don't think we use MapReduce outside of tests anymore. For Spark, we already support custom arguments.

arguments.put(allowConcurrencyKey, Boolean.toString(allowConcurrentWriters));
if (partitionWriteOption != null) {
arguments.put("partitionWriteOption", partitionWriteOption.name());
6 changes: 5 additions & 1 deletion cdap-app-fabric/pom.xml
@@ -243,7 +243,7 @@
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-api-mockito</artifactId>
<artifactId>powermock-api-mockito2</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
@@ -265,6 +265,10 @@
<artifactId>cdap-event-reader-spi</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-core</artifactId>
</dependency>
</dependencies>

<build>
MapReduceProgramRunner.java
@@ -64,6 +64,7 @@
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobContext;
@@ -81,6 +82,7 @@
public class MapReduceProgramRunner extends AbstractProgramRunnerWithPlugin {

private static final Logger LOG = LoggerFactory.getLogger(MapReduceProgramRunner.class);
public static final String MAPREDUCE_CUSTOM_CONFIG_PREFIX = "system.mapreduce.";

private final Injector injector;
private final CConfiguration cConf;
@@ -100,6 +102,9 @@ public class MapReduceProgramRunner extends AbstractProgramRunnerWithPlugin {
private final RemoteClientFactory remoteClientFactory;
private final AppStateStoreProvider appStateStoreProvider;

/**
* Constructor for a program runner that launches a Map reduce program.
*/
@Inject
public MapReduceProgramRunner(Injector injector, CConfiguration cConf, Configuration hConf,
NamespacePathLocator locationFactory,
@@ -205,6 +210,9 @@ public ProgramController run(final Program program, ProgramOptions options) {
hConf.set(JobContext.QUEUE_NAME, schedulerQueue);
}

hConf = setCustomMapReduceConfig(hConf, options.getArguments());
hConf = setCustomMapReduceConfig(hConf, options.getUserArguments());

ClusterMode clusterMode = ProgramRunners.getClusterMode(options);
Service mapReduceRuntimeService = new MapReduceRuntimeService(injector, cConf, hConf,
mapReduce, spec,
@@ -241,4 +249,25 @@ private File getPluginArchive(ProgramOptions options) {
}
return new File(options.getArguments().getOption(ProgramOptionConstants.PLUGIN_ARCHIVE));
}

/**
* Applies any custom MapReduce configuration passed in by the user. This overrides the default
* Hadoop / MapReduce configuration.
* If a runtime argument is prefixed with {@link #MAPREDUCE_CUSTOM_CONFIG_PREFIX}, it is added to
* hConf, which is later used as the Hadoop / MapReduce configuration.
*/
private Configuration setCustomMapReduceConfig(Configuration hConf, Arguments options) {
Map<String, String> systemArgs = options.asMap();
for (String name : systemArgs.keySet()) {
if (!name.startsWith(MAPREDUCE_CUSTOM_CONFIG_PREFIX)) {
continue;
}
String value = systemArgs.get(name);
String key = name.substring(MAPREDUCE_CUSTOM_CONFIG_PREFIX.length());
if (key != null && value != null) {
hConf.set(key, value);
}
}
return hConf;
}
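The method above can be sketched in isolation. This is an illustrative stand-in, not the CDAP implementation: a plain `Map` replaces Hadoop's `Configuration`, and the `entrySet()` form avoids the extra per-key `get()` lookup that the `keySet()` loop in the diff performs:

```java
import java.util.HashMap;
import java.util.Map;

public class CustomConfApplier {

  static final String MAPREDUCE_CUSTOM_CONFIG_PREFIX = "system.mapreduce.";

  /**
   * Copies every "system.mapreduce."-prefixed argument into the target conf
   * with the prefix stripped; all other arguments are ignored.
   */
  static Map<String, String> applyCustomConfig(Map<String, String> conf, Map<String, String> args) {
    for (Map.Entry<String, String> entry : args.entrySet()) {
      String name = entry.getKey();
      if (name.startsWith(MAPREDUCE_CUSTOM_CONFIG_PREFIX) && entry.getValue() != null) {
        conf.put(name.substring(MAPREDUCE_CUSTOM_CONFIG_PREFIX.length()), entry.getValue());
      }
    }
    return conf;
  }

  public static void main(String[] args) {
    Map<String, String> conf = new HashMap<>();
    Map<String, String> runtimeArgs = new HashMap<>();
    runtimeArgs.put("system.mapreduce.mapreduce.fileoutputcommitter.algorithm.version", "1");
    runtimeArgs.put("someOtherArgument", "ignored");
    System.out.println(applyCustomConfig(conf, runtimeArgs));
    // prints {mapreduce.fileoutputcommitter.algorithm.version=1}
  }
}
```

Note the ordering in `run(...)`: system arguments are applied first, then user arguments, so a user-supplied value for the same key wins because the later `hConf.set` overwrites the earlier one.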
}
cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/batch/dataset/partitioned/DynamicPartitioningOutputCommitter.java
@@ -41,6 +41,7 @@
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.mapred.FileAlreadyExistsException;
import org.apache.hadoop.mapred.InvalidJobConfException;

[Checkstyle warning on line 44: UnusedImportsCheck: Unused import - org.apache.hadoop.mapred.InvalidJobConfException.]
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -71,9 +72,18 @@
// Note that the outputPath passed in is treated as a temporary directory.
// The commitJob method moves the files from within this directory to a parent (final) directory.
// The cleanupJob method removes this directory.
[Checkstyle warning on line 75: MissingJavadocMethodCheck: Missing a Javadoc comment.]
public DynamicPartitioningOutputCommitter(Path outputPath, TaskAttemptContext context)
    throws IOException {
super(outputPath, context);

// This output committer only works with `mapreduce.fileoutputcommitter.algorithm.version` = 1.
// Since Hadoop 3, the default is 2. Fail early if it is set to 2.
[Checkstyle warning on line 81: WhitespaceAroundCheck: '{' is not preceded with whitespace.]
if (isCommitJobRepeatable(context)){
[Checkstyle warning on line 82: OperatorWrapCheck: '+' should be on a new line.]
throw new IllegalArgumentException("DynamicPartitioningOutputCommitter requires the Hadoop conf " +
[Checkstyle warning on line 83: OperatorWrapCheck: '+' should be on a new line.]
"`mapreduce.fileoutputcommitter.algorithm.version` to be set to 1." +
"But Found 2.");
}

this.taskContext = context;
this.jobSpecificOutputPath = outputPath;
}
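The constructor's fail-fast check can be modeled on its own. A minimal sketch under stated assumptions: a plain `Map` stands in for Hadoop's `Configuration`, and `checkAlgorithmVersion` is an illustrative helper, not the actual `isCommitJobRepeatable(context)` call from the diff:

```java
import java.util.Map;

public class CommitterGuard {

  static final String ALGO_KEY = "mapreduce.fileoutputcommitter.algorithm.version";

  /**
   * This committer moves files into their final directory in commitJob, which
   * is only safe with the version 1 algorithm; Hadoop 3 defaults the property
   * to 2, so fail early rather than risk a broken commit.
   */
  static void checkAlgorithmVersion(Map<String, String> conf) {
    int version = Integer.parseInt(conf.getOrDefault(ALGO_KEY, "2"));
    if (version != 1) {
      throw new IllegalArgumentException(
          "DynamicPartitioningOutputCommitter requires " + ALGO_KEY
              + " to be set to 1, but found " + version);
    }
  }
}
```

Failing in the constructor surfaces the misconfiguration at job setup time instead of producing silently wrong output at commit time.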
@@ -313,7 +323,7 @@
return fs.listStatus(jobAttemptPath, new CommittedTaskFilter());
}

[Checkstyle warning on line 326: SummaryJavadocCheck: First sentence of Javadoc is missing an ending period.]
/**
* given two paths as input: base: /my/base/path file: /my/base/path/some/other/file return
* "some/other/file"
*/
@@ -41,12 +41,12 @@ public void testSendForPaginatedListResponder() throws IOException {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();

Mockito.doAnswer(invocation -> {
ByteBuffers.writeToStream(invocation.getArgumentAt(0, ByteBuffer.class), byteArrayOutputStream);
ByteBuffers.writeToStream(invocation.getArgument(0, ByteBuffer.class), byteArrayOutputStream);
return null;
}).when(chunkResponder).sendChunk(Mockito.any(ByteBuffer.class));

JsonPaginatedListResponder.respond(
new Gson(), responder, "applications", (jsonListResponder)-> {
new Gson(), responder, "applications", (jsonListResponder) -> {
jsonListResponder.send("application");
return "nextToken";
});
@@ -65,12 +65,12 @@ public void testMultipleSendForPaginatedListResponder() throws IOException {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();

Mockito.doAnswer(invocation -> {
ByteBuffers.writeToStream(invocation.getArgumentAt(0, ByteBuffer.class), byteArrayOutputStream);
ByteBuffers.writeToStream(invocation.getArgument(0, ByteBuffer.class), byteArrayOutputStream);
return null;
}).when(chunkResponder).sendChunk(Mockito.any(ByteBuffer.class));

JsonPaginatedListResponder.respond(
new Gson(), responder, "applications", (jsonListResponder)-> {
new Gson(), responder, "applications", (jsonListResponder) -> {
jsonListResponder.send("application0");
jsonListResponder.send("application1");
return "nextToken";
@@ -91,12 +91,12 @@ public void testSendForWholeListResponder() throws IOException {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();

Mockito.doAnswer(invocation -> {
ByteBuffers.writeToStream(invocation.getArgumentAt(0, ByteBuffer.class), byteArrayOutputStream);
ByteBuffers.writeToStream(invocation.getArgument(0, ByteBuffer.class), byteArrayOutputStream);
return null;
}).when(chunkResponder).sendChunk(Mockito.any(ByteBuffer.class));

JsonWholeListResponder.respond(
new Gson(), responder, (jsonListResponder)-> {
new Gson(), responder, (jsonListResponder) -> {
jsonListResponder.send("application");
});

@@ -43,13 +43,13 @@
import org.apache.twill.common.Cancellable;
import org.apache.twill.discovery.InMemoryDiscoveryService;
import org.hamcrest.CoreMatchers;
import org.hamcrest.CustomTypeSafeMatcher;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;

/**
@@ -103,76 +103,70 @@ private static void setUpMockBehaviorForApplicationLifeCycleService() throws App
//Throw ApplicationNotFoundException whenever NOT_FOUND_APP is used
Mockito.doThrow(new ApplicationNotFoundException(new ApplicationId(NAMESPACE, NOT_FOUND_APP)))
.when(applicationLifecycleService)
.saveState(Mockito.argThat(new AppNameAppStateKeyValueMatcher("NOT_FOUND_APP", NOT_FOUND_APP)));
.saveState(Mockito.argThat(new AppNameAppStateKeyValueMatcher(NOT_FOUND_APP)));
Mockito.doThrow(new ApplicationNotFoundException(new ApplicationId(NAMESPACE, NOT_FOUND_APP)))
.when(applicationLifecycleService)
.getState(Mockito.argThat(new AppNameAppStateKeyMatcher("NOT_FOUND_APP", NOT_FOUND_APP)));
.getState(Mockito.argThat(new AppNameAppStateKeyMatcher(NOT_FOUND_APP)));
Mockito.doThrow(new ApplicationNotFoundException(new ApplicationId(NAMESPACE, NOT_FOUND_APP)))
.when(applicationLifecycleService)
.deleteState(Mockito.argThat(new AppNameAppStateKeyMatcher("NOT_FOUND_APP", NOT_FOUND_APP)));
.deleteState(Mockito.argThat(new AppNameAppStateKeyMatcher(NOT_FOUND_APP)));

//Throw RuntimeException whenever error app is being used
Mockito.doThrow(new RuntimeException("test")).when(applicationLifecycleService)
.saveState(Mockito.argThat(new AppNameAppStateKeyValueMatcher("ERROR_APP", ERROR_APP)));
.saveState(Mockito.argThat(new AppNameAppStateKeyValueMatcher(ERROR_APP)));
Mockito.doThrow(new RuntimeException("test")).when(applicationLifecycleService)
.getState(Mockito.argThat(new AppNameAppStateKeyMatcher("ERROR_APP", ERROR_APP)));
.getState(Mockito.argThat(new AppNameAppStateKeyMatcher(ERROR_APP)));
Mockito.doThrow(new RuntimeException("test")).when(applicationLifecycleService)
.deleteState(Mockito.argThat(new AppNameAppStateKeyMatcher("ERROR_APP", ERROR_APP)));
.deleteState(Mockito.argThat(new AppNameAppStateKeyMatcher(ERROR_APP)));

String encodedInvalidKey = Base64.getEncoder().encodeToString(MISSING_KEY.getBytes(StandardCharsets.UTF_8));
// Different response for valid and invalid keys
Mockito.when(
applicationLifecycleService.getState(
Mockito.argThat(new CustomTypeSafeMatcher<AppStateKey>("valid key match") {
@Override
protected boolean matchesSafely(AppStateKey item) {
return item.getAppName().equals(SUCCESS_APP) && !item.getStateKey().equals(encodedInvalidKey);
}
})))
applicationLifecycleService.getState(
Mockito.argThat(
item -> item == null ? false :
item.getAppName().equals(SUCCESS_APP) && !item.getStateKey().equals(encodedInvalidKey)
)))
.thenReturn(Optional.of(TEST_VALUE.getBytes(StandardCharsets.UTF_8)));

Mockito.when(
applicationLifecycleService.getState(
Mockito.argThat(new CustomTypeSafeMatcher<AppStateKey>("invalid key match") {
@Override
protected boolean matchesSafely(AppStateKey item) {
return item.getAppName().equals(SUCCESS_APP) && item.getStateKey().equals(encodedInvalidKey);
}
})))
applicationLifecycleService.getState(
Mockito.argThat(
item -> item == null ? false :
item.getAppName().equals(SUCCESS_APP) && item.getStateKey().equals(encodedInvalidKey)
)))
.thenReturn(Optional.empty());
}

/**
* Simple AppStateKeyValue matcher that matches for appname
*/
private static class AppNameAppStateKeyValueMatcher extends CustomTypeSafeMatcher<AppStateKeyValue> {
private static class AppNameAppStateKeyValueMatcher implements ArgumentMatcher<AppStateKeyValue> {
private final String appName;

public AppNameAppStateKeyValueMatcher(String description, String appName) {
super(description);
public AppNameAppStateKeyValueMatcher(String appName) {
this.appName = appName;
}

@Override
protected boolean matchesSafely(AppStateKeyValue item) {
return item.getAppName().equals(appName);
public boolean matches(AppStateKeyValue item) {
return item == null ? false : item.getAppName().equals(appName);
}
}

/**
* Simple AppStateKey matcher that matches for appname
*/
private static class AppNameAppStateKeyMatcher extends CustomTypeSafeMatcher<AppStateKey> {
private static class AppNameAppStateKeyMatcher implements ArgumentMatcher<AppStateKey> {
private final String appName;

public AppNameAppStateKeyMatcher(String description, String appName) {
super(description);
public AppNameAppStateKeyMatcher(String appName) {
this.appName = appName;
}

@Override
protected boolean matchesSafely(AppStateKey item) {
return item.getAppName().equals(appName);
public boolean matches(AppStateKey item) {
return item == null ? false : item.getAppName().equals(appName);
}
}

@@ -27,7 +27,7 @@
import io.cdap.cdap.common.discovery.URIScheme;
import io.cdap.cdap.common.utils.Tasks;
import io.cdap.cdap.internal.AppFabricTestHelper;
import io.cdap.cdap.security.server.LDAPLoginModule;
import io.cdap.cdap.security.server.LdapLoginModule;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
@@ -66,7 +66,7 @@ public void startStopServer() throws Exception {
}

@Test
public void testSSL() throws IOException {
public void testSsl() throws IOException {
CConfiguration cConf = CConfiguration.create();
cConf.setBoolean(Constants.Security.SSL.INTERNAL_ENABLED, true);
SConfiguration sConf = SConfiguration.create();
@@ -85,7 +85,7 @@ public void testSSL() throws IOException {
Assert.assertTrue(URIScheme.HTTPS.isMatch(discoverable));
InetSocketAddress addr = discoverable.getSocketAddress();
// Since the server uses a self signed certificate we need a client that trusts all certificates
SSLSocket socket = (SSLSocket) LDAPLoginModule.TrustAllSSLSocketFactory.getDefault()
SSLSocket socket = (SSLSocket) LdapLoginModule.TrustAllSslSocketFactory.getDefault()
.createSocket(addr.getHostName(), addr.getPort());
socket.setSoTimeout(5000); // in millis
// Would throw exception if the server does not support ssl.
cdap-app-fabric/src/test/java/io/cdap/cdap/internal/capability/AutoInstallTest.java
@@ -81,7 +81,7 @@
CapabilityApplier capabilityApplier = new CapabilityApplier(null, null,
null, null, null,
artifactRepository, cConf, remoteClientFactory);
CapabilityApplier ca = Mockito.spy(capabilityApplier);

[Checkstyle warning on line 84 in cdap-app-fabric/src/test/java/io/cdap/cdap/internal/capability/AutoInstallTest.java: VariableDeclarationUsageDistanceCheck: Distance between variable 'ca' declaration and its first usage is 11, but allowed 3. Consider making that variable final if you still need to store its value in advance (before method calls that might have side effects on the original value).]
PowerMockito.mockStatic(HttpClients.class);
PowerMockito.mockStatic(Files.class);
PowerMockito.mockStatic(File.class);
@@ -152,7 +152,6 @@
ImmutableSet.of());
Mockito.verify(artifactRepository, Mockito.times(1)).writeArtifactProperties(artifact, properties);
// Verify that temp file was deleted
PowerMockito.verifyStatic();
java.nio.file.Files.deleteIfExists(mockPath);
}
