Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -898,6 +898,7 @@
<artifactId>apache-rat-plugin</artifactId>
<configuration>
<excludes>
<exclude>**/*.json</exclude>
<exclude>CHANGES.txt</exclude>
<exclude>**/LICENSE*</exclude>
<!-- IDE files -->
Expand Down
15 changes: 14 additions & 1 deletion tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@
import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Strings;
Expand Down Expand Up @@ -200,6 +203,16 @@ static boolean setupTezJarsLocalResources(TezConfiguration conf,
return usingTezArchive;
}

public static ServicePluginsDescriptor createPluginsDescriptorFromJSON(InputStream is) throws IOException {
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
if (is != null) {
return objectMapper.readValue(is, ServicePluginsDescriptor.class);
} else {
return ServicePluginsDescriptor.create(false);
}
}

private static boolean addLocalResources(Configuration conf,
String[] configUris, Map<String, LocalResource> tezJarResources,
Credentials credentials) throws IOException {
Expand Down Expand Up @@ -834,7 +847,7 @@ public static void addLog4jSystemProperties(String logLevel,
}
}

static ConfigurationProto createFinalConfProtoForApp(Configuration amConf,
public static ConfigurationProto createFinalConfProtoForApp(Configuration amConf,
ServicePluginsDescriptor servicePluginsDescriptor) {
assert amConf != null;
ConfigurationProto.Builder builder = ConfigurationProto.newBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,14 @@

@SuppressWarnings("unchecked")
public class NamedEntityDescriptor<T extends NamedEntityDescriptor<T>> extends EntityDescriptor<NamedEntityDescriptor<T>> {
private final String entityName;

private String entityName;

/**
* Public constructor to allow this descriptor to be instantiated by Jackson.
*/
@InterfaceAudience.Private
public NamedEntityDescriptor() {}

@InterfaceAudience.Private
public NamedEntityDescriptor(String entityName, String className) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2332,4 +2332,11 @@ static Set<String> getPropertySet() {
@ConfigurationScope(Scope.DAG)
@ConfigurationProperty
public static final String TEZ_TASK_ATTEMPT_HOOKS = TEZ_TASK_PREFIX + "attempt.hooks";

/**
* Comma-separated list of additional hadoop config files to load from CLASSPATH in standalone mode.
*/
@ConfigurationScope(Scope.AM)
@ConfigurationProperty
public static final String TEZ_AM_STANDALONE_CONFS = TEZ_AM_PREFIX + "standalone.confs";
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public final class TezConstants {
TEZ_AM_SECURITY_SERVICE_AUTHORIZATION_CLIENT =
"security.job.client.protocol.acl";

public static final String SERVICE_PLUGINS_DESCRIPTOR_JSON = "service_plugins_descriptor.json";
public static final String TEZ_PB_BINARY_CONF_NAME = "tez-conf.pb";
public static final String TEZ_PB_PLAN_BINARY_NAME = "tez-dag.pb";
public static final String TEZ_PB_PLAN_TEXT_NAME = "tez-dag.pb.txt";
Expand Down
13 changes: 9 additions & 4 deletions tez-api/src/main/java/org/apache/tez/dag/api/UserPayload.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

import javax.annotation.Nullable;

import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

import com.google.common.annotations.VisibleForTesting;
Expand All @@ -31,12 +31,17 @@
* Wrapper class to hold user payloads
* Provides a version to help in evolving the payloads
*/
@Public
@InterfaceAudience.Public
public final class UserPayload {
private final ByteBuffer payload;
private final int version;
private ByteBuffer payload;
private int version;
private static final ByteBuffer EMPTY_BYTE = ByteBuffer.wrap(new byte[0]);

/**
* Public constructor to allow this descriptor to be instantiated by Jackson.
*/
public UserPayload() {}

private UserPayload(@Nullable ByteBuffer payload) {
this(payload, 0);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@
@InterfaceStability.Unstable
public class ContainerLauncherDescriptor extends NamedEntityDescriptor<ContainerLauncherDescriptor> {

/**
* Public constructor to allow this descriptor to be instantiated by Jackson.
*/
@InterfaceAudience.Private
public ContainerLauncherDescriptor() {}

private ContainerLauncherDescriptor(String containerLauncherName, String containerLauncherClassname) {
super(containerLauncherName, containerLauncherClassname);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,16 @@
@InterfaceStability.Unstable
public class ServicePluginsDescriptor {

private final boolean enableContainers;
private final boolean enableUber;
private boolean enableContainers;
private boolean enableUber;

private TaskSchedulerDescriptor[] taskSchedulerDescriptors;
private ContainerLauncherDescriptor[] containerLauncherDescriptors;
private TaskCommunicatorDescriptor[] taskCommunicatorDescriptors;

@InterfaceAudience.Private
public ServicePluginsDescriptor() {}

private ServicePluginsDescriptor(boolean enableContainers, boolean enableUber,
TaskSchedulerDescriptor[] taskSchedulerDescriptors,
ContainerLauncherDescriptor[] containerLauncherDescriptors,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@
@InterfaceStability.Unstable
public class TaskCommunicatorDescriptor extends NamedEntityDescriptor<TaskCommunicatorDescriptor> {

/**
* Public constructor to allow this descriptor to be instantiated by Jackson.
*/
@InterfaceAudience.Private
public TaskCommunicatorDescriptor() {}

private TaskCommunicatorDescriptor(String taskCommName, String taskCommClassname) {
super(taskCommName, taskCommClassname);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@
@InterfaceStability.Unstable
public class TaskSchedulerDescriptor extends NamedEntityDescriptor<TaskSchedulerDescriptor> {

/**
* Public constructor to allow this descriptor to be instantiated by Jackson.
*/
@InterfaceAudience.Private
public TaskSchedulerDescriptor() { }

private TaskSchedulerDescriptor(String taskSchedulerName, String schedulerClassname) {
super(taskSchedulerName, schedulerClassname);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.lang.management.ManagementFactory;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
Expand Down Expand Up @@ -74,14 +75,21 @@ public final class TezUtilsInternal {

private TezUtilsInternal() {}

public static ConfigurationProto readUserSpecifiedTezConfiguration(String baseDir) throws
IOException {
public static ConfigurationProto readUserSpecifiedTezConfiguration(String baseDir) throws IOException {
File confPBFile = new File(baseDir, TezConstants.TEZ_PB_BINARY_CONF_NAME);
try (FileInputStream fis = new FileInputStream(confPBFile)) {
return ConfigurationProto.parseFrom(fis);
}
}

public static Configuration readTezConfigurationXml(InputStream is) {
Configuration configuration = new Configuration();
if (is != null) {
configuration.addResource(is);
}
return configuration;
}

public static void addUserSpecifiedTezConfiguration(Configuration conf,
List<PlanKeyValuePair> kvPairList) {
if (kvPairList != null && !kvPairList.isEmpty()) {
Expand Down
36 changes: 36 additions & 0 deletions tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,28 @@

package org.apache.tez.common;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.io.InputStream;
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.tez.client.TezClientUtils;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.serviceplugins.api.ContainerLauncherDescriptor;
import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
import org.apache.tez.serviceplugins.api.TaskCommunicatorDescriptor;
import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor;

import com.google.protobuf.ByteString;

Expand Down Expand Up @@ -291,4 +301,30 @@ public void testPopulateConfProtoFromEntries() {
assertEquals(confBuilder.getConfKeyValuesList().size(), 1);
}

@Test(timeout = 5000)
public void testReadTezConfigurationXmlFromClasspath() throws IOException {
InputStream is = ClassLoader.getSystemResourceAsStream(TezConfiguration.TEZ_SITE_XML);
Configuration conf = TezUtilsInternal.readTezConfigurationXml(is);
assertEquals("tez.tar.gz", conf.get("tez.lib.uris"));
}

@Test(timeout = 5000)
public void testPluginsDescriptorFromJSON() throws IOException {
InputStream is = ClassLoader.getSystemResourceAsStream(TezConstants.SERVICE_PLUGINS_DESCRIPTOR_JSON);
ServicePluginsDescriptor spd = TezClientUtils.createPluginsDescriptorFromJSON(is);
TaskSchedulerDescriptor tsd = spd.getTaskSchedulerDescriptors()[0];
ContainerLauncherDescriptor cld = spd.getContainerLauncherDescriptors()[0];
TaskCommunicatorDescriptor tcd = spd.getTaskCommunicatorDescriptors()[0];

assertFalse(spd.areContainersEnabled());
assertTrue(spd.isUberEnabled());
assertEquals("testScheduler0_class", tsd.getClassName());
assertEquals("testScheduler0", tsd.getEntityName());
assertEquals("testLauncher0_class", cld.getClassName());
assertEquals("testLauncher0", cld.getEntityName());
assertEquals("testComm0_class", tcd.getClassName());
assertEquals("testComm0", tcd.getEntityName());
assertEquals(1, tcd.getUserPayload().getVersion());
assertArrayEquals(new byte[] {0, 0, 0, 1}, tcd.getUserPayload().deepCopyAsArray());
}
}
26 changes: 26 additions & 0 deletions tez-common/src/test/resources/service_plugins_descriptor.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
{
"taskSchedulerDescriptors": [
{
"className": "testScheduler0_class",
"entityName": "testScheduler0"
}
],
"containerLauncherDescriptors": [
{
"className": "testLauncher0_class",
"entityName": "testLauncher0"
}
],
"taskCommunicatorDescriptors": [
{
"userPayload": {
"payload": "AAAAAQ==",
"version": 1
},
"className": "testComm0_class",
"entityName": "testComm0"
}
],
"enableContainers": false,
"enableUber": true
}
22 changes: 22 additions & 0 deletions tez-common/src/test/resources/tez-site.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->

<configuration>
<property>
<name>tez.lib.uris</name>
<value>tez.tar.gz</value>
</property>
</configuration>
Loading