@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.exceptions;

/**
* Exception for a failure to confirm either affirmatively or negatively that a commit was applied. The client
* cannot take any further action without possibly corrupting the table.
*/
public class CommitStateUnknownException extends RuntimeException {

private static final String COMMON_INFO =
"Cannot determine whether the commit was successful or not, the underlying data files may or " +
"may not be needed. Manual intervention via the Remove Orphan Files Action can remove these " +
"files when a connection to the Catalog can be re-established if the commit was actually unsuccessful.\n" +
"Please check to see whether or not your commit was successful before retrying this commit. Retrying " +
"an already successful operation will result in duplicate records or unintentional modifications.\n" +
"At this time no files will be deleted including possibly unused manifest lists.";

public CommitStateUnknownException(Throwable cause) {
super(cause.getMessage() + "\n" + COMMON_INFO, cause);
}
}
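
As context for reviewers (not part of the diff), a caller-side sketch of how this exception is meant to be handled; `table` and `newFile` are illustrative names, the API calls are standard Iceberg:

// Hypothetical caller-side handling; `table` and `newFile` are illustrative names.
try {
  table.newAppend()
      .appendFile(newFile)
      .commit();
} catch (CommitStateUnknownException e) {
  // The commit may or may not have been applied. Do not clean up data files and do not
  // blindly retry: retrying an already successful commit would duplicate records.
  // Instead, refresh and inspect the table to see whether the new snapshot landed.
  table.refresh();
  // ... compare table.currentSnapshot() with the expected change, or alert an operator.
}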
21 changes: 15 additions & 6 deletions core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -32,6 +32,7 @@
import java.util.function.Consumer;
import org.apache.iceberg.events.Listeners;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -218,12 +219,15 @@ private Map<String, String> summary(TableMetadata previous) {
}
} else {
// if there was no previous snapshot, default the summary to start totals at 0
previousSummary = ImmutableMap.of(
SnapshotSummary.TOTAL_RECORDS_PROP, "0",
SnapshotSummary.TOTAL_DATA_FILES_PROP, "0",
SnapshotSummary.TOTAL_DELETE_FILES_PROP, "0",
SnapshotSummary.TOTAL_POS_DELETES_PROP, "0",
SnapshotSummary.TOTAL_EQ_DELETES_PROP, "0");
ImmutableMap.Builder<String, String> summaryBuilder = ImmutableMap.builder();
summaryBuilder
.put(SnapshotSummary.TOTAL_RECORDS_PROP, "0")
.put(SnapshotSummary.TOTAL_FILE_SIZE_PROP, "0")
.put(SnapshotSummary.TOTAL_DATA_FILES_PROP, "0")
.put(SnapshotSummary.TOTAL_DELETE_FILES_PROP, "0")
.put(SnapshotSummary.TOTAL_POS_DELETES_PROP, "0")
.put(SnapshotSummary.TOTAL_EQ_DELETES_PROP, "0");
previousSummary = summaryBuilder.build();
}

ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
@@ -234,6 +238,9 @@ private Map<String, String> summary(TableMetadata previous) {
updateTotal(
builder, previousSummary, SnapshotSummary.TOTAL_RECORDS_PROP,
summary, SnapshotSummary.ADDED_RECORDS_PROP, SnapshotSummary.DELETED_RECORDS_PROP);
updateTotal(
builder, previousSummary, SnapshotSummary.TOTAL_FILE_SIZE_PROP,
summary, SnapshotSummary.ADDED_FILE_SIZE_PROP, SnapshotSummary.REMOVED_FILE_SIZE_PROP);
updateTotal(
builder, previousSummary, SnapshotSummary.TOTAL_DATA_FILES_PROP,
summary, SnapshotSummary.ADDED_FILES_PROP, SnapshotSummary.DELETED_FILES_PROP);
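
The updateTotal helper (defined elsewhere in SnapshotProducer) keeps each running total in step with the per-snapshot deltas. Conceptually it performs the following bookkeeping (an illustrative sketch assuming string-encoded long counters, not the helper's exact code):

// Sketch: newTotal = previousTotal + added - removed, all values stored as strings.
long previousTotal = Long.parseLong(previousSummary.getOrDefault(totalProp, "0"));
long added = Long.parseLong(summary.getOrDefault(addedProp, "0"));
long removed = Long.parseLong(summary.getOrDefault(removedProp, "0"));
builder.put(totalProp, String.valueOf(previousTotal + added - removed));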
@@ -293,6 +300,8 @@ public void commit() {
taskOps.commit(base, updated.withUUID());
});

} catch (CommitStateUnknownException commitStateUnknownException) {
throw commitStateUnknownException;
} catch (RuntimeException e) {
Exceptions.suppressAndThrow(e, this::cleanAll);
}
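
Note the catch ordering above: CommitStateUnknownException is rethrown before the generic RuntimeException handler, whose Exceptions.suppressAndThrow(e, this::cleanAll) path would otherwise delete the very files an unresolved commit may still reference.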
1 change: 1 addition & 0 deletions core/src/main/java/org/apache/iceberg/SnapshotSummary.java
@@ -38,6 +38,7 @@ public class SnapshotSummary {
public static final String TOTAL_RECORDS_PROP = "total-records";
public static final String ADDED_FILE_SIZE_PROP = "added-files-size";
public static final String REMOVED_FILE_SIZE_PROP = "removed-files-size";
public static final String TOTAL_FILE_SIZE_PROP = "total-files-size";
public static final String ADDED_POS_DELETES_PROP = "added-position-deletes";
public static final String REMOVED_POS_DELETES_PROP = "removed-position-deletes";
public static final String TOTAL_POS_DELETES_PROP = "total-position-deletes";
7 changes: 7 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableOperations.java
@@ -52,6 +52,13 @@ public interface TableOperations {
* Implementations must check that the base metadata is current to avoid overwriting updates.
* Once the atomic commit operation succeeds, implementations must not perform any operations that
* may fail because failure in this method cannot be distinguished from commit failure.
* <p>
* Implementations must throw a {@link org.apache.iceberg.exceptions.CommitStateUnknownException}
* in cases where it cannot be determined whether the commit succeeded or failed.
* For example, if a network partition causes the confirmation of the commit to be lost,
* the implementation should throw a CommitStateUnknownException. This is important because downstream
* users of this API need to know whether they can clean up the commit or not; if the state is unknown,
* it is not safe to remove any files. All other exceptions will be treated as if the commit has failed.
*
* @param base table metadata on which changes were based
* @param metadata new table metadata with updates
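
To make this contract concrete, here is a minimal sketch of a custom commit implementation; writeNewMetadata, currentVersion, and catalogSwapPointer are hypothetical helpers, not part of this API:

// Hypothetical sketch of a TableOperations.commit honoring the contract above.
@Override
public void commit(TableMetadata base, TableMetadata metadata) {
  String newLocation = writeNewMetadata(metadata, currentVersion() + 1);
  boolean swapped;
  try {
    // Atomic compare-and-swap of the catalog's table pointer.
    swapped = catalogSwapPointer(base, newLocation);
  } catch (Exception e) {
    // The swap may or may not have been applied (e.g. the connection dropped mid-call),
    // so signal that neither cleanup nor a blind retry is safe.
    throw new CommitStateUnknownException(e);
  }
  if (!swapped) {
    // Definite failure: the caller may safely clean up newLocation and retry.
    throw new CommitFailedException("Cannot commit: base metadata is no longer current");
  }
}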
3 changes: 3 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -36,6 +36,9 @@ private TableProperties() {
public static final String COMMIT_TOTAL_RETRY_TIME_MS = "commit.retry.total-timeout-ms";
public static final int COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT = 1800000; // 30 minutes

public static final String COMMIT_NUM_STATUS_CHECKS = "commit.num-status-checks";
public static final int COMMIT_NUM_STATUS_CHECKS_DEFAULT = 3;

public static final String MANIFEST_TARGET_SIZE_BYTES = "commit.manifest.target-size-bytes";
public static final long MANIFEST_TARGET_SIZE_BYTES_DEFAULT = 8388608; // 8 MB

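
A brief sketch of how an operation might read the new property; PropertyUtil is Iceberg's existing helper, while the surrounding loop merely illustrates the intended bounded status checks (`metadata` is assumed to be a TableMetadata instance):

// Illustrative only: read the configured number of commit status checks.
int maxChecks = PropertyUtil.propertyAsInt(metadata.properties(),
    TableProperties.COMMIT_NUM_STATUS_CHECKS, TableProperties.COMMIT_NUM_STATUS_CHECKS_DEFAULT);
for (int attempt = 1; attempt <= maxChecks; attempt++) {
  // ... probe the catalog to see whether the new metadata location became current ...
}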
82 changes: 82 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestSnapshotSummary.java
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg;

import java.util.Map;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public class TestSnapshotSummary extends TableTestBase {
public TestSnapshotSummary(int formatVersion) {
super(formatVersion);
}

@Parameterized.Parameters(name = "formatVersion = {0}")
public static Object[] parameters() {
return new Object[] { 1, 2 };
}

@Test
public void testFileSizeSummary() {
Assert.assertEquals("Table should start empty", 0, listManifestFiles().size());

// fast append
table.newFastAppend()
.appendFile(FILE_A)
.commit();
Map<String, String> summary = table.currentSnapshot().summary();
Assert.assertEquals("10", summary.get(SnapshotSummary.ADDED_FILE_SIZE_PROP));
Assert.assertNull(summary.get(SnapshotSummary.REMOVED_FILE_SIZE_PROP));
Assert.assertEquals("10", summary.get(SnapshotSummary.TOTAL_FILE_SIZE_PROP));

// merge append
table.newAppend()
.appendFile(FILE_B)
.commit();
summary = table.currentSnapshot().summary();
Assert.assertEquals("10", summary.get(SnapshotSummary.ADDED_FILE_SIZE_PROP));
Assert.assertNull(summary.get(SnapshotSummary.REMOVED_FILE_SIZE_PROP));
Assert.assertEquals("20", summary.get(SnapshotSummary.TOTAL_FILE_SIZE_PROP));

table.newOverwrite()
.deleteFile(FILE_A)
.deleteFile(FILE_B)
.addFile(FILE_C)
.addFile(FILE_D)
.addFile(FILE_D)
.commit();
summary = table.currentSnapshot().summary();
Assert.assertEquals("30", summary.get(SnapshotSummary.ADDED_FILE_SIZE_PROP));
Assert.assertEquals("20", summary.get(SnapshotSummary.REMOVED_FILE_SIZE_PROP));
Assert.assertEquals("30", summary.get(SnapshotSummary.TOTAL_FILE_SIZE_PROP));

table.newDelete()
.deleteFile(FILE_C)
.deleteFile(FILE_D)
.commit();
summary = table.currentSnapshot().summary();
Assert.assertNull(summary.get(SnapshotSummary.ADDED_FILE_SIZE_PROP));
Assert.assertEquals("20", summary.get(SnapshotSummary.REMOVED_FILE_SIZE_PROP));
Assert.assertEquals("10", summary.get(SnapshotSummary.TOTAL_FILE_SIZE_PROP));
}
}
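
The expected values follow from the fixtures: each of FILE_A through FILE_D reports a 10-byte file size (as the first assertion implies), so the running total is 10 after appending FILE_A, 20 after FILE_B, then 20 + 30 - 20 = 30 after the overwrite (three addFile calls of 10 bytes each, including the duplicate FILE_D, minus the two deleted files), and 30 - 20 = 10 after the final delete.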
@@ -36,16 +36,19 @@
import org.apache.iceberg.common.DynMethods;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* A {@link HiveTableOperations} that does not override any existing Hive metadata.
* TODO: This extension should be removed once dual-publish of iceberg+hive is stopped.
*
* The behaviour of this class differs from {@link HiveTableOperations} in the following ways:
* 1. Does not modify the serde information of an existing Hive table; this means that if the Iceberg schema is updated
@@ -139,7 +142,7 @@ protected void doRefresh() {
protected void doCommit(TableMetadata base, TableMetadata metadata) {
String newMetadataLocation = writeNewMetadata(metadata, currentVersion() + 1);

boolean threw = true;
CommitStatus commitStatus = CommitStatus.FAILURE;
Optional<Long> lockId = Optional.empty();
try {
lockId = Optional.of(acquireLock());
@@ -180,26 +183,27 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
baseMetadataLocation, metadataLocation, database, tableName);
}

setParameters(newMetadataLocation, tbl, false);

if (tableExists) {
metaClients.run(client -> {
EnvironmentContext envContext = new EnvironmentContext(
ImmutableMap.of(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE)
);
LOG.debug("Updating the metadata location of the following table:");
logTable(tbl);
LOG.debug("Metadata Location: {}", tbl.getParameters().get(METADATA_LOCATION_PROP));
ALTER_TABLE.invoke(client, database, tableName, tbl, envContext);
return null;
});
} else {
metaClients.run(client -> {
client.createTable(tbl);
return null;
});
// [LINKEDIN] comply with the new signature for setting the Hive table's properties by
// passing the newly added parameters as empty containers.
setHmsTableParameters(newMetadataLocation, tbl, ImmutableMap.of(),
ImmutableSet.of(), false, ImmutableMap.of());

try {
persistTableVerbal(tbl, tableExists);
commitStatus = CommitStatus.SUCCESS;
} catch (Throwable persistFailure) {
LOG.error("Cannot tell if commit to {}.{} succeeded, attempting to reconnect and check.",
database, tableName, persistFailure);
commitStatus = checkCommitStatus(newMetadataLocation, metadata);
switch (commitStatus) {
case SUCCESS:
break;
case FAILURE:
throw persistFailure;
case UNKNOWN:
throw new CommitStateUnknownException(persistFailure);
}
}
threw = false;
} catch (org.apache.hadoop.hive.metastore.api.AlreadyExistsException e) {
throw new AlreadyExistsException("Table already exists: %s.%s", database, tableName);

@@ -216,30 +220,31 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
throw new RuntimeException("Interrupted during commit", e);

} finally {
if (threw && !metadataUpdatedSuccessfully(newMetadataLocation)) {
// if anything went wrong and meta store hasn't been updated, clean up the uncommitted metadata file
io().deleteFile(newMetadataLocation);
}
unlock(lockId);
cleanupMetadataAndUnlock(commitStatus, newMetadataLocation, lockId);
}
}

private boolean metadataUpdatedSuccessfully(String newMetadataLocation) {
boolean metadataUpdatedSuccessfully = false;
try {
Table tbl;
boolean tableExists = metaClients.run(client -> client.tableExists(database, tableName));
if (tableExists) {
tbl = metaClients.run(client -> client.getTable(database, tableName));
if (tbl.getParameters().get(METADATA_LOCATION_PROP).equals(newMetadataLocation)) {
metadataUpdatedSuccessfully = true;
}
}
} catch (Exception e) {
// This indicates the client might be closed and we cannot make sure whether the table has been updated, so
// assume it succeeded to avoid the table pointing to a nonexistent location
metadataUpdatedSuccessfully = true;
/**
* [LINKEDIN] a log-enhanced variant of persistTable, refactored from
* org.apache.iceberg.hive.HiveTableOperations#persistTable
*/
void persistTableVerbal(Table tbl, boolean tableExists) throws TException, InterruptedException {
if (tableExists) {
metaClients.run(client -> {
EnvironmentContext envContext = new EnvironmentContext(
ImmutableMap.of(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE)
);
LOG.debug("Updating the metadata location of the following table:");
logTable(tbl);
LOG.debug("Metadata Location: {}", tbl.getParameters().get(METADATA_LOCATION_PROP));
ALTER_TABLE.invoke(client, database, tableName, tbl, envContext);
return null;
});
} else {
metaClients.run(client -> {
client.createTable(tbl);
return null;
});
}
return metadataUpdatedSuccessfully;
}
}
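
The checkCommitStatus call in doCommit is defined elsewhere in this class and yields the CommitStatus enum (SUCCESS, FAILURE, UNKNOWN) used above. A plausible sketch of the probe, assuming it simply re-reads the Hive table and compares metadata locations, bounded by COMMIT_NUM_STATUS_CHECKS; the real implementation may also retry with backoff or accept historical metadata locations as evidence of success:

// Hypothetical sketch of the status probe; not the exact implementation.
CommitStatus checkCommitStatus(String newMetadataLocation, TableMetadata config) {
  int maxChecks = PropertyUtil.propertyAsInt(config.properties(),
      TableProperties.COMMIT_NUM_STATUS_CHECKS, TableProperties.COMMIT_NUM_STATUS_CHECKS_DEFAULT);
  for (int attempt = 1; attempt <= maxChecks; attempt++) {
    try {
      Table tbl = metaClients.run(client -> client.getTable(database, tableName));
      String current = tbl.getParameters().get(METADATA_LOCATION_PROP);
      return newMetadataLocation.equals(current) ? CommitStatus.SUCCESS : CommitStatus.FAILURE;
    } catch (Exception e) {
      LOG.warn("Status check {}/{} failed, retrying", attempt, maxChecks, e);
    }
  }
  // Every probe failed: the commit state remains unknown.
  return CommitStatus.UNKNOWN;
}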