diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
index 336b2ae03e50..ba2082ea59f6 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
@@ -226,6 +226,9 @@ public DataStream<RowData> build() {
       if (!context.isStreaming()) {
         int parallelism = inferParallelism(format, context);
+        if (env.getMaxParallelism() > 0) {
+          parallelism = Math.min(parallelism, env.getMaxParallelism());
+        }
         return env.createInput(format, typeInfo).setParallelism(parallelism);
       } else {
         StreamingMonitorFunction function = new StreamingMonitorFunction(tableLoader, context);
 
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java
index 0d9e7bff8977..fd570ff12445 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java
@@ -23,6 +23,7 @@
 import java.util.List;
 import java.util.Map;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableResult;
@@ -179,6 +180,31 @@ public void testInferedParallelism() throws IOException {
     Assert.assertEquals("Should produce the expected parallelism.", 1, parallelism);
   }
 
+  @Test
+  public void testInferParallelismWithGlobalSetting() throws IOException {
+    Configuration cfg = tEnv.getConfig().getConfiguration();
+    cfg.set(PipelineOptions.MAX_PARALLELISM, 1);
+
+    Table table = catalog.createTable(TableIdentifier.of("default", "t"), TestFixtures.SCHEMA, null);
+
+    GenericAppenderHelper helper = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER);
+    List<Record> expectedRecords = Lists.newArrayList();
+    long maxFileLen = 0;
+    for (int i = 0; i < 5; i++) {
+      List<Record> records = RandomGenericData.generate(TestFixtures.SCHEMA, 2, i);
+      DataFile dataFile = helper.writeFile(null, records);
+      helper.appendToTable(dataFile);
+      expectedRecords.addAll(records);
+      maxFileLen = Math.max(dataFile.fileSizeInBytes(), maxFileLen);
+    }
+
+    // Make sure to generate multiple CombinedScanTasks
+    sql("ALTER TABLE t SET ('read.split.open-file-cost'='1', 'read.split.target-size'='%s')", maxFileLen);
+
+    List<Row> results = run(null, Maps.newHashMap(), "", "*");
+    org.apache.iceberg.flink.TestHelpers.assertRecords(results, expectedRecords, TestFixtures.SCHEMA);
+  }
+
   @Test
   public void testExposeLocality() throws Exception {
     Table table =
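
Note on the main change: without the new guard, inferParallelism can return a value (roughly one subtask per scan split) larger than the configured pipeline.max-parallelism, and Flink rejects a job whose operator parallelism exceeds its max parallelism. The new test reproduces this by forcing multiple splits against a max parallelism of 1. Below is a minimal, self-contained sketch of the clamping behavior; the standalone class, the inferredParallelism placeholder, and the direct setMaxParallelism call are illustrative assumptions, not code from this patch:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MaxParallelismClampSketch {
  public static void main(String[] args) {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Stands in for the global setting the test applies through
    // PipelineOptions.MAX_PARALLELISM ("pipeline.max-parallelism").
    env.setMaxParallelism(1);

    // Hypothetical stand-in for FlinkSource#inferParallelism(format, context),
    // which derives its value from the number of scan splits.
    int inferredParallelism = 5;

    // The patch's guard: getMaxParallelism() returns -1 when no maximum is
    // configured, so the clamp only applies to a positive, user-set ceiling.
    int parallelism = inferredParallelism;
    if (env.getMaxParallelism() > 0) {
      parallelism = Math.min(parallelism, env.getMaxParallelism());
    }

    System.out.println(parallelism); // prints 1: the global cap wins over the inferred value
  }
}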