-
Notifications
You must be signed in to change notification settings - Fork 2
feat(CDA-1565): Add command for changing compatibility level #62
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
bajdzun
merged 9 commits into
main
from
feat/CDA-1565/add-command-for-changing-compatibility-level
Oct 9, 2025
Merged
Changes from 8 commits
Commits
Show all changes
9 commits
Select commit
Hold shift + click to select a range
64cf06e
feat(CDA-1565): Add command for changing compatibility level
bajdzun f0d2272
Register command
bajdzun d67aed3
Add command to change compatibility for all schemas from config
bajdzun 53462f3
Update command
bajdzun 9cec16c
Update command
bajdzun 3073e5b
Fix pipeline
bajdzun 44001e0
Adjustments
bajdzun 51d5f04
Improvements
bajdzun ed3ba39
Fix condition
bajdzun File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,4 +1,3 @@ | ||
| version: '3.2' | ||
| services: | ||
| php: | ||
| build: | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,4 +1,3 @@ | ||
| version: '3.2' | ||
| services: | ||
| php: | ||
| build: | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,176 @@ | ||
| <?php | ||
|
|
||
| declare(strict_types=1); | ||
|
|
||
| namespace Jobcloud\SchemaConsole\Command; | ||
|
|
||
| use Symfony\Component\Console\Command\Command; | ||
| use Symfony\Component\Console\Input\InputArgument; | ||
| use Symfony\Component\Console\Input\InputInterface; | ||
| use Symfony\Component\Console\Output\OutputInterface; | ||
|
|
||
| class SetAllSchemasCompatibilityModeCommand extends AbstractSchemaCommand | ||
| { | ||
| protected function configure(): void | ||
| { | ||
| $this | ||
| ->setName('kafka-schema-registry:set:compatibility:mode:all') | ||
| ->setDescription('Set compatibility modes for multiple schemas from a JSON configuration file') | ||
| ->setHelp($this->getHelpText()) | ||
| ->addArgument( | ||
| 'configFile', | ||
| InputArgument::REQUIRED, | ||
| 'Path to JSON configuration file containing schema-compatibility mappings' | ||
| ); | ||
| } | ||
|
|
||
| private function getHelpText(): string | ||
| { | ||
| return <<<'HELP' | ||
| Set compatibility modes for multiple schemas based on configuration in a JSON file. | ||
|
|
||
| JSON File Format: | ||
| The configuration file must be a valid JSON array with the following structure: | ||
|
|
||
| [ | ||
| { | ||
| "schemaName": "schema-subject-name", | ||
| "compatibilityLevel": "COMPATIBILITY_LEVEL" | ||
| } | ||
| ] | ||
|
|
||
| Required Fields: | ||
| - schemaName: The subject name of the schema in the registry | ||
| - compatibilityLevel: One of the following compatibility levels: | ||
| * NONE | ||
| * BACKWARD | ||
| * BACKWARD_TRANSITIVE | ||
| * FORWARD | ||
| * FORWARD_TRANSITIVE | ||
| * FULL | ||
| * FULL_TRANSITIVE | ||
|
|
||
| Example: | ||
| [ | ||
| { | ||
| "schemaName": "user-events", | ||
| "compatibilityLevel": "BACKWARD_TRANSITIVE" | ||
| }, | ||
| { | ||
| "schemaName": "order-events", | ||
| "compatibilityLevel": "FORWARD" | ||
| }, | ||
| { | ||
| "schemaName": "payment-events", | ||
| "compatibilityLevel": "FULL" | ||
| } | ||
| ] | ||
|
|
||
| The command will process each schema in the order specified and provide feedback | ||
| for each operation. If any schema update fails, the command will continue processing | ||
| the remaining schemas and return a non-zero exit code at the end. | ||
| HELP; | ||
| } | ||
|
|
||
| public function execute(InputInterface $input, OutputInterface $output): int | ||
| { | ||
| $configFilePath = (string) $input->getArgument('configFile'); | ||
|
|
||
| $config = $this->loadConfigFile($configFilePath, $output); | ||
| if (null === $config) { | ||
| return Command::FAILURE; | ||
| } | ||
|
|
||
| $totalSchemas = count($config); | ||
| $successCount = 0; | ||
| $failureCount = 0; | ||
|
|
||
| $output->writeln(sprintf('Processing %d schema compatibility configurations...', $totalSchemas)); | ||
|
|
||
| foreach ($config as $index => $schemaConfig) { | ||
| if (false === $this->isValidSchemaConfig($schemaConfig)) { | ||
| $output->writeln( | ||
| sprintf('Invalid configuration at index %d: missing schemaName or compatibilityLevel', $index) | ||
| ); | ||
| $failureCount++; | ||
|
|
||
| continue; | ||
| } | ||
|
|
||
| $schemaName = $schemaConfig['schemaName']; | ||
| $compatibilityLevel = $schemaConfig['compatibilityLevel']; | ||
|
|
||
| $output->write( | ||
| sprintf('Setting compatibility mode for schema "%s" to "%s"... ', $schemaName, $compatibilityLevel) | ||
| ); | ||
|
|
||
| if ($this->setSchemaCompatibility($schemaName, $compatibilityLevel, $output)) { | ||
| $successCount++; | ||
|
|
||
| continue; | ||
| } | ||
|
|
||
| $failureCount++; | ||
| } | ||
|
|
||
| $this->outputSummary($output, $totalSchemas, $successCount, $failureCount); | ||
|
|
||
| return $failureCount > Command::SUCCESS ? Command::FAILURE : Command::SUCCESS; | ||
| } | ||
|
|
||
| /** | ||
| * @return array<int, array<string, string>>|null | ||
| */ | ||
| private function loadConfigFile(string $configFilePath, OutputInterface $output): ?array | ||
| { | ||
| $jsonContent = @file_get_contents($configFilePath); | ||
| if (false === $jsonContent) { | ||
| $output->writeln(sprintf('Could not read configuration file: %s', $configFilePath)); | ||
|
|
||
| return null; | ||
| } | ||
|
|
||
| $config = json_decode($jsonContent, true); | ||
| if (null === $config || false === is_array($config) || false === array_is_list($config)) { | ||
| $output->writeln('Configuration file must contain a JSON array of schema configurations'); | ||
|
|
||
| return null; | ||
| } | ||
|
|
||
| return $config; | ||
| } | ||
|
|
||
| private function isValidSchemaConfig(mixed $schemaConfig): bool | ||
| { | ||
| return is_array($schemaConfig) | ||
| && isset($schemaConfig['schemaName']) | ||
| && isset($schemaConfig['compatibilityLevel']); | ||
| } | ||
|
|
||
| private function setSchemaCompatibility( | ||
| string $schemaName, | ||
| string $compatibilityLevel, | ||
| OutputInterface $output | ||
| ): bool { | ||
| try { | ||
| $this->schemaRegistryApi->setSubjectCompatibilityLevel($schemaName, $compatibilityLevel); | ||
| } catch (\Exception $e) { | ||
| $output->writeln(sprintf('<error>FAILED: %s</error>', $e->getMessage())); | ||
|
|
||
| return false; | ||
| } | ||
|
|
||
| $output->writeln('<info>SUCCESS</info>'); | ||
|
|
||
| return true; | ||
| } | ||
|
|
||
| private function outputSummary(OutputInterface $output, int $total, int $success, int $failure): void | ||
| { | ||
| $output->writeln(''); | ||
| $output->writeln('=== Summary ==='); | ||
| $output->writeln(sprintf('Total schemas processed: %d', $total)); | ||
| $output->writeln(sprintf('Successful updates: %d', $success)); | ||
| $output->writeln(sprintf('Failed updates: %d', $failure)); | ||
| } | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,43 @@ | ||
| <?php | ||
|
|
||
| declare(strict_types=1); | ||
|
|
||
| namespace Jobcloud\SchemaConsole\Command; | ||
|
|
||
| use Symfony\Component\Console\Command\Command; | ||
| use Symfony\Component\Console\Input\InputArgument; | ||
| use Symfony\Component\Console\Input\InputInterface; | ||
| use Symfony\Component\Console\Output\OutputInterface; | ||
|
|
||
| class SetCompatibilityModeForSchemaCommand extends AbstractSchemaCommand | ||
| { | ||
| protected function configure(): void | ||
| { | ||
| $this | ||
| ->setName('kafka-schema-registry:set:schema:compatibility:mode') | ||
| ->setDescription('Set the compatibility mode for a given schema') | ||
| ->setHelp('Set the compatibility mode for a given schema') | ||
| ->addArgument('schemaName', InputArgument::REQUIRED, 'Name of the schema') | ||
| ->addArgument('compatibilityLevel', InputArgument::REQUIRED, 'Compatibility level to set'); | ||
| } | ||
|
|
||
| public function execute(InputInterface $input, OutputInterface $output): int | ||
| { | ||
| $schemaName = (string) $input->getArgument('schemaName'); | ||
| $compatibilityLevel = (string) $input->getArgument('compatibilityLevel'); | ||
|
|
||
| try { | ||
| $this->schemaRegistryApi->setSubjectCompatibilityLevel($schemaName, $compatibilityLevel); | ||
| } catch (\Exception $e) { | ||
| $output->writeln( | ||
| sprintf('Could not change compatibility mode for schema %s: %s', $schemaName, $e->getMessage()) | ||
| ); | ||
|
|
||
| return Command::FAILURE; | ||
| } | ||
|
|
||
| $output->writeln(sprintf('Successfully changed compatibility mode for schema: %s', $schemaName)); | ||
|
|
||
| return Command::SUCCESS; | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.