Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Metrics SDK] Performance improvement in measurement processing #1993

Merged
merged 33 commits into from
Mar 4, 2023
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
625f306
fix
lalitb Feb 21, 2023
b243da6
fix
lalitb Feb 21, 2023
5000317
fix
lalitb Feb 21, 2023
a724644
fix
lalitb Feb 21, 2023
44f7b44
fix
lalitb Feb 21, 2023
9e80106
fix build error
lalitb Feb 21, 2023
635df61
fix
lalitb Feb 21, 2023
bb8f6b5
fix
lalitb Feb 21, 2023
8e87139
fix
lalitb Feb 21, 2023
6ec205c
Merge branch 'main' into att-copy-fix
lalitb Feb 21, 2023
9b3f8cd
fix
lalitb Feb 21, 2023
6dec0c7
Merge branch 'att-copy-fix' of github.com:lalitb/opentelemetry-cpp in…
lalitb Feb 21, 2023
08b0da0
fix
lalitb Feb 21, 2023
552a7f5
fix
lalitb Feb 21, 2023
0faff5b
fix
lalitb Feb 21, 2023
e4a29f6
Fix
lalitb Feb 22, 2023
11a57c8
fix
lalitb Feb 22, 2023
29f730b
fix CI timeout
lalitb Feb 22, 2023
025da35
Merge branch 'att-copy-fix' of github.com:lalitb/opentelemetry-cpp in…
lalitb Feb 22, 2023
72da98b
Merge branch 'main' into att-copy-fix
ThomsonTan Feb 22, 2023
879e0ad
Merge branch 'main' into att-copy-fix
ThomsonTan Feb 22, 2023
5c4daee
Update sdk/include/opentelemetry/sdk/common/attributemap_hash.h
lalitb Feb 23, 2023
f76b7c1
improve mutex lock
lalitb Feb 24, 2023
1fe461d
Merge branch 'att-copy-fix' of github.com:lalitb/opentelemetry-cpp in…
lalitb Feb 24, 2023
4153560
fix
lalitb Feb 25, 2023
0c0e28f
fix warning
lalitb Feb 25, 2023
43b9ea2
fix
lalitb Feb 25, 2023
60d6161
fix
lalitb Feb 25, 2023
d9653a3
fix
lalitb Feb 25, 2023
4f227af
fix
lalitb Feb 27, 2023
7fccdc0
Merge branch 'main' into att-copy-fix
esigo Mar 3, 2023
ea4cac4
Merge branch 'main' into att-copy-fix
lalitb Mar 3, 2023
7d7755b
Merge branch 'main' into att-copy-fix
lalitb Mar 4, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 32 additions & 2 deletions sdk/include/opentelemetry/sdk/common/attributemap_hash.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

#pragma once

#include <iostream>
#include <string>
#include "opentelemetry/sdk/common/attribute_utils.h"

Expand All @@ -14,7 +13,7 @@ namespace common
{

template <class T>
inline void GetHashForAttributeValue(size_t &seed, const T arg)
inline void GetHashForAttributeValue(size_t &seed, const T &arg)
{
std::hash<T> hasher;
// reference -
Expand Down Expand Up @@ -57,6 +56,37 @@ inline size_t GetHashForAttributeMap(const OrderedAttributeMap &attribute_map)
return seed;
}

// Calculate hash of keys and values of KeyValueIterable, filtered using callback.
inline size_t GetHashForAttributeMap(
const opentelemetry::common::KeyValueIterable &attributes,
nostd::function_ref<bool(nostd::string_view)> is_key_present_callback)
{
AttributeConverter converter;
size_t seed = 0UL;
attributes.ForEachKeyValue(
[&](nostd::string_view key, opentelemetry::common::AttributeValue value) noexcept {
if (!is_key_present_callback(key))
{
return true;
}
std::hash<std::string> hasher;
// reference -
// https://www.boost.org/doc/libs/1_37_0/doc/html/hash/reference.html#boost.hash_combine
seed ^= hasher(key.data()) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
lalitb marked this conversation as resolved.
Show resolved Hide resolved
auto attr_val = nostd::visit(converter, value);
lalitb marked this conversation as resolved.
Show resolved Hide resolved
nostd::visit(GetHashForAttributeValueVisitor(seed), attr_val);
return true;
});
return seed;
}

template <class T>
inline size_t GetHash(T value)
{
std::hash<T> hasher;
return hasher(value);
}

} // namespace common
} // namespace sdk
OPENTELEMETRY_END_NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -47,22 +47,24 @@ class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStora
{
auto aggr = DefaultAggregation::CreateAggregation(aggregation_type_, instrument_descriptor_);
aggr->Aggregate(measurement.second);
auto prev = cumulative_hash_map_->Get(measurement.first);
auto hash = opentelemetry::sdk::common::GetHashForAttributeMap(measurement.first);
auto prev = cumulative_hash_map_->Get(hash);
if (prev)
{
auto delta = prev->Diff(*aggr);
// store received value in cumulative map, and the diff in delta map (to pass it to temporal
// storage)
cumulative_hash_map_->Set(measurement.first, std::move(aggr));
delta_hash_map_->Set(measurement.first, std::move(delta));
cumulative_hash_map_->Set(measurement.first, std::move(aggr), hash);
delta_hash_map_->Set(measurement.first, std::move(delta), hash);
}
else
{
// store received value in cumulative and delta map.
cumulative_hash_map_->Set(
measurement.first,
DefaultAggregation::CloneAggregation(aggregation_type_, instrument_descriptor_, *aggr));
delta_hash_map_->Set(measurement.first, std::move(aggr));
DefaultAggregation::CloneAggregation(aggregation_type_, instrument_descriptor_, *aggr),
hash);
delta_hash_map_->Set(measurement.first, std::move(aggr), hash);
}
}
}
Expand Down
89 changes: 72 additions & 17 deletions sdk/include/opentelemetry/sdk/metrics/state/attributes_hashmap.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "opentelemetry/sdk/common/attributemap_hash.h"
#include "opentelemetry/sdk/metrics/aggregation/aggregation.h"
#include "opentelemetry/sdk/metrics/instruments.h"
#include "opentelemetry/sdk/metrics/view/attributes_processor.h"
#include "opentelemetry/version.h"

#include <functional>
Expand All @@ -19,6 +20,7 @@ namespace sdk
{
namespace metrics
{

using opentelemetry::sdk::common::OrderedAttributeMap;

class AttributeHashGenerator
Expand All @@ -33,12 +35,12 @@ class AttributeHashGenerator
class AttributesHashMap
{
public:
Aggregation *Get(const MetricAttributes &attributes) const
Aggregation *Get(size_t hash) const
{
auto it = hash_map_.find(attributes);
auto it = hash_map_.find(hash);
if (it != hash_map_.end())
{
return it->second.get();
return it->second.second.get();
}
return nullptr;
}
Expand All @@ -47,35 +49,89 @@ class AttributesHashMap
* @return check if key is present in hash
*
*/
bool Has(const MetricAttributes &attributes) const
{
return (hash_map_.find(attributes) == hash_map_.end()) ? false : true;
}
bool Has(size_t hash) const { return (hash_map_.find(hash) == hash_map_.end()) ? false : true; }
lalitb marked this conversation as resolved.
Show resolved Hide resolved

/**
* @return the pointer to value for given key if present.
* If not present, it uses the provided callback to generate
* value and store in the hash
*/
Aggregation *GetOrSetDefault(const opentelemetry::common::KeyValueIterable &attributes,
std::function<std::unique_ptr<Aggregation>()> aggregation_callback,
size_t hash)
{
auto it = hash_map_.find(hash);
if (it != hash_map_.end())
{
return it->second.second.get();
}

MetricAttributes attr{attributes};

hash_map_[hash] = {attr, aggregation_callback()};
return hash_map_[hash].second.get();
}

Aggregation *GetOrSetDefault(std::function<std::unique_ptr<Aggregation>()> aggregation_callback,
size_t hash)
{
auto it = hash_map_.find(hash);
if (it != hash_map_.end())
{
return it->second.second.get();
}
MetricAttributes attr{};
hash_map_[hash] = {attr, aggregation_callback()};
return hash_map_[hash].second.get();
}

Aggregation *GetOrSetDefault(const MetricAttributes &attributes,
std::function<std::unique_ptr<Aggregation>()> aggregation_callback)
std::function<std::unique_ptr<Aggregation>()> aggregation_callback,
size_t hash)
{
auto it = hash_map_.find(attributes);
auto it = hash_map_.find(hash);
if (it != hash_map_.end())
{
return it->second.get();
return it->second.second.get();
}

hash_map_[attributes] = aggregation_callback();
return hash_map_[attributes].get();
MetricAttributes attr{attributes};

hash_map_[hash] = {attr, aggregation_callback()};
return hash_map_[hash].second.get();
}

/**
* Set the value for given key, overwriting the value if already present
*/
void Set(const MetricAttributes &attributes, std::unique_ptr<Aggregation> value)
void Set(const opentelemetry::common::KeyValueIterable &attributes,
std::unique_ptr<Aggregation> aggr,
size_t hash)
{
hash_map_[attributes] = std::move(value);
auto it = hash_map_.find(hash);
if (it != hash_map_.end())
{
it->second.second = std::move(aggr);
}
else
{
MetricAttributes attr{attributes};
hash_map_[hash] = {attr, std::move(aggr)};
}
}

void Set(const MetricAttributes &attributes, std::unique_ptr<Aggregation> aggr, size_t hash)
{
auto it = hash_map_.find(hash);
if (it != hash_map_.end())
{
it->second.second = std::move(aggr);
}
else
{
MetricAttributes attr{attributes};
hash_map_[hash] = {attr, std::move(aggr)};
}
}

/**
Expand All @@ -86,7 +142,7 @@ class AttributesHashMap
{
for (auto &kv : hash_map_)
{
if (!callback(kv.first, *(kv.second.get())))
if (!callback(kv.second.first, *(kv.second.second.get())))
{
return false; // callback is not prepared to consume data
}
Expand All @@ -100,8 +156,7 @@ class AttributesHashMap
size_t Size() { return hash_map_.size(); }

private:
std::unordered_map<MetricAttributes, std::unique_ptr<Aggregation>, AttributeHashGenerator>
hash_map_;
std::unordered_map<size_t, std::pair<MetricAttributes, std::unique_ptr<Aggregation>>> hash_map_;
};
} // namespace metrics

Expand Down
43 changes: 33 additions & 10 deletions sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,11 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage
const AggregationConfig *aggregation_config)
: instrument_descriptor_(instrument_descriptor),
attributes_hashmap_(new AttributesHashMap()),
attributes_processor_{attributes_processor},
attributes_processor_(attributes_processor),
#ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW
exemplar_reservoir_(exemplar_reservoir),
#endif
temporal_metric_storage_(instrument_descriptor, aggregation_type, aggregation_config)

{
create_default_aggregation_ = [&, aggregation_type,
aggregation_config]() -> std::unique_ptr<Aggregation> {
Expand All @@ -60,8 +59,9 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage
#ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW
exemplar_reservoir_->OfferMeasurement(value, {}, context, std::chrono::system_clock::now());
#endif
static size_t hash = opentelemetry::sdk::common::GetHash("");
std::lock_guard<opentelemetry::common::SpinLockMutex> guard(attribute_hashmap_lock_);
attributes_hashmap_->GetOrSetDefault({}, create_default_aggregation_)->Aggregate(value);
attributes_hashmap_->GetOrSetDefault(create_default_aggregation_, hash)->Aggregate(value);
}

void RecordLong(int64_t value,
Expand All @@ -77,9 +77,21 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage
exemplar_reservoir_->OfferMeasurement(value, attributes, context,
std::chrono::system_clock::now());
#endif
auto attr = attributes_processor_->process(attributes);
auto hash = opentelemetry::sdk::common::GetHashForAttributeMap(
attributes, [this](nostd::string_view key) {
if (attributes_processor_)
{
return attributes_processor_->isPresent(key);
}
else
{
return true;
}
});

std::lock_guard<opentelemetry::common::SpinLockMutex> guard(attribute_hashmap_lock_);
attributes_hashmap_->GetOrSetDefault(attr, create_default_aggregation_)->Aggregate(value);
attributes_hashmap_->GetOrSetDefault(attributes, create_default_aggregation_, hash)
->Aggregate(value);
}

void RecordDouble(double value,
Expand All @@ -93,8 +105,9 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage
#ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW
exemplar_reservoir_->OfferMeasurement(value, {}, context, std::chrono::system_clock::now());
#endif
static size_t hash = opentelemetry::sdk::common::GetHash("");
std::lock_guard<opentelemetry::common::SpinLockMutex> guard(attribute_hashmap_lock_);
attributes_hashmap_->GetOrSetDefault({}, create_default_aggregation_)->Aggregate(value);
attributes_hashmap_->GetOrSetDefault(create_default_aggregation_, hash)->Aggregate(value);
}

void RecordDouble(double value,
Expand All @@ -114,9 +127,20 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage
exemplar_reservoir_->OfferMeasurement(value, attributes, context,
std::chrono::system_clock::now());
#endif
auto attr = attributes_processor_->process(attributes);
auto hash = opentelemetry::sdk::common::GetHashForAttributeMap(
attributes, [this](nostd::string_view key) {
if (attributes_processor_)
{
return attributes_processor_->isPresent(key);
}
else
{
return true;
}
});
std::lock_guard<opentelemetry::common::SpinLockMutex> guard(attribute_hashmap_lock_);
attributes_hashmap_->GetOrSetDefault(attr, create_default_aggregation_)->Aggregate(value);
attributes_hashmap_->GetOrSetDefault(attributes, create_default_aggregation_, hash)
->Aggregate(value);
}

bool Collect(CollectorHandle *collector,
Expand All @@ -127,16 +151,15 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage

private:
InstrumentDescriptor instrument_descriptor_;

// hashmap to maintain the metrics for delta collection (i.e, collection since last Collect call)
std::unique_ptr<AttributesHashMap> attributes_hashmap_;
// unreported metrics stash for all the collectors
std::unordered_map<CollectorHandle *, std::list<std::shared_ptr<AttributesHashMap>>>
unreported_metrics_;
// last reported metrics stash for all the collectors.
std::unordered_map<CollectorHandle *, LastReportedMetrics> last_reported_metrics_;
const AttributesProcessor *attributes_processor_;
std::function<std::unique_ptr<Aggregation>()> create_default_aggregation_;
const AttributesProcessor *attributes_processor_;
#ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW
nostd::shared_ptr<ExemplarReservoir> exemplar_reservoir_;
#endif
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#pragma once

#include "opentelemetry/sdk/common/attribute_utils.h"

OPENTELEMETRY_BEGIN_NAMESPACE
namespace sdk
{
Expand All @@ -20,10 +21,14 @@ class AttributesProcessor
{
public:
// Process the metric instrument attributes.
// @returns The processed attributes
// @returns integer with individual bits set if they are to be filtered.

virtual MetricAttributes process(
const opentelemetry::common::KeyValueIterable &attributes) const noexcept = 0;
virtual ~AttributesProcessor() = default;

virtual bool isPresent(nostd::string_view key) const noexcept = 0;

virtual ~AttributesProcessor() = default;
};

/**
Expand All @@ -33,12 +38,15 @@ class AttributesProcessor

class DefaultAttributesProcessor : public AttributesProcessor
{
public:
MetricAttributes process(
const opentelemetry::common::KeyValueIterable &attributes) const noexcept override
{
MetricAttributes result(attributes);
return result;
}

bool isPresent(nostd::string_view /*key*/) const noexcept override { return true; }
};

/**
Expand Down Expand Up @@ -70,6 +78,11 @@ class FilteringAttributesProcessor : public AttributesProcessor
return result;
}

bool isPresent(nostd::string_view key) const noexcept override
{
return (allowed_attribute_keys_.find(key.data()) != allowed_attribute_keys_.end());
}

private:
std::unordered_map<std::string, bool> allowed_attribute_keys_;
};
Expand Down
Loading