Merge branch 'branch-3.0' into branch-3.0
sevev authored Aug 21, 2023
2 parents b221025 + 1d3bd8d commit 3e3b0e3
Showing 33 changed files with 333 additions and 73 deletions.
6 changes: 4 additions & 2 deletions be/src/agent/agent_server.cpp
@@ -392,8 +392,10 @@ void AgentServer::Impl::submit_tasks(TAgentResult& agent_result, const std::vect
for (auto* task : all_tasks) { \
auto pool = get_thread_pool(t_task_type); \
auto signature = task->signature; \
if (register_task_info(task_type, signature)) { \
LOG(INFO) << "Submit task success. type=" << t_task_type << ", signature=" << signature; \
std::pair<bool, size_t> register_pair = register_task_info(task_type, signature); \
if (register_pair.first) { \
LOG(INFO) << "Submit task success. type=" << t_task_type << ", signature=" << signature \
<< ", task_count_in_queue=" << register_pair.second; \
ret_st = pool->submit_func( \
std::bind(do_func, std::make_shared<AGENT_REQ>(*task, task->request, time(nullptr)), env)); \
} else { \
4 changes: 3 additions & 1 deletion be/src/agent/agent_task.cpp
@@ -157,7 +157,9 @@ static void unify_finish_agent_task(TStatusCode::type status_code, const std::ve
finish_task_request.__set_task_status(task_status);

finish_task(finish_task_request);
remove_task_info(task_type, signature);
size_t task_queue_size = remove_task_info(task_type, signature);
LOG(INFO) << "Remove task success. type=" << task_type << ", signature=" << signature
<< ", task_count_in_queue=" << task_queue_size;
}

void run_drop_tablet_task(const std::shared_ptr<DropTabletAgentTaskRequest>& agent_task_req, ExecEnv* exec_env) {
9 changes: 6 additions & 3 deletions be/src/agent/task_singatures_manager.cpp
@@ -19,10 +19,12 @@ namespace starrocks {
static std::mutex g_task_signatures_locks[TTaskType::type::NUM_TASK_TYPE];
static std::set<int64_t> g_task_signatures[TTaskType::type::NUM_TASK_TYPE];

bool register_task_info(TTaskType::type task_type, int64_t signature) {
std::pair<bool, size_t> register_task_info(TTaskType::type task_type, int64_t signature) {
std::lock_guard task_signatures_lock(g_task_signatures_locks[task_type]);
std::set<int64_t>& signature_set = g_task_signatures[task_type];
return signature_set.insert(signature).second;
bool register_success = signature_set.insert(signature).second;
size_t task_count = signature_set.size();
return std::make_pair(register_success, task_count);
}

std::vector<uint8_t> batch_register_task_info(const std::vector<const TAgentTaskRequest*>& tasks) {
@@ -44,9 +46,10 @@ std::vector<uint8_t> batch_register_task_info(const std::vector<const TAgentTask
return failed_task;
}

void remove_task_info(TTaskType::type task_type, int64_t signature) {
size_t remove_task_info(TTaskType::type task_type, int64_t signature) {
std::lock_guard task_signatures_lock(g_task_signatures_locks[task_type]);
g_task_signatures[task_type].erase(signature);
return g_task_signatures[task_type].size();
}

std::map<TTaskType::type, std::set<int64_t>> count_all_tasks() {
4 changes: 2 additions & 2 deletions be/src/agent/task_singatures_manager.h
@@ -21,9 +21,9 @@

namespace starrocks {

bool register_task_info(TTaskType::type task_type, int64_t signature);
std::pair<bool, size_t> register_task_info(TTaskType::type task_type, int64_t signature);
std::vector<uint8_t> batch_register_task_info(const std::vector<const TAgentTaskRequest*>& tasks);
void remove_task_info(TTaskType::type task_type, int64_t signature);
size_t remove_task_info(TTaskType::type task_type, int64_t signature);
std::map<TTaskType::type, std::set<int64_t>> count_all_tasks();

} // namespace starrocks
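
A minimal caller sketch of the reworked interface (a hypothetical call site, not part of this commit; it assumes the declarations above plus the glog-style `LOG(INFO)` used elsewhere in the codebase). The returned pair carries both the deduplication result and the signature count taken under the same lock, so callers can log the queue depth without a second lock acquisition:

```cpp
#include <cstdint>
#include <utility>

// Hypothetical call site. register_task_info() and remove_task_info() are
// the functions declared in task_singatures_manager.h above.
void submit_then_finish(TTaskType::type task_type, int64_t signature) {
    std::pair<bool, size_t> reg = register_task_info(task_type, signature);
    if (reg.first) {
        // first == true: the signature was newly inserted; second is the
        // number of in-flight signatures for this task type after the insert.
        LOG(INFO) << "Submit task success. signature=" << signature
                  << ", task_count_in_queue=" << reg.second;
    }
    // ... task runs ...
    size_t remaining = remove_task_info(task_type, signature);
    LOG(INFO) << "Remove task success. signature=" << signature
              << ", task_count_in_queue=" << remaining;
}
```

Returning the size from inside the locked region is the point of the change: a separate count taken after the call could observe a different value.
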
3 changes: 2 additions & 1 deletion be/src/agent/task_worker_pool.cpp
@@ -173,7 +173,8 @@ void TaskWorkerPool<AgentTaskRequest>::submit_task(const TAgentTaskRequest& task
std::string type_str;
EnumToString(TTaskType, task_type, type_str);

if (register_task_info(task_type, signature)) {
std::pair<bool, size_t> register_pair = register_task_info(task_type, signature);
if (register_pair.first) {
// Set the receiving time of task so that we can determine whether it is timed out later
auto new_task = _convert_task(task, time(nullptr));
size_t task_count = _push_task(std::move(new_task));
31 changes: 20 additions & 11 deletions be/src/exec/jni_scanner.cpp
@@ -57,28 +57,37 @@ void JniScanner::do_update_counter(HdfsScanProfile* profile) {}

void JniScanner::do_close(RuntimeState* runtime_state) noexcept {
JNIEnv* _jni_env = JVMFunctionHelper::getInstance().getEnv();
_jni_env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_close);
_check_jni_exception(_jni_env, "Failed to close the off-heap table scanner.");
_jni_env->DeleteLocalRef(_jni_scanner_obj);
_jni_env->DeleteLocalRef(_jni_scanner_cls);
if (_jni_scanner_obj != nullptr) {
if (_jni_scanner_close != nullptr) {
_jni_env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_close);
}
_jni_env->DeleteLocalRef(_jni_scanner_obj);
_jni_scanner_obj = nullptr;
}
if (_jni_scanner_cls != nullptr) {
_jni_env->DeleteLocalRef(_jni_scanner_cls);
_jni_scanner_cls = nullptr;
}
}

void JniScanner::_init_profile(const HdfsScannerParams& scanner_params) {}

Status JniScanner::_init_jni_method(JNIEnv* _jni_env) {
// init jmethod
_jni_scanner_open = _jni_env->GetMethodID(_jni_scanner_cls, "open", "()V");
DCHECK(_jni_scanner_open != nullptr);
RETURN_IF_ERROR(_check_jni_exception(_jni_env, "Failed to get `open` jni method"));

_jni_scanner_get_next_chunk = _jni_env->GetMethodID(_jni_scanner_cls, "getNextOffHeapChunk", "()J");
DCHECK(_jni_scanner_get_next_chunk != nullptr);
RETURN_IF_ERROR(_check_jni_exception(_jni_env, "Failed to get `getNextOffHeapChunk` jni method"));

_jni_scanner_close = _jni_env->GetMethodID(_jni_scanner_cls, "close", "()V");
DCHECK(_jni_scanner_close != nullptr);
RETURN_IF_ERROR(_check_jni_exception(_jni_env, "Failed to get `close` jni method"));

_jni_scanner_release_column = _jni_env->GetMethodID(_jni_scanner_cls, "releaseOffHeapColumnVector", "(I)V");
DCHECK(_jni_scanner_release_column != nullptr);
_jni_scanner_release_table = _jni_env->GetMethodID(_jni_scanner_cls, "releaseOffHeapTable", "()V");
DCHECK(_jni_scanner_release_table != nullptr);
RETURN_IF_ERROR(_check_jni_exception(_jni_env, "Failed to init off-heap table jni methods."));
RETURN_IF_ERROR(_check_jni_exception(_jni_env, "Failed to get `releaseOffHeapColumnVector` jni method"));

_jni_scanner_release_table = _jni_env->GetMethodID(_jni_scanner_cls, "releaseOffHeapTable", "()V");
RETURN_IF_ERROR(_check_jni_exception(_jni_env, "Failed to get `releaseOffHeapTable` jni method"));
return Status::OK();
}

14 changes: 7 additions & 7 deletions be/src/exec/jni_scanner.h
@@ -78,13 +78,13 @@ class JniScanner : public HdfsScanner {

Status _release_off_heap_table(JNIEnv* _jni_env);

jclass _jni_scanner_cls;
jobject _jni_scanner_obj;
jmethodID _jni_scanner_open;
jmethodID _jni_scanner_get_next_chunk;
jmethodID _jni_scanner_close;
jmethodID _jni_scanner_release_column;
jmethodID _jni_scanner_release_table;
jclass _jni_scanner_cls = nullptr;
jobject _jni_scanner_obj = nullptr;
jmethodID _jni_scanner_open = nullptr;
jmethodID _jni_scanner_get_next_chunk = nullptr;
jmethodID _jni_scanner_close = nullptr;
jmethodID _jni_scanner_release_column = nullptr;
jmethodID _jni_scanner_release_table = nullptr;

std::map<std::string, std::string> _jni_scanner_params;
std::string _jni_scanner_factory_class;
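
The two JNI changes above follow one pattern: every handle starts as `nullptr` (header), and `do_close()` only touches handles that were actually obtained (source), so a close after a partially failed `_init_jni_method` can no longer call through an invalid `jmethodID` or delete a stale local reference. A condensed sketch of the pattern, assuming only a valid `JNIEnv*`:

```cpp
#include <jni.h>

// Condensed version of the guarded-cleanup pattern used in do_close() above.
// Handles are owned by the caller and reset to nullptr so close is idempotent.
void close_scanner(JNIEnv* env, jobject* scanner_obj, jclass* scanner_cls,
                   jmethodID close_method) {
    if (*scanner_obj != nullptr) {
        if (close_method != nullptr) {
            // Only call close() if the method id was resolved successfully.
            env->CallVoidMethod(*scanner_obj, close_method);
        }
        env->DeleteLocalRef(*scanner_obj);
        *scanner_obj = nullptr;
    }
    if (*scanner_cls != nullptr) {
        env->DeleteLocalRef(*scanner_cls);
        *scanner_cls = nullptr;
    }
}
```
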
2 changes: 1 addition & 1 deletion be/src/exec/query_cache/cache_operator.cpp
@@ -337,7 +337,7 @@ void CacheOperator::_update_probe_metrics(int64_t tablet_id, const std::vector<C
num_rows += chunk->num_rows();
}
_cache_probe_bytes_counter->update(num_bytes);
_cache_probe_chunks_counter->update(num_rows);
_cache_probe_chunks_counter->update(chunks.size());
_cache_probe_rows_counter->update(num_rows);
_probe_tablets.insert(tablet_id);
}
10 changes: 10 additions & 0 deletions be/src/exprs/table_function/java_udtf_function.cpp
@@ -29,6 +29,7 @@
#include "udf/java/java_data_converter.h"
#include "udf/java/java_udf.h"
#include "udf/java/utils.h"
#include "util/defer_op.h"

namespace starrocks {

@@ -135,6 +136,14 @@ std::pair<Columns, ColumnPtr> JavaUDTFFunction::process(TableFunctionState* stat

std::vector<jvalue> call_stack;
std::vector<jobject> rets;
DeferOp defer = DeferOp([&]() {
// clean up arrays
for (auto& ret : rets) {
if (ret) {
env->DeleteLocalRef(ret);
}
}
});
size_t num_rows = cols[0]->size();
size_t num_cols = cols.size();

@@ -173,6 +182,7 @@ std::pair<Columns, ColumnPtr> JavaUDTFFunction::process(TableFunctionState* stat
// update for col
for (int j = 0; j < len; ++j) {
jobject vi = env->GetObjectArrayElement((jobjectArray)rets[i], j);
LOCAL_REF_GUARD_ENV(env, vi);
append_jvalue(method_desc, col.get(), {.l = vi});
release_jvalue(method_desc.is_box, {.l = vi});
}
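
`DeferOp` and `LOCAL_REF_GUARD_ENV` give `process()` scope-guard semantics: the local references collected in `rets` (and each per-element `vi`) are released on every exit path, including early error returns. A self-contained guard in the same spirit — an assumption about the contract of `util/defer_op.h`, not a copy of it:

```cpp
#include <functional>
#include <utility>

// Minimal scope guard: runs the stored callable when the scope exits.
class Defer {
public:
    explicit Defer(std::function<void()> fn) : _fn(std::move(fn)) {}
    ~Defer() {
        if (_fn) _fn();
    }
    Defer(const Defer&) = delete;
    Defer& operator=(const Defer&) = delete;

private:
    std::function<void()> _fn;
};
```

With this shape, the cleanup above reads as `Defer cleanup([&]() { for (jobject r : rets) if (r) env->DeleteLocalRef(r); });`, and no return path can leak the array references.
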
2 changes: 1 addition & 1 deletion be/src/storage/olap_server.cpp
@@ -80,7 +80,7 @@ Status StorageEngine::start_bg_threads() {
Thread::set_thread_name(_update_cache_expire_thread, "cache_expire");

_update_cache_evict_thread = std::thread([this] { _update_cache_evict_thread_callback(nullptr); });
Thread::set_thread_name(_update_cache_expire_thread, "evict_update_cache");
Thread::set_thread_name(_update_cache_evict_thread, "evict_update_cache");

_unused_rowset_monitor_thread = std::thread([this] { _unused_rowset_monitor_thread_callback(nullptr); });
Thread::set_thread_name(_unused_rowset_monitor_thread, "rowset_monitor");
10 changes: 5 additions & 5 deletions be/src/storage/persistent_index.cpp
@@ -3920,16 +3920,16 @@ Status PersistentIndex::_minor_compaction(PersistentIndexMetaPB* index_meta) {
const std::string new_l1_filename =
strings::Substitute("$0/index.l1.$1.$2", _path, _version.major(), _version.minor());
const size_t tmp_l1_cnt = _get_tmp_l1_count();
// maybe need to append wal in 1.a
bool need_append_wal = false;
// maybe need to dump snapshot in 1.a
bool need_snapshot = false;
if (tmp_l1_cnt == 1) {
// step 1.a
// move tmp l1 to l1
std::string tmp_l1_filename = _l1_vec[_has_l1 ? 1 : 0]->filename();
RETURN_IF_ERROR(FileSystem::Default()->link_file(tmp_l1_filename, new_l1_filename));
if (_l0->size() > 0) {
// check if need to append wal
need_append_wal = true;
// check if need to dump snapshot
need_snapshot = true;
}
LOG(INFO) << "PersistentIndex minor compaction, link from tmp-l1: " << tmp_l1_filename
<< " to l1: " << new_l1_filename;
@@ -3978,7 +3978,7 @@ Status PersistentIndex::_minor_compaction(PersistentIndexMetaPB* index_meta) {
_version.to_pb(index_meta->mutable_version());
_version.to_pb(index_meta->mutable_l1_version());
MutableIndexMetaPB* l0_meta = index_meta->mutable_l0_meta();
RETURN_IF_ERROR(_l0->commit(l0_meta, _version, need_append_wal ? kAppendWAL : kFlush));
RETURN_IF_ERROR(_l0->commit(l0_meta, _version, need_snapshot ? kSnapshot : kFlush));
return Status::OK();
}

10 changes: 9 additions & 1 deletion be/test/storage/persistent_index_test.cpp
@@ -1320,7 +1320,7 @@ RowsetSharedPtr create_rowset(const TabletSharedPtr& tablet, const vector<int64_

void build_persistent_index_from_tablet(size_t N) {
FileSystem* fs = FileSystem::Default();
const std::string kPersistentIndexDir = "./persistent_index_test";
const std::string kPersistentIndexDir = "./persistent_index_test_build_from_tablet";
bool created;
ASSERT_OK(fs->create_dir_if_missing(kPersistentIndexDir, &created));

Expand Down Expand Up @@ -1442,12 +1442,20 @@ void build_persistent_index_from_tablet(size_t N) {
}

PARALLEL_TEST(PersistentIndexTest, test_build_from_tablet) {
auto manager = StorageEngine::instance()->update_manager();
config::l0_max_mem_usage = 104857600;
manager->mem_tracker()->set_limit(-1);
// dump snapshot
build_persistent_index_from_tablet(100000);
// write wal
build_persistent_index_from_tablet(250000);
// flush l1
config::l0_max_mem_usage = 1000000;
build_persistent_index_from_tablet(1000000);
// flush one tmp l1
config::l0_max_mem_usage = 18874368;
build_persistent_index_from_tablet(1000000);
config::l0_max_mem_usage = 104857600;
}

PARALLEL_TEST(PersistentIndexTest, test_fixlen_replace) {
18 changes: 12 additions & 6 deletions docs/loading/load_concept/strict_mode.md
@@ -6,20 +6,26 @@ This topic introduces what strict mode is and how to set strict mode.

## Understand strict mode

If the original data type of a source column, or the new data type of a source column upon function computation, differs from the data type of the matching destination column, StarRocks converts the source column values to the destination data type during loading. Source column values that fail to be converted are processed into `NULL` values, which are called "error data." Rows that contain such error data are called "error rows."
During data loading, the data types of the source columns may not be completely consistent with the data types of the destination columns. In such cases, StarRocks performs conversions on the source column values that have inconsistent data types. Data conversions may fail due to various issues such as mismatched field data types and field length overflows. Source column values that fail to be properly converted are unqualified column values, and source rows that contain unqualified column values are referred to as "unqualified rows". Strict mode is used to control whether to filter out unqualified rows during data loading.

Strict mode works as follows:

- If strict mode is enabled, StarRocks loads only qualified rows. It filters out error rows and returns details about the error rows.
- If strict mode is disabled, StarRocks loads qualified rows together with error rows.
- If strict mode is enabled, StarRocks loads only qualified rows. It filters out unqualified rows and returns details about the unqualified rows.
- If strict mode is disabled, StarRocks converts unqualified column values into `NULL` and loads unqualified rows that contain these `NULL` values together with qualified rows.

Note the following points:

- In actual business scenarios, both qualified and unqualified rows may contain `NULL` values. If the destination columns do not allow `NULL` values, StarRocks reports errors and filters out the rows that contain `NULL` values.

- The maximum percentage of unqualified rows that can be filtered out for a [Stream Load](../../sql-reference/sql-statements/data-manipulation/STREAM%20LOAD.md), [Broker Load](../../sql-reference/sql-statements/data-manipulation/BROKER%20LOAD.md), [Routine Load](../../sql-reference/sql-statements/data-manipulation/CREATE%20ROUTINE%20LOAD.md), or [Spark Load](../../sql-reference/sql-statements/data-manipulation/SPARK%20LOAD.md) job is controlled by an optional job property `max_filter_ratio`. [INSERT](../../sql-reference/sql-statements/data-manipulation/insert.md) does not support setting the `max_filter_ratio` property.

For example, suppose you want to load four rows that hold the values `\N` (`\N` denotes a `NULL` value), `abc`, `2000`, and `1` in a column from a CSV-formatted data file into a StarRocks table, and the data type of the destination StarRocks table column is TINYINT [-128, 127].

- The source column value `\N` is processed into `NULL` upon conversion to TINYINT.

- > **NOTE**
>
> `\N` is always processed into `NULL` upon conversion regardless of the destination data type.
> **NOTE**
>
> `\N` is always processed into `NULL` upon conversion regardless of the destination data type.
- The source column value `abc` is processed into `NULL`, because its data type is not TINYINT and the conversion fails.

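A toy model of the TINYINT example above (illustration only, not StarRocks code; `std::nullopt` stands in for `NULL`): conversion failure and overflow both yield an unqualified value, and strict mode decides whether the whole row survives.

```cpp
#include <cstdint>
#include <exception>
#include <optional>
#include <string>

// Mimics loading a CSV column value into TINYINT [-128, 127].
std::optional<int8_t> to_tinyint(const std::string& s) {
    if (s == "\\N") return std::nullopt;  // \N always becomes NULL
    try {
        int v = std::stoi(s);  // throws for "abc"
        if (v < -128 || v > 127) return std::nullopt;  // overflow, e.g. "2000"
        return static_cast<int8_t>(v);
    } catch (const std::exception&) {
        return std::nullopt;  // failed conversion -> unqualified value
    }
}

// Strict mode filters a row whose value became NULL through a *failed*
// conversion; a literal \N is qualified and loads as NULL in either mode.
bool row_is_loaded(const std::string& s, bool strict_mode) {
    bool unqualified = (s != "\\N") && !to_tinyint(s).has_value();
    return !(strict_mode && unqualified);
}
```

Under this model, `\N` and `1` load in both modes, while `abc` and `2000` are filtered out in strict mode and load as `NULL` otherwise.
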
2 changes: 1 addition & 1 deletion docs/using_starrocks/Materialized_view.md
@@ -181,7 +181,7 @@ GROUP BY order_id;

## Manually refresh an asynchronous materialized view

You can refresh an asynchronous materialized view regardless of its refreshing strategy via [REFRESH MATERIALIZED VIEW](../sql-reference/sql-statements/data-manipulation/REFRESH%20MATERIALIZED%20VIEW.md). StarRocks v2.5 supports refreshing specific partitions of an asynchronous materialized view by specifying partition names. StarRocks v3.1 supports making a synchronous call of the refresh task, and the SQL statement is returned only when the task succeeds or fails.
You can refresh an asynchronous materialized view regardless of its refreshing strategy via [REFRESH MATERIALIZED VIEW](../sql-reference/sql-statements/data-manipulation/REFRESH%20MATERIALIZED%20VIEW.md). StarRocks v2.5 supports refreshing specific partitions of an asynchronous materialized view by specifying partition names. From v3.0.4 onwards, StarRocks supports making a synchronous call of the refresh task, and the SQL statement is returned only when the task succeeds or fails.

```SQL
-- Refresh the materialized view via an asynchronous call (default).
```
4 changes: 2 additions & 2 deletions fe/fe-core/pom.xml
@@ -695,12 +695,12 @@ under the License.
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>starclient</artifactId>
<version>1.0.2</version>
<version>1.0.7</version>
</dependency>
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>starmanager</artifactId>
<version>1.0.2</version>
<version>1.0.7</version>
</dependency>

<!-- https://mvnrepository.com/artifact/io.opentelemetry/opentelemetry-api -->
Expand Down
@@ -159,14 +159,14 @@ private void analyzeProperties() throws AnalysisException {
if (!isCsvFormat()) {
throw new AnalysisException(PROP_COLUMN_SEPARATOR + " is only for CSV format");
}
columnSeparator = properties.get(PROP_COLUMN_SEPARATOR);
columnSeparator = Delimiter.convertDelimiter(properties.get(PROP_COLUMN_SEPARATOR));
}

if (properties.containsKey(PROP_LINE_DELIMITER)) {
if (!isCsvFormat()) {
throw new AnalysisException(PROP_LINE_DELIMITER + " is only for CSV format");
}
rowDelimiter = properties.get(PROP_LINE_DELIMITER);
rowDelimiter = Delimiter.convertDelimiter(properties.get(PROP_LINE_DELIMITER));
}

if (properties.containsKey(PARQUET_COMPRESSION_TYPE)) {
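
The switch to `Delimiter.convertDelimiter` means escape sequences typed in the property value (for example the two characters `\t`) are now interpreted as the control characters they name instead of being used literally. A hedged C++ sketch of the idea — the real helper is Java and may support more sequences:

```cpp
#include <string>

// Interpret backslash escapes in a user-supplied delimiter property.
// Only \t and \n are handled here; anything else passes through unchanged.
std::string convert_delimiter(const std::string& in) {
    std::string out;
    for (size_t i = 0; i < in.size(); ++i) {
        if (in[i] == '\\' && i + 1 < in.size()) {
            if (in[i + 1] == 't') { out += '\t'; ++i; continue; }
            if (in[i + 1] == 'n') { out += '\n'; ++i; continue; }
        }
        out += in[i];
    }
    return out;
}
```

For instance, `convert_delimiter("\\t")` returns a one-character tab string, which is what a CSV writer needs as its column separator.
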
@@ -133,6 +133,9 @@ public String getProperty(String propertyKey) {
protected void getProcNodeData(BaseProcResult result) {
String lowerCaseType = type.name().toLowerCase();
for (Map.Entry<String, String> entry : configs.entrySet()) {
if (entry.getKey().equalsIgnoreCase(PASSWORD)) {
continue;
}
result.addRow(Lists.newArrayList(name, lowerCaseType, entry.getKey(), entry.getValue()));
}
}