Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 86 additions & 38 deletions source/extensions/common/dynamic_forward_proxy/dns_cache_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -194,25 +194,28 @@ void DnsCacheImpl::startCacheLoad(const std::string& host, uint16_t default_port
return;
}

const auto host_attributes = Http::Utility::parseAuthority(host);
primary_host = createHost(host, default_port);
startResolve(host, *primary_host);
}

DnsCacheImpl::PrimaryHostInfo* DnsCacheImpl::createHost(const std::string& host,
uint16_t default_port) {
const auto host_attributes = Http::Utility::parseAuthority(host);
// TODO(mattklein123): Right now, the same host with different ports will become two
// independent primary hosts with independent DNS resolutions. I'm not sure how much this will
// matter, but we could consider collapsing these down and sharing the underlying DNS resolution.
{
absl::WriterMutexLock writer_lock{&primary_hosts_lock_};
primary_host = primary_hosts_
// try_emplace() is used here for direct argument forwarding.
.try_emplace(host, std::make_unique<PrimaryHostInfo>(
*this, std::string(host_attributes.host_),
host_attributes.port_.value_or(default_port),
host_attributes.is_ip_address_,
[this, host]() { onReResolve(host); },
[this, host]() { onResolveTimeout(host); }))
.first->second.get();
return primary_hosts_
// try_emplace() is used here for direct argument forwarding.
.try_emplace(host,
std::make_unique<PrimaryHostInfo>(
*this, std::string(host_attributes.host_),
host_attributes.port_.value_or(default_port),
host_attributes.is_ip_address_, [this, host]() { onReResolve(host); },
[this, host]() { onResolveTimeout(host); }))
.first->second.get();
}

startResolve(host, *primary_host);
}

DnsCacheImpl::PrimaryHostInfo& DnsCacheImpl::getPrimaryHost(const std::string& host) {
Expand Down Expand Up @@ -288,13 +291,15 @@ void DnsCacheImpl::startResolve(const std::string& host, PrimaryHostInfo& host_i

void DnsCacheImpl::finishResolve(const std::string& host,
Network::DnsResolver::ResolutionStatus status,
std::list<Network::DnsResponse>&& response, bool from_cache) {
std::list<Network::DnsResponse>&& response,
absl::optional<MonotonicTime> resolution_time) {
ASSERT(main_thread_dispatcher_.isThreadSafe());
ENVOY_LOG_EVENT(debug, "dns_cache_finish_resolve",
"main thread resolve complete for host '{}': {}", host,
accumulateToString<Network::DnsResponse>(response, [](const auto& dns_response) {
return dns_response.address_->asString();
}));
const bool from_cache = resolution_time.has_value();

// Functions like this one that modify primary_hosts_ are only called in the main thread so we
// know it is safe to use the PrimaryHostInfo pointers outside of the lock.
Expand All @@ -305,9 +310,19 @@ void DnsCacheImpl::finishResolve(const std::string& host,
return primary_host_it->second.get();
}();

const bool first_resolve = !primary_host_info->host_info_->firstResolveComplete();
primary_host_info->timeout_timer_->disableTimer();
primary_host_info->active_query_ = nullptr;
bool first_resolve = false;

if (!from_cache) {
first_resolve = !primary_host_info->host_info_->firstResolveComplete();
primary_host_info->timeout_timer_->disableTimer();
primary_host_info->active_query_ = nullptr;

if (status == Network::DnsResolver::ResolutionStatus::Failure) {
stats_.dns_query_failure_.inc();
} else {
stats_.dns_query_success_.inc();
}
}

// If the DNS resolver successfully resolved with an empty response list, the dns cache does not
// update. This ensures that a potentially previously resolved address does not stabilize back to
Expand All @@ -317,12 +332,6 @@ void DnsCacheImpl::finishResolve(const std::string& host,
primary_host_info->port_)
: nullptr;

if (status == Network::DnsResolver::ResolutionStatus::Failure) {
stats_.dns_query_failure_.inc();
} else {
stats_.dns_query_success_.inc();
}

// Only the change the address if:
// 1) The new address is valid &&
// 2a) The host doesn't yet have an address ||
Expand All @@ -333,26 +342,37 @@ void DnsCacheImpl::finishResolve(const std::string& host,
bool address_changed = false;
auto current_address = primary_host_info->host_info_->address();
if (new_address != nullptr && (current_address == nullptr || *current_address != *new_address)) {
if (!from_cache) {
addCacheEntry(host, new_address);
}
// TODO(alyssawilk) don't immediately push cached entries to threads.
// Only serve stale entries if a configured resolve timeout has fired.
ENVOY_LOG(debug, "host '{}' address has changed", host);
primary_host_info->host_info_->setAddress(new_address);
runAddUpdateCallbacks(host, primary_host_info->host_info_);
address_changed = true;
stats_.host_address_changed_.inc();
}

if (first_resolve || address_changed) {
if (!resolution_time.has_value()) {
resolution_time = main_thread_dispatcher_.timeSource().monotonicTime();
}
if (new_address) {
// Update the cache entry and staleness any time the ttl changes.
if (!from_cache) {
addCacheEntry(host, new_address, response.front().ttl_);
}
primary_host_info->host_info_->updateStale(resolution_time.value(), response.front().ttl_);
}

if (first_resolve) {
primary_host_info->host_info_->setFirstResolveComplete();
}
if (first_resolve || address_changed) {
// TODO(alyssawilk) only notify threads of stale results after a resolution
// timeout.
notifyThreads(host, primary_host_info->host_info_);
}

// Kick off the refresh timer.
// TODO(mattklein123): Consider jitter here. It may not be necessary since the initial host
// is populated dynamically.
// TODO(alyssawilk) also consider TTL here.
if (status == Network::DnsResolver::ResolutionStatus::Success) {
failure_backoff_strategy_->reset();
primary_host_info->refresh_timer_->enableTimer(refresh_interval_);
Expand Down Expand Up @@ -429,12 +449,16 @@ DnsCacheImpl::PrimaryHostInfo::~PrimaryHostInfo() {
}

void DnsCacheImpl::addCacheEntry(const std::string& host,
const Network::Address::InstanceConstSharedPtr& address) {
const Network::Address::InstanceConstSharedPtr& address,
const std::chrono::seconds ttl) {
if (!key_value_store_) {
return;
}
// TODO(alyssawilk) cache data should include TTL, or some other indicator.
const std::string value = absl::StrCat(address->asString());
MonotonicTime now = main_thread_dispatcher_.timeSource().monotonicTime();
uint64_t seconds_since_epoch =
std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch()).count();
const std::string value =
absl::StrCat(address->asString(), "|", ttl.count(), "|", seconds_since_epoch);
key_value_store_->addOrUpdate(host, value);
}

Expand All @@ -455,18 +479,42 @@ void DnsCacheImpl::loadCacheEntries(
key_value_store_ = factory.createStore(config.key_value_config(), validation_visitor_,
main_thread_dispatcher_, file_system_);
KeyValueStore::ConstIterateCb load = [this](const std::string& key, const std::string& value) {
auto address = Network::Utility::parseInternetAddressAndPortNoThrow(value);
if (address == nullptr) {
Network::Address::InstanceConstSharedPtr address;
const auto parts = StringUtil::splitToken(value, "|");
std::chrono::seconds ttl(0);
absl::optional<MonotonicTime> resolution_time;
if (parts.size() == 3) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I might be misreading this code but it seems like up on line 452, we write 3 things into the cache value. Down here, we spilt on | and ensure that we have 2 parts. This all makes sense. But I only see code looking at parts[0] (line 478) and parts[1] (line 483) but I don't see a reference to parts[2]. Is this expected?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah I wasn't using part two yet. I was going to do in the PR where we start using stale, but I've worked it into this one with the time updates.
I'll say I'm not 100% on the conversion back and forth but it's hard to test until we have stale expirey which is coming in the next PR!

address = Network::Utility::parseInternetAddressAndPortNoThrow(std::string(parts[0]));
if (address == nullptr) {
ENVOY_LOG(warn, "{} is not a valid address", parts[0]);
}
uint64_t ttl_int;
if (absl::SimpleAtoi(parts[1], &ttl_int) && ttl_int != 0) {
ttl = std::chrono::seconds(ttl_int);
} else {
ENVOY_LOG(warn, "{} is not a valid ttl", parts[1]);
}
uint64_t epoch_int;
if (absl::SimpleAtoi(parts[2], &epoch_int)) {
MonotonicTime now = main_thread_dispatcher_.timeSource().monotonicTime();
const std::chrono::seconds seconds_since_epoch =
std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch());
resolution_time = main_thread_dispatcher_.timeSource().monotonicTime() -
(seconds_since_epoch - std::chrono::seconds(epoch_int));
}
} else {
ENVOY_LOG(warn, "Incorrect number of tokens in the cache line");
}
if (address == nullptr || ttl == std::chrono::seconds(0) || !resolution_time.has_value()) {
ENVOY_LOG(warn, "Unable to parse cache line '{}'", value);
return KeyValueStore::Iterate::Break;
}
stats_.cache_load_.inc();
std::list<Network::DnsResponse> response;
// TODO(alyssawilk) change finishResolve to actually use the TTL rather than
// putting 0 here, return the remaining TTL or indicate the result is stale.
response.emplace_back(Network::DnsResponse(address, std::chrono::seconds(0) /* ttl */));
startCacheLoad(key, address->ip()->port());
finishResolve(key, Network::DnsResolver::ResolutionStatus::Success, std::move(response), true);
createHost(key, address->ip()->port());
response.emplace_back(Network::DnsResponse(address, ttl));
finishResolve(key, Network::DnsResolver::ResolutionStatus::Success, std::move(response),
resolution_time);
return KeyValueStore::Iterate::Continue;
};
key_value_store_->iterate(load);
Expand Down
14 changes: 11 additions & 3 deletions source/extensions/common/dynamic_forward_proxy/dns_cache_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ class DnsCacheImpl : public DnsCache, Logger::Loggable<Logger::Id::forward_proxy
class DnsHostInfoImpl : public DnsHostInfo {
public:
DnsHostInfoImpl(TimeSource& time_source, absl::string_view resolved_host, bool is_ip_address)
: time_source_(time_source), resolved_host_(resolved_host), is_ip_address_(is_ip_address) {
: time_source_(time_source), resolved_host_(resolved_host), is_ip_address_(is_ip_address),
stale_at_time_(time_source.monotonicTime()) {
touch();
}

Expand All @@ -112,6 +113,9 @@ class DnsCacheImpl : public DnsCache, Logger::Loggable<Logger::Id::forward_proxy
const std::string& resolvedHost() const override { return resolved_host_; }
bool isIpAddress() const override { return is_ip_address_; }
void touch() final { last_used_time_ = time_source_.monotonicTime().time_since_epoch(); }
void updateStale(MonotonicTime resolution_time, std::chrono::seconds ttl) {
stale_at_time_ = resolution_time + ttl;
}

void setAddress(Network::Address::InstanceConstSharedPtr address) {
absl::WriterMutexLock lock{&resolve_lock_};
Expand Down Expand Up @@ -141,6 +145,7 @@ class DnsCacheImpl : public DnsCache, Logger::Loggable<Logger::Id::forward_proxy
// Using std::chrono::steady_clock::duration is required for compilation within an atomic vs.
// using MonotonicTime.
std::atomic<std::chrono::steady_clock::duration> last_used_time_;
std::atomic<MonotonicTime> stale_at_time_;
bool first_resolve_complete_ ABSL_GUARDED_BY(resolve_lock_){false};
};

Expand Down Expand Up @@ -177,7 +182,8 @@ class DnsCacheImpl : public DnsCache, Logger::Loggable<Logger::Id::forward_proxy
void startResolve(const std::string& host, PrimaryHostInfo& host_info)
ABSL_LOCKS_EXCLUDED(primary_hosts_lock_);
void finishResolve(const std::string& host, Network::DnsResolver::ResolutionStatus status,
std::list<Network::DnsResponse>&& response, bool from_cache = false);
std::list<Network::DnsResponse>&& response,
absl::optional<MonotonicTime> resolution_time = {});
void runAddUpdateCallbacks(const std::string& host, const DnsHostInfoSharedPtr& host_info);
void runRemoveCallbacks(const std::string& host);
void notifyThreads(const std::string& host, const DnsHostInfoImplSharedPtr& resolved_info);
Expand All @@ -186,10 +192,12 @@ class DnsCacheImpl : public DnsCache, Logger::Loggable<Logger::Id::forward_proxy
PrimaryHostInfo& getPrimaryHost(const std::string& host);

void addCacheEntry(const std::string& host,
const Network::Address::InstanceConstSharedPtr& address);
const Network::Address::InstanceConstSharedPtr& address,
const std::chrono::seconds ttl);
void removeCacheEntry(const std::string& host);
void loadCacheEntries(
const envoy::extensions::common::dynamic_forward_proxy::v3::DnsCacheConfig& config);
PrimaryHostInfo* createHost(const std::string& host, uint16_t default_port);

Event::Dispatcher& main_thread_dispatcher_;
const Network::DnsLookupFamily dns_lookup_family_;
Expand Down
2 changes: 1 addition & 1 deletion test/common/http/http2/codec_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1015,7 +1015,7 @@ TEST_P(Http2CodecImplTest, IdlePing) {
// Advance time past 1s. This time the ping should be sent, and the timeout
// alarm enabled.
RequestEncoder* request_encoder2 = &client_->newStream(response_decoder_);
client_connection_.dispatcher_.time_system_.advanceTimeAsyncImpl(std::chrono::seconds(2));
client_connection_.dispatcher_.globalTimeSystem().advanceTimeAsyncImpl(std::chrono::seconds(2));
EXPECT_CALL(*timeout_timer, enableTimer(_, _)).Times(0);
EXPECT_CALL(request_decoder_, decodeHeaders_(_, true));
EXPECT_TRUE(request_encoder2->encodeHeaders(request_headers, true).ok());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1025,6 +1025,9 @@ TEST(UtilityTest, PrepareDnsRefreshStrategy) {
}

TEST_F(DnsCacheImplTest, ResolveSuccessWithCaching) {
auto* time_source = new NiceMock<MockTimeSystem>();
dispatcher_.time_system_.reset(time_source);

// Configure the cache.
MockKeyValueStoreFactory factory;
EXPECT_CALL(factory, createEmptyConfigProto()).WillRepeatedly(Invoke([]() {
Expand Down Expand Up @@ -1064,14 +1067,14 @@ TEST_F(DnsCacheImplTest, ResolveSuccessWithCaching) {

EXPECT_CALL(*timeout_timer, disableTimer());
// Make sure the store gets the first insert.
EXPECT_CALL(*store, addOrUpdate("foo.com", "10.0.0.1:80"));
EXPECT_CALL(update_callbacks_,
onDnsHostAddOrUpdate("foo.com", DnsHostInfoEquals("10.0.0.1:80", "foo.com", false)));
EXPECT_CALL(*store, addOrUpdate("foo.com", "10.0.0.1:80|30|0"));
EXPECT_CALL(callbacks,
onLoadDnsCacheComplete(DnsHostInfoEquals("10.0.0.1:80", "foo.com", false)));
EXPECT_CALL(*resolve_timer, enableTimer(std::chrono::milliseconds(60000), _));
resolve_cb(Network::DnsResolver::ResolutionStatus::Success,
TestUtility::makeDnsResponse({"10.0.0.1"}));
TestUtility::makeDnsResponse({"10.0.0.1"}, std::chrono::seconds(30)));

checkStats(1 /* attempt */, 1 /* success */, 0 /* failure */, 1 /* address changed */,
1 /* added */, 0 /* removed */, 1 /* num hosts */);
Expand All @@ -1087,9 +1090,10 @@ TEST_F(DnsCacheImplTest, ResolveSuccessWithCaching) {

// Address does not change.
EXPECT_CALL(*timeout_timer, disableTimer());
EXPECT_CALL(*store, addOrUpdate("foo.com", "10.0.0.1:80|30|0"));
EXPECT_CALL(*resolve_timer, enableTimer(std::chrono::milliseconds(60000), _));
resolve_cb(Network::DnsResolver::ResolutionStatus::Success,
TestUtility::makeDnsResponse({"10.0.0.1"}));
TestUtility::makeDnsResponse({"10.0.0.1"}, std::chrono::seconds(30)));

checkStats(2 /* attempt */, 2 /* success */, 0 /* failure */, 1 /* address changed */,
1 /* added */, 0 /* removed */, 1 /* num hosts */);
Expand All @@ -1105,15 +1109,31 @@ TEST_F(DnsCacheImplTest, ResolveSuccessWithCaching) {

EXPECT_CALL(*timeout_timer, disableTimer());
// Make sure the store gets the updated address.
EXPECT_CALL(*store, addOrUpdate("foo.com", "10.0.0.2:80"));
EXPECT_CALL(update_callbacks_,
onDnsHostAddOrUpdate("foo.com", DnsHostInfoEquals("10.0.0.2:80", "foo.com", false)));
EXPECT_CALL(*store, addOrUpdate("foo.com", "10.0.0.2:80|30|0"));
EXPECT_CALL(*resolve_timer, enableTimer(std::chrono::milliseconds(60000), _));
resolve_cb(Network::DnsResolver::ResolutionStatus::Success,
TestUtility::makeDnsResponse({"10.0.0.2"}));
TestUtility::makeDnsResponse({"10.0.0.2"}, std::chrono::seconds(30)));

checkStats(3 /* attempt */, 3 /* success */, 0 /* failure */, 2 /* address changed */,
1 /* added */, 0 /* removed */, 1 /* num hosts */);

// Now do one more resolve, where the address does not change but the time
// does.

// Re-resolve timer.
EXPECT_CALL(*timeout_timer, enableTimer(std::chrono::milliseconds(5000), nullptr));
EXPECT_CALL(*resolver_, resolve("foo.com", _, _))
.WillOnce(DoAll(SaveArg<2>(&resolve_cb), Return(&resolver_->active_query_)));
resolve_timer->invokeCallback();

// Address does not change.
EXPECT_CALL(*timeout_timer, disableTimer());
EXPECT_CALL(*store, addOrUpdate("foo.com", "10.0.0.2:80|40|0"));
EXPECT_CALL(*resolve_timer, enableTimer(std::chrono::milliseconds(60000), _));
resolve_cb(Network::DnsResolver::ResolutionStatus::Success,
TestUtility::makeDnsResponse({"10.0.0.2"}, std::chrono::seconds(40)));
}

} // namespace
Expand Down
Loading