1 change: 1 addition & 0 deletions flink/v1.15/build.gradle
@@ -61,6 +61,7 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
exclude group: 'org.apache.hive', module: 'hive-storage-api'
}

testImplementation "org.apache.flink:flink-connector-test-utils:${flinkVersion}"
Contributor Author:
This is added to use TestingSplitEnumeratorContext
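As a rough sketch of what this dependency enables, a unit test can drive an enumerator against TestingSplitEnumeratorContext instead of a live coordinator. This is a hedged illustration: the package (org.apache.flink.connector.testutils.source.reader), the parallelism constructor argument, and the getSplitAssignments() accessor are assumptions about the Flink test utilities, not confirmed by this PR.

// Hedged sketch: exercising an enumerator with Flink's testing context.
// Package and method names below are assumptions, not confirmed by this PR.
import org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;

class EnumeratorSketchTest {
  void sketch() {
    // Testing stand-in for the coordinator-provided context, with parallelism 4.
    TestingSplitEnumeratorContext<IcebergSourceSplit> context =
        new TestingSplitEnumeratorContext<>(4);
    // An enumerator under test is constructed with this context, e.g.
    // new ContinuousIcebergEnumerator(context, assigner, scanContext, splitPlanner, null),
    // after which assertions can inspect context.getSplitAssignments().
  }
}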

testImplementation "org.apache.flink:flink-core:${flinkVersion}"
testImplementation "org.apache.flink:flink-runtime:${flinkVersion}"
testImplementation ("org.apache.flink:flink-test-utils-junit:${flinkVersion}") {
@@ -0,0 +1,168 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.flink.source.enumerator;

import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.iceberg.flink.source.assigner.GetSplitResult;
import org.apache.iceberg.flink.source.assigner.SplitAssigner;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.flink.source.split.SplitRequestEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* TODO: publish enumerator monitor metrics like number of pending splits after FLINK-21000 is resolved
*/
abstract class AbstractIcebergEnumerator implements SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState> {
private static final Logger LOG = LoggerFactory.getLogger(AbstractIcebergEnumerator.class);

private final SplitEnumeratorContext<IcebergSourceSplit> enumeratorContext;
private final SplitAssigner assigner;
private final Map<Integer, String> readersAwaitingSplit;
private final AtomicReference<CompletableFuture<Void>> availableFuture;

AbstractIcebergEnumerator(
SplitEnumeratorContext<IcebergSourceSplit> enumeratorContext,
SplitAssigner assigner) {
this.enumeratorContext = enumeratorContext;
this.assigner = assigner;
this.readersAwaitingSplit = new LinkedHashMap<>();
this.availableFuture = new AtomicReference<>();
}

@Override
public void start() {
assigner.start();
}

@Override
public void close() throws IOException {
assigner.close();
}

@Override
public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
// The Iceberg source uses a custom split request event to piggyback finished split ids.
throw new UnsupportedOperationException(String.format("Received invalid default split request event " +
"from subtask %d as the Iceberg source uses a custom split request event", subtaskId));
}

@Override
public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
if (sourceEvent instanceof SplitRequestEvent) {
SplitRequestEvent splitRequestEvent =
(SplitRequestEvent) sourceEvent;
LOG.info("Received request split event from subtask {}", subtaskId);
assigner.onCompletedSplits(splitRequestEvent.finishedSplitIds());
readersAwaitingSplit.put(subtaskId, splitRequestEvent.requesterHostname());
assignSplits();
} else {
throw new IllegalArgumentException(String.format("Received unknown event from subtask %d: %s",
subtaskId, sourceEvent.getClass().getCanonicalName()));
}
}

@Override
public void addSplitsBack(List<IcebergSourceSplit> splits, int subtaskId) {
LOG.info("Add {} splits back to the pool for failed subtask {}",
splits.size(), subtaskId);
assigner.onUnassignedSplits(splits);
assignSplits();
}

@Override
public void addReader(int subtaskId) {
LOG.info("Added reader: {}", subtaskId);
}

private void assignSplits() {
LOG.info("Assigning splits for {} awaiting readers", readersAwaitingSplit.size());
Iterator<Map.Entry<Integer, String>> awaitingReader =
readersAwaitingSplit.entrySet().iterator();
while (awaitingReader.hasNext()) {
Map.Entry<Integer, String> nextAwaiting = awaitingReader.next();
// if the reader that requested another split has failed in the meantime, remove
// it from the list of waiting readers
if (!enumeratorContext.registeredReaders().containsKey(nextAwaiting.getKey())) {
awaitingReader.remove();
continue;
}

int awaitingSubtask = nextAwaiting.getKey();
String hostname = nextAwaiting.getValue();
GetSplitResult getResult = assigner.getNext(hostname);
if (getResult.status() == GetSplitResult.Status.AVAILABLE) {
LOG.info("Assign split to subtask {}: {}", awaitingSubtask, getResult.split());
enumeratorContext.assignSplit(getResult.split(), awaitingSubtask);
awaitingReader.remove();
} else if (getResult.status() == GetSplitResult.Status.CONSTRAINED) {
getAvailableFutureIfNeeded();
break;
} else if (getResult.status() == GetSplitResult.Status.UNAVAILABLE) {
if (shouldWaitForMoreSplits()) {
getAvailableFutureIfNeeded();
break;
} else {
LOG.info("No more splits available for subtask {}", awaitingSubtask);
enumeratorContext.signalNoMoreSplits(awaitingSubtask);
awaitingReader.remove();
}
} else {
throw new IllegalArgumentException("Unsupported status: " + getResult.status());
}
}
}

/**
* Return true if the enumerator should wait for more splits,
* as in the continuous enumerator case.
*/
protected abstract boolean shouldWaitForMoreSplits();

private synchronized void getAvailableFutureIfNeeded() {
if (availableFuture.get() != null) {
return;
}
CompletableFuture<Void> future = assigner.isAvailable()
.thenAccept(ignore ->
// Must run assignSplits in the coordinator thread,
// because the future may be completed from other threads.
// E.g., in an event time alignment assigner, watermark
// advancement from another source may complete the
// available future.
enumeratorContext.runInCoordinatorThread(() -> {
LOG.debug("Executing callback of assignSplits");
availableFuture.set(null);
assignSplits();
}));
availableFuture.set(future);
LOG.debug("Registered callback for future available splits");
}
}
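For context on the custom request protocol rejected in handleSplitRequest() above: the reader side reports finished splits and asks for more by sending a SplitRequestEvent to the coordinator. A minimal sketch follows, assuming SplitRequestEvent exposes a constructor taking the finished split ids; the wrapper class here is hypothetical, while SourceReaderContext.sendSourceEventToCoordinator is the standard Flink API.

// Hedged sketch of the reader side of the split request handshake.
import java.util.Collection;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.iceberg.flink.source.split.SplitRequestEvent;

class ReaderRequestSketch {
  private final SourceReaderContext readerContext;

  ReaderRequestSketch(SourceReaderContext readerContext) {
    this.readerContext = readerContext;
  }

  void requestMoreSplits(Collection<String> finishedSplitIds) {
    // Piggyback finished split ids on the request; handleSourceEvent() above
    // forwards them to assigner.onCompletedSplits() before assigning new splits.
    readerContext.sendSourceEventToCoordinator(new SplitRequestEvent(finishedSplitIds));
  }
}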
@@ -0,0 +1,127 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.flink.source.enumerator;

import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.iceberg.flink.source.ScanContext;
import org.apache.iceberg.flink.source.assigner.SplitAssigner;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class ContinuousIcebergEnumerator extends AbstractIcebergEnumerator {

private static final Logger LOG = LoggerFactory.getLogger(ContinuousIcebergEnumerator.class);

private final SplitEnumeratorContext<IcebergSourceSplit> enumeratorContext;
private final SplitAssigner assigner;
private final ScanContext scanContext;
private final ContinuousSplitPlanner splitPlanner;

/**
* Position of the last enumerated snapshot; the next incremental enumeration
* should start from this position.
*/
private final AtomicReference<IcebergEnumeratorPosition> enumeratorPosition;

public ContinuousIcebergEnumerator(
SplitEnumeratorContext<IcebergSourceSplit> enumeratorContext,
SplitAssigner assigner,
ScanContext scanContext,
ContinuousSplitPlanner splitPlanner,
@Nullable IcebergEnumeratorState enumState) {
super(enumeratorContext, assigner);

this.enumeratorContext = enumeratorContext;
this.assigner = assigner;
this.scanContext = scanContext;
this.splitPlanner = splitPlanner;
this.enumeratorPosition = new AtomicReference<>();
if (enumState != null) {
this.enumeratorPosition.set(enumState.lastEnumeratedPosition());
}
}

@Override
public void start() {
super.start();
enumeratorContext.callAsync(
this::discoverSplits,
this::processDiscoveredSplits,
0L,
scanContext.monitorInterval().toMillis());
}

@Override
public void close() throws IOException {
splitPlanner.close();
super.close();
}

@Override
protected boolean shouldWaitForMoreSplits() {
return true;
}

@Override
public IcebergEnumeratorState snapshotState(long checkpointId) {
return new IcebergEnumeratorState(enumeratorPosition.get(), assigner.state());
}

/**
* This method is executed in an IO thread pool.
*/
private ContinuousEnumerationResult discoverSplits() {
return splitPlanner.planSplits(enumeratorPosition.get());
}

/**
* This method is executed in a single coordinator thread.
*/
private void processDiscoveredSplits(ContinuousEnumerationResult result, Throwable error) {
if (error == null) {
if (!Objects.equals(result.fromPosition(), enumeratorPosition.get())) {
// Multiple discoverSplits() invocations with the same starting snapshot may be submitted to
// the I/O thread pool, e.g. when the discovery interval is very short (like 10 ms in some
// unit tests) or the thread pool is busy and discovery actions run concurrently. A discovery
// result should only be accepted if its starting position matches the current enumerator
// position (similar to a compare-and-swap).
LOG.info("Skip {} discovered splits because the scan starting position doesn't match " +
"the current enumerator position: enumerator position = {}, scan starting position = {}",
result.splits().size(), enumeratorPosition.get(), result.fromPosition());
} else {
assigner.onDiscoveredSplits(result.splits());
LOG.info("Added {} splits discovered between ({}, {}] to the assigner",
result.splits().size(), result.fromPosition(), result.toPosition());
// update the enumerator position even if there is no split discovered
// or the toPosition is empty (e.g. for empty table).
enumeratorPosition.set(result.toPosition());
LOG.info("Update enumerator position to {}", result.toPosition());
}
} else {
LOG.error("Failed to discover new splits", error);
}
}
}
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.flink.source.enumerator;

import java.io.IOException;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;

class IcebergEnumeratorPositionSerializer implements SimpleVersionedSerializer<IcebergEnumeratorPosition> {

public static final IcebergEnumeratorPositionSerializer INSTANCE = new IcebergEnumeratorPositionSerializer();

private static final int VERSION = 1;

private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
ThreadLocal.withInitial(() -> new DataOutputSerializer(128));

@Override
public int getVersion() {
return VERSION;
}

@Override
public byte[] serialize(IcebergEnumeratorPosition position) throws IOException {
return serializeV1(position);
}

@Override
public IcebergEnumeratorPosition deserialize(int version, byte[] serialized) throws IOException {
switch (version) {
case 1:
return deserializeV1(serialized);
default:
throw new IOException("Unknown version: " + version);
}
}

private byte[] serializeV1(IcebergEnumeratorPosition position) throws IOException {
DataOutputSerializer out = SERIALIZER_CACHE.get();
out.writeBoolean(position.snapshotId() != null);
if (position.snapshotId() != null) {
out.writeLong(position.snapshotId());
}
out.writeBoolean(position.snapshotTimestampMs() != null);
if (position.snapshotTimestampMs() != null) {
out.writeLong(position.snapshotTimestampMs());
}
byte[] result = out.getCopyOfBuffer();
out.clear();
return result;
}

private IcebergEnumeratorPosition deserializeV1(byte[] serialized) throws IOException {
DataInputDeserializer in = new DataInputDeserializer(serialized);
Long snapshotId = null;
if (in.readBoolean()) {
snapshotId = in.readLong();
}

Long snapshotTimestampMs = null;
if (in.readBoolean()) {
snapshotTimestampMs = in.readLong();
}

if (snapshotId != null) {
return IcebergEnumeratorPosition.of(snapshotId, snapshotTimestampMs);
} else {
return IcebergEnumeratorPosition.empty();
}
}
}
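A quick roundtrip through the V1 wire format above (a presence flag followed by a long, for each nullable field), as a hedged usage sketch. The scaffolding class is hypothetical and assumed to live in the same package, since the serializer is package-private; IcebergEnumeratorPosition.of(...) and .empty() are taken from the code above.

// Hedged sketch: roundtrip through the V1 format.
class PositionSerializerRoundtripSketch {
  static void roundtrip() throws java.io.IOException {
    IcebergEnumeratorPositionSerializer serializer = IcebergEnumeratorPositionSerializer.INSTANCE;

    // Serializes as: true, 1234L (snapshot id), true, 5678L (timestamp).
    IcebergEnumeratorPosition position = IcebergEnumeratorPosition.of(1234L, 5678L);
    byte[] bytes = serializer.serialize(position);
    IcebergEnumeratorPosition restored =
        serializer.deserialize(serializer.getVersion(), bytes);
    // restored carries the same snapshot id and timestamp as position.

    // An empty position serializes to just two false presence flags (2 bytes).
    byte[] emptyBytes = serializer.serialize(IcebergEnumeratorPosition.empty());
  }
}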