diff --git a/pom.xml b/pom.xml index f0c8d3d7..42e9db55 100644 --- a/pom.xml +++ b/pom.xml @@ -167,6 +167,11 @@ commons-codec 1.16.0 + + io.github.resilience4j + resilience4j-retry + 1.7.0 + diff --git a/samples/DeleteOneDocument.java b/samples/DeleteOneDocument.java index 5fff10dd..e31946d7 100644 --- a/samples/DeleteOneDocument.java +++ b/samples/DeleteOneDocument.java @@ -1,16 +1,16 @@ -import com.coveo.pushapiclient.Source; +import com.coveo.pushapiclient.PushSource; import java.io.IOException; import java.net.http.HttpResponse; public class DeleteOneDocument { public static void main(String[] args) { - Source source = new Source("my_api_key", "my_org_id"); + PushSource source = new PushSource("my_api_key", "my_org_id"); String documentId = "https://my.document.uri"; Boolean deleteChildren = true; try { - HttpResponse response = source.deleteDocument("my_source_id", documentId, deleteChildren); + HttpResponse response = source.deleteDocument(documentId, deleteChildren); System.out.println(String.format("Delete document status: %s", response.statusCode())); System.out.println(String.format("Delete document response: %s", response.body())); } catch (IOException e) { diff --git a/samples/PushOneDocument.java b/samples/PushOneDocument.java index 0f44b529..4416e684 100644 --- a/samples/PushOneDocument.java +++ b/samples/PushOneDocument.java @@ -1,12 +1,13 @@ +import com.coveo.pushapiclient.BackoffOptionsBuilder; import com.coveo.pushapiclient.DocumentBuilder; -import com.coveo.pushapiclient.Source; +import com.coveo.pushapiclient.PushSource; import java.io.IOException; import java.net.http.HttpResponse; public class PushOneDocument { public static void main(String[] args) { - Source source = new Source("my_api_key", "my_org_id"); + PushSource source = new PushSource("my_api_key", "my_org_id", new BackoffOptionsBuilder().withMaxRetries(5).withRetryAfter(10000).build()); DocumentBuilder documentBuilder = new DocumentBuilder("https://my.document.uri", "My document title") .withData("these words will be searchable"); diff --git a/samples/PushOneDocumentWithMetadata.java b/samples/PushOneDocumentWithMetadata.java index d28ea89f..1fb9d7b5 100644 --- a/samples/PushOneDocumentWithMetadata.java +++ b/samples/PushOneDocumentWithMetadata.java @@ -1,5 +1,7 @@ +import com.coveo.pushapiclient.BackoffOptions; +import com.coveo.pushapiclient.BackoffOptionsBuilder; import com.coveo.pushapiclient.DocumentBuilder; -import com.coveo.pushapiclient.Source; +import com.coveo.pushapiclient.PushSource; import java.io.IOException; import java.net.http.HttpResponse; @@ -7,7 +9,7 @@ public class PushOneDocumentWithMetadata { public static void main(String[] args) { - Source source = new Source("my_api_key", "my_org_id"); + PushSource source = new PushSource("my_api_key", "my_org_id", new BackoffOptionsBuilder().withTimeMultiple(1).build()); DocumentBuilder documentBuilder = new DocumentBuilder("https://my.document.uri", "My document title") .withData("these words will be searchable") .withAuthor("bob") diff --git a/src/main/java/com/coveo/pushapiclient/ApiCore.java b/src/main/java/com/coveo/pushapiclient/ApiCore.java index 50f575f6..283b3761 100644 --- a/src/main/java/com/coveo/pushapiclient/ApiCore.java +++ b/src/main/java/com/coveo/pushapiclient/ApiCore.java @@ -1,27 +1,70 @@ package com.coveo.pushapiclient; +import io.github.resilience4j.core.IntervalFunction; +import io.github.resilience4j.retry.Retry; +import io.github.resilience4j.retry.RetryConfig; import java.io.IOException; import java.net.URI; import java.net.http.HttpClient; import java.net.http.HttpRequest; import java.net.http.HttpRequest.BodyPublisher; import java.net.http.HttpResponse; +import java.util.function.Function; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -// TODO: LENS-934 - Support throttling class ApiCore { private final HttpClient httpClient; private final Logger logger; + private final BackoffOptions options; public ApiCore() { - this.httpClient = HttpClient.newHttpClient(); - this.logger = LogManager.getLogger(ApiCore.class); + this(HttpClient.newHttpClient(), LogManager.getLogger(ApiCore.class)); } public ApiCore(HttpClient httpClient, Logger logger) { + this(httpClient, logger, new BackoffOptionsBuilder().build()); + } + + public ApiCore(HttpClient httpClient, Logger logger, BackoffOptions options) { this.httpClient = httpClient; this.logger = logger; + this.options = options; + } + + public HttpResponse callApiWithRetries(HttpRequest request) + throws IOException, InterruptedException { + IntervalFunction intervalFn = + IntervalFunction.ofExponentialRandomBackoff( + this.options.getRetryAfter(), this.options.getTimeMultiple()); + + RetryConfig retryConfig = + RetryConfig.>custom() + .maxAttempts(this.options.getMaxRetries()) + .intervalFunction(intervalFn) + .retryOnResult(response -> response != null && response.statusCode() == 429) + .build(); + + Retry retry = Retry.of("platformRequest", retryConfig); + + Function> retryRequestFn = + Retry.decorateFunction(retry, req -> sendRequest(req)); + + return retryRequestFn.apply(request); + } + + public HttpResponse sendRequest(HttpRequest request) { + String uri = request.uri().toString(); + String reqMethod = request.method(); + this.logger.debug(reqMethod + " " + uri); + try { + HttpResponse response = + this.httpClient.send(request, HttpResponse.BodyHandlers.ofString()); + this.logResponse(response); + return response; + } catch (IOException | InterruptedException e) { + throw new Error(e.getMessage()); + } } public HttpResponse post(URI uri, String[] headers) @@ -31,42 +74,30 @@ public HttpResponse post(URI uri, String[] headers) public HttpResponse post(URI uri, String[] headers, BodyPublisher body) throws IOException, InterruptedException { - this.logger.debug("POST " + uri); HttpRequest request = HttpRequest.newBuilder().headers(headers).uri(uri).POST(body).build(); - HttpResponse response = - this.httpClient.send(request, HttpResponse.BodyHandlers.ofString()); - this.logResponse(response); + HttpResponse response = this.callApiWithRetries(request); return response; } public HttpResponse put(URI uri, String[] headers, BodyPublisher body) throws IOException, InterruptedException { - this.logger.debug("PUT " + uri); HttpRequest request = HttpRequest.newBuilder().headers(headers).uri(uri).PUT(body).build(); - HttpResponse response = - this.httpClient.send(request, HttpResponse.BodyHandlers.ofString()); - this.logResponse(response); + HttpResponse response = this.callApiWithRetries(request); return response; } public HttpResponse delete(URI uri, String[] headers) throws IOException, InterruptedException { - this.logger.debug("DELETE " + uri); HttpRequest request = HttpRequest.newBuilder().headers(headers).uri(uri).DELETE().build(); - HttpResponse response = - this.httpClient.send(request, HttpResponse.BodyHandlers.ofString()); - this.logResponse(response); + HttpResponse response = this.callApiWithRetries(request); return response; } public HttpResponse delete(URI uri, String[] headers, BodyPublisher body) throws IOException, InterruptedException { - this.logger.debug("DELETE " + uri); HttpRequest request = HttpRequest.newBuilder().headers(headers).uri(uri).method("DELETE", body).build(); - HttpResponse response = - this.httpClient.send(request, HttpResponse.BodyHandlers.ofString()); - this.logResponse(response); + HttpResponse response = this.callApiWithRetries(request); return response; } diff --git a/src/main/java/com/coveo/pushapiclient/BackoffOptions.java b/src/main/java/com/coveo/pushapiclient/BackoffOptions.java new file mode 100644 index 00000000..ad94adaa --- /dev/null +++ b/src/main/java/com/coveo/pushapiclient/BackoffOptions.java @@ -0,0 +1,25 @@ +package com.coveo.pushapiclient; + +public class BackoffOptions { + private final int retryAfter; + private final int maxRetries; + private final int timeMultiple; + + public BackoffOptions(int retryAfter, int maxRetries, int timeMultiple) { + this.retryAfter = retryAfter; + this.maxRetries = maxRetries; + this.timeMultiple = timeMultiple; + } + + public int getRetryAfter() { + return this.retryAfter; + } + + public int getMaxRetries() { + return this.maxRetries; + } + + public int getTimeMultiple() { + return this.timeMultiple; + } +} diff --git a/src/main/java/com/coveo/pushapiclient/BackoffOptionsBuilder.java b/src/main/java/com/coveo/pushapiclient/BackoffOptionsBuilder.java new file mode 100644 index 00000000..1f254d39 --- /dev/null +++ b/src/main/java/com/coveo/pushapiclient/BackoffOptionsBuilder.java @@ -0,0 +1,30 @@ +package com.coveo.pushapiclient; + +public class BackoffOptionsBuilder { + public static final int DEFAULT_RETRY_AFTER = 5000; + public static final int DEFAULT_MAX_RETRIES = 50; + public static final int DEFAULT_TIME_MULTIPLE = 2; + + private int retryAfter = DEFAULT_RETRY_AFTER; + private int maxRetries = DEFAULT_MAX_RETRIES; + private int timeMultiple = DEFAULT_TIME_MULTIPLE; + + public BackoffOptionsBuilder withRetryAfter(int retryAfter) { + this.retryAfter = retryAfter; + return this; + } + + public BackoffOptionsBuilder withMaxRetries(int maxRetries) { + this.maxRetries = maxRetries; + return this; + } + + public BackoffOptionsBuilder withTimeMultiple(int timeMultiple) { + this.timeMultiple = timeMultiple; + return this; + } + + public BackoffOptions build() { + return new BackoffOptions(this.retryAfter, this.maxRetries, this.timeMultiple); + } +} diff --git a/src/main/java/com/coveo/pushapiclient/PlatformClient.java b/src/main/java/com/coveo/pushapiclient/PlatformClient.java index a70bbb06..9d2c665b 100644 --- a/src/main/java/com/coveo/pushapiclient/PlatformClient.java +++ b/src/main/java/com/coveo/pushapiclient/PlatformClient.java @@ -28,7 +28,11 @@ public class PlatformClient { * @param organizationId The Coveo Organization identifier. */ public PlatformClient(String apiKey, String organizationId) { - this(apiKey, organizationId, new PlatformUrlBuilder().build()); + this( + apiKey, + organizationId, + new PlatformUrlBuilder().build(), + new BackoffOptionsBuilder().build()); } /** @@ -38,9 +42,36 @@ public PlatformClient(String apiKey, String organizationId) { * organization. * @see Manage API Keys * @param organizationId The Coveo Organization identifier. - * @param platformUrl The PlatformUrl. */ public PlatformClient(String apiKey, String organizationId, PlatformUrl platformUrl) { + this(apiKey, organizationId, platformUrl, new BackoffOptionsBuilder().build()); + } + + /** + * Construct a PlatformClient + * + * @param apiKey An apiKey capable of pushing documents and managing sources in a Coveo + * organization. + * @see Manage API Keys + * @param organizationId The Coveo Organization identifier. + * @param options The configuration options for exponential backoff. + */ + public PlatformClient(String apiKey, String organizationId, BackoffOptions options) { + this(apiKey, organizationId, new PlatformUrlBuilder().build(), options); + } + + /** + * Construct a PlatformClient + * + * @param apiKey An apiKey capable of pushing documents and managing sources in a Coveo + * organization. + * @see Manage API Keys + * @param organizationId The Coveo Organization identifier. + * @param platformUrl The PlatformUrl. + * @param options The configuration options for exponential backoff. + */ + public PlatformClient( + String apiKey, String organizationId, PlatformUrl platformUrl, BackoffOptions options) { this.apiKey = apiKey; this.organizationId = organizationId; this.api = new ApiCore(); @@ -57,9 +88,24 @@ public PlatformClient(String apiKey, String organizationId, PlatformUrl platform * @param httpClient The HttpClient. */ public PlatformClient(String apiKey, String organizationId, HttpClient httpClient) { + this(apiKey, organizationId, httpClient, new BackoffOptionsBuilder().build()); + } + + /** + * Construct a PlatformClient + * + * @param apiKey An apiKey capable of pushing documents and managing sources in a Coveo + * organization. + * @see Manage API Keys + * @param organizationId The Coveo Organization identifier. + * @param httpClient The HttpClient. + * @param options The configuration options for exponential backoff. + */ + public PlatformClient( + String apiKey, String organizationId, HttpClient httpClient, BackoffOptions options) { this.apiKey = apiKey; this.organizationId = organizationId; - this.api = new ApiCore(httpClient, LogManager.getLogger(ApiCore.class)); + this.api = new ApiCore(httpClient, LogManager.getLogger(ApiCore.class), options); this.platformUrl = new PlatformUrlBuilder().build(); } diff --git a/src/main/java/com/coveo/pushapiclient/PushService.java b/src/main/java/com/coveo/pushapiclient/PushService.java index 8691fe83..a7ba3665 100644 --- a/src/main/java/com/coveo/pushapiclient/PushService.java +++ b/src/main/java/com/coveo/pushapiclient/PushService.java @@ -10,13 +10,17 @@ public class PushService { private PushServiceInternal service; public PushService(PushEnabledSource source) { + this(source, new BackoffOptionsBuilder().build()); + } + + public PushService(PushEnabledSource source, BackoffOptions options) { String apiKey = source.getApiKey(); String organizationId = source.getOrganizationId(); PlatformUrl platformUrl = source.getPlatformUrl(); UploadStrategy uploader = this.getUploadStrategy(); DocumentUploadQueue queue = new DocumentUploadQueue(uploader); - this.platformClient = new PlatformClient(apiKey, organizationId, platformUrl); + this.platformClient = new PlatformClient(apiKey, organizationId, platformUrl, options); this.service = new PushServiceInternal(queue); this.source = source; } diff --git a/src/main/java/com/coveo/pushapiclient/PushSource.java b/src/main/java/com/coveo/pushapiclient/PushSource.java index 1f3a8e28..5371598a 100644 --- a/src/main/java/com/coveo/pushapiclient/PushSource.java +++ b/src/main/java/com/coveo/pushapiclient/PushSource.java @@ -125,6 +125,35 @@ public static PushSource fromPlatformUrl( return new PushSource(apiKey, organizationId, sourceId, platformUrl); } + /** + * Create a Push source instance + * + * @param apiKey The API key used for all operations regarding your source. + *

Ensure your API key has the required privileges for the operation you will be performing + * * + *

For more information about which privileges are required, see Privilege Reference. + * @param organizationId The unique identifier of your organization. + *

The Organization Id can be retrieved in the URL of your Coveo organization. + * @param sourceId The unique identifier of the target Push source. + *

The Source Id can be retrieved when you edit your source in the Coveo Administration + * Console + * @param platformUrl The object containing additional information on the URL endpoint. You can + * use the {@link PlatformUrl} when your organization is located in a non-default Coveo + * environement and/or region. When not specified, the default platform URL values will be + * used: {@link PlatformUrl#DEFAULT_ENVIRONMENT} and {@link PlatformUrl#DEFAULT_REGION} + * @param options The configuration options for exponential backoff. + */ + public static PushSource fromPlatformUrl( + String apiKey, + String organizationId, + String sourceId, + PlatformUrl platformUrl, + BackoffOptions options) { + return new PushSource(apiKey, organizationId, sourceId, platformUrl, options); + } + private PushSource( String apiKey, String organizationId, String sourceId, PlatformUrl platformUrl) { this.apiKey = apiKey; @@ -132,6 +161,17 @@ private PushSource( this.platformClient = new PlatformClient(apiKey, organizationId, platformUrl); } + private PushSource( + String apiKey, + String organizationId, + String sourceId, + PlatformUrl platformUrl, + BackoffOptions options) { + this.apiKey = apiKey; + this.urlExtractor = new ApiUrl(organizationId, sourceId, platformUrl); + this.platformClient = new PlatformClient(apiKey, organizationId, platformUrl, options); + } + /** * Create or update a security identity. * diff --git a/src/main/java/com/coveo/pushapiclient/StreamService.java b/src/main/java/com/coveo/pushapiclient/StreamService.java index 3ba94f5e..41dde7e0 100644 --- a/src/main/java/com/coveo/pushapiclient/StreamService.java +++ b/src/main/java/com/coveo/pushapiclient/StreamService.java @@ -25,6 +25,21 @@ public class StreamService { * @param source The source to which you want to send your documents. */ public StreamService(StreamEnabledSource source) { + this(source, new BackoffOptionsBuilder().build()); + } + + /** + * Creates a service to stream your documents to the provided source by interacting with the + * Stream API. + * + *

To perform full document updates, use the + * {@PushService}, since pushing documents with the {@StreamService} is equivalent to triggering a + * full source rebuild. The {@StreamService} can also be used for an initial catalog upload. + * + * @param source The source to which you want to send your documents. + * @param options The configuration options for exponential backoff. + */ + public StreamService(StreamEnabledSource source, BackoffOptions options) { String apiKey = source.getApiKey(); String organizationId = source.getOrganizationId(); PlatformUrl platformUrl = source.getPlatformUrl(); @@ -33,7 +48,7 @@ public StreamService(StreamEnabledSource source) { this.source = source; this.queue = new DocumentUploadQueue(uploader); - this.platformClient = new PlatformClient(apiKey, organizationId, platformUrl); + this.platformClient = new PlatformClient(apiKey, organizationId, platformUrl, options); this.service = new StreamServiceInternal(this.source, this.queue, this.platformClient, logger); } diff --git a/src/test/java/com/coveo/pushapiclient/ApiCoreTest.java b/src/test/java/com/coveo/pushapiclient/ApiCoreTest.java index c876fbfe..ae60e5fa 100644 --- a/src/test/java/com/coveo/pushapiclient/ApiCoreTest.java +++ b/src/test/java/com/coveo/pushapiclient/ApiCoreTest.java @@ -25,6 +25,7 @@ public class ApiCoreTest { @Mock private HttpClient httpClient; @Mock private HttpRequest httpRequest; @Mock private Logger logger; + @Mock private BackoffOptions backoffOptions; @Mock private HttpResponse httpResponse; @InjectMocks private ApiCore api; @@ -46,12 +47,25 @@ private void mockErrorResponse() { when(httpRequest.method()).thenReturn("DELETE"); } + private void mockThrottledResponse() { + when(httpResponse.statusCode()).thenReturn(429); + when(httpResponse.body()).thenReturn("THROTTLED_REQUEST"); + when(httpRequest.method()).thenReturn("POST"); + } + + private void mockBackoffOptions() { + when(backoffOptions.getMaxRetries()).thenReturn(2); + when(backoffOptions.getRetryAfter()).thenReturn(100); + when(backoffOptions.getTimeMultiple()).thenReturn(2); + } + @Before public void setUp() throws Exception { closeable = MockitoAnnotations.openMocks(this); when(httpClient.send(any(HttpRequest.class), any(BodyHandler.class))).thenReturn(httpResponse); when(httpResponse.request()).thenReturn(httpRequest); + mockBackoffOptions(); } @After @@ -79,4 +93,16 @@ public void testShouldLogResponse() throws IOException, InterruptedException, UR verify(logger, times(1)).error("DELETE status: 412"); verify(logger, times(1)).error("DELETE response: BAD_REQUEST"); } + + @Test + public void testShouldHandleBackoffOptions() + throws IOException, InterruptedException, URISyntaxException { + this.mockThrottledResponse(); + + this.api.post(new URI("https://perdu.com/"), headers); + + verify(logger, times(2)).debug("POST https://perdu.com/"); + verify(logger, times(2)).error("POST status: 429"); + verify(logger, times(2)).error("POST response: THROTTLED_REQUEST"); + } } diff --git a/src/test/java/com/coveo/pushapiclient/BackoffOptionsBuilderTest.java b/src/test/java/com/coveo/pushapiclient/BackoffOptionsBuilderTest.java new file mode 100644 index 00000000..453b6651 --- /dev/null +++ b/src/test/java/com/coveo/pushapiclient/BackoffOptionsBuilderTest.java @@ -0,0 +1,42 @@ +package com.coveo.pushapiclient; + +import static org.junit.Assert.*; + +import org.junit.Before; +import org.junit.Test; + +public class BackoffOptionsBuilderTest { + + private BackoffOptionsBuilder backoffOptionsBuilder; + + @Before + public void setup() { + backoffOptionsBuilder = new BackoffOptionsBuilder(); + } + + @Test + public void testWithDefaultValues() { + BackoffOptions backoffOptions = backoffOptionsBuilder.build(); + assertEquals("Should return default retry after time", 5000, backoffOptions.getRetryAfter()); + assertEquals("Should return default max retries", 50, backoffOptions.getMaxRetries()); + assertEquals("Should return default time multiple", 2, backoffOptions.getTimeMultiple()); + } + + @Test + public void testWithNonDefaultRetryAfter() { + BackoffOptions backoffOptions = backoffOptionsBuilder.withRetryAfter(1000).build(); + assertEquals("Should return Europe platform URL", 1000, backoffOptions.getRetryAfter()); + } + + @Test + public void testWithNonDefaultMaxRetries() { + BackoffOptions backoffOptions = backoffOptionsBuilder.withMaxRetries(15).build(); + assertEquals("Should return the staging platform URL", 15, backoffOptions.getMaxRetries()); + } + + @Test + public void testWithNonDefaultTimeMultiple() { + BackoffOptions backoffOptions = backoffOptionsBuilder.withTimeMultiple(3).build(); + assertEquals(3, backoffOptions.getTimeMultiple()); + } +}