Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add hard-coded deadline for reflection stream. #150

Merged
merged 8 commits into from
Dec 4, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 3 additions & 11 deletions src/libCli/Call.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,17 +190,9 @@ namespace cli
}
}

//before calling the RPC, close the DescDb connection with a timeout.
grpc::Status dbDescStatus = ConnectionManager::getInstance().closeDescDbWithDeadline(serverAddress, deadline);
if (not dbDescStatus.ok())
{
std::cerr << "Failed to close reflection stream ;( Status code: " << std::to_string(dbDescStatus.error_code()) << " " << cli::getGrpcStatusCodeAsString(dbDescStatus.error_code()) << ", error message: " << dbDescStatus.error_message() << std::endl;
if(dbDescStatus.error_code() == grpc::StatusCode::DEADLINE_EXCEEDED)
{
std::cerr << "Note: You can increase the deadline by setting the --rpcTimeoutMilliseconds option to a number or 'None'." << std::endl;
}
return -1;
}
//before calling the RPC, close the DescDb connection with a default timeout. We still continue with rpc call
//but remove the cache file if the stream was not closed gracefully so it fetches the reflection data again next time.
grpc::Status dbDescStatus = ConnectionManager::getInstance().closeDescDbStream(serverAddress);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
grpc::Status dbDescStatus = ConnectionManager::getInstance().closeDescDbStream(serverAddress);
ConnectionManager::getInstance().closeDescDbStream(serverAddress);

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


grpc::testing::CliCall call(channel, methodStr, clientMetadata, deadline);

Expand Down
5 changes: 2 additions & 3 deletions src/libCli/ConnectionManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,7 @@ namespace cli
return m_connections[f_serverAddress].descPool;
}

grpc::Status ConnectionManager::closeDescDbWithDeadline(std::string f_serverAddress,
std::optional<std::chrono::time_point<std::chrono::system_clock>> deadline)
grpc::Status ConnectionManager::closeDescDbStream(std::string f_serverAddress)
{
if (m_connections[f_serverAddress].descDbProxy == nullptr)
{
Expand All @@ -75,7 +74,7 @@ namespace cli
}

//if proxy exists close the stream with a deadline.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
//if proxy exists close the stream with a deadline.
//if proxy exists close the stream

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

grpc::Status status = m_connections[f_serverAddress].descDbProxy->closeDescDbStream(deadline);
grpc::Status status = m_connections[f_serverAddress].descDbProxy->closeDescDbStream();

//delete the proxy, findChannelByAddress() protects from accessing uninitialzed DbProxy.
m_connections[f_serverAddress].descDbProxy.reset();
Expand Down
6 changes: 2 additions & 4 deletions src/libCli/libCli/ConnectionManager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,11 @@ namespace cli
/// @returns the gRpc DescriptorPool of the corresponding server address.
std::shared_ptr<grpc::protobuf::DescriptorPool> getDescPool(std::string f_serverAddress, ArgParse::ParsedElement &f_parseTree);

/// @brief closes the DescDb stream with a given deadline.
/// @brief closes the DescDb stream with a default deadline.
abb3r marked this conversation as resolved.
Show resolved Hide resolved
/// @param f_serverAddress server addresss to lookup the assigned DescDbProxy.
/// @param deadline optional dealine for closing the stream.
/// @return returns grpc::StatusCode::ABORTED status if no DescDb proxy is attached to the server address,
/// otherwise grpc status as a result of stream closure.
grpc::Status closeDescDbWithDeadline(std::string f_serverAddress,
std::optional<std::chrono::time_point<std::chrono::system_clock>> deadline);
grpc::Status closeDescDbStream(std::string f_serverAddress);

private:
ConnectionManager() {}
Expand Down
22 changes: 17 additions & 5 deletions src/libLocalDescriptorCache/DescDbProxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -372,21 +372,29 @@ void DescDbProxy::getDescriptors(const std::string &f_hostAddress)
}
}

grpc::Status DescDbProxy::closeDescDbStream(std::optional<std::chrono::time_point<std::chrono::system_clock>> deadline)
grpc::Status DescDbProxy::closeDescDbStream()
{
grpc::Status status;
if ( m_reflectionDescDb == nullptr )
{
return grpc::Status::OK;
return status;
}
return m_reflectionDescDb->closeStreamWithDeadline(deadline);
status = m_reflectionDescDb->closeDescDbStream();
if(not status.ok())
{
//failure to close stream leads to invalid cache,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good thought :)

//removing it here so it will be written again next time.
std::string cacheFilePath = prepareCacheFile();
std::filesystem::remove(cacheFilePath);
}
return status;
}

DescDbProxy::DescDbProxy(bool disableCache, const std::string &hostAddress, std::shared_ptr<grpc::Channel> channel,
ArgParse::ParsedElement &parseTree)
{
m_channel = channel;
m_parseTree = parseTree;
m_disableCache = disableCache;
if(disableCache)
{
// Get Desc directly via reflection and without touching localDB
Expand All @@ -410,4 +418,8 @@ DescDbProxy::DescDbProxy(bool disableCache, const std::string &hostAddress, std:
}
}

DescDbProxy::~DescDbProxy(){}
DescDbProxy::~DescDbProxy()
{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

well, isn't destructor called each time gwhisper terminates?

If so, we would invalidate the cache each time. This would make the cache use-less :P It's only purpose is to transport descriptr info from one invocation the the next.
(e.g. every time a user presses gwhisper is started afresh and parses the input / gets descriptors without cache)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes the proxy dtor is called evrytime.
actually the reflection stream if already closed gracefully will be make the close at proxy dtor a noop since we check for nullptr.
The cache is only invalidated if we fail to close the stream in time. so we don't disable it and the functional tests are green to prove it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, understand. Maybe add a comment in the destructor.
Then it should be OK

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

closeDescDbStream(); //close it here to ensure invalid cache file is removed if an error occurs.
//enforcing descDb repopulation next time.
}
5 changes: 2 additions & 3 deletions src/libLocalDescriptorCache/DescDbProxy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,9 @@ class DescDbProxy : public grpc::protobuf::DescriptorDatabase{
/// @param hostAdress Address to the current host
void getDescriptors(const std::string &hostAddress);

/// @brief close the DescDb stream with a given deadline. If the dealine is not set it waits for the stream to close indefinitely.
/// @param deadline optional deadline to close the DescDb stream.
/// @brief close the DescDb stream with a default deadline.
/// @return return grpc status as a result of call the finish() on the DescDb stream.
grpc::Status closeDescDbStream(std::optional<std::chrono::time_point<std::chrono::system_clock>> deadline);
grpc::Status closeDescDbStream();

DescDbProxy(bool disableCache, const std::string &hostAddress, std::shared_ptr<grpc::Channel> channel, ArgParse::ParsedElement &parseTree);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,9 @@ class ProtoReflectionDescriptorDatabase : public protobuf::DescriptorDatabase {
// Provide a list of full names of registered services
bool GetServices(std::vector<grpc::string>* output);

/// @brief close the reflection stream with a given deadline. If the dealine is not set it waits for the stream to close indefinitely.
/// @param deadline optional deadline to close the reflection stream.
/// @brief close the reflection stream with a default deadline.
/// @return return grpc status as a result of call the finish() on the reflection stream.
grpc::Status closeStreamWithDeadline(std::optional<std::chrono::time_point<std::chrono::system_clock>> deadline);
grpc::Status closeDescDbStream();

private:
typedef ClientReaderWriter<
Expand Down
14 changes: 9 additions & 5 deletions third_party/gRPC_utils/proto_reflection_descriptor_database.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ using grpc::reflection::v1alpha::ServerReflection;
using grpc::reflection::v1alpha::ServerReflectionRequest;
using grpc::reflection::v1alpha::ServerReflectionResponse;

const uint8_t g_timeoutGrpcMainStreamSeconds = 20; //using default gwhisper timeout of 20 seconds.
Copy link
Author

@abb3r abb3r Nov 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

increased since we already have default rpc timeout of 30secs anyway, is that too big? we saw that 10secs easily times out on slower systems.

namespace grpc {

ProtoReflectionDescriptorDatabase::ProtoReflectionDescriptorDatabase(
Expand Down Expand Up @@ -300,6 +301,9 @@ void ProtoReflectionDescriptorDatabase::AddFileFromResponse(
const std::shared_ptr<ProtoReflectionDescriptorDatabase::ClientStream>
ProtoReflectionDescriptorDatabase::GetStream() {
if (!stream_) {
std::chrono::system_clock::time_point deadline =
std::chrono::system_clock::now() + std::chrono::seconds(g_timeoutGrpcMainStreamSeconds);
ctx_.set_deadline(deadline);
stream_ = stub_->ServerReflectionInfo(&ctx_);
}
return stream_;
Expand All @@ -317,16 +321,13 @@ bool ProtoReflectionDescriptorDatabase::DoOneRequest(
return success;
}

grpc::Status ProtoReflectionDescriptorDatabase::closeStreamWithDeadline(std::optional<std::chrono::time_point<std::chrono::system_clock>> deadline)
grpc::Status ProtoReflectionDescriptorDatabase::closeDescDbStream()
{
stream_mutex_.lock();
if( deadline != std::nullopt )
{
ctx_.set_deadline(deadline.value());
}

auto status = closeStream();
stream_.reset();

stream_mutex_.unlock();
return status;
}
Expand All @@ -342,6 +343,9 @@ grpc::Status ProtoReflectionDescriptorDatabase::closeStream()
fprintf(stderr,
"Reflection request not implemented; "
"is the ServerReflection service enabled?\n");
} else if (status.error_code() == StatusCode::DEADLINE_EXCEEDED) {
fprintf(stderr,
"ServerReflectionInfo rpc failed. Grpc Server failed to close the stream within %d seconds.\n", g_timeoutGrpcMainStreamSeconds);
} else {
fprintf(stderr,
"ServerReflectionInfo rpc failed. Error code: %d, message: %s, "
Expand Down
Loading