Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OCT-169: fix migration of ES index #18374

Merged
merged 2 commits into from
Nov 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions upgrades/.php_cd.php
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
'Symfony\Component\Console\Input\ArrayInput',
'Symfony\Component\Console\Output\BufferedOutput',
'Symfony\Component\DependencyInjection\ParameterBag\ParameterBag',
'Symfony\Component\Yaml\Yaml',
'Webmozart\Assert\Assert',
]
)->in('Pim\Upgrade\Schema'),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use Elasticsearch\ClientBuilder;
use Symfony\Component\DependencyInjection\ContainerAwareInterface;
use Symfony\Component\DependencyInjection\ContainerInterface;
use Symfony\Component\Yaml\Yaml;
use Webmozart\Assert\Assert;

/**
Expand All @@ -33,6 +34,11 @@ public function up(Schema $schema): void
throw new \RuntimeException('Unable to retrieve existing mapping.');
}

if ('text' === (current($existingMapping)['mappings']['properties']['id']['type'] ?? '')) {
$this->reindexWhenThereIsAlreadyAnId();
return;
}

$client->putMapping([
'index' => $eventsApiDebugIndexName,
'body' => [
Expand All @@ -41,7 +47,79 @@ public function up(Schema $schema): void
],
],
]);
}

private function reindexWhenThereIsAlreadyAnId(): void
{
$builder = $this->container->get('akeneo_elasticsearch.client_builder');
$builder->setHosts([$this->container->getParameter('index_hosts')]);
/** @var \Elasticsearch\Client $client */
$client = $builder->build();
$alias = $this->container->getParameter('events_api_debug_index_name');
$copy = sprintf('%s_copy', $alias);
$indice = array_keys($client->indices()->getAlias(['name' => $alias]))[0];

$mapping = $this->getMappingConfiguration();

$client->indices()->create([
'index' => $copy,
'body' => $mapping,
]);

$client->reindex([
'refresh' => true,
'body' => [
'source' => [
'index' => $indice,
],
'dest' => [
'index' => $copy,
],
],
]);

$client->indices()->putAlias([
'name' => $alias,
'index' => $copy,
]);

$client->indices()->delete([
'index' => $indice,
]);

$client->indices()->create([
'index' => $indice,
'body' => $mapping,
]);

$client->reindex([
'refresh' => true,
'body' => [
'source' => [
'index' => $copy,
],
'dest' => [
'index' => $indice,
],
],
]);

$client->indices()->putAlias([
'name' => $alias,
'index' => $indice,
]);

$client->indices()->delete([
'index' => $copy,
]);
}

private function getMappingConfiguration(): array
{
$ceDir = $this->container->getParameter('pim_ce_dev_src_folder_location');
$path = "{$ceDir}/src/Akeneo/Connectivity/Connection/back/Infrastructure/Symfony/Resources/elasticsearch/events_api_debug_mapping.yml";

return Yaml::parseFile($path);
}

public function down(Schema $schema): void
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use Elasticsearch\ClientBuilder;
use Symfony\Component\DependencyInjection\ContainerAwareInterface;
use Symfony\Component\DependencyInjection\ContainerInterface;
use Symfony\Component\Yaml\Yaml;
use Webmozart\Assert\Assert;

/**
Expand All @@ -33,6 +34,11 @@ public function up(Schema $schema) : void
throw new \RuntimeException('Unable to retrieve existing mapping.');
}

if ('text' === (current($existingMapping)['mappings']['properties']['id']['type'] ?? '')) {
$this->reindexWhenThereIsAlreadyAnId();
return;
}

$client->putMapping([
'index' => $connectionErrorIndexName,
'body' => [
Expand All @@ -43,6 +49,79 @@ public function up(Schema $schema) : void
]);
}

private function reindexWhenThereIsAlreadyAnId(): void
{
$builder = $this->container->get('akeneo_elasticsearch.client_builder');
$builder->setHosts([$this->container->getParameter('index_hosts')]);
/** @var \Elasticsearch\Client $client */
$client = $builder->build();
$alias = $this->container->getParameter('connection_error_index_name');
$copy = sprintf('%s_copy', $alias);
$indice = array_keys($client->indices()->getAlias(['name' => $alias]))[0];

$mapping = $this->getMappingConfiguration();

$client->indices()->create([
'index' => $copy,
'body' => $mapping,
]);

$client->reindex([
'refresh' => true,
'body' => [
'source' => [
'index' => $indice,
],
'dest' => [
'index' => $copy,
],
],
]);

$client->indices()->putAlias([
'name' => $alias,
'index' => $copy,
]);

$client->indices()->delete([
'index' => $indice,
]);

$client->indices()->create([
'index' => $indice,
'body' => $mapping,
]);

$client->reindex([
'refresh' => true,
'body' => [
'source' => [
'index' => $copy,
],
'dest' => [
'index' => $indice,
],
],
]);

$client->indices()->putAlias([
'name' => $alias,
'index' => $indice,
]);

$client->indices()->delete([
'index' => $copy,
]);
}

private function getMappingConfiguration(): array
{
$ceDir = $this->container->getParameter('pim_ce_dev_src_folder_location');
$path = "{$ceDir}/src/Akeneo/Connectivity/Connection/back/Infrastructure/Symfony/Resources/elasticsearch/connection_error_mapping.yml";

return Yaml::parseFile($path);
}

public function down(Schema $schema) : void
{
$this->throwIrreversibleMigrationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@

namespace Pim\Upgrade\Schema\Tests;

use Akeneo\Connectivity\Connection\Domain\Webhook\Model\EventsApiDebugLogLevels;
use Akeneo\Test\Integration\TestCase;
use Akeneo\Tool\Bundle\ElasticsearchBundle\Client;
use Akeneo\Tool\Bundle\ElasticsearchBundle\IndexConfiguration\Loader;
use Elasticsearch\Client as NativeClient;
use Ramsey\Uuid\Uuid;
use Symfony\Component\DependencyInjection\ParameterBag\ParameterBagInterface;
use Symfony\Component\Yaml\Yaml;

Expand Down Expand Up @@ -37,7 +39,7 @@ protected function setUp(): void
$this->eventsApiDebugClient = $this->get('akeneo_connectivity.client.events_api_debug');
}

public function test_it_adds_the_entity_updated_property_to_the_mapping(): void
public function test_it_adds_the_id_property_to_the_mapping(): void
{
$this->recreateIndexWithoutIdFieldInTheMapping();
$properties = $this->getIndexMappingProperties();
Expand All @@ -50,6 +52,43 @@ public function test_it_adds_the_entity_updated_property_to_the_mapping(): void
self::assertSame(['type' => 'keyword'], $properties['id']);
}

public function test_it_changes_the_id_property_from_text_to_keyword(): void
{
$this->recreateIndexWithoutIdFieldInTheMapping();
self::assertArrayNotHasKey('id', $this->getIndexMappingProperties());

$this->nativeClient->index([
'index' => self::getContainer()->getParameter('events_api_debug_index_name'),
'body' => [
'id' => 'aa63292c-a06c-4c50-afb9-c98c97dc8a13',
'timestamp' => '1667946703',
'level' => 'notice',
'message' => 'foobar',
'connection_code' => 'foo',
'context' => [],
],
]);
$this->nativeClient->indices()->refresh();
self::assertSame('text', $this->getIndexMappingProperties()['id']['type']);

$this->reExecuteMigration(self::MIGRATION_LABEL);

$properties = $this->getIndexMappingProperties();
self::assertSame('keyword', $properties['id']['type']);

$documents = $this->nativeClient->search([
'index' => self::getContainer()->getParameter('events_api_debug_index_name'),
]);
self::assertSame([
'id' => 'aa63292c-a06c-4c50-afb9-c98c97dc8a13',
'timestamp' => '1667946703',
'level' => 'notice',
'message' => 'foobar',
'connection_code' => 'foo',
'context' => [],
], $documents['hits']['hits'][0][('_source')]);
}

private function recreateIndexWithoutIdFieldInTheMapping(): void
{
$configFiles = $this->getParameter('elasticsearch_index_configuration_files');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,8 @@ protected function setUp(): void
$this->connectionErrorClient = $this->get('akeneo_connectivity.client.connection_error');
}

/** @test */
public function it_adds_the_entity_updated_property_to_the_mapping(): void
public function test_it_adds_the_id_property_to_the_mapping(): void
{

$this->recreateConnectionErrorIndexWithoutIdInTheMapping();
$properties = $this->getMappingProperties();
self::assertArrayNotHasKey('id', $properties);
Expand All @@ -52,6 +50,39 @@ public function it_adds_the_entity_updated_property_to_the_mapping(): void
self::assertSame(['type' => 'keyword'], $properties['id']);
}

public function test_it_changes_the_id_property_from_text_to_keyword(): void
{
$this->recreateConnectionErrorIndexWithoutIdInTheMapping();
self::assertArrayNotHasKey('id', $this->getMappingProperties());

$this->nativeClient->index([
'index' => self::getContainer()->getParameter('connection_error_index_name'),
'body' => [
'id' => 'aa63292c-a06c-4c50-afb9-c98c97dc8a13',
'connection_code' => 'foo',
'content' => [],
'error_datetime' => '2021-01-03T02:30:00+01:00',
],
]);
$this->nativeClient->indices()->refresh();
self::assertSame('text', $this->getMappingProperties()['id']['type']);

$this->reExecuteMigration(self::MIGRATION_LABEL);

$properties = $this->getMappingProperties();
self::assertSame('keyword', $properties['id']['type']);

$documents = $this->nativeClient->search([
'index' => self::getContainer()->getParameter('connection_error_index_name'),
]);
self::assertSame([
'id' => 'aa63292c-a06c-4c50-afb9-c98c97dc8a13',
'connection_code' => 'foo',
'content' => [],
'error_datetime' => '2021-01-03T02:30:00+01:00',
], $documents['hits']['hits'][0][('_source')]);
}

private function recreateConnectionErrorIndexWithoutIdInTheMapping(): void
{
$configFiles = $this->getParameter('elasticsearch_index_configuration_files');
Expand Down