diff --git a/be/src/exec/pipeline/pipeline_driver_executor.cpp b/be/src/exec/pipeline/pipeline_driver_executor.cpp index 0d1b700915076..134583a367efe 100644 --- a/be/src/exec/pipeline/pipeline_driver_executor.cpp +++ b/be/src/exec/pipeline/pipeline_driver_executor.cpp @@ -207,6 +207,12 @@ void GlobalDriverExecutor::report_exec_state(QueryContext* query_ctx, FragmentCo _update_profile_by_level(query_ctx, fragment_ctx, done); auto params = ExecStateReporter::create_report_exec_status_params(query_ctx, fragment_ctx, status, done); auto fe_addr = fragment_ctx->fe_addr(); + if (fe_addr.hostname.empty()) { + // query executed by external connectors, like spark and flink connector, + // does not need to report exec state to FE, so return if fe addr is empty. + return; + } + auto exec_env = fragment_ctx->runtime_state()->exec_env(); auto fragment_id = fragment_ctx->fragment_instance_id(); diff --git a/be/src/exec/pipeline/sink/memory_scratch_sink_operator.cpp b/be/src/exec/pipeline/sink/memory_scratch_sink_operator.cpp index 00dad21c764ac..5b71e165979bb 100644 --- a/be/src/exec/pipeline/sink/memory_scratch_sink_operator.cpp +++ b/be/src/exec/pipeline/sink/memory_scratch_sink_operator.cpp @@ -48,6 +48,7 @@ Status MemoryScratchSinkOperator::set_cancelled(RuntimeState* state) { _pending_result.reset(); _is_finished = true; _has_put_sentinel = true; + _queue->update_status(Status::Cancelled("Set cancelled by MemoryScratchSinkOperator")); return Status::OK(); } @@ -60,8 +61,12 @@ Status MemoryScratchSinkOperator::push_chunk(RuntimeState* state, const vectoriz return Status::OK(); } std::shared_ptr result; - RETURN_IF_ERROR(convert_chunk_to_arrow_batch(chunk.get(), _output_expr_ctxs, _arrow_schema, - arrow::default_memory_pool(), &result)); + auto status = convert_chunk_to_arrow_batch(chunk.get(), _output_expr_ctxs, _arrow_schema, + arrow::default_memory_pool(), &result); + if (!status.ok()) { + _queue->update_status(status); + return status; + } if (!_queue->try_put(result)) { DCHECK(_pending_result == nullptr); diff --git a/be/src/runtime/external_scan_context_mgr.cpp b/be/src/runtime/external_scan_context_mgr.cpp index 9925979922d80..c1a1750ba8b03 100644 --- a/be/src/runtime/external_scan_context_mgr.cpp +++ b/be/src/runtime/external_scan_context_mgr.cpp @@ -94,19 +94,25 @@ Status ExternalScanContextMgr::clear_scan_context(const std::string& context_id) context = iter->second; if (context == nullptr) { _active_contexts.erase(context_id); - Status::OK(); + return Status::OK(); } iter = _active_contexts.erase(iter); } } if (context != nullptr) { // cancel pipeline + const auto& fragment_instance_id = context->fragment_instance_id; if (auto query_ctx = _exec_env->query_context_mgr()->get(context->query_id); query_ctx != nullptr) { - query_ctx->cancel(Status::Cancelled("user cancelled")); + if (auto fragment_ctx = query_ctx->fragment_mgr()->get(fragment_instance_id); fragment_ctx != nullptr) { + std::stringstream msg; + msg << "FragmentContext(id=" << print_id(fragment_instance_id) << ") cancelled by close_scanner"; + fragment_ctx->cancel(Status::Cancelled(msg.str())); + } } // clear the fragment instance's related result queue - _exec_env->result_queue_mgr()->cancel(context->fragment_instance_id); - LOG(INFO) << "close scan context: context id [ " << context_id << " ]"; + _exec_env->result_queue_mgr()->cancel(fragment_instance_id); + LOG(INFO) << "close scan context: context id [ " << context_id << " ], fragment instance id [ " + << print_id(fragment_instance_id) << " ]"; } return Status::OK(); }