
Commit c59c984

HTTP63 caching for sync lookups
Signed-off-by: David Radley <[email protected]>
1 parent a1e485e commit c59c984

File tree

10 files changed, +343 −48 lines changed

.github/workflows/build.yml

Lines changed: 1 addition & 1 deletion

@@ -18,7 +18,7 @@ jobs:
     runs-on: ubuntu-latest
     strategy:
       matrix:
-        flink: [ "1.16.3", "1.17.2", "1.18.1"]
+        flink: ["1.16.3", "1.17.2", "1.18.1", "1.19.0"]
     steps:
       - uses: actions/checkout@v3

CHANGELOG.md

Lines changed: 4 additions & 0 deletions

@@ -2,6 +2,10 @@
 
 ## [Unreleased]
 
+### Added
+
+- Added support for caching of synchronous lookup joins.
+
 ## [0.13.0] - 2024-04-03
 
 ### Added

README.md

Lines changed: 19 additions & 1 deletion

@@ -399,6 +399,10 @@ is set to `'true'`, it will be used as header value as is, without any extra mod
 | url | required | The base URL that should be use for GET requests. For example _http://localhost:8080/client_ |
 | asyncPolling | optional | true/false - determines whether Async Pooling should be used. Mechanism is based on Flink's Async I/O. |
 | lookup-method | optional | GET/POST/PUT (and any other) - determines what REST method should be used for lookup REST query. If not specified, `GET` method will be used. |
+| lookup.cache | optional | Enum of possible values: NONE, PARTIAL. The cache strategy for the lookup table. Currently supports NONE (no caching) and PARTIAL (caching the entries returned by lookups against the external API). |
+| lookup.partial-cache.max-rows | optional | The maximum number of rows in the lookup cache; beyond this value, the oldest rows expire. "lookup.cache" must be set to "PARTIAL" to use this option. See the Lookup Cache section below for more details. |
+| lookup.partial-cache.expire-after-write | optional | The maximum time to live for each row in the lookup cache after it is written into the cache. "lookup.cache" must be set to "PARTIAL" to use this option. See the Lookup Cache section below for more details. |
+| lookup.partial-cache.expire-after-access | optional | The maximum time to live for each row in the lookup cache after the entry is accessed in the cache. "lookup.cache" must be set to "PARTIAL" to use this option. See the Lookup Cache section below for more details. |
 | gid.connector.http.lookup.error.code | optional | List of HTTP status codes that should be treated as errors by HTTP Source, separated with comma. |
 | gid.connector.http.lookup.error.code.exclude | optional | List of HTTP status codes that should be excluded from the `gid.connector.http.lookup.error.code` list, separated with comma. |
 | gid.connector.http.security.cert.server | optional | Path to trusted HTTP server certificate that should be add to connectors key store. More than one path can be specified using `,` as path delimiter. |

@@ -410,6 +414,7 @@ is set to `'true'`, it will be used as header value as is, without any extra mod
 | gid.connector.http.source.lookup.response.thread-pool.size | optional | Sets the size of pool thread for HTTP lookup response processing. Increasing this value would mean that more concurrent requests can be processed in the same time. If not specified, the default value of 4 threads will be used. |
 | gid.connector.http.source.lookup.use-raw-authorization-header | optional | If set to `'true'`, uses the raw value set for the `Authorization` header, without transformation for Basic Authentication (base64, addition of "Basic " prefix). If not specified, defaults to `'false'`. |
 
+
 ### HTTP Sink
 | Option | Required | Description/Value |
 |---------------------------------------------------------|----------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

@@ -434,6 +439,19 @@ is set to `'true'`, it will be used as header value as is, without any extra mod
 | gid.connector.http.sink.writer.request.mode | optional | Sets Http Sink request submission mode. Two modes are available to select, `single` and `batch` which is the default mode if option is not specified. |
 | gid.connector.http.sink.request.batch.size | optional | Applicable only for `gid.connector.http.sink.writer.request.mode = batch`. Sets number of individual events/requests that will be submitted as one HTTP request by HTTP sink. The default value is 500 which is same as HTTP Sink `maxBatchSize` |
 
+
+## Lookup Cache
+The HTTP Client connector can be used in a temporal join as a lookup source (also known as a dimension table). Currently, only the synchronous lookup mode is supported.
+
+By default, the lookup cache is not enabled. You can enable it by setting `lookup.cache` to `PARTIAL`.
+
+The lookup cache is used to improve the performance of temporal joins with the HTTP Client connector. When the cache is disabled, all API requests are sent over the network. When the cache is enabled, each process (i.e. TaskManager) holds a cache. Flink looks up the cache first and only sends requests
+over the network on a cache miss, updating the cache with the rows returned. The oldest rows in the cache expire when the cache reaches the maximum number of cached rows (`lookup.partial-cache.max-rows`) or when a row exceeds the maximum time to live specified by `lookup.partial-cache.expire-after-write` or `lookup.partial-cache.expire-after-access`.
+The cached rows might not be the latest; users can tune the expiration options to smaller values for fresher data, but this may increase the number of requests sent to the endpoint, so this is a balance between throughput and correctness.
+
+By default, Flink caches the empty query result for a primary key; you can toggle this behaviour by setting `lookup.partial-cache.cache-missing-key` to `false`.
+
 ## Build and deployment
 To build the project locally you need to have `maven 3` and Java 11+. </br>

@@ -519,7 +537,7 @@ The mapping from Http Json Response to SQL table schema is done via Flink's Json
 ## TODO
 
 ### HTTP TableLookup Source
-- Implement caches.
+- Implement caches for async.
 - Think about Retry Policy for Http Request
 - Check other `//TODO`'s.
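The new lookup cache options above are set in the table DDL. A minimal sketch, assuming an illustrative table name and endpoint URL, and using the `rest-lookup` connector identifier and `json` format that the rest of this README uses:

```sql
-- Hypothetical dimension table; only the lookup.cache* options are the
-- subject of this change, the rest follows the connector's existing examples.
CREATE TABLE Customers (
    id STRING,
    name STRING
) WITH (
    'connector' = 'rest-lookup',
    'format' = 'json',
    'url' = 'http://localhost:8080/client',
    'lookup.cache' = 'PARTIAL',
    'lookup.partial-cache.max-rows' = '1000',
    'lookup.partial-cache.expire-after-write' = '10 min'
);
```

With `lookup.cache` left unset (or set to `NONE`), every lookup join probe results in an HTTP request, as before.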

pom.xml

Lines changed: 3 additions & 2 deletions

@@ -68,6 +68,7 @@ under the License.
 
     <!-- IMPORTANT: If you update Flink, remember to update link to its docs in maven-javadoc-plugin <links>
     section, omitting the patch part (so for 1.15.0 use 1.15). -->
+
     <flink.version>1.16.3</flink.version>
 
     <target.java.version>11</target.java.version>

@@ -290,13 +291,13 @@ under the License.
                     <target>${target.java.version}</target>
                 </configuration>
             </plugin>
-
+            <!-- argLine needed for Flink 1.16 and 1.17 or there are unit test errors -->
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-surefire-plugin</artifactId>
                 <version>3.0.0-M5</version>
                 <configuration>
-
+                    <argLine>--add-opens java.base/java.util=ALL-UNNAMED --add-opens java.base/java.lang=ALL-UNNAMED</argLine>
                 </configuration>
             </plugin>

src/main/java/com/getindata/connectors/http/internal/table/lookup/CachingHttpTableLookupFunction.java

Lines changed: 97 additions & 0 deletions (new file)

@@ -0,0 +1,97 @@
+package com.getindata.connectors.http.internal.table.lookup;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.LookupFunction;
+
+import com.getindata.connectors.http.internal.PollingClient;
+import com.getindata.connectors.http.internal.PollingClientFactory;
+import com.getindata.connectors.http.internal.utils.SerializationSchemaUtils;
+
+@Slf4j
+public class CachingHttpTableLookupFunction extends LookupFunction {
+    private final PollingClientFactory<RowData> pollingClientFactory;
+
+    private final DeserializationSchema<RowData> responseSchemaDecoder;
+
+    @VisibleForTesting
+    @Getter(AccessLevel.PACKAGE)
+    private final LookupRow lookupRow;
+
+    @VisibleForTesting
+    @Getter(AccessLevel.PACKAGE)
+    private final HttpLookupConfig options;
+
+    private transient AtomicInteger localHttpCallCounter;
+
+    private transient PollingClient<RowData> client;
+
+    private LookupCache cache;
+
+    public CachingHttpTableLookupFunction(
+            PollingClientFactory<RowData> pollingClientFactory,
+            DeserializationSchema<RowData> responseSchemaDecoder,
+            LookupRow lookupRow,
+            HttpLookupConfig options,
+            LookupCache cache) {
+
+        this.pollingClientFactory = pollingClientFactory;
+        this.responseSchemaDecoder = responseSchemaDecoder;
+        this.lookupRow = lookupRow;
+        this.options = options;
+        this.cache = cache;
+    }
+
+    public LookupCache getCache() {
+        return cache;
+    }
+
+    public void setCache(LookupCache cache) {
+        this.cache = cache;
+    }
+
+    @Override
+    public void open(FunctionContext context) throws Exception {
+        super.open(context);
+
+        this.responseSchemaDecoder.open(
+            SerializationSchemaUtils
+                .createDeserializationInitContext(CachingHttpTableLookupFunction.class));
+
+        this.localHttpCallCounter = new AtomicInteger(0);
+        this.client = pollingClientFactory
+            .createPollClient(options, responseSchemaDecoder);
+
+        context
+            .getMetricGroup()
+            .gauge("http-table-lookup-call-counter", () -> localHttpCallCounter.intValue());
+    }
+
+    /**
+     * Lookup method called by the Flink framework at runtime.
+     */
+    @Override
+    public Collection<RowData> lookup(RowData keyRow) throws IOException {
+        log.debug("lookup={}", lookupRow);
+        localHttpCallCounter.incrementAndGet();
+        Optional<RowData> rowData = client.pull(keyRow);
+        List<RowData> result = new ArrayList<>();
+        rowData.ifPresent(result::add);
+        log.debug("lookup result={}", result);
+        return result;
+    }
+}
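The `lookup` method above adapts the polling client's at-most-one-row result to the `Collection` contract of Flink's `LookupFunction`. A minimal, self-contained sketch of that adaptation (the class and method names here are illustrative, not part of the connector's API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class LookupAdaptationSketch {

    // The HTTP client returns at most one row per key (an Optional), while
    // LookupFunction#lookup must return a Collection: empty means "no match".
    static <T> List<T> toCollection(Optional<T> maybeRow) {
        List<T> result = new ArrayList<>();
        maybeRow.ifPresent(result::add);
        return result;
    }

    public static void main(String[] args) {
        System.out.println(toCollection(Optional.of("row-1"))); // [row-1]
        System.out.println(toCollection(Optional.empty()));     // []
    }
}
```

Returning an empty collection (rather than null) is what lets the partial cache record "missing key" results when `lookup.partial-cache.cache-missing-key` is enabled.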

src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSource.java

Lines changed: 40 additions & 13 deletions

@@ -13,9 +13,11 @@
 import org.apache.flink.table.connector.source.AsyncTableFunctionProvider;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.LookupTableSource;
-import org.apache.flink.table.connector.source.TableFunctionProvider;
 import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
 import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
+import org.apache.flink.table.connector.source.lookup.LookupFunctionProvider;
+import org.apache.flink.table.connector.source.lookup.PartialCachingLookupProvider;
+import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.factories.DynamicTableFactory;
 import org.apache.flink.table.factories.FactoryUtil;

@@ -46,17 +48,20 @@ public class HttpLookupTableSource
     private final DynamicTableFactory.Context dynamicTableFactoryContext;
 
     private final DecodingFormat<DeserializationSchema<RowData>> decodingFormat;
+    private final LookupCache cache;
 
     public HttpLookupTableSource(
         DataType physicalRowDataType,
         HttpLookupConfig lookupConfig,
         DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
-        DynamicTableFactory.Context dynamicTablecontext) {
+        DynamicTableFactory.Context dynamicTablecontext,
+        LookupCache cache) {
 
         this.physicalRowDataType = physicalRowDataType;
         this.lookupConfig = lookupConfig;
         this.decodingFormat = decodingFormat;
         this.dynamicTableFactoryContext = dynamicTablecontext;
+        this.cache = cache;
     }

@@ -66,6 +71,7 @@ public void applyProjection(int[][] projectedFields, DataType producedDataType)
 
     @Override
     public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContext) {
+        log.debug("getLookupRuntimeProvider Entry");
 
         LookupRow lookupRow = extractLookupRow(lookupContext.getKeys());

@@ -94,21 +100,41 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContex
         PollingClientFactory<RowData> pollingClientFactory =
             createPollingClientFactory(lookupQueryCreator, lookupConfig);
 
-        HttpTableLookupFunction dataLookupFunction =
-            new HttpTableLookupFunction(
-                pollingClientFactory,
-                responseSchemaDecoder,
-                lookupRow,
-                lookupConfig
-            );
+        // In line with the JDBC implementation and current requirements, we are only
+        // supporting Partial Caching for synchronous operations.
+        return getLookupRuntimeProvider(lookupRow, responseSchemaDecoder, pollingClientFactory);
+    }
 
+    protected LookupRuntimeProvider getLookupRuntimeProvider(LookupRow lookupRow,
+            DeserializationSchema<RowData> responseSchemaDecoder,
+            PollingClientFactory<RowData> pollingClientFactory) {
         if (lookupConfig.isUseAsync()) {
+            HttpTableLookupFunction dataLookupFunction =
+                new HttpTableLookupFunction(
+                    pollingClientFactory,
+                    responseSchemaDecoder,
+                    lookupRow,
+                    lookupConfig
+                );
             log.info("Using Async version of HttpLookupTable.");
             return AsyncTableFunctionProvider.of(
                 new AsyncHttpTableLookupFunction(dataLookupFunction));
         } else {
-            log.info("Using blocking version of HttpLookupTable.");
-            return TableFunctionProvider.of(dataLookupFunction);
+            CachingHttpTableLookupFunction dataLookupFunction =
+                new CachingHttpTableLookupFunction(
+                    pollingClientFactory,
+                    responseSchemaDecoder,
+                    lookupRow,
+                    lookupConfig,
+                    cache
+                );
+            if (cache != null) {
+                log.debug("PartialCachingLookupProvider; cache = {}", cache);
+                return PartialCachingLookupProvider.of(dataLookupFunction, cache);
+            } else {
+                log.debug("Using LookupFunctionProvider.");
+                return LookupFunctionProvider.of(dataLookupFunction);
+            }
         }
     }

@@ -118,7 +144,8 @@ public DynamicTableSource copy() {
             physicalRowDataType,
             lookupConfig,
             decodingFormat,
-            dynamicTableFactoryContext
+            dynamicTableFactoryContext,
+            cache
         );
     }
src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceFactory.java

Lines changed: 30 additions & 4 deletions

@@ -5,6 +5,7 @@
 import java.util.Set;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
+import javax.annotation.Nullable;
 
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.configuration.ConfigOption;

@@ -15,6 +16,9 @@
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.connector.format.DecodingFormat;
 import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.lookup.LookupOptions;
+import org.apache.flink.table.connector.source.lookup.cache.DefaultLookupCache;
+import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.factories.DeserializationFormatFactory;
 import org.apache.flink.table.factories.DynamicTableSourceFactory;

@@ -47,7 +51,7 @@ public DynamicTableSource createDynamicTableSource(Context dynamicTableContext)
         FactoryUtil.TableFactoryHelper helper =
             FactoryUtil.createTableFactoryHelper(this, dynamicTableContext);
 
-        ReadableConfig readableConfig = helper.getOptions();
+        ReadableConfig readable = helper.getOptions();
         helper.validateExcept(
             // properties coming from org.apache.flink.table.api.config.ExecutionConfigOptions
             "table.",

@@ -61,7 +65,7 @@ public DynamicTableSource createDynamicTableSource(Context dynamicTableContext)
             FactoryUtil.FORMAT
         );
 
-        HttpLookupConfig lookupConfig = getHttpLookupOptions(dynamicTableContext, readableConfig);
+        HttpLookupConfig lookupConfig = getHttpLookupOptions(dynamicTableContext, readable);
 
         ResolvedSchema resolvedSchema = dynamicTableContext.getCatalogTable().getResolvedSchema();

@@ -72,7 +76,8 @@ public DynamicTableSource createDynamicTableSource(Context dynamicTableContext)
             physicalRowDataType,
             lookupConfig,
             decodingFormat,
-            dynamicTableContext
+            dynamicTableContext,
+            getLookupCache(readable)
         );
     }

@@ -88,7 +93,16 @@ public Set<ConfigOption<?>> requiredOptions() {
 
     @Override
     public Set<ConfigOption<?>> optionalOptions() {
-        return Set.of(URL_ARGS, ASYNC_POLLING, LOOKUP_METHOD);
+        return Set.of(
+            URL_ARGS,
+            ASYNC_POLLING,
+            LOOKUP_METHOD,
+            LookupOptions.CACHE_TYPE,
+            LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS,
+            LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE,
+            LookupOptions.PARTIAL_CACHE_MAX_ROWS,
+            LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY,
+            LookupOptions.MAX_RETRIES);
     }
 
     private HttpLookupConfig getHttpLookupOptions(Context context, ReadableConfig readableConfig) {

@@ -105,6 +119,18 @@ private HttpLookupConfig getHttpLookupOptions(Context context, ReadableConfig re
             .build();
     }
 
+    @Nullable
+    private LookupCache getLookupCache(ReadableConfig tableOptions) {
+        LookupCache cache = null;
+        // Do not support legacy cache options
+        if (tableOptions
+            .get(LookupOptions.CACHE_TYPE)
+            .equals(LookupOptions.LookupCacheType.PARTIAL)) {
+            cache = DefaultLookupCache.fromConfig(tableOptions);
+        }
+        return cache;
+    }
+
     // TODO verify this since we are on 1.15 now.
     // Backport from Flink 1.15-Master
     private DataType toRowDataType(List<Column> columns, Predicate<Column> columnPredicate) {
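`DefaultLookupCache.fromConfig` builds Flink's bounded, TTL-aware cache from the table options. As a rough, self-contained illustration of the `lookup.partial-cache.max-rows` eviction semantics only (this is not Flink's implementation, and the class name is made up for the sketch), an access-ordered `LinkedHashMap` models "oldest rows expire first":

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PartialCacheSketch {

    // Returns a map that, once it holds more than maxRows entries,
    // silently drops the least recently used one.
    static <K, V> Map<K, V> boundedCache(int maxRows) {
        // accessOrder = true: get() moves an entry to the "youngest" end.
        return new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > maxRows;
            }
        };
    }

    public static void main(String[] args) {
        Map<String, String> cache = boundedCache(2);
        cache.put("k1", "v1");
        cache.put("k2", "v2");
        cache.get("k1");        // touch k1, so k2 becomes the eldest entry
        cache.put("k3", "v3");  // exceeds max-rows: k2 is evicted
        System.out.println(cache.keySet()); // [k1, k3]
    }
}
```

The real `DefaultLookupCache` additionally enforces the `expire-after-write` and `expire-after-access` TTLs, which this sketch omits.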
