From 3ae7a66d13dc37eae8542788f3d77bda6b6e9433 Mon Sep 17 00:00:00 2001 From: Steven Wu Date: Fri, 4 Mar 2022 13:06:24 -0800 Subject: [PATCH 1/5] Flink: FLIP-27 source reader --- .../flink/source/IcebergSourceEvents.java | 60 ++++++++ .../source/reader/IcebergSourceReader.java | 72 ++++++++++ .../reader/IcebergSourceReaderMetrics.java | 68 +++++++++ .../reader/IcebergSourceRecordEmitter.java | 39 +++++ .../reader/IcebergSourceSplitReader.java | 134 ++++++++++++++++++ 5 files changed, 373 insertions(+) create mode 100644 flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSourceEvents.java create mode 100644 flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReader.java create mode 100644 flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReaderMetrics.java create mode 100644 flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceRecordEmitter.java create mode 100644 flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSourceEvents.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSourceEvents.java new file mode 100644 index 000000000000..eac671476258 --- /dev/null +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSourceEvents.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.source; + +import java.util.Collection; +import java.util.Collections; +import javax.annotation.Nullable; +import org.apache.flink.api.connector.source.SourceEvent; + +public class IcebergSourceEvents { + + /** + * We can remove this class once FLINK-21364 is resolved. + */ + public static final class SplitRequestEvent implements SourceEvent { + private static final long serialVersionUID = 1L; + + private final Collection finishedSplitIds; + @Nullable + private final String requesterHostname; + + public SplitRequestEvent() { + this(Collections.emptyList()); + } + + public SplitRequestEvent(Collection finishedSplitIds) { + this(finishedSplitIds, null); + } + + public SplitRequestEvent(Collection finishedSplitIds, String requesterHostname) { + this.finishedSplitIds = finishedSplitIds; + this.requesterHostname = requesterHostname; + } + + public Collection finishedSplitIds() { + return finishedSplitIds; + } + + public String requesterHostname() { + return requesterHostname; + } + } +} diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReader.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReader.java new file mode 100644 index 000000000000..c38789f7a9b2 --- /dev/null +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReader.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.source.reader; + +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase; +import org.apache.iceberg.flink.source.IcebergSourceEvents; +import org.apache.iceberg.flink.source.split.IcebergSourceSplit; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +public class IcebergSourceReader extends + SingleThreadMultiplexSourceReaderBase, T, IcebergSourceSplit, IcebergSourceSplit> { + + public IcebergSourceReader( + ReaderFunction readerFunction, + SourceReaderContext context, + IcebergSourceReaderMetrics metrics) { + super( + () -> new IcebergSourceSplitReader<>(readerFunction, context, metrics), + new IcebergSourceRecordEmitter<>(), + context.getConfiguration(), + context); + } + + @Override + public void start() { + requestSplit(Collections.emptyList()); + } + + @Override + protected void onSplitFinished(Map finishedSplitIds) { + if (!finishedSplitIds.isEmpty()) { + requestSplit(Lists.newArrayList(finishedSplitIds.keySet())); + } + } + + @Override + protected IcebergSourceSplit initializedState(IcebergSourceSplit split) { + return 
split; + } + + @Override + protected IcebergSourceSplit toSplitType( + String splitId, + IcebergSourceSplit splitState) { + return splitState; + } + + private void requestSplit(Collection finishedSplitIds) { + context.sendSourceEventToCoordinator(new IcebergSourceEvents.SplitRequestEvent(finishedSplitIds)); + } +} diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReaderMetrics.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReaderMetrics.java new file mode 100644 index 000000000000..d50f96fe2453 --- /dev/null +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReaderMetrics.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.flink.source.reader; + +import java.util.concurrent.atomic.AtomicLong; +import org.apache.flink.annotation.Internal; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.MetricGroup; + +@Internal +public class IcebergSourceReaderMetrics { + private final AtomicLong assignedSplits; + private final AtomicLong assignedBytes; + private final AtomicLong finishedSplits; + private final AtomicLong finishedBytes; + private final Counter splitReaderFetchCalls; + + public IcebergSourceReaderMetrics(MetricGroup metricGroup) { + MetricGroup readerMetricGroup = metricGroup.addGroup("IcebergSourceReader"); + + this.assignedSplits = new AtomicLong(); + this.assignedBytes = new AtomicLong(); + this.finishedSplits = new AtomicLong(); + this.finishedBytes = new AtomicLong(); + readerMetricGroup.gauge("assignedSplits", assignedSplits::get); + readerMetricGroup.gauge("assignedBytes", assignedBytes::get); + readerMetricGroup.gauge("finishedSplits", finishedSplits::get); + readerMetricGroup.gauge("finishedBytes", finishedBytes::get); + this.splitReaderFetchCalls = readerMetricGroup.counter("splitReaderFetchCalls"); + } + + public void incrementAssignedSplits(long delta) { + assignedSplits.addAndGet(delta); + } + + public void incrementAssignedBytes(long delta) { + assignedBytes.addAndGet(delta); + } + + public void incrementFinishedSplits(long delta) { + finishedSplits.addAndGet(delta); + } + + public void incrementFinishedBytes(long delta) { + finishedBytes.addAndGet(delta); + } + + public void incrementSplitReaderFetchCalls() { + splitReaderFetchCalls.inc(); + } +} diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceRecordEmitter.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceRecordEmitter.java new file mode 100644 index 000000000000..4e467db92e93 --- /dev/null +++ 
b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceRecordEmitter.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.source.reader; + +import org.apache.flink.api.connector.source.SourceOutput; +import org.apache.flink.connector.base.source.reader.RecordEmitter; +import org.apache.iceberg.flink.source.split.IcebergSourceSplit; + +final class IcebergSourceRecordEmitter implements RecordEmitter, T, IcebergSourceSplit> { + + IcebergSourceRecordEmitter() { + } + + @Override + public void emitRecord( + RecordAndPosition element, + SourceOutput output, + IcebergSourceSplit split) { + output.collect(element.record()); + split.updatePosition(element.fileOffset(), element.recordOffset()); + } +} diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java new file mode 100644 index 000000000000..d18bab6aaff5 --- /dev/null +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.source.reader; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.ArrayDeque; +import java.util.Queue; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; +import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; +import org.apache.iceberg.flink.source.split.IcebergSourceSplit; +import org.apache.iceberg.io.CloseableIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class IcebergSourceSplitReader implements SplitReader, IcebergSourceSplit> { + private static final Logger LOG = LoggerFactory.getLogger(IcebergSourceSplitReader.class); + + private final ReaderFunction readerFunction; + private final int indexOfSubtask; + private final IcebergSourceReaderMetrics metrics; + private final Queue splits; + + private CloseableIterator>> currentReader; + private IcebergSourceSplit currentSplit; + private String currentSplitId; + + IcebergSourceSplitReader(ReaderFunction readerFunction, + SourceReaderContext context, + 
IcebergSourceReaderMetrics metrics) { + this.readerFunction = readerFunction; + this.indexOfSubtask = context.getIndexOfSubtask(); + this.metrics = metrics; + this.splits = new ArrayDeque<>(); + } + + @Override + public RecordsWithSplitIds> fetch() throws IOException { + metrics.incrementSplitReaderFetchCalls(); + checkSplitOrStartNext(); + + if (currentReader.hasNext()) { + // Because Iterator#next() doesn't support checked exception, + // we need to wrap and unwrap the checked IOException with UncheckedIOException + try { + return currentReader.next(); + } catch (UncheckedIOException e) { + throw e.getCause(); + } + } else { + return finishSplit(); + } + } + + @Override + public void handleSplitsChanges(SplitsChange splitsChanges) { + LOG.info("Add splits to reader: {}", splitsChanges.splits()); + splits.addAll(splitsChanges.splits()); + metrics.incrementAssignedSplits(splitsChanges.splits().size()); + metrics.incrementAssignedBytes(calculateBytes(splitsChanges)); + } + + @Override + public void wakeUp() { + } + + @Override + public void close() throws Exception { + currentSplitId = null; + if (currentReader != null) { + currentReader.close(); + } + } + + private long calculateBytes(IcebergSourceSplit split) { + return split.task().files().stream() + .map(fileScanTask -> fileScanTask.length()) + .reduce(0L, Long::sum); + } + + private long calculateBytes(SplitsChange splitsChanges) { + return splitsChanges.splits().stream() + .map(split -> calculateBytes(split)) + .reduce(0L, Long::sum); + } + + private void checkSplitOrStartNext() throws IOException { + if (currentReader != null) { + return; + } + + IcebergSourceSplit nextSplit = splits.poll(); + if (nextSplit == null) { + throw new IOException("No split remaining"); + } + + currentSplit = nextSplit; + currentSplitId = nextSplit.splitId(); + currentReader = readerFunction.apply(currentSplit); + } + + private ArrayBatchRecords finishSplit() throws IOException { + if (currentReader != null) { + 
currentReader.close(); + currentReader = null; + } + + ArrayBatchRecords finishRecords = ArrayBatchRecords.finishedSplit(currentSplitId); + LOG.info("Split reader {} finished split: {}", indexOfSubtask, currentSplitId); + metrics.incrementFinishedSplits(1L); + metrics.incrementFinishedBytes(calculateBytes(currentSplit)); + currentSplitId = null; + return finishRecords; + } +} From 5ecd259dd97546e484d5bab9953c39ec760451f2 Mon Sep 17 00:00:00 2001 From: Steven Wu Date: Sun, 13 Mar 2022 20:39:53 -0700 Subject: [PATCH 2/5] address review comments --- .../flink/source/IcebergSourceEvents.java | 60 ------------ .../source/reader/IcebergSourceReader.java | 14 +-- .../reader/IcebergSourceReaderMetrics.java | 68 -------------- .../reader/IcebergSourceSplitReader.java | 49 +++++++--- .../source/reader/ReaderMetricsContext.java | 93 +++++++++++++++++++ .../source/reader/RecordAndPosition.java | 6 -- .../flink/source/split/SplitRequestEvent.java | 57 ++++++++++++ 7 files changed, 193 insertions(+), 154 deletions(-) delete mode 100644 flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSourceEvents.java delete mode 100644 flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReaderMetrics.java create mode 100644 flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/ReaderMetricsContext.java create mode 100644 flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/split/SplitRequestEvent.java diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSourceEvents.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSourceEvents.java deleted file mode 100644 index eac671476258..000000000000 --- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSourceEvents.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iceberg.flink.source; - -import java.util.Collection; -import java.util.Collections; -import javax.annotation.Nullable; -import org.apache.flink.api.connector.source.SourceEvent; - -public class IcebergSourceEvents { - - /** - * We can remove this class once FLINK-21364 is resolved. 
- */ - public static final class SplitRequestEvent implements SourceEvent { - private static final long serialVersionUID = 1L; - - private final Collection finishedSplitIds; - @Nullable - private final String requesterHostname; - - public SplitRequestEvent() { - this(Collections.emptyList()); - } - - public SplitRequestEvent(Collection finishedSplitIds) { - this(finishedSplitIds, null); - } - - public SplitRequestEvent(Collection finishedSplitIds, String requesterHostname) { - this.finishedSplitIds = finishedSplitIds; - this.requesterHostname = requesterHostname; - } - - public Collection finishedSplitIds() { - return finishedSplitIds; - } - - public String requesterHostname() { - return requesterHostname; - } - } -} diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReader.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReader.java index c38789f7a9b2..cc52ee8edf57 100644 --- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReader.java +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReader.java @@ -24,8 +24,8 @@ import java.util.Map; import org.apache.flink.api.connector.source.SourceReaderContext; import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase; -import org.apache.iceberg.flink.source.IcebergSourceEvents; import org.apache.iceberg.flink.source.split.IcebergSourceSplit; +import org.apache.iceberg.flink.source.split.SplitRequestEvent; import org.apache.iceberg.relocated.com.google.common.collect.Lists; public class IcebergSourceReader extends @@ -34,7 +34,7 @@ public class IcebergSourceReader extends public IcebergSourceReader( ReaderFunction readerFunction, SourceReaderContext context, - IcebergSourceReaderMetrics metrics) { + ReaderMetricsContext metrics) { super( () -> new IcebergSourceSplitReader<>(readerFunction, context, metrics), new 
IcebergSourceRecordEmitter<>(), @@ -49,9 +49,7 @@ public void start() { @Override protected void onSplitFinished(Map finishedSplitIds) { - if (!finishedSplitIds.isEmpty()) { - requestSplit(Lists.newArrayList(finishedSplitIds.keySet())); - } + requestSplit(Lists.newArrayList(finishedSplitIds.keySet())); } @Override @@ -60,13 +58,11 @@ protected IcebergSourceSplit initializedState(IcebergSourceSplit split) { } @Override - protected IcebergSourceSplit toSplitType( - String splitId, - IcebergSourceSplit splitState) { + protected IcebergSourceSplit toSplitType(String splitId, IcebergSourceSplit splitState) { return splitState; } private void requestSplit(Collection finishedSplitIds) { - context.sendSourceEventToCoordinator(new IcebergSourceEvents.SplitRequestEvent(finishedSplitIds)); + context.sendSourceEventToCoordinator(new SplitRequestEvent(finishedSplitIds)); } } diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReaderMetrics.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReaderMetrics.java deleted file mode 100644 index d50f96fe2453..000000000000 --- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReaderMetrics.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iceberg.flink.source.reader; - -import java.util.concurrent.atomic.AtomicLong; -import org.apache.flink.annotation.Internal; -import org.apache.flink.metrics.Counter; -import org.apache.flink.metrics.MetricGroup; - -@Internal -public class IcebergSourceReaderMetrics { - private final AtomicLong assignedSplits; - private final AtomicLong assignedBytes; - private final AtomicLong finishedSplits; - private final AtomicLong finishedBytes; - private final Counter splitReaderFetchCalls; - - public IcebergSourceReaderMetrics(MetricGroup metricGroup) { - MetricGroup readerMetricGroup = metricGroup.addGroup("IcebergSourceReader"); - - this.assignedSplits = new AtomicLong(); - this.assignedBytes = new AtomicLong(); - this.finishedSplits = new AtomicLong(); - this.finishedBytes = new AtomicLong(); - readerMetricGroup.gauge("assignedSplits", assignedSplits::get); - readerMetricGroup.gauge("assignedBytes", assignedBytes::get); - readerMetricGroup.gauge("finishedSplits", finishedSplits::get); - readerMetricGroup.gauge("finishedBytes", finishedBytes::get); - this.splitReaderFetchCalls = readerMetricGroup.counter("splitReaderFetchCalls"); - } - - public void incrementAssignedSplits(long delta) { - assignedSplits.addAndGet(delta); - } - - public void incrementAssignedBytes(long delta) { - assignedBytes.addAndGet(delta); - } - - public void incrementFinishedSplits(long delta) { - finishedSplits.addAndGet(delta); - } - - public void incrementFinishedBytes(long delta) { - finishedBytes.addAndGet(delta); - } - - public void 
incrementSplitReaderFetchCalls() { - splitReaderFetchCalls.inc(); - } -} diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java index d18bab6aaff5..0510721edadd 100644 --- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java @@ -25,10 +25,14 @@ import java.util.Queue; import org.apache.flink.api.connector.source.SourceReaderContext; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher; import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; +import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition; import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; import org.apache.iceberg.flink.source.split.IcebergSourceSplit; import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.metrics.MetricsContext.Counter; +import org.apache.iceberg.metrics.MetricsContext.Unit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,25 +41,35 @@ class IcebergSourceSplitReader implements SplitReader, I private final ReaderFunction readerFunction; private final int indexOfSubtask; - private final IcebergSourceReaderMetrics metrics; private final Queue splits; + private final Counter assignedSplits; + private final Counter assignedBytes; + private final Counter finishedSplits; + private final Counter finishedBytes; + private final Counter splitReaderFetchCalls; + private CloseableIterator>> currentReader; private IcebergSourceSplit currentSplit; private String currentSplitId; IcebergSourceSplitReader(ReaderFunction readerFunction, SourceReaderContext context, - IcebergSourceReaderMetrics 
metrics) { + ReaderMetricsContext metrics) { this.readerFunction = readerFunction; this.indexOfSubtask = context.getIndexOfSubtask(); - this.metrics = metrics; this.splits = new ArrayDeque<>(); + + this.assignedSplits = metrics.counter(ReaderMetricsContext.ASSIGNED_SPLITS, Long.class, Unit.COUNT); + this.assignedBytes = metrics.counter(ReaderMetricsContext.ASSIGNED_BYTES, Long.class, Unit.COUNT); + this.finishedSplits = metrics.counter(ReaderMetricsContext.FINISHED_SPLITS, Long.class, Unit.COUNT); + this.finishedBytes = metrics.counter(ReaderMetricsContext.FINISHED_BYTES, Long.class, Unit.COUNT); + this.splitReaderFetchCalls = metrics.counter(ReaderMetricsContext.SPLIT_READER_FETCH_CALLS, Long.class, Unit.COUNT); } @Override public RecordsWithSplitIds> fetch() throws IOException { - metrics.incrementSplitReaderFetchCalls(); + splitReaderFetchCalls.increment(); checkSplitOrStartNext(); if (currentReader.hasNext()) { @@ -72,11 +86,16 @@ public RecordsWithSplitIds> fetch() throws IOException { } @Override - public void handleSplitsChanges(SplitsChange splitsChanges) { - LOG.info("Add splits to reader: {}", splitsChanges.splits()); - splits.addAll(splitsChanges.splits()); - metrics.incrementAssignedSplits(splitsChanges.splits().size()); - metrics.incrementAssignedBytes(calculateBytes(splitsChanges)); + public void handleSplitsChanges(SplitsChange splitsChange) { + if (!(splitsChange instanceof SplitsAddition)) { + throw new UnsupportedOperationException(String.format( + "The SplitChange type of %s is not supported.", splitsChange.getClass())); + } + + LOG.info("Add splits to reader: {}", splitsChange.splits()); + splits.addAll(splitsChange.splits()); + assignedSplits.increment(Long.valueOf(splitsChange.splits().size())); + assignedBytes.increment(calculateBytes(splitsChange)); } @Override @@ -103,6 +122,14 @@ private long calculateBytes(SplitsChange splitsChanges) { .reduce(0L, Long::sum); } + /** + * @throws IOException when current split is done and there is no more 
splits available. + * It will be propagated by the caller of {@code FetchTask#run()}. That will cause + * {@link SplitFetcher} to exit. When a new split is assigned, a new {@code SplitFetcher} + * will be created to handle it. This behavior is a little odd. + * Right now, we are copying the same behavior from the Flink file source. + * We can work with the Flink community and potentially improve this behavior. + */ private void checkSplitOrStartNext() throws IOException { if (currentReader != null) { return; @@ -126,8 +153,8 @@ private ArrayBatchRecords finishSplit() throws IOException { ArrayBatchRecords finishRecords = ArrayBatchRecords.finishedSplit(currentSplitId); LOG.info("Split reader {} finished split: {}", indexOfSubtask, currentSplitId); - metrics.incrementFinishedSplits(1L); - metrics.incrementFinishedBytes(calculateBytes(currentSplit)); + finishedSplits.increment(1L); + finishedBytes.increment(calculateBytes(currentSplit)); currentSplitId = null; return finishRecords; } diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/ReaderMetricsContext.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/ReaderMetricsContext.java new file mode 100644 index 000000000000..16173324839e --- /dev/null +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/ReaderMetricsContext.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.source.reader; + +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import org.apache.flink.annotation.Internal; +import org.apache.flink.metrics.MetricGroup; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.metrics.MetricsContext; + +@Internal +public class ReaderMetricsContext implements MetricsContext { + public static final String ASSIGNED_SPLITS = "assignedSplits"; + public static final String ASSIGNED_BYTES = "assignedBytes"; + public static final String FINISHED_SPLITS = "finishedSplits"; + public static final String FINISHED_BYTES = "finishedBytes"; + public static final String SPLIT_READER_FETCH_CALLS = "splitReaderFetchCalls"; + + private final AtomicLong assignedSplits; + private final AtomicLong assignedBytes; + private final AtomicLong finishedSplits; + private final AtomicLong finishedBytes; + private final AtomicLong splitReaderFetchCalls; + + public ReaderMetricsContext(MetricGroup metricGroup) { + MetricGroup readerMetricGroup = metricGroup.addGroup("IcebergSourceReader"); + this.assignedSplits = new AtomicLong(); + this.assignedBytes = new AtomicLong(); + this.finishedSplits = new AtomicLong(); + this.finishedBytes = new AtomicLong(); + this.splitReaderFetchCalls = new AtomicLong(); + readerMetricGroup.gauge(ASSIGNED_SPLITS, assignedSplits::get); + readerMetricGroup.gauge(ASSIGNED_BYTES, assignedBytes::get); + readerMetricGroup.gauge(FINISHED_SPLITS, finishedSplits::get); + readerMetricGroup.gauge(FINISHED_BYTES, 
finishedBytes::get); + readerMetricGroup.gauge(SPLIT_READER_FETCH_CALLS, splitReaderFetchCalls::get); + } + + @Override + public Counter counter(String name, Class type, Unit unit) { + switch (name) { + case ASSIGNED_SPLITS: + ValidationException.check(type == Long.class, "'%s' requires Long type", ASSIGNED_SPLITS); + return (Counter) longCounter(assignedSplits::addAndGet); + case ASSIGNED_BYTES: + ValidationException.check(type == Long.class, "'%s' requires Long type", ASSIGNED_BYTES); + return (Counter) longCounter(assignedBytes::addAndGet); + case FINISHED_SPLITS: + ValidationException.check(type == Long.class, "'%s' requires Long type", FINISHED_SPLITS); + return (Counter) longCounter(finishedSplits::addAndGet); + case FINISHED_BYTES: + ValidationException.check(type == Long.class, "'%s' requires Long type", FINISHED_BYTES); + return (Counter) longCounter(finishedBytes::addAndGet); + case SPLIT_READER_FETCH_CALLS: + ValidationException.check(type == Long.class, "'%s' requires Long type", SPLIT_READER_FETCH_CALLS); + return (Counter) longCounter(splitReaderFetchCalls::addAndGet); + default: + throw new IllegalArgumentException(String.format("Unsupported counter: '%s'", name)); + } + } + + private Counter longCounter(Consumer consumer) { + return new Counter() { + @Override + public void increment() { + increment(1L); + } + + @Override + public void increment(Long amount) { + consumer.accept(amount); + } + }; + } +} diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/RecordAndPosition.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/RecordAndPosition.java index 39b097fa4828..46538516cbda 100644 --- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/RecordAndPosition.java +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/RecordAndPosition.java @@ -67,12 +67,6 @@ public void set(T newRecord, int newFileOffset, long newRecordOffset) { 
this.recordOffset = newRecordOffset;
   }

-  /** Sets the position without setting a record. */
-  public void position(int newFileOffset, long newRecordOffset) {
-    this.fileOffset = newFileOffset;
-    this.recordOffset = newRecordOffset;
-  }
-
   /** Sets the next record of a sequence. This increments the {@code recordOffset} by one. */
   public void record(T nextRecord) {
     this.record = nextRecord;
diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/split/SplitRequestEvent.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/split/SplitRequestEvent.java
new file mode 100644
index 000000000000..7a7610cc1978
--- /dev/null
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/split/SplitRequestEvent.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.split;
+
+import java.util.Collection;
+import java.util.Collections;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SourceEvent;
+
+/**
+ * Source event with finished split ids and an optional requester hostname; removable once FLINK-21364 is resolved.
+ */
+@Internal
+public class SplitRequestEvent implements SourceEvent {
+  private static final long serialVersionUID = 1L;
+
+  private final Collection finishedSplitIds;
+  private final String requesterHostname;
+
+  public SplitRequestEvent() {
+    this(Collections.emptyList());
+  }
+
+  public SplitRequestEvent(Collection finishedSplitIds) {
+    this(finishedSplitIds, null);
+  }
+
+  public SplitRequestEvent(Collection finishedSplitIds, String requesterHostname) {
+    this.finishedSplitIds = finishedSplitIds;
+    this.requesterHostname = requesterHostname;
+  }
+
+  public Collection finishedSplitIds() {
+    return finishedSplitIds;
+  }
+
+  public String requesterHostname() {
+    return requesterHostname;
+  }
+}
From 41ef9ba71550c5575af12cec54d01765b85e2e1a Mon Sep 17 00:00:00 2001
From: Steven Wu
Date: Mon, 14 Mar 2022 17:06:13 -0700
Subject: [PATCH 3/5] only request split on start if there is none restored

---
 .../iceberg/flink/source/reader/IcebergSourceReader.java | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReader.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReader.java
index cc52ee8edf57..44d3ba572dca 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReader.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReader.java
@@ -44,7 +44,11 @@ public IcebergSourceReader(

   @Override
   public void start() {
-    requestSplit(Collections.emptyList());
+    // We request a split only if we did not get splits during the checkpoint restore.
+    // Otherwise, reader restarts will keep requesting more and more splits.
+    if (getNumberOfCurrentlyAssignedSplits() == 0) {
+      requestSplit(Collections.emptyList());
+    }
   }

   @Override
From 5eaf6b5bf59af03084c2c12a32699b2fc6d6a016 Mon Sep 17 00:00:00 2001
From: Steven Wu
Date: Mon, 21 Mar 2022 08:41:10 -0700
Subject: [PATCH 4/5] address Ryan's comments

---
 .../reader/IcebergSourceSplitReader.java | 27 +++++++++----------
 1 file changed, 12 insertions(+), 15 deletions(-)

diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java
index 0510721edadd..8b69f39fb1f4 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java
@@ -25,7 +25,6 @@
 import java.util.Queue;
 import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
-import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
@@ -39,7 +38,7 @@
 class IcebergSourceSplitReader implements SplitReader, IcebergSourceSplit> {
   private static final Logger LOG = LoggerFactory.getLogger(IcebergSourceSplitReader.class);

-  private final ReaderFunction readerFunction;
+  private final ReaderFunction openSplitFunction;
   private final int indexOfSubtask;
   private final Queue splits;

@@ -53,10 +52,10 @@ class IcebergSourceSplitReader implements SplitReader, I
   private IcebergSourceSplit currentSplit;
   private String currentSplitId;

-  IcebergSourceSplitReader(ReaderFunction readerFunction,
+  IcebergSourceSplitReader(ReaderFunction openSplitFunction,
                            SourceReaderContext context,
                            ReaderMetricsContext metrics) {
-    this.readerFunction = readerFunction;
+    this.openSplitFunction = openSplitFunction;
     this.indexOfSubtask = context.getIndexOfSubtask();
     this.splits = new ArrayDeque<>();

@@ -89,10 +88,10 @@ public RecordsWithSplitIds> fetch() throws IOException {
   public void handleSplitsChanges(SplitsChange splitsChange) {
     if (!(splitsChange instanceof SplitsAddition)) {
       throw new UnsupportedOperationException(String.format(
-          "The SplitChange type of %s is not supported.", splitsChange.getClass()));
+          "Unsupported split change: %s", splitsChange.getClass()));
     }

-    LOG.info("Add splits to reader: {}", splitsChange.splits());
+    LOG.info("Add {} splits to reader", splitsChange.splits().size());
     splits.addAll(splitsChange.splits());
     assignedSplits.increment(Long.valueOf(splitsChange.splits().size()));
     assignedBytes.increment(calculateBytes(splitsChange));
@@ -122,14 +121,6 @@ private long calculateBytes(SplitsChange splitsChanges) {
         .reduce(0L, Long::sum);
   }

-  /**
-   * @throws IOException when current split is done and there is no more splits available.
-   * It will be propagated by caller of {@code FetchTask#run()}. That will cause
-   * {@link SplitFetcher} to exit. When new split is assigned, a new {@code SplitFetcher}
-   * will be created to handle it. This behavior is a little odd.
-   * Right now, we are copying the same behavior from Flink file source.
-   * We can work with Flink community and potentially improve this behavior.
-   */
   private void checkSplitOrStartNext() throws IOException {
     if (currentReader != null) {
       return;
@@ -137,12 +128,18 @@ private void checkSplitOrStartNext() throws IOException {

     IcebergSourceSplit nextSplit = splits.poll();
     if (nextSplit == null) {
+      // throws IOException when current split is done and there is no more splits available.
+      // It will be propagated by the caller of FetchTask#run(). That will cause
+      // SplitFetcher to exit. When new split is assigned, a new SplitFetcher
+      // will be created to handle it. This behavior is a little odd.
+      // Right now, we are copying the same behavior from Flink file source.
+      // We can work with Flink community and potentially improve this behavior.
       throw new IOException("No split remaining");
     }

     currentSplit = nextSplit;
     currentSplitId = nextSplit.splitId();
-    currentReader = readerFunction.apply(currentSplit);
+    currentReader = openSplitFunction.apply(currentSplit);
   }

   private ArrayBatchRecords finishSplit() throws IOException {
From 8cadefe68a839987b38357aa9ef48d255bec56a2 Mon Sep 17 00:00:00 2001
From: Steven Wu
Date: Fri, 25 Mar 2022 11:24:09 -0700
Subject: [PATCH 5/5] return empty result when split reader has no more split. It will cause split fetcher to be idle. SplitFetcherManager closes idle fetcher.

---
 .../reader/IcebergSourceSplitReader.java | 36 ++++++++-----------
 1 file changed, 14 insertions(+), 22 deletions(-)

diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java
index 8b69f39fb1f4..f918ae0466ed 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java
@@ -22,8 +22,10 @@
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.ArrayDeque;
+import java.util.Collections;
 import java.util.Queue;
 import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.connector.base.source.reader.RecordsBySplits;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
@@ -69,7 +71,18 @@ class IcebergSourceSplitReader implements SplitReader, I
   @Override
   public RecordsWithSplitIds> fetch() throws IOException {
     splitReaderFetchCalls.increment();
-    checkSplitOrStartNext();
+    if (currentReader == null) {
+      IcebergSourceSplit nextSplit = splits.poll();
+      if (nextSplit != null) {
+        currentSplit = nextSplit;
+        currentSplitId = nextSplit.splitId();
+        currentReader = openSplitFunction.apply(currentSplit);
+      } else {
+        // No split is queued: return an empty result so that the split fetcher becomes idle.
+        // SplitFetcherManager will then close the idle fetcher.
+        return new RecordsBySplits(Collections.emptyMap(), Collections.emptySet());
+      }
+    }

     if (currentReader.hasNext()) {
       // Because Iterator#next() doesn't support checked exception,
@@ -121,27 +134,6 @@ private long calculateBytes(SplitsChange splitsChanges) {
         .reduce(0L, Long::sum);
   }

-  private void checkSplitOrStartNext() throws IOException {
-    if (currentReader != null) {
-      return;
-    }
-
-    IcebergSourceSplit nextSplit = splits.poll();
-    if (nextSplit == null) {
-      // throws IOException when current split is done and there is no more splits available.
-      // It will be propagated by the caller of FetchTask#run(). That will cause
-      // SplitFetcher to exit. When new split is assigned, a new SplitFetcher
-      // will be created to handle it. This behavior is a little odd.
-      // Right now, we are copying the same behavior from Flink file source.
-      // We can work with Flink community and potentially improve this behavior.
-      throw new IOException("No split remaining");
-    }
-
-    currentSplit = nextSplit;
-    currentSplitId = nextSplit.splitId();
-    currentReader = openSplitFunction.apply(currentSplit);
-  }
-
   private ArrayBatchRecords finishSplit() throws IOException {
     if (currentReader != null) {
       currentReader.close();