diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java
index 85a5a4abf29c..2c08a3486e7c 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.flink.sink.dynamic;
 
+import java.time.Clock;
 import java.util.Map;
 import java.util.Set;
 import org.apache.flink.annotation.Internal;
@@ -50,13 +51,25 @@ class TableMetadataCache {
 
   private final Catalog catalog;
   private final long refreshMs;
+  private final Clock cacheRefreshClock;
   private final int inputSchemasPerTableCacheMaximumSize;
   private final Map<TableIdentifier, CacheItem> tableCache;
 
   TableMetadataCache(
       Catalog catalog, int maximumSize, long refreshMs, int inputSchemasPerTableCacheMaximumSize) {
+    this(catalog, maximumSize, refreshMs, inputSchemasPerTableCacheMaximumSize, Clock.systemUTC());
+  }
+
+  @VisibleForTesting
+  TableMetadataCache(
+      Catalog catalog,
+      int maximumSize,
+      long refreshMs,
+      int inputSchemasPerTableCacheMaximumSize,
+      Clock cacheRefreshClock) {
     this.catalog = catalog;
     this.refreshMs = refreshMs;
+    this.cacheRefreshClock = cacheRefreshClock;
     this.inputSchemasPerTableCacheMaximumSize = inputSchemasPerTableCacheMaximumSize;
     this.tableCache = new LRUCache<>(maximumSize);
   }
@@ -88,6 +101,7 @@ void update(TableIdentifier identifier, Table table) {
     tableCache.put(
         identifier,
         new CacheItem(
+            cacheRefreshClock.millis(),
             true,
             table.refs().keySet(),
             table.schemas(),
@@ -186,14 +200,16 @@ private Tuple2<Boolean, Exception> refreshTable(TableIdentifier identifier) {
       return EXISTS;
     } catch (NoSuchTableException e) {
       LOG.debug("Table doesn't exist {}", identifier, e);
-      tableCache.put(identifier, new CacheItem(false, null, null, null, 1));
+      tableCache.put(
+          identifier, new CacheItem(cacheRefreshClock.millis(), false, null, null, null, 1));
       return Tuple2.of(false, e);
     }
   }
 
   private boolean needsRefresh(CacheItem cacheItem, boolean allowRefresh) {
     return allowRefresh
-        && (cacheItem == null || cacheItem.created + refreshMs > System.currentTimeMillis());
+        && (cacheItem == null
+            || cacheRefreshClock.millis() - cacheItem.createdTimestampMillis > refreshMs);
   }
 
   public void invalidate(TableIdentifier identifier) {
@@ -202,8 +218,7 @@ public void invalidate(TableIdentifier identifier) {
 
   /** Handles timeout for missing items only. Caffeine performance causes noticeable delays. */
   static class CacheItem {
-    private final long created = System.currentTimeMillis();
-
+    private final long createdTimestampMillis;
     private final boolean tableExists;
     private final Set<String> branches;
     private final Map<Integer, Schema> tableSchemas;
@@ -211,11 +226,13 @@ static class CacheItem {
     private final Map<Schema, ResolvedSchemaInfo> inputSchemas;
 
     private CacheItem(
+        long createdTimestampMillis,
         boolean tableExists,
         Set<String> branches,
         Map<Integer, Schema> tableSchemas,
         Map<Integer, PartitionSpec> specs,
         int inputSchemaCacheMaximumSize) {
+      this.createdTimestampMillis = createdTimestampMillis;
       this.tableExists = tableExists;
       this.branches = branches;
       this.tableSchemas = tableSchemas;
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java
index 2264cc3a8db0..ff039bc998b5 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java
@@ -20,9 +20,13 @@
 
 import static org.assertj.core.api.Assertions.assertThat;
 
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZoneId;
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.flink.sink.TestFlinkIcebergSinkBase;
@@ -91,4 +95,33 @@ void testCachingDisabled() {
 
     assertThat(cache.getInternalCache()).isEmpty();
   }
+
+  @Test
+  void testNoCacheRefreshingBeforeRefreshIntervalElapses() {
+    // Create table
+    Catalog catalog = CATALOG_EXTENSION.catalog();
+    TableIdentifier tableIdentifier = TableIdentifier.parse("default.myTable");
+    Table table = catalog.createTable(tableIdentifier, SCHEMA2);
+
+    // Init cache
+    TableMetadataCache cache =
+        new TableMetadataCache(
+            catalog, 10, 100L, 10, Clock.fixed(Instant.now(), ZoneId.systemDefault()));
+    cache.update(tableIdentifier, table);
+
+    // Cache schema
+    Schema schema = cache.schema(tableIdentifier, SCHEMA2).resolvedTableSchema();
+    assertThat(schema.sameSchema(SCHEMA2)).isTrue();
+
+    // Cache schema with fewer fields
+    TableMetadataCache.ResolvedSchemaInfo schemaInfo = cache.schema(tableIdentifier, SCHEMA);
+    assertThat(schemaInfo.resolvedTableSchema().sameSchema(SCHEMA2)).isTrue();
+    assertThat(schemaInfo.compareResult())
+        .isEqualTo(CompareSchemasVisitor.Result.DATA_CONVERSION_NEEDED);
+
+    // Assert both schemas are in cache
+    TableMetadataCache.CacheItem cacheItem = cache.getInternalCache().get(tableIdentifier);
+    assertThat(cacheItem).isNotNull();
+    assertThat(cacheItem.inputSchemas()).containsKeys(SCHEMA, SCHEMA2);
+  }
 }
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java
index 85a5a4abf29c..2c08a3486e7c 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.flink.sink.dynamic;
 
+import java.time.Clock;
 import java.util.Map;
 import java.util.Set;
 import org.apache.flink.annotation.Internal;
@@ -50,13 +51,25 @@ class TableMetadataCache {
 
   private final Catalog catalog;
   private final long refreshMs;
+  private final Clock cacheRefreshClock;
   private final int inputSchemasPerTableCacheMaximumSize;
   private final Map<TableIdentifier, CacheItem> tableCache;
 
   TableMetadataCache(
       Catalog catalog, int maximumSize, long refreshMs, int inputSchemasPerTableCacheMaximumSize) {
+    this(catalog, maximumSize, refreshMs, inputSchemasPerTableCacheMaximumSize, Clock.systemUTC());
+  }
+
+  @VisibleForTesting
+  TableMetadataCache(
+      Catalog catalog,
+      int maximumSize,
+      long refreshMs,
+      int inputSchemasPerTableCacheMaximumSize,
+      Clock cacheRefreshClock) {
     this.catalog = catalog;
     this.refreshMs = refreshMs;
+    this.cacheRefreshClock = cacheRefreshClock;
     this.inputSchemasPerTableCacheMaximumSize = inputSchemasPerTableCacheMaximumSize;
     this.tableCache = new LRUCache<>(maximumSize);
   }
@@ -88,6 +101,7 @@ void update(TableIdentifier identifier, Table table) {
     tableCache.put(
         identifier,
         new CacheItem(
+            cacheRefreshClock.millis(),
             true,
             table.refs().keySet(),
             table.schemas(),
@@ -186,14 +200,16 @@ private Tuple2<Boolean, Exception> refreshTable(TableIdentifier identifier) {
       return EXISTS;
     } catch (NoSuchTableException e) {
       LOG.debug("Table doesn't exist {}", identifier, e);
-      tableCache.put(identifier, new CacheItem(false, null, null, null, 1));
+      tableCache.put(
+          identifier, new CacheItem(cacheRefreshClock.millis(), false, null, null, null, 1));
       return Tuple2.of(false, e);
     }
   }
 
   private boolean needsRefresh(CacheItem cacheItem, boolean allowRefresh) {
     return allowRefresh
-        && (cacheItem == null || cacheItem.created + refreshMs > System.currentTimeMillis());
+        && (cacheItem == null
+            || cacheRefreshClock.millis() - cacheItem.createdTimestampMillis > refreshMs);
   }
 
   public void invalidate(TableIdentifier identifier) {
@@ -202,8 +218,7 @@ public void invalidate(TableIdentifier identifier) {
 
   /** Handles timeout for missing items only. Caffeine performance causes noticeable delays. */
   static class CacheItem {
-    private final long created = System.currentTimeMillis();
-
+    private final long createdTimestampMillis;
     private final boolean tableExists;
     private final Set<String> branches;
     private final Map<Integer, Schema> tableSchemas;
@@ -211,11 +226,13 @@ static class CacheItem {
     private final Map<Schema, ResolvedSchemaInfo> inputSchemas;
 
     private CacheItem(
+        long createdTimestampMillis,
         boolean tableExists,
         Set<String> branches,
         Map<Integer, Schema> tableSchemas,
         Map<Integer, PartitionSpec> specs,
         int inputSchemaCacheMaximumSize) {
+      this.createdTimestampMillis = createdTimestampMillis;
       this.tableExists = tableExists;
       this.branches = branches;
       this.tableSchemas = tableSchemas;
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java
index 2264cc3a8db0..ff039bc998b5 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java
@@ -20,9 +20,13 @@
 
 import static org.assertj.core.api.Assertions.assertThat;
 
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZoneId;
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.flink.sink.TestFlinkIcebergSinkBase;
@@ -91,4 +95,33 @@ void testCachingDisabled() {
 
     assertThat(cache.getInternalCache()).isEmpty();
   }
+
+  @Test
+  void testNoCacheRefreshingBeforeRefreshIntervalElapses() {
+    // Create table
+    Catalog catalog = CATALOG_EXTENSION.catalog();
+    TableIdentifier tableIdentifier = TableIdentifier.parse("default.myTable");
+    Table table = catalog.createTable(tableIdentifier, SCHEMA2);
+
+    // Init cache
+    TableMetadataCache cache =
+        new TableMetadataCache(
+            catalog, 10, 100L, 10, Clock.fixed(Instant.now(), ZoneId.systemDefault()));
+    cache.update(tableIdentifier, table);
+
+    // Cache schema
+    Schema schema = cache.schema(tableIdentifier, SCHEMA2).resolvedTableSchema();
+    assertThat(schema.sameSchema(SCHEMA2)).isTrue();
+
+    // Cache schema with fewer fields
+    TableMetadataCache.ResolvedSchemaInfo schemaInfo = cache.schema(tableIdentifier, SCHEMA);
+    assertThat(schemaInfo.resolvedTableSchema().sameSchema(SCHEMA2)).isTrue();
+    assertThat(schemaInfo.compareResult())
+        .isEqualTo(CompareSchemasVisitor.Result.DATA_CONVERSION_NEEDED);
+
+    // Assert both schemas are in cache
+    TableMetadataCache.CacheItem cacheItem = cache.getInternalCache().get(tableIdentifier);
+    assertThat(cacheItem).isNotNull();
+    assertThat(cacheItem.inputSchemas()).containsKeys(SCHEMA, SCHEMA2);
+  }
 }
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java
index 85a5a4abf29c..2c08a3486e7c 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.flink.sink.dynamic;
 
+import java.time.Clock;
 import java.util.Map;
 import java.util.Set;
 import org.apache.flink.annotation.Internal;
@@ -50,13 +51,25 @@ class TableMetadataCache {
 
   private final Catalog catalog;
   private final long refreshMs;
+  private final Clock cacheRefreshClock;
   private final int inputSchemasPerTableCacheMaximumSize;
   private final Map<TableIdentifier, CacheItem> tableCache;
 
   TableMetadataCache(
       Catalog catalog, int maximumSize, long refreshMs, int inputSchemasPerTableCacheMaximumSize) {
+    this(catalog, maximumSize, refreshMs, inputSchemasPerTableCacheMaximumSize, Clock.systemUTC());
+  }
+
+  @VisibleForTesting
+  TableMetadataCache(
+      Catalog catalog,
+      int maximumSize,
+      long refreshMs,
+      int inputSchemasPerTableCacheMaximumSize,
+      Clock cacheRefreshClock) {
     this.catalog = catalog;
     this.refreshMs = refreshMs;
+    this.cacheRefreshClock = cacheRefreshClock;
     this.inputSchemasPerTableCacheMaximumSize = inputSchemasPerTableCacheMaximumSize;
     this.tableCache = new LRUCache<>(maximumSize);
   }
@@ -88,6 +101,7 @@ void update(TableIdentifier identifier, Table table) {
     tableCache.put(
         identifier,
         new CacheItem(
+            cacheRefreshClock.millis(),
             true,
             table.refs().keySet(),
             table.schemas(),
@@ -186,14 +200,16 @@ private Tuple2<Boolean, Exception> refreshTable(TableIdentifier identifier) {
       return EXISTS;
     } catch (NoSuchTableException e) {
       LOG.debug("Table doesn't exist {}", identifier, e);
-      tableCache.put(identifier, new CacheItem(false, null, null, null, 1));
+      tableCache.put(
+          identifier, new CacheItem(cacheRefreshClock.millis(), false, null, null, null, 1));
      return Tuple2.of(false, e);
     }
   }
 
   private boolean needsRefresh(CacheItem cacheItem, boolean allowRefresh) {
     return allowRefresh
-        && (cacheItem == null || cacheItem.created + refreshMs > System.currentTimeMillis());
+        && (cacheItem == null
+            || cacheRefreshClock.millis() - cacheItem.createdTimestampMillis > refreshMs);
   }
 
   public void invalidate(TableIdentifier identifier) {
@@ -202,8 +218,7 @@ public void invalidate(TableIdentifier identifier) {
 
   /** Handles timeout for missing items only. Caffeine performance causes noticeable delays. */
   static class CacheItem {
-    private final long created = System.currentTimeMillis();
-
+    private final long createdTimestampMillis;
     private final boolean tableExists;
     private final Set<String> branches;
     private final Map<Integer, Schema> tableSchemas;
@@ -211,11 +226,13 @@ static class CacheItem {
     private final Map<Schema, ResolvedSchemaInfo> inputSchemas;
 
     private CacheItem(
+        long createdTimestampMillis,
        boolean tableExists,
         Set<String> branches,
         Map<Integer, Schema> tableSchemas,
         Map<Integer, PartitionSpec> specs,
         int inputSchemaCacheMaximumSize) {
+      this.createdTimestampMillis = createdTimestampMillis;
       this.tableExists = tableExists;
       this.branches = branches;
       this.tableSchemas = tableSchemas;
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java
index 2264cc3a8db0..ff039bc998b5 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java
@@ -20,9 +20,13 @@
 
 import static org.assertj.core.api.Assertions.assertThat;
 
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZoneId;
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.flink.sink.TestFlinkIcebergSinkBase;
@@ -91,4 +95,33 @@ void testCachingDisabled() {
 
     assertThat(cache.getInternalCache()).isEmpty();
   }
+
+  @Test
+  void testNoCacheRefreshingBeforeRefreshIntervalElapses() {
+    // Create table
+    Catalog catalog = CATALOG_EXTENSION.catalog();
+    TableIdentifier tableIdentifier = TableIdentifier.parse("default.myTable");
+    Table table = catalog.createTable(tableIdentifier, SCHEMA2);
+
+    // Init cache
+    TableMetadataCache cache =
+        new TableMetadataCache(
+            catalog, 10, 100L, 10, Clock.fixed(Instant.now(), ZoneId.systemDefault()));
+    cache.update(tableIdentifier, table);
+
+    // Cache schema
+    Schema schema = cache.schema(tableIdentifier, SCHEMA2).resolvedTableSchema();
+    assertThat(schema.sameSchema(SCHEMA2)).isTrue();
+
+    // Cache schema with fewer fields
+    TableMetadataCache.ResolvedSchemaInfo schemaInfo = cache.schema(tableIdentifier, SCHEMA);
+    assertThat(schemaInfo.resolvedTableSchema().sameSchema(SCHEMA2)).isTrue();
+    assertThat(schemaInfo.compareResult())
+        .isEqualTo(CompareSchemasVisitor.Result.DATA_CONVERSION_NEEDED);
+
+    // Assert both schemas are in cache
+    TableMetadataCache.CacheItem cacheItem = cache.getInternalCache().get(tableIdentifier);
+    assertThat(cacheItem).isNotNull();
+    assertThat(cacheItem.inputSchemas()).containsKeys(SCHEMA, SCHEMA2);
+  }
 }
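
Note (not part of the patch): the change applied identically to all three Flink versions combines two ideas. First, a java.time.Clock is injected (Clock.systemUTC() in production, Clock.fixed(...) in the new test) so the staleness check is deterministic under test. Second, the check itself is corrected: the old expression created + refreshMs > System.currentTimeMillis() was true exactly while the entry was still within the refresh interval, so refreshes fired while the item was fresh and stopped once it aged; the new expression treats an entry as stale only after the elapsed time exceeds refreshMs. A minimal standalone sketch of the pattern, with hypothetical names (CacheEntry, ManualClock) that do not appear in this diff:

// Illustrative sketch only; CacheEntry and ManualClock are hypothetical,
// not classes from this change.
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;

class CacheEntry {
  private final Clock clock;
  private final long refreshMs;
  private final long createdTimestampMillis;

  CacheEntry(Clock clock, long refreshMs) {
    this.clock = clock;
    this.refreshMs = refreshMs;
    // Creation time comes from the injected clock, mirroring update() above.
    this.createdTimestampMillis = clock.millis();
  }

  boolean needsRefresh() {
    // Stale only once the refresh interval has elapsed since creation.
    return clock.millis() - createdTimestampMillis > refreshMs;
  }
}

// A manually advanced Clock, useful when a test needs time to move forward;
// Clock.fixed(...), used in the new test, never advances.
class ManualClock extends Clock {
  private Instant now;

  ManualClock(Instant start) {
    this.now = start;
  }

  void advanceMillis(long millis) {
    this.now = now.plusMillis(millis);
  }

  @Override
  public Instant instant() {
    return now;
  }

  @Override
  public ZoneId getZone() {
    return ZoneOffset.UTC;
  }

  @Override
  public Clock withZone(ZoneId zone) {
    return Clock.fixed(now, zone);
  }
}

With a ManualClock and refreshMs = 100, needsRefresh() stays false until advanceMillis(101) pushes the elapsed time past the interval; with a fixed clock, as in testNoCacheRefreshingBeforeRefreshIntervalElapses, elapsed time is always zero, which is what lets the test assert that the cached input schemas survive untouched before the refresh interval elapses.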