Commit 6be2698

Author: Grzegorz Kołakowski

Improve implementation

* Use LookupFunction and AsyncLookupFunction interfaces
* Add integration tests
1 parent c4acdb3 commit 6be2698

10 files changed: +176 -211 lines changed

CHANGELOG.md

Lines changed: 4 additions & 2 deletions
@@ -2,6 +2,10 @@
 
 ## [Unreleased]
 
+### Added
+
+- Added support for caching of lookup joins.
+
 ### Fixed
 
 - Fixed issue in the logging code of the `JavaNetHttpPollingClient` which prevents showing the status code and response body when the log level is configured at DEBUG (or lower) level.
@@ -16,8 +20,6 @@
 [Slf4JHttpLookupPostRequestCallback](https://github.com/getindata/flink-http-connector/blob/main/src/main/java/com/getindata/connectors/http/internal/table/lookup/Slf4JHttpLookupPostRequestCallback.java)
 is used instead.
 
-- Added support for caching of synchronous lookup joins.
-
 ## [0.13.0] - 2024-04-03
 
 ### Added

README.md

Lines changed: 17 additions & 16 deletions
@@ -424,12 +424,12 @@ is set to `'true'`, it will be used as header value as is, without any extra mod
 | url | required | The base URL that should be use for GET requests. For example _http://localhost:8080/client_ |
 | asyncPolling | optional | true/false - determines whether Async Polling should be used. Mechanism is based on Flink's Async I/O. |
 | lookup-method | optional | GET/POST/PUT (and any other) - determines what REST method should be used for lookup REST query. If not specified, `GET` method will be used. |
-| lookup.cache | optional | Enum possible values: `NONE`, `PARTIAL`. The cache strategy for the lookup table. Currently supports `NONE` (no caching) and `PARTIAL` (caching entries on lookup operation in external API). |
-| lookup.partial-cache.max-rows | optional | The max number of rows of lookup cache, over this value, the oldest rows will be expired. `lookup.cache` must be set to `PARTIAL` to use this option. See the following <a href="#lookup-cache">Lookup Cache</a> section for more details. |
-| lookup.partial-cache.expire-after-write | optional | The max time to live for each rows in lookup cache after writing into the cache. Specify as a [Duration](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/config/#duration). `lookup.cache` must be set to `PARTIAL` to use this option. See the following <a href="#lookup-cache">Lookup Cache</a> section for more details. |
-| lookup.partial-cache.expire-after-access | optional | The max time to live for each rows in lookup cache after accessing the entry in the cache. Specify as a [Duration](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/config/#duration). `lookup.cache` must be set to `PARTIAL` to use this option. See the following <a href="#lookup-cache">Lookup Cache</a> section for more details. |
-| lookup.partial-cache.cache-missing-key | optional | This is a boolean that defaults to true. Whether to store an empty value into the cache if the lookup key doesn't match any rows in the table. `lookup.cache` must be set to `PARTIAL` to use this option. See the following <a href="#lookup-cache">Lookup Cache</a> section for more details. |
-| lookup.max-retries | optional | The max retry times if the lookup failed; default is 3. See the following <a href="#lookup-cache">Lookup Cache</a> section for more details. |
+| lookup.cache | optional | Enum possible values: `NONE`, `PARTIAL`. The cache strategy for the lookup table. Currently supports `NONE` (no caching) and `PARTIAL` (caching entries on lookup operation in external API). |
+| lookup.partial-cache.max-rows | optional | The max number of rows of lookup cache, over this value, the oldest rows will be expired. `lookup.cache` must be set to `PARTIAL` to use this option. See the following <a href="#lookup-cache">Lookup Cache</a> section for more details. |
+| lookup.partial-cache.expire-after-write | optional | The max time to live for each rows in lookup cache after writing into the cache. Specify as a [Duration](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/config/#duration). `lookup.cache` must be set to `PARTIAL` to use this option. See the following <a href="#lookup-cache">Lookup Cache</a> section for more details. |
+| lookup.partial-cache.expire-after-access | optional | The max time to live for each rows in lookup cache after accessing the entry in the cache. Specify as a [Duration](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/config/#duration). `lookup.cache` must be set to `PARTIAL` to use this option. See the following <a href="#lookup-cache">Lookup Cache</a> section for more details. |
+| lookup.partial-cache.cache-missing-key | optional | This is a boolean that defaults to true. Whether to store an empty value into the cache if the lookup key doesn't match any rows in the table. `lookup.cache` must be set to `PARTIAL` to use this option. See the following <a href="#lookup-cache">Lookup Cache</a> section for more details. |
+| lookup.max-retries | optional | The max retry times if the lookup failed; default is 3. See the following <a href="#lookup-cache">Lookup Cache</a> section for more details. |
 | gid.connector.http.lookup.error.code | optional | List of HTTP status codes that should be treated as errors by HTTP Source, separated with comma. |
 | gid.connector.http.lookup.error.code.exclude | optional | List of HTTP status codes that should be excluded from the `gid.connector.http.lookup.error.code` list, separated with comma. |
 | gid.connector.http.security.cert.server | optional | Path to trusted HTTP server certificate that should be add to connectors key store. More than one path can be specified using `,` as path delimiter. |
@@ -469,18 +469,20 @@ is set to `'true'`, it will be used as header value as is, without any extra mod
 
 
 ## Lookup Cache
-The HTTP Client connector can be used in temporal join as a lookup source (also known as a dimension table).
+The HTTP Client connector can be used in lookup join as a lookup source (also known as a dimension table).
 
-By default, the lookup cache is not enabled. You can enable it by setting `lookup.cache` to `PARTIAL`. Caching is only enabled if `asyncPolling` = false.
-The scope of the cache is per job, so long running jobs can benefit from this caching.
+By default, the lookup cache is not enabled. You can enable it by setting `lookup.cache` to `PARTIAL`.
+The scope of the cache is per job, so long-running jobs can benefit from this caching.
 
-The lookup cache is used to improve the performance of temporal joins. By default, the lookup cache is not enabled, so all the API requests are sent on the network. When the lookup cache is enabled, Flink looks in the cache first, and only sends requests
-on the network when there is no cached value, then the cache is updated with the returned rows. The oldest rows in this cache are expired when the cache hits the max cached rows `lookup.partial-cache.max-rows` or when the row exceeds the max time to live specified by `lookup.partial-cache.expire-after-write` or `lookup.partial-cache.expire-after-access`.
-The cached rows might not be the latest, but users can tune expiration options to a smaller value to have fresher data, but this may increase the number of API requests sent. So this is a balance between throughput and correctness.
-A good use case for enabling this cache, is when the API responses are very slowly changing; for example master or reference data.
-There are many cases when caching is not appropriate, for example calling an API to get the latest stock price.
+The lookup cache is used to improve the performance of temporal joins. By default, the lookup cache is not enabled,
+so all the API requests are sent on the network. When the lookup cache is enabled, Flink looks in the cache first,
+and only sends requests on the network when there is no cached value, then the cache is updated with the returned rows.
+The oldest rows in this cache are expired when the cache hits the max cached rows `lookup.partial-cache.max-rows`
+or when the row exceeds the max time to live specified by `lookup.partial-cache.expire-after-write`
+or `lookup.partial-cache.expire-after-access`.
 
-By default, flink caches the empty query result for a Primary key. You can toggle this behaviour by setting `lookup.partial-cache.cache-missing-key` to false.
+By default, flink caches the empty query result for the primary key. You can toggle this behaviour by setting
+`lookup.partial-cache.cache-missing-key` to false.
 
 
 ## Build and deployment
@@ -568,7 +570,6 @@ The mapping from Http Json Response to SQL table schema is done via Flink's Json
 ## TODO
 
 ### HTTP TableLookup Source
-- Implement caches for async.
 - Think about Retry Policy for Http Request
 - Check other `//TODO`'s.
 
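For illustration, the `PARTIAL` cache behaviour described in the README's Lookup Cache section (look in the cache first, call the API only on a miss, drop the oldest rows past `lookup.partial-cache.max-rows`, expire rows after `lookup.partial-cache.expire-after-write`, and optionally cache missing keys) can be sketched in plain Java. This is a minimal sketch under stated assumptions, not Flink's or the connector's cache: `PartialLookupCacheSketch`, its fields, and the injected clock are hypothetical names, and `expire-after-access` is left out for brevity.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.LongSupplier;

// Hypothetical sketch of a PARTIAL lookup cache; not Flink's or the connector's code.
class PartialLookupCacheSketch<K, V> {

    // A cached lookup result; an empty Optional models a cached "missing key".
    private static final class CachedRow<T> {
        final Optional<T> value;
        final long writtenAtMillis;

        CachedRow(Optional<T> value, long writtenAtMillis) {
            this.value = value;
            this.writtenAtMillis = writtenAtMillis;
        }
    }

    private final long expireAfterWriteMillis;
    private final boolean cacheMissingKey;
    private final LongSupplier clock;
    private final Map<K, CachedRow<V>> rows;
    int backendCalls = 0; // counts requests that actually reach the API stand-in

    PartialLookupCacheSketch(
            final int maxRows, long expireAfterWriteMillis,
            boolean cacheMissingKey, LongSupplier clock) {
        this.expireAfterWriteMillis = expireAfterWriteMillis;
        this.cacheMissingKey = cacheMissingKey;
        this.clock = clock;
        // Insertion-ordered map that drops its eldest entry once maxRows is exceeded.
        this.rows = new LinkedHashMap<K, CachedRow<V>>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, CachedRow<V>> eldest) {
                return size() > maxRows;
            }
        };
    }

    Optional<V> lookup(K key, Function<K, Optional<V>> backend) {
        long now = clock.getAsLong();
        CachedRow<V> cached = rows.get(key);
        if (cached != null && now - cached.writtenAtMillis < expireAfterWriteMillis) {
            return cached.value; // cache hit: no request is sent
        }
        rows.remove(key); // expired or absent; re-inserting makes it the newest row
        backendCalls++;
        Optional<V> fetched = backend.apply(key);
        if (fetched.isPresent() || cacheMissingKey) {
            rows.put(key, new CachedRow<>(fetched, now));
        }
        return fetched;
    }

    public static void main(String[] args) {
        PartialLookupCacheSketch<String, String> cache =
            new PartialLookupCacheSketch<>(100, 60_000L, true, System::currentTimeMillis);
        Function<String, Optional<String>> api = key -> Optional.of("row-for-" + key);
        cache.lookup("42", api); // miss: calls the stand-in API
        cache.lookup("42", api); // hit: served from the cache
        System.out.println("API calls: " + cache.backendCalls);
    }
}
```

Injecting the clock keeps expiry deterministic in tests; a smaller expiry gives fresher rows at the cost of more API calls, which is the trade-off the removed README paragraph described.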

src/main/java/com/getindata/connectors/http/internal/table/lookup/AsyncHttpTableLookupFunction.java

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,14 @@
11
package com.getindata.connectors.http.internal.table.lookup;
22

33
import java.util.Collection;
4-
import java.util.Collections;
5-
import java.util.Optional;
64
import java.util.concurrent.CompletableFuture;
75
import java.util.concurrent.ExecutorService;
86
import java.util.concurrent.Executors;
97

108
import lombok.RequiredArgsConstructor;
119
import lombok.extern.slf4j.Slf4j;
1210
import org.apache.flink.table.data.RowData;
13-
import org.apache.flink.table.functions.AsyncTableFunction;
11+
import org.apache.flink.table.functions.AsyncLookupFunction;
1412
import org.apache.flink.table.functions.FunctionContext;
1513
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
1614

@@ -19,7 +17,7 @@
 
 @Slf4j
 @RequiredArgsConstructor
-public class AsyncHttpTableLookupFunction extends AsyncTableFunction<RowData> {
+public class AsyncHttpTableLookupFunction extends AsyncLookupFunction {
 
     private static final String PULLING_THREAD_POOL_SIZE = "8";
 
@@ -73,29 +71,27 @@ public void open(FunctionContext context) throws Exception {
         );
     }
 
-    public void eval(CompletableFuture<Collection<RowData>> resultFuture, Object... keys) {
-
-        CompletableFuture<Optional<RowData>> future = new CompletableFuture<>();
-        future.completeAsync(() -> decorate.lookupByKeys(keys), pullingThreadPool);
+    @Override
+    public CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow) {
+        CompletableFuture<Collection<RowData>> future = new CompletableFuture<>();
+        future.completeAsync(() -> decorate.lookup(keyRow), pullingThreadPool);
 
         // We don't want to use ForkJoinPool at all. We are using a different thread pool
         // for publishing here intentionally to avoid thread starvation.
+        CompletableFuture<Collection<RowData>> resultFuture = new CompletableFuture<>();
         future.whenCompleteAsync(
-            (optionalResult, throwable) -> {
+            (result, throwable) -> {
                 if (throwable != null) {
                     log.error("Exception while processing Http Async request", throwable);
                     resultFuture.completeExceptionally(
-                        new RuntimeException("Exception while processing Http Async request",
-                            throwable));
+                        new RuntimeException("Exception while processing Http Async request",
+                            throwable));
                 } else {
-                    if (optionalResult.isPresent()) {
-                        resultFuture.complete(Collections.singleton(optionalResult.get()));
-                    } else {
-                        resultFuture.complete(Collections.emptyList());
-                    }
+                    resultFuture.complete(result);
                 }
             },
             publishingThreadPool);
+        return resultFuture;
     }
 
     public LookupRow getLookupRow() {

src/main/java/com/getindata/connectors/http/internal/table/lookup/CachingHttpTableLookupFunction.java

Lines changed: 0 additions & 88 deletions
This file was deleted.
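
The `asyncLookup` change in `AsyncHttpTableLookupFunction` runs the blocking lookup on one thread pool and publishes the result from a second pool, returning the future instead of completing one passed in by the caller. A minimal, dependency-free sketch of that pattern is shown below. It does not use Flink's `AsyncLookupFunction`; `TwoPoolAsyncLookupSketch`, the pool sizes, the `String` row type, and the error message are stand-ins chosen for illustration, and it uses `supplyAsync` where the commit uses `completeAsync`.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

// Hypothetical stand-in for the two-pool pattern; rows are plain Strings here.
class TwoPoolAsyncLookupSketch {

    private final ExecutorService pullingPool = daemonPool(2);
    private final ExecutorService publishingPool = daemonPool(2);
    private final Function<String, Collection<String>> blockingLookup;

    TwoPoolAsyncLookupSketch(Function<String, Collection<String>> blockingLookup) {
        this.blockingLookup = blockingLookup;
    }

    // Daemon threads so an unclosed pool cannot keep the JVM alive.
    private static ExecutorService daemonPool(int size) {
        return Executors.newFixedThreadPool(size, runnable -> {
            Thread thread = new Thread(runnable);
            thread.setDaemon(true);
            return thread;
        });
    }

    CompletableFuture<Collection<String>> asyncLookup(String key) {
        // The blocking call runs on the pulling pool, never on the common ForkJoinPool.
        CompletableFuture<Collection<String>> future =
            CompletableFuture.supplyAsync(() -> blockingLookup.apply(key), pullingPool);

        // The result is handed over on a separate pool so slow lookups
        // cannot starve the threads that publish completed results.
        CompletableFuture<Collection<String>> resultFuture = new CompletableFuture<>();
        future.whenCompleteAsync(
            (result, throwable) -> {
                if (throwable != null) {
                    resultFuture.completeExceptionally(
                        new RuntimeException("Exception while processing lookup", throwable));
                } else {
                    resultFuture.complete(result);
                }
            },
            publishingPool);
        return resultFuture;
    }

    void close() {
        pullingPool.shutdown();
        publishingPool.shutdown();
    }

    public static void main(String[] args) {
        TwoPoolAsyncLookupSketch lookup =
            new TwoPoolAsyncLookupSketch(key -> Collections.singletonList("row-for-" + key));
        System.out.println(lookup.asyncLookup("42").join());
        lookup.close();
    }
}
```

Because the lookup already returns a collection, the completion callback can pass the result through unchanged, which is what lets the commit drop the `Optional` unwrapping branch.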
