Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.exceptions;

/**
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To Reviewers: This is our new exception, unless an operations class throws this particular exception all snapshot producer behavior remains the same

* Exception for a failure to confirm either affirmatively or negatively that a commit was applied. The client
* cannot take any further action without possibly corrupting the table.
*/
public class CommitStateUnknownException extends RuntimeException {

private static final String COMMON_INFO =
"Cannot determine whether the commit was successful or not, the underlying data files may or " +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks descriptive enough!

"may not be needed. Manual intervention via the Remove Orphan Files Action can remove these " +
"files when a connection to the Catalog can be re-established if the commit was actually unsuccessful.\n" +
"Please check to see whether or not your commit was successful before retrying this commit. Retrying " +
"an already successful operation will result in duplicate records or unintentional modifications.\n" +
"At this time no files will be deleted including possibly unused manifest lists.";

public CommitStateUnknownException(Throwable cause) {
super(cause.getMessage() + "\n" + COMMON_INFO, cause);
}
}
3 changes: 3 additions & 0 deletions core/src/main/java/org/apache/iceberg/SnapshotProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.function.Consumer;
import org.apache.iceberg.events.Listeners;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
Expand Down Expand Up @@ -299,6 +300,8 @@ public void commit() {
taskOps.commit(base, updated.withUUID());
});

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixes deletion of data files, manifests and snapshots related to unknown state commits

} catch (CommitStateUnknownException commitStateUnknownException) {
throw commitStateUnknownException;
} catch (RuntimeException e) {
Exceptions.suppressAndThrow(e, this::cleanAll);
}
Expand Down
7 changes: 7 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableOperations.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ public interface TableOperations {
* Implementations must check that the base metadata is current to avoid overwriting updates.
* Once the atomic commit operation succeeds, implementations must not perform any operations that
* may fail because failure in this method cannot be distinguished from commit failure.
* <p>
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Requirement for new exception detailed in api here

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good!

* Implementations must throw a {@link org.apache.iceberg.exceptions.CommitStateUnknownException}
* in cases where it cannot be determined if the commit succeeded or failed.
* For example if a network partition causes the confirmation of the commit to be lost,
* the implementation should throw a CommitStateUnknownException. This is important because downstream users of
* this API need to know whether they can clean up the commit or not, if the state is unknown then it is not safe
* to remove any files. All other exceptions will be treated as if the commit has failed.
*
* @param base table metadata on which changes were based
* @param metadata new table metadata with updates
Expand Down
3 changes: 3 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ private TableProperties() {
public static final String COMMIT_TOTAL_RETRY_TIME_MS = "commit.retry.total-timeout-ms";
public static final int COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT = 1800000; // 30 minutes

public static final String COMMIT_NUM_STATUS_CHECKS = "commit.num-status-checks";
public static final int COMMIT_NUM_STATUS_CHECKS_DEFAULT = 3;

public static final String MANIFEST_TARGET_SIZE_BYTES = "commit.manifest.target-size-bytes";
public static final long MANIFEST_TARGET_SIZE_BYTES_DEFAULT = 8388608; // 8 MB

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,24 +54,32 @@
import org.apache.iceberg.common.DynMethods;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.exceptions.NoSuchIcebergTableException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.hadoop.ConfigProperties;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.iceberg.TableProperties.COMMIT_NUM_STATUS_CHECKS;
import static org.apache.iceberg.TableProperties.COMMIT_NUM_STATUS_CHECKS_DEFAULT;

/**
* TODO we should be able to extract some more commonalities to BaseMetastoreTableOperations to
* avoid code duplication between this class and Metacat Tables.
*/
public class HiveTableOperations extends BaseMetastoreTableOperations {
private static final Logger LOG = LoggerFactory.getLogger(HiveTableOperations.class);

private static final int COMMIT_STATUS_CHECK_WAIT_MS = 1000;

private static final String HIVE_ACQUIRE_LOCK_TIMEOUT_MS = "iceberg.hive.lock-timeout-ms";
private static final String HIVE_LOCK_CHECK_MIN_WAIT_MS = "iceberg.hive.lock-check-min-wait-ms";
private static final String HIVE_LOCK_CHECK_MAX_WAIT_MS = "iceberg.hive.lock-check-max-wait-ms";
Expand All @@ -91,6 +99,12 @@ private static class WaitingForLockException extends RuntimeException {
}
}

private enum CommitStatus {
FAILURE,
SUCCESS,
UNKNOWN
}

private final HiveClientPool metaClients;
private final String fullName;
private final String database;
Expand Down Expand Up @@ -153,12 +167,13 @@ protected void doRefresh() {
refreshFromMetadataLocation(metadataLocation);
}

@SuppressWarnings("checkstyle:CyclomaticComplexity")
@Override
protected void doCommit(TableMetadata base, TableMetadata metadata) {
String newMetadataLocation = writeNewMetadata(metadata, currentVersion() + 1);
boolean hiveEngineEnabled = hiveEngineEnabled(metadata, conf);

boolean threw = true;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than the true/false here we need the abilty to differentiate between knowing that

  1. The Commit Failed
  2. The Commit Suceeded
  3. We aren't sure

CommitStatus commitStatus = CommitStatus.FAILURE;
boolean updateHiveTable = false;
Optional<Long> lockId = Optional.empty();
try {
Expand Down Expand Up @@ -203,8 +218,23 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
.orElseGet(ImmutableMap::of);
setHmsTableParameters(newMetadataLocation, tbl, metadata.properties(), removedProps, hiveEngineEnabled, summary);

persistTable(tbl, updateHiveTable);
threw = false;
try {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks much cleaner now!

persistTable(tbl, updateHiveTable);
commitStatus = CommitStatus.SUCCESS;
} catch (Throwable persistFailure) {
LOG.error("Cannot tell if commit to {}.{} succeeded, attempting to reconnect and check.",
database, tableName, persistFailure);
commitStatus = checkCommitStatus(newMetadataLocation, metadata);
switch (commitStatus) {
case SUCCESS:
break;
case FAILURE:
throw persistFailure;
case UNKNOWN:
throw new CommitStateUnknownException(persistFailure);
}
}

} catch (org.apache.hadoop.hive.metastore.api.AlreadyExistsException e) {
throw new AlreadyExistsException("Table already exists: %s.%s", database, tableName);

Expand All @@ -222,11 +252,56 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
throw new RuntimeException("Interrupted during commit", e);

} finally {
cleanupMetadataAndUnlock(threw, newMetadataLocation, lockId);
cleanupMetadataAndUnlock(commitStatus, newMetadataLocation, lockId);
}
}

/**
* Attempt to load the table and see if any current or past metadata location matches the one we were attempting
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the javadoc! It is going to be helpful for folks who touch this code next time.

* to set. This is used as a last resort when we are dealing with exceptions that may indicate the commit has
* failed but are not proof that this is the case. Past locations must also be searched on the chance that a second
* committer was able to successfully commit on top of our commit.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good call!

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hello, I have a question about the past locations check here that when checkCommitStatus() is called, we still holds the metastore lock, so is it possible for another commiter to commit on top of our commit?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We request an EXCLUSIVE lock before committing. Until we hold the lock, no other committer can get another lock on the same table, so no other committer can commit

*
* @param newMetadataLocation the path of the new commit file
* @param config metadata to use for configuration
* @return Commit Status of Success, Failure or Unknown
*/
private CommitStatus checkCommitStatus(String newMetadataLocation, TableMetadata config) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic here looks correct to me.

int maxAttempts = PropertyUtil.propertyAsInt(config.properties(), COMMIT_NUM_STATUS_CHECKS,
COMMIT_NUM_STATUS_CHECKS_DEFAULT);

AtomicReference<CommitStatus> status = new AtomicReference<>(CommitStatus.UNKNOWN);

Tasks.foreach(newMetadataLocation)
.retry(maxAttempts)
.suppressFailureWhenFinished()
.exponentialBackoff(COMMIT_STATUS_CHECK_WAIT_MS, COMMIT_STATUS_CHECK_WAIT_MS, Long.MAX_VALUE, 2.0)
.onFailure((location, checkException) ->
LOG.error("Cannot check if commit to {}.{} exists.", database, tableName, checkException))
.run(location -> {
TableMetadata metadata = refresh();
String currentMetadataLocation = metadata.metadataFileLocation();
boolean commitSuccess = currentMetadataLocation.equals(newMetadataLocation) ||
metadata.previousFiles().stream().anyMatch(log -> log.file().equals(newMetadataLocation));
if (commitSuccess) {
LOG.info("Commit status check: Commit to {}.{} of {} succeeded", database, tableName, newMetadataLocation);
status.set(CommitStatus.SUCCESS);
} else {
LOG.info("Commit status check: Commit to {}.{} of {} failed", database, tableName, newMetadataLocation);
status.set(CommitStatus.FAILURE);
}
});

if (status.get() == CommitStatus.UNKNOWN) {
LOG.error("Cannot determine commit state to {}.{}. Failed during checking {} times. " +
"Treating commit state as unknown.",
database, tableName, maxAttempts);
}
return status.get();
}

private void persistTable(Table hmsTable, boolean updateHiveTable) throws TException, InterruptedException {
@VisibleForTesting
void persistTable(Table hmsTable, boolean updateHiveTable) throws TException, InterruptedException {
if (updateHiveTable) {
metaClients.run(client -> {
EnvironmentContext envContext = new EnvironmentContext(
Expand Down Expand Up @@ -335,7 +410,8 @@ private StorageDescriptor storageDescriptor(TableMetadata metadata, boolean hive
return storageDescriptor;
}

private long acquireLock() throws UnknownHostException, TException, InterruptedException {
@VisibleForTesting
long acquireLock() throws UnknownHostException, TException, InterruptedException {
final LockComponent lockComponent = new LockComponent(LockType.EXCLUSIVE, LockLevel.TABLE, database);
lockComponent.setTablename(tableName);
final LockRequest lockRequest = new LockRequest(Lists.newArrayList(lockComponent),
Expand Down Expand Up @@ -401,10 +477,10 @@ private long acquireLock() throws UnknownHostException, TException, InterruptedE
return lockId;
}

private void cleanupMetadataAndUnlock(boolean errorThrown, String metadataLocation, Optional<Long> lockId) {
private void cleanupMetadataAndUnlock(CommitStatus commitStatus, String metadataLocation, Optional<Long> lockId) {
try {
if (errorThrown) {
// if anything went wrong, clean up the uncommitted metadata file
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This fixes the issue with deleting metadata.json in unknown state cases

if (commitStatus == CommitStatus.FAILURE) {
// If we are sure the commit failed, clean up the uncommitted metadata file
io().deleteFile(metadataLocation);
}
} catch (RuntimeException e) {
Expand All @@ -425,8 +501,8 @@ private void unlock(Optional<Long> lockId) {
}
}

// visible for testing
protected void doUnlock(long lockId) throws TException, InterruptedException {
@VisibleForTesting
void doUnlock(long lockId) throws TException, InterruptedException {
metaClients.run(client -> {
client.unlock(lockId);
return null;
Expand Down
Loading