From f2e1f786cc6dfe9c5cddde2b8514d40cffb0a852 Mon Sep 17 00:00:00 2001 From: Marko Date: Fri, 8 Apr 2022 16:10:16 +0200 Subject: [PATCH 1/2] Add option to use versions when registering schemas --- src/Command/RegisterChangedSchemasCommand.php | 32 +++++++++++++--- .../RegisterChangedSchemasCommandTest.php | 38 ++++++++++++++++++- 2 files changed, 63 insertions(+), 7 deletions(-) diff --git a/src/Command/RegisterChangedSchemasCommand.php b/src/Command/RegisterChangedSchemasCommand.php index 0fbaf5e..27adce6 100644 --- a/src/Command/RegisterChangedSchemasCommand.php +++ b/src/Command/RegisterChangedSchemasCommand.php @@ -9,6 +9,7 @@ use Jobcloud\SchemaConsole\Helper\SchemaFileHelper; use Symfony\Component\Console\Input\InputArgument; use Symfony\Component\Console\Input\InputInterface; +use Symfony\Component\Console\Input\InputOption; use Symfony\Component\Console\Output\OutputInterface; use Symfony\Component\Console\Style\SymfonyStyle; @@ -43,7 +44,13 @@ protected function configure(): void ->setName('kafka-schema-registry:register:changed') ->setDescription('Register all changed schemas from a path') ->setHelp('Register all changed schemas from a path') - ->addArgument('schemaDirectory', InputArgument::REQUIRED, 'Path to avro schema directory'); + ->addArgument('schemaDirectory', InputArgument::REQUIRED, 'Path to avro schema directory') + ->addOption( + 'useSchemaVersioning', + null, + InputOption::VALUE_NONE, + 'Register schemas with multiple versions (e.g. ch.jobcloud.namespace.schema.1.avsc)' + ); } /** @@ -64,8 +71,16 @@ public function execute(InputInterface $input, OutputInterface $output): int $failed = []; $succeeded = []; + $useSchemaVersioning = (bool) $input->getOption('useSchemaVersioning'); + + $successMessage = '%s with new version: %s'; + if ($useSchemaVersioning) { + natsort($avroFiles); + $successMessage = '%s with new versions, the latest being: %s'; + } + while (false === $this->abortRegister) { - if (false === $this->registerFiles($avroFiles, $io, $failed, $succeeded)) { + if (false === $this->registerFiles($avroFiles, $io, $failed, $succeeded, $useSchemaVersioning)) { return 1; } @@ -79,8 +94,8 @@ public function execute(InputInterface $input, OutputInterface $output): int if (isset($succeeded) && 0 !== count($succeeded)) { $io->success('Succeeded registering the following schemas:'); - $io->listing(array_map(static function ($item) { - return sprintf('%s with new version: %s', $item['name'], $item['version']); + $io->listing(array_map(static function ($item) use ($successMessage) { + return sprintf($successMessage, $item['name'], $item['version']); }, $succeeded)); } @@ -92,13 +107,15 @@ public function execute(InputInterface $input, OutputInterface $output): int * @param SymfonyStyle $io * @param array $failed * @param array $succeeded + * @param bool $useSchemaVersioning * @return boolean */ private function registerFiles( array $avroFiles, SymfonyStyle $io, array &$failed = [], - array &$succeeded = [] + array &$succeeded = [], + bool $useSchemaVersioning = false ): bool { foreach ($avroFiles as $schemaName => $avroFile) { /** @var string $fileContents */ @@ -110,6 +127,11 @@ private function registerFiles( /** @var string $localSchema */ $localSchema = json_encode($jsonDecoded); + if ($useSchemaVersioning) { + /** @var string $schemaName */ + $schemaName = preg_replace('/[.0-9]*$/', '', $schemaName); + } + try { $latestVersion = $this->schemaRegistryApi->getLatestSubjectVersion($schemaName); } catch (SubjectNotFoundException $e) { diff --git a/tests/Command/RegisterChangedSchemasCommandTest.php b/tests/Command/RegisterChangedSchemasCommandTest.php index 35f8be6..39e40df 100644 --- a/tests/Command/RegisterChangedSchemasCommandTest.php +++ b/tests/Command/RegisterChangedSchemasCommandTest.php @@ -121,7 +121,6 @@ public function testOutputWhenCommandRegisterWithSuccess():void public function testOutputWhenCommandSuccessWithSkipping():void { - $this->generateFiles(5); /** @var MockObject|KafkaSchemaRegistryApiClient $schemaRegistryApi */ @@ -154,7 +153,6 @@ public function testOutputWhenCommandSuccessWithSkipping():void public function testOutputWhenCommandSuccessWithAllNew():void { - $this->generateFiles(5); /** @var MockObject|KafkaSchemaRegistryApiClient $schemaRegistryApi */ @@ -247,4 +245,40 @@ public function testOutputTotalFailDueToIncompatibility():void self::assertEquals(1, $commandTester->getStatusCode()); } + + public function testOutputWhenCommandRegisterWithSuccessAndVersioningOption():void + { + $numFiles = 5; + $this->generateFiles($numFiles); + + /** @var MockObject|KafkaSchemaRegistryApiClient $schemaRegistryApi */ + $schemaRegistryApi = $this->makeMock(KafkaSchemaRegistryApiClient::class, [ + 'checkSchemaCompatibilityForVersion' => TRUE, + 'getSchemaDefinitionByVersion', + 'getVersionForSchema', + 'registerNewSchemaVersion', + 'getLatestSubjectVersion' => '1', + ]); + + $schemaRegistryApi + ->method('getSchemaDefinitionByVersion') + ->willReturn([]) + ; + + $application = new Application(); + $application->add(new RegisterChangedSchemasCommand($schemaRegistryApi)); + $command = $application->find('kafka-schema-registry:register:changed'); + $commandTester = new CommandTester($command); + + $commandTester->execute([ + 'schemaDirectory' => self::SCHEMA_DIRECTORY, + '--useSchemaVersioning' => true + ]); + + $commandOutput = trim($commandTester->getDisplay()); + + self::assertMatchesRegularExpression('/^Successfully registered new version of schema /', $commandOutput); + self::assertStringContainsString('with new versions, the latest being', $commandOutput); + self::assertEquals(0, $commandTester->getStatusCode()); + } } From f7bfb7dfe9a70e7be0d40a4877f670d8d3e5f3cf Mon Sep 17 00:00:00 2001 From: Marko Date: Mon, 11 Apr 2022 14:11:57 +0200 Subject: [PATCH 2/2] Bump orbs --- .circleci/config.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 59076ca..9010856 100755 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -1,8 +1,8 @@ version: 2.1 orbs: - ci-caching: jobcloud/ci-caching@3.0 - ci-php: jobcloud/ci-php@2.1 + ci-caching: jobcloud/ci-caching@3.1 + ci-php: jobcloud/ci-php@2.3 workflows: test-console-kafka-schema-registry: