Skip to content

Commit 35421a8

Browse files
author
Christian Bader
committed
Implemented subprocesses to prevent memory exhaustion on huge indices
1 parent 0f94137 commit 35421a8

10 files changed

+286
-94
lines changed

Diff for: src/Command/BaseCommand.php

+12-1
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,16 @@
88

99
abstract class BaseCommand extends AbstractCommand
1010
{
11-
protected const COMMAND_NAMESPACE = 'valantic:elastica-bridge:';
11+
protected function displayThrowable(\Throwable $throwable): void
12+
{
13+
$this->output->writeln('');
14+
$this->output->writeln(sprintf('In %s line %d', $throwable->getFile(), $throwable->getLine()));
15+
$this->output->writeln('');
16+
17+
$this->output->writeln($throwable->getMessage());
18+
$this->output->writeln('');
19+
20+
$this->output->writeln($throwable->getTraceAsString());
21+
$this->output->writeln('');
22+
}
1223
}

Diff for: src/Command/Cleanup.php

+2-1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
use Symfony\Component\Console\Input\InputOption;
1111
use Symfony\Component\Console\Output\OutputInterface;
1212
use Symfony\Component\Console\Question\ConfirmationQuestion;
13+
use Valantic\ElasticaBridgeBundle\Constant\CommandConstants;
1314
use Valantic\ElasticaBridgeBundle\Elastica\Client\ElasticsearchClient;
1415
use Valantic\ElasticaBridgeBundle\Repository\IndexRepository;
1516

@@ -27,7 +28,7 @@ public function __construct(
2728

2829
protected function configure(): void
2930
{
30-
$this->setName(self::COMMAND_NAMESPACE . 'cleanup')
31+
$this->setName(CommandConstants::COMMAND_CLEANUP)
3132
->setDescription('Deletes Elasticsearch indices and aliases known to (i.e. created by) the bundle')
3233
->addOption(
3334
self::OPTION_ALL_IN_CLUSTER,

Diff for: src/Command/DoPopulateIndex.php

+166
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Valantic\ElasticaBridgeBundle\Command;
6+
7+
use Elastica\Index as ElasticaIndex;
8+
use Pimcore\Model\Element\AbstractElement;
9+
use Symfony\Component\Console\Helper\ProgressBar;
10+
use Symfony\Component\Console\Input\InputInterface;
11+
use Symfony\Component\Console\Input\InputOption;
12+
use Symfony\Component\Console\Output\OutputInterface;
13+
use Valantic\ElasticaBridgeBundle\Constant\CommandConstants;
14+
use Valantic\ElasticaBridgeBundle\Exception\Command\DocumentFailedException;
15+
use Valantic\ElasticaBridgeBundle\Index\IndexInterface;
16+
use Valantic\ElasticaBridgeBundle\Repository\ConfigurationRepository;
17+
use Valantic\ElasticaBridgeBundle\Repository\DocumentRepository;
18+
use Valantic\ElasticaBridgeBundle\Repository\IndexRepository;
19+
use Valantic\ElasticaBridgeBundle\Service\DocumentHelper;
20+
21+
/**
 * Hidden, internal console command that indexes the elements of one document type
 * into the inactive blue/green Elasticsearch index of an index configuration.
 *
 * When the index configuration asks for population in subprocesses, a single batch
 * (selected via the batch-number option) is processed per invocation; otherwise all
 * batches are processed in one run.
 */
class DoPopulateIndex extends BaseCommand
{
    public function __construct(
        private readonly IndexRepository $indexRepository,
        private readonly DocumentRepository $documentRepository,
        private readonly DocumentHelper $documentHelper,
        private readonly ConfigurationRepository $configurationRepository,
    ) {
        parent::__construct();
    }

    /**
     * Registers the command name and the options it accepts.
     */
    protected function configure(): void
    {
        $this->setName(CommandConstants::COMMAND_DO_POPULATE_INDEX)
            ->setHidden(true)
            ->setDescription('[INTERNAL]')
            ->addOption(CommandConstants::OPTION_CONFIG, mode: InputOption::VALUE_REQUIRED)
            ->addOption(CommandConstants::OPTION_INDEX, mode: InputOption::VALUE_REQUIRED)
            ->addOption(CommandConstants::OPTION_BATCH_NUMBER, mode: InputOption::VALUE_REQUIRED)
            ->addOption(CommandConstants::OPTION_LISTING_COUNT, mode: InputOption::VALUE_REQUIRED)
            ->addOption(CommandConstants::OPTION_DOCUMENT, mode: InputOption::VALUE_REQUIRED)
        ;
    }

    /**
     * Resolves the index configuration named by the config option and populates it.
     *
     * NOTE(review): the index option is registered in configure() but not read in this class;
     * the target index is always the inactive blue/green index — confirm this is intended.
     *
     * @return int self::FAILURE when no matching index configuration exists, otherwise the population result
     */
    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        $indexConfig = $this->getIndex();

        if (!$indexConfig instanceof IndexInterface) {
            return self::FAILURE;
        }

        return $this->populateIndex($indexConfig, $indexConfig->getBlueGreenInactiveElasticaIndex());
    }

    /**
     * Finds the index configuration whose name equals the value of the config option.
     *
     * @return IndexInterface|null the matching configuration, or null when none matches
     */
    private function getIndex(): ?IndexInterface
    {
        // Read the option once instead of on every iteration.
        $requestedName = $this->input->getOption(CommandConstants::OPTION_CONFIG);

        foreach ($this->indexRepository->flattenedAll() as $indexConfig) {
            if ($indexConfig->getName() === $requestedName) {
                return $indexConfig;
            }
        }

        return null;
    }

    /**
     * Validates the requested document type, sets up the progress bar and runs either the
     * single requested batch or all batches.
     *
     * @return int self::SUCCESS when every processed batch succeeded, otherwise a failure exit code
     */
    private function populateIndex(IndexInterface $indexConfig, ElasticaIndex $esIndex): int
    {
        ProgressBar::setFormatDefinition('custom', "%percent%%\t%remaining%\t%memory%\n%message%");

        $batchNumber = (int) $this->input->getOption(CommandConstants::OPTION_BATCH_NUMBER);
        $listingCount = (int) $this->input->getOption(CommandConstants::OPTION_LISTING_COUNT);
        $document = $this->input->getOption(CommandConstants::OPTION_DOCUMENT);

        // The option value is untyped; it must be a string that the index explicitly allows.
        if (!is_string($document) || !in_array($document, $indexConfig->getAllowedDocuments(), true)) {
            return self::FAILURE;
        }

        $batchSize = $indexConfig->getBatchSize();

        // The progress bar needs a positive maximum, even for an empty listing.
        $progressBar = new ProgressBar($this->output, max($listingCount, 1));
        $progressBar->setMessage($document);
        $progressBar->setFormat('custom');
        $progressBar->setProgress($batchNumber * $batchSize);

        if ($indexConfig->shouldPopulateInSubprocesses()) {
            // Each subprocess handles exactly the batch it was started for.
            return $this->doPopulateIndex($esIndex, $indexConfig, $progressBar, $document, $batchNumber);
        }

        // Guard the division below: a batch size of zero would raise a DivisionByZeroError and a
        // negative one would yield zero batches, silently reporting success without indexing anything.
        if ($batchSize < 1) {
            $this->output->writeln(sprintf(
                '<fg=red;options=bold>Invalid batch size %d configured for index %s.</>',
                $batchSize,
                $indexConfig::class
            ));

            return self::FAILURE;
        }

        $numberOfBatches = (int) ceil($listingCount / $batchSize);

        for ($batch = 0; $batch < $numberOfBatches; $batch++) {
            $exitCode = $this->doPopulateIndex($esIndex, $indexConfig, $progressBar, $document, $batch);

            if ($exitCode !== self::SUCCESS) {
                return $exitCode;
            }
        }

        return self::SUCCESS;
    }

    /**
     * Indexes one batch of elements of the given document type.
     *
     * Loads the listing slice for the batch, converts every element that should be indexed into an
     * Elasticsearch document and adds the collected documents to the index in one call.
     *
     * @param string $document identifier used to fetch the document definition from the document repository
     * @param int $batchNumber zero-based batch index; the listing offset is batchNumber * batch size
     *
     * @throws DocumentFailedException when processing an element fails and failing documents are not configured to be skipped
     *
     * @return int always self::SUCCESS when no exception is thrown
     */
    private function doPopulateIndex(
        ElasticaIndex $esIndex,
        IndexInterface $indexConfig,
        ProgressBar $progressBar,
        string $document,
        int $batchNumber,
    ): int {
        $documentInstance = $this->documentRepository->get($document);

        $this->documentHelper->setTenantIfNeeded($documentInstance, $indexConfig);

        $batchSize = $indexConfig->getBatchSize();

        $listing = $documentInstance->getListingInstance($indexConfig);
        $listing->setOffset($batchNumber * $batchSize);
        $listing->setLimit($batchSize);

        $esDocuments = [];

        foreach ($listing->getData() ?? [] as $dataObject) {
            try {
                if (!$documentInstance->shouldIndex($dataObject)) {
                    continue;
                }

                $progressBar->advance();

                $esDocuments[] = $this->documentHelper->elementToDocument($documentInstance, $dataObject);
            } catch (\Throwable $throwable) {
                $this->displayDocumentError($indexConfig, $document, $dataObject, $throwable);

                // Unless configured to skip failing documents, abort the whole batch.
                if (!$this->configurationRepository->shouldSkipFailingDocuments()) {
                    throw new DocumentFailedException($throwable);
                }
            }
        }

        // Only send a bulk request when there is something to add.
        if ($esDocuments !== []) {
            $esIndex->addDocuments($esDocuments);
        }

        // The refresh, when enabled by the index configuration, happens once per batch here.
        if ($indexConfig->refreshIndexAfterEveryDocumentWhenPopulating()) {
            $esIndex->refresh();
        }

        return self::SUCCESS;
    }

    /**
     * Prints a highlighted error line naming the index class, document type and element ID,
     * followed by the details of the throwable.
     */
    private function displayDocumentError(
        IndexInterface $indexConfig,
        string $document,
        AbstractElement $dataObject,
        \Throwable $throwable,
    ): void {
        $this->output->writeln('');
        $this->output->writeln(sprintf(
            '<fg=red;options=bold>Error while populating index %s, processing documents of type %s, last processed element ID %s.</>',
            $indexConfig::class,
            $document,
            $dataObject->getId()
        ));
        $this->displayThrowable($throwable);
    }
}

Diff for: src/Command/Index.php

+5-4
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
use Symfony\Component\Console\Output\OutputInterface;
1313
use Symfony\Component\HttpKernel\KernelInterface;
1414
use Symfony\Component\Process\Process;
15+
use Valantic\ElasticaBridgeBundle\Constant\CommandConstants;
1516
use Valantic\ElasticaBridgeBundle\Elastica\Client\ElasticsearchClient;
1617
use Valantic\ElasticaBridgeBundle\Enum\IndexBlueGreenSuffix;
1718
use Valantic\ElasticaBridgeBundle\Exception\Index\BlueGreenIndicesIncorrectlySetupException;
@@ -39,7 +40,7 @@ public function __construct(
3940

4041
protected function configure(): void
4142
{
42-
$this->setName(self::COMMAND_NAMESPACE . 'index')
43+
$this->setName(CommandConstants::COMMAND_INDEX)
4344
->setDescription('Ensures all the indices are present and populated.')
4445
->addArgument(
4546
self::ARGUMENT_INDEX,
@@ -169,9 +170,9 @@ private function populateIndex(IndexInterface $indexConfig, ElasticaIndex $esInd
169170
self::$isPopulating = true;
170171
$process = new Process(
171172
[
172-
'bin/console', self::COMMAND_NAMESPACE . 'populate-index',
173-
'--config', $indexConfig->getName(),
174-
'--index', $esIndex->getName(),
173+
'bin/console', CommandConstants::COMMAND_POPULATE_INDEX,
174+
'--' . CommandConstants::OPTION_CONFIG, $indexConfig->getName(),
175+
'--' . CommandConstants::OPTION_INDEX, $esIndex->getName(),
175176
...array_filter([$this->output->isVerbose() ? '-v' : null,
176177
$this->output->isVeryVerbose() ? '-vv' : null,
177178
$this->output->isDebug() ? '-vvv' : null,

0 commit comments

Comments
 (0)