diff --git a/api/src/test/java/org/apache/iceberg/AssertHelpers.java b/api/src/test/java/org/apache/iceberg/AssertHelpers.java index 4c4daedc580d..d63c5d3d1133 100644 --- a/api/src/test/java/org/apache/iceberg/AssertHelpers.java +++ b/api/src/test/java/org/apache/iceberg/AssertHelpers.java @@ -155,4 +155,18 @@ public static void assertEmptyAvroField(GenericRecord record, String field) { AvroRuntimeException.class, () -> record.get(field)); } + + /** + * Same as {@link AssertHelpers#assertThrowsCause}, but this method compares root cause. + */ + public static void assertThrowsRootCause(String message, + Class expected, + String containedInMessage, + Runnable runnable) { + Assertions.assertThatThrownBy(runnable::run) + .as(message) + .getRootCause() + .isInstanceOf(expected) + .hasMessageContaining(containedInMessage); + } } diff --git a/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java b/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java index 7ab23c53a4b7..509121fa9c71 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java +++ b/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java @@ -28,7 +28,6 @@ import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.runtime.util.HadoopUtils; import org.apache.flink.table.catalog.Catalog; -import org.apache.flink.table.descriptors.CatalogDescriptorValidator; import org.apache.flink.table.factories.CatalogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -70,6 +69,9 @@ public class FlinkCatalogFactory implements CatalogFactory { public static final String BASE_NAMESPACE = "base-namespace"; public static final String CACHE_ENABLED = "cache-enabled"; + public static final String TYPE = "type"; + public static final String PROPERTY_VERSION = "property-version"; + /** * Create an Iceberg {@link org.apache.iceberg.catalog.Catalog} loader to be used by this Flink catalog adapter. * @@ -104,8 +106,8 @@ static CatalogLoader createCatalogLoader(String name, Map proper @Override public Map requiredContext() { Map context = Maps.newHashMap(); - context.put(CatalogDescriptorValidator.CATALOG_TYPE, "iceberg"); - context.put(CatalogDescriptorValidator.CATALOG_PROPERTY_VERSION, "1"); + context.put(TYPE, "iceberg"); + context.put(PROPERTY_VERSION, "1"); return context; } diff --git a/flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory index be65d974fd5e..29a9955a7e20 100644 --- a/flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory +++ b/flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -13,4 +13,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -org.apache.iceberg.flink.FlinkDynamicTableFactory \ No newline at end of file +org.apache.iceberg.flink.FlinkDynamicTableFactory diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java index 20e8721eb5de..7b552fa9ecb3 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java +++ b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java @@ -273,7 +273,7 @@ public void testDowngradeTableToFormatV1ThroughTablePropertyFails() throws Excep Assert.assertEquals("should create table using format v2", 2, ops.refresh().formatVersion()); - AssertHelpers.assertThrowsCause("should fail to downgrade to v1", + AssertHelpers.assertThrowsRootCause("should fail to downgrade to v1", IllegalArgumentException.class, "Cannot downgrade v2 table to v1", () -> sql("ALTER TABLE tl SET('format-version'='1')")); diff --git a/versions.props b/versions.props index a488fd1ad503..cbfc92082d88 100644 --- a/versions.props +++ b/versions.props @@ -1,7 +1,7 @@ org.slf4j:* = 1.7.25 org.apache.avro:avro = 1.10.1 org.apache.calcite:* = 1.10.0 -org.apache.flink:* = 1.12.1 +org.apache.flink:* = 1.13.2 org.apache.hadoop:* = 2.7.3 org.apache.hive:hive-metastore = 2.3.8 org.apache.hive:hive-serde = 2.3.8