Skip to content

Commit 429c6a8

Browse files
authored
[native] Support using custom storage access username based on source and client tags (prestodb#25827)
## Description To support using custom storage access username based on source and client tags - Pass source and client tag from session to QueryConfig and QueryCtx - Abstract functions in PrestoServer, TaskManager, and QueryTextManager ## Release Notes Please follow [release notes guidelines](https://github.com/prestodb/presto/wiki/Release-Notes-Guidelines) and fill in the release notes below. ``` == NO RELEASE NOTE == ```
1 parent a40ff0d commit 429c6a8

File tree

7 files changed

+57
-17
lines changed

7 files changed

+57
-17
lines changed

presto-native-execution/presto_cpp/main/PrestoServer.cpp

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -271,8 +271,8 @@ void PrestoServer::run() {
271271
exit(EXIT_FAILURE);
272272
}
273273

274-
registerFileSinks();
275274
registerFileSystems();
275+
registerFileSinks();
276276
registerFileReadersAndWriters();
277277
registerMemoryArbitrators();
278278
registerShuffleInterfaceFactories();
@@ -434,8 +434,7 @@ void PrestoServer::run() {
434434
nativeWorkerPool_ = velox::memory::MemoryManager::getInstance()->addLeafPool(
435435
"PrestoNativeWorker");
436436

437-
taskManager_ = std::make_unique<TaskManager>(
438-
driverExecutor_.get(), httpSrvCpuExecutor_.get(), spillerExecutor_.get());
437+
createTaskManager();
439438

440439
if (systemConfig->prestoNativeSidecar()) {
441440
registerSidecarEndpoints();
@@ -1739,4 +1738,9 @@ void PrestoServer::registerDynamicFunctions() {
17391738
}
17401739
}
17411740

1741+
void PrestoServer::createTaskManager() {
1742+
taskManager_ = std::make_unique<TaskManager>(
1743+
driverExecutor_.get(), httpSrvCpuExecutor_.get(), spillerExecutor_.get());
1744+
}
1745+
17421746
} // namespace facebook::presto

presto-native-execution/presto_cpp/main/PrestoServer.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,8 @@ class PrestoServer {
227227

228228
void checkOverload();
229229

230+
virtual void createTaskManager();
231+
230232
const std::string configDirectoryPath_;
231233

232234
std::shared_ptr<CoordinatorDiscoverer> coordinatorDiscoverer_;

presto-native-execution/presto_cpp/main/PrestoToVeloxQueryConfig.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,13 @@ void updateFromSessionConfigs(
7070
}
7171
}
7272

73+
if (session.source) {
74+
queryConfigs[velox::core::QueryConfig::kSource] = *session.source;
75+
}
76+
if (!session.clientTags.empty()) {
77+
queryConfigs[velox::core::QueryConfig::kClientTags] = folly::join(',', session.clientTags);
78+
}
79+
7380
// If there's a timeZoneKey, convert to timezone name and add to the
7481
// configs. Throws if timeZoneKey can't be resolved.
7582
if (session.timeZoneKey != 0) {

presto-native-execution/presto_cpp/main/QueryContextManager.cpp

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,23 @@ void QueryContextManager::setQueryHasStartedTasks(
5959
queryContextCache_.wlock()->setHasStartedTasks(queryIdFromTaskId(taskId));
6060
}
6161

62+
std::shared_ptr<core::QueryCtx> QueryContextManager::createAndCacheQueryCtx(
63+
QueryContextCache& cache,
64+
const QueryId& queryId,
65+
velox::core::QueryConfig&& queryConfig,
66+
std::unordered_map<std::string, std::shared_ptr<config::ConfigBase>>&& connectorConfigs,
67+
std::shared_ptr<memory::MemoryPool>&& pool) {
68+
auto queryCtx = core::QueryCtx::create(
69+
driverExecutor_,
70+
std::move(queryConfig),
71+
std::move(connectorConfigs),
72+
cache::AsyncDataCache::getInstance(),
73+
std::move(pool),
74+
spillerExecutor_,
75+
queryId);
76+
return cache.insert(queryId, std::move(queryCtx));
77+
}
78+
6279
std::shared_ptr<core::QueryCtx> QueryContextManager::findOrCreateQueryCtx(
6380
const TaskId& taskId,
6481
velox::core::QueryConfig&& queryConfig,
@@ -90,16 +107,12 @@ std::shared_ptr<core::QueryCtx> QueryContextManager::findOrCreateQueryCtx(
90107
nullptr,
91108
poolDbgOpts);
92109

93-
auto queryCtx = core::QueryCtx::create(
94-
driverExecutor_,
110+
return createAndCacheQueryCtx(
111+
*lockedCache,
112+
queryId,
95113
std::move(queryConfig),
96114
std::move(connectorConfigs),
97-
cache::AsyncDataCache::getInstance(),
98-
std::move(pool),
99-
spillerExecutor_,
100-
queryId);
101-
102-
return lockedCache->insert(queryId, std::move(queryCtx));
115+
std::move(pool));
103116
}
104117

105118
void QueryContextManager::visitAllContexts(

presto-native-execution/presto_cpp/main/QueryContextManager.h

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,8 @@ class QueryContextManager {
123123
folly::Executor* driverExecutor,
124124
folly::Executor* spillerExecutor);
125125

126+
virtual ~QueryContextManager() = default;
127+
126128
std::shared_ptr<velox::core::QueryCtx> findOrCreateQueryCtx(
127129
const protocol::TaskId& taskId,
128130
const protocol::TaskUpdateRequest& taskUpdateRequest);
@@ -141,17 +143,25 @@ class QueryContextManager {
141143
/// Test method to clear the query context cache.
142144
void testingClearCache();
143145

146+
protected:
147+
folly::Executor* const driverExecutor_{nullptr};
148+
folly::Executor* const spillerExecutor_{nullptr};
149+
144150
private:
151+
virtual std::shared_ptr<velox::core::QueryCtx> createAndCacheQueryCtx(
152+
QueryContextCache& cache,
153+
const protocol::QueryId& queryId,
154+
velox::core::QueryConfig&& queryConfig,
155+
std::unordered_map<std::string, std::shared_ptr<velox::config::ConfigBase>>&& connectorConfigs,
156+
std::shared_ptr<velox::memory::MemoryPool>&& pool);
157+
145158
std::shared_ptr<velox::core::QueryCtx> findOrCreateQueryCtx(
146159
const protocol::TaskId& taskId,
147160
velox::core::QueryConfig&& queryConfig,
148161
std::unordered_map<
149162
std::string,
150163
std::shared_ptr<velox::config::ConfigBase>>&& connectorConfigStrings);
151164

152-
folly::Executor* const driverExecutor_{nullptr};
153-
folly::Executor* const spillerExecutor_{nullptr};
154-
155165
folly::Synchronized<QueryContextCache> queryContextCache_;
156166
};
157167

presto-native-execution/presto_cpp/main/TaskManager.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -355,10 +355,10 @@ TaskManager::TaskManager(
355355
folly::Executor* driverExecutor,
356356
folly::Executor* httpSrvCpuExecutor,
357357
folly::Executor* spillerExecutor)
358-
: bufferManager_(velox::exec::OutputBufferManager::getInstanceRef()),
359-
queryContextManager_(std::make_unique<QueryContextManager>(
358+
: queryContextManager_(std::make_unique<QueryContextManager>(
360359
driverExecutor,
361360
spillerExecutor)),
361+
bufferManager_(velox::exec::OutputBufferManager::getInstanceRef()),
362362
httpSrvCpuExecutor_(httpSrvCpuExecutor) {
363363
VELOX_CHECK_NOT_NULL(bufferManager_, "invalid OutputBufferManager");
364364
}

presto-native-execution/presto_cpp/main/TaskManager.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ class TaskManager {
3434
folly::Executor* httpSrvExecutor,
3535
folly::Executor* spillerExecutor);
3636

37+
virtual ~TaskManager() = default;
38+
3739
/// Invoked by Presto server shutdown to wait for all the tasks to complete
3840
/// and cleanup the completed tasks.
3941
void shutdown();
@@ -195,6 +197,9 @@ class TaskManager {
195197
/// See if we have any queued tasks that can be started.
196198
void maybeStartNextQueuedTask();
197199

200+
protected:
201+
std::unique_ptr<QueryContextManager> queryContextManager_;
202+
198203
private:
199204
static constexpr folly::StringPiece kMaxDriversPerTask{
200205
"max_drivers_per_task"};
@@ -229,7 +234,6 @@ class TaskManager {
229234
std::shared_ptr<velox::exec::OutputBufferManager> bufferManager_;
230235
folly::Synchronized<TaskMap> taskMap_;
231236
folly::Synchronized<TaskQueue> taskQueue_;
232-
std::unique_ptr<QueryContextManager> queryContextManager_;
233237
folly::Executor* httpSrvCpuExecutor_;
234238
std::atomic_bool serverOverloaded_{false};
235239
std::atomic_uint32_t numQueuedDrivers_{0};

0 commit comments

Comments
 (0)