Merged
1 change: 1 addition & 0 deletions .gitignore
@@ -5,6 +5,7 @@
.gitignore.swp
.project
.settings
.DS_Store
target
bin
/flink.http.connector.iml
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,9 @@

## [Unreleased]

- Added ability to continue on error, introducing new metadata columns and new configuration option
`gid.connector.http.source.lookup.continue-on-error`

## [0.21.0] - 2025-09-16

- optimized logging in HttpHeaderUtils.
46 changes: 43 additions & 3 deletions README.md
@@ -176,16 +176,55 @@ The second one is set per individual HTTP requests by HTTP client. Its default v
Flink's current implementation of `AsyncTableFunction` does not allow specifying custom logic for handling Flink AsyncIO timeouts as it is for Java API.
Because of that, if AsyncIO timer passes, Flink will throw TimeoutException which will cause job restart.

#### Retries (Lookup source)
#### Available Metadata (Lookup source)

The metadata column `http-status-code`, if specified in the table definition, is populated with the HTTP status code.
The metadata column `http-headers-map`, if specified in the table definition, is populated with a map of the HTTP headers.

HTTP requests can fail either immediately or after the retries for a temporary error are exhausted. By default, such a failure ends the job. To continue
processing after these failures, set `gid.connector.http.source.lookup.continue-on-error` to true. The lookup join then completes without content in the expected enrichment columns from the HTTP call:
these columns will be null if they are nullable and hold the default value for their type if they are non-nullable.

When `gid.connector.http.source.lookup.continue-on-error` is true, consider adding metadata columns that surface information about failures into your stream.

Metadata columns, when declared, hold information about the HTTP call. They are optional read-only columns that must be declared VIRTUAL to exclude them during an INSERT INTO operation.

| Key | Data Type | Description |
|-----------------------|----------------------------------|----------------------------------------|
| error-string | STRING NULL | A message associated with the error |
| http-status-code | INT NULL | The HTTP status code |
| http-headers-map      | MAP<STRING, ARRAY<STRING>> NULL  | The headers returned with the response |
| http-completion-state | STRING NULL | The completion state of the http call. |
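
As a minimal sketch of how these keys might be declared, the Java snippet below builds a Flink SQL DDL string with all four metadata columns marked VIRTUAL. The table name `Customers`, the column names, the URL, and the `rest-lookup` / `json` option values are assumptions made for illustration and are not taken from this change; only the metadata keys and the `continue-on-error` option come from the text above.

```java
// Hypothetical sketch: table name, column names, URL and the connector/format
// option values are illustrative assumptions, not part of this change.
class LookupTableDdl {

    // Flink SQL DDL declaring the four metadata keys from the table above as
    // VIRTUAL columns, with continue-on-error switched on.
    static final String DDL = String.join("\n",
        "CREATE TABLE Customers (",
        "  id STRING,",
        "  msg STRING,",
        "  status_code INT METADATA FROM 'http-status-code' VIRTUAL,",
        "  headers MAP<STRING, ARRAY<STRING>> METADATA FROM 'http-headers-map' VIRTUAL,",
        "  completion_state STRING METADATA FROM 'http-completion-state' VIRTUAL,",
        "  error_string STRING METADATA FROM 'error-string' VIRTUAL",
        ") WITH (",
        "  'connector' = 'rest-lookup',",
        "  'format' = 'json',",
        "  'url' = 'http://localhost:8080/client',",
        "  'gid.connector.http.source.lookup.continue-on-error' = 'true'",
        ")");

    public static void main(String[] args) {
        // In a real job this string would be passed to TableEnvironment#executeSql.
        System.out.println(DDL);
    }
}
```

Declaring only the columns you need is also possible, since every metadata column is optional.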

##### http-completion-state possible values

| Value | Description |
|:------------------|------------------------|
| SUCCESS | Success |
| HTTP_ERROR_STATUS | HTTP error status code |
| EXCEPTION | An Exception occurred |

If the `error-string` metadata column is defined on the table and the call succeeds then it will have a null value.
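
To illustrate how a downstream step might branch on these values, here is a small, self-contained Java sketch. The class `CompletionStateHandling` and its `describe` method are hypothetical helpers, not part of the connector; the enum simply mirrors the three values listed above, and the assumption that a status code accompanies `HTTP_ERROR_STATUS` is ours.

```java
// Hypothetical helper, not connector code: shows one way to branch on the
// value of the http-completion-state metadata column.
class CompletionStateHandling {

    // Mirrors the three values documented in the table above.
    enum HttpCompletionState { HTTP_ERROR_STATUS, EXCEPTION, SUCCESS }

    // Turns the metadata of one joined row into a short description.
    // completionState must be non-null and one of the documented values;
    // statusCode and errorString may be null because the columns are nullable.
    static String describe(String completionState, Integer statusCode, String errorString) {
        switch (HttpCompletionState.valueOf(completionState)) {
            case SUCCESS:
                return "enriched";
            case HTTP_ERROR_STATUS:
                return "http error " + statusCode + ": " + errorString;
            case EXCEPTION:
                return "exception: " + errorString;
            default:
                throw new IllegalStateException("unexpected state " + completionState);
        }
    }

    public static void main(String[] args) {
        System.out.println(describe("SUCCESS", 200, null));
        System.out.println(describe("HTTP_ERROR_STATUS", 404, "Not Found"));
        System.out.println(describe("EXCEPTION", null, "connection reset"));
    }
}
```

In a real pipeline the same branching could be used to route failed rows to a side output or a dead-letter sink instead of building a string.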

When an HTTP lookup call fails and populates the metadata columns with the error information, the expected enrichment columns from the HTTP call
are not populated: they will be null if they are nullable and hold the default value for their type if they are non-nullable.

If you use the Table API `TableResult` and call `await` with a timeout, the resulting timeout exception will still cause the job to terminate,
even if there are metadata columns defined.

#### Retries and handling errors (Lookup source)
Lookup source handles auto-retries for two scenarios:
1. IOException occurs (e.g. temporary network outage)
2. The response contains a HTTP error code that indicates a retriable error. These codes are defined in the table configuration (see `gid.connector.http.source.lookup.retry-codes`).
Retries are executed silently, without restarting the job. After reaching max retries attempts (per request) operation will fail and restart job.
Retries are executed silently, without restarting the job.

Notice that HTTP codes are categorized into 3 groups:
- successful responses - response is returned immediately for further processing
- temporary errors - request will be retried up to the retry limit
- error responses - unexpected responses are not retried and will fail the job. Any HTTP error code which is not configured as successful or temporary error is treated as an unretriable error.
- error responses - unexpected responses are not retried. Any HTTP error code which is not configured as successful or temporary error is treated as an unretriable error.

For temporary errors that have reached max retries attempts (per request) and error responses, the operation will
succeed if `gid.connector.http.source.lookup.continue-on-error` is true, otherwise the job will fail.
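
The rules above can be summarised as a small decision function. The following Java sketch is an illustration under stated assumptions, not the connector's implementation: the class `LookupErrorPolicy`, the `Outcome` names, and the parameter names are all hypothetical, and the sets of success and retry codes stand in for whatever the table configuration defines.

```java
import java.util.Set;

// Hypothetical sketch of the decision rules described above; none of these
// names exist in the connector.
class LookupErrorPolicy {

    enum Outcome { RETURN_RESPONSE, RETRY, CONTINUE_WITH_METADATA, FAIL_JOB }

    // retriesDone counts retries already performed for this request.
    static Outcome decide(int statusCode,
                          Set<Integer> successCodes,
                          Set<Integer> retryCodes,
                          int retriesDone,
                          int maxRetries,
                          boolean continueOnError) {
        if (successCodes.contains(statusCode)) {
            // Successful responses are returned immediately.
            return Outcome.RETURN_RESPONSE;
        }
        if (retryCodes.contains(statusCode) && retriesDone < maxRetries) {
            // Temporary errors are retried up to the retry limit.
            return Outcome.RETRY;
        }
        // Exhausted temporary errors and all other error responses:
        // the outcome depends on continue-on-error.
        return continueOnError ? Outcome.CONTINUE_WITH_METADATA : Outcome.FAIL_JOB;
    }

    public static void main(String[] args) {
        Set<Integer> success = Set.of(200);
        Set<Integer> retry = Set.of(503);
        System.out.println(decide(200, success, retry, 0, 3, false));
        System.out.println(decide(503, success, retry, 1, 3, false));
        System.out.println(decide(503, success, retry, 3, 3, true));
        System.out.println(decide(404, success, retry, 0, 3, false));
    }
}
```

The sketch covers HTTP status codes only; an IOException would follow the same retry-then-decide path, ending in the `EXCEPTION` completion state when `continue-on-error` is true.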

##### Retry strategy
User can choose retry strategy type for source table:
@@ -555,6 +594,7 @@ be requested if the current time is later than the cached token expiry time minu
| gid.connector.http.source.lookup.retry-strategy.exponential-delay.initial-backoff | optional | Exponential-delay initial delay. Default 1 second. |
| gid.connector.http.source.lookup.retry-strategy.exponential-delay.max-backoff | optional | Exponential-delay maximum delay. Default 1 minute. Use with `lookup.max-retries` parameter. |
| gid.connector.http.source.lookup.retry-strategy.exponential-delay.backoff-multiplier | optional | Exponential-delay multiplier. Default value 1.5 |
| gid.connector.http.source.lookup.continue-on-error                                   | optional | When true, processing continues after lookup errors: the row is returned without enrichment content, and any declared metadata columns carry the error information. When false (the default), the job ends on errors. |
| gid.connector.http.source.lookup.proxy.host | optional | Specify the hostname of the proxy. |
| gid.connector.http.source.lookup.proxy.port | optional | Specify the port of the proxy. |
| gid.connector.http.source.lookup.proxy.username | optional | Specify the username used for proxy authentication. |
@@ -1,21 +1,21 @@
package com.getindata.connectors.http.internal;

import java.util.Collection;

import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;

import com.getindata.connectors.http.internal.table.lookup.HttpRowDataWrapper;

/**
* A client that is used to get enrichment data from external component.
*/
public interface PollingClient<T> {
public interface PollingClient {

/**
* Gets enrichment data from external component using provided lookup arguments.
* @param lookupRow A {@link RowData} containing request parameters.
* @return an optional result of data lookup.
* @return an optional result of data lookup with http information.
*/
Collection<T> pull(RowData lookupRow);
HttpRowDataWrapper pull(RowData lookupRow);

/**
* Initialize the client.
@@ -3,14 +3,15 @@
import java.io.Serializable;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.ConfigurationException;

import com.getindata.connectors.http.internal.table.lookup.HttpLookupConfig;

public interface PollingClientFactory<OUT> extends Serializable {
public interface PollingClientFactory extends Serializable {

PollingClient<OUT> createPollClient(
PollingClient createPollClient(
HttpLookupConfig options,
DeserializationSchema<OUT> schemaDecoder
DeserializationSchema<RowData> schemaDecoder
) throws ConfigurationException;
}
@@ -88,6 +88,9 @@ public final class HttpConnectorConfigConstants {
public static final String SOURCE_CONNECTION_TIMEOUT =
SOURCE_LOOKUP_PREFIX + "connection.timeout";

public static final String CONTINUE_ON_ERROR =
SOURCE_LOOKUP_PREFIX + "continue-on-error";

public static final String SOURCE_PROXY_HOST =
SOURCE_LOOKUP_PREFIX + "proxy.host";

@@ -8,8 +8,7 @@
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
public enum RetryStrategyType {
FIXED_DELAY("fixed-delay"),
EXPONENTIAL_DELAY("exponential-delay"),
;
EXPONENTIAL_DELAY("exponential-delay");

private final String code;

@@ -0,0 +1,7 @@
package com.getindata.connectors.http.internal.table.lookup;

public enum HttpCompletionState {
HTTP_ERROR_STATUS,
EXCEPTION,
SUCCESS
}
@@ -82,6 +82,12 @@ public class HttpLookupConnectorOptions {
.noDefaultValue()
.withDescription("Http client connection timeout.");

public static final ConfigOption<Boolean> SOURCE_LOOKUP_CONTINUE_ON_ERROR =
ConfigOptions.key(CONTINUE_ON_ERROR)
.booleanType()
.defaultValue(false)
.withDescription("Continue job on error.");

public static final ConfigOption<String> SOURCE_LOOKUP_PROXY_HOST =
ConfigOptions.key(SOURCE_PROXY_HOST)
.stringType()