Skip to content

Commit

Permalink
ENGCOM-8074: Cron cleanup repeatedly hits deadlocks on large environm…
Browse files Browse the repository at this point in the history
…ents where groups can overlap #28007
  • Loading branch information
gabrieldagama authored Dec 15, 2020
2 parents 8903b8e + 2c850a4 commit 0fc0dc4
Show file tree
Hide file tree
Showing 7 changed files with 158 additions and 62 deletions.
15 changes: 15 additions & 0 deletions app/code/Magento/Cron/Model/DeadlockRetrier.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,20 @@
*/
class DeadlockRetrier implements DeadlockRetrierInterface
{
/**
* @var \Psr\Log\LoggerInterface
*/
private $logger;

/**
 * Inject the logger used to report deadlocks detected while retrying.
 *
 * @param \Psr\Log\LoggerInterface $logger
 */
public function __construct(\Psr\Log\LoggerInterface $logger)
{
    $this->logger = $logger;
}

/**
* @inheritdoc
*/
Expand All @@ -30,6 +44,7 @@ public function execute(callable $callback, AdapterInterface $connection)
try {
return $callback();
} catch (DeadlockException $e) {
$this->logger->warning(sprintf("Deadlock detected in cron: %s", $e->getMessage()));
continue;
}
}
Expand Down
36 changes: 26 additions & 10 deletions app/code/Magento/Cron/Model/ResourceModel/Schedule.php
Original file line number Diff line number Diff line change
Expand Up @@ -65,31 +65,47 @@ public function trySetJobStatusAtomic($scheduleId, $newStatus, $currentStatus)
public function trySetJobUniqueStatusAtomic($scheduleId, $newStatus, $currentStatus)
{
    // Moves schedule $scheduleId from $currentStatus to $newStatus, but only when no other schedule
    // with the same job code already holds $newStatus. Returns true only when the row was updated.
    $connection = $this->getConnection();
    $connection->beginTransaction();

    try {
        // this condition added to avoid cron jobs locking after incorrect termination of running job
        $match = $connection->quoteInto(
            'existing.job_code = current.job_code ' .
            'AND existing.status = ? ' .
            'AND (existing.executed_at > UTC_TIMESTAMP() - INTERVAL 1 DAY OR existing.executed_at IS NULL)',
            $newStatus
        );

        // Select and lock all related schedules - this prevents deadlock in case cron overlaps and two jobs of
        // the same code attempt to lock at the same time, and force them to serialize
        $selectIfUnlocked = $connection->select()
            ->from(
                ['current' => $this->getTable('cron_schedule')],
                []
            )
            ->joinLeft(
                ['existing' => $this->getTable('cron_schedule')],
                $match,
                ['existing.schedule_id']
            )
            ->where('current.schedule_id = ?', $scheduleId)
            ->where('current.status = ?', $currentStatus)
            ->forUpdate(true);

        // Keep the lookup result in its own variable: overwriting $scheduleId here would leave the
        // UPDATE below filtering on an empty id, so it could never touch the intended row.
        $existingScheduleId = $connection->fetchOne($selectIfUnlocked);
        if (!empty($existingScheduleId)) {
            // Existing running schedule found
            $connection->commit();
            return false;
        }

        // Mark our schedule as running. The status guard keeps the change conditional on the row still
        // being in $currentStatus, so a schedule that was picked up elsewhere is not reported as locked.
        $affectedRows = $connection->update(
            $this->getTable('cron_schedule'),
            ['status' => new \Zend_Db_Expr($connection->quote($newStatus))],
            [
                'schedule_id = ?' => $scheduleId,
                'status = ?' => $currentStatus,
            ]
        );

        $connection->commit();
        return (int)$affectedRows === 1;
    } catch (\Throwable $e) {
        // Do not leave the transaction (and its row locks) open when a query fails, e.g. on a deadlock.
        $connection->rollBack();
        throw $e;
    }
}
}
61 changes: 45 additions & 16 deletions app/code/Magento/Cron/Observer/ProcessCronQueueObserver.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
*/
namespace Magento\Cron\Observer;

use Magento\Cron\Model\ResourceModel\Schedule\Collection as ScheduleCollection;
use Magento\Cron\Model\Schedule;
use Magento\Framework\App\State;
use Magento\Framework\Console\Cli;
Expand Down Expand Up @@ -83,7 +84,7 @@ class ProcessCronQueueObserver implements ObserverInterface
const MAX_RETRIES = 5;

/**
* @var \Magento\Cron\Model\ResourceModel\Schedule\Collection
* @var ScheduleCollection
*/
protected $_pendingSchedules;

Expand Down Expand Up @@ -278,12 +279,12 @@ function ($groupId) use ($currentTime) {
*
* It should be taken by standalone (child) process, not by the parent process.
*
* @param int $groupId
* @param string $groupId
* @param callable $callback
*
* @return void
*/
private function lockGroup($groupId, callable $callback)
private function lockGroup(string $groupId, callable $callback): void
{
if (!$this->lockManager->lock(self::LOCK_PREFIX . $groupId, self::LOCK_TIMEOUT)) {
$this->logger->warning(
Expand Down Expand Up @@ -399,7 +400,7 @@ function () use ($schedule) {
* @param string $jobName
* @return void
*/
private function startProfiling(string $jobName = '')
private function startProfiling(string $jobName = ''): void
{
$this->statProfiler->clear();
$this->statProfiler->start(
Expand All @@ -416,7 +417,7 @@ private function startProfiling(string $jobName = '')
* @param string $jobName
* @return void
*/
private function stopProfiling(string $jobName = '')
private function stopProfiling(string $jobName = ''): void
{
$this->statProfiler->stop(
sprintf(self::CRON_TIMERID, $jobName),
Expand Down Expand Up @@ -445,9 +446,9 @@ private function getProfilingStat(string $jobName): string
* Return job collection from database with status 'pending'.
*
* @param string $groupId
* @return \Magento\Cron\Model\ResourceModel\Schedule\Collection
* @return ScheduleCollection
*/
private function getPendingSchedules($groupId)
private function getPendingSchedules(string $groupId): ScheduleCollection
{
$jobs = $this->_config->getJobs();
$pendingJobs = $this->_scheduleFactory->create()->getCollection();
Expand All @@ -462,7 +463,7 @@ private function getPendingSchedules($groupId)
* @param string $groupId
* @return $this
*/
private function generateSchedules($groupId)
private function generateSchedules(string $groupId): self
{
/**
* check if schedule generation is needed
Expand Down Expand Up @@ -533,13 +534,13 @@ protected function _generateJobs($jobs, $exists, $groupId)
* @param int $currentTime
* @return void
*/
private function cleanupJobs($groupId, $currentTime)
private function cleanupJobs(string $groupId, int $currentTime): void
{
// check if history cleanup is needed
$lastCleanup = (int)$this->_cache->load(self::CACHE_KEY_LAST_HISTORY_CLEANUP_AT . $groupId);
$historyCleanUp = (int)$this->getCronGroupConfigurationValue($groupId, self::XML_PATH_HISTORY_CLEANUP_EVERY);
if ($lastCleanup > $this->dateTime->gmtTimestamp() - $historyCleanUp * self::SECONDS_IN_MINUTE) {
return $this;
return;
}
// save the time history cleanup was run, with no expiration
$this->_cache->save(
Expand All @@ -550,6 +551,7 @@ private function cleanupJobs($groupId, $currentTime)
);

$this->cleanupDisabledJobs($groupId);
$this->cleanupRunningJobs($groupId);

$historySuccess = (int)$this->getCronGroupConfigurationValue($groupId, self::XML_PATH_HISTORY_SUCCESS);
$historyFailure = (int)$this->getCronGroupConfigurationValue($groupId, self::XML_PATH_HISTORY_FAILURE);
Expand Down Expand Up @@ -673,7 +675,7 @@ protected function getScheduleTimeInterval($groupId)
* @param string $groupId
* @return void
*/
private function cleanupDisabledJobs($groupId)
private function cleanupDisabledJobs(string $groupId): void
{
$jobs = $this->_config->getJobs();
$jobsToCleanup = [];
Expand All @@ -696,6 +698,33 @@ private function cleanupDisabledJobs($groupId)
}
}

/**
 * Cleanup jobs that were left in a running state due to an unexpected stop
 *
 * Every schedule of the given group that is still marked as running and was scheduled more than
 * one day ago is switched to the error status with the message "Time out".
 *
 * @param string $groupId
 * @return void
 */
private function cleanupRunningJobs(string $groupId): void
{
    $jobs = $this->_config->getJobs();
    $jobCodes = array_keys($jobs[$groupId] ?? []);
    if (empty($jobCodes)) {
        // Unknown or empty group: nothing to clean up, and an empty list must not reach "IN (?)".
        return;
    }

    $scheduleResource = $this->_scheduleFactory->create()->getResource();
    $connection = $scheduleResource->getConnection();

    $connection->update(
        $scheduleResource->getTable('cron_schedule'),
        [
            'status' => Schedule::STATUS_ERROR,
            'messages' => 'Time out'
        ],
        [
            $connection->quoteInto('status = ?', Schedule::STATUS_RUNNING),
            $connection->quoteInto('job_code IN (?)', $jobCodes),
            'scheduled_at < UTC_TIMESTAMP() - INTERVAL 1 DAY'
        ]
    );
}

/**
* Get cron expression of cron job.
*
Expand Down Expand Up @@ -773,13 +802,13 @@ private function isGroupInFilter($groupId): bool
* @param array $jobsRoot
* @param int $currentTime
*/
private function processPendingJobs($groupId, $jobsRoot, $currentTime)
private function processPendingJobs(string $groupId, array $jobsRoot, int $currentTime): void
{
$procesedJobs = [];
$processedJobs = [];
$pendingJobs = $this->getPendingSchedules($groupId);
/** @var Schedule $schedule */
foreach ($pendingJobs as $schedule) {
if (isset($procesedJobs[$schedule->getJobCode()])) {
if (isset($processedJobs[$schedule->getJobCode()])) {
// process only one job per run
continue;
}
Expand All @@ -796,7 +825,7 @@ private function processPendingJobs($groupId, $jobsRoot, $currentTime)
$this->tryRunJob($scheduledTime, $currentTime, $jobConfig, $schedule, $groupId);

if ($schedule->getStatus() === Schedule::STATUS_SUCCESS) {
$procesedJobs[$schedule->getJobCode()] = true;
$processedJobs[$schedule->getJobCode()] = true;
}

$this->retrier->execute(
Expand All @@ -821,7 +850,7 @@ private function tryRunJob($scheduledTime, $currentTime, $jobConfig, $schedule,
{
// use md5 to limit length
// phpcs:ignore Magento2.Security.InsecureFunction
$lockName = self::LOCK_PREFIX . md5($groupId . '_' . $schedule->getJobCode());
$lockName = self::LOCK_PREFIX . md5($groupId . '_' . $schedule->getJobCode());

try {
for ($retries = self::MAX_RETRIES; $retries > 0; $retries--) {
Expand Down
13 changes: 12 additions & 1 deletion app/code/Magento/Cron/Test/Unit/Model/DeadlockRetrierTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
use Magento\Framework\DB\Adapter\AdapterInterface;
use Magento\Framework\DB\Adapter\DeadlockException;
use PHPUnit\Framework\MockObject\MockObject;
use Psr\Log\LoggerInterface;

class DeadlockRetrierTest extends \PHPUnit\Framework\TestCase
{
Expand All @@ -27,6 +28,11 @@ class DeadlockRetrierTest extends \PHPUnit\Framework\TestCase
*/
private $adapterMock;

/**
* @var LoggerInterface|MockObject
*/
private $loggerMock;

/**
* @var AbstractModel|MockObject
*/
Expand All @@ -38,8 +44,9 @@ class DeadlockRetrierTest extends \PHPUnit\Framework\TestCase
protected function setUp(): void
{
$this->adapterMock = $this->getMockForAbstractClass(AdapterInterface::class);
$this->loggerMock = $this->getMockForAbstractClass(LoggerInterface::class);
$this->modelMock = $this->createMock(AbstractModel::class);
$this->retrier = new DeadlockRetrier();
$this->retrier = new DeadlockRetrier($this->loggerMock);
}

/**
Expand Down Expand Up @@ -75,6 +82,8 @@ public function testRetry(): void
$this->modelMock->expects($this->exactly(DeadlockRetrierInterface::MAX_RETRIES))
->method('getId')
->willThrowException(new DeadlockException());
$this->loggerMock->expects($this->exactly(DeadlockRetrierInterface::MAX_RETRIES - 1))
->method('warning');

$this->retrier->execute(
function () {
Expand All @@ -95,6 +104,8 @@ public function testRetrySecond(): void
$this->modelMock->expects($this->at(1))
->method('getId')
->willReturn(2);
$this->loggerMock->expects($this->once())
->method('warning');

$this->retrier->execute(
function () {
Expand Down
Loading

0 comments on commit 0fc0dc4

Please sign in to comment.