diff --git a/src/KafkaSchemaRegistryApiClient.php b/src/KafkaSchemaRegistryApiClient.php index 7e5d836..6744d0a 100644 --- a/src/KafkaSchemaRegistryApiClient.php +++ b/src/KafkaSchemaRegistryApiClient.php @@ -3,6 +3,7 @@ namespace Jobcloud\Kafka\SchemaRegistryClient; use Jobcloud\Kafka\SchemaRegistryClient\Exception\SchemaRegistryExceptionInterface; +use Jobcloud\Kafka\SchemaRegistryClient\Exception\VersionNotFoundException; use Psr\Http\Client\ClientExceptionInterface; use Jobcloud\Kafka\SchemaRegistryClient\Exception\SchemaNotFoundException; use Jobcloud\Kafka\SchemaRegistryClient\Exception\SubjectNotFoundException; @@ -111,7 +112,7 @@ public function registerNewSchemaVersion(string $subjectName, string $schema): a /** * @throws ClientExceptionInterface * @throws SchemaRegistryExceptionInterface - * @throws JsonException + * @throws JsonException|VersionNotFoundException */ public function checkSchemaCompatibilityForVersion( string $subjectName, @@ -126,7 +127,11 @@ public function checkSchemaCompatibilityForVersion( sprintf('compatibility/subjects/%s/versions/%s', $subjectName, $version), $this->createRequestBodyFromSchema($schema) ); - } catch (SubjectNotFoundException $e) { + } catch (SubjectNotFoundException|VersionNotFoundException $e) { + if ($e instanceof VersionNotFoundException && self::VERSION_LATEST !== $version) { + throw $e; + } + return true; } diff --git a/tests/KafkaSchemaRegistryApiClientTest.php b/tests/KafkaSchemaRegistryApiClientTest.php index 61efce3..e257b1a 100644 --- a/tests/KafkaSchemaRegistryApiClientTest.php +++ b/tests/KafkaSchemaRegistryApiClientTest.php @@ -5,6 +5,7 @@ use Jobcloud\Kafka\SchemaRegistryClient\Exception\ImportException; use Jobcloud\Kafka\SchemaRegistryClient\Exception\SchemaNotFoundException; use Jobcloud\Kafka\SchemaRegistryClient\Exception\SubjectNotFoundException; +use Jobcloud\Kafka\SchemaRegistryClient\Exception\VersionNotFoundException; use Jobcloud\Kafka\SchemaRegistryClient\HttpClient; use Jobcloud\Kafka\SchemaRegistryClient\HttpClientInterface; use Jobcloud\Kafka\SchemaRegistryClient\KafkaSchemaRegistryApiClient; @@ -208,7 +209,7 @@ public function testCheckSchemaCompatibilityForVersionFalse(): void self::assertFalse($result); } - public function testCheckSchemaCompatibilityForVersionNotFound(): void + public function testCheckSchemaCompatibilityForSubjectNotFound(): void { $httpClientMock = $this->getHttpClientMock(); @@ -231,6 +232,56 @@ public function testCheckSchemaCompatibilityForVersionNotFound(): void self::assertTrue($result); } + public function testCheckSchemaCompatibilityForVersionNotFound(): void + { + self::expectException(VersionNotFoundException::class); + $httpClientMock = $this->getHttpClientMock(); + + $httpClientMock + ->expects(self::once()) + ->method('call') + ->with( + 'POST', + sprintf('compatibility/subjects/%s/versions/%s', self::TEST_SUBJECT_NAME, self::TEST_VERSION), + ['schema' => '[]'] + ) + ->willThrowException(new VersionNotFoundException()); + + $api = new KafkaSchemaRegistryApiClient($httpClientMock); + $result = $api->checkSchemaCompatibilityForVersion( + self::TEST_SUBJECT_NAME, + self::TEST_SCHEMA, + self::TEST_VERSION + ); + } + + public function testCheckSchemaCompatibilityForVersionNotFoundLatest(): void + { + $httpClientMock = $this->getHttpClientMock(); + + $httpClientMock + ->expects(self::once()) + ->method('call') + ->with( + 'POST', + sprintf( + 'compatibility/subjects/%s/versions/%s', + self::TEST_SUBJECT_NAME, + KafkaSchemaRegistryApiClient::VERSION_LATEST + ), + ['schema' => '[]'] + ) + ->willThrowException(new VersionNotFoundException()); + + $api = new KafkaSchemaRegistryApiClient($httpClientMock); + $result = $api->checkSchemaCompatibilityForVersion( + self::TEST_SUBJECT_NAME, + self::TEST_SCHEMA, + KafkaSchemaRegistryApiClient::VERSION_LATEST + ); + self::assertTrue($result); + } + public function testGetSubjectCompatibilityLevel(): void { $httpClientMock = $this->getHttpClientMock();