Skip to content

Commit ddd708c

Browse files
WIP: Implementing subprocesses
1 parent 01f15c2 commit ddd708c

File tree

4 files changed

+270
-100
lines changed

4 files changed

+270
-100
lines changed

Diff for: src/Command/DoPopulateIndex.php

+210
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Valantic\ElasticaBridgeBundle\Command;
6+
7+
use Elastica\Index as ElasticaIndex;
8+
use Pimcore\Model\Element\AbstractElement;
9+
use Symfony\Component\Console\Helper\ProgressBar;
10+
use Symfony\Component\Console\Input\InputInterface;
11+
use Symfony\Component\Console\Input\InputOption;
12+
use Symfony\Component\Console\Output\OutputInterface;
13+
use Valantic\ElasticaBridgeBundle\Document\DocumentInterface;
14+
use Valantic\ElasticaBridgeBundle\Exception\Command\DocumentFailedException;
15+
use Valantic\ElasticaBridgeBundle\Index\IndexInterface;
16+
use Valantic\ElasticaBridgeBundle\Repository\ConfigurationRepository;
17+
use Valantic\ElasticaBridgeBundle\Repository\DocumentRepository;
18+
use Valantic\ElasticaBridgeBundle\Repository\IndexRepository;
19+
use Valantic\ElasticaBridgeBundle\Service\DocumentHelper;
20+
21+
class DoPopulateIndex extends BaseCommand
22+
{
23+
private const OPTION_CONFIG = 'config';
24+
private const OPTION_INDEX = 'index';
25+
private const OPTION_BATCH_NUMBER = 'batch-number';
26+
private const OPTION_LISTING_COUNT = 'listing-count';
27+
private const OPTION_DOCUMENT = 'document';
28+
29+
public function __construct(
30+
private readonly IndexRepository $indexRepository,
31+
private readonly DocumentRepository $documentRepository,
32+
private readonly DocumentHelper $documentHelper,
33+
private readonly ConfigurationRepository $configurationRepository,
34+
) {
35+
parent::__construct();
36+
}
37+
38+
protected function configure(): void
39+
{
40+
$this->setName(self::COMMAND_NAMESPACE . 'do-populate-index')
41+
->setHidden(true)
42+
->setDescription('[INTERNAL]')
43+
->addOption(self::OPTION_CONFIG, mode: InputOption::VALUE_REQUIRED)
44+
->addOption(self::OPTION_INDEX, mode: InputOption::VALUE_REQUIRED)
45+
->addOption(self::OPTION_BATCH_NUMBER, mode: InputOption::VALUE_REQUIRED)
46+
->addOption(self::OPTION_LISTING_COUNT, mode: InputOption::VALUE_REQUIRED)
47+
->addOption(self::OPTION_DOCUMENT, mode: InputOption::VALUE_REQUIRED)
48+
;
49+
}
50+
51+
protected function execute(InputInterface $input, OutputInterface $output): int
52+
{
53+
$indexConfig = $this->getIndex();
54+
55+
if (!$indexConfig instanceof IndexInterface) {
56+
return self::FAILURE;
57+
}
58+
59+
$index = $indexConfig->getBlueGreenInactiveElasticaIndex();
60+
$success = $this->populateIndex($indexConfig, $index);
61+
62+
if (!$success) {
63+
return self::FAILURE;
64+
}
65+
66+
return self::SUCCESS;
67+
}
68+
69+
private function getIndex(): ?IndexInterface
70+
{
71+
foreach ($this->indexRepository->flattenedAll() as $indexConfig) {
72+
if ($indexConfig->getName() === $this->input->getOption(self::OPTION_CONFIG)) {
73+
return $indexConfig;
74+
}
75+
}
76+
77+
return null;
78+
}
79+
80+
private function populateIndex(IndexInterface $indexConfig, ElasticaIndex $esIndex): bool
81+
{
82+
ProgressBar::setFormatDefinition('custom', "%percent%%\t%remaining%\t%memory%\n%message%");
83+
84+
$batchNumber = (int) $this->input->getOption(self::OPTION_BATCH_NUMBER);
85+
$listingCount = (int) $this->input->getOption(self::OPTION_LISTING_COUNT);
86+
87+
$progressBar = new ProgressBar($this->output, $listingCount > 0 ? $listingCount : 1);
88+
$progressBar->setMessage('');
89+
$progressBar->setFormat('custom');
90+
91+
$progressBar->setProgress($batchNumber * $indexConfig->getBatchSize());
92+
93+
$allowedDocuments = $indexConfig->getAllowedDocuments();
94+
$document = $this->input->getOption(self::OPTION_DOCUMENT);
95+
96+
if (!in_array($document, $allowedDocuments, true)) {
97+
return false;
98+
}
99+
100+
$progressBar->setMessage($document);
101+
102+
if (!$indexConfig->shouldIndexInSubprocesses()) {
103+
$numberOfBatches = ceil($listingCount / $indexConfig->getBatchSize());
104+
105+
for ($batch = 0; $batch < $numberOfBatches; $batch++) {
106+
$success = $this->doPopulateIndex($esIndex, $indexConfig, $progressBar, $document, $batch);
107+
108+
if (!$success) {
109+
return false;
110+
}
111+
}
112+
113+
return true;
114+
} else {
115+
return $this->doPopulateIndex($esIndex, $indexConfig, $progressBar, $document, $batchNumber);
116+
}
117+
}
118+
119+
private function doPopulateIndex(
120+
ElasticaIndex $esIndex,
121+
IndexInterface $indexConfig,
122+
ProgressBar $progressBar,
123+
string $document,
124+
int $batchNumber,
125+
): bool {
126+
$documentInstance = $this->documentRepository->get($document);
127+
128+
try {
129+
$listing = $documentInstance->getListingInstance($indexConfig);
130+
$listing->setOffset($batchNumber * $indexConfig->getBatchSize());
131+
$listing->setLimit($indexConfig->getBatchSize());
132+
133+
$esDocuments = [];
134+
foreach ($listing->getData() ?? [] as $dataObject) {
135+
try {
136+
$progressBar->advance();
137+
138+
if (!$documentInstance->shouldIndex($dataObject)) {
139+
continue;
140+
}
141+
142+
$esDocuments[] = $this->documentHelper->elementToDocument($documentInstance, $dataObject);
143+
} catch (\Throwable $throwable) {
144+
$this->displayDocumentError($indexConfig, $document, $dataObject, $throwable);
145+
146+
if (!$this->configurationRepository->shouldSkipFailingDocuments()) {
147+
throw new DocumentFailedException($throwable);
148+
}
149+
}
150+
}
151+
152+
if (count($esDocuments) > 0) {
153+
$esIndex->addDocuments($esDocuments);
154+
$esDocuments = [];
155+
}
156+
157+
if ($indexConfig->refreshIndexAfterEveryDocumentWhenPopulating()) {
158+
$esIndex->refresh();
159+
}
160+
} catch (\Throwable $throwable) {
161+
$this->displayIndexError($indexConfig, $throwable);
162+
163+
return false;
164+
} finally {
165+
$this->documentHelper->setTenantIfNeeded($documentInstance, $indexConfig);
166+
}
167+
168+
return true;
169+
}
170+
171+
private function displayDocumentError(
172+
IndexInterface $indexConfig,
173+
string $document,
174+
AbstractElement $dataObject,
175+
\Throwable $throwable,
176+
): void {
177+
$this->output->writeln('');
178+
$this->output->writeln(sprintf(
179+
'<fg=red;options=bold>Error while populating index %s, processing documents of type %s, last processed element ID %s.</>',
180+
$indexConfig::class,
181+
$document,
182+
$dataObject->getId()
183+
));
184+
$this->displayThrowable($throwable);
185+
}
186+
187+
private function displayIndexError(IndexInterface $indexConfig, \Throwable $throwable): void
188+
{
189+
$this->output->writeln('');
190+
$this->output->writeln(sprintf(
191+
'<fg=red;options=bold>Error while populating index %s.</>',
192+
$indexConfig::class,
193+
));
194+
195+
$this->displayThrowable($throwable);
196+
}
197+
198+
private function displayThrowable(\Throwable $throwable): void
199+
{
200+
$this->output->writeln('');
201+
$this->output->writeln(sprintf('In %s line %d', $throwable->getFile(), $throwable->getLine()));
202+
$this->output->writeln('');
203+
204+
$this->output->writeln($throwable->getMessage());
205+
$this->output->writeln('');
206+
207+
$this->output->writeln($throwable->getTraceAsString());
208+
$this->output->writeln('');
209+
}
210+
}

0 commit comments

Comments
 (0)