diff --git a/flink/v1.15/build.gradle b/flink/v1.15/build.gradle
index 46572696f90b..5d04cde8b6fe 100644
--- a/flink/v1.15/build.gradle
+++ b/flink/v1.15/build.gradle
@@ -61,6 +61,7 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
       exclude group: 'org.apache.hive', module: 'hive-storage-api'
     }
 
+    testImplementation "org.apache.flink:flink-connector-test-utils:${flinkVersion}"
     testImplementation "org.apache.flink:flink-core:${flinkVersion}"
     testImplementation "org.apache.flink:flink-runtime:${flinkVersion}"
     testImplementation ("org.apache.flink:flink-test-utils-junit:${flinkVersion}") {
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/AbstractIcebergEnumerator.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/AbstractIcebergEnumerator.java
new file mode 100644
index 000000000000..98c5fa3eded5
--- /dev/null
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/AbstractIcebergEnumerator.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.enumerator;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nullable;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.iceberg.flink.source.assigner.GetSplitResult;
+import org.apache.iceberg.flink.source.assigner.SplitAssigner;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.SplitRequestEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * TODO: publish enumerator monitor metrics like number of pending splits after FLINK-21000 is resolved
+ */
+abstract class AbstractIcebergEnumerator
+    implements SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState> {
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractIcebergEnumerator.class);
+
+  private final SplitEnumeratorContext<IcebergSourceSplit> enumeratorContext;
+  private final SplitAssigner assigner;
+  private final Map<Integer, String> readersAwaitingSplit;
+  private final AtomicReference<CompletableFuture<Void>> availableFuture;
+
+  AbstractIcebergEnumerator(
+      SplitEnumeratorContext<IcebergSourceSplit> enumeratorContext,
+      SplitAssigner assigner) {
+    this.enumeratorContext = enumeratorContext;
+    this.assigner = assigner;
+    this.readersAwaitingSplit = new LinkedHashMap<>();
+    this.availableFuture = new AtomicReference<>();
+  }
+
+  @Override
+  public void start() {
+    assigner.start();
+  }
+
+  @Override
+  public void close() throws IOException {
+    assigner.close();
+  }
+
+  @Override
+  public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
+    // Iceberg source uses a custom split request event to piggyback finished split ids.
+    throw new UnsupportedOperationException(String.format("Received invalid default split request event " +
+        "from subtask %d as Iceberg source uses custom split request event", subtaskId));
+  }
+
+  @Override
+  public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+    if (sourceEvent instanceof SplitRequestEvent) {
+      SplitRequestEvent splitRequestEvent = (SplitRequestEvent) sourceEvent;
+      LOG.info("Received split request event from subtask {}", subtaskId);
+      assigner.onCompletedSplits(splitRequestEvent.finishedSplitIds());
+      readersAwaitingSplit.put(subtaskId, splitRequestEvent.requesterHostname());
+      assignSplits();
+    } else {
+      throw new IllegalArgumentException(String.format("Received unknown event from subtask %d: %s",
+          subtaskId, sourceEvent.getClass().getCanonicalName()));
+    }
+  }
+
+  @Override
+  public void addSplitsBack(List<IcebergSourceSplit> splits, int subtaskId) {
+    LOG.info("Add {} splits back to the pool for failed subtask {}",
+        splits.size(), subtaskId);
+    assigner.onUnassignedSplits(splits);
+    assignSplits();
+  }
+
+  @Override
+  public void addReader(int subtaskId) {
+    LOG.info("Added reader: {}", subtaskId);
+  }
+
+  private void assignSplits() {
+    LOG.info("Assigning splits for {} awaiting readers", readersAwaitingSplit.size());
+    Iterator<Map.Entry<Integer, String>> awaitingReader =
+        readersAwaitingSplit.entrySet().iterator();
+    while (awaitingReader.hasNext()) {
+      Map.Entry<Integer, String> nextAwaiting = awaitingReader.next();
+      // if the reader that requested another split has failed in the meantime, remove
+      // it from the list of waiting readers
+      if (!enumeratorContext.registeredReaders().containsKey(nextAwaiting.getKey())) {
+        awaitingReader.remove();
+        continue;
+      }
+
+      int awaitingSubtask = nextAwaiting.getKey();
+      String hostname = nextAwaiting.getValue();
+      GetSplitResult getResult = assigner.getNext(hostname);
+      if (getResult.status() == GetSplitResult.Status.AVAILABLE) {
+        LOG.info("Assign split to subtask {}: {}", awaitingSubtask, getResult.split());
+        enumeratorContext.assignSplit(getResult.split(), awaitingSubtask);
+        awaitingReader.remove();
+      } else if (getResult.status() == GetSplitResult.Status.CONSTRAINED) {
+        getAvailableFutureIfNeeded();
+        break;
+      } else if (getResult.status() == GetSplitResult.Status.UNAVAILABLE) {
+        if (shouldWaitForMoreSplits()) {
+          getAvailableFutureIfNeeded();
+          break;
+        } else {
+          LOG.info("No more splits available for subtask {}", awaitingSubtask);
+          enumeratorContext.signalNoMoreSplits(awaitingSubtask);
+          awaitingReader.remove();
+        }
+      } else {
+        throw new IllegalArgumentException("Unsupported status: " + getResult.status());
+      }
+    }
+  }
+
+  /**
+   * Return true if the enumerator should wait for more splits,
+   * like in the continuous enumerator case.
+   */
+  protected abstract boolean shouldWaitForMoreSplits();
+
+  private synchronized void getAvailableFutureIfNeeded() {
+    if (availableFuture.get() != null) {
+      return;
+    }
+    CompletableFuture<Void> future = assigner.isAvailable()
+        .thenAccept(ignore ->
+            // Must run assignSplits in coordinator thread
+            // because the future may be completed from other threads.
+            // E.g., in event time alignment assigner,
+            // watermark advancement from another source may
+            // cause the available future to be completed
+            enumeratorContext.runInCoordinatorThread(() -> {
+              LOG.debug("Executing callback of assignSplits");
+              availableFuture.set(null);
+              assignSplits();
+            }));
+    availableFuture.set(future);
+    LOG.debug("Registered callback for future available splits");
+  }
+}
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java
new file mode 100644
index 000000000000..e2b94b8c3e2b
--- /dev/null
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.enumerator;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nullable;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.iceberg.flink.source.ScanContext;
+import org.apache.iceberg.flink.source.assigner.SplitAssigner;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Internal
+public class ContinuousIcebergEnumerator extends AbstractIcebergEnumerator {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ContinuousIcebergEnumerator.class);
+
+  private final SplitEnumeratorContext<IcebergSourceSplit> enumeratorContext;
+  private final SplitAssigner assigner;
+  private final ScanContext scanContext;
+  private final ContinuousSplitPlanner splitPlanner;
+
+  /**
+   * Snapshot id of the last enumerated snapshot. The next incremental enumeration
+   * should be based off this as the starting position.
+   */
+  private final AtomicReference<IcebergEnumeratorPosition> enumeratorPosition;
+
+  public ContinuousIcebergEnumerator(
+      SplitEnumeratorContext<IcebergSourceSplit> enumeratorContext,
+      SplitAssigner assigner,
+      ScanContext scanContext,
+      ContinuousSplitPlanner splitPlanner,
+      @Nullable IcebergEnumeratorState enumState) {
+    super(enumeratorContext, assigner);
+
+    this.enumeratorContext = enumeratorContext;
+    this.assigner = assigner;
+    this.scanContext = scanContext;
+    this.splitPlanner = splitPlanner;
+    this.enumeratorPosition = new AtomicReference<>();
+    if (enumState != null) {
+      this.enumeratorPosition.set(enumState.lastEnumeratedPosition());
+    }
+  }
+
+  @Override
+  public void start() {
+    super.start();
+    enumeratorContext.callAsync(
+        this::discoverSplits,
+        this::processDiscoveredSplits,
+        0L,
+        scanContext.monitorInterval().toMillis());
+  }
+
+  @Override
+  public void close() throws IOException {
+    splitPlanner.close();
+    super.close();
+  }
+
+  @Override
+  protected boolean shouldWaitForMoreSplits() {
+    return true;
+  }
+
+  @Override
+  public IcebergEnumeratorState snapshotState(long checkpointId) {
+    return new IcebergEnumeratorState(enumeratorPosition.get(), assigner.state());
+  }
+
+  /**
+   * This method is executed in an IO thread pool.
+   */
+  private ContinuousEnumerationResult discoverSplits() {
+    return splitPlanner.planSplits(enumeratorPosition.get());
+  }
+
+  /**
+   * This method is executed in a single coordinator thread.
+   */
+  private void processDiscoveredSplits(ContinuousEnumerationResult result, Throwable error) {
+    if (error == null) {
+      if (!Objects.equals(result.fromPosition(), enumeratorPosition.get())) {
+        // Multiple discoverSplits() may be triggered with the same starting snapshot to the I/O thread pool.
+        // E.g., the splitDiscoveryInterval is very short (like 10 ms in some unit tests) or the thread
+        // pool is busy and multiple discovery actions are executed concurrently. Discovery result should
+        // only be accepted if the starting position matches the enumerator position (like compare-and-swap).
+        LOG.info("Skip {} discovered splits because the scan starting position doesn't match " +
+                "the current enumerator position: enumerator position = {}, scan starting position = {}",
+            result.splits().size(), enumeratorPosition.get(), result.fromPosition());
+      } else {
+        assigner.onDiscoveredSplits(result.splits());
+        LOG.info("Added {} splits discovered between ({}, {}] to the assigner",
+            result.splits().size(), result.fromPosition(), result.toPosition());
+        // update the enumerator position even if there is no split discovered
+        // or the toPosition is empty (e.g. for empty table).
+        enumeratorPosition.set(result.toPosition());
+        LOG.info("Update enumerator position to {}", result.toPosition());
+      }
+    } else {
+      LOG.error("Failed to discover new splits", error);
+    }
+  }
+}
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorPositionSerializer.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorPositionSerializer.java
new file mode 100644
index 000000000000..83b230e80e08
--- /dev/null
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorPositionSerializer.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.enumerator;
+
+import java.io.IOException;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+
+class IcebergEnumeratorPositionSerializer
+    implements SimpleVersionedSerializer<IcebergEnumeratorPosition> {
+
+  public static final IcebergEnumeratorPositionSerializer INSTANCE =
+      new IcebergEnumeratorPositionSerializer();
+
+  private static final int VERSION = 1;
+
+  private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
+      ThreadLocal.withInitial(() -> new DataOutputSerializer(128));
+
+  @Override
+  public int getVersion() {
+    return VERSION;
+  }
+
+  @Override
+  public byte[] serialize(IcebergEnumeratorPosition position) throws IOException {
+    return serializeV1(position);
+  }
+
+  @Override
+  public IcebergEnumeratorPosition deserialize(int version, byte[] serialized) throws IOException {
+    switch (version) {
+      case 1:
+        return deserializeV1(serialized);
+      default:
+        throw new IOException("Unknown version: " + version);
+    }
+  }
+
+  private byte[] serializeV1(IcebergEnumeratorPosition position) throws IOException {
+    DataOutputSerializer out = SERIALIZER_CACHE.get();
+    out.writeBoolean(position.snapshotId() != null);
+    if (position.snapshotId() != null) {
+      out.writeLong(position.snapshotId());
+    }
+    out.writeBoolean(position.snapshotTimestampMs() != null);
+    if (position.snapshotTimestampMs() != null) {
+      out.writeLong(position.snapshotTimestampMs());
+    }
+    byte[] result = out.getCopyOfBuffer();
+    out.clear();
+    return result;
+  }
+
+  private IcebergEnumeratorPosition deserializeV1(byte[] serialized) throws IOException {
+    DataInputDeserializer in = new DataInputDeserializer(serialized);
+    Long snapshotId = null;
+    if (in.readBoolean()) {
+      snapshotId = in.readLong();
+    }
+
+    Long snapshotTimestampMs = null;
+    if (in.readBoolean()) {
+      snapshotTimestampMs = in.readLong();
+    }
+
+    if (snapshotId != null) {
+      return IcebergEnumeratorPosition.of(snapshotId, snapshotTimestampMs);
+    } else {
+      return IcebergEnumeratorPosition.empty();
+    }
+  }
+}
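
Note: IcebergEnumeratorPosition itself is added elsewhere in this PR and is not part of this diff. A minimal sketch of its shape, inferred purely from usage in this diff (the factory methods and nullable accessors above, plus the value equality that processDiscoveredSplits() and the serializer tests rely on); the real class may differ in detail:

    // Hypothetical sketch inferred from usage in this diff; not the actual class.
    package org.apache.iceberg.flink.source.enumerator;

    import java.util.Objects;
    import javax.annotation.Nullable;

    class IcebergEnumeratorPosition {
      // Both fields are null for an empty position (e.g. an empty table).
      @Nullable
      private final Long snapshotId;
      @Nullable
      private final Long snapshotTimestampMs;

      static IcebergEnumeratorPosition empty() {
        return new IcebergEnumeratorPosition(null, null);
      }

      static IcebergEnumeratorPosition of(long snapshotId, @Nullable Long snapshotTimestampMs) {
        return new IcebergEnumeratorPosition(snapshotId, snapshotTimestampMs);
      }

      private IcebergEnumeratorPosition(@Nullable Long snapshotId, @Nullable Long snapshotTimestampMs) {
        this.snapshotId = snapshotId;
        this.snapshotTimestampMs = snapshotTimestampMs;
      }

      @Nullable
      Long snapshotId() {
        return snapshotId;
      }

      @Nullable
      Long snapshotTimestampMs() {
        return snapshotTimestampMs;
      }

      // Value equality matters: the continuous enumerator compares positions with
      // Objects.equals() before accepting a discovery result.
      @Override
      public boolean equals(Object o) {
        if (this == o) {
          return true;
        }
        if (o == null || getClass() != o.getClass()) {
          return false;
        }
        IcebergEnumeratorPosition that = (IcebergEnumeratorPosition) o;
        return Objects.equals(snapshotId, that.snapshotId) &&
            Objects.equals(snapshotTimestampMs, that.snapshotTimestampMs);
      }

      @Override
      public int hashCode() {
        return Objects.hash(snapshotId, snapshotTimestampMs);
      }
    }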
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorState.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorState.java
new file mode 100644
index 000000000000..bd2f44c0059b
--- /dev/null
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorState.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.enumerator;
+
+import java.io.Serializable;
+import java.util.Collection;
+import javax.annotation.Nullable;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
+
+/**
+ * Enumerator state for checkpointing.
+ */
+public class IcebergEnumeratorState implements Serializable {
+  @Nullable
+  private final IcebergEnumeratorPosition lastEnumeratedPosition;
+  private final Collection<IcebergSourceSplitState> pendingSplits;
+
+  public IcebergEnumeratorState(Collection<IcebergSourceSplitState> pendingSplits) {
+    this(null, pendingSplits);
+  }
+
+  public IcebergEnumeratorState(
+      @Nullable IcebergEnumeratorPosition lastEnumeratedPosition,
+      Collection<IcebergSourceSplitState> pendingSplits) {
+    this.lastEnumeratedPosition = lastEnumeratedPosition;
+    this.pendingSplits = pendingSplits;
+  }
+
+  @Nullable
+  public IcebergEnumeratorPosition lastEnumeratedPosition() {
+    return lastEnumeratedPosition;
+  }
+
+  public Collection<IcebergSourceSplitState> pendingSplits() {
+    return pendingSplits;
+  }
+}
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorStateSerializer.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorStateSerializer.java
new file mode 100644
index 000000000000..8f020bbe539e
--- /dev/null
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorStateSerializer.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.enumerator;
+
+import java.io.IOException;
+import java.util.Collection;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitSerializer;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitStatus;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+@Internal
+public class IcebergEnumeratorStateSerializer
+    implements SimpleVersionedSerializer<IcebergEnumeratorState> {
+
+  public static final IcebergEnumeratorStateSerializer INSTANCE =
+      new IcebergEnumeratorStateSerializer();
+
+  private static final int VERSION = 1;
+
+  private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
+      ThreadLocal.withInitial(() -> new DataOutputSerializer(1024));
+
+  private final IcebergEnumeratorPositionSerializer positionSerializer =
+      IcebergEnumeratorPositionSerializer.INSTANCE;
+  private final IcebergSourceSplitSerializer splitSerializer = IcebergSourceSplitSerializer.INSTANCE;
+
+  @Override
+  public int getVersion() {
+    return VERSION;
+  }
+
+  @Override
+  public byte[] serialize(IcebergEnumeratorState enumState) throws IOException {
+    return serializeV1(enumState);
+  }
+
+  @Override
+  public IcebergEnumeratorState deserialize(int version, byte[] serialized) throws IOException {
+    switch (version) {
+      case 1:
+        return deserializeV1(serialized);
+      default:
+        throw new IOException("Unknown version: " + version);
+    }
+  }
+
+  private byte[] serializeV1(IcebergEnumeratorState enumState) throws IOException {
+    DataOutputSerializer out = SERIALIZER_CACHE.get();
+
+    out.writeBoolean(enumState.lastEnumeratedPosition() != null);
+    if (enumState.lastEnumeratedPosition() != null) {
+      out.writeInt(positionSerializer.getVersion());
+      byte[] positionBytes = positionSerializer.serialize(enumState.lastEnumeratedPosition());
+      out.writeInt(positionBytes.length);
+      out.write(positionBytes);
+    }
+
+    out.writeInt(splitSerializer.getVersion());
+    out.writeInt(enumState.pendingSplits().size());
+    for (IcebergSourceSplitState splitState : enumState.pendingSplits()) {
+      byte[] splitBytes = splitSerializer.serialize(splitState.split());
+      out.writeInt(splitBytes.length);
+      out.write(splitBytes);
+      out.writeUTF(splitState.status().name());
+    }
+
+    byte[] result = out.getCopyOfBuffer();
+    out.clear();
+    return result;
+  }
+
+  private IcebergEnumeratorState deserializeV1(byte[] serialized) throws IOException {
+    DataInputDeserializer in = new DataInputDeserializer(serialized);
+
+    IcebergEnumeratorPosition enumeratorPosition = null;
+    if (in.readBoolean()) {
+      int version = in.readInt();
+      byte[] positionBytes = new byte[in.readInt()];
+      in.read(positionBytes);
+      enumeratorPosition = positionSerializer.deserialize(version, positionBytes);
+    }
+
+    int splitSerializerVersion = in.readInt();
+    int splitCount = in.readInt();
+    Collection<IcebergSourceSplitState> pendingSplits = Lists.newArrayListWithCapacity(splitCount);
+    for (int i = 0; i < splitCount; ++i) {
+      byte[] splitBytes = new byte[in.readInt()];
+      in.read(splitBytes);
+      IcebergSourceSplit split = splitSerializer.deserialize(splitSerializerVersion, splitBytes);
+      String statusName = in.readUTF();
+      pendingSplits.add(new IcebergSourceSplitState(split,
+          IcebergSourceSplitStatus.valueOf(statusName)));
+    }
+    return new IcebergEnumeratorState(enumeratorPosition, pendingSplits);
+  }
+}
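
serializeV1 above effectively defines a small wire format: an optional, version- and length-prefixed position block, followed by a version-prefixed list of length-prefixed split blobs, each tagged with its status name. A minimal round-trip sketch, assuming it lives in the same package as the serializer (the tests later in this diff exercise the same path):

    // Round-trip sketch; mirrors how Flink stores getVersion() next to the bytes
    // and hands it back on restore, which drives the switch in deserialize().
    package org.apache.iceberg.flink.source.enumerator;

    import java.io.IOException;

    class EnumeratorStateRoundTrip {
      static IcebergEnumeratorState roundTrip(IcebergEnumeratorState state) throws IOException {
        IcebergEnumeratorStateSerializer serializer = IcebergEnumeratorStateSerializer.INSTANCE;
        // V1 layout: [hasPosition][posVersion][posLength][posBytes]
        //            [splitVersion][splitCount] ([splitLength][splitBytes][statusName])*
        byte[] bytes = serializer.serialize(state);
        return serializer.deserialize(serializer.getVersion(), bytes);
      }
    }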
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/StaticIcebergEnumerator.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/StaticIcebergEnumerator.java
new file mode 100644
index 000000000000..0f287864be73
--- /dev/null
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/StaticIcebergEnumerator.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.enumerator;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import javax.annotation.Nullable;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.source.FlinkSplitPlanner;
+import org.apache.iceberg.flink.source.ScanContext;
+import org.apache.iceberg.flink.source.assigner.SplitAssigner;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.util.ThreadPools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * One-time split enumeration at start-up.
+ */
+@Internal
+public class StaticIcebergEnumerator extends AbstractIcebergEnumerator {
+  private static final Logger LOG = LoggerFactory.getLogger(StaticIcebergEnumerator.class);
+
+  private final SplitAssigner assigner;
+  private final Table table;
+  private final ScanContext scanContext;
+  private final boolean shouldEnumerate;
+
+  public StaticIcebergEnumerator(
+      SplitEnumeratorContext<IcebergSourceSplit> enumeratorContext,
+      SplitAssigner assigner,
+      Table table,
+      ScanContext scanContext,
+      @Nullable IcebergEnumeratorState enumState) {
+    super(enumeratorContext, assigner);
+    this.assigner = assigner;
+    this.table = table;
+    this.scanContext = scanContext;
+    // split enumeration is not needed during the restore scenario
+    this.shouldEnumerate = enumState == null;
+  }
+
+  @Override
+  public void start() {
+    super.start();
+    if (shouldEnumerate) {
+      // Ideally, operatorId should be used as the threadPoolName as Flink guarantees its uniqueness within a job.
+      // SplitEnumeratorContext doesn't expose the OperatorCoordinator.Context, which would contain the OperatorID.
+      // Need to discuss with Flink community whether it is ok to expose a public API like the protected method
+      // "OperatorCoordinator.Context getCoordinatorContext()" from SourceCoordinatorContext implementation.
+      // For now, <table name>-<random UUID> is used as the unique thread pool name.
+      String threadName = "iceberg-plan-worker-pool-" + table.name() + "-" + UUID.randomUUID();
+      ExecutorService workerPool = ThreadPools.newWorkerPool(threadName, scanContext.planParallelism());
+      try {
+        List<IcebergSourceSplit> splits = FlinkSplitPlanner.planIcebergSourceSplits(table, scanContext, workerPool);
+        assigner.onDiscoveredSplits(splits);
+        LOG.info("Discovered {} splits from table {} during job initialization",
+            splits.size(), table.name());
+      } finally {
+        workerPool.shutdown();
+      }
+    }
+  }
+
+  @Override
+  protected boolean shouldWaitForMoreSplits() {
+    return false;
+  }
+
+  @Override
+  public IcebergEnumeratorState snapshotState(long checkpointId) {
+    return new IcebergEnumeratorState(null, assigner.state());
+  }
+}
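
Whether a job gets the static (batch) or the continuous (streaming) enumerator is decided when Flink asks the source to create or restore an enumerator. The actual IcebergSource wiring is added elsewhere in this PR; a hedged sketch of the decision, assuming the enumerator package and a planner built by the caller (EnumeratorWiringSketch and newEnumerator are illustrative names, not part of this diff):

    // Illustrative sketch only; not the actual IcebergSource factory code.
    package org.apache.iceberg.flink.source.enumerator;

    import java.util.Collections;
    import javax.annotation.Nullable;
    import org.apache.flink.api.connector.source.SplitEnumerator;
    import org.apache.flink.api.connector.source.SplitEnumeratorContext;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.flink.source.ScanContext;
    import org.apache.iceberg.flink.source.assigner.SimpleSplitAssigner;
    import org.apache.iceberg.flink.source.assigner.SplitAssigner;
    import org.apache.iceberg.flink.source.split.IcebergSourceSplit;

    class EnumeratorWiringSketch {
      static SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState> newEnumerator(
          SplitEnumeratorContext<IcebergSourceSplit> enumContext,
          Table table,
          ScanContext scanContext,
          ContinuousSplitPlanner splitPlanner,
          boolean streaming,
          @Nullable IcebergEnumeratorState enumState) {
        // On restore, seed the assigner with the checkpointed pending splits.
        SplitAssigner assigner = enumState == null ?
            new SimpleSplitAssigner(Collections.emptyList()) :
            new SimpleSplitAssigner(enumState.pendingSplits());
        if (streaming) {
          // Resumes incremental discovery from enumState.lastEnumeratedPosition().
          return new ContinuousIcebergEnumerator(enumContext, assigner, scanContext, splitPlanner, enumState);
        } else {
          // Plans splits once at start-up; skips re-planning when enumState != null.
          return new StaticIcebergEnumerator(enumContext, assigner, table, scanContext, enumState);
        }
      }
    }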
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java
new file mode 100644
index 000000000000..5e7f926e3aee
--- /dev/null
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.enumerator;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext;
+import org.apache.iceberg.flink.source.ScanContext;
+import org.apache.iceberg.flink.source.SplitHelpers;
+import org.apache.iceberg.flink.source.StreamingStartingStrategy;
+import org.apache.iceberg.flink.source.assigner.SimpleSplitAssigner;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitStatus;
+import org.apache.iceberg.flink.source.split.SplitRequestEvent;
+import org.hamcrest.CoreMatchers;
+import org.hamcrest.MatcherAssert;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestContinuousIcebergEnumerator {
+  @ClassRule
+  public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  @Test
+  public void testDiscoverSplitWhenNoReaderRegistered() throws Exception {
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner();
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext = ScanContext.builder()
+        .streaming(true)
+        .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
+        .build();
+    ContinuousIcebergEnumerator enumerator = createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    Collection<IcebergSourceSplitState> pendingSplitsEmpty = enumerator.snapshotState(1).pendingSplits();
+    Assert.assertEquals(0, pendingSplitsEmpty.size());
+
+    // make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits = SplitHelpers
+        .createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    splitPlanner.addSplits(splits, IcebergEnumeratorPosition.of(1L, 1L));
+    enumeratorContext.triggerAllActions();
+
+    Collection<IcebergSourceSplitState> pendingSplits = enumerator.snapshotState(2).pendingSplits();
+    Assert.assertEquals(1, pendingSplits.size());
+    IcebergSourceSplitState pendingSplit = pendingSplits.iterator().next();
+    Assert.assertEquals(splits.get(0).splitId(), pendingSplit.split().splitId());
+    Assert.assertEquals(IcebergSourceSplitStatus.UNASSIGNED, pendingSplit.status());
+  }
+
+  @Test
+  public void testDiscoverWhenReaderRegistered() throws Exception {
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner();
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext = ScanContext.builder()
+        .streaming(true)
+        .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
+        .build();
+    ContinuousIcebergEnumerator enumerator = createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // register one reader, and let it request a split
+    enumeratorContext.registerReader(2, "localhost");
+    enumerator.addReader(2);
+    enumerator.handleSourceEvent(2, new SplitRequestEvent());
+
+    // make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits = SplitHelpers
+        .createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    splitPlanner.addSplits(splits, IcebergEnumeratorPosition.of(1L, 1L));
+    enumeratorContext.triggerAllActions();
+
+    Assert.assertTrue(enumerator.snapshotState(1).pendingSplits().isEmpty());
+    MatcherAssert.assertThat(enumeratorContext.getSplitAssignments().get(2).getAssignedSplits(),
+        CoreMatchers.hasItem(splits.get(0)));
+  }
+
+  @Test
+  public void testRequestingReaderUnavailableWhenSplitDiscovered() throws Exception {
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner();
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext config = ScanContext.builder()
+        .streaming(true)
+        .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
+        .build();
+    ContinuousIcebergEnumerator enumerator = createEnumerator(enumeratorContext, config, splitPlanner);
+
+    // register one reader, and let it request a split
+    enumeratorContext.registerReader(2, "localhost");
+    enumerator.addReader(2);
+    enumerator.handleSourceEvent(2, new SplitRequestEvent());
+
+    // remove the reader (like in a failure)
+    enumeratorContext.registeredReaders().remove(2);
+
+    // make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits = SplitHelpers
+        .createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    Assert.assertEquals(1, splits.size());
+    splitPlanner.addSplits(splits, IcebergEnumeratorPosition.of(1L, 1L));
+    enumeratorContext.triggerAllActions();
+
+    Assert.assertFalse(enumeratorContext.getSplitAssignments().containsKey(2));
+    List<String> pendingSplitIds = enumerator.snapshotState(1).pendingSplits().stream()
+        .map(IcebergSourceSplitState::split)
+        .map(IcebergSourceSplit::splitId)
+        .collect(Collectors.toList());
+    Assert.assertEquals(splits.size(), pendingSplitIds.size());
+    Assert.assertEquals(splits.get(0).splitId(), pendingSplitIds.get(0));
+
+    // register the reader again, and let it request a split
+    enumeratorContext.registerReader(2, "localhost");
+    enumerator.addReader(2);
+    enumerator.handleSourceEvent(2, new SplitRequestEvent());
+
+    Assert.assertTrue(enumerator.snapshotState(2).pendingSplits().isEmpty());
+    MatcherAssert.assertThat(enumeratorContext.getSplitAssignments().get(2).getAssignedSplits(),
+        CoreMatchers.hasItem(splits.get(0)));
+  }
+
+  private static ContinuousIcebergEnumerator createEnumerator(
+      SplitEnumeratorContext<IcebergSourceSplit> context,
+      ScanContext scanContext,
+      ContinuousSplitPlanner splitPlanner) {
+
+    ContinuousIcebergEnumerator enumerator =
+        new ContinuousIcebergEnumerator(
+            context,
+            new SimpleSplitAssigner(Collections.emptyList()),
+            scanContext,
+            splitPlanner,
+            null);
+    enumerator.start();
+    return enumerator;
+  }
+}
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestIcebergEnumeratorStateSerializer.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestIcebergEnumeratorStateSerializer.java
new file mode 100644
index 000000000000..05abe7bc1792
--- /dev/null
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestIcebergEnumeratorStateSerializer.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.enumerator;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.iceberg.flink.source.SplitHelpers;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitStatus;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestIcebergEnumeratorStateSerializer {
+  @ClassRule
+  public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  private final IcebergEnumeratorStateSerializer serializer = IcebergEnumeratorStateSerializer.INSTANCE;
+
+  @Test
+  public void testEmptySnapshotIdAndPendingSplits() throws Exception {
+    IcebergEnumeratorState enumeratorState = new IcebergEnumeratorState(Collections.emptyList());
+    byte[] result = serializer.serialize(enumeratorState);
+    IcebergEnumeratorState deserialized = serializer.deserialize(serializer.getVersion(), result);
+    assertEnumeratorStateEquals(enumeratorState, deserialized);
+  }
+
+  @Test
+  public void testSomeSnapshotIdAndEmptyPendingSplits() throws Exception {
+    IcebergEnumeratorPosition position = IcebergEnumeratorPosition.of(1L, System.currentTimeMillis());
+    IcebergEnumeratorState enumeratorState = new IcebergEnumeratorState(position, Collections.emptyList());
+    byte[] result = serializer.serialize(enumeratorState);
+    IcebergEnumeratorState deserialized = serializer.deserialize(serializer.getVersion(), result);
+    assertEnumeratorStateEquals(enumeratorState, deserialized);
+  }
+
+  @Test
+  public void testSomeSnapshotIdAndPendingSplits() throws Exception {
+    IcebergEnumeratorPosition position = IcebergEnumeratorPosition.of(2L, System.currentTimeMillis());
+    List<IcebergSourceSplit> splits = SplitHelpers
+        .createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 3, 1);
+    Collection<IcebergSourceSplitState> pendingSplits = Lists.newArrayList();
+    pendingSplits.add(new IcebergSourceSplitState(splits.get(0), IcebergSourceSplitStatus.UNASSIGNED));
+    pendingSplits.add(new IcebergSourceSplitState(splits.get(1), IcebergSourceSplitStatus.ASSIGNED));
+    pendingSplits.add(new IcebergSourceSplitState(splits.get(2), IcebergSourceSplitStatus.COMPLETED));
+
+    IcebergEnumeratorState enumeratorState = new IcebergEnumeratorState(position, pendingSplits);
+    byte[] result = serializer.serialize(enumeratorState);
+    IcebergEnumeratorState deserialized = serializer.deserialize(serializer.getVersion(), result);
+    assertEnumeratorStateEquals(enumeratorState, deserialized);
+  }
+
+  private void assertEnumeratorStateEquals(IcebergEnumeratorState expected, IcebergEnumeratorState actual) {
+    Assert.assertEquals(expected.lastEnumeratedPosition(), actual.lastEnumeratedPosition());
+    Assert.assertEquals(expected.pendingSplits().size(), actual.pendingSplits().size());
+    Iterator<IcebergSourceSplitState> expectedIterator = expected.pendingSplits().iterator();
+    Iterator<IcebergSourceSplitState> actualIterator =
+        actual.pendingSplits().iterator();
+    for (int i = 0; i < expected.pendingSplits().size(); ++i) {
+      IcebergSourceSplitState expectedSplitState = expectedIterator.next();
+      IcebergSourceSplitState actualSplitState = actualIterator.next();
+      Assert.assertEquals(expectedSplitState.split().splitId(), actualSplitState.split().splitId());
+      Assert.assertEquals(expectedSplitState.split().fileOffset(), actualSplitState.split().fileOffset());
+      Assert.assertEquals(expectedSplitState.split().recordOffset(), actualSplitState.split().recordOffset());
+      Assert.assertEquals(expectedSplitState.status(), actualSplitState.status());
+    }
+  }
+}
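
Both the enumerator and the tests depend on SplitRequestEvent, which this diff references but does not add. A minimal sketch of its shape, inferred from usage here (no-arg construction in the tests, finishedSplitIds() and requesterHostname() in AbstractIcebergEnumerator); the real class may carry more constructors and validation:

    // Hypothetical sketch inferred from usage in this diff; not the actual class.
    package org.apache.iceberg.flink.source.split;

    import java.util.Collection;
    import java.util.Collections;
    import javax.annotation.Nullable;
    import org.apache.flink.api.connector.source.SourceEvent;

    public class SplitRequestEvent implements SourceEvent {
      private static final long serialVersionUID = 1L;

      // Ids of splits the requesting reader has fully consumed since its last request.
      private final Collection<String> finishedSplitIds;
      @Nullable
      private final String requesterHostname;

      public SplitRequestEvent() {
        this(Collections.emptyList(), null);
      }

      public SplitRequestEvent(Collection<String> finishedSplitIds, @Nullable String requesterHostname) {
        this.finishedSplitIds = finishedSplitIds;
        this.requesterHostname = requesterHostname;
      }

      public Collection<String> finishedSplitIds() {
        return finishedSplitIds;
      }

      @Nullable
      public String requesterHostname() {
        return requesterHostname;
      }
    }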