Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 0 additions & 43 deletions bigtop-packages/src/common/hive/patch1-HIVE-23190.diff

This file was deleted.

258 changes: 258 additions & 0 deletions bigtop-packages/src/common/hive/patch1-HIVE-27336.diff
Original file line number Diff line number Diff line change
@@ -0,0 +1,258 @@
From 5f79f2a059534eaa63c0479a2142a250fa78c1e3 Mon Sep 17 00:00:00 2001
From: amaruthappan <alagappan.maruthappan@yahooinc.com>
Date: Mon, 8 May 2023 17:00:46 -0700
Subject: [PATCH 1/2] =?UTF-8?q?HIVE-23190:=20LLAP:=20modify=20IndexCache?=
=?UTF-8?q?=20to=20pass=20filesystem=20object=20to=20TezSpillRecord=20(L?=
=?UTF-8?q?=C3=A1szl=C3=B3=20Bodor=20reviewed=20by=20Rajesh=20Balamohan)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
.../hadoop/hive/llap/shufflehandler/IndexCache.java | 11 +++++++++++
1 file changed, 11 insertions(+)

diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/IndexCache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/IndexCache.java
index 4de03f232d70..c7b986469f4a 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/IndexCache.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/IndexCache.java
@@ -25,6 +25,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tez.runtime.library.common.Constants;
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
@@ -42,11 +43,21 @@ class IndexCache {

private final LinkedBlockingQueue<String> queue =
new LinkedBlockingQueue<String>();
+ private FileSystem fs;

public IndexCache(Configuration conf) {
this.conf = conf;
totalMemoryAllowed = 10 * 1024 * 1024;
LOG.info("IndexCache created with max memory = " + totalMemoryAllowed);
+ initLocalFs();
+ }
+
+ private void initLocalFs() {
+ try {
+ this.fs = FileSystem.getLocal(conf).getRaw();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
}

/**

From 1edbe403ff424f91ed0cd1ae91eb39290b5beb7f Mon Sep 17 00:00:00 2001
From: amaruthappan <alagappan.maruthappan@yahooinc.com>
Date: Mon, 8 May 2023 16:58:51 -0700
Subject: [PATCH 2/2] HIVE-27336: Upgrade Tez to 0.10.2 in Hive-3.X

---
.../hive/llap/daemon/impl/ContainerRunnerImpl.java | 4 ++--
.../hadoop/hive/llap/daemon/impl/LlapTaskReporter.java | 2 +-
.../hive/llap/daemon/impl/TaskRunnerCallable.java | 2 +-
.../hadoop/hive/llap/shufflehandler/IndexCache.java | 2 +-
.../hive/llap/tezplugins/LlapTaskCommunicator.java | 10 +++++-----
.../hive/llap/tezplugins/LlapTaskSchedulerService.java | 8 ++++----
.../hive/llap/tezplugins/TestLlapTaskCommunicator.java | 1 +
pom.xml | 2 +-
.../org/apache/hadoop/hive/ql/exec/tez/TezTask.java | 7 +++++++
9 files changed, 23 insertions(+), 15 deletions(-)

diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index ef5922ef41b6..95d601a8e2f3 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -215,7 +215,7 @@ public SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws
vertex.getVertexName(), request.getFragmentNumber(), request.getAttemptNumber());

// This is the start of container-annotated logging.
- final String dagId = attemptId.getTaskID().getVertexID().getDAGId().toString();
+ final String dagId = attemptId.getDAGID().toString();
final String queryId = vertex.getHiveQueryId();
final String fragmentId = LlapTezUtils.stripAttemptPrefix(fragmentIdString);
MDC.put("dagId", dagId);
@@ -237,7 +237,7 @@ public SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws
env.put(ApplicationConstants.Environment.USER.name(), vertex.getUser());

TezTaskAttemptID taskAttemptId = TezTaskAttemptID.fromString(fragmentIdString);
- int dagIdentifier = taskAttemptId.getTaskID().getVertexID().getDAGId().getId();
+ int dagIdentifier = taskAttemptId.getDAGID().getId();

QueryIdentifier queryIdentifier = new QueryIdentifier(
qIdProto.getApplicationIdString(), dagIdentifier);
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
index 33ade55ee1f5..cc7879cdecea 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
@@ -291,7 +291,7 @@ private synchronized ResponseWrapper heartbeat(Collection<TezEvent> eventsArg) t
int fromPreRoutedEventId = task.getNextPreRoutedEventId();
int maxEvents = Math.min(maxEventsToGet, task.getMaxEventsToHandle());
TezHeartbeatRequest request = new TezHeartbeatRequest(requestId, events, fromPreRoutedEventId,
- containerIdStr, task.getTaskAttemptID(), fromEventId, maxEvents);
+ containerIdStr, task.getTaskAttemptID(), fromEventId, maxEvents, 0);
if (LOG.isDebugEnabled()) {
LOG.debug("Sending heartbeat to AM, request=" + request);
}
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index 7f436e23264b..66f7c330f786 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -327,7 +327,7 @@ private String constructThreadNameSuffix(TezTaskAttemptID taskAttemptId) {
StringBuilder sb = new StringBuilder();
TezTaskID taskId = taskAttemptId.getTaskID();
TezVertexID vertexId = taskId.getVertexID();
- TezDAGID dagId = vertexId.getDAGId();
+ TezDAGID dagId = vertexId.getDAGID();
ApplicationId appId = dagId.getApplicationId();
long clusterTs = appId.getClusterTimestamp();
long clusterTsShort = clusterTs % 1_000_000L;
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/IndexCache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/IndexCache.java
index c7b986469f4a..cc5019a64d84 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/IndexCache.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/IndexCache.java
@@ -129,7 +129,7 @@ private IndexInformation readIndexFileToCache(Path indexFileName,
LOG.debug("IndexCache MISS: MapId " + mapId + " not found") ;
TezSpillRecord tmp = null;
try {
- tmp = new TezSpillRecord(indexFileName, conf, expectedIndexOwner);
+ tmp = new TezSpillRecord(indexFileName, fs, expectedIndexOwner);
} catch (Throwable e) {
tmp = new TezSpillRecord(0);
cache.remove(mapId);
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
index 5d4ce223d9e9..5eebe10ac9a3 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
@@ -322,7 +322,7 @@ public <T> void startUpdateGuaranteed(TezTaskAttemptID attemptId, NodeInfo assig
UpdateFragmentRequestProto request = UpdateFragmentRequestProto.newBuilder()
.setIsGuaranteed(newState).setFragmentIdentifierString(attemptId.toString())
.setQueryIdentifier(constructQueryIdentifierProto(
- attemptId.getTaskID().getVertexID().getDAGId().getId())).build();
+ attemptId.getDAGID().getId())).build();

communicator.sendUpdateFragment(request, nodeId.getHostname(), nodeId.getPort(),
new LlapProtocolClientProxy.ExecuteRequestCallback<UpdateFragmentResponseProto>() {
@@ -349,7 +349,7 @@ public void registerRunningTaskAttempt(final ContainerId containerId, final Task
int priority) {
super.registerRunningTaskAttempt(containerId, taskSpec, additionalResources, credentials,
credentialsChanged, priority);
- int dagId = taskSpec.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId();
+ int dagId = taskSpec.getTaskAttemptID().getDAGID().getId();
if (currentQueryIdentifierProto == null || (dagId != currentQueryIdentifierProto.getDagIndex())) {
// TODO HiveQueryId extraction by parsing the Processor payload is ugly. This can be improved
// once TEZ-2672 is fixed.
@@ -505,7 +505,7 @@ private void sendTaskTerminated(final TezTaskAttemptID taskAttemptId,
TerminateFragmentRequestProto request =
TerminateFragmentRequestProto.newBuilder().setQueryIdentifier(
constructQueryIdentifierProto(
- taskAttemptId.getTaskID().getVertexID().getDAGId().getId()))
+ taskAttemptId.getDAGID().getId()))
.setFragmentIdentifierString(taskAttemptId.toString()).build();
communicator.sendTerminateFragment(request, nodeId.getHostname(), nodeId.getPort(),
new LlapProtocolClientProxy.ExecuteRequestCallback<TerminateFragmentResponseProto>() {
@@ -649,7 +649,7 @@ private String constructLogUrl(final TezTaskAttemptID attemptID, final NodeId co

private String constructLlapLogUrl(final TezTaskAttemptID attemptID, final String containerIdString,
final boolean isDone, final String nmAddress) {
- String dagId = attemptID.getTaskID().getVertexID().getDAGId().toString();
+ String dagId = attemptID.getDAGID().toString();
String filename = JOINER.join(currentHiveQueryId, "-", dagId, ".log", (isDone ? ".done" : ""),
"?nm.id=", nmAddress);
String url = PATH_JOINER.join(timelineServerUri, "ws", "v1", "applicationhistory", "containers",
@@ -794,7 +794,7 @@ private SubmitWorkRequestProto constructSubmitWorkRequest(ContainerId containerI
builder.setAmPort(getAddress().getPort());

Preconditions.checkState(currentQueryIdentifierProto.getDagIndex() ==
- taskSpec.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId());
+ taskSpec.getTaskAttemptID().getDAGID().getId());
ByteBuffer credentialsBinary = credentialMap.get(currentQueryIdentifierProto);
if (credentialsBinary == null) {
credentialsBinary = serializeCredentials(getContext().getCurrentDagInfo().getCredentials());
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
index 82179645da00..99038cd49542 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
@@ -1075,7 +1075,7 @@ public void allocateTask(Object task, Resource capability, String[] hosts, Strin
writeLock.lock();
try {
if (!dagRunning && metrics != null && id != null) {
- metrics.setDagId(id.getTaskID().getVertexID().getDAGId().toString());
+ metrics.setDagId(id.getDAGID().toString());
}
dagRunning = true;
dagStats.registerTaskRequest(hosts, racks);
@@ -1099,7 +1099,7 @@ public void allocateTask(Object task, Resource capability, ContainerId container
writeLock.lock();
try {
if (!dagRunning && metrics != null && id != null) {
- metrics.setDagId(id.getTaskID().getVertexID().getDAGId().toString());
+ metrics.setDagId(id.getDAGID().toString());
}
dagRunning = true;
dagStats.registerTaskRequest(null, null);
@@ -1114,7 +1114,7 @@ public void allocateTask(Object task, Resource capability, ContainerId container
protected TezTaskAttemptID getTaskAttemptId(Object task) {
// TODO: why does Tez API use "Object" for this?
if (task instanceof TaskAttempt) {
- return ((TaskAttempt)task).getID();
+ return ((TaskAttempt)task).getTaskAttemptID();
}
throw new AssertionError("LLAP plugin can only schedule task attempts");
}
@@ -2030,7 +2030,7 @@ private List<TaskInfo> preemptTasksFromMap(TreeMap<Integer, TreeSet<TaskInfo>> r
continue; // Not the right host.
}
Map<Integer,Set<Integer>> depInfo = getDependencyInfo(
- taskInfo.attemptId.getTaskID().getVertexID().getDAGId());
+ taskInfo.attemptId.getDAGID());
Set<Integer> vertexDepInfo = null;
if (depInfo != null) {
vertexDepInfo = depInfo.get(forVertex);
diff --git a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java
index 5efe7c677ce6..2fa2487a74d7 100644
--- a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java
+++ b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java
@@ -364,6 +364,7 @@ private TaskSpec createBaseTaskSpec(String vertexName, TezVertexID vertexId, int
TezTaskAttemptID taskAttemptId = TezTaskAttemptID.getInstance(
TezTaskID.getInstance(vertexId, taskIdx), 0);
doReturn(taskAttemptId).when(taskSpec).getTaskAttemptID();
+ doReturn(taskAttemptId.getDAGID()).when(taskSpec).getDAGID();
doReturn(DAG_NAME).when(taskSpec).getDAGName();
doReturn(vertexName).when(taskSpec).getVertexName();
ProcessorDescriptor processorDescriptor = ProcessorDescriptor.create("fakeClassName").setUserPayload(userPayload);
diff --git a/pom.xml b/pom.xml
index cb54806ef5ca..053ccc059f3f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -196,7 +196,7 @@
<slf4j.version>1.7.10</slf4j.version>
<ST4.version>4.0.4</ST4.version>
<storage-api.version>2.7.0</storage-api.version>
- <tez.version>0.9.1</tez.version>
+ <tez.version>0.10.2</tez.version>
<super-csv.version>2.2.0</super-csv.version>
<spark.version>2.3.0</spark.version>
<scala.binary.version>2.11</scala.binary.version>
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index a15482f19c43..288341a2b229 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -761,5 +761,12 @@ public DAGStatus waitForCompletionWithStatusUpdates(@Nullable Set<StatusGetOpts>
return dagClient.waitForCompletionWithStatusUpdates(statusGetOpts);
}
}
+
+ @Override
+ public String getWebUIAddress() throws IOException, TezException {
+ synchronized (dagClient) {
+ return dagClient.getWebUIAddress();
+ }
+ }
}
}