Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add loong collector monitor #1636

Merged
merged 6 commits into from
Jul 29, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions core/application/Application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ void Application::Start() { // GCOVR_EXCL_START
#endif

LogtailAlarm::GetInstance()->Init();
LoongCollectorMonitor::GetInstance()->Init();
LogtailMonitor::GetInstance()->Init();

PluginRegistry::GetInstance()->LoadPlugins();
Expand Down Expand Up @@ -356,6 +357,7 @@ void Application::Exit() {
#endif

LogtailMonitor::GetInstance()->Stop();
LoongCollectorMonitor::GetInstance()->Stop();
LogtailAlarm::GetInstance()->Stop();
// from now on, alarm should not be used.

Expand Down
13 changes: 10 additions & 3 deletions core/event_handler/LogInput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ void LogInput::Start() {
initialized = true;

mInteruptFlag = false;

mGlobalOpenFdTotal = LoongCollectorMonitor::GetInstance()->GetIntGauge(METRIC_GLOBAL_OPEN_FD_TOTAL);
mGlobalRegisterHandlerTotal = LoongCollectorMonitor::GetInstance()->GetIntGauge(METRIC_GLOBAL_REGISTER_HANDLER_TOTAL);

new Thread([this]() { ProcessLoop(); });
}

Expand Down Expand Up @@ -341,9 +345,12 @@ void LogInput::UpdateCriticalMetric(int32_t curTime) {

LogtailMonitor::GetInstance()->UpdateMetric("event_tps",
1.0 * mEventProcessCount / (curTime - mLastUpdateMetricTime));
LogtailMonitor::GetInstance()->UpdateMetric("open_fd",
GloablFileDescriptorManager::GetInstance()->GetOpenedFilePtrSize());
LogtailMonitor::GetInstance()->UpdateMetric("register_handler", EventDispatcher::GetInstance()->GetHandlerCount());
int32_t openFdTotal = GloablFileDescriptorManager::GetInstance()->GetOpenedFilePtrSize();
LogtailMonitor::GetInstance()->UpdateMetric("open_fd", openFdTotal);
mGlobalOpenFdTotal->Set(openFdTotal);
size_t handlerCount = EventDispatcher::GetInstance()->GetHandlerCount();
LogtailMonitor::GetInstance()->UpdateMetric("register_handler", handlerCount);
mGlobalRegisterHandlerTotal->Set(handlerCount);
LogtailMonitor::GetInstance()->UpdateMetric("reader_count", CheckPointManager::Instance()->GetReaderCount());
LogtailMonitor::GetInstance()->UpdateMetric("multi_config", AppConfig::GetInstance()->IsAcceptMultiConfig());
Takuka0311 marked this conversation as resolved.
Show resolved Hide resolved
mEventProcessCount = 0;
Expand Down
3 changes: 3 additions & 0 deletions core/event_handler/LogInput.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

#include "common/Lock.h"
#include "common/LogRunnable.h"
#include "monitor/Monitor.h"

namespace logtail {

Expand Down Expand Up @@ -78,6 +79,8 @@ class LogInput : public LogRunnable {
volatile bool mIdleFlag;
int32_t mEventProcessCount;
int32_t mLastUpdateMetricTime;
IntGaugePtr mGlobalOpenFdTotal;
IntGaugePtr mGlobalRegisterHandlerTotal;

std::atomic_int mLastReadEventTime{0};
mutable std::mutex mThreadRunningMux;
Expand Down
8 changes: 4 additions & 4 deletions core/input/InputFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,15 +156,15 @@ bool InputFile::Init(const Json::Value& config, uint32_t& pluginIdx, Json::Value
mContext->SetExactlyOnceFlag(true);
}

mInputFileMonitorTotal = GetMetricsRecordRef().CreateGauge(METRIC_INPUT_FILE_MONITOR_TOTAL);
mInputFileMonitorTotal = GetMetricsRecordRef().CreateIntGauge(METRIC_INPUT_FILE_MONITOR_TOTAL);
static const std::unordered_map<std::string, MetricType> inputFileMetricKeys = {
// {METRIC_INPUT_RECORDS_TOTAL, MetricType::METRIC_TYPE_COUNTER},
{METRIC_INPUT_RECORDS_SIZE_BYTES, MetricType::METRIC_TYPE_COUNTER},
// {METRIC_INPUT_BATCH_TOTAL, MetricType::METRIC_TYPE_COUNTER},
{METRIC_INPUT_READ_TOTAL, MetricType::METRIC_TYPE_COUNTER},
{METRIC_INPUT_FILE_SIZE_BYTES, MetricType::METRIC_TYPE_GAUGE},
// {METRIC_INPUT_FILE_READ_DELAY_TIME_MS, MetricType::METRIC_TYPE_GAUGE},
{METRIC_INPUT_FILE_OFFSET_BYTES, MetricType::METRIC_TYPE_GAUGE},
{METRIC_INPUT_FILE_SIZE_BYTES, MetricType::METRIC_TYPE_INT_GAUGE},
// {METRIC_INPUT_FILE_READ_DELAY_TIME_MS, MetricType::METRIC_TYPE_INT_GAUGE},
{METRIC_INPUT_FILE_OFFSET_BYTES, MetricType::METRIC_TYPE_INT_GAUGE},
};
mPluginMetricManager
= std::make_shared<PluginMetricManager>(GetMetricsRecordRef()->GetLabels(), inputFileMetricKeys);
Expand Down
2 changes: 1 addition & 1 deletion core/input/InputFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class InputFile : public Input {
FileReaderOptions mFileReader;
MultilineOptions mMultiline;
PluginMetricManagerPtr mPluginMetricManager;
GaugePtr mInputFileMonitorTotal;
IntGaugePtr mInputFileMonitorTotal;
// others
uint32_t mMaxCheckpointDirSearchDepth = 0;
uint32_t mExactlyOnceConcurrency = 0;
Expand Down
132 changes: 74 additions & 58 deletions core/monitor/LogtailMetric.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,44 +25,6 @@ using namespace sls_logs;

namespace logtail {

Counter::Counter(const std::string& name, uint64_t val = 0) : mName(name), mVal(val) {
}

uint64_t Counter::GetValue() const {
return mVal;
}

const std::string& Counter::GetName() const {
return mName;
}

Counter* Counter::Collect() {
return new Counter(mName, mVal.exchange(0));
}

void Counter::Add(uint64_t value) {
mVal += value;
}

Gauge::Gauge(const std::string& name, uint64_t val = 0) : mName(name), mVal(val) {
}

uint64_t Gauge::GetValue() const {
return mVal;
}

const std::string& Gauge::GetName() const {
return mName;
}

Gauge* Gauge::Collect() {
return new Gauge(mName, mVal);
}

void Gauge::Set(uint64_t value) {
mVal = value;
}

MetricsRecord::MetricsRecord(LabelsPtr labels) : mLabels(labels), mDeleted(false) {
}

Expand All @@ -72,9 +34,15 @@ CounterPtr MetricsRecord::CreateCounter(const std::string& name) {
return counterPtr;
}

GaugePtr MetricsRecord::CreateGauge(const std::string& name) {
GaugePtr gaugePtr = std::make_shared<Gauge>(name);
mGauges.emplace_back(gaugePtr);
IntGaugePtr MetricsRecord::CreateIntGauge(const std::string& name) {
IntGaugePtr gaugePtr = std::make_shared<Gauge<uint64_t>>(name);
mIntGauges.emplace_back(gaugePtr);
return gaugePtr;
}

DoubleGaugePtr MetricsRecord::CreateDoubleGauge(const std::string& name) {
DoubleGaugePtr gaugePtr = std::make_shared<Gauge<double>>(name);
mDoubleGauges.emplace_back(gaugePtr);
return gaugePtr;
}

Expand All @@ -87,26 +55,44 @@ bool MetricsRecord::IsDeleted() const {
}

const LabelsPtr& MetricsRecord::GetLabels() const {
std::lock_guard<std::mutex> lock(mLabelsMutex);
Takuka0311 marked this conversation as resolved.
Show resolved Hide resolved
return mLabels;
}

void MetricsRecord::SetLabels(LabelsPtr labels) {
std::lock_guard<std::mutex> lock(mLabelsMutex);
mLabels = labels;
}

const std::vector<CounterPtr>& MetricsRecord::GetCounters() const {
return mCounters;
}

const std::vector<GaugePtr>& MetricsRecord::GetGauges() const {
return mGauges;
const std::vector<IntGaugePtr>& MetricsRecord::GetIntGauges() const {
return mIntGauges;
}

const std::vector<DoubleGaugePtr>& MetricsRecord::GetDoubleGauges() const {
return mDoubleGauges;
}

MetricsRecord* MetricsRecord::Collect() {
MetricsRecord* metrics = new MetricsRecord(mLabels);
MetricsRecord* metrics;
Takuka0311 marked this conversation as resolved.
Show resolved Hide resolved
{
std::lock_guard<std::mutex> lock(mLabelsMutex);
metrics = new MetricsRecord(mLabels);
}
for (auto& item : mCounters) {
CounterPtr newPtr(item->Collect());
metrics->mCounters.emplace_back(newPtr);
}
for (auto& item : mGauges) {
GaugePtr newPtr(item->Collect());
metrics->mGauges.emplace_back(newPtr);
for (auto& item : mIntGauges) {
IntGaugePtr newPtr(item->Collect());
metrics->mIntGauges.emplace_back(newPtr);
}
for (auto& item : mDoubleGauges) {
DoubleGaugePtr newPtr(item->Collect());
metrics->mDoubleGauges.emplace_back(newPtr);
}
return metrics;
}
Expand All @@ -129,6 +115,10 @@ void MetricsRecordRef::SetMetricsRecord(MetricsRecord* metricRecord) {
mMetrics = metricRecord;
}

void MetricsRecordRef::SetLabels(LabelsPtr labels) {
mMetrics->SetLabels(labels);
}

const LabelsPtr& MetricsRecordRef::GetLabels() const {
return mMetrics->GetLabels();
}
Expand All @@ -137,8 +127,12 @@ CounterPtr MetricsRecordRef::CreateCounter(const std::string& name) {
return mMetrics->CreateCounter(name);
}

GaugePtr MetricsRecordRef::CreateGauge(const std::string& name) {
return mMetrics->CreateGauge(name);
IntGaugePtr MetricsRecordRef::CreateIntGauge(const std::string& name) {
return mMetrics->CreateIntGauge(name);
}

DoubleGaugePtr MetricsRecordRef::CreateDoubleGauge(const std::string& name) {
return mMetrics->CreateDoubleGauge(name);
}

const MetricsRecord* MetricsRecordRef::operator->() const {
Expand All @@ -153,8 +147,11 @@ void ReentrantMetricsRecord::Init(MetricLabels& labels, std::unordered_map<std::
case MetricType::METRIC_TYPE_COUNTER:
mCounters[metric.first] = mMetricsRecordRef.CreateCounter(metric.first);
break;
case MetricType::METRIC_TYPE_GAUGE:
mGauges[metric.first] = mMetricsRecordRef.CreateGauge(metric.first);
case MetricType::METRIC_TYPE_INT_GAUGE:
mIntGauges[metric.first] = mMetricsRecordRef.CreateIntGauge(metric.first);
break;
case MetricType::METRIC_TYPE_DOUBLE_GAUGE:
mDoubleGauges[metric.first] = mMetricsRecordRef.CreateDoubleGauge(metric.first);
break;
default:
break;
Expand All @@ -174,9 +171,17 @@ CounterPtr ReentrantMetricsRecord::GetCounter(const std::string& name) {
return nullptr;
}

GaugePtr ReentrantMetricsRecord::GetGauge(const std::string& name) {
auto it = mGauges.find(name);
if (it != mGauges.end()) {
IntGaugePtr ReentrantMetricsRecord::GetIntGauge(const std::string& name) {
auto it = mIntGauges.find(name);
if (it != mIntGauges.end()) {
return it->second;
}
return nullptr;
}

DoubleGaugePtr ReentrantMetricsRecord::GetDoubleGauge(const std::string& name) {
auto it = mDoubleGauges.find(name);
if (it != mDoubleGauges.end()) {
return it->second;
}
return nullptr;
Expand Down Expand Up @@ -358,8 +363,14 @@ void ReadMetrics::ReadAsLogGroup(std::map<std::string, sls_logs::LogGroup*>& log
contentPtr->set_key(VALUE_PREFIX + counter->GetName());
contentPtr->set_value(ToString(counter->GetValue()));
}
for (auto& item : tmp->GetGauges()) {
GaugePtr gauge = item;
for (auto& item : tmp->GetIntGauges()) {
IntGaugePtr gauge = item;
Log_Content* contentPtr = logPtr->add_contents();
contentPtr->set_key(VALUE_PREFIX + gauge->GetName());
contentPtr->set_value(ToString(gauge->GetValue()));
}
for (auto& item : tmp->GetDoubleGauges()) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Double ToString的精度可能比较长,后续看看是否需要控制一下

DoubleGaugePtr gauge = item;
Log_Content* contentPtr = logPtr->add_contents();
contentPtr->set_key(VALUE_PREFIX + gauge->GetName());
contentPtr->set_value(ToString(gauge->GetValue()));
Expand Down Expand Up @@ -390,8 +401,13 @@ void ReadMetrics::ReadAsFileBuffer(std::string& metricsContent) const {
metricsRecordValue[VALUE_PREFIX + counter->GetName()] = ToString(counter->GetValue());
}

for (auto& item : tmp->GetGauges()) {
GaugePtr gauge = item;
for (auto& item : tmp->GetIntGauges()) {
IntGaugePtr gauge = item;
metricsRecordValue[VALUE_PREFIX + gauge->GetName()] = ToString(gauge->GetValue());
}

for (auto& item : tmp->GetDoubleGauges()) {
DoubleGaugePtr gauge = item;
metricsRecordValue[VALUE_PREFIX + gauge->GetName()] = ToString(gauge->GetValue());
}

Expand Down
Loading
Loading