Skip to content

Commit

Permalink
Cron cleanup repeatedly hits deadlocks on large environments
Browse files Browse the repository at this point in the history
Fix failing tests
  • Loading branch information
ihor-sviziev committed Aug 26, 2020
1 parent a901619 commit dda5c72
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 55 deletions.
47 changes: 23 additions & 24 deletions app/code/Magento/Cron/Observer/ProcessCronQueueObserver.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
*/
namespace Magento\Cron\Observer;

use Magento\Cron\Model\ResourceModel\Schedule\Collection as ScheduleCollection;
use Magento\Cron\Model\Schedule;
use Magento\Framework\App\State;
use Magento\Framework\Console\Cli;
Expand Down Expand Up @@ -83,7 +84,7 @@ class ProcessCronQueueObserver implements ObserverInterface
const MAX_RETRIES = 5;

/**
* @var \Magento\Cron\Model\ResourceModel\Schedule\Collection
* @var ScheduleCollection
*/
protected $_pendingSchedules;

Expand Down Expand Up @@ -278,12 +279,12 @@ function ($groupId) use ($currentTime) {
*
* It should be taken by standalone (child) process, not by the parent process.
*
* @param int $groupId
* @param string $groupId
* @param callable $callback
*
* @return void
*/
private function lockGroup($groupId, callable $callback)
private function lockGroup(string $groupId, callable $callback): void
{
if (!$this->lockManager->lock(self::LOCK_PREFIX . $groupId, self::LOCK_TIMEOUT)) {
$this->logger->warning(
Expand Down Expand Up @@ -399,7 +400,7 @@ function () use ($schedule) {
* @param string $jobName
* @return void
*/
private function startProfiling(string $jobName = '')
private function startProfiling(string $jobName = ''): void
{
$this->statProfiler->clear();
$this->statProfiler->start(
Expand All @@ -416,7 +417,7 @@ private function startProfiling(string $jobName = '')
* @param string $jobName
* @return void
*/
private function stopProfiling(string $jobName = '')
private function stopProfiling(string $jobName = ''): void
{
$this->statProfiler->stop(
sprintf(self::CRON_TIMERID, $jobName),
Expand Down Expand Up @@ -445,9 +446,9 @@ private function getProfilingStat(string $jobName): string
* Return job collection from data base with status 'pending'.
*
* @param string $groupId
* @return \Magento\Cron\Model\ResourceModel\Schedule\Collection
* @return ScheduleCollection
*/
private function getPendingSchedules($groupId)
private function getPendingSchedules(string $groupId): ScheduleCollection
{
$jobs = $this->_config->getJobs();
$pendingJobs = $this->_scheduleFactory->create()->getCollection();
Expand All @@ -462,7 +463,7 @@ private function getPendingSchedules($groupId)
* @param string $groupId
* @return $this
*/
private function generateSchedules($groupId)
private function generateSchedules(string $groupId): self
{
/**
* check if schedule generation is needed
Expand Down Expand Up @@ -533,13 +534,13 @@ protected function _generateJobs($jobs, $exists, $groupId)
* @param int $currentTime
* @return void
*/
private function cleanupJobs($groupId, $currentTime)
private function cleanupJobs(string $groupId, int $currentTime): void
{
// check if history cleanup is needed
$lastCleanup = (int)$this->_cache->load(self::CACHE_KEY_LAST_HISTORY_CLEANUP_AT . $groupId);
$historyCleanUp = (int)$this->getCronGroupConfigurationValue($groupId, self::XML_PATH_HISTORY_CLEANUP_EVERY);
if ($lastCleanup > $this->dateTime->gmtTimestamp() - $historyCleanUp * self::SECONDS_IN_MINUTE) {
return $this;
return;
}
// save time history cleanup was ran with no expiration
$this->_cache->save(
Expand Down Expand Up @@ -674,7 +675,7 @@ protected function getScheduleTimeInterval($groupId)
* @param string $groupId
* @return void
*/
private function cleanupDisabledJobs($groupId)
private function cleanupDisabledJobs(string $groupId): void
{
$jobs = $this->_config->getJobs();
$jobsToCleanup = [];
Expand Down Expand Up @@ -703,7 +704,7 @@ private function cleanupDisabledJobs($groupId)
* @param string $groupId
* @return void
*/
private function cleanupRunningJobs($groupId)
private function cleanupRunningJobs(string $groupId): void
{
$scheduleResource = $this->_scheduleFactory->create()->getResource();
$connection = $scheduleResource->getConnection();
Expand All @@ -716,13 +717,11 @@ private function cleanupRunningJobs($groupId)
'status' => \Magento\Cron\Model\Schedule::STATUS_ERROR,
'messages' => 'Time out'
],
$connection->quoteInto(
'status = ? ' .
'AND job_code IN (?) ' .
'AND (scheduled_at < UTC_TIMESTAMP() - INTERVAL 1 DAY)',
\Magento\Cron\Model\Schedule::STATUS_RUNNING,
array_keys($jobs[$groupId])
)
[
$connection->quoteInto('status = ?', \Magento\Cron\Model\Schedule::STATUS_RUNNING),
$connection->quoteInto('job_code IN (?)', array_keys($jobs[$groupId])),
'scheduled_at < UTC_TIMESTAMP() - INTERVAL 1 DAY'
]
);
}

Expand Down Expand Up @@ -803,13 +802,13 @@ private function isGroupInFilter($groupId): bool
* @param array $jobsRoot
* @param int $currentTime
*/
private function processPendingJobs($groupId, $jobsRoot, $currentTime)
private function processPendingJobs(string $groupId, array $jobsRoot, int $currentTime): void
{
$procesedJobs = [];
$processedJobs = [];
$pendingJobs = $this->getPendingSchedules($groupId);
/** @var Schedule $schedule */
foreach ($pendingJobs as $schedule) {
if (isset($procesedJobs[$schedule->getJobCode()])) {
if (isset($processedJobs[$schedule->getJobCode()])) {
// process only on job per run
continue;
}
Expand All @@ -826,7 +825,7 @@ private function processPendingJobs($groupId, $jobsRoot, $currentTime)
$this->tryRunJob($scheduledTime, $currentTime, $jobConfig, $schedule, $groupId);

if ($schedule->getStatus() === Schedule::STATUS_SUCCESS) {
$procesedJobs[$schedule->getJobCode()] = true;
$processedJobs[$schedule->getJobCode()] = true;
}

$this->retrier->execute(
Expand All @@ -851,7 +850,7 @@ private function tryRunJob($scheduledTime, $currentTime, $jobConfig, $schedule,
{
// use sha1 to limit length
// phpcs:ignore Magento2.Security.InsecureFunction
$lockName = self::LOCK_PREFIX . md5($groupId . '_' . $schedule->getJobCode());
$lockName = self::LOCK_PREFIX . md5($groupId . '_' . $schedule->getJobCode());

try {
for ($retries = self::MAX_RETRIES; $retries > 0; $retries--) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1045,53 +1045,85 @@ public function testMissedJobsCleanedInTime()
$scheduleMock->expects($this->exactly(10))->method('getResource')->willReturn($this->scheduleResourceMock);
$this->scheduleFactoryMock->expects($this->exactly(11))->method('create')->willReturn($scheduleMock);

$connectionMock = $this->prepareConnectionMock($tableName);

$this->scheduleResourceMock->expects($this->exactly(6))
->method('getTable')
->with($tableName)
->willReturn($tableName);
$this->scheduleResourceMock->expects($this->exactly(15))
->method('getConnection')
->willReturn($connectionMock);

$this->retrierMock->expects($this->exactly(5))
->method('execute')
->willReturnCallback(
function ($callback) {
return $callback();
}
);

$this->cronQueueObserver->execute($this->observerMock);
}

/**
* @param string $tableName
* @return AdapterInterface|MockObject
*/
private function prepareConnectionMock(string $tableName)
{
$connectionMock = $this->getMockForAbstractClass(AdapterInterface::class);

$connectionMock->expects($this->exactly(5))
->method('delete')
->withConsecutive(
[$tableName, ['status = ?' => 'pending', 'job_code in (?)' => ['test_job1']]],
[$tableName, ['status = ?' => 'success', 'job_code in (?)' => ['test_job1'], 'scheduled_at < ?' => null]],
[$tableName, ['status = ?' => 'missed', 'job_code in (?)' => ['test_job1'], 'scheduled_at < ?' => null]],
[$tableName, ['status = ?' => 'error', 'job_code in (?)' => ['test_job1'], 'scheduled_at < ?' => null]],
[$tableName, ['status = ?' => 'pending', 'job_code in (?)' => ['test_job1'], 'scheduled_at < ?' => null]]
[
$tableName,
['status = ?' => 'pending', 'job_code in (?)' => ['test_job1']]
],
[
$tableName,
['status = ?' => 'success', 'job_code in (?)' => ['test_job1'], 'scheduled_at < ?' => null]
],
[
$tableName,
['status = ?' => 'missed', 'job_code in (?)' => ['test_job1'], 'scheduled_at < ?' => null]
],
[
$tableName,
['status = ?' => 'error', 'job_code in (?)' => ['test_job1'], 'scheduled_at < ?' => null]
],
[
$tableName,
['status = ?' => 'pending', 'job_code in (?)' => ['test_job1'], 'scheduled_at < ?' => null]
]
)
->willReturn(1);

$connectionMock->expects($this->once())
$connectionMock->expects($this->any())
->method('quoteInto')
->with(
'status = ? AND job_code IN (?) AND (scheduled_at < UTC_TIMESTAMP() - INTERVAL 1 DAY)',
'running',
['test_job1']
->withConsecutive(
['status = ?', \Magento\Cron\Model\Schedule::STATUS_RUNNING],
['job_code IN (?)', ['test_job1']],
)
->willReturn('');
->willReturnOnConsecutiveCalls(
"status = 'running'",
"job_code IN ('test_job1')"
);

$connectionMock->expects($this->once())
->method('update')
->with(
$tableName,
['status' => 'error', 'messages' => 'Time out'],
''
[
"status = 'running'",
"job_code IN ('test_job1')",
'scheduled_at < UTC_TIMESTAMP() - INTERVAL 1 DAY'
]
)
->willReturn(0);

$this->scheduleResourceMock->expects($this->exactly(6))
->method('getTable')
->with($tableName)
->willReturn($tableName);
$this->scheduleResourceMock->expects($this->exactly(15))
->method('getConnection')
->willReturn($connectionMock);

$this->retrierMock->expects($this->exactly(5))
->method('execute')
->willReturnCallback(
function ($callback) {
return $callback();
}
);

$this->cronQueueObserver->execute($this->observerMock);
return $connectionMock;
}
}
5 changes: 3 additions & 2 deletions app/code/Magento/Cron/etc/db_schema_whitelist.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@
},
"index": {
"CRON_SCHEDULE_JOB_CODE": true,
"CRON_SCHEDULE_SCHEDULED_AT_STATUS": true
"CRON_SCHEDULE_SCHEDULED_AT_STATUS": true,
"CRON_SCHEDULE_JOB_CODE_STATUS_SCHEDULED_AT": true
},
"constraint": {
"PRIMARY": true
}
}
}
}

0 comments on commit dda5c72

Please sign in to comment.