diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReader.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReader.java new file mode 100644 index 000000000000..44d3ba572dca --- /dev/null +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReader.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.source.reader; + +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase; +import org.apache.iceberg.flink.source.split.IcebergSourceSplit; +import org.apache.iceberg.flink.source.split.SplitRequestEvent; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +public class IcebergSourceReader extends + SingleThreadMultiplexSourceReaderBase, T, IcebergSourceSplit, IcebergSourceSplit> { + + public IcebergSourceReader( + ReaderFunction readerFunction, + SourceReaderContext context, + ReaderMetricsContext metrics) { + super( + () -> new IcebergSourceSplitReader<>(readerFunction, context, metrics), + new IcebergSourceRecordEmitter<>(), + context.getConfiguration(), + context); + } + + @Override + public void start() { + // We request a split only if we did not get splits during the checkpoint restore. + // Otherwise, reader restarts will keep requesting more and more splits. + if (getNumberOfCurrentlyAssignedSplits() == 0) { + requestSplit(Collections.emptyList()); + } + } + + @Override + protected void onSplitFinished(Map finishedSplitIds) { + requestSplit(Lists.newArrayList(finishedSplitIds.keySet())); + } + + @Override + protected IcebergSourceSplit initializedState(IcebergSourceSplit split) { + return split; + } + + @Override + protected IcebergSourceSplit toSplitType(String splitId, IcebergSourceSplit splitState) { + return splitState; + } + + private void requestSplit(Collection finishedSplitIds) { + context.sendSourceEventToCoordinator(new SplitRequestEvent(finishedSplitIds)); + } +} diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceRecordEmitter.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceRecordEmitter.java new file mode 100644 index 000000000000..4e467db92e93 --- /dev/null +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceRecordEmitter.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.source.reader; + +import org.apache.flink.api.connector.source.SourceOutput; +import org.apache.flink.connector.base.source.reader.RecordEmitter; +import org.apache.iceberg.flink.source.split.IcebergSourceSplit; + +final class IcebergSourceRecordEmitter implements RecordEmitter, T, IcebergSourceSplit> { + + IcebergSourceRecordEmitter() { + } + + @Override + public void emitRecord( + RecordAndPosition element, + SourceOutput output, + IcebergSourceSplit split) { + output.collect(element.record()); + split.updatePosition(element.fileOffset(), element.recordOffset()); + } +} diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java new file mode 100644 index 000000000000..f918ae0466ed --- /dev/null +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.source.reader; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.ArrayDeque; +import java.util.Collections; +import java.util.Queue; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.connector.base.source.reader.RecordsBySplits; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; +import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition; +import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; +import org.apache.iceberg.flink.source.split.IcebergSourceSplit; +import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.metrics.MetricsContext.Counter; +import org.apache.iceberg.metrics.MetricsContext.Unit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class IcebergSourceSplitReader implements SplitReader, IcebergSourceSplit> { + private static final Logger LOG = LoggerFactory.getLogger(IcebergSourceSplitReader.class); + + private final ReaderFunction openSplitFunction; + private final int indexOfSubtask; + private final Queue splits; + + private final Counter assignedSplits; + private final Counter assignedBytes; + private final Counter finishedSplits; + private final Counter finishedBytes; + private final Counter splitReaderFetchCalls; + + private CloseableIterator>> currentReader; + private IcebergSourceSplit currentSplit; + private String currentSplitId; + + IcebergSourceSplitReader(ReaderFunction openSplitFunction, + SourceReaderContext context, + ReaderMetricsContext metrics) { + this.openSplitFunction = openSplitFunction; + this.indexOfSubtask = context.getIndexOfSubtask(); + this.splits = new ArrayDeque<>(); + + this.assignedSplits = metrics.counter(ReaderMetricsContext.ASSIGNED_SPLITS, Long.class, Unit.COUNT); + this.assignedBytes = metrics.counter(ReaderMetricsContext.ASSIGNED_BYTES, Long.class, Unit.COUNT); + this.finishedSplits = metrics.counter(ReaderMetricsContext.FINISHED_SPLITS, Long.class, Unit.COUNT); + this.finishedBytes = metrics.counter(ReaderMetricsContext.FINISHED_BYTES, Long.class, Unit.COUNT); + this.splitReaderFetchCalls = metrics.counter(ReaderMetricsContext.SPLIT_READER_FETCH_CALLS, Long.class, Unit.COUNT); + } + + @Override + public RecordsWithSplitIds> fetch() throws IOException { + splitReaderFetchCalls.increment(); + if (currentReader == null) { + IcebergSourceSplit nextSplit = splits.poll(); + if (nextSplit != null) { + currentSplit = nextSplit; + currentSplitId = nextSplit.splitId(); + currentReader = openSplitFunction.apply(currentSplit); + } else { + // return an empty result, which will lead to split fetch to be idle. + // SplitFetcherManager will then close idle fetcher. + return new RecordsBySplits(Collections.emptyMap(), Collections.emptySet()); + } + } + + if (currentReader.hasNext()) { + // Because Iterator#next() doesn't support checked exception, + // we need to wrap and unwrap the checked IOException with UncheckedIOException + try { + return currentReader.next(); + } catch (UncheckedIOException e) { + throw e.getCause(); + } + } else { + return finishSplit(); + } + } + + @Override + public void handleSplitsChanges(SplitsChange splitsChange) { + if (!(splitsChange instanceof SplitsAddition)) { + throw new UnsupportedOperationException(String.format( + "Unsupported split change: %s", splitsChange.getClass())); + } + + LOG.info("Add {} splits to reader", splitsChange.splits().size()); + splits.addAll(splitsChange.splits()); + assignedSplits.increment(Long.valueOf(splitsChange.splits().size())); + assignedBytes.increment(calculateBytes(splitsChange)); + } + + @Override + public void wakeUp() { + } + + @Override + public void close() throws Exception { + currentSplitId = null; + if (currentReader != null) { + currentReader.close(); + } + } + + private long calculateBytes(IcebergSourceSplit split) { + return split.task().files().stream() + .map(fileScanTask -> fileScanTask.length()) + .reduce(0L, Long::sum); + } + + private long calculateBytes(SplitsChange splitsChanges) { + return splitsChanges.splits().stream() + .map(split -> calculateBytes(split)) + .reduce(0L, Long::sum); + } + + private ArrayBatchRecords finishSplit() throws IOException { + if (currentReader != null) { + currentReader.close(); + currentReader = null; + } + + ArrayBatchRecords finishRecords = ArrayBatchRecords.finishedSplit(currentSplitId); + LOG.info("Split reader {} finished split: {}", indexOfSubtask, currentSplitId); + finishedSplits.increment(1L); + finishedBytes.increment(calculateBytes(currentSplit)); + currentSplitId = null; + return finishRecords; + } +} diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/ReaderMetricsContext.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/ReaderMetricsContext.java new file mode 100644 index 000000000000..16173324839e --- /dev/null +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/ReaderMetricsContext.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.source.reader; + +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import org.apache.flink.annotation.Internal; +import org.apache.flink.metrics.MetricGroup; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.metrics.MetricsContext; + +@Internal +public class ReaderMetricsContext implements MetricsContext { + public static final String ASSIGNED_SPLITS = "assignedSplits"; + public static final String ASSIGNED_BYTES = "assignedBytes"; + public static final String FINISHED_SPLITS = "finishedSplits"; + public static final String FINISHED_BYTES = "finishedBytes"; + public static final String SPLIT_READER_FETCH_CALLS = "splitReaderFetchCalls"; + + private final AtomicLong assignedSplits; + private final AtomicLong assignedBytes; + private final AtomicLong finishedSplits; + private final AtomicLong finishedBytes; + private final AtomicLong splitReaderFetchCalls; + + public ReaderMetricsContext(MetricGroup metricGroup) { + MetricGroup readerMetricGroup = metricGroup.addGroup("IcebergSourceReader"); + this.assignedSplits = new AtomicLong(); + this.assignedBytes = new AtomicLong(); + this.finishedSplits = new AtomicLong(); + this.finishedBytes = new AtomicLong(); + this.splitReaderFetchCalls = new AtomicLong(); + readerMetricGroup.gauge(ASSIGNED_SPLITS, assignedSplits::get); + readerMetricGroup.gauge(ASSIGNED_BYTES, assignedBytes::get); + readerMetricGroup.gauge(FINISHED_SPLITS, finishedSplits::get); + readerMetricGroup.gauge(FINISHED_BYTES, finishedBytes::get); + readerMetricGroup.gauge(SPLIT_READER_FETCH_CALLS, splitReaderFetchCalls::get); + } + + @Override + public Counter counter(String name, Class type, Unit unit) { + switch (name) { + case ASSIGNED_SPLITS: + ValidationException.check(type == Long.class, "'%s' requires Long type", ASSIGNED_SPLITS); + return (Counter) longCounter(assignedSplits::addAndGet); + case ASSIGNED_BYTES: + ValidationException.check(type == Long.class, "'%s' requires Integer type", ASSIGNED_BYTES); + return (Counter) longCounter(assignedBytes::addAndGet); + case FINISHED_SPLITS: + ValidationException.check(type == Long.class, "'%s' requires Long type", FINISHED_SPLITS); + return (Counter) longCounter(finishedSplits::addAndGet); + case FINISHED_BYTES: + ValidationException.check(type == Long.class, "'%s' requires Integer type", FINISHED_BYTES); + return (Counter) longCounter(finishedBytes::addAndGet); + case SPLIT_READER_FETCH_CALLS: + ValidationException.check(type == Long.class, "'%s' requires Integer type", SPLIT_READER_FETCH_CALLS); + return (Counter) longCounter(splitReaderFetchCalls::addAndGet); + default: + throw new IllegalArgumentException(String.format("Unsupported counter: '%s'", name)); + } + } + + private Counter longCounter(Consumer consumer) { + return new Counter() { + @Override + public void increment() { + increment(1L); + } + + @Override + public void increment(Long amount) { + consumer.accept(amount); + } + }; + } +} diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/RecordAndPosition.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/RecordAndPosition.java index 39b097fa4828..46538516cbda 100644 --- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/RecordAndPosition.java +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/RecordAndPosition.java @@ -67,12 +67,6 @@ public void set(T newRecord, int newFileOffset, long newRecordOffset) { this.recordOffset = newRecordOffset; } - /** Sets the position without setting a record. */ - public void position(int newFileOffset, long newRecordOffset) { - this.fileOffset = newFileOffset; - this.recordOffset = newRecordOffset; - } - /** Sets the next record of a sequence. This increments the {@code recordOffset} by one. */ public void record(T nextRecord) { this.record = nextRecord; diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/split/SplitRequestEvent.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/split/SplitRequestEvent.java new file mode 100644 index 000000000000..7a7610cc1978 --- /dev/null +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/split/SplitRequestEvent.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.source.split; + +import java.util.Collection; +import java.util.Collections; +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.connector.source.SourceEvent; + +/** + * We can remove this class once FLINK-21364 is resolved. + */ +@Internal +public class SplitRequestEvent implements SourceEvent { + private static final long serialVersionUID = 1L; + + private final Collection finishedSplitIds; + private final String requesterHostname; + + public SplitRequestEvent() { + this(Collections.emptyList()); + } + + public SplitRequestEvent(Collection finishedSplitIds) { + this(finishedSplitIds, null); + } + + public SplitRequestEvent(Collection finishedSplitIds, String requesterHostname) { + this.finishedSplitIds = finishedSplitIds; + this.requesterHostname = requesterHostname; + } + + public Collection finishedSplitIds() { + return finishedSplitIds; + } + + public String requesterHostname() { + return requesterHostname; + } +}