diff --git a/src/Contracts/Http.php b/src/Contracts/Http.php index 7383eedb..ef9bbfb9 100644 --- a/src/Contracts/Http.php +++ b/src/Contracts/Http.php @@ -10,7 +10,7 @@ public function get(string $path, array $query = []); public function post(string $path, $body = null, array $query = [], string $contentType = null); - public function put(string $path, $body = null, array $query = []); + public function put(string $path, $body = null, array $query = [], string $contentType = null); public function patch(string $path, $body = null, array $query = []); diff --git a/src/Endpoints/Delegates/HandlesDocuments.php b/src/Endpoints/Delegates/HandlesDocuments.php index e0f8d4ce..8842b629 100644 --- a/src/Endpoints/Delegates/HandlesDocuments.php +++ b/src/Endpoints/Delegates/HandlesDocuments.php @@ -32,6 +32,21 @@ public function addDocuments(array $documents, ?string $primaryKey = null) return $this->http->post(self::PATH.'/'.$this->uid.'/documents', $documents, ['primaryKey' => $primaryKey]); } + public function addDocumentsJson(string $documents, ?string $primaryKey = null) + { + return $this->http->post(self::PATH.'/'.$this->uid.'/documents', $documents, ['primaryKey' => $primaryKey], 'application/json'); + } + + public function addDocumentsCsv(string $documents, ?string $primaryKey = null) + { + return $this->http->post(self::PATH.'/'.$this->uid.'/documents', $documents, ['primaryKey' => $primaryKey], 'text/csv'); + } + + public function addDocumentsNdjson(string $documents, ?string $primaryKey = null) + { + return $this->http->post(self::PATH.'/'.$this->uid.'/documents', $documents, ['primaryKey' => $primaryKey], 'application/x-ndjson'); + } + public function addDocumentsInBatches(array $documents, ?int $batchSize = 1000, ?string $primaryKey = null) { $promises = []; @@ -42,19 +57,24 @@ public function addDocumentsInBatches(array $documents, ?int $batchSize = 1000, return $promises; } - public function addDocumentsJson(string $documents, ?string $primaryKey = null) + public function addDocumentsCsvInBatches(string $documents, ?int $batchSize = 1000, ?string $primaryKey = null) { - return $this->http->post(self::PATH.'/'.$this->uid.'/documents', $documents, ['primaryKey' => $primaryKey], 'application/json'); - } + $promises = []; + foreach (self::batchCsvString($documents, $batchSize) as $batch) { + $promises[] = $this->addDocumentsCsv($batch, $primaryKey); + } - public function addDocumentsNdjson(string $documents, ?string $primaryKey = null) - { - return $this->http->post(self::PATH.'/'.$this->uid.'/documents', $documents, ['primaryKey' => $primaryKey], 'application/x-ndjson'); + return $promises; } - public function addDocumentsCsv(string $documents, ?string $primaryKey = null) + public function addDocumentsNdjsonInBatches(string $documents, ?int $batchSize = 1000, ?string $primaryKey = null) { - return $this->http->post(self::PATH.'/'.$this->uid.'/documents', $documents, ['primaryKey' => $primaryKey], 'text/csv'); + $promises = []; + foreach (self::batchNdjsonString($documents, $batchSize) as $batch) { + $promises[] = $this->addDocumentsNdjson($batch, $primaryKey); + } + + return $promises; } public function updateDocuments(array $documents, ?string $primaryKey = null) @@ -62,6 +82,21 @@ public function updateDocuments(array $documents, ?string $primaryKey = null) return $this->http->put(self::PATH.'/'.$this->uid.'/documents', $documents, ['primaryKey' => $primaryKey]); } + public function updateDocumentsJson(string $documents, ?string $primaryKey = null) + { + return $this->http->put(self::PATH.'/'.$this->uid.'/documents', $documents, ['primaryKey' => $primaryKey], 'application/json'); + } + + public function updateDocumentsCsv(string $documents, ?string $primaryKey = null) + { + return $this->http->put(self::PATH.'/'.$this->uid.'/documents', $documents, ['primaryKey' => $primaryKey], 'text/csv'); + } + + public function updateDocumentsNdjson(string $documents, ?string $primaryKey = null) + { + return $this->http->put(self::PATH.'/'.$this->uid.'/documents', $documents, ['primaryKey' => $primaryKey], 'application/x-ndjson'); + } + public function updateDocumentsInBatches(array $documents, ?int $batchSize = 1000, ?string $primaryKey = null) { $promises = []; @@ -72,6 +107,26 @@ public function updateDocumentsInBatches(array $documents, ?int $batchSize = 100 return $promises; } + public function updateDocumentsCsvInBatches(string $documents, ?int $batchSize = 1000, ?string $primaryKey = null) + { + $promises = []; + foreach (self::batchCsvString($documents, $batchSize) as $batch) { + $promises[] = $this->updateDocumentsCsv($batch, $primaryKey); + } + + return $promises; + } + + public function updateDocumentsNdjsonInBatches(string $documents, ?int $batchSize = 1000, ?string $primaryKey = null) + { + $promises = []; + foreach (self::batchNdjsonString($documents, $batchSize) as $batch) { + $promises[] = $this->updateDocumentsNdjson($batch, $primaryKey); + } + + return $promises; + } + public function deleteAllDocuments(): array { return $this->http->delete(self::PATH.'/'.$this->uid.'/documents'); @@ -100,6 +155,33 @@ private function assertValidDocumentId($documentId): void } } + private static function batchCsvString(string $documents, int $batchSize): Generator + { + $documents = preg_split("/\r\n|\n|\r/", trim($documents)); + $csvHeader = $documents[0]; + array_shift($documents); + $batches = array_chunk($documents, $batchSize); + + foreach ($batches as $batch) { + array_unshift($batch, $csvHeader); + $batch = implode("\n", $batch); + + yield $batch; + } + } + + private static function batchNdjsonString(string $documents, int $batchSize): Generator + { + $documents = preg_split("/\r\n|\n|\r/", trim($documents)); + $batches = array_chunk($documents, $batchSize); + + foreach ($batches as $batch) { + $batch = implode("\n", $batch); + + yield $batch; + } + } + private static function batch(array $documents, int $batchSize): Generator { $batches = array_chunk($documents, $batchSize); diff --git a/src/Http/Client.php b/src/Http/Client.php index 72c8ce32..76aceeb8 100644 --- a/src/Http/Client.php +++ b/src/Http/Client.php @@ -100,13 +100,18 @@ public function post(string $path, $body = null, array $query = [], string $cont return $this->execute($request); } - public function put(string $path, $body = null, array $query = []) + public function put(string $path, $body = null, array $query = [], string $contentType = null) { - $this->headers['Content-type'] = 'application/json'; + if (!\is_null($contentType)) { + $this->headers['Content-type'] = $contentType; + } else { + $this->headers['Content-type'] = 'application/json'; + $body = $this->json->serialize($body); + } $request = $this->requestFactory->createRequest( 'PUT', $this->baseUrl.$path.$this->buildQueryString($query) - )->withBody($this->streamFactory->createStream($this->json->serialize($body))); + )->withBody($this->streamFactory->createStream($body)); return $this->execute($request); } diff --git a/tests/Endpoints/DocumentsTest.php b/tests/Endpoints/DocumentsTest.php index f6c40ca3..48fdf6b8 100644 --- a/tests/Endpoints/DocumentsTest.php +++ b/tests/Endpoints/DocumentsTest.php @@ -256,6 +256,53 @@ public function testUpdateDocumentsInBatches(): void $this->assertCount(\count(self::DOCUMENTS), $response); } + public function testAddDocumentsCsvInBatches(): void + { + $index = $this->client->index('documentCsv'); + + $fileCsv = fopen('./tests/datasets/songs.csv', 'r'); + $documentCsv = fread($fileCsv, filesize('./tests/datasets/songs.csv')); + fclose($fileCsv); + + // Total number of lines excluding header + $total = \count(preg_split("/\r\n|\n|\r/", trim($documentCsv))) - 1; + + $promises = $index->addDocumentsCsvInBatches($documentCsv, 250); + + $this->assertCount(2, $promises); + + foreach ($promises as $promise) { + $this->assertIsValidPromise($promise); + $index->waitForTask($promise['taskUid']); + } + + $response = $index->getDocuments(); + $this->assertSame($total, $response->getTotal()); + } + + public function testAddDocumentsNdjsonInBatches(): void + { + $index = $this->client->index('documentNdJson'); + + $fileNdJson = fopen('./tests/datasets/songs.ndjson', 'r'); + $documentNdJson = fread($fileNdJson, filesize('./tests/datasets/songs.ndjson')); + fclose($fileNdJson); + + $total = \count(preg_split("/\r\n|\n|\r/", trim($documentNdJson))); + + $promises = $index->addDocumentsNdjsonInBatches($documentNdJson, 150); + + $this->assertCount(2, $promises); + + foreach ($promises as $promise) { + $this->assertIsValidPromise($promise); + $index->waitForTask($promise['taskUid']); + } + + $response = $index->getDocuments(); + $this->assertSame($total, $response->getTotal()); + } + public function testAddWithUpdateDocuments(): void { $index = $this->createEmptyIndex($this->safeIndexName('movies')); @@ -444,6 +491,155 @@ public function testGetDocumentsWithPagination(): void $this->assertCount(3, $response); } + public function testUpdateDocumentsJson(): void + { + $index = $this->client->index('documentJson'); + + $fileJson = fopen('./tests/datasets/small_movies.json', 'r'); + $documentJson = fread($fileJson, filesize('./tests/datasets/small_movies.json')); + fclose($fileJson); + + $promise = $index->addDocumentsJson($documentJson); + $index->waitForTask($promise['taskUid']); + + $replacement = [ + [ + 'id' => 522681, + 'title' => 'No Escape Room', + ], + ]; + + $promise = $index->updateDocumentsJson(json_encode($replacement)); + $index->waitForTask($promise['taskUid']); + + $response = $index->getDocument($replacement[0]['id']); + + $this->assertSame($replacement[0]['id'], $response['id']); + $this->assertSame($replacement[0]['title'], $response['title']); + + $documents = $index->getDocuments(); + + $this->assertCount(20, $documents); + } + + public function testUpdateDocumentsCsv(): void + { + $index = $this->client->index('documentCsv'); + + $fileCsv = fopen('./tests/datasets/songs.csv', 'r'); + $documentCsv = fread($fileCsv, filesize('./tests/datasets/songs.csv')); + fclose($fileCsv); + + $promise = $index->addDocumentsCsv($documentCsv); + $index->waitForTask($promise['taskUid']); + + $replacement = 'id,title'.PHP_EOL; + $replacement .= '888221515,Young folks'.PHP_EOL; + + $promise = $index->updateDocumentsCsv($replacement); + $index->waitForTask($promise['taskUid']); + + $response = $index->getDocument(888221515); + + $this->assertSame(888221515, (int) $response['id']); + $this->assertSame('Young folks', $response['title']); + + $documents = $index->getDocuments(); + + $this->assertSame(499, $documents->getTotal()); + } + + public function testUpdateDocumentsNdjson(): void + { + $index = $this->client->index('documentNdJson'); + + $fileNdJson = fopen('./tests/datasets/songs.ndjson', 'r'); + $documentNdJson = fread($fileNdJson, filesize('./tests/datasets/songs.ndjson')); + fclose($fileNdJson); + + $promise = $index->addDocumentsNdjson($documentNdJson); + $index->waitForTask($promise['taskUid']); + + $replacement = json_encode(['id' => 412559401, 'title' => 'WASPTHOVEN']).PHP_EOL; + $replacement .= json_encode(['id' => 70764404, 'artist' => 'Ailitp']).PHP_EOL; + + $promise = $index->updateDocumentsNdjson($replacement); + $index->waitForTask($promise['taskUid']); + + $response = $index->getDocument(412559401); + $this->assertSame(412559401, (int) $response['id']); + $this->assertSame('WASPTHOVEN', $response['title']); + + $response = $index->getDocument(70764404); + $this->assertSame(70764404, (int) $response['id']); + $this->assertSame('Ailitp', $response['artist']); + + $documents = $index->getDocuments(); + + $this->assertSame(225, $documents->getTotal()); + } + + public function testUpdateDocumentsCsvInBatches(): void + { + $index = $this->client->index('documentCsv'); + + $fileCsv = fopen('./tests/datasets/songs.csv', 'r'); + $documentCsv = fread($fileCsv, filesize('./tests/datasets/songs.csv')); + fclose($fileCsv); + + $addPromise = $index->addDocumentsCsv($documentCsv); + $index->waitForTask($addPromise['taskUid']); + + $replacement = 'id,title'.PHP_EOL; + $replacement .= '888221515,Young folks'.PHP_EOL; + $replacement .= '235115704,Mister Klein'.PHP_EOL; + + $promises = $index->updateDocumentsCsvInBatches($replacement, 1); + $this->assertCount(2, $promises); + foreach ($promises as $promise) { + $this->assertIsValidPromise($promise); + $index->waitForTask($promise['taskUid']); + } + + $response = $index->getDocument(888221515); + $this->assertSame(888221515, (int) $response['id']); + $this->assertSame('Young folks', $response['title']); + + $response = $index->getDocument(235115704); + $this->assertSame(235115704, (int) $response['id']); + $this->assertSame('Mister Klein', $response['title']); + } + + public function testUpdateDocumentsNdjsonInBatches(): void + { + $index = $this->client->index('documentNdJson'); + + $fileNdJson = fopen('./tests/datasets/songs.ndjson', 'r'); + $documentNdJson = fread($fileNdJson, filesize('./tests/datasets/songs.ndjson')); + fclose($fileNdJson); + + $addPromise = $index->addDocumentsNdjson($documentNdJson); + $index->waitForTask($addPromise['taskUid']); + + $replacement = json_encode(['id' => 412559401, 'title' => 'WASPTHOVEN']).PHP_EOL; + $replacement .= json_encode(['id' => 70764404, 'artist' => 'Ailitp']).PHP_EOL; + + $promises = $index->updateDocumentsNdjsonInBatches($replacement, 1); + $this->assertCount(2, $promises); + foreach ($promises as $promise) { + $this->assertIsValidPromise($promise); + $index->waitForTask($promise['taskUid']); + } + + $response = $index->getDocument(412559401); + $this->assertSame(412559401, (int) $response['id']); + $this->assertSame('WASPTHOVEN', $response['title']); + + $response = $index->getDocument(70764404); + $this->assertSame(70764404, (int) $response['id']); + $this->assertSame('Ailitp', $response['artist']); + } + /** * @dataProvider invalidDocumentIds */