Skip to content

Commit

Permalink
Merge branch 'StarRocks:branch-2.5' into branch-2.5
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangguangyxg authored Aug 4, 2023
2 parents 0aa97cf + f0b62f4 commit 07d6eda
Show file tree
Hide file tree
Showing 56 changed files with 1,052 additions and 241 deletions.
9 changes: 7 additions & 2 deletions be/src/formats/csv/binary_converter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "formats/csv/binary_converter.h"

#include "column/binary_column.h"
#include "gutil/strings/substitute.h"
#include "runtime/descriptors.h"
#include "runtime/types.h"

Expand Down Expand Up @@ -47,7 +48,8 @@ bool BinaryConverter::read_string(Column* column, Slice s, const Options& option
}

if (UNLIKELY((s.size > TypeDescriptor::MAX_VARCHAR_LENGTH) || (max_size > 0 && s.size > max_size))) {
LOG(WARNING) << "Column [" << column->get_name() << "]'s length exceed max varchar length.";
VLOG(3) << strings::Substitute("Column [$0]'s length exceed max varchar length. str_size($1), max_size($2)",
column->get_name(), s.size, max_size);
return false;
}
down_cast<BinaryColumn*>(column)->append(s);
Expand Down Expand Up @@ -92,7 +94,10 @@ bool BinaryConverter::read_quoted_string(Column* column, Slice s, const Options&
size_t ext_size = new_size - old_size;
if (UNLIKELY((ext_size > TypeDescriptor::MAX_VARCHAR_LENGTH) || (max_size > 0 && ext_size > max_size))) {
bytes.resize(old_size);
LOG(WARNING) << "Column [" << column->get_name() << "]'s length exceed max varchar length.";
VLOG(3) << strings::Substitute(
"Column [$0]'s length exceed max varchar length. old_size($1), new_size($2), ext_size($3), "
"max_size($4)",
column->get_name(), old_size, new_size, ext_size, max_size);
return false;
}
offsets.push_back(bytes.size());
Expand Down
12 changes: 9 additions & 3 deletions be/src/storage/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,9 @@ Status DeltaWriter::_init() {
_tablet = tablet_mgr->get_tablet(_opt.tablet_id, false);
if (_tablet == nullptr) {
std::stringstream ss;
ss << "Fail to get tablet. tablet_id=" << _opt.tablet_id;
ss << "Fail to get tablet, perhaps this table is doing schema change, or it has already been deleted. Please "
"try again. tablet_id="
<< _opt.tablet_id;
LOG(WARNING) << ss.str();
Status st = Status::InternalError(ss.str());
_set_state(kUninitialized, st);
Expand Down Expand Up @@ -121,8 +123,12 @@ Status DeltaWriter::_init() {
if (config::enable_event_based_compaction_framework) {
StorageEngine::instance()->compaction_manager()->update_tablet_async(_tablet);
}
auto msg = fmt::format("Too many versions. tablet_id: {}, version_count: {}, limit: {}, replica_state: {}",
_opt.tablet_id, _tablet->version_count(), config::tablet_max_versions, _replica_state);
auto msg = fmt::format(
"Failed to load data into tablet {}, because of too many versions, current/limit: {}/{}. You can "
"reduce the loading job concurrency, or increase loading data batch size. If you are loading data with "
"Routine Load, you can increase FE configs routine_load_task_consume_second and "
"max_routine_load_batch_size,",
_opt.tablet_id, _tablet->version_count(), config::tablet_max_versions);
LOG(ERROR) << msg;
Status st = Status::ServiceUnavailable(msg);
_set_state(kUninitialized, st);
Expand Down
11 changes: 11 additions & 0 deletions be/src/storage/rowset/zone_map_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,17 @@ Status ZoneMapIndexReader::_do_load(FileSystem* fs, const std::string& filename,
if (!_page_zone_maps[i].ParseFromArray(value.data, value.size)) {
return Status::Corruption("Failed to parse zone map");
}

// Currently if the column type is varchar(length) and the values is all null,
// a zonemap string of length will be written to the segment file,
// causing the loaded metadata to occupy a large amount of memory.
//
// The main purpose of this code is to optimize the reading of segment files
// generated by the old version.
if (_page_zone_maps[i].has_has_not_null() && !_page_zone_maps[i].has_not_null()) {
delete _page_zone_maps[i].release_min();
delete _page_zone_maps[i].release_max();
}
column->resize(0);
}
return Status::OK();
Expand Down
1 change: 1 addition & 0 deletions be/src/storage/schema_change.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1065,6 +1065,7 @@ Status SchemaChangeHandler::_convert_historical_rowsets(SchemaChangeParams& sc_p
if (status.ok()) {
status = sc_params.new_tablet->check_version_integrity(sc_params.version);
}
sc_params.new_tablet->update_max_continuous_version();

LOG(INFO) << "finish converting rowsets for new_tablet from base_tablet. "
<< "base_tablet=" << sc_params.base_tablet->full_name()
Expand Down
2 changes: 2 additions & 0 deletions be/src/storage/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,8 @@ class Tablet : public BaseTablet {

void get_basic_info(TabletBasicInfo& info);

// Refreshes the tracker's cached max continuous version. Called after
// schema change finishes converting historical rowsets (see
// SchemaChangeHandler::_convert_historical_rowsets) so readers observe
// the new tablet's latest continuous version.
void update_max_continuous_version() { _timestamped_version_tracker.update_max_continuous_version(); }

protected:
void on_shutdown() override;

Expand Down
25 changes: 20 additions & 5 deletions be/src/storage/tablet_updates.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,10 @@ Status TabletUpdates::_get_apply_version_and_rowsets(int64_t* version, std::vect
std::vector<uint32_t>* rowset_ids) {
std::lock_guard rl(_lock);
if (_edit_version_infos.empty()) {
string msg = Substitute("tablet deleted when _get_apply_version_and_rowsets tablet:$0", _tablet.tablet_id());
string msg = strings::Substitute(
"Tablet is deleted, perhaps this table is doing schema change, or it has already been deleted. Please "
"try again. get_apply_version_and_rowsets tablet:$0",
_tablet.tablet_id());
LOG(WARNING) << msg;
return Status::InternalError(msg);
}
Expand Down Expand Up @@ -543,7 +546,10 @@ Status TabletUpdates::rowset_commit(int64_t version, const RowsetSharedPtr& rows
{
std::unique_lock<std::mutex> ul(_lock);
if (_edit_version_infos.empty()) {
string msg = Substitute("tablet deleted when rowset_commit tablet:$0", _tablet.tablet_id());
string msg = strings::Substitute(
"Tablet is deleted, perhaps this table is doing schema change, or it has already been deleted. "
"Please try again. rowset_commit tablet:$0",
_tablet.tablet_id());
LOG(WARNING) << msg;
return Status::InternalError(msg);
}
Expand Down Expand Up @@ -830,7 +836,10 @@ void TabletUpdates::_stop_and_wait_apply_done() {
Status TabletUpdates::get_latest_applied_version(EditVersion* latest_applied_version) {
std::lock_guard l(_lock);
if (_edit_version_infos.empty()) {
string msg = Substitute("tablet deleted when get_latest_applied_version tablet:$0", _tablet.tablet_id());
string msg = strings::Substitute(
"Tablet is deleted, perhaps this table is doing schema change, or it has already been deleted. "
"get_latest_applied_version tablet:$0",
_tablet.tablet_id());
LOG(WARNING) << msg;
return Status::InternalError(msg);
}
Expand Down Expand Up @@ -1242,7 +1251,10 @@ RowsetSharedPtr TabletUpdates::_get_rowset(uint32_t rowset_id) {
Status TabletUpdates::_wait_for_version(const EditVersion& version, int64_t timeout_ms,
std::unique_lock<std::mutex>& ul) {
if (_edit_version_infos.empty()) {
string msg = Substitute("tablet deleted when _wait_for_version tablet:$0", _tablet.tablet_id());
string msg = strings::Substitute(
"Tablet is deleted, perhaps this table is doing schema change, or it has already been deleted. "
"_wait_for_version tablet:$0",
_tablet.tablet_id());
LOG(WARNING) << msg;
return Status::InternalError(msg);
}
Expand Down Expand Up @@ -2565,7 +2577,10 @@ Status TabletUpdates::get_applied_rowsets(int64_t version, std::vector<RowsetSha
// wait for version timeout 55s, should smaller than exec_plan_fragment rpc timeout(60s)
RETURN_IF_ERROR(_wait_for_version(EditVersion(version, 0), 55000, ul));
if (_edit_version_infos.empty()) {
string msg = Substitute("tablet deleted when get_applied_rowsets tablet:$0", _tablet.tablet_id());
string msg = strings::Substitute(
"Tablet is deleted, perhaps this table is doing schema change, or it has already been deleted. Please "
"try again. get_applied_rowsets tablet:$0",
_tablet.tablet_id());
LOG(WARNING) << msg;
return Status::InternalError(msg);
}
Expand Down
8 changes: 8 additions & 0 deletions be/src/storage/version_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ int64_t TimestampedVersionTracker::get_max_continuous_version() const {
return _version_graph.max_continuous_version();
}

// Recomputes the cached max continuous version by delegating to the
// underlying version graph, which rescans the graph starting from version 0.
void TimestampedVersionTracker::update_max_continuous_version() {
    _version_graph.update_max_continuous_version();
}

int64_t TimestampedVersionTracker::get_min_readable_version() const {
return _version_graph.min_readable_version();
}
Expand Down Expand Up @@ -223,6 +227,10 @@ std::vector<TimestampedVersionSharedPtr>& TimestampedVersionPathContainer::times
return _timestamped_versions_container;
}

// Recomputes _max_continuous_version by walking the version graph from
// version 0. Unlike the incremental updates performed during normal version
// publishing, this forces a full recomputation — needed after operations
// (e.g. schema change rowset conversion) that add many versions at once.
void VersionGraph::update_max_continuous_version() {
    _max_continuous_version = _get_max_continuous_version_from(0);
}

void VersionGraph::construct_version_graph(const std::vector<RowsetMetaSharedPtr>& rs_metas, int64_t* max_version) {
_version_graph.clear();
_max_continuous_version = -1;
Expand Down
4 changes: 4 additions & 0 deletions be/src/storage/version_graph.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ class VersionGraph {
// Get max continuous version from 0
int64_t max_continuous_version() const { return _max_continuous_version; }

void update_max_continuous_version();

int64_t min_readable_version() const { return _min_readable_version; }

private:
Expand Down Expand Up @@ -198,6 +200,8 @@ class TimestampedVersionTracker {
// Get max continuous version from 0
int64_t get_max_continuous_version() const;

void update_max_continuous_version();

int64_t get_min_readable_version() const;

private:
Expand Down
4 changes: 3 additions & 1 deletion docs/table_design/table_types/primary_key_table.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,4 +131,6 @@ PROPERTIES("replication_num" = "3",

## What to do next

You can run a stream load, broker load, or routine load job to perform insert, update, or delete operations on all or individual columns of a table that uses the Primary Key table. For more information, see [Overview of data loading](../../loading/Loading_intro.md).
After table creation, you can run load jobs to load data into the Primary Key table. For more information about supported loading methods, see [Overview of data loading](../../loading/Loading_intro.md).

If you need to update data in the Primary Key table, you can [run a load job](../../loading/Load_to_Primary_Key_tables.md) or execute a DML statement ([UPDATE](../../sql-reference/sql-statements/data-manipulation/UPDATE.md) or [DELETE](../../sql-reference/sql-statements/data-manipulation/DELETE.md)). These update operations are also guaranteed to be atomic.
4 changes: 2 additions & 2 deletions fe/fe-core/src/main/java/com/starrocks/alter/Alter.java
Original file line number Diff line number Diff line change
Expand Up @@ -871,8 +871,8 @@ private void swapTableInternal(Database db, OlapTable origTable, OlapTable newTb
db.createTable(origTable);
}

public void processAlterView(AlterViewStmt stmt, ConnectContext ctx) throws UserException {
TableName dbTableName = stmt.getTbl();
public void processAlterView(AlterViewStmt stmt, ConnectContext ctx) throws DdlException {
TableName dbTableName = stmt.getTableName();
String dbName = dbTableName.getDb();

Database db = GlobalStateMgr.getCurrentState().getDb(dbName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,6 @@ private FunctionName() {
public FunctionName(String db, String fn) {
db_ = db;
fn_ = fn.toLowerCase();
if (db_ != null) {
db_ = db_.toLowerCase();
}
}

public FunctionName(String fn) {
Expand All @@ -56,7 +53,7 @@ public FunctionName(String fn) {
}

public FunctionName(TFunctionName thriftName) {
db_ = thriftName.db_name.toLowerCase();
db_ = thriftName.db_name;
fn_ = thriftName.function_name.toLowerCase();
}

Expand Down
23 changes: 23 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/analysis/TableName.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
package com.starrocks.analysis;

import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.base.Strings;
import com.google.gson.annotations.SerializedName;
import com.starrocks.cluster.ClusterNamespace;
Expand All @@ -35,10 +36,12 @@
import com.starrocks.qe.ConnectContext;
import com.starrocks.server.CatalogMgr;
import com.starrocks.sql.analyzer.SemanticException;
import org.apache.commons.lang3.StringUtils;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
import java.util.Objects;

public class TableName implements Writable, GsonPreProcessable, GsonPostProcessable {
Expand All @@ -64,6 +67,26 @@ public TableName(String catalog, String db, String tbl) {
this.tbl = tbl;
}

/**
 * Parses a dotted table reference into a {@link TableName}.
 * Accepted forms: {@code "tbl"}, {@code "db.tbl"}, {@code "catalog.db.tbl"};
 * missing parts are filled in from the current session's catalog/database.
 *
 * @param name dotted table reference, at most three components
 * @return the resolved TableName
 * @throws IllegalArgumentException if the name has more than three parts,
 *         or is a bare table name while no database is selected
 */
// NOTE(review): assumes ConnectContext.get() is non-null (a session is bound
// to this thread) — confirm for background/daemon callers.
public static TableName fromString(String name) {
    List<String> pieces = Splitter.on(".").splitToList(name);
    String catalog = ConnectContext.get().getCurrentCatalog();
    String db = ConnectContext.get().getDatabase();
    // Defensive guard: Guava's Splitter returns [""] (size 1) for an empty
    // input rather than an empty list, so this branch is normally unreachable.
    if (pieces.isEmpty()) {
        throw new IllegalArgumentException("empty table name");
    } else if (pieces.size() == 1) {
        if (StringUtils.isEmpty(db)) {
            throw new IllegalArgumentException("no database");
        }
        return new TableName(catalog, db, pieces.get(0));
    } else if (pieces.size() == 2) {
        // "db.tbl" — catalog comes from the session.
        return new TableName(catalog, pieces.get(0), pieces.get(1));
    } else if (pieces.size() == 3) {
        // Fully qualified "catalog.db.tbl".
        return new TableName(pieces.get(0), pieces.get(1), pieces.get(2));
    } else {
        throw new IllegalArgumentException("illegal table name: " + name);
    }
    // NOTE(review): blank components (e.g. "a..b") are not rejected here —
    // confirm downstream validation handles empty db/catalog strings.
}

public void analyze(Analyzer analyzer) throws AnalysisException {
if (Strings.isNullOrEmpty(catalog)) {
catalog = analyzer.getDefaultCatalog();
Expand Down
9 changes: 9 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/catalog/Database.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand Down Expand Up @@ -580,6 +581,14 @@ public Set<String> getTableNamesViewWithLock() {
}
}

/**
 * Looks up a table by name; returns {@link Optional#empty()} instead of
 * null when no such table exists in this database.
 */
// NOTE(review): reads nameToTable without taking the db lock — safe only if
// the map is a concurrent map; confirm callers don't need a locked snapshot.
public Optional<Table> tryGetTable(String tableName) {
    return Optional.ofNullable(nameToTable.get(tableName));
}

/**
 * Looks up a table by id; returns {@link Optional#empty()} when absent.
 */
public Optional<Table> tryGetTable(long tableId) {
    return Optional.ofNullable(idToTable.get(tableId));
}

public Table getTable(String tableName) {
if (nameToTable.containsKey(tableName)) {
return nameToTable.get(tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1191,4 +1191,7 @@ public void setPlanContext(MVRewriteContextCache mvRewriteContextCache) {
this.mvRewriteContextCache = mvRewriteContextCache;
}

/**
 * Serializes this object to a JSON string via the shared Gson instance,
 * for metadata inspection/debugging output.
 */
public String inspectMeta() {
    return GsonUtils.GSON.toJson(this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ public static void reportAnalysisException(ErrorCode errorCode, Object... objs)
reportAnalysisException(null, errorCode, objs);
}

/**
 * Builds — but does not throw — a {@link SemanticException} whose message is
 * produced by {@code reportCommon} from the given error code and format
 * arguments. Counterpart to {@code reportSemanticException} for callers that
 * want to decide themselves when (or whether) to throw.
 */
public static SemanticException buildSemanticException(ErrorCode errorCode, Object... objs) {
    return new SemanticException(reportCommon(null, errorCode, objs));
}

public static void reportSemanticException(ErrorCode errorCode, Object... objs) {
reportSemanticException(null, errorCode, objs);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
package com.starrocks.common.util;

import com.clearspring.analytics.util.Lists;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.base.Strings;
Expand Down Expand Up @@ -366,9 +367,10 @@ private static void checkAvailableBackendsIsEnough(short replicationNum) throws
}
List<Long> backendIds = GlobalStateMgr.getCurrentSystemInfo().getAvailableBackendIds();
if (replicationNum > backendIds.size()) {
throw new AnalysisException("Replication num should be less than the number of available BE nodes. "
+ "Replication num is " + replicationNum + " available BE nodes is " + backendIds.size() +
", You can change this default by setting the replication_num table properties.");
throw new AnalysisException("Table replication num should be less than " +
"of equal to the number of available BE nodes. "
+ "You can change this default by setting the replication_num table properties. "
+ "Current alive backend is [" + Joiner.on(",").join(backendIds) + "].");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

package com.starrocks.connector.hive;

import com.google.gson.JsonObject;
import com.starrocks.connector.PartitionInfo;
import com.starrocks.persist.gson.GsonUtils;

import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -94,6 +96,16 @@ public String toString() {
return sb.toString();
}

/**
 * Serializes this partition descriptor to a {@link JsonObject}, converting
 * each field through the shared Gson instance so nested structures
 * (parameter maps, format descriptors) are rendered consistently.
 */
public JsonObject toJson() {
    JsonObject json = new JsonObject();
    json.add("parameters", GsonUtils.GSON.toJsonTree(parameters));
    json.add("inputFormat", GsonUtils.GSON.toJsonTree(inputFormat));
    json.add("textFileFormatDesc", GsonUtils.GSON.toJsonTree(textFileFormatDesc));
    json.add("fullPath", GsonUtils.GSON.toJsonTree(fullPath));
    json.add("isSplittable", GsonUtils.GSON.toJsonTree(isSplittable));
    return json;
}

public static Builder builder() {
return new Builder();
}
Expand Down
6 changes: 5 additions & 1 deletion fe/fe-core/src/main/java/com/starrocks/qe/QueryState.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,14 @@ public QueryState() {

/**
 * Resets this QueryState to its initial (OK, no-error, no-result) condition
 * so the object can be reused for the next statement on this connection.
 */
// Fix: the original assigned serverStatus = 0 twice (once mid-list and once
// at the end); the redundant duplicate assignment is removed.
public void reset() {
    stateType = MysqlStateType.OK;
    errorMessage = "";
    errorCode = null;
    infoMessage = null;
    errType = ErrType.OTHER_ERR;
    isQuery = false;
    affectedRows = 0;
    warningRows = 0;
    serverStatus = 0; // clear MySQL protocol status flags
}

public MysqlStateType getStateType() {
Expand Down
Loading

0 comments on commit 07d6eda

Please sign in to comment.