3 changes: 3 additions & 0 deletions flink/v1.16/build.gradle
@@ -63,6 +63,7 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
// These artifacts are shaded and included in the orc-core fat jar
exclude group: 'com.google.protobuf', module: 'protobuf-java'
exclude group: 'org.apache.hive', module: 'hive-storage-api'
exclude group: 'org.slf4j'
}

testImplementation libs.flink116.connector.test.utils
@@ -111,6 +112,7 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
exclude group: 'com.tdunning', module: 'json'
exclude group: 'javax.transaction', module: 'transaction-api'
exclude group: 'com.zaxxer', module: 'HikariCP'
exclude group: 'org.slf4j'
}

testImplementation libs.awaitility
@@ -202,6 +204,7 @@ project(":iceberg-flink:iceberg-flink-runtime-${flinkMajorVersion}") {
exclude group: 'com.tdunning', module: 'json'
exclude group: 'javax.transaction', module: 'transaction-api'
exclude group: 'com.zaxxer', module: 'HikariCP'
exclude group: 'org.slf4j'
}

integrationImplementation("${libs.hive2.exec.get().module}:${libs.hive2.exec.get().getVersion()}:core") {
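The 'org.slf4j' excludes added above (and repeated for the v1.17 and v1.18 builds below) keep the SLF4J artifacts that ORC and Hive pull in transitively off the test and runtime classpaths, presumably to avoid conflicting 'org.slf4j' versions and duplicate logging bindings. A minimal, self-contained sketch of how duplicate SLF4J 1.x bindings can be detected; the class is illustrative and not part of this change:

import java.io.IOException;
import java.net.URL;
import java.util.Enumeration;

// Counts the SLF4J 1.x bindings visible on the classpath. More than one
// binding makes the chosen logging backend nondeterministic, which is the
// situation the excludes above avoid.
public class Slf4jBindingCheck {
  public static void main(String[] args) throws IOException {
    Enumeration<URL> bindings =
        Slf4jBindingCheck.class
            .getClassLoader()
            .getResources("org/slf4j/impl/StaticLoggerBinder.class");
    int count = 0;
    while (bindings.hasMoreElements()) {
      System.out.println("SLF4J binding: " + bindings.nextElement());
      count++;
    }
    System.out.println(count <= 1 ? "At most one binding found" : "Multiple SLF4J bindings found");
  }
}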
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java
@@ -40,11 +40,11 @@
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.InMemoryReporter;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -78,25 +78,23 @@
import org.junit.rules.TemporaryFolder;

public class TestIcebergSourceWithWatermarkExtractor implements Serializable {
private static final InMemoryReporter reporter = InMemoryReporter.createWithRetainedMetrics();
private static final int PARALLELISM = 4;
private static final String SOURCE_NAME = "IcebergSource";
private static final int RECORD_NUM_FOR_2_SPLITS = 200;
private static final ConcurrentMap<Long, Integer> windows = Maps.newConcurrentMap();

@ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

private final InMemoryReporter reporter = InMemoryReporter.createWithRetainedMetrics();

@Rule
public final MiniClusterWithClientResource miniClusterResource =
new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(PARALLELISM)
.setRpcServiceSharing(RpcServiceSharing.DEDICATED)
.setConfiguration(
reporter.addToConfiguration(
// disable classloader check as Avro may cache class in the serializers.
new Configuration().set(CoreOptions.CHECK_LEAKED_CLASSLOADER, false)))
.setConfiguration(reporter.addToConfiguration(new Configuration()))
.withHaLeadershipControl()
.build());

@@ -273,23 +271,40 @@ public void apply(
public void testThrottling() throws Exception {
GenericAppenderHelper dataAppender = appender();

// Generate records with the following pattern:
// - File 1 - Later records (Watermark 6000000)
// - Split 1 - 2 records (100, "file_1-recordTs_100"), (103, "file_1-recordTs_103")
// - File 2 - First records (Watermark 0)
// - Split 1 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),...
// - Split 2 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),...
List<Record> batch =
// Generate records in advance

// File 1 - Later records (Watermark 6.000.000 - 100 min)
// - Split 1 - 2 records (100, "file_1-recordTs_100"), (103, "file_1-recordTs_103")
List<Record> batch1 =
ImmutableList.of(
generateRecord(100, "file_1-recordTs_100"), generateRecord(103, "file_1-recordTs_103"));
dataAppender.appendToTable(batch);

batch = Lists.newArrayListWithCapacity(100);
// File 2 - First records (Watermark 0 - 0 min)
// - Split 1 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),...
// - Split 2 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),...
List<Record> batch2 = Lists.newArrayListWithCapacity(100);
for (int i = 0; i < RECORD_NUM_FOR_2_SPLITS; ++i) {
batch.add(generateRecord(4 - i % 5, "file_2-recordTs_" + i));
batch2.add(generateRecord(4 - i % 5, "file_2-recordTs_" + i));
}

dataAppender.appendToTable(batch);
// File 3 - Some records will be blocked (Watermark 900.000 - 15 min)
List<Record> batch3 =
ImmutableList.of(
generateRecord(15, "file_3-recordTs_15"),
generateRecord(16, "file_3-recordTs_16"),
generateRecord(17, "file_3-recordTs_17"));

// File 4 - Some records will be blocked (Watermark 900.000 - 15 min)
List<Record> batch4 =
ImmutableList.of(
generateRecord(15, "file_4-recordTs_15"),
generateRecord(16, "file_4-recordTs_16"),
generateRecord(17, "file_4-recordTs_17"));

// File 5 - Records which will remove the block (Watermark 5.400.000 - 90 min)
List<Record> batch5 =
ImmutableList.of(
generateRecord(90, "file_5-recordTs_90"), generateRecord(91, "file_5-recordTs_91"));
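// The watermark values quoted in the comments above are the files' minimum
// record timestamps converted from minutes to milliseconds:
// TimeUnit.MINUTES.toMillis(100) == 6_000_000 for file 1,
// TimeUnit.MINUTES.toMillis(15) == 900_000 for files 3 and 4, and
// TimeUnit.MINUTES.toMillis(90) == 5_400_000 for file 5.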

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
@@ -304,6 +319,11 @@ public void testThrottling() throws Exception {

try (CloseableIterator<RowData> resultIterator = stream.collectAsync()) {
JobClient jobClient = env.executeAsync("Iceberg Source Throttling Test");
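// Wait until every task is up and running before any data is appended, so no
// source reader can pick up a split while the job is still being deployed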
CommonTestUtils.waitForAllTaskRunning(
miniClusterResource.getMiniCluster(), jobClient.getJobID(), false);

// Insert the first data into the table
dataAppender.appendToTable(dataAppender.writeFile(batch1), dataAppender.writeFile(batch2));

// Check that we read the non-blocked data
// The first RECORD_NUM_FOR_2_SPLITS should be read
@@ -325,18 +345,7 @@

// Add some old records with 2 splits, so even if the blocked reader gets one split, the
// other reader gets one as well
List<Record> newBatch1 =
ImmutableList.of(
generateRecord(15, "file_3-recordTs_15"),
generateRecord(16, "file_3-recordTs_16"),
generateRecord(17, "file_3-recordTs_17"));
List<Record> newBatch2 =
ImmutableList.of(
generateRecord(15, "file_4-recordTs_15"),
generateRecord(16, "file_4-recordTs_16"),
generateRecord(17, "file_4-recordTs_17"));
dataAppender.appendToTable(
dataAppender.writeFile(newBatch1), dataAppender.writeFile(newBatch2));
dataAppender.appendToTable(dataAppender.writeFile(batch3), dataAppender.writeFile(batch4));
// The records received depend heavily on scheduling
// We get at least 3 records from the non-blocked reader
// We might get 1 record from the blocked reader (as part of the previous batch -
@@ -352,10 +361,7 @@
.until(() -> drift.getValue() == TimeUnit.MINUTES.toMillis(65));

// Add some new records which should unblock the throttled reader
batch =
ImmutableList.of(
generateRecord(90, "file_5-recordTs_90"), generateRecord(91, "file_5-recordTs_91"));
dataAppender.appendToTable(batch);
dataAppender.appendToTable(batch5);
// We should get all the records at this point
waitForRecords(resultIterator, 6);

3 changes: 3 additions & 0 deletions flink/v1.17/build.gradle
@@ -63,6 +63,7 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
// These artifacts are shaded and included in the orc-core fat jar
exclude group: 'com.google.protobuf', module: 'protobuf-java'
exclude group: 'org.apache.hive', module: 'hive-storage-api'
exclude group: 'org.slf4j'
}

testImplementation libs.flink117.connector.test.utils
@@ -111,6 +112,7 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
exclude group: 'com.tdunning', module: 'json'
exclude group: 'javax.transaction', module: 'transaction-api'
exclude group: 'com.zaxxer', module: 'HikariCP'
exclude group: 'org.slf4j'
}

testImplementation libs.awaitility
@@ -202,6 +204,7 @@ project(":iceberg-flink:iceberg-flink-runtime-${flinkMajorVersion}") {
exclude group: 'com.tdunning', module: 'json'
exclude group: 'javax.transaction', module: 'transaction-api'
exclude group: 'com.zaxxer', module: 'HikariCP'
exclude group: 'org.slf4j'
}

integrationImplementation("${libs.hive2.exec.get().module}:${libs.hive2.exec.get().getVersion()}:core") {
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java
@@ -40,10 +40,12 @@
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.InMemoryReporter;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -77,22 +79,25 @@
import org.junit.rules.TemporaryFolder;

public class TestIcebergSourceWithWatermarkExtractor implements Serializable {
private static final InMemoryReporter reporter = InMemoryReporter.createWithRetainedMetrics();
private static final int PARALLELISM = 4;
private static final String SOURCE_NAME = "IcebergSource";
private static final int RECORD_NUM_FOR_2_SPLITS = 200;
private static final ConcurrentMap<Long, Integer> windows = Maps.newConcurrentMap();

@ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

private final InMemoryReporter reporter = InMemoryReporter.createWithRetainedMetrics();

@Rule
public final MiniClusterWithClientResource miniClusterResource =
new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(PARALLELISM)
.setRpcServiceSharing(RpcServiceSharing.DEDICATED)
.setConfiguration(reporter.addToConfiguration(new Configuration()))
.setConfiguration(
reporter.addToConfiguration(
new Configuration().set(PipelineOptions.ALLOW_UNALIGNED_SOURCE_SPLITS, true)))
.withHaLeadershipControl()
.build());
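Compared with the v1.16 test above, this builder also sets PipelineOptions.ALLOW_UNALIGNED_SOURCE_SPLITS. As of Flink 1.17, watermark alignment works at the granularity of source splits, and a source whose readers cannot pause individual splits fails at runtime unless unaligned source splits are explicitly allowed. A minimal sketch of the same setting on its own; the option is Flink's, the snippet is illustrative:

// The configuration key behind this option is
// pipeline.watermark-alignment.allow-unaligned-source-splits
Configuration conf = new Configuration();
conf.set(PipelineOptions.ALLOW_UNALIGNED_SOURCE_SPLITS, true);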

@@ -269,23 +274,40 @@ public void apply(
public void testThrottling() throws Exception {
GenericAppenderHelper dataAppender = appender();

// Generate records with the following pattern:
// - File 1 - Later records (Watermark 6000000)
// - Split 1 - 2 records (100, "file_1-recordTs_100"), (103, "file_1-recordTs_103")
// - File 2 - First records (Watermark 0)
// - Split 1 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),...
// - Split 2 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),...
List<Record> batch =
// Generate records in advance

// File 1 - Later records (Watermark 6.000.000 - 100 min)
// - Split 1 - 2 records (100, "file_1-recordTs_100"), (103, "file_1-recordTs_103")
List<Record> batch1 =
ImmutableList.of(
generateRecord(100, "file_1-recordTs_100"), generateRecord(103, "file_1-recordTs_103"));
dataAppender.appendToTable(batch);

batch = Lists.newArrayListWithCapacity(100);
// File 2 - First records (Watermark 0 - 0 min)
// - Split 1 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),...
// - Split 2 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),...
List<Record> batch2 = Lists.newArrayListWithCapacity(100);
for (int i = 0; i < RECORD_NUM_FOR_2_SPLITS; ++i) {
batch.add(generateRecord(4 - i % 5, "file_2-recordTs_" + i));
batch2.add(generateRecord(4 - i % 5, "file_2-recordTs_" + i));
}

dataAppender.appendToTable(batch);
// File 3 - Some records will be blocked (Watermark 900.000 - 15 min)
List<Record> batch3 =
ImmutableList.of(
generateRecord(15, "file_3-recordTs_15"),
generateRecord(16, "file_3-recordTs_16"),
generateRecord(17, "file_3-recordTs_17"));

// File 4 - Some records will be blocked (Watermark 900.000 - 15 min)
List<Record> batch4 =
ImmutableList.of(
generateRecord(15, "file_4-recordTs_15"),
generateRecord(16, "file_4-recordTs_16"),
generateRecord(17, "file_4-recordTs_17"));

// File 5 - Records which will remove the block (Watermark 5.400.000 - 90 min)
List<Record> batch5 =
ImmutableList.of(
generateRecord(90, "file_5-recordTs_90"), generateRecord(91, "file_5-recordTs_91"));
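// The watermark values quoted in the comments above are the files' minimum
// record timestamps converted from minutes to milliseconds:
// TimeUnit.MINUTES.toMillis(100) == 6_000_000 for file 1,
// TimeUnit.MINUTES.toMillis(15) == 900_000 for files 3 and 4, and
// TimeUnit.MINUTES.toMillis(90) == 5_400_000 for file 5.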

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
@@ -300,6 +322,11 @@ public void testThrottling() throws Exception {

try (CloseableIterator<RowData> resultIterator = stream.collectAsync()) {
JobClient jobClient = env.executeAsync("Iceberg Source Throttling Test");
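// Wait until every task is up and running before any data is appended, so no
// source reader can pick up a split while the job is still being deployed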
CommonTestUtils.waitForAllTaskRunning(
miniClusterResource.getMiniCluster(), jobClient.getJobID(), false);

// Insert the first data into the table
dataAppender.appendToTable(dataAppender.writeFile(batch1), dataAppender.writeFile(batch2));

// Check that we read the non-blocked data
// The first RECORD_NUM_FOR_2_SPLITS should be read
@@ -321,18 +348,7 @@

// Add some old records with 2 splits, so even if the blocked reader gets one split, the
// other reader gets one as well
List<Record> newBatch1 =
ImmutableList.of(
generateRecord(15, "file_3-recordTs_15"),
generateRecord(16, "file_3-recordTs_16"),
generateRecord(17, "file_3-recordTs_17"));
List<Record> newBatch2 =
ImmutableList.of(
generateRecord(15, "file_4-recordTs_15"),
generateRecord(16, "file_4-recordTs_16"),
generateRecord(17, "file_4-recordTs_17"));
dataAppender.appendToTable(
dataAppender.writeFile(newBatch1), dataAppender.writeFile(newBatch2));
dataAppender.appendToTable(dataAppender.writeFile(batch3), dataAppender.writeFile(batch4));
// The records received depend heavily on scheduling
// We get at least 3 records from the non-blocked reader
// We might get 1 record from the blocked reader (as part of the previous batch -
@@ -348,10 +364,7 @@
.until(() -> drift.getValue() == TimeUnit.MINUTES.toMillis(65));

// Add some new records which should unblock the throttled reader
batch =
ImmutableList.of(
generateRecord(90, "file_5-recordTs_90"), generateRecord(91, "file_5-recordTs_91"));
dataAppender.appendToTable(batch);
dataAppender.appendToTable(batch5);
// We should get all the records at this point
waitForRecords(resultIterator, 6);

3 changes: 3 additions & 0 deletions flink/v1.18/build.gradle
@@ -63,6 +63,7 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
// These artifacts are shaded and included in the orc-core fat jar
exclude group: 'com.google.protobuf', module: 'protobuf-java'
exclude group: 'org.apache.hive', module: 'hive-storage-api'
exclude group: 'org.slf4j'
}

testImplementation libs.flink118.connector.test.utils
@@ -111,6 +112,7 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
exclude group: 'com.tdunning', module: 'json'
exclude group: 'javax.transaction', module: 'transaction-api'
exclude group: 'com.zaxxer', module: 'HikariCP'
exclude group: 'org.slf4j'
}

testImplementation libs.awaitility
@@ -202,6 +204,7 @@ project(":iceberg-flink:iceberg-flink-runtime-${flinkMajorVersion}") {
exclude group: 'com.tdunning', module: 'json'
exclude group: 'javax.transaction', module: 'transaction-api'
exclude group: 'com.zaxxer', module: 'HikariCP'
exclude group: 'org.slf4j'
}

integrationImplementation("${libs.hive2.exec.get().module}:${libs.hive2.exec.get().getVersion()}:core") {