Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## [Unreleased]

### Added

- Added support for caching of lookup joins.

### Fixed

- Fixed issue in the logging code of the `JavaNetHttpPollingClient` which prevents showing the status code and response body when the log level is configured at DEBUG (or lower) level.
Expand Down
62 changes: 43 additions & 19 deletions README.md

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ under the License.

<!-- IMPORTANT: If you update Flink, remember to update link to its docs in maven-javadoc-plugin <links>
section, omitting the patch part (so for 1.15.0 use 1.15). -->

<flink.version>1.16.3</flink.version>

<target.java.version>11</target.java.version>
Expand Down Expand Up @@ -296,7 +297,8 @@ under the License.
<artifactId>maven-surefire-plugin</artifactId>
<version>3.0.0-M5</version>
<configuration>

<!-- argLine needed for Flink 1.16 and 1.17 or there are unit test errors-->
<argLine>--add-opens java.base/java.util=ALL-UNNAMED --add-opens java.base/java.lang=ALL-UNNAMED</argLine>
</configuration>
</plugin>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
package com.getindata.connectors.http.internal.table.lookup;

import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.AsyncTableFunction;
import org.apache.flink.table.functions.AsyncLookupFunction;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;

Expand All @@ -19,7 +17,7 @@

@Slf4j
@RequiredArgsConstructor
public class AsyncHttpTableLookupFunction extends AsyncTableFunction<RowData> {
public class AsyncHttpTableLookupFunction extends AsyncLookupFunction {

private static final String PULLING_THREAD_POOL_SIZE = "8";

Expand Down Expand Up @@ -73,29 +71,27 @@ public void open(FunctionContext context) throws Exception {
);
}

public void eval(CompletableFuture<Collection<RowData>> resultFuture, Object... keys) {

CompletableFuture<Optional<RowData>> future = new CompletableFuture<>();
future.completeAsync(() -> decorate.lookupByKeys(keys), pullingThreadPool);
@Override
public CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow) {
CompletableFuture<Collection<RowData>> future = new CompletableFuture<>();
future.completeAsync(() -> decorate.lookup(keyRow), pullingThreadPool);

// We don't want to use ForkJoinPool at all. We are using a different thread pool
// for publishing here intentionally to avoid thread starvation.
CompletableFuture<Collection<RowData>> resultFuture = new CompletableFuture<>();
future.whenCompleteAsync(
(optionalResult, throwable) -> {
(result, throwable) -> {
if (throwable != null) {
log.error("Exception while processing Http Async request", throwable);
resultFuture.completeExceptionally(
new RuntimeException("Exception while processing Http Async request",
throwable));
} else {
if (optionalResult.isPresent()) {
resultFuture.complete(Collections.singleton(optionalResult.get()));
} else {
resultFuture.complete(Collections.emptyList());
}
resultFuture.complete(result);
}
},
publishingThreadPool);
return resultFuture;
}

public LookupRow getLookupRow() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nullable;

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.serialization.DeserializationSchema;
Expand All @@ -10,15 +11,19 @@
import org.apache.flink.table.api.DataTypes.Field;
import org.apache.flink.table.connector.Projection;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.AsyncTableFunctionProvider;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.TableFunctionProvider;
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.connector.source.lookup.AsyncLookupFunctionProvider ;
import org.apache.flink.table.connector.source.lookup.LookupFunctionProvider;
import org.apache.flink.table.connector.source.lookup.PartialCachingAsyncLookupProvider;
import org.apache.flink.table.connector.source.lookup.PartialCachingLookupProvider;
import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.functions.AsyncLookupFunction;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
Expand Down Expand Up @@ -46,17 +51,20 @@ public class HttpLookupTableSource
private final DynamicTableFactory.Context dynamicTableFactoryContext;

private final DecodingFormat<DeserializationSchema<RowData>> decodingFormat;
@Nullable
private final LookupCache cache;

public HttpLookupTableSource(
DataType physicalRowDataType,
HttpLookupConfig lookupConfig,
DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
DynamicTableFactory.Context dynamicTablecontext) {

DynamicTableFactory.Context dynamicTablecontext,
@Nullable LookupCache cache) {
this.physicalRowDataType = physicalRowDataType;
this.lookupConfig = lookupConfig;
this.decodingFormat = decodingFormat;
this.dynamicTableFactoryContext = dynamicTablecontext;
this.cache = cache;
}

@Override
Expand All @@ -66,6 +74,7 @@ public void applyProjection(int[][] projectedFields, DataType producedDataType)

@Override
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContext) {
log.debug("getLookupRuntimeProvider Entry");

LookupRow lookupRow = extractLookupRow(lookupContext.getKeys());

Expand Down Expand Up @@ -94,21 +103,38 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContex
PollingClientFactory<RowData> pollingClientFactory =
createPollingClientFactory(lookupQueryCreator, lookupConfig);

HttpTableLookupFunction dataLookupFunction =
new HttpTableLookupFunction(
pollingClientFactory,
responseSchemaDecoder,
lookupRow,
lookupConfig
);
return getLookupRuntimeProvider(lookupRow, responseSchemaDecoder, pollingClientFactory);
}

protected LookupRuntimeProvider getLookupRuntimeProvider(LookupRow lookupRow,
DeserializationSchema<RowData> responseSchemaDecoder,
PollingClientFactory<RowData> pollingClientFactory) {

HttpTableLookupFunction dataLookupFunction =
new HttpTableLookupFunction(
pollingClientFactory,
responseSchemaDecoder,
lookupRow,
lookupConfig
);
if (lookupConfig.isUseAsync()) {
log.info("Using Async version of HttpLookupTable.");
return AsyncTableFunctionProvider.of(
new AsyncHttpTableLookupFunction(dataLookupFunction));
AsyncLookupFunction asyncLookupFunction =
new AsyncHttpTableLookupFunction(dataLookupFunction);
if (cache != null) {
log.info("Using async version of HttpLookupTable with cache.");
return PartialCachingAsyncLookupProvider.of(asyncLookupFunction, cache);
} else {
log.info("Using async version of HttpLookupTable without cache.");
return AsyncLookupFunctionProvider.of(asyncLookupFunction);
}
} else {
log.info("Using blocking version of HttpLookupTable.");
return TableFunctionProvider.of(dataLookupFunction);
if (cache != null) {
log.info("Using blocking version of HttpLookupTable with cache.");
return PartialCachingLookupProvider.of(dataLookupFunction, cache);
} else {
log.info("Using blocking version of HttpLookupTable without cache.");
return LookupFunctionProvider.of(dataLookupFunction);
}
}
}

Expand All @@ -118,7 +144,8 @@ public DynamicTableSource copy() {
physicalRowDataType,
lookupConfig,
decodingFormat,
dynamicTableFactoryContext
dynamicTableFactoryContext,
cache
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import javax.annotation.Nullable;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.ConfigOption;
Expand All @@ -15,6 +16,9 @@
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.lookup.LookupOptions;
import org.apache.flink.table.connector.source.lookup.cache.DefaultLookupCache;
import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
Expand Down Expand Up @@ -48,7 +52,7 @@ public DynamicTableSource createDynamicTableSource(Context dynamicTableContext)
FactoryUtil.TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(this, dynamicTableContext);

ReadableConfig readableConfig = helper.getOptions();
ReadableConfig readable = helper.getOptions();
helper.validateExcept(
// properties coming from org.apache.flink.table.api.config.ExecutionConfigOptions
"table.",
Expand All @@ -62,7 +66,7 @@ public DynamicTableSource createDynamicTableSource(Context dynamicTableContext)
FactoryUtil.FORMAT
);

HttpLookupConfig lookupConfig = getHttpLookupOptions(dynamicTableContext, readableConfig);
HttpLookupConfig lookupConfig = getHttpLookupOptions(dynamicTableContext, readable);

ResolvedSchema resolvedSchema = dynamicTableContext.getCatalogTable().getResolvedSchema();

Expand All @@ -73,7 +77,8 @@ public DynamicTableSource createDynamicTableSource(Context dynamicTableContext)
physicalRowDataType,
lookupConfig,
decodingFormat,
dynamicTableContext
dynamicTableContext,
getLookupCache(readable)
);
}

Expand All @@ -89,7 +94,18 @@ public Set<ConfigOption<?>> requiredOptions() {

@Override
public Set<ConfigOption<?>> optionalOptions() {
return Set.of(URL_ARGS, ASYNC_POLLING, LOOKUP_METHOD, REQUEST_CALLBACK_IDENTIFIER);

return Set.of(
URL_ARGS,
ASYNC_POLLING,
LOOKUP_METHOD,
REQUEST_CALLBACK_IDENTIFIER,
LookupOptions.CACHE_TYPE,
LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS,
LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE,
LookupOptions.PARTIAL_CACHE_MAX_ROWS,
LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY,
LookupOptions.MAX_RETRIES);
}

private HttpLookupConfig getHttpLookupOptions(Context context, ReadableConfig readableConfig) {
Expand All @@ -115,6 +131,18 @@ private HttpLookupConfig getHttpLookupOptions(Context context, ReadableConfig re
.build();
}

@Nullable
private LookupCache getLookupCache(ReadableConfig tableOptions) {
LookupCache cache = null;
// Do not support legacy cache options
if (tableOptions
.get(LookupOptions.CACHE_TYPE)
.equals(LookupOptions.LookupCacheType.PARTIAL)) {
cache = DefaultLookupCache.fromConfig(tableOptions);
}
return cache;
}

// TODO verify this since we are on 1.15 now.
// Backport from Flink 1.15-Master
private DataType toRowDataType(List<Column> columns, Predicate<Column> columnPredicate) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.getindata.connectors.http.internal.table.lookup;

import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -8,17 +10,16 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.functions.LookupFunction;

import com.getindata.connectors.http.internal.PollingClient;
import com.getindata.connectors.http.internal.PollingClientFactory;
import com.getindata.connectors.http.internal.utils.SerializationSchemaUtils;

@Slf4j
public class HttpTableLookupFunction extends TableFunction<RowData> {
public class HttpTableLookupFunction extends LookupFunction {

private final PollingClientFactory<RowData> pollingClientFactory;

Expand Down Expand Up @@ -50,32 +51,22 @@ public HttpTableLookupFunction(

@Override
public void open(FunctionContext context) throws Exception {
super.open(context);

this.responseSchemaDecoder.open(
SerializationSchemaUtils
.createDeserializationInitContext(HttpTableLookupFunction.class));

this.localHttpCallCounter = new AtomicInteger(0);
this.client = pollingClientFactory
.createPollClient(options, responseSchemaDecoder);

context
.getMetricGroup()
.gauge("http-table-lookup-call-counter", () -> localHttpCallCounter.intValue());
}
.createPollClient(options, responseSchemaDecoder);

/**
* This is a lookup method which is called by Flink framework in a runtime.
*/
public void eval(Object... keys) {
lookupByKeys(keys)
.ifPresent(this::collect);
context.getMetricGroup()
.gauge("http-table-lookup-call-counter", () -> localHttpCallCounter.intValue());
}

public Optional<RowData> lookupByKeys(Object[] keys) {
RowData rowData = GenericRowData.of(keys);
@Override
public Collection<RowData> lookup(RowData keyRow) {
localHttpCallCounter.incrementAndGet();
return client.pull(rowData);
Optional<RowData> result = client.pull(keyRow);
return result.map(Collections::singletonList).orElse(Collections.emptyList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public JavaNetHttpPollingClient(
@Override
public Optional<RowData> pull(RowData lookupRow) {
try {
log.debug("Optional<RowData> pull with Rowdata={}.", lookupRow);
return queryAndProcess(lookupRow);
} catch (Exception e) {
log.error("Exception during HTTP request.", e);
Expand Down
Loading