From d25cf6e714ffedd258520bfed3cdc53eaad35242 Mon Sep 17 00:00:00 2001 From: wxp4532 Date: Fri, 24 Jun 2022 01:15:40 +0800 Subject: [PATCH 1/6] fix data loss in some rollback scenarios --- .../hudi/sink/StreamWriteOperatorCoordinator.java | 12 +++++++++++- .../java/org/apache/hudi/sink/meta/CkpMetadata.java | 12 +++++++++--- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index 25b34b11e7cc3..ac49987b76350 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -50,6 +50,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.io.Serializable; import java.util.Arrays; import java.util.Collection; @@ -195,7 +196,7 @@ public void start() throws Exception { initMetadataSync(); } this.ckpMetadata = CkpMetadata.getInstance(this.metaClient.getFs(), metaClient.getBasePath()); - this.ckpMetadata.bootstrap(this.metaClient); + this.ckpMetadata.reInit(); } @Override @@ -304,6 +305,13 @@ public void handleEventFromOperator(int i, OperatorEvent operatorEvent) { public void subtaskFailed(int i, @Nullable Throwable throwable) { // reset the event this.eventBuffer[i] = null; + if (Arrays.stream(this.eventBuffer).allMatch(event -> event == null)) { + try { + this.ckpMetadata.reInit(); + } catch (IOException e) { + throw new HoodieException("Re init ckpMeta error", e); + } + } LOG.warn("Reset the event for task [" + i + "]", throwable); } @@ -404,6 +412,8 @@ private void initInstant(String instant) { startInstant(); // upgrade downgrade this.writeClient.upgradeDowngrade(this.instant, this.metaClient); + // avoid write task load wrong instant on 
running rollback + this.ckpMetadata.bootstrap(this.metaClient); }, "initialize instant %s", instant); } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java index 45a4e04bab285..b742f6781565a 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java @@ -89,14 +89,17 @@ public void close() { // WRITE METHODS // ------------------------------------------------------------------------- + public void reInit() throws IOException { + fs.delete(path, true); + fs.mkdirs(path); + } + /** * Initialize the message bus, would clean all the messages and publish the last pending instant. * *

This expects to be called by the driver. */ public void bootstrap(HoodieTableMetaClient metaClient) throws IOException { - fs.delete(path, true); - fs.mkdirs(path); metaClient.getActiveTimeline().getCommitsTimeline().filterPendingExcludingCompaction() .lastInstant().ifPresent(instant -> startInstant(instant.getTimestamp())); } @@ -154,7 +157,10 @@ public void commitInstant(String instant) { public void abortInstant(String instant) { Path path = fullPath(CkpMessage.getFileName(instant, CkpMessage.State.ABORTED)); try { - fs.createNewFile(path); + // when all write task failed ckpMeta will reInit and not need abort + if (fs.exists(fullPath(CkpMessage.getFileName(instant, CkpMessage.State.INFLIGHT)))) { + fs.createNewFile(path); + } } catch (IOException e) { throw new HoodieException("Exception while adding checkpoint abort metadata for instant: " + instant); } From 5f81f396635c9234a2dacbfc071d0c230691677f Mon Sep 17 00:00:00 2001 From: wxp4532 Date: Fri, 24 Jun 2022 15:56:41 +0800 Subject: [PATCH 2/6] fix data loss in some rollback scenarios --- .../hudi/sink/StreamWriteOperatorCoordinator.java | 11 +---------- .../hudi/sink/common/AbstractStreamWriteFunction.java | 2 +- .../java/org/apache/hudi/sink/meta/CkpMetadata.java | 9 ++------- 3 files changed, 4 insertions(+), 18 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index ac49987b76350..60fda79020a8a 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -196,7 +196,7 @@ public void start() throws Exception { initMetadataSync(); } this.ckpMetadata = CkpMetadata.getInstance(this.metaClient.getFs(), metaClient.getBasePath()); - this.ckpMetadata.reInit(); + 
this.ckpMetadata.bootstrap(this.metaClient); } @Override @@ -305,13 +305,6 @@ public void handleEventFromOperator(int i, OperatorEvent operatorEvent) { public void subtaskFailed(int i, @Nullable Throwable throwable) { // reset the event this.eventBuffer[i] = null; - if (Arrays.stream(this.eventBuffer).allMatch(event -> event == null)) { - try { - this.ckpMetadata.reInit(); - } catch (IOException e) { - throw new HoodieException("Re init ckpMeta error", e); - } - } LOG.warn("Reset the event for task [" + i + "]", throwable); } @@ -412,8 +405,6 @@ private void initInstant(String instant) { startInstant(); // upgrade downgrade this.writeClient.upgradeDowngrade(this.instant, this.metaClient); - // avoid write task load wrong instant on running rollback - this.ckpMetadata.bootstrap(this.metaClient); }, "initialize instant %s", instant); } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java index 98085fa74f5a7..6fecea3b59632 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java @@ -265,6 +265,6 @@ protected String instantToWrite(boolean hasData) { * Returns whether the pending instant is invalid to write with. 
*/ private boolean invalidInstant(String instant, boolean hasData) { - return instant.equals(this.currentInstant) && hasData && !this.ckpMetadata.isAborted(instant); + return instant.equals(this.currentInstant) && hasData && this.ckpMetadata.isAborted(instant); } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java index b742f6781565a..408e4e2065461 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java @@ -89,19 +89,14 @@ public void close() { // WRITE METHODS // ------------------------------------------------------------------------- - public void reInit() throws IOException { - fs.delete(path, true); - fs.mkdirs(path); - } - /** * Initialize the message bus, would clean all the messages and publish the last pending instant. * *

This expects to be called by the driver. */ public void bootstrap(HoodieTableMetaClient metaClient) throws IOException { - metaClient.getActiveTimeline().getCommitsTimeline().filterPendingExcludingCompaction() - .lastInstant().ifPresent(instant -> startInstant(instant.getTimestamp())); + fs.delete(path, true); + fs.mkdirs(path); } public void startInstant(String instant) { From 237c256ce3a94d4ed7d20728dddd59336d4f828c Mon Sep 17 00:00:00 2001 From: wxp4532 Date: Fri, 24 Jun 2022 16:09:10 +0800 Subject: [PATCH 3/6] fix data loss in some rollback scenarios --- .../org/apache/hudi/sink/StreamWriteOperatorCoordinator.java | 1 - .../src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java | 5 +---- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index 60fda79020a8a..25b34b11e7cc3 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -50,7 +50,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.io.Serializable; import java.util.Arrays; import java.util.Collection; diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java index 408e4e2065461..f059c7050ca56 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java @@ -152,10 +152,7 @@ public void commitInstant(String instant) { public void abortInstant(String instant) { Path path = 
fullPath(CkpMessage.getFileName(instant, CkpMessage.State.ABORTED)); try { - // when all write task failed ckpMeta will reInit and not need abort - if (fs.exists(fullPath(CkpMessage.getFileName(instant, CkpMessage.State.INFLIGHT)))) { - fs.createNewFile(path); - } + fs.createNewFile(path); } catch (IOException e) { throw new HoodieException("Exception while adding checkpoint abort metadata for instant: " + instant); } From 60991611d88ed813e8717489a7f59ac559b73dce Mon Sep 17 00:00:00 2001 From: wxp4532 Date: Fri, 24 Jun 2022 17:30:46 +0800 Subject: [PATCH 4/6] do not abort the instant when some write tasks have failed --- .../apache/hudi/sink/StreamWriteOperatorCoordinator.java | 9 ++++++--- .../hudi/sink/common/AbstractStreamWriteFunction.java | 2 +- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index 25b34b11e7cc3..f68caffe4e3a7 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -266,9 +266,12 @@ public void notifyCheckpointComplete(long checkpointId) { @Override public void notifyCheckpointAborted(long checkpointId) { if (checkpointId == this.checkpointId) { - executor.execute(() -> { - this.ckpMetadata.abortInstant(this.instant); - }, "abort instant %s", this.instant); + // write task failed we do not reuse the instant + if (Arrays.stream(eventBuffer).anyMatch(s -> s == null)) { + executor.execute(() -> { + this.ckpMetadata.abortInstant(this.instant); + }, "abort instant %s", this.instant); + } } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java index 6fecea3b59632..98085fa74f5a7 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java @@ -265,6 +265,6 @@ protected String instantToWrite(boolean hasData) { * Returns whether the pending instant is invalid to write with. */ private boolean invalidInstant(String instant, boolean hasData) { - return instant.equals(this.currentInstant) && hasData && this.ckpMetadata.isAborted(instant); + return instant.equals(this.currentInstant) && hasData && !this.ckpMetadata.isAborted(instant); } } From 1b1334c81b330b852eef17550b151f561c188720 Mon Sep 17 00:00:00 2001 From: wxp4532 Date: Fri, 24 Jun 2022 20:08:10 +0800 Subject: [PATCH 5/6] do not abort the instant when some write tasks have failed --- .../sink/StreamWriteOperatorCoordinator.java | 17 +++++++++++------ .../org/apache/hudi/sink/meta/CkpMetadata.java | 4 +++- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index f68caffe4e3a7..b85393403fb16 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -50,6 +50,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.io.Serializable; import java.util.Arrays; import java.util.Collection; @@ -266,12 +267,9 @@ public void notifyCheckpointComplete(long checkpointId) { @Override public void notifyCheckpointAborted(long checkpointId) { if (checkpointId == 
this.checkpointId) { - // write task failed we do not reuse the instant - if (Arrays.stream(eventBuffer).anyMatch(s -> s == null)) { - executor.execute(() -> { - this.ckpMetadata.abortInstant(this.instant); - }, "abort instant %s", this.instant); - } + executor.execute(() -> { + this.ckpMetadata.abortInstant(this.instant); + }, "abort instant %s", this.instant); } } @@ -308,6 +306,13 @@ public void subtaskFailed(int i, @Nullable Throwable throwable) { // reset the event this.eventBuffer[i] = null; LOG.warn("Reset the event for task [" + i + "]", throwable); + if (Arrays.stream(this.eventBuffer).allMatch(event -> event == null)) { + try { + this.ckpMetadata.bootstrap(this.metaClient); + } catch (IOException e) { + throw new HoodieException("Bootstrap ckpMetadata exception", e); + } + } } @Override diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java index f059c7050ca56..05859964ceda3 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java @@ -152,7 +152,9 @@ public void commitInstant(String instant) { public void abortInstant(String instant) { Path path = fullPath(CkpMessage.getFileName(instant, CkpMessage.State.ABORTED)); try { - fs.createNewFile(path); + if (fs.exists(fullPath(CkpMessage.getFileName(instant, CkpMessage.State.INFLIGHT)))) { + fs.createNewFile(path); + } } catch (IOException e) { throw new HoodieException("Exception while adding checkpoint abort metadata for instant: " + instant); } From 3e8a36c14fb71cc567ec525da37869d75af67bbc Mon Sep 17 00:00:00 2001 From: wxp4532 Date: Mon, 27 Jun 2022 11:05:52 +0800 Subject: [PATCH 6/6] rollback task failed scene --- .../hudi/sink/StreamWriteOperatorCoordinator.java | 10 +--------- .../java/org/apache/hudi/sink/meta/CkpMetadata.java 
| 4 +--- 2 files changed, 2 insertions(+), 12 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index b85393403fb16..6f40bcd50ce98 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -50,7 +50,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.io.Serializable; import java.util.Arrays; import java.util.Collection; @@ -266,7 +265,7 @@ public void notifyCheckpointComplete(long checkpointId) { @Override public void notifyCheckpointAborted(long checkpointId) { - if (checkpointId == this.checkpointId) { + if (checkpointId == this.checkpointId && !WriteMetadataEvent.BOOTSTRAP_INSTANT.equals(this.instant)) { executor.execute(() -> { this.ckpMetadata.abortInstant(this.instant); }, "abort instant %s", this.instant); @@ -306,13 +305,6 @@ public void subtaskFailed(int i, @Nullable Throwable throwable) { // reset the event this.eventBuffer[i] = null; LOG.warn("Reset the event for task [" + i + "]", throwable); - if (Arrays.stream(this.eventBuffer).allMatch(event -> event == null)) { - try { - this.ckpMetadata.bootstrap(this.metaClient); - } catch (IOException e) { - throw new HoodieException("Bootstrap ckpMetadata exception", e); - } - } } @Override diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java index 05859964ceda3..f059c7050ca56 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java 
@@ -152,9 +152,7 @@ public void commitInstant(String instant) { public void abortInstant(String instant) { Path path = fullPath(CkpMessage.getFileName(instant, CkpMessage.State.ABORTED)); try { - if (fs.exists(fullPath(CkpMessage.getFileName(instant, CkpMessage.State.INFLIGHT)))) { - fs.createNewFile(path); - } + fs.createNewFile(path); } catch (IOException e) { throw new HoodieException("Exception while adding checkpoint abort metadata for instant: " + instant); }