Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.iceberg.flink.sink.dynamic;

import java.time.Clock;
import java.util.Map;
import java.util.Set;
import org.apache.flink.annotation.Internal;
Expand Down Expand Up @@ -50,13 +51,25 @@ class TableMetadataCache {

private final Catalog catalog;
private final long refreshMs;
private final Clock cacheRefreshClock;
private final int inputSchemasPerTableCacheMaximumSize;
private final Map<TableIdentifier, CacheItem> tableCache;

/**
 * Creates a cache that uses the system UTC clock to timestamp entries and decide staleness.
 *
 * @param catalog catalog used to load table metadata when an entry is refreshed
 * @param maximumSize maximum number of tables kept in the LRU table cache
 * @param refreshMs minimum interval in milliseconds before a cached entry may be refreshed
 * @param inputSchemasPerTableCacheMaximumSize maximum size of the per-table input-schema cache
 */
TableMetadataCache(
Catalog catalog, int maximumSize, long refreshMs, int inputSchemasPerTableCacheMaximumSize) {
this(catalog, maximumSize, refreshMs, inputSchemasPerTableCacheMaximumSize, Clock.systemUTC());
}

/**
 * Testing constructor that accepts an explicit {@link Clock} so tests can freeze or advance time
 * and thereby control when cached entries are considered stale.
 *
 * @param catalog catalog used to load table metadata when an entry is refreshed
 * @param maximumSize maximum number of tables kept in the LRU table cache
 * @param refreshMs minimum interval in milliseconds before a cached entry may be refreshed
 * @param inputSchemasPerTableCacheMaximumSize maximum size of the per-table input-schema cache
 * @param cacheRefreshClock clock used to timestamp cache entries and to test staleness
 */
@VisibleForTesting
TableMetadataCache(
Catalog catalog,
int maximumSize,
long refreshMs,
int inputSchemasPerTableCacheMaximumSize,
Clock cacheRefreshClock) {
this.catalog = catalog;
this.refreshMs = refreshMs;
this.cacheRefreshClock = cacheRefreshClock;
this.inputSchemasPerTableCacheMaximumSize = inputSchemasPerTableCacheMaximumSize;
this.tableCache = new LRUCache<>(maximumSize);
}
Expand Down Expand Up @@ -88,6 +101,7 @@ void update(TableIdentifier identifier, Table table) {
tableCache.put(
identifier,
new CacheItem(
cacheRefreshClock.millis(),
true,
table.refs().keySet(),
table.schemas(),
Expand Down Expand Up @@ -186,14 +200,16 @@ private Tuple2<Boolean, Exception> refreshTable(TableIdentifier identifier) {
return EXISTS;
} catch (NoSuchTableException e) {
LOG.debug("Table doesn't exist {}", identifier, e);
tableCache.put(identifier, new CacheItem(false, null, null, null, 1));
tableCache.put(
identifier, new CacheItem(cacheRefreshClock.millis(), false, null, null, null, 1));
return Tuple2.of(false, e);
}
}

private boolean needsRefresh(CacheItem cacheItem, boolean allowRefresh) {
return allowRefresh
&& (cacheItem == null || cacheItem.created + refreshMs > System.currentTimeMillis());
&& (cacheItem == null
|| cacheRefreshClock.millis() - cacheItem.createdTimestampMillis > refreshMs);
}

public void invalidate(TableIdentifier identifier) {
Expand All @@ -202,20 +218,21 @@ public void invalidate(TableIdentifier identifier) {

/** Handles timeout for missing items only. Caffeine performance causes noticeable delays. */
static class CacheItem {
private final long created = System.currentTimeMillis();

private final long createdTimestampMillis;
private final boolean tableExists;
private final Set<String> branches;
private final Map<Integer, Schema> tableSchemas;
private final Map<Integer, PartitionSpec> specs;
private final Map<Schema, ResolvedSchemaInfo> inputSchemas;

private CacheItem(
long createdTimestampMillis,
boolean tableExists,
Set<String> branches,
Map<Integer, Schema> tableSchemas,
Map<Integer, PartitionSpec> specs,
int inputSchemaCacheMaximumSize) {
this.createdTimestampMillis = createdTimestampMillis;
this.tableExists = tableExists;
this.branches = branches;
this.tableSchemas = tableSchemas;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,13 @@

import static org.assertj.core.api.Assertions.assertThat;

import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.sink.TestFlinkIcebergSinkBase;
Expand Down Expand Up @@ -91,4 +95,33 @@ void testCachingDisabled() {

assertThat(cache.getInternalCache()).isEmpty();
}

@Test
void testNoCacheRefreshingBeforeRefreshIntervalElapses() {
  // Create the backing table in the catalog.
  Catalog catalog = CATALOG_EXTENSION.catalog();
  TableIdentifier identifier = TableIdentifier.parse("default.myTable");
  Table table = catalog.createTable(identifier, SCHEMA2);

  // A fixed clock guarantees the 100 ms refresh interval can never elapse during the test.
  Clock frozenClock = Clock.fixed(Instant.now(), ZoneId.systemDefault());
  TableMetadataCache cache = new TableMetadataCache(catalog, 10, 100L, 10, frozenClock);
  cache.update(identifier, table);

  // Resolving the exact table schema should return it unchanged.
  Schema resolved = cache.schema(identifier, SCHEMA2).resolvedTableSchema();
  assertThat(resolved.sameSchema(SCHEMA2)).isTrue();

  // A narrower input schema resolves to the table schema, flagged as needing data conversion.
  TableMetadataCache.ResolvedSchemaInfo narrowerInfo = cache.schema(identifier, SCHEMA);
  assertThat(narrowerInfo.resolvedTableSchema().sameSchema(SCHEMA2)).isTrue();
  assertThat(narrowerInfo.compareResult())
      .isEqualTo(CompareSchemasVisitor.Result.DATA_CONVERSION_NEEDED);

  // Both input schemas must still be present in the cache entry, i.e. no refresh discarded them.
  TableMetadataCache.CacheItem item = cache.getInternalCache().get(identifier);
  assertThat(item).isNotNull();
  assertThat(item.inputSchemas()).containsKeys(SCHEMA, SCHEMA2);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.iceberg.flink.sink.dynamic;

import java.time.Clock;
import java.util.Map;
import java.util.Set;
import org.apache.flink.annotation.Internal;
Expand Down Expand Up @@ -50,13 +51,25 @@ class TableMetadataCache {

private final Catalog catalog;
private final long refreshMs;
private final Clock cacheRefreshClock;
private final int inputSchemasPerTableCacheMaximumSize;
private final Map<TableIdentifier, CacheItem> tableCache;

/**
 * Creates a cache that uses the system UTC clock to timestamp entries and decide staleness.
 *
 * @param catalog catalog used to load table metadata when an entry is refreshed
 * @param maximumSize maximum number of tables kept in the LRU table cache
 * @param refreshMs minimum interval in milliseconds before a cached entry may be refreshed
 * @param inputSchemasPerTableCacheMaximumSize maximum size of the per-table input-schema cache
 */
TableMetadataCache(
Catalog catalog, int maximumSize, long refreshMs, int inputSchemasPerTableCacheMaximumSize) {
this(catalog, maximumSize, refreshMs, inputSchemasPerTableCacheMaximumSize, Clock.systemUTC());
}

/**
 * Testing constructor that accepts an explicit {@link Clock} so tests can freeze or advance time
 * and thereby control when cached entries are considered stale.
 *
 * @param catalog catalog used to load table metadata when an entry is refreshed
 * @param maximumSize maximum number of tables kept in the LRU table cache
 * @param refreshMs minimum interval in milliseconds before a cached entry may be refreshed
 * @param inputSchemasPerTableCacheMaximumSize maximum size of the per-table input-schema cache
 * @param cacheRefreshClock clock used to timestamp cache entries and to test staleness
 */
@VisibleForTesting
TableMetadataCache(
Catalog catalog,
int maximumSize,
long refreshMs,
int inputSchemasPerTableCacheMaximumSize,
Clock cacheRefreshClock) {
this.catalog = catalog;
this.refreshMs = refreshMs;
this.cacheRefreshClock = cacheRefreshClock;
this.inputSchemasPerTableCacheMaximumSize = inputSchemasPerTableCacheMaximumSize;
this.tableCache = new LRUCache<>(maximumSize);
}
Expand Down Expand Up @@ -88,6 +101,7 @@ void update(TableIdentifier identifier, Table table) {
tableCache.put(
identifier,
new CacheItem(
cacheRefreshClock.millis(),
true,
table.refs().keySet(),
table.schemas(),
Expand Down Expand Up @@ -186,14 +200,16 @@ private Tuple2<Boolean, Exception> refreshTable(TableIdentifier identifier) {
return EXISTS;
} catch (NoSuchTableException e) {
LOG.debug("Table doesn't exist {}", identifier, e);
tableCache.put(identifier, new CacheItem(false, null, null, null, 1));
tableCache.put(
identifier, new CacheItem(cacheRefreshClock.millis(), false, null, null, null, 1));
return Tuple2.of(false, e);
}
}

private boolean needsRefresh(CacheItem cacheItem, boolean allowRefresh) {
return allowRefresh
&& (cacheItem == null || cacheItem.created + refreshMs > System.currentTimeMillis());
&& (cacheItem == null
|| cacheRefreshClock.millis() - cacheItem.createdTimestampMillis > refreshMs);
}

public void invalidate(TableIdentifier identifier) {
Expand All @@ -202,20 +218,21 @@ public void invalidate(TableIdentifier identifier) {

/** Handles timeout for missing items only. Caffeine performance causes noticeable delays. */
static class CacheItem {
private final long created = System.currentTimeMillis();

private final long createdTimestampMillis;
private final boolean tableExists;
private final Set<String> branches;
private final Map<Integer, Schema> tableSchemas;
private final Map<Integer, PartitionSpec> specs;
private final Map<Schema, ResolvedSchemaInfo> inputSchemas;

private CacheItem(
long createdTimestampMillis,
boolean tableExists,
Set<String> branches,
Map<Integer, Schema> tableSchemas,
Map<Integer, PartitionSpec> specs,
int inputSchemaCacheMaximumSize) {
this.createdTimestampMillis = createdTimestampMillis;
this.tableExists = tableExists;
this.branches = branches;
this.tableSchemas = tableSchemas;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,13 @@

import static org.assertj.core.api.Assertions.assertThat;

import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.sink.TestFlinkIcebergSinkBase;
Expand Down Expand Up @@ -91,4 +95,33 @@ void testCachingDisabled() {

assertThat(cache.getInternalCache()).isEmpty();
}

@Test
void testNoCacheRefreshingBeforeRefreshIntervalElapses() {
  // Create the backing table in the catalog.
  Catalog catalog = CATALOG_EXTENSION.catalog();
  TableIdentifier identifier = TableIdentifier.parse("default.myTable");
  Table table = catalog.createTable(identifier, SCHEMA2);

  // A fixed clock guarantees the 100 ms refresh interval can never elapse during the test.
  Clock frozenClock = Clock.fixed(Instant.now(), ZoneId.systemDefault());
  TableMetadataCache cache = new TableMetadataCache(catalog, 10, 100L, 10, frozenClock);
  cache.update(identifier, table);

  // Resolving the exact table schema should return it unchanged.
  Schema resolved = cache.schema(identifier, SCHEMA2).resolvedTableSchema();
  assertThat(resolved.sameSchema(SCHEMA2)).isTrue();

  // A narrower input schema resolves to the table schema, flagged as needing data conversion.
  TableMetadataCache.ResolvedSchemaInfo narrowerInfo = cache.schema(identifier, SCHEMA);
  assertThat(narrowerInfo.resolvedTableSchema().sameSchema(SCHEMA2)).isTrue();
  assertThat(narrowerInfo.compareResult())
      .isEqualTo(CompareSchemasVisitor.Result.DATA_CONVERSION_NEEDED);

  // Both input schemas must still be present in the cache entry, i.e. no refresh discarded them.
  TableMetadataCache.CacheItem item = cache.getInternalCache().get(identifier);
  assertThat(item).isNotNull();
  assertThat(item.inputSchemas()).containsKeys(SCHEMA, SCHEMA2);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.iceberg.flink.sink.dynamic;

import java.time.Clock;
import java.util.Map;
import java.util.Set;
import org.apache.flink.annotation.Internal;
Expand Down Expand Up @@ -50,13 +51,25 @@ class TableMetadataCache {

private final Catalog catalog;
private final long refreshMs;
private final Clock cacheRefreshClock;
private final int inputSchemasPerTableCacheMaximumSize;
private final Map<TableIdentifier, CacheItem> tableCache;

/**
 * Creates a cache that uses the system UTC clock to timestamp entries and decide staleness.
 *
 * @param catalog catalog used to load table metadata when an entry is refreshed
 * @param maximumSize maximum number of tables kept in the LRU table cache
 * @param refreshMs minimum interval in milliseconds before a cached entry may be refreshed
 * @param inputSchemasPerTableCacheMaximumSize maximum size of the per-table input-schema cache
 */
TableMetadataCache(
Catalog catalog, int maximumSize, long refreshMs, int inputSchemasPerTableCacheMaximumSize) {
this(catalog, maximumSize, refreshMs, inputSchemasPerTableCacheMaximumSize, Clock.systemUTC());
}

/**
 * Testing constructor that accepts an explicit {@link Clock} so tests can freeze or advance time
 * and thereby control when cached entries are considered stale.
 *
 * @param catalog catalog used to load table metadata when an entry is refreshed
 * @param maximumSize maximum number of tables kept in the LRU table cache
 * @param refreshMs minimum interval in milliseconds before a cached entry may be refreshed
 * @param inputSchemasPerTableCacheMaximumSize maximum size of the per-table input-schema cache
 * @param cacheRefreshClock clock used to timestamp cache entries and to test staleness
 */
@VisibleForTesting
TableMetadataCache(
Catalog catalog,
int maximumSize,
long refreshMs,
int inputSchemasPerTableCacheMaximumSize,
Clock cacheRefreshClock) {
this.catalog = catalog;
this.refreshMs = refreshMs;
this.cacheRefreshClock = cacheRefreshClock;
this.inputSchemasPerTableCacheMaximumSize = inputSchemasPerTableCacheMaximumSize;
this.tableCache = new LRUCache<>(maximumSize);
}
Expand Down Expand Up @@ -88,6 +101,7 @@ void update(TableIdentifier identifier, Table table) {
tableCache.put(
identifier,
new CacheItem(
cacheRefreshClock.millis(),
true,
table.refs().keySet(),
table.schemas(),
Expand Down Expand Up @@ -186,14 +200,16 @@ private Tuple2<Boolean, Exception> refreshTable(TableIdentifier identifier) {
return EXISTS;
} catch (NoSuchTableException e) {
LOG.debug("Table doesn't exist {}", identifier, e);
tableCache.put(identifier, new CacheItem(false, null, null, null, 1));
tableCache.put(
identifier, new CacheItem(cacheRefreshClock.millis(), false, null, null, null, 1));
return Tuple2.of(false, e);
}
}

private boolean needsRefresh(CacheItem cacheItem, boolean allowRefresh) {
return allowRefresh
&& (cacheItem == null || cacheItem.created + refreshMs > System.currentTimeMillis());
&& (cacheItem == null
|| cacheRefreshClock.millis() - cacheItem.createdTimestampMillis > refreshMs);
}

public void invalidate(TableIdentifier identifier) {
Expand All @@ -202,20 +218,21 @@ public void invalidate(TableIdentifier identifier) {

/** Handles timeout for missing items only. Caffeine performance causes noticeable delays. */
static class CacheItem {
private final long created = System.currentTimeMillis();

private final long createdTimestampMillis;
private final boolean tableExists;
private final Set<String> branches;
private final Map<Integer, Schema> tableSchemas;
private final Map<Integer, PartitionSpec> specs;
private final Map<Schema, ResolvedSchemaInfo> inputSchemas;

private CacheItem(
long createdTimestampMillis,
boolean tableExists,
Set<String> branches,
Map<Integer, Schema> tableSchemas,
Map<Integer, PartitionSpec> specs,
int inputSchemaCacheMaximumSize) {
this.createdTimestampMillis = createdTimestampMillis;
this.tableExists = tableExists;
this.branches = branches;
this.tableSchemas = tableSchemas;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,13 @@

import static org.assertj.core.api.Assertions.assertThat;

import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.sink.TestFlinkIcebergSinkBase;
Expand Down Expand Up @@ -91,4 +95,33 @@ void testCachingDisabled() {

assertThat(cache.getInternalCache()).isEmpty();
}

@Test
void testNoCacheRefreshingBeforeRefreshIntervalElapses() {
  // Create the backing table in the catalog.
  Catalog catalog = CATALOG_EXTENSION.catalog();
  TableIdentifier identifier = TableIdentifier.parse("default.myTable");
  Table table = catalog.createTable(identifier, SCHEMA2);

  // A fixed clock guarantees the 100 ms refresh interval can never elapse during the test.
  Clock frozenClock = Clock.fixed(Instant.now(), ZoneId.systemDefault());
  TableMetadataCache cache = new TableMetadataCache(catalog, 10, 100L, 10, frozenClock);
  cache.update(identifier, table);

  // Resolving the exact table schema should return it unchanged.
  Schema resolved = cache.schema(identifier, SCHEMA2).resolvedTableSchema();
  assertThat(resolved.sameSchema(SCHEMA2)).isTrue();

  // A narrower input schema resolves to the table schema, flagged as needing data conversion.
  TableMetadataCache.ResolvedSchemaInfo narrowerInfo = cache.schema(identifier, SCHEMA);
  assertThat(narrowerInfo.resolvedTableSchema().sameSchema(SCHEMA2)).isTrue();
  assertThat(narrowerInfo.compareResult())
      .isEqualTo(CompareSchemasVisitor.Result.DATA_CONVERSION_NEEDED);

  // Both input schemas must still be present in the cache entry, i.e. no refresh discarded them.
  TableMetadataCache.CacheItem item = cache.getInternalCache().get(identifier);
  assertThat(item).isNotNull();
  assertThat(item.inputSchemas()).containsKeys(SCHEMA, SCHEMA2);
}
}