[SR-4073] Query making no progress for a long time is expired #32

Closed
6 changes: 3 additions & 3 deletions be/src/exec/pipeline/exec_state_reporter.cpp
@@ -169,9 +169,9 @@ void ExecStateReporter::submit(FragmentContext* fragment_ctx, const Status& stat
}
if (clean) {
auto query_id = fragment_ctx->query_id();
FragmentContextManager::instance()->unregister(fragment_ctx->fragment_instance_id());
auto* query_ctx = QueryContextManager::instance()->get_raw(query_id);
DCHECK(query_ctx != nullptr);
auto&& query_ctx = QueryContextManager::instance()->get(query_id);
DCHECK(query_ctx);
query_ctx->fragment_mgr()->unregister(fragment_ctx->fragment_instance_id());
if (query_ctx->count_down_fragment()) {
QueryContextManager::instance()->unregister(query_id);
}
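For context on the `count_down_fragment()` call above: it relies on `fetch_sub` returning the value held before the decrement, so only the last active fragment to report triggers the query-level unregister. A minimal sketch of that pattern, with illustrative names rather than the actual StarRocks classes:

```cpp
#include <atomic>
#include <cstddef>

// Sketch: "last fragment out unregisters the query".
struct ActiveFragmentCounter {
    std::atomic<std::size_t> active{0};

    void on_fragment_registered() { active.fetch_add(1); }

    // fetch_sub returns the previous value, so this yields true only for the
    // final fragment to finish; that caller can then unregister the QueryContext.
    bool on_fragment_finished() { return active.fetch_sub(1) == 1; }
};
```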
13 changes: 9 additions & 4 deletions be/src/exec/pipeline/fragment_context.cpp
@@ -4,8 +4,6 @@
namespace starrocks {
namespace pipeline {

FragmentContextManager::FragmentContextManager() {}
FragmentContextManager::~FragmentContextManager() {}
FragmentContext* FragmentContextManager::get_or_register(const TUniqueId& fragment_id) {
std::lock_guard<std::mutex> lock(_lock);
auto it = _fragment_contexts.find(fragment_id);
@@ -19,11 +17,11 @@ FragmentContext* FragmentContextManager::get_or_register(const TUniqueId& fragme
}
}

FragmentContext* FragmentContextManager::get(const TUniqueId& fragment_id) {
FragmentContextPtr FragmentContextManager::get(const TUniqueId& fragment_id) {
std::lock_guard<std::mutex> lock(_lock);
auto it = _fragment_contexts.find(fragment_id);
if (it != _fragment_contexts.end()) {
return it->second.get();
return it->second;
} else {
return nullptr;
}
@@ -34,5 +32,12 @@ void FragmentContextManager::unregister(const TUniqueId& fragment_id) {
_fragment_contexts.erase(fragment_id);
}

void FragmentContextManager::cancel(const Status& status) {
std::lock_guard<std::mutex> lock(_lock);
for (auto ctx_it = _fragment_contexts.begin(); ctx_it != _fragment_contexts.end(); ++ctx_it) {
ctx_it->second->cancel(status);
}
}

} // namespace pipeline
} // namespace starrocks
13 changes: 10 additions & 3 deletions be/src/exec/pipeline/fragment_context.h
@@ -112,12 +112,19 @@ class FragmentContext {
std::atomic<bool> _cancel_flag;
};
class FragmentContextManager {
DECLARE_SINGLETON(FragmentContextManager);

public:
FragmentContextManager() = default;
~FragmentContextManager() = default;

FragmentContextManager(const FragmentContextManager&) = delete;
FragmentContextManager(FragmentContextManager&&) = delete;
FragmentContextManager& operator=(const FragmentContextManager&) = delete;
FragmentContextManager& operator=(FragmentContextManager&&) = delete;

FragmentContext* get_or_register(const TUniqueId& fragment_id);
FragmentContext* get(const TUniqueId& fragment_id);
FragmentContextPtr get(const TUniqueId& fragment_id);
void unregister(const TUniqueId& fragment_id);
void cancel(const Status& status);

private:
std::mutex _lock;
24 changes: 20 additions & 4 deletions be/src/exec/pipeline/fragment_executor.cpp
@@ -8,6 +8,7 @@
#include "exec/pipeline/exchange/exchange_sink_operator.h"
#include "exec/pipeline/exchange/local_exchange_source_operator.h"
#include "exec/pipeline/exchange/sink_buffer.h"
#include "exec/pipeline/fragment_context.h"
#include "exec/pipeline/morsel.h"
#include "exec/pipeline/pipeline_builder.h"
#include "exec/pipeline/result_sink_operator.h"
@@ -35,13 +36,28 @@ Morsels convert_scan_range_to_morsel(const std::vector<TScanRangeParams>& scan_r

Status FragmentExecutor::prepare(ExecEnv* exec_env, const TExecPlanFragmentParams& request) {
const TPlanFragmentExecParams& params = request.params;
auto& query_id = params.query_id;
auto& fragment_id = params.fragment_instance_id;
const auto& query_id = params.query_id;
const auto& fragment_id = params.fragment_instance_id;

// prevent an identical fragment instance from being executed multiple times
auto&& existing_query_ctx = QueryContextManager::instance()->get(query_id);

Review comment (Collaborator): Why could the same fragment instance be prepared multiple times?

if (existing_query_ctx) {
auto&& existing_fragment_ctx = existing_query_ctx->fragment_mgr()->get(fragment_id);
if (existing_fragment_ctx) {
return Status::OK();
}
}

_query_ctx = QueryContextManager::instance()->get_or_register(query_id);
if (params.__isset.instances_number) {
_query_ctx->set_num_fragments(params.instances_number);
_query_ctx->set_total_fragments(params.instances_number);
}
if (request.query_options.__isset.pipeline_query_expire_seconds) {
_query_ctx->set_expire_seconds(std::max<int>(request.query_options.pipeline_query_expire_seconds, 1));
} else {
_query_ctx->set_expire_seconds(300);
}
_fragment_ctx = FragmentContextManager::instance()->get_or_register(fragment_id);
_fragment_ctx = _query_ctx->fragment_mgr()->get_or_register(fragment_id);
_fragment_ctx->set_query_id(query_id);
_fragment_ctx->set_fragment_instance_id(fragment_id);
_fragment_ctx->set_fe_addr(request.coord);
11 changes: 9 additions & 2 deletions be/src/exec/pipeline/pipeline_driver_dispatcher.cpp
@@ -38,6 +38,7 @@ void GlobalDriverDispatcher::run() {
size_t queue_index;
auto driver = this->_driver_queue->take(&queue_index);
DCHECK(driver != nullptr);
auto* query_ctx = driver->query_ctx();
auto* fragment_ctx = driver->fragment_ctx();
auto* runtime_state = fragment_ctx->runtime_state();

@@ -51,13 +52,19 @@
}
continue;
}

// A blocked driver may arrive here already finished, because it was canceled by fragment cancellation or query expiration.
if (driver->is_finished()) {
driver->finalize(runtime_state, driver->driver_state());
continue;
}
// The query context still has drivers ready to run, so extend its lifetime.
query_ctx->extend_lifetime();
auto status = driver->process(runtime_state);
this->_driver_queue->get_sub_queue(queue_index)->update_accu_time(driver);

if (!status.ok()) {
VLOG_ROW << "[Driver] Process error: error=" << status.status().to_string();
fragment_ctx->cancel(status.status());
query_ctx->cancel(status.status());
if (driver->source_operator()->pending_finish()) {
driver->set_driver_state(DriverState::PENDING_FINISH);
_blocked_driver_poller->add_blocked_driver(driver);
32 changes: 25 additions & 7 deletions be/src/exec/pipeline/pipeline_driver_poller.cpp
@@ -43,19 +43,37 @@ void PipelineDriverPoller::run_internal() {
auto driver_it = local_blocked_drivers.begin();
while (driver_it != local_blocked_drivers.end()) {
auto& driver = *driver_it;
// driver->pending_finish() returning true means that the driver's sink operator has finished,
// but its source operator still has pending io tasks executing in io threads that reference
// objects (such as desc_tbl) owned by the FragmentContext. A driver in PENDING_FINISH state
// should therefore wait for the pending io tasks to complete before turning into FINISH state;
// otherwise those tasks would reference destructed objects, because the FragmentContext
// would be unregistered prematurely.

if (driver->pending_finish() && !driver->source_operator()->pending_finish()) {
// driver->pending_finish() returning true means that the driver's sink operator has finished,
// but its source operator still has pending io tasks executing in io threads that reference
// objects (such as desc_tbl) owned by the FragmentContext. A driver in PENDING_FINISH state
// should therefore wait for the pending io tasks to complete before turning into FINISH state;
// otherwise those tasks would reference destructed objects, because the FragmentContext
// would be unregistered prematurely.
driver->set_driver_state(DriverState::FINISH);
_dispatch_queue->put_back(*driver_it);
local_blocked_drivers.erase(driver_it++);
} else if (driver->is_finished()) {
local_blocked_drivers.erase(driver_it++);
} else if (driver->fragment_ctx()->is_canceled() || driver->is_not_blocked()) {
} else if (driver->query_ctx()->is_expired()) {
// No driver belonging to this query context has made progress for a whole expiration period,
// which indicates that some fragments are missing because of a failed exec_plan_fragment
// invocation. In this situation the query ultimately fails, so drivers are marked PENDING_FINISH/FINISH.
if (driver->source_operator()->pending_finish()) {
driver->set_driver_state(DriverState::PENDING_FINISH);
++driver_it;
} else {
driver->set_driver_state(DriverState::FINISH);
_dispatch_queue->put_back(*driver_it);
local_blocked_drivers.erase(driver_it++);
}
} else if (driver->fragment_ctx()->is_canceled()) {
driver->set_driver_state(DriverState::CANCELED);
_dispatch_queue->put_back(*driver_it);
local_blocked_drivers.erase(driver_it++);
} else if (driver->is_not_blocked()) {
driver->set_driver_state(DriverState::READY);
_dispatch_queue->put_back(*driver_it);
local_blocked_drivers.erase(driver_it++);
} else {
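Because the flattened hunk above is hard to scan, here is a condensed, illustrative view of the order in which the poller now disposes of a blocked driver. The names follow the diff, but this is only a sketch of the control flow, not the actual implementation:

```cpp
// Sketch: decision order for one blocked driver per poll iteration.
void dispose_blocked_driver(const DriverPtr& driver) {
    if (driver->pending_finish() && !driver->source_operator()->pending_finish()) {
        // The sink has finished and all pending io tasks have drained: safe to finish.
        driver->set_driver_state(DriverState::FINISH);
    } else if (driver->is_finished()) {
        // Already finished; it is simply dropped from the blocked list.
    } else if (driver->query_ctx()->is_expired()) {
        // No driver of this query made progress for a whole expiration window.
        // Wait for in-flight io tasks first, otherwise finish immediately.
        driver->set_driver_state(driver->source_operator()->pending_finish()
                                         ? DriverState::PENDING_FINISH
                                         : DriverState::FINISH);
    } else if (driver->fragment_ctx()->is_canceled()) {
        driver->set_driver_state(DriverState::CANCELED);
    } else if (driver->is_not_blocked()) {
        driver->set_driver_state(DriverState::READY);
    }
    // Bookkeeping (re-queueing and removal from the blocked list) is handled
    // by the surrounding loop.
}
```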
2 changes: 2 additions & 0 deletions be/src/exec/pipeline/pipeline_fwd.h
@@ -10,6 +10,8 @@ class QueryContext;
using QueryContextPtr = std::shared_ptr<QueryContext>;
class FragmentContext;
using FragmentContextPtr = std::shared_ptr<FragmentContext>;
class FragmentContextManager;
using FragmentContextManagerPtr = std::unique_ptr<FragmentContextManager>;
class FragmentExecutor;
class Pipeline;
using PipelinePtr = std::shared_ptr<Pipeline>;
26 changes: 16 additions & 10 deletions be/src/exec/pipeline/query_context.cpp
@@ -1,32 +1,38 @@
// This file is licensed under the Elastic License 2.0. Copyright 2021 StarRocks Limited.
#include "exec/pipeline/query_context.h"

#include "exec/pipeline/fragment_context.h"

namespace starrocks {
namespace pipeline {
QueryContext::QueryContext()
: _fragment_mgr(new FragmentContextManager()), _num_fragments(0), _num_active_fragments(0) {}

FragmentContextManager* QueryContext::fragment_mgr() {
return _fragment_mgr.get();
}

void QueryContext::cancel(const Status& status) {
_fragment_mgr->cancel(status);
}

QueryContextManager::QueryContextManager() {}
QueryContextManager::~QueryContextManager() {}
QueryContext* QueryContextManager::get_or_register(const TUniqueId& query_id) {
std::lock_guard lock(_lock);
auto iter = _contexts.find(query_id);
if (iter != _contexts.end()) {
iter->second->increment_num_fragments();
return iter->second.get();
}

auto&& ctx = std::make_unique<QueryContext>();
auto* ctx_raw_ptr = ctx.get();
ctx_raw_ptr->increment_num_fragments();
_contexts.emplace(query_id, std::move(ctx));
return ctx_raw_ptr;
}

QueryContext* QueryContextManager::get_raw(const TUniqueId& query_id) {
std::lock_guard lock(_lock);
auto it = _contexts.find(query_id);
if (it != _contexts.end()) {
return it->second.get();
} else {
return nullptr;
}
}

QueryContextPtr QueryContextManager::get(const TUniqueId& query_id) {
std::lock_guard lock(_lock);
auto it = _contexts.find(query_id);
46 changes: 36 additions & 10 deletions be/src/exec/pipeline/query_context.h
@@ -3,9 +3,11 @@
#pragma once

#include <atomic>
#include <chrono>
#include <mutex>
#include <unordered_map>

#include "exec/pipeline/fragment_context.h"
#include "exec/pipeline/pipeline_fwd.h"
#include "gen_cpp/InternalService_types.h" // for TQueryOptions
#include "gen_cpp/Types_types.h" // for TUniqueId
@@ -17,36 +19,60 @@ namespace starrocks {
class MemTracker;
namespace pipeline {

using std::chrono::seconds;
using std::chrono::milliseconds;
using std::chrono::steady_clock;
using std::chrono::duration_cast;
// The context for all fragments of one query on one BE
class QueryContext {
public:
QueryContext() : _num_fragments_initialized(false), _num_fragments(0) {}
QueryContext();
RuntimeState* get_runtime_state() { return _runtime_state.get(); }
void set_num_fragments(size_t num_fragments) {
bool old_value = false;
if (_num_fragments_initialized.compare_exchange_strong(old_value, true)) {
_num_fragments.store(num_fragments);
}
void set_total_fragments(size_t total_fragments) { _total_fragments = total_fragments; }

void increment_num_fragments() {
_num_fragments.fetch_add(1);
_num_active_fragments.fetch_add(1);
}

bool count_down_fragment() { return _num_active_fragments.fetch_sub(1) == 1; }

void set_expire_seconds(int expire_seconds) { _expire_seconds = seconds(expire_seconds); }

// Returns true when the current time point has passed the deadline.
bool is_expired() {
auto now = duration_cast<milliseconds>(steady_clock::now().time_since_epoch()).count();
return now > _deadline;
}
bool count_down_fragment() { return _num_fragments.fetch_sub(1) == 1; }

// Push the deadline forward by _expire_seconds from now.
void extend_lifetime() {
_deadline = duration_cast<milliseconds>(steady_clock::now().time_since_epoch() + _expire_seconds).count();
}

FragmentContextManager* fragment_mgr();

void cancel(const Status& status);

private:
std::unique_ptr<RuntimeState> _runtime_state;
std::shared_ptr<RuntimeProfile> _runtime_profile;
std::unique_ptr<MemTracker> _mem_tracker;
TQueryOptions _query_options;
TUniqueId _query_id;
std::atomic<bool> _num_fragments_initialized;
std::unique_ptr<FragmentContextManager> _fragment_mgr;
size_t _total_fragments;
std::atomic<size_t> _num_fragments;
std::atomic<size_t> _num_active_fragments;
int64_t _deadline;
seconds _expire_seconds;
};

class QueryContextManager {
DECLARE_SINGLETON(QueryContextManager);

public:
QueryContext* get_or_register(const TUniqueId& query_id);

QueryContext* get_raw(const TUniqueId& query_id);
QueryContextPtr get(const TUniqueId& query_id);
void unregister(const TUniqueId& query_id);

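Taken together, the keep-alive mechanism added in this header works like this: the dispatcher calls extend_lifetime() whenever a driver of the query is about to run, and the poller calls is_expired() on blocked drivers, so a query only expires after none of its drivers has run for a whole expire window. Below is a self-contained sketch of that deadline pattern, assuming steady_clock-based milliseconds as in the diff; the std::atomic is a sketch-level choice because the two calls may come from different threads, whereas the PR itself stores a plain int64_t.

```cpp
#include <atomic>
#include <chrono>
#include <cstdint>

// Standalone sketch of the deadline/keep-alive pattern used by QueryContext.
class KeepAliveDeadline {
public:
    explicit KeepAliveDeadline(std::chrono::seconds expire) : _expire(expire) { extend(); }

    // Called whenever the query makes progress (a driver is about to run).
    void extend() {
        auto now = std::chrono::steady_clock::now().time_since_epoch();
        _deadline_ms.store(
                std::chrono::duration_cast<std::chrono::milliseconds>(now + _expire).count());
    }

    // Called by the poller: true once no progress has been made for a whole expire window.
    bool expired() const {
        auto now_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
                              std::chrono::steady_clock::now().time_since_epoch())
                              .count();
        return now_ms > _deadline_ms.load();
    }

private:
    std::chrono::seconds _expire;
    std::atomic<int64_t> _deadline_ms{0};
};
```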
24 changes: 21 additions & 3 deletions be/src/service/internal_service.cpp
@@ -25,6 +25,7 @@
#include "exec/pipeline/fragment_context.h"
#include "exec/pipeline/fragment_executor.h"
#include "gen_cpp/BackendService.h"
#include "gutil/strings/substitute.h"
#include "runtime/buffer_control_block.h"
#include "runtime/data_stream_mgr.h"
#include "runtime/exec_env.h"
@@ -279,9 +280,26 @@ void PInternalServiceImpl<T>::cancel_plan_fragment(google::protobuf::RpcControll
LOG(INFO) << "cancel framgent, fragment_instance_id=" << print_id(tid) << ", reason: " << reason_string;

if (request->has_is_pipeline() && request->is_pipeline()) {
auto fragment_ctx = starrocks::pipeline::FragmentContextManager::instance()->get(tid);
if (fragment_ctx == nullptr) {
LOG(INFO) << "Fragment context already destroyed: fragment_instance_id=" << print_id(tid);
TUniqueId query_id;
if (!request->has_query_id()) {

Review comment (Collaborator): When upgrading StarRocks, we upgrade BE first and then FE, so this check is invalid.

LOG(WARNING) << "cancel_plan_fragment must provide query_id in request, upgrade FE";
st = Status::NotSupported("cancel_plan_fragment must provide query_id in request, upgrade FE");
st.to_protobuf(result->mutable_status());
return;
}
query_id.__set_hi(request->query_id().hi());
query_id.__set_lo(request->query_id().lo());
auto&& query_ctx = starrocks::pipeline::QueryContextManager::instance()->get(query_id);
if (!query_ctx) {
LOG(INFO) << strings::Substitute("QueryContext already destroyed: query_id=$0, fragment_instance_id=$1",
print_id(query_id), print_id(tid));
st.to_protobuf(result->mutable_status());
return;
}
auto&& fragment_ctx = query_ctx->fragment_mgr()->get(tid);
if (!fragment_ctx) {
LOG(INFO) << strings::Substitute("FragmentContext already destroyed: query_id=$0, fragment_instance_id=$1",
print_id(query_id), print_id(tid));
} else {
fragment_ctx->cancel(Status::Cancelled(reason_string));
}
SessionVariable.java
@@ -122,6 +122,8 @@ public class SessionVariable implements Serializable, Writable, Cloneable {

public static final String PIPELINE_SCAN_MODE = "pipeline_scan_mode";

public static final String PIPELINE_QUERY_EXPIRE_SECONDS = "pipeline_query_expire_seconds";

// vectorized insert flag
public static final String ENABLE_VECTORIZED_INSERT = "enable_vectorized_insert";

@@ -320,6 +322,11 @@ public class SessionVariable implements Serializable, Writable, Cloneable {
@VariableMgr.VarAttr(name = PIPELINE_SCAN_MODE)
private int pipelineScanMode = 1;

// A query that cannot make any progress for more than pipelineQueryExpireSeconds
// (300s by default) will be canceled.
@VariableMgr.VarAttr(name = PIPELINE_QUERY_EXPIRE_SECONDS)
private int pipelineQueryExpireSeconds = 300;

@VariableMgr.VarAttr(name = ENABLE_INSERT_STRICT)
private boolean enableInsertStrict = true;

@@ -779,6 +786,7 @@ public TQueryOptions toThrift() {
tResult.setRuntime_filter_send_timeout_ms(global_runtime_filter_rpc_timeout);
tResult.setQuery_threads(pipelineQueryThreads);
tResult.setPipeline_scan_mode(pipelineScanMode);
tResult.setPipeline_query_expire_seconds(pipelineQueryExpireSeconds);
return tResult;
}

Expand Down