diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java index 378017b364..c37f0c181d 100644 --- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java +++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java @@ -688,14 +688,15 @@ private DAGClient submitDAGSession(DAG dag) throws TezException, IOException { sessionAppId.toString()), TezConstants.TEZ_PB_PLAN_BINARY_NAME + serializedSubmitDAGPlanRequestCounter.incrementAndGet()); - try (FSDataOutputStream fsDataOutputStream = stagingFs.create(dagPlanPath, false)) { + FileSystem fs = dagPlanPath.getFileSystem(stagingFs.getConf()); + try (FSDataOutputStream fsDataOutputStream = fs.create(dagPlanPath, false)) { LOG.info("Send dag plan using YARN local resources since it's too large" + ", dag plan size=" + request.getSerializedSize() + ", max dag plan size through IPC=" + maxSubmitDAGRequestSizeThroughIPC + ", max IPC message size= " + amConfig.getTezConfiguration().getInt( CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH, CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT)); request.writeTo(fsDataOutputStream); - request = requestBuilder.clear().setSerializedRequestPath(stagingFs.resolvePath(dagPlanPath).toString()).build(); + request = requestBuilder.clear().setSerializedRequestPath(fs.resolvePath(dagPlanPath).toString()).build(); } } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java index 72cf0d5642..4bdb468859 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java @@ -166,7 +166,8 @@ public SubmitDAGResponseProto submitDAG(RpcController controller, if (request.hasSerializedRequestPath()) { // need to deserialize large request from hdfs Path requestPath = new Path(request.getSerializedRequestPath()); - try (FSDataInputStream fsDataInputStream = stagingFs.open(requestPath)) { + FileSystem fs = requestPath.getFileSystem(stagingFs.getConf()); + try (FSDataInputStream fsDataInputStream = fs.open(requestPath)) { CodedInputStream in = CodedInputStream.newInstance(fsDataInputStream); in.setSizeLimit(Integer.MAX_VALUE); @@ -183,7 +184,7 @@ public SubmitDAGResponseProto submitDAG(RpcController controller, } String dagId = real.submitDAG(dagPlan, additionalResources); return SubmitDAGResponseProto.newBuilder().setDagId(dagId).build(); - } catch(TezException e) { + } catch(IOException | TezException e) { throw wrapException(e); } }