From 74d493cd929a624a6dc2af4626d7ff95896f9218 Mon Sep 17 00:00:00 2001 From: Cong Wang Date: Thu, 19 Jan 2017 19:37:53 -0800 Subject: [PATCH 1/2] Rewrite stream manager back pressure algorithm --- heron/common/src/cpp/basics/BUILD | 1 + heron/common/src/cpp/basics/spmultimap.h | 52 ++++++ heron/common/src/cpp/network/connection.cpp | 26 +-- heron/common/src/cpp/network/connection.h | 4 +- heron/proto/stmgr.proto | 2 + heron/stmgr/src/cpp/manager/stmgr-client.cpp | 9 +- heron/stmgr/src/cpp/manager/stmgr-client.h | 4 +- .../stmgr/src/cpp/manager/stmgr-clientmgr.cpp | 29 +++- heron/stmgr/src/cpp/manager/stmgr-clientmgr.h | 9 +- heron/stmgr/src/cpp/manager/stmgr-server.cpp | 152 +++++++++--------- heron/stmgr/src/cpp/manager/stmgr-server.h | 23 +-- heron/stmgr/src/cpp/manager/stmgr.cpp | 69 +++++++- heron/stmgr/src/cpp/manager/stmgr.h | 14 +- 13 files changed, 267 insertions(+), 127 deletions(-) create mode 100644 heron/common/src/cpp/basics/spmultimap.h diff --git a/heron/common/src/cpp/basics/BUILD b/heron/common/src/cpp/basics/BUILD index 82f872e76f9..99a36822698 100644 --- a/heron/common/src/cpp/basics/BUILD +++ b/heron/common/src/cpp/basics/BUILD @@ -33,6 +33,7 @@ cc_library( "sptest.h", "sptypes.h", "strutils.h", + "spmultimap.h" ], hdrs = [ "basics.h", diff --git a/heron/common/src/cpp/basics/spmultimap.h b/heron/common/src/cpp/basics/spmultimap.h new file mode 100644 index 00000000000..884cdb861f6 --- /dev/null +++ b/heron/common/src/cpp/basics/spmultimap.h @@ -0,0 +1,52 @@ +/* + * Copyright 2017 Twitter, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef __SP_MULTIMAP_H +#define __SP_MULTIMAP_H + +#include +#include +#include +#include + +template , + typename _Alloc = std::allocator>> +class sp_multimap : public std::multimap<_Key, _Tp, _Compare, _Alloc> { + public: + typedef typename std::multimap<_Key, _Tp, _Compare, _Alloc>::value_type value_type; + typedef typename std::multimap<_Key, _Tp, _Compare, _Alloc>::size_type size_type; + + using std::multimap<_Key, _Tp, _Compare, _Alloc>::erase; + + size_type erase(const value_type& v) { + auto iterpair = this->equal_range(v.first); + size_type num = 0; + + for (auto it = iterpair.first; it != iterpair.second;) { + if (it->second == v.second) { + it = erase(it); + num++; + } else { + it++; + } + } + + return num; + } +}; + +#endif diff --git a/heron/common/src/cpp/network/connection.cpp b/heron/common/src/cpp/network/connection.cpp index 3e1fa6b39d2..8be2cc8a32c 100644 --- a/heron/common/src/cpp/network/connection.cpp +++ b/heron/common/src/cpp/network/connection.cpp @@ -45,7 +45,7 @@ Connection::Connection(ConnectionEndPoint* endpoint, ConnectionOptions* options, mWriteBatchsize = __SYSTEM_NETWORK_DEFAULT_WRITE_BATCH_SIZE__; mCausedBackPressure = false; - mUnderBackPressure = false; + mUnderBackPressure = 0; mNumEnqueuesWithBufferFull = 0; } @@ -251,21 +251,25 @@ void Connection::handleDataRead() { } sp_int32 Connection::putBackPressure() { - mUnderBackPressure = true; - // For now stop reads from this connection - if (unregisterEndpointForRead() < 0) { - LOG(ERROR) << "Could not start back pressure on connection"; - return -1; + if (mUnderBackPressure == 0) { + // For now stop reads from this connection + if (unregisterEndpointForRead() < 0) { + LOG(ERROR) << "Could not start back pressure on connection"; + return -1; + } } + mUnderBackPressure++; return 0; } sp_int32 Connection::removeBackPressure() { - mUnderBackPressure = false; - // Resume 
reading from this connection - if (registerEndpointForRead() < 0) { - LOG(ERROR) << "Could not remove back pressure from connection"; - return -1; + mUnderBackPressure--; + if (mUnderBackPressure == 0) { + // Resume reading from this connection + if (registerEndpointForRead() < 0) { + LOG(ERROR) << "Could not remove back pressure from connection"; + return -1; + } } return 0; } diff --git a/heron/common/src/cpp/network/connection.h b/heron/common/src/cpp/network/connection.h index 482d3d49d06..2c1d176f090 100644 --- a/heron/common/src/cpp/network/connection.h +++ b/heron/common/src/cpp/network/connection.h @@ -102,7 +102,7 @@ class Connection : public BaseConnection { void setCausedBackPressure() { mCausedBackPressure = true; } void unsetCausedBackPressure() { mCausedBackPressure = false; } bool hasCausedBackPressure() const { return mCausedBackPressure; } - bool isUnderBackPressure() const { return mUnderBackPressure; } + bool isUnderBackPressure() const { return mUnderBackPressure > 0; } sp_int32 putBackPressure(); sp_int32 removeBackPressure(); @@ -153,7 +153,7 @@ class Connection : public BaseConnection { // Have we caused back pressure? bool mCausedBackPressure; // Are our reads being throttled? 
- bool mUnderBackPressure; + sp_int32 mUnderBackPressure; // How many times have we enqueued data and found that we had outstanding bytes > // HWM of back pressure threshold sp_uint8 mNumEnqueuesWithBufferFull; diff --git a/heron/proto/stmgr.proto b/heron/proto/stmgr.proto index 388160382df..30728c795a8 100644 --- a/heron/proto/stmgr.proto +++ b/heron/proto/stmgr.proto @@ -63,6 +63,7 @@ message StartBackPressureMessage { required string topology_id = 2; required string stmgr = 3; required string message_id = 4; + required int32 task_id = 5; } message StopBackPressureMessage { @@ -70,6 +71,7 @@ message StopBackPressureMessage { required string topology_id = 2; required string stmgr = 3; required string message_id = 4; + required int32 task_id = 5; } // Tuples exchanged between stream managers and instances diff --git a/heron/stmgr/src/cpp/manager/stmgr-client.cpp b/heron/stmgr/src/cpp/manager/stmgr-client.cpp index 9c713022653..ee186e111f2 100644 --- a/heron/stmgr/src/cpp/manager/stmgr-client.cpp +++ b/heron/stmgr/src/cpp/manager/stmgr-client.cpp @@ -148,9 +148,6 @@ void StMgrClient::HandleHelloResponse(void*, proto::stmgr::StrMgrHelloResponse* Stop(); } delete _response; - if (client_manager_->DidAnnounceBackPressure()) { - SendStartBackPressureMessage(); - } } void StMgrClient::OnReConnectTimer() { Start(); } @@ -195,7 +192,7 @@ void StMgrClient::StopBackPressureConnectionCb(Connection* _connection) { client_manager_->StopBackPressureOnServer(other_stmgr_id_); } -void StMgrClient::SendStartBackPressureMessage() { +void StMgrClient::SendStartBackPressureMessage(const sp_int32 _task_id) { REQID_Generator generator; REQID rand = generator.generate(); // generator.generate(rand); @@ -205,12 +202,13 @@ void StMgrClient::SendStartBackPressureMessage() { message->set_topology_id(topology_id_); message->set_stmgr(our_stmgr_id_); message->set_message_id(rand.str()); + message->set_task_id(_task_id); SendMessage(*message); release(message); } -void 
StMgrClient::SendStopBackPressureMessage() { +void StMgrClient::SendStopBackPressureMessage(const sp_int32 _task_id) { REQID_Generator generator; REQID rand = generator.generate(); // generator.generate(rand); @@ -220,6 +218,7 @@ void StMgrClient::SendStopBackPressureMessage() { message->set_topology_id(topology_id_); message->set_stmgr(our_stmgr_id_); message->set_message_id(rand.str()); + message->set_task_id(_task_id); SendMessage(*message); release(message); diff --git a/heron/stmgr/src/cpp/manager/stmgr-client.h b/heron/stmgr/src/cpp/manager/stmgr-client.h index d4926a258f7..234eba410d2 100644 --- a/heron/stmgr/src/cpp/manager/stmgr-client.h +++ b/heron/stmgr/src/cpp/manager/stmgr-client.h @@ -44,8 +44,8 @@ class StMgrClient : public Client { void Quit(); void SendTupleStreamMessage(proto::stmgr::TupleStreamMessage2& _msg); - void SendStartBackPressureMessage(); - void SendStopBackPressureMessage(); + void SendStartBackPressureMessage(const sp_int32 _task_id); + void SendStopBackPressureMessage(const sp_int32 _task_id); protected: virtual void HandleConnect(NetworkErrorCode status); diff --git a/heron/stmgr/src/cpp/manager/stmgr-clientmgr.cpp b/heron/stmgr/src/cpp/manager/stmgr-clientmgr.cpp index 3609c5a998f..04e4e5544a5 100644 --- a/heron/stmgr/src/cpp/manager/stmgr-clientmgr.cpp +++ b/heron/stmgr/src/cpp/manager/stmgr-clientmgr.cpp @@ -77,6 +77,7 @@ void StMgrClientMgr::NewPhysicalPlan(const proto::system::PhysicalPlan* _pplan) // This stmgr has actually moved to a different host/port clients_[s.id()]->Quit(); // this will delete itself. clients_[s.id()] = CreateClient(s.id(), s.host_name(), s.data_port()); + instance_stats_[s.id()].clear(); } else { // This stmgr has remained the same. Don't do anything } @@ -101,13 +102,10 @@ void StMgrClientMgr::NewPhysicalPlan(const proto::system::PhysicalPlan* _pplan) LOG(INFO) << "Stmgr " << *iter << " no longer required"; clients_[*iter]->Quit(); // This will delete itself. 
clients_.erase(*iter); + instance_stats_.erase(*iter); } } -bool StMgrClientMgr::DidAnnounceBackPressure() { - return stream_manager_->DidAnnounceBackPressure(); -} - StMgrClient* StMgrClientMgr::CreateClient(const sp_string& _other_stmgr_id, const sp_string& _hostname, sp_int32 _port) { stmgr_clientmgr_metrics_->scope(METRIC_STMGR_NEW_CONNECTIONS)->incr(); @@ -125,11 +123,26 @@ StMgrClient* StMgrClientMgr::CreateClient(const sp_string& _other_stmgr_id, return client; } +sp_int32 StMgrClientMgr::FindBusiestTaskOnStmgr(const sp_string& _stmgr_id) { + sp_int32 task_id; + sp_int64 max = 0; + for (auto iter = instance_stats_[_stmgr_id].begin(); + iter!= instance_stats_[_stmgr_id].end(); + iter++) { + if (iter->second > max) { + task_id = iter->first; + max = iter->second; + } + } + return task_id; +} + void StMgrClientMgr::SendTupleStreamMessage(sp_int32 _task_id, const sp_string& _stmgr_id, const proto::system::HeronTupleSet2& _msg) { auto iter = clients_.find(_stmgr_id); CHECK(iter != clients_.end()); + instance_stats_[_stmgr_id][_task_id] += _msg.GetCachedSize(); // Acquire the message proto::stmgr::TupleStreamMessage2* out = nullptr; out = clients_[_stmgr_id]->acquire(out); @@ -151,15 +164,15 @@ void StMgrClientMgr::StopBackPressureOnServer(const sp_string& _other_stmgr_id) stream_manager_->StopBackPressureOnServer(_other_stmgr_id); } -void StMgrClientMgr::SendStartBackPressureToOtherStMgrs() { +void StMgrClientMgr::SendStartBackPressureToOtherStMgrs(const sp_int32 _task_id) { for (auto iter = clients_.begin(); iter != clients_.end(); ++iter) { - iter->second->SendStartBackPressureMessage(); + iter->second->SendStartBackPressureMessage(_task_id); } } -void StMgrClientMgr::SendStopBackPressureToOtherStMgrs() { +void StMgrClientMgr::SendStopBackPressureToOtherStMgrs(const sp_int32 _task_id) { for (auto iter = clients_.begin(); iter != clients_.end(); ++iter) { - iter->second->SendStopBackPressureMessage(); + iter->second->SendStopBackPressureMessage(_task_id); } } 
diff --git a/heron/stmgr/src/cpp/manager/stmgr-clientmgr.h b/heron/stmgr/src/cpp/manager/stmgr-clientmgr.h index 7cf60d5e33b..d4262de4061 100644 --- a/heron/stmgr/src/cpp/manager/stmgr-clientmgr.h +++ b/heron/stmgr/src/cpp/manager/stmgr-clientmgr.h @@ -53,9 +53,9 @@ class StMgrClientMgr { void StopBackPressureOnServer(const sp_string& _other_stmgr_id); // Used by the server to tell the client to send the back pressure related // messages - void SendStartBackPressureToOtherStMgrs(); - void SendStopBackPressureToOtherStMgrs(); - bool DidAnnounceBackPressure(); + void SendStartBackPressureToOtherStMgrs(const sp_int32 _task_id); + void SendStopBackPressureToOtherStMgrs(const sp_int32 _task_id); + sp_int32 FindBusiestTaskOnStmgr(const sp_string& _stmgr_id); private: StMgrClient* CreateClient(const sp_string& _other_stmgr_id, const sp_string& _host_name, @@ -76,6 +76,9 @@ class StMgrClientMgr { sp_int64 high_watermark_; sp_int64 low_watermark_; + + // Counters for remote instance traffic, this is used for back pressure + std::unordered_map> instance_stats_; }; } // namespace stmgr diff --git a/heron/stmgr/src/cpp/manager/stmgr-server.cpp b/heron/stmgr/src/cpp/manager/stmgr-server.cpp index fcba33d702e..a3f4ef0f381 100644 --- a/heron/stmgr/src/cpp/manager/stmgr-server.cpp +++ b/heron/stmgr/src/cpp/manager/stmgr-server.cpp @@ -110,7 +110,7 @@ StMgrServer::StMgrServer(EventLoop* eventLoop, const NetworkOptions& _options, back_pressure_metric_aggr_); metrics_manager_client_->register_metric(METRIC_TIME_SPENT_BACK_PRESSURE_INIT, back_pressure_metric_initiated_); - spouts_under_back_pressure_ = false; + instances_under_back_pressure_ = 0; // Update queue related metrics every 10 seconds CHECK_GT(eventLoop_->registerTimer([this](EventLoop::Status status) { @@ -203,10 +203,13 @@ void StMgrServer::HandleConnectionClose(Connection* _conn, NetworkErrorCode) { sp_string stmgr_id = rstmgrs_[_conn]; // Did we receive a start back pressure message from this stmgr to // begin with? 
- if (stmgrs_who_announced_back_pressure_.find(stmgr_id) != - stmgrs_who_announced_back_pressure_.end()) { - stmgrs_who_announced_back_pressure_.erase(stmgr_id); + // Note: Connections to other stream managers get handled in StmgrClient + // Now attempt to stop the back pressure + auto iterpair = stmgrs_who_announced_back_pressure_.equal_range(stmgr_id); + for (auto it = iterpair.first; it != iterpair.second; ++it) { + AttemptStopBackPressureFromInstances(it->second, false); } + stmgrs_who_announced_back_pressure_.erase(stmgr_id); } else if (remote_ends_who_caused_back_pressure_.find(GetInstanceName(_conn)) != remote_ends_who_caused_back_pressure_.end()) { _conn->unsetCausedBackPressure(); @@ -215,13 +218,10 @@ void StMgrServer::HandleConnectionClose(Connection* _conn, NetworkErrorCode) { // This is a instance connection heron::common::TimeSpentMetric* instance_metric = instance_metric_map_[GetInstanceName(_conn)]; instance_metric->Stop(); - if (remote_ends_who_caused_back_pressure_.empty()) { - SendStopBackPressureToOtherStMgrs(); - } + + SendStopBackPressureToOtherStMgrs(active_instances_[_conn]); + AttemptStopBackPressureFromInstances(active_instances_[_conn], true); } - // Note: Connections to other stream managers get handled in StmgrClient - // Now attempt to stop the back pressure - AttemptStopBackPressureFromSpouts(); // Now cleanup the data structures auto siter = rstmgrs_.find(_conn); @@ -319,7 +319,8 @@ void StMgrServer::HandleRegisterInstanceRequest(REQID _reqid, Connection* _conn, if (instance_info_.find(task_id) != instance_info_.end() && instance_info_[task_id]->conn_ != NULL) { - LOG(ERROR) << "Instance with the same task id already exists in our map " << instance_id; + LOG(ERROR) << "Instance " << instance_id << " with the same task id already exists in our map: " + << task_id; LOG(ERROR) << "Closing the old connection"; instance_info_[task_id]->conn_->closeConnection(); @@ -330,7 +331,7 @@ void StMgrServer::HandleRegisterInstanceRequest(REQID 
_reqid, Connection* _conn, response.mutable_status()->set_status(proto::system::NOTOK); SendResponse(_reqid, _conn, response); } else { - LOG(INFO) << "New instance registered with us " << instance_id; + LOG(INFO) << "New instance registered with us " << instance_id << " with task_id: " << task_id; active_instances_[_conn] = task_id; if (instance_info_.find(task_id) == instance_info_.end()) { instance_info_[task_id] = new InstanceData(_request->release_instance()); @@ -462,10 +463,7 @@ void StMgrServer::StartBackPressureConnectionCb(Connection* _connection) { sp_string instance_name = GetInstanceName(_connection); CHECK_NE(instance_name, ""); - if (remote_ends_who_caused_back_pressure_.empty()) { - SendStartBackPressureToOtherStMgrs(); - back_pressure_metric_initiated_->Start(); - } + SendStartBackPressureToOtherStMgrs(active_instances_[_connection]); // Indicate which instance component had back pressure heron::common::TimeSpentMetric* instance_metric = instance_metric_map_[instance_name]; @@ -473,7 +471,7 @@ void StMgrServer::StartBackPressureConnectionCb(Connection* _connection) { remote_ends_who_caused_back_pressure_.insert(instance_name); LOG(INFO) << "We observe back pressure on sending data to instance " << instance_name; - StartBackPressureOnSpouts(); + StartBackPressureOnInstances(active_instances_[_connection], true); } void StMgrServer::StopBackPressureConnectionCb(Connection* _connection) { @@ -491,44 +489,38 @@ void StMgrServer::StopBackPressureConnectionCb(Connection* _connection) { heron::common::TimeSpentMetric* instance_metric = instance_metric_map_[instance_name]; instance_metric->Stop(); - if (remote_ends_who_caused_back_pressure_.empty()) { - SendStopBackPressureToOtherStMgrs(); - back_pressure_metric_initiated_->Stop(); - } + SendStopBackPressureToOtherStMgrs(active_instances_[_connection]); + LOG(INFO) << "We don't observe back pressure now on sending data to instance " << instance_name; - AttemptStopBackPressureFromSpouts(); + 
AttemptStopBackPressureFromInstances(active_instances_[_connection], true); } -void StMgrServer::StartBackPressureClientCb(const sp_string& _other_stmgr_id) { - if (remote_ends_who_caused_back_pressure_.empty()) { - SendStartBackPressureToOtherStMgrs(); - back_pressure_metric_initiated_->Start(); - } +void StMgrServer::StartBackPressureClientCb(const sp_string& _other_stmgr_id, sp_int32 task_id) { + SendStartBackPressureToOtherStMgrs(task_id); + remote_ends_who_caused_back_pressure_.insert(_other_stmgr_id); LOG(INFO) << "We observe back pressure on sending data to remote stream manager " - << _other_stmgr_id; - StartBackPressureOnSpouts(); + << _other_stmgr_id << " for task " << task_id; + StartBackPressureOnInstances(task_id, true); } -void StMgrServer::StopBackPressureClientCb(const sp_string& _other_stmgr_id) { +void StMgrServer::StopBackPressureClientCb(const sp_string& _other_stmgr_id, sp_int32 task_id) { CHECK(remote_ends_who_caused_back_pressure_.find(_other_stmgr_id) != remote_ends_who_caused_back_pressure_.end()); remote_ends_who_caused_back_pressure_.erase(_other_stmgr_id); - if (remote_ends_who_caused_back_pressure_.empty()) { - SendStopBackPressureToOtherStMgrs(); - back_pressure_metric_initiated_->Stop(); - } + SendStopBackPressureToOtherStMgrs(task_id); LOG(INFO) << "We don't observe back pressure now on sending data to remote " "stream manager " - << _other_stmgr_id; - AttemptStopBackPressureFromSpouts(); + << _other_stmgr_id << " for task " << task_id; + AttemptStopBackPressureFromInstances(task_id, true); } void StMgrServer::HandleStartBackPressureMessage(Connection* _conn, proto::stmgr::StartBackPressureMessage* _message) { // Close spouts - LOG(INFO) << "Received start back pressure from str mgr " << _message->stmgr(); + LOG(INFO) << "Received start back pressure from str mgr " << _message->stmgr() << " for " + << _message->task_id(); if (_message->topology_name() != topology_name_ || _message->topology_id() != topology_id_) { LOG(ERROR) << 
"Received start back pressure message from unknown stream manager " << _message->topology_name() << " " << _message->topology_id() << " " @@ -540,16 +532,17 @@ void StMgrServer::HandleStartBackPressureMessage(Connection* _conn, auto iter = rstmgrs_.find(_conn); CHECK(iter != rstmgrs_.end()); sp_string stmgr_id = iter->second; - stmgrs_who_announced_back_pressure_.insert(stmgr_id); + stmgrs_who_announced_back_pressure_.insert(std::make_pair(stmgr_id, _message->task_id())); - StartBackPressureOnSpouts(); + StartBackPressureOnInstances(_message->task_id(), false); release(_message); } void StMgrServer::HandleStopBackPressureMessage(Connection* _conn, proto::stmgr::StopBackPressureMessage* _message) { - LOG(INFO) << "Received stop back pressure from str mgr " << _message->stmgr(); + LOG(INFO) << "Received stop back pressure from str mgr " << _message->stmgr() << " for " + << _message->task_id(); if (_message->topology_name() != topology_name_ || _message->topology_id() != topology_id_) { LOG(ERROR) << "Received stop back pressure message from unknown stream manager " << _message->topology_name() << " " << _message->topology_id() << " " @@ -563,55 +556,66 @@ void StMgrServer::HandleStopBackPressureMessage(Connection* _conn, sp_string stmgr_id = iter->second; // Did we receive a start back pressure message from this stmgr to // begin with? 
We could have been dead at the time of the announcement - if (stmgrs_who_announced_back_pressure_.find(stmgr_id) != - stmgrs_who_announced_back_pressure_.end()) { - stmgrs_who_announced_back_pressure_.erase(stmgr_id); - AttemptStopBackPressureFromSpouts(); + if (stmgrs_who_announced_back_pressure_.erase(std::make_pair(stmgr_id, _message->task_id()))) { + AttemptStopBackPressureFromInstances(_message->task_id(), false); } release(_message); } -void StMgrServer::SendStartBackPressureToOtherStMgrs() { - LOG(INFO) << "Sending start back pressure notification to all other " - << "stream managers"; - stmgr_->SendStartBackPressureToOtherStMgrs(); +void StMgrServer::SendStartBackPressureToOtherStMgrs(const sp_int32 _task_id) { + LOG(INFO) << "Sending start back pressure notification for " << _task_id + << " to all other stream managers"; + stmgr_->SendStartBackPressureToOtherStMgrs(_task_id); } -void StMgrServer::SendStopBackPressureToOtherStMgrs() { - LOG(INFO) << "Sending stop back pressure notification to all other " - << "stream managers"; - stmgr_->SendStopBackPressureToOtherStMgrs(); +void StMgrServer::SendStopBackPressureToOtherStMgrs(const sp_int32 _task_id) { + LOG(INFO) << "Sending stop back pressure notification for " << _task_id + << " to all other stream managers"; + stmgr_->SendStopBackPressureToOtherStMgrs(_task_id); } -void StMgrServer::StartBackPressureOnSpouts() { - if (!spouts_under_back_pressure_) { - LOG(WARNING) << "Stopping reading from spouts to do back pressure"; +void StMgrServer::StartBackPressureOnInstances(const sp_int32 _task_id, bool initiated) { + auto upstream_instances = stmgr_->GetUpstreamInstances(_task_id); + + for (auto iiter = instance_info_.begin(); iiter != instance_info_.end(); ++iiter) { + if (!iiter->second->conn_) + continue; + if (upstream_instances.find(iiter->second->instance_->info().task_id()) + == upstream_instances.end()) + continue; + LOG(INFO) << "Stopping reading from instance " << 
iiter->second->instance_->info().task_id() + << " to do back pressure"; + iiter->second->conn_->putBackPressure(); + } - spouts_under_back_pressure_ = true; - // Put back pressure on all spouts - for (auto iiter = instance_info_.begin(); iiter != instance_info_.end(); ++iiter) { - if (!iiter->second->local_spout_) continue; - if (!iiter->second->conn_) continue; - if (!iiter->second->conn_->isUnderBackPressure()) iiter->second->conn_->putBackPressure(); - } + if (instances_under_back_pressure_ == 0) { back_pressure_metric_aggr_->Start(); + if (initiated) + back_pressure_metric_initiated_->Start(); } + instances_under_back_pressure_++; } -void StMgrServer::AttemptStopBackPressureFromSpouts() { - if (spouts_under_back_pressure_ && remote_ends_who_caused_back_pressure_.empty() && - stmgrs_who_announced_back_pressure_.empty()) { - LOG(INFO) << "Starting reading from spouts to relieve back pressure"; - spouts_under_back_pressure_ = false; - - // Remove backpressure from all pipes - for (auto iiter = instance_info_.begin(); iiter != instance_info_.end(); ++iiter) { - if (!iiter->second->local_spout_) continue; - if (!iiter->second->conn_) continue; - if (iiter->second->conn_->isUnderBackPressure()) iiter->second->conn_->removeBackPressure(); - } +void StMgrServer::AttemptStopBackPressureFromInstances(const sp_int32 _task_id, bool initiated) { + auto upstream_instances = stmgr_->GetUpstreamInstances(_task_id); + + for (auto iiter = instance_info_.begin(); iiter != instance_info_.end(); ++iiter) { + if (!iiter->second->conn_) + continue; + if (upstream_instances.find(iiter->second->instance_->info().task_id()) + == upstream_instances.end()) + continue; + LOG(INFO) << "Starting reading from instance " << iiter->second->instance_->info().task_id() + << " to relieve back pressure"; + iiter->second->conn_->removeBackPressure(); + } + + instances_under_back_pressure_--; + if (instances_under_back_pressure_ == 0) { back_pressure_metric_aggr_->Stop(); + if (initiated) + 
back_pressure_metric_initiated_->Stop(); } } diff --git a/heron/stmgr/src/cpp/manager/stmgr-server.h b/heron/stmgr/src/cpp/manager/stmgr-server.h index 6a30dee2eb9..562fe69d3cb 100644 --- a/heron/stmgr/src/cpp/manager/stmgr-server.h +++ b/heron/stmgr/src/cpp/manager/stmgr-server.h @@ -24,6 +24,7 @@ #include "proto/messages.h" #include "network/network.h" #include "basics/basics.h" +#include "basics/spmultimap.h" namespace heron { namespace common { @@ -57,9 +58,9 @@ class StMgrServer : public Server { void BroadcastNewPhysicalPlan(const proto::system::PhysicalPlan& _pplan); // Do back pressure - void StartBackPressureClientCb(const sp_string& _other_stmgr_id); + void StartBackPressureClientCb(const sp_string& _other_stmgr_id, sp_int32 task_id); // Relieve back pressure - void StopBackPressureClientCb(const sp_string& _other_stmgr_id); + void StopBackPressureClientCb(const sp_string& _other_stmgr_id, sp_int32 task_id); bool HaveAllInstancesConnectedToUs() const { return active_instances_.size() == expected_instances_.size(); @@ -97,8 +98,8 @@ class StMgrServer : public Server { proto::stmgr::StartBackPressureMessage* _message); void HandleStopBackPressureMessage(Connection* _conn, proto::stmgr::StopBackPressureMessage* _message); - void SendStartBackPressureToOtherStMgrs(); - void SendStopBackPressureToOtherStMgrs(); + void SendStartBackPressureToOtherStMgrs(const sp_int32 _task_id); + void SendStopBackPressureToOtherStMgrs(const sp_int32 _task_id); // Back pressure related connection callbacks // Do back pressure @@ -106,10 +107,10 @@ class StMgrServer : public Server { // Relieve back pressure void StopBackPressureConnectionCb(Connection* _connection); - // Can we free the back pressure on the spouts? - void AttemptStopBackPressureFromSpouts(); - // Start back pressure on the spouts - void StartBackPressureOnSpouts(); + // Can we free the back pressure on the instances? 
+ void AttemptStopBackPressureFromInstances(const sp_int32 _task_id, bool initiated); + // Start back pressure on the instances + void StartBackPressureOnInstances(const sp_int32 _task_id, bool initiated); // Compute the LocalSpouts from Physical Plan void ComputeLocalSpouts(const proto::system::PhysicalPlan& _pplan); @@ -154,8 +155,8 @@ class StMgrServer : public Server { // instances/stream mgrs causing back pressure std::unordered_set remote_ends_who_caused_back_pressure_; - // stream managers that have announced back pressure - std::unordered_set stmgrs_who_announced_back_pressure_; + // (stream manager, task id) pairs who have announced back pressure + sp_multimap stmgrs_who_announced_back_pressure_; sp_string topology_name_; sp_string topology_id_; @@ -169,7 +170,7 @@ class StMgrServer : public Server { heron::common::TimeSpentMetric* back_pressure_metric_aggr_; heron::common::TimeSpentMetric* back_pressure_metric_initiated_; - bool spouts_under_back_pressure_; + sp_int32 instances_under_back_pressure_; }; } // namespace stmgr diff --git a/heron/stmgr/src/cpp/manager/stmgr.cpp b/heron/stmgr/src/cpp/manager/stmgr.cpp index 6a996c80ca9..4a72b0b5c13 100644 --- a/heron/stmgr/src/cpp/manager/stmgr.cpp +++ b/heron/stmgr/src/cpp/manager/stmgr.cpp @@ -24,6 +24,7 @@ #include #include #include +#include #include "manager/stmgr-clientmgr.h" #include "manager/stmgr-server.h" #include "manager/stream-consumers.h" @@ -149,8 +150,6 @@ StMgr::~StMgr() { delete tuple_set_from_other_stmgr_; } -bool StMgr::DidAnnounceBackPressure() { return server_->DidAnnounceBackPressure(); } - void StMgr::CheckTMasterLocation(EventLoop::Status) { if (!tmaster_client_) { LOG(FATAL) << "Could not fetch the TMaster location in time. Exiting. 
"; @@ -432,6 +431,7 @@ void StMgr::NewPhysicalPlan(proto::system::PhysicalPlan* _pplan) { PopulateStreamConsumers(_pplan->mutable_topology(), component_to_task_ids); PopulateXorManagers(_pplan->topology(), ExtractTopologyTimeout(_pplan->topology()), component_to_task_ids); + PopulateUpstreamTasks(_pplan); } delete pplan_; @@ -521,6 +521,39 @@ void StMgr::PopulateXorManagers( xor_mgrs_ = new XorManager(eventLoop_, _message_timeout, all_spout_tasks); } +void StMgr::PopulateUpstreamTasks(proto::system::PhysicalPlan* _pplan) { + proto::api::Topology* topology = _pplan->mutable_topology(); + // Build a graph with components + std::unordered_map> components_mapping; + for (sp_int32 i = 0; i < topology->bolts_size(); ++i) { + const sp_string& component_name = topology->bolts(i).comp().name(); + components_mapping[component_name] = {}; + for (sp_int32 j = 0; j < topology->bolts(i).inputs_size(); ++j) { + const proto::api::InputStream& is = topology->bolts(i).inputs(j); + components_mapping[component_name].push_back(is.stream().component_name()); + DLOG(INFO) << component_name << " <= " << is.stream().component_name(); + } + } + + // Build a component name to task id mapping + std::unordered_map component_to_task_id; + for (sp_int32 i = 0; i < _pplan->instances_size(); ++i) { + const sp_string& component_name = _pplan->instances(i).info().component_name(); + component_to_task_id[component_name] = _pplan->instances(i).info().task_id(); + DLOG(INFO) << component_name << " = " << _pplan->instances(i).info().task_id(); + } + + // Convert the above graph to task ids + for (auto it = components_mapping.begin(); it != components_mapping.end(); it++) { + sp_int32 task_id = component_to_task_id[it->first]; + tasks_mapping_[task_id] = {}; + for (auto s = it->second.begin(); s != it->second.end(); s++) { + tasks_mapping_[task_id].insert(component_to_task_id[*s]); + DLOG(INFO) << task_id << " <= " << component_to_task_id[*s]; + } + } +} + const proto::system::PhysicalPlan* 
StMgr::GetPhysicalPlan() const { return pplan_; } void StMgr::HandleStreamManagerData(const sp_string&, @@ -712,19 +745,41 @@ void StMgr::CopyDataOutBound(sp_int32 _src_task_id, bool _local_spout, void StMgr::StartBackPressureOnServer(const sp_string& _other_stmgr_id) { // Ask the StMgrServer to stop consuming. The client does // not consume anything - server_->StartBackPressureClientCb(_other_stmgr_id); + sp_int32 task_id = clientmgr_->FindBusiestTaskOnStmgr(_other_stmgr_id); + server_->StartBackPressureClientCb(_other_stmgr_id, task_id); + backpressure_starters_.push(task_id); } void StMgr::StopBackPressureOnServer(const sp_string& _other_stmgr_id) { // Call the StMgrServers removeBackPressure method - server_->StopBackPressureClientCb(_other_stmgr_id); + // Note: This is not good, we probably do not want to unthrottle them all + // at once, the upper layer only calls this once. + while (!backpressure_starters_.empty()) { + server_->StopBackPressureClientCb(_other_stmgr_id, backpressure_starters_.front()); + backpressure_starters_.pop(); + } +} + +void StMgr::SendStartBackPressureToOtherStMgrs(const sp_int32 _task_id) { + clientmgr_->SendStartBackPressureToOtherStMgrs(_task_id); } -void StMgr::SendStartBackPressureToOtherStMgrs() { - clientmgr_->SendStartBackPressureToOtherStMgrs(); +void StMgr::SendStopBackPressureToOtherStMgrs(const sp_int32 _task_id) { + clientmgr_->SendStopBackPressureToOtherStMgrs(_task_id); } -void StMgr::SendStopBackPressureToOtherStMgrs() { clientmgr_->SendStopBackPressureToOtherStMgrs(); } +void StMgr::_GetUpstreamInstances(const sp_int32 _task_id, std::unordered_set& result) { + for (auto& i : tasks_mapping_[_task_id]) { + result.insert(i); + _GetUpstreamInstances(i, result); + } +} + +std::unordered_set StMgr::GetUpstreamInstances(const sp_int32 _task_id) { + std::unordered_set result; + _GetUpstreamInstances(_task_id, result); + return result; +} } // namespace stmgr } // namespace heron diff --git 
a/heron/stmgr/src/cpp/manager/stmgr.h b/heron/stmgr/src/cpp/manager/stmgr.h index 5114e8ef67d..47e664947a7 100644 --- a/heron/stmgr/src/cpp/manager/stmgr.h +++ b/heron/stmgr/src/cpp/manager/stmgr.h @@ -17,13 +17,14 @@ #ifndef SRC_CPP_SVCS_STMGR_SRC_MANAGER_STMGR_H_ #define SRC_CPP_SVCS_STMGR_SRC_MANAGER_STMGR_H_ -#include #include +#include #include #include #include #include #include +#include #include "proto/messages.h" #include "network/network.h" #include "basics/basics.h" @@ -73,12 +74,13 @@ class StMgr { virtual void StopBackPressureOnServer(const sp_string& _other_stmgr_id); // Used by the server to tell the client to send the back pressure related // messages - void SendStartBackPressureToOtherStMgrs(); - void SendStopBackPressureToOtherStMgrs(); + void SendStartBackPressureToOtherStMgrs(const sp_int32 _task_id); + void SendStopBackPressureToOtherStMgrs(const sp_int32 _task_id); void StartTMasterClient(); - bool DidAnnounceBackPressure(); + std::unordered_set GetUpstreamInstances(const sp_int32 _task_id); private: + void _GetUpstreamInstances(const sp_int32 _task_id, std::unordered_set& result); void OnTMasterLocationFetch(proto::tmaster::TMasterLocation* _tmaster, proto::system::StatusCode); void OnMetricsCacheLocationFetch( proto::tmaster::MetricsCacheLocation* _tmaster, proto::system::StatusCode); @@ -92,6 +94,7 @@ class StMgr { void PopulateStreamConsumers( proto::api::Topology* _topology, const std::map >& _component_to_task_ids); + void PopulateUpstreamTasks(proto::system::PhysicalPlan* _pplan); void PopulateXorManagers( const proto::api::Topology& _topology, sp_int32 _message_timeout, const std::map >& _component_to_task_ids); @@ -136,6 +139,8 @@ class StMgr { std::unordered_map task_id_to_stmgr_; // map of to its consumers std::unordered_map, StreamConsumers*> stream_consumers_; + // The graph of all the tasks + std::unordered_map> tasks_mapping_; // xor managers XorManager* xor_mgrs_; // Tuple Cache to optimize message building @@ -169,6 +174,7 @@ 
class StMgr { sp_int64 high_watermark_; sp_int64 low_watermark_; + std::queue backpressure_starters_; }; } // namespace stmgr From 9c0b3c7cdd8ab5f3235d79a7d9ad14a0f9649415 Mon Sep 17 00:00:00 2001 From: Cong Wang Date: Mon, 10 Apr 2017 11:29:50 -0700 Subject: [PATCH 2/2] Address review comments from Sanjeev --- heron/stmgr/src/cpp/manager/stmgr-clientmgr.cpp | 2 ++ heron/stmgr/src/cpp/manager/stmgr-clientmgr.h | 3 ++- heron/stmgr/src/cpp/manager/stmgr.cpp | 2 +- 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/heron/stmgr/src/cpp/manager/stmgr-clientmgr.cpp b/heron/stmgr/src/cpp/manager/stmgr-clientmgr.cpp index 04e4e5544a5..fa48ad0410e 100644 --- a/heron/stmgr/src/cpp/manager/stmgr-clientmgr.cpp +++ b/heron/stmgr/src/cpp/manager/stmgr-clientmgr.cpp @@ -124,6 +124,7 @@ StMgrClient* StMgrClientMgr::CreateClient(const sp_string& _other_stmgr_id, } sp_int32 StMgrClientMgr::FindBusiestTaskOnStmgr(const sp_string& _stmgr_id) { + CHECK(instance_stats_.find(_stmgr_id) != instance_stats_.end()); sp_int32 task_id; sp_int64 max = 0; for (auto iter = instance_stats_[_stmgr_id].begin(); @@ -162,6 +163,7 @@ void StMgrClientMgr::StartBackPressureOnServer(const sp_string& _other_stmgr_id) void StMgrClientMgr::StopBackPressureOnServer(const sp_string& _other_stmgr_id) { // Call the StMgrServers removeBackPressure method stream_manager_->StopBackPressureOnServer(_other_stmgr_id); + instance_stats_.clear(); } void StMgrClientMgr::SendStartBackPressureToOtherStMgrs(const sp_int32 _task_id) { diff --git a/heron/stmgr/src/cpp/manager/stmgr-clientmgr.h b/heron/stmgr/src/cpp/manager/stmgr-clientmgr.h index d4262de4061..005f1b645b2 100644 --- a/heron/stmgr/src/cpp/manager/stmgr-clientmgr.h +++ b/heron/stmgr/src/cpp/manager/stmgr-clientmgr.h @@ -77,7 +77,8 @@ class StMgrClientMgr { sp_int64 high_watermark_; sp_int64 low_watermark_; - // Counters for remote instance traffic, this is used for back pressure + // Counters for the traffic per remote instance on per stmgr since the 
last back pressure, + // this is used by the back pressure algorithm to decide which instance to blame std::unordered_map> instance_stats_; }; diff --git a/heron/stmgr/src/cpp/manager/stmgr.cpp b/heron/stmgr/src/cpp/manager/stmgr.cpp index 4a72b0b5c13..4124eae010e 100644 --- a/heron/stmgr/src/cpp/manager/stmgr.cpp +++ b/heron/stmgr/src/cpp/manager/stmgr.cpp @@ -753,7 +753,7 @@ void StMgr::StartBackPressureOnServer(const sp_string& _other_stmgr_id) { void StMgr::StopBackPressureOnServer(const sp_string& _other_stmgr_id) { // Call the StMgrServers removeBackPressure method // Note: This is not good, we probably do not want to unthrottle them all - // at once, the upper layer only calls this once. + // at once, but the lower layer only calls us once. while (!backpressure_starters_.empty()) { server_->StopBackPressureClientCb(_other_stmgr_id, backpressure_starters_.front()); backpressure_starters_.pop();