Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ public SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws
vertex.getVertexName(), request.getFragmentNumber(), request.getAttemptNumber());

// This is the start of container-annotated logging.
final String dagId = attemptId.getTaskID().getVertexID().getDAGId().toString();
final String dagId = attemptId.getDAGID().toString();
final String queryId = vertex.getHiveQueryId();
final String fragmentId = LlapTezUtils.stripAttemptPrefix(fragmentIdString);
MDC.put("dagId", dagId);
Expand All @@ -237,7 +237,7 @@ public SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws
env.put(ApplicationConstants.Environment.USER.name(), vertex.getUser());

TezTaskAttemptID taskAttemptId = TezTaskAttemptID.fromString(fragmentIdString);
int dagIdentifier = taskAttemptId.getTaskID().getVertexID().getDAGId().getId();
int dagIdentifier = taskAttemptId.getDAGID().getId();

QueryIdentifier queryIdentifier = new QueryIdentifier(
qIdProto.getApplicationIdString(), dagIdentifier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ private synchronized ResponseWrapper heartbeat(Collection<TezEvent> eventsArg) t
int fromPreRoutedEventId = task.getNextPreRoutedEventId();
int maxEvents = Math.min(maxEventsToGet, task.getMaxEventsToHandle());
TezHeartbeatRequest request = new TezHeartbeatRequest(requestId, events, fromPreRoutedEventId,
containerIdStr, task.getTaskAttemptID(), fromEventId, maxEvents);
containerIdStr, task.getTaskAttemptID(), fromEventId, maxEvents, 0);
if (LOG.isDebugEnabled()) {
LOG.debug("Sending heartbeat to AM, request=" + request);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ private String constructThreadNameSuffix(TezTaskAttemptID taskAttemptId) {
StringBuilder sb = new StringBuilder();
TezTaskID taskId = taskAttemptId.getTaskID();
TezVertexID vertexId = taskId.getVertexID();
TezDAGID dagId = vertexId.getDAGId();
TezDAGID dagId = vertexId.getDAGID();
ApplicationId appId = dagId.getApplicationId();
long clusterTs = appId.getClusterTimestamp();
long clusterTsShort = clusterTs % 1_000_000L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tez.runtime.library.common.Constants;
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
Expand All @@ -42,11 +43,21 @@ class IndexCache {

private final LinkedBlockingQueue<String> queue =
new LinkedBlockingQueue<String>();
private FileSystem fs;

public IndexCache(Configuration conf) {
this.conf = conf;
totalMemoryAllowed = 10 * 1024 * 1024;
LOG.info("IndexCache created with max memory = " + totalMemoryAllowed);
initLocalFs();
}

private void initLocalFs() {
try {
this.fs = FileSystem.getLocal(conf).getRaw();
} catch (IOException e) {
throw new RuntimeException(e);
}
}

/**
Expand Down Expand Up @@ -118,7 +129,7 @@ private IndexInformation readIndexFileToCache(Path indexFileName,
LOG.debug("IndexCache MISS: MapId " + mapId + " not found") ;
TezSpillRecord tmp = null;
try {
tmp = new TezSpillRecord(indexFileName, conf, expectedIndexOwner);
tmp = new TezSpillRecord(indexFileName, fs, expectedIndexOwner);
} catch (Throwable e) {
tmp = new TezSpillRecord(0);
cache.remove(mapId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ public <T> void startUpdateGuaranteed(TezTaskAttemptID attemptId, NodeInfo assig
UpdateFragmentRequestProto request = UpdateFragmentRequestProto.newBuilder()
.setIsGuaranteed(newState).setFragmentIdentifierString(attemptId.toString())
.setQueryIdentifier(constructQueryIdentifierProto(
attemptId.getTaskID().getVertexID().getDAGId().getId())).build();
attemptId.getDAGID().getId())).build();

communicator.sendUpdateFragment(request, nodeId.getHostname(), nodeId.getPort(),
new LlapProtocolClientProxy.ExecuteRequestCallback<UpdateFragmentResponseProto>() {
Expand All @@ -349,7 +349,7 @@ public void registerRunningTaskAttempt(final ContainerId containerId, final Task
int priority) {
super.registerRunningTaskAttempt(containerId, taskSpec, additionalResources, credentials,
credentialsChanged, priority);
int dagId = taskSpec.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId();
int dagId = taskSpec.getTaskAttemptID().getDAGID().getId();
if (currentQueryIdentifierProto == null || (dagId != currentQueryIdentifierProto.getDagIndex())) {
// TODO HiveQueryId extraction by parsing the Processor payload is ugly. This can be improved
// once TEZ-2672 is fixed.
Expand Down Expand Up @@ -505,7 +505,7 @@ private void sendTaskTerminated(final TezTaskAttemptID taskAttemptId,
TerminateFragmentRequestProto request =
TerminateFragmentRequestProto.newBuilder().setQueryIdentifier(
constructQueryIdentifierProto(
taskAttemptId.getTaskID().getVertexID().getDAGId().getId()))
taskAttemptId.getDAGID().getId()))
.setFragmentIdentifierString(taskAttemptId.toString()).build();
communicator.sendTerminateFragment(request, nodeId.getHostname(), nodeId.getPort(),
new LlapProtocolClientProxy.ExecuteRequestCallback<TerminateFragmentResponseProto>() {
Expand Down Expand Up @@ -649,7 +649,7 @@ private String constructLogUrl(final TezTaskAttemptID attemptID, final NodeId co

private String constructLlapLogUrl(final TezTaskAttemptID attemptID, final String containerIdString,
final boolean isDone, final String nmAddress) {
String dagId = attemptID.getTaskID().getVertexID().getDAGId().toString();
String dagId = attemptID.getDAGID().toString();
String filename = JOINER.join(currentHiveQueryId, "-", dagId, ".log", (isDone ? ".done" : ""),
"?nm.id=", nmAddress);
String url = PATH_JOINER.join(timelineServerUri, "ws", "v1", "applicationhistory", "containers",
Expand Down Expand Up @@ -794,7 +794,7 @@ private SubmitWorkRequestProto constructSubmitWorkRequest(ContainerId containerI
builder.setAmPort(getAddress().getPort());

Preconditions.checkState(currentQueryIdentifierProto.getDagIndex() ==
taskSpec.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId());
taskSpec.getTaskAttemptID().getDAGID().getId());
ByteBuffer credentialsBinary = credentialMap.get(currentQueryIdentifierProto);
if (credentialsBinary == null) {
credentialsBinary = serializeCredentials(getContext().getCurrentDagInfo().getCredentials());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1075,7 +1075,7 @@ public void allocateTask(Object task, Resource capability, String[] hosts, Strin
writeLock.lock();
try {
if (!dagRunning && metrics != null && id != null) {
metrics.setDagId(id.getTaskID().getVertexID().getDAGId().toString());
metrics.setDagId(id.getDAGID().toString());
}
dagRunning = true;
dagStats.registerTaskRequest(hosts, racks);
Expand All @@ -1099,7 +1099,7 @@ public void allocateTask(Object task, Resource capability, ContainerId container
writeLock.lock();
try {
if (!dagRunning && metrics != null && id != null) {
metrics.setDagId(id.getTaskID().getVertexID().getDAGId().toString());
metrics.setDagId(id.getDAGID().toString());
}
dagRunning = true;
dagStats.registerTaskRequest(null, null);
Expand All @@ -1114,7 +1114,7 @@ public void allocateTask(Object task, Resource capability, ContainerId container
protected TezTaskAttemptID getTaskAttemptId(Object task) {
// TODO: why does Tez API use "Object" for this?
if (task instanceof TaskAttempt) {
return ((TaskAttempt)task).getID();
return ((TaskAttempt)task).getTaskAttemptID();
}
throw new AssertionError("LLAP plugin can only schedule task attempts");
}
Expand Down Expand Up @@ -2030,7 +2030,7 @@ private List<TaskInfo> preemptTasksFromMap(TreeMap<Integer, TreeSet<TaskInfo>> r
continue; // Not the right host.
}
Map<Integer,Set<Integer>> depInfo = getDependencyInfo(
taskInfo.attemptId.getTaskID().getVertexID().getDAGId());
taskInfo.attemptId.getDAGID());
Set<Integer> vertexDepInfo = null;
if (depInfo != null) {
vertexDepInfo = depInfo.get(forVertex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,7 @@ private TaskSpec createBaseTaskSpec(String vertexName, TezVertexID vertexId, int
TezTaskAttemptID taskAttemptId = TezTaskAttemptID.getInstance(
TezTaskID.getInstance(vertexId, taskIdx), 0);
doReturn(taskAttemptId).when(taskSpec).getTaskAttemptID();
doReturn(taskAttemptId.getDAGID()).when(taskSpec).getDAGID();
doReturn(DAG_NAME).when(taskSpec).getDAGName();
doReturn(vertexName).when(taskSpec).getVertexName();
ProcessorDescriptor processorDescriptor = ProcessorDescriptor.create("fakeClassName").setUserPayload(userPayload);
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@
<slf4j.version>1.7.10</slf4j.version>
<ST4.version>4.0.4</ST4.version>
<storage-api.version>2.7.0</storage-api.version>
<tez.version>0.9.1</tez.version>
<tez.version>0.10.2</tez.version>
<super-csv.version>2.2.0</super-csv.version>
<spark.version>2.3.0</spark.version>
<scala.binary.version>2.11</scala.binary.version>
Expand Down
7 changes: 7 additions & 0 deletions ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -761,5 +761,12 @@ public DAGStatus waitForCompletionWithStatusUpdates(@Nullable Set<StatusGetOpts>
return dagClient.waitForCompletionWithStatusUpdates(statusGetOpts);
}
}

@Override
public String getWebUIAddress() throws IOException, TezException {
synchronized (dagClient) {
return dagClient.getWebUIAddress();
}
}
}
}