Skip to content

Commit 09eee2e

Browse files
authored
feat(MPM-453): Add option to use versions when registering schemas (#35)
* Add option to use versions when registering schemas * Bump orbs
1 parent b757de5 commit 09eee2e

File tree

3 files changed

+65
-9
lines changed

3 files changed

+65
-9
lines changed

.circleci/config.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
version: 2.1
22

33
orbs:
4-
ci-caching: jobcloud/ci-caching@3.0
5-
ci-php: jobcloud/ci-php@2.1
4+
ci-caching: jobcloud/ci-caching@3.1
5+
ci-php: jobcloud/ci-php@2.3
66

77
workflows:
88
test-console-kafka-schema-registry:

src/Command/RegisterChangedSchemasCommand.php

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
use Jobcloud\SchemaConsole\Helper\SchemaFileHelper;
1010
use Symfony\Component\Console\Input\InputArgument;
1111
use Symfony\Component\Console\Input\InputInterface;
12+
use Symfony\Component\Console\Input\InputOption;
1213
use Symfony\Component\Console\Output\OutputInterface;
1314
use Symfony\Component\Console\Style\SymfonyStyle;
1415

@@ -43,7 +44,13 @@ protected function configure(): void
4344
->setName('kafka-schema-registry:register:changed')
4445
->setDescription('Register all changed schemas from a path')
4546
->setHelp('Register all changed schemas from a path')
46-
->addArgument('schemaDirectory', InputArgument::REQUIRED, 'Path to avro schema directory');
47+
->addArgument('schemaDirectory', InputArgument::REQUIRED, 'Path to avro schema directory')
48+
->addOption(
49+
'useSchemaVersioning',
50+
null,
51+
InputOption::VALUE_NONE,
52+
'Register schemas with multiple versions (e.g. ch.jobcloud.namespace.schema.1.avsc)'
53+
);
4754
}
4855

4956
/**
@@ -64,8 +71,16 @@ public function execute(InputInterface $input, OutputInterface $output): int
6471
$failed = [];
6572
$succeeded = [];
6673

74+
$useSchemaVersioning = (bool) $input->getOption('useSchemaVersioning');
75+
76+
$successMessage = '%s with new version: %s';
77+
if ($useSchemaVersioning) {
78+
natsort($avroFiles);
79+
$successMessage = '%s with new versions, the latest being: %s';
80+
}
81+
6782
while (false === $this->abortRegister) {
68-
if (false === $this->registerFiles($avroFiles, $io, $failed, $succeeded)) {
83+
if (false === $this->registerFiles($avroFiles, $io, $failed, $succeeded, $useSchemaVersioning)) {
6984
return 1;
7085
}
7186

@@ -79,8 +94,8 @@ public function execute(InputInterface $input, OutputInterface $output): int
7994

8095
if (isset($succeeded) && 0 !== count($succeeded)) {
8196
$io->success('Succeeded registering the following schemas:');
82-
$io->listing(array_map(static function ($item) {
83-
return sprintf('%s with new version: %s', $item['name'], $item['version']);
97+
$io->listing(array_map(static function ($item) use ($successMessage) {
98+
return sprintf($successMessage, $item['name'], $item['version']);
8499
}, $succeeded));
85100
}
86101

@@ -92,13 +107,15 @@ public function execute(InputInterface $input, OutputInterface $output): int
92107
* @param SymfonyStyle $io
93108
* @param array<string, mixed> $failed
94109
* @param array<string, mixed> $succeeded
110+
* @param bool $useSchemaVersioning
95111
* @return boolean
96112
*/
97113
private function registerFiles(
98114
array $avroFiles,
99115
SymfonyStyle $io,
100116
array &$failed = [],
101-
array &$succeeded = []
117+
array &$succeeded = [],
118+
bool $useSchemaVersioning = false
102119
): bool {
103120
foreach ($avroFiles as $schemaName => $avroFile) {
104121
/** @var string $fileContents */
@@ -110,6 +127,11 @@ private function registerFiles(
110127
/** @var string $localSchema */
111128
$localSchema = json_encode($jsonDecoded);
112129

130+
if ($useSchemaVersioning) {
131+
/** @var string $schemaName */
132+
$schemaName = preg_replace('/[.0-9]*$/', '', $schemaName);
133+
}
134+
113135
try {
114136
$latestVersion = $this->schemaRegistryApi->getLatestSubjectVersion($schemaName);
115137
} catch (SubjectNotFoundException $e) {

tests/Command/RegisterChangedSchemasCommandTest.php

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,6 @@ public function testOutputWhenCommandRegisterWithSuccess():void
121121

122122
public function testOutputWhenCommandSuccessWithSkipping():void
123123
{
124-
125124
$this->generateFiles(5);
126125

127126
/** @var MockObject|KafkaSchemaRegistryApiClient $schemaRegistryApi */
@@ -154,7 +153,6 @@ public function testOutputWhenCommandSuccessWithSkipping():void
154153

155154
public function testOutputWhenCommandSuccessWithAllNew():void
156155
{
157-
158156
$this->generateFiles(5);
159157

160158
/** @var MockObject|KafkaSchemaRegistryApiClient $schemaRegistryApi */
@@ -247,4 +245,40 @@ public function testOutputTotalFailDueToIncompatibility():void
247245

248246
self::assertEquals(1, $commandTester->getStatusCode());
249247
}
248+
249+
public function testOutputWhenCommandRegisterWithSuccessAndVersioningOption():void
250+
{
251+
$numFiles = 5;
252+
$this->generateFiles($numFiles);
253+
254+
/** @var MockObject|KafkaSchemaRegistryApiClient $schemaRegistryApi */
255+
$schemaRegistryApi = $this->makeMock(KafkaSchemaRegistryApiClient::class, [
256+
'checkSchemaCompatibilityForVersion' => TRUE,
257+
'getSchemaDefinitionByVersion',
258+
'getVersionForSchema',
259+
'registerNewSchemaVersion',
260+
'getLatestSubjectVersion' => '1',
261+
]);
262+
263+
$schemaRegistryApi
264+
->method('getSchemaDefinitionByVersion')
265+
->willReturn([])
266+
;
267+
268+
$application = new Application();
269+
$application->add(new RegisterChangedSchemasCommand($schemaRegistryApi));
270+
$command = $application->find('kafka-schema-registry:register:changed');
271+
$commandTester = new CommandTester($command);
272+
273+
$commandTester->execute([
274+
'schemaDirectory' => self::SCHEMA_DIRECTORY,
275+
'--useSchemaVersioning' => true
276+
]);
277+
278+
$commandOutput = trim($commandTester->getDisplay());
279+
280+
self::assertMatchesRegularExpression('/^Successfully registered new version of schema /', $commandOutput);
281+
self::assertStringContainsString('with new versions, the latest being', $commandOutput);
282+
self::assertEquals(0, $commandTester->getStatusCode());
283+
}
250284
}

0 commit comments

Comments
 (0)