diff --git a/presto-native-execution/presto_cpp/main/QueryContextManager.cpp b/presto-native-execution/presto_cpp/main/QueryContextManager.cpp index a7449ece1a30d..4a009b2a1d26d 100644 --- a/presto-native-execution/presto_cpp/main/QueryContextManager.cpp +++ b/presto-native-execution/presto_cpp/main/QueryContextManager.cpp @@ -109,14 +109,17 @@ std::unordered_map toVeloxConfigs( } std::unordered_map> -toConnectorConfigs(const protocol::SessionRepresentation& session) { +toConnectorConfigs(const protocol::TaskUpdateRequest& taskUpdateRequest) { std::unordered_map> connectorConfigs; - for (const auto& entry : session.catalogProperties) { - connectorConfigs.insert( - {entry.first, - std::unordered_map( - entry.second.begin(), entry.second.end())}); + for (const auto& entry : taskUpdateRequest.session.catalogProperties) { + auto sessionProperties = std::unordered_map( + entry.second.begin(), entry.second.end()); + sessionProperties.insert( + taskUpdateRequest.extraCredentials.begin(), + taskUpdateRequest.extraCredentials.end()); + sessionProperties.insert({"user", taskUpdateRequest.session.user}); + connectorConfigs.insert({entry.first, sessionProperties}); } return connectorConfigs; @@ -152,9 +155,11 @@ QueryContextManager::QueryContextManager( std::shared_ptr QueryContextManager::findOrCreateQueryCtx( const protocol::TaskId& taskId, - const protocol::SessionRepresentation& session) { + const protocol::TaskUpdateRequest& taskUpdateRequest) { return findOrCreateQueryCtx( - taskId, toVeloxConfigs(session), toConnectorConfigs(session)); + taskId, + toVeloxConfigs(taskUpdateRequest.session), + toConnectorConfigs(taskUpdateRequest)); } std::shared_ptr QueryContextManager::findOrCreateQueryCtx( diff --git a/presto-native-execution/presto_cpp/main/QueryContextManager.h b/presto-native-execution/presto_cpp/main/QueryContextManager.h index 12ac3fb29e30d..f02d1230f8490 100644 --- a/presto-native-execution/presto_cpp/main/QueryContextManager.h +++ b/presto-native-execution/presto_cpp/main/QueryContextManager.h @@ -106,7 +106,7 @@ class QueryContextManager { std::shared_ptr findOrCreateQueryCtx( const protocol::TaskId& taskId, - const protocol::SessionRepresentation& session); + const protocol::TaskUpdateRequest& taskUpdateRequest); /// Calls the given functor for every present query context. void visitAllContexts(std::functionfindOrCreateQueryCtx( - taskId, updateRequest.session); + taskId, updateRequest); VeloxBatchQueryPlanConverter converter( shuffleName, @@ -339,7 +339,7 @@ proxygen::RequestHandler* TaskResource::createOrUpdateTask( queryCtx = taskManager_.getQueryContextManager()->findOrCreateQueryCtx( - taskId, updateRequest.session); + taskId, updateRequest); VeloxInteractiveQueryPlanConverter converter(queryCtx.get(), pool_); planFragment = converter.toVeloxQueryPlan(