Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,13 @@ public static void beforeClass() {
String testBucketPath = "s3://" + testBucketName + "/" + testPathPrefix;
S3FileIO fileIO = new S3FileIO(clientFactory::s3);
glueCatalog = new GlueCatalog();
glueCatalog.initialize(catalogName, testBucketPath, new AwsProperties(), glue, fileIO);
glueCatalog.initialize(catalogName, testBucketPath, new AwsProperties(), glue,
LockManagers.defaultLockManager(), fileIO);
AwsProperties properties = new AwsProperties();
properties.setGlueCatalogSkipArchive(true);
glueCatalogWithSkip = new GlueCatalog();
glueCatalogWithSkip.initialize(catalogName, testBucketPath, properties, glue, fileIO);
glueCatalogWithSkip.initialize(catalogName, testBucketPath, properties, glue,
LockManagers.defaultLockManager(), fileIO);
}

@AfterClass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.iceberg.aws.AwsProperties;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
Expand Down Expand Up @@ -202,8 +201,7 @@ public void testACL() throws Exception {

@Test
public void testClientFactorySerialization() throws Exception {
S3FileIO fileIO = new S3FileIO();
fileIO.initialize(Maps.newHashMap());
S3FileIO fileIO = new S3FileIO(clientFactory::s3);
write(fileIO);
byte [] data = SerializationUtils.serialize(fileIO);
S3FileIO fileIO2 = SerializationUtils.deserialize(data);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public class GlueCatalog extends BaseMetastoreCatalog implements Closeable, Supp
private String warehousePath;
private AwsProperties awsProperties;
private FileIO fileIO;
private LockManager lockManager;

/**
* No-arg constructor to load the catalog dynamically.
Expand All @@ -94,6 +95,7 @@ public void initialize(String name, Map<String, String> properties) {
properties.get(CatalogProperties.WAREHOUSE_LOCATION),
new AwsProperties(properties),
AwsClientFactories.from(properties).glue(),
LockManagers.from(properties),
initializeFileIO(properties));
}

Expand All @@ -109,11 +111,12 @@ private FileIO initializeFileIO(Map<String, String> properties) {
}

/**
* Initializes the catalog directly from already-constructed collaborators rather than from a property map.
* <p>
* The warehouse path is passed through cleanWarehousePath before it is stored; every other argument is
* stored as given.
*/
@VisibleForTesting
void initialize(String name, String path, AwsProperties properties, GlueClient client, FileIO io) {
void initialize(String name, String path, AwsProperties properties, GlueClient client, LockManager lock, FileIO io) {
this.catalogName = name;
this.awsProperties = properties;
this.warehousePath = cleanWarehousePath(path);
this.glue = client;
// kept so that it can be handed to the table operations created by newTableOps
this.lockManager = lock;
this.fileIO = io;
}

Expand All @@ -130,7 +133,7 @@ private String cleanWarehousePath(String path) {

// Creates the Glue-backed table operations for the given identifier, sharing this catalog's
// Glue client, catalog name, AWS properties and file IO with the new instance.
@Override
protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
return new GlueTableOperations(glue, catalogName, awsProperties, fileIO, tableIdentifier);
return new GlueTableOperations(glue, lockManager, catalogName, awsProperties, fileIO, tableIdentifier);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,20 @@ class GlueTableOperations extends BaseMetastoreTableOperations {
private final String databaseName;
private final String tableName;
private final String fullTableName;
private final String commitLockEntityId;
private final FileIO fileIO;
private final LockManager lockManager;

// Builds the table operations for a single Glue table. The database and table names are derived
// from the identifier via IcebergToGlueConverter; all other collaborators are stored as passed in.
GlueTableOperations(GlueClient glue, String catalogName, AwsProperties awsProperties,
GlueTableOperations(GlueClient glue, LockManager lockManager, String catalogName, AwsProperties awsProperties,
FileIO fileIO, TableIdentifier tableIdentifier) {
this.glue = glue;
this.awsProperties = awsProperties;
this.databaseName = IcebergToGlueConverter.getDatabaseName(tableIdentifier);
this.tableName = IcebergToGlueConverter.getTableName(tableIdentifier);
this.fullTableName = String.format("%s.%s.%s", catalogName, databaseName, tableName);
// Lock key is "database.table" only; unlike fullTableName it does not include the catalog name.
this.commitLockEntityId = String.format("%s.%s", databaseName, tableName);
this.fileIO = fileIO;
this.lockManager = lockManager;
}

@Override
Expand Down Expand Up @@ -100,10 +104,11 @@ protected void doRefresh() {
protected void doCommit(TableMetadata base, TableMetadata metadata) {
String newMetadataLocation = writeNewMetadata(metadata, currentVersion() + 1);
boolean exceptionThrown = true;
Table glueTable = getGlueTable();
checkMetadataLocation(glueTable, base);
Map<String, String> properties = prepareProperties(glueTable, newMetadataLocation);
try {
lock(newMetadataLocation);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because this locks in the try block, the finally will be called when the lock fails. I think that's correct for cleaning up the metadata location, but the lock release will currently fail because the lock isn't held by this thread. That will cause the lock failure exception to get replaced by the unlock failure.

I think the solution is to not throw exceptions in release. I'll comment on that below.

Table glueTable = getGlueTable();
checkMetadataLocation(glueTable, base);
Map<String, String> properties = prepareProperties(glueTable, newMetadataLocation);
persistGlueTable(glueTable, properties);
exceptionThrown = false;
} catch (ConcurrentModificationException e) {
Expand All @@ -114,9 +119,14 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
} catch (SdkException e) {
throw new CommitFailedException(e, "Cannot commit %s because unexpected exception contacting AWS", tableName());
} finally {
if (exceptionThrown) {
io().deleteFile(newMetadataLocation);
}
cleanupMetadataAndUnlock(exceptionThrown, newMetadataLocation);
}
}

/**
 * Acquires the commit lock for this table, using the new metadata location as the lock owner ID.
 *
 * @param newMetadataLocation location of the metadata file being committed
 * @throws IllegalStateException if the lock manager reports that the lock was not acquired
 */
private void lock(String newMetadataLocation) {
  boolean acquired = lockManager.acquire(commitLockEntityId, newMetadataLocation);
  if (acquired) {
    return;
  }

  String failure = String.format("Fail to acquire lock %s to commit new metadata at %s",
      commitLockEntityId, newMetadataLocation);
  throw new IllegalStateException(failure);
}

Expand Down Expand Up @@ -180,4 +190,18 @@ private void persistGlueTable(Table glueTable, Map<String, String> parameters) {
.build());
}
}

/**
 * Deletes the uncommitted metadata file when the commit failed, then releases the commit lock.
 * <p>
 * This method is invoked from the finally block of doCommit, so it must not throw: an exception raised
 * here would replace the commit (or lock acquisition) failure that is already propagating to the caller.
 * Failures of the file cleanup and of the lock release are therefore logged instead of rethrown.
 *
 * @param exceptionThrown true if the commit did not complete and the new metadata file must be deleted
 * @param metadataLocation location of the new metadata file, also used as the lock owner ID
 */
private void cleanupMetadataAndUnlock(boolean exceptionThrown, String metadataLocation) {
  try {
    if (exceptionThrown) {
      // if anything went wrong, clean up the uncommitted metadata file
      io().deleteFile(metadataLocation);
    }
  } catch (RuntimeException e) {
    // do not rethrow: that would mask the exception that made the commit fail in the first place
    LOG.error("Fail to cleanup metadata file at {}", metadataLocation, e);
  } finally {
    try {
      lockManager.release(commitLockEntityId, metadataLocation);
    } catch (RuntimeException e) {
      // the lock may never have been acquired, because lock() runs inside the caller's try block;
      // a release failure must not hide the original failure either
      LOG.error("Fail to release lock {} owned by {}", commitLockEntityId, metadataLocation, e);
    }
  }
}
}
50 changes: 50 additions & 0 deletions aws/src/main/java/org/apache/iceberg/aws/glue/LockManager.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.aws.glue;

import java.util.Map;

/**
* An interface for locking, used to ensure Glue catalog commit isolation.
*/
interface LockManager extends AutoCloseable {

/**
* Try to acquire a lock on an entity on behalf of an owner.
* @param entityId ID of the entity to lock
* @param ownerId ID of the owner of the lock
* @return true if the lock for the entity is acquired by the owner, false otherwise
*/
boolean acquire(String entityId, String ownerId);

/**
* Release a lock
* @param entityId ID of the entity to lock
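* @param ownerId ID of the owner of the lock
* @return true if the lock for the entity was held by the owner and has been released, false otherwise;
* implementations must not throw exceptions from this method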
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't quite correct. It should return true if the lock was held and released, and false otherwise. This should also note that the contract requires not throwing exceptions from this method.

*/
boolean release(String entityId, String ownerId);

/**
* Initialize lock manager from catalog properties.
* @param properties catalog properties
*/
void initialize(Map<String, String> properties);
}
Loading