Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve sender log condition to record valuable debug info #1359

Merged
merged 3 commits into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/common/LogFileCollectOffsetIndicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,11 @@ void LogFileCollectOffsetIndicator::RecordFileOffset(LoggroupTimeValue* data) {
devInode,
data->mLogGroupContext.mFuseMode,
fd,
data->mLastUpdateTime);
data->mEnqueueTime);
iter = mLogFileOffsetInfoMap.insert(std::make_pair(logFileInfo, logFileOffsetInfo)).first;
}
LogFileOffsetInfo* logFileOffsetInfo = iter->second;
logFileOffsetInfo->mLastUpdateTime = data->mLastUpdateTime;
logFileOffsetInfo->mLastUpdateTime = data->mEnqueueTime;

LogFileOffsetInfoNode node(seqNum,
fileInfoPtr->offset,
Expand Down
3 changes: 0 additions & 3 deletions core/common/LogstoreSenderQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,6 @@ bool LogstoreSenderInfo::RecordSendResult(SendResult rst, LogstoreSenderStatisti
if (++mLastNetworkErrorCount >= INT32_FLAG(max_client_send_error_count)) {
mLastNetworkErrorCount = INT32_FLAG(max_client_send_error_count);
mNetworkValidFlag = false;
LOG_WARNING(sLogger,
("Network fail, disable ", this->mRegion)("retry interval", mNetworkRetryInterval));
}
break;
case LogstoreSenderInfo::SendResult_QuotaFail:
Expand All @@ -155,7 +153,6 @@ bool LogstoreSenderInfo::RecordSendResult(SendResult rst, LogstoreSenderStatisti
if (++mLastQuotaExceedCount >= INT32_FLAG(max_client_quota_exceed_count)) {
mLastQuotaExceedCount = INT32_FLAG(max_client_quota_exceed_count);
mQuotaValidFlag = false;
LOG_WARNING(sLogger, ("QuotaF fail, disable ", this->mRegion)("retry interval", mQuotaRetryInterval));
}
break;
default:
Expand Down
28 changes: 20 additions & 8 deletions core/common/LogstoreSenderQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ struct LogstoreSenderStatistics {
};

struct LoggroupTimeValue {
int32_t mLastUpdateTime;
int32_t mEnqueueTime;
SEND_DATA_TYPE mDataType;
std::string mLogData;
int32_t mRawSize;
Expand All @@ -63,6 +63,7 @@ struct LoggroupTimeValue {

int32_t mSendRetryTimes;
int32_t mLastSendTime;
int32_t mLastLogWarningTime;
std::string mAliuid;
std::string mRegion;
std::string mShardHashKey;
Expand Down Expand Up @@ -99,9 +100,10 @@ struct LoggroupTimeValue {
mDataType = dataType;
mLogLines = lines;
mRawSize = rawSize;
mLastUpdateTime = lastUpdateTime;
mEnqueueTime = lastUpdateTime;
mSendRetryTimes = 0;
mLastSendTime = 0;
mLastLogWarningTime = 0;
mLogData.clear();
mShardHashKey = shardHashKey;
mStatus = LoggroupSendStatus_Idle;
Expand Down Expand Up @@ -400,11 +402,11 @@ class SingleLogstoreSenderManager : public SingleLogstoreFeedbackQueue<LoggroupT
continue;
}

if (item->mLastUpdateTime < minSendTime) {
minSendTime = item->mLastUpdateTime;
if (item->mEnqueueTime < minSendTime) {
minSendTime = item->mEnqueueTime;
}
if (item->mLastUpdateTime > maxSendTime) {
maxSendTime = item->mLastUpdateTime;
if (item->mEnqueueTime > maxSendTime) {
maxSendTime = item->mEnqueueTime;
}
++statisticsItem.mSendQueueSize;
}
Expand All @@ -417,15 +419,25 @@ class SingleLogstoreSenderManager : public SingleLogstoreFeedbackQueue<LoggroupT

int32_t OnSendDone(LoggroupTimeValue* item, LogstoreSenderInfo::SendResult sendRst, bool& needTrigger) {
needTrigger = mSenderInfo.RecordSendResult(sendRst, mSenderStatistics);
if (!mSenderInfo.mNetworkValidFlag) {
LOG_WARNING(sLogger,
("Network fail, pause logstore", item->mLogstore)("project", item->mProjectName)(
"region", mSenderInfo.mRegion)("retry interval", mSenderInfo.mNetworkRetryInterval));
}
if (!mSenderInfo.mQuotaValidFlag) {
LOG_WARNING(sLogger,
("Quota fail, pause logstore", item->mLogstore)("project", item->mProjectName)(
"region", mSenderInfo.mRegion)("retry interval", mSenderInfo.mQuotaRetryInterval));
}
// if send error, reset status to idle, and wait to send again
// network fail or quota fail
if (sendRst != LogstoreSenderInfo::SendResult_OK && sendRst != LogstoreSenderInfo::SendResult_Buffered
&& sendRst != LogstoreSenderInfo::SendResult_DiscardFail) {
item->mStatus = LoggroupSendStatus_Idle;
return 0;
}
if (mSenderStatistics.mMaxSendSuccessTime < item->mLastUpdateTime) {
mSenderStatistics.mMaxSendSuccessTime = item->mLastUpdateTime;
if (mSenderStatistics.mMaxSendSuccessTime < item->mEnqueueTime) {
mSenderStatistics.mMaxSendSuccessTime = item->mEnqueueTime;
}
// else remove item except buffered
return RemoveItem(item, sendRst != LogstoreSenderInfo::SendResult_Buffered);
Expand Down
3 changes: 2 additions & 1 deletion core/monitor/LogIntegrity.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,8 @@ void LogIntegrity::Notify(LoggroupTimeValue* data, bool flag) {
PTScopedLock lock(mLogIntegrityMapLock);
LogIntegrityInfo* info = NULL;
if (FindLogIntegrityInfo(region, projectName, logstore, filename, info)) {
info->mLastUpdateTime = data->mLastUpdateTime;
info->mLastUpdateTime = data->mEnqueueTime;
henryzhx8 marked this conversation as resolved.
Show resolved Hide resolved

info->SetStatus(data->mLogGroupContext.mSeqNum,
data->mLogLines,
flag ? LogTimeInfo::LogIntegrityStatus_SendOK : LogTimeInfo::LogIntegrityStatus_SendFail);
Expand Down
77 changes: 43 additions & 34 deletions core/sender/Sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,9 @@ DEFINE_FLAG_INT32(reset_region_concurrency_error_count,
5);
DEFINE_FLAG_INT32(unknow_error_try_max, "discard data when try times > this value", 5);
DEFINE_FLAG_INT32(test_unavailable_endpoint_interval, "test unavailable endpoint interval", 60);
DEFINE_FLAG_INT32(sending_cost_time_alarm_interval, "sending log group cost too much time, second", 3);
DEFINE_FLAG_INT32(log_group_wait_in_queue_alarm_interval,
"log group wait in queue alarm interval, may blocked by concurrency or quota, second",
6);
// Threshold (seconds): time blocked waiting for send concurrency beyond which a warning and alarm are raised.
static constexpr int SEND_BLOCK_COST_TIME_ALARM_INTERVAL_SECOND{3};
// Threshold (seconds): time a log group has waited in the sender queue beyond which a warning is logged.
static constexpr int LOG_GROUP_WAIT_IN_QUEUE_ALARM_INTERVAL_SECOND{6};
// Minimum gap (seconds) between repeated failure warnings for the same item kept for a later resend.
static constexpr int ON_FAIL_LOG_WARNING_INTERVAL_SECOND{10};
DEFINE_FLAG_STRING(data_endpoint_policy,
"policy for switching between data server endpoints, possible options include "
"'designated_first'(default) and 'designated_locked'",
Expand Down Expand Up @@ -126,7 +125,7 @@ void SendClosure::OnSuccess(sdk::Response* response) {
("SendSucess", "OK")("RequestId", response->requestId)("StatusCode", response->statusCode)(
"ResponseTime", curTime - mDataPtr->mLastSendTime)("Region", mDataPtr->mRegion)(
"Project", mDataPtr->mProjectName)("Logstore", mDataPtr->mLogstore)("Config", mDataPtr->mConfigName)(
"RetryTimes", mDataPtr->mSendRetryTimes)("TotalSendCost", curTime - mDataPtr->mLastUpdateTime)(
"RetryTimes", mDataPtr->mSendRetryTimes)("TotalSendCost", curTime - mDataPtr->mEnqueueTime)(
henryzhx8 marked this conversation as resolved.
Show resolved Hide resolved
"LogLines", mDataPtr->mLogLines)("Bytes", mDataPtr->mLogData.size())(
"Endpoint", mDataPtr->mCurrentEndpoint)("IsProfileData", isProfileData));
}
Expand Down Expand Up @@ -172,6 +171,19 @@ static const char* GetOperationString(OperationOnFail op) {
}
}

/*
* @brief OnFail callback if send failed
* There are 3 possible outcomes:
* 1. RETRY_ASYNC_WHEN_FAIL: Resend the item immediately. It stays in the sender queue with its mStatus still Sending.
* Every RETRY_ASYNC_WHEN_FAIL must eventually fall back to RECORD_ERROR_WHEN_FAIL after several retries.
* 2. RECORD_ERROR_WHEN_FAIL: Resend the item later. It stays in the sender queue with its mStatus reset to
* Idle, and is fetched again on the next round when its sender queue is visited.
* 3. DISCARD_WHEN_FAIL: Do not resend the item; delete it from the sender queue.
* @param response response from server (maybe empty)
* @param errorCode defined in sdk/Common.cpp
* @param errorMessage error message from server
*
*/
void SendClosure::OnFail(sdk::Response* response, const string& errorCode, const string& errorMessage) {
// test
LOG_DEBUG(sLogger, ("send failed, error code", errorCode)("error msg", errorMessage));
Expand Down Expand Up @@ -230,20 +242,19 @@ void SendClosure::OnFail(sdk::Response* response, const string& errorCode, const
BOOL_FLAG(global_network_success) = true;
if (errorCode == sdk::LOGE_SHARD_WRITE_QUOTA_EXCEED) {
failDetail << "shard write quota exceed";
suggestion << "split logstore shards. https://help.aliyun.com/document_detail/48998.html";
suggestion << "Split logstore shards. https://help.aliyun.com/zh/sls/user-guide/expansion-of-resources";
} else {
failDetail << "project write quota exceed";
suggestion << "create ticket or raise issue in support chat group";
suggestion << "Submit quota modification request. "
"https://help.aliyun.com/zh/sls/user-guide/expansion-of-resources";
}
Sender::Instance()->IncTotalSendStatistic(mDataPtr->mProjectName, mDataPtr->mLogstore, curTime);
if (curTime - mDataPtr->mLastUpdateTime > INT32_FLAG(sending_cost_time_alarm_interval)) {
LogtailAlarm::GetInstance()->SendAlarm(SEND_QUOTA_EXCEED_ALARM,
"error_code: " + errorCode + ", error_message: " + errorMessage
+ ", request_id:" + response->requestId,
mDataPtr->mProjectName,
mDataPtr->mLogstore,
mDataPtr->mRegion);
}
LogtailAlarm::GetInstance()->SendAlarm(SEND_QUOTA_EXCEED_ALARM,
"error_code: " + errorCode + ", error_message: " + errorMessage
+ ", request_id:" + response->requestId,
mDataPtr->mProjectName,
mDataPtr->mLogstore,
mDataPtr->mRegion);
operation = RECORD_ERROR_WHEN_FAIL;
recordRst = LogstoreSenderInfo::SendResult_QuotaFail;
} else if (sendResult == SEND_UNAUTHORIZED) {
Expand All @@ -264,7 +275,7 @@ void SendClosure::OnFail(sdk::Response* response, const string& errorCode, const
if (SLSControl::GetInstance()->SetSlsSendClientAuth(mDataPtr->mAliuid, false, sendClient, lastUpdateTime))
operation = RETRY_ASYNC_WHEN_FAIL;
else if (curTime - lastUpdateTime < INT32_FLAG(unauthorized_allowed_delay_after_reset))
operation = RETRY_ASYNC_WHEN_FAIL;
operation = RECORD_ERROR_WHEN_FAIL;
else
operation = DISCARD_WHEN_FAIL;
#ifdef __ENTERPRISE__
Expand Down Expand Up @@ -341,7 +352,7 @@ void SendClosure::OnFail(sdk::Response* response, const string& errorCode, const
// when retry times > unknow_error_try_max, we will drop this data
operation = DefaultOperation();
}
if (curTime - mDataPtr->mLastUpdateTime > INT32_FLAG(discard_send_fail_interval)) {
if (curTime - mDataPtr->mEnqueueTime > INT32_FLAG(discard_send_fail_interval)) {
henryzhx8 marked this conversation as resolved.
Show resolved Hide resolved
operation = DISCARD_WHEN_FAIL;
}
bool isProfileData = Sender::IsProfileData(mDataPtr->mRegion, mDataPtr->mProjectName, mDataPtr->mLogstore);
Expand All @@ -354,25 +365,24 @@ void SendClosure::OnFail(sdk::Response* response, const string& errorCode, const
"RequestId", response->requestId)("StatusCode", response->statusCode)("ErrorCode", errorCode)( \
"ErrorMessage", errorMessage)("ResponseTime", curTime - mDataPtr->mLastSendTime)("Region", mDataPtr->mRegion)( \
"Project", mDataPtr->mProjectName)("Logstore", mDataPtr->mLogstore)("Config", mDataPtr->mConfigName)( \
"RetryTimes", mDataPtr->mSendRetryTimes)("TotalSendCost", curTime - mDataPtr->mLastUpdateTime)( \
"LogLines", mDataPtr->mLogLines)("Bytes", mDataPtr->mLogData.size())("Endpoint", mDataPtr->mCurrentEndpoint)( \
"IsProfileData", isProfileData)
"RetryTimes", mDataPtr->mSendRetryTimes)("TotalSendCost", \
curTime - mDataPtr->mEnqueueTime)("LogLines", mDataPtr->mLogLines)( \
henryzhx8 marked this conversation as resolved.
Show resolved Hide resolved
"Bytes", mDataPtr->mLogData.size())("Endpoint", mDataPtr->mCurrentEndpoint)("IsProfileData", isProfileData)

// Log warning if retry for too long or will discard data
switch (operation) {
case RETRY_ASYNC_WHEN_FAIL:
if (curTime - mDataPtr->mLastUpdateTime > INT32_FLAG(sending_cost_time_alarm_interval)
|| errorCode == sdk::LOGE_REQUEST_TIMEOUT) {
if (errorCode == sdk::LOGE_REQUEST_TIMEOUT) {
// retry on network timeout should be recorded, because this may lead to data duplication
LOG_WARNING(sLogger, LOG_PATTERN);
}
Sender::Instance()->SendToNetAsync(mDataPtr);
break;
case RECORD_ERROR_WHEN_FAIL:
if (curTime - mDataPtr->mLastUpdateTime > INT32_FLAG(sending_cost_time_alarm_interval)
|| errorCode == sdk::LOGE_REQUEST_TIMEOUT) {
// retry on network timeout should be recorded, because this may lead to data duplication
if (errorCode == sdk::LOGE_REQUEST_TIMEOUT
|| curTime - mDataPtr->mLastLogWarningTime > ON_FAIL_LOG_WARNING_INTERVAL_SECOND) {
LOG_WARNING(sLogger, LOG_PATTERN);
mDataPtr->mLastLogWarningTime = curTime;
}
// Sender::Instance()->PutIntoSecondaryBuffer(mDataPtr, 10);
Sender::Instance()->SubSendingBufferCount();
Expand Down Expand Up @@ -629,7 +639,7 @@ bool Sender::WriteToFile(LoggroupTimeValue* value, bool sendPerformance) {
outfile << value->mProjectName << "\t" << value->mLogstore << "\t" << time(NULL) << "\t" << value->mRawSize
<< "\t" << value->mLogLines << endl;
else
outfile << value->mProjectName << "\t" << value->mLogstore << "\t" << value->mLastUpdateTime << "\t"
outfile << value->mProjectName << "\t" << value->mLogstore << "\t" << value->mEnqueueTime << "\t"
<< value->mRawSize << "\t" << value->mLogLines << endl;
outfile.close();
return true;
Expand Down Expand Up @@ -1482,9 +1492,9 @@ void Sender::DaemonSender() {

for (vector<LoggroupTimeValue*>::iterator itr = logGroupToSend.begin(); itr != logGroupToSend.end(); ++itr) {
LoggroupTimeValue* data = *itr;
int32_t logGroupWaitTime = curTime - data->mLastUpdateTime;
int32_t logGroupWaitTime = curTime - data->mEnqueueTime;

if (logGroupWaitTime > INT32_FLAG(log_group_wait_in_queue_alarm_interval)) {
if (logGroupWaitTime > LOG_GROUP_WAIT_IN_QUEUE_ALARM_INTERVAL_SECOND) {
LOG_WARNING(sLogger,
("log group wait in queue for too long, may blocked by concurrency or quota, region",
data->mRegion)("project", data->mProjectName)("logstore", data->mLogstore)(
Expand Down Expand Up @@ -1514,17 +1524,17 @@ void Sender::DaemonSender() {
usleep(10 * 1000);
}
int32_t afterSleepTime = time(NULL);
int32_t sendCostTime = afterSleepTime - beforeSleepTime;
if (sendCostTime > INT32_FLAG(sending_cost_time_alarm_interval)) {
int32_t blockCostTime = afterSleepTime - beforeSleepTime;
if (blockCostTime > SEND_BLOCK_COST_TIME_ALARM_INTERVAL_SECOND) {
LOG_WARNING(sLogger,
("sending log group blocked too long because send concurrency reached limit. current "
"concurrency used",
GetSendingBufferCount())("max concurrency",
AppConfig::GetInstance()->GetSendRequestConcurrency())(
"blocked time", sendCostTime));
"blocked time", blockCostTime));
LogtailAlarm::GetInstance()->SendAlarm(SENDING_COSTS_TOO_MUCH_TIME_ALARM,
"sending log group costs too much time, blocked time "
+ ToString(sendCostTime),
"sending log group blocked for too much time, cost "
+ ToString(blockCostTime),
data->mProjectName,
data->mLogstore,
data->mRegion);
Expand All @@ -1534,7 +1544,6 @@ void Sender::DaemonSender() {
sendBufferBytes += data->mRawSize;
sendNetBodyBytes += data->mLogData.size();
sendLines += data->mLogLines;
data->mLastUpdateTime = time(NULL); // set last update time before sending
henryzhx8 marked this conversation as resolved.
Show resolved Hide resolved
SendToNetAsync(data);
#ifdef __ENTERPRISE__
}
Expand Down
Loading