diff --git a/README.md b/README.md index 5b284fa..953110f 100644 --- a/README.md +++ b/README.md @@ -2,10 +2,18 @@ # Kafka Schema Registry API Client -## What is it? +## Description An API Client written in PHP to communicate with Kafka Schema Registry. -## What it can do? +## Installation +```bash +composer require jobcloud/php-kafka-schema-registry-client +``` + +## Requirements +- php: >= 7.4 + +## Supported API calls Currently it supports: * Get list of subjects @@ -24,13 +32,7 @@ Currently it supports: * Get Subject's latest schema version * Setting the registry mode -## How to use it? -### Installation -```bash -composer require jobcloud/php-kafka-schema-registry-client -``` - -### Code example +## Code example ```php getSubjects(); ``` -## External links? -If you want to be so kind to extend this library, make a pull request, -and whatever functionality you want to implement, this is a API reference to follow: +## Contributing +This is an open source project that welcomes pull requests and issues from anyone. +This is the API reference to follow for any new functionality: https://docs.confluent.io/current/schema-registry/develop/api.html diff --git a/composer.json b/composer.json index 1ee9277..b3fc3cd 100644 --- a/composer.json +++ b/composer.json @@ -19,7 +19,7 @@ "sort-packages": true }, "require": { - "php": ">7.3", + "php": ">=7.4", "ext-json": "*", "pimple/pimple": "^3.2", "symfony/cache": "^5.0", @@ -29,9 +29,9 @@ }, "require-dev": { "squizlabs/php_codesniffer": "^3.4.2", - "phpunit/phpunit": "^8.3.4", - "phpstan/phpstan": "^0.11.8", + "phpunit/phpunit": "^9.5", + "phpstan/phpstan": "^0.12", "rregeer/phpunit-coverage-check": "^0.3.1", - "infection/infection": "^0.14.2" + "infection/infection": "^0.21" } } diff --git a/docker/.env b/docker/.env new file mode 100644 index 0000000..850bf32 --- /dev/null +++ b/docker/.env @@ -0,0 +1 @@ +COMPOSE_PROJECT_NAME=php-kafka-schema-registry-client \ No newline at end of file diff --git a/docker/dev/php/Dockerfile b/docker/dev/php/Dockerfile index b4548f0..ea447d0 100644 --- a/docker/dev/php/Dockerfile +++ b/docker/dev/php/Dockerfile @@ -1,4 +1,4 @@ -FROM php:7.3-cli-alpine3.10 +FROM php:7.4-cli-alpine3.10 ARG HOST_USER_ID diff --git a/phpstan.neon b/phpstan.neon index fff6e34..20b7cf9 100644 --- a/phpstan.neon +++ b/phpstan.neon @@ -1,4 +1,4 @@ parameters: paths: - src - level: 7 + level: 8 diff --git a/phpunit.xml b/phpunit.xml index 7a254aa..478d6bf 100644 --- a/phpunit.xml +++ b/phpunit.xml @@ -1,46 +1,30 @@ - - - - - - - - - - - - - - - - - - - ./tests - - - - - - src - - - - - - - - - - + + + + src + + + + + + + + + + + + + + + + + + + ./tests + + + + + diff --git a/src/ErrorHandler.php b/src/ErrorHandler.php index e921b93..97e5332 100644 --- a/src/ErrorHandler.php +++ b/src/ErrorHandler.php @@ -2,9 +2,9 @@ namespace Jobcloud\Kafka\SchemaRegistryClient; +use Buzz\Exception\ClientException; use Jobcloud\Kafka\SchemaRegistryClient\Exception\BackendDatastoreException; use Jobcloud\Kafka\SchemaRegistryClient\Exception\ImportException; -use Jobcloud\Kafka\SchemaRegistryClient\Exception\ClientException; use Jobcloud\Kafka\SchemaRegistryClient\Exception\CompatibilityException; use Jobcloud\Kafka\SchemaRegistryClient\Exception\IncompatibileAvroSchemaException; use Jobcloud\Kafka\SchemaRegistryClient\Exception\InvalidAvroSchemaException; @@ -13,12 +13,15 @@ use Jobcloud\Kafka\SchemaRegistryClient\Exception\PathNotFoundException; use Jobcloud\Kafka\SchemaRegistryClient\Exception\RequestForwardException; use Jobcloud\Kafka\SchemaRegistryClient\Exception\SchemaNotFoundException; +use Jobcloud\Kafka\SchemaRegistryClient\Exception\SchemaRegistryExceptionInterface; use Jobcloud\Kafka\SchemaRegistryClient\Exception\SubjectNotFoundException; use Jobcloud\Kafka\SchemaRegistryClient\Exception\UnauthorizedException; use Jobcloud\Kafka\SchemaRegistryClient\Exception\UnprocessableEntityException; use Jobcloud\Kafka\SchemaRegistryClient\Exception\VersionNotFoundException; +use Psr\Http\Client\ClientExceptionInterface; use Psr\Http\Message\RequestInterface; use Psr\Http\Message\ResponseInterface; +use JsonException; class ErrorHandler implements ErrorHandlerInterface { @@ -42,6 +45,9 @@ class ErrorHandler implements ErrorHandlerInterface * @throws UnprocessableEntityException * @throws VersionNotFoundException * @throws ImportException + * @throws JsonException + * @throws ClientExceptionInterface + * @throws SchemaRegistryExceptionInterface */ public function handleError(ResponseInterface $response, string $uri = null, RequestInterface $request = null): void { diff --git a/src/ErrorHandlerInterface.php b/src/ErrorHandlerInterface.php index 2ec4e66..aca8580 100644 --- a/src/ErrorHandlerInterface.php +++ b/src/ErrorHandlerInterface.php @@ -2,14 +2,20 @@ namespace Jobcloud\Kafka\SchemaRegistryClient; +use Jobcloud\Kafka\SchemaRegistryClient\Exception\SchemaRegistryExceptionInterface; +use Psr\Http\Client\ClientExceptionInterface; use Psr\Http\Message\RequestInterface; use Psr\Http\Message\ResponseInterface; +use JsonException; interface ErrorHandlerInterface { /** * @param ResponseInterface $response * @param string|null $uri + * @throws ClientExceptionInterface + * @throws SchemaRegistryExceptionInterface + * @throws JsonException */ public function handleError( ResponseInterface $response, diff --git a/src/Exception/AbstractKafkaSchemaRegistryException.php b/src/Exception/AbstractKafkaSchemaRegistryException.php new file mode 100644 index 0000000..58ba84b --- /dev/null +++ b/src/Exception/AbstractKafkaSchemaRegistryException.php @@ -0,0 +1,10 @@ + $body + * @param array $queryParams * @return RequestInterface + * @throws JsonException */ private function createRequest( string $method, @@ -86,9 +89,9 @@ private function createRequest( if ([] !== $body) { $jsonData = json_encode($body, JSON_THROW_ON_ERROR); - $dataLength = (string) strlen($jsonData); + $dataLength = strlen($jsonData); - $request = $request->withAddedHeader('Content-Length', $dataLength); + $request = $request->withAddedHeader('Content-Length', (string) $dataLength); $request->getBody()->write($jsonData); } @@ -107,10 +110,12 @@ private function createRequest( /** * @param string $method * @param string $uri - * @param array $body - * @param array $queryParams + * @param array $body + * @param array $queryParams * @return mixed * @throws ClientExceptionInterface + * @throws SchemaRegistryExceptionInterface + * @throws JsonException */ public function call(string $method, string $uri, array $body = [], array $queryParams = []) { diff --git a/src/HttpClientInterface.php b/src/HttpClientInterface.php index 581529a..8aa0db0 100644 --- a/src/HttpClientInterface.php +++ b/src/HttpClientInterface.php @@ -2,14 +2,21 @@ namespace Jobcloud\Kafka\SchemaRegistryClient; +use Jobcloud\Kafka\SchemaRegistryClient\Exception\SchemaRegistryExceptionInterface; +use Psr\Http\Client\ClientExceptionInterface; +use JsonException; + interface HttpClientInterface { /** * @param string $method * @param string $uri - * @param array $body - * @param array $queryParams + * @param array $body + * @param array $queryParams * @return mixed + * @throws ClientExceptionInterface + * @throws SchemaRegistryExceptionInterface + * @throws JsonException */ public function call(string $method, string $uri, array $body = [], array $queryParams = []); } diff --git a/src/KafkaSchemaRegistryApiClient.php b/src/KafkaSchemaRegistryApiClient.php index 6a76477..10267e6 100644 --- a/src/KafkaSchemaRegistryApiClient.php +++ b/src/KafkaSchemaRegistryApiClient.php @@ -2,10 +2,11 @@ namespace Jobcloud\Kafka\SchemaRegistryClient; -use Buzz\Exception\RequestException; -use Exception; +use Jobcloud\Kafka\SchemaRegistryClient\Exception\SchemaRegistryExceptionInterface; +use Psr\Http\Client\ClientExceptionInterface; use Jobcloud\Kafka\SchemaRegistryClient\Exception\SchemaNotFoundException; use Jobcloud\Kafka\SchemaRegistryClient\Exception\SubjectNotFoundException; +use JsonException; class KafkaSchemaRegistryApiClient implements KafkaSchemaRegistryApiClientInterface { @@ -24,7 +25,10 @@ public function __construct(HttpClientInterface $httpClient) } /** - * @return array + * @return array + * @throws ClientExceptionInterface + * @throws SchemaRegistryExceptionInterface + * @throws JsonException */ public function getSubjects(): array { @@ -33,7 +37,10 @@ public function getSubjects(): array /** * @param string $subjectName - * @return array + * @return array + * @throws ClientExceptionInterface + * @throws SchemaRegistryExceptionInterface + * @throws JsonException */ public function getAllSubjectVersions(string $subjectName): array { @@ -43,7 +50,10 @@ public function getAllSubjectVersions(string $subjectName): array /** * @param string $subjectName * @param string $version - * @return array + * @return array + * @throws ClientExceptionInterface + * @throws SchemaRegistryExceptionInterface + * @throws JsonException */ public function getSchemaByVersion(string $subjectName, string $version = self::VERSION_LATEST): array { @@ -55,7 +65,10 @@ public function getSchemaByVersion(string $subjectName, string $version = self:: /** * @param string $subjectName * @param string $version - * @return array + * @return array + * @throws ClientExceptionInterface + * @throws SchemaRegistryExceptionInterface + * @throws JsonException */ public function getSchemaDefinitionByVersion(string $subjectName, string $version = self::VERSION_LATEST): array { @@ -71,6 +84,9 @@ public function getSchemaDefinitionByVersion(string $subjectName, string $versio * @param string $subjectName * @param string $version * @return int|null + * @throws ClientExceptionInterface + * @throws SchemaRegistryExceptionInterface + * @throws JsonException */ public function deleteSchemaVersion(string $subjectName, string $version = self::VERSION_LATEST): ?int { @@ -80,6 +96,9 @@ public function deleteSchemaVersion(string $subjectName, string $version = self: /** * @param int $id * @return string + * @throws ClientExceptionInterface + * @throws SchemaRegistryExceptionInterface + * @throws JsonException */ public function getSchemaById(int $id): string { @@ -89,7 +108,10 @@ public function getSchemaById(int $id): string /** * @param string $schema * @param string $subjectName - * @return array + * @return array + * @throws ClientExceptionInterface + * @throws SchemaRegistryExceptionInterface + * @throws JsonException */ public function registerNewSchemaVersion(string $subjectName, string $schema): array { @@ -107,7 +129,9 @@ public function registerNewSchemaVersion(string $subjectName, string $schema): a * @param string $subjectName * @param string $version * @return bool - * @throws Exception + * @throws ClientExceptionInterface + * @throws SchemaRegistryExceptionInterface + * @throws JsonException */ public function checkSchemaCompatibilityForVersion( string $subjectName, @@ -132,7 +156,9 @@ public function checkSchemaCompatibilityForVersion( /** * @param string $subjectName * @return string|null - * @throws Exception + * @throws ClientExceptionInterface + * @throws SchemaRegistryExceptionInterface + * @throws JsonException */ public function getSubjectCompatibilityLevel(string $subjectName): ?string { @@ -148,6 +174,9 @@ public function getSubjectCompatibilityLevel(string $subjectName): ?string * @param string $subjectName * @param string $level * @return bool + * @throws ClientExceptionInterface + * @throws SchemaRegistryExceptionInterface + * @throws JsonException */ public function setSubjectCompatibilityLevel(string $subjectName, string $level = self::LEVEL_FULL): bool { @@ -157,6 +186,9 @@ public function setSubjectCompatibilityLevel(string $subjectName, string $level /** * @return string + * @throws ClientExceptionInterface + * @throws SchemaRegistryExceptionInterface + * @throws JsonException */ public function getDefaultCompatibilityLevel(): string { @@ -167,6 +199,9 @@ public function getDefaultCompatibilityLevel(): string /** * @param string $level * @return bool + * @throws ClientExceptionInterface + * @throws SchemaRegistryExceptionInterface + * @throws JsonException */ public function setDefaultCompatibilityLevel(string $level = self::LEVEL_FULL): bool { @@ -178,6 +213,9 @@ public function setDefaultCompatibilityLevel(string $level = self::LEVEL_FULL): * @param string $subjectName * @param string $schema * @return string|null + * @throws ClientExceptionInterface + * @throws SchemaRegistryExceptionInterface + * @throws JsonException */ public function getVersionForSchema(string $subjectName, string $schema): ?string { @@ -190,7 +228,7 @@ public function getVersionForSchema(string $subjectName, string $schema): ?strin $this->createRequestBodyFromSchema($schema) ) ?? []; - return (string) $results['version']; + return $results['version']; } catch (SubjectNotFoundException $e) { return null; } catch (SchemaNotFoundException $e) { @@ -202,6 +240,9 @@ public function getVersionForSchema(string $subjectName, string $schema): ?strin * @param string $subjectName * @param string $schema * @return bool + * @throws ClientExceptionInterface + * @throws SchemaRegistryExceptionInterface + * @throws JsonException */ public function isSchemaAlreadyRegistered(string $subjectName, string $schema): bool { @@ -210,7 +251,10 @@ public function isSchemaAlreadyRegistered(string $subjectName, string $schema): /** * @param string $subjectName - * @return array + * @return array + * @throws ClientExceptionInterface + * @throws SchemaRegistryExceptionInterface + * @throws JsonException */ public function deleteSubject(string $subjectName): array { @@ -220,7 +264,9 @@ public function deleteSubject(string $subjectName): array /** * @param string $subjectName * @return string|null - * @throws RequestException + * @throws ClientExceptionInterface + * @throws SchemaRegistryExceptionInterface + * @throws JsonException */ public function getLatestSubjectVersion(string $subjectName): ?string { @@ -232,6 +278,9 @@ public function getLatestSubjectVersion(string $subjectName): ?string /** * @param string $mode * @return bool + * @throws ClientExceptionInterface + * @throws SchemaRegistryExceptionInterface + * @throws JsonException */ public function setImportMode(string $mode): bool { @@ -241,10 +290,11 @@ public function setImportMode(string $mode): bool /** * @param string $schema - * @return array + * @return array + * @throws JsonException */ private function createRequestBodyFromSchema(string $schema): array { - return ['schema' => json_encode(json_decode($schema, true))]; + return ['schema' => json_encode(json_decode($schema, true, 512, JSON_THROW_ON_ERROR), JSON_THROW_ON_ERROR)]; } } diff --git a/src/KafkaSchemaRegistryApiClientInterface.php b/src/KafkaSchemaRegistryApiClientInterface.php index 1eeb7cf..248be99 100644 --- a/src/KafkaSchemaRegistryApiClientInterface.php +++ b/src/KafkaSchemaRegistryApiClientInterface.php @@ -2,6 +2,10 @@ namespace Jobcloud\Kafka\SchemaRegistryClient; +use Jobcloud\Kafka\SchemaRegistryClient\Exception\SchemaRegistryExceptionInterface; +use Psr\Http\Client\ClientExceptionInterface; +use JsonException; + interface KafkaSchemaRegistryApiClientInterface { public const VERSION_LATEST = 'latest'; @@ -19,27 +23,39 @@ interface KafkaSchemaRegistryApiClientInterface public const MODE_READWRITE = 'READWRITE'; /** - * @return array + * @return array + * @throws ClientExceptionInterface + * @throws SchemaRegistryExceptionInterface + * @throws JsonException */ public function getSubjects(): array; /** * @param string $subjectName - * @return array + * @return array + * @throws ClientExceptionInterface + * @throws SchemaRegistryExceptionInterface + * @throws JsonException */ public function getAllSubjectVersions(string $subjectName): array; /** * @param string $subjectName * @param string $version - * @return array + * @return array + * @throws ClientExceptionInterface + * @throws SchemaRegistryExceptionInterface + * @throws JsonException */ public function getSchemaByVersion(string $subjectName, string $version = 'latest'): array; /** * @param string $subjectName * @param string $version - * @return array + * @return array + * @throws ClientExceptionInterface + * @throws SchemaRegistryExceptionInterface + * @throws JsonException */ public function getSchemaDefinitionByVersion(string $subjectName, string $version = self::VERSION_LATEST): array; @@ -47,19 +63,28 @@ public function getSchemaDefinitionByVersion(string $subjectName, string $versio * @param string $subjectName * @param string $version * @return int|null + * @throws ClientExceptionInterface + * @throws SchemaRegistryExceptionInterface + * @throws JsonException */ public function deleteSchemaVersion(string $subjectName, string $version = self::VERSION_LATEST): ?int; /** * @param int $id * @return string + * @throws ClientExceptionInterface + * @throws SchemaRegistryExceptionInterface + * @throws JsonException */ public function getSchemaById(int $id): string; /** * @param string $subjectName * @param string $schema - * @return array + * @return array + * @throws ClientExceptionInterface + * @throws SchemaRegistryExceptionInterface + * @throws JsonException */ public function registerNewSchemaVersion(string $subjectName, string $schema): array; @@ -68,6 +93,9 @@ public function registerNewSchemaVersion(string $subjectName, string $schema): a * @param string $schema * @param string $version * @return bool + * @throws ClientExceptionInterface + * @throws SchemaRegistryExceptionInterface + * @throws JsonException */ public function checkSchemaCompatibilityForVersion( string $subjectName, @@ -78,6 +106,9 @@ public function checkSchemaCompatibilityForVersion( /** * @param string $subjectName * @return string + * @throws ClientExceptionInterface + * @throws SchemaRegistryExceptionInterface + * @throws JsonException */ public function getSubjectCompatibilityLevel(string $subjectName): ?string; @@ -85,17 +116,26 @@ public function getSubjectCompatibilityLevel(string $subjectName): ?string; * @param string $subjectName * @param string $level * @return bool + * @throws ClientExceptionInterface + * @throws SchemaRegistryExceptionInterface + * @throws JsonException */ public function setSubjectCompatibilityLevel(string $subjectName, string $level = self::LEVEL_FULL): bool; /** * @return string + * @throws ClientExceptionInterface + * @throws SchemaRegistryExceptionInterface + * @throws JsonException */ public function getDefaultCompatibilityLevel(): string; /** * @param string $level * @return bool + * @throws ClientExceptionInterface + * @throws SchemaRegistryExceptionInterface + * @throws JsonException */ public function setDefaultCompatibilityLevel(string $level = self::LEVEL_FULL): bool; @@ -103,6 +143,9 @@ public function setDefaultCompatibilityLevel(string $level = self::LEVEL_FULL): * @param string $subjectName * @param string $schema * @return string|null + * @throws ClientExceptionInterface + * @throws SchemaRegistryExceptionInterface + * @throws JsonException */ public function getVersionForSchema(string $subjectName, string $schema): ?string; @@ -110,24 +153,34 @@ public function getVersionForSchema(string $subjectName, string $schema): ?strin * @param string $subjectName * @param string $schema * @return bool + * @throws ClientExceptionInterface + * @throws SchemaRegistryExceptionInterface + * @throws JsonException */ public function isSchemaAlreadyRegistered(string $subjectName, string $schema): bool; /** * @param string $subjectName - * @return array + * @return array + * @throws ClientExceptionInterface + * @throws SchemaRegistryExceptionInterface + * @throws JsonException */ public function deleteSubject(string $subjectName): array; /** * @param string $mode * @return bool + * @throws ClientExceptionInterface + * @throws SchemaRegistryExceptionInterface + * @throws JsonException */ public function setImportMode(string $mode): bool; /** * @param string $subjectName * @return string|null + * @throws JsonException */ public function getLatestSubjectVersion(string $subjectName): ?string; } diff --git a/src/ServiceProvider/KafkaSchemaRegistryApiClientProvider.php b/src/ServiceProvider/KafkaSchemaRegistryApiClientProvider.php index 7013f03..26281ca 100644 --- a/src/ServiceProvider/KafkaSchemaRegistryApiClientProvider.php +++ b/src/ServiceProvider/KafkaSchemaRegistryApiClientProvider.php @@ -31,7 +31,7 @@ class KafkaSchemaRegistryApiClientProvider implements ServiceProviderInterface /** * @param Container $container */ - public function register(Container $container) + public function register(Container $container): void { $this->checkRequiredOffsets($container); @@ -58,7 +58,7 @@ public function register(Container $container) /** @var ClientInterface $client */ $client = $container[self::CLIENT]; - /** @var RequestFactoryInterface $psr17factory */ + /** @var RequestFactoryInterface $requestFactory */ $requestFactory = $container[self::REQUEST_FACTORY]; return new HttpClient( @@ -66,8 +66,8 @@ public function register(Container $container) $requestFactory, $container[self::ERROR_HANDLER], $container[self::CONTAINER_KEY][self::SETTING_KEY_BASE_URL], - $container[self::CONTAINER_KEY][self::SETTING_KEY_USERNAME] ?? null, - $container[self::CONTAINER_KEY][self::SETTING_KEY_PASSWORD] ?? null + $container[self::CONTAINER_KEY][self::SETTING_KEY_USERNAME], + $container[self::CONTAINER_KEY][self::SETTING_KEY_PASSWORD] ); }; } @@ -82,7 +82,7 @@ public function register(Container $container) } } - private function checkRequiredOffsets(Container $container) + private function checkRequiredOffsets(Container $container): void { if (false === isset($container[self::CONTAINER_KEY][self::SETTING_KEY_BASE_URL])) { diff --git a/tests/ErrorHandlerTest.php b/tests/ErrorHandlerTest.php index 646a12a..2a59bc7 100644 --- a/tests/ErrorHandlerTest.php +++ b/tests/ErrorHandlerTest.php @@ -2,9 +2,9 @@ namespace Jobcloud\Kafka\SchemaRegistryClient\Tests; +use Buzz\Exception\ClientException; use Jobcloud\Kafka\SchemaRegistryClient\ErrorHandler; use Jobcloud\Kafka\SchemaRegistryClient\Exception\BackendDatastoreException; -use Jobcloud\Kafka\SchemaRegistryClient\Exception\ClientException; use Jobcloud\Kafka\SchemaRegistryClient\Exception\CompatibilityException; use Jobcloud\Kafka\SchemaRegistryClient\Exception\ImportException; use Jobcloud\Kafka\SchemaRegistryClient\Exception\IncompatibileAvroSchemaException; @@ -140,7 +140,7 @@ public function testExceptionThrowWithRequest(): void $this->expectException(BackendDatastoreException::class); $this->expectExceptionMessage(self::TEST_MESSAGE . sprintf(' (%s) with request body: %s', 'http://test.com', 'test body')); -echo 'asdfasf'; + $errorHandler->handleError($responseMock, 'http://test.com', $requestMock); } diff --git a/tests/KafkaSchemaRegistryApiClientProviderTest.php b/tests/KafkaSchemaRegistryApiClientProviderTest.php index 9263e2b..61f0c82 100644 --- a/tests/KafkaSchemaRegistryApiClientProviderTest.php +++ b/tests/KafkaSchemaRegistryApiClientProviderTest.php @@ -22,15 +22,15 @@ public function testDefaultContainersAndServicesSetWithMinimalConfig(): void { $container = new Container(); - $this->assertArrayNotHasKey('kafka.schema.registry', $container); - $this->assertArrayNotHasKey('username', $container['kafka.schema.registry'] ?? []); - $this->assertArrayNotHasKey('password', $container['kafka.schema.registry'] ?? []); - $this->assertArrayNotHasKey('base.url', $container['kafka.schema.registry'] ?? []); - $this->assertArrayNotHasKey('kafka.schema.registry.client', $container); - $this->assertArrayNotHasKey('kafka.schema.registry.request.factory', $container); - $this->assertArrayNotHasKey('kafka.schema.registry.client.http', $container); - $this->assertArrayNotHasKey('kafka.schema.registry.client.api', $container); - $this->assertArrayNotHasKey('kafka.schema.registry.error.handler', $container); + self::assertArrayNotHasKey('kafka.schema.registry', $container); + self::assertArrayNotHasKey('username', $container['kafka.schema.registry'] ?? []); + self::assertArrayNotHasKey('password', $container['kafka.schema.registry'] ?? []); + self::assertArrayNotHasKey('base.url', $container['kafka.schema.registry'] ?? []); + self::assertArrayNotHasKey('kafka.schema.registry.client', $container); + self::assertArrayNotHasKey('kafka.schema.registry.request.factory', $container); + self::assertArrayNotHasKey('kafka.schema.registry.client.http', $container); + self::assertArrayNotHasKey('kafka.schema.registry.client.api', $container); + self::assertArrayNotHasKey('kafka.schema.registry.error.handler', $container); $container['kafka.schema.registry'] = [ 'base.url' => 'http://some-url', @@ -40,23 +40,45 @@ public function testDefaultContainersAndServicesSetWithMinimalConfig(): void $container->register(new KafkaSchemaRegistryApiClientProvider()); - $this->assertArrayHasKey('kafka.schema.registry', $container); - $this->assertArrayHasKey('username', $container['kafka.schema.registry']); - $this->assertArrayHasKey('password', $container['kafka.schema.registry']); - $this->assertArrayHasKey('kafka.schema.registry.client', $container); - $this->assertArrayHasKey('kafka.schema.registry.request.factory', $container); - $this->assertArrayHasKey('kafka.schema.registry.client.http', $container); - $this->assertArrayHasKey('kafka.schema.registry.client.api', $container); - $this->assertArrayHasKey('kafka.schema.registry.error.handler', $container); - - $this->assertInstanceOf(RequestFactoryInterface::class, $container['kafka.schema.registry.request.factory']); - $this->assertInstanceOf(HttpClientInterface::class, $container['kafka.schema.registry.client.http']); - $this->assertInstanceOf(ErrorHandlerInterface::class, $container['kafka.schema.registry.error.handler']); - $this->assertInstanceOf( + self::assertArrayHasKey('kafka.schema.registry', $container); + self::assertArrayHasKey('username', $container['kafka.schema.registry']); + self::assertArrayHasKey('password', $container['kafka.schema.registry']); + self::assertArrayHasKey('kafka.schema.registry.client', $container); + self::assertArrayHasKey('kafka.schema.registry.request.factory', $container); + self::assertArrayHasKey('kafka.schema.registry.client.http', $container); + self::assertArrayHasKey('kafka.schema.registry.client.api', $container); + self::assertArrayHasKey('kafka.schema.registry.error.handler', $container); + + $client = $container['kafka.schema.registry.client.http']; + + self::assertInstanceOf(RequestFactoryInterface::class, $container['kafka.schema.registry.request.factory']); + self::assertInstanceOf(HttpClientInterface::class, $client); + self::assertInstanceOf(ErrorHandlerInterface::class, $container['kafka.schema.registry.error.handler']); + self::assertInstanceOf( KafkaSchemaRegistryApiClientInterface::class, $container['kafka.schema.registry.client.api'] ); + } + + public function testSuccessWithMissingAuth(): void + { + $container = new Container(); + + $container['kafka.schema.registry'] = [ + 'base.url' => 'http://some-url' + ]; + + $container->register(new KafkaSchemaRegistryApiClientProvider()); + $client = $container['kafka.schema.registry.client.http']; + + self::assertInstanceOf(HttpClientInterface::class, $client); + self::assertEquals( + $container['kafka.schema.registry']['password'], + self::getPropertyValue($client, 'password') + ); + self::assertNull(self::getPropertyValue($client, 'username')); + self::assertNull(self::getPropertyValue($client, 'password')); } public function testFailOnMissingBaseUrlInContainer(): void @@ -68,8 +90,8 @@ public function testFailOnMissingBaseUrlInContainer(): void 'password' => 'p1', ]; - $this->expectException(LogicException::class); - $this->expectExceptionMessage('Missing schema registry URL, please set it under "base.url" container offset'); + self::expectException(LogicException::class); + self::expectExceptionMessage('Missing schema registry URL, please set it under "base.url" container offset'); $container->register(new KafkaSchemaRegistryApiClientProvider()); } @@ -85,14 +107,14 @@ public function testUserNameAndPasswordFromSettingsArePassedToHttpClient(): void ]; $container->register(new KafkaSchemaRegistryApiClientProvider()); - $this->assertSame( + self::assertSame( $container['kafka.schema.registry']['username'], - $this->getPropertyValue($container['kafka.schema.registry.client.http'], 'username') + self::getPropertyValue($container['kafka.schema.registry.client.http'], 'username') ); - $this->assertSame( + self::assertSame( $container['kafka.schema.registry']['password'], - $this->getPropertyValue($container['kafka.schema.registry.client.http'], 'password') + self::getPropertyValue($container['kafka.schema.registry.client.http'], 'password') ); } diff --git a/tests/KafkaSchemaRegistryApiClientTest.php b/tests/KafkaSchemaRegistryApiClientTest.php index 2f09a39..58e0ad6 100644 --- a/tests/KafkaSchemaRegistryApiClientTest.php +++ b/tests/KafkaSchemaRegistryApiClientTest.php @@ -212,15 +212,18 @@ public function testGetDefaultCompatibiltyLeveWhenGetSubjectCompatibilityLevelTh $httpClientMock = $this->getHttpClientMock(); $httpClientMock - ->expects($this->at(0)) + ->expects(self::exactly(2)) ->method('call') - ->with('GET', sprintf('config/%s', self::TEST_SUBJECT_NAME)) - ->willThrowException(new SubjectNotFoundException()); - - $httpClientMock - ->expects($this->at(1)) - ->method('call') - ->willReturn(['compatibilityLevel' => KafkaSchemaRegistryApiClientInterface::LEVEL_FULL]); + ->withConsecutive( + ['GET', sprintf('config/%s', self::TEST_SUBJECT_NAME)], + [] + ) + ->will( + $this->onConsecutiveCalls( + $this->throwException(new SubjectNotFoundException()), + ['compatibilityLevel' => KafkaSchemaRegistryApiClientInterface::LEVEL_FULL] + ) + ); $api = new KafkaSchemaRegistryApiClient($httpClientMock); $result = $api->getSubjectCompatibilityLevel(self::TEST_SUBJECT_NAME);