Skip to content

Commit

Permalink
Migrated: ReplicationRequest DeleteRequest FindRequest FindAllRequest…
Browse files Browse the repository at this point in the history
… EchoRequest DirectorIndexRequest Sql*Request DisposeRequest ServiceManagementRequest*
  • Loading branch information
iagaponenko committed Nov 24, 2024
1 parent 6e44e0d commit 3e86574
Show file tree
Hide file tree
Showing 86 changed files with 1,191 additions and 2,034 deletions.
38 changes: 17 additions & 21 deletions src/replica/apps/AdminApp.cc
Original file line number Diff line number Diff line change
Expand Up @@ -93,51 +93,48 @@ int AdminApp::runImpl() {
// Launch requests against a collection of workers

CommonRequestTracker<ServiceManagementRequestBase> tracker(cout, _progressReport, _errorReport);

auto const workerNames =
_allWorkers ? serviceProvider()->config()->allWorkers() : serviceProvider()->config()->workers();

for (auto&& workerName : workerNames) {
if (_operation == "STATUS") {
tracker.add(controller->statusOfWorkerService(
workerName, [&tracker](ServiceStatusRequest::Ptr const& ptr) { tracker.onFinish(ptr); }));

tracker.add(ServiceStatusRequest::create(
controller, workerName,
[&tracker](ServiceStatusRequest::Ptr const& ptr) { tracker.onFinish(ptr); }));
} else if (_operation == "SUSPEND") {
tracker.add(controller->suspendWorkerService(
workerName,
tracker.add(ServiceSuspendRequest::create(
controller, workerName,
[&tracker](ServiceSuspendRequest::Ptr const& ptr) { tracker.onFinish(ptr); }));

} else if (_operation == "RESUME") {
tracker.add(controller->resumeWorkerService(
workerName, [&tracker](ServiceResumeRequest::Ptr const& ptr) { tracker.onFinish(ptr); }));

tracker.add(ServiceResumeRequest::create(
controller, workerName,
[&tracker](ServiceResumeRequest::Ptr const& ptr) { tracker.onFinish(ptr); }));
} else if (_operation == "REQUESTS") {
tracker.add(controller->requestsOfWorkerService(
workerName,
tracker.add(ServiceRequestsRequest::create(
controller, workerName,
[&tracker](ServiceRequestsRequest::Ptr const& ptr) { tracker.onFinish(ptr); }));

} else if (_operation == "DRAIN") {
tracker.add(controller->drainWorkerService(
workerName, [&tracker](ServiceDrainRequest::Ptr const& ptr) { tracker.onFinish(ptr); }));

tracker.add(ServiceDrainRequest::create(
controller, workerName,
[&tracker](ServiceDrainRequest::Ptr const& ptr) { tracker.onFinish(ptr); }));
} else if (_operation == "RECONFIG") {
tracker.add(ServiceReconfigRequest::create(
controller, workerName,
[&tracker](ServiceReconfigRequest::Ptr const& ptr) { tracker.onFinish(ptr); }));
} else {
throw logic_error("AdminApp::" + string(__func__) + " unsupported operation: " + _operation);
}
}

// Wait before all request are finished

tracker.track();

// Analyze and display results

vector<string> workerName;
vector<string> startedSecondsAgo;
vector<string> state;
vector<string> numNewRequests;
vector<string> numInProgressRequests;
vector<string> numFinishedRequests;

for (auto const& ptr : tracker.requests) {
workerName.push_back(ptr->workerName());
if ((ptr->state() == Request::State::FINISHED) &&
Expand Down Expand Up @@ -174,7 +171,6 @@ int AdminApp::runImpl() {
vector<string> requestType;
vector<string> queue;
vector<uint32_t> priority;

auto analyzeRemoteRequestInfo = [&](string const& workerName_, string const& queueName,
ProtocolServiceResponseInfo const& info) {
workerName.push_back(workerName_);
Expand Down
166 changes: 70 additions & 96 deletions src/replica/apps/ControllerApp.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ class ManagementRequestLauncher {

template <typename REQUEST>
typename REQUEST::Ptr status() const {
return _controller->statusById<REQUEST>(_workerName, _affectedRequestId, REQUEST::extendedPrinter,
!_doNotTrackRequest);
return REQUEST::create(_controller, _workerName, _affectedRequestId, REQUEST::extendedPrinter,
!_doNotTrackRequest);
}

template <typename REQUEST>
Expand Down Expand Up @@ -525,31 +525,29 @@ int ControllerApp::runImpl() {
Request::Ptr request;

if ("REPLICATE" == _requestType) {
request = controller->replicate(
_workerName, _sourceWorkerName, _databaseName, _chunkNumber,
[](Request::Ptr const& request_) { request_->print(); }, _priority, !_doNotTrackRequest,
_allowDuplicates);

request = ReplicationRequest::create(
controller, _workerName, _sourceWorkerName, _databaseName, _chunkNumber,
[](ReplicationRequest::Ptr const& request_) { request_->print(); }, _priority,
!_doNotTrackRequest, _allowDuplicates);
} else if ("DELETE" == _requestType) {
request = controller->deleteReplica(_workerName, _databaseName, _chunkNumber, Request::defaultPrinter,
_priority, !_doNotTrackRequest, _allowDuplicates);

request = DeleteRequest::create(controller, _workerName, _databaseName, _chunkNumber,
Request::defaultPrinter, _priority, !_doNotTrackRequest,
_allowDuplicates);
} else if ("FIND" == _requestType) {
request = controller->findReplica(_workerName, _databaseName, _chunkNumber, Request::defaultPrinter,
_priority, _computeCheckSum, !_doNotTrackRequest);

request = FindRequest::create(controller, _workerName, _databaseName, _chunkNumber,
Request::defaultPrinter, _priority, _computeCheckSum,
!_doNotTrackRequest);
} else if ("FIND_ALL" == _requestType) {
request = controller->findAllReplicas(_workerName, _databaseName, !_doNotSaveReplicaInfo,
Request::defaultPrinter, _priority, !_doNotTrackRequest);

request = FindAllRequest::create(controller, _workerName, _databaseName, !_doNotSaveReplicaInfo,
Request::defaultPrinter, _priority, !_doNotTrackRequest);
} else if ("ECHO" == _requestType) {
request = controller->echo(_workerName, _echoData, _echoDelayMilliseconds, Request::defaultPrinter,
_priority, !_doNotTrackRequest);

request = EchoRequest::create(controller, _workerName, _echoData, _echoDelayMilliseconds,
Request::defaultPrinter, _priority, !_doNotTrackRequest);
} else if ("INDEX" == _requestType) {
bool const hasTransactions = _transactionId != numeric_limits<TransactionId>::max();
request = controller->directorIndex(
_workerName, _sqlDatabase, _sqlTable, _chunkNumber, hasTransactions, _transactionId,
request = DirectorIndexRequest::create(
controller, _workerName, _sqlDatabase, _sqlTable, _chunkNumber, hasTransactions,
_transactionId,
[&](DirectorIndexRequest::Ptr const& request_) {
Request::defaultPrinter(request_);
auto const& responseData = request_->responseData();
Expand All @@ -570,125 +568,101 @@ int ControllerApp::runImpl() {
}
},
_priority, !_doNotTrackRequest);

} else if ("SQL_ALTER_TABLES" == _requestType) {
vector<string> const tables = {_sqlTable};
request = controller->sqlAlterTables(_workerName, _sqlDatabase, tables, _sqlAlterSpec,
SqlRequest::extendedPrinter, _priority, !_doNotTrackRequest);

request = SqlAlterTablesRequest::create(controller, _workerName, _sqlDatabase, tables, _sqlAlterSpec,
SqlRequest::extendedPrinter, _priority, !_doNotTrackRequest);
} else if ("SQL_QUERY" == _requestType) {
request = controller->sqlQuery(_workerName, _sqlQuery, _sqlUser, _sqlPassword, _sqlMaxRows,
SqlRequest::extendedPrinter, _priority, !_doNotTrackRequest);

} else if ("SQL_CREATE_DATABASE" == _requestType) {
request = controller->sqlCreateDb(_workerName, _sqlDatabase, SqlRequest::extendedPrinter, _priority,
request = SqlQueryRequest::create(controller, _workerName, _sqlQuery, _sqlUser, _sqlPassword,
_sqlMaxRows, SqlRequest::extendedPrinter, _priority,
!_doNotTrackRequest);

} else if ("SQL_CREATE_DATABASE" == _requestType) {
request = SqlCreateDbRequest::create(controller, _workerName, _sqlDatabase,
SqlRequest::extendedPrinter, _priority, !_doNotTrackRequest);
} else if ("SQL_DELETE_DATABASE" == _requestType) {
request = controller->sqlDeleteDb(_workerName, _sqlDatabase, SqlRequest::extendedPrinter, _priority,
!_doNotTrackRequest);

request = SqlDeleteDbRequest::create(controller, _workerName, _sqlDatabase,
SqlRequest::extendedPrinter, _priority, !_doNotTrackRequest);
} else if ("SQL_ENABLE_DATABASE" == _requestType) {
request = controller->sqlEnableDb(_workerName, _sqlDatabase, SqlRequest::extendedPrinter, _priority,
!_doNotTrackRequest);

request = SqlEnableDbRequest::create(controller, _workerName, _sqlDatabase,
SqlRequest::extendedPrinter, _priority, !_doNotTrackRequest);
} else if ("SQL_DISABLE_DATABASE" == _requestType) {
request = controller->sqlDisableDb(_workerName, _sqlDatabase, SqlRequest::extendedPrinter, _priority,
!_doNotTrackRequest);

request = SqlDisableDbRequest::create(controller, _workerName, _sqlDatabase,
SqlRequest::extendedPrinter, _priority, !_doNotTrackRequest);
} else if ("SQL_GRANT_ACCESS" == _requestType) {
request = controller->sqlGrantAccess(_workerName, _sqlDatabase, _sqlUser, SqlRequest::extendedPrinter,
_priority, !_doNotTrackRequest);

request = SqlGrantAccessRequest::create(controller, _workerName, _sqlDatabase, _sqlUser,
SqlRequest::extendedPrinter, _priority, !_doNotTrackRequest);
} else if ("SQL_CREATE_TABLE" == _requestType) {
request = controller->sqlCreateTable(_workerName, _sqlDatabase, _sqlTable, _sqlEngine,
_sqlPartitionByColumn,
SqlSchemaUtils::readFromTextFile(_sqlSchemaFile),
SqlRequest::extendedPrinter, _priority, !_doNotTrackRequest);

request = SqlCreateTableRequest::create(controller, _workerName, _sqlDatabase, _sqlTable, _sqlEngine,
_sqlPartitionByColumn,
SqlSchemaUtils::readFromTextFile(_sqlSchemaFile),
SqlRequest::extendedPrinter, _priority, !_doNotTrackRequest);
} else if ("SQL_CREATE_TABLES" == _requestType) {
vector<string> const tables = {_sqlTable};
request = controller->sqlCreateTables(_workerName, _sqlDatabase, tables, _sqlEngine,
_sqlPartitionByColumn,
SqlSchemaUtils::readFromTextFile(_sqlSchemaFile),
SqlRequest::extendedPrinter, _priority, !_doNotTrackRequest);

request = SqlCreateTablesRequest::create(controller, _workerName, _sqlDatabase, tables, _sqlEngine,
_sqlPartitionByColumn,
SqlSchemaUtils::readFromTextFile(_sqlSchemaFile),
SqlRequest::extendedPrinter, _priority, !_doNotTrackRequest);
} else if ("SQL_DELETE_TABLE" == _requestType) {
vector<string> const tables = {_sqlTable};
request = controller->sqlDeleteTable(_workerName, _sqlDatabase, tables, SqlRequest::extendedPrinter,
_priority, !_doNotTrackRequest);

request = SqlDeleteTableRequest::create(controller, _workerName, _sqlDatabase, tables,
SqlRequest::extendedPrinter, _priority, !_doNotTrackRequest);
} else if ("SQL_REMOVE_TABLE_PARTITIONS" == _requestType) {
vector<string> const tables = {_sqlTable};
request = controller->sqlRemoveTablePartitions(_workerName, _sqlDatabase, tables,
SqlRequest::extendedPrinter, _priority,
!_doNotTrackRequest);

request = SqlRemoveTablePartitionsRequest::create(controller, _workerName, _sqlDatabase, tables,
SqlRequest::extendedPrinter, _priority,
!_doNotTrackRequest);
} else if ("SQL_DELETE_TABLE_PARTITION" == _requestType) {
vector<string> const tables = {_sqlTable};
request = controller->sqlDeleteTablePartition(_workerName, _sqlDatabase, tables, _transactionId,
SqlRequest::extendedPrinter, _priority,
!_doNotTrackRequest);

request = SqlDeleteTablePartitionRequest::create(controller, _workerName, _sqlDatabase, tables,
_transactionId, SqlRequest::extendedPrinter,
_priority, !_doNotTrackRequest);
} else if ("SQL_CREATE_TABLE_INDEXES" == _requestType) {
vector<string> const tables = {_sqlTable};
request = controller->sqlCreateTableIndexes(
_workerName, _sqlDatabase, tables, SqlRequestParams::IndexSpec(_sqlIndexSpecStr),
request = SqlCreateIndexesRequest::create(
controller, _workerName, _sqlDatabase, tables, SqlRequestParams::IndexSpec(_sqlIndexSpecStr),
_sqlIndexName, _sqlIndexComment,
SqlSchemaUtils::readIndexSpecFromTextFile(_sqlIndexColumnsFile), SqlRequest::extendedPrinter,
_priority, !_doNotTrackRequest);

} else if ("SQL_DROP_TABLE_INDEXES" == _requestType) {
vector<string> const tables = {_sqlTable};
request =
controller->sqlDropTableIndexes(_workerName, _sqlDatabase, tables, _sqlIndexName,
request = SqlDropIndexesRequest::create(controller, _workerName, _sqlDatabase, tables, _sqlIndexName,
SqlRequest::extendedPrinter, _priority, !_doNotTrackRequest);

} else if ("SQL_GET_TABLE_INDEXES" == _requestType) {
vector<string> const tables = {_sqlTable};
request = controller->sqlGetTableIndexes(_workerName, _sqlDatabase, tables,
SqlRequest::extendedPrinter, _priority, !_doNotTrackRequest);

request = SqlGetIndexesRequest::create(controller, _workerName, _sqlDatabase, tables,
SqlRequest::extendedPrinter, _priority, !_doNotTrackRequest);
} else if ("SQL_TABLE_ROW_STATS" == _requestType) {
auto const databaseInfo = controller->serviceProvider()->config()->databaseInfo(_sqlDatabase);
bool const isPartitioned = databaseInfo.findTable(_sqlTable).isPartitioned;
vector<string> const tables = {
isPartitioned ? ChunkedTable(_sqlTable, _chunkNumber, _isOverlap).name() : _sqlTable};
request = controller->sqlRowStats(_workerName, _sqlDatabase, tables, SqlRequest::extendedPrinter,
_priority, !_doNotTrackRequest);

request = SqlRowStatsRequest::create(controller, _workerName, _sqlDatabase, tables,
SqlRequest::extendedPrinter, _priority, !_doNotTrackRequest);
} else if ("STATUS" == _requestType) {
request = _launchStatusRequest(controller);

} else if ("STOP" == _requestType) {
request = _launchStatusRequest(controller);

} else if ("DISPOSE" == _requestType) {
vector<string> const targetIds = {_affectedRequestId};
request = controller->dispose(_workerName, targetIds, Request::defaultPrinter);

request = DisposeRequest::create(controller, _workerName, targetIds, Request::defaultPrinter);
} else if ("SERVICE_SUSPEND" == _requestType) {
request =
controller->suspendWorkerService(_workerName, ServiceManagementRequestBase::extendedPrinter);

request = ServiceSuspendRequest::create(controller, _workerName,
ServiceManagementRequestBase::extendedPrinter);
} else if ("SERVICE_RESUME" == _requestType) {
request = controller->resumeWorkerService(_workerName, ServiceManagementRequestBase::extendedPrinter);

request = ServiceResumeRequest::create(controller, _workerName,
ServiceManagementRequestBase::extendedPrinter);
} else if ("SERVICE_STATUS" == _requestType) {
request =
controller->statusOfWorkerService(_workerName, ServiceManagementRequestBase::extendedPrinter);

request = ServiceStatusRequest::create(controller, _workerName,
ServiceManagementRequestBase::extendedPrinter);
} else if ("SERVICE_REQUESTS" == _requestType) {
request = controller->requestsOfWorkerService(_workerName,
ServiceManagementRequestBase::extendedPrinter);

request = ServiceRequestsRequest::create(controller, _workerName,
ServiceManagementRequestBase::extendedPrinter);
} else if ("SERVICE_DRAIN" == _requestType) {
request = controller->drainWorkerService(_workerName, ServiceManagementRequestBase::extendedPrinter);

request = ServiceDrainRequest::create(controller, _workerName,
ServiceManagementRequestBase::extendedPrinter);
} else if ("SERVICE_RECONFIG" == _requestType) {
request =
controller->reconfigWorkerService(_workerName, ServiceManagementRequestBase::extendedPrinter);

request = ServiceReconfigRequest::create(controller, _workerName,
ServiceManagementRequestBase::extendedPrinter);
} else {
throw logic_error(context + "unsupported request: " + _affectedRequest);
}
Expand Down
4 changes: 2 additions & 2 deletions src/replica/apps/MessengerTestApp.cc
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ int MessengerTestApp::runImpl() {
_onNumActiveCv.wait(lock, [&] { return _numActive < _maxActiveRequests; });

// Submit the next request.
auto const request = controller->echo(
_workerName, _data, _proccesingTimeSec,
auto const request = EchoRequest::create(
controller, _workerName, _data, _proccesingTimeSec,
[&](EchoRequest::Ptr request) {
{
unique_lock<mutex> lock(_mtx);
Expand Down
Loading

0 comments on commit 3e86574

Please sign in to comment.