Skip to content

Commit

Permalink
Add debug string to raylet (#3317)
Browse files Browse the repository at this point in the history
* initial debug string

* format

* wip debug string

* fix compile

* fix

* update

* finished

* to file

* logs dir

* use temp root

* fix

* override
  • Loading branch information
ericl authored and pcmoritz committed Nov 16, 2018
1 parent d10cb57 commit e0bf9d7
Show file tree
Hide file tree
Showing 37 changed files with 404 additions and 26 deletions.
1 change: 1 addition & 0 deletions python/ray/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -955,6 +955,7 @@ def start_raylet(redis_address,
start_worker_command,
"", # Worker command for Java, not needed for Python.
redis_password or "",
get_temp_root(),
]

if use_valgrind:
Expand Down
2 changes: 1 addition & 1 deletion python/ray/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1655,7 +1655,7 @@ def init(redis_address=None,
driver_id: The ID of driver.
configure_logging: True if allow the logging cofiguration here.
Otherwise, the users may want to configure it by their own.
logging_level: Logging level, default will be loging.INFO.
logging_level: Logging level, default will be logging.INFO.
logging_format: Logging format, default will be "%(message)s"
which means only contains the message.
plasma_store_socket_name (str): If provided, it will specify the socket
Expand Down
23 changes: 23 additions & 0 deletions src/ray/common/client_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ void ServerConnection<T>::ReadBuffer(
template <class T>
ray::Status ServerConnection<T>::WriteMessage(int64_t type, int64_t length,
const uint8_t *message) {
sync_writes_ += 1;
bytes_written_ += length;

std::vector<boost::asio::const_buffer> message_buffers;
auto write_version = RayConfig::instance().ray_protocol_version();
message_buffers.push_back(boost::asio::buffer(&write_version, sizeof(write_version)));
Expand All @@ -96,6 +99,9 @@ template <class T>
void ServerConnection<T>::WriteMessageAsync(
int64_t type, int64_t length, const uint8_t *message,
const std::function<void(const ray::Status &)> &handler) {
async_writes_ += 1;
bytes_written_ += length;

auto write_buffer = std::unique_ptr<AsyncWriteBuffer>(new AsyncWriteBuffer());
write_buffer->write_version = RayConfig::instance().ray_protocol_version();
write_buffer->write_type = type;
Expand Down Expand Up @@ -220,6 +226,7 @@ void ClientConnection<T>::ProcessMessageHeader(const boost::system::error_code &
RAY_CHECK(read_version_ == RayConfig::instance().ray_protocol_version());
// Resize the message buffer to match the received length.
read_message_.resize(read_length_);
ServerConnection<T>::bytes_read_ += read_length_;
// Wait for the message to be read.
boost::asio::async_read(
ServerConnection<T>::socket_, boost::asio::buffer(read_message_),
Expand All @@ -242,6 +249,22 @@ void ClientConnection<T>::ProcessMessage(const boost::system::error_code &error)
}
}

template <class T>
std::string ServerConnection<T>::DebugString() const {
std::stringstream result;
result << "\n- bytes read: " << bytes_read_;
result << "\n- bytes written: " << bytes_written_;
result << "\n- num async writes: " << async_writes_;
result << "\n- num sync writes: " << sync_writes_;
result << "\n- writing: " << async_write_in_flight_;
int64_t num_bytes = 0;
for (auto &buffer : async_write_queue_) {
num_bytes += buffer->write_length;
}
result << "\n- pending async bytes: " << num_bytes;
return result.str();
}

template class ServerConnection<boost::asio::local::stream_protocol>;
template class ServerConnection<boost::asio::ip::tcp>;
template class ClientConnection<boost::asio::local::stream_protocol>;
Expand Down
14 changes: 14 additions & 0 deletions src/ray/common/client_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ class ServerConnection : public std::enable_shared_from_this<ServerConnection<T>
socket_.close(ec);
}

std::string DebugString() const;

protected:
/// A private constructor for a server connection.
ServerConnection(boost::asio::basic_stream_socket<T> &&socket);
Expand All @@ -97,6 +99,18 @@ class ServerConnection : public std::enable_shared_from_this<ServerConnection<T>
/// Whether we are in the middle of an async write.
bool async_write_in_flight_;

/// Count of async messages sent total.
int64_t async_writes_ = 0;

/// Count of sync messages sent total.
int64_t sync_writes_ = 0;

/// Count of bytes sent total.
int64_t bytes_written_ = 0;

/// Count of bytes read total.
int64_t bytes_read_ = 0;

private:
/// Asynchronously flushes the write queue. While async writes are running, the flag
/// async_write_in_flight_ will be set. This should only be called when no async writes
Expand Down
15 changes: 15 additions & 0 deletions src/ray/gcs/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,21 @@ Status AsyncGcsClient::Attach(boost::asio::io_service &io_service) {
return Status::OK();
}

std::string AsyncGcsClient::DebugString() const {
std::stringstream result;
result << "AsyncGcsClient:";
result << "\n- TaskTable: " << raylet_task_table_->DebugString();
result << "\n- ActorTable: " << actor_table_->DebugString();
result << "\n- TaskReconstructionLog: " << task_reconstruction_log_->DebugString();
result << "\n- TaskLeaseTable: " << task_lease_table_->DebugString();
result << "\n- HeartbeatTable: " << heartbeat_table_->DebugString();
result << "\n- ErrorTable: " << error_table_->DebugString();
result << "\n- ProfileTable: " << profile_table_->DebugString();
result << "\n- ClientTable: " << client_table_->DebugString();
result << "\n- DriverTable: " << driver_table_->DebugString();
return result.str();
}

ObjectTable &AsyncGcsClient::object_table() { return *object_table_; }

raylet::TaskTable &AsyncGcsClient::raylet_task_table() { return *raylet_task_table_; }
Expand Down
5 changes: 5 additions & 0 deletions src/ray/gcs/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ class RAY_EXPORT AsyncGcsClient {
std::vector<std::shared_ptr<RedisContext>> shard_contexts() { return shard_contexts_; }
std::shared_ptr<RedisContext> primary_context() { return primary_context_; }

/// Returns debug string for class.
///
/// \return string.
std::string DebugString() const;

private:
std::unique_ptr<FunctionTable> function_table_;
std::unique_ptr<ClassTable> class_table_;
Expand Down
35 changes: 35 additions & 0 deletions src/ray/gcs/tables.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ namespace gcs {
template <typename ID, typename Data>
Status Log<ID, Data>::Append(const JobID &job_id, const ID &id,
std::shared_ptr<DataT> &dataT, const WriteCallback &done) {
num_appends_++;
auto callback = [this, id, dataT, done](const std::string &data) {
if (done != nullptr) {
(done)(client_, id, *dataT);
Expand All @@ -56,6 +57,7 @@ template <typename ID, typename Data>
Status Log<ID, Data>::AppendAt(const JobID &job_id, const ID &id,
std::shared_ptr<DataT> &dataT, const WriteCallback &done,
const WriteCallback &failure, int log_length) {
num_appends_++;
auto callback = [this, id, dataT, done, failure](const std::string &data) {
if (data.empty()) {
if (done != nullptr) {
Expand All @@ -78,6 +80,7 @@ Status Log<ID, Data>::AppendAt(const JobID &job_id, const ID &id,

template <typename ID, typename Data>
Status Log<ID, Data>::Lookup(const JobID &job_id, const ID &id, const Callback &lookup) {
num_lookups_++;
auto callback = [this, id, lookup](const std::string &data) {
if (lookup != nullptr) {
std::vector<DataT> results;
Expand Down Expand Up @@ -164,9 +167,17 @@ Status Log<ID, Data>::CancelNotifications(const JobID &job_id, const ID &id,
pubsub_channel_, nullptr);
}

template <typename ID, typename Data>
std::string Log<ID, Data>::DebugString() const {
std::stringstream result;
result << "num lookups: " << num_lookups_ << ", num appends: " << num_appends_;
return result.str();
}

template <typename ID, typename Data>
Status Table<ID, Data>::Add(const JobID &job_id, const ID &id,
std::shared_ptr<DataT> &dataT, const WriteCallback &done) {
num_adds_++;
auto callback = [this, id, dataT, done](const std::string &data) {
if (done != nullptr) {
(done)(client_, id, *dataT);
Expand All @@ -184,6 +195,7 @@ Status Table<ID, Data>::Add(const JobID &job_id, const ID &id,
template <typename ID, typename Data>
Status Table<ID, Data>::Lookup(const JobID &job_id, const ID &id, const Callback &lookup,
const FailureCallback &failure) {
num_lookups_++;
return Log<ID, Data>::Lookup(job_id, id,
[lookup, failure](AsyncGcsClient *client, const ID &id,
const std::vector<DataT> &data) {
Expand Down Expand Up @@ -221,6 +233,13 @@ Status Table<ID, Data>::Subscribe(const JobID &job_id, const ClientID &client_id
done);
}

template <typename ID, typename Data>
std::string Table<ID, Data>::DebugString() const {
std::stringstream result;
result << "num lookups: " << num_lookups_ << ", num adds: " << num_adds_;
return result.str();
}

Status ErrorTable::PushErrorToDriver(const JobID &job_id, const std::string &type,
const std::string &error_message, double timestamp) {
auto data = std::make_shared<ErrorTableDataT>();
Expand All @@ -234,6 +253,10 @@ Status ErrorTable::PushErrorToDriver(const JobID &job_id, const std::string &typ
});
}

std::string ErrorTable::DebugString() const {
return Log<JobID, ErrorTableData>::DebugString();
}

Status ProfileTable::AddProfileEvent(const std::string &event_type,
const std::string &component_type,
const UniqueID &component_id,
Expand Down Expand Up @@ -273,6 +296,10 @@ Status ProfileTable::AddProfileEventBatch(const ProfileTableData &profile_events
});
}

std::string ProfileTable::DebugString() const {
return Log<UniqueID, ProfileTableData>::DebugString();
}

Status DriverTable::AppendDriverData(const JobID &driver_id, bool is_dead) {
auto data = std::make_shared<DriverTableDataT>();
data->driver_id = driver_id.binary();
Expand Down Expand Up @@ -436,6 +463,14 @@ const std::unordered_map<ClientID, ClientTableDataT> &ClientTable::GetAllClients
return client_cache_;
}

std::string ClientTable::DebugString() const {
std::stringstream result;
result << Log<UniqueID, ClientTableData>::DebugString();
result << ", cache size: " << client_cache_.size()
<< ", num removed: " << removed_clients_.size();
return result.str();
}

template class Log<ObjectID, ObjectTableData>;
template class Log<TaskID, ray::protocol::Task>;
template class Table<TaskID, ray::protocol::Task>;
Expand Down
33 changes: 33 additions & 0 deletions src/ray/gcs/tables.h
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,11 @@ class Log : public LogInterface<ID, Data>, virtual public PubsubInterface<ID> {
Status CancelNotifications(const JobID &job_id, const ID &id,
const ClientID &client_id);

/// Returns debug string for class.
///
/// \return string.
std::string DebugString() const;

protected:
std::shared_ptr<RedisContext> GetRedisContext(const ID &id) {
static std::hash<ray::UniqueID> index;
Expand All @@ -208,6 +213,10 @@ class Log : public LogInterface<ID, Data>, virtual public PubsubInterface<ID> {

/// Commands to a GCS table can either be regular (default) or chain-replicated.
CommandType command_type_ = CommandType::kRegular;

private:
int64_t num_appends_ = 0;
int64_t num_lookups_ = 0;
};

template <typename ID, typename Data>
Expand Down Expand Up @@ -295,13 +304,22 @@ class Table : private Log<ID, Data>,
const Callback &subscribe, const FailureCallback &failure,
const SubscriptionCallback &done);

/// Returns debug string for class.
///
/// \return string.
std::string DebugString() const;

protected:
using Log<ID, Data>::shard_contexts_;
using Log<ID, Data>::client_;
using Log<ID, Data>::pubsub_channel_;
using Log<ID, Data>::prefix_;
using Log<ID, Data>::command_type_;
using Log<ID, Data>::GetRedisContext;

private:
int64_t num_adds_ = 0;
int64_t num_lookups_ = 0;
};

class ObjectTable : public Log<ObjectID, ObjectTableData> {
Expand Down Expand Up @@ -452,6 +470,11 @@ class ErrorTable : private Log<JobID, ErrorTableData> {
/// \return Status.
Status PushErrorToDriver(const JobID &job_id, const std::string &type,
const std::string &error_message, double timestamp);

/// Returns debug string for class.
///
/// \return string.
std::string DebugString() const;
};

class ProfileTable : private Log<UniqueID, ProfileTableData> {
Expand Down Expand Up @@ -484,6 +507,11 @@ class ProfileTable : private Log<UniqueID, ProfileTableData> {
/// \param profile_events The profile events to record.
/// \return Status.
Status AddProfileEventBatch(const ProfileTableData &profile_events);

/// Returns debug string for class.
///
/// \return string.
std::string DebugString() const;
};

using CustomSerializerTable = Table<ClassID, CustomSerializerData>;
Expand Down Expand Up @@ -583,6 +611,11 @@ class ClientTable : private Log<UniqueID, ClientTableData> {
/// \return The client ID to client information map.
const std::unordered_map<ClientID, ClientTableDataT> &GetAllClients() const;

/// Returns debug string for class.
///
/// \return string.
std::string DebugString() const;

private:
/// Handle a client table notification.
void HandleNotification(AsyncGcsClient *client, const ClientTableDataT &notifications);
Expand Down
16 changes: 16 additions & 0 deletions src/ray/object_manager/connection_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -133,4 +133,20 @@ void ConnectionPool::Return(SenderMapType &conn_map, const ClientID &client_id,
RAY_LOG(DEBUG) << "Return " << client_id << " " << conn_map[client_id].size();
}

std::string ConnectionPool::DebugString() const {
std::stringstream result;
result << "ConnectionPool:";
result << "\n- num message send connections: " << message_send_connections_.size();
result << "\n- num transfer send connections: " << transfer_send_connections_.size();
result << "\n- num avail message send connections: "
<< available_transfer_send_connections_.size();
result << "\n- num avail transfer send connections: "
<< available_transfer_send_connections_.size();
result << "\n- num message receive connections: "
<< message_receive_connections_.size();
result << "\n- num transfer receive connections: "
<< transfer_receive_connections_.size();
return result.str();
}

} // namespace ray
5 changes: 5 additions & 0 deletions src/ray/object_manager/connection_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ class ConnectionPool {
/// \return Status of invoking this method.
ray::Status RemoveSender(ConnectionType type, std::shared_ptr<SenderConnection> conn);

/// Returns debug string for class.
///
/// \return string.
std::string DebugString() const;

/// This object cannot be copied for thread-safety.
RAY_DISALLOW_COPY_AND_ASSIGN(ConnectionPool);

Expand Down
8 changes: 8 additions & 0 deletions src/ray/object_manager/object_buffer_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -195,4 +195,12 @@ void ObjectBufferPool::FreeObjects(const std::vector<ObjectID> &object_ids) {
ARROW_CHECK_OK(store_client_.Delete(plasma_ids));
}

std::string ObjectBufferPool::DebugString() const {
std::stringstream result;
result << "BufferPool:";
result << "\n- get buffer state map size: " << get_buffer_state_.size();
result << "\n- create buffer state map size: " << create_buffer_state_.size();
return result.str();
}

} // namespace ray
5 changes: 5 additions & 0 deletions src/ray/object_manager/object_buffer_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,11 @@ class ObjectBufferPool {
/// \return Void.
void FreeObjects(const std::vector<ObjectID> &object_ids);

/// Returns debug string for class.
///
/// \return string.
std::string DebugString() const;

private:
/// Abort the create operation associated with an object. This destroys the buffer
/// state, including create operations in progress for all chunks of the object.
Expand Down
8 changes: 8 additions & 0 deletions src/ray/object_manager/object_directory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -192,4 +192,12 @@ ray::Status ObjectDirectory::LookupLocations(const ObjectID &object_id,
return status;
}

std::string ObjectDirectory::DebugString() const {
std::stringstream result;
result << "ObjectDirectory:";
result << "\n- num listeners: " << listeners_.size();
result << "\n- num eviction entries: " << object_evictions_.size();
return result.str();
}

} // namespace ray
Loading

0 comments on commit e0bf9d7

Please sign in to comment.