diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index ef5922ef41b6..95d601a8e2f3 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -215,7 +215,7 @@ public SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws vertex.getVertexName(), request.getFragmentNumber(), request.getAttemptNumber()); // This is the start of container-annotated logging. - final String dagId = attemptId.getTaskID().getVertexID().getDAGId().toString(); + final String dagId = attemptId.getDAGID().toString(); final String queryId = vertex.getHiveQueryId(); final String fragmentId = LlapTezUtils.stripAttemptPrefix(fragmentIdString); MDC.put("dagId", dagId); @@ -237,7 +237,7 @@ public SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws env.put(ApplicationConstants.Environment.USER.name(), vertex.getUser()); TezTaskAttemptID taskAttemptId = TezTaskAttemptID.fromString(fragmentIdString); - int dagIdentifier = taskAttemptId.getTaskID().getVertexID().getDAGId().getId(); + int dagIdentifier = taskAttemptId.getDAGID().getId(); QueryIdentifier queryIdentifier = new QueryIdentifier( qIdProto.getApplicationIdString(), dagIdentifier); diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java index 33ade55ee1f5..cc7879cdecea 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java @@ -291,7 +291,7 @@ private synchronized ResponseWrapper heartbeat(Collection eventsArg) t int fromPreRoutedEventId = task.getNextPreRoutedEventId(); int maxEvents = Math.min(maxEventsToGet, task.getMaxEventsToHandle()); TezHeartbeatRequest request = new TezHeartbeatRequest(requestId, events, fromPreRoutedEventId, - containerIdStr, task.getTaskAttemptID(), fromEventId, maxEvents); + containerIdStr, task.getTaskAttemptID(), fromEventId, maxEvents, 0); if (LOG.isDebugEnabled()) { LOG.debug("Sending heartbeat to AM, request=" + request); } diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java index 7f436e23264b..66f7c330f786 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java @@ -327,7 +327,7 @@ private String constructThreadNameSuffix(TezTaskAttemptID taskAttemptId) { StringBuilder sb = new StringBuilder(); TezTaskID taskId = taskAttemptId.getTaskID(); TezVertexID vertexId = taskId.getVertexID(); - TezDAGID dagId = vertexId.getDAGId(); + TezDAGID dagId = vertexId.getDAGID(); ApplicationId appId = dagId.getApplicationId(); long clusterTs = appId.getClusterTimestamp(); long clusterTsShort = clusterTs % 1_000_000L; diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/IndexCache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/IndexCache.java index 4de03f232d70..cc5019a64d84 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/IndexCache.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/IndexCache.java @@ -25,6 +25,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.tez.runtime.library.common.Constants; import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord; @@ -42,11 +43,21 @@ class IndexCache { private final LinkedBlockingQueue queue = new LinkedBlockingQueue(); + private FileSystem fs; public IndexCache(Configuration conf) { this.conf = conf; totalMemoryAllowed = 10 * 1024 * 1024; LOG.info("IndexCache created with max memory = " + totalMemoryAllowed); + initLocalFs(); + } + + private void initLocalFs() { + try { + this.fs = FileSystem.getLocal(conf).getRaw(); + } catch (IOException e) { + throw new RuntimeException(e); + } } /** @@ -118,7 +129,7 @@ private IndexInformation readIndexFileToCache(Path indexFileName, LOG.debug("IndexCache MISS: MapId " + mapId + " not found") ; TezSpillRecord tmp = null; try { - tmp = new TezSpillRecord(indexFileName, conf, expectedIndexOwner); + tmp = new TezSpillRecord(indexFileName, fs, expectedIndexOwner); } catch (Throwable e) { tmp = new TezSpillRecord(0); cache.remove(mapId); diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java index 5d4ce223d9e9..5eebe10ac9a3 100644 --- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java @@ -322,7 +322,7 @@ public void startUpdateGuaranteed(TezTaskAttemptID attemptId, NodeInfo assig UpdateFragmentRequestProto request = UpdateFragmentRequestProto.newBuilder() .setIsGuaranteed(newState).setFragmentIdentifierString(attemptId.toString()) .setQueryIdentifier(constructQueryIdentifierProto( - attemptId.getTaskID().getVertexID().getDAGId().getId())).build(); + attemptId.getDAGID().getId())).build(); communicator.sendUpdateFragment(request, nodeId.getHostname(), nodeId.getPort(), new LlapProtocolClientProxy.ExecuteRequestCallback() { @@ -349,7 +349,7 @@ public void registerRunningTaskAttempt(final ContainerId containerId, final Task int priority) { super.registerRunningTaskAttempt(containerId, taskSpec, additionalResources, credentials, credentialsChanged, priority); - int dagId = taskSpec.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId(); + int dagId = taskSpec.getTaskAttemptID().getDAGID().getId(); if (currentQueryIdentifierProto == null || (dagId != currentQueryIdentifierProto.getDagIndex())) { // TODO HiveQueryId extraction by parsing the Processor payload is ugly. This can be improved // once TEZ-2672 is fixed. @@ -505,7 +505,7 @@ private void sendTaskTerminated(final TezTaskAttemptID taskAttemptId, TerminateFragmentRequestProto request = TerminateFragmentRequestProto.newBuilder().setQueryIdentifier( constructQueryIdentifierProto( - taskAttemptId.getTaskID().getVertexID().getDAGId().getId())) + taskAttemptId.getDAGID().getId())) .setFragmentIdentifierString(taskAttemptId.toString()).build(); communicator.sendTerminateFragment(request, nodeId.getHostname(), nodeId.getPort(), new LlapProtocolClientProxy.ExecuteRequestCallback() { @@ -649,7 +649,7 @@ private String constructLogUrl(final TezTaskAttemptID attemptID, final NodeId co private String constructLlapLogUrl(final TezTaskAttemptID attemptID, final String containerIdString, final boolean isDone, final String nmAddress) { - String dagId = attemptID.getTaskID().getVertexID().getDAGId().toString(); + String dagId = attemptID.getDAGID().toString(); String filename = JOINER.join(currentHiveQueryId, "-", dagId, ".log", (isDone ? ".done" : ""), "?nm.id=", nmAddress); String url = PATH_JOINER.join(timelineServerUri, "ws", "v1", "applicationhistory", "containers", @@ -794,7 +794,7 @@ private SubmitWorkRequestProto constructSubmitWorkRequest(ContainerId containerI builder.setAmPort(getAddress().getPort()); Preconditions.checkState(currentQueryIdentifierProto.getDagIndex() == - taskSpec.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId()); + taskSpec.getTaskAttemptID().getDAGID().getId()); ByteBuffer credentialsBinary = credentialMap.get(currentQueryIdentifierProto); if (credentialsBinary == null) { credentialsBinary = serializeCredentials(getContext().getCurrentDagInfo().getCredentials()); diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java index 82179645da00..99038cd49542 100644 --- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java @@ -1075,7 +1075,7 @@ public void allocateTask(Object task, Resource capability, String[] hosts, Strin writeLock.lock(); try { if (!dagRunning && metrics != null && id != null) { - metrics.setDagId(id.getTaskID().getVertexID().getDAGId().toString()); + metrics.setDagId(id.getDAGID().toString()); } dagRunning = true; dagStats.registerTaskRequest(hosts, racks); @@ -1099,7 +1099,7 @@ public void allocateTask(Object task, Resource capability, ContainerId container writeLock.lock(); try { if (!dagRunning && metrics != null && id != null) { - metrics.setDagId(id.getTaskID().getVertexID().getDAGId().toString()); + metrics.setDagId(id.getDAGID().toString()); } dagRunning = true; dagStats.registerTaskRequest(null, null); @@ -1114,7 +1114,7 @@ public void allocateTask(Object task, Resource capability, ContainerId container protected TezTaskAttemptID getTaskAttemptId(Object task) { // TODO: why does Tez API use "Object" for this? if (task instanceof TaskAttempt) { - return ((TaskAttempt)task).getID(); + return ((TaskAttempt)task).getTaskAttemptID(); } throw new AssertionError("LLAP plugin can only schedule task attempts"); } @@ -2030,7 +2030,7 @@ private List preemptTasksFromMap(TreeMap> r continue; // Not the right host. } Map> depInfo = getDependencyInfo( - taskInfo.attemptId.getTaskID().getVertexID().getDAGId()); + taskInfo.attemptId.getDAGID()); Set vertexDepInfo = null; if (depInfo != null) { vertexDepInfo = depInfo.get(forVertex); diff --git a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java index 5efe7c677ce6..2fa2487a74d7 100644 --- a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java +++ b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java @@ -364,6 +364,7 @@ private TaskSpec createBaseTaskSpec(String vertexName, TezVertexID vertexId, int TezTaskAttemptID taskAttemptId = TezTaskAttemptID.getInstance( TezTaskID.getInstance(vertexId, taskIdx), 0); doReturn(taskAttemptId).when(taskSpec).getTaskAttemptID(); + doReturn(taskAttemptId.getDAGID()).when(taskSpec).getDAGID(); doReturn(DAG_NAME).when(taskSpec).getDAGName(); doReturn(vertexName).when(taskSpec).getVertexName(); ProcessorDescriptor processorDescriptor = ProcessorDescriptor.create("fakeClassName").setUserPayload(userPayload); diff --git a/pom.xml b/pom.xml index cb54806ef5ca..053ccc059f3f 100644 --- a/pom.xml +++ b/pom.xml @@ -196,7 +196,7 @@ 1.7.10 4.0.4 2.7.0 - 0.9.1 + 0.10.2 2.2.0 2.3.0 2.11 diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index a15482f19c43..288341a2b229 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -761,5 +761,12 @@ public DAGStatus waitForCompletionWithStatusUpdates(@Nullable Set return dagClient.waitForCompletionWithStatusUpdates(statusGetOpts); } } + + @Override + public String getWebUIAddress() throws IOException, TezException { + synchronized (dagClient) { + return dagClient.getWebUIAddress(); + } + } } }